Created
May 17, 2019 20:44
-
-
Save ericsysmin/7c368d9d38367c10426a4d43c1e19ebf to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/pyhton | |
# Copyright (c) 2019 Avi Networks | |
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) | |
from __future__ import (absolute_import, division, print_function) | |
__metaclass__ = type | |
from ansible.errors import AnsibleError | |
from ansible.module_utils.gcp_utils import GcpSession | |
import json | |
class GcpMockModule(object): | |
def __init__(self, params): | |
self.params = params | |
def fail_json(self, *args, **kwargs): | |
raise AnsibleError(kwargs['msg']) | |
def raise_for_status(self, response): | |
try: | |
response.raise_for_status() | |
except getattr(requests.exceptions, 'RequestException'): | |
self.fail_json(msg="GCP returned error: %s" % response.json()) | |
class GcpKmsFilter(): | |
def run(self, method, **kwargs): | |
params = { | |
'ciphertext': kwargs.get('ciphertext', None), | |
'plaintext': kwargs.get('plaintext', None), | |
'additional_authenticated_data': kwargs.get('additional_authenticated_data', None), | |
'key_ring': kwargs.get('key_ring', None), | |
'crypto_key': kwargs.get('crypto_key', None), | |
'projects': kwargs.get('projects', None), | |
'scopes': kwargs.get('scopes', None), | |
'locations': kwargs.get('locations', 'global'), | |
'auth_kind': kwargs.get('auth_kind', None), | |
'service_account_file': kwargs.get('service_account_file', None), | |
'service_account_email': kwargs.get('service_account_email', None), | |
} | |
if not params['scopes']: | |
params['scopes'] = ['https://www.googleapis.com/auth/cloudkms'] | |
fake_module = GcpMockModule(params) | |
if method == "encrypt": | |
return self.kms_encrypt(fake_module) | |
elif method == "decrypt": | |
return self.kms_decrypt(fake_module) | |
def kms_decrypt(self, module): | |
payload = {"ciphertext": module.params['ciphertext']} | |
if module.params['additional_authenticated_data']: | |
payload['additionalAuthenticatedData'] = module.params['additional_authenticated_data'] | |
auth = GcpSession(module, 'cloudkms') | |
url = "https://cloudkms.googleapis.com/v1/projects/{projects}/locations/{locations}/" \ | |
"keyRings/{key_ring}/cryptoKeys/{crypto_key}:decrypt".format(**module.params) | |
response = auth.post(url, body=payload) | |
return response.json()['plaintext'] | |
def kms_encrypt(self, module): | |
payload = {"plaintext": module.params['plaintext']} | |
if module.params['additional_authenticated_data']: | |
payload['additionalAuthenticatedData'] = module.params['additional_authenticated_data'] | |
auth = GcpSession(module, 'cloudkms') | |
url = "https://cloudkms.googleapis.com/v1/projects/{projects}/locations/{locations}/" \ | |
"keyRings/{key_ring}/cryptoKeys/{crypto_key}:encrypt".format(**module.params) | |
response = auth.post(url, body=payload) | |
return response.json()['ciphertext'] | |
def gcp_kms_encrypt(plaintext, **kwargs): | |
return GcpKmsFilter().run('encrypt', plaintext=plaintext, **kwargs) | |
def gcp_kms_decrypt(ciphertext, **kwargs): | |
return GcpKmsFilter().run('decrypt', ciphertext=ciphertext, **kwargs) | |
class FilterModule(object): | |
def filters(self): | |
return { | |
'gcp_kms_encrypt': gcp_kms_encrypt, | |
'gcp_kms_decrypt': gcp_kms_decrypt | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment