From dc2a330d5810827e24160611bc64ca48cdcab62b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn-Michael=20Miehe?= <40151420+ldericher@users.noreply.github.com> Date: Tue, 10 Jun 2025 23:39:08 +0000 Subject: [PATCH] [wip] impl `Client` for `ureq::Agent` - Chunk implementation --- src/appstate.rs | 54 ++++++++++++++++++++++++++++--------------- src/file/chunk.rs | 36 +++++++++++++++++++++++++++++ src/file/mod.rs | 2 ++ src/file/uploading.rs | 21 ++++------------- src/main.rs | 2 +- src/sharry/client.rs | 19 +++++---------- 6 files changed, 85 insertions(+), 49 deletions(-) create mode 100644 src/file/chunk.rs diff --git a/src/appstate.rs b/src/appstate.rs index 7326608..04ac109 100644 --- a/src/appstate.rs +++ b/src/appstate.rs @@ -14,7 +14,7 @@ use serde::{Deserialize, Serialize}; use super::{ cli::Cli, file::{self, FileTrait}, - sharry::{self, Client, Uri}, + sharry::{self, Client, ClientError, Uri}, }; #[derive(Serialize, Deserialize, Debug)] @@ -23,6 +23,8 @@ pub struct AppState { file_name: PathBuf, #[serde(skip)] progress: Option, + #[serde(skip)] + buffer: Vec, uri: Uri, alias_id: String, @@ -94,6 +96,7 @@ impl AppState { Self { file_name, progress: None, + buffer: Vec::with_capacity(args.chunk_size), uri: state.uri, alias_id: state.alias_id, share_id: state.share_id, @@ -119,6 +122,7 @@ impl AppState { Ok(Self { file_name, progress: None, + buffer: Vec::with_capacity(args.chunk_size), uri, alias_id, share_id, @@ -130,18 +134,16 @@ impl AppState { self.files.iter().map(FileState::file_name).collect() } - pub fn upload_chunk( - &mut self, - http: &ureq::Agent, - chunk_size: usize, - ) -> sharry::Result> { - let uploading = if let Some(state) = self.files.pop_front() { - state.start_upload(http, &self.uri, &self.alias_id, &self.share_id)? + pub fn upload_chunk(&mut self, http: &impl Client) -> sharry::Result> { + let mut uploading = if let Some(state) = self.files.pop_front() { + state + .start_upload(http, &self.uri, &self.alias_id, &self.share_id) + .unwrap() // HACK unwrap } else { return Ok(None); }; - debug!("{uploading} chunk {chunk_size}"); + debug!("{uploading} chunk {}", self.buffer.len()); // Initialize or fetch the existing ProgressBar in one call: let bar = &*self.progress.get_or_insert_with(|| { @@ -165,21 +167,35 @@ impl AppState { bar }); - match uploading.upload_chunk(http, &self.alias_id, chunk_size) { - ChunkState::Ok(upl) => { - bar.set_position(upl.get_offset()); - self.files.push_front(FileState::U(upl)); + let chunk = uploading + .read(&mut self.buffer) + .map_err(ClientError::req_err)?; + if chunk.get_length() == 0 { + return Err(ClientError::req_err("wtf")); + } + + http.file_patch( + chunk.get_patch_uri(), + &self.alias_id, + chunk.get_offset(), + chunk.get_data(), + )?; + + match uploading.check_eof() { + Ok(uploading) => { + bar.set_position(uploading.get_offset()); + self.files.push_front(FileState::U(uploading)); Ok(Some(())) } - ChunkState::Err(upl, e) => { - self.files.push_front(FileState::U(upl)); - Err(e) - } - ChunkState::Finished(path) => { + Err(path) => { debug!("Finished {:?}!", path.display()); bar.finish(); self.progress = None; - self.share_id.notify(http, &self.alias_id).unwrap(); // HACK unwrap + + let endpoint = self + .uri + .endpoint(format!("alias/mail/notify/{}", self.share_id)); + http.share_notify(&endpoint, &self.alias_id).unwrap(); // HACK unwrap Ok(self.files.front().map(drop)) } diff --git a/src/file/chunk.rs b/src/file/chunk.rs new file mode 100644 index 0000000..bf57c93 --- /dev/null +++ b/src/file/chunk.rs @@ -0,0 +1,36 @@ +pub struct Chunk<'t> { + data: &'t [u8], + patch_uri: &'t str, + offset: u64, +} + +impl<'t> Chunk<'t> { + pub fn new(data: &'t [u8], patch_uri: &'t str, offset: u64) -> Self { + Self { + data, + patch_uri, + offset, + } + } + + pub fn get_data(&self) -> &[u8] { + self.data + } + + pub fn get_length(&self) -> u64 { + let len = self.data.len(); + + // BOOKMARK this might **panic** on platforms where `usize` has more than 64 bit. + // Also, you've allocated more than 2 EiB ... in ONE chunk. + // Whoa! Maybe just chill? + u64::try_from(len).unwrap_or_else(|e| panic!("usize={} did not fit into u64: {}", len, e)) + } + + pub fn get_patch_uri(&self) -> &str { + self.patch_uri + } + + pub fn get_offset(&self) -> u64 { + self.offset + } +} diff --git a/src/file/mod.rs b/src/file/mod.rs index 513d562..4d508e6 100644 --- a/src/file/mod.rs +++ b/src/file/mod.rs @@ -1,9 +1,11 @@ mod checked; +mod chunk; mod uploading; use std::{ffi::OsStr, path::Path}; pub use checked::Checked; +pub use chunk::Chunk; pub use uploading::Uploading; pub trait FileTrait<'t> { diff --git a/src/file/uploading.rs b/src/file/uploading.rs index aded973..6ce0480 100644 --- a/src/file/uploading.rs +++ b/src/file/uploading.rs @@ -6,7 +6,7 @@ use std::{ use serde::{Deserialize, Serialize}; -use super::FileTrait; +use super::{Chunk, FileTrait}; #[derive(Serialize, Deserialize, Debug)] pub struct Uploading { @@ -38,31 +38,20 @@ impl Uploading { } } - pub fn get_patch_uri(&self) -> &str { - &self.patch_uri - } - pub fn get_offset(&self) -> u64 { self.offset } - pub fn read(&mut self, buf: &mut [u8]) -> io::Result { + pub fn read<'t>(&'t mut self, buf: &'t mut [u8]) -> io::Result> { let mut f = fs::File::open(&self.path)?; f.seek(SeekFrom::Start(self.offset))?; let read_len = f.read(buf)?; - // convert into `u64` - // - // BOOKMARK this might **panic** on platforms where `usize` has more than 64 bit. - // Also, you're reading more than 2 EiB ... in ONE chunk. - // Whoa! Maybe just chill? - let read_len = u64::try_from(read_len) - .unwrap_or_else(|e| panic!("usize={} did not fit into u64: {}", read_len, e)); + let chunk = Chunk::new(&buf[..read_len], &self.patch_uri, self.offset); + self.offset += chunk.get_length(); - self.offset += read_len; - - Ok(read_len) + Ok(chunk) } pub fn check_eof(self) -> Result { diff --git a/src/main.rs b/src/main.rs index 8c15900..17d53c9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -99,7 +99,7 @@ fn main() { info!("continuing with state: {state:?}"); loop { - match state.upload_chunk(&agent, args.chunk_size * 1024 * 1024) { + match state.upload_chunk(&agent) { Err(e) => error!("error: {e:?}"), Ok(None) => { info!("all uploads done"); diff --git a/src/sharry/client.rs b/src/sharry/client.rs index bd6a5ea..8c32799 100644 --- a/src/sharry/client.rs +++ b/src/sharry/client.rs @@ -21,8 +21,7 @@ pub trait Client { file_size: u64, ) -> Result; - fn file_patch(&self, patch_uri: &str, alias_id: &str, offset: u64, chunk: &[u8]) - -> Result; + fn file_patch(&self, patch_uri: &str, alias_id: &str, offset: u64, chunk: &[u8]) -> Result<()>; } #[derive(Debug, Error)] @@ -41,15 +40,15 @@ pub enum ClientError { } impl ClientError { - fn req_err(msg: impl fmt::Display) -> Self { + pub fn req_err(msg: impl fmt::Display) -> Self { Self::Request(msg.to_string()) } - fn res_parse_err(msg: impl fmt::Display) -> Self { + pub fn res_parse_err(msg: impl fmt::Display) -> Self { Self::ResponseParsing(msg.to_string()) } - fn res_check_status(actual: T, expected: T) -> Result<()> + pub fn res_check_status(actual: T, expected: T) -> Result<()> where T: Into + Eq, { @@ -149,13 +148,7 @@ impl Client for ureq::Agent { Ok(location) } - fn file_patch( - &self, - patch_uri: &str, - alias_id: &str, - offset: u64, - chunk: &[u8], - ) -> Result { + fn file_patch(&self, patch_uri: &str, alias_id: &str, offset: u64, chunk: &[u8]) -> Result<()> { let res = self .patch(patch_uri) .header("Sharry-Alias", alias_id) @@ -177,7 +170,7 @@ impl Client for ureq::Agent { let chunk_len = u64::try_from(chunk.len()).expect("something's VERY wrong"); if offset + chunk_len == res_offset { - Ok(res_offset) + Ok(()) } else { Err(ClientError::ResponseContent(format!( "Unexpected Upload-Offset: {} (expected {})",