-
Notifications
You must be signed in to change notification settings - Fork 34
Expand file tree
/
Copy pathMssqlClient.java
More file actions
253 lines (226 loc) · 12.8 KB
/
MssqlClient.java
File metadata and controls
253 lines (226 loc) · 12.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package io.cdap.plugin;
import io.cdap.e2e.utils.PluginPropertyUtils;
import org.junit.Assert;
import java.sql.Blob;
import java.sql.Clob;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.Arrays;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.TimeZone;
/**
 * JDBC helper for the MSSQL end-to-end tests.
 *
 * <p>Provides static methods to create, populate, drop, count and compare tables. Connection details
 * are read from the {@code MSSQL_HOST}, {@code MSSQL_PORT}, {@code MSSQL_USERNAME} and
 * {@code MSSQL_PASSWORD} environment variables and from the {@code databaseName} plugin property.
 *
 * <p>Schema and table names are concatenated directly into the SQL text (identifiers cannot be bound as
 * JDBC parameters), so callers must only pass trusted values.
 */
public class MssqlClient {

  /** Column definition shared by the basic source and target tables. */
  private static final String BASIC_TABLE_COLUMNS = "(ID varchar(100), LASTNAME varchar(100))";

  /** Column list used when inserting rows into the basic source table. */
  private static final String BASIC_TABLE_COLUMN_LIST = "(ID, LASTNAME)";

  /**
   * Opens a new connection to the SQL Server instance described by the environment.
   *
   * <p>Side effect: sets the JVM default time zone to UTC on every call.
   *
   * @return a new open connection; the caller is responsible for closing it
   * @throws SQLException if the connection cannot be established
   * @throws ClassNotFoundException if the SQL Server JDBC driver class is not on the classpath
   */
  private static Connection getMssqlConnection() throws SQLException, ClassNotFoundException {
    TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
    Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver");
    String databaseName = PluginPropertyUtils.pluginProp("databaseName");
    String url = "jdbc:sqlserver://" + System.getenv("MSSQL_HOST") + ":" + System.getenv("MSSQL_PORT")
      + ";databaseName=" + databaseName;
    return DriverManager.getConnection(url, System.getenv("MSSQL_USERNAME"), System.getenv("MSSQL_PASSWORD"));
  }

  /**
   * Counts the rows of {@code schema.table}.
   *
   * @param table table name
   * @param schema schema name
   * @return the value of {@code COUNT(*)}, or 0 if the query yields no row
   * @throws SQLException if the query fails
   * @throws ClassNotFoundException if the JDBC driver class cannot be loaded
   */
  public static int countRecord(String table, String schema) throws SQLException, ClassNotFoundException {
    String countQuery = "SELECT COUNT(*) as total FROM " + schema + "." + table;
    try (Connection connection = getMssqlConnection();
         Statement statement = connection.createStatement();
         ResultSet rs = statement.executeQuery(countQuery)) {
      // COUNT(*) without GROUP BY produces a single row, so one next() is enough.
      return rs.next() ? rs.getInt(1) : 0;
    }
  }

  /**
   * Creates the basic two-column source table and inserts two sample rows.
   *
   * @param sourceTable table name
   * @param schema schema name
   * @throws SQLException if any statement fails
   * @throws ClassNotFoundException if the JDBC driver class cannot be loaded
   */
  public static void createSourceTable(String sourceTable, String schema) throws SQLException,
    ClassNotFoundException {
    try (Connection connection = getMssqlConnection(); Statement statement = connection.createStatement()) {
      statement.executeUpdate(createTableQuery(sourceTable, schema, BASIC_TABLE_COLUMNS));
      // Insert dummy data.
      statement.executeUpdate(insertQuery(sourceTable, schema, BASIC_TABLE_COLUMN_LIST,
                                          "VALUES ('id1', 'Shelby')"));
      statement.executeUpdate(insertQuery(sourceTable, schema, BASIC_TABLE_COLUMN_LIST,
                                          "VALUES ('id2', 'Simpson')"));
    }
  }

  /**
   * Creates the empty basic two-column target table.
   *
   * @param targetTable table name
   * @param schema schema name
   * @throws SQLException if the statement fails
   * @throws ClassNotFoundException if the JDBC driver class cannot be loaded
   */
  public static void createTargetTable(String targetTable, String schema) throws SQLException,
    ClassNotFoundException {
    createTable(targetTable, schema, BASIC_TABLE_COLUMNS);
  }

  /**
   * Creates the source table described by the {@code datatypeColumns} plugin property and inserts one
   * row built from {@code datatypeColumnsList} and {@code datatypeValues}.
   *
   * @param sourceTable table name
   * @param schema schema name
   * @throws SQLException if any statement fails
   * @throws ClassNotFoundException if the JDBC driver class cannot be loaded
   */
  public static void createSourceDatatypesTable(String sourceTable, String schema) throws SQLException,
    ClassNotFoundException {
    createTableWithRow(sourceTable, schema,
                       PluginPropertyUtils.pluginProp("datatypeColumns"),
                       PluginPropertyUtils.pluginProp("datatypeColumnsList"),
                       PluginPropertyUtils.pluginProp("datatypeValues"));
  }

  /**
   * Creates the empty target table described by the {@code datatypeColumns} plugin property.
   *
   * @param targetTable table name
   * @param schema schema name
   * @throws SQLException if the statement fails
   * @throws ClassNotFoundException if the JDBC driver class cannot be loaded
   */
  public static void createTargetDatatypesTable(String targetTable, String schema) throws SQLException,
    ClassNotFoundException {
    createTable(targetTable, schema, PluginPropertyUtils.pluginProp("datatypeColumns"));
  }

  /**
   * Creates the source table described by the {@code imageColumns} plugin property and inserts one row
   * built from {@code imageColumnsList} and {@code imageValues}.
   *
   * @param sourceTable table name
   * @param schema schema name
   * @throws SQLException if any statement fails
   * @throws ClassNotFoundException if the JDBC driver class cannot be loaded
   */
  public static void createSourceImageTable(String sourceTable, String schema) throws SQLException,
    ClassNotFoundException {
    createTableWithRow(sourceTable, schema,
                       PluginPropertyUtils.pluginProp("imageColumns"),
                       PluginPropertyUtils.pluginProp("imageColumnsList"),
                       PluginPropertyUtils.pluginProp("imageValues"));
  }

  /**
   * Creates the empty target table described by the {@code imageColumns} plugin property.
   *
   * @param targetTable table name
   * @param schema schema name
   * @throws SQLException if the statement fails
   * @throws ClassNotFoundException if the JDBC driver class cannot be loaded
   */
  public static void createTargetImageTable(String targetTable, String schema) throws SQLException,
    ClassNotFoundException {
    createTable(targetTable, schema, PluginPropertyUtils.pluginProp("imageColumns"));
  }

  /**
   * Creates the source table described by the {@code uniqueIdentifierColumns} plugin property and
   * inserts one row built from {@code uniqueIdentifierColumnsList} and {@code uniqueIdentifierValues}.
   *
   * @param sourceTable table name
   * @param schema schema name
   * @throws SQLException if any statement fails
   * @throws ClassNotFoundException if the JDBC driver class cannot be loaded
   */
  public static void createSourceUniqueIdentifierTable(String sourceTable, String schema) throws SQLException,
    ClassNotFoundException {
    createTableWithRow(sourceTable, schema,
                       PluginPropertyUtils.pluginProp("uniqueIdentifierColumns"),
                       PluginPropertyUtils.pluginProp("uniqueIdentifierColumnsList"),
                       PluginPropertyUtils.pluginProp("uniqueIdentifierValues"));
  }

  /**
   * Creates the empty target table described by the {@code uniqueIdentifierColumns} plugin property.
   *
   * @param targetTable table name
   * @param schema schema name
   * @throws SQLException if the statement fails
   * @throws ClassNotFoundException if the JDBC driver class cannot be loaded
   */
  public static void createTargetUniqueIdentifierTable(String targetTable, String schema) throws SQLException,
    ClassNotFoundException {
    createTable(targetTable, schema, PluginPropertyUtils.pluginProp("uniqueIdentifierColumns"));
  }

  /**
   * Creates the source table described by the {@code dateTimeColumns} plugin property and inserts one
   * row built from {@code dateTimeColumnsList} and {@code dateTimeValues}.
   *
   * @param sourceTable table name
   * @param schema schema name
   * @throws SQLException if any statement fails
   * @throws ClassNotFoundException if the JDBC driver class cannot be loaded
   */
  public static void createSourceDateTimeTable(String sourceTable, String schema) throws SQLException,
    ClassNotFoundException {
    createTableWithRow(sourceTable, schema,
                       PluginPropertyUtils.pluginProp("dateTimeColumns"),
                       PluginPropertyUtils.pluginProp("dateTimeColumnsList"),
                       PluginPropertyUtils.pluginProp("dateTimeValues"));
  }

  /**
   * Creates the empty target table described by the {@code dateTimeColumns} plugin property.
   *
   * @param targetTable table name
   * @param schema schema name
   * @throws SQLException if the statement fails
   * @throws ClassNotFoundException if the JDBC driver class cannot be loaded
   */
  public static void createTargetDateTimeTable(String targetTable, String schema) throws SQLException,
    ClassNotFoundException {
    createTable(targetTable, schema, PluginPropertyUtils.pluginProp("dateTimeColumns"));
  }

  /**
   * Drops every listed table from {@code schema}, in array order, over a single connection.
   *
   * <p>The first failing {@code DROP TABLE} aborts the loop; later tables are then left in place.
   *
   * @param schema schema name
   * @param tables names of the tables to drop
   * @throws SQLException if a drop statement fails
   * @throws ClassNotFoundException if the JDBC driver class cannot be loaded
   */
  public static void deleteTables(String schema, String[] tables)
    throws SQLException, ClassNotFoundException {
    try (Connection connection = getMssqlConnection(); Statement statement = connection.createStatement()) {
      for (String table : tables) {
        statement.execute("DROP TABLE " + schema + "." + table);
      }
    }
  }

  /**
   * Compares the full contents of two tables of the same schema, row by row and column by column.
   *
   * <p>NOTE(review): both tables are read with {@code SELECT *} and no {@code ORDER BY}, so rows are
   * compared in whatever order the server returns them - confirm this is stable for the tables under test.
   *
   * @param schema schema containing both tables
   * @param sourceTable name of the source table
   * @param targetTable name of the target table
   * @return {@code true} when every row and column matches; a mismatch fails a JUnit assertion instead
   * @throws SQLException if a query fails
   * @throws ClassNotFoundException if the JDBC driver class cannot be loaded
   */
  public static boolean validateRecordValues(String schema, String sourceTable, String targetTable)
    throws SQLException, ClassNotFoundException {
    String getSourceQuery = "SELECT * FROM " + schema + "." + sourceTable;
    String getTargetQuery = "SELECT * FROM " + schema + "." + targetTable;
    try (Connection connection = getMssqlConnection()) {
      connection.setHoldability(ResultSet.HOLD_CURSORS_OVER_COMMIT);
      // Statements and result sets are closed explicitly rather than relying on Connection.close().
      try (Statement sourceStatement = createComparisonStatement(connection);
           Statement targetStatement = createComparisonStatement(connection);
           ResultSet rsSource = sourceStatement.executeQuery(getSourceQuery);
           ResultSet rsTarget = targetStatement.executeQuery(getTargetQuery)) {
        return compareResultSetData(rsSource, rsTarget);
      }
    }
  }

  /** Creates a scroll-sensitive, updatable statement whose cursors are held over commit. */
  private static Statement createComparisonStatement(Connection connection) throws SQLException {
    return connection.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_UPDATABLE,
                                      ResultSet.HOLD_CURSORS_OVER_COMMIT);
  }

  /** Opens a connection and creates {@code schema.table} with the given column definition. */
  private static void createTable(String table, String schema, String columns)
    throws SQLException, ClassNotFoundException {
    try (Connection connection = getMssqlConnection(); Statement statement = connection.createStatement()) {
      statement.executeUpdate(createTableQuery(table, schema, columns));
    }
  }

  /** Opens a connection, creates {@code schema.table} and inserts a single row into it. */
  private static void createTableWithRow(String table, String schema, String columns, String columnList,
                                         String columnValues) throws SQLException, ClassNotFoundException {
    try (Connection connection = getMssqlConnection(); Statement statement = connection.createStatement()) {
      statement.executeUpdate(createTableQuery(table, schema, columns));
      // Insert dummy data.
      statement.executeUpdate(insertQuery(table, schema, columnList, columnValues));
    }
  }

  /** Builds {@code CREATE TABLE schema.table columns}. */
  private static String createTableQuery(String table, String schema, String columns) {
    return String.format("CREATE TABLE %s.%s %s", schema, table, columns);
  }

  /** Builds {@code INSERT INTO schema.table columnList columnValues}. */
  private static String insertQuery(String table, String schema, String columnList, String columnValues) {
    return String.format("INSERT INTO %s.%s %s %s", schema, table, columnList, columnValues);
  }

  /**
   * Asserts that both result sets have the same number of columns, the same number of rows and equal
   * values in every column of every row. Column metadata is taken from the source result set.
   *
   * @param rsSource result set of the source table, positioned before its first row
   * @param rsTarget result set of the target table, positioned before its first row
   * @return {@code true} when everything matches; any difference fails a JUnit assertion instead
   * @throws SQLException if reading either result set fails
   */
  private static boolean compareResultSetData(ResultSet rsSource, ResultSet rsTarget) throws SQLException {
    ResultSetMetaData mdSource = rsSource.getMetaData();
    ResultSetMetaData mdTarget = rsTarget.getMetaData();
    int columnCountSource = mdSource.getColumnCount();
    int columnCountTarget = mdTarget.getColumnCount();
    Assert.assertEquals("Number of columns in source and target are not equal",
                        columnCountSource, columnCountTarget);
    while (true) {
      // Advance both cursors exactly once per iteration and keep the results. Calling next() again
      // after the loop would skip over a surplus row and hide a row-count mismatch of exactly one.
      boolean sourceHasRow = rsSource.next();
      boolean targetHasRow = rsTarget.next();
      Assert.assertFalse("Number of rows in Source table is greater than the number of rows in Target table",
                         sourceHasRow && !targetHasRow);
      Assert.assertFalse("Number of rows in Target table is greater than the number of rows in Source table",
                         targetHasRow && !sourceHasRow);
      if (!sourceHasRow) {
        // Both result sets are exhausted at the same time.
        return true;
      }
      assertCurrentRowsEqual(rsSource, rsTarget, mdSource, columnCountSource);
    }
  }

  /** Asserts that the current rows of both result sets hold equal values in columns 1..columnCount. */
  private static void assertCurrentRowsEqual(ResultSet rsSource, ResultSet rsTarget, ResultSetMetaData mdSource,
                                             int columnCount) throws SQLException {
    for (int column = 1; column <= columnCount; column++) {
      String message = String.format("Different values found for column : %s", mdSource.getColumnName(column));
      if (mdSource.getColumnType(column) == Types.TIMESTAMP) {
        // Read timestamps in UTC with a pure Gregorian calendar (no Julian cut-over).
        GregorianCalendar calendar = new GregorianCalendar(TimeZone.getTimeZone("UTC"));
        calendar.setGregorianChange(new Date(Long.MIN_VALUE));
        Timestamp sourceTS = rsSource.getTimestamp(column, calendar);
        Timestamp targetTS = rsTarget.getTimestamp(column, calendar);
        Assert.assertEquals(message, sourceTS, targetTS);
      } else {
        String sourceString = rsSource.getString(column);
        String targetString = rsTarget.getString(column);
        Assert.assertEquals(message, sourceString, targetString);
      }
    }
  }
}