From 6a31609960b83f0bf13aef61d9f324b7c7b6fdad Mon Sep 17 00:00:00 2001 From: Brendan Chen Date: Tue, 11 Nov 2025 20:21:40 -0800 Subject: [PATCH] Implement in-memory equivalents of both ETA repositories --- .../shuttle/eta/BaseInMemoryETARepository.ts | 13 ++- .../InMemoryExternalSourceETARepository.ts | 17 ++- .../eta/InMemorySelfUpdatingETARepository.ts | 110 ++++++++++++++++-- ...rnalSourceETARepositorySharedTests.test.ts | 17 ++- ...lfUpdatingETARepositorySharedTests.test.ts | 23 +++- 5 files changed, 165 insertions(+), 15 deletions(-) diff --git a/src/repositories/shuttle/eta/BaseInMemoryETARepository.ts b/src/repositories/shuttle/eta/BaseInMemoryETARepository.ts index 1a2b7cf..c260331 100644 --- a/src/repositories/shuttle/eta/BaseInMemoryETARepository.ts +++ b/src/repositories/shuttle/eta/BaseInMemoryETARepository.ts @@ -1,5 +1,5 @@ import { IEta } from "../../../entities/ShuttleRepositoryEntities"; -import { ETAGetterRepository, ETARepositoryEventListener, ETARepositoryEventName } from "./ETAGetterRepository"; +import { ETAGetterRepository, ETARepositoryEvent, ETARepositoryEventListener, ETARepositoryEventName } from "./ETAGetterRepository"; import EventEmitter from "node:events"; export abstract class BaseInMemoryETARepository extends EventEmitter implements ETAGetterRepository { @@ -18,6 +18,17 @@ export abstract class BaseInMemoryETARepository extends EventEmitter implements return eta ?? null; } + // Protected setter for internal use + protected async addOrUpdateEta(eta: IEta): Promise { + const index = this.etas.findIndex((e) => e.stopId === eta.stopId && e.shuttleId === eta.shuttleId); + if (index !== -1) { + this.etas[index] = eta; + } else { + this.etas.push(eta); + } + this.emit(ETARepositoryEvent.ETA_UPDATED, eta); + } + // EventEmitter overrides for type safety override on( event: T, diff --git a/src/repositories/shuttle/eta/InMemoryExternalSourceETARepository.ts b/src/repositories/shuttle/eta/InMemoryExternalSourceETARepository.ts index b769c12..4054e4f 100644 --- a/src/repositories/shuttle/eta/InMemoryExternalSourceETARepository.ts +++ b/src/repositories/shuttle/eta/InMemoryExternalSourceETARepository.ts @@ -1,13 +1,22 @@ import { IEta } from "../../../entities/ShuttleRepositoryEntities"; import { ExternalSourceETARepository } from "./ExternalSourceETARepository"; import { BaseInMemoryETARepository } from "./BaseInMemoryETARepository"; +import { ETARepositoryEvent } from "./ETAGetterRepository"; export class InMemoryExternalSourceETARepository extends BaseInMemoryETARepository implements ExternalSourceETARepository { - addOrUpdateEtaFromExternalSource(eta: IEta): Promise { - throw new Error("Method not implemented."); + async addOrUpdateEtaFromExternalSource(eta: IEta): Promise { + await this.addOrUpdateEta(eta); } - removeEtaIfExists(shuttleId: string, stopId: string): Promise { - throw new Error("Method not implemented."); + async removeEtaIfExists(shuttleId: string, stopId: string): Promise { + const index = this.etas.findIndex((e) => e.stopId === stopId && e.shuttleId === shuttleId); + if (index === -1) { + return null; + } + + const removedEta = this.etas[index]; + this.etas.splice(index, 1); + this.emit(ETARepositoryEvent.ETA_REMOVED, removedEta); + return removedEta; } } diff --git a/src/repositories/shuttle/eta/InMemorySelfUpdatingETARepository.ts b/src/repositories/shuttle/eta/InMemorySelfUpdatingETARepository.ts index a94955d..4f0f6e3 100644 --- a/src/repositories/shuttle/eta/InMemorySelfUpdatingETARepository.ts +++ b/src/repositories/shuttle/eta/InMemorySelfUpdatingETARepository.ts @@ -1,22 +1,118 @@ import { SelfUpdatingETARepository } from "./SelfUpdatingETARepository"; -import { ShuttleGetterRepository, ShuttleTravelTimeDataIdentifier, ShuttleTravelTimeDateFilterArguments } from "../ShuttleGetterRepository"; +import { ShuttleGetterRepository, ShuttleRepositoryEvent, ShuttleStopArrival, ShuttleTravelTimeDataIdentifier, ShuttleTravelTimeDateFilterArguments } from "../ShuttleGetterRepository"; import { BaseInMemoryETARepository } from "./BaseInMemoryETARepository"; +import { IShuttle } from "../../../entities/ShuttleRepositoryEntities"; export class InMemorySelfUpdatingETARepository extends BaseInMemoryETARepository implements SelfUpdatingETARepository { + private referenceTime: Date | null = null; + private travelTimeData: Map> = new Map(); + constructor( readonly shuttleRepository: ShuttleGetterRepository ) { super(); - } - setReferenceTime(referenceTime: Date): void { - throw new Error("Method not implemented."); + + this.setReferenceTime = this.setReferenceTime.bind(this); + this.getAverageTravelTimeSeconds = this.getAverageTravelTimeSeconds.bind(this); + this.startListeningForUpdates = this.startListeningForUpdates.bind(this); + this.handleShuttleUpdate = this.handleShuttleUpdate.bind(this); + this.handleShuttleWillArriveAtStop = this.handleShuttleWillArriveAtStop.bind(this); } - getAverageTravelTimeSeconds(identifier: ShuttleTravelTimeDataIdentifier, dateFilter: ShuttleTravelTimeDateFilterArguments): Promise { - throw new Error("Method not implemented."); + setReferenceTime(referenceTime: Date): void { + this.referenceTime = referenceTime; + } + + async getAverageTravelTimeSeconds( + { routeId, fromStopId, toStopId }: ShuttleTravelTimeDataIdentifier, + { from, to }: ShuttleTravelTimeDateFilterArguments + ): Promise { + const key = `${routeId}:${fromStopId}:${toStopId}`; + const dataPoints = this.travelTimeData.get(key); + + if (!dataPoints || dataPoints.length === 0) { + return undefined; + } + + const fromTimestamp = from.getTime(); + const toTimestamp = to.getTime(); + + const filteredPoints = dataPoints.filter( + (point) => point.timestamp >= fromTimestamp && point.timestamp <= toTimestamp + ); + + if (filteredPoints.length === 0) { + return undefined; + } + + const sum = filteredPoints.reduce((acc, point) => acc + point.seconds, 0); + return sum / filteredPoints.length; } startListeningForUpdates(): void { - throw new Error("Method not implemented."); + this.shuttleRepository.addListener(ShuttleRepositoryEvent.SHUTTLE_UPDATED, this.handleShuttleUpdate); + this.shuttleRepository.addListener(ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP, this.handleShuttleWillArriveAtStop); + } + + private async handleShuttleUpdate(shuttle: IShuttle): Promise { + const lastStop = await this.shuttleRepository.getShuttleLastStopArrival(shuttle.id); + if (!lastStop) return; + + const lastOrderedStop = await this.shuttleRepository.getOrderedStopByRouteAndStopId(shuttle.routeId, lastStop.stopId); + const nextStop = lastOrderedStop?.nextStop; + if (!nextStop) return; + + let referenceCurrentTime = new Date(); + if (this.referenceTime != null) { + referenceCurrentTime = this.referenceTime; + } + const oneWeekAgo = new Date(referenceCurrentTime.getTime() - (60 * 60 * 24 * 7 * 1000)); + + const travelTimeSeconds = await this.getAverageTravelTimeSeconds({ + routeId: shuttle.routeId, + fromStopId: lastStop.stopId, + toStopId: nextStop.stopId, + }, { + from: oneWeekAgo, + to: new Date(oneWeekAgo.getTime() + (60 * 60 * 1000)) + }); + if (travelTimeSeconds == undefined) return; + + const elapsedTimeMs = referenceCurrentTime.getTime() - lastStop.timestamp.getTime(); + const predictedTimeSeconds = travelTimeSeconds - (elapsedTimeMs / 1000); + + await this.addOrUpdateEta({ + secondsRemaining: predictedTimeSeconds, + shuttleId: shuttle.id, + stopId: nextStop.stopId, + systemId: nextStop.systemId, + updatedTime: new Date(), + }); + } + + private async handleShuttleWillArriveAtStop(shuttleArrival: ShuttleStopArrival): Promise { + const lastStopTimestamp = await this.shuttleRepository.getShuttleLastStopArrival(shuttleArrival.shuttleId); + if (lastStopTimestamp) { + const shuttle = await this.shuttleRepository.getShuttleById(lastStopTimestamp.shuttleId); + if (!shuttle) return; + + const routeId = shuttle.routeId; + const fromStopId = lastStopTimestamp.stopId; + const toStopId = shuttleArrival.stopId; + + const travelTimeSeconds = (shuttleArrival.timestamp.getTime() - lastStopTimestamp.timestamp.getTime()) / 1000; + await this.addTravelTimeDataPoint({ routeId, fromStopId, toStopId }, travelTimeSeconds, shuttleArrival.timestamp.getTime()); + } + } + + private async addTravelTimeDataPoint( + { routeId, fromStopId, toStopId }: ShuttleTravelTimeDataIdentifier, + travelTimeSeconds: number, + timestamp = Date.now(), + ): Promise { + const key = `${routeId}:${fromStopId}:${toStopId}`; + const dataPoints = this.travelTimeData.get(key) || []; + dataPoints.push({ timestamp, seconds: travelTimeSeconds }); + this.travelTimeData.set(key, dataPoints); } } diff --git a/src/repositories/shuttle/eta/__tests__/ExternalSourceETARepositorySharedTests.test.ts b/src/repositories/shuttle/eta/__tests__/ExternalSourceETARepositorySharedTests.test.ts index eb4d039..73aac3a 100644 --- a/src/repositories/shuttle/eta/__tests__/ExternalSourceETARepositorySharedTests.test.ts +++ b/src/repositories/shuttle/eta/__tests__/ExternalSourceETARepositorySharedTests.test.ts @@ -3,6 +3,7 @@ import { createClient, RedisClientType } from "redis"; import { RepositoryHolder } from "../../../../../testHelpers/RepositoryHolder"; import { ExternalSourceETARepository } from "../ExternalSourceETARepository"; import { RedisExternalSourceETARepository } from "../RedisExternalSourceETARepository"; +import { InMemoryExternalSourceETARepository } from "../InMemoryExternalSourceETARepository"; import { generateMockEtas } from "../../../../../testHelpers/mockDataGenerators"; class RedisExternalSourceETARepositoryHolder implements RepositoryHolder { @@ -26,8 +27,22 @@ class RedisExternalSourceETARepositoryHolder implements RepositoryHolder { + repo: InMemoryExternalSourceETARepository | undefined; + + name = "InMemoryExternalSourceETARepository" + factory = async () => { + this.repo = new InMemoryExternalSourceETARepository(); + return this.repo; + } + teardown = async () => { + // No teardown needed for in-memory + } +} + const repositoryImplementations = [ - new RedisExternalSourceETARepositoryHolder() + new RedisExternalSourceETARepositoryHolder(), + new InMemoryExternalSourceETARepositoryHolder() ]; describe.each(repositoryImplementations)('$name', (holder) => { diff --git a/src/repositories/shuttle/eta/__tests__/SelfUpdatingETARepositorySharedTests.test.ts b/src/repositories/shuttle/eta/__tests__/SelfUpdatingETARepositorySharedTests.test.ts index 75ce50a..cbf349f 100644 --- a/src/repositories/shuttle/eta/__tests__/SelfUpdatingETARepositorySharedTests.test.ts +++ b/src/repositories/shuttle/eta/__tests__/SelfUpdatingETARepositorySharedTests.test.ts @@ -3,8 +3,11 @@ import { createClient, RedisClientType } from "redis"; import { RepositoryHolder } from "../../../../../testHelpers/RepositoryHolder"; import { SelfUpdatingETARepository } from "../SelfUpdatingETARepository"; import { RedisSelfUpdatingETARepository } from "../RedisSelfUpdatingETARepository"; +import { InMemorySelfUpdatingETARepository } from "../InMemorySelfUpdatingETARepository"; import { RedisShuttleRepository } from "../../RedisShuttleRepository"; +import { UnoptimizedInMemoryShuttleRepository } from "../../UnoptimizedInMemoryShuttleRepository"; import { setupRouteAndOrderedStopsForShuttleRepository } from "../../../../../testHelpers/setupRouteAndOrderedStopsForShuttleRepository"; +import { ShuttleGetterSetterRepository } from "../../ShuttleGetterSetterRepository"; class RedisSelfUpdatingETARepositoryHolder implements RepositoryHolder { repo: RedisSelfUpdatingETARepository | undefined; @@ -32,13 +35,29 @@ class RedisSelfUpdatingETARepositoryHolder implements RepositoryHolder { + repo: InMemorySelfUpdatingETARepository | undefined; + shuttleRepo: UnoptimizedInMemoryShuttleRepository | undefined; + + name = "InMemorySelfUpdatingETARepository" + factory = async () => { + this.shuttleRepo = new UnoptimizedInMemoryShuttleRepository(); + this.repo = new InMemorySelfUpdatingETARepository(this.shuttleRepo); + return this.repo; + } + teardown = async () => { + // No teardown needed for in-memory + } +} + const repositoryImplementations = [ - new RedisSelfUpdatingETARepositoryHolder() + new RedisSelfUpdatingETARepositoryHolder(), + new InMemorySelfUpdatingETARepositoryHolder() ]; describe.each(repositoryImplementations)('$name', (holder) => { let repository: SelfUpdatingETARepository; - let shuttleRepository: RedisShuttleRepository; + let shuttleRepository: ShuttleGetterSetterRepository; beforeEach(async () => { repository = await holder.factory();