diff --git a/src/appstate.rs b/src/appstate.rs index 155fc35..8ce7180 100644 --- a/src/appstate.rs +++ b/src/appstate.rs @@ -11,14 +11,12 @@ use log::{debug, warn}; use super::{ cachefile::CacheFile, cli::Cli, - file::{self, FileTrait}, - sharry::{self, Client, ClientError}, + file::{self, Chunk, FileTrait}, + sharry::{self, Client}, }; pub struct AppState { progress: RefCell>, - buffer: Vec, - http: ureq::Agent, inner: CacheFile, } @@ -39,10 +37,9 @@ fn new_http(timeout: Option) -> ureq::Agent { } impl AppState { - fn new(chunk_size: usize, http: ureq::Agent, inner: CacheFile) -> Self { + fn new(http: ureq::Agent, inner: CacheFile) -> Self { Self { - progress: None.into(), - buffer: vec![0; chunk_size * 1024 * 1024], + progress: RefCell::new(None), http, inner, } @@ -53,11 +50,7 @@ impl AppState { .inspect_err(|e| debug!("could not resume from hash {:?}: {e}", args.get_hash())) .ok()?; - Some(Self::new( - args.chunk_size, - new_http(args.get_timeout()), - inner, - )) + Some(Self::new(new_http(args.get_timeout()), inner)) } pub fn from_args(args: &Cli) -> sharry::Result { @@ -69,11 +62,7 @@ impl AppState { args.get_share_request(), )?; - Ok(Self::new( - args.chunk_size, - http, - CacheFile::from_args(args, share_id), - )) + Ok(Self::new(http, CacheFile::from_args(args, share_id))) } fn get_or_create_progressbar(&self, uploading: &file::Uploading) -> Ref<'_, ProgressBar> { @@ -112,25 +101,57 @@ impl AppState { } } - pub fn upload_chunk(&mut self) -> sharry::Result { + fn next_chunk<'t>(&mut self, buffer: &'t mut [u8]) -> io::Result>> { let Some(mut uploading) = self.inner.pop_file(&self.http) else { self.inner .share_notify(&self.http) .unwrap_or_else(|e| warn!("Failed to notify the share: {e}")); - return Ok(true); + return Ok(None); }; - debug!("{uploading:?}"); self.get_or_create_progressbar(&uploading); - let chunk = uploading - .read(&mut self.buffer) - .map_err(ClientError::from)?; + let chunk_res = uploading.read(buffer); + self.inner.push_file(uploading); + let chunk = chunk_res?; debug!("{chunk:?}"); + Ok(Some(chunk)) + } + + fn is_done(&mut self) -> bool { + let Some(uploading) = self.inner.pop_file(&self.http) else { + return true; + }; + + match uploading.check_eof() { + Ok(uploading) => { + let bar = self.get_or_create_progressbar(&uploading); + bar.set_position(uploading.get_offset()); + // BUG in `indicatif` crate? + // `set_position` does not force an immediate redraw, so we also call `inc_length` here + bar.inc_length(0); + drop(bar); + + self.inner.push_file(uploading); + } + Err(path) => { + debug!("Finished {:?}!", path.display()); + self.finish_progressbar(); + } + } + + self.inner.is_empty() + } + + pub fn upload_chunk(&mut self, buffer: &mut [u8]) -> sharry::Result { + let Some(chunk) = self.next_chunk(buffer)? else { + return Ok(true); + }; + self.http.file_patch( chunk.get_patch_uri(), self.inner.alias_id(), @@ -138,25 +159,19 @@ impl AppState { chunk.get_data(), )?; - match uploading.check_eof() { - Ok(uploading) => { - let bar = self.get_or_create_progressbar(&uploading); - bar.set_position(uploading.get_offset()); - // BUG in `indicatif` crate? - // `set_position` does not force immediate redraw, so we also call `inc_length` here - bar.inc_length(0); - drop(bar); + Ok(self.is_done()) + } - self.inner.push_file(uploading); - Ok(false) - } - Err(path) => { - debug!("Finished {:?}!", path.display()); - self.finish_progressbar(); + pub fn rewind(mut self) -> Option { + let Some(uploading) = self.inner.pop_file(&self.http) else { + warn!("rewind called on empty queue"); + return None; + }; - Ok(self.inner.is_empty()) - } - } + let uploading = uploading.rewind()?; + self.inner.push_file(uploading); + + Some(self) } pub fn file_names(&self) -> Vec<&str> { diff --git a/src/file/chunk.rs b/src/file/chunk.rs index 8f69dd2..f93f172 100644 --- a/src/file/chunk.rs +++ b/src/file/chunk.rs @@ -2,7 +2,7 @@ use std::fmt; pub struct Chunk<'t> { data: &'t [u8], - patch_uri: &'t str, + patch_uri: String, offset: u64, } @@ -17,7 +17,7 @@ impl fmt::Debug for Chunk<'_> { } impl<'t> Chunk<'t> { - pub fn new(data: &'t [u8], patch_uri: &'t str, offset: u64) -> Self { + pub fn new(data: &'t [u8], patch_uri: String, offset: u64) -> Self { Self { data, patch_uri, @@ -39,7 +39,7 @@ impl<'t> Chunk<'t> { } pub fn get_patch_uri(&self) -> &str { - self.patch_uri + &self.patch_uri } pub fn get_offset(&self) -> u64 { diff --git a/src/file/uploading.rs b/src/file/uploading.rs index 26a83e7..0b2118f 100644 --- a/src/file/uploading.rs +++ b/src/file/uploading.rs @@ -4,6 +4,7 @@ use std::{ path::PathBuf, }; +use log::warn; use serde::{Deserialize, Serialize}; use super::{Chunk, FileTrait}; @@ -13,6 +14,7 @@ pub struct Uploading { path: PathBuf, size: u64, patch_uri: String, + last_offset: Option, offset: u64, } @@ -34,6 +36,7 @@ impl Uploading { path, size, patch_uri, + last_offset: None, offset: 0, } } @@ -42,7 +45,21 @@ impl Uploading { self.offset } - pub fn read<'t>(&'t mut self, buf: &'t mut [u8]) -> io::Result> { + pub fn rewind(self) -> Option { + match self.last_offset { + Some(last_offset) => Some(Self { + last_offset: None, + offset: last_offset, + ..self + }), + None => { + warn!("attempted to rewind twice"); + None + } + } + } + + pub fn read<'t>(&mut self, buf: &'t mut [u8]) -> io::Result> { let mut f = fs::File::open(&self.path)?; f.seek(SeekFrom::Start(self.offset))?; @@ -55,7 +72,8 @@ impl Uploading { )); } - let chunk = Chunk::new(&buf[..read_len], &self.patch_uri, self.offset); + let chunk = Chunk::new(&buf[..read_len], self.patch_uri.clone(), self.offset); + self.last_offset = Some(self.offset); self.offset += chunk.get_length(); Ok(chunk) diff --git a/src/main.rs b/src/main.rs index 6f6287d..974e1dd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -113,9 +113,21 @@ fn main() { style(state.file_names().join(", ")).magenta(), ); + let mut buffer = vec![0; args.chunk_size * 1024 * 1024]; + loop { - match state.upload_chunk() { - Err(e) => error!("error: {e:?}"), // HACK handle errors better + match state.upload_chunk(&mut buffer) { + Err(e) => { + // HACK handle errors better + error!("error: {e:?}"); + + if let Some(s) = state.rewind() { + state = s; + } else { + eprintln!("{} Failed to retry chunk!", style("Error:").red().bold()); + process::exit(1); + }; + } Ok(true) => { info!("all uploads done"); break;