diff --git a/src/repositories/shuttle/eta/RedisSelfUpdatingETARepository.ts b/src/repositories/shuttle/eta/RedisSelfUpdatingETARepository.ts index 971c21e..322c11a 100644 --- a/src/repositories/shuttle/eta/RedisSelfUpdatingETARepository.ts +++ b/src/repositories/shuttle/eta/RedisSelfUpdatingETARepository.ts @@ -19,14 +19,52 @@ export class RedisSelfUpdatingETARepository extends BaseRedisETARepository imple private referenceTime: Date | null = null, ) { super(redisClient); + + this.setReferenceTime = this.setReferenceTime.bind(this); + this.getAverageTravelTimeSeconds = this.getAverageTravelTimeSeconds.bind(this); + this.startListeningForUpdates = this.startListeningForUpdates.bind(this); + this.handleShuttleWillArriveAtStop = this.handleShuttleWillArriveAtStop.bind(this); + this.handleShuttleUpdate = this.handleShuttleUpdate.bind(this); + } + + private createHistoricalEtaTimeSeriesKey = (routeId: string, fromStopId: string, toStopId: string) => { + return `shuttle:eta:historical:${routeId}:${fromStopId}:${toStopId}`; } setReferenceTime(referenceTime: Date) { this.referenceTime = referenceTime; } - getAverageTravelTimeSeconds(identifier: ShuttleTravelTimeDataIdentifier, dateFilter: ShuttleTravelTimeDateFilterArguments): Promise { - throw new Error("Method not implemented."); + public async getAverageTravelTimeSeconds( + { routeId, fromStopId, toStopId }: ShuttleTravelTimeDataIdentifier, + { from, to }: ShuttleTravelTimeDateFilterArguments + ): Promise { + const timeSeriesKey = this.createHistoricalEtaTimeSeriesKey(routeId, fromStopId, toStopId); + const fromTimestamp = from.getTime(); + const toTimestamp = to.getTime(); + const intervalMs = toTimestamp - fromTimestamp + 1; + + try { + const aggregationResult = await this.redisClient.sendCommand([ + 'TS.RANGE', + timeSeriesKey, + fromTimestamp.toString(), + toTimestamp.toString(), + 'AGGREGATION', + 'AVG', + intervalMs.toString() + ]) as [string, string][]; + + if (aggregationResult && aggregationResult.length > 0) { + const [, averageValue] = aggregationResult[0]; + return parseFloat(averageValue); + } + + return; + } catch (error) { + console.warn(`Failed to get average travel time: ${error instanceof Error ? error.message : String(error)}`); + return; + } } startListeningForUpdates() {