Last active
June 2, 2022 03:47
-
-
Save akramarev/c6d4b747d5c3a8c255ac99c7638238a6 to your computer and use it in GitHub Desktop.
[Distributed Data Loader] GraphQL data loaders enhanced by distributed cache #AmplitudeEngineeringHandbook #public
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import DataLoader from 'dataloader'; | |
import logger from '../../../logger'; | |
/** Separator between the segments of composed Redis keys: ASCII unit separator (0x1F, decimal 31). */
export const REDIS_KEY_DELIMITER = '\x1F';
/** Stored in Redis to mean "value is set but it's null": ASCII NUL (0x00). */
export const REDIS_NULL_VALUE = '\x00';
/** Literal type of {@link REDIS_NULL_VALUE}, derived so the two cannot drift apart. */
export type RedisNullValueType = typeof REDIS_NULL_VALUE;

/**
 * Options accepted by distributed data loaders: the regular DataLoader options
 * (cache keys are strings) plus an optional cache TTL.
 */
export type DistributedDataLoaderOptions<K, V> = DataLoader.Options<
  K,
  V,
  string
> & {
  /** TTL handed to the distributed cache; when omitted the loader applies its own default. */
  expiration?: number;
};

/** Resolved loader options: `expiration` and `cacheKeyFn` are always present. */
export type OptionsEx<K, V> = DataLoader.Options<K, V, string> & {
  expiration: number;
  cacheKeyFn: (key: K) => string;
};
/**
 * DataLoader that consults a distributed cache before calling `batchLoadFn`.
 *
 * For every batch the in-process DataLoader hands over, the keys are first
 * looked up with `rGet`; only the distributed-cache misses are passed to
 * `batchLoadFn`, and its non-`Error` results are written back with `rSet`.
 * Subclasses decide the storage layout by implementing `rGet` / `rSet` /
 * `rDel` / `rDelAll`.
 */
export abstract class DistributedDataLoader<K, V>
  implements DataLoader<K, V, string>
{
  /** Fallback TTL used when `options.expiration` is not set (1 day when read as seconds). */
  protected readonly defaultExpiration = 60 * 60 * 24; // 1 day
  /** Caller options with `expiration` and `cacheKeyFn` resolved to concrete values. */
  protected readonly options: OptionsEx<K, V>;
  /** In-process loader whose batch function is wrapped by `wrapBatchLoadFn`. */
  protected readonly loader: DataLoader<K, V>;

  protected constructor(
    protected keySpace: string,
    protected batchLoadFn: DataLoader.BatchLoadFn<K, V>,
    options?: DistributedDataLoaderOptions<K, V>,
  ) {
    this.options = {
      ...options,
      // `||` (not `??`): an expiration of 0 also falls back to the default
      expiration: options?.expiration || this.defaultExpiration,
      cacheKeyFn: options?.cacheKeyFn || ((key) => JSON.stringify(key)),
    };
    // create a new loader with augmented batchLoadFn
    this.loader = new DataLoader(
      this.wrapBatchLoadFn(this.batchLoadFn),
      this.options,
    );
  }

  /**
   * Primes `key` with `value`, replacing any existing in-process entry.
   * `Error` values are primed in-process only; other values are also written
   * to the distributed cache without awaiting (failures are logged).
   */
  prime(key: K, value: Error | V): this {
    if (value instanceof Error) {
      this.loader.clear(key).prime(key, value);
      return this;
    }
    // eslint-disable-next-line promise/prefer-await-to-then
    this.rSet([[key, value]]).catch((err: Error) => logger.error(err));
    this.loader.clear(key).prime(key, value);
    return this;
  }

  /** Removes `key` from both caches; the distributed delete is not awaited (failures are logged). */
  clear(key: K): this {
    // eslint-disable-next-line promise/prefer-await-to-then
    this.rDel([key]).catch((err: Error) => logger.error(err));
    this.loader.clear(key);
    return this;
  }

  /** Empties both caches; the distributed delete is not awaited (failures are logged). */
  clearAll(): this {
    // eslint-disable-next-line promise/prefer-await-to-then
    this.rDelAll().catch((err: Error) => logger.error(err));
    this.loader.clearAll();
    return this;
  }

  /** Loads a single value through the in-process loader. */
  async load(key: K): Promise<V> {
    return this.loader.load(key);
  }

  /** Loads several values through the in-process loader; failures are returned as `Error` entries. */
  async loadMany(keys: ArrayLike<K>): Promise<Array<Error | V>> {
    return this.loader.loadMany(keys);
  }

  /**
   * Wraps `batchLoadFn` so that the distributed cache is read first and
   * refilled with whatever the primary source returns.
   *
   * The returned function yields one entry per key, in key order: cached
   * values (with REDIS_NULL_VALUE decoded back to `null`) or `batchLoadFn`
   * results (values or `Error`s).
   */
  protected wrapBatchLoadFn(
    batchLoadFn: DataLoader.BatchLoadFn<K, V>,
  ): DataLoader.BatchLoadFn<K, V> {
    return async (keys: ReadonlyArray<K>): Promise<Array<V | Error>> => {
      const result = new Array<V | Error>(keys.length);
      const missedKeys: K[] = [];
      const missedPositions: number[] = [];

      // query distributed cache first
      const rRecords = await this.rGet(keys);
      rRecords.forEach((rRecord, idx) => {
        if (rRecord === null) {
          // cache miss: remember where the primary-source result must go
          missedKeys.push(keys[idx]);
          missedPositions.push(idx);
          return;
        }
        // REDIS_NULL_VALUE marks a cached `null`
        result[idx] = (rRecord === REDIS_NULL_VALUE ? null : rRecord) as V;
      });

      if (missedKeys.length === 0) {
        // return the decoded values, never the raw REDIS_NULL_VALUE marker
        return result;
      }

      // query primary data source
      const dbRecords = await batchLoadFn(missedKeys);

      // fill the gaps and pair every key with ITS OWN record before dropping
      // errors, so that filtering cannot shift values onto the wrong keys
      const toCache: Array<[K, V]> = [];
      missedPositions.forEach((pos, idx) => {
        const record = dbRecords[idx];
        result[pos] = record;
        if (!(record instanceof Error)) {
          toCache.push([missedKeys[idx], record as V]);
        }
      });

      // populate cache (skip errors); skip the write entirely when there is
      // nothing to store, since the backing write commands need at least one entry
      if (toCache.length > 0) {
        await this.rSet(toCache);
      }

      return result;
    };
  }

  /**
   * Encodes a value for the distributed cache: `null` becomes REDIS_NULL_VALUE,
   * anything else is JSON-encoded.
   * NOTE(review): `undefined` is not special-cased; JSON.stringify(undefined)
   * returns undefined rather than a string.
   */
  protected rSerialize(value: V): string | RedisNullValueType {
    if (value === null) {
      return REDIS_NULL_VALUE;
    }
    return JSON.stringify(value);
  }

  /**
   * Decodes a stored value: `null` (nothing stored) stays `null`, the
   * REDIS_NULL_VALUE marker is returned unchanged, anything else is JSON-parsed.
   */
  protected rParse(value: string | null): V | null | RedisNullValueType {
    if (value === null) {
      return null;
    } else if (value === REDIS_NULL_VALUE) {
      return REDIS_NULL_VALUE;
    }
    return JSON.parse(value) as V;
  }

  /**
   * Reads cached entries for `keys`, one result per key in the same order:
   * `null` means "not cached", REDIS_NULL_VALUE means "cached null".
   */
  protected abstract rGet(
    keys: ReadonlyArray<K>,
  ): Promise<Array<V | null | RedisNullValueType>>;
  /** Writes key/value pairs to the distributed cache. */
  protected abstract rSet(tuples: Array<[K, V]>): Promise<void>;
  /** Removes the given keys from the distributed cache. */
  protected abstract rDel(keys: ReadonlyArray<K>): Promise<void>;
  /** Removes every entry of this loader's key space from the distributed cache. */
  protected abstract rDelAll(): Promise<void>;
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { RedisIface } from 'amplitude/lib/redis'; | |
import DataLoader from 'dataloader'; | |
import { | |
DistributedDataLoader, | |
REDIS_KEY_DELIMITER, | |
DistributedDataLoaderOptions, | |
} from './DistributedDataLoader'; | |
import { HSDataLoader } from './HSDataLoader'; | |
import { KVDataLoader } from './KVDataLoader'; | |
/**
 * Creates distributed data loaders that share one Redis client and one
 * key-space prefix.
 */
export class DistributedDataLoaderFactory {
  /**
   * @param redis - client handed to every loader this factory creates
   * @param keySpacePrefix - first segment of every loader's key space; with the
   *   default empty prefix the key space starts with REDIS_KEY_DELIMITER
   */
  constructor(private redis: RedisIface, private keySpacePrefix: string = '') {}

  /**
   * Creates a new distributed data loader backed by a Redis hash
   * (one hash per key space, one field per cached key).
   * Use it when write/clear operations are frequent.
   */
  public createHSDataLoader<K, V>(
    keySpace: string,
    batchLoadFn: DataLoader.BatchLoadFn<K, V>,
    options?: DistributedDataLoaderOptions<K, V>,
  ): DistributedDataLoader<K, V> {
    return new HSDataLoader(
      this.redis,
      this.buildKeySpace(keySpace, 'hs'),
      batchLoadFn,
      options,
    );
  }

  /**
   * Creates a new distributed data loader backed by plain Redis key/value
   * entries (one key per cached key, plus a set tracking those keys).
   * Use it when load operations are frequent.
   */
  public createKVDataLoader<K, V>(
    keySpace: string,
    batchLoadFn: DataLoader.BatchLoadFn<K, V>,
    options?: DistributedDataLoaderOptions<K, V>,
  ): DistributedDataLoader<K, V> {
    return new KVDataLoader(
      this.redis,
      this.buildKeySpace(keySpace, 'kv'),
      batchLoadFn,
      options,
    );
  }

  /** Joins the prefix, the loader's key space and the storage-type suffix with REDIS_KEY_DELIMITER. */
  private buildKeySpace(keySpace: string, suffix: 'hs' | 'kv'): string {
    return [this.keySpacePrefix, keySpace, suffix].join(REDIS_KEY_DELIMITER);
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { RedisIface } from 'amplitude/lib/redis'; | |
import DataLoader from 'dataloader'; | |
import logger from '../../../logger'; | |
import { | |
DistributedDataLoader, | |
DistributedDataLoaderOptions, | |
RedisNullValueType, | |
} from './DistributedDataLoader'; | |
/**
 * Distributed data loader that stores every cached entry of a key space as a
 * field of one Redis hash named `keySpace`.
 *
 * Clearing one key is an HDEL, clearing everything is a single DEL of the hash.
 * The TTL is applied to the whole hash, not to individual fields.
 */
export class HSDataLoader<K, V> extends DistributedDataLoader<K, V> {
  constructor(
    private redis: RedisIface,
    keySpace: string,
    batchLoadFn: DataLoader.BatchLoadFn<K, V>,
    options?: DistributedDataLoaderOptions<K, V>,
  ) {
    super(keySpace, batchLoadFn, options);
  }

  /** Maps a loader key to the hash field it is stored under. */
  private toField(key: K): string {
    return this.options.cacheKeyFn(key);
  }

  /** Reads the hash fields for `keys` with one HMGET, preserving key order. */
  protected async rGet(
    keys: ReadonlyArray<K>,
  ): Promise<Array<V | null | RedisNullValueType>> {
    // HMGET requires at least one field
    if (keys.length === 0) {
      return [];
    }
    return this.redis.runWithReconnect(async (redis) => {
      const result = await redis.hmget(
        this.keySpace,
        ...keys.map((key) => this.toField(key)),
      );
      return result.map((value) => this.rParse(value));
    });
  }

  /** Writes all pairs with one HSET and refreshes the hash TTL when new fields were added. */
  protected async rSet(tuples: Array<[K, V]>): Promise<void> {
    // HSET requires at least one field/value pair
    if (tuples.length === 0) {
      return;
    }
    return this.redis.runWithReconnect(async (redis) => {
      const fields = new Map<string, string>();
      for (const [key, value] of tuples) {
        fields.set(this.toField(key), this.rSerialize(value));
      }
      const result = await redis.hset(this.keySpace, fields);
      // expire command works only with keys, so the TTL covers the whole hash;
      // it is only (re)set when HSET reports newly added fields
      // TODO: when required, add manual fields expiration logic
      if (result > 0) {
        redis
          .expire(this.keySpace, this.options.expiration)
          // eslint-disable-next-line promise/prefer-await-to-then
          .catch((err: Error) => logger.error(err));
      }
    });
  }

  /** Removes the fields for `keys` with one HDEL. */
  protected async rDel(keys: ReadonlyArray<K>): Promise<void> {
    // HDEL requires at least one field
    if (keys.length === 0) {
      return;
    }
    return this.redis.runWithReconnect(async (redis) => {
      await redis.hdel(this.keySpace, ...keys.map((key) => this.toField(key)));
    });
  }

  /** Drops the whole hash. */
  protected async rDelAll(): Promise<void> {
    return this.redis.runWithReconnect(async (redis) => {
      await redis.del(this.keySpace);
    });
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { RedisIface } from 'amplitude/lib/redis'; | |
import DataLoader from 'dataloader'; | |
import { | |
DistributedDataLoader, | |
REDIS_KEY_DELIMITER, | |
DistributedDataLoaderOptions, | |
RedisNullValueType, | |
} from './DistributedDataLoader'; | |
/**
 * Distributed data loader that stores every cached entry under its own Redis
 * key (`<keySpace><REDIS_KEY_DELIMITER><cacheKeyFn(key)>`) with a per-key TTL.
 * All keys written are also tracked in the set `<keySpace>-set` so that
 * `rDelAll` can find and delete them.
 */
export class KVDataLoader<K, V> extends DistributedDataLoader<K, V> {
  protected keySpaceSet: string;

  constructor(
    private redis: RedisIface,
    keySpace: string,
    batchLoadFn: DataLoader.BatchLoadFn<K, V>,
    options?: DistributedDataLoaderOptions<K, V>,
  ) {
    super(keySpace, batchLoadFn, options);
    // separate set name to store all correspondent keys
    this.keySpaceSet = `${keySpace}-set`;
  }

  /** Builds the Redis key under which `key`'s value is stored. */
  private toRedisKey(key: K): string {
    return [this.keySpace, this.options.cacheKeyFn(key)].join(
      REDIS_KEY_DELIMITER,
    );
  }

  /** Reads every key with its own GET (issued concurrently), preserving key order. */
  protected async rGet(
    keys: ReadonlyArray<K>,
  ): Promise<Array<V | null | RedisNullValueType>> {
    return this.redis.runWithReconnect(async (redis) => {
      const result = await Promise.all(
        keys.map(async (k) => redis.get(this.toRedisKey(k))),
      );
      return result.map((value) => this.rParse(value));
    });
  }

  /** Writes each pair with SETEX, then records the keys in the tracking set and refreshes its TTL. */
  protected async rSet(tuples: Array<[K, V]>): Promise<void> {
    // SADD requires at least one member
    if (tuples.length === 0) {
      return;
    }
    return this.redis.runWithReconnect(async (redis) => {
      const keySet = new Set<string>();
      // save key-values first
      await Promise.all(
        tuples.map(async ([k, v]) => {
          const key = this.toRedisKey(k);
          keySet.add(key);
          return redis.setex(key, this.options.expiration, this.rSerialize(v));
        }),
      );
      // put the keys into correspondent set (and always set new expiration)
      await redis.sadd(this.keySpaceSet, ...keySet);
      await redis.expire(this.keySpaceSet, this.options.expiration);
    });
  }

  /** Deletes each key, then removes it from the tracking set. */
  protected async rDel(keys: ReadonlyArray<K>): Promise<void> {
    // SREM requires at least one member
    if (keys.length === 0) {
      return;
    }
    return this.redis.runWithReconnect(async (redis) => {
      const keySet = new Set<string>();
      await Promise.all(
        keys.map(async (k) => {
          const key = this.toRedisKey(k);
          keySet.add(key);
          return redis.del(key);
        }),
      );
      await redis.srem(this.keySpaceSet, ...keySet);
    });
  }

  /** Deletes every key listed in the tracking set, then the set itself. */
  protected async rDelAll(): Promise<void> {
    return this.redis.runWithReconnect(async (redis) => {
      const keys = await redis.smembers(this.keySpaceSet);
      await Promise.all(keys.map(async (k) => redis.del(k)));
      await redis.del(this.keySpaceSet);
    });
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment