Refactor parking repositories to use dedicated Constants file and smaller functions

This commit is contained in:
2025-07-03 15:19:57 -04:00
parent 7f453157ee
commit b9d5f7b3df
4 changed files with 170 additions and 153 deletions

View File

@@ -5,16 +5,13 @@ import {
} from "../entities/ParkingRepositoryEntities"; } from "../entities/ParkingRepositoryEntities";
import { HistoricalParkingAverageQueryResult, ParkingStructureCountOptions } from "./ParkingGetterRepository"; import { HistoricalParkingAverageQueryResult, ParkingStructureCountOptions } from "./ParkingGetterRepository";
import { CircularQueue } from "../types/CircularQueue"; import { CircularQueue } from "../types/CircularQueue";
import { PARKING_LOGGING_INTERVAL_MS } from "./ParkingRepositoryConstants";
// If every 10 minutes, two weeks of data (6x per hour * 24x per day * 7x per week * 2)
export const MAX_NUM_ENTRIES = 2016;
export type ParkingStructureID = string; export type ParkingStructureID = string;
// Every 10 minutes
// 6x per hour * 24x per day * 7x per week = 1008 entries for one week
export const PARKING_LOGGING_INTERVAL_MS = 600000;
// This will last two weeks
export const MAX_NUM_ENTRIES = 2016;
export class InMemoryParkingRepository implements ParkingGetterSetterRepository { export class InMemoryParkingRepository implements ParkingGetterSetterRepository {
private dataLastAdded: Map<ParkingStructureID, Date> = new Map(); private dataLastAdded: Map<ParkingStructureID, Date> = new Map();
private loggingIntervalMs = PARKING_LOGGING_INTERVAL_MS; private loggingIntervalMs = PARKING_LOGGING_INTERVAL_MS;
@@ -34,29 +31,18 @@ export class InMemoryParkingRepository implements ParkingGetterSetterRepository
const now = Date.now(); const now = Date.now();
const lastAdded = this.dataLastAdded.get(structure.id); const lastAdded = this.dataLastAdded.get(structure.id);
const parkingLoggingIntervalExceeded = () => { if (this.shouldLogHistoricalData(lastAdded, now)) {
return !lastAdded || (now - lastAdded.getTime()) >= this.loggingIntervalMs; const timestampRecord = this.createTimestampRecord(structure, now);
}; this.ensureHistoricalDataExists(structure.id);
this.addRecordToHistoricalData(structure.id, timestampRecord);
if (parkingLoggingIntervalExceeded()) {
const timestampRecord: IParkingStructureTimestampRecord = {
id: structure.id,
spotsAvailable: structure.spotsAvailable,
timestampMs: now,
};
if (!this.historicalData.has(structure.id)) {
this.historicalData.set(structure.id, new CircularQueue<IParkingStructureTimestampRecord>(MAX_NUM_ENTRIES));
}
const sortingCallback = (a: IParkingStructureTimestampRecord, b: IParkingStructureTimestampRecord) => a.timestampMs - b.timestampMs;
this.historicalData.get(structure.id)?.appendWithSorting(timestampRecord, sortingCallback);
this.dataLastAdded.set(structure.id, new Date(now)); this.dataLastAdded.set(structure.id, new Date(now));
} }
} };
clearParkingStructureData = async (): Promise<void> => { clearParkingStructureData = async (): Promise<void> => {
this.structures.clear(); this.structures.clear();
this.historicalData.clear();
this.dataLastAdded.clear();
}; };
getParkingStructureById = async (id: string): Promise<IParkingStructure | null> => { getParkingStructureById = async (id: string): Promise<IParkingStructure | null> => {
@@ -70,6 +56,8 @@ export class InMemoryParkingRepository implements ParkingGetterSetterRepository
const structure = this.structures.get(id); const structure = this.structures.get(id);
if (structure) { if (structure) {
this.structures.delete(id); this.structures.delete(id);
this.historicalData.delete(id);
this.dataLastAdded.delete(id);
return { ...structure }; return { ...structure };
} }
return null; return null;
@@ -81,41 +69,74 @@ export class InMemoryParkingRepository implements ParkingGetterSetterRepository
return []; return [];
} }
const records = this.extractRecordsFromQueue(queue);
return this.calculateAveragesFromRecords(records, options);
};
private shouldLogHistoricalData = (lastAdded: Date | undefined, currentTime: number): boolean => {
return !lastAdded || (currentTime - lastAdded.getTime()) >= this.loggingIntervalMs;
};
private createTimestampRecord = (structure: IParkingStructure, timestampMs: number): IParkingStructureTimestampRecord => ({
id: structure.id,
spotsAvailable: structure.spotsAvailable,
timestampMs,
});
private ensureHistoricalDataExists = (structureId: string): void => {
if (!this.historicalData.has(structureId)) {
this.historicalData.set(structureId, new CircularQueue<IParkingStructureTimestampRecord>(MAX_NUM_ENTRIES));
}
};
private addRecordToHistoricalData = (structureId: string, record: IParkingStructureTimestampRecord): void => {
const sortingCallback = (a: IParkingStructureTimestampRecord, b: IParkingStructureTimestampRecord) => a.timestampMs - b.timestampMs;
this.historicalData.get(structureId)?.appendWithSorting(record, sortingCallback);
};
private extractRecordsFromQueue = (queue: CircularQueue<IParkingStructureTimestampRecord>): IParkingStructureTimestampRecord[] => {
const records: IParkingStructureTimestampRecord[] = [];
for (let i = 0; i < queue.size(); i++) {
const record = queue.get(i);
if (record) {
records.push(record);
}
}
return records;
};
private calculateAveragesFromRecords = (
records: IParkingStructureTimestampRecord[],
options: ParkingStructureCountOptions
): HistoricalParkingAverageQueryResult[] => {
const results: HistoricalParkingAverageQueryResult[] = []; const results: HistoricalParkingAverageQueryResult[] = [];
const { startUnixEpochMs, endUnixEpochMs, intervalMs } = options; const { startUnixEpochMs, endUnixEpochMs, intervalMs } = options;
let currentIntervalStart = startUnixEpochMs; let currentIntervalStart = startUnixEpochMs;
while (currentIntervalStart < endUnixEpochMs) { while (currentIntervalStart < endUnixEpochMs) {
const currentIntervalEnd = Math.min(currentIntervalStart + intervalMs, endUnixEpochMs); const currentIntervalEnd = Math.min(currentIntervalStart + intervalMs, endUnixEpochMs);
const recordsInInterval = this.getRecordsInTimeRange(queue, currentIntervalStart, currentIntervalEnd); const recordsInInterval = this.getRecordsInTimeRange(records, currentIntervalStart, currentIntervalEnd);
if (recordsInInterval.length > 0) { if (recordsInInterval.length > 0) {
const averageResult = this.calculateAverageForInterval(currentIntervalStart, currentIntervalEnd, recordsInInterval); const averageResult = this.calculateAverageForInterval(currentIntervalStart, currentIntervalEnd, recordsInInterval);
results.push(averageResult); results.push(averageResult);
} }
currentIntervalStart = currentIntervalEnd; currentIntervalStart = currentIntervalEnd;
} }
return results; return results;
}; };
private getRecordsInTimeRange = ( private getRecordsInTimeRange = (
queue: CircularQueue<IParkingStructureTimestampRecord>, records: IParkingStructureTimestampRecord[],
startMs: number, startMs: number,
endMs: number endMs: number
): IParkingStructureTimestampRecord[] => { ): IParkingStructureTimestampRecord[] => {
const recordsInInterval: IParkingStructureTimestampRecord[] = []; return records.filter(record =>
record.timestampMs >= startMs && record.timestampMs < endMs
for (let i = 0; i < queue.size(); i++) { );
const record = queue.get(i);
if (record && record.timestampMs >= startMs && record.timestampMs < endMs) {
recordsInInterval.push(record);
}
}
return recordsInInterval;
}; };
private calculateAverageForInterval = ( private calculateAverageForInterval = (
@@ -125,7 +146,7 @@ export class InMemoryParkingRepository implements ParkingGetterSetterRepository
): HistoricalParkingAverageQueryResult => { ): HistoricalParkingAverageQueryResult => {
const totalSpotsAvailable = records.reduce((sum, record) => sum + record.spotsAvailable, 0); const totalSpotsAvailable = records.reduce((sum, record) => sum + record.spotsAvailable, 0);
const averageSpotsAvailable = totalSpotsAvailable / records.length; const averageSpotsAvailable = totalSpotsAvailable / records.length;
return { return {
fromUnixEpochMs: fromMs, fromUnixEpochMs: fromMs,
toUnixEpochMs: toMs, toUnixEpochMs: toMs,

View File

@@ -0,0 +1,3 @@
export const PARKING_LOGGING_INTERVAL_MS = process.env.PARKING_LOGGING_INTERVAL_MS
? parseInt(process.env.PARKING_LOGGING_INTERVAL_MS)
: 600000; // Every 10 minutes

View File

@@ -2,30 +2,17 @@ import { ParkingGetterSetterRepository } from "./ParkingGetterSetterRepository";
import { IParkingStructure, IParkingStructureTimestampRecord } from "../entities/ParkingRepositoryEntities"; import { IParkingStructure, IParkingStructureTimestampRecord } from "../entities/ParkingRepositoryEntities";
import { HistoricalParkingAverageQueryResult, ParkingStructureCountOptions } from "./ParkingGetterRepository"; import { HistoricalParkingAverageQueryResult, ParkingStructureCountOptions } from "./ParkingGetterRepository";
import { BaseRedisRepository } from "./BaseRedisRepository"; import { BaseRedisRepository } from "./BaseRedisRepository";
import { PARKING_LOGGING_INTERVAL_MS } from "./ParkingRepositoryConstants";
export type ParkingStructureID = string; export type ParkingStructureID = string;
// Every 10 minutes
export const PARKING_LOGGING_INTERVAL_MS = 600000;
export class RedisParkingRepository extends BaseRedisRepository implements ParkingGetterSetterRepository { export class RedisParkingRepository extends BaseRedisRepository implements ParkingGetterSetterRepository {
private dataLastAdded: Map<ParkingStructureID, Date> = new Map(); private dataLastAdded: Map<ParkingStructureID, Date> = new Map();
private loggingIntervalMs = PARKING_LOGGING_INTERVAL_MS; private loggingIntervalMs = PARKING_LOGGING_INTERVAL_MS;
addOrUpdateParkingStructure = async (structure: IParkingStructure): Promise<void> => { addOrUpdateParkingStructure = async (structure: IParkingStructure): Promise<void> => {
// Store current structure data const keys = this.createRedisKeys(structure.id);
await this.redisClient.hSet(`parking:structure:${structure.id}`, { await this.redisClient.hSet(keys.structure, this.createRedisHashFromStructure(structure));
id: structure.id,
name: structure.name,
address: structure.address,
capacity: structure.capacity.toString(),
spotsAvailable: structure.spotsAvailable.toString(),
latitude: structure.coordinates.latitude.toString(),
longitude: structure.coordinates.longitude.toString(),
updatedTime: structure.updatedTime.toISOString()
});
// Add to historical data if needed
await this.addHistoricalDataForStructure(structure); await this.addHistoricalDataForStructure(structure);
}; };
@@ -33,92 +20,34 @@ export class RedisParkingRepository extends BaseRedisRepository implements Parki
const now = Date.now(); const now = Date.now();
const lastAdded = this.dataLastAdded.get(structure.id); const lastAdded = this.dataLastAdded.get(structure.id);
const parkingLoggingIntervalExceeded = () => { if (this.shouldLogHistoricalData(lastAdded, now)) {
return !lastAdded || (now - lastAdded.getTime()) >= this.loggingIntervalMs; const keys = this.createRedisKeys(structure.id);
}; await this.addTimeSeriesDataPoint(keys.timeSeries, now, structure.spotsAvailable, structure.id);
if (parkingLoggingIntervalExceeded()) {
// Use Redis Time Series to store historical data
const timeSeriesKey = `parking:timeseries:${structure.id}`;
try {
// Try to add the time series data point
await this.redisClient.sendCommand([
'TS.ADD',
timeSeriesKey,
now.toString(),
structure.spotsAvailable.toString(),
'LABELS',
'structureId',
structure.id
]);
} catch (error) {
// If time series doesn't exist, create it first
try {
await this.redisClient.sendCommand([
'TS.CREATE',
timeSeriesKey,
'LABELS',
'structureId',
structure.id
]);
// Now add the data point
await this.redisClient.sendCommand([
'TS.ADD',
timeSeriesKey,
now.toString(),
structure.spotsAvailable.toString()
]);
} catch (createError) {
// If still fails, it might be because time series already exists, try adding again
await this.redisClient.sendCommand([
'TS.ADD',
timeSeriesKey,
now.toString(),
structure.spotsAvailable.toString()
]);
}
}
this.dataLastAdded.set(structure.id, new Date(now)); this.dataLastAdded.set(structure.id, new Date(now));
} }
}; };
clearParkingStructureData = async (): Promise<void> => { clearParkingStructureData = async (): Promise<void> => {
// Get all parking structure keys
const structureKeys = await this.redisClient.keys('parking:structure:*'); const structureKeys = await this.redisClient.keys('parking:structure:*');
const timeSeriesKeys = await this.redisClient.keys('parking:timeseries:*'); const timeSeriesKeys = await this.redisClient.keys('parking:timeseries:*');
// Delete all structure and time series data const allKeys = [...structureKeys, ...timeSeriesKeys];
if (structureKeys.length > 0) { if (allKeys.length > 0) {
await this.redisClient.del(structureKeys); await this.redisClient.del(allKeys);
}
if (timeSeriesKeys.length > 0) {
await this.redisClient.del(timeSeriesKeys);
} }
this.dataLastAdded.clear(); this.dataLastAdded.clear();
}; };
getParkingStructureById = async (id: string): Promise<IParkingStructure | null> => { getParkingStructureById = async (id: string): Promise<IParkingStructure | null> => {
const data = await this.redisClient.hGetAll(`parking:structure:${id}`); const keys = this.createRedisKeys(id);
const data = await this.redisClient.hGetAll(keys.structure);
if (Object.keys(data).length === 0) { if (Object.keys(data).length === 0) {
return null; return null;
} }
return { return this.createStructureFromRedisData(data);
id: data.id,
name: data.name,
address: data.address,
capacity: parseInt(data.capacity),
spotsAvailable: parseInt(data.spotsAvailable),
coordinates: {
latitude: parseFloat(data.latitude),
longitude: parseFloat(data.longitude)
},
updatedTime: new Date(data.updatedTime)
};
}; };
getParkingStructures = async (): Promise<IParkingStructure[]> => { getParkingStructures = async (): Promise<IParkingStructure[]> => {
@@ -128,18 +57,7 @@ export class RedisParkingRepository extends BaseRedisRepository implements Parki
for (const key of keys) { for (const key of keys) {
const data = await this.redisClient.hGetAll(key); const data = await this.redisClient.hGetAll(key);
if (Object.keys(data).length > 0) { if (Object.keys(data).length > 0) {
structures.push({ structures.push(this.createStructureFromRedisData(data));
id: data.id,
name: data.name,
address: data.address,
capacity: parseInt(data.capacity),
spotsAvailable: parseInt(data.spotsAvailable),
coordinates: {
latitude: parseFloat(data.latitude),
longitude: parseFloat(data.longitude)
},
updatedTime: new Date(data.updatedTime)
});
} }
} }
@@ -149,8 +67,8 @@ export class RedisParkingRepository extends BaseRedisRepository implements Parki
removeParkingStructureIfExists = async (id: string): Promise<IParkingStructure | null> => { removeParkingStructureIfExists = async (id: string): Promise<IParkingStructure | null> => {
const structure = await this.getParkingStructureById(id); const structure = await this.getParkingStructureById(id);
if (structure) { if (structure) {
await this.redisClient.del(`parking:structure:${id}`); const keys = this.createRedisKeys(id);
await this.redisClient.del(`parking:timeseries:${id}`); await this.redisClient.del([keys.structure, keys.timeSeries]);
this.dataLastAdded.delete(id); this.dataLastAdded.delete(id);
return structure; return structure;
} }
@@ -158,13 +76,12 @@ export class RedisParkingRepository extends BaseRedisRepository implements Parki
}; };
getHistoricalAveragesOfParkingStructureCounts = async (id: string, options: ParkingStructureCountOptions): Promise<HistoricalParkingAverageQueryResult[]> => { getHistoricalAveragesOfParkingStructureCounts = async (id: string, options: ParkingStructureCountOptions): Promise<HistoricalParkingAverageQueryResult[]> => {
const timeSeriesKey = `parking:timeseries:${id}`; const keys = this.createRedisKeys(id);
try { try {
// Get time series data for the specified range
const timeSeriesData = await this.redisClient.sendCommand([ const timeSeriesData = await this.redisClient.sendCommand([
'TS.RANGE', 'TS.RANGE',
timeSeriesKey, keys.timeSeries,
options.startUnixEpochMs.toString(), options.startUnixEpochMs.toString(),
options.endUnixEpochMs.toString() options.endUnixEpochMs.toString()
]) as [string, string][]; ]) as [string, string][];
@@ -173,20 +90,95 @@ export class RedisParkingRepository extends BaseRedisRepository implements Parki
return []; return [];
} }
// Convert Redis time series data to our record format const records = this.convertTimeSeriesDataToRecords(timeSeriesData, id);
const records: IParkingStructureTimestampRecord[] = timeSeriesData.map(([timestamp, value]) => ({
id,
timestampMs: parseInt(timestamp),
spotsAvailable: parseInt(value)
}));
return this.calculateAveragesFromRecords(records, options); return this.calculateAveragesFromRecords(records, options);
} catch (error) { } catch (error) {
// Time series might not exist
return []; return [];
} }
}; };
private createRedisKeys = (structureId: string) => ({
structure: `parking:structure:${structureId}`,
timeSeries: `parking:timeseries:${structureId}`
});
private createRedisHashFromStructure = (structure: IParkingStructure): Record<string, string> => ({
id: structure.id,
name: structure.name,
address: structure.address,
capacity: structure.capacity.toString(),
spotsAvailable: structure.spotsAvailable.toString(),
latitude: structure.coordinates.latitude.toString(),
longitude: structure.coordinates.longitude.toString(),
updatedTime: structure.updatedTime.toISOString()
});
private createStructureFromRedisData = (data: Record<string, string>): IParkingStructure => ({
id: data.id,
name: data.name,
address: data.address,
capacity: parseInt(data.capacity),
spotsAvailable: parseInt(data.spotsAvailable),
coordinates: {
latitude: parseFloat(data.latitude),
longitude: parseFloat(data.longitude)
},
updatedTime: new Date(data.updatedTime)
});
private shouldLogHistoricalData = (lastAdded: Date | undefined, currentTime: number): boolean => {
return !lastAdded || (currentTime - lastAdded.getTime()) >= this.loggingIntervalMs;
};
private addTimeSeriesDataPoint = async (timeSeriesKey: string, timestamp: number, value: number, structureId: string): Promise<void> => {
try {
await this.redisClient.sendCommand([
'TS.ADD',
timeSeriesKey,
timestamp.toString(),
value.toString(),
'LABELS',
'structureId',
structureId
]);
} catch (error) {
await this.createTimeSeriesAndAddDataPoint(timeSeriesKey, timestamp, value, structureId);
}
};
private createTimeSeriesAndAddDataPoint = async (timeSeriesKey: string, timestamp: number, value: number, structureId: string): Promise<void> => {
try {
await this.redisClient.sendCommand([
'TS.CREATE',
timeSeriesKey,
'LABELS',
'structureId',
structureId
]);
await this.redisClient.sendCommand([
'TS.ADD',
timeSeriesKey,
timestamp.toString(),
value.toString()
]);
} catch (createError) {
await this.redisClient.sendCommand([
'TS.ADD',
timeSeriesKey,
timestamp.toString(),
value.toString()
]);
}
};
private convertTimeSeriesDataToRecords = (timeSeriesData: [string, string][], id: string): IParkingStructureTimestampRecord[] => {
return timeSeriesData.map(([timestamp, value]) => ({
id,
timestampMs: parseInt(timestamp),
spotsAvailable: parseInt(value)
}));
};
private calculateAveragesFromRecords = ( private calculateAveragesFromRecords = (
records: IParkingStructureTimestampRecord[], records: IParkingStructureTimestampRecord[],
options: ParkingStructureCountOptions options: ParkingStructureCountOptions

View File

@@ -63,6 +63,7 @@ describe.each(repositoryImplementations)('$name', (holder) => {
afterEach(async () => { afterEach(async () => {
await holder.teardown(); await holder.teardown();
jest.useRealTimers();
}); });
describe("addOrUpdateParkingStructure", () => { describe("addOrUpdateParkingStructure", () => {
@@ -177,7 +178,7 @@ describe.each(repositoryImplementations)('$name', (holder) => {
// Add updates with small delays to ensure different timestamps // Add updates with small delays to ensure different timestamps
for (let i = 0; i < updates.length; i++) { for (let i = 0; i < updates.length; i++) {
await repository.addOrUpdateParkingStructure(updates[i]); await repository.addOrUpdateParkingStructure(updates[i]);
await new Promise(resolve => setTimeout(resolve, 100)); // Small delay await new Promise((resolve) => setTimeout(resolve, 200));
} }
const now = Date.now(); const now = Date.now();