diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/DelegatingMultiSinkOutputCommitter.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/DelegatingMultiSinkOutputCommitter.java index fb69e7c6c..4ef9e5546 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/DelegatingMultiSinkOutputCommitter.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/DelegatingMultiSinkOutputCommitter.java @@ -23,6 +23,8 @@ import org.apache.hadoop.mapreduce.JobStatus; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.HashMap; @@ -35,6 +37,7 @@ * Delegated instances are supplied along with a schema, which is used to configure the commit operation. */ public class DelegatingMultiSinkOutputCommitter extends OutputCommitter { + private static final Logger LOG = LoggerFactory.getLogger(DelegatingMultiSinkOutputCommitter.class); private final Map committerMap; private final Map schemaMap; private final String projectName; @@ -99,18 +102,28 @@ public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOE @Override public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException { for (String tableName : committerMap.keySet()) { - configureContext(taskAttemptContext, tableName); - - committerMap.get(tableName).commitTask(taskAttemptContext); + try { + configureContext(taskAttemptContext, tableName); + committerMap.get(tableName).commitTask(taskAttemptContext); + } catch (IOException e) { + LOG.warn("BigQuery multi-sink table '{}' failed during task commit. Reason: {}", + tableName, getFailureReason(e), e); + throw e; + } } } @Override public void commitJob(JobContext jobContext) throws IOException { for (String tableName : committerMap.keySet()) { - configureContext(jobContext, tableName); - - committerMap.get(tableName).commitJob(jobContext); + try { + configureContext(jobContext, tableName); + committerMap.get(tableName).commitJob(jobContext); + } catch (IOException e) { + LOG.warn("BigQuery multi-sink table '{}' failed during job commit. Reason: {}", + tableName, getFailureReason(e), e); + throw e; + } } } @@ -168,4 +181,12 @@ public void configureContext(JobContext context, String tableName) throws IOExce gcsPath, fields); } + + private String getFailureReason(IOException exception) { + Throwable rootCause = exception; + while (rootCause.getCause() != null) { + rootCause = rootCause.getCause(); + } + return rootCause.getMessage() == null ? exception.getMessage() : rootCause.getMessage(); + } }