Compare commits

..

No commits in common. "876869b07357a96c431c20511e9e08cc85a3e4e0" and "e2151b592c339e0b700dac89a476755c34e4e94e" have entirely different histories.

4 changed files with 47 additions and 92 deletions

View file

@ -11,12 +11,14 @@ use log::{debug, warn};
use super::{ use super::{
cachefile::CacheFile, cachefile::CacheFile,
cli::Cli, cli::Cli,
file::{self, Chunk, FileTrait}, file::{self, FileTrait},
sharry::{self, Client}, sharry::{self, Client, ClientError},
}; };
pub struct AppState { pub struct AppState {
progress: RefCell<Option<ProgressBar>>, progress: RefCell<Option<ProgressBar>>,
buffer: Vec<u8>,
http: ureq::Agent, http: ureq::Agent,
inner: CacheFile, inner: CacheFile,
} }
@ -37,9 +39,10 @@ fn new_http(timeout: Option<Duration>) -> ureq::Agent {
} }
impl AppState { impl AppState {
fn new(http: ureq::Agent, inner: CacheFile) -> Self { fn new(chunk_size: usize, http: ureq::Agent, inner: CacheFile) -> Self {
Self { Self {
progress: RefCell::new(None), progress: None.into(),
buffer: vec![0; chunk_size * 1024 * 1024],
http, http,
inner, inner,
} }
@ -50,7 +53,11 @@ impl AppState {
.inspect_err(|e| debug!("could not resume from hash {:?}: {e}", args.get_hash())) .inspect_err(|e| debug!("could not resume from hash {:?}: {e}", args.get_hash()))
.ok()?; .ok()?;
Some(Self::new(new_http(args.get_timeout()), inner)) Some(Self::new(
args.chunk_size,
new_http(args.get_timeout()),
inner,
))
} }
pub fn from_args(args: &Cli) -> sharry::Result<Self> { pub fn from_args(args: &Cli) -> sharry::Result<Self> {
@ -62,7 +69,11 @@ impl AppState {
args.get_share_request(), args.get_share_request(),
)?; )?;
Ok(Self::new(http, CacheFile::from_args(args, share_id))) Ok(Self::new(
args.chunk_size,
http,
CacheFile::from_args(args, share_id),
))
} }
fn get_or_create_progressbar(&self, uploading: &file::Uploading) -> Ref<'_, ProgressBar> { fn get_or_create_progressbar(&self, uploading: &file::Uploading) -> Ref<'_, ProgressBar> {
@ -101,57 +112,25 @@ impl AppState {
} }
} }
fn next_chunk<'t>(&mut self, buffer: &'t mut [u8]) -> io::Result<Option<Chunk<'t>>> { pub fn upload_chunk(&mut self) -> sharry::Result<bool> {
let Some(mut uploading) = self.inner.pop_file(&self.http) else { let Some(mut uploading) = self.inner.pop_file(&self.http) else {
self.inner self.inner
.share_notify(&self.http) .share_notify(&self.http)
.unwrap_or_else(|e| warn!("Failed to notify the share: {e}")); .unwrap_or_else(|e| warn!("Failed to notify the share: {e}"));
return Ok(None); return Ok(true);
}; };
debug!("{uploading:?}"); debug!("{uploading:?}");
self.get_or_create_progressbar(&uploading); self.get_or_create_progressbar(&uploading);
let chunk_res = uploading.read(buffer); let chunk = uploading
self.inner.push_file(uploading); .read(&mut self.buffer)
.map_err(ClientError::from)?;
let chunk = chunk_res?;
debug!("{chunk:?}"); debug!("{chunk:?}");
Ok(Some(chunk))
}
fn is_done(&mut self) -> bool {
let Some(uploading) = self.inner.pop_file(&self.http) else {
return true;
};
match uploading.check_eof() {
Ok(uploading) => {
let bar = self.get_or_create_progressbar(&uploading);
bar.set_position(uploading.get_offset());
// BUG in `indicatif` crate?
// `set_position` does not force an immediate redraw, so we also call `inc_length` here
bar.inc_length(0);
drop(bar);
self.inner.push_file(uploading);
}
Err(path) => {
debug!("Finished {:?}!", path.display());
self.finish_progressbar();
}
}
self.inner.is_empty()
}
pub fn upload_chunk(&mut self, buffer: &mut [u8]) -> sharry::Result<bool> {
let Some(chunk) = self.next_chunk(buffer)? else {
return Ok(true);
};
self.http.file_patch( self.http.file_patch(
chunk.get_patch_uri(), chunk.get_patch_uri(),
self.inner.alias_id(), self.inner.alias_id(),
@ -159,19 +138,25 @@ impl AppState {
chunk.get_data(), chunk.get_data(),
)?; )?;
Ok(self.is_done()) match uploading.check_eof() {
} Ok(uploading) => {
let bar = self.get_or_create_progressbar(&uploading);
bar.set_position(uploading.get_offset());
// BUG in `indicatif` crate?
// `set_position` does not force immediate redraw, so we also call `inc_length` here
bar.inc_length(0);
drop(bar);
pub fn rewind(mut self) -> Option<Self> { self.inner.push_file(uploading);
let Some(uploading) = self.inner.pop_file(&self.http) else { Ok(false)
warn!("rewind called on empty queue"); }
return None; Err(path) => {
}; debug!("Finished {:?}!", path.display());
self.finish_progressbar();
let uploading = uploading.rewind()?; Ok(self.inner.is_empty())
self.inner.push_file(uploading); }
}
Some(self)
} }
pub fn file_names(&self) -> Vec<&str> { pub fn file_names(&self) -> Vec<&str> {

View file

@ -2,7 +2,7 @@ use std::fmt;
pub struct Chunk<'t> { pub struct Chunk<'t> {
data: &'t [u8], data: &'t [u8],
patch_uri: String, patch_uri: &'t str,
offset: u64, offset: u64,
} }
@ -17,7 +17,7 @@ impl fmt::Debug for Chunk<'_> {
} }
impl<'t> Chunk<'t> { impl<'t> Chunk<'t> {
pub fn new(data: &'t [u8], patch_uri: String, offset: u64) -> Self { pub fn new(data: &'t [u8], patch_uri: &'t str, offset: u64) -> Self {
Self { Self {
data, data,
patch_uri, patch_uri,
@ -39,7 +39,7 @@ impl<'t> Chunk<'t> {
} }
pub fn get_patch_uri(&self) -> &str { pub fn get_patch_uri(&self) -> &str {
&self.patch_uri self.patch_uri
} }
pub fn get_offset(&self) -> u64 { pub fn get_offset(&self) -> u64 {

View file

@ -4,7 +4,6 @@ use std::{
path::PathBuf, path::PathBuf,
}; };
use log::warn;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use super::{Chunk, FileTrait}; use super::{Chunk, FileTrait};
@ -14,7 +13,6 @@ pub struct Uploading {
path: PathBuf, path: PathBuf,
size: u64, size: u64,
patch_uri: String, patch_uri: String,
last_offset: Option<u64>,
offset: u64, offset: u64,
} }
@ -36,7 +34,6 @@ impl Uploading {
path, path,
size, size,
patch_uri, patch_uri,
last_offset: None,
offset: 0, offset: 0,
} }
} }
@ -45,21 +42,7 @@ impl Uploading {
self.offset self.offset
} }
pub fn rewind(self) -> Option<Self> { pub fn read<'t>(&'t mut self, buf: &'t mut [u8]) -> io::Result<Chunk<'t>> {
match self.last_offset {
Some(last_offset) => Some(Self {
last_offset: None,
offset: last_offset,
..self
}),
None => {
warn!("attempted to rewind twice");
None
}
}
}
pub fn read<'t>(&mut self, buf: &'t mut [u8]) -> io::Result<Chunk<'t>> {
let mut f = fs::File::open(&self.path)?; let mut f = fs::File::open(&self.path)?;
f.seek(SeekFrom::Start(self.offset))?; f.seek(SeekFrom::Start(self.offset))?;
@ -72,8 +55,7 @@ impl Uploading {
)); ));
} }
let chunk = Chunk::new(&buf[..read_len], self.patch_uri.clone(), self.offset); let chunk = Chunk::new(&buf[..read_len], &self.patch_uri, self.offset);
self.last_offset = Some(self.offset);
self.offset += chunk.get_length(); self.offset += chunk.get_length();
Ok(chunk) Ok(chunk)

View file

@ -113,21 +113,9 @@ fn main() {
style(state.file_names().join(", ")).magenta(), style(state.file_names().join(", ")).magenta(),
); );
let mut buffer = vec![0; args.chunk_size * 1024 * 1024];
loop { loop {
match state.upload_chunk(&mut buffer) { match state.upload_chunk() {
Err(e) => { Err(e) => error!("error: {e:?}"), // HACK handle errors better
// HACK handle errors better
error!("error: {e:?}");
if let Some(s) = state.rewind() {
state = s;
} else {
eprintln!("{} Failed to retry chunk!", style("Error:").red().bold());
process::exit(1);
};
}
Ok(true) => { Ok(true) => {
info!("all uploads done"); info!("all uploads done");
break; break;