diff --git a/consumer_kafka_avro/pom.xml b/consumer_kafka_avro/pom.xml new file mode 100644 index 00000000..73081314 --- /dev/null +++ b/consumer_kafka_avro/pom.xml @@ -0,0 +1,172 @@ + + + 4.0.0 + + org.springframework.boot + spring-boot-starter-parent + 4.0.1 + + + + org.springframework.cloud + consumer_kafka_avro + 5.0.2-SNAPSHOT + jar + + + 4.0.1 + 8.1.1 + 2025.1.0 + 1.20.4 + 1.12.0 + + + + + confluent + Confluent Maven Repository + https://packages.confluent.io/maven/ + + + spring-snapshots + Spring Snapshots + https://repo.spring.io/snapshot + + true + + + + + + + spring-snapshots + Spring Snapshots + https://repo.spring.io/snapshot + + true + + + + spring-plugin-snapshots + Spring Snapshots + https://repo.spring.io/snapshot + + true + + + + + + + + org.springframework.cloud + spring-cloud-dependencies + ${spring-cloud.version} + pom + import + + + org.testcontainers + testcontainers-bom + 1.20.4 + pom + import + + + org.springframework.boot + spring-boot-starter-kafka + 4.0.1 + + + io.confluent + kafka-avro-serializer + ${kafka-avro-serializer.version} + + + + + + + org.apache.avro + avro + ${avro.version} + + + org.springframework.boot + spring-boot-starter + + + org.springframework.boot + spring-boot-starter-kafka + + + io.confluent + kafka-avro-serializer + + + + org.springframework.boot + spring-boot-starter-test + test + + + org.springframework.cloud + spring-cloud-starter-contract-stub-runner + test + + + + org.testcontainers + testcontainers + test + + + org.testcontainers + junit-jupiter + test + + + org.testcontainers + kafka + test + + + + + + + + org.springframework.boot + spring-boot-maven-plugin + 4.0.1 + + + + + + org.apache.avro + avro-maven-plugin + 1.11.3 + + + generate-sources + + schema + + + ${project.basedir}/src/main/resources/avro + ${project.build.directory}/generated-sources/avro + String + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + diff --git a/consumer_kafka_avro/src/main/java/com/example/kafka/consumer/BooksReturnedListener.java b/consumer_kafka_avro/src/main/java/com/example/kafka/consumer/BooksReturnedListener.java new file mode 100644 index 00000000..d14896ed --- /dev/null +++ b/consumer_kafka_avro/src/main/java/com/example/kafka/consumer/BooksReturnedListener.java @@ -0,0 +1,30 @@ +package com.example.kafka.consumer; + +import com.example.kafka.avro.Book; + +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +@Component +class BooksReturnedListener { + + private final EmailService emailService; + + BooksReturnedListener(EmailService emailService) { + this.emailService = emailService; + } + + @KafkaListener(topics = "book.returned") + public void sendEmailOnBookReturned(Book book) { + String emailBody = """ + Dear User, + + The book you borrowed has been successfully returned: + Title: %s, Author: %s, ISBN: %s + + """.formatted(book.getTitle(), book.getAuthor(), book.getIsbn()); + + emailService.sendEmail(emailBody); + } + +} diff --git a/consumer_kafka_avro/src/main/java/com/example/kafka/consumer/EmailService.java b/consumer_kafka_avro/src/main/java/com/example/kafka/consumer/EmailService.java new file mode 100644 index 00000000..5af1bd69 --- /dev/null +++ b/consumer_kafka_avro/src/main/java/com/example/kafka/consumer/EmailService.java @@ -0,0 +1,13 @@ +package com.example.kafka.consumer; + +import org.springframework.stereotype.Service; + +@Service +public class EmailService { + + public void sendEmail(String emailBody) { + // Simulate sending an email + System.out.println("Sending email:\n" + emailBody); + } + +} diff --git a/consumer_kafka_avro/src/main/java/com/example/kafka/consumer/KafkaAvroConsumerApplication.java b/consumer_kafka_avro/src/main/java/com/example/kafka/consumer/KafkaAvroConsumerApplication.java new file mode 100644 index 00000000..b75faa5e --- /dev/null +++ b/consumer_kafka_avro/src/main/java/com/example/kafka/consumer/KafkaAvroConsumerApplication.java @@ -0,0 +1,13 @@ +package com.example.kafka.consumer; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class KafkaAvroConsumerApplication { + + public static void main(String[] args) { + SpringApplication.run(KafkaAvroConsumerApplication.class, args); + } + +} diff --git a/consumer_kafka_avro/src/main/resources/application.yml b/consumer_kafka_avro/src/main/resources/application.yml new file mode 100644 index 00000000..a9ef5a72 --- /dev/null +++ b/consumer_kafka_avro/src/main/resources/application.yml @@ -0,0 +1,15 @@ +spring: + application-name: kafka-avro-consumer + kafka: + bootstrap-servers: localhost:9092 + producer: + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer + consumer: + group-id: kafka-avro-consumer-group + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer + properties: + specific.avro.reader: true + properties: + schema.registry.url: mock:// diff --git a/consumer_kafka_avro/src/main/resources/avro/Book.avsc b/consumer_kafka_avro/src/main/resources/avro/Book.avsc new file mode 100644 index 00000000..c118b8bf --- /dev/null +++ b/consumer_kafka_avro/src/main/resources/avro/Book.avsc @@ -0,0 +1,19 @@ +{ + "type": "record", + "name": "Book", + "namespace": "com.example.kafka.avro", + "fields": [ + { + "name": "isbn", + "type": "string" + }, + { + "name": "title", + "type": "string" + }, + { + "name": "author", + "type": "string" + } + ] +} diff --git a/consumer_kafka_avro/src/test/java/com/example/kafka/consumer/CollaborationTest.java b/consumer_kafka_avro/src/test/java/com/example/kafka/consumer/CollaborationTest.java new file mode 100644 index 00000000..f2380af2 --- /dev/null +++ b/consumer_kafka_avro/src/test/java/com/example/kafka/consumer/CollaborationTest.java @@ -0,0 +1,123 @@ +package com.example.kafka.consumer; + +import java.util.HashMap; +import java.util.Map; + +import com.example.kafka.avro.Book; + +import org.jetbrains.annotations.Nullable; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.kafka.ConfluentKafkaContainer; +import org.testcontainers.utility.DockerImageName; + +import wiremock.com.fasterxml.jackson.core.JsonProcessingException; +import wiremock.com.fasterxml.jackson.databind.json.JsonMapper; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.system.OutputCaptureExtension; +import org.springframework.cloud.contract.stubrunner.StubTrigger; +import org.springframework.cloud.contract.stubrunner.spring.AutoConfigureStubRunner; +import org.springframework.cloud.contract.stubrunner.spring.StubRunnerProperties; +import org.springframework.cloud.contract.verifier.converter.YamlContract; +import org.springframework.cloud.contract.verifier.messaging.MessageVerifierSender; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; +import org.springframework.test.context.bean.override.mockito.MockitoBean; + +import static java.util.Collections.emptyMap; +import static org.awaitility.Awaitility.await; +import static org.mockito.ArgumentMatchers.contains; +import static org.mockito.Mockito.verify; + +@Tag("kafka-avro") +@Testcontainers +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = { + CollaborationTest.TestConfig.class, KafkaAvroConsumerApplication.class }) +@AutoConfigureStubRunner(ids = "org.springframework.cloud:spring-cloud-contract-sample-kafka-avro-producer:+:stubs", stubsMode = StubRunnerProperties.StubsMode.LOCAL) +@ExtendWith(OutputCaptureExtension.class) +class CollaborationTest { + + @Autowired + StubTrigger trigger; + + @MockitoBean + EmailService emailService; + + @Container + static ConfluentKafkaContainer kafka = new ConfluentKafkaContainer( + DockerImageName.parse("confluentinc/cp-kafka")); + + @DynamicPropertySource + static void kafkaProperties(DynamicPropertyRegistry registry) { + registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers); + } + + @Test + void shouldSendEmail_onBookReturned() { + trigger.trigger("book_returned"); + + // @formatter:off + await().untilAsserted(() -> + verify(emailService).sendEmail( + contains("Title: Contract Testing for Dummies, Author: John Doe, ISBN: 978-1234567890"))); + // @formatter:om + } + + @Configuration + static class TestConfig { + + @Bean + MessageVerifierSender> standaloneMessageVerifier(KafkaTemplate kafkaTemplate) { + return new KafkaAvroMessageVerifierSender<>(kafkaTemplate); + } + + } + + static class KafkaAvroMessageVerifierSender implements MessageVerifierSender { + + private final KafkaTemplate kafkaTemplate; + + // TODO: should this be the default? + @Override + public void send(M message, String destination, @Nullable YamlContract contract) { + send(message, emptyMap(), destination, contract); + } + + @Override + public void send(T payload, Map headers, String destination, + @Nullable YamlContract contract) { + Map newHeaders = headers != null ? new HashMap<>(headers) : new HashMap<>(); + newHeaders.put(KafkaHeaders.TOPIC, destination); + MessageHeaders msgHeaders = new MessageHeaders(newHeaders); + + try { + // TODO: remove this workaround after merging: + // https://github.com/spring-cloud/spring-cloud-contract/issues/2404 + Book avroPayload = new JsonMapper().readValue(payload.toString(), Book.class); + var message = MessageBuilder.createMessage(avroPayload, msgHeaders); + kafkaTemplate.send(message); + } + catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + + KafkaAvroMessageVerifierSender(KafkaTemplate kafkaTemplate) { + this.kafkaTemplate = kafkaTemplate; + } + + } + +} diff --git a/pom.xml b/pom.xml index f893fb48..c7eb99d1 100644 --- a/pom.xml +++ b/pom.xml @@ -68,6 +68,7 @@ producer_with_latest_2_2_features producer_java + producer_kafka_avro producer_kafka_middleware producer_rabbit_middleware @@ -83,6 +84,7 @@ consumer_with_latest_2_2_features consumer_java + consumer_kafka_avro consumer_kafka_middleware consumer_rabbit_middleware @@ -135,6 +137,7 @@ producer_with_latest_2_2_features producer_java + producer_kafka_avro producer_kafka_middleware producer_rabbit_middleware @@ -150,6 +153,7 @@ consumer_with_latest_2_2_features consumer_java + consumer_kafka_avro consumer_kafka_middleware consumer_rabbit_middleware diff --git a/producer_kafka_avro/pom.xml b/producer_kafka_avro/pom.xml new file mode 100644 index 00000000..6351d21b --- /dev/null +++ b/producer_kafka_avro/pom.xml @@ -0,0 +1,181 @@ + + + 4.0.0 + + org.springframework.boot + spring-boot-starter-parent + 4.0.1 + + + + org.springframework.cloud + producer_kafka_avro + 5.0.2-SNAPSHOT + jar + + + 4.0.1 + 8.1.1 + 2025.1.0 + 1.20.4 + 1.12.0 + + + + + confluent + Confluent Maven Repository + https://packages.confluent.io/maven/ + + + spring-snapshots + Spring Snapshots + https://repo.spring.io/snapshot + + true + + + + + + + spring-snapshots + Spring Snapshots + https://repo.spring.io/snapshot + + true + + + + spring-plugin-snapshots + Spring Snapshots + https://repo.spring.io/snapshot + + true + + + + + + + + org.springframework.cloud + spring-cloud-dependencies + ${spring-cloud.version} + pom + import + + + org.testcontainers + testcontainers-bom + 1.20.4 + pom + import + + + org.springframework.boot + spring-boot-starter-kafka + 4.0.1 + + + io.confluent + kafka-avro-serializer + ${kafka-avro-serializer.version} + + + + + + + org.apache.avro + avro + ${avro.version} + + + org.springframework.boot + spring-boot-starter + + + org.springframework.boot + spring-boot-starter-kafka + + + io.confluent + kafka-avro-serializer + + + + org.springframework.boot + spring-boot-starter-test + test + + + org.springframework.cloud + spring-cloud-contract-verifier + test + + + + org.testcontainers + testcontainers + test + + + org.testcontainers + junit-jupiter + test + + + org.testcontainers + kafka + test + + + + + + + + org.springframework.boot + spring-boot-maven-plugin + 4.0.1 + + + + + + org.apache.avro + avro-maven-plugin + 1.11.3 + + + generate-sources + + schema + + + ${project.basedir}/src/main/resources/avro + ${project.build.directory}/generated-sources/avro + String + + + + + + org.springframework.cloud + spring-cloud-contract-maven-plugin + true + + JUNIT5 + com.example.kafka.producer.BaseClass + + + + org.springframework.boot + spring-boot-maven-plugin + + + + diff --git a/producer_kafka_avro/src/main/java/com/example/kafka/producer/BookService.java b/producer_kafka_avro/src/main/java/com/example/kafka/producer/BookService.java new file mode 100644 index 00000000..15dd4188 --- /dev/null +++ b/producer_kafka_avro/src/main/java/com/example/kafka/producer/BookService.java @@ -0,0 +1,40 @@ +package com.example.kafka.producer; + +import java.util.Map; + +import com.example.kafka.avro.Book; + +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.stereotype.Service; + +@Service +class BookService { + + private final KafkaTemplate kafkaTemplate; + + BookService(KafkaTemplate kafkaTemplate) { + this.kafkaTemplate = kafkaTemplate; + } + + void bookReturned(String isbn, String title, String author) { + Book payload = Book.newBuilder().setIsbn(isbn).setTitle(title).setAuthor(author) + .build(); + + // @formatter:off + MessageHeaders headers = new MessageHeaders(Map.of( + KafkaHeaders.TOPIC, "book.returned", + "X-Correlation-Id", "abc-123-def", + "X-Source-System", "library-service", + "X-Event-Type", "BOOK_RETURNED" + )); + // @formatter:on + + Message msg = MessageBuilder.createMessage(payload, headers); + kafkaTemplate.send(msg); + } + +} diff --git a/producer_kafka_avro/src/main/java/com/example/kafka/producer/KafkaAvroProducerApplication.java b/producer_kafka_avro/src/main/java/com/example/kafka/producer/KafkaAvroProducerApplication.java new file mode 100644 index 00000000..89e1d0a9 --- /dev/null +++ b/producer_kafka_avro/src/main/java/com/example/kafka/producer/KafkaAvroProducerApplication.java @@ -0,0 +1,13 @@ +package com.example.kafka.producer; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class KafkaAvroProducerApplication { + + public static void main(String[] args) { + SpringApplication.run(KafkaAvroProducerApplication.class, args); + } + +} diff --git a/producer_kafka_avro/src/main/resources/application.yml b/producer_kafka_avro/src/main/resources/application.yml new file mode 100644 index 00000000..dd333f05 --- /dev/null +++ b/producer_kafka_avro/src/main/resources/application.yml @@ -0,0 +1,15 @@ +spring: + application-name: kafka-avro-producer + kafka: + bootstrap-servers: localhost:9092 + producer: + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer + consumer: + group-id: kafka-avro-consumer-group + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer + properties: + specific.avro.reader: true + properties: + schema.registry.url: mock:// diff --git a/producer_kafka_avro/src/main/resources/avro/Book.avsc b/producer_kafka_avro/src/main/resources/avro/Book.avsc new file mode 100644 index 00000000..c118b8bf --- /dev/null +++ b/producer_kafka_avro/src/main/resources/avro/Book.avsc @@ -0,0 +1,19 @@ +{ + "type": "record", + "name": "Book", + "namespace": "com.example.kafka.avro", + "fields": [ + { + "name": "isbn", + "type": "string" + }, + { + "name": "title", + "type": "string" + }, + { + "name": "author", + "type": "string" + } + ] +} diff --git a/producer_kafka_avro/src/test/java/com/example/kafka/producer/BaseClass.java b/producer_kafka_avro/src/test/java/com/example/kafka/producer/BaseClass.java new file mode 100644 index 00000000..0a3f535e --- /dev/null +++ b/producer_kafka_avro/src/test/java/com/example/kafka/producer/BaseClass.java @@ -0,0 +1,128 @@ +package com.example.kafka.producer; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +import org.apache.avro.specific.SpecificRecordBase; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.junit.jupiter.api.Tag; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.kafka.ConfluentKafkaContainer; +import org.testcontainers.utility.DockerImageName; + +import tools.jackson.databind.json.JsonMapper; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.cloud.contract.verifier.converter.YamlContract; +import org.springframework.cloud.contract.verifier.messaging.MessageVerifierReceiver; +import org.springframework.cloud.contract.verifier.messaging.boot.AutoConfigureMessageVerifier; +import org.springframework.cloud.contract.verifier.messaging.internal.ContractVerifierObjectMapper; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.JsonKafkaHeaderMapper; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; + +@Tag("kafka-avro") +@Testcontainers +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, classes = { + KafkaAvroProducerApplication.class, BaseClass.TestConfig.class }) +@AutoConfigureMessageVerifier +@ActiveProfiles("contracts") +class BaseClass { + + @Autowired + private BookService bookService; + + @Container + static ConfluentKafkaContainer kafka = new ConfluentKafkaContainer( + DockerImageName.parse("confluentinc/cp-kafka")); + + @DynamicPropertySource + static void kafkaProperties(DynamicPropertyRegistry registry) { + registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers); + } + + public void publishBookReturned() { + bookService.bookReturned("978-1234567890", "Contract Testing for Dummies", + "John Doe"); + } + + @Configuration + static class TestConfig { + + @Bean + KafkaMessageVerifier kafkaTemplateMessageVerifier() { + return new KafkaMessageVerifier(); + } + + // TODO: remove this workaround after merging: + // https://github.com/spring-cloud/spring-cloud-contract/pull/2405 + @Bean + @Primary + ContractVerifierObjectMapper contractVerifierObjectMapper() { + var json = JsonMapper.builder() + .addMixIn(SpecificRecordBase.class, AvroIgnoreMixin.class).build(); + return new ContractVerifierObjectMapper(json); + } + + @JsonIgnoreProperties({ "schema", "specificData", "classSchema", "conversion" }) + interface AvroIgnoreMixin { + + } + + } + + static class KafkaMessageVerifier implements MessageVerifierReceiver> { + + private final Map>> broker = new ConcurrentHashMap<>(); + + @Override + public Message receive(String destination, long timeout, TimeUnit timeUnit, + YamlContract contract) { + try { + broker.putIfAbsent(destination, new ArrayBlockingQueue<>(1)); + var messageQueue = broker.get(destination); + return messageQueue.poll(timeout, timeUnit); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + @KafkaListener(topics = { "book.returned" }) + public void listen(ConsumerRecord payload, + @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { + Map headers = new HashMap<>(); + new JsonKafkaHeaderMapper().toHeaders(payload.headers(), headers); + + broker.putIfAbsent(topic, new ArrayBlockingQueue<>(1)); + var messageQueue = broker.get(topic); + messageQueue.add(MessageBuilder.createMessage(payload.value(), + new MessageHeaders(headers))); + } + + @Override + public Message receive(String destination, YamlContract contract) { + return receive(destination, 15, TimeUnit.SECONDS, contract); + } + + } + +} diff --git a/producer_kafka_avro/src/test/resources/contracts/bookReturned.groovy b/producer_kafka_avro/src/test/resources/contracts/bookReturned.groovy new file mode 100644 index 00000000..ac6aa458 --- /dev/null +++ b/producer_kafka_avro/src/test/resources/contracts/bookReturned.groovy @@ -0,0 +1,38 @@ +/* + * Copyright 2013-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.springframework.cloud.contract.spec.Contract + +Contract.make { + description 'Should publish a book returned event to Kafka' + label 'book_returned' + input { + triggeredBy('publishBookReturned()') + } + outputMessage { + sentTo('book.returned') + headers { + header('X-Correlation-Id', 'abc-123-def') + header('X-Source-System', 'library-service') + header('X-Event-Type', 'BOOK_RETURNED') + } + body( + isbn: '978-1234567890', + title: 'Contract Testing for Dummies', + author: 'John Doe' + ) + } +}