Skip to content

Instantly share code, notes, and snippets.

@Marklb
Created August 13, 2018 22:16
Show Gist options
  • Save Marklb/e09e4699024c2001968f4b2c00a0462c to your computer and use it in GitHub Desktop.
Save Marklb/e09e4699024c2001968f4b2c00a0462c to your computer and use it in GitHub Desktop.
#
# NOTE: This code is not intended for direct use. It only provides some simplified functions
# that helped with a demo app, which is being moved to better code.
#
###################################
# TEMPORARILY disabling urllib3 warnings
import urllib3
urllib3.disable_warnings()
###################################
import datetime
import sys
from sandbox_resources import SandboxNetworkyStuff
from nucypher.characters import Alice, Bob, Ursula
from nucypher.data_sources import DataSource
from nucypher.network.node import NetworkyStuff
from nucypher.crypto.powers import SigningPower, KeyPairBasedPower, DerivedKeyBasedPower, EncryptingPower, DelegatingPower
from nucypher.crypto.kits import UmbralMessageKit
from nucypher.keystore.keypairs import Keypair, SigningKeypair, EncryptingKeypair
from nucypher.crypto.api import keccak_digest
from nucypher.network.server import ProxyRESTServer
from nucypher.keystore.threading import ThreadedSession
from nucypher.keystore.db.models import Key, PolicyArrangement, Workorder
import maya
from umbral import pre, keys, config, fragments
from umbral.keys import UmbralPrivateKey, UmbralPublicKey, UmbralKeyingMaterial
import base64
class Api(object):
    """
    Simplified api for using nucypher.

    Some of the methods are not named correctly if thinking about nucypher,
    instead they are simplified to terms that relate to the terms currently in
    the AgriBlockchainApp.

    NOTE: Only meant to assist with providing a rest api for AgriBlockchainApp
    development.

    Every key argument accepted by the methods below may be given either as
    the corresponding Umbral key object or as urlsafe-base64 encoded bytes;
    encoded values are decoded with ``from_bytes`` before use.
    """

    VERSION = '2.0'

    # Address of the Ursula node this api talks to. These class-level values
    # are the defaults; `__init__` may override them per instance.
    ursula_host = 'localhost'
    ursula_port = 3601

    # Name passed to ProxyRESTServer / start_datastore by `revoke_policy`.
    DATASTORE_NAME = 'non-mining-proxy-node'

    def __init__(self, ursula_host=None, ursula_port=None):
        """
        Connect to the Ursula node and build the network middleware.

        :param ursula_host: Host of the Ursula node (defaults to 'localhost')
        :param ursula_port: REST port of the Ursula node (defaults to 3601)
        """
        if ursula_host is not None:
            self.ursula_host = ursula_host
        if ursula_port is not None:
            self.ursula_port = ursula_port

        # This is already running in another process.
        self.ursula = Ursula.from_rest_url(NetworkyStuff(),
                                           address=self.ursula_host,
                                           port=self.ursula_port)
        self.network_middleware = SandboxNetworkyStuff([self.ursula])

    @staticmethod
    def _as_key(value, key_cls):
        """
        Return `value` as an instance of `key_cls`.

        :param value: Either a `key_cls` instance or urlsafe-base64 encoded
            key bytes
        :param key_cls: Key class providing `from_bytes(data, decoder=...)`
        :return: `value` itself if it already is a `key_cls`, otherwise the
            result of `key_cls.from_bytes` with a urlsafe-base64 decoder
        """
        if isinstance(value, key_cls):
            return value
        return key_cls.from_bytes(value, decoder=base64.urlsafe_b64decode)

    def gen_keypair(self):
        """
        Generate a keypair using Umbral

        :return: private_key, public_key
        """
        private_key = keys.UmbralPrivateKey.gen_key()
        public_key = private_key.get_pubkey()
        return private_key, public_key

    def create_policy(self, label, alice_privkey, bob_pubkey,
                      policy_expiration, m, n):
        """
        Create a Policy with Alice granting Bob access to `label` DataSource

        :param label: A label to represent the policies data
        :param alice_privkey: Alice's private key
        :param bob_pubkey: Bob's public key
        :param policy_expiration: Expiration of the policy, passed to
            `Alice.grant` as `expiration`
        :param m: Minimum number of KFrags needed to rebuild ciphertext
        :param n: Total number of rekey shares to generate
        :return: The policy granted to Bob
        """
        alice_privkey = self._as_key(alice_privkey, UmbralPrivateKey)
        bob_pubkey = self._as_key(bob_pubkey, UmbralPublicKey)

        # This is not how this should be implemented, but I am still figuring
        # out about the keying material and why it is randomly generated when
        # a character is initialized, instead of being derived from the keys
        # like the other powers or explained how it should be stored.
        # Here it is derived from Alice's private and public key bytes.
        delegating_power = DelegatingPower()
        delegating_power.umbral_keying_material = UmbralKeyingMaterial.from_bytes(
            alice_privkey.to_bytes() + alice_privkey.get_pubkey().to_bytes())

        # Initialize Alice
        alice = Alice(crypto_power_ups=[
            SigningPower(keypair=SigningKeypair(alice_privkey)),
            EncryptingPower(keypair=EncryptingKeypair(alice_privkey)),
            delegating_power,
        ], network_middleware=self.network_middleware)

        # Alice gets on the network and, knowing about at least one Ursula,
        # is able to discover all Ursulas.
        alice.network_bootstrap([(self.ursula_host, self.ursula_port)])

        # NOTE: the keying material is built from Alice's private key, so it
        # is deliberately not printed or logged here.

        # Initialize Bob
        bob = Bob(crypto_power_ups=[
            SigningPower(pubkey=bob_pubkey),
            EncryptingPower(pubkey=bob_pubkey),
        ])

        # Alice grants a policy for Bob
        return alice.grant(bob, label, m=m, n=n, expiration=policy_expiration)

    def revoke_policy(self, label, alice_privkey, bob_pubkey):
        """
        TODO: Figure out the correct way to revoke instead of using a custom
        implementation

        Revoke a Policy that Alice granted Bob to access `label` DataSource

        :param label: A label to represent the policies data. It is
            concatenated with the key bytes, so it has to be bytes.
        :param alice_privkey: Alice's private key
        :param bob_pubkey: Bob's public key
        """
        alice_privkey = self._as_key(alice_privkey, UmbralPrivateKey)
        bob_pubkey = self._as_key(bob_pubkey, UmbralPublicKey)
        alice_pubkey = alice_privkey.get_pubkey()

        # Identifier of the arrangement to delete: digest over Alice's public
        # key, Bob's public key and the label.
        hrac = keccak_digest(bytes(alice_pubkey) + bytes(bob_pubkey) + label)

        # Open the datastore directly and delete the arrangement from it.
        # The port is the same value used to reach the Ursula node.
        test_server = ProxyRESTServer(self.ursula_port, self.DATASTORE_NAME)
        test_server.start_datastore(self.DATASTORE_NAME)
        with ThreadedSession(test_server.db_engine) as session:
            test_server.datastore.del_policy_arrangement(
                hrac=hrac.hex().encode(),
                session=session
            )

    def encrypt_for_policy(self, policy_pubkey, plaintext):
        """
        Encrypt data for a Policy

        :param policy_pubkey: Policy public key
        :param plaintext: Plaintext bytes to encrypt
        :return: data_source, message_kit, _signature
        """
        policy_pubkey = self._as_key(policy_pubkey, UmbralPublicKey)

        # First we make a DataSource for this policy
        data_source = DataSource(policy_pubkey_enc=policy_pubkey)

        # Generate a MessageKit for the policy
        message_kit, _signature = data_source.encapsulate_single_message(plaintext)
        return data_source, message_kit, _signature

    def decrypt_for_policy(self, label, message_kit, alice_pubkey, bob_privkey,
                           policy_pubkey, data_source_pubkey):
        """
        Decrypt data for a Policy

        :param label: A label to represent the policies data
        :param message_kit: UmbralMessageKit, or its serialized bytes
        :param alice_pubkey: Alice's public key
        :param bob_privkey: Bob's private key
        :param policy_pubkey: Policy's public key
        :param data_source_pubkey: DataSource's public key
        :return: The decrypted cleartext
        """
        alice_pubkey = self._as_key(alice_pubkey, UmbralPublicKey)
        bob_privkey = self._as_key(bob_privkey, UmbralPrivateKey)
        policy_pubkey = self._as_key(policy_pubkey, UmbralPublicKey)
        data_source_pubkey = self._as_key(data_source_pubkey, UmbralPublicKey)

        # The message kit is deserialized as-is (no base64 decoder).
        if not isinstance(message_kit, UmbralMessageKit):
            message_kit = UmbralMessageKit.from_bytes(message_kit)

        # Initialize Bob
        bob = Bob(crypto_power_ups=[
            SigningPower(keypair=SigningKeypair(bob_privkey)),
            EncryptingPower(keypair=EncryptingKeypair(bob_privkey)),
        ])

        # Bob joins the policy so that he can receive data shared on it
        bob.join_policy(
            label,  # The label - he needs to know what data he's after.
            bytes(alice_pubkey),  # To verify the signature, he'll need Alice's public key.
            verify_sig=True,  # And yes, he usually wants to verify that signature.
            # He can also bootstrap himself onto the network more quickly
            # by providing a list of known nodes at this time.
            node_list=[(self.ursula_host, self.ursula_port)]
        )

        # Bob needs to reconstruct the DataSource.
        datasource_as_understood_by_bob = DataSource.from_public_keys(
            policy_public_key=policy_pubkey,
            datasource_public_key=bytes(data_source_pubkey),
            label=label
        )

        # NOTE: Not sure if I am doing something wrong or if this is missing
        # from the serialized bytes
        message_kit.policy_pubkey = policy_pubkey

        # Now Bob can retrieve the original message. He just needs the
        # MessageKit and the DataSource which produced it.
        cleartext = bob.retrieve(message_kit=message_kit,
                                 data_source=datasource_as_understood_by_bob,
                                 alice_pubkey_sig=alice_pubkey)
        return cleartext
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment