Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 172 additions & 0 deletions consumer_kafka_avro/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns="http://maven.apache.org/POM/4.0.0"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <!-- Must stay a literal: the parent is resolved before properties are interpolated.
             Keep in sync with the spring-boot.version property below. -->
        <version>4.0.1</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <groupId>org.springframework.cloud</groupId>
    <artifactId>consumer_kafka_avro</artifactId>
    <version>5.0.2-SNAPSHOT</version>
    <packaging>jar</packaging>

    <properties>
        <spring-boot.version>4.0.1</spring-boot.version>
        <kafka-avro-serializer.version>8.1.1</kafka-avro-serializer.version>
        <spring-cloud.version>2025.1.0</spring-cloud.version>
        <testcontainers.version>1.20.4</testcontainers.version>
        <!-- Used for both the avro runtime dependency and the avro-maven-plugin,
             which must match to avoid generated-code/runtime incompatibilities. -->
        <avro.version>1.12.0</avro.version>
    </properties>

    <repositories>
        <!-- Confluent artifacts (kafka-avro-serializer) are not on Maven Central. -->
        <repository>
            <id>confluent</id>
            <name>Confluent Maven Repository</name>
            <url>https://packages.confluent.io/maven/</url>
        </repository>
        <repository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <pluginRepositories>
        <!-- Was declared twice (ids spring-snapshots / spring-plugin-snapshots) with the
             same URL; a single entry is sufficient. -->
        <pluginRepository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>org.testcontainers</groupId>
                <artifactId>testcontainers-bom</artifactId>
                <!-- Was hard-coded to 1.20.4; use the property so the version is
                     maintained in one place. -->
                <version>${testcontainers.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-kafka</artifactId>
                <!-- Was hard-coded to 4.0.1; keep aligned with the Boot parent via property. -->
                <version>${spring-boot.version}</version>
            </dependency>
            <dependency>
                <groupId>io.confluent</groupId>
                <artifactId>kafka-avro-serializer</artifactId>
                <version>${kafka-avro-serializer.version}</version>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>${avro.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-contract-stub-runner</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>testcontainers</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>junit-jupiter</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>kafka</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <!-- Was hard-coded to 4.0.1; keep aligned with the Boot parent via property. -->
                    <version>${spring-boot.version}</version>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <!-- Generates Java classes (e.g. com.example.kafka.avro.Book) from the
                 .avsc schemas under src/main/resources/avro. -->
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <!-- Was 1.11.3 while the avro runtime was 1.12.0; generator and runtime
                     versions must match. -->
                <version>${avro.version}</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
                            <outputDirectory>${project.build.directory}/generated-sources/avro</outputDirectory>
                            <!-- Generate java.lang.String fields instead of Avro's Utf8. -->
                            <stringType>String</stringType>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.example.kafka.consumer;

import com.example.kafka.avro.Book;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Listens for {@code Book} records on the {@code book.returned} Kafka topic and
 * notifies the user by email that the return was processed.
 */
@Component
class BooksReturnedListener {

// Collaborator that performs the actual email delivery.
private final EmailService emailService;

BooksReturnedListener(EmailService emailService) {
this.emailService = emailService;
}

/**
 * Builds and sends a return-confirmation email for the returned {@code book}.
 * Invoked by Spring Kafka for every record arriving on {@code book.returned}.
 *
 * @param book the deserialized Avro payload of the consumed record
 */
@KafkaListener(topics = "book.returned")
public void sendEmailOnBookReturned(Book book) {
String emailBody = """
Dear User,

The book you borrowed has been successfully returned:
Title: %s, Author: %s, ISBN: %s

""".formatted(book.getTitle(), book.getAuthor(), book.getIsbn());

emailService.sendEmail(emailBody);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.example.kafka.consumer;

import org.springframework.stereotype.Service;

@Service
public class EmailService {

/**
 * Delivers the given email body. This stub implementation only writes the
 * message to standard output; no real mail infrastructure is involved.
 *
 * @param emailBody full text of the email to send
 */
public void sendEmail(String emailBody) {
// Simulated delivery: print instead of talking to a mail server.
String output = "Sending email:\n" + emailBody;
System.out.println(output);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.example.kafka.consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaAvroConsumerApplication {

/**
 * Entry point: boots the Spring application context for the Kafka Avro consumer.
 *
 * @param args command-line arguments forwarded to Spring
 */
public static void main(String[] args) {
new SpringApplication(KafkaAvroConsumerApplication.class).run(args);
}

}
15 changes: 15 additions & 0 deletions consumer_kafka_avro/src/main/resources/application.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
spring:
  # Fixed: was the single key 'application-name', which does not bind to the
  # canonical property 'spring.application.name'.
  application:
    name: kafka-avro-consumer
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
    consumer:
      group-id: kafka-avro-consumer-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      properties:
        # Deserialize into generated SpecificRecord classes (Book) rather than GenericRecord.
        specific.avro.reader: true
    # Shared Kafka client property: in-memory mock schema registry, no external service needed.
    properties:
      schema.registry.url: mock://
19 changes: 19 additions & 0 deletions consumer_kafka_avro/src/main/resources/avro/Book.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"type": "record",
"name": "Book",
"namespace": "com.example.kafka.avro",
"fields": [
{
"name": "isbn",
"type": "string"
},
{
"name": "title",
"type": "string"
},
{
"name": "author",
"type": "string"
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package com.example.kafka.consumer;

import java.util.HashMap;
import java.util.Map;

import com.example.kafka.avro.Book;

import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.kafka.ConfluentKafkaContainer;
import org.testcontainers.utility.DockerImageName;

import wiremock.com.fasterxml.jackson.core.JsonProcessingException;
import wiremock.com.fasterxml.jackson.databind.json.JsonMapper;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.system.OutputCaptureExtension;
import org.springframework.cloud.contract.stubrunner.StubTrigger;
import org.springframework.cloud.contract.stubrunner.spring.AutoConfigureStubRunner;
import org.springframework.cloud.contract.stubrunner.spring.StubRunnerProperties;
import org.springframework.cloud.contract.verifier.converter.YamlContract;
import org.springframework.cloud.contract.verifier.messaging.MessageVerifierSender;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.springframework.test.context.bean.override.mockito.MockitoBean;

import static java.util.Collections.emptyMap;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.contains;
import static org.mockito.Mockito.verify;

/**
 * Consumer-side collaboration test: triggers the producer's 'book_returned'
 * contract stub (resolved locally by Stub Runner) against a real Kafka broker
 * started via Testcontainers, and verifies that {@code BooksReturnedListener}
 * reacts by sending the expected email through the mocked {@code EmailService}.
 */
@Tag("kafka-avro")
@Testcontainers
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = {
CollaborationTest.TestConfig.class, KafkaAvroConsumerApplication.class })
@AutoConfigureStubRunner(ids = "org.springframework.cloud:spring-cloud-contract-sample-kafka-avro-producer:+:stubs", stubsMode = StubRunnerProperties.StubsMode.LOCAL)
@ExtendWith(OutputCaptureExtension.class)
class CollaborationTest {

// Fires contract-defined messages from the resolved producer stubs.
@Autowired
StubTrigger trigger;

// Mocked so the listener's side effect can be verified without sending mail.
@MockitoBean
EmailService emailService;

// Real broker; no tag pinned on the image — NOTE(review): consider pinning a version for reproducible builds.
@Container
static ConfluentKafkaContainer kafka = new ConfluentKafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka"));

// Points the Spring Kafka clients at the container's dynamically mapped port.
@DynamicPropertySource
static void kafkaProperties(DynamicPropertyRegistry registry) {
registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
}

@Test
void shouldSendEmail_onBookReturned() {
// Publish the contract's 'book_returned' message onto the broker.
trigger.trigger("book_returned");

// Consumption is asynchronous, so poll until the listener has invoked the mock.
// @formatter:off
await().untilAsserted(() ->
verify(emailService).sendEmail(
contains("Title: Contract Testing for Dummies, Author: John Doe, ISBN: 978-1234567890")));
// @formatter:on
}

@Configuration
static class TestConfig {

// Replaces the default sender so triggered stub messages are published as Avro via KafkaTemplate.
@Bean
MessageVerifierSender<Message<?>> standaloneMessageVerifier(KafkaTemplate<String, Object> kafkaTemplate) {
return new KafkaAvroMessageVerifierSender<>(kafkaTemplate);
}

}

/**
 * Sender that converts the stub's JSON payload into the generated Avro
 * {@code Book} before publishing, so the Avro serializer on the producer
 * side receives a SpecificRecord instead of a raw string.
 */
static class KafkaAvroMessageVerifierSender<M> implements MessageVerifierSender<M> {

private final KafkaTemplate<String, Object> kafkaTemplate;

// TODO: should this be the default?
@Override
public void send(M message, String destination, @Nullable YamlContract contract) {
send(message, emptyMap(), destination, contract);
}

@Override
public <T> void send(T payload, Map<String, Object> headers, String destination,
@Nullable YamlContract contract) {
// Copy headers defensively and route to the destination topic.
Map<String, Object> newHeaders = headers != null ? new HashMap<>(headers) : new HashMap<>();
newHeaders.put(KafkaHeaders.TOPIC, destination);
MessageHeaders msgHeaders = new MessageHeaders(newHeaders);

try {
// TODO: remove this workaround after merging:
// https://github.com/spring-cloud/spring-cloud-contract/issues/2404
// Uses WireMock's shaded Jackson to rebuild the Avro Book from the stub's JSON string.
Book avroPayload = new JsonMapper().readValue(payload.toString(), Book.class);
var message = MessageBuilder.createMessage(avroPayload, msgHeaders);
kafkaTemplate.send(message);
}
catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}

KafkaAvroMessageVerifierSender(KafkaTemplate<String, Object> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}

}

}
Loading