Implement travel time data point saving and loading, and tests

This commit is contained in:
2025-11-10 16:46:34 -08:00
parent bd1ae07662
commit d92db84738
2 changed files with 249 additions and 59 deletions

View File

@@ -15,6 +15,17 @@ export interface ShuttleStopArrival {
timestamp: Date;
}
export interface ShuttleTravelTimeDataIdentifier {
routeId: string;
fromStopId: string;
toStopId: string;
}
export interface ShuttleTravelTimeDateFilterArguments {
from: Date;
to: Date;
}
export class RedisShuttleRepository extends EventEmitter implements ShuttleGetterSetterRepository {
protected redisClient;
@@ -106,6 +117,9 @@ export class RedisShuttleRepository extends EventEmitter implements ShuttleGette
private createEtaKey = (shuttleId: string, stopId: string) => `shuttle:eta:${shuttleId}:${stopId}`;
private createOrderedStopKey = (routeId: string, stopId: string) => `shuttle:orderedstop:${routeId}:${stopId}`;
private createShuttleLastStopKey = (shuttleId: string) => `shuttle:laststop:${shuttleId}`;
private createHistoricalEtaTimeSeriesKey =(routeId: string, fromStopId: string, toStopId: string) => {
return `shuttle:eta:historical:${routeId}:${fromStopId}:${toStopId}`;
}
// Helper methods for converting entities to Redis hashes
private createRedisHashFromStop = (stop: IStop): Record<string, string> => ({
@@ -408,36 +422,153 @@ export class RedisShuttleRepository extends EventEmitter implements ShuttleGette
await this.redisClient.hSet(key, this.createRedisHashFromRoute(route));
}
public async addOrUpdateShuttle(shuttle: IShuttle): Promise<void> {
public async addOrUpdateShuttle(
shuttle: IShuttle,
travelTimeTimestamp = Date.now(),
): Promise<void> {
const key = this.createShuttleKey(shuttle.id);
await this.redisClient.hSet(key, this.createRedisHashFromShuttle(shuttle));
await this.updateHistoricalEtasForShuttle(shuttle);
await this.updateHistoricalEtasForShuttle(shuttle, travelTimeTimestamp);
await this.updateEtasBasedOnHistoricalData(shuttle);
}
private async updateHistoricalEtasForShuttle(shuttle: IShuttle) {
public async updateEtasBasedOnHistoricalData(
shuttle: IShuttle,
) {
// Based on historical data for the key provided by this.createHistoricalEtaTimeSeriesKey(routeId, fromStopId, toStopId);
// Call this.addOrUpdateEta with the correct information
// "Historical data" being averaged time taken for the current hour of the same day
// in the past week, based on the reference time
}
private async updateHistoricalEtasForShuttle(
shuttle: IShuttle,
travelTimeTimestamp = Date.now(),
) {
const arrivedStop = await this.getArrivedStopIfExists(shuttle);
if (arrivedStop != undefined) {
const lastStopTimestamp = await this.getShuttleLastStopArrival(shuttle)
if (lastStopTimestamp != undefined) {
const now = Date();
const routeId = shuttle.routeId
const routeId = shuttle.routeId;
const fromStopId = lastStopTimestamp.stopId;
const toStopId = arrivedStop.id;
// Create an entry in Redis time series
// Key: routeId:fromStopId:toStopId:
// Value: seconds it took to get from lastStopTimestamp.stopId to arrivedStop.id
const travelTimeSeconds = (travelTimeTimestamp - lastStopTimestamp.timestamp.getTime()) / 1000;
await this.addTravelTimeDataPoint({ routeId, fromStopId, toStopId, }, travelTimeSeconds, travelTimeTimestamp);
}
await this.updateShuttleLastStopArrival(shuttle, {
stopId: arrivedStop.id,
timestamp: new Date(),
timestamp: new Date(travelTimeTimestamp),
})
}
}
public async getAverageTravelTimeSeconds(
{ routeId, fromStopId, toStopId }: ShuttleTravelTimeDataIdentifier,
{ from, to }: ShuttleTravelTimeDateFilterArguments,
): Promise<number> {
const timeSeriesKey = this.createHistoricalEtaTimeSeriesKey(routeId, fromStopId, toStopId);
const fromTimestamp = from.getTime();
const toTimestamp = to.getTime();
const intervalMs = toTimestamp - fromTimestamp;
try {
const aggregationResult = await this.redisClient.sendCommand([
'TS.RANGE',
timeSeriesKey,
fromTimestamp.toString(),
toTimestamp.toString(),
'AGGREGATION',
'AVG',
intervalMs.toString()
]) as [string, string][];
if (aggregationResult && aggregationResult.length > 0) {
const [, averageValue] = aggregationResult[0];
return parseFloat(averageValue);
}
throw new Error(`No historical data found for route ${routeId} from stop ${fromStopId} to stop ${toStopId}`);
} catch (error) {
throw new Error(`Failed to get average travel time: ${error instanceof Error ? error.message : String(error)}`);
}
}
private async addTravelTimeDataPoint(
{ routeId, fromStopId, toStopId }: ShuttleTravelTimeDataIdentifier,
travelTimeSeconds: number,
timestamp = Date.now(),
): Promise<void> {
const historicalEtaTimeSeriesKey = this.createHistoricalEtaTimeSeriesKey(routeId, fromStopId, toStopId);
try {
await this.redisClient.sendCommand([
'TS.ADD',
historicalEtaTimeSeriesKey,
timestamp.toString(),
travelTimeSeconds.toString(),
'LABELS',
'routeId',
routeId,
'fromStopId',
fromStopId,
'toStopId',
toStopId
]);
} catch (error) {
await this.createHistoricalEtaTimeSeriesAndAddDataPoint(
historicalEtaTimeSeriesKey,
timestamp,
travelTimeSeconds,
routeId,
fromStopId,
toStopId
);
}
}
private async createHistoricalEtaTimeSeriesAndAddDataPoint(
timeSeriesKey: string,
timestamp: number,
travelTimeSeconds: number,
routeId: string,
fromStopId: string,
toStopId: string,
): Promise<void> {
try {
await this.redisClient.sendCommand([
'TS.CREATE',
timeSeriesKey,
'RETENTION',
'2678400000', // one month in milliseconds
'LABELS',
'routeId',
routeId,
'fromStopId',
fromStopId,
'toStopId',
toStopId
]);
await this.redisClient.sendCommand([
'TS.ADD',
timeSeriesKey,
timestamp.toString(),
travelTimeSeconds.toString()
]);
} catch (createError) {
await this.redisClient.sendCommand([
'TS.ADD',
timeSeriesKey,
timestamp.toString(),
travelTimeSeconds.toString()
]);
}
}
public async getArrivedStopIfExists(
shuttle: IShuttle,
delta = 0.001,