Skip to content

Instantly share code, notes, and snippets.

@aladagemre
Last active April 21, 2025 14:22
Show Gist options
  • Save aladagemre/41dd5a35ac2da585d4cd54d895326064 to your computer and use it in GitHub Desktop.
Save aladagemre/41dd5a35ac2da585d4cd54d895326064 to your computer and use it in GitHub Desktop.
NATS Jetstream pull consumer and publisher sample
"""Consumer Sample for NATS
It creates a durable pull consumer named "TEST_DURABLE" on the "TEST" stream.
The consumer only receives messages published to the "TEST.pushed" subject.
It prints each JSON message received on that subject."""
import asyncio
import json
from nats.aio.client import Client as NATS
async def main():
    """Run a durable JetStream pull consumer that prints JSON messages.

    Connects to the NATS server at ``nats://localhost:4222``, registers the
    durable consumer ``TEST_DURABLE`` on the ``TEST`` stream (filtered to the
    ``TEST.pushed`` subject), then loops forever fetching batches of up to
    five messages, printing each decoded JSON payload and acknowledging it.

    The connection is closed when the loop exits for any reason (error or
    cancellation, e.g. Ctrl+C under ``asyncio.run``).

    Raises:
        Exception: Any error from ``add_consumer`` whose text does not
            contain "consumer already exists" is re-raised unchanged.
    """
    nc = NATS()
    await nc.connect("nats://localhost:4222")
    try:
        js = nc.jetstream()

        try:
            await js.add_consumer(
                stream="TEST",
                durable_name="TEST_DURABLE",
                filter_subject="TEST.pushed",
                ack_policy="explicit",
            )
        except Exception as e:
            # A pre-existing durable consumer is fine; anything else is fatal.
            if "consumer already exists" not in str(e):
                raise

        sub = await js.pull_subscribe("TEST.pushed", durable="TEST_DURABLE")
        print("📥 Started listening...\n")

        while True:
            try:
                msgs = await sub.fetch(5, timeout=3)
            except asyncio.TimeoutError:
                # nats-py's fetch() raises its TimeoutError (a subclass of
                # asyncio.TimeoutError) when no message arrives within the
                # timeout. An idle subject is normal, so just poll again
                # instead of letting the exception end the consumer.
                continue

            for msg in msgs:
                try:
                    decoded = json.loads(msg.data.decode())
                    print(f"📨 Message received: {decoded}")
                    await msg.ack()
                except Exception as e:
                    # Best effort: report and move on. The message is left
                    # unacknowledged, so the server may redeliver it.
                    print("❌ Error:", e)
    finally:
        # Release the connection even when the loop is interrupted.
        await nc.close()
# Script entry point: run the consumer coroutine on a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())
"""Publisher Sample for NATS
It publishes JSON messages, each containing a random value and a timestamp, to the "TEST.pushed" subject.
"""
import asyncio
import json
import random
import time
from nats.aio.client import Client as NATS
from nats.js.api import StreamConfig, RetentionPolicy
async def main():
    """Publish a random JSON payload to ``TEST.pushed`` every two seconds.

    Connects to the NATS server at ``nats://localhost:4222``, ensures the
    ``TEST`` stream (subject ``TEST.pushed``, limits-based retention) is
    declared, then loops forever publishing ``{"timestamp", "value"}``
    payloads and printing the sequence number from each publish ack.

    The connection is closed when the loop exits for any reason (error or
    cancellation, e.g. Ctrl+C under ``asyncio.run``).
    """
    nc = NATS()
    await nc.connect("nats://localhost:4222")
    try:
        js = nc.jetstream()

        # Declare the stream. Per the original author's note, this does not
        # raise when the stream already exists: the existing stream is used,
        # otherwise a new one is created.
        await js.add_stream(
            name="TEST",
            config=StreamConfig(
                name="TEST",
                subjects=["TEST.pushed"],
                retention=RetentionPolicy.LIMITS,
            ),
        )

        while True:
            payload = {
                "timestamp": time.time(),
                "value": random.randint(1, 100),
            }
            ack = await js.publish("TEST.pushed", json.dumps(payload).encode())
            print(f"✅ Published: {payload} -> seq={ack.seq}")
            await asyncio.sleep(2)
    finally:
        # Release the connection even when the loop is interrupted.
        await nc.close()
# Script entry point: run the publisher coroutine on a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment