import { createClient, RedisClientType } from 'redis';
import { ParkingGetterSetterRepository } from "./ParkingGetterSetterRepository";
import { IParkingStructure, IParkingStructureTimestampRecord } from "../entities/ParkingRepositoryEntities";
import { HistoricalParkingAverageQueryResult, ParkingStructureCountOptions } from "./ParkingGetterRepository";

export type ParkingStructureID = string;

/** Minimum time between two historical samples for one structure (10 minutes). */
export const PARKING_LOGGING_INTERVAL_MS = 600000;

/** Key prefix of the hash that holds the current state of a structure. */
const STRUCTURE_KEY_PREFIX = 'parking:structure:';

/** Key prefix of the time series that holds a structure's availability history. */
const TIME_SERIES_KEY_PREFIX = 'parking:timeseries:';

/**
 * Parking repository backed by Redis.
 *
 * The current state of each structure is stored as a hash under
 * `parking:structure:<id>`. Availability history is written to
 * `parking:timeseries:<id>` through raw `TS.*` commands, at most once per
 * {@link PARKING_LOGGING_INTERVAL_MS} per structure. The throttle is tracked
 * in memory on this instance (`dataLastAdded`), not in Redis.
 */
export class RedisParkingRepository implements ParkingGetterSetterRepository {
  /** Time of the most recent historical sample written by this instance, per structure. */
  private dataLastAdded: Map<ParkingStructureID, Date> = new Map();

  /**
   * @param redisClient Client to use. Defaults to one built from
   *   `process.env.REDIS_URL`, with TLS enabled when the URL contains `rediss:`.
   */
  constructor(
    private readonly redisClient = createClient({
      url: process.env.REDIS_URL,
      socket: {
        tls: (process.env.REDIS_URL?.match(/rediss:/) != null),
        // NOTE(review): certificate verification is disabled for TLS
        // connections. Confirm this is required by the hosting provider;
        // otherwise it leaves the connection open to interception.
        rejectUnauthorized: false,
      }
    }),
  ) { }

  /** Whether the underlying client reports itself as ready. */
  get isReady() {
    return this.redisClient.isReady;
  }

  /** Opens the connection of the underlying client. */
  public async connect(): Promise<void> {
    await this.redisClient.connect();
  }

  /** Closes the connection of the underlying client. */
  public async disconnect(): Promise<void> {
    await this.redisClient.disconnect();
  }

  /**
   * Issues `FLUSHALL` and resets the in-memory throttle state.
   *
   * This is not limited to parking keys; use
   * {@link clearParkingStructureData} to remove only parking data.
   */
  public async clearAllData(): Promise<void> {
    await this.redisClient.flushAll();
    this.dataLastAdded.clear();
  }

  /**
   * Writes the current state of a structure and, if the logging interval has
   * elapsed for it, appends a historical sample.
   */
  addOrUpdateParkingStructure = async (structure: IParkingStructure): Promise<void> => {
    // Hash fields are stored as strings; numbers and dates are serialised here
    // and parsed back in toParkingStructure.
    await this.redisClient.hSet(`${STRUCTURE_KEY_PREFIX}${structure.id}`, {
      id: structure.id,
      name: structure.name,
      address: structure.address,
      capacity: structure.capacity.toString(),
      spotsAvailable: structure.spotsAvailable.toString(),
      latitude: structure.coordinates.latitude.toString(),
      longitude: structure.coordinates.longitude.toString(),
      updatedTime: structure.updatedTime.toISOString()
    });

    await this.addHistoricalDataForStructure(structure);
  };

  /**
   * Appends `spotsAvailable` to the structure's time series, unless a sample
   * was already written by this instance within the logging interval.
   *
   * The write is attempted as `TS.ADD` with labels first. If that fails,
   * `TS.CREATE` followed by a plain `TS.ADD` is tried, and if that also fails
   * a final plain `TS.ADD` is issued whose error, if any, propagates to the
   * caller.
   */
  private addHistoricalDataForStructure = async (structure: IParkingStructure): Promise<void> => {
    const now = Date.now();
    const lastAdded = this.dataLastAdded.get(structure.id);
    if (lastAdded !== undefined && now - lastAdded.getTime() < PARKING_LOGGING_INTERVAL_MS) {
      return;
    }

    const timeSeriesKey = `${TIME_SERIES_KEY_PREFIX}${structure.id}`;
    const labelArgs = ['LABELS', 'structureId', structure.id];
    const addArgs = ['TS.ADD', timeSeriesKey, now.toString(), structure.spotsAvailable.toString()];

    try {
      await this.redisClient.sendCommand([...addArgs, ...labelArgs]);
    } catch {
      try {
        // The series may be missing: create it explicitly, then add the sample.
        await this.redisClient.sendCommand(['TS.CREATE', timeSeriesKey, ...labelArgs]);
        await this.redisClient.sendCommand(addArgs);
      } catch {
        // Creation may have failed because the series already exists; retry the add.
        await this.redisClient.sendCommand(addArgs);
      }
    }

    this.dataLastAdded.set(structure.id, new Date(now));
  };

  /**
   * Deletes every `parking:structure:*` and `parking:timeseries:*` key and
   * resets the in-memory throttle state.
   */
  clearParkingStructureData = async (): Promise<void> => {
    // NOTE(review): KEYS walks the whole keyspace; consider SCAN if the
    // database holds a large number of keys.
    const [structureKeys, timeSeriesKeys] = await Promise.all([
      this.redisClient.keys(`${STRUCTURE_KEY_PREFIX}*`),
      this.redisClient.keys(`${TIME_SERIES_KEY_PREFIX}*`),
    ]);

    // DEL is skipped for an empty key list.
    if (structureKeys.length > 0) {
      await this.redisClient.del(structureKeys);
    }
    if (timeSeriesKeys.length > 0) {
      await this.redisClient.del(timeSeriesKeys);
    }

    this.dataLastAdded.clear();
  };

  /**
   * Converts a structure hash into an entity.
   *
   * @returns The entity, or `null` when the hash has no fields (which is what
   *   the lookup yields for a key that does not exist).
   */
  private toParkingStructure = (data: Record<string, string>): IParkingStructure | null => {
    if (Object.keys(data).length === 0) {
      return null;
    }
    return {
      id: data.id,
      name: data.name,
      address: data.address,
      capacity: Number.parseInt(data.capacity, 10),
      spotsAvailable: Number.parseInt(data.spotsAvailable, 10),
      coordinates: {
        latitude: Number.parseFloat(data.latitude),
        longitude: Number.parseFloat(data.longitude)
      },
      updatedTime: new Date(data.updatedTime)
    };
  };

  /** Returns the structure stored under `id`, or `null` if there is none. */
  getParkingStructureById = async (id: string): Promise<IParkingStructure | null> => {
    const data = await this.redisClient.hGetAll(`${STRUCTURE_KEY_PREFIX}${id}`);
    return this.toParkingStructure(data);
  };

  /** Returns every structure stored under a `parking:structure:*` key. */
  getParkingStructures = async (): Promise<IParkingStructure[]> => {
    // NOTE(review): KEYS walks the whole keyspace; consider SCAN if the
    // database holds a large number of keys.
    const keys = await this.redisClient.keys(`${STRUCTURE_KEY_PREFIX}*`);

    // Issue all reads together instead of awaiting them one at a time;
    // Promise.all keeps the results in key order.
    const hashes = await Promise.all(keys.map((key) => this.redisClient.hGetAll(key)));

    return hashes
      .map((data) => this.toParkingStructure(data))
      .filter((structure): structure is IParkingStructure => structure !== null);
  };

  /**
   * Deletes a structure together with its time series and throttle entry.
   *
   * @returns The structure as it was before deletion, or `null` if it did not exist.
   */
  removeParkingStructureIfExists = async (id: string): Promise<IParkingStructure | null> => {
    const structure = await this.getParkingStructureById(id);
    if (structure === null) {
      return null;
    }

    await this.redisClient.del([
      `${STRUCTURE_KEY_PREFIX}${id}`,
      `${TIME_SERIES_KEY_PREFIX}${id}`,
    ]);
    this.dataLastAdded.delete(id);
    return structure;
  };

  /**
   * Returns per-interval averages of `spotsAvailable` for one structure.
   *
   * Samples between `options.startUnixEpochMs` and `options.endUnixEpochMs`
   * are fetched with `TS.RANGE` and averaged in windows of
   * `options.intervalMs`. Windows that contain no sample are omitted.
   *
   * @returns The averages in ascending time order. The result is empty when
   *   the query fails (for example because the series does not exist), when
   *   there are no samples, or when `options.intervalMs` is not positive.
   */
  getHistoricalAveragesOfParkingStructureCounts = async (
    id: string,
    options: ParkingStructureCountOptions
  ): Promise<HistoricalParkingAverageQueryResult[]> => {
    const timeSeriesKey = `${TIME_SERIES_KEY_PREFIX}${id}`;

    // The timestamp of each sample may arrive as an integer reply rather than
    // a string, so both are accepted and normalised below.
    let samples: Array<[number | string, string]>;
    try {
      samples = await this.redisClient.sendCommand([
        'TS.RANGE',
        timeSeriesKey,
        options.startUnixEpochMs.toString(),
        options.endUnixEpochMs.toString()
      ]) as Array<[number | string, string]>;
    } catch {
      // Best effort: a failed query (e.g. missing series) is reported as "no data".
      return [];
    }

    if (!samples || samples.length === 0) {
      return [];
    }

    const records: IParkingStructureTimestampRecord[] = samples.map(([timestamp, value]) => ({
      id,
      timestampMs: Number(timestamp),
      spotsAvailable: Number.parseInt(value, 10)
    }));

    return this.calculateAveragesFromRecords(records, options);
  };

  /**
   * Splits `[startUnixEpochMs, endUnixEpochMs)` into consecutive windows of
   * `intervalMs` (the last one clipped to the end) and averages the records
   * that fall into each window. Empty windows produce no result.
   */
  private calculateAveragesFromRecords = (
    records: IParkingStructureTimestampRecord[],
    options: ParkingStructureCountOptions
  ): HistoricalParkingAverageQueryResult[] => {
    const { startUnixEpochMs, endUnixEpochMs, intervalMs } = options;

    // A zero or negative interval would never advance the window start and
    // the loop below would not terminate. NaN is rejected here as well.
    if (!(intervalMs > 0)) {
      return [];
    }

    const results: HistoricalParkingAverageQueryResult[] = [];
    let windowStart = startUnixEpochMs;
    while (windowStart < endUnixEpochMs) {
      const windowEnd = Math.min(windowStart + intervalMs, endUnixEpochMs);
      const recordsInWindow = this.getRecordsInTimeRange(records, windowStart, windowEnd);
      if (recordsInWindow.length > 0) {
        results.push(this.calculateAverageForInterval(windowStart, windowEnd, recordsInWindow));
      }
      windowStart = windowEnd;
    }
    return results;
  };

  /** Returns the records whose timestamp lies in the half-open range `[startMs, endMs)`. */
  private getRecordsInTimeRange = (
    records: IParkingStructureTimestampRecord[],
    startMs: number,
    endMs: number
  ): IParkingStructureTimestampRecord[] => {
    return records.filter(
      (record) => record.timestampMs >= startMs && record.timestampMs < endMs
    );
  };

  /**
   * Builds the result for one window.
   *
   * `records` must be non-empty; an empty array would yield `NaN` as the average.
   */
  private calculateAverageForInterval = (
    fromMs: number,
    toMs: number,
    records: IParkingStructureTimestampRecord[]
  ): HistoricalParkingAverageQueryResult => {
    const totalSpotsAvailable = records.reduce((sum, record) => sum + record.spotsAvailable, 0);
    return {
      fromUnixEpochMs: fromMs,
      toUnixEpochMs: toMs,
      averageSpotsAvailable: totalSpotsAvailable / records.length
    };
  };
}