Last active
August 29, 2015 14:01
-
-
Save AlexSnet/1d2b0e2c9bedcc5713be to your computer and use it in GitHub Desktop.
Binary structures in python
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import struct | |
from collections import namedtuple | |
from collections import OrderedDict | |
from _abcoll import * | |
import _abcoll | |
from _collections import deque, defaultdict | |
from operaator import itemgetter as _itemgetter, eq as _eq | |
from keyword import iskeyword as _iskeyword | |
import sys as _sys | |
import heapq as _heapq | |
from itertools import repeat as _repeat, chain as _chain, starmap as _starmap | |
from itertools import imap as _imap | |
try: | |
from thread import get_ident as _get_ident | |
except ImportError: | |
from dummy_thread import get_ident as _get_ident | |
_class_template = '''\ | |
class {typename}(tuple): | |
'{typename}({arg_list})' | |
__slots__ = () | |
_fields = {field_names!r} | |
_types = ({field_types},) | |
def __new__(_cls, {arg_list}): | |
'Create new instance of {typename}({arg_list})' | |
return _tuple.__new__(_cls, ({arg_list})) | |
def __repr__(self): | |
'Return a nicely formatted representation string' | |
return '{typename}({repr_fmt})' % self | |
def _asdict(self): | |
'Return a new OrderedDict which maps field names to their values' | |
return OrderedDict(zip(self._fields, self)) | |
def __getnewargs__(self): | |
'Return self as a plain tuple. Used by copy and pickle.' | |
return tuple(self) | |
__dict__ = _property(_asdict) | |
def _replace(_self, **kwds): | |
'Return a new {typename} object replacing specified fields with new values' | |
result = _self._make(map(kwds.pop, {field_names!r}, _self)) | |
if kwds: | |
raise ValueError('Got unexpected field names: %r' % kwds.keys()) | |
return result | |
def __getstate__(self): | |
'Exclude the OrderedDict from pickling' | |
pass | |
@classmethod | |
def _make(cls, iterable, new=tuple.__new__, len=len): | |
'Make a new {typename} object from a sequence or iterable' | |
result = new(cls, iterable) | |
if len(result) != {num_fields:d}: | |
raise TypeError('Expected {num_fields:d} arguments, got %d' % len(result)) | |
return result | |
@classmethod | |
def bytesOf(cls, var): | |
'Returns bytes of given attribute' | |
pos = cls._fields.index(var) | |
if type(cls._types[pos]) == str: | |
return struct.calcsize(cls._types[pos]) | |
else: | |
return cls._types[pos].bytes() | |
@classmethod | |
def bytes(cls): | |
'Return total number of bytes for all attributes' | |
total = 0 | |
for i in cls._fields: | |
total += cls.bytesOf(i) | |
return total | |
@classmethod | |
def unpack(cls, fp): | |
'Unpacks structure from file pointer and returns new instance of it.' | |
d = dict() | |
for i in cls._fields: | |
pos = cls._fields.index(i) | |
if type(cls._types[pos]) in (str, unicode): | |
d[i] = struct.unpack(cls._types[pos], fp.read(cls.bytesOf(i))) | |
if cls._types[pos] in ('I', 'i', 'H', 'h', 'L', 'l', 'Q', 'q', 'P'): | |
d[i] = int(d[i][0]) | |
elif cls._types[pos] in ('f','d'): | |
d[i] = float(d[i][0]) | |
elif cls._types[pos] == '?': | |
d[i] = bool(d[i][0]) | |
elif 'c' in cls._types[pos] or 's' in cls._types[pos]: | |
d[i] = r''.join(d[i]) | |
else: | |
d[i] = cls._types[pos].unpack(fp) | |
if _mappers['in'][pos]: | |
d[i] = _mappers['in'][pos](d[i]) | |
return cls(**d) | |
def pack(self, fp): | |
'Reverse to unpack' | |
for i in self._fields: | |
pos = self._fields.index(i) | |
if type(self._types[pos]) == str: | |
val = getattr(self, i) | |
if _mappers['out'][pos]: | |
val = _mappers['out'][pos](val) | |
try: | |
if len(self._types[pos]) > 1 and int(self._types[pos][0]): | |
for x in range(len(self._types[pos])): | |
fp.write( struct.pack(self._types[pos][x], val[x]) ) | |
except: | |
pass | |
else: | |
fp.write( struct.pack(self._types[pos], val) ) | |
else: | |
val = getattr(self, i) | |
val.pack(fp) | |
{field_defs} | |
''' | |
_repr_template = '{name}=%r' | |
_field_template = '''\ | |
{name} = _property(_itemgetter({index:d}), doc='Alias for field number {index:d}') | |
''' | |
def bstruct(typename, field_names, field_types=None, field_map_in=None, field_map_out=None, verbose=False, rename=False): | |
""" | |
>>> from StringIO import StringIO | |
>>> A = bstruct('A', 'a b c d', 'c I cccc c') | |
>>> x = A.unpack(StringIO('\x47\x00\x01\x01\x00\x20\x21\x22\x23\x30')) | |
>>> print x | |
A(a='G', b=65792, c=' !"#', d='0') | |
>>> b = bstruct('B', 'a version', [A, 'i']) | |
>>> s = StringIO('\x47\x00\x01\x01\x00\x20\x21\x22\x23\x30\x01\x00\x00\x00') | |
>>> z = b.unpack(s) | |
>>> print z | |
B(a=A(a='G', b=65792, c=' !"#', d='0'), version=1) | |
>>> ss = StringIO() | |
>>> z.pack(ss) | |
>>> ss.seek(0) | |
>>> s.seek(0) | |
>>> s.read() == ss.read() | |
True | |
""" | |
field_map_in = [None]*len(field_names) if not field_map_in else field_map_in | |
field_map_out= [None]*len(field_names) if not field_map_out else field_map_out | |
if type(field_names) == tuple: | |
for i,x in enumerate(field_names): | |
field_map_in[i] = x[2] if len(x) > 2 else None | |
field_map_out[i] = x[3] if len(x) == 4 else None | |
field_types = [x[1] for x in field_names] | |
field_names = [x[0] for x in field_names] | |
# Validate the field names. At the user's option, either generate an error | |
# message or automatically replace the field name with a valid name. | |
if isinstance(field_names, basestring): | |
field_names = field_names.replace(',', ' ').split() | |
field_names = map(str, field_names) | |
if isinstance(field_types, basestring): | |
field_types = field_types.replace(',', ' ').split() | |
# field_types = map(, field_types) | |
# print field_types | |
if len(field_names) != len(field_types): | |
raise Exception('Lenght of fields must be equal to lenght of field types.') | |
if rename: | |
seen = set() | |
for index, name in enumerate(field_names): | |
if (not all(c.isalnum() or c=='_' for c in name) | |
or _iskeyword(name) | |
or not name | |
or name[0].isdigit() | |
or name.startswith('_') | |
or name in seen): | |
field_names[index] = '_%d' % index | |
seen.add(name) | |
for name in [typename] + field_names: | |
if not all(c.isalnum() or c=='_' for c in name): | |
raise ValueError('Type names and field names can only contain ' | |
'alphanumeric characters and underscores: %r' % name) | |
if _iskeyword(name): | |
raise ValueError('Type names and field names cannot be a ' | |
'keyword: %r' % name) | |
if name[0].isdigit(): | |
raise ValueError('Type names and field names cannot start with ' | |
'a number: %r' % name) | |
seen = set() | |
for name in field_names: | |
if name.startswith('_') and not rename: | |
raise ValueError('Field names cannot start with an underscore: ' | |
'%r' % name) | |
if name in seen: | |
raise ValueError('Encountered duplicate field name: %r' % name) | |
seen.add(name) | |
# for index, name in enumerate(field_names): | |
# print index, name, field_types[index] | |
# Fill-in the class template | |
class_definition = _class_template.format( | |
typename = typename, | |
field_names = tuple(field_names), | |
field_types = ', '.join(['_%s'%x.__name__ if type(x)!=str else "'%s'"%x for x in field_types]), | |
num_fields = len(field_names), | |
arg_list = repr(tuple(field_names)).replace("'", "")[1:-1], | |
repr_fmt = ', '.join(_repr_template.format(name=name) | |
for name in field_names), # V | |
field_defs = '\n'.join(_field_template.format(index=index, name=name) | |
for index, name in enumerate(field_names)) | |
) | |
if verbose: | |
print class_definition | |
# Execute the template string in a temporary namespace and support | |
# tracing utilities by setting a value for frame.f_globals['__name__'] | |
namespace = dict(_itemgetter=_itemgetter, __name__='bstruct_%s' % typename, | |
OrderedDict=OrderedDict, _property=property, _tuple=tuple, | |
struct=struct, _mappers={'in':field_map_in, 'out':field_map_out}) | |
for x in field_types: | |
if type(x)!=str: | |
namespace['_%s'%x.__name__] = x | |
try: | |
exec class_definition in namespace | |
except SyntaxError as e: | |
raise SyntaxError(e.message + ':\n' + class_definition) | |
result = namespace[typename] | |
# For pickling to work, the __module__ variable needs to be set to the frame | |
# where the named tuple is created. Bypass this step in enviroments where | |
# sys._getframe is not defined (Jython for example) or sys._getframe is not | |
# defined for arguments greater than 0 (IronPython). | |
try: | |
result.__module__ = _sys._getframe(1).f_globals.get('__name__', '__main__') | |
except (AttributeError, ValueError): | |
pass | |
return result | |
if __name__ == '__main__': | |
from StringIO import StringIO | |
A = bstruct('A', 'a b c d', 'c I cccc c', verbose=True) | |
x = A.unpack(StringIO('\x47\x00\x01\x01\x00\x20\x21\x22\x23\x30')) | |
print x | |
b = bstruct('B', 'a version', [A, 'i']) | |
s = StringIO('\x47\x00\x01\x01\x00\x20\x21\x22\x23\x30\x01\x00\x00\x00') | |
z = b.unpack(s) | |
print z | |
ss = StringIO() | |
z.pack(ss) | |
ss.seek(0) | |
s.seek(0) | |
print 'Unpack test:', s.read() == ss.read() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Use c-structs as easy as in c. | |
The simple pure-python version. |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment