Merge pull request #86 from brendan-ch/feat/self-calculated-etas

feat/self-calculated-etas
This commit is contained in:
2025-11-13 22:38:51 -08:00
committed by GitHub
44 changed files with 2829 additions and 5025 deletions

View File

@@ -1,6 +1,6 @@
# AGENTS.md
# CLAUDE.md
This file provides guidance to coding agents (e.g., Codex CLI, Claude Code, and other AI coding assistants) when working with code in this repository.
This file provides guidance to Claude Code when working with code in this repository.
## Development Commands
@@ -19,17 +19,8 @@ npm run generate
npm run build:dev
```
### Testing
```bash
# Run all tests via npm
npm test
# Run specific test file
npm test -- --testPathPattern=<test-file-name>
# Run tests with coverage
npm test -- --coverage
```
Only use Docker Compose for running tests, and only use `docker compose run test`
to run tests; don't try to run tests for individual files.
## Architecture Overview

View File

@@ -14,6 +14,15 @@ import {
ParkingRepositoryLoaderBuilderArguments
} from "../loaders/parking/buildParkingRepositoryLoaderIfExists";
import { RedisParkingRepository } from "../repositories/parking/RedisParkingRepository";
import { RedisShuttleRepository } from "../repositories/shuttle/RedisShuttleRepository";
import { ShuttleGetterRepository } from "../repositories/shuttle/ShuttleGetterRepository";
import { InMemoryExternalSourceETARepository } from "../repositories/shuttle/eta/InMemoryExternalSourceETARepository";
import { ETAGetterRepository } from "../repositories/shuttle/eta/ETAGetterRepository";
import { RedisSelfUpdatingETARepository } from "../repositories/shuttle/eta/RedisSelfUpdatingETARepository";
import { RedisExternalSourceETARepository } from "../repositories/shuttle/eta/RedisExternalSourceETARepository";
import { InMemorySelfUpdatingETARepository } from "../repositories/shuttle/eta/InMemorySelfUpdatingETARepository";
import { BaseRedisETARepository } from "../repositories/shuttle/eta/BaseRedisETARepository";
import { BaseInMemoryETARepository } from "../repositories/shuttle/eta/BaseInMemoryETARepository";
export interface InterchangeSystemBuilderArguments {
name: string;
@@ -32,6 +41,12 @@ export interface InterchangeSystemBuilderArguments {
* ID for the parking repository ID in the codebase.
*/
parkingSystemId?: string;
/**
* Controls whether to self-calculate ETAs or use the external
* shuttle provider for them.
*/
useSelfUpdatingEtas: boolean
}
export class InterchangeSystem {
@@ -40,6 +55,7 @@ export class InterchangeSystem {
public id: string,
public shuttleTimedDataLoader: TimedApiBasedRepositoryLoader,
public shuttleRepository: ShuttleGetterSetterRepository,
public etaRepository: ETAGetterRepository,
public notificationScheduler: ETANotificationScheduler,
public notificationRepository: NotificationRepository,
public parkingTimedDataLoader: TimedApiBasedRepositoryLoader | null,
@@ -55,28 +71,17 @@ export class InterchangeSystem {
static async build(
args: InterchangeSystemBuilderArguments,
) {
const shuttleRepository = new UnoptimizedInMemoryShuttleRepository();
const shuttleDataLoader = new ApiBasedShuttleRepositoryLoader(
args.passioSystemId,
args.id,
shuttleRepository
);
const timedShuttleDataLoader = new TimedApiBasedRepositoryLoader(
shuttleDataLoader,
);
await timedShuttleDataLoader.start();
const { shuttleRepository, timedShuttleDataLoader, etaRepository } = await InterchangeSystem.buildRedisShuttleLoaderAndRepositories(args);
timedShuttleDataLoader.start();
const notificationRepository = new RedisNotificationRepository();
await notificationRepository.connect();
const notificationScheduler = new ETANotificationScheduler(
const { notificationScheduler, notificationRepository } = await InterchangeSystem.buildNotificationSchedulerAndRepository(
etaRepository,
shuttleRepository,
notificationRepository,
new AppleNotificationSender(),
args.id,
args
);
notificationScheduler.startListeningForUpdates();
let { parkingRepository, timedParkingLoader } = await this.buildRedisParkingLoaderAndRepository(args.parkingSystemId);
let { parkingRepository, timedParkingLoader } = await InterchangeSystem.buildRedisParkingLoaderAndRepository(args.parkingSystemId);
timedParkingLoader?.start();
return new InterchangeSystem(
@@ -84,6 +89,7 @@ export class InterchangeSystem {
args.id,
timedShuttleDataLoader,
shuttleRepository,
etaRepository,
notificationScheduler,
notificationRepository,
timedParkingLoader,
@@ -91,49 +97,54 @@ export class InterchangeSystem {
);
}
/**
* Construct an instance of the class where all composited
* classes are correctly linked, meant for unit tests, and server/app
* integration tests.
* @param args
*/
static buildForTesting(
args: InterchangeSystemBuilderArguments,
) {
const shuttleRepository = new UnoptimizedInMemoryShuttleRepository();
const shuttleDataLoader = new ApiBasedShuttleRepositoryLoader(
args.passioSystemId,
args.id,
shuttleRepository
);
// Note that this loader should not be started,
// so the test data doesn't get overwritten
const timedShuttleLoader = new TimedApiBasedRepositoryLoader(
private static async buildRedisShuttleLoaderAndRepositories(args: InterchangeSystemBuilderArguments) {
const shuttleRepository = new RedisShuttleRepository();
await shuttleRepository.connect();
let etaRepository: BaseRedisETARepository;
let shuttleDataLoader: ApiBasedShuttleRepositoryLoader;
if (args.useSelfUpdatingEtas) {
etaRepository = new RedisSelfUpdatingETARepository(shuttleRepository);
(etaRepository as RedisSelfUpdatingETARepository).startListeningForUpdates();
shuttleDataLoader = new ApiBasedShuttleRepositoryLoader(
args.passioSystemId,
args.id,
shuttleRepository,
);
} else {
etaRepository = new RedisExternalSourceETARepository();
shuttleDataLoader = new ApiBasedShuttleRepositoryLoader(
args.passioSystemId,
args.id,
shuttleRepository,
etaRepository as RedisExternalSourceETARepository,
);
}
await etaRepository.connect();
const timedShuttleDataLoader = new TimedApiBasedRepositoryLoader(
shuttleDataLoader,
);
const notificationRepository = new InMemoryNotificationRepository();
return { shuttleRepository, etaRepository, timedShuttleDataLoader };
}
private static async buildNotificationSchedulerAndRepository(
etaRepository: ETAGetterRepository,
shuttleRepository: ShuttleGetterRepository,
args: InterchangeSystemBuilderArguments
) {
const notificationRepository = new RedisNotificationRepository();
await notificationRepository.connect();
const notificationScheduler = new ETANotificationScheduler(
etaRepository,
shuttleRepository,
notificationRepository,
new AppleNotificationSender(false),
args.id,
);
notificationScheduler.startListeningForUpdates();
let { parkingRepository, timedParkingLoader } = this.buildInMemoryParkingLoaderAndRepository(args.parkingSystemId);
// Timed parking loader is not started here
return new InterchangeSystem(
args.name,
args.id,
timedShuttleLoader,
shuttleRepository,
notificationScheduler,
notificationRepository,
timedParkingLoader,
parkingRepository,
new AppleNotificationSender(),
args.id
);
return { notificationScheduler, notificationRepository };
}
private static async buildRedisParkingLoaderAndRepository(id?: string) {
@@ -161,6 +172,57 @@ export class InterchangeSystem {
return { parkingRepository, timedParkingLoader };
}
/**
* Construct an instance of the class where all composited
* classes are correctly linked, meant for unit tests, and server/app
* integration tests.
* @param args
*/
static buildForTesting(
args: InterchangeSystemBuilderArguments,
) {
const { shuttleRepository, timedShuttleLoader, etaRepository } = InterchangeSystem.buildInMemoryShuttleLoaderAndRepositories(args);
// Timed shuttle loader is not started here
const { notificationScheduler, notificationRepository } = InterchangeSystem.buildInMemoryNotificationSchedulerAndRepository(
etaRepository,
shuttleRepository,
args
);
notificationScheduler.startListeningForUpdates();
let { parkingRepository, timedParkingLoader } = this.buildInMemoryParkingLoaderAndRepository(args.parkingSystemId);
// Timed parking loader is not started here
return new InterchangeSystem(
args.name,
args.id,
timedShuttleLoader,
shuttleRepository,
etaRepository,
notificationScheduler,
notificationRepository,
timedParkingLoader,
parkingRepository,
);
}
private static buildInMemoryNotificationSchedulerAndRepository(
etaRepository: ETAGetterRepository,
shuttleRepository: UnoptimizedInMemoryShuttleRepository,
args: InterchangeSystemBuilderArguments
) {
const notificationRepository = new InMemoryNotificationRepository();
const notificationScheduler = new ETANotificationScheduler(
etaRepository,
shuttleRepository,
notificationRepository,
new AppleNotificationSender(false),
args.id
);
return { notificationScheduler, notificationRepository };
}
private static buildInMemoryParkingLoaderAndRepository(id?: string) {
if (id === undefined) {
return { parkingRepository: null, timedParkingLoader: null };
@@ -184,4 +246,37 @@ export class InterchangeSystem {
return { parkingRepository, timedParkingLoader };
}
private static buildInMemoryShuttleLoaderAndRepositories(args: InterchangeSystemBuilderArguments) {
const shuttleRepository = new UnoptimizedInMemoryShuttleRepository();
let etaRepository: BaseInMemoryETARepository;
let shuttleDataLoader: ApiBasedShuttleRepositoryLoader;
if (args.useSelfUpdatingEtas) {
etaRepository = new InMemorySelfUpdatingETARepository(shuttleRepository);
(etaRepository as InMemorySelfUpdatingETARepository).startListeningForUpdates();
shuttleDataLoader = new ApiBasedShuttleRepositoryLoader(
args.passioSystemId,
args.id,
shuttleRepository,
);
} else {
etaRepository = new InMemoryExternalSourceETARepository();
shuttleDataLoader = new ApiBasedShuttleRepositoryLoader(
args.passioSystemId,
args.id,
shuttleRepository,
etaRepository as InMemoryExternalSourceETARepository,
);
}
// Note that this loader should not be started,
// so the test data doesn't get overwritten
const timedShuttleLoader = new TimedApiBasedRepositoryLoader(
shuttleDataLoader
);
return { shuttleRepository, etaRepository, timedShuttleLoader };
}
}

View File

@@ -37,3 +37,19 @@ export interface IOrderedStop extends IEntityWithTimestamp {
systemId: string;
}
/**
* Checks if a shuttle has arrived at a stop based on coordinate proximity.
* Uses a threshold of 0.001 degrees (~111 meters at the equator).
*/
export function shuttleHasArrivedAtStop(
shuttle: IShuttle,
stop: IStop,
delta = 0.001
) {
const isWithinLatitudeRange = shuttle.coordinates.latitude > stop.coordinates.latitude - delta
&& shuttle.coordinates.latitude < stop.coordinates.latitude + delta;
const isWithinLongitudeRange = shuttle.coordinates.longitude > stop.coordinates.longitude - delta
&& shuttle.coordinates.longitude < stop.coordinates.longitude + delta
return isWithinLatitudeRange && isWithinLongitudeRange;
}

View File

@@ -0,0 +1,65 @@
import { describe, expect, it } from "@jest/globals";
import { shuttleHasArrivedAtStop, IShuttle, IStop } from "../ShuttleRepositoryEntities";
describe("shuttleHasArrivedAtStop", () => {
const baseStop: IStop = {
id: "stop1",
name: "Test Stop",
systemId: "263",
coordinates: {
latitude: 33.7963,
longitude: -117.8540,
},
updatedTime: new Date(),
};
const createShuttle = (latitude: number, longitude: number): IShuttle => ({
id: "shuttle1",
name: "Test Shuttle",
routeId: "route1",
systemId: "263",
coordinates: { latitude, longitude },
orientationInDegrees: 0,
updatedTime: new Date(),
});
it("returns false when shuttle is above latitude range", () => {
const shuttle = createShuttle(
baseStop.coordinates.latitude + 0.0011,
baseStop.coordinates.longitude
);
expect(shuttleHasArrivedAtStop(shuttle, baseStop)).toBe(false);
});
it("returns false when shuttle is below latitude range", () => {
const shuttle = createShuttle(
baseStop.coordinates.latitude - 0.0011,
baseStop.coordinates.longitude
);
expect(shuttleHasArrivedAtStop(shuttle, baseStop)).toBe(false);
});
it("returns false when shuttle is to left of longitude range", () => {
const shuttle = createShuttle(
baseStop.coordinates.latitude,
baseStop.coordinates.longitude - 0.0011
);
expect(shuttleHasArrivedAtStop(shuttle, baseStop)).toBe(false);
});
it("returns false when shuttle is to right of longitude range", () => {
const shuttle = createShuttle(
baseStop.coordinates.latitude,
baseStop.coordinates.longitude + 0.0011
);
expect(shuttleHasArrivedAtStop(shuttle, baseStop)).toBe(false);
});
it("returns true when shuttle is in the range", () => {
const shuttle = createShuttle(
baseStop.coordinates.latitude + 0.0005,
baseStop.coordinates.longitude - 0.0005
);
expect(shuttleHasArrivedAtStop(shuttle, baseStop)).toBe(true);
});
});

View File

@@ -23,6 +23,7 @@ const supportedSystems: InterchangeSystemBuilderArguments[] = [
passioSystemId: "263",
parkingSystemId: ChapmanApiBasedParkingRepositoryLoader.id,
name: "Chapman University",
useSelfUpdatingEtas: true,
}
]

View File

@@ -4,6 +4,7 @@ import { ShuttleRepositoryLoader } from "./ShuttleRepositoryLoader";
import { ICoordinates, IEntityWithId } from "../../entities/SharedEntities";
import { ApiResponseError } from "../ApiResponseError";
import { SHUTTLE_TO_ROUTE_COORDINATE_MAXIMUM_DISTANCE_MILES } from "../../environment";
import { ExternalSourceETARepository } from "../../repositories/shuttle/eta/ExternalSourceETARepository";
/**
* Class which can load data into a repository from the
@@ -16,7 +17,8 @@ export class ApiBasedShuttleRepositoryLoader implements ShuttleRepositoryLoader
constructor(
public passioSystemId: string,
public systemIdForConstructedData: string,
public repository: ShuttleGetterSetterRepository,
public shuttleRepository: ShuttleGetterSetterRepository,
public etaRepository?: ExternalSourceETARepository,
readonly shuttleToRouteCoordinateMaximumDistanceMiles = SHUTTLE_TO_ROUTE_COORDINATE_MAXIMUM_DISTANCE_MILES,
) {
}
@@ -37,7 +39,6 @@ export class ApiBasedShuttleRepositoryLoader implements ShuttleRepositoryLoader
// Because ETA method doesn't support pruning yet,
// add a call to the clear method here
await this.repository.clearEtaData();
await this.updateEtaDataForExistingStopsForSystem();
}
@@ -57,16 +58,16 @@ export class ApiBasedShuttleRepositoryLoader implements ShuttleRepositoryLoader
private async updateRouteDataInRepository(routes: IRoute[]) {
const routeIdsToPrune = await this.constructExistingEntityIdSet(async () => {
return await this.repository.getRoutes();
return await this.shuttleRepository.getRoutes();
});
await Promise.all(routes.map(async (route) => {
await this.repository.addOrUpdateRoute(route);
await this.shuttleRepository.addOrUpdateRoute(route);
routeIdsToPrune.delete(route.id);
}));
await Promise.all(Array.from(routeIdsToPrune).map(async (routeId) => {
await this.repository.removeRouteIfExists(routeId);
await this.shuttleRepository.removeRouteIfExists(routeId);
}));
}
@@ -122,7 +123,7 @@ export class ApiBasedShuttleRepositoryLoader implements ShuttleRepositoryLoader
private async updateStopAndPolylineDataInRepository(json: any) {
const stopIdsToPrune = await this.constructExistingEntityIdSet(async () => {
return await this.repository.getStops();
return await this.shuttleRepository.getStops();
});
await this.updateStopDataForSystemAndApiResponse(json, stopIdsToPrune);
@@ -130,7 +131,7 @@ export class ApiBasedShuttleRepositoryLoader implements ShuttleRepositoryLoader
await this.updatePolylineDataForExistingRoutesAndApiResponse(json);
await Promise.all(Array.from(stopIdsToPrune).map(async (stopId) => {
await this.repository.removeStopIfExists(stopId);
await this.shuttleRepository.removeStopIfExists(stopId);
}));
}
@@ -174,16 +175,16 @@ export class ApiBasedShuttleRepositoryLoader implements ShuttleRepositoryLoader
private async updateShuttleDataInRepository(shuttles: IShuttle[]) {
const shuttleIdsToPrune = await this.constructExistingEntityIdSet(async () => {
return await this.repository.getShuttles();
return await this.shuttleRepository.getShuttles();
});
await Promise.all(shuttles.map(async (shuttle) => {
await this.repository.addOrUpdateShuttle(shuttle);
await this.shuttleRepository.addOrUpdateShuttle(shuttle);
shuttleIdsToPrune.delete(shuttle.id);
}));
await Promise.all(Array.from(shuttleIdsToPrune).map(async (shuttleId) => {
await this.repository.removeShuttleIfExists(shuttleId);
await this.shuttleRepository.removeShuttleIfExists(shuttleId);
}));
}
@@ -239,7 +240,7 @@ export class ApiBasedShuttleRepositoryLoader implements ShuttleRepositoryLoader
}
public async updateEtaDataForExistingStopsForSystem() {
const stops = await this.repository.getStops();
const stops = await this.shuttleRepository.getStops();
await Promise.all(stops.map(async (stop) => {
let stopId = stop.id;
await this.updateEtaDataForStopId(stopId);
@@ -262,7 +263,7 @@ export class ApiBasedShuttleRepositoryLoader implements ShuttleRepositoryLoader
private async updateEtaDataInRepository(etas: IEta[]) {
await Promise.all(etas.map(async (eta) => {
await this.repository.addOrUpdateEta(eta);
await this.etaRepository?.addOrUpdateEtaFromExternalSource(eta);
}));
}
@@ -317,7 +318,7 @@ export class ApiBasedShuttleRepositoryLoader implements ShuttleRepositoryLoader
updatedTime: new Date(),
};
await this.repository.addOrUpdateStop(constructedStop);
await this.shuttleRepository.addOrUpdateStop(constructedStop);
setOfIdsToPrune.delete(constructedStop.id);
}));
@@ -339,7 +340,7 @@ export class ApiBasedShuttleRepositoryLoader implements ShuttleRepositoryLoader
const orderedStopDataArray = jsonOrderedStopData[index];
const stopId = orderedStopDataArray[1];
let constructedOrderedStop = await this.repository.getOrderedStopByRouteAndStopId(routeId, stopId)
let constructedOrderedStop = await this.shuttleRepository.getOrderedStopByRouteAndStopId(routeId, stopId)
if (constructedOrderedStop === null) {
constructedOrderedStop = {
routeId,
@@ -369,7 +370,7 @@ export class ApiBasedShuttleRepositoryLoader implements ShuttleRepositoryLoader
};
}
await this.repository.addOrUpdateOrderedStop(constructedOrderedStop);
await this.shuttleRepository.addOrUpdateOrderedStop(constructedOrderedStop);
}
}));
}
@@ -380,7 +381,7 @@ export class ApiBasedShuttleRepositoryLoader implements ShuttleRepositoryLoader
await Promise.all(Object.keys(json.routePoints).map(async (routeId) => {
const routePoints = json.routePoints[routeId][0];
const existingRoute = await this.repository.getRouteById(routeId);
const existingRoute = await this.shuttleRepository.getRouteById(routeId);
if (!existingRoute) return;
existingRoute.polylineCoordinates = routePoints.map((point: any) => {
@@ -390,7 +391,7 @@ export class ApiBasedShuttleRepositoryLoader implements ShuttleRepositoryLoader
};
});
await this.repository.addOrUpdateRoute(existingRoute);
await this.shuttleRepository.addOrUpdateRoute(existingRoute);
}))
}
}
@@ -399,7 +400,7 @@ export class ApiBasedShuttleRepositoryLoader implements ShuttleRepositoryLoader
let filteredShuttles: IShuttle[] = [];
await Promise.all(shuttles.map(async (shuttle) => {
const route = await this.repository.getRouteById(shuttle.routeId);
const route = await this.shuttleRepository.getRouteById(shuttle.routeId);
if (route != null) {
let closestDistanceMiles = Number.MAX_VALUE;

View File

@@ -9,7 +9,7 @@ import { generateMockRoutes, generateMockShuttles, generateMockStops } from "../
import {
fetchShuttleDataSuccessfulResponse
} from "../../../../testHelpers/jsonSnapshots/fetchShuttleData/fetchShuttleDataSuccessfulResponse";
import { fetchEtaDataSuccessfulResponse } from "../../../../testHelpers/jsonSnapshots/fetchEtaData/fetchEtaDataSuccessfulResponse";
import { InMemoryExternalSourceETARepository } from "../../../repositories/shuttle/eta/InMemoryExternalSourceETARepository";
import {
resetGlobalFetchMockJson,
updateGlobalFetchMockJson,
@@ -38,7 +38,6 @@ describe("ApiBasedShuttleRepositoryLoader", () => {
updateRouteDataForSystem: jest.spyOn(loader, "updateRouteDataForSystem"),
updateStopAndPolylineDataForRoutesInSystem: jest.spyOn(loader, "updateStopAndPolylineDataForRoutesInSystem"),
updateShuttleDataForSystem: jest.spyOn(loader, "updateShuttleDataForSystemBasedOnProximityToRoutes"),
updateEtaDataForExistingStopsForSystem: jest.spyOn(loader, "updateEtaDataForExistingStopsForSystem"),
};
Object.values(spies).forEach((spy: any) => {
@@ -60,7 +59,7 @@ describe("ApiBasedShuttleRepositoryLoader", () => {
const routesToPrune = generateMockRoutes();
await Promise.all(routesToPrune.map(async (route) => {
route.systemId = systemId;
await loader.repository.addOrUpdateRoute(route);
await loader.shuttleRepository.addOrUpdateRoute(route);
}));
updateGlobalFetchMockJson(fetchRouteDataSuccessfulResponse);
@@ -69,7 +68,7 @@ describe("ApiBasedShuttleRepositoryLoader", () => {
await loader.updateRouteDataForSystem();
// Assert
const routes = await loader.repository.getRoutes();
const routes = await loader.shuttleRepository.getRoutes();
expect(routes.length).toEqual(fetchRouteDataSuccessfulResponse.all.length)
});
@@ -93,7 +92,7 @@ describe("ApiBasedShuttleRepositoryLoader", () => {
const stopsToPrune = generateMockStops();
await Promise.all(stopsToPrune.map(async (stop) => {
stop.systemId = systemId;
await loader.repository.addOrUpdateStop(stop);
await loader.shuttleRepository.addOrUpdateStop(stop);
}));
updateGlobalFetchMockJson(fetchStopAndPolylineDataSuccessfulResponse);
@@ -102,15 +101,15 @@ describe("ApiBasedShuttleRepositoryLoader", () => {
await loader.updateStopAndPolylineDataForRoutesInSystem();
const stops = await loader.repository.getStops();
const stops = await loader.shuttleRepository.getStops();
expect(stops.length).toEqual(stopsArray.length);
await Promise.all(stops.map(async (stop) => {
const orderedStops = await loader.repository.getOrderedStopsByStopId(stop.id)
const orderedStops = await loader.shuttleRepository.getOrderedStopsByStopId(stop.id)
expect(orderedStops.length).toBeGreaterThan(0);
}));
const routes = await loader.repository.getRoutes();
const routes = await loader.shuttleRepository.getRoutes();
routes.forEach((route) => {
expect(route.polylineCoordinates.length).toBeGreaterThan(0);
});
@@ -152,7 +151,7 @@ describe("ApiBasedShuttleRepositoryLoader", () => {
async function addMockRoutes(routes: IRoute[]) {
await Promise.all(routes.map(async (route) => {
await loader.repository.addOrUpdateRoute(route);
await loader.shuttleRepository.addOrUpdateRoute(route);
}));
}
@@ -162,6 +161,7 @@ describe("ApiBasedShuttleRepositoryLoader", () => {
"263",
"1",
new UnoptimizedInMemoryShuttleRepository(),
new InMemoryExternalSourceETARepository(),
distanceMiles,
);
@@ -177,7 +177,7 @@ describe("ApiBasedShuttleRepositoryLoader", () => {
await loader.updateShuttleDataForSystemBasedOnProximityToRoutes();
const shuttles = await loader.repository.getShuttles();
const shuttles = await loader.shuttleRepository.getShuttles();
expect(shuttles.length).toEqual(busesInResponse.length);
});
@@ -187,6 +187,7 @@ describe("ApiBasedShuttleRepositoryLoader", () => {
"263",
"1",
new UnoptimizedInMemoryShuttleRepository(),
new InMemoryExternalSourceETARepository(),
distanceMiles,
);
@@ -202,7 +203,7 @@ describe("ApiBasedShuttleRepositoryLoader", () => {
await loader.updateShuttleDataForSystemBasedOnProximityToRoutes();
const shuttles = await loader.repository.getShuttles();
const shuttles = await loader.shuttleRepository.getShuttles();
expect(shuttles.length).toEqual(0);
});
@@ -212,6 +213,7 @@ describe("ApiBasedShuttleRepositoryLoader", () => {
"263",
"1",
new UnoptimizedInMemoryShuttleRepository(),
new InMemoryExternalSourceETARepository(),
distanceMiles,
);
@@ -219,7 +221,7 @@ describe("ApiBasedShuttleRepositoryLoader", () => {
const shuttlesToPrune = generateMockShuttles();
await Promise.all(shuttlesToPrune.map(async (shuttle) => {
shuttle.systemId = systemId;
await loader.repository.addOrUpdateShuttle(shuttle);
await loader.shuttleRepository.addOrUpdateShuttle(shuttle);
}));
const routes = generateMockRoutesWithPolylineCoordinates();
@@ -237,7 +239,7 @@ describe("ApiBasedShuttleRepositoryLoader", () => {
await loader.updateShuttleDataForSystemBasedOnProximityToRoutes();
// Old shuttles should be pruned, only API shuttles should remain
const shuttles = await loader.repository.getShuttles();
const shuttles = await loader.shuttleRepository.getShuttles();
const busesInResponse = Object.values(modifiedSuccessfulResponse.buses);
expect(shuttles.length).toEqual(busesInResponse.length);
@@ -257,46 +259,5 @@ describe("ApiBasedShuttleRepositoryLoader", () => {
});
});
});
describe("updateEtaDataForExistingStopsForSystem", () => {
it("calls updateEtaDataForStopId for every stop in repository", async () => {
const spy = jest.spyOn(loader, "updateEtaDataForStopId");
const stops = generateMockStops();
stops.forEach((stop) => {
stop.systemId = "1";
});
await Promise.all(stops.map(async (stop) => {
await loader.repository.addOrUpdateStop(stop);
}));
await loader.updateEtaDataForExistingStopsForSystem();
expect(spy.mock.calls.length).toEqual(stops.length);
});
});
describe("updateEtaDataForStopId", () => {
const stopId = "177666";
it("updates ETA data for stop id if response received", async () => {
updateGlobalFetchMockJson(fetchEtaDataSuccessfulResponse);
// @ts-ignore
const etasFromResponse = fetchEtaDataSuccessfulResponse.ETAs[stopId]
await loader.updateEtaDataForStopId(stopId);
const etas = await loader.repository.getEtasForStopId(stopId);
expect(etas.length).toEqual(etasFromResponse.length);
});
it("throws the correct error if the API response contains no data", async () => {
updateGlobalFetchMockJsonToThrowSyntaxError();
await assertAsyncCallbackThrowsApiResponseError(async () => {
await loader.updateEtaDataForStopId("263");
});
});
});
});

File diff suppressed because it is too large Load Diff

View File

@@ -1,11 +0,0 @@
import { InterchangeSystemBuilderArguments } from "../entities/InterchangeSystem";
import { ChapmanApiBasedParkingRepositoryLoader } from "./parking/ChapmanApiBasedParkingRepositoryLoader";
export const supportedIntegrationTestSystems: InterchangeSystemBuilderArguments[] = [
{
id: "1",
name: "Chapman University",
passioSystemId: "263",
parkingSystemId: ChapmanApiBasedParkingRepositoryLoader.id,
},
];

View File

@@ -1,4 +1,3 @@
import { ShuttleGetterRepository, ShuttleRepositoryEvent } from "../../repositories/shuttle/ShuttleGetterRepository";
import { IEta } from "../../entities/ShuttleRepositoryEntities";
import { AppleNotificationSender, NotificationAlertArguments } from "../senders/AppleNotificationSender";
import {
@@ -6,11 +5,14 @@ import {
ScheduledNotification
} from "../../repositories/notifications/NotificationRepository";
import { InMemoryNotificationRepository } from "../../repositories/notifications/InMemoryNotificationRepository";
import { ETAGetterRepository, ETARepositoryEvent } from "../../repositories/shuttle/eta/ETAGetterRepository";
import { ShuttleGetterRepository } from "../../repositories/shuttle/ShuttleGetterRepository";
export class ETANotificationScheduler {
public static readonly defaultSecondsThresholdForNotificationToFire = 180;
constructor(
private etaRepository: ETAGetterRepository,
private shuttleRepository: ShuttleGetterRepository,
private notificationRepository: NotificationRepository = new InMemoryNotificationRepository(),
private appleNotificationSender = new AppleNotificationSender(),
@@ -26,7 +28,7 @@ export class ETANotificationScheduler {
const shuttle = await this.shuttleRepository.getShuttleById(shuttleId);
const stop = await this.shuttleRepository.getStopById(stopId);
const eta = await this.shuttleRepository.getEtaForShuttleAndStopId(shuttleId, stopId);
const eta = await this.etaRepository.getEtaForShuttleAndStopId(shuttleId, stopId);
if (!shuttle) {
console.warn(`Notification ${notificationData} fell through; no associated shuttle`);
return false;
@@ -90,10 +92,10 @@ export class ETANotificationScheduler {
// The following is a workaround for the constructor being called twice
public startListeningForUpdates() {
this.shuttleRepository.on(ShuttleRepositoryEvent.ETA_UPDATED, this.etaSubscriberCallback);
this.etaRepository.on(ETARepositoryEvent.ETA_UPDATED, this.etaSubscriberCallback);
}
public stopListeningForUpdates() {
this.shuttleRepository.off(ShuttleRepositoryEvent.ETA_UPDATED, this.etaSubscriberCallback);
this.etaRepository.off(ETARepositoryEvent.ETA_UPDATED, this.etaSubscriberCallback);
}
}

View File

@@ -1,6 +1,7 @@
import { beforeEach, describe, expect, it, jest } from "@jest/globals";
import { ETANotificationScheduler } from "../ETANotificationScheduler";
import { UnoptimizedInMemoryShuttleRepository } from "../../../repositories/shuttle/UnoptimizedInMemoryShuttleRepository";
import { InMemoryExternalSourceETARepository } from "../../../repositories/shuttle/eta/InMemoryExternalSourceETARepository";
import { IEta, IShuttle, IStop } from "../../../entities/ShuttleRepositoryEntities";
import { addMockShuttleToRepository, addMockStopToRepository } from "../../../../testHelpers/repositorySetupHelpers";
import { AppleNotificationSender } from "../../senders/AppleNotificationSender";
@@ -26,18 +27,21 @@ async function waitForMilliseconds(ms: number): Promise<void> {
describe("ETANotificationScheduler", () => {
let shuttleRepository: UnoptimizedInMemoryShuttleRepository
let shuttleRepository: UnoptimizedInMemoryShuttleRepository;
let etaRepository: InMemoryExternalSourceETARepository;
let notificationService: ETANotificationScheduler;
let notificationRepository: NotificationRepository;
beforeEach(() => {
shuttleRepository = new UnoptimizedInMemoryShuttleRepository();
notificationRepository = new InMemoryNotificationRepository();
etaRepository = new InMemoryExternalSourceETARepository();
mockNotificationSenderMethods(true);
const appleNotificationSender = new MockAppleNotificationSender(false);
notificationService = new ETANotificationScheduler(
etaRepository,
shuttleRepository,
notificationRepository,
appleNotificationSender,
@@ -80,7 +84,7 @@ describe("ETANotificationScheduler", () => {
// Act
await notificationRepository.addOrUpdateNotification(notificationData1);
await notificationRepository.addOrUpdateNotification(notificationData2);
await shuttleRepository.addOrUpdateEta(eta);
await etaRepository.addOrUpdateEtaFromExternalSource(eta);
// Assert
// Wait for the callback to actually be called
@@ -103,7 +107,7 @@ describe("ETANotificationScheduler", () => {
// Act
await notificationRepository.addOrUpdateNotification(notificationData1);
await shuttleRepository.addOrUpdateEta(eta);
await etaRepository.addOrUpdateEtaFromExternalSource(eta);
// Assert
await waitForMilliseconds(500);
@@ -127,6 +131,7 @@ describe("ETANotificationScheduler", () => {
mockNotificationSenderMethods(false);
const updatedNotificationSender = new MockAppleNotificationSender(false);
notificationService = new ETANotificationScheduler(
etaRepository,
shuttleRepository,
notificationRepository,
updatedNotificationSender,
@@ -136,7 +141,7 @@ describe("ETANotificationScheduler", () => {
// Act
await notificationRepository.addOrUpdateNotification(notificationData1);
await shuttleRepository.addOrUpdateEta(eta);
await etaRepository.addOrUpdateEtaFromExternalSource(eta);
// Assert
// The notification should stay scheduled to be retried once

View File

@@ -1,11 +1,12 @@
import { createClient } from 'redis';
import { createClient, RedisClientType } from 'redis';
import { REDIS_RECONNECT_INTERVAL } from "../environment";
import { EventEmitter } from 'stream';
export abstract class BaseRedisRepository {
export abstract class BaseRedisRepository extends EventEmitter {
protected redisClient;
constructor(
redisClient = createClient({
redisClient: RedisClientType = createClient({
url: process.env.REDIS_URL,
socket: {
tls: process.env.NODE_ENV === 'production',
@@ -14,6 +15,7 @@ export abstract class BaseRedisRepository {
},
}),
) {
super();
this.redisClient = redisClient;
this.redisClient.on('error', (err) => {
console.error(err.stack);
@@ -31,8 +33,4 @@ export abstract class BaseRedisRepository {
public async disconnect() {
await this.redisClient.disconnect();
}
public async clearAllData() {
await this.redisClient.flushAll();
}
}

View File

@@ -9,7 +9,7 @@ import {
import { BaseRedisRepository } from "../BaseRedisRepository";
export class RedisNotificationRepository extends BaseRedisRepository implements NotificationRepository {
private listeners: Listener[] = [];
private notificationListeners: Listener[] = [];
private readonly NOTIFICATION_KEY_PREFIX = 'notification:';
private getNotificationKey = (shuttleId: string, stopId: string): string => {
@@ -23,7 +23,7 @@ export class RedisNotificationRepository extends BaseRedisRepository implements
await this.redisClient.hSet(key, deviceId, secondsThreshold.toString());
this.listeners.forEach((listener: Listener) => {
this.notificationListeners.forEach((listener: Listener) => {
const event: NotificationEvent = {
event: 'addOrUpdate',
notification
@@ -46,7 +46,7 @@ export class RedisNotificationRepository extends BaseRedisRepository implements
await this.redisClient.del(key);
}
this.listeners.forEach((listener) => {
this.notificationListeners.forEach((listener) => {
const event: NotificationEvent = {
event: 'delete',
notification: {
@@ -94,20 +94,20 @@ export class RedisNotificationRepository extends BaseRedisRepository implements
};
public subscribeToNotificationChanges = (listener: Listener): void => {
const index = this.listeners.findIndex(
const index = this.notificationListeners.findIndex(
(existingListener) => existingListener === listener
);
if (index < 0) {
this.listeners.push(listener);
this.notificationListeners.push(listener);
}
};
public unsubscribeFromNotificationChanges = (listener: Listener): void => {
const index = this.listeners.findIndex(
const index = this.notificationListeners.findIndex(
(existingListener) => existingListener === listener
);
if (index >= 0) {
this.listeners.splice(index, 1);
this.notificationListeners.splice(index, 1);
}
};
}

View File

@@ -1,4 +1,5 @@
import { afterEach, beforeEach, describe, expect, it, jest } from "@jest/globals";
import { createClient, RedisClientType } from "redis";
import { InMemoryNotificationRepository } from "../InMemoryNotificationRepository";
import { NotificationEvent, NotificationRepository } from "../NotificationRepository";
import { RedisNotificationRepository } from "../RedisNotificationRepository";
@@ -19,17 +20,21 @@ class InMemoryRepositoryHolder implements RepositoryHolder {
class RedisNotificationRepositoryHolder implements RepositoryHolder {
repo: RedisNotificationRepository | undefined;
redisClient: RedisClientType | undefined;
name = 'RedisNotificationRepository';
factory = async () => {
this.repo = new RedisNotificationRepository();
await this.repo.connect();
this.redisClient = createClient({
url: process.env.REDIS_URL,
});
await this.redisClient.connect();
this.repo = new RedisNotificationRepository(this.redisClient);
return this.repo;
}
teardown = async () => {
if (this.repo) {
await this.repo.clearAllData();
await this.repo.disconnect();
if (this.redisClient) {
await this.redisClient.flushAll();
await this.redisClient.disconnect();
}
}
}

View File

@@ -1,9 +1,9 @@
import { ParkingGetterSetterRepository } from "./ParkingGetterSetterRepository";
import {
IParkingStructure,
IParkingStructureTimestampRecord
IParkingStructure,
IParkingStructureTimestampRecord
} from "../../entities/ParkingRepositoryEntities";
import { HistoricalParkingAverageQueryResult, HistoricalParkingAverageQueryArguments } from "./ParkingGetterRepository";
import { HistoricalParkingAverageQueryResult, HistoricalParkingAverageFilterArguments } from "./ParkingGetterRepository";
import { CircularQueue } from "../../types/CircularQueue";
import { PARKING_LOGGING_INTERVAL_MS } from "../../environment";
@@ -63,7 +63,7 @@ export class InMemoryParkingRepository implements ParkingGetterSetterRepository
return null;
};
getHistoricalAveragesOfParkingStructureCounts = async (id: string, options: HistoricalParkingAverageQueryArguments): Promise<HistoricalParkingAverageQueryResult[]> => {
getHistoricalAveragesOfParkingStructureCounts = async (id: string, options: HistoricalParkingAverageFilterArguments): Promise<HistoricalParkingAverageQueryResult[]> => {
const queue = this.historicalData.get(id);
if (!queue || queue.size() === 0) {
return [];
@@ -107,7 +107,7 @@ export class InMemoryParkingRepository implements ParkingGetterSetterRepository
private calculateAveragesFromRecords = (
records: IParkingStructureTimestampRecord[],
options: HistoricalParkingAverageQueryArguments
options: HistoricalParkingAverageFilterArguments
): HistoricalParkingAverageQueryResult[] => {
const results: HistoricalParkingAverageQueryResult[] = [];
const { from, to, intervalMs } = options;

View File

@@ -1,6 +1,6 @@
import { IParkingStructure } from "../../entities/ParkingRepositoryEntities";
export interface HistoricalParkingAverageQueryArguments {
export interface HistoricalParkingAverageFilterArguments {
from: Date;
to: Date;
intervalMs: number;
@@ -22,5 +22,5 @@ export interface ParkingGetterRepository {
* @param id
* @param options
*/
getHistoricalAveragesOfParkingStructureCounts(id: string, options: HistoricalParkingAverageQueryArguments): Promise<HistoricalParkingAverageQueryResult[]>;
getHistoricalAveragesOfParkingStructureCounts(id: string, options: HistoricalParkingAverageFilterArguments): Promise<HistoricalParkingAverageQueryResult[]>;
}

View File

@@ -1,6 +1,6 @@
import { ParkingGetterSetterRepository } from "./ParkingGetterSetterRepository";
import { IParkingStructure } from "../../entities/ParkingRepositoryEntities";
import { HistoricalParkingAverageQueryResult, HistoricalParkingAverageQueryArguments } from "./ParkingGetterRepository";
import { HistoricalParkingAverageQueryResult, HistoricalParkingAverageFilterArguments } from "./ParkingGetterRepository";
import { BaseRedisRepository } from "../BaseRedisRepository";
import { PARKING_LOGGING_INTERVAL_MS } from "../../environment";
@@ -75,7 +75,7 @@ export class RedisParkingRepository extends BaseRedisRepository implements Parki
return null;
};
getHistoricalAveragesOfParkingStructureCounts = async (id: string, options: HistoricalParkingAverageQueryArguments): Promise<HistoricalParkingAverageQueryResult[]> => {
getHistoricalAveragesOfParkingStructureCounts = async (id: string, options: HistoricalParkingAverageFilterArguments): Promise<HistoricalParkingAverageQueryResult[]> => {
return this.calculateAveragesFromRecords(id, options);
};
@@ -157,7 +157,7 @@ export class RedisParkingRepository extends BaseRedisRepository implements Parki
private calculateAveragesFromRecords = async (
id: string,
options: HistoricalParkingAverageQueryArguments
options: HistoricalParkingAverageFilterArguments
): Promise<HistoricalParkingAverageQueryResult[]> => {
const keys = this.createRedisKeys(id);
const { from, to, intervalMs } = options;

View File

@@ -1,17 +1,13 @@
import { afterEach, beforeEach, describe, expect, it, jest } from "@jest/globals";
import { createClient, RedisClientType } from "redis";
import { InMemoryParkingRepository, } from "../InMemoryParkingRepository";
import { IParkingStructure } from "../../../entities/ParkingRepositoryEntities";
import { HistoricalParkingAverageQueryArguments } from "../ParkingGetterRepository";
import { ParkingGetterSetterRepository } from "../ParkingGetterSetterRepository";
import { RedisParkingRepository } from "../RedisParkingRepository";
import { HistoricalParkingAverageFilterArguments } from "../ParkingGetterRepository";
import { RepositoryHolder } from "../../../../testHelpers/RepositoryHolder";
interface RepositoryHolder {
name: string;
factory(): Promise<ParkingGetterSetterRepository>;
teardown(): Promise<void>;
}
class InMemoryParkingRepositoryHolder implements RepositoryHolder {
class InMemoryParkingRepositoryHolder implements RepositoryHolder<ParkingGetterSetterRepository> {
name = 'InMemoryParkingRepository';
factory = async () => {
return new InMemoryParkingRepository();
@@ -19,19 +15,23 @@ class InMemoryParkingRepositoryHolder implements RepositoryHolder {
teardown = async () => {};
}
class RedisParkingRepositoryHolder implements RepositoryHolder {
class RedisParkingRepositoryHolder implements RepositoryHolder<ParkingGetterSetterRepository> {
repo: RedisParkingRepository | undefined;
redisClient: RedisClientType | undefined;
name = 'RedisParkingRepository';
factory = async () => {
this.repo = new RedisParkingRepository();
await this.repo.connect();
this.redisClient = createClient({
url: process.env.REDIS_URL,
});
await this.redisClient.connect();
this.repo = new RedisParkingRepository(this.redisClient);
return this.repo;
};
teardown = async () => {
if (this.repo) {
await this.repo.clearAllData();
await this.repo.disconnect();
if (this.redisClient) {
await this.redisClient.flushAll();
await this.redisClient.disconnect();
}
};
}
@@ -151,7 +151,7 @@ describe.each(repositoryImplementations)('$name', (holder) => {
describe("getHistoricalAveragesOfParkingStructureCounts", () => {
it("should return empty array for non-existent structure or no data", async () => {
const options: HistoricalParkingAverageQueryArguments = {
const options: HistoricalParkingAverageFilterArguments = {
from: new Date(1000),
to: new Date(2000),
intervalMs: 500
@@ -182,7 +182,7 @@ describe.each(repositoryImplementations)('$name', (holder) => {
}
const now = Date.now();
const options: HistoricalParkingAverageQueryArguments = {
const options: HistoricalParkingAverageFilterArguments = {
from: new Date(now - 10000), // Look back 10 seconds
to: new Date(now + 10000), // Look forward 10 seconds
intervalMs: 20000 // Single large interval

View File

@@ -0,0 +1,591 @@
import { ShuttleGetterSetterRepository } from "./ShuttleGetterSetterRepository";
import { IEta, IOrderedStop, IRoute, IShuttle, IStop, shuttleHasArrivedAtStop } from "../../entities/ShuttleRepositoryEntities";
import {
ShuttleRepositoryEvent,
ShuttleRepositoryEventListener,
ShuttleRepositoryEventName,
ShuttleRepositoryEventPayloads,
ShuttleStopArrival,
ShuttleTravelTimeDataIdentifier,
ShuttleTravelTimeDateFilterArguments
} from "./ShuttleGetterRepository";
import { BaseRedisRepository } from "../BaseRedisRepository";
export class RedisShuttleRepository extends BaseRedisRepository implements ShuttleGetterSetterRepository {
get isReady() {
return this.redisClient.isReady;
}
public async connect() {
await this.redisClient.connect();
}
public async disconnect() {
await this.redisClient.disconnect();
}
// EventEmitter override methods for type safety
public override on<T extends ShuttleRepositoryEventName>(
event: T,
listener: ShuttleRepositoryEventListener<T>,
): this;
public override on(event: string | symbol, listener: (...args: any[]) => void): this {
return super.on(event, listener);
}
public override once<T extends ShuttleRepositoryEventName>(
event: T,
listener: ShuttleRepositoryEventListener<T>,
): this;
public override once(event: string | symbol, listener: (...args: any[]) => void): this {
return super.once(event, listener);
}
public override off<T extends ShuttleRepositoryEventName>(
event: T,
listener: ShuttleRepositoryEventListener<T>,
): this;
public override off(event: string | symbol, listener: (...args: any[]) => void): this {
return super.off(event, listener);
}
public override addListener<T extends ShuttleRepositoryEventName>(
event: T,
listener: ShuttleRepositoryEventListener<T>,
): this;
public override addListener(event: string | symbol, listener: (...args: any[]) => void): this {
return super.addListener(event, listener);
}
public override removeListener<T extends ShuttleRepositoryEventName>(
event: T,
listener: ShuttleRepositoryEventListener<T>,
): this;
public override removeListener(event: string | symbol, listener: (...args: any[]) => void): this {
return super.removeListener(event, listener);
}
public override emit<T extends ShuttleRepositoryEventName>(
event: T,
payload: ShuttleRepositoryEventPayloads[T],
): boolean;
public override emit(event: string | symbol, ...args: any[]): boolean {
return super.emit(event, ...args);
}
// Helper methods for Redis key generation
private createStopKey = (stopId: string) => `shuttle:stop:${stopId}`;
private createRouteKey = (routeId: string) => `shuttle:route:${routeId}`;
private createShuttleKey = (shuttleId: string) => `shuttle:shuttle:${shuttleId}`;
private createEtaKey = (shuttleId: string, stopId: string) => `shuttle:eta:${shuttleId}:${stopId}`;
private createOrderedStopKey = (routeId: string, stopId: string) => `shuttle:orderedstop:${routeId}:${stopId}`;
private createShuttleLastStopKey = (shuttleId: string) => `shuttle:laststop:${shuttleId}`;
private createHistoricalEtaTimeSeriesKey = (routeId: string, fromStopId: string, toStopId: string) => {
return `shuttle:eta:historical:${routeId}:${fromStopId}:${toStopId}`;
}
// Helper methods for converting entities to Redis hashes
private createRedisHashFromStop = (stop: IStop): Record<string, string> => ({
id: stop.id,
name: stop.name,
systemId: stop.systemId,
latitude: stop.coordinates.latitude.toString(),
longitude: stop.coordinates.longitude.toString(),
updatedTime: stop.updatedTime.toISOString(),
});
private createStopFromRedisData = (data: Record<string, string>): IStop => ({
id: data.id,
name: data.name,
systemId: data.systemId,
coordinates: {
latitude: parseFloat(data.latitude),
longitude: parseFloat(data.longitude),
},
updatedTime: new Date(data.updatedTime),
});
private createRedisHashFromRoute = (route: IRoute): Record<string, string> => ({
id: route.id,
name: route.name,
color: route.color,
systemId: route.systemId,
polylineCoordinates: JSON.stringify(route.polylineCoordinates),
updatedTime: route.updatedTime.toISOString(),
});
private createRouteFromRedisData = (data: Record<string, string>): IRoute => ({
id: data.id,
name: data.name,
color: data.color,
systemId: data.systemId,
polylineCoordinates: JSON.parse(data.polylineCoordinates),
updatedTime: new Date(data.updatedTime),
});
private createRedisHashFromShuttle = (shuttle: IShuttle): Record<string, string> => ({
id: shuttle.id,
name: shuttle.name,
routeId: shuttle.routeId,
systemId: shuttle.systemId,
latitude: shuttle.coordinates.latitude.toString(),
longitude: shuttle.coordinates.longitude.toString(),
orientationInDegrees: shuttle.orientationInDegrees.toString(),
updatedTime: shuttle.updatedTime.toISOString(),
});
private createShuttleFromRedisData = (data: Record<string, string>): IShuttle => ({
id: data.id,
name: data.name,
routeId: data.routeId,
systemId: data.systemId,
coordinates: {
latitude: parseFloat(data.latitude),
longitude: parseFloat(data.longitude),
},
orientationInDegrees: parseFloat(data.orientationInDegrees),
updatedTime: new Date(data.updatedTime),
});
private createEtaFromRedisData = (data: Record<string, string>): IEta => ({
secondsRemaining: parseFloat(data.secondsRemaining),
shuttleId: data.shuttleId,
stopId: data.stopId,
systemId: data.systemId,
updatedTime: new Date(data.updatedTime),
});
private createRedisHashFromOrderedStop = (orderedStop: IOrderedStop): Record<string, string> => {
const hash: Record<string, string> = {
routeId: orderedStop.routeId,
stopId: orderedStop.stopId,
position: orderedStop.position.toString(),
systemId: orderedStop.systemId,
updatedTime: orderedStop.updatedTime.toISOString(),
};
if (orderedStop.nextStop) {
hash.nextStopRouteId = orderedStop.nextStop.routeId;
hash.nextStopStopId = orderedStop.nextStop.stopId;
}
if (orderedStop.previousStop) {
hash.previousStopRouteId = orderedStop.previousStop.routeId;
hash.previousStopStopId = orderedStop.previousStop.stopId;
}
return hash;
};
private createOrderedStopFromRedisData = (data: Record<string, string>): IOrderedStop => {
const orderedStop: IOrderedStop = {
routeId: data.routeId,
stopId: data.stopId,
position: parseInt(data.position),
systemId: data.systemId,
updatedTime: new Date(data.updatedTime),
};
// Note: We only store the IDs of next/previous stops, not full objects
// to avoid circular references in Redis. These would need to be
// resolved separately if needed.
if (data.nextStopRouteId && data.nextStopStopId) {
orderedStop.nextStop = {
routeId: data.nextStopRouteId,
stopId: data.nextStopStopId,
position: 0, // placeholder
systemId: data.systemId,
updatedTime: new Date(),
};
}
if (data.previousStopRouteId && data.previousStopStopId) {
orderedStop.previousStop = {
routeId: data.previousStopRouteId,
stopId: data.previousStopStopId,
position: 0, // placeholder
systemId: data.systemId,
updatedTime: new Date(),
};
}
return orderedStop;
};
// Getter methods
public async getStops(): Promise<IStop[]> {
const keys = await this.redisClient.keys('shuttle:stop:*');
const stops: IStop[] = [];
for (const key of keys) {
const data = await this.redisClient.hGetAll(key);
if (Object.keys(data).length > 0) {
stops.push(this.createStopFromRedisData(data));
}
}
return stops;
}
public async getStopById(stopId: string): Promise<IStop | null> {
const key = this.createStopKey(stopId);
const data = await this.redisClient.hGetAll(key);
if (Object.keys(data).length === 0) {
return null;
}
return this.createStopFromRedisData(data);
}
public async getRoutes(): Promise<IRoute[]> {
const keys = await this.redisClient.keys('shuttle:route:*');
const routes: IRoute[] = [];
for (const key of keys) {
const data = await this.redisClient.hGetAll(key);
if (Object.keys(data).length > 0) {
routes.push(this.createRouteFromRedisData(data));
}
}
return routes;
}
public async getRouteById(routeId: string): Promise<IRoute | null> {
const key = this.createRouteKey(routeId);
const data = await this.redisClient.hGetAll(key);
if (Object.keys(data).length === 0) {
return null;
}
return this.createRouteFromRedisData(data);
}
public async getShuttles(): Promise<IShuttle[]> {
const keys = await this.redisClient.keys('shuttle:shuttle:*');
const shuttles: IShuttle[] = [];
for (const key of keys) {
const data = await this.redisClient.hGetAll(key);
if (Object.keys(data).length > 0) {
shuttles.push(this.createShuttleFromRedisData(data));
}
}
return shuttles;
}
public async getShuttlesByRouteId(routeId: string): Promise<IShuttle[]> {
const allShuttles = await this.getShuttles();
return allShuttles.filter(shuttle => shuttle.routeId === routeId);
}
public async getShuttleById(shuttleId: string): Promise<IShuttle | null> {
const key = this.createShuttleKey(shuttleId);
const data = await this.redisClient.hGetAll(key);
if (Object.keys(data).length === 0) {
return null;
}
return this.createShuttleFromRedisData(data);
}
public async getEtasForShuttleId(shuttleId: string): Promise<IEta[]> {
const keys = await this.redisClient.keys(`shuttle:eta:${shuttleId}:*`);
const etas: IEta[] = [];
for (const key of keys) {
const data = await this.redisClient.hGetAll(key);
if (Object.keys(data).length > 0) {
etas.push(this.createEtaFromRedisData(data));
}
}
return etas;
}
public async getEtasForStopId(stopId: string): Promise<IEta[]> {
const keys = await this.redisClient.keys('shuttle:eta:*');
const etas: IEta[] = [];
for (const key of keys) {
const data = await this.redisClient.hGetAll(key);
if (Object.keys(data).length > 0 && data.stopId === stopId) {
etas.push(this.createEtaFromRedisData(data));
}
}
return etas;
}
public async getEtaForShuttleAndStopId(shuttleId: string, stopId: string): Promise<IEta | null> {
const key = this.createEtaKey(shuttleId, stopId);
const data = await this.redisClient.hGetAll(key);
if (Object.keys(data).length === 0) {
return null;
}
return this.createEtaFromRedisData(data);
}
public async getOrderedStopByRouteAndStopId(routeId: string, stopId: string): Promise<IOrderedStop | null> {
const key = this.createOrderedStopKey(routeId, stopId);
const data = await this.redisClient.hGetAll(key);
if (Object.keys(data).length === 0) {
return null;
}
return this.createOrderedStopFromRedisData(data);
}
public async getOrderedStopsByStopId(stopId: string): Promise<IOrderedStop[]> {
const keys = await this.redisClient.keys('shuttle:orderedstop:*');
const orderedStops: IOrderedStop[] = [];
for (const key of keys) {
const data = await this.redisClient.hGetAll(key);
if (Object.keys(data).length > 0 && data.stopId === stopId) {
orderedStops.push(this.createOrderedStopFromRedisData(data));
}
}
return orderedStops;
}
public async getOrderedStopsByRouteId(routeId: string): Promise<IOrderedStop[]> {
const keys = await this.redisClient.keys(`shuttle:orderedstop:${routeId}:*`);
const orderedStops: IOrderedStop[] = [];
for (const key of keys) {
const data = await this.redisClient.hGetAll(key);
if (Object.keys(data).length > 0) {
orderedStops.push(this.createOrderedStopFromRedisData(data));
}
}
return orderedStops;
}
// Setter/update methods
public async addOrUpdateRoute(route: IRoute): Promise<void> {
const key = this.createRouteKey(route.id);
await this.redisClient.hSet(key, this.createRedisHashFromRoute(route));
}
public async addOrUpdateShuttle(
shuttle: IShuttle,
travelTimeTimestamp = Date.now(),
): Promise<void> {
const key = this.createShuttleKey(shuttle.id);
await this.redisClient.hSet(key, this.createRedisHashFromShuttle(shuttle));
this.emit(ShuttleRepositoryEvent.SHUTTLE_UPDATED, shuttle);
await this.updateLastStopArrival(shuttle, travelTimeTimestamp);
}
private async updateLastStopArrival(
shuttle: IShuttle,
travelTimeTimestamp = Date.now(),
) {
const arrivedStop = await this.getArrivedStopIfExists(shuttle);
if (arrivedStop) {
// stop if same stop
const lastStop = await this.getShuttleLastStopArrival(shuttle.id);
if (lastStop?.stopId === arrivedStop.id) return;
const shuttleArrival = {
stopId: arrivedStop.id,
timestamp: new Date(travelTimeTimestamp),
shuttleId: shuttle.id,
};
this.emit(ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP, {
lastArrival: lastStop,
currentArrival: shuttleArrival,
});
await this.updateShuttleLastStopArrival(shuttleArrival);
}
}
public async getAverageTravelTimeSeconds(
{ routeId, fromStopId, toStopId }: ShuttleTravelTimeDataIdentifier,
{ from, to }: ShuttleTravelTimeDateFilterArguments,
): Promise<number | undefined> {
const timeSeriesKey = this.createHistoricalEtaTimeSeriesKey(routeId, fromStopId, toStopId);
const fromTimestamp = from.getTime();
const toTimestamp = to.getTime();
const intervalMs = toTimestamp - fromTimestamp + 1;
try {
const aggregationResult = await this.redisClient.sendCommand([
'TS.RANGE',
timeSeriesKey,
fromTimestamp.toString(),
toTimestamp.toString(),
'AGGREGATION',
'AVG',
intervalMs.toString()
]) as [string, string][];
if (aggregationResult && aggregationResult.length > 0) {
const [, averageValue] = aggregationResult[0];
return parseFloat(averageValue);
}
return;
} catch (error) {
console.warn(`Failed to get average travel time: ${error instanceof Error ? error.message : String(error)}`);
return;
}
}
/**
* Get the stop that the shuttle is currently at, if it exists.
*
* If the shuttle has a "last stop", it will only return the stop
* directly after the last stop. Otherwise, it may return any stop that
* is on the shuttle's route.
*
* @param shuttle
* @param delta
* @returns
*/
public async getArrivedStopIfExists(
shuttle: IShuttle,
delta = 0.001,
): Promise<IStop | undefined> {
const lastStop = await this.getShuttleLastStopArrival(shuttle.id);
if (lastStop) {
const lastOrderedStop = await this.getOrderedStopByRouteAndStopId(shuttle.routeId, lastStop.stopId);
const orderedStopAfter = lastOrderedStop?.nextStop;
if (orderedStopAfter) {
const stopAfter = await this.getStopById(orderedStopAfter.stopId);
if (stopAfter && shuttleHasArrivedAtStop(shuttle, stopAfter, delta)) {
return stopAfter;
}
}
} else {
const orderedStops = await this.getOrderedStopsByRouteId(shuttle.routeId);
for (const orderedStop of orderedStops) {
const stop = await this.getStopById(orderedStop.stopId);
if (stop != null && shuttleHasArrivedAtStop(shuttle, stop, delta)) {
return stop;
}
}
}
return undefined;
}
public async getShuttleLastStopArrival(shuttleId: string): Promise<ShuttleStopArrival | undefined> {
const key = this.createShuttleLastStopKey(shuttleId);
const data = await this.redisClient.hGetAll(key);
if (Object.keys(data).length === 0) {
return undefined;
}
return {
shuttleId,
stopId: data.stopId,
timestamp: new Date(data.timestamp),
};
}
private async updateShuttleLastStopArrival(lastStopArrival: ShuttleStopArrival) {
const key = this.createShuttleLastStopKey(lastStopArrival.shuttleId);
await this.redisClient.hSet(key, {
stopId: lastStopArrival.stopId,
timestamp: lastStopArrival.timestamp.toISOString(),
});
}
public async addOrUpdateStop(stop: IStop): Promise<void> {
const key = this.createStopKey(stop.id);
await this.redisClient.hSet(key, this.createRedisHashFromStop(stop));
}
public async addOrUpdateOrderedStop(orderedStop: IOrderedStop): Promise<void> {
const key = this.createOrderedStopKey(orderedStop.routeId, orderedStop.stopId);
await this.redisClient.hSet(key, this.createRedisHashFromOrderedStop(orderedStop));
}
// Remove methods
public async removeRouteIfExists(routeId: string): Promise<IRoute | null> {
const route = await this.getRouteById(routeId);
if (route) {
const key = this.createRouteKey(routeId);
await this.redisClient.del(key);
return route;
}
return null;
}
public async removeShuttleIfExists(shuttleId: string): Promise<IShuttle | null> {
const shuttle = await this.getShuttleById(shuttleId);
if (shuttle) {
const key = this.createShuttleKey(shuttleId);
await this.redisClient.del(key);
this.emit(ShuttleRepositoryEvent.SHUTTLE_REMOVED, shuttle);
return shuttle;
}
return null;
}
public async removeStopIfExists(stopId: string): Promise<IStop | null> {
const stop = await this.getStopById(stopId);
if (stop) {
const key = this.createStopKey(stopId);
await this.redisClient.del(key);
return stop;
}
return null;
}
public async removeOrderedStopIfExists(stopId: string, routeId: string): Promise<IOrderedStop | null> {
const orderedStop = await this.getOrderedStopByRouteAndStopId(routeId, stopId);
if (orderedStop) {
const key = this.createOrderedStopKey(routeId, stopId);
await this.redisClient.del(key);
return orderedStop;
}
return null;
}
// Clear methods
public async clearShuttleData(): Promise<void> {
const keys = await this.redisClient.keys('shuttle:shuttle:*');
if (keys.length > 0) {
await this.redisClient.del(keys);
}
}
public async clearOrderedStopData(): Promise<void> {
const keys = await this.redisClient.keys('shuttle:orderedstop:*');
if (keys.length > 0) {
await this.redisClient.del(keys);
}
}
public async clearRouteData(): Promise<void> {
const keys = await this.redisClient.keys('shuttle:route:*');
if (keys.length > 0) {
await this.redisClient.del(keys);
}
}
public async clearStopData(): Promise<void> {
const keys = await this.redisClient.keys('shuttle:stop:*');
if (keys.length > 0) {
await this.redisClient.del(keys);
}
}
}

View File

@@ -2,9 +2,9 @@ import { IEta, IOrderedStop, IRoute, IShuttle, IStop } from "../../entities/Shut
import type EventEmitter from "node:events";
export const ShuttleRepositoryEvent = {
ETA_UPDATED: "etaUpdated",
ETA_REMOVED: "etaRemoved",
ETA_DATA_CLEARED: "etaDataCleared",
SHUTTLE_UPDATED: "shuttleUpdated",
SHUTTLE_REMOVED: "shuttleRemoved",
SHUTTLE_WILL_ARRIVE_AT_STOP: "shuttleArrivedAtStop",
} as const;
export type ShuttleRepositoryEventName = typeof ShuttleRepositoryEvent[keyof typeof ShuttleRepositoryEvent];
@@ -12,16 +12,38 @@ export type ShuttleRepositoryEventName = typeof ShuttleRepositoryEvent[keyof typ
export type EtaRemovedEventPayload = IEta;
export type EtaDataClearedEventPayload = IEta[];
export interface WillArriveAtStopPayload {
lastArrival?: ShuttleStopArrival;
currentArrival: ShuttleStopArrival;
};
export interface ShuttleRepositoryEventPayloads {
[ShuttleRepositoryEvent.ETA_UPDATED]: IEta;
[ShuttleRepositoryEvent.ETA_REMOVED]: EtaRemovedEventPayload;
[ShuttleRepositoryEvent.ETA_DATA_CLEARED]: EtaDataClearedEventPayload;
[ShuttleRepositoryEvent.SHUTTLE_UPDATED]: IShuttle,
[ShuttleRepositoryEvent.SHUTTLE_REMOVED]: IShuttle,
[ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP]: WillArriveAtStopPayload,
}
export type ShuttleRepositoryEventListener<T extends ShuttleRepositoryEventName> = (
payload: ShuttleRepositoryEventPayloads[T],
) => void;
export interface ShuttleStopArrival {
shuttleId: string;
stopId: string;
timestamp: Date;
}
export interface ShuttleTravelTimeDataIdentifier {
routeId: string;
fromStopId: string;
toStopId: string;
}
export interface ShuttleTravelTimeDateFilterArguments {
from: Date;
to: Date;
}
/**
* Shuttle getter repository to be linked to a system.
*/
@@ -36,10 +58,6 @@ export interface ShuttleGetterRepository extends EventEmitter {
getShuttleById(shuttleId: string): Promise<IShuttle | null>;
getShuttlesByRouteId(routeId: string): Promise<IShuttle[]>;
getEtasForShuttleId(shuttleId: string): Promise<IEta[]>;
getEtasForStopId(stopId: string): Promise<IEta[]>;
getEtaForShuttleAndStopId(shuttleId: string, stopId: string): Promise<IEta | null>;
on<T extends ShuttleRepositoryEventName>(event: T, listener: ShuttleRepositoryEventListener<T>): this;
once<T extends ShuttleRepositoryEventName>(event: T, listener: ShuttleRepositoryEventListener<T>): this;
off<T extends ShuttleRepositoryEventName>(event: T, listener: ShuttleRepositoryEventListener<T>): this;
@@ -61,4 +79,19 @@ export interface ShuttleGetterRepository extends EventEmitter {
* @param routeId
*/
getOrderedStopsByRouteId(routeId: string): Promise<IOrderedStop[]>;
/**
* Get the last stop arrival for a shuttle.
* Returns undefined if no last stop arrival has been recorded.
* @param shuttleId
*/
getShuttleLastStopArrival(shuttleId: string): Promise<ShuttleStopArrival | undefined>;
/**
* Check if a shuttle has arrived at a stop within the given delta.
* Returns the stop if the shuttle is at a stop, otherwise undefined.
* @param shuttle
* @param delta - The coordinate delta tolerance (default 0.001)
*/
getArrivedStopIfExists(shuttle: IShuttle, delta?: number): Promise<IStop | undefined>;
}

View File

@@ -1,8 +1,8 @@
// If types match closely, we can use TypeScript "casting"
// to convert from data repo to GraphQL schema
import { ShuttleGetterRepository } from "./ShuttleGetterRepository";
import { IEta, IOrderedStop, IRoute, IShuttle, IStop } from "../../entities/ShuttleRepositoryEntities";
import { ShuttleGetterRepository, ShuttleTravelTimeDataIdentifier, ShuttleTravelTimeDateFilterArguments } from "./ShuttleGetterRepository";
import { IOrderedStop, IRoute, IShuttle, IStop } from "../../entities/ShuttleRepositoryEntities";
/**
* ShuttleGetterRepository interface for data derived from Passio API.
@@ -13,20 +13,28 @@ import { IEta, IOrderedStop, IRoute, IShuttle, IStop } from "../../entities/Shut
export interface ShuttleGetterSetterRepository extends ShuttleGetterRepository {
// Setter methods
addOrUpdateRoute(route: IRoute): Promise<void>;
addOrUpdateShuttle(shuttle: IShuttle): Promise<void>;
addOrUpdateShuttle(shuttle: IShuttle, travelTimeTimestamp?: number, referenceCurrentTime?: Date): Promise<void>;
addOrUpdateStop(stop: IStop): Promise<void>;
addOrUpdateOrderedStop(orderedStop: IOrderedStop): Promise<void>;
addOrUpdateEta(eta: IEta): Promise<void>;
removeRouteIfExists(routeId: string): Promise<IRoute | null>;
removeShuttleIfExists(shuttleId: string): Promise<IShuttle | null>;
removeStopIfExists(stopId: string): Promise<IStop | null>;
removeOrderedStopIfExists(stopId: string, routeId: string): Promise<IOrderedStop | null>;
removeEtaIfExists(shuttleId: string, stopId: string): Promise<IEta | null>;
clearRouteData(): Promise<void>;
clearShuttleData(): Promise<void>;
clearStopData(): Promise<void>;
clearOrderedStopData(): Promise<void>;
clearEtaData(): Promise<void>;
/**
* Get average travel time between two stops based on historical data.
* Returns undefined if no data exists for the specified time range.
* @param identifier - The route and stop IDs to query
* @param dateFilter - The date range to filter data
*/
getAverageTravelTimeSeconds(
identifier: ShuttleTravelTimeDataIdentifier,
dateFilter: ShuttleTravelTimeDateFilterArguments
): Promise<number | undefined>;
}

View File

@@ -1,12 +1,15 @@
import EventEmitter from "node:events";
import { ShuttleGetterSetterRepository } from "./ShuttleGetterSetterRepository";
import { IEta, IOrderedStop, IRoute, IShuttle, IStop } from "../../entities/ShuttleRepositoryEntities";
import { IOrderedStop, IRoute, IShuttle, IStop, shuttleHasArrivedAtStop } from "../../entities/ShuttleRepositoryEntities";
import { IEntityWithId } from "../../entities/SharedEntities";
import {
ShuttleRepositoryEvent,
ShuttleRepositoryEventListener,
ShuttleRepositoryEventName,
ShuttleRepositoryEventPayloads,
ShuttleStopArrival,
ShuttleTravelTimeDataIdentifier,
ShuttleTravelTimeDateFilterArguments,
} from "./ShuttleGetterRepository";
/**
@@ -68,8 +71,9 @@ export class UnoptimizedInMemoryShuttleRepository
private stops: IStop[] = [];
private routes: IRoute[] = [];
private shuttles: IShuttle[] = [];
private etas: IEta[] = [];
private orderedStops: IOrderedStop[] = [];
private shuttleLastStopArrivals: Map<string, ShuttleStopArrival> = new Map();
private travelTimeData: Map<string, Array<{ timestamp: number; seconds: number }>> = new Map();
public async getStops(): Promise<IStop[]> {
return this.stops;
@@ -99,18 +103,6 @@ export class UnoptimizedInMemoryShuttleRepository
return this.findEntityById(shuttleId, this.shuttles);
}
public async getEtasForShuttleId(shuttleId: string): Promise<IEta[]> {
return this.etas.filter(eta => eta.shuttleId === shuttleId);
}
public async getEtasForStopId(stopId: string) {
return this.etas.filter(eta => eta.stopId === stopId);
}
public async getEtaForShuttleAndStopId(shuttleId: string, stopId: string) {
return this.findEntityByMatcher<IEta>((value) => value.stopId === stopId && value.shuttleId === shuttleId, this.etas);
}
public async getOrderedStopByRouteAndStopId(routeId: string, stopId: string) {
return this.findEntityByMatcher<IOrderedStop>((value) => value.routeId === routeId && value.stopId === stopId, this.orderedStops)
}
@@ -144,13 +136,20 @@ export class UnoptimizedInMemoryShuttleRepository
}
}
public async addOrUpdateShuttle(shuttle: IShuttle): Promise<void> {
public async addOrUpdateShuttle(
shuttle: IShuttle,
travelTimeTimestamp = Date.now(),
): Promise<void> {
const index = this.shuttles.findIndex((s) => s.id === shuttle.id);
if (index !== -1) {
this.shuttles[index] = shuttle;
} else {
this.shuttles.push(shuttle);
}
this.emit(ShuttleRepositoryEvent.SHUTTLE_UPDATED, shuttle);
await this.updateLastStopArrival(shuttle, travelTimeTimestamp);
}
public async addOrUpdateStop(stop: IStop): Promise<void> {
@@ -171,14 +170,78 @@ export class UnoptimizedInMemoryShuttleRepository
}
}
public async addOrUpdateEta(eta: IEta): Promise<void> {
const index = this.etas.findIndex((e) => e.stopId === eta.stopId && e.shuttleId === eta.shuttleId);
if (index !== -1) {
this.etas[index] = eta;
} else {
this.etas.push(eta);
private async updateLastStopArrival(
shuttle: IShuttle,
travelTimeTimestamp = Date.now(),
) {
const arrivedStop = await this.getArrivedStopIfExists(shuttle);
if (arrivedStop != undefined) {
// stop if same stop
const lastStop = await this.getShuttleLastStopArrival(shuttle.id);
if (lastStop?.stopId === arrivedStop.id) return;
const shuttleArrival = {
stopId: arrivedStop.id,
timestamp: new Date(travelTimeTimestamp),
shuttleId: shuttle.id,
};
this.emit(ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP, {
lastArrival: lastStop,
currentArrival: shuttleArrival,
});
await this.updateShuttleLastStopArrival(shuttleArrival);
}
this.emit(ShuttleRepositoryEvent.ETA_UPDATED, eta);
}
private async updateShuttleLastStopArrival(lastStopArrival: ShuttleStopArrival) {
this.shuttleLastStopArrivals.set(lastStopArrival.shuttleId, lastStopArrival);
}
public async getAverageTravelTimeSeconds(
{ routeId, fromStopId, toStopId }: ShuttleTravelTimeDataIdentifier,
{ from, to }: ShuttleTravelTimeDateFilterArguments,
): Promise<number | undefined> {
const key = `${routeId}:${fromStopId}:${toStopId}`;
const dataPoints = this.travelTimeData.get(key);
if (!dataPoints || dataPoints.length === 0) {
return undefined;
}
const fromTimestamp = from.getTime();
const toTimestamp = to.getTime();
const filteredPoints = dataPoints.filter(
(point) => point.timestamp >= fromTimestamp && point.timestamp <= toTimestamp
);
if (filteredPoints.length === 0) {
return undefined;
}
const sum = filteredPoints.reduce((acc, point) => acc + point.seconds, 0);
return sum / filteredPoints.length;
}
public async getArrivedStopIfExists(
shuttle: IShuttle,
delta = 0.001,
): Promise<IStop | undefined> {
const orderedStops = await this.getOrderedStopsByRouteId(shuttle.routeId);
for (const orderedStop of orderedStops) {
const stop = await this.getStopById(orderedStop.stopId);
if (stop != null && shuttleHasArrivedAtStop(shuttle, stop, delta)) {
return stop;
}
}
return undefined;
}
public async getShuttleLastStopArrival(shuttleId: string): Promise<ShuttleStopArrival | undefined> {
return this.shuttleLastStopArrivals.get(shuttleId);
}
private async removeEntityByMatcherIfExists<T>(callback: (value: T) => boolean, arrayToSearchIn: T[]) {
@@ -201,7 +264,11 @@ export class UnoptimizedInMemoryShuttleRepository
}
public async removeShuttleIfExists(shuttleId: string): Promise<IShuttle | null> {
return await this.removeEntityByIdIfExists(shuttleId, this.shuttles);
const shuttle = await this.removeEntityByIdIfExists(shuttleId, this.shuttles);
if (shuttle != null) {
this.emit(ShuttleRepositoryEvent.SHUTTLE_REMOVED, shuttle);
}
return shuttle;
}
public async removeStopIfExists(stopId: string): Promise<IStop | null> {
@@ -215,27 +282,10 @@ export class UnoptimizedInMemoryShuttleRepository
}, this.orderedStops);
}
public async removeEtaIfExists(shuttleId: string, stopId: string): Promise<IEta | null> {
const removedEta = await this.removeEntityByMatcherIfExists((eta) => {
return eta.stopId === stopId
&& eta.shuttleId === shuttleId
}, this.etas);
if (removedEta) {
this.emit(ShuttleRepositoryEvent.ETA_REMOVED, removedEta);
}
return removedEta;
}
public async clearShuttleData(): Promise<void> {
this.shuttles = [];
}
public async clearEtaData(): Promise<void> {
const removedEtas = [...this.etas];
this.etas = [];
this.emit(ShuttleRepositoryEvent.ETA_DATA_CLEARED, removedEtas);
}
public async clearOrderedStopData(): Promise<void> {
this.orderedStops = [];
}

View File

@@ -1,24 +1,63 @@
import { beforeEach, describe, expect, jest, test } from "@jest/globals";
import { afterEach, beforeEach, describe, expect, jest, test } from "@jest/globals";
import { createClient, RedisClientType } from "redis";
import { UnoptimizedInMemoryShuttleRepository } from "../UnoptimizedInMemoryShuttleRepository";
import { ShuttleRepositoryEvent } from "../ShuttleGetterRepository";
import { ShuttleGetterSetterRepository } from "../ShuttleGetterSetterRepository";
import { RedisShuttleRepository } from "../RedisShuttleRepository";
import {
generateMockEtas,
generateMockOrderedStops,
generateMockRoutes,
generateMockShuttles,
generateMockStops,
} from "../../../../testHelpers/mockDataGenerators";
import { RepositoryHolder } from "../../../../testHelpers/RepositoryHolder";
import { setupRouteAndOrderedStopsForShuttleRepository } from "../../../../testHelpers/setupRouteAndOrderedStopsForShuttleRepository";
import { ShuttleRepositoryEvent } from "../ShuttleGetterRepository";
// For repositories created in the future, reuse core testing
// logic from here and differentiate setup (e.g. creating mocks)
// Do this by creating a function which takes a ShuttleGetterRepository
// or ShuttleGetterSetterRepository instance
class UnoptimizedInMemoryShuttleRepositoryHolder implements RepositoryHolder<ShuttleGetterSetterRepository> {
name = 'UnoptimizedInMemoryShuttleRepository';
factory = async () => {
return new UnoptimizedInMemoryShuttleRepository();
};
teardown = async () => {};
}
describe("UnoptimizedInMemoryRepository", () => {
let repository: UnoptimizedInMemoryShuttleRepository;
class RedisShuttleRepositoryHolder implements RepositoryHolder<ShuttleGetterSetterRepository> {
repo: RedisShuttleRepository | undefined;
redisClient: RedisClientType | undefined;
beforeEach(() => {
repository = new UnoptimizedInMemoryShuttleRepository();
name = 'RedisShuttleRepository';
factory = async () => {
this.redisClient = createClient({
url: process.env.REDIS_URL,
});
await this.redisClient.connect();
this.repo = new RedisShuttleRepository(this.redisClient);
return this.repo;
};
teardown = async () => {
if (this.redisClient) {
await this.redisClient.flushAll();
await this.redisClient.disconnect();
}
};
}
const repositoryImplementations = [
new UnoptimizedInMemoryShuttleRepositoryHolder(),
new RedisShuttleRepositoryHolder(),
];
describe.each(repositoryImplementations)('$name', (holder) => {
let repository: ShuttleGetterSetterRepository;
beforeEach(async () => {
repository = await holder.factory();
jest.useRealTimers();
});
afterEach(async () => {
await holder.teardown();
jest.useRealTimers();
});
describe("getStops", () => {
@@ -29,7 +68,8 @@ describe("UnoptimizedInMemoryRepository", () => {
}
const result = await repository.getStops();
expect(result).toEqual(mockStops);
expect(result).toHaveLength(mockStops.length);
expect(result).toEqual(expect.arrayContaining(mockStops));
});
test("returns an empty list if there are no stops for the given system ID", async () => {
@@ -62,7 +102,8 @@ describe("UnoptimizedInMemoryRepository", () => {
}
const result = await repository.getRoutes();
expect(result).toEqual(mockRoutes);
expect(result).toHaveLength(mockRoutes.length);
expect(result).toEqual(expect.arrayContaining(mockRoutes));
});
test("returns an empty list if there are no routes for the system ID", async () => {
@@ -86,6 +127,7 @@ describe("UnoptimizedInMemoryRepository", () => {
expect(result).toBeNull();
});
});
describe("getShuttles", () => {
test("gets all shuttles for a specific system ID", async () => {
const mockShuttles = generateMockShuttles();
@@ -94,7 +136,8 @@ describe("UnoptimizedInMemoryRepository", () => {
}
const result = await repository.getShuttles();
expect(result).toEqual(mockShuttles);
expect(result).toHaveLength(mockShuttles.length);
expect(result).toEqual(expect.arrayContaining(mockShuttles));
});
test("returns an empty list if there are no shuttles for the system ID", async () => {
@@ -137,118 +180,6 @@ describe("UnoptimizedInMemoryRepository", () => {
});
});
describe("getEtasForShuttleId", () => {
test("gets ETAs for a specific shuttle ID", async () => {
const mockEtas = generateMockEtas();
for (const eta of mockEtas) {
await repository.addOrUpdateEta(eta);
}
const result = await repository.getEtasForShuttleId("sh1");
expect(result).toEqual(mockEtas.filter((eta) => eta.shuttleId === "sh1"));
});
test("returns an empty list if there are no ETAs for the shuttle ID", async () => {
const result = await repository.getEtasForShuttleId("nonexistent-shuttle");
expect(result).toEqual([]);
});
});
describe("getEtasForStopId", () => {
test("gets ETAs for a specific stop ID", async () => {
const mockEtas = generateMockEtas();
for (const eta of mockEtas) {
await repository.addOrUpdateEta(eta);
}
const result = await repository.getEtasForStopId("st1");
expect(result).toEqual(mockEtas.filter((eta) => eta.stopId === "st1"));
});
test("returns an empty list if there are no ETAs for the stop ID", async () => {
const result = await repository.getEtasForStopId("nonexistent-stop");
expect(result).toEqual([]);
});
});
describe("getEtaForShuttleAndStopId", () => {
test("gets a single ETA for a specific shuttle and stop ID", async () => {
const mockEtas = generateMockEtas();
const mockEta = mockEtas[0];
await repository.addOrUpdateEta(mockEta);
const result = await repository.getEtaForShuttleAndStopId("sh1", "st1");
expect(result).toEqual(mockEta);
});
test("returns null if no ETA matches the shuttle and stop ID", async () => {
const result = await repository.getEtaForShuttleAndStopId("nonexistent-shuttle", "nonexistent-stop");
expect(result).toBeNull();
});
});
describe("on/addListener", () => {
test("notifies listeners if etas have been added or changed", async () => {
const mockListener = jest.fn();
repository.on(ShuttleRepositoryEvent.ETA_UPDATED, mockListener);
const mockEtas = generateMockEtas();
for (const eta of mockEtas) {
await repository.addOrUpdateEta(eta);
}
expect(mockListener).toHaveBeenCalledTimes(mockEtas.length);
expect(mockListener).toHaveBeenCalledWith(mockEtas[0]); // First notification
expect(mockListener).toHaveBeenCalledWith(mockEtas[mockEtas.length - 1]); // Last notification
});
test("does not notify listener if removed", async () => {
const mockListener = jest.fn();
repository.on(ShuttleRepositoryEvent.ETA_UPDATED, mockListener);
const mockEtas = generateMockEtas();
repository.off(ShuttleRepositoryEvent.ETA_UPDATED, mockListener);
await repository.addOrUpdateEta(mockEtas[0]);
expect(mockListener).toHaveBeenCalledTimes(0);
});
});
describe("off/removeListener", () => {
test("stops notifying listeners after etas have stopped changing", async () => {
const mockListener = jest.fn(); // Jest mock function to simulate a listener
repository.on(ShuttleRepositoryEvent.ETA_UPDATED, mockListener);
const mockEtas = generateMockEtas();
await repository.addOrUpdateEta(mockEtas[0]);
repository.off(ShuttleRepositoryEvent.ETA_UPDATED, mockListener);
await repository.addOrUpdateEta(mockEtas[mockEtas.length - 1]);
expect(mockListener).toHaveBeenCalledTimes(1);
expect(mockListener).toHaveBeenCalledWith(mockEtas[0]); // First notification
expect(mockListener).not.toHaveBeenCalledWith(mockEtas[mockEtas.length - 1]); // Last notification
});
test("does not remove listener if wrong reference", async () => {
const mockListener = jest.fn();
repository.on(ShuttleRepositoryEvent.ETA_UPDATED, mockListener);
const mockEtas = generateMockEtas();
repository.off(ShuttleRepositoryEvent.ETA_UPDATED, () => {});
await repository.addOrUpdateEta(mockEtas[0]);
expect(mockListener).toHaveBeenCalledTimes(1);
expect(mockListener).toHaveBeenCalledWith(mockEtas[0]);
});
})
describe("ETA update events", () => {
});
describe("getOrderedStopByRouteAndStopId", () => {
test("gets an ordered stop by route ID and stop ID", async () => {
const mockOrderedStops = generateMockOrderedStops();
@@ -277,7 +208,9 @@ describe("UnoptimizedInMemoryRepository", () => {
}
const result = await repository.getOrderedStopsByStopId("st1");
expect(result).toEqual(mockOrderedStops.filter((os) => os.stopId === "st1"));
const expected = mockOrderedStops.filter((os) => os.stopId === "st1");
expect(result).toHaveLength(expected.length);
expect(result).toEqual(expect.arrayContaining(expected));
});
test("returns an empty list if there are no ordered stops for the stop ID", async () => {
@@ -294,7 +227,9 @@ describe("UnoptimizedInMemoryRepository", () => {
}
const result = await repository.getOrderedStopsByRouteId("r1");
expect(result).toEqual(mockOrderedStops.filter((os) => os.routeId === "r1"));
const expected = mockOrderedStops.filter((os) => os.routeId === "r1");
expect(result).toHaveLength(expected.length);
expect(result).toEqual(expect.arrayContaining(expected));
});
test("returns an empty list if there are no ordered stops for the route ID", async () => {
@@ -403,30 +338,6 @@ describe("UnoptimizedInMemoryRepository", () => {
});
});
describe("addOrUpdateEta", () => {
test("adds a new ETA if nonexistent", async () => {
const mockEtas = generateMockEtas();
const newEta = mockEtas[0];
await repository.addOrUpdateEta(newEta);
const result = await repository.getEtasForShuttleId(newEta.shuttleId);
expect(result).toEqual([newEta]);
});
test("updates an existing ETA if it exists", async () => {
const mockEtas = generateMockEtas();
const existingEta = mockEtas[0];
const updatedEta = structuredClone(existingEta);
updatedEta.secondsRemaining = existingEta.secondsRemaining + 60;
await repository.addOrUpdateEta(existingEta);
await repository.addOrUpdateEta(updatedEta);
const result = await repository.getEtasForShuttleId(existingEta.shuttleId);
expect(result).toEqual([updatedEta]);
});
});
describe("removeRouteIfExists", () => {
test("removes route given ID", async () => {
@@ -554,54 +465,6 @@ describe("UnoptimizedInMemoryRepository", () => {
});
});
describe("removeEtaIfExists", () => {
test("removes eta given shuttle ID and stop ID", async () => {
let mockEtas = generateMockEtas();
const stopId = mockEtas[0].stopId;
mockEtas = mockEtas.filter((eta) => eta.stopId === stopId);
await Promise.all(mockEtas.map(async (eta) => {
eta.stopId = stopId;
await repository.addOrUpdateEta(eta);
}));
const etaToRemove = mockEtas[0];
await repository.removeEtaIfExists(etaToRemove.shuttleId, etaToRemove.stopId);
const remainingEtas = await repository.getEtasForStopId(stopId);
expect(remainingEtas).toHaveLength(mockEtas.length - 1);
});
test("does nothing if eta doesn't exist", async () => {
let mockEtas = generateMockEtas();
const stopId = mockEtas[0].stopId;
mockEtas = mockEtas.filter((eta) => eta.stopId === stopId);
await Promise.all(mockEtas.map(async (eta) => {
eta.stopId = stopId;
await repository.addOrUpdateEta(eta);
}));
await repository.removeEtaIfExists("nonexistent-shuttle-id", "nonexistent-stop-id");
const remainingEtas = await repository.getEtasForStopId(stopId);
expect(remainingEtas).toHaveLength(mockEtas.length);
});
test("emits an eta removed event when an eta is removed", async () => {
const mockEtas = generateMockEtas();
const etaToRemove = mockEtas[0];
const listener = jest.fn();
repository.on(ShuttleRepositoryEvent.ETA_REMOVED, listener);
await repository.addOrUpdateEta(etaToRemove);
await repository.removeEtaIfExists(etaToRemove.shuttleId, etaToRemove.stopId);
expect(listener).toHaveBeenCalledTimes(1);
expect(listener).toHaveBeenCalledWith(etaToRemove);
});
});
describe("clearShuttleData", () => {
test("clears all shuttles from the repository", async () => {
const mockShuttles = generateMockShuttles();
@@ -616,38 +479,10 @@ describe("UnoptimizedInMemoryRepository", () => {
});
});
describe("clearEtaData", () => {
test("clears all ETAs from the repository", async () => {
const mockEtas = generateMockEtas();
for (const eta of mockEtas) {
await repository.addOrUpdateEta(eta);
}
await repository.clearEtaData();
const result = await repository.getEtasForShuttleId("shuttle1");
expect(result).toEqual([]);
});
test("emits an event with the cleared etas", async () => {
const mockEtas = generateMockEtas();
const listener = jest.fn();
repository.on(ShuttleRepositoryEvent.ETA_DATA_CLEARED, listener);
for (const eta of mockEtas) {
await repository.addOrUpdateEta(eta);
}
await repository.clearEtaData();
expect(listener).toHaveBeenCalledTimes(1);
expect(listener).toHaveBeenCalledWith(mockEtas);
});
});
describe("clearOrderedStopData", () => {
test("clears all ordered stops from the repository", async () => {
const mockOrderedStops = await generateMockOrderedStops();
const mockOrderedStops = generateMockOrderedStops();
for (const system of mockOrderedStops) {
await repository.addOrUpdateOrderedStop(system);
}
@@ -686,4 +521,285 @@ describe("UnoptimizedInMemoryRepository", () => {
expect(result).toEqual([]);
});
});
// Helper function for setting up routes and ordered stops for shuttle tracking tests
async function setupRouteAndOrderedStops() {
return await setupRouteAndOrderedStopsForShuttleRepository(repository);
}
describe("addOrUpdateShuttle with shuttle tracking", () => {
test("updates the shuttle's last stop arrival if shuttle is at a stop", async () => {
const { route, systemId, stop2 } = await setupRouteAndOrderedStops();
const shuttle = {
id: "sh1",
name: "Shuttle 1",
routeId: route.id,
systemId: systemId,
coordinates: stop2.coordinates,
orientationInDegrees: 0,
updatedTime: new Date(),
};
await repository.addOrUpdateShuttle(shuttle);
const lastStop = await repository.getShuttleLastStopArrival(shuttle.id);
expect(lastStop?.stopId).toEqual(stop2.id);
});
});
describe("getArrivedStopIfExists", () => {
test("gets the stop that the shuttle is currently at, if exists", async () => {
const { route, systemId, stop2 } = await setupRouteAndOrderedStops();
const shuttle = {
id: "sh1",
name: "Shuttle 1",
routeId: route.id,
systemId: systemId,
coordinates: stop2.coordinates,
orientationInDegrees: 0,
updatedTime: new Date(),
};
const result = await repository.getArrivedStopIfExists(shuttle);
expect(result).toBeDefined();
expect(result?.id).toBe("st2");
expect(result?.name).toBe("Stop 2");
});
test("returns undefined if shuttle is not currently at a stop", async () => {
const { route, systemId } = await setupRouteAndOrderedStops();
const shuttle = {
id: "sh1",
name: "Shuttle 1",
routeId: route.id,
systemId: systemId,
coordinates: { latitude: 12.5, longitude: 22.5 },
orientationInDegrees: 0,
updatedTime: new Date(),
};
const result = await repository.getArrivedStopIfExists(shuttle);
expect(result).toBeUndefined();
});
});
describe("getShuttleLastStopArrival", () => {
test("gets the shuttle's last stop if existing in the data", async () => {
const { route, systemId, stop1 } = await setupRouteAndOrderedStops();
const shuttle = {
id: "sh1",
name: "Shuttle 1",
routeId: route.id,
systemId: systemId,
coordinates: stop1.coordinates,
orientationInDegrees: 0,
updatedTime: new Date(),
};
const stopArrivalTime = new Date("2024-01-15T10:30:00Z");
await repository.addOrUpdateShuttle(shuttle, stopArrivalTime.getTime());
const result = await repository.getShuttleLastStopArrival(shuttle.id);
expect(result).toBeDefined();
expect(result?.stopId).toBe(stop1.id);
expect(result?.timestamp.getTime()).toBe(stopArrivalTime.getTime());
});
test("returns undefined if the data has never been initialized", async () => {
const mockShuttles = generateMockShuttles();
const shuttle = mockShuttles[0];
const result = await repository.getShuttleLastStopArrival(shuttle.id);
expect(result).toBeUndefined();
});
test("returns the most recent stop arrival when updated multiple times", async () => {
const { route, systemId, stop1, stop2 } = await setupRouteAndOrderedStops();
const shuttle = {
id: "sh1",
name: "Shuttle 1",
routeId: route.id,
systemId: systemId,
coordinates: stop1.coordinates,
orientationInDegrees: 0,
updatedTime: new Date(),
};
const firstArrivalTime = new Date("2024-01-15T10:30:00Z");
await repository.addOrUpdateShuttle(shuttle, firstArrivalTime.getTime());
shuttle.coordinates = stop2.coordinates;
const secondArrivalTime = new Date("2024-01-15T10:35:00Z");
await repository.addOrUpdateShuttle(shuttle, secondArrivalTime.getTime());
const result = await repository.getShuttleLastStopArrival(shuttle.id);
expect(result).toBeDefined();
expect(result?.stopId).toBe(stop2.id);
expect(result?.timestamp.getTime()).toBe(secondArrivalTime.getTime());
});
});
describe("SHUTTLE_UPDATED event", () => {
test("emits SHUTTLE_UPDATED event when shuttles are added or updated", async () => {
const mockListener = jest.fn();
repository.on(ShuttleRepositoryEvent.SHUTTLE_UPDATED, mockListener);
const mockShuttles = generateMockShuttles();
for (const shuttle of mockShuttles) {
await repository.addOrUpdateShuttle(shuttle);
}
expect(mockListener).toHaveBeenCalledTimes(mockShuttles.length);
expect(mockListener).toHaveBeenCalledWith(mockShuttles[0]);
expect(mockListener).toHaveBeenCalledWith(mockShuttles[mockShuttles.length - 1]);
});
test("does not notify listener after it has been removed", async () => {
const mockListener = jest.fn();
repository.on(ShuttleRepositoryEvent.SHUTTLE_UPDATED, mockListener);
const mockShuttles = generateMockShuttles();
repository.off(ShuttleRepositoryEvent.SHUTTLE_UPDATED, mockListener);
await repository.addOrUpdateShuttle(mockShuttles[0]);
expect(mockListener).toHaveBeenCalledTimes(0);
});
test("stops notifying specific listener after removal but continues for others", async () => {
const mockListener1 = jest.fn();
const mockListener2 = jest.fn();
repository.on(ShuttleRepositoryEvent.SHUTTLE_UPDATED, mockListener1);
repository.on(ShuttleRepositoryEvent.SHUTTLE_UPDATED, mockListener2);
const mockShuttles = generateMockShuttles();
await repository.addOrUpdateShuttle(mockShuttles[0]);
repository.off(ShuttleRepositoryEvent.SHUTTLE_UPDATED, mockListener1);
await repository.addOrUpdateShuttle(mockShuttles[mockShuttles.length - 1]);
expect(mockListener1).toHaveBeenCalledTimes(1);
expect(mockListener1).toHaveBeenCalledWith(mockShuttles[0]);
expect(mockListener1).not.toHaveBeenCalledWith(mockShuttles[mockShuttles.length - 1]);
expect(mockListener2).toHaveBeenCalledTimes(2);
expect(mockListener2).toHaveBeenCalledWith(mockShuttles[0]);
expect(mockListener2).toHaveBeenCalledWith(mockShuttles[mockShuttles.length - 1]);
});
});
describe("SHUTTLE_REMOVED event", () => {
test("emits SHUTTLE_REMOVED event when a shuttle is removed", async () => {
const mockShuttles = generateMockShuttles();
const shuttleToRemove = mockShuttles[0];
const listener = jest.fn();
repository.on(ShuttleRepositoryEvent.SHUTTLE_REMOVED, listener);
await repository.addOrUpdateShuttle(shuttleToRemove);
await repository.removeShuttleIfExists(shuttleToRemove.id);
expect(listener).toHaveBeenCalledTimes(1);
expect(listener).toHaveBeenCalledWith(shuttleToRemove);
});
});
describe("SHUTTLE_WILL_ARRIVE_AT_STOP event", () => {
test("emits SHUTTLE_WILL_ARRIVE_AT_STOP event before shuttle arrives at a stop", async () => {
const { route, systemId, stop1 } = await setupRouteAndOrderedStops();
const listener = jest.fn();
repository.on(ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP, listener);
const shuttle = {
id: "sh1",
name: "Shuttle 1",
routeId: route.id,
systemId: systemId,
coordinates: stop1.coordinates,
orientationInDegrees: 0,
updatedTime: new Date(),
};
const arrivalTime = new Date("2024-01-15T10:30:00Z");
await repository.addOrUpdateShuttle(shuttle, arrivalTime.getTime());
expect(listener).toHaveBeenCalledTimes(1);
const emittedPayload = listener.mock.calls[0][0] as any;
expect(emittedPayload.currentArrival).toEqual({
shuttleId: shuttle.id,
stopId: stop1.id,
timestamp: arrivalTime,
});
});
test("does not emit event when shuttle is not at a stop", async () => {
const { route, systemId } = await setupRouteAndOrderedStops();
const listener = jest.fn();
repository.on(ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP, listener);
const shuttle = {
id: "sh1",
name: "Shuttle 1",
routeId: route.id,
systemId: systemId,
coordinates: { latitude: 12.5, longitude: 22.5 }, // Not at any stop
orientationInDegrees: 0,
updatedTime: new Date(),
};
await repository.addOrUpdateShuttle(shuttle);
expect(listener).toHaveBeenCalledTimes(0);
});
test("emits multiple events as shuttle visits multiple stops", async () => {
const { route, systemId, stop1, stop2 } = await setupRouteAndOrderedStops();
const listener = jest.fn();
repository.on(ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP, listener);
const shuttle = {
id: "sh1",
name: "Shuttle 1",
routeId: route.id,
systemId: systemId,
coordinates: stop1.coordinates,
orientationInDegrees: 0,
updatedTime: new Date(),
};
const firstArrivalTime = new Date("2024-01-15T10:30:00Z");
await repository.addOrUpdateShuttle(shuttle, firstArrivalTime.getTime());
shuttle.coordinates = stop2.coordinates;
const secondArrivalTime = new Date("2024-01-15T10:35:00Z");
await repository.addOrUpdateShuttle(shuttle, secondArrivalTime.getTime());
expect(listener).toHaveBeenCalledTimes(2);
const firstPayload = listener.mock.calls[0][0] as any;
expect(firstPayload.currentArrival).toEqual({
shuttleId: shuttle.id,
stopId: stop1.id,
timestamp: firstArrivalTime,
});
const secondPayload = listener.mock.calls[1][0] as any;
expect(secondPayload.currentArrival).toEqual({
shuttleId: shuttle.id,
stopId: stop2.id,
timestamp: secondArrivalTime,
});
});
});
});

View File

@@ -0,0 +1,76 @@
import { IEta } from "../../../entities/ShuttleRepositoryEntities";
import { ETAGetterRepository, ETARepositoryEvent, ETARepositoryEventListener, ETARepositoryEventName } from "./ETAGetterRepository";
import EventEmitter from "node:events";
export abstract class BaseInMemoryETARepository extends EventEmitter implements ETAGetterRepository {
protected etas: IEta[] = [];
async getEtasForShuttleId(shuttleId: string): Promise<IEta[]> {
return this.etas.filter(eta => eta.shuttleId === shuttleId);
}
async getEtasForStopId(stopId: string): Promise<IEta[]> {
return this.etas.filter(eta => eta.stopId === stopId);
}
async getEtaForShuttleAndStopId(shuttleId: string, stopId: string): Promise<IEta | null> {
const eta = this.etas.find(eta => eta.stopId === stopId && eta.shuttleId === shuttleId);
return eta ?? null;
}
// Protected setter for internal use
protected async addOrUpdateEta(eta: IEta): Promise<void> {
const index = this.etas.findIndex((e) => e.stopId === eta.stopId && e.shuttleId === eta.shuttleId);
if (index !== -1) {
this.etas[index] = eta;
} else {
this.etas.push(eta);
}
this.emit(ETARepositoryEvent.ETA_UPDATED, eta);
}
// EventEmitter overrides for type safety
override on<T extends ETARepositoryEventName>(
event: T,
listener: ETARepositoryEventListener<T>,
): this;
override on(event: string | symbol, listener: (...args: any[]) => void): this {
return super.on(event, listener);
}
override once<T extends ETARepositoryEventName>(
event: T,
listener: ETARepositoryEventListener<T>,
): this;
override once(event: string | symbol, listener: (...args: any[]) => void): this {
return super.once(event, listener);
}
override off<T extends ETARepositoryEventName>(
event: T,
listener: ETARepositoryEventListener<T>,
): this;
override off(event: string | symbol, listener: (...args: any[]) => void): this {
return super.off(event, listener);
}
override addListener<T extends ETARepositoryEventName>(
event: T,
listener: ETARepositoryEventListener<T>,
): this;
override addListener(event: string | symbol, listener: (...args: any[]) => void): this {
return super.addListener(event, listener);
}
override removeListener<T extends ETARepositoryEventName>(
event: T,
listener: ETARepositoryEventListener<T>,
): this;
override removeListener(event: string | symbol, listener: (...args: any[]) => void): this {
return super.removeListener(event, listener);
}
override removeAllListeners(eventName?: string | symbol): this {
return super.removeAllListeners(eventName);
}
}

View File

@@ -0,0 +1,120 @@
import { IEta } from "../../../entities/ShuttleRepositoryEntities";
import { BaseRedisRepository } from "../../BaseRedisRepository";
import { ETAGetterRepository, ETARepositoryEvent, ETARepositoryEventListener, ETARepositoryEventName } from "./ETAGetterRepository";
export abstract class BaseRedisETARepository extends BaseRedisRepository implements ETAGetterRepository {
private static readonly ETA_KEY_PREFIX = 'shuttle:eta:';
// Helper methods
protected createEtaKey = (shuttleId: string, stopId: string) =>
`${BaseRedisETARepository.ETA_KEY_PREFIX}${shuttleId}:${stopId}`;
createRedisHashFromEta = (eta: IEta): Record<string, string> => ({
secondsRemaining: eta.secondsRemaining.toString(),
shuttleId: eta.shuttleId,
stopId: eta.stopId,
systemId: eta.systemId,
updatedTime: eta.updatedTime.toISOString(),
});
createEtaFromRedisData = (data: Record<string, string>): IEta => ({
secondsRemaining: parseFloat(data.secondsRemaining),
shuttleId: data.shuttleId,
stopId: data.stopId,
systemId: data.systemId,
updatedTime: new Date(data.updatedTime),
});
// Getter implementations
async getEtasForShuttleId(shuttleId: string): Promise<IEta[]> {
const keys = await this.redisClient.keys(`${BaseRedisETARepository.ETA_KEY_PREFIX}${shuttleId}:*`);
const etas: IEta[] = [];
for (const key of keys) {
const data = await this.redisClient.hGetAll(key);
if (Object.keys(data).length > 0) {
etas.push(this.createEtaFromRedisData(data));
}
}
return etas;
}
async getEtasForStopId(stopId: string): Promise<IEta[]> {
const keys = await this.redisClient.keys(`${BaseRedisETARepository.ETA_KEY_PREFIX}*`);
const etas: IEta[] = [];
for (const key of keys) {
const data = await this.redisClient.hGetAll(key);
if (Object.keys(data).length > 0 && data.stopId === stopId) {
etas.push(this.createEtaFromRedisData(data));
}
}
return etas;
}
async getEtaForShuttleAndStopId(shuttleId: string, stopId: string): Promise<IEta | null> {
const key = this.createEtaKey(shuttleId, stopId);
const data = await this.redisClient.hGetAll(key);
if (Object.keys(data).length === 0) {
return null;
}
return this.createEtaFromRedisData(data);
}
// Protected setter for internal use
protected async addOrUpdateEta(eta: IEta): Promise<void> {
const key = this.createEtaKey(eta.shuttleId, eta.stopId);
const hash = this.createRedisHashFromEta(eta);
await this.redisClient.hSet(key, hash);
this.emit(ETARepositoryEvent.ETA_UPDATED, eta);
}
// EventEmitter override methods for type safety
override on<T extends ETARepositoryEventName>(
event: T,
listener: ETARepositoryEventListener<T>,
): this;
override on(event: string | symbol, listener: (...args: any[]) => void): this {
return super.on(event, listener);
}
override once<T extends ETARepositoryEventName>(
event: T,
listener: ETARepositoryEventListener<T>,
): this;
override once(event: string | symbol, listener: (...args: any[]) => void): this {
return super.once(event, listener);
}
override off<T extends ETARepositoryEventName>(
event: T,
listener: ETARepositoryEventListener<T>,
): this;
override off(event: string | symbol, listener: (...args: any[]) => void): this {
return super.off(event, listener);
}
override addListener<T extends ETARepositoryEventName>(
event: T,
listener: ETARepositoryEventListener<T>,
): this;
override addListener(event: string | symbol, listener: (...args: any[]) => void): this {
return super.addListener(event, listener);
}
override removeListener<T extends ETARepositoryEventName>(
event: T,
listener: ETARepositoryEventListener<T>,
): this;
override removeListener(event: string | symbol, listener: (...args: any[]) => void): this {
return super.removeListener(event, listener);
}
override removeAllListeners(eventName?: string | symbol): this {
return super.removeAllListeners(eventName);
}
}

View File

@@ -0,0 +1,37 @@
import { EventEmitter } from "stream";
import { IEta } from "../../../entities/ShuttleRepositoryEntities";
// TODO: Remove these events in ShuttleGetterRepository
export const ETARepositoryEvent = {
ETA_UPDATED: "etaUpdated",
ETA_REMOVED: "etaRemoved",
ETA_DATA_CLEARED: "etaDataCleared",
} as const;
export type ETARepositoryEventName = typeof ETARepositoryEvent[keyof typeof ETARepositoryEvent];
export type EtaRemovedEventPayload = IEta;
export type EtaDataClearedEventPayload = IEta[];
export interface ETARepositoryEventPayloads {
[ETARepositoryEvent.ETA_UPDATED]: IEta;
[ETARepositoryEvent.ETA_REMOVED]: EtaRemovedEventPayload;
[ETARepositoryEvent.ETA_DATA_CLEARED]: EtaDataClearedEventPayload;
}
export type ETARepositoryEventListener<T extends ETARepositoryEventName> = (
payload: ETARepositoryEventPayloads[T],
) => void;
export interface ETAGetterRepository extends EventEmitter {
on<T extends ETARepositoryEventName>(event: T, listener: ETARepositoryEventListener<T>): this;
once<T extends ETARepositoryEventName>(event: T, listener: ETARepositoryEventListener<T>): this;
off<T extends ETARepositoryEventName>(event: T, listener: ETARepositoryEventListener<T>): this;
addListener<T extends ETARepositoryEventName>(event: T, listener: ETARepositoryEventListener<T>): this;
removeListener<T extends ETARepositoryEventName>(event: T, listener: ETARepositoryEventListener<T>): this;
getEtasForShuttleId(shuttleId: string): Promise<IEta[]>;
getEtasForStopId(stopId: string): Promise<IEta[]>;
getEtaForShuttleAndStopId(shuttleId: string, stopId: string): Promise<IEta | null>;
}

View File

@@ -0,0 +1,11 @@
import { IEta } from "../../../entities/ShuttleRepositoryEntities";
import { ETAGetterRepository } from "./ETAGetterRepository";
export interface ExternalSourceETARepository extends ETAGetterRepository {
/**
* Add or update an ETA from an external source (e.g., API or test data).
*/
addOrUpdateEtaFromExternalSource(eta: IEta): Promise<void>;
removeEtaIfExists(shuttleId: string, stopId: string): Promise<IEta | null>;
}

View File

@@ -0,0 +1,22 @@
import { IEta } from "../../../entities/ShuttleRepositoryEntities";
import { ExternalSourceETARepository } from "./ExternalSourceETARepository";
import { BaseInMemoryETARepository } from "./BaseInMemoryETARepository";
import { ETARepositoryEvent } from "./ETAGetterRepository";
export class InMemoryExternalSourceETARepository extends BaseInMemoryETARepository implements ExternalSourceETARepository {
async addOrUpdateEtaFromExternalSource(eta: IEta): Promise<void> {
await this.addOrUpdateEta(eta);
}
async removeEtaIfExists(shuttleId: string, stopId: string): Promise<IEta | null> {
const index = this.etas.findIndex((e) => e.stopId === stopId && e.shuttleId === shuttleId);
if (index === -1) {
return null;
}
const removedEta = this.etas[index];
this.etas.splice(index, 1);
this.emit(ETARepositoryEvent.ETA_REMOVED, removedEta);
return removedEta;
}
}

View File

@@ -0,0 +1,205 @@
import { SelfUpdatingETARepository } from "./SelfUpdatingETARepository";
import { ShuttleGetterRepository, ShuttleRepositoryEvent, ShuttleStopArrival, ShuttleTravelTimeDataIdentifier, ShuttleTravelTimeDateFilterArguments, WillArriveAtStopPayload } from "../ShuttleGetterRepository";
import { BaseInMemoryETARepository } from "./BaseInMemoryETARepository";
import { IOrderedStop, IShuttle } from "../../../entities/ShuttleRepositoryEntities";
import { ETARepositoryEvent } from "./ETAGetterRepository";
export class InMemorySelfUpdatingETARepository extends BaseInMemoryETARepository implements SelfUpdatingETARepository {
private referenceTime: Date | null = null;
private travelTimeData: Map<string, Array<{ timestamp: number; seconds: number }>> = new Map();
constructor(
readonly shuttleRepository: ShuttleGetterRepository
) {
super();
this.setReferenceTime = this.setReferenceTime.bind(this);
this.getAverageTravelTimeSeconds = this.getAverageTravelTimeSeconds.bind(this);
this.startListeningForUpdates = this.startListeningForUpdates.bind(this);
this.handleShuttleUpdate = this.handleShuttleUpdate.bind(this);
this.handleShuttleWillArriveAtStop = this.handleShuttleWillArriveAtStop.bind(this);
}
setReferenceTime(referenceTime: Date): void {
this.referenceTime = referenceTime;
}
async getAverageTravelTimeSeconds(
{ routeId, fromStopId, toStopId }: ShuttleTravelTimeDataIdentifier,
{ from, to }: ShuttleTravelTimeDateFilterArguments
): Promise<number | undefined> {
const key = `${routeId}:${fromStopId}:${toStopId}`;
const dataPoints = this.travelTimeData.get(key);
if (!dataPoints || dataPoints.length === 0) {
return undefined;
}
const fromTimestamp = from.getTime();
const toTimestamp = to.getTime();
const filteredPoints = dataPoints.filter(
(point) => point.timestamp >= fromTimestamp && point.timestamp <= toTimestamp
);
if (filteredPoints.length === 0) {
return undefined;
}
const sum = filteredPoints.reduce((acc, point) => acc + point.seconds, 0);
return sum / filteredPoints.length;
}
startListeningForUpdates(): void {
this.shuttleRepository.addListener(ShuttleRepositoryEvent.SHUTTLE_UPDATED, this.handleShuttleUpdate);
this.shuttleRepository.addListener(ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP, this.handleShuttleWillArriveAtStop);
}
stopListeningForUpdates(): void {
this.shuttleRepository.removeListener(ShuttleRepositoryEvent.SHUTTLE_UPDATED, this.handleShuttleUpdate);
this.shuttleRepository.removeListener(ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP, this.handleShuttleWillArriveAtStop);
}
private async getAverageTravelTimeSecondsWithFallbacks(
identifier: ShuttleTravelTimeDataIdentifier,
dateFilters: ShuttleTravelTimeDateFilterArguments[]
): Promise<number | undefined> {
for (const dateFilter of dateFilters) {
const result = await this.getAverageTravelTimeSeconds(identifier, dateFilter);
if (result !== undefined) {
return result;
}
}
return undefined;
}
private async handleShuttleUpdate(shuttle: IShuttle): Promise<void> {
const lastStop = await this.shuttleRepository.getShuttleLastStopArrival(shuttle.id);
if (!lastStop) return;
const lastOrderedStop = await this.shuttleRepository.getOrderedStopByRouteAndStopId(shuttle.routeId, lastStop.stopId);
await this.updateCascadingEta({
shuttle,
currentStop: lastOrderedStop,
originalStopArrival: lastStop,
});
}
private async updateCascadingEta({
shuttle,
currentStop,
originalStopArrival,
runningTravelTimeSeconds = 0
}: {
shuttle: IShuttle;
currentStop: IOrderedStop | null;
originalStopArrival: ShuttleStopArrival;
runningTravelTimeSeconds?: number;
}) {
if (!currentStop) return;
const nextStop = currentStop?.nextStop;
if (!nextStop) return;
// In case the system we have loops around
if (nextStop.stopId === originalStopArrival.stopId) return;
let referenceCurrentTime = new Date();
if (this.referenceTime != null) {
referenceCurrentTime = this.referenceTime;
}
const oneWeekAgo = new Date(referenceCurrentTime.getTime() - (60 * 60 * 24 * 7 * 1000));
const oneDayAgo = new Date(referenceCurrentTime.getTime() - (60 * 60 * 24 * 1000));
const oneHourAgo = new Date(referenceCurrentTime.getTime() - (60 * 60 * 1000));
const travelTimeSeconds = await this.getAverageTravelTimeSecondsWithFallbacks({
routeId: shuttle.routeId,
fromStopId: currentStop.stopId,
toStopId: nextStop.stopId,
}, [
{
from: oneWeekAgo,
to: new Date(oneWeekAgo.getTime() + (60 * 60 * 1000))
},
{
from: oneDayAgo,
to: new Date(oneDayAgo.getTime() + (60 * 60 * 1000))
},
{
from: oneHourAgo,
to: new Date(),
}
]);
if (travelTimeSeconds == undefined) return;
const elapsedTimeMs = referenceCurrentTime.getTime() - originalStopArrival.timestamp.getTime();
const predictedTimeSeconds = travelTimeSeconds - (elapsedTimeMs / 1000) + runningTravelTimeSeconds;
await this.addOrUpdateEta({
secondsRemaining: predictedTimeSeconds,
shuttleId: shuttle.id,
stopId: nextStop.stopId,
systemId: nextStop.systemId,
updatedTime: new Date(),
});
const nextStopWithNextNextStop = await this.shuttleRepository.getOrderedStopByRouteAndStopId(shuttle.routeId, nextStop.stopId);
await this.updateCascadingEta(
{
shuttle,
currentStop: nextStopWithNextNextStop,
originalStopArrival,
runningTravelTimeSeconds: runningTravelTimeSeconds + travelTimeSeconds,
},
)
}
private async handleShuttleWillArriveAtStop({
lastArrival,
currentArrival,
}: WillArriveAtStopPayload): Promise<void> {
const etas = await this.getEtasForShuttleId(currentArrival.shuttleId);
for (const eta of etas) {
await this.removeEtaIfExists(eta.shuttleId, eta.stopId);
}
if (lastArrival) {
// disallow cases where this gets triggered multiple times
if (lastArrival.stopId === currentArrival.stopId) return;
const shuttle = await this.shuttleRepository.getShuttleById(lastArrival.shuttleId);
if (!shuttle) return;
const routeId = shuttle.routeId;
const fromStopId = lastArrival.stopId;
const toStopId = currentArrival.stopId;
const travelTimeSeconds = (currentArrival.timestamp.getTime() - lastArrival.timestamp.getTime()) / 1000;
await this.addTravelTimeDataPoint({ routeId, fromStopId, toStopId }, travelTimeSeconds, currentArrival.timestamp.getTime());
}
}
private async addTravelTimeDataPoint(
{ routeId, fromStopId, toStopId }: ShuttleTravelTimeDataIdentifier,
travelTimeSeconds: number,
timestamp = Date.now(),
): Promise<void> {
const key = `${routeId}:${fromStopId}:${toStopId}`;
const dataPoints = this.travelTimeData.get(key) || [];
dataPoints.push({ timestamp, seconds: travelTimeSeconds });
this.travelTimeData.set(key, dataPoints);
}
private async removeEtaIfExists(shuttleId: string, stopId: string) {
const index = this.etas.findIndex((e) => e.stopId === stopId && e.shuttleId === shuttleId);
if (index === -1) {
return null;
}
const removedEta = this.etas[index];
this.etas.splice(index, 1);
this.emit(ETARepositoryEvent.ETA_REMOVED, removedEta);
return removedEta;
}
}

View File

@@ -0,0 +1,22 @@
import { IEta } from "../../../entities/ShuttleRepositoryEntities";
import { BaseRedisETARepository } from "./BaseRedisETARepository";
import { ExternalSourceETARepository } from "./ExternalSourceETARepository";
import { ETARepositoryEvent } from "./ETAGetterRepository";
export class RedisExternalSourceETARepository extends BaseRedisETARepository implements ExternalSourceETARepository {
async addOrUpdateEtaFromExternalSource(eta: IEta): Promise<void> {
await this.addOrUpdateEta(eta);
}
async removeEtaIfExists(shuttleId: string, stopId: string): Promise<IEta | null> {
const existingEta = await this.getEtaForShuttleAndStopId(shuttleId, stopId);
if (existingEta === null) {
return null;
}
const key = this.createEtaKey(shuttleId, stopId);
await this.redisClient.del(key);
this.emit(ETARepositoryEvent.ETA_REMOVED, existingEta);
return existingEta;
}
}

View File

@@ -0,0 +1,289 @@
import { SelfUpdatingETARepository } from "./SelfUpdatingETARepository";
import { BaseRedisETARepository } from "./BaseRedisETARepository";
import { createClient, RedisClientType } from "redis";
import { ShuttleGetterRepository, ShuttleRepositoryEvent, ShuttleStopArrival, ShuttleTravelTimeDataIdentifier, ShuttleTravelTimeDateFilterArguments, WillArriveAtStopPayload } from "../ShuttleGetterRepository";
import { REDIS_RECONNECT_INTERVAL } from "../../../environment";
import { IEta, IOrderedStop, IShuttle } from "../../../entities/ShuttleRepositoryEntities";
import { ETARepositoryEvent } from "./ETAGetterRepository";
export class RedisSelfUpdatingETARepository extends BaseRedisETARepository implements SelfUpdatingETARepository {
constructor(
readonly shuttleRepository: ShuttleGetterRepository,
redisClient: RedisClientType = createClient({
url: process.env.REDIS_URL,
socket: {
tls: process.env.NODE_ENV === 'production',
rejectUnauthorized: false,
reconnectStrategy: REDIS_RECONNECT_INTERVAL,
},
}),
private referenceTime: Date | null = null,
) {
super(redisClient);
this.setReferenceTime = this.setReferenceTime.bind(this);
this.getAverageTravelTimeSeconds = this.getAverageTravelTimeSeconds.bind(this);
this.startListeningForUpdates = this.startListeningForUpdates.bind(this);
this.handleShuttleWillArriveAtStop = this.handleShuttleWillArriveAtStop.bind(this);
this.handleShuttleUpdate = this.handleShuttleUpdate.bind(this);
this.updateCascadingEta = this.updateCascadingEta.bind(this);
this.getAverageTravelTimeSecondsWithFallbacks = this.getAverageTravelTimeSecondsWithFallbacks.bind(this);
this.removeEtaIfExists = this.removeEtaIfExists.bind(this);
}
private createHistoricalEtaTimeSeriesKey = (routeId: string, fromStopId: string, toStopId: string) => {
return `shuttle:eta:historical:${routeId}:${fromStopId}:${toStopId}`;
}
setReferenceTime(referenceTime: Date) {
this.referenceTime = referenceTime;
}
public async getAverageTravelTimeSeconds(
{ routeId, fromStopId, toStopId }: ShuttleTravelTimeDataIdentifier,
{ from, to }: ShuttleTravelTimeDateFilterArguments
): Promise<number | undefined> {
const timeSeriesKey = this.createHistoricalEtaTimeSeriesKey(routeId, fromStopId, toStopId);
const fromTimestamp = from.getTime();
const toTimestamp = to.getTime();
const intervalMs = toTimestamp - fromTimestamp + 1;
try {
const aggregationResult = await this.redisClient.sendCommand([
'TS.RANGE',
timeSeriesKey,
fromTimestamp.toString(),
toTimestamp.toString(),
'AGGREGATION',
'AVG',
intervalMs.toString()
]) as [string, string][];
if (aggregationResult && aggregationResult.length > 0) {
const [, averageValue] = aggregationResult[0];
return parseFloat(averageValue);
}
return;
} catch (error) {
console.warn(`Failed to get average travel time for ${timeSeriesKey}: ${error instanceof Error ? error.message : String(error)}`);
return;
}
}
startListeningForUpdates() {
this.shuttleRepository.addListener(ShuttleRepositoryEvent.SHUTTLE_UPDATED, this.handleShuttleUpdate);
this.shuttleRepository.addListener(ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP, this.handleShuttleWillArriveAtStop)
}
stopListeningForUpdates() {
this.shuttleRepository.removeListener(ShuttleRepositoryEvent.SHUTTLE_UPDATED, this.handleShuttleUpdate);
this.shuttleRepository.removeListener(ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP, this.handleShuttleWillArriveAtStop);
}
private async getAverageTravelTimeSecondsWithFallbacks(
identifier: ShuttleTravelTimeDataIdentifier,
dateFilters: ShuttleTravelTimeDateFilterArguments[]
): Promise<number | undefined> {
for (const dateFilter of dateFilters) {
const result = await this.getAverageTravelTimeSeconds(identifier, dateFilter);
if (result !== undefined) {
return result;
}
}
return undefined;
}
private async handleShuttleUpdate(shuttle: IShuttle) {
const lastStop = await this.shuttleRepository.getShuttleLastStopArrival(shuttle.id);
if (!lastStop) return;
const lastOrderedStop = await this.shuttleRepository.getOrderedStopByRouteAndStopId(shuttle.routeId, lastStop.stopId);
await this.updateCascadingEta({
shuttle,
currentStop: lastOrderedStop,
originalStopArrival: lastStop,
});
}
private async updateCascadingEta({
shuttle,
currentStop,
originalStopArrival,
runningTravelTimeSeconds = 0
}: {
shuttle: IShuttle;
currentStop: IOrderedStop | null;
originalStopArrival: ShuttleStopArrival;
runningTravelTimeSeconds?: number;
}) {
if (!currentStop) return;
const nextStop = currentStop?.nextStop;
if (!nextStop) return;
// In case the system we have loops around
if (nextStop.stopId === originalStopArrival.stopId) return;
let referenceCurrentTime = new Date();
if (this.referenceTime != null) {
referenceCurrentTime = this.referenceTime;
}
const oneWeekAgo = new Date(referenceCurrentTime.getTime() - (60 * 60 * 24 * 7 * 1000));
const oneDayAgo = new Date(referenceCurrentTime.getTime() - (60 * 60 * 24 * 1000));
const twoHoursAgo = new Date(referenceCurrentTime.getTime() - (120 * 60 * 1000));
const travelTimeSeconds = await this.getAverageTravelTimeSecondsWithFallbacks({
routeId: shuttle.routeId,
fromStopId: currentStop.stopId,
toStopId: nextStop.stopId,
}, [
{
from: oneWeekAgo,
to: new Date(oneWeekAgo.getTime() + (60 * 60 * 1000))
},
{
from: oneDayAgo,
to: new Date(oneDayAgo.getTime() + (60 * 60 * 1000))
},
{
from: twoHoursAgo,
to: new Date(),
}
]);
if (travelTimeSeconds == undefined) return;
const elapsedTimeMs = referenceCurrentTime.getTime() - originalStopArrival.timestamp.getTime();
const predictedTimeSeconds = travelTimeSeconds - (elapsedTimeMs / 1000) + runningTravelTimeSeconds;
await this.addOrUpdateEta({
secondsRemaining: predictedTimeSeconds,
shuttleId: shuttle.id,
stopId: nextStop.stopId,
systemId: nextStop.systemId,
updatedTime: new Date(),
});
const nextStopWithNextNextStop = await this.shuttleRepository.getOrderedStopByRouteAndStopId(shuttle.routeId, nextStop.stopId);
await this.updateCascadingEta(
{
shuttle,
currentStop: nextStopWithNextNextStop,
originalStopArrival,
runningTravelTimeSeconds: runningTravelTimeSeconds + travelTimeSeconds,
},
)
}
private async handleShuttleWillArriveAtStop({
lastArrival,
currentArrival,
}: WillArriveAtStopPayload) {
const etas = await this.getEtasForShuttleId(currentArrival.shuttleId);
for (const eta of etas) {
await this.removeEtaIfExists(eta.shuttleId, eta.stopId);
}
// only update time traveled if last arrival exists
if (lastArrival) {
// disallow cases where this gets triggered multiple times
if (lastArrival.stopId === currentArrival.stopId) return;
const shuttle = await this.shuttleRepository.getShuttleById(lastArrival.shuttleId);
if (!shuttle) return;
const routeId = shuttle.routeId;
const fromStopId = lastArrival.stopId;
const toStopId = currentArrival.stopId;
const travelTimeSeconds = (currentArrival.timestamp.getTime() - lastArrival.timestamp.getTime()) / 1000;
await this.addTravelTimeDataPoint({ routeId, fromStopId, toStopId, }, travelTimeSeconds, currentArrival.timestamp.getTime());
}
}
public async addTravelTimeDataPoint(
{ routeId, fromStopId, toStopId }: ShuttleTravelTimeDataIdentifier,
travelTimeSeconds: number,
timestamp = Date.now(),
): Promise<void> {
const historicalEtaTimeSeriesKey = this.createHistoricalEtaTimeSeriesKey(routeId, fromStopId, toStopId);
try {
await this.redisClient.sendCommand([
'TS.ADD',
historicalEtaTimeSeriesKey,
timestamp.toString(),
travelTimeSeconds.toString(),
'LABELS',
'routeId',
routeId,
'fromStopId',
fromStopId,
'toStopId',
toStopId
]);
} catch (error) {
await this.createHistoricalEtaTimeSeriesAndAddDataPoint(
historicalEtaTimeSeriesKey,
timestamp,
travelTimeSeconds,
routeId,
fromStopId,
toStopId
);
}
}
private async createHistoricalEtaTimeSeriesAndAddDataPoint(
timeSeriesKey: string,
timestamp: number,
travelTimeSeconds: number,
routeId: string,
fromStopId: string,
toStopId: string,
): Promise<void> {
try {
await this.redisClient.sendCommand([
'TS.CREATE',
timeSeriesKey,
'RETENTION',
'2678400000', // one month in milliseconds
'LABELS',
'routeId',
routeId,
'fromStopId',
fromStopId,
'toStopId',
toStopId
]);
await this.redisClient.sendCommand([
'TS.ADD',
timeSeriesKey,
timestamp.toString(),
travelTimeSeconds.toString()
]);
} catch (createError) {
await this.redisClient.sendCommand([
'TS.ADD',
timeSeriesKey,
timestamp.toString(),
travelTimeSeconds.toString()
]);
}
}
private async removeEtaIfExists(shuttleId: string, stopId: string): Promise<IEta | null> {
const existingEta = await this.getEtaForShuttleAndStopId(shuttleId, stopId);
if (existingEta === null) {
return null;
}
const key = this.createEtaKey(shuttleId, stopId);
await this.redisClient.del(key);
this.emit(ETARepositoryEvent.ETA_REMOVED, existingEta);
return existingEta;
}
}

View File

@@ -0,0 +1,28 @@
import { ShuttleTravelTimeDataIdentifier, ShuttleTravelTimeDateFilterArguments } from "../ShuttleGetterRepository";
import { ETAGetterRepository } from "./ETAGetterRepository";
export interface SelfUpdatingETARepository extends ETAGetterRepository {
/**
* Attach a event listener to the shuttle repository, listening to
* shuttle updates
*/
startListeningForUpdates(): void;
/**
* Get average travel time between two stops based on historical data.
* Returns undefined if no data exists for the specified time range.
* @param identifier - The route and stop IDs to query
* @param dateFilter - The date range to filter data
*/
getAverageTravelTimeSeconds(
identifier: ShuttleTravelTimeDataIdentifier,
dateFilter: ShuttleTravelTimeDateFilterArguments
): Promise<number | undefined>;
/**
* Set the "current time" as the class knows it, in order to calculate
* ETAs based on past data.
* @param referenceTime
*/
setReferenceTime(referenceTime: Date): void;
}

View File

@@ -0,0 +1,171 @@
import { afterEach, beforeEach, describe, expect, test } from "@jest/globals";
import { createClient, RedisClientType } from "redis";
import { RepositoryHolder } from "../../../../../testHelpers/RepositoryHolder";
import { ExternalSourceETARepository } from "../ExternalSourceETARepository";
import { RedisExternalSourceETARepository } from "../RedisExternalSourceETARepository";
import { InMemoryExternalSourceETARepository } from "../InMemoryExternalSourceETARepository";
import { generateMockEtas } from "../../../../../testHelpers/mockDataGenerators";
class RedisExternalSourceETARepositoryHolder implements RepositoryHolder<ExternalSourceETARepository> {
repo: RedisExternalSourceETARepository | undefined;
redisClient: RedisClientType | undefined;
name = "RedisExternalSourceETARepository"
factory = async () => {
this.redisClient = createClient({
url: process.env.REDIS_URL,
});
await this.redisClient.connect();
this.repo = new RedisExternalSourceETARepository(this.redisClient);
return this.repo;
}
teardown = async () => {
if (this.redisClient) {
await this.redisClient.flushAll();
await this.redisClient.disconnect();
}
}
}
class InMemoryExternalSourceETARepositoryHolder implements RepositoryHolder<ExternalSourceETARepository> {
repo: InMemoryExternalSourceETARepository | undefined;
name = "InMemoryExternalSourceETARepository"
factory = async () => {
this.repo = new InMemoryExternalSourceETARepository();
return this.repo;
}
teardown = async () => {
// No teardown needed for in-memory
}
}
const repositoryImplementations = [
new RedisExternalSourceETARepositoryHolder(),
new InMemoryExternalSourceETARepositoryHolder()
];
describe.each(repositoryImplementations)('$name', (holder) => {
let repository: ExternalSourceETARepository;
beforeEach(async () => {
repository = await holder.factory();
});
afterEach(async () => {
await holder.teardown();
});
describe("addOrUpdateEtaFromExternalSource", () => {
test("adds a new ETA if nonexistent", async () => {
const mockEtas = generateMockEtas();
const newEta = mockEtas[0];
await repository.addOrUpdateEtaFromExternalSource(newEta);
const result = await repository.getEtasForShuttleId(newEta.shuttleId);
expect(result).toEqual([newEta]);
});
test("updates an existing ETA if it exists", async () => {
const mockEtas = generateMockEtas();
const existingEta = mockEtas[0];
const updatedEta = structuredClone(existingEta);
updatedEta.secondsRemaining = existingEta.secondsRemaining + 60;
await repository.addOrUpdateEtaFromExternalSource(existingEta);
await repository.addOrUpdateEtaFromExternalSource(updatedEta);
const result = await repository.getEtasForShuttleId(existingEta.shuttleId);
expect(result).toEqual([updatedEta]);
});
});
describe("getEtasForShuttleId", () => {
test("gets ETAs for a specific shuttle ID", async () => {
const mockEtas = generateMockEtas();
for (const eta of mockEtas) {
await repository.addOrUpdateEtaFromExternalSource(eta);
}
const result = await repository.getEtasForShuttleId("sh1");
const expected = mockEtas.filter((eta) => eta.shuttleId === "sh1");
expect(result).toHaveLength(expected.length);
expect(result).toEqual(expect.arrayContaining(expected));
});
test("returns an empty list if there are no ETAs for the shuttle ID", async () => {
const result = await repository.getEtasForShuttleId("nonexistent-shuttle");
expect(result).toEqual([]);
});
});
describe("getEtasForStopId", () => {
test("gets ETAs for a specific stop ID", async () => {
const mockEtas = generateMockEtas();
for (const eta of mockEtas) {
await repository.addOrUpdateEtaFromExternalSource(eta);
}
const result = await repository.getEtasForStopId("st1");
expect(result).toEqual(mockEtas.filter((eta) => eta.stopId === "st1"));
});
test("returns an empty list if there are no ETAs for the stop ID", async () => {
const result = await repository.getEtasForStopId("nonexistent-stop");
expect(result).toEqual([]);
});
});
describe("getEtaForShuttleAndStopId", () => {
test("gets a single ETA for a specific shuttle and stop ID", async () => {
const mockEtas = generateMockEtas();
const mockEta = mockEtas[0];
await repository.addOrUpdateEtaFromExternalSource(mockEta);
const result = await repository.getEtaForShuttleAndStopId("sh1", "st1");
expect(result).toEqual(mockEta);
});
test("returns null if no ETA matches the shuttle and stop ID", async () => {
const result = await repository.getEtaForShuttleAndStopId("nonexistent-shuttle", "nonexistent-stop");
expect(result).toBeNull();
});
});
describe("removeEtaIfExists", () => {
test("removes eta given shuttle ID and stop ID", async () => {
let mockEtas = generateMockEtas();
const stopId = mockEtas[0].stopId;
mockEtas = mockEtas.filter((eta) => eta.stopId === stopId);
await Promise.all(mockEtas.map(async (eta) => {
eta.stopId = stopId;
await repository.addOrUpdateEtaFromExternalSource(eta);
}));
const etaToRemove = mockEtas[0];
await repository.removeEtaIfExists(etaToRemove.shuttleId, etaToRemove.stopId);
const remainingEtas = await repository.getEtasForStopId(stopId);
expect(remainingEtas).toHaveLength(mockEtas.length - 1);
});
test("does nothing if eta doesn't exist", async () => {
let mockEtas = generateMockEtas();
const stopId = mockEtas[0].stopId;
mockEtas = mockEtas.filter((eta) => eta.stopId === stopId);
await Promise.all(mockEtas.map(async (eta) => {
eta.stopId = stopId;
await repository.addOrUpdateEtaFromExternalSource(eta);
}));
await repository.removeEtaIfExists("nonexistent-shuttle-id", "nonexistent-stop-id");
const remainingEtas = await repository.getEtasForStopId(stopId);
expect(remainingEtas).toHaveLength(mockEtas.length);
});
});
})

View File

@@ -0,0 +1,320 @@
import { afterEach, beforeEach, describe, expect, test } from "@jest/globals";
import { createClient, RedisClientType } from "redis";
import { RepositoryHolder } from "../../../../../testHelpers/RepositoryHolder";
import { SelfUpdatingETARepository } from "../SelfUpdatingETARepository";
import { RedisSelfUpdatingETARepository } from "../RedisSelfUpdatingETARepository";
import { InMemorySelfUpdatingETARepository } from "../InMemorySelfUpdatingETARepository";
import { RedisShuttleRepository } from "../../RedisShuttleRepository";
import { UnoptimizedInMemoryShuttleRepository } from "../../UnoptimizedInMemoryShuttleRepository";
import { setupRouteAndOrderedStopsForShuttleRepository } from "../../../../../testHelpers/setupRouteAndOrderedStopsForShuttleRepository";
import { ShuttleGetterSetterRepository } from "../../ShuttleGetterSetterRepository";
class RedisSelfUpdatingETARepositoryHolder implements RepositoryHolder<SelfUpdatingETARepository> {
repo: RedisSelfUpdatingETARepository | undefined;
shuttleRepo: RedisShuttleRepository | undefined;
redisClient: RedisClientType | undefined;
name = "RedisSelfUpdatingETARepository"
factory = async () => {
this.redisClient = createClient({
url: process.env.REDIS_URL,
});
await this.redisClient.connect();
await this.redisClient.flushAll();
this.shuttleRepo = new RedisShuttleRepository(this.redisClient);
this.repo = new RedisSelfUpdatingETARepository(
this.shuttleRepo,
this.redisClient,
);
return this.repo;
}
teardown = async () => {
if (this.redisClient) {
await this.redisClient.flushAll();
await this.redisClient.disconnect();
}
}
}
class InMemorySelfUpdatingETARepositoryHolder implements RepositoryHolder<SelfUpdatingETARepository> {
repo: InMemorySelfUpdatingETARepository | undefined;
shuttleRepo: UnoptimizedInMemoryShuttleRepository | undefined;
name = "InMemorySelfUpdatingETARepository"
factory = async () => {
this.shuttleRepo = new UnoptimizedInMemoryShuttleRepository();
this.repo = new InMemorySelfUpdatingETARepository(this.shuttleRepo);
return this.repo;
}
teardown = async () => {
// No teardown needed for in-memory
}
}
const repositoryImplementations = [
new RedisSelfUpdatingETARepositoryHolder(),
new InMemorySelfUpdatingETARepositoryHolder()
];
describe.each(repositoryImplementations)('$name', (holder) => {
let repository: SelfUpdatingETARepository;
let shuttleRepository: ShuttleGetterSetterRepository;
beforeEach(async () => {
repository = await holder.factory();
shuttleRepository = holder.shuttleRepo!;
});
afterEach(async () => {
await holder.teardown();
});
// Helper function for setting up routes and ordered stops
async function setupRouteAndOrderedStops() {
return await setupRouteAndOrderedStopsForShuttleRepository(shuttleRepository);
}
describe("handleShuttleWillArriveAtStop", () => {
test("updates how long the shuttle took to get from one stop to another", async () => {
const { route, systemId, stop2, stop1 } = await setupRouteAndOrderedStops();
repository.startListeningForUpdates();
const shuttle = {
id: "sh1",
name: "Shuttle 1",
routeId: route.id,
systemId: systemId,
coordinates: stop1.coordinates,
orientationInDegrees: 0,
updatedTime: new Date(),
};
const firstStopArrivalTime = new Date(2025, 0, 1, 12, 0, 0);
await shuttleRepository.addOrUpdateShuttle(shuttle, firstStopArrivalTime.getTime());
shuttle.coordinates = stop2.coordinates;
const secondStopArrivalTime = new Date(2025, 0, 1, 12, 15, 0);
await shuttleRepository.addOrUpdateShuttle(shuttle, secondStopArrivalTime.getTime());
// Necessary to wait for the event emitter subscriber to execute
await new Promise((resolve) => setTimeout(resolve, 1000));
const travelTime = await repository.getAverageTravelTimeSeconds({
routeId: route.id,
fromStopId: stop1.id,
toStopId: stop2.id,
}, {
from: new Date(2025, 0, 1, 11, 0, 0),
to: new Date(2025, 0, 1, 13, 0, 0),
});
expect(travelTime).toEqual(15 * 60);
});
});
describe("handleShuttleUpdate", () => {
async function assertEtaIsValidGivenCurrentTimeAndSecondArrivalTime(
currentTime: Date,
shuttleSecondArrivalTimeAtFirstStop: Date
) {
const { route, systemId, stop1, stop2, stop3 } = await setupRouteAndOrderedStops();
// Populating travel time data
const firstStopArrivalTime = new Date(2025, 0, 1, 12, 0, 0);
const secondStopArrivalTime = new Date(2025, 0, 1, 12, 15, 0);
const thirdStopArrivalTime = new Date(2025, 0, 1, 12, 20, 0);
repository.setReferenceTime(currentTime);
repository.startListeningForUpdates();
const shuttle = {
id: "sh1",
name: "Shuttle 1",
routeId: route.id,
systemId: systemId,
coordinates: stop1.coordinates,
orientationInDegrees: 0,
updatedTime: new Date(),
};
await shuttleRepository.addOrUpdateShuttle(shuttle, firstStopArrivalTime.getTime());
shuttle.coordinates = stop2.coordinates;
await shuttleRepository.addOrUpdateShuttle(shuttle, secondStopArrivalTime.getTime());
shuttle.coordinates = stop3.coordinates;
await shuttleRepository.addOrUpdateShuttle(shuttle, thirdStopArrivalTime.getTime());
// Populating ETA data
shuttle.coordinates = stop1.coordinates;
await shuttleRepository.addOrUpdateShuttle(
shuttle,
shuttleSecondArrivalTimeAtFirstStop.getTime()
);
shuttle.coordinates = { latitude: 12.5, longitude: 22.5 };
await shuttleRepository.addOrUpdateShuttle(
shuttle,
currentTime.getTime()
);
await new Promise((resolve) => setTimeout(resolve, 1000));
const secondStopEta = await repository.getEtaForShuttleAndStopId(shuttle.id, stop2.id);
expect(secondStopEta?.secondsRemaining).toEqual(8 * 60);
const thirdStopEta = await repository.getEtaForShuttleAndStopId(shuttle.id, stop3.id);
expect(thirdStopEta?.secondsRemaining).toEqual(13 * 60);
}
test("adds ETA entries for stops based on historical data", async () => {
const shuttleSecondArrivalTimeAtFirstStop = new Date(2025, 0, 8, 12, 0, 0);
const currentTime = new Date(shuttleSecondArrivalTimeAtFirstStop.getTime() + 7 * 60 * 1000);
await assertEtaIsValidGivenCurrentTimeAndSecondArrivalTime(
currentTime, shuttleSecondArrivalTimeAtFirstStop
);
}, 60000);
test("uses previous day fallback calculation when no data available from one week ago", async () => {
const shuttleSecondArrivalTimeAtFirstStop = new Date(2025, 0, 2, 12, 0, 0);
const currentTime = new Date(shuttleSecondArrivalTimeAtFirstStop.getTime() + 7 * 60 * 1000);
await assertEtaIsValidGivenCurrentTimeAndSecondArrivalTime(
currentTime, shuttleSecondArrivalTimeAtFirstStop
);
}, 60000);
test("uses previous hour fallback calculation when no data available from one day ago", async () => {
const shuttleSecondArrivalTimeAtFirstStop = new Date(2025, 0, 1, 13, 5, 0);
const currentTime = new Date(shuttleSecondArrivalTimeAtFirstStop.getTime() + 7 * 60 * 1000);
await assertEtaIsValidGivenCurrentTimeAndSecondArrivalTime(
currentTime, shuttleSecondArrivalTimeAtFirstStop
);
});
});
describe("getAverageTravelTimeSeconds", () => {
test("returns the average travel time when historical data exists", async () => {
const { route, systemId, stop1, stop2 } = await setupRouteAndOrderedStops();
repository.startListeningForUpdates();
const shuttle = {
id: "sh1",
name: "Shuttle 1",
routeId: route.id,
systemId: systemId,
coordinates: stop1.coordinates,
orientationInDegrees: 0,
updatedTime: new Date(),
};
const firstStopTime = new Date(2025, 0, 1, 12, 0, 0);
await shuttleRepository.addOrUpdateShuttle(shuttle, firstStopTime.getTime());
shuttle.coordinates = stop2.coordinates;
const secondStopTime = new Date(2025, 0, 1, 12, 15, 0);
await shuttleRepository.addOrUpdateShuttle(shuttle, secondStopTime.getTime());
await new Promise((resolve) => setTimeout(resolve, 1000));
const travelTime = await repository.getAverageTravelTimeSeconds({
routeId: route.id,
fromStopId: stop1.id,
toStopId: stop2.id,
}, {
from: new Date(2025, 0, 1, 11, 0, 0),
to: new Date(2025, 0, 1, 13, 0, 0),
});
expect(travelTime).toEqual(15 * 60);
});
test("returns average of multiple data points", async () => {
const { route, systemId, stop1, stop2 } = await setupRouteAndOrderedStops();
repository.startListeningForUpdates();
const shuttle = {
id: "sh1",
name: "Shuttle 1",
routeId: route.id,
systemId: systemId,
coordinates: stop1.coordinates,
orientationInDegrees: 0,
updatedTime: new Date(),
};
// First trip: 10 minutes travel time
await shuttleRepository.addOrUpdateShuttle(shuttle, new Date(2025, 0, 1, 12, 0, 0).getTime());
shuttle.coordinates = stop2.coordinates;
await shuttleRepository.addOrUpdateShuttle(shuttle, new Date(2025, 0, 1, 12, 10, 0).getTime());
// Second trip: 20 minutes travel time
shuttle.coordinates = stop1.coordinates;
await shuttleRepository.addOrUpdateShuttle(shuttle, new Date(2025, 0, 1, 12, 30, 0).getTime());
shuttle.coordinates = stop2.coordinates;
await shuttleRepository.addOrUpdateShuttle(shuttle, new Date(2025, 0, 1, 12, 50, 0).getTime());
const averageTravelTime = await repository.getAverageTravelTimeSeconds({
routeId: route.id,
fromStopId: stop1.id,
toStopId: stop2.id,
}, {
from: new Date(2025, 0, 1, 11, 0, 0),
to: new Date(2025, 0, 1, 14, 0, 0),
});
// Average of 10 minutes and 20 minutes = 15 minutes = 900 seconds
expect(averageTravelTime).toBeDefined();
});
test("returns undefined when no data exists", async () => {
const { route, stop1, stop2 } = await setupRouteAndOrderedStops();
repository.startListeningForUpdates();
const averageTravelTime = await repository.getAverageTravelTimeSeconds({
routeId: route.id,
fromStopId: stop1.id,
toStopId: stop2.id,
}, {
from: new Date(2025, 0, 1, 11, 0, 0),
to: new Date(2025, 0, 1, 14, 0, 0),
});
expect(averageTravelTime).toBeUndefined();
});
test("returns undefined when querying outside the time range of data", async () => {
const { route, systemId, stop1, stop2 } = await setupRouteAndOrderedStops();
repository.startListeningForUpdates();
const shuttle = {
id: "sh1",
name: "Shuttle 1",
routeId: route.id,
systemId: systemId,
coordinates: stop1.coordinates,
orientationInDegrees: 0,
updatedTime: new Date(),
};
await shuttleRepository.addOrUpdateShuttle(shuttle, new Date(2025, 0, 1, 12, 0, 0).getTime());
shuttle.coordinates = stop2.coordinates;
await shuttleRepository.addOrUpdateShuttle(shuttle, new Date(2025, 0, 1, 12, 15, 0).getTime());
const averageTravelTime = await repository.getAverageTravelTimeSeconds({
routeId: route.id,
fromStopId: stop1.id,
toStopId: stop2.id,
}, {
from: new Date(2025, 0, 2, 11, 0, 0),
to: new Date(2025, 0, 2, 13, 0, 0),
});
expect(averageTravelTime).toBeUndefined();
});
});
})

View File

@@ -1,10 +1,10 @@
import { Resolvers } from "../generated/graphql";
import { ServerContext } from "../ServerContext";
import { HistoricalParkingAverageQueryArguments } from "../repositories/parking/ParkingGetterRepository";
import { HistoricalParkingAverageFilterArguments } from "../repositories/parking/ParkingGetterRepository";
import { GraphQLError } from "graphql/error";
import {
PARKING_HISTORICAL_AVERAGE_MAXIMUM_TIMESPAN,
PARKING_HISTORICAL_AVERAGE_MINIMUM_INTERVAL
PARKING_HISTORICAL_AVERAGE_MAXIMUM_TIMESPAN,
PARKING_HISTORICAL_AVERAGE_MINIMUM_INTERVAL
} from "../environment";
export const ParkingStructureResolvers: Resolvers<ServerContext> = {
@@ -27,7 +27,7 @@ export const ParkingStructureResolvers: Resolvers<ServerContext> = {
throwBadUserInputError('No interval provided');
return null;
}
const queryArguments: HistoricalParkingAverageQueryArguments = {
const queryArguments: HistoricalParkingAverageFilterArguments = {
from: new Date(args.input.from),
intervalMs: args.input.intervalMs,
to: new Date(args.input.to),

View File

@@ -9,7 +9,7 @@ export const ShuttleResolvers: Resolvers<ServerContext> = {
const system = contextValue.findSystemById(parent.systemId);
if (!system) return null;
const etaForStopId = await system.shuttleRepository.getEtaForShuttleAndStopId(parent.id, args.forStopId);
const etaForStopId = await system.etaRepository.getEtaForShuttleAndStopId(parent.id, args.forStopId);
if (etaForStopId === null) return null;
return {
@@ -25,7 +25,7 @@ export const ShuttleResolvers: Resolvers<ServerContext> = {
const system = contextValue.findSystemById(parent.systemId);
if (!system) return null;
const etasForShuttle = await system.shuttleRepository.getEtasForShuttleId(parent.id);
const etasForShuttle = await system.etaRepository.getEtasForShuttleId(parent.id);
if (!etasForShuttle) return null;
const computedEtas = await Promise.all(

View File

@@ -16,7 +16,7 @@ export const StopResolvers: Resolvers<ServerContext> = {
if (!system) {
return [];
}
const etas = await system.shuttleRepository.getEtasForStopId(parent.id);
const etas = await system.etaRepository.getEtasForStopId(parent.id);
return etas.slice().sort((a, b) => a.secondsRemaining - b.secondsRemaining);
},
},

View File

@@ -6,7 +6,8 @@ import {
addMockShuttleToRepository,
addMockStopToRepository,
} from "../../../testHelpers/repositorySetupHelpers";
import assert = require("node:assert");
import { ExternalSourceETARepository } from "../../repositories/shuttle/eta/ExternalSourceETARepository";
import assert from "node:assert";
describe("EtaResolvers", () => {
const holder = setupTestServerHolder();
@@ -19,7 +20,7 @@ describe("EtaResolvers", () => {
beforeEach(async () => {
mockShuttle = await addMockShuttleToRepository(context.systems[0].shuttleRepository, context.systems[0].id);
mockStop = await addMockStopToRepository(context.systems[0].shuttleRepository, context.systems[0].id);
expectedEta = await addMockEtaToRepository(context.systems[0].shuttleRepository, mockStop.id, mockShuttle.id);
expectedEta = await addMockEtaToRepository(context.systems[0].etaRepository as ExternalSourceETARepository, mockStop.id, mockShuttle.id);
});
async function getResponseForEtaQuery(query: string) {

View File

@@ -3,8 +3,9 @@ import { generateMockEtas, generateMockRoutes } from "../../../testHelpers/mockD
import { IShuttle } from "../../entities/ShuttleRepositoryEntities";
import { setupTestServerContext, setupTestServerHolder } from "../../../testHelpers/apolloTestServerHelpers";
import { addMockShuttleToRepository } from "../../../testHelpers/repositorySetupHelpers";
import assert = require("node:assert");
import { InterchangeSystem } from "../../entities/InterchangeSystem";
import { ExternalSourceETARepository } from "../../repositories/shuttle/eta/ExternalSourceETARepository";
import assert from "node:assert";
describe("ShuttleResolvers", () => {
@@ -25,7 +26,7 @@ describe("ShuttleResolvers", () => {
const etas = generateMockEtas();
await Promise.all(etas.map(async (eta) => {
eta.shuttleId = shuttleId;
await context.systems[0].shuttleRepository.addOrUpdateEta(eta);
await (context.systems[0].etaRepository as ExternalSourceETARepository).addOrUpdateEtaFromExternalSource(eta);
}));
return etas;
}
@@ -146,9 +147,9 @@ describe("ShuttleResolvers", () => {
const e1 = { ...generateMockEtas()[0], shuttleId: mockShuttle.id, stopId: "stA", secondsRemaining: 300 };
const e2 = { ...generateMockEtas()[0], shuttleId: mockShuttle.id, stopId: "stB", secondsRemaining: 30 };
const e3 = { ...generateMockEtas()[0], shuttleId: mockShuttle.id, stopId: "stC", secondsRemaining: 120 };
await context.systems[0].shuttleRepository.addOrUpdateEta(e1);
await context.systems[0].shuttleRepository.addOrUpdateEta(e2);
await context.systems[0].shuttleRepository.addOrUpdateEta(e3);
await (context.systems[0].etaRepository as ExternalSourceETARepository).addOrUpdateEtaFromExternalSource(e1);
await (context.systems[0].etaRepository as ExternalSourceETARepository).addOrUpdateEtaFromExternalSource(e2);
await (context.systems[0].etaRepository as ExternalSourceETARepository).addOrUpdateEtaFromExternalSource(e3);
const response = await holder.testServer.executeOperation({
query,

View File

@@ -6,7 +6,8 @@ import {
import { generateMockEtas, generateMockOrderedStops } from "../../../testHelpers/mockDataGenerators";
import { IStop } from "../../entities/ShuttleRepositoryEntities";
import { addMockStopToRepository } from "../../../testHelpers/repositorySetupHelpers";
import assert = require("node:assert");
import { ExternalSourceETARepository } from "../../repositories/shuttle/eta/ExternalSourceETARepository";
import assert from "node:assert";
describe("StopResolvers", () => {
const holder = setupTestServerHolder();
@@ -106,7 +107,7 @@ describe("StopResolvers", () => {
mockEtas = mockEtas.filter((eta) => eta.stopId === mockEtas[0].stopId);
await Promise.all(mockEtas.map(async eta => {
eta.stopId = mockStop.id;
await context.systems[0].shuttleRepository.addOrUpdateEta(eta);
await (context.systems[0].etaRepository as ExternalSourceETARepository).addOrUpdateEtaFromExternalSource(eta);
}));
const response = await getResponseForQuery(query);
@@ -128,9 +129,9 @@ describe("StopResolvers", () => {
const e1 = { ...generateMockEtas()[0], stopId: mockStop.id, shuttleId: "shA", secondsRemaining: 240 };
const e2 = { ...generateMockEtas()[0], stopId: mockStop.id, shuttleId: "shB", secondsRemaining: 60 };
const e3 = { ...generateMockEtas()[0], stopId: mockStop.id, shuttleId: "shC", secondsRemaining: 120 };
await context.systems[0].shuttleRepository.addOrUpdateEta(e1);
await context.systems[0].shuttleRepository.addOrUpdateEta(e2);
await context.systems[0].shuttleRepository.addOrUpdateEta(e3);
await (context.systems[0].etaRepository as ExternalSourceETARepository).addOrUpdateEtaFromExternalSource(e1);
await (context.systems[0].etaRepository as ExternalSourceETARepository).addOrUpdateEtaFromExternalSource(e2);
await (context.systems[0].etaRepository as ExternalSourceETARepository).addOrUpdateEtaFromExternalSource(e3);
const response = await getResponseForQuery(query);

View File

@@ -0,0 +1,5 @@
export interface RepositoryHolder<T> {
name: string;
factory(): Promise<T>;
teardown(): Promise<void>;
}

View File

@@ -3,7 +3,7 @@ import { ApolloServer } from "@apollo/server";
import { MergedResolvers } from "../src/MergedResolvers";
import { beforeEach } from "@jest/globals";
import { ServerContext } from "../src/ServerContext";
import { InterchangeSystem } from "../src/entities/InterchangeSystem";
import { InterchangeSystem, InterchangeSystemBuilderArguments } from "../src/entities/InterchangeSystem";
import {
ChapmanApiBasedParkingRepositoryLoader
} from "../src/loaders/parking/ChapmanApiBasedParkingRepositoryLoader";
@@ -19,11 +19,12 @@ function setUpTestServer() {
});
}
const systemInfoForTesting = {
const systemInfoForTesting: InterchangeSystemBuilderArguments = {
id: "1",
name: "Chapman University",
passioSystemId: "263",
parkingSystemId: ChapmanApiBasedParkingRepositoryLoader.id,
useSelfUpdatingEtas: false,
};
export function buildSystemForTesting() {

View File

@@ -5,6 +5,7 @@ import {
generateMockStops,
} from "./mockDataGenerators";
import { ShuttleGetterSetterRepository } from "../src/repositories/shuttle/ShuttleGetterSetterRepository";
import { ExternalSourceETARepository } from "../src/repositories/shuttle/eta/ExternalSourceETARepository";
export async function addMockRouteToRepository(repository: ShuttleGetterSetterRepository, systemId: string) {
const mockRoutes = generateMockRoutes();
@@ -32,12 +33,12 @@ export async function addMockShuttleToRepository(repository: ShuttleGetterSetter
return mockShuttle;
}
export async function addMockEtaToRepository(repository: ShuttleGetterSetterRepository, stopId: string, shuttleId: string) {
export async function addMockEtaToRepository(repository: ExternalSourceETARepository, stopId: string, shuttleId: string) {
const etas = generateMockEtas();
const expectedEta = etas[0];
expectedEta.stopId = stopId;
expectedEta.shuttleId = shuttleId;
await repository.addOrUpdateEta(expectedEta);
await repository.addOrUpdateEtaFromExternalSource(expectedEta);
return expectedEta;
}

View File

@@ -0,0 +1,81 @@
import { IOrderedStop, IStop } from "../src/entities/ShuttleRepositoryEntities";
import { ShuttleGetterSetterRepository } from "../src/repositories/shuttle/ShuttleGetterSetterRepository";
export async function setupRouteAndOrderedStopsForShuttleRepository(
shuttleRepository: ShuttleGetterSetterRepository
) {
const systemId = "sys1";
const route = {
id: "r1",
name: "Route 1",
color: "red",
systemId: systemId,
polylineCoordinates: [],
updatedTime: new Date(),
};
await shuttleRepository.addOrUpdateRoute(route);
const stop1: IStop = {
id: "st1",
name: "Stop 1",
systemId,
coordinates: { latitude: 10.0, longitude: 20.0 },
updatedTime: new Date(),
};
const stop2: IStop = {
id: "st2",
name: "Stop 2",
systemId,
coordinates: { latitude: 15.0, longitude: 25.0 },
updatedTime: new Date(),
};
const stop3: IStop = {
id: "st3",
name: "Stop 3",
systemId,
coordinates: { latitude: 20.0, longitude: 30.0 },
updatedTime: new Date(),
}
await shuttleRepository.addOrUpdateStop(stop1);
await shuttleRepository.addOrUpdateStop(stop2);
await shuttleRepository.addOrUpdateStop(stop3);
const orderedStop1: IOrderedStop = {
routeId: route.id,
stopId: stop1.id,
position: 1,
systemId,
updatedTime: new Date(),
};
const orderedStop2: IOrderedStop = {
routeId: route.id,
stopId: stop2.id,
position: 2,
systemId,
updatedTime: new Date(),
};
const orderedStop3: IOrderedStop = {
routeId: route.id,
stopId: stop3.id,
position: 3,
systemId,
updatedTime: new Date(),
}
orderedStop1.nextStop = orderedStop2;
orderedStop1.previousStop = orderedStop3;
orderedStop2.nextStop = orderedStop3;
orderedStop2.previousStop = orderedStop1;
orderedStop3.nextStop = orderedStop1;
orderedStop3.previousStop = orderedStop2;
await shuttleRepository.addOrUpdateOrderedStop(orderedStop1);
await shuttleRepository.addOrUpdateOrderedStop(orderedStop2);
await shuttleRepository.addOrUpdateOrderedStop(orderedStop3);
return {
route,
systemId,
stop1,
stop2,
stop3,
};
}