diff --git a/graph/src/env/mod.rs b/graph/src/env/mod.rs
index 08a02826814..54c585d5ab3 100644
--- a/graph/src/env/mod.rs
+++ b/graph/src/env/mod.rs
@@ -351,7 +351,8 @@ impl EnvVars {
             subgraph_error_retry_ceil: Duration::from_secs(inner.subgraph_error_retry_ceil_in_secs),
             subgraph_error_retry_jitter: inner.subgraph_error_retry_jitter,
             enable_select_by_specific_attributes: inner.enable_select_by_specific_attributes.0,
-            postpone_attribute_index_creation: false,
+            postpone_attribute_index_creation: inner.postpone_attribute_index_creation.0
+                || cfg!(debug_assertions),
             postpone_indexes_creation_threshold: inner.postpone_indexes_creation_threshold,
             log_trigger_data: inner.log_trigger_data.0,
             explorer_ttl: Duration::from_secs(inner.explorer_ttl_in_secs),
@@ -556,7 +557,6 @@ struct Inner {
     #[envconfig(from = "GRAPH_ENABLE_SELECT_BY_SPECIFIC_ATTRIBUTES", default = "true")]
     enable_select_by_specific_attributes: EnvVarBoolean,
     #[envconfig(from = "GRAPH_POSTPONE_ATTRIBUTE_INDEX_CREATION", default = "false")]
-    #[allow(unused)]
     postpone_attribute_index_creation: EnvVarBoolean,
     #[envconfig(from = "GRAPH_POSTPONE_INDEXES_CREATION_THRESHOLD", default = "10000")]
     postpone_indexes_creation_threshold: i32,
diff --git a/store/postgres/src/catalog.rs b/store/postgres/src/catalog.rs
index fcc10bf2c93..0fbee8ff416 100644
--- a/store/postgres/src/catalog.rs
+++ b/store/postgres/src/catalog.rs
@@ -882,6 +882,38 @@ pub(crate) async fn check_index_is_valid(
     Ok(matches!(result, Some(true)))
 }
 
+/// Check whether there is an index creation currently running for any index
+/// in the given schema. If there is, return the PID of the backend that is
+/// creating the index and the name of the index being created.
+pub(crate) async fn index_creation_is_running(
+    conn: &mut AsyncPgConnection,
+    schema_name: &str,
+) -> Result<Option<(i32, String)>, StoreError> {
+    #[derive(Queryable, QueryableByName)]
+    struct IndexCreationCheck {
+        #[diesel(sql_type = Integer)]
+        pid: i32,
+        #[diesel(sql_type = Text)]
+        index_name: String,
+    }
+
+    // Cast `regclass` to `text`: on the wire a bare `regclass` is an OID,
+    // which cannot be decoded into the `Text` column declared above.
+    let query = "
+        select ci.pid, ci.index_relid::regclass::text as index_name \
+          from pg_stat_progress_create_index ci \
+          join pg_class c on ci.relid = c.oid \
+          join pg_namespace n on c.relnamespace = n.oid \
+         where n.nspname = $1";
+    sql_query(query)
+        .bind::<Text, _>(schema_name)
+        .get_result::<IndexCreationCheck>(conn)
+        .await
+        .optional()
+        .map(|check| check.map(|check| (check.pid, check.index_name)))
+        .map_err::<StoreError, _>(Into::into)
+}
+
 pub(crate) async fn indexes_for_table(
     conn: &mut AsyncPgConnection,
     schema_name: &str,
diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs
index a69c4dcbac5..d764d451ad6 100644
--- a/store/postgres/src/deployment_store.rs
+++ b/store/postgres/src/deployment_store.rs
@@ -1,6 +1,7 @@
 use detail::DeploymentDetail;
 use diesel::sql_query;
 use diesel_async::{AsyncConnection as _, RunQueryDsl, SimpleAsyncConnection};
+use graph::util::backoff::ExponentialBackoff;
 use tokio::task::JoinHandle;
 
 use graph::anyhow::Context;
@@ -42,7 +43,7 @@ use graph::internal_error;
 use graph::prelude::{
     AttributeNames, BlockNumber, BlockPtr, CheapClone, DeploymentHash, DeploymentState, ENV_VARS,
     Entity, EntityQuery, Error, Logger, QueryExecutionError, StopwatchMetrics, StoreError,
-    UnfailOutcome, Value, anyhow, debug, info, o, warn,
+    UnfailOutcome, Value, anyhow, debug, error, info, o, warn,
 };
 use graph::schema::{ApiSchema, EntityKey, EntityType, InputSchema};
 
@@ -1686,28 +1687,152 @@ impl DeploymentStore {
     /// this is a one-shot operation: indexes are created exactly once,
     /// and we never recreate them — even if an external process removes
     /// indexes that it considers unused.
-    pub(crate) async fn create_postponed_indexes(&self, site: Arc<Site>) -> Result<(), StoreError> {
-        let layout = self.find_layout(site.cheap_clone()).await?;
-        let creat = layout.index_creator(true, true);
-        let mut conn = self.pool.get_permitted().await?;
+    ///
+    /// The actual index creation runs in the background so that callers are
+    /// not blocked while the (potentially long-running) `CREATE INDEX
+    /// CONCURRENTLY` statements execute. Because of that, errors are only
+    /// logged and not returned to the caller. The `postponed_indexes_created`
+    /// flag is only set once all indexes have been created successfully, so a
+    /// failed run is retried the next time `graph-node` starts the
+    /// deployment. A run that was interrupted (for example by a restart while
+    /// a `CREATE INDEX CONCURRENTLY` was still in flight) can leave an invalid
+    /// index behind; such remnants are dropped and rebuilt on the next run.
+    pub(crate) fn create_postponed_indexes(&self, logger: &Logger, site: Arc<Site>) {
+        async fn index_creation_is_running(
+            store: &DeploymentStore,
+            site: &Site,
+        ) -> Result<Option<(i32, String)>, StoreError> {
+            let mut conn = store.pool.get_permitted().await?;
+            catalog::index_creation_is_running(&mut conn, site.namespace.as_str()).await
+        }
 
-        if deployment::postponed_indexes_created(&mut conn, &site).await? {
-            return Ok(());
+        async fn postponed_indexes_created(
+            store: &DeploymentStore,
+            site: &Site,
+        ) -> Result<bool, StoreError> {
+            let mut conn = store.pool.get_permitted().await?;
+            deployment::postponed_indexes_created(&mut conn, site).await
         }
 
-        for table in layout.tables.values() {
-            let indexes = table.indexes(&layout.input_schema).map_err(|e| {
-                StoreError::ConstraintViolation(format!("failed to generate indexes: {}", e))
-            })?;
-            for idx in indexes {
-                if idx.to_postpone() {
-                    IndexCreator::execute(&creat, &mut conn, &idx).await?;
+        async fn wait_for_concurrent_index_creation(
+            logger: &Logger,
+            store: &DeploymentStore,
+            site: &Site,
+        ) -> Result<(), StoreError> {
+            let mut backoff =
+                ExponentialBackoff::new(Duration::from_secs(1), Duration::from_mins(5));
+            let mut last_log: Option<Instant> = None; // not logged yet; no `Instant` underflow
+            while let Some((pid, index_name)) = index_creation_is_running(store, site).await? {
+                if last_log.is_none_or(|t| t.elapsed() > Duration::from_mins(1)) {
+                    info!(logger,
+                        "Waiting for concurrent index creation to finish";
+                        "pid" => pid,
+                        "index_name" => index_name,
+                    );
+                    last_log = Some(Instant::now());
                 }
+                backoff.sleep_async().await;
             }
+            Ok(())
         }
 
-        deployment::set_postponed_indexes_created(&mut conn, &site).await?;
-        Ok(())
+        async fn create_index(
+            logger: &Logger,
+            store: &DeploymentStore,
+            layout: &Layout,
+            site: &Site,
+            idx: &CreateIndex,
+        ) -> Result<(), StoreError> {
+            let mut last_log: Option<Instant> = None; // not logged yet; no `Instant` underflow
+            // We (ab)use the fdw pool here since `create index` can take a
+            // very long time; with many subgraphs creating indexes, we
+            // might starve the main connection pool which would block the
+            // progress of many subgraphs.
+            let mut conn = store.pool.get_fdw(logger, move || {
+                if last_log.is_none_or(|t| t.elapsed() > Duration::from_mins(1)) {
+                    debug!(logger, "Waiting for a connection to become available so we can create index"; "index_name" => idx.name().unwrap_or(""));
+                    last_log = Some(Instant::now());
+                }
+                false
+            }).await?;
+            let schema_name = site.namespace.as_str();
+
+            // A previous run that was interrupted (e.g. by a node
+            // restart) can leave an invalid index behind. Since we
+            // create indexes with `if not exists`, such a leftover
+            // would be skipped and never rebuilt, so drop it first.
+            // `check_index_is_valid` returns `false` both when the
+            // index is missing and when it is invalid; the
+            // `drop index ... if exists` is a no-op in the former
+            // case and removes the invalid remnant in the latter.
+            if let Some(name) = idx.name()
+                && !catalog::check_index_is_valid(&mut conn, schema_name, name).await?
+            {
+                let drop_sql = format!("drop index concurrently if exists {schema_name}.{name}");
+                sql_query(drop_sql).execute(&mut conn).await?;
+            }
+
+            let creat = layout.index_creator(true, true);
+            IndexCreator::execute(&creat, &mut conn, idx).await
+        }
+
+        async fn run(
+            logger: Logger,
+            store: DeploymentStore,
+            site: Arc<Site>,
+        ) -> Result<(), StoreError> {
+            let layout = store.find_layout(site.cheap_clone()).await?;
+
+            // Since this entire run can take many hours, we avoid holding a
+            // connection for the whole time. Instead, we get a new
+            // connection for each index that we create.
+
+            if postponed_indexes_created(&store, &site).await? {
+                return Ok(());
+            }
+
+            for table in layout.tables.values() {
+                let indexes = table.indexes(&layout.input_schema).map_err(|e| {
+                    StoreError::ConstraintViolation(format!("failed to generate indexes: {}", e))
+                })?;
+                for idx in indexes {
+                    if !idx.to_postpone() {
+                        continue;
+                    }
+
+                    wait_for_concurrent_index_creation(&logger, &store, &site).await?;
+
+                    create_index(&logger, &store, &layout, &site, &idx).await?;
+
+                    debug!(logger, "Created index";
+                        "index_name" => idx.name().unwrap_or(""),
+                        "table_name" => table.name.as_str(),
+                    );
+                }
+            }
+
+            let mut conn = store.pool.get_permitted().await?;
+            deployment::set_postponed_indexes_created(&mut conn, &site).await?;
+            Ok(())
+        }
+
+        let store = self.cheap_clone();
+        let logger = Logger::new(logger, o!("component" => "IndexCreation"));
+        graph::spawn(async move {
+            let logger2 = logger.cheap_clone();
+            let res = retry::forever(&logger2, "create_postponed_indexes", || {
+                let store = store.cheap_clone();
+                let site = site.cheap_clone();
+                let logger = logger2.cheap_clone();
+                async move { run(logger, store, site).await }
+            })
+            .await;
+            match res {
+                Ok(()) => debug!(logger, "Created postponed indexes"),
+                Err(e) => error!(logger, "Failed to create postponed indexes";
+                                         "error" => e.to_string()),
+            }
+        });
     }
 
     // If the current block of the deployment is the same as the fatal error,
diff --git a/store/postgres/src/relational/index.rs b/store/postgres/src/relational/index.rs
index d28c78d7cf0..dc66a04f9a7 100644
--- a/store/postgres/src/relational/index.rs
+++ b/store/postgres/src/relational/index.rs
@@ -666,10 +666,10 @@ impl CreateIndex {
         }
     }
 
-    pub fn name(&self) -> Option<String> {
+    pub fn name(&self) -> Option<&str> {
         match self {
             CreateIndex::Unknown { .. } => None,
-            CreateIndex::Parsed { name, .. } => Some(name.clone()),
+            CreateIndex::Parsed { name, .. } => Some(name.as_str()),
         }
     }
 
diff --git a/store/postgres/src/writable.rs b/store/postgres/src/writable.rs
index a6a782308f3..633688aeb1a 100644
--- a/store/postgres/src/writable.rs
+++ b/store/postgres/src/writable.rs
@@ -495,8 +495,8 @@ impl SyncStore {
 
     async fn create_postponed_indexes(&self) -> Result<(), StoreError> {
         self.writable
-            .create_postponed_indexes(self.site.cheap_clone())
-            .await
+            .create_postponed_indexes(&self.logger, self.site.cheap_clone());
+        Ok(())
     }
 
     fn input_schema(&self) -> InputSchema {