patch: simplify FUSE

Signed-off-by: ClSlaid <cailue@bupt.edu.cn>
This commit is contained in:
ClSlaid
2023-10-16 00:51:12 +08:00
parent 796e2ec825
commit 9adda25e00
6 changed files with 387 additions and 598 deletions

View File

@@ -24,22 +24,21 @@ use std::{
path::{Path, PathBuf},
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
mpsc::{Receiver, Sender},
Arc,
},
time::{Duration, SystemTime},
};
use dashmap::DashMap;
use fuser::{ReplyDirectory, Request, FUSE_ROOT_ID};
use fuser::{ReplyDirectory, FUSE_ROOT_ID};
use hbb_common::{
bytes::{Buf, Bytes},
log,
};
use parking_lot::{Condvar, Mutex, RwLock};
use rayon::prelude::*;
use parking_lot::{Condvar, Mutex};
use utf16string::WStr;
use crate::{ClipboardFile, CliprdrError};
use crate::{send_data, ClipboardFile, CliprdrError};
#[cfg(target_os = "linux")]
use super::LDAP_EPOCH_DELTA;
@@ -54,175 +53,71 @@ const PERM_READ: u16 = 0o444;
/// max length of file name
const MAX_NAME_LEN: usize = 255;
// fuse server state
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Status {
// active and ready for all incoming requests
Active,
// marking and waiting for all FDs to be closed
// only serve read requests
Gc,
// gc completes
// serve no requests
GcComplete,
// fetching new files from remote
// serve no requests
// this state is to make sure only one fetching is running
Fetching,
// fetched, building new FS
Building,
}
#[derive(Debug, Default)]
struct PendingRequest {
content: Mutex<Option<ClipboardFile>>,
cvar: Condvar,
}
impl PendingRequest {
pub fn new() -> Self {
Self {
content: Mutex::new(None),
cvar: Condvar::new(),
}
}
pub fn recv_timeout(&self, timeout: Duration) -> Result<ClipboardFile, std::io::Error> {
let mut guard = self.content.lock();
let res = self.cvar.wait_for(&mut guard, timeout);
if res.timed_out() {
Err(std::io::Error::new(std::io::ErrorKind::TimedOut, "timeout"))
} else {
let content = guard.take();
match content {
Some(content) => Ok(content),
None => Err(std::io::Error::new(std::io::ErrorKind::Other, "no content")),
}
}
}
pub fn set(&self, content: ClipboardFile) {
let mut guard = self.content.lock();
let _ = guard.insert(content);
self.cvar.notify_all();
}
}
/// clipboard message dispatcher
#[derive(Debug, Default)]
struct CliprdrTxnDispatcher {
txn_handler: DashMap<(i32, Option<i32>), Arc<PendingRequest>>,
}
impl CliprdrTxnDispatcher {
pub fn send(&self, conn_id: i32, request: ClipboardFile) -> Arc<PendingRequest> {
let stream_id = match &request {
ClipboardFile::FormatDataRequest { .. } => None,
ClipboardFile::FileContentsRequest { stream_id, .. } => Some(stream_id),
_ => unreachable!(),
};
let req = Arc::new(PendingRequest::new());
self.txn_handler
.insert((conn_id, stream_id.copied()), req.clone());
log::debug!(
"send request to conn_id={}, stream_id={:?}",
conn_id,
stream_id
);
crate::send_data(conn_id, request);
req
}
pub fn recv(&self, conn_id: i32, response: ClipboardFile) {
let stream_id = match &response {
ClipboardFile::FormatDataResponse { .. } => None,
ClipboardFile::FileContentsResponse { stream_id, .. } => Some(stream_id),
_ => unreachable!(),
};
let key = (conn_id, stream_id.cloned());
log::debug!("recv response for {:?}", key);
match self.txn_handler.remove(&key) {
Some((_, tx)) => tx.set(response),
None => log::warn!("no request found for {:?}", key),
}
}
}
/// this is a proxy type
/// to avoid occupy FuseServer with &mut self
/// fuse client
/// this is a proxy to the fuse server
#[derive(Debug)]
pub(crate) struct FuseClient {
server: Arc<FuseServer>,
}
impl FuseClient {
pub fn new(server: Arc<FuseServer>) -> Self {
Self { server }
}
pub struct FuseClient {
server: Arc<Mutex<FuseServer>>,
}
impl fuser::Filesystem for FuseClient {
fn init(
&mut self,
_req: &fuser::Request<'_>,
_config: &mut fuser::KernelConfig,
req: &fuser::Request<'_>,
config: &mut fuser::KernelConfig,
) -> Result<(), libc::c_int> {
log::debug!("init fuse server");
self.server.init();
Ok(())
let mut server = self.server.lock();
server.init(req, config)
}
fn lookup(
&mut self,
_req: &Request,
req: &fuser::Request<'_>,
parent: u64,
name: &std::ffi::OsStr,
reply: fuser::ReplyEntry,
) {
log::debug!("lookup: parent={}, name={:?}", parent, name);
self.server.look_up(parent, name, reply)
let mut server = self.server.lock();
server.lookup(req, parent, name, reply)
}
fn opendir(&mut self, _req: &Request<'_>, ino: u64, flags: i32, reply: fuser::ReplyOpen) {
log::debug!("opendir: ino={}, flags={}", ino, flags);
self.server.opendir(ino, flags, reply)
fn opendir(&mut self, req: &fuser::Request<'_>, ino: u64, flags: i32, reply: fuser::ReplyOpen) {
let mut server = self.server.lock();
server.opendir(req, ino, flags, reply)
}
fn readdir(
&mut self,
_req: &Request<'_>,
req: &fuser::Request<'_>,
ino: u64,
fh: u64,
offset: i64,
reply: ReplyDirectory,
reply: fuser::ReplyDirectory,
) {
log::debug!("readdir: ino={}, fh={}, offset={}", ino, fh, offset);
self.server.readdir(ino, fh, offset, reply)
let mut server = self.server.lock();
server.readdir(req, ino, fh, offset, reply)
}
fn releasedir(
&mut self,
_req: &Request<'_>,
req: &fuser::Request<'_>,
ino: u64,
fh: u64,
flags: i32,
_flags: i32,
reply: fuser::ReplyEmpty,
) {
log::debug!("releasedir: ino={}, fh={}, flags={}", ino, fh, flags);
self.server.releasedir(ino, fh, flags, reply)
let mut server = self.server.lock();
server.releasedir(req, ino, fh, _flags, reply)
}
fn open(&mut self, _req: &Request<'_>, ino: u64, flags: i32, reply: fuser::ReplyOpen) {
log::debug!("open: ino={}, flags={}", ino, flags);
self.server.open(ino, flags, reply)
fn open(&mut self, req: &fuser::Request<'_>, ino: u64, flags: i32, reply: fuser::ReplyOpen) {
let mut server = self.server.lock();
server.open(req, ino, flags, reply)
}
fn read(
&mut self,
_req: &Request<'_>,
req: &fuser::Request<'_>,
ino: u64,
fh: u64,
offset: i64,
@@ -231,31 +126,22 @@ impl fuser::Filesystem for FuseClient {
lock_owner: Option<u64>,
reply: fuser::ReplyData,
) {
log::debug!(
"read: ino={}, fh={}, offset={}, size={}, flags={}",
ino,
fh,
offset,
size,
flags
);
self.server
.read(ino, fh, offset, size, flags, lock_owner, reply)
let mut server = self.server.lock();
server.read(req, ino, fh, offset, size, flags, lock_owner, reply)
}
fn release(
&mut self,
_req: &Request<'_>,
req: &fuser::Request<'_>,
ino: u64,
fh: u64,
flags: i32,
lock_owner: Option<u64>,
flush: bool,
_flags: i32,
_lock_owner: Option<u64>,
_flush: bool,
reply: fuser::ReplyEmpty,
) {
log::debug!("release: ino={}, fh={}, flush={}", ino, fh, flush);
self.server
.release(ino, fh, flags, lock_owner, flush, reply)
let mut server = self.server.lock();
server.release(req, ino, fh, _flags, _lock_owner, _flush, reply)
}
}
@@ -263,58 +149,75 @@ impl fuser::Filesystem for FuseClient {
/// provides a read-only file system
#[derive(Debug)]
pub(crate) struct FuseServer {
status: RwLock<Status>,
dispatcher: CliprdrTxnDispatcher,
// timeout
// current files
// inode mapping:
// 1 -> root (parent of all files)
// 2~n+1 -> nth file in the list (n is the length of the list)
// 0 | n+2.. -> not found
// Note that the file tree is pre-ordered
files: RwLock<Vec<FuseNode>>,
generation: AtomicU64,
files: Vec<FuseNode>,
// file handle counter
file_handle_counter: AtomicU64,
// file system generations
generation: AtomicU64,
// timeout
timeout: Duration,
// file read reply channel
tx: Sender<ClipboardFile>,
// file read reply channel
rx: Receiver<ClipboardFile>,
}
impl FuseServer {
/// create a new fuse server
pub fn new(timeout: Duration) -> Self {
let (tx, rx) = std::sync::mpsc::channel();
Self {
status: RwLock::new(Status::Active),
dispatcher: CliprdrTxnDispatcher::default(),
files: RwLock::new(Vec::new()),
file_handle_counter: AtomicU64::new(0),
generation: AtomicU64::new(0),
files: Vec::new(),
file_handle_counter: AtomicU64::new(0),
timeout,
rx,
tx,
}
}
pub fn client(self: &Arc<Self>) -> FuseClient {
FuseClient::new(self.clone())
pub fn client(server: Arc<Mutex<Self>>) -> FuseClient {
FuseClient { server }
}
}
pub fn init(&self) {
let mut w_guard = self.files.write();
if w_guard.is_empty() {
impl FuseServer {
pub fn serve(&mut self, reply: ClipboardFile) -> Result<(), CliprdrError> {
self.tx.send(reply).map_err(|e| {
log::error!("failed to serve cliprdr reply from endpoint: {:?}", e);
CliprdrError::ClipboardInternalError
})?;
Ok(())
}
}
impl fuser::Filesystem for FuseServer {
fn init(
&mut self,
_req: &fuser::Request<'_>,
_config: &mut fuser::KernelConfig,
) -> Result<(), libc::c_int> {
if self.files.is_empty() {
// create a root file
let root = FuseNode::new_root();
w_guard.push(root);
self.files.push(root);
}
Ok(())
}
pub fn look_up(&self, parent: u64, name: &std::ffi::OsStr, reply: fuser::ReplyEntry) {
fn lookup(
&mut self,
_req: &fuser::Request<'_>,
parent: u64,
name: &std::ffi::OsStr,
reply: fuser::ReplyEntry,
) {
if name.len() > MAX_NAME_LEN {
log::debug!("fuse: name too long");
reply.error(libc::ENAMETOOLONG);
return;
}
let entries = self.files.read();
let entries = &self.files;
let generation = self.generation.load(Ordering::Relaxed);
@@ -353,8 +256,14 @@ impl FuseServer {
return;
}
pub fn opendir(&self, ino: u64, flags: i32, reply: fuser::ReplyOpen) {
let files = self.files.read();
fn opendir(
&mut self,
_req: &fuser::Request<'_>,
ino: u64,
flags: i32,
reply: fuser::ReplyOpen,
) {
let files = &self.files;
let Some(entry) = files.get(ino as usize - 1) else {
reply.error(libc::ENOENT);
log::error!("fuse: opendir: entry not found");
@@ -383,8 +292,15 @@ impl FuseServer {
return;
}
pub fn readdir(&self, ino: u64, fh: u64, offset: i64, mut reply: ReplyDirectory) {
let files = self.files.read();
fn readdir(
&mut self,
_req: &fuser::Request<'_>,
ino: u64,
fh: u64,
offset: i64,
mut reply: ReplyDirectory,
) {
let files = &self.files;
let Some(entry) = files.get(ino as usize - 1) else {
reply.error(libc::ENOENT);
log::error!("fuse: readdir: entry not found");
@@ -429,8 +345,15 @@ impl FuseServer {
return;
}
pub fn releasedir(&self, ino: u64, fh: u64, _flags: i32, reply: fuser::ReplyEmpty) {
let files = self.files.read();
fn releasedir(
&mut self,
_req: &fuser::Request<'_>,
ino: u64,
fh: u64,
_flags: i32,
reply: fuser::ReplyEmpty,
) {
let files = &self.files;
let Some(entry) = files.get(ino as usize - 1) else {
reply.error(libc::ENOENT);
log::error!("fuse: releasedir: entry not found");
@@ -452,8 +375,8 @@ impl FuseServer {
return;
}
pub fn open(&self, ino: u64, flags: i32, reply: fuser::ReplyOpen) {
let files = self.files.read();
fn open(&mut self, _req: &fuser::Request<'_>, ino: u64, flags: i32, reply: fuser::ReplyOpen) {
let files = &self.files;
let Some(entry) = files.get(ino as usize - 1) else {
reply.error(libc::ENOENT);
log::error!("fuse: open: entry not found");
@@ -485,8 +408,9 @@ impl FuseServer {
return;
}
pub fn read(
&self,
fn read(
&mut self,
_req: &fuser::Request<'_>,
ino: u64,
fh: u64,
offset: i64,
@@ -495,7 +419,7 @@ impl FuseServer {
_lock_owner: Option<u64>,
reply: fuser::ReplyData,
) {
let files = self.files.read();
let files = &self.files;
let Some(entry) = files.get(ino as usize - 1) else {
reply.error(libc::ENOENT);
log::error!("fuse: read: entry not found");
@@ -536,8 +460,9 @@ impl FuseServer {
reply.data(bytes.as_slice());
}
pub fn release(
&self,
fn release(
&mut self,
_req: &fuser::Request<'_>,
ino: u64,
fh: u64,
_flags: i32,
@@ -545,7 +470,7 @@ impl FuseServer {
_flush: bool,
reply: fuser::ReplyEmpty,
) {
let files = self.files.read();
let files = &self.files;
let Some(entry) = files.get(ino as usize - 1) else {
reply.error(libc::ENOENT);
log::error!("fuse: release: entry not found");
@@ -560,10 +485,12 @@ impl FuseServer {
reply.ok();
return;
}
}
impl FuseServer {
// get files and directory path right in root of FUSE fs
pub fn list_root(&self) -> Vec<PathBuf> {
let files = self.files.read();
let files = &self.files;
let children = &files[0].children;
let mut paths = Vec::with_capacity(children.len());
for idx in children.iter().copied() {
@@ -572,59 +499,19 @@ impl FuseServer {
paths
}
/// gc filesystem
fn gc_files(&self) {
{
let mut status = self.status.write();
// received update after fetching complete
// should fetch again
if *status == Status::Building {
*status = Status::GcComplete;
return;
}
// really update only when:
// running: Active
if *status != Status::Active {
return;
}
*status = Status::Gc;
}
let mut old = self.files.write();
let _ = old.par_iter_mut().fold(|| (), |_, f| f.gc());
let mut status = self.status.write();
*status = Status::GcComplete;
}
/// fetch file list from remote
fn sync_file_system(
&self,
&mut self,
conn_id: i32,
file_group_format_id: i32,
_file_contents_format_id: i32,
) -> Result<bool, CliprdrError> {
{
let mut status = self.status.write();
if *status != Status::GcComplete {
return Ok(false);
}
*status = Status::Fetching;
}
// request file list
let request = ClipboardFile::FormatDataRequest {
requested_format_id: file_group_format_id,
};
let rx = self.dispatcher.send(conn_id, request);
let resp = rx.recv_timeout(self.timeout);
let resp = self.send_sync_fs_request(conn_id, file_group_format_id, self.timeout)?;
let descs = match resp {
Ok(ClipboardFile::FormatDataResponse {
ClipboardFile::FormatDataResponse {
msg_flags,
format_data,
}) => {
} => {
if msg_flags != 0x1 {
log::error!("clipboard FUSE server: received unexpected response flags");
return Err(CliprdrError::ClipboardInternalError);
@@ -633,155 +520,61 @@ impl FuseServer {
descs
}
Ok(_) => {
_ => {
log::error!("clipboard FUSE server: received unexpected response type");
// rollback status
let mut status = self.status.write();
*status = Status::GcComplete;
return Err(CliprdrError::ClipboardInternalError);
}
Err(e) => {
log::error!("clipboard FUSE server: failed to fetch file list, {:?}", e);
// rollback status
let mut status = self.status.write();
*status = Status::GcComplete;
return Err(CliprdrError::ClipboardInternalError);
}
};
{
// fetch successful, start building
let mut status = self.status.write();
*status = Status::Building;
}
let mut new_tree = FuseNode::build_tree(descs)?;
let res = new_tree
.par_iter_mut()
.iter_mut()
.filter(|f_node| f_node.is_file() && f_node.attributes.size == 0)
.fold(|| Ok(()), |_, f_node| self.sync_node_size(f_node))
.find_last(|p| p.is_err());
.try_for_each(|f_node| self.sync_node_size(f_node));
if res.is_some() {
// rollback status on failure
let mut status = self.status.write();
if *status == Status::Building {
*status = Status::GcComplete;
}
log::error!("clipboard FUSE server: failed to fetch file size");
if let Err(err) = res {
log::error!(
"clipboard FUSE server: failed to fetch file size: {:?}",
err
);
return Err(CliprdrError::ClipboardInternalError);
}
// replace current file system
let mut old = self.files.write();
{
let mut status = self.status.write();
if *status != Status::Building {
// build interrupted, meaning fetched data is outdated
// do not replace
return Ok(false);
}
*status = Status::Active;
}
*old = new_tree;
self.files = new_tree;
self.generation.fetch_add(1, Ordering::Relaxed);
Ok(true)
}
/// replace current files with new files, cucurrently
///
/// # Note
///
/// This function should allow concurrent calls. In short, the server can handle multiple update_file calles
/// at a short period of time and make sure it call RPCs as few and late as possible.
///
/// ## Function Phases
///
/// ### clear phase
///
/// - just mark all files to be deleted, all new `open` operations will be denied
/// - current FDs will not be affected, listing (in this level of directory) and reading operations can still be performed.
/// - this will return only when all FDs are closed, or some unexpected error occurs
/// - after all FDs are closed and no more FDs can be opened, dropping the current file list will be safe
///
/// ### request phase
///
/// - after all FDs are closed, send a format data request to the clipboard server
///
/// ### replace phase
///
/// - after all FDs are closed, the file list will be replaced with the new file list
///
/// ## Concurrent calls
///
/// ### server is Active
///
/// threads calling this function may win getting the write lock on server.status:
/// - the winner will start [clear phase], changing the server to Gc.
/// - the loser or later comming threads calling `server.gc_files` will return directly.
///
/// movement: Active -> Gc
///
/// ### server is Gc
///
/// this indicates there must be exactly one thread running in [clear phase].
/// - the thread will run `server.sync_file_system` after this phase
/// - other threads try to call `server.gc_files` will return directly
/// - other threads try to call `server.sync_file_system` will return directly
/// - no other threads could be running `server.sync_file_system`
///
/// after all, only one thread will successfully complete the [clear phase], and that thread will try to complete the whole updating.
///
/// movement: Gc -> GcComplete
///
/// ### server is GcComplete
///
/// This indicates there must be at least one thread trying to call `server.sync_file_system`.
/// threads will trying to get the write lock of status.
/// - the winner will set status to Fetching.
/// - the latter threads get the write lock, only to find the status is not `GcComplete`, return directly.
/// - there might be threads trying to call `server.gc_files`, but will return directly and call `server.sync_file_system`.
///
/// movement: GcComplete -> Fetching
///
/// ### server is Fetching
///
/// This indicates there must be exactly one thread running in `server.sync_file_system`, in its fetching phase.
/// - any other threads calling this function will return directly.
/// - after fetching finishes, it will set status to Building
/// - timeout may reach, then we rollback
///
/// movement: Fetching -> Building
/// failure: Fetching -> GcComplete
///
/// ### server is Building
///
/// The reason why we have this status is to prevent requesting outdated data.
/// There should be exactly one thread start running [replace phase] and might be other threads trying to call `gc_files`
/// - if the building phase is finished, the thread will set status to Active, and other threads may run [clear phase]
/// - if the building phase is interrupted, the thread will quit, and other threads will skip the clear phase, try to fetch directly.
///
/// movements: Building -> Active, Building -> GcComplete
///
pub fn update_files(
fn send_sync_fs_request(
&self,
conn_id: i32,
file_group_format_id: i32,
timeout: std::time::Duration,
) -> Result<ClipboardFile, CliprdrError> {
// request file list
let data = ClipboardFile::FormatDataRequest {
requested_format_id: file_group_format_id,
};
send_data(conn_id, data);
self.rx.recv_timeout(timeout).map_err(|e| {
log::error!("failed to receive file list from channel: {:?}", e);
CliprdrError::ClipboardInternalError
})
}
pub fn update_files(
&mut self,
conn_id: i32,
file_group_format_id: i32,
file_contents_format_id: i32,
) -> Result<bool, CliprdrError> {
self.gc_files();
self.sync_file_system(conn_id, file_group_format_id, file_contents_format_id)
}
pub fn recv(&self, conn_id: i32, clip_file: ClipboardFile) {
self.dispatcher.recv(conn_id, clip_file)
}
/// allocate a new file descriptor
fn alloc_fd(&self) -> u64 {
self.file_handle_counter.fetch_add(1, Ordering::Relaxed)
@@ -807,7 +600,7 @@ impl FuseServer {
clip_data_id: 0,
};
let rx = self.dispatcher.send(node.conn_id, request);
send_data(node.conn_id, request);
log::debug!(
"waiting for metadata sync reply for {:?} on channel {}",
@@ -815,8 +608,10 @@ impl FuseServer {
node.conn_id
);
let reply = rx.recv_timeout(self.timeout)?;
let reply = self
.rx
.recv_timeout(self.timeout)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::TimedOut, e))?;
log::debug!(
"got metadata sync reply for {:?} on channel {}",
node.name,
@@ -897,7 +692,7 @@ impl FuseServer {
clip_data_id: 0,
};
let rx = self.dispatcher.send(node.conn_id, request);
send_data(node.conn_id, request);
log::debug!(
"waiting for read reply for {:?} on stream: {}",
@@ -905,7 +700,10 @@ impl FuseServer {
node.stream_id
);
let reply = rx.recv_timeout(self.timeout)?;
let reply = self.rx.recv_timeout(self.timeout).map_err(|e| {
log::error!("failed to receive file list from channel: {:?}", e);
std::io::Error::new(std::io::ErrorKind::TimedOut, e)
})?;
match reply {
ClipboardFile::FileContentsResponse {
@@ -961,7 +759,7 @@ impl FileDescription {
// skip reserved 32 bytes
bytes.advance(32);
let attributes = bytes.get_u32_le();
// skip reserverd 16 bytes
// skip reserved 16 bytes
bytes.advance(16);
// last write time from 1601-01-01 00:00:00, in 100ns
let last_write_time = bytes.get_u64_le();
@@ -1126,11 +924,6 @@ impl FuseNode {
self.file_handlers.marked()
}
/// mark all files to be deleted
pub fn gc(&mut self) {
self.file_handlers.mark_and_wait()
}
pub fn add_handler(&self, fh: u64) {
self.file_handlers.add_handler(fh)
}
@@ -1336,18 +1129,6 @@ impl FileHandles {
self.handlers.lock().push(fh);
}
// wait till gc completes
pub fn mark_and_wait(&self) {
let mut handlers = self.handlers.lock();
self.gc.store(true, Ordering::Relaxed);
loop {
if handlers.is_empty() {
return;
}
self.waiter.wait(&mut handlers);
}
}
pub fn marked(&self) -> bool {
self.gc.load(Ordering::Relaxed)
}