Skip to content

Commit

Permalink
Merge pull request #5 from p4ymak/file_abort
Browse files Browse the repository at this point in the history
File abort
  • Loading branch information
p4ymak authored Sep 17, 2024
2 parents 5cfe032 + e74402c commit 7cfb692
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 58 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "roomor"
version = "0.3.0"
version = "0.3.2"
edition = "2021"
authors = ["Roman Chumak <p4ymak@yandex.ru>"]
description = "Minimalistic offline chat over local network."
Expand Down Expand Up @@ -51,7 +51,7 @@ strip = "debuginfo"
name = "Roomor"
identifier = "com.p4ymak.roomor"
icon = ["icon/128x128.png"]
version = "0.3.0"
version = "0.3.2"
short_description = "Minimalistic offline chat over local network."
long_description = "Minimalistic offline chat over local network."
deb_depends = ["libgl1-mesa-glx", "libsdl2-2.0-0 (>= 2.0.5)"]
Expand Down
7 changes: 6 additions & 1 deletion src/app/rooms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,9 @@ impl TextMessage {
ui.label(&link.name);
ui.label(human_bytes(link.size as f64));
let width = ui.min_rect().width();
if link.is_ready() {
if link.is_aborted() && !link.is_ready() {
ui.label("(X_X)");
} else if link.is_ready() {
#[cfg(debug_assertions)]
{
let bandwidth = link.bandwidth();
Expand All @@ -673,6 +675,9 @@ impl TextMessage {
.desired_width(width)
.show_percentage(),
);
if ui.link("Cancel").clicked() {
link.abort();
}
}
}
_ => (),
Expand Down
14 changes: 13 additions & 1 deletion src/chat/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub struct FileLink {
pub count: ShardCount,
pub completed: AtomicU64,
pub is_ready: AtomicBool,
pub is_aborted: AtomicBool,
}

impl FileLink {
Expand All @@ -45,6 +46,7 @@ impl FileLink {
count,
completed: AtomicU64::new(0),
is_ready: AtomicBool::new(false),
is_aborted: AtomicBool::new(false),
}
}

Expand All @@ -61,6 +63,7 @@ impl FileLink {
count: size.div_ceil(DATA_LIMIT_BYTES as ShardCount),
completed: AtomicU64::new(0),
is_ready: AtomicBool::new(false),
is_aborted: AtomicBool::new(false),
})
}
pub fn id(&self) -> Id {
Expand All @@ -85,10 +88,16 @@ impl FileLink {
count: size.div_ceil(DATA_LIMIT_BYTES as ShardCount),
completed: AtomicU64::new(0),
is_ready: AtomicBool::new(false),
is_aborted: AtomicBool::new(false),
})
}
pub fn progress(&self) -> f32 {
self.completed.load(std::sync::atomic::Ordering::Relaxed) as f32 / self.count as f32
(self.completed.load(std::sync::atomic::Ordering::Relaxed) as f32 / self.count as f32)
.min(0.99)
}
pub fn abort(&self) {
self.is_aborted
.store(true, std::sync::atomic::Ordering::Relaxed);
}
pub fn set_ready(&self) {
self.is_ready
Expand All @@ -105,6 +114,9 @@ impl FileLink {
self.bandwidth
.store(bandwidth, std::sync::atomic::Ordering::Relaxed);
}
pub fn is_aborted(&self) -> bool {
self.is_aborted.load(std::sync::atomic::Ordering::Relaxed)
}
pub fn is_ready(&self) -> bool {
self.is_ready.load(std::sync::atomic::Ordering::Relaxed)
}
Expand Down
38 changes: 13 additions & 25 deletions src/chat/inbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::{
};

pub type Shard = Vec<u8>;
// pub const MAX_ATTEMPTS: u8 = 10;

#[derive(Default)]
pub struct Inbox(BTreeMap<Id, InMessage>);
Expand All @@ -27,7 +28,7 @@ impl Inbox {
!(SystemTime::now()
.duration_since(msg.ts)
.is_ok_and(|d| d > delta)
&& msg.combine(networker, ctx).is_ok())
&& (msg.combine(networker, ctx).is_ok())) // || msg.attempt < MAX_ATTEMPTS))
});
}
pub fn insert(&mut self, id: Id, msg: InMessage) {
Expand All @@ -51,6 +52,7 @@ pub struct InMessage {
pub link: Arc<FileLink>,
pub terminal: ShardCount,
pub shards: Vec<Option<Shard>>,
pub attempt: u8,
}
impl InMessage {
pub fn new(ip: Ipv4Addr, msg: UdpMessage, downloads_path: &Path) -> Option<Self> {
Expand All @@ -73,6 +75,7 @@ impl InMessage {
link: Arc::new(link),
terminal: init.count().saturating_sub(1),
shards: vec![None; init.count() as usize],
attempt: 0,
})
} else {
None
Expand All @@ -99,27 +102,8 @@ impl InMessage {
warn!("Received terminal {position}");
return self.combine(networker, ctx).is_ok();
}
// if self.shards.get(position.saturating_sub(1) as usize) == Some(None).as_ref() {
// if let Some(range) = self.find_missing_back(position) {
// networker
// .send(
// UdpMessage::ask_to_repeat(self.id, Part::AskRange(range)),
// Recepients::One(self.sender),
// )
// .ok();
// }
// }
false
}
// pub fn find_missing_back(&self, pos: ShardCount) -> Option<RangeInclusive<ShardCount>> {
// self.shards
// .iter()
// .enumerate()
// .rev()
// .skip(self.shards.len().saturating_sub(pos as usize))
// .find(|x| x.1.is_some())
// .map(|x| (x.0 + 1) as ShardCount..=pos)
// }

pub fn combine(
&mut self,
Expand Down Expand Up @@ -166,20 +150,24 @@ impl InMessage {
debug!("Writing new file to {path:?}");
fs::write(path, data)?;
self.link.set_ready();
// self.link
// .is_ready
// .store(true, std::sync::atomic::Ordering::Relaxed);
ctx.request_repaint();
Ok(())
}
_ => Ok(()),
}
} else {
error!("Shards missing!");
self.terminal = missed
let terminal = missed
.last()
.map(|l| *l.end())
.unwrap_or(self.link.count.saturating_sub(1));
if terminal == self.terminal {
self.attempt = self.attempt.saturating_add(1);
warn!("New attempt: {}", self.attempt);
} else {
self.terminal = terminal;
warn!("New terminal: {}", self.terminal);
}
// TODO save outbox
if !matches!(
networker.peers.online_status(Recepients::One(self.sender)),
Expand All @@ -195,7 +183,7 @@ impl InMessage {
.ok();
});
}
warn!("New terminal: {}", self.terminal);

Err("Missing Shards".into())
}
}
Expand Down
12 changes: 11 additions & 1 deletion src/chat/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub enum Command {
Exit,
Seen,
Error,
Abort,
}
impl Command {
pub fn to_code(self) -> u8 {
Expand Down Expand Up @@ -146,7 +147,16 @@ impl UdpMessage {
data: vec![],
}
}

pub fn abort(id: Id) -> Self {
UdpMessage {
id,
public: false,
part: Part::Single,
checksum: 0,
command: Command::Abort,
data: vec![],
}
}
pub fn ask_to_repeat(id: Id, part: Part) -> Self {
UdpMessage {
id,
Expand Down
98 changes: 71 additions & 27 deletions src/chat/networker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ impl NetWorker {
self.send(UdpMessage::exit(), Recepients::All)
.inspect_err(|e| error!("{e}"))
.ok();

return ControlFlow::Break(());
}
}
Expand Down Expand Up @@ -193,6 +194,7 @@ impl NetWorker {
Command::Exit => {
self.handle_back_event(BackEvent::PeerLeft(r_ip), ctx);
}

Command::Text | Command::File | Command::Repeat => match r_msg.part {
message::Part::Single => {
let txt_msg = TextMessage::from_udp(r_ip, &r_msg, true);
Expand All @@ -219,28 +221,53 @@ impl NetWorker {
}
}
message::Part::Shard(count) => {
let mut completed = false;
if let Some(inmsg) = inbox.get_mut(&r_msg.id) {
completed = inmsg.insert(count, r_msg, self, ctx);
let mut is_completed = false;
let mut is_aborted = false;
if let Some(inmsg) = inbox.get_mut(&r_id) {
is_aborted = inmsg.link.is_aborted();
if is_aborted {
self.send(UdpMessage::abort(r_id), Recepients::One(r_ip))
.inspect_err(|e| error!("{e}"))
.ok();
} else {
is_completed = inmsg.insert(count, r_msg, self, ctx);
}
} else {
// FIXME
self.send(UdpMessage::abort(r_id), Recepients::One(r_ip))
.inspect_err(|e| error!("{e}"))
.ok();
}

if completed {
if is_aborted {
inbox.remove(&r_id);
}
if is_completed {
inbox.remove(&r_id);
self.send(UdpMessage::seen_id(r_id, false), Recepients::One(r_ip))
.inspect_err(|e| error!("{e}"))
.ok();
}
}

_ => (),
},

Command::Seen => {
let txt_msg = TextMessage::from_udp(r_ip, &r_msg, true);
self.incoming(r_ip);
outbox.remove(r_ip, txt_msg.id());
self.handle_back_event(BackEvent::Message(txt_msg), ctx);
}
Command::Abort => {
debug!("ABORTING! {}", r_id);
if let Some(msg) = inbox.get_mut(&r_id) {
msg.link.abort();
}
inbox.remove(&r_id);
if let Some(link) = outbox.files.get(&r_id) {
link.abort();
}
outbox.files.remove(&r_id);
}
Command::Error => {
self.incoming(r_ip);
self.send(
Expand All @@ -257,32 +284,46 @@ impl NetWorker {

Command::AskToRepeat => {
self.incoming(r_ip);
let id = r_msg.id;
debug!("Asked to repeat {id}, part: {:?}", r_msg.part);
debug!("Asked to repeat {r_id}, part: {:?}", r_msg.part);
// Resend my Name
if id == 0 {
if r_id == 0 {
self.send(UdpMessage::greating(&self.name), Recepients::One(r_ip))
.inspect_err(|e| error!("{e}"))
.ok();
} else if let message::Part::AskRange(range) = &r_msg.part {
let msg_text = r_msg.read_text();
debug!("{msg_text}");
if let Some(link) = outbox.files.get(&id) {
let completed = link.completed.load(std::sync::atomic::Ordering::Relaxed);
link.completed.store(
completed.saturating_sub(range.clone().count() as ShardCount),
std::sync::atomic::Ordering::Relaxed,
);
debug!("sending shards {range:?}");
self.send_shards(
link.clone(),
range.to_owned(),
r_msg.id,
Recepients::One(r_ip),
ctx,
);
let mut is_aborted = false;
if let Some(link) = outbox.files.get(&r_id) {
is_aborted = link.is_aborted();
if !is_aborted {
let completed =
link.completed.load(std::sync::atomic::Ordering::Relaxed);
link.completed.store(
completed.saturating_sub(range.clone().count() as ShardCount),
std::sync::atomic::Ordering::Relaxed,
);
debug!("sending shards {range:?}");
self.send_shards(
link.clone(),
range.to_owned(),
r_id,
Recepients::One(r_ip),
ctx,
);
}
} else {
self.send(UdpMessage::abort(r_id), Recepients::One(r_ip))
.inspect_err(|e| error!("{e}"))
.ok();
error!("File not found. Aborting transmission!");
}
} else if let Some(message) = outbox.get(r_ip, id) {
if is_aborted {
self.send(UdpMessage::abort(r_id), Recepients::One(r_ip))
.inspect_err(|e| error!("{e}"))
.ok();
debug!("Transmition aborted by user.");
outbox.files.remove(&r_id);
}
} else if let Some(message) = outbox.get(r_ip, r_id) {
debug!("Message found..");

let mut message = message.clone();
Expand All @@ -291,7 +332,10 @@ impl NetWorker {
.inspect_err(|e| error!("{e}"))
.ok();
} else {
error!("Message not found!");
self.send(UdpMessage::abort(r_id), Recepients::One(r_ip))
.inspect_err(|e| error!("{e}"))
.ok();
error!("Message not found. Aborting transmission!");
}
} // Command::File => todo!(),
}
Expand Down
2 changes: 2 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ fn main() -> Result<(), eframe::Error> {
let options = eframe::NativeOptions {
viewport: egui::ViewportBuilder::default()
.with_icon(icon)
.with_taskbar(true)
.with_app_id("roomor")
.with_decorations(true)
.with_transparent(false)
.with_resizable(true)
Expand Down

0 comments on commit 7cfb692

Please sign in to comment.