Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@

* (Python) Added exception chaining to preserve error context in CloudSQLEnrichmentHandler, processes utilities, and core transforms ([#37422](https://github.com/apache/beam/issues/37422)).
* (Python) Added a pipeline option `--experiments=pip_no_build_isolation` to disable build isolation when installing dependencies in the runtime environment ([#37331](https://github.com/apache/beam/issues/37331)).
* (Go) Added OrderedListState support to the Go SDK stateful DoFn API ([#37629](https://github.com/apache/beam/pull/37629)).
* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).

## Breaking Changes
Expand Down
93 changes: 93 additions & 0 deletions sdks/go/examples/ordered_list_state/ordered_list_state.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// ordered_list_state is a toy pipeline demonstrating the use of OrderedListState.
// It creates keyed elements with timestamps, stores them in ordered list state,
// and reads back sub-ranges to emit summaries per key.
package main

import (
"context"
"flag"
"fmt"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
)

// eventLogFn is a stateful DoFn that accumulates timestamped events per key
// using OrderedListState and emits a summary of the events seen so far.
type eventLogFn struct {
// Events is the ordered list state of string event values. ProcessElement
// adds one entry per input element, using the element's int64 value as the
// sort key, and then reads the whole list back.
Events state.OrderedList[string]
}

// ProcessElement records one event for the current key in ordered list state
// and emits a summary of everything stored for that key so far.
//
// The int64 input value ts doubles as the sort key and as the suffix of the
// stored event string. After the write, the full list is read back; when it is
// present and non-empty, one line is emitted containing the key, the entry
// count, and the last entry returned by the read. Any error from the state
// write or the state read is returned to the runner.
func (fn *eventLogFn) ProcessElement(p state.Provider, key string, ts int64, emit func(string)) error {
	// Store an event using the input value as the sort key. A failed write
	// must fail the element rather than be silently dropped.
	event := fmt.Sprintf("event@%d", ts)
	if err := fn.Events.Add(p, ts, event); err != nil {
		return err
	}

	// Read all events accumulated so far for this key.
	entries, ok, err := fn.Events.Read(p)
	if err != nil {
		return err
	}
	// Guard the last-element index below: an "ok" read with zero entries
	// would otherwise panic with an out-of-range index.
	if !ok || len(entries) == 0 {
		return nil
	}

	latest := entries[len(entries)-1]
	emit(fmt.Sprintf("key=%s count=%d latest=%s (sort_key=%d)", key, len(entries), latest.Value, latest.SortKey))
	return nil
}

// init registers the keying function, the string emitter, and the stateful
// DoFn with the Beam register package before main runs.
func init() {
	register.Function1x2(toKeyed)
	register.Emitter1[string]()
	register.DoFn4x1[state.Provider, string, int64, func(string), error](&eventLogFn{})
}

// toKeyed turns an integer into a (key, timestamp) pair: the key is
// "user-<i mod 3>" and the timestamp is i multiplied by 1000.
func toKeyed(i int) (string, int64) {
	bucket := i % 3
	stamp := int64(i * 1000)
	userKey := fmt.Sprintf("user-%d", bucket)
	return userKey, stamp
}

// main builds and runs the example pipeline: ten integers are created, mapped
// to (key, timestamp) pairs by toKeyed, fed through the stateful eventLogFn,
// and the resulting summary strings are printed via debug.Print.
func main() {
	flag.Parse()
	beam.Init()

	pipeline, root := beam.NewPipelineWithRoot()

	// Ten small integers serve as the raw input.
	numbers := beam.CreateList(root, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})

	// Turn every integer into a keyed, timestamped element.
	keyedEvents := beam.ParDo(root, toKeyed, numbers)

	// Run the stateful DoFn backed by OrderedListState and print its output.
	logFn := &eventLogFn{
		Events: state.MakeOrderedListState[string]("events"),
	}
	debug.Print(root, beam.ParDo(root, logFn, keyedEvents))

	ctx := context.Background()
	err := beamx.Run(ctx, pipeline)
	if err != nil {
		log.Exitf(ctx, "Failed to execute job: %v", err)
	}
}
32 changes: 32 additions & 0 deletions sdks/go/examples/snippets/04transforms.go
Original file line number Diff line number Diff line change
Expand Up @@ -743,6 +743,38 @@ func combineState(s beam.Scope, input beam.PCollection) beam.PCollection {
return combined
}

// [START ordered_list_state]

// orderedListStateFn is a stateful DoFn that tracks timestamped events per key
// and reads back a sub-range of them.
type orderedListStateFn struct {
// Events is the ordered list state of string events. ProcessElement adds
// entries keyed by a millisecond timestamp, reads a range, and clears a range.
Events state.OrderedList[string]
}

// ProcessElement stores event under the current wall-clock time (Unix
// milliseconds) as its sort key, emits every stored event from the last hour
// as "<value>@<sortKey>", and then clears entries with sort keys in
// [0, one hour ago).
//
// The key parameter is not used by the body. Errors from adding, reading, or
// clearing state are returned to the caller.
func (s *orderedListStateFn) ProcessElement(p state.Provider, key string, event string, emit func(string)) error {
	// Add the event with the current timestamp as the sort key.
	now := time.Now().UnixMilli()
	if err := s.Events.Add(p, now, event); err != nil {
		return err
	}

	// Read a sub-range of events (e.g. the last hour). The upper bound is
	// now+1 so that the entry just added at sort key now is part of the range.
	oneHourAgo := now - time.Hour.Milliseconds()
	entries, ok, err := s.Events.ReadRange(p, oneHourAgo, now+1)
	if err != nil {
		return err
	}
	if ok {
		for _, e := range entries {
			emit(fmt.Sprintf("%s@%d", e.Value, e.SortKey))
		}
	}

	// Clear events older than one hour, surfacing any failure to the runner.
	return s.Events.ClearRange(p, 0, oneHourAgo)
}

// [END ordered_list_state]

// [START event_time_timer]

type eventTimerDoFn struct {
Expand Down
4 changes: 2 additions & 2 deletions sdks/go/pkg/beam/core/graph/fn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1368,10 +1368,10 @@ func validateState(fn *DoFn, numIn mainInputs) error {
"unique per DoFn", k, orig, s)
}
t := s.StateType()
if t != state.TypeValue && t != state.TypeBag && t != state.TypeCombining && t != state.TypeSet && t != state.TypeMap {
if t != state.TypeValue && t != state.TypeBag && t != state.TypeCombining && t != state.TypeSet && t != state.TypeMap && t != state.TypeOrderedList {
err := errors.Errorf("Unrecognized state type %v for state %v", t, s)
return errors.SetTopLevelMsgf(err, "Unrecognized state type %v for state %v. Currently the only supported state"+
"types are state.Value, state.Combining, state.Bag, state.Set, and state.Map", t, s)
"types are state.Value, state.Combining, state.Bag, state.Set, state.Map, and state.OrderedList", t, s)
}
stateKeys[k] = s
}
Expand Down
6 changes: 6 additions & 0 deletions sdks/go/pkg/beam/core/runtime/exec/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ type StateReader interface {
OpenMultimapKeysUserStateReader(ctx context.Context, id StreamID, userStateID string, key []byte, w []byte) (io.ReadCloser, error)
// OpenMultimapKeysUserStateClearer opens a byte stream for clearing all keys of user multimap state.
OpenMultimapKeysUserStateClearer(ctx context.Context, id StreamID, userStateID string, key []byte, w []byte) (io.Writer, error)
// OpenOrderedListUserStateReader opens a byte stream for reading user ordered list state in the range [start, end).
OpenOrderedListUserStateReader(ctx context.Context, id StreamID, userStateID string, key []byte, w []byte, start, end int64) (io.ReadCloser, error)
// OpenOrderedListUserStateAppender opens a byte stream for appending user ordered list state.
OpenOrderedListUserStateAppender(ctx context.Context, id StreamID, userStateID string, key []byte, w []byte) (io.Writer, error)
// OpenOrderedListUserStateClearer opens a byte stream for clearing user ordered list state in the range [start, end).
OpenOrderedListUserStateClearer(ctx context.Context, id StreamID, userStateID string, key []byte, w []byte, start, end int64) (io.Writer, error)
// GetSideInputCache returns the SideInputCache being used at the harness level.
GetSideInputCache() SideCache
}
Expand Down
15 changes: 15 additions & 0 deletions sdks/go/pkg/beam/core/runtime/exec/sideinput_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,21 @@ func (t *testStateReader) OpenMultimapKeysUserStateClearer(ctx context.Context,
return nil, nil
}

// OpenOrderedListUserStateReader is a stub on testStateReader: it ignores all
// of its arguments and hands back a nil reader together with a nil error.
func (t *testStateReader) OpenOrderedListUserStateReader(ctx context.Context, id StreamID, userStateID string, key []byte, w []byte, start, end int64) (io.ReadCloser, error) {
	var noReader io.ReadCloser
	return noReader, nil
}

// OpenOrderedListUserStateAppender is a stub on testStateReader: it ignores
// all of its arguments and hands back a nil writer together with a nil error.
func (t *testStateReader) OpenOrderedListUserStateAppender(ctx context.Context, id StreamID, userStateID string, key []byte, w []byte) (io.Writer, error) {
	var noWriter io.Writer
	return noWriter, nil
}

// OpenOrderedListUserStateClearer is a stub on testStateReader: it ignores all
// of its arguments and hands back a nil writer together with a nil error.
func (t *testStateReader) OpenOrderedListUserStateClearer(ctx context.Context, id StreamID, userStateID string, key []byte, w []byte, start, end int64) (io.Writer, error) {
	var noWriter io.Writer
	return noWriter, nil
}

// GetSideInputCache returns a newly created testSideCache on every call.
func (t *testStateReader) GetSideInputCache() SideCache {
	cache := &testSideCache{}
	return cache
}
Expand Down
2 changes: 2 additions & 0 deletions sdks/go/pkg/beam/core/runtime/exec/translate.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,8 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) {
kcID = ms.KeyCoderId
} else if ss := spec.GetSetSpec(); ss != nil {
kcID = ss.ElementCoderId
} else if ols := spec.GetOrderedListSpec(); ols != nil {
cID = ols.ElementCoderId
} else {
return nil, errors.Errorf("Unrecognized state type %v", spec)
}
Expand Down
150 changes: 150 additions & 0 deletions sdks/go/pkg/beam/core/runtime/exec/userstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ import (
"context"
"fmt"
"io"
"math"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
"google.golang.org/protobuf/encoding/protowire"
)

type stateProvider struct {
Expand All @@ -41,6 +43,7 @@ type stateProvider struct {
blindBagWriteCountsByKey map[string]int // Tracks blind writes to bags before a read.
initialMapValuesByKey map[string]map[string]any
initialMapKeysByKey map[string][]any
initialOrderedListByKey map[string][]any
readersByKey map[string]io.ReadCloser
appendersByKey map[string]io.Writer
clearersByKey map[string]io.Writer
Expand Down Expand Up @@ -466,6 +469,152 @@ func (s *stateProvider) getMultiMapKeyReader(userStateID string) (io.ReadCloser,
return s.readersByKey[userStateID], nil
}

// ReadOrderedListState reads an ordered list state from the State API.
//
// On the first call for userStateID it fetches every entry in the sort-key
// range [math.MinInt64, math.MaxInt64) and caches the decoded entries in
// initialOrderedListByKey; later calls are served from that cache. It returns
// the cached entries (each element is a state.OrderedListEntry) together with
// the transactions recorded so far for userStateID, or an error if opening
// the reader or decoding an entry fails.
func (s *stateProvider) ReadOrderedListState(userStateID string) ([]any, []state.Transaction, error) {
	initialValue, ok := s.initialOrderedListByKey[userStateID]
	if !ok {
		var err error
		if initialValue, err = s.readAllOrderedListEntries(userStateID); err != nil {
			return nil, nil, err
		}
		s.initialOrderedListByKey[userStateID] = initialValue
	}

	transactions, ok := s.transactionsByKey[userStateID]
	if !ok {
		transactions = []state.Transaction{}
	}

	return initialValue, transactions, nil
}

// readAllOrderedListEntries opens a reader over the full sort-key range for
// userStateID, decodes entries until io.EOF, and closes the reader before
// returning. The result is a non-nil (possibly empty) slice on success.
func (s *stateProvider) readAllOrderedListEntries(userStateID string) ([]any, error) {
	rw, err := s.getOrderedListReader(userStateID, math.MinInt64, math.MaxInt64)
	if err != nil {
		return nil, err
	}
	// The reader is opened per call and not stored anywhere else, so it must
	// be released here. A close failure on a read-only stream cannot
	// invalidate entries that were already decoded, so it is not propagated.
	defer rw.Close()

	entries := []any{}
	for {
		entry, err := decodeOrderedListEntry(rw, s.codersByKey[userStateID])
		if err == io.EOF {
			// Clean end of stream: no further entries.
			return entries, nil
		}
		if err != nil {
			return nil, err
		}
		entries = append(entries, entry)
	}
}

// WriteOrderedListState writes a single entry to the ordered list state.
// The wire format is: varint(sortKey) || coder_encoded(value).
//
// val.Key names the user state, val.MapKey must hold the int64 sort key, and
// val.Val is the value to encode. On success the transaction is appended to
// transactionsByKey for val.Key. An error is returned if the sort key has the
// wrong type, the appender cannot be opened, or encoding/writing fails.
func (s *stateProvider) WriteOrderedListState(val state.Transaction) error {
	// Validate the sort key before opening a stream so a malformed
	// transaction yields an error instead of a panic.
	sortKey, ok := val.MapKey.(int64)
	if !ok {
		return fmt.Errorf("ordered list state %q: sort key has type %T, want int64", val.Key, val.MapKey)
	}

	ap, err := s.getOrderedListAppender(val.Key)
	if err != nil {
		return err
	}
	if err := encodeOrderedListEntry(sortKey, val.Val, ap, s.codersByKey[val.Key]); err != nil {
		return err
	}

	// append works on a missing (nil) slice, so no existence check is needed.
	s.transactionsByKey[val.Key] = append(s.transactionsByKey[val.Key], val)
	return nil
}

// ClearOrderedListState clears entries in a range from the ordered list state.
//
// val.Key names the user state and val.MapKey must hold a [2]int64 of
// {start, end} sort keys, which are passed to the clearer as the range to
// remove. On success the transaction is appended to transactionsByKey for
// val.Key. An error is returned if the range has the wrong type, the clearer
// cannot be opened, or the write fails.
func (s *stateProvider) ClearOrderedListState(val state.Transaction) error {
	// Validate the range before opening a stream so a malformed transaction
	// yields an error instead of a panic.
	r, ok := val.MapKey.([2]int64)
	if !ok {
		return fmt.Errorf("ordered list state %q: clear range has type %T, want [2]int64", val.Key, val.MapKey)
	}

	cl, err := s.getOrderedListClearer(val.Key, r[0], r[1])
	if err != nil {
		return err
	}
	// An empty payload is written; presumably the clearer issues the clear
	// request on Write — confirm against the StateReader implementation.
	if _, err := cl.Write([]byte{}); err != nil {
		return err
	}

	// append works on a missing (nil) slice, so no existence check is needed.
	s.transactionsByKey[val.Key] = append(s.transactionsByKey[val.Key], val)
	return nil
}

// getOrderedListReader opens a reader over the ordered list user state
// userStateID for the provider's current element key and window, limited to
// sort keys in [start, end). A nil reader is returned whenever opening fails.
func (s *stateProvider) getOrderedListReader(userStateID string, start, end int64) (io.ReadCloser, error) {
	reader, openErr := s.sr.OpenOrderedListUserStateReader(s.ctx, s.SID, userStateID, s.elementKey, s.window, start, end)
	if openErr == nil {
		return reader, nil
	}
	return nil, openErr
}

// getOrderedListAppender opens a writer that appends to the ordered list user
// state userStateID for the provider's current element key and window. A nil
// writer is returned whenever opening fails.
func (s *stateProvider) getOrderedListAppender(userStateID string) (io.Writer, error) {
	appender, openErr := s.sr.OpenOrderedListUserStateAppender(s.ctx, s.SID, userStateID, s.elementKey, s.window)
	if openErr == nil {
		return appender, nil
	}
	return nil, openErr
}

// getOrderedListClearer opens a writer that clears the ordered list user state
// userStateID for the provider's current element key and window, limited to
// sort keys in [start, end). A nil writer is returned whenever opening fails.
func (s *stateProvider) getOrderedListClearer(userStateID string, start, end int64) (io.Writer, error) {
	clearer, openErr := s.sr.OpenOrderedListUserStateClearer(s.ctx, s.SID, userStateID, s.elementKey, s.window, start, end)
	if openErr == nil {
		return clearer, nil
	}
	return nil, openErr
}

// encodeOrderedListEntry serializes one entry as
// varint(uint64(sortKey)) || coder_encoded(value) and hands it to w.
//
// The entry is assembled in memory first and then passed to w in a single
// Write call, so w always receives a complete entry (this matters when w is a
// stateKeyWriter that sends each Write as a separate gRPC Append request).
func encodeOrderedListEntry(sortKey int64, val any, w io.Writer, c *coder.Coder) error {
	var entry bytes.Buffer
	entry.Write(protowire.AppendVarint(nil, uint64(sortKey)))

	valueEnc := MakeElementEncoder(coder.SkipW(c))
	if err := valueEnc.Encode(&FullValue{Elm: val}, &entry); err != nil {
		return err
	}

	_, writeErr := w.Write(entry.Bytes())
	return writeErr
}

// decodeOrderedListEntry reads varint(sortKey) || coder_encoded(value) from r.
//
// It returns the error from the very first byte read unchanged, so a clean end
// of stream is reported as a bare io.EOF that callers can use to stop
// iterating. Running out of input anywhere after that first byte (inside the
// varint or inside the value) is reported as an error wrapping
// io.ErrUnexpectedEOF, so a truncated entry is never mistaken for the end of
// the list.
func decodeOrderedListEntry(r io.Reader, c *coder.Coder) (state.OrderedListEntry, error) {
	// Read the varint one byte at a time so nothing beyond it is consumed.
	var buf [10]byte // a 64-bit varint occupies at most 10 bytes
	n := 0
	for n < len(buf) {
		// io.ReadFull retries zero-length reads and keeps a byte that arrives
		// together with io.EOF, both of which a bare Read call would mishandle.
		if _, err := io.ReadFull(r, buf[n:n+1]); err != nil {
			if n == 0 {
				return state.OrderedListEntry{}, err
			}
			if err == io.EOF {
				err = io.ErrUnexpectedEOF
			}
			return state.OrderedListEntry{}, fmt.Errorf("unexpected error reading varint: %w", err)
		}
		n++
		if buf[n-1]&0x80 == 0 {
			// Continuation bit clear: this was the varint's final byte.
			break
		}
	}
	sortKey, consumed := protowire.ConsumeVarint(buf[:n])
	if consumed < 0 {
		return state.OrderedListEntry{}, fmt.Errorf("invalid varint in ordered list entry")
	}

	dec := MakeElementDecoder(coder.SkipW(c))
	fv, err := dec.Decode(r)
	if err != nil {
		// A sort key was already consumed, so end of input here means the
		// entry is truncated rather than the stream being finished.
		if err == io.EOF {
			err = io.ErrUnexpectedEOF
		}
		return state.OrderedListEntry{}, fmt.Errorf("decoding ordered list entry value: %w", err)
	}
	return state.OrderedListEntry{SortKey: int64(sortKey), Value: fv.Elm}, nil
}

func (s *stateProvider) encodeKey(userStateID string, key any) ([]byte, error) {
fv := FullValue{Elm: key}
enc := MakeElementEncoder(coder.SkipW(s.keyCodersByID[userStateID]))
Expand Down Expand Up @@ -533,6 +682,7 @@ func (s *userStateAdapter) NewStateProvider(ctx context.Context, reader StateRea
blindBagWriteCountsByKey: make(map[string]int),
initialMapValuesByKey: make(map[string]map[string]any),
initialMapKeysByKey: make(map[string][]any),
initialOrderedListByKey: make(map[string][]any),
readersByKey: make(map[string]io.ReadCloser),
appendersByKey: make(map[string]io.Writer),
clearersByKey: make(map[string]io.Writer),
Expand Down
Loading
Loading