Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 17 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

The client API documentation is written in standard Markdown and generated dynamically into C++ headers. You can find the full API overview in [docs/API.md](docs/API.md).

The public API follows the same object-oriented layout as the original Fluvio client: connect with `Fluvio` or `FluvioAdmin`, then call methods on the returned objects.

## Installation

You can install the client effortlessly without compiling the heavy Rust toolchain by using `vcpkg`.
Expand All @@ -37,8 +39,8 @@ target_link_libraries(main PRIVATE fluvio_client_cpp::fluvio_client_cpp)
#include "fluvio-client-cpp/src/lib.rs.h"

int main() {
auto admin = fluvio_admin_connect();
admin_create_topic(*admin, "a_topic", 1, 1);
auto admin = FluvioAdmin::connect();
admin->create_topic("a_topic", 1, 1);
return 0;
}
```
Expand All @@ -51,18 +53,20 @@ int main() {
#include <string>

int main() {
auto client = fluvio_connect();
auto producer = create_producer(*client, "my-topic");
auto client = Fluvio::connect();
auto producer = client->topic_producer("my-topic");

std::string payload = "FOOBAR";
uint8_t key[] = {};

producer_send(*producer,
auto out = producer->send(
rust::Slice<const uint8_t>(key, 0),
rust::Slice<const uint8_t>(reinterpret_cast<const uint8_t*>(payload.data()), payload.size())
);

producer_flush(*producer);
auto meta = out->wait();
(void)meta;
producer->flush();
return 0;
}
```
Expand All @@ -74,16 +78,13 @@ int main() {
#include <iostream>

int main() {
auto client = fluvio_connect();
auto consumer = partition_consumer(*client, "my-topic", 0);
auto stream = consumer_stream(*consumer, 0); // Offset::beginning
auto client = Fluvio::connect();
auto stream = client->consumer_stream("my-topic", 0, 0); // Offset::beginning

for (int i = 0; i < 1; i++) {
auto rec = stream_next(*stream);
auto val = record_value(*rec);
std::string payload(val.begin(), val.end());
std::cout << payload << std::endl;
}
auto rec = stream->next();
auto val = rec->value();
std::string payload(val.begin(), val.end());
std::cout << payload << std::endl;

return 0;
}
Expand All @@ -108,4 +109,5 @@ cmake -B build
cmake --build build
cd build
ctest --output-on-failure
cd ..
```
17 changes: 14 additions & 3 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ Before building the examples, ensure you have the following:

## What's Included?

We provide two simple applications to demonstrate the core features of the client:
We provide two simple applications to demonstrate the current object-oriented client API:

- **Producer (`producer.cpp`)**: Connects to the Fluvio cluster as an admin to ensure a topic named `example-topic` exists. It then creates a producer and sends a mock JSON payload representing sensor data.
- **Consumer (`consumer.cpp`)**: Connects to the Fluvio cluster, opens a stream on `example-topic`, and parses the incoming JSON data using `nlohmann::json`.
- **Producer (`producer.cpp`)**: Uses `FluvioAdmin::connect()` to ensure a topic named `example-topic` exists, then uses `Fluvio::connect()` and `client->topic_producer(...)` to send a JSON payload.
- **Consumer (`consumer.cpp`)**: Uses `Fluvio::connect()` and `client->consumer_stream(...)` to read from `example-topic`, then parses the incoming JSON data with `nlohmann::json`.

## Building the Examples

Expand Down Expand Up @@ -72,3 +72,14 @@ Parsed JSON successfully: Sensor=temp-01 Value=24.5
```

Congratulations! You've successfully streamed data using C++!

## API Mapping

The examples mirror the types exported from `src/lib.rs`:

- `FluvioAdmin::connect()` creates an admin client.
- `Fluvio::connect()` creates the main client.
- `client->topic_producer(...)` returns a `TopicProducerPool` for sending records.
- `client->consumer_stream(...)` returns a `FluvioStream` for receiving records.
- `record->value()` exposes the fetched payload bytes.

17 changes: 7 additions & 10 deletions examples/consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,21 @@ int main() {
try {
fmt::print("Starting Fluvio Consumer Example...\n");

auto client = fluvio_connect();
auto consumer = partition_consumer(*client, "example-topic", 0);

auto stream = consumer_stream(*consumer, 0); // Offset::beginning()
auto client = Fluvio::connect();
auto stream = client->consumer_stream("example-topic", 0, 0);

fmt::print("Waiting for messages...\n");

// Fetch one record
auto rec = stream_next(*stream);
auto val = record_value(*rec);

auto rec = stream->next();
auto val = rec->value();

std::string payload(val.begin(), val.end());
fmt::print("Received Raw Bytes: {}\n", payload);

try {
json j = json::parse(payload);
fmt::print("Parsed JSON successfully: Sensor={} Value={}\n",
j["sensor"].get<std::string>(),
fmt::print("Parsed JSON successfully: Sensor={} Value={}\n",
j["sensor"].get<std::string>(),
j["value"].get<double>());
} catch (const json::parse_error& e) {
fmt::print(stderr, "Failed to parse JSON: {}\n", e.what());
Expand Down
16 changes: 8 additions & 8 deletions examples/producer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,17 @@ int main() {
try {
fmt::print("Starting Fluvio Producer Example...\n");

auto admin = fluvio_admin_connect();
auto admin = FluvioAdmin::connect();
try {
admin_create_topic(*admin, "example-topic", 1, 1);
admin->create_topic("example-topic", 1, 1);
fmt::print("Created 'example-topic'.\n");
} catch (...) {
fmt::print("'example-topic' already exists or creation failed.\n");
}

auto client = fluvio_connect();
auto producer = create_producer(*client, "example-topic");
auto client = Fluvio::connect();
auto producer = client->topic_producer("example-topic");

// Create a JSON payload
json j = {
{"sensor", "temp-01"},
{"value", 24.5},
Expand All @@ -31,13 +30,14 @@ int main() {
fmt::print("Sending JSON: {}\n", payload);

uint8_t key[] = {'j', 's', 'o', 'n'};
auto out = producer_send(*producer,
auto out = producer->send(
rust::Slice<const uint8_t>(key, sizeof(key)),
rust::Slice<const uint8_t>(reinterpret_cast<const uint8_t*>(payload.data()), payload.size())
);

auto meta = produce_output_wait(*out);
producer_flush(*producer);
auto meta = out->wait();
(void)meta;
producer->flush();

fmt::print("Record successfully sent to Fluvio!\n");

Expand Down
28 changes: 16 additions & 12 deletions src/admin.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
use fluvio::FluvioAdmin;
use fluvio::FluvioAdmin as FluvioAdminNative;
use fluvio_sc_schema::topic::TopicSpec;
use fluvio_future::task::run_block_on;

pub struct FluvioAdminClient { pub inner: FluvioAdmin }
pub struct FluvioAdmin { pub inner: FluvioAdminNative }

pub fn fluvio_admin_connect() -> Result<Box<FluvioAdminClient>, String> {
run_block_on(FluvioAdmin::connect()).map(|a| Box::new(FluvioAdminClient { inner: a })).map_err(|e| e.to_string())
}

pub fn admin_create_topic(admin: &FluvioAdminClient, topic: &str, partitions: i32, replicas: i32) -> Result<(), String> {
run_block_on(admin.inner.create(topic.to_string(), false, TopicSpec::new_computed(partitions as u32, replicas as u32, None)))
.map_err(|e| e.to_string())
}
impl FluvioAdmin {
pub fn connect() -> Result<Box<FluvioAdmin>, String> {
run_block_on(FluvioAdminNative::connect()).map(|a| Box::new(FluvioAdmin { inner: a })).map_err(|e| e.to_string())
}

pub fn create_topic(self: &Self, topic: &str, partitions: i32, replicas: i32) -> Result<(), String> {
run_block_on(self.inner.create(topic.to_string(), false, TopicSpec::new_computed(partitions as u32, replicas as u32, None)))
.map_err(|e| e.to_string())
}

pub fn delete_topic(self: &Self, topic: &str) -> Result<(), String> {
run_block_on(self.inner.delete::<TopicSpec>(topic.to_string()))
.map_err(|e| e.to_string())
}

pub fn admin_delete_topic(admin: &FluvioAdminClient, topic: &str) -> Result<(), String> {
run_block_on(admin.inner.delete::<TopicSpec>(topic.to_string()))
.map_err(|e| e.to_string())
}
Loading
Loading