GRPC Ingest #183

Open

wants to merge 87 commits into main

Changes from 1 commit

Commits (87)
ea5379e
fmt
fanatid Nov 20, 2023
8b12316
clippy
fanatid Nov 20, 2023
974c90f
ci: add lock, fmt, clippy checks
fanatid Nov 20, 2023
788973e
include migration to workspace
fanatid Nov 20, 2023
7a870c2
include das_api to workspace
fanatid Nov 20, 2023
7e3f9ef
add global clippy lints
fanatid Nov 20, 2023
aa916a4
use workspace
fanatid Nov 20, 2023
033873d
add crate program_transformers
fanatid Nov 22, 2023
1008f86
nft_ingester: use program_transformers crate
fanatid Nov 23, 2023
83b171e
remove not used deps
fanatid Nov 23, 2023
9d8031e
remove AccountInfo
fanatid Nov 23, 2023
096616d
remove plerkle from program_transformers
fanatid Nov 24, 2023
ef0b0e9
nft_ingester2: grpc2redis
fanatid Nov 27, 2023
076bffa
add redis streaming for ingester
fanatid Nov 28, 2023
d8673cf
create pg pool
fanatid Nov 29, 2023
c93adff
parse incoming message from redis
fanatid Nov 29, 2023
53fa481
add force shutdown with signals
fanatid Nov 30, 2023
11b4151
insert download metadata tasks
fanatid Dec 1, 2023
8fca2c2
download-metadata subtask
fanatid Dec 2, 2023
fcc18d6
refactor: rename nft_ingest2
kespinola Apr 10, 2024
294fa1a
fix: applying account and transaction filters to grpc subscription req…
kespinola Apr 15, 2024
98a7e4e
refactor: take out multiple connections to dragonmouth and then have …
kespinola May 2, 2024
997d515
refactor: switch to accepting multiple dragonmouth endpoints to mimic …
kespinola May 3, 2024
75c4115
Merge branch 'main' of github.com:rpcpool/digital-asset-rpc-infrastru…
kespinola May 9, 2024
67cc21e
fix: clippy errors
kespinola May 9, 2024
5f08298
fix: processes transactions for a tree sequentially to make sure ther…
kespinola May 15, 2024
0eced5d
refactor: use program transform in the account backfill no queuing in…
kespinola May 16, 2024
138bf2e
refactor: move bubble backfill to a lib so can be used by other proje…
kespinola May 30, 2024
d80e7b8
Merge remote-tracking branch 'origin' into grpc-ingest
kespinola Jul 19, 2024
2cc3411
chore: refactor grpc command to use topograph to clean up the control …
kespinola Jul 30, 2024
81f5d71
chore: switch to run_v2 for ingest which handles ingesting accounts a…
kespinola Aug 9, 2024
10c46d5
Merge remote-tracking branch 'origin' into grpc-ingest
kespinola Aug 10, 2024
a151630
chore: handle metadata json within the ingest command
kespinola Aug 12, 2024
f398880
test: metadata json stream processing
kespinola Aug 12, 2024
70eef04
feat: set num threads for topograph. report on postgres and redis in …
kespinola Aug 14, 2024
77a98f6
chore: switch to topograph for thread management. create separate ing…
kespinola Aug 20, 2024
0db0612
fix: config and ack
kespinola Aug 22, 2024
32f9852
hack: disable pending processing while we see why messages aren't being d…
kespinola Aug 22, 2024
bab4678
fix: exit early if slot matches metadata json download and reindex is…
kespinola Aug 22, 2024
0c4f872
fix: picking dangling pending messages before reading new
kespinola Aug 22, 2024
eb75ca3
fix: ingest stream shutdown and requesting snapshots
kespinola Aug 23, 2024
3213080
fix: processing pending. add metrics for total workers.
kespinola Aug 23, 2024
7f3b391
fix: bubble backfill transform. no op out of download metadata info.
kespinola Aug 26, 2024
4c3f649
Add force flag to TreeWorkerArgs for complete reindexing (#148)
kevinrodriguez-io Sep 13, 2024
4e3c076
refactor: drop exclusion clauses on cl_items to allow for reindexing …
kespinola Sep 16, 2024
c789b52
feat: replay bubblegum transactions based on what is in cl_audits_v2
kespinola Sep 26, 2024
45a5dcf
fix: order bubblegum backfill instructions by seq (#158)
kespinola Oct 2, 2024
f4744c0
include back seq checks
kespinola Oct 7, 2024
1afee45
Remove enforcing seq in bubblegum backfill
kespinola Oct 7, 2024
18411a9
Replay single tree with targeted seq replay (#159)
kespinola Oct 8, 2024
110f71c
Support Account Snapshot Flushes (#154)
kespinola Oct 9, 2024
118baec
fix: keep grpc alive by sending using ping-pong (#160)
Nagaprasadvr Oct 9, 2024
01bbe99
Only Time Based Flushes (#161)
kespinola Oct 9, 2024
cde4594
delete and recreate the consumer (#162)
kespinola Oct 11, 2024
0005b3e
Type out redis message handlers (#163)
kespinola Oct 12, 2024
d092ad8
skip reading new messages if the ack buffer is full. track running ta…
kespinola Oct 13, 2024
6f158f3
only push handler job for ingest if capacity (#165)
kespinola Oct 14, 2024
a347066
Only throttle based on the task semaphore (#166)
kespinola Oct 14, 2024
188c878
Remove topograph from ingest command (#167)
kespinola Oct 14, 2024
e15172f
Configure database connection idle timeout and max lifetime (#169)
kespinola Oct 21, 2024
fdbf03a
Add single NFT fetch and backfill feature to ops (#170)
Nagaprasadvr Oct 22, 2024
d7cc058
Add lock timeout on v1_asset statements
kespinola Oct 23, 2024
17d7e0a
Set the lock timeout to 5s
kespinola Oct 23, 2024
99b59c0
Bubblegum verify (#150)
kespinola Oct 29, 2024
d3bfe1b
Separate global stream to multiple Program separated streams and hand…
Nagaprasadvr Nov 8, 2024
f311e14
Report proofs as they come (#178)
kespinola Nov 8, 2024
f8bdef2
Buffer redis messages (#179)
kespinola Nov 11, 2024
776ae0a
Fix tracking ack tasks
kespinola Nov 11, 2024
151168e
fix: add back ack tasks total
kespinola Nov 11, 2024
ae11907
recreate consumer but not groups (#181)
kespinola Nov 12, 2024
80d61dc
Time Ingest jobs (#182)
kespinola Nov 14, 2024
4baaca6
Use Example Configs (#180)
kespinola Nov 27, 2024
c235125
Respect commitment level set in grpc config (#191)
kespinola Dec 18, 2024
a464fd3
Merge remote-tracking branch 'origin/main' into grpc-ingest
kespinola Jan 15, 2025
9f3d543
Use tokio join set ingest stream task tracking
kespinola Jan 20, 2025
3d7a88d
Add metrics to track metadata_json_download success and failure (#207)
Nagaprasadvr Jan 21, 2025
44ce53f
add retry config to download_metadata_json task (#208)
Nagaprasadvr Jan 21, 2025
991bf70
Merge remote-tracking branch 'origin/main' into grpc-ingest
kespinola Jan 22, 2025
b491d0a
Resolve clippy
kespinola Jan 22, 2025
aaee1ad
Use multiple pipelines per subscription (#211)
kespinola Jan 23, 2025
1343e33
Das 106 add metadata json backfiller to das (#210)
Nagaprasadvr Jan 24, 2025
e678814
Shutdown program on stream error or close (#213)
kespinola Jan 24, 2025
5aa04bd
Global shutdown in the None arm
kespinola Jan 24, 2025
b473efa
update few actions workflows to v4 (#237)
Nagaprasadvr Feb 3, 2025
0e368c1
Migrate to channel and worker pool for download notifier (#217)
fernandodeluret Feb 3, 2025
55d11b6
`chore` Add auto_explain to postgres to log slow db query plans (#218)
fernandodeluret Feb 3, 2025
8389596
add redis pipes for processing metadata download (#222)
Nagaprasadvr Feb 5, 2025
skip reading new messages if the ack buffer is full. track running tasks. (#164)
kespinola authored Oct 13, 2024
commit d092ad8a2546a221ad5ae7797bbaef6a034c02db
4 changes: 3 additions & 1 deletion grpc-ingest/config-ingester.yml
@@ -3,19 +3,21 @@ redis: redis://localhost:6379
 postgres:
   url: postgres://solana:solana@localhost/solana
   min_connections: 10
-  max_connections: 50 # `max_connection` should be bigger than `program_transformer.max_tasks_in_process` otherwise unresolved lock is possible
+  max_connections: 50
 snapshots:
   name: SNAPSHOTS
   max_concurrency: 10
   batch_size: 100
   xack_batch_max_idle_ms: 1_000
+  xack_buffer_size: 10_000
   xack_batch_max_size: 500
 accounts:
   name: ACCOUNTS
   max_concurrency: 10
   batch_size: 100
   xack_batch_max_idle_ms: 1_000
+  xack_buffer_size: 10_000
   xack_batch_max_size: 500
 transactions:
   name: TRANSACTIONS
 download_metadata:
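The new per-stream xack_buffer_size bounds how many message IDs can sit waiting for XACK before the consumer stops reading new messages (the select! guard added in redis.rs below). As a rough sketch, a stream block like the one above might deserialize into a struct along these lines; the field names mirror the YAML, but the struct name, types, and comments are assumptions rather than the crate's actual config code:

    use serde::Deserialize;

    // Hypothetical mirror of one stream block in config-ingester.yml;
    // the real definition lives in grpc-ingest/src/config.rs.
    #[derive(Debug, Deserialize)]
    pub struct IngestStreamConfig {
        pub name: String,                // Redis stream key, e.g. "ACCOUNTS"
        pub max_concurrency: usize,      // concurrent handler tasks
        pub batch_size: usize,           // IDs fetched per XREADGROUP call
        pub xack_batch_max_idle_ms: u64, // flush a partial ack batch after idling
        pub xack_buffer_size: usize,     // pending-ack channel capacity (new here)
        pub xack_batch_max_size: usize,  // max IDs per XACK pipeline
    }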
24 changes: 17 additions & 7 deletions grpc-ingest/src/grpc.rs
@@ -1,6 +1,8 @@
 use {
     crate::{
-        config::ConfigGrpc, prom::redis_xadd_status_inc, redis::TrackedPipeline,
+        config::ConfigGrpc,
+        prom::{grpc_tasks_total_dec, grpc_tasks_total_inc, redis_xadd_status_inc},
+        redis::TrackedPipeline,
         util::create_shutdown,
     },
     anyhow::Context,
@@ -56,6 +58,8 @@ impl<'a> AsyncHandler<GrpcJob, topograph::executor::Handle<'a, GrpcJob, Nonblock
 
         let subscribe_tx = Arc::clone(&self.subscribe_tx);
 
+        grpc_tasks_total_inc();
+
         async move {
             match job {
                 GrpcJob::FlushRedisPipe => {
@@ -68,7 +72,7 @@ impl<'a> AsyncHandler<GrpcJob, topograph::executor::Handle<'a, GrpcJob, Nonblock
                     let counts = flush.as_ref().unwrap_or_else(|counts| counts);
 
                     for (stream, count) in counts.iter() {
-                        debug!(message = "Redis pipe flushed", ?stream, ?status, ?count);
+                        debug!(target: "grpc2redis", action = "flush_redis_pipe", stream = ?stream, status = ?status, count = ?count);
                         redis_xadd_status_inc(stream, status, *count);
                     }
                 }
@@ -91,6 +95,7 @@ impl<'a> AsyncHandler<GrpcJob, topograph::executor::Handle<'a, GrpcJob, Nonblock
                                 "*",
                                 account.encode_to_vec(),
                             );
+                            debug!(target: "grpc2redis", action = "process_account_update", stream = ?accounts_stream, maxlen = ?accounts_stream_maxlen);
                         }
                         UpdateOneof::Transaction(transaction) => {
                             pipe.xadd_maxlen(
@@ -99,6 +104,7 @@ impl<'a> AsyncHandler<GrpcJob, topograph::executor::Handle<'a, GrpcJob, Nonblock
                                 "*",
                                 transaction.encode_to_vec(),
                             );
+                            debug!(target: "grpc2redis", action = "process_transaction_update", stream = ?transactions_stream, maxlen = ?transactions_stream_maxlen);
                         }
                         UpdateOneof::Ping(_) => {
                             let ping = subscribe_tx
@@ -112,25 +118,29 @@ impl<'a> AsyncHandler<GrpcJob, topograph::executor::Handle<'a, GrpcJob, Nonblock
 
                             match ping {
                                 Ok(_) => {
-                                    debug!(message = "Ping sent successfully", id = PING_ID)
+                                    debug!(target: "grpc2redis", action = "send_ping", message = "Ping sent successfully", id = PING_ID)
                                 }
                                 Err(err) => {
-                                    warn!(message = "Failed to send ping", ?err, id = PING_ID)
+                                    warn!(target: "grpc2redis", action = "send_ping_failed", message = "Failed to send ping", ?err, id = PING_ID)
                                 }
                             }
                         }
                         UpdateOneof::Pong(pong) => {
                             if pong.id == PING_ID {
-                                debug!(message = "Pong received", id = PING_ID);
+                                debug!(target: "grpc2redis", action = "receive_pong", message = "Pong received", id = PING_ID);
                             } else {
-                                warn!(message = "Unknown pong id received", id = pong.id);
+                                warn!(target: "grpc2redis", action = "receive_unknown_pong", message = "Unknown pong id received", id = pong.id);
                             }
                         }
-                        var => warn!(message = "Unknown update variant", ?var),
+                        var => {
+                            warn!(target: "grpc2redis", action = "unknown_update_variant", message = "Unknown update variant", ?var)
+                        }
                     }
                 }
             }
         }
 
+            grpc_tasks_total_dec();
     }
 }
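Two things change in this handler: every log line gains a target and an action field so grpc2redis output can be filtered and grouped, and a gauge now brackets each job, incremented when the handler accepts work and decremented when its future finishes. A minimal sketch of that gauge idiom using the prometheus crate (handle_job and the gauge wiring here are illustrative, not the crate's code):

    use prometheus::IntGauge;

    lazy_static::lazy_static! {
        static ref GRPC_TASKS: IntGauge =
            IntGauge::new("grpc_tasks", "in-flight grpc-to-redis jobs").unwrap();
    }

    // Increment on entry, decrement when the work future completes,
    // mirroring the inc/dec pair added around `async move` above.
    async fn handle_job<F: std::future::Future<Output = ()>>(work: F) {
        GRPC_TASKS.inc();
        work.await;
        GRPC_TASKS.dec();
    }

One caveat of the bare inc/dec pair: if the job future is dropped or panics, the decrement never runs, so a guard type that decrements in Drop would make the gauge crash-safe.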
7 changes: 4 additions & 3 deletions grpc-ingest/src/ingester.rs
@@ -122,10 +122,11 @@ pub async fn run(config: ConfigIngester) -> anyhow::Result<()> {
 
     report.abort();
 
-    accounts.stop().await?;
-    transactions.stop().await?;
+    futures::future::join_all(vec![accounts.stop(), transactions.stop(), snapshots.stop()])
+        .await
+        .into_iter()
+        .collect::<Result<Vec<_>, _>>()?;
     download_metadatas.stop().await?;
-    snapshots.stop().await?;
 
     pool.close().await;
 
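Shutdown of the accounts, transactions, and snapshots streams now runs concurrently instead of one stop() at a time, and collecting into Result<Vec<_>, _> still propagates the first error; download_metadatas is stopped separately afterwards. A self-contained sketch of the same pattern, with the stop futures faked as a trivial async fn:

    use futures::future::join_all;

    async fn stop_ok() -> anyhow::Result<()> {
        Ok(()) // stand-in for IngestStream::stop()
    }

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        // Drive all stop futures to completion concurrently;
        // `?` surfaces the first error, as in the ingester above.
        join_all(vec![stop_ok(), stop_ok(), stop_ok()])
            .await
            .into_iter()
            .collect::<Result<Vec<_>, _>>()?;
        Ok(())
    }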
28 changes: 26 additions & 2 deletions grpc-ingest/src/prom.rs
@@ -62,6 +62,16 @@ lazy_static::lazy_static! {
         Opts::new("ingest_tasks", "Number of tasks spawned for ingest"),
         &["stream"]
     ).unwrap();
+
+    static ref ACK_TASKS: IntGaugeVec = IntGaugeVec::new(
+        Opts::new("ack_tasks", "Number of tasks spawned for ack redis messages"),
+        &["stream"]
+    ).unwrap();
+
+    static ref GRPC_TASKS: IntGaugeVec = IntGaugeVec::new(
+        Opts::new("grpc_tasks", "Number of tasks spawned for writing grpc messages to redis "),
+        &[]
+    ).unwrap();
 }
 
 pub fn run_server(address: SocketAddr) -> anyhow::Result<()> {
@@ -84,6 +94,8 @@ pub fn run_server(address: SocketAddr) -> anyhow::Result<()> {
     register!(PROGRAM_TRANSFORMER_TASK_STATUS_COUNT);
     register!(DOWNLOAD_METADATA_INSERTED_COUNT);
     register!(INGEST_TASKS);
+    register!(ACK_TASKS);
+    register!(GRPC_TASKS);
 
     VERSION_INFO_METRIC
         .with_label_values(&[
@@ -182,8 +194,20 @@ pub fn ingest_tasks_total_dec(stream: &str) {
     INGEST_TASKS.with_label_values(&[stream]).dec()
 }
 
-pub fn ingest_tasks_reset(stream: &str) {
-    INGEST_TASKS.with_label_values(&[stream]).set(0)
+pub fn ack_tasks_total_inc(stream: &str) {
+    ACK_TASKS.with_label_values(&[stream]).inc()
 }
 
+pub fn ack_tasks_total_dec(stream: &str) {
+    ACK_TASKS.with_label_values(&[stream]).dec()
+}
+
+pub fn grpc_tasks_total_inc() {
+    GRPC_TASKS.with_label_values(&[]).inc()
+}
+
+pub fn grpc_tasks_total_dec() {
+    GRPC_TASKS.with_label_values(&[]).dec()
+}
+
 #[derive(Debug, Clone, Copy)]
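ACK_TASKS is labeled per stream, while GRPC_TASKS takes no labels, so with_label_values(&[]) always resolves to its single series; ingest_tasks_reset is removed along with its only call site in redis.rs. A compilable sketch of declaring, registering, and moving a labeled gauge (the Registry wiring is simplified relative to the crate's register! macro):

    use prometheus::{IntGaugeVec, Opts, Registry};

    lazy_static::lazy_static! {
        static ref ACK_TASKS: IntGaugeVec = IntGaugeVec::new(
            Opts::new("ack_tasks", "tasks acking redis messages"),
            &["stream"],
        )
        .unwrap();
    }

    fn main() {
        let registry = Registry::new();
        registry.register(Box::new(ACK_TASKS.clone())).unwrap();

        // Each distinct label value becomes its own time series.
        ACK_TASKS.with_label_values(&["ACCOUNTS"]).inc();
        ACK_TASKS.with_label_values(&["ACCOUNTS"]).dec();
    }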
13 changes: 7 additions & 6 deletions grpc-ingest/src/redis.rs
@@ -2,9 +2,9 @@ use {
     crate::{
         config::{ConfigIngestStream, REDIS_STREAM_DATA_KEY},
         prom::{
-            ingest_tasks_reset, ingest_tasks_total_dec, ingest_tasks_total_inc,
-            program_transformer_task_status_inc, redis_xack_inc, redis_xlen_set, redis_xread_inc,
-            ProgramTransformerTaskStatusKind,
+            ack_tasks_total_dec, ack_tasks_total_inc, ingest_tasks_total_dec,
+            ingest_tasks_total_inc, program_transformer_task_status_inc, redis_xack_inc,
+            redis_xlen_set, redis_xread_inc, ProgramTransformerTaskStatusKind,
         },
     },
     das_core::{DownloadMetadata, DownloadMetadataInfo},
@@ -400,6 +400,7 @@ impl<'a>
 
         let count = ids.len();
 
+        ack_tasks_total_inc(&config.name);
         async move {
             match redis::pipe()
                 .atomic()
@@ -425,6 +426,8 @@
                     );
                 }
             }
+
+            ack_tasks_total_dec(&config.name);
         }
     }
 }
@@ -548,8 +551,6 @@ impl<H: MessageHandler> IngestStream<H> {
             }
         });
 
-        ingest_tasks_reset(&config.name);
-
         let executor = Executor::builder(Nonblock(Tokio))
             .max_concurrency(Some(config.max_concurrency))
             .build_async(IngestStreamHandler::new(
@@ -602,7 +603,7 @@
 
                     break;
                 },
-                result = self.read(&mut connection) => {
+                result = self.read(&mut connection), if ack_tx.capacity() >= config.batch_size => {
                     match result {
                         Ok(reply) => {
                             for StreamKey { key: _, ids } in reply.keys {
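The functional change is the guard added to the read branch: tokio::select! skips any branch whose `, if ...` precondition is false, so the loop stops issuing new reads whenever the ack channel cannot absorb another full batch and resumes once acks drain, which is exactly the backpressure the commit message describes. A runnable sketch of that idiom, with the Redis read replaced by a counter (all names are illustrative):

    use tokio::sync::mpsc;

    #[tokio::main]
    async fn main() {
        let (ack_tx, mut ack_rx) = mpsc::channel::<u64>(16); // xack_buffer_size
        let batch_size = 4; // config.batch_size
        let mut next_id = 0u64;

        loop {
            tokio::select! {
                // "Read" branch: disabled while the ack buffer cannot hold a
                // full batch, mirroring `, if ack_tx.capacity() >= config.batch_size`.
                _ = tokio::task::yield_now(), if ack_tx.capacity() >= batch_size => {
                    for _ in 0..batch_size {
                        ack_tx.send(next_id).await.expect("receiver alive");
                        next_id += 1;
                    }
                }
                // Ack branch: draining frees capacity and re-enables reads.
                Some(id) = ack_rx.recv() => {
                    if id >= 32 { break; } // end the demo after a few batches
                }
            }
        }
    }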