gesslerpd/msgpack-stream


msgpack-streams

Fast stream-based implementation of msgpack in pure Python.

Installation

pip install msgpack-streams

Benchmarks

Average of 50 iterations each on a 3.77 MB payload, using Python 3.14.3 with msgpack forced to its pure-Python implementation (MSGPACK_PUREPYTHON=1).

| Implementation | Operation | Speedup vs msgpack |
| --- | --- | --- |
| msgpack-streams unpack | decode | 2.83x |
| msgpack-streams unpack_stream | decode | 2.70x |
| msgpack-streams pack | encode | 1.84x |
| msgpack-streams pack_stream | encode | 1.69x |

On PyPy 3.11.15, the pure-Python implementation performs comparably to the msgpack C extension.

| Implementation | Operation | Speedup vs msgpack (C) |
| --- | --- | --- |
| msgpack-streams unpack | decode | 0.95x |
| msgpack-streams pack | encode | 1.96x |
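
The README does not say how these figures were measured. As a rough sketch of how a comparable number could be reproduced, the helpers below average a callable over a fixed number of iterations and report the ratio of baseline time to candidate time. The names `average_seconds` and `speedup` are hypothetical, and reading "speedup" as baseline time divided by candidate time is an assumption.

```python
import timeit
from collections.abc import Callable


def average_seconds(func: Callable[[], object], iterations: int = 50) -> float:
    """Return the mean wall-clock time of one call to func."""
    return timeit.timeit(func, number=iterations) / iterations


def speedup(
    baseline: Callable[[], object],
    candidate: Callable[[], object],
    iterations: int = 50,
) -> float:
    """Return how many times faster candidate is than baseline.

    A value above 1.0 means candidate is faster; below 1.0 means slower.
    """
    return average_seconds(baseline, iterations) / average_seconds(candidate, iterations)
```

With both packages installed, a decode comparison might look like `speedup(lambda: msgpack.unpackb(payload), lambda: unpack(payload))`, where `payload` is a pre-packed bytes object.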

Usage

from msgpack_streams import pack, unpack

data = {"key": "value", "number": 42, "list": [1, 2, 3]}
packed = pack(data)
unpacked, excess_data = unpack(packed)
assert data == unpacked
assert not excess_data

The stream-based API is also available:

from msgpack_streams import pack_stream, unpack_stream
import io

data = {"key": "value", "number": 42, "list": [1, 2, 3]}

with io.BytesIO() as stream:
    pack_stream(stream, data)
    # reset stream position for reading
    stream.seek(0)
    unpacked = unpack_stream(stream)

assert data == unpacked

Extensions

Datetime

Timezone-aware datetime objects are natively supported and automatically encoded using the msgpack Timestamp extension (type code -1). The timestamp format (32-, 64-, or 96-bit) is chosen automatically based on the value's range and precision. Decoded timestamps are always returned as UTC datetime objects.

from datetime import datetime, timezone
from msgpack_streams import pack_stream, unpack_stream
import io

dt = datetime(2025, 3, 25, 12, 0, 0, tzinfo=timezone.utc)

with io.BytesIO() as stream:
    pack_stream(stream, dt)
    stream.seek(0)
    unpacked = unpack_stream(stream)

assert unpacked == dt
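
To illustrate how the 32-, 64-, or 96-bit format can be chosen, here is a minimal sketch of the selection rule from the msgpack Timestamp specification, written with the standard library only. It is not the library's actual code; `timestamp_payload` is a hypothetical helper that returns just the extension payload, without the ext header.

```python
import struct
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def timestamp_payload(dt: datetime) -> bytes:
    """Return the Timestamp extension payload for an aware datetime."""
    if dt.tzinfo is None:
        raise ValueError("naive datetime objects are not supported")
    # Integer arithmetic on the timedelta avoids float rounding.
    # timedelta normalizes so that microseconds is never negative.
    delta = dt - EPOCH
    seconds = delta.days * 86400 + delta.seconds
    nanoseconds = delta.microseconds * 1000
    if 0 <= seconds < (1 << 34):
        if nanoseconds == 0 and seconds < (1 << 32):
            # timestamp 32: seconds only, as an unsigned 32-bit integer
            return struct.pack(">I", seconds)
        # timestamp 64: 30 bits of nanoseconds, 34 bits of seconds
        return struct.pack(">Q", (nanoseconds << 34) | seconds)
    # timestamp 96: unsigned 32-bit nanoseconds, signed 64-bit seconds
    return struct.pack(">Iq", nanoseconds, seconds)
```

Whole-second values between 1970 and early 2106 fit the 4-byte form, sub-second precision pushes the value to 8 bytes, and dates before 1970 or far in the future need the 12-byte form.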

Naive datetime objects (without tzinfo) will raise a ValueError.
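
One way to avoid that error is to attach a timezone before packing. The sketch below uses only the standard library; `ensure_aware` is a hypothetical helper, and treating naive values as UTC is an assumption that has to match how the application produced them.

```python
from datetime import datetime, timedelta, timezone, tzinfo


def ensure_aware(dt: datetime, assume: tzinfo = timezone.utc) -> datetime:
    """Attach `assume` to a naive datetime; return aware datetimes unchanged."""
    if dt.tzinfo is None or dt.utcoffset() is None:
        return dt.replace(tzinfo=assume)
    return dt


naive = datetime(2025, 3, 25, 12, 0, 0)
aware = ensure_aware(naive)  # now safe to pass to pack or pack_stream

# Aware datetimes compare by instant, not by zone, so a value packed with a
# +01:00 offset still equals the UTC datetime that decoding returns.
plus_one = timezone(timedelta(hours=1))
local = datetime(2025, 3, 25, 13, 0, 0, tzinfo=plus_one)
as_utc = local.astimezone(timezone.utc)
assert local == as_utc
```

Note that the original offset is not preserved by the round trip, since decoded values are always UTC.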

ExtType

Arbitrary msgpack extension types are supported via the ExtType dataclass:

from msgpack_streams import ExtType, pack_stream, unpack_stream
import io

obj = ExtType(code=42, data=b"hello")

with io.BytesIO() as stream:
    pack_stream(stream, obj)
    stream.seek(0)
    unpacked = unpack_stream(stream)

assert unpacked == obj

Pass ext_hook when packing to encode custom types as extensions, and pass ext_hook when unpacking to decode them back:

from dataclasses import dataclass
from msgpack_streams import ExtType, pack, unpack
from fmtspec import decode, encode, types  # https://pypi.org/project/fmtspec/

@dataclass
class Point:
    EXT_CODE = 10

    __fmt__ = {
        "x": types.u32,
        "y": types.u32,
    }

    x: int
    y: int

def unknown_type_hook(obj):
    if isinstance(obj, Point):
        return ExtType(Point.EXT_CODE, encode(obj))
    return None  # unsupported type -> TypeError

def ext_hook(ext):
    if ext.code == Point.EXT_CODE:
        return decode(ext.data, shape=Point)
    return None  # unknown -> keep as ExtType

pt = Point(1, 2)
packed = pack(pt, ext_hook=unknown_type_hook)
result, _ = unpack(packed, ext_hook=ext_hook)
assert pt == result
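
The same hook pattern works without a third-party encoder. The sketch below stores Python `complex` values as two big-endian doubles using `struct`. The extension code 20 and the hook names are hypothetical, and the fallback `ExtType` class is only a stand-in with the same `code` and `data` fields so the snippet runs when the package is not installed.

```python
import struct
from dataclasses import dataclass

try:
    from msgpack_streams import ExtType
except ImportError:
    # Stand-in mirroring the documented fields; not the library's class.
    @dataclass
    class ExtType:
        code: int
        data: bytes


COMPLEX_EXT_CODE = 20  # hypothetical application-defined extension code


def pack_complex(obj):
    """Pack-side hook: turn complex numbers into an ExtType."""
    if isinstance(obj, complex):
        return ExtType(COMPLEX_EXT_CODE, struct.pack(">dd", obj.real, obj.imag))
    return None  # unsupported type -> TypeError


def unpack_complex(ext):
    """Unpack-side hook: turn the matching ExtType back into a complex."""
    if ext.code == COMPLEX_EXT_CODE:
        real, imag = struct.unpack(">dd", ext.data)
        return complex(real, imag)
    return None  # unknown -> keep as ExtType
```

Following the signatures documented below, these would be used as `pack(1 + 2j, ext_hook=pack_complex)` and `unpack(packed, ext_hook=unpack_complex)`.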

API reference

def pack(obj: object, *, float32: bool = False, ext_hook: Callable[[object], ExtType | None] | None = None) -> bytes:
    ...

Serialize obj to a bytes object. Pass float32=True to encode float values as 32-bit instead of the default 64-bit.
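
To show what the float32 option trades away, here is a standard-library sketch of the two float encodings defined by the msgpack specification (marker byte 0xca followed by 4 bytes, or 0xcb followed by 8 bytes). `encode_float` is a hypothetical helper for illustration, not the library's implementation.

```python
import struct


def encode_float(value: float, float32: bool = False) -> bytes:
    """Encode a float as msgpack float 32 (5 bytes) or float 64 (9 bytes)."""
    if float32:
        return b"\xca" + struct.pack(">f", value)
    return b"\xcb" + struct.pack(">d", value)


wide = encode_float(0.1)
narrow = encode_float(0.1, float32=True)

# Decoding the 32-bit form gives the nearest single-precision value,
# which is close to 0.1 but not equal to it.
restored = struct.unpack(">f", narrow[1:])[0]
assert len(wide) == 9 and len(narrow) == 5
assert restored != 0.1
```

The 32-bit form saves 4 bytes per float, so it suits payloads dominated by floats where single precision is acceptable.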

Pass ext_hook to handle types that are not natively supported. The callback receives the unsupported object and should return an ExtType to pack in its place. If it returns None, a TypeError is raised as usual.


def unpack(data: bytes, *, ext_hook: Callable[[ExtType], object | None] | None = None) -> tuple[object, bytes]:
    ...

Deserialize the first msgpack object from data. Returns (obj, excess) where excess is any unconsumed bytes that followed the object.
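
Because the unconsumed bytes are returned, a buffer holding several concatenated objects can be drained in a loop. The sketch below takes the decoder as a parameter so it depends only on the documented `(obj, excess)` return shape; `iter_unpack` is a hypothetical helper, not part of the package.

```python
from collections.abc import Callable, Iterator


def iter_unpack(
    data: bytes,
    unpack: Callable[[bytes], tuple[object, bytes]],
) -> Iterator[object]:
    """Yield every object in data by feeding the excess back into unpack."""
    while data:
        obj, data = unpack(data)
        yield obj
```

With the library, this would be called as `list(iter_unpack(pack(1) + pack("two"), unpack))`.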

Pass ext_hook to convert ExtType values during decoding. The callback receives each ExtType and should return the decoded object, or None to leave it as an ExtType.


def pack_stream(stream: BinaryIO, obj: object, *, float32: bool = False, ext_hook: Callable[[object], ExtType | None] | None = None) -> None:
    ...

Serialize obj directly into a binary stream. Pass float32=True to encode float values as 32-bit instead of the default 64-bit.

Pass ext_hook to handle types that are not natively supported. The callback receives the unsupported object and should return an ExtType to pack in its place. If it returns None, a TypeError is raised as usual.


def unpack_stream(stream: BinaryIO, *, ext_hook: Callable[[ExtType], object | None] | None = None) -> object:
    ...

Deserialize a single msgpack object from a binary stream, advancing the stream position past the consumed bytes.
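
Since each call consumes exactly one object, a seekable stream holding several objects can be read by calling the decoder until the position reaches the end. The README does not describe what happens at end of stream, so this sketch avoids relying on it and compares positions instead. `iter_unpack_stream` is a hypothetical helper, and it assumes the stream supports `seek` and `tell`.

```python
import io
from collections.abc import Callable, Iterator
from typing import BinaryIO


def iter_unpack_stream(
    stream: BinaryIO,
    unpack_stream: Callable[[BinaryIO], object],
) -> Iterator[object]:
    """Yield objects from the current position to the end of a seekable stream."""
    start = stream.tell()
    end = stream.seek(0, io.SEEK_END)  # seek returns the new absolute position
    stream.seek(start)
    while stream.tell() < end:
        yield unpack_stream(stream)
```

With the library, this would be called as `list(iter_unpack_stream(stream, unpack_stream))` after writing several objects with `pack_stream` and rewinding with `stream.seek(0)`.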

Pass ext_hook to convert ExtType values during decoding. The callback receives each ExtType and should return the decoded object, or None to leave it as an ExtType.
