mirror of
https://github.com/brendan-ch/project-inter-server.git
synced 2026-04-16 23:40:32 +00:00
Update shuttle repository implementations to remove ETA functionality
This commit is contained in:
@@ -1,13 +1,12 @@
|
||||
import { ShuttleGetterSetterRepository } from "./ShuttleGetterSetterRepository";
|
||||
import { IEta, IOrderedStop, IRoute, IShuttle, IStop, shuttleHasArrivedAtStop } from "../../entities/ShuttleRepositoryEntities";
|
||||
import {
|
||||
ShuttleRepositoryEvent,
|
||||
ShuttleRepositoryEventListener,
|
||||
ShuttleRepositoryEventName,
|
||||
ShuttleRepositoryEventPayloads,
|
||||
ShuttleStopArrival,
|
||||
ShuttleTravelTimeDataIdentifier,
|
||||
ShuttleTravelTimeDateFilterArguments,
|
||||
ShuttleTravelTimeDateFilterArguments
|
||||
} from "./ShuttleGetterRepository";
|
||||
import { BaseRedisRepository } from "../BaseRedisRepository";
|
||||
|
||||
@@ -147,14 +146,6 @@ export class RedisShuttleRepository extends BaseRedisRepository implements Shutt
|
||||
updatedTime: new Date(data.updatedTime),
|
||||
});
|
||||
|
||||
private createRedisHashFromEta = (eta: IEta): Record<string, string> => ({
|
||||
secondsRemaining: eta.secondsRemaining.toString(),
|
||||
shuttleId: eta.shuttleId,
|
||||
stopId: eta.stopId,
|
||||
systemId: eta.systemId,
|
||||
updatedTime: eta.updatedTime.toISOString(),
|
||||
});
|
||||
|
||||
private createEtaFromRedisData = (data: Record<string, string>): IEta => ({
|
||||
secondsRemaining: parseFloat(data.secondsRemaining),
|
||||
shuttleId: data.shuttleId,
|
||||
@@ -388,70 +379,25 @@ export class RedisShuttleRepository extends BaseRedisRepository implements Shutt
|
||||
public async addOrUpdateShuttle(
|
||||
shuttle: IShuttle,
|
||||
travelTimeTimestamp = Date.now(),
|
||||
referenceCurrentTime = new Date(),
|
||||
): Promise<void> {
|
||||
const key = this.createShuttleKey(shuttle.id);
|
||||
await this.redisClient.hSet(key, this.createRedisHashFromShuttle(shuttle));
|
||||
|
||||
await this.updateLastStopArrivalAndTravelTimeDataPoints(shuttle, travelTimeTimestamp);
|
||||
await this.updateEtasBasedOnHistoricalData(shuttle, referenceCurrentTime);
|
||||
await this.updateLastStopArrival(shuttle, travelTimeTimestamp);
|
||||
}
|
||||
|
||||
private async updateEtasBasedOnHistoricalData(
|
||||
shuttle: IShuttle,
|
||||
referenceCurrentTime: Date = new Date(),
|
||||
) {
|
||||
const oneWeekAgo = new Date(referenceCurrentTime.getTime() - (60 * 60 * 24 * 7 * 1000))
|
||||
|
||||
const lastStopArrival = await this.getShuttleLastStopArrival(shuttle.id)
|
||||
if (lastStopArrival == undefined) return;
|
||||
|
||||
const lastOrderedStop = await this.getOrderedStopByRouteAndStopId(shuttle.routeId, lastStopArrival.stopId);
|
||||
const nextStop = lastOrderedStop?.nextStop;
|
||||
if (nextStop == null) return;
|
||||
|
||||
const travelTimeSeconds = await this.getAverageTravelTimeSeconds({
|
||||
routeId: shuttle.routeId,
|
||||
fromStopId: lastStopArrival.stopId,
|
||||
toStopId: nextStop.stopId,
|
||||
}, {
|
||||
from: oneWeekAgo,
|
||||
to: new Date(oneWeekAgo.getTime() + (60 * 60 * 1000))
|
||||
});
|
||||
if (travelTimeSeconds == undefined) return;
|
||||
|
||||
const elapsedTimeMs = referenceCurrentTime.getTime() - lastStopArrival.timestamp.getTime();
|
||||
const predictedTimeSeconds = travelTimeSeconds - (elapsedTimeMs / 1000);
|
||||
|
||||
await this.addOrUpdateEta({
|
||||
secondsRemaining: predictedTimeSeconds,
|
||||
shuttleId: shuttle.id,
|
||||
stopId: nextStop.stopId,
|
||||
systemId: nextStop.systemId,
|
||||
updatedTime: new Date(),
|
||||
});
|
||||
}
|
||||
|
||||
private async updateLastStopArrivalAndTravelTimeDataPoints(
|
||||
private async updateLastStopArrival(
|
||||
shuttle: IShuttle,
|
||||
travelTimeTimestamp = Date.now(),
|
||||
) {
|
||||
const arrivedStop = await this.getArrivedStopIfExists(shuttle);
|
||||
|
||||
if (arrivedStop != undefined) {
|
||||
const lastStopTimestamp = await this.getShuttleLastStopArrival(shuttle.id)
|
||||
if (lastStopTimestamp != undefined) {
|
||||
const routeId = shuttle.routeId;
|
||||
const fromStopId = lastStopTimestamp.stopId;
|
||||
const toStopId = arrivedStop.id;
|
||||
|
||||
const travelTimeSeconds = (travelTimeTimestamp - lastStopTimestamp.timestamp.getTime()) / 1000;
|
||||
await this.addTravelTimeDataPoint({ routeId, fromStopId, toStopId, }, travelTimeSeconds, travelTimeTimestamp);
|
||||
}
|
||||
|
||||
await this.updateShuttleLastStopArrival(shuttle.id, {
|
||||
// TODO: Implement the event
|
||||
await this.updateShuttleLastStopArrival({
|
||||
stopId: arrivedStop.id,
|
||||
timestamp: new Date(travelTimeTimestamp),
|
||||
shuttleId: shuttle.id,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -488,78 +434,6 @@ export class RedisShuttleRepository extends BaseRedisRepository implements Shutt
|
||||
}
|
||||
}
|
||||
|
||||
private async addTravelTimeDataPoint(
|
||||
{ routeId, fromStopId, toStopId }: ShuttleTravelTimeDataIdentifier,
|
||||
travelTimeSeconds: number,
|
||||
timestamp = Date.now(),
|
||||
): Promise<void> {
|
||||
const historicalEtaTimeSeriesKey = this.createHistoricalEtaTimeSeriesKey(routeId, fromStopId, toStopId);
|
||||
|
||||
try {
|
||||
await this.redisClient.sendCommand([
|
||||
'TS.ADD',
|
||||
historicalEtaTimeSeriesKey,
|
||||
timestamp.toString(),
|
||||
travelTimeSeconds.toString(),
|
||||
'LABELS',
|
||||
'routeId',
|
||||
routeId,
|
||||
'fromStopId',
|
||||
fromStopId,
|
||||
'toStopId',
|
||||
toStopId
|
||||
]);
|
||||
} catch (error) {
|
||||
await this.createHistoricalEtaTimeSeriesAndAddDataPoint(
|
||||
historicalEtaTimeSeriesKey,
|
||||
timestamp,
|
||||
travelTimeSeconds,
|
||||
routeId,
|
||||
fromStopId,
|
||||
toStopId
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private async createHistoricalEtaTimeSeriesAndAddDataPoint(
|
||||
timeSeriesKey: string,
|
||||
timestamp: number,
|
||||
travelTimeSeconds: number,
|
||||
routeId: string,
|
||||
fromStopId: string,
|
||||
toStopId: string,
|
||||
): Promise<void> {
|
||||
try {
|
||||
await this.redisClient.sendCommand([
|
||||
'TS.CREATE',
|
||||
timeSeriesKey,
|
||||
'RETENTION',
|
||||
'2678400000', // one month in milliseconds
|
||||
'LABELS',
|
||||
'routeId',
|
||||
routeId,
|
||||
'fromStopId',
|
||||
fromStopId,
|
||||
'toStopId',
|
||||
toStopId
|
||||
]);
|
||||
await this.redisClient.sendCommand([
|
||||
'TS.ADD',
|
||||
timeSeriesKey,
|
||||
timestamp.toString(),
|
||||
travelTimeSeconds.toString()
|
||||
]);
|
||||
} catch (createError) {
|
||||
await this.redisClient.sendCommand([
|
||||
'TS.ADD',
|
||||
timeSeriesKey,
|
||||
timestamp.toString(),
|
||||
travelTimeSeconds.toString()
|
||||
]);
|
||||
}
|
||||
}
|
||||
|
||||
public async getArrivedStopIfExists(
|
||||
shuttle: IShuttle,
|
||||
delta = 0.001,
|
||||
@@ -584,13 +458,14 @@ export class RedisShuttleRepository extends BaseRedisRepository implements Shutt
|
||||
}
|
||||
|
||||
return {
|
||||
shuttleId,
|
||||
stopId: data.stopId,
|
||||
timestamp: new Date(data.timestamp),
|
||||
};
|
||||
}
|
||||
|
||||
private async updateShuttleLastStopArrival(shuttleId: string, lastStopArrival: ShuttleStopArrival) {
|
||||
const key = this.createShuttleLastStopKey(shuttleId);
|
||||
private async updateShuttleLastStopArrival(lastStopArrival: ShuttleStopArrival) {
|
||||
const key = this.createShuttleLastStopKey(lastStopArrival.shuttleId);
|
||||
await this.redisClient.hSet(key, {
|
||||
stopId: lastStopArrival.stopId,
|
||||
timestamp: lastStopArrival.timestamp.toISOString(),
|
||||
@@ -607,16 +482,6 @@ export class RedisShuttleRepository extends BaseRedisRepository implements Shutt
|
||||
await this.redisClient.hSet(key, this.createRedisHashFromOrderedStop(orderedStop));
|
||||
}
|
||||
|
||||
private async addOrUpdateEta(eta: IEta): Promise<void> {
|
||||
const key = this.createEtaKey(eta.shuttleId, eta.stopId);
|
||||
await this.redisClient.hSet(key, this.createRedisHashFromEta(eta));
|
||||
this.emit(ShuttleRepositoryEvent.ETA_UPDATED, eta);
|
||||
}
|
||||
|
||||
public async addOrUpdateEtaFromExternalSource(eta: IEta): Promise<void> {
|
||||
await this.addOrUpdateEta(eta);
|
||||
}
|
||||
|
||||
// Remove methods
|
||||
public async removeRouteIfExists(routeId: string): Promise<IRoute | null> {
|
||||
const route = await this.getRouteById(routeId);
|
||||
@@ -658,16 +523,6 @@ export class RedisShuttleRepository extends BaseRedisRepository implements Shutt
|
||||
return null;
|
||||
}
|
||||
|
||||
public async removeEtaIfExists(shuttleId: string, stopId: string): Promise<IEta | null> {
|
||||
const eta = await this.getEtaForShuttleAndStopId(shuttleId, stopId);
|
||||
if (eta) {
|
||||
const key = this.createEtaKey(shuttleId, stopId);
|
||||
await this.redisClient.del(key);
|
||||
this.emit(ShuttleRepositoryEvent.ETA_REMOVED, eta);
|
||||
return eta;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
// Clear methods
|
||||
public async clearShuttleData(): Promise<void> {
|
||||
@@ -677,15 +532,6 @@ export class RedisShuttleRepository extends BaseRedisRepository implements Shutt
|
||||
}
|
||||
}
|
||||
|
||||
public async clearEtaData(): Promise<void> {
|
||||
const removedEtas = await this.getAllEtas();
|
||||
const keys = await this.redisClient.keys('shuttle:eta:*');
|
||||
if (keys.length > 0) {
|
||||
await this.redisClient.del(keys);
|
||||
}
|
||||
this.emit(ShuttleRepositoryEvent.ETA_DATA_CLEARED, removedEtas);
|
||||
}
|
||||
|
||||
public async clearOrderedStopData(): Promise<void> {
|
||||
const keys = await this.redisClient.keys('shuttle:orderedstop:*');
|
||||
if (keys.length > 0) {
|
||||
@@ -706,19 +552,4 @@ export class RedisShuttleRepository extends BaseRedisRepository implements Shutt
|
||||
await this.redisClient.del(keys);
|
||||
}
|
||||
}
|
||||
|
||||
// Helper method to get all ETAs for the clearEtaData event
|
||||
private async getAllEtas(): Promise<IEta[]> {
|
||||
const keys = await this.redisClient.keys('shuttle:eta:*');
|
||||
const etas: IEta[] = [];
|
||||
|
||||
for (const key of keys) {
|
||||
const data = await this.redisClient.hGetAll(key);
|
||||
if (Object.keys(data).length > 0) {
|
||||
etas.push(this.createEtaFromRedisData(data));
|
||||
}
|
||||
}
|
||||
|
||||
return etas;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,9 +1,8 @@
|
||||
import EventEmitter from "node:events";
|
||||
import { ShuttleGetterSetterRepository } from "./ShuttleGetterSetterRepository";
|
||||
import { IEta, IOrderedStop, IRoute, IShuttle, IStop, shuttleHasArrivedAtStop } from "../../entities/ShuttleRepositoryEntities";
|
||||
import { IOrderedStop, IRoute, IShuttle, IStop, shuttleHasArrivedAtStop } from "../../entities/ShuttleRepositoryEntities";
|
||||
import { IEntityWithId } from "../../entities/SharedEntities";
|
||||
import {
|
||||
ShuttleRepositoryEvent,
|
||||
ShuttleRepositoryEventListener,
|
||||
ShuttleRepositoryEventName,
|
||||
ShuttleRepositoryEventPayloads,
|
||||
@@ -71,7 +70,6 @@ export class UnoptimizedInMemoryShuttleRepository
|
||||
private stops: IStop[] = [];
|
||||
private routes: IRoute[] = [];
|
||||
private shuttles: IShuttle[] = [];
|
||||
private etas: IEta[] = [];
|
||||
private orderedStops: IOrderedStop[] = [];
|
||||
private shuttleLastStopArrivals: Map<string, ShuttleStopArrival> = new Map();
|
||||
private travelTimeData: Map<string, Array<{ timestamp: number; seconds: number }>> = new Map();
|
||||
@@ -104,18 +102,6 @@ export class UnoptimizedInMemoryShuttleRepository
|
||||
return this.findEntityById(shuttleId, this.shuttles);
|
||||
}
|
||||
|
||||
public async getEtasForShuttleId(shuttleId: string): Promise<IEta[]> {
|
||||
return this.etas.filter(eta => eta.shuttleId === shuttleId);
|
||||
}
|
||||
|
||||
public async getEtasForStopId(stopId: string) {
|
||||
return this.etas.filter(eta => eta.stopId === stopId);
|
||||
}
|
||||
|
||||
public async getEtaForShuttleAndStopId(shuttleId: string, stopId: string) {
|
||||
return this.findEntityByMatcher<IEta>((value) => value.stopId === stopId && value.shuttleId === shuttleId, this.etas);
|
||||
}
|
||||
|
||||
public async getOrderedStopByRouteAndStopId(routeId: string, stopId: string) {
|
||||
return this.findEntityByMatcher<IOrderedStop>((value) => value.routeId === routeId && value.stopId === stopId, this.orderedStops)
|
||||
}
|
||||
@@ -161,8 +147,7 @@ export class UnoptimizedInMemoryShuttleRepository
|
||||
this.shuttles.push(shuttle);
|
||||
}
|
||||
|
||||
await this.updateLastStopArrivalAndTravelTimeDataPoints(shuttle, travelTimeTimestamp);
|
||||
await this.updateEtasBasedOnHistoricalData(shuttle, referenceCurrentTime);
|
||||
await this.updateLastStopArrival(shuttle, travelTimeTimestamp);
|
||||
}
|
||||
|
||||
public async addOrUpdateStop(stop: IStop): Promise<void> {
|
||||
@@ -183,89 +168,21 @@ export class UnoptimizedInMemoryShuttleRepository
|
||||
}
|
||||
}
|
||||
|
||||
private async addOrUpdateEta(eta: IEta): Promise<void> {
|
||||
const index = this.etas.findIndex((e) => e.stopId === eta.stopId && e.shuttleId === eta.shuttleId);
|
||||
if (index !== -1) {
|
||||
this.etas[index] = eta;
|
||||
} else {
|
||||
this.etas.push(eta);
|
||||
}
|
||||
this.emit(ShuttleRepositoryEvent.ETA_UPDATED, eta);
|
||||
}
|
||||
|
||||
public async addOrUpdateEtaFromExternalSource(eta: IEta): Promise<void> {
|
||||
await this.addOrUpdateEta(eta);
|
||||
}
|
||||
|
||||
private async updateEtasBasedOnHistoricalData(
|
||||
shuttle: IShuttle,
|
||||
referenceCurrentTime: Date = new Date(),
|
||||
) {
|
||||
const oneWeekAgo = new Date(referenceCurrentTime.getTime() - (60 * 60 * 24 * 7 * 1000));
|
||||
|
||||
const lastStopArrival = await this.getShuttleLastStopArrival(shuttle.id);
|
||||
if (lastStopArrival == undefined) return;
|
||||
|
||||
const lastOrderedStop = await this.getOrderedStopByRouteAndStopId(shuttle.routeId, lastStopArrival.stopId);
|
||||
const nextStop = lastOrderedStop?.nextStop;
|
||||
if (nextStop == null) return;
|
||||
|
||||
const travelTimeSeconds = await this.getAverageTravelTimeSeconds({
|
||||
routeId: shuttle.routeId,
|
||||
fromStopId: lastStopArrival.stopId,
|
||||
toStopId: nextStop.stopId,
|
||||
}, {
|
||||
from: oneWeekAgo,
|
||||
to: new Date(oneWeekAgo.getTime() + (60 * 60 * 1000))
|
||||
});
|
||||
if (travelTimeSeconds == undefined) return;
|
||||
|
||||
const elapsedTimeMs = referenceCurrentTime.getTime() - lastStopArrival.timestamp.getTime();
|
||||
const predictedTimeSeconds = travelTimeSeconds - (elapsedTimeMs / 1000);
|
||||
|
||||
await this.addOrUpdateEta({
|
||||
secondsRemaining: predictedTimeSeconds,
|
||||
shuttleId: shuttle.id,
|
||||
stopId: nextStop.stopId,
|
||||
systemId: nextStop.systemId,
|
||||
updatedTime: new Date(),
|
||||
});
|
||||
}
|
||||
|
||||
private async updateLastStopArrivalAndTravelTimeDataPoints(
|
||||
private async updateLastStopArrival(
|
||||
shuttle: IShuttle,
|
||||
travelTimeTimestamp = Date.now(),
|
||||
) {
|
||||
const arrivedStop = await this.getArrivedStopIfExists(shuttle);
|
||||
|
||||
if (arrivedStop != undefined) {
|
||||
const lastStopTimestamp = await this.getShuttleLastStopArrival(shuttle.id);
|
||||
if (lastStopTimestamp != undefined) {
|
||||
const routeId = shuttle.routeId;
|
||||
const fromStopId = lastStopTimestamp.stopId;
|
||||
const toStopId = arrivedStop.id;
|
||||
|
||||
const travelTimeSeconds = (travelTimeTimestamp - lastStopTimestamp.timestamp.getTime()) / 1000;
|
||||
await this.addTravelTimeDataPoint({ routeId, fromStopId, toStopId }, travelTimeSeconds, travelTimeTimestamp);
|
||||
}
|
||||
|
||||
await this.updateShuttleLastStopArrival(shuttle.id, {
|
||||
stopId: arrivedStop.id,
|
||||
timestamp: new Date(travelTimeTimestamp),
|
||||
shuttleId: shuttle.id,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private async addTravelTimeDataPoint(
|
||||
{ routeId, fromStopId, toStopId }: ShuttleTravelTimeDataIdentifier,
|
||||
travelTimeSeconds: number,
|
||||
timestamp = Date.now(),
|
||||
): Promise<void> {
|
||||
const key = `${routeId}:${fromStopId}:${toStopId}`;
|
||||
const dataPoints = this.travelTimeData.get(key) || [];
|
||||
dataPoints.push({ timestamp, seconds: travelTimeSeconds });
|
||||
this.travelTimeData.set(key, dataPoints);
|
||||
}
|
||||
|
||||
private async updateShuttleLastStopArrival(shuttleId: string, lastStopArrival: ShuttleStopArrival) {
|
||||
this.shuttleLastStopArrivals.set(shuttleId, lastStopArrival);
|
||||
@@ -350,27 +267,10 @@ export class UnoptimizedInMemoryShuttleRepository
|
||||
}, this.orderedStops);
|
||||
}
|
||||
|
||||
public async removeEtaIfExists(shuttleId: string, stopId: string): Promise<IEta | null> {
|
||||
const removedEta = await this.removeEntityByMatcherIfExists((eta) => {
|
||||
return eta.stopId === stopId
|
||||
&& eta.shuttleId === shuttleId
|
||||
}, this.etas);
|
||||
if (removedEta) {
|
||||
this.emit(ShuttleRepositoryEvent.ETA_REMOVED, removedEta);
|
||||
}
|
||||
return removedEta;
|
||||
}
|
||||
|
||||
public async clearShuttleData(): Promise<void> {
|
||||
this.shuttles = [];
|
||||
}
|
||||
|
||||
public async clearEtaData(): Promise<void> {
|
||||
const removedEtas = [...this.etas];
|
||||
this.etas = [];
|
||||
this.emit(ShuttleRepositoryEvent.ETA_DATA_CLEARED, removedEtas);
|
||||
}
|
||||
|
||||
public async clearOrderedStopData(): Promise<void> {
|
||||
this.orderedStops = [];
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user