Skip to content

Instantly share code, notes, and snippets.

@philholden
Created August 8, 2020 05:44
Show Gist options
  • Save philholden/e037529379eaf42b34821eff94addfec to your computer and use it in GitHub Desktop.
Save philholden/e037529379eaf42b34821eff94addfec to your computer and use it in GitHub Desktop.
import { clone } from "./clone";
/**
 * Creates a queue of update messages kept in ascending `head.updateNonce`
 * order, and merges them onto a base value.
 *
 * Messages are expected to have the shape `{ head, body }`, where `head`
 * carries `updateNonce` and `setterClientId`.
 *
 * @returns {{
 *   add: (update: object) => void,
 *   applyUpdates: (obj: object) => { head: object, body: object },
 *   get: () => object[]
 * }}
 */
function updateQueue() {
  const _queue = [];

  // Sort key of a message. The counter lives on `head`, not on the message
  // itself; a missing or non-numeric counter sorts as 0 so the comparator
  // never returns NaN (which would leave the queue unsorted).
  function orderOf(update) {
    const value = update.head.updateNonce;
    return Number.isFinite(value) ? value : 0;
  }

  /**
   * Adds an update and re-sorts the queue. Updates with equal counters keep
   * their arrival order (Array.prototype.sort is stable).
   *
   * @param {object} update - message of shape `{ head, body }`
   */
  function add(update) {
    _queue.push(update);
    _queue.sort((a, b) => orderOf(a) - orderOf(b));
  }

  /**
   * Returns a new `{ head, body }` built from clones of `obj.head` and
   * `obj.body` with every matching update shallow-merged on top, in queue
   * order. Only updates whose `head.setterClientId` equals `obj.head.clientId`
   * are applied. `obj` itself is not modified.
   *
   * @param {object} obj - base value of shape `{ head, body }`
   * @returns {{ head: object, body: object }}
   */
  function applyUpdates(obj) {
    const updates = _queue.filter(
      (x) => obj.head.clientId === x.head.setterClientId
    );
    return {
      head: Object.assign(clone(obj.head), ...updates.map((x) => x.head)),
      body: Object.assign(clone(obj.body), ...updates.map((x) => x.body)),
    };
  }

  return {
    add,
    applyUpdates,
    // Exposes the live internal array (not a copy).
    get() {
      return _queue;
    },
  };
}
/**
 * Creates a subscribable store whose value is shared between peers over
 * IPFS PubSub topics named `${channelId}-${messageType}-<suffix>`, with the
 * suffixes `set`, `update`, `init` and `join`.
 *
 * - `set` replaces the value and bumps the nonce.
 * - `update` shallow-merges fields onto the value for the current nonce.
 * - `join` is published by a freshly started store with back-off; the node
 *   that last called `set()` listens for it and answers on `init`.
 *
 * The store starts itself on creation.
 *
 * @param {object} options
 * @param {{ then: Function }} options.ipfs - promise (thenable) resolving to
 *   an object exposing `pubsub.subscribe/unsubscribe/publish`
 * @param {string} options.channelId - first part of every topic name
 * @param {string} options.messageType - second part of every topic name
 * @param {*} options.placeholder - value handed to subscribers while no
 *   value (or no body) is available
 * @returns {{
 *   start: () => void,
 *   stop: () => void,
 *   subscribe: (cb: (value: *) => void) => () => void,
 *   set: (v: object) => void,
 *   update: (v: object, expectedNonce?: number) => void
 * }}
 */
export function ipfsPubSubStore({ ipfs, channelId, messageType, placeholder }) {
  // Delays (ms) before each successive join request.
  const JOIN_BACKOFF_MS = [
    100,
    200,
    300,
    500,
    800,
    1300,
    2100,
    3400,
    5500,
    8900,
    14400,
    23300,
    37700,
  ];
  // Init messages are sent at most once per this interval (ms).
  const INIT_MIN_INTERVAL_MS = 1000;

  let ready;
  let _nonce = -1;
  let _updateNonce = -1;
  let _joinNode = false;
  let _started = false;
  let initTime = performance.now();
  let timeout;
  let joinTimeout;
  const _clientId = Math.random();
  const updateQueues = [];
  const valCache = [];
  const subs = [];

  function topic(suffix) {
    return `${channelId}-${messageType}-${suffix}`;
  }

  // Decodes a pubsub message. Messages come from other peers, so anything
  // that is not JSON carrying a `head` object is reported and ignored
  // instead of throwing inside the pubsub handler.
  function parseMessage(msg) {
    try {
      const data = JSON.parse(msg.data.toString());
      if (data && typeof data.head === "object" && data.head !== null) {
        return data;
      }
      console.warn("ipfsPubSubStore: ignoring message without head");
    } catch (err) {
      console.warn("ipfsPubSubStore: ignoring malformed message", err);
    }
    return undefined;
  }

  // Nonces index `valCache` / `updateQueues`, so only non-negative integers
  // are accepted from the network.
  function isValidNonce(nonce) {
    return Number.isInteger(nonce) && nonce >= 0;
  }

  function clearTimers() {
    clearTimeout(joinTimeout);
    clearTimeout(timeout);
  }

  function publish(suffix, data) {
    ipfs.then((node) => {
      node.pubsub.publish(topic(suffix), Buffer.from(JSON.stringify(data)));
    });
  }

  // Value for the current nonce with its queued updates merged in, or
  // undefined while nothing has been set or received.
  function getLatest() {
    const base = valCache[_nonce];
    const queue = updateQueues[_nonce];
    if (!base || !queue) return base;
    return queue.applyUpdates(base);
  }

  function currentValue() {
    const latest = getLatest();
    return (latest && latest.body) || placeholder;
  }

  /**
   * Registers `cb` and calls it immediately with the current value
   * (`placeholder` while there is none).
   *
   * @returns {() => void} unsubscribe function; safe to call more than once
   */
  function subscribe(cb) {
    subs.push(cb);
    cb(currentValue());
    return () => {
      const index = subs.indexOf(cb);
      // Without this guard a repeated call would splice(-1, 1) and drop
      // the last, unrelated subscriber.
      if (index !== -1) subs.splice(index, 1);
    };
  }

  function notify() {
    if (!valCache[_nonce]) return;
    const latestVal = currentValue();
    // Iterate over a copy so a subscriber may unsubscribe from its callback.
    subs.slice().forEach((fn) => fn(latestVal));
  }

  // Publishes join requests: the first after 100 ms, then with growing
  // delays until JOIN_BACKOFF_MS is exhausted or the timer is cleared
  // (a remote set arrived, or the store was stopped).
  function doJoin(node) {
    let attempt = 0;
    function publishJoin() {
      node.pubsub.publish(
        topic("join"),
        Buffer.from(JSON.stringify({ head: { clientId: _clientId } }))
      );
      if (attempt < JOIN_BACKOFF_MS.length) {
        joinTimeout = setTimeout(publishJoin, JOIN_BACKOFF_MS[attempt++]);
      }
    }
    joinTimeout = setTimeout(publishJoin, JOIN_BACKOFF_MS[attempt++]);
  }

  /**
   * Subscribes to the set/update/init topics and begins sending join
   * requests. Does nothing if the store is already started.
   */
  function start() {
    if (_started) return;
    _started = true;
    ipfs.then((node) => {
      node.pubsub.subscribe(topic("set"), onSet);
      node.pubsub.subscribe(topic("update"), onUpdate);
      node.pubsub.subscribe(topic("init"), onInit);
      doJoin(node);
    });
  }

  /**
   * Unsubscribes from all topics and cancels pending join/init timers.
   */
  function stop() {
    if (!_started) return;
    _started = false;
    _joinNode = false;
    clearTimers();
    ipfs.then((node) => {
      node.pubsub.unsubscribe(topic("set"), onSet);
      node.pubsub.unsubscribe(topic("update"), onUpdate);
      node.pubsub.unsubscribe(topic("init"), onInit);
      node.pubsub.unsubscribe(topic("join"), onJoin);
      // start() may have scheduled its join timer after the synchronous
      // clear above (its ipfs callback runs before this one), so clear again.
      clearTimers();
    });
  }

  /**
   * Replaces the value: bumps the nonce, publishes on the `set` topic and
   * makes this node the one that answers join requests.
   *
   * @param {object} v - new value; shallow-copied into the message body
   */
  function set(v) {
    _nonce++;
    _updateNonce = 0;
    if (!_joinNode) {
      _joinNode = true;
      ipfs.then((node) => {
        node.pubsub.subscribe(topic("join"), onJoin);
      });
    }
    const head = {
      nonce: _nonce,
      clientId: _clientId,
      setterClientId: _clientId,
      updateNonce: 0,
    };
    const data = { head, body: { ...v } };
    publish("set", data);
    valCache[_nonce] = data;
    notify();
  }

  /**
   * Merges `v` onto the value of the current nonce and publishes it on the
   * `update` topic. Does nothing while there is no value.
   *
   * @param {object} v - fields to merge; shallow-copied into the message body
   * @param {number} [expectedNonce] - when given and older than the current
   *   nonce, the update is dropped
   */
  function update(v, expectedNonce) {
    if (expectedNonce !== undefined && expectedNonce < _nonce) {
      console.log(`Too late expected nonce: ${expectedNonce} found ${_nonce}`);
      return;
    }
    const latest = getLatest();
    if (!latest || !latest.head) return;
    // `|| -1` would also replace a legitimate 0.
    const latestUpdateNonce =
      typeof latest.head.updateNonce === "number"
        ? latest.head.updateNonce
        : -1;
    _updateNonce = Math.max(_updateNonce, latestUpdateNonce) + 1;
    const head = {
      nonce: _nonce,
      updateNonce: _updateNonce,
      clientId: _clientId,
      // applyUpdates() matches this against the clientId of the cached base
      // value. The merged head in `latest` carries the clientId of the last
      // *updater*, so reading it there would make later updates be ignored.
      setterClientId: valCache[_nonce].head.clientId,
    };
    const data = { head, body: { ...v } };
    if (updateQueues[_nonce] === undefined) {
      updateQueues[_nonce] = updateQueue();
    }
    updateQueues[_nonce].add(data);
    publish("update", data);
    notify();
  }

  // Init messages carry a full value and are handled exactly like sets.
  function onInit(msg) {
    onSet(msg);
  }

  function onSet(msg) {
    const data = parseMessage(msg);
    if (!data || data.head.clientId === _clientId) return;
    const nonce = data.head.nonce;
    if (!isValidNonce(nonce)) return;
    // Another node holds a value, so stop asking to join.
    clearTimeout(joinTimeout);
    if (nonce < _nonce) return;
    _nonce = nonce;
    if (!ready) {
      ready = true;
      ipfs.then((node) => {
        node.pubsub.unsubscribe(topic("init"), onInit);
      });
    }
    // if nonce exists highest client id wins
    if (
      valCache[nonce] === undefined ||
      valCache[nonce].head.clientId < data.head.clientId
    ) {
      valCache[nonce] = data;
      // The remote setter now answers joins.
      _joinNode = false;
      ipfs.then((node) => {
        node.pubsub.unsubscribe(topic("join"), onJoin);
      });
    }
    notify();
  }

  function onUpdate(msg) {
    const data = parseMessage(msg);
    if (!data || data.head.clientId === _clientId) return;
    const nonce = data.head.nonce;
    if (!isValidNonce(nonce)) return;
    if (updateQueues[nonce] === undefined) {
      updateQueues[nonce] = updateQueue();
    }
    updateQueues[nonce].add(data);
    if (nonce < _nonce) return;
    notify();
  }

  // Publishes the current merged value on the init topic for joiners.
  function sendInit() {
    const data = getLatest();
    if (!data) return;
    ipfs.then((node) => {
      clearTimeout(timeout);
      initTime = performance.now();
      node.pubsub.publish(topic("init"), Buffer.from(JSON.stringify(data)));
    });
  }

  // Answers a join request, immediately if the last init is older than
  // INIT_MIN_INTERVAL_MS, otherwise after a (re)started delay.
  function onJoin(msg) {
    const data = parseMessage(msg);
    if (!data || data.head.clientId === _clientId) return;
    if (performance.now() - initTime > INIT_MIN_INTERVAL_MS) {
      sendInit();
    } else {
      clearTimeout(timeout);
      timeout = setTimeout(sendInit, INIT_MIN_INTERVAL_MS);
    }
  }

  start();

  // update could target nonce and creator
  return { start, stop, subscribe, set, update };
}
/*
Last setter should be the node that handles joins
Join data is published at most once per second
Conflicting sets with the same nonce are settled by clientId
*/
@philholden
Copy link
Author

philholden commented Aug 8, 2020

Disclosure of previous work

Please read before awarding any prize

My initial plan was to use 3Box Ghost threads for Hymn Sheet and Textile Buckets for storage, and not write any custom networking for the song channels. I had Ghost threads essentially working in week 1, but when I came to integrate this with the UI in week 4 I started to get issues bundling Textile. 3Box was being used to track access rights to song books stored in Textile buckets, but without buckets working there was less justification for 3Box. I also listened to the Reddit founder's talk saying that it is bad to expose new users to MetaMask. So I dropped 3Box too.

I decided to write my own custom network layer, but I based it on some previous experiments I had done before I knew about the hackathon. The old code is above in the gist; the new code is in the links below. V2, which I created in the hackathon, is a ground-up rewrite of this that extends its functionality (and makes it work for late joiners). Below I describe the changes:

V1 Four channel system (set, update, join and init)

  • client Id not using peerId so not cryptographically verifiable
  • no concept of an allow list
  • no use of async await
  • set channel (major update code is scattered)
  • cannot handle a controller who is late to join as they will publish a 'set' with a nonce of zero and this will be ignored by other clients
  • this also happens if page is reloaded

V2 Five channel system (major, minor, join, init, ipns)

This is the code for V2

https://github.com/philholden/hymnsheet/blob/master/store/channelStore.js
https://github.com/philholden/hymnsheet/blob/master/store/updateQueue.js

  • clientId is cryptographically verifiable because it is the peerId
  • this enables us to do a proper allowList where clientId cannot be faked
  • allowList is implemented such that even previous messages from banned people will be removed
  • this means that in most circumstances users can create open channels where anyone can be the controller (more fun)
  • channel will only be restricted in response to abuse
  • for large events controller access can be locked down from start
  • allowList is stored on IPFS and linked to an IPNS name this means allowList can be updated dynamically with IPNS PubSub.
  • this gives us a concept not just of channel controllers who can control lyrics but also channel admins who can control who can control lyrics. Those with access to the IPNS key are channel admins
  • the updateQueue has been placed in its own module
  • it now handles major updates as well as the minor ones in one place
  • the sorting of these messages is now more rigorous making sure all peers see the same thing regardless of the order that messages are received in.
  • Each major and minor message has a count. When a new major comes in, all previous majors and their corresponding minor updates are wiped.
  • We also take peerId into account when sorting majors, in case two peers send a major with the same count.
  • We get a problem when a new peer joins as a controller. It will have a count of zero, so all its messages will be ignored until it gets a message from a current controller telling it what the count should be (and this may not happen). So we have made a major count of zero into an ace. It always wins, and a major of zero will reset the major count on all peers to zero. This allows new controllers to join when an old one has left.

Song renderer

The song renderer has existed since 2016 and it breaks lines in verses of songs so they optimally match aspect ratio of the screen. This is essential for making the best use of projector brightness. You can see a talk on the song renderer here:

https://www.youtube.com/watch?v=K1LtTDDu9E4&t=4s

and on npm here:

https://www.npmjs.com/package/@omysoul/svelte-song-display

Storybook here:

https://ipfs.infura.io/ipfs/QmciYNTJzeKcEMWTYvNhcRErKvGVXMSbevvfgVhCLT7T2j/?path=/story/component--default

What is new?

  • The whole UI is new written from scratch in Svelte
  • Connecting the song displayer to a controller via a P2P network
  • Ground up rewrite of ipfsPubSubStore with tonnes of new features
  • The ability to have many groups running at the same time
  • Global scalability at zero cost
  • Can finally release something I started work on six years ago so others can use it

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment