Skip to content

Instantly share code, notes, and snippets.

@kokospapa8
Last active January 28, 2021 18:15
Show Gist options
  • Select an option

  • Save kokospapa8/ac05247c5f99593b5e7784503447fd4c to your computer and use it in GitHub Desktop.

Select an option

Save kokospapa8/ac05247c5f99593b5e7784503447fd4c to your computer and use it in GitHub Desktop.
CloudWatchToBigQuery Lambda Code
import os
import json
import zlib
from base64 import b64decode
from google.cloud import bigquery
from google.oauth2 import service_account
TABLE_NAME = "<project_id>.<dataset>.<table-name>"
def transform(json_payload):
rows_to_insert = []
for row in json_payload['logEvents']:
item = {}
item['id'] = row['id']
item['timestamp'] = row['timestamp'] / 1000
if 'extractedFields' in row:
for k, v in row['extractedFields'].items():
item[k] = v
rows_to_insert.append(item)
return rows_to_insert
def _get_client_key():
project_id = os.environ['project_id']
private_key_id = os.environ['private_key_id']
private_key = os.environ['private_key'].replace("\\n", "\n")
client_email = os.environ['client_email']
client_id = os.environ['client_id']
client_key = {
"type": "service_account",
"project_id": project_id,
"private_key_id": private_key_id,
"private_key": private_key,
"client_email": client_email,
"client_id": client_id,
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/lambda-bigquery-stream%40hangfive-26bb4.iam.gserviceaccount.com"
}
return client_key
def lambda_handler(event, context):
data = event['awslogs']['data']
compressed_payload = b64decode(data)
cloudwatch_payload = zlib.decompress(compressed_payload, 16 + zlib.MAX_WBITS)
json_payload = json.loads(cloudwatch_payload)
client_key = _get_client_key()
credentials = service_account.Credentials.from_service_account_info(
client_key
)
client = bigquery.Client(credentials=credentials, project=credentials.project_id,)
# SKIP control message
if json_payload['messageType'] == 'CONTROL_MESSAGE':
return
rows_to_insert = transform(json_payload)
# print(rows_to_insert)
# BQ insert
errors = client.insert_rows_json(TABLE_NAME, rows_to_insert) # Make an API request.
if errors == []:
print("New rows have been added.")
else:
print("Encountered errors while inserting rows: {}".format(errors))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment