import { ParkingGetterSetterRepository } from "./ParkingGetterSetterRepository"; import { IParkingStructure } from "../../entities/ParkingRepositoryEntities"; import { HistoricalParkingAverageQueryResult, ParkingStructureCountOptions } from "./ParkingGetterRepository"; import { BaseRedisRepository } from "../BaseRedisRepository"; import { PARKING_LOGGING_INTERVAL_MS } from "./ParkingRepositoryConstants"; export type ParkingStructureID = string; export class RedisParkingRepository extends BaseRedisRepository implements ParkingGetterSetterRepository { private dataLastAdded: Map = new Map(); private loggingIntervalMs = PARKING_LOGGING_INTERVAL_MS; addOrUpdateParkingStructure = async (structure: IParkingStructure): Promise => { const keys = this.createRedisKeys(structure.id); await this.redisClient.hSet(keys.structure, this.createRedisHashFromStructure(structure)); await this.addHistoricalDataForStructure(structure); }; private addHistoricalDataForStructure = async (structure: IParkingStructure): Promise => { const now = Date.now(); const lastAdded = this.dataLastAdded.get(structure.id); if (this.shouldLogHistoricalData(lastAdded, now)) { const keys = this.createRedisKeys(structure.id); await this.addTimeSeriesDataPoint(keys.timeSeries, now, structure.spotsAvailable, structure.id); this.dataLastAdded.set(structure.id, new Date(now)); } }; clearParkingStructureData = async (): Promise => { const structureKeys = await this.redisClient.keys('parking:structure:*'); const timeSeriesKeys = await this.redisClient.keys('parking:timeseries:*'); const allKeys = [...structureKeys, ...timeSeriesKeys]; if (allKeys.length > 0) { await this.redisClient.del(allKeys); } this.dataLastAdded.clear(); }; getParkingStructureById = async (id: string): Promise => { const keys = this.createRedisKeys(id); const data = await this.redisClient.hGetAll(keys.structure); if (Object.keys(data).length === 0) { return null; } return this.createStructureFromRedisData(data); }; getParkingStructures = async (): Promise => { const keys = await this.redisClient.keys('parking:structure:*'); const structures: IParkingStructure[] = []; for (const key of keys) { const data = await this.redisClient.hGetAll(key); if (Object.keys(data).length > 0) { structures.push(this.createStructureFromRedisData(data)); } } return structures; }; removeParkingStructureIfExists = async (id: string): Promise => { const structure = await this.getParkingStructureById(id); if (structure) { const keys = this.createRedisKeys(id); await this.redisClient.del([keys.structure, keys.timeSeries]); this.dataLastAdded.delete(id); return structure; } return null; }; getHistoricalAveragesOfParkingStructureCounts = async (id: string, options: ParkingStructureCountOptions): Promise => { return this.calculateAveragesFromRecords(id, options); }; private createRedisKeys = (structureId: string) => ({ structure: `parking:structure:${structureId}`, timeSeries: `parking:timeseries:${structureId}` }); private createRedisHashFromStructure = (structure: IParkingStructure): Record => ({ id: structure.id, name: structure.name, address: structure.address, capacity: structure.capacity.toString(), spotsAvailable: structure.spotsAvailable.toString(), latitude: structure.coordinates.latitude.toString(), longitude: structure.coordinates.longitude.toString(), updatedTime: structure.updatedTime.toISOString() }); private createStructureFromRedisData = (data: Record): IParkingStructure => ({ id: data.id, name: data.name, address: data.address, capacity: parseInt(data.capacity), spotsAvailable: parseInt(data.spotsAvailable), coordinates: { latitude: parseFloat(data.latitude), longitude: parseFloat(data.longitude) }, updatedTime: new Date(data.updatedTime) }); private shouldLogHistoricalData = (lastAdded: Date | undefined, currentTime: number): boolean => { return !lastAdded || (currentTime - lastAdded.getTime()) >= this.loggingIntervalMs; }; private addTimeSeriesDataPoint = async (timeSeriesKey: string, timestamp: number, value: number, structureId: string): Promise => { try { await this.redisClient.sendCommand([ 'TS.ADD', timeSeriesKey, timestamp.toString(), value.toString(), 'LABELS', 'structureId', structureId ]); } catch (error) { await this.createTimeSeriesAndAddDataPoint(timeSeriesKey, timestamp, value, structureId); } }; private createTimeSeriesAndAddDataPoint = async (timeSeriesKey: string, timestamp: number, value: number, structureId: string): Promise => { try { await this.redisClient.sendCommand([ 'TS.CREATE', timeSeriesKey, 'RETENTION', '2678400000', // one month 'LABELS', 'structureId', structureId ]); await this.redisClient.sendCommand([ 'TS.ADD', timeSeriesKey, timestamp.toString(), value.toString() ]); } catch (createError) { await this.redisClient.sendCommand([ 'TS.ADD', timeSeriesKey, timestamp.toString(), value.toString() ]); } }; private calculateAveragesFromRecords = async ( id: string, options: ParkingStructureCountOptions ): Promise => { const keys = this.createRedisKeys(id); const { from, to, intervalMs } = options; const results: HistoricalParkingAverageQueryResult[] = []; let currentIntervalStart = from.getTime(); const endTime = to.getTime(); while (currentIntervalStart < endTime) { const currentIntervalEnd = Math.min(currentIntervalStart + intervalMs, endTime); try { const aggregationResult = await this.redisClient.sendCommand([ 'TS.RANGE', keys.timeSeries, currentIntervalStart.toString(), currentIntervalEnd.toString(), 'AGGREGATION', 'AVG', intervalMs.toString() ]) as [string, string][]; if (aggregationResult && aggregationResult.length > 0) { const [, averageValue] = aggregationResult[0]; results.push({ from: new Date(currentIntervalStart), to: new Date(currentIntervalEnd), averageSpotsAvailable: parseFloat(averageValue) }); } } catch (error) { // If Redis aggregation fails, skip this interval } currentIntervalStart = currentIntervalEnd; } return results; }; setLoggingInterval = (intervalMs: number): void => { this.loggingIntervalMs = intervalMs; }; }