diff --git a/src/bitstruct/__init__.py b/src/bitstruct/__init__.py index efb4dda..b635783 100644 --- a/src/bitstruct/__init__.py +++ b/src/bitstruct/__init__.py @@ -97,7 +97,7 @@ def pack(self, arg): return bin(int(b'01' + binascii.hexlify(value), 16))[3:] def unpack(self, bits): - packed = _unpack_bytearray(self.size, bits) + packed = _unpack_bytearray(self.size, bits, False) if self.size == 16: value = struct.unpack('>e', packed)[0] @@ -112,18 +112,25 @@ def unpack(self, bits): class _Raw(_Info): + def __init__(self, size, name, align_right): + super().__init__(size, name) + self.align_right = align_right + self.number_of_padding_bits = (8 - (self.size % 8)) % 8 def pack(self, arg): - number_of_padding_bytes = ((self.size - 8 * len(arg)) // 8) - arg += (number_of_padding_bytes * b'\x00') - - return bin(int(b'01' + binascii.hexlify(arg), 16))[3:self.size + 3] + number_of_padding_bytes = max(((self.size - 8 * len(arg)) // 8), 0) + if self.align_right: + arg = number_of_padding_bytes * b'\x00' + arg + return bin(int(b'01' + binascii.hexlify(arg), 16))[-self.size:] + else: + arg += number_of_padding_bytes * b'\x00' + return bin(int(b'01' + binascii.hexlify(arg), 16))[3:self.size + 3] def unpack(self, bits): - rest = self.size % 8 - - if rest > 0: - bits += (8 - rest) * '0' + if self.align_right: + bits = self.number_of_padding_bits * '0' + bits + else: + bits += self.number_of_padding_bits * '0' return binascii.unhexlify(hex(int('10000000' + bits, 2))[4:].rstrip('L')) @@ -147,23 +154,30 @@ def pack(self): class _Text(_Info): - def __init__(self, size, name, encoding, errors): + def __init__(self, size, name, encoding, errors, align_right): super().__init__(size, name) self.encoding = encoding self.errors = errors + self.align_right = align_right def pack(self, arg): encoded = arg.encode('utf-8') number_of_padding_bytes = ((self.size - 8 * len(encoded)) // 8) - encoded += (number_of_padding_bytes * b'\x00') + if self.align_right: + encoded = number_of_padding_bytes * b'\x00' + encoded + else: + encoded += number_of_padding_bytes * b'\x00' - return _pack_bytearray(self.size, encoded) + return _pack_bytearray(self.size, encoded, self.align_right) def unpack(self, bits): - return _unpack_bytearray(self.size, bits).decode(self.encoding, self.errors) + return _unpack_bytearray(self.size, + bits, + self.align_right).decode(self.encoding, + self.errors) -def _parse_format(fmt, names, text_encoding, text_errors): +def _parse_format(fmt, names, text_encoding, text_errors, align_right): if fmt and fmt[-1] in '><': byte_order = fmt[-1] fmt = fmt[:-1] @@ -209,10 +223,10 @@ def _parse_format(fmt, names, text_encoding, text_errors): info = _Boolean(size, name) i += 1 elif type_ == 't': - info = _Text(size, name, text_encoding, text_errors) + info = _Text(size, name, text_encoding, text_errors, align_right) i += 1 elif type_ == 'r': - info = _Raw(size, name) + info = _Raw(size, name, align_right) i += 1 elif type_ == 'p': info = _ZeroPadding(size) @@ -228,15 +242,20 @@ def _parse_format(fmt, names, text_encoding, text_errors): return infos, byte_order or '>' -def _pack_bytearray(size, arg): - return bin(int(b'01' + binascii.hexlify(arg), 16))[3:size + 3] +def _pack_bytearray(size, arg, align_right): + if align_right: + return bin(int(b'01' + binascii.hexlify(arg), 16))[-size:] + else: + return bin(int(b'01' + binascii.hexlify(arg), 16))[3:size + 3] -def _unpack_bytearray(size, bits): - rest = size % 8 +def _unpack_bytearray(size, bits, align_right): + num_padding_bits = (8 - (size % 8)) % 8 - if rest > 0: - bits += (8 - rest) * '0' + if align_right > 0: + bits = num_padding_bits * '0' + bits + else: + bits += num_padding_bits * '0' return binascii.unhexlify(hex(int('10000000' + bits, 2))[4:].rstrip('L')) @@ -247,8 +266,15 @@ def __init__(self, fmt, names=None, text_encoding='utf-8', - text_errors='strict'): - infos, byte_order = _parse_format(fmt, names, text_encoding, text_errors) + text_errors='strict', + align_right=False): + infos, byte_order = _parse_format(fmt, names, text_encoding, text_errors, align_right) + left_padding = 0 + self._align_right = align_right + if align_right: + initial_size = sum([info.size for info in infos]) + left_padding = (8 - (initial_size % 8)) % 8 + infos = [_ZeroPadding(left_padding)] + infos self._infos = infos self._byte_order = byte_order self._number_of_bits_to_unpack = sum([info.size for info in infos]) @@ -290,7 +316,7 @@ def pack_any(self, values): if tail != 0: bits += (8 - tail) * '0' - return bytes(_unpack_bytearray(len(bits), bits)) + return bytes(_unpack_bytearray(len(bits), bits, self._align_right)) def unpack_from_any(self, data, offset, allow_truncated): bits = bin(int(b'01' + binascii.hexlify(data), 16))[3 + offset:] @@ -302,7 +328,8 @@ def unpack_from_any(self, data, offset, allow_truncated): self._number_of_bits_to_unpack, len(bits))) - offset = 0 + offset = (len(bits) - self._number_of_bits_to_unpack + if self._align_right else 0) for info in self._infos: if offset + info.size > len(bits): @@ -341,7 +368,7 @@ def unpack_from_any(self, data, offset, allow_truncated): def pack_into_any(self, buf, offset, data, **kwargs): fill_padding = kwargs.get('fill_padding', True) - buf_bits = _pack_bytearray(8 * len(buf), buf) + buf_bits = _pack_bytearray(8 * len(buf), buf, self._align_right) bits = buf_bits[0:offset] for info in self._infos: @@ -359,7 +386,7 @@ def pack_into_any(self, buf, offset, data, **kwargs): raise Error( f'pack_into requires a buffer of at least {len(bits)} bits') - buf[:] = _unpack_bytearray(len(bits), bits) + buf[:] = _unpack_bytearray(len(bits), bits, self._align_right) def calcsize(self): """Return the number of bits in the compiled format string. @@ -378,15 +405,19 @@ class CompiledFormat(_CompiledFormat): """ - def __init__(self, fmt, text_encoding='utf-8', text_errors='strict'): - super().__init__(fmt, None, text_encoding, text_errors) + def __init__(self, + fmt, + text_encoding='utf-8', + text_errors='strict', + align_right=False): + super().__init__(fmt, None, text_encoding, text_errors, align_right) self._number_of_arguments = 0 for info in self._infos: if not isinstance(info, _Padding): self._number_of_arguments += 1 - def pack(self, *args): + def pack(self, *args, **kwargs): """See :func:`~bitstruct.pack()`. """ @@ -471,7 +502,7 @@ def unpack_from(self, data, offset=0, allow_truncated=False): data, offset, allow_truncated=allow_truncated)} -def pack(fmt, *args): +def pack(fmt, *args, **kwargs): """Return a bytes object containing the values v1, v2, ... packed according to given format string `fmt`. If the total number of bits are not a multiple of 8, padding will be added at the end of @@ -520,14 +551,15 @@ def pack(fmt, *args): """ - return CompiledFormat(fmt).pack(*args) + return CompiledFormat(fmt, **kwargs).pack(*args) def unpack(fmt, data, allow_truncated=False, text_encoding='utf-8', - text_errors='strict'): + text_errors='strict', + align_right=False): """Unpack `data` (bytes or bytearray) according to given format string `fmt`. @@ -542,7 +574,7 @@ def unpack(fmt, """ - return CompiledFormat(fmt, text_encoding, text_errors).unpack( + return CompiledFormat(fmt, text_encoding, text_errors, align_right).unpack( data, allow_truncated=allow_truncated) @@ -599,7 +631,8 @@ def unpack_dict(fmt, data, allow_truncated=False, text_encoding='utf-8', - text_errors='strict'): + text_errors='strict', + align_right=False): """Same as :func:`~bitstruct.unpack()`, but returns a dictionary. See :func:`~bitstruct.pack_dict()` for details on `names`. @@ -621,6 +654,9 @@ def pack_into_dict(fmt, names, buf, offset, data, **kwargs): """ + if kwargs.get('align_right', False): + raise Error('align_right not supported for pack_into') + return CompiledFormatDict(fmt, names).pack_into(buf, offset, data, @@ -679,7 +715,8 @@ def byteswap(fmt, data, offset=0): def compile(fmt, names=None, text_encoding='utf-8', - text_errors='strict'): + text_errors='strict', + align_right=False): """Compile given format string `fmt` and return a compiled format object that can be used to pack and/or unpack data multiple times. @@ -695,6 +732,6 @@ def compile(fmt, """ if names is None: - return CompiledFormat(fmt, text_encoding, text_errors) + return CompiledFormat(fmt, text_encoding, text_errors, align_right) else: - return CompiledFormatDict(fmt, names, text_encoding, text_errors) + return CompiledFormatDict(fmt, names, text_encoding, text_errors, align_right) diff --git a/tests/test_bitstruct.py b/tests/test_bitstruct.py index 9504488..467ff60 100644 --- a/tests/test_bitstruct.py +++ b/tests/test_bitstruct.py @@ -763,6 +763,160 @@ def test_pack_unpack_unsigned(self): self.assertEqual(pack(fmt, value), packed) self.assertEqual(unpack(fmt, packed), (value, )) + def test_align_right(self): + # Pack + packed = pack('u1u1s6u7u9', 0, 0, -2, 65, 22, align_right=True) + self.assertEqual(packed, b'\x3e\x82\x16') + + packed = pack('u1', 1, align_right=True) + self.assertEqual(packed, b'\x01') + + packed = pack('u77', 0x100000000001000000, align_right=True) + ref = b'\x00\x10\x00\x00\x00\x00\x01\x00\x00\x00' + + packed = pack('b1', True, align_right=True) + self.assertEqual(packed, b'\x01') + + packed = pack('b1p6b1', True, True, align_right=True) + self.assertEqual(packed, b'\x81') + + # Unpack + unpacked = unpack('u1', bytearray(b'\x80'), align_right=True) + self.assertEqual(unpacked, (0, )) + + unpacked = unpack('u1', bytearray(b'\x01'), align_right=True) + self.assertEqual(unpacked, (1, )) + + unpacked = unpack('u1', bytearray(b'\x01'), align_right=False) + self.assertEqual(unpacked, (0, )) + + packed = b'\x00\x80\x00\x00\x00\x00\x08\x00\x00\x00' + unpacked = unpack('u77', packed, align_right=True) + self.assertEqual(unpacked, (0x800000000008000000,)) + + # Too many bits to unpack. + with self.assertRaises(Error) as cm: + unpack('u9', b'\x00', align_right=True) + + # # partial unpacking of truncated data (errors with align_right) + # with self.assertRaises(Error) as cm: + # unpack('u4u5', b'\x5f', allow_truncated=True, align_right=True) + # self.assertEqual(str(cm.exception), + # 'cannot allow for truncated unpack with align_right') + # with self.assertRaises(Error) as cm: + # unpack('p8u4u5', b'\x00\x5f', allow_truncated=True, align_right=True) + # self.assertEqual(str(cm.exception), + # 'cannot allow for truncated unpack with align_right') + + # Endianness + ref = b'\x00\x91\xa6\xbf\x80\x00\x00' + packed = pack('>u19s3f32', 0x1234, -2, -1.0, align_right=True) + self.assertEqual(packed, ref) + unpacked = unpack('>u19s3f32', packed, align_right=True) + self.assertEqual(unpacked, (0x1234, -2, -1.0)) + + ref = b'\x0b\x12\x03\x00\x00\x01\xfd' + packed = pack('u19f64r3p4', 1, -2, 1.0, b'\x04', align_right=True) + self.assertEqual(packed, ref) + unpacked = unpack('>u19f64r3p4', packed, align_right=True) + self.assertEqual(unpacked, (1, -2, 1.0, b'\x04')) + + ref = b'\x40\x00\x0f\x00\x00\x00\x00\x00\x00\x07\xfe\x10' + packed = pack('s5s5', 0x1234, align_right=True) + self.assertEqual(packed, ref) + unpacked = unpack('p4u16p4>', packed, align_right=True) + self.assertEqual(unpacked, (0x1234,)) + + ref = b'\x04\x23\x10' + packed = pack('p4u16p4<', 0x1234, align_right=True) + self.assertEqual(packed, ref) + unpacked = unpack('p4u16p4<', packed, align_right=True) + self.assertEqual(unpacked, (0x1234,)) + + ref = b'\x01' + packed = pack('u1<', 1, align_right=True) + self.assertEqual(packed, ref) + unpacked = unpack('u1<', packed, align_right=True) + self.assertEqual(unpacked, (1, )) + + # Raw + packed = pack('r24', b'', align_right=True) + self.assertEqual(packed, b'\x00\x00\x00') + packed = pack('r24', b'12', align_right=True) + self.assertEqual(packed, b'\x0012') + packed = pack('r24', b'123', align_right=True) + self.assertEqual(packed, b'123') + packed = pack('r24', b'1234', align_right=True) + self.assertEqual(packed, b'234') + + # Text + packed = pack('t24', '', align_right=True) + self.assertEqual(packed, b'\x00\x00\x00') + packed = pack('t24', '12', align_right=True) + self.assertEqual(packed, b'\x0012') + packed = pack('t24', '123', align_right=True) + self.assertEqual(packed, b'123') + packed = pack('t24', '1234', align_right=True) + self.assertEqual(packed, b'234') + + unpacked = unpack('t24', b'\x00\x00\x00', align_right=True)[0] + self.assertEqual(unpacked, '\x00\x00\x00') + unpacked = unpack('t24', b'12\x00', align_right=True)[0] + self.assertEqual(unpacked, '12\x00') + unpacked = unpack('t24', b'123', align_right=True)[0] + self.assertEqual(unpacked, '123') + unpacked = unpack('t24', b'1234', align_right=True)[0] + self.assertEqual(unpacked, '234') + + unpacked = unpack('t8', + b'\xff', + text_errors='replace', + align_right=True)[0] + self.assertEqual(unpacked, '�') + unpacked = unpack('t8', + b'\xff', + text_errors='ignore', + align_right=True)[0] + self.assertEqual(unpacked, '') + + cf = bitstruct.compile('t8', + text_encoding='latin-1', + text_errors='replace') + unpacked = cf.unpack(b'\xff')[0] + self.assertEqual(unpacked, 'ÿ') + + cf = bitstruct.compile('t8', + text_encoding='utf-8', + text_errors='ignore') + unpacked = cf.unpack(b'\xff')[0] + self.assertEqual(unpacked, '') + + cf = bitstruct.compile('t8', + names=['a'], + text_encoding='utf-8', + text_errors='replace') + unpacked = cf.unpack(b'\xff') + self.assertEqual(unpacked, {'a': '�'}) + if __name__ == '__main__': unittest.main()