Created
April 28, 2018 17:31
-
-
Save wldcordeiro/db399052e7ee62a331c791b81325abb6 to your computer and use it in GitHub Desktop.
TypeError?
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { from } from 'rxjs' | |
// This is the observable that is going to be emitted once the connection/subscription has been made. This will need to be subscribed to in order to receive actual messages. | |
const listener = client => | |
// This used to be Observable.create(observer => ...) | |
from(observer => { | |
client.on('message', (topic, message) => | |
observer.next(JSON.parse(message.toString())) | |
) | |
return () => { | |
// Remove event listeners | |
client.removeAllListeners('message') | |
} | |
}) | |
// This return an observable that will attempt to connect to a MQTT client. However, this observable is simply dealing with connection, not with messages. The only emitted item from this observable is another observable that could be listened to for messages | |
export default (topic, endpoint, { region, credentials }) => { | |
// This used to be Observable.create(observer => ...) | |
return from(observer => { | |
let client = null | |
Promise.all([import('aws-mqtt'), import('aws-sdk/global')]).then( | |
([AWSMqtt, { config }]) => { | |
config.update({ | |
region, | |
credentials, | |
}) | |
client = AWSMqtt.connect({ | |
WebSocket: window.WebSocket, | |
region, | |
endpoint, | |
credentials, | |
connectTimeout: 30 * 1000, | |
}) | |
client.on('connect', () => { | |
client.subscribe(`${topic}`, { qos: 1 }, err => { | |
if (err) { | |
observer.error(err) | |
} else { | |
// Send back another observable to be subscribed to | |
observer.next(listener(client)) | |
} | |
}) | |
}) | |
client.on('error', err => observer.error(err)) | |
} | |
) | |
return () => { | |
client && client.end() | |
observer.complete() | |
} | |
}) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment