Last active
October 10, 2019 01:25
-
-
Save ramya-rao-a/a54d8afb5b5cf292a250e58f42789a44 to your computer and use it in GitHub Desktop.
Event Hubs library proposal where sender and receiver have their own clients
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
Key changes:
- A dedicated "client" for EPH, making this an EventHubConsumerClient that customers would first gravitate towards
- start() & stop() would be top-level methods
- In .NET, there would be 4 settable callbacks. These would be in the builder for Java.
- For Python & JS, these would be passable to the start() method
- State management is done via the PartitionContext that gets passed to the callbacks.
- .NET & Java will have to allow users to extend the base class to store the state
- This results in a dedicated client for send for symmetry, and so EventHubProducerClient
- send() & createBatch() would be top-level methods, no need to create producers and maintain them
- need to check with service if there is ever a need for users to maintain each producer link
- Low-level consumer is not off of any client. This is a stand-alone class.
- In Python, .NET & Java, this would be under sub namespaces/modules
- All 3 entry points above would have constructor overloads such that they support
- connection string
- connection string, event hub name
- fully qualified namespace, event hub name, credentials
- Advanced: New object "EventHubConnection" when wanting to have clients or low-level consumers share a connection
*/
// ================================= Send start ===========================================
// Send events
/**
 * Creates a producer client, builds a batch with default options and sends it.
 *
 * Declared `async` because the body awaits `createBatch()` and `sendBatch()`.
 * `connectionString` is not declared here; it is expected to be supplied by
 * the enclosing scope.
 *
 * @returns {Promise<void>} settles once `sendBatch()` has settled.
 */
async function sendToAny() {
  const producerClient = new EventHubProducerClient(connectionString);
  const myBatch = await producerClient.createBatch();
  // add code to populate the batch
  await producerClient.sendBatch(myBatch);
}
// Send events using partitionKey
/**
 * Creates a batch configured with a partition key and a maximum size, then
 * sends it.
 *
 * Declared `async` because the body awaits `createBatch()` and `sendBatch()`.
 * `connectionString` and `sendOptions` are not declared here; they are
 * expected to be supplied by the enclosing scope.
 * NOTE(review): no other sample in this file defines `sendOptions` — confirm
 * whether it is still needed now that the partition key lives on the batch.
 *
 * @returns {Promise<void>} settles once `sendBatch()` has settled.
 */
async function sendWithPartitionKey() {
  const producerClient = new EventHubProducerClient(connectionString);
  const batchOptions = {
    // If partitionId is also set, then throw error when creating batch
    partitionKey: "Apples!",
    maxSizeInBytes: 1000
  };
  const myBatch = await producerClient.createBatch(batchOptions);
  // add code to populate the batch
  await producerClient.sendBatch(myBatch, sendOptions);
}
// Send events targeting a specific partition
/**
 * Creates a batch targeted at partition "2" and sends it.
 *
 * Declared `async` because the body awaits `createBatch()` and `sendBatch()`.
 * `connectionString` and `sendOptions` are not declared here; they are
 * expected to be supplied by the enclosing scope.
 *
 * @returns {Promise<void>} settles once `sendBatch()` has settled.
 */
async function sendToPartition() {
  const producerClient = new EventHubProducerClient(connectionString);
  const batchOptions = {
    // If partitionKey is also set, then throw error when creating batch
    partitionId: "2"
  };
  // creates a new amqp link if this is the first time we see this partitionId, else re-uses link
  const myBatch = await producerClient.createBatch(batchOptions);
  await producerClient.sendBatch(myBatch, sendOptions);
}
// Send events to the same partition without re-using the AMQP link, while still sharing the same connection.
// This mimics Track 1's creation of multiple producers from the same client.
// Need to check with the service whether this is a case we should care about.
// Regardless, this is an advanced case; it can be an add-on and need not be in GA.
/**
 * Builds one `EventHubConnection` and constructs two producer clients from it,
 * so both clients are created over the same connection object.
 *
 * The clients are only constructed here; nothing is sent.
 * `connectionString` is expected to be supplied by the enclosing scope.
 */
function sendWithDifferentLinksameConnection() {
  const connection = new EventHubConnection(connectionString);
  const producerClient1 = new EventHubProducerClient(connection);
  const producerClient2 = new EventHubProducerClient(connection);
}
// ================================= Receive start ===========================================
// Receive events
/**
 * Creates a consumer client and starts it with a single `processEvents`
 * callback.
 *
 * The callback logs how many events arrived and from which partition id, then
 * stores a value on `partitionContext.state`.
 * `connectionString` and `consumerGroupName` are expected to be supplied by
 * the enclosing scope.
 */
function receiveAll() {
  const consumerClient = new EventHubConsumerClient(connectionString, consumerGroupName);

  const processEvents = (events, partitionContext) => {
    console.log(`Received ${events.length} events from partition with id ${partitionContext.partitionId}`);
    console.log(`I can set state on it as well!!`);
    partitionContext.state = "hello!";
  };

  consumerClient.start(processEvents);
}
// Receive events with error, partition-initialize and partition-close callbacks
/**
 * Shows the two proposed call shapes for `start()`: positional callbacks, or a
 * single object carrying all four callbacks.
 *
 * `connectionString`, `consumerGroupName`, `processEvents`, `onError`,
 * `onPartitionInitialize` and `onPartitionClose` are not declared here; they
 * are expected to be supplied by the enclosing scope.
 * NOTE(review): the two `start()` calls are presented as alternatives ("Or");
 * as written both execute on the same client — confirm that an application
 * would use only one of them.
 */
function receiveAllWithAllCallbacks() {
  const consumerClient = new EventHubConsumerClient(connectionString, consumerGroupName);

  // Form 1: positional callbacks
  consumerClient.start(processEvents, onError);

  // Or
  // Form 2: every callback passed through one object
  consumerClient.start({ processEvents, onError, onPartitionInitialize, onPartitionClose });
}
// Receive events with checkpointing
/**
 * Creates a consumer client with a checkpoint manager and starts it with a
 * callback that demonstrates the proposed ways to checkpoint.
 *
 * The callback checkpoints at the last event of the received batch and does
 * nothing for an empty batch. `connectionString`, `consumerGroupName` and
 * `checkpointManager` are expected to be supplied by the enclosing scope.
 */
function receiveAllWithCheckpoints() {
  const consumerClient = new EventHubConsumerClient(connectionString, consumerGroupName, checkpointManager);

  const processEvents = (events, partitionContext) => {
    // An empty batch has no event to checkpoint.
    if (events.length === 0) {
      return;
    }

    // The callback receives `events` (plural); checkpoint at the last one.
    const event = events[events.length - 1];

    // Many ways to checkpoint
    partitionContext.updateCheckpoint(event); // The given event is checkpointed
    partitionContext.updateCheckpoint(event.offset); // The given offset is checkpointed

    // Custom checkpointing!! The original argument was a prose placeholder, not
    // an expression, so the call is kept as an illustration only:
    // checkpointManager.updateCheckpoint(/* all the info that the checkpoint manager needs to update checkpoint */);
  };

  consumerClient.start(processEvents);
}
// ================================= Low level consumer ===========================================
// Receive from a given partition
/**
 * Creates a stand-alone partition consumer and awaits one `receive(10)` call.
 *
 * Declared `async` because the body awaits `consumer.receive()`.
 * `connectionString`, `consumerGroupName`, `partitionId` and `eventPosition`
 * are expected to be supplied by the enclosing scope.
 *
 * @returns {Promise<void>} settles once `receive()` has settled.
 */
async function receive() {
  const consumer = new EventHubPartitionConsumer(connectionString, consumerGroupName, partitionId, eventPosition);
  const events = await consumer.receive(10);
}
// Share one connection among consumers of different partitions.
// This mimics Track 1's creation of multiple consumers from the same client.
// Regardless, this is an advanced case; it can be an add-on and need not be in GA.
/**
 * Builds one `EventHubConnection` and constructs two partition consumers from
 * it, one for partition "1" and one for partition "2".
 *
 * Renamed from `receive`: the file already declares a `receive` function for
 * the single-partition sample, and a second declaration with the same name
 * would replace it (or be rejected outright in module code).
 * The consumers are only constructed here; nothing is received.
 * `connectionString`, `consumerGroupName` and `eventPosition` are expected to
 * be supplied by the enclosing scope.
 */
function receiveWithSharedConnection() {
  const connection = new EventHubConnection(connectionString);
  const consumer1 = new EventHubPartitionConsumer(connection, consumerGroupName, "1", eventPosition);
  const consumer2 = new EventHubPartitionConsumer(connection, consumerGroupName, "2", eventPosition);
}
.NET
/// <summary>
/// Usage samples for the proposed Event Hubs API in which sending
/// (<c>EventHubProducerClient</c>), receiving (<c>EventHubConsumerClient</c>) and
/// low-level partition receiving (<c>EventHubPartitionConsumer</c>) each have
/// their own entry point.
/// </summary>
/// <remarks>
/// <c>connectionString</c>, <c>eventBatch</c>, <c>consumerGroup</c>,
/// <c>partitionId1</c>, <c>partitionId2</c>, <c>eventPosition</c> and
/// <c>cancellationSource</c> are not declared in this class; they are
/// placeholders expected to be supplied by the host of the sample. The code
/// also assumes <c>System</c> and <c>System.Threading.Tasks</c> are in scope.
/// Methods that await are declared <c>async Task</c> so that callers can
/// observe completion and failures.
/// </remarks>
class Samples
{
    //---------------- Send operations -------------------

    /// <summary>Sends events without specifying a partition.</summary>
    /// <returns>A task that completes when <c>SendAsync</c> completes.</returns>
    public async Task Send()
    {
        var eventHubProducerClient = new EventHubProducerClient(connectionString);
        await eventHubProducerClient.SendAsync(eventBatch);
    }

    /// <summary>Sends events using a partition key - not a specific partition id.</summary>
    /// <returns>A task that completes when <c>SendAsync</c> completes.</returns>
    public async Task SendWithPartitionKey()
    {
        var sendOptions = new SendOptions
        {
            PartitionKey = "Key"
        };
        var eventHubProducerClient = new EventHubProducerClient(connectionString);
        await eventHubProducerClient.SendAsync(eventBatch, sendOptions);
    }

    /// <summary>Sends events to a specific partition id.</summary>
    /// <returns>A task that completes when <c>SendAsync</c> completes.</returns>
    public async Task SendToSpecificPartition()
    {
        var sendOptions = new SendOptions
        {
            PartitionId = "1"
        };
        var eventHubProducerClient = new EventHubProducerClient(connectionString);
        await eventHubProducerClient.SendAsync(eventBatch, sendOptions);
    }

    /// <summary>
    /// Sends events to the same partition from two producer clients that are
    /// built on one shared <c>EventHubConnection</c>.
    /// </summary>
    /// <remarks>
    /// Send events to same partition, but don't re-use AMQP link, but use the same connection.
    /// This mimics Track 1's creation of multiple producers from the same client.
    /// Need to check with service if this is a case we should care about.
    /// Regardless, this is an advanced case, can be an add on, need not be in GA.
    /// </remarks>
    /// <returns>A task that completes after both sends have completed, in order.</returns>
    public async Task SendToPartitionsWithSeparateLinks()
    {
        var sendOptions = new SendOptions
        {
            PartitionId = "1"
        };

        // create a connection that will be shared by producers
        var connection = new EventHubConnection(connectionString);
        var eventHubProducerClient1 = new EventHubProducerClient(connection);
        var eventHubProducerClient2 = new EventHubProducerClient(connection);

        await eventHubProducerClient1.SendAsync(eventBatch, sendOptions);
        await eventHubProducerClient2.SendAsync(eventBatch, sendOptions);
    }

    //---------------- Receive operations -------------------

    /// <summary>
    /// Receives from all partitions without load balancing or checkpoint storage.
    /// </summary>
    /// <remarks>
    /// The handler checkpoints the sequence number of the last event in each
    /// batch and skips checkpointing when the batch is empty. The client is
    /// started and then stopped immediately, as in the original sketch.
    /// </remarks>
    public void ReceiveAll()
    {
        var eventHubConsumerClient = new EventHubConsumerClient(connectionString);
        eventHubConsumerClient.Received = async (partitionContext, events) =>
        {
            // Same for every event in the batch, so read it once.
            var partitionId = partitionContext.PartitionId;
            EventData lastEvent = null;

            foreach (EventData e in events)
            {
                // process event
                lastEvent = e;
            }

            // An empty batch leaves nothing to checkpoint.
            if (lastEvent != null)
            {
                partitionContext.Checkpoint(lastEvent.SequenceNumber.Value);
            }
        };

        eventHubConsumerClient.Start();
        eventHubConsumerClient.Stop();
    }

    /// <summary>
    /// Receives from all partitions with load balancing and checkpointing.
    /// </summary>
    /// <param name="partitionManager">
    /// Passed to the <c>EventHubConsumerClient</c> constructor. Named in
    /// camelCase so that it matches the identifier the body uses.
    /// </param>
    public void ReceiveAll(PartitionManager partitionManager)
    {
        var eventHubConsumerClient = new EventHubConsumerClient(connectionString, partitionManager);
        eventHubConsumerClient.Received = async (partitionContext, events) =>
        {
            // Same for every event in the batch, so read it once.
            var partitionId = partitionContext.PartitionId;
            EventData lastEvent = null;

            foreach (EventData e in events)
            {
                // process event
                lastEvent = e;
            }

            // An empty batch leaves nothing to checkpoint.
            if (lastEvent != null)
            {
                partitionContext.Checkpoint(lastEvent.SequenceNumber.Value);
            }
        };

        eventHubConsumerClient.Start();
        eventHubConsumerClient.Stop();
    }

    /// <summary>
    /// Receives from all partitions with load balancing, checkpointing and
    /// partition state management through <see cref="MyPartitionContext"/>.
    /// </summary>
    /// <remarks>
    /// Originally declared as a second <c>ReceiveAll(PartitionManager)</c>,
    /// which duplicates the signature of the overload above and cannot
    /// compile; it therefore carries a distinct name here.
    /// </remarks>
    /// <param name="partitionManager">
    /// Passed to the <c>EventHubConsumerClient</c> constructor.
    /// </param>
    public void ReceiveAllWithPartitionState(PartitionManager partitionManager)
    {
        var eventHubConsumerClient = new EventHubConsumerClient<MyPartitionContext>(connectionString, partitionManager);
        eventHubConsumerClient.Received = async (myPartitionContext, events) =>
        {
            // NOTE(review): the other handlers read PartitionId from the context;
            // the original used `Id` here — confirm which member PartitionContext exposes.
            var partitionId = myPartitionContext.PartitionId;
            EventData lastEvent = null;

            foreach (EventData e in events)
            {
                // process event
                lastEvent = e;
            }

            // An empty batch leaves nothing to checkpoint and no new state to record.
            if (lastEvent != null)
            {
                var checkpoint = lastEvent.SequenceNumber.Value;
                myPartitionContext.Checkpoint(checkpoint);
                myPartitionContext.UpdateState($"newState{checkpoint}");
            }
        };

        eventHubConsumerClient.Start();
        eventHubConsumerClient.Stop();
    }

    /// <summary>
    /// Class for managing user state per partition that extends PartitionContext
    /// which has partition information and checkpoint method.
    /// </summary>
    class MyPartitionContext : PartitionContext
    {
        // maintain state per partition here
        private string _myState;

        /// <summary>Replaces the stored state.</summary>
        /// <param name="newState">The value to store.</param>
        /// <returns>The state now stored, i.e. <paramref name="newState"/>.</returns>
        public string UpdateState(string newState)
        {
            _myState = newState;
            return _myState;
        }
    }

    /// <summary>Receives from a specific partition using the low level API.</summary>
    /// <returns>
    /// A task that completes when enumeration of <c>SubscribeToEvents</c> ends.
    /// </returns>
    public async Task ReceivePartition()
    {
        var eventHubPartitionConsumer = new EventHubPartitionConsumer(connectionString, consumerGroup, partitionId1, eventPosition);

        await foreach (EventData currentEvent in eventHubPartitionConsumer.SubscribeToEvents(cancellationSource.Token))
        {
            // process event
            Console.Write($"Processed event with sequence number {currentEvent.SequenceNumber.Value}");
        }
    }

    /// <summary>
    /// Receives from specific partitions using the same connection and different links.
    /// </summary>
    /// <remarks>
    /// The two subscriptions are enumerated concurrently. Enumerating them one
    /// after the other would keep the second consumer idle until the first
    /// stream ended.
    /// </remarks>
    /// <returns>A task that completes when both enumerations have ended.</returns>
    public async Task ReceivePartitionSeparateLinks()
    {
        var connection = new EventHubConnection(connectionString);
        var eventHubPartitionConsumer1 = new EventHubPartitionConsumer(connection, consumerGroup, partitionId1, eventPosition);
        var eventHubPartitionConsumer2 = new EventHubPartitionConsumer(connection, consumerGroup, partitionId2, eventPosition);

        async Task ReceiveFromPartition1Async()
        {
            await foreach (EventData currentEvent in eventHubPartitionConsumer1.SubscribeToEvents(cancellationSource.Token))
            {
                // process events from partitionId1
            }
        }

        async Task ReceiveFromPartition2Async()
        {
            await foreach (EventData currentEvent in eventHubPartitionConsumer2.SubscribeToEvents(cancellationSource.Token))
            {
                // process events from partitionId2
            }
        }

        await Task.WhenAll(ReceiveFromPartition1Async(), ReceiveFromPartition2Async());
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Equivalent design for Java