diff --git a/src/entities/InterchangeSystem.ts b/src/entities/InterchangeSystem.ts index d183f23..ea2b72d 100644 --- a/src/entities/InterchangeSystem.ts +++ b/src/entities/InterchangeSystem.ts @@ -91,7 +91,7 @@ export class InterchangeSystem { ); notificationScheduler.startListeningForUpdates(); - let { parkingRepository, timedParkingLoader } = await InterchangeSystem.buildRedisParkingLoaderAndRepository(args.parkingSystemId); + let { parkingRepository, timedParkingLoader } = await InterchangeSystem.buildRedisParkingLoaderAndRepository(args.parkingSystemId, args.id); timedParkingLoader?.start(); return new InterchangeSystem( @@ -145,7 +145,7 @@ export class InterchangeSystem { shuttleRepository: ShuttleGetterRepository, args: InterchangeSystemBuilderArguments ) { - const notificationRepository = new RedisNotificationRepository(); + const notificationRepository = new RedisNotificationRepository(undefined, args.id); await notificationRepository.connect(); const notificationScheduler = new ETANotificationScheduler( etaRepository, @@ -157,12 +157,12 @@ export class InterchangeSystem { return { notificationScheduler, notificationRepository }; } - private static async buildRedisParkingLoaderAndRepository(id?: string) { + private static async buildRedisParkingLoaderAndRepository(id?: string, systemId: string = '') { if (id === undefined) { return { parkingRepository: null, timedParkingLoader: null }; } - let parkingRepository: RedisParkingRepository | null = new RedisParkingRepository(); + let parkingRepository: RedisParkingRepository | null = new RedisParkingRepository(undefined, systemId); await parkingRepository.connect(); const loaderBuilderArguments: ParkingRepositoryLoaderBuilderArguments = { diff --git a/src/repositories/BaseRedisRepository.ts b/src/repositories/BaseRedisRepository.ts index c2ec079..7183180 100644 --- a/src/repositories/BaseRedisRepository.ts +++ b/src/repositories/BaseRedisRepository.ts @@ -4,17 +4,23 @@ import createRedisClientForRepository from '../helpers/createRedisClientForRepos export abstract class BaseRedisRepository extends EventEmitter { protected redisClient; + protected readonly systemId: string; constructor( redisClient: RedisClientType = createRedisClientForRepository(), + systemId: string = '', ) { super(); this.redisClient = redisClient; + this.systemId = systemId; this.redisClient.on('error', (err) => { console.error(err.stack); }); } + protected prefixKey = (key: string): string => + this.systemId ? `${this.systemId}:${key}` : key; + get isReady() { return this.redisClient.isReady; } diff --git a/src/repositories/__tests__/migrateRedisKeysToSystemPrefix.test.ts b/src/repositories/__tests__/migrateRedisKeysToSystemPrefix.test.ts new file mode 100644 index 0000000..de1bd54 --- /dev/null +++ b/src/repositories/__tests__/migrateRedisKeysToSystemPrefix.test.ts @@ -0,0 +1,89 @@ +import { afterEach, beforeEach, describe, expect, it } from "@jest/globals"; +import { createClient, RedisClientType } from "redis"; +import { migrateRedisKeysToSystemPrefix } from "../migrateRedisKeysToSystemPrefix"; + +describe("migrateRedisKeysToSystemPrefix", () => { + let redisClient: RedisClientType; + + beforeEach(async () => { + redisClient = createClient({ + url: process.env.REDIS_URL, + }); + await redisClient.connect(); + await redisClient.flushAll(); + }); + + afterEach(async () => { + if (redisClient) { + await redisClient.flushAll(); + await redisClient.disconnect(); + } + }); + + it("migrates notification keys", async () => { + await redisClient.hSet("notification:shuttle1|stop1", "device1", "180"); + + const count = await migrateRedisKeysToSystemPrefix(redisClient, "1"); + + expect(count).toBe(1); + const oldKeyExists = await redisClient.exists("notification:shuttle1|stop1"); + expect(oldKeyExists).toBe(0); + const newValue = await redisClient.hGet("1:notification:shuttle1|stop1", "device1"); + expect(newValue).toBe("180"); + }); + + it("migrates shuttle keys", async () => { + await redisClient.hSet("shuttle:stop:stop1", { id: "stop1", name: "Test Stop" }); + await redisClient.hSet("shuttle:route:route1", { id: "route1", name: "Test Route" }); + + const count = await migrateRedisKeysToSystemPrefix(redisClient, "1"); + + expect(count).toBe(2); + const newStop = await redisClient.hGetAll("1:shuttle:stop:stop1"); + expect(newStop.name).toBe("Test Stop"); + const newRoute = await redisClient.hGetAll("1:shuttle:route:route1"); + expect(newRoute.name).toBe("Test Route"); + }); + + it("migrates parking keys", async () => { + await redisClient.hSet("parking:structure:struct1", { id: "struct1", name: "Lot A" }); + + const count = await migrateRedisKeysToSystemPrefix(redisClient, "1"); + + expect(count).toBe(1); + const newStructure = await redisClient.hGetAll("1:parking:structure:struct1"); + expect(newStructure.name).toBe("Lot A"); + }); + + it("skips keys that already have the system prefix", async () => { + await redisClient.hSet("1:notification:shuttle1|stop1", "device1", "180"); + + const count = await migrateRedisKeysToSystemPrefix(redisClient, "1"); + + expect(count).toBe(0); + const value = await redisClient.hGet("1:notification:shuttle1|stop1", "device1"); + expect(value).toBe("180"); + }); + + it("does not touch unrelated keys", async () => { + await redisClient.set("unrelated:key", "value"); + await redisClient.hSet("notification:shuttle1|stop1", "device1", "180"); + + const count = await migrateRedisKeysToSystemPrefix(redisClient, "1"); + + expect(count).toBe(1); + const unrelatedValue = await redisClient.get("unrelated:key"); + expect(unrelatedValue).toBe("value"); + }); + + it("throws if systemId is empty", async () => { + await expect( + migrateRedisKeysToSystemPrefix(redisClient, "") + ).rejects.toThrow("systemId must be a non-empty string"); + }); + + it("returns 0 when there are no keys to migrate", async () => { + const count = await migrateRedisKeysToSystemPrefix(redisClient, "1"); + expect(count).toBe(0); + }); +}); diff --git a/src/repositories/migrateRedisKeysToSystemPrefix.ts b/src/repositories/migrateRedisKeysToSystemPrefix.ts new file mode 100644 index 0000000..c8328af --- /dev/null +++ b/src/repositories/migrateRedisKeysToSystemPrefix.ts @@ -0,0 +1,41 @@ +import { RedisClientType } from 'redis'; + +/** + * Migrates existing Redis keys to include a system ID prefix. + * + * This handles the transition from unprefixed keys (e.g., `notification:shuttle1|stop5`) + * to system-prefixed keys (e.g., `1:notification:shuttle1|stop5`), preventing ID clashes + * when multiple university systems share the same Redis instance. + * + * Uses SCAN instead of KEYS to avoid blocking Redis on large datasets. + * + * @param redisClient - A connected Redis client + * @param systemId - The system ID to prefix keys with + * @returns The number of keys migrated + */ +export const migrateRedisKeysToSystemPrefix = async ( + redisClient: RedisClientType, + systemId: string, +): Promise => { + if (!systemId) { + throw new Error('systemId must be a non-empty string'); + } + + const patterns = ['notification:*', 'shuttle:*', 'parking:*']; + let migratedCount = 0; + + for (const pattern of patterns) { + for await (const key of redisClient.scanIterator({ MATCH: pattern })) { + // Skip keys that already have a system prefix + if (key.startsWith(`${systemId}:`)) { + continue; + } + + const newKey = `${systemId}:${key}`; + await redisClient.rename(key, newKey); + migratedCount++; + } + } + + return migratedCount; +}; diff --git a/src/repositories/notifications/RedisNotificationRepository.ts b/src/repositories/notifications/RedisNotificationRepository.ts index d4b2e0f..d1cd9eb 100644 --- a/src/repositories/notifications/RedisNotificationRepository.ts +++ b/src/repositories/notifications/RedisNotificationRepository.ts @@ -1,3 +1,4 @@ +import { RedisClientType } from 'redis'; import { TupleKey } from '../../types/TupleKey'; import { Listener, @@ -7,14 +8,22 @@ import { ScheduledNotification } from "./NotificationRepository"; import { BaseRedisRepository } from "../BaseRedisRepository"; +import createRedisClientForRepository from '../../helpers/createRedisClientForRepository'; export class RedisNotificationRepository extends BaseRedisRepository implements NotificationRepository { private notificationListeners: Listener[] = []; private readonly NOTIFICATION_KEY_PREFIX = 'notification:'; + constructor( + redisClient: RedisClientType = createRedisClientForRepository(), + systemId: string = '', + ) { + super(redisClient, systemId); + } + private getNotificationKey = (shuttleId: string, stopId: string): string => { const tuple = new TupleKey(shuttleId, stopId); - return `${this.NOTIFICATION_KEY_PREFIX}${tuple.toString()}`; + return this.prefixKey(`${this.NOTIFICATION_KEY_PREFIX}${tuple.toString()}`); }; public addOrUpdateNotification = async (notification: ScheduledNotification): Promise => { diff --git a/src/repositories/notifications/__tests__/NotificationRepositorySharedTests.test.ts b/src/repositories/notifications/__tests__/NotificationRepositorySharedTests.test.ts index 86ccf07..363a03f 100644 --- a/src/repositories/notifications/__tests__/NotificationRepositorySharedTests.test.ts +++ b/src/repositories/notifications/__tests__/NotificationRepositorySharedTests.test.ts @@ -28,7 +28,7 @@ class RedisNotificationRepositoryHolder implements RepositoryHolder { url: process.env.REDIS_URL, }); await this.redisClient.connect(); - this.repo = new RedisNotificationRepository(this.redisClient); + this.repo = new RedisNotificationRepository(this.redisClient, 'test-system'); return this.repo; } teardown = async () => { diff --git a/src/repositories/parking/RedisParkingRepository.ts b/src/repositories/parking/RedisParkingRepository.ts index 48a74b1..9f2c809 100644 --- a/src/repositories/parking/RedisParkingRepository.ts +++ b/src/repositories/parking/RedisParkingRepository.ts @@ -3,6 +3,8 @@ import { IParkingStructure } from "../../entities/ParkingRepositoryEntities"; import { HistoricalParkingAverageQueryResult, HistoricalParkingAverageFilterArguments } from "./ParkingGetterRepository"; import { BaseRedisRepository } from "../BaseRedisRepository"; import { PARKING_LOGGING_INTERVAL_MS } from "../../environment"; +import { RedisClientType } from "redis"; +import createRedisClientForRepository from "../../helpers/createRedisClientForRepository"; export type ParkingStructureID = string; @@ -10,6 +12,13 @@ export class RedisParkingRepository extends BaseRedisRepository implements Parki private dataLastAdded: Map = new Map(); private loggingIntervalMs = PARKING_LOGGING_INTERVAL_MS; + constructor( + redisClient: RedisClientType = createRedisClientForRepository(), + systemId: string = '', + ) { + super(redisClient, systemId); + } + addOrUpdateParkingStructure = async (structure: IParkingStructure): Promise => { const keys = this.createRedisKeys(structure.id); await this.redisClient.hSet(keys.structure, this.createRedisHashFromStructure(structure)); @@ -28,8 +37,8 @@ export class RedisParkingRepository extends BaseRedisRepository implements Parki }; clearParkingStructureData = async (): Promise => { - const structureKeys = await this.redisClient.keys('parking:structure:*'); - const timeSeriesKeys = await this.redisClient.keys('parking:timeseries:*'); + const structureKeys = await this.redisClient.keys(this.prefixKey('parking:structure:*')); + const timeSeriesKeys = await this.redisClient.keys(this.prefixKey('parking:timeseries:*')); const allKeys = [...structureKeys, ...timeSeriesKeys]; if (allKeys.length > 0) { @@ -51,7 +60,7 @@ export class RedisParkingRepository extends BaseRedisRepository implements Parki }; getParkingStructures = async (): Promise => { - const keys = await this.redisClient.keys('parking:structure:*'); + const keys = await this.redisClient.keys(this.prefixKey('parking:structure:*')); const structures: IParkingStructure[] = []; for (const key of keys) { @@ -80,8 +89,8 @@ export class RedisParkingRepository extends BaseRedisRepository implements Parki }; private createRedisKeys = (structureId: string) => ({ - structure: `parking:structure:${structureId}`, - timeSeries: `parking:timeseries:${structureId}` + structure: this.prefixKey(`parking:structure:${structureId}`), + timeSeries: this.prefixKey(`parking:timeseries:${structureId}`), }); private createRedisHashFromStructure = (structure: IParkingStructure): Record => ({ diff --git a/src/repositories/parking/__tests__/ParkingRepositorySharedTests.test.ts b/src/repositories/parking/__tests__/ParkingRepositorySharedTests.test.ts index d9b4162..79fdaad 100644 --- a/src/repositories/parking/__tests__/ParkingRepositorySharedTests.test.ts +++ b/src/repositories/parking/__tests__/ParkingRepositorySharedTests.test.ts @@ -25,7 +25,7 @@ class RedisParkingRepositoryHolder implements RepositoryHolder { diff --git a/src/repositories/shuttle/RedisShuttleRepository.ts b/src/repositories/shuttle/RedisShuttleRepository.ts index 3fb63c0..ac0a9c1 100644 --- a/src/repositories/shuttle/RedisShuttleRepository.ts +++ b/src/repositories/shuttle/RedisShuttleRepository.ts @@ -17,8 +17,9 @@ export class RedisShuttleRepository extends BaseRedisRepository implements Shutt constructor( redisClient: RedisClientType = createRedisClientForRepository(), readonly shuttleStopArrivalDegreeDelta: number = 0.001, + systemId: string = '', ) { - super(redisClient); + super(redisClient, systemId); } get isReady() { @@ -83,24 +84,24 @@ export class RedisShuttleRepository extends BaseRedisRepository implements Shutt } // Key prefixes for individual entity keys - private readonly stopKeyPrefix = 'shuttle:stop:'; - private readonly routeKeyPrefix = 'shuttle:route:'; - private readonly shuttleKeyPrefix = 'shuttle:shuttle:'; - private readonly orderedStopKeyPrefix = 'shuttle:orderedstop:'; - private readonly lastStopKeyPrefix = 'shuttle:laststop:'; - private readonly historicalEtaKeyPrefix = 'shuttle:eta:historical:'; + private get stopKeyPrefix() { return this.prefixKey('shuttle:stop:'); } + private get routeKeyPrefix() { return this.prefixKey('shuttle:route:'); } + private get shuttleKeyPrefix() { return this.prefixKey('shuttle:shuttle:'); } + private get orderedStopKeyPrefix() { return this.prefixKey('shuttle:orderedstop:'); } + private get lastStopKeyPrefix() { return this.prefixKey('shuttle:laststop:'); } + private get historicalEtaKeyPrefix() { return this.prefixKey('shuttle:eta:historical:'); } // Key patterns for bulk operations (e.g., getting all keys, clearing data) - private readonly stopKeyPattern = 'shuttle:stop:*'; - private readonly routeKeyPattern = 'shuttle:route:*'; - private readonly shuttleKeyPattern = 'shuttle:shuttle:*'; - private readonly orderedStopKeyPattern = 'shuttle:orderedstop:*'; - private readonly lastStopKeyPattern = 'shuttle:laststop:*'; + private get stopKeyPattern() { return this.prefixKey('shuttle:stop:*'); } + private get routeKeyPattern() { return this.prefixKey('shuttle:route:*'); } + private get shuttleKeyPattern() { return this.prefixKey('shuttle:shuttle:*'); } + private get orderedStopKeyPattern() { return this.prefixKey('shuttle:orderedstop:*'); } + private get lastStopKeyPattern() { return this.prefixKey('shuttle:laststop:*'); } /** * Represents a set storing the shuttles that are currently at a stop. */ - private readonly shuttleIsAtStopKey = 'shuttle:atstop'; + private get shuttleIsAtStopKey() { return this.prefixKey('shuttle:atstop'); } // Helper methods for Redis key generation private readonly createStopKey = (stopId: string) => `${this.stopKeyPrefix}${stopId}`; diff --git a/src/repositories/shuttle/__tests__/ShuttleRepositorySharedTests.test.ts b/src/repositories/shuttle/__tests__/ShuttleRepositorySharedTests.test.ts index 8520999..09255f0 100644 --- a/src/repositories/shuttle/__tests__/ShuttleRepositorySharedTests.test.ts +++ b/src/repositories/shuttle/__tests__/ShuttleRepositorySharedTests.test.ts @@ -31,7 +31,7 @@ class RedisShuttleRepositoryHolder implements RepositoryHolder { diff --git a/src/repositories/shuttle/eta/BaseRedisETARepository.ts b/src/repositories/shuttle/eta/BaseRedisETARepository.ts index 7fef6ee..359bf3e 100644 --- a/src/repositories/shuttle/eta/BaseRedisETARepository.ts +++ b/src/repositories/shuttle/eta/BaseRedisETARepository.ts @@ -3,11 +3,11 @@ import { BaseRedisRepository } from "../../BaseRedisRepository"; import { ETAGetterRepository, ETARepositoryEvent, ETARepositoryEventListener, ETARepositoryEventName } from "./ETAGetterRepository"; export abstract class BaseRedisETARepository extends BaseRedisRepository implements ETAGetterRepository { - private static readonly ETA_KEY_PREFIX = 'shuttle:eta:'; + private get etaKeyPrefix() { return this.prefixKey('shuttle:eta:'); } // Helper methods protected createEtaKey = (shuttleId: string, stopId: string) => - `${BaseRedisETARepository.ETA_KEY_PREFIX}${shuttleId}:${stopId}`; + `${this.etaKeyPrefix}${shuttleId}:${stopId}`; createRedisHashFromEta = (eta: IEta): Record => ({ secondsRemaining: eta.secondsRemaining.toString(), @@ -27,7 +27,7 @@ export abstract class BaseRedisETARepository extends BaseRedisRepository impleme // Getter implementations async getEtasForShuttleId(shuttleId: string): Promise { - const keys = await this.redisClient.keys(`${BaseRedisETARepository.ETA_KEY_PREFIX}${shuttleId}:*`); + const keys = await this.redisClient.keys(`${this.etaKeyPrefix}${shuttleId}:*`); const etas: IEta[] = []; for (const key of keys) { @@ -41,7 +41,7 @@ export abstract class BaseRedisETARepository extends BaseRedisRepository impleme } async getEtasForStopId(stopId: string): Promise { - const keys = await this.redisClient.keys(`${BaseRedisETARepository.ETA_KEY_PREFIX}*`); + const keys = await this.redisClient.keys(`${this.etaKeyPrefix}*`); const etas: IEta[] = []; for (const key of keys) { diff --git a/src/repositories/shuttle/eta/RedisExternalSourceETARepository.ts b/src/repositories/shuttle/eta/RedisExternalSourceETARepository.ts index 298709b..fbedc91 100644 --- a/src/repositories/shuttle/eta/RedisExternalSourceETARepository.ts +++ b/src/repositories/shuttle/eta/RedisExternalSourceETARepository.ts @@ -2,8 +2,17 @@ import { IEta } from "../../../entities/ShuttleRepositoryEntities"; import { BaseRedisETARepository } from "./BaseRedisETARepository"; import { ExternalSourceETARepository } from "./ExternalSourceETARepository"; import { ETARepositoryEvent } from "./ETAGetterRepository"; +import { RedisClientType } from "redis"; +import createRedisClientForRepository from "../../../helpers/createRedisClientForRepository"; export class RedisExternalSourceETARepository extends BaseRedisETARepository implements ExternalSourceETARepository { + constructor( + redisClient: RedisClientType = createRedisClientForRepository(), + systemId: string = '', + ) { + super(redisClient, systemId); + } + async addOrUpdateEtaFromExternalSource(eta: IEta): Promise { await this.addOrUpdateEta(eta); } diff --git a/src/repositories/shuttle/eta/RedisSelfUpdatingETARepository.ts b/src/repositories/shuttle/eta/RedisSelfUpdatingETARepository.ts index cabf5a3..d5f1516 100644 --- a/src/repositories/shuttle/eta/RedisSelfUpdatingETARepository.ts +++ b/src/repositories/shuttle/eta/RedisSelfUpdatingETARepository.ts @@ -13,8 +13,9 @@ export class RedisSelfUpdatingETARepository extends BaseRedisETARepository imple readonly shuttleRepository: ShuttleGetterRepository, redisClient: RedisClientType = createRedisClientForRepository(), private referenceTime: Date | null = null, + systemId: string = '', ) { - super(redisClient); + super(redisClient, systemId); this.setReferenceTime = this.setReferenceTime.bind(this); this.getAverageTravelTimeSeconds = this.getAverageTravelTimeSeconds.bind(this); @@ -28,7 +29,7 @@ export class RedisSelfUpdatingETARepository extends BaseRedisETARepository imple } private createHistoricalEtaTimeSeriesKey = (routeId: string, fromStopId: string, toStopId: string) => { - return `shuttle:eta:historical:${routeId}:${fromStopId}:${toStopId}`; + return this.prefixKey(`shuttle:eta:historical:${routeId}:${fromStopId}:${toStopId}`); } setReferenceTime(referenceTime: Date) { diff --git a/src/repositories/shuttle/eta/__tests__/ExternalSourceETARepositorySharedTests.test.ts b/src/repositories/shuttle/eta/__tests__/ExternalSourceETARepositorySharedTests.test.ts index 73aac3a..1f48f83 100644 --- a/src/repositories/shuttle/eta/__tests__/ExternalSourceETARepositorySharedTests.test.ts +++ b/src/repositories/shuttle/eta/__tests__/ExternalSourceETARepositorySharedTests.test.ts @@ -16,7 +16,7 @@ class RedisExternalSourceETARepositoryHolder implements RepositoryHolder { diff --git a/src/repositories/shuttle/eta/__tests__/SelfUpdatingETARepositorySharedTests.test.ts b/src/repositories/shuttle/eta/__tests__/SelfUpdatingETARepositorySharedTests.test.ts index 6d5d9b1..9b47c6a 100644 --- a/src/repositories/shuttle/eta/__tests__/SelfUpdatingETARepositorySharedTests.test.ts +++ b/src/repositories/shuttle/eta/__tests__/SelfUpdatingETARepositorySharedTests.test.ts @@ -22,10 +22,12 @@ class RedisSelfUpdatingETARepositoryHolder implements RepositoryHolder