import { TupleKey } from '../types/TupleKey';
import {
  Listener,
  NotificationEvent,
  NotificationLookupArguments,
  NotificationRepository,
  ScheduledNotification
} from "./NotificationRepository";
import { createClient } from "redis";

/**
 * Redis-backed implementation of {@link NotificationRepository}.
 *
 * Each (shuttleId, stopId) pair maps to one Redis hash whose fields are
 * device IDs and whose values are the notification threshold in seconds,
 * stored as a decimal string.
 *
 * Change listeners are held in memory on this instance and are invoked only
 * for changes made through this instance's own methods.
 */
export class RedisNotificationRepository implements NotificationRepository {
  private listeners: Listener[] = [];
  private readonly NOTIFICATION_KEY_PREFIX = 'notification:';

  /**
   * @param redisClient Client used for all Redis commands. Defaults to a
   *   client built from the `REDIS_URL` environment variable, with TLS
   *   enabled when that URL contains `rediss:`.
   */
  constructor(
    private redisClient = createClient({
      url: process.env.REDIS_URL,
      socket: {
        tls: (process.env.REDIS_URL?.match(/rediss:/) != null),
        // NOTE(review): certificate verification is disabled for TLS
        // connections. Left as-is because existing deployments may depend on
        // it (e.g. self-signed certificates) - confirm whether this can be
        // tightened.
        rejectUnauthorized: false,
      }
    }),
  ) {
    // Bind the public methods so they can be passed around as bare callbacks.
    this.getAllNotificationsForShuttleAndStopId = this.getAllNotificationsForShuttleAndStopId.bind(this);
    this.getSecondsThresholdForNotificationIfExists = this.getSecondsThresholdForNotificationIfExists.bind(this);
    this.deleteNotificationIfExists = this.deleteNotificationIfExists.bind(this);
    this.addOrUpdateNotification = this.addOrUpdateNotification.bind(this);
    this.isNotificationScheduled = this.isNotificationScheduled.bind(this);
    this.subscribeToNotificationChanges = this.subscribeToNotificationChanges.bind(this);
    this.unsubscribeFromNotificationChanges = this.unsubscribeFromNotificationChanges.bind(this);
  }

  /** Whether the underlying Redis client reports itself as ready. */
  get isReady() {
    return this.redisClient.isReady;
  }

  /** Opens the connection of the underlying Redis client. */
  public async connect() {
    await this.redisClient.connect();
  }

  /** Closes the connection of the underlying Redis client. */
  public async disconnect() {
    await this.redisClient.disconnect();
  }

  /**
   * Issues `FLUSHALL` on the connected Redis server.
   *
   * NOTE(review): this is not limited to keys with the notification prefix;
   * anything else stored on the same server is removed as well.
   */
  public async clearAllData() {
    await this.redisClient.flushAll();
  }

  /** Builds the Redis hash key for a (shuttleId, stopId) pair. */
  private getNotificationKey(shuttleId: string, stopId: string): string {
    const tuple = new TupleKey(shuttleId, stopId);
    return `${this.NOTIFICATION_KEY_PREFIX}${tuple.toString()}`;
  }

  /**
   * Delivers an event to every subscribed listener.
   *
   * Iterates over a snapshot so that a listener which subscribes or
   * unsubscribes while being notified cannot cause another listener to be
   * skipped. Each listener receives its own event object.
   */
  private notifyListeners(event: NotificationEvent): void {
    for (const listener of [...this.listeners]) {
      listener({ ...event });
    }
  }

  /**
   * Stores the notification, overwriting any existing threshold for the same
   * shuttle, stop and device, then emits an `addOrUpdate` event.
   *
   * @param notification Notification to store.
   */
  public async addOrUpdateNotification(notification: ScheduledNotification): Promise<void> {
    const { shuttleId, stopId, deviceId, secondsThreshold } = notification;
    const key = this.getNotificationKey(shuttleId, stopId);

    await this.redisClient.hSet(key, deviceId, secondsThreshold.toString());

    this.notifyListeners({ event: 'addOrUpdate', notification });
  }

  /**
   * Removes the notification for the given shuttle, stop and device if one is
   * stored, then emits a `delete` event carrying the removed notification.
   * Does nothing (and emits nothing) when no notification is stored.
   *
   * @param lookupArguments Identifies the notification to remove.
   */
  public async deleteNotificationIfExists(lookupArguments: NotificationLookupArguments): Promise<void> {
    const { shuttleId, stopId, deviceId } = lookupArguments;
    const key = this.getNotificationKey(shuttleId, stopId);

    // The stored threshold is needed for the event payload, so read it first.
    const secondsThreshold = await this.redisClient.hGet(key, deviceId);
    if (!secondsThreshold) {
      return;
    }

    // Redis removes a hash automatically once its last field is deleted, so
    // no follow-up HLEN/DEL is needed. An explicit DEL here could also erase
    // a field written by a concurrent addOrUpdateNotification call.
    await this.redisClient.hDel(key, deviceId);

    this.notifyListeners({
      event: 'delete',
      notification: {
        deviceId,
        shuttleId,
        stopId,
        secondsThreshold: Number.parseInt(secondsThreshold, 10)
      }
    });
  }

  /**
   * Returns every notification stored for the given shuttle and stop.
   *
   * @param shuttleId Shuttle part of the lookup key.
   * @param stopId Stop part of the lookup key.
   * @returns One entry per device in the hash; empty when nothing is stored.
   */
  public async getAllNotificationsForShuttleAndStopId(
    shuttleId: string,
    stopId: string
  ): Promise<ScheduledNotification[]> {
    const key = this.getNotificationKey(shuttleId, stopId);
    const thresholdsByDeviceId = await this.redisClient.hGetAll(key);

    return Object.entries(thresholdsByDeviceId).map(([deviceId, secondsThreshold]) => ({
      shuttleId,
      stopId,
      deviceId,
      secondsThreshold: Number.parseInt(secondsThreshold, 10)
    }));
  }

  /**
   * Looks up the stored threshold for one shuttle, stop and device.
   *
   * @param lookupArguments Identifies the notification to look up.
   * @returns The threshold in seconds, or `null` when no value is stored.
   */
  public async getSecondsThresholdForNotificationIfExists(
    lookupArguments: NotificationLookupArguments
  ): Promise<number | null> {
    const { shuttleId, stopId, deviceId } = lookupArguments;
    const key = this.getNotificationKey(shuttleId, stopId);

    const threshold = await this.redisClient.hGet(key, deviceId);
    return threshold ? Number.parseInt(threshold, 10) : null;
  }

  /**
   * Reports whether a notification is stored for the given shuttle, stop and
   * device.
   *
   * @param lookupArguments Identifies the notification to look up.
   */
  public async isNotificationScheduled(
    lookupArguments: NotificationLookupArguments
  ): Promise<boolean> {
    const threshold = await this.getSecondsThresholdForNotificationIfExists(lookupArguments);
    return threshold !== null;
  }

  /**
   * Registers a listener for notification changes. Registering the same
   * function more than once has no additional effect.
   *
   * @param listener Callback invoked with each change event.
   */
  public subscribeToNotificationChanges(listener: Listener): void {
    if (!this.listeners.includes(listener)) {
      this.listeners.push(listener);
    }
  }

  /**
   * Removes a previously registered listener. Does nothing when the listener
   * is not registered.
   *
   * @param listener Callback to remove.
   */
  public unsubscribeFromNotificationChanges(listener: Listener): void {
    const index = this.listeners.indexOf(listener);
    if (index >= 0) {
      this.listeners.splice(index, 1);
    }
  }
}