Skip to content

Commit

Permalink
Expose QueueEmpty and Contacts (#293)
Browse files Browse the repository at this point in the history
* Expose QueueEmpty and Contacts
* Adapt CLI contacts synchronization command
  • Loading branch information
gferon authored Jan 2, 2025
1 parent 3017e6a commit abf00d1
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 130 deletions.
133 changes: 92 additions & 41 deletions presage-cli/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::convert::TryInto;
use std::path::Path;
use std::path::PathBuf;
use std::time::Duration;
use std::time::UNIX_EPOCH;

use anyhow::{anyhow, bail, Context as _};
Expand All @@ -24,10 +25,10 @@ use presage::libsignal_service::proto::sync_message::Sent;
use presage::libsignal_service::protocol::ServiceId;
use presage::libsignal_service::sender::AttachmentSpec;
use presage::libsignal_service::zkgroup::GroupMasterKeyBytes;
use presage::manager::ReceivingMode;
use presage::model::contacts::Contact;
use presage::model::groups::Group;
use presage::model::identity::OnNewIdentity;
use presage::model::messages::Received;
use presage::proto::receipt_message;
use presage::proto::EditMessage;
use presage::proto::ReceiptMessage;
Expand All @@ -42,7 +43,7 @@ use presage::{
use presage_store_sled::MigrationConflictStrategy;
use presage_store_sled::SledStore;
use tempfile::Builder;
use tokio::task;
use tempfile::TempDir;
use tokio::{
fs,
io::{self, AsyncBufReadExt, BufReader},
Expand Down Expand Up @@ -191,7 +192,7 @@ enum Cmd {
#[clap(long = "attach", help = "Path to a file to attach, can be repeated")]
attachment_filepath: Vec<PathBuf>,
},
RequestContactsSync,
SyncContacts,
#[clap(about = "Print various statistics useful for debugging")]
Stats,
}
Expand All @@ -208,6 +209,15 @@ fn parse_group_master_key(value: &str) -> anyhow::Result<GroupMasterKeyBytes> {
.map_err(|_| anyhow::format_err!("master key should be 32 bytes long"))
}

fn attachments_tmp_dir() -> anyhow::Result<TempDir> {
let attachments_tmp_dir = Builder::new().prefix("presage-attachments").tempdir()?;
info!(
path =% attachments_tmp_dir.path().display(),
"attachments will be stored"
);
Ok(attachments_tmp_dir)
}

#[tokio::main(flavor = "multi_thread")]
async fn main() -> anyhow::Result<()> {
env_logger::Builder::from_env(
Expand Down Expand Up @@ -239,7 +249,7 @@ async fn send<S: Store>(
recipient: Recipient,
msg: impl Into<ContentBody>,
) -> anyhow::Result<()> {
let local = task::LocalSet::new();
let attachments_tmp_dir = attachments_tmp_dir()?;

let timestamp = std::time::SystemTime::now()
.duration_since(UNIX_EPOCH)
Expand All @@ -251,33 +261,52 @@ async fn send<S: Store>(
d.timestamp = Some(timestamp);
}

local
.run_until(async move {
let mut receiving_manager = manager.clone();
task::spawn_local(async move {
if let Err(error) = receive(&mut receiving_manager, false).await {
error!(%error, "error while receiving stuff");
}
});

match recipient {
Recipient::Contact(uuid) => {
info!(recipient =% uuid, "sending message to contact");
manager
.send_message(ServiceId::Aci(uuid.into()), content_body, timestamp)
.await
.expect("failed to send message");
}
Recipient::Group(master_key) => {
info!("sending message to group");
manager
.send_message_to_group(&master_key, content_body, timestamp)
.await
.expect("failed to send message");
}
let messages = manager
.receive_messages()
.await
.context("failed to initialize messages stream")?;
pin_mut!(messages);

println!("synchronizing messages since last time");

while let Some(content) = messages.next().await {
match content {
Received::QueueEmpty => break,
Received::Contacts => continue,
Received::Content(content) => {
process_incoming_message(manager, attachments_tmp_dir.path(), false, &content).await
}
})
.await;
}
}

println!("done synchronizing, sending your message now!");

match recipient {
Recipient::Contact(uuid) => {
info!(recipient =% uuid, "sending message to contact");
manager
.send_message(ServiceId::Aci(uuid.into()), content_body, timestamp)
.await
.expect("failed to send message");
}
Recipient::Group(master_key) => {
info!("sending message to group");
manager
.send_message_to_group(&master_key, content_body, timestamp)
.await
.expect("failed to send message");
}
}

tokio::time::timeout(Duration::from_secs(60), async move {
while let Some(msg) = messages.next().await {
if let Received::Contacts = msg {
println!("got contacts sync!");
break;
}
}
})
.await?;

Ok(())
}
Expand Down Expand Up @@ -510,21 +539,27 @@ async fn receive<S: Store>(
manager: &mut Manager<S, Registered>,
notifications: bool,
) -> anyhow::Result<()> {
let attachments_tmp_dir = Builder::new().prefix("presage-attachments").tempdir()?;
info!(
path =% attachments_tmp_dir.path().display(),
"attachments will be stored"
);

let attachments_tmp_dir = attachments_tmp_dir()?;
let messages = manager
.receive_messages(ReceivingMode::Forever)
.receive_messages()
.await
.context("failed to initialize messages stream")?;
pin_mut!(messages);

while let Some(content) = messages.next().await {
process_incoming_message(manager, attachments_tmp_dir.path(), notifications, &content)
.await;
match content {
Received::QueueEmpty => println!("done with synchronization"),
Received::Contacts => println!("got contacts synchronization"),
Received::Content(content) => {
process_incoming_message(
manager,
attachments_tmp_dir.path(),
notifications,
&content,
)
.await
}
}
}

Ok(())
Expand Down Expand Up @@ -789,9 +824,25 @@ async fn run<S: Store>(subcommand: Cmd, config_store: S) -> anyhow::Result<()> {
println!("{contact:#?}");
}
}
Cmd::RequestContactsSync => {
Cmd::SyncContacts => {
let mut manager = Manager::load_registered(config_store).await?;
manager.sync_contacts().await?;
manager.request_contacts().await?;

let messages = manager
.receive_messages()
.await
.context("failed to initialize messages stream")?;
pin_mut!(messages);

println!("synchronizing messages until we get contacts (dots are messages synced from the past timeline)");

while let Some(content) = messages.next().await {
match content {
Received::QueueEmpty => break,
Received::Contacts => println!("got contacts! thank you, come again."),
Received::Content(_) => print!("."),
}
}
}
Cmd::ListMessages {
group_master_key,
Expand Down
2 changes: 1 addition & 1 deletion presage/src/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::fmt;

pub use self::confirmation::Confirmation;
pub use self::linking::Linking;
pub use self::registered::{ReceivingMode, Registered, RegistrationData, RegistrationType};
pub use self::registered::{Registered, RegistrationData, RegistrationType};
pub use self::registration::{Registration, RegistrationOptions};

/// Signal manager
Expand Down
Loading

0 comments on commit abf00d1

Please sign in to comment.