Add implementation and test for leaving stop event

This commit is contained in:
2025-11-19 11:31:13 -08:00
parent b8c3f17510
commit 545eaef74a
3 changed files with 53 additions and 13 deletions

View File

@@ -1,5 +1,5 @@
import { SelfUpdatingETARepository } from "./SelfUpdatingETARepository";
import { ShuttleGetterRepository, ShuttleRepositoryEvent, ShuttleStopArrival, ShuttleTravelTimeDataIdentifier, ShuttleTravelTimeDateFilterArguments, ShuttleWillArriveAtStopPayload } from "../ShuttleGetterRepository";
import { ShuttleGetterRepository, ShuttleRepositoryEvent, ShuttleStopArrival, ShuttleTravelTimeDataIdentifier, ShuttleTravelTimeDateFilterArguments, ShuttleWillArriveAtStopPayload, ShuttleWillLeaveStopPayload } from "../ShuttleGetterRepository";
import { BaseInMemoryETARepository } from "./BaseInMemoryETARepository";
import { IOrderedStop, IShuttle } from "../../../entities/ShuttleRepositoryEntities";
import { ETARepositoryEvent } from "./ETAGetterRepository";
@@ -8,6 +8,8 @@ export class InMemorySelfUpdatingETARepository extends BaseInMemoryETARepository
private referenceTime: Date | null = null;
private travelTimeData: Map<string, Array<{ timestamp: number; seconds: number }>> = new Map();
private isListening = false;
constructor(
readonly shuttleRepository: ShuttleGetterRepository
) {
@@ -16,8 +18,12 @@ export class InMemorySelfUpdatingETARepository extends BaseInMemoryETARepository
this.setReferenceTime = this.setReferenceTime.bind(this);
this.getAverageTravelTimeSeconds = this.getAverageTravelTimeSeconds.bind(this);
this.startListeningForUpdates = this.startListeningForUpdates.bind(this);
this.handleShuttleUpdate = this.handleShuttleUpdate.bind(this);
this.handleShuttleWillArriveAtStop = this.handleShuttleWillArriveAtStop.bind(this);
this.handleShuttleUpdate = this.handleShuttleUpdate.bind(this);
this.updateCascadingEta = this.updateCascadingEta.bind(this);
this.getAverageTravelTimeSecondsWithFallbacks = this.getAverageTravelTimeSecondsWithFallbacks.bind(this);
this.removeEtaIfExists = this.removeEtaIfExists.bind(this);
this.handleShuttleWillLeaveStop = this.handleShuttleWillLeaveStop.bind(this);
}
setReferenceTime(referenceTime: Date): void {
@@ -51,13 +57,23 @@ export class InMemorySelfUpdatingETARepository extends BaseInMemoryETARepository
}
startListeningForUpdates(): void {
if (this.isListening) {
console.warn("Already listening to updates; did you call startListeningForUpdates twice?");
return;
}
this.shuttleRepository.addListener(ShuttleRepositoryEvent.SHUTTLE_UPDATED, this.handleShuttleUpdate);
this.shuttleRepository.addListener(ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP, this.handleShuttleWillArriveAtStop);
this.shuttleRepository.addListener(ShuttleRepositoryEvent.SHUTTLE_WILL_LEAVE_STOP, this.handleShuttleWillLeaveStop);
this.isListening = true;
}
stopListeningForUpdates(): void {
if (!this.isListening) {
return;
}
this.shuttleRepository.removeListener(ShuttleRepositoryEvent.SHUTTLE_UPDATED, this.handleShuttleUpdate);
this.shuttleRepository.removeListener(ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP, this.handleShuttleWillArriveAtStop);
this.shuttleRepository.removeListener(ShuttleRepositoryEvent.SHUTTLE_WILL_LEAVE_STOP, this.handleShuttleWillLeaveStop);
this.isListening = false;
}
private async getAverageTravelTimeSecondsWithFallbacks(
@@ -176,6 +192,12 @@ export class InMemorySelfUpdatingETARepository extends BaseInMemoryETARepository
}
}
private async handleShuttleWillLeaveStop({
stopArrivalThatShuttleIsLeaving,
}: ShuttleWillLeaveStopPayload) {
await this.removeEtaIfExists(stopArrivalThatShuttleIsLeaving.shuttleId, stopArrivalThatShuttleIsLeaving.stopId);
}
private async addTravelTimeDataPoint(
{ routeId, fromStopId, toStopId }: ShuttleTravelTimeDataIdentifier,
travelTimeSeconds: number,

View File

@@ -1,12 +1,14 @@
import { SelfUpdatingETARepository } from "./SelfUpdatingETARepository";
import { BaseRedisETARepository } from "./BaseRedisETARepository";
import { createClient, RedisClientType } from "redis";
import { ShuttleGetterRepository, ShuttleRepositoryEvent, ShuttleStopArrival, ShuttleTravelTimeDataIdentifier, ShuttleTravelTimeDateFilterArguments, ShuttleWillArriveAtStopPayload } from "../ShuttleGetterRepository";
import { ShuttleGetterRepository, ShuttleRepositoryEvent, ShuttleStopArrival, ShuttleTravelTimeDataIdentifier, ShuttleTravelTimeDateFilterArguments, ShuttleWillArriveAtStopPayload, ShuttleWillLeaveStopPayload } from "../ShuttleGetterRepository";
import { REDIS_RECONNECT_INTERVAL } from "../../../environment";
import { IEta, IOrderedStop, IShuttle } from "../../../entities/ShuttleRepositoryEntities";
import { ETARepositoryEvent } from "./ETAGetterRepository";
export class RedisSelfUpdatingETARepository extends BaseRedisETARepository implements SelfUpdatingETARepository {
private isListening = false;
constructor(
readonly shuttleRepository: ShuttleGetterRepository,
redisClient: RedisClientType = createClient({
@@ -29,6 +31,7 @@ export class RedisSelfUpdatingETARepository extends BaseRedisETARepository imple
this.updateCascadingEta = this.updateCascadingEta.bind(this);
this.getAverageTravelTimeSecondsWithFallbacks = this.getAverageTravelTimeSecondsWithFallbacks.bind(this);
this.removeEtaIfExists = this.removeEtaIfExists.bind(this);
this.handleShuttleWillLeaveStop = this.handleShuttleWillLeaveStop.bind(this);
}
private createHistoricalEtaTimeSeriesKey = (routeId: string, fromStopId: string, toStopId: string) => {
@@ -71,14 +74,25 @@ export class RedisSelfUpdatingETARepository extends BaseRedisETARepository imple
}
}
startListeningForUpdates() {
startListeningForUpdates(): void {
if (this.isListening) {
console.warn("Already listening to updates; did you call startListeningForUpdates twice?");
return;
}
this.shuttleRepository.addListener(ShuttleRepositoryEvent.SHUTTLE_UPDATED, this.handleShuttleUpdate);
this.shuttleRepository.addListener(ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP, this.handleShuttleWillArriveAtStop)
this.shuttleRepository.addListener(ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP, this.handleShuttleWillArriveAtStop);
this.shuttleRepository.addListener(ShuttleRepositoryEvent.SHUTTLE_WILL_LEAVE_STOP, this.handleShuttleWillLeaveStop);
this.isListening = true;
}
stopListeningForUpdates() {
stopListeningForUpdates(): void {
if (!this.isListening) {
return;
}
this.shuttleRepository.removeListener(ShuttleRepositoryEvent.SHUTTLE_UPDATED, this.handleShuttleUpdate);
this.shuttleRepository.removeListener(ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP, this.handleShuttleWillArriveAtStop);
this.shuttleRepository.removeListener(ShuttleRepositoryEvent.SHUTTLE_WILL_LEAVE_STOP, this.handleShuttleWillLeaveStop);
this.isListening = false;
}
private async getAverageTravelTimeSecondsWithFallbacks(
@@ -199,6 +213,13 @@ export class RedisSelfUpdatingETARepository extends BaseRedisETARepository imple
}
}
private async handleShuttleWillLeaveStop({
stopArrivalThatShuttleIsLeaving,
}: ShuttleWillLeaveStopPayload) {
await this.removeEtaIfExists(stopArrivalThatShuttleIsLeaving.shuttleId, stopArrivalThatShuttleIsLeaving.stopId);
}
public async addTravelTimeDataPoint(
{ routeId, fromStopId, toStopId }: ShuttleTravelTimeDataIdentifier,
travelTimeSeconds: number,

View File

@@ -54,7 +54,7 @@ class InMemorySelfUpdatingETARepositoryHolder implements RepositoryHolder<SelfUp
const repositoryImplementations = [
new RedisSelfUpdatingETARepositoryHolder(),
new InMemorySelfUpdatingETARepositoryHolder()
new InMemorySelfUpdatingETARepositoryHolder(),
];
describe.each(repositoryImplementations)('$name', (holder) => {
@@ -200,13 +200,10 @@ describe.each(repositoryImplementations)('$name', (holder) => {
test("clears ETA of correct stop on leaving stop", async () => {
const { stop1, stop2, stop3, sampleShuttleNotInRepository: shuttle } = await setupRouteAndOrderedStops();
const shuttleSecondArrivalTimeAtFirstStop = new Date(2025, 0, 1, 12, 5, 0);
const shuttleSecondArrivalTimeAtSecondStop = new Date(2025, 0, 1, 12, 20, 0);
const shuttleSecondArrivalTimeAtFirstStop = new Date(2025, 0, 8, 12, 0, 0);
const shuttleSecondArrivalTimeAtSecondStop = new Date(2025, 0, 8, 12, 15, 0);
const currentTime = new Date(shuttleSecondArrivalTimeAtSecondStop.getTime() + 3 * 60 * 1000);
repository.setReferenceTime(currentTime);
repository.startListeningForUpdates();
await populateTravelTimeDataForStops({ currentTime, shuttle, stop1, stop2, stop3 });
// Populating ETA data