diff --git a/src/repositories/shuttle/RedisShuttleRepository.ts b/src/repositories/shuttle/RedisShuttleRepository.ts new file mode 100644 index 0000000..cf6e063 --- /dev/null +++ b/src/repositories/shuttle/RedisShuttleRepository.ts @@ -0,0 +1,530 @@ +import EventEmitter from "node:events"; +import { createClient } from 'redis'; +import { ShuttleGetterSetterRepository } from "./ShuttleGetterSetterRepository"; +import { IEta, IOrderedStop, IRoute, IShuttle, IStop } from "../../entities/ShuttleRepositoryEntities"; +import { REDIS_RECONNECT_INTERVAL } from "../../environment"; +import { + ShuttleRepositoryEvent, + ShuttleRepositoryEventListener, + ShuttleRepositoryEventName, + ShuttleRepositoryEventPayloads, +} from "./ShuttleGetterRepository"; + +export class RedisShuttleRepository extends EventEmitter implements ShuttleGetterSetterRepository { + protected redisClient; + + constructor( + redisClient = createClient({ + url: process.env.REDIS_URL, + socket: { + tls: process.env.NODE_ENV === 'production', + rejectUnauthorized: false, + reconnectStrategy: REDIS_RECONNECT_INTERVAL, + }, + }), + ) { + super(); + this.redisClient = redisClient; + this.redisClient.on('error', (err) => { + console.error(err.stack); + }); + } + + get isReady() { + return this.redisClient.isReady; + } + + public async connect() { + await this.redisClient.connect(); + } + + public async disconnect() { + await this.redisClient.disconnect(); + } + + public async clearAllData() { + await this.redisClient.flushAll(); + } + // EventEmitter override methods for type safety + public override on( + event: T, + listener: ShuttleRepositoryEventListener, + ): this; + public override on(event: string | symbol, listener: (...args: any[]) => void): this { + return super.on(event, listener); + } + + public override once( + event: T, + listener: ShuttleRepositoryEventListener, + ): this; + public override once(event: string | symbol, listener: (...args: any[]) => void): this { + return super.once(event, listener); + } + + public override off( + event: T, + listener: ShuttleRepositoryEventListener, + ): this; + public override off(event: string | symbol, listener: (...args: any[]) => void): this { + return super.off(event, listener); + } + + public override addListener( + event: T, + listener: ShuttleRepositoryEventListener, + ): this; + public override addListener(event: string | symbol, listener: (...args: any[]) => void): this { + return super.addListener(event, listener); + } + + public override removeListener( + event: T, + listener: ShuttleRepositoryEventListener, + ): this; + public override removeListener(event: string | symbol, listener: (...args: any[]) => void): this { + return super.removeListener(event, listener); + } + + public override emit( + event: T, + payload: ShuttleRepositoryEventPayloads[T], + ): boolean; + public override emit(event: string | symbol, ...args: any[]): boolean { + return super.emit(event, ...args); + } + + // Helper methods for Redis key generation + private createStopKey = (stopId: string) => `shuttle:stop:${stopId}`; + private createRouteKey = (routeId: string) => `shuttle:route:${routeId}`; + private createShuttleKey = (shuttleId: string) => `shuttle:shuttle:${shuttleId}`; + private createEtaKey = (shuttleId: string, stopId: string) => `shuttle:eta:${shuttleId}:${stopId}`; + private createOrderedStopKey = (routeId: string, stopId: string) => `shuttle:orderedstop:${routeId}:${stopId}`; + + // Helper methods for converting entities to Redis hashes + private createRedisHashFromStop = (stop: IStop): Record => ({ + id: stop.id, + name: stop.name, + systemId: stop.systemId, + latitude: stop.coordinates.latitude.toString(), + longitude: stop.coordinates.longitude.toString(), + updatedTime: stop.updatedTime.toISOString(), + }); + + private createStopFromRedisData = (data: Record): IStop => ({ + id: data.id, + name: data.name, + systemId: data.systemId, + coordinates: { + latitude: parseFloat(data.latitude), + longitude: parseFloat(data.longitude), + }, + updatedTime: new Date(data.updatedTime), + }); + + private createRedisHashFromRoute = (route: IRoute): Record => ({ + id: route.id, + name: route.name, + color: route.color, + systemId: route.systemId, + polylineCoordinates: JSON.stringify(route.polylineCoordinates), + updatedTime: route.updatedTime.toISOString(), + }); + + private createRouteFromRedisData = (data: Record): IRoute => ({ + id: data.id, + name: data.name, + color: data.color, + systemId: data.systemId, + polylineCoordinates: JSON.parse(data.polylineCoordinates), + updatedTime: new Date(data.updatedTime), + }); + + private createRedisHashFromShuttle = (shuttle: IShuttle): Record => ({ + id: shuttle.id, + name: shuttle.name, + routeId: shuttle.routeId, + systemId: shuttle.systemId, + latitude: shuttle.coordinates.latitude.toString(), + longitude: shuttle.coordinates.longitude.toString(), + orientationInDegrees: shuttle.orientationInDegrees.toString(), + updatedTime: shuttle.updatedTime.toISOString(), + }); + + private createShuttleFromRedisData = (data: Record): IShuttle => ({ + id: data.id, + name: data.name, + routeId: data.routeId, + systemId: data.systemId, + coordinates: { + latitude: parseFloat(data.latitude), + longitude: parseFloat(data.longitude), + }, + orientationInDegrees: parseFloat(data.orientationInDegrees), + updatedTime: new Date(data.updatedTime), + }); + + private createRedisHashFromEta = (eta: IEta): Record => ({ + secondsRemaining: eta.secondsRemaining.toString(), + shuttleId: eta.shuttleId, + stopId: eta.stopId, + systemId: eta.systemId, + updatedTime: eta.updatedTime.toISOString(), + }); + + private createEtaFromRedisData = (data: Record): IEta => ({ + secondsRemaining: parseFloat(data.secondsRemaining), + shuttleId: data.shuttleId, + stopId: data.stopId, + systemId: data.systemId, + updatedTime: new Date(data.updatedTime), + }); + + private createRedisHashFromOrderedStop = (orderedStop: IOrderedStop): Record => { + const hash: Record = { + routeId: orderedStop.routeId, + stopId: orderedStop.stopId, + position: orderedStop.position.toString(), + systemId: orderedStop.systemId, + updatedTime: orderedStop.updatedTime.toISOString(), + }; + + if (orderedStop.nextStop) { + hash.nextStopRouteId = orderedStop.nextStop.routeId; + hash.nextStopStopId = orderedStop.nextStop.stopId; + } + + if (orderedStop.previousStop) { + hash.previousStopRouteId = orderedStop.previousStop.routeId; + hash.previousStopStopId = orderedStop.previousStop.stopId; + } + + return hash; + }; + + private createOrderedStopFromRedisData = (data: Record): IOrderedStop => { + const orderedStop: IOrderedStop = { + routeId: data.routeId, + stopId: data.stopId, + position: parseInt(data.position), + systemId: data.systemId, + updatedTime: new Date(data.updatedTime), + }; + + // Note: We only store the IDs of next/previous stops, not full objects + // to avoid circular references in Redis. These would need to be + // resolved separately if needed. + if (data.nextStopRouteId && data.nextStopStopId) { + orderedStop.nextStop = { + routeId: data.nextStopRouteId, + stopId: data.nextStopStopId, + position: 0, // placeholder + systemId: data.systemId, + updatedTime: new Date(), + }; + } + + if (data.previousStopRouteId && data.previousStopStopId) { + orderedStop.previousStop = { + routeId: data.previousStopRouteId, + stopId: data.previousStopStopId, + position: 0, // placeholder + systemId: data.systemId, + updatedTime: new Date(), + }; + } + + return orderedStop; + }; + + // Getter methods + public async getStops(): Promise { + const keys = await this.redisClient.keys('shuttle:stop:*'); + const stops: IStop[] = []; + + for (const key of keys) { + const data = await this.redisClient.hGetAll(key); + if (Object.keys(data).length > 0) { + stops.push(this.createStopFromRedisData(data)); + } + } + + return stops; + } + + public async getStopById(stopId: string): Promise { + const key = this.createStopKey(stopId); + const data = await this.redisClient.hGetAll(key); + + if (Object.keys(data).length === 0) { + return null; + } + + return this.createStopFromRedisData(data); + } + + public async getRoutes(): Promise { + const keys = await this.redisClient.keys('shuttle:route:*'); + const routes: IRoute[] = []; + + for (const key of keys) { + const data = await this.redisClient.hGetAll(key); + if (Object.keys(data).length > 0) { + routes.push(this.createRouteFromRedisData(data)); + } + } + + return routes; + } + + public async getRouteById(routeId: string): Promise { + const key = this.createRouteKey(routeId); + const data = await this.redisClient.hGetAll(key); + + if (Object.keys(data).length === 0) { + return null; + } + + return this.createRouteFromRedisData(data); + } + + public async getShuttles(): Promise { + const keys = await this.redisClient.keys('shuttle:shuttle:*'); + const shuttles: IShuttle[] = []; + + for (const key of keys) { + const data = await this.redisClient.hGetAll(key); + if (Object.keys(data).length > 0) { + shuttles.push(this.createShuttleFromRedisData(data)); + } + } + + return shuttles; + } + + public async getShuttlesByRouteId(routeId: string): Promise { + const allShuttles = await this.getShuttles(); + return allShuttles.filter(shuttle => shuttle.routeId === routeId); + } + + public async getShuttleById(shuttleId: string): Promise { + const key = this.createShuttleKey(shuttleId); + const data = await this.redisClient.hGetAll(key); + + if (Object.keys(data).length === 0) { + return null; + } + + return this.createShuttleFromRedisData(data); + } + + public async getEtasForShuttleId(shuttleId: string): Promise { + const keys = await this.redisClient.keys(`shuttle:eta:${shuttleId}:*`); + const etas: IEta[] = []; + + for (const key of keys) { + const data = await this.redisClient.hGetAll(key); + if (Object.keys(data).length > 0) { + etas.push(this.createEtaFromRedisData(data)); + } + } + + return etas; + } + + public async getEtasForStopId(stopId: string): Promise { + const keys = await this.redisClient.keys('shuttle:eta:*'); + const etas: IEta[] = []; + + for (const key of keys) { + const data = await this.redisClient.hGetAll(key); + if (Object.keys(data).length > 0 && data.stopId === stopId) { + etas.push(this.createEtaFromRedisData(data)); + } + } + + return etas; + } + + public async getEtaForShuttleAndStopId(shuttleId: string, stopId: string): Promise { + const key = this.createEtaKey(shuttleId, stopId); + const data = await this.redisClient.hGetAll(key); + + if (Object.keys(data).length === 0) { + return null; + } + + return this.createEtaFromRedisData(data); + } + + public async getOrderedStopByRouteAndStopId(routeId: string, stopId: string): Promise { + const key = this.createOrderedStopKey(routeId, stopId); + const data = await this.redisClient.hGetAll(key); + + if (Object.keys(data).length === 0) { + return null; + } + + return this.createOrderedStopFromRedisData(data); + } + + public async getOrderedStopsByStopId(stopId: string): Promise { + const keys = await this.redisClient.keys('shuttle:orderedstop:*'); + const orderedStops: IOrderedStop[] = []; + + for (const key of keys) { + const data = await this.redisClient.hGetAll(key); + if (Object.keys(data).length > 0 && data.stopId === stopId) { + orderedStops.push(this.createOrderedStopFromRedisData(data)); + } + } + + return orderedStops; + } + + public async getOrderedStopsByRouteId(routeId: string): Promise { + const keys = await this.redisClient.keys(`shuttle:orderedstop:${routeId}:*`); + const orderedStops: IOrderedStop[] = []; + + for (const key of keys) { + const data = await this.redisClient.hGetAll(key); + if (Object.keys(data).length > 0) { + orderedStops.push(this.createOrderedStopFromRedisData(data)); + } + } + + return orderedStops; + } + + // Setter/update methods + public async addOrUpdateRoute(route: IRoute): Promise { + const key = this.createRouteKey(route.id); + await this.redisClient.hSet(key, this.createRedisHashFromRoute(route)); + } + + public async addOrUpdateShuttle(shuttle: IShuttle): Promise { + const key = this.createShuttleKey(shuttle.id); + await this.redisClient.hSet(key, this.createRedisHashFromShuttle(shuttle)); + } + + public async addOrUpdateStop(stop: IStop): Promise { + const key = this.createStopKey(stop.id); + await this.redisClient.hSet(key, this.createRedisHashFromStop(stop)); + } + + public async addOrUpdateOrderedStop(orderedStop: IOrderedStop): Promise { + const key = this.createOrderedStopKey(orderedStop.routeId, orderedStop.stopId); + await this.redisClient.hSet(key, this.createRedisHashFromOrderedStop(orderedStop)); + } + + public async addOrUpdateEta(eta: IEta): Promise { + const key = this.createEtaKey(eta.shuttleId, eta.stopId); + await this.redisClient.hSet(key, this.createRedisHashFromEta(eta)); + this.emit(ShuttleRepositoryEvent.ETA_UPDATED, eta); + } + + // Remove methods + public async removeRouteIfExists(routeId: string): Promise { + const route = await this.getRouteById(routeId); + if (route) { + const key = this.createRouteKey(routeId); + await this.redisClient.del(key); + return route; + } + return null; + } + + public async removeShuttleIfExists(shuttleId: string): Promise { + const shuttle = await this.getShuttleById(shuttleId); + if (shuttle) { + const key = this.createShuttleKey(shuttleId); + await this.redisClient.del(key); + return shuttle; + } + return null; + } + + public async removeStopIfExists(stopId: string): Promise { + const stop = await this.getStopById(stopId); + if (stop) { + const key = this.createStopKey(stopId); + await this.redisClient.del(key); + return stop; + } + return null; + } + + public async removeOrderedStopIfExists(stopId: string, routeId: string): Promise { + const orderedStop = await this.getOrderedStopByRouteAndStopId(routeId, stopId); + if (orderedStop) { + const key = this.createOrderedStopKey(routeId, stopId); + await this.redisClient.del(key); + return orderedStop; + } + return null; + } + + public async removeEtaIfExists(shuttleId: string, stopId: string): Promise { + const eta = await this.getEtaForShuttleAndStopId(shuttleId, stopId); + if (eta) { + const key = this.createEtaKey(shuttleId, stopId); + await this.redisClient.del(key); + this.emit(ShuttleRepositoryEvent.ETA_REMOVED, eta); + return eta; + } + return null; + } + + // Clear methods + public async clearShuttleData(): Promise { + const keys = await this.redisClient.keys('shuttle:shuttle:*'); + if (keys.length > 0) { + await this.redisClient.del(keys); + } + } + + public async clearEtaData(): Promise { + const removedEtas = await this.getAllEtas(); + const keys = await this.redisClient.keys('shuttle:eta:*'); + if (keys.length > 0) { + await this.redisClient.del(keys); + } + this.emit(ShuttleRepositoryEvent.ETA_DATA_CLEARED, removedEtas); + } + + public async clearOrderedStopData(): Promise { + const keys = await this.redisClient.keys('shuttle:orderedstop:*'); + if (keys.length > 0) { + await this.redisClient.del(keys); + } + } + + public async clearRouteData(): Promise { + const keys = await this.redisClient.keys('shuttle:route:*'); + if (keys.length > 0) { + await this.redisClient.del(keys); + } + } + + public async clearStopData(): Promise { + const keys = await this.redisClient.keys('shuttle:stop:*'); + if (keys.length > 0) { + await this.redisClient.del(keys); + } + } + + // Helper method to get all ETAs for the clearEtaData event + private async getAllEtas(): Promise { + const keys = await this.redisClient.keys('shuttle:eta:*'); + const etas: IEta[] = []; + + for (const key of keys) { + const data = await this.redisClient.hGetAll(key); + if (Object.keys(data).length > 0) { + etas.push(this.createEtaFromRedisData(data)); + } + } + + return etas; + } +} diff --git a/src/repositories/shuttle/__tests__/UnoptimizedInMemoryShuttleRepositoryTests.test.ts b/src/repositories/shuttle/__tests__/ShuttleRepositorySharedTests.test.ts similarity index 89% rename from src/repositories/shuttle/__tests__/UnoptimizedInMemoryShuttleRepositoryTests.test.ts rename to src/repositories/shuttle/__tests__/ShuttleRepositorySharedTests.test.ts index c5c8d7c..f5c058f 100644 --- a/src/repositories/shuttle/__tests__/UnoptimizedInMemoryShuttleRepositoryTests.test.ts +++ b/src/repositories/shuttle/__tests__/ShuttleRepositorySharedTests.test.ts @@ -1,5 +1,7 @@ -import { beforeEach, describe, expect, jest, test } from "@jest/globals"; +import { afterEach, beforeEach, describe, expect, jest, test } from "@jest/globals"; import { UnoptimizedInMemoryShuttleRepository } from "../UnoptimizedInMemoryShuttleRepository"; +import { ShuttleGetterSetterRepository } from "../ShuttleGetterSetterRepository"; +import { RedisShuttleRepository } from "../RedisShuttleRepository"; import { ShuttleRepositoryEvent } from "../ShuttleGetterRepository"; import { generateMockEtas, @@ -9,16 +11,53 @@ import { generateMockStops, } from "../../../../testHelpers/mockDataGenerators"; -// For repositories created in the future, reuse core testing -// logic from here and differentiate setup (e.g. creating mocks) -// Do this by creating a function which takes a ShuttleGetterRepository -// or ShuttleGetterSetterRepository instance +interface RepositoryHolder { + name: string; + factory(): Promise; + teardown(): Promise; +} -describe("UnoptimizedInMemoryRepository", () => { - let repository: UnoptimizedInMemoryShuttleRepository; +class UnoptimizedInMemoryShuttleRepositoryHolder implements RepositoryHolder { + name = 'UnoptimizedInMemoryShuttleRepository'; + factory = async () => { + return new UnoptimizedInMemoryShuttleRepository(); + }; + teardown = async () => {}; +} - beforeEach(() => { - repository = new UnoptimizedInMemoryShuttleRepository(); +class RedisShuttleRepositoryHolder implements RepositoryHolder { + repo: RedisShuttleRepository | undefined; + + name = 'RedisShuttleRepository'; + factory = async () => { + this.repo = new RedisShuttleRepository(); + await this.repo.connect(); + return this.repo; + }; + teardown = async () => { + if (this.repo) { + await this.repo.clearAllData(); + await this.repo.disconnect(); + } + }; +} + +const repositoryImplementations = [ + new UnoptimizedInMemoryShuttleRepositoryHolder(), + new RedisShuttleRepositoryHolder(), +]; + +describe.each(repositoryImplementations)('$name', (holder) => { + let repository: ShuttleGetterSetterRepository; + + beforeEach(async () => { + repository = await holder.factory(); + jest.useRealTimers(); + }); + + afterEach(async () => { + await holder.teardown(); + jest.useRealTimers(); }); describe("getStops", () => { @@ -29,7 +68,8 @@ describe("UnoptimizedInMemoryRepository", () => { } const result = await repository.getStops(); - expect(result).toEqual(mockStops); + expect(result).toHaveLength(mockStops.length); + expect(result).toEqual(expect.arrayContaining(mockStops)); }); test("returns an empty list if there are no stops for the given system ID", async () => { @@ -62,7 +102,8 @@ describe("UnoptimizedInMemoryRepository", () => { } const result = await repository.getRoutes(); - expect(result).toEqual(mockRoutes); + expect(result).toHaveLength(mockRoutes.length); + expect(result).toEqual(expect.arrayContaining(mockRoutes)); }); test("returns an empty list if there are no routes for the system ID", async () => { @@ -86,6 +127,7 @@ describe("UnoptimizedInMemoryRepository", () => { expect(result).toBeNull(); }); }); + describe("getShuttles", () => { test("gets all shuttles for a specific system ID", async () => { const mockShuttles = generateMockShuttles(); @@ -94,7 +136,8 @@ describe("UnoptimizedInMemoryRepository", () => { } const result = await repository.getShuttles(); - expect(result).toEqual(mockShuttles); + expect(result).toHaveLength(mockShuttles.length); + expect(result).toEqual(expect.arrayContaining(mockShuttles)); }); test("returns an empty list if there are no shuttles for the system ID", async () => { @@ -145,7 +188,9 @@ describe("UnoptimizedInMemoryRepository", () => { } const result = await repository.getEtasForShuttleId("sh1"); - expect(result).toEqual(mockEtas.filter((eta) => eta.shuttleId === "sh1")); + const expected = mockEtas.filter((eta) => eta.shuttleId === "sh1"); + expect(result).toHaveLength(expected.length); + expect(result).toEqual(expect.arrayContaining(expected)); }); test("returns an empty list if there are no ETAs for the shuttle ID", async () => { @@ -240,7 +285,7 @@ describe("UnoptimizedInMemoryRepository", () => { repository.off(ShuttleRepositoryEvent.ETA_UPDATED, () => {}); await repository.addOrUpdateEta(mockEtas[0]); - + expect(mockListener).toHaveBeenCalledTimes(1); expect(mockListener).toHaveBeenCalledWith(mockEtas[0]); }); @@ -277,7 +322,9 @@ describe("UnoptimizedInMemoryRepository", () => { } const result = await repository.getOrderedStopsByStopId("st1"); - expect(result).toEqual(mockOrderedStops.filter((os) => os.stopId === "st1")); + const expected = mockOrderedStops.filter((os) => os.stopId === "st1"); + expect(result).toHaveLength(expected.length); + expect(result).toEqual(expect.arrayContaining(expected)); }); test("returns an empty list if there are no ordered stops for the stop ID", async () => { @@ -294,7 +341,9 @@ describe("UnoptimizedInMemoryRepository", () => { } const result = await repository.getOrderedStopsByRouteId("r1"); - expect(result).toEqual(mockOrderedStops.filter((os) => os.routeId === "r1")); + const expected = mockOrderedStops.filter((os) => os.routeId === "r1"); + expect(result).toHaveLength(expected.length); + expect(result).toEqual(expect.arrayContaining(expected)); }); test("returns an empty list if there are no ordered stops for the route ID", async () => { @@ -641,13 +690,15 @@ describe("UnoptimizedInMemoryRepository", () => { await repository.clearEtaData(); expect(listener).toHaveBeenCalledTimes(1); - expect(listener).toHaveBeenCalledWith(mockEtas); + const emittedEtas = listener.mock.calls[0][0]; + expect(emittedEtas).toHaveLength(mockEtas.length); + expect(emittedEtas).toEqual(expect.arrayContaining(mockEtas)); }); }); describe("clearOrderedStopData", () => { test("clears all ordered stops from the repository", async () => { - const mockOrderedStops = await generateMockOrderedStops(); + const mockOrderedStops = generateMockOrderedStops(); for (const system of mockOrderedStops) { await repository.addOrUpdateOrderedStop(system); }