So lately I have been sucked more and more into the Apple eco-system, and one of the major reasons that keeps happening (despite my best attempts not to) is the seamless connection between devices. They are definatly doing some magic to make low powered devices (ex. AirPods, AirTags) interconnect with high powered devices (ex. iPhones, MacBooks, Apple TVs) (mostly) reliable. It feels as if all their devices is able to create and maintain a stable two way connection regardless of the technology they use, and all with close to zero configuration.
So how do they do it? I decided to give this a go myself and create a "hyper connectivity framework" for allowing devices to seamlesly connect and maintain that connection reardless of the underlaying technology with a minimum of network overhead and without making writing applications any more complecated.
So first of is the list of requirements I came up with to validate my framework against to see if it is successful. This is quite a list and for this first post I will not fulfill all of them, but I will try to at the once I don't complete in this first time around, in a later post.
This may also be a good time for a disclaimer: This project is a Proof of Concept, not a production grade implementation, so you should proberbly not use this to power any essential equipment. To make this real world usable, some people way smarter than me with more time would have to take a crack at it. This is mostly just an illustation of the principles involved
Multi back-ended: Any connection that can carry a real time two way connection should be usable as a backend for this (TCP, WebSockets, P2P, Bluetooth, smoke signals)
Graceful handover: Devices should be able to switch from one backend to the other, or re-establish a new connection on the exisiting backend in case of disconnects or a more favorable connection method becoming available. This should happen completly opaquly to the application using the framework
Should use and existing common technology for consumers: Applications using this framework should not have to adapt to use this framwork but just use the regular primitives of the exisiting platform, or a widely used component
Should add little to no overhead: For the framework to be useful it should have a very small impact on data sizes or transfer speeds
Connections should be secure: Connections between clients should have e2ee by default, again opaque to the application itself. Additionally enrolling devices and joining them together should be seemless.
Clients should be able to proxy requests through other clients: If for instance two applications can not connect directly, or they decide they would prefer an intermediary proxy, any trusted device should be able to proxy a connections, so for instance a bluetooth device could connect to a phone that then connects to a server over the internet.
So for this PoC I am going to use NodeJS. Mostly because it is the platform I am most at home in, and this project is going to require that I know the platform quite well. I am going to use Node's Stream
object as the primitive for this project as it is pretty universal and it should be possible to use it to integrate into most consuming projects quite easy.
So the hope is to come up with a framework that allows a client to be written something like this
import hyperconnected from '@morten-olsen/hyperconnected';
const { getDevices, createStream, listen } = await hyperconnected(config);
const devices = await getDevices();
const stream = await createStream(devices[0]);
listen((stream) => {
steam.pipe(fs.createWriteStream('./output'));
});
fs.createReadStream('./input').pipe(stream);
So the first thing we need is something that can handle the handoff between connections, so that we can have one stream object for our application, but we can swap the backend underneath without the application knowing.
import { Duplex, PassThrough } from 'stream';
import Duplexify from 'duplexify';
type GetStream<T> = (config: T) => Promise<Duplex>;
class Handoff<ConfigType = any> {
private _duplexStream: Duplex;
private _readStream: PassThrough;
private _writeStream: PassThrough;
private _getStream: GetStream<ConfigType>;
private _cache?: Buffer;
constructor(getStream: GetStream<ConfigType>) {
this._readStream = new PassThrough();
this._writeStream = new PassThrough();
this._duplexStream = new Duplexify(this._writeStream, this._readStream);
this._getStream = getStream;
this._reset();
}
public get out() {
return this._writeStream;
}
public get in() {
return this._readStream;
}
public get stream() {
return this._duplexStream;
}
private _reset() {
this._duplexStream.pause();
this._readStream.unpipe();
this._writeStream.unpipe();
}
public attach = async (config: ConfigType) => {
this._reset();
const stream = await this._getStream(config);
if (this._cache) {
stream.write(this._cache);
this._cache = undefined;
}
const reconnect = () => {
this._duplexStream.pause();
stream.off('error', reconnect);
stream.off('close', reconnect);
this.attach(config);
};
stream.on('error', reconnect);
stream.on('close', reconnect);
stream.on('data', chunk => this._readStream.push(chunk));
this._writeStream.on('data', (chunk) => {
if (stream.destroyed) {
this._cache = Buffer.concat([
this._cache || Buffer.alloc(0),
chunk,
]);
} else {
stream.write(chunk);
}
});
this._duplexStream.resume();
}
}
export default Handoff;
Phew, a few things to go through here. First of all is a setup we are going to be using a few different places, which allows us to have an outer stream which we can pause so we can do setup of our inner stream, and after that resume it and allow traffic through.
We do that using three stream objects, our _readStream
that the consuming application will use for reading data. Our _writeStream
which the consuming application use for writing and lastly we join both of these together into a _duplexStream
stream using duplexify
. _duplexStream
is the stream that an external application will be using.
We also need a _getStream
function, which is the function to call when ever we need to establish a connection to a client. This is the function responsible for figuring out which backend to use and then estabilshing a connection. The actual function is something we will create later.
Lastly we reset our connection to ensure nothing is piped and that all input and output data to the application is paused until we are ready.
And the last thing to cover in this handoff method is our attach
function. This function is the one responsible for finding out outer stream and our inner stream together.
It will reset before starting and then use our _getStream
function to create a new connection to the target device.
We have a small cache for lost connections to ensure we don't lose data, so we start by checking if there is anything in this cache. If there is, we write this to the other device to ensure all data gets send.
Next we prepare a reconnect
method. This is to be used when a connection fails, so we automatically setup a new connection and switch to that.
Then we pass the data event from our inner stream to our intermidiary read stream which will pass it to our outer stream
Then we pass the date event from our intermediary write stream into our inner stram, except if the stream is dead, then we store it in the cache instead.
Lastly we resume our outer stream allowing data to pass between the devices.
Currently our outer and inner connection only carries the data the applications are sharing, but we also want to be able to control aspects of the connection and have the client be able to communicate about framework tasks, so we want to multiplex our connection, and split that so all data send be the application goes to their respective stream, but also have a seperate system for sending and receiving connection managment data.
import { Duplex, PassThrough, Transform } from 'stream';
import Duplexify from 'duplexify';
type Listener = (type: number, content: Buffer) => void;
class ManagmentConnection {
private _stream: Duplex;
private _readStream: PassThrough;
private _writeStream: PassThrough;
private _duplexStream: Duplex;
private _listeners: Listener[] = [];
constructor(stream: Duplex) {
this._stream= stream;
const outTransform = new Transform();
outTransform._transform = (chunk, encoding, cb) => {
outTransform.push(Buffer.concat([
new Uint8Array([0]),
chunk,
]), encoding);
cb();
};
const inTransform = new Transform();
inTransform._transform = (chunk: Buffer, encoding, cb) => {
const type = chunk.readInt8(0);
const content = chunk.slice(1);
if (type > 0) {
this._listeners.forEach(l => l(type, content));
} else {
inTransform.push(content, encoding);
}
cb();
};
this._readStream = inTransform;
this._stream.pipe(this._readStream);
this._writeStream = outTransform;
outTransform.pipe(this._stream);
this._duplexStream = new Duplexify(this._writeStream, this._readStream);
}
public get stream() {
return this._duplexStream;
}
public write = (buffer: Buffer) => {
this._stream.write(Buffer.concat([
new Uint8Array([1]),
buffer,
]));
};
public unlisten = (listener: Listener) => {
this._listeners = this._listeners.filter(l => l !== listener);
};
public listen = (listener: Listener) => {
this._listeners.push(listener);
};
}
export default ManagmentConnection;
I will not dive to deep into this one. We again use the technique of using seperate read/write streams and combining them into one using duplexify
.
Additionally we make a transform on both, where when we pipe data from the _writeStream
into our inner stream we set and Int8 with the value of zero.
We also have a write method for sending data to the managment connection. This will prefix the data with an Int8 with the value of 1
When we read data from the inner stream we remove the first byte and look at it, if it is a 0 we pipe it to our outer stream, or if it is higher we dispatch the int and the remaining data to any listeners on the managment connection