mirror of
https://github.com/brendan-ch/project-inter-server.git
synced 2026-04-19 08:50:29 +00:00
Update RedisParkingRepository.ts to use Redis aggregation functions
This commit is contained in:
@@ -1,5 +1,5 @@
|
|||||||
import { ParkingGetterSetterRepository } from "./ParkingGetterSetterRepository";
|
import { ParkingGetterSetterRepository } from "./ParkingGetterSetterRepository";
|
||||||
import { IParkingStructure, IParkingStructureTimestampRecord } from "../entities/ParkingRepositoryEntities";
|
import { IParkingStructure } from "../entities/ParkingRepositoryEntities";
|
||||||
import { HistoricalParkingAverageQueryResult, ParkingStructureCountOptions } from "./ParkingGetterRepository";
|
import { HistoricalParkingAverageQueryResult, ParkingStructureCountOptions } from "./ParkingGetterRepository";
|
||||||
import { BaseRedisRepository } from "./BaseRedisRepository";
|
import { BaseRedisRepository } from "./BaseRedisRepository";
|
||||||
import { PARKING_LOGGING_INTERVAL_MS } from "./ParkingRepositoryConstants";
|
import { PARKING_LOGGING_INTERVAL_MS } from "./ParkingRepositoryConstants";
|
||||||
@@ -76,25 +76,7 @@ export class RedisParkingRepository extends BaseRedisRepository implements Parki
|
|||||||
};
|
};
|
||||||
|
|
||||||
getHistoricalAveragesOfParkingStructureCounts = async (id: string, options: ParkingStructureCountOptions): Promise<HistoricalParkingAverageQueryResult[]> => {
|
getHistoricalAveragesOfParkingStructureCounts = async (id: string, options: ParkingStructureCountOptions): Promise<HistoricalParkingAverageQueryResult[]> => {
|
||||||
const keys = this.createRedisKeys(id);
|
return this.calculateAveragesFromRecords(id, options);
|
||||||
|
|
||||||
try {
|
|
||||||
const timeSeriesData = await this.redisClient.sendCommand([
|
|
||||||
'TS.RANGE',
|
|
||||||
keys.timeSeries,
|
|
||||||
options.startUnixEpochMs.toString(),
|
|
||||||
options.endUnixEpochMs.toString()
|
|
||||||
]) as [string, string][];
|
|
||||||
|
|
||||||
if (!timeSeriesData || timeSeriesData.length === 0) {
|
|
||||||
return [];
|
|
||||||
}
|
|
||||||
|
|
||||||
const records = this.convertTimeSeriesDataToRecords(timeSeriesData, id);
|
|
||||||
return this.calculateAveragesFromRecords(records, options);
|
|
||||||
} catch (error) {
|
|
||||||
return [];
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
private createRedisKeys = (structureId: string) => ({
|
private createRedisKeys = (structureId: string) => ({
|
||||||
@@ -173,30 +155,40 @@ export class RedisParkingRepository extends BaseRedisRepository implements Parki
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
private convertTimeSeriesDataToRecords = (timeSeriesData: [string, string][], id: string): IParkingStructureTimestampRecord[] => {
|
private calculateAveragesFromRecords = async (
|
||||||
return timeSeriesData.map(([timestamp, value]) => ({
|
id: string,
|
||||||
id,
|
|
||||||
timestampMs: parseInt(timestamp),
|
|
||||||
spotsAvailable: parseInt(value)
|
|
||||||
}));
|
|
||||||
};
|
|
||||||
|
|
||||||
private calculateAveragesFromRecords = (
|
|
||||||
records: IParkingStructureTimestampRecord[],
|
|
||||||
options: ParkingStructureCountOptions
|
options: ParkingStructureCountOptions
|
||||||
): HistoricalParkingAverageQueryResult[] => {
|
): Promise<HistoricalParkingAverageQueryResult[]> => {
|
||||||
const results: HistoricalParkingAverageQueryResult[] = [];
|
const keys = this.createRedisKeys(id);
|
||||||
const { startUnixEpochMs, endUnixEpochMs, intervalMs } = options;
|
const { startUnixEpochMs, endUnixEpochMs, intervalMs } = options;
|
||||||
|
const results: HistoricalParkingAverageQueryResult[] = [];
|
||||||
|
|
||||||
let currentIntervalStart = startUnixEpochMs;
|
let currentIntervalStart = startUnixEpochMs;
|
||||||
|
|
||||||
while (currentIntervalStart < endUnixEpochMs) {
|
while (currentIntervalStart < endUnixEpochMs) {
|
||||||
const currentIntervalEnd = Math.min(currentIntervalStart + intervalMs, endUnixEpochMs);
|
const currentIntervalEnd = Math.min(currentIntervalStart + intervalMs, endUnixEpochMs);
|
||||||
const recordsInInterval = this.getRecordsInTimeRange(records, currentIntervalStart, currentIntervalEnd);
|
|
||||||
|
|
||||||
if (recordsInInterval.length > 0) {
|
try {
|
||||||
const averageResult = this.calculateAverageForInterval(currentIntervalStart, currentIntervalEnd, recordsInInterval);
|
const aggregationResult = await this.redisClient.sendCommand([
|
||||||
results.push(averageResult);
|
'TS.RANGE',
|
||||||
|
keys.timeSeries,
|
||||||
|
currentIntervalStart.toString(),
|
||||||
|
currentIntervalEnd.toString(),
|
||||||
|
'AGGREGATION',
|
||||||
|
'AVG',
|
||||||
|
intervalMs.toString()
|
||||||
|
]) as [string, string][];
|
||||||
|
|
||||||
|
if (aggregationResult && aggregationResult.length > 0) {
|
||||||
|
const [, averageValue] = aggregationResult[0];
|
||||||
|
results.push({
|
||||||
|
fromUnixEpochMs: currentIntervalStart,
|
||||||
|
toUnixEpochMs: currentIntervalEnd,
|
||||||
|
averageSpotsAvailable: parseFloat(averageValue)
|
||||||
|
});
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
// If Redis aggregation fails, skip this interval
|
||||||
}
|
}
|
||||||
|
|
||||||
currentIntervalStart = currentIntervalEnd;
|
currentIntervalStart = currentIntervalEnd;
|
||||||
@@ -205,31 +197,6 @@ export class RedisParkingRepository extends BaseRedisRepository implements Parki
|
|||||||
return results;
|
return results;
|
||||||
};
|
};
|
||||||
|
|
||||||
private getRecordsInTimeRange = (
|
|
||||||
records: IParkingStructureTimestampRecord[],
|
|
||||||
startMs: number,
|
|
||||||
endMs: number
|
|
||||||
): IParkingStructureTimestampRecord[] => {
|
|
||||||
return records.filter(record =>
|
|
||||||
record.timestampMs >= startMs && record.timestampMs < endMs
|
|
||||||
);
|
|
||||||
};
|
|
||||||
|
|
||||||
private calculateAverageForInterval = (
|
|
||||||
fromMs: number,
|
|
||||||
toMs: number,
|
|
||||||
records: IParkingStructureTimestampRecord[]
|
|
||||||
): HistoricalParkingAverageQueryResult => {
|
|
||||||
const totalSpotsAvailable = records.reduce((sum, record) => sum + record.spotsAvailable, 0);
|
|
||||||
const averageSpotsAvailable = totalSpotsAvailable / records.length;
|
|
||||||
|
|
||||||
return {
|
|
||||||
fromUnixEpochMs: fromMs,
|
|
||||||
toUnixEpochMs: toMs,
|
|
||||||
averageSpotsAvailable
|
|
||||||
};
|
|
||||||
};
|
|
||||||
|
|
||||||
setLoggingInterval = (intervalMs: number): void => {
|
setLoggingInterval = (intervalMs: number): void => {
|
||||||
this.loggingIntervalMs = intervalMs;
|
this.loggingIntervalMs = intervalMs;
|
||||||
};
|
};
|
||||||
|
|||||||
Reference in New Issue
Block a user