Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ public DataStreamsTags(
}
}

// hashable tags are 0-4
// hashable tags are 0-6: bus, direction, exchange, topic, type, subscription, kafkaClusterId
for (int i = 0; i < 7; i++) {
String tag = this.tagByIndex(i);
if (tag != null) {
Expand All @@ -343,9 +343,9 @@ public DataStreamsTags(
}
}

// aggregation tags are 5-7
// aggregation tags are 7-11: datasetName, datasetNamespace, isManual, group, consumerGroup
this.aggregationHash = this.hash;
for (int i = 7; i < 10; i++) {
for (int i = 7; i < 12; i++) {
String tag = this.tagByIndex(i);
if (tag != null) {
this.nonNullSize++;
Expand All @@ -354,9 +354,9 @@ public DataStreamsTags(
}
}

// the rest are values
// values are 12-13: partition, hasRoutingKey
this.completeHash = aggregationHash;
for (int i = 10; i < this.size(); i++) {
for (int i = 12; i < this.size(); i++) {
String tag = this.tagByIndex(i);
if (tag != null) {
this.nonNullSize++;
Expand All @@ -370,6 +370,8 @@ public int size() {
return 14;
}

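// WARNING: DO NOT REORDER! The constructor builds its hashes by iterating index ranges over this
// method: indices 0-6 are hash tags, 7-11 are aggregation tags, 12-13 are values. Moving a tag
// to a different index changes which hash it contributes to. Reordering breaks hashing!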
public String tagByIndex(int index) {
switch (index) {
case 0:
Expand All @@ -385,21 +387,21 @@ public String tagByIndex(int index) {
case 5:
return this.subscription;
case 6:
return this.datasetName;
return this.kafkaClusterId;
case 7:
return this.datasetNamespace;
return this.datasetName;
case 8:
return this.isManual;
return this.datasetNamespace;
case 9:
return this.group;
return this.isManual;
case 10:
return this.consumerGroup;
return this.group;
case 11:
return this.hasRoutingKey;
return this.consumerGroup;
case 12:
return this.kafkaClusterId;
case 13:
return this.partition;
case 13:
return this.hasRoutingKey;
default:
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,4 +119,139 @@ class DataStreamsTagsTest extends Specification {
five.hasAllTags("type:type", "direction:out", "topic:topic", "ds.name:dataset", "ds.namespace:namespace")
six.hasAllTags("type:type", "direction:in", "subscription:subscription")
}

// Pins the index-to-tag mapping of tagByIndex: every index 0-13 must return the expected
// "key:value" string, and indices outside that range must return null.
def 'test tagByIndex returns tags in correct order'() {
setup:
// getTags is a helper defined elsewhere in this spec; presumably it builds a fully populated
// DataStreamsTags whose values carry the given numeric suffix - verify against its definition.
def tags = getTags(0)

// Each bare comparison below is an implicit Spock assertion.
expect: "Hash tags (0-6): bus, direction, exchange, topic, type, subscription, kafkaClusterId"
tags.tagByIndex(0) == "bus:bus0"
tags.tagByIndex(1) == "direction:out"
tags.tagByIndex(2) == "exchange:exchange0"
tags.tagByIndex(3) == "topic:topic0"
tags.tagByIndex(4) == "type:type0"
tags.tagByIndex(5) == "subscription:subscription0"
tags.tagByIndex(6) == "kafka_cluster_id:kafka_cluster_id0"

and: "Aggregation tags (7-11): datasetName, datasetNamespace, isManual, group, consumerGroup"
tags.tagByIndex(7) == "ds.name:dataset_name0"
tags.tagByIndex(8) == "ds.namespace:dataset_namespace0"
tags.tagByIndex(9) == "manual_checkpoint:true"
tags.tagByIndex(10) == "group:group0"
tags.tagByIndex(11) == "consumer_group:consumer_group0"

and: "Values (12-13): partition, hasRoutingKey"
tags.tagByIndex(12) == "partition:partition0"
tags.tagByIndex(13) == "has_routing_key:true"

// 14 is one past the last valid index; -1 is below the first.
and: "Out of bounds returns null"
tags.tagByIndex(14) == null
tags.tagByIndex(-1) == null
}

// Checks that getHash() is unaffected by an aggregation tag or a value tag, and changes when
// a hash tag (topic) changes. Every variant differs from `base` in exactly one positional
// constructor argument.
def 'test only hash tags affect primary hash'() {
setup: "Create base tags with all hash tags (0-6) set"
// NOTE(review): the 13th argument ("cluster") is presumably kafkaClusterId - the constructor
// signature is not visible here, confirm against DataStreamsTags.
def base = new DataStreamsTags("bus", DataStreamsTags.Direction.OUTBOUND, "exchange", "topic",
"type", "subscription", null, null, null, null, null, null, "cluster", null)

when: "Change only aggregation tag (datasetName)"
// Differs from base only in the 7th argument ("dataset").
def withDataset = new DataStreamsTags("bus", DataStreamsTags.Direction.OUTBOUND, "exchange", "topic",
"type", "subscription", "dataset", null, null, null, null, null, "cluster", null)

then: "Primary hash should be the same (aggregation tag doesn't affect it)"
base.getHash() == withDataset.getHash()

when: "Change only a value tag (partition)"
// Differs from base only in the 14th (last) argument ("partition").
def withPartition = new DataStreamsTags("bus", DataStreamsTags.Direction.OUTBOUND, "exchange", "topic",
"type", "subscription", null, null, null, null, null, null, "cluster", "partition")

then: "Primary hash should still be the same (value tag doesn't affect it)"
base.getHash() == withPartition.getHash()

when: "Change a hash tag (topic)"
// Differs from base only in the 4th argument ("topic2" instead of "topic").
def withDifferentTopic = new DataStreamsTags("bus", DataStreamsTags.Direction.OUTBOUND, "exchange", "topic2",
"type", "subscription", null, null, null, null, null, null, "cluster", null)

then: "Primary hash should be different"
base.getHash() != withDifferentTopic.getHash()
}

// Checks that setting an aggregation tag (datasetName, then group) leaves getHash() equal to
// the base object's but makes getAggregationHash() differ from it.
def 'test aggregation tags affect aggregation hash but not primary hash'() {
setup: "Create base tags"
// All aggregation and value arguments are null; only the leading hash-tag arguments and the
// 13th argument ("cluster", presumably kafkaClusterId - confirm) are set.
def base = new DataStreamsTags("bus", DataStreamsTags.Direction.OUTBOUND, "exchange", "topic",
"type", "subscription", null, null, null, null, null, null, "cluster", null)

when: "Add aggregation tag (datasetName)"
// Differs from base only in the 7th argument ("dataset").
def withDataset = new DataStreamsTags("bus", DataStreamsTags.Direction.OUTBOUND, "exchange", "topic",
"type", "subscription", "dataset", null, null, null, null, null, "cluster", null)

then: "Primary hash is same, but aggregation hash is different"
base.getHash() == withDataset.getHash()
base.getAggregationHash() != withDataset.getAggregationHash()

when: "Add different aggregation tag (group)"
// Differs from base only in the 10th argument ("group").
def withGroup = new DataStreamsTags("bus", DataStreamsTags.Direction.OUTBOUND, "exchange", "topic",
"type", "subscription", null, null, null, "group", null, null, "cluster", null)

then: "Primary hash is same, but aggregation hash is different"
base.getHash() == withGroup.getHash()
base.getAggregationHash() != withGroup.getAggregationHash()
}

// Checks that setting a value tag (partition, then hasRoutingKey) keeps both getHash() and
// getAggregationHash() equal to the base object's, while the objects themselves compare as
// not equal.
def 'test values affect only complete hash'() {
setup: "Create base tags"
def base = new DataStreamsTags("bus", DataStreamsTags.Direction.OUTBOUND, "exchange", "topic",
"type", "subscription", null, null, null, null, null, null, "cluster", null)

when: "Add value tag (partition)"
// Differs from base only in the 14th (last) argument ("partition").
def withPartition = new DataStreamsTags("bus", DataStreamsTags.Direction.OUTBOUND, "exchange", "topic",
"type", "subscription", null, null, null, null, null, null, "cluster", "partition")

then: "Primary and aggregation hashes are same, complete hash is different (via equals)"
base.getHash() == withPartition.getHash()
base.getAggregationHash() == withPartition.getAggregationHash()
// There is no direct getter call for the complete hash here; inequality of the objects is
// used as the observable signal instead.
base != withPartition // equals uses completeHash

when: "Add different value tag (hasRoutingKey)"
// Differs from base only in the 12th argument (true).
def withRoutingKey = new DataStreamsTags("bus", DataStreamsTags.Direction.OUTBOUND, "exchange", "topic",
"type", "subscription", null, null, null, null, null, true, "cluster", null)

then: "Primary and aggregation hashes are same, but objects are different"
base.getHash() == withRoutingKey.getHash()
base.getAggregationHash() == withRoutingKey.getAggregationHash()
base != withRoutingKey
}

// Walks through the three tag tiers in one feature: adding a hash tag must change the primary
// hash, the aggregation hash and equality; adding an aggregation tag must change only the
// aggregation hash and equality; adding a value tag must change only equality.
def 'test all three hash levels are different when appropriate tags change'() {
setup:
// Sparse base: only the 1st, 2nd, 4th and 5th arguments (bus, direction, topic, type) are set.
def base = new DataStreamsTags("bus", DataStreamsTags.Direction.OUTBOUND, null, "topic",
"type", null, null, null, null, null, null, null, null, null)

when: "Add hash tag -> all hashes change"
// Differs from base only in the 3rd argument ("exchange").
def withExchange = new DataStreamsTags("bus", DataStreamsTags.Direction.OUTBOUND, "exchange", "topic",
"type", null, null, null, null, null, null, null, null, null)

then:
base.getHash() != withExchange.getHash()
base.getAggregationHash() != withExchange.getAggregationHash()
base != withExchange

when: "Add aggregation tag -> only aggregation and complete hashes change"
// Differs from base only in the 7th argument ("dataset").
def withDataset = new DataStreamsTags("bus", DataStreamsTags.Direction.OUTBOUND, null, "topic",
"type", null, "dataset", null, null, null, null, null, null, null)

then:
base.getHash() == withDataset.getHash() // primary hash unchanged
base.getAggregationHash() != withDataset.getAggregationHash()
base != withDataset

when: "Add value tag -> only complete hash changes"
// Differs from base only in the 14th (last) argument ("partition").
def withPartition = new DataStreamsTags("bus", DataStreamsTags.Direction.OUTBOUND, null, "topic",
"type", null, null, null, null, null, null, null, null, "partition")

then:
base.getHash() == withPartition.getHash() // primary hash unchanged
base.getAggregationHash() == withPartition.getAggregationHash() // aggregation hash unchanged
base != withPartition // but complete hash changed
}
}