Compare commits
5 commits
540953e4a9
...
686e0c3e5c
| Author | SHA1 | Date | |
|---|---|---|---|
| 686e0c3e5c | |||
| 5b403ea129 | |||
| 393feec125 | |||
| df055fc4e9 | |||
| 865566ad0c |
4 changed files with 126 additions and 124 deletions
123
src/appstate.rs
123
src/appstate.rs
|
|
@ -1,8 +1,4 @@
|
||||||
use std::{
|
use std::{cell::RefCell, fmt, io, time::Duration};
|
||||||
cell::{Ref, RefCell},
|
|
||||||
fmt, io,
|
|
||||||
time::Duration,
|
|
||||||
};
|
|
||||||
|
|
||||||
use console::style;
|
use console::style;
|
||||||
use indicatif::{ProgressBar, ProgressStyle};
|
use indicatif::{ProgressBar, ProgressStyle};
|
||||||
|
|
@ -11,7 +7,7 @@ use log::{debug, warn};
|
||||||
use crate::{
|
use crate::{
|
||||||
cachefile::CacheFile,
|
cachefile::CacheFile,
|
||||||
cli::Cli,
|
cli::Cli,
|
||||||
file::{self, Chunk, FileTrait},
|
file::{Chunk, FileTrait},
|
||||||
sharry::{self, Client},
|
sharry::{self, Client},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
@ -61,10 +57,14 @@ impl AppState {
|
||||||
Ok(Self::new(http, CacheFile::from_args(args, share_id)))
|
Ok(Self::new(http, CacheFile::from_args(args, share_id)))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_or_create_progressbar(&self, uploading: &file::Uploading) -> Ref<'_, ProgressBar> {
|
fn with_progressbar(&self, drop_bar: bool, f: impl FnOnce(&ProgressBar)) {
|
||||||
|
let Some(upl) = self.inner.peek_uploading() else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
let mut slot = self.progress.borrow_mut();
|
let mut slot = self.progress.borrow_mut();
|
||||||
if slot.is_none() {
|
if slot.is_none() {
|
||||||
let bar = ProgressBar::new(uploading.get_size())
|
let bar = ProgressBar::no_length()
|
||||||
.with_style(
|
.with_style(
|
||||||
ProgressStyle::with_template(&format!(
|
ProgressStyle::with_template(&format!(
|
||||||
concat!(
|
concat!(
|
||||||
|
|
@ -76,75 +76,68 @@ impl AppState {
|
||||||
))
|
))
|
||||||
.expect("style template is not valid"),
|
.expect("style template is not valid"),
|
||||||
)
|
)
|
||||||
.with_position(uploading.get_offset())
|
.with_message(upl.get_name().to_owned());
|
||||||
.with_message(uploading.get_name().to_owned());
|
|
||||||
|
|
||||||
bar.enable_steady_tick(Duration::from_millis(100));
|
bar.enable_steady_tick(Duration::from_millis(100));
|
||||||
*slot = Some(bar);
|
*slot = Some(bar);
|
||||||
}
|
}
|
||||||
drop(slot);
|
|
||||||
|
|
||||||
Ref::map(self.progress.borrow(), |opt| {
|
let bar = slot.as_ref().expect("somehow the slot holds None");
|
||||||
opt.as_ref().expect("somehow the slot holds None")
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
fn end_progressbar(&self, f: impl FnOnce(&ProgressBar)) {
|
bar.set_position(upl.get_offset());
|
||||||
let mut slot = self.progress.borrow_mut();
|
// BUG in `indicatif` crate?
|
||||||
if let Some(bar) = slot.as_ref() {
|
// `set_position` does not force an immediate redraw, so we need to call `set_length` after
|
||||||
f(bar);
|
bar.set_length(upl.get_size());
|
||||||
|
|
||||||
|
f(bar);
|
||||||
|
|
||||||
|
if drop_bar {
|
||||||
*slot = None;
|
*slot = None;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn next_chunk<'t>(&mut self, buffer: &'t mut [u8]) -> io::Result<Option<Chunk<'t>>> {
|
fn touch_progressbar(&self) {
|
||||||
let Some(mut uploading) = self.inner.pop_file(&self.http) else {
|
self.with_progressbar(false, |_| ());
|
||||||
self.inner
|
}
|
||||||
.share_notify(&self.http)
|
|
||||||
.unwrap_or_else(|e| warn!("Failed to notify the share: {e}"));
|
|
||||||
|
|
||||||
|
fn next_chunk<'t>(&mut self, buffer: &'t mut [u8]) -> sharry::Result<Option<Chunk<'t>>> {
|
||||||
|
if self.inner.get_uploading(&self.http)?.is_none() {
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
};
|
}
|
||||||
|
|
||||||
|
self.touch_progressbar();
|
||||||
|
|
||||||
|
let uploading = self
|
||||||
|
.inner
|
||||||
|
.get_uploading(&self.http)?
|
||||||
|
.expect("we just checked!");
|
||||||
debug!("{uploading:?}");
|
debug!("{uploading:?}");
|
||||||
|
|
||||||
self.get_or_create_progressbar(&uploading);
|
let chunk = uploading.read(buffer)?;
|
||||||
|
|
||||||
let chunk_res = uploading.read(buffer);
|
|
||||||
self.inner.push_file(uploading);
|
|
||||||
|
|
||||||
let chunk = chunk_res?;
|
|
||||||
debug!("{chunk:?}");
|
debug!("{chunk:?}");
|
||||||
|
|
||||||
Ok(Some(chunk))
|
Ok(Some(chunk))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn is_done(&mut self) -> bool {
|
fn is_done(&mut self) -> bool {
|
||||||
let Some(uploading) = self.inner.pop_file(&self.http) else {
|
if let Some(path) = self.inner.check_eof() {
|
||||||
return true;
|
debug!("Finished {:?}!", path.display());
|
||||||
};
|
self.with_progressbar(true, ProgressBar::finish);
|
||||||
|
} else if self.inner.peek_uploading().is_some() {
|
||||||
|
self.touch_progressbar();
|
||||||
|
|
||||||
match uploading.check_eof() {
|
return false;
|
||||||
Ok(uploading) => {
|
|
||||||
let bar = self.get_or_create_progressbar(&uploading);
|
|
||||||
bar.set_position(uploading.get_offset());
|
|
||||||
// BUG in `indicatif` crate?
|
|
||||||
// `set_position` does not force an immediate redraw, so we also call `inc_length` here
|
|
||||||
bar.inc_length(0);
|
|
||||||
drop(bar);
|
|
||||||
|
|
||||||
self.inner.push_file(uploading);
|
|
||||||
}
|
|
||||||
Err(path) => {
|
|
||||||
debug!("Finished {:?}!", path.display());
|
|
||||||
self.end_progressbar(ProgressBar::finish);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
self.inner.is_empty()
|
self.inner.queue_empty()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn upload_chunk(&mut self, buffer: &mut [u8]) -> sharry::Result<bool> {
|
pub fn upload_chunk(&mut self, buffer: &mut [u8]) -> sharry::Result<bool> {
|
||||||
let Some(chunk) = self.next_chunk(buffer)? else {
|
let Some(chunk) = self.next_chunk(buffer)? else {
|
||||||
|
self.inner
|
||||||
|
.share_notify(&self.http)
|
||||||
|
.unwrap_or_else(|e| warn!("Failed to notify the share: {e}"));
|
||||||
|
|
||||||
return Ok(true);
|
return Ok(true);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
@ -153,30 +146,24 @@ impl AppState {
|
||||||
Ok(self.is_done())
|
Ok(self.is_done())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn rewind(mut self) -> Option<Self> {
|
pub fn rewind_chunk(mut self) -> Option<Self> {
|
||||||
let Some(uploading) = self.inner.pop_file(&self.http) else {
|
self.inner = self.inner.rewind_chunk()?;
|
||||||
warn!("rewind called on empty queue");
|
|
||||||
return None;
|
|
||||||
};
|
|
||||||
|
|
||||||
let uploading = uploading.rewind()?;
|
|
||||||
self.inner.push_file(uploading);
|
|
||||||
|
|
||||||
Some(self)
|
Some(self)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn requeue_file(mut self) -> Option<Self> {
|
pub fn abort_upload(&mut self) {
|
||||||
let Some(uploading) = self.inner.pop_file(&self.http) else {
|
self.inner.abort_upload();
|
||||||
warn!("requeue_file called on empty queue");
|
self.with_progressbar(true, ProgressBar::abandon);
|
||||||
return None;
|
}
|
||||||
};
|
|
||||||
|
|
||||||
let checked = uploading.abort();
|
pub fn rebuild_share(self, args: &Cli) -> sharry::Result<Self> {
|
||||||
self.inner.requeue_file(checked);
|
let share_id =
|
||||||
|
self.http
|
||||||
|
.share_create(&args.get_uri(), &args.alias, args.get_share_request())?;
|
||||||
|
|
||||||
self.end_progressbar(ProgressBar::abandon);
|
self.with_progressbar(true, ProgressBar::abandon);
|
||||||
|
Ok(Self::new(self.http, CacheFile::from_args(args, share_id)))
|
||||||
Some(self)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn save(&self) -> io::Result<()> {
|
pub fn save(&self) -> io::Result<()> {
|
||||||
|
|
|
||||||
|
|
@ -14,27 +14,6 @@ use crate::{
|
||||||
sharry::{self, Client, Uri},
|
sharry::{self, Client, Uri},
|
||||||
};
|
};
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug)]
|
|
||||||
enum FileState {
|
|
||||||
C(file::Checked),
|
|
||||||
U(file::Uploading),
|
|
||||||
}
|
|
||||||
|
|
||||||
impl FileState {
|
|
||||||
fn start_upload(
|
|
||||||
self,
|
|
||||||
client: &impl sharry::Client,
|
|
||||||
uri: &sharry::Uri,
|
|
||||||
alias_id: &str,
|
|
||||||
share_id: &str,
|
|
||||||
) -> sharry::Result<file::Uploading> {
|
|
||||||
match self {
|
|
||||||
FileState::C(checked) => checked.start_upload(client, uri, alias_id, share_id),
|
|
||||||
FileState::U(uploading) => Ok(uploading),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug)]
|
#[derive(Serialize, Deserialize, Debug)]
|
||||||
pub struct CacheFile {
|
pub struct CacheFile {
|
||||||
#[serde(skip)]
|
#[serde(skip)]
|
||||||
|
|
@ -43,7 +22,9 @@ pub struct CacheFile {
|
||||||
uri: Uri,
|
uri: Uri,
|
||||||
alias_id: String,
|
alias_id: String,
|
||||||
share_id: String,
|
share_id: String,
|
||||||
files: VecDeque<FileState>,
|
|
||||||
|
uploading: Option<file::Uploading>,
|
||||||
|
files: VecDeque<file::Checked>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl CacheFile {
|
impl CacheFile {
|
||||||
|
|
@ -81,34 +62,62 @@ impl CacheFile {
|
||||||
uri: args.get_uri(),
|
uri: args.get_uri(),
|
||||||
alias_id: args.alias.clone(),
|
alias_id: args.alias.clone(),
|
||||||
share_id,
|
share_id,
|
||||||
files: args.files.clone().into_iter().map(FileState::C).collect(),
|
uploading: None,
|
||||||
|
files: args.files.clone().into(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn is_empty(&self) -> bool {
|
pub fn queue_empty(&self) -> bool {
|
||||||
self.files.is_empty()
|
self.files.is_empty()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn pop_file(&mut self, client: &impl Client) -> Option<file::Uploading> {
|
pub fn get_uploading(
|
||||||
if let Some(state) = self.files.pop_front() {
|
&mut self,
|
||||||
// HACK unwrap
|
client: &impl Client,
|
||||||
// TODO somehow retry
|
) -> sharry::Result<Option<&mut file::Uploading>> {
|
||||||
Some(
|
if self.uploading.is_some() {
|
||||||
state
|
Ok(self.uploading.as_mut())
|
||||||
.start_upload(client, &self.uri, &self.alias_id, &self.share_id)
|
} else if let Some(chk) = self.files.pop_front() {
|
||||||
.unwrap(),
|
let upl = chk.start_upload(client, &self.uri, &self.alias_id, &self.share_id)?;
|
||||||
)
|
self.uploading.replace(upl);
|
||||||
|
|
||||||
|
Ok(self.uploading.as_mut())
|
||||||
} else {
|
} else {
|
||||||
None
|
Ok(None)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn push_file(&mut self, file: file::Uploading) {
|
pub fn peek_uploading(&self) -> Option<&file::Uploading> {
|
||||||
self.files.push_front(FileState::U(file));
|
self.uploading.as_ref()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn requeue_file(&mut self, file: file::Checked) {
|
pub fn check_eof(&mut self) -> Option<PathBuf> {
|
||||||
self.files.push_back(FileState::C(file));
|
if let Some(upl) = self.uploading.take() {
|
||||||
|
match upl.check_eof() {
|
||||||
|
Ok(upl) => self.uploading = Some(upl),
|
||||||
|
Err(p) => return Some(p),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn rewind_chunk(mut self) -> Option<Self> {
|
||||||
|
let Some(upl) = self.uploading.take() else {
|
||||||
|
panic!("rewind_chunk called while not uploading");
|
||||||
|
};
|
||||||
|
|
||||||
|
self.uploading = Some(upl.rewind()?);
|
||||||
|
|
||||||
|
Some(self)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn abort_upload(&mut self) {
|
||||||
|
let Some(upl) = self.uploading.take() else {
|
||||||
|
panic!("abort_upload called while not uploading");
|
||||||
|
};
|
||||||
|
|
||||||
|
self.files.push_front(upl.abort());
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn share_notify(&self, client: &impl Client) -> sharry::Result<()> {
|
pub fn share_notify(&self, client: &impl Client) -> sharry::Result<()> {
|
||||||
|
|
|
||||||
|
|
@ -72,6 +72,8 @@ impl sharry::Client for ureq::Agent {
|
||||||
debug!("{res:?}");
|
debug!("{res:?}");
|
||||||
|
|
||||||
if res.success && (res.message == "Share created.") {
|
if res.success && (res.message == "Share created.") {
|
||||||
|
trace!("new share id: {:?}", res.id);
|
||||||
|
|
||||||
Ok(res.id)
|
Ok(res.id)
|
||||||
} else {
|
} else {
|
||||||
Err(ClientError::response(format!("{res:?}")))
|
Err(ClientError::response(format!("{res:?}")))
|
||||||
|
|
|
||||||
40
src/main.rs
40
src/main.rs
|
|
@ -88,31 +88,35 @@ fn main() {
|
||||||
match state.upload_chunk(&mut buffer) {
|
match state.upload_chunk(&mut buffer) {
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
Log::handle(&e);
|
Log::handle(&e);
|
||||||
tries += 1;
|
|
||||||
|
|
||||||
if let ClientError::InvalidParameter(p) = e { match p {
|
if let ClientError::InvalidParameter(p) = e {
|
||||||
// TODO Error 404: File not found
|
match p {
|
||||||
Parameter::FileID(fid) => {
|
// TODO Error 404: File not found
|
||||||
// requeue file
|
Parameter::FileID(fid) => {
|
||||||
let Some(s) = state.requeue_file() else {
|
trace!("requeueing file {fid:?}");
|
||||||
Log::error("Failed to requeue file!");
|
|
||||||
};
|
|
||||||
|
|
||||||
trace!("File {fid:?} requeued (tried: {tries})");
|
state.abort_upload();
|
||||||
state = s;
|
}
|
||||||
|
// TODO Error 404: Share not found
|
||||||
|
Parameter::ShareID(sid) => {
|
||||||
|
trace!("rebuilding share {sid:?}");
|
||||||
|
|
||||||
|
// rebuild share
|
||||||
|
let Ok(s) = state.rebuild_share(&args) else {
|
||||||
|
Log::error("Failed to rebuild share!");
|
||||||
|
};
|
||||||
|
state = s;
|
||||||
|
}
|
||||||
|
p => Log::error(format_args!("Unexpected {p}!")),
|
||||||
}
|
}
|
||||||
// TODO Error 404: Share might have been deleted
|
} else {
|
||||||
Parameter::ShareID(sid) => {
|
|
||||||
Log::error(format_args!("404 sid: {sid}"));
|
|
||||||
}
|
|
||||||
p => Log::error(format_args!("Unexpected {p}!")),
|
|
||||||
} } else {
|
|
||||||
// retry chunk
|
// retry chunk
|
||||||
let Some(s) = state.rewind() else {
|
let Some(s) = state.rewind_chunk() else {
|
||||||
Log::error("Failed to retry chunk!");
|
Log::error("Failed to retry chunk!");
|
||||||
};
|
};
|
||||||
|
tries += 1;
|
||||||
|
|
||||||
trace!("State rewound, retrying last chunk (tried: {tries})");
|
trace!("State rewound, retrying last chunk (tries: {tries})");
|
||||||
state = s;
|
state = s;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue