import { SelfUpdatingETARepository } from "./SelfUpdatingETARepository";
import { BaseRedisETARepository } from "./BaseRedisETARepository";
import { createClient, RedisClientType } from "redis";
import {
  ShuttleGetterRepository,
  ShuttleRepositoryEvent,
  ShuttleStopArrival,
  ShuttleTravelTimeDataIdentifier,
  ShuttleTravelTimeDateFilterArguments
} from "../ShuttleGetterRepository";
import { REDIS_RECONNECT_INTERVAL } from "../../../environment";
import { IOrderedStop, IShuttle } from "../../../entities/ShuttleRepositoryEntities";

export class RedisSelfUpdatingETARepository extends BaseRedisETARepository implements SelfUpdatingETARepository {
  constructor(
    readonly shuttleRepository: ShuttleGetterRepository,
    redisClient: RedisClientType = createClient({
      url: process.env.REDIS_URL,
      socket: {
        tls: process.env.NODE_ENV === 'production',
        rejectUnauthorized: false,
        reconnectStrategy: REDIS_RECONNECT_INTERVAL,
      },
    }),
    private referenceTime: Date | null = null,
  ) {
    super(redisClient);

    this.setReferenceTime = this.setReferenceTime.bind(this);
    this.getAverageTravelTimeSeconds = this.getAverageTravelTimeSeconds.bind(this);
    this.startListeningForUpdates = this.startListeningForUpdates.bind(this);
    this.handleShuttleWillArriveAtStop = this.handleShuttleWillArriveAtStop.bind(this);
    this.handleShuttleUpdate = this.handleShuttleUpdate.bind(this);
  }

  private createHistoricalEtaTimeSeriesKey = (routeId: string, fromStopId: string, toStopId: string) => {
    return `shuttle:eta:historical:${routeId}:${fromStopId}:${toStopId}`;
  }

  setReferenceTime(referenceTime: Date) {
    this.referenceTime = referenceTime;
  }

  public async getAverageTravelTimeSeconds(
    { routeId, fromStopId, toStopId }: ShuttleTravelTimeDataIdentifier,
    { from, to }: ShuttleTravelTimeDateFilterArguments
  ): Promise<number | undefined> {
    const timeSeriesKey = this.createHistoricalEtaTimeSeriesKey(routeId, fromStopId, toStopId);
    const fromTimestamp = from.getTime();
    const toTimestamp = to.getTime();
    // Bucket duration: the full window width in milliseconds (both ends inclusive).
    const intervalMs = toTimestamp - fromTimestamp + 1;

    try {
      const aggregationResult = await this.redisClient.sendCommand([
        'TS.RANGE',
        timeSeriesKey,
        fromTimestamp.toString(),
        toTimestamp.toString(),
        'AGGREGATION',
        'AVG',
        intervalMs.toString()
      ]) as [string, string][];

      if (aggregationResult && aggregationResult.length > 0) {
        // Each entry is a [timestamp, value] pair; only the first bucket is used.
        const [, averageValue] = aggregationResult[0];
        return parseFloat(averageValue);
      }

      return;
    } catch (error) {
      console.warn(`Failed to get average travel time: ${error instanceof Error ? error.message : String(error)}`);
      return;
    }
  }

  startListeningForUpdates() {
    this.shuttleRepository.addListener(ShuttleRepositoryEvent.SHUTTLE_UPDATED, this.handleShuttleUpdate);
    this.shuttleRepository.addListener(ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP, this.handleShuttleWillArriveAtStop);
  }

  stopListeningForUpdates() {
    this.shuttleRepository.removeListener(ShuttleRepositoryEvent.SHUTTLE_UPDATED, this.handleShuttleUpdate);
    this.shuttleRepository.removeListener(ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP, this.handleShuttleWillArriveAtStop);
  }

  // Tries each date filter in order and returns the first average that has data.
  private async getAverageTravelTimeSecondsWithFallbacks(
    identifier: ShuttleTravelTimeDataIdentifier,
    dateFilters: ShuttleTravelTimeDateFilterArguments[]
  ): Promise<number | undefined> {
    for (const dateFilter of dateFilters) {
      const result = await this.getAverageTravelTimeSeconds(identifier, dateFilter);
      if (result !== undefined) {
        return result;
      }
    }

    return undefined;
  }

  private async handleShuttleUpdate(shuttle: IShuttle) {
    const lastStop = await this.shuttleRepository.getShuttleLastStopArrival(shuttle.id);
    if (!lastStop) return;

    const lastOrderedStop = await this.shuttleRepository.getOrderedStopByRouteAndStopId(shuttle.routeId, lastStop.stopId);

    await this.updateCascadingEta({
      shuttle,
      currentStop: lastOrderedStop,
      originalStopArrival: lastStop,
    });
  }

  private async updateCascadingEta({
    shuttle,
    currentStop,
    originalStopArrival,
    runningTravelTimeSeconds = 0
  }: {
    shuttle: IShuttle;
    currentStop: IOrderedStop | null;
    originalStopArrival: ShuttleStopArrival;
    runningTravelTimeSeconds?: number;
  }) {
    if (!currentStop) return;
    const nextStop = currentStop.nextStop;
    if (!nextStop) return;

    // Stop once the route loops back around to the stop we started from
    if (nextStop.stopId === originalStopArrival.stopId) return;

    const referenceCurrentTime = this.referenceTime ?? new Date();

    const oneWeekAgo = new Date(referenceCurrentTime.getTime() - (60 * 60 * 24 * 7 * 1000));
    const oneDayAgo = new Date(referenceCurrentTime.getTime() - (60 * 60 * 24 * 1000));
    const oneHourAgo = new Date(referenceCurrentTime.getTime() - (60 * 60 * 1000));

    // Prefer the same hour last week, then the same hour yesterday, then the past hour
    const travelTimeSeconds = await this.getAverageTravelTimeSecondsWithFallbacks({
      routeId: shuttle.routeId,
      fromStopId: currentStop.stopId,
      toStopId: nextStop.stopId,
    }, [
      {
        from: oneWeekAgo,
        to: new Date(oneWeekAgo.getTime() + (60 * 60 * 1000))
      },
      {
        from: oneDayAgo,
        to: new Date(oneDayAgo.getTime() + (60 * 60 * 1000))
      },
      {
        from: oneHourAgo,
        to: new Date(),
      }
    ]);

    if (travelTimeSeconds == undefined) return;

    // Time already spent since the last recorded arrival counts against the estimate
    const elapsedTimeMs = referenceCurrentTime.getTime() - originalStopArrival.timestamp.getTime();
    const predictedTimeSeconds = travelTimeSeconds - (elapsedTimeMs / 1000) + runningTravelTimeSeconds;

    await this.addOrUpdateEta({
      secondsRemaining: predictedTimeSeconds,
      shuttleId: shuttle.id,
      stopId: nextStop.stopId,
      systemId: nextStop.systemId,
      updatedTime: new Date(),
    });

    // Recurse to the following stop, carrying the accumulated travel time forward
    const nextStopWithNextNextStop = await this.shuttleRepository.getOrderedStopByRouteAndStopId(shuttle.routeId, nextStop.stopId);
    await this.updateCascadingEta(
      {
        shuttle,
        currentStop: nextStopWithNextNextStop,
        originalStopArrival,
        runningTravelTimeSeconds: runningTravelTimeSeconds + travelTimeSeconds,
      },
    );
  }

  private async handleShuttleWillArriveAtStop(
    shuttleArrival: ShuttleStopArrival,
  ) {
    const lastStopTimestamp = await this.shuttleRepository.getShuttleLastStopArrival(shuttleArrival.shuttleId);
    if (lastStopTimestamp) {
      // Ignore repeated triggers for the stop the shuttle last arrived at
      if (lastStopTimestamp.stopId === shuttleArrival.stopId) return;

      const shuttle = await this.shuttleRepository.getShuttleById(lastStopTimestamp.shuttleId);
      if (!shuttle) return;

      const routeId = shuttle.routeId;
      const fromStopId = lastStopTimestamp.stopId;
      const toStopId = shuttleArrival.stopId;

      const travelTimeSeconds = (shuttleArrival.timestamp.getTime() - lastStopTimestamp.timestamp.getTime()) / 1000;
      await this.addTravelTimeDataPoint({ routeId, fromStopId, toStopId, }, travelTimeSeconds, shuttleArrival.timestamp.getTime());
    }
  }

  public async addTravelTimeDataPoint(
    { routeId, fromStopId, toStopId }: ShuttleTravelTimeDataIdentifier,
    travelTimeSeconds: number,
    timestamp = Date.now(),
  ): Promise<void> {
    const historicalEtaTimeSeriesKey = this.createHistoricalEtaTimeSeriesKey(routeId, fromStopId, toStopId);

    try {
      await this.redisClient.sendCommand([
        'TS.ADD',
        historicalEtaTimeSeriesKey,
        timestamp.toString(),
        travelTimeSeconds.toString(),
        'LABELS',
        'routeId',
        routeId,
        'fromStopId',
        fromStopId,
        'toStopId',
        toStopId
      ]);
    } catch (error) {
      // If the add fails, fall back to creating the series explicitly and retrying
      await this.createHistoricalEtaTimeSeriesAndAddDataPoint(
        historicalEtaTimeSeriesKey,
        timestamp,
        travelTimeSeconds,
        routeId,
        fromStopId,
        toStopId
      );
    }
  }

  private async createHistoricalEtaTimeSeriesAndAddDataPoint(
    timeSeriesKey: string,
    timestamp: number,
    travelTimeSeconds: number,
    routeId: string,
    fromStopId: string,
    toStopId: string,
  ): Promise<void> {
    try {
      await this.redisClient.sendCommand([
        'TS.CREATE',
        timeSeriesKey,
        'RETENTION',
        '2678400000', // 31 days in milliseconds
        'LABELS',
        'routeId',
        routeId,
        'fromStopId',
        fromStopId,
        'toStopId',
        toStopId
      ]);
      await this.redisClient.sendCommand([
        'TS.ADD',
        timeSeriesKey,
        timestamp.toString(),
        travelTimeSeconds.toString()
      ]);
    } catch (createError) {
      // TS.CREATE may fail when the series already exists; retry the add on its own
      await this.redisClient.sendCommand([
        'TS.ADD',
        timeSeriesKey,
        timestamp.toString(),
        travelTimeSeconds.toString()
      ]);
    }
  }
}
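
/*
Illustrative sketch only, kept in a comment so it is not compiled with this module.
It shows how the three fallback windows used by updateCascadingEta (same hour last
week, same hour yesterday, past hour) could be built by a pure function that is easy
to test. The names DateWindow and buildFallbackWindows are hypothetical and do not
exist in this repository. As a simplifying assumption, the last window here ends at
the reference time, whereas the class above ends it at `new Date()`.

```typescript
// Hypothetical shape mirroring the { from, to } date filter arguments.
interface DateWindow {
  from: Date;
  to: Date;
}

const HOUR_MS = 60 * 60 * 1000;
const DAY_MS = 24 * HOUR_MS;
const WEEK_MS = 7 * DAY_MS;

// Builds the windows in priority order: the first one with data should win.
function buildFallbackWindows(reference: Date): DateWindow[] {
  const now = reference.getTime();
  return [
    // Same hour, one week earlier.
    { from: new Date(now - WEEK_MS), to: new Date(now - WEEK_MS + HOUR_MS) },
    // Same hour, one day earlier.
    { from: new Date(now - DAY_MS), to: new Date(now - DAY_MS + HOUR_MS) },
    // The hour leading up to the reference time (assumption, see note above).
    { from: new Date(now - HOUR_MS), to: new Date(now) },
  ];
}

const exampleWindows = buildFallbackWindows(new Date("2024-01-08T12:00:00Z"));
for (const w of exampleWindows) {
  console.log(`${w.from.toISOString()} -> ${w.to.toISOString()}`);
}
```
*/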