Make BaseRedisRepository extend EventEmitter; make RedisShuttleRepository extend BaseRedisRepository

This commit is contained in:
2025-11-11 12:31:32 -08:00
parent b6b79e1345
commit 01c55d52ec
5 changed files with 12 additions and 86 deletions

View File

@@ -1,7 +1,8 @@
import { createClient, RedisClientType } from 'redis'; import { createClient, RedisClientType } from 'redis';
import { REDIS_RECONNECT_INTERVAL } from "../environment"; import { REDIS_RECONNECT_INTERVAL } from "../environment";
import { EventEmitter } from 'stream';
export abstract class BaseRedisRepository { export abstract class BaseRedisRepository extends EventEmitter {
protected redisClient; protected redisClient;
constructor( constructor(
@@ -14,6 +15,7 @@ export abstract class BaseRedisRepository {
}, },
}), }),
) { ) {
super();
this.redisClient = redisClient; this.redisClient = redisClient;
this.redisClient.on('error', (err) => { this.redisClient.on('error', (err) => {
console.error(err.stack); console.error(err.stack);

View File

@@ -9,7 +9,7 @@ import {
import { BaseRedisRepository } from "../BaseRedisRepository"; import { BaseRedisRepository } from "../BaseRedisRepository";
export class RedisNotificationRepository extends BaseRedisRepository implements NotificationRepository { export class RedisNotificationRepository extends BaseRedisRepository implements NotificationRepository {
private listeners: Listener[] = []; private notificationListeners: Listener[] = [];
private readonly NOTIFICATION_KEY_PREFIX = 'notification:'; private readonly NOTIFICATION_KEY_PREFIX = 'notification:';
private getNotificationKey = (shuttleId: string, stopId: string): string => { private getNotificationKey = (shuttleId: string, stopId: string): string => {
@@ -23,7 +23,7 @@ export class RedisNotificationRepository extends BaseRedisRepository implements
await this.redisClient.hSet(key, deviceId, secondsThreshold.toString()); await this.redisClient.hSet(key, deviceId, secondsThreshold.toString());
this.listeners.forEach((listener: Listener) => { this.notificationListeners.forEach((listener: Listener) => {
const event: NotificationEvent = { const event: NotificationEvent = {
event: 'addOrUpdate', event: 'addOrUpdate',
notification notification
@@ -46,7 +46,7 @@ export class RedisNotificationRepository extends BaseRedisRepository implements
await this.redisClient.del(key); await this.redisClient.del(key);
} }
this.listeners.forEach((listener) => { this.notificationListeners.forEach((listener) => {
const event: NotificationEvent = { const event: NotificationEvent = {
event: 'delete', event: 'delete',
notification: { notification: {
@@ -94,20 +94,20 @@ export class RedisNotificationRepository extends BaseRedisRepository implements
}; };
public subscribeToNotificationChanges = (listener: Listener): void => { public subscribeToNotificationChanges = (listener: Listener): void => {
const index = this.listeners.findIndex( const index = this.notificationListeners.findIndex(
(existingListener) => existingListener === listener (existingListener) => existingListener === listener
); );
if (index < 0) { if (index < 0) {
this.listeners.push(listener); this.notificationListeners.push(listener);
} }
}; };
public unsubscribeFromNotificationChanges = (listener: Listener): void => { public unsubscribeFromNotificationChanges = (listener: Listener): void => {
const index = this.listeners.findIndex( const index = this.notificationListeners.findIndex(
(existingListener) => existingListener === listener (existingListener) => existingListener === listener
); );
if (index >= 0) { if (index >= 0) {
this.listeners.splice(index, 1); this.notificationListeners.splice(index, 1);
} }
}; };
} }

View File

@@ -1,8 +1,5 @@
import EventEmitter from "node:events";
import { createClient } from 'redis';
import { ShuttleGetterSetterRepository } from "./ShuttleGetterSetterRepository"; import { ShuttleGetterSetterRepository } from "./ShuttleGetterSetterRepository";
import { IEta, IOrderedStop, IRoute, IShuttle, IStop, shuttleHasArrivedAtStop } from "../../entities/ShuttleRepositoryEntities"; import { IEta, IOrderedStop, IRoute, IShuttle, IStop, shuttleHasArrivedAtStop } from "../../entities/ShuttleRepositoryEntities";
import { REDIS_RECONNECT_INTERVAL } from "../../environment";
import { import {
ShuttleRepositoryEvent, ShuttleRepositoryEvent,
ShuttleRepositoryEventListener, ShuttleRepositoryEventListener,
@@ -12,27 +9,9 @@ import {
ShuttleTravelTimeDataIdentifier, ShuttleTravelTimeDataIdentifier,
ShuttleTravelTimeDateFilterArguments, ShuttleTravelTimeDateFilterArguments,
} from "./ShuttleGetterRepository"; } from "./ShuttleGetterRepository";
import { BaseRedisRepository } from "../BaseRedisRepository";
export class RedisShuttleRepository extends EventEmitter implements ShuttleGetterSetterRepository { export class RedisShuttleRepository extends BaseRedisRepository implements ShuttleGetterSetterRepository {
protected redisClient;
constructor(
redisClient = createClient({
url: process.env.REDIS_URL,
socket: {
tls: process.env.NODE_ENV === 'production',
rejectUnauthorized: false,
reconnectStrategy: REDIS_RECONNECT_INTERVAL,
},
}),
) {
super();
this.redisClient = redisClient;
this.redisClient.on('error', (err) => {
console.error(err.stack);
});
}
get isReady() { get isReady() {
return this.redisClient.isReady; return this.redisClient.isReady;
} }

View File

@@ -37,33 +37,6 @@ export class RedisExternalSourceETARepository extends BaseRedisRepository implem
removeAllListeners(eventName?: string | symbol | undefined): this { removeAllListeners(eventName?: string | symbol | undefined): this {
throw new Error("Method not implemented."); throw new Error("Method not implemented.");
} }
setMaxListeners(n: number): this {
throw new Error("Method not implemented.");
}
getMaxListeners(): number {
throw new Error("Method not implemented.");
}
listeners<K>(eventName: string | symbol): Function[] {
throw new Error("Method not implemented.");
}
rawListeners<K>(eventName: string | symbol): Function[] {
throw new Error("Method not implemented.");
}
emit<K>(eventName: string | symbol, ...args: any[]): boolean {
throw new Error("Method not implemented.");
}
listenerCount<K>(eventName: string | symbol, listener?: Function | undefined): number {
throw new Error("Method not implemented.");
}
prependListener<K>(eventName: string | symbol, listener: (...args: any[]) => void): this {
throw new Error("Method not implemented.");
}
prependOnceListener<K>(eventName: string | symbol, listener: (...args: any[]) => void): this {
throw new Error("Method not implemented.");
}
eventNames(): (string | symbol)[] {
throw new Error("Method not implemented.");
}
clearAllData(): Promise<void> { clearAllData(): Promise<void> {
throw new Error("Method not implemented."); throw new Error("Method not implemented.");
} }

View File

@@ -53,32 +53,4 @@ export class RedisSelfUpdatingETARepository extends BaseRedisRepository implemen
removeAllListeners(eventName?: string | symbol | undefined): this { removeAllListeners(eventName?: string | symbol | undefined): this {
throw new Error("Method not implemented."); throw new Error("Method not implemented.");
} }
setMaxListeners(n: number): this {
throw new Error("Method not implemented.");
}
getMaxListeners(): number {
throw new Error("Method not implemented.");
}
listeners<K>(eventName: string | symbol): Function[] {
throw new Error("Method not implemented.");
}
rawListeners<K>(eventName: string | symbol): Function[] {
throw new Error("Method not implemented.");
}
emit<K>(eventName: string | symbol, ...args: any[]): boolean {
throw new Error("Method not implemented.");
}
listenerCount<K>(eventName: string | symbol, listener?: Function | undefined): number {
throw new Error("Method not implemented.");
}
prependListener<K>(eventName: string | symbol, listener: (...args: any[]) => void): this {
throw new Error("Method not implemented.");
}
prependOnceListener<K>(eventName: string | symbol, listener: (...args: any[]) => void): this {
throw new Error("Method not implemented.");
}
eventNames(): (string | symbol)[] {
throw new Error("Method not implemented.");
}
} }