Compare commits
4 commits
e2151b592c
...
876869b073
| Author | SHA1 | Date | |
|---|---|---|---|
| 876869b073 | |||
| 30855ed8ff | |||
| 0a8e5cf3f0 | |||
| f05e112040 |
4 changed files with 92 additions and 47 deletions
|
|
@ -11,14 +11,12 @@ use log::{debug, warn};
|
||||||
use super::{
|
use super::{
|
||||||
cachefile::CacheFile,
|
cachefile::CacheFile,
|
||||||
cli::Cli,
|
cli::Cli,
|
||||||
file::{self, FileTrait},
|
file::{self, Chunk, FileTrait},
|
||||||
sharry::{self, Client, ClientError},
|
sharry::{self, Client},
|
||||||
};
|
};
|
||||||
|
|
||||||
pub struct AppState {
|
pub struct AppState {
|
||||||
progress: RefCell<Option<ProgressBar>>,
|
progress: RefCell<Option<ProgressBar>>,
|
||||||
buffer: Vec<u8>,
|
|
||||||
|
|
||||||
http: ureq::Agent,
|
http: ureq::Agent,
|
||||||
inner: CacheFile,
|
inner: CacheFile,
|
||||||
}
|
}
|
||||||
|
|
@ -39,10 +37,9 @@ fn new_http(timeout: Option<Duration>) -> ureq::Agent {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl AppState {
|
impl AppState {
|
||||||
fn new(chunk_size: usize, http: ureq::Agent, inner: CacheFile) -> Self {
|
fn new(http: ureq::Agent, inner: CacheFile) -> Self {
|
||||||
Self {
|
Self {
|
||||||
progress: None.into(),
|
progress: RefCell::new(None),
|
||||||
buffer: vec![0; chunk_size * 1024 * 1024],
|
|
||||||
http,
|
http,
|
||||||
inner,
|
inner,
|
||||||
}
|
}
|
||||||
|
|
@ -53,11 +50,7 @@ impl AppState {
|
||||||
.inspect_err(|e| debug!("could not resume from hash {:?}: {e}", args.get_hash()))
|
.inspect_err(|e| debug!("could not resume from hash {:?}: {e}", args.get_hash()))
|
||||||
.ok()?;
|
.ok()?;
|
||||||
|
|
||||||
Some(Self::new(
|
Some(Self::new(new_http(args.get_timeout()), inner))
|
||||||
args.chunk_size,
|
|
||||||
new_http(args.get_timeout()),
|
|
||||||
inner,
|
|
||||||
))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn from_args(args: &Cli) -> sharry::Result<Self> {
|
pub fn from_args(args: &Cli) -> sharry::Result<Self> {
|
||||||
|
|
@ -69,11 +62,7 @@ impl AppState {
|
||||||
args.get_share_request(),
|
args.get_share_request(),
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
Ok(Self::new(
|
Ok(Self::new(http, CacheFile::from_args(args, share_id)))
|
||||||
args.chunk_size,
|
|
||||||
http,
|
|
||||||
CacheFile::from_args(args, share_id),
|
|
||||||
))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_or_create_progressbar(&self, uploading: &file::Uploading) -> Ref<'_, ProgressBar> {
|
fn get_or_create_progressbar(&self, uploading: &file::Uploading) -> Ref<'_, ProgressBar> {
|
||||||
|
|
@ -112,25 +101,57 @@ impl AppState {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn upload_chunk(&mut self) -> sharry::Result<bool> {
|
fn next_chunk<'t>(&mut self, buffer: &'t mut [u8]) -> io::Result<Option<Chunk<'t>>> {
|
||||||
let Some(mut uploading) = self.inner.pop_file(&self.http) else {
|
let Some(mut uploading) = self.inner.pop_file(&self.http) else {
|
||||||
self.inner
|
self.inner
|
||||||
.share_notify(&self.http)
|
.share_notify(&self.http)
|
||||||
.unwrap_or_else(|e| warn!("Failed to notify the share: {e}"));
|
.unwrap_or_else(|e| warn!("Failed to notify the share: {e}"));
|
||||||
|
|
||||||
return Ok(true);
|
return Ok(None);
|
||||||
};
|
};
|
||||||
|
|
||||||
debug!("{uploading:?}");
|
debug!("{uploading:?}");
|
||||||
|
|
||||||
self.get_or_create_progressbar(&uploading);
|
self.get_or_create_progressbar(&uploading);
|
||||||
|
|
||||||
let chunk = uploading
|
let chunk_res = uploading.read(buffer);
|
||||||
.read(&mut self.buffer)
|
self.inner.push_file(uploading);
|
||||||
.map_err(ClientError::from)?;
|
|
||||||
|
|
||||||
|
let chunk = chunk_res?;
|
||||||
debug!("{chunk:?}");
|
debug!("{chunk:?}");
|
||||||
|
|
||||||
|
Ok(Some(chunk))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_done(&mut self) -> bool {
|
||||||
|
let Some(uploading) = self.inner.pop_file(&self.http) else {
|
||||||
|
return true;
|
||||||
|
};
|
||||||
|
|
||||||
|
match uploading.check_eof() {
|
||||||
|
Ok(uploading) => {
|
||||||
|
let bar = self.get_or_create_progressbar(&uploading);
|
||||||
|
bar.set_position(uploading.get_offset());
|
||||||
|
// BUG in `indicatif` crate?
|
||||||
|
// `set_position` does not force an immediate redraw, so we also call `inc_length` here
|
||||||
|
bar.inc_length(0);
|
||||||
|
drop(bar);
|
||||||
|
|
||||||
|
self.inner.push_file(uploading);
|
||||||
|
}
|
||||||
|
Err(path) => {
|
||||||
|
debug!("Finished {:?}!", path.display());
|
||||||
|
self.finish_progressbar();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
self.inner.is_empty()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn upload_chunk(&mut self, buffer: &mut [u8]) -> sharry::Result<bool> {
|
||||||
|
let Some(chunk) = self.next_chunk(buffer)? else {
|
||||||
|
return Ok(true);
|
||||||
|
};
|
||||||
|
|
||||||
self.http.file_patch(
|
self.http.file_patch(
|
||||||
chunk.get_patch_uri(),
|
chunk.get_patch_uri(),
|
||||||
self.inner.alias_id(),
|
self.inner.alias_id(),
|
||||||
|
|
@ -138,25 +159,19 @@ impl AppState {
|
||||||
chunk.get_data(),
|
chunk.get_data(),
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
match uploading.check_eof() {
|
Ok(self.is_done())
|
||||||
Ok(uploading) => {
|
}
|
||||||
let bar = self.get_or_create_progressbar(&uploading);
|
|
||||||
bar.set_position(uploading.get_offset());
|
|
||||||
// BUG in `indicatif` crate?
|
|
||||||
// `set_position` does not force immediate redraw, so we also call `inc_length` here
|
|
||||||
bar.inc_length(0);
|
|
||||||
drop(bar);
|
|
||||||
|
|
||||||
|
pub fn rewind(mut self) -> Option<Self> {
|
||||||
|
let Some(uploading) = self.inner.pop_file(&self.http) else {
|
||||||
|
warn!("rewind called on empty queue");
|
||||||
|
return None;
|
||||||
|
};
|
||||||
|
|
||||||
|
let uploading = uploading.rewind()?;
|
||||||
self.inner.push_file(uploading);
|
self.inner.push_file(uploading);
|
||||||
Ok(false)
|
|
||||||
}
|
|
||||||
Err(path) => {
|
|
||||||
debug!("Finished {:?}!", path.display());
|
|
||||||
self.finish_progressbar();
|
|
||||||
|
|
||||||
Ok(self.inner.is_empty())
|
Some(self)
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn file_names(&self) -> Vec<&str> {
|
pub fn file_names(&self) -> Vec<&str> {
|
||||||
|
|
|
||||||
|
|
@ -2,7 +2,7 @@ use std::fmt;
|
||||||
|
|
||||||
pub struct Chunk<'t> {
|
pub struct Chunk<'t> {
|
||||||
data: &'t [u8],
|
data: &'t [u8],
|
||||||
patch_uri: &'t str,
|
patch_uri: String,
|
||||||
offset: u64,
|
offset: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -17,7 +17,7 @@ impl fmt::Debug for Chunk<'_> {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'t> Chunk<'t> {
|
impl<'t> Chunk<'t> {
|
||||||
pub fn new(data: &'t [u8], patch_uri: &'t str, offset: u64) -> Self {
|
pub fn new(data: &'t [u8], patch_uri: String, offset: u64) -> Self {
|
||||||
Self {
|
Self {
|
||||||
data,
|
data,
|
||||||
patch_uri,
|
patch_uri,
|
||||||
|
|
@ -39,7 +39,7 @@ impl<'t> Chunk<'t> {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_patch_uri(&self) -> &str {
|
pub fn get_patch_uri(&self) -> &str {
|
||||||
self.patch_uri
|
&self.patch_uri
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_offset(&self) -> u64 {
|
pub fn get_offset(&self) -> u64 {
|
||||||
|
|
|
||||||
|
|
@ -4,6 +4,7 @@ use std::{
|
||||||
path::PathBuf,
|
path::PathBuf,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use log::warn;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
use super::{Chunk, FileTrait};
|
use super::{Chunk, FileTrait};
|
||||||
|
|
@ -13,6 +14,7 @@ pub struct Uploading {
|
||||||
path: PathBuf,
|
path: PathBuf,
|
||||||
size: u64,
|
size: u64,
|
||||||
patch_uri: String,
|
patch_uri: String,
|
||||||
|
last_offset: Option<u64>,
|
||||||
offset: u64,
|
offset: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -34,6 +36,7 @@ impl Uploading {
|
||||||
path,
|
path,
|
||||||
size,
|
size,
|
||||||
patch_uri,
|
patch_uri,
|
||||||
|
last_offset: None,
|
||||||
offset: 0,
|
offset: 0,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -42,7 +45,21 @@ impl Uploading {
|
||||||
self.offset
|
self.offset
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn read<'t>(&'t mut self, buf: &'t mut [u8]) -> io::Result<Chunk<'t>> {
|
pub fn rewind(self) -> Option<Self> {
|
||||||
|
match self.last_offset {
|
||||||
|
Some(last_offset) => Some(Self {
|
||||||
|
last_offset: None,
|
||||||
|
offset: last_offset,
|
||||||
|
..self
|
||||||
|
}),
|
||||||
|
None => {
|
||||||
|
warn!("attempted to rewind twice");
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn read<'t>(&mut self, buf: &'t mut [u8]) -> io::Result<Chunk<'t>> {
|
||||||
let mut f = fs::File::open(&self.path)?;
|
let mut f = fs::File::open(&self.path)?;
|
||||||
|
|
||||||
f.seek(SeekFrom::Start(self.offset))?;
|
f.seek(SeekFrom::Start(self.offset))?;
|
||||||
|
|
@ -55,7 +72,8 @@ impl Uploading {
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
let chunk = Chunk::new(&buf[..read_len], &self.patch_uri, self.offset);
|
let chunk = Chunk::new(&buf[..read_len], self.patch_uri.clone(), self.offset);
|
||||||
|
self.last_offset = Some(self.offset);
|
||||||
self.offset += chunk.get_length();
|
self.offset += chunk.get_length();
|
||||||
|
|
||||||
Ok(chunk)
|
Ok(chunk)
|
||||||
|
|
|
||||||
16
src/main.rs
16
src/main.rs
|
|
@ -113,9 +113,21 @@ fn main() {
|
||||||
style(state.file_names().join(", ")).magenta(),
|
style(state.file_names().join(", ")).magenta(),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
let mut buffer = vec![0; args.chunk_size * 1024 * 1024];
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
match state.upload_chunk() {
|
match state.upload_chunk(&mut buffer) {
|
||||||
Err(e) => error!("error: {e:?}"), // HACK handle errors better
|
Err(e) => {
|
||||||
|
// HACK handle errors better
|
||||||
|
error!("error: {e:?}");
|
||||||
|
|
||||||
|
if let Some(s) = state.rewind() {
|
||||||
|
state = s;
|
||||||
|
} else {
|
||||||
|
eprintln!("{} Failed to retry chunk!", style("Error:").red().bold());
|
||||||
|
process::exit(1);
|
||||||
|
};
|
||||||
|
}
|
||||||
Ok(true) => {
|
Ok(true) => {
|
||||||
info!("all uploads done");
|
info!("all uploads done");
|
||||||
break;
|
break;
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue