26 changes: 20 additions & 6 deletions .github/copilot-instructions.md
@@ -15,13 +15,20 @@ building durable orchestrations. The repo contains two packages:
- Follow PEP 8 conventions.
- Use `autopep8` for Python formatting.

## Python Type Checking

Before linting, check for and fix any Pylance errors in the files you
changed. Use the editor's diagnostics (or the `get_errors` tool) to
identify type errors and resolve them first — type safety takes
priority over style.
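
As an illustration of why type checking comes first, here is a bug (a hypothetical helper, not from this repo) that Pylance reports as a return-type error but flake8 accepts, since flake8 only checks style:

```python
def get_retry_count(config: dict) -> int:
    # Pylance flags this: 'str' is not assignable to return type 'int'.
    # flake8 passes the file unchanged, because the style is fine.
    return config.get("retries", "3")   # bug: string default, not int
```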

## Python Linting

This repository uses [flake8](https://flake8.pycqa.org/) for Python
linting. Run it after making changes to verify there are no issues:

```bash
-flake8 path/to/changed/file.py
+python -m flake8 path/to/changed/file.py
```

## Markdown Style
@@ -57,38 +64,45 @@ repository root.
To lint a single file:

```bash
-pymarkdown -c .pymarkdown.json scan path/to/file.md
+python -m pymarkdown -c .pymarkdown.json scan path/to/file.md
```

To lint all Markdown files in the repository:

```bash
-pymarkdown -c .pymarkdown.json scan **/*.md
+python -m pymarkdown -c .pymarkdown.json scan **/*.md
```

Install the linter via the dev dependencies:

```bash
-pip install -r dev-requirements.txt
+python -m pip install -r dev-requirements.txt
```

## Building and Testing

Install the packages locally in editable mode:

```bash
-pip install -e . -e ./durabletask-azuremanaged
+python -m pip install -e . -e ./durabletask-azuremanaged
```

Run tests with pytest:

```bash
-pytest
+python -m pytest
```

## Project Structure

- `durabletask/` — core SDK source
- `payload/` — public payload externalization API (`PayloadStore` ABC,
`LargePayloadStorageOptions`, helper functions)
- `extensions/azure_blob_payloads/` — Azure Blob Storage payload store
(installed via `pip install durabletask[azure-blob-payloads]`)
- `entities/` — durable entity support
- `testing/` — in-memory backend for testing without a sidecar
- `internal/` — protobuf definitions, gRPC helpers, tracing (not public API)
- `durabletask-azuremanaged/` — Azure managed provider source
- `examples/` — example orchestrations (see `examples/README.md`)
- `tests/` — test suite
26 changes: 25 additions & 1 deletion .github/workflows/durabletask.yml
@@ -47,16 +47,40 @@ jobs:
steps:
- name: Checkout repository
uses: actions/checkout@v4

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}

- name: Set up Node.js (needed for Azurite)
uses: actions/setup-node@v4
with:
node-version: '20.x'

- name: Cache npm
uses: actions/cache@v3
with:
path: ~/.npm
key: ${{ runner.os }}-npm-azurite

- name: Install Azurite
run: npm install -g azurite

- name: Start Azurite
shell: bash
run: |
azurite-blob --silent --blobPort 10000 &
sleep 2

- name: Install durabletask dependencies and the library itself
run: |
python -m pip install --upgrade pip
pip install flake8 pytest
pip install -r requirements.txt
pip install .
pip install ".[azure-blob-payloads]"
pip install aiohttp

- name: Pytest unit tests
working-directory: tests/durabletask
run: |
16 changes: 16 additions & 0 deletions .vscode/mcp.json
@@ -0,0 +1,16 @@
{
"inputs": [
{
"id": "ado_org",
"type": "promptString",
"description": "msazure"
}
],
"servers": {
"ado": {
"type": "stdio",
"command": "npx",
"args": ["-y", "@azure-devops/mcp", "msazure"]
}
}
}
9 changes: 9 additions & 0 deletions CHANGELOG.md
@@ -9,6 +9,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

ADDED

- Added large payload externalization support for automatically
offloading oversized orchestration payloads to Azure Blob Storage.
Install with `pip install durabletask[azure-blob-payloads]`.
Pass a `BlobPayloadStore` to the worker and client via the
`payload_store` parameter.
- Added `durabletask.extensions.azure_blob_payloads` extension
package with `BlobPayloadStore` and `BlobPayloadStoreOptions`
- Added `PayloadStore` abstract base class in
`durabletask.payload` for custom storage backends
- Added `durabletask.testing` module with `InMemoryOrchestrationBackend` for testing orchestrations without a sidecar process
- Added `AsyncTaskHubGrpcClient` for asyncio-based applications using `grpc.aio`
- Added `DefaultAsyncClientInterceptorImpl` for async gRPC metadata interceptors
14 changes: 14 additions & 0 deletions README.md
@@ -15,6 +15,20 @@ This repo contains a Python SDK for use with the [Azure Durable Task Scheduler](
- [Development Guide](./docs/development.md)
- [Contributing Guide](./CONTRIBUTING.md)

## Optional Features

### Large Payload Externalization

Install the `azure-blob-payloads` extra to automatically offload
oversized orchestration payloads to Azure Blob Storage:

```bash
pip install durabletask[azure-blob-payloads]
```

See the [feature documentation](./docs/features.md#large-payload-externalization)
and the [example](./examples/large_payload/) for usage details.

## Trademarks
This project may contain trademarks or logos for projects, products, or services. Authorized use of Microsoft
trademarks or logos is subject to and must follow
143 changes: 143 additions & 0 deletions docs/features.md
@@ -150,6 +150,149 @@ Orchestrations can be suspended using the `suspend_orchestration` client API and

Orchestrations can specify retry policies for activities and sub-orchestrations. These policies control how many times and how frequently an activity or sub-orchestration will be retried in the event of a transient error.

### Large payload externalization

Orchestration inputs, outputs, and event data are transmitted through
gRPC messages. When these payloads grow very large, they can exceed
gRPC message size limits or degrade performance. Large payload
externalization solves this by transparently offloading oversized
payloads to an external store (such as Azure Blob Storage) and
replacing them with compact reference tokens in the gRPC messages.

This feature is **opt-in** and requires installing an optional
dependency:

```bash
pip install durabletask[azure-blob-payloads]
```

#### How it works

1. When the worker or client sends a payload that exceeds the
configured threshold (default 900 KB), the payload is
compressed (GZip, enabled by default) and uploaded to the
external store.
2. The original payload in the gRPC message is replaced with a
compact token (e.g. `blob:v1:<container>:<blobName>`).
3. When the worker or client receives a message containing a token,
it downloads and decompresses the original payload automatically.

This process is fully transparent to orchestrator and activity code —
no changes are needed in your workflow logic.
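
The three steps above can be sketched with an in-memory stand-in for the blob store. Everything here (the dict-backed store and the `externalize`/`resolve` helpers) is illustrative only; the real logic lives inside the SDK and `BlobPayloadStore`:

```python
import gzip
import uuid

THRESHOLD = 900_000            # matches the default threshold_bytes
CONTAINER = "durabletask-payloads"
_blobs: dict[str, bytes] = {}  # stand-in for the Azure Blob container


def externalize(payload: bytes) -> bytes:
    """Steps 1-2: compress and upload oversized payloads, return a token."""
    if len(payload) <= THRESHOLD:
        return payload                       # small payloads pass through untouched
    name = uuid.uuid4().hex
    _blobs[name] = gzip.compress(payload)    # GZip compression is on by default
    return f"blob:v1:{CONTAINER}:{name}".encode()


def resolve(value: bytes) -> bytes:
    """Step 3: detect a token on receive and restore the original payload."""
    if value.startswith(b"blob:v1:"):
        name = value.decode().rsplit(":", 1)[-1]
        return gzip.decompress(_blobs[name])
    return value


big = b"x" * 1_000_000
token = externalize(big)
assert token.startswith(b"blob:v1:")   # payload replaced by a compact reference
assert resolve(token) == big           # round-trips transparently
```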

#### Configuring the blob payload store

The built-in `BlobPayloadStore` uses Azure Blob Storage. Create a
store instance and pass it to both the worker and client:

```python
from durabletask.extensions.azure_blob_payloads import BlobPayloadStore, BlobPayloadStoreOptions

store = BlobPayloadStore(BlobPayloadStoreOptions(
connection_string="DefaultEndpointsProtocol=https;...",
container_name="durabletask-payloads", # default
threshold_bytes=900_000, # default (900 KB)
max_stored_payload_bytes=10_485_760, # default (10 MB)
enable_compression=True, # default
))
```

Then pass the store to the worker and client:

```python
with DurableTaskSchedulerWorker(
host_address=endpoint,
secure_channel=secure_channel,
taskhub=taskhub_name,
token_credential=credential,
payload_store=store,
) as w:
# ... register orchestrators and activities ...
w.start()

c = DurableTaskSchedulerClient(
host_address=endpoint,
secure_channel=secure_channel,
taskhub=taskhub_name,
token_credential=credential,
payload_store=store,
)
```

You can also authenticate using `account_url` and a
`TokenCredential` instead of a connection string:

```python
from azure.identity import DefaultAzureCredential

store = BlobPayloadStore(BlobPayloadStoreOptions(
account_url="https://<account>.blob.core.windows.net",
credential=DefaultAzureCredential(),
))
```

#### Configuration options

| Option | Default | Description |
|---|---|---|
| `threshold_bytes` | 900,000 (900 KB) | Payloads larger than this are externalized |
| `max_stored_payload_bytes` | 10,485,760 (10 MB) | Maximum size for externalized payloads |
| `enable_compression` | `True` | GZip-compress payloads before uploading |
| `container_name` | `"durabletask-payloads"` | Azure Blob container name |
| `connection_string` | `None` | Azure Storage connection string |
| `account_url` | `None` | Azure Storage account URL (use with `credential`) |
| `credential` | `None` | `TokenCredential` for token-based auth |

#### Cross-SDK compatibility

The blob token format (`blob:v1:<container>:<blobName>`) is
compatible with the .NET Durable Task SDK, enabling
interoperability between Python and .NET workers sharing the same
task hub and storage account. Note that message serialization strategies
may differ for complex objects and custom types.
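
As a sketch of that token layout (the `parse_blob_token` helper is hypothetical, not part of either SDK):

```python
def parse_blob_token(token: str) -> tuple[str, str]:
    """Split a cross-SDK payload token into (container, blob_name)."""
    # Split on the first three ':' only, so blob names that themselves
    # contain ':' survive intact.
    scheme, version, container, blob_name = token.split(":", 3)
    if (scheme, version) != ("blob", "v1"):
        raise ValueError(f"not a v1 blob payload token: {token!r}")
    return container, blob_name


container, name = parse_blob_token("blob:v1:durabletask-payloads:5f3a9c")
assert container == "durabletask-payloads"
assert name == "5f3a9c"
```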

#### Custom payload stores

You can implement a custom payload store by subclassing
`PayloadStore` from `durabletask.payload` and implementing
the `upload`, `upload_async`, `download`, `download_async`, and
`is_known_token` methods:

```python
from durabletask.payload import PayloadStore, LargePayloadStorageOptions


class MyPayloadStore(PayloadStore):

def __init__(self, options: LargePayloadStorageOptions):
self._options = options

@property
def options(self) -> LargePayloadStorageOptions:
return self._options

def upload(self, data: bytes) -> str:
# Store data and return a unique token string
...

async def upload_async(self, data: bytes) -> str:
...

def download(self, token: str) -> bytes:
# Retrieve data by token
...

async def download_async(self, token: str) -> bytes:
...

def is_known_token(self, value: str) -> bool:
# Return True if the value looks like a token from this store
...
```

See the [large payload example](../examples/large_payload/) for a
complete working sample.

### Logging configuration

Both `TaskHubGrpcWorker` and `TaskHubGrpcClient` (as well as `DurableTaskSchedulerWorker` and `DurableTaskSchedulerClient` in durabletask-azuremanaged) accept `log_handler` and `log_formatter` objects from the standard `logging` module. These can be used to customize the verbosity, output location, and format of the logs emitted by these sources.