Add RedisShuttleRepository and ShuttleRepositorySharedTests, like pattern for parking data

This commit is contained in:
2025-11-03 10:34:21 -08:00
parent 72f596821a
commit 4b4715cdb2
2 changed files with 599 additions and 18 deletions

View File

@@ -0,0 +1,530 @@
import EventEmitter from "node:events";
import { createClient } from 'redis';
import { ShuttleGetterSetterRepository } from "./ShuttleGetterSetterRepository";
import { IEta, IOrderedStop, IRoute, IShuttle, IStop } from "../../entities/ShuttleRepositoryEntities";
import { REDIS_RECONNECT_INTERVAL } from "../../environment";
import {
ShuttleRepositoryEvent,
ShuttleRepositoryEventListener,
ShuttleRepositoryEventName,
ShuttleRepositoryEventPayloads,
} from "./ShuttleGetterRepository";
export class RedisShuttleRepository extends EventEmitter implements ShuttleGetterSetterRepository {
protected redisClient;
constructor(
redisClient = createClient({
url: process.env.REDIS_URL,
socket: {
tls: process.env.NODE_ENV === 'production',
rejectUnauthorized: false,
reconnectStrategy: REDIS_RECONNECT_INTERVAL,
},
}),
) {
super();
this.redisClient = redisClient;
this.redisClient.on('error', (err) => {
console.error(err.stack);
});
}
get isReady() {
return this.redisClient.isReady;
}
public async connect() {
await this.redisClient.connect();
}
public async disconnect() {
await this.redisClient.disconnect();
}
public async clearAllData() {
await this.redisClient.flushAll();
}
// EventEmitter override methods for type safety
public override on<T extends ShuttleRepositoryEventName>(
event: T,
listener: ShuttleRepositoryEventListener<T>,
): this;
public override on(event: string | symbol, listener: (...args: any[]) => void): this {
return super.on(event, listener);
}
public override once<T extends ShuttleRepositoryEventName>(
event: T,
listener: ShuttleRepositoryEventListener<T>,
): this;
public override once(event: string | symbol, listener: (...args: any[]) => void): this {
return super.once(event, listener);
}
public override off<T extends ShuttleRepositoryEventName>(
event: T,
listener: ShuttleRepositoryEventListener<T>,
): this;
public override off(event: string | symbol, listener: (...args: any[]) => void): this {
return super.off(event, listener);
}
public override addListener<T extends ShuttleRepositoryEventName>(
event: T,
listener: ShuttleRepositoryEventListener<T>,
): this;
public override addListener(event: string | symbol, listener: (...args: any[]) => void): this {
return super.addListener(event, listener);
}
public override removeListener<T extends ShuttleRepositoryEventName>(
event: T,
listener: ShuttleRepositoryEventListener<T>,
): this;
public override removeListener(event: string | symbol, listener: (...args: any[]) => void): this {
return super.removeListener(event, listener);
}
public override emit<T extends ShuttleRepositoryEventName>(
event: T,
payload: ShuttleRepositoryEventPayloads[T],
): boolean;
public override emit(event: string | symbol, ...args: any[]): boolean {
return super.emit(event, ...args);
}
// Helper methods for Redis key generation
private createStopKey = (stopId: string) => `shuttle:stop:${stopId}`;
private createRouteKey = (routeId: string) => `shuttle:route:${routeId}`;
private createShuttleKey = (shuttleId: string) => `shuttle:shuttle:${shuttleId}`;
private createEtaKey = (shuttleId: string, stopId: string) => `shuttle:eta:${shuttleId}:${stopId}`;
private createOrderedStopKey = (routeId: string, stopId: string) => `shuttle:orderedstop:${routeId}:${stopId}`;
// Helper methods for converting entities to Redis hashes
private createRedisHashFromStop = (stop: IStop): Record<string, string> => ({
id: stop.id,
name: stop.name,
systemId: stop.systemId,
latitude: stop.coordinates.latitude.toString(),
longitude: stop.coordinates.longitude.toString(),
updatedTime: stop.updatedTime.toISOString(),
});
private createStopFromRedisData = (data: Record<string, string>): IStop => ({
id: data.id,
name: data.name,
systemId: data.systemId,
coordinates: {
latitude: parseFloat(data.latitude),
longitude: parseFloat(data.longitude),
},
updatedTime: new Date(data.updatedTime),
});
private createRedisHashFromRoute = (route: IRoute): Record<string, string> => ({
id: route.id,
name: route.name,
color: route.color,
systemId: route.systemId,
polylineCoordinates: JSON.stringify(route.polylineCoordinates),
updatedTime: route.updatedTime.toISOString(),
});
private createRouteFromRedisData = (data: Record<string, string>): IRoute => ({
id: data.id,
name: data.name,
color: data.color,
systemId: data.systemId,
polylineCoordinates: JSON.parse(data.polylineCoordinates),
updatedTime: new Date(data.updatedTime),
});
private createRedisHashFromShuttle = (shuttle: IShuttle): Record<string, string> => ({
id: shuttle.id,
name: shuttle.name,
routeId: shuttle.routeId,
systemId: shuttle.systemId,
latitude: shuttle.coordinates.latitude.toString(),
longitude: shuttle.coordinates.longitude.toString(),
orientationInDegrees: shuttle.orientationInDegrees.toString(),
updatedTime: shuttle.updatedTime.toISOString(),
});
private createShuttleFromRedisData = (data: Record<string, string>): IShuttle => ({
id: data.id,
name: data.name,
routeId: data.routeId,
systemId: data.systemId,
coordinates: {
latitude: parseFloat(data.latitude),
longitude: parseFloat(data.longitude),
},
orientationInDegrees: parseFloat(data.orientationInDegrees),
updatedTime: new Date(data.updatedTime),
});
private createRedisHashFromEta = (eta: IEta): Record<string, string> => ({
secondsRemaining: eta.secondsRemaining.toString(),
shuttleId: eta.shuttleId,
stopId: eta.stopId,
systemId: eta.systemId,
updatedTime: eta.updatedTime.toISOString(),
});
private createEtaFromRedisData = (data: Record<string, string>): IEta => ({
secondsRemaining: parseFloat(data.secondsRemaining),
shuttleId: data.shuttleId,
stopId: data.stopId,
systemId: data.systemId,
updatedTime: new Date(data.updatedTime),
});
private createRedisHashFromOrderedStop = (orderedStop: IOrderedStop): Record<string, string> => {
const hash: Record<string, string> = {
routeId: orderedStop.routeId,
stopId: orderedStop.stopId,
position: orderedStop.position.toString(),
systemId: orderedStop.systemId,
updatedTime: orderedStop.updatedTime.toISOString(),
};
if (orderedStop.nextStop) {
hash.nextStopRouteId = orderedStop.nextStop.routeId;
hash.nextStopStopId = orderedStop.nextStop.stopId;
}
if (orderedStop.previousStop) {
hash.previousStopRouteId = orderedStop.previousStop.routeId;
hash.previousStopStopId = orderedStop.previousStop.stopId;
}
return hash;
};
private createOrderedStopFromRedisData = (data: Record<string, string>): IOrderedStop => {
const orderedStop: IOrderedStop = {
routeId: data.routeId,
stopId: data.stopId,
position: parseInt(data.position),
systemId: data.systemId,
updatedTime: new Date(data.updatedTime),
};
// Note: We only store the IDs of next/previous stops, not full objects
// to avoid circular references in Redis. These would need to be
// resolved separately if needed.
if (data.nextStopRouteId && data.nextStopStopId) {
orderedStop.nextStop = {
routeId: data.nextStopRouteId,
stopId: data.nextStopStopId,
position: 0, // placeholder
systemId: data.systemId,
updatedTime: new Date(),
};
}
if (data.previousStopRouteId && data.previousStopStopId) {
orderedStop.previousStop = {
routeId: data.previousStopRouteId,
stopId: data.previousStopStopId,
position: 0, // placeholder
systemId: data.systemId,
updatedTime: new Date(),
};
}
return orderedStop;
};
// Getter methods
public async getStops(): Promise<IStop[]> {
const keys = await this.redisClient.keys('shuttle:stop:*');
const stops: IStop[] = [];
for (const key of keys) {
const data = await this.redisClient.hGetAll(key);
if (Object.keys(data).length > 0) {
stops.push(this.createStopFromRedisData(data));
}
}
return stops;
}
public async getStopById(stopId: string): Promise<IStop | null> {
const key = this.createStopKey(stopId);
const data = await this.redisClient.hGetAll(key);
if (Object.keys(data).length === 0) {
return null;
}
return this.createStopFromRedisData(data);
}
public async getRoutes(): Promise<IRoute[]> {
const keys = await this.redisClient.keys('shuttle:route:*');
const routes: IRoute[] = [];
for (const key of keys) {
const data = await this.redisClient.hGetAll(key);
if (Object.keys(data).length > 0) {
routes.push(this.createRouteFromRedisData(data));
}
}
return routes;
}
public async getRouteById(routeId: string): Promise<IRoute | null> {
const key = this.createRouteKey(routeId);
const data = await this.redisClient.hGetAll(key);
if (Object.keys(data).length === 0) {
return null;
}
return this.createRouteFromRedisData(data);
}
public async getShuttles(): Promise<IShuttle[]> {
const keys = await this.redisClient.keys('shuttle:shuttle:*');
const shuttles: IShuttle[] = [];
for (const key of keys) {
const data = await this.redisClient.hGetAll(key);
if (Object.keys(data).length > 0) {
shuttles.push(this.createShuttleFromRedisData(data));
}
}
return shuttles;
}
public async getShuttlesByRouteId(routeId: string): Promise<IShuttle[]> {
const allShuttles = await this.getShuttles();
return allShuttles.filter(shuttle => shuttle.routeId === routeId);
}
public async getShuttleById(shuttleId: string): Promise<IShuttle | null> {
const key = this.createShuttleKey(shuttleId);
const data = await this.redisClient.hGetAll(key);
if (Object.keys(data).length === 0) {
return null;
}
return this.createShuttleFromRedisData(data);
}
public async getEtasForShuttleId(shuttleId: string): Promise<IEta[]> {
const keys = await this.redisClient.keys(`shuttle:eta:${shuttleId}:*`);
const etas: IEta[] = [];
for (const key of keys) {
const data = await this.redisClient.hGetAll(key);
if (Object.keys(data).length > 0) {
etas.push(this.createEtaFromRedisData(data));
}
}
return etas;
}
public async getEtasForStopId(stopId: string): Promise<IEta[]> {
const keys = await this.redisClient.keys('shuttle:eta:*');
const etas: IEta[] = [];
for (const key of keys) {
const data = await this.redisClient.hGetAll(key);
if (Object.keys(data).length > 0 && data.stopId === stopId) {
etas.push(this.createEtaFromRedisData(data));
}
}
return etas;
}
public async getEtaForShuttleAndStopId(shuttleId: string, stopId: string): Promise<IEta | null> {
const key = this.createEtaKey(shuttleId, stopId);
const data = await this.redisClient.hGetAll(key);
if (Object.keys(data).length === 0) {
return null;
}
return this.createEtaFromRedisData(data);
}
public async getOrderedStopByRouteAndStopId(routeId: string, stopId: string): Promise<IOrderedStop | null> {
const key = this.createOrderedStopKey(routeId, stopId);
const data = await this.redisClient.hGetAll(key);
if (Object.keys(data).length === 0) {
return null;
}
return this.createOrderedStopFromRedisData(data);
}
public async getOrderedStopsByStopId(stopId: string): Promise<IOrderedStop[]> {
const keys = await this.redisClient.keys('shuttle:orderedstop:*');
const orderedStops: IOrderedStop[] = [];
for (const key of keys) {
const data = await this.redisClient.hGetAll(key);
if (Object.keys(data).length > 0 && data.stopId === stopId) {
orderedStops.push(this.createOrderedStopFromRedisData(data));
}
}
return orderedStops;
}
public async getOrderedStopsByRouteId(routeId: string): Promise<IOrderedStop[]> {
const keys = await this.redisClient.keys(`shuttle:orderedstop:${routeId}:*`);
const orderedStops: IOrderedStop[] = [];
for (const key of keys) {
const data = await this.redisClient.hGetAll(key);
if (Object.keys(data).length > 0) {
orderedStops.push(this.createOrderedStopFromRedisData(data));
}
}
return orderedStops;
}
// Setter/update methods
public async addOrUpdateRoute(route: IRoute): Promise<void> {
const key = this.createRouteKey(route.id);
await this.redisClient.hSet(key, this.createRedisHashFromRoute(route));
}
public async addOrUpdateShuttle(shuttle: IShuttle): Promise<void> {
const key = this.createShuttleKey(shuttle.id);
await this.redisClient.hSet(key, this.createRedisHashFromShuttle(shuttle));
}
public async addOrUpdateStop(stop: IStop): Promise<void> {
const key = this.createStopKey(stop.id);
await this.redisClient.hSet(key, this.createRedisHashFromStop(stop));
}
public async addOrUpdateOrderedStop(orderedStop: IOrderedStop): Promise<void> {
const key = this.createOrderedStopKey(orderedStop.routeId, orderedStop.stopId);
await this.redisClient.hSet(key, this.createRedisHashFromOrderedStop(orderedStop));
}
public async addOrUpdateEta(eta: IEta): Promise<void> {
const key = this.createEtaKey(eta.shuttleId, eta.stopId);
await this.redisClient.hSet(key, this.createRedisHashFromEta(eta));
this.emit(ShuttleRepositoryEvent.ETA_UPDATED, eta);
}
// Remove methods
public async removeRouteIfExists(routeId: string): Promise<IRoute | null> {
const route = await this.getRouteById(routeId);
if (route) {
const key = this.createRouteKey(routeId);
await this.redisClient.del(key);
return route;
}
return null;
}
public async removeShuttleIfExists(shuttleId: string): Promise<IShuttle | null> {
const shuttle = await this.getShuttleById(shuttleId);
if (shuttle) {
const key = this.createShuttleKey(shuttleId);
await this.redisClient.del(key);
return shuttle;
}
return null;
}
public async removeStopIfExists(stopId: string): Promise<IStop | null> {
const stop = await this.getStopById(stopId);
if (stop) {
const key = this.createStopKey(stopId);
await this.redisClient.del(key);
return stop;
}
return null;
}
public async removeOrderedStopIfExists(stopId: string, routeId: string): Promise<IOrderedStop | null> {
const orderedStop = await this.getOrderedStopByRouteAndStopId(routeId, stopId);
if (orderedStop) {
const key = this.createOrderedStopKey(routeId, stopId);
await this.redisClient.del(key);
return orderedStop;
}
return null;
}
public async removeEtaIfExists(shuttleId: string, stopId: string): Promise<IEta | null> {
const eta = await this.getEtaForShuttleAndStopId(shuttleId, stopId);
if (eta) {
const key = this.createEtaKey(shuttleId, stopId);
await this.redisClient.del(key);
this.emit(ShuttleRepositoryEvent.ETA_REMOVED, eta);
return eta;
}
return null;
}
// Clear methods
public async clearShuttleData(): Promise<void> {
const keys = await this.redisClient.keys('shuttle:shuttle:*');
if (keys.length > 0) {
await this.redisClient.del(keys);
}
}
public async clearEtaData(): Promise<void> {
const removedEtas = await this.getAllEtas();
const keys = await this.redisClient.keys('shuttle:eta:*');
if (keys.length > 0) {
await this.redisClient.del(keys);
}
this.emit(ShuttleRepositoryEvent.ETA_DATA_CLEARED, removedEtas);
}
public async clearOrderedStopData(): Promise<void> {
const keys = await this.redisClient.keys('shuttle:orderedstop:*');
if (keys.length > 0) {
await this.redisClient.del(keys);
}
}
public async clearRouteData(): Promise<void> {
const keys = await this.redisClient.keys('shuttle:route:*');
if (keys.length > 0) {
await this.redisClient.del(keys);
}
}
public async clearStopData(): Promise<void> {
const keys = await this.redisClient.keys('shuttle:stop:*');
if (keys.length > 0) {
await this.redisClient.del(keys);
}
}
// Helper method to get all ETAs for the clearEtaData event
private async getAllEtas(): Promise<IEta[]> {
const keys = await this.redisClient.keys('shuttle:eta:*');
const etas: IEta[] = [];
for (const key of keys) {
const data = await this.redisClient.hGetAll(key);
if (Object.keys(data).length > 0) {
etas.push(this.createEtaFromRedisData(data));
}
}
return etas;
}
}

View File

@@ -1,5 +1,7 @@
import { beforeEach, describe, expect, jest, test } from "@jest/globals";
import { afterEach, beforeEach, describe, expect, jest, test } from "@jest/globals";
import { UnoptimizedInMemoryShuttleRepository } from "../UnoptimizedInMemoryShuttleRepository";
import { ShuttleGetterSetterRepository } from "../ShuttleGetterSetterRepository";
import { RedisShuttleRepository } from "../RedisShuttleRepository";
import { ShuttleRepositoryEvent } from "../ShuttleGetterRepository";
import {
generateMockEtas,
@@ -9,16 +11,53 @@ import {
generateMockStops,
} from "../../../../testHelpers/mockDataGenerators";
// For repositories created in the future, reuse core testing
// logic from here and differentiate setup (e.g. creating mocks)
// Do this by creating a function which takes a ShuttleGetterRepository
// or ShuttleGetterSetterRepository instance
interface RepositoryHolder {
name: string;
factory(): Promise<ShuttleGetterSetterRepository>;
teardown(): Promise<void>;
}
describe("UnoptimizedInMemoryRepository", () => {
let repository: UnoptimizedInMemoryShuttleRepository;
class UnoptimizedInMemoryShuttleRepositoryHolder implements RepositoryHolder {
name = 'UnoptimizedInMemoryShuttleRepository';
factory = async () => {
return new UnoptimizedInMemoryShuttleRepository();
};
teardown = async () => {};
}
beforeEach(() => {
repository = new UnoptimizedInMemoryShuttleRepository();
class RedisShuttleRepositoryHolder implements RepositoryHolder {
repo: RedisShuttleRepository | undefined;
name = 'RedisShuttleRepository';
factory = async () => {
this.repo = new RedisShuttleRepository();
await this.repo.connect();
return this.repo;
};
teardown = async () => {
if (this.repo) {
await this.repo.clearAllData();
await this.repo.disconnect();
}
};
}
const repositoryImplementations = [
new UnoptimizedInMemoryShuttleRepositoryHolder(),
new RedisShuttleRepositoryHolder(),
];
describe.each(repositoryImplementations)('$name', (holder) => {
let repository: ShuttleGetterSetterRepository;
beforeEach(async () => {
repository = await holder.factory();
jest.useRealTimers();
});
afterEach(async () => {
await holder.teardown();
jest.useRealTimers();
});
describe("getStops", () => {
@@ -29,7 +68,8 @@ describe("UnoptimizedInMemoryRepository", () => {
}
const result = await repository.getStops();
expect(result).toEqual(mockStops);
expect(result).toHaveLength(mockStops.length);
expect(result).toEqual(expect.arrayContaining(mockStops));
});
test("returns an empty list if there are no stops for the given system ID", async () => {
@@ -62,7 +102,8 @@ describe("UnoptimizedInMemoryRepository", () => {
}
const result = await repository.getRoutes();
expect(result).toEqual(mockRoutes);
expect(result).toHaveLength(mockRoutes.length);
expect(result).toEqual(expect.arrayContaining(mockRoutes));
});
test("returns an empty list if there are no routes for the system ID", async () => {
@@ -86,6 +127,7 @@ describe("UnoptimizedInMemoryRepository", () => {
expect(result).toBeNull();
});
});
describe("getShuttles", () => {
test("gets all shuttles for a specific system ID", async () => {
const mockShuttles = generateMockShuttles();
@@ -94,7 +136,8 @@ describe("UnoptimizedInMemoryRepository", () => {
}
const result = await repository.getShuttles();
expect(result).toEqual(mockShuttles);
expect(result).toHaveLength(mockShuttles.length);
expect(result).toEqual(expect.arrayContaining(mockShuttles));
});
test("returns an empty list if there are no shuttles for the system ID", async () => {
@@ -145,7 +188,9 @@ describe("UnoptimizedInMemoryRepository", () => {
}
const result = await repository.getEtasForShuttleId("sh1");
expect(result).toEqual(mockEtas.filter((eta) => eta.shuttleId === "sh1"));
const expected = mockEtas.filter((eta) => eta.shuttleId === "sh1");
expect(result).toHaveLength(expected.length);
expect(result).toEqual(expect.arrayContaining(expected));
});
test("returns an empty list if there are no ETAs for the shuttle ID", async () => {
@@ -240,7 +285,7 @@ describe("UnoptimizedInMemoryRepository", () => {
repository.off(ShuttleRepositoryEvent.ETA_UPDATED, () => {});
await repository.addOrUpdateEta(mockEtas[0]);
expect(mockListener).toHaveBeenCalledTimes(1);
expect(mockListener).toHaveBeenCalledWith(mockEtas[0]);
});
@@ -277,7 +322,9 @@ describe("UnoptimizedInMemoryRepository", () => {
}
const result = await repository.getOrderedStopsByStopId("st1");
expect(result).toEqual(mockOrderedStops.filter((os) => os.stopId === "st1"));
const expected = mockOrderedStops.filter((os) => os.stopId === "st1");
expect(result).toHaveLength(expected.length);
expect(result).toEqual(expect.arrayContaining(expected));
});
test("returns an empty list if there are no ordered stops for the stop ID", async () => {
@@ -294,7 +341,9 @@ describe("UnoptimizedInMemoryRepository", () => {
}
const result = await repository.getOrderedStopsByRouteId("r1");
expect(result).toEqual(mockOrderedStops.filter((os) => os.routeId === "r1"));
const expected = mockOrderedStops.filter((os) => os.routeId === "r1");
expect(result).toHaveLength(expected.length);
expect(result).toEqual(expect.arrayContaining(expected));
});
test("returns an empty list if there are no ordered stops for the route ID", async () => {
@@ -641,13 +690,15 @@ describe("UnoptimizedInMemoryRepository", () => {
await repository.clearEtaData();
expect(listener).toHaveBeenCalledTimes(1);
expect(listener).toHaveBeenCalledWith(mockEtas);
const emittedEtas = listener.mock.calls[0][0];
expect(emittedEtas).toHaveLength(mockEtas.length);
expect(emittedEtas).toEqual(expect.arrayContaining(mockEtas));
});
});
describe("clearOrderedStopData", () => {
test("clears all ordered stops from the repository", async () => {
const mockOrderedStops = await generateMockOrderedStops();
const mockOrderedStops = generateMockOrderedStops();
for (const system of mockOrderedStops) {
await repository.addOrUpdateOrderedStop(system);
}