forked from data-integrations/database-plugins
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathAbstractDBSpecificSourceConfig.java
More file actions
274 lines (231 loc) · 9.88 KB
/
AbstractDBSpecificSourceConfig.java
File metadata and controls
274 lines (231 loc) · 9.88 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
/*
* Copyright © 2021 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package io.cdap.plugin.db.config;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.plugin.PluginConfig;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.plugin.common.Constants;
import io.cdap.plugin.db.TransactionIsolationLevel;
import io.cdap.plugin.db.connector.AbstractDBConnectorConfig;
import io.cdap.plugin.db.source.AbstractDBSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
/**
* Abstract Config for DB Specific Source plugin
*/
public abstract class AbstractDBSpecificSourceConfig extends PluginConfig implements DatabaseSourceConfig {

  public static final String IMPORT_QUERY = "importQuery";
  public static final String BOUNDING_QUERY = "boundingQuery";
  public static final String SPLIT_BY = "splitBy";
  public static final String NUM_SPLITS = "numSplits";
  public static final String SCHEMA = "schema";
  public static final String DATABASE = "database";
  public static final String FETCH_SIZE = "fetchSize";
  public static final String DEFAULT_FETCH_SIZE = "1000";

  // NOTE(review): not referenced in this class; kept because it is public and may be used by subclasses.
  public static final Logger LOG = LoggerFactory.getLogger(AbstractDBSpecificSourceConfig.class);

  @Name(Constants.Reference.REFERENCE_NAME)
  @Description(Constants.Reference.REFERENCE_NAME_DESCRIPTION)
  public String referenceName;

  @Name(IMPORT_QUERY)
  @Description("The SELECT query to use to import data from the specified table. " +
    "You can specify an arbitrary number of columns to import, or import all columns using *. " +
    "The Query should contain the '$CONDITIONS' string unless numSplits is set to one. " +
    "For example, 'SELECT * FROM table WHERE $CONDITIONS'. The '$CONDITIONS' string " +
    "will be replaced by 'splitBy' field limits specified by the bounding query.")
  @Macro
  protected String importQuery;

  @Nullable
  @Name(BOUNDING_QUERY)
  @Description("Bounding Query should return the min and max of the " +
    "values of the 'splitBy' field. For example, 'SELECT MIN(id),MAX(id) FROM table'. " +
    "This is required unless numSplits is set to one.")
  @Macro
  protected String boundingQuery;

  @Nullable
  @Name(SPLIT_BY)
  @Description("Field Name which will be used to generate splits. This is required unless numSplits is set to one.")
  @Macro
  protected String splitBy;

  @Nullable
  @Name(NUM_SPLITS)
  @Description("The number of splits to generate. If set to one, the boundingQuery is not needed, " +
    "and no $CONDITIONS string needs to be specified in the importQuery. If not specified, the " +
    "execution framework will pick a value.")
  @Macro
  protected Integer numSplits;

  @Nullable
  @Name(SCHEMA)
  @Description("The schema of records output by the source. This will be used in place of whatever schema comes " +
    "back from the query. This should only be used if there is a bug in your jdbc driver. For example, if a column " +
    "is not correctly getting marked as nullable.")
  private String schema;

  @Nullable
  @Name(FETCH_SIZE)
  @Macro
  @Description("The number of rows to fetch at a time per split. Larger fetch size can result in faster import, " +
    "with the tradeoff of higher memory usage.")
  protected Integer fetchSize;

  /**
   * Returns the import query with trailing whitespace and trailing commas removed.
   */
  public String getImportQuery() {
    return cleanQuery(importQuery);
  }

  /**
   * Returns the bounding query with trailing whitespace and trailing commas removed.
   */
  public String getBoundingQuery() {
    return cleanQuery(boundingQuery);
  }

  /**
   * Validates this configuration, adding a failure to the collector for each problem found.
   * Properties whose values are still unresolved macros are skipped.
   *
   * @param collector collects validation failures instead of failing fast
   */
  public void validate(FailureCollector collector) {
    boolean hasOneSplit = false;

    if (!containsMacro(NUM_SPLITS) && numSplits != null) {
      if (numSplits < 1) {
        collector.addFailure(
          String.format("Invalid value for Number of Splits '%d'. Must be at least 1.", numSplits),
          "Specify a Number of Splits no less than 1.")
          .withConfigProperty(NUM_SPLITS);
      }
      if (numSplits == 1) {
        hasOneSplit = true;
      }
    }

    if (getTransactionIsolationLevel() != null) {
      TransactionIsolationLevel.validate(getTransactionIsolationLevel(), collector);
    }

    if (!containsMacro(IMPORT_QUERY) && Strings.isNullOrEmpty(importQuery)) {
      collector.addFailure("Import Query is empty.", "Specify the Import Query.")
        .withConfigProperty(IMPORT_QUERY);
    }

    // With more than one split the framework substitutes split bounds for '$CONDITIONS',
    // so the placeholder must be present in the query text.
    if (!hasOneSplit && !containsMacro(IMPORT_QUERY) && !getImportQuery().contains("$CONDITIONS")) {
      collector.addFailure(String.format(
        "Import Query %s must contain the string '$CONDITIONS' if Number of Splits is not set to 1.", importQuery),
        "Include '$CONDITIONS' in the Import Query")
        .withConfigProperty(IMPORT_QUERY);
    }

    if (!hasOneSplit && !containsMacro(SPLIT_BY) && (splitBy == null || splitBy.isEmpty())) {
      collector.addFailure("Split-By Field Name must be specified if Number of Splits is not set to 1.",
                           "Specify the Split-by Field Name.").withConfigProperty(SPLIT_BY)
        .withConfigProperty(NUM_SPLITS);
    }

    if (!hasOneSplit && !containsMacro(BOUNDING_QUERY) && (boundingQuery == null || boundingQuery.isEmpty())) {
      collector.addFailure("Bounding Query must be specified if Number of Splits is not set to 1.",
                           "Specify the Bounding Query.")
        .withConfigProperty(BOUNDING_QUERY).withConfigProperty(NUM_SPLITS);
    }

    if (!containsMacro(FETCH_SIZE) && fetchSize != null && fetchSize <= 0) {
      collector.addFailure("Invalid fetch size.", "Fetch size must be a positive integer.")
        .withConfigProperty(FETCH_SIZE);
    }
  }

  /**
   * Validates the configured output schema against the schema actually produced by the query:
   * every configured field must exist in the actual record and have a compatible type.
   *
   * @param actualSchema schema derived from the database query result
   * @param collector collects validation failures
   */
  public void validateSchema(Schema actualSchema, FailureCollector collector) {
    Schema configSchema = getSchema();
    if (configSchema == null) {
      collector.addFailure("Schema should not be null or empty.", "Fill in the Schema.")
        .withConfigProperty(SCHEMA);
      return;
    }

    for (Schema.Field field : configSchema.getFields()) {
      Schema.Field actualField = actualSchema.getField(field.getName());
      if (actualField == null) {
        collector.addFailure(
          String.format("Schema field '%s' is not present in actual record", field.getName()),
          String.format("Remove the field %s in the schema.", field.getName()))
          .withOutputSchemaField(field.getName());
        continue;
      }
      // Compare the non-nullable component so that nullability differences alone do not fail validation.
      Schema actualFieldSchema = actualField.getSchema().isNullable() ?
        actualField.getSchema().getNonNullable() : actualField.getSchema();
      Schema expectedFieldSchema = field.getSchema().isNullable() ?
        field.getSchema().getNonNullable() : field.getSchema();

      validateField(collector, field, actualFieldSchema, expectedFieldSchema);
    }
  }

  /**
   * Adds a failure if the configured field's type or logical type differs from the actual field's.
   * Subclasses may override to permit additional driver-specific type coercions.
   */
  protected void validateField(FailureCollector collector, Schema.Field field, Schema actualFieldSchema,
                               Schema expectedFieldSchema) {
    if (actualFieldSchema.getType() != expectedFieldSchema.getType() ||
      actualFieldSchema.getLogicalType() != expectedFieldSchema.getLogicalType()) {
      collector.addFailure(
        String.format("Schema field '%s' is expected to have type '%s' but found '%s'.",
                      field.getName(), expectedFieldSchema.getDisplayName(),
                      actualFieldSchema.getDisplayName()),
        String.format("Change the data type of field %s to %s.", field.getName(), actualFieldSchema.getDisplayName()))
        .withOutputSchemaField(field.getName());
    }
  }

  /**
   * Parses the configured schema JSON.
   *
   * @return the parsed schema, or {@code null} if no schema was configured
   * @throws IllegalArgumentException if the configured schema is not valid schema JSON
   */
  public Schema getSchema() {
    try {
      return Strings.isNullOrEmpty(schema) ? null : Schema.parseJson(schema);
    } catch (IOException e) {
      throw new IllegalArgumentException(String.format("Unable to parse schema '%s'. Reason: %s",
                                                       schema, e.getMessage()), e);
    }
  }

  /**
   * Returns the transaction isolation level, or {@code null} if the database does not support one.
   * Subclasses for databases that support isolation levels should override this.
   */
  public String getTransactionIsolationLevel() {
    return null;
  }

  public Integer getNumSplits() {
    return numSplits;
  }

  public String getSplitBy() {
    return splitBy;
  }

  public String getConnectionString() {
    return getConnection().getConnectionString();
  }

  /**
   * Returns the combined JDBC connection arguments: connection-level properties overlaid with
   * database-specific arguments (the latter win on key collisions).
   */
  public Map<String, String> getConnectionArguments() {
    Map<String, String> arguments = new HashMap<>();
    arguments.putAll(Maps.fromProperties(getConnection().getConnectionArgumentsProperties()));
    arguments.putAll(getDBSpecificArguments());
    return arguments;
  }

  public String getJdbcPluginName() {
    return getConnection().getJdbcPluginName();
  }

  public String getUser() {
    return getConnection().getUser();
  }

  public String getPassword() {
    return getConnection().getPassword();
  }

  public String getReferenceName() {
    return referenceName;
  }

  /**
   * Returns queries to run once per connection before reading; none by default.
   */
  public List<String> getInitQueries() {
    return Collections.emptyList();
  }

  /**
   * Trims the query and strips any trailing spaces/commas so a dangling comma does not break SQL.
   */
  protected String cleanQuery(@Nullable String query) {
    if (query == null) {
      return null;
    }
    return query.trim().replaceAll("[ ,]+$", "");
  }

  /**
   * Returns connection arguments specific to the concrete database plugin.
   */
  protected abstract Map<String, String> getDBSpecificArguments();

  /**
   * Returns the connector configuration providing connection string, credentials and driver name.
   */
  protected abstract AbstractDBConnectorConfig getConnection();

  @Override
  public boolean canConnect() {
    // Connection can only be attempted once the import query macro has been resolved.
    return !containsMacro(AbstractDBSource.DBSourceConfig.IMPORT_QUERY) && getConnection().canConnect();
  }

  @Override
  public Integer getFetchSize() {
    return fetchSize;
  }
}