Unify the new ETA functionality across all shuttle repositories

This commit is contained in:
2025-11-10 20:20:39 -08:00
parent e6793572bf
commit 20c97de94d
4 changed files with 197 additions and 23 deletions

View File

@@ -8,24 +8,11 @@ import {
ShuttleRepositoryEventListener, ShuttleRepositoryEventListener,
ShuttleRepositoryEventName, ShuttleRepositoryEventName,
ShuttleRepositoryEventPayloads, ShuttleRepositoryEventPayloads,
ShuttleStopArrival,
ShuttleTravelTimeDataIdentifier,
ShuttleTravelTimeDateFilterArguments,
} from "./ShuttleGetterRepository"; } from "./ShuttleGetterRepository";
export interface ShuttleStopArrival {
stopId: string;
timestamp: Date;
}
export interface ShuttleTravelTimeDataIdentifier {
routeId: string;
fromStopId: string;
toStopId: string;
}
export interface ShuttleTravelTimeDateFilterArguments {
from: Date;
to: Date;
}
export class RedisShuttleRepository extends EventEmitter implements ShuttleGetterSetterRepository { export class RedisShuttleRepository extends EventEmitter implements ShuttleGetterSetterRepository {
protected redisClient; protected redisClient;
@@ -644,12 +631,16 @@ export class RedisShuttleRepository extends EventEmitter implements ShuttleGette
await this.redisClient.hSet(key, this.createRedisHashFromOrderedStop(orderedStop)); await this.redisClient.hSet(key, this.createRedisHashFromOrderedStop(orderedStop));
} }
public async addOrUpdateEta(eta: IEta): Promise<void> { private async addOrUpdateEta(eta: IEta): Promise<void> {
const key = this.createEtaKey(eta.shuttleId, eta.stopId); const key = this.createEtaKey(eta.shuttleId, eta.stopId);
await this.redisClient.hSet(key, this.createRedisHashFromEta(eta)); await this.redisClient.hSet(key, this.createRedisHashFromEta(eta));
this.emit(ShuttleRepositoryEvent.ETA_UPDATED, eta); this.emit(ShuttleRepositoryEvent.ETA_UPDATED, eta);
} }
public async addOrUpdateEtaFromExternalSource(eta: IEta): Promise<void> {
await this.addOrUpdateEta(eta);
}
// Remove methods // Remove methods
public async removeRouteIfExists(routeId: string): Promise<IRoute | null> { public async removeRouteIfExists(routeId: string): Promise<IRoute | null> {
const route = await this.getRouteById(routeId); const route = await this.getRouteById(routeId);

View File

@@ -22,6 +22,22 @@ export type ShuttleRepositoryEventListener<T extends ShuttleRepositoryEventName>
payload: ShuttleRepositoryEventPayloads[T], payload: ShuttleRepositoryEventPayloads[T],
) => void; ) => void;
export interface ShuttleStopArrival {
stopId: string;
timestamp: Date;
}
export interface ShuttleTravelTimeDataIdentifier {
routeId: string;
fromStopId: string;
toStopId: string;
}
export interface ShuttleTravelTimeDateFilterArguments {
from: Date;
to: Date;
}
/** /**
* Shuttle getter repository to be linked to a system. * Shuttle getter repository to be linked to a system.
*/ */
@@ -61,4 +77,19 @@ export interface ShuttleGetterRepository extends EventEmitter {
* @param routeId * @param routeId
*/ */
getOrderedStopsByRouteId(routeId: string): Promise<IOrderedStop[]>; getOrderedStopsByRouteId(routeId: string): Promise<IOrderedStop[]>;
/**
* Get the last stop arrival for a shuttle.
* Returns undefined if no last stop arrival has been recorded.
* @param shuttleId
*/
getShuttleLastStopArrival(shuttleId: string): Promise<ShuttleStopArrival | undefined>;
/**
* Check if a shuttle has arrived at a stop within the given delta.
* Returns the stop if the shuttle is at a stop, otherwise undefined.
* @param shuttle
* @param delta - The coordinate delta tolerance (default 0.001)
*/
getArrivedStopIfExists(shuttle: IShuttle, delta?: number): Promise<IStop | undefined>;
} }

View File

@@ -1,7 +1,7 @@
// If types match closely, we can use TypeScript "casting" // If types match closely, we can use TypeScript "casting"
// to convert from data repo to GraphQL schema // to convert from data repo to GraphQL schema
import { ShuttleGetterRepository } from "./ShuttleGetterRepository"; import { ShuttleGetterRepository, ShuttleTravelTimeDataIdentifier, ShuttleTravelTimeDateFilterArguments } from "./ShuttleGetterRepository";
import { IEta, IOrderedStop, IRoute, IShuttle, IStop } from "../../entities/ShuttleRepositoryEntities"; import { IEta, IOrderedStop, IRoute, IShuttle, IStop } from "../../entities/ShuttleRepositoryEntities";
/** /**
@@ -13,10 +13,16 @@ import { IEta, IOrderedStop, IRoute, IShuttle, IStop } from "../../entities/Shut
export interface ShuttleGetterSetterRepository extends ShuttleGetterRepository { export interface ShuttleGetterSetterRepository extends ShuttleGetterRepository {
// Setter methods // Setter methods
addOrUpdateRoute(route: IRoute): Promise<void>; addOrUpdateRoute(route: IRoute): Promise<void>;
addOrUpdateShuttle(shuttle: IShuttle): Promise<void>; addOrUpdateShuttle(shuttle: IShuttle, travelTimeTimestamp?: number, referenceCurrentTime?: Date): Promise<void>;
addOrUpdateStop(stop: IStop): Promise<void>; addOrUpdateStop(stop: IStop): Promise<void>;
addOrUpdateOrderedStop(orderedStop: IOrderedStop): Promise<void>; addOrUpdateOrderedStop(orderedStop: IOrderedStop): Promise<void>;
addOrUpdateEta(eta: IEta): Promise<void>;
/**
* Add or update an ETA from an external source (e.g., API or test data).
* This bypasses the internal ETA calculation based on shuttle movements.
* Use this for loading ETAs from external APIs or setting test data.
*/
addOrUpdateEtaFromExternalSource(eta: IEta): Promise<void>;
removeRouteIfExists(routeId: string): Promise<IRoute | null>; removeRouteIfExists(routeId: string): Promise<IRoute | null>;
removeShuttleIfExists(shuttleId: string): Promise<IShuttle | null>; removeShuttleIfExists(shuttleId: string): Promise<IShuttle | null>;
@@ -29,4 +35,15 @@ export interface ShuttleGetterSetterRepository extends ShuttleGetterRepository {
clearStopData(): Promise<void>; clearStopData(): Promise<void>;
clearOrderedStopData(): Promise<void>; clearOrderedStopData(): Promise<void>;
clearEtaData(): Promise<void>; clearEtaData(): Promise<void>;
/**
* Get average travel time between two stops based on historical data.
* Returns undefined if no data exists for the specified time range.
* @param identifier - The route and stop IDs to query
* @param dateFilter - The date range to filter data
*/
getAverageTravelTimeSeconds(
identifier: ShuttleTravelTimeDataIdentifier,
dateFilter: ShuttleTravelTimeDateFilterArguments
): Promise<number | undefined>;
} }

View File

@@ -1,12 +1,15 @@
import EventEmitter from "node:events"; import EventEmitter from "node:events";
import { ShuttleGetterSetterRepository } from "./ShuttleGetterSetterRepository"; import { ShuttleGetterSetterRepository } from "./ShuttleGetterSetterRepository";
import { IEta, IOrderedStop, IRoute, IShuttle, IStop } from "../../entities/ShuttleRepositoryEntities"; import { IEta, IOrderedStop, IRoute, IShuttle, IStop, shuttleHasArrivedAtStop } from "../../entities/ShuttleRepositoryEntities";
import { IEntityWithId } from "../../entities/SharedEntities"; import { IEntityWithId } from "../../entities/SharedEntities";
import { import {
ShuttleRepositoryEvent, ShuttleRepositoryEvent,
ShuttleRepositoryEventListener, ShuttleRepositoryEventListener,
ShuttleRepositoryEventName, ShuttleRepositoryEventName,
ShuttleRepositoryEventPayloads, ShuttleRepositoryEventPayloads,
ShuttleStopArrival,
ShuttleTravelTimeDataIdentifier,
ShuttleTravelTimeDateFilterArguments,
} from "./ShuttleGetterRepository"; } from "./ShuttleGetterRepository";
/** /**
@@ -70,6 +73,8 @@ export class UnoptimizedInMemoryShuttleRepository
private shuttles: IShuttle[] = []; private shuttles: IShuttle[] = [];
private etas: IEta[] = []; private etas: IEta[] = [];
private orderedStops: IOrderedStop[] = []; private orderedStops: IOrderedStop[] = [];
private shuttleLastStopArrivals: Map<string, ShuttleStopArrival> = new Map();
private travelTimeData: Map<string, Array<{ timestamp: number; seconds: number }>> = new Map();
public async getStops(): Promise<IStop[]> { public async getStops(): Promise<IStop[]> {
return this.stops; return this.stops;
@@ -144,13 +149,20 @@ export class UnoptimizedInMemoryShuttleRepository
} }
} }
public async addOrUpdateShuttle(shuttle: IShuttle): Promise<void> { public async addOrUpdateShuttle(
shuttle: IShuttle,
travelTimeTimestamp = Date.now(),
referenceCurrentTime = new Date(),
): Promise<void> {
const index = this.shuttles.findIndex((s) => s.id === shuttle.id); const index = this.shuttles.findIndex((s) => s.id === shuttle.id);
if (index !== -1) { if (index !== -1) {
this.shuttles[index] = shuttle; this.shuttles[index] = shuttle;
} else { } else {
this.shuttles.push(shuttle); this.shuttles.push(shuttle);
} }
await this.updateLastStopArrivalAndTravelTimeDataPoints(shuttle, travelTimeTimestamp);
await this.updateEtasBasedOnHistoricalData(shuttle, referenceCurrentTime);
} }
public async addOrUpdateStop(stop: IStop): Promise<void> { public async addOrUpdateStop(stop: IStop): Promise<void> {
@@ -171,7 +183,7 @@ export class UnoptimizedInMemoryShuttleRepository
} }
} }
public async addOrUpdateEta(eta: IEta): Promise<void> { private async addOrUpdateEta(eta: IEta): Promise<void> {
const index = this.etas.findIndex((e) => e.stopId === eta.stopId && e.shuttleId === eta.shuttleId); const index = this.etas.findIndex((e) => e.stopId === eta.stopId && e.shuttleId === eta.shuttleId);
if (index !== -1) { if (index !== -1) {
this.etas[index] = eta; this.etas[index] = eta;
@@ -181,6 +193,129 @@ export class UnoptimizedInMemoryShuttleRepository
this.emit(ShuttleRepositoryEvent.ETA_UPDATED, eta); this.emit(ShuttleRepositoryEvent.ETA_UPDATED, eta);
} }
public async addOrUpdateEtaFromExternalSource(eta: IEta): Promise<void> {
await this.addOrUpdateEta(eta);
}
private async updateEtasBasedOnHistoricalData(
shuttle: IShuttle,
referenceCurrentTime: Date = new Date(),
) {
const oneWeekAgo = new Date(referenceCurrentTime.getTime() - (60 * 60 * 24 * 7 * 1000));
const lastStopArrival = await this.getShuttleLastStopArrival(shuttle.id);
if (lastStopArrival == undefined) return;
const lastOrderedStop = await this.getOrderedStopByRouteAndStopId(shuttle.routeId, lastStopArrival.stopId);
const nextStop = lastOrderedStop?.nextStop;
if (nextStop == null) return;
const travelTimeSeconds = await this.getAverageTravelTimeSeconds({
routeId: shuttle.routeId,
fromStopId: lastStopArrival.stopId,
toStopId: nextStop.stopId,
}, {
from: oneWeekAgo,
to: new Date(oneWeekAgo.getTime() + (60 * 60 * 1000))
});
if (travelTimeSeconds == undefined) return;
const elapsedTimeMs = referenceCurrentTime.getTime() - lastStopArrival.timestamp.getTime();
const predictedTimeSeconds = travelTimeSeconds - (elapsedTimeMs / 1000);
await this.addOrUpdateEta({
secondsRemaining: predictedTimeSeconds,
shuttleId: shuttle.id,
stopId: nextStop.stopId,
systemId: nextStop.systemId,
updatedTime: new Date(),
});
}
private async updateLastStopArrivalAndTravelTimeDataPoints(
shuttle: IShuttle,
travelTimeTimestamp = Date.now(),
) {
const arrivedStop = await this.getArrivedStopIfExists(shuttle);
if (arrivedStop != undefined) {
const lastStopTimestamp = await this.getShuttleLastStopArrival(shuttle.id);
if (lastStopTimestamp != undefined) {
const routeId = shuttle.routeId;
const fromStopId = lastStopTimestamp.stopId;
const toStopId = arrivedStop.id;
const travelTimeSeconds = (travelTimeTimestamp - lastStopTimestamp.timestamp.getTime()) / 1000;
await this.addTravelTimeDataPoint({ routeId, fromStopId, toStopId }, travelTimeSeconds, travelTimeTimestamp);
}
await this.updateShuttleLastStopArrival(shuttle.id, {
stopId: arrivedStop.id,
timestamp: new Date(travelTimeTimestamp),
});
}
}
private async addTravelTimeDataPoint(
{ routeId, fromStopId, toStopId }: ShuttleTravelTimeDataIdentifier,
travelTimeSeconds: number,
timestamp = Date.now(),
): Promise<void> {
const key = `${routeId}:${fromStopId}:${toStopId}`;
const dataPoints = this.travelTimeData.get(key) || [];
dataPoints.push({ timestamp, seconds: travelTimeSeconds });
this.travelTimeData.set(key, dataPoints);
}
private async updateShuttleLastStopArrival(shuttleId: string, lastStopArrival: ShuttleStopArrival) {
this.shuttleLastStopArrivals.set(shuttleId, lastStopArrival);
}
public async getAverageTravelTimeSeconds(
{ routeId, fromStopId, toStopId }: ShuttleTravelTimeDataIdentifier,
{ from, to }: ShuttleTravelTimeDateFilterArguments,
): Promise<number | undefined> {
const key = `${routeId}:${fromStopId}:${toStopId}`;
const dataPoints = this.travelTimeData.get(key);
if (!dataPoints || dataPoints.length === 0) {
return undefined;
}
const fromTimestamp = from.getTime();
const toTimestamp = to.getTime();
const filteredPoints = dataPoints.filter(
(point) => point.timestamp >= fromTimestamp && point.timestamp <= toTimestamp
);
if (filteredPoints.length === 0) {
return undefined;
}
const sum = filteredPoints.reduce((acc, point) => acc + point.seconds, 0);
return sum / filteredPoints.length;
}
public async getArrivedStopIfExists(
shuttle: IShuttle,
delta = 0.001,
): Promise<IStop | undefined> {
const orderedStops = await this.getOrderedStopsByRouteId(shuttle.routeId);
for (const orderedStop of orderedStops) {
const stop = await this.getStopById(orderedStop.stopId);
if (stop != null && shuttleHasArrivedAtStop(shuttle, stop, delta)) {
return stop;
}
}
return undefined;
}
public async getShuttleLastStopArrival(shuttleId: string): Promise<ShuttleStopArrival | undefined> {
return this.shuttleLastStopArrivals.get(shuttleId);
}
private async removeEntityByMatcherIfExists<T>(callback: (value: T) => boolean, arrayToSearchIn: T[]) { private async removeEntityByMatcherIfExists<T>(callback: (value: T) => boolean, arrayToSearchIn: T[]) {
const index = arrayToSearchIn.findIndex(callback); const index = arrayToSearchIn.findIndex(callback);
if (index > -1) { if (index > -1) {