diff --git a/src/repositories/InMemoryParkingRepository.ts b/src/repositories/InMemoryParkingRepository.ts index 05c7b3e..741cfd9 100644 --- a/src/repositories/InMemoryParkingRepository.ts +++ b/src/repositories/InMemoryParkingRepository.ts @@ -5,16 +5,13 @@ import { } from "../entities/ParkingRepositoryEntities"; import { HistoricalParkingAverageQueryResult, ParkingStructureCountOptions } from "./ParkingGetterRepository"; import { CircularQueue } from "../types/CircularQueue"; +import { PARKING_LOGGING_INTERVAL_MS } from "./ParkingRepositoryConstants"; + +// If every 10 minutes, two weeks of data (6x per hour * 24x per day * 7x per week * 2) +export const MAX_NUM_ENTRIES = 2016; export type ParkingStructureID = string; -// Every 10 minutes -// 6x per hour * 24x per day * 7x per week = 1008 entries for one week -export const PARKING_LOGGING_INTERVAL_MS = 600000; - -// This will last two weeks -export const MAX_NUM_ENTRIES = 2016; - export class InMemoryParkingRepository implements ParkingGetterSetterRepository { private dataLastAdded: Map = new Map(); private loggingIntervalMs = PARKING_LOGGING_INTERVAL_MS; @@ -34,29 +31,18 @@ export class InMemoryParkingRepository implements ParkingGetterSetterRepository const now = Date.now(); const lastAdded = this.dataLastAdded.get(structure.id); - const parkingLoggingIntervalExceeded = () => { - return !lastAdded || (now - lastAdded.getTime()) >= this.loggingIntervalMs; - }; - - if (parkingLoggingIntervalExceeded()) { - const timestampRecord: IParkingStructureTimestampRecord = { - id: structure.id, - spotsAvailable: structure.spotsAvailable, - timestampMs: now, - }; - - if (!this.historicalData.has(structure.id)) { - this.historicalData.set(structure.id, new CircularQueue(MAX_NUM_ENTRIES)); - } - - const sortingCallback = (a: IParkingStructureTimestampRecord, b: IParkingStructureTimestampRecord) => a.timestampMs - b.timestampMs; - this.historicalData.get(structure.id)?.appendWithSorting(timestampRecord, sortingCallback); + if (this.shouldLogHistoricalData(lastAdded, now)) { + const timestampRecord = this.createTimestampRecord(structure, now); + this.ensureHistoricalDataExists(structure.id); + this.addRecordToHistoricalData(structure.id, timestampRecord); this.dataLastAdded.set(structure.id, new Date(now)); } - } + }; clearParkingStructureData = async (): Promise => { this.structures.clear(); + this.historicalData.clear(); + this.dataLastAdded.clear(); }; getParkingStructureById = async (id: string): Promise => { @@ -70,6 +56,8 @@ export class InMemoryParkingRepository implements ParkingGetterSetterRepository const structure = this.structures.get(id); if (structure) { this.structures.delete(id); + this.historicalData.delete(id); + this.dataLastAdded.delete(id); return { ...structure }; } return null; @@ -81,41 +69,74 @@ export class InMemoryParkingRepository implements ParkingGetterSetterRepository return []; } + const records = this.extractRecordsFromQueue(queue); + return this.calculateAveragesFromRecords(records, options); + }; + + private shouldLogHistoricalData = (lastAdded: Date | undefined, currentTime: number): boolean => { + return !lastAdded || (currentTime - lastAdded.getTime()) >= this.loggingIntervalMs; + }; + + private createTimestampRecord = (structure: IParkingStructure, timestampMs: number): IParkingStructureTimestampRecord => ({ + id: structure.id, + spotsAvailable: structure.spotsAvailable, + timestampMs, + }); + + private ensureHistoricalDataExists = (structureId: string): void => { + if (!this.historicalData.has(structureId)) { + this.historicalData.set(structureId, new CircularQueue(MAX_NUM_ENTRIES)); + } + }; + + private addRecordToHistoricalData = (structureId: string, record: IParkingStructureTimestampRecord): void => { + const sortingCallback = (a: IParkingStructureTimestampRecord, b: IParkingStructureTimestampRecord) => a.timestampMs - b.timestampMs; + this.historicalData.get(structureId)?.appendWithSorting(record, sortingCallback); + }; + + private extractRecordsFromQueue = (queue: CircularQueue): IParkingStructureTimestampRecord[] => { + const records: IParkingStructureTimestampRecord[] = []; + for (let i = 0; i < queue.size(); i++) { + const record = queue.get(i); + if (record) { + records.push(record); + } + } + return records; + }; + + private calculateAveragesFromRecords = ( + records: IParkingStructureTimestampRecord[], + options: ParkingStructureCountOptions + ): HistoricalParkingAverageQueryResult[] => { const results: HistoricalParkingAverageQueryResult[] = []; const { startUnixEpochMs, endUnixEpochMs, intervalMs } = options; - + let currentIntervalStart = startUnixEpochMs; - + while (currentIntervalStart < endUnixEpochMs) { const currentIntervalEnd = Math.min(currentIntervalStart + intervalMs, endUnixEpochMs); - const recordsInInterval = this.getRecordsInTimeRange(queue, currentIntervalStart, currentIntervalEnd); - + const recordsInInterval = this.getRecordsInTimeRange(records, currentIntervalStart, currentIntervalEnd); + if (recordsInInterval.length > 0) { const averageResult = this.calculateAverageForInterval(currentIntervalStart, currentIntervalEnd, recordsInInterval); results.push(averageResult); } - + currentIntervalStart = currentIntervalEnd; } - + return results; }; private getRecordsInTimeRange = ( - queue: CircularQueue, - startMs: number, + records: IParkingStructureTimestampRecord[], + startMs: number, endMs: number ): IParkingStructureTimestampRecord[] => { - const recordsInInterval: IParkingStructureTimestampRecord[] = []; - - for (let i = 0; i < queue.size(); i++) { - const record = queue.get(i); - if (record && record.timestampMs >= startMs && record.timestampMs < endMs) { - recordsInInterval.push(record); - } - } - - return recordsInInterval; + return records.filter(record => + record.timestampMs >= startMs && record.timestampMs < endMs + ); }; private calculateAverageForInterval = ( @@ -125,7 +146,7 @@ export class InMemoryParkingRepository implements ParkingGetterSetterRepository ): HistoricalParkingAverageQueryResult => { const totalSpotsAvailable = records.reduce((sum, record) => sum + record.spotsAvailable, 0); const averageSpotsAvailable = totalSpotsAvailable / records.length; - + return { fromUnixEpochMs: fromMs, toUnixEpochMs: toMs, diff --git a/src/repositories/ParkingRepositoryConstants.ts b/src/repositories/ParkingRepositoryConstants.ts new file mode 100644 index 0000000..4d8b30e --- /dev/null +++ b/src/repositories/ParkingRepositoryConstants.ts @@ -0,0 +1,3 @@ +export const PARKING_LOGGING_INTERVAL_MS = process.env.PARKING_LOGGING_INTERVAL_MS + ? parseInt(process.env.PARKING_LOGGING_INTERVAL_MS) + : 600000; // Every 10 minutes diff --git a/src/repositories/RedisParkingRepository.ts b/src/repositories/RedisParkingRepository.ts index f3beb2f..d71a953 100644 --- a/src/repositories/RedisParkingRepository.ts +++ b/src/repositories/RedisParkingRepository.ts @@ -2,30 +2,17 @@ import { ParkingGetterSetterRepository } from "./ParkingGetterSetterRepository"; import { IParkingStructure, IParkingStructureTimestampRecord } from "../entities/ParkingRepositoryEntities"; import { HistoricalParkingAverageQueryResult, ParkingStructureCountOptions } from "./ParkingGetterRepository"; import { BaseRedisRepository } from "./BaseRedisRepository"; +import { PARKING_LOGGING_INTERVAL_MS } from "./ParkingRepositoryConstants"; export type ParkingStructureID = string; -// Every 10 minutes -export const PARKING_LOGGING_INTERVAL_MS = 600000; - export class RedisParkingRepository extends BaseRedisRepository implements ParkingGetterSetterRepository { private dataLastAdded: Map = new Map(); private loggingIntervalMs = PARKING_LOGGING_INTERVAL_MS; addOrUpdateParkingStructure = async (structure: IParkingStructure): Promise => { - // Store current structure data - await this.redisClient.hSet(`parking:structure:${structure.id}`, { - id: structure.id, - name: structure.name, - address: structure.address, - capacity: structure.capacity.toString(), - spotsAvailable: structure.spotsAvailable.toString(), - latitude: structure.coordinates.latitude.toString(), - longitude: structure.coordinates.longitude.toString(), - updatedTime: structure.updatedTime.toISOString() - }); - - // Add to historical data if needed + const keys = this.createRedisKeys(structure.id); + await this.redisClient.hSet(keys.structure, this.createRedisHashFromStructure(structure)); await this.addHistoricalDataForStructure(structure); }; @@ -33,92 +20,34 @@ export class RedisParkingRepository extends BaseRedisRepository implements Parki const now = Date.now(); const lastAdded = this.dataLastAdded.get(structure.id); - const parkingLoggingIntervalExceeded = () => { - return !lastAdded || (now - lastAdded.getTime()) >= this.loggingIntervalMs; - }; - - if (parkingLoggingIntervalExceeded()) { - // Use Redis Time Series to store historical data - const timeSeriesKey = `parking:timeseries:${structure.id}`; - - try { - // Try to add the time series data point - await this.redisClient.sendCommand([ - 'TS.ADD', - timeSeriesKey, - now.toString(), - structure.spotsAvailable.toString(), - 'LABELS', - 'structureId', - structure.id - ]); - } catch (error) { - // If time series doesn't exist, create it first - try { - await this.redisClient.sendCommand([ - 'TS.CREATE', - timeSeriesKey, - 'LABELS', - 'structureId', - structure.id - ]); - // Now add the data point - await this.redisClient.sendCommand([ - 'TS.ADD', - timeSeriesKey, - now.toString(), - structure.spotsAvailable.toString() - ]); - } catch (createError) { - // If still fails, it might be because time series already exists, try adding again - await this.redisClient.sendCommand([ - 'TS.ADD', - timeSeriesKey, - now.toString(), - structure.spotsAvailable.toString() - ]); - } - } - + if (this.shouldLogHistoricalData(lastAdded, now)) { + const keys = this.createRedisKeys(structure.id); + await this.addTimeSeriesDataPoint(keys.timeSeries, now, structure.spotsAvailable, structure.id); this.dataLastAdded.set(structure.id, new Date(now)); } }; clearParkingStructureData = async (): Promise => { - // Get all parking structure keys const structureKeys = await this.redisClient.keys('parking:structure:*'); const timeSeriesKeys = await this.redisClient.keys('parking:timeseries:*'); - // Delete all structure and time series data - if (structureKeys.length > 0) { - await this.redisClient.del(structureKeys); - } - if (timeSeriesKeys.length > 0) { - await this.redisClient.del(timeSeriesKeys); + const allKeys = [...structureKeys, ...timeSeriesKeys]; + if (allKeys.length > 0) { + await this.redisClient.del(allKeys); } this.dataLastAdded.clear(); }; getParkingStructureById = async (id: string): Promise => { - const data = await this.redisClient.hGetAll(`parking:structure:${id}`); + const keys = this.createRedisKeys(id); + const data = await this.redisClient.hGetAll(keys.structure); if (Object.keys(data).length === 0) { return null; } - return { - id: data.id, - name: data.name, - address: data.address, - capacity: parseInt(data.capacity), - spotsAvailable: parseInt(data.spotsAvailable), - coordinates: { - latitude: parseFloat(data.latitude), - longitude: parseFloat(data.longitude) - }, - updatedTime: new Date(data.updatedTime) - }; + return this.createStructureFromRedisData(data); }; getParkingStructures = async (): Promise => { @@ -128,18 +57,7 @@ export class RedisParkingRepository extends BaseRedisRepository implements Parki for (const key of keys) { const data = await this.redisClient.hGetAll(key); if (Object.keys(data).length > 0) { - structures.push({ - id: data.id, - name: data.name, - address: data.address, - capacity: parseInt(data.capacity), - spotsAvailable: parseInt(data.spotsAvailable), - coordinates: { - latitude: parseFloat(data.latitude), - longitude: parseFloat(data.longitude) - }, - updatedTime: new Date(data.updatedTime) - }); + structures.push(this.createStructureFromRedisData(data)); } } @@ -149,8 +67,8 @@ export class RedisParkingRepository extends BaseRedisRepository implements Parki removeParkingStructureIfExists = async (id: string): Promise => { const structure = await this.getParkingStructureById(id); if (structure) { - await this.redisClient.del(`parking:structure:${id}`); - await this.redisClient.del(`parking:timeseries:${id}`); + const keys = this.createRedisKeys(id); + await this.redisClient.del([keys.structure, keys.timeSeries]); this.dataLastAdded.delete(id); return structure; } @@ -158,13 +76,12 @@ export class RedisParkingRepository extends BaseRedisRepository implements Parki }; getHistoricalAveragesOfParkingStructureCounts = async (id: string, options: ParkingStructureCountOptions): Promise => { - const timeSeriesKey = `parking:timeseries:${id}`; + const keys = this.createRedisKeys(id); try { - // Get time series data for the specified range const timeSeriesData = await this.redisClient.sendCommand([ 'TS.RANGE', - timeSeriesKey, + keys.timeSeries, options.startUnixEpochMs.toString(), options.endUnixEpochMs.toString() ]) as [string, string][]; @@ -173,20 +90,95 @@ export class RedisParkingRepository extends BaseRedisRepository implements Parki return []; } - // Convert Redis time series data to our record format - const records: IParkingStructureTimestampRecord[] = timeSeriesData.map(([timestamp, value]) => ({ - id, - timestampMs: parseInt(timestamp), - spotsAvailable: parseInt(value) - })); - + const records = this.convertTimeSeriesDataToRecords(timeSeriesData, id); return this.calculateAveragesFromRecords(records, options); } catch (error) { - // Time series might not exist return []; } }; + private createRedisKeys = (structureId: string) => ({ + structure: `parking:structure:${structureId}`, + timeSeries: `parking:timeseries:${structureId}` + }); + + private createRedisHashFromStructure = (structure: IParkingStructure): Record => ({ + id: structure.id, + name: structure.name, + address: structure.address, + capacity: structure.capacity.toString(), + spotsAvailable: structure.spotsAvailable.toString(), + latitude: structure.coordinates.latitude.toString(), + longitude: structure.coordinates.longitude.toString(), + updatedTime: structure.updatedTime.toISOString() + }); + + private createStructureFromRedisData = (data: Record): IParkingStructure => ({ + id: data.id, + name: data.name, + address: data.address, + capacity: parseInt(data.capacity), + spotsAvailable: parseInt(data.spotsAvailable), + coordinates: { + latitude: parseFloat(data.latitude), + longitude: parseFloat(data.longitude) + }, + updatedTime: new Date(data.updatedTime) + }); + + private shouldLogHistoricalData = (lastAdded: Date | undefined, currentTime: number): boolean => { + return !lastAdded || (currentTime - lastAdded.getTime()) >= this.loggingIntervalMs; + }; + + private addTimeSeriesDataPoint = async (timeSeriesKey: string, timestamp: number, value: number, structureId: string): Promise => { + try { + await this.redisClient.sendCommand([ + 'TS.ADD', + timeSeriesKey, + timestamp.toString(), + value.toString(), + 'LABELS', + 'structureId', + structureId + ]); + } catch (error) { + await this.createTimeSeriesAndAddDataPoint(timeSeriesKey, timestamp, value, structureId); + } + }; + + private createTimeSeriesAndAddDataPoint = async (timeSeriesKey: string, timestamp: number, value: number, structureId: string): Promise => { + try { + await this.redisClient.sendCommand([ + 'TS.CREATE', + timeSeriesKey, + 'LABELS', + 'structureId', + structureId + ]); + await this.redisClient.sendCommand([ + 'TS.ADD', + timeSeriesKey, + timestamp.toString(), + value.toString() + ]); + } catch (createError) { + await this.redisClient.sendCommand([ + 'TS.ADD', + timeSeriesKey, + timestamp.toString(), + value.toString() + ]); + } + }; + + private convertTimeSeriesDataToRecords = (timeSeriesData: [string, string][], id: string): IParkingStructureTimestampRecord[] => { + return timeSeriesData.map(([timestamp, value]) => ({ + id, + timestampMs: parseInt(timestamp), + spotsAvailable: parseInt(value) + })); + }; + private calculateAveragesFromRecords = ( records: IParkingStructureTimestampRecord[], options: ParkingStructureCountOptions diff --git a/test/repositories/ParkingRepositorySharedTests.test.ts b/test/repositories/ParkingRepositorySharedTests.test.ts index 729e88b..21d9f41 100644 --- a/test/repositories/ParkingRepositorySharedTests.test.ts +++ b/test/repositories/ParkingRepositorySharedTests.test.ts @@ -63,6 +63,7 @@ describe.each(repositoryImplementations)('$name', (holder) => { afterEach(async () => { await holder.teardown(); + jest.useRealTimers(); }); describe("addOrUpdateParkingStructure", () => { @@ -177,7 +178,7 @@ describe.each(repositoryImplementations)('$name', (holder) => { // Add updates with small delays to ensure different timestamps for (let i = 0; i < updates.length; i++) { await repository.addOrUpdateParkingStructure(updates[i]); - await new Promise(resolve => setTimeout(resolve, 100)); // Small delay + await new Promise((resolve) => setTimeout(resolve, 200)); } const now = Date.now();