Fast, stream-based implementation of msgpack in pure Python.

```shell
pip install msgpack-streams
```

The figures below are averages of 50 iterations each on a 3.77 MB payload,
running on Python 3.14.3 with msgpack forced to its pure-Python implementation
(`MSGPACK_PUREPYTHON=1`).

| Implementation | Operation | Speedup vs msgpack |
|---|---|---|
| msgpack-streams `unpack` | decode | 2.83x |
| msgpack-streams `unpack_stream` | decode | 2.70x |
| msgpack-streams `pack` | encode | 1.84x |
| msgpack-streams `pack_stream` | encode | 1.69x |

For PyPy 3.11.15, the pure Python performance is comparable to the msgpack C
extension.

| Implementation | Operation | Speedup vs msgpack (C) |
|---|---|---|
| msgpack-streams `unpack` | decode | 0.95x |
| msgpack-streams `pack` | encode | 1.96x |


```python
from msgpack_streams import pack, unpack

data = {"key": "value", "number": 42, "list": [1, 2, 3]}
packed = pack(data)
unpacked, excess_data = unpack(packed)
assert data == unpacked
assert not excess_data
```

The stream-based API is also available:

```python
from msgpack_streams import pack_stream, unpack_stream
import io

data = {"key": "value", "number": 42, "list": [1, 2, 3]}

with io.BytesIO() as stream:
    pack_stream(stream, data)
    # reset stream position for reading
    stream.seek(0)
    unpacked = unpack_stream(stream)

assert data == unpacked
```

Timezone-aware datetime objects are natively supported and automatically
encoded using the
msgpack Timestamp extension
(type code -1). The timestamp format (32-, 64-, or 96-bit) is chosen
automatically based on the value's range and precision. Decoded timestamps are
always returned as UTC datetime objects.
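
To make the size selection concrete, here is a minimal standard-library sketch of the rule the msgpack Timestamp specification describes: 32-bit when there is no sub-second part and the seconds fit in an unsigned 32-bit integer, 64-bit when the seconds fit in 34 bits, and 96-bit otherwise. `timestamp_payload` is a hypothetical helper written for illustration, not a function exported by msgpack-streams, and the library's internal code may differ.

```python
import struct
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def timestamp_payload(dt: datetime) -> bytes:
    """Build a Timestamp extension payload (hypothetical helper, spec rules only)."""
    if dt.tzinfo is None:
        # Mirrors the documented behaviour for naive datetimes.
        raise ValueError("naive datetime objects are not supported")
    delta = dt - EPOCH
    # timedelta keeps .seconds and .microseconds non-negative, so this floors
    # correctly for dates before 1970 as well.
    seconds = delta.days * 86400 + delta.seconds
    nanoseconds = delta.microseconds * 1000
    if 0 <= seconds < (1 << 34):
        if nanoseconds == 0 and seconds < (1 << 32):
            # timestamp 32: seconds as an unsigned 32-bit integer
            return struct.pack(">I", seconds)
        # timestamp 64: 30 bits of nanoseconds, then 34 bits of seconds
        return struct.pack(">Q", (nanoseconds << 34) | seconds)
    # timestamp 96: unsigned 32-bit nanoseconds, then signed 64-bit seconds
    return struct.pack(">Iq", nanoseconds, seconds)


whole_second = timestamp_payload(datetime(2025, 3, 25, 12, 0, 0, tzinfo=timezone.utc))
sub_second = timestamp_payload(datetime(2025, 3, 25, 12, 0, 0, 500000, tzinfo=timezone.utc))
before_epoch = timestamp_payload(datetime(1969, 12, 31, tzinfo=timezone.utc))
```

The three sample values land in the 4-, 8-, and 12-byte payloads respectively, which is the "range and precision" trade-off in practice.
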

```python
from datetime import datetime, timezone
from msgpack_streams import pack_stream, unpack_stream
import io

dt = datetime(2025, 3, 25, 12, 0, 0, tzinfo=timezone.utc)

with io.BytesIO() as stream:
    pack_stream(stream, dt)
    stream.seek(0)
    unpacked = unpack_stream(stream)

assert unpacked == dt
```

Packing a naive datetime object (one without `tzinfo`) raises a `ValueError`.

Arbitrary msgpack extension types are supported via the `ExtType` dataclass:

```python
from msgpack_streams import ExtType, pack_stream, unpack_stream
import io

obj = ExtType(code=42, data=b"hello")

with io.BytesIO() as stream:
    pack_stream(stream, obj)
    stream.seek(0)
    unpacked = unpack_stream(stream)

assert unpacked == obj
```

Pass `ext_hook` to `pack` (or `pack_stream`) to encode custom types as
extensions, and to `unpack` (or `unpack_stream`) to decode them back:

```python
from dataclasses import dataclass
from msgpack_streams import ExtType, pack, unpack
from fmtspec import decode, encode, types  # https://pypi.org/project/fmtspec/


@dataclass
class Point:
    EXT_CODE = 10
    __fmt__ = {
        "x": types.u32,
        "y": types.u32,
    }
    x: int
    y: int


def unknown_type_hook(obj):
    if isinstance(obj, Point):
        return ExtType(Point.EXT_CODE, encode(obj))
    return None  # unsupported type -> TypeError


def ext_hook(ext):
    if ext.code == Point.EXT_CODE:
        return decode(ext.data, shape=Point)
    return None  # unknown -> keep as ExtType


pt = Point(1, 2)
packed = pack(pt, ext_hook=unknown_type_hook)
result, _ = unpack(packed, ext_hook=ext_hook)
assert pt == result
```

```python
def pack(obj: object, *, float32: bool = False, ext_hook: Callable[[object], ExtType | None] | None = None) -> bytes:
    ...
```

Serialize `obj` to a `bytes` object. Pass `float32=True` to encode float
values as 32-bit instead of the default 64-bit.

Pass `ext_hook` to handle types that are not natively supported. The callback
receives the unsupported object and should return an `ExtType` to pack in its
place. If it returns `None`, a `TypeError` is raised as usual.

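
The hook contract can be exercised without any third-party dependency. The sketch below is an illustration under assumptions: it defines a stand-in `ExtType` with the same `code` and `data` fields so that it runs on its own (real code would import `ExtType` from `msgpack_streams`), it assumes `complex` is not handled natively, and `COMPLEX_CODE = 20` is an arbitrary, hypothetical extension code.

```python
import struct
from dataclasses import dataclass


@dataclass(frozen=True)
class ExtType:  # stand-in; real code would import this from msgpack_streams
    code: int
    data: bytes


COMPLEX_CODE = 20  # hypothetical, application-chosen extension code


def unknown_type_hook(obj):
    """Pack-side hook: turn a complex number into an ExtType."""
    if isinstance(obj, complex):
        # Two big-endian float64 values: real part, then imaginary part.
        return ExtType(COMPLEX_CODE, struct.pack(">dd", obj.real, obj.imag))
    return None  # not handled here, so packing would raise TypeError


def ext_hook(ext):
    """Unpack-side hook: rebuild the complex number from its payload."""
    if ext.code == COMPLEX_CODE:
        return complex(*struct.unpack(">dd", ext.data))
    return None  # unknown code, so the value stays an ExtType


z = 1.5 - 2.0j
ext = unknown_type_hook(z)
restored = ext_hook(ext)
```

With the library installed, the same two functions would be passed as `pack(z, ext_hook=unknown_type_hook)` and `unpack(packed, ext_hook=ext_hook)`.
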
```python
def unpack(data: bytes, *, ext_hook: Callable[[ExtType], object | None] | None = None) -> tuple[object, bytes]:
    ...
```

Deserialize the first msgpack object from `data`. Returns `(obj, excess)`,
where `excess` is any unconsumed bytes that followed the object.

Pass `ext_hook` to convert `ExtType` values during decoding. The callback
receives each `ExtType` and should return the decoded object, or `None` to
leave it as an `ExtType`.

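
Because `unpack` hands back the unconsumed bytes, a buffer holding several concatenated objects can be drained by feeding the excess back in. The sketch below shows that loop. `iter_unpack` is a hypothetical helper, and `unpack_fixint` is a deliberately tiny stand-in decoder (it only understands msgpack's one-byte positive fixints) so the example runs without the library.

```python
from typing import Callable, Iterator, Tuple

Decoder = Callable[[bytes], Tuple[object, bytes]]


def iter_unpack(data: bytes, unpack_one: Decoder) -> Iterator[object]:
    """Yield every object in `data` by feeding the excess bytes back in."""
    while data:
        obj, data = unpack_one(data)
        yield obj


def unpack_fixint(data: bytes) -> Tuple[object, bytes]:
    """Stand-in decoder: handles only positive fixints (0x00 to 0x7f)."""
    if data[0] > 0x7F:
        raise ValueError("stand-in decoder only handles positive fixints")
    return data[0], data[1:]


values = list(iter_unpack(b"\x01\x02\x03", unpack_fixint))
```

With msgpack-streams installed, passing the real decoder, as in `iter_unpack(buffer, unpack)`, should work the same way, since `unpack` has the `(obj, excess)` return shape the helper expects.
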
```python
def pack_stream(stream: BinaryIO, obj: object, *, float32: bool = False, ext_hook: Callable[[object], ExtType | None] | None = None) -> None:
    ...
```

Serialize `obj` directly into a binary stream. Pass `float32=True` to encode
float values as 32-bit instead of the default 64-bit.

Pass `ext_hook` to handle types that are not natively supported. The callback
receives the unsupported object and should return an `ExtType` to pack in its
place. If it returns `None`, a `TypeError` is raised as usual.

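
The cost of `float32=True` is precision. The standard-library sketch below illustrates it by round-tripping values through IEEE 754 single precision with `struct`. It does not call msgpack-streams, and `roundtrip_float32` is a hypothetical helper. The sizes follow the msgpack format, where a float is a one-byte type tag followed by 4 or 8 bytes.

```python
import struct


def roundtrip_float32(value: float) -> float:
    """Store value as a big-endian IEEE 754 single and read it back."""
    return struct.unpack(">f", struct.pack(">f", value))[0]


exact = roundtrip_float32(0.5)  # 0.5 is exactly representable in 32 bits
lossy = roundtrip_float32(0.1)  # 0.1 is not, so the result differs slightly

# Encoded size per float, including msgpack's one-byte type tag.
FLOAT32_SIZE = 1 + struct.calcsize(">f")
FLOAT64_SIZE = 1 + struct.calcsize(">d")
```

Roughly halving the size of each float is worthwhile for large numeric payloads, but only when about seven significant decimal digits are enough for the data.
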
```python
def unpack_stream(stream: BinaryIO, *, ext_hook: Callable[[ExtType], object] | None = None) -> object:
    ...
```

Deserialize a single msgpack object from a binary stream, advancing the stream
position past the consumed bytes.

Pass `ext_hook` to convert `ExtType` values during decoding. The callback
receives each `ExtType` and should return the decoded object, or `None` to
leave it as an `ExtType`.
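
Since each call advances the stream, repeated calls read consecutive objects. This README does not say how `unpack_stream` reports end of input, so the sketch below assumes a seekable stream and peeks one byte before each read. `iter_unpack_stream` is a hypothetical helper, and `read_fixint` is a stand-in decoder that only handles one-byte positive fixints so the example runs without the library.

```python
import io
from typing import BinaryIO, Callable, Iterator


def iter_unpack_stream(
    stream: BinaryIO, unpack_one: Callable[[BinaryIO], object]
) -> Iterator[object]:
    """Yield objects until the (seekable) stream is exhausted."""
    while True:
        if not stream.read(1):  # peek one byte to detect end of stream
            return
        stream.seek(-1, io.SEEK_CUR)  # put the peeked byte back
        yield unpack_one(stream)


def read_fixint(stream: BinaryIO) -> object:
    """Stand-in decoder: reads one positive fixint (0x00 to 0x7f)."""
    byte = stream.read(1)[0]
    if byte > 0x7F:
        raise ValueError("stand-in decoder only handles positive fixints")
    return byte


with io.BytesIO(b"\x01\x02\x03") as stream:
    values = list(iter_unpack_stream(stream, read_fixint))
```

With the library installed, `iter_unpack_stream(stream, unpack_stream)` would be the equivalent call. Non-seekable streams such as sockets would need a different end-of-input strategy.
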