Merge pull request #57 from brendan-ch/feat/time-series-database

feat/time-series-database
This commit is contained in:
2025-07-03 16:05:44 -04:00
committed by GitHub
24 changed files with 1003 additions and 230 deletions

View File

@@ -7,3 +7,5 @@ APNS_BUNDLE_ID=
# base64-encoded APNs private key
APNS_PRIVATE_KEY=
PARKING_LOGGING_INTERVAL_MS=

3
.gitignore vendored
View File

@@ -23,3 +23,6 @@ yarn-error.log*
# Keys
private/
# JetBrains
.idea

8
.idea/.gitignore generated vendored
View File

@@ -1,8 +0,0 @@
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml

View File

@@ -1,5 +0,0 @@
<component name="ProjectCodeStyleConfiguration">
<state>
<option name="PREFERRED_PROJECT_CODE_STYLE" value="Default" />
</state>
</component>

View File

@@ -1,6 +0,0 @@
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="ExceptionCaughtLocallyJS" enabled="false" level="WARNING" enabled_by_default="false" />
</profile>
</component>

8
.idea/modules.xml generated
View File

@@ -1,8 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/project-inter-server.iml" filepath="$PROJECT_DIR$/.idea/project-inter-server.iml" />
</modules>
</component>
</project>

View File

@@ -1,12 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="WEB_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/.tmp" />
<excludeFolder url="file://$MODULE_DIR$/temp" />
<excludeFolder url="file://$MODULE_DIR$/tmp" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

6
.idea/vcs.xml generated
View File

@@ -1,6 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>

View File

@@ -93,5 +93,11 @@ Currently supports Chapman University (Passio System ID: "263"). Each university
## Development Guidelines
### General Guidelines
- Use test-driven development. Always write tests before implementation, and run them before and after implementation.
### Git Workflow
- Use the name of the branch for all pull requests
- Use the name of the branch for all pull requests
### Code Style
- Prefer arrow functions, especially in classes

View File

@@ -10,6 +10,7 @@ x-common-environment: &common-server-environment
APNS_TEAM_ID: ${APNS_TEAM_ID}
APNS_KEY_ID: ${APNS_KEY_ID}
APNS_PRIVATE_KEY: ${APNS_PRIVATE_KEY}
PARKING_LOGGING_INTERVAL_MS: ${PARKING_LOGGING_INTERVAL_MS}
REDIS_URL: redis://redis:6379
services:
@@ -50,15 +51,21 @@ services:
- .:/usr/src/app
redis:
image: redis:alpine
image: redis/redis-stack:7.2.0-v17
container_name: redis-timeseries
ports:
- "6379:6379"
volumes:
- ./redis.conf:/usr/local/etc/redis/redis.conf
command: redis-server /usr/local/etc/redis/redis.conf
- redis_data:/data
- ./redis-stack.conf:/redis-stack.conf
command: redis-stack-server /redis-stack.conf
redis-no-persistence:
image: redis:alpine
image: redis/redis-stack:7.2.0-v17
container_name: redis-timeseries-no-persistence
ports:
- "6379:6379"
volumes:
redis_data: # Add this volume definition

View File

@@ -13,6 +13,7 @@ import {
buildParkingRepositoryLoaderIfExists,
ParkingRepositoryLoaderBuilderArguments
} from "../loaders/parking/buildParkingRepositoryLoaderIfExists";
import { RedisParkingRepository } from "../repositories/RedisParkingRepository";
export interface InterchangeSystemBuilderArguments {
name: string;
@@ -75,7 +76,7 @@ export class InterchangeSystem {
);
notificationScheduler.startListeningForUpdates();
let { parkingRepository, timedParkingLoader } = this.buildParkingLoaderAndRepository(args.parkingSystemId);
let { parkingRepository, timedParkingLoader } = await this.buildRedisParkingLoaderAndRepository(args.parkingSystemId);
timedParkingLoader?.start();
return new InterchangeSystem(
@@ -120,8 +121,8 @@ export class InterchangeSystem {
);
notificationScheduler.startListeningForUpdates();
let { parkingRepository, timedParkingLoader } = this.buildParkingLoaderAndRepository(args.parkingSystemId);
// Timed parking loader is not started
let { parkingRepository, timedParkingLoader } = this.buildInMemoryParkingLoaderAndRepository(args.parkingSystemId);
// Timed parking loader is not started here
return new InterchangeSystem(
args.name,
@@ -135,7 +136,32 @@ export class InterchangeSystem {
);
}
private static buildParkingLoaderAndRepository(id?: string) {
private static async buildRedisParkingLoaderAndRepository(id?: string) {
if (id === undefined) {
return { parkingRepository: null, timedParkingLoader: null };
}
let parkingRepository: RedisParkingRepository | null = new RedisParkingRepository();
await parkingRepository.connect();
const loaderBuilderArguments: ParkingRepositoryLoaderBuilderArguments = {
id,
repository: parkingRepository,
};
let parkingLoader = buildParkingRepositoryLoaderIfExists(
loaderBuilderArguments,
);
let timedParkingLoader = null;
if (parkingLoader == null) {
parkingRepository = null;
} else {
timedParkingLoader = new TimedApiBasedRepositoryLoader(parkingLoader);
}
return { parkingRepository, timedParkingLoader };
}
private static buildInMemoryParkingLoaderAndRepository(id?: string) {
if (id === undefined) {
return { parkingRepository: null, timedParkingLoader: null };
}

View File

@@ -8,4 +8,10 @@ export interface IParkingStructure extends IEntityWithTimestamp, IEntityWithId {
name: string;
}
export interface IParkingStructureTimestampRecord {
timestampMs: number;
id: string;
spotsAvailable: number;
}
// In the future, add support for viewing different levels of the structure

View File

@@ -0,0 +1,33 @@
import { createClient } from 'redis';
export abstract class BaseRedisRepository {
protected redisClient;
constructor(
redisClient = createClient({
url: process.env.REDIS_URL,
socket: {
tls: (process.env.REDIS_URL?.match(/rediss:/) != null),
rejectUnauthorized: false,
}
}),
) {
this.redisClient = redisClient;
}
get isReady() {
return this.redisClient.isReady;
}
public async connect() {
await this.redisClient.connect();
}
public async disconnect() {
await this.redisClient.disconnect();
}
public async clearAllData() {
await this.redisClient.flushAll();
}
}

View File

@@ -1,36 +1,160 @@
import { ParkingGetterSetterRepository } from "./ParkingGetterSetterRepository";
import { IParkingStructure } from "../entities/ParkingRepositoryEntities";
import {
IParkingStructure,
IParkingStructureTimestampRecord
} from "../entities/ParkingRepositoryEntities";
import { HistoricalParkingAverageQueryResult, ParkingStructureCountOptions } from "./ParkingGetterRepository";
import { CircularQueue } from "../types/CircularQueue";
import { PARKING_LOGGING_INTERVAL_MS } from "./ParkingRepositoryConstants";
// If every 10 minutes, two weeks of data (6x per hour * 24x per day * 7x per week * 2)
export const MAX_NUM_ENTRIES = 2016;
export type ParkingStructureID = string;
export class InMemoryParkingRepository implements ParkingGetterSetterRepository {
private structures: Map<string, IParkingStructure>;
private dataLastAdded: Map<ParkingStructureID, Date> = new Map();
private loggingIntervalMs = PARKING_LOGGING_INTERVAL_MS;
constructor() {
this.structures = new Map<string, IParkingStructure>();
constructor(
private structures: Map<ParkingStructureID, IParkingStructure> = new Map(),
private historicalData: Map<ParkingStructureID, CircularQueue<IParkingStructureTimestampRecord>> = new Map(),
) {
}
async addOrUpdateParkingStructure(structure: IParkingStructure): Promise<void> {
addOrUpdateParkingStructure = async (structure: IParkingStructure): Promise<void> => {
this.structures.set(structure.id, { ...structure });
}
await this.addHistoricalDataForStructure(structure);
};
async clearParkingStructureData(): Promise<void> {
private addHistoricalDataForStructure = async (structure: IParkingStructure): Promise<void> => {
const now = Date.now();
const lastAdded = this.dataLastAdded.get(structure.id);
if (this.shouldLogHistoricalData(lastAdded, now)) {
const timestampRecord = this.createTimestampRecord(structure, now);
this.ensureHistoricalDataExists(structure.id);
this.addRecordToHistoricalData(structure.id, timestampRecord);
this.dataLastAdded.set(structure.id, new Date(now));
}
};
clearParkingStructureData = async (): Promise<void> => {
this.structures.clear();
}
this.historicalData.clear();
this.dataLastAdded.clear();
};
async getParkingStructureById(id: string): Promise<IParkingStructure | null> {
getParkingStructureById = async (id: string): Promise<IParkingStructure | null> => {
const structure = this.structures.get(id);
return structure ? { ...structure } : null;
}
};
async getParkingStructures(): Promise<IParkingStructure[]> {
return Array.from(this.structures.values()).map(structure => ({ ...structure }));
}
getParkingStructures = async (): Promise<IParkingStructure[]> => Array.from(this.structures.values()).map(structure => ({...structure}));
async removeParkingStructureIfExists(id: string): Promise<IParkingStructure | null> {
removeParkingStructureIfExists = async (id: string): Promise<IParkingStructure | null> => {
const structure = this.structures.get(id);
if (structure) {
this.structures.delete(id);
this.historicalData.delete(id);
this.dataLastAdded.delete(id);
return { ...structure };
}
return null;
}
};
getHistoricalAveragesOfParkingStructureCounts = async (id: string, options: ParkingStructureCountOptions): Promise<HistoricalParkingAverageQueryResult[]> => {
const queue = this.historicalData.get(id);
if (!queue || queue.size() === 0) {
return [];
}
const records = this.extractRecordsFromQueue(queue);
return this.calculateAveragesFromRecords(records, options);
};
private shouldLogHistoricalData = (lastAdded: Date | undefined, currentTime: number): boolean => {
return !lastAdded || (currentTime - lastAdded.getTime()) >= this.loggingIntervalMs;
};
private createTimestampRecord = (structure: IParkingStructure, timestampMs: number): IParkingStructureTimestampRecord => ({
id: structure.id,
spotsAvailable: structure.spotsAvailable,
timestampMs,
});
private ensureHistoricalDataExists = (structureId: string): void => {
if (!this.historicalData.has(structureId)) {
this.historicalData.set(structureId, new CircularQueue<IParkingStructureTimestampRecord>(MAX_NUM_ENTRIES));
}
};
private addRecordToHistoricalData = (structureId: string, record: IParkingStructureTimestampRecord): void => {
const sortingCallback = (a: IParkingStructureTimestampRecord, b: IParkingStructureTimestampRecord) => a.timestampMs - b.timestampMs;
this.historicalData.get(structureId)?.appendWithSorting(record, sortingCallback);
};
private extractRecordsFromQueue = (queue: CircularQueue<IParkingStructureTimestampRecord>): IParkingStructureTimestampRecord[] => {
const records: IParkingStructureTimestampRecord[] = [];
for (let i = 0; i < queue.size(); i++) {
const record = queue.get(i);
if (record) {
records.push(record);
}
}
return records;
};
private calculateAveragesFromRecords = (
records: IParkingStructureTimestampRecord[],
options: ParkingStructureCountOptions
): HistoricalParkingAverageQueryResult[] => {
const results: HistoricalParkingAverageQueryResult[] = [];
const { startUnixEpochMs, endUnixEpochMs, intervalMs } = options;
let currentIntervalStart = startUnixEpochMs;
while (currentIntervalStart < endUnixEpochMs) {
const currentIntervalEnd = Math.min(currentIntervalStart + intervalMs, endUnixEpochMs);
const recordsInInterval = this.getRecordsInTimeRange(records, currentIntervalStart, currentIntervalEnd);
if (recordsInInterval.length > 0) {
const averageResult = this.calculateAverageForInterval(currentIntervalStart, currentIntervalEnd, recordsInInterval);
results.push(averageResult);
}
currentIntervalStart = currentIntervalEnd;
}
return results;
};
private getRecordsInTimeRange = (
records: IParkingStructureTimestampRecord[],
startMs: number,
endMs: number
): IParkingStructureTimestampRecord[] => {
return records.filter(record =>
record.timestampMs >= startMs && record.timestampMs < endMs
);
};
private calculateAverageForInterval = (
fromMs: number,
toMs: number,
records: IParkingStructureTimestampRecord[]
): HistoricalParkingAverageQueryResult => {
const totalSpotsAvailable = records.reduce((sum, record) => sum + record.spotsAvailable, 0);
const averageSpotsAvailable = totalSpotsAvailable / records.length;
return {
fromUnixEpochMs: fromMs,
toUnixEpochMs: toMs,
averageSpotsAvailable
};
};
setLoggingInterval = (intervalMs: number): void => {
this.loggingIntervalMs = intervalMs;
};
}

View File

@@ -1,6 +1,26 @@
import { IParkingStructure } from "../entities/ParkingRepositoryEntities";
export interface ParkingStructureCountOptions {
startUnixEpochMs: number;
endUnixEpochMs: number;
intervalMs: number;
}
export interface HistoricalParkingAverageQueryResult {
fromUnixEpochMs: number;
toUnixEpochMs: number;
averageSpotsAvailable: number;
}
export interface ParkingGetterRepository {
getParkingStructures(): Promise<IParkingStructure[]>;
getParkingStructureById(id: string): Promise<IParkingStructure | null>;
/**
* Get historical averages of parking structure data using the filtering options.
* @param id
* @param options
*/
getHistoricalAveragesOfParkingStructureCounts(id: string, options: ParkingStructureCountOptions): Promise<HistoricalParkingAverageQueryResult[]>;
}

View File

@@ -7,4 +7,6 @@ export interface ParkingGetterSetterRepository extends ParkingGetterRepository {
removeParkingStructureIfExists(id: string): Promise<IParkingStructure | null>;
clearParkingStructureData(): Promise<void>;
setLoggingInterval(intervalMs: number): void;
}

View File

@@ -0,0 +1,3 @@
export const PARKING_LOGGING_INTERVAL_MS = process.env.PARKING_LOGGING_INTERVAL_MS
? parseInt(process.env.PARKING_LOGGING_INTERVAL_MS)
: 600000; // Every 10 minutes

View File

@@ -6,52 +6,18 @@ import {
NotificationRepository,
ScheduledNotification
} from "./NotificationRepository";
import { createClient } from "redis";
import { BaseRedisRepository } from "./BaseRedisRepository";
export class RedisNotificationRepository implements NotificationRepository {
export class RedisNotificationRepository extends BaseRedisRepository implements NotificationRepository {
private listeners: Listener[] = [];
private readonly NOTIFICATION_KEY_PREFIX = 'notification:';
constructor(
private redisClient = createClient({
url: process.env.REDIS_URL,
socket: {
tls: (process.env.REDIS_URL?.match(/rediss:/) != null),
rejectUnauthorized: false,
}
}),
) {
this.getAllNotificationsForShuttleAndStopId = this.getAllNotificationsForShuttleAndStopId.bind(this);
this.getSecondsThresholdForNotificationIfExists = this.getSecondsThresholdForNotificationIfExists.bind(this);
this.deleteNotificationIfExists = this.deleteNotificationIfExists.bind(this);
this.addOrUpdateNotification = this.addOrUpdateNotification.bind(this);
this.isNotificationScheduled = this.isNotificationScheduled.bind(this);
this.subscribeToNotificationChanges = this.subscribeToNotificationChanges.bind(this);
this.unsubscribeFromNotificationChanges = this.unsubscribeFromNotificationChanges.bind(this);
}
get isReady() {
return this.redisClient.isReady;
}
public async connect() {
await this.redisClient.connect();
}
public async disconnect() {
await this.redisClient.disconnect();
}
public async clearAllData() {
await this.redisClient.flushAll();
}
private getNotificationKey(shuttleId: string, stopId: string): string {
private getNotificationKey = (shuttleId: string, stopId: string): string => {
const tuple = new TupleKey(shuttleId, stopId);
return `${this.NOTIFICATION_KEY_PREFIX}${tuple.toString()}`;
}
};
public async addOrUpdateNotification(notification: ScheduledNotification): Promise<void> {
public addOrUpdateNotification = async (notification: ScheduledNotification): Promise<void> => {
const { shuttleId, stopId, deviceId, secondsThreshold } = notification;
const key = this.getNotificationKey(shuttleId, stopId);
@@ -64,9 +30,9 @@ export class RedisNotificationRepository implements NotificationRepository {
};
listener(event);
});
}
};
public async deleteNotificationIfExists(lookupArguments: NotificationLookupArguments): Promise<void> {
public deleteNotificationIfExists = async (lookupArguments: NotificationLookupArguments): Promise<void> => {
const { shuttleId, stopId, deviceId } = lookupArguments;
const key = this.getNotificationKey(shuttleId, stopId);
@@ -93,12 +59,12 @@ export class RedisNotificationRepository implements NotificationRepository {
listener(event);
});
}
}
};
public async getAllNotificationsForShuttleAndStopId(
public getAllNotificationsForShuttleAndStopId = async (
shuttleId: string,
stopId: string
): Promise<ScheduledNotification[]> {
): Promise<ScheduledNotification[]> => {
const key = this.getNotificationKey(shuttleId, stopId);
const allNotifications = await this.redisClient.hGetAll(key);
@@ -108,40 +74,40 @@ export class RedisNotificationRepository implements NotificationRepository {
deviceId,
secondsThreshold: parseInt(secondsThreshold)
}));
}
};
public async getSecondsThresholdForNotificationIfExists(
public getSecondsThresholdForNotificationIfExists = async (
lookupArguments: NotificationLookupArguments
): Promise<number | null> {
): Promise<number | null> => {
const { shuttleId, stopId, deviceId } = lookupArguments;
const key = this.getNotificationKey(shuttleId, stopId);
const threshold = await this.redisClient.hGet(key, deviceId);
return threshold ? parseInt(threshold) : null;
}
};
public async isNotificationScheduled(
public isNotificationScheduled = async (
lookupArguments: NotificationLookupArguments
): Promise<boolean> {
): Promise<boolean> => {
const threshold = await this.getSecondsThresholdForNotificationIfExists(lookupArguments);
return threshold !== null;
}
};
public subscribeToNotificationChanges(listener: Listener): void {
public subscribeToNotificationChanges = (listener: Listener): void => {
const index = this.listeners.findIndex(
(existingListener) => existingListener === listener
);
if (index < 0) {
this.listeners.push(listener);
}
}
};
public unsubscribeFromNotificationChanges(listener: Listener): void {
public unsubscribeFromNotificationChanges = (listener: Listener): void => {
const index = this.listeners.findIndex(
(existingListener) => existingListener === listener
);
if (index >= 0) {
this.listeners.splice(index, 1);
}
}
};
}

View File

@@ -0,0 +1,203 @@
import { ParkingGetterSetterRepository } from "./ParkingGetterSetterRepository";
import { IParkingStructure } from "../entities/ParkingRepositoryEntities";
import { HistoricalParkingAverageQueryResult, ParkingStructureCountOptions } from "./ParkingGetterRepository";
import { BaseRedisRepository } from "./BaseRedisRepository";
import { PARKING_LOGGING_INTERVAL_MS } from "./ParkingRepositoryConstants";
export type ParkingStructureID = string;
export class RedisParkingRepository extends BaseRedisRepository implements ParkingGetterSetterRepository {
private dataLastAdded: Map<ParkingStructureID, Date> = new Map();
private loggingIntervalMs = PARKING_LOGGING_INTERVAL_MS;
addOrUpdateParkingStructure = async (structure: IParkingStructure): Promise<void> => {
const keys = this.createRedisKeys(structure.id);
await this.redisClient.hSet(keys.structure, this.createRedisHashFromStructure(structure));
await this.addHistoricalDataForStructure(structure);
};
private addHistoricalDataForStructure = async (structure: IParkingStructure): Promise<void> => {
const now = Date.now();
const lastAdded = this.dataLastAdded.get(structure.id);
if (this.shouldLogHistoricalData(lastAdded, now)) {
const keys = this.createRedisKeys(structure.id);
await this.addTimeSeriesDataPoint(keys.timeSeries, now, structure.spotsAvailable, structure.id);
this.dataLastAdded.set(structure.id, new Date(now));
}
};
clearParkingStructureData = async (): Promise<void> => {
const structureKeys = await this.redisClient.keys('parking:structure:*');
const timeSeriesKeys = await this.redisClient.keys('parking:timeseries:*');
const allKeys = [...structureKeys, ...timeSeriesKeys];
if (allKeys.length > 0) {
await this.redisClient.del(allKeys);
}
this.dataLastAdded.clear();
};
getParkingStructureById = async (id: string): Promise<IParkingStructure | null> => {
const keys = this.createRedisKeys(id);
const data = await this.redisClient.hGetAll(keys.structure);
if (Object.keys(data).length === 0) {
return null;
}
return this.createStructureFromRedisData(data);
};
getParkingStructures = async (): Promise<IParkingStructure[]> => {
const keys = await this.redisClient.keys('parking:structure:*');
const structures: IParkingStructure[] = [];
for (const key of keys) {
const data = await this.redisClient.hGetAll(key);
if (Object.keys(data).length > 0) {
structures.push(this.createStructureFromRedisData(data));
}
}
return structures;
};
removeParkingStructureIfExists = async (id: string): Promise<IParkingStructure | null> => {
const structure = await this.getParkingStructureById(id);
if (structure) {
const keys = this.createRedisKeys(id);
await this.redisClient.del([keys.structure, keys.timeSeries]);
this.dataLastAdded.delete(id);
return structure;
}
return null;
};
getHistoricalAveragesOfParkingStructureCounts = async (id: string, options: ParkingStructureCountOptions): Promise<HistoricalParkingAverageQueryResult[]> => {
return this.calculateAveragesFromRecords(id, options);
};
private createRedisKeys = (structureId: string) => ({
structure: `parking:structure:${structureId}`,
timeSeries: `parking:timeseries:${structureId}`
});
private createRedisHashFromStructure = (structure: IParkingStructure): Record<string, string> => ({
id: structure.id,
name: structure.name,
address: structure.address,
capacity: structure.capacity.toString(),
spotsAvailable: structure.spotsAvailable.toString(),
latitude: structure.coordinates.latitude.toString(),
longitude: structure.coordinates.longitude.toString(),
updatedTime: structure.updatedTime.toISOString()
});
private createStructureFromRedisData = (data: Record<string, string>): IParkingStructure => ({
id: data.id,
name: data.name,
address: data.address,
capacity: parseInt(data.capacity),
spotsAvailable: parseInt(data.spotsAvailable),
coordinates: {
latitude: parseFloat(data.latitude),
longitude: parseFloat(data.longitude)
},
updatedTime: new Date(data.updatedTime)
});
private shouldLogHistoricalData = (lastAdded: Date | undefined, currentTime: number): boolean => {
return !lastAdded || (currentTime - lastAdded.getTime()) >= this.loggingIntervalMs;
};
private addTimeSeriesDataPoint = async (timeSeriesKey: string, timestamp: number, value: number, structureId: string): Promise<void> => {
try {
await this.redisClient.sendCommand([
'TS.ADD',
timeSeriesKey,
timestamp.toString(),
value.toString(),
'LABELS',
'structureId',
structureId
]);
} catch (error) {
await this.createTimeSeriesAndAddDataPoint(timeSeriesKey, timestamp, value, structureId);
}
};
private createTimeSeriesAndAddDataPoint = async (timeSeriesKey: string, timestamp: number, value: number, structureId: string): Promise<void> => {
try {
await this.redisClient.sendCommand([
'TS.CREATE',
timeSeriesKey,
'RETENTION',
'2678400000', // one month
'LABELS',
'structureId',
structureId
]);
await this.redisClient.sendCommand([
'TS.ADD',
timeSeriesKey,
timestamp.toString(),
value.toString()
]);
} catch (createError) {
await this.redisClient.sendCommand([
'TS.ADD',
timeSeriesKey,
timestamp.toString(),
value.toString()
]);
}
};
private calculateAveragesFromRecords = async (
id: string,
options: ParkingStructureCountOptions
): Promise<HistoricalParkingAverageQueryResult[]> => {
const keys = this.createRedisKeys(id);
const { startUnixEpochMs, endUnixEpochMs, intervalMs } = options;
const results: HistoricalParkingAverageQueryResult[] = [];
let currentIntervalStart = startUnixEpochMs;
while (currentIntervalStart < endUnixEpochMs) {
const currentIntervalEnd = Math.min(currentIntervalStart + intervalMs, endUnixEpochMs);
try {
const aggregationResult = await this.redisClient.sendCommand([
'TS.RANGE',
keys.timeSeries,
currentIntervalStart.toString(),
currentIntervalEnd.toString(),
'AGGREGATION',
'AVG',
intervalMs.toString()
]) as [string, string][];
if (aggregationResult && aggregationResult.length > 0) {
const [, averageValue] = aggregationResult[0];
results.push({
fromUnixEpochMs: currentIntervalStart,
toUnixEpochMs: currentIntervalEnd,
averageSpotsAvailable: parseFloat(averageValue)
});
}
} catch (error) {
// If Redis aggregation fails, skip this interval
}
currentIntervalStart = currentIntervalEnd;
}
return results;
};
setLoggingInterval = (intervalMs: number): void => {
this.loggingIntervalMs = intervalMs;
};
}

119
src/types/CircularQueue.ts Normal file
View File

@@ -0,0 +1,119 @@
export class CircularQueue<T> {
private startIndex: number;
private endIndex: number;
private _data: T[];
private _size: number;
private _capacity: number;
constructor(
size: number,
) {
// See the Mozilla documentation on sparse arrays (*not* undefined values)
// https://developer.mozilla.org/en-US/docs/Web/JavaScript/Guide/Indexed_collections#sparse_arrays
this._data = new Array<T>(size);
this.startIndex = 0;
this.endIndex = 0;
this._size = 0;
this._capacity = size;
}
size = (): number => this._size;
get = (index: number): T | undefined => {
if (index < 0 || index >= this._size) {
return undefined;
}
const actualIndex = (this.startIndex + index) % this._capacity;
return this._data[actualIndex];
};
appendWithSorting = (
data: T,
sortingCallback: (a: T, b: T) => number
) => {
if (this._size === 0) {
this._data[this.startIndex] = data;
this._size = 1;
this.endIndex = this.startIndex;
return;
}
const lastItem = this.get(this._size - 1);
const isAlreadyInOrder = lastItem && sortingCallback(lastItem, data) <= 0;
if (this._size < this._capacity) {
this.endIndex = (this.endIndex + 1) % this._capacity;
this._data[this.endIndex] = data;
this._size++;
} else {
this.startIndex = (this.startIndex + 1) % this._capacity;
this.endIndex = (this.endIndex + 1) % this._capacity;
this._data[this.endIndex] = data;
}
if (!isAlreadyInOrder) {
this.sortData(sortingCallback);
}
}
popFront = () => {
if (this._size === 0) {
return;
}
this._data[this.startIndex] = undefined as any;
if (this._size === 1) {
this._size = 0;
this.startIndex = 0;
this.endIndex = 0;
} else {
this.startIndex = (this.startIndex + 1) % this._capacity;
this._size--;
}
}
binarySearch = <K>(
searchKey: K,
keyExtractor: (item: T) => K
): T | undefined => {
if (this._size === 0) {
return undefined;
}
let left = 0;
let right = this._size - 1;
while (left <= right) {
const mid = Math.floor((left + right) / 2);
const midItem = this.get(mid)!;
const midKey = keyExtractor(midItem);
if (midKey === searchKey) {
return midItem;
} else if (midKey < searchKey) {
left = mid + 1;
} else {
right = mid - 1;
}
}
return undefined;
}
private sortData = (sortingCallback: (a: T, b: T) => number) => {
const items: T[] = [];
for (let i = 0; i < this._size; i++) {
const item = this.get(i);
if (item !== undefined) {
items.push(item);
}
}
items.sort(sortingCallback);
for (let i = 0; i < items.length; i++) {
const actualIndex = (this.startIndex + i) % this._capacity;
this._data[actualIndex] = items[i];
}
};
}

View File

@@ -1,106 +0,0 @@
import { beforeEach, describe, expect, it } from "@jest/globals";
import { InMemoryParkingRepository } from "../../src/repositories/InMemoryParkingRepository";
import { IParkingStructure } from "../../src/entities/ParkingRepositoryEntities";
describe("InMemoryParkingRepository", () => {
let repository: InMemoryParkingRepository;
const testStructure: IParkingStructure = {
coordinates: {
latitude: 33.794795,
longitude: -117.850807,
},
spotsAvailable: 0,
id: "1",
name: "Anderson Parking Structure",
capacity: 100,
address: "300 E Walnut Ave, Orange, CA 92867",
updatedTime: new Date(),
};
beforeEach(() => {
repository = new InMemoryParkingRepository();
});
describe("addOrUpdateParkingStructure", () => {
it("should add a new parking structure", async () => {
await repository.addOrUpdateParkingStructure(testStructure);
const result = await repository.getParkingStructureById(testStructure.id);
expect(result).toEqual(testStructure);
});
it("should update existing parking structure", async () => {
await repository.addOrUpdateParkingStructure(testStructure);
const updatedStructure = { ...testStructure, name: "Updated Garage" };
await repository.addOrUpdateParkingStructure(updatedStructure);
const result = await repository.getParkingStructureById(testStructure.id);
expect(result).toEqual(updatedStructure);
});
});
describe("removeParkingStructureIfExists", () => {
it("should remove existing parking structure and return it", async () => {
await repository.addOrUpdateParkingStructure(testStructure);
const removed = await repository.removeParkingStructureIfExists(testStructure.id);
expect(removed).toEqual(testStructure);
const result = await repository.getParkingStructureById(testStructure.id);
expect(result).toBeNull();
});
it("should return null when removing non-existent structure", async () => {
const result = await repository.removeParkingStructureIfExists("non-existent");
expect(result).toBeNull();
});
});
describe("clearParkingStructureData", () => {
it("should remove all parking structures", async () => {
const structures = [
testStructure,
{ ...testStructure, id: "test-id-2", name: "Second Garage" }
];
for (const structure of structures) {
await repository.addOrUpdateParkingStructure(structure);
}
await repository.clearParkingStructureData();
const result = await repository.getParkingStructures();
expect(result).toHaveLength(0);
});
});
describe("getParkingStructures", () => {
it("should return empty array when no structures exist", async () => {
const result = await repository.getParkingStructures();
expect(result).toEqual([]);
});
it("should return all added structures", async () => {
const structures = [
testStructure,
{ ...testStructure, id: "test-id-2", name: "Second Garage" }
];
for (const structure of structures) {
await repository.addOrUpdateParkingStructure(structure);
}
const result = await repository.getParkingStructures();
expect(result).toHaveLength(2);
expect(result).toEqual(expect.arrayContaining(structures));
});
});
describe("getParkingStructureById", () => {
it("should return null for non-existent structure", async () => {
const result = await repository.getParkingStructureById("non-existent");
expect(result).toBeNull();
});
it("should return structure by id", async () => {
await repository.addOrUpdateParkingStructure(testStructure);
const result = await repository.getParkingStructureById(testStructure.id);
expect(result).toEqual(testStructure);
});
});
});

View File

@@ -0,0 +1,203 @@
import { afterEach, beforeEach, describe, expect, it, jest } from "@jest/globals";
import { InMemoryParkingRepository, } from "../../src/repositories/InMemoryParkingRepository";
import { IParkingStructure } from "../../src/entities/ParkingRepositoryEntities";
import { ParkingStructureCountOptions } from "../../src/repositories/ParkingGetterRepository";
import { ParkingGetterSetterRepository } from "../../src/repositories/ParkingGetterSetterRepository";
import { RedisParkingRepository } from "../../src/repositories/RedisParkingRepository";
interface RepositoryHolder {
name: string;
factory(): Promise<ParkingGetterSetterRepository>;
teardown(): Promise<void>;
}
class InMemoryParkingRepositoryHolder implements RepositoryHolder {
name = 'InMemoryParkingRepository';
factory = async () => {
return new InMemoryParkingRepository();
};
teardown = async () => {};
}
class RedisParkingRepositoryHolder implements RepositoryHolder {
repo: RedisParkingRepository | undefined;
name = 'RedisParkingRepository';
factory = async () => {
this.repo = new RedisParkingRepository();
await this.repo.connect();
return this.repo;
};
teardown = async () => {
if (this.repo) {
await this.repo.clearAllData();
await this.repo.disconnect();
}
};
}
const repositoryImplementations = [
new InMemoryParkingRepositoryHolder(),
new RedisParkingRepositoryHolder(),
];
describe.each(repositoryImplementations)('$name', (holder) => {
let repository: ParkingGetterSetterRepository;
const testStructure: IParkingStructure = {
coordinates: {
latitude: 33.794795,
longitude: -117.850807,
},
spotsAvailable: 0,
id: "1",
name: "Anderson Parking Structure",
capacity: 100,
address: "300 E Walnut Ave, Orange, CA 92867",
updatedTime: new Date(),
};
beforeEach(async () => {
repository = await holder.factory();
jest.useRealTimers();
});
afterEach(async () => {
await holder.teardown();
jest.useRealTimers();
});
describe("addOrUpdateParkingStructure", () => {
it("should add a new parking structure", async () => {
await repository.addOrUpdateParkingStructure(testStructure);
const result = await repository.getParkingStructureById(testStructure.id);
expect(result).toEqual(testStructure);
});
it("should update existing parking structure", async () => {
await repository.addOrUpdateParkingStructure(testStructure);
const updatedStructure = { ...testStructure, name: "Updated Garage" };
await repository.addOrUpdateParkingStructure(updatedStructure);
const result = await repository.getParkingStructureById(testStructure.id);
expect(result).toEqual(updatedStructure);
});
});
describe("removeParkingStructureIfExists", () => {
it("should remove existing parking structure and return it", async () => {
await repository.addOrUpdateParkingStructure(testStructure);
const removed = await repository.removeParkingStructureIfExists(testStructure.id);
expect(removed).toEqual(testStructure);
const result = await repository.getParkingStructureById(testStructure.id);
expect(result).toBeNull();
});
it("should return null when removing non-existent structure", async () => {
const result = await repository.removeParkingStructureIfExists("non-existent");
expect(result).toBeNull();
});
});
describe("clearParkingStructureData", () => {
it("should remove all parking structures", async () => {
const structures = [
testStructure,
{ ...testStructure, id: "test-id-2", name: "Second Garage" }
];
for (const structure of structures) {
await repository.addOrUpdateParkingStructure(structure);
}
await repository.clearParkingStructureData();
const result = await repository.getParkingStructures();
expect(result).toHaveLength(0);
});
});
describe("getParkingStructures", () => {
it("should return empty array when no structures exist", async () => {
const result = await repository.getParkingStructures();
expect(result).toEqual([]);
});
it("should return all added structures", async () => {
const structures = [
testStructure,
{ ...testStructure, id: "test-id-2", name: "Second Garage" }
];
for (const structure of structures) {
await repository.addOrUpdateParkingStructure(structure);
}
const result = await repository.getParkingStructures();
expect(result).toHaveLength(2);
expect(result).toEqual(expect.arrayContaining(structures));
});
});
describe("getParkingStructureById", () => {
it("should return null for non-existent structure", async () => {
const result = await repository.getParkingStructureById("non-existent");
expect(result).toBeNull();
});
it("should return structure by id", async () => {
await repository.addOrUpdateParkingStructure(testStructure);
const result = await repository.getParkingStructureById(testStructure.id);
expect(result).toEqual(testStructure);
});
});
describe("getHistoricalAveragesOfParkingStructureCounts", () => {
it("should return empty array for non-existent structure or no data", async () => {
const options: ParkingStructureCountOptions = {
startUnixEpochMs: 1000,
endUnixEpochMs: 2000,
intervalMs: 500
};
expect(await repository.getHistoricalAveragesOfParkingStructureCounts("non-existent", options)).toEqual([]);
await repository.addOrUpdateParkingStructure(testStructure);
expect(await repository.getHistoricalAveragesOfParkingStructureCounts(testStructure.id, options)).toEqual([]);
});
it("should calculate averages for intervals with manual historical data", async () => {
// Set logging interval to 0 so every update creates historical data
repository.setLoggingInterval(0);
await repository.addOrUpdateParkingStructure(testStructure);
const updates = [
{ ...testStructure, spotsAvailable: 80, updatedTime: new Date() },
{ ...testStructure, spotsAvailable: 70, updatedTime: new Date() },
{ ...testStructure, spotsAvailable: 60, updatedTime: new Date() },
];
for (let i = 0; i < updates.length; i++) {
// Ensure that different timestamps are created, even after adding the first test structure
await new Promise((resolve) => setTimeout(resolve, 200));
await repository.addOrUpdateParkingStructure(updates[i]);
}
const now = Date.now();
const options: ParkingStructureCountOptions = {
startUnixEpochMs: now - 10000, // Look back 10 seconds
endUnixEpochMs: now + 10000, // Look forward 10 seconds
intervalMs: 20000 // Single large interval
};
const result = await repository.getHistoricalAveragesOfParkingStructureCounts(testStructure.id, options);
// Should have at least some historical data
expect(result.length).toEqual(1);
if (result.length > 0) {
expect(result[0]).toHaveProperty('fromUnixEpochMs');
expect(result[0]).toHaveProperty('toUnixEpochMs');
expect(result[0]).toHaveProperty('averageSpotsAvailable');
expect(result[0].averageSpotsAvailable).toBeCloseTo(52.5);
}
});
});
});

View File

@@ -0,0 +1,201 @@
import { describe, expect, it } from "@jest/globals";
import { CircularQueue } from "../../src/types/CircularQueue";
interface TestItem {
id: number;
value: string;
}
describe("CircularQueue", () => {
const testItems = {
first: { id: 1, value: "first" },
second: { id: 2, value: "second" },
third: { id: 3, value: "third" },
fourth: { id: 4, value: "fourth" },
test: { id: 1, value: "test" },
apple: { id: 1, value: "apple" },
banana: { id: 2, value: "banana" },
cherry: { id: 3, value: "cherry" },
grape: { id: 5, value: "grape" },
orange: { id: 7, value: "orange" },
a: { id: 1, value: "a" },
b: { id: 2, value: "b" },
c: { id: 3, value: "c" },
d: { id: 4, value: "d" }
};
const sortingCallbacks = {
byId: (a: TestItem, b: TestItem) => a.id - b.id,
byValue: (a: TestItem, b: TestItem) => a.value.localeCompare(b.value)
};
const keyExtractors = {
id: (item: TestItem) => item.id,
value: (item: TestItem) => item.value
};
const createQueueWithItems = (size: number, items: TestItem[], sortingCallback: (a: TestItem, b: TestItem) => number) => {
const queue = new CircularQueue<TestItem>(size);
items.forEach(item => queue.appendWithSorting(item, sortingCallback));
return queue;
};
describe("constructor", () => {
it("creates queue with specified size", () => {
const queue = new CircularQueue<TestItem>(5);
expect(queue).toBeDefined();
});
});
describe("appendWithSorting", () => {
it("adds items to the queue with sorting callback", () => {
const queue = createQueueWithItems(3, [testItems.third, testItems.first, testItems.second], sortingCallbacks.byId);
expect(queue.size()).toBe(3);
expect(queue.get(0)).toEqual(testItems.first);
expect(queue.get(1)).toEqual(testItems.second);
expect(queue.get(2)).toEqual(testItems.third);
});
it("overwrites oldest items when queue is full", () => {
const queue = createQueueWithItems(2, [testItems.first, testItems.second, testItems.third], sortingCallbacks.byId);
expect(queue.size()).toBe(2);
});
it("handles appending to empty queue", () => {
const queue = createQueueWithItems(3, [testItems.test], sortingCallbacks.byId);
expect(queue.size()).toBe(1);
expect(queue.get(0)).toEqual(testItems.test);
});
it("optimizes append when items are already in order", () => {
const queue = new CircularQueue<TestItem>(5);
let sortCallCount = 0;
const trackingSortCallback = (a: TestItem, b: TestItem) => {
sortCallCount++;
return a.id - b.id;
};
queue.appendWithSorting(testItems.first, trackingSortCallback);
expect(sortCallCount).toBe(0);
queue.appendWithSorting(testItems.second, trackingSortCallback);
expect(sortCallCount).toBe(1);
queue.appendWithSorting(testItems.third, trackingSortCallback);
expect(sortCallCount).toBe(2);
queue.appendWithSorting({ id: 0, value: "zero" }, trackingSortCallback);
expect(sortCallCount).toBeGreaterThan(3);
expect(queue.get(0)).toEqual({ id: 0, value: "zero" });
expect(queue.get(1)).toEqual(testItems.first);
});
});
describe("popFront", () => {
it("removes the oldest item from queue", () => {
const queue = createQueueWithItems(3, [testItems.first, testItems.second], sortingCallbacks.byId);
expect(queue.size()).toBe(2);
queue.popFront();
expect(queue.size()).toBe(1);
expect(queue.get(0)).toEqual(testItems.second);
});
it("handles popping from empty queue", () => {
const queue = new CircularQueue<TestItem>(3);
expect(() => queue.popFront()).not.toThrow();
expect(queue.size()).toBe(0);
});
it("handles popping until empty", () => {
const queue = createQueueWithItems(2, [testItems.first, testItems.second], sortingCallbacks.byId);
queue.popFront();
expect(queue.size()).toBe(1);
queue.popFront();
expect(queue.size()).toBe(0);
queue.popFront();
expect(queue.size()).toBe(0);
});
});
describe("binarySearch", () => {
it("finds item using key extractor function", () => {
const queue = createQueueWithItems(5, [testItems.apple, testItems.cherry, testItems.grape, testItems.orange], sortingCallbacks.byId);
const result = queue.binarySearch(5, keyExtractors.id);
expect(result).toEqual(testItems.grape);
});
it("returns undefined when item not found", () => {
const queue = createQueueWithItems(5, [testItems.apple, testItems.cherry, testItems.orange], sortingCallbacks.byId);
const result = queue.binarySearch(5, keyExtractors.id);
expect(result).toBeUndefined();
});
it("finds first item", () => {
const queue = createQueueWithItems(5, [testItems.apple, testItems.cherry, testItems.orange], sortingCallbacks.byId);
const result = queue.binarySearch(1, keyExtractors.id);
expect(result).toEqual(testItems.apple);
});
it("finds last item", () => {
const queue = createQueueWithItems(5, [testItems.apple, testItems.cherry, testItems.orange], sortingCallbacks.byId);
const result = queue.binarySearch(7, keyExtractors.id);
expect(result).toEqual(testItems.orange);
});
it("returns undefined for empty queue", () => {
const queue = new CircularQueue<TestItem>(5);
const result = queue.binarySearch(1, keyExtractors.id);
expect(result).toBeUndefined();
});
it("works with string keys", () => {
const queue = createQueueWithItems(5, [testItems.apple, testItems.banana, testItems.cherry], sortingCallbacks.byValue);
const result = queue.binarySearch("banana", keyExtractors.value);
expect(result).toEqual(testItems.banana);
});
it("maintains sorted order assumption", () => {
const queue = createQueueWithItems(5, [testItems.d, testItems.a, testItems.c, testItems.b], sortingCallbacks.byValue);
expect(queue.binarySearch("a", keyExtractors.value)).toEqual(testItems.a);
expect(queue.binarySearch("b", keyExtractors.value)).toEqual(testItems.b);
expect(queue.binarySearch("c", keyExtractors.value)).toEqual(testItems.c);
expect(queue.binarySearch("d", keyExtractors.value)).toEqual(testItems.d);
expect(queue.binarySearch("z", keyExtractors.value)).toBeUndefined();
});
});
describe("integration", () => {
it("handles appendWithSorting, popFront, and binarySearch together", () => {
const queue = createQueueWithItems(3, [testItems.third, testItems.first, testItems.second], sortingCallbacks.byId);
expect(queue.binarySearch(2, keyExtractors.id)).toEqual(testItems.second);
queue.popFront();
expect(queue.binarySearch(1, keyExtractors.id)).toBeUndefined();
expect(queue.binarySearch(2, keyExtractors.id)).toEqual(testItems.second);
queue.appendWithSorting(testItems.fourth, sortingCallbacks.byId);
expect(queue.binarySearch(4, keyExtractors.id)).toEqual(testItems.fourth);
});
});
});