Last active
April 27, 2020 16:50
-
-
Save nagadomi/6d6cacc9541bc0c08e82aca3d43892c8 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# pip3 install websocket-client python-dateutil
import websocket | |
from multiprocessing import Process, Value, Lock, Event | |
from datetime import datetime | |
import dateutil.parser | |
import json | |
from time import sleep | |
class SFDInfo:
    """Shared snapshot of the FX_BTC_JPY / BTC_JPY last-traded-price disparity.

    Every field is stored in a ``multiprocessing.Value`` so the object can be
    handed to another process. Code that reads or writes several fields as a
    unit should hold ``lock`` while doing so; ``event`` is provided so a
    writer can signal a reader that the snapshot changed.
    """

    # Disparity magnitudes (percent) around which ``unsafe`` reports danger.
    SFD_LEVELS = (5.0, 10.0, 15.0, 20.0)

    def __init__(self):
        # Typecodes: 'i' is a C int, 'd' is a C double.
        self._fx_ltp = Value('i', 0)
        self._btc_ltp = Value('i', 0)
        self._disparity = Value('d', 0)
        self._updated_at = Value('d', 0)    # POSIX timestamp
        self._last_sfd_at = Value('d', 0)   # POSIX timestamp
        self.lock = Lock()
        self.event = Event()

    @property
    def fx_ltp(self):
        """Last traded price of FX_BTC_JPY, stored as an int."""
        return self._fx_ltp.value

    @fx_ltp.setter
    def fx_ltp(self, val):
        self._fx_ltp.value = int(val)

    @property
    def btc_ltp(self):
        """Last traded price of BTC_JPY, stored as an int."""
        return self._btc_ltp.value

    @btc_ltp.setter
    def btc_ltp(self, val):
        self._btc_ltp.value = int(val)

    @property
    def updated_at(self):
        """Time of the last update, as a naive ``datetime`` in local time."""
        return datetime.fromtimestamp(self._updated_at.value)

    @updated_at.setter
    def updated_at(self, d):
        self._updated_at.value = d.timestamp()

    @property
    def last_sfd_at(self):
        """Time last recorded via the setter, as a naive local ``datetime``.

        Until it is first assigned, the stored timestamp is 0 (the epoch).
        """
        return datetime.fromtimestamp(self._last_sfd_at.value)

    @last_sfd_at.setter
    def last_sfd_at(self, d):
        self._last_sfd_at.value = d.timestamp()

    @property
    def disparity(self):
        """Stored price disparity as a float (callers store it in percent)."""
        return self._disparity.value

    @disparity.setter
    def disparity(self, val):
        self._disparity.value = val

    @property
    def ready(self):
        """True once both last traded prices are positive."""
        return self.fx_ltp > 0 and self.btc_ltp > 0

    def unsafe_with_lock(self, side, unsafe_range=0.1):
        """Same as ``unsafe`` but acquires ``lock`` for the duration.

        Do not call this while already holding ``lock``:
        ``multiprocessing.Lock`` is not reentrant.
        """
        with self.lock:
            return self.unsafe(side, unsafe_range)

    def unsafe(self, side, unsafe_range=0.1):
        """Report whether the disparity is close to one of ``SFD_LEVELS``.

        Args:
            side: ``"BUY"`` checks the positive levels (+5, +10, ...);
                ``"SELL"`` checks the negative ones (-5, -10, ...).
            unsafe_range: Half-width of the open interval around each level.

        Returns:
            True if the disparity lies strictly within ``unsafe_range`` of
            any level for the given side, otherwise False.

        Raises:
            ValueError: If ``side`` is neither ``"BUY"`` nor ``"SELL"``.
        """
        # An explicit raise (rather than assert) keeps the check under -O.
        if side not in ("BUY", "SELL"):
            raise ValueError(
                "side must be 'BUY' or 'SELL', got {!r}".format(side))
        disp = self.disparity
        sign = 1.0 if side == "BUY" else -1.0
        return any(
            sign * level - unsafe_range < disp < sign * level + unsafe_range
            for level in self.SFD_LEVELS
        )
def sfd_process(sfd):
    """Stream bitFlyer ticker messages forever and keep ``sfd`` up to date.

    Subscribes to the FX_BTC_JPY and BTC_JPY ticker channels over the
    JSON-RPC websocket endpoint. Whenever a last traded price changes and
    both prices are known, the disparity is recomputed, ``sfd.updated_at``
    is refreshed and ``sfd.event`` is set. This function never returns.

    Args:
        sfd: Shared state object exposing ``lock``, ``event``, ``ready`` and
            the ``fx_ltp`` / ``btc_ltp`` / ``disparity`` / ``updated_at`` /
            ``last_sfd_at`` attributes.
    """
    FX_CHANNEL = "lightning_ticker_FX_BTC_JPY"
    BTC_CHANNEL = "lightning_ticker_BTC_JPY"

    class BitflyerSubscriberCallback():
        """Websocket callbacks; they close over ``sfd`` and the channel names."""

        def on_open(self, ws):
            # Subscribe to both tickers as soon as the socket is open.
            for channel in (FX_CHANNEL, BTC_CHANNEL):
                ws.send(json.dumps({"method": "subscribe", "params": {"channel": channel}}))

        def on_message(self, ws, message):
            message = json.loads(message)
            # Frames without a "method" member (e.g. JSON-RPC replies to the
            # subscribe requests) are skipped instead of raising KeyError.
            if not isinstance(message, dict) or message.get("method") != "channelMessage":
                return
            message = message["params"]
            tick = message["message"]
            timestamp = dateutil.parser.parse(tick["timestamp"])
            with sfd.lock:
                changed = False
                if message["channel"] == FX_CHANNEL:
                    if sfd.fx_ltp != tick["ltp"]:
                        sfd.fx_ltp = tick["ltp"]
                        changed = True
                elif message["channel"] == BTC_CHANNEL:
                    if sfd.btc_ltp != tick["ltp"]:
                        sfd.btc_ltp = tick["ltp"]
                        changed = True
                # Only recompute once both prices are known and one moved.
                if sfd.ready and changed:
                    # Percentage by which the FX price deviates from spot.
                    sfd.disparity = (sfd.fx_ltp / sfd.btc_ltp) * 100 - 100
                    # Remember the tick time whenever the disparity is at
                    # least 10% in either direction.
                    if (sfd.fx_ltp > sfd.btc_ltp and sfd.disparity >= 10.0) or \
                       (sfd.fx_ltp < sfd.btc_ltp and sfd.disparity <= -10.0):
                        sfd.last_sfd_at = timestamp
                    sfd.updated_at = timestamp
                    sfd.event.set()

    cb = BitflyerSubscriberCallback()
    while True:
        ws = websocket.WebSocketApp("wss://ws.lightstream.bitflyer.com/json-rpc",
                                    on_message=cb.on_message,
                                    on_open=cb.on_open)
        ws.run_forever(ping_interval=30, ping_timeout=10)
        # run_forever() has returned, so back off briefly and reconnect.
        sleep(10)
if __name__ == '__main__':
    # Demo: run the subscriber in a child process and print every update.
    EVENT_DRIVEN = True  # False: poll once per second instead of waiting on the event
    FACE_D = "danger (゚Д゚)"
    FACE_N = "normal (・∀・)"

    sfd = SFDInfo()
    proc1 = Process(target=sfd_process, args=(sfd,))
    proc1.start()
    try:
        while True:
            if EVENT_DRIVEN:
                sfd.event.wait()
                sfd.event.clear()
            else:
                sleep(1)
            if not sfd.ready:
                continue
            # Take a consistent snapshot under the lock, then print without
            # holding it so the subscriber is not blocked on terminal I/O.
            with sfd.lock:
                buy = FACE_D if sfd.unsafe("BUY") else FACE_N
                sell = FACE_D if sfd.unsafe("SELL") else FACE_N
                snapshot = (
                    sfd.updated_at,
                    sfd.fx_ltp, sfd.btc_ltp,
                    sfd.disparity,
                    sfd.last_sfd_at,
                    buy, sell,
                )
            print("{}, fx: {}, btc: {}, disparity: {:.2f}%, last_sfd: {}, buy: {}, sell: {}".format(
                *snapshot
            ))
    finally:
        # Stop the subscriber on exit (Ctrl+C or an error); otherwise the
        # interpreter would wait on the non-daemon child at shutdown.
        proc1.terminate()
        proc1.join()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment