Skip to content

Commit

Permalink
Merge pull request #16 from worldcoin/dzejkop/seed-db
Browse files Browse the repository at this point in the history
Seed both DBs
  • Loading branch information
Dzejkop authored Feb 1, 2024
2 parents 0dbd1cf + f64d31f commit 29d8f6f
Show file tree
Hide file tree
Showing 12 changed files with 420 additions and 14 deletions.
54 changes: 54 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ dotenv = "0.15.0"
eyre = "0.6.11"
futures = "0.3.30"
hex = { version = "0.4.3", features = ["serde"] }
indicatif = "0.17.7"
indoc = "2.0.4"
itertools = "0.12.0"
metrics = "0.21.1"
Expand Down
100 changes: 99 additions & 1 deletion bin/utils.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
use clap::{Args, Parser};
use indicatif::ProgressBar;
use mpc::config::DbConfig;
use mpc::db::coordinator::CoordinatorDb;
use mpc::db::participant::ParticipantDb;
use mpc::template::Template;
use rand::{thread_rng, Rng};

#[derive(Debug, Clone, Parser)]
enum Opt {
HttpQuery(RandomQuery),
SQSQuery(RandomQuery),
SeedDb(SeedDb),
}

#[derive(Debug, Clone, Args)]
Expand All @@ -14,6 +19,21 @@ struct RandomQuery {
pub url: String,
}

#[derive(Debug, Clone, Args)]
struct SeedDb {
#[clap(short, long)]
pub coordinator_db_url: String,

#[clap(short, long)]
pub participant_db_url: Vec<String>,

#[clap(short, long, default_value = "3000000")]
pub num: usize,

#[clap(short, long, default_value = "10000")]
pub batch_size: usize,
}

#[tokio::main]
async fn main() -> eyre::Result<()> {
dotenv::dotenv().ok();
Expand All @@ -33,7 +53,9 @@ async fn main() -> eyre::Result<()> {
.await?
.error_for_status()?;
}

Opt::SeedDb(args) => {
seed_db(&args).await?;
}
Opt::SQSQuery(args) => {
let aws_config = aws_config::load_defaults(
aws_config::BehaviorVersion::latest(),
Expand All @@ -57,3 +79,79 @@ async fn main() -> eyre::Result<()> {

Ok(())
}

async fn seed_db(args: &SeedDb) -> eyre::Result<()> {
let mut templates: Vec<Template> = Vec::with_capacity(args.num);

let pb =
ProgressBar::new(args.num as u64).with_message("Generating templates");

for _ in 0..args.num {
templates.push(thread_rng().gen());

pb.inc(1);
}

pb.finish_with_message("done");

let coordinator_db = CoordinatorDb::new(&DbConfig {
url: args.coordinator_db_url.clone(),
migrate: true,
create: true,
})
.await?;

let mut participant_dbs = vec![];

for db_config in args.participant_db_url.iter() {
participant_dbs.push(
ParticipantDb::new(&DbConfig {
url: db_config.clone(),
migrate: true,
create: true,
})
.await?,
);
}

let pb = ProgressBar::new(args.num as u64).with_message("Seeding DBs");

for (idx, chunk) in templates.chunks(args.batch_size).enumerate() {
let mut chunk_masks = Vec::with_capacity(chunk.len());
let mut chunk_shares: Vec<_> = (0..participant_dbs.len())
.map(|_| Vec::with_capacity(chunk.len()))
.collect();

for (offset, template) in chunk.iter().enumerate() {
let shares =
mpc::distance::encode(template).share(participant_dbs.len());

let id = offset + (idx * args.batch_size);

chunk_masks.push((id as u64, template.mask));
for (idx, share) in shares.iter().enumerate() {
chunk_shares[idx].push((id as u64, *share));
}
}

let mut tasks = vec![];

for (idx, db) in participant_dbs.iter().enumerate() {
tasks.push(db.insert_shares(&chunk_shares[idx]));
}

let (coordinator, participants) = tokio::join!(
coordinator_db.insert_masks(&chunk_masks),
futures::future::join_all(tasks),
);

coordinator?;
participants.into_iter().collect::<Result<_, _>>()?;

pb.inc(args.batch_size as u64);
}

pb.finish_with_message("done");

Ok(())
}
14 changes: 14 additions & 0 deletions compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
version: '3'
services:
coordinator_db:
image: postgres
ports:
- 5432:5432
environment:
- POSTGRES_HOST_AUTH_METHOD=trust
participant_db:
image: postgres
ports:
- 5433:5432
environment:
- POSTGRES_HOST_AUTH_METHOD=trust
9 changes: 9 additions & 0 deletions src/bits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,15 @@ mod tests {
assert_eq!(BYTES_PER_COL * 8, COLS);
}

// #[test]
// fn random_bits() {
// let mut rng = thread_rng();
// let bits: Bits = rng.gen();

// println!("bits num bytes = {}", BITS / 8);
// println!("{:?}", bits);
// }

#[test]
fn test_index() {
let mut rng = thread_rng();
Expand Down
4 changes: 4 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ pub struct DbConfig {

#[serde(default)]
pub migrate: bool,

#[serde(default)]
pub create: bool,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down Expand Up @@ -105,6 +108,7 @@ mod tests {
db: DbConfig {
url: "postgres://localhost:5432/mpc".to_string(),
migrate: true,
create: true,
},
}),
participant: None,
Expand Down
2 changes: 1 addition & 1 deletion src/db.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
pub mod coordinator;
pub mod participant;
pub mod impls;
pub mod participant;
15 changes: 13 additions & 2 deletions src/db/coordinator.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use sqlx::migrate::Migrator;
use sqlx::migrate::{MigrateDatabase, Migrator};
use sqlx::{Postgres, QueryBuilder};

use crate::bits::Bits;
Expand All @@ -12,6 +12,12 @@ pub struct CoordinatorDb {

impl CoordinatorDb {
pub async fn new(config: &DbConfig) -> eyre::Result<Self> {
if config.create
&& !sqlx::Postgres::database_exists(&config.url).await?
{
sqlx::Postgres::create_database(&config.url).await?;
}

let pool = sqlx::Pool::connect(&config.url).await?;

if config.migrate {
Expand Down Expand Up @@ -76,7 +82,12 @@ mod tests {
let url =
format!("postgres://postgres:postgres@{}", pg_db.socket_addr());

let db = CoordinatorDb::new(&DbConfig { url, migrate: true }).await?;
let db = CoordinatorDb::new(&DbConfig {
url,
migrate: true,
create: true,
})
.await?;

Ok((db, pg_db))
}
Expand Down
Loading

0 comments on commit 29d8f6f

Please sign in to comment.