[wip] impl Client for ureq::Agent
- Chunk implementation
This commit is contained in:
parent
d607380659
commit
dc2a330d58
6 changed files with 85 additions and 49 deletions
|
|
@ -14,7 +14,7 @@ use serde::{Deserialize, Serialize};
|
|||
use super::{
|
||||
cli::Cli,
|
||||
file::{self, FileTrait},
|
||||
sharry::{self, Client, Uri},
|
||||
sharry::{self, Client, ClientError, Uri},
|
||||
};
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
|
|
@ -23,6 +23,8 @@ pub struct AppState {
|
|||
file_name: PathBuf,
|
||||
#[serde(skip)]
|
||||
progress: Option<ProgressBar>,
|
||||
#[serde(skip)]
|
||||
buffer: Vec<u8>,
|
||||
|
||||
uri: Uri,
|
||||
alias_id: String,
|
||||
|
|
@ -94,6 +96,7 @@ impl AppState {
|
|||
Self {
|
||||
file_name,
|
||||
progress: None,
|
||||
buffer: Vec::with_capacity(args.chunk_size),
|
||||
uri: state.uri,
|
||||
alias_id: state.alias_id,
|
||||
share_id: state.share_id,
|
||||
|
|
@ -119,6 +122,7 @@ impl AppState {
|
|||
Ok(Self {
|
||||
file_name,
|
||||
progress: None,
|
||||
buffer: Vec::with_capacity(args.chunk_size),
|
||||
uri,
|
||||
alias_id,
|
||||
share_id,
|
||||
|
|
@ -130,18 +134,16 @@ impl AppState {
|
|||
self.files.iter().map(FileState::file_name).collect()
|
||||
}
|
||||
|
||||
pub fn upload_chunk(
|
||||
&mut self,
|
||||
http: &ureq::Agent,
|
||||
chunk_size: usize,
|
||||
) -> sharry::Result<Option<()>> {
|
||||
let uploading = if let Some(state) = self.files.pop_front() {
|
||||
state.start_upload(http, &self.uri, &self.alias_id, &self.share_id)?
|
||||
pub fn upload_chunk(&mut self, http: &impl Client) -> sharry::Result<Option<()>> {
|
||||
let mut uploading = if let Some(state) = self.files.pop_front() {
|
||||
state
|
||||
.start_upload(http, &self.uri, &self.alias_id, &self.share_id)
|
||||
.unwrap() // HACK unwrap
|
||||
} else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
debug!("{uploading} chunk {chunk_size}");
|
||||
debug!("{uploading} chunk {}", self.buffer.len());
|
||||
|
||||
// Initialize or fetch the existing ProgressBar in one call:
|
||||
let bar = &*self.progress.get_or_insert_with(|| {
|
||||
|
|
@ -165,21 +167,35 @@ impl AppState {
|
|||
bar
|
||||
});
|
||||
|
||||
match uploading.upload_chunk(http, &self.alias_id, chunk_size) {
|
||||
ChunkState::Ok(upl) => {
|
||||
bar.set_position(upl.get_offset());
|
||||
self.files.push_front(FileState::U(upl));
|
||||
let chunk = uploading
|
||||
.read(&mut self.buffer)
|
||||
.map_err(ClientError::req_err)?;
|
||||
if chunk.get_length() == 0 {
|
||||
return Err(ClientError::req_err("wtf"));
|
||||
}
|
||||
|
||||
http.file_patch(
|
||||
chunk.get_patch_uri(),
|
||||
&self.alias_id,
|
||||
chunk.get_offset(),
|
||||
chunk.get_data(),
|
||||
)?;
|
||||
|
||||
match uploading.check_eof() {
|
||||
Ok(uploading) => {
|
||||
bar.set_position(uploading.get_offset());
|
||||
self.files.push_front(FileState::U(uploading));
|
||||
Ok(Some(()))
|
||||
}
|
||||
ChunkState::Err(upl, e) => {
|
||||
self.files.push_front(FileState::U(upl));
|
||||
Err(e)
|
||||
}
|
||||
ChunkState::Finished(path) => {
|
||||
Err(path) => {
|
||||
debug!("Finished {:?}!", path.display());
|
||||
bar.finish();
|
||||
self.progress = None;
|
||||
self.share_id.notify(http, &self.alias_id).unwrap(); // HACK unwrap
|
||||
|
||||
let endpoint = self
|
||||
.uri
|
||||
.endpoint(format!("alias/mail/notify/{}", self.share_id));
|
||||
http.share_notify(&endpoint, &self.alias_id).unwrap(); // HACK unwrap
|
||||
|
||||
Ok(self.files.front().map(drop))
|
||||
}
|
||||
|
|
|
|||
36
src/file/chunk.rs
Normal file
36
src/file/chunk.rs
Normal file
|
|
@ -0,0 +1,36 @@
|
|||
pub struct Chunk<'t> {
|
||||
data: &'t [u8],
|
||||
patch_uri: &'t str,
|
||||
offset: u64,
|
||||
}
|
||||
|
||||
impl<'t> Chunk<'t> {
|
||||
pub fn new(data: &'t [u8], patch_uri: &'t str, offset: u64) -> Self {
|
||||
Self {
|
||||
data,
|
||||
patch_uri,
|
||||
offset,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_data(&self) -> &[u8] {
|
||||
self.data
|
||||
}
|
||||
|
||||
pub fn get_length(&self) -> u64 {
|
||||
let len = self.data.len();
|
||||
|
||||
// BOOKMARK this might **panic** on platforms where `usize` has more than 64 bit.
|
||||
// Also, you've allocated more than 2 EiB ... in ONE chunk.
|
||||
// Whoa! Maybe just chill?
|
||||
u64::try_from(len).unwrap_or_else(|e| panic!("usize={} did not fit into u64: {}", len, e))
|
||||
}
|
||||
|
||||
pub fn get_patch_uri(&self) -> &str {
|
||||
self.patch_uri
|
||||
}
|
||||
|
||||
pub fn get_offset(&self) -> u64 {
|
||||
self.offset
|
||||
}
|
||||
}
|
||||
|
|
@ -1,9 +1,11 @@
|
|||
mod checked;
|
||||
mod chunk;
|
||||
mod uploading;
|
||||
|
||||
use std::{ffi::OsStr, path::Path};
|
||||
|
||||
pub use checked::Checked;
|
||||
pub use chunk::Chunk;
|
||||
pub use uploading::Uploading;
|
||||
|
||||
pub trait FileTrait<'t> {
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@ use std::{
|
|||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use super::FileTrait;
|
||||
use super::{Chunk, FileTrait};
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub struct Uploading {
|
||||
|
|
@ -38,31 +38,20 @@ impl Uploading {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn get_patch_uri(&self) -> &str {
|
||||
&self.patch_uri
|
||||
}
|
||||
|
||||
pub fn get_offset(&self) -> u64 {
|
||||
self.offset
|
||||
}
|
||||
|
||||
pub fn read(&mut self, buf: &mut [u8]) -> io::Result<u64> {
|
||||
pub fn read<'t>(&'t mut self, buf: &'t mut [u8]) -> io::Result<Chunk<'t>> {
|
||||
let mut f = fs::File::open(&self.path)?;
|
||||
|
||||
f.seek(SeekFrom::Start(self.offset))?;
|
||||
let read_len = f.read(buf)?;
|
||||
|
||||
// convert into `u64`
|
||||
//
|
||||
// BOOKMARK this might **panic** on platforms where `usize` has more than 64 bit.
|
||||
// Also, you're reading more than 2 EiB ... in ONE chunk.
|
||||
// Whoa! Maybe just chill?
|
||||
let read_len = u64::try_from(read_len)
|
||||
.unwrap_or_else(|e| panic!("usize={} did not fit into u64: {}", read_len, e));
|
||||
let chunk = Chunk::new(&buf[..read_len], &self.patch_uri, self.offset);
|
||||
self.offset += chunk.get_length();
|
||||
|
||||
self.offset += read_len;
|
||||
|
||||
Ok(read_len)
|
||||
Ok(chunk)
|
||||
}
|
||||
|
||||
pub fn check_eof(self) -> Result<Self, PathBuf> {
|
||||
|
|
|
|||
|
|
@ -99,7 +99,7 @@ fn main() {
|
|||
info!("continuing with state: {state:?}");
|
||||
|
||||
loop {
|
||||
match state.upload_chunk(&agent, args.chunk_size * 1024 * 1024) {
|
||||
match state.upload_chunk(&agent) {
|
||||
Err(e) => error!("error: {e:?}"),
|
||||
Ok(None) => {
|
||||
info!("all uploads done");
|
||||
|
|
|
|||
|
|
@ -21,8 +21,7 @@ pub trait Client {
|
|||
file_size: u64,
|
||||
) -> Result<String>;
|
||||
|
||||
fn file_patch(&self, patch_uri: &str, alias_id: &str, offset: u64, chunk: &[u8])
|
||||
-> Result<u64>;
|
||||
fn file_patch(&self, patch_uri: &str, alias_id: &str, offset: u64, chunk: &[u8]) -> Result<()>;
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
|
|
@ -41,15 +40,15 @@ pub enum ClientError {
|
|||
}
|
||||
|
||||
impl ClientError {
|
||||
fn req_err(msg: impl fmt::Display) -> Self {
|
||||
pub fn req_err(msg: impl fmt::Display) -> Self {
|
||||
Self::Request(msg.to_string())
|
||||
}
|
||||
|
||||
fn res_parse_err(msg: impl fmt::Display) -> Self {
|
||||
pub fn res_parse_err(msg: impl fmt::Display) -> Self {
|
||||
Self::ResponseParsing(msg.to_string())
|
||||
}
|
||||
|
||||
fn res_check_status<T>(actual: T, expected: T) -> Result<()>
|
||||
pub fn res_check_status<T>(actual: T, expected: T) -> Result<()>
|
||||
where
|
||||
T: Into<u16> + Eq,
|
||||
{
|
||||
|
|
@ -149,13 +148,7 @@ impl Client for ureq::Agent {
|
|||
Ok(location)
|
||||
}
|
||||
|
||||
fn file_patch(
|
||||
&self,
|
||||
patch_uri: &str,
|
||||
alias_id: &str,
|
||||
offset: u64,
|
||||
chunk: &[u8],
|
||||
) -> Result<u64> {
|
||||
fn file_patch(&self, patch_uri: &str, alias_id: &str, offset: u64, chunk: &[u8]) -> Result<()> {
|
||||
let res = self
|
||||
.patch(patch_uri)
|
||||
.header("Sharry-Alias", alias_id)
|
||||
|
|
@ -177,7 +170,7 @@ impl Client for ureq::Agent {
|
|||
let chunk_len = u64::try_from(chunk.len()).expect("something's VERY wrong");
|
||||
|
||||
if offset + chunk_len == res_offset {
|
||||
Ok(res_offset)
|
||||
Ok(())
|
||||
} else {
|
||||
Err(ClientError::ResponseContent(format!(
|
||||
"Unexpected Upload-Offset: {} (expected {})",
|
||||
|
|
|
|||
Loading…
Reference in a new issue