Last active
October 11, 2019 03:07
-
-
Save ramya-rao-a/d8c4caab4218caedce22b5bd273fbaf9 to your computer and use it in GitHub Desktop.
Event Hubs library proposal with single client
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
This proposal builds on top of what was shipped in Preview 4 of Event Hubs library | |
and attempts to discard a separate notion of EPH by rolling that feature into the | |
existing concept of a "consumer". | |
This is mainly geared towards JS and can be tweaked for Python | |
This needs serious naming re-considerations for the methods on the consumer :) | |
*/ | |
const client = new EventHubClient(connectionString, eventHubName); | |
// Create producer to send without partitionId | |
const producer = client.createProducer(); | |
// Create producer to send specific partition | |
const producer = client.createProducer(paritionId); | |
// Create consumer to receive from all partitions | |
const consumer = client.createConsumer(consumerGroupName); | |
// Create consumer to receive from all partitions with load balancing | |
const consumer = client.createConsumer(consumerGroupName, partitionManager) | |
// Create consumer to receive from specific partition | |
const consumer = client.createConsumer(consumerGroupName, partitionId, eventPosition) | |
// Receive with push model with message & error handler | |
consumer.subscribe(onMessage, onError) | |
await consumer.close(); | |
// Callbacks for setup/teardown work for each partition | |
consumer.onInitializePartition = (partitionContext) => {// user code} | |
consumer.onClosePartition = (partitionContext, closeReason) => {// user code} | |
// Track last enqueued event info | |
const info = consumer.lastEnqueuedEventInfo[partitionId]; | |
console.log(info.sequenceNumber, info.offset, info.enqueuedTime, info.retrievalTime); | |
// checkpointing. Will throw error when no partition manager is passed | |
// this might leadus into the discussion if the load balancing & checkpointing features should be split | |
// so that users dealing with single partition or those not planning to have multiple instances of their app | |
// have the feature of checkpointing | |
await consumer.updateCheckpoint(event) | |
await consumer.updateCheckpoint(offset, partitionId) | |
// Custom Checkpointing requires people to provide customized partition manager & call updateCheckpoint on it | |
await customizedPartitionManger.updateCheckpoint(what-ever-this-needs-to-do-checkpointing) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment