-
-
Save skozz/706815f17791f881b23ed89819087c79 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import csv | |
import os | |
import types | |
from flask import Flask, jsonify, make_response, request, url_for, send_file | |
from app import celery, firebase, firestore_db | |
from app.post_tasks import * | |
from app.master_admin_tasks import check_group_outdated_accounts_token | |
from firebase_admin import db as firebase_db, firestore | |
from . import post | |
from ..models import * | |
import json | |
import arrow | |
import dump | |
import requests | |
from dateutil.parser import parse | |
from datetime import datetime, date, timedelta | |
from ..google.flow import * | |
from ..logger import get_logger | |
from mongoengine import Q | |
from ..google import google_service | |
from ..emailer.emailer import * | |
import jwt | |
from config import config | |
import uuid | |
logger = get_logger(__name__) | |
def populate_location_google_posts(account_location, container, token, page_token=None): | |
logger.debug("Paginating posts for {}".format(account_location)) | |
headers = {'Authorization': 'Bearer {}'.format(token)} | |
url = 'https://mybusiness.googleapis.com/v4/accounts/{}/localPosts?pageSize=100'.format(account_location) | |
if page_token is not None: | |
url += '&pageToken={}'.format(page_token) | |
logger.debug("Sending headers {} for url {}".format(headers, url)) | |
try: | |
response = requests.get(url=url, headers=headers) | |
response.raise_for_status() | |
response = response.json() | |
posts = response.get('localPosts', []) | |
next_page_token = response.get('nextPageToken', None) | |
container += posts | |
if next_page_token is not None: | |
populate_location_google_posts(account_location, container, token, next_page_token) | |
else: | |
return True | |
except StandardError as e: | |
logger.exception(e) | |
def save_post_fs(p, account_location, gid, uid):
    """Record post payload *p* as a new pending LocalPosts entry.

    account_location is an "<account_id>/locations/<location_id>" style
    path; index 0 is the account id and index 2 the location id.
    """
    logger.debug("Saving to FS for {}/{}".format(gid, account_location))
    parts = account_location.split('/')
    LocalPosts(
        date=datetime.utcnow(),
        gid=gid,
        account_id=parts[0],
        location_id=parts[2],
        uid=uid,
        post=p,
        status='pending',
    ).save()
def update_post_fs(p, account_location, gid, uid):
    """Record post payload *p* as a new pending LocalPosts entry.

    Fix: the original body was a byte-for-byte duplicate of save_post_fs
    (it created a fresh document rather than updating one, despite the
    name), so it now delegates to save_post_fs. The only observable
    difference is save_post_fs's extra debug log line.
    NOTE(review): if a true in-place update was intended, this function
    never implemented it — confirm with the author.
    """
    save_post_fs(p, account_location, gid, uid)
def push_google_post(account_location, gid, uid, _post, bulk_post=False, locations=None, doc=None):
    """Publish *_post* to the Google My Business localPosts endpoint and
    record the outcome in a LocalPosts document.

    account_location: "accounts/<account_id>/locations/<location_id>" path.
    gid / uid: group and user ids used to obtain the OAuth token.
    _post: post payload dict, JSON-encoded for the API call.
    bulk_post: when False, locations is overwritten with just this location.
    locations: optional list of target-location dicts (bulk posts).
    doc: optional existing Firestore doc reference; when given, the _post
        dict is updated in place instead of creating a new record.
    Errors are swallowed: a 'failed' LocalPosts record is saved instead.

    Fixes: `locations=[]` was a shared mutable default; the except branch
    referenced `account_id`, `location_id` and `response` which are unbound
    (NameError) when the failure happens before they are assigned, e.g. if
    obtain_access_token raises.
    """
    locations = [] if locations is None else locations
    # Pre-bind names used by the error path so an early failure cannot
    # raise NameError inside the handler.
    response = None
    account_id = None
    location_id = None
    try:
        logger.debug(gid)
        logger.debug(uid)
        token = google_service.obtain_access_token(account_location, gid, uid)
        _account_location = account_location.replace('accounts/', '')
        split_account_location = _account_location.split('/')
        account_id = split_account_location[0]
        location_id = split_account_location[2]
        headers = {'Authorization': 'Bearer {}'.format(token)}
        url = config.GOOGLE_MY_BUSINESS_URL + '/accounts/{}/localPosts'.format(_account_location)
        logger.debug(
            "Data to be posted %s\n "
            "Headers %s\n"
            "URL: %s\n" % (
                json.dumps(_post), headers, url))
        logger.debug('POSTED data init')
        response = requests.post(url, data=json.dumps(_post), headers=headers)
        response.raise_for_status()
        response = response.json()
        logger.debug(response)
        if not bulk_post:
            # Single-location post: target exactly this account_location.
            locations = [{'name': account_location}]
        if doc is None:
            fs_post = LocalPosts(
                date=datetime.utcnow(),
                gid=gid,
                account_id=account_id,
                location_id=location_id,
                uid=uid,
                post=response,
                status=u'posted',
                bulk_post=bulk_post,
                locations=locations,
            )
            fs_post.save()
        else:
            # NOTE(review): fs_post is fetched but nothing persists the
            # dict update below — looks like a missing save/update call;
            # left unchanged pending confirmation with the author.
            fs_post = LocalPosts().get_by_doc(doc)
            _post.update({
                'status': u'posted',
                'date': datetime.utcnow(),
                'account_id': account_id,
                'location_id': location_id,
                'bulk_post': bulk_post,
                'locations': locations,
                'uid': uid,
                'post': dict(response),
            })
    except StandardError as e:
        fs_post = LocalPosts(
            date=datetime.utcnow(),
            gid=gid,
            account_id=account_id,
            location_id=location_id,
            uid=uid,
            status='failed',
            error={'message': u'{}'.format(str(e))},
            bulk_post=False,
            locations=[{'name': u'{}'.format(account_location)}],
        )
        fs_post.save()
        logger.debug(response)
        logger.warn("ERROR Posting post {}".format(str(e)))
@celery.task
def get_posts_schedule():
    """Celery task: scan Firestore 'localposts' for SCHEDULED documents
    whose schedule time has passed, push each one to Google per target
    location, and write POSTED/ERROR status back onto the document and
    its per-location entries. Always returns True."""
    # NOTE(review): `app` is not among the names imported at the top of
    # this file (L10 pulls celery/firebase/firestore_db out of `app`) —
    # presumably one of the wildcard imports supplies it; confirm,
    # otherwise this line raises NameError at task run time.
    with app.app_context():
        fs = firestore.client()
        doc_ref = fs.collection(u'localposts')
        # data
        docs = doc_ref.where(u'status', u'==', u'SCHEDULED').get()
        for doc in docs:
            id_doc = doc.id
            doc = doc.to_dict()
            schedule = doc.get('schedule', None)
            # arrow<1.0 exposes .timestamp as a property; on arrow>=1.0 it
            # is a method and this comparison would misbehave — confirm the
            # pinned arrow version.
            utc = arrow.utcnow().timestamp
            _schedule = arrow.get(schedule).timestamp
            if _schedule <= utc:
                # Post is due: publish to every target location.
                gid = doc.get('gid', None)
                uid = doc.get('uid', None)
                post = doc.get('post', None)
                processed_locations = []
                locations = doc.get('locations', [])
                for loc in locations:
                    account_id = loc.get('account_id')
                    location_id = loc.get('location_id')
                    name = 'accounts/{}/locations/{}'.format(account_id, location_id)
                    response = push_posts_to_google(name, post, gid, uid)
                    print(response)
                    _loc = loc
                    if loc.get('status', None) != 'POSTED':
                        # NOTE(review): the doc-level status is rewritten on
                        # every location iteration, so the last location's
                        # outcome wins for the whole document.
                        if response.get('error', None) is not None:
                            doc_item = doc_ref.document(id_doc)
                            doc_item.update({'status': u'ERROR', 'error': response.get('error', None)})
                            _loc.update(status=u'ERROR')
                        else:
                            doc_item = doc_ref.document(id_doc)
                            doc_item.update({'status': u'POSTED', 'post': response})
                            _loc.update(status=u'POSTED')
                    processed_locations.append(_loc)
                print(processed_locations)
                # Persist the per-location statuses accumulated above.
                doc_item = doc_ref.document(id_doc)
                doc_item.update({'locations': processed_locations})
    return True
def push_posts_to_google(account_location, post, gid, uid):
    """POST *post* to the Google My Business localPosts endpoint for
    *account_location*.

    Returns the parsed API response dict on success, or
    {'error': {'message': ..., 'error': <detail>}} on failure.

    Fixes: the original handler called response.json() on the initial
    `''` placeholder (AttributeError masking the real failure whenever
    the error occurred before/at the HTTP call), and
    pprint(traceback.print_exc()) just printed None; the exception is
    now logged via the module logger instead.
    """
    response = None
    try:
        token = google_service.obtain_access_token(account_location, gid, uid)
        _account_location = account_location.replace('accounts/', '')
        headers = {'Authorization': 'Bearer {}'.format(token)}
        url = '{}/accounts/{}/localPosts'.format(config.GOOGLE_MY_BUSINESS_URL, _account_location)
        logger.debug(
            "Data to be posted %s\n "
            "Headers %s\n"
            "URL: %s\n" % (
                json.dumps(post), headers, url))
        logger.debug('POSTED data init')
        response = requests.post(url, data=json.dumps(post), headers=headers)
        response.raise_for_status()
        return response.json()
    except StandardError as e:
        logger.exception(e)
        # The HTTP response may be absent (token/connection failure) or
        # not JSON; fall back to the exception text rather than raising
        # a second error inside the handler.
        try:
            detail = response.json() if response is not None else str(e)
        except StandardError:
            detail = str(e)
        return {'error': {'message': 'Error Posting post', 'error': detail}}
def check_exists_posted_fs(_name, account_id, location_id):
    """Return True when some POSTED 'localposts' document for this
    account/location already lists *_name* as a POSTED location entry."""
    client = firestore.client()
    collection = client.collection(u'localposts')
    found = False
    # data
    matches = collection.where(u'status', u'==', u'POSTED').where(
        u'account_id', u'==', u'{}'.format(account_id)).where(
        u'location_id', u'==', u'{}'.format(location_id)).get()
    for snapshot in matches:
        data = snapshot.to_dict()
        print('name checked')
        print(_name)
        for entry in data.get('locations', []):
            posted = entry.get('status', None) == 'POSTED'
            if posted and entry.get('name', None) == _name:
                found = True
                break
    return found
# Queue task
@celery.task
def save_posts_fs_from_google(account_location, gid, uid=None):
    """Celery task: import all Google-authored local posts for one
    location and save each one that is not already recorded as POSTED.

    account_location: "<account_id>/locations/<location_id>" style path
        (split('/') index 0 is the account id, index 2 the location id).
    gid: group id used to obtain the OAuth access token.
    uid: optional user id stored with each imported post.
    Returns the number of newly saved posts, or None if a StandardError
    was caught (logged, not re-raised).
    """
    logger.debug("Retrieving posts for {}/{}".format(gid, account_location))
    # NOTE(review): obtain_access_token is called with (location, gid, uid)
    # elsewhere in this file but only (location, gid) here — confirm the
    # third argument is genuinely optional.
    token = google_service.obtain_access_token(account_location, gid)
    split_account_location = account_location.split('/')
    account_id = split_account_location[0]
    location_id = split_account_location[2]
    try:
        posts = []
        # populate_location_google_posts appends every page of posts into
        # the `posts` list in place.
        populate_location_google_posts(account_location, posts, token)
        new_posts = []
        for item in posts:
            p = dict(item)
            # check name of post if exists in firestore
            _name = p.get('name', None)
            _post = LocalPosts(
                date=datetime.utcnow(),
                gid=gid,
                account_id=account_id,
                location_id=location_id,
                uid=uid,
                post=p,
                status=u'POSTED',
                posted_by=u'GOOGLE',
                is_bulk=False,
                locations=[{u'name': _name, u'status': u'POSTED', u'account_id': u'{}'.format(account_id),
                            u'location_id': u'{}'.format(location_id)}],
            )
            # check if exists post
            if not check_exists_posted_fs(_name, account_id, location_id):
                # if not exists in db save
                _post.save()
                new_posts.append(p)
        response = len(new_posts)
        return response
    except StandardError as e:
        logger.exception(e)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment