Last active
February 28, 2017 19:24
-
-
Save carlevans719/089daad500af86a07ab5bfa9a0a2e1e8 to your computer and use it in GitHub Desktop.
publish a relational dataset in meteor with a mongo backend
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// invoked thus (with `this` being the publish handler context):
// NOTE(review): the collections are constructed inline only to keep the
// example short — presumably real code passes existing collection
// instances instead; confirm against the project's collection setup
relationalPublish.call(this, {
  collection: new Mongo.Collection('pens'),
  collectionName: 'pens',
  selector: { brand: 'bic' },
  options: { limit: 500 },
  include: [{
    collection: new Mongo.Collection('inks'),
    collectionName: 'inks',
    fieldName: 'inkType'
  }, {
    collection: new Mongo.Collection('penLengths'),
    collectionName: 'penLengths',
    fieldName: 'availableLengths'
  }]
})
/* | |
* given the following data: | |
*/ | |
// PENS collection | |
{ | |
color: 'red', | |
brand: 'bic', | |
availableLengths: ['id1', 'id2'], | |
inkType: 'id5' | |
} | |
// INKS collection | |
// ... | |
{ | |
_id: 'id5', | |
color: 'blue', | |
type: 'wet' | |
} | |
// PENLENGTHS collection | |
{ | |
_id: 'id1', | |
length: 40 | |
}, | |
{ | |
_id: 'id2', | |
length: 100 | |
} | |
// ... |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Mongo } from 'meteor/mongo' | |
import { didResolve as isset } from '../utility/resolve' | |
import { assertOrThrow } from '../utility/assertOrThrow' | |
import * as array from '../utility/array' | |
/**
 * @function relationalPublish
 * @summary A relational publication handler for a query.
 * It publishes the documents matching `selector` and the documents
 * related to them (looked up by _id in the `include`d collections).
 * Must be invoked with `this` bound to the publish handler context,
 * because it calls `this.added`, `this.changed`, `this.removed` and
 * `this.onStop`. It does not call `this.ready()` itself.
 * @param {Object} params an object with the following keys
 *  - collection {Mongo.Collection} a collection to query
 *  - collectionName {String} the value passed to "new Mongo.Collection"
 *  - selector {Object} a Mongo selector
 *  - [options] {Object} an optional options object e.g. {fields: {...
 *    (it is never modified)
 *  - [include] {Array<Object>} an array of objects with the following keys
 *    (treated as empty when not set):
 *    - collection {Mongo.Collection} a collection to query for the
 *      relational data
 *    - collectionName {String} the value passed to "new Mongo.Collection"
 *    - fieldName {String} the name of the field which contains the related
 *      documents' _ids
 * @returns {Object} object with the following keys:
 *  - cursor {Object} the cursor being observed
 *  - relationalCursor {Object} a cursor over the same query, limited to
 *    the relation fields
 *  - observeHandle {Object} the handle to the observer
 */
export const relationalPublish = function relationalPublish (params) {
  const argsConform = isset(params, 'collection') &&
    params.collection instanceof Mongo.Collection &&
    isset(params, 'collectionName') &&
    typeof params.collectionName === 'string' &&
    isset(params, 'selector') &&
    typeof params.selector === 'object'

  assertOrThrow.call(this, argsConform, 400)

  // if params.options is set, it must be an object (just
  // like params.selector); otherwise fall back to a local default
  // so that the caller's params object is left untouched
  let options = {}
  if (isset(params, 'options')) {
    assertOrThrow.call(this, typeof params.options === 'object', 400)
    options = params.options
  }

  // if params.include is set, it must be an array; a publication
  // without relations is handled like an empty list
  let include = []
  if (isset(params, 'include')) {
    assertOrThrow.call(this, Array.isArray(params.include), 400)
    include = params.include
  }

  // Build cache structure & fieldMap
  const relatedDataCache = {}
  const relatedFieldMap = {}

  include.forEach(function buildCache (relation) {
    // All caches are arrays
    relatedDataCache[relation.fieldName] = []

    // Build map for relational cursor
    relatedFieldMap[relation.fieldName] = 1
  })

  // Get a cursor
  const cursor = params.collection.find(params.selector, options)

  // Get a better cursor: same query, but only the relation fields.
  // The options are copied so the object handed to the first cursor
  // (and owned by the caller) is not modified
  const relationalOptions = Object.assign({}, options, { fields: relatedFieldMap })
  const relationalCursor = params.collection.find(params.selector, relationalOptions)

  // Watch for changes to the cursor
  const observeHandle = cursor.observeChanges({
    added: (id, fields) => {
      // Send this doc to the client
      this.added(params.collectionName, id, fields)

      // Update our cache
      // XXX: potentially expensive
      updateRelationalPublication.call(this, include, fields, relatedDataCache)
    },

    changed: (id, fields) => {
      // Send the changed fields to the client
      this.changed(params.collectionName, id, fields)

      // Update our cache
      // XXX: potentially expensive
      recalculateCache.call(this, relationalCursor, include, relatedDataCache)
    },

    removed: (id) => {
      // Remove this doc from the client
      this.removed(params.collectionName, id)

      // Check if we need to remove docs from another collection
      // XXX: potentially expensive
      recalculateCache.call(this, relationalCursor, include, relatedDataCache)
    }
  })

  // Ensure we stop watching for changes
  // when this pub finishes
  this.onStop(function stopObservers () {
    observeHandle.stop()
  })

  return {
    cursor,
    relationalCursor,
    observeHandle
  }
}
/**
 * @function recalculateCache
 * @summary rebuilds the list of relational items that should have
 * been published to the client, adds/changes/removes the deltas and
 * stores the resulting lists back into `oldCache`.
 * Must be invoked with `this` bound to the publish handler context
 * (it calls `this.added`, `this.changed` and `this.removed`).
 * @param {Mongo.Cursor} cursor a mongo cursor to
 * retrieve the results from
 * @param {Array<Object>} relations an array of objects with the following
 * keys:
 *  - collection {Mongo.Collection} a collection to query
 *  - collectionName {String} the value passed to "new Mongo.Collection"
 *  - fieldName {String} the name of the field which contains the related
 *    documents' _ids
 * @param {Object} oldCache the old cached data, keyed by `fieldName`;
 * each key is replaced with the list of ids published after this call
 * @returns {undefined}
 */
function recalculateCache (cursor, relations, oldCache) {
  const self = this
  const newCache = {}

  // Every relation starts with an empty list, so the delta step
  // below always receives an array, even when no document carries
  // the relation field
  relations.forEach(function initialiseCache (relation) {
    newCache[relation.fieldName] = []
  })

  // Fetch up-to-date results from the cursor
  const currentData = cursor.fetch()

  // Check each document, building up a complete picture
  // of the related docs required
  currentData.forEach(function checkAllData (item) {
    relations.forEach(function checkAllFields (relation) {
      let newIds = item[relation.fieldName]

      // newIds === [String] || String
      if (typeof newIds === 'string') {
        newIds = [newIds]
      }

      // The document has no (usable) value for this relation
      if (!Array.isArray(newIds)) {
        return
      }

      // presumably appends each id which is not yet in the
      // target array — TODO confirm against ../utility/array
      array.addToSetInPlace(newCache[relation.fieldName], newIds)
    })
  })

  relations.forEach(function processDeltasForEachField (relation) {
    const oldIds = Array.isArray(oldCache[relation.fieldName]) ? oldCache[relation.fieldName] : []
    const deltas = calculateDeltas(newCache[relation.fieldName], oldIds)

    // ids that are published to the client once the deltas are applied
    const publishedIds = []

    // Propagate changes to client
    deltas.toAdd.forEach(function publishNewDoc (id) {
      const doc = relation.collection.findOne(id)

      // A dangling reference: nothing to publish, and it must not be
      // cached, so it is looked up again on the next recalculation
      if (!doc) {
        return
      }

      self.added(relation.collectionName, id, doc)
      publishedIds.push(id)
    })

    deltas.toUpdate.forEach(function republishDoc (id) {
      const doc = relation.collection.findOne(id)

      // Still referenced, so it stays cached; it is only re-sent
      // when the related document can be found
      if (doc) {
        self.changed(relation.collectionName, id, doc)
      }
      publishedIds.push(id)
    })

    deltas.toRemove.forEach((id) => self.removed(relation.collectionName, id))

    // Remember what has been published, so the next call
    // calculates its deltas against the current state
    oldCache[relation.fieldName] = publishedIds
  })
}
/**
 * @function calculateDeltas
 * @summary given two arrays of strings, works out
 * which strings have been "added" (present in the
 * first array, but not the second), which are
 * unchanged (present in both) and which have
 * been "removed" (present in the second array, but
 * not the first). Each id is reported at most once,
 * even if it occurs several times in an input array.
 * @param {Array<String>} [newIds] an array of ids; anything
 * that is not an array is treated as an empty array
 * @param {Array<String>} [oldIds] an array of ids; anything
 * that is not an array is treated as an empty array
 * @returns {Object} an object with three keys:
 *  - toAdd {Array<String>} an array of ids which
 *    are present in the first array, but not the second
 *  - toUpdate {Array<String>} an array of ids which are
 *    present in both arrays
 *  - toRemove {Array<String>} an array of ids which
 *    are present in the second array, but not the first
 */
function calculateDeltas (newIds, oldIds) {
  // Sets give constant-time membership checks and drop duplicate ids,
  // while keeping the order of first appearance
  const newSet = new Set(Array.isArray(newIds) ? newIds : [])
  const oldSet = new Set(Array.isArray(oldIds) ? oldIds : [])

  const toAdd = []
  const toUpdate = []
  const toRemove = []

  newSet.forEach(function checkNewIds (id) {
    if (oldSet.has(id)) {
      // this id is present in both arrays
      toUpdate.push(id)
    } else {
      // this id has been added
      toAdd.push(id)
    }
  })

  oldSet.forEach(function checkOldIds (id) {
    if (!newSet.has(id)) {
      // this id has been removed
      toRemove.push(id)
    }
  })

  return {
    toAdd,
    toUpdate,
    toRemove
  }
}
/**
 * @function updateRelationalPublication
 * @summary Does an "addToSet" from `fields` according to `relations` into
 * `relatedDataCache`. If adding an item, it will publish it to the client too.
 * Ids whose related document cannot be found are neither cached nor published.
 * Must be invoked with `this` bound to the publish handler context
 * (it calls `this.added`).
 * @param {Array<Object>} relations an array of objects with the following
 * keys:
 *  - collection {Mongo.Collection} a collection to query
 *  - collectionName {String} the value passed to "new Mongo.Collection"
 *  - fieldName {String} the name of the field which contains the related
 *    documents' _ids
 * @param {Object} fields the object to extract values from. Which keys' values
 * are extracted will depend on the `relations` passed in
 * @param {Object} relatedDataCache an object with a key for each `relation`
 * where each key's value is an array of (potentially) strings. A missing
 * key is created on demand
 * @returns {undefined}
 * @example
 * var Inks = new Mongo.Collection('inks')
 * var Pens = new Mongo.Collection('pens')
 * var relations = [{
 *   collection: Inks,
 *   collectionName: 'inks',
 *   fieldName: 'inkType'
 * }]
 *
 * var fields = Pens.findOne()
 *
 * var relatedDataCache = {
 *   inkType: []
 * }
 *
 * // The next line will cause the value of `fields.inkType` to be added
 * // to `relatedDataCache.inkType`
 * updateRelationalPublication.call(this, relations, fields, relatedDataCache)
 *
 * // This time, it will have no effect, as the value is already in the cache
 * updateRelationalPublication.call(this, relations, fields, relatedDataCache)
 */
function updateRelationalPublication (relations, fields, relatedDataCache) {
  const self = this

  /**
   * @function addToSetAndPublish
   * @summary checks `cache` for `id`, if it isn't present and the
   * related document exists, then it adds it and calls `this.added()`
   * to publish it
   * @param {Array<String>} cache an array of already published
   * items' ids
   * @param {String} id an item to potentially publish
   * @param {Object} relation an object with the following
   * keys:
   *  - collection {Mongo.Collection} a collection to query
   *  - collectionName {String} the value passed to "new Mongo.Collection"
   *  - fieldName {String} the name of the field which contains the related
   *    documents' _ids
   */
  function addToSetAndPublish (cache, id, relation) {
    // Already published
    if (cache.indexOf(id) !== -1) {
      return
    }

    const doc = relation.collection.findOne(id)

    // A dangling reference: there is nothing to send, and caching the
    // id would record it as published although the client never got it
    if (!doc) {
      return
    }

    cache.push(id)
    self.added(relation.collectionName, id, doc)
  }

  // do an "add to set" for each relatedDataCache
  relations.forEach(function checkEachRelation (relation) {
    const relatedData = fields[relation.fieldName]
    let ids

    // The field is either an array of ids or a single id;
    // anything else (e.g. a missing field) has nothing to publish
    if (Array.isArray(relatedData)) {
      ids = relatedData
    } else if (typeof relatedData === 'string') {
      ids = [relatedData]
    } else {
      return
    }

    // Make sure there is a cache for this relation
    if (!Array.isArray(relatedDataCache[relation.fieldName])) {
      relatedDataCache[relation.fieldName] = []
    }
    const cache = relatedDataCache[relation.fieldName]

    ids.forEach(function addToSet (item) {
      // if the item isn't cached, cache it
      addToSetAndPublish(cache, item, relation)
    })
  })
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment