diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java index 305cd10c8ab8..5759af954f19 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java @@ -87,6 +87,7 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.ActionFactory; import org.apache.beam.sdk.io.gcp.spanner.changestreams.cache.CacheFactory; import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.DaoFactory; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao; import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataTableNames; import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.CleanUpReadChangeStreamDoFn; import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.DetectNewPartitionsDoFn; @@ -1746,6 +1747,8 @@ public abstract static class ReadChangeStream abstract String getChangeStreamName(); + abstract @Nullable List getTvfNameList(); + abstract @Nullable String getMetadataInstance(); abstract @Nullable String getMetadataDatabase(); @@ -1783,6 +1786,8 @@ abstract static class Builder { abstract Builder setChangeStreamName(String changeStreamName); + abstract Builder setTvfNameList(List tvfNameList); + abstract Builder setMetadataInstance(String metadataInstance); abstract Builder setMetadataDatabase(String metadataDatabase); @@ -1861,6 +1866,11 @@ public ReadChangeStream withChangeStreamName(String changeStreamName) { return toBuilder().setChangeStreamName(changeStreamName).build(); } + /** Specifies the list of TVF names to query and union. */ + public ReadChangeStream withTvfNameList(List tvfNameList) { + return toBuilder().setTvfNameList(tvfNameList).build(); + } + /** Specifies the metadata database. 
*/ public ReadChangeStream withMetadataInstance(String metadataInstance) { return toBuilder().setMetadataInstance(metadataInstance).build(); @@ -2042,6 +2052,7 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta getInclusiveEndAt().compareTo(MAX_INCLUSIVE_END_AT) > 0 ? MAX_INCLUSIVE_END_AT : getInclusiveEndAt(); + final List tvfNameList = getTvfNameList(); final MapperFactory mapperFactory = new MapperFactory(changeStreamDatabaseDialect); final ChangeStreamMetrics metrics = new ChangeStreamMetrics(); final RpcPriority rpcPriority = MoreObjects.firstNonNull(getRpcPriority(), RpcPriority.HIGH); @@ -2051,10 +2062,19 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta isMutableChangeStream( spannerAccessor.getDatabaseClient(), changeStreamDatabaseDialect, changeStreamName); LOG.info("The change stream {} is mutable: {}", changeStreamName, isMutableChangeStream); + if (tvfNameList != null && !tvfNameList.isEmpty()) { + if (!isMutableChangeStream) { + throw new IllegalArgumentException( + "tvfNameList is only supported for change streams with MUTABLE_KEY_RANGE mode"); + } + // TODO: if !per_placement_tvf=true, throw exception. 
+ checkTvfExistence(spannerAccessor.getDatabaseClient(), tvfNameList); + } final DaoFactory daoFactory = new DaoFactory( changeStreamSpannerConfig, changeStreamName, + tvfNameList, partitionMetadataSpannerConfig, partitionMetadataTableNames, rpcPriority, @@ -2754,6 +2774,50 @@ static String resolveSpannerProjectId(SpannerConfig config) { : config.getProjectId().get(); } + @VisibleForTesting + static void checkTvfExistence(DatabaseClient databaseClient, List tvfNameList) { + if (tvfNameList == null || tvfNameList.isEmpty()) { + return; + } + Dialect dialect = databaseClient.getDialect(); + try (ReadOnlyTransaction tx = databaseClient.readOnlyTransaction()) { + StringBuilder sql = + new StringBuilder( + "SELECT routine_name FROM information_schema.routines WHERE routine_type LIKE '%FUNCTION' AND routine_name IN ("); + for (int i = 0; i < tvfNameList.size(); i++) { + if (dialect == Dialect.POSTGRESQL) { + sql.append("$").append(i + 1); + } else { + sql.append("@p").append(i); + } + if (i < tvfNameList.size() - 1) { + sql.append(", "); + } + } + sql.append(")"); + Statement.Builder builder = Statement.newBuilder(sql.toString()); + for (int i = 0; i < tvfNameList.size(); i++) { + if (dialect == Dialect.POSTGRESQL) { + builder.bind("p" + (i + 1)).to(PartitionMetadataDao.escapeTvfName(tvfNameList.get(i))); + } else { + builder.bind("p" + i).to(PartitionMetadataDao.escapeTvfName(tvfNameList.get(i))); + } + } + Statement statement = builder.build(); + ResultSet resultSet = tx.executeQuery(statement); + java.util.Set foundNames = new java.util.HashSet<>(); + while (resultSet.next()) { + foundNames.add(resultSet.getString(0)); + } + for (String tvfName : tvfNameList) { + if (!foundNames.contains(PartitionMetadataDao.escapeTvfName(tvfName))) { + throw new IllegalArgumentException( + "TVF specified: " + tvfName + " is not found in the existing TVF's: " + foundNames); + } + } + } + } + @VisibleForTesting static boolean isMutableChangeStream( DatabaseClient databaseClient, 
Dialect dialect, String changeStreamName) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/ChangeStreamsConstants.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/ChangeStreamsConstants.java index 9b7c76a3ff50..ea325a118728 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/ChangeStreamsConstants.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/ChangeStreamsConstants.java @@ -20,6 +20,7 @@ import com.google.cloud.Timestamp; import com.google.cloud.spanner.Options.RpcPriority; import java.util.Collections; +import java.util.List; import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.DetectNewPartitionsDoFn; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State; @@ -64,6 +65,18 @@ public class ChangeStreamsConstants { /** The sliding window size in seconds for throughput reporting. */ public static final int THROUGHPUT_WINDOW_SECONDS = 10; + /** + * The delimiter used to separate the partition token and the tvf name. Note that this string does + * not occur in the partition token itself. + */ + public static final String PARTITION_TOKEN_TVF_NAME_DELIMITER = "#"; + + /** The default tvf name for a change stream query is the empty {@link String}. */ + public static final String DEFAULT_TVF_NAME = ""; + + /** The default tvf name list to query and union is the empty list ({@link Collections#emptyList()}). */ + public static final List DEFAULT_TVF_NAME_LIST = Collections.emptyList(); + /** * We use the following partition token to provide an estimate size of a partition token. A usual * partition token has around 140 characters. 
@@ -85,6 +98,7 @@ public class ChangeStreamsConstants { .setState(State.CREATED) .setWatermark(Timestamp.now()) .setCreatedAt(Timestamp.now()) + .setTvfName(DEFAULT_TVF_NAME) .build(); /** diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java index 14b6b2e2453a..5c6004ba5dc6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java @@ -155,13 +155,17 @@ private void processChildPartition( record.getStartTimestamp(), partition.getEndTimestamp(), partition.getHeartbeatMillis(), + partition.getTvfName(), childPartition); LOG.debug("[{}] Inserting child partition token {}", partitionToken, childPartitionToken); final Boolean insertedRow = partitionMetadataDao .runInTransaction( transaction -> { - if (transaction.getPartition(childPartitionToken) == null) { + if (transaction.getPartition( + PartitionMetadataDao.composePartitionTokenWithTvfName( + childPartitionToken, partition.getTvfName())) + == null) { transaction.insert(row); return true; } else { @@ -188,6 +192,7 @@ private PartitionMetadata toPartitionMetadata( Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis, + String tvfName, ChildPartition childPartition) { return PartitionMetadata.newBuilder() .setPartitionToken(childPartition.getToken()) @@ -195,6 +200,7 @@ private PartitionMetadata toPartitionMetadata( .setStartTimestamp(startTimestamp) .setEndTimestamp(endTimestamp) .setHeartbeatMillis(heartbeatMillis) + .setTvfName(tvfName) .setState(CREATED) .setWatermark(startTimestamp) .build(); diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsAction.java index c889d41279ff..e3f291ead74b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsAction.java @@ -172,11 +172,14 @@ private ProcessContinuation schedulePartitions( } private Timestamp updateBatchToScheduled(List batchPartitions) { - final List batchPartitionTokens = + final List batchComposedPartitionTokens = batchPartitions.stream() - .map(PartitionMetadata::getPartitionToken) + .map( + partition -> + PartitionMetadataDao.composePartitionTokenWithTvfName( + partition.getPartitionToken(), partition.getTvfName())) .collect(Collectors.toList()); - return dao.updateToScheduled(batchPartitionTokens); + return dao.updateToScheduled(batchComposedPartitionTokens); } private void outputBatch( diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/PartitionStartRecordAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/PartitionStartRecordAction.java index 3d20b858e4ac..d89e86692319 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/PartitionStartRecordAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/PartitionStartRecordAction.java @@ -138,6 +138,7 @@ private void processStartPartition( .setStartTimestamp(record.getStartTimestamp()) .setEndTimestamp(partition.getEndTimestamp()) 
.setHeartbeatMillis(partition.getHeartbeatMillis()) + .setTvfName(partition.getTvfName()) .setState(CREATED) .setWatermark(record.getStartTimestamp()) .build(); @@ -146,7 +147,10 @@ private void processStartPartition( partitionMetadataDao .runInTransaction( transaction -> { - if (transaction.getPartition(startPartitionToken) == null) { + if (transaction.getPartition( + PartitionMetadataDao.composePartitionTokenWithTvfName( + startPartitionToken, partition.getTvfName())) + == null) { transaction.insert(row); return true; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java index 7feec990f52e..1eb16af6c347 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java @@ -185,11 +185,14 @@ public ProcessContinuation run( ManualWatermarkEstimator watermarkEstimator, BundleFinalizer bundleFinalizer) { final String token = partition.getPartitionToken(); + final String tvfName = partition.getTvfName(); // TODO: Potentially we can avoid this fetch, by enriching the runningAt timestamp when the // ReadChangeStreamPartitionDoFn#processElement is called final PartitionMetadata updatedPartition = - Optional.ofNullable(partitionMetadataDao.getPartition(token)) + Optional.ofNullable( + partitionMetadataDao.getPartition( + PartitionMetadataDao.composePartitionTokenWithTvfName(token, tvfName))) .map(partitionMetadataMapper::from) .orElseThrow( () -> @@ -223,7 +226,11 @@ public ProcessContinuation run( try (ChangeStreamResultSet resultSet = changeStreamDao.changeStreamQuery( - token, startTimestamp, changeStreamQueryEndTimestamp, 
partition.getHeartbeatMillis())) { + token, + tvfName, + startTimestamp, + changeStreamQueryEndTimestamp, + partition.getHeartbeatMillis())) { metrics.incQueryCounter(); while (resultSet.next()) { @@ -298,7 +305,9 @@ public ProcessContinuation run( LOG.debug("[{}] Continuation present, returning {}", token, maybeContinuation); bundleFinalizer.afterBundleCommit( Instant.now().plus(BUNDLE_FINALIZER_TIMEOUT), - updateWatermarkCallback(token, watermarkEstimator)); + updateWatermarkCallback( + PartitionMetadataDao.composePartitionTokenWithTvfName(token, tvfName), + watermarkEstimator)); return maybeContinuation.get(); } } @@ -361,25 +370,27 @@ public ProcessContinuation run( LOG.debug("[{}] Finishing partition", token); // TODO: This should be performed after the commit succeeds. Since bundle finalizers are not // guaranteed to be called, this needs to be performed in a subsequent fused stage. - partitionMetadataDao.updateToFinished(token); + partitionMetadataDao.updateToFinished( + PartitionMetadataDao.composePartitionTokenWithTvfName(token, tvfName)); metrics.decActivePartitionReadCounter(); LOG.info("[{}] After attempting to finish the partition", token); return ProcessContinuation.stop(); } private BundleFinalizer.Callback updateWatermarkCallback( - String token, WatermarkEstimator watermarkEstimator) { + String composedToken, WatermarkEstimator watermarkEstimator) { return () -> { final Instant watermark = watermarkEstimator.currentWatermark(); - LOG.debug("[{}] Updating current watermark to {}", token, watermark); + LOG.debug("[{}] Updating current watermark to {}", composedToken, watermark); try { partitionMetadataDao.updateWatermark( - token, Timestamp.ofTimeMicroseconds(watermark.getMillis() * 1_000L)); + composedToken, Timestamp.ofTimeMicroseconds(watermark.getMillis() * 1_000L)); } catch (SpannerException e) { if (e.getErrorCode() == ErrorCode.NOT_FOUND) { - LOG.debug("[{}] Unable to update the current watermark, partition NOT FOUND", token); + LOG.debug( + 
"[{}] Unable to update the current watermark, partition NOT FOUND", composedToken); } else { - LOG.error("[{}] Error updating the current watermark", token, e); + LOG.error("[{}] Error updating the current watermark", composedToken, e); } } }; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamDao.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamDao.java index 0f32fa46cde7..b5f24a0a2c2e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamDao.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamDao.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.io.gcp.spanner.changestreams.dao; +import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_TVF_NAME; + import com.google.cloud.Timestamp; import com.google.cloud.spanner.DatabaseClient; import com.google.cloud.spanner.Dialect; @@ -83,6 +85,7 @@ public class ChangeStreamDao { */ public ChangeStreamResultSet changeStreamQuery( String partitionToken, + String tvfName, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { @@ -95,10 +98,14 @@ public ChangeStreamResultSet changeStreamQuery( if (this.isPostgres()) { // Ensure we have determined whether change stream uses mutable key range if (this.isMutableChangeStream) { - query = - "SELECT * FROM \"spanner\".\"read_proto_bytes_" - + changeStreamName - + "\"($1, $2, $3, $4, null)"; + if (tvfName == null || tvfName.equals(DEFAULT_TVF_NAME)) { + query = + "SELECT * FROM \"spanner\".\"read_proto_bytes_" + + changeStreamName + + "\"($1, $2, $3, $4, null)"; + } else { + query = "SELECT * FROM \"spanner\".\"" + tvfName + "\"($1, $2, $3, $4, null)"; + } } else { query = "SELECT * FROM \"spanner\".\"read_json_" @@ -117,16 +124,29 @@ 
public ChangeStreamResultSet changeStreamQuery( .to(heartbeatMillis) .build(); } else { - query = - "SELECT * FROM READ_" - + changeStreamName - + "(" - + " start_timestamp => @startTimestamp," - + " end_timestamp => @endTimestamp," - + " partition_token => @partitionToken," - + " read_options => null," - + " heartbeat_milliseconds => @heartbeatMillis" - + ")"; + if (this.isMutableChangeStream && tvfName != null && !tvfName.equals(DEFAULT_TVF_NAME)) { + query = + "SELECT * FROM " + + tvfName + + "(" + + " start_timestamp => @startTimestamp," + + " end_timestamp => @endTimestamp," + + " partition_token => @partitionToken," + + " read_options => null," + + " heartbeat_milliseconds => @heartbeatMillis" + + ")"; + } else { + query = + "SELECT * FROM READ_" + + changeStreamName + + "(" + + " start_timestamp => @startTimestamp," + + " end_timestamp => @endTimestamp," + + " partition_token => @partitionToken," + + " read_options => null," + + " heartbeat_milliseconds => @heartbeatMillis" + + ")"; + } statement = Statement.newBuilder(query) .bind("startTimestamp") diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java index 95bdbfed7ca5..2fe4239ca0fd 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java @@ -21,8 +21,10 @@ import com.google.cloud.spanner.Dialect; import com.google.cloud.spanner.Options.RpcPriority; import java.io.Serializable; +import java.util.List; import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor; import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants; /** * Factory class to 
create data access objects to perform change stream queries and access the @@ -45,6 +47,7 @@ public class DaoFactory implements Serializable { private final String changeStreamName; private final PartitionMetadataTableNames partitionMetadataTableNames; + private final List tvfNameList; private final RpcPriority rpcPriority; private final String jobName; private final Dialect spannerChangeStreamDatabaseDialect; @@ -58,12 +61,14 @@ public class DaoFactory implements Serializable { * @param changeStreamName the name of the change stream for the change streams DAO * @param metadataSpannerConfig the metadata tables configuration * @param partitionMetadataTableNames the names of the partition metadata ddl objects + * @param tvfNameList the list of TVF names specified to query and union * @param rpcPriority the priority of the requests made by the DAO queries * @param jobName the name of the running job */ public DaoFactory( SpannerConfig changeStreamSpannerConfig, String changeStreamName, + List tvfNameList, SpannerConfig metadataSpannerConfig, PartitionMetadataTableNames partitionMetadataTableNames, RpcPriority rpcPriority, @@ -79,6 +84,8 @@ public DaoFactory( } this.changeStreamSpannerConfig = changeStreamSpannerConfig; this.changeStreamName = changeStreamName; + this.tvfNameList = + tvfNameList == null ? ChangeStreamsConstants.DEFAULT_TVF_NAME_LIST : tvfNameList; this.metadataSpannerConfig = metadataSpannerConfig; this.partitionMetadataTableNames = partitionMetadataTableNames; this.rpcPriority = rpcPriority; @@ -88,6 +95,11 @@ public DaoFactory( this.isMutableChangeStream = isMutableChangeStream; } + /** Returns the tvf name list. */ + public List getTvfNameList() { + return this.tvfNameList; + } + /** * Creates and returns a singleton DAO instance for admin operations over the partition metadata * table. 
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java index b407d5b0b6cc..3b3620fd0005 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.io.gcp.spanner.changestreams.dao; +import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_TVF_NAME; +import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.PARTITION_TOKEN_TVF_NAME_DELIMITER; import static org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_CREATED_AT; import static org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_END_TIMESTAMP; import static org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_FINISHED_AT; @@ -132,14 +134,41 @@ public List findAllTableIndexes() { return result; } + public static String escapeTvfName(String tvfName) { + return tvfName.replace("'", "").replace("\"", ""); + } + + public static String composePartitionTokenWithTvfName(String partitionToken, String tvfName) { + if (tvfName == null || tvfName.isEmpty()) { + return partitionToken; + } + return partitionToken + PARTITION_TOKEN_TVF_NAME_DELIMITER + escapeTvfName(tvfName); + } + + public static String extractPartitionToken(String composedPartitionToken) { + int index = composedPartitionToken.indexOf(PARTITION_TOKEN_TVF_NAME_DELIMITER); + if (index == -1) { + return composedPartitionToken; + } + return composedPartitionToken.substring(0, index); + } + + public static String 
extractTvfName(String composedPartitionToken) { + int index = composedPartitionToken.indexOf(PARTITION_TOKEN_TVF_NAME_DELIMITER); + if (index == -1) { + return DEFAULT_TVF_NAME; + } + return composedPartitionToken.substring(index + PARTITION_TOKEN_TVF_NAME_DELIMITER.length()); + } + /** - * Fetches the partition metadata row data for the given partition token. + * Fetches the partition metadata row data for the given composedPartitionToken. * - * @param partitionToken the partition unique identifier - * @return the partition metadata for the given token if it exists as a struct. Otherwise, it - * returns null. + * @param composedPartitionToken the partition unique identifier + * @return the partition metadata for the given composedPartitionToken if it exists as a struct. + * Otherwise, it returns null. */ - public @Nullable Struct getPartition(String partitionToken) { + public @Nullable Struct getPartition(String composedPartitionToken) { Statement statement; if (this.isPostgres()) { statement = @@ -150,7 +179,7 @@ public List findAllTableIndexes() { + COLUMN_PARTITION_TOKEN + "\" = $1") .bind("p1") - .to(partitionToken) + .to(composedPartitionToken) .build(); } else { statement = @@ -161,7 +190,7 @@ public List findAllTableIndexes() { + COLUMN_PARTITION_TOKEN + " = @partition") .bind("partition") - .to(partitionToken) + .to(composedPartitionToken) .build(); } try (ResultSet resultSet = @@ -340,54 +369,69 @@ public Timestamp insert(PartitionMetadata row) { return transactionResult.getCommitTimestamp(); } + /** + * Inserts the partition metadata in batch. 
+ * + * @param rows the partition metadata objects to be inserted + * @return the commit timestamp of the read / write transaction + */ + public Timestamp insert(List rows) { + final TransactionResult transactionResult = + runInTransaction(transaction -> transaction.insert(rows), "InsertsPartitionMetadataBatch"); + return transactionResult.getCommitTimestamp(); + } + /** * Updates multiple partition row to {@link State#SCHEDULED} state. * - * @param partitionTokens the partitions' unique identifiers + * @param composedPartitionTokens the partitions' unique identifiers * @return the commit timestamp of the read / write transaction */ - public Timestamp updateToScheduled(List partitionTokens) { + public Timestamp updateToScheduled(List composedPartitionTokens) { final TransactionResult transactionResult = runInTransaction( - transaction -> transaction.updateToScheduled(partitionTokens), "updateToScheduled"); + transaction -> transaction.updateToScheduled(composedPartitionTokens), + "updateToScheduled"); return transactionResult.getCommitTimestamp(); } /** * Updates a partition row to {@link State#RUNNING} state. * - * @param partitionToken the partition unique identifier + * @param composedPartitionToken the partition unique identifier * @return the commit timestamp of the read / write transaction */ - public Timestamp updateToRunning(String partitionToken) { + public Timestamp updateToRunning(String composedPartitionToken) { final TransactionResult transactionResult = runInTransaction( - transaction -> transaction.updateToRunning(partitionToken), "updateToRunning"); + transaction -> transaction.updateToRunning(composedPartitionToken), "updateToRunning"); return transactionResult.getCommitTimestamp(); } /** * Updates a partition row to {@link State#FINISHED} state. 
* - * @param partitionToken the partition unique identifier + * @param composedPartitionToken the partition unique identifier * @return the commit timestamp of the read / write transaction */ - public Timestamp updateToFinished(String partitionToken) { + public Timestamp updateToFinished(String composedPartitionToken) { final TransactionResult transactionResult = runInTransaction( - transaction -> transaction.updateToFinished(partitionToken), "updateToFinished"); + transaction -> transaction.updateToFinished(composedPartitionToken), + "updateToFinished"); return transactionResult.getCommitTimestamp(); } /** * Update the partition watermark to the given timestamp. * - * @param partitionToken the partition unique identifier + * @param composedPartitionToken the partition unique identifier * @param watermark the new partition watermark */ - public void updateWatermark(String partitionToken, Timestamp watermark) { + public void updateWatermark(String composedPartitionToken, Timestamp watermark) { runInTransaction( - transaction -> transaction.updateWatermark(partitionToken, watermark), "updateWatermark"); + transaction -> transaction.updateWatermark(composedPartitionToken, watermark), + "updateWatermark"); } /** @@ -464,14 +508,28 @@ public Void insert(PartitionMetadata row) { return null; } + /** + * Inserts the partition metadata in batch. + * + * @param rows the partition metadata objects to be inserted + */ + public Void insert(List rows) { + List mutations = new ArrayList<>(); + for (PartitionMetadata row : rows) { + mutations.add(createInsertMetadataMutationFrom(row)); + } + transaction.buffer(mutations); + return null; + } + /** * Updates multiple partition rows to {@link State#SCHEDULED} state. 
* - * @param partitionTokens the partitions' unique identifiers + * @param composedPartitionTokens the partitions' unique identifiers */ - public Void updateToScheduled(List partitionTokens) { + public Void updateToScheduled(List composedPartitionTokens) { HashSet tokens = new HashSet<>(); - Statement statement = getPartitionsMatchingState(partitionTokens, State.CREATED); + Statement statement = getPartitionsMatchingState(composedPartitionTokens, State.CREATED); try (ResultSet resultSet = transaction.executeQuery(statement, Options.tag("getPartitionsMatchingState=CREATED"))) { while (resultSet.next()) { @@ -479,16 +537,16 @@ public Void updateToScheduled(List partitionTokens) { } } - for (String partitionToken : partitionTokens) { - if (!tokens.contains(partitionToken)) { - LOG.info("[{}] Did not update to be SCHEDULED", partitionToken); + for (String composedPartitionToken : composedPartitionTokens) { + if (!tokens.contains(composedPartitionToken)) { + LOG.info("[{}] Did not update to be SCHEDULED", composedPartitionToken); continue; } - LOG.info("[{}] Successfully updating to be SCHEDULED", partitionToken); + LOG.info("[{}] Successfully updating to be SCHEDULED", composedPartitionToken); transaction.buffer( ImmutableList.of( - createUpdateMetadataStateMutationFrom(partitionToken, State.SCHEDULED))); + createUpdateMetadataStateMutationFrom(composedPartitionToken, State.SCHEDULED))); } return null; } @@ -496,35 +554,38 @@ public Void updateToScheduled(List partitionTokens) { /** * Updates a partition row to {@link State#RUNNING} state. 
* - * @param partitionToken the partition unique identifier + * @param composedPartitionToken the partition unique identifier */ - public Void updateToRunning(String partitionToken) { + public Void updateToRunning(String composedPartitionToken) { Statement statement = - getPartitionsMatchingState(Collections.singletonList(partitionToken), State.SCHEDULED); + getPartitionsMatchingState( + Collections.singletonList(composedPartitionToken), State.SCHEDULED); try (ResultSet resultSet = transaction.executeQuery( statement, Options.tag("getPartitionsMatchingState=SCHEDULED"))) { if (!resultSet.next()) { - LOG.info("[{}] Did not update to be RUNNING", partitionToken); + LOG.info("[{}] Did not update to be RUNNING", composedPartitionToken); return null; } } - LOG.info("[{}] Successfully updating to be RUNNING", partitionToken); + LOG.info("[{}] Successfully updating to be RUNNING", composedPartitionToken); transaction.buffer( - ImmutableList.of(createUpdateMetadataStateMutationFrom(partitionToken, State.RUNNING))); + ImmutableList.of( + createUpdateMetadataStateMutationFrom(composedPartitionToken, State.RUNNING))); return null; } /** * Updates a partition row to {@link State#FINISHED} state. 
* - * @param partitionToken the partition unique identifier + * @param composedPartitionToken the partition unique identifier */ - public Void updateToFinished(String partitionToken) { - LOG.info("[{}] Successfully updating to be FINISHED", partitionToken); + public Void updateToFinished(String composedPartitionToken) { + LOG.info("[{}] Successfully updating to be FINISHED", composedPartitionToken); transaction.buffer( - ImmutableList.of(createUpdateMetadataStateMutationFrom(partitionToken, State.FINISHED))); + ImmutableList.of( + createUpdateMetadataStateMutationFrom(composedPartitionToken, State.FINISHED))); return null; } @@ -532,33 +593,36 @@ public Void updateToFinished(String partitionToken) { * Update the partition watermark to the given timestamp iff the partition watermark in metadata * table is smaller than the given watermark. * - * @param partitionToken the partition unique identifier + * @param composedPartitionToken the partition unique identifier * @param watermark the new partition watermark * @return the commit timestamp of the read / write transaction */ - public Void updateWatermark(String partitionToken, Timestamp watermark) { + public Void updateWatermark(String composedPartitionToken, Timestamp watermark) { Struct row = transaction.readRow( - metadataTableName, Key.of(partitionToken), Collections.singleton(COLUMN_WATERMARK)); + metadataTableName, + Key.of(composedPartitionToken), + Collections.singleton(COLUMN_WATERMARK)); if (row == null) { - LOG.error("[{}] Failed to read Watermark column", partitionToken); + LOG.error("[{}] Failed to read Watermark column", composedPartitionToken); return null; } Timestamp partitionWatermark = row.getTimestamp(COLUMN_WATERMARK); if (partitionWatermark.compareTo(watermark) < 0) { - transaction.buffer(createUpdateMetadataWatermarkMutationFrom(partitionToken, watermark)); + transaction.buffer( + createUpdateMetadataWatermarkMutationFrom(composedPartitionToken, watermark)); } return null; } /** - * Fetches the 
partition metadata row data for the given partition token. + * Fetches the partition metadata row data for the given composedPartitionToken. * - * @param partitionToken the partition unique identifier - * @return the partition metadata for the given token if it exists as a struct. Otherwise, it - * returns null. + * @param composedPartitionToken the partition unique identifier + * @return the partition metadata for the given composedPartitionToken if it exists as a struct. + * Otherwise, it returns null. */ - public @Nullable Struct getPartition(String partitionToken) { + public @Nullable Struct getPartition(String composedPartitionToken) { Statement statement; if (this.dialect == Dialect.POSTGRESQL) { statement = @@ -569,7 +633,7 @@ public Void updateWatermark(String partitionToken, Timestamp watermark) { + COLUMN_PARTITION_TOKEN + "\" = $1") .bind("p1") - .to(partitionToken) + .to(composedPartitionToken) .build(); } else { @@ -581,7 +645,7 @@ public Void updateWatermark(String partitionToken, Timestamp watermark) { + COLUMN_PARTITION_TOKEN + " = @partition") .bind("partition") - .to(partitionToken) + .to(composedPartitionToken) .build(); } try (ResultSet resultSet = @@ -597,7 +661,9 @@ public Void updateWatermark(String partitionToken, Timestamp watermark) { private Mutation createInsertMetadataMutationFrom(PartitionMetadata partitionMetadata) { return Mutation.newInsertBuilder(metadataTableName) .set(COLUMN_PARTITION_TOKEN) - .to(partitionMetadata.getPartitionToken()) + .to( + composePartitionTokenWithTvfName( + partitionMetadata.getPartitionToken(), partitionMetadata.getTvfName())) .set(COLUMN_PARENT_TOKENS) .toStringArray(partitionMetadata.getParentTokens()) .set(COLUMN_START_TIMESTAMP) @@ -615,20 +681,23 @@ private Mutation createInsertMetadataMutationFrom(PartitionMetadata partitionMet .build(); } - private Statement getPartitionsMatchingState(List partitionTokens, State state) { + private Statement getPartitionsMatchingState( + List composedPartitionTokens, 
State state) { Statement statement; if (this.dialect == Dialect.POSTGRESQL) { StringBuilder sqlStringBuilder = new StringBuilder("SELECT * FROM \"" + metadataTableName + "\""); sqlStringBuilder.append(" WHERE \""); sqlStringBuilder.append(COLUMN_STATE + "\" = " + "'" + state.toString() + "'"); - if (!partitionTokens.isEmpty()) { + if (!composedPartitionTokens.isEmpty()) { sqlStringBuilder.append(" AND \""); sqlStringBuilder.append(COLUMN_PARTITION_TOKEN); sqlStringBuilder.append("\""); sqlStringBuilder.append(" = ANY (Array["); sqlStringBuilder.append( - partitionTokens.stream().map(s -> "'" + s + "'").collect(Collectors.joining(","))); + composedPartitionTokens.stream() + .map(s -> "'" + s + "'") + .collect(Collectors.joining(","))); sqlStringBuilder.append("])"); } statement = Statement.newBuilder(sqlStringBuilder.toString()).build(); @@ -639,11 +708,11 @@ private Statement getPartitionsMatchingState(List partitionTokens, State + metadataTableName + " WHERE " + COLUMN_PARTITION_TOKEN - + " IN UNNEST(@partitionTokens) AND " + + " IN UNNEST(@composedPartitionTokens) AND " + COLUMN_STATE + " = @state") - .bind("partitionTokens") - .to(Value.stringArray(new ArrayList<>(partitionTokens))) + .bind("composedPartitionTokens") + .to(Value.stringArray(new ArrayList<>(composedPartitionTokens))) .bind("state") .to(state.toString()) .build(); @@ -651,14 +720,15 @@ private Statement getPartitionsMatchingState(List partitionTokens, State return statement; } - private Mutation createUpdateMetadataStateMutationFrom(String partitionToken, State state) { + private Mutation createUpdateMetadataStateMutationFrom( + String composedPartitionToken, State state) { final String timestampColumn = stateToTimestampColumn.get(state); if (timestampColumn == null) { throw new IllegalArgumentException("No timestamp column name found for state " + state); } return Mutation.newUpdateBuilder(metadataTableName) .set(COLUMN_PARTITION_TOKEN) - .to(partitionToken) + .to(composedPartitionToken) 
.set(COLUMN_STATE) .to(state.toString()) .set(timestampColumn) @@ -667,10 +737,10 @@ private Mutation createUpdateMetadataStateMutationFrom(String partitionToken, St } private Mutation createUpdateMetadataWatermarkMutationFrom( - String partitionToken, Timestamp watermark) { + String composedPartitionToken, Timestamp watermark) { return Mutation.newUpdateBuilder(metadataTableName) .set(COLUMN_PARTITION_TOKEN) - .to(partitionToken) + .to(composedPartitionToken) .set(COLUMN_WATERMARK) .to(watermark) .build(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFn.java index 4191f2d93594..d643707e9688 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFn.java @@ -18,7 +18,10 @@ package org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn; import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; import java.util.Optional; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants; import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.DaoFactory; import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao; import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.MapperFactory; @@ -66,12 +69,38 @@ public void processElement(OutputReceiver receiver) { daoFactory.getPartitionMetadataAdminDao().createPartitionMetadataTable(); createFakeParentPartition(); } - final PartitionMetadata initialPartition = - Optional.ofNullable(partitionMetadataDao.getPartition(InitialPartition.PARTITION_TOKEN)) - .map(mapperFactory.partitionMetadataMapper()::from) - .orElseThrow( - () -> new 
IllegalStateException("Initial partition not found in metadata table.")); - receiver.output(initialPartition); + if (daoFactory.getTvfNameList().isEmpty()) { + // For IMMUTABLE_KEY_RANGE change stream or MUTABLE_KEY_RANGE change stream without the + // specified tvf name list. + final PartitionMetadata initialPartition = + Optional.ofNullable( + partitionMetadataDao.getPartition( + PartitionMetadataDao.composePartitionTokenWithTvfName( + InitialPartition.PARTITION_TOKEN, + ChangeStreamsConstants.DEFAULT_TVF_NAME))) + .map(mapperFactory.partitionMetadataMapper()::from) + .orElseThrow( + () -> + new IllegalStateException("Initial partition not found in metadata table.")); + receiver.output(initialPartition); + } else { + // For MUTABLE_KEY_RANGE change stream with the specified tvf name list. + // We only need to output ONE initial partition to the next stage because + // DetectNewPartitionsDoFn will discover all fake parent partitions created in the same + // Spanner + // transaction with the same CreatedAt timestamp and schedule them properly in one batch. + String firstTvfName = daoFactory.getTvfNameList().get(0); + final PartitionMetadata initialPartition = + Optional.ofNullable( + partitionMetadataDao.getPartition( + PartitionMetadataDao.composePartitionTokenWithTvfName( + InitialPartition.PARTITION_TOKEN, firstTvfName))) + .map(mapperFactory.partitionMetadataMapper()::from) + .orElseThrow( + () -> + new IllegalStateException("Initial partition not found in metadata table.")); + receiver.output(initialPartition); + } } /** @@ -81,15 +110,37 @@ public void processElement(OutputReceiver receiver) { * specified in {@link InitializeDoFn#DEFAULT_HEARTBEAT_MILLIS}. 
*/ private void createFakeParentPartition() { - PartitionMetadata parentPartition = - PartitionMetadata.newBuilder() - .setPartitionToken(InitialPartition.PARTITION_TOKEN) - .setStartTimestamp(startTimestamp) - .setEndTimestamp(endTimestamp) - .setHeartbeatMillis(heartbeatMillis) - .setState(State.CREATED) - .setWatermark(startTimestamp) - .build(); - daoFactory.getPartitionMetadataDao().insert(parentPartition); + if (daoFactory.getTvfNameList().isEmpty()) { + // For IMMUTABLE_KEY_RANGE change stream or MUTABLE_KEY_RANGE + // change stream without the specified tvf name list. + PartitionMetadata parentPartition = + PartitionMetadata.newBuilder() + .setPartitionToken(InitialPartition.PARTITION_TOKEN) + .setTvfName(ChangeStreamsConstants.DEFAULT_TVF_NAME) + .setStartTimestamp(startTimestamp) + .setEndTimestamp(endTimestamp) + .setHeartbeatMillis(heartbeatMillis) + .setState(State.CREATED) + .setWatermark(startTimestamp) + .build(); + daoFactory.getPartitionMetadataDao().insert(parentPartition); + } else { + // For MUTABLE_KEY_RANGE change stream with the specified tvf name list. 
+ List parentPartitions = new ArrayList<>(); + for (String tvfName : daoFactory.getTvfNameList()) { + PartitionMetadata parentPartition = + PartitionMetadata.newBuilder() + .setPartitionToken(InitialPartition.PARTITION_TOKEN) + .setTvfName(tvfName) + .setStartTimestamp(startTimestamp) + .setEndTimestamp(endTimestamp) + .setHeartbeatMillis(heartbeatMillis) + .setState(State.CREATED) + .setWatermark(startTimestamp) + .build(); + parentPartitions.add(parentPartition); + } + daoFactory.getPartitionMetadataDao().insert(parentPartitions); + } } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java index 750865efbf02..de509257ad8f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java @@ -145,13 +145,16 @@ public ManualWatermarkEstimator newWatermarkEstimator( @GetInitialRestriction public TimestampRange initialRestriction(@Element PartitionMetadata partition) { final String token = partition.getPartitionToken(); + final String tvfName = partition.getTvfName(); final com.google.cloud.Timestamp startTimestamp = partition.getStartTimestamp(); // Range represents closed-open interval final com.google.cloud.Timestamp endTimestamp = TimestampUtils.next(partition.getEndTimestamp()); final com.google.cloud.Timestamp partitionScheduledAt = partition.getScheduledAt(); final com.google.cloud.Timestamp partitionRunningAt = - daoFactory.getPartitionMetadataDao().updateToRunning(token); + daoFactory + .getPartitionMetadataDao() + .updateToRunning(PartitionMetadataDao.composePartitionTokenWithTvfName(token, 
tvfName)); if (partitionScheduledAt != null && partitionRunningAt != null) { metrics.updatePartitionScheduledToRunning( diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/PartitionMetadataMapper.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/PartitionMetadataMapper.java index 8f3e14e7f185..e86da93aec06 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/PartitionMetadataMapper.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/PartitionMetadataMapper.java @@ -33,6 +33,7 @@ import com.google.cloud.spanner.Struct; import java.util.List; import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; @@ -79,7 +80,9 @@ public class PartitionMetadataMapper { */ public PartitionMetadata from(Struct row) { return PartitionMetadata.newBuilder() - .setPartitionToken(row.getString(COLUMN_PARTITION_TOKEN)) + .setPartitionToken( + PartitionMetadataDao.extractPartitionToken(row.getString(COLUMN_PARTITION_TOKEN))) + .setTvfName(PartitionMetadataDao.extractTvfName(row.getString(COLUMN_PARTITION_TOKEN))) .setParentTokens( !row.isNull(COLUMN_PARENT_TOKENS) ? 
Sets.newHashSet(row.getStringList(COLUMN_PARENT_TOKENS)) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/model/PartitionMetadata.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/model/PartitionMetadata.java index 1db605fc2506..67a481aa6e20 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/model/PartitionMetadata.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/model/PartitionMetadata.java @@ -93,6 +93,8 @@ public enum State { @AvroEncode(using = TimestampEncoding.class) private Timestamp finishedAt; + private String tvfName; + /** Default constructor for serialization only. */ private PartitionMetadata() {} @@ -107,7 +109,8 @@ public PartitionMetadata( Timestamp createdAt, @Nullable Timestamp scheduledAt, @Nullable Timestamp runningAt, - @Nullable Timestamp finishedAt) { + @Nullable Timestamp finishedAt, + String tvfName) { this.partitionToken = partitionToken; this.parentTokens = parentTokens; this.startTimestamp = startTimestamp; @@ -119,13 +122,19 @@ public PartitionMetadata( this.scheduledAt = scheduledAt; this.runningAt = runningAt; this.finishedAt = finishedAt; + this.tvfName = tvfName; } - /** Unique partition identifier, which can be used to perform a change stream query. */ + /** The partition token used to perform a change stream query. */ public String getPartitionToken() { return partitionToken; } + /** The table-valued function used to perform a change stream query. */ + public String getTvfName() { + return tvfName; + } + /** * The unique partition identifiers of the parent partitions where this child partition originated * from. 
@@ -212,7 +221,8 @@ public boolean equals(@Nullable Object o) { && Objects.equals(createdAt, that.createdAt) && Objects.equals(scheduledAt, that.scheduledAt) && Objects.equals(runningAt, that.runningAt) - && Objects.equals(finishedAt, that.finishedAt); + && Objects.equals(finishedAt, that.finishedAt) + && Objects.equals(tvfName, that.tvfName); } @Override @@ -228,7 +238,8 @@ public int hashCode() { createdAt, scheduledAt, runningAt, - finishedAt); + finishedAt, + tvfName); } @Override @@ -257,6 +268,8 @@ public String toString() { + runningAt + ", finishedAt=" + finishedAt + + ", tvfName=" + + tvfName + '}'; } @@ -279,6 +292,7 @@ public static class Builder { @Nullable private Timestamp scheduledAt; @Nullable private Timestamp runningAt; @Nullable private Timestamp finishedAt; + private String tvfName; public Builder() {} @@ -294,6 +308,7 @@ private Builder(PartitionMetadata partition) { this.scheduledAt = partition.scheduledAt; this.runningAt = partition.runningAt; this.finishedAt = partition.finishedAt; + this.tvfName = partition.tvfName; } /** Sets the unique partition identifier. */ @@ -362,6 +377,12 @@ public Builder setFinishedAt(@Nullable Timestamp finishedAt) { return this; } + /** Sets the TVF to be used for querying this partition. */ + public Builder setTvfName(String tvfName) { + this.tvfName = tvfName; + return this; + } + /** * Builds a {@link PartitionMetadata} from the given fields. 
Mandatory fields are: * @@ -394,7 +415,8 @@ public PartitionMetadata build() { createdAt, scheduledAt, runningAt, - finishedAt); + finishedAt, + tvfName); } } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadChangeStreamTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadChangeStreamTest.java index 589d831e1a45..f9529a154476 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadChangeStreamTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadChangeStreamTest.java @@ -230,6 +230,52 @@ public void testIsMutableChangeStream() { } } + /** Tests for TVF existence validation. */ + @RunWith(JUnit4.class) + public static class TvfExistenceTests { + + @Test + public void testCheckTvfExistence_Success() { + DatabaseClient databaseClient = mock(DatabaseClient.class); + ReadOnlyTransaction transaction = mock(ReadOnlyTransaction.class); + ResultSet resultSet = mock(ResultSet.class); + + when(databaseClient.readOnlyTransaction()).thenReturn(transaction); + when(transaction.executeQuery(any(Statement.class))).thenReturn(resultSet); + when(resultSet.next()).thenReturn(true).thenReturn(true).thenReturn(false); + when(resultSet.getString(0)).thenReturn("tvf1").thenReturn("tvf2"); + + SpannerIO.checkTvfExistence(databaseClient, Arrays.asList("tvf1", "tvf2")); + } + + @Test(expected = IllegalArgumentException.class) + public void testCheckTvfExistence_CaseInsensitive() { + DatabaseClient databaseClient = mock(DatabaseClient.class); + ReadOnlyTransaction transaction = mock(ReadOnlyTransaction.class); + ResultSet resultSet = mock(ResultSet.class); + + when(databaseClient.readOnlyTransaction()).thenReturn(transaction); + when(transaction.executeQuery(any(Statement.class))).thenReturn(resultSet); + when(resultSet.next()).thenReturn(true).thenReturn(false); 
+ when(resultSet.getString(0)).thenReturn("tvf1"); // DB returns lowercase, requested name is uppercase + + SpannerIO.checkTvfExistence(databaseClient, Arrays.asList("TVF1")); + } + + @Test(expected = IllegalArgumentException.class) + public void testCheckTvfExistence_NotFound() { + DatabaseClient databaseClient = mock(DatabaseClient.class); + ReadOnlyTransaction transaction = mock(ReadOnlyTransaction.class); + ResultSet resultSet = mock(ResultSet.class); + + when(databaseClient.readOnlyTransaction()).thenReturn(transaction); + when(transaction.executeQuery(any(Statement.class))).thenReturn(resultSet); + when(resultSet.next()).thenReturn(false); + + SpannerIO.checkTvfExistence(databaseClient, Arrays.asList("missing_tvf")); + } + } + /** Tests for error handling and exceptions. */ + @RunWith(JUnit4.class) + public static class ErrorTests { diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangeStreamErrorTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangeStreamErrorTest.java index c8435bcfdffd..f7f2eca60bbc 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangeStreamErrorTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangeStreamErrorTest.java @@ -593,13 +593,13 @@ private void mockTableExists() { private ResultSet mockchangePartitionState( Timestamp startTimestamp, Timestamp after3Seconds, String state) { - List tokens = new ArrayList<>(); - tokens.add("Parent0"); + List composedPartitionTokens = new ArrayList<>(); + composedPartitionTokens.add("Parent0"); Statement getPartitionStatement = Statement.newBuilder( - "SELECT * FROM my-metadata-table WHERE PartitionToken IN UNNEST(@partitionTokens) AND State = @state") - .bind("partitionTokens") - .toStringArray(tokens) + "SELECT * FROM my-metadata-table WHERE 
PartitionToken IN UNNEST(@composedPartitionTokens) AND State = @state") + .bind("composedPartitionTokens") + .toStringArray(composedPartitionTokens) .bind("state") .to(state) .build(); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordActionTest.java index 38e3b7fdfa0a..c99ee743b6a8 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordActionTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordActionTest.java @@ -218,7 +218,7 @@ public void testRestrictionClaimedAndIsMergeCaseAndChildExists() { assertEquals(Optional.empty(), maybeContinuation); verify(watermarkEstimator).setWatermark(new Instant(startTimestamp.toSqlTimestamp().getTime())); - verify(transaction, never()).insert(any()); + verify(transaction, never()).insert(any(PartitionMetadata.class)); } @Test @@ -242,7 +242,7 @@ public void testRestrictionNotClaimed() { assertEquals(Optional.of(ProcessContinuation.stop()), maybeContinuation); verify(watermarkEstimator, never()).setWatermark(any()); - verify(dao, never()).insert(any()); + verify(dao, never()).insert(any(PartitionMetadata.class)); } @Test @@ -267,6 +267,6 @@ public void testSoftDeadlineReached() { assertEquals(Optional.of(ProcessContinuation.resume()), maybeContinuation); verify(watermarkEstimator, never()).setWatermark(any()); - verify(dao, never()).insert(any()); + verify(dao, never()).insert(any(PartitionMetadata.class)); } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/PartitionStartRecordActionTest.java 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/PartitionStartRecordActionTest.java index b37f3d6b198f..32b73393da70 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/PartitionStartRecordActionTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/PartitionStartRecordActionTest.java @@ -115,7 +115,7 @@ public void testRestrictionNotClaimed() { assertEquals(Optional.of(ProcessContinuation.stop()), maybeContinuation); verify(watermarkEstimator, never()).setWatermark(any()); - verify(dao, never()).insert(any()); + verify(dao, never()).insert(any(PartitionMetadata.class)); } @Test @@ -135,6 +135,6 @@ public void testSoftDeadlineReached() { assertEquals(Optional.of(ProcessContinuation.resume()), maybeContinuation); verify(watermarkEstimator, never()).setWatermark(any()); - verify(dao, never()).insert(any()); + verify(dao, never()).insert(any(PartitionMetadata.class)); } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java index 7c5d6d0f1870..17ece3da2877 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java @@ -66,6 +66,7 @@ public class QueryChangeStreamActionTest { private static final String PARTITION_TOKEN = "partitionToken"; + private static final String TVF_NAME = ""; private static final Timestamp PARTITION_START_TIMESTAMP = Timestamp.ofTimeMicroseconds(10L); private static final Timestamp 
RECORD_TIMESTAMP = Timestamp.ofTimeMicroseconds(20L); private static final Timestamp PARTITION_END_TIMESTAMP = Timestamp.ofTimeMicroseconds(30L); @@ -126,6 +127,7 @@ public void setUp() throws Exception { partition = PartitionMetadata.newBuilder() .setPartitionToken(PARTITION_TOKEN) + .setTvfName(TVF_NAME) .setParentTokens(Sets.newHashSet("parentToken")) .setStartTimestamp(PARTITION_START_TIMESTAMP) .setEndTimestamp(PARTITION_END_TIMESTAMP) @@ -143,7 +145,9 @@ public void setUp() throws Exception { when(restrictionTracker.currentRestriction()).thenReturn(restriction); when(restriction.getFrom()).thenReturn(PARTITION_START_TIMESTAMP); when(restriction.getTo()).thenReturn(PARTITION_END_TIMESTAMP); - when(partitionMetadataDao.getPartition(PARTITION_TOKEN)).thenReturn(row); + when(partitionMetadataDao.getPartition( + PartitionMetadataDao.composePartitionTokenWithTvfName(PARTITION_TOKEN, TVF_NAME))) + .thenReturn(row); when(partitionMetadataMapper.from(row)).thenReturn(partition); } @@ -157,6 +161,7 @@ void setupUnboundedPartition() { .setHeartbeatMillis(PARTITION_HEARTBEAT_MILLIS) .setState(SCHEDULED) .setWatermark(WATERMARK_TIMESTAMP) + .setTvfName(TVF_NAME) .setScheduledAt(Timestamp.now()) .build(); when(partitionMetadataMapper.from(any())).thenReturn(partition); @@ -175,6 +180,7 @@ public void testQueryChangeStreamWithDataChangeRecord() { when(record2.getRecordTimestamp()).thenReturn(RECORD_TIMESTAMP); when(changeStreamDao.changeStreamQuery( PARTITION_TOKEN, + TVF_NAME, PARTITION_START_TIMESTAMP, PARTITION_END_TIMESTAMP, PARTITION_HEARTBEAT_MILLIS)) @@ -223,7 +229,10 @@ public void testQueryChangeStreamWithDataChangeRecord() { any(RestrictionInterrupter.class), eq(outputReceiver), eq(watermarkEstimator)); - verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP); + verify(partitionMetadataDao) + .updateWatermark( + PartitionMetadataDao.composePartitionTokenWithTvfName(PARTITION_TOKEN, TVF_NAME), + WATERMARK_TIMESTAMP); 
verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any(), any()); verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any(), any()); @@ -245,6 +254,7 @@ public void testQueryChangeStreamWithHeartbeatRecord() { when(record2.getRecordTimestamp()).thenReturn(RECORD_TIMESTAMP); when(changeStreamDao.changeStreamQuery( PARTITION_TOKEN, + TVF_NAME, PARTITION_START_TIMESTAMP, PARTITION_END_TIMESTAMP, PARTITION_HEARTBEAT_MILLIS)) @@ -315,6 +325,7 @@ public void testQueryChangeStreamWithHeartbeatRecordAndCancelOnHeartbeat() { when(record2.getRecordTimestamp()).thenReturn(PARTITION_END_TIMESTAMP); when(changeStreamDao.changeStreamQuery( PARTITION_TOKEN, + TVF_NAME, PARTITION_START_TIMESTAMP, PARTITION_END_TIMESTAMP, PARTITION_HEARTBEAT_MILLIS)) @@ -379,6 +390,7 @@ public void testQueryChangeStreamWithChildPartitionsRecord() { when(record2.getRecordTimestamp()).thenReturn(PARTITION_START_TIMESTAMP); when(changeStreamDao.changeStreamQuery( PARTITION_TOKEN, + TVF_NAME, PARTITION_START_TIMESTAMP, PARTITION_END_TIMESTAMP, PARTITION_HEARTBEAT_MILLIS)) @@ -423,7 +435,10 @@ public void testQueryChangeStreamWithChildPartitionsRecord() { eq(restrictionTracker), any(RestrictionInterrupter.class), eq(watermarkEstimator)); - verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP); + verify(partitionMetadataDao) + .updateWatermark( + PartitionMetadataDao.composePartitionTokenWithTvfName(PARTITION_TOKEN, TVF_NAME), + WATERMARK_TIMESTAMP); verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any()); verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any(), any()); @@ -449,6 +464,7 @@ public void testQueryChangeStreamWithRestrictionFromAfterPartitionStart() { when(record2.getRecordTimestamp()).thenReturn(Timestamp.ofTimeMicroseconds(25L)); when(changeStreamDao.changeStreamQuery( PARTITION_TOKEN, + TVF_NAME, Timestamp.ofTimeMicroseconds(15L), PARTITION_END_TIMESTAMP, 
PARTITION_HEARTBEAT_MILLIS)) @@ -486,7 +502,10 @@ public void testQueryChangeStreamWithRestrictionFromAfterPartitionStart() { eq(restrictionTracker), any(RestrictionInterrupter.class), eq(watermarkEstimator)); - verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP); + verify(partitionMetadataDao) + .updateWatermark( + PartitionMetadataDao.composePartitionTokenWithTvfName(PARTITION_TOKEN, TVF_NAME), + WATERMARK_TIMESTAMP); verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any()); verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any(), any()); @@ -505,6 +524,7 @@ public void testQueryChangeStreamWithPartitionStartRecord() { when(record1.getRecordTimestamp()).thenReturn(PARTITION_START_TIMESTAMP); when(changeStreamDao.changeStreamQuery( PARTITION_TOKEN, + TVF_NAME, PARTITION_START_TIMESTAMP, PARTITION_END_TIMESTAMP, PARTITION_HEARTBEAT_MILLIS)) @@ -534,7 +554,10 @@ public void testQueryChangeStreamWithPartitionStartRecord() { eq(restrictionTracker), any(RestrictionInterrupter.class), eq(watermarkEstimator)); - verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP); + verify(partitionMetadataDao) + .updateWatermark( + PartitionMetadataDao.composePartitionTokenWithTvfName(PARTITION_TOKEN, TVF_NAME), + WATERMARK_TIMESTAMP); verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any()); verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any(), any()); @@ -555,6 +578,7 @@ public void testQueryChangeStreamWithRestrictionFromAfterPartitionStartForPartit when(record1.getRecordTimestamp()).thenReturn(Timestamp.ofTimeMicroseconds(15L)); when(changeStreamDao.changeStreamQuery( PARTITION_TOKEN, + TVF_NAME, Timestamp.ofTimeMicroseconds(15L), PARTITION_END_TIMESTAMP, PARTITION_HEARTBEAT_MILLIS)) @@ -584,7 +608,10 @@ public void testQueryChangeStreamWithRestrictionFromAfterPartitionStartForPartit eq(restrictionTracker), 
any(RestrictionInterrupter.class), eq(watermarkEstimator)); - verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP); + verify(partitionMetadataDao) + .updateWatermark( + PartitionMetadataDao.composePartitionTokenWithTvfName(PARTITION_TOKEN, TVF_NAME), + WATERMARK_TIMESTAMP); verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any()); verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any(), any()); @@ -601,6 +628,7 @@ public void testQueryChangeStreamWithPartitionEndRecordBoundedRestriction() { when(record1.getRecordTimestamp()).thenReturn(RECORD_TIMESTAMP); when(changeStreamDao.changeStreamQuery( PARTITION_TOKEN, + TVF_NAME, PARTITION_START_TIMESTAMP, PARTITION_END_TIMESTAMP, PARTITION_HEARTBEAT_MILLIS)) @@ -653,6 +681,7 @@ public void testQueryChangeStreamWithPartitionEndRecordUnboundedRestriction() { final ArgumentCaptor timestampCaptor = ArgumentCaptor.forClass(Timestamp.class); when(changeStreamDao.changeStreamQuery( eq(PARTITION_TOKEN), + eq(TVF_NAME), eq(PARTITION_START_TIMESTAMP), timestampCaptor.capture(), eq(PARTITION_HEARTBEAT_MILLIS))) @@ -703,6 +732,7 @@ public void testQueryChangeStreamWithPartitionEventRecord() { when(record1.getRecordTimestamp()).thenReturn(PARTITION_START_TIMESTAMP); when(changeStreamDao.changeStreamQuery( PARTITION_TOKEN, + TVF_NAME, PARTITION_START_TIMESTAMP, PARTITION_END_TIMESTAMP, PARTITION_HEARTBEAT_MILLIS)) @@ -732,7 +762,10 @@ public void testQueryChangeStreamWithPartitionEventRecord() { eq(restrictionTracker), any(RestrictionInterrupter.class), eq(watermarkEstimator)); - verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP); + verify(partitionMetadataDao) + .updateWatermark( + PartitionMetadataDao.composePartitionTokenWithTvfName(PARTITION_TOKEN, TVF_NAME), + WATERMARK_TIMESTAMP); verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any()); verify(heartbeatRecordAction, never()).run(any(), 
any(), any(), any(), any(), any()); @@ -747,6 +780,7 @@ public void testQueryChangeStreamWithStreamFinished() { final ChangeStreamResultSet changeStreamResultSet = mock(ChangeStreamResultSet.class); when(changeStreamDao.changeStreamQuery( PARTITION_TOKEN, + TVF_NAME, PARTITION_START_TIMESTAMP, PARTITION_END_TIMESTAMP, PARTITION_HEARTBEAT_MILLIS)) @@ -782,6 +816,7 @@ public void testQueryChangeStreamFinishedWithResume() { .setHeartbeatMillis(PARTITION_HEARTBEAT_MILLIS) .setState(SCHEDULED) .setWatermark(WATERMARK_TIMESTAMP) + .setTvfName(TVF_NAME) .setScheduledAt(Timestamp.now()) .build(); when(partitionMetadataMapper.from(any())).thenReturn(partition); @@ -790,6 +825,7 @@ public void testQueryChangeStreamFinishedWithResume() { final ArgumentCaptor timestampCaptor = ArgumentCaptor.forClass(Timestamp.class); when(changeStreamDao.changeStreamQuery( eq(PARTITION_TOKEN), + eq(TVF_NAME), eq(PARTITION_START_TIMESTAMP), timestampCaptor.capture(), eq(PARTITION_HEARTBEAT_MILLIS))) @@ -826,6 +862,7 @@ public void testQueryChangeStreamWithOutOfRangeErrorOnUnboundedPartition() { final ArgumentCaptor timestampCaptor = ArgumentCaptor.forClass(Timestamp.class); when(changeStreamDao.changeStreamQuery( eq(PARTITION_TOKEN), + eq(TVF_NAME), eq(PARTITION_START_TIMESTAMP), timestampCaptor.capture(), eq(PARTITION_HEARTBEAT_MILLIS))) @@ -860,6 +897,7 @@ public void testQueryChangeStreamWithOutOfRangeErrorOnUnboundedPartition() { public void testQueryChangeStreamWithOutOfRangeErrorOnBoundedPartition() { when(changeStreamDao.changeStreamQuery( eq(PARTITION_TOKEN), + eq(TVF_NAME), eq(PARTITION_START_TIMESTAMP), eq(PARTITION_END_TIMESTAMP), eq(PARTITION_HEARTBEAT_MILLIS))) @@ -896,6 +934,7 @@ public void testQueryChangeStreamWithChildPartitionsRecordBoundedRestriction() { when(record1.getRecordTimestamp()).thenReturn(RECORD_TIMESTAMP); when(changeStreamDao.changeStreamQuery( PARTITION_TOKEN, + TVF_NAME, PARTITION_START_TIMESTAMP, PARTITION_END_TIMESTAMP, PARTITION_HEARTBEAT_MILLIS)) @@ -948,6 
+987,7 @@ public void testQueryChangeStreamWithChildPartitionsRecordUnboundedRestriction() final ArgumentCaptor timestampCaptor = ArgumentCaptor.forClass(Timestamp.class); when(changeStreamDao.changeStreamQuery( eq(PARTITION_TOKEN), + eq(TVF_NAME), eq(PARTITION_START_TIMESTAMP), timestampCaptor.capture(), eq(PARTITION_HEARTBEAT_MILLIS))) @@ -1020,8 +1060,11 @@ public void testQueryChangeStreamWithMutableChangeStreamCappedEndTimestamp() { final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class); final ArgumentCaptor timestampCaptor = ArgumentCaptor.forClass(Timestamp.class); when(changeStreamDao.changeStreamQuery( - eq(PARTITION_TOKEN), eq(PARTITION_START_TIMESTAMP), - timestampCaptor.capture(), eq(PARTITION_HEARTBEAT_MILLIS))) + eq(PARTITION_TOKEN), + eq(TVF_NAME), + eq(PARTITION_START_TIMESTAMP), + timestampCaptor.capture(), + eq(PARTITION_HEARTBEAT_MILLIS))) .thenReturn(resultSet); when(resultSet.next()).thenReturn(false); // Query finishes (reaches cap) when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK); @@ -1068,8 +1111,11 @@ public void testQueryChangeStreamWithMutableChangeStreamUncappedEndTimestamp() { final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class); final ArgumentCaptor timestampCaptor = ArgumentCaptor.forClass(Timestamp.class); when(changeStreamDao.changeStreamQuery( - eq(PARTITION_TOKEN), eq(PARTITION_START_TIMESTAMP), - timestampCaptor.capture(), eq(PARTITION_HEARTBEAT_MILLIS))) + eq(PARTITION_TOKEN), + eq(TVF_NAME), + eq(PARTITION_START_TIMESTAMP), + timestampCaptor.capture(), + eq(PARTITION_HEARTBEAT_MILLIS))) .thenReturn(resultSet); when(resultSet.next()).thenReturn(false); when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK); @@ -1092,7 +1138,8 @@ public void testQueryChangeStreamUnboundedResumesCorrectly() { setupUnboundedPartition(); final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class); - when(changeStreamDao.changeStreamQuery(any(), any(), any(), 
anyLong())).thenReturn(resultSet); + when(changeStreamDao.changeStreamQuery(any(), any(), any(), any(), anyLong())) + .thenReturn(resultSet); when(resultSet.next()).thenReturn(false); when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK); when(restrictionTracker.tryClaim(any(Timestamp.class))).thenReturn(true); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamDaoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamDaoTest.java index 9bdaa7b9fa5a..1f0e05ce6861 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamDaoTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamDaoTest.java @@ -57,7 +57,7 @@ public void testChangeStreamQueryPostgresMutable() { CHANGE_STREAM_NAME, databaseClient, rpcPriority, "testjob", Dialect.POSTGRESQL, true); // Act: call the method that constructs and executes the statement - changeStreamDao.changeStreamQuery(null, null, null, 0L); + changeStreamDao.changeStreamQuery(null, null, null, null, 0L); // Assert: capture the Statement passed to singleUse().executeQuery and verify // SQL @@ -84,7 +84,7 @@ public void testChangeStreamQueryPostgresImmutable() { CHANGE_STREAM_NAME, databaseClient, rpcPriority, "testjob", Dialect.POSTGRESQL, false); // Act - changeStreamDao.changeStreamQuery(null, null, null, 0L); + changeStreamDao.changeStreamQuery(null, null, null, null, 0L); // Assert ArgumentCaptor captor = ArgumentCaptor.forClass(Statement.class); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFnTest.java index 
c3bee10f8e14..b1256f7916e8 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFnTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFnTest.java @@ -28,6 +28,7 @@ import com.google.cloud.Timestamp; import com.google.cloud.spanner.Struct; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants; import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.DaoFactory; import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao; import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao; @@ -72,16 +73,22 @@ public void testInitialize() { when(partitionMetadataDao.tableExists()).thenReturn(false); when(daoFactory.getPartitionMetadataAdminDao()).thenReturn(partitionMetadataAdminDao); doNothing().when(partitionMetadataAdminDao).createPartitionMetadataTable(); - when(partitionMetadataDao.insert(any())).thenReturn(Timestamp.ofTimeMicroseconds(1L)); - when(partitionMetadataDao.getPartition(InitialPartition.PARTITION_TOKEN)) + when(partitionMetadataDao.insert(any(PartitionMetadata.class))) + .thenReturn(Timestamp.ofTimeMicroseconds(1L)); + when(partitionMetadataDao.getPartition( + PartitionMetadataDao.composePartitionTokenWithTvfName( + InitialPartition.PARTITION_TOKEN, ChangeStreamsConstants.DEFAULT_TVF_NAME))) .thenReturn(Struct.newBuilder().build()); when(mapperFactory.partitionMetadataMapper()).thenReturn(partitionMetadataMapper); when(partitionMetadataMapper.from(any())).thenReturn(mock(PartitionMetadata.class)); initializeDoFn.processElement(receiver); verify(daoFactory, times(2)).getPartitionMetadataDao(); verify(daoFactory, times(1)).getPartitionMetadataAdminDao(); - verify(partitionMetadataDao, times(1)).insert(any()); - verify(partitionMetadataDao, times(1)).getPartition(InitialPartition.PARTITION_TOKEN); + verify(partitionMetadataDao, 
times(1)).insert(any(PartitionMetadata.class)); + verify(partitionMetadataDao, times(1)) + .getPartition( + PartitionMetadataDao.composePartitionTokenWithTvfName( + InitialPartition.PARTITION_TOKEN, ChangeStreamsConstants.DEFAULT_TVF_NAME)); verify(partitionMetadataDao, times(1)).tableExists(); verify(mapperFactory, times(1)).partitionMetadataMapper(); verify(partitionMetadataMapper, times(1)).from(any()); @@ -93,7 +100,8 @@ public void testInitializeWithNoPartition() { when(partitionMetadataDao.tableExists()).thenReturn(false); when(daoFactory.getPartitionMetadataAdminDao()).thenReturn(partitionMetadataAdminDao); doNothing().when(partitionMetadataAdminDao).createPartitionMetadataTable(); - when(partitionMetadataDao.insert(any())).thenReturn(Timestamp.ofTimeMicroseconds(1L)); + when(partitionMetadataDao.insert(any(PartitionMetadata.class))) + .thenReturn(Timestamp.ofTimeMicroseconds(1L)); when(mapperFactory.partitionMetadataMapper()).thenReturn(partitionMetadataMapper); when(partitionMetadataMapper.from(any())).thenReturn(mock(PartitionMetadata.class)); try { diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/IntegrationTestEnv.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/IntegrationTestEnv.java index d7e7e48a4b6a..feb425309f10 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/IntegrationTestEnv.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/IntegrationTestEnv.java @@ -245,8 +245,7 @@ String createChangeStreamFor(String tableName) .updateDatabaseDdl( instanceId, databaseId, - Collections.singletonList( - "CREATE CHANGE STREAM \"" + changeStreamName + "\" FOR \"" + tableName + "\""), + Collections.singletonList(createPostgresChangeStreamDDL(changeStreamName, tableName)), null) .get(TIMEOUT_MINUTES, 
TimeUnit.MINUTES); } else { @@ -276,6 +275,21 @@ String createGSQLChangeStreamDDL(String changeStreamName, String tableName) { return "CREATE CHANGE STREAM " + changeStreamName + " FOR " + tableName; } + String createPostgresChangeStreamDDL(String changeStreamName, String tableName) { + if (this.isPlacementTableBasedChangeStream) { + // Create a MUTABLE_KEY_RANGE change stream. + String statement = + "CREATE CHANGE STREAM \"" + + changeStreamName + + "\" FOR \"" + + tableName + + "\"" + + " WITH (partition_mode = 'MUTABLE_KEY_RANGE')"; + return statement; + } + return "CREATE CHANGE STREAM \"" + changeStreamName + "\" FOR \"" + tableName + "\""; + } + void createRoleAndGrantPrivileges(String table, String changeStream) throws InterruptedException, ExecutionException, TimeoutException { if (this.isPostgres) { diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamPlacementTableIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamPlacementTableIT.java index 008328870223..390741e1c313 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamPlacementTableIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamPlacementTableIT.java @@ -33,6 +33,7 @@ import com.google.cloud.spanner.Statement; import com.google.gson.Gson; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -108,12 +109,18 @@ public void before() { @Test public void testReadSpannerChangeStream() { - testReadSpannerChangeStreamImpl(pipeline, null); + testReadSpannerChangeStreamImpl(pipeline, null, null); + } + + @Test + public void testReadSpannerChangeStreamWithTvfNameList() { + testReadSpannerChangeStreamImpl( + pipeline, 
null, Collections.singletonList("READ_" + changeStreamName)); } @Test public void testReadSpannerChangeStreamWithAuthorizedRole() { - testReadSpannerChangeStreamImpl(pipeline, ENV.getDatabaseRole()); + testReadSpannerChangeStreamImpl(pipeline, ENV.getDatabaseRole(), null); } @Test @@ -121,10 +128,12 @@ public void testReadSpannerChangeStreamWithUnauthorizedRole() { assumeTrue(pipeline.getOptions().getRunner() == DirectRunner.class); exception.expect(SpannerException.class); exception.expectMessage("Role not found: bad_role."); - testReadSpannerChangeStreamImpl(pipeline.enableAbandonedNodeEnforcement(false), "bad_role"); + testReadSpannerChangeStreamImpl( + pipeline.enableAbandonedNodeEnforcement(false), "bad_role", null); } - public void testReadSpannerChangeStreamImpl(TestPipeline testPipeline, String role) { + public void testReadSpannerChangeStreamImpl( + TestPipeline testPipeline, String role, List tvfNameList) { // Defines how many rows are going to be inserted / updated / deleted in the test final int numRows = 5; // Inserts numRows rows and uses the first commit timestamp as the startAt for reading the @@ -147,17 +156,21 @@ public void testReadSpannerChangeStreamImpl(TestPipeline testPipeline, String ro spannerConfig = spannerConfig.withDatabaseRole(StaticValueProvider.of(role)); } + SpannerIO.ReadChangeStream readChangeStream = + SpannerIO.readChangeStream() + .withSpannerConfig(spannerConfig) + .withChangeStreamName(changeStreamName) + .withMetadataDatabase(ENV.getMetadataDatabaseId()) + .withMetadataTable(metadataTableName) + .withInclusiveStartAt(startAt) + .withInclusiveEndAt(endAt); + + if (tvfNameList != null) { + readChangeStream = readChangeStream.withTvfNameList(tvfNameList); + } + final PCollection tokens = - testPipeline - .apply( - SpannerIO.readChangeStream() - .withSpannerConfig(spannerConfig) - .withChangeStreamName(changeStreamName) - .withMetadataDatabase(ENV.getMetadataDatabaseId()) - .withMetadataTable(metadataTableName) - 
.withInclusiveStartAt(startAt) - .withInclusiveEndAt(endAt)) - .apply(ParDo.of(new ModsToString())); + testPipeline.apply(readChangeStream).apply(ParDo.of(new ModsToString())); // Each row is composed by the following data // diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamPlacementTablePostgresIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamPlacementTablePostgresIT.java index 573ac8259101..f53ba9382655 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamPlacementTablePostgresIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamPlacementTablePostgresIT.java @@ -32,6 +32,7 @@ import com.google.cloud.spanner.Statement; import com.google.gson.Gson; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Optional; import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig; @@ -99,6 +100,16 @@ public void before() { @Test public void testReadSpannerChangeStream() { + testReadSpannerChangeStreamImpl(null); + } + + @Test + public void testReadSpannerChangeStreamWithTvfNameList() { + testReadSpannerChangeStreamImpl( + Collections.singletonList("read_proto_bytes_" + "'" + changeStreamName + "'")); + } + + private void testReadSpannerChangeStreamImpl(List tvfNameList) { // Defines how many rows are going to be inserted / updated / deleted in the test final int numRows = 5; // Inserts numRows rows and uses the first commit timestamp as the startAt for reading the @@ -119,17 +130,21 @@ public void testReadSpannerChangeStream() { .withDatabaseId(databaseId) .withHost(ValueProvider.StaticValueProvider.of(host)); + SpannerIO.ReadChangeStream readChangeStream = + SpannerIO.readChangeStream() + 
.withSpannerConfig(spannerConfig) + .withChangeStreamName(changeStreamName) + .withMetadataDatabase(databaseId) + .withMetadataTable(metadataTableName) + .withInclusiveStartAt(startAt) + .withInclusiveEndAt(endAt); + + if (tvfNameList != null) { + readChangeStream = readChangeStream.withTvfNameList(tvfNameList); + } + final PCollection tokens = - pipeline - .apply( - SpannerIO.readChangeStream() - .withSpannerConfig(spannerConfig) - .withChangeStreamName(changeStreamName) - .withMetadataDatabase(databaseId) - .withMetadataTable(metadataTableName) - .withInclusiveStartAt(startAt) - .withInclusiveEndAt(endAt)) - .apply(ParDo.of(new ModsToString())); + pipeline.apply(readChangeStream).apply(ParDo.of(new ModsToString())); // Each row is composed by the following data // diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/PartitionMetadataMapperTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/PartitionMetadataMapperTest.java index 717f436b5998..5214138b7736 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/PartitionMetadataMapperTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/PartitionMetadataMapperTest.java @@ -33,6 +33,7 @@ import com.google.cloud.Timestamp; import com.google.cloud.spanner.Struct; import java.util.Collections; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; @@ -80,7 +81,7 @@ public void testMapPartitionMetadataFromResultSet() { assertEquals( new PartitionMetadata( - "token", + 
PartitionMetadataDao.extractPartitionToken("token"), Sets.newHashSet("parentToken"), Timestamp.ofTimeMicroseconds(10L), Timestamp.ofTimeMicroseconds(20L), @@ -90,7 +91,8 @@ public void testMapPartitionMetadataFromResultSet() { Timestamp.ofTimeMicroseconds(40), Timestamp.ofTimeMicroseconds(50), Timestamp.ofTimeMicroseconds(60), - Timestamp.ofTimeMicroseconds(70)), + Timestamp.ofTimeMicroseconds(70), + PartitionMetadataDao.extractTvfName("token")), partition); } @@ -126,7 +128,7 @@ public void testMapPartitionMetadataFromResultSetWithNulls() { assertEquals( new PartitionMetadata( - "token", + PartitionMetadataDao.extractPartitionToken("token"), Sets.newHashSet("parentToken"), Timestamp.ofTimeMicroseconds(10L), Timestamp.ofTimeMicroseconds(20L), @@ -136,7 +138,8 @@ public void testMapPartitionMetadataFromResultSetWithNulls() { Timestamp.ofTimeMicroseconds(40), null, null, - null), + null, + PartitionMetadataDao.extractTvfName("token")), partition); } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/model/ModelEncodingTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/model/ModelEncodingTest.java index 37612c7d2e93..403dcdc19352 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/model/ModelEncodingTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/model/ModelEncodingTest.java @@ -160,7 +160,8 @@ public void testPartitionMetadataCanBeEncoded() throws IOException { Timestamp.now(), Timestamp.now(), Timestamp.now(), - Timestamp.now()); + Timestamp.now(), + ""); assertEquals(partitionMetadata, encodeAndDecode(partitionMetadata)); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/model/PartitionMetadataTest.java 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/model/PartitionMetadataTest.java index 695c4c30d97c..5d8604d00af8 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/model/PartitionMetadataTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/model/PartitionMetadataTest.java @@ -40,6 +40,7 @@ public class PartitionMetadataTest { private static final Timestamp SCHEDULED_AT = Timestamp.ofTimeSecondsAndNanos(5, 5); private static final Timestamp RUNNING_AT = Timestamp.ofTimeSecondsAndNanos(6, 6); private static final Timestamp FINISHED_AT = Timestamp.ofTimeSecondsAndNanos(7, 7); + private static final String TVF_NAME = ""; @Test public void testBuilderDefaultsToInclusiveStartAndExclusiveEnd() { @@ -55,7 +56,8 @@ public void testBuilderDefaultsToInclusiveStartAndExclusiveEnd() { CREATED_AT, SCHEDULED_AT, RUNNING_AT, - FINISHED_AT); + FINISHED_AT, + TVF_NAME); PartitionMetadata actualPartitionMetadata = PartitionMetadata.newBuilder() .setPartitionToken(PARTITION_TOKEN) @@ -69,6 +71,7 @@ public void testBuilderDefaultsToInclusiveStartAndExclusiveEnd() { .setScheduledAt(SCHEDULED_AT) .setRunningAt(RUNNING_AT) .setFinishedAt(FINISHED_AT) + .setTvfName(TVF_NAME) .build(); assertEquals(expectedPartitionMetadata.hashCode(), actualPartitionMetadata.hashCode()); @@ -90,7 +93,8 @@ public void testBuilderDefaultsToCommitTimestampWhenCreatedAtIsNotGiven() { Value.COMMIT_TIMESTAMP, SCHEDULED_AT, RUNNING_AT, - FINISHED_AT); + FINISHED_AT, + TVF_NAME); PartitionMetadata actualPartitionMetadata = PartitionMetadata.newBuilder() .setPartitionToken(PARTITION_TOKEN) @@ -103,6 +107,7 @@ public void testBuilderDefaultsToCommitTimestampWhenCreatedAtIsNotGiven() { .setScheduledAt(SCHEDULED_AT) .setRunningAt(RUNNING_AT) .setFinishedAt(FINISHED_AT) + .setTvfName(TVF_NAME) .build(); assertEquals(expectedPartitionMetadata, 
actualPartitionMetadata); } @@ -206,7 +211,8 @@ public void testGetters() { CREATED_AT, SCHEDULED_AT, RUNNING_AT, - FINISHED_AT); + FINISHED_AT, + TVF_NAME); assertEquals(PARTITION_TOKEN, partitionMetadata.getPartitionToken()); assertEquals(1, partitionMetadata.getParentTokens().size()); @@ -220,5 +226,6 @@ public void testGetters() { assertEquals(SCHEDULED_AT, partitionMetadata.getScheduledAt()); assertEquals(RUNNING_AT, partitionMetadata.getRunningAt()); assertEquals(FINISHED_AT, partitionMetadata.getFinishedAt()); + assertEquals(TVF_NAME, partitionMetadata.getTvfName()); } }