mirror of
https://github.com/brendan-ch/project-inter-server.git
synced 2026-04-17 07:50:31 +00:00
Implement in-memory equivalents of both ETA repositories
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
import { IEta } from "../../../entities/ShuttleRepositoryEntities";
|
||||
import { ETAGetterRepository, ETARepositoryEventListener, ETARepositoryEventName } from "./ETAGetterRepository";
|
||||
import { ETAGetterRepository, ETARepositoryEvent, ETARepositoryEventListener, ETARepositoryEventName } from "./ETAGetterRepository";
|
||||
import EventEmitter from "node:events";
|
||||
|
||||
export abstract class BaseInMemoryETARepository extends EventEmitter implements ETAGetterRepository {
|
||||
@@ -18,6 +18,17 @@ export abstract class BaseInMemoryETARepository extends EventEmitter implements
|
||||
return eta ?? null;
|
||||
}
|
||||
|
||||
// Protected setter for internal use
|
||||
protected async addOrUpdateEta(eta: IEta): Promise<void> {
|
||||
const index = this.etas.findIndex((e) => e.stopId === eta.stopId && e.shuttleId === eta.shuttleId);
|
||||
if (index !== -1) {
|
||||
this.etas[index] = eta;
|
||||
} else {
|
||||
this.etas.push(eta);
|
||||
}
|
||||
this.emit(ETARepositoryEvent.ETA_UPDATED, eta);
|
||||
}
|
||||
|
||||
// EventEmitter overrides for type safety
|
||||
override on<T extends ETARepositoryEventName>(
|
||||
event: T,
|
||||
|
||||
@@ -1,13 +1,22 @@
|
||||
import { IEta } from "../../../entities/ShuttleRepositoryEntities";
|
||||
import { ExternalSourceETARepository } from "./ExternalSourceETARepository";
|
||||
import { BaseInMemoryETARepository } from "./BaseInMemoryETARepository";
|
||||
import { ETARepositoryEvent } from "./ETAGetterRepository";
|
||||
|
||||
export class InMemoryExternalSourceETARepository extends BaseInMemoryETARepository implements ExternalSourceETARepository {
|
||||
addOrUpdateEtaFromExternalSource(eta: IEta): Promise<void> {
|
||||
throw new Error("Method not implemented.");
|
||||
async addOrUpdateEtaFromExternalSource(eta: IEta): Promise<void> {
|
||||
await this.addOrUpdateEta(eta);
|
||||
}
|
||||
|
||||
removeEtaIfExists(shuttleId: string, stopId: string): Promise<IEta | null> {
|
||||
throw new Error("Method not implemented.");
|
||||
async removeEtaIfExists(shuttleId: string, stopId: string): Promise<IEta | null> {
|
||||
const index = this.etas.findIndex((e) => e.stopId === stopId && e.shuttleId === shuttleId);
|
||||
if (index === -1) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const removedEta = this.etas[index];
|
||||
this.etas.splice(index, 1);
|
||||
this.emit(ETARepositoryEvent.ETA_REMOVED, removedEta);
|
||||
return removedEta;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,22 +1,118 @@
|
||||
import { SelfUpdatingETARepository } from "./SelfUpdatingETARepository";
|
||||
import { ShuttleGetterRepository, ShuttleTravelTimeDataIdentifier, ShuttleTravelTimeDateFilterArguments } from "../ShuttleGetterRepository";
|
||||
import { ShuttleGetterRepository, ShuttleRepositoryEvent, ShuttleStopArrival, ShuttleTravelTimeDataIdentifier, ShuttleTravelTimeDateFilterArguments } from "../ShuttleGetterRepository";
|
||||
import { BaseInMemoryETARepository } from "./BaseInMemoryETARepository";
|
||||
import { IShuttle } from "../../../entities/ShuttleRepositoryEntities";
|
||||
|
||||
export class InMemorySelfUpdatingETARepository extends BaseInMemoryETARepository implements SelfUpdatingETARepository {
|
||||
private referenceTime: Date | null = null;
|
||||
private travelTimeData: Map<string, Array<{ timestamp: number; seconds: number }>> = new Map();
|
||||
|
||||
constructor(
|
||||
readonly shuttleRepository: ShuttleGetterRepository
|
||||
) {
|
||||
super();
|
||||
}
|
||||
setReferenceTime(referenceTime: Date): void {
|
||||
throw new Error("Method not implemented.");
|
||||
|
||||
this.setReferenceTime = this.setReferenceTime.bind(this);
|
||||
this.getAverageTravelTimeSeconds = this.getAverageTravelTimeSeconds.bind(this);
|
||||
this.startListeningForUpdates = this.startListeningForUpdates.bind(this);
|
||||
this.handleShuttleUpdate = this.handleShuttleUpdate.bind(this);
|
||||
this.handleShuttleWillArriveAtStop = this.handleShuttleWillArriveAtStop.bind(this);
|
||||
}
|
||||
|
||||
getAverageTravelTimeSeconds(identifier: ShuttleTravelTimeDataIdentifier, dateFilter: ShuttleTravelTimeDateFilterArguments): Promise<number | undefined> {
|
||||
throw new Error("Method not implemented.");
|
||||
setReferenceTime(referenceTime: Date): void {
|
||||
this.referenceTime = referenceTime;
|
||||
}
|
||||
|
||||
async getAverageTravelTimeSeconds(
|
||||
{ routeId, fromStopId, toStopId }: ShuttleTravelTimeDataIdentifier,
|
||||
{ from, to }: ShuttleTravelTimeDateFilterArguments
|
||||
): Promise<number | undefined> {
|
||||
const key = `${routeId}:${fromStopId}:${toStopId}`;
|
||||
const dataPoints = this.travelTimeData.get(key);
|
||||
|
||||
if (!dataPoints || dataPoints.length === 0) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
const fromTimestamp = from.getTime();
|
||||
const toTimestamp = to.getTime();
|
||||
|
||||
const filteredPoints = dataPoints.filter(
|
||||
(point) => point.timestamp >= fromTimestamp && point.timestamp <= toTimestamp
|
||||
);
|
||||
|
||||
if (filteredPoints.length === 0) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
const sum = filteredPoints.reduce((acc, point) => acc + point.seconds, 0);
|
||||
return sum / filteredPoints.length;
|
||||
}
|
||||
|
||||
startListeningForUpdates(): void {
|
||||
throw new Error("Method not implemented.");
|
||||
this.shuttleRepository.addListener(ShuttleRepositoryEvent.SHUTTLE_UPDATED, this.handleShuttleUpdate);
|
||||
this.shuttleRepository.addListener(ShuttleRepositoryEvent.SHUTTLE_WILL_ARRIVE_AT_STOP, this.handleShuttleWillArriveAtStop);
|
||||
}
|
||||
|
||||
private async handleShuttleUpdate(shuttle: IShuttle): Promise<void> {
|
||||
const lastStop = await this.shuttleRepository.getShuttleLastStopArrival(shuttle.id);
|
||||
if (!lastStop) return;
|
||||
|
||||
const lastOrderedStop = await this.shuttleRepository.getOrderedStopByRouteAndStopId(shuttle.routeId, lastStop.stopId);
|
||||
const nextStop = lastOrderedStop?.nextStop;
|
||||
if (!nextStop) return;
|
||||
|
||||
let referenceCurrentTime = new Date();
|
||||
if (this.referenceTime != null) {
|
||||
referenceCurrentTime = this.referenceTime;
|
||||
}
|
||||
const oneWeekAgo = new Date(referenceCurrentTime.getTime() - (60 * 60 * 24 * 7 * 1000));
|
||||
|
||||
const travelTimeSeconds = await this.getAverageTravelTimeSeconds({
|
||||
routeId: shuttle.routeId,
|
||||
fromStopId: lastStop.stopId,
|
||||
toStopId: nextStop.stopId,
|
||||
}, {
|
||||
from: oneWeekAgo,
|
||||
to: new Date(oneWeekAgo.getTime() + (60 * 60 * 1000))
|
||||
});
|
||||
if (travelTimeSeconds == undefined) return;
|
||||
|
||||
const elapsedTimeMs = referenceCurrentTime.getTime() - lastStop.timestamp.getTime();
|
||||
const predictedTimeSeconds = travelTimeSeconds - (elapsedTimeMs / 1000);
|
||||
|
||||
await this.addOrUpdateEta({
|
||||
secondsRemaining: predictedTimeSeconds,
|
||||
shuttleId: shuttle.id,
|
||||
stopId: nextStop.stopId,
|
||||
systemId: nextStop.systemId,
|
||||
updatedTime: new Date(),
|
||||
});
|
||||
}
|
||||
|
||||
private async handleShuttleWillArriveAtStop(shuttleArrival: ShuttleStopArrival): Promise<void> {
|
||||
const lastStopTimestamp = await this.shuttleRepository.getShuttleLastStopArrival(shuttleArrival.shuttleId);
|
||||
if (lastStopTimestamp) {
|
||||
const shuttle = await this.shuttleRepository.getShuttleById(lastStopTimestamp.shuttleId);
|
||||
if (!shuttle) return;
|
||||
|
||||
const routeId = shuttle.routeId;
|
||||
const fromStopId = lastStopTimestamp.stopId;
|
||||
const toStopId = shuttleArrival.stopId;
|
||||
|
||||
const travelTimeSeconds = (shuttleArrival.timestamp.getTime() - lastStopTimestamp.timestamp.getTime()) / 1000;
|
||||
await this.addTravelTimeDataPoint({ routeId, fromStopId, toStopId }, travelTimeSeconds, shuttleArrival.timestamp.getTime());
|
||||
}
|
||||
}
|
||||
|
||||
private async addTravelTimeDataPoint(
|
||||
{ routeId, fromStopId, toStopId }: ShuttleTravelTimeDataIdentifier,
|
||||
travelTimeSeconds: number,
|
||||
timestamp = Date.now(),
|
||||
): Promise<void> {
|
||||
const key = `${routeId}:${fromStopId}:${toStopId}`;
|
||||
const dataPoints = this.travelTimeData.get(key) || [];
|
||||
dataPoints.push({ timestamp, seconds: travelTimeSeconds });
|
||||
this.travelTimeData.set(key, dataPoints);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ import { createClient, RedisClientType } from "redis";
|
||||
import { RepositoryHolder } from "../../../../../testHelpers/RepositoryHolder";
|
||||
import { ExternalSourceETARepository } from "../ExternalSourceETARepository";
|
||||
import { RedisExternalSourceETARepository } from "../RedisExternalSourceETARepository";
|
||||
import { InMemoryExternalSourceETARepository } from "../InMemoryExternalSourceETARepository";
|
||||
import { generateMockEtas } from "../../../../../testHelpers/mockDataGenerators";
|
||||
|
||||
class RedisExternalSourceETARepositoryHolder implements RepositoryHolder<ExternalSourceETARepository> {
|
||||
@@ -26,8 +27,22 @@ class RedisExternalSourceETARepositoryHolder implements RepositoryHolder<Externa
|
||||
}
|
||||
}
|
||||
|
||||
class InMemoryExternalSourceETARepositoryHolder implements RepositoryHolder<ExternalSourceETARepository> {
|
||||
repo: InMemoryExternalSourceETARepository | undefined;
|
||||
|
||||
name = "InMemoryExternalSourceETARepository"
|
||||
factory = async () => {
|
||||
this.repo = new InMemoryExternalSourceETARepository();
|
||||
return this.repo;
|
||||
}
|
||||
teardown = async () => {
|
||||
// No teardown needed for in-memory
|
||||
}
|
||||
}
|
||||
|
||||
const repositoryImplementations = [
|
||||
new RedisExternalSourceETARepositoryHolder()
|
||||
new RedisExternalSourceETARepositoryHolder(),
|
||||
new InMemoryExternalSourceETARepositoryHolder()
|
||||
];
|
||||
|
||||
describe.each(repositoryImplementations)('$name', (holder) => {
|
||||
|
||||
@@ -3,8 +3,11 @@ import { createClient, RedisClientType } from "redis";
|
||||
import { RepositoryHolder } from "../../../../../testHelpers/RepositoryHolder";
|
||||
import { SelfUpdatingETARepository } from "../SelfUpdatingETARepository";
|
||||
import { RedisSelfUpdatingETARepository } from "../RedisSelfUpdatingETARepository";
|
||||
import { InMemorySelfUpdatingETARepository } from "../InMemorySelfUpdatingETARepository";
|
||||
import { RedisShuttleRepository } from "../../RedisShuttleRepository";
|
||||
import { UnoptimizedInMemoryShuttleRepository } from "../../UnoptimizedInMemoryShuttleRepository";
|
||||
import { setupRouteAndOrderedStopsForShuttleRepository } from "../../../../../testHelpers/setupRouteAndOrderedStopsForShuttleRepository";
|
||||
import { ShuttleGetterSetterRepository } from "../../ShuttleGetterSetterRepository";
|
||||
|
||||
class RedisSelfUpdatingETARepositoryHolder implements RepositoryHolder<SelfUpdatingETARepository> {
|
||||
repo: RedisSelfUpdatingETARepository | undefined;
|
||||
@@ -32,13 +35,29 @@ class RedisSelfUpdatingETARepositoryHolder implements RepositoryHolder<SelfUpdat
|
||||
}
|
||||
}
|
||||
|
||||
class InMemorySelfUpdatingETARepositoryHolder implements RepositoryHolder<SelfUpdatingETARepository> {
|
||||
repo: InMemorySelfUpdatingETARepository | undefined;
|
||||
shuttleRepo: UnoptimizedInMemoryShuttleRepository | undefined;
|
||||
|
||||
name = "InMemorySelfUpdatingETARepository"
|
||||
factory = async () => {
|
||||
this.shuttleRepo = new UnoptimizedInMemoryShuttleRepository();
|
||||
this.repo = new InMemorySelfUpdatingETARepository(this.shuttleRepo);
|
||||
return this.repo;
|
||||
}
|
||||
teardown = async () => {
|
||||
// No teardown needed for in-memory
|
||||
}
|
||||
}
|
||||
|
||||
const repositoryImplementations = [
|
||||
new RedisSelfUpdatingETARepositoryHolder()
|
||||
new RedisSelfUpdatingETARepositoryHolder(),
|
||||
new InMemorySelfUpdatingETARepositoryHolder()
|
||||
];
|
||||
|
||||
describe.each(repositoryImplementations)('$name', (holder) => {
|
||||
let repository: SelfUpdatingETARepository;
|
||||
let shuttleRepository: RedisShuttleRepository;
|
||||
let shuttleRepository: ShuttleGetterSetterRepository;
|
||||
|
||||
beforeEach(async () => {
|
||||
repository = await holder.factory();
|
||||
|
||||
Reference in New Issue
Block a user