Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@
package org.apache.hadoop.hdds.utils;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -37,20 +41,19 @@
*/
public abstract class BackgroundService {

protected static final Logger LOG =
LoggerFactory.getLogger(BackgroundService.class);
protected static final Logger LOG = LoggerFactory.getLogger(BackgroundService.class);

// Executor to launch child tasks
private ScheduledThreadPoolExecutor exec;
private UncheckedAutoCloseableSupplier<ScheduledExecutorService> periodicTaskScheduler;
private volatile ForkJoinPool exec;
private ThreadGroup threadGroup;
private final String serviceName;
private long interval;
private volatile long intervalInMillis;
private volatile long serviceTimeoutInNanos;
private TimeUnit unit;
private final int threadPoolSize;
private volatile int threadPoolSize;
private final String threadNamePrefix;
private final PeriodicalTask service;
private CompletableFuture<Void> future;
private volatile CompletableFuture<Void> future;
private volatile AtomicReference<Boolean> isShutdown;

public BackgroundService(String serviceName, long interval,
TimeUnit unit, int threadPoolSize, long serviceTimeout) {
Expand All @@ -60,15 +63,13 @@ public BackgroundService(String serviceName, long interval,
public BackgroundService(String serviceName, long interval,
TimeUnit unit, int threadPoolSize, long serviceTimeout,
String threadNamePrefix) {
this.interval = interval;
this.unit = unit;
setInterval(interval, unit);
this.serviceName = serviceName;
this.serviceTimeoutInNanos = TimeDuration.valueOf(serviceTimeout, unit)
.toLong(TimeUnit.NANOSECONDS);
this.threadPoolSize = threadPoolSize;
this.threadNamePrefix = threadNamePrefix;
initExecutorAndThreadGroup();
service = new PeriodicalTask();
this.future = CompletableFuture.completedFuture(null);
}

Expand All @@ -77,22 +78,23 @@ protected CompletableFuture<Void> getFuture() {
}

@VisibleForTesting
public synchronized ExecutorService getExecutorService() {
public synchronized ForkJoinPool getExecutorService() {
return this.exec;
}

public synchronized void setPoolSize(int size) {
/**
* Set the pool size for background service. This would require a shutdown and restart of the service for the
* change to take effect.
* @param size
*/
public void setPoolSize(int size) {
if (size <= 0) {
throw new IllegalArgumentException("Pool size must be positive.");
}

// In ScheduledThreadPoolExecutor, maximumPoolSize is Integer.MAX_VALUE
// the corePoolSize will always less maximumPoolSize.
// So we can directly set the corePoolSize
exec.setCorePoolSize(size);
this.threadPoolSize = size;
}

public synchronized void setServiceTimeoutInNanos(long newTimeout) {
public void setServiceTimeoutInNanos(long newTimeout) {
LOG.info("{} timeout is set to {} {}", serviceName, newTimeout, TimeUnit.NANOSECONDS.name().toLowerCase());
this.serviceTimeoutInNanos = newTimeout;
}
Expand All @@ -104,7 +106,7 @@ public int getThreadCount() {

@VisibleForTesting
public void runPeriodicalTaskNow() throws Exception {
BackgroundTaskQueue tasks = getTasks();
BackgroundTaskQueue tasks = getTasks(false);
while (!tasks.isEmpty()) {
tasks.poll().call();
}
Expand All @@ -116,18 +118,20 @@ public synchronized void start() {
if (exec == null || exec.isShutdown() || exec.isTerminated()) {
initExecutorAndThreadGroup();
}
LOG.info("Starting service {} with interval {} {}", serviceName,
interval, unit.name().toLowerCase());
exec.scheduleWithFixedDelay(service, 0, interval, unit);
LOG.info("Starting service {} with interval {} ms", serviceName, intervalInMillis);
exec.execute(new PeriodicalTask(periodicTaskScheduler.get()));
}

protected synchronized void setInterval(long newInterval, TimeUnit newUnit) {
this.interval = newInterval;
this.unit = newUnit;
protected void setInterval(long newInterval, TimeUnit newUnit) {
this.intervalInMillis = TimeDuration.valueOf(newInterval, newUnit).toLong(TimeUnit.MILLISECONDS);
}

protected synchronized long getIntervalMillis() {
return this.unit.toMillis(interval);
protected long getIntervalMillis() {
return intervalInMillis;
}

public BackgroundTaskQueue getTasks(boolean allowTasksToFork) {
return getTasks();
}

public abstract BackgroundTaskQueue getTasks();
Expand All @@ -138,84 +142,182 @@ protected void execTaskCompletion() { }
* Run one or more background tasks concurrently.
* Wait until all tasks to return the result.
*/
public class PeriodicalTask implements Runnable {
@Override
public void run() {
// wait for previous set of tasks to complete
try {
future.join();
} catch (RuntimeException e) {
LOG.error("Background service execution failed.", e);
} finally {
execTaskCompletion();
}
public class PeriodicalTask extends RecursiveAction {
private final Queue<BackgroundTask> tasksInFlight;
private final AtomicReference<Boolean> isShutdown;
private final ScheduledExecutorService scheduledExecuterService;

public PeriodicalTask(ScheduledExecutorService scheduledExecutorService) {
this.tasksInFlight = new LinkedList<>();
this.isShutdown = BackgroundService.this.isShutdown;
this.scheduledExecuterService = scheduledExecutorService;
}

private PeriodicalTask(PeriodicalTask other) {
this.tasksInFlight = other.tasksInFlight;
this.isShutdown = other.isShutdown;
this.scheduledExecuterService = other.scheduledExecuterService;
}

private boolean performIfNotShutdown(Runnable runnable) {
return isShutdown.updateAndGet((shutdown) -> {
if (!shutdown) {
runnable.run();
}
return shutdown;
});
}

private <T> boolean performIfNotShutdown(Consumer<T> consumer, T t) {
return isShutdown.updateAndGet((shutdown) -> {
if (!shutdown) {
consumer.accept(t);
}
return shutdown;
});
}

private boolean runTasks() {
if (LOG.isDebugEnabled()) {
LOG.debug("Running background service : {}", serviceName);
}
BackgroundTaskQueue tasks = getTasks();
if (isShutdown.get()) {
return false;
}
if (!tasksInFlight.isEmpty()) {
LOG.warn("Tasks are still in flight service {}. This should not happen schedule should only begin once all " +
"tasks from schedules have completed execution.", serviceName);
tasksInFlight.clear();
}

BackgroundTaskQueue tasks = getTasks(true);
if (tasks.isEmpty()) {
// No task found, or some problems to init tasks
// return and retry in next interval.
return;
return false;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Number of background tasks to execute : {}", tasks.size());
}
synchronized (BackgroundService.this) {
while (!tasks.isEmpty()) {
BackgroundTask task = tasks.poll();
future = future.thenCombine(CompletableFuture.runAsync(() -> {
long startTime = System.nanoTime();
try {
BackgroundTaskResult result = task.call();
if (LOG.isDebugEnabled()) {
LOG.debug("task execution result size {}", result.getSize());
}
} catch (Throwable e) {
LOG.error("Background task execution failed", e);
if (e instanceof Error) {
throw (Error) e;
}
} finally {
long endTime = System.nanoTime();
if (endTime - startTime > serviceTimeoutInNanos) {
LOG.warn("{} Background task execution took {}ns > {}ns(timeout)",
serviceName, endTime - startTime, serviceTimeoutInNanos);
}
}
}, exec).exceptionally(e -> null), (Void1, Void) -> null);
Consumer<BackgroundTask> taskForkHandler = task -> {
task.fork();
tasksInFlight.offer(task);
};
while (!tasks.isEmpty()) {
BackgroundTask task = tasks.poll();
// Fork and submit the task back to executor.
if (performIfNotShutdown(taskForkHandler, task)) {
return false;
}
}
Consumer<BackgroundTask> taskCompletionHandler = task -> {
BackgroundTask.BackgroundTaskForkResult result = task.join();
// Check for exception first in the task execution.
if (result.getThrowable() != null) {
LOG.error("Background task execution failed", result.getThrowable());
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("task execution result size {}", result.getResult().getSize());
}
}
if (result.getTotalExecutionTime() > serviceTimeoutInNanos) {
LOG.warn("{} Background task execution took {}ns > {}ns(timeout)",
serviceName, result.getTotalExecutionTime(), serviceTimeoutInNanos);
}
};
while (!tasksInFlight.isEmpty()) {
BackgroundTask taskInFlight = tasksInFlight.poll();
// Join the tasks forked before and wait for the result one by one.
if (performIfNotShutdown(taskCompletionHandler, taskInFlight)) {
return false;
}
}
return true;
}

private void scheduleNextTask() {
performIfNotShutdown(() -> {
if (scheduledExecuterService != null) {
scheduledExecuterService.schedule(() -> exec.submit(new PeriodicalTask(this)),
intervalInMillis, TimeUnit.MILLISECONDS);
}
});
}

@Override
public void compute() {
future = new CompletableFuture<>();
if (runTasks()) {
scheduleNextTask();
} else {
LOG.debug("Service {} is shutdown. Cancelling all schedules of all tasks.", serviceName);
}
future.complete(null);
}
}

// shutdown and make sure all threads are properly released.
public synchronized void shutdown() {
public void shutdown() {
LOG.info("Shutting down service {}", this.serviceName);
exec.shutdown();
try {
if (!exec.awaitTermination(60, TimeUnit.SECONDS)) {
exec.shutdownNow();
final ThreadGroup threadGroupToBeClosed;
final ForkJoinPool execToShutdown;
final UncheckedAutoCloseableSupplier<ScheduledExecutorService> periodicTaskSchedulerToBeClosed;
// Set the shutdown flag to true to prevent new tasks from being submitted.
synchronized (this) {
periodicTaskSchedulerToBeClosed = periodicTaskScheduler;
threadGroupToBeClosed = threadGroup;
execToShutdown = exec;
exec = null;
threadGroup = null;
periodicTaskScheduler = null;
if (isShutdown != null) {
this.isShutdown.set(true);
}
isShutdown = null;
}
if (execToShutdown != null) {
execToShutdown.shutdown();
try {
if (!execToShutdown.awaitTermination(60, TimeUnit.SECONDS)) {
execToShutdown.shutdownNow();
}
} catch (InterruptedException e) {
// Re-interrupt the thread while catching InterruptedException
Thread.currentThread().interrupt();
execToShutdown.shutdownNow();
}
} catch (InterruptedException e) {
// Re-interrupt the thread while catching InterruptedException
Thread.currentThread().interrupt();
exec.shutdownNow();
}
if (threadGroup.activeCount() == 0 && !threadGroup.isDestroyed()) {
threadGroup.destroy();
if (periodicTaskSchedulerToBeClosed != null) {
periodicTaskSchedulerToBeClosed.close();
}
if (threadGroupToBeClosed != null && !threadGroupToBeClosed.isDestroyed()) {
threadGroupToBeClosed.destroy();
}
}

private void initExecutorAndThreadGroup() {
threadGroup = new ThreadGroup(serviceName);
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setThreadFactory(r -> new Thread(threadGroup, r))
.setDaemon(true)
.setNameFormat(threadNamePrefix + serviceName + "#%d")
.build();
exec = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(threadPoolSize, threadFactory);
private synchronized void initExecutorAndThreadGroup() {
try {
threadGroup = new ThreadGroup(serviceName);
Thread initThread = new Thread(threadGroup, () -> {
ForkJoinPool.ForkJoinWorkerThreadFactory factory =
pool -> {
ForkJoinWorkerThread thread = new ForkJoinWorkerThread(pool) {
};
thread.setDaemon(true);
thread.setName(threadNamePrefix + serviceName + thread.getPoolIndex());
return thread;
};
exec = new ForkJoinPool(threadPoolSize, factory, null, false);
isShutdown = new AtomicReference<>(false);
});
initThread.start();
initThread.join();
periodicTaskScheduler = BackgroundServiceScheduler.get();
} catch (InterruptedException e) {
shutdown();
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}

protected String getServiceName() {
Expand Down
Loading