import EventEmitter from "node:events";
import { createClient } from 'redis';
import { ShuttleGetterSetterRepository } from "./ShuttleGetterSetterRepository";
import { IEta, IOrderedStop, IRoute, IShuttle, IStop, shuttleHasArrivedAtStop } from "../../entities/ShuttleRepositoryEntities";
import { REDIS_RECONNECT_INTERVAL } from "../../environment";
import {
  ShuttleRepositoryEvent,
  ShuttleRepositoryEventListener,
  ShuttleRepositoryEventName,
  ShuttleRepositoryEventPayloads,
} from "./ShuttleGetterRepository";

/** The stop a shuttle was last recorded at, and when that was recorded. */
export interface ShuttleStopArrival {
  stopId: string;
  timestamp: Date;
}

/** Identifies one historical travel-time series: a stop-to-stop leg on a route. */
export interface ShuttleTravelTimeDataIdentifier {
  routeId: string;
  fromStopId: string;
  toStopId: string;
}

/** Time window used when averaging historical travel times. */
export interface ShuttleTravelTimeDateFilterArguments {
  from: Date;
  to: Date;
}

/**
 * Redis-backed shuttle repository.
 *
 * Stops, routes, shuttles, ordered stops, ETAs and "last stop arrival"
 * records are stored as Redis hashes under `shuttle:*` keys. Historical
 * travel times are stored with RedisTimeSeries commands (`TS.ADD`,
 * `TS.CREATE`, `TS.RANGE`) under `shuttle:eta:historical:*` keys.
 *
 * ETA changes are announced through the inherited `EventEmitter`
 * (`ETA_UPDATED`, `ETA_REMOVED`, `ETA_DATA_CLEARED`).
 */
export class RedisShuttleRepository extends EventEmitter implements ShuttleGetterSetterRepository {
  /**
   * Prefix of the historical travel-time series keys. It lives inside the
   * `shuttle:eta:*` namespace, so wide ETA scans must skip keys that start
   * with it (they are time series, not hashes).
   */
  private static readonly HISTORICAL_ETA_KEY_PREFIX = 'shuttle:eta:historical:';

  protected redisClient;

  /**
   * @param redisClient Client to use. Defaults to one built from
   *   `process.env.REDIS_URL`, with TLS enabled only when `NODE_ENV` is
   *   `'production'`.
   */
  constructor(
    redisClient = createClient({
      url: process.env.REDIS_URL,
      socket: {
        tls: process.env.NODE_ENV === 'production',
        // NOTE(review): certificate verification is disabled here. Confirm
        // this is intentional for the production Redis endpoint.
        rejectUnauthorized: false,
        reconnectStrategy: REDIS_RECONNECT_INTERVAL,
      },
    }),
  ) {
    super();
    this.redisClient = redisClient;
    // Log client errors instead of letting an unhandled 'error' event throw.
    this.redisClient.on('error', (err) => {
      console.error(err.stack);
    });
  }

  /** Whether the underlying Redis client reports itself as ready. */
  get isReady() {
    return this.redisClient.isReady;
  }

  /** Opens the Redis connection. */
  public async connect() {
    await this.redisClient.connect();
  }

  /** Closes the Redis connection. */
  public async disconnect() {
    await this.redisClient.disconnect();
  }

  /** Issues `FLUSHALL`: removes every key on the server, not only `shuttle:*` keys. */
  public async clearAllData() {
    await this.redisClient.flushAll();
  }

  // EventEmitter override methods for type safety
  public override on<T extends ShuttleRepositoryEventName>(
    event: T,
    listener: ShuttleRepositoryEventListener<T>,
  ): this;
  public override on(event: string | symbol, listener: (...args: any[]) => void): this {
    return super.on(event, listener);
  }

  public override once<T extends ShuttleRepositoryEventName>(
    event: T,
    listener: ShuttleRepositoryEventListener<T>,
  ): this;
  public override once(event: string | symbol, listener: (...args: any[]) => void): this {
    return super.once(event, listener);
  }

  public override off<T extends ShuttleRepositoryEventName>(
    event: T,
    listener: ShuttleRepositoryEventListener<T>,
  ): this;
  public override off(event: string | symbol, listener: (...args: any[]) => void): this {
    return super.off(event, listener);
  }

  public override addListener<T extends ShuttleRepositoryEventName>(
    event: T,
    listener: ShuttleRepositoryEventListener<T>,
  ): this;
  public override addListener(event: string | symbol, listener: (...args: any[]) => void): this {
    return super.addListener(event, listener);
  }

  public override removeListener<T extends ShuttleRepositoryEventName>(
    event: T,
    listener: ShuttleRepositoryEventListener<T>,
  ): this;
  public override removeListener(event: string | symbol, listener: (...args: any[]) => void): this {
    return super.removeListener(event, listener);
  }

  public override emit<T extends ShuttleRepositoryEventName>(
    event: T,
    payload: ShuttleRepositoryEventPayloads[T],
  ): boolean;
  public override emit(event: string | symbol, ...args: any[]): boolean {
    return super.emit(event, ...args);
  }

  // Helper methods for Redis key generation
  private createStopKey = (stopId: string) => `shuttle:stop:${stopId}`;
  private createRouteKey = (routeId: string) => `shuttle:route:${routeId}`;
  private createShuttleKey = (shuttleId: string) => `shuttle:shuttle:${shuttleId}`;
  private createEtaKey = (shuttleId: string, stopId: string) => `shuttle:eta:${shuttleId}:${stopId}`;
  private createOrderedStopKey = (routeId: string, stopId: string) => `shuttle:orderedstop:${routeId}:${stopId}`;
  private createShuttleLastStopKey = (shuttleId: string) => `shuttle:laststop:${shuttleId}`;
  private createHistoricalEtaTimeSeriesKey = (routeId: string, fromStopId: string, toStopId: string) => {
    return `${RedisShuttleRepository.HISTORICAL_ETA_KEY_PREFIX}${routeId}:${fromStopId}:${toStopId}`;
  };

  // Helper methods for converting entities to Redis hashes
  // (Redis hash fields are strings, so numbers and dates are serialised.)
  private createRedisHashFromStop = (stop: IStop): Record<string, string> => ({
    id: stop.id,
    name: stop.name,
    systemId: stop.systemId,
    latitude: stop.coordinates.latitude.toString(),
    longitude: stop.coordinates.longitude.toString(),
    updatedTime: stop.updatedTime.toISOString(),
  });

  private createStopFromRedisData = (data: Record<string, string>): IStop => ({
    id: data.id,
    name: data.name,
    systemId: data.systemId,
    coordinates: {
      latitude: parseFloat(data.latitude),
      longitude: parseFloat(data.longitude),
    },
    updatedTime: new Date(data.updatedTime),
  });

  private createRedisHashFromRoute = (route: IRoute): Record<string, string> => ({
    id: route.id,
    name: route.name,
    color: route.color,
    systemId: route.systemId,
    polylineCoordinates: JSON.stringify(route.polylineCoordinates),
    updatedTime: route.updatedTime.toISOString(),
  });

  private createRouteFromRedisData = (data: Record<string, string>): IRoute => ({
    id: data.id,
    name: data.name,
    color: data.color,
    systemId: data.systemId,
    polylineCoordinates: JSON.parse(data.polylineCoordinates),
    updatedTime: new Date(data.updatedTime),
  });

  private createRedisHashFromShuttle = (shuttle: IShuttle): Record<string, string> => ({
    id: shuttle.id,
    name: shuttle.name,
    routeId: shuttle.routeId,
    systemId: shuttle.systemId,
    latitude: shuttle.coordinates.latitude.toString(),
    longitude: shuttle.coordinates.longitude.toString(),
    orientationInDegrees: shuttle.orientationInDegrees.toString(),
    updatedTime: shuttle.updatedTime.toISOString(),
  });

  private createShuttleFromRedisData = (data: Record<string, string>): IShuttle => ({
    id: data.id,
    name: data.name,
    routeId: data.routeId,
    systemId: data.systemId,
    coordinates: {
      latitude: parseFloat(data.latitude),
      longitude: parseFloat(data.longitude),
    },
    orientationInDegrees: parseFloat(data.orientationInDegrees),
    updatedTime: new Date(data.updatedTime),
  });

  private createRedisHashFromEta = (eta: IEta): Record<string, string> => ({
    secondsRemaining: eta.secondsRemaining.toString(),
    shuttleId: eta.shuttleId,
    stopId: eta.stopId,
    systemId: eta.systemId,
    updatedTime: eta.updatedTime.toISOString(),
  });

  private createEtaFromRedisData = (data: Record<string, string>): IEta => ({
    secondsRemaining: parseFloat(data.secondsRemaining),
    shuttleId: data.shuttleId,
    stopId: data.stopId,
    systemId: data.systemId,
    updatedTime: new Date(data.updatedTime),
  });

  private createRedisHashFromOrderedStop = (orderedStop: IOrderedStop): Record<string, string> => {
    const hash: Record<string, string> = {
      routeId: orderedStop.routeId,
      stopId: orderedStop.stopId,
      position: orderedStop.position.toString(),
      systemId: orderedStop.systemId,
      updatedTime: orderedStop.updatedTime.toISOString(),
    };

    // Neighbouring stops are stored by ID only; the fields are omitted when absent.
    if (orderedStop.nextStop) {
      hash.nextStopRouteId = orderedStop.nextStop.routeId;
      hash.nextStopStopId = orderedStop.nextStop.stopId;
    }
    if (orderedStop.previousStop) {
      hash.previousStopRouteId = orderedStop.previousStop.routeId;
      hash.previousStopStopId = orderedStop.previousStop.stopId;
    }

    return hash;
  };

  private createOrderedStopFromRedisData = (data: Record<string, string>): IOrderedStop => {
    const orderedStop: IOrderedStop = {
      routeId: data.routeId,
      stopId: data.stopId,
      position: parseInt(data.position, 10),
      systemId: data.systemId,
      updatedTime: new Date(data.updatedTime),
    };

    // Note: We only store the IDs of next/previous stops, not full objects
    // to avoid circular references in Redis. These would need to be
    // resolved separately if needed. Only `routeId` and `stopId` of the
    // reconstructed neighbours are real; `position` and `updatedTime` are
    // placeholders.
    if (data.nextStopRouteId && data.nextStopStopId) {
      orderedStop.nextStop = {
        routeId: data.nextStopRouteId,
        stopId: data.nextStopStopId,
        position: 0, // placeholder
        systemId: data.systemId,
        updatedTime: new Date(),
      };
    }
    if (data.previousStopRouteId && data.previousStopStopId) {
      orderedStop.previousStop = {
        routeId: data.previousStopRouteId,
        stopId: data.previousStopStopId,
        position: 0, // placeholder
        systemId: data.systemId,
        updatedTime: new Date(),
      };
    }

    return orderedStop;
  };

  /**
   * Reads each key as a hash, one after another, and converts the non-empty
   * ones. An empty hash means the key no longer exists and is skipped.
   *
   * @param keys Hash keys to read.
   * @param convert Builds the entity from the raw hash fields.
   * @param matches Optional filter on the raw hash fields; defaults to accepting all.
   */
  private async loadEntitiesFromHashes<T>(
    keys: string[],
    convert: (data: Record<string, string>) => T,
    matches: (data: Record<string, string>) => boolean = () => true,
  ): Promise<T[]> {
    const entities: T[] = [];
    for (const key of keys) {
      const data = await this.redisClient.hGetAll(key);
      if (Object.keys(data).length > 0 && matches(data)) {
        entities.push(convert(data));
      }
    }
    return entities;
  }

  /**
   * Lists the keys of all current ETA hashes.
   *
   * `shuttle:eta:*` also matches the historical time-series keys
   * (`shuttle:eta:historical:*`). Those are not hashes, so reading them with
   * `HGETALL` fails, and deleting them would discard the travel-time
   * history. They are therefore excluded here.
   *
   * NOTE(review): a shuttle whose ID is literally `historical` would also be
   * excluded; the key scheme cannot tell the two apart.
   */
  private async getCurrentEtaKeys(): Promise<string[]> {
    const keys = await this.redisClient.keys('shuttle:eta:*');
    return keys.filter(
      (key) => !key.startsWith(RedisShuttleRepository.HISTORICAL_ETA_KEY_PREFIX),
    );
  }

  // Getter methods

  /** Returns every stored stop. */
  public async getStops(): Promise<IStop[]> {
    const keys = await this.redisClient.keys('shuttle:stop:*');
    return this.loadEntitiesFromHashes(keys, this.createStopFromRedisData);
  }

  /** Returns the stop with the given ID, or `null` when none is stored. */
  public async getStopById(stopId: string): Promise<IStop | null> {
    const data = await this.redisClient.hGetAll(this.createStopKey(stopId));
    if (Object.keys(data).length === 0) {
      return null;
    }
    return this.createStopFromRedisData(data);
  }

  /** Returns every stored route. */
  public async getRoutes(): Promise<IRoute[]> {
    const keys = await this.redisClient.keys('shuttle:route:*');
    return this.loadEntitiesFromHashes(keys, this.createRouteFromRedisData);
  }

  /** Returns the route with the given ID, or `null` when none is stored. */
  public async getRouteById(routeId: string): Promise<IRoute | null> {
    const data = await this.redisClient.hGetAll(this.createRouteKey(routeId));
    if (Object.keys(data).length === 0) {
      return null;
    }
    return this.createRouteFromRedisData(data);
  }

  /** Returns every stored shuttle. */
  public async getShuttles(): Promise<IShuttle[]> {
    const keys = await this.redisClient.keys('shuttle:shuttle:*');
    return this.loadEntitiesFromHashes(keys, this.createShuttleFromRedisData);
  }

  /** Returns the stored shuttles whose `routeId` equals the given route ID. */
  public async getShuttlesByRouteId(routeId: string): Promise<IShuttle[]> {
    const allShuttles = await this.getShuttles();
    return allShuttles.filter((shuttle) => shuttle.routeId === routeId);
  }

  /** Returns the shuttle with the given ID, or `null` when none is stored. */
  public async getShuttleById(shuttleId: string): Promise<IShuttle | null> {
    const data = await this.redisClient.hGetAll(this.createShuttleKey(shuttleId));
    if (Object.keys(data).length === 0) {
      return null;
    }
    return this.createShuttleFromRedisData(data);
  }

  /** Returns every stored ETA for the given shuttle. */
  public async getEtasForShuttleId(shuttleId: string): Promise<IEta[]> {
    const keys = await this.redisClient.keys(`shuttle:eta:${shuttleId}:*`);
    return this.loadEntitiesFromHashes(keys, this.createEtaFromRedisData);
  }

  /** Returns every stored ETA whose `stopId` equals the given stop ID. */
  public async getEtasForStopId(stopId: string): Promise<IEta[]> {
    const keys = await this.getCurrentEtaKeys();
    return this.loadEntitiesFromHashes(
      keys,
      this.createEtaFromRedisData,
      (data) => data.stopId === stopId,
    );
  }

  /** Returns the ETA of the given shuttle at the given stop, or `null` when none is stored. */
  public async getEtaForShuttleAndStopId(shuttleId: string, stopId: string): Promise<IEta | null> {
    const data = await this.redisClient.hGetAll(this.createEtaKey(shuttleId, stopId));
    if (Object.keys(data).length === 0) {
      return null;
    }
    return this.createEtaFromRedisData(data);
  }

  /** Returns the ordered stop for the given route/stop pair, or `null` when none is stored. */
  public async getOrderedStopByRouteAndStopId(routeId: string, stopId: string): Promise<IOrderedStop | null> {
    const data = await this.redisClient.hGetAll(this.createOrderedStopKey(routeId, stopId));
    if (Object.keys(data).length === 0) {
      return null;
    }
    return this.createOrderedStopFromRedisData(data);
  }

  /** Returns the ordered stops, across all routes, whose `stopId` equals the given stop ID. */
  public async getOrderedStopsByStopId(stopId: string): Promise<IOrderedStop[]> {
    const keys = await this.redisClient.keys('shuttle:orderedstop:*');
    return this.loadEntitiesFromHashes(
      keys,
      this.createOrderedStopFromRedisData,
      (data) => data.stopId === stopId,
    );
  }

  /** Returns the ordered stops stored for the given route (in key-scan order, not sorted). */
  public async getOrderedStopsByRouteId(routeId: string): Promise<IOrderedStop[]> {
    const keys = await this.redisClient.keys(`shuttle:orderedstop:${routeId}:*`);
    return this.loadEntitiesFromHashes(keys, this.createOrderedStopFromRedisData);
  }

  // Setter/update methods

  /** Stores the route, overwriting the fields of any existing route with the same ID. */
  public async addOrUpdateRoute(route: IRoute): Promise<void> {
    const key = this.createRouteKey(route.id);
    await this.redisClient.hSet(key, this.createRedisHashFromRoute(route));
  }

  /**
   * Stores the shuttle, then records historical travel time if it has
   * arrived at a stop, then refreshes its ETA to the next stop from
   * historical data.
   *
   * @param shuttle Shuttle to store.
   * @param travelTimeTimestamp Epoch milliseconds used as the arrival time
   *   and as the timestamp of the recorded travel-time data point.
   * @param referenceCurrentTime "Now" used when predicting the ETA.
   */
  public async addOrUpdateShuttle(
    shuttle: IShuttle,
    travelTimeTimestamp = Date.now(),
    referenceCurrentTime = new Date(),
  ): Promise<void> {
    const key = this.createShuttleKey(shuttle.id);
    await this.redisClient.hSet(key, this.createRedisHashFromShuttle(shuttle));

    await this.updateHistoricalEtasForShuttle(shuttle, travelTimeTimestamp);
    await this.updateEtasBasedOnHistoricalData(shuttle, referenceCurrentTime);
  }

  /**
   * Predicts the ETA to the stop after the shuttle's last recorded stop.
   *
   * The prediction is the average travel time for that leg during the
   * one-hour window starting exactly one week before `referenceCurrentTime`,
   * minus the time elapsed since the last recorded arrival. Nothing is
   * written when the last arrival, the next stop or the average is missing.
   */
  private async updateEtasBasedOnHistoricalData(
    shuttle: IShuttle,
    referenceCurrentTime: Date = new Date(),
  ) {
    const oneWeekMs = 60 * 60 * 24 * 7 * 1000;
    const oneHourMs = 60 * 60 * 1000;
    const oneWeekAgo = new Date(referenceCurrentTime.getTime() - oneWeekMs);

    const lastStopArrival = await this.getShuttleLastStopArrival(shuttle.id);
    if (lastStopArrival == undefined) return;

    const lastOrderedStop = await this.getOrderedStopByRouteAndStopId(shuttle.routeId, lastStopArrival.stopId);
    const nextStop = lastOrderedStop?.nextStop;
    if (nextStop == null) return;

    const travelTimeSeconds = await this.getAverageTravelTimeSeconds({
      routeId: shuttle.routeId,
      fromStopId: lastStopArrival.stopId,
      toStopId: nextStop.stopId,
    }, {
      from: oneWeekAgo,
      to: new Date(oneWeekAgo.getTime() + oneHourMs),
    });
    if (travelTimeSeconds == undefined) return;

    const elapsedTimeMs = referenceCurrentTime.getTime() - lastStopArrival.timestamp.getTime();
    // NOTE(review): this goes negative once the shuttle has taken longer
    // than the historical average; confirm consumers handle negative values.
    const predictedTimeSeconds = travelTimeSeconds - (elapsedTimeMs / 1000);

    await this.addOrUpdateEta({
      secondsRemaining: predictedTimeSeconds,
      shuttleId: shuttle.id,
      stopId: nextStop.stopId,
      systemId: nextStop.systemId,
      updatedTime: new Date(),
    });
  }

  /**
   * If the shuttle is currently at a stop, records the travel time from its
   * previously recorded stop (when there is one) and then stores the current
   * stop as its last stop arrival.
   *
   * NOTE(review): while a shuttle stays at the same stop across calls, each
   * call records a data point from that stop to itself and moves the stored
   * arrival time forward. Confirm this is intended.
   */
  private async updateHistoricalEtasForShuttle(
    shuttle: IShuttle,
    travelTimeTimestamp = Date.now(),
  ) {
    const arrivedStop = await this.getArrivedStopIfExists(shuttle);
    if (arrivedStop == undefined) return;

    const lastStopArrival = await this.getShuttleLastStopArrival(shuttle.id);
    if (lastStopArrival != undefined) {
      const travelTimeSeconds = (travelTimeTimestamp - lastStopArrival.timestamp.getTime()) / 1000;
      await this.addTravelTimeDataPoint({
        routeId: shuttle.routeId,
        fromStopId: lastStopArrival.stopId,
        toStopId: arrivedStop.id,
      }, travelTimeSeconds, travelTimeTimestamp);
    }

    await this.updateShuttleLastStopArrival(shuttle.id, {
      stopId: arrivedStop.id,
      timestamp: new Date(travelTimeTimestamp),
    });
  }

  /**
   * Averages the recorded travel times of one leg over `[from, to]`.
   *
   * Uses a single `TS.RANGE ... AGGREGATION AVG` bucket as wide as the whole
   * window and returns the value of the first bucket.
   *
   * @returns The average in seconds, or `undefined` (after logging a
   *   warning) when there is no data or the command fails.
   */
  public async getAverageTravelTimeSeconds(
    { routeId, fromStopId, toStopId }: ShuttleTravelTimeDataIdentifier,
    { from, to }: ShuttleTravelTimeDateFilterArguments,
  ): Promise<number | undefined> {
    const timeSeriesKey = this.createHistoricalEtaTimeSeriesKey(routeId, fromStopId, toStopId);
    const fromTimestamp = from.getTime();
    const toTimestamp = to.getTime();
    const intervalMs = toTimestamp - fromTimestamp;

    let failureMessage: string;
    try {
      const aggregationResult = await this.redisClient.sendCommand([
        'TS.RANGE',
        timeSeriesKey,
        fromTimestamp.toString(),
        toTimestamp.toString(),
        'AGGREGATION',
        'AVG',
        intervalMs.toString(),
      ]) as [string, string][];

      if (aggregationResult && aggregationResult.length > 0) {
        const [, averageValue] = aggregationResult[0];
        return parseFloat(averageValue);
      }
      failureMessage = `No historical data found for route ${routeId} from stop ${fromStopId} to stop ${toStopId}`;
    } catch (error) {
      failureMessage = error instanceof Error ? error.message : String(error);
    }

    console.warn(`Failed to get average travel time: ${failureMessage}`);
    return undefined;
  }

  /**
   * Appends one travel-time sample to the leg's time series. If the first
   * `TS.ADD` fails, falls back to creating the series explicitly and adding
   * the sample again.
   */
  private async addTravelTimeDataPoint(
    { routeId, fromStopId, toStopId }: ShuttleTravelTimeDataIdentifier,
    travelTimeSeconds: number,
    timestamp = Date.now(),
  ): Promise<void> {
    const historicalEtaTimeSeriesKey = this.createHistoricalEtaTimeSeriesKey(routeId, fromStopId, toStopId);

    try {
      await this.redisClient.sendCommand([
        'TS.ADD',
        historicalEtaTimeSeriesKey,
        timestamp.toString(),
        travelTimeSeconds.toString(),
        'LABELS',
        'routeId',
        routeId,
        'fromStopId',
        fromStopId,
        'toStopId',
        toStopId,
      ]);
    } catch {
      await this.createHistoricalEtaTimeSeriesAndAddDataPoint(
        historicalEtaTimeSeriesKey,
        timestamp,
        travelTimeSeconds,
        routeId,
        fromStopId,
        toStopId,
      );
    }
  }

  /**
   * Creates the time series with a retention period and labels, then adds
   * the sample. If either step fails, retries a plain `TS.ADD` once; an
   * error from that retry propagates to the caller.
   */
  private async createHistoricalEtaTimeSeriesAndAddDataPoint(
    timeSeriesKey: string,
    timestamp: number,
    travelTimeSeconds: number,
    routeId: string,
    fromStopId: string,
    toStopId: string,
  ): Promise<void> {
    const addDataPointCommand = [
      'TS.ADD',
      timeSeriesKey,
      timestamp.toString(),
      travelTimeSeconds.toString(),
    ];

    try {
      await this.redisClient.sendCommand([
        'TS.CREATE',
        timeSeriesKey,
        'RETENTION',
        '2678400000', // one month (31 days) in milliseconds
        'LABELS',
        'routeId',
        routeId,
        'fromStopId',
        fromStopId,
        'toStopId',
        toStopId,
      ]);
      await this.redisClient.sendCommand(addDataPointCommand);
    } catch {
      await this.redisClient.sendCommand(addDataPointCommand);
    }
  }

  /**
   * Returns the first stop on the shuttle's route for which
   * `shuttleHasArrivedAtStop(shuttle, stop, delta)` is true, or `undefined`
   * when there is none.
   *
   * @param delta Tolerance forwarded to `shuttleHasArrivedAtStop`
   *   (presumably in coordinate degrees; verify against that helper).
   */
  public async getArrivedStopIfExists(
    shuttle: IShuttle,
    delta = 0.001,
  ): Promise<IStop | undefined> {
    const orderedStops = await this.getOrderedStopsByRouteId(shuttle.routeId);

    for (const orderedStop of orderedStops) {
      const stop = await this.getStopById(orderedStop.stopId);
      if (stop != null && shuttleHasArrivedAtStop(shuttle, stop, delta)) {
        return stop;
      }
    }
    return undefined;
  }

  /** Returns the last recorded stop arrival of the shuttle, or `undefined` when none is stored. */
  public async getShuttleLastStopArrival(shuttleId: string): Promise<ShuttleStopArrival | undefined> {
    const data = await this.redisClient.hGetAll(this.createShuttleLastStopKey(shuttleId));
    if (Object.keys(data).length === 0) {
      return undefined;
    }
    return {
      stopId: data.stopId,
      timestamp: new Date(data.timestamp),
    };
  }

  /** Stores the given arrival as the shuttle's last stop arrival. */
  public async updateShuttleLastStopArrival(shuttleId: string, lastStopArrival: ShuttleStopArrival) {
    const key = this.createShuttleLastStopKey(shuttleId);
    await this.redisClient.hSet(key, {
      stopId: lastStopArrival.stopId,
      timestamp: lastStopArrival.timestamp.toISOString(),
    });
  }

  /** Stores the stop, overwriting the fields of any existing stop with the same ID. */
  public async addOrUpdateStop(stop: IStop): Promise<void> {
    const key = this.createStopKey(stop.id);
    await this.redisClient.hSet(key, this.createRedisHashFromStop(stop));
  }

  /** Stores the ordered stop under its route/stop key. */
  public async addOrUpdateOrderedStop(orderedStop: IOrderedStop): Promise<void> {
    const key = this.createOrderedStopKey(orderedStop.routeId, orderedStop.stopId);
    await this.redisClient.hSet(key, this.createRedisHashFromOrderedStop(orderedStop));
  }

  /** Stores the ETA and emits `ETA_UPDATED` with it. */
  public async addOrUpdateEta(eta: IEta): Promise<void> {
    const key = this.createEtaKey(eta.shuttleId, eta.stopId);
    await this.redisClient.hSet(key, this.createRedisHashFromEta(eta));
    this.emit(ShuttleRepositoryEvent.ETA_UPDATED, eta);
  }

  // Remove methods

  /** Deletes the route if stored and returns it; returns `null` otherwise. */
  public async removeRouteIfExists(routeId: string): Promise<IRoute | null> {
    const route = await this.getRouteById(routeId);
    if (!route) {
      return null;
    }
    await this.redisClient.del(this.createRouteKey(routeId));
    return route;
  }

  /** Deletes the shuttle if stored and returns it; returns `null` otherwise. */
  public async removeShuttleIfExists(shuttleId: string): Promise<IShuttle | null> {
    const shuttle = await this.getShuttleById(shuttleId);
    if (!shuttle) {
      return null;
    }
    await this.redisClient.del(this.createShuttleKey(shuttleId));
    return shuttle;
  }

  /** Deletes the stop if stored and returns it; returns `null` otherwise. */
  public async removeStopIfExists(stopId: string): Promise<IStop | null> {
    const stop = await this.getStopById(stopId);
    if (!stop) {
      return null;
    }
    await this.redisClient.del(this.createStopKey(stopId));
    return stop;
  }

  /**
   * Deletes the ordered stop if stored and returns it; returns `null`
   * otherwise. Note the parameter order: stop ID first, then route ID.
   */
  public async removeOrderedStopIfExists(stopId: string, routeId: string): Promise<IOrderedStop | null> {
    const orderedStop = await this.getOrderedStopByRouteAndStopId(routeId, stopId);
    if (!orderedStop) {
      return null;
    }
    await this.redisClient.del(this.createOrderedStopKey(routeId, stopId));
    return orderedStop;
  }

  /**
   * Deletes the ETA if stored, emits `ETA_REMOVED` with it and returns it;
   * returns `null` (and emits nothing) otherwise.
   */
  public async removeEtaIfExists(shuttleId: string, stopId: string): Promise<IEta | null> {
    const eta = await this.getEtaForShuttleAndStopId(shuttleId, stopId);
    if (!eta) {
      return null;
    }
    await this.redisClient.del(this.createEtaKey(shuttleId, stopId));
    this.emit(ShuttleRepositoryEvent.ETA_REMOVED, eta);
    return eta;
  }

  // Clear methods

  /** Deletes every stored shuttle. */
  public async clearShuttleData(): Promise<void> {
    const keys = await this.redisClient.keys('shuttle:shuttle:*');
    if (keys.length > 0) {
      await this.redisClient.del(keys);
    }
  }

  /**
   * Deletes every current ETA and emits `ETA_DATA_CLEARED` with the ETAs
   * that were stored beforehand. The event is emitted even when there was
   * nothing to delete. Historical travel-time series are left untouched.
   */
  public async clearEtaData(): Promise<void> {
    const removedEtas = await this.getAllEtas();
    const keys = await this.getCurrentEtaKeys();
    if (keys.length > 0) {
      await this.redisClient.del(keys);
    }
    this.emit(ShuttleRepositoryEvent.ETA_DATA_CLEARED, removedEtas);
  }

  /** Deletes every stored ordered stop. */
  public async clearOrderedStopData(): Promise<void> {
    const keys = await this.redisClient.keys('shuttle:orderedstop:*');
    if (keys.length > 0) {
      await this.redisClient.del(keys);
    }
  }

  /** Deletes every stored route. */
  public async clearRouteData(): Promise<void> {
    const keys = await this.redisClient.keys('shuttle:route:*');
    if (keys.length > 0) {
      await this.redisClient.del(keys);
    }
  }

  /** Deletes every stored stop. */
  public async clearStopData(): Promise<void> {
    const keys = await this.redisClient.keys('shuttle:stop:*');
    if (keys.length > 0) {
      await this.redisClient.del(keys);
    }
  }

  // Helper method to get all ETAs for the clearEtaData event
  private async getAllEtas(): Promise<IEta[]> {
    const keys = await this.getCurrentEtaKeys();
    return this.loadEntitiesFromHashes(keys, this.createEtaFromRedisData);
  }
}