Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
<powermock.version>1.7.4</powermock.version>
<guice.version>4.0</guice.version>
<opencsv.version>2.4</opencsv.version>
<snowflake-jdbc.version>3.14.4</snowflake-jdbc.version>
<snowflake-jdbc.version>4.0.2</snowflake-jdbc.version>
</properties>

<repositories>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ public boolean canConnect() {
&& !containsMacro(PROPERTY_PASSWORD) && !containsMacro(PROPERTY_WAREHOUSE)
&& !containsMacro(PROPERTY_ROLE) && !containsMacro(PROPERTY_CLIENT_ID)
&& !containsMacro(PROPERTY_CLIENT_SECRET) && !containsMacro(PROPERTY_REFRESH_TOKEN)
&& !containsMacro(PROPERTY_PRIVATE_KEY));
&& !containsMacro(PROPERTY_PRIVATE_KEY) && !containsMacro(PROPERTY_PASSPHRASE));
}

protected void validateConnection(FailureCollector collector) {
Expand All @@ -308,7 +308,7 @@ protected void validateConnection(FailureCollector collector) {

// TODO: for oauth2
if (Boolean.TRUE.equals(keyPairEnabled)) {
failure.withConfigProperty(PROPERTY_PRIVATE_KEY);
failure.withConfigProperty(PROPERTY_PRIVATE_KEY).withConfigProperty(PROPERTY_PASSPHRASE);
} else {
failure.withConfigProperty(PROPERTY_PASSWORD);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.api.exception.ProgramFailureException;
import io.cdap.plugin.snowflake.common.util.DocumentUrlUtil;
import net.snowflake.client.jdbc.ErrorCode;
import net.snowflake.client.api.exception.ErrorCode;

import java.sql.SQLException;
import java.util.Arrays;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import io.cdap.plugin.snowflake.common.exception.ConnectionTimeoutException;
import io.cdap.plugin.snowflake.common.util.DocumentUrlUtil;
import io.cdap.plugin.snowflake.common.util.QueryUtil;
import net.snowflake.client.jdbc.SnowflakeBasicDataSource;
import net.snowflake.client.internal.api.implementation.datasource.SnowflakeBasicDataSource;
import org.apache.http.impl.client.HttpClients;

import java.io.BufferedWriter;
Expand Down Expand Up @@ -154,10 +154,10 @@ private void initDataSource(SnowflakeBasicDataSource dataSource, BaseSnowflakeCo

if (Boolean.TRUE.equals(config.getOauth2Enabled())) {
String accessToken = OAuthUtil.getAccessTokenByRefreshToken(HttpClients.createDefault(), config);
dataSource.setOauthToken(accessToken);
// The recommend way to pass token is in the password when you use the driver with connection pool.
// This is also a mandatory field, so adding the same.
// Refer https://github.com/snowflakedb/snowflake-jdbc/issues/1175
dataSource.setAuthenticator("oauth");
dataSource.setPassword(accessToken);
} else if (Boolean.TRUE.equals(config.getKeyPairEnabled())) {
dataSource.setUser(config.getUsername());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
import io.cdap.plugin.snowflake.common.SnowflakeErrorType;
import io.cdap.plugin.snowflake.common.client.SnowflakeAccessor;
import io.cdap.plugin.snowflake.common.util.DocumentUrlUtil;
import net.snowflake.client.jdbc.SnowflakeConnection;
import net.snowflake.client.api.connection.SnowflakeConnection;
import net.snowflake.client.api.connection.UploadStreamConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.InputStream;
Expand Down Expand Up @@ -52,9 +53,10 @@ public void uploadStream(InputStream inputStream, String stageDir) {
LOG.info("Uploading file '{}' to table stage", filename);

try (Connection connection = dataSource.getConnection()) {
UploadStreamConfig uploadConfig = UploadStreamConfig.builder().setDestPrefix(null).setCompressData(true).build();
connection.unwrap(SnowflakeConnection.class).uploadStream(stageDir,
null,
inputStream, filename, true);
filename,
inputStream, uploadConfig);
} catch (SQLException e) {
String errorReason = String.format("Unable to compress '%s' and upload data to destination stage '%s'. For " +
"more details, see %s", filename, stageDir, DocumentUrlUtil.getSupportedDocumentUrl());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
import io.cdap.plugin.snowflake.common.util.DocumentUrlUtil;
import io.cdap.plugin.snowflake.common.util.QueryUtil;
import io.cdap.plugin.snowflake.common.util.SchemaHelper;
import net.snowflake.client.jdbc.SnowflakeConnection;
import net.snowflake.client.api.connection.DownloadStreamConfig;
import net.snowflake.client.api.connection.SnowflakeConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
Expand Down Expand Up @@ -140,8 +141,9 @@ public void removeStageFile(String stageSplit) {
*/
public CSVReader buildCsvReader(String stageSplit) {
try (Connection connection = dataSource.getConnection()) {
DownloadStreamConfig downloadStreamConfig = DownloadStreamConfig.builder().setDecompress(true).build();
InputStream downloadStream = connection.unwrap(SnowflakeConnection.class)
.downloadStream("@~", stageSplit, true);
.downloadStream("@~", stageSplit, downloadStreamConfig);
InputStreamReader inputStreamReader = new InputStreamReader(downloadStream);
return new CSVReader(inputStreamReader, ',', '"', escapeChar);
} catch (SQLException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import io.cdap.plugin.snowflake.common.client.SnowflakeAccessorTest;
import io.cdap.plugin.snowflake.source.batch.SnowflakeBatchSourceConfig;
import io.cdap.plugin.snowflake.source.batch.SnowflakeBatchSourceConfigBuilder;
import net.snowflake.client.jdbc.SnowflakeBasicDataSource;
import net.snowflake.client.internal.api.implementation.datasource.SnowflakeBasicDataSource;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.ClassRule;
Expand Down