From 169d340729666d114e3df02e65e04d59f0480377 Mon Sep 17 00:00:00 2001 From: "skill-sync[bot]" Date: Thu, 14 May 2026 18:52:27 +0000 Subject: [PATCH] Finalize draft for 0011-external-storage --- references/go/external-storage.md | 247 ++++++++++++++++++++++ references/go/go.md | 1 + references/java/integrations/spring-ai.md | 1 - references/python/external-storage.md | 228 ++++++++++++++++++++ references/python/python.md | 1 + 5 files changed, 477 insertions(+), 1 deletion(-) create mode 100644 references/go/external-storage.md create mode 100644 references/python/external-storage.md diff --git a/references/go/external-storage.md b/references/go/external-storage.md new file mode 100644 index 0000000..2d76036 --- /dev/null +++ b/references/go/external-storage.md @@ -0,0 +1,247 @@ +# Go SDK External Storage + +> [!NOTE] +> This feature is in Public Preview. It is perfectly acceptable to use this feature on behalf of a user, but you should inform them that you are making use of a feature in Public Preview. + +## What this is + +External Storage offloads Payloads to an external store (e.g. Amazon S3) and stores a small reference token in the Event History instead — the **claim check pattern**. + +## When to use it + +- A Workflow input, Activity input, Activity result, or Workflow result will exceed the **2 MB** per-payload limit (the limit is fixed at 2 MB on Temporal Cloud; configurable on self-hosted only). +- Long Event Histories degrade Workflow Task latency (e.g. AI agent conversations that grow per turn). +- The user wants payload data to live in storage **they** control. Set the size threshold to externalize all payloads. +- The user is migrating from self-hosted (with a larger configured limit) to Temporal Cloud. + +## Where it sits in the pipeline + +Order: **Payload Converter → Payload Codec → External Storage**. Storage runs last on outbound; it reverses on inbound. + +Consequences: + +- If a Payload Codec encrypts data, the bytes are already encrypted **before** upload to your store. +- The Temporal UI shows the reference token, not the data; the SDK transparently retrieves the payload before handing it to your Workflow or Client. + +## Concurrency + +The SDK uploads and downloads payloads **concurrently** within a single Workflow Task — multiple offloaded payloads in one Task are stored/retrieved in parallel, not sequentially. This is automatic; no configuration required. + +## Setup with the built-in S3 driver + +Install dependencies: + +```bash +go get go.temporal.io/sdk/contrib/aws/s3driver \ + go.temporal.io/sdk/contrib/aws/s3driver/awssdkv2 \ + github.com/aws/aws-sdk-go-v2/config \ + github.com/aws/aws-sdk-go-v2/service/s3 +``` + +Create the driver and Client: + +```go +import ( + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/s3" + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/contrib/aws/s3driver" + "go.temporal.io/sdk/contrib/aws/s3driver/awssdkv2" + "go.temporal.io/sdk/converter" + "go.temporal.io/sdk/worker" +) + +cfg, err := config.LoadDefaultConfig(context.Background(), + config.WithRegion("us-east-2"), +) +if err != nil { + log.Fatalf("load AWS config: %v", err) +} + +driver, err := s3driver.NewDriver(s3driver.Options{ + Client: awssdkv2.NewClient(s3.NewFromConfig(cfg)), + Bucket: s3driver.StaticBucket("my-temporal-payloads"), +}) +if err != nil { + log.Fatalf("create S3 driver: %v", err) +} + +c, err := client.Dial(client.Options{ + HostPort: "localhost:7233", + ExternalStorage: converter.ExternalStorage{ + Drivers: []converter.StorageDriver{driver}, + }, +}) +if err != nil { + log.Fatalf("connect to Temporal: %v", err) +} +defer c.Close() + +w := worker.New(c, "my-task-queue", worker.Options{}) +``` + +The S3 driver uses standard AWS credentials from the environment (env vars, IAM role, or AWS config file). + +Workflows and Activities running on the Worker use the driver automatically — no changes to business logic. + +## Payload size threshold + +- Default: **256 KiB**. +- Set `PayloadSizeThreshold: 1` to externalize **all** payloads regardless of size. +- `PayloadSizeThreshold: 0` is **interpreted as the default (256 KiB)** — it does **not** mean "externalize everything". + +```go +c, err := client.Dial(client.Options{ + ExternalStorage: converter.ExternalStorage{ + Drivers: []converter.StorageDriver{driver}, + PayloadSizeThreshold: 1, + }, +}) +``` + +## Multiple drivers and migration + +When you register more than one driver, you **must** supply a `DriverSelector` implementing `StorageDriverSelector`. The selector chooses which driver stores each payload. Unselected drivers remain available for **retrieval** — this is how you migrate between storage backends without losing access to existing claims. + +- Return `nil` from the selector to keep a specific payload inline in Event History. + +```go +type PreferredSelector struct { + preferred converter.StorageDriver +} + +func (s *PreferredSelector) SelectDriver( + ctx converter.StorageDriverStoreContext, + payload *commonpb.Payload, +) (converter.StorageDriver, error) { + return s.preferred, nil +} + +func MultipleDriversSetup(preferredDriver, legacyDriver converter.StorageDriver) converter.ExternalStorage { + return converter.ExternalStorage{ + Drivers: []converter.StorageDriver{preferredDriver, legacyDriver}, + DriverSelector: &PreferredSelector{preferred: preferredDriver}, + } +} +``` + +## Custom storage driver + +Implement `converter.StorageDriver` with **four** methods: + +- `Name() string` — unique identifier for **this driver instance**, stored in the claim reference so the SDK can route retrieval. Renaming after payloads are stored **breaks retrieval**. +- `Type() string` — identifier for the driver **implementation**, same across all instances regardless of configuration (e.g. `"aws.s3driver"`, `"local-disk"`). +- `Store(ctx, payloads) ([]StorageDriverClaim, error)` — upload each Payload protobuf and return one claim per payload. A claim is a `map[string]string` the driver uses to locate the payload later. +- `Retrieve(ctx, claims) ([]*commonpb.Payload, error)` — download bytes using claim data and reconstruct each Payload. + +Inside `Store()`, marshal each payload with `proto.Marshal(payload)`; in `Retrieve()`, reconstruct with `proto.Unmarshal(data, payload)`. The application data has already been serialized by the Payload Converter and Payload Codec before it reaches the driver. + +`ctx.Target` provides identity information. Type-switch over `StorageDriverWorkflowInfo` and `StorageDriverActivityInfo` to access the namespace / Workflow ID / Activity ID. `StorageDriverActivityInfo` is only used for standalone (non-workflow-bound) Activities; Activities started by a Workflow get `StorageDriverWorkflowInfo`. + +Worked example — local-disk driver (development/testing only): + +```go +type LocalDiskStorageDriver struct { + storeDir string +} + +func NewLocalDiskStorageDriver(storeDir string) converter.StorageDriver { + return &LocalDiskStorageDriver{storeDir: storeDir} +} + +func (d *LocalDiskStorageDriver) Name() string { return "my-local-disk" } +func (d *LocalDiskStorageDriver) Type() string { return "local-disk" } + +func (d *LocalDiskStorageDriver) Store( + ctx converter.StorageDriverStoreContext, + payloads []*commonpb.Payload, +) ([]converter.StorageDriverClaim, error) { + dir := d.storeDir + switch info := ctx.Target.(type) { + case converter.StorageDriverWorkflowInfo: + if info.WorkflowID != "" { + dir = filepath.Join(d.storeDir, info.Namespace, info.WorkflowID) + } + case converter.StorageDriverActivityInfo: + if info.ActivityID != "" { + dir = filepath.Join(d.storeDir, info.Namespace, info.ActivityID) + } + } + if err := os.MkdirAll(dir, 0o755); err != nil { + return nil, fmt.Errorf("create store directory: %w", err) + } + + claims := make([]converter.StorageDriverClaim, len(payloads)) + for i, payload := range payloads { + key := uuid.NewString() + ".bin" + filePath := filepath.Join(dir, key) + data, err := proto.Marshal(payload) + if err != nil { + return nil, fmt.Errorf("marshal payload: %w", err) + } + if err := os.WriteFile(filePath, data, 0o644); err != nil { + return nil, fmt.Errorf("write payload: %w", err) + } + claims[i] = converter.StorageDriverClaim{ + ClaimData: map[string]string{"path": filePath}, + } + } + return claims, nil +} + +func (d *LocalDiskStorageDriver) Retrieve( + ctx converter.StorageDriverRetrieveContext, + claims []converter.StorageDriverClaim, +) ([]*commonpb.Payload, error) { + payloads := make([]*commonpb.Payload, len(claims)) + for i, claim := range claims { + filePath := claim.ClaimData["path"] + data, err := os.ReadFile(filePath) + if err != nil { + return nil, fmt.Errorf("read payload: %w", err) + } + payload := &commonpb.Payload{} + if err := proto.Unmarshal(data, payload); err != nil { + return nil, fmt.Errorf("unmarshal payload: %w", err) + } + payloads[i] = payload + } + return payloads, nil +} +``` + +You can package a custom driver as a [plugin](/develop/plugins-guide) for reuse across services. + +## Codec Server with External Storage + +When Workers and Clients use External Storage, Event History contains reference tokens — not payload data. For the Web UI and CLI to display decoded payloads, the Codec Server must download from external storage **and** decode through the Payload Codec in the correct order. + +Build the Codec Server with `NewPayloadHTTPHandler` and `PayloadHTTPHandlerOptions`. Pass it your storage drivers, your pre-storage codecs (the Payload Codecs your Workers use), and any post-storage codecs (applied by a proxy after external storage). + +When configured with storage drivers, the handler exposes: + +- **`/download`** — retrieves payload data from external storage and decodes it through the Payload Codec. The Web UI calls this when a user clicks to view the full payload behind a reference. +- **`/decode`** — decodes encoded payloads and, by default, retrieves storage references inline. Pass `?preserveStorageRefs=true` to return storage references as-is without retrieval. +- **`/encode`** — applies the Payload Codec, then uploads payloads exceeding the threshold and replaces them with reference tokens. + +**Don't use `NewPayloadHTTPHandler` as a remote Data Converter or remote codec target for your Workers** — it runs the full encode-store-encode and decode-retrieve-decode pipeline. For remote codecs use `NewPayloadCodecHTTPHandler` separately. If you need both, run both handlers, configured with the same codecs. + +## Lifecycle management + +Temporal does **not** auto-delete payloads from your store. Configure a TTL on your bucket: + +``` +TTL > Maximum Workflow Run Timeout + Namespace Retention Period +``` + +Example: Run Timeout 14 days + Namespace retention 30 days → set TTL to at least 44 days. + +For Workflows with no finite Run Timeout, there is no safe finite TTL. Use Continue-as-New so the new run uploads fresh payloads and the old run's payloads only need to survive its retention period. + +## Anti-patterns + +- **Don't change `Name()` after payloads have been stored.** The name is embedded in the claim reference; renaming breaks retrieval of existing claims. +- **Don't use `PayloadSizeThreshold: 0` to mean "externalize all".** `0` is interpreted as the default (256 KiB). Use `PayloadSizeThreshold: 1`. +- **Don't register multiple drivers without a `DriverSelector`.** The selector is required when there are multiple drivers. +- **Don't point a Worker's remote codec at `NewPayloadHTTPHandler`.** Use `NewPayloadCodecHTTPHandler` for remote codec endpoints. +- **Don't omit a TTL on the bucket.** Payloads are orphaned otherwise; orphaned objects can also remain if a request fails after upload. diff --git a/references/go/go.md b/references/go/go.md index 6c42bed..903e992 100644 --- a/references/go/go.md +++ b/references/go/go.md @@ -250,5 +250,6 @@ See `references/go/testing.md` for info on writing tests. - **`references/go/testing.md`** - TestWorkflowEnvironment, time-skipping, activity mocking - **`references/go/advanced-features.md`** - Schedules, worker tuning, and more - **`references/go/data-handling.md`** - Data converters, payload codecs, encryption +- **`references/go/external-storage.md`** - Claim-check pattern for large payloads (S3 driver, custom drivers, codec-server handling) - **`references/go/versioning.md`** - Patching API (`workflow.GetVersion`), Worker Versioning - **`references/go/determinism-protection.md`** - Information on **`workflowcheck`** tool to help statically check for determinism issues. diff --git a/references/java/integrations/spring-ai.md b/references/java/integrations/spring-ai.md index 5ee0704..ae5154f 100644 --- a/references/java/integrations/spring-ai.md +++ b/references/java/integrations/spring-ai.md @@ -217,7 +217,6 @@ Media image = new Media(MimeTypeUtils.IMAGE_PNG, URI.create("https://cdn.example For anything larger than a small thumbnail, route the bytes to a binary store from an Activity and pass only the URL across the conversation. - ## Vector stores, embeddings, and MCP When the corresponding Spring AI modules (`spring-ai-rag`, `spring-ai-mcp`) are on the classpath, the integration registers Activities for vector stores, embeddings, and MCP tool calls automatically. Inject the matching Spring AI types into your Activities or Workflows and use them as you would in any Spring AI application — each operation executes through a Temporal Activity. diff --git a/references/python/external-storage.md b/references/python/external-storage.md new file mode 100644 index 0000000..0a3e1b5 --- /dev/null +++ b/references/python/external-storage.md @@ -0,0 +1,228 @@ +# Python SDK External Storage + +> [!NOTE] +> This feature is in Public Preview. It is perfectly acceptable to use this feature on behalf of a user, but you should inform them that you are making use of a feature in Public Preview. + +## What this is + +External Storage offloads Payloads to an external store (e.g. Amazon S3) and stores a small reference token in the Event History instead — the **claim check pattern**. + +## When to use it + +- A Workflow input, Activity input, Activity result, or Workflow result will exceed the **2 MB** per-payload limit (fixed at 2 MB on Temporal Cloud; configurable on self-hosted only). +- Long Event Histories degrade Workflow Task latency (e.g. AI agent conversations growing per turn). +- The user wants payload data to live in storage **they** control. Set the size threshold to externalize all payloads. +- The user is migrating from self-hosted (with a larger configured limit) to Temporal Cloud. + +## Where it sits in the pipeline + +Order: **Payload Converter → Payload Codec → External Storage**. Storage runs last on outbound; it reverses on inbound. + +Consequences: + +- If a Payload Codec encrypts data, the bytes are already encrypted **before** upload. +- The Temporal UI displays the reference token, not the data; the SDK retrieves the payload transparently before handing it to your Workflow or Client. + +## Concurrency + +The SDK uploads and downloads payloads **concurrently** within a single Workflow Task — multiple offloaded payloads in one Task are stored or retrieved in parallel, not sequentially. This is automatic. + +## Setup with the built-in S3 driver + +Install the `aioboto3` extra: + +```bash +python -m pip install "temporalio[aioboto3]" +``` + +Create the driver, attach it to a `DataConverter`, and pass the converter to both Client and Worker: + +```python +import aioboto3 +import dataclasses +from temporalio.client import Client, ClientConfig +from temporalio.contrib.aioboto3 import new_aioboto3_client +from temporalio.converter import DataConverter +from temporalio.external_storage import ExternalStorage, S3StorageDriver +from temporalio.worker import Worker + +session = aioboto3.Session(profile_name=AWS_PROFILE, region_name=AWS_REGION) +async with session.client("s3") as s3_client: + driver = S3StorageDriver( + client=new_aioboto3_client(s3_client), + bucket="my-temporal-payloads", + ) + + data_converter = dataclasses.replace( + DataConverter.default, + external_storage=ExternalStorage(drivers=[driver]), + ) + + client_config = ClientConfig.load_client_connect_config() + client = await Client.connect(**client_config, data_converter=data_converter) + + worker = Worker( + client, + task_queue="my-task-queue", + workflows=[], + activities=[], + ) +``` + +The S3 driver uses standard AWS credentials from the environment (env vars, IAM role, or AWS config file). + +Workflows and Activities on the Worker use the driver automatically — no business-logic changes. + +## Payload size threshold + +- Default: **256 KiB**. +- Set `payload_size_threshold=0` to externalize **all** payloads regardless of size. + +```python +data_converter = dataclasses.replace( + DataConverter.default, + external_storage=ExternalStorage( + drivers=[driver], + payload_size_threshold=0, + ), +) +``` + +## Multiple drivers and migration + +When you register more than one driver, you **must** supply a `driver_selector` function. The selector chooses which driver stores each payload. Unselected drivers remain available for **retrieval** — this is how you migrate between storage backends without losing access to existing claims. + +- Return `None` from the selector to keep a specific payload inline in Event History. + +```python +preferred_driver = S3StorageDriver(client=s3_client, bucket="my-bucket") +legacy_driver = LegacyStorageDriver() + +ExternalStorage( + drivers=[preferred_driver, legacy_driver], + driver_selector=lambda context, payload: preferred_driver, +) +``` + +## Custom storage driver + +Extend `StorageDriver` and implement **three** methods: + +- `name() -> str` — unique identifier for the driver, stored in the claim reference so the SDK can route retrieval. Renaming after payloads are stored **breaks retrieval**. +- `async store(context, payloads) -> list[StorageDriverClaim]` — upload each Payload and return one claim per payload. A claim is a `dict[str, str]` the driver uses to locate the payload later. +- `async retrieve(context, claims) -> list[Payload]` — download bytes using claim data and reconstruct each Payload. + +Inside `store()`, serialize each payload with `payload.SerializeToString()`; in `retrieve()`, reconstruct with `payload.ParseFromString(data)`. The application data has already been serialized by the Payload Converter and Payload Codec before reaching the driver. + +`context.target` provides identity information (namespace, Workflow ID, or Activity ID). Check the target type with `isinstance(target, StorageDriverWorkflowInfo)`; the Workflow info exposes `target.namespace` and `target.id`. Use this to scope storage keys per Workflow. + +Worked example — local-disk driver (development/testing only): + +```python +import os +import uuid +from typing import Sequence + +from temporalio.api.common.v1 import Payload +from temporalio.external_storage import ( + StorageDriver, + StorageDriverClaim, + StorageDriverRetrieveContext, + StorageDriverStoreContext, + StorageDriverWorkflowInfo, +) + + +class LocalDiskStorageDriver(StorageDriver): + def __init__(self, store_dir: str = "/tmp/temporal-payload-store") -> None: + self._store_dir = store_dir + + def name(self) -> str: + return "local-disk" + + async def store( + self, + context: StorageDriverStoreContext, + payloads: Sequence[Payload], + ) -> list[StorageDriverClaim]: + os.makedirs(self._store_dir, exist_ok=True) + + prefix = self._store_dir + target = context.target + if isinstance(target, StorageDriverWorkflowInfo) and target.id: + prefix = os.path.join(self._store_dir, target.namespace, target.id) + os.makedirs(prefix, exist_ok=True) + + claims = [] + for payload in payloads: + key = f"{uuid.uuid4()}.bin" + file_path = os.path.join(prefix, key) + with open(file_path, "wb") as f: + f.write(payload.SerializeToString()) + claims.append(StorageDriverClaim(claim_data={"path": file_path})) + return claims + + async def retrieve( + self, + context: StorageDriverRetrieveContext, + claims: Sequence[StorageDriverClaim], + ) -> list[Payload]: + payloads = [] + for claim in claims: + file_path = claim.claim_data["path"] + with open(file_path, "rb") as f: + raw = f.read() + payload = Payload() + payload.ParseFromString(raw) + payloads.append(payload) + return payloads +``` + +Wire the custom driver into the Data Converter the same way as the S3 driver: + +```python +data_converter = dataclasses.replace( + DataConverter.default, + external_storage=ExternalStorage( + drivers=[LocalDiskStorageDriver()], + ), +) +``` + +You can package a custom driver as a [plugin](/develop/plugins-guide) for reuse across services. + +## Codec Server with External Storage + +When Workers and Clients use External Storage, Event History contains reference tokens — not payload data. For the Web UI and CLI to show decoded payloads, the Codec Server must download from external storage **and** decode through the Payload Codec in the correct order. + +Build the Codec Server with a payload HTTP handler that accepts your storage drivers, your pre-storage codecs (the Payload Codecs your Workers use), and any post-storage codecs (applied by a proxy after external storage). The handler applies them in the correct order across all endpoints. + +Endpoints exposed when storage drivers are configured: + +- **`/download`** — retrieves payload data from external storage and decodes it through the Payload Codec. The Web UI calls this when a user clicks to view the full payload behind a reference. +- **`/decode`** — decodes encoded payloads and, by default, retrieves storage references inline. Pass `?preserveStorageRefs=true` to return storage references as-is without retrieval. +- **`/encode`** — applies the Payload Codec, then uploads payloads exceeding the threshold and replaces them with reference tokens. + +**Don't point a Worker's remote codec at the storage-aware handler** — it runs the full encode-store-encode and decode-retrieve-decode pipeline. Run a separate non-storage codec HTTP handler for remote codecs, configured with the same codecs. + +The [Python Codec Server sample](https://github.com/temporalio/samples-python/blob/main/encryption/codec_server.py) demonstrates a Codec Server implementation. + +## Lifecycle management + +Temporal does **not** auto-delete payloads from your store. Configure a TTL on your bucket: + +``` +TTL > Maximum Workflow Run Timeout + Namespace Retention Period +``` + +Example: Run Timeout 14 days + Namespace retention 30 days → set TTL to at least 44 days. + +For Workflows with no finite Run Timeout, there is no safe finite TTL. Use Continue-as-New so the new run uploads fresh payloads and the old run's payloads only need to survive its retention period. + +## Anti-patterns + +- **Don't change the value returned by `name()` after payloads have been stored.** The name is embedded in the claim reference; renaming breaks retrieval of existing claims. +- **Don't use `payload_size_threshold=1` to mean "externalize all"** — use `payload_size_threshold=0`. (This sentinel differs from Go, where `0` is the default and `1` externalizes all.) +- **Don't register multiple drivers without a `driver_selector`.** The selector is required when there is more than one driver. +- **Don't pass the storage-aware payload HTTP handler as a Worker's remote codec target.** Use a separate non-storage codec HTTP handler for that role. +- **Don't omit a TTL on the bucket.** Payloads can be orphaned if a request fails after upload. diff --git a/references/python/python.md b/references/python/python.md index 5493387..0561bd2 100644 --- a/references/python/python.md +++ b/references/python/python.md @@ -179,6 +179,7 @@ See `references/python/testing.md` for info on writing tests. - **`references/python/sync-vs-async.md`** - Sync vs async activities, event loop blocking, executor configuration - **`references/python/advanced-features.md`** - Schedules, worker tuning, and more - **`references/python/data-handling.md`** - Data converters, Pydantic, payload encryption +- **`references/python/external-storage.md`** - Claim-check pattern for large payloads (S3 driver, custom drivers, codec-server handling) - **`references/python/versioning.md`** - Patching API, workflow type versioning, Worker Versioning - **`references/python/determinism-protection.md`** - Python sandbox specifics, forbidden operations, pass-through imports - **`references/python/ai-patterns.md`** - LLM integration, Pydantic data converter, AI workflow patterns