From 93421185992830ea1f0d58486e533c43805292aa Mon Sep 17 00:00:00 2001 From: fjtirado Date: Mon, 11 May 2026 16:34:38 +0200 Subject: [PATCH] [Fix #1380] Adding AllStrategyCorrelationInfo customization. Signed-off-by: fjtirado --- .../impl/WorkflowApplication.java | 19 +++++ .../scheduler/AllStrategyCorrelationInfo.java | 30 ++++++++ .../AllStrategyCorrelationInfoFactory.java | 22 ++++++ .../InMemoryAllStrategyCorrelationInfo.java | 74 +++++++++++++++++++ .../scheduler/ScheduledEventConsumer.java | 54 ++++---------- 5 files changed, 159 insertions(+), 40 deletions(-) create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/AllStrategyCorrelationInfo.java create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/AllStrategyCorrelationInfoFactory.java create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/InMemoryAllStrategyCorrelationInfo.java diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java index 08b22cdbf..489e60206 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java @@ -43,7 +43,9 @@ import io.serverlessworkflow.impl.resources.ExternalResourceHandler; import io.serverlessworkflow.impl.resources.ResourceLoaderFactory; import io.serverlessworkflow.impl.resources.URITemplateResolver; +import io.serverlessworkflow.impl.scheduler.AllStrategyCorrelationInfoFactory; import io.serverlessworkflow.impl.scheduler.DefaultWorkflowScheduler; +import io.serverlessworkflow.impl.scheduler.InMemoryAllStrategyCorrelationInfo; import io.serverlessworkflow.impl.scheduler.WorkflowScheduler; import io.serverlessworkflow.impl.schema.SchemaValidator; import io.serverlessworkflow.impl.schema.SchemaValidatorFactory; @@ -93,6 +95,7 @@ public class WorkflowApplication implements AutoCloseable { private final URI defaultCatalogURI; private final Collection callableProxyBuilders; private final CloudEventPredicateFactory cloudEventPredicateFactory; + private final AllStrategyCorrelationInfoFactory allStrategyCorrelationInfoFactory; private WorkflowApplication(Builder builder) { this.taskFactory = builder.taskFactory; @@ -122,6 +125,7 @@ private WorkflowApplication(Builder builder) { this.id = builder.id; this.callableProxyBuilders = builder.callableProxyBuilders; this.cloudEventPredicateFactory = builder.cloudEventPredicateFactory; + this.allStrategyCorrelationInfoFactory = builder.allStrategyCorrelationInfoFactory; } public TaskExecutorFactory taskFactory() { @@ -240,6 +244,7 @@ public SchemaValidator getValidator(SchemaInline inline) { private Optional functionReader; private URI defaultCatalogURI; private CloudEventPredicateFactory cloudEventPredicateFactory; + private AllStrategyCorrelationInfoFactory allStrategyCorrelationInfoFactory; private Builder() { ServiceLoader.load(NamedWorkflowAdditionalObject.class) @@ -372,6 +377,12 @@ public Builder withCloudEventPredicateFactory( return this; } + public Builder withAllStrategyCorrelationInfoFactory( + AllStrategyCorrelationInfoFactory allStrategyCorrelationInfoFactory) { + this.allStrategyCorrelationInfoFactory = allStrategyCorrelationInfoFactory; + return this; + } + public WorkflowApplication build() { if (modelFactory == null) { @@ -432,6 +443,10 @@ public WorkflowApplication build() { loadFirst(CloudEventPredicateFactory.class) .orElseGet(() -> new DefaultCloudEventPredicateFactory()); } + if (allStrategyCorrelationInfoFactory == null) { + allStrategyCorrelationInfoFactory = definition -> new InMemoryAllStrategyCorrelationInfo(); + } + if (defaultCatalogURI == null) { defaultCatalogURI = URI.create("https://github.com/serverlessworkflow/catalog"); } @@ -559,4 +574,8 @@ public Optional additionalObject( public Collection callableProxyBuilders() { return callableProxyBuilders; } + + public AllStrategyCorrelationInfoFactory allStrategyCorrelationInfoFactory() { + return allStrategyCorrelationInfoFactory; + } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/AllStrategyCorrelationInfo.java b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/AllStrategyCorrelationInfo.java new file mode 100644 index 000000000..e9e73f025 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/AllStrategyCorrelationInfo.java @@ -0,0 +1,30 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.scheduler; + +import io.cloudevents.CloudEvent; +import io.serverlessworkflow.impl.events.EventRegistrationBuilder; +import java.util.Collection; +import java.util.function.Consumer; + +public interface AllStrategyCorrelationInfo extends AutoCloseable { + void correlate( + EventRegistrationBuilder reg, CloudEvent event, Consumer> starter); + + void register(EventRegistrationBuilder reg); + + default void close() {} +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/AllStrategyCorrelationInfoFactory.java b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/AllStrategyCorrelationInfoFactory.java new file mode 100644 index 000000000..4d6e34478 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/AllStrategyCorrelationInfoFactory.java @@ -0,0 +1,22 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.scheduler; + +import io.serverlessworkflow.impl.WorkflowDefinition; +import java.util.function.Function; + +public interface AllStrategyCorrelationInfoFactory + extends Function {} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/InMemoryAllStrategyCorrelationInfo.java b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/InMemoryAllStrategyCorrelationInfo.java new file mode 100644 index 000000000..4fcf7a9d1 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/InMemoryAllStrategyCorrelationInfo.java @@ -0,0 +1,74 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.scheduler; + +import io.cloudevents.CloudEvent; +import io.serverlessworkflow.impl.events.EventRegistrationBuilder; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; + +public class InMemoryAllStrategyCorrelationInfo implements AllStrategyCorrelationInfo { + + private Map> correlatedEvents; + + @Override + public void correlate( + EventRegistrationBuilder reg, CloudEvent event, Consumer> starter) { + Collection collection = new ArrayList<>(); + // to minimize the critical section, conversion is done later, here we are + // performing just collection, if any + synchronized (correlatedEvents) { + correlatedEvents.get(reg).add(event); + Collection> events = correlatedEvents.values(); + if (satisfyCondition(events)) { + for (List values : events) { + collection.add(values.remove(0)); + } + } + } + if (!collection.isEmpty()) { + starter.accept(collection); + } + } + + @Override + public void register(EventRegistrationBuilder reg) { + if (correlatedEvents == null) { + correlatedEvents = new HashMap<>(); + } + correlatedEvents.put(reg, new ArrayList()); + } + + private boolean satisfyCondition(Collection> events) { + for (List values : events) { + if (values.isEmpty()) { + return false; + } + } + return true; + } + + @Override + public void close() { + if (correlatedEvents != null) { + correlatedEvents.clear(); + } + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledEventConsumer.java b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledEventConsumer.java index 01f3f6f7f..6443c4667 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledEventConsumer.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledEventConsumer.java @@ -15,19 +15,17 @@ */ package io.serverlessworkflow.impl.scheduler; +import static io.serverlessworkflow.impl.WorkflowUtils.safeClose; + import io.cloudevents.CloudEvent; import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.WorkflowModelCollection; import io.serverlessworkflow.impl.events.EventConsumer; import io.serverlessworkflow.impl.events.EventRegistration; -import io.serverlessworkflow.impl.events.EventRegistrationBuilder; import io.serverlessworkflow.impl.events.EventRegistrationBuilderInfo; import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import java.util.function.Function; public class ScheduledEventConsumer implements AutoCloseable { @@ -37,8 +35,8 @@ public class ScheduledEventConsumer implements AutoCloseable { private final EventRegistrationBuilderInfo builderInfo; private final EventConsumer eventConsumer; private final ScheduledInstanceRunnable instanceRunner; - private Map> correlatedEvents; - private Collection registrations = new ArrayList<>(); + private final Collection registrations = new ArrayList<>(); + private AllStrategyCorrelationInfo allStrategyCorrelationInfo; public ScheduledEventConsumer( WorkflowDefinition definition, @@ -50,17 +48,23 @@ public ScheduledEventConsumer( this.builderInfo = builderInfo; this.instanceRunner = instanceRunner; this.eventConsumer = definition.application().eventConsumer(); + if (builderInfo.registrations().isAnd() && builderInfo.registrations().registrations().size() > 1) { - this.correlatedEvents = new HashMap<>(); + this.allStrategyCorrelationInfo = + definition.application().allStrategyCorrelationInfoFactory().apply(definition); builderInfo .registrations() .registrations() .forEach( reg -> { - correlatedEvents.put(reg, new ArrayList<>()); + allStrategyCorrelationInfo.register(reg); registrations.add( - eventConsumer.register(reg, ce -> consumeEvent(reg, (CloudEvent) ce))); + eventConsumer.register( + reg, + ce -> + allStrategyCorrelationInfo.correlate( + reg, (CloudEvent) ce, this::start))); }); } else { builderInfo @@ -71,34 +75,6 @@ public ScheduledEventConsumer( } } - private void consumeEvent(EventRegistrationBuilder reg, CloudEvent ce) { - Collection> collections = new ArrayList<>(); - // to minimize the critical section, conversion is done later, here we are - // performing - // just collection, if any - synchronized (correlatedEvents) { - correlatedEvents.get(reg).add((CloudEvent) ce); - while (satisfyCondition()) { - Collection collection = new ArrayList<>(); - for (List values : correlatedEvents.values()) { - collection.add(values.remove(0)); - } - collections.add(collection); - } - } - // convert and start outside synchronized - collections.forEach(this::start); - } - - private boolean satisfyCondition() { - for (List values : correlatedEvents.values()) { - if (values.isEmpty()) { - return false; - } - } - return true; - } - protected void start(CloudEvent ce) { WorkflowModelCollection model = definition.application().modelFactory().createCollection(); model.add(converter.apply(ce)); @@ -112,9 +88,7 @@ protected void start(Collection ces) { } public void close() { - if (correlatedEvents != null) { - correlatedEvents.clear(); - } registrations.forEach(eventConsumer::unregister); + safeClose(allStrategyCorrelationInfo); } }