From 068ce80b8ddc5baa9cfe6daa0fde3d5bc776c3cd Mon Sep 17 00:00:00 2001 From: KunoVonHagen <41991675+KunoVonHagen@users.noreply.github.com> Date: Sun, 31 May 2026 16:20:57 +0200 Subject: [PATCH 1/4] Updated API to object-oriented design --- src/admin.rs | 28 ++++++---- src/c_api.rs | 82 ++++++++++++++------------- src/client.rs | 60 ++++++++++++++++---- src/config.rs | 127 ++++++++++++++++++++++++------------------ src/consumer.rs | 53 +++++++++--------- src/lib.rs | 123 ++++++++++++++++++++++++++-------------- src/produce_output.rs | 31 +++++++---- src/producer.rs | 40 +++++-------- 8 files changed, 319 insertions(+), 225 deletions(-) diff --git a/src/admin.rs b/src/admin.rs index 7708937..920aad0 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -1,19 +1,23 @@ -use fluvio::FluvioAdmin; +use fluvio::FluvioAdmin as FluvioAdminNative; use fluvio_sc_schema::topic::TopicSpec; use fluvio_future::task::run_block_on; -pub struct FluvioAdminClient { pub inner: FluvioAdmin } +pub struct FluvioAdmin { pub inner: FluvioAdminNative } -pub fn fluvio_admin_connect() -> Result, String> { - run_block_on(FluvioAdmin::connect()).map(|a| Box::new(FluvioAdminClient { inner: a })).map_err(|e| e.to_string()) -} -pub fn admin_create_topic(admin: &FluvioAdminClient, topic: &str, partitions: i32, replicas: i32) -> Result<(), String> { - run_block_on(admin.inner.create(topic.to_string(), false, TopicSpec::new_computed(partitions as u32, replicas as u32, None))) - .map_err(|e| e.to_string()) -} +impl FluvioAdmin { + pub fn connect() -> Result, String> { + run_block_on(FluvioAdminNative::connect()).map(|a| Box::new(FluvioAdmin { inner: a })).map_err(|e| e.to_string()) + } + + pub fn create_topic(self: &Self, topic: &str, partitions: i32, replicas: i32) -> Result<(), String> { + run_block_on(self.inner.create(topic.to_string(), false, TopicSpec::new_computed(partitions as u32, replicas as u32, None))) + .map_err(|e| e.to_string()) + } + + pub fn delete_topic(self: &Self, topic: &str) -> Result<(), String> { + run_block_on(self.inner.delete::(topic.to_string())) + .map_err(|e| e.to_string()) + } -pub fn admin_delete_topic(admin: &FluvioAdminClient, topic: &str) -> Result<(), String> { - run_block_on(admin.inner.delete::(topic.to_string())) - .map_err(|e| e.to_string()) } diff --git a/src/c_api.rs b/src/c_api.rs index f26e26d..7a81e68 100644 --- a/src/c_api.rs +++ b/src/c_api.rs @@ -1,8 +1,10 @@ -use crate::client::{FluvioClient, fluvio_connect}; -use crate::producer::{FluvioProducer, create_producer, producer_send, producer_flush}; -use crate::consumer::{FluvioStream, FluvioRecord, consumer_stream, stream_next}; -use crate::produce_output::{FluvioProduceOutput, produce_output_wait}; -use crate::config::{FluvioConfigWrapper, fluvio_config_load}; +use crate::{ + client::Fluvio, + producer::TopicProducerPool, + consumer::{FluvioStream, Record}, + produce_output::ProduceOutput, + config::FluvioConfig +}; use std::os::raw::c_char; use std::ffi::CStr; @@ -12,8 +14,8 @@ pub struct fluvio_config_t { } #[unsafe(no_mangle)] -pub extern "C" fn fluvio_c_connect(out_client: *mut *mut FluvioClient) -> i32 { - match fluvio_connect() { +pub extern "C" fn fluvio_c_connect(out_client: *mut *mut Fluvio) -> i32 { + match Fluvio::connect() { Ok(client) => { unsafe { *out_client = Box::into_raw(client); } 0 @@ -23,10 +25,10 @@ pub extern "C" fn fluvio_c_connect(out_client: *mut *mut FluvioClient) -> i32 { } #[unsafe(no_mangle)] -pub unsafe extern "C" fn fluvio_c_connect_with_config(config: *mut fluvio_config_t, out_client: *mut *mut FluvioClient) -> i32 { +pub unsafe extern "C" fn fluvio_c_connect_with_config(config: *mut fluvio_config_t, out_client: *mut *mut Fluvio) -> i32 { if config.is_null() || out_client.is_null() { return -1; } - let config_wrapper = &mut *(config as *mut FluvioConfigWrapper); - match crate::client::fluvio_connect_with_config(config_wrapper) { + let config_wrapper = &mut *(config as *mut FluvioConfig); + match Fluvio::connect_with_config(config_wrapper) { Ok(client) => { unsafe { *out_client = Box::into_raw(client); } 0 @@ -36,57 +38,57 @@ pub unsafe extern "C" fn fluvio_c_connect_with_config(config: *mut fluvio_config } #[unsafe(no_mangle)] -pub extern "C" fn fluvio_c_client_free(client: *mut FluvioClient) { +pub extern "C" fn fluvio_c_client_free(client: *mut Fluvio) { if !client.is_null() { unsafe { let _ = Box::from_raw(client); } } } #[unsafe(no_mangle)] -pub extern "C" fn fluvio_c_create_producer(client: *mut FluvioClient, topic: *const c_char, out_producer: *mut *mut FluvioProducer) -> i32 { +pub extern "C" fn fluvio_c_create_producer(client: *mut Fluvio, topic: *const c_char, out_producer: *mut *mut TopicProducerPool) -> i32 { if client.is_null() || topic.is_null() || out_producer.is_null() { return -1; } let topic_str = unsafe { CStr::from_ptr(topic).to_str() }.unwrap_or(""); - match create_producer(unsafe { &*client }, topic_str) { + match Fluvio::topic_producer(unsafe { &*client }, topic_str) { Ok(producer) => { unsafe { *out_producer = Box::into_raw(producer); } 0 } Err(_) => -1, } } #[unsafe(no_mangle)] -pub extern "C" fn fluvio_c_producer_send(producer: *mut FluvioProducer, key: *const u8, key_len: usize, val: *const u8, val_len: usize, out: *mut *mut FluvioProduceOutput) -> i32 { +pub extern "C" fn fluvio_c_producer_send(producer: *mut TopicProducerPool, key: *const u8, key_len: usize, val: *const u8, val_len: usize, out: *mut *mut ProduceOutput) -> i32 { if producer.is_null() || (key.is_null() && key_len > 0) || (val.is_null() && val_len > 0) { return -1; } let key_slice = if key_len > 0 { unsafe { std::slice::from_raw_parts(key, key_len) } } else { &[] }; let val_slice = if val_len > 0 { unsafe { std::slice::from_raw_parts(val, val_len) } } else { &[] }; - match producer_send(unsafe { &*producer }, key_slice, val_slice) { + match TopicProducerPool::send(unsafe { &*producer }, key_slice, val_slice) { Ok(o) => { if !out.is_null() { unsafe { *out = Box::into_raw(o); } } 0 } Err(_) => -1, } } #[unsafe(no_mangle)] -pub extern "C" fn fluvio_c_produce_output_wait(out: *mut FluvioProduceOutput) -> i32 { +pub extern "C" fn fluvio_c_produce_output_wait(out: *mut ProduceOutput) -> i32 { if out.is_null() { return -1; } - match produce_output_wait(unsafe { &mut *out }) { Ok(_) => 0, Err(_) => -1 } + match ProduceOutput::wait(unsafe { &mut *out }) { Ok(_) => 0, Err(_) => -1 } } #[unsafe(no_mangle)] -pub extern "C" fn fluvio_c_producer_flush(producer: *mut FluvioProducer) -> i32 { +pub extern "C" fn fluvio_c_producer_flush(producer: *mut TopicProducerPool) -> i32 { if producer.is_null() { return -1; } - match producer_flush(unsafe { &*producer }) { Ok(_) => 0, Err(_) => -1 } + match TopicProducerPool::flush(unsafe { &*producer }) { Ok(_) => 0, Err(_) => -1 } } #[unsafe(no_mangle)] -pub extern "C" fn fluvio_c_producer_free(producer: *mut FluvioProducer) { +pub extern "C" fn fluvio_c_producer_free(producer: *mut TopicProducerPool) { if !producer.is_null() { unsafe { let _ = Box::from_raw(producer); } } } #[unsafe(no_mangle)] -pub extern "C" fn fluvio_c_produce_output_free(out: *mut FluvioProduceOutput) { +pub extern "C" fn fluvio_c_produce_output_free(out: *mut ProduceOutput) { if !out.is_null() { unsafe { let _ = Box::from_raw(out); } } } #[unsafe(no_mangle)] pub extern "C" fn fluvio_c_config_load(out_config: *mut *mut fluvio_config_t) -> i32 { if out_config.is_null() { return -1; } - match fluvio_config_load() { + match FluvioConfig::load() { Ok(config) => { unsafe { *out_config = Box::into_raw(config) as *mut fluvio_config_t; } 0 } Err(_) => -1, } @@ -95,38 +97,38 @@ pub extern "C" fn fluvio_c_config_load(out_config: *mut *mut fluvio_config_t) -> #[unsafe(no_mangle)] pub unsafe extern "C" fn fluvio_c_config_set_endpoint(config: *mut fluvio_config_t, endpoint: *const std::ffi::c_char) { if config.is_null() || endpoint.is_null() { return; } - let config_wrapper = &mut *(config as *mut FluvioConfigWrapper); + let config_wrapper = &mut *(config as *mut FluvioConfig); let ep_str = std::ffi::CStr::from_ptr(endpoint).to_string_lossy(); - crate::config::fluvio_config_set_endpoint(config_wrapper, &ep_str); + config_wrapper.set_endpoint(&ep_str); } #[unsafe(no_mangle)] pub unsafe extern "C" fn fluvio_c_config_set_client_id(config: *mut fluvio_config_t, client_id: *const std::ffi::c_char) { if config.is_null() || client_id.is_null() { return; } - let config_wrapper = &mut *(config as *mut FluvioConfigWrapper); + let config_wrapper = &mut *(config as *mut FluvioConfig); let client_id_str = std::ffi::CStr::from_ptr(client_id).to_string_lossy(); - crate::config::fluvio_config_set_client_id(config_wrapper, &client_id_str); + config_wrapper.set_client_id(&client_id_str); } #[unsafe(no_mangle)] pub unsafe extern "C" fn fluvio_c_config_disable_tls(config: *mut fluvio_config_t) { if config.is_null() { return; } - let config_wrapper = &mut *(config as *mut FluvioConfigWrapper); - crate::config::fluvio_config_disable_tls(config_wrapper); + let config_wrapper = &mut *(config as *mut FluvioConfig); + config_wrapper.disable_tls(); } #[unsafe(no_mangle)] pub unsafe extern "C" fn fluvio_c_config_set_anonymous_tls(config: *mut fluvio_config_t) { if config.is_null() { return; } - let config_wrapper = &mut *(config as *mut FluvioConfigWrapper); - crate::config::fluvio_config_set_anonymous_tls(config_wrapper); + let config_wrapper = &mut *(config as *mut FluvioConfig); + config_wrapper.set_anonymous_tls(); } #[unsafe(no_mangle)] pub unsafe extern "C" fn fluvio_c_config_set_inline_tls(config: *mut fluvio_config_t, domain: *const std::ffi::c_char, key: *const std::ffi::c_char, cert: *const std::ffi::c_char, ca_cert: *const std::ffi::c_char) { if config.is_null() || domain.is_null() || key.is_null() || cert.is_null() || ca_cert.is_null() { return; } - let config_wrapper = &mut *(config as *mut FluvioConfigWrapper); - crate::config::fluvio_config_set_inline_tls(config_wrapper, + let config_wrapper = &mut *(config as *mut FluvioConfig); + config_wrapper.set_inline_tls( &std::ffi::CStr::from_ptr(domain).to_string_lossy(), &std::ffi::CStr::from_ptr(key).to_string_lossy(), &std::ffi::CStr::from_ptr(cert).to_string_lossy(), @@ -137,8 +139,8 @@ pub unsafe extern "C" fn fluvio_c_config_set_inline_tls(config: *mut fluvio_conf #[unsafe(no_mangle)] pub unsafe extern "C" fn fluvio_c_config_set_tls_file_paths(config: *mut fluvio_config_t, domain: *const std::ffi::c_char, key_path: *const std::ffi::c_char, cert_path: *const std::ffi::c_char, ca_cert_path: *const std::ffi::c_char) { if config.is_null() || domain.is_null() || key_path.is_null() || cert_path.is_null() || ca_cert_path.is_null() { return; } - let config_wrapper = &mut *(config as *mut FluvioConfigWrapper); - crate::config::fluvio_config_set_tls_file_paths(config_wrapper, + let config_wrapper = &mut *(config as *mut FluvioConfig); + config_wrapper.set_tls_file_paths( &std::ffi::CStr::from_ptr(domain).to_string_lossy(), &std::ffi::CStr::from_ptr(key_path).to_string_lossy(), &std::ffi::CStr::from_ptr(cert_path).to_string_lossy(), @@ -147,26 +149,26 @@ pub unsafe extern "C" fn fluvio_c_config_set_tls_file_paths(config: *mut fluvio_ } #[unsafe(no_mangle)] -pub extern "C" fn fluvio_c_consumer_stream(client: *mut FluvioClient, topic: *const c_char, partition: u32, offset_index: i64, out_stream: *mut *mut FluvioStream) -> i32 { +pub extern "C" fn fluvio_c_consumer_stream(client: *mut Fluvio, topic: *const c_char, partition: u32, offset_index: i64, out_stream: *mut *mut FluvioStream) -> i32 { if client.is_null() || topic.is_null() || out_stream.is_null() { return -1; } let topic_str = unsafe { CStr::from_ptr(topic).to_str() }.unwrap_or(""); - match consumer_stream(unsafe { &*client }, topic_str, partition, offset_index) { + match Fluvio::consumer_stream(unsafe { &*client }, topic_str, partition, offset_index) { Ok(stream) => { unsafe { *out_stream = Box::into_raw(stream); } 0 } Err(_) => -1, } } #[unsafe(no_mangle)] -pub extern "C" fn fluvio_c_stream_next(stream: *mut FluvioStream, out_record: *mut *mut FluvioRecord) -> i32 { +pub extern "C" fn fluvio_c_stream_next(stream: *mut FluvioStream, out_record: *mut *mut Record) -> i32 { if stream.is_null() || out_record.is_null() { return -1; } - match stream_next(unsafe { &mut *stream }) { + match FluvioStream::next(unsafe { &mut *stream }) { Ok(record) => { unsafe { *out_record = Box::into_raw(record); } 0 } Err(_) => -1, } } #[unsafe(no_mangle)] -pub extern "C" fn fluvio_c_record_value(record: *mut FluvioRecord, out_buf: *mut *const u8, out_len: *mut usize) -> i32 { +pub extern "C" fn fluvio_c_record_value(record: *mut Record, out_buf: *mut *const u8, out_len: *mut usize) -> i32 { if record.is_null() || out_buf.is_null() || out_len.is_null() { return -1; } let val = unsafe { &*record }.inner.value(); unsafe { *out_buf = val.as_ptr(); *out_len = val.len(); } @@ -174,7 +176,7 @@ pub extern "C" fn fluvio_c_record_value(record: *mut FluvioRecord, out_buf: *mut } #[unsafe(no_mangle)] -pub extern "C" fn fluvio_c_record_free(rec: *mut FluvioRecord) { +pub extern "C" fn fluvio_c_record_free(rec: *mut Record) { if !rec.is_null() { unsafe { let _ = Box::from_raw(rec); } } } diff --git a/src/client.rs b/src/client.rs index d53a840..6cfbf4a 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,17 +1,53 @@ -use fluvio::Fluvio; +use fluvio::{ + Fluvio as FluvioNative, + Offset as OffsetNative, + consumer::ConsumerConfigExtBuilder as ConsumerConfigExtBuilderNative +}; use fluvio_future::task::run_block_on; -use crate::config::FluvioConfigWrapper; +use crate::config::FluvioConfig; +use crate::{FluvioStream, TopicProducerConfigBuilder, TopicProducerPool}; -pub struct FluvioClient { pub inner: Fluvio } +pub struct Fluvio { pub inner: FluvioNative } -pub fn fluvio_connect() -> Result, String> { - run_block_on(Fluvio::connect()) - .map(|fluvio| Box::new(FluvioClient { inner: fluvio })) - .map_err(|e| e.to_string()) -} -pub fn fluvio_connect_with_config(config: &FluvioConfigWrapper) -> Result, String> { - run_block_on(Fluvio::connect_with_config(&config.inner)) - .map(|fluvio| Box::new(FluvioClient { inner: fluvio })) - .map_err(|e| e.to_string()) +impl Fluvio { + pub fn connect() -> Result, String> { + run_block_on(FluvioNative::connect()) + .map(|fluvio| Box::new(Fluvio { inner: fluvio })) + .map_err(|e| e.to_string()) + } + + pub fn connect_with_config(config: &FluvioConfig) -> Result, String> { + run_block_on(FluvioNative::connect_with_config(&config.inner)) + .map(|fluvio| Box::new(Fluvio { inner: fluvio })) + .map_err(|e| e.to_string()) + } + pub fn topic_producer(self: &Self, topic: &str) -> Result, String> { + run_block_on(self.inner.topic_producer(topic)) + .map(|producer| Box::new(TopicProducerPool { inner: producer })) + .map_err(|e| e.to_string()) + } + + pub fn topic_producer_with_config(self: &Self, topic: &str, config: &TopicProducerConfigBuilder) -> Result, String> { + let built_config = config.inner.build().map_err(|e| e.to_string())?; + run_block_on(self.inner.topic_producer_with_config(topic, built_config)) + .map(|producer| Box::new(TopicProducerPool { inner: producer })) + .map_err(|e| e.to_string()) + } + + pub fn consumer_stream(self: &Self, topic: &str, partition: u32, offset_index: i64) -> Result, String> { + let offset = if offset_index == -1 { OffsetNative::end() } else if offset_index == 0 { OffsetNative::beginning() } else { OffsetNative::absolute(offset_index).unwrap() }; + let config = ConsumerConfigExtBuilderNative::default() + .topic(topic.to_string()) + .partition(partition) + .offset_start(offset) + .build() + .map_err(|e| e.to_string())?; + + let consumer_stream = run_block_on(self.inner.consumer_with_config(config)).map_err(|e| e.to_string())?; + Ok(Box::new(FluvioStream { inner: Box::pin(consumer_stream) })) + } } + + + diff --git a/src/config.rs b/src/config.rs index 65a984f..a18d223 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,73 +1,90 @@ -use fluvio::FluvioConfig; -use fluvio::consumer::ConsumerConfigBuilder; -use fluvio::TopicProducerConfigBuilder; +use std::ops::DerefMut; +use fluvio::FluvioConfig as FluvioConfigNative; +use fluvio::consumer::ConsumerConfigBuilder as ConsumerConfigBuilderNative; +use fluvio::TopicProducerConfigBuilder as TopicProducerConfigBuilderNative; -pub struct FluvioConfigWrapper { pub inner: FluvioConfig } -pub struct ConsumerConfigWrapper { pub inner: ConsumerConfigBuilder } -pub struct ProducerConfigWrapper { pub inner: TopicProducerConfigBuilder } +pub struct ConsumerConfigBuilder { pub inner: ConsumerConfigBuilderNative } +pub struct TopicProducerConfigBuilder { pub inner: TopicProducerConfigBuilderNative } +pub struct FluvioConfig { pub inner: FluvioConfigNative } -pub fn producer_config_new() -> Box { - Box::new(ProducerConfigWrapper { inner: TopicProducerConfigBuilder::default() }) +impl TopicProducerConfigBuilder { + pub fn create() -> Box { + Box::new(TopicProducerConfigBuilder { inner: TopicProducerConfigBuilderNative::default() }) + } + pub fn batch_size(self: &mut Self, size: usize) -> Box { + self.inner.batch_size(size); + Box::new(TopicProducerConfigBuilder { inner: self.inner.clone() }) + } + pub fn linger(self: &mut Self, linger: u64) -> Box { + self.inner.linger(std::time::Duration::from_millis(linger)); + Box::new(TopicProducerConfigBuilder { inner: self.inner.clone() }) + } } -pub fn consumer_config_new() -> Box { - Box::new(ConsumerConfigWrapper { inner: ConsumerConfigBuilder::default() }) -} +impl ConsumerConfigBuilder { + pub fn create() -> Box { + Box::new(ConsumerConfigBuilder { inner: ConsumerConfigBuilderNative::default() }) + } + pub fn max_bytes(self: &mut Self, max: i32) -> Box { + self.inner.max_bytes(max); + Box::new(ConsumerConfigBuilder { inner: self.inner.clone() }) + } + pub fn disable_continuous(self: &mut Self, val: bool) -> Box { + self.inner.disable_continuous(val); + Box::new(ConsumerConfigBuilder { inner: self.inner.clone() }) + } -pub fn consumer_config_max_bytes(c: &mut ConsumerConfigWrapper, max: i32) { - c.inner.max_bytes(max); } -pub fn consumer_config_disable_continuous(c: &mut ConsumerConfigWrapper, val: bool) { - c.inner.disable_continuous(val); +impl FluvioConfig { + pub fn create(addr: &str) -> Box { + Box::new(FluvioConfig { inner: FluvioConfigNative::new(addr) }) + } + pub fn load() -> Result, String> { + FluvioConfigNative::load().map(|c| Box::new(FluvioConfig { inner: c })).map_err(|e| e.to_string()) + } + pub fn set_endpoint(self: &mut Self, endpoint: &str) { + self.inner.endpoint = endpoint.to_string(); + } + pub fn set_client_id(self: &mut Self, client_id: &str) { + self.inner.client_id = Some(client_id.to_string()); + } + pub fn disable_tls(self: &mut Self) { + self.inner.tls = fluvio::config::TlsPolicy::Disabled; + } + pub fn set_anonymous_tls(self: &mut Self) { + self.inner.tls = fluvio::config::TlsPolicy::Anonymous; + } + pub fn set_inline_tls(self: &mut Self, domain: &str, key: &str, cert: &str, ca_cert: &str) { + self.inner.tls = fluvio::config::TlsPolicy::Verified(fluvio::config::TlsConfig::Inline(fluvio::config::TlsCerts { + domain: domain.to_string(), + key: key.to_string(), + cert: cert.to_string(), + ca_cert: ca_cert.to_string(), + })); + } + pub fn set_tls_file_paths(self: &mut Self, domain: &str, key_path: &str, cert_path: &str, ca_cert_path: &str) { + self.inner.tls = fluvio::config::TlsPolicy::Verified(fluvio::config::TlsConfig::Files(fluvio::config::TlsPaths { + domain: domain.to_string(), + key: std::path::PathBuf::from(key_path), + cert: std::path::PathBuf::from(cert_path), + ca_cert: std::path::PathBuf::from(ca_cert_path), + })); + } } -pub fn producer_config_batch_size(c: &mut ProducerConfigWrapper, size: usize) { - c.inner.batch_size(size); -} -pub fn producer_config_linger(c: &mut ProducerConfigWrapper, linger: u64) { - c.inner.linger(std::time::Duration::from_millis(linger)); -} -pub fn fluvio_config_new(addr: &str) -> Box { - Box::new(FluvioConfigWrapper { inner: FluvioConfig::new(addr) }) -} -pub fn fluvio_config_load() -> Result, String> { - FluvioConfig::load().map(|c| Box::new(FluvioConfigWrapper { inner: c })).map_err(|e| e.to_string()) -} -pub fn fluvio_config_set_endpoint(c: &mut FluvioConfigWrapper, endpoint: &str) { - c.inner.endpoint = endpoint.to_string(); -} -pub fn fluvio_config_set_client_id(c: &mut FluvioConfigWrapper, client_id: &str) { - c.inner.client_id = Some(client_id.to_string()); -} -pub fn fluvio_config_disable_tls(c: &mut FluvioConfigWrapper) { - c.inner.tls = fluvio::config::TlsPolicy::Disabled; -} -pub fn fluvio_config_set_anonymous_tls(c: &mut FluvioConfigWrapper) { - c.inner.tls = fluvio::config::TlsPolicy::Anonymous; -} -pub fn fluvio_config_set_inline_tls(c: &mut FluvioConfigWrapper, domain: &str, key: &str, cert: &str, ca_cert: &str) { - c.inner.tls = fluvio::config::TlsPolicy::Verified(fluvio::config::TlsConfig::Inline(fluvio::config::TlsCerts { - domain: domain.to_string(), - key: key.to_string(), - cert: cert.to_string(), - ca_cert: ca_cert.to_string(), - })); -} -pub fn fluvio_config_set_tls_file_paths(c: &mut FluvioConfigWrapper, domain: &str, key_path: &str, cert_path: &str, ca_cert_path: &str) { - c.inner.tls = fluvio::config::TlsPolicy::Verified(fluvio::config::TlsConfig::Files(fluvio::config::TlsPaths { - domain: domain.to_string(), - key: std::path::PathBuf::from(key_path), - cert: std::path::PathBuf::from(cert_path), - ca_cert: std::path::PathBuf::from(ca_cert_path), - })); -} + + + + + + diff --git a/src/consumer.rs b/src/consumer.rs index 7c920e9..041a1f7 100644 --- a/src/consumer.rs +++ b/src/consumer.rs @@ -1,45 +1,42 @@ -use fluvio::{Offset, consumer::Record as NativeRecord, consumer::ConsumerConfigExtBuilder}; +use fluvio::{ + Offset as OffsetNative, + consumer::Record as NativeRecord, + consumer::ConsumerConfigExtBuilder as ConsumerConfigExtBuilderNative, +}; use fluvio_future::task::run_block_on; use futures_util::stream::StreamExt; use futures_util::stream::Stream; use std::pin::Pin; use fluvio::dataplane::link::ErrorCode; -use crate::client::FluvioClient; +use crate::client::Fluvio; -pub struct FluvioRecord { pub inner: NativeRecord } +pub struct Record { pub inner: NativeRecord } type ConsumerStreamInner = Pin> + Send>>; pub struct FluvioStream { pub inner: ConsumerStreamInner } -pub fn consumer_stream(client: &FluvioClient, topic: &str, partition: u32, offset_index: i64) -> Result, String> { - let offset = if offset_index == -1 { Offset::end() } else if offset_index == 0 { Offset::beginning() } else { Offset::absolute(offset_index).unwrap() }; - let config = ConsumerConfigExtBuilder::default() - .topic(topic.to_string()) - .partition(partition) - .offset_start(offset) - .build() - .map_err(|e| e.to_string())?; - - let consumer_stream = run_block_on(client.inner.consumer_with_config(config)).map_err(|e| e.to_string())?; - Ok(Box::new(FluvioStream { inner: Box::pin(consumer_stream) })) +impl FluvioStream { + pub fn next(self: &mut Self) -> Result, String> { + match run_block_on(self.inner.next()) { + Some(Ok(rec)) => Ok(Box::new(Record { inner: rec })), + Some(Err(e)) => Err(e.to_string()), + None => Err("EOF".to_string()), + } + } } -pub fn stream_next(stream: &mut FluvioStream) -> Result, String> { - match run_block_on(stream.inner.next()) { - Some(Ok(rec)) => Ok(Box::new(FluvioRecord { inner: rec })), - Some(Err(e)) => Err(e.to_string()), - None => Err("EOF".to_string()), +impl Record { + pub fn value(self: &Self) -> Vec { + self.inner.value().iter().cloned().collect() + } + pub fn key(self: &Self) -> Vec { + self.inner.key().map(|k| k.iter().cloned().collect()).unwrap_or_default() + } + pub fn offset(self: &Self) -> i64 { + self.inner.offset() } } -pub fn record_value(record: &FluvioRecord) -> Vec { - record.inner.value().iter().cloned().collect() -} -pub fn record_key(record: &FluvioRecord) -> Vec { - record.inner.key().map(|k| k.iter().cloned().collect()).unwrap_or_default() -} -pub fn record_offset(record: &FluvioRecord) -> i64 { - record.inner.offset() -} + diff --git a/src/lib.rs b/src/lib.rs index 3a85b10..4d45b6d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,78 +9,119 @@ pub mod c_api; #[cxx::bridge] mod ffi { extern "Rust" { - type FluvioClient; - type FluvioProducer; + type Fluvio; + type TopicProducerPool; type FluvioStream; - type FluvioRecord; + type Record; - type FluvioConfigWrapper; - type ConsumerConfigWrapper; - type ProducerConfigWrapper; - type FluvioProduceOutput; - type FluvioRecordMetadata; - type FluvioAdminClient; + type FluvioConfig; + type ConsumerConfigBuilder; + type TopicProducerConfigBuilder; + type ProduceOutput; + type RecordMetadata; + type FluvioAdmin; /// Connects to a Fluvio cluster - fn fluvio_connect() -> Result>; + #[Self = "Fluvio"] + fn connect() -> Result>; + /// Connects to a Fluvio cluster with explicit config - fn fluvio_connect_with_config(config: &FluvioConfigWrapper) -> Result>; + #[Self = "Fluvio"] + fn connect_with_config(config: &FluvioConfig) -> Result>; + + /// Creates a producer for the specified topic + fn topic_producer(self: &Fluvio, topic: &str) -> Result>; + + /// Creates a producer for the specified topic with custom configuration + fn topic_producer_with_config(self: &Fluvio, topic: &str, config: &TopicProducerConfigBuilder) -> Result>; + + /// Creates a continuous stream for the consumer starting from the given offset index (0=Beginning, -1=End) + fn consumer_stream(self: &Fluvio, topic: &str, partition: u32, offset_index: i64) -> Result>; + + + /// Creates a new topic producer configuration builder - fn producer_config_new() -> Box; + #[Self = "TopicProducerConfigBuilder"] + fn create() -> Box; + /// Sets the maximum batch size in bytes for the producer - fn producer_config_batch_size(c: &mut ProducerConfigWrapper, size: usize); + fn batch_size(self: &mut TopicProducerConfigBuilder, size: usize) -> Box; + /// Sets the linger time in milliseconds for the producer - fn producer_config_linger(c: &mut ProducerConfigWrapper, linger: u64); + fn linger(self: &mut TopicProducerConfigBuilder, linger: u64) -> Box; + + /// Creates a new consumer configuration builder - fn consumer_config_new() -> Box; + #[Self = "ConsumerConfigBuilder"] + fn create() -> Box; /// Sets the maximum bytes to fetch per request - fn consumer_config_max_bytes(c: &mut ConsumerConfigWrapper, max: i32); + fn max_bytes(self: &mut ConsumerConfigBuilder, max: i32) -> Box; /// Disables continuous fetching - fn consumer_config_disable_continuous(c: &mut ConsumerConfigWrapper, val: bool); + fn disable_continuous(self: &mut ConsumerConfigBuilder, val: bool) -> Box; + + /// Creates a new Fluvio cluster configuration with the specified endpoint - fn fluvio_config_new(addr: &str) -> Box; + #[Self = "FluvioConfig"] + fn create(addr: &str) -> Box; + /// Loads the Fluvio configuration from the default profile path - fn fluvio_config_load() -> Result>; + #[Self = "FluvioConfig"] + fn load() -> Result>; + /// Sets the endpoint for the cluster configuration - fn fluvio_config_set_endpoint(c: &mut FluvioConfigWrapper, endpoint: &str); + fn set_endpoint(self: &mut FluvioConfig, endpoint: &str); + /// Sets the client identifier for the cluster configuration - fn fluvio_config_set_client_id(c: &mut FluvioConfigWrapper, client_id: &str); - fn fluvio_config_disable_tls(c: &mut FluvioConfigWrapper); - fn fluvio_config_set_anonymous_tls(c: &mut FluvioConfigWrapper); - fn fluvio_config_set_inline_tls(c: &mut FluvioConfigWrapper, domain: &str, key: &str, cert: &str, ca_cert: &str); - fn fluvio_config_set_tls_file_paths(c: &mut FluvioConfigWrapper, domain: &str, key_path: &str, cert_path: &str, ca_cert_path: &str); + fn set_client_id(self: &mut FluvioConfig, client_id: &str); + + fn disable_tls(self: &mut FluvioConfig); + + fn set_anonymous_tls(self: &mut FluvioConfig); + + fn set_inline_tls(self: &mut FluvioConfig, domain: &str, key: &str, cert: &str, ca_cert: &str); + + fn set_tls_file_paths(self: &mut FluvioConfig, domain: &str, key_path: &str, cert_path: &str, ca_cert_path: &str); + + + - /// Creates a producer for the specified topic - fn create_producer(client: &FluvioClient, topic: &str) -> Result>; - /// Creates a producer for the specified topic with custom configuration - fn create_producer_with_config(client: &FluvioClient, topic: &str, config: &ProducerConfigWrapper) -> Result>; /// Sends a key-value record to the topic asynchronously - fn producer_send(producer: &FluvioProducer, key: &[u8], value: &[u8]) -> Result>; + fn send(self: &TopicProducerPool, key: &[u8], value: &[u8]) -> Result>; + /// Flushes the producer batches - fn producer_flush(producer: &FluvioProducer) -> Result<()>; + fn flush(self: &TopicProducerPool) -> Result<()>; + + + /// Blocks and waits for the producer record confirmation - fn produce_output_wait(output: &mut FluvioProduceOutput) -> Result>; + fn wait(self: &mut ProduceOutput) -> Result>; + + - /// Creates a continuous stream for the consumer starting from the given offset index (0=Beginning, -1=End) - fn consumer_stream(client: &FluvioClient, topic: &str, partition: u32, offset_index: i64) -> Result>; /// Retrieves the next record from the stream blocks until available - fn stream_next(stream: &mut FluvioStream) -> Result>; + fn next(self: &mut FluvioStream) -> Result>; + + + /// Retrieves the payload value byte array from a fetched record - fn record_value(record: &FluvioRecord) -> Vec; + fn value(self: &Record) -> Vec; /// Retrieves the key byte array from a fetched record - fn record_key(record: &FluvioRecord) -> Vec; + fn key(self: &Record) -> Vec; /// Retrieves the literal offset index of the fetched record - fn record_offset(record: &FluvioRecord) -> i64; + fn offset(self: &Record) -> i64; + + /// Connects to the Fluvio Administrative controller - fn fluvio_admin_connect() -> Result>; + #[Self = "FluvioAdmin"] + fn connect() -> Result>; /// Dispatches a command to create a new topic - fn admin_create_topic(admin: &FluvioAdminClient, topic: &str, partitions: i32, replicas: i32) -> Result<()>; + fn create_topic(self: &FluvioAdmin, topic: &str, partitions: i32, replicas: i32) -> Result<()>; /// Dispatches a command to violently delete a topic - fn admin_delete_topic(admin: &FluvioAdminClient, topic: &str) -> Result<()>; + fn delete_topic(self: &FluvioAdmin, topic: &str) -> Result<()>; } } diff --git a/src/produce_output.rs b/src/produce_output.rs index e4b8592..f236fbf 100644 --- a/src/produce_output.rs +++ b/src/produce_output.rs @@ -1,17 +1,24 @@ -use fluvio::{ProduceOutput as NativeProduceOutput, RecordMetadata as NativeRecordMetadata}; +use fluvio::{ + ProduceOutput as ProduceOutputNative, + RecordMetadata as RecordMetadataNative +}; use fluvio_future::task::run_block_on; -pub struct FluvioProduceOutput { pub inner: Option } -pub struct FluvioRecordMetadata { pub inner: NativeRecordMetadata } +pub struct ProduceOutput { pub inner: Option } +pub struct RecordMetadata { pub inner: RecordMetadataNative } -pub fn produce_output_wait(output: &mut FluvioProduceOutput) -> Result, String> { - let inner = output.inner.take(); - match inner { - Some(produce_output) => { - run_block_on(produce_output.wait()) - .map(|metadata| Box::new(FluvioRecordMetadata { inner: metadata })) - .map_err(|e| e.to_string()) - }, - None => Err("ProduceOutput already consumed".to_string()) + +impl ProduceOutput{ + pub fn wait(self: &mut Self) -> Result, String> { + let inner = self.inner.take(); + match inner { + Some(produce_output) => { + run_block_on(produce_output.wait()) + .map(|metadata| Box::new(RecordMetadata { inner: metadata })) + .map_err(|e| e.to_string()) + }, + None => Err("ProduceOutput already consumed".to_string()) + } } } + diff --git a/src/producer.rs b/src/producer.rs index bcd2941..b9fa8f2 100644 --- a/src/producer.rs +++ b/src/producer.rs @@ -1,32 +1,22 @@ -use fluvio::TopicProducerPool; +use fluvio::TopicProducerPool as TopicProducerPoolNative; use fluvio_future::task::run_block_on; -use crate::config::ProducerConfigWrapper; -use crate::client::FluvioClient; -use crate::produce_output::FluvioProduceOutput; +use crate::produce_output::ProduceOutput; -pub struct FluvioProducer { pub inner: TopicProducerPool } +pub struct TopicProducerPool { pub inner: TopicProducerPoolNative } -pub fn create_producer(client: &FluvioClient, topic: &str) -> Result, String> { - run_block_on(client.inner.topic_producer(topic)) - .map(|producer| Box::new(FluvioProducer { inner: producer })) - .map_err(|e| e.to_string()) -} +impl TopicProducerPool { + pub fn send(self: &Self, key: &[u8], value: &[u8]) -> Result, String> { + run_block_on(self.inner.send(key, value)) + .map(|out| Box::new(ProduceOutput { inner: Some(out) })) + .map_err(|e| e.to_string()) + } + pub fn flush(self: &Self) -> Result<(), String> { + run_block_on(self.inner.flush()) + .map(|_| ()) + .map_err(|e| e.to_string()) + } -pub fn create_producer_with_config(client: &FluvioClient, topic: &str, config: &ProducerConfigWrapper) -> Result, String> { - let built_config = config.inner.build().map_err(|e| e.to_string())?; - run_block_on(client.inner.topic_producer_with_config(topic, built_config)) - .map(|producer| Box::new(FluvioProducer { inner: producer })) - .map_err(|e| e.to_string()) } -pub fn producer_send(producer: &FluvioProducer, key: &[u8], value: &[u8]) -> Result, String> { - run_block_on(producer.inner.send(key, value)) - .map(|out| Box::new(FluvioProduceOutput { inner: Some(out) })) - .map_err(|e| e.to_string()) -} -pub fn producer_flush(producer: &FluvioProducer) -> Result<(), String> { - run_block_on(producer.inner.flush()) - .map(|_| ()) - .map_err(|e| e.to_string()) -} + From 38d0260d0d3d0345c8871dbaa669a350a9ff73a3 Mon Sep 17 00:00:00 2001 From: KunoVonHagen <41991675+KunoVonHagen@users.noreply.github.com> Date: Sun, 31 May 2026 16:21:21 +0200 Subject: [PATCH 2/4] Updated tests to object-oriented design --- tests/test_admin.cpp | 6 +++--- tests/test_auth.cpp | 20 ++++++++++---------- tests/test_config.cpp | 12 ++++++++---- tests/test_consumer.cpp | 8 ++++---- tests/test_producer.cpp | 17 +++++++++-------- 5 files changed, 34 insertions(+), 29 deletions(-) diff --git a/tests/test_admin.cpp b/tests/test_admin.cpp index 8b214cc..95c6efa 100644 --- a/tests/test_admin.cpp +++ b/tests/test_admin.cpp @@ -4,15 +4,15 @@ int main() { try { std::cout << "Test: Connecting to admin..." << std::endl; - auto admin = fluvio_admin_connect(); + auto admin = FluvioAdmin::connect(); std::cout << "Test: Creating topic 'admin-test-topic'..." << std::endl; try { - admin_create_topic(*admin, "admin-test-topic", 1, 1); + admin->create_topic("admin-test-topic", 1, 1); } catch (...) {} std::cout << "Test: Deleting topic 'admin-test-topic'..." << std::endl; - admin_delete_topic(*admin, "admin-test-topic"); + admin->delete_topic("admin-test-topic"); std::cout << "Admin Test Passed!" << std::endl; } catch (const std::exception& e) { diff --git a/tests/test_auth.cpp b/tests/test_auth.cpp index e82726c..1d317e5 100644 --- a/tests/test_auth.cpp +++ b/tests/test_auth.cpp @@ -11,36 +11,36 @@ int main() { const char* cert = std::getenv("FLUVIO_E2E_TLS_CERT"); const char* ca = std::getenv("FLUVIO_E2E_TLS_CA"); - auto fluvioConfig = fluvio_config_new("localhost:9003"); + auto fluvioConfig = FluvioConfig::create("localhost:9003"); if (domain && key && cert && ca) { std::cout << "[E2E-AUTH] Active TLS parameters detected! Configuring strict mTLS execution pipeline." << std::endl; - fluvio_config_set_tls_file_paths(*fluvioConfig, domain, key, cert, ca); + fluvioConfig->set_tls_file_paths(domain, key, cert, ca); } else { std::cout << "[E2E-AUTH] No TLS parameters detected in ENV. Proceeding with TLS-Disabled configuration checks." << std::endl; - fluvio_config_disable_tls(*fluvioConfig); + fluvioConfig->disable_tls(); std::cout << "CXX TLS Auth Object Creation Successfully Evaluated Offline." << std::endl; return 0; } std::cout << "[E2E-AUTH] Attempting live Fluvio Socket TLS Auth Connection..." << std::endl; - auto authenticatedClient = fluvio_connect_with_config(*fluvioConfig); + auto authenticatedClient = Fluvio::connect_with_config(*fluvioConfig); std::cout << "[E2E-AUTH] Successfully authenticated to cluster natively via TLS mTLS bindings!" << std::endl; std::cout << "[E2E-AUTH] Creating producer for 'test-auth-topic'..." << std::endl; - auto producer = create_producer(*authenticatedClient, "test-auth-topic"); + auto producer = authenticatedClient->topic_producer("test-auth-topic"); uint8_t payload[] = {'s', 'e', 'c', 'u', 'r', 'e'}; - producer_send(*producer, + producer->send( rust::Slice(), rust::Slice(payload, sizeof(payload))); - producer_flush(*producer); + producer->flush(); std::cout << "[E2E-AUTH] 🔒 Payload shipped through TLS socket!" << std::endl; std::cout << "[E2E-AUTH] Bootstrapping Authenticated Consumer..." << std::endl; - auto stream = consumer_stream(*authenticatedClient, "test-auth-topic", 0, 0); - auto rec = stream_next(*stream); - auto val = record_value(*rec); + auto stream = authenticatedClient->consumer_stream("test-auth-topic", 0, 0); + auto rec = stream->next(); + auto val = rec->value(); if(val.size() == 6) { std::cout << "[E2E-AUTH] 🔓 Successfully received authenticated payload matrix decrypting exact size match!" << std::endl; diff --git a/tests/test_config.cpp b/tests/test_config.cpp index 64e5ec9..6f81809 100644 --- a/tests/test_config.cpp +++ b/tests/test_config.cpp @@ -3,11 +3,15 @@ int main() { try { - std::cout << "Test: Creating configurations..." << std::endl; - auto cConfig = consumer_config_new(); - consumer_config_max_bytes(*cConfig, 1024); + std::cout << "Test: Creating Consumer Config..." << std::endl; + auto consumerConfigBuilder = ConsumerConfigBuilder::create() + ->max_bytes(1024) + ->disable_continuous(true); - auto pConfig = producer_config_new(); + std::cout << "Test: Creating Producer Config..." << std::endl; + auto producerConfigBuilder = TopicProducerConfigBuilder::create() + ->batch_size(1024) + ->linger(1000); std::cout << "Config Test Passed!" << std::endl; } catch (const std::exception& e) { diff --git a/tests/test_consumer.cpp b/tests/test_consumer.cpp index 3da7d83..6a39f6f 100644 --- a/tests/test_consumer.cpp +++ b/tests/test_consumer.cpp @@ -5,15 +5,15 @@ int main() { try { std::cout << "Test: Connecting to Fluvio..." << std::endl; - auto client = fluvio_connect(); + auto client = Fluvio::connect(); std::cout << "Test: Creating stream for 'test-topic' partition 0..." << std::endl; - auto stream = consumer_stream(*client, "test-topic", 0, 0); // Offset::beginning() + auto stream = client->consumer_stream("test-topic", 0, 0); // Offset::beginning() std::cout << "Test: Fetching one record..." << std::endl; - auto rec = stream_next(*stream); + auto rec = stream->next(); - auto val = record_value(*rec); + auto val = rec->value(); std::cout << "Fetched record value of size: " << val.size() << std::endl; std::cout << "Consumer Test Passed!" << std::endl; diff --git a/tests/test_producer.cpp b/tests/test_producer.cpp index 5da333c..e20efc8 100644 --- a/tests/test_producer.cpp +++ b/tests/test_producer.cpp @@ -5,33 +5,34 @@ int main() { try { std::cout << "Test: Connecting to admin..." << std::endl; - auto admin = fluvio_admin_connect(); + auto admin = FluvioAdmin::connect(); std::cout << "Test: Creating topic 'test-topic' (ignoring if it exists)..." << std::endl; try { - admin_create_topic(*admin, "test-topic", 1, 1); + admin->create_topic("test-topic", 1, 1); } catch (const std::exception& e) { std::cout << "Topic might already exist: " << e.what() << std::endl; } std::cout << "Test: Connecting to Fluvio..." << std::endl; - auto client = fluvio_connect(); + auto client = Fluvio::connect(); std::cout << "Test: Connecting successful." << std::endl; std::cout << "Test: Creating producer for 'test-topic'..." << std::endl; - auto producer = create_producer(*client, "test-topic"); + auto producer = client->topic_producer("test-topic"); std::cout << "Test: Sending record..." << std::endl; uint8_t key[] = {'t', 'e', 's', 't'}; uint8_t val[] = {'1', '2', '3'}; - auto out = producer_send(*producer, + auto out = producer->send( rust::Slice(key, sizeof(key)), - rust::Slice(val, sizeof(val))); + rust::Slice(val, sizeof(val)) + ); std::cout << "Test: Waiting for record confirmation..." << std::endl; - auto meta = produce_output_wait(*out); + auto meta = out->wait(); - producer_flush(*producer); + producer->flush(); std::cout << "Producer Test Passed!" << std::endl; } catch (const std::exception& e) { std::cerr << "Test Failed: " << e.what() << std::endl; From bab329e98a8779791f5a4b26f53d344a43203b3a Mon Sep 17 00:00:00 2001 From: KunoVonHagen <41991675+KunoVonHagen@users.noreply.github.com> Date: Sun, 31 May 2026 16:21:34 +0200 Subject: [PATCH 3/4] Updated examples to object-oriented design --- examples/README.md | 17 ++++++++++++++--- examples/consumer.cpp | 17 +++++++---------- examples/producer.cpp | 16 ++++++++-------- 3 files changed, 29 insertions(+), 21 deletions(-) diff --git a/examples/README.md b/examples/README.md index 10ed396..49cc279 100644 --- a/examples/README.md +++ b/examples/README.md @@ -12,10 +12,10 @@ Before building the examples, ensure you have the following: ## What's Included? -We provide two simple applications to demonstrate the core features of the client: +We provide two simple applications to demonstrate the current object-oriented client API: -- **Producer (`producer.cpp`)**: Connects to the Fluvio cluster as an admin to ensure a topic named `example-topic` exists. It then creates a producer and sends a mock JSON payload representing sensor data. -- **Consumer (`consumer.cpp`)**: Connects to the Fluvio cluster, opens a stream on `example-topic`, and parses the incoming JSON data using `nlohmann::json`. +- **Producer (`producer.cpp`)**: Uses `FluvioAdmin::connect()` to ensure a topic named `example-topic` exists, then uses `Fluvio::connect()` and `client->topic_producer(...)` to send a JSON payload. +- **Consumer (`consumer.cpp`)**: Uses `Fluvio::connect()` and `client->consumer_stream(...)` to read from `example-topic`, then parses the incoming JSON data with `nlohmann::json`. ## Building the Examples @@ -72,3 +72,14 @@ Parsed JSON successfully: Sensor=temp-01 Value=24.5 ``` Congratulations! You've successfully streamed data using C++! + +## API Mapping + +The examples mirror the types exported from `src/lib.rs`: + +- `FluvioAdmin::connect()` creates an admin client. +- `Fluvio::connect()` creates the main client. +- `client->topic_producer(...)` returns a `TopicProducerPool` for sending records. +- `client->consumer_stream(...)` returns a `FluvioStream` for receiving records. +- `record->value()` exposes the fetched payload bytes. + diff --git a/examples/consumer.cpp b/examples/consumer.cpp index 9ea5778..42f636a 100644 --- a/examples/consumer.cpp +++ b/examples/consumer.cpp @@ -9,24 +9,21 @@ int main() { try { fmt::print("Starting Fluvio Consumer Example...\n"); - auto client = fluvio_connect(); - auto consumer = partition_consumer(*client, "example-topic", 0); - - auto stream = consumer_stream(*consumer, 0); // Offset::beginning() + auto client = Fluvio::connect(); + auto stream = client->consumer_stream("example-topic", 0, 0); fmt::print("Waiting for messages...\n"); - - // Fetch one record - auto rec = stream_next(*stream); - auto val = record_value(*rec); + + auto rec = stream->next(); + auto val = rec->value(); std::string payload(val.begin(), val.end()); fmt::print("Received Raw Bytes: {}\n", payload); try { json j = json::parse(payload); - fmt::print("Parsed JSON successfully: Sensor={} Value={}\n", - j["sensor"].get(), + fmt::print("Parsed JSON successfully: Sensor={} Value={}\n", + j["sensor"].get(), j["value"].get()); } catch (const json::parse_error& e) { fmt::print(stderr, "Failed to parse JSON: {}\n", e.what()); diff --git a/examples/producer.cpp b/examples/producer.cpp index 828c2c0..5515ff7 100644 --- a/examples/producer.cpp +++ b/examples/producer.cpp @@ -9,18 +9,17 @@ int main() { try { fmt::print("Starting Fluvio Producer Example...\n"); - auto admin = fluvio_admin_connect(); + auto admin = FluvioAdmin::connect(); try { - admin_create_topic(*admin, "example-topic", 1, 1); + admin->create_topic("example-topic", 1, 1); fmt::print("Created 'example-topic'.\n"); } catch (...) { fmt::print("'example-topic' already exists or creation failed.\n"); } - auto client = fluvio_connect(); - auto producer = create_producer(*client, "example-topic"); + auto client = Fluvio::connect(); + auto producer = client->topic_producer("example-topic"); - // Create a JSON payload json j = { {"sensor", "temp-01"}, {"value", 24.5}, @@ -31,13 +30,14 @@ int main() { fmt::print("Sending JSON: {}\n", payload); uint8_t key[] = {'j', 's', 'o', 'n'}; - auto out = producer_send(*producer, + auto out = producer->send( rust::Slice(key, sizeof(key)), rust::Slice(reinterpret_cast(payload.data()), payload.size()) ); - auto meta = produce_output_wait(*out); - producer_flush(*producer); + auto meta = out->wait(); + (void)meta; + producer->flush(); fmt::print("Record successfully sent to Fluvio!\n"); From a1d0cf6acd82dfd3703ca1f96238bcf4598f2a3d Mon Sep 17 00:00:00 2001 From: KunoVonHagen <41991675+KunoVonHagen@users.noreply.github.com> Date: Sun, 31 May 2026 16:21:59 +0200 Subject: [PATCH 4/4] Updated `README.md` to object-oriented design of API --- README.md | 32 +++++++++++++++++--------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index d9adb6a..09fc2e9 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,8 @@ The client API documentation is written in standard Markdown and generated dynamically into C++ headers. You can find the full API overview in [docs/API.md](docs/API.md). +The public API follows the same object-oriented layout as the original Fluvio client: connect with `Fluvio` or `FluvioAdmin`, then call methods on the returned objects. + ## Installation You can install the client effortlessly without compiling the heavy Rust toolchain by using `vcpkg`. @@ -37,8 +39,8 @@ target_link_libraries(main PRIVATE fluvio_client_cpp::fluvio_client_cpp) #include "fluvio-client-cpp/src/lib.rs.h" int main() { - auto admin = fluvio_admin_connect(); - admin_create_topic(*admin, "a_topic", 1, 1); + auto admin = FluvioAdmin::connect(); + admin->create_topic("a_topic", 1, 1); return 0; } ``` @@ -51,18 +53,20 @@ int main() { #include int main() { - auto client = fluvio_connect(); - auto producer = create_producer(*client, "my-topic"); + auto client = Fluvio::connect(); + auto producer = client->topic_producer("my-topic"); std::string payload = "FOOBAR"; uint8_t key[] = {}; - producer_send(*producer, + auto out = producer->send( rust::Slice(key, 0), rust::Slice(reinterpret_cast(payload.data()), payload.size()) ); - producer_flush(*producer); + auto meta = out->wait(); + (void)meta; + producer->flush(); return 0; } ``` @@ -74,16 +78,13 @@ int main() { #include int main() { - auto client = fluvio_connect(); - auto consumer = partition_consumer(*client, "my-topic", 0); - auto stream = consumer_stream(*consumer, 0); // Offset::beginning + auto client = Fluvio::connect(); + auto stream = client->consumer_stream("my-topic", 0, 0); // Offset::beginning - for (int i = 0; i < 1; i++) { - auto rec = stream_next(*stream); - auto val = record_value(*rec); - std::string payload(val.begin(), val.end()); - std::cout << payload << std::endl; - } + auto rec = stream->next(); + auto val = rec->value(); + std::string payload(val.begin(), val.end()); + std::cout << payload << std::endl; return 0; } @@ -108,4 +109,5 @@ cmake -B build cmake --build build cd build ctest --output-on-failure +cd .. ```