Skip to content

KAFKA-20399: Implement VersionedKeyValueStoreWithHeaders#21995

Open
Shekharrajak wants to merge 1 commit intoapache:trunkfrom
Shekharrajak:KAFKA-20399-versioned-store-with-headers
Open

KAFKA-20399: Implement VersionedKeyValueStoreWithHeaders#21995
Shekharrajak wants to merge 1 commit intoapache:trunkfrom
Shekharrajak:KAFKA-20399-versioned-store-with-headers

Conversation

@Shekharrajak
Copy link
Copy Markdown

@Shekharrajak Shekharrajak commented Apr 8, 2026

Ref https://issues.apache.org/jira/browse/KAFKA-20399

This PR implements VersionedKeyValueStoreWithHeaders as part of KIP-1271. It adds header support to versioned key-value stores, completing the last missing store type in the KIP.

VersionedKeyValueStoreWithHeaders<K, V> -- new interface extending VersionedKeyValueStore<K, V> with put(K key, V value, long timestamp, Headers headers)

@github-actions github-actions bot added triage PRs from the community streams labels Apr 8, 2026
Copy link
Copy Markdown
Contributor

@UladzislauBlok UladzislauBlok left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

vibe code

* in {@link VersionedKeyValueStore#put(Object, Object, long)}.
* @throws NullPointerException if {@code headers} is {@code null}
*/
long put(K key, V value, long timestamp, Headers headers);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do add this method? for other store types we support existing api and value brings headers under the hood

Comment on lines +74 to +84
private static long defaultSegmentInterval(final long historyRetentionMs) {
if (historyRetentionMs <= 60_000L) {
return Math.max(historyRetentionMs / 3, 2_000L);
} else if (historyRetentionMs <= 300_000L) {
return Math.max(historyRetentionMs / 5, 20_000L);
} else if (historyRetentionMs <= 3600_000L) {
return Math.max(historyRetentionMs / 12, 60_000L);
} else {
return Math.max(historyRetentionMs / 24, 300_000L);
}
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need this?

Comment on lines +37 to +39
default long put(final Bytes key, final byte[] value, final long timestamp, final Headers headers) {
return put(key, value, timestamp);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why?

private final V value;
private final long timestamp;
private final Optional<Long> validTo;
private final Headers headers;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we changed original record instead of creating new one. see AggregationWithHeaders / ValueTimestampHeaders

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

streams triage PRs from the community

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants