diff --git a/package.json b/package.json index f0a7798..bd558b4 100644 --- a/package.json +++ b/package.json @@ -4,8 +4,8 @@ "description": "", "main": "dist/index.js", "scripts": { - "build:dev": "npm install --include=dev && npm run generate && tsc", - "build": "npm install --include=dev && npm run generate && tsc && npm prune --omit=dev", + "build:dev": "npm install --include=dev && npm run generate && tsc --project tsconfig.build.json", + "build": "npm install --include=dev && npm run generate && tsc --project tsconfig.build.json && npm prune --omit=dev", "start:dev": "npm run build:dev && node ./dist/index.js", "start": "npm run build && node ./dist/index.js", "generate": "graphql-codegen --config codegen.ts", diff --git a/src/entities/InterchangeSystem.ts b/src/entities/InterchangeSystem.ts index 985a819..c0f8383 100644 --- a/src/entities/InterchangeSystem.ts +++ b/src/entities/InterchangeSystem.ts @@ -23,6 +23,7 @@ import { RedisExternalSourceETARepository } from "../repositories/shuttle/eta/Re import { InMemorySelfUpdatingETARepository } from "../repositories/shuttle/eta/InMemorySelfUpdatingETARepository"; import { BaseRedisETARepository } from "../repositories/shuttle/eta/BaseRedisETARepository"; import { BaseInMemoryETARepository } from "../repositories/shuttle/eta/BaseInMemoryETARepository"; +import createRedisClientForRepository from "../helpers/createRedisClientForRepository"; export interface InterchangeSystemBuilderArguments { name: string; @@ -46,7 +47,13 @@ export interface InterchangeSystemBuilderArguments { * Controls whether to self-calculate ETAs or use the external * shuttle provider for them. */ - useSelfUpdatingEtas: boolean + useSelfUpdatingEtas: boolean; + + /** + * The size of the threshold to detect when a shuttle has arrived + * at a stop, in latitude/longitude degrees. + */ + shuttleStopArrivalDegreeDelta: number; } export class InterchangeSystem { @@ -98,7 +105,10 @@ export class InterchangeSystem { } private static async buildRedisShuttleLoaderAndRepositories(args: InterchangeSystemBuilderArguments) { - const shuttleRepository = new RedisShuttleRepository(); + const shuttleRepository = new RedisShuttleRepository( + createRedisClientForRepository(), + args.shuttleStopArrivalDegreeDelta, + ); await shuttleRepository.connect(); let etaRepository: BaseRedisETARepository; @@ -247,7 +257,9 @@ export class InterchangeSystem { } private static buildInMemoryShuttleLoaderAndRepositories(args: InterchangeSystemBuilderArguments) { - const shuttleRepository = new UnoptimizedInMemoryShuttleRepository(); + const shuttleRepository = new UnoptimizedInMemoryShuttleRepository( + args.shuttleStopArrivalDegreeDelta, + ); let etaRepository: BaseInMemoryETARepository; let shuttleDataLoader: ApiBasedShuttleRepositoryLoader; diff --git a/src/helpers/createRedisClientForRepository.ts b/src/helpers/createRedisClientForRepository.ts new file mode 100644 index 0000000..55312fd --- /dev/null +++ b/src/helpers/createRedisClientForRepository.ts @@ -0,0 +1,14 @@ +import { createClient, RedisClientType } from "redis"; +import { REDIS_RECONNECT_INTERVAL } from "../environment"; + +export default function createRedisClientForRepository() { + const client = createClient({ + url: process.env.REDIS_URL, + socket: { + tls: process.env.NODE_ENV === 'production', + rejectUnauthorized: false, + reconnectStrategy: REDIS_RECONNECT_INTERVAL, + }, + }); + return client as RedisClientType; +} diff --git a/src/index.ts b/src/index.ts index a8fd797..7340b04 100644 --- a/src/index.ts +++ b/src/index.ts @@ -24,6 +24,7 @@ const supportedSystems: InterchangeSystemBuilderArguments[] = [ parkingSystemId: ChapmanApiBasedParkingRepositoryLoader.id, name: "Chapman University", useSelfUpdatingEtas: true, + shuttleStopArrivalDegreeDelta: 0.001, } ] diff --git a/src/loaders/shuttle/ApiBasedShuttleRepositoryLoader.ts b/src/loaders/shuttle/ApiBasedShuttleRepositoryLoader.ts index 19d4a14..bef753a 100644 --- a/src/loaders/shuttle/ApiBasedShuttleRepositoryLoader.ts +++ b/src/loaders/shuttle/ApiBasedShuttleRepositoryLoader.ts @@ -37,9 +37,9 @@ export class ApiBasedShuttleRepositoryLoader implements ShuttleRepositoryLoader await this.updateStopAndPolylineDataForRoutesInSystem(); await this.updateShuttleDataForSystemBasedOnProximityToRoutes(); - // Because ETA method doesn't support pruning yet, - // add a call to the clear method here - await this.updateEtaDataForExistingStopsForSystem(); + if (this.etaRepository) { + await this.updateEtaDataForExistingStopsForSystem(); + } } public async updateRouteDataForSystem() { diff --git a/src/repositories/BaseRedisRepository.ts b/src/repositories/BaseRedisRepository.ts index f722586..c2ec079 100644 --- a/src/repositories/BaseRedisRepository.ts +++ b/src/repositories/BaseRedisRepository.ts @@ -1,19 +1,12 @@ -import { createClient, RedisClientType } from 'redis'; -import { REDIS_RECONNECT_INTERVAL } from "../environment"; +import { RedisClientType } from 'redis'; import { EventEmitter } from 'stream'; +import createRedisClientForRepository from '../helpers/createRedisClientForRepository'; export abstract class BaseRedisRepository extends EventEmitter { protected redisClient; constructor( - redisClient: RedisClientType = createClient({ - url: process.env.REDIS_URL, - socket: { - tls: process.env.NODE_ENV === 'production', - rejectUnauthorized: false, - reconnectStrategy: REDIS_RECONNECT_INTERVAL, - }, - }), + redisClient: RedisClientType = createRedisClientForRepository(), ) { super(); this.redisClient = redisClient; diff --git a/src/repositories/shuttle/RedisShuttleRepository.ts b/src/repositories/shuttle/RedisShuttleRepository.ts index 763dde5..3fb63c0 100644 --- a/src/repositories/shuttle/RedisShuttleRepository.ts +++ b/src/repositories/shuttle/RedisShuttleRepository.ts @@ -10,8 +10,17 @@ import { ShuttleTravelTimeDateFilterArguments } from "./ShuttleGetterRepository"; import { BaseRedisRepository } from "../BaseRedisRepository"; +import { RedisClientType } from "redis"; +import createRedisClientForRepository from "../../helpers/createRedisClientForRepository"; export class RedisShuttleRepository extends BaseRedisRepository implements ShuttleGetterSetterRepository { + constructor( + redisClient: RedisClientType = createRedisClientForRepository(), + readonly shuttleStopArrivalDegreeDelta: number = 0.001, + ) { + super(redisClient); + } + get isReady() { return this.redisClient.isReady; } @@ -73,16 +82,35 @@ export class RedisShuttleRepository extends BaseRedisRepository implements Shutt return super.emit(event, ...args); } + // Key prefixes for individual entity keys + private readonly stopKeyPrefix = 'shuttle:stop:'; + private readonly routeKeyPrefix = 'shuttle:route:'; + private readonly shuttleKeyPrefix = 'shuttle:shuttle:'; + private readonly orderedStopKeyPrefix = 'shuttle:orderedstop:'; + private readonly lastStopKeyPrefix = 'shuttle:laststop:'; + private readonly historicalEtaKeyPrefix = 'shuttle:eta:historical:'; + + // Key patterns for bulk operations (e.g., getting all keys, clearing data) + private readonly stopKeyPattern = 'shuttle:stop:*'; + private readonly routeKeyPattern = 'shuttle:route:*'; + private readonly shuttleKeyPattern = 'shuttle:shuttle:*'; + private readonly orderedStopKeyPattern = 'shuttle:orderedstop:*'; + private readonly lastStopKeyPattern = 'shuttle:laststop:*'; + + /** + * Represents a set storing the shuttles that are currently at a stop. + */ + private readonly shuttleIsAtStopKey = 'shuttle:atstop'; + // Helper methods for Redis key generation - private createStopKey = (stopId: string) => `shuttle:stop:${stopId}`; - private createRouteKey = (routeId: string) => `shuttle:route:${routeId}`; - private createShuttleKey = (shuttleId: string) => `shuttle:shuttle:${shuttleId}`; - private createEtaKey = (shuttleId: string, stopId: string) => `shuttle:eta:${shuttleId}:${stopId}`; - private createOrderedStopKey = (routeId: string, stopId: string) => `shuttle:orderedstop:${routeId}:${stopId}`; - private createShuttleLastStopKey = (shuttleId: string) => `shuttle:laststop:${shuttleId}`; - private createHistoricalEtaTimeSeriesKey = (routeId: string, fromStopId: string, toStopId: string) => { - return `shuttle:eta:historical:${routeId}:${fromStopId}:${toStopId}`; - } + private readonly createStopKey = (stopId: string) => `${this.stopKeyPrefix}${stopId}`; + private readonly createRouteKey = (routeId: string) => `${this.routeKeyPrefix}${routeId}`; + private readonly createShuttleKey = (shuttleId: string) => `${this.shuttleKeyPrefix}${shuttleId}`; + private readonly createOrderedStopKey = (routeId: string, stopId: string) => `${this.orderedStopKeyPrefix}${routeId}:${stopId}`; + private readonly createShuttleLastStopKey = (shuttleId: string) => `${this.lastStopKeyPrefix}${shuttleId}`; + private readonly createHistoricalEtaTimeSeriesKey = (routeId: string, fromStopId: string, toStopId: string) => { + return `${this.historicalEtaKeyPrefix}${routeId}:${fromStopId}:${toStopId}`; + }; // Helper methods for converting entities to Redis hashes private createRedisHashFromStop = (stop: IStop): Record => ({ @@ -214,7 +242,7 @@ export class RedisShuttleRepository extends BaseRedisRepository implements Shutt // Getter methods public async getStops(): Promise { - const keys = await this.redisClient.keys('shuttle:stop:*'); + const keys = await this.redisClient.keys(this.stopKeyPattern); const stops: IStop[] = []; for (const key of keys) { @@ -239,7 +267,7 @@ export class RedisShuttleRepository extends BaseRedisRepository implements Shutt } public async getRoutes(): Promise { - const keys = await this.redisClient.keys('shuttle:route:*'); + const keys = await this.redisClient.keys(this.routeKeyPattern); const routes: IRoute[] = []; for (const key of keys) { @@ -264,7 +292,7 @@ export class RedisShuttleRepository extends BaseRedisRepository implements Shutt } public async getShuttles(): Promise { - const keys = await this.redisClient.keys('shuttle:shuttle:*'); + const keys = await this.redisClient.keys(this.shuttleKeyPattern); const shuttles: IShuttle[] = []; for (const key of keys) { @@ -293,45 +321,6 @@ export class RedisShuttleRepository extends BaseRedisRepository implements Shutt return this.createShuttleFromRedisData(data); } - public async getEtasForShuttleId(shuttleId: string): Promise { - const keys = await this.redisClient.keys(`shuttle:eta:${shuttleId}:*`); - const etas: IEta[] = []; - - for (const key of keys) { - const data = await this.redisClient.hGetAll(key); - if (Object.keys(data).length > 0) { - etas.push(this.createEtaFromRedisData(data)); - } - } - - return etas; - } - - public async getEtasForStopId(stopId: string): Promise { - const keys = await this.redisClient.keys('shuttle:eta:*'); - const etas: IEta[] = []; - - for (const key of keys) { - const data = await this.redisClient.hGetAll(key); - if (Object.keys(data).length > 0 && data.stopId === stopId) { - etas.push(this.createEtaFromRedisData(data)); - } - } - - return etas; - } - - public async getEtaForShuttleAndStopId(shuttleId: string, stopId: string): Promise { - const key = this.createEtaKey(shuttleId, stopId); - const data = await this.redisClient.hGetAll(key); - - if (Object.keys(data).length === 0) { - return null; - } - - return this.createEtaFromRedisData(data); - } - public async getOrderedStopByRouteAndStopId(routeId: string, stopId: string): Promise { const key = this.createOrderedStopKey(routeId, stopId); const data = await this.redisClient.hGetAll(key); @@ -344,7 +333,7 @@ export class RedisShuttleRepository extends BaseRedisRepository implements Shutt } public async getOrderedStopsByStopId(stopId: string): Promise { - const keys = await this.redisClient.keys('shuttle:orderedstop:*'); + const keys = await this.redisClient.keys(this.orderedStopKeyPattern); const orderedStops: IOrderedStop[] = []; for (const key of keys) { @@ -358,7 +347,7 @@ export class RedisShuttleRepository extends BaseRedisRepository implements Shutt } public async getOrderedStopsByRouteId(routeId: string): Promise { - const keys = await this.redisClient.keys(`shuttle:orderedstop:${routeId}:*`); + const keys = await this.redisClient.keys(`${this.orderedStopKeyPrefix}${routeId}:*`); const orderedStops: IOrderedStop[] = []; for (const key of keys) { @@ -393,26 +382,58 @@ export class RedisShuttleRepository extends BaseRedisRepository implements Shutt shuttle: IShuttle, travelTimeTimestamp = Date.now(), ) { - const arrivedStop = await this.getArrivedStopIfExists(shuttle); + const isAtStop = await this.checkIfShuttleIsAtStop(shuttle.id); + + let arrivedStop: IStop | undefined; + + if (isAtStop) { + // Allow retrieval of the shuttle's current stop + // Will still return undefined when the shuttle leaves the stop + arrivedStop = await this.getArrivedStopIfNextStop(shuttle, true); + } else { + arrivedStop = await this.getArrivedStopIfNextStop(shuttle, false); + } + + // Will not fire *any* events if the same stop + const lastStop = await this.getShuttleLastStopArrival(shuttle.id); + if (lastStop?.stopId === arrivedStop?.id) return; + + if (isAtStop) { + if (lastStop) { + this.emit(ShuttleRepositoryEvent.SHUTTLE_WILL_LEAVE_STOP, { + stopArrivalThatShuttleIsLeaving: lastStop, + }); + } + await this.markShuttleAsNotAtStop(shuttle.id); + } if (arrivedStop) { - // stop if same stop - const lastStop = await this.getShuttleLastStopArrival(shuttle.id); - if (lastStop?.stopId === arrivedStop.id) return; - const shuttleArrival = { stopId: arrivedStop.id, timestamp: new Date(travelTimeTimestamp), shuttleId: shuttle.id, }; this.emit(ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP, { - lastArrival: lastStop, - currentArrival: shuttleArrival, + lastStopArrival: lastStop, + willArriveAt: shuttleArrival, }); + await this.markShuttleAsAtStop(shuttleArrival.shuttleId); await this.updateShuttleLastStopArrival(shuttleArrival); } } + private async markShuttleAsAtStop(shuttleId: string) { + await this.redisClient.sAdd(this.shuttleIsAtStopKey, shuttleId); + } + + private async markShuttleAsNotAtStop(shuttleId: string) { + await this.redisClient.sRem(this.shuttleIsAtStopKey, shuttleId); + } + + public async checkIfShuttleIsAtStop(shuttleId: string) { + return await this.redisClient.sIsMember(this.shuttleIsAtStopKey, shuttleId); + } + public async getAverageTravelTimeSeconds( { routeId, fromStopId, toStopId }: ShuttleTravelTimeDataIdentifier, { from, to }: ShuttleTravelTimeDateFilterArguments, @@ -445,28 +466,27 @@ export class RedisShuttleRepository extends BaseRedisRepository implements Shutt } } - /** - * Get the stop that the shuttle is currently at, if it exists. - * - * If the shuttle has a "last stop", it will only return the stop - * directly after the last stop. Otherwise, it may return any stop that - * is on the shuttle's route. - * - * @param shuttle - * @param delta - * @returns - */ - public async getArrivedStopIfExists( + public async getArrivedStopIfNextStop( shuttle: IShuttle, - delta = 0.001, + canReturnShuttleCurrentStop: boolean = false, ): Promise { - const lastStop = await this.getShuttleLastStopArrival(shuttle.id); - if (lastStop) { - const lastOrderedStop = await this.getOrderedStopByRouteAndStopId(shuttle.routeId, lastStop.stopId); + const degreeDelta = this.shuttleStopArrivalDegreeDelta; + + const lastStopArrival = await this.getShuttleLastStopArrival(shuttle.id); + if (lastStopArrival) { + // Return the shuttle's current stop depending on the flag + if (canReturnShuttleCurrentStop) { + const lastStop = await this.getStopById(lastStopArrival.stopId); + if (lastStop && shuttleHasArrivedAtStop(shuttle, lastStop, degreeDelta)) { + return lastStop; + } + } + + const lastOrderedStop = await this.getOrderedStopByRouteAndStopId(shuttle.routeId, lastStopArrival.stopId); const orderedStopAfter = lastOrderedStop?.nextStop; if (orderedStopAfter) { const stopAfter = await this.getStopById(orderedStopAfter.stopId); - if (stopAfter && shuttleHasArrivedAtStop(shuttle, stopAfter, delta)) { + if (stopAfter && shuttleHasArrivedAtStop(shuttle, stopAfter, degreeDelta)) { return stopAfter; } } @@ -475,7 +495,7 @@ export class RedisShuttleRepository extends BaseRedisRepository implements Shutt for (const orderedStop of orderedStops) { const stop = await this.getStopById(orderedStop.stopId); - if (stop != null && shuttleHasArrivedAtStop(shuttle, stop, delta)) { + if (stop != null && shuttleHasArrivedAtStop(shuttle, stop, degreeDelta)) { return stop; } } @@ -536,6 +556,7 @@ export class RedisShuttleRepository extends BaseRedisRepository implements Shutt this.emit(ShuttleRepositoryEvent.SHUTTLE_REMOVED, shuttle); await this.removeShuttleLastStopIfExists(shuttleId); + await this.markShuttleAsNotAtStop(shuttleId); return shuttle; } @@ -573,39 +594,36 @@ export class RedisShuttleRepository extends BaseRedisRepository implements Shutt } // Clear methods - public async clearShuttleData(): Promise { - const keys = await this.redisClient.keys('shuttle:shuttle:*'); + private async clearRedisKeys(pattern: string): Promise { + const keys = await this.redisClient.keys(pattern); if (keys.length > 0) { await this.redisClient.del(keys); } + } + + public async clearShuttleData(): Promise { + await this.clearRedisKeys(this.shuttleKeyPattern); await this.clearShuttleLastStopData(); + await this.clearShuttleIsAtStopData(); } public async clearOrderedStopData(): Promise { - const keys = await this.redisClient.keys('shuttle:orderedstop:*'); - if (keys.length > 0) { - await this.redisClient.del(keys); - } + await this.clearRedisKeys(this.orderedStopKeyPattern); } public async clearRouteData(): Promise { - const keys = await this.redisClient.keys('shuttle:route:*'); - if (keys.length > 0) { - await this.redisClient.del(keys); - } + await this.clearRedisKeys(this.routeKeyPattern); } public async clearStopData(): Promise { - const keys = await this.redisClient.keys('shuttle:stop:*'); - if (keys.length > 0) { - await this.redisClient.del(keys); - } + await this.clearRedisKeys(this.stopKeyPattern); } private async clearShuttleLastStopData(): Promise { - const keys = await this.redisClient.keys('shuttle:laststop:*'); - if (keys.length > 0) { - await this.redisClient.del(keys); - } + await this.clearRedisKeys(this.lastStopKeyPattern); + } + + private async clearShuttleIsAtStopData(): Promise { + await this.clearRedisKeys(this.shuttleIsAtStopKey); } } diff --git a/src/repositories/shuttle/ShuttleGetterRepository.ts b/src/repositories/shuttle/ShuttleGetterRepository.ts index 7fbab0c..91f51e3 100644 --- a/src/repositories/shuttle/ShuttleGetterRepository.ts +++ b/src/repositories/shuttle/ShuttleGetterRepository.ts @@ -5,6 +5,7 @@ export const ShuttleRepositoryEvent = { SHUTTLE_UPDATED: "shuttleUpdated", SHUTTLE_REMOVED: "shuttleRemoved", SHUTTLE_WILL_ARRIVE_AT_STOP: "shuttleArrivedAtStop", + SHUTTLE_WILL_LEAVE_STOP: "shuttleWillLeaveStop", } as const; export type ShuttleRepositoryEventName = typeof ShuttleRepositoryEvent[keyof typeof ShuttleRepositoryEvent]; @@ -12,15 +13,22 @@ export type ShuttleRepositoryEventName = typeof ShuttleRepositoryEvent[keyof typ export type EtaRemovedEventPayload = IEta; export type EtaDataClearedEventPayload = IEta[]; -export interface WillArriveAtStopPayload { - lastArrival?: ShuttleStopArrival; - currentArrival: ShuttleStopArrival; +export interface ShuttleWillArriveAtStopPayload { + lastStopArrival?: ShuttleStopArrival; + willArriveAt: ShuttleStopArrival; }; +export type ShuttleIsNearStopPayload = ShuttleWillArriveAtStopPayload; + +export interface ShuttleWillLeaveStopPayload { + stopArrivalThatShuttleIsLeaving: ShuttleStopArrival; +} + export interface ShuttleRepositoryEventPayloads { [ShuttleRepositoryEvent.SHUTTLE_UPDATED]: IShuttle, [ShuttleRepositoryEvent.SHUTTLE_REMOVED]: IShuttle, - [ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP]: WillArriveAtStopPayload, + [ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP]: ShuttleWillArriveAtStopPayload, + [ShuttleRepositoryEvent.SHUTTLE_WILL_LEAVE_STOP]: ShuttleWillLeaveStopPayload, } export type ShuttleRepositoryEventListener = ( @@ -88,10 +96,24 @@ export interface ShuttleGetterRepository extends EventEmitter { getShuttleLastStopArrival(shuttleId: string): Promise; /** - * Check if a shuttle has arrived at a stop within the given delta. - * Returns the stop if the shuttle is at a stop, otherwise undefined. - * @param shuttle - * @param delta - The coordinate delta tolerance (default 0.001) + * Determine if the shuttle is currently at a stop. + * If `true`, then calling `getShuttleLastStopArrival` will get + * the stop the shuttle is currently at. + * @param shuttleId */ - getArrivedStopIfExists(shuttle: IShuttle, delta?: number): Promise; + checkIfShuttleIsAtStop(shuttleId: string): Promise; + + /** + * Get the stop that the shuttle is currently at, if it's the shuttle's + * next stop based on the "last stop" the shuttle was at. If there was no + * "last stop" for the shuttle, it may return any stop on the shuttle's route. + * + * @param shuttle + * @param canReturnShuttleCurrentStop If set to true, and the shuttle's "last stop" + * matches the arrived stop, continue to return the arrived stop. + * Otherwise, only return the shuttle's next stop. + * This flag has no effect if the shuttle has not had a "last stop". + * @returns + */ + getArrivedStopIfNextStop(shuttle: IShuttle, canReturnShuttleCurrentStop: boolean): Promise; } diff --git a/src/repositories/shuttle/UnoptimizedInMemoryShuttleRepository.ts b/src/repositories/shuttle/UnoptimizedInMemoryShuttleRepository.ts index a3e7dac..5fe340e 100644 --- a/src/repositories/shuttle/UnoptimizedInMemoryShuttleRepository.ts +++ b/src/repositories/shuttle/UnoptimizedInMemoryShuttleRepository.ts @@ -20,6 +20,13 @@ import { export class UnoptimizedInMemoryShuttleRepository extends EventEmitter implements ShuttleGetterSetterRepository { + + constructor( + readonly shuttleStopArrivalDegreeDelta: number = 0.001, + ) { + super() + } + public override on( event: T, listener: ShuttleRepositoryEventListener, @@ -74,6 +81,7 @@ export class UnoptimizedInMemoryShuttleRepository private orderedStops: IOrderedStop[] = []; private shuttleLastStopArrivals: Map = new Map(); private travelTimeData: Map> = new Map(); + private shuttlesAtStop: Set = new Set(); public async getStops(): Promise { return this.stops; @@ -174,26 +182,60 @@ export class UnoptimizedInMemoryShuttleRepository shuttle: IShuttle, travelTimeTimestamp = Date.now(), ) { - const arrivedStop = await this.getArrivedStopIfExists(shuttle); + const isAtStop = await this.checkIfShuttleIsAtStop(shuttle.id); - if (arrivedStop != undefined) { + let arrivedStop: IStop | undefined; + + if (isAtStop) { + // Allow retrieval of the shuttle's current stop + // Will still return undefined when the shuttle leaves the stop + arrivedStop = await this.getArrivedStopIfNextStop(shuttle, true); + } else { + arrivedStop = await this.getArrivedStopIfNextStop(shuttle, false); + } + + + // Will not fire *any* events if the same stop + const lastStop = await this.getShuttleLastStopArrival(shuttle.id); + if (lastStop?.stopId === arrivedStop?.id) return; + + if (isAtStop) { + if (lastStop) { + this.emit(ShuttleRepositoryEvent.SHUTTLE_WILL_LEAVE_STOP, { + stopArrivalThatShuttleIsLeaving: lastStop, + }); + } + await this.markShuttleAsNotAtStop(shuttle.id); + } + + if (arrivedStop) { // stop if same stop - const lastStop = await this.getShuttleLastStopArrival(shuttle.id); - if (lastStop?.stopId === arrivedStop.id) return; - const shuttleArrival = { stopId: arrivedStop.id, timestamp: new Date(travelTimeTimestamp), shuttleId: shuttle.id, }; this.emit(ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP, { - lastArrival: lastStop, - currentArrival: shuttleArrival, + lastStopArrival: lastStop, + willArriveAt: shuttleArrival, }); + await this.markShuttleAsAtStop(shuttleArrival.shuttleId); await this.updateShuttleLastStopArrival(shuttleArrival); } } + private async markShuttleAsAtStop(shuttleId: string) { + this.shuttlesAtStop.add(shuttleId); + } + + private async markShuttleAsNotAtStop(shuttleId: string) { + this.shuttlesAtStop.delete(shuttleId); + } + + public async checkIfShuttleIsAtStop(shuttleId: string) { + return this.shuttlesAtStop.has(shuttleId); + } + private async updateShuttleLastStopArrival(lastStopArrival: ShuttleStopArrival) { this.shuttleLastStopArrivals.set(lastStopArrival.shuttleId, lastStopArrival); @@ -225,18 +267,41 @@ export class UnoptimizedInMemoryShuttleRepository return sum / filteredPoints.length; } - public async getArrivedStopIfExists( + public async getArrivedStopIfNextStop( shuttle: IShuttle, - delta = 0.001, + canReturnShuttleCurrentStop: boolean = false, ): Promise { - const orderedStops = await this.getOrderedStopsByRouteId(shuttle.routeId); + const degreeDelta = this.shuttleStopArrivalDegreeDelta; - for (const orderedStop of orderedStops) { - const stop = await this.getStopById(orderedStop.stopId); - if (stop != null && shuttleHasArrivedAtStop(shuttle, stop, delta)) { - return stop; + const lastStopArrival = await this.getShuttleLastStopArrival(shuttle.id); + if (lastStopArrival) { + // Return the shuttle's current stop depending on the flag + if (canReturnShuttleCurrentStop) { + const lastStop = await this.getStopById(lastStopArrival.stopId); + if (lastStop && shuttleHasArrivedAtStop(shuttle, lastStop, degreeDelta)) { + return lastStop; + } + } + + const lastOrderedStop = await this.getOrderedStopByRouteAndStopId(shuttle.routeId, lastStopArrival.stopId); + const orderedStopAfter = lastOrderedStop?.nextStop; + if (orderedStopAfter) { + const stopAfter = await this.getStopById(orderedStopAfter.stopId); + if (stopAfter && shuttleHasArrivedAtStop(shuttle, stopAfter, degreeDelta)) { + return stopAfter; + } + } + } else { + const orderedStops = await this.getOrderedStopsByRouteId(shuttle.routeId); + + for (const orderedStop of orderedStops) { + const stop = await this.getStopById(orderedStop.stopId); + if (stop != null && shuttleHasArrivedAtStop(shuttle, stop, degreeDelta)) { + return stop; + } } } + return undefined; } @@ -267,6 +332,7 @@ export class UnoptimizedInMemoryShuttleRepository const shuttle = await this.removeEntityByIdIfExists(shuttleId, this.shuttles); if (shuttle != null) { this.emit(ShuttleRepositoryEvent.SHUTTLE_REMOVED, shuttle); + this.shuttlesAtStop.delete(shuttleId); await this.removeShuttleLastStopIfExists(shuttleId); } return shuttle; @@ -289,6 +355,7 @@ export class UnoptimizedInMemoryShuttleRepository public async clearShuttleData(): Promise { this.shuttles = []; + this.shuttlesAtStop.clear(); await this.clearShuttleLastStopData(); } diff --git a/src/repositories/shuttle/__tests__/ShuttleRepositorySharedTests.test.ts b/src/repositories/shuttle/__tests__/ShuttleRepositorySharedTests.test.ts index f875f2e..8520999 100644 --- a/src/repositories/shuttle/__tests__/ShuttleRepositorySharedTests.test.ts +++ b/src/repositories/shuttle/__tests__/ShuttleRepositorySharedTests.test.ts @@ -4,10 +4,10 @@ import { UnoptimizedInMemoryShuttleRepository } from "../UnoptimizedInMemoryShut import { ShuttleGetterSetterRepository } from "../ShuttleGetterSetterRepository"; import { RedisShuttleRepository } from "../RedisShuttleRepository"; import { - generateMockOrderedStops, - generateMockRoutes, - generateMockShuttles, - generateMockStops, + generateMockOrderedStops, + generateMockRoutes, + generateMockShuttles, + generateMockStops, } from "../../../../testHelpers/mockDataGenerators"; import { RepositoryHolder } from "../../../../testHelpers/RepositoryHolder"; import { setupRouteAndOrderedStopsForShuttleRepository } from "../../../../testHelpers/setupRouteAndOrderedStopsForShuttleRepository"; @@ -565,17 +565,7 @@ describe.each(repositoryImplementations)('$name', (holder) => { describe("addOrUpdateShuttle with shuttle tracking", () => { test("updates the shuttle's last stop arrival if shuttle is at a stop", async () => { - const { route, systemId, stop2 } = await setupRouteAndOrderedStops(); - - const shuttle = { - id: "sh1", - name: "Shuttle 1", - routeId: route.id, - systemId: systemId, - coordinates: stop2.coordinates, - orientationInDegrees: 0, - updatedTime: new Date(), - }; + const { stop2, sampleShuttleNotInRepository: shuttle } = await setupRouteAndOrderedStops(); await repository.addOrUpdateShuttle(shuttle); const lastStop = await repository.getShuttleLastStopArrival(shuttle.id); @@ -584,20 +574,10 @@ describe.each(repositoryImplementations)('$name', (holder) => { }); describe("getArrivedStopIfExists", () => { - test("gets the stop that the shuttle is currently at, if exists", async () => { - const { route, systemId, stop2 } = await setupRouteAndOrderedStops(); + test("gets any stop that the shuttle is currently at, if the shuttle has not had a last stop", async () => { + const { sampleShuttleNotInRepository: shuttle } = await setupRouteAndOrderedStops(); - const shuttle = { - id: "sh1", - name: "Shuttle 1", - routeId: route.id, - systemId: systemId, - coordinates: stop2.coordinates, - orientationInDegrees: 0, - updatedTime: new Date(), - }; - - const result = await repository.getArrivedStopIfExists(shuttle); + const result = await repository.getArrivedStopIfNextStop(shuttle, false); expect(result).toBeDefined(); expect(result?.id).toBe("st2"); @@ -605,37 +585,44 @@ describe.each(repositoryImplementations)('$name', (holder) => { }); test("returns undefined if shuttle is not currently at a stop", async () => { - const { route, systemId } = await setupRouteAndOrderedStops(); + const { sampleShuttleNotInRepository } = await setupRouteAndOrderedStops(); + const shuttle = { ...sampleShuttleNotInRepository, coordinates: { latitude: 12.5, longitude: 22.5 } }; // Not at any stop - const shuttle = { - id: "sh1", - name: "Shuttle 1", - routeId: route.id, - systemId: systemId, - coordinates: { latitude: 12.5, longitude: 22.5 }, - orientationInDegrees: 0, - updatedTime: new Date(), - }; - - const result = await repository.getArrivedStopIfExists(shuttle); + const result = await repository.getArrivedStopIfNextStop(shuttle, false); expect(result).toBeUndefined(); }); + + test("only gets the shuttle's next stop if shuttle has previously arrived at a stop", async () => { + const { sampleShuttleNotInRepository: shuttle, stop1, stop2 } = await setupRouteAndOrderedStops(); + + shuttle.coordinates = stop1.coordinates; + await repository.addOrUpdateShuttle(shuttle); + + let result = await repository.getArrivedStopIfNextStop(shuttle, false); + expect(result).toBeUndefined(); + + shuttle.coordinates = stop2.coordinates; + result = await repository.getArrivedStopIfNextStop(shuttle, false); + expect(result).not.toBeUndefined(); + }); + + test("returns the shuttle's currently arrived stop if flag passed", async () => { + const { sampleShuttleNotInRepository: shuttle, stop1 } = await setupRouteAndOrderedStops(); + + shuttle.coordinates = stop1.coordinates; + await repository.addOrUpdateShuttle(shuttle); + + const result = await repository.getArrivedStopIfNextStop(shuttle, true); + expect(result?.id === stop1.id) + }); + }); describe("getShuttleLastStopArrival", () => { test("gets the shuttle's last stop if existing in the data", async () => { - const { route, systemId, stop1 } = await setupRouteAndOrderedStops(); - - const shuttle = { - id: "sh1", - name: "Shuttle 1", - routeId: route.id, - systemId: systemId, - coordinates: stop1.coordinates, - orientationInDegrees: 0, - updatedTime: new Date(), - }; + const { stop1, sampleShuttleNotInRepository: shuttle } = await setupRouteAndOrderedStops(); + shuttle.coordinates = stop1.coordinates; const stopArrivalTime = new Date("2024-01-15T10:30:00Z"); await repository.addOrUpdateShuttle(shuttle, stopArrivalTime.getTime()); @@ -657,17 +644,8 @@ describe.each(repositoryImplementations)('$name', (holder) => { }); test("returns the most recent stop arrival when updated multiple times", async () => { - const { route, systemId, stop1, stop2 } = await setupRouteAndOrderedStops(); - - const shuttle = { - id: "sh1", - name: "Shuttle 1", - routeId: route.id, - systemId: systemId, - coordinates: stop1.coordinates, - orientationInDegrees: 0, - updatedTime: new Date(), - }; + const { stop1, stop2, sampleShuttleNotInRepository: shuttle } = await setupRouteAndOrderedStops(); + shuttle.coordinates = stop1.coordinates; const firstArrivalTime = new Date("2024-01-15T10:30:00Z"); await repository.addOrUpdateShuttle(shuttle, firstArrivalTime.getTime()); @@ -750,27 +728,19 @@ describe.each(repositoryImplementations)('$name', (holder) => { describe("SHUTTLE_WILL_ARRIVE_AT_STOP event", () => { test("emits SHUTTLE_WILL_ARRIVE_AT_STOP event before shuttle arrives at a stop", async () => { - const { route, systemId, stop1 } = await setupRouteAndOrderedStops(); + const { stop1, sampleShuttleNotInRepository: shuttle } = await setupRouteAndOrderedStops(); const listener = jest.fn(); repository.on(ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP, listener); - const shuttle = { - id: "sh1", - name: "Shuttle 1", - routeId: route.id, - systemId: systemId, - coordinates: stop1.coordinates, - orientationInDegrees: 0, - updatedTime: new Date(), - }; + shuttle.coordinates = stop1.coordinates; const arrivalTime = new Date("2024-01-15T10:30:00Z"); await repository.addOrUpdateShuttle(shuttle, arrivalTime.getTime()); expect(listener).toHaveBeenCalledTimes(1); const emittedPayload = listener.mock.calls[0][0] as any; - expect(emittedPayload.currentArrival).toEqual({ + expect(emittedPayload.willArriveAt).toEqual({ shuttleId: shuttle.id, stopId: stop1.id, timestamp: arrivalTime, @@ -778,20 +748,12 @@ describe.each(repositoryImplementations)('$name', (holder) => { }); test("does not emit event when shuttle is not at a stop", async () => { - const { route, systemId } = await setupRouteAndOrderedStops(); + const { sampleShuttleNotInRepository } = await setupRouteAndOrderedStops(); const listener = jest.fn(); repository.on(ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP, listener); - const shuttle = { - id: "sh1", - name: "Shuttle 1", - routeId: route.id, - systemId: systemId, - coordinates: { latitude: 12.5, longitude: 22.5 }, // Not at any stop - orientationInDegrees: 0, - updatedTime: new Date(), - }; + const shuttle = { ...sampleShuttleNotInRepository, coordinates: { latitude: 12.5, longitude: 22.5 } }; // Not at any stop await repository.addOrUpdateShuttle(shuttle); @@ -799,20 +761,12 @@ describe.each(repositoryImplementations)('$name', (holder) => { }); test("emits multiple events as shuttle visits multiple stops", async () => { - const { route, systemId, stop1, stop2 } = await setupRouteAndOrderedStops(); + const { stop1, stop2, sampleShuttleNotInRepository: shuttle } = await setupRouteAndOrderedStops(); const listener = jest.fn(); repository.on(ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP, listener); - const shuttle = { - id: "sh1", - name: "Shuttle 1", - routeId: route.id, - systemId: systemId, - coordinates: stop1.coordinates, - orientationInDegrees: 0, - updatedTime: new Date(), - }; + shuttle.coordinates = stop1.coordinates; const firstArrivalTime = new Date("2024-01-15T10:30:00Z"); await repository.addOrUpdateShuttle(shuttle, firstArrivalTime.getTime()); @@ -824,14 +778,101 @@ describe.each(repositoryImplementations)('$name', (holder) => { expect(listener).toHaveBeenCalledTimes(2); const firstPayload = listener.mock.calls[0][0] as any; - expect(firstPayload.currentArrival).toEqual({ + expect(firstPayload.willArriveAt).toEqual({ shuttleId: shuttle.id, stopId: stop1.id, timestamp: firstArrivalTime, }); const secondPayload = listener.mock.calls[1][0] as any; - expect(secondPayload.currentArrival).toEqual({ + expect(secondPayload.willArriveAt).toEqual({ + shuttleId: shuttle.id, + stopId: stop2.id, + timestamp: secondArrivalTime, + }); + }); + }); + + describe("SHUTTLE_WILL_LEAVE_STOP event", () => { + test("emits SHUTTLE_WILL_LEAVE_STOP event when shuttle leaves a stop", async () => { + const { stop1, sampleShuttleNotInRepository: shuttle } = await setupRouteAndOrderedStops(); + shuttle.coordinates = stop1.coordinates; + + const listener = jest.fn(); + repository.on(ShuttleRepositoryEvent.SHUTTLE_WILL_LEAVE_STOP, listener); + + // Simulate arrival at stop 1 + const arrivalTime = new Date("2024-01-15T10:30:00Z"); + await repository.addOrUpdateShuttle(shuttle, arrivalTime.getTime()); + + // Test that it actually emits the event correctly and not right after the shuttle arrives + await repository.addOrUpdateShuttle(shuttle, arrivalTime.getTime()); + expect(listener).not.toHaveBeenCalled(); + + shuttle.coordinates = { latitude: 12.5, longitude: 22.5 }; // Not at any stop + + // Simulate leaving stop 1 + const leaveTime = new Date("2024-01-15T10:32:00Z"); + await repository.addOrUpdateShuttle(shuttle, leaveTime.getTime()); + + expect(listener).toHaveBeenCalledTimes(1); + const emittedPayload = listener.mock.calls[0][0] as any; + expect(emittedPayload.stopArrivalThatShuttleIsLeaving).toEqual({ + shuttleId: shuttle.id, + stopId: stop1.id, + timestamp: arrivalTime, + }); + }); + + test("does not emit event when shuttle was not at a stop", async () => { + const { sampleShuttleNotInRepository } = await setupRouteAndOrderedStops(); + + const listener = jest.fn(); + repository.on(ShuttleRepositoryEvent.SHUTTLE_WILL_LEAVE_STOP, listener); + + // Start at coordinates not at any stop + const shuttle = { ...sampleShuttleNotInRepository, coordinates: { latitude: 12.5, longitude: 22.5 } }; + await repository.addOrUpdateShuttle(shuttle); + + // Move to different coordinates; still not at any stop + shuttle.coordinates = { latitude: 13.0, longitude: 23.0 }; + await repository.addOrUpdateShuttle(shuttle); + + expect(listener).toHaveBeenCalledTimes(0); + }); + + test("emits multiple events as shuttle leaves multiple stops", async () => { + const { stop1, stop2, sampleShuttleNotInRepository: shuttle } = await setupRouteAndOrderedStops(); + + const listener = jest.fn(); + repository.on(ShuttleRepositoryEvent.SHUTTLE_WILL_LEAVE_STOP, listener); + + // Arrive at stop1 + shuttle.coordinates = stop1.coordinates; + const firstArrivalTime = new Date("2024-01-15T10:30:00Z"); + await repository.addOrUpdateShuttle(shuttle, firstArrivalTime.getTime()); + + // Leave stop1 and arrive at stop2 + shuttle.coordinates = stop2.coordinates; + const secondArrivalTime = new Date("2024-01-15T10:35:00Z"); + await repository.addOrUpdateShuttle(shuttle, secondArrivalTime.getTime()); + + // Leave stop2 + shuttle.coordinates = { latitude: 12.5, longitude: 22.5 }; // Not at any stop + const secondLeaveTime = new Date("2024-01-15T10:40:00Z"); + await repository.addOrUpdateShuttle(shuttle, secondLeaveTime.getTime()); + + expect(listener).toHaveBeenCalledTimes(2); + + const firstPayload = listener.mock.calls[0][0] as any; + expect(firstPayload.stopArrivalThatShuttleIsLeaving).toEqual({ + shuttleId: shuttle.id, + stopId: stop1.id, + timestamp: firstArrivalTime, + }); + + const secondPayload = listener.mock.calls[1][0] as any; + expect(secondPayload.stopArrivalThatShuttleIsLeaving).toEqual({ shuttleId: shuttle.id, stopId: stop2.id, timestamp: secondArrivalTime, diff --git a/src/repositories/shuttle/eta/InMemorySelfUpdatingETARepository.ts b/src/repositories/shuttle/eta/InMemorySelfUpdatingETARepository.ts index 89212ab..ef04186 100644 --- a/src/repositories/shuttle/eta/InMemorySelfUpdatingETARepository.ts +++ b/src/repositories/shuttle/eta/InMemorySelfUpdatingETARepository.ts @@ -1,5 +1,5 @@ import { SelfUpdatingETARepository } from "./SelfUpdatingETARepository"; -import { ShuttleGetterRepository, ShuttleRepositoryEvent, ShuttleStopArrival, ShuttleTravelTimeDataIdentifier, ShuttleTravelTimeDateFilterArguments, WillArriveAtStopPayload } from "../ShuttleGetterRepository"; +import { ShuttleGetterRepository, ShuttleRepositoryEvent, ShuttleStopArrival, ShuttleTravelTimeDataIdentifier, ShuttleTravelTimeDateFilterArguments, ShuttleWillArriveAtStopPayload, ShuttleWillLeaveStopPayload } from "../ShuttleGetterRepository"; import { BaseInMemoryETARepository } from "./BaseInMemoryETARepository"; import { IOrderedStop, IShuttle } from "../../../entities/ShuttleRepositoryEntities"; import { ETARepositoryEvent } from "./ETAGetterRepository"; @@ -8,6 +8,8 @@ export class InMemorySelfUpdatingETARepository extends BaseInMemoryETARepository private referenceTime: Date | null = null; private travelTimeData: Map> = new Map(); + private isListening = false; + constructor( readonly shuttleRepository: ShuttleGetterRepository ) { @@ -16,8 +18,12 @@ export class InMemorySelfUpdatingETARepository extends BaseInMemoryETARepository this.setReferenceTime = this.setReferenceTime.bind(this); this.getAverageTravelTimeSeconds = this.getAverageTravelTimeSeconds.bind(this); this.startListeningForUpdates = this.startListeningForUpdates.bind(this); - this.handleShuttleUpdate = this.handleShuttleUpdate.bind(this); this.handleShuttleWillArriveAtStop = this.handleShuttleWillArriveAtStop.bind(this); + this.handleShuttleUpdate = this.handleShuttleUpdate.bind(this); + this.updateCascadingEta = this.updateCascadingEta.bind(this); + this.getAverageTravelTimeSecondsWithFallbacks = this.getAverageTravelTimeSecondsWithFallbacks.bind(this); + this.removeEtaIfExists = this.removeEtaIfExists.bind(this); + this.handleShuttleWillLeaveStop = this.handleShuttleWillLeaveStop.bind(this); } setReferenceTime(referenceTime: Date): void { @@ -51,13 +57,23 @@ export class InMemorySelfUpdatingETARepository extends BaseInMemoryETARepository } startListeningForUpdates(): void { + if (this.isListening) { + console.warn("Already listening to updates; did you call startListeningForUpdates twice?"); + return; + } this.shuttleRepository.addListener(ShuttleRepositoryEvent.SHUTTLE_UPDATED, this.handleShuttleUpdate); this.shuttleRepository.addListener(ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP, this.handleShuttleWillArriveAtStop); + this.shuttleRepository.addListener(ShuttleRepositoryEvent.SHUTTLE_WILL_LEAVE_STOP, this.handleShuttleWillLeaveStop); + this.isListening = true; } - stopListeningForUpdates(): void { + if (!this.isListening) { + return; + } this.shuttleRepository.removeListener(ShuttleRepositoryEvent.SHUTTLE_UPDATED, this.handleShuttleUpdate); this.shuttleRepository.removeListener(ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP, this.handleShuttleWillArriveAtStop); + this.shuttleRepository.removeListener(ShuttleRepositoryEvent.SHUTTLE_WILL_LEAVE_STOP, this.handleShuttleWillLeaveStop); + this.isListening = false; } private async getAverageTravelTimeSecondsWithFallbacks( @@ -74,9 +90,25 @@ export class InMemorySelfUpdatingETARepository extends BaseInMemoryETARepository } private async handleShuttleUpdate(shuttle: IShuttle): Promise { + const isAtStop = await this.shuttleRepository.checkIfShuttleIsAtStop(shuttle.id); const lastStop = await this.shuttleRepository.getShuttleLastStopArrival(shuttle.id); if (!lastStop) return; + if (isAtStop) { + // Update the ETA *to* the stop the shuttle is currently at, + // before starting from the current stop as normal. + // Account for cases where the shuttle arrived way earlier than + // expected based on the calculated ETA. + + await this.addOrUpdateEta({ + secondsRemaining: 1, + shuttleId: shuttle.id, + stopId: lastStop.stopId, + systemId: shuttle.systemId, + updatedTime: new Date(), + }); + } + const lastOrderedStop = await this.shuttleRepository.getOrderedStopByRouteAndStopId(shuttle.routeId, lastStop.stopId); await this.updateCascadingEta({ @@ -157,14 +189,9 @@ export class InMemorySelfUpdatingETARepository extends BaseInMemoryETARepository } private async handleShuttleWillArriveAtStop({ - lastArrival, - currentArrival, - }: WillArriveAtStopPayload): Promise { - const etas = await this.getEtasForShuttleId(currentArrival.shuttleId); - for (const eta of etas) { - await this.removeEtaIfExists(eta.shuttleId, eta.stopId); - } - + lastStopArrival: lastArrival, + willArriveAt: currentArrival, + }: ShuttleWillArriveAtStopPayload): Promise { if (lastArrival) { // disallow cases where this gets triggered multiple times if (lastArrival.stopId === currentArrival.stopId) return; @@ -181,6 +208,12 @@ export class InMemorySelfUpdatingETARepository extends BaseInMemoryETARepository } } + private async handleShuttleWillLeaveStop({ + stopArrivalThatShuttleIsLeaving, + }: ShuttleWillLeaveStopPayload) { + await this.removeEtaIfExists(stopArrivalThatShuttleIsLeaving.shuttleId, stopArrivalThatShuttleIsLeaving.stopId); + } + private async addTravelTimeDataPoint( { routeId, fromStopId, toStopId }: ShuttleTravelTimeDataIdentifier, travelTimeSeconds: number, diff --git a/src/repositories/shuttle/eta/RedisSelfUpdatingETARepository.ts b/src/repositories/shuttle/eta/RedisSelfUpdatingETARepository.ts index 76e723b..cabf5a3 100644 --- a/src/repositories/shuttle/eta/RedisSelfUpdatingETARepository.ts +++ b/src/repositories/shuttle/eta/RedisSelfUpdatingETARepository.ts @@ -1,22 +1,17 @@ import { SelfUpdatingETARepository } from "./SelfUpdatingETARepository"; import { BaseRedisETARepository } from "./BaseRedisETARepository"; -import { createClient, RedisClientType } from "redis"; -import { ShuttleGetterRepository, ShuttleRepositoryEvent, ShuttleStopArrival, ShuttleTravelTimeDataIdentifier, ShuttleTravelTimeDateFilterArguments, WillArriveAtStopPayload } from "../ShuttleGetterRepository"; -import { REDIS_RECONNECT_INTERVAL } from "../../../environment"; +import { RedisClientType } from "redis"; +import { ShuttleGetterRepository, ShuttleRepositoryEvent, ShuttleStopArrival, ShuttleTravelTimeDataIdentifier, ShuttleTravelTimeDateFilterArguments, ShuttleWillArriveAtStopPayload, ShuttleWillLeaveStopPayload } from "../ShuttleGetterRepository"; import { IEta, IOrderedStop, IShuttle } from "../../../entities/ShuttleRepositoryEntities"; import { ETARepositoryEvent } from "./ETAGetterRepository"; +import createRedisClientForRepository from "../../../helpers/createRedisClientForRepository"; export class RedisSelfUpdatingETARepository extends BaseRedisETARepository implements SelfUpdatingETARepository { + private isListening = false; + constructor( readonly shuttleRepository: ShuttleGetterRepository, - redisClient: RedisClientType = createClient({ - url: process.env.REDIS_URL, - socket: { - tls: process.env.NODE_ENV === 'production', - rejectUnauthorized: false, - reconnectStrategy: REDIS_RECONNECT_INTERVAL, - }, - }), + redisClient: RedisClientType = createRedisClientForRepository(), private referenceTime: Date | null = null, ) { super(redisClient); @@ -29,6 +24,7 @@ export class RedisSelfUpdatingETARepository extends BaseRedisETARepository imple this.updateCascadingEta = this.updateCascadingEta.bind(this); this.getAverageTravelTimeSecondsWithFallbacks = this.getAverageTravelTimeSecondsWithFallbacks.bind(this); this.removeEtaIfExists = this.removeEtaIfExists.bind(this); + this.handleShuttleWillLeaveStop = this.handleShuttleWillLeaveStop.bind(this); } private createHistoricalEtaTimeSeriesKey = (routeId: string, fromStopId: string, toStopId: string) => { @@ -71,14 +67,25 @@ export class RedisSelfUpdatingETARepository extends BaseRedisETARepository imple } } - startListeningForUpdates() { + startListeningForUpdates(): void { + if (this.isListening) { + console.warn("Already listening to updates; did you call startListeningForUpdates twice?"); + return; + } this.shuttleRepository.addListener(ShuttleRepositoryEvent.SHUTTLE_UPDATED, this.handleShuttleUpdate); - this.shuttleRepository.addListener(ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP, this.handleShuttleWillArriveAtStop) + this.shuttleRepository.addListener(ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP, this.handleShuttleWillArriveAtStop); + this.shuttleRepository.addListener(ShuttleRepositoryEvent.SHUTTLE_WILL_LEAVE_STOP, this.handleShuttleWillLeaveStop); + this.isListening = true; } - stopListeningForUpdates() { + stopListeningForUpdates(): void { + if (!this.isListening) { + return; + } this.shuttleRepository.removeListener(ShuttleRepositoryEvent.SHUTTLE_UPDATED, this.handleShuttleUpdate); this.shuttleRepository.removeListener(ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP, this.handleShuttleWillArriveAtStop); + this.shuttleRepository.removeListener(ShuttleRepositoryEvent.SHUTTLE_WILL_LEAVE_STOP, this.handleShuttleWillLeaveStop); + this.isListening = false; } private async getAverageTravelTimeSecondsWithFallbacks( @@ -95,9 +102,25 @@ export class RedisSelfUpdatingETARepository extends BaseRedisETARepository imple } private async handleShuttleUpdate(shuttle: IShuttle) { + const isAtStop = await this.shuttleRepository.checkIfShuttleIsAtStop(shuttle.id); const lastStop = await this.shuttleRepository.getShuttleLastStopArrival(shuttle.id); if (!lastStop) return; + if (isAtStop) { + // Update the ETA *to* the stop the shuttle is currently at, + // before starting from the current stop as normal. + // Account for cases where the shuttle arrived way earlier than + // expected based on the calculated ETA. + + await this.addOrUpdateEta({ + secondsRemaining: 1, + shuttleId: shuttle.id, + stopId: lastStop.stopId, + systemId: shuttle.systemId, + updatedTime: new Date(), + }); + } + const lastOrderedStop = await this.shuttleRepository.getOrderedStopByRouteAndStopId(shuttle.routeId, lastStop.stopId); await this.updateCascadingEta({ @@ -179,14 +202,9 @@ export class RedisSelfUpdatingETARepository extends BaseRedisETARepository imple private async handleShuttleWillArriveAtStop({ - lastArrival, - currentArrival, - }: WillArriveAtStopPayload) { - const etas = await this.getEtasForShuttleId(currentArrival.shuttleId); - for (const eta of etas) { - await this.removeEtaIfExists(eta.shuttleId, eta.stopId); - } - + lastStopArrival: lastArrival, + willArriveAt: currentArrival, + }: ShuttleWillArriveAtStopPayload) { // only update time traveled if last arrival exists if (lastArrival) { // disallow cases where this gets triggered multiple times @@ -204,6 +222,13 @@ export class RedisSelfUpdatingETARepository extends BaseRedisETARepository imple } } + private async handleShuttleWillLeaveStop({ + stopArrivalThatShuttleIsLeaving, + }: ShuttleWillLeaveStopPayload) { + await this.removeEtaIfExists(stopArrivalThatShuttleIsLeaving.shuttleId, stopArrivalThatShuttleIsLeaving.stopId); + } + + public async addTravelTimeDataPoint( { routeId, fromStopId, toStopId }: ShuttleTravelTimeDataIdentifier, travelTimeSeconds: number, diff --git a/src/repositories/shuttle/eta/__tests__/SelfUpdatingETARepositorySharedTests.test.ts b/src/repositories/shuttle/eta/__tests__/SelfUpdatingETARepositorySharedTests.test.ts index b260e3e..6d5d9b1 100644 --- a/src/repositories/shuttle/eta/__tests__/SelfUpdatingETARepositorySharedTests.test.ts +++ b/src/repositories/shuttle/eta/__tests__/SelfUpdatingETARepositorySharedTests.test.ts @@ -8,6 +8,7 @@ import { RedisShuttleRepository } from "../../RedisShuttleRepository"; import { UnoptimizedInMemoryShuttleRepository } from "../../UnoptimizedInMemoryShuttleRepository"; import { setupRouteAndOrderedStopsForShuttleRepository } from "../../../../../testHelpers/setupRouteAndOrderedStopsForShuttleRepository"; import { ShuttleGetterSetterRepository } from "../../ShuttleGetterSetterRepository"; +import { IShuttle, IStop } from "../../../../entities/ShuttleRepositoryEntities"; class RedisSelfUpdatingETARepositoryHolder implements RepositoryHolder { repo: RedisSelfUpdatingETARepository | undefined; @@ -53,7 +54,7 @@ class InMemorySelfUpdatingETARepositoryHolder implements RepositoryHolder { @@ -74,21 +75,45 @@ describe.each(repositoryImplementations)('$name', (holder) => { return await setupRouteAndOrderedStopsForShuttleRepository(shuttleRepository); } + async function populateTravelTimeDataForStops({ + currentTime, + shuttle, + stop1, + stop2, + stop3, + firstStopArrivalTime = new Date(2025, 0, 1, 11, 0, 0), + secondStopArrivalTime = new Date(2025, 0, 1, 11, 15, 0), + thirdStopArrivalTime = new Date(2025, 0, 1, 11, 20, 0), + }: { + currentTime: Date; + shuttle: IShuttle; + stop1: IStop; + stop2: IStop; + stop3: IStop; + firstStopArrivalTime?: Date; + secondStopArrivalTime?: Date; + thirdStopArrivalTime?: Date; + }) { + repository.setReferenceTime(currentTime); + repository.startListeningForUpdates(); + + shuttle.coordinates = stop1.coordinates; + await shuttleRepository.addOrUpdateShuttle(shuttle, firstStopArrivalTime.getTime()); + + shuttle.coordinates = stop2.coordinates; + await shuttleRepository.addOrUpdateShuttle(shuttle, secondStopArrivalTime.getTime()); + + shuttle.coordinates = stop3.coordinates; + await shuttleRepository.addOrUpdateShuttle(shuttle, thirdStopArrivalTime.getTime()); + } + describe("handleShuttleWillArriveAtStop", () => { test("updates how long the shuttle took to get from one stop to another", async () => { - const { route, systemId, stop2, stop1 } = await setupRouteAndOrderedStops(); + const { route, stop2, stop1, sampleShuttleNotInRepository: shuttle } = await setupRouteAndOrderedStops(); repository.startListeningForUpdates(); - const shuttle = { - id: "sh1", - name: "Shuttle 1", - routeId: route.id, - systemId: systemId, - coordinates: stop1.coordinates, - orientationInDegrees: 0, - updatedTime: new Date(), - }; + shuttle.coordinates = stop1.coordinates; const firstStopArrivalTime = new Date(2025, 0, 1, 12, 0, 0); await shuttleRepository.addOrUpdateShuttle(shuttle, firstStopArrivalTime.getTime()); @@ -118,33 +143,10 @@ describe.each(repositoryImplementations)('$name', (holder) => { currentTime: Date, shuttleSecondArrivalTimeAtFirstStop: Date ) { - const { route, systemId, stop1, stop2, stop3 } = await setupRouteAndOrderedStops(); + const { stop1, stop2, stop3, sampleShuttleNotInRepository: shuttle } = await setupRouteAndOrderedStops(); // Populating travel time data - const firstStopArrivalTime = new Date(2025, 0, 1, 11, 0, 0); - const secondStopArrivalTime = new Date(2025, 0, 1, 11, 15, 0); - const thirdStopArrivalTime = new Date(2025, 0, 1, 11, 20, 0); - - repository.setReferenceTime(currentTime); - repository.startListeningForUpdates(); - - const shuttle = { - id: "sh1", - name: "Shuttle 1", - routeId: route.id, - systemId: systemId, - coordinates: stop1.coordinates, - orientationInDegrees: 0, - updatedTime: new Date(), - }; - - await shuttleRepository.addOrUpdateShuttle(shuttle, firstStopArrivalTime.getTime()); - - shuttle.coordinates = stop2.coordinates; - await shuttleRepository.addOrUpdateShuttle(shuttle, secondStopArrivalTime.getTime()); - - shuttle.coordinates = stop3.coordinates; - await shuttleRepository.addOrUpdateShuttle(shuttle, thirdStopArrivalTime.getTime()); + await populateTravelTimeDataForStops({ currentTime, shuttle, stop1, stop2, stop3 }); // Populating ETA data shuttle.coordinates = stop1.coordinates; @@ -192,23 +194,79 @@ describe.each(repositoryImplementations)('$name', (holder) => { currentTime, shuttleSecondArrivalTimeAtFirstStop ); }); + + test("adds a 'stopgap' entry of 1 second when the shuttle arrives at a stop", async () => { + const { stop1, stop2, stop3, sampleShuttleNotInRepository: shuttle } = await setupRouteAndOrderedStops(); + + const shuttleSecondArrivalTimeAtFirstStop = new Date(2025, 0, 1, 12, 5, 0); + const currentTime = new Date(shuttleSecondArrivalTimeAtFirstStop.getTime() + 7 * 60 * 1000); + + // Populating travel time data + await populateTravelTimeDataForStops({ currentTime, shuttle, stop1, stop2, stop3 }); + + // Populate ETA data + // Simulate shuttle running early for second stop + shuttle.coordinates = stop1.coordinates; + await shuttleRepository.addOrUpdateShuttle( + shuttle, + shuttleSecondArrivalTimeAtFirstStop.getTime() + ); + + shuttle.coordinates = stop2.coordinates; + // Call twice to get the ETA repository to read the correct flag + await shuttleRepository.addOrUpdateShuttle( + shuttle, + currentTime.getTime(), + ); + await new Promise((resolve) => setTimeout(resolve, 500)); + + await shuttleRepository.addOrUpdateShuttle( + shuttle, + currentTime.getTime(), // ~8 minutes early + ); + await new Promise((resolve) => setTimeout(resolve, 500)); + + const eta = await repository.getEtaForShuttleAndStopId(shuttle.id, stop2.id); + expect(eta?.secondsRemaining).toEqual(1); + }); + }); + + describe("handleShuttleWillLeaveStop", () => { + test("clears ETA of correct stop on leaving stop", async () => { + const { stop1, stop2, stop3, sampleShuttleNotInRepository: shuttle } = await setupRouteAndOrderedStops(); + + const shuttleSecondArrivalTimeAtFirstStop = new Date(2025, 0, 8, 12, 0, 0); + const shuttleSecondArrivalTimeAtSecondStop = new Date(2025, 0, 8, 12, 15, 0); + const currentTime = new Date(shuttleSecondArrivalTimeAtSecondStop.getTime() + 3 * 60 * 1000); + + await populateTravelTimeDataForStops({ currentTime, shuttle, stop1, stop2, stop3 }); + + // Populating ETA data + shuttle.coordinates = stop1.coordinates; + await shuttleRepository.addOrUpdateShuttle(shuttle, shuttleSecondArrivalTimeAtFirstStop.getTime()); + + shuttle.coordinates = stop2.coordinates; + await shuttleRepository.addOrUpdateShuttle(shuttle, shuttleSecondArrivalTimeAtSecondStop.getTime()); + + shuttle.coordinates = { latitude: 12.5, longitude: 12.5 } + await shuttleRepository.addOrUpdateShuttle(shuttle, currentTime.getTime()); + + await new Promise((resolve) => setTimeout(resolve, 1000)); + + const etaForStop3 = await repository.getEtaForShuttleAndStopId(shuttle.id, stop3.id); + expect(etaForStop3).not.toBeNull(); + const etaForStop2 = await repository.getEtaForShuttleAndStopId(shuttle.id, stop2.id); + expect(etaForStop2).toBeNull(); + }, 60000); }); describe("getAverageTravelTimeSeconds", () => { test("returns the average travel time when historical data exists", async () => { - const { route, systemId, stop1, stop2 } = await setupRouteAndOrderedStops(); + const { route, stop1, stop2, sampleShuttleNotInRepository: shuttle } = await setupRouteAndOrderedStops(); repository.startListeningForUpdates(); - const shuttle = { - id: "sh1", - name: "Shuttle 1", - routeId: route.id, - systemId: systemId, - coordinates: stop1.coordinates, - orientationInDegrees: 0, - updatedTime: new Date(), - }; + shuttle.coordinates = stop1.coordinates; const firstStopTime = new Date(2025, 0, 1, 12, 0, 0); await shuttleRepository.addOrUpdateShuttle(shuttle, firstStopTime.getTime()); @@ -232,19 +290,11 @@ describe.each(repositoryImplementations)('$name', (holder) => { }); test("returns average of multiple data points", async () => { - const { route, systemId, stop1, stop2 } = await setupRouteAndOrderedStops(); + const { route, stop1, stop2, sampleShuttleNotInRepository: shuttle } = await setupRouteAndOrderedStops(); repository.startListeningForUpdates(); - const shuttle = { - id: "sh1", - name: "Shuttle 1", - routeId: route.id, - systemId: systemId, - coordinates: stop1.coordinates, - orientationInDegrees: 0, - updatedTime: new Date(), - }; + shuttle.coordinates = stop1.coordinates; // First trip: 10 minutes travel time await shuttleRepository.addOrUpdateShuttle(shuttle, new Date(2025, 0, 1, 12, 0, 0).getTime()); @@ -288,19 +338,11 @@ describe.each(repositoryImplementations)('$name', (holder) => { }); test("returns undefined when querying outside the time range of data", async () => { - const { route, systemId, stop1, stop2 } = await setupRouteAndOrderedStops(); + const { route, stop1, stop2, sampleShuttleNotInRepository: shuttle } = await setupRouteAndOrderedStops(); repository.startListeningForUpdates(); - const shuttle = { - id: "sh1", - name: "Shuttle 1", - routeId: route.id, - systemId: systemId, - coordinates: stop1.coordinates, - orientationInDegrees: 0, - updatedTime: new Date(), - }; + shuttle.coordinates = stop1.coordinates; await shuttleRepository.addOrUpdateShuttle(shuttle, new Date(2025, 0, 1, 12, 0, 0).getTime()); shuttle.coordinates = stop2.coordinates; diff --git a/testHelpers/apolloTestServerHelpers.ts b/testHelpers/apolloTestServerHelpers.ts index 60a8355..6f32883 100644 --- a/testHelpers/apolloTestServerHelpers.ts +++ b/testHelpers/apolloTestServerHelpers.ts @@ -25,6 +25,7 @@ const systemInfoForTesting: InterchangeSystemBuilderArguments = { passioSystemId: "263", parkingSystemId: ChapmanApiBasedParkingRepositoryLoader.id, useSelfUpdatingEtas: false, + shuttleStopArrivalDegreeDelta: 0.001, }; export function buildSystemForTesting() { diff --git a/testHelpers/setupRouteAndOrderedStopsForShuttleRepository.ts b/testHelpers/setupRouteAndOrderedStopsForShuttleRepository.ts index 7dd80e9..65e81ba 100644 --- a/testHelpers/setupRouteAndOrderedStopsForShuttleRepository.ts +++ b/testHelpers/setupRouteAndOrderedStopsForShuttleRepository.ts @@ -1,4 +1,4 @@ -import { IOrderedStop, IStop } from "../src/entities/ShuttleRepositoryEntities"; +import { IOrderedStop, IShuttle, IStop } from "../src/entities/ShuttleRepositoryEntities"; import { ShuttleGetterSetterRepository } from "../src/repositories/shuttle/ShuttleGetterSetterRepository"; export async function setupRouteAndOrderedStopsForShuttleRepository( @@ -71,11 +71,22 @@ export async function setupRouteAndOrderedStopsForShuttleRepository( await shuttleRepository.addOrUpdateOrderedStop(orderedStop2); await shuttleRepository.addOrUpdateOrderedStop(orderedStop3); + const sampleShuttleNotInRepository: IShuttle = { + id: "sh1", + name: "Shuttle 1", + routeId: route.id, + systemId: systemId, + coordinates: stop2.coordinates, + orientationInDegrees: 0, + updatedTime: new Date(), + }; + return { route, systemId, stop1, stop2, stop3, + sampleShuttleNotInRepository, }; } diff --git a/tsconfig.build.json b/tsconfig.build.json new file mode 100644 index 0000000..37ac446 --- /dev/null +++ b/tsconfig.build.json @@ -0,0 +1,15 @@ +// For builds, excludes tests and mocks +{ + "compilerOptions": { + "target": "es2016", + "module": "commonjs", + "esModuleInterop": true, + "forceConsistentCasingInFileNames": true, + "strict": true, + "skipLibCheck": true, + "outDir": "dist", + "sourceMap": true + }, + "include": ["src"], + "exclude": ["**/__tests__/*/**", "**/__mocks__/*/**"] +} diff --git a/tsconfig.json b/tsconfig.json index dfc1303..57e4d7f 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -1,3 +1,4 @@ +// For type-checking, includes tests and mocks { "compilerOptions": { "target": "es2016", @@ -10,5 +11,4 @@ "sourceMap": true }, "include": ["src"], - "exclude": ["**/__tests__/*/**", "**/__mocks__/*/**"] }