Merge pull request #96 from brendan-ch:feat/next-stop-eta-lock

feat/next-stop-eta-lock
This commit is contained in:
2025-11-23 20:38:37 -08:00
committed by GitHub
17 changed files with 628 additions and 333 deletions

View File

@@ -4,8 +4,8 @@
"description": "",
"main": "dist/index.js",
"scripts": {
"build:dev": "npm install --include=dev && npm run generate && tsc",
"build": "npm install --include=dev && npm run generate && tsc && npm prune --omit=dev",
"build:dev": "npm install --include=dev && npm run generate && tsc --project tsconfig.build.json",
"build": "npm install --include=dev && npm run generate && tsc --project tsconfig.build.json && npm prune --omit=dev",
"start:dev": "npm run build:dev && node ./dist/index.js",
"start": "npm run build && node ./dist/index.js",
"generate": "graphql-codegen --config codegen.ts",

View File

@@ -23,6 +23,7 @@ import { RedisExternalSourceETARepository } from "../repositories/shuttle/eta/Re
import { InMemorySelfUpdatingETARepository } from "../repositories/shuttle/eta/InMemorySelfUpdatingETARepository";
import { BaseRedisETARepository } from "../repositories/shuttle/eta/BaseRedisETARepository";
import { BaseInMemoryETARepository } from "../repositories/shuttle/eta/BaseInMemoryETARepository";
import createRedisClientForRepository from "../helpers/createRedisClientForRepository";
export interface InterchangeSystemBuilderArguments {
name: string;
@@ -46,7 +47,13 @@ export interface InterchangeSystemBuilderArguments {
* Controls whether to self-calculate ETAs or use the external
* shuttle provider for them.
*/
useSelfUpdatingEtas: boolean
useSelfUpdatingEtas: boolean;
/**
* The size of the threshold to detect when a shuttle has arrived
* at a stop, in latitude/longitude degrees.
*/
shuttleStopArrivalDegreeDelta: number;
}
export class InterchangeSystem {
@@ -98,7 +105,10 @@ export class InterchangeSystem {
}
private static async buildRedisShuttleLoaderAndRepositories(args: InterchangeSystemBuilderArguments) {
const shuttleRepository = new RedisShuttleRepository();
const shuttleRepository = new RedisShuttleRepository(
createRedisClientForRepository(),
args.shuttleStopArrivalDegreeDelta,
);
await shuttleRepository.connect();
let etaRepository: BaseRedisETARepository;
@@ -247,7 +257,9 @@ export class InterchangeSystem {
}
private static buildInMemoryShuttleLoaderAndRepositories(args: InterchangeSystemBuilderArguments) {
const shuttleRepository = new UnoptimizedInMemoryShuttleRepository();
const shuttleRepository = new UnoptimizedInMemoryShuttleRepository(
args.shuttleStopArrivalDegreeDelta,
);
let etaRepository: BaseInMemoryETARepository;
let shuttleDataLoader: ApiBasedShuttleRepositoryLoader;

View File

@@ -0,0 +1,14 @@
import { createClient, RedisClientType } from "redis";
import { REDIS_RECONNECT_INTERVAL } from "../environment";
export default function createRedisClientForRepository() {
const client = createClient({
url: process.env.REDIS_URL,
socket: {
tls: process.env.NODE_ENV === 'production',
rejectUnauthorized: false,
reconnectStrategy: REDIS_RECONNECT_INTERVAL,
},
});
return client as RedisClientType;
}

View File

@@ -24,6 +24,7 @@ const supportedSystems: InterchangeSystemBuilderArguments[] = [
parkingSystemId: ChapmanApiBasedParkingRepositoryLoader.id,
name: "Chapman University",
useSelfUpdatingEtas: true,
shuttleStopArrivalDegreeDelta: 0.001,
}
]

View File

@@ -37,10 +37,10 @@ export class ApiBasedShuttleRepositoryLoader implements ShuttleRepositoryLoader
await this.updateStopAndPolylineDataForRoutesInSystem();
await this.updateShuttleDataForSystemBasedOnProximityToRoutes();
// Because ETA method doesn't support pruning yet,
// add a call to the clear method here
if (this.etaRepository) {
await this.updateEtaDataForExistingStopsForSystem();
}
}
public async updateRouteDataForSystem() {
try {

View File

@@ -1,19 +1,12 @@
import { createClient, RedisClientType } from 'redis';
import { REDIS_RECONNECT_INTERVAL } from "../environment";
import { RedisClientType } from 'redis';
import { EventEmitter } from 'stream';
import createRedisClientForRepository from '../helpers/createRedisClientForRepository';
export abstract class BaseRedisRepository extends EventEmitter {
protected redisClient;
constructor(
redisClient: RedisClientType = createClient({
url: process.env.REDIS_URL,
socket: {
tls: process.env.NODE_ENV === 'production',
rejectUnauthorized: false,
reconnectStrategy: REDIS_RECONNECT_INTERVAL,
},
}),
redisClient: RedisClientType = createRedisClientForRepository(),
) {
super();
this.redisClient = redisClient;

View File

@@ -10,8 +10,17 @@ import {
ShuttleTravelTimeDateFilterArguments
} from "./ShuttleGetterRepository";
import { BaseRedisRepository } from "../BaseRedisRepository";
import { RedisClientType } from "redis";
import createRedisClientForRepository from "../../helpers/createRedisClientForRepository";
export class RedisShuttleRepository extends BaseRedisRepository implements ShuttleGetterSetterRepository {
constructor(
redisClient: RedisClientType = createRedisClientForRepository(),
readonly shuttleStopArrivalDegreeDelta: number = 0.001,
) {
super(redisClient);
}
get isReady() {
return this.redisClient.isReady;
}
@@ -73,16 +82,35 @@ export class RedisShuttleRepository extends BaseRedisRepository implements Shutt
return super.emit(event, ...args);
}
// Key prefixes for individual entity keys
private readonly stopKeyPrefix = 'shuttle:stop:';
private readonly routeKeyPrefix = 'shuttle:route:';
private readonly shuttleKeyPrefix = 'shuttle:shuttle:';
private readonly orderedStopKeyPrefix = 'shuttle:orderedstop:';
private readonly lastStopKeyPrefix = 'shuttle:laststop:';
private readonly historicalEtaKeyPrefix = 'shuttle:eta:historical:';
// Key patterns for bulk operations (e.g., getting all keys, clearing data)
private readonly stopKeyPattern = 'shuttle:stop:*';
private readonly routeKeyPattern = 'shuttle:route:*';
private readonly shuttleKeyPattern = 'shuttle:shuttle:*';
private readonly orderedStopKeyPattern = 'shuttle:orderedstop:*';
private readonly lastStopKeyPattern = 'shuttle:laststop:*';
/**
* Represents a set storing the shuttles that are currently at a stop.
*/
private readonly shuttleIsAtStopKey = 'shuttle:atstop';
// Helper methods for Redis key generation
private createStopKey = (stopId: string) => `shuttle:stop:${stopId}`;
private createRouteKey = (routeId: string) => `shuttle:route:${routeId}`;
private createShuttleKey = (shuttleId: string) => `shuttle:shuttle:${shuttleId}`;
private createEtaKey = (shuttleId: string, stopId: string) => `shuttle:eta:${shuttleId}:${stopId}`;
private createOrderedStopKey = (routeId: string, stopId: string) => `shuttle:orderedstop:${routeId}:${stopId}`;
private createShuttleLastStopKey = (shuttleId: string) => `shuttle:laststop:${shuttleId}`;
private createHistoricalEtaTimeSeriesKey = (routeId: string, fromStopId: string, toStopId: string) => {
return `shuttle:eta:historical:${routeId}:${fromStopId}:${toStopId}`;
}
private readonly createStopKey = (stopId: string) => `${this.stopKeyPrefix}${stopId}`;
private readonly createRouteKey = (routeId: string) => `${this.routeKeyPrefix}${routeId}`;
private readonly createShuttleKey = (shuttleId: string) => `${this.shuttleKeyPrefix}${shuttleId}`;
private readonly createOrderedStopKey = (routeId: string, stopId: string) => `${this.orderedStopKeyPrefix}${routeId}:${stopId}`;
private readonly createShuttleLastStopKey = (shuttleId: string) => `${this.lastStopKeyPrefix}${shuttleId}`;
private readonly createHistoricalEtaTimeSeriesKey = (routeId: string, fromStopId: string, toStopId: string) => {
return `${this.historicalEtaKeyPrefix}${routeId}:${fromStopId}:${toStopId}`;
};
// Helper methods for converting entities to Redis hashes
private createRedisHashFromStop = (stop: IStop): Record<string, string> => ({
@@ -214,7 +242,7 @@ export class RedisShuttleRepository extends BaseRedisRepository implements Shutt
// Getter methods
public async getStops(): Promise<IStop[]> {
const keys = await this.redisClient.keys('shuttle:stop:*');
const keys = await this.redisClient.keys(this.stopKeyPattern);
const stops: IStop[] = [];
for (const key of keys) {
@@ -239,7 +267,7 @@ export class RedisShuttleRepository extends BaseRedisRepository implements Shutt
}
public async getRoutes(): Promise<IRoute[]> {
const keys = await this.redisClient.keys('shuttle:route:*');
const keys = await this.redisClient.keys(this.routeKeyPattern);
const routes: IRoute[] = [];
for (const key of keys) {
@@ -264,7 +292,7 @@ export class RedisShuttleRepository extends BaseRedisRepository implements Shutt
}
public async getShuttles(): Promise<IShuttle[]> {
const keys = await this.redisClient.keys('shuttle:shuttle:*');
const keys = await this.redisClient.keys(this.shuttleKeyPattern);
const shuttles: IShuttle[] = [];
for (const key of keys) {
@@ -293,45 +321,6 @@ export class RedisShuttleRepository extends BaseRedisRepository implements Shutt
return this.createShuttleFromRedisData(data);
}
public async getEtasForShuttleId(shuttleId: string): Promise<IEta[]> {
const keys = await this.redisClient.keys(`shuttle:eta:${shuttleId}:*`);
const etas: IEta[] = [];
for (const key of keys) {
const data = await this.redisClient.hGetAll(key);
if (Object.keys(data).length > 0) {
etas.push(this.createEtaFromRedisData(data));
}
}
return etas;
}
public async getEtasForStopId(stopId: string): Promise<IEta[]> {
const keys = await this.redisClient.keys('shuttle:eta:*');
const etas: IEta[] = [];
for (const key of keys) {
const data = await this.redisClient.hGetAll(key);
if (Object.keys(data).length > 0 && data.stopId === stopId) {
etas.push(this.createEtaFromRedisData(data));
}
}
return etas;
}
public async getEtaForShuttleAndStopId(shuttleId: string, stopId: string): Promise<IEta | null> {
const key = this.createEtaKey(shuttleId, stopId);
const data = await this.redisClient.hGetAll(key);
if (Object.keys(data).length === 0) {
return null;
}
return this.createEtaFromRedisData(data);
}
public async getOrderedStopByRouteAndStopId(routeId: string, stopId: string): Promise<IOrderedStop | null> {
const key = this.createOrderedStopKey(routeId, stopId);
const data = await this.redisClient.hGetAll(key);
@@ -344,7 +333,7 @@ export class RedisShuttleRepository extends BaseRedisRepository implements Shutt
}
public async getOrderedStopsByStopId(stopId: string): Promise<IOrderedStop[]> {
const keys = await this.redisClient.keys('shuttle:orderedstop:*');
const keys = await this.redisClient.keys(this.orderedStopKeyPattern);
const orderedStops: IOrderedStop[] = [];
for (const key of keys) {
@@ -358,7 +347,7 @@ export class RedisShuttleRepository extends BaseRedisRepository implements Shutt
}
public async getOrderedStopsByRouteId(routeId: string): Promise<IOrderedStop[]> {
const keys = await this.redisClient.keys(`shuttle:orderedstop:${routeId}:*`);
const keys = await this.redisClient.keys(`${this.orderedStopKeyPrefix}${routeId}:*`);
const orderedStops: IOrderedStop[] = [];
for (const key of keys) {
@@ -393,26 +382,58 @@ export class RedisShuttleRepository extends BaseRedisRepository implements Shutt
shuttle: IShuttle,
travelTimeTimestamp = Date.now(),
) {
const arrivedStop = await this.getArrivedStopIfExists(shuttle);
const isAtStop = await this.checkIfShuttleIsAtStop(shuttle.id);
let arrivedStop: IStop | undefined;
if (isAtStop) {
// Allow retrieval of the shuttle's current stop
// Will still return undefined when the shuttle leaves the stop
arrivedStop = await this.getArrivedStopIfNextStop(shuttle, true);
} else {
arrivedStop = await this.getArrivedStopIfNextStop(shuttle, false);
}
// Will not fire *any* events if the same stop
const lastStop = await this.getShuttleLastStopArrival(shuttle.id);
if (lastStop?.stopId === arrivedStop?.id) return;
if (isAtStop) {
if (lastStop) {
this.emit(ShuttleRepositoryEvent.SHUTTLE_WILL_LEAVE_STOP, {
stopArrivalThatShuttleIsLeaving: lastStop,
});
}
await this.markShuttleAsNotAtStop(shuttle.id);
}
if (arrivedStop) {
// stop if same stop
const lastStop = await this.getShuttleLastStopArrival(shuttle.id);
if (lastStop?.stopId === arrivedStop.id) return;
const shuttleArrival = {
stopId: arrivedStop.id,
timestamp: new Date(travelTimeTimestamp),
shuttleId: shuttle.id,
};
this.emit(ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP, {
lastArrival: lastStop,
currentArrival: shuttleArrival,
lastStopArrival: lastStop,
willArriveAt: shuttleArrival,
});
await this.markShuttleAsAtStop(shuttleArrival.shuttleId);
await this.updateShuttleLastStopArrival(shuttleArrival);
}
}
private async markShuttleAsAtStop(shuttleId: string) {
await this.redisClient.sAdd(this.shuttleIsAtStopKey, shuttleId);
}
private async markShuttleAsNotAtStop(shuttleId: string) {
await this.redisClient.sRem(this.shuttleIsAtStopKey, shuttleId);
}
public async checkIfShuttleIsAtStop(shuttleId: string) {
return await this.redisClient.sIsMember(this.shuttleIsAtStopKey, shuttleId);
}
public async getAverageTravelTimeSeconds(
{ routeId, fromStopId, toStopId }: ShuttleTravelTimeDataIdentifier,
{ from, to }: ShuttleTravelTimeDateFilterArguments,
@@ -445,28 +466,27 @@ export class RedisShuttleRepository extends BaseRedisRepository implements Shutt
}
}
/**
* Get the stop that the shuttle is currently at, if it exists.
*
* If the shuttle has a "last stop", it will only return the stop
* directly after the last stop. Otherwise, it may return any stop that
* is on the shuttle's route.
*
* @param shuttle
* @param delta
* @returns
*/
public async getArrivedStopIfExists(
public async getArrivedStopIfNextStop(
shuttle: IShuttle,
delta = 0.001,
canReturnShuttleCurrentStop: boolean = false,
): Promise<IStop | undefined> {
const lastStop = await this.getShuttleLastStopArrival(shuttle.id);
if (lastStop) {
const lastOrderedStop = await this.getOrderedStopByRouteAndStopId(shuttle.routeId, lastStop.stopId);
const degreeDelta = this.shuttleStopArrivalDegreeDelta;
const lastStopArrival = await this.getShuttleLastStopArrival(shuttle.id);
if (lastStopArrival) {
// Return the shuttle's current stop depending on the flag
if (canReturnShuttleCurrentStop) {
const lastStop = await this.getStopById(lastStopArrival.stopId);
if (lastStop && shuttleHasArrivedAtStop(shuttle, lastStop, degreeDelta)) {
return lastStop;
}
}
const lastOrderedStop = await this.getOrderedStopByRouteAndStopId(shuttle.routeId, lastStopArrival.stopId);
const orderedStopAfter = lastOrderedStop?.nextStop;
if (orderedStopAfter) {
const stopAfter = await this.getStopById(orderedStopAfter.stopId);
if (stopAfter && shuttleHasArrivedAtStop(shuttle, stopAfter, delta)) {
if (stopAfter && shuttleHasArrivedAtStop(shuttle, stopAfter, degreeDelta)) {
return stopAfter;
}
}
@@ -475,7 +495,7 @@ export class RedisShuttleRepository extends BaseRedisRepository implements Shutt
for (const orderedStop of orderedStops) {
const stop = await this.getStopById(orderedStop.stopId);
if (stop != null && shuttleHasArrivedAtStop(shuttle, stop, delta)) {
if (stop != null && shuttleHasArrivedAtStop(shuttle, stop, degreeDelta)) {
return stop;
}
}
@@ -536,6 +556,7 @@ export class RedisShuttleRepository extends BaseRedisRepository implements Shutt
this.emit(ShuttleRepositoryEvent.SHUTTLE_REMOVED, shuttle);
await this.removeShuttleLastStopIfExists(shuttleId);
await this.markShuttleAsNotAtStop(shuttleId);
return shuttle;
}
@@ -573,39 +594,36 @@ export class RedisShuttleRepository extends BaseRedisRepository implements Shutt
}
// Clear methods
public async clearShuttleData(): Promise<void> {
const keys = await this.redisClient.keys('shuttle:shuttle:*');
private async clearRedisKeys(pattern: string): Promise<void> {
const keys = await this.redisClient.keys(pattern);
if (keys.length > 0) {
await this.redisClient.del(keys);
}
}
public async clearShuttleData(): Promise<void> {
await this.clearRedisKeys(this.shuttleKeyPattern);
await this.clearShuttleLastStopData();
await this.clearShuttleIsAtStopData();
}
public async clearOrderedStopData(): Promise<void> {
const keys = await this.redisClient.keys('shuttle:orderedstop:*');
if (keys.length > 0) {
await this.redisClient.del(keys);
}
await this.clearRedisKeys(this.orderedStopKeyPattern);
}
public async clearRouteData(): Promise<void> {
const keys = await this.redisClient.keys('shuttle:route:*');
if (keys.length > 0) {
await this.redisClient.del(keys);
}
await this.clearRedisKeys(this.routeKeyPattern);
}
public async clearStopData(): Promise<void> {
const keys = await this.redisClient.keys('shuttle:stop:*');
if (keys.length > 0) {
await this.redisClient.del(keys);
}
await this.clearRedisKeys(this.stopKeyPattern);
}
private async clearShuttleLastStopData(): Promise<void> {
const keys = await this.redisClient.keys('shuttle:laststop:*');
if (keys.length > 0) {
await this.redisClient.del(keys);
}
await this.clearRedisKeys(this.lastStopKeyPattern);
}
private async clearShuttleIsAtStopData(): Promise<void> {
await this.clearRedisKeys(this.shuttleIsAtStopKey);
}
}

View File

@@ -5,6 +5,7 @@ export const ShuttleRepositoryEvent = {
SHUTTLE_UPDATED: "shuttleUpdated",
SHUTTLE_REMOVED: "shuttleRemoved",
SHUTTLE_WILL_ARRIVE_AT_STOP: "shuttleArrivedAtStop",
SHUTTLE_WILL_LEAVE_STOP: "shuttleWillLeaveStop",
} as const;
export type ShuttleRepositoryEventName = typeof ShuttleRepositoryEvent[keyof typeof ShuttleRepositoryEvent];
@@ -12,15 +13,22 @@ export type ShuttleRepositoryEventName = typeof ShuttleRepositoryEvent[keyof typ
export type EtaRemovedEventPayload = IEta;
export type EtaDataClearedEventPayload = IEta[];
export interface WillArriveAtStopPayload {
lastArrival?: ShuttleStopArrival;
currentArrival: ShuttleStopArrival;
export interface ShuttleWillArriveAtStopPayload {
lastStopArrival?: ShuttleStopArrival;
willArriveAt: ShuttleStopArrival;
};
export type ShuttleIsNearStopPayload = ShuttleWillArriveAtStopPayload;
export interface ShuttleWillLeaveStopPayload {
stopArrivalThatShuttleIsLeaving: ShuttleStopArrival;
}
export interface ShuttleRepositoryEventPayloads {
[ShuttleRepositoryEvent.SHUTTLE_UPDATED]: IShuttle,
[ShuttleRepositoryEvent.SHUTTLE_REMOVED]: IShuttle,
[ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP]: WillArriveAtStopPayload,
[ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP]: ShuttleWillArriveAtStopPayload,
[ShuttleRepositoryEvent.SHUTTLE_WILL_LEAVE_STOP]: ShuttleWillLeaveStopPayload,
}
export type ShuttleRepositoryEventListener<T extends ShuttleRepositoryEventName> = (
@@ -88,10 +96,24 @@ export interface ShuttleGetterRepository extends EventEmitter {
getShuttleLastStopArrival(shuttleId: string): Promise<ShuttleStopArrival | undefined>;
/**
* Check if a shuttle has arrived at a stop within the given delta.
* Returns the stop if the shuttle is at a stop, otherwise undefined.
* @param shuttle
* @param delta - The coordinate delta tolerance (default 0.001)
* Determine if the shuttle is currently at a stop.
* If `true`, then calling `getShuttleLastStopArrival` will get
* the stop the shuttle is currently at.
* @param shuttleId
*/
getArrivedStopIfExists(shuttle: IShuttle, delta?: number): Promise<IStop | undefined>;
checkIfShuttleIsAtStop(shuttleId: string): Promise<boolean>;
/**
* Get the stop that the shuttle is currently at, if it's the shuttle's
* next stop based on the "last stop" the shuttle was at. If there was no
* "last stop" for the shuttle, it may return any stop on the shuttle's route.
*
* @param shuttle
* @param canReturnShuttleCurrentStop If set to true, and the shuttle's "last stop"
* matches the arrived stop, continue to return the arrived stop.
* Otherwise, only return the shuttle's next stop.
* This flag has no effect if the shuttle has not had a "last stop".
* @returns
*/
getArrivedStopIfNextStop(shuttle: IShuttle, canReturnShuttleCurrentStop: boolean): Promise<IStop | undefined>;
}

View File

@@ -20,6 +20,13 @@ import {
export class UnoptimizedInMemoryShuttleRepository
extends EventEmitter
implements ShuttleGetterSetterRepository {
constructor(
readonly shuttleStopArrivalDegreeDelta: number = 0.001,
) {
super()
}
public override on<T extends ShuttleRepositoryEventName>(
event: T,
listener: ShuttleRepositoryEventListener<T>,
@@ -74,6 +81,7 @@ export class UnoptimizedInMemoryShuttleRepository
private orderedStops: IOrderedStop[] = [];
private shuttleLastStopArrivals: Map<string, ShuttleStopArrival> = new Map();
private travelTimeData: Map<string, Array<{ timestamp: number; seconds: number }>> = new Map();
private shuttlesAtStop: Set<string> = new Set();
public async getStops(): Promise<IStop[]> {
return this.stops;
@@ -174,26 +182,60 @@ export class UnoptimizedInMemoryShuttleRepository
shuttle: IShuttle,
travelTimeTimestamp = Date.now(),
) {
const arrivedStop = await this.getArrivedStopIfExists(shuttle);
const isAtStop = await this.checkIfShuttleIsAtStop(shuttle.id);
if (arrivedStop != undefined) {
// stop if same stop
let arrivedStop: IStop | undefined;
if (isAtStop) {
// Allow retrieval of the shuttle's current stop
// Will still return undefined when the shuttle leaves the stop
arrivedStop = await this.getArrivedStopIfNextStop(shuttle, true);
} else {
arrivedStop = await this.getArrivedStopIfNextStop(shuttle, false);
}
// Will not fire *any* events if the same stop
const lastStop = await this.getShuttleLastStopArrival(shuttle.id);
if (lastStop?.stopId === arrivedStop.id) return;
if (lastStop?.stopId === arrivedStop?.id) return;
if (isAtStop) {
if (lastStop) {
this.emit(ShuttleRepositoryEvent.SHUTTLE_WILL_LEAVE_STOP, {
stopArrivalThatShuttleIsLeaving: lastStop,
});
}
await this.markShuttleAsNotAtStop(shuttle.id);
}
if (arrivedStop) {
// stop if same stop
const shuttleArrival = {
stopId: arrivedStop.id,
timestamp: new Date(travelTimeTimestamp),
shuttleId: shuttle.id,
};
this.emit(ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP, {
lastArrival: lastStop,
currentArrival: shuttleArrival,
lastStopArrival: lastStop,
willArriveAt: shuttleArrival,
});
await this.markShuttleAsAtStop(shuttleArrival.shuttleId);
await this.updateShuttleLastStopArrival(shuttleArrival);
}
}
private async markShuttleAsAtStop(shuttleId: string) {
this.shuttlesAtStop.add(shuttleId);
}
private async markShuttleAsNotAtStop(shuttleId: string) {
this.shuttlesAtStop.delete(shuttleId);
}
public async checkIfShuttleIsAtStop(shuttleId: string) {
return this.shuttlesAtStop.has(shuttleId);
}
private async updateShuttleLastStopArrival(lastStopArrival: ShuttleStopArrival) {
this.shuttleLastStopArrivals.set(lastStopArrival.shuttleId, lastStopArrival);
@@ -225,18 +267,41 @@ export class UnoptimizedInMemoryShuttleRepository
return sum / filteredPoints.length;
}
public async getArrivedStopIfExists(
public async getArrivedStopIfNextStop(
shuttle: IShuttle,
delta = 0.001,
canReturnShuttleCurrentStop: boolean = false,
): Promise<IStop | undefined> {
const degreeDelta = this.shuttleStopArrivalDegreeDelta;
const lastStopArrival = await this.getShuttleLastStopArrival(shuttle.id);
if (lastStopArrival) {
// Return the shuttle's current stop depending on the flag
if (canReturnShuttleCurrentStop) {
const lastStop = await this.getStopById(lastStopArrival.stopId);
if (lastStop && shuttleHasArrivedAtStop(shuttle, lastStop, degreeDelta)) {
return lastStop;
}
}
const lastOrderedStop = await this.getOrderedStopByRouteAndStopId(shuttle.routeId, lastStopArrival.stopId);
const orderedStopAfter = lastOrderedStop?.nextStop;
if (orderedStopAfter) {
const stopAfter = await this.getStopById(orderedStopAfter.stopId);
if (stopAfter && shuttleHasArrivedAtStop(shuttle, stopAfter, degreeDelta)) {
return stopAfter;
}
}
} else {
const orderedStops = await this.getOrderedStopsByRouteId(shuttle.routeId);
for (const orderedStop of orderedStops) {
const stop = await this.getStopById(orderedStop.stopId);
if (stop != null && shuttleHasArrivedAtStop(shuttle, stop, delta)) {
if (stop != null && shuttleHasArrivedAtStop(shuttle, stop, degreeDelta)) {
return stop;
}
}
}
return undefined;
}
@@ -267,6 +332,7 @@ export class UnoptimizedInMemoryShuttleRepository
const shuttle = await this.removeEntityByIdIfExists(shuttleId, this.shuttles);
if (shuttle != null) {
this.emit(ShuttleRepositoryEvent.SHUTTLE_REMOVED, shuttle);
this.shuttlesAtStop.delete(shuttleId);
await this.removeShuttleLastStopIfExists(shuttleId);
}
return shuttle;
@@ -289,6 +355,7 @@ export class UnoptimizedInMemoryShuttleRepository
public async clearShuttleData(): Promise<void> {
this.shuttles = [];
this.shuttlesAtStop.clear();
await this.clearShuttleLastStopData();
}

View File

@@ -565,17 +565,7 @@ describe.each(repositoryImplementations)('$name', (holder) => {
describe("addOrUpdateShuttle with shuttle tracking", () => {
test("updates the shuttle's last stop arrival if shuttle is at a stop", async () => {
const { route, systemId, stop2 } = await setupRouteAndOrderedStops();
const shuttle = {
id: "sh1",
name: "Shuttle 1",
routeId: route.id,
systemId: systemId,
coordinates: stop2.coordinates,
orientationInDegrees: 0,
updatedTime: new Date(),
};
const { stop2, sampleShuttleNotInRepository: shuttle } = await setupRouteAndOrderedStops();
await repository.addOrUpdateShuttle(shuttle);
const lastStop = await repository.getShuttleLastStopArrival(shuttle.id);
@@ -584,20 +574,10 @@ describe.each(repositoryImplementations)('$name', (holder) => {
});
describe("getArrivedStopIfExists", () => {
test("gets the stop that the shuttle is currently at, if exists", async () => {
const { route, systemId, stop2 } = await setupRouteAndOrderedStops();
test("gets any stop that the shuttle is currently at, if the shuttle has not had a last stop", async () => {
const { sampleShuttleNotInRepository: shuttle } = await setupRouteAndOrderedStops();
const shuttle = {
id: "sh1",
name: "Shuttle 1",
routeId: route.id,
systemId: systemId,
coordinates: stop2.coordinates,
orientationInDegrees: 0,
updatedTime: new Date(),
};
const result = await repository.getArrivedStopIfExists(shuttle);
const result = await repository.getArrivedStopIfNextStop(shuttle, false);
expect(result).toBeDefined();
expect(result?.id).toBe("st2");
@@ -605,37 +585,44 @@ describe.each(repositoryImplementations)('$name', (holder) => {
});
test("returns undefined if shuttle is not currently at a stop", async () => {
const { route, systemId } = await setupRouteAndOrderedStops();
const { sampleShuttleNotInRepository } = await setupRouteAndOrderedStops();
const shuttle = { ...sampleShuttleNotInRepository, coordinates: { latitude: 12.5, longitude: 22.5 } }; // Not at any stop
const shuttle = {
id: "sh1",
name: "Shuttle 1",
routeId: route.id,
systemId: systemId,
coordinates: { latitude: 12.5, longitude: 22.5 },
orientationInDegrees: 0,
updatedTime: new Date(),
};
const result = await repository.getArrivedStopIfExists(shuttle);
const result = await repository.getArrivedStopIfNextStop(shuttle, false);
expect(result).toBeUndefined();
});
test("only gets the shuttle's next stop if shuttle has previously arrived at a stop", async () => {
const { sampleShuttleNotInRepository: shuttle, stop1, stop2 } = await setupRouteAndOrderedStops();
shuttle.coordinates = stop1.coordinates;
await repository.addOrUpdateShuttle(shuttle);
let result = await repository.getArrivedStopIfNextStop(shuttle, false);
expect(result).toBeUndefined();
shuttle.coordinates = stop2.coordinates;
result = await repository.getArrivedStopIfNextStop(shuttle, false);
expect(result).not.toBeUndefined();
});
test("returns the shuttle's currently arrived stop if flag passed", async () => {
const { sampleShuttleNotInRepository: shuttle, stop1 } = await setupRouteAndOrderedStops();
shuttle.coordinates = stop1.coordinates;
await repository.addOrUpdateShuttle(shuttle);
const result = await repository.getArrivedStopIfNextStop(shuttle, true);
expect(result?.id === stop1.id)
});
});
describe("getShuttleLastStopArrival", () => {
test("gets the shuttle's last stop if existing in the data", async () => {
const { route, systemId, stop1 } = await setupRouteAndOrderedStops();
const shuttle = {
id: "sh1",
name: "Shuttle 1",
routeId: route.id,
systemId: systemId,
coordinates: stop1.coordinates,
orientationInDegrees: 0,
updatedTime: new Date(),
};
const { stop1, sampleShuttleNotInRepository: shuttle } = await setupRouteAndOrderedStops();
shuttle.coordinates = stop1.coordinates;
const stopArrivalTime = new Date("2024-01-15T10:30:00Z");
await repository.addOrUpdateShuttle(shuttle, stopArrivalTime.getTime());
@@ -657,17 +644,8 @@ describe.each(repositoryImplementations)('$name', (holder) => {
});
test("returns the most recent stop arrival when updated multiple times", async () => {
const { route, systemId, stop1, stop2 } = await setupRouteAndOrderedStops();
const shuttle = {
id: "sh1",
name: "Shuttle 1",
routeId: route.id,
systemId: systemId,
coordinates: stop1.coordinates,
orientationInDegrees: 0,
updatedTime: new Date(),
};
const { stop1, stop2, sampleShuttleNotInRepository: shuttle } = await setupRouteAndOrderedStops();
shuttle.coordinates = stop1.coordinates;
const firstArrivalTime = new Date("2024-01-15T10:30:00Z");
await repository.addOrUpdateShuttle(shuttle, firstArrivalTime.getTime());
@@ -750,27 +728,19 @@ describe.each(repositoryImplementations)('$name', (holder) => {
describe("SHUTTLE_WILL_ARRIVE_AT_STOP event", () => {
test("emits SHUTTLE_WILL_ARRIVE_AT_STOP event before shuttle arrives at a stop", async () => {
const { route, systemId, stop1 } = await setupRouteAndOrderedStops();
const { stop1, sampleShuttleNotInRepository: shuttle } = await setupRouteAndOrderedStops();
const listener = jest.fn();
repository.on(ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP, listener);
const shuttle = {
id: "sh1",
name: "Shuttle 1",
routeId: route.id,
systemId: systemId,
coordinates: stop1.coordinates,
orientationInDegrees: 0,
updatedTime: new Date(),
};
shuttle.coordinates = stop1.coordinates;
const arrivalTime = new Date("2024-01-15T10:30:00Z");
await repository.addOrUpdateShuttle(shuttle, arrivalTime.getTime());
expect(listener).toHaveBeenCalledTimes(1);
const emittedPayload = listener.mock.calls[0][0] as any;
expect(emittedPayload.currentArrival).toEqual({
expect(emittedPayload.willArriveAt).toEqual({
shuttleId: shuttle.id,
stopId: stop1.id,
timestamp: arrivalTime,
@@ -778,20 +748,12 @@ describe.each(repositoryImplementations)('$name', (holder) => {
});
test("does not emit event when shuttle is not at a stop", async () => {
const { route, systemId } = await setupRouteAndOrderedStops();
const { sampleShuttleNotInRepository } = await setupRouteAndOrderedStops();
const listener = jest.fn();
repository.on(ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP, listener);
const shuttle = {
id: "sh1",
name: "Shuttle 1",
routeId: route.id,
systemId: systemId,
coordinates: { latitude: 12.5, longitude: 22.5 }, // Not at any stop
orientationInDegrees: 0,
updatedTime: new Date(),
};
const shuttle = { ...sampleShuttleNotInRepository, coordinates: { latitude: 12.5, longitude: 22.5 } }; // Not at any stop
await repository.addOrUpdateShuttle(shuttle);
@@ -799,20 +761,12 @@ describe.each(repositoryImplementations)('$name', (holder) => {
});
test("emits multiple events as shuttle visits multiple stops", async () => {
const { route, systemId, stop1, stop2 } = await setupRouteAndOrderedStops();
const { stop1, stop2, sampleShuttleNotInRepository: shuttle } = await setupRouteAndOrderedStops();
const listener = jest.fn();
repository.on(ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP, listener);
const shuttle = {
id: "sh1",
name: "Shuttle 1",
routeId: route.id,
systemId: systemId,
coordinates: stop1.coordinates,
orientationInDegrees: 0,
updatedTime: new Date(),
};
shuttle.coordinates = stop1.coordinates;
const firstArrivalTime = new Date("2024-01-15T10:30:00Z");
await repository.addOrUpdateShuttle(shuttle, firstArrivalTime.getTime());
@@ -824,14 +778,101 @@ describe.each(repositoryImplementations)('$name', (holder) => {
expect(listener).toHaveBeenCalledTimes(2);
const firstPayload = listener.mock.calls[0][0] as any;
expect(firstPayload.currentArrival).toEqual({
expect(firstPayload.willArriveAt).toEqual({
shuttleId: shuttle.id,
stopId: stop1.id,
timestamp: firstArrivalTime,
});
const secondPayload = listener.mock.calls[1][0] as any;
expect(secondPayload.currentArrival).toEqual({
expect(secondPayload.willArriveAt).toEqual({
shuttleId: shuttle.id,
stopId: stop2.id,
timestamp: secondArrivalTime,
});
});
});
describe("SHUTTLE_WILL_LEAVE_STOP event", () => {
test("emits SHUTTLE_WILL_LEAVE_STOP event when shuttle leaves a stop", async () => {
const { stop1, sampleShuttleNotInRepository: shuttle } = await setupRouteAndOrderedStops();
shuttle.coordinates = stop1.coordinates;
const listener = jest.fn();
repository.on(ShuttleRepositoryEvent.SHUTTLE_WILL_LEAVE_STOP, listener);
// Simulate arrival at stop 1
const arrivalTime = new Date("2024-01-15T10:30:00Z");
await repository.addOrUpdateShuttle(shuttle, arrivalTime.getTime());
// Test that it actually emits the event correctly and not right after the shuttle arrives
await repository.addOrUpdateShuttle(shuttle, arrivalTime.getTime());
expect(listener).not.toHaveBeenCalled();
shuttle.coordinates = { latitude: 12.5, longitude: 22.5 }; // Not at any stop
// Simulate leaving stop 1
const leaveTime = new Date("2024-01-15T10:32:00Z");
await repository.addOrUpdateShuttle(shuttle, leaveTime.getTime());
expect(listener).toHaveBeenCalledTimes(1);
const emittedPayload = listener.mock.calls[0][0] as any;
expect(emittedPayload.stopArrivalThatShuttleIsLeaving).toEqual({
shuttleId: shuttle.id,
stopId: stop1.id,
timestamp: arrivalTime,
});
});
test("does not emit event when shuttle was not at a stop", async () => {
const { sampleShuttleNotInRepository } = await setupRouteAndOrderedStops();
const listener = jest.fn();
repository.on(ShuttleRepositoryEvent.SHUTTLE_WILL_LEAVE_STOP, listener);
// Start at coordinates not at any stop
const shuttle = { ...sampleShuttleNotInRepository, coordinates: { latitude: 12.5, longitude: 22.5 } };
await repository.addOrUpdateShuttle(shuttle);
// Move to different coordinates; still not at any stop
shuttle.coordinates = { latitude: 13.0, longitude: 23.0 };
await repository.addOrUpdateShuttle(shuttle);
expect(listener).toHaveBeenCalledTimes(0);
});
test("emits multiple events as shuttle leaves multiple stops", async () => {
const { stop1, stop2, sampleShuttleNotInRepository: shuttle } = await setupRouteAndOrderedStops();
const listener = jest.fn();
repository.on(ShuttleRepositoryEvent.SHUTTLE_WILL_LEAVE_STOP, listener);
// Arrive at stop1
shuttle.coordinates = stop1.coordinates;
const firstArrivalTime = new Date("2024-01-15T10:30:00Z");
await repository.addOrUpdateShuttle(shuttle, firstArrivalTime.getTime());
// Leave stop1 and arrive at stop2
shuttle.coordinates = stop2.coordinates;
const secondArrivalTime = new Date("2024-01-15T10:35:00Z");
await repository.addOrUpdateShuttle(shuttle, secondArrivalTime.getTime());
// Leave stop2
shuttle.coordinates = { latitude: 12.5, longitude: 22.5 }; // Not at any stop
const secondLeaveTime = new Date("2024-01-15T10:40:00Z");
await repository.addOrUpdateShuttle(shuttle, secondLeaveTime.getTime());
expect(listener).toHaveBeenCalledTimes(2);
const firstPayload = listener.mock.calls[0][0] as any;
expect(firstPayload.stopArrivalThatShuttleIsLeaving).toEqual({
shuttleId: shuttle.id,
stopId: stop1.id,
timestamp: firstArrivalTime,
});
const secondPayload = listener.mock.calls[1][0] as any;
expect(secondPayload.stopArrivalThatShuttleIsLeaving).toEqual({
shuttleId: shuttle.id,
stopId: stop2.id,
timestamp: secondArrivalTime,

View File

@@ -1,5 +1,5 @@
import { SelfUpdatingETARepository } from "./SelfUpdatingETARepository";
import { ShuttleGetterRepository, ShuttleRepositoryEvent, ShuttleStopArrival, ShuttleTravelTimeDataIdentifier, ShuttleTravelTimeDateFilterArguments, WillArriveAtStopPayload } from "../ShuttleGetterRepository";
import { ShuttleGetterRepository, ShuttleRepositoryEvent, ShuttleStopArrival, ShuttleTravelTimeDataIdentifier, ShuttleTravelTimeDateFilterArguments, ShuttleWillArriveAtStopPayload, ShuttleWillLeaveStopPayload } from "../ShuttleGetterRepository";
import { BaseInMemoryETARepository } from "./BaseInMemoryETARepository";
import { IOrderedStop, IShuttle } from "../../../entities/ShuttleRepositoryEntities";
import { ETARepositoryEvent } from "./ETAGetterRepository";
@@ -8,6 +8,8 @@ export class InMemorySelfUpdatingETARepository extends BaseInMemoryETARepository
private referenceTime: Date | null = null;
private travelTimeData: Map<string, Array<{ timestamp: number; seconds: number }>> = new Map();
private isListening = false;
constructor(
readonly shuttleRepository: ShuttleGetterRepository
) {
@@ -16,8 +18,12 @@ export class InMemorySelfUpdatingETARepository extends BaseInMemoryETARepository
this.setReferenceTime = this.setReferenceTime.bind(this);
this.getAverageTravelTimeSeconds = this.getAverageTravelTimeSeconds.bind(this);
this.startListeningForUpdates = this.startListeningForUpdates.bind(this);
this.handleShuttleUpdate = this.handleShuttleUpdate.bind(this);
this.handleShuttleWillArriveAtStop = this.handleShuttleWillArriveAtStop.bind(this);
this.handleShuttleUpdate = this.handleShuttleUpdate.bind(this);
this.updateCascadingEta = this.updateCascadingEta.bind(this);
this.getAverageTravelTimeSecondsWithFallbacks = this.getAverageTravelTimeSecondsWithFallbacks.bind(this);
this.removeEtaIfExists = this.removeEtaIfExists.bind(this);
this.handleShuttleWillLeaveStop = this.handleShuttleWillLeaveStop.bind(this);
}
setReferenceTime(referenceTime: Date): void {
@@ -51,13 +57,23 @@ export class InMemorySelfUpdatingETARepository extends BaseInMemoryETARepository
}
startListeningForUpdates(): void {
if (this.isListening) {
console.warn("Already listening to updates; did you call startListeningForUpdates twice?");
return;
}
this.shuttleRepository.addListener(ShuttleRepositoryEvent.SHUTTLE_UPDATED, this.handleShuttleUpdate);
this.shuttleRepository.addListener(ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP, this.handleShuttleWillArriveAtStop);
this.shuttleRepository.addListener(ShuttleRepositoryEvent.SHUTTLE_WILL_LEAVE_STOP, this.handleShuttleWillLeaveStop);
this.isListening = true;
}
stopListeningForUpdates(): void {
if (!this.isListening) {
return;
}
this.shuttleRepository.removeListener(ShuttleRepositoryEvent.SHUTTLE_UPDATED, this.handleShuttleUpdate);
this.shuttleRepository.removeListener(ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP, this.handleShuttleWillArriveAtStop);
this.shuttleRepository.removeListener(ShuttleRepositoryEvent.SHUTTLE_WILL_LEAVE_STOP, this.handleShuttleWillLeaveStop);
this.isListening = false;
}
private async getAverageTravelTimeSecondsWithFallbacks(
@@ -74,9 +90,25 @@ export class InMemorySelfUpdatingETARepository extends BaseInMemoryETARepository
}
private async handleShuttleUpdate(shuttle: IShuttle): Promise<void> {
const isAtStop = await this.shuttleRepository.checkIfShuttleIsAtStop(shuttle.id);
const lastStop = await this.shuttleRepository.getShuttleLastStopArrival(shuttle.id);
if (!lastStop) return;
if (isAtStop) {
// Update the ETA *to* the stop the shuttle is currently at,
// before starting from the current stop as normal.
// Account for cases where the shuttle arrived way earlier than
// expected based on the calculated ETA.
await this.addOrUpdateEta({
secondsRemaining: 1,
shuttleId: shuttle.id,
stopId: lastStop.stopId,
systemId: shuttle.systemId,
updatedTime: new Date(),
});
}
const lastOrderedStop = await this.shuttleRepository.getOrderedStopByRouteAndStopId(shuttle.routeId, lastStop.stopId);
await this.updateCascadingEta({
@@ -157,14 +189,9 @@ export class InMemorySelfUpdatingETARepository extends BaseInMemoryETARepository
}
private async handleShuttleWillArriveAtStop({
lastArrival,
currentArrival,
}: WillArriveAtStopPayload): Promise<void> {
const etas = await this.getEtasForShuttleId(currentArrival.shuttleId);
for (const eta of etas) {
await this.removeEtaIfExists(eta.shuttleId, eta.stopId);
}
lastStopArrival: lastArrival,
willArriveAt: currentArrival,
}: ShuttleWillArriveAtStopPayload): Promise<void> {
if (lastArrival) {
// disallow cases where this gets triggered multiple times
if (lastArrival.stopId === currentArrival.stopId) return;
@@ -181,6 +208,12 @@ export class InMemorySelfUpdatingETARepository extends BaseInMemoryETARepository
}
}
private async handleShuttleWillLeaveStop({
stopArrivalThatShuttleIsLeaving,
}: ShuttleWillLeaveStopPayload) {
await this.removeEtaIfExists(stopArrivalThatShuttleIsLeaving.shuttleId, stopArrivalThatShuttleIsLeaving.stopId);
}
private async addTravelTimeDataPoint(
{ routeId, fromStopId, toStopId }: ShuttleTravelTimeDataIdentifier,
travelTimeSeconds: number,

View File

@@ -1,22 +1,17 @@
import { SelfUpdatingETARepository } from "./SelfUpdatingETARepository";
import { BaseRedisETARepository } from "./BaseRedisETARepository";
import { createClient, RedisClientType } from "redis";
import { ShuttleGetterRepository, ShuttleRepositoryEvent, ShuttleStopArrival, ShuttleTravelTimeDataIdentifier, ShuttleTravelTimeDateFilterArguments, WillArriveAtStopPayload } from "../ShuttleGetterRepository";
import { REDIS_RECONNECT_INTERVAL } from "../../../environment";
import { RedisClientType } from "redis";
import { ShuttleGetterRepository, ShuttleRepositoryEvent, ShuttleStopArrival, ShuttleTravelTimeDataIdentifier, ShuttleTravelTimeDateFilterArguments, ShuttleWillArriveAtStopPayload, ShuttleWillLeaveStopPayload } from "../ShuttleGetterRepository";
import { IEta, IOrderedStop, IShuttle } from "../../../entities/ShuttleRepositoryEntities";
import { ETARepositoryEvent } from "./ETAGetterRepository";
import createRedisClientForRepository from "../../../helpers/createRedisClientForRepository";
export class RedisSelfUpdatingETARepository extends BaseRedisETARepository implements SelfUpdatingETARepository {
private isListening = false;
constructor(
readonly shuttleRepository: ShuttleGetterRepository,
redisClient: RedisClientType = createClient({
url: process.env.REDIS_URL,
socket: {
tls: process.env.NODE_ENV === 'production',
rejectUnauthorized: false,
reconnectStrategy: REDIS_RECONNECT_INTERVAL,
},
}),
redisClient: RedisClientType = createRedisClientForRepository(),
private referenceTime: Date | null = null,
) {
super(redisClient);
@@ -29,6 +24,7 @@ export class RedisSelfUpdatingETARepository extends BaseRedisETARepository imple
this.updateCascadingEta = this.updateCascadingEta.bind(this);
this.getAverageTravelTimeSecondsWithFallbacks = this.getAverageTravelTimeSecondsWithFallbacks.bind(this);
this.removeEtaIfExists = this.removeEtaIfExists.bind(this);
this.handleShuttleWillLeaveStop = this.handleShuttleWillLeaveStop.bind(this);
}
private createHistoricalEtaTimeSeriesKey = (routeId: string, fromStopId: string, toStopId: string) => {
@@ -71,14 +67,25 @@ export class RedisSelfUpdatingETARepository extends BaseRedisETARepository imple
}
}
startListeningForUpdates() {
startListeningForUpdates(): void {
if (this.isListening) {
console.warn("Already listening to updates; did you call startListeningForUpdates twice?");
return;
}
this.shuttleRepository.addListener(ShuttleRepositoryEvent.SHUTTLE_UPDATED, this.handleShuttleUpdate);
this.shuttleRepository.addListener(ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP, this.handleShuttleWillArriveAtStop)
this.shuttleRepository.addListener(ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP, this.handleShuttleWillArriveAtStop);
this.shuttleRepository.addListener(ShuttleRepositoryEvent.SHUTTLE_WILL_LEAVE_STOP, this.handleShuttleWillLeaveStop);
this.isListening = true;
}
stopListeningForUpdates() {
stopListeningForUpdates(): void {
if (!this.isListening) {
return;
}
this.shuttleRepository.removeListener(ShuttleRepositoryEvent.SHUTTLE_UPDATED, this.handleShuttleUpdate);
this.shuttleRepository.removeListener(ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP, this.handleShuttleWillArriveAtStop);
this.shuttleRepository.removeListener(ShuttleRepositoryEvent.SHUTTLE_WILL_LEAVE_STOP, this.handleShuttleWillLeaveStop);
this.isListening = false;
}
private async getAverageTravelTimeSecondsWithFallbacks(
@@ -95,9 +102,25 @@ export class RedisSelfUpdatingETARepository extends BaseRedisETARepository imple
}
private async handleShuttleUpdate(shuttle: IShuttle) {
const isAtStop = await this.shuttleRepository.checkIfShuttleIsAtStop(shuttle.id);
const lastStop = await this.shuttleRepository.getShuttleLastStopArrival(shuttle.id);
if (!lastStop) return;
if (isAtStop) {
// Update the ETA *to* the stop the shuttle is currently at,
// before starting from the current stop as normal.
// Account for cases where the shuttle arrived way earlier than
// expected based on the calculated ETA.
await this.addOrUpdateEta({
secondsRemaining: 1,
shuttleId: shuttle.id,
stopId: lastStop.stopId,
systemId: shuttle.systemId,
updatedTime: new Date(),
});
}
const lastOrderedStop = await this.shuttleRepository.getOrderedStopByRouteAndStopId(shuttle.routeId, lastStop.stopId);
await this.updateCascadingEta({
@@ -179,14 +202,9 @@ export class RedisSelfUpdatingETARepository extends BaseRedisETARepository imple
private async handleShuttleWillArriveAtStop({
lastArrival,
currentArrival,
}: WillArriveAtStopPayload) {
const etas = await this.getEtasForShuttleId(currentArrival.shuttleId);
for (const eta of etas) {
await this.removeEtaIfExists(eta.shuttleId, eta.stopId);
}
lastStopArrival: lastArrival,
willArriveAt: currentArrival,
}: ShuttleWillArriveAtStopPayload) {
// only update time traveled if last arrival exists
if (lastArrival) {
// disallow cases where this gets triggered multiple times
@@ -204,6 +222,13 @@ export class RedisSelfUpdatingETARepository extends BaseRedisETARepository imple
}
}
private async handleShuttleWillLeaveStop({
stopArrivalThatShuttleIsLeaving,
}: ShuttleWillLeaveStopPayload) {
await this.removeEtaIfExists(stopArrivalThatShuttleIsLeaving.shuttleId, stopArrivalThatShuttleIsLeaving.stopId);
}
public async addTravelTimeDataPoint(
{ routeId, fromStopId, toStopId }: ShuttleTravelTimeDataIdentifier,
travelTimeSeconds: number,

View File

@@ -8,6 +8,7 @@ import { RedisShuttleRepository } from "../../RedisShuttleRepository";
import { UnoptimizedInMemoryShuttleRepository } from "../../UnoptimizedInMemoryShuttleRepository";
import { setupRouteAndOrderedStopsForShuttleRepository } from "../../../../../testHelpers/setupRouteAndOrderedStopsForShuttleRepository";
import { ShuttleGetterSetterRepository } from "../../ShuttleGetterSetterRepository";
import { IShuttle, IStop } from "../../../../entities/ShuttleRepositoryEntities";
class RedisSelfUpdatingETARepositoryHolder implements RepositoryHolder<SelfUpdatingETARepository> {
repo: RedisSelfUpdatingETARepository | undefined;
@@ -53,7 +54,7 @@ class InMemorySelfUpdatingETARepositoryHolder implements RepositoryHolder<SelfUp
const repositoryImplementations = [
new RedisSelfUpdatingETARepositoryHolder(),
new InMemorySelfUpdatingETARepositoryHolder()
new InMemorySelfUpdatingETARepositoryHolder(),
];
describe.each(repositoryImplementations)('$name', (holder) => {
@@ -74,21 +75,45 @@ describe.each(repositoryImplementations)('$name', (holder) => {
return await setupRouteAndOrderedStopsForShuttleRepository(shuttleRepository);
}
async function populateTravelTimeDataForStops({
currentTime,
shuttle,
stop1,
stop2,
stop3,
firstStopArrivalTime = new Date(2025, 0, 1, 11, 0, 0),
secondStopArrivalTime = new Date(2025, 0, 1, 11, 15, 0),
thirdStopArrivalTime = new Date(2025, 0, 1, 11, 20, 0),
}: {
currentTime: Date;
shuttle: IShuttle;
stop1: IStop;
stop2: IStop;
stop3: IStop;
firstStopArrivalTime?: Date;
secondStopArrivalTime?: Date;
thirdStopArrivalTime?: Date;
}) {
repository.setReferenceTime(currentTime);
repository.startListeningForUpdates();
shuttle.coordinates = stop1.coordinates;
await shuttleRepository.addOrUpdateShuttle(shuttle, firstStopArrivalTime.getTime());
shuttle.coordinates = stop2.coordinates;
await shuttleRepository.addOrUpdateShuttle(shuttle, secondStopArrivalTime.getTime());
shuttle.coordinates = stop3.coordinates;
await shuttleRepository.addOrUpdateShuttle(shuttle, thirdStopArrivalTime.getTime());
}
describe("handleShuttleWillArriveAtStop", () => {
test("updates how long the shuttle took to get from one stop to another", async () => {
const { route, systemId, stop2, stop1 } = await setupRouteAndOrderedStops();
const { route, stop2, stop1, sampleShuttleNotInRepository: shuttle } = await setupRouteAndOrderedStops();
repository.startListeningForUpdates();
const shuttle = {
id: "sh1",
name: "Shuttle 1",
routeId: route.id,
systemId: systemId,
coordinates: stop1.coordinates,
orientationInDegrees: 0,
updatedTime: new Date(),
};
shuttle.coordinates = stop1.coordinates;
const firstStopArrivalTime = new Date(2025, 0, 1, 12, 0, 0);
await shuttleRepository.addOrUpdateShuttle(shuttle, firstStopArrivalTime.getTime());
@@ -118,33 +143,10 @@ describe.each(repositoryImplementations)('$name', (holder) => {
currentTime: Date,
shuttleSecondArrivalTimeAtFirstStop: Date
) {
const { route, systemId, stop1, stop2, stop3 } = await setupRouteAndOrderedStops();
const { stop1, stop2, stop3, sampleShuttleNotInRepository: shuttle } = await setupRouteAndOrderedStops();
// Populating travel time data
const firstStopArrivalTime = new Date(2025, 0, 1, 11, 0, 0);
const secondStopArrivalTime = new Date(2025, 0, 1, 11, 15, 0);
const thirdStopArrivalTime = new Date(2025, 0, 1, 11, 20, 0);
repository.setReferenceTime(currentTime);
repository.startListeningForUpdates();
const shuttle = {
id: "sh1",
name: "Shuttle 1",
routeId: route.id,
systemId: systemId,
coordinates: stop1.coordinates,
orientationInDegrees: 0,
updatedTime: new Date(),
};
await shuttleRepository.addOrUpdateShuttle(shuttle, firstStopArrivalTime.getTime());
shuttle.coordinates = stop2.coordinates;
await shuttleRepository.addOrUpdateShuttle(shuttle, secondStopArrivalTime.getTime());
shuttle.coordinates = stop3.coordinates;
await shuttleRepository.addOrUpdateShuttle(shuttle, thirdStopArrivalTime.getTime());
await populateTravelTimeDataForStops({ currentTime, shuttle, stop1, stop2, stop3 });
// Populating ETA data
shuttle.coordinates = stop1.coordinates;
@@ -192,23 +194,79 @@ describe.each(repositoryImplementations)('$name', (holder) => {
currentTime, shuttleSecondArrivalTimeAtFirstStop
);
});
test("adds a 'stopgap' entry of 1 second when the shuttle arrives at a stop", async () => {
const { stop1, stop2, stop3, sampleShuttleNotInRepository: shuttle } = await setupRouteAndOrderedStops();
const shuttleSecondArrivalTimeAtFirstStop = new Date(2025, 0, 1, 12, 5, 0);
const currentTime = new Date(shuttleSecondArrivalTimeAtFirstStop.getTime() + 7 * 60 * 1000);
// Populating travel time data
await populateTravelTimeDataForStops({ currentTime, shuttle, stop1, stop2, stop3 });
// Populate ETA data
// Simulate shuttle running early for second stop
shuttle.coordinates = stop1.coordinates;
await shuttleRepository.addOrUpdateShuttle(
shuttle,
shuttleSecondArrivalTimeAtFirstStop.getTime()
);
shuttle.coordinates = stop2.coordinates;
// Call twice to get the ETA repository to read the correct flag
await shuttleRepository.addOrUpdateShuttle(
shuttle,
currentTime.getTime(),
);
await new Promise((resolve) => setTimeout(resolve, 500));
await shuttleRepository.addOrUpdateShuttle(
shuttle,
currentTime.getTime(), // ~8 minutes early
);
await new Promise((resolve) => setTimeout(resolve, 500));
const eta = await repository.getEtaForShuttleAndStopId(shuttle.id, stop2.id);
expect(eta?.secondsRemaining).toEqual(1);
});
});
describe("handleShuttleWillLeaveStop", () => {
test("clears ETA of correct stop on leaving stop", async () => {
const { stop1, stop2, stop3, sampleShuttleNotInRepository: shuttle } = await setupRouteAndOrderedStops();
const shuttleSecondArrivalTimeAtFirstStop = new Date(2025, 0, 8, 12, 0, 0);
const shuttleSecondArrivalTimeAtSecondStop = new Date(2025, 0, 8, 12, 15, 0);
const currentTime = new Date(shuttleSecondArrivalTimeAtSecondStop.getTime() + 3 * 60 * 1000);
await populateTravelTimeDataForStops({ currentTime, shuttle, stop1, stop2, stop3 });
// Populating ETA data
shuttle.coordinates = stop1.coordinates;
await shuttleRepository.addOrUpdateShuttle(shuttle, shuttleSecondArrivalTimeAtFirstStop.getTime());
shuttle.coordinates = stop2.coordinates;
await shuttleRepository.addOrUpdateShuttle(shuttle, shuttleSecondArrivalTimeAtSecondStop.getTime());
shuttle.coordinates = { latitude: 12.5, longitude: 12.5 }
await shuttleRepository.addOrUpdateShuttle(shuttle, currentTime.getTime());
await new Promise((resolve) => setTimeout(resolve, 1000));
const etaForStop3 = await repository.getEtaForShuttleAndStopId(shuttle.id, stop3.id);
expect(etaForStop3).not.toBeNull();
const etaForStop2 = await repository.getEtaForShuttleAndStopId(shuttle.id, stop2.id);
expect(etaForStop2).toBeNull();
}, 60000);
});
describe("getAverageTravelTimeSeconds", () => {
test("returns the average travel time when historical data exists", async () => {
const { route, systemId, stop1, stop2 } = await setupRouteAndOrderedStops();
const { route, stop1, stop2, sampleShuttleNotInRepository: shuttle } = await setupRouteAndOrderedStops();
repository.startListeningForUpdates();
const shuttle = {
id: "sh1",
name: "Shuttle 1",
routeId: route.id,
systemId: systemId,
coordinates: stop1.coordinates,
orientationInDegrees: 0,
updatedTime: new Date(),
};
shuttle.coordinates = stop1.coordinates;
const firstStopTime = new Date(2025, 0, 1, 12, 0, 0);
await shuttleRepository.addOrUpdateShuttle(shuttle, firstStopTime.getTime());
@@ -232,19 +290,11 @@ describe.each(repositoryImplementations)('$name', (holder) => {
});
test("returns average of multiple data points", async () => {
const { route, systemId, stop1, stop2 } = await setupRouteAndOrderedStops();
const { route, stop1, stop2, sampleShuttleNotInRepository: shuttle } = await setupRouteAndOrderedStops();
repository.startListeningForUpdates();
const shuttle = {
id: "sh1",
name: "Shuttle 1",
routeId: route.id,
systemId: systemId,
coordinates: stop1.coordinates,
orientationInDegrees: 0,
updatedTime: new Date(),
};
shuttle.coordinates = stop1.coordinates;
// First trip: 10 minutes travel time
await shuttleRepository.addOrUpdateShuttle(shuttle, new Date(2025, 0, 1, 12, 0, 0).getTime());
@@ -288,19 +338,11 @@ describe.each(repositoryImplementations)('$name', (holder) => {
});
test("returns undefined when querying outside the time range of data", async () => {
const { route, systemId, stop1, stop2 } = await setupRouteAndOrderedStops();
const { route, stop1, stop2, sampleShuttleNotInRepository: shuttle } = await setupRouteAndOrderedStops();
repository.startListeningForUpdates();
const shuttle = {
id: "sh1",
name: "Shuttle 1",
routeId: route.id,
systemId: systemId,
coordinates: stop1.coordinates,
orientationInDegrees: 0,
updatedTime: new Date(),
};
shuttle.coordinates = stop1.coordinates;
await shuttleRepository.addOrUpdateShuttle(shuttle, new Date(2025, 0, 1, 12, 0, 0).getTime());
shuttle.coordinates = stop2.coordinates;

View File

@@ -25,6 +25,7 @@ const systemInfoForTesting: InterchangeSystemBuilderArguments = {
passioSystemId: "263",
parkingSystemId: ChapmanApiBasedParkingRepositoryLoader.id,
useSelfUpdatingEtas: false,
shuttleStopArrivalDegreeDelta: 0.001,
};
export function buildSystemForTesting() {

View File

@@ -1,4 +1,4 @@
import { IOrderedStop, IStop } from "../src/entities/ShuttleRepositoryEntities";
import { IOrderedStop, IShuttle, IStop } from "../src/entities/ShuttleRepositoryEntities";
import { ShuttleGetterSetterRepository } from "../src/repositories/shuttle/ShuttleGetterSetterRepository";
export async function setupRouteAndOrderedStopsForShuttleRepository(
@@ -71,11 +71,22 @@ export async function setupRouteAndOrderedStopsForShuttleRepository(
await shuttleRepository.addOrUpdateOrderedStop(orderedStop2);
await shuttleRepository.addOrUpdateOrderedStop(orderedStop3);
const sampleShuttleNotInRepository: IShuttle = {
id: "sh1",
name: "Shuttle 1",
routeId: route.id,
systemId: systemId,
coordinates: stop2.coordinates,
orientationInDegrees: 0,
updatedTime: new Date(),
};
return {
route,
systemId,
stop1,
stop2,
stop3,
sampleShuttleNotInRepository,
};
}

15
tsconfig.build.json Normal file
View File

@@ -0,0 +1,15 @@
// For builds, excludes tests and mocks
{
"compilerOptions": {
"target": "es2016",
"module": "commonjs",
"esModuleInterop": true,
"forceConsistentCasingInFileNames": true,
"strict": true,
"skipLibCheck": true,
"outDir": "dist",
"sourceMap": true
},
"include": ["src"],
"exclude": ["**/__tests__/*/**", "**/__mocks__/*/**"]
}

View File

@@ -1,3 +1,4 @@
// For type-checking, includes tests and mocks
{
"compilerOptions": {
"target": "es2016",
@@ -10,5 +11,4 @@
"sourceMap": true
},
"include": ["src"],
"exclude": ["**/__tests__/*/**", "**/__mocks__/*/**"]
}