diff --git a/client-v2/src/main/java/com/clickhouse/client/api/ClickHouseException.java b/client-v2/src/main/java/com/clickhouse/client/api/ClickHouseException.java index 83da17938..4793515d0 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/ClickHouseException.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/ClickHouseException.java @@ -3,6 +3,8 @@ public class ClickHouseException extends RuntimeException { protected boolean isRetryable = false; + protected String queryId; + public ClickHouseException(String message) { super(message); } @@ -11,8 +13,21 @@ public ClickHouseException(String message, Throwable cause) { super(message, cause); } + public ClickHouseException(String message, Throwable cause, String queryId) { + super(message, cause); + this.queryId = queryId; + } + public ClickHouseException(Throwable cause) { super(cause); } public boolean isRetryable() { return isRetryable; } + + public void setQueryId(String queryId) { + this.queryId = queryId; + } + + public String getQueryId() { + return queryId; + } } diff --git a/client-v2/src/main/java/com/clickhouse/client/api/Client.java b/client-v2/src/main/java/com/clickhouse/client/api/Client.java index 9287b76fc..9d983decb 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/Client.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/Client.java @@ -137,21 +137,18 @@ public class Client implements AutoCloseable { // Server context private String dbUser; private String serverVersion; - private Object metricsRegistry; - private int retries; + private final Object metricsRegistry; + private final int retries; private LZ4Factory lz4Factory = null; + private final Supplier queryIdGenerator; private Client(Collection endpoints, Map configuration, - ExecutorService sharedOperationExecutor, ColumnToMethodMatchingStrategy columnToMethodMatchingStrategy) { - this(endpoints, configuration, sharedOperationExecutor, columnToMethodMatchingStrategy, null); - } - - private Client(Collection endpoints, Map configuration, - ExecutorService sharedOperationExecutor, ColumnToMethodMatchingStrategy columnToMethodMatchingStrategy, Object metricsRegistry) { - // Simple initialization + ExecutorService sharedOperationExecutor, ColumnToMethodMatchingStrategy columnToMethodMatchingStrategy, + Object metricsRegistry, Supplier queryIdGenerator) { this.configuration = ClientConfigProperties.parseConfigMap(configuration); this.readOnlyConfig = Collections.unmodifiableMap(configuration); this.metricsRegistry = metricsRegistry; + this.queryIdGenerator = queryIdGenerator; // Serialization this.pojoSerDe = new POJOSerDe(columnToMethodMatchingStrategy); @@ -266,6 +263,8 @@ public static class Builder { private ExecutorService sharedOperationExecutor = null; private ColumnToMethodMatchingStrategy columnToMethodMatchingStrategy; private Object metricRegistry = null; + private Supplier queryIdGenerator; + public Builder() { this.endpoints = new HashSet<>(); this.configuration = new HashMap<>(); @@ -1085,6 +1084,16 @@ public Builder useHttpFormDataForQuery(boolean enable) { return this; } + /** + * Sets query id generator. Will be used when operation settings (InsertSettings, QuerySettings) do not have query id set. + * @param supplier + * @return + */ + public Builder setQueryIdGenerator(Supplier supplier) { + this.queryIdGenerator = supplier; + return this; + } + public Client build() { // check if endpoint are empty. so can not initiate client if (this.endpoints.isEmpty()) { @@ -1143,7 +1152,7 @@ public Client build() { } return new Client(this.endpoints, this.configuration, this.sharedOperationExecutor, - this.columnToMethodMatchingStrategy, this.metricRegistry); + this.columnToMethodMatchingStrategy, this.metricRegistry, this.queryIdGenerator); } } @@ -1282,6 +1291,9 @@ public CompletableFuture insert(String tableName, List data, final int maxRetries = retry == null ? 0 : retry; requestSettings.setOption(ClientConfigProperties.INPUT_OUTPUT_FORMAT.getKey(), format); + if (requestSettings.getQueryId() == null && queryIdGenerator != null) { + requestSettings.setQueryId(queryIdGenerator.get()); + } Supplier supplier = () -> { long startTime = System.nanoTime(); // Selecting some node @@ -1328,8 +1340,8 @@ public CompletableFuture insert(String tableName, List data, metrics.setQueryId(queryId); return new InsertResponse(metrics); } catch (Exception e) { - lastException = httpClientHelper.wrapException(String.format("Query request failed (Attempt: %s/%s - Duration: %s)", - (i + 1), (maxRetries + 1), durationSince(startTime)), e); + String msg = requestExMsg("Insert", (i + 1), durationSince(startTime).toMillis(), requestSettings.getQueryId()); + lastException = httpClientHelper.wrapException(msg, e, requestSettings.getQueryId()); if (httpClientHelper.shouldRetry(e, requestSettings.getAllSettings())) { LOG.warn("Retrying.", e); selectedEndpoint = getNextAliveNode(); @@ -1338,8 +1350,10 @@ public CompletableFuture insert(String tableName, List data, } } } - throw new ClientException("Insert request failed after attempts: " + (maxRetries + 1) + " - Duration: " + durationSince(startTime), lastException); - }; + + String errMsg = requestExMsg("Insert", retries, durationSince(startTime).toMillis(), requestSettings.getQueryId()); + LOG.warn(errMsg); + throw (lastException == null ? new ClientException(errMsg) : lastException); }; return runAsyncOperation(supplier, requestSettings.getAllSettings()); @@ -1499,6 +1513,9 @@ public CompletableFuture insert(String tableName, } sqlStmt.append(" FORMAT ").append(format.name()); requestSettings.serverSetting(ClickHouseHttpProto.QPARAM_QUERY_STMT, sqlStmt.toString()); + if (requestSettings.getQueryId() == null && queryIdGenerator != null) { + requestSettings.setQueryId(queryIdGenerator.get()); + } responseSupplier = () -> { long startTime = System.nanoTime(); // Selecting some node @@ -1530,8 +1547,8 @@ public CompletableFuture insert(String tableName, metrics.setQueryId(queryId); return new InsertResponse(metrics); } catch (Exception e) { - lastException = httpClientHelper.wrapException(String.format("Insert failed (Attempt: %s/%s - Duration: %s)", - (i + 1), (retries + 1), durationSince(startTime)), e); + String msg = requestExMsg("Insert", (i + 1), durationSince(startTime).toMillis(), requestSettings.getQueryId()); + lastException = httpClientHelper.wrapException(msg, e, requestSettings.getQueryId()); if (httpClientHelper.shouldRetry(e, requestSettings.getAllSettings())) { LOG.warn("Retrying.", e); selectedEndpoint = getNextAliveNode(); @@ -1548,8 +1565,9 @@ public CompletableFuture insert(String tableName, } } } - LOG.warn("Insert request failed after attempts: {} - Duration: {}", retries + 1, durationSince(startTime)); - throw (lastException == null ? new ClientException("Failed to complete insert operation") : lastException); + String errMsg = requestExMsg("Insert", retries, durationSince(startTime).toMillis(), requestSettings.getQueryId()); + LOG.warn(errMsg); + throw (lastException == null ? new ClientException(errMsg) : lastException); }; return runAsyncOperation(responseSupplier, requestSettings.getAllSettings()); @@ -1598,7 +1616,7 @@ public CompletableFuture query(String sqlQuery, QuerySettings set * * Notes: *
    - *
  • Server response format can be specified thru {@code settings} or in SQL query.
  • + *
  • Server response format can be specified through {@code settings} or in SQL query.
  • *
  • If specified in both, the {@code sqlQuery} will take precedence.
  • *
* @@ -1623,6 +1641,10 @@ public CompletableFuture query(String sqlQuery, Map responseSupplier = () -> { long startTime = System.nanoTime(); // Selecting some node @@ -1666,8 +1688,8 @@ public CompletableFuture query(String sqlQuery, Map query(String sqlQuery, Map buildRequestSettings(Map opSettings) private Duration durationSince(long sinceNanos) { return Duration.ofNanos(System.nanoTime() - sinceNanos); } + + private String requestExMsg(String operation, int attempt, long operationDuration, String queryId) { + return operation + " request failed (attempt: " + attempt +", duration: " + operationDuration + "ms, queryId: " + queryId + ")"; + } } diff --git a/client-v2/src/main/java/com/clickhouse/client/api/ServerException.java b/client-v2/src/main/java/com/clickhouse/client/api/ServerException.java index 53dd43429..bd2361bfa 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/ServerException.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/ServerException.java @@ -12,15 +12,14 @@ public class ServerException extends ClickHouseException { private final int transportProtocolCode; - public ServerException(int code, String message) { - this(code, message, 500); - } + private final String queryId; - public ServerException(int code, String message, int transportProtocolCode) { + public ServerException(int code, String message, int transportProtocolCode, String queryId) { super(message); this.code = code; this.transportProtocolCode = transportProtocolCode; this.isRetryable = discoverIsRetryable(code, message, transportProtocolCode); + this.queryId = queryId; } /** @@ -41,8 +40,17 @@ public int getTransportProtocolCode() { return transportProtocolCode; } + @Override public boolean isRetryable() { - return isRetryable; + return super.isRetryable(); + } + + /** + * Returns query ID that is returned by server in {@link com.clickhouse.client.api.http.ClickHouseHttpProto#HEADER_QUERY_ID} + * @return query id non-null string + */ + public String getQueryId() { + return queryId; } private boolean discoverIsRetryable(int code, String message, int transportProtocolCode) { @@ -68,4 +76,27 @@ private boolean discoverIsRetryable(int code, String message, int transportProto }; return false; } + + /** + * Not every server code is listed - only most common + */ + public enum ErrorCodes { + + UNKNOWN(0), + TABLE_NOT_FOUND(60), + DATABASE_NOT_FOUND(81), + UNKNOWN_SETTING(115), + + ; + + private int code; + + ErrorCodes(int code) { + this.code = code; + } + + public int getCode() { + return code; + } + } } diff --git a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java index 675b37fdd..31cdeff3f 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java @@ -93,6 +93,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; @@ -341,6 +342,7 @@ public CloseableHttpClient createHttpClient(boolean initSslContext, Map (transport error: " + httpResponse.getCode() + ")"; } - return new ServerException(serverCode, "Code: " + msg, httpResponse.getCode()); + return new ServerException(serverCode, "Code: " + msg + " (queryId= " + queryId + ")", httpResponse.getCode(), queryId); } catch (Exception e) { LOG.error("Failed to read error message", e); - return new ServerException(serverCode, String.format(ERROR_CODE_PREFIX_PATTERN, serverCode) + " (transport error: " + httpResponse.getCode() + ")", httpResponse.getCode()); + String msg = String.format(ERROR_CODE_PREFIX_PATTERN, serverCode) + " (transport error: " + httpResponse.getCode() + ")"; + return new ServerException(serverCode, msg + " (queryId= " + queryId + ")", httpResponse.getCode(), queryId); } } @@ -799,7 +804,7 @@ public boolean shouldRetry(Throwable ex, Map requestSettings) { // This method wraps some client specific exceptions into specific ClientException or just ClientException // ClientException will be also wrapped - public RuntimeException wrapException(String message, Exception cause) { + public RuntimeException wrapException(String message, Exception cause, String queryId) { if (cause instanceof ClientException || cause instanceof ServerException) { return (RuntimeException) cause; } @@ -810,14 +815,18 @@ public RuntimeException wrapException(String message, Exception cause) { cause instanceof ConnectException || cause instanceof UnknownHostException || cause instanceof NoRouteToHostException) { - return new ConnectionInitiationException(message, cause); + ConnectionInitiationException ex = new ConnectionInitiationException(message, cause); + ex.setQueryId(queryId); + return ex; } if (cause instanceof SocketTimeoutException || cause instanceof IOException) { - return new DataTransferException(message, cause); + DataTransferException ex = new DataTransferException(message, cause); + ex.setQueryId(queryId); + return ex; } // if we can not identify the exception explicitly we catch as our base exception ClickHouseException - return new ClickHouseException(message, cause); + return new ClickHouseException(message, cause, queryId); } private void correctUserAgentHeader(HttpRequest request, Map requestConfig) { diff --git a/client-v2/src/main/java/com/clickhouse/client/api/query/Records.java b/client-v2/src/main/java/com/clickhouse/client/api/query/Records.java index 46d0e827d..f8eeba40c 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/query/Records.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/query/Records.java @@ -125,6 +125,14 @@ public long getResultRows() { return response.getMetrics().getMetric(ServerMetrics.RESULT_ROWS).getLong(); } + /** + * Returns response query id + * @return query id of the request + */ + public String getQueryId() { + return response.getQueryId(); + } + @Override public void close() throws Exception { response.close(); diff --git a/client-v2/src/test/java/com/clickhouse/client/ClientTests.java b/client-v2/src/test/java/com/clickhouse/client/ClientTests.java index 0a85d9456..2be831f55 100644 --- a/client-v2/src/test/java/com/clickhouse/client/ClientTests.java +++ b/client-v2/src/test/java/com/clickhouse/client/ClientTests.java @@ -30,20 +30,23 @@ import java.io.ByteArrayInputStream; import java.net.ConnectException; +import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Queue; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; import static java.time.temporal.ChronoUnit.MILLIS; import static java.time.temporal.ChronoUnit.SECONDS; @@ -529,12 +532,47 @@ public void testInvalidConfig() { } } + @Test(groups = {"integration"}) + public void testQueryIdGenerator() throws Exception { + final String queryId = UUID.randomUUID().toString(); + Supplier constantQueryIdSupplier = () -> queryId; + + // check getting same UUID + for (int i = 0; i < 3; i++ ) { + try (Client client = newClient().setQueryIdGenerator(constantQueryIdSupplier).build()) { + client.execute("SELECT * FROM unknown_table").get().close(); + } catch (ServerException ex) { + Assert.assertEquals(ex.getCode(), ServerException.ErrorCodes.TABLE_NOT_FOUND.getCode()); + Assert.assertEquals(ex.getQueryId(), queryId); + } + } + + final Queue queryIds = new ConcurrentLinkedQueue<>(); // non-blocking + final Supplier queryIdGen = () -> { + String id = UUID.randomUUID().toString(); + queryIds.add(id); + return id; + }; + int requests = 3; + final Queue actualIds = new ConcurrentLinkedQueue<>(); + for (int i = 0; i < requests; i++ ) { + try (Client client = newClient().setQueryIdGenerator(queryIdGen).build()) { + client.execute("SELECT * FROM unknown_table").get().close(); + } catch (ServerException ex) { + Assert.assertEquals(ex.getCode(), ServerException.ErrorCodes.TABLE_NOT_FOUND.getCode()); + actualIds.add(ex.getQueryId()); + } + } + + Assert.assertEquals(queryIds.size(), requests); + Assert.assertEquals(actualIds, new ArrayList<>(queryIds)); + } + public boolean isVersionMatch(String versionExpression, Client client) { List serverVersion = client.queryAll("SELECT version()"); return ClickHouseVersion.of(serverVersion.get(0).getString(1)).check(versionExpression); } - protected Client.Builder newClient() { ClickHouseNode node = getServer(ClickHouseProtocol.HTTP); boolean isSecure = isCloud(); diff --git a/client-v2/src/test/java/com/clickhouse/client/ErrorHandlingTests.java b/client-v2/src/test/java/com/clickhouse/client/ErrorHandlingTests.java new file mode 100644 index 000000000..314e8c309 --- /dev/null +++ b/client-v2/src/test/java/com/clickhouse/client/ErrorHandlingTests.java @@ -0,0 +1,86 @@ +package com.clickhouse.client; + +import com.clickhouse.client.api.Client; +import com.clickhouse.client.api.DataTransferException; +import com.clickhouse.client.api.ServerException; +import com.clickhouse.client.api.enums.Protocol; +import com.clickhouse.client.api.query.QuerySettings; +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.time.temporal.ChronoUnit; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +@Test(groups = {"integration"}) +public class ErrorHandlingTests extends BaseIntegrationTest { + + static { + System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "DEBUG"); + } + + /** + * Tests that a SQL error throws a ServerException. + */ + @Test(groups = {"integration"}) + void testServerError() throws Exception { + try (Client client = newClient().build()) { + // Execute a query against a non-existent table + client.query("SELECT * FROM non_existent_table_xyz_123").get(10, TimeUnit.SECONDS); + Assert.fail("Expected ServerException to be thrown"); + } catch (ServerException e) { + Assert.assertEquals(e.getCode(), ServerException.TABLE_NOT_FOUND); + Assert.assertFalse(e.getQueryId().isEmpty()); + Assert.assertTrue(e.getMessage().contains(e.getQueryId())); + } + } + + /** + * Tests that a SQL error throws a ServerException when async option is enabled. + */ + @Test(groups = {"integration"}) + void testServerErrorAsync() throws Exception { + try (Client client = newClient().useAsyncRequests(true).build()) { + // Execute a query against a non-existent table + client.query("SELECT * FROM non_existent_table_xyz_123").get(10, TimeUnit.SECONDS); + Assert.fail("Expected ServerException to be thrown"); + } catch (ExecutionException e) { + Assert.assertTrue(e.getCause() instanceof ServerException, + "Expected cause to be ServerException but was: " + e.getCause().getClass().getName()); + ServerException se = (ServerException) e.getCause(); + Assert.assertEquals(se.getCode(), ServerException.TABLE_NOT_FOUND, + "Expected TABLE_NOT_FOUND error code"); + Assert.assertEquals(se.getCode(), ServerException.TABLE_NOT_FOUND); + Assert.assertFalse(se.getQueryId().isEmpty()); + Assert.assertTrue(se.getMessage().contains(se.getQueryId())); + } + } + + /** + * Tests that a query exceeding max_execution_time throws a ServerException with TIMEOUT_EXCEEDED code. + */ + @Test(groups = {"integration"}) + void testQueryTimeout() throws Exception { + String queryId = "test-query-id"; + try (Client client = newClient().setSocketTimeout(1, ChronoUnit.SECONDS).build()) { + QuerySettings settings = new QuerySettings().setQueryId(queryId); + + // Execute a query that will take longer than 1 second using sleep function + client.query("SELECT sleep(3)", settings).get(10, TimeUnit.SECONDS); + Assert.fail("Expected ServerException to be thrown due to timeout"); + } catch (DataTransferException e) { + Assert.assertTrue(e.getMessage().contains(queryId)); + Assert.assertEquals(e.getQueryId(), queryId); + } + } + + protected Client.Builder newClient() { + ClickHouseNode node = getServer(ClickHouseProtocol.HTTP); + boolean isSecure = isCloud(); + return new Client.Builder() + .addEndpoint(Protocol.HTTP, node.getHost(), node.getPort(), isSecure) + .setUsername("default") + .setPassword(ClickHouseServerForTest.getPassword()) + .setDefaultDatabase(ClickHouseServerForTest.getDatabase()); + } +} diff --git a/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java b/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java index 2da5af59e..fc2d13a86 100644 --- a/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java +++ b/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java @@ -446,7 +446,7 @@ public void testErrorWithSuccessfulResponse() { Assert.fail("Expected exception"); } catch (ServerException e) { e.printStackTrace(); - Assert.assertEquals(e.getMessage(), "Code: 241. DB::Exception: Memory limit (for query) exceeded: would use 97.21 MiB"); + Assert.assertTrue(e.getMessage().startsWith("Code: 241. DB::Exception: Memory limit (for query) exceeded: would use 97.21 MiB")); } catch (Exception e) { e.printStackTrace(); Assert.fail("Unexpected exception", e); @@ -487,7 +487,7 @@ public void testServerErrorsUncompressed(int code, String message, String expect } catch (ServerException e) { e.printStackTrace(); Assert.assertEquals(e.getCode(), code); - Assert.assertEquals(e.getMessage(), expectedMessage); + Assert.assertTrue(e.getMessage().startsWith(expectedMessage)); } catch (Exception e) { e.printStackTrace(); Assert.fail("Unexpected exception", e); diff --git a/jdbc-v2/src/main/java/com/clickhouse/jdbc/DriverProperties.java b/jdbc-v2/src/main/java/com/clickhouse/jdbc/DriverProperties.java index 0e3d0b01b..872ec4608 100644 --- a/jdbc-v2/src/main/java/com/clickhouse/jdbc/DriverProperties.java +++ b/jdbc-v2/src/main/java/com/clickhouse/jdbc/DriverProperties.java @@ -6,6 +6,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.UUID; /** * JDBC driver specific properties. Does not include anything from ClientConfigProperties. @@ -69,6 +70,14 @@ public enum DriverProperties { * */ SQL_PARSER("jdbc_sql_parser", "JAVACC", Arrays.asList("ANTLR4", "ANTLR4_PARAMS_PARSER", "JAVACC")), + + /** + * Sets query ID generator as {@link java.util.function.Supplier} to be used for Query ID generation + * before sending request with client. + * When no set - queryID is generated by using {@link UUID#randomUUID()}. + */ + QUERY_ID_GENERATOR("jdbc_query_id_generator", null), + ; diff --git a/jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java b/jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java index 1909e88ec..a28b15234 100644 --- a/jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java +++ b/jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java @@ -22,6 +22,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; public class StatementImpl implements Statement, JdbcV2Wrapper { private static final Logger LOG = LoggerFactory.getLogger(StatementImpl.class); @@ -46,12 +47,14 @@ public class StatementImpl implements Statement, JdbcV2Wrapper { private final boolean resultSetAutoClose; private int maxFieldSize; private boolean escapeProcessingEnabled; + private final Supplier queryIdGenerator; private int fetchSize = 1; // settings local to a statement protected QuerySettings localSettings; + public StatementImpl(ConnectionImpl connection) throws SQLException { this.connection = connection; this.queryTimeout = 0; @@ -63,6 +66,7 @@ public StatementImpl(ConnectionImpl connection) throws SQLException { this.resultSetAutoClose = connection.getJdbcConfig().isSet(DriverProperties.RESULTSET_AUTO_CLOSE); this.escapeProcessingEnabled = true; this.featureManager = new FeatureManager(connection.getJdbcConfig()); + this.queryIdGenerator = connection.getJdbcConfig().getQueryIdGenerator(); } protected void ensureOpen() throws SQLException { @@ -128,6 +132,21 @@ private void closeCurrentResultSet() { } } + /** + * Sets last queryId and returns actual query Id + * Accepts null + * @param queryId + * @return + */ + protected String setLastQueryID(String queryId) { + if (queryId == null) { + queryId = queryIdGenerator == null ? UUID.randomUUID().toString() : queryIdGenerator.get(); + } + lastQueryId = queryId; + LOG.debug("Query ID: {}", lastQueryId); + return queryId; + } + protected ResultSetImpl executeQueryImpl(String sql, QuerySettings settings) throws SQLException { ensureOpen(); @@ -143,13 +162,7 @@ protected ResultSetImpl executeQueryImpl(String sql, QuerySettings settings) thr } QuerySettings mergedSettings = QuerySettings.merge(settings, new QuerySettings()); - if (mergedSettings.getQueryId() == null) { - final String queryId = UUID.randomUUID().toString(); - mergedSettings.setQueryId(queryId); - } - lastQueryId = mergedSettings.getQueryId(); - LOG.debug("Query ID: {}", lastQueryId); - + mergedSettings.setQueryId(setLastQueryID(mergedSettings.getQueryId())); QueryResponse response = null; try { lastStatementSql = parseJdbcEscapeSyntax(sql); @@ -206,13 +219,7 @@ protected long executeUpdateImpl(String sql, QuerySettings settings) throws SQLE } QuerySettings mergedSettings = QuerySettings.merge(connection.getDefaultQuerySettings(), settings); - - if (mergedSettings.getQueryId() == null) { - final String queryId = UUID.randomUUID().toString(); - mergedSettings.setQueryId(queryId); - } - lastQueryId = mergedSettings.getQueryId(); - + mergedSettings.setQueryId(setLastQueryID(mergedSettings.getQueryId())); lastStatementSql = parseJdbcEscapeSyntax(sql); LOG.trace("SQL Query: {}", lastStatementSql); int updateCount = 0; diff --git a/jdbc-v2/src/main/java/com/clickhouse/jdbc/WriterStatementImpl.java b/jdbc-v2/src/main/java/com/clickhouse/jdbc/WriterStatementImpl.java index 582065016..c437c5731 100644 --- a/jdbc-v2/src/main/java/com/clickhouse/jdbc/WriterStatementImpl.java +++ b/jdbc-v2/src/main/java/com/clickhouse/jdbc/WriterStatementImpl.java @@ -116,6 +116,8 @@ public long executeLargeUpdate() throws SQLException { int updateCount = 0; InputStream in = new ByteArrayInputStream(out.toByteArray()); InsertSettings settings = new InsertSettings(); + settings.setQueryId(setLastQueryID(settings.getQueryId())); + try (InsertResponse response = queryTimeout == 0 ? connection.getClient().insert(tableSchema.getTableName(),in, writer.getFormat(), settings).get() : connection.getClient().insert(tableSchema.getTableName(),in, writer.getFormat(), settings).get(queryTimeout, TimeUnit.SECONDS)) { diff --git a/jdbc-v2/src/main/java/com/clickhouse/jdbc/internal/JdbcConfiguration.java b/jdbc-v2/src/main/java/com/clickhouse/jdbc/internal/JdbcConfiguration.java index d1b1889d6..21d05a552 100644 --- a/jdbc-v2/src/main/java/com/clickhouse/jdbc/internal/JdbcConfiguration.java +++ b/jdbc-v2/src/main/java/com/clickhouse/jdbc/internal/JdbcConfiguration.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.function.Supplier; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -69,6 +70,7 @@ public boolean isIgnoreUnsupportedRequests() { DRIVER_PROP_KEYS = driverPropertiesMapBuilder.build(); } + private final Supplier queryIdGenerator; /** * Parses URL to get property and target host. @@ -82,6 +84,9 @@ public JdbcConfiguration(String url, Properties info) throws SQLException { this.clientProperties = new HashMap<>(); this.driverProperties = new HashMap<>(); + // queryID generator should not be set in client because query ID is used in StatementImpl to know the last one + this.queryIdGenerator = (Supplier) props.remove(DriverProperties.QUERY_ID_GENERATOR.getKey());; + Map urlProperties = parseUrl(url); String tmpConnectionUrl = urlProperties.remove(PARSE_URL_CONN_URL_PROP); initProperties(urlProperties, props); @@ -335,6 +340,10 @@ public String getDriverProperty(String key, String defaultValue) { return driverProperties.getOrDefault(key, defaultValue); } + public Supplier getQueryIdGenerator() { + return queryIdGenerator; + } + public Boolean isSet(DriverProperties driverProp) { String v = driverProperties.getOrDefault(driverProp.getKey(), driverProp.getDefaultValue()); return Boolean.parseBoolean(v); diff --git a/jdbc-v2/src/test/java/com/clickhouse/jdbc/JDBCErrorHandlingTests.java b/jdbc-v2/src/test/java/com/clickhouse/jdbc/JDBCErrorHandlingTests.java new file mode 100644 index 000000000..64282d0bf --- /dev/null +++ b/jdbc-v2/src/test/java/com/clickhouse/jdbc/JDBCErrorHandlingTests.java @@ -0,0 +1,51 @@ +package com.clickhouse.jdbc; + + +import com.clickhouse.client.api.ServerException; +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Properties; +import java.util.Queue; +import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.function.Supplier; + +@Test(groups = { "integration" }) +public class JDBCErrorHandlingTests extends JdbcIntegrationTest { + + @Test(groups = {"integration"}) + public void testServerErrorCodePropagatedToSQLException() throws Exception { + try (Connection conn = getJdbcConnection(); Statement stmt = conn.createStatement()) { + stmt.executeQuery("SELECT * FROM somedb.unknown_table"); + } catch (SQLException e) { + Assert.assertEquals(e.getErrorCode(), ServerException.ErrorCodes.DATABASE_NOT_FOUND.getCode()); + } + } + + @Test(groups = {"integration"}) + public void testQueryIDPropagatedToException() throws Exception { + final Queue queryIds = new ConcurrentLinkedQueue<>(); // non-blocking + final Supplier queryIdGen = () -> { + String id = UUID.randomUUID().toString(); + queryIds.add(id); + return id; + }; + int requests = 3; + + Properties connConfig = new Properties(); + connConfig.put(DriverProperties.QUERY_ID_GENERATOR.getKey(), queryIdGen); + for (int i = 0; i < requests; i++) { + try (Connection conn = getJdbcConnection(connConfig); Statement stmt = conn.createStatement()) { + stmt.executeQuery("SELECT * FROM somedb.unknown_table"); + } catch (SQLException e) { + Assert.assertEquals(e.getErrorCode(), ServerException.ErrorCodes.DATABASE_NOT_FOUND.getCode()); + } + } + + Assert.assertEquals(queryIds.size(), requests); + } +}