Created
March 20, 2020 10:49
-
-
Save rochoa/e8d4aff09dbe83279bc41ccc8359ce36 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import atexit
import csv
import json
import signal
import sys
from datetime import datetime
from io import StringIO

import click
import requests
from kafka import KafkaConsumer
def flush(endpoint, datasource_name, token, rows):
    """Append ``rows`` to a Tinybird data source by POSTing them as CSV.

    The whole batch is serialized first. If that fails, the batch is rebuilt
    row by row, so a single unserializable row is reported and skipped instead
    of discarding the whole batch. The HTTP status is printed but not checked,
    so a rejected upload is only visible in the output.

    Args:
        endpoint: Base API URL, e.g. ``https://api.tinybird.co``.
        datasource_name: Data source to append to (sent as the ``name``
            query parameter).
        token: Sent as ``Authorization: Bearer <token>``.
        rows: Sequence of rows, each an iterable of field values. Note that a
            ``dict`` row is iterated by the csv writer, i.e. its keys are
            written, not its values.

    An empty ``rows`` is a no-op: nothing is sent.
    """
    if not rows:
        # Nothing to upload (e.g. the exit-time flush right after a full batch
        # was sent); skip the pointless empty POST.
        return

    csv_chunk = StringIO()
    writer = csv.writer(csv_chunk, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
    try:
        writer.writerows(rows)
    except Exception:
        # Deliberate best effort. Unlike a bare ``except``, ``Exception`` lets
        # KeyboardInterrupt / SystemExit propagate.
        print("Failed to process rows as block, row-by-row processing fallback")
        # writerows may already have emitted part of the batch: start clean.
        csv_chunk = StringIO()
        writer = csv.writer(csv_chunk, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
        for r in rows:
            try:
                writer.writerow(r)
            except Exception:
                print("Failed to process row =", r)

    # Let requests build and percent-encode the query string, so data source
    # names containing reserved characters (&, =, spaces, ...) are sent intact.
    response = requests.post(
        f'{endpoint}/v0/datasources',
        params={'mode': 'append', 'name': datasource_name},
        data=csv_chunk.getvalue(),
        headers={'Authorization': f'Bearer {token}'},
    )
    print(f"[{datetime.now()}] status={response.status_code} datasource={datasource_name}, rows={len(rows)}")
@click.command()
@click.argument('topic')
@click.argument('datasource_name')
@click.option('--token', envvar='TOKEN')
@click.option('--endpoint', default='https://api.tinybird.co')
@click.option('--batch-size', default=20000, show_default=True, type=click.IntRange(min=1),
              help='Number of buffered messages that triggers an upload.')
def consume(topic, datasource_name, token, endpoint, batch_size=20000):
    """Stream JSON messages from Kafka TOPIC into Tinybird DATASOURCE_NAME.

    Message values are decoded with json.loads, buffered, and appended to the
    data source every --batch-size messages. Whatever is still buffered is
    flushed when the process exits, including on SIGTERM.
    """
    # One consumer group per data source name.
    # NOTE(review): the consumer uses the library's default offset-commit
    # settings; if auto-commit is enabled (kafka-python's default — confirm
    # for the pinned version), offsets can be committed before their batch is
    # uploaded, so a hard crash may lose buffered rows.
    consumer = KafkaConsumer(topic, value_deserializer=json.loads, group_id=f"tb_{datasource_name}")
    rows = []
    # The lambda reads ``rows`` when it runs (late binding), so at exit it sees
    # the batch pending after the latest ``rows = []`` rebinding below.
    atexit.register(lambda: flush(endpoint, datasource_name, token, rows))
    # atexit handlers do not run when the process is killed by a signal Python
    # does not handle. Turn SIGTERM (the usual stop signal from supervisors)
    # into SystemExit so the pending batch is still flushed on shutdown; the
    # exit status mirrors the conventional 128 + signal number.
    signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(128 + signum))
    for msg in consumer:
        rows.append(msg.value)
        # Uploads happen only when a batch fills up (or at exit), so a quiet
        # topic may hold rows in memory for a long time.
        if len(rows) >= batch_size:
            flush(endpoint, datasource_name, token, rows)
            rows = []
if __name__ == "__main__":
    # Script entry point: click parses the command line (arguments, options,
    # and the TOKEN environment variable) and then runs the command.
    consume()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment