From d84313231bd8698c2b47bd3f59db55a994b1c2b0 Mon Sep 17 00:00:00 2001 From: Timur Tuktamyshev Date: Wed, 8 Oct 2025 15:27:11 +0300 Subject: [PATCH 1/7] feat: refactor filter Signed-off-by: Timur Tuktamyshev --- cmd/shell-operator/main.go | 5 +- pkg/filter/filter.go | 147 ++++- pkg/filter/filter_test.go | 614 ++++++++++++++++++ pkg/filter/jq/apply.go | 81 --- pkg/filter/jq/apply_test.go | 202 ------ pkg/hook/config/config_v0.go | 8 +- pkg/hook/config/config_v1.go | 9 +- .../kubernetes_bindings_controller.go | 4 +- pkg/kube/object_patch/helpers.go | 6 +- pkg/kube/object_patch/operation.go | 11 +- pkg/kube/object_patch/patch_collector.go | 2 +- pkg/kube_events_manager/filter.go | 68 -- pkg/kube_events_manager/filter_test.go | 27 - pkg/kube_events_manager/monitor_config.go | 3 +- pkg/kube_events_manager/resource_informer.go | 8 +- pkg/shell-operator/bootstrap.go | 5 +- 16 files changed, 794 insertions(+), 406 deletions(-) create mode 100644 pkg/filter/filter_test.go delete mode 100644 pkg/filter/jq/apply.go delete mode 100644 pkg/filter/jq/apply_test.go delete mode 100644 pkg/kube_events_manager/filter.go delete mode 100644 pkg/kube_events_manager/filter_test.go diff --git a/cmd/shell-operator/main.go b/cmd/shell-operator/main.go index 7ee76102..8455177b 100644 --- a/cmd/shell-operator/main.go +++ b/cmd/shell-operator/main.go @@ -10,7 +10,7 @@ import ( "github.com/flant/kube-client/klogtolog" "github.com/flant/shell-operator/pkg/app" "github.com/flant/shell-operator/pkg/debug" - "github.com/flant/shell-operator/pkg/filter/jq" + "github.com/flant/shell-operator/pkg/filter" ) func main() { @@ -31,8 +31,7 @@ func main() { // print version kpApp.Command("version", "Show version.").Action(func(_ *kingpin.ParseContext) error { fmt.Printf("%s %s\n", app.AppName, app.Version) - fl := jq.NewFilter() - fmt.Println(fl.FilterInfo()) + fmt.Println(filter.Info()) return nil }) diff --git a/pkg/filter/filter.go b/pkg/filter/filter.go index 7551a49c..280b5f05 100644 --- a/pkg/filter/filter.go +++ b/pkg/filter/filter.go @@ -1,6 +1,147 @@ package filter -type Filter interface { - ApplyFilter(filterStr string, data map[string]any) ([]byte, error) - FilterInfo() string +import ( + "context" + "encoding/json" + "errors" + "fmt" + "reflect" + "runtime" + "runtime/trace" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + + kemtypes "github.com/flant/shell-operator/pkg/kube_events_manager/types" + utils_checksum "github.com/flant/shell-operator/pkg/utils/checksum" + "github.com/itchyny/gojq" +) + +type FilterFn func(obj *unstructured.Unstructured) (result interface{}, err error) + +type Expression struct { + *gojq.Code + Query string +} + +// applyFilter filters object with custom function or jq expression, calculates checksum +// over the result and returns ObjectAndFilterResult. If no filter is provided, +// checksum is calculated over the full JSON representation of the object. +func Run(expression *Expression, filterFn FilterFn, obj *unstructured.Unstructured) (*kemtypes.ObjectAndFilterResult, error) { + defer trace.StartRegion(context.Background(), "ApplyJqFilter").End() + + result := &kemtypes.ObjectAndFilterResult{ + Object: obj, + } + result.Metadata.ResourceId = fmt.Sprintf("%s/%s/%s", obj.GetNamespace(), obj.GetKind(), obj.GetName()) + + if expression != nil { + result.Metadata.JqFilter = expression.Query + } + + // Apply custom filter function + if filterFn != nil { + return applyCustomFilter(filterFn, result, obj) + } + + // No filter - use raw object JSON + if expression == nil { + return applyNoFilter(result, obj) + } + + // Apply jq expression filter + return applyJQFilter(expression, result, obj) +} + +func applyCustomFilter(filterFn FilterFn, result *kemtypes.ObjectAndFilterResult, obj *unstructured.Unstructured) (*kemtypes.ObjectAndFilterResult, error) { + filteredObj, err := filterFn(obj) + if err != nil { + funcName := runtime.FuncForPC(reflect.ValueOf(filterFn).Pointer()).Name() + return nil, fmt.Errorf("filterFn (%s) contains an error: %v", funcName, err) + } + + filteredBytes, err := json.Marshal(filteredObj) + if err != nil { + return nil, err + } + + result.FilterResult = filteredObj + result.Metadata.Checksum = utils_checksum.CalculateChecksum(string(filteredBytes)) + return result, nil +} + +func applyNoFilter(result *kemtypes.ObjectAndFilterResult, obj *unstructured.Unstructured) (*kemtypes.ObjectAndFilterResult, error) { + data, err := json.Marshal(obj) + if err != nil { + return nil, err + } + + result.Metadata.Checksum = utils_checksum.CalculateChecksum(string(data)) + return result, nil +} + +func applyJQFilter(expression *Expression, result *kemtypes.ObjectAndFilterResult, obj *unstructured.Unstructured) (*kemtypes.ObjectAndFilterResult, error) { + filtered, err := RunJQ(expression, obj.DeepCopy()) + if err != nil { + return nil, fmt.Errorf("jqFilter: %v", err) + } + + filteredStr := string(filtered) + result.FilterResult = filteredStr + result.Metadata.Checksum = utils_checksum.CalculateChecksum(filteredStr) + return result, nil +} + +func RunJQ(expression *Expression, obj *unstructured.Unstructured) ([]byte, error) { + // Execute jq expression and collect results + iter := expression.Run(obj.UnstructuredContent()) + var results []any + + for { + v, ok := iter.Next() + if !ok { + break + } + + // Handle errors from jq execution + if err, ok := v.(error); ok { + // HaltError with nil value means graceful termination + var haltErr *gojq.HaltError + if errors.As(err, &haltErr) && haltErr.Value() == nil { + break + } + return nil, err + } + + results = append(results, v) + } + + // Marshal results based on count + switch len(results) { + case 0: + return []byte("null"), nil + case 1: + return json.Marshal(results[0]) + default: + return json.Marshal(results) + } +} + +func CompileExpression(expression string) (*Expression, error) { + parsedQuery, err := gojq.Parse(expression) + if err != nil { + return nil, err + } + compiledQuery, err := gojq.Compile(parsedQuery) + if err != nil { + return nil, err + } + + return &Expression{ + Code: compiledQuery, + Query: expression, + }, nil +} + +func Info() string { + return "Filter implementation: using itchyny/gojq" } diff --git a/pkg/filter/filter_test.go b/pkg/filter/filter_test.go new file mode 100644 index 00000000..564e4687 --- /dev/null +++ b/pkg/filter/filter_test.go @@ -0,0 +1,614 @@ +package filter + +import ( + "encoding/json" + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" +) + +// Helper function to create test objects +func newTestObject(data map[string]interface{}) *unstructured.Unstructured { + return &unstructured.Unstructured{Object: data} +} + +// Test Filter function - main entry point +func TestRun(t *testing.T) { + t.Run("with custom filter function - success", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "test-pod", + "namespace": "default", + }, + "spec": map[string]interface{}{ + "containers": []interface{}{ + map[string]interface{}{"name": "nginx"}, + }, + }, + }) + + filterFn := func(o *unstructured.Unstructured) (interface{}, error) { + return map[string]interface{}{"name": o.GetName()}, nil + } + + result, err := Run(nil, filterFn, obj) + require.NoError(t, err) + assert.NotNil(t, result) + assert.Equal(t, obj, result.Object) + assert.NotEmpty(t, result.Metadata.Checksum) + assert.Equal(t, map[string]interface{}{"name": "test-pod"}, result.FilterResult) + }) + + t.Run("with custom filter function - error", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{"foo": "bar"}) + expectedErr := errors.New("filter failed") + + filterFn := func(o *unstructured.Unstructured) (interface{}, error) { + return nil, expectedErr + } + + result, err := Run(nil, filterFn, obj) + assert.Error(t, err) + assert.Nil(t, result) + assert.Contains(t, err.Error(), "filterFn") + assert.Contains(t, err.Error(), "filter failed") + }) + + t.Run("with custom filter function - invalid JSON result", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{"foo": "bar"}) + + // Return something that can't be marshaled to JSON + filterFn := func(o *unstructured.Unstructured) (interface{}, error) { + return make(chan int), nil // channels can't be marshaled + } + + result, err := Run(nil, filterFn, obj) + assert.Error(t, err) + assert.Nil(t, result) + }) + + t.Run("no filter - full object checksum", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "apiVersion": "v1", + "kind": "ConfigMap", + "metadata": map[string]interface{}{ + "name": "test-cm", + "namespace": "kube-system", + }, + "data": map[string]interface{}{ + "key": "value", + }, + }) + + result, err := Run(nil, nil, obj) + require.NoError(t, err) + assert.NotNil(t, result) + assert.Equal(t, obj, result.Object) + assert.NotEmpty(t, result.Metadata.Checksum) + assert.Nil(t, result.FilterResult) + assert.Equal(t, "kube-system/ConfigMap/test-cm", result.Metadata.ResourceId) + }) + + t.Run("no filter - empty object", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{}) + + result, err := Run(nil, nil, obj) + require.NoError(t, err) + assert.NotNil(t, result) + assert.NotEmpty(t, result.Metadata.Checksum) + }) + + t.Run("with jq expression - simple filter", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "test-pod", + "namespace": "default", + }, + "spec": map[string]interface{}{ + "nodeName": "node-1", + }, + }) + + expr, err := CompileExpression(".spec.nodeName") + require.NoError(t, err) + + result, err := Run(expr, nil, obj) + require.NoError(t, err) + assert.NotNil(t, result) + assert.Equal(t, `"node-1"`, result.FilterResult) + assert.NotEmpty(t, result.Metadata.Checksum) + assert.Equal(t, ".spec.nodeName", result.Metadata.JqFilter) + }) + + t.Run("with jq expression - complex filter", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "spec": map[string]interface{}{ + "containers": []interface{}{ + map[string]interface{}{"name": "nginx", "image": "nginx:1.19"}, + map[string]interface{}{"name": "sidecar", "image": "sidecar:latest"}, + }, + }, + }) + + expr, err := CompileExpression(".spec.containers[].name") + require.NoError(t, err) + + result, err := Run(expr, nil, obj) + require.NoError(t, err) + assert.NotNil(t, result) + assert.Equal(t, `["nginx","sidecar"]`, result.FilterResult) + }) + + t.Run("with jq expression - filter returns null", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "spec": map[string]interface{}{}, + }) + + expr, err := CompileExpression(".spec.nonexistent") + require.NoError(t, err) + + result, err := Run(expr, nil, obj) + require.NoError(t, err) + assert.NotNil(t, result) + assert.Equal(t, "null", result.FilterResult) + }) + + t.Run("with jq expression - error", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "spec": "not-an-object", + }) + + expr, err := CompileExpression(".spec.field") + require.NoError(t, err) + + result, err := Run(expr, nil, obj) + assert.Error(t, err) + assert.Nil(t, result) + }) +} + +// Test RunJQ function +func TestRunJQ(t *testing.T) { + t.Run("simple selection", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "name": "test", + "age": 42, + }) + + expr, err := CompileExpression(".name") + require.NoError(t, err) + + result, err := RunJQ(expr, obj) + require.NoError(t, err) + assert.Equal(t, `"test"`, string(result)) + }) + + t.Run("multiple results", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "items": []interface{}{1, 2, 3}, + }) + + expr, err := CompileExpression(".items[]") + require.NoError(t, err) + + result, err := RunJQ(expr, obj) + require.NoError(t, err) + assert.Equal(t, "[1,2,3]", string(result)) + }) + + t.Run("no results - returns null", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{}) + + expr, err := CompileExpression("empty") + require.NoError(t, err) + + result, err := RunJQ(expr, obj) + require.NoError(t, err) + assert.Equal(t, "null", string(result)) + }) + + t.Run("single null result", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "value": nil, + }) + + expr, err := CompileExpression(".value") + require.NoError(t, err) + + result, err := RunJQ(expr, obj) + require.NoError(t, err) + assert.Equal(t, "null", string(result)) + }) + + t.Run("complex object transformation", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "users": []interface{}{ + map[string]interface{}{"name": "Alice", "age": 30}, + map[string]interface{}{"name": "Bob", "age": 25}, + }, + }) + + expr, err := CompileExpression(".users | map({name, older: (.age > 26)})") + require.NoError(t, err) + + result, err := RunJQ(expr, obj) + require.NoError(t, err) + + var parsed []map[string]interface{} + err = json.Unmarshal(result, &parsed) + require.NoError(t, err) + assert.Len(t, parsed, 2) + assert.Equal(t, "Alice", parsed[0]["name"]) + assert.Equal(t, true, parsed[0]["older"]) + assert.Equal(t, "Bob", parsed[1]["name"]) + assert.Equal(t, false, parsed[1]["older"]) + }) + + t.Run("array construction", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "a": 1, + "b": 2, + }) + + expr, err := CompileExpression("[.a, .b, (.a + .b)]") + require.NoError(t, err) + + result, err := RunJQ(expr, obj) + require.NoError(t, err) + assert.Equal(t, "[1,2,3]", string(result)) + }) + + t.Run("object construction", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "firstName": "John", + "lastName": "Doe", + }) + + expr, err := CompileExpression("{fullName: (.firstName + \" \" + .lastName)}") + require.NoError(t, err) + + result, err := RunJQ(expr, obj) + require.NoError(t, err) + assert.JSONEq(t, `{"fullName":"John Doe"}`, string(result)) + }) + + t.Run("type error", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "value": "string", + }) + + expr, err := CompileExpression(".value.field") + require.NoError(t, err) + + result, err := RunJQ(expr, obj) + assert.Error(t, err) + assert.Nil(t, result) + }) + + t.Run("halt error with nil value - graceful termination", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "items": []interface{}{1, 2, 3}, + }) + + // limit(0) produces halt error with nil value + expr, err := CompileExpression(".items[] | limit(0; .)") + require.NoError(t, err) + + result, err := RunJQ(expr, obj) + require.NoError(t, err) + assert.Equal(t, "null", string(result)) + }) + + t.Run("division by zero", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "value": 10, + }) + + expr, err := CompileExpression(".value / 0") + require.NoError(t, err) + + result, err := RunJQ(expr, obj) + assert.Error(t, err) + assert.Nil(t, result) + }) + + t.Run("nested array and object", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "data": map[string]interface{}{ + "items": []interface{}{ + map[string]interface{}{ + "tags": []interface{}{"a", "b", "c"}, + }, + map[string]interface{}{ + "tags": []interface{}{"x", "y"}, + }, + }, + }, + }) + + expr, err := CompileExpression(".data.items[].tags[]") + require.NoError(t, err) + + result, err := RunJQ(expr, obj) + require.NoError(t, err) + assert.Equal(t, `["a","b","c","x","y"]`, string(result)) + }) + + t.Run("conditional expression", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "status": "active", + }) + + expr, err := CompileExpression(`if .status == "active" then "running" else "stopped" end`) + require.NoError(t, err) + + result, err := RunJQ(expr, obj) + require.NoError(t, err) + assert.Equal(t, `"running"`, string(result)) + }) + + t.Run("boolean values", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "enabled": true, + "disabled": false, + }) + + expr, err := CompileExpression("[.enabled, .disabled]") + require.NoError(t, err) + + result, err := RunJQ(expr, obj) + require.NoError(t, err) + assert.Equal(t, "[true,false]", string(result)) + }) + + t.Run("numeric operations", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "a": 10, + "b": 3, + }) + + expr, err := CompileExpression("{sum: (.a + .b), diff: (.a - .b), prod: (.a * .b)}") + require.NoError(t, err) + + result, err := RunJQ(expr, obj) + require.NoError(t, err) + assert.JSONEq(t, `{"sum":13,"diff":7,"prod":30}`, string(result)) + }) + + t.Run("string operations", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "text": "Hello, World!", + }) + + expr, err := CompileExpression(".text | ascii_upcase") + require.NoError(t, err) + + result, err := RunJQ(expr, obj) + require.NoError(t, err) + assert.Equal(t, `"HELLO, WORLD!"`, string(result)) + }) + + t.Run("empty object", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{}) + + expr, err := CompileExpression(".") + require.NoError(t, err) + + result, err := RunJQ(expr, obj) + require.NoError(t, err) + assert.Equal(t, "{}", string(result)) + }) + + t.Run("special characters in strings", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "text": "Line 1\nLine 2\tTabbed", + }) + + expr, err := CompileExpression(".text") + require.NoError(t, err) + + result, err := RunJQ(expr, obj) + require.NoError(t, err) + + var text string + err = json.Unmarshal(result, &text) + require.NoError(t, err) + assert.Equal(t, "Line 1\nLine 2\tTabbed", text) + }) + + t.Run("unicode characters", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "emoji": "πŸš€βœ¨πŸŽ‰", + "cyrillic": "ΠŸΡ€ΠΈΠ²Π΅Ρ‚", + }) + + expr, err := CompileExpression("{emoji, cyrillic}") + require.NoError(t, err) + + result, err := RunJQ(expr, obj) + require.NoError(t, err) + assert.JSONEq(t, `{"emoji":"πŸš€βœ¨πŸŽ‰","cyrillic":"ΠŸΡ€ΠΈΠ²Π΅Ρ‚"}`, string(result)) + }) +} + +// Test CompileExpression function +func TestCompileExpression(t *testing.T) { + t.Run("valid simple expression", func(t *testing.T) { + expr, err := CompileExpression(".field") + require.NoError(t, err) + assert.NotNil(t, expr) + assert.NotNil(t, expr.Code) + assert.Equal(t, ".field", expr.Query) + }) + + t.Run("valid complex expression", func(t *testing.T) { + expr, err := CompileExpression(`.items[] | select(.type == "pod") | .metadata.name`) + require.NoError(t, err) + assert.NotNil(t, expr) + assert.NotNil(t, expr.Code) + }) + + t.Run("invalid expression - syntax error", func(t *testing.T) { + expr, err := CompileExpression(".field[") + assert.Error(t, err) + assert.Nil(t, expr) + }) + + t.Run("invalid expression - incomplete", func(t *testing.T) { + expr, err := CompileExpression(".") + require.NoError(t, err) + assert.NotNil(t, expr) + }) + + t.Run("empty expression", func(t *testing.T) { + expr, err := CompileExpression("") + assert.Error(t, err) + assert.Nil(t, expr) + }) + + t.Run("expression with function", func(t *testing.T) { + expr, err := CompileExpression(".items | length") + require.NoError(t, err) + assert.NotNil(t, expr) + }) + + t.Run("expression with pipe", func(t *testing.T) { + expr, err := CompileExpression(".data | keys | sort") + require.NoError(t, err) + assert.NotNil(t, expr) + }) + + t.Run("expression with map", func(t *testing.T) { + expr, err := CompileExpression(".items | map(.name)") + require.NoError(t, err) + assert.NotNil(t, expr) + }) + + t.Run("expression with select", func(t *testing.T) { + expr, err := CompileExpression(`.items[] | select(.status == "active")`) + require.NoError(t, err) + assert.NotNil(t, expr) + }) +} + +// Test edge cases and resource ID generation +func TestResourceIdGeneration(t *testing.T) { + t.Run("standard kubernetes object", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "my-pod", + "namespace": "default", + }, + }) + + result, err := Run(nil, nil, obj) + require.NoError(t, err) + assert.Equal(t, "default/Pod/my-pod", result.Metadata.ResourceId) + }) + + t.Run("cluster-scoped resource", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "apiVersion": "v1", + "kind": "Node", + "metadata": map[string]interface{}{ + "name": "node-1", + }, + }) + + result, err := Run(nil, nil, obj) + require.NoError(t, err) + assert.Equal(t, "/Node/node-1", result.Metadata.ResourceId) + }) + + t.Run("missing metadata", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "apiVersion": "v1", + "kind": "Unknown", + }) + + result, err := Run(nil, nil, obj) + require.NoError(t, err) + assert.Equal(t, "/Unknown/", result.Metadata.ResourceId) + }) +} + +// Benchmark tests +func BenchmarkRun(b *testing.B) { + obj := newTestObject(map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "test-pod", + "namespace": "default", + "labels": map[string]interface{}{ + "app": "nginx", + }, + }, + "spec": map[string]interface{}{ + "containers": []interface{}{ + map[string]interface{}{ + "name": "nginx", + "image": "nginx:latest", + }, + }, + }, + }) + + b.Run("NoFilter", func(b *testing.B) { + for i := 0; i < b.N; i++ { + _, err := Run(nil, nil, obj) + if err != nil { + b.Fatal(err) + } + } + }) + + b.Run("CustomFilter", func(b *testing.B) { + filterFn := func(o *unstructured.Unstructured) (interface{}, error) { + return map[string]interface{}{"name": o.GetName()}, nil + } + for i := 0; i < b.N; i++ { + _, err := Run(nil, filterFn, obj) + if err != nil { + b.Fatal(err) + } + } + }) + + b.Run("JQFilter", func(b *testing.B) { + expr, err := CompileExpression(".metadata.name") + if err != nil { + b.Fatal(err) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := Run(expr, nil, obj) + if err != nil { + b.Fatal(err) + } + } + }) + + b.Run("JQFilterComplex", func(b *testing.B) { + expr, err := CompileExpression(".spec.containers[].name") + if err != nil { + b.Fatal(err) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := Run(expr, nil, obj) + if err != nil { + b.Fatal(err) + } + } + }) +} diff --git a/pkg/filter/jq/apply.go b/pkg/filter/jq/apply.go deleted file mode 100644 index 83905572..00000000 --- a/pkg/filter/jq/apply.go +++ /dev/null @@ -1,81 +0,0 @@ -package jq - -import ( - "encoding/json" - "errors" - - "github.com/itchyny/gojq" - - "github.com/flant/shell-operator/pkg/filter" -) - -var _ filter.Filter = (*Filter)(nil) - -func NewFilter() *Filter { - return &Filter{} -} - -type Filter struct{} - -// ApplyFilter runs jq expression provided in jqFilter with jsonData as input. -func (f *Filter) ApplyFilter(jqFilter string, data map[string]any) ([]byte, error) { - query, err := gojq.Parse(jqFilter) - if err != nil { - return nil, err - } - - var workData any - if data == nil { - workData = nil - } else { - workData, err = deepCopyAny(data) - if err != nil { - return nil, err - } - } - - iter := query.Run(workData) - result := make([]any, 0) - for { - v, ok := iter.Next() - if !ok { - break - } - if err, ok := v.(error); ok { - var errGoJq *gojq.HaltError - if errors.As(err, &errGoJq) && errGoJq.Value() == nil { - break - } - return nil, err - } - result = append(result, v) - } - - switch len(result) { - case 0: - return []byte("null"), nil - case 1: - return json.Marshal(result[0]) - default: - return json.Marshal(result) - } -} - -func (f *Filter) FilterInfo() string { - return "jqFilter implementation: using itchyny/gojq" -} - -func deepCopyAny(input any) (any, error) { - if input == nil { - return nil, nil - } - data, err := json.Marshal(input) - if err != nil { - return nil, err - } - var output any - if err := json.Unmarshal(data, &output); err != nil { - return nil, err - } - return output, nil -} diff --git a/pkg/filter/jq/apply_test.go b/pkg/filter/jq/apply_test.go deleted file mode 100644 index 39b24374..00000000 --- a/pkg/filter/jq/apply_test.go +++ /dev/null @@ -1,202 +0,0 @@ -package jq - -import ( - "encoding/json" - "testing" - - . "github.com/onsi/gomega" -) - -func Test_ApplyFilter_SingleDocumentModification(t *testing.T) { - g := NewWithT(t) - filter := NewFilter() - - jqFilter := `. + {"status": "active"}` - - result, err := filter.ApplyFilter(jqFilter, map[string]any{"name": "John", "age": 30}) - g.Expect(err).Should(BeNil()) - - var resultMap any - err = json.Unmarshal(result, &resultMap) - g.Expect(err).Should(BeNil()) - g.Expect(resultMap).Should(Equal(map[string]any{"name": "John", "age": 30.0, "status": "active"})) -} - -func Test_ApplyFilter_ExtractValuesFromDocument(t *testing.T) { - g := NewWithT(t) - filter := NewFilter() - - jqFilter := `.user.details` - - result, err := filter.ApplyFilter(jqFilter, map[string]any{"user": map[string]any{"name": "John", "details": map[string]any{"location": "New York", "occupation": "Developer"}}}) - g.Expect(err).Should(BeNil()) - - var resultMap any - err = json.Unmarshal(result, &resultMap) - g.Expect(err).Should(BeNil()) - g.Expect(resultMap).Should(Equal(map[string]any{"location": "New York", "occupation": "Developer"})) -} - -func Test_ApplyFilter_MultipleJsonDocumentsInArray(t *testing.T) { - g := NewWithT(t) - filter := NewFilter() - - jqFilter := `.users[] | . + {"status": "active"}` - - result, err := filter.ApplyFilter(jqFilter, map[string]any{"users": []any{map[string]any{"name": "John", "status": "inactive"}, map[string]any{"name": "Jane", "status": "inactive"}}}) - g.Expect(err).Should(BeNil()) - - var resultMap []any - err = json.Unmarshal(result, &resultMap) - g.Expect(err).Should(BeNil()) - g.Expect(resultMap).Should(HaveLen(2)) - - expected := []map[string]any{ - {"name": "John", "status": "active"}, - {"name": "Jane", "status": "active"}, - } - - g.Expect(resultMap).Should(ConsistOf(expected)) -} - -func Test_ApplyFilter_InvalidFilter(t *testing.T) { - g := NewWithT(t) - filter := NewFilter() - - // Test invalid jq syntax - invalidSyntax := `invalid syntax` - result, err := filter.ApplyFilter(invalidSyntax, map[string]any{"name": "John"}) - g.Expect(err).ShouldNot(BeNil()) - g.Expect(result).Should(BeNil()) - - // Test invalid jq function - invalidFunction := `. | invalid_function` - result, err = filter.ApplyFilter(invalidFunction, map[string]any{"name": "John"}) - g.Expect(err).ShouldNot(BeNil()) - g.Expect(result).Should(BeNil()) -} - -func Test_ApplyFilter_InvalidJson(t *testing.T) { - g := NewWithT(t) - filter := NewFilter() - - jqFilter := `.name` - - result, err := filter.ApplyFilter(jqFilter, map[string]any{"name": "John"}) - g.Expect(err).Should(BeNil()) - g.Expect(result).ShouldNot(BeNil()) - - var resultMap any - err = json.Unmarshal(result, &resultMap) - g.Expect(err).Should(BeNil()) - g.Expect(resultMap).ShouldNot(BeNil()) -} - -func Test_ApplyFilter_NilInputData(t *testing.T) { - g := NewWithT(t) - filter := NewFilter() - - result, err := filter.ApplyFilter(`.`, nil) - g.Expect(err).Should(BeNil()) - g.Expect(result).ShouldNot(BeNil()) -} - -func Test_ApplyFilter_EmptyFilter(t *testing.T) { - g := NewWithT(t) - filter := NewFilter() - - result, err := filter.ApplyFilter("", map[string]any{"name": "John"}) - g.Expect(err).ShouldNot(BeNil()) - g.Expect(result).Should(BeNil()) -} - -func Test_ApplyFilter_InvalidJsonInDeepCopy(t *testing.T) { - g := NewWithT(t) - filter := NewFilter() - - // Create invalid JSON data that cannot be marshaled - invalidData := map[string]any{ - "channel": make(chan int), // channel cannot be marshaled to JSON - } - - result, err := filter.ApplyFilter(`.`, invalidData) - g.Expect(err).ShouldNot(BeNil()) // Expect an error due to invalid JSON - g.Expect(result).Should(BeNil()) -} - -func Test_ApplyFilter_EmptyResult(t *testing.T) { - g := NewWithT(t) - filter := NewFilter() - - // Filter that returns no results - result, err := filter.ApplyFilter(`.nonexistent`, map[string]any{"name": "John"}) - g.Expect(err).Should(BeNil()) - g.Expect(result).ShouldNot(BeNil()) - - var resultMap any - err = json.Unmarshal(result, &resultMap) - g.Expect(err).Should(BeNil()) - g.Expect(resultMap).Should(BeNil()) // Expect result to be nil (empty) -} - -func Test_ApplyFilter_InvalidJqSyntax(t *testing.T) { - g := NewWithT(t) - filter := NewFilter() - - result, err := filter.ApplyFilter(`invalid syntax`, map[string]any{"name": "John"}) - g.Expect(err).ShouldNot(BeNil()) - g.Expect(result).Should(BeNil()) -} - -func Test_ApplyFilter_InvalidJqFunction(t *testing.T) { - g := NewWithT(t) - filter := NewFilter() - - result, err := filter.ApplyFilter(`. | invalid_function`, map[string]any{"name": "John"}) - g.Expect(err).ShouldNot(BeNil()) - g.Expect(result).Should(BeNil()) -} - -func Test_ApplyFilter_PanicSafety(t *testing.T) { - g := NewWithT(t) - filter := NewFilter() - - defer func() { - if r := recover(); r != nil { - t.Errorf("ApplyFilter panicked: %v", r) - } - }() - - // Test with data that could potentially cause a panic - _, err := filter.ApplyFilter(`.`, map[string]any{"key": func() {}}) - g.Expect(err).ShouldNot(BeNil()) -} - -func Test_deepCopyAny(t *testing.T) { - g := NewWithT(t) - - // Test copying a map - inputMap := map[string]any{"foo": "bar", "num": 42} - copyMap, err := deepCopyAny(inputMap) - g.Expect(err).Should(BeNil()) - g.Expect(copyMap).Should(Equal(map[string]any{"foo": "bar", "num": float64(42)})) - g.Expect(copyMap).ShouldNot(BeIdenticalTo(inputMap)) - - // Test copying a slice - inputSlice := []any{"a", 1, true} - copySlice, err := deepCopyAny(inputSlice) - g.Expect(err).Should(BeNil()) - g.Expect(copySlice).Should(Equal([]any{"a", float64(1), true})) - g.Expect(copySlice).ShouldNot(BeIdenticalTo(inputSlice)) - - // Test copying nil - copyNil, err := deepCopyAny(nil) - g.Expect(err).Should(BeNil()) - g.Expect(copyNil).Should(BeNil()) - - // Test copying a value that cannot be marshaled to JSON - inputInvalid := map[string]any{"ch": make(chan int)} - copyInvalid, err := deepCopyAny(inputInvalid) - g.Expect(err).ShouldNot(BeNil()) - g.Expect(copyInvalid).Should(BeNil()) -} diff --git a/pkg/hook/config/config_v0.go b/pkg/hook/config/config_v0.go index 4fb3d89b..b9b07742 100644 --- a/pkg/hook/config/config_v0.go +++ b/pkg/hook/config/config_v0.go @@ -6,6 +6,7 @@ import ( "gopkg.in/robfig/cron.v2" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/flant/shell-operator/pkg/filter" htypes "github.com/flant/shell-operator/pkg/hook/types" kubeeventsmanager "github.com/flant/shell-operator/pkg/kube_events_manager" kemtypes "github.com/flant/shell-operator/pkg/kube_events_manager/types" @@ -107,7 +108,12 @@ func (cv0 *HookConfigV0) ConvertAndCheck(c *HookConfig) error { }) } monitor.WithLabelSelector(kubeCfg.Selector) - monitor.JqFilter = kubeCfg.JqFilter + + filter, err := filter.CompileExpression(kubeCfg.JqFilter) + if err != nil { + return fmt.Errorf("invalid jqFilter: %w", err) + } + monitor.JqFilter = filter kubeConfig := htypes.OnKubernetesEventConfig{} kubeConfig.Monitor = monitor diff --git a/pkg/hook/config/config_v1.go b/pkg/hook/config/config_v1.go index 59687fb2..73ad9384 100644 --- a/pkg/hook/config/config_v1.go +++ b/pkg/hook/config/config_v1.go @@ -12,6 +12,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "github.com/flant/shell-operator/pkg/app" + "github.com/flant/shell-operator/pkg/filter" htypes "github.com/flant/shell-operator/pkg/hook/types" kubeeventsmanager "github.com/flant/shell-operator/pkg/kube_events_manager" kemtypes "github.com/flant/shell-operator/pkg/kube_events_manager/types" @@ -130,7 +131,13 @@ func (cv1 *HookConfigV1) ConvertAndCheck(c *HookConfig) error { monitor.WithFieldSelector((*kemtypes.FieldSelector)(kubeCfg.FieldSelector)) monitor.WithNamespaceSelector((*kemtypes.NamespaceSelector)(kubeCfg.Namespace)) monitor.WithLabelSelector(kubeCfg.LabelSelector) - monitor.JqFilter = kubeCfg.JqFilter + + filter, err := filter.CompileExpression(kubeCfg.JqFilter) + if err != nil { + return fmt.Errorf("invalid jqFilter: %w", err) + } + monitor.JqFilter = filter + // executeHookOnEvent is a priority if kubeCfg.ExecuteHookOnEvents != nil { monitor.WithEventTypes(kubeCfg.ExecuteHookOnEvents) diff --git a/pkg/hook/controller/kubernetes_bindings_controller.go b/pkg/hook/controller/kubernetes_bindings_controller.go index 97f500a0..e9ae4e2c 100644 --- a/pkg/hook/controller/kubernetes_bindings_controller.go +++ b/pkg/hook/controller/kubernetes_bindings_controller.go @@ -344,7 +344,7 @@ func ConvertKubeEventToBindingContext(kubeEvent kemtypes.KubeEvent, link *Kubern Type: kubeEvent.Type, Objects: kubeEvent.Objects, } - bc.Metadata.JqFilter = link.BindingConfig.Monitor.JqFilter + bc.Metadata.JqFilter = link.BindingConfig.Monitor.JqFilter.Query bc.Metadata.BindingType = htypes.OnKubernetesEvent bc.Metadata.IncludeSnapshots = link.BindingConfig.IncludeSnapshotsFrom bc.Metadata.Group = link.BindingConfig.Group @@ -359,7 +359,7 @@ func ConvertKubeEventToBindingContext(kubeEvent kemtypes.KubeEvent, link *Kubern WatchEvent: kEvent, Objects: kubeEvent.Objects, } - bc.Metadata.JqFilter = link.BindingConfig.Monitor.JqFilter + bc.Metadata.JqFilter = link.BindingConfig.Monitor.JqFilter.Query bc.Metadata.BindingType = htypes.OnKubernetesEvent bc.Metadata.IncludeSnapshots = link.BindingConfig.IncludeSnapshotsFrom bc.Metadata.Group = link.BindingConfig.Group diff --git a/pkg/kube/object_patch/helpers.go b/pkg/kube/object_patch/helpers.go index 21da061e..442d21a3 100644 --- a/pkg/kube/object_patch/helpers.go +++ b/pkg/kube/object_patch/helpers.go @@ -64,11 +64,11 @@ func unmarshalFromYaml(yamlSpecs []byte) ([]OperationSpec, error) { return specSlice, nil } -func applyJQPatch(jqFilter string, fl filter.Filter, obj *unstructured.Unstructured) (*unstructured.Unstructured, error) { - filterResult, err := fl.ApplyFilter(jqFilter, obj.UnstructuredContent()) +func applyJQPatch(jqFilter *filter.Expression, obj *unstructured.Unstructured) (*unstructured.Unstructured, error) { + filterResult, err := filter.RunJQ(jqFilter, obj) if err != nil { return nil, fmt.Errorf("failed to apply jqFilter:\n%sto Object:\n%s\n"+ - "error: %s", jqFilter, obj, err) + "error: %s", jqFilter.Query, obj, err) } retObj := &unstructured.Unstructured{} diff --git a/pkg/kube/object_patch/operation.go b/pkg/kube/object_patch/operation.go index 6f8ab26c..bd3a0319 100644 --- a/pkg/kube/object_patch/operation.go +++ b/pkg/kube/object_patch/operation.go @@ -6,12 +6,11 @@ import ( "github.com/deckhouse/deckhouse/pkg/log" sdkpkg "github.com/deckhouse/module-sdk/pkg" + "github.com/flant/shell-operator/pkg/filter" "github.com/hashicorp/go-multierror" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/types" - - "github.com/flant/shell-operator/pkg/filter/jq" ) // OperationSpec a JSON and YAML representation of the operation for shell hooks @@ -288,8 +287,12 @@ func newPatchOperation(patchType types.PatchType, patch any, apiVersion, kind, n func NewPatchWithJQOperation(jqQuery string, apiVersion string, kind string, namespace string, name string, opts ...sdkpkg.PatchCollectorOption) sdkpkg.PatchCollectorOperation { return newFilterOperation(func(u *unstructured.Unstructured) (*unstructured.Unstructured, error) { - filter := jq.NewFilter() - return applyJQPatch(jqQuery, filter, u) + expression, err := filter.CompileExpression(jqQuery) + if err != nil { + return nil, err + } + + return applyJQPatch(expression, u) }, apiVersion, kind, namespace, name, opts...) } diff --git a/pkg/kube/object_patch/patch_collector.go b/pkg/kube/object_patch/patch_collector.go index 4b6e27ff..b3024678 100644 --- a/pkg/kube/object_patch/patch_collector.go +++ b/pkg/kube/object_patch/patch_collector.go @@ -126,7 +126,7 @@ func (dop *PatchCollector) PatchWithMutatingFunc( // - IgnoreMissingObject β€” do not return error if the specified object is missing. // - IgnoreHookError β€” allows applying patches for a Status subresource even if the hook fails func (dop *PatchCollector) JQFilter( - jqfilter, apiVersion, kind, namespace, name string, opts ...sdkpkg.PatchCollectorOption, + jqfilter string, apiVersion, kind, namespace, name string, opts ...sdkpkg.PatchCollectorOption, ) { dop.add(NewPatchWithJQOperation(jqfilter, apiVersion, kind, namespace, name, opts...)) } diff --git a/pkg/kube_events_manager/filter.go b/pkg/kube_events_manager/filter.go deleted file mode 100644 index 7b704cd0..00000000 --- a/pkg/kube_events_manager/filter.go +++ /dev/null @@ -1,68 +0,0 @@ -package kubeeventsmanager - -import ( - "context" - "encoding/json" - "fmt" - "reflect" - "runtime" - "runtime/trace" - - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - - "github.com/flant/shell-operator/pkg/filter" - kemtypes "github.com/flant/shell-operator/pkg/kube_events_manager/types" - utils_checksum "github.com/flant/shell-operator/pkg/utils/checksum" -) - -// applyFilter filters object json representation with jq expression, calculate checksum -// over result and return ObjectAndFilterResult. If jqFilter is empty, no filter -// is required and checksum is calculated over full json representation of the object. -func applyFilter(jqFilter string, fl filter.Filter, filterFn func(obj *unstructured.Unstructured) (result interface{}, err error), obj *unstructured.Unstructured) (*kemtypes.ObjectAndFilterResult, error) { - defer trace.StartRegion(context.Background(), "ApplyJqFilter").End() - - res := &kemtypes.ObjectAndFilterResult{ - Object: obj, - } - res.Metadata.JqFilter = jqFilter - res.Metadata.ResourceId = resourceId(obj) - - // If filterFn is passed, run it and return result. - if filterFn != nil { - filteredObj, err := filterFn(obj) - if err != nil { - return nil, fmt.Errorf("filterFn (%s) contains an error: %v", runtime.FuncForPC(reflect.ValueOf(filterFn).Pointer()).Name(), err) - } - - filteredBytes, err := json.Marshal(filteredObj) - if err != nil { - return nil, err - } - - res.FilterResult = filteredObj - res.Metadata.Checksum = utils_checksum.CalculateChecksum(string(filteredBytes)) - - return res, nil - } - - // Render obj to JSON text to apply jq filter. - if jqFilter == "" { - data, err := json.Marshal(obj) - if err != nil { - return nil, err - } - res.Metadata.Checksum = utils_checksum.CalculateChecksum(string(data)) - } else { - var err error - var filtered []byte - filtered, err = fl.ApplyFilter(jqFilter, obj.UnstructuredContent()) - if err != nil { - return nil, fmt.Errorf("jqFilter: %v", err) - } - - res.FilterResult = string(filtered) - res.Metadata.Checksum = utils_checksum.CalculateChecksum(string(filtered)) - } - - return res, nil -} diff --git a/pkg/kube_events_manager/filter_test.go b/pkg/kube_events_manager/filter_test.go deleted file mode 100644 index 5c1961ab..00000000 --- a/pkg/kube_events_manager/filter_test.go +++ /dev/null @@ -1,27 +0,0 @@ -package kubeeventsmanager - -import ( - "encoding/json" - "testing" - - "github.com/stretchr/testify/assert" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - - "github.com/flant/shell-operator/pkg/filter/jq" -) - -func TestApplyFilter(t *testing.T) { - t.Run("filter func with error", func(t *testing.T) { - uns := &unstructured.Unstructured{Object: map[string]interface{}{"foo": "bar"}} - filter := jq.NewFilter() - _, err := applyFilter("", filter, filterFuncWithError, uns) - assert.EqualError(t, err, "filterFn (github.com/flant/shell-operator/pkg/kube_events_manager.filterFuncWithError) contains an error: invalid character 'a' looking for beginning of value") - }) -} - -func filterFuncWithError(_ *unstructured.Unstructured) (interface{}, error) { - var s []string - err := json.Unmarshal([]byte("asdasd"), &s) - - return s, err -} diff --git a/pkg/kube_events_manager/monitor_config.go b/pkg/kube_events_manager/monitor_config.go index 27df1a17..b17f2250 100644 --- a/pkg/kube_events_manager/monitor_config.go +++ b/pkg/kube_events_manager/monitor_config.go @@ -5,6 +5,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "github.com/flant/shell-operator/pkg/filter" kemtypes "github.com/flant/shell-operator/pkg/kube_events_manager/types" ) @@ -24,7 +25,7 @@ type MonitorConfig struct { NamespaceSelector *kemtypes.NamespaceSelector LabelSelector *metav1.LabelSelector FieldSelector *kemtypes.FieldSelector - JqFilter string + JqFilter *filter.Expression Logger *log.Logger Mode kemtypes.KubeEventMode KeepFullObjectsInMemory bool diff --git a/pkg/kube_events_manager/resource_informer.go b/pkg/kube_events_manager/resource_informer.go index ebb8a5f6..973dd177 100644 --- a/pkg/kube_events_manager/resource_informer.go +++ b/pkg/kube_events_manager/resource_informer.go @@ -17,7 +17,7 @@ import ( "k8s.io/client-go/tools/cache" klient "github.com/flant/kube-client/client" - "github.com/flant/shell-operator/pkg/filter/jq" + "github.com/flant/shell-operator/pkg/filter" kemtypes "github.com/flant/shell-operator/pkg/kube_events_manager/types" "github.com/flant/shell-operator/pkg/metrics" "github.com/flant/shell-operator/pkg/utils/measure" @@ -227,8 +227,7 @@ func (ei *resourceInformer) loadExistedObjects() error { defer measure.Duration(func(d time.Duration) { ei.metricStorage.HistogramObserve(metrics.KubeJqFilterDurationSeconds, d.Seconds(), ei.Monitor.Metadata.MetricLabels, nil) })() - filter := jq.NewFilter() - objFilterRes, err = applyFilter(ei.Monitor.JqFilter, filter, ei.Monitor.FilterFunc, &obj) + objFilterRes, err = filter.Run(ei.Monitor.JqFilter, ei.Monitor.FilterFunc, &obj) }() if err != nil { @@ -307,8 +306,7 @@ func (ei *resourceInformer) handleWatchEvent(object interface{}, eventType kemty defer measure.Duration(func(d time.Duration) { ei.metricStorage.HistogramObserve(metrics.KubeJqFilterDurationSeconds, d.Seconds(), ei.Monitor.Metadata.MetricLabels, nil) })() - filter := jq.NewFilter() - objFilterRes, err = applyFilter(ei.Monitor.JqFilter, filter, ei.Monitor.FilterFunc, obj) + objFilterRes, err = filter.Run(ei.Monitor.JqFilter, ei.Monitor.FilterFunc, obj) }() if err != nil { log.Error("handleWatchEvent: applyFilter error", diff --git a/pkg/shell-operator/bootstrap.go b/pkg/shell-operator/bootstrap.go index 10121625..2e956f03 100644 --- a/pkg/shell-operator/bootstrap.go +++ b/pkg/shell-operator/bootstrap.go @@ -9,7 +9,6 @@ import ( "github.com/flant/shell-operator/pkg/app" "github.com/flant/shell-operator/pkg/config" "github.com/flant/shell-operator/pkg/debug" - "github.com/flant/shell-operator/pkg/filter/jq" "github.com/flant/shell-operator/pkg/hook" kubeeventsmanager "github.com/flant/shell-operator/pkg/kube_events_manager" "github.com/flant/shell-operator/pkg/metrics" @@ -27,10 +26,8 @@ func Init(logger *log.Logger) (*ShellOperator, error) { // Init logging subsystem. app.SetupLogging(runtimeConfig, logger) - // Log version and jq filtering implementation. + // Log version logger.Info(app.AppStartMessage) - fl := jq.NewFilter() - logger.Debug(fl.FilterInfo()) hooksDir, err := utils.RequireExistingDirectory(app.HooksDir) if err != nil { From 7aabe0b245b25441c91bcff4d76a6d4f74709199 Mon Sep 17 00:00:00 2001 From: Timur Tuktamyshev Date: Wed, 8 Oct 2025 15:35:42 +0300 Subject: [PATCH 2/7] feat: minor improvements Signed-off-by: Timur Tuktamyshev --- pkg/filter/filter.go | 33 ++++++++++++++++-------- pkg/kube/object_patch/patch_collector.go | 2 +- pkg/shell-operator/bootstrap.go | 4 ++- 3 files changed, 26 insertions(+), 13 deletions(-) diff --git a/pkg/filter/filter.go b/pkg/filter/filter.go index 280b5f05..9ae1cc60 100644 --- a/pkg/filter/filter.go +++ b/pkg/filter/filter.go @@ -23,33 +23,44 @@ type Expression struct { Query string } -// applyFilter filters object with custom function or jq expression, calculates checksum -// over the result and returns ObjectAndFilterResult. If no filter is provided, -// checksum is calculated over the full JSON representation of the object. +// Run filters an object with a custom function or jq expression, calculates checksum +// over the result and returns ObjectAndFilterResult. +// +// Filter precedence (highest to lowest): +// 1. Custom filter function (filterFn) - if provided, takes precedence over jq expression +// 2. JQ expression (expression) - used when filterFn is nil but expression is provided +// 3. No filter - when both filterFn and expression are nil, uses raw object JSON +// +// The function calculates a checksum over the filtered result (or full object if no filter) +// and populates metadata including resource ID and jq filter query (if applicable). func Run(expression *Expression, filterFn FilterFn, obj *unstructured.Unstructured) (*kemtypes.ObjectAndFilterResult, error) { defer trace.StartRegion(context.Background(), "ApplyJqFilter").End() + // Initialize result with object and resource ID result := &kemtypes.ObjectAndFilterResult{ Object: obj, } result.Metadata.ResourceId = fmt.Sprintf("%s/%s/%s", obj.GetNamespace(), obj.GetKind(), obj.GetName()) + // Set JQ filter in metadata if expression is provided (even if custom filter takes precedence) if expression != nil { result.Metadata.JqFilter = expression.Query } - // Apply custom filter function - if filterFn != nil { + // Apply filters based on precedence: custom filter > jq expression > no filter + switch { + case filterFn != nil: + // Custom filter function takes highest precedence return applyCustomFilter(filterFn, result, obj) - } - // No filter - use raw object JSON - if expression == nil { + case expression != nil: + // JQ expression filter when no custom filter is provided + return applyJQFilter(expression, result, obj) + + default: + // No filter - use raw object JSON when both filterFn and expression are nil return applyNoFilter(result, obj) } - - // Apply jq expression filter - return applyJQFilter(expression, result, obj) } func applyCustomFilter(filterFn FilterFn, result *kemtypes.ObjectAndFilterResult, obj *unstructured.Unstructured) (*kemtypes.ObjectAndFilterResult, error) { diff --git a/pkg/kube/object_patch/patch_collector.go b/pkg/kube/object_patch/patch_collector.go index b3024678..4b6e27ff 100644 --- a/pkg/kube/object_patch/patch_collector.go +++ b/pkg/kube/object_patch/patch_collector.go @@ -126,7 +126,7 @@ func (dop *PatchCollector) PatchWithMutatingFunc( // - IgnoreMissingObject β€” do not return error if the specified object is missing. // - IgnoreHookError β€” allows applying patches for a Status subresource even if the hook fails func (dop *PatchCollector) JQFilter( - jqfilter string, apiVersion, kind, namespace, name string, opts ...sdkpkg.PatchCollectorOption, + jqfilter, apiVersion, kind, namespace, name string, opts ...sdkpkg.PatchCollectorOption, ) { dop.add(NewPatchWithJQOperation(jqfilter, apiVersion, kind, namespace, name, opts...)) } diff --git a/pkg/shell-operator/bootstrap.go b/pkg/shell-operator/bootstrap.go index 2e956f03..fdf19e35 100644 --- a/pkg/shell-operator/bootstrap.go +++ b/pkg/shell-operator/bootstrap.go @@ -9,6 +9,7 @@ import ( "github.com/flant/shell-operator/pkg/app" "github.com/flant/shell-operator/pkg/config" "github.com/flant/shell-operator/pkg/debug" + "github.com/flant/shell-operator/pkg/filter" "github.com/flant/shell-operator/pkg/hook" kubeeventsmanager "github.com/flant/shell-operator/pkg/kube_events_manager" "github.com/flant/shell-operator/pkg/metrics" @@ -26,8 +27,9 @@ func Init(logger *log.Logger) (*ShellOperator, error) { // Init logging subsystem. app.SetupLogging(runtimeConfig, logger) - // Log version + // Log version and filter implementation logger.Info(app.AppStartMessage) + logger.Debug(filter.Info()) hooksDir, err := utils.RequireExistingDirectory(app.HooksDir) if err != nil { From dad42a6be721cda511e6c768f4d9e05fee1cf428 Mon Sep 17 00:00:00 2001 From: Timur Tuktamyshev Date: Wed, 8 Oct 2025 15:41:07 +0300 Subject: [PATCH 3/7] chore: fix lint issues Signed-off-by: Timur Tuktamyshev --- pkg/filter/filter.go | 2 +- pkg/filter/filter_test.go | 4 ++-- pkg/kube/object_patch/operation.go | 3 ++- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/pkg/filter/filter.go b/pkg/filter/filter.go index 9ae1cc60..d0926122 100644 --- a/pkg/filter/filter.go +++ b/pkg/filter/filter.go @@ -9,11 +9,11 @@ import ( "runtime" "runtime/trace" + "github.com/itchyny/gojq" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" kemtypes "github.com/flant/shell-operator/pkg/kube_events_manager/types" utils_checksum "github.com/flant/shell-operator/pkg/utils/checksum" - "github.com/itchyny/gojq" ) type FilterFn func(obj *unstructured.Unstructured) (result interface{}, err error) diff --git a/pkg/filter/filter_test.go b/pkg/filter/filter_test.go index 564e4687..73e6a616 100644 --- a/pkg/filter/filter_test.go +++ b/pkg/filter/filter_test.go @@ -48,7 +48,7 @@ func TestRun(t *testing.T) { obj := newTestObject(map[string]interface{}{"foo": "bar"}) expectedErr := errors.New("filter failed") - filterFn := func(o *unstructured.Unstructured) (interface{}, error) { + filterFn := func(_ *unstructured.Unstructured) (interface{}, error) { return nil, expectedErr } @@ -63,7 +63,7 @@ func TestRun(t *testing.T) { obj := newTestObject(map[string]interface{}{"foo": "bar"}) // Return something that can't be marshaled to JSON - filterFn := func(o *unstructured.Unstructured) (interface{}, error) { + filterFn := func(_ *unstructured.Unstructured) (interface{}, error) { return make(chan int), nil // channels can't be marshaled } diff --git a/pkg/kube/object_patch/operation.go b/pkg/kube/object_patch/operation.go index bd3a0319..12530be7 100644 --- a/pkg/kube/object_patch/operation.go +++ b/pkg/kube/object_patch/operation.go @@ -6,11 +6,12 @@ import ( "github.com/deckhouse/deckhouse/pkg/log" sdkpkg "github.com/deckhouse/module-sdk/pkg" - "github.com/flant/shell-operator/pkg/filter" "github.com/hashicorp/go-multierror" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/types" + + "github.com/flant/shell-operator/pkg/filter" ) // OperationSpec a JSON and YAML representation of the operation for shell hooks From abfad15a4a68d28def253c1b9f36e0a77c88671a Mon Sep 17 00:00:00 2001 From: Timur Tuktamyshev Date: Wed, 8 Oct 2025 16:40:16 +0300 Subject: [PATCH 4/7] fix: nil check Signed-off-by: Timur Tuktamyshev --- pkg/hook/controller/kubernetes_bindings_controller.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pkg/hook/controller/kubernetes_bindings_controller.go b/pkg/hook/controller/kubernetes_bindings_controller.go index e9ae4e2c..5f75b1c4 100644 --- a/pkg/hook/controller/kubernetes_bindings_controller.go +++ b/pkg/hook/controller/kubernetes_bindings_controller.go @@ -344,7 +344,9 @@ func ConvertKubeEventToBindingContext(kubeEvent kemtypes.KubeEvent, link *Kubern Type: kubeEvent.Type, Objects: kubeEvent.Objects, } - bc.Metadata.JqFilter = link.BindingConfig.Monitor.JqFilter.Query + if link.BindingConfig.Monitor.JqFilter != nil { + bc.Metadata.JqFilter = link.BindingConfig.Monitor.JqFilter.Query + } bc.Metadata.BindingType = htypes.OnKubernetesEvent bc.Metadata.IncludeSnapshots = link.BindingConfig.IncludeSnapshotsFrom bc.Metadata.Group = link.BindingConfig.Group @@ -359,7 +361,9 @@ func ConvertKubeEventToBindingContext(kubeEvent kemtypes.KubeEvent, link *Kubern WatchEvent: kEvent, Objects: kubeEvent.Objects, } - bc.Metadata.JqFilter = link.BindingConfig.Monitor.JqFilter.Query + if link.BindingConfig.Monitor.JqFilter != nil { + bc.Metadata.JqFilter = link.BindingConfig.Monitor.JqFilter.Query + } bc.Metadata.BindingType = htypes.OnKubernetesEvent bc.Metadata.IncludeSnapshots = link.BindingConfig.IncludeSnapshotsFrom bc.Metadata.Group = link.BindingConfig.Group From 0f311f97d996d1bacf4f0e9e2e8f395ec62befd2 Mon Sep 17 00:00:00 2001 From: Timur Tuktamyshev Date: Wed, 8 Oct 2025 17:13:40 +0300 Subject: [PATCH 5/7] fix: do not compile filter if absent Signed-off-by: Timur Tuktamyshev --- pkg/hook/config/config_v0.go | 10 ++++++---- pkg/hook/config/config_v1.go | 10 ++++++---- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/pkg/hook/config/config_v0.go b/pkg/hook/config/config_v0.go index b9b07742..39f0f7e5 100644 --- a/pkg/hook/config/config_v0.go +++ b/pkg/hook/config/config_v0.go @@ -109,11 +109,13 @@ func (cv0 *HookConfigV0) ConvertAndCheck(c *HookConfig) error { } monitor.WithLabelSelector(kubeCfg.Selector) - filter, err := filter.CompileExpression(kubeCfg.JqFilter) - if err != nil { - return fmt.Errorf("invalid jqFilter: %w", err) + if kubeCfg.JqFilter != "" { + filter, err := filter.CompileExpression(kubeCfg.JqFilter) + if err != nil { + return fmt.Errorf("invalid jqFilter: %w", err) + } + monitor.JqFilter = filter } - monitor.JqFilter = filter kubeConfig := htypes.OnKubernetesEventConfig{} kubeConfig.Monitor = monitor diff --git a/pkg/hook/config/config_v1.go b/pkg/hook/config/config_v1.go index 73ad9384..1d4d07da 100644 --- a/pkg/hook/config/config_v1.go +++ b/pkg/hook/config/config_v1.go @@ -132,11 +132,13 @@ func (cv1 *HookConfigV1) ConvertAndCheck(c *HookConfig) error { monitor.WithNamespaceSelector((*kemtypes.NamespaceSelector)(kubeCfg.Namespace)) monitor.WithLabelSelector(kubeCfg.LabelSelector) - filter, err := filter.CompileExpression(kubeCfg.JqFilter) - if err != nil { - return fmt.Errorf("invalid jqFilter: %w", err) + if kubeCfg.JqFilter != "" { + filter, err := filter.CompileExpression(kubeCfg.JqFilter) + if err != nil { + return fmt.Errorf("invalid jqFilter: %w", err) + } + monitor.JqFilter = filter } - monitor.JqFilter = filter // executeHookOnEvent is a priority if kubeCfg.ExecuteHookOnEvents != nil { From 97e333f9205fca79a2042cc8b3f03e0f6889fbd3 Mon Sep 17 00:00:00 2001 From: Timur Tuktamyshev Date: Thu, 9 Oct 2025 15:45:15 +0300 Subject: [PATCH 6/7] chore: small architecture refactor Signed-off-by: Timur Tuktamyshev --- cmd/shell-operator/main.go | 5 +- pkg/filter/filter.go | 170 ++----- pkg/filter/filter_test.go | 476 ++---------------- pkg/hook/config/config_v0.go | 4 +- pkg/hook/config/config_v1.go | 4 +- .../kubernetes_bindings_controller.go | 4 +- pkg/jq/jq.go | 73 +++ pkg/jq/jq_test.go | 428 ++++++++++++++++ pkg/kube/object_patch/helpers.go | 8 +- pkg/kube/object_patch/operation.go | 5 +- pkg/kube_events_manager/monitor_config.go | 4 +- pkg/kube_events_manager/resource_informer.go | 40 +- pkg/kube_events_manager/types/types.go | 14 +- pkg/shell-operator/bootstrap.go | 4 +- 14 files changed, 647 insertions(+), 592 deletions(-) create mode 100644 pkg/jq/jq.go create mode 100644 pkg/jq/jq_test.go diff --git a/cmd/shell-operator/main.go b/cmd/shell-operator/main.go index 8455177b..f01fe4b6 100644 --- a/cmd/shell-operator/main.go +++ b/cmd/shell-operator/main.go @@ -10,7 +10,7 @@ import ( "github.com/flant/kube-client/klogtolog" "github.com/flant/shell-operator/pkg/app" "github.com/flant/shell-operator/pkg/debug" - "github.com/flant/shell-operator/pkg/filter" + "github.com/flant/shell-operator/pkg/jq" ) func main() { @@ -30,8 +30,7 @@ func main() { // print version kpApp.Command("version", "Show version.").Action(func(_ *kingpin.ParseContext) error { - fmt.Printf("%s %s\n", app.AppName, app.Version) - fmt.Println(filter.Info()) + fmt.Printf("%s %s\n%s\n", app.AppName, app.Version, jq.Info()) return nil }) diff --git a/pkg/filter/filter.go b/pkg/filter/filter.go index d0926122..1be58bed 100644 --- a/pkg/filter/filter.go +++ b/pkg/filter/filter.go @@ -1,158 +1,96 @@ +/* +Package filter provides functionality for transforming and filtering objects +from plain kubernetes unstructured objects to structured objects, that shell-operator can work with. + - Supports JQ Expressions + - Supports Custom Filter Functions + - Supports Plain Filter Functions + +Enriches objects with metadata and calculates a checksum. +*/ package filter import ( "context" "encoding/json" - "errors" "fmt" "reflect" "runtime" "runtime/trace" - "github.com/itchyny/gojq" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "github.com/flant/shell-operator/pkg/jq" kemtypes "github.com/flant/shell-operator/pkg/kube_events_manager/types" utils_checksum "github.com/flant/shell-operator/pkg/utils/checksum" ) type FilterFn func(obj *unstructured.Unstructured) (result interface{}, err error) -type Expression struct { - *gojq.Code - Query string -} +// RunFn runs a filter function on an object and returns an ObjectAndFilterResult. +func RunFn(filterFn FilterFn, obj *unstructured.Unstructured) (*kemtypes.ObjectAndFilterResult, error) { + defer trace.StartRegion(context.Background(), "FilterRunFn").End() -// Run filters an object with a custom function or jq expression, calculates checksum -// over the result and returns ObjectAndFilterResult. -// -// Filter precedence (highest to lowest): -// 1. Custom filter function (filterFn) - if provided, takes precedence over jq expression -// 2. JQ expression (expression) - used when filterFn is nil but expression is provided -// 3. No filter - when both filterFn and expression are nil, uses raw object JSON -// -// The function calculates a checksum over the filtered result (or full object if no filter) -// and populates metadata including resource ID and jq filter query (if applicable). -func Run(expression *Expression, filterFn FilterFn, obj *unstructured.Unstructured) (*kemtypes.ObjectAndFilterResult, error) { - defer trace.StartRegion(context.Background(), "ApplyJqFilter").End() - - // Initialize result with object and resource ID - result := &kemtypes.ObjectAndFilterResult{ - Object: obj, - } - result.Metadata.ResourceId = fmt.Sprintf("%s/%s/%s", obj.GetNamespace(), obj.GetKind(), obj.GetName()) - - // Set JQ filter in metadata if expression is provided (even if custom filter takes precedence) - if expression != nil { - result.Metadata.JqFilter = expression.Query - } - - // Apply filters based on precedence: custom filter > jq expression > no filter - switch { - case filterFn != nil: - // Custom filter function takes highest precedence - return applyCustomFilter(filterFn, result, obj) - - case expression != nil: - // JQ expression filter when no custom filter is provided - return applyJQFilter(expression, result, obj) - - default: - // No filter - use raw object JSON when both filterFn and expression are nil - return applyNoFilter(result, obj) - } -} - -func applyCustomFilter(filterFn FilterFn, result *kemtypes.ObjectAndFilterResult, obj *unstructured.Unstructured) (*kemtypes.ObjectAndFilterResult, error) { - filteredObj, err := filterFn(obj) + filtered, err := filterFn(obj) if err != nil { - funcName := runtime.FuncForPC(reflect.ValueOf(filterFn).Pointer()).Name() - return nil, fmt.Errorf("filterFn (%s) contains an error: %v", funcName, err) + return nil, fmt.Errorf("filter function (%s) execution error: %v", funcName(filterFn), err) } - filteredBytes, err := json.Marshal(filteredObj) + filteredBytes, err := json.Marshal(filtered) if err != nil { return nil, err } - result.FilterResult = filteredObj - result.Metadata.Checksum = utils_checksum.CalculateChecksum(string(filteredBytes)) - return result, nil + return &kemtypes.ObjectAndFilterResult{ + Object: obj, + Metadata: kemtypes.ObjectAndFilterResultMetadata{ + ResourceId: resourceID(obj), + Checksum: utils_checksum.CalculateChecksum(string(filteredBytes)), + }, + FilterResult: filtered, + }, nil } -func applyNoFilter(result *kemtypes.ObjectAndFilterResult, obj *unstructured.Unstructured) (*kemtypes.ObjectAndFilterResult, error) { - data, err := json.Marshal(obj) - if err != nil { - return nil, err - } +// RunExpression runs a jq expression on an object and returns an ObjectAndFilterResult. +func RunExpression(expression *jq.Expression, obj *unstructured.Unstructured) (*kemtypes.ObjectAndFilterResult, error) { + defer trace.StartRegion(context.Background(), "FilterRunExpression").End() - result.Metadata.Checksum = utils_checksum.CalculateChecksum(string(data)) - return result, nil -} - -func applyJQFilter(expression *Expression, result *kemtypes.ObjectAndFilterResult, obj *unstructured.Unstructured) (*kemtypes.ObjectAndFilterResult, error) { - filtered, err := RunJQ(expression, obj.DeepCopy()) + filtered, err := jq.ExecuteJQ(expression, obj) if err != nil { - return nil, fmt.Errorf("jqFilter: %v", err) - } - - filteredStr := string(filtered) - result.FilterResult = filteredStr - result.Metadata.Checksum = utils_checksum.CalculateChecksum(filteredStr) - return result, nil -} - -func RunJQ(expression *Expression, obj *unstructured.Unstructured) ([]byte, error) { - // Execute jq expression and collect results - iter := expression.Run(obj.UnstructuredContent()) - var results []any - - for { - v, ok := iter.Next() - if !ok { - break - } - - // Handle errors from jq execution - if err, ok := v.(error); ok { - // HaltError with nil value means graceful termination - var haltErr *gojq.HaltError - if errors.As(err, &haltErr) && haltErr.Value() == nil { - break - } - return nil, err - } - - results = append(results, v) + return nil, fmt.Errorf("jq expression execution error: %v", err) } - // Marshal results based on count - switch len(results) { - case 0: - return []byte("null"), nil - case 1: - return json.Marshal(results[0]) - default: - return json.Marshal(results) - } + return &kemtypes.ObjectAndFilterResult{ + Object: obj, + Metadata: kemtypes.ObjectAndFilterResultMetadata{ + ResourceId: resourceID(obj), + Checksum: utils_checksum.CalculateChecksum(string(filtered)), + JqFilter: expression.Query(), + }, + FilterResult: filtered, + }, nil } -func CompileExpression(expression string) (*Expression, error) { - parsedQuery, err := gojq.Parse(expression) - if err != nil { - return nil, err - } - compiledQuery, err := gojq.Compile(parsedQuery) +// RunPlain runs NO filter function on an object and returns an ObjectAndFilterResult. +func RunPlain(obj *unstructured.Unstructured) (*kemtypes.ObjectAndFilterResult, error) { + // TODO: json operation could be avoided, we can caclculate checksum from the object itself + data, err := json.Marshal(obj) if err != nil { return nil, err } - return &Expression{ - Code: compiledQuery, - Query: expression, + return &kemtypes.ObjectAndFilterResult{ + Object: obj, + Metadata: kemtypes.ObjectAndFilterResultMetadata{ + ResourceId: resourceID(obj), + Checksum: utils_checksum.CalculateChecksum(string(data)), + }, }, nil } -func Info() string { - return "Filter implementation: using itchyny/gojq" +func resourceID(obj *unstructured.Unstructured) string { + return fmt.Sprintf("%s/%s/%s", obj.GetNamespace(), obj.GetKind(), obj.GetName()) +} + +func funcName(fn FilterFn) string { + return runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name() } diff --git a/pkg/filter/filter_test.go b/pkg/filter/filter_test.go index 73e6a616..597a8bf8 100644 --- a/pkg/filter/filter_test.go +++ b/pkg/filter/filter_test.go @@ -1,10 +1,10 @@ package filter import ( - "encoding/json" "errors" "testing" + "github.com/flant/shell-operator/pkg/jq" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -15,8 +15,8 @@ func newTestObject(data map[string]interface{}) *unstructured.Unstructured { return &unstructured.Unstructured{Object: data} } -// Test Filter function - main entry point -func TestRun(t *testing.T) { +// Test RunFn function - custom filter function +func TestRunFn(t *testing.T) { t.Run("with custom filter function - success", func(t *testing.T) { obj := newTestObject(map[string]interface{}{ "apiVersion": "v1", @@ -36,7 +36,7 @@ func TestRun(t *testing.T) { return map[string]interface{}{"name": o.GetName()}, nil } - result, err := Run(nil, filterFn, obj) + result, err := RunFn(filterFn, obj) require.NoError(t, err) assert.NotNil(t, result) assert.Equal(t, obj, result.Object) @@ -52,10 +52,10 @@ func TestRun(t *testing.T) { return nil, expectedErr } - result, err := Run(nil, filterFn, obj) + result, err := RunFn(filterFn, obj) assert.Error(t, err) assert.Nil(t, result) - assert.Contains(t, err.Error(), "filterFn") + assert.Contains(t, err.Error(), "filter function") assert.Contains(t, err.Error(), "filter failed") }) @@ -67,42 +67,14 @@ func TestRun(t *testing.T) { return make(chan int), nil // channels can't be marshaled } - result, err := Run(nil, filterFn, obj) + result, err := RunFn(filterFn, obj) assert.Error(t, err) assert.Nil(t, result) }) +} - t.Run("no filter - full object checksum", func(t *testing.T) { - obj := newTestObject(map[string]interface{}{ - "apiVersion": "v1", - "kind": "ConfigMap", - "metadata": map[string]interface{}{ - "name": "test-cm", - "namespace": "kube-system", - }, - "data": map[string]interface{}{ - "key": "value", - }, - }) - - result, err := Run(nil, nil, obj) - require.NoError(t, err) - assert.NotNil(t, result) - assert.Equal(t, obj, result.Object) - assert.NotEmpty(t, result.Metadata.Checksum) - assert.Nil(t, result.FilterResult) - assert.Equal(t, "kube-system/ConfigMap/test-cm", result.Metadata.ResourceId) - }) - - t.Run("no filter - empty object", func(t *testing.T) { - obj := newTestObject(map[string]interface{}{}) - - result, err := Run(nil, nil, obj) - require.NoError(t, err) - assert.NotNil(t, result) - assert.NotEmpty(t, result.Metadata.Checksum) - }) - +// Test RunExpression function - jq expression filter +func TestRunExpression(t *testing.T) { t.Run("with jq expression - simple filter", func(t *testing.T) { obj := newTestObject(map[string]interface{}{ "apiVersion": "v1", @@ -116,13 +88,13 @@ func TestRun(t *testing.T) { }, }) - expr, err := CompileExpression(".spec.nodeName") + expr, err := jq.CompileExpression(".spec.nodeName") require.NoError(t, err) - result, err := Run(expr, nil, obj) + result, err := RunExpression(expr, obj) require.NoError(t, err) assert.NotNil(t, result) - assert.Equal(t, `"node-1"`, result.FilterResult) + assert.Equal(t, `"node-1"`, string(result.FilterResult.([]byte))) assert.NotEmpty(t, result.Metadata.Checksum) assert.Equal(t, ".spec.nodeName", result.Metadata.JqFilter) }) @@ -137,13 +109,13 @@ func TestRun(t *testing.T) { }, }) - expr, err := CompileExpression(".spec.containers[].name") + expr, err := jq.CompileExpression(".spec.containers[].name") require.NoError(t, err) - result, err := Run(expr, nil, obj) + result, err := RunExpression(expr, obj) require.NoError(t, err) assert.NotNil(t, result) - assert.Equal(t, `["nginx","sidecar"]`, result.FilterResult) + assert.Equal(t, `["nginx","sidecar"]`, string(result.FilterResult.([]byte))) }) t.Run("with jq expression - filter returns null", func(t *testing.T) { @@ -151,13 +123,13 @@ func TestRun(t *testing.T) { "spec": map[string]interface{}{}, }) - expr, err := CompileExpression(".spec.nonexistent") + expr, err := jq.CompileExpression(".spec.nonexistent") require.NoError(t, err) - result, err := Run(expr, nil, obj) + result, err := RunExpression(expr, obj) require.NoError(t, err) assert.NotNil(t, result) - assert.Equal(t, "null", result.FilterResult) + assert.Equal(t, "null", string(result.FilterResult.([]byte))) }) t.Run("with jq expression - error", func(t *testing.T) { @@ -165,336 +137,46 @@ func TestRun(t *testing.T) { "spec": "not-an-object", }) - expr, err := CompileExpression(".spec.field") + expr, err := jq.CompileExpression(".spec.field") require.NoError(t, err) - result, err := Run(expr, nil, obj) + result, err := RunExpression(expr, obj) assert.Error(t, err) assert.Nil(t, result) }) } -// Test RunJQ function -func TestRunJQ(t *testing.T) { - t.Run("simple selection", func(t *testing.T) { - obj := newTestObject(map[string]interface{}{ - "name": "test", - "age": 42, - }) - - expr, err := CompileExpression(".name") - require.NoError(t, err) - - result, err := RunJQ(expr, obj) - require.NoError(t, err) - assert.Equal(t, `"test"`, string(result)) - }) - - t.Run("multiple results", func(t *testing.T) { - obj := newTestObject(map[string]interface{}{ - "items": []interface{}{1, 2, 3}, - }) - - expr, err := CompileExpression(".items[]") - require.NoError(t, err) - - result, err := RunJQ(expr, obj) - require.NoError(t, err) - assert.Equal(t, "[1,2,3]", string(result)) - }) - - t.Run("no results - returns null", func(t *testing.T) { - obj := newTestObject(map[string]interface{}{}) - - expr, err := CompileExpression("empty") - require.NoError(t, err) - - result, err := RunJQ(expr, obj) - require.NoError(t, err) - assert.Equal(t, "null", string(result)) - }) - - t.Run("single null result", func(t *testing.T) { - obj := newTestObject(map[string]interface{}{ - "value": nil, - }) - - expr, err := CompileExpression(".value") - require.NoError(t, err) - - result, err := RunJQ(expr, obj) - require.NoError(t, err) - assert.Equal(t, "null", string(result)) - }) - - t.Run("complex object transformation", func(t *testing.T) { +// Test RunPlain function - no filter +func TestRunPlain(t *testing.T) { + t.Run("no filter - full object checksum", func(t *testing.T) { obj := newTestObject(map[string]interface{}{ - "users": []interface{}{ - map[string]interface{}{"name": "Alice", "age": 30}, - map[string]interface{}{"name": "Bob", "age": 25}, + "apiVersion": "v1", + "kind": "ConfigMap", + "metadata": map[string]interface{}{ + "name": "test-cm", + "namespace": "kube-system", }, - }) - - expr, err := CompileExpression(".users | map({name, older: (.age > 26)})") - require.NoError(t, err) - - result, err := RunJQ(expr, obj) - require.NoError(t, err) - - var parsed []map[string]interface{} - err = json.Unmarshal(result, &parsed) - require.NoError(t, err) - assert.Len(t, parsed, 2) - assert.Equal(t, "Alice", parsed[0]["name"]) - assert.Equal(t, true, parsed[0]["older"]) - assert.Equal(t, "Bob", parsed[1]["name"]) - assert.Equal(t, false, parsed[1]["older"]) - }) - - t.Run("array construction", func(t *testing.T) { - obj := newTestObject(map[string]interface{}{ - "a": 1, - "b": 2, - }) - - expr, err := CompileExpression("[.a, .b, (.a + .b)]") - require.NoError(t, err) - - result, err := RunJQ(expr, obj) - require.NoError(t, err) - assert.Equal(t, "[1,2,3]", string(result)) - }) - - t.Run("object construction", func(t *testing.T) { - obj := newTestObject(map[string]interface{}{ - "firstName": "John", - "lastName": "Doe", - }) - - expr, err := CompileExpression("{fullName: (.firstName + \" \" + .lastName)}") - require.NoError(t, err) - - result, err := RunJQ(expr, obj) - require.NoError(t, err) - assert.JSONEq(t, `{"fullName":"John Doe"}`, string(result)) - }) - - t.Run("type error", func(t *testing.T) { - obj := newTestObject(map[string]interface{}{ - "value": "string", - }) - - expr, err := CompileExpression(".value.field") - require.NoError(t, err) - - result, err := RunJQ(expr, obj) - assert.Error(t, err) - assert.Nil(t, result) - }) - - t.Run("halt error with nil value - graceful termination", func(t *testing.T) { - obj := newTestObject(map[string]interface{}{ - "items": []interface{}{1, 2, 3}, - }) - - // limit(0) produces halt error with nil value - expr, err := CompileExpression(".items[] | limit(0; .)") - require.NoError(t, err) - - result, err := RunJQ(expr, obj) - require.NoError(t, err) - assert.Equal(t, "null", string(result)) - }) - - t.Run("division by zero", func(t *testing.T) { - obj := newTestObject(map[string]interface{}{ - "value": 10, - }) - - expr, err := CompileExpression(".value / 0") - require.NoError(t, err) - - result, err := RunJQ(expr, obj) - assert.Error(t, err) - assert.Nil(t, result) - }) - - t.Run("nested array and object", func(t *testing.T) { - obj := newTestObject(map[string]interface{}{ "data": map[string]interface{}{ - "items": []interface{}{ - map[string]interface{}{ - "tags": []interface{}{"a", "b", "c"}, - }, - map[string]interface{}{ - "tags": []interface{}{"x", "y"}, - }, - }, + "key": "value", }, }) - expr, err := CompileExpression(".data.items[].tags[]") - require.NoError(t, err) - - result, err := RunJQ(expr, obj) - require.NoError(t, err) - assert.Equal(t, `["a","b","c","x","y"]`, string(result)) - }) - - t.Run("conditional expression", func(t *testing.T) { - obj := newTestObject(map[string]interface{}{ - "status": "active", - }) - - expr, err := CompileExpression(`if .status == "active" then "running" else "stopped" end`) - require.NoError(t, err) - - result, err := RunJQ(expr, obj) - require.NoError(t, err) - assert.Equal(t, `"running"`, string(result)) - }) - - t.Run("boolean values", func(t *testing.T) { - obj := newTestObject(map[string]interface{}{ - "enabled": true, - "disabled": false, - }) - - expr, err := CompileExpression("[.enabled, .disabled]") - require.NoError(t, err) - - result, err := RunJQ(expr, obj) - require.NoError(t, err) - assert.Equal(t, "[true,false]", string(result)) - }) - - t.Run("numeric operations", func(t *testing.T) { - obj := newTestObject(map[string]interface{}{ - "a": 10, - "b": 3, - }) - - expr, err := CompileExpression("{sum: (.a + .b), diff: (.a - .b), prod: (.a * .b)}") - require.NoError(t, err) - - result, err := RunJQ(expr, obj) + result, err := RunPlain(obj) require.NoError(t, err) - assert.JSONEq(t, `{"sum":13,"diff":7,"prod":30}`, string(result)) - }) - - t.Run("string operations", func(t *testing.T) { - obj := newTestObject(map[string]interface{}{ - "text": "Hello, World!", - }) - - expr, err := CompileExpression(".text | ascii_upcase") - require.NoError(t, err) - - result, err := RunJQ(expr, obj) - require.NoError(t, err) - assert.Equal(t, `"HELLO, WORLD!"`, string(result)) + assert.NotNil(t, result) + assert.Equal(t, obj, result.Object) + assert.NotEmpty(t, result.Metadata.Checksum) + assert.Nil(t, result.FilterResult) + assert.Equal(t, "kube-system/ConfigMap/test-cm", result.Metadata.ResourceId) }) - t.Run("empty object", func(t *testing.T) { + t.Run("no filter - empty object", func(t *testing.T) { obj := newTestObject(map[string]interface{}{}) - expr, err := CompileExpression(".") + result, err := RunPlain(obj) require.NoError(t, err) - - result, err := RunJQ(expr, obj) - require.NoError(t, err) - assert.Equal(t, "{}", string(result)) - }) - - t.Run("special characters in strings", func(t *testing.T) { - obj := newTestObject(map[string]interface{}{ - "text": "Line 1\nLine 2\tTabbed", - }) - - expr, err := CompileExpression(".text") - require.NoError(t, err) - - result, err := RunJQ(expr, obj) - require.NoError(t, err) - - var text string - err = json.Unmarshal(result, &text) - require.NoError(t, err) - assert.Equal(t, "Line 1\nLine 2\tTabbed", text) - }) - - t.Run("unicode characters", func(t *testing.T) { - obj := newTestObject(map[string]interface{}{ - "emoji": "πŸš€βœ¨πŸŽ‰", - "cyrillic": "ΠŸΡ€ΠΈΠ²Π΅Ρ‚", - }) - - expr, err := CompileExpression("{emoji, cyrillic}") - require.NoError(t, err) - - result, err := RunJQ(expr, obj) - require.NoError(t, err) - assert.JSONEq(t, `{"emoji":"πŸš€βœ¨πŸŽ‰","cyrillic":"ΠŸΡ€ΠΈΠ²Π΅Ρ‚"}`, string(result)) - }) -} - -// Test CompileExpression function -func TestCompileExpression(t *testing.T) { - t.Run("valid simple expression", func(t *testing.T) { - expr, err := CompileExpression(".field") - require.NoError(t, err) - assert.NotNil(t, expr) - assert.NotNil(t, expr.Code) - assert.Equal(t, ".field", expr.Query) - }) - - t.Run("valid complex expression", func(t *testing.T) { - expr, err := CompileExpression(`.items[] | select(.type == "pod") | .metadata.name`) - require.NoError(t, err) - assert.NotNil(t, expr) - assert.NotNil(t, expr.Code) - }) - - t.Run("invalid expression - syntax error", func(t *testing.T) { - expr, err := CompileExpression(".field[") - assert.Error(t, err) - assert.Nil(t, expr) - }) - - t.Run("invalid expression - incomplete", func(t *testing.T) { - expr, err := CompileExpression(".") - require.NoError(t, err) - assert.NotNil(t, expr) - }) - - t.Run("empty expression", func(t *testing.T) { - expr, err := CompileExpression("") - assert.Error(t, err) - assert.Nil(t, expr) - }) - - t.Run("expression with function", func(t *testing.T) { - expr, err := CompileExpression(".items | length") - require.NoError(t, err) - assert.NotNil(t, expr) - }) - - t.Run("expression with pipe", func(t *testing.T) { - expr, err := CompileExpression(".data | keys | sort") - require.NoError(t, err) - assert.NotNil(t, expr) - }) - - t.Run("expression with map", func(t *testing.T) { - expr, err := CompileExpression(".items | map(.name)") - require.NoError(t, err) - assert.NotNil(t, expr) - }) - - t.Run("expression with select", func(t *testing.T) { - expr, err := CompileExpression(`.items[] | select(.status == "active")`) - require.NoError(t, err) - assert.NotNil(t, expr) + assert.NotNil(t, result) + assert.NotEmpty(t, result.Metadata.Checksum) }) } @@ -510,7 +192,7 @@ func TestResourceIdGeneration(t *testing.T) { }, }) - result, err := Run(nil, nil, obj) + result, err := RunPlain(obj) require.NoError(t, err) assert.Equal(t, "default/Pod/my-pod", result.Metadata.ResourceId) }) @@ -524,7 +206,7 @@ func TestResourceIdGeneration(t *testing.T) { }, }) - result, err := Run(nil, nil, obj) + result, err := RunPlain(obj) require.NoError(t, err) assert.Equal(t, "/Node/node-1", result.Metadata.ResourceId) }) @@ -535,80 +217,8 @@ func TestResourceIdGeneration(t *testing.T) { "kind": "Unknown", }) - result, err := Run(nil, nil, obj) + result, err := RunPlain(obj) require.NoError(t, err) assert.Equal(t, "/Unknown/", result.Metadata.ResourceId) }) } - -// Benchmark tests -func BenchmarkRun(b *testing.B) { - obj := newTestObject(map[string]interface{}{ - "apiVersion": "v1", - "kind": "Pod", - "metadata": map[string]interface{}{ - "name": "test-pod", - "namespace": "default", - "labels": map[string]interface{}{ - "app": "nginx", - }, - }, - "spec": map[string]interface{}{ - "containers": []interface{}{ - map[string]interface{}{ - "name": "nginx", - "image": "nginx:latest", - }, - }, - }, - }) - - b.Run("NoFilter", func(b *testing.B) { - for i := 0; i < b.N; i++ { - _, err := Run(nil, nil, obj) - if err != nil { - b.Fatal(err) - } - } - }) - - b.Run("CustomFilter", func(b *testing.B) { - filterFn := func(o *unstructured.Unstructured) (interface{}, error) { - return map[string]interface{}{"name": o.GetName()}, nil - } - for i := 0; i < b.N; i++ { - _, err := Run(nil, filterFn, obj) - if err != nil { - b.Fatal(err) - } - } - }) - - b.Run("JQFilter", func(b *testing.B) { - expr, err := CompileExpression(".metadata.name") - if err != nil { - b.Fatal(err) - } - b.ResetTimer() - for i := 0; i < b.N; i++ { - _, err := Run(expr, nil, obj) - if err != nil { - b.Fatal(err) - } - } - }) - - b.Run("JQFilterComplex", func(b *testing.B) { - expr, err := CompileExpression(".spec.containers[].name") - if err != nil { - b.Fatal(err) - } - b.ResetTimer() - for i := 0; i < b.N; i++ { - _, err := Run(expr, nil, obj) - if err != nil { - b.Fatal(err) - } - } - }) -} diff --git a/pkg/hook/config/config_v0.go b/pkg/hook/config/config_v0.go index 39f0f7e5..abb4c498 100644 --- a/pkg/hook/config/config_v0.go +++ b/pkg/hook/config/config_v0.go @@ -6,8 +6,8 @@ import ( "gopkg.in/robfig/cron.v2" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "github.com/flant/shell-operator/pkg/filter" htypes "github.com/flant/shell-operator/pkg/hook/types" + "github.com/flant/shell-operator/pkg/jq" kubeeventsmanager "github.com/flant/shell-operator/pkg/kube_events_manager" kemtypes "github.com/flant/shell-operator/pkg/kube_events_manager/types" smtypes "github.com/flant/shell-operator/pkg/schedule_manager/types" @@ -110,7 +110,7 @@ func (cv0 *HookConfigV0) ConvertAndCheck(c *HookConfig) error { monitor.WithLabelSelector(kubeCfg.Selector) if kubeCfg.JqFilter != "" { - filter, err := filter.CompileExpression(kubeCfg.JqFilter) + filter, err := jq.CompileExpression(kubeCfg.JqFilter) if err != nil { return fmt.Errorf("invalid jqFilter: %w", err) } diff --git a/pkg/hook/config/config_v1.go b/pkg/hook/config/config_v1.go index 1d4d07da..71934a71 100644 --- a/pkg/hook/config/config_v1.go +++ b/pkg/hook/config/config_v1.go @@ -12,8 +12,8 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "github.com/flant/shell-operator/pkg/app" - "github.com/flant/shell-operator/pkg/filter" htypes "github.com/flant/shell-operator/pkg/hook/types" + "github.com/flant/shell-operator/pkg/jq" kubeeventsmanager "github.com/flant/shell-operator/pkg/kube_events_manager" kemtypes "github.com/flant/shell-operator/pkg/kube_events_manager/types" smtypes "github.com/flant/shell-operator/pkg/schedule_manager/types" @@ -133,7 +133,7 @@ func (cv1 *HookConfigV1) ConvertAndCheck(c *HookConfig) error { monitor.WithLabelSelector(kubeCfg.LabelSelector) if kubeCfg.JqFilter != "" { - filter, err := filter.CompileExpression(kubeCfg.JqFilter) + filter, err := jq.CompileExpression(kubeCfg.JqFilter) if err != nil { return fmt.Errorf("invalid jqFilter: %w", err) } diff --git a/pkg/hook/controller/kubernetes_bindings_controller.go b/pkg/hook/controller/kubernetes_bindings_controller.go index 5f75b1c4..4c47505e 100644 --- a/pkg/hook/controller/kubernetes_bindings_controller.go +++ b/pkg/hook/controller/kubernetes_bindings_controller.go @@ -345,7 +345,7 @@ func ConvertKubeEventToBindingContext(kubeEvent kemtypes.KubeEvent, link *Kubern Objects: kubeEvent.Objects, } if link.BindingConfig.Monitor.JqFilter != nil { - bc.Metadata.JqFilter = link.BindingConfig.Monitor.JqFilter.Query + bc.Metadata.JqFilter = link.BindingConfig.Monitor.JqFilter.Query() } bc.Metadata.BindingType = htypes.OnKubernetesEvent bc.Metadata.IncludeSnapshots = link.BindingConfig.IncludeSnapshotsFrom @@ -362,7 +362,7 @@ func ConvertKubeEventToBindingContext(kubeEvent kemtypes.KubeEvent, link *Kubern Objects: kubeEvent.Objects, } if link.BindingConfig.Monitor.JqFilter != nil { - bc.Metadata.JqFilter = link.BindingConfig.Monitor.JqFilter.Query + bc.Metadata.JqFilter = link.BindingConfig.Monitor.JqFilter.Query() } bc.Metadata.BindingType = htypes.OnKubernetesEvent bc.Metadata.IncludeSnapshots = link.BindingConfig.IncludeSnapshotsFrom diff --git a/pkg/jq/jq.go b/pkg/jq/jq.go new file mode 100644 index 00000000..c1dfefbd --- /dev/null +++ b/pkg/jq/jq.go @@ -0,0 +1,73 @@ +package jq + +import ( + "encoding/json" + "errors" + + "github.com/itchyny/gojq" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" +) + +type Expression struct { + *gojq.Code + query string +} + +func (e *Expression) Query() string { + return e.query +} + +// ExecuteJQ executes jq expression and collect results +func ExecuteJQ(expression *Expression, obj *unstructured.Unstructured) ([]byte, error) { + iter := expression.Run(obj.UnstructuredContent()) + var results []any + + for { + v, ok := iter.Next() + if !ok { + break + } + + // Handle errors from jq execution + if err, ok := v.(error); ok { + // HaltError with nil value means graceful termination + var haltErr *gojq.HaltError + if errors.As(err, &haltErr) && haltErr.Value() == nil { + break + } + return nil, err + } + + results = append(results, v) + } + + // Marshal results based on count + switch len(results) { + case 0: + return []byte("null"), nil + case 1: + return json.Marshal(results[0]) + default: + return json.Marshal(results) + } +} + +func CompileExpression(expression string) (*Expression, error) { + parsedQuery, err := gojq.Parse(expression) + if err != nil { + return nil, err + } + compiledQuery, err := gojq.Compile(parsedQuery) + if err != nil { + return nil, err + } + + return &Expression{ + Code: compiledQuery, + query: expression, + }, nil +} + +func Info() string { + return "jq implementation: using itchyny/gojq" +} diff --git a/pkg/jq/jq_test.go b/pkg/jq/jq_test.go new file mode 100644 index 00000000..92cee5e4 --- /dev/null +++ b/pkg/jq/jq_test.go @@ -0,0 +1,428 @@ +package jq + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" +) + +// Helper function to create test objects +func newTestObject(data map[string]interface{}) *unstructured.Unstructured { + return &unstructured.Unstructured{Object: data} +} + +// Test ExecuteJQ function +func TestExecuteJQ(t *testing.T) { + t.Run("simple selection", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "name": "test", + "age": 42, + }) + + expr, err := CompileExpression(".name") + require.NoError(t, err) + + result, err := ExecuteJQ(expr, obj) + require.NoError(t, err) + assert.Equal(t, `"test"`, string(result)) + }) + + t.Run("multiple results", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "items": []interface{}{1, 2, 3}, + }) + + expr, err := CompileExpression(".items[]") + require.NoError(t, err) + + result, err := ExecuteJQ(expr, obj) + require.NoError(t, err) + assert.Equal(t, "[1,2,3]", string(result)) + }) + + t.Run("no results - returns null", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{}) + + expr, err := CompileExpression("empty") + require.NoError(t, err) + + result, err := ExecuteJQ(expr, obj) + require.NoError(t, err) + assert.Equal(t, "null", string(result)) + }) + + t.Run("single null result", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "value": nil, + }) + + expr, err := CompileExpression(".value") + require.NoError(t, err) + + result, err := ExecuteJQ(expr, obj) + require.NoError(t, err) + assert.Equal(t, "null", string(result)) + }) + + t.Run("complex object transformation", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "users": []interface{}{ + map[string]interface{}{"name": "Alice", "age": 30}, + map[string]interface{}{"name": "Bob", "age": 25}, + }, + }) + + expr, err := CompileExpression(".users | map({name, older: (.age > 26)})") + require.NoError(t, err) + + result, err := ExecuteJQ(expr, obj) + require.NoError(t, err) + + var parsed []map[string]interface{} + err = json.Unmarshal(result, &parsed) + require.NoError(t, err) + assert.Len(t, parsed, 2) + assert.Equal(t, "Alice", parsed[0]["name"]) + assert.Equal(t, true, parsed[0]["older"]) + assert.Equal(t, "Bob", parsed[1]["name"]) + assert.Equal(t, false, parsed[1]["older"]) + }) + + t.Run("array construction", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "a": 1, + "b": 2, + }) + + expr, err := CompileExpression("[.a, .b, (.a + .b)]") + require.NoError(t, err) + + result, err := ExecuteJQ(expr, obj) + require.NoError(t, err) + assert.Equal(t, "[1,2,3]", string(result)) + }) + + t.Run("object construction", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "firstName": "John", + "lastName": "Doe", + }) + + expr, err := CompileExpression("{fullName: (.firstName + \" \" + .lastName)}") + require.NoError(t, err) + + result, err := ExecuteJQ(expr, obj) + require.NoError(t, err) + assert.JSONEq(t, `{"fullName":"John Doe"}`, string(result)) + }) + + t.Run("type error", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "value": "string", + }) + + expr, err := CompileExpression(".value.field") + require.NoError(t, err) + + result, err := ExecuteJQ(expr, obj) + assert.Error(t, err) + assert.Nil(t, result) + }) + + t.Run("halt error with nil value - graceful termination", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "items": []interface{}{1, 2, 3}, + }) + + // limit(0) produces halt error with nil value + expr, err := CompileExpression(".items[] | limit(0; .)") + require.NoError(t, err) + + result, err := ExecuteJQ(expr, obj) + require.NoError(t, err) + assert.Equal(t, "null", string(result)) + }) + + t.Run("division by zero", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "value": 10, + }) + + expr, err := CompileExpression(".value / 0") + require.NoError(t, err) + + result, err := ExecuteJQ(expr, obj) + assert.Error(t, err) + assert.Nil(t, result) + }) + + t.Run("nested array and object", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "data": map[string]interface{}{ + "items": []interface{}{ + map[string]interface{}{ + "tags": []interface{}{"a", "b", "c"}, + }, + map[string]interface{}{ + "tags": []interface{}{"x", "y"}, + }, + }, + }, + }) + + expr, err := CompileExpression(".data.items[].tags[]") + require.NoError(t, err) + + result, err := ExecuteJQ(expr, obj) + require.NoError(t, err) + assert.Equal(t, `["a","b","c","x","y"]`, string(result)) + }) + + t.Run("conditional expression", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "status": "active", + }) + + expr, err := CompileExpression(`if .status == "active" then "running" else "stopped" end`) + require.NoError(t, err) + + result, err := ExecuteJQ(expr, obj) + require.NoError(t, err) + assert.Equal(t, `"running"`, string(result)) + }) + + t.Run("boolean values", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "enabled": true, + "disabled": false, + }) + + expr, err := CompileExpression("[.enabled, .disabled]") + require.NoError(t, err) + + result, err := ExecuteJQ(expr, obj) + require.NoError(t, err) + assert.Equal(t, "[true,false]", string(result)) + }) + + t.Run("numeric operations", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "a": 10, + "b": 3, + }) + + expr, err := CompileExpression("{sum: (.a + .b), diff: (.a - .b), prod: (.a * .b)}") + require.NoError(t, err) + + result, err := ExecuteJQ(expr, obj) + require.NoError(t, err) + assert.JSONEq(t, `{"sum":13,"diff":7,"prod":30}`, string(result)) + }) + + t.Run("string operations", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "text": "Hello, World!", + }) + + expr, err := CompileExpression(".text | ascii_upcase") + require.NoError(t, err) + + result, err := ExecuteJQ(expr, obj) + require.NoError(t, err) + assert.Equal(t, `"HELLO, WORLD!"`, string(result)) + }) + + t.Run("empty object", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{}) + + expr, err := CompileExpression(".") + require.NoError(t, err) + + result, err := ExecuteJQ(expr, obj) + require.NoError(t, err) + assert.Equal(t, "{}", string(result)) + }) + + t.Run("special characters in strings", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "text": "Line 1\nLine 2\tTabbed", + }) + + expr, err := CompileExpression(".text") + require.NoError(t, err) + + result, err := ExecuteJQ(expr, obj) + require.NoError(t, err) + + var text string + err = json.Unmarshal(result, &text) + require.NoError(t, err) + assert.Equal(t, "Line 1\nLine 2\tTabbed", text) + }) + + t.Run("unicode characters", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "emoji": "πŸš€βœ¨πŸŽ‰", + "cyrillic": "ΠŸΡ€ΠΈΠ²Π΅Ρ‚", + }) + + expr, err := CompileExpression("{emoji, cyrillic}") + require.NoError(t, err) + + result, err := ExecuteJQ(expr, obj) + require.NoError(t, err) + assert.JSONEq(t, `{"emoji":"πŸš€βœ¨πŸŽ‰","cyrillic":"ΠŸΡ€ΠΈΠ²Π΅Ρ‚"}`, string(result)) + }) + + t.Run("kubernetes object filtering", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "test-pod", + "namespace": "default", + "labels": map[string]interface{}{ + "app": "nginx", + }, + }, + "spec": map[string]interface{}{ + "containers": []interface{}{ + map[string]interface{}{ + "name": "nginx", + "image": "nginx:latest", + }, + }, + }, + }) + + expr, err := CompileExpression(".metadata.name") + require.NoError(t, err) + + result, err := ExecuteJQ(expr, obj) + require.NoError(t, err) + assert.Equal(t, `"test-pod"`, string(result)) + }) + + t.Run("kubernetes object complex filtering", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "test-pod", + "namespace": "default", + }, + "spec": map[string]interface{}{ + "containers": []interface{}{ + map[string]interface{}{ + "name": "nginx", + "image": "nginx:latest", + }, + map[string]interface{}{ + "name": "sidecar", + "image": "sidecar:latest", + }, + }, + }, + }) + + expr, err := CompileExpression(".spec.containers[].name") + require.NoError(t, err) + + result, err := ExecuteJQ(expr, obj) + require.NoError(t, err) + assert.Equal(t, `["nginx","sidecar"]`, string(result)) + }) +} + +// Test CompileExpression function +func TestCompileExpression(t *testing.T) { + t.Run("valid simple expression", func(t *testing.T) { + expr, err := CompileExpression(".field") + require.NoError(t, err) + assert.NotNil(t, expr) + assert.NotNil(t, expr.Code) + assert.Equal(t, ".field", expr.Query()) + }) + + t.Run("valid complex expression", func(t *testing.T) { + expr, err := CompileExpression(`.items[] | select(.type == "pod") | .metadata.name`) + require.NoError(t, err) + assert.NotNil(t, expr) + assert.NotNil(t, expr.Code) + }) + + t.Run("invalid expression - syntax error", func(t *testing.T) { + expr, err := CompileExpression(".field[") + assert.Error(t, err) + assert.Nil(t, expr) + }) + + t.Run("invalid expression - incomplete", func(t *testing.T) { + expr, err := CompileExpression(".") + require.NoError(t, err) + assert.NotNil(t, expr) + }) + + t.Run("empty expression", func(t *testing.T) { + expr, err := CompileExpression("") + assert.Error(t, err) + assert.Nil(t, expr) + }) + + t.Run("expression with function", func(t *testing.T) { + expr, err := CompileExpression(".items | length") + require.NoError(t, err) + assert.NotNil(t, expr) + }) + + t.Run("expression with pipe", func(t *testing.T) { + expr, err := CompileExpression(".data | keys | sort") + require.NoError(t, err) + assert.NotNil(t, expr) + }) + + t.Run("expression with map", func(t *testing.T) { + expr, err := CompileExpression(".items | map(.name)") + require.NoError(t, err) + assert.NotNil(t, expr) + }) + + t.Run("expression with select", func(t *testing.T) { + expr, err := CompileExpression(`.items[] | select(.status == "active")`) + require.NoError(t, err) + assert.NotNil(t, expr) + }) + + t.Run("expression with multiple pipes", func(t *testing.T) { + expr, err := CompileExpression(".items[] | select(.type == \"pod\") | .metadata.name | length") + require.NoError(t, err) + assert.NotNil(t, expr) + }) +} + +// Test Expression struct methods +func TestExpression(t *testing.T) { + t.Run("Query method returns original expression", func(t *testing.T) { + originalQuery := ".metadata.name" + expr, err := CompileExpression(originalQuery) + require.NoError(t, err) + assert.Equal(t, originalQuery, expr.Query()) + }) + + t.Run("Code field is set correctly", func(t *testing.T) { + expr, err := CompileExpression(".field") + require.NoError(t, err) + assert.NotNil(t, expr.Code) + }) +} + +// Test Info function +func TestInfo(t *testing.T) { + t.Run("returns correct implementation info", func(t *testing.T) { + info := Info() + assert.Equal(t, "jq implementation: using itchyny/gojq", info) + }) +} diff --git a/pkg/kube/object_patch/helpers.go b/pkg/kube/object_patch/helpers.go index 442d21a3..a9d22c28 100644 --- a/pkg/kube/object_patch/helpers.go +++ b/pkg/kube/object_patch/helpers.go @@ -12,7 +12,7 @@ import ( k8yaml "sigs.k8s.io/yaml" "github.com/flant/kube-client/manifest" - "github.com/flant/shell-operator/pkg/filter" + "github.com/flant/shell-operator/pkg/jq" ) func unmarshalFromJSONOrYAML(specs []byte) ([]OperationSpec, error) { @@ -64,11 +64,11 @@ func unmarshalFromYaml(yamlSpecs []byte) ([]OperationSpec, error) { return specSlice, nil } -func applyJQPatch(jqFilter *filter.Expression, obj *unstructured.Unstructured) (*unstructured.Unstructured, error) { - filterResult, err := filter.RunJQ(jqFilter, obj) +func applyJQPatch(jqFilter *jq.Expression, obj *unstructured.Unstructured) (*unstructured.Unstructured, error) { + filterResult, err := jq.ExecuteJQ(jqFilter, obj) if err != nil { return nil, fmt.Errorf("failed to apply jqFilter:\n%sto Object:\n%s\n"+ - "error: %s", jqFilter.Query, obj, err) + "error: %s", jqFilter.Query(), obj, err) } retObj := &unstructured.Unstructured{} diff --git a/pkg/kube/object_patch/operation.go b/pkg/kube/object_patch/operation.go index 12530be7..1518123e 100644 --- a/pkg/kube/object_patch/operation.go +++ b/pkg/kube/object_patch/operation.go @@ -6,12 +6,11 @@ import ( "github.com/deckhouse/deckhouse/pkg/log" sdkpkg "github.com/deckhouse/module-sdk/pkg" + "github.com/flant/shell-operator/pkg/jq" "github.com/hashicorp/go-multierror" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/types" - - "github.com/flant/shell-operator/pkg/filter" ) // OperationSpec a JSON and YAML representation of the operation for shell hooks @@ -288,7 +287,7 @@ func newPatchOperation(patchType types.PatchType, patch any, apiVersion, kind, n func NewPatchWithJQOperation(jqQuery string, apiVersion string, kind string, namespace string, name string, opts ...sdkpkg.PatchCollectorOption) sdkpkg.PatchCollectorOperation { return newFilterOperation(func(u *unstructured.Unstructured) (*unstructured.Unstructured, error) { - expression, err := filter.CompileExpression(jqQuery) + expression, err := jq.CompileExpression(jqQuery) if err != nil { return nil, err } diff --git a/pkg/kube_events_manager/monitor_config.go b/pkg/kube_events_manager/monitor_config.go index b17f2250..e7f7f3ec 100644 --- a/pkg/kube_events_manager/monitor_config.go +++ b/pkg/kube_events_manager/monitor_config.go @@ -5,7 +5,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "github.com/flant/shell-operator/pkg/filter" + "github.com/flant/shell-operator/pkg/jq" kemtypes "github.com/flant/shell-operator/pkg/kube_events_manager/types" ) @@ -25,7 +25,7 @@ type MonitorConfig struct { NamespaceSelector *kemtypes.NamespaceSelector LabelSelector *metav1.LabelSelector FieldSelector *kemtypes.FieldSelector - JqFilter *filter.Expression + JqFilter *jq.Expression Logger *log.Logger Mode kemtypes.KubeEventMode KeepFullObjectsInMemory bool diff --git a/pkg/kube_events_manager/resource_informer.go b/pkg/kube_events_manager/resource_informer.go index 973dd177..d1a1db22 100644 --- a/pkg/kube_events_manager/resource_informer.go +++ b/pkg/kube_events_manager/resource_informer.go @@ -221,15 +221,7 @@ func (ei *resourceInformer) loadExistedObjects() error { // copy loop var to avoid duplication of pointer in filteredObjects obj := item - var objFilterRes *kemtypes.ObjectAndFilterResult - var err error - func() { - defer measure.Duration(func(d time.Duration) { - ei.metricStorage.HistogramObserve(metrics.KubeJqFilterDurationSeconds, d.Seconds(), ei.Monitor.Metadata.MetricLabels, nil) - })() - objFilterRes, err = filter.Run(ei.Monitor.JqFilter, ei.Monitor.FilterFunc, &obj) - }() - + objFilterRes, err := ei.transformObject(&obj) if err != nil { return err } @@ -300,14 +292,7 @@ func (ei *resourceInformer) handleWatchEvent(object interface{}, eventType kemty // Always calculate checksum and update cache, because we need an actual state in ei.cachedObjects. - var objFilterRes *kemtypes.ObjectAndFilterResult - var err error - func() { - defer measure.Duration(func(d time.Duration) { - ei.metricStorage.HistogramObserve(metrics.KubeJqFilterDurationSeconds, d.Seconds(), ei.Monitor.Metadata.MetricLabels, nil) - })() - objFilterRes, err = filter.Run(ei.Monitor.JqFilter, ei.Monitor.FilterFunc, obj) - }() + objFilterRes, err := ei.transformObject(obj) if err != nil { log.Error("handleWatchEvent: applyFilter error", slog.String("debugName", ei.Monitor.Metadata.DebugName), @@ -488,3 +473,24 @@ func (ei *resourceInformer) getCachedObjectsInfoIncrement() CachedObjectsInfo { ei.cachedObjectsIncrement = &CachedObjectsInfo{} return info } + +func (ei *resourceInformer) transformObject(obj *unstructured.Unstructured) (*kemtypes.ObjectAndFilterResult, error) { + recordDuration := measure.Duration(func(d time.Duration) { + ei.metricStorage.HistogramObserve( + metrics.KubeJqFilterDurationSeconds, + d.Seconds(), + ei.Monitor.Metadata.MetricLabels, + nil, + ) + }) + defer recordDuration() + + if ei.Monitor.JqFilter != nil { + return filter.RunExpression(ei.Monitor.JqFilter, obj) + } + if ei.Monitor.FilterFunc != nil { + return filter.RunFn(ei.Monitor.FilterFunc, obj) + } + + return filter.RunPlain(obj) +} diff --git a/pkg/kube_events_manager/types/types.go b/pkg/kube_events_manager/types/types.go index 5bdfb8d7..1f898e1a 100644 --- a/pkg/kube_events_manager/types/types.go +++ b/pkg/kube_events_manager/types/types.go @@ -35,16 +35,18 @@ const ( ) type ObjectAndFilterResult struct { - Metadata struct { - JqFilter string - Checksum string - ResourceId string // Used for sorting - RemoveObject bool - } + Metadata ObjectAndFilterResultMetadata Object *unstructured.Unstructured // here is a pointer because of MarshalJSON receiver FilterResult interface{} } +type ObjectAndFilterResultMetadata struct { + JqFilter string + Checksum string + ResourceId string // Used for sorting + RemoveObject bool +} + // Map constructs a map suitable for use in binding context. func (o ObjectAndFilterResult) Map() map[string]interface{} { m := map[string]interface{}{} diff --git a/pkg/shell-operator/bootstrap.go b/pkg/shell-operator/bootstrap.go index fdf19e35..c5460d9c 100644 --- a/pkg/shell-operator/bootstrap.go +++ b/pkg/shell-operator/bootstrap.go @@ -9,8 +9,8 @@ import ( "github.com/flant/shell-operator/pkg/app" "github.com/flant/shell-operator/pkg/config" "github.com/flant/shell-operator/pkg/debug" - "github.com/flant/shell-operator/pkg/filter" "github.com/flant/shell-operator/pkg/hook" + "github.com/flant/shell-operator/pkg/jq" kubeeventsmanager "github.com/flant/shell-operator/pkg/kube_events_manager" "github.com/flant/shell-operator/pkg/metrics" schedulemanager "github.com/flant/shell-operator/pkg/schedule_manager" @@ -29,7 +29,7 @@ func Init(logger *log.Logger) (*ShellOperator, error) { // Log version and filter implementation logger.Info(app.AppStartMessage) - logger.Debug(filter.Info()) + logger.Debug(jq.Info()) hooksDir, err := utils.RequireExistingDirectory(app.HooksDir) if err != nil { From 33798ad3dfd774627c1cae4d22f7542f2ef76692 Mon Sep 17 00:00:00 2001 From: Timur Tuktamyshev Date: Thu, 9 Oct 2025 15:49:50 +0300 Subject: [PATCH 7/7] chore: lint fix Signed-off-by: Timur Tuktamyshev --- pkg/filter/filter_test.go | 3 ++- pkg/kube/object_patch/operation.go | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/filter/filter_test.go b/pkg/filter/filter_test.go index 597a8bf8..061fc0cf 100644 --- a/pkg/filter/filter_test.go +++ b/pkg/filter/filter_test.go @@ -4,10 +4,11 @@ import ( "errors" "testing" - "github.com/flant/shell-operator/pkg/jq" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + + "github.com/flant/shell-operator/pkg/jq" ) // Helper function to create test objects diff --git a/pkg/kube/object_patch/operation.go b/pkg/kube/object_patch/operation.go index 1518123e..1af5c872 100644 --- a/pkg/kube/object_patch/operation.go +++ b/pkg/kube/object_patch/operation.go @@ -6,11 +6,12 @@ import ( "github.com/deckhouse/deckhouse/pkg/log" sdkpkg "github.com/deckhouse/module-sdk/pkg" - "github.com/flant/shell-operator/pkg/jq" "github.com/hashicorp/go-multierror" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/types" + + "github.com/flant/shell-operator/pkg/jq" ) // OperationSpec a JSON and YAML representation of the operation for shell hooks