mirror of
https://github.com/brendan-ch/project-inter-server.git
synced 2026-04-19 08:50:29 +00:00
Merge pull request #36 from brendan-ch/feat/persistent-data-storage-for-notifications
This commit is contained in:
@@ -33,7 +33,7 @@ services:
|
|||||||
depends_on:
|
depends_on:
|
||||||
- redis-no-persistence
|
- redis-no-persistence
|
||||||
environment:
|
environment:
|
||||||
<<: *common-server-environment
|
REDIS_URL: redis://redis-no-persistence:6379
|
||||||
|
|
||||||
volumes:
|
volumes:
|
||||||
- .:/usr/src/app
|
- .:/usr/src/app
|
||||||
@@ -44,10 +44,11 @@ services:
|
|||||||
depends_on:
|
depends_on:
|
||||||
- redis-no-persistence
|
- redis-no-persistence
|
||||||
environment:
|
environment:
|
||||||
<<: *common-server-environment
|
REDIS_URL: redis://redis-no-persistence:6379
|
||||||
|
|
||||||
volumes:
|
volumes:
|
||||||
- .:/usr/src/app
|
- .:/usr/src/app
|
||||||
|
|
||||||
redis:
|
redis:
|
||||||
image: redis:alpine
|
image: redis:alpine
|
||||||
ports:
|
ports:
|
||||||
|
|||||||
11
package-lock.json
generated
11
package-lock.json
generated
@@ -20,6 +20,7 @@
|
|||||||
"@jest/globals": "^29.7.0",
|
"@jest/globals": "^29.7.0",
|
||||||
"@types/jsonwebtoken": "^9.0.8",
|
"@types/jsonwebtoken": "^9.0.8",
|
||||||
"@types/node": "^22.10.2",
|
"@types/node": "^22.10.2",
|
||||||
|
"@types/redis": "^4.0.11",
|
||||||
"jest": "^29.7.0",
|
"jest": "^29.7.0",
|
||||||
"ts-jest": "^29.2.5",
|
"ts-jest": "^29.2.5",
|
||||||
"typescript": "^5.7.2"
|
"typescript": "^5.7.2"
|
||||||
@@ -3679,6 +3680,16 @@
|
|||||||
"resolved": "https://registry.npmjs.org/@types/range-parser/-/range-parser-1.2.7.tgz",
|
"resolved": "https://registry.npmjs.org/@types/range-parser/-/range-parser-1.2.7.tgz",
|
||||||
"integrity": "sha512-hKormJbkJqzQGhziax5PItDUTMAM9uE2XXQmM37dyd4hVM+5aVl7oVxMVUiVQn2oCQFN/LKCZdvSM0pFRqbSmQ=="
|
"integrity": "sha512-hKormJbkJqzQGhziax5PItDUTMAM9uE2XXQmM37dyd4hVM+5aVl7oVxMVUiVQn2oCQFN/LKCZdvSM0pFRqbSmQ=="
|
||||||
},
|
},
|
||||||
|
"node_modules/@types/redis": {
|
||||||
|
"version": "4.0.11",
|
||||||
|
"resolved": "https://registry.npmjs.org/@types/redis/-/redis-4.0.11.tgz",
|
||||||
|
"integrity": "sha512-bI+gth8La8Wg/QCR1+V1fhrL9+LZUSWfcqpOj2Kc80ZQ4ffbdL173vQd5wovmoV9i071FU9oP2g6etLuEwb6Rg==",
|
||||||
|
"deprecated": "This is a stub types definition. redis provides its own type definitions, so you do not need this installed.",
|
||||||
|
"dev": true,
|
||||||
|
"dependencies": {
|
||||||
|
"redis": "*"
|
||||||
|
}
|
||||||
|
},
|
||||||
"node_modules/@types/send": {
|
"node_modules/@types/send": {
|
||||||
"version": "0.17.4",
|
"version": "0.17.4",
|
||||||
"resolved": "https://registry.npmjs.org/@types/send/-/send-0.17.4.tgz",
|
"resolved": "https://registry.npmjs.org/@types/send/-/send-0.17.4.tgz",
|
||||||
|
|||||||
@@ -9,7 +9,7 @@
|
|||||||
"start:dev": "npm run build:dev && node ./dist/index.js",
|
"start:dev": "npm run build:dev && node ./dist/index.js",
|
||||||
"start": "npm run build && node ./dist/index.js",
|
"start": "npm run build && node ./dist/index.js",
|
||||||
"generate": "graphql-codegen --config codegen.ts",
|
"generate": "graphql-codegen --config codegen.ts",
|
||||||
"test": "npm run build:dev && jest"
|
"test": "npm run build:dev && jest --runInBand"
|
||||||
},
|
},
|
||||||
"devDependencies": {
|
"devDependencies": {
|
||||||
"@graphql-codegen/cli": "5.0.3",
|
"@graphql-codegen/cli": "5.0.3",
|
||||||
@@ -18,6 +18,7 @@
|
|||||||
"@jest/globals": "^29.7.0",
|
"@jest/globals": "^29.7.0",
|
||||||
"@types/jsonwebtoken": "^9.0.8",
|
"@types/jsonwebtoken": "^9.0.8",
|
||||||
"@types/node": "^22.10.2",
|
"@types/node": "^22.10.2",
|
||||||
|
"@types/redis": "^4.0.11",
|
||||||
"jest": "^29.7.0",
|
"jest": "^29.7.0",
|
||||||
"ts-jest": "^29.2.5",
|
"ts-jest": "^29.2.5",
|
||||||
"typescript": "^5.7.2"
|
"typescript": "^5.7.2"
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ import { loadShuttleTestData } from "./loaders/loadShuttleTestData";
|
|||||||
import { AppleNotificationSender } from "./notifications/senders/AppleNotificationSender";
|
import { AppleNotificationSender } from "./notifications/senders/AppleNotificationSender";
|
||||||
import { InMemoryNotificationRepository } from "./repositories/InMemoryNotificationRepository";
|
import { InMemoryNotificationRepository } from "./repositories/InMemoryNotificationRepository";
|
||||||
import { NotificationRepository } from "./repositories/NotificationRepository";
|
import { NotificationRepository } from "./repositories/NotificationRepository";
|
||||||
|
import { RedisNotificationRepository } from "./repositories/RedisNotificationRepository";
|
||||||
|
|
||||||
const typeDefs = readFileSync("./schema.graphqls", "utf8");
|
const typeDefs = readFileSync("./schema.graphqls", "utf8");
|
||||||
|
|
||||||
@@ -24,6 +25,7 @@ async function main() {
|
|||||||
|
|
||||||
let notificationRepository: NotificationRepository;
|
let notificationRepository: NotificationRepository;
|
||||||
let notificationService: ETANotificationScheduler;
|
let notificationService: ETANotificationScheduler;
|
||||||
|
|
||||||
if (process.argv.length > 2 && process.argv[2] == "integration-testing") {
|
if (process.argv.length > 2 && process.argv[2] == "integration-testing") {
|
||||||
console.log("Using integration testing setup")
|
console.log("Using integration testing setup")
|
||||||
await loadShuttleTestData(shuttleRepository);
|
await loadShuttleTestData(shuttleRepository);
|
||||||
@@ -43,7 +45,10 @@ async function main() {
|
|||||||
);
|
);
|
||||||
await repositoryDataUpdater.start();
|
await repositoryDataUpdater.start();
|
||||||
|
|
||||||
notificationRepository = new InMemoryNotificationRepository();
|
const redisNotificationRepository = new RedisNotificationRepository();
|
||||||
|
await redisNotificationRepository.connect();
|
||||||
|
|
||||||
|
notificationRepository = redisNotificationRepository;
|
||||||
notificationService = new ETANotificationScheduler(
|
notificationService = new ETANotificationScheduler(
|
||||||
shuttleRepository,
|
shuttleRepository,
|
||||||
notificationRepository
|
notificationRepository
|
||||||
|
|||||||
143
src/repositories/RedisNotificationRepository.ts
Normal file
143
src/repositories/RedisNotificationRepository.ts
Normal file
@@ -0,0 +1,143 @@
|
|||||||
|
import { TupleKey } from '../types/TupleKey';
|
||||||
|
import {
|
||||||
|
Listener,
|
||||||
|
NotificationEvent,
|
||||||
|
NotificationLookupArguments,
|
||||||
|
NotificationRepository,
|
||||||
|
ScheduledNotification
|
||||||
|
} from "./NotificationRepository";
|
||||||
|
import { createClient } from "redis";
|
||||||
|
|
||||||
|
export class RedisNotificationRepository implements NotificationRepository {
|
||||||
|
private listeners: Listener[] = [];
|
||||||
|
private readonly NOTIFICATION_KEY_PREFIX = 'notification:';
|
||||||
|
|
||||||
|
constructor(
|
||||||
|
private redisClient = createClient({
|
||||||
|
url: process.env.REDIS_URL,
|
||||||
|
}),
|
||||||
|
) {
|
||||||
|
this.getAllNotificationsForShuttleAndStopId = this.getAllNotificationsForShuttleAndStopId.bind(this);
|
||||||
|
this.getSecondsThresholdForNotificationIfExists = this.getSecondsThresholdForNotificationIfExists.bind(this);
|
||||||
|
this.deleteNotificationIfExists = this.deleteNotificationIfExists.bind(this);
|
||||||
|
this.addOrUpdateNotification = this.addOrUpdateNotification.bind(this);
|
||||||
|
this.isNotificationScheduled = this.isNotificationScheduled.bind(this);
|
||||||
|
this.subscribeToNotificationChanges = this.subscribeToNotificationChanges.bind(this);
|
||||||
|
this.unsubscribeFromNotificationChanges = this.unsubscribeFromNotificationChanges.bind(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
get isReady() {
|
||||||
|
return this.redisClient.isReady;
|
||||||
|
}
|
||||||
|
|
||||||
|
public async connect() {
|
||||||
|
await this.redisClient.connect();
|
||||||
|
}
|
||||||
|
|
||||||
|
public async disconnect() {
|
||||||
|
await this.redisClient.disconnect();
|
||||||
|
}
|
||||||
|
|
||||||
|
public async clearAllData() {
|
||||||
|
await this.redisClient.flushAll();
|
||||||
|
}
|
||||||
|
|
||||||
|
private getNotificationKey(shuttleId: string, stopId: string): string {
|
||||||
|
const tuple = new TupleKey(shuttleId, stopId);
|
||||||
|
return `${this.NOTIFICATION_KEY_PREFIX}${tuple.toString()}`;
|
||||||
|
}
|
||||||
|
|
||||||
|
public async addOrUpdateNotification(notification: ScheduledNotification): Promise<void> {
|
||||||
|
const { shuttleId, stopId, deviceId, secondsThreshold } = notification;
|
||||||
|
const key = this.getNotificationKey(shuttleId, stopId);
|
||||||
|
|
||||||
|
await this.redisClient.hSet(key, deviceId, secondsThreshold.toString());
|
||||||
|
|
||||||
|
this.listeners.forEach((listener: Listener) => {
|
||||||
|
const event: NotificationEvent = {
|
||||||
|
event: 'addOrUpdate',
|
||||||
|
notification
|
||||||
|
};
|
||||||
|
listener(event);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public async deleteNotificationIfExists(lookupArguments: NotificationLookupArguments): Promise<void> {
|
||||||
|
const { shuttleId, stopId, deviceId } = lookupArguments;
|
||||||
|
const key = this.getNotificationKey(shuttleId, stopId);
|
||||||
|
|
||||||
|
const secondsThreshold = await this.redisClient.hGet(key, deviceId);
|
||||||
|
if (secondsThreshold) {
|
||||||
|
await this.redisClient.hDel(key, deviceId);
|
||||||
|
|
||||||
|
// Check if hash is empty and delete it if so
|
||||||
|
const remainingFields = await this.redisClient.hLen(key);
|
||||||
|
if (remainingFields === 0) {
|
||||||
|
await this.redisClient.del(key);
|
||||||
|
}
|
||||||
|
|
||||||
|
this.listeners.forEach((listener) => {
|
||||||
|
const event: NotificationEvent = {
|
||||||
|
event: 'delete',
|
||||||
|
notification: {
|
||||||
|
deviceId,
|
||||||
|
shuttleId,
|
||||||
|
stopId,
|
||||||
|
secondsThreshold: parseInt(secondsThreshold)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
listener(event);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public async getAllNotificationsForShuttleAndStopId(
|
||||||
|
shuttleId: string,
|
||||||
|
stopId: string
|
||||||
|
): Promise<ScheduledNotification[]> {
|
||||||
|
const key = this.getNotificationKey(shuttleId, stopId);
|
||||||
|
const allNotifications = await this.redisClient.hGetAll(key);
|
||||||
|
|
||||||
|
return Object.entries(allNotifications).map(([deviceId, secondsThreshold]) => ({
|
||||||
|
shuttleId,
|
||||||
|
stopId,
|
||||||
|
deviceId,
|
||||||
|
secondsThreshold: parseInt(secondsThreshold)
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
public async getSecondsThresholdForNotificationIfExists(
|
||||||
|
lookupArguments: NotificationLookupArguments
|
||||||
|
): Promise<number | null> {
|
||||||
|
const { shuttleId, stopId, deviceId } = lookupArguments;
|
||||||
|
const key = this.getNotificationKey(shuttleId, stopId);
|
||||||
|
|
||||||
|
const threshold = await this.redisClient.hGet(key, deviceId);
|
||||||
|
return threshold ? parseInt(threshold) : null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public async isNotificationScheduled(
|
||||||
|
lookupArguments: NotificationLookupArguments
|
||||||
|
): Promise<boolean> {
|
||||||
|
const threshold = await this.getSecondsThresholdForNotificationIfExists(lookupArguments);
|
||||||
|
return threshold !== null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public subscribeToNotificationChanges(listener: Listener): void {
|
||||||
|
const index = this.listeners.findIndex(
|
||||||
|
(existingListener) => existingListener === listener
|
||||||
|
);
|
||||||
|
if (index < 0) {
|
||||||
|
this.listeners.push(listener);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public unsubscribeFromNotificationChanges(listener: Listener): void {
|
||||||
|
const index = this.listeners.findIndex(
|
||||||
|
(existingListener) => existingListener === listener
|
||||||
|
);
|
||||||
|
if (index >= 0) {
|
||||||
|
this.listeners.splice(index, 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,12 +1,53 @@
|
|||||||
import { beforeEach, describe, expect, it, jest } from "@jest/globals";
|
import { afterEach, beforeEach, describe, expect, it, jest } from "@jest/globals";
|
||||||
import { InMemoryNotificationRepository } from "../../src/repositories/InMemoryNotificationRepository";
|
import { InMemoryNotificationRepository } from "../../src/repositories/InMemoryNotificationRepository";
|
||||||
import { NotificationEvent } from "../../src/repositories/NotificationRepository";
|
import { NotificationEvent, NotificationRepository } from "../../src/repositories/NotificationRepository";
|
||||||
|
import { RedisNotificationRepository } from "../../src/repositories/RedisNotificationRepository";
|
||||||
|
|
||||||
describe("InMemoryNotificationRepository", () => {
|
interface RepositoryHolder {
|
||||||
let repo: InMemoryNotificationRepository;
|
name: string;
|
||||||
|
factory(): Promise<NotificationRepository>,
|
||||||
|
teardown(): Promise<void>,
|
||||||
|
}
|
||||||
|
|
||||||
beforeEach(() => {
|
class InMemoryRepositoryHolder implements RepositoryHolder {
|
||||||
repo = new InMemoryNotificationRepository();
|
name = 'InMemoryNotificationRepository';
|
||||||
|
factory = async () => {
|
||||||
|
return new InMemoryNotificationRepository();
|
||||||
|
}
|
||||||
|
teardown = async () => {}
|
||||||
|
}
|
||||||
|
|
||||||
|
class RedisNotificationRepositoryHolder implements RepositoryHolder {
|
||||||
|
repo: RedisNotificationRepository | undefined;
|
||||||
|
|
||||||
|
name = 'RedisNotificationRepository';
|
||||||
|
factory = async () => {
|
||||||
|
this.repo = new RedisNotificationRepository();
|
||||||
|
await this.repo.connect();
|
||||||
|
return this.repo;
|
||||||
|
}
|
||||||
|
teardown = async () => {
|
||||||
|
if (this.repo) {
|
||||||
|
await this.repo.clearAllData();
|
||||||
|
await this.repo.disconnect();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const repositoryImplementations = [
|
||||||
|
new InMemoryRepositoryHolder(),
|
||||||
|
new RedisNotificationRepositoryHolder(),
|
||||||
|
]
|
||||||
|
|
||||||
|
describe.each(repositoryImplementations)('$name', (holder) => {
|
||||||
|
let repo: NotificationRepository;
|
||||||
|
|
||||||
|
beforeEach(async () => {
|
||||||
|
repo = await holder.factory();
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(async () => {
|
||||||
|
await holder.teardown();
|
||||||
})
|
})
|
||||||
|
|
||||||
const notification = {
|
const notification = {
|
||||||
@@ -140,5 +181,33 @@ describe("InMemoryNotificationRepository", () => {
|
|||||||
};
|
};
|
||||||
expect(mockCallback).toHaveBeenCalledWith(expectedEvent);
|
expect(mockCallback).toHaveBeenCalledWith(expectedEvent);
|
||||||
});
|
});
|
||||||
})
|
});
|
||||||
|
|
||||||
|
describe("unsubscribeFromNotificationChanges", () => {
|
||||||
|
it("stops calling subscribers when unsubscribed", async () => {
|
||||||
|
const mockCallback = jest.fn();
|
||||||
|
repo.subscribeToNotificationChanges(mockCallback);
|
||||||
|
|
||||||
|
await repo.addOrUpdateNotification(notification);
|
||||||
|
|
||||||
|
repo.unsubscribeFromNotificationChanges(mockCallback);
|
||||||
|
|
||||||
|
await repo.deleteNotificationIfExists(notification);
|
||||||
|
|
||||||
|
expect(mockCallback).toHaveBeenCalledTimes(1);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("isNotificationScheduled", () => {
|
||||||
|
it("returns true if the notification is in the repo", async () => {
|
||||||
|
await repo.addOrUpdateNotification(notification);
|
||||||
|
const result = await repo.isNotificationScheduled(notification);
|
||||||
|
expect(result).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("returns false if the notification isn't in the repo", async () => {
|
||||||
|
const result = await repo.isNotificationScheduled(notification);
|
||||||
|
expect(result).toBe(false);
|
||||||
|
})
|
||||||
|
});
|
||||||
});
|
});
|
||||||
Reference in New Issue
Block a user