270 lines
7.2 KiB
Rust
270 lines
7.2 KiB
Rust
use std::{
|
|
fmt,
|
|
io::{self, Read},
|
|
time::Duration,
|
|
};
|
|
|
|
use s3::BlockingClient;
|
|
|
|
#[derive(Debug)]
|
|
pub enum VfsError {
|
|
S3(s3::Error),
|
|
Io(io::Error),
|
|
NotFound(String),
|
|
Unsupported(&'static str),
|
|
}
|
|
|
|
impl fmt::Display for VfsError {
|
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
match self {
|
|
VfsError::S3(e) => write!(f, "{e}"),
|
|
VfsError::Io(e) => write!(f, "{e}"),
|
|
VfsError::NotFound(p) => write!(f, "not found: {p}"),
|
|
VfsError::Unsupported(msg) => write!(f, "unsupported: {msg}"),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl std::error::Error for VfsError {
|
|
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
|
|
match self {
|
|
VfsError::S3(e) => Some(e),
|
|
VfsError::Io(e) => Some(e),
|
|
VfsError::NotFound(_) => None,
|
|
VfsError::Unsupported(_) => None,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl From<s3::Error> for VfsError {
|
|
fn from(value: s3::Error) -> Self {
|
|
VfsError::S3(value)
|
|
}
|
|
}
|
|
|
|
impl From<io::Error> for VfsError {
|
|
fn from(value: io::Error) -> Self {
|
|
VfsError::Io(value)
|
|
}
|
|
}
|
|
|
|
/// Metadata describing a single stored object.
///
/// Every field is optional; a field is `None` when the backing store did not supply it.
/// `Default` yields a value with all fields `None`, and `PartialEq`/`Eq` compare
/// field by field.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ObjectStat {
    /// Reported content length of the object (presumably in bytes — confirm against the
    /// backing client).
    pub content_length: Option<u64>,
    /// Reported content type of the object, if any.
    pub content_type: Option<String>,
    /// Reported entity tag of the object, if any.
    pub etag: Option<String>,
}
|
|
|
|
/// Result of a listing operation: grouped prefixes plus individual object keys.
///
/// `Default` yields an empty page, and `PartialEq`/`Eq` compare both vectors
/// element by element (order-sensitive).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ListPage {
    /// Prefixes that group several keys together (populated by delimiter-based listings).
    pub common_prefixes: Vec<String>,
    /// Keys of the individual objects that were listed.
    pub keys: Vec<String>,
}
|
|
|
|
pub trait VirtualFilesystem: Send + Sync {
|
|
fn list_page(
|
|
&self,
|
|
prefix: &str,
|
|
recursive: bool,
|
|
max_keys: Option<u32>,
|
|
) -> Result<ListPage, VfsError>;
|
|
|
|
fn list_all_keys(&self, prefix: &str) -> Result<Vec<String>, VfsError>;
|
|
|
|
fn list_all(&self, prefix: &str, recursive: bool) -> Result<ListPage, VfsError>;
|
|
|
|
fn read_bytes(&self, key: &str, range: Option<(u64, u64)>) -> Result<Vec<u8>, VfsError>;
|
|
|
|
fn write_from_reader(
|
|
&self,
|
|
key: &str,
|
|
len: u64,
|
|
r: Box<dyn Read + Send>,
|
|
content_type: Option<&str>,
|
|
) -> Result<(), VfsError>;
|
|
|
|
fn delete(&self, key: &str) -> Result<(), VfsError>;
|
|
fn delete_many(&self, keys: &[String]) -> Result<(), VfsError>;
|
|
fn stat(&self, key: &str) -> Result<ObjectStat, VfsError>;
|
|
fn copy(&self, src_key: &str, dst_key: &str) -> Result<(), VfsError>;
|
|
fn presign_get(&self, key: &str, expires: Duration) -> Result<String, VfsError>;
|
|
fn presign_put(&self, key: &str, expires: Duration) -> Result<String, VfsError>;
|
|
}
|
|
|
|
#[derive(Clone)]
|
|
pub struct S3Vfs {
|
|
client: BlockingClient,
|
|
bucket: String,
|
|
}
|
|
|
|
impl S3Vfs {
|
|
pub fn new(client: BlockingClient, bucket: impl Into<String>) -> Self {
|
|
Self {
|
|
client,
|
|
bucket: bucket.into(),
|
|
}
|
|
}
|
|
|
|
pub fn bucket(&self) -> &str {
|
|
&self.bucket
|
|
}
|
|
}
|
|
|
|
impl VirtualFilesystem for S3Vfs {
|
|
fn list_page(
|
|
&self,
|
|
prefix: &str,
|
|
recursive: bool,
|
|
max_keys: Option<u32>,
|
|
) -> Result<ListPage, VfsError> {
|
|
let mut req = self.client.objects().list_v2(&self.bucket);
|
|
if !prefix.is_empty() {
|
|
req = req.prefix(prefix.to_string());
|
|
}
|
|
if !recursive {
|
|
req = req.delimiter("/");
|
|
}
|
|
if let Some(max_keys) = max_keys {
|
|
req = req.max_keys(max_keys);
|
|
}
|
|
|
|
let out = req.send()?;
|
|
Ok(ListPage {
|
|
common_prefixes: out.common_prefixes,
|
|
keys: out.contents.into_iter().map(|o| o.key).collect(),
|
|
})
|
|
}
|
|
|
|
fn list_all_keys(&self, prefix: &str) -> Result<Vec<String>, VfsError> {
|
|
let mut keys = Vec::new();
|
|
let mut req = self.client.objects().list_v2(&self.bucket);
|
|
if !prefix.is_empty() {
|
|
req = req.prefix(prefix.to_string());
|
|
}
|
|
for page in req.pager() {
|
|
let page = page?;
|
|
for obj in page.contents {
|
|
keys.push(obj.key);
|
|
}
|
|
}
|
|
Ok(keys)
|
|
}
|
|
|
|
fn list_all(&self, prefix: &str, recursive: bool) -> Result<ListPage, VfsError> {
|
|
let mut common_prefixes: Vec<String> = Vec::new();
|
|
let mut keys: Vec<String> = Vec::new();
|
|
|
|
let mut req = self.client.objects().list_v2(&self.bucket);
|
|
if !prefix.is_empty() {
|
|
req = req.prefix(prefix.to_string());
|
|
}
|
|
if !recursive {
|
|
req = req.delimiter("/");
|
|
}
|
|
|
|
for page in req.pager() {
|
|
let page = page?;
|
|
common_prefixes.extend(page.common_prefixes);
|
|
keys.extend(page.contents.into_iter().map(|o| o.key));
|
|
}
|
|
|
|
common_prefixes.sort();
|
|
common_prefixes.dedup();
|
|
Ok(ListPage {
|
|
common_prefixes,
|
|
keys,
|
|
})
|
|
}
|
|
|
|
fn read_bytes(&self, key: &str, range: Option<(u64, u64)>) -> Result<Vec<u8>, VfsError> {
|
|
let mut req = self.client.objects().get(&self.bucket, key.to_string());
|
|
if let Some((start, end_inclusive)) = range {
|
|
req = req.range_bytes(start, end_inclusive);
|
|
}
|
|
let obj = req.send()?;
|
|
Ok(obj.bytes()?.to_vec())
|
|
}
|
|
|
|
fn write_from_reader(
|
|
&self,
|
|
key: &str,
|
|
len: u64,
|
|
r: Box<dyn Read + Send>,
|
|
content_type: Option<&str>,
|
|
) -> Result<(), VfsError> {
|
|
let mut req = self
|
|
.client
|
|
.objects()
|
|
.put(&self.bucket, key.to_string())
|
|
.body_reader_sized(r, len);
|
|
|
|
if let Some(ct) = content_type {
|
|
req = req.content_type(ct.to_string());
|
|
}
|
|
|
|
req.send()?;
|
|
Ok(())
|
|
}
|
|
|
|
fn delete(&self, key: &str) -> Result<(), VfsError> {
|
|
self.client
|
|
.objects()
|
|
.delete(&self.bucket, key.to_string())
|
|
.send()?;
|
|
Ok(())
|
|
}
|
|
|
|
fn delete_many(&self, keys: &[String]) -> Result<(), VfsError> {
|
|
if keys.is_empty() {
|
|
return Ok(());
|
|
}
|
|
self.client
|
|
.objects()
|
|
.delete_objects(&self.bucket)
|
|
.objects(keys.iter().cloned())
|
|
.send()?;
|
|
Ok(())
|
|
}
|
|
|
|
fn stat(&self, key: &str) -> Result<ObjectStat, VfsError> {
|
|
let out = self
|
|
.client
|
|
.objects()
|
|
.head(&self.bucket, key.to_string())
|
|
.send()?;
|
|
Ok(ObjectStat {
|
|
content_length: out.content_length,
|
|
content_type: out.content_type,
|
|
etag: out.etag,
|
|
})
|
|
}
|
|
|
|
fn copy(&self, src_key: &str, dst_key: &str) -> Result<(), VfsError> {
|
|
self.client
|
|
.objects()
|
|
.copy(&self.bucket, src_key.to_string(), &self.bucket, dst_key.to_string())
|
|
.send()?;
|
|
Ok(())
|
|
}
|
|
|
|
fn presign_get(&self, key: &str, expires: Duration) -> Result<String, VfsError> {
|
|
let presigned = self
|
|
.client
|
|
.objects()
|
|
.presign_get(&self.bucket, key.to_string())
|
|
.expires_in(expires)
|
|
.build()?;
|
|
Ok(presigned.url.to_string())
|
|
}
|
|
|
|
fn presign_put(&self, key: &str, expires: Duration) -> Result<String, VfsError> {
|
|
let presigned = self
|
|
.client
|
|
.objects()
|
|
.presign_put(&self.bucket, key.to_string())
|
|
.expires_in(expires)
|
|
.build()?;
|
|
Ok(presigned.url.to_string())
|
|
}
|
|
}
|
|
|