Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
218 changes: 124 additions & 94 deletions src/fs/fuser.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::ffi::OsStr;
use std::sync::Arc;

use crate::fs::r#trait::{CommonFileAttr, DirEntryType, FileAttr, Fs, LockOwner, OpenFlags};
use tracing::{debug, error, instrument};
Expand Down Expand Up @@ -115,7 +116,7 @@ where
F::ReaddirError: Into<i32>,
F::ReleaseError: Into<i32>,
{
fs: F,
fs: Arc<F>,
runtime: tokio::runtime::Handle,
}

Expand All @@ -141,7 +142,10 @@ where
const SHAMEFUL_TTL: std::time::Duration = std::time::Duration::from_secs(1);

pub fn new(fs: F, runtime: tokio::runtime::Handle) -> Self {
Self { fs, runtime }
Self {
fs: Arc::new(fs),
runtime,
}
}
}

Expand All @@ -154,6 +158,15 @@ where
F::ReaddirError: Into<i32>,
F::ReleaseError: Into<i32>,
{
fn init(
&mut self,
_req: &fuser::Request<'_>,
_config: &mut fuser::KernelConfig,
) -> Result<(), libc::c_int> {
self.runtime.block_on(self.fs.init());
Ok(())
}

#[instrument(name = "FuserAdapter::lookup", skip(self, _req, reply))]
fn lookup(
&mut self,
Expand All @@ -162,20 +175,22 @@ where
name: &OsStr,
reply: fuser::ReplyEntry,
) {
match self.runtime.block_on(self.fs.lookup(parent, name)) {
Ok(attr) => {
// TODO(markovejnovic): Passing generation = 0 here is a recipe for disaster.
// Someone with A LOT of files will likely see inode reuse which will lead to a
// disaster.
let f_attr: fuser::FileAttr = attr.into();
debug!(?f_attr, "replying...");
reply.entry(&Self::SHAMEFUL_TTL, &f_attr, 0);
}
Err(e) => {
debug!(error = %e, "replying error");
reply.error(e.into());
let fs = Arc::clone(&self.fs);
let name = name.to_owned();
let ttl = Self::SHAMEFUL_TTL;
self.runtime.spawn(async move {
match fs.lookup(parent, &name).await {
Ok(attr) => {
let f_attr: fuser::FileAttr = attr.into();
debug!(?f_attr, "replying...");
reply.entry(&ttl, &f_attr, 0);
}
Err(e) => {
debug!(error = %e, "replying error");
reply.error(e.into());
}
}
}
});
}

#[instrument(name = "FuserAdapter::getattr", skip(self, _req, fh, reply))]
Expand All @@ -186,16 +201,20 @@ where
fh: Option<u64>,
reply: fuser::ReplyAttr,
) {
match self.runtime.block_on(self.fs.getattr(ino, fh)) {
Ok(attr) => {
debug!(?attr, "replying...");
reply.attr(&Self::SHAMEFUL_TTL, &attr.into());
}
Err(e) => {
debug!(error = %e, "replying error");
reply.error(e.into());
let fs = Arc::clone(&self.fs);
let ttl = Self::SHAMEFUL_TTL;
self.runtime.spawn(async move {
match fs.getattr(ino, fh).await {
Ok(attr) => {
debug!(?attr, "replying...");
reply.attr(&ttl, &attr.into());
}
Err(e) => {
debug!(error = %e, "replying error");
reply.error(e.into());
}
}
}
});
}

#[instrument(name = "FuserAdapter::readdir", skip(self, _req, _fh, offset, reply))]
Expand All @@ -207,54 +226,61 @@ where
offset: i64,
mut reply: fuser::ReplyDirectory,
) {
let entries = match self.runtime.block_on(self.fs.readdir(ino)) {
Ok(entries) => entries,
Err(e) => {
debug!(error = %e, "replying error");
reply.error(e.into());
return;
}
};

#[expect(
clippy::cast_possible_truncation,
reason = "fuser offset is i64 but always non-negative"
)]
for (i, entry) in entries
.iter()
.enumerate()
.skip(offset.cast_unsigned() as usize)
{
let kind: fuser::FileType = entry.kind.into();
let Ok(idx): Result<i64, _> = (i + 1).try_into() else {
error!("Directory entry index {} too large for fuser", i + 1);
reply.error(libc::EIO);
return;
let fs = Arc::clone(&self.fs);
self.runtime.spawn(async move {
let entries = match fs.readdir(ino).await {
Ok(entries) => entries,
Err(e) => {
debug!(error = %e, "replying error");
reply.error(e.into());
return;
}
};

debug!(?entry, "adding entry to reply...");
if reply.add(entry.ino, idx, kind, &entry.name) {
debug!("buffer full for now, stopping readdir");
break;
#[expect(
clippy::cast_possible_truncation,
reason = "fuser offset is i64 but always non-negative"
)]
for (i, entry) in entries
.iter()
.enumerate()
.skip(offset.cast_unsigned() as usize)
{
let kind: fuser::FileType = entry.kind.into();
let Ok(idx): Result<i64, _> = (i + 1).try_into() else {
error!("Directory entry index {} too large for fuser", i + 1);
reply.error(libc::EIO);
return;
};

debug!(?entry, "adding entry to reply...");
if reply.add(entry.ino, idx, kind, &entry.name) {
debug!("buffer full for now, stopping readdir");
break;
}
}
}

debug!("finalizing reply...");
reply.ok();
debug!("finalizing reply...");
reply.ok();
});
}

#[instrument(name = "FuserAdapter::open", skip(self, _req, flags, reply))]
fn open(&mut self, _req: &fuser::Request<'_>, ino: u64, flags: i32, reply: fuser::ReplyOpen) {
match self.runtime.block_on(self.fs.open(ino, flags.into())) {
Ok(open_file) => {
debug!(handle = open_file.handle, "replying...");
reply.opened(open_file.handle, 0);
}
Err(e) => {
debug!(error = %e, "replying error");
reply.error(e.into());
let fs = Arc::clone(&self.fs);
let flags: OpenFlags = flags.into();
self.runtime.spawn(async move {
match fs.open(ino, flags).await {
Ok(open_file) => {
debug!(handle = open_file.handle, "replying...");
reply.opened(open_file.handle, 0);
}
Err(e) => {
debug!(error = %e, "replying error");
reply.error(e.into());
}
}
}
});
}

#[instrument(
Expand All @@ -272,25 +298,24 @@ where
lock_owner: Option<u64>,
reply: fuser::ReplyData,
) {
let fs = Arc::clone(&self.fs);
let flags: OpenFlags = flags.into();
let lock_owner = lock_owner.map(LockOwner);
match self.runtime.block_on(self.fs.read(
ino,
fh,
offset.cast_unsigned(),
size,
flags,
lock_owner,
)) {
Ok(data) => {
debug!(read_bytes = data.len(), "replying...");
reply.data(&data);
}
Err(e) => {
debug!(error = %e, "replying error");
reply.error(e.into());
self.runtime.spawn(async move {
match fs
.read(ino, fh, offset.cast_unsigned(), size, flags, lock_owner)
.await
{
Ok(data) => {
debug!(read_bytes = data.len(), "replying...");
reply.data(&data);
}
Err(e) => {
debug!(error = %e, "replying error");
reply.error(e.into());
}
}
}
});
}

#[instrument(name = "FuserAdapter::release", skip(self, _req, _lock_owner, reply))]
Expand All @@ -304,30 +329,35 @@ where
flush: bool,
reply: fuser::ReplyEmpty,
) {
match self
.runtime
.block_on(self.fs.release(ino, fh, flags.into(), flush))
{
Ok(()) => {
debug!("replying ok");
reply.ok();
}
Err(e) => {
debug!(error = %e, "replying error");
reply.error(e.into());
let fs = Arc::clone(&self.fs);
let flags: OpenFlags = flags.into();
self.runtime.spawn(async move {
match fs.release(ino, fh, flags, flush).await {
Ok(()) => {
debug!("replying ok");
reply.ok();
}
Err(e) => {
debug!(error = %e, "replying error");
reply.error(e.into());
}
}
}
});
}

#[instrument(name = "FuserAdapter::forget", skip(self, _req, nlookup))]
fn forget(&mut self, _req: &fuser::Request<'_>, ino: u64, nlookup: u64) {
self.runtime.block_on(self.fs.forget(ino, nlookup));
let fs = Arc::clone(&self.fs);
self.runtime.spawn(async move {
fs.forget(ino, nlookup).await;
});
}

#[instrument(name = "FuserAdapter::statfs", skip(self, _req, _ino, reply))]
fn statfs(&mut self, _req: &fuser::Request<'_>, _ino: u64, reply: fuser::ReplyStatfs) {
self.runtime.block_on(async {
match self.fs.statfs().await {
let fs = Arc::clone(&self.fs);
self.runtime.spawn(async move {
match fs.statfs().await {
Ok(statvfs) => {
debug!(?statvfs, "replying...");
reply.statfs(
Expand Down
Loading