Created June 9, 2023 21:43
Example of how to use a message passing architecture with Ray Serve.
import asyncio
from typing import Optional

from ray import serve


@serve.deployment
class MessageConsumer:
    def __init__(self, topic: str, downstream_topic: Optional[str] = None):
        self.downstream_topic = downstream_topic
        # Keep a reference to the task so it is not garbage collected.
        self._poll_task = asyncio.get_running_loop().create_task(
            self.poll_for_messages(topic)
        )

    def check_health(self):
        pass  # TODO: check the health of the task pulling from queue.

    async def poll_for_messages(self, topic: str):
        # MessageQueueClient is a placeholder for your message queue's client library.
        client = MessageQueueClient(topic)
        while True:
            message = await client.get_message()
            result = await self.process_message(message)
            # Can post the result to a downstream topic to chain logic together.
            if self.downstream_topic is not None:
                await client.post_message(self.downstream_topic, result)

    async def process_message(self, message):
        pass  # TODO: fill in code to do preprocessing/inference/etc.


def build_app(args):
    return MessageConsumer.options(
        num_replicas=args.get("num_replicas", 1)
    ).bind(args["topic"], args.get("downstream_topic"))
applications:
  - name: message-consumer-topic-topic1
    import_path: message_consumer:build_app
    args:
      topic: topic1
  - name: message-consumer-topic-topic2
    import_path: message_consumer:build_app
    args:
      topic: topic2
      num_replicas: 2
The example above shows how you can define a set of Ray Serve deployments to process messages from a message queue (minus the relevant business logic).
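One piece the example leaves as a TODO is `check_health`. Ray Serve treats a replica as unhealthy when its `check_health` method raises, so a rough way to fill it in is to keep a handle on the polling task and raise once that task has stopped. The sketch below uses plain `asyncio` so it runs without Ray; the class name `PollingTaskHealth` is made up for illustration and is not part of the original example.

```python
import asyncio


class PollingTaskHealth:
    """Hypothetical helper: owns a background task and reports if it died."""

    def __init__(self, coro):
        # Must be constructed inside a running event loop.
        # Holding the reference also keeps the task from being garbage collected.
        self._task = asyncio.get_running_loop().create_task(coro)

    def check_health(self):
        # A polling loop is expected to run forever, so "done" means "broken".
        if not self._task.done():
            return
        if self._task.cancelled():
            raise RuntimeError("Polling task was cancelled.")
        # exception() returns None if the task returned normally.
        error = self._task.exception()
        raise RuntimeError(f"Polling task stopped unexpectedly: {error!r}")


async def _demo():
    async def failing_poll():
        raise ConnectionError("queue unreachable")

    health = PollingTaskHealth(failing_poll())
    await asyncio.sleep(0)  # give the task a chance to run and fail
    try:
        health.check_health()
    except RuntimeError as exc:
        print(f"unhealthy: {exc}")


if __name__ == "__main__":
    asyncio.run(_demo())
```

In the deployment, the same two steps would live in `__init__` (store the task) and `check_health` (inspect it), so Serve can restart a replica whose polling loop has crashed.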
The config file defines two applications, each pulling from a different topic. Each can be scaled, updated, etc. independently, like any normal Serve application.
An alternative to pulling directly from the message queue is to have a separate hook that pulls from the queue and sends the requests to Ray Serve over HTTP. That avoids writing custom logic to pull from the queue (and the associated health checking and performance considerations), at the cost of running another component.
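As a minimal sketch of that alternative, the hook can be a small loop that pulls a message and POSTs it to the Serve HTTP endpoint. Everything named here is an assumption for illustration: `forward_messages` and `post_json` are hypothetical helpers, the queue is abstracted as any awaitable `get_message` callable, and the URL in the usage note assumes Serve's default HTTP port.

```python
import asyncio
import json
import urllib.request
from typing import Any, Awaitable, Callable, Optional


def post_json(url: str, payload: Any, timeout: float = 10.0) -> bytes:
    """Blocking HTTP POST of a JSON payload; returns the response body."""
    request = urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.read()


async def forward_messages(
    get_message: Callable[[], Awaitable[Any]],
    send: Callable[[Any], Any],
    max_messages: Optional[int] = None,
) -> int:
    """Pull messages via `get_message` and pass each one to `send`.

    `send` is treated as blocking, so it runs in a worker thread
    (asyncio.to_thread requires Python 3.9+).
    Runs forever unless `max_messages` is set; returns the count forwarded.
    """
    forwarded = 0
    while max_messages is None or forwarded < max_messages:
        message = await get_message()
        await asyncio.to_thread(send, message)
        forwarded += 1
    return forwarded


async def _demo():
    # An in-memory queue and a list stand in for the real queue and HTTP call.
    queue = asyncio.Queue()
    for i in range(3):
        queue.put_nowait({"id": i})
    sent = []
    count = await forward_messages(queue.get, sent.append, max_messages=3)
    print(count, sent)


if __name__ == "__main__":
    asyncio.run(_demo())
```

To point this at a running Serve application, `send` could be something like `lambda m: post_json("http://localhost:8000/", m)`, with `get_message` bound to the queue client's receive call. Retries, acknowledgements, and backpressure are left out and would depend on the queue in use.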