Created
April 26, 2021 15:00
-
-
Save hellovertex/05e2faadc649899e2115622afed4c9b8 to your computer and use it in GitHub Desktop.
Faust App that redirects websocket traffic via Kafka topic
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
1. Launch Kafka (ZooKeeper first, then the broker):
    - $KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
    - $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
   Then start this app with: faust -A <module> worker -l info
   (<module> is this file's name without the .py extension)
2. Faust library:
    faust.App.agent: the main processing actor of a Faust app
""" | |
import json

import faust
from database import DatabaseHandler
from config import DB_PATH
# SQLite handle shared by every agent in this worker. It opens the database
# at DB_PATH (creating the file if needed), and the trades table is set up
# before any tick is consumed.
dbh = DatabaseHandler(DB_PATH)
dbh.create_table_trades()

# Faust application reading the local Kafka broker. value_serializer='raw'
# means message values are handed to agents undecoded, and autodiscover=False
# keeps Faust from scanning other modules for agents.
# Settings left at their Faust defaults (previously commented out here):
# topic_partitions=3, broker_commit_every=100, stream_buffer_maxsize=65536.
app = faust.App(
    'client',
    autodiscover=False,
    broker='kafka://localhost:9092',
    value_serializer='raw',
)

# Topic the websocket producer publishes raw ticks to.
topic = app.topic('ticks')
def _tick_to_row(tick):
    """Convert one raw ``ticks`` message into a row for the ``trades`` table.

    The message is assumed to be a JSON object shaped like a Binance
    ``@trade`` event (keys e, E, s, t, p, q, b, a, m, M), since the producer
    subscribes to ``btcusdt@trade``. TODO confirm for any other producer.
    The returned 10-tuple follows the column order of the ``trades`` table:
    event_type, event_time, symbol, trade_id, price, quantity,
    buyer_order_id, seller_order_id, is_market_maker, ignore.

    Returns None when the message is not a JSON object, has no trade id
    ("t"), or holds values that cannot be converted.
    """
    try:
        event = json.loads(tick)
        if not isinstance(event, dict):
            return None
        price, quantity = event.get('p'), event.get('q')
        maker_flag, ignore_flag = event.get('m'), event.get('M')
        return (
            event.get('e'),
            event.get('E'),
            event.get('s'),
            event['t'],  # trade_id is the NOT NULL primary key: required
            # Binance sends price/quantity as decimal strings; the columns are REAL.
            None if price is None else float(price),
            None if quantity is None else float(quantity),
            # Optional: these keys may be absent from the payload.
            event.get('b'),
            event.get('a'),
            # The table CHECKs these flags are 0/1, so store JSON booleans as ints.
            None if maker_flag is None else int(maker_flag),
            None if ignore_flag is None else int(ignore_flag),
        )
    except (KeyError, TypeError, ValueError):
        # ValueError also covers json.JSONDecodeError and undecodable bytes.
        return None


@app.agent(topic)
async def echo(ticks: faust.Stream):
    """Persist every message of ``topic`` into the ``trades`` table.

    Unparseable messages are reported and skipped. ``dbh.insert_many`` is a
    synchronous sqlite3 call, so it blocks the event loop while it runs.
    It prints its own error and returns False if the insert fails.
    """
    async for tick in ticks:
        row = _tick_to_row(tick)
        if row is None:
            print(f'skipping unparseable tick: {tick!r}')
            continue
        if dbh.insert_many('trades', [row]):
            print('echo from app 1 that wrote to db')
            print(f'also {tick} has been written')
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sqlite3 | |
from typing import Optional | |
import sys, getopt, os | |
class DatabaseHandler:
    """Small wrapper around one sqlite3 connection used to store trades.

    Database errors are printed rather than raised, except in ``__init__``,
    which terminates the process when the database cannot be opened.
    """

    # One statement per conflict mode of insert_many(); ten placeholders,
    # matching the ten columns created by create_table_trades().
    _INSERT_SQL = {
        'ignore': "INSERT OR IGNORE INTO {} VALUES (?,?,?,?,?,?,?,?,?,?)",
        'replace': "INSERT OR REPLACE INTO {} VALUES (?,?,?,?,?,?,?,?,?,?)",
    }

    def __init__(self, abs_path: str):
        """Open (creating it if necessary) the SQLite database at ``abs_path``.

        On failure the sqlite3 error is printed and the process exits with
        status 1.
        """
        try:
            self.conn = sqlite3.connect(str(abs_path))
        except sqlite3.Error as e:
            # database could not be created (or opened) -> abort
            print(e)
            sys.exit(1)

    def create_table_trades(self) -> None:
        """Create the ``trades`` table unless it already exists.

        Any sqlite3 error is printed, not raised.
        """
        # IF NOT EXISTS keeps restarts against an existing database quiet
        # instead of reporting "table trades already exists".
        sql = ('CREATE TABLE IF NOT EXISTS trades('
               'event_type TEXT, '
               'event_time INT, '
               'symbol TEXT, '
               'trade_id INT NOT NULL PRIMARY KEY, '
               'price REAL, '
               'quantity REAL, '
               'buyer_order_id INT, '
               'seller_order_id INT, '
               'is_market_maker BOOLEAN CHECK (is_market_maker IN (0, 1)), '
               'ignore BOOLEAN CHECK (ignore IN (0, 1)))')
        try:
            self.conn.execute(sql)
            self.conn.commit()
        except sqlite3.Error as e:
            print('creation failed:')
            print(e)

    def remove_table(self, tablename: str) -> None:
        """Drop ``tablename``; errors (e.g. a missing table) are printed, not raised.

        ``tablename`` is formatted into the SQL text (identifiers cannot be
        bound as parameters), so it must come from trusted code.
        """
        sql = "DROP TABLE {}".format(tablename)
        try:
            self.conn.cursor().executescript(sql)
            self.conn.commit()
        except sqlite3.Error as e:
            print('removing failed:')
            print(e)

    def insert_many(self, tablename, data, mode='replace') -> bool:
        """Insert every row of ``data`` into ``tablename`` in one transaction.

        Args:
            tablename: target table; formatted into the SQL, so it must be trusted.
            data: iterable of 10-element rows, one value per column.
            mode: 'ignore' skips rows that hit a constraint conflict
                (INSERT OR IGNORE); 'replace' overwrites the conflicting row
                (INSERT OR REPLACE).

        Returns:
            True when the batch was committed. False when sqlite3 raised; in
            that case the error is printed and the batch is rolled back, so no
            partial batch is committed later.

        Raises:
            ValueError: if ``mode`` is not 'ignore' or 'replace'.
        """
        # An explicit check rather than `assert`, which `python -O` strips.
        if mode not in self._INSERT_SQL:
            raise ValueError(f"mode must be 'ignore' or 'replace', got {mode!r}")
        try:
            self.conn.executemany(self._INSERT_SQL[mode].format(tablename), data)
            self.conn.commit()
            return True
        except sqlite3.Error as e:
            print('insertion failed:')
            print(e)
            # Rows inserted before the failing one are still in the open
            # transaction; discard them so a later commit() cannot persist them.
            self.conn.rollback()
            return False
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
1. Launch Kafka: | |
- $KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties | |
- $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties | |
Run via: faust -A <module> worker -l info
To start several consumers on one host, give each worker its own web port (-p):
    consumer 1: faust -A <module> worker -l info -p 6066
    consumer 2: faust -A <module> worker -l info -p 6067
    etc.
2. Faust library: | |
faust.App.agent: - main processing actor in Faust App | |
- unary async function - receives stream as its argument | |
faust.Stream: - async python generator | |
- abstractions over a kafka topic | |
- can apply operations on the stream (filter(), take(5)) | |
faust.Record: - data transfer object: Represents events via python classes inheriting it | |
- serialization, deserialization | |
From: https://faust.readthedocs.io/en/latest/userguide/settings.html#guide-settings | |
app configuration: | |
broker: - only supported production transport is kafka://, | |
- uses the aiokafka client under the hood, for consuming and producing messages | |
- can specify multiple hosts, e.g. broker='kafka://kafka1.example.com:9092;kafka2.example.com:9092' [fault tolerance] | |
store: - default is memory:// | |
- production should use rocksdb:// | |
processing_guarantee: "at_least_once" (default) or "exactly_once".
    When exactly-once processing is enabled, consumers are configured with isolation.level="read_committed"
    and producers with retries=Integer.MAX_VALUE and enable.idempotence=true by default.
    By default, exactly-once processing requires a cluster of at least three brokers, which is the recommended production setup.
    For development you can relax this by setting the broker option transaction.state.log.replication.factor to the number of brokers you run.
autodiscover: set to false, see https://faust.readthedocs.io/en/latest/userguide/settings.html#autodiscover | |
""" | |
from kafka import KafkaProducer | |
from websockets.client import WebSocketClientProtocol | |
from websockets.exceptions import ConnectionClosed | |
from mode import Service | |
import websockets | |
import faust | |
from faust import App | |
import random | |
import numpy as np | |
import asyncio | |
import websocket | |
import time | |
import logging | |
# Default websocket source: the Binance btcusdt@trade stream.
sock_addr = "wss://stream.binance.com:9443/ws/btcusdt@trade"


class WebSocketClient:
    """Forward every message received on a websocket to a Kafka topic."""

    def __init__(self, sock_addr, *, topic='ticks', **kwargs):
        """Store the websocket address and create the Kafka producer.

        Args:
            sock_addr: websocket URL to connect to.
            topic: Kafka topic that received messages are published to.
            **kwargs: accepted for compatibility and ignored.
        """
        self.producer = KafkaProducer()
        self.sock_addr = sock_addr
        self.topic = topic
        # Set by connect(); sendMessage() needs it.
        self.connection = None

    # need to implement as service when we want to gracefully shutdown
    async def connect(self):
        """Open the websocket and return its protocol object (used to send and receive)."""
        self.connection = await websockets.client.connect(self.sock_addr)
        if self.connection.open:
            print('Connection stablished. Client correcly connected')
        return self.connection

    async def sendMessage(self, message):
        """Send ``message`` over the connection opened by connect()."""
        await self.connection.send(message)

    async def receiveMessage(self, connection: WebSocketClientProtocol):
        """Publish every message from ``connection`` to Kafka until it closes."""
        while True:
            try:
                msg = await connection.recv()
            except websockets.exceptions.ConnectionClosed:
                print('Connection with server closed')
                # todo: handle 24h disconnects
                break
            # recv() yields str for text frames and bytes for binary frames.
            # Forward bytes unchanged instead of publishing their repr (b'...').
            value = msg if isinstance(msg, bytes) else msg.encode()
            self.producer.send(topic=self.topic, value=value)
        # KafkaProducer.send() only buffers; push out whatever is still pending.
        self.producer.flush()
async def _forward_ticks(client):
    """Connect ``client`` and forward messages until the server closes the socket."""
    connection = await client.connect()
    await client.receiveMessage(connection)


if __name__ == '__main__':
    client = WebSocketClient(sock_addr)
    # asyncio.run() creates, runs and closes the event loop. Calling
    # asyncio.get_event_loop() with no running loop is deprecated.
    asyncio.run(_forward_ticks(client))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment