Last active
September 18, 2019 18:57
-
-
Save ramya-rao-a/f1d95a86b2c70598575b9eab77231c88 to your computer and use it in GitHub Desktop.
EP .Net API
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
There are 3 players in this game. | |
- EventProcessor | |
- Provided by the sdk. | |
- Ideal Usage: Multiple instances on separate machines to balance partition load | |
- PartitionProcessor | |
- Abstract class provided by the sdk, meant to be extended by user where they provide code to process events | |
- Ideal Usage: | |
- Extended by user & passed to EventProcessor constructor via a factory in .Net & Java, | |
via the type/class name in JS & Python | |
- The only method they are absolutely required to implement is ProcessEvents | |
- Methods available for user to provide any set-up/tear-down for processing the particular partition. | |
- Method available for checkpointing. This is implemented by sdk and not overridable by user. | |
- Properties available for info on current partition, consumer group, event hub | |
- PartitionManager | |
- Abstract class provided by the sdk, meant to write/read checkpoints & other relevant info to aid load balancing | |
to a durable store | |
- Vendors to ship implementations of this using different storage solutions. | |
- User to choose one such package based on their storage solution of choice | |
- User to instantiate and pass to the EventProcessor constructor | |
*/ | |
namespace Azure.Messaging.EventHubs.Processor { | |
// ========================================================== | |
// PartitionProcessor is an abstract class definied by the sdk and to be extended by the user | |
// The methods here are called by the EventProcessor at various stages of processing a single partition. | |
// The properties here are all set internally by the EventProcessor. | |
// All properties except LastEnqueuedEventInformation are set only once. | |
// ========================================================== | |
public abstract class PartitionProcessor { | |
// Based on individual language patterns the below are "set" by the Event Processor | |
// or passed to the constructor of PartitionProcessor | |
protected int PartitionId { get; } | |
protected string ConsumerGroup { get; } | |
protected string EventHubName { get; } | |
protected string FullyQualifiedNamespace { get; } | |
// Set by user in the constructor or in the Initialize() method based on whether the parititon info | |
// was passed to the constructor or set directly by the Event Processor | |
// Read by EventProcessor if no checkpoint was found for this partition | |
public EventPosition InitialEventPosition { get; } | |
// Gets called by EventProcessor for every batch of events received. | |
// Max batch size is controlled via options passed to EventProcessor constructor. | |
// Max wait time for each batch is also controlled the same way. | |
abstract Task ProcessEventsAsync(IEnumerable<EventData> events); | |
// Gets called by EventProcessor before starting the receiving of events from the partition | |
// For example, set the InitialEventPosition to use when no checkpoints are present | |
abstract Task InitializeAsync(); | |
// Gets called by EventProcessor after it stops receiving events from the partiition | |
abstract Task CloseAsync(CloseReason reason); | |
// Gets called by EventProcessor for any error that occurs when running user code or | |
// any non-retryable error that occurs either when receiving events | |
abstract Task ProcessErrorAsync(Exception exception); | |
// Methods implemented by the sdk used to update checkpoints, not to be over-ridden by users. | |
Task UpdateCheckpointAsync(EventData event); | |
} | |
// Reasons for stopping the processing of the partition | |
public enum CloseReason { | |
PartitionOwnershipLost = 1, | |
Shutdown = 0, | |
} | |
// ========================================================== | |
// Event Processor defined by sdk, instantiated & run by user | |
// ========================================================== | |
public class EventProcessor { | |
public EventProcessor( | |
string fullyQualifiedNamespace, | |
string eventHubName, | |
string consumerGroup, | |
TokenCredential credential, | |
Func<PartitionProcessor> partitionProcessorFactory, // Python & JS can pass just the class type | |
PartitionManager partitionManager, | |
EventProcessorOptions options = null | |
); | |
public string Identifier { get; } | |
public Task StartAsync(); | |
public Task StopAsync(); | |
} | |
public class EventProcessorOptions { | |
public EventProcessorOptions(); | |
public int MaximumMessageCount { get; set; } | |
public Nullable<TimeSpan> MaximumReceiveWaitTime { get; set; } | |
public EventHubClientOptions { get; set; } | |
} | |
// ========================================================== | |
// PartitionManager is an abstract class defined by the sdk and to be implemented by the vendor | |
// Chosen by the user based on their storage solution of choice | |
// Instantiated by user and passed to the EventProcessor constructor | |
// ========================================================== | |
public abstract class PartitionManager { | |
// Clears checkpoint for given partition | |
public abstract Task DeleteCheckpointAsync( | |
string fullyQualifiedNamespace, | |
string eventHubName, | |
string consumerGroup, | |
int partitionId | |
); | |
// Updates the checkpoint for given partition. All relevant info is in the Checkpoint class. | |
// | |
// This method is invoked by the EventProcessor when the user expresses the desire to checkpoint | |
// an event using the `PartitionContext` instance passed to the user code. | |
public abstract Task UpdateCheckpointAsync(Checkpoint checkpoint); | |
// Lists info on which instance of an EventProcessor is currently processing which partitions | |
// along with the last checkpoint for each partition. | |
// | |
// This method is invoked by the EventProcessor when trying to determine if it needs to pick up | |
// another partition for processing in order to balance the load among multiple instances of EventProcessor | |
public abstract Task<IEnumerable<PartitionOwnership>> ListOwnershipAsync( | |
string fullyQualifiedNamespace, | |
string eventHubName, | |
string consumerGroup | |
); | |
// Updates the ownership for partitions by assigning partitions to different instances of EventProcessors | |
// | |
// This method is invoked by the EventProcessor after it runs its load balancing algorithm and determins that | |
// it needs to pick up one or more extra partitions for processing | |
public abstract Task<IEnumerable<PartitionOwnership>> ClaimOwnershipAsync(IEnumerable<PartitionOwnership> partitionOwnership); | |
} | |
public class Checkpoint { | |
public string FullyQualifiedNamespace { get; } | |
public string EventHubName { get; } | |
public string ConsumerGroup { get; } | |
public int PartitionId { get; } | |
public long Offset { get; } | |
public long SequenceNumber { get; } | |
public string OwnerIdentifier { get; } | |
} | |
public class PartitionOwnership { | |
public string FullyQualifiedNamespace { get; } | |
public string EventHubName { get; } | |
public string ConsumerGroup { get; } | |
public int PartitionId { get; } | |
public string ETag { get; set; } | |
public Nullable<DateTimeOffset> LastModifiedTime { get; } | |
public Nullable<long> Offset { get; } | |
public string OwnerIdentifier { get; } | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment