Skip to content

Instantly share code, notes, and snippets.

@ReedD
Created January 7, 2016 20:20

Revisions

  1. ReedD created this gist Jan 7, 2016.
    65 changes: 65 additions & 0 deletions migrate.js
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,65 @@
    'use strict';

    var _ = require('lodash'),
    async = require('async'),
    Bluebird = require('bluebird'),
    mongodb = Bluebird.promisifyAll(require('mongodb')),
    using = Bluebird.using;

    var concurrency = 3;
    var totalCount = 0;
    var cursorCount = {};

    function processItem (item, cursor) {
    var id = cursor.id;
    if (!cursorCount[id]) cursorCount[id] = 0;
    cursorCount[id]++;
    totalCount++;
    // return Bluebird.delay(500);
    }

    function getConnectionAsync () {
    var url = 'mongodb://localhost:27017/migrate_test';
    return mongodb.MongoClient.connectAsync(url)
    .disposer(function (connection) {
    connection.close();
    });
    }

    function loopCursor (cursor, iterator) {
    return Bluebird.fromCallback(function (callback) {
    async.during(function (callback) {
    cursor.hasNext(callback);
    }, function (callback) {
    cursor.next().then(function (item) {
    return iterator(item, cursor);
    }).then(callback);
    }, callback);
    });
    }

    using(getConnectionAsync(), function (connection) {
    return connection.collectionAsync('filters')
    .then(function (collection) {
    return collection.count().then(function (count) {
    var promises = [];
    var pageSize = Math.floor(count / concurrency);
    _.times(concurrency, function (n) {
    var cursor = collection.find().skip(pageSize * n);
    cursor.id = _.uniqueId();
    cursor.collection = collection;
    if (n !== concurrency - 1) {
    cursor.limit(pageSize);
    }
    promises.push(loopCursor(cursor, processItem));
    });
    return Bluebird.all(promises);
    });
    });
    }).then(function (data) {
    _.each(cursorCount, function (count, id) {
    console.log('Cursor %s Processed: %s', id, count);
    });
    console.log('Total Processed: %s', totalCount);
    console.log('Migration Complete');
    });