Skip to content

Conversation

@sakurai-youhei
Copy link
Member

@sakurai-youhei sakurai-youhei commented Jan 2, 2026

What does this pull request do?

This PR adds AIOKafkaInstrumentation that instruments aiokafka.

Related issues

n/a

Notes for reviewers

  • This feature addition is a nice-to-have.
  • aiokafka_tests.py basically follows what the existing kafka_tests.py does.
    For similarities and differences, please refer to the output of:
    git diff --no-index tests/instrumentation/kafka_tests.py tests/instrumentation/asyncio_tests/aiokafka_tests.py.
  • The following four tests are added entirely new:
    1, test_aiokafka_getmany_multiple_topics: getmany() is a unique feature of aiokafka.
    2. test_aiokafka_send_batch: send_batch() is also a unique feature of aiokafka.
    3. test_aiokafka_consumer_non_recording_transaction: A test to increase coverage.
    4. test_aiokafka_inject_trace_parent_into_send_arguments: A test that thoroughly checks the most complicated part.

Interoperability check

Echo client using kafka-python
import contextlib
import elasticapm
from kafka import KafkaConsumer, KafkaProducer


def kafka_echo_client(bootstrap_servers: str, request_topic: str, response_topic: str):
    producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
    consumer = KafkaConsumer(response_topic, bootstrap_servers=bootstrap_servers)

    producer.send(request_topic, b'test')
    for msg in consumer:
        producer.send(request_topic, msg.value)
        break
    next(consumer)


if __name__ == "__main__":
    elasticapm.instrument()
    with contextlib.closing(elasticapm.Client()):
        kafka_echo_client("localhost:9092", "request_topic", "response_topic")
Echo server using aiokafka
import asyncio
import contextlib
import elasticapm
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer


async def aiokafka_echo_server(bootstrap_servers: str, request_topic: str, response_topic: str):
    consumer = AIOKafkaConsumer(request_topic, bootstrap_servers=bootstrap_servers)
    producer = AIOKafkaProducer(bootstrap_servers=bootstrap_servers)

    try:
        await consumer.start()
        await producer.start()

        async for msg in consumer:
            await producer.send_and_wait(response_topic, msg.value)
    finally:
        await consumer.stop()
        await producer.stop()


if __name__ == "__main__":
    elasticapm.instrument()
    with contextlib.closing(elasticapm.Client()):
        asyncio.run(aiokafka_echo_server("localhost:9092", "request_topic", "response_topic"))
image

@github-actions
Copy link

github-actions bot commented Jan 2, 2026

🤖 GitHub comments

Just comment with:

  • run docs-build : Re-trigger the docs validation. (use unformatted text in the comment!)

@github-actions
Copy link

github-actions bot commented Jan 4, 2026

🔍 Preview links for changed docs

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant