diff --git a/src/repositories/shuttle/eta/InMemorySelfUpdatingETARepository.ts b/src/repositories/shuttle/eta/InMemorySelfUpdatingETARepository.ts index 87945a4..164b734 100644 --- a/src/repositories/shuttle/eta/InMemorySelfUpdatingETARepository.ts +++ b/src/repositories/shuttle/eta/InMemorySelfUpdatingETARepository.ts @@ -1,5 +1,5 @@ import { SelfUpdatingETARepository } from "./SelfUpdatingETARepository"; -import { ShuttleGetterRepository, ShuttleRepositoryEvent, ShuttleStopArrival, ShuttleTravelTimeDataIdentifier, ShuttleTravelTimeDateFilterArguments, ShuttleWillArriveAtStopPayload } from "../ShuttleGetterRepository"; +import { ShuttleGetterRepository, ShuttleRepositoryEvent, ShuttleStopArrival, ShuttleTravelTimeDataIdentifier, ShuttleTravelTimeDateFilterArguments, ShuttleWillArriveAtStopPayload, ShuttleWillLeaveStopPayload } from "../ShuttleGetterRepository"; import { BaseInMemoryETARepository } from "./BaseInMemoryETARepository"; import { IOrderedStop, IShuttle } from "../../../entities/ShuttleRepositoryEntities"; import { ETARepositoryEvent } from "./ETAGetterRepository"; @@ -8,6 +8,8 @@ export class InMemorySelfUpdatingETARepository extends BaseInMemoryETARepository private referenceTime: Date | null = null; private travelTimeData: Map> = new Map(); + private isListening = false; + constructor( readonly shuttleRepository: ShuttleGetterRepository ) { @@ -16,8 +18,12 @@ export class InMemorySelfUpdatingETARepository extends BaseInMemoryETARepository this.setReferenceTime = this.setReferenceTime.bind(this); this.getAverageTravelTimeSeconds = this.getAverageTravelTimeSeconds.bind(this); this.startListeningForUpdates = this.startListeningForUpdates.bind(this); - this.handleShuttleUpdate = this.handleShuttleUpdate.bind(this); this.handleShuttleWillArriveAtStop = this.handleShuttleWillArriveAtStop.bind(this); + this.handleShuttleUpdate = this.handleShuttleUpdate.bind(this); + this.updateCascadingEta = this.updateCascadingEta.bind(this); + this.getAverageTravelTimeSecondsWithFallbacks = this.getAverageTravelTimeSecondsWithFallbacks.bind(this); + this.removeEtaIfExists = this.removeEtaIfExists.bind(this); + this.handleShuttleWillLeaveStop = this.handleShuttleWillLeaveStop.bind(this); } setReferenceTime(referenceTime: Date): void { @@ -51,13 +57,23 @@ export class InMemorySelfUpdatingETARepository extends BaseInMemoryETARepository } startListeningForUpdates(): void { + if (this.isListening) { + console.warn("Already listening to updates; did you call startListeningForUpdates twice?"); + return; + } this.shuttleRepository.addListener(ShuttleRepositoryEvent.SHUTTLE_UPDATED, this.handleShuttleUpdate); this.shuttleRepository.addListener(ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP, this.handleShuttleWillArriveAtStop); + this.shuttleRepository.addListener(ShuttleRepositoryEvent.SHUTTLE_WILL_LEAVE_STOP, this.handleShuttleWillLeaveStop); + this.isListening = true; } - stopListeningForUpdates(): void { + if (!this.isListening) { + return; + } this.shuttleRepository.removeListener(ShuttleRepositoryEvent.SHUTTLE_UPDATED, this.handleShuttleUpdate); this.shuttleRepository.removeListener(ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP, this.handleShuttleWillArriveAtStop); + this.shuttleRepository.removeListener(ShuttleRepositoryEvent.SHUTTLE_WILL_LEAVE_STOP, this.handleShuttleWillLeaveStop); + this.isListening = false; } private async getAverageTravelTimeSecondsWithFallbacks( @@ -176,6 +192,12 @@ export class InMemorySelfUpdatingETARepository extends BaseInMemoryETARepository } } + private async handleShuttleWillLeaveStop({ + stopArrivalThatShuttleIsLeaving, + }: ShuttleWillLeaveStopPayload) { + await this.removeEtaIfExists(stopArrivalThatShuttleIsLeaving.shuttleId, stopArrivalThatShuttleIsLeaving.stopId); + } + private async addTravelTimeDataPoint( { routeId, fromStopId, toStopId }: ShuttleTravelTimeDataIdentifier, travelTimeSeconds: number, diff --git a/src/repositories/shuttle/eta/RedisSelfUpdatingETARepository.ts b/src/repositories/shuttle/eta/RedisSelfUpdatingETARepository.ts index b9f292c..230eca1 100644 --- a/src/repositories/shuttle/eta/RedisSelfUpdatingETARepository.ts +++ b/src/repositories/shuttle/eta/RedisSelfUpdatingETARepository.ts @@ -1,12 +1,14 @@ import { SelfUpdatingETARepository } from "./SelfUpdatingETARepository"; import { BaseRedisETARepository } from "./BaseRedisETARepository"; import { createClient, RedisClientType } from "redis"; -import { ShuttleGetterRepository, ShuttleRepositoryEvent, ShuttleStopArrival, ShuttleTravelTimeDataIdentifier, ShuttleTravelTimeDateFilterArguments, ShuttleWillArriveAtStopPayload } from "../ShuttleGetterRepository"; +import { ShuttleGetterRepository, ShuttleRepositoryEvent, ShuttleStopArrival, ShuttleTravelTimeDataIdentifier, ShuttleTravelTimeDateFilterArguments, ShuttleWillArriveAtStopPayload, ShuttleWillLeaveStopPayload } from "../ShuttleGetterRepository"; import { REDIS_RECONNECT_INTERVAL } from "../../../environment"; import { IEta, IOrderedStop, IShuttle } from "../../../entities/ShuttleRepositoryEntities"; import { ETARepositoryEvent } from "./ETAGetterRepository"; export class RedisSelfUpdatingETARepository extends BaseRedisETARepository implements SelfUpdatingETARepository { + private isListening = false; + constructor( readonly shuttleRepository: ShuttleGetterRepository, redisClient: RedisClientType = createClient({ @@ -29,6 +31,7 @@ export class RedisSelfUpdatingETARepository extends BaseRedisETARepository imple this.updateCascadingEta = this.updateCascadingEta.bind(this); this.getAverageTravelTimeSecondsWithFallbacks = this.getAverageTravelTimeSecondsWithFallbacks.bind(this); this.removeEtaIfExists = this.removeEtaIfExists.bind(this); + this.handleShuttleWillLeaveStop = this.handleShuttleWillLeaveStop.bind(this); } private createHistoricalEtaTimeSeriesKey = (routeId: string, fromStopId: string, toStopId: string) => { @@ -71,14 +74,25 @@ export class RedisSelfUpdatingETARepository extends BaseRedisETARepository imple } } - startListeningForUpdates() { + startListeningForUpdates(): void { + if (this.isListening) { + console.warn("Already listening to updates; did you call startListeningForUpdates twice?"); + return; + } this.shuttleRepository.addListener(ShuttleRepositoryEvent.SHUTTLE_UPDATED, this.handleShuttleUpdate); - this.shuttleRepository.addListener(ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP, this.handleShuttleWillArriveAtStop) + this.shuttleRepository.addListener(ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP, this.handleShuttleWillArriveAtStop); + this.shuttleRepository.addListener(ShuttleRepositoryEvent.SHUTTLE_WILL_LEAVE_STOP, this.handleShuttleWillLeaveStop); + this.isListening = true; } - stopListeningForUpdates() { + stopListeningForUpdates(): void { + if (!this.isListening) { + return; + } this.shuttleRepository.removeListener(ShuttleRepositoryEvent.SHUTTLE_UPDATED, this.handleShuttleUpdate); this.shuttleRepository.removeListener(ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP, this.handleShuttleWillArriveAtStop); + this.shuttleRepository.removeListener(ShuttleRepositoryEvent.SHUTTLE_WILL_LEAVE_STOP, this.handleShuttleWillLeaveStop); + this.isListening = false; } private async getAverageTravelTimeSecondsWithFallbacks( @@ -199,6 +213,13 @@ export class RedisSelfUpdatingETARepository extends BaseRedisETARepository imple } } + private async handleShuttleWillLeaveStop({ + stopArrivalThatShuttleIsLeaving, + }: ShuttleWillLeaveStopPayload) { + await this.removeEtaIfExists(stopArrivalThatShuttleIsLeaving.shuttleId, stopArrivalThatShuttleIsLeaving.stopId); + } + + public async addTravelTimeDataPoint( { routeId, fromStopId, toStopId }: ShuttleTravelTimeDataIdentifier, travelTimeSeconds: number, diff --git a/src/repositories/shuttle/eta/__tests__/SelfUpdatingETARepositorySharedTests.test.ts b/src/repositories/shuttle/eta/__tests__/SelfUpdatingETARepositorySharedTests.test.ts index 6f7d266..86ae52b 100644 --- a/src/repositories/shuttle/eta/__tests__/SelfUpdatingETARepositorySharedTests.test.ts +++ b/src/repositories/shuttle/eta/__tests__/SelfUpdatingETARepositorySharedTests.test.ts @@ -54,7 +54,7 @@ class InMemorySelfUpdatingETARepositoryHolder implements RepositoryHolder { @@ -200,13 +200,10 @@ describe.each(repositoryImplementations)('$name', (holder) => { test("clears ETA of correct stop on leaving stop", async () => { const { stop1, stop2, stop3, sampleShuttleNotInRepository: shuttle } = await setupRouteAndOrderedStops(); - const shuttleSecondArrivalTimeAtFirstStop = new Date(2025, 0, 1, 12, 5, 0); - const shuttleSecondArrivalTimeAtSecondStop = new Date(2025, 0, 1, 12, 20, 0); + const shuttleSecondArrivalTimeAtFirstStop = new Date(2025, 0, 8, 12, 0, 0); + const shuttleSecondArrivalTimeAtSecondStop = new Date(2025, 0, 8, 12, 15, 0); const currentTime = new Date(shuttleSecondArrivalTimeAtSecondStop.getTime() + 3 * 60 * 1000); - repository.setReferenceTime(currentTime); - repository.startListeningForUpdates(); - await populateTravelTimeDataForStops({ currentTime, shuttle, stop1, stop2, stop3 }); // Populating ETA data