Skip to content

Commit 9f4e0da

Browse files
committed
refactor!: rework core api
Signed-off-by: tison <wander4096@gmail.com>
1 parent 4a5cd53 commit 9f4e0da

18 files changed

Lines changed: 180 additions & 175 deletions

File tree

Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ anyhow = { version = "1.0" }
5757
arc-swap = { version = "1.7.1" }
5858
clap = { version = "4.5.49", features = ["derive"] }
5959
colored = { version = "3.0" }
60-
crossbeam-channel = { version = "0.5.15" }
6160
fastrace = { version = "0.7" }
6261
fasyslog = { version = "1.0.0" }
6362
insta = { version = "1.43.2" }

appenders/async/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ rustdoc-args = ["--cfg", "docsrs"]
3333

3434
[dependencies]
3535
arc-swap = { workspace = true }
36-
crossbeam-channel = { workspace = true }
3736
logforth-core = { workspace = true }
3837
oneshot = { workspace = true }
3938

appenders/async/src/append.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,10 @@ use logforth_core::record::Record;
2222
use logforth_core::trap::BestEffortTrap;
2323

2424
use crate::Overflow;
25+
use crate::Sender;
2526
use crate::Task;
2627
use crate::state::AsyncState;
28+
use crate::worker::RecordOwned;
2729
use crate::worker::Worker;
2830

2931
/// A composable appender, logging and flushing asynchronously.
@@ -64,7 +66,7 @@ impl Append for Async {
6466
}
6567

6668
let task = Task::Log {
67-
record: Box::new(record.to_owned()),
69+
record: Box::new(RecordOwned::from_record(record)),
6870
diags: diagnostics,
6971
};
7072
self.state.send_task(task)
@@ -146,8 +148,14 @@ impl AsyncBuilder {
146148
} = self;
147149

148150
let (sender, receiver) = match buffered_lines_limit {
149-
Some(limit) => crossbeam_channel::bounded(limit),
150-
None => crossbeam_channel::unbounded(),
151+
Some(limit) => {
152+
let (tx, rx) = std::sync::mpsc::sync_channel(limit);
153+
(Sender::Bounded(tx), rx)
154+
}
155+
None => {
156+
let (tx, rx) = std::sync::mpsc::channel();
157+
(Sender::Unbounded(tx), rx)
158+
}
151159
};
152160

153161
let worker = Worker::new(appends, receiver, trap);

appenders/async/src/lib.rs

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
use logforth_core::Error;
2121
use logforth_core::kv;
22-
use logforth_core::record::RecordOwned;
2322

2423
mod append;
2524
mod state;
@@ -30,7 +29,7 @@ pub use self::append::AsyncBuilder;
3029

3130
enum Task {
3231
Log {
33-
record: Box<RecordOwned>,
32+
record: Box<worker::RecordOwned>,
3433
diags: Vec<(kv::KeyOwned, kv::ValueOwned)>,
3534
},
3635
Flush {
@@ -45,3 +44,36 @@ enum Overflow {
4544
/// Drops the incoming operation.
4645
DropIncoming,
4746
}
47+
48+
#[derive(Clone)]
49+
enum Sender<T> {
50+
Unbounded(std::sync::mpsc::Sender<T>),
51+
Bounded(std::sync::mpsc::SyncSender<T>),
52+
}
53+
54+
impl<T> std::fmt::Debug for Sender<T> {
55+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56+
match self {
57+
Sender::Unbounded(tx) => tx.fmt(f),
58+
Sender::Bounded(tx) => tx.fmt(f),
59+
}
60+
}
61+
}
62+
63+
impl<T> Sender<T> {
64+
fn send(&self, value: T) -> Result<(), std::sync::mpsc::SendError<T>> {
65+
match self {
66+
Sender::Unbounded(s) => s.send(value),
67+
Sender::Bounded(s) => s.send(value),
68+
}
69+
}
70+
71+
fn try_send(&self, value: T) -> Result<(), std::sync::mpsc::TrySendError<T>> {
72+
match self {
73+
Sender::Unbounded(s) => s
74+
.send(value)
75+
.map_err(|e| std::sync::mpsc::TrySendError::Disconnected(e.0)),
76+
Sender::Bounded(s) => s.try_send(value),
77+
}
78+
}
79+
}

appenders/async/src/state.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@ use std::sync::Arc;
1616
use std::thread::JoinHandle;
1717

1818
use arc_swap::ArcSwapOption;
19-
use crossbeam_channel::Sender;
2019
use logforth_core::Error;
2120

2221
use crate::Overflow;
22+
use crate::Sender;
2323
use crate::Task;
2424

2525
#[derive(Debug)]
@@ -56,8 +56,8 @@ impl AsyncState {
5656
}),
5757
Overflow::DropIncoming => match sender.try_send(task) {
5858
Ok(()) => Ok(()),
59-
Err(crossbeam_channel::TrySendError::Full(_)) => Ok(()),
60-
Err(crossbeam_channel::TrySendError::Disconnected(task)) => {
59+
Err(std::sync::mpsc::TrySendError::Full(_)) => Ok(()),
60+
Err(std::sync::mpsc::TrySendError::Disconnected(task)) => {
6161
Err(Error::new(match task {
6262
Task::Log { .. } => "failed to send log task to async appender",
6363
Task::Flush { .. } => "failed to send flush task to async appender",

appenders/async/src/worker.rs

Lines changed: 75 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,19 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use crossbeam_channel::Receiver;
15+
use std::borrow::Cow;
16+
use std::sync::mpsc::Receiver;
17+
use std::time::SystemTime;
18+
1619
use logforth_core::Append;
1720
use logforth_core::Diagnostic;
1821
use logforth_core::Error;
1922
use logforth_core::Trap;
2023
use logforth_core::kv;
24+
use logforth_core::kv::KeyValues;
2125
use logforth_core::kv::Visitor;
26+
use logforth_core::record::Level;
27+
use logforth_core::record::Record;
2228

2329
use crate::Task;
2430

@@ -56,7 +62,18 @@ impl Worker {
5662
} else {
5763
&[Box::new(OwnedDiagnostic(diags))]
5864
};
59-
let record = record.as_record();
65+
let payload = format_args!("{}", record.payload);
66+
let record = Record::builder()
67+
.time(record.now)
68+
.level(record.level)
69+
.target(record.target.as_ref())
70+
.module_path(record.module_path.as_deref())
71+
.file(record.file.as_deref())
72+
.line(record.line)
73+
.column(record.column)
74+
.payload(payload)
75+
.key_values(KeyValues::from(record.kvs.as_slice()))
76+
.build();
6077
for append in appends.iter() {
6178
if let Err(err) = append.append(&record, diags) {
6279
let err = Error::new("failed to append record").with_source(err);
@@ -93,3 +110,59 @@ impl Diagnostic for OwnedDiagnostic {
93110
Ok(())
94111
}
95112
}
113+
114+
#[derive(Clone, Debug)]
115+
pub(crate) struct RecordOwned {
116+
// the observed time
117+
now: SystemTime,
118+
119+
// the metadata
120+
level: Level,
121+
target: Cow<'static, str>,
122+
module_path: Option<Cow<'static, str>>,
123+
file: Option<Cow<'static, str>>,
124+
line: Option<u32>,
125+
column: Option<u32>,
126+
127+
// the payload
128+
payload: Cow<'static, str>,
129+
130+
// structural logging
131+
kvs: Vec<(kv::KeyOwned, kv::ValueOwned)>,
132+
}
133+
134+
impl RecordOwned {
135+
pub fn from_record(record: &Record) -> Self {
136+
RecordOwned {
137+
now: record.time(),
138+
level: record.level(),
139+
target: if let Some(target) = record.target_static() {
140+
Cow::Borrowed(target)
141+
} else {
142+
Cow::Owned(record.target().to_string())
143+
},
144+
module_path: if let Some(module_path) = record.module_path_static() {
145+
Some(Cow::Borrowed(module_path))
146+
} else {
147+
record.module_path().map(|s| Cow::Owned(s.to_string()))
148+
},
149+
file: if let Some(file) = record.file_static() {
150+
Some(Cow::Borrowed(file))
151+
} else {
152+
record.file().map(|s| Cow::Owned(s.to_string()))
153+
},
154+
line: record.line(),
155+
column: record.column(),
156+
payload: if let Some(payload) = record.payload_static() {
157+
Cow::Borrowed(payload)
158+
} else {
159+
Cow::Owned(record.payload().to_string())
160+
},
161+
kvs: record
162+
.key_values()
163+
.iter()
164+
.map(|(k, v)| (k.to_owned(), v.to_owned()))
165+
.collect(),
166+
}
167+
}
168+
}

appenders/fastrace/src/lib.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,11 @@ pub struct FastraceEvent {}
4949

5050
impl Append for FastraceEvent {
5151
fn append(&self, record: &Record, diags: &[Box<dyn Diagnostic>]) -> Result<(), Error> {
52-
let message = record.payload().to_owned();
52+
let message = if let Some(msg) = record.payload_static() {
53+
Cow::Borrowed(msg)
54+
} else {
55+
Cow::Owned(record.payload().to_string())
56+
};
5357

5458
let mut collector = KvCollector { kv: Vec::new() };
5559
record.key_values().visit(&mut collector)?;

appenders/file/tests/global_file_limit.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ fn test_global_file_count_limit() {
4545
writer
4646
.append(
4747
&Record::builder()
48-
.payload(format!("Log entry {}: {}\n", i, "A".repeat(50)))
48+
.payload(format_args!("Log entry {}: {}\n", i, "A".repeat(50)))
4949
.build(),
5050
&[],
5151
)
@@ -156,7 +156,7 @@ fn create_logs(dir: &Path, max_files: usize, max_size: usize, filename: &str, co
156156
writer
157157
.append(
158158
&Record::builder()
159-
.payload(format!(
159+
.payload(format_args!(
160160
"Prefix {}, Log {}: {}\n",
161161
filename,
162162
i,

appenders/journald/src/field.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,14 @@ impl PutAsFieldValue for &str {
8282
}
8383
}
8484

85+
impl PutAsFieldValue for std::fmt::Arguments<'_> {
86+
fn put_field_value(self, buffer: &mut Vec<u8>) {
87+
// SAFETY: no more than an allocate-less version
88+
// buffer.extend_from_slice(format!("{}", self))
89+
write!(buffer, "{self}").unwrap();
90+
}
91+
}
92+
8593
impl PutAsFieldValue for Value<'_> {
8694
fn put_field_value(self, buffer: &mut Vec<u8>) {
8795
// SAFETY: no more than an allocate-less version

appenders/opentelemetry/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ impl Append for OpentelemetryLog {
245245
} else if let Some(payload) = record.payload_static() {
246246
log_record.set_body(AnyValue::from(payload));
247247
} else {
248-
log_record.set_body(AnyValue::from(record.payload().to_owned()));
248+
log_record.set_body(AnyValue::from(record.payload().to_string()));
249249
}
250250

251251
if let Some(module_path) = record.module_path_static() {

0 commit comments

Comments
 (0)