diff --git a/docker-compose.yml b/docker-compose.yml index 335f941..e10842b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -33,7 +33,7 @@ services: depends_on: - redis-no-persistence environment: - <<: *common-server-environment + REDIS_URL: redis://redis-no-persistence:6379 volumes: - .:/usr/src/app @@ -44,10 +44,11 @@ services: depends_on: - redis-no-persistence environment: - <<: *common-server-environment + REDIS_URL: redis://redis-no-persistence:6379 volumes: - .:/usr/src/app + redis: image: redis:alpine ports: diff --git a/package-lock.json b/package-lock.json index 2bbd249..32b1839 100644 --- a/package-lock.json +++ b/package-lock.json @@ -20,6 +20,7 @@ "@jest/globals": "^29.7.0", "@types/jsonwebtoken": "^9.0.8", "@types/node": "^22.10.2", + "@types/redis": "^4.0.11", "jest": "^29.7.0", "ts-jest": "^29.2.5", "typescript": "^5.7.2" @@ -3679,6 +3680,16 @@ "resolved": "https://registry.npmjs.org/@types/range-parser/-/range-parser-1.2.7.tgz", "integrity": "sha512-hKormJbkJqzQGhziax5PItDUTMAM9uE2XXQmM37dyd4hVM+5aVl7oVxMVUiVQn2oCQFN/LKCZdvSM0pFRqbSmQ==" }, + "node_modules/@types/redis": { + "version": "4.0.11", + "resolved": "https://registry.npmjs.org/@types/redis/-/redis-4.0.11.tgz", + "integrity": "sha512-bI+gth8La8Wg/QCR1+V1fhrL9+LZUSWfcqpOj2Kc80ZQ4ffbdL173vQd5wovmoV9i071FU9oP2g6etLuEwb6Rg==", + "deprecated": "This is a stub types definition. redis provides its own type definitions, so you do not need this installed.", + "dev": true, + "dependencies": { + "redis": "*" + } + }, "node_modules/@types/send": { "version": "0.17.4", "resolved": "https://registry.npmjs.org/@types/send/-/send-0.17.4.tgz", diff --git a/package.json b/package.json index 7be6333..c582671 100644 --- a/package.json +++ b/package.json @@ -9,7 +9,7 @@ "start:dev": "npm run build:dev && node ./dist/index.js", "start": "npm run build && node ./dist/index.js", "generate": "graphql-codegen --config codegen.ts", - "test": "npm run build:dev && jest" + "test": "npm run build:dev && jest --runInBand" }, "devDependencies": { "@graphql-codegen/cli": "5.0.3", @@ -18,6 +18,7 @@ "@jest/globals": "^29.7.0", "@types/jsonwebtoken": "^9.0.8", "@types/node": "^22.10.2", + "@types/redis": "^4.0.11", "jest": "^29.7.0", "ts-jest": "^29.2.5", "typescript": "^5.7.2" diff --git a/src/index.ts b/src/index.ts index d1a1d14..de49b21 100644 --- a/src/index.ts +++ b/src/index.ts @@ -10,6 +10,7 @@ import { loadShuttleTestData } from "./loaders/loadShuttleTestData"; import { AppleNotificationSender } from "./notifications/senders/AppleNotificationSender"; import { InMemoryNotificationRepository } from "./repositories/InMemoryNotificationRepository"; import { NotificationRepository } from "./repositories/NotificationRepository"; +import { RedisNotificationRepository } from "./repositories/RedisNotificationRepository"; const typeDefs = readFileSync("./schema.graphqls", "utf8"); @@ -24,6 +25,7 @@ async function main() { let notificationRepository: NotificationRepository; let notificationService: ETANotificationScheduler; + if (process.argv.length > 2 && process.argv[2] == "integration-testing") { console.log("Using integration testing setup") await loadShuttleTestData(shuttleRepository); @@ -43,7 +45,10 @@ async function main() { ); await repositoryDataUpdater.start(); - notificationRepository = new InMemoryNotificationRepository(); + const redisNotificationRepository = new RedisNotificationRepository(); + await redisNotificationRepository.connect(); + + notificationRepository = redisNotificationRepository; notificationService = new ETANotificationScheduler( shuttleRepository, notificationRepository diff --git a/src/repositories/RedisNotificationRepository.ts b/src/repositories/RedisNotificationRepository.ts new file mode 100644 index 0000000..1690c50 --- /dev/null +++ b/src/repositories/RedisNotificationRepository.ts @@ -0,0 +1,143 @@ +import { TupleKey } from '../types/TupleKey'; +import { + Listener, + NotificationEvent, + NotificationLookupArguments, + NotificationRepository, + ScheduledNotification +} from "./NotificationRepository"; +import { createClient } from "redis"; + +export class RedisNotificationRepository implements NotificationRepository { + private listeners: Listener[] = []; + private readonly NOTIFICATION_KEY_PREFIX = 'notification:'; + + constructor( + private redisClient = createClient({ + url: process.env.REDIS_URL, + }), + ) { + this.getAllNotificationsForShuttleAndStopId = this.getAllNotificationsForShuttleAndStopId.bind(this); + this.getSecondsThresholdForNotificationIfExists = this.getSecondsThresholdForNotificationIfExists.bind(this); + this.deleteNotificationIfExists = this.deleteNotificationIfExists.bind(this); + this.addOrUpdateNotification = this.addOrUpdateNotification.bind(this); + this.isNotificationScheduled = this.isNotificationScheduled.bind(this); + this.subscribeToNotificationChanges = this.subscribeToNotificationChanges.bind(this); + this.unsubscribeFromNotificationChanges = this.unsubscribeFromNotificationChanges.bind(this); + } + + get isReady() { + return this.redisClient.isReady; + } + + public async connect() { + await this.redisClient.connect(); + } + + public async disconnect() { + await this.redisClient.disconnect(); + } + + public async clearAllData() { + await this.redisClient.flushAll(); + } + + private getNotificationKey(shuttleId: string, stopId: string): string { + const tuple = new TupleKey(shuttleId, stopId); + return `${this.NOTIFICATION_KEY_PREFIX}${tuple.toString()}`; + } + + public async addOrUpdateNotification(notification: ScheduledNotification): Promise { + const { shuttleId, stopId, deviceId, secondsThreshold } = notification; + const key = this.getNotificationKey(shuttleId, stopId); + + await this.redisClient.hSet(key, deviceId, secondsThreshold.toString()); + + this.listeners.forEach((listener: Listener) => { + const event: NotificationEvent = { + event: 'addOrUpdate', + notification + }; + listener(event); + }); + } + + public async deleteNotificationIfExists(lookupArguments: NotificationLookupArguments): Promise { + const { shuttleId, stopId, deviceId } = lookupArguments; + const key = this.getNotificationKey(shuttleId, stopId); + + const secondsThreshold = await this.redisClient.hGet(key, deviceId); + if (secondsThreshold) { + await this.redisClient.hDel(key, deviceId); + + // Check if hash is empty and delete it if so + const remainingFields = await this.redisClient.hLen(key); + if (remainingFields === 0) { + await this.redisClient.del(key); + } + + this.listeners.forEach((listener) => { + const event: NotificationEvent = { + event: 'delete', + notification: { + deviceId, + shuttleId, + stopId, + secondsThreshold: parseInt(secondsThreshold) + } + }; + listener(event); + }); + } + } + + public async getAllNotificationsForShuttleAndStopId( + shuttleId: string, + stopId: string + ): Promise { + const key = this.getNotificationKey(shuttleId, stopId); + const allNotifications = await this.redisClient.hGetAll(key); + + return Object.entries(allNotifications).map(([deviceId, secondsThreshold]) => ({ + shuttleId, + stopId, + deviceId, + secondsThreshold: parseInt(secondsThreshold) + })); + } + + public async getSecondsThresholdForNotificationIfExists( + lookupArguments: NotificationLookupArguments + ): Promise { + const { shuttleId, stopId, deviceId } = lookupArguments; + const key = this.getNotificationKey(shuttleId, stopId); + + const threshold = await this.redisClient.hGet(key, deviceId); + return threshold ? parseInt(threshold) : null; + } + + public async isNotificationScheduled( + lookupArguments: NotificationLookupArguments + ): Promise { + const threshold = await this.getSecondsThresholdForNotificationIfExists(lookupArguments); + return threshold !== null; + } + + public subscribeToNotificationChanges(listener: Listener): void { + const index = this.listeners.findIndex( + (existingListener) => existingListener === listener + ); + if (index < 0) { + this.listeners.push(listener); + } + } + + public unsubscribeFromNotificationChanges(listener: Listener): void { + const index = this.listeners.findIndex( + (existingListener) => existingListener === listener + ); + if (index >= 0) { + this.listeners.splice(index, 1); + } + } +} diff --git a/test/repositories/InMemoryNotificationRepositoryTests.test.ts b/test/repositories/NotificationRepositorySharedTests.test.ts similarity index 64% rename from test/repositories/InMemoryNotificationRepositoryTests.test.ts rename to test/repositories/NotificationRepositorySharedTests.test.ts index 3419d96..d8df44e 100644 --- a/test/repositories/InMemoryNotificationRepositoryTests.test.ts +++ b/test/repositories/NotificationRepositorySharedTests.test.ts @@ -1,12 +1,53 @@ -import { beforeEach, describe, expect, it, jest } from "@jest/globals"; +import { afterEach, beforeEach, describe, expect, it, jest } from "@jest/globals"; import { InMemoryNotificationRepository } from "../../src/repositories/InMemoryNotificationRepository"; -import { NotificationEvent } from "../../src/repositories/NotificationRepository"; +import { NotificationEvent, NotificationRepository } from "../../src/repositories/NotificationRepository"; +import { RedisNotificationRepository } from "../../src/repositories/RedisNotificationRepository"; -describe("InMemoryNotificationRepository", () => { - let repo: InMemoryNotificationRepository; +interface RepositoryHolder { + name: string; + factory(): Promise, + teardown(): Promise, +} - beforeEach(() => { - repo = new InMemoryNotificationRepository(); +class InMemoryRepositoryHolder implements RepositoryHolder { + name = 'InMemoryNotificationRepository'; + factory = async () => { + return new InMemoryNotificationRepository(); + } + teardown = async () => {} +} + +class RedisNotificationRepositoryHolder implements RepositoryHolder { + repo: RedisNotificationRepository | undefined; + + name = 'RedisNotificationRepository'; + factory = async () => { + this.repo = new RedisNotificationRepository(); + await this.repo.connect(); + return this.repo; + } + teardown = async () => { + if (this.repo) { + await this.repo.clearAllData(); + await this.repo.disconnect(); + } + } +} + +const repositoryImplementations = [ + new InMemoryRepositoryHolder(), + new RedisNotificationRepositoryHolder(), +] + +describe.each(repositoryImplementations)('$name', (holder) => { + let repo: NotificationRepository; + + beforeEach(async () => { + repo = await holder.factory(); + }); + + afterEach(async () => { + await holder.teardown(); }) const notification = { @@ -140,5 +181,33 @@ describe("InMemoryNotificationRepository", () => { }; expect(mockCallback).toHaveBeenCalledWith(expectedEvent); }); - }) + }); + + describe("unsubscribeFromNotificationChanges", () => { + it("stops calling subscribers when unsubscribed", async () => { + const mockCallback = jest.fn(); + repo.subscribeToNotificationChanges(mockCallback); + + await repo.addOrUpdateNotification(notification); + + repo.unsubscribeFromNotificationChanges(mockCallback); + + await repo.deleteNotificationIfExists(notification); + + expect(mockCallback).toHaveBeenCalledTimes(1); + }); + }); + + describe("isNotificationScheduled", () => { + it("returns true if the notification is in the repo", async () => { + await repo.addOrUpdateNotification(notification); + const result = await repo.isNotificationScheduled(notification); + expect(result).toBe(true); + }); + + it("returns false if the notification isn't in the repo", async () => { + const result = await repo.isNotificationScheduled(notification); + expect(result).toBe(false); + }) + }); });