Compare commits

...

2 commits

Author SHA1 Message Date
0993679641 [wip] impl Client for ureq::Agent
- clippy fix
2025-06-11 00:06:08 +00:00
dc2a330d58 [wip] impl Client for ureq::Agent
- Chunk implementation
2025-06-10 23:39:20 +00:00
7 changed files with 89 additions and 53 deletions

View file

@ -14,7 +14,7 @@ use serde::{Deserialize, Serialize};
use super::{ use super::{
cli::Cli, cli::Cli,
file::{self, FileTrait}, file::{self, FileTrait},
sharry::{self, Client, Uri}, sharry::{self, Client, ClientError, Uri},
}; };
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
@ -23,6 +23,8 @@ pub struct AppState {
file_name: PathBuf, file_name: PathBuf,
#[serde(skip)] #[serde(skip)]
progress: Option<ProgressBar>, progress: Option<ProgressBar>,
#[serde(skip)]
buffer: Vec<u8>,
uri: Uri, uri: Uri,
alias_id: String, alias_id: String,
@ -53,7 +55,7 @@ impl FileState {
) -> sharry::Result<file::Uploading> { ) -> sharry::Result<file::Uploading> {
match self { match self {
FileState::C(checked) => { FileState::C(checked) => {
let endpoint = &uri.endpoint(format!("alias/upload/{}/files/tus", share_id)); let endpoint = &uri.endpoint(format!("alias/upload/{share_id}/files/tus"));
checked.start_upload(http, endpoint, alias_id) checked.start_upload(http, endpoint, alias_id)
} }
FileState::U(uploading) => Ok(uploading), FileState::U(uploading) => Ok(uploading),
@ -94,6 +96,7 @@ impl AppState {
Self { Self {
file_name, file_name,
progress: None, progress: None,
buffer: Vec::with_capacity(args.chunk_size),
uri: state.uri, uri: state.uri,
alias_id: state.alias_id, alias_id: state.alias_id,
share_id: state.share_id, share_id: state.share_id,
@ -119,6 +122,7 @@ impl AppState {
Ok(Self { Ok(Self {
file_name, file_name,
progress: None, progress: None,
buffer: Vec::with_capacity(args.chunk_size),
uri, uri,
alias_id, alias_id,
share_id, share_id,
@ -130,18 +134,16 @@ impl AppState {
self.files.iter().map(FileState::file_name).collect() self.files.iter().map(FileState::file_name).collect()
} }
pub fn upload_chunk( pub fn upload_chunk(&mut self, http: &impl Client) -> sharry::Result<Option<()>> {
&mut self, let mut uploading = if let Some(state) = self.files.pop_front() {
http: &ureq::Agent, state
chunk_size: usize, .start_upload(http, &self.uri, &self.alias_id, &self.share_id)
) -> sharry::Result<Option<()>> { .unwrap() // HACK unwrap
let uploading = if let Some(state) = self.files.pop_front() {
state.start_upload(http, &self.uri, &self.alias_id, &self.share_id)?
} else { } else {
return Ok(None); return Ok(None);
}; };
debug!("{uploading} chunk {chunk_size}"); debug!("{uploading} chunk {}", self.buffer.len());
// Initialize or fetch the existing ProgressBar in one call: // Initialize or fetch the existing ProgressBar in one call:
let bar = &*self.progress.get_or_insert_with(|| { let bar = &*self.progress.get_or_insert_with(|| {
@ -165,21 +167,35 @@ impl AppState {
bar bar
}); });
match uploading.upload_chunk(http, &self.alias_id, chunk_size) { let chunk = uploading
ChunkState::Ok(upl) => { .read(&mut self.buffer)
bar.set_position(upl.get_offset()); .map_err(ClientError::req_err)?;
self.files.push_front(FileState::U(upl)); if chunk.get_length() == 0 {
return Err(ClientError::req_err("wtf"));
}
http.file_patch(
chunk.get_patch_uri(),
&self.alias_id,
chunk.get_offset(),
chunk.get_data(),
)?;
match uploading.check_eof() {
Ok(uploading) => {
bar.set_position(uploading.get_offset());
self.files.push_front(FileState::U(uploading));
Ok(Some(())) Ok(Some(()))
} }
ChunkState::Err(upl, e) => { Err(path) => {
self.files.push_front(FileState::U(upl));
Err(e)
}
ChunkState::Finished(path) => {
debug!("Finished {:?}!", path.display()); debug!("Finished {:?}!", path.display());
bar.finish(); bar.finish();
self.progress = None; self.progress = None;
self.share_id.notify(http, &self.alias_id).unwrap(); // HACK unwrap
let endpoint = self
.uri
.endpoint(format!("alias/mail/notify/{}", self.share_id));
http.share_notify(&endpoint, &self.alias_id).unwrap(); // HACK unwrap
Ok(self.files.front().map(drop)) Ok(self.files.front().map(drop))
} }

36
src/file/chunk.rs Normal file
View file

@ -0,0 +1,36 @@
pub struct Chunk<'t> {
data: &'t [u8],
patch_uri: &'t str,
offset: u64,
}
impl<'t> Chunk<'t> {
pub fn new(data: &'t [u8], patch_uri: &'t str, offset: u64) -> Self {
Self {
data,
patch_uri,
offset,
}
}
pub fn get_data(&self) -> &[u8] {
self.data
}
pub fn get_length(&self) -> u64 {
let len = self.data.len();
// BOOKMARK this might **panic** on platforms where `usize` has more than 64 bit.
// Also, you've allocated more than 2 EiB ... in ONE chunk.
// Whoa! Maybe just chill?
u64::try_from(len).unwrap_or_else(|e| panic!("usize={len} did not fit into u64: {e}"))
}
pub fn get_patch_uri(&self) -> &str {
self.patch_uri
}
pub fn get_offset(&self) -> u64 {
self.offset
}
}

View file

@ -1,9 +1,11 @@
mod checked; mod checked;
mod chunk;
mod uploading; mod uploading;
use std::{ffi::OsStr, path::Path}; use std::{ffi::OsStr, path::Path};
pub use checked::Checked; pub use checked::Checked;
pub use chunk::Chunk;
pub use uploading::Uploading; pub use uploading::Uploading;
pub trait FileTrait<'t> { pub trait FileTrait<'t> {

View file

@ -6,7 +6,7 @@ use std::{
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use super::FileTrait; use super::{Chunk, FileTrait};
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
pub struct Uploading { pub struct Uploading {
@ -38,31 +38,20 @@ impl Uploading {
} }
} }
pub fn get_patch_uri(&self) -> &str {
&self.patch_uri
}
pub fn get_offset(&self) -> u64 { pub fn get_offset(&self) -> u64 {
self.offset self.offset
} }
pub fn read(&mut self, buf: &mut [u8]) -> io::Result<u64> { pub fn read<'t>(&'t mut self, buf: &'t mut [u8]) -> io::Result<Chunk<'t>> {
let mut f = fs::File::open(&self.path)?; let mut f = fs::File::open(&self.path)?;
f.seek(SeekFrom::Start(self.offset))?; f.seek(SeekFrom::Start(self.offset))?;
let read_len = f.read(buf)?; let read_len = f.read(buf)?;
// convert into `u64` let chunk = Chunk::new(&buf[..read_len], &self.patch_uri, self.offset);
// self.offset += chunk.get_length();
// BOOKMARK this might **panic** on platforms where `usize` has more than 64 bit.
// Also, you're reading more than 2 EiB ... in ONE chunk.
// Whoa! Maybe just chill?
let read_len = u64::try_from(read_len)
.unwrap_or_else(|e| panic!("usize={} did not fit into u64: {}", read_len, e));
self.offset += read_len; Ok(chunk)
Ok(read_len)
} }
pub fn check_eof(self) -> Result<Self, PathBuf> { pub fn check_eof(self) -> Result<Self, PathBuf> {

View file

@ -99,7 +99,7 @@ fn main() {
info!("continuing with state: {state:?}"); info!("continuing with state: {state:?}");
loop { loop {
match state.upload_chunk(&agent, args.chunk_size * 1024 * 1024) { match state.upload_chunk(&agent) {
Err(e) => error!("error: {e:?}"), Err(e) => error!("error: {e:?}"),
Ok(None) => { Ok(None) => {
info!("all uploads done"); info!("all uploads done");

View file

@ -18,7 +18,7 @@ impl Uri {
} }
pub fn endpoint(&self, endpoint: impl fmt::Display) -> String { pub fn endpoint(&self, endpoint: impl fmt::Display) -> String {
let uri = format!("{}/{}", self, endpoint); let uri = format!("{self}/{endpoint}");
debug!("endpoint: {uri:?}"); debug!("endpoint: {uri:?}");
uri uri

View file

@ -21,8 +21,7 @@ pub trait Client {
file_size: u64, file_size: u64,
) -> Result<String>; ) -> Result<String>;
fn file_patch(&self, patch_uri: &str, alias_id: &str, offset: u64, chunk: &[u8]) fn file_patch(&self, patch_uri: &str, alias_id: &str, offset: u64, chunk: &[u8]) -> Result<()>;
-> Result<u64>;
} }
#[derive(Debug, Error)] #[derive(Debug, Error)]
@ -41,15 +40,15 @@ pub enum ClientError {
} }
impl ClientError { impl ClientError {
fn req_err(msg: impl fmt::Display) -> Self { pub fn req_err(msg: impl fmt::Display) -> Self {
Self::Request(msg.to_string()) Self::Request(msg.to_string())
} }
fn res_parse_err(msg: impl fmt::Display) -> Self { pub fn res_parse_err(msg: impl fmt::Display) -> Self {
Self::ResponseParsing(msg.to_string()) Self::ResponseParsing(msg.to_string())
} }
fn res_check_status<T>(actual: T, expected: T) -> Result<()> pub fn res_check_status<T>(actual: T, expected: T) -> Result<()>
where where
T: Into<u16> + Eq, T: Into<u16> + Eq,
{ {
@ -103,7 +102,7 @@ impl Client for ureq::Agent {
.post(endpoint) .post(endpoint)
.header("Sharry-Alias", alias_id) .header("Sharry-Alias", alias_id)
.send_empty() .send_empty()
.map_err(|e| ClientError::req_err(e))?; .map_err(ClientError::req_err)?;
trace!("{endpoint:?} response: {res:?}"); trace!("{endpoint:?} response: {res:?}");
ClientError::res_check_status(res.status(), ureq::http::StatusCode::OK)?; ClientError::res_check_status(res.status(), ureq::http::StatusCode::OK)?;
@ -111,7 +110,7 @@ impl Client for ureq::Agent {
let res = res let res = res
.body_mut() .body_mut()
.read_json::<NotifyShareResponse>() .read_json::<NotifyShareResponse>()
.map_err(|e| ClientError::res_parse_err(e))?; .map_err(ClientError::res_parse_err)?;
debug!("{res:?}"); debug!("{res:?}");
@ -149,13 +148,7 @@ impl Client for ureq::Agent {
Ok(location) Ok(location)
} }
fn file_patch( fn file_patch(&self, patch_uri: &str, alias_id: &str, offset: u64, chunk: &[u8]) -> Result<()> {
&self,
patch_uri: &str,
alias_id: &str,
offset: u64,
chunk: &[u8],
) -> Result<u64> {
let res = self let res = self
.patch(patch_uri) .patch(patch_uri)
.header("Sharry-Alias", alias_id) .header("Sharry-Alias", alias_id)
@ -177,7 +170,7 @@ impl Client for ureq::Agent {
let chunk_len = u64::try_from(chunk.len()).expect("something's VERY wrong"); let chunk_len = u64::try_from(chunk.len()).expect("something's VERY wrong");
if offset + chunk_len == res_offset { if offset + chunk_len == res_offset {
Ok(res_offset) Ok(())
} else { } else {
Err(ClientError::ResponseContent(format!( Err(ClientError::ResponseContent(format!(
"Unexpected Upload-Offset: {} (expected {})", "Unexpected Upload-Offset: {} (expected {})",