Skip to content

Instantly share code, notes, and snippets.

@julik
Created February 13, 2025 01:04
Show Gist options
  • Save julik/9938531f1bd2a21de4830203846d54b6 to your computer and use it in GitHub Desktop.
Save julik/9938531f1bd2a21de4830203846d54b6 to your computer and use it in GitHub Desktop.
AES encryption with random access to plaintext (playground)
require "openssl"
require "stringio"
# Baseline "scheme" that performs no encryption at all: bytes are copied
# verbatim, so a plaintext offset is also the ciphertext offset.
class PassthruScheme
  # Number of bytes moved per read/write call (64 KiB).
  READ_OR_WRITE_CHUNK_SIZE = 1 << 16

  # Copies everything from the current position of +from_plaintext_io+ up to
  # EOF into +into_ciphertext_io+, unchanged.
  def streaming_encrypt(from_plaintext_io, into_ciphertext_io)
    copy_in_chunks(from_plaintext_io, into_ciphertext_io)
  end

  # Copies everything from the current position of +from_ciphertext_io+ up to
  # EOF into +into_plaintext_io+, unchanged.
  def streaming_decrypt(from_ciphertext_io, into_plaintext_io)
    copy_in_chunks(from_ciphertext_io, into_plaintext_io)
  end

  # Returns the bytes covered by +range+ (plaintext byte offsets).
  #
  # The IO must be positioned at the first ciphertext byte (i.e. just past any
  # header); offsets in +range+ are applied relative to that position.
  # Whatever IO#read returns is passed through, so the result is shorter when
  # the range runs past EOF and nil when it starts at EOF.
  def decrypt_range(from_ciphertext_io, range)
    from_ciphertext_io.seek(from_ciphertext_io.pos + range.begin)
    # Range#size honours exclusive ranges (a...b); `end - begin + 1` would
    # return one byte too many for them.
    from_ciphertext_io.read(range.size)
  end

  private

  # Moves data from +source_io+ to +sink_io+ chunk by chunk until EOF.
  def copy_in_chunks(source_io, sink_io)
    while (chunk = source_io.read(READ_OR_WRITE_CHUNK_SIZE))
      sink_io.write(chunk)
    end
  end
end
# AES-256-CFB scheme with random access to the plaintext.
#
# In CFB mode plaintext block N is E(ciphertext block N-1) XOR ciphertext
# block N (with the IV standing in for "block -1"). A range can therefore be
# decrypted by reading only the ciphertext block preceding it plus the
# ciphertext covering the range itself.
#
# NOTE: +encryption_key+ is stored but not used for encryption - the key and
# IV are generated randomly for every instance.
class EncryptionSchemeCFB
  READ_OR_WRITE_CHUNK_SIZE = 1 << 16 # 64 KiB per read/write call
  BLOCK_SIZE = 16 # AES block size in bytes

  def initialize(encryption_key)
    @encryption_key = encryption_key
    cipher = create_cipher
    # The OpenSSL::Cipher docs require choosing encrypt/decrypt before
    # generating or assigning a key/IV.
    cipher.encrypt
    @iv = cipher.random_iv
    @key = cipher.random_key
  end

  # Encrypts everything from the current position of +from_plaintext_io+ up
  # to EOF and writes the ciphertext into +into_destination_io+.
  def streaming_encrypt(from_plaintext_io, into_destination_io)
    cipher = create_cipher
    cipher.encrypt
    cipher.key = @key
    cipher.iv = @iv
    pump_through(cipher, from_plaintext_io, into_destination_io)
  end

  # Decrypts everything from the current position of +from_ciphertext_io+ up
  # to EOF and writes the plaintext into +into_plaintext_io+.
  def streaming_decrypt(from_ciphertext_io, into_plaintext_io)
    cipher = create_cipher
    cipher.decrypt
    cipher.key = @key
    cipher.iv = @iv
    pump_through(cipher, from_ciphertext_io, into_plaintext_io)
  end

  # Returns the plaintext bytes covered by +range+ (plaintext byte offsets),
  # with the same semantics as String#[] on the full plaintext: truncated
  # when the range runs past the end, "" when it starts exactly at the end
  # and nil when it starts beyond it.
  #
  # +encrypted_io+ must be positioned at the first ciphertext byte (just past
  # any header). Plain non-negative, non-empty Integer ranges only read the
  # ciphertext they need; any other range (negative offsets, endless, empty)
  # falls back to decrypting the whole stream.
  def decrypt_range(encrypted_io, range)
    first = range.begin
    last = range.end
    random_access = first.is_a?(Integer) && last.is_a?(Integer) &&
      first >= 0 && range.size.positive?
    return decrypt_everything(encrypted_io)[range] unless random_access

    n_bytes = range.size
    n_blocks_to_skip, offset_into_first_block = first.divmod(BLOCK_SIZE)
    ciphertext_starts_at = encrypted_io.pos

    cipher = create_cipher
    cipher.decrypt
    cipher.key = @key
    if n_blocks_to_skip.zero?
      cipher.iv = @iv
    else
      # The ciphertext block right before the first one we decrypt acts as IV.
      encrypted_io.seek(ciphertext_starts_at + ((n_blocks_to_skip - 1) * BLOCK_SIZE))
      preceding_block = encrypted_io.read(BLOCK_SIZE)
      # A missing or short preceding block means the range starts past the data.
      return nil unless preceding_block && preceding_block.bytesize == BLOCK_SIZE

      cipher.iv = preceding_block
    end

    encrypted_io.seek(ciphertext_starts_at + (n_blocks_to_skip * BLOCK_SIZE))
    # CFB needs no padding, so a trailing partial block is fine.
    ciphertext = encrypted_io.read(offset_into_first_block + n_bytes)
    # Cipher#update must not be called with an empty string.
    plaintext = ciphertext.nil? || ciphertext.empty? ? "".b : cipher.update(ciphertext) + cipher.final
    plaintext.byteslice(offset_into_first_block, n_bytes)
  end

  def create_cipher
    OpenSSL::Cipher.new("aes-256-cfb")
  end

  private

  # Feeds +source_io+ through +cipher+ chunk by chunk, writing to +sink_io+.
  def pump_through(cipher, source_io, sink_io)
    while (chunk = source_io.read(READ_OR_WRITE_CHUNK_SIZE))
      sink_io.write(cipher.update(chunk))
    end
    sink_io.write(cipher.final)
  end

  # Decrypts from the current position up to EOF and returns the plaintext
  # as one binary String.
  def decrypt_everything(encrypted_io)
    buf = StringIO.new.binmode
    streaming_decrypt(encrypted_io, buf)
    buf.string
  end
end
# AES-256-CBC scheme with block-level random access to the plaintext.
#
# CBC decrypts block N using ciphertext block N-1 as the IV, so any block can
# be decrypted after reading just its predecessor.
#
# NOTE: +encryption_key+ is stored but not used for encryption - the key and
# IV are generated randomly for every instance.
class EncryptionSchemeCBC
  READ_OR_WRITE_CHUNK_SIZE = 1 << 16 # 64 KiB per read/write call
  BLOCK_SIZE = 16 # AES block size in bytes
  CIPHER_NAME = "aes-256-cbc"

  def initialize(encryption_key)
    @encryption_key = encryption_key
    cipher = OpenSSL::Cipher.new(CIPHER_NAME)
    cipher.encrypt
    @iv = cipher.random_iv
    @key = cipher.random_key
  end

  # Decrypts everything from the current position of +from_ciphertext_io+ up
  # to EOF and writes the plaintext into +into_plaintext_io+.
  def streaming_decrypt(from_ciphertext_io, into_plaintext_io)
    cipher = OpenSSL::Cipher.new(CIPHER_NAME)
    cipher.decrypt
    cipher.key = @key
    cipher.iv = @iv
    pump_through(cipher, from_ciphertext_io, into_plaintext_io)
  end

  # Encrypts everything from the current position of +from_plaintext_io+ up
  # to EOF and writes the (padded) ciphertext into +into_destination_io+.
  def streaming_encrypt(from_plaintext_io, into_destination_io)
    cipher = OpenSSL::Cipher.new(CIPHER_NAME)
    cipher.encrypt
    cipher.key = @key
    cipher.iv = @iv
    pump_through(cipher, from_plaintext_io, into_destination_io)
  end

  # Returns the plaintext bytes covered by +range+ (non-negative Integer
  # plaintext byte offsets; exclusive ranges are honoured).
  #
  # If the IO also contains some kind of header, the pointer must already be
  # at the position where the ciphertext begins, and the ciphertext is
  # assumed to run up to EOF. Only the blocks overlapping the range (plus the
  # preceding block, used as IV) are read.
  #
  # The result is truncated when the range runs past the end of the
  # plaintext, and is nil (or empty) when the range starts at or beyond it.
  # OpenSSL::Cipher::CipherError is re-raised after logging the range.
  def decrypt_range(from_encrypted_io, range)
    n_bytes_to_read = range.size
    return "".b if n_bytes_to_read.zero?

    n_blocks_to_skip, offset_into_first_block = range.begin.divmod(BLOCK_SIZE)
    # Integer ceil division - no Float round trip needed.
    n_blocks_to_read = (offset_into_first_block + n_bytes_to_read + BLOCK_SIZE - 1) / BLOCK_SIZE
    ciphertext_starts_at = from_encrypted_io.pos

    cipher = OpenSSL::Cipher.new(CIPHER_NAME)
    cipher.decrypt
    cipher.key = @key
    if n_blocks_to_skip.zero?
      # Block 0 is chained to the stream IV.
      cipher.iv = @iv
    else
      # Any later block is chained to the ciphertext block preceding it.
      from_encrypted_io.seek(ciphertext_starts_at + ((n_blocks_to_skip - 1) * BLOCK_SIZE))
      preceding_block = from_encrypted_io.read(BLOCK_SIZE)
      # A missing or short preceding block means the range starts past the data.
      return nil unless preceding_block && preceding_block.bytesize == BLOCK_SIZE

      cipher.iv = preceding_block
    end

    from_encrypted_io.seek(ciphertext_starts_at + (n_blocks_to_skip * BLOCK_SIZE))
    ciphertext = from_encrypted_io.read(n_blocks_to_read * BLOCK_SIZE)
    return nil if ciphertext.nil? || ciphertext.empty?

    # Only the very last ciphertext block carries PKCS#7 padding. When our
    # slice stops earlier, `final` would try to validate padding on an
    # ordinary block and fail with "bad decrypt" - so padding handling is
    # switched off unless nothing follows the slice. This has to be decided
    # before `update`, which buffers the last block while padding is enabled.
    lookahead = from_encrypted_io.read(1)
    slice_reaches_end = lookahead.nil? || lookahead.empty?
    cipher.padding = 0 unless slice_reaches_end

    plaintext = cipher.update(ciphertext) + cipher.final
    plaintext.byteslice(offset_into_first_block, n_bytes_to_read)
  rescue OpenSSL::Cipher::CipherError
    warn range.inspect
    raise
  end

  private

  # Feeds +source_io+ through +cipher+ chunk by chunk, writing to +sink_io+.
  def pump_through(cipher, source_io, sink_io)
    while (chunk = source_io.read(READ_OR_WRITE_CHUNK_SIZE))
      sink_io.write(cipher.update(chunk))
    end
    sink_io.write(cipher.final)
  end
end
# AES-256-CTR scheme with random access to the plaintext.
#
# CTR turns AES into a stream cipher: block N is XORed with the encryption of
# "initial counter block + N". Seeking therefore only requires computing the
# counter block for the first AES block that overlaps the requested range.
#
# NOTE: +encryption_key+ is stored but not used for encryption - the key and
# IV are generated randomly for every instance.
class EncryptionSchemeCTR
  READ_BUF_SIZE = 65 * 1024 # bytes per read call
  BLOCK_SIZE = 16 # AES always uses 16 byte blocks, both AES-128 and 256
  NONCE_LENGTH_BYTES = 4
  IV_LENGTH_BYTES = 8
  COUNTER_BLOCK_MASK = (1 << 128) - 1

  def initialize(encryption_key)
    @encryption_key = encryption_key
    cipher = create_cipher
    # The OpenSSL::Cipher docs require choosing encrypt/decrypt before
    # generating or assigning a key/IV.
    cipher.encrypt
    @iv = cipher.random_iv
    @key = cipher.random_key
  end

  # Encrypts everything from the current position of +from_plaintext_io+ up
  # to EOF and writes the ciphertext into +into_destination_io+.
  def streaming_encrypt(from_plaintext_io, into_destination_io)
    cipher = create_cipher
    cipher.encrypt
    cipher.key = @key
    cipher.iv = ctr_iv(@iv, 0)
    pump_through(cipher, from_plaintext_io, into_destination_io)
  end

  # Decrypts everything from the current position of +from_ciphertext_io+ up
  # to EOF and writes the plaintext into +into_destination_io+.
  def streaming_decrypt(from_ciphertext_io, into_destination_io)
    cipher = create_cipher
    # CTR encryption and decryption are the same XOR, but say what we mean.
    cipher.decrypt
    cipher.key = @key
    cipher.iv = ctr_iv(@iv, 0)
    pump_through(cipher, from_ciphertext_io, into_destination_io)
  end

  # Returns the plaintext bytes covered by +range+ (non-negative Integer
  # plaintext byte offsets; exclusive ranges are honoured).
  #
  # If the IO also contains some kind of header, the pointer must already be
  # at the position where the ciphertext begins. Only the bytes from the
  # start of the first overlapping block up to the end of the range are read.
  #
  # The result is truncated when the range runs past the end of the data and
  # is empty or nil when the range starts at or beyond it.
  def decrypt_range(from_encrypted_io, range)
    n_bytes_to_read = range.size
    return "".b if n_bytes_to_read.zero?

    n_blocks_to_skip, offset_into_first_block = range.begin.divmod(BLOCK_SIZE)
    ciphertext_starts_at = from_encrypted_io.pos

    cipher = create_cipher
    cipher.decrypt
    cipher.key = @key
    cipher.iv = ctr_iv(@iv, n_blocks_to_skip) # Counter block of the first block we read
    from_encrypted_io.seek(ciphertext_starts_at + (n_blocks_to_skip * BLOCK_SIZE))

    # Read in larger chunks since it is more efficient than reading by block -
    # the cipher maintains the counter state between updates.
    plaintext = String.new
    remaining = offset_into_first_block + n_bytes_to_read
    while remaining.positive?
      chunk = from_encrypted_io.read([remaining, READ_BUF_SIZE].min)
      break if chunk.nil? || chunk.empty? # EOF - the range runs past the data

      plaintext << cipher.update(chunk)
      remaining -= chunk.bytesize
    end
    plaintext << cipher.final
    # Discard the bytes in front of the requested offset.
    plaintext.byteslice(offset_into_first_block, n_bytes_to_read)
  end

  # Builds the 16-byte counter block for AES block number +for_block_n+.
  #
  # The layout follows RFC 3686 section 4
  # (https://datatracker.ietf.org/doc/html/rfc3686#section-4): the first
  # 12 bytes of +iv_initial+ serve as nonce (4 bytes) + IV (8 bytes), and the
  # last 4 bytes are a big-endian block counter starting at 0.
  #
  # OpenSSL advances the whole 16 bytes as one big-endian integer, so once
  # the low 32 bits overflow (after 2**32 blocks, i.e. 64 GiB) the carry moves
  # into the IV part instead of wrapping back to 0. The block number is added
  # with the same 128-bit arithmetic so that random access agrees with what
  # the streaming methods produce, see
  # https://stackoverflow.com/questions/66790768/aes256-ctr-mode-behavior-on-counter-overflow-rollover
  # https://crypto.stackexchange.com/a/71210
  # https://crypto.stackexchange.com/a/71196
  def ctr_iv(iv_initial, for_block_n)
    prefix = iv_initial.byteslice(0, NONCE_LENGTH_BYTES + IV_LENGTH_BYTES)
    upper_64_bits, next_32_bits = prefix.unpack("Q>N")
    block_zero = ((upper_64_bits << 32) | next_32_bits) << 32
    counter_block = (block_zero + for_block_n) & COUNTER_BLOCK_MASK
    [counter_block >> 64, counter_block & 0xFFFF_FFFF_FFFF_FFFF].pack("Q>Q>")
  end

  def create_cipher
    OpenSSL::Cipher.new("aes-256-ctr")
  end

  private

  # Feeds +source_io+ through +cipher+ chunk by chunk, writing to +sink_io+.
  def pump_through(cipher, source_io, sink_io)
    while (chunk = source_io.read(READ_BUF_SIZE))
      sink_io.write(cipher.update(chunk))
    end
    sink_io.write(cipher.final)
  end
end
# AES-256-GCM scheme where ranges are read back through plain CTR.
#
# GCM encrypts the payload with a counter mode keystream, so individual
# blocks can be decrypted with an "aes-256-ctr" cipher that is given the
# right counter block. Doing so skips authentication entirely.
#
# NOTE: this playground never writes the GCM authentication tag to the
# output, so nothing produced here can be authenticated later on. Also,
# +encryption_key+ is stored but not used - the key and IV are generated
# randomly for every instance.
class EncryptionSchemeGCM
  READ_BUF_SIZE = 65 * 1024 # bytes per read call
  TAG_SIZE = 128 / 8
  # Seek granularity used by decrypt_range and ctr_iv. This is 32 bytes, i.e.
  # TWO 16-byte AES blocks, which is why ctr_iv advances the counter by 2 for
  # every unit skipped. (The tag is not interleaved with the ciphertext.)
  BLOCK_SIZE = 16 + TAG_SIZE

  def initialize(encryption_key)
    @encryption_key = encryption_key
    cipher = OpenSSL::Cipher.new("aes-256-gcm")
    # The OpenSSL::Cipher docs require choosing encrypt/decrypt before
    # generating or assigning a key/IV.
    cipher.encrypt
    @iv = cipher.random_iv # Will be 12 bytes
    @key = cipher.random_key
  end

  # Encrypts everything from the current position of +from_plaintext_io+ up
  # to EOF and writes the ciphertext into +into_destination_io+. The
  # authentication tag is not emitted.
  def streaming_encrypt(from_plaintext_io, into_destination_io)
    cipher = OpenSSL::Cipher.new("aes-256-gcm")
    cipher.encrypt
    # Key first, then IV: older releases of the openssl gem dropped a GCM IV
    # that was assigned before the key (CVE-2016-7798).
    cipher.key = @key
    cipher.iv = @iv
    cipher.auth_data = "hello"
    pump_through(cipher, from_plaintext_io, into_destination_io)
  end

  # Recovers the plaintext from the current position of +from_ciphertext_io+
  # up to EOF and writes it into +into_destination_io+.
  #
  # The cipher deliberately runs in *encrypt* mode: the keystream XOR is the
  # same in both directions, and since no tag was stored there is nothing
  # `final` could verify in decrypt mode. The data is NOT authenticated.
  def streaming_decrypt(from_ciphertext_io, into_destination_io)
    cipher = OpenSSL::Cipher.new("aes-256-gcm")
    cipher.encrypt
    cipher.key = @key
    cipher.iv = @iv
    pump_through(cipher, from_ciphertext_io, into_destination_io)
  end

  # Returns the plaintext bytes covered by +range+ (non-negative Integer
  # plaintext byte offsets; exclusive ranges are honoured).
  #
  # If the IO also contains some kind of header, the pointer must already be
  # at the position where the ciphertext begins. Only the bytes from the
  # start of the first overlapping 32-byte unit up to the end of the range
  # are read.
  #
  # The result is truncated when the range runs past the end of the data and
  # is empty or nil when the range starts at or beyond it.
  def decrypt_range(from_encrypted_io, range)
    n_bytes_to_read = range.size
    return "".b if n_bytes_to_read.zero?

    n_blocks_to_skip, offset_into_first_block = range.begin.divmod(BLOCK_SIZE)
    ciphertext_starts_at = from_encrypted_io.pos

    # Use the CTR cipher mode so that we can use counters easily, see
    # https://stackoverflow.com/questions/49228671/aes-gcm-decryption-bypassing-authentication-in-java/49244840#49244840
    # What we are doing here is not very secure because we lose the
    # authentication of the cipher (this does not verify the tag).
    # So this is not a typo: we use GCM for encrypting and decrypting the
    # entire file, but to access random blocks we need to downgrade to CTR.
    cipher = OpenSSL::Cipher.new("aes-256-ctr")
    cipher.decrypt
    cipher.key = @key
    cipher.iv = ctr_iv(@iv, n_blocks_to_skip) # Counter block of the first unit we read
    from_encrypted_io.seek(ciphertext_starts_at + (n_blocks_to_skip * BLOCK_SIZE))

    # Read in larger chunks since it is more efficient than reading by block -
    # the cipher maintains the counter state between updates.
    plaintext = String.new
    remaining = offset_into_first_block + n_bytes_to_read
    while remaining.positive?
      chunk = from_encrypted_io.read([remaining, READ_BUF_SIZE].min)
      break if chunk.nil? || chunk.empty? # EOF - the range runs past the data

      plaintext << cipher.update(chunk)
      remaining -= chunk.bytesize
    end
    # CTR has no padding or tag to check, so errors are not swallowed here.
    plaintext << cipher.final
    # Discard the bytes in front of the requested offset.
    plaintext.byteslice(offset_into_first_block, n_bytes_to_read)
  end

  # Builds the 16-byte CTR counter block for the +for_block_n+-th 32-byte
  # unit (see BLOCK_SIZE) of the GCM ciphertext.
  #
  # With a 12-byte IV, GCM reserves counter value 1 for the authentication
  # tag, so the first 16-byte AES block of data uses counter 2 and every
  # following AES block adds 1. One unit spans two AES blocks, hence
  # 2 + 2 * for_block_n. See https://stackoverflow.com/a/49244840
  def ctr_iv(iv_initial, for_block_n)
    ctr = 2 + (for_block_n * 2)
    iv_initial + [ctr].pack("N")
  end

  private

  # Feeds +source_io+ through +cipher+ chunk by chunk, writing to +sink_io+.
  def pump_through(cipher, source_io, sink_io)
    while (chunk = source_io.read(READ_BUF_SIZE))
      sink_io.write(cipher.update(chunk))
    end
    sink_io.write(cipher.final)
  end
end
# Exercises one scheme object and reports the outcome via `warn`.
#
# A deterministic pseudo-random plaintext is encrypted behind a 3-byte fake
# header, decrypted in full, and then read back through a set of byte ranges
# (five fixed ones plus eight random ones). Each result is compared against
# the matching slice of the plaintext.
#
# +enc+ must respond to streaming_encrypt(from_io, into_io),
# streaming_decrypt(from_io, into_io) and decrypt_range(io, range).
def evaluate_scheme(enc)
  header = "HDR" # emulate a header in front of the ciphertext
  warn enc.class

  # Fixed seed, so every scheme sees the same plaintext and ranges.
  prng = Random.new(150)
  amount_of_plain_bytes = prng.rand(0..(1024*1024*7))
  plain_bytes = prng.bytes(amount_of_plain_bytes)
  last_index = amount_of_plain_bytes - 1

  ciphertext_io = StringIO.new.binmode
  ciphertext_io.write(header)
  enc.streaming_encrypt(StringIO.new(plain_bytes), ciphertext_io)

  # Full readback, starting at the offset where the ciphertext begins.
  ciphertext_io.seek(header.bytesize)
  readback_io = StringIO.new.binmode
  enc.streaming_decrypt(ciphertext_io, readback_io)
  warn "---> Correct full readback - #{readback_io.string == plain_bytes}"

  fixed_ranges = [
    0..0, # The first byte
    0..64, # Bytes that overlap blocks
    78..91, # A small range somewhere inside
    last_index..last_index, # The last byte
    0..last_index, # The entire plaintext, but via ranges
  ]
  random_ranges = Array.new(8) do
    start_at = prng.rand(0..last_index)
    extra_bytes = prng.rand(1..1204)
    start_at..(start_at + extra_bytes)
  end

  (fixed_ranges + random_ranges).each do |range|
    ciphertext_io.seek(header.bytesize) # Emulate that the header was already read
    expected = plain_bytes[range]
    next unless expected # The range lies outside of the plaintext

    got = enc.decrypt_range(ciphertext_io, range)
    report =
      if !got
        "---> Did not decrypt range #{range.inspect}"
      elsif expected.bytesize != got.bytesize
        "---> Incorrect range #{range.inspect} - expected #{expected.bytesize} bytes but decrypted #{got.bytesize}"
      elsif expected != got
        "---> Incorrect range #{range.inspect} - #{expected[0..16].inspect} but decrypted #{got[0..16].inspect}"
      else
        "---> Range #{range.inspect} OK"
      end
    warn report
  end
end
# Driver: build one instance of every scheme (all keyed schemes receive the
# same random 48 bytes) and run the evaluation for each of them in turn.
random_key = Random.bytes(48)
keyed_scheme_classes = [
  EncryptionSchemeCFB,
  EncryptionSchemeCBC,
  EncryptionSchemeCTR,
  EncryptionSchemeGCM,
]
schemes = [PassthruScheme.new] + keyed_scheme_classes.map { |scheme_class| scheme_class.new(random_key) }
schemes.each { |enc| evaluate_scheme(enc) }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment