diff --git a/pom.xml b/pom.xml index 89fa1f6..c41a401 100644 --- a/pom.xml +++ b/pom.xml @@ -47,7 +47,7 @@ 1.7.4 4.0 2.4 - 3.14.4 + 4.0.2 diff --git a/src/main/java/io/cdap/plugin/snowflake/common/BaseSnowflakeConfig.java b/src/main/java/io/cdap/plugin/snowflake/common/BaseSnowflakeConfig.java index 7967cfd..a6ddac6 100644 --- a/src/main/java/io/cdap/plugin/snowflake/common/BaseSnowflakeConfig.java +++ b/src/main/java/io/cdap/plugin/snowflake/common/BaseSnowflakeConfig.java @@ -286,7 +286,7 @@ public boolean canConnect() { && !containsMacro(PROPERTY_PASSWORD) && !containsMacro(PROPERTY_WAREHOUSE) && !containsMacro(PROPERTY_ROLE) && !containsMacro(PROPERTY_CLIENT_ID) && !containsMacro(PROPERTY_CLIENT_SECRET) && !containsMacro(PROPERTY_REFRESH_TOKEN) - && !containsMacro(PROPERTY_PRIVATE_KEY)); + && !containsMacro(PROPERTY_PRIVATE_KEY) && !containsMacro(PROPERTY_PASSPHRASE)); } protected void validateConnection(FailureCollector collector) { @@ -308,7 +308,7 @@ protected void validateConnection(FailureCollector collector) { // TODO: for oauth2 if (Boolean.TRUE.equals(keyPairEnabled)) { - failure.withConfigProperty(PROPERTY_PRIVATE_KEY); + failure.withConfigProperty(PROPERTY_PRIVATE_KEY).withConfigProperty(PROPERTY_PASSPHRASE); } else { failure.withConfigProperty(PROPERTY_PASSWORD); } diff --git a/src/main/java/io/cdap/plugin/snowflake/common/SnowflakeErrorType.java b/src/main/java/io/cdap/plugin/snowflake/common/SnowflakeErrorType.java index 9e6424a..d422eff 100644 --- a/src/main/java/io/cdap/plugin/snowflake/common/SnowflakeErrorType.java +++ b/src/main/java/io/cdap/plugin/snowflake/common/SnowflakeErrorType.java @@ -22,7 +22,7 @@ import io.cdap.cdap.api.exception.ErrorUtils; import io.cdap.cdap.api.exception.ProgramFailureException; import io.cdap.plugin.snowflake.common.util.DocumentUrlUtil; -import net.snowflake.client.jdbc.ErrorCode; +import net.snowflake.client.api.exception.ErrorCode; import java.sql.SQLException; import java.util.Arrays; diff --git a/src/main/java/io/cdap/plugin/snowflake/common/client/SnowflakeAccessor.java b/src/main/java/io/cdap/plugin/snowflake/common/client/SnowflakeAccessor.java index 8185ae8..24919e0 100644 --- a/src/main/java/io/cdap/plugin/snowflake/common/client/SnowflakeAccessor.java +++ b/src/main/java/io/cdap/plugin/snowflake/common/client/SnowflakeAccessor.java @@ -28,7 +28,7 @@ import io.cdap.plugin.snowflake.common.exception.ConnectionTimeoutException; import io.cdap.plugin.snowflake.common.util.DocumentUrlUtil; import io.cdap.plugin.snowflake.common.util.QueryUtil; -import net.snowflake.client.jdbc.SnowflakeBasicDataSource; +import net.snowflake.client.internal.api.implementation.datasource.SnowflakeBasicDataSource; import org.apache.http.impl.client.HttpClients; import java.io.BufferedWriter; @@ -154,10 +154,10 @@ private void initDataSource(SnowflakeBasicDataSource dataSource, BaseSnowflakeCo if (Boolean.TRUE.equals(config.getOauth2Enabled())) { String accessToken = OAuthUtil.getAccessTokenByRefreshToken(HttpClients.createDefault(), config); - dataSource.setOauthToken(accessToken); - // The recommend way to pass token is in the password when you use the driver with connection pool. - // This is also a mandatory field, so adding the same. - // Refer https://github.com/snowflakedb/snowflake-jdbc/issues/1175 + // In JDBC 4.x, setOauthToken() was removed. The recommended approach is to explicitly + // set the authenticator to "oauth" and pass the access token as the password. + // Migration guide: https://docs.snowflake.com/en/developer-guide/jdbc/jdbc-migration + dataSource.setAuthenticator("oauth"); dataSource.setPassword(accessToken); } else if (Boolean.TRUE.equals(config.getKeyPairEnabled())) { dataSource.setUser(config.getUsername()); diff --git a/src/main/java/io/cdap/plugin/snowflake/sink/batch/SnowflakeSinkAccessor.java b/src/main/java/io/cdap/plugin/snowflake/sink/batch/SnowflakeSinkAccessor.java index 5eae056..7cfc8aa 100644 --- a/src/main/java/io/cdap/plugin/snowflake/sink/batch/SnowflakeSinkAccessor.java +++ b/src/main/java/io/cdap/plugin/snowflake/sink/batch/SnowflakeSinkAccessor.java @@ -19,7 +19,8 @@ import io.cdap.plugin.snowflake.common.SnowflakeErrorType; import io.cdap.plugin.snowflake.common.client.SnowflakeAccessor; import io.cdap.plugin.snowflake.common.util.DocumentUrlUtil; -import net.snowflake.client.jdbc.SnowflakeConnection; +import net.snowflake.client.api.connection.SnowflakeConnection; +import net.snowflake.client.api.connection.UploadStreamConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.InputStream; @@ -52,9 +53,10 @@ public void uploadStream(InputStream inputStream, String stageDir) { LOG.info("Uploading file '{}' to table stage", filename); try (Connection connection = dataSource.getConnection()) { + UploadStreamConfig uploadConfig = UploadStreamConfig.builder().setDestPrefix(null).setCompressData(true).build(); connection.unwrap(SnowflakeConnection.class).uploadStream(stageDir, - null, - inputStream, filename, true); + filename, + inputStream, uploadConfig); } catch (SQLException e) { String errorReason = String.format("Unable to compress '%s' and upload data to destination stage '%s'. For " + "more details, see %s", filename, stageDir, DocumentUrlUtil.getSupportedDocumentUrl()); diff --git a/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeSourceAccessor.java b/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeSourceAccessor.java index 9896ccd..ad1594d 100644 --- a/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeSourceAccessor.java +++ b/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeSourceAccessor.java @@ -24,7 +24,8 @@ import io.cdap.plugin.snowflake.common.util.DocumentUrlUtil; import io.cdap.plugin.snowflake.common.util.QueryUtil; import io.cdap.plugin.snowflake.common.util.SchemaHelper; -import net.snowflake.client.jdbc.SnowflakeConnection; +import net.snowflake.client.api.connection.DownloadStreamConfig; +import net.snowflake.client.api.connection.SnowflakeConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; @@ -140,8 +141,9 @@ public void removeStageFile(String stageSplit) { */ public CSVReader buildCsvReader(String stageSplit) { try (Connection connection = dataSource.getConnection()) { + DownloadStreamConfig downloadStreamConfig = DownloadStreamConfig.builder().setDecompress(true).build(); InputStream downloadStream = connection.unwrap(SnowflakeConnection.class) - .downloadStream("@~", stageSplit, true); + .downloadStream("@~", stageSplit, downloadStreamConfig); InputStreamReader inputStreamReader = new InputStreamReader(downloadStream); return new CSVReader(inputStreamReader, ',', '"', escapeChar); } catch (SQLException e) { diff --git a/src/test/java/io/cdap/plugin/snowflake/common/BaseSnowflakeTest.java b/src/test/java/io/cdap/plugin/snowflake/common/BaseSnowflakeTest.java index 24b6588..b021cc8 100644 --- a/src/test/java/io/cdap/plugin/snowflake/common/BaseSnowflakeTest.java +++ b/src/test/java/io/cdap/plugin/snowflake/common/BaseSnowflakeTest.java @@ -22,7 +22,7 @@ import io.cdap.plugin.snowflake.common.client.SnowflakeAccessorTest; import io.cdap.plugin.snowflake.source.batch.SnowflakeBatchSourceConfig; import io.cdap.plugin.snowflake.source.batch.SnowflakeBatchSourceConfigBuilder; -import net.snowflake.client.jdbc.SnowflakeBasicDataSource; +import net.snowflake.client.internal.api.implementation.datasource.SnowflakeBasicDataSource; import org.junit.Assume; import org.junit.BeforeClass; import org.junit.ClassRule;