Last active
June 6, 2025 12:39
-
-
Save SeanPesce/a39cc714f887adefd413554669aa2471 to your computer and use it in GitHub Desktop.
Pure Python 3 FastLZ
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Modified from original source: https://gitlab.com/Stefan65/lz77-variants_python
"""
Implementation of the FastLZ compression algorithm in pure Python.

FastLZ is a "small & portable byte-aligned LZ77 compression" algorithm.
This version is based upon the original implementation by Ariya Hidayat.
"""
from datetime import datetime

# Attribution metadata for the original C implementation this port derives from.
__original_author__ = "Ariya Hidayat"
__original_copyright__ = (
    "Copyright (C) 2005-2020 Ariya Hidayat <[email protected]>"
)
__original_license__ = "The MIT License"
# Upstream commit hash and commit timestamp this port corresponds to.
__original_version__ = [
    "04417eea5ed8b1656ed75f91e89ca6c049b3538b",
    datetime(year=2020, month=2, day=20, hour=5, minute=6),
]
__original_link__ = "https://github.com/ariya/FastLZ"
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
# Modified from original source: https://gitlab.com/Stefan65/lz77-variants_python | |
# ===================================================================================== | |
# FastLZ - Byte-aligned LZ77 compression library | |
# Copyright (C) 2005-2020 Ariya Hidayat <[email protected]> | |
# Copyright (C) 2020 Stefan65 (Python port) | |
# | |
# Permission is hereby granted, free of charge, to any person obtaining a copy | |
# of this software and associated documentation files (the "Software"), to deal | |
# in the Software without restriction, including without limitation the rights | |
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
# copies of the Software, and to permit persons to whom the Software is | |
# furnished to do so, subject to the following conditions: | |
# | |
# The above copyright notice and this permission notice shall be included in | |
# all copies or substantial portions of the Software. | |
# | |
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | |
# THE SOFTWARE. | |
# ===================================================================================== | |
""" | |
Common utility methods for the FastLZ algorithm. | |
""" | |
from fastlz_lib import utils | |
from fastlz_lib.configuration import FastLzLevel, Configuration, Constants | |
""" | |
Implementation of the FastLZ compression algorithm, level 1. | |
""" | |
class CompressorLevel1:
    """
    The implementation of the (string) compressor for level 1.

    NOTE(review): this class relies on helpers defined elsewhere in this
    package (`read_unsigned_integer_32_bit`, `calculate_hash_value`,
    `emit_literal_instructions`, `compare_buffer_content_until_mismatch`) —
    presumably brought into scope by the common-utilities module; confirm
    against the full source tree.
    """

    configuration = None
    """
    The configuration class to use.

    :type: :class:`class`
    """

    def __init__(self, configuration=Configuration):
        """
        :param configuration: The configuration class to use.
        :type configuration: class
        """
        self.configuration = configuration

    def compress(self, source):
        """
        Compress the given source buffer using the FastLZ algorithm.

        :param source: The source buffer to compress.
        :type source: bytearray

        :return: The compression result.
        :rtype: bytearray
        """
        # The source parameters: the current position inside the source buffer
        # and the overall length of the source buffer.
        source_position = 0
        source_length = len(source)

        # Due to using unaligned loads of unsigned 32 bit integers for the hash
        # calculation after determining the match length with our version of the
        # `flz_cmp` function, we have to keep the last 4 bytes untouched in
        # `flz_cmp`.
        source_bound = source_length - 4

        # The position at which to call `flz_finalize`. With this our final
        # literal instruction will have (up to) 12 bytes. (We use the `- 1` as
        # we are comparing it using `>=`.)
        source_limit = source_length - 12 - 1

        # The destination buffer and the current position inside it (which
        # corresponds to the output length).
        destination = bytearray()
        destination_position = 0

        # Initialize the hash table using `0` for every possible entry. This is
        # the same as in the original, except that we do not explicitly use a
        # loop here.
        hash_table = [0] * self.configuration.HASH_TABLE_SIZE

        # Initialize the current anchor point, id est the start position of the
        # current iteration.
        iteration_start_position = source_position

        # Move 2 bytes forward.
        # The minimum match length is 3, so these 2 bytes and the first byte
        # being ignored while hashing (see the first
        # `current_sequence &= 0xFFFFFF` construct below) are being skipped at
        # the beginning. With this comparisons will avoid detecting a match for
        # the first 3 bytes with itself.
        source_position += 2

        # Iterate until the end of the input has been reached.
        while source_position < source_limit:
            # Find a match.
            while True:
                # Read the first 4 bytes of the current input and only keep the
                # 3 least significant ones.
                current_sequence = read_unsigned_integer_32_bit(
                    source, source_position
                )
                current_sequence &= 0xFFFFFF

                # Hash the current group of 3 bytes.
                current_hash = calculate_hash_value(
                    current_sequence, self.configuration
                )

                # Get the position of the referenced entry by retrieving the
                # corresponding index from the hash table.
                referenced_position = hash_table[current_hash]

                # Save the current position inside the hash table (this might
                # replace an existing value).
                hash_table[current_hash] = source_position

                # Determine the current offset between the input position and
                # the referenced position.
                current_offset = source_position - referenced_position

                # Retrieve the value at the referenced position.
                if current_offset < self.configuration.MATCH_OFFSET_MAX_LEVEL1:
                    # The offset is small enough, so we will keep the 3 least
                    # significant bytes of the value to compare it.
                    comparison_value = read_unsigned_integer_32_bit(
                        source, referenced_position
                    )
                    comparison_value &= 0xFFFFFF
                else:
                    # The offset is too large, so we will use
                    # (1 << 24) > (2^24 - 1), where (2^24 - 1) is the largest
                    # value being representable with 24 bits (= 3 bytes). This
                    # sentinel can never equal a 24-bit sequence, so no match
                    # is reported.
                    comparison_value = 0x1000000

                # Stop if the position limit has been reached.
                if source_position >= source_limit:
                    break

                # Move to the next input position.
                source_position += 1

                # Stop if we have found a match with at least 3 bytes.
                if current_sequence == comparison_value:
                    break

            # Stop if the position limit has been reached.
            if source_position >= source_limit:
                break

            # Decrease the current position again as we have increased it in
            # the last step of the loop above, but there has been a match
            # 1 byte before.
            source_position -= 1

            # Check whether the current position is after the start position of
            # the current iteration.
            # This is true for every match, which should hold in most of the
            # cases. If this is the case, emit the corresponding literal
            # instructions.
            if source_position > iteration_start_position:
                # We still have to determine the literal length here.
                output_length = emit_literal_instructions(
                    source,
                    destination,
                    iteration_start_position,
                    source_position - iteration_start_position,
                    self.configuration,
                )
                destination_position += output_length

            # Determine the match length.
            # We increase the positions by 3 as we have already checked the
            # first 3 bytes in the loop above.
            # This implies the minimal match length of 3 as well.
            match_length = compare_buffer_content_until_mismatch(
                source, referenced_position + 3, source_position + 3, source_bound
            )

            # Emit the corresponding match instructions.
            output_length = self._emit_match_instructions(
                destination, match_length, current_offset
            )
            destination_position += output_length

            # Move the input position to the end of match minus 2 bytes.
            source_position += match_length

            # Get the next 4 bytes.
            current_sequence = read_unsigned_integer_32_bit(
                source, source_position
            )

            # Save the current position for the hash of the 3 least significant
            # bytes.
            current_hash = calculate_hash_value(
                current_sequence & 0xFFFFFF, self.configuration
            )
            hash_table[current_hash] = source_position
            source_position += 1

            # Save the current position for the hash of the 3 most significant
            # bytes.
            current_sequence >>= 8
            current_hash = calculate_hash_value(
                current_sequence, self.configuration
            )
            hash_table[current_hash] = source_position
            source_position += 1

            # Save the new start value for the next iteration.
            iteration_start_position = source_position

        # Emit the final literal instruction for the remaining input bytes.
        literal_length = source_length - iteration_start_position
        output_length = emit_literal_instructions(
            source,
            destination,
            iteration_start_position,
            literal_length,
            self.configuration,
        )
        destination_position += output_length

        # Return the destination buffer.
        return destination

    def _emit_match_instructions(self, destination, match_length, match_offset):
        """
        Emit the given match instruction. If the match length exceeds
        `MATCH_LENGTH_MAX`, multiple match instructions will be emitted.

        :param destination: The destination buffer to write the instructions to.
        :type destination: bytearray

        :param match_length: The match length to emit.
        :type match_length: int

        :param match_offset: The match offset to emit.
        :type match_offset: int

        :return: The number of bytes written to the destination buffer.
        :rtype: int
        """
        # We have written nothing until now.
        bytes_written = 0

        # Reduce the offset by 1 as an offset of `0` does not make sense as
        # this would be the current byte itself.
        match_offset -= 1

        # Save the value for easier access.
        match_length_max = self.configuration.MATCH_LENGTH_MAX

        # Handle instruction of maximum match length.
        if match_length > (match_length_max - 2):
            # The match length to encode exceeds the maximum match length of an
            # instruction.
            # So emit match instructions and reduce it until the match length
            # fits 1 additional instruction.
            # We use the `- 2` as a match length of 3 bytes gets the value 1
            # and this is the difference.
            while match_length > (match_length_max - 2):
                # This will always be a long match instruction.
                # Write opcode[0].
                # (7 << 5) = 11100000_2 is the indicator for a long match.
                # `distance >> 8` fills the 5 least significant bits,
                # corresponding to the 5 most significant bits of the offset.
                to_write = (7 << 5) + (match_offset >> 8)
                destination.append(to_write)
                bytes_written += 1

                # Write opcode[1].
                # The match length is 264 - 2 - 7 - 2 = 253.
                # For the final `- 2`, see the explanation above as well.
                to_write = match_length_max - 2 - 7 - 2
                destination.append(to_write)
                bytes_written += 1

                # Write opcode[2].
                # This is the least significant byte of the offset.
                destination.append(match_offset & 255)
                bytes_written += 1

                # Reduce the match length by the one just emitted.
                match_length -= match_length_max - 2

        # Handle the last match instruction.
        if match_length < 7:
            # This is a short match instruction.
            # Write opcode[0].
            # The 3 most significant bits will be the match length (range 3 to
            # 8; value 1 = 3-byte-match, value 2 = 4-byte-match ...)
            # The 5 least significant bits will be the 5 most significant bits
            # of the offset.
            to_write = (match_length << 5) + (match_offset >> 8)
            destination.append(to_write)
            bytes_written += 1

            # Write opcode[1].
            # This is the least significant byte of the offset.
            destination.append(match_offset & 255)
            bytes_written += 1
        else:
            # This is a long match instruction.
            # Write opcode[0].
            # (7 << 5) = 11100000_2 is the indicator for a long match.
            # `distance >> 8` fills the 5 least significant bits, corresponding
            # to the 5 most significant bits of the offset.
            to_write = (7 << 5) + (match_offset >> 8)
            destination.append(to_write)
            bytes_written += 1

            # Write opcode[1].
            # This is the match length.
            # We subtract `7` - corresponding to the long match indicator in
            # the 3 most significant bits of opcode[0].
            destination.append(match_length - 7)
            bytes_written += 1

            # Write opcode[2].
            # This is the least significant byte of the offset.
            destination.append(match_offset & 255)
            bytes_written += 1

        # Return the number of bytes written.
        return bytes_written
""" | |
Implementation of the FastLZ decompression algorithm, level 1. | |
""" | |
class DecompressorLevel1:
    """
    The implementation of the (string) decompressor for level 1.

    NOTE(review): this class relies on helpers defined elsewhere in this
    package (`utils.get_last_bits`, `common.BadData`, `memmove`, `memcpy`) —
    presumably brought into scope by the common-utilities module; confirm
    against the full source tree.
    """

    configuration = None
    """
    The configuration class to use.

    :type: :class:`class`
    """

    def __init__(self, configuration=Configuration):
        """
        :param configuration: The configuration class to use.
        :type configuration: class
        """
        self.configuration = configuration

    def decompress(self, source):
        """
        Decompress the given source buffer using the FastLZ algorithm.

        :param source: The source buffer to decompress.
        :type source: bytearray

        :return: The decompression result.
        :rtype: bytearray
        """
        # The source parameters: the current position inside the source buffer
        # and the overall length of the source buffer.
        source_position = 0
        source_length = len(source)

        # The destination buffer and the current position inside it (which
        # corresponds to the output length).
        destination = bytearray()
        destination_position = 0

        # Get the first instruction code.
        # We only need the last 5 bits of the current byte to decide which type
        # of instruction we have.
        # As in the original implementation, we are not checking if the first
        # instruction really is a literal instruction. To detect this, we would
        # have to save the full byte to check if the 3 most significant bits
        # actually are `000`.
        instruction_code = utils.get_last_bits(source[source_position], bits=5)
        source_position += 1

        # Iterate until the end of the input has been reached.
        while source_position < source_length:
            # Handle the different instruction types.
            if instruction_code >= 32:
                # This is a match instruction.
                # This cannot be reached in the first iteration, as there are
                # only 5 bits (with a maximum value of 11111_2 = 31) - the first
                # instruction should be a literal instruction anyway, as we have
                # nothing to copy for the match.
                # Starting with the second iteration, `instruction_code` will
                # always be one byte not starting with `000` (the identifier for
                # a literal instruction) in the match instruction case, so we
                # will always have at least 00100000_2 = 32.

                # Retrieve the length value from opcode[0], using the 3 most
                # significant bits of it.
                # We can subtract one, as the smallest length value for a
                # 3-byte-match uses the value 1.
                match_length = (instruction_code >> 5) - 1

                # Retrieve the 5 most significant bits of the offset from
                # opcode[0]. Afterwards we shift the current offset value to get
                # an empty byte at the end for the least significant byte of the
                # offset.
                match_offset_part1 = utils.get_last_bits(instruction_code, bits=5) << 8

                # We can already calculate the position of the copy operation
                # here. This is possible as we are already working with the
                # shifted offset value here.
                # The `-1` basically means `offset + 1`, as an offset of 0 does
                # not make much sense. We could write
                #     destination_position - (match_offset_part1 + 1)
                # as well to make it more obvious, but without the brackets it
                # may actually be faster.
                referenced_index = destination_position - match_offset_part1 - 1

                # Handle long match instructions.
                if match_length == (7 - 1):
                    # This is a long match instruction.
                    # A long match is indicated by 111_2 = 7, but we are using
                    # `match_length - 1` for the comparison here (see the
                    # retrieval of the match length from the input above), so we
                    # have to compare with 7 - 1 = 6.

                    # Make sure that we can read another byte.
                    if source_position >= source_length:
                        raise common.BadData("End of input reached too early.")

                    # The length is set in opcode[1], which we retrieve here.
                    # Add this to 6, as this is the difference between the
                    # minimum match length of short and long match instructions.
                    match_length += source[source_position]
                    source_position += 1

                # The second (short match) or third (long match) byte holds the
                # 8 least significant bits of the offset, so retrieve it,
                # correct the match position and move the input forward.
                referenced_index -= source[source_position]
                source_position += 1

                # Increase the match length by 3, which is the minimum match
                # length for short instructions.
                match_length += 3

                # Abort if the referenced position is before the start of the
                # destination buffer.
                # Please note that this could cause errors if used with a
                # dictionary shared across the different blocks, but as we are
                # starting with an empty dictionary on every block compression
                # run, this is no problem for us.
                if referenced_index < 0:
                    raise common.BadData("Referenced index is too small.")

                # Copy the specified amount of bytes. We have to use `memmove`
                # as there might be overlaps.
                memmove(destination, referenced_index, match_length)
                destination_position += match_length
            else:
                # This is a literal instruction.
                # This will always be the first instruction - for this reason
                # `instruction_code = source[0] & 31` (or any of its
                # equivalents) is allowed before the loop above, as the
                # identifier `000` at the beginning of the byte can be ignored
                # (000X_2 = X_2).

                # Add 1 to the length value, as a value of 0 indicates a
                # literal length of 1 (a literal length of 0 does not make any
                # sense at all).
                literal_length = instruction_code + 1

                # Make sure that we have enough bytes left in the input.
                if source_position + literal_length > source_length:
                    raise common.BadData(
                        "Not enough values available to copy with literal instruction."
                    )

                # Copy the specified amount of bytes. We can use `memcpy` as
                # there are no overlaps.
                memcpy(source, destination, source_position, literal_length)

                # Move the positions forward.
                source_position += literal_length
                destination_position += literal_length

            # Stop if the end of the input has been reached to be able to read
            # the next byte.
            if source_position >= source_length:
                break

            # Get the next instruction code.
            # Unlike in the first iteration (the value being assigned before
            # the loop), we now need the full byte as copy operations make
            # sense now.
            instruction_code = source[source_position]
            source_position += 1

        # Return the destination buffer.
        return destination
""" | |
Implementation of the FastLZ compression algorithm, level 2. | |
""" | |
class CompressorLevel2:
    """
    The implementation of the (string) compressor for level 2.

    NOTE(review): this class relies on helpers defined elsewhere in this
    package (`read_unsigned_integer_32_bit`, `calculate_hash_value`,
    `emit_literal_instructions`, `compare_buffer_content_until_mismatch`) —
    presumably brought into scope by the common-utilities module; confirm
    against the full source tree.
    """

    configuration = None
    """
    The configuration class to use.

    :type: :class:`class`
    """

    def __init__(self, configuration=Configuration):
        """
        :param configuration: The configuration class to use.
        :type configuration: class
        """
        self.configuration = configuration

    def compress(self, source):
        """
        Compress the given source buffer using the FastLZ algorithm.

        :param source: The source buffer to compress.
        :type source: bytearray

        :return: The compression result.
        :rtype: bytearray
        """
        # The source parameters: the current position inside the source buffer
        # and the overall length of the source buffer.
        source_position = 0
        source_length = len(source)

        # Due to using unaligned loads of unsigned 32 bit integers for the hash
        # calculation after determining the match length with our version of the
        # `flz_cmp` function, we have to keep the last 4 bytes untouched in
        # `flz_cmp`.
        source_bound = source_length - 4

        # The position at which to call `flz_finalize`. With this our final
        # literal instruction will have (up to) 12 bytes. (We use the `- 1` as
        # we are comparing it using `>=`.)
        source_limit = source_length - 12 - 1

        # The destination buffer and the current position inside it (which
        # corresponds to the output length).
        destination = bytearray()
        destination_position = 0

        # Initialize the hash table using `0` for every possible entry. This is
        # the same as in the original, except that we do not explicitly use a
        # loop here.
        hash_table = [0] * self.configuration.HASH_TABLE_SIZE

        # Initialize the current anchor point, id est the start position of the
        # current iteration.
        iteration_start_position = source_position

        # Move 2 bytes forward.
        # The minimum match length is 3, so these 2 bytes and the first byte
        # being ignored while hashing (see the first
        # `current_sequence &= 0xFFFFFF` construct below) are being skipped at
        # the beginning. With this comparisons will avoid detecting a match for
        # the first 3 bytes with itself.
        source_position += 2

        # Iterate until the end of the input has been reached.
        while source_position < source_limit:
            # Find a match.
            while True:
                # Read the first 4 bytes of the current input and only keep the
                # 3 least significant ones.
                current_sequence = read_unsigned_integer_32_bit(
                    source, source_position
                )
                current_sequence &= 0xFFFFFF

                # Hash the current group of 3 bytes.
                current_hash = calculate_hash_value(
                    current_sequence, self.configuration
                )

                # Get the position of the referenced entry by retrieving the
                # corresponding index from the hash table.
                referenced_position = hash_table[current_hash]

                # Save the current position inside the hash table (this might
                # replace an existing value).
                hash_table[current_hash] = source_position

                # Determine the current offset between the input position and
                # the referenced position.
                current_offset = source_position - referenced_position

                # Retrieve the value at the referenced position.
                # NOTE(review): this gates on MATCH_OFFSET_MAX_LEVEL1; the
                # original C level-2 compressor gates the candidate check on
                # the far-distance limit instead. Confirm the configuration
                # constants make these equivalent, otherwise far matches are
                # never considered.
                if current_offset < self.configuration.MATCH_OFFSET_MAX_LEVEL1:
                    # The offset is small enough, so we will keep the 3 least
                    # significant bytes of the value to compare it.
                    comparison_value = read_unsigned_integer_32_bit(
                        source, referenced_position
                    )
                    comparison_value &= 0xFFFFFF
                else:
                    # The offset is too large, so we will use
                    # (1 << 24) > (2^24 - 1), where (2^24 - 1) is the largest
                    # value being representable with 24 bits (= 3 bytes).
                    comparison_value = 0x1000000

                # Stop if the position limit has been reached.
                if source_position >= source_limit:
                    break

                # Move to the next input position.
                source_position += 1

                # Stop if we have found a match with at least 3 bytes.
                if current_sequence == comparison_value:
                    break

            # Stop if the position limit has been reached.
            if source_position >= source_limit:
                break

            # Decrease the current position again as we have increased it in
            # the last step of the loop above, but there has been a match
            # 1 byte before.
            source_position -= 1

            # Check if this needs at least a 5-byte-match. This is special to
            # level 2.
            if current_offset >= self.configuration.MATCH_OFFSET_MAX_LEVEL2:
                # The offset is too large for regular (level 1) encoding.
                if (
                    source[referenced_position + 3] != source[source_position + 3]
                    or source[referenced_position + 4] != source[source_position + 4]
                ):
                    # Bytes 4 and 5 do not both match, so this far match would
                    # be shorter than the required 5 bytes. Reject the
                    # candidate: move 1 position forward and search for a
                    # (longer) match again.
                    source_position += 1
                    continue

            # Check whether the current position is after the start position of
            # the current iteration.
            # This is true for every match, which should hold in most of the
            # cases. If this is the case, emit the corresponding literal
            # instructions.
            if source_position > iteration_start_position:
                # We still have to determine the literal length here.
                output_length = emit_literal_instructions(
                    source,
                    destination,
                    iteration_start_position,
                    source_position - iteration_start_position,
                    self.configuration,
                )
                destination_position += output_length

            # Determine the match length.
            # We increase the positions by 3 as we have already checked the
            # first 3 bytes in the loop above.
            # This implies the minimal match length of 3 as well.
            match_length = compare_buffer_content_until_mismatch(
                source, referenced_position + 3, source_position + 3, source_bound
            )

            # Emit the corresponding match instructions.
            output_length = self._emit_match_instruction(
                destination, match_length, current_offset
            )
            destination_position += output_length

            # Move the input position to the end of match minus 2 bytes.
            source_position += match_length

            # Get the next 4 bytes.
            current_sequence = read_unsigned_integer_32_bit(
                source, source_position
            )

            # Save the current position for the hash of the 3 least significant
            # bytes.
            current_hash = calculate_hash_value(
                current_sequence & 0xFFFFFF, self.configuration
            )
            hash_table[current_hash] = source_position
            source_position += 1

            # Save the current position for the hash of the 3 most significant
            # bytes.
            current_sequence >>= 8
            current_hash = calculate_hash_value(
                current_sequence, self.configuration
            )
            hash_table[current_hash] = source_position
            source_position += 1

            # Save the new start value for the next iteration.
            iteration_start_position = source_position

        # Emit the final literal instruction.
        literal_length = source_length - iteration_start_position
        output_length = emit_literal_instructions(
            source,
            destination,
            iteration_start_position,
            literal_length,
            self.configuration,
        )
        destination_position += output_length

        # Add the marker for level 2.
        # This corresponds to 100000_2, so the 3 most significant bits of the
        # first output byte will be `001` (as the first instruction is a
        # literal instruction which is indicated by `000` by default). The 5
        # least significant bits of the first output byte will be the actual
        # literal length.
        # NOTE(review): assumes at least one byte has been emitted; an empty
        # `source` would raise IndexError here unless
        # `emit_literal_instructions` always emits an opcode — confirm.
        destination[0] |= 1 << 5

        # Return the destination buffer.
        return destination

    def _emit_match_instruction(self, destination, match_length, match_offset):
        """
        Emit the given match instruction.

        :param destination: The destination buffer to write the instructions to.
        :type destination: bytearray

        :param match_length: The match length to emit.
        :type match_length: int

        :param match_offset: The match offset to emit.
        :type match_offset: int

        :return: The number of bytes written to the destination buffer.
        :rtype: int
        """
        # We have written nothing until now.
        bytes_written = 0

        # Reduce the offset by 1 as an offset of `0` does not make sense as
        # this would be the current byte itself.
        match_offset -= 1

        # Save the value for easier access.
        match_offset_max_level2 = self.configuration.MATCH_OFFSET_MAX_LEVEL2

        # Check whether the (simple) level 1 encoding is enough.
        if match_offset < match_offset_max_level2:
            # We can use the simple encoding similar to level 1, as the match
            # offset is small enough.
            if match_length < 7:
                # This is a short match instruction.
                # Write opcode[0].
                # The 3 most significant bits will be the match length (range 3
                # to 8; value 1 = 3-byte-match, value 2 = 4-byte-match ...)
                # The 5 least significant bits will be the 5 most significant
                # bits of the offset.
                to_write = (match_length << 5) + (match_offset >> 8)
                destination.append(to_write)
                bytes_written += 1

                # Write opcode[1].
                # This is the least significant byte of the offset.
                destination.append(match_offset & 255)
                bytes_written += 1
            else:
                # This is a long match instruction.
                # Write opcode[0].
                # (7 << 5) = 11100000_2 is the indicator for a long match.
                # `distance >> 8` fills the 5 least significant bits,
                # corresponding to the 5 most significant bits of the offset.
                to_write = (7 << 5) + (match_offset >> 8)
                destination.append(to_write)
                bytes_written += 1

                # Write opcode[1] (which can have multiple bytes).
                # This is the match length.
                # We subtract `7` - corresponding to the long match indicator
                # in the 3 most significant bits of opcode[0].
                match_length -= 7

                # This uses a Gamma code.
                while match_length >= 255:
                    # While the match length still cannot be written into
                    # 1 byte, write full bytes.
                    destination.append(255)
                    bytes_written += 1
                    match_length -= 255

                # Write the remaining length value which fits into 1 byte.
                destination.append(match_length)
                bytes_written += 1

                # Write opcode[2].
                # This is the least significant byte of the offset.
                destination.append(match_offset & 255)
                bytes_written += 1
        else:
            # We have to use level 2 encoding.
            if match_length < 7:
                # This is a short match instruction.
                # Reduce the offset as we know that we are in this case.
                match_offset -= match_offset_max_level2

                # Write opcode[0].
                # The 3 most significant bits will be the match length (range 3
                # to 8; value 1 = 3-byte-match, value 2 = 4-byte-match ...)
                # The 5 least significant bits will be the 5 most significant
                # bits of the offset - which is larger than 31, so we are just
                # writing 11111_2 for it.
                destination.append((match_length << 5) + 31)
                bytes_written += 1

                # Write the remaining opcode[...].
                # With the value 31 from above and the following value 255 it
                # holds
                #     (31 << 8) + 255 = 7936 + 255 = 8191 = `MAX_L2_DISTANCE`,
                # so we indicate that we are in the "larger offset" case by
                # writing the following byte (which is the 255 from the above
                # formula).
                destination.append(255)
                bytes_written += 1

                # Write the most significant byte of the remaining offset.
                destination.append(match_offset >> 8)
                bytes_written += 1

                # Write the least significant byte of the remaining offset.
                destination.append(match_offset & 255)
                bytes_written += 1
            else:
                # This is a long match instruction.
                # Reduce the offset as we know that we are in this case.
                match_offset -= match_offset_max_level2

                # Write opcode[0].
                # (7 << 5) = 11100000_2 is the indicator for a long match.
                # The 5 least significant bits will be the 5 most significant
                # bits of the offset - which is larger than 31, so we are just
                # writing 11111_2 for it.
                destination.append((7 << 5) + 31)
                bytes_written += 1

                # Write opcode[1] (which can have multiple bytes).
                # This is the match length.
                # We subtract `7` - corresponding to the long match indicator
                # in the 3 most significant bits of opcode[0].
                match_length -= 7

                # This uses a Gamma code.
                while match_length >= 255:
                    # While the match length still cannot be written into
                    # 1 byte, write full bytes.
                    destination.append(255)
                    bytes_written += 1
                    match_length -= 255

                # Write the remaining length value which fits into 1 byte.
                destination.append(match_length)
                bytes_written += 1

                # Write the remaining opcode[...].
                # With the value 31 from above and the following value 255 it
                # holds
                #     (31 << 8) + 255 = 7936 + 255 = 8191 = `MAX_L2_DISTANCE`,
                # so we indicate that we are in the "larger offset" case by
                # writing the following byte (which is the 255 from the above
                # formula).
                destination.append(255)
                bytes_written += 1

                # Write the most significant byte of the remaining offset.
                destination.append(match_offset >> 8)
                bytes_written += 1

                # Write the least significant byte of the remaining offset.
                destination.append(match_offset & 255)
                bytes_written += 1

        # Return the number of bytes written.
        return bytes_written
""" | |
Implementation of the FastLZ decompression algorithm, level 2. | |
""" | |
class DecompressorLevel2:
    """
    The implementation of the (string) decompressor for level 2.
    """

    configuration = None
    """
    The configuration class to use.

    :type: :class:`class`
    """

    def __init__(self, configuration=Configuration):
        """
        :param configuration: The configuration class to use.
        :type configuration: class
        """
        self.configuration = configuration

    def decompress(self, source):
        """
        Decompress the given source buffer using the FastLZ algorithm.

        :param source: The source buffer to decompress.
        :type source: bytearray

        :return: The decompression result.
        :rtype: bytearray

        :raises common.BadData: The input ends in the middle of an instruction or a
            match references a position before the start of the output.
        """
        # The source parameters: the current position inside the source buffer and the
        # overall length of the source buffer.
        source_position = 0
        source_length = len(source)
        # The destination buffer and the current position inside it (which corresponds
        # to the output length).
        destination = bytearray()
        destination_position = 0
        # Get the first instruction code.
        # We only need the last 5 bits of the current byte to decide which type of
        # instruction we have.
        # Please note that the first 3 bits will be `001` here to indicate that this is
        # level 2.
        instruction_code = utils.get_last_bits(source[source_position], bits=5)
        source_position += 1
        # Iterate until the end of the input has been reached.
        while source_position < source_length:
            # Handle the different instruction types.
            if instruction_code >= 32:
                # This is a match instruction.
                # This cannot be reached in the first iteration, as there are only 5
                # bits (with a maximum value of 11111_2 = 31) - the first instruction
                # should be a literal instruction anyway, as we have nothing to copy for
                # the match.
                # Starting with the second iteration, `instruction_code` will always be
                # one byte not starting with `000` (the identifier for a literal
                # instruction) in the match instruction case, so we will always have at
                # least 00100000_2 = 32.
                # Retrieve the length value from opcode[0], using the 3 most
                # significant bits of it.
                # We can subtract one, as the smallest length value for a 3-byte-match
                # uses the value 1.
                match_length = (instruction_code >> 5) - 1
                # Retrieve the 5 most significant bits of the offset from opcode[0].
                # Afterwards we shift the current offset value to get an empty byte at
                # the end for the least significant byte of the offset.
                match_offset_part1 = utils.get_last_bits(instruction_code, bits=5) << 8
                # We can already calculate the position of the copy operation here.
                # This is possible as we are already working with the shifted offset
                # value here.
                # The `-1` basically means `offset + 1`, as an offset of 0 does not
                # make much sense. We could write
                #     destination_position - (match_offset_part1 + 1)
                # as well to make it more obvious, but without the brackets it may
                # actually be faster.
                referenced_index = destination_position - match_offset_part1 - 1
                # Handle long match instructions.
                if match_length == (7 - 1):
                    # This is a long match instruction.
                    # A long match is indicated by 111_2 = 7, but we are using
                    # `match_length - 1` for the comparison here (see the retrieval of
                    # the match length from the input above), so we have to compare with
                    # 7 - 1 = 6.
                    while True:
                        # This is a very long match which only occurs for level 2.
                        # The length value uses a Gamma code, as the actual byte number
                        # is unknown, see the `Wikipedia article on Elias gamma coding
                        # <https://en.wikipedia.org/wiki/Elias_gamma_coding>`_ for some
                        # information on this type of technique.
                        # Make sure that we can read another byte.
                        if source_position >= source_length:
                            raise common.BadData("End of input reached too early.")
                        # Retrieve the current byte.
                        additional_length = source[source_position]
                        source_position += 1
                        # Add the additional length value to the length.
                        # Add this to 6 initially, as this is the difference between the
                        # minimum match length of short and long match instructions.
                        match_length += additional_length
                        # There is no other byte with an additional length value to add.
                        if additional_length != 255:
                            break
                # The next byte holds the 8 least significant bits of the offset, so
                # retrieve it, correct the match position and move the input forward.
                # NOTE(review): this read is not bounds-checked, so a truncated input
                # raises IndexError here instead of `common.BadData` — confirm intended.
                additional_offset = source[source_position]
                source_position += 1
                referenced_index -= additional_offset
                # Increase the match length by 3, which is the minimum match length for
                # short instructions.
                match_length += 3
                # Handle greater match distances of 16 bit, which only occur for level
                # 2.
                if additional_offset == 255:
                    if match_offset_part1 == (31 << 8):
                        # We have the largest offset value possible until now.
                        # With the additional offset being 255 and checking for the
                        # most significant bits leading to 31 << 8, we have
                        #     (31 << 8) + 255 = 7936 + 255 = 8191 = `MAX_L2_DISTANCE`.
                        # For the second condition being true, the 5 least significant
                        # bits of opcode[0] have to be 11111_2 = 31. After shifting it
                        # to the left using `value << 8`, we get 31 << 8 - which is what
                        # we check for.
                        # Make sure that we can read another byte.
                        # NOTE(review): two bytes are consumed below but only one is
                        # bounds-checked here; a one-byte remainder raises IndexError.
                        if source_position >= source_length:
                            raise common.BadData("End of input reached too early.")
                        # The additional offset value are the next 2 bytes of the input,
                        # which leads to the offset not being greater than 65535 =
                        # 2^16 - 1.
                        offset = source[source_position] << 8
                        source_position += 1
                        offset += source[source_position]
                        source_position += 1
                        # Determine the new referenced index.
                        # We already know that the offset is at least `MAX_L2_DISTANCE`
                        # when we reach this code. The `offset` value cannot be greater
                        # than 65535, the `- 1` can be added as an offset of 0 does not
                        # make any sense.
                        referenced_index = (
                            destination_position
                            - offset
                            - self.configuration.MATCH_OFFSET_MAX_LEVEL2
                            - 1
                        )
                # Abort if the referenced position is before the start of the
                # destination buffer.
                # Please note that this could cause errors if used with a dictionary
                # shared across the different blocks, but as we are starting with an
                # empty dictionary on every block compression run, this is no problem
                # for us.
                if referenced_index < 0:
                    raise common.BadData("Referenced index is too small.")
                # Copy the specified amount of bytes. We have to use `memmove` as there
                # might be overlaps.
                memmove(destination, referenced_index, match_length)
                destination_position += match_length
            else:
                # This is a literal instruction.
                # This will always be the first instruction - for this reason
                #     `instruction_code = source[0] & 31` (or any of its equivalents)
                # is allowed before the loop above, as the identifier `000` at the
                # beginning of the byte can be ignored (000X_2 = X_2).
                # Add 1 to the length value, as a value of 0 indicates a literal length
                # of 1 (a literal length of 0 does not make any sense at all).
                literal_length = instruction_code + 1
                # Make sure that we have enough bytes left in the input.
                if source_position + literal_length > source_length:
                    raise common.BadData(
                        "Not enough values available to copy with literal instruction."
                    )
                # Copy the specified amount of bytes. We can use `memcpy` as there are
                # no overlaps.
                memcpy(source, destination, source_position, literal_length)
                # Move the positions forward.
                source_position += literal_length
                destination_position += literal_length
            # Stop if the end of the input has been reached to be able to read the next
            # byte.
            if source_position >= source_length:
                break
            # Get the next instruction code.
            # Unlike in the first iteration (the value being assigned before the loop),
            # we now need the full byte as copy operations make sense now.
            instruction_code = source[source_position]
            source_position += 1
        # Return the destination buffer.
        return destination
class BadData(ValueError):
    """
    Error indicating bad input data during compression or decompression.
    """

    pass
def determine_level_from_buffer(buffer):
    """
    Determine the FastLZ level from the given buffer.

    :param buffer: The buffer to get the level from.
    :type buffer: bytearray

    :return: The level used in the buffer.
    :rtype: int

    :raises ValueError: The level is invalid.
    """
    # The 3 most significant bits of the first byte hold the magic identifier:
    # identifier 0 corresponds to level 1, identifier 1 to level 2.
    identifier = buffer[0] >> 5
    if identifier == 0:
        return FastLzLevel.LEVEL1
    if identifier == 1:
        return FastLzLevel.LEVEL2
    # Any other identifier does not map to a known level.
    raise ValueError("Invalid level {}.".format(identifier + 1))
def call_decompressor_for_buffer_level(buffer):
    """
    Call the decompressor for the FastLZ level determined from the given buffer.

    :param buffer: The buffer to get the level from and to decompress.
    :type buffer: bytearray

    :return: The decompression result.
    :rtype: bytearray

    :raises ValueError: The level is invalid.
    """
    level = determine_level_from_buffer(buffer)
    # Map each known level to the corresponding decompressor class.
    decompressors = {
        FastLzLevel.LEVEL1: DecompressorLevel1,
        FastLzLevel.LEVEL2: DecompressorLevel2,
    }
    decompressor_class = decompressors.get(level)
    if decompressor_class is None:
        # `determine_level_from_buffer` already raises for unknown identifiers,
        # so this branch is a safety net only.
        raise ValueError("Invalid level {} for decompression.".format(level))
    return decompressor_class().decompress(buffer)
def update_adler32(checksum, buffer):
    """
    Update the running Adler-32 checksum with the bytes of the buffer and return the
    updated checksum.

    The checksum should be initialized by 1 in the first run.

    For more information on the checksum calculation, see
    `RFC 1950 Section 8.2 <https://tools.ietf.org/html/rfc1950#section-8>`_ or the
    corresponding `Wikipedia article <https://en.wikipedia.org/wiki/Adler-32>`_.

    :param checksum: The old checksum value.
    :type checksum: int

    :param buffer: The buffer to update the checksum with. The complete buffer will be
        consumed.
    :type buffer: bytearray

    :return: The updated checksum value.
    :rtype: int
    """
    # Local import so this module works without touching its import header.
    import zlib

    # `zlib.adler32` implements exactly the RFC 1950 algorithm (component sums
    # modulo 65521) and accepts a running start value, so we can delegate to the
    # standard library's C implementation instead of the hand-rolled Python loop.
    # This is both simpler and considerably faster for large buffers, while
    # producing bit-identical results (including the `checksum=1` initialization
    # convention).
    return zlib.adler32(buffer, checksum)
def read_unsigned_integer_16_bit(buffer, start_position):
    """
    Read an unsigned 16 bit integer value (= 2 bytes) from the buffer starting at the
    given index.

    The value is stored in little-endian byte order.

    :param buffer: The buffer to read from.
    :type buffer: bytearray

    :param start_position: The index to start at inside the buffer.
    :type start_position: int

    :return: The unsigned 16 bit integer value read from the buffer starting at the
        given index.
    """
    low_byte = buffer[start_position]
    high_byte = buffer[start_position + 1]
    return (high_byte << 8) | low_byte
def read_unsigned_integer_32_bit(buffer, start_position):
    """
    Read an unsigned 32 bit integer value (= 4 bytes) from the buffer starting at the
    given index.

    The value is stored in little-endian byte order.

    :param buffer: The buffer to read from.
    :type buffer: bytearray

    :param start_position: The index to start at inside the buffer.
    :type start_position: int

    :return: The unsigned 32 bit integer value read from the buffer starting at the
        given index.
    """
    # Accumulate the 4 bytes, least significant byte first.
    value = 0
    for byte_index in range(4):
        value |= buffer[start_position + byte_index] << (8 * byte_index)
    return value
def memmove(buffer, start_position, byte_count):
    """
    Basic re-implementation of the classical `memmove` method. This method is able to
    handle buffer overlaps.

    The copy operation will start at the given index, new values will be appended at
    the end of the buffer.

    :param buffer: The buffer to work on.
    :type buffer: bytearray

    :param start_position: The start index inside the buffer.
    :type start_position: int

    :param byte_count: The number of bytes to copy over.
    :type byte_count: int
    """
    end_position = start_position + byte_count
    if end_position < len(buffer):
        # The source range ends before the current buffer end, so the appended
        # bytes cannot overlap the range being read - a single slice copy works.
        buffer += buffer[start_position:end_position]
        return
    # The source range reaches into the bytes we are about to append (the typical
    # LZ77 "repeat the last few bytes" case), so copy one byte at a time: bytes
    # appended earlier in this loop become valid sources for later iterations.
    for read_index in range(start_position, end_position):
        buffer.append(buffer[read_index])
def memcpy(source, destination, source_start_position, byte_count):
    """
    Basic re-implementation of the classical `memcpy` method. This method is not able
    to handle buffer overlaps.

    The copy operation will start at the given index inside the source buffer, new
    values will be appended at the end of the destination buffer.

    :param source: The source buffer to read from.
    :type source: bytearray

    :param destination: The destination buffer to write to.
    :type destination: bytearray

    :param source_start_position: The start index inside the source buffer.
    :type source_start_position: int

    :param byte_count: The number of bytes to copy over.
    :type byte_count: int

    :raises ValueError: The copy operation would result in an overlap.
    """
    end_position = source_start_position + byte_count
    # If the requested range extends past the current end of the source buffer,
    # the copy would have to read bytes it writes itself - refuse that here.
    if end_position > len(source):
        raise ValueError("`memcpy` cannot handle overlaps. Please use `memmove`.")
    # A plain slice copy is safe as the ranges are guaranteed to be disjoint.
    destination.extend(source[source_start_position:end_position])
def calculate_hash_value(value, configuration):
    """
    Hash the given 32 bit unsigned integer value into a 16 bit unsigned integer value.

    :param value: The 32 bit unsigned integer value to hash.
    :type value: int

    :param configuration: The configuration to use.
    :type configuration: fastlz_lib.configuration.Configuration

    :return: The 16 bit unsigned integer value being the hash of the given value.
    :rtype: int
    """
    # Knuth-style multiplicative hashing. The constant appears to be
    #     (2 ** 32) * (math.sqrt(5) - 1) // 2
    # (calculation in Python), see `Doxygen source for OpenADFortTk, file
    # Open64.osprey1.0.common.util.id_map.h
    # <https://web.archive.org/web/20170624215708/https://www.mcs.anl.gov/OpenAD/OpenADFortTkExtendedDox/id__map_8h_source.html>`_
    # for example.
    multiplier = 2654435769
    # Keep only the top `HASH_TABLE_LOGARITHM` bits of the 32 bit product ...
    shift_amount = 32 - configuration.HASH_TABLE_LOGARITHM
    raw_hash = (value * multiplier) >> shift_amount
    # ... and clamp the result to the hash table size.
    return raw_hash & configuration.HASH_TABLE_MASK
def compare_buffer_content_until_mismatch(buffer, start1, start2, end2):
    """
    Compare the content of the buffer starting at the both positions. Stop if a
    mismatch occurs or the second position (which should be the greater one) has
    reached the given end position.

    This corresponds to the 32-bit variant of `flz_cmp` in the original implementation.
    Note that the returned length includes the mismatching byte itself, mirroring the
    `*p++`-style counting of the C code.

    :param buffer: The buffer to work on.
    :type buffer: bytearray

    :param start1: The first start index (the smaller one).
    :type start1: int

    :param start2: The second start index (the larger one).
    :type start2: int

    :param end2: The position for the second (greater) index value.
    :type end2: int

    :return: The length of the match. This will always be at least 1.
    :rtype: int
    """
    position1 = start1
    match_length = 0
    # Walk both cursors in lockstep; the second one is driven by the range.
    for position2 in range(start2, end2):
        # Count the byte before comparing, so a mismatching byte is included.
        match_length += 1
        if buffer[position1] != buffer[position2]:
            break
        position1 += 1
    return match_length
def emit_literal_instructions(
    source, destination, source_start_position, length, configuration
):
    """
    Emit the given number of literal bytes. Each group of `LITERAL_MAX` bytes is
    preceded by the literal command itself (`000` followed by the length value using 5
    bits), so this function will emit multiple literal instructions if required.

    :param source: The source buffer to get the data from.
    :type source: bytearray

    :param destination: The destination buffer to write the data to.
    :type destination: bytearray

    :param source_start_position: The start index inside the source buffer.
    :type source_start_position: int

    :param length: The number of bytes to emit.
    :type length: int

    :param configuration: The configuration to use.
    :type configuration: fastlz_lib.configuration.Configuration

    :return: The number of bytes written to the destination buffer.
    :rtype: int
    """
    group_size = configuration.LITERAL_MAX
    read_position = source_start_position
    remaining = length
    bytes_written = 0
    # Emit one instruction per group: full groups of `group_size` bytes first,
    # then a final partial group for whatever is left.
    while remaining > 0:
        chunk = group_size if remaining >= group_size else remaining
        # The opcode byte holds the literal length minus 1, as a literal length
        # of 0 would not make any sense.
        destination.append(chunk - 1)
        destination += source[read_position : read_position + chunk]
        read_position += chunk
        remaining -= chunk
        # One opcode byte plus the literal bytes themselves.
        bytes_written += chunk + 1
    return bytes_written
def detect_magic_bytes(buffer):
    """
    Check if the buffer represents a 6pack archive.

    :param buffer: The buffer to check.
    :type buffer: bytearray

    :return: :code:`True` if the buffer represents a 6pack archive, :code:`False`
        otherwise.
    :rtype: bool
    """
    magic = Constants.SIXPACK_MAGIC_IDENTIFIER
    # A buffer shorter than the identifier cannot contain the full header.
    if len(buffer) < len(magic):
        return False
    # `bytearray` slices compare element-wise, so a single comparison covers all
    # bytes of the identifier.
    return buffer[: len(magic)] == magic
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
# Modified from original source: https://gitlab.com/Stefan65/lz77-variants_python | |
# ===================================================================================== | |
# FastLZ - Byte-aligned LZ77 compression library | |
# Copyright (C) 2005-2020 Ariya Hidayat <[email protected]> | |
# Copyright (C) 2020 Stefan65 (Python port) | |
# | |
# Permission is hereby granted, free of charge, to any person obtaining a copy | |
# of this software and associated documentation files (the "Software"), to deal | |
# in the Software without restriction, including without limitation the rights | |
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
# copies of the Software, and to permit persons to whom the Software is | |
# furnished to do so, subject to the following conditions: | |
# | |
# The above copyright notice and this permission notice shall be included in | |
# all copies or substantial portions of the Software. | |
# | |
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | |
# THE SOFTWARE. | |
# ===================================================================================== | |
""" | |
Configuration values for the FastLZ algorithm. | |
""" | |
import enum | |
class Configuration:
    """
    The configuration values for executing the algorithm, as they are defined inside
    the original files.
    """

    LITERAL_MAX = 32
    """
    The maximum literal length we can encode.

    This corresponds to 2^5 = 32.

    :type: :class:`int`
    """

    MATCH_LENGTH_MAX = 264
    """
    The maximum match length we can encode.

    This corresponds to 2^8 + 8 = 256 + 8. At least 9 bytes are needed for a long match
    (below there are short matches).

    :type: :class:`int`
    """

    MATCH_OFFSET_MAX_LEVEL1 = 8192
    """
    The maximum match offset we can encode with level 1.

    This corresponds to 2^13 = 8192.

    :type: :class:`int`
    """

    MATCH_OFFSET_MAX_LEVEL2 = 8191
    """
    The maximum match offset we can encode with level 2 in the regular case.

    This corresponds to 2^13 - 1 = 8192 - 1.

    :type: :class:`int`
    """

    MATCH_OFFSET_MAX_LEVEL2_FAR = 65535 + MATCH_OFFSET_MAX_LEVEL2 - 1
    """
    The maximum match offset we can encode with level 2 in the extended case.

    This corresponds to (2^16 - 1) + (2^13 - 1) - 1 = 73725, matching
    `MAX_FARDISTANCE` of the original C implementation. Besides the known minimum
    value of `MATCH_OFFSET_MAX_LEVEL2` in this case, we have 2 additional offset
    bytes, which corresponds to 16 bits (therefore the 2^16 part).

    :type: :class:`int`
    """

    HASH_TABLE_LOGARITHM = 14
    """
    The hash table logarithm base to use.

    With the default value of 14, the hash table will have 2^14 = 16384 entries.

    :type: :class:`int`
    """

    HASH_TABLE_SIZE = 1 << HASH_TABLE_LOGARITHM
    """
    The size of the search buffer (hash table/dictionary).

    :type: :class:`int`
    """

    HASH_TABLE_MASK = HASH_TABLE_SIZE - 1
    """
    The hash table mask to limit the maximum value of the hash table index (using the
    modulo operator).

    :type: :class:`int`
    """
class FastLzLevel(enum.IntEnum):
    """
    The different FastLZ levels available.

    The on-disk magic identifier is `level - 1`, stored in the 3 most significant
    bits of the first compressed byte.
    """

    AUTOMATIC = 0
    """
    Let the application decide which level to use depending on the input size.

    This working mode is not recommended.

    :type: :class:`int`
    """

    LEVEL1 = 1
    """
    Level 1, which is recommended for shorter blocks.

    :type: :class:`int`
    """

    LEVEL2 = 2
    """
    Level 2, which is recommended for larger blocks.

    :type: :class:`int`
    """
class Constants:
    """
    Constants to use within the whole application.
    """

    SIXPACK_MAGIC_IDENTIFIER = bytearray.fromhex("89 36 50 4B 0D 0A 1A 0A")
    """
    The magic identifier for 6pack files.

    This corresponds to the original magic numbers, while translating the decimal values
    to hexadecimal ones for constructing the byte array. These values have been
    converted using the following code:

    .. code-block:: python

        magic = [137, ord("6"), ord("P"), ord("K"), 13, 10, 26, 10]
        magic_hex = [hex(x) for x in magic]

    :type: :class:`bytearray`
    """

    BLOCK_SIZE_DECOMPRESSION = 65536
    """
    The block size to use during decompression.

    This corresponds to 2^16, which means that each block can be indexed using a 16 bit
    unsigned integer variable. It is actually half as large as the block size during the
    compression.

    :type: :class:`int`
    """

    BLOCK_SIZE_COMPRESSION = 2 * 64 * 1024
    """
    The block size to use during compression.

    This corresponds to 2 * 64 * 1024 = 2^1 * 2^6 * 2^10 = 2^17 = 131072, which means
    that each block can be indexed using a 17 bit unsigned integer variable. It is
    actually twice as large as the block size during the decompression.

    :type: :class:`int`
    """

    ADLER32_BASE = 65521
    """
    The Adler-32 base to use.

    This is the largest prime number smaller than 2^16 = 65536.

    :type: :class:`int`
    """
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
# Modified from original source: https://gitlab.com/Stefan65/lz77-variants_python | |
# ===================================================================================== | |
# FastLZ - Byte-aligned LZ77 compression library | |
# Copyright (C) 2005-2020 Ariya Hidayat <[email protected]> | |
# Copyright (C) 2020 Stefan65 (Python port) | |
# | |
# Permission is hereby granted, free of charge, to any person obtaining a copy | |
# of this software and associated documentation files (the "Software"), to deal | |
# in the Software without restriction, including without limitation the rights | |
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
# copies of the Software, and to permit persons to whom the Software is | |
# furnished to do so, subject to the following conditions: | |
# | |
# The above copyright notice and this permission notice shall be included in | |
# all copies or substantial portions of the Software. | |
# | |
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | |
# THE SOFTWARE. | |
# ===================================================================================== | |
""" | |
Implementation of the file-based FastLZ compression algorithm (6pack). | |
""" | |
import os | |
from fastlz_lib import common | |
from fastlz_lib.configuration import Configuration, FastLzLevel, Constants | |
class FileCompressor: | |
""" | |
The implementation of the (file) compressor (6pack). | |
""" | |
configuration = None | |
""" | |
The configuration class to use. | |
:type: :class:`class` | |
""" | |
    def __init__(self, configuration=Configuration):
        """
        :param configuration: The configuration class to use.
        :type configuration: class
        """
        # Store the configuration so the per-block compressors created in
        # `compress` share the same settings.
        self.configuration = configuration
    def compress(self, source, filename, level=FastLzLevel.LEVEL1):
        """
        Compress the given source buffer using the FastLZ algorithm.

        The output is a 6pack archive: the magic identifier, one header chunk
        carrying the original file size and filename, then one chunk per input
        block (compressed if possible, raw otherwise).

        :param source: The source buffer to compress.
        :type source: bytearray

        :param filename: The path to the input file.
        :type filename: str

        :param level: The FastLZ level to use. This cannot be `AUTOMATIC`.
        :type level: `int`

        :return: The compression result.
        :rtype: bytearray

        :raises ValueError: The automatic compression level is selected.
        :raises common.BadData: This file already is a 6pack archive.
        """
        # Forbid the automatic level.
        if level == FastLzLevel.AUTOMATIC:
            raise ValueError("Automatic level not supported for 6pack.")
        # Check if this already is a 6pack archive.
        if common.detect_magic_bytes(source):
            raise common.BadData("File already is a 6pack archive.")
        # The source parameters: the current position inside the source buffer and the
        # overall length of the source buffer.
        source_position = 0
        source_length = len(source)
        # The destination buffer and the current position inside it (which corresponds
        # to the output length).
        destination = bytearray()
        destination_position = 0
        # Write the magic identifier.
        destination += Constants.SIXPACK_MAGIC_IDENTIFIER
        destination_position += len(Constants.SIXPACK_MAGIC_IDENTIFIER)
        # Determine the actual filename.
        shown_name = bytearray(os.path.basename(filename), encoding="utf8")
        # Write the file size to the buffer and add the length of the filename using 2
        # bytes (little-endian, including the trailing null-byte in the count).
        buffer = self._get_file_size_buffer(source)
        buffer.append((len(shown_name) + 1) & 255)
        buffer.append((len(shown_name) + 1) >> 8)
        # Calculate the initial checksum.
        # Initialize with 1 as per Adler-32 specification.
        checksum = 1
        # Update the checksum for the buffer with the file size and filename length.
        checksum = common.update_adler32(checksum, buffer)
        # Update the checksum using the filename with a null-byte added at the end.
        # This is not directly obvious, but can be derived from passing
        #     strlen(shown_name) + 1
        # to the checksum calculation in the original implementation.
        checksum = common.update_adler32(checksum, shown_name + bytearray(1))
        # Write the header of the first chunk to the output, which takes 16 bytes.
        # NOTE(review): `ChunkHeader` is defined elsewhere in this package;
        # presumably a plain record with the 5 fields assigned below.
        header = ChunkHeader()
        header.chunk_id = 1
        header.chunk_options = 0
        header.chunk_size = 10 + len(shown_name) + 1
        header.chunk_checksum = checksum
        header.chunk_extra = 0
        self._write_chunk_header(destination, header)
        destination_position += 16
        # Write the original file size and the length of the original filename to the
        # output, which takes 10 bytes (8 bytes for the file size, 2 bytes for the
        # filename length).
        destination += buffer
        destination_position += 10
        # Write the original filename to the output, including the trailing null-byte
        # (probably as string terminator). This is done to keep the application as
        # similar as possible to the original implementation.
        destination += shown_name + bytearray(1)
        destination_position += len(shown_name) + 1
        # Iterate until the end of the input has been reached.
        while source_position < source_length:
            # The compression method to use. The default value is set by `pack_file`
            # in the original implementation and means "compress if possible and
            # useful".
            compression_method = 1
            # Retrieve the current block from the input.
            buffer = source[
                source_position : source_position + Constants.BLOCK_SIZE_COMPRESSION
            ]
            bytes_read = len(buffer)
            source_position += bytes_read
            # If the input is too small, disable compression as it does not make any
            # sense.
            if bytes_read < 32:
                compression_method = 0
            # Write to the output.
            if compression_method == 1:
                # Compress using FastLZ.
                # Perform the compression itself using the desired level. We cannot
                # have the `AUTOMATIC` level here as we already sort this out at the
                # beginning of this function.
                if level == FastLzLevel.LEVEL1:
                    result = common.CompressorLevel1(self.configuration).compress(buffer)
                else:
                    result = common.CompressorLevel2(self.configuration).compress(buffer)
                chunk_size = len(result)
                # Update the checksum. Each chunk carries its own checksum, so the
                # running value is restarted at 1 here.
                checksum = common.update_adler32(1, result)
                # Write the header for the current chunk, which takes 16 bytes.
                header = ChunkHeader()
                header.chunk_id = 17
                header.chunk_options = 1
                header.chunk_size = chunk_size
                header.chunk_checksum = checksum
                header.chunk_extra = bytes_read
                self._write_chunk_header(destination, header)
                destination_position += 16
                # Add the compression result itself to the output.
                destination += result
                destination_position += chunk_size
            else:
                # Uncompressed data (and fallback method).
                # Determine the checksum for the current buffer content.
                checksum = common.update_adler32(1, buffer)
                # Write the header for the current chunk, which takes 16 bytes.
                header = ChunkHeader()
                header.chunk_id = 17
                header.chunk_options = 0
                header.chunk_size = bytes_read
                header.chunk_checksum = checksum
                header.chunk_extra = bytes_read
                self._write_chunk_header(destination, header)
                destination_position += 16
                # Add the plain bytes itself to the output.
                destination += buffer
                destination_position += bytes_read
        # Return the destination buffer.
        return destination
@staticmethod | |
def _get_file_size_buffer(source): | |
""" | |
Write the file size to a buffer of size 8, while only the first 4 bytes are | |
actually used for the file size. | |
:param source: The source buffer to determine the write the size. | |
:type source: bytearray | |
:return: A new buffer of size 8 with the first 4 bytes holding the input size | |
and the last 4 bytes being 0. | |
:rtype: bytearray | |
""" | |
# Create the buffer. | |
buffer = bytearray(8) | |
# Determine the input buffer size. | |
size = len(source) | |
# Write the input size to the buffer. | |
buffer[0] = size & 255 | |
buffer[1] = (size >> 8) & 255 | |
buffer[2] = (size >> 16) & 255 | |
buffer[3] = (size >> 24) & 255 | |
# Return the newly created buffer. | |
return buffer | |
@staticmethod | |
def _write_chunk_header(buffer, header): | |
""" | |
Write the header for the current chunk. | |
:param buffer: The buffer to write to. | |
:type buffer: bytearray | |
:param header: The header to write. | |
:type header: ChunkHeader | |
""" | |
# Write the chunk ID, taking the first 2 bytes. | |
buffer.append(header.chunk_id & 255) | |
buffer.append(header.chunk_id >> 8) | |
# Write the chunk options, taking the next 2 bytes. | |
buffer.append(header.chunk_options & 255) | |
buffer.append(header.chunk_options >> 8) | |
# Write the size, taking the next 4 bytes. | |
buffer.append(header.chunk_size & 255) | |
buffer.append((header.chunk_size >> 8) & 255) | |
buffer.append((header.chunk_size >> 16) & 255) | |
buffer.append((header.chunk_size >> 24) & 255) | |
# Write the checksum, taking the next 4 bytes. | |
buffer.append(header.chunk_checksum & 255) | |
buffer.append((header.chunk_checksum >> 8) & 255) | |
buffer.append((header.chunk_checksum >> 16) & 255) | |
buffer.append((header.chunk_checksum >> 24) & 255) | |
# Write the extra data, taking the next 4 bytes. | |
buffer.append(header.chunk_extra & 255) | |
buffer.append((header.chunk_extra >> 8) & 255) | |
buffer.append((header.chunk_extra >> 16) & 255) | |
buffer.append((header.chunk_extra >> 24) & 255) | |
class FileDecompressor:
    """
    The implementation of the (file) decompressor (6unpack).

    A 6pack archive is the magic identifier followed by a sequence of chunks,
    each introduced by a 16-byte header (see :class:`ChunkHeader`).
    """

    # The configuration class to use.
    configuration = None

    def __init__(self, configuration=Configuration):
        """
        :param configuration: The configuration class to use.
        :type configuration: class
        """
        self.configuration = configuration

    def decompress(self, source):
        """
        Decompress the given source buffer using the FastLZ algorithm.

        :param source: The source buffer to decompress.
        :type source: bytearray
        :return: The decompression result.
        :rtype: bytearray
        :raises ValueError: The level of one of the blocks is invalid.
        :raises common.BadData: Something is wrong with the archive.
        """
        # Check if this is a 6pack archive.
        if not common.detect_magic_bytes(source):
            raise common.BadData("File is not a 6pack archive.")
        # The source parameters: the current position inside the source buffer
        # and the overall length of the source buffer.
        # We already skip the magic identifier bytes with the position.
        source_position = len(Constants.SIXPACK_MAGIC_IDENTIFIER)
        source_length = len(source)
        # The destination buffer and the current position inside it (which
        # corresponds to the output length).
        destination = bytearray()
        destination_position = 0
        # The original (uncompressed) file size. A non-zero value also acts as
        # the flag that the first chunk has already been processed.
        decompressed_size = 0
        # Iterate until the end of the input has been reached.
        while source_position < source_length:
            # Retrieve the chunk header, which occupies 16 bytes.
            header = self._read_chunk_header(source, source_position)
            source_position += 16
            # Handle the different chunk types.
            if (
                header.chunk_id == 1
                and 10 < header.chunk_size < Constants.BLOCK_SIZE_DECOMPRESSION
            ):
                # This is the first chunk in the file.
                # It cannot be smaller than 11 bytes, as it contains the
                # original file size (using 8 bytes), the number of bytes for
                # the original filename (using 2 bytes) and at least 1 byte
                # for the original filename itself.
                # Clear the output buffer.
                destination = bytearray()
                destination_position = 0
                # Retrieve the current chunk from the source buffer.
                buffer = source[source_position : source_position + header.chunk_size]
                source_position += len(buffer)
                # Calculate the initial checksum value.
                # The `1` is coming from the Adler-32 initialization.
                checksum = common.update_adler32(1, buffer)
                # Make sure that the checksums match.
                if checksum != header.chunk_checksum:
                    raise common.BadData(
                        "Checksum mismatch: Expected {}, got {}".format(
                            hex(header.chunk_checksum), hex(checksum)
                        )
                    )
                # Get the uncompressed size (the original file size) by reading
                # 4 bytes from the buffer.
                # This value will be followed by 4 empty (null) bytes inside
                # the buffer.
                decompressed_size = common.read_unsigned_integer_32_bit(buffer, 0)
                # Get the name of the input file.
                # In the first step we read the number of bytes for this name,
                # which is encoded using 2 bytes (the offset of 8 bytes is due
                # to the original file size, see above).
                # Afterwards we make sure that we will not exceed the buffer
                # limits and retrieve the file name itself.
                # Please note that we do not use this value in our
                # implementation, but let the user enter the output filename
                # on their own. This is done to keep the general interface as
                # similar as possible.
                # Another note: The output file name still has the trailing
                # null-byte.
                name_length = common.read_unsigned_integer_16_bit(buffer, 8)
                name_length = min(name_length, header.chunk_size - 10)
                output_file_name = buffer[10 : 10 + name_length]
                assert len(output_file_name) == name_length
            if header.chunk_id == 17 and decompressed_size:
                # This is not the first chunk.
                # Handle the different compression variants.
                if header.chunk_options == 0:
                    # This is uncompressed data, so we can just copy it to the
                    # output.
                    # Initialize the remaining size and the checksum (see
                    # Adler-32 initialization notes).
                    remaining = header.chunk_size
                    checksum = 1
                    # Read one block at a time, write it and update the
                    # checksum.
                    while remaining > 0:
                        # Make sure to not read across the source buffer
                        # bounds.
                        if source_position >= source_length:
                            raise common.BadData("Reached end while copying data.")
                        # Determine the size of the current block.
                        bytes_in_block = min(
                            remaining, Constants.BLOCK_SIZE_DECOMPRESSION
                        )
                        # Read the current block from the source buffer and
                        # make sure that it actually has the desired length.
                        buffer = source[
                            source_position : source_position + bytes_in_block
                        ]
                        assert len(buffer) == bytes_in_block
                        source_position += bytes_in_block
                        # Add the current block to the output.
                        destination += buffer
                        destination_position += bytes_in_block
                        # Update the checksum.
                        checksum = common.update_adler32(checksum, buffer)
                        # We have read some bytes.
                        remaining -= bytes_in_block
                    # Make sure that everything has been read/written correctly
                    # by comparing the checksum.
                    if checksum != header.chunk_checksum:
                        raise common.BadData(
                            "Checksum mismatch: Expected {}, got {}".format(
                                hex(header.chunk_checksum), hex(checksum)
                            )
                        )
                elif header.chunk_options == 1:
                    # This has been compressed using FastLZ.
                    # Read the current chunk from the input.
                    compressed_buffer = source[
                        source_position : source_position + header.chunk_size
                    ]
                    source_position += header.chunk_size
                    # Update the checksum.
                    checksum = common.update_adler32(1, compressed_buffer)
                    # Make sure that everything has been read correctly.
                    if checksum != header.chunk_checksum:
                        raise common.BadData(
                            "Checksum mismatch: Expected {}, got {}".format(
                                hex(header.chunk_checksum), hex(checksum)
                            )
                        )
                    # Decompress the given data.
                    decompressed_buffer = common.call_decompressor_for_buffer_level(
                        compressed_buffer
                    )
                    decompressed_size = len(decompressed_buffer)
                    # Make sure that we did not lose/added some data.
                    # Bug fix: the original built this message as
                    # `"..." + "bytes.".format(...)`, so `.format` bound to the
                    # placeholder-free second literal and the `{}` were never
                    # substituted. Format the complete message instead.
                    if decompressed_size != header.chunk_extra:
                        raise common.BadData(
                            "Expected {} bytes after decompression, got {} "
                            "bytes.".format(header.chunk_extra, decompressed_size)
                        )
                    # Add the decompressed buffer to the output.
                    destination += decompressed_buffer
                    destination_position += decompressed_size
                else:
                    # This is using a compression method not (yet) known.
                    raise common.BadData(
                        "Unknown compression method {}.".format(header.chunk_options)
                    )
        # Return the destination buffer.
        return destination

    @staticmethod
    def _read_chunk_header(buffer, start_position):
        """
        Read the header for the current chunk.

        :param buffer: The buffer to read from.
        :type buffer: bytearray
        :param start_position: The start index to use inside the buffer.
        :type start_position: int
        :return: The chunk header read from the buffer.
        :rtype: ChunkHeader
        """
        header = ChunkHeader()
        # Get the chunk ID, taking the first 2 bytes.
        header.chunk_id = common.read_unsigned_integer_16_bit(buffer, start_position)
        # Get the chunk options, taking the next 2 bytes.
        header.chunk_options = common.read_unsigned_integer_16_bit(
            buffer, start_position + 2
        )
        # Get the size, taking the next 4 bytes.
        header.chunk_size = common.read_unsigned_integer_32_bit(
            buffer, start_position + 4
        )
        # Get the checksum, taking the next 4 bytes.
        header.chunk_checksum = common.read_unsigned_integer_32_bit(
            buffer, start_position + 8
        )
        # Get the extra data, taking the next 4 bytes.
        header.chunk_extra = common.read_unsigned_integer_32_bit(
            buffer, start_position + 12
        )
        # Return the header data.
        return header
class ChunkHeader:
    """
    Container for the header data of a single 6pack chunk.

    A serialized header occupies 16 bytes: ID (2 bytes), options (2 bytes),
    size (4 bytes), checksum (4 bytes) and extra data (4 bytes).
    """

    # The ID of the chunk (2 bytes): 1 for the first chunk, 17 for every
    # following chunk.
    chunk_id = 0

    # The options of the chunk (2 bytes): 0 for uncompressed data (simply
    # copied to the output), 1 for data compressed using FastLZ.
    chunk_options = 0

    # The size of the current chunk (4 bytes): `10 + len(filename) + 1` for
    # the first chunk, the compressed size for compressed data, or the
    # original block size for uncompressed data.
    chunk_size = 0

    # The Adler-32 checksum of the chunk content (4 bytes).
    chunk_checksum = 0

    # The extra data of the chunk (4 bytes): 0 for the first chunk, otherwise
    # the number of bytes read from the input file during compression. This
    # never exceeds the block size.
    chunk_extra = 0

    def __init__(self):
        # Initialize every field on the instance itself with zero.
        for field_name in (
            "chunk_id",
            "chunk_options",
            "chunk_size",
            "chunk_checksum",
            "chunk_extra",
        ):
            setattr(self, field_name, 0)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
# Modified from original source: https://gitlab.com/Stefan65/lz77-variants_python | |
# ===================================================================================== | |
# FastLZ - Byte-aligned LZ77 compression library | |
# Copyright (C) 2005-2020 Ariya Hidayat <[email protected]> | |
# Copyright (C) 2020 Stefan65 (Python port) | |
# | |
# Permission is hereby granted, free of charge, to any person obtaining a copy | |
# of this software and associated documentation files (the "Software"), to deal | |
# in the Software without restriction, including without limitation the rights | |
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
# copies of the Software, and to permit persons to whom the Software is | |
# furnished to do so, subject to the following conditions: | |
# | |
# The above copyright notice and this permission notice shall be included in | |
# all copies or substantial portions of the Software. | |
# | |
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | |
# THE SOFTWARE. | |
# ===================================================================================== | |
""" | |
Interface to the FastLZ implementations itself. | |
This should be the correct entry point for most applications. | |
""" | |
from fastlz_lib import common | |
from fastlz_lib.configuration import FastLzLevel | |
from fastlz_lib.files import FileCompressor, FileDecompressor | |
class FastLzInterface:
    """
    FastLZ interface.

    This should be the entrypoint for most applications.
    """

    @staticmethod
    def _effective_level(decompressed, level):
        """
        Resolve the requested level to a concrete compression level.

        The original code duplicated this size-based selection in both
        `compress` and `compress_file`; it now lives in one place.

        :param decompressed: The buffer which should be compressed.
        :type decompressed: bytearray
        :param level: The requested FastLZ level.
        :type level: `int`
        :return: The given level if it already is a concrete one; otherwise
                 level 1 for short inputs (< 64 KiB, where it tends to work
                 better) and level 2 for larger ones.
        :rtype: `int`
        """
        if level in (FastLzLevel.LEVEL1, FastLzLevel.LEVEL2):
            return level
        # Automatic selection: for short blocks, choose level 1.
        if len(decompressed) < 65536:
            return FastLzLevel.LEVEL1
        return FastLzLevel.LEVEL2

    @staticmethod
    def compress(decompressed, level=FastLzLevel.AUTOMATIC):
        """
        Compress the given buffer using the given FastLZ level.

        :param decompressed: The buffer to compress.
        :type decompressed: bytearray
        :param level: The FastLZ level to use. With `AUTOMATIC`, level 1 is
            chosen for inputs below 64 KiB, level 2 otherwise.
        :type level: `int`
        :return: The compressed buffer.
        :rtype: bytearray
        """
        effective = FastLzInterface._effective_level(decompressed, level)
        if effective == FastLzLevel.LEVEL1:
            return common.CompressorLevel1().compress(decompressed)
        return common.CompressorLevel2().compress(decompressed)

    @staticmethod
    def compress_file(input_file, output_file=None, level=FastLzLevel.AUTOMATIC):
        """
        Compress the given file using the given FastLZ level.

        :param input_file: The name of the file to compress.
        :type input_file: str
        :param output_file: The name of the file to write the compressed data
            to. Set to :code:`None` to not write the output to a file.
        :type output_file: str or None
        :param level: The FastLZ level to use. With `AUTOMATIC`, level 1 is
            chosen for inputs below 64 KiB, level 2 otherwise.
        :type level: `int`
        :return: The compressed buffer.
        :rtype: bytearray
        """
        with open(input_file, mode="rb") as infile:
            decompressed = bytearray(infile.read())
        # Resolve `AUTOMATIC` with the same rule as `compress` for
        # consistency between the two entry points.
        effective = FastLzInterface._effective_level(decompressed, level)
        compressed = FileCompressor().compress(decompressed, input_file, effective)
        if output_file:
            with open(output_file, mode="wb") as outfile:
                outfile.write(compressed)
        return compressed

    @staticmethod
    def decompress(compressed):
        """
        Decompress the given buffer using the FastLZ algorithm.

        :param compressed: The buffer to decompress.
        :type compressed: bytearray
        :return: The decompressed buffer.
        :rtype: bytearray
        :raises ValueError: The level is invalid.
        """
        return common.call_decompressor_for_buffer_level(compressed)

    @staticmethod
    def decompress_file(input_file, output_file=None):
        """
        Decompress the given file using the FastLZ algorithm.

        :param input_file: The name of the file to decompress.
        :type input_file: str
        :param output_file: The name of the file to write the decompressed
            data to. Set to :code:`None` to not write the output to a file.
        :type output_file: str or None
        :return: The decompressed buffer.
        :rtype: bytearray
        """
        with open(input_file, mode="rb") as infile:
            compressed = bytearray(infile.read())
        decompressed = FileDecompressor().decompress(compressed)
        if output_file:
            with open(output_file, mode="wb") as outfile:
                outfile.write(decompressed)
        return decompressed
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
# Modified from original source: https://gitlab.com/Stefan65/lz77-variants_python | |
""" | |
Collection of common utility methods. | |
""" | |
def shift_left_and_cut(value, shift=1, bits=8):
    """
    Shift the given value to the left and truncate it to the desired width.

    :param value: The value to shift.
    :type value: int
    :param shift: The number of bits to shift to the left.
    :type shift: int
    :param bits: The width of the output in bits. Only the last `bits` bits
        will be kept.
    :type bits: int
    :return: The value being shifted with a maximum of `bits` bits.
    :rtype: int
    """
    # Shift first, then mask off everything above the requested width.
    return (value << shift) & ((1 << bits) - 1)
def get_last_bits(value, bits=8):
    """
    Keep only the last `bits` bits of the given value.

    :param value: The value to get the bits from.
    :type value: int
    :param bits: The width of the output in bits. Only the last `bits` bits
        will be kept.
    :type bits: int
    :return: The value with a maximum of `bits` bits.
    :rtype: int
    """
    # An all-ones mask of width `bits` keeps exactly the lowest bits.
    return value & ((1 << bits) - 1)
def get_last_bytes(value, byte_count=1):
    """
    Keep only the last `byte_count` bytes of the given value.

    :param value: The value to get the bits from.
    :type value: int
    :param byte_count: The width of the output in bytes. Only the last
        `byte_count` bytes will be kept.
    :type byte_count: int
    :return: The value with a maximum of `byte_count` bytes.
    :rtype: int
    """
    # One byte corresponds to 8 bits, so the mask width is byte_count * 8.
    return value & ((1 << (byte_count * 8)) - 1)
def compare_first_bytes(data_buffer, start_position1, start_position2, byte_count):
    """
    Compare `byte_count` bytes of the buffer starting at the given positions.

    :param data_buffer: The buffer to work on.
    :type data_buffer: bytearray
    :param start_position1: The first start position for the comparison.
    :type start_position1: int
    :param start_position2: The second start position for the comparison.
    :type start_position2: int
    :param byte_count: The number of bytes to compare.
    :type byte_count: int
    :return: Whether the first `byte_count` bytes of the buffer - when
        starting at both positions - match. This will be :code:`False` if
        there are not enough bytes available starting at one of the
        positions.
    :rtype: bool
    """
    window1 = data_buffer[start_position1 : start_position1 + byte_count]
    window2 = data_buffer[start_position2 : start_position2 + byte_count]
    # Slicing silently truncates at the end of the buffer, so both windows
    # must have the full requested length before they are comparable.
    if len(window1) == byte_count == len(window2):
        return window1 == window2
    return False
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Usage:
Place the library files in a `fastlz_lib` package directory, renaming
`_fastlz-lib__init__.py` to `__init__.py`. Then, for example:

    from fastlz_lib.common import CompressorLevel2, DecompressorLevel2
    DecompressorLevel2().decompress(data)