diff --git a/consumer_kafka_avro/pom.xml b/consumer_kafka_avro/pom.xml
new file mode 100644
index 00000000..73081314
--- /dev/null
+++ b/consumer_kafka_avro/pom.xml
@@ -0,0 +1,172 @@
+
+
+ 4.0.0
+
+ org.springframework.boot
+ spring-boot-starter-parent
+ 4.0.1
+
+
+
+ org.springframework.cloud
+ consumer_kafka_avro
+ 5.0.2-SNAPSHOT
+ jar
+
+
+ 4.0.1
+ 8.1.1
+ 2025.1.0
+ 1.20.4
+ 1.12.0
+
+
+
+
+ confluent
+ Confluent Maven Repository
+ https://packages.confluent.io/maven/
+
+
+ spring-snapshots
+ Spring Snapshots
+ https://repo.spring.io/snapshot
+
+ true
+
+
+
+
+
+
+ spring-snapshots
+ Spring Snapshots
+ https://repo.spring.io/snapshot
+
+ true
+
+
+
+ spring-plugin-snapshots
+ Spring Snapshots
+ https://repo.spring.io/snapshot
+
+ true
+
+
+
+
+
+
+
+ org.springframework.cloud
+ spring-cloud-dependencies
+ ${spring-cloud.version}
+ pom
+ import
+
+
+ org.testcontainers
+ testcontainers-bom
+ 1.20.4
+ pom
+ import
+
+
+ org.springframework.boot
+ spring-boot-starter-kafka
+ 4.0.1
+
+
+ io.confluent
+ kafka-avro-serializer
+ ${kafka-avro-serializer.version}
+
+
+
+
+
+
+ org.apache.avro
+ avro
+ ${avro.version}
+
+
+ org.springframework.boot
+ spring-boot-starter
+
+
+ org.springframework.boot
+ spring-boot-starter-kafka
+
+
+ io.confluent
+ kafka-avro-serializer
+
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ test
+
+
+ org.springframework.cloud
+ spring-cloud-starter-contract-stub-runner
+ test
+
+
+
+ org.testcontainers
+ testcontainers
+ test
+
+
+ org.testcontainers
+ junit-jupiter
+ test
+
+
+ org.testcontainers
+ kafka
+ test
+
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+ 4.0.1
+
+
+
+
+
+ org.apache.avro
+ avro-maven-plugin
+ 1.11.3
+
+
+ generate-sources
+
+ schema
+
+
+ ${project.basedir}/src/main/resources/avro
+ ${project.build.directory}/generated-sources/avro
+ String
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+
+
+
diff --git a/consumer_kafka_avro/src/main/java/com/example/kafka/consumer/BooksReturnedListener.java b/consumer_kafka_avro/src/main/java/com/example/kafka/consumer/BooksReturnedListener.java
new file mode 100644
index 00000000..d14896ed
--- /dev/null
+++ b/consumer_kafka_avro/src/main/java/com/example/kafka/consumer/BooksReturnedListener.java
@@ -0,0 +1,30 @@
+package com.example.kafka.consumer;
+
+import com.example.kafka.avro.Book;
+
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.stereotype.Component;
+
+/**
+ * Consumes Avro {@link Book} events from the "book.returned" Kafka topic and
+ * notifies the user by delegating to {@link EmailService}.
+ */
+@Component
+class BooksReturnedListener {
+
+ private final EmailService emailService;
+
+ BooksReturnedListener(EmailService emailService) {
+ this.emailService = emailService;
+ }
+
+ // Invoked by Spring Kafka for each record on "book.returned"; the Avro
+ // payload is deserialized by the configured value deserializer (see
+ // application.yml: KafkaAvroDeserializer with specific.avro.reader=true).
+ @KafkaListener(topics = "book.returned")
+ public void sendEmailOnBookReturned(Book book) {
+ String emailBody = """
+ Dear User,
+
+ The book you borrowed has been successfully returned:
+ Title: %s, Author: %s, ISBN: %s
+
+ """.formatted(book.getTitle(), book.getAuthor(), book.getIsbn());
+
+ emailService.sendEmail(emailBody);
+ }
+
+}
diff --git a/consumer_kafka_avro/src/main/java/com/example/kafka/consumer/EmailService.java b/consumer_kafka_avro/src/main/java/com/example/kafka/consumer/EmailService.java
new file mode 100644
index 00000000..5af1bd69
--- /dev/null
+++ b/consumer_kafka_avro/src/main/java/com/example/kafka/consumer/EmailService.java
@@ -0,0 +1,13 @@
+package com.example.kafka.consumer;
+
+import org.springframework.stereotype.Service;
+
+/**
+ * Stand-in e-mail gateway for this sample: writes the message to stdout
+ * instead of sending real mail. Tests mock this bean to verify the body.
+ */
+@Service
+public class EmailService {
+
+ public void sendEmail(String emailBody) {
+ // Simulate sending an email
+ System.out.println("Sending email:\n" + emailBody);
+ }
+
+}
diff --git a/consumer_kafka_avro/src/main/java/com/example/kafka/consumer/KafkaAvroConsumerApplication.java b/consumer_kafka_avro/src/main/java/com/example/kafka/consumer/KafkaAvroConsumerApplication.java
new file mode 100644
index 00000000..b75faa5e
--- /dev/null
+++ b/consumer_kafka_avro/src/main/java/com/example/kafka/consumer/KafkaAvroConsumerApplication.java
@@ -0,0 +1,13 @@
+package com.example.kafka.consumer;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+/**
+ * Spring Boot entry point for the Kafka/Avro consumer sample module.
+ */
+@SpringBootApplication
+public class KafkaAvroConsumerApplication {
+
+ public static void main(String[] args) {
+ SpringApplication.run(KafkaAvroConsumerApplication.class, args);
+ }
+
+}
diff --git a/consumer_kafka_avro/src/main/resources/application.yml b/consumer_kafka_avro/src/main/resources/application.yml
new file mode 100644
index 00000000..a9ef5a72
--- /dev/null
+++ b/consumer_kafka_avro/src/main/resources/application.yml
@@ -0,0 +1,15 @@
+spring:
+  application:
+    name: kafka-avro-consumer
+ kafka:
+ bootstrap-servers: localhost:9092
+ producer:
+ key-serializer: org.apache.kafka.common.serialization.StringSerializer
+ value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
+ consumer:
+ group-id: kafka-avro-consumer-group
+ key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+ value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
+ properties:
+ specific.avro.reader: true
+ properties:
+ schema.registry.url: mock://
diff --git a/consumer_kafka_avro/src/main/resources/avro/Book.avsc b/consumer_kafka_avro/src/main/resources/avro/Book.avsc
new file mode 100644
index 00000000..c118b8bf
--- /dev/null
+++ b/consumer_kafka_avro/src/main/resources/avro/Book.avsc
@@ -0,0 +1,19 @@
+{
+ "type": "record",
+ "name": "Book",
+ "namespace": "com.example.kafka.avro",
+ "fields": [
+ {
+ "name": "isbn",
+ "type": "string"
+ },
+ {
+ "name": "title",
+ "type": "string"
+ },
+ {
+ "name": "author",
+ "type": "string"
+ }
+ ]
+}
diff --git a/consumer_kafka_avro/src/test/java/com/example/kafka/consumer/CollaborationTest.java b/consumer_kafka_avro/src/test/java/com/example/kafka/consumer/CollaborationTest.java
new file mode 100644
index 00000000..f2380af2
--- /dev/null
+++ b/consumer_kafka_avro/src/test/java/com/example/kafka/consumer/CollaborationTest.java
@@ -0,0 +1,123 @@
+package com.example.kafka.consumer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.example.kafka.avro.Book;
+
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.kafka.ConfluentKafkaContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import wiremock.com.fasterxml.jackson.core.JsonProcessingException;
+import wiremock.com.fasterxml.jackson.databind.json.JsonMapper;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.system.OutputCaptureExtension;
+import org.springframework.cloud.contract.stubrunner.StubTrigger;
+import org.springframework.cloud.contract.stubrunner.spring.AutoConfigureStubRunner;
+import org.springframework.cloud.contract.stubrunner.spring.StubRunnerProperties;
+import org.springframework.cloud.contract.verifier.converter.YamlContract;
+import org.springframework.cloud.contract.verifier.messaging.MessageVerifierSender;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.test.context.DynamicPropertyRegistry;
+import org.springframework.test.context.DynamicPropertySource;
+import org.springframework.test.context.bean.override.mockito.MockitoBean;
+
+import static java.util.Collections.emptyMap;
+import static org.awaitility.Awaitility.await;
+import static org.mockito.ArgumentMatchers.contains;
+import static org.mockito.Mockito.verify;
+
+// Consumer-side collaboration test: fires the producer's "book_returned" stub
+// message through the Stub Runner and asserts the listener asked EmailService
+// to send an e-mail containing the book details.
+// NOTE(review): generic type parameters appear to have been lost in this patch
+// rendering (e.g. "MessageVerifierSender>") — verify against the original commit.
+@Tag("kafka-avro")
+@Testcontainers
+@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = {
+ CollaborationTest.TestConfig.class, KafkaAvroConsumerApplication.class })
+@AutoConfigureStubRunner(ids = "org.springframework.cloud:spring-cloud-contract-sample-kafka-avro-producer:+:stubs", stubsMode = StubRunnerProperties.StubsMode.LOCAL)
+@ExtendWith(OutputCaptureExtension.class)
+class CollaborationTest {
+
+ @Autowired
+ StubTrigger trigger;
+
+ // Mocked so the test can verify the e-mail body without real side effects.
+ @MockitoBean
+ EmailService emailService;
+
+ @Container
+ static ConfluentKafkaContainer kafka = new ConfluentKafkaContainer(
+ DockerImageName.parse("confluentinc/cp-kafka"));
+
+ // Point Spring Kafka at the Testcontainers broker before the context starts.
+ @DynamicPropertySource
+ static void kafkaProperties(DynamicPropertyRegistry registry) {
+ registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
+ }
+
+ @Test
+ void shouldSendEmail_onBookReturned() {
+ // Label defined in the producer contract (bookReturned.groovy).
+ trigger.trigger("book_returned");
+
+ // @formatter:off
+ await().untilAsserted(() ->
+ verify(emailService).sendEmail(
+ contains("Title: Contract Testing for Dummies, Author: John Doe, ISBN: 978-1234567890")));
+ // @formatter:on
+ }
+
+ @Configuration
+ static class TestConfig {
+
+ // Overrides the default sender so stub messages are published through the
+ // real KafkaTemplate (and thus the Avro serializer) instead of in-memory.
+ @Bean
+ MessageVerifierSender> standaloneMessageVerifier(KafkaTemplate kafkaTemplate) {
+ return new KafkaAvroMessageVerifierSender<>(kafkaTemplate);
+ }
+
+ }
+
+ // Sender that converts the stub's JSON payload into the Avro Book record
+ // before publishing, so the consumer's Avro deserializer can read it.
+ static class KafkaAvroMessageVerifierSender implements MessageVerifierSender {
+
+ private final KafkaTemplate kafkaTemplate;
+
+ // TODO: should this be the default?
+ @Override
+ public void send(M message, String destination, @Nullable YamlContract contract) {
+ send(message, emptyMap(), destination, contract);
+ }
+
+ @Override
+ public void send(T payload, Map headers, String destination,
+ @Nullable YamlContract contract) {
+ // Copy headers (they may be immutable/null) and route to the destination topic.
+ Map newHeaders = headers != null ? new HashMap<>(headers) : new HashMap<>();
+ newHeaders.put(KafkaHeaders.TOPIC, destination);
+ MessageHeaders msgHeaders = new MessageHeaders(newHeaders);
+
+ try {
+ // TODO: remove this workaround after merging:
+ // https://github.com/spring-cloud/spring-cloud-contract/issues/2404
+ Book avroPayload = new JsonMapper().readValue(payload.toString(), Book.class);
+ var message = MessageBuilder.createMessage(avroPayload, msgHeaders);
+ kafkaTemplate.send(message);
+ }
+ catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ KafkaAvroMessageVerifierSender(KafkaTemplate kafkaTemplate) {
+ this.kafkaTemplate = kafkaTemplate;
+ }
+
+ }
+
+}
diff --git a/pom.xml b/pom.xml
index f893fb48..c7eb99d1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -68,6 +68,7 @@
producer_with_latest_2_2_features
producer_java
+ producer_kafka_avro
producer_kafka_middleware
producer_rabbit_middleware
@@ -83,6 +84,7 @@
consumer_with_latest_2_2_features
consumer_java
+ consumer_kafka_avro
consumer_kafka_middleware
consumer_rabbit_middleware
@@ -135,6 +137,7 @@
producer_with_latest_2_2_features
producer_java
+ producer_kafka_avro
producer_kafka_middleware
producer_rabbit_middleware
@@ -150,6 +153,7 @@
consumer_with_latest_2_2_features
consumer_java
+ consumer_kafka_avro
consumer_kafka_middleware
consumer_rabbit_middleware
diff --git a/producer_kafka_avro/pom.xml b/producer_kafka_avro/pom.xml
new file mode 100644
index 00000000..6351d21b
--- /dev/null
+++ b/producer_kafka_avro/pom.xml
@@ -0,0 +1,181 @@
+
+
+ 4.0.0
+
+ org.springframework.boot
+ spring-boot-starter-parent
+ 4.0.1
+
+
+
+ org.springframework.cloud
+ producer_kafka_avro
+ 5.0.2-SNAPSHOT
+ jar
+
+
+ 4.0.1
+ 8.1.1
+ 2025.1.0
+ 1.20.4
+ 1.12.0
+
+
+
+
+ confluent
+ Confluent Maven Repository
+ https://packages.confluent.io/maven/
+
+
+ spring-snapshots
+ Spring Snapshots
+ https://repo.spring.io/snapshot
+
+ true
+
+
+
+
+
+
+ spring-snapshots
+ Spring Snapshots
+ https://repo.spring.io/snapshot
+
+ true
+
+
+
+ spring-plugin-snapshots
+ Spring Snapshots
+ https://repo.spring.io/snapshot
+
+ true
+
+
+
+
+
+
+
+ org.springframework.cloud
+ spring-cloud-dependencies
+ ${spring-cloud.version}
+ pom
+ import
+
+
+ org.testcontainers
+ testcontainers-bom
+ 1.20.4
+ pom
+ import
+
+
+ org.springframework.boot
+ spring-boot-starter-kafka
+ 4.0.1
+
+
+ io.confluent
+ kafka-avro-serializer
+ ${kafka-avro-serializer.version}
+
+
+
+
+
+
+ org.apache.avro
+ avro
+ ${avro.version}
+
+
+ org.springframework.boot
+ spring-boot-starter
+
+
+ org.springframework.boot
+ spring-boot-starter-kafka
+
+
+ io.confluent
+ kafka-avro-serializer
+
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ test
+
+
+ org.springframework.cloud
+ spring-cloud-contract-verifier
+ test
+
+
+
+ org.testcontainers
+ testcontainers
+ test
+
+
+ org.testcontainers
+ junit-jupiter
+ test
+
+
+ org.testcontainers
+ kafka
+ test
+
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+ 4.0.1
+
+
+
+
+
+ org.apache.avro
+ avro-maven-plugin
+ 1.11.3
+
+
+ generate-sources
+
+ schema
+
+
+ ${project.basedir}/src/main/resources/avro
+ ${project.build.directory}/generated-sources/avro
+ String
+
+
+
+
+
+ org.springframework.cloud
+ spring-cloud-contract-maven-plugin
+ true
+
+ JUNIT5
+ com.example.kafka.producer.BaseClass
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+
+
+
diff --git a/producer_kafka_avro/src/main/java/com/example/kafka/producer/BookService.java b/producer_kafka_avro/src/main/java/com/example/kafka/producer/BookService.java
new file mode 100644
index 00000000..15dd4188
--- /dev/null
+++ b/producer_kafka_avro/src/main/java/com/example/kafka/producer/BookService.java
@@ -0,0 +1,40 @@
+package com.example.kafka.producer;
+
+import java.util.Map;
+
+import com.example.kafka.avro.Book;
+
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.stereotype.Service;
+
+/**
+ * Publishes "book returned" events as Avro {@link Book} records to the
+ * "book.returned" Kafka topic, with fixed correlation/source/event headers.
+ */
+@Service
+class BookService {
+
+ private final KafkaTemplate kafkaTemplate;
+
+ BookService(KafkaTemplate kafkaTemplate) {
+ this.kafkaTemplate = kafkaTemplate;
+ }
+
+ // Builds the Avro payload from the book details and sends it; the topic is
+ // carried in the message headers rather than passed to send() directly.
+ void bookReturned(String isbn, String title, String author) {
+ Book payload = Book.newBuilder().setIsbn(isbn).setTitle(title).setAuthor(author)
+ .build();
+
+ // @formatter:off
+ // NOTE(review): header values are hard-coded; presumably sample-only —
+ // confirm a real correlation id should not be generated per message.
+ MessageHeaders headers = new MessageHeaders(Map.of(
+ KafkaHeaders.TOPIC, "book.returned",
+ "X-Correlation-Id", "abc-123-def",
+ "X-Source-System", "library-service",
+ "X-Event-Type", "BOOK_RETURNED"
+ ));
+ // @formatter:on
+
+ Message msg = MessageBuilder.createMessage(payload, headers);
+ kafkaTemplate.send(msg);
+ }
+
+}
diff --git a/producer_kafka_avro/src/main/java/com/example/kafka/producer/KafkaAvroProducerApplication.java b/producer_kafka_avro/src/main/java/com/example/kafka/producer/KafkaAvroProducerApplication.java
new file mode 100644
index 00000000..89e1d0a9
--- /dev/null
+++ b/producer_kafka_avro/src/main/java/com/example/kafka/producer/KafkaAvroProducerApplication.java
@@ -0,0 +1,13 @@
+package com.example.kafka.producer;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+/**
+ * Spring Boot entry point for the Kafka/Avro producer sample module.
+ */
+@SpringBootApplication
+public class KafkaAvroProducerApplication {
+
+ public static void main(String[] args) {
+ SpringApplication.run(KafkaAvroProducerApplication.class, args);
+ }
+
+}
diff --git a/producer_kafka_avro/src/main/resources/application.yml b/producer_kafka_avro/src/main/resources/application.yml
new file mode 100644
index 00000000..dd333f05
--- /dev/null
+++ b/producer_kafka_avro/src/main/resources/application.yml
@@ -0,0 +1,15 @@
+spring:
+  application:
+    name: kafka-avro-producer
+ kafka:
+ bootstrap-servers: localhost:9092
+ producer:
+ key-serializer: org.apache.kafka.common.serialization.StringSerializer
+ value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
+ consumer:
+ group-id: kafka-avro-consumer-group
+ key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+ value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
+ properties:
+ specific.avro.reader: true
+ properties:
+ schema.registry.url: mock://
diff --git a/producer_kafka_avro/src/main/resources/avro/Book.avsc b/producer_kafka_avro/src/main/resources/avro/Book.avsc
new file mode 100644
index 00000000..c118b8bf
--- /dev/null
+++ b/producer_kafka_avro/src/main/resources/avro/Book.avsc
@@ -0,0 +1,19 @@
+{
+ "type": "record",
+ "name": "Book",
+ "namespace": "com.example.kafka.avro",
+ "fields": [
+ {
+ "name": "isbn",
+ "type": "string"
+ },
+ {
+ "name": "title",
+ "type": "string"
+ },
+ {
+ "name": "author",
+ "type": "string"
+ }
+ ]
+}
diff --git a/producer_kafka_avro/src/test/java/com/example/kafka/producer/BaseClass.java b/producer_kafka_avro/src/test/java/com/example/kafka/producer/BaseClass.java
new file mode 100644
index 00000000..0a3f535e
--- /dev/null
+++ b/producer_kafka_avro/src/test/java/com/example/kafka/producer/BaseClass.java
@@ -0,0 +1,128 @@
+package com.example.kafka.producer;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.jupiter.api.Tag;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.kafka.ConfluentKafkaContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import tools.jackson.databind.json.JsonMapper;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.cloud.contract.verifier.converter.YamlContract;
+import org.springframework.cloud.contract.verifier.messaging.MessageVerifierReceiver;
+import org.springframework.cloud.contract.verifier.messaging.boot.AutoConfigureMessageVerifier;
+import org.springframework.cloud.contract.verifier.messaging.internal.ContractVerifierObjectMapper;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Primary;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.support.JsonKafkaHeaderMapper;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.test.context.ActiveProfiles;
+import org.springframework.test.context.DynamicPropertyRegistry;
+import org.springframework.test.context.DynamicPropertySource;
+
+// Base class for the tests generated by the Spring Cloud Contract verifier
+// plugin; publishBookReturned() is the trigger named in bookReturned.groovy.
+// NOTE(review): generic type parameters appear to have been lost in this patch
+// rendering (e.g. "MessageVerifierReceiver>") — verify against the original commit.
+@Tag("kafka-avro")
+@Testcontainers
+@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, classes = {
+ KafkaAvroProducerApplication.class, BaseClass.TestConfig.class })
+@AutoConfigureMessageVerifier
+@ActiveProfiles("contracts")
+class BaseClass {
+
+ @Autowired
+ private BookService bookService;
+
+ @Container
+ static ConfluentKafkaContainer kafka = new ConfluentKafkaContainer(
+ DockerImageName.parse("confluentinc/cp-kafka"));
+
+ // Point Spring Kafka at the Testcontainers broker before the context starts.
+ @DynamicPropertySource
+ static void kafkaProperties(DynamicPropertyRegistry registry) {
+ registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
+ }
+
+ // Called by the generated contract test (triggeredBy in the contract DSL).
+ public void publishBookReturned() {
+ bookService.bookReturned("978-1234567890", "Contract Testing for Dummies",
+ "John Doe");
+ }
+
+ @Configuration
+ static class TestConfig {
+
+ @Bean
+ KafkaMessageVerifier kafkaTemplateMessageVerifier() {
+ return new KafkaMessageVerifier();
+ }
+
+ // TODO: remove this workaround after merging:
+ // https://github.com/spring-cloud/spring-cloud-contract/pull/2405
+ // Hides Avro's synthetic bean properties so the verifier's JSON mapper
+ // serializes only the record's data fields.
+ @Bean
+ @Primary
+ ContractVerifierObjectMapper contractVerifierObjectMapper() {
+ var json = JsonMapper.builder()
+ .addMixIn(SpecificRecordBase.class, AvroIgnoreMixin.class).build();
+ return new ContractVerifierObjectMapper(json);
+ }
+
+ @JsonIgnoreProperties({ "schema", "specificData", "classSchema", "conversion" })
+ interface AvroIgnoreMixin {
+
+ }
+
+ }
+
+ // Bridges real Kafka consumption to the contract verifier: records received
+ // by the listener are parked in a per-topic queue for receive() to poll.
+ static class KafkaMessageVerifier implements MessageVerifierReceiver> {
+
+ // Per-topic queues, capacity 1: one in-flight message per topic at a time.
+ private final Map>> broker = new ConcurrentHashMap<>();
+
+ @Override
+ public Message> receive(String destination, long timeout, TimeUnit timeUnit,
+ YamlContract contract) {
+ try {
+ broker.putIfAbsent(destination, new ArrayBlockingQueue<>(1));
+ var messageQueue = broker.get(destination);
+ return messageQueue.poll(timeout, timeUnit);
+ }
+ catch (InterruptedException e) {
+ // NOTE(review): should call Thread.currentThread().interrupt()
+ // before rethrowing so the interrupt flag is not lost.
+ throw new RuntimeException(e);
+ }
+ }
+
+ // Copies Kafka record headers into Spring messaging headers and enqueues
+ // the payload for the blocking receive() above.
+ @KafkaListener(topics = { "book.returned" })
+ public void listen(ConsumerRecord, ?> payload,
+ @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
+ Map headers = new HashMap<>();
+ new JsonKafkaHeaderMapper().toHeaders(payload.headers(), headers);
+
+ broker.putIfAbsent(topic, new ArrayBlockingQueue<>(1));
+ var messageQueue = broker.get(topic);
+ messageQueue.add(MessageBuilder.createMessage(payload.value(),
+ new MessageHeaders(headers)));
+ }
+
+ // Convenience overload with a default 15-second wait.
+ @Override
+ public Message receive(String destination, YamlContract contract) {
+ return receive(destination, 15, TimeUnit.SECONDS, contract);
+ }
+
+ }
+
+}
diff --git a/producer_kafka_avro/src/test/resources/contracts/bookReturned.groovy b/producer_kafka_avro/src/test/resources/contracts/bookReturned.groovy
new file mode 100644
index 00000000..ac6aa458
--- /dev/null
+++ b/producer_kafka_avro/src/test/resources/contracts/bookReturned.groovy
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2013-present the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.springframework.cloud.contract.spec.Contract
+
+// Producer contract: invoking publishBookReturned() on the base class must
+// emit this exact message. Consumers replay it via the 'book_returned' label.
+Contract.make {
+	description 'Should publish a book returned event to Kafka'
+	label 'book_returned'
+	input {
+		// Method defined on the producer-side BaseClass.
+		triggeredBy('publishBookReturned()')
+	}
+	outputMessage {
+		sentTo('book.returned')
+		headers {
+			header('X-Correlation-Id', 'abc-123-def')
+			header('X-Source-System', 'library-service')
+			header('X-Event-Type', 'BOOK_RETURNED')
+		}
+		// Mirrors the fields of the Avro Book schema (isbn/title/author).
+		body(
+			isbn: '978-1234567890',
+			title: 'Contract Testing for Dummies',
+			author: 'John Doe'
+		)
+	}
+}