Skip to content

Commit

Permalink
Merge pull request #65 from CoLearn-Dev/po_heartbeat
Browse files Browse the repository at this point in the history
Protocol operator: add heartbeat, add consumption mark
  • Loading branch information
stneng authored Jun 27, 2023
2 parents cce2074 + 540e745 commit efce46e
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 52 deletions.
13 changes: 9 additions & 4 deletions .github/workflows/check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,15 @@ jobs:
with:
submodules: recursive
- name: Install Rust
uses: actions-rs/toolchain@v1
with:
toolchain: stable
components: rustfmt, clippy
run: |
rustup toolchain install stable --no-self-update --component rustfmt --component clippy
rustup default stable
- name: Install protobuf
if: ${{ startsWith(matrix.ci_image, 'ubuntu') }}
run: sudo apt update && sudo apt install protobuf-compiler -y
- name: Install protobuf(macos)
if: ${{ startsWith(matrix.ci_image, 'macos') }}
run: brew install protobuf
- name: Check
if: ${{ startsWith(matrix.ci_image, 'ubuntu') }} # skip check in macos because it is slow
run: cargo check --release
Expand Down
8 changes: 5 additions & 3 deletions .github/workflows/publish_to_crates.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ jobs:
submodules: recursive
ssh-key: ${{ secrets.SSH_KEY }}
- name: Install Rust
uses: actions-rs/toolchain@v1
with:
toolchain: stable
run: |
rustup toolchain install stable --no-self-update
rustup default stable
- name: Install protobuf
run: sudo apt update && sudo apt install protobuf-compiler -y
- name: Publish
env:
CARGO_REGISTRY_TOKEN: ${{ secrets.CARGO_REGISTRY_TOKEN }}
Expand Down
26 changes: 13 additions & 13 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "colink"
version = "0.3.9"
version = "0.3.10"
edition = "2021"
description = "CoLink Rust SDK"
license = "MIT"
Expand All @@ -14,33 +14,33 @@ async-recursion = { version = "1.0", optional = true }
async-trait = "0.1"
base64 = "0.13"
chrono = "0.4"
clap = { version = "4.0", features = ["derive", "env"] }
futures-lite = "1.12"
clap = { version = "4.3", features = ["derive", "env"] }
futures-lite = "1.13"
hyper = { version = "0.14", optional = true }
hyper-rustls = { version = "0.23", optional = true }
hyper-rustls = { version = "0.24", optional = true }
jsonwebtoken = { version = "7.2", optional = true }
lapin = "2.1"
prost = "0.10"
lapin = "2.2"
prost = "0.11"
rand = { version = "0.8", features = ["std_rng"] }
rcgen = { version = "0.10", optional = true }
rdbc2 = { version = "0.2.2", optional = true }
redis = { version = "0.22", features = ["tokio-comp"] }
redis = { version = "0.23", features = ["tokio-rustls-comp"] }
reqwest = { version = "0.11", default-features = false, features = ["rustls-tls-native-roots"], optional = true }
secp256k1 = { version = "0.25", features = ["rand-std"] }
secp256k1 = { version = "0.27", features = ["rand-std"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
sha2 = "0.10"
tokio = { version = "1.24", features = ["macros", "rt-multi-thread", "rt", "fs"] }
tokio-rustls = { version = "0.23", optional = true }
tonic = { version = "0.7", features = ["tls", "tls-roots"] }
tokio = { version = "1.28", features = ["macros", "rt-multi-thread", "rt", "fs"] }
tokio-rustls = { version = "0.24", optional = true }
tonic = { version = "0.9", features = ["tls", "tls-roots"] }
tracing = "0.1"
tracing-subscriber = "0.2"
url = "2.2"
uuid = { version = "0.8", features = ["v4"] }

[build-dependencies]
prost-build = "0.10"
tonic-build = "0.7"
prost-build = "0.11"
tonic-build = "0.9"

[features]
default = ["extensions", "remote_storage", "variable_transfer", "registry", "policy_module", "instant_server", "storage_macro"]
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ CoLink SDK helps both application and protocol developers access the functionali
Add this to your Cargo.toml:
```toml
[dependencies]
colink = "0.3.9"
colink = "0.3.10"
```

Enable more features in your Cargo.toml
```
# if you use storage macro dbc
colink = { version = "0.3.9", features = ["storage_macro_dbc"] }
colink = { version = "0.3.10", features = ["storage_macro_dbc"] }
```

## Getting Started
Expand Down
2 changes: 1 addition & 1 deletion src/extensions/variable_transfer/p2p_inbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ impl VTInboxServer {
let graceful = server.with_graceful_shutdown(async move {
rx.recv().await;
});
tokio::spawn(async { graceful.await });
tokio::spawn(graceful);
Self {
port,
jwt_secret,
Expand Down
3 changes: 2 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ pub use application::{
};
pub use colink_proto::*;
pub use protocol::{
CoLinkProtocol, ProtocolEntry, _colink_parse_args, _protocol_start, async_trait,
CoLinkProtocol, CoLinkProtocolCommandLineArgs, ProtocolEntry, _colink_parse_args,
_protocol_start, async_trait,
};
pub mod extensions;
pub mod utils;
98 changes: 70 additions & 28 deletions src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,21 @@ pub struct CoLinkProtocol {
protocol_and_role: String,
cl: CoLink,
user_func: Box<dyn ProtocolEntry>,
vt_public_addr: Option<String>,
args: CoLinkProtocolCommandLineArgs,
}

impl CoLinkProtocol {
pub fn new(
protocol_and_role: &str,
cl: CoLink,
user_func: Box<dyn ProtocolEntry>,
vt_public_addr: Option<String>,
args: CoLinkProtocolCommandLineArgs,
) -> Self {
Self {
protocol_and_role: protocol_and_role.to_string(),
cl,
user_func,
vt_public_addr,
args,
}
}

Expand Down Expand Up @@ -78,11 +78,20 @@ impl CoLinkProtocol {
{
cl.vt_p2p_ctx =
Arc::new(crate::extensions::variable_transfer::p2p_inbox::VtP2pCtx {
public_addr: self.vt_public_addr.clone(),
public_addr: self.args.vt_public_addr.clone(),
..Default::default()
});
}
let cl_clone = cl.clone();
let instance_id = match &self.args.instance_id {
Some(instance_id) => instance_id,
None => "anonymous",
};
cl.update_entry(
&format!("_internal:task_po_mapping:{}", task.task_id),
instance_id.as_bytes(),
)
.await?;
match self
.user_func
.start(cl, task.protocol_param, task.participants)
Expand Down Expand Up @@ -170,8 +179,7 @@ impl CoLinkProtocol {
pub fn _protocol_start(
cl: CoLink,
user_funcs: HashMap<String, Box<dyn ProtocolEntry + Send + Sync>>,
keep_alive_when_disconnect: bool,
vt_public_addr: Option<String>,
args: CoLinkProtocolCommandLineArgs,
) -> Result<(), Error> {
let mut operator_funcs: HashMap<String, Box<dyn ProtocolEntry + Send + Sync>> = HashMap::new();
let mut protocols = HashSet::new();
Expand Down Expand Up @@ -233,14 +241,14 @@ pub fn _protocol_start(
let mut threads = vec![];
for (protocol_and_role, user_func) in operator_funcs {
let cl = cl.clone();
let vt_public_addr = vt_public_addr.clone();
let args = args.clone();
threads.push(thread::spawn(|| {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
.block_on(async move {
match CoLinkProtocol::new(&protocol_and_role, cl, user_func, vt_public_addr)
match CoLinkProtocol::new(&protocol_and_role, cl, user_func, args)
.start()
.await
{
Expand All @@ -250,7 +258,36 @@ pub fn _protocol_start(
});
}));
}
if keep_alive_when_disconnect {
if args.enable_heartbeat {
let cl = cl.clone();
if let Some(instance_id) = args.instance_id {
thread::spawn(|| {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
.block_on(async move {
loop {
let timestamp = chrono::Utc::now().timestamp_nanos();
let _ = cl
.update_entry(
&format!(
"_internal:protocol_operator_instances:{}:heartbeat",
instance_id
),
&timestamp.to_le_bytes(),
)
.await;
let st = rand::thread_rng().gen_range(32..64);
tokio::time::sleep(tokio::time::Duration::from_secs(st)).await;
}
});
});
} else {
return Err("Cannot find instance_id while heartbeat is enabled, please specify instance_id to enable this functionality.".into());
}
}
if args.keep_alive_when_disconnect {
for thread in threads {
thread.join().unwrap();
}
Expand Down Expand Up @@ -282,9 +319,9 @@ pub fn _protocol_start(
Ok(())
}

#[derive(Debug, Parser)]
#[derive(Debug, Clone, Default, Parser)]
#[command(name = "CoLink-SDK", about = "CoLink-SDK")]
pub struct CommandLineArgs {
pub struct CoLinkProtocolCommandLineArgs {
/// Address of CoLink server
#[arg(short, long, env = "COLINK_CORE_ADDR")]
pub addr: String,
Expand All @@ -293,6 +330,10 @@ pub struct CommandLineArgs {
#[arg(short, long, env = "COLINK_JWT")]
pub jwt: String,

/// Path to CA certificate.
#[arg(long, env = "COLINK_INSTANCE_ID")]
pub instance_id: Option<String>,

/// Path to CA certificate.
#[arg(long, env = "COLINK_CA_CERT")]
pub ca: Option<String>,
Expand All @@ -309,37 +350,34 @@ pub struct CommandLineArgs {
#[arg(long, env = "COLINK_KEEP_ALIVE_WHEN_DISCONNECT")]
pub keep_alive_when_disconnect: bool,

/// Enable heartbeat.
#[arg(long, env = "COLINK_ENABLE_HEARTBEAT")]
pub enable_heartbeat: bool,

/// Public address for the variable transfer inbox.
#[arg(long, env = "COLINK_VT_PUBLIC_ADDR")]
pub vt_public_addr: Option<String>,
}

pub fn _colink_parse_args() -> (CoLink, bool, Option<String>) {
pub fn _colink_parse_args() -> (CoLink, CoLinkProtocolCommandLineArgs) {
tracing_subscriber::fmt::init();
let CommandLineArgs {
addr,
jwt,
ca,
cert,
key,
keep_alive_when_disconnect,
vt_public_addr,
} = CommandLineArgs::parse();
let mut cl = CoLink::new(&addr, &jwt);
if let Some(ca) = ca {
let args = CoLinkProtocolCommandLineArgs::parse();
let args_clone = args.clone();
let mut cl = CoLink::new(&args.addr, &args.jwt);
if let Some(ca) = args.ca {
cl = cl.ca_certificate(&ca);
}
if let (Some(cert), Some(key)) = (cert, key) {
if let (Some(cert), Some(key)) = (args.cert, args.key) {
cl = cl.identity(&cert, &key);
}
(cl, keep_alive_when_disconnect, vt_public_addr)
(cl, args_clone)
}

#[macro_export]
macro_rules! protocol_start {
( $( $x:expr ),* ) => {
fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
let (cl, keep_alive_when_disconnect, vt_public_addr) = colink::_colink_parse_args();
let (cl, args) = colink::_colink_parse_args();

let mut user_funcs: std::collections::HashMap<
String,
Expand All @@ -349,7 +387,7 @@ macro_rules! protocol_start {
user_funcs.insert($x.0.to_string(), Box::new($x.1));
)*

colink::_protocol_start(cl, user_funcs, keep_alive_when_disconnect, vt_public_addr)?;
colink::_protocol_start(cl, user_funcs, args)?;

Ok(())
}
Expand All @@ -368,8 +406,12 @@ macro_rules! protocol_attach {
$(
user_funcs.insert($x.0.to_string(), Box::new($x.1));
)*
let args = colink::CoLinkProtocolCommandLineArgs {
vt_public_addr: Some("127.0.0.1".to_string()),
..Default::default()
};
std::thread::spawn(|| {
colink::_protocol_start(cl, user_funcs, false, Some("127.0.0.1".to_string()))?;
colink::_protocol_start(cl, user_funcs, args)?;
Ok::<(), Box<dyn std::error::Error + Send + Sync + 'static>>(())
});
}
Expand Down

0 comments on commit efce46e

Please sign in to comment.