mirror of
https://github.com/brendan-ch/project-inter-server.git
synced 2026-04-19 17:00:30 +00:00
Move repositories into folders.
This commit is contained in:
160
src/repositories/parking/InMemoryParkingRepository.ts
Normal file
160
src/repositories/parking/InMemoryParkingRepository.ts
Normal file
@@ -0,0 +1,160 @@
|
||||
import { ParkingGetterSetterRepository } from "./ParkingGetterSetterRepository";
|
||||
import {
|
||||
IParkingStructure,
|
||||
IParkingStructureTimestampRecord
|
||||
} from "../../entities/ParkingRepositoryEntities";
|
||||
import { HistoricalParkingAverageQueryResult, ParkingStructureCountOptions } from "./ParkingGetterRepository";
|
||||
import { CircularQueue } from "../../types/CircularQueue";
|
||||
import { PARKING_LOGGING_INTERVAL_MS } from "./ParkingRepositoryConstants";
|
||||
|
||||
// If every 10 minutes, two weeks of data (6x per hour * 24x per day * 7x per week * 2)
|
||||
export const MAX_NUM_ENTRIES = 2016;
|
||||
|
||||
export type ParkingStructureID = string;
|
||||
|
||||
export class InMemoryParkingRepository implements ParkingGetterSetterRepository {
|
||||
private dataLastAdded: Map<ParkingStructureID, Date> = new Map();
|
||||
private loggingIntervalMs = PARKING_LOGGING_INTERVAL_MS;
|
||||
|
||||
constructor(
|
||||
private structures: Map<ParkingStructureID, IParkingStructure> = new Map(),
|
||||
private historicalData: Map<ParkingStructureID, CircularQueue<IParkingStructureTimestampRecord>> = new Map(),
|
||||
) {
|
||||
}
|
||||
|
||||
addOrUpdateParkingStructure = async (structure: IParkingStructure): Promise<void> => {
|
||||
this.structures.set(structure.id, { ...structure });
|
||||
await this.addHistoricalDataForStructure(structure);
|
||||
};
|
||||
|
||||
private addHistoricalDataForStructure = async (structure: IParkingStructure): Promise<void> => {
|
||||
const now = Date.now();
|
||||
const lastAdded = this.dataLastAdded.get(structure.id);
|
||||
|
||||
if (this.shouldLogHistoricalData(lastAdded, now)) {
|
||||
const timestampRecord = this.createTimestampRecord(structure, now);
|
||||
this.ensureHistoricalDataExists(structure.id);
|
||||
this.addRecordToHistoricalData(structure.id, timestampRecord);
|
||||
this.dataLastAdded.set(structure.id, new Date(now));
|
||||
}
|
||||
};
|
||||
|
||||
clearParkingStructureData = async (): Promise<void> => {
|
||||
this.structures.clear();
|
||||
this.historicalData.clear();
|
||||
this.dataLastAdded.clear();
|
||||
};
|
||||
|
||||
getParkingStructureById = async (id: string): Promise<IParkingStructure | null> => {
|
||||
const structure = this.structures.get(id);
|
||||
return structure ? { ...structure } : null;
|
||||
};
|
||||
|
||||
getParkingStructures = async (): Promise<IParkingStructure[]> => Array.from(this.structures.values()).map(structure => ({...structure}));
|
||||
|
||||
removeParkingStructureIfExists = async (id: string): Promise<IParkingStructure | null> => {
|
||||
const structure = this.structures.get(id);
|
||||
if (structure) {
|
||||
this.structures.delete(id);
|
||||
this.historicalData.delete(id);
|
||||
this.dataLastAdded.delete(id);
|
||||
return { ...structure };
|
||||
}
|
||||
return null;
|
||||
};
|
||||
|
||||
getHistoricalAveragesOfParkingStructureCounts = async (id: string, options: ParkingStructureCountOptions): Promise<HistoricalParkingAverageQueryResult[]> => {
|
||||
const queue = this.historicalData.get(id);
|
||||
if (!queue || queue.size() === 0) {
|
||||
return [];
|
||||
}
|
||||
|
||||
const records = this.extractRecordsFromQueue(queue);
|
||||
return this.calculateAveragesFromRecords(records, options);
|
||||
};
|
||||
|
||||
private shouldLogHistoricalData = (lastAdded: Date | undefined, currentTime: number): boolean => {
|
||||
return !lastAdded || (currentTime - lastAdded.getTime()) >= this.loggingIntervalMs;
|
||||
};
|
||||
|
||||
private createTimestampRecord = (structure: IParkingStructure, timestampMs: number): IParkingStructureTimestampRecord => ({
|
||||
id: structure.id,
|
||||
spotsAvailable: structure.spotsAvailable,
|
||||
timestampMs,
|
||||
});
|
||||
|
||||
private ensureHistoricalDataExists = (structureId: string): void => {
|
||||
if (!this.historicalData.has(structureId)) {
|
||||
this.historicalData.set(structureId, new CircularQueue<IParkingStructureTimestampRecord>(MAX_NUM_ENTRIES));
|
||||
}
|
||||
};
|
||||
|
||||
private addRecordToHistoricalData = (structureId: string, record: IParkingStructureTimestampRecord): void => {
|
||||
const sortingCallback = (a: IParkingStructureTimestampRecord, b: IParkingStructureTimestampRecord) => a.timestampMs - b.timestampMs;
|
||||
this.historicalData.get(structureId)?.appendWithSorting(record, sortingCallback);
|
||||
};
|
||||
|
||||
private extractRecordsFromQueue = (queue: CircularQueue<IParkingStructureTimestampRecord>): IParkingStructureTimestampRecord[] => {
|
||||
const records: IParkingStructureTimestampRecord[] = [];
|
||||
for (let i = 0; i < queue.size(); i++) {
|
||||
const record = queue.get(i);
|
||||
if (record) {
|
||||
records.push(record);
|
||||
}
|
||||
}
|
||||
return records;
|
||||
};
|
||||
|
||||
private calculateAveragesFromRecords = (
|
||||
records: IParkingStructureTimestampRecord[],
|
||||
options: ParkingStructureCountOptions
|
||||
): HistoricalParkingAverageQueryResult[] => {
|
||||
const results: HistoricalParkingAverageQueryResult[] = [];
|
||||
const { startUnixEpochMs, endUnixEpochMs, intervalMs } = options;
|
||||
|
||||
let currentIntervalStart = startUnixEpochMs;
|
||||
|
||||
while (currentIntervalStart < endUnixEpochMs) {
|
||||
const currentIntervalEnd = Math.min(currentIntervalStart + intervalMs, endUnixEpochMs);
|
||||
const recordsInInterval = this.getRecordsInTimeRange(records, currentIntervalStart, currentIntervalEnd);
|
||||
|
||||
if (recordsInInterval.length > 0) {
|
||||
const averageResult = this.calculateAverageForInterval(currentIntervalStart, currentIntervalEnd, recordsInInterval);
|
||||
results.push(averageResult);
|
||||
}
|
||||
|
||||
currentIntervalStart = currentIntervalEnd;
|
||||
}
|
||||
|
||||
return results;
|
||||
};
|
||||
|
||||
private getRecordsInTimeRange = (
|
||||
records: IParkingStructureTimestampRecord[],
|
||||
startMs: number,
|
||||
endMs: number
|
||||
): IParkingStructureTimestampRecord[] => {
|
||||
return records.filter(record =>
|
||||
record.timestampMs >= startMs && record.timestampMs < endMs
|
||||
);
|
||||
};
|
||||
|
||||
private calculateAverageForInterval = (
|
||||
fromMs: number,
|
||||
toMs: number,
|
||||
records: IParkingStructureTimestampRecord[]
|
||||
): HistoricalParkingAverageQueryResult => {
|
||||
const totalSpotsAvailable = records.reduce((sum, record) => sum + record.spotsAvailable, 0);
|
||||
const averageSpotsAvailable = totalSpotsAvailable / records.length;
|
||||
|
||||
return {
|
||||
fromUnixEpochMs: fromMs,
|
||||
toUnixEpochMs: toMs,
|
||||
averageSpotsAvailable
|
||||
};
|
||||
};
|
||||
|
||||
setLoggingInterval = (intervalMs: number): void => {
|
||||
this.loggingIntervalMs = intervalMs;
|
||||
};
|
||||
}
|
||||
26
src/repositories/parking/ParkingGetterRepository.ts
Normal file
26
src/repositories/parking/ParkingGetterRepository.ts
Normal file
@@ -0,0 +1,26 @@
|
||||
import { IParkingStructure } from "../../entities/ParkingRepositoryEntities";
|
||||
|
||||
export interface ParkingStructureCountOptions {
|
||||
startUnixEpochMs: number;
|
||||
endUnixEpochMs: number;
|
||||
intervalMs: number;
|
||||
}
|
||||
|
||||
export interface HistoricalParkingAverageQueryResult {
|
||||
fromUnixEpochMs: number;
|
||||
toUnixEpochMs: number;
|
||||
averageSpotsAvailable: number;
|
||||
}
|
||||
|
||||
|
||||
export interface ParkingGetterRepository {
|
||||
getParkingStructures(): Promise<IParkingStructure[]>;
|
||||
getParkingStructureById(id: string): Promise<IParkingStructure | null>;
|
||||
|
||||
/**
|
||||
* Get historical averages of parking structure data using the filtering options.
|
||||
* @param id
|
||||
* @param options
|
||||
*/
|
||||
getHistoricalAveragesOfParkingStructureCounts(id: string, options: ParkingStructureCountOptions): Promise<HistoricalParkingAverageQueryResult[]>;
|
||||
}
|
||||
12
src/repositories/parking/ParkingGetterSetterRepository.ts
Normal file
12
src/repositories/parking/ParkingGetterSetterRepository.ts
Normal file
@@ -0,0 +1,12 @@
|
||||
import { IParkingStructure } from "../../entities/ParkingRepositoryEntities";
|
||||
import { ParkingGetterRepository } from "./ParkingGetterRepository";
|
||||
|
||||
export interface ParkingGetterSetterRepository extends ParkingGetterRepository {
|
||||
addOrUpdateParkingStructure(structure: IParkingStructure): Promise<void>;
|
||||
|
||||
removeParkingStructureIfExists(id: string): Promise<IParkingStructure | null>;
|
||||
|
||||
clearParkingStructureData(): Promise<void>;
|
||||
|
||||
setLoggingInterval(intervalMs: number): void;
|
||||
}
|
||||
3
src/repositories/parking/ParkingRepositoryConstants.ts
Normal file
3
src/repositories/parking/ParkingRepositoryConstants.ts
Normal file
@@ -0,0 +1,3 @@
|
||||
export const PARKING_LOGGING_INTERVAL_MS = process.env.PARKING_LOGGING_INTERVAL_MS
|
||||
? parseInt(process.env.PARKING_LOGGING_INTERVAL_MS)
|
||||
: 600000; // Every 10 minutes
|
||||
203
src/repositories/parking/RedisParkingRepository.ts
Normal file
203
src/repositories/parking/RedisParkingRepository.ts
Normal file
@@ -0,0 +1,203 @@
|
||||
import { ParkingGetterSetterRepository } from "./ParkingGetterSetterRepository";
|
||||
import { IParkingStructure } from "../../entities/ParkingRepositoryEntities";
|
||||
import { HistoricalParkingAverageQueryResult, ParkingStructureCountOptions } from "./ParkingGetterRepository";
|
||||
import { BaseRedisRepository } from "../BaseRedisRepository";
|
||||
import { PARKING_LOGGING_INTERVAL_MS } from "./ParkingRepositoryConstants";
|
||||
|
||||
export type ParkingStructureID = string;
|
||||
|
||||
export class RedisParkingRepository extends BaseRedisRepository implements ParkingGetterSetterRepository {
|
||||
private dataLastAdded: Map<ParkingStructureID, Date> = new Map();
|
||||
private loggingIntervalMs = PARKING_LOGGING_INTERVAL_MS;
|
||||
|
||||
addOrUpdateParkingStructure = async (structure: IParkingStructure): Promise<void> => {
|
||||
const keys = this.createRedisKeys(structure.id);
|
||||
await this.redisClient.hSet(keys.structure, this.createRedisHashFromStructure(structure));
|
||||
await this.addHistoricalDataForStructure(structure);
|
||||
};
|
||||
|
||||
private addHistoricalDataForStructure = async (structure: IParkingStructure): Promise<void> => {
|
||||
const now = Date.now();
|
||||
const lastAdded = this.dataLastAdded.get(structure.id);
|
||||
|
||||
if (this.shouldLogHistoricalData(lastAdded, now)) {
|
||||
const keys = this.createRedisKeys(structure.id);
|
||||
await this.addTimeSeriesDataPoint(keys.timeSeries, now, structure.spotsAvailable, structure.id);
|
||||
this.dataLastAdded.set(structure.id, new Date(now));
|
||||
}
|
||||
};
|
||||
|
||||
clearParkingStructureData = async (): Promise<void> => {
|
||||
const structureKeys = await this.redisClient.keys('parking:structure:*');
|
||||
const timeSeriesKeys = await this.redisClient.keys('parking:timeseries:*');
|
||||
|
||||
const allKeys = [...structureKeys, ...timeSeriesKeys];
|
||||
if (allKeys.length > 0) {
|
||||
await this.redisClient.del(allKeys);
|
||||
}
|
||||
|
||||
this.dataLastAdded.clear();
|
||||
};
|
||||
|
||||
getParkingStructureById = async (id: string): Promise<IParkingStructure | null> => {
|
||||
const keys = this.createRedisKeys(id);
|
||||
const data = await this.redisClient.hGetAll(keys.structure);
|
||||
|
||||
if (Object.keys(data).length === 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return this.createStructureFromRedisData(data);
|
||||
};
|
||||
|
||||
getParkingStructures = async (): Promise<IParkingStructure[]> => {
|
||||
const keys = await this.redisClient.keys('parking:structure:*');
|
||||
const structures: IParkingStructure[] = [];
|
||||
|
||||
for (const key of keys) {
|
||||
const data = await this.redisClient.hGetAll(key);
|
||||
if (Object.keys(data).length > 0) {
|
||||
structures.push(this.createStructureFromRedisData(data));
|
||||
}
|
||||
}
|
||||
|
||||
return structures;
|
||||
};
|
||||
|
||||
removeParkingStructureIfExists = async (id: string): Promise<IParkingStructure | null> => {
|
||||
const structure = await this.getParkingStructureById(id);
|
||||
if (structure) {
|
||||
const keys = this.createRedisKeys(id);
|
||||
await this.redisClient.del([keys.structure, keys.timeSeries]);
|
||||
this.dataLastAdded.delete(id);
|
||||
return structure;
|
||||
}
|
||||
return null;
|
||||
};
|
||||
|
||||
getHistoricalAveragesOfParkingStructureCounts = async (id: string, options: ParkingStructureCountOptions): Promise<HistoricalParkingAverageQueryResult[]> => {
|
||||
return this.calculateAveragesFromRecords(id, options);
|
||||
};
|
||||
|
||||
private createRedisKeys = (structureId: string) => ({
|
||||
structure: `parking:structure:${structureId}`,
|
||||
timeSeries: `parking:timeseries:${structureId}`
|
||||
});
|
||||
|
||||
private createRedisHashFromStructure = (structure: IParkingStructure): Record<string, string> => ({
|
||||
id: structure.id,
|
||||
name: structure.name,
|
||||
address: structure.address,
|
||||
capacity: structure.capacity.toString(),
|
||||
spotsAvailable: structure.spotsAvailable.toString(),
|
||||
latitude: structure.coordinates.latitude.toString(),
|
||||
longitude: structure.coordinates.longitude.toString(),
|
||||
updatedTime: structure.updatedTime.toISOString()
|
||||
});
|
||||
|
||||
private createStructureFromRedisData = (data: Record<string, string>): IParkingStructure => ({
|
||||
id: data.id,
|
||||
name: data.name,
|
||||
address: data.address,
|
||||
capacity: parseInt(data.capacity),
|
||||
spotsAvailable: parseInt(data.spotsAvailable),
|
||||
coordinates: {
|
||||
latitude: parseFloat(data.latitude),
|
||||
longitude: parseFloat(data.longitude)
|
||||
},
|
||||
updatedTime: new Date(data.updatedTime)
|
||||
});
|
||||
|
||||
private shouldLogHistoricalData = (lastAdded: Date | undefined, currentTime: number): boolean => {
|
||||
return !lastAdded || (currentTime - lastAdded.getTime()) >= this.loggingIntervalMs;
|
||||
};
|
||||
|
||||
private addTimeSeriesDataPoint = async (timeSeriesKey: string, timestamp: number, value: number, structureId: string): Promise<void> => {
|
||||
try {
|
||||
await this.redisClient.sendCommand([
|
||||
'TS.ADD',
|
||||
timeSeriesKey,
|
||||
timestamp.toString(),
|
||||
value.toString(),
|
||||
'LABELS',
|
||||
'structureId',
|
||||
structureId
|
||||
]);
|
||||
} catch (error) {
|
||||
await this.createTimeSeriesAndAddDataPoint(timeSeriesKey, timestamp, value, structureId);
|
||||
}
|
||||
};
|
||||
|
||||
private createTimeSeriesAndAddDataPoint = async (timeSeriesKey: string, timestamp: number, value: number, structureId: string): Promise<void> => {
|
||||
try {
|
||||
await this.redisClient.sendCommand([
|
||||
'TS.CREATE',
|
||||
timeSeriesKey,
|
||||
'RETENTION',
|
||||
'2678400000', // one month
|
||||
'LABELS',
|
||||
'structureId',
|
||||
structureId
|
||||
]);
|
||||
await this.redisClient.sendCommand([
|
||||
'TS.ADD',
|
||||
timeSeriesKey,
|
||||
timestamp.toString(),
|
||||
value.toString()
|
||||
]);
|
||||
} catch (createError) {
|
||||
await this.redisClient.sendCommand([
|
||||
'TS.ADD',
|
||||
timeSeriesKey,
|
||||
timestamp.toString(),
|
||||
value.toString()
|
||||
]);
|
||||
}
|
||||
};
|
||||
|
||||
private calculateAveragesFromRecords = async (
|
||||
id: string,
|
||||
options: ParkingStructureCountOptions
|
||||
): Promise<HistoricalParkingAverageQueryResult[]> => {
|
||||
const keys = this.createRedisKeys(id);
|
||||
const { startUnixEpochMs, endUnixEpochMs, intervalMs } = options;
|
||||
const results: HistoricalParkingAverageQueryResult[] = [];
|
||||
|
||||
let currentIntervalStart = startUnixEpochMs;
|
||||
|
||||
while (currentIntervalStart < endUnixEpochMs) {
|
||||
const currentIntervalEnd = Math.min(currentIntervalStart + intervalMs, endUnixEpochMs);
|
||||
|
||||
try {
|
||||
const aggregationResult = await this.redisClient.sendCommand([
|
||||
'TS.RANGE',
|
||||
keys.timeSeries,
|
||||
currentIntervalStart.toString(),
|
||||
currentIntervalEnd.toString(),
|
||||
'AGGREGATION',
|
||||
'AVG',
|
||||
intervalMs.toString()
|
||||
]) as [string, string][];
|
||||
|
||||
if (aggregationResult && aggregationResult.length > 0) {
|
||||
const [, averageValue] = aggregationResult[0];
|
||||
results.push({
|
||||
fromUnixEpochMs: currentIntervalStart,
|
||||
toUnixEpochMs: currentIntervalEnd,
|
||||
averageSpotsAvailable: parseFloat(averageValue)
|
||||
});
|
||||
}
|
||||
} catch (error) {
|
||||
// If Redis aggregation fails, skip this interval
|
||||
}
|
||||
|
||||
currentIntervalStart = currentIntervalEnd;
|
||||
}
|
||||
|
||||
return results;
|
||||
};
|
||||
|
||||
setLoggingInterval = (intervalMs: number): void => {
|
||||
this.loggingIntervalMs = intervalMs;
|
||||
};
|
||||
}
|
||||
Reference in New Issue
Block a user