Skip to content

in_amqp: Add documentation for in_amqp plugin#2387

Open
matwey wants to merge 1 commit intofluent:masterfrom
matwey:in_amqp
Open

in_amqp: Add documentation for in_amqp plugin#2387
matwey wants to merge 1 commit intofluent:masterfrom
matwey:in_amqp

Conversation

@matwey
Copy link

@matwey matwey commented Feb 25, 2026

Reference: fluent/fluent-bit#11427

Summary by CodeRabbit

  • Documentation
    • Added comprehensive AMQP input plugin documentation: configuration options (connection URI, queue, parser, reconnect behavior), how messages map to records (body, properties, headers), optional parsing for JSON, connection retry guidance, usage examples for CLI and config, and a note on required client library. Also linked the new doc from the Data Pipeline Inputs section.

@matwey matwey requested review from a team and eschabell as code owners February 25, 2026 13:10
@coderabbitai
Copy link
Contributor

coderabbitai bot commented Feb 25, 2026

📝 Walkthrough

Walkthrough

Adds a new AMQP input plugin documentation page and a table-of-contents entry describing configuration options, message-to-record mapping (body, properties, headers), parsing behavior, connection retry settings, usage examples, and the rabbitmq-c dependency.

Changes

Cohort / File(s) Summary
Documentation: AMQP input & TOC
pipeline/inputs/amqp.md, SUMMARY.md
Adds pipeline/inputs/amqp.md describing the AMQP input plugin (configuration: uri, queue, parser, reconnect.retry_limits, reconnect.retry_interval), message transformation rules (body → record content, properties → metadata, headers → nested metadata), parsing examples, connection retry behavior, and rabbitmq-c requirement; updates SUMMARY.md to include the new doc.

Estimated code review effort

🎯 1 (Trivial) | ⏱️ ~5 minutes

Poem

🐇 I hopped through queues and bindings bright,
Wrote a note on AMQP tonight,
Body, props, headers in tidy rows,
Fluent Bit listens as the message flows,
📨 — a rabbit's doc, snug and light.

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'in_amqp: Add documentation for in_amqp plugin' directly and clearly summarizes the main change: adding documentation for the in_amqp plugin, which is confirmed by the changeset containing new amqp.md documentation and SUMMARY.md updates.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Signed-off-by: Matwey V. Kornilov <matwey.kornilov@gmail.com>
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🧹 Nitpick comments (1)
pipeline/inputs/amqp.md (1)

9-15: Missing thread.ring_buffer.* configuration parameters.

The configuration table omits the ring buffer settings common to Fluent Bit input plugins: thread.ring_buffer.capacity, thread.ring_buffer.window, and thread.ring_buffer.retry_limit. Based on learnings, in Fluent Bit input plugin documentation under pipeline/inputs, use the thread. prefix for ring buffer settings (e.g., thread.ring_buffer.capacity, thread.ring_buffer.window, thread.ring_buffer.retry_limit).

📝 Suggested table rows to add
 | `reconnect.retry_interval` | Retry interval (in seconds) to connect to the broker | `60` |
+| `thread.ring_buffer.capacity` | Set the capacity of the ring buffer | _default_ |
+| `thread.ring_buffer.window` | Set the window size of the ring buffer | _default_ |
+| `thread.ring_buffer.retry_limit` | Set the retry limit for the ring buffer | _default_ |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pipeline/inputs/amqp.md` around lines 9 - 15, Add the missing Fluent Bit ring
buffer settings to the AMQP input configuration table: include rows for
thread.ring_buffer.capacity, thread.ring_buffer.window, and
thread.ring_buffer.retry_limit with appropriate descriptions and defaults (use
typical Fluent Bit defaults or project-consistent values); update any
documentation references for the amqp input parser/queue/reconnect entries if
needed and ensure the keys use the thread. prefix exactly
(thread.ring_buffer.capacity, thread.ring_buffer.window,
thread.ring_buffer.retry_limit) so they match other pipeline/inputs docs and the
plugin implementation.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@pipeline/inputs/amqp.md`:
- Around line 138-140: Summary: Clarify that message requeue on connection loss
depends on acknowledgment mode and timing. Update the bullet that currently
reads "Messages consumed but not yet processed will be requeued by the broker
(assuming proper acknowledgment settings)" to explicitly state this only applies
when the plugin uses manual acknowledgment (not auto-ack), and only if the
connection drops before the client sends basic.ack or basic.nack; reference the
reconnect behavior controlled by reconnect.retry_limits and
reconnect.retry_interval. Edit the text to mention "manual (explicit) ACK mode"
vs "auto-ack", note broker requeue happens on client disconnect prior to
ack/nack, and recommend users ensure the plugin is configured for manual ack if
they require this requeue guarantee.
- Around line 95-110: The YAML places the parsers list under pipeline:, which is
invalid for Fluent Bit; move the "parsers:" block out to top-level (sibling to
"pipeline:"), remove the nested "parsers" under pipeline, and ensure the parser
name ("json") referenced in pipeline.inputs (the amqp input) matches the
top-level parsers entry; verify the "pipeline:", "inputs:", and "outputs:"
structure remains unchanged.
- Around line 9-15: Move the parsers: block out from under pipeline: so parsers:
is a top-level YAML key (alongside pipeline:) and ensure the example uses
parser: json under the amqp input (references: parsers:, pipeline:, inputs:,
parser:). Also update the configuration table to include the two missing
threading parameters thread.ring_buffer.capacity (default 1024) and
thread.ring_buffer.window (default 5) with their descriptions so the AMQP input
docs match the other input plugins.

---

Nitpick comments:
In `@pipeline/inputs/amqp.md`:
- Around line 9-15: Add the missing Fluent Bit ring buffer settings to the AMQP
input configuration table: include rows for thread.ring_buffer.capacity,
thread.ring_buffer.window, and thread.ring_buffer.retry_limit with appropriate
descriptions and defaults (use typical Fluent Bit defaults or project-consistent
values); update any documentation references for the amqp input
parser/queue/reconnect entries if needed and ensure the keys use the thread.
prefix exactly (thread.ring_buffer.capacity, thread.ring_buffer.window,
thread.ring_buffer.retry_limit) so they match other pipeline/inputs docs and the
plugin implementation.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4a18664 and 2b4ec4b.

📒 Files selected for processing (1)
  • pipeline/inputs/amqp.md

Comment on lines +9 to +15
| Key | Description | Default |
|:---|:---|:---|
| `uri` | Specify an AMQP URI to connect to the broker | `amqp://` |
| `queue` | Specify an AMQP queue name to consume from | _none_ (required) |
| `parser` | Specify a parser to process the message payload | _none_ |
| `reconnect.retry_limits` | Maximum number of retries to connect to the broker | `5` |
| `reconnect.retry_interval` | Retry interval (in seconds) to connect to the broker | `60` |
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# Search for the in_amqp plugin implementation in fluent-bit repository
find . -type f -name "*.c" -o -name "*.h" | xargs grep -l "in_amqp\|amqp" 2>/dev/null | head -20

Repository: fluent/fluent-bit-docs

Length of output: 48


🏁 Script executed:

# Also search for reconnect parameter names in the codebase
rg "reconnect.*retry" --type=c --type=h -i -A2 -B2 | head -100

Repository: fluent/fluent-bit-docs

Length of output: 214


🏁 Script executed:

# Search for thread.ring_buffer parameters to understand their scope
rg "thread\.ring_buffer\|ring_buffer" --type=c --type=h -i -A1 -B1 | head -80

Repository: fluent/fluent-bit-docs

Length of output: 214


🌐 Web query:

Fluent Bit AMQP input plugin configuration parameters reconnect retry

💡 Result:

Fluent Bit does not ship with an AMQP (“in_amqp”) input plugin, so there are no AMQP-input-specific reconnect/retry configuration parameters in core Fluent Bit. In Fluent Bit’s documented input plugin list (e.g., CPU, Tail, TCP, MQTT, Syslog, etc.), AMQP isn’t included. [1]

If what you actually need is retry/backoff behavior when delivering data (outputs) (common confusion when people say “retry”):

  • Per-output max retries: Retry_Limit (N, no_limits/False, or no_retries). [2]
  • Global retry backoff timing (SERVICE section): scheduler.base and scheduler.cap (exponential backoff with jitter). [2][3]

If you truly need AMQP consumption, you’ll need an external Fluent Bit plugin (Go/Wasm) loaded via the plugins mechanism, and its reconnect/retry knobs will be plugin-specific (not standardized by Fluent Bit). [4]

Sources: [1] [2] [3] [4]


🏁 Script executed:

# Check what input plugin documentation exists in fluent-bit-docs
ls -la pipeline/inputs/ | head -20

Repository: fluent/fluent-bit-docs

Length of output: 1247


🏁 Script executed:

# Look at an existing input plugin doc to compare structure and see if it includes thread.ring_buffer params
cat pipeline/inputs/tail.md | head -100

Repository: fluent/fluent-bit-docs

Length of output: 12465


🏁 Script executed:

# Check the YAML structure in the amqp.md file to verify parsers placement issue
sed -n '95,115p' pipeline/inputs/amqp.md

Repository: fluent/fluent-bit-docs

Length of output: 364


🏁 Script executed:

# Search for thread.ring_buffer in existing input plugin docs
rg "thread\.ring_buffer" pipeline/inputs/

Repository: fluent/fluent-bit-docs

Length of output: 2487


🏁 Script executed:

# Check if parsers should be top-level in YAML format by searching other docs
rg "parsers:" pipeline/ -B2 -A2 | head -50

Repository: fluent/fluent-bit-docs

Length of output: 1957


Fix YAML structure: parsers: must be top-level, not nested under pipeline:.

The configuration example at lines 96–110 incorrectly nests parsers: as a subsection within pipeline:. In the Fluent Bit YAML format, parsers: is a top-level key, separate from the pipeline: section. Correct the indentation:

Corrected YAML structure
parsers:
  - name: json
    format: json

pipeline:
  inputs:
    - name: amqp
      queue: json_messages
      parser: json
      uri: amqp://guest:guest@localhost:5672/%2F
  outputs:
    - name: stdout
      match: '*'

Add missing thread.ring_buffer parameters to the configuration table (lines 9–15). All other input plugins in the documentation (tail, opentelemetry, prometheus-textfile, blob) include these threading parameters:

  • thread.ring_buffer.capacity: Number of slots in the ring buffer for threaded mode (default: 1024)
  • thread.ring_buffer.window: Ring buffer flush threshold percentage for threaded mode (default: 5)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pipeline/inputs/amqp.md` around lines 9 - 15, Move the parsers: block out
from under pipeline: so parsers: is a top-level YAML key (alongside pipeline:)
and ensure the example uses parser: json under the amqp input (references:
parsers:, pipeline:, inputs:, parser:). Also update the configuration table to
include the two missing threading parameters thread.ring_buffer.capacity
(default 1024) and thread.ring_buffer.window (default 5) with their descriptions
so the AMQP input docs match the other input plugins.

Comment on lines +95 to +110
```yaml
pipeline:
inputs:
- name: amqp
queue: json_messages
parser: json
uri: amqp://guest:guest@localhost:5672/%2F

parsers:
- name: json
format: json

outputs:
- name: stdout
match: '*'
```
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

parsers: must be a top-level YAML key, not nested under pipeline:.

In Fluent Bit YAML configuration, custom parsers are defined in the parsers: section at the top level of the file, not as a subsection of pipeline:. The pipeline: section only supports inputs:, filters:, and outputs: subsections. The current indentation places parsers: inside pipeline:, which is invalid.

🐛 Proposed fix for YAML structure
 ```yaml
+parsers:
+  - name: json
+    format: json
+
 pipeline:
   inputs:
     - name: amqp
       queue: json_messages
       parser: json
       uri: amqp://guest:guest@localhost:5672/%2F
 
-  parsers:
-    - name: json
-      format: json
-
   outputs:
     - name: stdout
       match: '*'
</details>

<!-- suggestion_start -->

<details>
<summary>📝 Committable suggestion</summary>

> ‼️ **IMPORTANT**
> Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```suggestion

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pipeline/inputs/amqp.md` around lines 95 - 110, The YAML places the parsers
list under pipeline:, which is invalid for Fluent Bit; move the "parsers:" block
out to top-level (sibling to "pipeline:"), remove the nested "parsers" under
pipeline, and ensure the parser name ("json") referenced in pipeline.inputs (the
amqp input) matches the top-level parsers entry; verify the "pipeline:",
"inputs:", and "outputs:" structure remains unchanged.

Comment on lines +138 to +140
1. If the initial connection fails, it will retry based on `reconnect.retry_limits` and `reconnect.retry_interval`
2. If a connection is lost during operation, the plugin will automatically attempt to reconnect
3. Messages consumed but not yet processed will be requeued by the broker (assuming proper acknowledgment settings)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Clarify the requeue guarantee in connection management.

Line 140 states that "Messages consumed but not yet processed will be requeued by the broker (assuming proper acknowledgment settings)." This is misleading: in AMQP, once a message is delivered (consumed) to the client with an unacknowledged delivery, it is only requeued if the connection drops before the basic.ack or basic.nack is sent — and only when the queue is configured with ack mode, not auto-ack. The current phrasing could lead users to believe there is always a safety net regardless of their ACK configuration. Recommend being explicit that this applies only when the plugin uses manual acknowledgment mode (not auto-ack).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pipeline/inputs/amqp.md` around lines 138 - 140, Summary: Clarify that
message requeue on connection loss depends on acknowledgment mode and timing.
Update the bullet that currently reads "Messages consumed but not yet processed
will be requeued by the broker (assuming proper acknowledgment settings)" to
explicitly state this only applies when the plugin uses manual acknowledgment
(not auto-ack), and only if the connection drops before the client sends
basic.ack or basic.nack; reference the reconnect behavior controlled by
reconnect.retry_limits and reconnect.retry_interval. Edit the text to mention
"manual (explicit) ACK mode" vs "auto-ack", note broker requeue happens on
client disconnect prior to ack/nack, and recommend users ensure the plugin is
configured for manual ack if they require this requeue guarantee.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

♻️ Duplicate comments (3)
pipeline/inputs/amqp.md (3)

95-110: ⚠️ Potential issue | 🟠 Major

Fix YAML example structure: parsers: must be top-level.

In Line 103, parsers: is nested under pipeline:. Move it to top-level (sibling of pipeline:), and keep parser: json on the input.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pipeline/inputs/amqp.md` around lines 95 - 110, The YAML example nests
"parsers:" under "pipeline:" which is incorrect; move the "parsers:" block to be
a top-level sibling of "pipeline:" (so it sits alongside "pipeline:" and
"outputs:"), leaving the input's "parser: json" reference inside the
"pipeline.inputs" entry unchanged; update the example so the keys "pipeline:",
"parsers:", and "outputs:" are all top-level keys and "parser: json" remains
under the amqp input.

9-15: ⚠️ Potential issue | 🟠 Major

Add missing threaded ring buffer parameters to the config table.

Please document thread.ring_buffer.capacity and thread.ring_buffer.window (and thread.ring_buffer.retry_limit if supported by this input) so AMQP matches other input plugin docs.

Based on learnings: In Fluent Bit input plugin documentation under pipeline/inputs, use the thread. prefix for ring buffer settings (for example thread.ring_buffer.capacity, thread.ring_buffer.window, thread.ring_buffer.retry_limit).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pipeline/inputs/amqp.md` around lines 9 - 15, The AMQP input docs are missing
the threaded ring buffer settings; update the config table to include entries
for thread.ring_buffer.capacity, thread.ring_buffer.window and (if supported)
thread.ring_buffer.retry_limit with short descriptions and defaults consistent
with other input docs (e.g., capacity = default buffer size, window = default
window size, retry_limit = default retry count); reference the existing keys
`uri`, `queue`, `parser`, `reconnect.retry_limits`, and
`reconnect.retry_interval` so the new `thread.ring_buffer.*` rows follow the
same format and naming convention used elsewhere in pipeline/inputs
documentation.

138-140: ⚠️ Potential issue | 🟡 Minor

Clarify requeue behavior for ACK mode and timing.

Line 140 currently implies a broad guarantee. Please state this only applies in manual ACK mode and only if disconnect happens before ack/nack is sent.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pipeline/inputs/amqp.md` around lines 138 - 140, The sentence about requeued
messages is too broad; update the docs around the third bullet so it explicitly
states that message requeueing by the broker applies only when the consumer is
using manual ACK mode and the disconnect occurs before the consumer has sent an
ack or nack for the message (i.e., unacknowledged messages at disconnect).
Reference the existing wording about "Messages consumed but not yet processed
will be requeued by the broker" and change it to mention "manual ACK mode" and
"disconnect before ack/nack is sent" to make the scope precise.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Duplicate comments:
In `@pipeline/inputs/amqp.md`:
- Around line 95-110: The YAML example nests "parsers:" under "pipeline:" which
is incorrect; move the "parsers:" block to be a top-level sibling of "pipeline:"
(so it sits alongside "pipeline:" and "outputs:"), leaving the input's "parser:
json" reference inside the "pipeline.inputs" entry unchanged; update the example
so the keys "pipeline:", "parsers:", and "outputs:" are all top-level keys and
"parser: json" remains under the amqp input.
- Around line 9-15: The AMQP input docs are missing the threaded ring buffer
settings; update the config table to include entries for
thread.ring_buffer.capacity, thread.ring_buffer.window and (if supported)
thread.ring_buffer.retry_limit with short descriptions and defaults consistent
with other input docs (e.g., capacity = default buffer size, window = default
window size, retry_limit = default retry count); reference the existing keys
`uri`, `queue`, `parser`, `reconnect.retry_limits`, and
`reconnect.retry_interval` so the new `thread.ring_buffer.*` rows follow the
same format and naming convention used elsewhere in pipeline/inputs
documentation.
- Around line 138-140: The sentence about requeued messages is too broad; update
the docs around the third bullet so it explicitly states that message requeueing
by the broker applies only when the consumer is using manual ACK mode and the
disconnect occurs before the consumer has sent an ack or nack for the message
(i.e., unacknowledged messages at disconnect). Reference the existing wording
about "Messages consumed but not yet processed will be requeued by the broker"
and change it to mention "manual ACK mode" and "disconnect before ack/nack is
sent" to make the scope precise.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2b4ec4b and 0641155.

📒 Files selected for processing (2)
  • SUMMARY.md
  • pipeline/inputs/amqp.md

@eschabell eschabell self-assigned this Feb 26, 2026
@eschabell
Copy link
Collaborator

@matwey before I review I would like to ask that you address all the comments from coderabbitai bot, please?

@eschabell eschabell added waiting-for-user Waiting for user/contributors feedback or requested changes waiting-on-code-merge waiting-on-review Waiting on a review from mainteners labels Feb 26, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

waiting-for-user Waiting for user/contributors feedback or requested changes waiting-on-code-merge waiting-on-review Waiting on a review from mainteners

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants