mirror of
https://github.com/brendan-ch/project-inter-server.git
synced 2026-04-19 08:50:29 +00:00
Add system ID prefix to all Redis keys to prevent cross-system ID clashes
When multiple university systems share the same Redis instance, entity IDs (shuttles, stops, routes, etc.) could collide. This namespaces all Redis keys with the system ID (e.g., `1:shuttle:stop:123` instead of `shuttle:stop:123`). - Add systemId field and prefixKey() helper to BaseRedisRepository - Update all Redis repository subclasses to use prefixed keys - Wire system ID from InterchangeSystem.build() into Redis repositories - Add migration utility (migrateRedisKeysToSystemPrefix) with tests - Update all test holders to pass a test system ID https://claude.ai/code/session_012Vfz1NHWJbVtoDEWcE5tq6
This commit is contained in:
@@ -91,7 +91,7 @@ export class InterchangeSystem {
|
|||||||
);
|
);
|
||||||
notificationScheduler.startListeningForUpdates();
|
notificationScheduler.startListeningForUpdates();
|
||||||
|
|
||||||
let { parkingRepository, timedParkingLoader } = await InterchangeSystem.buildRedisParkingLoaderAndRepository(args.parkingSystemId);
|
let { parkingRepository, timedParkingLoader } = await InterchangeSystem.buildRedisParkingLoaderAndRepository(args.parkingSystemId, args.id);
|
||||||
timedParkingLoader?.start();
|
timedParkingLoader?.start();
|
||||||
|
|
||||||
return new InterchangeSystem(
|
return new InterchangeSystem(
|
||||||
@@ -145,7 +145,7 @@ export class InterchangeSystem {
|
|||||||
shuttleRepository: ShuttleGetterRepository,
|
shuttleRepository: ShuttleGetterRepository,
|
||||||
args: InterchangeSystemBuilderArguments
|
args: InterchangeSystemBuilderArguments
|
||||||
) {
|
) {
|
||||||
const notificationRepository = new RedisNotificationRepository();
|
const notificationRepository = new RedisNotificationRepository(undefined, args.id);
|
||||||
await notificationRepository.connect();
|
await notificationRepository.connect();
|
||||||
const notificationScheduler = new ETANotificationScheduler(
|
const notificationScheduler = new ETANotificationScheduler(
|
||||||
etaRepository,
|
etaRepository,
|
||||||
@@ -157,12 +157,12 @@ export class InterchangeSystem {
|
|||||||
return { notificationScheduler, notificationRepository };
|
return { notificationScheduler, notificationRepository };
|
||||||
}
|
}
|
||||||
|
|
||||||
private static async buildRedisParkingLoaderAndRepository(id?: string) {
|
private static async buildRedisParkingLoaderAndRepository(id?: string, systemId: string = '') {
|
||||||
if (id === undefined) {
|
if (id === undefined) {
|
||||||
return { parkingRepository: null, timedParkingLoader: null };
|
return { parkingRepository: null, timedParkingLoader: null };
|
||||||
}
|
}
|
||||||
|
|
||||||
let parkingRepository: RedisParkingRepository | null = new RedisParkingRepository();
|
let parkingRepository: RedisParkingRepository | null = new RedisParkingRepository(undefined, systemId);
|
||||||
await parkingRepository.connect();
|
await parkingRepository.connect();
|
||||||
|
|
||||||
const loaderBuilderArguments: ParkingRepositoryLoaderBuilderArguments = {
|
const loaderBuilderArguments: ParkingRepositoryLoaderBuilderArguments = {
|
||||||
|
|||||||
@@ -4,17 +4,23 @@ import createRedisClientForRepository from '../helpers/createRedisClientForRepos
|
|||||||
|
|
||||||
export abstract class BaseRedisRepository extends EventEmitter {
|
export abstract class BaseRedisRepository extends EventEmitter {
|
||||||
protected redisClient;
|
protected redisClient;
|
||||||
|
protected readonly systemId: string;
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
redisClient: RedisClientType = createRedisClientForRepository(),
|
redisClient: RedisClientType = createRedisClientForRepository(),
|
||||||
|
systemId: string = '',
|
||||||
) {
|
) {
|
||||||
super();
|
super();
|
||||||
this.redisClient = redisClient;
|
this.redisClient = redisClient;
|
||||||
|
this.systemId = systemId;
|
||||||
this.redisClient.on('error', (err) => {
|
this.redisClient.on('error', (err) => {
|
||||||
console.error(err.stack);
|
console.error(err.stack);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected prefixKey = (key: string): string =>
|
||||||
|
this.systemId ? `${this.systemId}:${key}` : key;
|
||||||
|
|
||||||
get isReady() {
|
get isReady() {
|
||||||
return this.redisClient.isReady;
|
return this.redisClient.isReady;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,89 @@
|
|||||||
|
import { afterEach, beforeEach, describe, expect, it } from "@jest/globals";
|
||||||
|
import { createClient, RedisClientType } from "redis";
|
||||||
|
import { migrateRedisKeysToSystemPrefix } from "../migrateRedisKeysToSystemPrefix";
|
||||||
|
|
||||||
|
describe("migrateRedisKeysToSystemPrefix", () => {
|
||||||
|
let redisClient: RedisClientType;
|
||||||
|
|
||||||
|
beforeEach(async () => {
|
||||||
|
redisClient = createClient({
|
||||||
|
url: process.env.REDIS_URL,
|
||||||
|
});
|
||||||
|
await redisClient.connect();
|
||||||
|
await redisClient.flushAll();
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(async () => {
|
||||||
|
if (redisClient) {
|
||||||
|
await redisClient.flushAll();
|
||||||
|
await redisClient.disconnect();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it("migrates notification keys", async () => {
|
||||||
|
await redisClient.hSet("notification:shuttle1|stop1", "device1", "180");
|
||||||
|
|
||||||
|
const count = await migrateRedisKeysToSystemPrefix(redisClient, "1");
|
||||||
|
|
||||||
|
expect(count).toBe(1);
|
||||||
|
const oldKeyExists = await redisClient.exists("notification:shuttle1|stop1");
|
||||||
|
expect(oldKeyExists).toBe(0);
|
||||||
|
const newValue = await redisClient.hGet("1:notification:shuttle1|stop1", "device1");
|
||||||
|
expect(newValue).toBe("180");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("migrates shuttle keys", async () => {
|
||||||
|
await redisClient.hSet("shuttle:stop:stop1", { id: "stop1", name: "Test Stop" });
|
||||||
|
await redisClient.hSet("shuttle:route:route1", { id: "route1", name: "Test Route" });
|
||||||
|
|
||||||
|
const count = await migrateRedisKeysToSystemPrefix(redisClient, "1");
|
||||||
|
|
||||||
|
expect(count).toBe(2);
|
||||||
|
const newStop = await redisClient.hGetAll("1:shuttle:stop:stop1");
|
||||||
|
expect(newStop.name).toBe("Test Stop");
|
||||||
|
const newRoute = await redisClient.hGetAll("1:shuttle:route:route1");
|
||||||
|
expect(newRoute.name).toBe("Test Route");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("migrates parking keys", async () => {
|
||||||
|
await redisClient.hSet("parking:structure:struct1", { id: "struct1", name: "Lot A" });
|
||||||
|
|
||||||
|
const count = await migrateRedisKeysToSystemPrefix(redisClient, "1");
|
||||||
|
|
||||||
|
expect(count).toBe(1);
|
||||||
|
const newStructure = await redisClient.hGetAll("1:parking:structure:struct1");
|
||||||
|
expect(newStructure.name).toBe("Lot A");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("skips keys that already have the system prefix", async () => {
|
||||||
|
await redisClient.hSet("1:notification:shuttle1|stop1", "device1", "180");
|
||||||
|
|
||||||
|
const count = await migrateRedisKeysToSystemPrefix(redisClient, "1");
|
||||||
|
|
||||||
|
expect(count).toBe(0);
|
||||||
|
const value = await redisClient.hGet("1:notification:shuttle1|stop1", "device1");
|
||||||
|
expect(value).toBe("180");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("does not touch unrelated keys", async () => {
|
||||||
|
await redisClient.set("unrelated:key", "value");
|
||||||
|
await redisClient.hSet("notification:shuttle1|stop1", "device1", "180");
|
||||||
|
|
||||||
|
const count = await migrateRedisKeysToSystemPrefix(redisClient, "1");
|
||||||
|
|
||||||
|
expect(count).toBe(1);
|
||||||
|
const unrelatedValue = await redisClient.get("unrelated:key");
|
||||||
|
expect(unrelatedValue).toBe("value");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("throws if systemId is empty", async () => {
|
||||||
|
await expect(
|
||||||
|
migrateRedisKeysToSystemPrefix(redisClient, "")
|
||||||
|
).rejects.toThrow("systemId must be a non-empty string");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("returns 0 when there are no keys to migrate", async () => {
|
||||||
|
const count = await migrateRedisKeysToSystemPrefix(redisClient, "1");
|
||||||
|
expect(count).toBe(0);
|
||||||
|
});
|
||||||
|
});
|
||||||
41
src/repositories/migrateRedisKeysToSystemPrefix.ts
Normal file
41
src/repositories/migrateRedisKeysToSystemPrefix.ts
Normal file
@@ -0,0 +1,41 @@
|
|||||||
|
import { RedisClientType } from 'redis';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Migrates existing Redis keys to include a system ID prefix.
|
||||||
|
*
|
||||||
|
* This handles the transition from unprefixed keys (e.g., `notification:shuttle1|stop5`)
|
||||||
|
* to system-prefixed keys (e.g., `1:notification:shuttle1|stop5`), preventing ID clashes
|
||||||
|
* when multiple university systems share the same Redis instance.
|
||||||
|
*
|
||||||
|
* Uses SCAN instead of KEYS to avoid blocking Redis on large datasets.
|
||||||
|
*
|
||||||
|
* @param redisClient - A connected Redis client
|
||||||
|
* @param systemId - The system ID to prefix keys with
|
||||||
|
* @returns The number of keys migrated
|
||||||
|
*/
|
||||||
|
export const migrateRedisKeysToSystemPrefix = async (
|
||||||
|
redisClient: RedisClientType,
|
||||||
|
systemId: string,
|
||||||
|
): Promise<number> => {
|
||||||
|
if (!systemId) {
|
||||||
|
throw new Error('systemId must be a non-empty string');
|
||||||
|
}
|
||||||
|
|
||||||
|
const patterns = ['notification:*', 'shuttle:*', 'parking:*'];
|
||||||
|
let migratedCount = 0;
|
||||||
|
|
||||||
|
for (const pattern of patterns) {
|
||||||
|
for await (const key of redisClient.scanIterator({ MATCH: pattern })) {
|
||||||
|
// Skip keys that already have a system prefix
|
||||||
|
if (key.startsWith(`${systemId}:`)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
const newKey = `${systemId}:${key}`;
|
||||||
|
await redisClient.rename(key, newKey);
|
||||||
|
migratedCount++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return migratedCount;
|
||||||
|
};
|
||||||
@@ -1,3 +1,4 @@
|
|||||||
|
import { RedisClientType } from 'redis';
|
||||||
import { TupleKey } from '../../types/TupleKey';
|
import { TupleKey } from '../../types/TupleKey';
|
||||||
import {
|
import {
|
||||||
Listener,
|
Listener,
|
||||||
@@ -7,14 +8,22 @@ import {
|
|||||||
ScheduledNotification
|
ScheduledNotification
|
||||||
} from "./NotificationRepository";
|
} from "./NotificationRepository";
|
||||||
import { BaseRedisRepository } from "../BaseRedisRepository";
|
import { BaseRedisRepository } from "../BaseRedisRepository";
|
||||||
|
import createRedisClientForRepository from '../../helpers/createRedisClientForRepository';
|
||||||
|
|
||||||
export class RedisNotificationRepository extends BaseRedisRepository implements NotificationRepository {
|
export class RedisNotificationRepository extends BaseRedisRepository implements NotificationRepository {
|
||||||
private notificationListeners: Listener[] = [];
|
private notificationListeners: Listener[] = [];
|
||||||
private readonly NOTIFICATION_KEY_PREFIX = 'notification:';
|
private readonly NOTIFICATION_KEY_PREFIX = 'notification:';
|
||||||
|
|
||||||
|
constructor(
|
||||||
|
redisClient: RedisClientType = createRedisClientForRepository(),
|
||||||
|
systemId: string = '',
|
||||||
|
) {
|
||||||
|
super(redisClient, systemId);
|
||||||
|
}
|
||||||
|
|
||||||
private getNotificationKey = (shuttleId: string, stopId: string): string => {
|
private getNotificationKey = (shuttleId: string, stopId: string): string => {
|
||||||
const tuple = new TupleKey(shuttleId, stopId);
|
const tuple = new TupleKey(shuttleId, stopId);
|
||||||
return `${this.NOTIFICATION_KEY_PREFIX}${tuple.toString()}`;
|
return this.prefixKey(`${this.NOTIFICATION_KEY_PREFIX}${tuple.toString()}`);
|
||||||
};
|
};
|
||||||
|
|
||||||
public addOrUpdateNotification = async (notification: ScheduledNotification): Promise<void> => {
|
public addOrUpdateNotification = async (notification: ScheduledNotification): Promise<void> => {
|
||||||
|
|||||||
@@ -28,7 +28,7 @@ class RedisNotificationRepositoryHolder implements RepositoryHolder {
|
|||||||
url: process.env.REDIS_URL,
|
url: process.env.REDIS_URL,
|
||||||
});
|
});
|
||||||
await this.redisClient.connect();
|
await this.redisClient.connect();
|
||||||
this.repo = new RedisNotificationRepository(this.redisClient);
|
this.repo = new RedisNotificationRepository(this.redisClient, 'test-system');
|
||||||
return this.repo;
|
return this.repo;
|
||||||
}
|
}
|
||||||
teardown = async () => {
|
teardown = async () => {
|
||||||
|
|||||||
@@ -3,6 +3,8 @@ import { IParkingStructure } from "../../entities/ParkingRepositoryEntities";
|
|||||||
import { HistoricalParkingAverageQueryResult, HistoricalParkingAverageFilterArguments } from "./ParkingGetterRepository";
|
import { HistoricalParkingAverageQueryResult, HistoricalParkingAverageFilterArguments } from "./ParkingGetterRepository";
|
||||||
import { BaseRedisRepository } from "../BaseRedisRepository";
|
import { BaseRedisRepository } from "../BaseRedisRepository";
|
||||||
import { PARKING_LOGGING_INTERVAL_MS } from "../../environment";
|
import { PARKING_LOGGING_INTERVAL_MS } from "../../environment";
|
||||||
|
import { RedisClientType } from "redis";
|
||||||
|
import createRedisClientForRepository from "../../helpers/createRedisClientForRepository";
|
||||||
|
|
||||||
export type ParkingStructureID = string;
|
export type ParkingStructureID = string;
|
||||||
|
|
||||||
@@ -10,6 +12,13 @@ export class RedisParkingRepository extends BaseRedisRepository implements Parki
|
|||||||
private dataLastAdded: Map<ParkingStructureID, Date> = new Map();
|
private dataLastAdded: Map<ParkingStructureID, Date> = new Map();
|
||||||
private loggingIntervalMs = PARKING_LOGGING_INTERVAL_MS;
|
private loggingIntervalMs = PARKING_LOGGING_INTERVAL_MS;
|
||||||
|
|
||||||
|
constructor(
|
||||||
|
redisClient: RedisClientType = createRedisClientForRepository(),
|
||||||
|
systemId: string = '',
|
||||||
|
) {
|
||||||
|
super(redisClient, systemId);
|
||||||
|
}
|
||||||
|
|
||||||
addOrUpdateParkingStructure = async (structure: IParkingStructure): Promise<void> => {
|
addOrUpdateParkingStructure = async (structure: IParkingStructure): Promise<void> => {
|
||||||
const keys = this.createRedisKeys(structure.id);
|
const keys = this.createRedisKeys(structure.id);
|
||||||
await this.redisClient.hSet(keys.structure, this.createRedisHashFromStructure(structure));
|
await this.redisClient.hSet(keys.structure, this.createRedisHashFromStructure(structure));
|
||||||
@@ -28,8 +37,8 @@ export class RedisParkingRepository extends BaseRedisRepository implements Parki
|
|||||||
};
|
};
|
||||||
|
|
||||||
clearParkingStructureData = async (): Promise<void> => {
|
clearParkingStructureData = async (): Promise<void> => {
|
||||||
const structureKeys = await this.redisClient.keys('parking:structure:*');
|
const structureKeys = await this.redisClient.keys(this.prefixKey('parking:structure:*'));
|
||||||
const timeSeriesKeys = await this.redisClient.keys('parking:timeseries:*');
|
const timeSeriesKeys = await this.redisClient.keys(this.prefixKey('parking:timeseries:*'));
|
||||||
|
|
||||||
const allKeys = [...structureKeys, ...timeSeriesKeys];
|
const allKeys = [...structureKeys, ...timeSeriesKeys];
|
||||||
if (allKeys.length > 0) {
|
if (allKeys.length > 0) {
|
||||||
@@ -51,7 +60,7 @@ export class RedisParkingRepository extends BaseRedisRepository implements Parki
|
|||||||
};
|
};
|
||||||
|
|
||||||
getParkingStructures = async (): Promise<IParkingStructure[]> => {
|
getParkingStructures = async (): Promise<IParkingStructure[]> => {
|
||||||
const keys = await this.redisClient.keys('parking:structure:*');
|
const keys = await this.redisClient.keys(this.prefixKey('parking:structure:*'));
|
||||||
const structures: IParkingStructure[] = [];
|
const structures: IParkingStructure[] = [];
|
||||||
|
|
||||||
for (const key of keys) {
|
for (const key of keys) {
|
||||||
@@ -80,8 +89,8 @@ export class RedisParkingRepository extends BaseRedisRepository implements Parki
|
|||||||
};
|
};
|
||||||
|
|
||||||
private createRedisKeys = (structureId: string) => ({
|
private createRedisKeys = (structureId: string) => ({
|
||||||
structure: `parking:structure:${structureId}`,
|
structure: this.prefixKey(`parking:structure:${structureId}`),
|
||||||
timeSeries: `parking:timeseries:${structureId}`
|
timeSeries: this.prefixKey(`parking:timeseries:${structureId}`),
|
||||||
});
|
});
|
||||||
|
|
||||||
private createRedisHashFromStructure = (structure: IParkingStructure): Record<string, string> => ({
|
private createRedisHashFromStructure = (structure: IParkingStructure): Record<string, string> => ({
|
||||||
|
|||||||
@@ -25,7 +25,7 @@ class RedisParkingRepositoryHolder implements RepositoryHolder<ParkingGetterSett
|
|||||||
url: process.env.REDIS_URL,
|
url: process.env.REDIS_URL,
|
||||||
});
|
});
|
||||||
await this.redisClient.connect();
|
await this.redisClient.connect();
|
||||||
this.repo = new RedisParkingRepository(this.redisClient);
|
this.repo = new RedisParkingRepository(this.redisClient, 'test-system');
|
||||||
return this.repo;
|
return this.repo;
|
||||||
};
|
};
|
||||||
teardown = async () => {
|
teardown = async () => {
|
||||||
|
|||||||
@@ -17,8 +17,9 @@ export class RedisShuttleRepository extends BaseRedisRepository implements Shutt
|
|||||||
constructor(
|
constructor(
|
||||||
redisClient: RedisClientType = createRedisClientForRepository(),
|
redisClient: RedisClientType = createRedisClientForRepository(),
|
||||||
readonly shuttleStopArrivalDegreeDelta: number = 0.001,
|
readonly shuttleStopArrivalDegreeDelta: number = 0.001,
|
||||||
|
systemId: string = '',
|
||||||
) {
|
) {
|
||||||
super(redisClient);
|
super(redisClient, systemId);
|
||||||
}
|
}
|
||||||
|
|
||||||
get isReady() {
|
get isReady() {
|
||||||
@@ -83,24 +84,24 @@ export class RedisShuttleRepository extends BaseRedisRepository implements Shutt
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Key prefixes for individual entity keys
|
// Key prefixes for individual entity keys
|
||||||
private readonly stopKeyPrefix = 'shuttle:stop:';
|
private get stopKeyPrefix() { return this.prefixKey('shuttle:stop:'); }
|
||||||
private readonly routeKeyPrefix = 'shuttle:route:';
|
private get routeKeyPrefix() { return this.prefixKey('shuttle:route:'); }
|
||||||
private readonly shuttleKeyPrefix = 'shuttle:shuttle:';
|
private get shuttleKeyPrefix() { return this.prefixKey('shuttle:shuttle:'); }
|
||||||
private readonly orderedStopKeyPrefix = 'shuttle:orderedstop:';
|
private get orderedStopKeyPrefix() { return this.prefixKey('shuttle:orderedstop:'); }
|
||||||
private readonly lastStopKeyPrefix = 'shuttle:laststop:';
|
private get lastStopKeyPrefix() { return this.prefixKey('shuttle:laststop:'); }
|
||||||
private readonly historicalEtaKeyPrefix = 'shuttle:eta:historical:';
|
private get historicalEtaKeyPrefix() { return this.prefixKey('shuttle:eta:historical:'); }
|
||||||
|
|
||||||
// Key patterns for bulk operations (e.g., getting all keys, clearing data)
|
// Key patterns for bulk operations (e.g., getting all keys, clearing data)
|
||||||
private readonly stopKeyPattern = 'shuttle:stop:*';
|
private get stopKeyPattern() { return this.prefixKey('shuttle:stop:*'); }
|
||||||
private readonly routeKeyPattern = 'shuttle:route:*';
|
private get routeKeyPattern() { return this.prefixKey('shuttle:route:*'); }
|
||||||
private readonly shuttleKeyPattern = 'shuttle:shuttle:*';
|
private get shuttleKeyPattern() { return this.prefixKey('shuttle:shuttle:*'); }
|
||||||
private readonly orderedStopKeyPattern = 'shuttle:orderedstop:*';
|
private get orderedStopKeyPattern() { return this.prefixKey('shuttle:orderedstop:*'); }
|
||||||
private readonly lastStopKeyPattern = 'shuttle:laststop:*';
|
private get lastStopKeyPattern() { return this.prefixKey('shuttle:laststop:*'); }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Represents a set storing the shuttles that are currently at a stop.
|
* Represents a set storing the shuttles that are currently at a stop.
|
||||||
*/
|
*/
|
||||||
private readonly shuttleIsAtStopKey = 'shuttle:atstop';
|
private get shuttleIsAtStopKey() { return this.prefixKey('shuttle:atstop'); }
|
||||||
|
|
||||||
// Helper methods for Redis key generation
|
// Helper methods for Redis key generation
|
||||||
private readonly createStopKey = (stopId: string) => `${this.stopKeyPrefix}${stopId}`;
|
private readonly createStopKey = (stopId: string) => `${this.stopKeyPrefix}${stopId}`;
|
||||||
|
|||||||
@@ -31,7 +31,7 @@ class RedisShuttleRepositoryHolder implements RepositoryHolder<ShuttleGetterSett
|
|||||||
url: process.env.REDIS_URL,
|
url: process.env.REDIS_URL,
|
||||||
});
|
});
|
||||||
await this.redisClient.connect();
|
await this.redisClient.connect();
|
||||||
this.repo = new RedisShuttleRepository(this.redisClient);
|
this.repo = new RedisShuttleRepository(this.redisClient, 0.001, 'test-system');
|
||||||
return this.repo;
|
return this.repo;
|
||||||
};
|
};
|
||||||
teardown = async () => {
|
teardown = async () => {
|
||||||
|
|||||||
@@ -3,11 +3,11 @@ import { BaseRedisRepository } from "../../BaseRedisRepository";
|
|||||||
import { ETAGetterRepository, ETARepositoryEvent, ETARepositoryEventListener, ETARepositoryEventName } from "./ETAGetterRepository";
|
import { ETAGetterRepository, ETARepositoryEvent, ETARepositoryEventListener, ETARepositoryEventName } from "./ETAGetterRepository";
|
||||||
|
|
||||||
export abstract class BaseRedisETARepository extends BaseRedisRepository implements ETAGetterRepository {
|
export abstract class BaseRedisETARepository extends BaseRedisRepository implements ETAGetterRepository {
|
||||||
private static readonly ETA_KEY_PREFIX = 'shuttle:eta:';
|
private get etaKeyPrefix() { return this.prefixKey('shuttle:eta:'); }
|
||||||
|
|
||||||
// Helper methods
|
// Helper methods
|
||||||
protected createEtaKey = (shuttleId: string, stopId: string) =>
|
protected createEtaKey = (shuttleId: string, stopId: string) =>
|
||||||
`${BaseRedisETARepository.ETA_KEY_PREFIX}${shuttleId}:${stopId}`;
|
`${this.etaKeyPrefix}${shuttleId}:${stopId}`;
|
||||||
|
|
||||||
createRedisHashFromEta = (eta: IEta): Record<string, string> => ({
|
createRedisHashFromEta = (eta: IEta): Record<string, string> => ({
|
||||||
secondsRemaining: eta.secondsRemaining.toString(),
|
secondsRemaining: eta.secondsRemaining.toString(),
|
||||||
@@ -27,7 +27,7 @@ export abstract class BaseRedisETARepository extends BaseRedisRepository impleme
|
|||||||
|
|
||||||
// Getter implementations
|
// Getter implementations
|
||||||
async getEtasForShuttleId(shuttleId: string): Promise<IEta[]> {
|
async getEtasForShuttleId(shuttleId: string): Promise<IEta[]> {
|
||||||
const keys = await this.redisClient.keys(`${BaseRedisETARepository.ETA_KEY_PREFIX}${shuttleId}:*`);
|
const keys = await this.redisClient.keys(`${this.etaKeyPrefix}${shuttleId}:*`);
|
||||||
const etas: IEta[] = [];
|
const etas: IEta[] = [];
|
||||||
|
|
||||||
for (const key of keys) {
|
for (const key of keys) {
|
||||||
@@ -41,7 +41,7 @@ export abstract class BaseRedisETARepository extends BaseRedisRepository impleme
|
|||||||
}
|
}
|
||||||
|
|
||||||
async getEtasForStopId(stopId: string): Promise<IEta[]> {
|
async getEtasForStopId(stopId: string): Promise<IEta[]> {
|
||||||
const keys = await this.redisClient.keys(`${BaseRedisETARepository.ETA_KEY_PREFIX}*`);
|
const keys = await this.redisClient.keys(`${this.etaKeyPrefix}*`);
|
||||||
const etas: IEta[] = [];
|
const etas: IEta[] = [];
|
||||||
|
|
||||||
for (const key of keys) {
|
for (const key of keys) {
|
||||||
|
|||||||
@@ -2,8 +2,17 @@ import { IEta } from "../../../entities/ShuttleRepositoryEntities";
|
|||||||
import { BaseRedisETARepository } from "./BaseRedisETARepository";
|
import { BaseRedisETARepository } from "./BaseRedisETARepository";
|
||||||
import { ExternalSourceETARepository } from "./ExternalSourceETARepository";
|
import { ExternalSourceETARepository } from "./ExternalSourceETARepository";
|
||||||
import { ETARepositoryEvent } from "./ETAGetterRepository";
|
import { ETARepositoryEvent } from "./ETAGetterRepository";
|
||||||
|
import { RedisClientType } from "redis";
|
||||||
|
import createRedisClientForRepository from "../../../helpers/createRedisClientForRepository";
|
||||||
|
|
||||||
export class RedisExternalSourceETARepository extends BaseRedisETARepository implements ExternalSourceETARepository {
|
export class RedisExternalSourceETARepository extends BaseRedisETARepository implements ExternalSourceETARepository {
|
||||||
|
constructor(
|
||||||
|
redisClient: RedisClientType = createRedisClientForRepository(),
|
||||||
|
systemId: string = '',
|
||||||
|
) {
|
||||||
|
super(redisClient, systemId);
|
||||||
|
}
|
||||||
|
|
||||||
async addOrUpdateEtaFromExternalSource(eta: IEta): Promise<void> {
|
async addOrUpdateEtaFromExternalSource(eta: IEta): Promise<void> {
|
||||||
await this.addOrUpdateEta(eta);
|
await this.addOrUpdateEta(eta);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -13,8 +13,9 @@ export class RedisSelfUpdatingETARepository extends BaseRedisETARepository imple
|
|||||||
readonly shuttleRepository: ShuttleGetterRepository,
|
readonly shuttleRepository: ShuttleGetterRepository,
|
||||||
redisClient: RedisClientType = createRedisClientForRepository(),
|
redisClient: RedisClientType = createRedisClientForRepository(),
|
||||||
private referenceTime: Date | null = null,
|
private referenceTime: Date | null = null,
|
||||||
|
systemId: string = '',
|
||||||
) {
|
) {
|
||||||
super(redisClient);
|
super(redisClient, systemId);
|
||||||
|
|
||||||
this.setReferenceTime = this.setReferenceTime.bind(this);
|
this.setReferenceTime = this.setReferenceTime.bind(this);
|
||||||
this.getAverageTravelTimeSeconds = this.getAverageTravelTimeSeconds.bind(this);
|
this.getAverageTravelTimeSeconds = this.getAverageTravelTimeSeconds.bind(this);
|
||||||
@@ -28,7 +29,7 @@ export class RedisSelfUpdatingETARepository extends BaseRedisETARepository imple
|
|||||||
}
|
}
|
||||||
|
|
||||||
private createHistoricalEtaTimeSeriesKey = (routeId: string, fromStopId: string, toStopId: string) => {
|
private createHistoricalEtaTimeSeriesKey = (routeId: string, fromStopId: string, toStopId: string) => {
|
||||||
return `shuttle:eta:historical:${routeId}:${fromStopId}:${toStopId}`;
|
return this.prefixKey(`shuttle:eta:historical:${routeId}:${fromStopId}:${toStopId}`);
|
||||||
}
|
}
|
||||||
|
|
||||||
setReferenceTime(referenceTime: Date) {
|
setReferenceTime(referenceTime: Date) {
|
||||||
|
|||||||
@@ -16,7 +16,7 @@ class RedisExternalSourceETARepositoryHolder implements RepositoryHolder<Externa
|
|||||||
url: process.env.REDIS_URL,
|
url: process.env.REDIS_URL,
|
||||||
});
|
});
|
||||||
await this.redisClient.connect();
|
await this.redisClient.connect();
|
||||||
this.repo = new RedisExternalSourceETARepository(this.redisClient);
|
this.repo = new RedisExternalSourceETARepository(this.redisClient, 'test-system');
|
||||||
return this.repo;
|
return this.repo;
|
||||||
}
|
}
|
||||||
teardown = async () => {
|
teardown = async () => {
|
||||||
|
|||||||
@@ -22,10 +22,12 @@ class RedisSelfUpdatingETARepositoryHolder implements RepositoryHolder<SelfUpdat
|
|||||||
});
|
});
|
||||||
await this.redisClient.connect();
|
await this.redisClient.connect();
|
||||||
await this.redisClient.flushAll();
|
await this.redisClient.flushAll();
|
||||||
this.shuttleRepo = new RedisShuttleRepository(this.redisClient);
|
this.shuttleRepo = new RedisShuttleRepository(this.redisClient, 0.001, 'test-system');
|
||||||
this.repo = new RedisSelfUpdatingETARepository(
|
this.repo = new RedisSelfUpdatingETARepository(
|
||||||
this.shuttleRepo,
|
this.shuttleRepo,
|
||||||
this.redisClient,
|
this.redisClient,
|
||||||
|
null,
|
||||||
|
'test-system',
|
||||||
);
|
);
|
||||||
return this.repo;
|
return this.repo;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user