Skip to content

Commit 4b5f273

Browse files
authored
Merge pull request hetu-project#9 from ai-chen2050/feat/vlc-inner-node
feat: support socket query api, about clockinfos, mergelogs
2 parents deb8319 + 17c6fad commit 4b5f273

20 files changed

+656
-161
lines changed

Cargo.lock

+22-22
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Dockerfile

+3-3
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ WORKDIR /app
1212

1313
copy . .
1414

15-
RUN cargo build -p Zchronod
15+
RUN cargo build -p zchronod
1616

1717

1818
FROM debian:bookworm-slim
@@ -21,10 +21,10 @@ RUN apt update && apt install -y openssl
2121

2222
WORKDIR /app
2323

24-
COPY --from=builder /app/target/debug/Zchronod ./Zchronod
24+
COPY --from=builder /app/target/debug/zchronod ./zchronod
2525

2626
COPY ./zchronod/config-tempelete.yaml ./config-tempelete.yaml
2727

2828
EXPOSE 8080
2929

30-
CMD ["./Zchronod", "--config", "./config-tempelete.yaml"]
30+
CMD ["./zchronod", "--config", "./config-tempelete.yaml"]

crates/protos/src/bussiness.proto

+23-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ message ZChat {
1515
// Gateway just only needs read api
1616
message ZGateway {
1717
GatewayType type = 1;
18-
bytes data = 2;
18+
QueryMethod method = 2;
19+
bytes data = 3;
1920
}
2021

2122
enum GatewayType {
@@ -24,6 +25,11 @@ enum GatewayType {
2425
GATEWAY_TYPE_NODE_INFO = 2; // heartbeat or node info
2526
}
2627

28+
enum QueryMethod {
29+
QUERY_BY_MSGID = 0;
30+
QUERY_BY_TABLE_KEYID = 1;
31+
}
32+
2733
// ZGateway.type = GATEWAY_TYPE_CLOCK_NODE
2834
message ClockNode {
2935
vlc.Clock clock = 1;
@@ -37,4 +43,20 @@ message ClockNode {
3743
// ZGateway.type = GATEWAY_TYPE_NODE_INFO
3844
message NodeInfo {
3945
repeated string node_ids = 1;
46+
}
47+
48+
message QueryResponse {
49+
bool success = 1;
50+
string reason = 2;
51+
bytes data = 3;
52+
}
53+
54+
// ZGateway.method = QUERY_BY_MSGID
55+
message QueryByMsgID {
56+
string msg_id = 1;
57+
}
58+
59+
// ZGateway.method = QUERY_BY_TABLE_KEYID
60+
message QueryByTableKeyID {
61+
uint64 last_pos = 1;
4062
}

crates/protos/src/bussiness.rs

+53-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@ pub struct ZChat {
1616
pub struct ZGateway {
1717
#[prost(enumeration = "GatewayType", tag = "1")]
1818
pub r#type: i32,
19-
#[prost(bytes = "vec", tag = "2")]
19+
#[prost(enumeration = "QueryMethod", tag = "2")]
20+
pub method: i32,
21+
#[prost(bytes = "vec", tag = "3")]
2022
pub data: ::prost::alloc::vec::Vec<u8>,
2123
}
2224
/// ZGateway.type = GATEWAY_TYPE_CLOCK_NODE
@@ -43,6 +45,30 @@ pub struct NodeInfo {
4345
#[prost(string, repeated, tag = "1")]
4446
pub node_ids: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
4547
}
48+
#[allow(clippy::derive_partial_eq_without_eq)]
49+
#[derive(Clone, PartialEq, ::prost::Message)]
50+
pub struct QueryResponse {
51+
#[prost(bool, tag = "1")]
52+
pub success: bool,
53+
#[prost(string, tag = "2")]
54+
pub reason: ::prost::alloc::string::String,
55+
#[prost(bytes = "vec", tag = "3")]
56+
pub data: ::prost::alloc::vec::Vec<u8>,
57+
}
58+
/// ZGateway.method = QUERY_BY_MSGID
59+
#[allow(clippy::derive_partial_eq_without_eq)]
60+
#[derive(Clone, PartialEq, ::prost::Message)]
61+
pub struct QueryByMsgId {
62+
#[prost(string, tag = "1")]
63+
pub msg_id: ::prost::alloc::string::String,
64+
}
65+
/// ZGateway.method = QUERY_BY_TABLE_KEYID
66+
#[allow(clippy::derive_partial_eq_without_eq)]
67+
#[derive(Clone, PartialEq, ::prost::Message)]
68+
pub struct QueryByTableKeyId {
69+
#[prost(uint64, tag = "1")]
70+
pub last_pos: u64,
71+
}
4672
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
4773
#[repr(i32)]
4874
pub enum GatewayType {
@@ -74,3 +100,29 @@ impl GatewayType {
74100
}
75101
}
76102
}
103+
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
104+
#[repr(i32)]
105+
pub enum QueryMethod {
106+
QueryByMsgid = 0,
107+
QueryByTableKeyid = 1,
108+
}
109+
impl QueryMethod {
110+
/// String value of the enum field names used in the ProtoBuf definition.
111+
///
112+
/// The values are not transformed in any way and thus are considered stable
113+
/// (if the ProtoBuf definition does not change) and safe for programmatic use.
114+
pub fn as_str_name(&self) -> &'static str {
115+
match self {
116+
QueryMethod::QueryByMsgid => "QUERY_BY_MSGID",
117+
QueryMethod::QueryByTableKeyid => "QUERY_BY_TABLE_KEYID",
118+
}
119+
}
120+
/// Creates an enum from field names used in the ProtoBuf definition.
121+
pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
122+
match value {
123+
"QUERY_BY_MSGID" => Some(Self::QueryByMsgid),
124+
"QUERY_BY_TABLE_KEYID" => Some(Self::QueryByTableKeyid),
125+
_ => None,
126+
}
127+
}
128+
}

crates/protos/src/vlc.proto

+10-2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,14 @@ message Clock {
66
map<string, uint64> values = 1;
77
}
88

9+
message ClockInfos {
10+
repeated ClockInfo clock_infos = 1;
11+
}
12+
13+
message MergeLogs {
14+
repeated MergeLog merge_logs = 1;
15+
}
16+
917
message ClockInfo {
1018
Clock clock = 1;
1119
bytes id = 2;
@@ -19,8 +27,8 @@ message MergeLog {
1927
bytes to_id = 2;
2028
uint64 start_count = 3;
2129
uint64 end_count = 4;
22-
Clock s_clock = 5;
23-
Clock e_clock = 6;
30+
bytes s_clock_hash = 5;
31+
bytes e_clock_hash = 6;
2432
uint64 merge_at = 7;
2533
}
2634

crates/protos/src/vlc.rs

+16-4
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,18 @@ pub struct Clock {
77
}
88
#[allow(clippy::derive_partial_eq_without_eq)]
99
#[derive(Clone, PartialEq, ::prost::Message)]
10+
pub struct ClockInfos {
11+
#[prost(message, repeated, tag = "1")]
12+
pub clock_infos: ::prost::alloc::vec::Vec<ClockInfo>,
13+
}
14+
#[allow(clippy::derive_partial_eq_without_eq)]
15+
#[derive(Clone, PartialEq, ::prost::Message)]
16+
pub struct MergeLogs {
17+
#[prost(message, repeated, tag = "1")]
18+
pub merge_logs: ::prost::alloc::vec::Vec<MergeLog>,
19+
}
20+
#[allow(clippy::derive_partial_eq_without_eq)]
21+
#[derive(Clone, PartialEq, ::prost::Message)]
1022
pub struct ClockInfo {
1123
#[prost(message, optional, tag = "1")]
1224
pub clock: ::core::option::Option<Clock>,
@@ -30,10 +42,10 @@ pub struct MergeLog {
3042
pub start_count: u64,
3143
#[prost(uint64, tag = "4")]
3244
pub end_count: u64,
33-
#[prost(message, optional, tag = "5")]
34-
pub s_clock: ::core::option::Option<Clock>,
35-
#[prost(message, optional, tag = "6")]
36-
pub e_clock: ::core::option::Option<Clock>,
45+
#[prost(bytes = "vec", tag = "5")]
46+
pub s_clock_hash: ::prost::alloc::vec::Vec<u8>,
47+
#[prost(bytes = "vec", tag = "6")]
48+
pub e_clock_hash: ::prost::alloc::vec::Vec<u8>,
3749
#[prost(uint64, tag = "7")]
3850
pub merge_at: u64,
3951
}

zchronod/config-tempelete.yaml

+2-1
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,5 @@ pg_db_url: "postgres://postgres:hetu@0.0.0.0:5432"
22
pg_db_name: "vlc_inner_db"
33
outer_p2p: "0.0.0.0:8051"
44
inner_p2p: "0.0.0.0:8050"
5-
ws_url: "0.0.0.0:8052"
5+
ws_url: "0.0.0.0:8052"
6+
read_maximum: 20

zchronod/db_sql/src/pg/migrator/m20240428_000001_create_clock_infos_table.rs

+27-3
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use sea_orm_migration::prelude::*;
2+
use sea_query::Index;
23
pub struct Migration;
34

45
impl MigrationName for Migration {
@@ -11,7 +12,7 @@ impl MigrationName for Migration {
1112
impl MigrationTrait for Migration {
1213
// Define how to apply this migration: Create the clock_infos table.
1314
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
14-
manager
15+
let result = manager
1516
.create_table(
1617
Table::create()
1718
.table(ClockInfos::Table)
@@ -30,8 +31,31 @@ impl MigrationTrait for Migration {
3031
.col(ColumnDef::new(ClockInfos::EventCount).integer().not_null())
3132
.col(ColumnDef::new(ClockInfos::CreateAt).timestamp())
3233
.to_owned(),
33-
)
34-
.await
34+
).await;
35+
36+
if let Err(err) = result {
37+
return Err(err);
38+
}
39+
40+
// create index
41+
let msgid_index = Index::create()
42+
.if_not_exists()
43+
.name("idx-clockinfos-messageid")
44+
.table(ClockInfos::Table)
45+
.col(ClockInfos::MessageId)
46+
.to_owned();
47+
let result = manager.create_index(msgid_index).await;
48+
if let Err(err) = result {
49+
return Err(err);
50+
}
51+
52+
let nodeid_index = Index::create()
53+
.if_not_exists()
54+
.name("idx-clockinfos-nodeid")
55+
.table(ClockInfos::Table)
56+
.col(ClockInfos::NodeId)
57+
.to_owned();
58+
manager.create_index(nodeid_index).await
3559
}
3660

3761
// Define how to rollback this migration: Drop the ClockInfo table.

zchronod/db_sql/src/pg/migrator/m20240428_000001_create_merge_logs_table.rs

-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
use sea_orm_migration::prelude::*;
22

3-
use super::m20240428_000001_create_clock_infos_table::ClockInfos;
4-
53
pub struct Migration;
64

75
impl MigrationName for Migration {

zchronod/db_sql/src/pg/pg_client.rs

+30-4
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
use futures::executor::block_on;
2+
use sea_orm::*;
23
use sea_orm_migration::prelude::*;
3-
use sea_orm::{ConnectionTrait, Database, DatabaseConnection, DbBackend, DbErr, Statement};
44
use super::migrator::Migrator;
55
use super::entities::{prelude::*, *};
6-
use sea_orm::*;
76

87
const DATABASE_PG_URL: &str = "postgres://postgres:hetu@0.0.0.0:5432";
98
const DB_NAME: &str = "vlc_inner_db";
@@ -106,7 +105,34 @@ mod tests {
106105
ClockInfos::insert(clock3).exec(&db).await.expect("insert error");
107106
let clock_vec = ClockInfos::find().all(&db).await.expect("query error");
108107
println!("clock_vec-2 = {:?}", clock_vec);
109-
}
110-
108+
}
109+
}
110+
111+
#[tokio::test]
112+
async fn test_get_clocks_by_msgid() {
113+
let url = format!("{}/{}", DATABASE_PG_URL, DB_NAME);
114+
let db = Database::connect(&url).await.expect("failed to connect to database");
115+
{
116+
let msg_id = "todo1";
117+
let clock_info = ClockInfos::find().filter(clock_infos::Column::MessageId.eq(msg_id)).all(&db).await.expect("query error");
118+
println!("pointed message_id's clocks = {:?}", clock_info);
119+
}
120+
}
121+
122+
#[tokio::test]
123+
async fn test_get_clocks_from_start_id() {
124+
let url = format!("{}/{}", DATABASE_PG_URL, DB_NAME);
125+
let db = Database::connect(&url).await.expect("failed to connect to database");
126+
127+
let start_id = 0;
128+
129+
let clocks = ClockInfos::find()
130+
.filter(clock_infos::Column::Id.gt(start_id))
131+
.limit(1)
132+
.all(&db)
133+
.await
134+
.expect("query error");
135+
136+
println!("clocks = {:?}", clocks);
111137
}
112138
}

zchronod/node_api/src/config.rs

+1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ pub struct ZchronodConfig {
1313
pub outer_p2p: Option<String>,
1414
pub inner_p2p: String, // vlc server bind udp socket
1515
pub ws_url: String,
16+
pub read_maximum: u64,
1617
}
1718

1819
#[derive(Clone, serde::Serialize, serde::Deserialize, Debug, Default)]

zchronod/tools/src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
//! this crate is for tools function in the zchronod repository.
22
3-
pub mod tokio_zhronod;
3+
pub mod tokio_zchronod;
44
pub mod helper;

zchronod/zchronod/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11

22
[package]
3-
name = "Zchronod"
3+
name = "zchronod"
44
version = "0.1.0"
55
edition = "2021"
66

0 commit comments

Comments
 (0)