Skip to content

Instantly share code, notes, and snippets.

@usunyu
Created October 30, 2021 03:11
Show Gist options
  • Save usunyu/753fce8781df3e0f108f96f3a39965b0 to your computer and use it in GitHub Desktop.
Save usunyu/753fce8781df3e0f108f96f3a39965b0 to your computer and use it in GitHub Desktop.
Kraken websockets
from enum import Enum
import asyncio
import traceback
import websockets
import json
from abc import ABC, abstractmethod
from typing import Callable, Optional, List, TypedDict, AsyncIterable
class ExchangeType(Enum):
BITSTAMP = 'bitstamp'
KRAKEN = 'kraken'
# class CurrencyPairType(Enum):
# USDT_USD = 'usdt_usd'
# USDT_EUR = 'usdt_eur'
# EUR_USD = 'eur_usd'
# USDC_USD = 'usdc_usd'
# USDC_EUR = 'usdc_eur'
class WebSocketChannelType(Enum):
TRADE = 'trade'
ORDER = 'order'
ORDER_BOOK = 'order_book'
MY_ORDER = 'my_order'
MY_TRADE = 'my_trade'
class WebSocketChannelType(Enum):
TRADE = 'trade'
ORDER = 'order'
ORDER_BOOK = 'order_book'
MY_ORDER = 'my_order'
MY_TRADE = 'my_trade'
class StreamBase(ABC):
MESSAGE_TIMEOUT = 10.0
PING_TIMEOUT = 5.0
RECONNECT_WAIT = 30.0
def __init__(self, ws_address: str, currency_pairs: List[str],
subscribe_channels: List[WebSocketChannelType]):
self._ws_address: str = ws_address
self._ws: Optional[websockets.WebSocketClientProtocol] = None
self._currency_pairs: List[str] = currency_pairs
self._subscribe_channels: List[WebSocketChannelType] = subscribe_channels
self._shutdown: bool = False
# add new stream channel function here
self._channel_function_map: TypedDict[WebSocketChannelType, Callable] = {
WebSocketChannelType.ORDER_BOOK: self.listen_for_order_book,
WebSocketChannelType.TRADE: self.listen_for_trade,
# WebSocketChannelType.MY_ORDER: self.listen_for_my_order,
# TODO WebSocketChannelType.ORDER: self.listen_for_order,
}
@staticmethod
def load(exchange: ExchangeType, currency_pairs: List[str],
subscribe_channels: List[WebSocketChannelType]):
# add new exchange stream class initialization here
# if exchange == ExchangeType.BITSTAMP:
# from exchange.bitstamp.bitstamp_stream import BitstampStream
# return BitstampStream(currency_pairs, subscribe_channels)
if exchange == ExchangeType.KRAKEN:
return KrakenStream(currency_pairs, subscribe_channels)
raise ModuleNotFoundError
async def start(self):
# start stream tasks
stream_tasks: List[asyncio.Task] = [asyncio.create_task(
self._channel_function_map[channel]()) for channel in self._subscribe_channels]
await asyncio.gather(*stream_tasks)
def stop(self):
self._shutdown = True
@abstractmethod
async def listen_for_order_book(self):
raise NotImplementedError
@abstractmethod
async def listen_for_my_order(self):
raise NotImplementedError
@abstractmethod
async def listen_for_trade(self):
raise NotImplementedError
class KrakenStream(StreamBase):
"""
https://docs.kraken.com/websockets/
"""
WS_ADDRESS = "wss://ws.kraken.com"
CHANNEL_ORDER_BOOK = "book"
CHANNEL_TRADE = "trade"
CHANNEL_ORDER_BOOK_DEPTH = 100
def __init__(self, currency_pairs: List[str],
subscribe_channels: List[WebSocketChannelType]):
super().__init__(self.WS_ADDRESS, currency_pairs, subscribe_channels)
def _parse_message(self, msg: str):
# data = json.loads(msg)
# print(data)
print(f"kraken message: {msg[0:20]}...")
async def _recv_raw_messages(self, ws: websockets.WebSocketClientProtocol) -> AsyncIterable[str]:
try:
while not self._shutdown:
try:
msg: str = await asyncio.wait_for(ws.recv(), timeout=self.MESSAGE_TIMEOUT)
if ((msg != "{\"event\":\"heartbeat\"}" and
"\"event\":\"systemStatus\"" not in msg and
"\"event\":\"subscriptionStatus\"" not in msg)):
yield msg
except asyncio.TimeoutError:
pong_waiter = await ws.ping()
await asyncio.wait_for(pong_waiter, timeout=self.PING_TIMEOUT)
except asyncio.TimeoutError:
print("WebSocket ping timed out. Going to reconnect...")
return
except websockets.exceptions.ConnectionClosed:
return
finally:
await ws.close()
async def listen_for_order_book(self):
while not self._shutdown:
try:
async with websockets.connect(self._ws_address) as ws:
self._ws: websockets.WebSocketClientProtocol = ws
await self._ws.send(json.dumps({
"event": "subscribe",
"pair": self._currency_pairs,
"subscription": {
"name": self.CHANNEL_ORDER_BOOK,
"depth": self.CHANNEL_ORDER_BOOK_DEPTH,
}
}))
print("Subscribed to kraken order book...")
async for raw_msg in self._recv_raw_messages(ws):
self._parse_message(raw_msg)
except asyncio.CancelledError:
raise
except Exception as e:
print(
f"Unexpected error with WebSocket connection. Retrying after {self.RECONNECT_WAIT} seconds...")
print(str(e))
print(traceback.format_exc())
await asyncio.sleep(self.RECONNECT_WAIT)
async def listen_for_trade(self):
while not self._shutdown:
try:
async with websockets.connect(self._ws_address) as ws:
self._ws: websockets.WebSocketClientProtocol = ws
await self._ws.send(json.dumps({
"event": "subscribe",
"pair": self._currency_pairs,
"subscription": {
"name": self.CHANNEL_TRADE,
}
}))
print("Subscribed to kraken trades...")
async for raw_msg in self._recv_raw_messages(ws):
self._parse_message(raw_msg)
except asyncio.CancelledError:
raise
except Exception as e:
print(
f"Unexpected error with WebSocket connection. Retrying after {self.RECONNECT_WAIT} seconds...")
print(str(e))
print(traceback.format_exc())
await asyncio.sleep(self.RECONNECT_WAIT)
async def listen_for_my_order(self):
# TODO
pass
if __name__ == '__main__':
async def start_stream(stream):
# start stream
stream_tasks: List[asyncio.tasks.Task] = []
stream_tasks.append(asyncio.create_task(stream.start()))
await asyncio.gather(*stream_tasks)
stream = StreamBase.load(
ExchangeType.KRAKEN,
[
'USDT/USD',
'USDT/EUR',
'EUR/USD',
'USDC/USD',
'USDC/EUR'
],
[
WebSocketChannelType.ORDER_BOOK,
WebSocketChannelType.TRADE,
]
)
try:
asyncio.run(start_stream(stream))
except KeyboardInterrupt:
stream.stop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment