Last active
September 24, 2019 18:27
-
-
Save ramya-rao-a/66e6d2eb99054a60b8efad7001e7bc6e to your computer and use it in GitHub Desktop.
Proposal for Event Processor in JS
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// User class extending the base class provided by the library | |
class SamplePartitionProcessor extends PartitionProcessor { | |
async processEvents(events) { console.log(events);} | |
async processError(error) { console.log(error); } | |
async initialize() { console.log(`Started processing partition: ${this.partitionId}`); } | |
async close(reason) { console.log(`Stopped processing partition: ${this.partitionId} for reason ${reason}`); } | |
} | |
// 3 constructor overloads similar to EventHubClient, extra arguments are consumerGroupName, and PartitionProcessor | |
// Options are not shown, but are similar to what EventHubClientOptions support + max batch size, max wait time per batch | |
myProcessor = new EventProcessor(connectionstring, consumerGroupName, SamplePartitionProcessor); | |
myProcessor = new EventProcessor(connectionstring, eventhubName, consumerGroupName, SamplePartitionProcessor); | |
myProcessor = new EventProcessor(fullyQualifiedNamespace, eventhubName, credentials, consumerGroupName, SamplePartitionProcessor); | |
// And/Or use the EventHubClient as we have proof that adding more connections doesnt help JS much | |
myProcessor = new EventProcessor(eventHubclient, consumerGroupName, SamplePartitionProcessor) | |
// Start without partition manager, user attempt to checkpoint will fail | |
myProcessor.start(); | |
// Start without partition manager, but with top level error handler, user attempt to checkpoint will fail | |
myProcessor.start(errorHandler) | |
// Start by passing partition manager that uses blobs for checkpointing & load balancing | |
containerClient = new ContainerClient("storage-connection-string", "container-name"); | |
myPartitionManager = new BlobPartitionManager(containerClient); | |
myProcessor.start(myPartitionManager); | |
// Start by passing error handler & partition manager that uses blobs for checkpointing & load balancing | |
myProcessor.start(myPartitionManager, errorHandler); | |
// Stop the event processor | |
myProcessor.stop(); |
Yet another alternative based on below motivations from Service team
- Service team wants to encourage sending without setting partitionId
- Service team wants to discourage the use of low level consumer apis that target partitionId directly
Key changes:
createConsumer
andcreateProducer
are nowcreatePartitionConsumer
andcreatePartitionProducer
- There is a
send
andreceive
available directly on the client that dont require user to provide partitionId
// 3 constructor overloads for EventHubClient, options not shown
myClient = new EventHubClient(connectionstring);
myClient = new EventHubClient(connectionstring, eventhubName);
myClient = new EventHubClient(fullyQualifiedNamespace, eventhubName, credentials);
// Send without setting partitionId
await myClient.send(myBatch)
// Send by setting partitionId
myPartitionProducer = myClient.createPartitionProducer(partitionId);
await myPartitionProducer.send(myBatch)
// Receive without caring about partitionId.
// This spins up the EPH and starts receiving without checkpointing or load balancing support
myReceiver = myClient.receive(consumerGroupName, SamplePartitionProcessor);
await myReceiver.stop();
// Receive without caring about partitionId.
// This spins up the EPH and starts receiving with checkpointing or load balancing support
containerClient = new ContainerClient("storage-connection-string", "container-name");
myPartitionManager = new BlobPartitionManager(containerClient);
myReceiver = myClient.receive(consumerGroupName, SamplePartitionProcessor, myPartitionManager);
await myReceiver.stop();
// Receive from a particular partitionId.
myPartitionConsumer = myClient.createPartitionConsumer(consumerGroupName, partitionId, position, options);
myEvents = await myPartitionConsumer.receiveBatch(10);
Open Questions:
createBatch()
will now have to live on the client as well- what should the return type of
receive
look like? - what about the top level error handler?
Here are my quick thoughts:
- I like the send method that does not require users to deal/understand partition IDs.
- I don't like making the names to get producer/consumer longer and more complicated. I think we should keep them as GetProducer/Consumer, even if they take partitionId parameter.
- I don't like that method receive returns an object that needs to be started, and that it takes SamplePartitionProcessor. Receive is a verb and so it should actually do something. Also, it breaks the symmetry between reading and writing.
- I don't like that consumer.receive takes multiple callbacks. If we use callbacks, it would be better to have them be properties.
I don't like that method receive returns an object that needs to be started, ... . Receive is a verb and so it should actually do something.
The proposed API does not require user to "start" anything. The very act of calling receive
on the client starts the receiving of events
I don't like that consumer.receive takes multiple callbacks. If we use callbacks, it would be better to have them be properties.
That is a JS specific API that is under separate review. I have updated the above proposal to replace it with a receiveBatch()
call that returns a promise that resolves to an array of events
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Alternative where PartitionProcessor goes to start():