Skip to content

Commit ff12792

Browse files
tisonkunTennyZhuang
andcommitted
refactor: rework logger API
Signed-off-by: tison <wander4096@gmail.com> Co-authored-by: TennyZhuang <zty0826@gmail.com> Signed-off-by: tison <wander4096@gmail.com>
1 parent 7ec82ae commit ff12792

18 files changed

Lines changed: 218 additions & 331 deletions

File tree

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file.
44

55
## Unreleased
66

7+
### Breaking changes
8+
9+
* `Append` has no more `exit` method. Users should compose `logforth::core::default_logger().flush()` with their own graceful shutdown logic.
10+
* `Async` appender's `flush` method is now blocking until all buffered logs are flushed by worker threads. Any errors during flushing will be propagated back to the `flush` caller.
11+
712
## [0.29.1] 2025-11-03
813

914
### Bug fixes

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ insta = { version = "1.43.2" }
6060
jiff = { version = "0.2" }
6161
libc = { version = "0.2.162" }
6262
log = { version = "0.4.27", features = ["kv_std", "kv_sval"] }
63+
oneshot = { version = "0.1.11", default-features = false, features = ["std"] }
6364
opentelemetry = { version = "0.31.0", default-features = false }
6465
opentelemetry-otlp = { version = "0.31.0", default-features = false }
6566
opentelemetry_sdk = { version = "0.31.0", default-features = false }

appenders/async/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ rustdoc-args = ["--cfg", "docsrs"]
3535
arc-swap = { workspace = true }
3636
crossbeam-channel = { workspace = true }
3737
logforth-core = { workspace = true }
38+
oneshot = { workspace = true }
3839

3940
[lints]
4041
workspace = true

appenders/async/src/append.rs

Lines changed: 14 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::sync::Arc;
16-
1715
use logforth_core::Append;
1816
use logforth_core::Diagnostic;
1917
use logforth_core::Error;
@@ -31,10 +29,7 @@ use crate::worker::Worker;
3129
/// A composable appender, logging and flushing asynchronously.
3230
#[derive(Debug)]
3331
pub struct Async {
34-
appends: Arc<[Box<dyn Append>]>,
35-
overflow: Overflow,
3632
state: AsyncState,
37-
trap: Arc<dyn Trap>,
3833
}
3934

4035
impl Append for Async {
@@ -46,38 +41,24 @@ impl Append for Async {
4641
d.visit(&mut collector)?;
4742
}
4843

49-
let overflow = self.overflow;
5044
let task = Task::Log {
51-
appends: self.appends.clone(),
5245
record: Box::new(record.to_owned()),
5346
diags: diagnostics,
5447
};
55-
self.state.send_task(task, overflow)
48+
self.state.send_task(task)
5649
}
5750

5851
fn flush(&self) -> Result<(), Error> {
59-
let overflow = self.overflow;
60-
let task = Task::Flush {
61-
appends: self.appends.clone(),
62-
};
63-
self.state.send_task(task, overflow)
64-
}
52+
let (done_tx, done_rx) = oneshot::channel();
53+
54+
let task = Task::Flush { done: done_tx };
55+
self.state.send_task(task)?;
6556

66-
fn exit(&self) -> Result<(), Error> {
67-
// If the program is tearing down, this will be the final flush. `crossbeam`
68-
// uses thread-local internally, which is not supported in `atexit` callback.
69-
// This can be bypassed by flushing sinks directly on the current thread, but
70-
// before we do that we have to join the thread to ensure that any pending log
71-
// tasks are completed.
72-
//
73-
// @see https://github.com/SpriteOvO/spdlog-rs/issues/64
74-
self.state.destroy();
75-
for append in self.appends.iter() {
76-
if let Err(err) = append.exit() {
77-
self.trap.trap(&err);
78-
}
57+
match done_rx.recv() {
58+
Ok(None) => Ok(()),
59+
Ok(Some(err)) => Err(err),
60+
Err(err) => Err(Error::new("worker exited before completing flush").with_source(err)),
7961
}
80-
Ok(())
8162
}
8263
}
8364

@@ -86,7 +67,7 @@ pub struct AsyncBuilder {
8667
thread_name: String,
8768
appends: Vec<Box<dyn Append>>,
8869
buffered_lines_limit: Option<usize>,
89-
trap: Arc<dyn Trap>,
70+
trap: Box<dyn Trap>,
9071
overflow: Overflow,
9172
}
9273

@@ -97,7 +78,7 @@ impl AsyncBuilder {
9778
thread_name: thread_name.into(),
9879
appends: vec![],
9980
buffered_lines_limit: None,
100-
trap: Arc::new(BestEffortTrap::default()),
81+
trap: Box::new(BestEffortTrap::default()),
10182
overflow: Overflow::Block,
10283
}
10384
}
@@ -122,7 +103,6 @@ impl AsyncBuilder {
122103

123104
/// Set the trap for this async appender.
124105
pub fn trap(mut self, trap: impl Into<Box<dyn Trap>>) -> Self {
125-
let trap = trap.into();
126106
self.trap = trap.into();
127107
self
128108
}
@@ -143,26 +123,19 @@ impl AsyncBuilder {
143123
overflow,
144124
} = self;
145125

146-
let appends = appends.into_boxed_slice().into();
147-
148126
let (sender, receiver) = match buffered_lines_limit {
149127
Some(limit) => crossbeam_channel::bounded(limit),
150128
None => crossbeam_channel::unbounded(),
151129
};
152130

153-
let worker = Worker::new(receiver, trap.clone());
131+
let worker = Worker::new(appends, receiver, trap);
154132
let thread_handle = std::thread::Builder::new()
155133
.name(thread_name)
156134
.spawn(move || worker.run())
157135
.expect("failed to spawn async appender thread");
158-
let state = AsyncState::new(sender, thread_handle);
159136

160-
Async {
161-
appends,
162-
overflow,
163-
state,
164-
trap,
165-
}
137+
let state = AsyncState::new(overflow, sender, thread_handle);
138+
Async { state }
166139
}
167140
}
168141

appenders/async/src/lib.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,7 @@
1616
1717
#![cfg_attr(docsrs, feature(doc_cfg))]
1818

19-
use std::sync::Arc;
20-
21-
use logforth_core::Append;
19+
use logforth_core::Error;
2220
use logforth_core::kv;
2321
use logforth_core::record::RecordOwned;
2422

@@ -31,12 +29,11 @@ pub use self::append::AsyncBuilder;
3129

3230
enum Task {
3331
Log {
34-
appends: Arc<[Box<dyn Append>]>,
3532
record: Box<RecordOwned>,
3633
diags: Vec<(kv::KeyOwned, kv::ValueOwned)>,
3734
},
3835
Flush {
39-
appends: Arc<[Box<dyn Append>]>,
36+
done: oneshot::Sender<Option<Error>>,
4037
},
4138
}
4239

appenders/async/src/state.rs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,23 +27,27 @@ pub(crate) struct AsyncState(ArcSwapOption<State>);
2727

2828
#[derive(Debug)]
2929
struct State {
30+
overflow: Overflow,
3031
sender: Sender<Task>,
3132
handle: JoinHandle<()>,
3233
}
3334

3435
impl AsyncState {
35-
pub(crate) fn new(sender: Sender<Task>, handle: JoinHandle<()>) -> Self {
36-
let state = State { sender, handle };
37-
Self(ArcSwapOption::from(Some(Arc::new(state))))
36+
pub(crate) fn new(overflow: Overflow, sender: Sender<Task>, handle: JoinHandle<()>) -> Self {
37+
Self(ArcSwapOption::from(Some(Arc::new(State {
38+
overflow,
39+
sender,
40+
handle,
41+
}))))
3842
}
3943

40-
pub(crate) fn send_task(&self, task: Task, overflow: Overflow) -> Result<(), Error> {
44+
pub(crate) fn send_task(&self, task: Task) -> Result<(), Error> {
4145
let state = self.0.load();
4246
// SAFETY: state is always Some before dropped.
4347
let state = state.as_ref().unwrap();
4448
let sender = &state.sender;
4549

46-
match overflow {
50+
match state.overflow {
4751
Overflow::Block => sender.send(task).map_err(|err| {
4852
Error::new(match err.0 {
4953
Task::Log { .. } => "failed to send log task to async appender",
@@ -66,7 +70,11 @@ impl AsyncState {
6670
pub(crate) fn destroy(&self) {
6771
if let Some(state) = self.0.swap(None) {
6872
// SAFETY: state has always one strong count before swapped.
69-
let State { sender, handle } = Arc::into_inner(state).unwrap();
73+
let State {
74+
overflow: _,
75+
sender,
76+
handle,
77+
} = Arc::into_inner(state).unwrap();
7078

7179
// drop our sender, threads will break the loop after receiving and processing
7280
drop(sender);

appenders/async/src/worker.rs

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,8 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::sync::Arc;
16-
1715
use crossbeam_channel::Receiver;
16+
use logforth_core::Append;
1817
use logforth_core::Diagnostic;
1918
use logforth_core::Error;
2019
use logforth_core::Trap;
@@ -24,25 +23,34 @@ use logforth_core::kv::Visitor;
2423
use crate::Task;
2524

2625
pub(crate) struct Worker {
26+
appends: Vec<Box<dyn Append>>,
2727
receiver: Receiver<Task>,
28-
trap: Arc<dyn Trap>,
28+
trap: Box<dyn Trap>,
2929
}
3030

3131
impl Worker {
32-
pub(crate) fn new(receiver: Receiver<Task>, trap: Arc<dyn Trap>) -> Self {
33-
Self { receiver, trap }
32+
pub(crate) fn new(
33+
appends: Vec<Box<dyn Append>>,
34+
receiver: Receiver<Task>,
35+
trap: Box<dyn Trap>,
36+
) -> Self {
37+
Self {
38+
appends,
39+
receiver,
40+
trap,
41+
}
3442
}
3543

3644
pub(crate) fn run(self) {
37-
let Self { receiver, trap } = self;
45+
let Self {
46+
appends,
47+
receiver,
48+
trap,
49+
} = self;
3850

3951
while let Ok(task) = receiver.recv() {
4052
match task {
41-
Task::Log {
42-
appends,
43-
record,
44-
diags,
45-
} => {
53+
Task::Log { record, diags } => {
4654
let diags: &[Box<dyn Diagnostic>] = if diags.is_empty() {
4755
&[]
4856
} else {
@@ -51,18 +59,23 @@ impl Worker {
5159
let record = record.as_record();
5260
for append in appends.iter() {
5361
if let Err(err) = append.append(&record, diags) {
54-
let err = Error::new("failed to append record").set_source(err);
62+
let err = Error::new("failed to append record").with_source(err);
5563
trap.trap(&err);
5664
}
5765
}
5866
}
59-
Task::Flush { appends } => {
67+
Task::Flush { done } => {
68+
let mut error = None;
6069
for append in appends.iter() {
6170
if let Err(err) = append.flush() {
62-
let err = Error::new("failed to flush").set_source(err);
63-
trap.trap(&err);
71+
error = Some(
72+
error
73+
.unwrap_or_else(|| Error::new("failed to flush appender"))
74+
.with_source(err),
75+
);
6476
}
6577
}
78+
let _ = done.send(error);
6679
}
6780
}
6881
}

0 commit comments

Comments
 (0)