diff --git a/internal-api/src/main/java/datadog/trace/api/datastreams/DataStreamsTags.java b/internal-api/src/main/java/datadog/trace/api/datastreams/DataStreamsTags.java index 5c992f99c2e..df3eb214813 100644 --- a/internal-api/src/main/java/datadog/trace/api/datastreams/DataStreamsTags.java +++ b/internal-api/src/main/java/datadog/trace/api/datastreams/DataStreamsTags.java @@ -334,7 +334,7 @@ public DataStreamsTags( } } - // hashable tags are 0-4 + // hashable tags are 0-6: bus, direction, exchange, topic, type, subscription, kafkaClusterId for (int i = 0; i < 7; i++) { String tag = this.tagByIndex(i); if (tag != null) { @@ -343,9 +343,9 @@ public DataStreamsTags( } } - // aggregation tags are 5-7 + // aggregation tags are 7-11: datasetName, datasetNamespace, isManual, group, consumerGroup this.aggregationHash = this.hash; - for (int i = 7; i < 10; i++) { + for (int i = 7; i < 12; i++) { String tag = this.tagByIndex(i); if (tag != null) { this.nonNullSize++; @@ -354,9 +354,9 @@ public DataStreamsTags( } } - // the rest are values + // values are 12-13: partition, hasRoutingKey this.completeHash = aggregationHash; - for (int i = 10; i < this.size(); i++) { + for (int i = 12; i < this.size(); i++) { String tag = this.tagByIndex(i); if (tag != null) { this.nonNullSize++; @@ -370,6 +370,8 @@ public int size() { return 14; } + // WARNING: DO NOT REORDER! Indices 0-6 are hash tags, 7-11 are aggregation tags, 12-13 are + // values. Reordering breaks hashing! public String tagByIndex(int index) { switch (index) { case 0: @@ -385,21 +387,21 @@ public String tagByIndex(int index) { case 5: return this.subscription; case 6: - return this.datasetName; + return this.kafkaClusterId; case 7: - return this.datasetNamespace; + return this.datasetName; case 8: - return this.isManual; + return this.datasetNamespace; case 9: - return this.group; + return this.isManual; case 10: - return this.consumerGroup; + return this.group; case 11: - return this.hasRoutingKey; + return this.consumerGroup; case 12: - return this.kafkaClusterId; - case 13: return this.partition; + case 13: + return this.hasRoutingKey; default: return null; } diff --git a/internal-api/src/test/groovy/datadog/trace/api/datastreams/DataStreamsTagsTest.groovy b/internal-api/src/test/groovy/datadog/trace/api/datastreams/DataStreamsTagsTest.groovy index 620c3ebabf4..aa26cb9ebbb 100644 --- a/internal-api/src/test/groovy/datadog/trace/api/datastreams/DataStreamsTagsTest.groovy +++ b/internal-api/src/test/groovy/datadog/trace/api/datastreams/DataStreamsTagsTest.groovy @@ -119,4 +119,139 @@ class DataStreamsTagsTest extends Specification { five.hasAllTags("type:type", "direction:out", "topic:topic", "ds.name:dataset", "ds.namespace:namespace") six.hasAllTags("type:type", "direction:in", "subscription:subscription") } + + def 'test tagByIndex returns tags in correct order'() { + setup: + def tags = getTags(0) + + expect: "Hash tags (0-6): bus, direction, exchange, topic, type, subscription, kafkaClusterId" + tags.tagByIndex(0) == "bus:bus0" + tags.tagByIndex(1) == "direction:out" + tags.tagByIndex(2) == "exchange:exchange0" + tags.tagByIndex(3) == "topic:topic0" + tags.tagByIndex(4) == "type:type0" + tags.tagByIndex(5) == "subscription:subscription0" + tags.tagByIndex(6) == "kafka_cluster_id:kafka_cluster_id0" + + and: "Aggregation tags (7-11): datasetName, datasetNamespace, isManual, group, consumerGroup" + tags.tagByIndex(7) == "ds.name:dataset_name0" + tags.tagByIndex(8) == "ds.namespace:dataset_namespace0" + tags.tagByIndex(9) == "manual_checkpoint:true" + tags.tagByIndex(10) == "group:group0" + tags.tagByIndex(11) == "consumer_group:consumer_group0" + + and: "Values (12-13): partition, hasRoutingKey" + tags.tagByIndex(12) == "partition:partition0" + tags.tagByIndex(13) == "has_routing_key:true" + + and: "Out of bounds returns null" + tags.tagByIndex(14) == null + tags.tagByIndex(-1) == null + } + + def 'test only hash tags affect primary hash'() { + setup: "Create base tags with all hash tags (0-6) set" + def base = new DataStreamsTags("bus", DataStreamsTags.Direction.OUTBOUND, "exchange", "topic", + "type", "subscription", null, null, null, null, null, null, "cluster", null) + + when: "Change only aggregation tag (datasetName)" + def withDataset = new DataStreamsTags("bus", DataStreamsTags.Direction.OUTBOUND, "exchange", "topic", + "type", "subscription", "dataset", null, null, null, null, null, "cluster", null) + + then: "Primary hash should be the same (aggregation tag doesn't affect it)" + base.getHash() == withDataset.getHash() + + when: "Change only a value tag (partition)" + def withPartition = new DataStreamsTags("bus", DataStreamsTags.Direction.OUTBOUND, "exchange", "topic", + "type", "subscription", null, null, null, null, null, null, "cluster", "partition") + + then: "Primary hash should still be the same (value tag doesn't affect it)" + base.getHash() == withPartition.getHash() + + when: "Change a hash tag (topic)" + def withDifferentTopic = new DataStreamsTags("bus", DataStreamsTags.Direction.OUTBOUND, "exchange", "topic2", + "type", "subscription", null, null, null, null, null, null, "cluster", null) + + then: "Primary hash should be different" + base.getHash() != withDifferentTopic.getHash() + } + + def 'test aggregation tags affect aggregation hash but not primary hash'() { + setup: "Create base tags" + def base = new DataStreamsTags("bus", DataStreamsTags.Direction.OUTBOUND, "exchange", "topic", + "type", "subscription", null, null, null, null, null, null, "cluster", null) + + when: "Add aggregation tag (datasetName)" + def withDataset = new DataStreamsTags("bus", DataStreamsTags.Direction.OUTBOUND, "exchange", "topic", + "type", "subscription", "dataset", null, null, null, null, null, "cluster", null) + + then: "Primary hash is same, but aggregation hash is different" + base.getHash() == withDataset.getHash() + base.getAggregationHash() != withDataset.getAggregationHash() + + when: "Add different aggregation tag (group)" + def withGroup = new DataStreamsTags("bus", DataStreamsTags.Direction.OUTBOUND, "exchange", "topic", + "type", "subscription", null, null, null, "group", null, null, "cluster", null) + + then: "Primary hash is same, but aggregation hash is different" + base.getHash() == withGroup.getHash() + base.getAggregationHash() != withGroup.getAggregationHash() + } + + def 'test values affect only complete hash'() { + setup: "Create base tags" + def base = new DataStreamsTags("bus", DataStreamsTags.Direction.OUTBOUND, "exchange", "topic", + "type", "subscription", null, null, null, null, null, null, "cluster", null) + + when: "Add value tag (partition)" + def withPartition = new DataStreamsTags("bus", DataStreamsTags.Direction.OUTBOUND, "exchange", "topic", + "type", "subscription", null, null, null, null, null, null, "cluster", "partition") + + then: "Primary and aggregation hashes are same, complete hash is different (via equals)" + base.getHash() == withPartition.getHash() + base.getAggregationHash() == withPartition.getAggregationHash() + base != withPartition // equals uses completeHash + + when: "Add different value tag (hasRoutingKey)" + def withRoutingKey = new DataStreamsTags("bus", DataStreamsTags.Direction.OUTBOUND, "exchange", "topic", + "type", "subscription", null, null, null, null, null, true, "cluster", null) + + then: "Primary and aggregation hashes are same, but objects are different" + base.getHash() == withRoutingKey.getHash() + base.getAggregationHash() == withRoutingKey.getAggregationHash() + base != withRoutingKey + } + + def 'test all three hash levels are different when appropriate tags change'() { + setup: + def base = new DataStreamsTags("bus", DataStreamsTags.Direction.OUTBOUND, null, "topic", + "type", null, null, null, null, null, null, null, null, null) + + when: "Add hash tag -> all hashes change" + def withExchange = new DataStreamsTags("bus", DataStreamsTags.Direction.OUTBOUND, "exchange", "topic", + "type", null, null, null, null, null, null, null, null, null) + + then: + base.getHash() != withExchange.getHash() + base.getAggregationHash() != withExchange.getAggregationHash() + base != withExchange + + when: "Add aggregation tag -> only aggregation and complete hashes change" + def withDataset = new DataStreamsTags("bus", DataStreamsTags.Direction.OUTBOUND, null, "topic", + "type", null, "dataset", null, null, null, null, null, null, null) + + then: + base.getHash() == withDataset.getHash() // primary hash unchanged + base.getAggregationHash() != withDataset.getAggregationHash() + base != withDataset + + when: "Add value tag -> only complete hash changes" + def withPartition = new DataStreamsTags("bus", DataStreamsTags.Direction.OUTBOUND, null, "topic", + "type", null, null, null, null, null, null, null, null, "partition") + + then: + base.getHash() == withPartition.getHash() // primary hash unchanged + base.getAggregationHash() == withPartition.getAggregationHash() // aggregation hash unchanged + base != withPartition // but complete hash changed + } }