import { SelfUpdatingETARepository } from "./SelfUpdatingETARepository";
import { BaseRedisETARepository } from "./BaseRedisETARepository";
import { createClient, RedisClientType } from "redis";
import {
  ShuttleGetterRepository,
  ShuttleRepositoryEvent,
  ShuttleStopArrival,
  ShuttleTravelTimeDataIdentifier,
  ShuttleTravelTimeDateFilterArguments,
} from "../ShuttleGetterRepository";
import { REDIS_RECONNECT_INTERVAL } from "../../../environment";
import { IShuttle } from "../../../entities/ShuttleRepositoryEntities";

const ONE_HOUR_MS = 60 * 60 * 1000;
const ONE_WEEK_MS = ONE_HOUR_MS * 24 * 7;

export class RedisSelfUpdatingETARepository extends BaseRedisETARepository implements SelfUpdatingETARepository {
  constructor(
    readonly shuttleRepository: ShuttleGetterRepository,
    redisClient: RedisClientType = createClient({
      url: process.env.REDIS_URL,
      socket: {
        tls: process.env.NODE_ENV === 'production',
        rejectUnauthorized: false,
        reconnectStrategy: REDIS_RECONNECT_INTERVAL,
      },
    }),
    // When set, used in place of the current time (e.g. for deterministic tests).
    private referenceTime: Date | null = null,
  ) {
    super(redisClient);
  }

  setReferenceTime(referenceTime: Date) {
    this.referenceTime = referenceTime;
  }

  getAverageTravelTimeSeconds(
    identifier: ShuttleTravelTimeDataIdentifier,
    dateFilter: ShuttleTravelTimeDateFilterArguments,
  ): Promise<number | undefined> {
    throw new Error("Method not implemented.");
  }

  startListeningForUpdates() {
    this.shuttleRepository.addListener(ShuttleRepositoryEvent.SHUTTLE_UPDATED, this.handleShuttleUpdate);
    this.shuttleRepository.addListener(ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP, this.handleShuttleWillArriveAtStop);
  }

  stopListeningForUpdates() {
    this.shuttleRepository.removeListener(ShuttleRepositoryEvent.SHUTTLE_UPDATED, this.handleShuttleUpdate);
    this.shuttleRepository.removeListener(ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP, this.handleShuttleWillArriveAtStop);
  }

  // The handlers are arrow-function properties so that `this` stays bound to
  // this repository when they are passed to addListener/removeListener, and so
  // that both calls receive the same function reference.
  private handleShuttleUpdate = (shuttle: IShuttle) => {
    // TODO: handle shuttle update
  };

  private handleShuttleWillArriveAtStop = async (
    shuttleArrival: ShuttleStopArrival,
  ) => {
    const lastStop = await this.shuttleRepository.getShuttleLastStopArrival(shuttleArrival.shuttleId);
    if (lastStop == undefined) return;

    const shuttle = await this.shuttleRepository.getShuttleById(shuttleArrival.shuttleId);
    if (shuttle == null) return;

    const referenceCurrentTime = this.referenceTime ?? new Date();
    const oneWeekAgo = new Date(referenceCurrentTime.getTime() - ONE_WEEK_MS);

    const lastOrderedStop = await this.shuttleRepository.getOrderedStopByRouteAndStopId(shuttle.routeId, lastStop.stopId);
    const nextStop = lastOrderedStop?.nextStop;
    if (nextStop == null) return;

    // Average travel time over the one-hour window starting exactly one week ago.
    const travelTimeSeconds = await this.getAverageTravelTimeSeconds({
      routeId: shuttle.routeId,
      fromStopId: lastStop.stopId,
      toStopId: nextStop.stopId,
    }, {
      from: oneWeekAgo,
      to: new Date(oneWeekAgo.getTime() + ONE_HOUR_MS),
    });
    if (travelTimeSeconds == undefined) return;

    // Remaining time = historical average minus the time already elapsed since the last stop.
    const elapsedTimeMs = referenceCurrentTime.getTime() - lastStop.timestamp.getTime();
    const predictedTimeSeconds = travelTimeSeconds - (elapsedTimeMs / 1000);

    await this.addOrUpdateEta({
      secondsRemaining: predictedTimeSeconds,
      shuttleId: shuttle.id,
      stopId: nextStop.stopId,
      systemId: nextStop.systemId,
      updatedTime: new Date(),
    });
  };
}