|
1 | 1 | use diesel::r2d2::Builder; |
| 2 | +use diesel::sql_types::Text; |
2 | 3 | use diesel::{connection::SimpleConnection, pg::PgConnection}; |
3 | 4 | use diesel::{ |
4 | 5 | r2d2::{self, event as e, ConnectionManager, HandleEvent, Pool, PooledConnection}, |
5 | 6 | Connection, |
6 | 7 | }; |
7 | | -use diesel::{sql_query, RunQueryDsl}; |
| 8 | +use diesel::{sql_query, OptionalExtension, RunQueryDsl}; |
8 | 9 |
|
9 | 10 | use diesel_migrations::{EmbeddedMigrations, HarnessWithOutput}; |
10 | 11 | use graph::cheap_clone::CheapClone; |
@@ -380,13 +381,18 @@ impl PoolState { |
380 | 381 |
|
381 | 382 | use PoolStateInner::*; |
382 | 383 | match &*guard { |
383 | | - Created(pool, coord) => (pool.cheap_clone(), coord.cheap_clone()), |
| 384 | + Created(pool, coord) => { |
| 385 | + warn!(self.logger, "pool_state: pool not ready; creating"); |
| 386 | + (pool.cheap_clone(), coord.cheap_clone()) |
| 387 | + } |
384 | 388 | Ready(pool) => return Ok(pool.clone()), |
385 | 389 | } |
386 | 390 | }; |
387 | 391 |
|
388 | 392 | // self is `Created` and needs to have setup run |
| 393 | + warn!(self.logger, "pool_state: setting up pool"); |
389 | 394 | let migrated = coord.setup_bg(self.cheap_clone())?; |
| 395 | + warn!(self.logger, "pool_state: pool is ready"); |
390 | 396 | if migrated { |
391 | 397 | Ok(pool) |
392 | 398 | } else { |
@@ -1329,6 +1335,7 @@ impl PoolInner { |
1329 | 1335 |
|
1330 | 1336 | pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations"); |
1331 | 1337 |
|
| 1338 | +#[derive(Debug)] |
1332 | 1339 | struct MigrationCount { |
1333 | 1340 | old: usize, |
1334 | 1341 | new: usize, |
@@ -1452,6 +1459,31 @@ impl PoolCoordinator { |
1452 | 1459 | /// code that does _not_ hold the migration lock as it will otherwise |
1453 | 1460 | /// deadlock |
1454 | 1461 | fn propagate(&self, pool: &PoolInner, count: MigrationCount) -> Result<(), StoreError> { |
| 1462 | + #[derive(Debug, QueryableByName)] |
| 1463 | + struct Name { |
| 1464 | + #[diesel(sql_type = Text)] |
| 1465 | + pub name: String, |
| 1466 | + } |
| 1467 | + |
| 1468 | + let mut conn = pool.get()?; |
| 1469 | + let db = sql_query("select current_database() as name") |
| 1470 | + .get_result::<Name>(&mut conn) |
| 1471 | + .optional()?; |
| 1472 | + warn!(self.logger, "CURRENT DATABASE {:?}", db); |
| 1473 | + |
| 1474 | + match sql_query( |
| 1475 | + "select table_name as name from information_schema.tables where table_schema = 'public' order by 1", |
| 1476 | + ) |
| 1477 | + .load::<Name>(&mut conn).optional()? { |
| 1478 | + Some(tables) => { |
| 1479 | + let tables = tables.into_iter().map(|t| t.name).collect::<Vec<_>>(); |
| 1480 | + warn!(self.logger, "PUBLIC SCHEMA: tables {:?}", tables); |
| 1481 | + } |
| 1482 | + None => { |
| 1483 | + warn!(self.logger, "PUBLIC SCHEMA: no tables"); |
| 1484 | + } |
| 1485 | + } |
| 1486 | + |
1455 | 1487 | // We need to remap all these servers into `pool` if the list of |
1456 | 1488 | // tables that are mapped have changed from the code of the previous |
1457 | 1489 | // version. Since dropping and recreating the foreign table |
@@ -1547,6 +1579,7 @@ impl PoolCoordinator { |
1547 | 1579 | /// the setup was actually run, i.e. if `pool` was available |
1548 | 1580 | fn setup_bg(self: Arc<Self>, pool: PoolState) -> Result<bool, StoreError> { |
1549 | 1581 | let migrated = graph::spawn_thread("database-setup", move || { |
| 1582 | + std::thread::sleep(Duration::from_secs(1)); |
1550 | 1583 | graph::block_on(self.setup(vec![pool.clone()])) |
1551 | 1584 | }) |
1552 | 1585 | .join() |
@@ -1609,6 +1642,7 @@ impl PoolCoordinator { |
1609 | 1642 | .cheap_clone() |
1610 | 1643 | .migrate(servers) |
1611 | 1644 | .map(|res| (state.cheap_clone(), res)) |
| 1645 | + .inspect(|(state, res)| warn!(state.logger, "MIGRATE RESULT {:?}", res)) |
1612 | 1646 | }) |
1613 | 1647 | .collect::<Vec<_>>(); |
1614 | 1648 | join_all(futures) |
@@ -1658,6 +1692,10 @@ impl PoolCoordinator { |
1658 | 1692 |
|
1659 | 1693 | let migrated = migrate(&pools, self.servers.as_ref()).await?; |
1660 | 1694 |
|
| 1695 | + warn!(self.logger, "Sleeping for 5s"; "migrated" => migrated.len()); |
| 1696 | + tokio::time::sleep(Duration::from_secs(5)).await; |
| 1697 | + warn!(self.logger, "Waking up"); |
| 1698 | + |
1661 | 1699 | let propagated = propagate(&self, migrated).await?; |
1662 | 1700 |
|
1663 | 1701 | primary.create_cross_shard_views(&self.servers)?; |
|
0 commit comments