Last active
September 8, 2021 05:14
-
-
Save DamionDamion/69c94ce070334cdb22277b38ff676e1e to your computer and use it in GitHub Desktop.
Cute script that feeds RIPE RIS Live messages into Elasticsearch.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
import json
import threading
import time
from datetime import datetime
from queue import Empty
from queue import Queue
from threading import Thread

import websocket
from elasticsearch import Elasticsearch
# Shared Elasticsearch client used by every indexing worker (see do_es).
# NOTE(review): the host address is hard-coded; adjust for your deployment.
es = Elasticsearch(['192.168.1.24'])

# Name of the Elasticsearch index that receives the documents.
index = 'rislive'
def monq(q, interval=0.300, threshold=200, max_workers=16):
    """Monitor the queue forever and start ``do_es`` workers as needed.

    Every ``interval`` seconds the queue size is sampled and the growth since
    the previous sample is printed.  A new daemon worker thread running
    ``do_es`` is started when either

    * the queue grew by more than ``threshold`` items during the interval, or
    * the queue is non-empty and no worker started by this monitor is alive
      (workers exit as soon as they find the queue empty, so without this
      check a slowly filling queue would never be drained).

    Args:
        q: The ``queue.Queue`` shared with the producer and the workers.
        interval: Seconds to sleep between two queue-size samples.
        threshold: Growth per interval above which another worker is started.
        max_workers: Upper bound on concurrently running workers, so that a
            sustained burst cannot spawn an unbounded number of threads.

    This function never returns; run it in a daemon thread.
    """
    print('Monitoring queue..')
    workers = []
    while True:
        state = q.qsize()
        time.sleep(interval)
        delta = q.qsize() - state
        print('delta:', delta)

        # Forget workers that already drained the queue and exited.
        workers = [w for w in workers if w.is_alive()]

        backlog = q.qsize()
        growing = delta > threshold
        starved = backlog > 0 and not workers
        if growing:
            print(f'queue delta > {threshold} - start additional workers?')
            print('qsize', backlog)

        if (growing or starved) and len(workers) < max_workers:
            # daemon=True replaces the deprecated Thread.setDaemon(True).
            worker = Thread(target=do_es, args=(q,), daemon=True)
            worker.start()
            workers.append(worker)
def do_es(q):
    """Drain the queue, indexing each message's ``data`` into Elasticsearch.

    Each queue item is expected to be a JSON string whose ``data`` object
    carries a numeric ``timestamp`` (seconds since the epoch).  The timestamp
    is rewritten as an ISO-8601 string before the document is indexed with the
    module-level ``es`` client into ``index``.

    Items that cannot be parsed or lack the expected fields are reported and
    skipped instead of killing the worker.  ``q.task_done()`` is called for
    every item taken, even when processing fails.

    Args:
        q: The ``queue.Queue`` holding raw JSON messages.

    Returns when the queue is found empty.
    """
    print('queue worker starting..')
    while True:
        # get_nowait() instead of "while not q.empty(): q.get()": with several
        # workers another thread can take the last item between the two calls,
        # which would leave this worker blocked in get() forever.
        try:
            raw = q.get_nowait()
        except Empty:
            break
        try:
            d = json.loads(raw)['data']
            # NOTE: utcfromtimestamp() is deprecated since Python 3.12 but is
            # kept so the stored timestamp format (naive UTC) stays unchanged.
            d['timestamp'] = datetime.utcfromtimestamp(d['timestamp']).isoformat()
        except (ValueError, KeyError, TypeError, OverflowError, OSError) as exc:
            print('skipping malformed message:', repr(exc))
        else:
            es.index(index=index, body=d)
        finally:
            q.task_done()
    print('queue empty, exiting')
# Unbounded queue shared by the websocket reader below (producer) and the
# Elasticsearch workers started by monq (consumers).
q = Queue(maxsize=0)

# Queue monitor runs in the background; daemon=True (instead of the deprecated
# setDaemon) lets the process exit when the main thread ends.
worker = Thread(target=monq, args=(q,), daemon=True)
worker.start()

ws = websocket.WebSocket()
ws.connect("wss://ris-live.ripe.net/v1/ws/?client=damo")

# Subscription filter sent to the server.
# NOTE(review): field meanings are defined by the RIS Live API - confirm there.
params = {
    "moreSpecific": True,
    "host": "rrc00",
    "socketOptions": {
        "includeRaw": False,
    },
}

try:
    ws.send(json.dumps({
        "type": "ris_subscribe",
        "data": params,
    }))

    for data in ws:
        parsed = json.loads(data)
        if parsed["type"] != 'ris_message':
            # Control/error messages are only reported; they are not queued
            # because the workers expect a 'data' object with a 'timestamp'.
            print(parsed["type"], parsed.get("data"))
            continue
        # Enqueue the raw JSON text; do_es parses it again on the worker side.
        q.put(data)
finally:
    # Always release the socket, including on Ctrl-C or a dropped connection.
    ws.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment