|
1 | 1 | use diesel::r2d2::Builder; |
| 2 | +use diesel::sql_types::Text; |
2 | 3 | use diesel::{connection::SimpleConnection, pg::PgConnection}; |
3 | 4 | use diesel::{ |
4 | 5 | r2d2::{self, event as e, ConnectionManager, HandleEvent, Pool, PooledConnection}, |
5 | 6 | Connection, |
6 | 7 | }; |
7 | | -use diesel::{sql_query, RunQueryDsl}; |
| 8 | +use diesel::{sql_query, OptionalExtension, RunQueryDsl}; |
8 | 9 |
|
9 | 10 | use diesel_migrations::{EmbeddedMigrations, HarnessWithOutput}; |
10 | 11 | use graph::cheap_clone::CheapClone; |
@@ -380,13 +381,18 @@ impl PoolState { |
380 | 381 |
|
381 | 382 | use PoolStateInner::*; |
382 | 383 | match &*guard { |
383 | | - Created(pool, coord) => (pool.cheap_clone(), coord.cheap_clone()), |
| 384 | + Created(pool, coord) => { |
| 385 | + warn!(self.logger, "pool_state: pool not ready; creating"); |
| 386 | + (pool.cheap_clone(), coord.cheap_clone()) |
| 387 | + } |
384 | 388 | Ready(pool) => return Ok(pool.clone()), |
385 | 389 | } |
386 | 390 | }; |
387 | 391 |
|
388 | 392 | // self is `Created` and needs to have setup run |
| 393 | + warn!(self.logger, "pool_state: setting up pool"); |
389 | 394 | let migrated = coord.setup_bg(self.cheap_clone())?; |
| 395 | + warn!(self.logger, "pool_state: pool is ready"); |
390 | 396 | if migrated { |
391 | 397 | Ok(pool) |
392 | 398 | } else { |
@@ -1329,6 +1335,7 @@ impl PoolInner { |
1329 | 1335 |
|
1330 | 1336 | pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations"); |
1331 | 1337 |
|
| 1338 | +#[derive(Debug)] |
1332 | 1339 | struct MigrationCount { |
1333 | 1340 | old: usize, |
1334 | 1341 | new: usize, |
@@ -1377,11 +1384,13 @@ fn migrate_schema(logger: &Logger, conn: &mut PgConnection) -> Result<MigrationC |
1377 | 1384 | } |
1378 | 1385 |
|
1379 | 1386 | let migrations = catalog::migration_count(conn)?; |
1380 | | - if migrations != old_count { |
1381 | | - // Reset the query statistics since a schema change makes them not |
1382 | | - // all that useful. An error here is not serious and can be ignored. |
1383 | | - conn.batch_execute("select pg_stat_statements_reset()").ok(); |
1384 | | - } |
| 1387 | + /* |
| 1388 | + if migrations != old_count { |
| 1389 | + // Reset the query statistics since a schema change makes them not |
| 1390 | + // all that useful. An error here is not serious and can be ignored. |
| 1391 | + conn.batch_execute("select pg_stat_statements_reset()").ok(); |
| 1392 | + } |
| 1393 | + */ |
1385 | 1394 |
|
1386 | 1395 | Ok(MigrationCount { |
1387 | 1396 | new: migrations, |
@@ -1452,6 +1461,31 @@ impl PoolCoordinator { |
1452 | 1461 | /// code that does _not_ hold the migration lock as it will otherwise |
1453 | 1462 | /// deadlock |
1454 | 1463 | fn propagate(&self, pool: &PoolInner, count: MigrationCount) -> Result<(), StoreError> { |
| 1464 | + #[derive(Debug, QueryableByName)] |
| 1465 | + struct Name { |
| 1466 | + #[diesel(sql_type = Text)] |
| 1467 | + pub name: String, |
| 1468 | + } |
| 1469 | + |
| 1470 | + let mut conn = pool.get()?; |
| 1471 | + let db = sql_query("select current_database() as name") |
| 1472 | + .get_result::<Name>(&mut conn) |
| 1473 | + .optional()?; |
| 1474 | + warn!(self.logger, "CURRENT DATABASE {:?}", db); |
| 1475 | + |
| 1476 | + match sql_query( |
| 1477 | + "select table_name as name from information_schema.tables where table_schema = 'public' order by 1", |
| 1478 | + ) |
| 1479 | + .load::<Name>(&mut conn).optional()? { |
| 1480 | + Some(tables) => { |
| 1481 | + let tables = tables.into_iter().map(|t| t.name).collect::<Vec<_>>(); |
| 1482 | + warn!(self.logger, "PUBLIC SCHEMA: tables {:?}", tables); |
| 1483 | + } |
| 1484 | + None => { |
| 1485 | + warn!(self.logger, "PUBLIC SCHEMA: no tables"); |
| 1486 | + } |
| 1487 | + } |
| 1488 | + |
1455 | 1489 | // We need to remap all these servers into `pool` if the list of |
1456 | 1490 | // tables that are mapped have changed from the code of the previous |
1457 | 1491 | // version. Since dropping and recreating the foreign table |
@@ -1547,6 +1581,7 @@ impl PoolCoordinator { |
1547 | 1581 | /// the setup was actually run, i.e. if `pool` was available |
1548 | 1582 | fn setup_bg(self: Arc<Self>, pool: PoolState) -> Result<bool, StoreError> { |
1549 | 1583 | let migrated = graph::spawn_thread("database-setup", move || { |
| 1584 | + std::thread::sleep(Duration::from_secs(1)); |
1550 | 1585 | graph::block_on(self.setup(vec![pool.clone()])) |
1551 | 1586 | }) |
1552 | 1587 | .join() |
@@ -1609,6 +1644,7 @@ impl PoolCoordinator { |
1609 | 1644 | .cheap_clone() |
1610 | 1645 | .migrate(servers) |
1611 | 1646 | .map(|res| (state.cheap_clone(), res)) |
| 1647 | + .inspect(|(state, res)| warn!(state.logger, "MIGRATE RESULT {:?}", res)) |
1612 | 1648 | }) |
1613 | 1649 | .collect::<Vec<_>>(); |
1614 | 1650 | join_all(futures) |
@@ -1658,6 +1694,10 @@ impl PoolCoordinator { |
1658 | 1694 |
|
1659 | 1695 | let migrated = migrate(&pools, self.servers.as_ref()).await?; |
1660 | 1696 |
|
| 1697 | + warn!(self.logger, "Sleeping for 20s"; "migrated" => migrated.len()); |
| 1698 | + tokio::time::sleep(Duration::from_secs(20)).await; |
| 1699 | + warn!(self.logger, "Waking up"); |
| 1700 | + |
1661 | 1701 | let propagated = propagate(&self, migrated).await?; |
1662 | 1702 |
|
1663 | 1703 | primary.create_cross_shard_views(&self.servers)?; |
|
0 commit comments