Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker.streaming;

/**
* Opaque handle that is passed to a work item's processing function so that it can request
* additional queued work from {@code BoundedQueueExecutor} via {@code
* BoundedQueueExecutor#pollWork}.
*
* <p>The interface intentionally declares no methods: callers only receive a handle and pass it
* back to the executor unchanged.
*/
public interface BoundedQueueExecutorWorkHandle {}
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,29 @@
package org.apache.beam.runners.dataflow.worker.streaming;

import com.google.auto.value.AutoValue;
import java.util.function.Consumer;
import java.util.function.BiConsumer;
import org.apache.beam.runners.dataflow.worker.util.ExceptionUtils;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;

/** {@link Work} instance and a processing function used to process the work. */
@AutoValue
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we remove the autovalue? It is pretty simple class and we run it a lot, could benefit from just being a final class without virtual function overhead

public abstract class ExecutableWork implements Runnable {
public abstract class ExecutableWork {

public static ExecutableWork create(Work work, Consumer<Work> executeWorkFn) {
public static ExecutableWork create(
Work work, BiConsumer<Work, BoundedQueueExecutorWorkHandle> executeWorkFn) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment on executeWorkFn and what handle is

return new AutoValue_ExecutableWork(work, executeWorkFn);
}

public abstract Work work();

public abstract Consumer<Work> executeWorkFn();
public abstract BiConsumer<Work, BoundedQueueExecutorWorkHandle> executeWorkFn();

@Override
public void run() {
executeWorkFn().accept(work());
public void run(BoundedQueueExecutorWorkHandle handle) {
try {
executeWorkFn().accept(work(), handle);
} catch (Throwable t) {
throw ExceptionUtils.propagate(t);
}
}

public final WorkId id() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@
*/
package org.apache.beam.runners.dataflow.worker.util;

import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.runners.dataflow.worker.streaming.BoundedQueueExecutorWorkHandle;
import org.apache.beam.runners.dataflow.worker.streaming.ExecutableWork;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor.Guard;

Expand All @@ -38,7 +43,19 @@ public class BoundedQueueExecutor {

// Used to guard elementsOutstanding and bytesOutstanding.
private final Monitor monitor;
private final ConcurrentLinkedQueue<Long> decrementQueue = new ConcurrentLinkedQueue<>();

/**
 * Immutable pair of an element count and a byte count. Instances are placed on the decrement
 * queue and later summed when the outstanding counters are reduced.
 */
private static class Budget {

  /** Number of work elements represented by this entry. */
  final int elements;

  /** Number of bytes represented by this entry. */
  final long bytes;

  Budget(int elementCount, long byteCount) {
    elements = elementCount;
    bytes = byteCount;
  }
}

private final ConcurrentLinkedQueue<Budget> decrementQueue = new ConcurrentLinkedQueue<>();
private final Object decrementQueueDrainLock = new Object();
private final AtomicBoolean isDecrementBatchPending = new AtomicBoolean(false);
private int elementsOutstanding = 0;
Expand Down Expand Up @@ -106,7 +123,7 @@ protected void afterExecute(Runnable r, Throwable t) {

// Before adding a Work to the queue, check that there are enough bytes of space or no other
// outstanding elements of work.
public void execute(Runnable work, long workBytes) {
public void execute(ExecutableWork work, long workBytes) {
monitor.enterWhenUninterruptibly(
new Guard(monitor) {
@Override
Expand All @@ -119,12 +136,18 @@ public boolean isSatisfied() {
executeMonitorHeld(work, workBytes);
}

// Forcibly add something to the queue, ignoring the length limit.
public void forceExecute(Runnable work, long workBytes) {
// Forcibly add ExecutableWork to the queue, ignoring the limits.
public void forceExecute(ExecutableWork work, long workBytes) {
monitor.enter();
executeMonitorHeld(work, workBytes);
}

/**
* Forcibly executes a {@link Runnable} callback, accounting for it as one outstanding element
* with 0 bytes of size.
*
* <p>The monitor is entered without waiting on any guard condition, so this call does not wait
* for the limits that {@code execute} waits on. The monitor is released inside {@code
* executeMonitorHeld}, not here.
*
* @param runnable the callback to run
*/
public void forceExecute(Runnable runnable) {
monitor.enter();
executeMonitorHeld(runnable);
}

// Set the maximum/core pool size of the executor.
public synchronized void setMaximumPoolSize(int maximumPoolSize, int maximumElementsOutstanding) {
// For ThreadPoolExecutor, the maximum pool size should always be greater than or equal to core
Expand Down Expand Up @@ -221,8 +244,87 @@ public String summaryHtml() {
}
}

private void executeMonitorHeld(Runnable work, long workBytes) {
class BoundedQueueExecutorWorkHandleImpl
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final?

implements BoundedQueueExecutorWorkHandle, AutoCloseable {

private int elements;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add GuardedBy("this") annotations to these

private long bytes;
private boolean closed = false;

private BoundedQueueExecutorWorkHandleImpl(int elements, long bytes) {
this.elements = elements;
this.bytes = bytes;
}

public synchronized void addBudget(int elements, long bytes) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we enforce elements and bytes being non-negative?

Preconditions.checkState(!closed, "Cannot add budget to a closed WorkBudgetHandle");
this.elements += elements;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we incrementCounters here to match decrementCounters in close?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see below comment on merge function first

this.bytes += bytes;
}

public synchronized void cancel() {
this.closed = true;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we call close() instead? seems like we are going to not decrement the counters properly

If this is intended I think it could have better method name and comments

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see below comment on merge function first

}

/**
 * Releases the budget held by this handle by passing its element and byte totals to {@code
 * decrementCounters}. Only the first call on a handle that is not already marked closed has any
 * effect; later calls return without doing anything.
 */
@Override
public synchronized void close() {
  if (!closed) {
    // Mark closed before releasing so the budget is never handed back twice.
    closed = true;
    decrementCounters(elements, bytes);
  }
}
Comment thread
arunpandianp marked this conversation as resolved.
Comment thread
arunpandianp marked this conversation as resolved.
}

private static class QueuedWork implements Runnable {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final?


private final ExecutableWork work;
private final BoundedQueueExecutorWorkHandleImpl handle;
private final long workBytes;

public QueuedWork(
ExecutableWork work, BoundedQueueExecutorWorkHandleImpl handle, long workBytes) {
this.work = work;
this.handle = handle;
this.workBytes = workBytes;
}

public void cancelHandle() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment

handle.cancel();
}

public ExecutableWork getWork() {
return work;
}

public long getWorkBytes() {
return workBytes;
}

@Override
public void run() {
Preconditions.checkArgument(!handle.closed);
try {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since it's autocloseable could you do
try (handle) {
work.run(handle);
}

work.run(handle);
} finally {
handle.close();
}
}
}

private void executeMonitorHeld(ExecutableWork work, long workBytes) {
++elementsOutstanding;
bytesOutstanding += workBytes;
monitor.leave();
BoundedQueueExecutorWorkHandleImpl handle =
new BoundedQueueExecutorWorkHandleImpl(1, workBytes);
try {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use
try (BoundedQueueExecutorWorkHandleImpl handle = new BoundedQueueExecutorWorkHandleImpl(1, workBytes)) {

}

executor.execute(new QueuedWork(work, handle, workBytes));
} catch (Throwable e) {
handle.close();
throw ExceptionUtils.propagate(e);
}
}

private void executeMonitorHeld(Runnable work) {
++elementsOutstanding;
monitor.leave();

Expand All @@ -232,21 +334,48 @@ private void executeMonitorHeld(Runnable work, long workBytes) {
try {
work.run();
} finally {
decrementCounters(workBytes);
decrementCounters(1, 0L);
}
});
} catch (RuntimeException e) {
// If the execute() call threw an exception, decrement counters here.
decrementCounters(workBytes);
throw e;
} catch (Throwable e) {
decrementCounters(1, 0L);
throw ExceptionUtils.propagate(e);
}
}

/**
* Creates a work handle that starts out holding no budget (0 elements and 0 bytes). Exposed for
* tests only.
*
* @return a new handle with an empty budget
*/
@VisibleForTesting
BoundedQueueExecutorWorkHandleImpl createEmptyBudgetHandle() {
return new BoundedQueueExecutorWorkHandleImpl(0, 0L);
}

/**
* Polls additional work to be executed inline within the current execute(ExecutableWork work,
* long workBytes) call. It is the responsibility of the caller to execute or discard the returned
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does discard mean?

* ExecutableWork. Budget for the returned work is released when the execute() call finishes.
*
* @param handle the handle that was passed to ExecutableWork.executeWorkFn
*/
public Optional<ExecutableWork> pollWork(BoundedQueueExecutorWorkHandle handle) {
Preconditions.checkArgument(handle instanceof BoundedQueueExecutorWorkHandleImpl);
BoundedQueueExecutorWorkHandleImpl internalHandle = (BoundedQueueExecutorWorkHandleImpl) handle;
Comment thread
arunpandianp marked this conversation as resolved.
Comment thread
arunpandianp marked this conversation as resolved.
while (true) {
Runnable runnable = executor.getQueue().poll();
if (runnable == null) {
return Optional.empty();
}
if (runnable instanceof QueuedWork) {
QueuedWork queuedWork = (QueuedWork) runnable;
queuedWork.cancelHandle();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if this is the only use of cancel/addBudget, you could instead have way for a handle to consume/merge with another

I think that more limited API makes it clearer that the budget is already accounted to answering some of my questions above.

internalHandle.addBudget(1, queuedWork.getWorkBytes());
return Optional.of(queuedWork.getWork());
}
// Pop and execute standard callbacks immediately on the calling thread to drain the queue
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what are the standard callbacks? It seems like this could be undesirable to pause execution of a work item to try to merge stuff and then end up blocking on some other callback for potentially a long time.

Other ways to handle this could be:

  • don't remove the runnable and just stop iterating here
  • pull off runnables and put them back (if they are rare)
  • have separate queues

runnable.run();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you do run it here we need to think about how to handle the possible exception as well.

}
Comment thread
arunpandianp marked this conversation as resolved.
}

private void decrementCounters(long workBytes) {
// All threads queue decrements and one thread grabs the monitor and updates
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

keep comment?

// counters. We do this to reduce contention on monitor which is locked by
// GetWork thread
decrementQueue.add(workBytes);
private void decrementCounters(int elements, long bytes) {
decrementQueue.add(new Budget(elements, bytes));
boolean submittedToExistingBatch = isDecrementBatchPending.getAndSet(true);
if (submittedToExistingBatch) {
// There is already a thread about to drain the decrement queue
Expand All @@ -265,12 +394,12 @@ private void decrementCounters(long workBytes) {
long bytesToDecrement = 0;
int elementsToDecrement = 0;
while (true) {
Long pollResult = decrementQueue.poll();
Budget pollResult = decrementQueue.poll();
if (pollResult == null) {
break;
}
bytesToDecrement += pollResult;
++elementsToDecrement;
bytesToDecrement += pollResult.bytes;
elementsToDecrement += pollResult.elements;
}
if (elementsToDecrement == 0) {
return;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker.util;

import org.apache.beam.sdk.annotations.Internal;

/** Utility methods for simplifying work with exceptions and throwables. */
@Internal
public final class ExceptionUtils {

  /** Not instantiable: this class only hosts static helpers. */
  private ExceptionUtils() {}

  /**
   * Rethrows {@code throwable}, wrapping it only when that is unavoidable.
   *
   * <ul>
   *   <li>An {@link Error} or a {@link RuntimeException} is rethrown unchanged (same instance).
   *   <li>Anything else is wrapped in a new {@code RuntimeException} whose cause is {@code
   *       throwable}.
   * </ul>
   *
   * @param throwable the failure to rethrow
   * @return never returns normally; the declared return type only exists so that callers can
   *     write {@code throw ExceptionUtils.propagate(t);} and satisfy the compiler's flow analysis
   */
  public static RuntimeException propagate(Throwable throwable) {
    if (throwable instanceof Error) {
      throw (Error) throwable;
    }
    if (throwable instanceof RuntimeException) {
      throw (RuntimeException) throwable;
    }
    // Checked exceptions (and any other Throwable) cannot be rethrown directly from here.
    throw new RuntimeException(throwable);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public void finalizeCommits(Iterable<Long> finalizeIds) {
}
for (Runnable callback : callbacksToExecute) {
try {
finalizationExecutor.forceExecute(callback, 0);
finalizationExecutor.forceExecute(callback);
} catch (OutOfMemoryError oom) {
throw oom;
} catch (Throwable t) {
Expand Down
Loading
Loading