[wip] retry chunks

- move buffer into main function again, avoiding `unsafe` code
- partially revert `Chunk` changes, removing `unsafe` parts
This commit is contained in:
Jörn-Michael Miehe 2025-06-13 22:27:15 +00:00
parent f05e112040
commit 0a8e5cf3f0
4 changed files with 24 additions and 38 deletions

View file

@ -8,19 +8,15 @@ use console::style;
use indicatif::{ProgressBar, ProgressStyle}; use indicatif::{ProgressBar, ProgressStyle};
use log::{debug, warn}; use log::{debug, warn};
use crate::file::Chunk;
use super::{ use super::{
cachefile::CacheFile, cachefile::CacheFile,
cli::Cli, cli::Cli,
file::{self, FileTrait}, file::{self, FileTrait, Chunk},
sharry::{self, Client}, sharry::{self, Client},
}; };
pub struct AppState { pub struct AppState {
progress: RefCell<Option<ProgressBar>>, progress: RefCell<Option<ProgressBar>>,
buffer: Vec<u8>,
http: ureq::Agent, http: ureq::Agent,
inner: CacheFile, inner: CacheFile,
} }
@ -41,10 +37,9 @@ fn new_http(timeout: Option<Duration>) -> ureq::Agent {
} }
impl AppState { impl AppState {
fn new(chunk_size: usize, http: ureq::Agent, inner: CacheFile) -> Self { fn new(http: ureq::Agent, inner: CacheFile) -> Self {
Self { Self {
progress: None.into(), progress: RefCell::new(None),
buffer: vec![0; chunk_size * 1024 * 1024],
http, http,
inner, inner,
} }
@ -55,11 +50,7 @@ impl AppState {
.inspect_err(|e| debug!("could not resume from hash {:?}: {e}", args.get_hash())) .inspect_err(|e| debug!("could not resume from hash {:?}: {e}", args.get_hash()))
.ok()?; .ok()?;
Some(Self::new( Some(Self::new(new_http(args.get_timeout()), inner))
args.chunk_size,
new_http(args.get_timeout()),
inner,
))
} }
pub fn from_args(args: &Cli) -> sharry::Result<Self> { pub fn from_args(args: &Cli) -> sharry::Result<Self> {
@ -71,11 +62,7 @@ impl AppState {
args.get_share_request(), args.get_share_request(),
)?; )?;
Ok(Self::new( Ok(Self::new(http, CacheFile::from_args(args, share_id)))
args.chunk_size,
http,
CacheFile::from_args(args, share_id),
))
} }
fn get_or_create_progressbar(&self, uploading: &file::Uploading) -> Ref<'_, ProgressBar> { fn get_or_create_progressbar(&self, uploading: &file::Uploading) -> Ref<'_, ProgressBar> {
@ -114,7 +101,7 @@ impl AppState {
} }
} }
fn next_chunk(&mut self) -> io::Result<Option<Chunk>> { fn next_chunk<'t>(&mut self, buffer: &'t mut [u8]) -> io::Result<Option<Chunk<'t>>> {
let Some(mut uploading) = self.inner.pop_file(&self.http) else { let Some(mut uploading) = self.inner.pop_file(&self.http) else {
self.inner self.inner
.share_notify(&self.http) .share_notify(&self.http)
@ -122,15 +109,14 @@ impl AppState {
return Ok(None); return Ok(None);
}; };
debug!("{uploading:?}"); debug!("{uploading:?}");
self.get_or_create_progressbar(&uploading); self.get_or_create_progressbar(&uploading);
let chunk = uploading.read(&mut self.buffer); let chunk_res = uploading.read(buffer);
self.inner.push_file(uploading); self.inner.push_file(uploading);
let chunk = chunk?;
let chunk = chunk_res?;
debug!("{chunk:?}"); debug!("{chunk:?}");
Ok(Some(chunk)) Ok(Some(chunk))
@ -146,7 +132,7 @@ impl AppState {
let bar = self.get_or_create_progressbar(&uploading); let bar = self.get_or_create_progressbar(&uploading);
bar.set_position(uploading.get_offset()); bar.set_position(uploading.get_offset());
// BUG in `indicatif` crate? // BUG in `indicatif` crate?
// `set_position` does not force immediate redraw, so we also call `inc_length` here // `set_position` does not force an immediate redraw, so we also call `inc_length` here
bar.inc_length(0); bar.inc_length(0);
drop(bar); drop(bar);
@ -161,8 +147,8 @@ impl AppState {
self.inner.is_empty() self.inner.is_empty()
} }
pub fn upload_chunk(&mut self) -> sharry::Result<bool> { pub fn upload_chunk(&mut self, buffer: &mut [u8]) -> sharry::Result<bool> {
let Some(chunk) = self.next_chunk()? else { let Some(chunk) = self.next_chunk(buffer)? else {
return Ok(true); return Ok(true);
}; };
@ -170,7 +156,7 @@ impl AppState {
chunk.get_patch_uri(), chunk.get_patch_uri(),
self.inner.alias_id(), self.inner.alias_id(),
chunk.get_offset(), chunk.get_offset(),
unsafe { chunk.get_data() }, chunk.get_data(),
)?; )?;
Ok(self.is_done()) Ok(self.is_done())

View file

@ -1,25 +1,23 @@
use std::fmt; use std::fmt;
pub struct Chunk { pub struct Chunk<'t> {
data: *const [u8], data: &'t [u8],
patch_uri: String, patch_uri: String,
offset: u64, offset: u64,
} }
impl fmt::Debug for Chunk { impl fmt::Debug for Chunk<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Chunk") f.debug_struct("Chunk")
.field("patch_uri", &self.patch_uri) .field("patch_uri", &self.patch_uri)
.field("offset", &self.offset) .field("offset", &self.offset)
.field("data_len", &self.data.len()) .field("data.len()", &self.data.len())
.finish_non_exhaustive() .finish_non_exhaustive()
} }
} }
impl Chunk { impl<'t> Chunk<'t> {
pub fn new(data: &[u8], patch_uri: String, offset: u64) -> Self { pub fn new(data: &'t [u8], patch_uri: String, offset: u64) -> Self {
let data: *const [u8] = data as *const [u8];
Self { Self {
data, data,
patch_uri, patch_uri,
@ -27,8 +25,8 @@ impl Chunk {
} }
} }
pub unsafe fn get_data(&self) -> &[u8] { pub fn get_data(&self) -> &[u8] {
unsafe { &*self.data } self.data
} }
pub fn get_length(&self) -> u64 { pub fn get_length(&self) -> u64 {

View file

@ -52,7 +52,7 @@ impl Uploading {
}) })
} }
pub fn read(&mut self, buf: &mut [u8]) -> io::Result<Chunk> { pub fn read<'t>(&mut self, buf: &'t mut [u8]) -> io::Result<Chunk<'t>> {
let mut f = fs::File::open(&self.path)?; let mut f = fs::File::open(&self.path)?;
f.seek(SeekFrom::Start(self.offset))?; f.seek(SeekFrom::Start(self.offset))?;

View file

@ -113,8 +113,10 @@ fn main() {
style(state.file_names().join(", ")).magenta(), style(state.file_names().join(", ")).magenta(),
); );
let mut buffer = vec![0; args.chunk_size * 1024 * 1024];
loop { loop {
match state.upload_chunk() { match state.upload_chunk(&mut buffer) {
Err(e) => error!("error: {e:?}"), // HACK handle errors better Err(e) => error!("error: {e:?}"), // HACK handle errors better
Ok(true) => { Ok(true) => {
info!("all uploads done"); info!("all uploads done");