Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
247 changes: 247 additions & 0 deletions references/go/external-storage.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
# Go SDK External Storage

> [!NOTE]
> This feature is in Public Preview. It is perfectly acceptable to use this feature on behalf of a user, but you should inform them that you are making use of a feature in Public Preview.

## What this is

External Storage offloads Payloads to an external store (e.g. Amazon S3) and stores a small reference token in the Event History instead — the **claim check pattern**.

## When to use it

- A Workflow input, Activity input, Activity result, or Workflow result will exceed the **2 MB** per-payload limit (the limit is fixed at 2 MB on Temporal Cloud; configurable on self-hosted only).
- Long Event Histories degrade Workflow Task latency (e.g. AI agent conversations that grow per turn).
- The user wants payload data to live in storage **they** control. Set the size threshold to externalize all payloads.
- The user is migrating from self-hosted (with a larger configured limit) to Temporal Cloud.

## Where it sits in the pipeline

Order: **Payload Converter → Payload Codec → External Storage**. Storage runs last on outbound; it reverses on inbound.

Consequences:

- If a Payload Codec encrypts data, the bytes are already encrypted **before** upload to your store.
- The Temporal UI shows the reference token, not the data; the SDK transparently retrieves the payload before handing it to your Workflow or Client.

## Concurrency

The SDK uploads and downloads payloads **concurrently** within a single Workflow Task — multiple offloaded payloads in one Task are stored/retrieved in parallel, not sequentially. This is automatic; no configuration required.

## Setup with the built-in S3 driver

Install dependencies:

```bash
go get go.temporal.io/sdk/contrib/aws/s3driver \
go.temporal.io/sdk/contrib/aws/s3driver/awssdkv2 \
github.com/aws/aws-sdk-go-v2/config \
github.com/aws/aws-sdk-go-v2/service/s3
```

Create the driver and Client:

```go
import (
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/contrib/aws/s3driver"
"go.temporal.io/sdk/contrib/aws/s3driver/awssdkv2"
"go.temporal.io/sdk/converter"
"go.temporal.io/sdk/worker"
)

cfg, err := config.LoadDefaultConfig(context.Background(),
config.WithRegion("us-east-2"),
)
if err != nil {
log.Fatalf("load AWS config: %v", err)
}

driver, err := s3driver.NewDriver(s3driver.Options{
Client: awssdkv2.NewClient(s3.NewFromConfig(cfg)),
Bucket: s3driver.StaticBucket("my-temporal-payloads"),
})
if err != nil {
log.Fatalf("create S3 driver: %v", err)
}

c, err := client.Dial(client.Options{
HostPort: "localhost:7233",
ExternalStorage: converter.ExternalStorage{
Drivers: []converter.StorageDriver{driver},
},
})
if err != nil {
log.Fatalf("connect to Temporal: %v", err)
}
defer c.Close()

w := worker.New(c, "my-task-queue", worker.Options{})
```

The S3 driver uses standard AWS credentials from the environment (env vars, IAM role, or AWS config file).

Workflows and Activities running on the Worker use the driver automatically — no changes to business logic.

## Payload size threshold

- Default: **256 KiB**.
- Set `PayloadSizeThreshold: 1` to externalize **all** payloads regardless of size.
- `PayloadSizeThreshold: 0` is **interpreted as the default (256 KiB)** — it does **not** mean "externalize everything".

```go
c, err := client.Dial(client.Options{
ExternalStorage: converter.ExternalStorage{
Drivers: []converter.StorageDriver{driver},
PayloadSizeThreshold: 1,
},
})
```

## Multiple drivers and migration

When you register more than one driver, you **must** supply a `DriverSelector` implementing `StorageDriverSelector`. The selector chooses which driver stores each payload. Unselected drivers remain available for **retrieval** — this is how you migrate between storage backends without losing access to existing claims.

- Return `nil` from the selector to keep a specific payload inline in Event History.

```go
type PreferredSelector struct {
preferred converter.StorageDriver
}

func (s *PreferredSelector) SelectDriver(
ctx converter.StorageDriverStoreContext,
payload *commonpb.Payload,
) (converter.StorageDriver, error) {
return s.preferred, nil
}

func MultipleDriversSetup(preferredDriver, legacyDriver converter.StorageDriver) converter.ExternalStorage {
return converter.ExternalStorage{
Drivers: []converter.StorageDriver{preferredDriver, legacyDriver},
DriverSelector: &PreferredSelector{preferred: preferredDriver},
}
}
```

## Custom storage driver

Implement `converter.StorageDriver` with **four** methods:

- `Name() string` — unique identifier for **this driver instance**, stored in the claim reference so the SDK can route retrieval. Renaming after payloads are stored **breaks retrieval**.
- `Type() string` — identifier for the driver **implementation**, same across all instances regardless of configuration (e.g. `"aws.s3driver"`, `"local-disk"`).
- `Store(ctx, payloads) ([]StorageDriverClaim, error)` — upload each Payload protobuf and return one claim per payload. A claim is a `map[string]string` the driver uses to locate the payload later.
- `Retrieve(ctx, claims) ([]*commonpb.Payload, error)` — download bytes using claim data and reconstruct each Payload.

Inside `Store()`, marshal each payload with `proto.Marshal(payload)`; in `Retrieve()`, reconstruct with `proto.Unmarshal(data, payload)`. The application data has already been serialized by the Payload Converter and Payload Codec before it reaches the driver.

`ctx.Target` provides identity information. Type-switch over `StorageDriverWorkflowInfo` and `StorageDriverActivityInfo` to access the namespace / Workflow ID / Activity ID. `StorageDriverActivityInfo` is only used for standalone (non-workflow-bound) Activities; Activities started by a Workflow get `StorageDriverWorkflowInfo`.

Worked example — local-disk driver (development/testing only):

```go
type LocalDiskStorageDriver struct {
storeDir string
}

func NewLocalDiskStorageDriver(storeDir string) converter.StorageDriver {
return &LocalDiskStorageDriver{storeDir: storeDir}
}

func (d *LocalDiskStorageDriver) Name() string { return "my-local-disk" }
func (d *LocalDiskStorageDriver) Type() string { return "local-disk" }

func (d *LocalDiskStorageDriver) Store(
ctx converter.StorageDriverStoreContext,
payloads []*commonpb.Payload,
) ([]converter.StorageDriverClaim, error) {
dir := d.storeDir
switch info := ctx.Target.(type) {
case converter.StorageDriverWorkflowInfo:
if info.WorkflowID != "" {
dir = filepath.Join(d.storeDir, info.Namespace, info.WorkflowID)
}
case converter.StorageDriverActivityInfo:
if info.ActivityID != "" {
dir = filepath.Join(d.storeDir, info.Namespace, info.ActivityID)
}
}
if err := os.MkdirAll(dir, 0o755); err != nil {
return nil, fmt.Errorf("create store directory: %w", err)
}

claims := make([]converter.StorageDriverClaim, len(payloads))
for i, payload := range payloads {
key := uuid.NewString() + ".bin"
filePath := filepath.Join(dir, key)
data, err := proto.Marshal(payload)
if err != nil {
return nil, fmt.Errorf("marshal payload: %w", err)
}
if err := os.WriteFile(filePath, data, 0o644); err != nil {
return nil, fmt.Errorf("write payload: %w", err)
}
claims[i] = converter.StorageDriverClaim{
ClaimData: map[string]string{"path": filePath},
}
}
return claims, nil
}

func (d *LocalDiskStorageDriver) Retrieve(
ctx converter.StorageDriverRetrieveContext,
claims []converter.StorageDriverClaim,
) ([]*commonpb.Payload, error) {
payloads := make([]*commonpb.Payload, len(claims))
for i, claim := range claims {
filePath := claim.ClaimData["path"]
data, err := os.ReadFile(filePath)
if err != nil {
return nil, fmt.Errorf("read payload: %w", err)
}
payload := &commonpb.Payload{}
if err := proto.Unmarshal(data, payload); err != nil {
return nil, fmt.Errorf("unmarshal payload: %w", err)
}
payloads[i] = payload
}
return payloads, nil
}
```

You can package a custom driver as a [plugin](/develop/plugins-guide) for reuse across services.

## Codec Server with External Storage

When Workers and Clients use External Storage, Event History contains reference tokens — not payload data. For the Web UI and CLI to display decoded payloads, the Codec Server must download from external storage **and** decode through the Payload Codec in the correct order.

Build the Codec Server with `NewPayloadHTTPHandler` and `PayloadHTTPHandlerOptions`. Pass it your storage drivers, your pre-storage codecs (the Payload Codecs your Workers use), and any post-storage codecs (applied by a proxy after external storage).

When configured with storage drivers, the handler exposes:

- **`/download`** — retrieves payload data from external storage and decodes it through the Payload Codec. The Web UI calls this when a user clicks to view the full payload behind a reference.
- **`/decode`** — decodes encoded payloads and, by default, retrieves storage references inline. Pass `?preserveStorageRefs=true` to return storage references as-is without retrieval.
- **`/encode`** — applies the Payload Codec, then uploads payloads exceeding the threshold and replaces them with reference tokens.

**Don't use `NewPayloadHTTPHandler` as a remote Data Converter or remote codec target for your Workers** — it runs the full encode-store-encode and decode-retrieve-decode pipeline. For remote codecs use `NewPayloadCodecHTTPHandler` separately. If you need both, run both handlers, configured with the same codecs.

## Lifecycle management

Temporal does **not** auto-delete payloads from your store. Configure a TTL on your bucket:

```
TTL > Maximum Workflow Run Timeout + Namespace Retention Period
```

Example: Run Timeout 14 days + Namespace retention 30 days → set TTL to at least 44 days.

For Workflows with no finite Run Timeout, there is no safe finite TTL. Use Continue-as-New so the new run uploads fresh payloads and the old run's payloads only need to survive its retention period.

## Anti-patterns

- **Don't change `Name()` after payloads have been stored.** The name is embedded in the claim reference; renaming breaks retrieval of existing claims.
- **Don't use `PayloadSizeThreshold: 0` to mean "externalize all".** `0` is interpreted as the default (256 KiB). Use `PayloadSizeThreshold: 1`.
- **Don't register multiple drivers without a `DriverSelector`.** The selector is required when there are multiple drivers.
- **Don't point a Worker's remote codec at `NewPayloadHTTPHandler`.** Use `NewPayloadCodecHTTPHandler` for remote codec endpoints.
- **Don't omit a TTL on the bucket.** Payloads are orphaned otherwise; orphaned objects can also remain if a request fails after upload.
1 change: 1 addition & 0 deletions references/go/go.md
Original file line number Diff line number Diff line change
Expand Up @@ -250,5 +250,6 @@ See `references/go/testing.md` for info on writing tests.
- **`references/go/testing.md`** - TestWorkflowEnvironment, time-skipping, activity mocking
- **`references/go/advanced-features.md`** - Schedules, worker tuning, and more
- **`references/go/data-handling.md`** - Data converters, payload codecs, encryption
- **`references/go/external-storage.md`** - Claim-check pattern for large payloads (S3 driver, custom drivers, codec-server handling)
- **`references/go/versioning.md`** - Patching API (`workflow.GetVersion`), Worker Versioning
- **`references/go/determinism-protection.md`** - Information on **`workflowcheck`** tool to help statically check for determinism issues.
1 change: 0 additions & 1 deletion references/java/integrations/spring-ai.md
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,6 @@ Media image = new Media(MimeTypeUtils.IMAGE_PNG, URI.create("https://cdn.example

For anything larger than a small thumbnail, route the bytes to a binary store from an Activity and pass only the URL across the conversation.


## Vector stores, embeddings, and MCP

When the corresponding Spring AI modules (`spring-ai-rag`, `spring-ai-mcp`) are on the classpath, the integration registers Activities for vector stores, embeddings, and MCP tool calls automatically. Inject the matching Spring AI types into your Activities or Workflows and use them as you would in any Spring AI application — each operation executes through a Temporal Activity.
Expand Down
Loading