Created
May 5, 2023 13:51
-
-
Save mkulke/ea8a5e82cfeb92a351ae3893b2a02612 to your computer and use it in GitHub Desktop.
Client Side Key Encryption with Azure Blob Storage
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os
import uuid
from base64 import b64decode

from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.keywrap import aes_key_wrap, aes_key_unwrap
# Base64-encoded key-encryption key (decodes to 32 bytes) handed to KeyWrapper.
# NOTE(review): hard-coded here for demonstration only -- a real deployment
# should load the key from a secret store instead of committing it to source.
KEK_B64 = 'DGLwgnzlaBYAlxRuuCkthRgYRfUWryR8Cqo79g88Gek='
class KeyWrapper:
    """Wrap and unwrap keys with a locally held key-encryption key (KEK).

    Elsewhere in this file an instance is assigned to a blob client's
    ``key_encryption_key`` attribute, so the four public methods
    (``wrap_key``, ``unwrap_key``, ``get_key_wrap_algorithm``, ``get_kid``)
    keep their names and signatures.
    """

    # Message for the ValueError raised on an unsupported algorithm. Kept on
    # the class so the error path does not depend on a module-level name.
    _ERROR_UNKNOWN_KEY_WRAP_ALGORITHM = 'Unknown key wrap algorithm.'

    def __init__(self, kek_b64):
        """Decode the base64 KEK and set up the crypto backend.

        Args:
            kek_b64: Base64-encoded key-encryption key.
        """
        self.kek = b64decode(kek_b64)
        self.backend = default_backend()
        self.kid = 'local'

    def wrap_key(self, key, algorithm='A256KW'):
        """Return ``key`` wrapped with the KEK.

        Args:
            key: The key material to wrap.
            algorithm: Key-wrap algorithm name; only ``'A256KW'`` is accepted.

        Raises:
            ValueError: If ``algorithm`` is not ``'A256KW'``.
        """
        if algorithm == 'A256KW':
            return aes_key_wrap(self.kek, key, self.backend)
        raise ValueError(self._ERROR_UNKNOWN_KEY_WRAP_ALGORITHM)

    def unwrap_key(self, key, algorithm):
        """Return the key recovered by unwrapping ``key`` with the KEK.

        Args:
            key: The wrapped key.
            algorithm: Key-wrap algorithm name; only ``'A256KW'`` is accepted.

        Raises:
            ValueError: If ``algorithm`` is not ``'A256KW'``.
        """
        if algorithm == 'A256KW':
            return aes_key_unwrap(self.kek, key, self.backend)
        raise ValueError(self._ERROR_UNKNOWN_KEY_WRAP_ALGORITHM)

    def get_key_wrap_algorithm(self):
        """Return the name of the key-wrap algorithm this wrapper uses."""
        return 'A256KW'

    def get_kid(self):
        """Return the identifier of this key-encryption key."""
        return self.kid
# Round-trip demo: upload a local file with client-side encryption enabled on
# the blob client, then download it again through the same client into a
# second local file.
try:
    account_url = "https://mycontainer.blob.core.windows.net"
    default_credential = DefaultAzureCredential()

    # Create the client
    blob_service_client = BlobServiceClient(account_url, credential=default_credential)
    container_name = "my-container"

    # Create a key wrapper
    kek = KeyWrapper(KEK_B64)

    file_name = "people.csv"

    # Create a blob client using the local file name as the name for the blob
    blob_client = blob_service_client.get_blob_client(container=container_name, blob=file_name)
    # Client-side encryption settings: require it, supply the KEK wrapper,
    # and select encryption version 2.0.
    blob_client.require_encryption = True
    blob_client.key_encryption_key = kek
    blob_client.encryption_version = '2.0'

    print("\nUploading to Azure Storage as blob:\n\t" + file_name)
    # The local file shares its name with the blob, so open `file_name`
    # (no separate local path variable is defined anywhere in this script).
    with open(file=file_name, mode="rb") as data:
        blob_client.upload_blob(data, overwrite=True)
    # The blob should be encrypted on the container

    file_name = "people_roundtrip.csv"
    print("\nDownloading blob from Azure Storage to file:\n\t" + file_name)
    with open(file=file_name, mode="wb") as data:
        download_stream = blob_client.download_blob()
        data.write(download_stream.readall())
except Exception as ex:
    # Top-level boundary of the demo script: report the failure and exit.
    print('Exception:')
    print(ex)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment