From f05e112040331a483ac8f48fa825f86e8660c404 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn-Michael=20Miehe?= <40151420+ldericher@users.noreply.github.com> Date: Fri, 13 Jun 2025 17:03:25 +0000 Subject: [PATCH 1/3] [wip] retry chunks - make `Chunk` lifetime agnostic by using a raw pointer - add `fn Uploading::rewind` - add `fn AppState::{next_chunk, is_done}` --- src/appstate.rs | 47 +++++++++++++++++++++++++++++-------------- src/file/chunk.rs | 22 +++++++++++--------- src/file/uploading.rs | 15 ++++++++++++-- 3 files changed, 57 insertions(+), 27 deletions(-) diff --git a/src/appstate.rs b/src/appstate.rs index 155fc35..979f03f 100644 --- a/src/appstate.rs +++ b/src/appstate.rs @@ -8,11 +8,13 @@ use console::style; use indicatif::{ProgressBar, ProgressStyle}; use log::{debug, warn}; +use crate::file::Chunk; + use super::{ cachefile::CacheFile, cli::Cli, file::{self, FileTrait}, - sharry::{self, Client, ClientError}, + sharry::{self, Client}, }; pub struct AppState { @@ -112,31 +114,32 @@ impl AppState { } } - pub fn upload_chunk(&mut self) -> sharry::Result { + fn next_chunk(&mut self) -> io::Result> { let Some(mut uploading) = self.inner.pop_file(&self.http) else { self.inner .share_notify(&self.http) .unwrap_or_else(|e| warn!("Failed to notify the share: {e}")); - return Ok(true); + return Ok(None); }; debug!("{uploading:?}"); self.get_or_create_progressbar(&uploading); - let chunk = uploading - .read(&mut self.buffer) - .map_err(ClientError::from)?; + let chunk = uploading.read(&mut self.buffer); + self.inner.push_file(uploading); + let chunk = chunk?; debug!("{chunk:?}"); - self.http.file_patch( - chunk.get_patch_uri(), - self.inner.alias_id(), - chunk.get_offset(), - chunk.get_data(), - )?; + Ok(Some(chunk)) + } + + fn is_done(&mut self) -> bool { + let Some(uploading) = self.inner.pop_file(&self.http) else { + return true; + }; match uploading.check_eof() { Ok(uploading) => { @@ -148,15 +151,29 @@ impl AppState { drop(bar); self.inner.push_file(uploading); - Ok(false) } Err(path) => { debug!("Finished {:?}!", path.display()); self.finish_progressbar(); - - Ok(self.inner.is_empty()) } } + + self.inner.is_empty() + } + + pub fn upload_chunk(&mut self) -> sharry::Result { + let Some(chunk) = self.next_chunk()? else { + return Ok(true); + }; + + self.http.file_patch( + chunk.get_patch_uri(), + self.inner.alias_id(), + chunk.get_offset(), + unsafe { chunk.get_data() }, + )?; + + Ok(self.is_done()) } pub fn file_names(&self) -> Vec<&str> { diff --git a/src/file/chunk.rs b/src/file/chunk.rs index 8f69dd2..8cf3c49 100644 --- a/src/file/chunk.rs +++ b/src/file/chunk.rs @@ -1,23 +1,25 @@ use std::fmt; -pub struct Chunk<'t> { - data: &'t [u8], - patch_uri: &'t str, +pub struct Chunk { + data: *const [u8], + patch_uri: String, offset: u64, } -impl fmt::Debug for Chunk<'_> { +impl fmt::Debug for Chunk { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Chunk") .field("patch_uri", &self.patch_uri) .field("offset", &self.offset) - .field("data.len()", &self.data.len()) + .field("data_len", &self.data.len()) .finish_non_exhaustive() } } -impl<'t> Chunk<'t> { - pub fn new(data: &'t [u8], patch_uri: &'t str, offset: u64) -> Self { +impl Chunk { + pub fn new(data: &[u8], patch_uri: String, offset: u64) -> Self { + let data: *const [u8] = data as *const [u8]; + Self { data, patch_uri, @@ -25,8 +27,8 @@ impl<'t> Chunk<'t> { } } - pub fn get_data(&self) -> &[u8] { - self.data + pub unsafe fn get_data(&self) -> &[u8] { + unsafe { &*self.data } } pub fn get_length(&self) -> u64 { @@ -39,7 +41,7 @@ impl<'t> Chunk<'t> { } pub fn get_patch_uri(&self) -> &str { - self.patch_uri + &self.patch_uri } pub fn get_offset(&self) -> u64 { diff --git a/src/file/uploading.rs b/src/file/uploading.rs index 26a83e7..2831bcd 100644 --- a/src/file/uploading.rs +++ b/src/file/uploading.rs @@ -13,6 +13,7 @@ pub struct Uploading { path: PathBuf, size: u64, patch_uri: String, + last_offset: Option, offset: u64, } @@ -34,6 +35,7 @@ impl Uploading { path, size, patch_uri, + last_offset: None, offset: 0, } } @@ -42,7 +44,15 @@ impl Uploading { self.offset } - pub fn read<'t>(&'t mut self, buf: &'t mut [u8]) -> io::Result> { + pub fn rewind(self) -> Option { + self.last_offset.map(|last_offset| Self { + last_offset: None, + offset: last_offset, + ..self + }) + } + + pub fn read(&mut self, buf: &mut [u8]) -> io::Result { let mut f = fs::File::open(&self.path)?; f.seek(SeekFrom::Start(self.offset))?; @@ -55,7 +65,8 @@ impl Uploading { )); } - let chunk = Chunk::new(&buf[..read_len], &self.patch_uri, self.offset); + let chunk = Chunk::new(&buf[..read_len], self.patch_uri.clone(), self.offset); + self.last_offset = Some(self.offset); self.offset += chunk.get_length(); Ok(chunk) From 0a8e5cf3f0bfeec3035769f66c36899246ddb936 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn-Michael=20Miehe?= <40151420+ldericher@users.noreply.github.com> Date: Fri, 13 Jun 2025 22:27:15 +0000 Subject: [PATCH 2/3] [wip] retry chunks - move buffer into main function again, avoiding `unsafe` code - partially revert `Chunk` changes, removing `unsafe` parts --- src/appstate.rs | 38 ++++++++++++-------------------------- src/file/chunk.rs | 18 ++++++++---------- src/file/uploading.rs | 2 +- src/main.rs | 4 +++- 4 files changed, 24 insertions(+), 38 deletions(-) diff --git a/src/appstate.rs b/src/appstate.rs index 979f03f..e253e53 100644 --- a/src/appstate.rs +++ b/src/appstate.rs @@ -8,19 +8,15 @@ use console::style; use indicatif::{ProgressBar, ProgressStyle}; use log::{debug, warn}; -use crate::file::Chunk; - use super::{ cachefile::CacheFile, cli::Cli, - file::{self, FileTrait}, + file::{self, FileTrait, Chunk}, sharry::{self, Client}, }; pub struct AppState { progress: RefCell>, - buffer: Vec, - http: ureq::Agent, inner: CacheFile, } @@ -41,10 +37,9 @@ fn new_http(timeout: Option) -> ureq::Agent { } impl AppState { - fn new(chunk_size: usize, http: ureq::Agent, inner: CacheFile) -> Self { + fn new(http: ureq::Agent, inner: CacheFile) -> Self { Self { - progress: None.into(), - buffer: vec![0; chunk_size * 1024 * 1024], + progress: RefCell::new(None), http, inner, } @@ -55,11 +50,7 @@ impl AppState { .inspect_err(|e| debug!("could not resume from hash {:?}: {e}", args.get_hash())) .ok()?; - Some(Self::new( - args.chunk_size, - new_http(args.get_timeout()), - inner, - )) + Some(Self::new(new_http(args.get_timeout()), inner)) } pub fn from_args(args: &Cli) -> sharry::Result { @@ -71,11 +62,7 @@ impl AppState { args.get_share_request(), )?; - Ok(Self::new( - args.chunk_size, - http, - CacheFile::from_args(args, share_id), - )) + Ok(Self::new(http, CacheFile::from_args(args, share_id))) } fn get_or_create_progressbar(&self, uploading: &file::Uploading) -> Ref<'_, ProgressBar> { @@ -114,7 +101,7 @@ impl AppState { } } - fn next_chunk(&mut self) -> io::Result> { + fn next_chunk<'t>(&mut self, buffer: &'t mut [u8]) -> io::Result>> { let Some(mut uploading) = self.inner.pop_file(&self.http) else { self.inner .share_notify(&self.http) @@ -122,15 +109,14 @@ impl AppState { return Ok(None); }; - debug!("{uploading:?}"); self.get_or_create_progressbar(&uploading); - let chunk = uploading.read(&mut self.buffer); + let chunk_res = uploading.read(buffer); self.inner.push_file(uploading); - let chunk = chunk?; + let chunk = chunk_res?; debug!("{chunk:?}"); Ok(Some(chunk)) @@ -146,7 +132,7 @@ impl AppState { let bar = self.get_or_create_progressbar(&uploading); bar.set_position(uploading.get_offset()); // BUG in `indicatif` crate? - // `set_position` does not force immediate redraw, so we also call `inc_length` here + // `set_position` does not force an immediate redraw, so we also call `inc_length` here bar.inc_length(0); drop(bar); @@ -161,8 +147,8 @@ impl AppState { self.inner.is_empty() } - pub fn upload_chunk(&mut self) -> sharry::Result { - let Some(chunk) = self.next_chunk()? else { + pub fn upload_chunk(&mut self, buffer: &mut [u8]) -> sharry::Result { + let Some(chunk) = self.next_chunk(buffer)? else { return Ok(true); }; @@ -170,7 +156,7 @@ impl AppState { chunk.get_patch_uri(), self.inner.alias_id(), chunk.get_offset(), - unsafe { chunk.get_data() }, + chunk.get_data(), )?; Ok(self.is_done()) diff --git a/src/file/chunk.rs b/src/file/chunk.rs index 8cf3c49..f93f172 100644 --- a/src/file/chunk.rs +++ b/src/file/chunk.rs @@ -1,25 +1,23 @@ use std::fmt; -pub struct Chunk { - data: *const [u8], +pub struct Chunk<'t> { + data: &'t [u8], patch_uri: String, offset: u64, } -impl fmt::Debug for Chunk { +impl fmt::Debug for Chunk<'_> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Chunk") .field("patch_uri", &self.patch_uri) .field("offset", &self.offset) - .field("data_len", &self.data.len()) + .field("data.len()", &self.data.len()) .finish_non_exhaustive() } } -impl Chunk { - pub fn new(data: &[u8], patch_uri: String, offset: u64) -> Self { - let data: *const [u8] = data as *const [u8]; - +impl<'t> Chunk<'t> { + pub fn new(data: &'t [u8], patch_uri: String, offset: u64) -> Self { Self { data, patch_uri, @@ -27,8 +25,8 @@ impl Chunk { } } - pub unsafe fn get_data(&self) -> &[u8] { - unsafe { &*self.data } + pub fn get_data(&self) -> &[u8] { + self.data } pub fn get_length(&self) -> u64 { diff --git a/src/file/uploading.rs b/src/file/uploading.rs index 2831bcd..dc40d86 100644 --- a/src/file/uploading.rs +++ b/src/file/uploading.rs @@ -52,7 +52,7 @@ impl Uploading { }) } - pub fn read(&mut self, buf: &mut [u8]) -> io::Result { + pub fn read<'t>(&mut self, buf: &'t mut [u8]) -> io::Result> { let mut f = fs::File::open(&self.path)?; f.seek(SeekFrom::Start(self.offset))?; diff --git a/src/main.rs b/src/main.rs index 38b799d..9477a8a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -113,8 +113,10 @@ fn main() { style(state.file_names().join(", ")).magenta(), ); + let mut buffer = vec![0; args.chunk_size * 1024 * 1024]; + loop { - match state.upload_chunk() { + match state.upload_chunk(&mut buffer) { Err(e) => error!("error: {e:?}"), // HACK handle errors better Ok(true) => { info!("all uploads done"); From 30855ed8ffed683b244135adc4f97266a5c27095 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn-Michael=20Miehe?= <40151420+ldericher@users.noreply.github.com> Date: Fri, 13 Jun 2025 23:00:36 +0000 Subject: [PATCH 3/3] [wip] retry chunks - `AppState::rewind` impl - error handling in `main` --- src/appstate.rs | 14 +++++++++++++- src/file/uploading.rs | 17 ++++++++++++----- src/main.rs | 12 +++++++++++- 3 files changed, 36 insertions(+), 7 deletions(-) diff --git a/src/appstate.rs b/src/appstate.rs index e253e53..8ce7180 100644 --- a/src/appstate.rs +++ b/src/appstate.rs @@ -11,7 +11,7 @@ use log::{debug, warn}; use super::{ cachefile::CacheFile, cli::Cli, - file::{self, FileTrait, Chunk}, + file::{self, Chunk, FileTrait}, sharry::{self, Client}, }; @@ -162,6 +162,18 @@ impl AppState { Ok(self.is_done()) } + pub fn rewind(mut self) -> Option { + let Some(uploading) = self.inner.pop_file(&self.http) else { + warn!("rewind called on empty queue"); + return None; + }; + + let uploading = uploading.rewind()?; + self.inner.push_file(uploading); + + Some(self) + } + pub fn file_names(&self) -> Vec<&str> { self.inner.file_names() } diff --git a/src/file/uploading.rs b/src/file/uploading.rs index dc40d86..0b2118f 100644 --- a/src/file/uploading.rs +++ b/src/file/uploading.rs @@ -4,6 +4,7 @@ use std::{ path::PathBuf, }; +use log::warn; use serde::{Deserialize, Serialize}; use super::{Chunk, FileTrait}; @@ -45,11 +46,17 @@ impl Uploading { } pub fn rewind(self) -> Option { - self.last_offset.map(|last_offset| Self { - last_offset: None, - offset: last_offset, - ..self - }) + match self.last_offset { + Some(last_offset) => Some(Self { + last_offset: None, + offset: last_offset, + ..self + }), + None => { + warn!("attempted to rewind twice"); + None + } + } } pub fn read<'t>(&mut self, buf: &'t mut [u8]) -> io::Result> { diff --git a/src/main.rs b/src/main.rs index 9477a8a..dcfa946 100644 --- a/src/main.rs +++ b/src/main.rs @@ -117,7 +117,17 @@ fn main() { loop { match state.upload_chunk(&mut buffer) { - Err(e) => error!("error: {e:?}"), // HACK handle errors better + Err(e) => { + // HACK handle errors better + error!("error: {e:?}"); + + if let Some(s) = state.rewind() { + state = s; + } else { + eprintln!("{} Failed to retry chunk!", style("Error:").red().bold()); + process::exit(1); + }; + } Ok(true) => { info!("all uploads done"); state.clear().unwrap_or_else(|e| {