import { ShuttleGetterSetterRepository } from "./ShuttleGetterSetterRepository";
import { IEta, IOrderedStop, IRoute, IShuttle, IStop, shuttleHasArrivedAtStop } from "../../entities/ShuttleRepositoryEntities";
import {
  ShuttleRepositoryEvent,
  ShuttleRepositoryEventListener,
  ShuttleRepositoryEventName,
  ShuttleRepositoryEventPayloads,
  ShuttleStopArrival,
  ShuttleTravelTimeDataIdentifier,
  ShuttleTravelTimeDateFilterArguments
} from "./ShuttleGetterRepository";
import { BaseRedisRepository } from "../BaseRedisRepository";
import { RedisClientType } from "redis";
import createRedisClientForRepository from "../../helpers/createRedisClientForRepository";

/**
 * Redis-backed implementation of {@link ShuttleGetterSetterRepository}.
 *
 * Stops, routes, shuttles and ordered stops are each stored as one Redis hash
 * under a `shuttle:<kind>:<id>` key. The last stop arrival of every shuttle is
 * stored as a hash under `shuttle:laststop:<shuttleId>`, and the IDs of the
 * shuttles currently at a stop are kept in the `shuttle:atstop` set.
 *
 * NOTE(review): the collection getters and the clear methods use the Redis
 * `KEYS` command, which walks the whole keyspace. Consider `SCAN` if the
 * database is shared with other large datasets.
 */
export class RedisShuttleRepository extends BaseRedisRepository implements ShuttleGetterSetterRepository {
  /**
   * @param redisClient Client used for every Redis command; defaults to a
   *   client built by `createRedisClientForRepository()`.
   * @param shuttleStopArrivalDegreeDelta Delta passed to
   *   `shuttleHasArrivedAtStop` when deciding whether a shuttle is at a stop.
   * @param shuttleStopNearbyDegreeDelta Stored on the instance; not read
   *   anywhere in this class.
   */
  constructor(
    redisClient: RedisClientType = createRedisClientForRepository(),
    readonly shuttleStopArrivalDegreeDelta: number = 0.001,
    readonly shuttleStopNearbyDegreeDelta: number = 0.003,
  ) {
    super(redisClient);
  }

  /** Whether the underlying Redis client reports itself as ready. */
  get isReady() {
    return this.redisClient.isReady;
  }

  /** Opens the connection of the underlying Redis client. */
  public async connect() {
    await this.redisClient.connect();
  }

  /** Closes the connection of the underlying Redis client. */
  public async disconnect() {
    await this.redisClient.disconnect();
  }

  // EventEmitter override methods for type safety
  public override on<T extends ShuttleRepositoryEventName>(
    event: T,
    listener: ShuttleRepositoryEventListener<T>,
  ): this;
  public override on(event: string | symbol, listener: (...args: any[]) => void): this {
    return super.on(event, listener);
  }

  public override once<T extends ShuttleRepositoryEventName>(
    event: T,
    listener: ShuttleRepositoryEventListener<T>,
  ): this;
  public override once(event: string | symbol, listener: (...args: any[]) => void): this {
    return super.once(event, listener);
  }

  public override off<T extends ShuttleRepositoryEventName>(
    event: T,
    listener: ShuttleRepositoryEventListener<T>,
  ): this;
  public override off(event: string | symbol, listener: (...args: any[]) => void): this {
    return super.off(event, listener);
  }

  public override addListener<T extends ShuttleRepositoryEventName>(
    event: T,
    listener: ShuttleRepositoryEventListener<T>,
  ): this;
  public override addListener(event: string | symbol, listener: (...args: any[]) => void): this {
    return super.addListener(event, listener);
  }

  public override removeListener<T extends ShuttleRepositoryEventName>(
    event: T,
    listener: ShuttleRepositoryEventListener<T>,
  ): this;
  public override removeListener(event: string | symbol, listener: (...args: any[]) => void): this {
    return super.removeListener(event, listener);
  }

  public override emit<T extends ShuttleRepositoryEventName>(
    event: T,
    payload: ShuttleRepositoryEventPayloads[T],
  ): boolean;
  public override emit(event: string | symbol, ...args: any[]): boolean {
    return super.emit(event, ...args);
  }

  // Helper methods for Redis key generation
  private createStopKey = (stopId: string) => `shuttle:stop:${stopId}`;
  private createRouteKey = (routeId: string) => `shuttle:route:${routeId}`;
  private createShuttleKey = (shuttleId: string) => `shuttle:shuttle:${shuttleId}`;
  private createOrderedStopKey = (routeId: string, stopId: string) => `shuttle:orderedstop:${routeId}:${stopId}`;
  private createShuttleLastStopKey = (shuttleId: string) => `shuttle:laststop:${shuttleId}`;
  private createHistoricalEtaTimeSeriesKey = (routeId: string, fromStopId: string, toStopId: string) => {
    return `shuttle:eta:historical:${routeId}:${fromStopId}:${toStopId}`;
  }

  /**
   * Represents a set storing the shuttles that are currently at a stop.
   */
  private readonly shuttleIsAtStopKey = "shuttle:atstop";

  // Helper methods for bulk key access

  /**
   * Reads every hash whose key matches `pattern` and drops the empty ones
   * (an empty hash means the key vanished between `KEYS` and `HGETALL`).
   * The reads are issued together; results keep the order returned by `KEYS`.
   */
  private async fetchShuttleHashesMatching(pattern: string): Promise<Record<string, string>[]> {
    const keys = await this.redisClient.keys(pattern);
    const hashes = await Promise.all(keys.map((key) => this.redisClient.hGetAll(key)));
    return hashes.filter((hash) => Object.keys(hash).length > 0);
  }

  /** Deletes every key matching `pattern`; does nothing when none match. */
  private async deleteShuttleKeysMatching(pattern: string): Promise<void> {
    const keys = await this.redisClient.keys(pattern);
    if (keys.length > 0) {
      await this.redisClient.del(keys);
    }
  }

  // Helper methods for converting entities to Redis hashes
  // (Redis hash values are strings, so numbers and dates are serialized.)
  private createRedisHashFromStop = (stop: IStop): Record<string, string> => ({
    id: stop.id,
    name: stop.name,
    systemId: stop.systemId,
    latitude: stop.coordinates.latitude.toString(),
    longitude: stop.coordinates.longitude.toString(),
    updatedTime: stop.updatedTime.toISOString(),
  });

  private createStopFromRedisData = (data: Record<string, string>): IStop => ({
    id: data.id,
    name: data.name,
    systemId: data.systemId,
    coordinates: {
      latitude: parseFloat(data.latitude),
      longitude: parseFloat(data.longitude),
    },
    updatedTime: new Date(data.updatedTime),
  });

  private createRedisHashFromRoute = (route: IRoute): Record<string, string> => ({
    id: route.id,
    name: route.name,
    color: route.color,
    systemId: route.systemId,
    polylineCoordinates: JSON.stringify(route.polylineCoordinates),
    updatedTime: route.updatedTime.toISOString(),
  });

  private createRouteFromRedisData = (data: Record<string, string>): IRoute => ({
    id: data.id,
    name: data.name,
    color: data.color,
    systemId: data.systemId,
    polylineCoordinates: JSON.parse(data.polylineCoordinates),
    updatedTime: new Date(data.updatedTime),
  });

  private createRedisHashFromShuttle = (shuttle: IShuttle): Record<string, string> => ({
    id: shuttle.id,
    name: shuttle.name,
    routeId: shuttle.routeId,
    systemId: shuttle.systemId,
    latitude: shuttle.coordinates.latitude.toString(),
    longitude: shuttle.coordinates.longitude.toString(),
    orientationInDegrees: shuttle.orientationInDegrees.toString(),
    updatedTime: shuttle.updatedTime.toISOString(),
  });

  private createShuttleFromRedisData = (data: Record<string, string>): IShuttle => ({
    id: data.id,
    name: data.name,
    routeId: data.routeId,
    systemId: data.systemId,
    coordinates: {
      latitude: parseFloat(data.latitude),
      longitude: parseFloat(data.longitude),
    },
    orientationInDegrees: parseFloat(data.orientationInDegrees),
    updatedTime: new Date(data.updatedTime),
  });

  private createEtaFromRedisData = (data: Record<string, string>): IEta => ({
    secondsRemaining: parseFloat(data.secondsRemaining),
    shuttleId: data.shuttleId,
    stopId: data.stopId,
    systemId: data.systemId,
    updatedTime: new Date(data.updatedTime),
  });

  private createRedisHashFromOrderedStop = (orderedStop: IOrderedStop): Record<string, string> => {
    const hash: Record<string, string> = {
      routeId: orderedStop.routeId,
      stopId: orderedStop.stopId,
      position: orderedStop.position.toString(),
      systemId: orderedStop.systemId,
      updatedTime: orderedStop.updatedTime.toISOString(),
    };

    // Only the identifying IDs of the neighbours are persisted.
    if (orderedStop.nextStop) {
      hash.nextStopRouteId = orderedStop.nextStop.routeId;
      hash.nextStopStopId = orderedStop.nextStop.stopId;
    }

    if (orderedStop.previousStop) {
      hash.previousStopRouteId = orderedStop.previousStop.routeId;
      hash.previousStopStopId = orderedStop.previousStop.stopId;
    }

    return hash;
  };

  private createOrderedStopFromRedisData = (data: Record<string, string>): IOrderedStop => {
    const orderedStop: IOrderedStop = {
      routeId: data.routeId,
      stopId: data.stopId,
      // Explicit radix so the stored decimal string is never reinterpreted.
      position: parseInt(data.position, 10),
      systemId: data.systemId,
      updatedTime: new Date(data.updatedTime),
    };

    // Note: We only store the IDs of next/previous stops, not full objects
    // to avoid circular references in Redis. These would need to be
    // resolved separately if needed. The `position` and `updatedTime` of the
    // neighbours below are therefore placeholders, not stored values.
    if (data.nextStopRouteId && data.nextStopStopId) {
      orderedStop.nextStop = {
        routeId: data.nextStopRouteId,
        stopId: data.nextStopStopId,
        position: 0, // placeholder
        systemId: data.systemId,
        updatedTime: new Date(),
      };
    }

    if (data.previousStopRouteId && data.previousStopStopId) {
      orderedStop.previousStop = {
        routeId: data.previousStopRouteId,
        stopId: data.previousStopStopId,
        position: 0, // placeholder
        systemId: data.systemId,
        updatedTime: new Date(),
      };
    }

    return orderedStop;
  };

  // Getter methods

  /** Returns every stored stop. */
  public async getStops(): Promise<IStop[]> {
    const hashes = await this.fetchShuttleHashesMatching('shuttle:stop:*');
    return hashes.map(this.createStopFromRedisData);
  }

  /** Returns the stop with the given ID, or `null` when no hash exists for it. */
  public async getStopById(stopId: string): Promise<IStop | null> {
    const key = this.createStopKey(stopId);
    const data = await this.redisClient.hGetAll(key);

    if (Object.keys(data).length === 0) {
      return null;
    }

    return this.createStopFromRedisData(data);
  }

  /** Returns every stored route. */
  public async getRoutes(): Promise<IRoute[]> {
    const hashes = await this.fetchShuttleHashesMatching('shuttle:route:*');
    return hashes.map(this.createRouteFromRedisData);
  }

  /** Returns the route with the given ID, or `null` when no hash exists for it. */
  public async getRouteById(routeId: string): Promise<IRoute | null> {
    const key = this.createRouteKey(routeId);
    const data = await this.redisClient.hGetAll(key);

    if (Object.keys(data).length === 0) {
      return null;
    }

    return this.createRouteFromRedisData(data);
  }

  /** Returns every stored shuttle. */
  public async getShuttles(): Promise<IShuttle[]> {
    const hashes = await this.fetchShuttleHashesMatching('shuttle:shuttle:*');
    return hashes.map(this.createShuttleFromRedisData);
  }

  /** Returns the stored shuttles whose `routeId` equals the given route ID. */
  public async getShuttlesByRouteId(routeId: string): Promise<IShuttle[]> {
    const allShuttles = await this.getShuttles();
    return allShuttles.filter(shuttle => shuttle.routeId === routeId);
  }

  /** Returns the shuttle with the given ID, or `null` when no hash exists for it. */
  public async getShuttleById(shuttleId: string): Promise<IShuttle | null> {
    const key = this.createShuttleKey(shuttleId);
    const data = await this.redisClient.hGetAll(key);

    if (Object.keys(data).length === 0) {
      return null;
    }

    return this.createShuttleFromRedisData(data);
  }

  /**
   * Returns the ordered stop stored for the route/stop pair, or `null` when
   * there is none. Neighbouring stops carry placeholder `position` and
   * `updatedTime` values (only their IDs are persisted).
   */
  public async getOrderedStopByRouteAndStopId(routeId: string, stopId: string): Promise<IOrderedStop | null> {
    const key = this.createOrderedStopKey(routeId, stopId);
    const data = await this.redisClient.hGetAll(key);

    if (Object.keys(data).length === 0) {
      return null;
    }

    return this.createOrderedStopFromRedisData(data);
  }

  /** Returns the ordered stops, across all routes, whose `stopId` matches. */
  public async getOrderedStopsByStopId(stopId: string): Promise<IOrderedStop[]> {
    const hashes = await this.fetchShuttleHashesMatching('shuttle:orderedstop:*');
    return hashes
      .filter((data) => data.stopId === stopId)
      .map(this.createOrderedStopFromRedisData);
  }

  /**
   * Returns the ordered stops stored under the given route ID.
   * The order of the result is the order returned by Redis, not `position`.
   */
  public async getOrderedStopsByRouteId(routeId: string): Promise<IOrderedStop[]> {
    const hashes = await this.fetchShuttleHashesMatching(`shuttle:orderedstop:${routeId}:*`);
    return hashes.map(this.createOrderedStopFromRedisData);
  }

  // Setter/update methods

  /** Writes the route's fields to its hash, creating it if needed. */
  public async addOrUpdateRoute(route: IRoute): Promise<void> {
    const key = this.createRouteKey(route.id);
    await this.redisClient.hSet(key, this.createRedisHashFromRoute(route));
  }

  /**
   * Writes the shuttle's fields to its hash, emits `SHUTTLE_UPDATED`, then
   * re-evaluates which stop (if any) the shuttle has arrived at.
   *
   * @param shuttle Shuttle to store.
   * @param travelTimeTimestamp Epoch milliseconds recorded as the arrival
   *   time if this update results in a stop arrival; defaults to now.
   */
  public async addOrUpdateShuttle(
    shuttle: IShuttle,
    travelTimeTimestamp = Date.now(),
  ): Promise<void> {
    const key = this.createShuttleKey(shuttle.id);
    await this.redisClient.hSet(key, this.createRedisHashFromShuttle(shuttle));

    this.emit(ShuttleRepositoryEvent.SHUTTLE_UPDATED, shuttle);

    await this.updateLastStopArrival(shuttle, travelTimeTimestamp);
  }

  /**
   * Detects stop departures/arrivals for `shuttle` and emits
   * `SHUTTLE_WILL_LEAVE_STOP` / `SHUTTLE_WILL_ARRIVE_AT_STOP` accordingly,
   * keeping the at-stop set and the last-stop hash in sync.
   */
  private async updateLastStopArrival(
    shuttle: IShuttle,
    travelTimeTimestamp = Date.now(),
  ) {
    const isAtStop = await this.checkIfShuttleIsAtStop(shuttle.id);

    let arrivedStop: IStop | undefined;

    if (isAtStop) {
      // Allow retrieval of the same stop
      // Will still return undefined when the shuttle leaves the stop
      arrivedStop = await this.getArrivedStopIfExists(shuttle, false);
    } else {
      arrivedStop = await this.getArrivedStopIfExists(shuttle, true);
    }

    // Will not fire *any* events if the same stop
    const lastStop = await this.getShuttleLastStopArrival(shuttle.id);
    if (lastStop?.stopId === arrivedStop?.id) return;

    if (isAtStop) {
      if (lastStop) {
        this.emit(ShuttleRepositoryEvent.SHUTTLE_WILL_LEAVE_STOP, {
          stopArrivalThatShuttleIsLeaving: lastStop,
        });
      }
      await this.markShuttleAsNotAtStop(shuttle.id);
    }

    if (arrivedStop) {
      const shuttleArrival = {
        stopId: arrivedStop.id,
        timestamp: new Date(travelTimeTimestamp),
        shuttleId: shuttle.id,
      };
      this.emit(ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP, {
        lastStopArrival: lastStop,
        willArriveAt: shuttleArrival,
      });
      await this.markShuttleAsAtStop(shuttleArrival.shuttleId);
      await this.updateShuttleLastStopArrival(shuttleArrival);
    }
  }

  /** Adds the shuttle ID to the at-stop set. */
  private async markShuttleAsAtStop(shuttleId: string) {
    await this.redisClient.sAdd(this.shuttleIsAtStopKey, shuttleId);
  }

  /** Removes the shuttle ID from the at-stop set. */
  private async markShuttleAsNotAtStop(shuttleId: string) {
    await this.redisClient.sRem(this.shuttleIsAtStopKey, shuttleId);
  }

  /** Returns whether the shuttle ID is a member of the at-stop set. */
  public async checkIfShuttleIsAtStop(shuttleId: string) {
    return await this.redisClient.sIsMember(this.shuttleIsAtStopKey, shuttleId);
  }

  /**
   * Averages the samples of the historical travel-time series for a
   * route/from-stop/to-stop triple over the inclusive range `[from, to]`.
   *
   * @returns The average, or `undefined` when the range holds no samples or
   *   the Redis command fails (the failure is logged as a warning).
   */
  public async getAverageTravelTimeSeconds(
    { routeId, fromStopId, toStopId }: ShuttleTravelTimeDataIdentifier,
    { from, to }: ShuttleTravelTimeDateFilterArguments,
  ): Promise<number | undefined> {
    const timeSeriesKey = this.createHistoricalEtaTimeSeriesKey(routeId, fromStopId, toStopId);
    const fromTimestamp = from.getTime();
    const toTimestamp = to.getTime();
    // One bucket wide enough to hold the whole inclusive range.
    const intervalMs = toTimestamp - fromTimestamp + 1;

    try {
      const aggregationResult = await this.redisClient.sendCommand([
        'TS.RANGE',
        timeSeriesKey,
        fromTimestamp.toString(),
        toTimestamp.toString(),
        // Buckets are aligned to timestamp 0 unless told otherwise, which
        // would usually split the range across two buckets and make the
        // first entry a partial average. Anchor the bucket at `from` instead.
        'ALIGN',
        'start',
        'AGGREGATION',
        'AVG',
        intervalMs.toString()
      ]) as [string, string][];

      if (aggregationResult && aggregationResult.length > 0) {
        const [, averageValue] = aggregationResult[0];
        return parseFloat(averageValue);
      }

      return;
    } catch (error) {
      console.warn(`Failed to get average travel time: ${error instanceof Error ? error.message : String(error)}`);
      return;
    }
  }

  /**
   * Finds the stop the shuttle has arrived at, if any.
   *
   * @param shuttle Shuttle whose position is tested.
   * @param returnNextStopOnly When `true` and the shuttle has a recorded last
   *   stop, only the stop following that one on the shuttle's route is
   *   considered. Otherwise every ordered stop of the route is considered
   *   and the first match is returned.
   */
  public async getArrivedStopIfExists(
    shuttle: IShuttle,
    returnNextStopOnly: boolean = false,
  ): Promise<IStop | undefined> {
    const degreeDelta = this.shuttleStopArrivalDegreeDelta;

    const lastStop = await this.getShuttleLastStopArrival(shuttle.id);
    if (lastStop && returnNextStopOnly) {
      const lastOrderedStop = await this.getOrderedStopByRouteAndStopId(shuttle.routeId, lastStop.stopId);
      const orderedStopAfter = lastOrderedStop?.nextStop;
      if (orderedStopAfter) {
        const stopAfter = await this.getStopById(orderedStopAfter.stopId);
        if (stopAfter && shuttleHasArrivedAtStop(shuttle, stopAfter, degreeDelta)) {
          return stopAfter;
        }
      }
    } else {
      const orderedStops = await this.getOrderedStopsByRouteId(shuttle.routeId);

      // Kept sequential on purpose: stops on the first match.
      for (const orderedStop of orderedStops) {
        const stop = await this.getStopById(orderedStop.stopId);
        if (stop != null && shuttleHasArrivedAtStop(shuttle, stop, degreeDelta)) {
          return stop;
        }
      }
    }

    return undefined;
  }

  /** Returns the shuttle's recorded last stop arrival, or `undefined` if none. */
  public async getShuttleLastStopArrival(shuttleId: string): Promise<ShuttleStopArrival | undefined> {
    const key = this.createShuttleLastStopKey(shuttleId);
    const data = await this.redisClient.hGetAll(key);

    if (Object.keys(data).length === 0) {
      return undefined;
    }

    return {
      shuttleId,
      stopId: data.stopId,
      timestamp: new Date(data.timestamp),
    };
  }

  /** Stores the stop ID and ISO timestamp of the shuttle's last stop arrival. */
  private async updateShuttleLastStopArrival(lastStopArrival: ShuttleStopArrival) {
    const key = this.createShuttleLastStopKey(lastStopArrival.shuttleId);
    await this.redisClient.hSet(key, {
      stopId: lastStopArrival.stopId,
      timestamp: lastStopArrival.timestamp.toISOString(),
    });
  }

  /** Writes the stop's fields to its hash, creating it if needed. */
  public async addOrUpdateStop(stop: IStop): Promise<void> {
    const key = this.createStopKey(stop.id);
    await this.redisClient.hSet(key, this.createRedisHashFromStop(stop));
  }

  /**
   * Writes the ordered stop's fields to its hash, creating it if needed.
   * Fields for a neighbour that is no longer set are not removed from an
   * existing hash, since only present fields are written.
   */
  public async addOrUpdateOrderedStop(orderedStop: IOrderedStop): Promise<void> {
    const key = this.createOrderedStopKey(orderedStop.routeId, orderedStop.stopId);
    await this.redisClient.hSet(key, this.createRedisHashFromOrderedStop(orderedStop));
  }

  // Remove methods

  /** Deletes the route and returns it, or returns `null` if it did not exist. */
  public async removeRouteIfExists(routeId: string): Promise<IRoute | null> {
    const route = await this.getRouteById(routeId);
    if (route) {
      const key = this.createRouteKey(routeId);
      await this.redisClient.del(key);
      return route;
    }
    return null;
  }

  /**
   * Deletes the shuttle, emits `SHUTTLE_REMOVED`, and clears its per-shuttle
   * state (last stop arrival and at-stop membership).
   *
   * @returns The removed shuttle, or `null` if it did not exist.
   */
  public async removeShuttleIfExists(shuttleId: string): Promise<IShuttle | null> {
    const shuttle = await this.getShuttleById(shuttleId);
    if (shuttle) {
      const key = this.createShuttleKey(shuttleId);
      await this.redisClient.del(key);

      this.emit(ShuttleRepositoryEvent.SHUTTLE_REMOVED, shuttle);

      await this.removeShuttleLastStopIfExists(shuttleId);
      // Without this the ID lingers in the at-stop set and
      // checkIfShuttleIsAtStop keeps reporting true for a removed shuttle.
      await this.markShuttleAsNotAtStop(shuttleId);

      return shuttle;
    }
    return null;
  }

  /** Deletes the stop and returns it, or returns `null` if it did not exist. */
  public async removeStopIfExists(stopId: string): Promise<IStop | null> {
    const stop = await this.getStopById(stopId);
    if (stop) {
      const key = this.createStopKey(stopId);
      await this.redisClient.del(key);
      return stop;
    }
    return null;
  }

  /**
   * Deletes the ordered stop and returns it, or returns `null` if it did not
   * exist. Note the parameter order: stop ID first, then route ID.
   */
  public async removeOrderedStopIfExists(stopId: string, routeId: string): Promise<IOrderedStop | null> {
    const orderedStop = await this.getOrderedStopByRouteAndStopId(routeId, stopId);
    if (orderedStop) {
      const key = this.createOrderedStopKey(routeId, stopId);
      await this.redisClient.del(key);
      return orderedStop;
    }
    return null;
  }

  /** Deletes the shuttle's last stop arrival and returns it, or `null` if none. */
  private async removeShuttleLastStopIfExists(shuttleId: string) {
    const lastStop = await this.getShuttleLastStopArrival(shuttleId);
    if (lastStop) {
      const key = this.createShuttleLastStopKey(shuttleId);
      await this.redisClient.del(key);
      return lastStop;
    }
    return null;
  }

  // Clear methods

  /**
   * Deletes every shuttle hash together with all per-shuttle state
   * (last stop arrivals and the at-stop set). Emits no events.
   */
  public async clearShuttleData(): Promise<void> {
    await this.deleteShuttleKeysMatching('shuttle:shuttle:*');
    await this.clearShuttleLastStopData();
    // Drop the whole set so no ID of a cleared shuttle stays marked at a stop.
    await this.redisClient.del(this.shuttleIsAtStopKey);
  }

  /** Deletes every ordered stop hash. */
  public async clearOrderedStopData(): Promise<void> {
    await this.deleteShuttleKeysMatching('shuttle:orderedstop:*');
  }

  /** Deletes every route hash. */
  public async clearRouteData(): Promise<void> {
    await this.deleteShuttleKeysMatching('shuttle:route:*');
  }

  /** Deletes every stop hash. */
  public async clearStopData(): Promise<void> {
    await this.deleteShuttleKeysMatching('shuttle:stop:*');
  }

  /** Deletes every last-stop-arrival hash. */
  private async clearShuttleLastStopData(): Promise<void> {
    await this.deleteShuttleKeysMatching('shuttle:laststop:*');
  }
}