in_amqp: Add documentation for in_amqp plugin#2387
in_amqp: Add documentation for in_amqp plugin#2387matwey wants to merge 1 commit intofluent:masterfrom
Conversation
📝 WalkthroughWalkthroughAdds a new AMQP input plugin documentation page and a table-of-contents entry describing configuration options, message-to-record mapping (body, properties, headers), parsing behavior, connection retry settings, usage examples, and the rabbitmq-c dependency. Changes
Estimated code review effort🎯 1 (Trivial) | ⏱️ ~5 minutes Poem
🚥 Pre-merge checks | ✅ 3✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Signed-off-by: Matwey V. Kornilov <matwey.kornilov@gmail.com>
There was a problem hiding this comment.
Actionable comments posted: 3
🧹 Nitpick comments (1)
pipeline/inputs/amqp.md (1)
9-15: Missingthread.ring_buffer.*configuration parameters.The configuration table omits the ring buffer settings common to Fluent Bit input plugins:
thread.ring_buffer.capacity,thread.ring_buffer.window, andthread.ring_buffer.retry_limit. Based on learnings, in Fluent Bit input plugin documentation underpipeline/inputs, use thethread.prefix for ring buffer settings (e.g.,thread.ring_buffer.capacity,thread.ring_buffer.window,thread.ring_buffer.retry_limit).📝 Suggested table rows to add
| `reconnect.retry_interval` | Retry interval (in seconds) to connect to the broker | `60` | +| `thread.ring_buffer.capacity` | Set the capacity of the ring buffer | _default_ | +| `thread.ring_buffer.window` | Set the window size of the ring buffer | _default_ | +| `thread.ring_buffer.retry_limit` | Set the retry limit for the ring buffer | _default_ |🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pipeline/inputs/amqp.md` around lines 9 - 15, Add the missing Fluent Bit ring buffer settings to the AMQP input configuration table: include rows for thread.ring_buffer.capacity, thread.ring_buffer.window, and thread.ring_buffer.retry_limit with appropriate descriptions and defaults (use typical Fluent Bit defaults or project-consistent values); update any documentation references for the amqp input parser/queue/reconnect entries if needed and ensure the keys use the thread. prefix exactly (thread.ring_buffer.capacity, thread.ring_buffer.window, thread.ring_buffer.retry_limit) so they match other pipeline/inputs docs and the plugin implementation.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@pipeline/inputs/amqp.md`:
- Around line 138-140: Summary: Clarify that message requeue on connection loss
depends on acknowledgment mode and timing. Update the bullet that currently
reads "Messages consumed but not yet processed will be requeued by the broker
(assuming proper acknowledgment settings)" to explicitly state this only applies
when the plugin uses manual acknowledgment (not auto-ack), and only if the
connection drops before the client sends basic.ack or basic.nack; reference the
reconnect behavior controlled by reconnect.retry_limits and
reconnect.retry_interval. Edit the text to mention "manual (explicit) ACK mode"
vs "auto-ack", note broker requeue happens on client disconnect prior to
ack/nack, and recommend users ensure the plugin is configured for manual ack if
they require this requeue guarantee.
- Around line 95-110: The YAML places the parsers list under pipeline:, which is
invalid for Fluent Bit; move the "parsers:" block out to top-level (sibling to
"pipeline:"), remove the nested "parsers" under pipeline, and ensure the parser
name ("json") referenced in pipeline.inputs (the amqp input) matches the
top-level parsers entry; verify the "pipeline:", "inputs:", and "outputs:"
structure remains unchanged.
- Around line 9-15: Move the parsers: block out from under pipeline: so parsers:
is a top-level YAML key (alongside pipeline:) and ensure the example uses
parser: json under the amqp input (references: parsers:, pipeline:, inputs:,
parser:). Also update the configuration table to include the two missing
threading parameters thread.ring_buffer.capacity (default 1024) and
thread.ring_buffer.window (default 5) with their descriptions so the AMQP input
docs match the other input plugins.
---
Nitpick comments:
In `@pipeline/inputs/amqp.md`:
- Around line 9-15: Add the missing Fluent Bit ring buffer settings to the AMQP
input configuration table: include rows for thread.ring_buffer.capacity,
thread.ring_buffer.window, and thread.ring_buffer.retry_limit with appropriate
descriptions and defaults (use typical Fluent Bit defaults or project-consistent
values); update any documentation references for the amqp input
parser/queue/reconnect entries if needed and ensure the keys use the thread.
prefix exactly (thread.ring_buffer.capacity, thread.ring_buffer.window,
thread.ring_buffer.retry_limit) so they match other pipeline/inputs docs and the
plugin implementation.
| | Key | Description | Default | | ||
| |:---|:---|:---| | ||
| | `uri` | Specify an AMQP URI to connect to the broker | `amqp://` | | ||
| | `queue` | Specify an AMQP queue name to consume from | _none_ (required) | | ||
| | `parser` | Specify a parser to process the message payload | _none_ | | ||
| | `reconnect.retry_limits` | Maximum number of retries to connect to the broker | `5` | | ||
| | `reconnect.retry_interval` | Retry interval (in seconds) to connect to the broker | `60` | |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
# Search for the in_amqp plugin implementation in fluent-bit repository
find . -type f -name "*.c" -o -name "*.h" | xargs grep -l "in_amqp\|amqp" 2>/dev/null | head -20Repository: fluent/fluent-bit-docs
Length of output: 48
🏁 Script executed:
# Also search for reconnect parameter names in the codebase
rg "reconnect.*retry" --type=c --type=h -i -A2 -B2 | head -100Repository: fluent/fluent-bit-docs
Length of output: 214
🏁 Script executed:
# Search for thread.ring_buffer parameters to understand their scope
rg "thread\.ring_buffer\|ring_buffer" --type=c --type=h -i -A1 -B1 | head -80Repository: fluent/fluent-bit-docs
Length of output: 214
🌐 Web query:
Fluent Bit AMQP input plugin configuration parameters reconnect retry
💡 Result:
Fluent Bit does not ship with an AMQP (“in_amqp”) input plugin, so there are no AMQP-input-specific reconnect/retry configuration parameters in core Fluent Bit. In Fluent Bit’s documented input plugin list (e.g., CPU, Tail, TCP, MQTT, Syslog, etc.), AMQP isn’t included. [1]
If what you actually need is retry/backoff behavior when delivering data (outputs) (common confusion when people say “retry”):
- Per-output max retries:
Retry_Limit(N,no_limits/False, orno_retries). [2] - Global retry backoff timing (SERVICE section):
scheduler.baseandscheduler.cap(exponential backoff with jitter). [2][3]
If you truly need AMQP consumption, you’ll need an external Fluent Bit plugin (Go/Wasm) loaded via the plugins mechanism, and its reconnect/retry knobs will be plugin-specific (not standardized by Fluent Bit). [4]
Sources: [1] [2] [3] [4]
🏁 Script executed:
# Check what input plugin documentation exists in fluent-bit-docs
ls -la pipeline/inputs/ | head -20Repository: fluent/fluent-bit-docs
Length of output: 1247
🏁 Script executed:
# Look at an existing input plugin doc to compare structure and see if it includes thread.ring_buffer params
cat pipeline/inputs/tail.md | head -100Repository: fluent/fluent-bit-docs
Length of output: 12465
🏁 Script executed:
# Check the YAML structure in the amqp.md file to verify parsers placement issue
sed -n '95,115p' pipeline/inputs/amqp.mdRepository: fluent/fluent-bit-docs
Length of output: 364
🏁 Script executed:
# Search for thread.ring_buffer in existing input plugin docs
rg "thread\.ring_buffer" pipeline/inputs/Repository: fluent/fluent-bit-docs
Length of output: 2487
🏁 Script executed:
# Check if parsers should be top-level in YAML format by searching other docs
rg "parsers:" pipeline/ -B2 -A2 | head -50Repository: fluent/fluent-bit-docs
Length of output: 1957
Fix YAML structure: parsers: must be top-level, not nested under pipeline:.
The configuration example at lines 96–110 incorrectly nests parsers: as a subsection within pipeline:. In the Fluent Bit YAML format, parsers: is a top-level key, separate from the pipeline: section. Correct the indentation:
Corrected YAML structure
parsers:
- name: json
format: json
pipeline:
inputs:
- name: amqp
queue: json_messages
parser: json
uri: amqp://guest:guest@localhost:5672/%2F
outputs:
- name: stdout
match: '*'Add missing thread.ring_buffer parameters to the configuration table (lines 9–15). All other input plugins in the documentation (tail, opentelemetry, prometheus-textfile, blob) include these threading parameters:
thread.ring_buffer.capacity: Number of slots in the ring buffer for threaded mode (default:1024)thread.ring_buffer.window: Ring buffer flush threshold percentage for threaded mode (default:5)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pipeline/inputs/amqp.md` around lines 9 - 15, Move the parsers: block out
from under pipeline: so parsers: is a top-level YAML key (alongside pipeline:)
and ensure the example uses parser: json under the amqp input (references:
parsers:, pipeline:, inputs:, parser:). Also update the configuration table to
include the two missing threading parameters thread.ring_buffer.capacity
(default 1024) and thread.ring_buffer.window (default 5) with their descriptions
so the AMQP input docs match the other input plugins.
| ```yaml | ||
| pipeline: | ||
| inputs: | ||
| - name: amqp | ||
| queue: json_messages | ||
| parser: json | ||
| uri: amqp://guest:guest@localhost:5672/%2F | ||
|
|
||
| parsers: | ||
| - name: json | ||
| format: json | ||
|
|
||
| outputs: | ||
| - name: stdout | ||
| match: '*' | ||
| ``` |
There was a problem hiding this comment.
parsers: must be a top-level YAML key, not nested under pipeline:.
In Fluent Bit YAML configuration, custom parsers are defined in the parsers: section at the top level of the file, not as a subsection of pipeline:. The pipeline: section only supports inputs:, filters:, and outputs: subsections. The current indentation places parsers: inside pipeline:, which is invalid.
🐛 Proposed fix for YAML structure
```yaml
+parsers:
+ - name: json
+ format: json
+
pipeline:
inputs:
- name: amqp
queue: json_messages
parser: json
uri: amqp://guest:guest@localhost:5672/%2F
- parsers:
- - name: json
- format: json
-
outputs:
- name: stdout
match: '*'</details>
<!-- suggestion_start -->
<details>
<summary>📝 Committable suggestion</summary>
> ‼️ **IMPORTANT**
> Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```suggestion
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pipeline/inputs/amqp.md` around lines 95 - 110, The YAML places the parsers
list under pipeline:, which is invalid for Fluent Bit; move the "parsers:" block
out to top-level (sibling to "pipeline:"), remove the nested "parsers" under
pipeline, and ensure the parser name ("json") referenced in pipeline.inputs (the
amqp input) matches the top-level parsers entry; verify the "pipeline:",
"inputs:", and "outputs:" structure remains unchanged.
| 1. If the initial connection fails, it will retry based on `reconnect.retry_limits` and `reconnect.retry_interval` | ||
| 2. If a connection is lost during operation, the plugin will automatically attempt to reconnect | ||
| 3. Messages consumed but not yet processed will be requeued by the broker (assuming proper acknowledgment settings) |
There was a problem hiding this comment.
Clarify the requeue guarantee in connection management.
Line 140 states that "Messages consumed but not yet processed will be requeued by the broker (assuming proper acknowledgment settings)." This is misleading: in AMQP, once a message is delivered (consumed) to the client with an unacknowledged delivery, it is only requeued if the connection drops before the basic.ack or basic.nack is sent — and only when the queue is configured with ack mode, not auto-ack. The current phrasing could lead users to believe there is always a safety net regardless of their ACK configuration. Recommend being explicit that this applies only when the plugin uses manual acknowledgment mode (not auto-ack).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pipeline/inputs/amqp.md` around lines 138 - 140, Summary: Clarify that
message requeue on connection loss depends on acknowledgment mode and timing.
Update the bullet that currently reads "Messages consumed but not yet processed
will be requeued by the broker (assuming proper acknowledgment settings)" to
explicitly state this only applies when the plugin uses manual acknowledgment
(not auto-ack), and only if the connection drops before the client sends
basic.ack or basic.nack; reference the reconnect behavior controlled by
reconnect.retry_limits and reconnect.retry_interval. Edit the text to mention
"manual (explicit) ACK mode" vs "auto-ack", note broker requeue happens on
client disconnect prior to ack/nack, and recommend users ensure the plugin is
configured for manual ack if they require this requeue guarantee.
There was a problem hiding this comment.
♻️ Duplicate comments (3)
pipeline/inputs/amqp.md (3)
95-110:⚠️ Potential issue | 🟠 MajorFix YAML example structure:
parsers:must be top-level.In Line 103,
parsers:is nested underpipeline:. Move it to top-level (sibling ofpipeline:), and keepparser: jsonon the input.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pipeline/inputs/amqp.md` around lines 95 - 110, The YAML example nests "parsers:" under "pipeline:" which is incorrect; move the "parsers:" block to be a top-level sibling of "pipeline:" (so it sits alongside "pipeline:" and "outputs:"), leaving the input's "parser: json" reference inside the "pipeline.inputs" entry unchanged; update the example so the keys "pipeline:", "parsers:", and "outputs:" are all top-level keys and "parser: json" remains under the amqp input.
9-15:⚠️ Potential issue | 🟠 MajorAdd missing threaded ring buffer parameters to the config table.
Please document
thread.ring_buffer.capacityandthread.ring_buffer.window(andthread.ring_buffer.retry_limitif supported by this input) so AMQP matches other input plugin docs.Based on learnings: In Fluent Bit input plugin documentation under pipeline/inputs, use the
thread.prefix for ring buffer settings (for examplethread.ring_buffer.capacity,thread.ring_buffer.window,thread.ring_buffer.retry_limit).🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pipeline/inputs/amqp.md` around lines 9 - 15, The AMQP input docs are missing the threaded ring buffer settings; update the config table to include entries for thread.ring_buffer.capacity, thread.ring_buffer.window and (if supported) thread.ring_buffer.retry_limit with short descriptions and defaults consistent with other input docs (e.g., capacity = default buffer size, window = default window size, retry_limit = default retry count); reference the existing keys `uri`, `queue`, `parser`, `reconnect.retry_limits`, and `reconnect.retry_interval` so the new `thread.ring_buffer.*` rows follow the same format and naming convention used elsewhere in pipeline/inputs documentation.
138-140:⚠️ Potential issue | 🟡 MinorClarify requeue behavior for ACK mode and timing.
Line 140 currently implies a broad guarantee. Please state this only applies in manual ACK mode and only if disconnect happens before
ack/nackis sent.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pipeline/inputs/amqp.md` around lines 138 - 140, The sentence about requeued messages is too broad; update the docs around the third bullet so it explicitly states that message requeueing by the broker applies only when the consumer is using manual ACK mode and the disconnect occurs before the consumer has sent an ack or nack for the message (i.e., unacknowledged messages at disconnect). Reference the existing wording about "Messages consumed but not yet processed will be requeued by the broker" and change it to mention "manual ACK mode" and "disconnect before ack/nack is sent" to make the scope precise.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@pipeline/inputs/amqp.md`:
- Around line 95-110: The YAML example nests "parsers:" under "pipeline:" which
is incorrect; move the "parsers:" block to be a top-level sibling of "pipeline:"
(so it sits alongside "pipeline:" and "outputs:"), leaving the input's "parser:
json" reference inside the "pipeline.inputs" entry unchanged; update the example
so the keys "pipeline:", "parsers:", and "outputs:" are all top-level keys and
"parser: json" remains under the amqp input.
- Around line 9-15: The AMQP input docs are missing the threaded ring buffer
settings; update the config table to include entries for
thread.ring_buffer.capacity, thread.ring_buffer.window and (if supported)
thread.ring_buffer.retry_limit with short descriptions and defaults consistent
with other input docs (e.g., capacity = default buffer size, window = default
window size, retry_limit = default retry count); reference the existing keys
`uri`, `queue`, `parser`, `reconnect.retry_limits`, and
`reconnect.retry_interval` so the new `thread.ring_buffer.*` rows follow the
same format and naming convention used elsewhere in pipeline/inputs
documentation.
- Around line 138-140: The sentence about requeued messages is too broad; update
the docs around the third bullet so it explicitly states that message requeueing
by the broker applies only when the consumer is using manual ACK mode and the
disconnect occurs before the consumer has sent an ack or nack for the message
(i.e., unacknowledged messages at disconnect). Reference the existing wording
about "Messages consumed but not yet processed will be requeued by the broker"
and change it to mention "manual ACK mode" and "disconnect before ack/nack is
sent" to make the scope precise.
|
@matwey before I review I would like to ask that you address all the comments from coderabbitai bot, please? |
Reference: fluent/fluent-bit#11427
Summary by CodeRabbit