-
-
Save cphrmky/65802859d118ba445bc5bbf614155c50 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from sseclient import SSEClient | |
import requests | |
from queue import Queue | |
import json | |
import threading | |
import socket | |
class ClosableSSEClient(SSEClient):
    """SSEClient variant whose event stream can be shut down on request.

    After ``close()`` has been called, ``_connect()`` raises
    ``StopIteration`` instead of opening a new connection.
    """

    def __init__(self, *args, **kwargs):
        # Set the flag before delegating: presumably the base constructor
        # already calls ``_connect()``, which reads it -- TODO confirm.
        self.should_connect = True
        super(ClosableSSEClient, self).__init__(*args, **kwargs)

    def _connect(self):
        """Connect through the base class unless ``close()`` was called."""
        if self.should_connect:
            super(ClosableSSEClient, self)._connect()
        else:
            raise StopIteration()

    def _raw_socket(self):
        """Return the socket underneath the streaming HTTP response.

        Raises:
            AttributeError: if no response exists yet or the private
                attribute chain does not have the expected layout.
        """
        fp = self.resp.raw._fp.fp
        # On Python 3 ``fp`` is a buffered reader and the socket lives one
        # level deeper, at ``fp.raw._sock``; the direct ``fp._sock`` path is
        # the older layout the original code relied on.
        sock = getattr(getattr(fp, 'raw', None), '_sock', None)
        if sock is None:
            sock = fp._sock
        return sock

    def close(self):
        """Stop reconnecting and shut down the socket of the current stream.

        If the socket cannot be located (nothing connected yet, or an
        unexpected response layout) the call only disables reconnects.
        """
        self.should_connect = False
        self.retry = 0
        try:
            sock = self._raw_socket()
        except AttributeError:
            return
        try:
            sock.shutdown(socket.SHUT_RDWR)
        finally:
            # Release the descriptor even when shutdown() fails, e.g. because
            # the peer has already dropped the connection.
            sock.close()
class RemoteThread(threading.Thread):
    """Background thread that forwards streamed events to a callback.

    Args:
        parent: object that owns this thread; stored but not used here.
        URL: endpoint passed unchanged to ``ClosableSSEClient``.
        function: callable invoked with one ``(event, data)`` tuple per
            message, where ``data`` is the JSON-decoded message payload.
    """

    def __init__(self, parent, URL, function):
        self.function = function
        self.URL = URL
        self.parent = parent
        # Created in run(); initialised here so close() is safe to call
        # before the thread has connected.
        self.sse = None
        super(RemoteThread, self).__init__()

    def run(self):
        """Open the stream and dispatch every message with a payload."""
        try:
            self.sse = ClosableSSEClient(self.URL)
            for msg in self.sse:
                msg_data = json.loads(msg.data)
                if msg_data is None:  # keep-alives
                    continue
                # TODO: update parent cache here
                self.function((msg.event, msg_data))
        except socket.error:
            pass  # this can happen when we close the stream
        except KeyboardInterrupt:
            self.close()

    def close(self):
        """Close the event stream if one has been opened; otherwise no-op."""
        if self.sse is not None:
            self.sse.close()
def firebaseURL(URL):
    """Normalise a Firebase location into an ``https://...json`` REST URL.

    Two input forms are accepted:

    * Shorthand ``name[/path][.json]`` (no ``.firebaseio.com`` in it), which
      becomes ``https://name.firebaseio.com/path.json``.
    * A host form containing ``.firebaseio.com``, which is forced to
      ``https://`` and gets ``.json`` appended when it is not already there.

    Args:
        URL: shorthand or host-form location string.

    Returns:
        The normalised URL string.
    """
    if '.firebaseio.com' not in URL.lower():
        if URL.endswith('.json'):
            URL = URL[:-5]
        if URL.endswith('/'):
            URL = URL[:-1]
        # partition() yields an empty path when no '/' is left, so inputs
        # like 'name/' resolve to the database root instead of raising
        # IndexError.
        name, _, path = URL.partition('/')
        return 'https://' + name + '.firebaseio.com/' + path + '.json'

    if 'http://' in URL:
        URL = URL.replace('http://', 'https://')
    if 'https://' not in URL:
        URL = 'https://' + URL
    if '.json' not in URL.lower():
        URL += '.json' if URL.endswith('/') else '/.json'
    return URL
class subscriber:
    """Handle for a streaming subscription running on a ``RemoteThread``.

    Args:
        URL: location string; normalised with ``firebaseURL`` before use.
        function: callback handed to the worker thread.
    """

    def __init__(self, URL, function):
        self.cache = dict()
        endpoint = firebaseURL(URL)
        self.remote_thread = RemoteThread(self, endpoint, function)

    def start(self):
        """Start the worker thread."""
        self.remote_thread.start()

    def stop(self):
        """Close the worker's stream, then block until the thread exits."""
        worker = self.remote_thread
        worker.close()
        worker.join()

    def wait(self):
        """Block until the worker thread exits."""
        self.remote_thread.join()
class FirebaseException(Exception):
    """Raised by the REST helpers when a request does not return HTTP 200."""
def put(URL, msg):
    """Send ``msg`` as JSON in an HTTP PUT to the location ``URL``.

    Args:
        URL: location string, normalised with ``firebaseURL``.
        msg: JSON-serialisable value.

    Raises:
        FirebaseException: with the response body when the status code is
            not 200.
    """
    payload = json.dumps(msg)
    reply = requests.put(firebaseURL(URL), data=payload)
    if reply.status_code == 200:
        return
    raise FirebaseException(reply.text)
def patch(URL, msg):
    """Send ``msg`` as JSON in an HTTP PATCH to the location ``URL``.

    Args:
        URL: location string, normalised with ``firebaseURL``.
        msg: JSON-serialisable value.

    Raises:
        FirebaseException: with the response body when the status code is
            not 200.
    """
    payload = json.dumps(msg)
    reply = requests.patch(firebaseURL(URL), data=payload)
    if reply.status_code == 200:
        return
    raise FirebaseException(reply.text)
def get(URL):
    """Fetch the location ``URL`` and return its JSON-decoded body.

    Args:
        URL: location string, normalised with ``firebaseURL``.

    Returns:
        The decoded response body.

    Raises:
        FirebaseException: with the response body when the status code is
            not 200.
    """
    reply = requests.get(firebaseURL(URL))
    if reply.status_code == 200:
        return json.loads(reply.text)
    raise FirebaseException(reply.text)
def push(URL, msg):
    """Send ``msg`` as JSON in an HTTP POST to the location ``URL``.

    Args:
        URL: location string, normalised with ``firebaseURL``.
        msg: JSON-serialisable value.

    Raises:
        FirebaseException: with the response body when the status code is
            not 200. This matches ``put``/``patch``/``get``; callers that
            catch plain ``Exception`` keep working because
            ``FirebaseException`` derives from it.
    """
    to_post = json.dumps(msg)
    response = requests.post(firebaseURL(URL), data=to_post)
    if response.status_code != 200:
        raise FirebaseException(response.text)
import os | |
from flask import abort, Flask, jsonify, request | |
import pytz | |
from datetime import datetime | |
# Flask application object; the route decorators below register on it.
app = Flask(__name__)
def is_request_valid(request):
    """Check the ``token`` and ``team_id`` form fields of ``request``.

    Args:
        request: object exposing a mapping-like ``form`` attribute.

    Returns:
        True only when both fields are present and equal the expected
        values; False otherwise, including when a field is missing.
    """
    # Function-scope import keeps this block self-contained.
    import hmac

    # NOTE(review): the expected values are hard-coded secrets committed to
    # source; consider loading them from configuration instead.
    expected_fields = (
        ('token', 'nsui4GlruL6QTRfuwO5nmobR'),
        ('team_id', 'T8CBT81JP'),
    )

    def _matches(field, expected):
        supplied = request.form.get(field)
        if not isinstance(supplied, str):
            return False
        # Compare as bytes: compare_digest() rejects non-ASCII str input,
        # and it runs in constant time so the secret cannot be probed
        # through response timing.
        return hmac.compare_digest(
            supplied.encode('utf-8'), expected.encode('utf-8'))

    # Evaluate every field before combining so a bad token does not skip
    # the remaining comparison.
    results = [_matches(field, expected) for field, expected in expected_fields]
    return all(results)
@app.route('/glass', methods=['POST'])
def glass():
    """Record glasses of water for the caller and reply with the ranking.

    Reads ``text`` (number of glasses, defaulting to 1) and ``user_name``
    from the posted form, adds the amount to the caller's total for the
    current US/Pacific day, and returns a JSON message describing who is in
    the lead. Presumably invoked as a chat slash command -- TODO confirm.

    Aborts with HTTP 400 when the request fails ``is_request_valid`` or the
    ``user_name`` is not one of the known accounts.
    """
    if not is_request_valid(request):
        abort(400)

    # A missing ``text`` field makes int() raise TypeError; a non-numeric
    # one raises ValueError. Both mean "one glass".
    try:
        amount = int(request.form.get('text'))
    except (TypeError, ValueError):
        amount = 1

    # Maps account names to the display names used as database keys.
    translator = {
        'spottedfeather.02': 'Principessa',
        'sauhaarda': 'Raunak',
        'sauhaardammorpg': 'hagu',
    }
    username = translator.get(request.form.get('user_name'))
    if username is None:
        # Unknown or missing account: reject explicitly instead of failing
        # with an unhandled KeyError.
        abort(400)

    URL = 'retirementclub-2e565'
    day = datetime.now(pytz.timezone('US/Pacific')).strftime('%Y-%m-%d')
    day_url = URL + '/' + day

    response = ''
    current = get(day_url)
    if current is None:
        current = {}
    if username not in current:
        response += 'Wow! It\'s your first drinker today! :clap: :clap: for your :potable_water:!\n'
        # Add the caller to the existing totals; replacing the whole dict
        # would drop everyone else from the ranking below.
        current[username] = 0
    current[username] += amount

    ranking = sorted(current.items(), key=lambda entry: entry[1], reverse=True)
    max_user, max_glasses = ranking[0]
    if max_user == username:
        response += "Nice! You're in the lead with {} glasses. Keep up the LOVELY work! Keep your bladder in check though :laughing:!\n".format(max_glasses)
        if len(ranking) > 1:
            s_user, s_glasses = ranking[1]
            response += "{} is in 2nd (aka last lol) place with {} glasses, keep drinking ;)\n".format(s_user, s_glasses)
    else:
        response += "You've drunk {} glasses of water, but {} is in first place with {} glasses. You've got this! Keep drinking, hopefully hyperhydroxia isn't a thing ;)\n".format(current[username], max_user, max_glasses)

    # Write back only the caller's total so other users' entries are not
    # overwritten with the possibly stale values fetched above.
    patch(day_url, {username: current[username]})
    return jsonify(
        response_type='in_channel',
        text=response,
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment