From 0351b32ab6bc8792481e52424fa8ac8bb7543254 Mon Sep 17 00:00:00 2001 From: "benato.denis96@gmail.com" Date: Thu, 12 Mar 2026 09:09:51 +0100 Subject: [PATCH] Feat: add fuse mount --- Cargo.lock | 251 +++++++++++++++ Cargo.toml | 2 + README.md | 32 ++ src/fuse_mount.rs | 759 ++++++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 226 +++++++------- src/vfs.rs | 270 +++++++++++++++++ 6 files changed, 1424 insertions(+), 116 deletions(-) create mode 100644 src/fuse_mount.rs create mode 100644 src/vfs.rs diff --git a/Cargo.lock b/Cargo.lock index 010146a..d27d206 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -66,6 +66,12 @@ version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" +[[package]] +name = "autocfg" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" + [[package]] name = "base64" version = "0.22.1" @@ -138,6 +144,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" +[[package]] +name = "cfg_aliases" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" + [[package]] name = "chacha20" version = "0.10.0" @@ -286,6 +298,26 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fuser" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80a5eca878900c2e39e9e52fd797954b7fc39eeefc8558257114bfea6a698fcf" +dependencies = [ + "bitflags", + "libc", + "log", + "memchr", + "nix", + "num_enum", + "page_size", + "parking_lot", + "pkg-config", + "ref-cast", + "smallvec", + "zerocopy", +] + [[package]] name = "futures-channel" version = "0.3.32" @@ -724,6 +756,15 @@ version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6373607a59f0be73a39b6fe456b8192fcc3585f602af20751600e974dd455e77" +[[package]] +name = "lock_api" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "224399e74b87b5f3557511d98dff8b14089b3dadafcab6bb93eab67d3aace965" +dependencies = [ + "scopeguard", +] + [[package]] name = "log" version = "0.4.29" @@ -752,6 +793,15 @@ version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" +[[package]] +name = "memoffset" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "488016bfae457b036d996092f6cb448677611ce4449e970ceaf42695203f218a" +dependencies = [ + "autocfg", +] + [[package]] name = "miniz_oxide" version = "0.8.9" @@ -773,12 +823,47 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "nix" +version = "0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74523f3a35e05aba87a1d978330aef40f67b0304ac79c1c00b294c9830543db6" +dependencies = [ + "bitflags", + "cfg-if", + "cfg_aliases", + "libc", + "memoffset", +] + [[package]] name = "num-conv" version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf97ec579c3c42f953ef76dbf8d55ac91fb219dde70e49aa4a6b7d74e9919050" +[[package]] +name = "num_enum" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1207a7e20ad57b847bbddc6776b968420d38292bbfe2089accff5e19e82454c" +dependencies = [ + "num_enum_derive", + "rustversion", +] + +[[package]] +name = "num_enum_derive" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff32365de1b6743cb203b710788263c44a03de03802daf96092f2da4fe6ba4d7" +dependencies = [ + "proc-macro-crate", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "once_cell" version = "1.21.3" @@ -791,6 +876,39 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe" +[[package]] +name = "page_size" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30d5b2194ed13191c1999ae0704b7839fb18384fa22e49b57eeaa97d79ce40da" +dependencies = [ + "libc", + "winapi", +] + +[[package]] +name = "parking_lot" +version = "0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93857453250e3077bd71ff98b6a65ea6621a19bb0f559a85248955ac12c45a1a" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2621685985a2ebf1c516881c026032ac7deafcda1a2c9b7850dc81e3dfcb64c1" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-link", +] + [[package]] name = "percent-encoding" version = "2.3.2" @@ -840,6 +958,15 @@ dependencies = [ "syn", ] +[[package]] +name = "proc-macro-crate" +version = "3.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e67ba7e9b2b56446f1d419b1d807906278ffa1a658a8a5d8a39dcb1f5a78614f" +dependencies = [ + "toml_edit", +] + [[package]] name = "proc-macro2" version = "1.0.106" @@ -897,6 +1024,35 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c8d0fd677905edcbeedbf2edb6494d676f0e98d54d5cf9bda0b061cb8fb8aba" +[[package]] +name = "redox_syscall" +version = "0.5.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d" +dependencies = [ + "bitflags", +] + +[[package]] +name = "ref-cast" +version = "1.0.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f354300ae66f76f1c85c5f84693f0ce81d747e2c3f21a45fef496d89c960bf7d" +dependencies = [ + "ref-cast-impl", +] + +[[package]] +name = "ref-cast-impl" +version = "1.0.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7186006dcb21920990093f30e3dea63b7d6e977bf1256be20c3563a5db070da" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "reqx" version = "0.1.27" @@ -1021,6 +1177,12 @@ dependencies = [ "untrusted", ] +[[package]] +name = "rustversion" +version = "1.0.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" + [[package]] name = "ryu" version = "1.0.23" @@ -1071,6 +1233,12 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + [[package]] name = "security-framework" version = "3.7.0" @@ -1217,6 +1385,8 @@ name = "swfss3" version = "0.1.0" dependencies = [ "argh", + "fuser", + "libc", "s3", ] @@ -1371,6 +1541,36 @@ dependencies = [ "tokio", ] +[[package]] +name = "toml_datetime" +version = "1.0.0+spec-1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32c2555c699578a4f59f0cc68e5116c8d7cabbd45e1409b989d4be085b53f13e" +dependencies = [ + "serde_core", +] + +[[package]] +name = "toml_edit" +version = "0.25.4+spec-1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7193cbd0ce53dc966037f54351dbbcf0d5a642c7f0038c382ef9e677ce8c13f2" +dependencies = [ + "indexmap", + "toml_datetime", + "toml_parser", + "winnow", +] + +[[package]] +name = "toml_parser" +version = "1.0.9+spec-1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "702d4415e08923e7e1ef96cd5727c0dfed80b4d2fa25db9647fe5eb6f7c5a4c4" +dependencies = [ + "winnow", +] + [[package]] name = "tower-service" version = "0.3.3" @@ -1592,6 +1792,22 @@ dependencies = [ "rustls-pki-types", ] +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + [[package]] name = "winapi-util" version = "0.1.11" @@ -1601,6 +1817,12 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + [[package]] name = "windows-link" version = "0.2.1" @@ -1755,6 +1977,15 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" +[[package]] +name = "winnow" +version = "0.7.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df79d97927682d2fd8adb29682d1140b343be4ac0f08fd68b7765d9c059d3945" +dependencies = [ + "memchr", +] + [[package]] name = "wit-bindgen" version = "0.51.0" @@ -1872,6 +2103,26 @@ dependencies = [ "synstructure", ] +[[package]] +name = "zerocopy" +version = "0.8.42" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2578b716f8a7a858b7f02d5bd870c14bf4ddbbcf3a4c05414ba6503640505e3" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.8.42" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e6cc098ea4d3bd6246687de65af3f920c430e236bee1e3bf2e441463f08a02f" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "zerofrom" version = "0.1.6" diff --git a/Cargo.toml b/Cargo.toml index d180ed1..5a0f69e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,4 +5,6 @@ edition = "2024" [dependencies] argh = "0.1.13" +fuser = "0.17.0" +libc = "0.2.183" s3 = { version = "0.1.22", features = ["blocking", "rustls"] } diff --git a/README.md b/README.md index d0c55df..eb2b13c 100644 --- a/README.md +++ b/README.md @@ -179,6 +179,37 @@ swfss3 --endpoint http://localhost:8333 --bucket mybucket --path-style \ rm-prefix some/path/ --yes ``` +### Mount bucket via FUSE3 (Linux) + +Mount the whole bucket read-only: + +```bash +mkdir -p /tmp/s3mnt +swfss3 --endpoint http://localhost:8333 --bucket mybucket --path-style \ + mount /tmp/s3mnt +``` + +Mount only a prefix (the prefix becomes the filesystem root): + +```bash +swfss3 --endpoint http://localhost:8333 --bucket mybucket --path-style \ + mount /tmp/s3mnt --prefix photos/2025/ +``` + +Enable writes (naive buffered uploads on flush/close): + +```bash +swfss3 --endpoint http://localhost:8333 --bucket mybucket --path-style \ + mount /tmp/s3mnt --read-write +``` + +Allow access by other users (requires `user_allow_other` in `/etc/fuse.conf`): + +```bash +swfss3 --endpoint http://localhost:8333 --bucket mybucket --path-style \ + mount /tmp/s3mnt --allow-other +``` + ## Troubleshooting ### `NoSuchBucket` but the bucket “exists” @@ -205,4 +236,5 @@ swfss3 mv --help swfss3 presign-get --help swfss3 presign-put --help swfss3 rm-prefix --help +swfss3 mount --help ``` diff --git a/src/fuse_mount.rs b/src/fuse_mount.rs new file mode 100644 index 0000000..52e2cbf --- /dev/null +++ b/src/fuse_mount.rs @@ -0,0 +1,759 @@ +#![cfg(target_os = "linux")] + +use std::{ + collections::HashMap, + ffi::OsStr, + io::Cursor, + path::Path, + sync::{Arc, Mutex}, + time::{Duration, SystemTime}, +}; + +use fuser::{ + Errno, FileAttr, FileHandle, FileType, Filesystem, FopenFlags, Generation, INodeNo, LockOwner, + MountOption, OpenFlags, RenameFlags, ReplyAttr, ReplyCreate, ReplyData, ReplyDirectory, + ReplyEmpty, ReplyEntry, ReplyOpen, ReplyWrite, Request, WriteFlags, +}; + +use crate::vfs::{ObjectStat, VfsError, VirtualFilesystem}; + +const ROOT_INO: u64 = 1; +const TTL: Duration = Duration::from_secs(1); + +#[derive(Debug, Clone)] +enum NodeKind { + Dir, + File { key: String, stat: ObjectStat }, +} + +#[derive(Debug, Clone)] +struct Node { + ino: u64, + parent: u64, + name: String, + full_path: String, // without leading '/' + kind: NodeKind, +} + +#[derive(Debug)] +struct OpenFile { + key: String, + data: Vec, + dirty: bool, +} + +#[derive(Default, Debug)] +struct State { + next_ino: u64, + // (parent ino, name) -> ino + children: HashMap<(u64, String), u64>, + nodes: HashMap, + next_fh: u64, + open_files: HashMap, +} + +impl State { + fn alloc_ino(&mut self) -> u64 { + if self.next_ino == 0 { + self.next_ino = ROOT_INO + 1; + } + let ino = self.next_ino; + self.next_ino += 1; + ino + } + + fn alloc_fh(&mut self) -> u64 { + self.next_fh += 1; + self.next_fh + } +} + +pub fn mount_bucket( + vfs: Arc, + mountpoint: &Path, + base_prefix: &str, + read_write: bool, + allow_other: bool, +) -> Result<(), VfsError> { + let prefix = normalize_prefix(base_prefix); + let fs = S3FuseFs::new(vfs, prefix, read_write); + + let mut opts = vec![MountOption::FSName("swfss3".into())]; + if read_write { + opts.push(MountOption::RW); + } else { + opts.push(MountOption::RO); + } + // fuser requires AutoUnmount to be paired with allow_other/allow_root semantics. + if allow_other { + opts.push(MountOption::AutoUnmount); + } + + let mut cfg = fuser::Config::default(); + cfg.mount_options = opts; + if allow_other { + cfg.acl = fuser::SessionACL::All; + } + + fuser::mount2(fs, mountpoint, &cfg).map_err(VfsError::Io)?; + Ok(()) +} + +fn normalize_prefix(prefix: &str) -> String { + let p = prefix.trim_start_matches('/'); + if p.is_empty() { + String::new() + } else if p.ends_with('/') { + p.to_string() + } else { + format!("{p}/") + } +} + +fn join_key(prefix: &str, path: &str) -> String { + if prefix.is_empty() { + path.to_string() + } else if path.is_empty() { + prefix.trim_end_matches('/').to_string() + } else { + format!("{prefix}{path}") + } +} + +fn dir_key(prefix: &str, path: &str) -> String { + if path.is_empty() { + prefix.to_string() + } else if prefix.is_empty() { + format!("{path}/") + } else { + format!("{prefix}{path}/") + } +} + +fn file_attr(ino: u64, size: u64, rw: bool) -> FileAttr { + let perm = if rw { 0o644 } else { 0o444 }; + FileAttr { + ino: INodeNo(ino), + size, + blocks: (size + 511) / 512, + atime: SystemTime::now(), + mtime: SystemTime::now(), + ctime: SystemTime::now(), + crtime: SystemTime::now(), + kind: FileType::RegularFile, + perm, + nlink: 1, + uid: unsafe { libc::getuid() }, + gid: unsafe { libc::getgid() }, + rdev: 0, + blksize: 512, + flags: 0, + } +} + +fn dir_attr(ino: u64) -> FileAttr { + FileAttr { + ino: INodeNo(ino), + size: 0, + blocks: 0, + atime: SystemTime::now(), + mtime: SystemTime::now(), + ctime: SystemTime::now(), + crtime: SystemTime::now(), + kind: FileType::Directory, + perm: 0o755, + nlink: 2, + uid: unsafe { libc::getuid() }, + gid: unsafe { libc::getgid() }, + rdev: 0, + blksize: 512, + flags: 0, + } +} + +struct S3FuseFs { + vfs: Arc, + prefix: String, + read_write: bool, + state: Mutex, +} + +impl S3FuseFs { + fn new(vfs: Arc, prefix: String, read_write: bool) -> Self { + let mut state = State::default(); + state.nodes.insert( + ROOT_INO, + Node { + ino: ROOT_INO, + parent: ROOT_INO, + name: String::new(), + full_path: String::new(), + kind: NodeKind::Dir, + }, + ); + Self { + vfs, + prefix, + read_write, + state: Mutex::new(state), + } + } + + fn node_attr(&self, node: &Node) -> FileAttr { + match &node.kind { + NodeKind::Dir => dir_attr(node.ino), + NodeKind::File { stat, .. } => file_attr(node.ino, stat.content_length.unwrap_or(0), self.read_write), + } + } + + fn ensure_child( + &self, + parent: u64, + name: &str, + kind: NodeKind, + ) -> u64 { + let mut st = self.state.lock().unwrap(); + if let Some(ino) = st.children.get(&(parent, name.to_string())).copied() { + return ino; + } + let parent_path = st + .nodes + .get(&parent) + .map(|n| n.full_path.clone()) + .unwrap_or_default(); + let full_path = if parent_path.is_empty() { + name.to_string() + } else { + format!("{parent_path}/{name}") + }; + let ino = st.alloc_ino(); + st.children + .insert((parent, name.to_string()), ino); + st.nodes.insert( + ino, + Node { + ino, + parent, + name: name.to_string(), + full_path, + kind, + }, + ); + ino + } + + fn lookup_child(&self, parent: u64, name: &str) -> Option { + let st = self.state.lock().unwrap(); + let ino = st.children.get(&(parent, name.to_string())).copied()?; + st.nodes.get(&ino).cloned() + } + + fn get_node(&self, ino: u64) -> Option { + let st = self.state.lock().unwrap(); + st.nodes.get(&ino).cloned() + } + + fn invalidate_child(&self, parent: u64, name: &str) { + let mut st = self.state.lock().unwrap(); + if let Some(ino) = st.children.remove(&(parent, name.to_string())) { + st.nodes.remove(&ino); + } + } + + fn resolve_on_demand(&self, parent: u64, name: &str) -> Result, VfsError> { + if let Some(n) = self.lookup_child(parent, name) { + return Ok(Some(n)); + } + let parent_node = match self.get_node(parent) { + Some(n) => n, + None => return Ok(None), + }; + if !matches!(parent_node.kind, NodeKind::Dir) { + return Ok(None); + } + + let child_path = if parent_node.full_path.is_empty() { + name.to_string() + } else { + format!("{}/{}", parent_node.full_path, name) + }; + + // Prefer file existence. + let file_key = join_key(&self.prefix, &child_path); + match self.vfs.stat(&file_key) { + Ok(stat) => { + let ino = self.ensure_child( + parent, + name, + NodeKind::File { + key: file_key, + stat, + }, + ); + return Ok(self.get_node(ino)); + } + Err(VfsError::S3(_)) | Err(VfsError::NotFound(_)) => { + // fall through to directory check + } + Err(e) => return Err(e), + } + + // Directory check: does anything exist under dir/ prefix? + let dk = dir_key(&self.prefix, &child_path); + let page = self.vfs.list_page(&dk, false, Some(1))?; + if !page.common_prefixes.is_empty() || !page.keys.is_empty() { + let ino = self.ensure_child(parent, name, NodeKind::Dir); + return Ok(self.get_node(ino)); + } + + Ok(None) + } + + fn read_object_range(&self, key: &str, offset: i64, size: u32) -> Result, VfsError> { + if offset < 0 { + return Ok(Vec::new()); + } + let start = offset as u64; + let end_inclusive = start.saturating_add(size as u64).saturating_sub(1); + self.vfs.read_bytes(key, Some((start, end_inclusive))) + } + + fn read_object_all(&self, key: &str) -> Result, VfsError> { + self.vfs.read_bytes(key, None) + } +} + +impl Filesystem for S3FuseFs { + fn getattr(&self, _req: &Request, ino: INodeNo, _fh: Option, reply: ReplyAttr) { + let ino: u64 = ino.into(); + let Some(node) = self.get_node(ino) else { + reply.error(Errno::ENOENT); + return; + }; + + // Refresh file stat opportunistically. + if let NodeKind::File { key, .. } = &node.kind { + if let Ok(stat) = self.vfs.stat(key) { + let _ = self.ensure_child(node.parent, &node.name, NodeKind::File { key: key.clone(), stat }); + } + } + + let Some(node) = self.get_node(ino) else { + reply.error(Errno::ENOENT); + return; + }; + reply.attr(&TTL, &self.node_attr(&node)); + } + + fn lookup(&self, _req: &Request, parent: INodeNo, name: &OsStr, reply: ReplyEntry) { + let parent: u64 = parent.into(); + let Some(name) = name.to_str() else { + reply.error(Errno::ENOENT); + return; + }; + match self.resolve_on_demand(parent, name) { + Ok(Some(node)) => reply.entry(&TTL, &self.node_attr(&node), Generation(0)), + Ok(None) => reply.error(Errno::ENOENT), + Err(_) => reply.error(Errno::EIO), + } + } + + fn readdir( + &self, + _req: &Request, + ino: INodeNo, + _fh: FileHandle, + offset: u64, + mut reply: ReplyDirectory, + ) { + let ino: u64 = ino.into(); + let Some(node) = self.get_node(ino) else { + reply.error(Errno::ENOENT); + return; + }; + if !matches!(node.kind, NodeKind::Dir) { + reply.error(Errno::ENOTDIR); + return; + } + + let dir_prefix_key = dir_key(&self.prefix, &node.full_path); + let page = match self.vfs.list_page(&dir_prefix_key, false, None) { + Ok(p) => p, + Err(_) => { + reply.error(Errno::EIO); + return; + } + }; + + // Build entries: . .. then prefixes + objects. + let mut entries: Vec<(u64, FileType, String)> = Vec::new(); + entries.push((ino, FileType::Directory, ".".into())); + entries.push((node.parent, FileType::Directory, "..".into())); + + for p in page.common_prefixes { + let name = p + .strip_prefix(&dir_prefix_key) + .unwrap_or(&p) + .trim_end_matches('/') + .to_string(); + if name.is_empty() { + continue; + } + let child_ino = self.ensure_child(ino, &name, NodeKind::Dir); + entries.push((child_ino, FileType::Directory, name)); + } + + for k in page.keys { + if k == dir_prefix_key { + continue; + } + let name = k + .strip_prefix(&dir_prefix_key) + .unwrap_or(&k) + .to_string(); + if name.is_empty() || name.contains('/') { + continue; + } + let stat = match self.vfs.stat(&k) { + Ok(s) => s, + Err(_) => ObjectStat { + content_length: None, + content_type: None, + etag: None, + }, + }; + let child_ino = self.ensure_child( + ino, + &name, + NodeKind::File { + key: k.clone(), + stat, + }, + ); + entries.push((child_ino, FileType::RegularFile, name)); + } + + for (i, (child_ino, kind, name)) in entries.into_iter().enumerate().skip(offset as usize) { + let off = (i + 1) as u64; + if reply.add(INodeNo(child_ino), off, kind, name) { + break; + } + } + reply.ok(); + } + + fn open(&self, _req: &Request, ino: INodeNo, flags: OpenFlags, reply: ReplyOpen) { + let ino: u64 = ino.into(); + let Some(node) = self.get_node(ino) else { + reply.error(Errno::ENOENT); + return; + }; + let write = match flags.acc_mode() { + fuser::OpenAccMode::O_RDONLY => false, + fuser::OpenAccMode::O_WRONLY | fuser::OpenAccMode::O_RDWR => true, + }; + if write && !self.read_write { + reply.error(Errno::EROFS); + return; + } + if let NodeKind::Dir = node.kind { + reply.error(Errno::EISDIR); + return; + } + if !write { + reply.opened(FileHandle(0), FopenFlags::empty()); + return; + } + + // Allocate a file handle for buffered writes. + let NodeKind::File { key, .. } = node.kind else { + reply.error(Errno::EIO); + return; + }; + + let mut st = self.state.lock().unwrap(); + let fh_u64 = st.alloc_fh(); + let fh = FileHandle(fh_u64); + st.open_files.insert( + fh_u64, + OpenFile { + key: key.clone(), + data: Vec::new(), + dirty: false, + }, + ); + drop(st); + + // Preload existing object if present (best-effort). + if let Ok(mut bytes) = self.read_object_all(&key) { + let mut st = self.state.lock().unwrap(); + if let Some(of) = st.open_files.get_mut(&fh_u64) { + of.data = std::mem::take(&mut bytes); + } + } + + reply.opened(fh, FopenFlags::empty()); + } + + fn read( + &self, + _req: &Request, + ino: INodeNo, + _fh: FileHandle, + offset: u64, + size: u32, + _flags: OpenFlags, + _lock_owner: Option, + reply: ReplyData, + ) { + let ino: u64 = ino.into(); + let Some(node) = self.get_node(ino) else { + reply.error(Errno::ENOENT); + return; + }; + let NodeKind::File { key, .. } = node.kind else { + reply.error(Errno::EISDIR); + return; + }; + match self.read_object_range(&key, offset as i64, size) { + Ok(bytes) => reply.data(&bytes), + Err(_) => reply.error(Errno::EIO), + } + } + + fn create( + &self, + _req: &Request, + parent: INodeNo, + name: &OsStr, + _mode: u32, + _umask: u32, + flags: i32, + reply: ReplyCreate, + ) { + if !self.read_write { + reply.error(Errno::EROFS); + return; + } + let parent: u64 = parent.into(); + let Some(name) = name.to_str() else { + reply.error(Errno::EINVAL); + return; + }; + let Some(parent_node) = self.get_node(parent) else { + reply.error(Errno::ENOENT); + return; + }; + if !matches!(parent_node.kind, NodeKind::Dir) { + reply.error(Errno::ENOTDIR); + return; + } + let path = if parent_node.full_path.is_empty() { + name.to_string() + } else { + format!("{}/{}", parent_node.full_path, name) + }; + let key = join_key(&self.prefix, &path); + + // Create empty placeholder object. + let empty = Cursor::new(Vec::::new()); + if let Err(_) = self + .vfs + .write_from_reader(&key, 0, Box::new(empty), None) + { + reply.error(Errno::EIO); + return; + } + let stat = self.vfs.stat(&key).unwrap_or(ObjectStat { + content_length: Some(0), + content_type: None, + etag: None, + }); + let ino = self.ensure_child(parent, name, NodeKind::File { key: key.clone(), stat }); + + // Allocate a file handle for buffered writes. + let mut st = self.state.lock().unwrap(); + let fh = st.alloc_fh(); + st.open_files.insert( + fh, + OpenFile { + key, + data: Vec::new(), + dirty: false, + }, + ); + + let node = st.nodes.get(&ino).cloned().unwrap(); + let attr = self.node_attr(&node); + let _flags = OpenFlags(flags); + reply.created( + &TTL, + &attr, + Generation(0), + FileHandle(fh), + FopenFlags::empty(), + ); + } + + fn write( + &self, + _req: &Request, + _ino: INodeNo, + fh: FileHandle, + offset: u64, + data: &[u8], + _write_flags: WriteFlags, + _flags: OpenFlags, + _lock_owner: Option, + reply: ReplyWrite, + ) { + if !self.read_write { + reply.error(Errno::EROFS); + return; + } + let fh = fh.0; + let mut st = self.state.lock().unwrap(); + let Some(of) = st.open_files.get_mut(&fh) else { + reply.error(Errno::EBADF); + return; + }; + let off = offset as usize; + let needed = off.saturating_add(data.len()); + if of.data.len() < needed { + of.data.resize(needed, 0); + } + of.data[off..off + data.len()].copy_from_slice(data); + of.dirty = true; + reply.written(data.len() as u32); + } + + fn flush(&self, _req: &Request, _ino: INodeNo, fh: FileHandle, _lock_owner: LockOwner, reply: ReplyEmpty) { + if !self.read_write { + reply.ok(); + return; + } + let fh = fh.0; + let mut st = self.state.lock().unwrap(); + let Some(of) = st.open_files.get_mut(&fh) else { + reply.error(Errno::EBADF); + return; + }; + if !of.dirty { + reply.ok(); + return; + } + + let key = of.key.clone(); + let len = of.data.len() as u64; + let cur = Cursor::new(std::mem::take(&mut of.data)); + drop(st); + if let Err(_) = self + .vfs + .write_from_reader(&key, len, Box::new(cur), None) + { + reply.error(Errno::EIO); + return; + } + + let mut st = self.state.lock().unwrap(); + if let Some(of) = st.open_files.get_mut(&fh) { + of.dirty = false; + } + reply.ok(); + } + + fn release( + &self, + _req: &Request, + _ino: INodeNo, + fh: FileHandle, + _flags: OpenFlags, + _lock_owner: Option, + _flush: bool, + reply: ReplyEmpty, + ) { + let mut st = self.state.lock().unwrap(); + st.open_files.remove(&fh.0); + reply.ok(); + } + + fn unlink(&self, _req: &Request, parent: INodeNo, name: &OsStr, reply: ReplyEmpty) { + if !self.read_write { + reply.error(Errno::EROFS); + return; + } + let parent: u64 = parent.into(); + let Some(name) = name.to_str() else { + reply.error(Errno::EINVAL); + return; + }; + let Some(node) = self.lookup_child(parent, name) else { + reply.error(Errno::ENOENT); + return; + }; + let NodeKind::File { key, .. } = node.kind else { + reply.error(Errno::EISDIR); + return; + }; + if let Err(_) = self.vfs.delete(&key) { + reply.error(Errno::EIO); + return; + } + self.invalidate_child(parent, name); + reply.ok(); + } + + fn rename( + &self, + _req: &Request, + parent: INodeNo, + name: &OsStr, + newparent: INodeNo, + newname: &OsStr, + _flags: RenameFlags, + reply: ReplyEmpty, + ) { + if !self.read_write { + reply.error(Errno::EROFS); + return; + } + let (Some(name), Some(newname)) = (name.to_str(), newname.to_str()) else { + reply.error(Errno::EINVAL); + return; + }; + let parent: u64 = parent.into(); + let newparent: u64 = newparent.into(); + let Some(node) = self.lookup_child(parent, name) else { + reply.error(Errno::ENOENT); + return; + }; + let NodeKind::File { key: src_key, .. } = node.kind else { + reply.error(Errno::EXDEV); + return; + }; + let Some(newparent_node) = self.get_node(newparent) else { + reply.error(Errno::ENOENT); + return; + }; + if !matches!(newparent_node.kind, NodeKind::Dir) { + reply.error(Errno::ENOTDIR); + return; + } + let new_path = if newparent_node.full_path.is_empty() { + newname.to_string() + } else { + format!("{}/{}", newparent_node.full_path, newname) + }; + let dst_key = join_key(&self.prefix, &new_path); + + if let Err(_) = self.vfs.copy(&src_key, &dst_key).and_then(|_| self.vfs.delete(&src_key)) { + reply.error(Errno::EIO); + return; + } + + self.invalidate_child(parent, name); + self.invalidate_child(newparent, newname); + reply.ok(); + } +} + diff --git a/src/main.rs b/src/main.rs index 66835d7..0851ebf 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,6 +8,12 @@ use std::{ use argh::FromArgs; use s3::{AddressingStyle, Auth, BlockingClient, Credentials}; +mod vfs; +#[cfg(target_os = "linux")] +mod fuse_mount; + +use vfs::{S3Vfs, VfsError, VirtualFilesystem}; + #[derive(FromArgs, Debug)] /// Manage files in a SeaweedFS S3-compatible bucket. struct Cli { @@ -64,6 +70,7 @@ enum Command { PresignGet(CmdPresignGet), PresignPut(CmdPresignPut), RmPrefix(CmdRmPrefix), + Mount(CmdMount), } #[derive(FromArgs, Debug)] @@ -198,6 +205,27 @@ struct CmdRmPrefix { yes: bool, } +#[derive(FromArgs, Debug)] +/// Mount the bucket (or a prefix) via FUSE3 (Linux only). +#[argh(subcommand, name = "mount")] +struct CmdMount { + /// mountpoint path + #[argh(positional)] + mountpoint: PathBuf, + + /// optional bucket prefix to mount (like "photos/2025/") + #[argh(option, default = "String::new()")] + prefix: String, + + /// enable writes (default is read-only) + #[argh(switch)] + read_write: bool, + + /// pass allow_other (requires user_allow_other in /etc/fuse.conf) + #[argh(switch)] + allow_other: bool, +} + fn main() { let cli: Cli = argh::from_env(); if let Err(err) = run(cli) { @@ -206,11 +234,11 @@ fn main() { } } -fn run(cli: Cli) -> Result<(), s3::Error> { +fn run(cli: Cli) -> Result<(), Box> { if cli.path_style && cli.virtual_hosted { - return Err(s3::Error::invalid_config( + return Err(Box::new(s3::Error::invalid_config( "choose only one of --path-style or --virtual-hosted", - )); + ))); } let addressing_style = if cli.path_style { @@ -239,106 +267,83 @@ fn run(cli: Cli) -> Result<(), s3::Error> { .addressing_style(addressing_style) .build()?; + let vfs = S3Vfs::new(client, cli.bucket); + match cli.cmd { - Command::List(cmd) => cmd_list(&client, &cli.bucket, cmd), - Command::Read(cmd) => cmd_read(&client, &cli.bucket, cmd), - Command::Write(cmd) => cmd_write(&client, &cli.bucket, cmd), - Command::Delete(cmd) => cmd_delete(&client, &cli.bucket, cmd), - Command::Stat(cmd) => cmd_stat(&client, &cli.bucket, cmd), - Command::Cp(cmd) => cmd_cp(&client, &cli.bucket, cmd), - Command::Mv(cmd) => cmd_mv(&client, &cli.bucket, cmd), - Command::PresignGet(cmd) => cmd_presign_get(&client, &cli.bucket, cmd), - Command::PresignPut(cmd) => cmd_presign_put(&client, &cli.bucket, cmd), - Command::RmPrefix(cmd) => cmd_rm_prefix(&client, &cli.bucket, cmd), + Command::List(cmd) => cmd_list(&vfs, cmd).map_err(|e| Box::new(e) as _), + Command::Read(cmd) => cmd_read(&vfs, cmd).map_err(|e| Box::new(e) as _), + Command::Write(cmd) => cmd_write(&vfs, cmd).map_err(|e| Box::new(e) as _), + Command::Delete(cmd) => cmd_delete(&vfs, cmd).map_err(|e| Box::new(e) as _), + Command::Stat(cmd) => cmd_stat(&vfs, cmd).map_err(|e| Box::new(e) as _), + Command::Cp(cmd) => cmd_cp(&vfs, cmd).map_err(|e| Box::new(e) as _), + Command::Mv(cmd) => cmd_mv(&vfs, cmd).map_err(|e| Box::new(e) as _), + Command::PresignGet(cmd) => cmd_presign_get(&vfs, cmd).map_err(|e| Box::new(e) as _), + Command::PresignPut(cmd) => cmd_presign_put(&vfs, cmd).map_err(|e| Box::new(e) as _), + Command::RmPrefix(cmd) => cmd_rm_prefix(&vfs, cmd).map_err(|e| Box::new(e) as _), + Command::Mount(cmd) => cmd_mount(vfs, cmd), } } -fn cmd_list(client: &BlockingClient, bucket: &str, cmd: CmdList) -> Result<(), s3::Error> { - let mut req = client.objects().list_v2(bucket); - if !cmd.prefix.is_empty() { - req = req.prefix(cmd.prefix); - } - if !cmd.recursive { - req = req.delimiter("/"); - } - if let Some(max_keys) = cmd.max_keys { - req = req.max_keys(max_keys); - } - - let print_page = |page: s3::types::ListObjectsV2Output| { - for p in page.common_prefixes { - println!("{p}"); - } - for o in page.contents { - println!("{}", o.key); - } - }; - - if cmd.all { - for page in req.pager() { - print_page(page?); - } +fn cmd_list(vfs: &dyn VirtualFilesystem, cmd: CmdList) -> Result<(), VfsError> { + let page = if cmd.all { + vfs.list_all(&cmd.prefix, cmd.recursive)? } else { - print_page(req.send()?); + vfs.list_page(&cmd.prefix, cmd.recursive, cmd.max_keys)? + }; + for p in page.common_prefixes { + println!("{p}"); + } + for k in page.keys { + println!("{k}"); } - Ok(()) } -fn cmd_read(client: &BlockingClient, bucket: &str, cmd: CmdRead) -> Result<(), s3::Error> { - let obj = client.objects().get(bucket, cmd.key).send()?; - +fn cmd_read(vfs: &dyn VirtualFilesystem, cmd: CmdRead) -> Result<(), VfsError> { + let bytes = vfs.read_bytes(&cmd.key, None)?; match cmd.out { Some(path) => { let file = File::create(path) - .map_err(|e| s3::Error::transport("failed to create output file", Some(Box::new(e))))?; + .map_err(VfsError::Io)?; let mut writer = BufWriter::new(file); - obj.write_to(&mut writer)?; - writer - .flush() - .map_err(|e| s3::Error::transport("failed to flush output file", Some(Box::new(e))))?; + writer.write_all(&bytes).map_err(VfsError::Io)?; + writer.flush().map_err(VfsError::Io)?; } None => { let stdout = io::stdout(); let mut handle = stdout.lock(); - obj.write_to(&mut handle)?; - handle - .flush() - .map_err(|e| s3::Error::transport("failed to flush stdout", Some(Box::new(e))))?; + handle.write_all(&bytes).map_err(VfsError::Io)?; + handle.flush().map_err(VfsError::Io)?; } } Ok(()) } -fn cmd_write(client: &BlockingClient, bucket: &str, cmd: CmdWrite) -> Result<(), s3::Error> { +fn cmd_write(vfs: &dyn VirtualFilesystem, cmd: CmdWrite) -> Result<(), VfsError> { let file = File::open(&cmd.file) - .map_err(|e| s3::Error::transport("failed to open input file", Some(Box::new(e))))?; + .map_err(VfsError::Io)?; let len = file .metadata() .map(|m| m.len()) - .map_err(|e| s3::Error::transport("failed to stat input file", Some(Box::new(e))))?; + .map_err(VfsError::Io)?; - let mut req = client - .objects() - .put(bucket, cmd.key) - .body_reader_sized(file, len); - - if let Some(ct) = cmd.content_type { - req = req.content_type(ct); - } - - req.send()?; + vfs.write_from_reader( + &cmd.key, + len, + Box::new(file), + cmd.content_type.as_deref(), + )?; Ok(()) } -fn cmd_delete(client: &BlockingClient, bucket: &str, cmd: CmdDelete) -> Result<(), s3::Error> { - client.objects().delete(bucket, cmd.key).send()?; +fn cmd_delete(vfs: &dyn VirtualFilesystem, cmd: CmdDelete) -> Result<(), VfsError> { + vfs.delete(&cmd.key)?; Ok(()) } -fn cmd_stat(client: &BlockingClient, bucket: &str, cmd: CmdStat) -> Result<(), s3::Error> { - let out = client.objects().head(bucket, cmd.key).send()?; +fn cmd_stat(vfs: &dyn VirtualFilesystem, cmd: CmdStat) -> Result<(), VfsError> { + let out = vfs.stat(&cmd.key)?; if let Some(len) = out.content_length { println!("content_length={len}"); } else { @@ -353,60 +358,31 @@ fn cmd_stat(client: &BlockingClient, bucket: &str, cmd: CmdStat) -> Result<(), s Ok(()) } -fn cmd_cp(client: &BlockingClient, bucket: &str, cmd: CmdCp) -> Result<(), s3::Error> { - client - .objects() - .copy(bucket, cmd.src_key, bucket, cmd.dst_key) - .send()?; +fn cmd_cp(vfs: &dyn VirtualFilesystem, cmd: CmdCp) -> Result<(), VfsError> { + vfs.copy(&cmd.src_key, &cmd.dst_key)?; Ok(()) } -fn cmd_mv(client: &BlockingClient, bucket: &str, cmd: CmdMv) -> Result<(), s3::Error> { - client - .objects() - .copy(bucket, cmd.src_key.clone(), bucket, cmd.dst_key) - .send()?; - client.objects().delete(bucket, cmd.src_key).send()?; +fn cmd_mv(vfs: &dyn VirtualFilesystem, cmd: CmdMv) -> Result<(), VfsError> { + vfs.copy(&cmd.src_key, &cmd.dst_key)?; + vfs.delete(&cmd.src_key)?; Ok(()) } -fn cmd_presign_get( - client: &BlockingClient, - bucket: &str, - cmd: CmdPresignGet, -) -> Result<(), s3::Error> { - let presigned = client - .objects() - .presign_get(bucket, cmd.key) - .expires_in(Duration::from_secs(cmd.expires)) - .build()?; - println!("{}", presigned.url); +fn cmd_presign_get(vfs: &dyn VirtualFilesystem, cmd: CmdPresignGet) -> Result<(), VfsError> { + let url = vfs.presign_get(&cmd.key, Duration::from_secs(cmd.expires))?; + println!("{url}"); Ok(()) } -fn cmd_presign_put( - client: &BlockingClient, - bucket: &str, - cmd: CmdPresignPut, -) -> Result<(), s3::Error> { - let presigned = client - .objects() - .presign_put(bucket, cmd.key) - .expires_in(Duration::from_secs(cmd.expires)) - .build()?; - println!("{}", presigned.url); +fn cmd_presign_put(vfs: &dyn VirtualFilesystem, cmd: CmdPresignPut) -> Result<(), VfsError> { + let url = vfs.presign_put(&cmd.key, Duration::from_secs(cmd.expires))?; + println!("{url}"); Ok(()) } -fn cmd_rm_prefix(client: &BlockingClient, bucket: &str, cmd: CmdRmPrefix) -> Result<(), s3::Error> { - let mut keys: Vec = Vec::new(); - - for page in client.objects().list_v2(bucket).prefix(cmd.prefix).pager() { - let page = page?; - for obj in page.contents { - keys.push(obj.key); - } - } +fn cmd_rm_prefix(vfs: &dyn VirtualFilesystem, cmd: CmdRmPrefix) -> Result<(), VfsError> { + let keys = vfs.list_all_keys(&cmd.prefix)?; if !cmd.yes { for k in &keys { @@ -418,12 +394,30 @@ fn cmd_rm_prefix(client: &BlockingClient, bucket: &str, cmd: CmdRmPrefix) -> Res // S3 multi-delete limit is 1000 keys per request. for chunk in keys.chunks(1000) { - client - .objects() - .delete_objects(bucket) - .objects(chunk.iter().cloned()) - .send()?; + let chunk_vec: Vec = chunk.iter().cloned().collect(); + vfs.delete_many(&chunk_vec)?; } Ok(()) } + +fn cmd_mount(vfs: S3Vfs, cmd: CmdMount) -> Result<(), Box> { + #[cfg(target_os = "linux")] + { + let vfs: std::sync::Arc = std::sync::Arc::new(vfs); + fuse_mount::mount_bucket( + vfs, + &cmd.mountpoint, + &cmd.prefix, + cmd.read_write, + cmd.allow_other, + )?; + Ok(()) + } + #[cfg(not(target_os = "linux"))] + { + let _ = vfs; + let _ = cmd; + Err("mount is only supported on Linux".into()) + } +} diff --git a/src/vfs.rs b/src/vfs.rs new file mode 100644 index 0000000..fb02ef1 --- /dev/null +++ b/src/vfs.rs @@ -0,0 +1,270 @@ +use std::{ + fmt, + io::{self, Read}, + time::Duration, +}; + +use s3::BlockingClient; + +#[derive(Debug)] +pub enum VfsError { + S3(s3::Error), + Io(io::Error), + NotFound(String), + Unsupported(&'static str), +} + +impl fmt::Display for VfsError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + VfsError::S3(e) => write!(f, "{e}"), + VfsError::Io(e) => write!(f, "{e}"), + VfsError::NotFound(p) => write!(f, "not found: {p}"), + VfsError::Unsupported(msg) => write!(f, "unsupported: {msg}"), + } + } +} + +impl std::error::Error for VfsError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + VfsError::S3(e) => Some(e), + VfsError::Io(e) => Some(e), + VfsError::NotFound(_) => None, + VfsError::Unsupported(_) => None, + } + } +} + +impl From for VfsError { + fn from(value: s3::Error) -> Self { + VfsError::S3(value) + } +} + +impl From for VfsError { + fn from(value: io::Error) -> Self { + VfsError::Io(value) + } +} + +#[derive(Debug, Clone)] +pub struct ObjectStat { + pub content_length: Option, + pub content_type: Option, + pub etag: Option, +} + +#[derive(Debug, Clone)] +pub struct ListPage { + pub common_prefixes: Vec, + pub keys: Vec, +} + +pub trait VirtualFilesystem: Send + Sync { + fn list_page( + &self, + prefix: &str, + recursive: bool, + max_keys: Option, + ) -> Result; + + fn list_all_keys(&self, prefix: &str) -> Result, VfsError>; + + fn list_all(&self, prefix: &str, recursive: bool) -> Result; + + fn read_bytes(&self, key: &str, range: Option<(u64, u64)>) -> Result, VfsError>; + + fn write_from_reader( + &self, + key: &str, + len: u64, + r: Box, + content_type: Option<&str>, + ) -> Result<(), VfsError>; + + fn delete(&self, key: &str) -> Result<(), VfsError>; + fn delete_many(&self, keys: &[String]) -> Result<(), VfsError>; + fn stat(&self, key: &str) -> Result; + fn copy(&self, src_key: &str, dst_key: &str) -> Result<(), VfsError>; + fn presign_get(&self, key: &str, expires: Duration) -> Result; + fn presign_put(&self, key: &str, expires: Duration) -> Result; +} + +#[derive(Clone)] +pub struct S3Vfs { + client: BlockingClient, + bucket: String, +} + +impl S3Vfs { + pub fn new(client: BlockingClient, bucket: impl Into) -> Self { + Self { + client, + bucket: bucket.into(), + } + } + + pub fn bucket(&self) -> &str { + &self.bucket + } +} + +impl VirtualFilesystem for S3Vfs { + fn list_page( + &self, + prefix: &str, + recursive: bool, + max_keys: Option, + ) -> Result { + let mut req = self.client.objects().list_v2(&self.bucket); + if !prefix.is_empty() { + req = req.prefix(prefix.to_string()); + } + if !recursive { + req = req.delimiter("/"); + } + if let Some(max_keys) = max_keys { + req = req.max_keys(max_keys); + } + + let out = req.send()?; + Ok(ListPage { + common_prefixes: out.common_prefixes, + keys: out.contents.into_iter().map(|o| o.key).collect(), + }) + } + + fn list_all_keys(&self, prefix: &str) -> Result, VfsError> { + let mut keys = Vec::new(); + let mut req = self.client.objects().list_v2(&self.bucket); + if !prefix.is_empty() { + req = req.prefix(prefix.to_string()); + } + for page in req.pager() { + let page = page?; + for obj in page.contents { + keys.push(obj.key); + } + } + Ok(keys) + } + + fn list_all(&self, prefix: &str, recursive: bool) -> Result { + let mut common_prefixes: Vec = Vec::new(); + let mut keys: Vec = Vec::new(); + + let mut req = self.client.objects().list_v2(&self.bucket); + if !prefix.is_empty() { + req = req.prefix(prefix.to_string()); + } + if !recursive { + req = req.delimiter("/"); + } + + for page in req.pager() { + let page = page?; + common_prefixes.extend(page.common_prefixes); + keys.extend(page.contents.into_iter().map(|o| o.key)); + } + + common_prefixes.sort(); + common_prefixes.dedup(); + Ok(ListPage { + common_prefixes, + keys, + }) + } + + fn read_bytes(&self, key: &str, range: Option<(u64, u64)>) -> Result, VfsError> { + let mut req = self.client.objects().get(&self.bucket, key.to_string()); + if let Some((start, end_inclusive)) = range { + req = req.range_bytes(start, end_inclusive); + } + let obj = req.send()?; + Ok(obj.bytes()?.to_vec()) + } + + fn write_from_reader( + &self, + key: &str, + len: u64, + r: Box, + content_type: Option<&str>, + ) -> Result<(), VfsError> { + let mut req = self + .client + .objects() + .put(&self.bucket, key.to_string()) + .body_reader_sized(r, len); + + if let Some(ct) = content_type { + req = req.content_type(ct.to_string()); + } + + req.send()?; + Ok(()) + } + + fn delete(&self, key: &str) -> Result<(), VfsError> { + self.client + .objects() + .delete(&self.bucket, key.to_string()) + .send()?; + Ok(()) + } + + fn delete_many(&self, keys: &[String]) -> Result<(), VfsError> { + if keys.is_empty() { + return Ok(()); + } + self.client + .objects() + .delete_objects(&self.bucket) + .objects(keys.iter().cloned()) + .send()?; + Ok(()) + } + + fn stat(&self, key: &str) -> Result { + let out = self + .client + .objects() + .head(&self.bucket, key.to_string()) + .send()?; + Ok(ObjectStat { + content_length: out.content_length, + content_type: out.content_type, + etag: out.etag, + }) + } + + fn copy(&self, src_key: &str, dst_key: &str) -> Result<(), VfsError> { + self.client + .objects() + .copy(&self.bucket, src_key.to_string(), &self.bucket, dst_key.to_string()) + .send()?; + Ok(()) + } + + fn presign_get(&self, key: &str, expires: Duration) -> Result { + let presigned = self + .client + .objects() + .presign_get(&self.bucket, key.to_string()) + .expires_in(expires) + .build()?; + Ok(presigned.url.to_string()) + } + + fn presign_put(&self, key: &str, expires: Duration) -> Result { + let presigned = self + .client + .objects() + .presign_put(&self.bucket, key.to_string()) + .expires_in(expires) + .build()?; + Ok(presigned.url.to_string()) + } +} +