From ec4b36dc1b40032e73f2b0713f326299ae182c64 Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Mon, 12 May 2025 15:52:53 +0530 Subject: [PATCH 01/19] graph: Add clone_for_deployment to FileLinkResolver to create FileLinkResolver with the right base dir for a subgraph --- graph/src/components/link_resolver/file.rs | 30 +++++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/graph/src/components/link_resolver/file.rs b/graph/src/components/link_resolver/file.rs index 827a3f267e5..0e09f4e0475 100644 --- a/graph/src/components/link_resolver/file.rs +++ b/graph/src/components/link_resolver/file.rs @@ -6,7 +6,7 @@ use async_trait::async_trait; use slog::Logger; use crate::data::subgraph::Link; -use crate::prelude::{Error, JsonValueStream, LinkResolver as LinkResolverTrait}; +use crate::prelude::{DeploymentHash, Error, JsonValueStream, LinkResolver as LinkResolverTrait}; #[derive(Clone, Debug)] pub struct FileLinkResolver { @@ -45,6 +45,34 @@ impl FileLinkResolver { .as_ref() .map_or_else(|| path.to_owned(), |base_dir| base_dir.join(link)) } + + /// This method creates a new resolver that is scoped to a specific subgraph + /// It will set the base directory to the parent directory of the manifest path + fn clone_for_deployment(&self, deployment: DeploymentHash) -> Result { + let mut resolver = self.clone(); + let deployment_str = deployment.to_string(); + + // Create a path to the manifest based on the current resolver's + // base directory or default to using the deployment string as path + let manifest_path = match &resolver.base_dir { + Some(dir) => dir.join(&deployment_str), + None => PathBuf::from(deployment_str), + }; + + let canonical_manifest_path = manifest_path + .canonicalize() + .map_err(|e| Error::from(anyhow!("Failed to canonicalize manifest path: {}", e)))?; + + // The manifest path is the path of the subgraph manifest file in the build directory + // We use the parent directory as the base directory for the new resolver + let 
base_dir = canonical_manifest_path + .parent() + .ok_or_else(|| Error::from(anyhow!("Manifest path has no parent directory")))? + .to_path_buf(); + + resolver.base_dir = Some(base_dir); + Ok(resolver) + } } pub fn remove_prefix(link: &str) -> &str { From 4af0e6dc801eac020657945ddee592df4816eecf Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Sun, 11 May 2025 20:53:07 +0530 Subject: [PATCH 02/19] graph: Add for_deployment to LinkResolverTrait --- chain/substreams/src/data_source.rs | 9 ++++++++- graph/src/components/link_resolver/file.rs | 9 +++++++++ graph/src/components/link_resolver/ipfs.rs | 7 +++++++ graph/src/components/link_resolver/mod.rs | 9 ++++++++- store/test-store/tests/chain/ethereum/manifest.rs | 7 +++++++ 5 files changed, 39 insertions(+), 2 deletions(-) diff --git a/chain/substreams/src/data_source.rs b/chain/substreams/src/data_source.rs index dff2cfa31c4..65f0cffa584 100644 --- a/chain/substreams/src/data_source.rs +++ b/chain/substreams/src/data_source.rs @@ -331,7 +331,7 @@ mod test { blockchain::{DataSource as _, UnresolvedDataSource as _}, components::link_resolver::LinkResolver, data::subgraph::LATEST_VERSION, - prelude::{async_trait, serde_yaml, JsonValueStream, Link}, + prelude::{async_trait, serde_yaml, DeploymentHash, JsonValueStream, Link}, slog::{o, Discard, Logger}, substreams::{ module::{ @@ -705,6 +705,13 @@ mod test { unimplemented!() } + fn for_deployment( + &self, + _deployment: DeploymentHash, + ) -> Result, Error> { + unimplemented!() + } + async fn cat(&self, _logger: &Logger, _link: &Link) -> Result, Error> { Ok(gen_package().encode_to_vec()) } diff --git a/graph/src/components/link_resolver/file.rs b/graph/src/components/link_resolver/file.rs index 0e09f4e0475..18fb7e2c427 100644 --- a/graph/src/components/link_resolver/file.rs +++ b/graph/src/components/link_resolver/file.rs @@ -48,6 +48,8 @@ impl FileLinkResolver { /// This method creates a new resolver that is scoped to a specific subgraph /// It will set the base 
directory to the parent directory of the manifest path + /// This is required because paths mentioned in the subgraph manifest are relative paths + /// and we need a new resolver with the right base directory for the specific subgraph fn clone_for_deployment(&self, deployment: DeploymentHash) -> Result { let mut resolver = self.clone(); let deployment_str = deployment.to_string(); @@ -114,6 +116,13 @@ impl LinkResolverTrait for FileLinkResolver { } } + fn for_deployment( + &self, + deployment: DeploymentHash, + ) -> Result, Error> { + Ok(Box::new(self.clone_for_deployment(deployment)?)) + } + async fn get_block(&self, _logger: &Logger, _link: &Link) -> Result, Error> { Err(anyhow!("get_block is not implemented for FileLinkResolver").into()) } diff --git a/graph/src/components/link_resolver/ipfs.rs b/graph/src/components/link_resolver/ipfs.rs index 9ecf4ff02e3..1897d781c3c 100644 --- a/graph/src/components/link_resolver/ipfs.rs +++ b/graph/src/components/link_resolver/ipfs.rs @@ -74,6 +74,13 @@ impl LinkResolverTrait for IpfsResolver { Box::new(s) } + fn for_deployment( + &self, + _deployment: DeploymentHash, + ) -> Result, Error> { + Ok(Box::new(self.cheap_clone())) + } + async fn cat(&self, logger: &Logger, link: &Link) -> Result, Error> { let path = ContentPath::new(&link.link)?; let timeout = self.timeout; diff --git a/graph/src/components/link_resolver/mod.rs b/graph/src/components/link_resolver/mod.rs index 851b4296b47..05728bbcc29 100644 --- a/graph/src/components/link_resolver/mod.rs +++ b/graph/src/components/link_resolver/mod.rs @@ -3,7 +3,7 @@ use std::time::Duration; use slog::Logger; use crate::data::subgraph::Link; -use crate::prelude::Error; +use crate::prelude::{DeploymentHash, Error}; use std::fmt::Debug; mod arweave; @@ -30,6 +30,13 @@ pub trait LinkResolver: Send + Sync + 'static + Debug { /// Fetches the IPLD block contents as bytes. 
async fn get_block(&self, logger: &Logger, link: &Link) -> Result, Error>; + /// Creates a new resolver that is scoped to a specific subgraph + /// This is used by FileLinkResolver to create a new resolver for a specific subgraph + /// For other resolvers, this method will simply return the current resolver + /// This is required because paths mentioned in the subgraph manifest are relative paths + /// and we need a new resolver with the right base directory for the specific subgraph + fn for_deployment(&self, deployment: DeploymentHash) -> Result, Error>; + /// Read the contents of `link` and deserialize them into a stream of JSON /// values. The values must each be on a single line; newlines are significant /// as they are used to split the file contents and each line is deserialized diff --git a/store/test-store/tests/chain/ethereum/manifest.rs b/store/test-store/tests/chain/ethereum/manifest.rs index f025be2e626..8b888c13da4 100644 --- a/store/test-store/tests/chain/ethereum/manifest.rs +++ b/store/test-store/tests/chain/ethereum/manifest.rs @@ -91,6 +91,13 @@ impl LinkResolverTrait for TextResolver { Box::new(self.clone()) } + fn for_deployment( + &self, + _deployment: DeploymentHash, + ) -> Result, anyhow::Error> { + Ok(Box::new(self.clone())) + } + async fn cat(&self, _logger: &Logger, link: &Link) -> Result, anyhow::Error> { self.texts .get(&link.link) From b1e4a0c5e3f251176ba10c68ee7ff03a433acc99 Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Mon, 12 May 2025 15:56:20 +0530 Subject: [PATCH 03/19] core, graph: use for_deployment to get properly scoped resolver --- core/src/subgraph/instance_manager.rs | 7 ++++++- core/src/subgraph/provider.rs | 6 +++++- core/src/subgraph/registrar.rs | 20 +++++++++++++------- graph/src/data_source/subgraph.rs | 6 +++++- 4 files changed, 29 insertions(+), 10 deletions(-) diff --git a/core/src/subgraph/instance_manager.rs b/core/src/subgraph/instance_manager.rs index 8c2b76e5b6c..332eebb513d 100644 --- 
a/core/src/subgraph/instance_manager.rs +++ b/core/src/subgraph/instance_manager.rs @@ -287,7 +287,12 @@ impl SubgraphInstanceManager { let manifest = UnresolvedSubgraphManifest::parse(deployment.hash.cheap_clone(), manifest)?; // Allow for infinite retries for subgraph definition files. - let link_resolver = Arc::from(self.link_resolver.with_retries()); + let link_resolver = Arc::from( + self.link_resolver + .for_deployment(deployment.hash.clone()) + .map_err(SubgraphRegistrarError::Unknown)? + .with_retries(), + ); // Make sure the `raw_yaml` is present on both this subgraph and the graft base. self.subgraph_store diff --git a/core/src/subgraph/provider.rs b/core/src/subgraph/provider.rs index 00d379db01f..d566389fe27 100644 --- a/core/src/subgraph/provider.rs +++ b/core/src/subgraph/provider.rs @@ -86,8 +86,12 @@ impl SubgraphAssignmentProviderTrait for SubgraphAss )); } - let file_bytes = self + let link_resolver = self .link_resolver + .for_deployment(loc.hash.clone()) + .map_err(SubgraphAssignmentProviderError::ResolveError)?; + + let file_bytes = link_resolver .cat(&logger, &loc.hash.to_ipfs_link()) .await .map_err(SubgraphAssignmentProviderError::ResolveError)?; diff --git a/core/src/subgraph/registrar.rs b/core/src/subgraph/registrar.rs index 6f7ae17425f..501ef9dea16 100644 --- a/core/src/subgraph/registrar.rs +++ b/core/src/subgraph/registrar.rs @@ -286,9 +286,14 @@ where .logger_factory .subgraph_logger(&DeploymentLocator::new(DeploymentId(0), hash.clone())); + let resolver: Arc = Arc::from( + self.resolver + .for_deployment(hash.clone()) + .map_err(SubgraphRegistrarError::Unknown)?, + ); + let raw: serde_yaml::Mapping = { - let file_bytes = self - .resolver + let file_bytes = resolver .cat(&logger, &hash.to_ipfs_link()) .await .map_err(|e| { @@ -323,7 +328,7 @@ where node_id, debug_fork, self.version_switching_mode, - &self.resolver, + &resolver, history_blocks, ) .await? 
@@ -341,7 +346,7 @@ where node_id, debug_fork, self.version_switching_mode, - &self.resolver, + &resolver, history_blocks, ) .await? @@ -359,7 +364,7 @@ where node_id, debug_fork, self.version_switching_mode, - &self.resolver, + &resolver, history_blocks, ) .await? @@ -377,7 +382,7 @@ where node_id, debug_fork, self.version_switching_mode, - &self.resolver, + &resolver, history_blocks, ) .await? @@ -567,10 +572,11 @@ async fn create_subgraph_version( history_blocks_override: Option, ) -> Result { let raw_string = serde_yaml::to_string(&raw).unwrap(); + let unvalidated = UnvalidatedSubgraphManifest::::resolve( deployment.clone(), raw, - resolver, + &resolver, logger, ENV_VARS.max_spec_version.clone(), ) diff --git a/graph/src/data_source/subgraph.rs b/graph/src/data_source/subgraph.rs index 87b44e66174..3170754d499 100644 --- a/graph/src/data_source/subgraph.rs +++ b/graph/src/data_source/subgraph.rs @@ -259,6 +259,8 @@ impl UnresolvedDataSource { resolver: &Arc, logger: &Logger, ) -> Result>, Error> { + let resolver: Arc = + Arc::from(resolver.for_deployment(self.source.address.clone())?); let source_raw = resolver .cat(logger, &self.source.address.to_ipfs_link()) .await @@ -281,8 +283,10 @@ impl UnresolvedDataSource { self.source.address ))?; + let resolver: Arc = + Arc::from(resolver.for_deployment(self.source.address.clone())?); source_manifest - .resolve(resolver, logger, LATEST_VERSION.clone()) + .resolve(&resolver, logger, LATEST_VERSION.clone()) .await .context(format!( "Failed to resolve source subgraph [{}] manifest", From b7dcdca2620f2dac2c348c0f9ab0b10b9e0f28bf Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Mon, 12 May 2025 12:32:30 +0530 Subject: [PATCH 04/19] graph: Implement aliases for file link resolver --- graph/src/components/link_resolver/file.rs | 99 ++++++++++++++++++++-- node/src/bin/dev.rs | 7 +- 2 files changed, 98 insertions(+), 8 deletions(-) diff --git a/graph/src/components/link_resolver/file.rs 
b/graph/src/components/link_resolver/file.rs index 18fb7e2c427..4a347eb770f 100644 --- a/graph/src/components/link_resolver/file.rs +++ b/graph/src/components/link_resolver/file.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::path::{Path, PathBuf}; use std::time::Duration; @@ -12,16 +13,29 @@ use crate::prelude::{DeploymentHash, Error, JsonValueStream, LinkResolver as Lin pub struct FileLinkResolver { base_dir: Option, timeout: Duration, + // This is a hashmap that maps the alias name to the path of the file that is aliased + aliases: HashMap, +} + +impl Default for FileLinkResolver { + fn default() -> Self { + Self { + base_dir: None, + timeout: Duration::from_secs(30), + aliases: HashMap::new(), + } + } } impl FileLinkResolver { /// Create a new FileLinkResolver /// /// All paths are treated as absolute paths. - pub fn new() -> Self { + pub fn new(base_dir: Option, aliases: HashMap) -> Self { Self { - base_dir: None, + base_dir: base_dir, timeout: Duration::from_secs(30), + aliases, } } @@ -33,12 +47,18 @@ impl FileLinkResolver { Self { base_dir: Some(base_dir.as_ref().to_owned()), timeout: Duration::from_secs(30), + aliases: HashMap::new(), } } fn resolve_path(&self, link: &str) -> PathBuf { let path = Path::new(link); + // If the path is an alias, use the aliased path + if let Some(aliased) = self.aliases.get(link) { + return aliased.clone(); + } + // Return the path as is if base_dir is None, or join with base_dir if present. // if "link" is an absolute path, join will simply return that path. 
self.base_dir @@ -52,13 +72,19 @@ impl FileLinkResolver { /// and we need a new resolver with the right base directory for the specific subgraph fn clone_for_deployment(&self, deployment: DeploymentHash) -> Result { let mut resolver = self.clone(); + let deployment_str = deployment.to_string(); // Create a path to the manifest based on the current resolver's // base directory or default to using the deployment string as path - let manifest_path = match &resolver.base_dir { - Some(dir) => dir.join(&deployment_str), - None => PathBuf::from(deployment_str), + // If the deployment string is an alias, use the aliased path + let manifest_path = if let Some(aliased) = self.aliases.get(&deployment_str) { + aliased.clone() + } else { + match &resolver.base_dir { + Some(dir) => dir.join(&deployment_str), + None => PathBuf::from(deployment_str), + } }; let canonical_manifest_path = manifest_path @@ -154,7 +180,7 @@ mod tests { file.write_all(test_content).unwrap(); // Create a resolver without a base directory - let resolver = FileLinkResolver::new(); + let resolver = FileLinkResolver::default(); let logger = slog::Logger::root(slog::Discard, slog::o!()); // Test valid path resolution @@ -222,4 +248,65 @@ mod tests { let _ = fs::remove_file(test_file_path); let _ = fs::remove_dir(temp_dir); } + + #[tokio::test] + async fn test_file_resolver_with_aliases() { + // Create a temporary directory for test files + let temp_dir = env::temp_dir().join("file_resolver_test_aliases"); + let _ = fs::create_dir_all(&temp_dir); + + // Create two test files with different content + let test_file1_path = temp_dir.join("file.txt"); + let test_content1 = b"This is the file content"; + let mut file1 = fs::File::create(&test_file1_path).unwrap(); + file1.write_all(test_content1).unwrap(); + + let test_file2_path = temp_dir.join("another_file.txt"); + let test_content2 = b"This is another file content"; + let mut file2 = fs::File::create(&test_file2_path).unwrap(); + 
file2.write_all(test_content2).unwrap(); + + // Create aliases mapping + let mut aliases = HashMap::new(); + aliases.insert("alias1".to_string(), test_file1_path.clone()); + aliases.insert("alias2".to_string(), test_file2_path.clone()); + aliases.insert("deployment-id".to_string(), test_file1_path.clone()); + + // Create resolver with aliases + let resolver = FileLinkResolver::new(Some(temp_dir.clone()), aliases); + let logger = slog::Logger::root(slog::Discard, slog::o!()); + + // Test resolving by aliases + let link1 = Link { + link: "alias1".to_string(), + }; + let result1 = resolver.cat(&logger, &link1).await.unwrap(); + assert_eq!(result1, test_content1); + + let link2 = Link { + link: "alias2".to_string(), + }; + let result2 = resolver.cat(&logger, &link2).await.unwrap(); + assert_eq!(result2, test_content2); + + // Test that the alias works in for_deployment as well + let deployment = DeploymentHash::new("deployment-id").unwrap(); + let deployment_resolver = resolver.clone_for_deployment(deployment).unwrap(); + + let expected_dir = test_file1_path.parent().unwrap(); + let deployment_base_dir = deployment_resolver.base_dir.clone().unwrap(); + + let canonical_expected_dir = expected_dir.canonicalize().unwrap(); + let canonical_deployment_dir = deployment_base_dir.canonicalize().unwrap(); + + assert_eq!( + canonical_deployment_dir, canonical_expected_dir, + "Build directory paths don't match" + ); + + // Clean up + let _ = fs::remove_file(test_file1_path); + let _ = fs::remove_file(test_file2_path); + let _ = fs::remove_dir(temp_dir); + } } diff --git a/node/src/bin/dev.rs b/node/src/bin/dev.rs index 1545c4bad4c..cd1e3c5390e 100644 --- a/node/src/bin/dev.rs +++ b/node/src/bin/dev.rs @@ -1,4 +1,4 @@ -use std::{path::Path, sync::Arc}; +use std::{collections::HashMap, path::Path, sync::Arc}; use anyhow::{Context, Result}; use clap::Parser; @@ -133,7 +133,10 @@ async fn main() -> Result<()> { let (tx, rx) = mpsc::channel(1); let opt = build_args(&dev_opt, 
&db.connection_uri(), &dev_opt.manifest)?; - let file_link_resolver = Arc::new(FileLinkResolver::with_base_dir(&build_dir)); + let file_link_resolver = Arc::new(FileLinkResolver::new( + None, + HashMap::new(), + )); let ctx = DevModeContext { watch: dev_opt.watch, From 4263ebcf95efbad935f9e12fb0d3b54b6ef568f0 Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Mon, 12 May 2025 17:17:20 +0530 Subject: [PATCH 05/19] node: Make gnd work with multiple subgraphs --- node/src/bin/dev.rs | 86 +++++++---------- node/src/dev/helpers.rs | 35 ++++++- node/src/dev/watcher.rs | 201 ++++++++++++++++++++++++++++++++++++---- 3 files changed, 250 insertions(+), 72 deletions(-) diff --git a/node/src/bin/dev.rs b/node/src/bin/dev.rs index cd1e3c5390e..aaa970b5f8d 100644 --- a/node/src/bin/dev.rs +++ b/node/src/bin/dev.rs @@ -7,10 +7,14 @@ use graph::{ components::link_resolver::FileLinkResolver, env::EnvVars, log::logger, + slog::{error, info}, tokio::{self, sync::mpsc}, }; use graph_node::{ - dev::{helpers::DevModeContext, watcher::watch_subgraph_dir}, + dev::{ + helpers::DevModeContext, + watcher::{parse_manifest_args, watch_subgraphs}, + }, launcher, opt::Opt, }; @@ -38,10 +42,19 @@ pub struct DevOpt { #[clap( long, - help = "The location of the subgraph manifest file.", - default_value = "./build/subgraph.yaml" + value_name = "MANIFEST:[BUILD_DIR]", + help = "The location of the subgraph manifest file. If no build directory is provided, the default is 'build'. 
The file can be an alias, in the format '[BUILD_DIR:]manifest' where 'manifest' is the path to the manifest file, and 'BUILD_DIR' is the path to the build directory relative to the manifest file.", + default_value = "./build/subgraph.yaml", + value_delimiter = ',' )] - pub manifest: String, + pub manifests: Vec, + + #[clap( + long, + help = "The location of the database directory.", + default_value = "./build" + )] + pub database_dir: String, #[clap( long, @@ -63,7 +76,7 @@ pub struct DevOpt { } /// Builds the Graph Node options from DevOpt -fn build_args(dev_opt: &DevOpt, db_url: &str, manifest_path: &str) -> Result { +fn build_args(dev_opt: &DevOpt, db_url: &str) -> Result { let mut args = vec!["gnd".to_string()]; if !dev_opt.ipfs.is_empty() { @@ -76,16 +89,6 @@ fn build_args(dev_opt: &DevOpt, db_url: &str, manifest_path: &str) -> Result Result Result { - let manifest_path = Path::new(manifest_path_str); - - if !manifest_path.exists() { - anyhow::bail!("Subgraph manifest file not found at {}", manifest_path_str); - } - - let dir = manifest_path - .parent() - .context("Failed to get parent directory of manifest")?; - - dir.canonicalize() - .context("Failed to canonicalize build directory path") -} - async fn run_graph_node(opt: Opt, ctx: Option) -> Result<()> { let env_vars = Arc::new(EnvVars::from_env().context("Failed to load environment variables")?); @@ -122,21 +109,25 @@ async fn main() -> Result<()> { env_logger::init(); let dev_opt = DevOpt::parse(); - let build_dir = get_build_dir(&dev_opt.manifest)?; + let database_dir = Path::new(&dev_opt.database_dir); + + let logger = logger(true); + + info!(logger, "Starting Graph Node Dev"); + info!(logger, "Database directory: {}", database_dir.display()); let db = PgTempDBBuilder::new() - .with_data_dir_prefix(build_dir.clone()) + .with_data_dir_prefix(database_dir) .with_initdb_param("-E", "UTF8") .with_initdb_param("--locale", "C") .start_async() .await; let (tx, rx) = mpsc::channel(1); - let opt = 
build_args(&dev_opt, &db.connection_uri(), &dev_opt.manifest)?; - let file_link_resolver = Arc::new(FileLinkResolver::new( - None, - HashMap::new(), - )); + let opt = build_args(&dev_opt, &db.connection_uri())?; + + let manifests_paths = parse_manifest_args(dev_opt.manifests, &logger)?; + let file_link_resolver = Arc::new(FileLinkResolver::new(None, HashMap::new())); let ctx = DevModeContext { watch: dev_opt.watch, @@ -144,11 +135,6 @@ async fn main() -> Result<()> { updates_rx: rx, }; - let subgraph = opt.subgraph.clone().unwrap(); - - // Set up logger - let logger = logger(opt.debug); - // Run graph node graph::spawn(async move { let _ = run_graph_node(opt, Some(ctx)).await; @@ -156,14 +142,12 @@ async fn main() -> Result<()> { if dev_opt.watch { graph::spawn_blocking(async move { - watch_subgraph_dir( - &logger, - build_dir, - subgraph, - vec!["pgtemp-*".to_string()], - tx, - ) - .await; + let result = + watch_subgraphs(&logger, manifests_paths, vec!["pgtemp-*".to_string()], tx).await; + if let Err(e) = result { + error!(logger, "Error watching subgraphs"; "error" => e.to_string()); + std::process::exit(1); + } }); } diff --git a/node/src/dev/helpers.rs b/node/src/dev/helpers.rs index 45f7af9b75e..3158b8726b3 100644 --- a/node/src/dev/helpers.rs +++ b/node/src/dev/helpers.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use anyhow::Result; +use anyhow::{Context, Result}; use graph::components::link_resolver::FileLinkResolver; use graph::prelude::{ BlockPtr, DeploymentHash, NodeId, SubgraphRegistrarError, SubgraphStore as SubgraphStoreTrait, @@ -118,9 +118,42 @@ pub async fn watch_subgraph_updates( "hash" => hash.to_string(), "error" => e.to_string() ); + std::process::exit(1); } } error!(logger, "Subgraph watcher terminated unexpectedly"; "action" => "exiting"); std::process::exit(1); } + +pub fn parse_alias(alias: &str) -> anyhow::Result<(String, String, Option)> { + let mut split = alias.split(':'); + let alias_name = split.next(); + let alias_value = split.next(); + + 
if alias_name.is_none() || alias_value.is_none() || split.next().is_some() { + return Err(anyhow::anyhow!( + "Invalid alias format: expected 'alias=[BUILD_DIR:]manifest', got '{}'", + alias + )); + } + + let alias_name = alias_name.unwrap().to_owned(); + let (build_dir, manifest) = parse_manifest_arg(alias_value.unwrap()) + .with_context(|| format!("While parsing alias '{}'", alias))?; + + Ok((alias_name, build_dir, manifest)) +} + +pub fn parse_manifest_arg(value: &str) -> anyhow::Result<(String, Option)> { + match value.split_once(':') { + Some((manifest, build_dir)) if !manifest.is_empty() => { + Ok((manifest.to_owned(), Some(build_dir.to_owned()))) + } + Some(_) => Err(anyhow::anyhow!( + "Invalid manifest arg: missing manifest in '{}'", + value + )), + None => Ok((value.to_owned(), None)), + } +} diff --git a/node/src/dev/watcher.rs b/node/src/dev/watcher.rs index 53fbd729bcd..09c965c704b 100644 --- a/node/src/dev/watcher.rs +++ b/node/src/dev/watcher.rs @@ -1,28 +1,136 @@ +use anyhow::{anyhow, Context, Result}; use globset::{Glob, GlobSet, GlobSetBuilder}; use graph::prelude::{DeploymentHash, SubgraphName}; -use graph::slog::{error, info, Logger}; +use graph::slog::{self, error, info, Logger}; use graph::tokio::sync::mpsc::Sender; use notify::{recommended_watcher, Event, RecursiveMode, Watcher}; use std::path::{Path, PathBuf}; use std::sync::mpsc; use std::time::Duration; +use super::helpers::parse_manifest_arg; + const WATCH_DELAY: Duration = Duration::from_secs(5); +const DEFAULT_BUILD_DIR: &str = "build"; + +// Parses manifest arguments and returns a vector of paths to the manifest files +pub fn parse_manifest_args(manifests: Vec, logger: &Logger) -> Result> { + let mut manifests_paths = Vec::new(); + + for manifest_str in manifests { + let (manifest_path_str, build_dir_opt) = parse_manifest_arg(&manifest_str) + .with_context(|| format!("While parsing manifest '{}'", manifest_str))?; + + let built_manifest_path = + process_manifest(build_dir_opt, 
&manifest_path_str, None, logger)?; + + manifests_paths.push(built_manifest_path); + } + + Ok(manifests_paths) +} + +/// Helper function to process a manifest +fn process_manifest( + build_dir_opt: Option, + manifest_path_str: &str, + alias_name: Option<&String>, + logger: &Logger, +) -> Result { + let build_dir_str = build_dir_opt.unwrap_or_else(|| DEFAULT_BUILD_DIR.to_owned()); + + info!(logger, "Validating manifest: {}", manifest_path_str); + + let manifest_path = Path::new(manifest_path_str); + let manifest_path = manifest_path + .canonicalize() + .with_context(|| format!("Manifest path does not exist: {}", manifest_path_str))?; + + // Get the parent directory of the manifest + let parent_dir = manifest_path + .parent() + .ok_or_else(|| { + anyhow!( + "Failed to get parent directory for manifest: {}", + manifest_path_str + ) + })? + .canonicalize() + .with_context(|| { + format!( + "Parent directory does not exist for manifest: {}", + manifest_path_str + ) + })?; + + // Create the build directory path by joining the parent directory with the build_dir_str + let build_dir = parent_dir.join(build_dir_str); + let build_dir = build_dir + .canonicalize() + .with_context(|| format!("Build directory does not exist: {}", build_dir.display()))?; + + let manifest_file_name = manifest_path.file_name().ok_or_else(|| { + anyhow!( + "Failed to get file name for manifest: {}", + manifest_path_str + ) + })?; + + let built_manifest_path = build_dir.join(manifest_file_name); + + info!( + logger, + "Watching manifest: {}", + built_manifest_path.display() + ); + + if let Some(name) = alias_name { + info!( + logger, + "Using build directory for {}: {}", + name, + build_dir.display() + ); + } else { + info!(logger, "Using build directory: {}", build_dir.display()); + } + + Ok(built_manifest_path) +} /// Sets up a watcher for the given directory with optional exclusions. /// Exclusions can include glob patterns like "pgtemp-*". 
-pub async fn watch_subgraph_dir( +pub async fn watch_subgraphs( logger: &Logger, - dir: PathBuf, - id: String, + manifests_paths: Vec, exclusions: Vec, sender: Sender<(DeploymentHash, SubgraphName)>, -) { +) -> Result<()> { + let logger = logger.new(slog::o!("component" => "Watcher")); + + watch_subgraph_dirs(&logger, manifests_paths, exclusions, sender).await?; + Ok(()) +} + +/// Sets up a watcher for the given directories with optional exclusions. +/// Exclusions can include glob patterns like "pgtemp-*". +pub async fn watch_subgraph_dirs( + logger: &Logger, + manifests_paths: Vec, + exclusions: Vec, + sender: Sender<(DeploymentHash, SubgraphName)>, +) -> Result<()> { + if manifests_paths.is_empty() { + info!(logger, "No directories to watch"); + return Ok(()); + } + info!( logger, - "Watching for changes in directory: {}", - dir.display() + "Watching for changes in {} directories", + manifests_paths.len() ); + if !exclusions.is_empty() { info!(logger, "Excluding patterns: {}", exclusions.join(", ")); } @@ -33,31 +141,54 @@ pub async fn watch_subgraph_dir( // Create a channel to receive the events let (tx, rx) = mpsc::channel(); - // Create a watcher object let mut watcher = match recommended_watcher(tx) { Ok(w) => w, Err(e) => { error!(logger, "Error creating file watcher: {}", e); - return; + return Err(anyhow!("Error creating file watcher")); } }; - if let Err(e) = watcher.watch(&dir, RecursiveMode::Recursive) { - error!(logger, "Error watching directory {}: {}", dir.display(), e); - return; + for manifest_path in manifests_paths.iter() { + let dir = manifest_path.parent().unwrap(); + if let Err(e) = watcher.watch(dir, RecursiveMode::Recursive) { + error!(logger, "Error watching directory {}: {}", dir.display(), e); + std::process::exit(1); + } + info!(logger, "Watching directory: {}", dir.display()); } - let watch_dir = dir.clone(); - let watch_exclusion_set = exclusion_set.clone(); + // Process file change events + process_file_events(logger, rx, 
&exclusion_set, &manifests_paths, sender).await +} +/// Processes file change events and triggers redeployments +async fn process_file_events( + logger: &Logger, + rx: mpsc::Receiver>, + exclusion_set: &GlobSet, + manifests_paths: &Vec, + sender: Sender<(DeploymentHash, SubgraphName)>, +) -> Result<()> { loop { - let first_event = match rx.recv() { - Ok(Ok(e)) if should_process_event(&e, &watch_dir, &watch_exclusion_set) => Some(e), + // Wait for an event + let event = match rx.recv() { + Ok(Ok(e)) => e, Ok(_) => continue, - Err(_) => break, + Err(_) => { + error!(logger, "Error receiving file change event"); + return Err(anyhow!("Error receiving file change event")); + } }; - if first_event.is_none() { + if !is_relevant_event( + &event, + manifests_paths + .iter() + .map(|p| p.parent().unwrap().to_path_buf()) + .collect(), + exclusion_set, + ) { continue; } @@ -73,13 +204,43 @@ pub async fn watch_subgraph_dir( } } + // Redeploy all subgraphs + redeploy_all_subgraphs(logger, manifests_paths, &sender).await?; + } +} + +/// Checks if an event is relevant for any of the watched directories +fn is_relevant_event(event: &Event, watched_dirs: Vec, exclusion_set: &GlobSet) -> bool { + for path in event.paths.iter() { + for dir in watched_dirs.iter() { + if path.starts_with(dir) && should_process_event(event, dir, exclusion_set) { + return true; + } + } + } + false +} + +/// Redeploys all subgraphs in the order defined by the BTreeMap +async fn redeploy_all_subgraphs( + logger: &Logger, + manifests_paths: &Vec, + sender: &Sender<(DeploymentHash, SubgraphName)>, +) -> Result<()> { + info!(logger, "File change detected, redeploying all subgraphs"); + let mut count = 0; + for manifest_path in manifests_paths { let _ = sender .send(( - DeploymentHash::new(id.clone()).unwrap(), - SubgraphName::new("test").unwrap(), + DeploymentHash::new(manifest_path.display().to_string()) + .map_err(|_| anyhow!("Failed to create deployment hash"))?, + SubgraphName::new(format!("subgraph-{}", 
count)) + .map_err(|_| anyhow!("Failed to create subgraph name"))?, )) .await; + count += 1; } + Ok(()) } /// Build a GlobSet from the provided patterns From 1616bf854f3137a08737024e38dad5950460c489 Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Mon, 12 May 2025 17:41:13 +0530 Subject: [PATCH 06/19] node: Support subgraph datasource in gnd --- node/src/bin/dev.rs | 25 ++++++++++++++---- node/src/dev/helpers.rs | 6 +++-- node/src/dev/watcher.rs | 58 +++++++++++++++++++++++++++++++++++------ 3 files changed, 74 insertions(+), 15 deletions(-) diff --git a/node/src/bin/dev.rs b/node/src/bin/dev.rs index aaa970b5f8d..4af260891f2 100644 --- a/node/src/bin/dev.rs +++ b/node/src/bin/dev.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, path::Path, sync::Arc}; +use std::{path::Path, sync::Arc}; use anyhow::{Context, Result}; use clap::Parser; @@ -49,6 +49,14 @@ pub struct DevOpt { )] pub manifests: Vec, + #[clap( + long, + value_name = "ALIAS:MANIFEST:[BUILD_DIR]", + value_delimiter = ',', + help = "The location of the source subgraph manifest files. This is used to resolve aliases in the manifest files for subgraph data sources. The format is ALIAS:MANIFEST:[BUILD_DIR], where ALIAS is the alias name, BUILD_DIR is the build directory relative to the manifest file, and MANIFEST is the manifest file location." 
+ )] + pub sources: Vec, + #[clap( long, help = "The location of the database directory.", @@ -126,8 +134,9 @@ async fn main() -> Result<()> { let (tx, rx) = mpsc::channel(1); let opt = build_args(&dev_opt, &db.connection_uri())?; - let manifests_paths = parse_manifest_args(dev_opt.manifests, &logger)?; - let file_link_resolver = Arc::new(FileLinkResolver::new(None, HashMap::new())); + let (manifests_paths, source_subgraph_aliases) = + parse_manifest_args(dev_opt.manifests, dev_opt.sources, &logger)?; + let file_link_resolver = Arc::new(FileLinkResolver::new(None, source_subgraph_aliases.clone())); let ctx = DevModeContext { watch: dev_opt.watch, @@ -142,8 +151,14 @@ async fn main() -> Result<()> { if dev_opt.watch { graph::spawn_blocking(async move { - let result = - watch_subgraphs(&logger, manifests_paths, vec!["pgtemp-*".to_string()], tx).await; + let result = watch_subgraphs( + &logger, + manifests_paths, + source_subgraph_aliases, + vec!["pgtemp-*".to_string()], + tx, + ) + .await; if let Err(e) = result { error!(logger, "Error watching subgraphs"; "error" => e.to_string()); std::process::exit(1); diff --git a/node/src/dev/helpers.rs b/node/src/dev/helpers.rs index 3158b8726b3..620483edba1 100644 --- a/node/src/dev/helpers.rs +++ b/node/src/dev/helpers.rs @@ -126,6 +126,7 @@ pub async fn watch_subgraph_updates( std::process::exit(1); } +/// Parse an alias string into a tuple of (alias_name, manifest, Option) pub fn parse_alias(alias: &str) -> anyhow::Result<(String, String, Option)> { let mut split = alias.split(':'); let alias_name = split.next(); @@ -139,12 +140,13 @@ pub fn parse_alias(alias: &str) -> anyhow::Result<(String, String, Option) pub fn parse_manifest_arg(value: &str) -> anyhow::Result<(String, Option)> { match value.split_once(':') { Some((manifest, build_dir)) if !manifest.is_empty() => { diff --git a/node/src/dev/watcher.rs b/node/src/dev/watcher.rs index 09c965c704b..08acad403c0 100644 --- a/node/src/dev/watcher.rs +++ 
b/node/src/dev/watcher.rs @@ -4,18 +4,33 @@ use graph::prelude::{DeploymentHash, SubgraphName}; use graph::slog::{self, error, info, Logger}; use graph::tokio::sync::mpsc::Sender; use notify::{recommended_watcher, Event, RecursiveMode, Watcher}; +use std::collections::HashMap; use std::path::{Path, PathBuf}; use std::sync::mpsc; use std::time::Duration; -use super::helpers::parse_manifest_arg; +use super::helpers::{parse_alias, parse_manifest_arg}; const WATCH_DELAY: Duration = Duration::from_secs(5); const DEFAULT_BUILD_DIR: &str = "build"; // Parses manifest arguments and returns a vector of paths to the manifest files -pub fn parse_manifest_args(manifests: Vec, logger: &Logger) -> Result> { +pub fn parse_manifest_args( + manifests: Vec, + subgraph_sources: Vec, + logger: &Logger, +) -> Result<(Vec, HashMap)> { let mut manifests_paths = Vec::new(); + let mut source_subgraph_aliases = HashMap::new(); + + for subgraph_source in subgraph_sources { + let (alias_name, manifest_path_str, build_dir_opt) = parse_alias(&subgraph_source)?; + let manifest_path = + process_manifest(build_dir_opt, &manifest_path_str, Some(&alias_name), logger)?; + + manifests_paths.push(manifest_path.clone()); + source_subgraph_aliases.insert(alias_name, manifest_path); + } for manifest_str in manifests { let (manifest_path_str, build_dir_opt) = parse_manifest_arg(&manifest_str) @@ -27,7 +42,7 @@ pub fn parse_manifest_args(manifests: Vec, logger: &Logger) -> Result, + source_subgraph_aliases: HashMap, exclusions: Vec, sender: Sender<(DeploymentHash, SubgraphName)>, ) -> Result<()> { let logger = logger.new(slog::o!("component" => "Watcher")); - watch_subgraph_dirs(&logger, manifests_paths, exclusions, sender).await?; + watch_subgraph_dirs( + &logger, + manifests_paths, + source_subgraph_aliases, + exclusions, + sender, + ) + .await?; Ok(()) } @@ -117,6 +140,7 @@ pub async fn watch_subgraphs( pub async fn watch_subgraph_dirs( logger: &Logger, manifests_paths: Vec, + source_subgraph_aliases: 
HashMap, exclusions: Vec, sender: Sender<(DeploymentHash, SubgraphName)>, ) -> Result<()> { @@ -159,7 +183,15 @@ pub async fn watch_subgraph_dirs( } // Process file change events - process_file_events(logger, rx, &exclusion_set, &manifests_paths, sender).await + process_file_events( + logger, + rx, + &exclusion_set, + &manifests_paths, + &source_subgraph_aliases, + sender, + ) + .await } /// Processes file change events and triggers redeployments @@ -168,6 +200,7 @@ async fn process_file_events( rx: mpsc::Receiver>, exclusion_set: &GlobSet, manifests_paths: &Vec, + source_subgraph_aliases: &HashMap, sender: Sender<(DeploymentHash, SubgraphName)>, ) -> Result<()> { loop { @@ -205,7 +238,7 @@ async fn process_file_events( } // Redeploy all subgraphs - redeploy_all_subgraphs(logger, manifests_paths, &sender).await?; + redeploy_all_subgraphs(logger, manifests_paths, source_subgraph_aliases, &sender).await?; } } @@ -225,15 +258,24 @@ fn is_relevant_event(event: &Event, watched_dirs: Vec, exclusion_set: & async fn redeploy_all_subgraphs( logger: &Logger, manifests_paths: &Vec, + source_subgraph_aliases: &HashMap, sender: &Sender<(DeploymentHash, SubgraphName)>, ) -> Result<()> { info!(logger, "File change detected, redeploying all subgraphs"); let mut count = 0; for manifest_path in manifests_paths { + let alias_name = source_subgraph_aliases + .iter() + .find(|(_, path)| path == &manifest_path) + .map(|(name, _)| name); + + let id = alias_name + .map(|s| s.to_owned()) + .unwrap_or_else(|| manifest_path.display().to_string()); + let _ = sender .send(( - DeploymentHash::new(manifest_path.display().to_string()) - .map_err(|_| anyhow!("Failed to create deployment hash"))?, + DeploymentHash::new(id).map_err(|_| anyhow!("Failed to create deployment hash"))?, SubgraphName::new(format!("subgraph-{}", count)) .map_err(|_| anyhow!("Failed to create subgraph name"))?, )) From 39156862fd8e74968732d69fd7e63b6a4216ad2e Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Mon, 12 May 2025 
18:21:54 +0530 Subject: [PATCH 07/19] node: correct the default value for manifest --- node/src/bin/dev.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/src/bin/dev.rs b/node/src/bin/dev.rs index 4af260891f2..540319139b2 100644 --- a/node/src/bin/dev.rs +++ b/node/src/bin/dev.rs @@ -44,7 +44,7 @@ pub struct DevOpt { long, value_name = "MANIFEST:[BUILD_DIR]", help = "The location of the subgraph manifest file. If no build directory is provided, the default is 'build'. The file can be an alias, in the format '[BUILD_DIR:]manifest' where 'manifest' is the path to the manifest file, and 'BUILD_DIR' is the path to the build directory relative to the manifest file.", - default_value = "./build/subgraph.yaml", + default_value = "./subgraph.yaml", value_delimiter = ',' )] pub manifests: Vec, From 32b716396024956397fbda5f5c28d0436159ba00 Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Mon, 12 May 2025 19:02:53 +0530 Subject: [PATCH 08/19] core, node, graph: Ignore graft base in dev mode --- core/src/subgraph/registrar.rs | 25 ++++++++++++++++++---- graph/src/components/subgraph/registrar.rs | 1 + node/src/dev/helpers.rs | 1 + node/src/launcher.rs | 1 + node/src/manager/commands/run.rs | 1 + server/json-rpc/src/lib.rs | 1 + tests/src/fixture/mod.rs | 1 + 7 files changed, 27 insertions(+), 4 deletions(-) diff --git a/core/src/subgraph/registrar.rs b/core/src/subgraph/registrar.rs index 501ef9dea16..c75ee542571 100644 --- a/core/src/subgraph/registrar.rs +++ b/core/src/subgraph/registrar.rs @@ -278,6 +278,7 @@ where start_block_override: Option, graft_block_override: Option, history_blocks: Option, + ignore_graft_base: bool, ) -> Result { // We don't have a location for the subgraph yet; that will be // assigned when we deploy for real. For logging purposes, make up a @@ -330,6 +331,7 @@ where self.version_switching_mode, &resolver, history_blocks, + ignore_graft_base, ) .await?
} @@ -348,6 +350,7 @@ where self.version_switching_mode, &resolver, history_blocks, + ignore_graft_base, ) .await? } @@ -366,6 +369,7 @@ where self.version_switching_mode, &resolver, history_blocks, + ignore_graft_base, ) .await? } @@ -384,6 +388,7 @@ where self.version_switching_mode, &resolver, history_blocks, + ignore_graft_base, ) .await? } @@ -570,6 +575,7 @@ async fn create_subgraph_version( version_switching_mode: SubgraphVersionSwitchingMode, resolver: &Arc, history_blocks_override: Option, + ignore_graft_base: bool, ) -> Result { let raw_string = serde_yaml::to_string(&raw).unwrap(); @@ -591,10 +597,21 @@ async fn create_subgraph_version( Err(StoreError::DeploymentNotFound(_)) => true, Err(e) => return Err(SubgraphRegistrarError::StoreError(e)), }; - let manifest = unvalidated - .validate(store.cheap_clone(), should_validate) - .await - .map_err(SubgraphRegistrarError::ManifestValidationError)?; + + let manifest = { + let should_validate = should_validate && !ignore_graft_base; + + let mut manifest = unvalidated + .validate(store.cheap_clone(), should_validate) + .await + .map_err(SubgraphRegistrarError::ManifestValidationError)?; + + if ignore_graft_base { + manifest.graft = None; + } + + manifest + }; let network_name: Word = manifest.network_name().into(); diff --git a/graph/src/components/subgraph/registrar.rs b/graph/src/components/subgraph/registrar.rs index 691c341e38b..361a704e754 100644 --- a/graph/src/components/subgraph/registrar.rs +++ b/graph/src/components/subgraph/registrar.rs @@ -45,6 +45,7 @@ pub trait SubgraphRegistrar: Send + Sync + 'static { start_block_block: Option, graft_block_override: Option, history_blocks: Option, + ignore_graft_base: bool, ) -> Result; async fn remove_subgraph(&self, name: SubgraphName) -> Result<(), SubgraphRegistrarError>; diff --git a/node/src/dev/helpers.rs b/node/src/dev/helpers.rs index 620483edba1..6a2bc97da0d 100644 --- a/node/src/dev/helpers.rs +++ b/node/src/dev/helpers.rs @@ -55,6 +55,7 @@ async fn 
deploy_subgraph( start_block, None, None, + true ) .await .and_then(|locator| { diff --git a/node/src/launcher.rs b/node/src/launcher.rs index d82a5d0fcbf..34c653f4d68 100644 --- a/node/src/launcher.rs +++ b/node/src/launcher.rs @@ -253,6 +253,7 @@ fn deploy_subgraph_from_flag( start_block, None, None, + false, ) .await } diff --git a/node/src/manager/commands/run.rs b/node/src/manager/commands/run.rs index 2c6bfdcb148..1892353c6a9 100644 --- a/node/src/manager/commands/run.rs +++ b/node/src/manager/commands/run.rs @@ -213,6 +213,7 @@ pub async fn run( None, None, None, + false, ) .await?; diff --git a/server/json-rpc/src/lib.rs b/server/json-rpc/src/lib.rs index 103d36f806c..970bb3959d3 100644 --- a/server/json-rpc/src/lib.rs +++ b/server/json-rpc/src/lib.rs @@ -133,6 +133,7 @@ impl ServerState { None, None, params.history_blocks, + false, ) .await { diff --git a/tests/src/fixture/mod.rs b/tests/src/fixture/mod.rs index b8151857db3..b9c07a41e7d 100644 --- a/tests/src/fixture/mod.rs +++ b/tests/src/fixture/mod.rs @@ -612,6 +612,7 @@ pub async fn setup_inner( None, graft_block, None, + false, ) .await .expect("failed to create subgraph version"); From 317b381c54700702021c7d5a84fac266832de44b Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Tue, 13 May 2025 13:55:54 +0530 Subject: [PATCH 09/19] node: Allow providing a postgres url for gnd --- node/src/bin/dev.rs | 32 ++++++++++++++++++++++++-------- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/node/src/bin/dev.rs b/node/src/bin/dev.rs index 540319139b2..bae63e54495 100644 --- a/node/src/bin/dev.rs +++ b/node/src/bin/dev.rs @@ -1,4 +1,4 @@ -use std::{path::Path, sync::Arc}; +use std::{mem, path::Path, sync::Arc}; use anyhow::{Context, Result}; use clap::Parser; @@ -64,6 +64,14 @@ pub struct DevOpt { )] pub database_dir: String, + #[clap( + long, + value_name = "URL", + env = "POSTGRES_URL", + help = "Location of the Postgres database used for storing entities" + )] + pub postgres_url: Option, + #[clap( 
long, allow_negative_numbers = false, @@ -124,15 +132,23 @@ async fn main() -> Result<()> { info!(logger, "Starting Graph Node Dev"); info!(logger, "Database directory: {}", database_dir.display()); - let db = PgTempDBBuilder::new() - .with_data_dir_prefix(database_dir) - .with_initdb_param("-E", "UTF8") - .with_initdb_param("--locale", "C") - .start_async() - .await; + // Create the database or use the provided URL + let db_url = if let Some(url) = dev_opt.postgres_url.as_ref() { + url.clone() + } else { + let db = PgTempDBBuilder::new() + .with_data_dir_prefix(database_dir) + .with_initdb_param("-E", "UTF8") + .with_initdb_param("--locale", "C") + .start(); + let url = db.connection_uri().to_string(); + // Prevent the database from being dropped by forgetting it + mem::forget(db); + url + }; let (tx, rx) = mpsc::channel(1); - let opt = build_args(&dev_opt, &db.connection_uri())?; + let opt = build_args(&dev_opt, &db_url)?; let (manifests_paths, source_subgraph_aliases) = parse_manifest_args(dev_opt.manifests, dev_opt.sources, &logger)?; From fdedb235ef4aa33654539eec62816ffa23eb286e Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Tue, 13 May 2025 14:03:16 +0530 Subject: [PATCH 10/19] node: Do not use pgtemp in windows --- node/Cargo.toml | 4 +++- node/src/bin/dev.rs | 45 +++++++++++++++++++++++++++++++-------------- 2 files changed, 34 insertions(+), 15 deletions(-) diff --git a/node/Cargo.toml b/node/Cargo.toml index cb3039c4a34..16393c36af5 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -44,6 +44,8 @@ termcolor = "1.4.1" diesel = { workspace = true } prometheus = { version = "0.13.4", features = ["push"] } json-structural-diff = { version = "0.2", features = ["colorize"] } -pgtemp = { git = "https://github.com/incrypto32/pgtemp", branch = "initdb-args" } globset = "0.4.16" notify = "8.0.0" + +[target.'cfg(unix)'.dependencies] +pgtemp = { git = "https://github.com/incrypto32/pgtemp", branch = "initdb-args" } diff --git a/node/src/bin/dev.rs 
b/node/src/bin/dev.rs index bae63e54495..b0ba4b6108d 100644 --- a/node/src/bin/dev.rs +++ b/node/src/bin/dev.rs @@ -19,6 +19,8 @@ use graph_node::{ opt::Opt, }; use lazy_static::lazy_static; + +#[cfg(unix)] use pgtemp::PgTempDBBuilder; git_testament!(TESTAMENT); @@ -120,6 +122,33 @@ async fn run_graph_node(opt: Opt, ctx: Option) -> Result<()> { Ok(()) } +/// Get the database URL, either from the provided option or by creating a temporary database +fn get_database_url(postgres_url: Option<&String>, database_dir: &Path) -> Result { + if let Some(url) = postgres_url { + Ok(url.clone()) + } else { + #[cfg(unix)] + { + let db = PgTempDBBuilder::new() + .with_data_dir_prefix(database_dir) + .with_initdb_param("-E", "UTF8") + .with_initdb_param("--locale", "C") + .start(); + let url = db.connection_uri().to_string(); + // Prevent the database from being dropped by forgetting it + mem::forget(db); + Ok(url) + } + + #[cfg(not(unix))] + { + anyhow::bail!( + "Please provide a postgres_url manually using the --postgres-url option." 
+ ); + } + } +} + #[tokio::main] async fn main() -> Result<()> { env_logger::init(); @@ -132,20 +161,8 @@ async fn main() -> Result<()> { info!(logger, "Starting Graph Node Dev"); info!(logger, "Database directory: {}", database_dir.display()); - // Create the database or use the provided URL - let db_url = if let Some(url) = dev_opt.postgres_url.as_ref() { - url.clone() - } else { - let db = PgTempDBBuilder::new() - .with_data_dir_prefix(database_dir) - .with_initdb_param("-E", "UTF8") - .with_initdb_param("--locale", "C") - .start(); - let url = db.connection_uri().to_string(); - // Prevent the database from being dropped by forgetting it - mem::forget(db); - url - }; + // Get the database URL + let db_url = get_database_url(dev_opt.postgres_url.as_ref(), database_dir)?; let (tx, rx) = mpsc::channel(1); let opt = build_args(&dev_opt, &db_url)?; From 9a90dff839ef83188e098ad8eb760848516cade1 Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Tue, 13 May 2025 17:30:50 +0530 Subject: [PATCH 11/19] store: enable `vendored` feature for openssl crate --- Cargo.lock | 10 ++++++++++ store/postgres/Cargo.toml | 2 +- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 86e11298c50..51ae390703d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3635,6 +3635,15 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +[[package]] +name = "openssl-src" +version = "300.5.0+3.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8ce546f549326b0e6052b649198487d91320875da901e7bd11a06d1ee3f9c2f" +dependencies = [ + "cc", +] + [[package]] name = "openssl-sys" version = "0.9.107" @@ -3643,6 +3652,7 @@ checksum = "8288979acd84749c744a9014b4382d42b8f7b2592847b5afb2ed29e5d16ede07" dependencies = [ "cc", "libc", + "openssl-src", "pkg-config", "vcpkg", ] diff --git a/store/postgres/Cargo.toml b/store/postgres/Cargo.toml 
index 027a46414d9..82a12d823c2 100644 --- a/store/postgres/Cargo.toml +++ b/store/postgres/Cargo.toml @@ -21,7 +21,7 @@ lazy_static = "1.5" lru_time_cache = "0.11" maybe-owned = "0.3.4" postgres = "0.19.1" -openssl = "0.10.72" +openssl = { version = "0.10.72", features = ["vendored"] } postgres-openssl = "0.5.1" rand.workspace = true serde = { workspace = true } From 30122243cbbc28db96a65009ea3c9bac7b08d38f Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Tue, 13 May 2025 17:32:29 +0530 Subject: [PATCH 12/19] chain/ethereum: Return error when ipc is used in non unix platform --- chain/ethereum/src/transport.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/chain/ethereum/src/transport.rs b/chain/ethereum/src/transport.rs index a90a6b9720b..ef571efacb8 100644 --- a/chain/ethereum/src/transport.rs +++ b/chain/ethereum/src/transport.rs @@ -32,6 +32,11 @@ impl Transport { .expect("Failed to connect to Ethereum IPC") } + #[cfg(not(unix))] + pub async fn new_ipc(_ipc: &str) -> Self { + panic!("IPC connections are not supported on non-Unix platforms") + } + /// Creates a WebSocket transport. 
pub async fn new_ws(ws: &str) -> Self { ws::WebSocket::new(ws) From 72e0da4ac3c84c95dceb87a5c24fb4a34d17c63f Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Fri, 16 May 2025 13:48:56 +0530 Subject: [PATCH 13/19] node: Refactor launcher --- node/src/bin/dev.rs | 56 ++++++++++++++++++++++----------- node/src/dev/helpers.rs | 2 -- node/src/launcher.rs | 70 ++++++++++++++++++----------------------- node/src/main.rs | 21 +++++++++++-- 4 files changed, 87 insertions(+), 62 deletions(-) diff --git a/node/src/bin/dev.rs b/node/src/bin/dev.rs index b0ba4b6108d..e841c820b1e 100644 --- a/node/src/bin/dev.rs +++ b/node/src/bin/dev.rs @@ -7,14 +7,13 @@ use graph::{ components::link_resolver::FileLinkResolver, env::EnvVars, log::logger, - slog::{error, info}, + prelude::{CheapClone, DeploymentHash, LinkResolver, SubgraphName}, + slog::{error, info, Logger}, tokio::{self, sync::mpsc}, }; +use graph_core::polling_monitor::ipfs_service; use graph_node::{ - dev::{ - helpers::DevModeContext, - watcher::{parse_manifest_args, watch_subgraphs}, - }, + dev::watcher::{parse_manifest_args, watch_subgraphs}, launcher, opt::Opt, }; @@ -115,10 +114,34 @@ fn build_args(dev_opt: &DevOpt, db_url: &str) -> Result { Ok(opt) } -async fn run_graph_node(opt: Opt, ctx: Option) -> Result<()> { +async fn run_graph_node( + logger: &Logger, + opt: Opt, + link_resolver: Arc, + subgraph_updates_channel: Option>, +) -> Result<()> { let env_vars = Arc::new(EnvVars::from_env().context("Failed to load environment variables")?); - launcher::run(opt, env_vars, ctx).await; + let ipfs_client = graph::ipfs::new_ipfs_client(&opt.ipfs, &logger) + .await + .unwrap_or_else(|err| panic!("Failed to create IPFS client: {err:#}")); + + let ipfs_service = ipfs_service( + ipfs_client.cheap_clone(), + env_vars.mappings.max_ipfs_file_bytes, + env_vars.mappings.ipfs_timeout, + env_vars.mappings.ipfs_request_limit, + ); + + launcher::run( + logger.clone(), + opt, + env_vars, + ipfs_service, + link_resolver, + 
subgraph_updates_channel, + ) + .await; Ok(()) } @@ -164,35 +187,30 @@ async fn main() -> Result<()> { // Get the database URL let db_url = get_database_url(dev_opt.postgres_url.as_ref(), database_dir)?; - let (tx, rx) = mpsc::channel(1); let opt = build_args(&dev_opt, &db_url)?; let (manifests_paths, source_subgraph_aliases) = parse_manifest_args(dev_opt.manifests, dev_opt.sources, &logger)?; let file_link_resolver = Arc::new(FileLinkResolver::new(None, source_subgraph_aliases.clone())); - let ctx = DevModeContext { - watch: dev_opt.watch, - file_link_resolver, - updates_rx: rx, - }; + let (tx, rx) = dev_opt.watch.then(|| mpsc::channel(1)).unzip(); - // Run graph node + let logger_clone = logger.clone(); graph::spawn(async move { - let _ = run_graph_node(opt, Some(ctx)).await; + let _ = run_graph_node(&logger_clone, opt, file_link_resolver, rx).await; }); - if dev_opt.watch { + if let Some(tx) = tx { graph::spawn_blocking(async move { - let result = watch_subgraphs( + if let Err(e) = watch_subgraphs( &logger, manifests_paths, source_subgraph_aliases, vec!["pgtemp-*".to_string()], tx, ) - .await; - if let Err(e) = result { + .await + { error!(logger, "Error watching subgraphs"; "error" => e.to_string()); std::process::exit(1); } diff --git a/node/src/dev/helpers.rs b/node/src/dev/helpers.rs index 6a2bc97da0d..5aff80eabd0 100644 --- a/node/src/dev/helpers.rs +++ b/node/src/dev/helpers.rs @@ -1,7 +1,6 @@ use std::sync::Arc; use anyhow::{Context, Result}; -use graph::components::link_resolver::FileLinkResolver; use graph::prelude::{ BlockPtr, DeploymentHash, NodeId, SubgraphRegistrarError, SubgraphStore as SubgraphStoreTrait, }; @@ -15,7 +14,6 @@ use graph_store_postgres::SubgraphStore; pub struct DevModeContext { pub watch: bool, - pub file_link_resolver: Arc, pub updates_rx: Receiver<(DeploymentHash, SubgraphName)>, } diff --git a/node/src/launcher.rs b/node/src/launcher.rs index 34c653f4d68..8f7ee385135 100644 --- a/node/src/launcher.rs +++ b/node/src/launcher.rs 
@@ -6,7 +6,7 @@ use graph::futures03::compat::Future01CompatExt; use graph::futures03::future::TryFutureExt; use crate::config::Config; -use crate::dev::helpers::{watch_subgraph_updates, DevModeContext}; +use crate::dev::helpers::watch_subgraph_updates; use crate::network_setup::Networks; use crate::opt::Opt; use crate::store_builder::StoreBuilder; @@ -16,11 +16,10 @@ use graph::components::subgraph::Settings; use graph::data::graphql::load_manager::LoadManager; use graph::endpoint::EndpointMetrics; use graph::env::EnvVars; -use graph::log::logger; use graph::prelude::*; use graph::prometheus::Registry; use graph::url::Url; -use graph_core::polling_monitor::{arweave_service, ipfs_service, ArweaveService, IpfsService}; +use graph_core::polling_monitor::{arweave_service, ArweaveService, IpfsService}; use graph_core::{ SubgraphAssignmentProvider as IpfsSubgraphAssignmentProvider, SubgraphInstanceManager, SubgraphRegistrar as IpfsSubgraphRegistrar, @@ -349,10 +348,24 @@ fn build_graphql_server( graphql_server } -pub async fn run(opt: Opt, env_vars: Arc, dev_ctx: Option) { - // Set up logger - let logger = logger(opt.debug); - +/// Runs the Graph Node by initializing all components and starting all required services +/// This function is the main entry point for running a Graph Node instance +/// +/// # Arguments +/// +/// * `opt` - Command line options controlling node behavior and configuration +/// * `env_vars` - Environment variables for configuring the node +/// * `ipfs_service` - Service for interacting with IPFS for subgraph deployments +/// * `link_resolver` - Resolver for IPFS links in subgraph manifests and files +/// * `dev_updates` - Optional channel for receiving subgraph update notifications in development mode +pub async fn run( + logger: Logger, + opt: Opt, + env_vars: Arc, + ipfs_service: IpfsService, + link_resolver: Arc, + dev_updates: Option>, +) { // Log version information info!( logger, @@ -408,17 +421,6 @@ pub async fn run(opt: Opt, env_vars: 
Arc, dev_ctx: Option, dev_ctx: Option = if let Some(dev_ctx) = &dev_ctx { - dev_ctx.file_link_resolver.clone() - } else { - Arc::new(IpfsResolver::new(ipfs_client, env_vars.cheap_clone())) - }; - let metrics_server = PrometheusMetricsServer::new(&logger_factory, prometheus_registry.clone()); let endpoint_metrics = Arc::new(EndpointMetrics::new( @@ -577,19 +571,17 @@ pub async fn run(opt: Opt, env_vars: Arc, dev_ctx: Option Date: Fri, 16 May 2025 13:51:12 +0530 Subject: [PATCH 14/19] node/dev : Better error message when database directory doesn't exist --- node/src/bin/dev.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/node/src/bin/dev.rs b/node/src/bin/dev.rs index e841c820b1e..27020ee3eb3 100644 --- a/node/src/bin/dev.rs +++ b/node/src/bin/dev.rs @@ -152,6 +152,14 @@ fn get_database_url(postgres_url: Option<&String>, database_dir: &Path) -> Resul } else { #[cfg(unix)] { + // Check the database directory exists + if !database_dir.exists() { + anyhow::bail!( + "Database directory does not exist: {}", + database_dir.display() + ); + } + let db = PgTempDBBuilder::new() .with_data_dir_prefix(database_dir) .with_initdb_param("-E", "UTF8") From 747f7a92db90e6c6e7a4197755403000e7cdef86 Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Fri, 16 May 2025 14:00:29 +0530 Subject: [PATCH 15/19] node: refactor watcher --- node/src/dev/watcher.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/node/src/dev/watcher.rs b/node/src/dev/watcher.rs index 08acad403c0..a4625544f18 100644 --- a/node/src/dev/watcher.rs +++ b/node/src/dev/watcher.rs @@ -309,11 +309,6 @@ fn build_glob_set(patterns: &[String], logger: &Logger) -> GlobSet { /// Determines if an event should be processed based on exclusion patterns fn should_process_event(event: &Event, base_dir: &Path, exclusion_set: &GlobSet) -> bool { - // If no exclusions, process all events - if exclusion_set.is_empty() { - return true; - } - // Check each path in the event for path in event.paths.iter() { // Get the 
relative path from the base directory From 2eccf0629fbd8d1df5cf86b5d87d9c5fab79899e Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Thu, 22 May 2025 15:40:57 +0530 Subject: [PATCH 16/19] core, node, graph: Manipulate raw manifest instead of passing ignore_graft_base This reverts commit b5bbf9378ca8f513ba0fedf0c89cca47cd15df81. --- core/src/subgraph/registrar.rs | 57 +++++++++++++++------------------- node/src/dev/helpers.rs | 2 +- 2 files changed, 26 insertions(+), 33 deletions(-) diff --git a/core/src/subgraph/registrar.rs b/core/src/subgraph/registrar.rs index c75ee542571..9cda70572ba 100644 --- a/core/src/subgraph/registrar.rs +++ b/core/src/subgraph/registrar.rs @@ -293,18 +293,27 @@ where .map_err(SubgraphRegistrarError::Unknown)?, ); - let raw: serde_yaml::Mapping = { - let file_bytes = resolver - .cat(&logger, &hash.to_ipfs_link()) - .await - .map_err(|e| { - SubgraphRegistrarError::ResolveError( - SubgraphManifestResolveError::ResolveError(e), - ) - })?; - - serde_yaml::from_slice(&file_bytes) - .map_err(|e| SubgraphRegistrarError::ResolveError(e.into()))? + let raw = { + let mut raw: serde_yaml::Mapping = { + let file_bytes = + resolver + .cat(&logger, &hash.to_ipfs_link()) + .await + .map_err(|e| { + SubgraphRegistrarError::ResolveError( + SubgraphManifestResolveError::ResolveError(e), + ) + })?; + + serde_yaml::from_slice(&file_bytes) + .map_err(|e| SubgraphRegistrarError::ResolveError(e.into()))? + }; + + if ignore_graft_base { + raw.remove("graft"); + } + + raw }; let kind = BlockchainKind::from_manifest(&raw).map_err(|e| { @@ -331,7 +340,6 @@ where self.version_switching_mode, &resolver, history_blocks, - ignore_graft_base, ) .await? } @@ -350,7 +358,6 @@ where self.version_switching_mode, &resolver, history_blocks, - ignore_graft_base, ) .await? } @@ -369,7 +376,6 @@ where self.version_switching_mode, &resolver, history_blocks, - ignore_graft_base, ) .await? 
} @@ -388,7 +394,6 @@ where self.version_switching_mode, &resolver, history_blocks, - ignore_graft_base, ) .await? } @@ -575,7 +580,6 @@ async fn create_subgraph_version( version_switching_mode: SubgraphVersionSwitchingMode, resolver: &Arc, history_blocks_override: Option, - ignore_graft_base: bool, ) -> Result { let raw_string = serde_yaml::to_string(&raw).unwrap(); @@ -597,21 +601,10 @@ async fn create_subgraph_version( Err(StoreError::DeploymentNotFound(_)) => true, Err(e) => return Err(SubgraphRegistrarError::StoreError(e)), }; - - let manifest = { - let should_validate = should_validate && !ignore_graft_base; - - let mut manifest = unvalidated - .validate(store.cheap_clone(), should_validate) - .await - .map_err(SubgraphRegistrarError::ManifestValidationError)?; - - if ignore_graft_base { - manifest.graft = None; - } - - manifest - }; + let manifest = unvalidated + .validate(store.cheap_clone(), should_validate) + .await + .map_err(SubgraphRegistrarError::ManifestValidationError)?; let network_name: Word = manifest.network_name().into(); diff --git a/node/src/dev/helpers.rs b/node/src/dev/helpers.rs index 5aff80eabd0..19af9d23382 100644 --- a/node/src/dev/helpers.rs +++ b/node/src/dev/helpers.rs @@ -53,7 +53,7 @@ async fn deploy_subgraph( start_block, None, None, - true + true, ) .await .and_then(|locator| { From d99396fd7df972b8652764e9738a385631322554 Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Thu, 22 May 2025 16:09:14 +0530 Subject: [PATCH 17/19] node: Correct comments on `redeploy_all_subgraphs` --- node/src/dev/watcher.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/src/dev/watcher.rs b/node/src/dev/watcher.rs index a4625544f18..c73d4f16b4d 100644 --- a/node/src/dev/watcher.rs +++ b/node/src/dev/watcher.rs @@ -254,7 +254,7 @@ fn is_relevant_event(event: &Event, watched_dirs: Vec, exclusion_set: & false } -/// Redeploys all subgraphs in the order defined by the BTreeMap +/// Redeploys all subgraphs in the order it appears in 
the manifests_paths async fn redeploy_all_subgraphs( logger: &Logger, manifests_paths: &Vec, From 80e71393660e3de59d82f6b29b74a3c800a9cb1d Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Tue, 27 May 2025 11:19:35 +0530 Subject: [PATCH 18/19] node/gnd: Deploy all subgraphs first before wathcing files --- node/src/dev/watcher.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/node/src/dev/watcher.rs b/node/src/dev/watcher.rs index c73d4f16b4d..9436db9ac2c 100644 --- a/node/src/dev/watcher.rs +++ b/node/src/dev/watcher.rs @@ -238,7 +238,7 @@ async fn process_file_events( } // Redeploy all subgraphs - redeploy_all_subgraphs(logger, manifests_paths, source_subgraph_aliases, &sender).await?; + deploy_all_subgraphs(logger, manifests_paths, source_subgraph_aliases, &sender).await?; } } @@ -255,7 +255,7 @@ fn is_relevant_event(event: &Event, watched_dirs: Vec, exclusion_set: & } /// Redeploys all subgraphs in the order it appears in the manifests_paths -async fn redeploy_all_subgraphs( +async fn deploy_all_subgraphs( logger: &Logger, manifests_paths: &Vec, source_subgraph_aliases: &HashMap, From b349d604bda5f2e003a17ef82216590385b7e030 Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Tue, 27 May 2025 20:21:18 +0530 Subject: [PATCH 19/19] core, graph : Refactor LinkResolver trait --- chain/substreams/src/data_source.rs | 7 ++---- core/src/subgraph/instance_manager.rs | 2 +- core/src/subgraph/provider.rs | 2 +- core/src/subgraph/registrar.rs | 2 +- graph/src/components/link_resolver/file.rs | 23 ++++++++----------- graph/src/components/link_resolver/ipfs.rs | 5 +--- graph/src/components/link_resolver/mod.rs | 21 +++++++++++------ graph/src/data_source/subgraph.rs | 4 ++-- .../tests/chain/ethereum/manifest.rs | 4 ++-- 9 files changed, 33 insertions(+), 37 deletions(-) diff --git a/chain/substreams/src/data_source.rs b/chain/substreams/src/data_source.rs index 65f0cffa584..3969f83e373 100644 --- a/chain/substreams/src/data_source.rs +++ 
b/chain/substreams/src/data_source.rs @@ -331,7 +331,7 @@ mod test { blockchain::{DataSource as _, UnresolvedDataSource as _}, components::link_resolver::LinkResolver, data::subgraph::LATEST_VERSION, - prelude::{async_trait, serde_yaml, DeploymentHash, JsonValueStream, Link}, + prelude::{async_trait, serde_yaml, JsonValueStream, Link}, slog::{o, Discard, Logger}, substreams::{ module::{ @@ -705,10 +705,7 @@ mod test { unimplemented!() } - fn for_deployment( - &self, - _deployment: DeploymentHash, - ) -> Result, Error> { + fn for_manifest(&self, _manifest_path: &str) -> Result, Error> { unimplemented!() } diff --git a/core/src/subgraph/instance_manager.rs b/core/src/subgraph/instance_manager.rs index 332eebb513d..f6e1e7ffbe1 100644 --- a/core/src/subgraph/instance_manager.rs +++ b/core/src/subgraph/instance_manager.rs @@ -289,7 +289,7 @@ impl SubgraphInstanceManager { // Allow for infinite retries for subgraph definition files. let link_resolver = Arc::from( self.link_resolver - .for_deployment(deployment.hash.clone()) + .for_manifest(&deployment.hash.to_string()) .map_err(SubgraphRegistrarError::Unknown)? 
.with_retries(), ); diff --git a/core/src/subgraph/provider.rs b/core/src/subgraph/provider.rs index d566389fe27..9ad50f43942 100644 --- a/core/src/subgraph/provider.rs +++ b/core/src/subgraph/provider.rs @@ -88,7 +88,7 @@ impl SubgraphAssignmentProviderTrait for SubgraphAss let link_resolver = self .link_resolver - .for_deployment(loc.hash.clone()) + .for_manifest(&loc.hash.to_string()) .map_err(SubgraphAssignmentProviderError::ResolveError)?; let file_bytes = link_resolver diff --git a/core/src/subgraph/registrar.rs b/core/src/subgraph/registrar.rs index 9cda70572ba..fa0c31390e0 100644 --- a/core/src/subgraph/registrar.rs +++ b/core/src/subgraph/registrar.rs @@ -289,7 +289,7 @@ where let resolver: Arc = Arc::from( self.resolver - .for_deployment(hash.clone()) + .for_manifest(&hash.to_string()) .map_err(SubgraphRegistrarError::Unknown)?, ); diff --git a/graph/src/components/link_resolver/file.rs b/graph/src/components/link_resolver/file.rs index 4a347eb770f..3d78bb9244d 100644 --- a/graph/src/components/link_resolver/file.rs +++ b/graph/src/components/link_resolver/file.rs @@ -7,7 +7,7 @@ use async_trait::async_trait; use slog::Logger; use crate::data::subgraph::Link; -use crate::prelude::{DeploymentHash, Error, JsonValueStream, LinkResolver as LinkResolverTrait}; +use crate::prelude::{Error, JsonValueStream, LinkResolver as LinkResolverTrait}; #[derive(Clone, Debug)] pub struct FileLinkResolver { @@ -70,20 +70,19 @@ impl FileLinkResolver { /// It will set the base directory to the parent directory of the manifest path /// This is required because paths mentioned in the subgraph manifest are relative paths /// and we need a new resolver with the right base directory for the specific subgraph - fn clone_for_deployment(&self, deployment: DeploymentHash) -> Result { + fn clone_for_manifest(&self, manifest_path_str: &str) -> Result { let mut resolver = self.clone(); - let deployment_str = deployment.to_string(); - // Create a path to the manifest based on the current 
resolver's // base directory or default to using the deployment string as path // If the deployment string is an alias, use the aliased path - let manifest_path = if let Some(aliased) = self.aliases.get(&deployment_str) { + let manifest_path = if let Some(aliased) = self.aliases.get(&manifest_path_str.to_string()) + { aliased.clone() } else { match &resolver.base_dir { - Some(dir) => dir.join(&deployment_str), - None => PathBuf::from(deployment_str), + Some(dir) => dir.join(&manifest_path_str), + None => PathBuf::from(manifest_path_str), } }; @@ -142,11 +141,8 @@ impl LinkResolverTrait for FileLinkResolver { } } - fn for_deployment( - &self, - deployment: DeploymentHash, - ) -> Result, Error> { - Ok(Box::new(self.clone_for_deployment(deployment)?)) + fn for_manifest(&self, manifest_path: &str) -> Result, Error> { + Ok(Box::new(self.clone_for_manifest(manifest_path)?)) } async fn get_block(&self, _logger: &Logger, _link: &Link) -> Result, Error> { @@ -290,8 +286,7 @@ mod tests { assert_eq!(result2, test_content2); // Test that the alias works in for_deployment as well - let deployment = DeploymentHash::new("deployment-id").unwrap(); - let deployment_resolver = resolver.clone_for_deployment(deployment).unwrap(); + let deployment_resolver = resolver.clone_for_manifest("deployment-id").unwrap(); let expected_dir = test_file1_path.parent().unwrap(); let deployment_base_dir = deployment_resolver.base_dir.clone().unwrap(); diff --git a/graph/src/components/link_resolver/ipfs.rs b/graph/src/components/link_resolver/ipfs.rs index 1897d781c3c..d81944ab70f 100644 --- a/graph/src/components/link_resolver/ipfs.rs +++ b/graph/src/components/link_resolver/ipfs.rs @@ -74,10 +74,7 @@ impl LinkResolverTrait for IpfsResolver { Box::new(s) } - fn for_deployment( - &self, - _deployment: DeploymentHash, - ) -> Result, Error> { + fn for_manifest(&self, _manifest_path: &str) -> Result, Error> { Ok(Box::new(self.cheap_clone())) } diff --git a/graph/src/components/link_resolver/mod.rs 
b/graph/src/components/link_resolver/mod.rs index 05728bbcc29..4788a9bd51f 100644 --- a/graph/src/components/link_resolver/mod.rs +++ b/graph/src/components/link_resolver/mod.rs @@ -3,7 +3,7 @@ use std::time::Duration; use slog::Logger; use crate::data::subgraph::Link; -use crate::prelude::{DeploymentHash, Error}; +use crate::prelude::Error; use std::fmt::Debug; mod arweave; @@ -30,12 +30,19 @@ pub trait LinkResolver: Send + Sync + 'static + Debug { /// Fetches the IPLD block contents as bytes. async fn get_block(&self, logger: &Logger, link: &Link) -> Result, Error>; - /// Creates a new resolver that is scoped to a specific subgraph - /// This is used by FileLinkResolver to create a new resolver for a specific subgraph - /// For other resolvers, this method will simply return the current resolver - /// This is required because paths mentioned in the subgraph manifest are relative paths - /// and we need a new resolver with the right base directory for the specific subgraph - fn for_deployment(&self, deployment: DeploymentHash) -> Result, Error>; + /// Creates a new resolver scoped to a specific subgraph manifest. + /// + /// For FileLinkResolver, this sets the base directory to the manifest's parent directory. + /// Note the manifest here is the manifest in the build directory, not the manifest in the source directory + /// to properly resolve relative paths referenced in the manifest (schema, mappings, etc.). + /// For other resolvers (IPFS/Arweave), this simply returns a clone since they use + /// absolute content identifiers. + /// + /// The `manifest_path` parameter can be a filesystem path or an alias. Aliases are used + /// in development environments (via `gnd --sources`) to map user-defined + /// aliases to actual subgraph paths, enabling local development with file-based + /// subgraphs that reference each other. 
+ fn for_manifest(&self, manifest_path: &str) -> Result, Error>; /// Read the contents of `link` and deserialize them into a stream of JSON /// values. The values must each be on a single line; newlines are significant diff --git a/graph/src/data_source/subgraph.rs b/graph/src/data_source/subgraph.rs index 3170754d499..b1b83a5137b 100644 --- a/graph/src/data_source/subgraph.rs +++ b/graph/src/data_source/subgraph.rs @@ -260,7 +260,7 @@ impl UnresolvedDataSource { logger: &Logger, ) -> Result>, Error> { let resolver: Arc = - Arc::from(resolver.for_deployment(self.source.address.clone())?); + Arc::from(resolver.for_manifest(&self.source.address.to_string())?); let source_raw = resolver .cat(logger, &self.source.address.to_ipfs_link()) .await @@ -284,7 +284,7 @@ impl UnresolvedDataSource { ))?; let resolver: Arc = - Arc::from(resolver.for_deployment(self.source.address.clone())?); + Arc::from(resolver.for_manifest(&self.source.address.to_string())?); source_manifest .resolve(&resolver, logger, LATEST_VERSION.clone()) .await diff --git a/store/test-store/tests/chain/ethereum/manifest.rs b/store/test-store/tests/chain/ethereum/manifest.rs index 8b888c13da4..084398502bb 100644 --- a/store/test-store/tests/chain/ethereum/manifest.rs +++ b/store/test-store/tests/chain/ethereum/manifest.rs @@ -91,9 +91,9 @@ impl LinkResolverTrait for TextResolver { Box::new(self.clone()) } - fn for_deployment( + fn for_manifest( &self, - _deployment: DeploymentHash, + _manifest_path: &str, ) -> Result, anyhow::Error> { Ok(Box::new(self.clone())) }