-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathEventsCacheInRedis.ts
More file actions
75 lines (65 loc) · 2.27 KB
/
EventsCacheInRedis.ts
File metadata and controls
75 lines (65 loc) · 2.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import { IEventsCacheAsync } from '../types';
import { IMetadata } from '../../dtos/types';
import SplitIO from '../../../types/splitio';
import { ILogger } from '../../logger/types';
import { LOG_PREFIX } from './constants';
import { StoredEventWithMetadata } from '../../sync/submitters/types';
import type { RedisAdapter } from './RedisAdapter';
/**
 * Events queue backed by a single Redis list.
 *
 * Every entry of the list is a JSON string that bundles one event with the
 * metadata object handed to the constructor (see `_toJSON`).
 */
export class EventsCacheInRedis implements IEventsCacheAsync {

  private readonly log: ILogger;
  private readonly key: string;
  private readonly redis: RedisAdapter;
  private readonly metadata: IMetadata;

  /**
   * @param log - logger used to report failed writes.
   * @param key - Redis key of the list that holds the queued events.
   * @param redis - Redis client used for every list operation.
   * @param metadata - metadata stored alongside each tracked event.
   */
  constructor(log: ILogger, key: string, redis: RedisAdapter, metadata: IMetadata) {
    this.log = log;
    this.key = key;
    this.redis = redis;
    this.metadata = metadata;
  }

  /**
   * Add a new event object into the queue.
   * Unlike `impressions::track`, result promise is never rejected.
   *
   * @param eventData - event to append at the tail of the list.
   * @returns a promise that resolves to `true` when the event was pushed, or
   * `false` when the Redis operation failed (the error is logged).
   */
  track(eventData: SplitIO.EventData): Promise<boolean> {
    return this.redis.rpush(
      this.key,
      this._toJSON(eventData)
    )
      // We use boolean values to signal successful queueing
      .then(() => true)
      .catch((err: unknown) => {
        this.log.error(`${LOG_PREFIX}Error adding event to queue: ${err}.`);
        return false;
      });
  }

  /**
   * Generates the JSON as we'll store it on Redis:
   * `m` holds the metadata and `e` the event itself.
   */
  private _toJSON(eventData: SplitIO.EventData): string {
    return JSON.stringify({
      m: this.metadata,
      e: eventData
    } as StoredEventWithMetadata);
  }

  /**
   * Number of entries currently in the list.
   *
   * @returns a promise that resolves to the list length, or to `0` when the
   * Redis operation fails (the error is intentionally swallowed).
   */
  count(): Promise<number> {
    return this.redis.llen(this.key).catch(() => 0);
  }

  /**
   * Removes events from the head of the queue.
   *
   * @param count - number of leading entries to discard. When omitted (or any
   * other falsy value, including `0`) the whole key is deleted.
   * @returns the promise returned by the underlying Redis command.
   */
  drop(count?: number): Promise<any> {
    if (!count) return this.redis.del(this.key);
    // Keep everything from index `count` to the tail, i.e. discard the first `count` entries.
    return this.redis.ltrim(this.key, count, -1);
  }

  /**
   * Pop the given number of events from the storage.
   * The returned promise rejects if the redis operation fails.
   *
   * NOTE: this method doesn't take into account MAX_EVENT_SIZE or MAX_QUEUE_BYTE_SIZE limits.
   * It is the submitter's responsibility to handle that.
   *
   * NOTE(review): the read (LRANGE) and the removal (LTRIM) are issued as two
   * separate commands; whether concurrent consumers of the same key are
   * possible should be confirmed against the callers.
   *
   * @param count - maximum number of events to pop. Values that are not a
   * positive number pop nothing and resolve to an empty array.
   * @returns a promise that resolves to the parsed events, oldest first.
   */
  popNWithMetadata(count: number): Promise<StoredEventWithMetadata[]> {
    // LRANGE's stop index is inclusive and negative indexes count from the tail,
    // so `count - 1` with `count <= 0` would select the whole list (or most of it)
    // instead of nothing. Bail out before touching Redis.
    if (!(count > 0)) return Promise.resolve([]);

    return this.redis.lrange(this.key, 0, count - 1).then(
      (items: string[]): StoredEventWithMetadata[] | Promise<StoredEventWithMetadata[]> => {
        // Nothing was read: trimming from index 0 would be a no-op round-trip.
        if (items.length === 0) return [];

        // Remove exactly the entries that were read, then hand them back parsed.
        return this.redis.ltrim(this.key, items.length, -1).then(() => {
          return items.map((item: string) => JSON.parse(item) as StoredEventWithMetadata);
        });
      }
    );
  }
}