Compare commits

..

No commits in common. "686e0c3e5c9b5b88e940aec272e5a53a63488d2b" and "540953e4a90c849d6f52c619aacf60041a991865" have entirely different histories.

4 changed files with 124 additions and 126 deletions

View file

@ -1,4 +1,8 @@
use std::{cell::RefCell, fmt, io, time::Duration}; use std::{
cell::{Ref, RefCell},
fmt, io,
time::Duration,
};
use console::style; use console::style;
use indicatif::{ProgressBar, ProgressStyle}; use indicatif::{ProgressBar, ProgressStyle};
@ -7,7 +11,7 @@ use log::{debug, warn};
use crate::{ use crate::{
cachefile::CacheFile, cachefile::CacheFile,
cli::Cli, cli::Cli,
file::{Chunk, FileTrait}, file::{self, Chunk, FileTrait},
sharry::{self, Client}, sharry::{self, Client},
}; };
@ -57,14 +61,10 @@ impl AppState {
Ok(Self::new(http, CacheFile::from_args(args, share_id))) Ok(Self::new(http, CacheFile::from_args(args, share_id)))
} }
fn with_progressbar(&self, drop_bar: bool, f: impl FnOnce(&ProgressBar)) { fn get_or_create_progressbar(&self, uploading: &file::Uploading) -> Ref<'_, ProgressBar> {
let Some(upl) = self.inner.peek_uploading() else {
return;
};
let mut slot = self.progress.borrow_mut(); let mut slot = self.progress.borrow_mut();
if slot.is_none() { if slot.is_none() {
let bar = ProgressBar::no_length() let bar = ProgressBar::new(uploading.get_size())
.with_style( .with_style(
ProgressStyle::with_template(&format!( ProgressStyle::with_template(&format!(
concat!( concat!(
@ -76,68 +76,75 @@ impl AppState {
)) ))
.expect("style template is not valid"), .expect("style template is not valid"),
) )
.with_message(upl.get_name().to_owned()); .with_position(uploading.get_offset())
.with_message(uploading.get_name().to_owned());
bar.enable_steady_tick(Duration::from_millis(100)); bar.enable_steady_tick(Duration::from_millis(100));
*slot = Some(bar); *slot = Some(bar);
} }
drop(slot);
let bar = slot.as_ref().expect("somehow the slot holds None"); Ref::map(self.progress.borrow(), |opt| {
opt.as_ref().expect("somehow the slot holds None")
})
}
bar.set_position(upl.get_offset()); fn end_progressbar(&self, f: impl FnOnce(&ProgressBar)) {
// BUG in `indicatif` crate? let mut slot = self.progress.borrow_mut();
// `set_position` does not force an immediate redraw, so we need to call `set_length` after if let Some(bar) = slot.as_ref() {
bar.set_length(upl.get_size()); f(bar);
f(bar);
if drop_bar {
*slot = None; *slot = None;
} }
} }
fn touch_progressbar(&self) { fn next_chunk<'t>(&mut self, buffer: &'t mut [u8]) -> io::Result<Option<Chunk<'t>>> {
self.with_progressbar(false, |_| ()); let Some(mut uploading) = self.inner.pop_file(&self.http) else {
} self.inner
.share_notify(&self.http)
.unwrap_or_else(|e| warn!("Failed to notify the share: {e}"));
fn next_chunk<'t>(&mut self, buffer: &'t mut [u8]) -> sharry::Result<Option<Chunk<'t>>> {
if self.inner.get_uploading(&self.http)?.is_none() {
return Ok(None); return Ok(None);
} };
self.touch_progressbar();
let uploading = self
.inner
.get_uploading(&self.http)?
.expect("we just checked!");
debug!("{uploading:?}"); debug!("{uploading:?}");
let chunk = uploading.read(buffer)?; self.get_or_create_progressbar(&uploading);
let chunk_res = uploading.read(buffer);
self.inner.push_file(uploading);
let chunk = chunk_res?;
debug!("{chunk:?}"); debug!("{chunk:?}");
Ok(Some(chunk)) Ok(Some(chunk))
} }
fn is_done(&mut self) -> bool { fn is_done(&mut self) -> bool {
if let Some(path) = self.inner.check_eof() { let Some(uploading) = self.inner.pop_file(&self.http) else {
debug!("Finished {:?}!", path.display()); return true;
self.with_progressbar(true, ProgressBar::finish); };
} else if self.inner.peek_uploading().is_some() {
self.touch_progressbar();
return false; match uploading.check_eof() {
Ok(uploading) => {
let bar = self.get_or_create_progressbar(&uploading);
bar.set_position(uploading.get_offset());
// BUG in `indicatif` crate?
// `set_position` does not force an immediate redraw, so we also call `inc_length` here
bar.inc_length(0);
drop(bar);
self.inner.push_file(uploading);
}
Err(path) => {
debug!("Finished {:?}!", path.display());
self.end_progressbar(ProgressBar::finish);
}
} }
self.inner.queue_empty() self.inner.is_empty()
} }
pub fn upload_chunk(&mut self, buffer: &mut [u8]) -> sharry::Result<bool> { pub fn upload_chunk(&mut self, buffer: &mut [u8]) -> sharry::Result<bool> {
let Some(chunk) = self.next_chunk(buffer)? else { let Some(chunk) = self.next_chunk(buffer)? else {
self.inner
.share_notify(&self.http)
.unwrap_or_else(|e| warn!("Failed to notify the share: {e}"));
return Ok(true); return Ok(true);
}; };
@ -146,24 +153,30 @@ impl AppState {
Ok(self.is_done()) Ok(self.is_done())
} }
pub fn rewind_chunk(mut self) -> Option<Self> { pub fn rewind(mut self) -> Option<Self> {
self.inner = self.inner.rewind_chunk()?; let Some(uploading) = self.inner.pop_file(&self.http) else {
warn!("rewind called on empty queue");
return None;
};
let uploading = uploading.rewind()?;
self.inner.push_file(uploading);
Some(self) Some(self)
} }
pub fn abort_upload(&mut self) { pub fn requeue_file(mut self) -> Option<Self> {
self.inner.abort_upload(); let Some(uploading) = self.inner.pop_file(&self.http) else {
self.with_progressbar(true, ProgressBar::abandon); warn!("requeue_file called on empty queue");
} return None;
};
pub fn rebuild_share(self, args: &Cli) -> sharry::Result<Self> { let checked = uploading.abort();
let share_id = self.inner.requeue_file(checked);
self.http
.share_create(&args.get_uri(), &args.alias, args.get_share_request())?;
self.with_progressbar(true, ProgressBar::abandon); self.end_progressbar(ProgressBar::abandon);
Ok(Self::new(self.http, CacheFile::from_args(args, share_id)))
Some(self)
} }
pub fn save(&self) -> io::Result<()> { pub fn save(&self) -> io::Result<()> {

View file

@ -14,6 +14,27 @@ use crate::{
sharry::{self, Client, Uri}, sharry::{self, Client, Uri},
}; };
#[derive(Serialize, Deserialize, Debug)]
enum FileState {
C(file::Checked),
U(file::Uploading),
}
impl FileState {
fn start_upload(
self,
client: &impl sharry::Client,
uri: &sharry::Uri,
alias_id: &str,
share_id: &str,
) -> sharry::Result<file::Uploading> {
match self {
FileState::C(checked) => checked.start_upload(client, uri, alias_id, share_id),
FileState::U(uploading) => Ok(uploading),
}
}
}
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
pub struct CacheFile { pub struct CacheFile {
#[serde(skip)] #[serde(skip)]
@ -22,9 +43,7 @@ pub struct CacheFile {
uri: Uri, uri: Uri,
alias_id: String, alias_id: String,
share_id: String, share_id: String,
files: VecDeque<FileState>,
uploading: Option<file::Uploading>,
files: VecDeque<file::Checked>,
} }
impl CacheFile { impl CacheFile {
@ -62,62 +81,34 @@ impl CacheFile {
uri: args.get_uri(), uri: args.get_uri(),
alias_id: args.alias.clone(), alias_id: args.alias.clone(),
share_id, share_id,
uploading: None, files: args.files.clone().into_iter().map(FileState::C).collect(),
files: args.files.clone().into(),
} }
} }
pub fn queue_empty(&self) -> bool { pub fn is_empty(&self) -> bool {
self.files.is_empty() self.files.is_empty()
} }
pub fn get_uploading( pub fn pop_file(&mut self, client: &impl Client) -> Option<file::Uploading> {
&mut self, if let Some(state) = self.files.pop_front() {
client: &impl Client, // HACK unwrap
) -> sharry::Result<Option<&mut file::Uploading>> { // TODO somehow retry
if self.uploading.is_some() { Some(
Ok(self.uploading.as_mut()) state
} else if let Some(chk) = self.files.pop_front() { .start_upload(client, &self.uri, &self.alias_id, &self.share_id)
let upl = chk.start_upload(client, &self.uri, &self.alias_id, &self.share_id)?; .unwrap(),
self.uploading.replace(upl); )
Ok(self.uploading.as_mut())
} else { } else {
Ok(None) None
} }
} }
pub fn peek_uploading(&self) -> Option<&file::Uploading> { pub fn push_file(&mut self, file: file::Uploading) {
self.uploading.as_ref() self.files.push_front(FileState::U(file));
} }
pub fn check_eof(&mut self) -> Option<PathBuf> { pub fn requeue_file(&mut self, file: file::Checked) {
if let Some(upl) = self.uploading.take() { self.files.push_back(FileState::C(file));
match upl.check_eof() {
Ok(upl) => self.uploading = Some(upl),
Err(p) => return Some(p),
}
}
None
}
pub fn rewind_chunk(mut self) -> Option<Self> {
let Some(upl) = self.uploading.take() else {
panic!("rewind_chunk called while not uploading");
};
self.uploading = Some(upl.rewind()?);
Some(self)
}
pub fn abort_upload(&mut self) {
let Some(upl) = self.uploading.take() else {
panic!("abort_upload called while not uploading");
};
self.files.push_front(upl.abort());
} }
pub fn share_notify(&self, client: &impl Client) -> sharry::Result<()> { pub fn share_notify(&self, client: &impl Client) -> sharry::Result<()> {

View file

@ -72,8 +72,6 @@ impl sharry::Client for ureq::Agent {
debug!("{res:?}"); debug!("{res:?}");
if res.success && (res.message == "Share created.") { if res.success && (res.message == "Share created.") {
trace!("new share id: {:?}", res.id);
Ok(res.id) Ok(res.id)
} else { } else {
Err(ClientError::response(format!("{res:?}"))) Err(ClientError::response(format!("{res:?}")))

View file

@ -88,35 +88,31 @@ fn main() {
match state.upload_chunk(&mut buffer) { match state.upload_chunk(&mut buffer) {
Err(e) => { Err(e) => {
Log::handle(&e); Log::handle(&e);
tries += 1;
if let ClientError::InvalidParameter(p) = e { if let ClientError::InvalidParameter(p) = e { match p {
match p { // TODO Error 404: File not found
// TODO Error 404: File not found Parameter::FileID(fid) => {
Parameter::FileID(fid) => { // requeue file
trace!("requeueing file {fid:?}"); let Some(s) = state.requeue_file() else {
Log::error("Failed to requeue file!");
};
state.abort_upload(); trace!("File {fid:?} requeued (tried: {tries})");
} state = s;
// TODO Error 404: Share not found
Parameter::ShareID(sid) => {
trace!("rebuilding share {sid:?}");
// rebuild share
let Ok(s) = state.rebuild_share(&args) else {
Log::error("Failed to rebuild share!");
};
state = s;
}
p => Log::error(format_args!("Unexpected {p}!")),
} }
} else { // TODO Error 404: Share might have been deleted
Parameter::ShareID(sid) => {
Log::error(format_args!("404 sid: {sid}"));
}
p => Log::error(format_args!("Unexpected {p}!")),
} } else {
// retry chunk // retry chunk
let Some(s) = state.rewind_chunk() else { let Some(s) = state.rewind() else {
Log::error("Failed to retry chunk!"); Log::error("Failed to retry chunk!");
}; };
tries += 1;
trace!("State rewound, retrying last chunk (tries: {tries})"); trace!("State rewound, retrying last chunk (tried: {tries})");
state = s; state = s;
} }
} }