-
Notifications
You must be signed in to change notification settings - Fork 4.6k
[Dataflow Streaming] Prepare BoundedQueueExecutor for MultiKey bundles #38592
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,24 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.beam.runners.dataflow.worker.streaming; | ||
|
|
||
| /** | ||
| * A handle to use when requesting more work from {@link | ||
| * org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor} via its {@code pollWork} | ||
| * method. | ||
| * | ||
| * <p>This is a marker interface that declares no methods. The handle is supplied to the work's | ||
| * processing function when the work is run, and is passed back unchanged to {@code pollWork}. | ||
| */ | ||
| public interface BoundedQueueExecutorWorkHandle {} |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,24 +18,29 @@ | |
| package org.apache.beam.runners.dataflow.worker.streaming; | ||
|
|
||
| import com.google.auto.value.AutoValue; | ||
| import java.util.function.Consumer; | ||
| import java.util.function.BiConsumer; | ||
| import org.apache.beam.runners.dataflow.worker.util.ExceptionUtils; | ||
| import org.apache.beam.runners.dataflow.worker.windmill.Windmill; | ||
|
|
||
| /** {@link Work} instance and a processing function used to process the work. */ | ||
| @AutoValue | ||
| public abstract class ExecutableWork implements Runnable { | ||
| public abstract class ExecutableWork { | ||
|
|
||
| public static ExecutableWork create(Work work, Consumer<Work> executeWorkFn) { | ||
| public static ExecutableWork create( | ||
| Work work, BiConsumer<Work, BoundedQueueExecutorWorkHandle> executeWorkFn) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. comment on executeWorkFn and what handle is |
||
| return new AutoValue_ExecutableWork(work, executeWorkFn); | ||
| } | ||
|
|
||
| public abstract Work work(); | ||
|
|
||
| public abstract Consumer<Work> executeWorkFn(); | ||
| public abstract BiConsumer<Work, BoundedQueueExecutorWorkHandle> executeWorkFn(); | ||
|
|
||
| @Override | ||
| public void run() { | ||
| executeWorkFn().accept(work()); | ||
| public void run(BoundedQueueExecutorWorkHandle handle) { | ||
| try { | ||
| executeWorkFn().accept(work(), handle); | ||
| } catch (Throwable t) { | ||
| throw ExceptionUtils.propagate(t); | ||
| } | ||
| } | ||
|
|
||
| public final WorkId id() { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,13 +17,18 @@ | |
| */ | ||
| package org.apache.beam.runners.dataflow.worker.util; | ||
|
|
||
| import java.util.Optional; | ||
| import java.util.concurrent.ConcurrentLinkedQueue; | ||
| import java.util.concurrent.LinkedBlockingQueue; | ||
| import java.util.concurrent.ThreadFactory; | ||
| import java.util.concurrent.ThreadPoolExecutor; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
| import javax.annotation.concurrent.GuardedBy; | ||
| import org.apache.beam.runners.dataflow.worker.streaming.BoundedQueueExecutorWorkHandle; | ||
| import org.apache.beam.runners.dataflow.worker.streaming.ExecutableWork; | ||
| import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; | ||
| import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; | ||
| import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor; | ||
| import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor.Guard; | ||
|
|
||
|
|
@@ -38,7 +43,19 @@ public class BoundedQueueExecutor { | |
|
|
||
| // Used to guard elementsOutstanding and bytesOutstanding. | ||
| private final Monitor monitor; | ||
| private final ConcurrentLinkedQueue<Long> decrementQueue = new ConcurrentLinkedQueue<>(); | ||
|
|
||
| private static class Budget { | ||
|
|
||
| final int elements; | ||
| final long bytes; | ||
|
|
||
| Budget(int elements, long bytes) { | ||
| this.elements = elements; | ||
| this.bytes = bytes; | ||
| } | ||
| } | ||
|
|
||
| private final ConcurrentLinkedQueue<Budget> decrementQueue = new ConcurrentLinkedQueue<>(); | ||
| private final Object decrementQueueDrainLock = new Object(); | ||
| private final AtomicBoolean isDecrementBatchPending = new AtomicBoolean(false); | ||
| private int elementsOutstanding = 0; | ||
|
|
@@ -106,7 +123,7 @@ protected void afterExecute(Runnable r, Throwable t) { | |
|
|
||
| // Before adding a Work to the queue, check that there are enough bytes of space or no other | ||
| // outstanding elements of work. | ||
| public void execute(Runnable work, long workBytes) { | ||
| public void execute(ExecutableWork work, long workBytes) { | ||
| monitor.enterWhenUninterruptibly( | ||
| new Guard(monitor) { | ||
| @Override | ||
|
|
@@ -119,12 +136,18 @@ public boolean isSatisfied() { | |
| executeMonitorHeld(work, workBytes); | ||
| } | ||
|
|
||
| // Forcibly add something to the queue, ignoring the length limit. | ||
| public void forceExecute(Runnable work, long workBytes) { | ||
| // Forcibly add ExecutableWork to the queue, ignoring the limits. | ||
| public void forceExecute(ExecutableWork work, long workBytes) { | ||
| monitor.enter(); | ||
| executeMonitorHeld(work, workBytes); | ||
| } | ||
|
|
||
| /** Forcibly execute a Runnable callback with 0 bytes of size. */ | ||
| public void forceExecute(Runnable runnable) { | ||
| monitor.enter(); | ||
| executeMonitorHeld(runnable); | ||
| } | ||
|
|
||
| // Set the maximum/core pool size of the executor. | ||
| public synchronized void setMaximumPoolSize(int maximumPoolSize, int maximumElementsOutstanding) { | ||
| // For ThreadPoolExecutor, the maximum pool size should always be greater than or equal to core | ||
|
|
@@ -221,8 +244,87 @@ public String summaryHtml() { | |
| } | ||
| } | ||
|
|
||
| private void executeMonitorHeld(Runnable work, long workBytes) { | ||
| class BoundedQueueExecutorWorkHandleImpl | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. add a comment
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. final? |
||
| implements BoundedQueueExecutorWorkHandle, AutoCloseable { | ||
|
|
||
| private int elements; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. add GuardedBy("this") annotations to these |
||
| private long bytes; | ||
| private boolean closed = false; | ||
|
|
||
| private BoundedQueueExecutorWorkHandleImpl(int elements, long bytes) { | ||
| this.elements = elements; | ||
| this.bytes = bytes; | ||
| } | ||
|
|
||
| public synchronized void addBudget(int elements, long bytes) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we enforce elements and bytes being non-negative? |
||
| Preconditions.checkState(!closed, "Cannot add budget to a closed WorkBudgetHandle"); | ||
| this.elements += elements; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we incrementCounters here to match decrementCounters in close?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. see below comment on merge function first |
||
| this.bytes += bytes; | ||
| } | ||
|
|
||
| public synchronized void cancel() { | ||
| this.closed = true; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Should we call close() instead? It seems like we are not going to decrement the counters properly. If this is intended, I think it could have a better method name and comments.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. see below comment on merge function first |
||
| } | ||
|
|
||
| @Override | ||
| public synchronized void close() { | ||
| if (closed) return; | ||
| closed = true; | ||
| decrementCounters(this.elements, this.bytes); | ||
| } | ||
|
arunpandianp marked this conversation as resolved.
arunpandianp marked this conversation as resolved.
|
||
| } | ||
|
|
||
| private static class QueuedWork implements Runnable { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. final? |
||
|
|
||
| private final ExecutableWork work; | ||
| private final BoundedQueueExecutorWorkHandleImpl handle; | ||
| private final long workBytes; | ||
|
|
||
| public QueuedWork( | ||
| ExecutableWork work, BoundedQueueExecutorWorkHandleImpl handle, long workBytes) { | ||
| this.work = work; | ||
| this.handle = handle; | ||
| this.workBytes = workBytes; | ||
| } | ||
|
|
||
| public void cancelHandle() { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. comment |
||
| handle.cancel(); | ||
| } | ||
|
|
||
| public ExecutableWork getWork() { | ||
| return work; | ||
| } | ||
|
|
||
| public long getWorkBytes() { | ||
| return workBytes; | ||
| } | ||
|
|
||
| @Override | ||
| public void run() { | ||
| Preconditions.checkArgument(!handle.closed); | ||
| try { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. since it's autocloseable could you do |
||
| work.run(handle); | ||
| } finally { | ||
| handle.close(); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private void executeMonitorHeld(ExecutableWork work, long workBytes) { | ||
| ++elementsOutstanding; | ||
| bytesOutstanding += workBytes; | ||
| monitor.leave(); | ||
| BoundedQueueExecutorWorkHandleImpl handle = | ||
| new BoundedQueueExecutorWorkHandleImpl(1, workBytes); | ||
| try { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. use } |
||
| executor.execute(new QueuedWork(work, handle, workBytes)); | ||
| } catch (Throwable e) { | ||
| handle.close(); | ||
| throw ExceptionUtils.propagate(e); | ||
| } | ||
| } | ||
|
|
||
| private void executeMonitorHeld(Runnable work) { | ||
| ++elementsOutstanding; | ||
| monitor.leave(); | ||
|
|
||
|
|
@@ -232,21 +334,48 @@ private void executeMonitorHeld(Runnable work, long workBytes) { | |
| try { | ||
| work.run(); | ||
| } finally { | ||
| decrementCounters(workBytes); | ||
| decrementCounters(1, 0L); | ||
| } | ||
| }); | ||
| } catch (RuntimeException e) { | ||
| // If the execute() call threw an exception, decrement counters here. | ||
| decrementCounters(workBytes); | ||
| throw e; | ||
| } catch (Throwable e) { | ||
| decrementCounters(1, 0L); | ||
| throw ExceptionUtils.propagate(e); | ||
| } | ||
| } | ||
|
|
||
| @VisibleForTesting | ||
| BoundedQueueExecutorWorkHandleImpl createEmptyBudgetHandle() { | ||
| return new BoundedQueueExecutorWorkHandleImpl(0, 0L); | ||
| } | ||
|
|
||
| /** | ||
| * Poll additional work to be executed inline within the current execute(ExecutableWork work, | ||
| * long workBytes) call. It is the responsibility of the caller to execute or discard the returned | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what does discard mean? |
||
| * ExecutableWork. Budget for the returned work is released when the execute() call finishes. | ||
| * | ||
| * @param handle the handle that was passed to ExecutableWork.executeWorkFn | ||
| */ | ||
| public Optional<ExecutableWork> pollWork(BoundedQueueExecutorWorkHandle handle) { | ||
| Preconditions.checkArgument(handle instanceof BoundedQueueExecutorWorkHandleImpl); | ||
| BoundedQueueExecutorWorkHandleImpl internalHandle = (BoundedQueueExecutorWorkHandleImpl) handle; | ||
|
arunpandianp marked this conversation as resolved.
arunpandianp marked this conversation as resolved.
|
||
| while (true) { | ||
| Runnable runnable = executor.getQueue().poll(); | ||
| if (runnable == null) { | ||
| return Optional.empty(); | ||
| } | ||
| if (runnable instanceof QueuedWork) { | ||
| QueuedWork queuedWork = (QueuedWork) runnable; | ||
| queuedWork.cancelHandle(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If this is the only use of cancel/addBudget, you could instead have a way for a handle to consume/merge with another. I think that more limited API makes it clearer that the budget is already accounted for, answering some of my questions above. |
||
| internalHandle.addBudget(1, queuedWork.getWorkBytes()); | ||
| return Optional.of(queuedWork.getWork()); | ||
| } | ||
| // Pop and execute standard callbacks immediately on the calling thread to drain the queue | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what are the standard callbacks? It seems like this could be undesirable to pause execution of a work item to try to merge stuff and then end up blocking on some other callback for potentially a long time. Other ways to handle this could be:
|
||
| runnable.run(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if you do run it here we need to think about how to handle the possible exception as well. |
||
| } | ||
|
arunpandianp marked this conversation as resolved.
|
||
| } | ||
|
|
||
| private void decrementCounters(long workBytes) { | ||
| // All threads queue decrements and one thread grabs the monitor and updates | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. keep comment? |
||
| // counters. We do this to reduce contention on monitor which is locked by | ||
| // GetWork thread | ||
| decrementQueue.add(workBytes); | ||
| private void decrementCounters(int elements, long bytes) { | ||
| decrementQueue.add(new Budget(elements, bytes)); | ||
| boolean submittedToExistingBatch = isDecrementBatchPending.getAndSet(true); | ||
| if (submittedToExistingBatch) { | ||
| // There is already a thread about to drain the decrement queue | ||
|
|
@@ -265,12 +394,12 @@ private void decrementCounters(long workBytes) { | |
| long bytesToDecrement = 0; | ||
| int elementsToDecrement = 0; | ||
| while (true) { | ||
| Long pollResult = decrementQueue.poll(); | ||
| Budget pollResult = decrementQueue.poll(); | ||
| if (pollResult == null) { | ||
| break; | ||
| } | ||
| bytesToDecrement += pollResult; | ||
| ++elementsToDecrement; | ||
| bytesToDecrement += pollResult.bytes; | ||
| elementsToDecrement += pollResult.elements; | ||
| } | ||
| if (elementsToDecrement == 0) { | ||
| return; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,41 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.beam.runners.dataflow.worker.util; | ||
|
|
||
| import org.apache.beam.sdk.annotations.Internal; | ||
|
|
||
| /** Utility methods for simplifying work with exceptions and throwables. */ | ||
| @Internal | ||
| public final class ExceptionUtils { | ||
|
|
||
| private ExceptionUtils() {} | ||
|
|
||
| /** | ||
| * Propagates {@code throwable} as-is if it is an instance of {@link RuntimeException} or {@link | ||
| * Error}, or else as a last resort wraps it in a {@code RuntimeException} and then propagates. | ||
| * | ||
| * <p>This method always throws and never returns normally. The {@code RuntimeException} return | ||
| * type exists only so that callers can write {@code throw ExceptionUtils.propagate(t);}, which | ||
| * makes it clear to the compiler that control flow ends at the call site. | ||
| * | ||
| * @param throwable the throwable to rethrow; if it is neither a {@code RuntimeException} nor an | ||
| *     {@code Error}, it becomes the cause of the wrapping {@code RuntimeException} | ||
| * @return never returns; declared only to support the {@code throw propagate(...)} idiom | ||
| */ | ||
| public static RuntimeException propagate(Throwable throwable) { | ||
| if (throwable instanceof RuntimeException) { | ||
| throw (RuntimeException) throwable; | ||
| } else if (throwable instanceof Error) { | ||
| throw (Error) throwable; | ||
| } else { | ||
| throw new RuntimeException(throwable); | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we remove the AutoValue? It is a pretty simple class and we run it a lot; it could benefit from just being a final class without virtual function overhead.