Change SHUTTLE_WILL_ARRIVE_AT_STOP to return payload of last stop and current stop, to avoid data race

This commit is contained in:
2025-11-13 18:52:13 -08:00
parent 0cf2a4d2e7
commit d6ad90ee7a
6 changed files with 60 additions and 36 deletions

View File

@@ -405,7 +405,10 @@ export class RedisShuttleRepository extends BaseRedisRepository implements Shutt
timestamp: new Date(travelTimeTimestamp), timestamp: new Date(travelTimeTimestamp),
shuttleId: shuttle.id, shuttleId: shuttle.id,
}; };
this.emit(ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP, shuttleArrival); this.emit(ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP, {
lastArrival: lastStop,
currentArrival: shuttleArrival,
});
await this.updateShuttleLastStopArrival(shuttleArrival); await this.updateShuttleLastStopArrival(shuttleArrival);
} }
} }

View File

@@ -12,10 +12,15 @@ export type ShuttleRepositoryEventName = typeof ShuttleRepositoryEvent[keyof typ
export type EtaRemovedEventPayload = IEta; export type EtaRemovedEventPayload = IEta;
export type EtaDataClearedEventPayload = IEta[]; export type EtaDataClearedEventPayload = IEta[];
export interface WillArriveAtStopPayload {
lastArrival?: ShuttleStopArrival;
currentArrival: ShuttleStopArrival;
};
export interface ShuttleRepositoryEventPayloads { export interface ShuttleRepositoryEventPayloads {
[ShuttleRepositoryEvent.SHUTTLE_UPDATED]: IShuttle, [ShuttleRepositoryEvent.SHUTTLE_UPDATED]: IShuttle,
[ShuttleRepositoryEvent.SHUTTLE_REMOVED]: IShuttle, [ShuttleRepositoryEvent.SHUTTLE_REMOVED]: IShuttle,
[ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP]: ShuttleStopArrival, [ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP]: WillArriveAtStopPayload,
} }
export type ShuttleRepositoryEventListener<T extends ShuttleRepositoryEventName> = ( export type ShuttleRepositoryEventListener<T extends ShuttleRepositoryEventName> = (

View File

@@ -177,12 +177,19 @@ export class UnoptimizedInMemoryShuttleRepository
const arrivedStop = await this.getArrivedStopIfExists(shuttle); const arrivedStop = await this.getArrivedStopIfExists(shuttle);
if (arrivedStop != undefined) { if (arrivedStop != undefined) {
// stop if same stop
const lastStop = await this.getShuttleLastStopArrival(shuttle.id);
if (lastStop?.stopId === arrivedStop.id) return;
const shuttleArrival = { const shuttleArrival = {
stopId: arrivedStop.id, stopId: arrivedStop.id,
timestamp: new Date(travelTimeTimestamp), timestamp: new Date(travelTimeTimestamp),
shuttleId: shuttle.id, shuttleId: shuttle.id,
}; };
this.emit(ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP, shuttleArrival); this.emit(ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP, {
lastArrival: lastStop,
currentArrival: shuttleArrival,
});
await this.updateShuttleLastStopArrival(shuttleArrival); await this.updateShuttleLastStopArrival(shuttleArrival);
} }
} }

View File

@@ -734,9 +734,11 @@ describe.each(repositoryImplementations)('$name', (holder) => {
expect(listener).toHaveBeenCalledTimes(1); expect(listener).toHaveBeenCalledTimes(1);
const emittedPayload = listener.mock.calls[0][0] as any; const emittedPayload = listener.mock.calls[0][0] as any;
expect(emittedPayload.shuttleId).toBe(shuttle.id); expect(emittedPayload.currentArrival).toEqual({
expect(emittedPayload.stopId).toBe(stop1.id); shuttleId: shuttle.id,
expect(emittedPayload.timestamp.getTime()).toBe(arrivalTime.getTime()); stopId: stop1.id,
timestamp: arrivalTime,
});
}); });
test("does not emit event when shuttle is not at a stop", async () => { test("does not emit event when shuttle is not at a stop", async () => {
@@ -786,14 +788,18 @@ describe.each(repositoryImplementations)('$name', (holder) => {
expect(listener).toHaveBeenCalledTimes(2); expect(listener).toHaveBeenCalledTimes(2);
const firstPayload = listener.mock.calls[0][0] as any; const firstPayload = listener.mock.calls[0][0] as any;
expect(firstPayload.shuttleId).toBe(shuttle.id); expect(firstPayload.currentArrival).toEqual({
expect(firstPayload.stopId).toBe(stop1.id); shuttleId: shuttle.id,
expect(firstPayload.timestamp.getTime()).toBe(firstArrivalTime.getTime()); stopId: stop1.id,
timestamp: firstArrivalTime,
});
const secondPayload = listener.mock.calls[1][0] as any; const secondPayload = listener.mock.calls[1][0] as any;
expect(secondPayload.shuttleId).toBe(shuttle.id); expect(secondPayload.currentArrival).toEqual({
expect(secondPayload.stopId).toBe(stop2.id); shuttleId: shuttle.id,
expect(secondPayload.timestamp.getTime()).toBe(secondArrivalTime.getTime()); stopId: stop2.id,
timestamp: secondArrivalTime,
});
}); });
}); });
}); });

View File

@@ -1,5 +1,5 @@
import { SelfUpdatingETARepository } from "./SelfUpdatingETARepository"; import { SelfUpdatingETARepository } from "./SelfUpdatingETARepository";
import { ShuttleGetterRepository, ShuttleRepositoryEvent, ShuttleStopArrival, ShuttleTravelTimeDataIdentifier, ShuttleTravelTimeDateFilterArguments } from "../ShuttleGetterRepository"; import { ShuttleGetterRepository, ShuttleRepositoryEvent, ShuttleStopArrival, ShuttleTravelTimeDataIdentifier, ShuttleTravelTimeDateFilterArguments, WillArriveAtStopPayload } from "../ShuttleGetterRepository";
import { BaseInMemoryETARepository } from "./BaseInMemoryETARepository"; import { BaseInMemoryETARepository } from "./BaseInMemoryETARepository";
import { IOrderedStop, IShuttle } from "../../../entities/ShuttleRepositoryEntities"; import { IOrderedStop, IShuttle } from "../../../entities/ShuttleRepositoryEntities";
import { ETARepositoryEvent } from "./ETAGetterRepository"; import { ETARepositoryEvent } from "./ETAGetterRepository";
@@ -155,26 +155,28 @@ export class InMemorySelfUpdatingETARepository extends BaseInMemoryETARepository
) )
} }
private async handleShuttleWillArriveAtStop(shuttleArrival: ShuttleStopArrival): Promise<void> { private async handleShuttleWillArriveAtStop({
const etas = await this.getEtasForShuttleId(shuttleArrival.shuttleId); lastArrival,
currentArrival,
}: WillArriveAtStopPayload): Promise<void> {
const etas = await this.getEtasForShuttleId(currentArrival.shuttleId);
for (const eta of etas) { for (const eta of etas) {
await this.removeEtaIfExists(eta.shuttleId, eta.stopId); await this.removeEtaIfExists(eta.shuttleId, eta.stopId);
} }
const lastStopTimestamp = await this.shuttleRepository.getShuttleLastStopArrival(shuttleArrival.shuttleId); if (lastArrival) {
if (lastStopTimestamp) {
// disallow cases where this gets triggered multiple times // disallow cases where this gets triggered multiple times
if (lastStopTimestamp.stopId === shuttleArrival.stopId) return; if (lastArrival.stopId === currentArrival.stopId) return;
const shuttle = await this.shuttleRepository.getShuttleById(lastStopTimestamp.shuttleId); const shuttle = await this.shuttleRepository.getShuttleById(lastArrival.shuttleId);
if (!shuttle) return; if (!shuttle) return;
const routeId = shuttle.routeId; const routeId = shuttle.routeId;
const fromStopId = lastStopTimestamp.stopId; const fromStopId = lastArrival.stopId;
const toStopId = shuttleArrival.stopId; const toStopId = currentArrival.stopId;
const travelTimeSeconds = (shuttleArrival.timestamp.getTime() - lastStopTimestamp.timestamp.getTime()) / 1000; const travelTimeSeconds = (currentArrival.timestamp.getTime() - lastArrival.timestamp.getTime()) / 1000;
await this.addTravelTimeDataPoint({ routeId, fromStopId, toStopId }, travelTimeSeconds, shuttleArrival.timestamp.getTime()); await this.addTravelTimeDataPoint({ routeId, fromStopId, toStopId }, travelTimeSeconds, currentArrival.timestamp.getTime());
} }
} }

View File

@@ -1,7 +1,7 @@
import { SelfUpdatingETARepository } from "./SelfUpdatingETARepository"; import { SelfUpdatingETARepository } from "./SelfUpdatingETARepository";
import { BaseRedisETARepository } from "./BaseRedisETARepository"; import { BaseRedisETARepository } from "./BaseRedisETARepository";
import { createClient, RedisClientType } from "redis"; import { createClient, RedisClientType } from "redis";
import { ShuttleGetterRepository, ShuttleRepositoryEvent, ShuttleStopArrival, ShuttleTravelTimeDataIdentifier, ShuttleTravelTimeDateFilterArguments } from "../ShuttleGetterRepository"; import { ShuttleGetterRepository, ShuttleRepositoryEvent, ShuttleStopArrival, ShuttleTravelTimeDataIdentifier, ShuttleTravelTimeDateFilterArguments, WillArriveAtStopPayload } from "../ShuttleGetterRepository";
import { REDIS_RECONNECT_INTERVAL } from "../../../environment"; import { REDIS_RECONNECT_INTERVAL } from "../../../environment";
import { IEta, IOrderedStop, IShuttle } from "../../../entities/ShuttleRepositoryEntities"; import { IEta, IOrderedStop, IShuttle } from "../../../entities/ShuttleRepositoryEntities";
import { ETARepositoryEvent } from "./ETAGetterRepository"; import { ETARepositoryEvent } from "./ETAGetterRepository";
@@ -174,28 +174,29 @@ export class RedisSelfUpdatingETARepository extends BaseRedisETARepository imple
} }
private async handleShuttleWillArriveAtStop( private async handleShuttleWillArriveAtStop({
shuttleArrival: ShuttleStopArrival, lastArrival,
) { currentArrival,
const etas = await this.getEtasForShuttleId(shuttleArrival.shuttleId); }: WillArriveAtStopPayload) {
const etas = await this.getEtasForShuttleId(currentArrival.shuttleId);
for (const eta of etas) { for (const eta of etas) {
await this.removeEtaIfExists(eta.shuttleId, eta.stopId); await this.removeEtaIfExists(eta.shuttleId, eta.stopId);
} }
const lastStopTimestamp = await this.shuttleRepository.getShuttleLastStopArrival(shuttleArrival.shuttleId); // only update time traveled if last arrival exists
if (lastStopTimestamp) { if (lastArrival) {
// disallow cases where this gets triggered multiple times // disallow cases where this gets triggered multiple times
if (lastStopTimestamp.stopId === shuttleArrival.stopId) return; if (lastArrival.stopId === currentArrival.stopId) return;
const shuttle = await this.shuttleRepository.getShuttleById(lastStopTimestamp.shuttleId); const shuttle = await this.shuttleRepository.getShuttleById(lastArrival.shuttleId);
if (!shuttle) return; if (!shuttle) return;
const routeId = shuttle.routeId; const routeId = shuttle.routeId;
const fromStopId = lastStopTimestamp.stopId; const fromStopId = lastArrival.stopId;
const toStopId = shuttleArrival.stopId; const toStopId = currentArrival.stopId;
const travelTimeSeconds = (shuttleArrival.timestamp.getTime() - lastStopTimestamp.timestamp.getTime()) / 1000; const travelTimeSeconds = (currentArrival.timestamp.getTime() - lastArrival.timestamp.getTime()) / 1000;
await this.addTravelTimeDataPoint({ routeId, fromStopId, toStopId, }, travelTimeSeconds, shuttleArrival.timestamp.getTime()); await this.addTravelTimeDataPoint({ routeId, fromStopId, toStopId, }, travelTimeSeconds, currentArrival.timestamp.getTime());
} }
} }