Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions graph/src/env/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,16 @@ impl EnvVars {
let mapping_handlers = InnerMappingHandlers::init_from_env()?.into();
let store = InnerStore::init_from_env()?.into();

// The default reorganization (reorg) threshold is set to 250.
// For testing purposes, we need to set this threshold to 0 because:
// 1. Many tests involve reverting blocks.
// 2. Blocks cannot be reverted below the reorg threshold.
// Therefore, during tests, we want to set the reorg threshold to 0.
let reorg_threshold =
inner
.reorg_threshold
.unwrap_or_else(|| if cfg!(debug_assertions) { 0 } else { 250 });
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh wow .. I didn't know you could use cfg! as a normal macro .. nice

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thought: will tests still fail if reorg_threshold is set to 1? It seems a little weird to use 0 here, but if that's what's needed to let tests pass, I am ok with it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately no, there are some tests that fail when we set it to 1.


Ok(Self {
graphql,
mappings: mapping_handlers,
Expand Down Expand Up @@ -262,15 +272,13 @@ impl EnvVars {
external_http_base_url: inner.external_http_base_url,
external_ws_base_url: inner.external_ws_base_url,
static_filters_threshold: inner.static_filters_threshold,
reorg_threshold: inner.reorg_threshold,
reorg_threshold: reorg_threshold,
ingestor_polling_interval: Duration::from_millis(inner.ingestor_polling_interval),
subgraph_settings: inner.subgraph_settings,
prefer_substreams_block_streams: inner.prefer_substreams_block_streams,
enable_dips_metrics: inner.enable_dips_metrics.0,
history_blocks_override: inner.history_blocks_override,
min_history_blocks: inner
.min_history_blocks
.unwrap_or(2 * inner.reorg_threshold),
min_history_blocks: inner.min_history_blocks.unwrap_or(2 * reorg_threshold),
dips_metrics_object_store_url: inner.dips_metrics_object_store_url,
})
}
Expand Down Expand Up @@ -392,8 +400,8 @@ struct Inner {
#[envconfig(from = "GRAPH_STATIC_FILTERS_THRESHOLD", default = "10000")]
static_filters_threshold: usize,
// JSON-RPC specific.
#[envconfig(from = "ETHEREUM_REORG_THRESHOLD", default = "250")]
reorg_threshold: BlockNumber,
#[envconfig(from = "ETHEREUM_REORG_THRESHOLD")]
reorg_threshold: Option<BlockNumber>,
#[envconfig(from = "ETHEREUM_POLLING_INTERVAL", default = "1000")]
ingestor_polling_interval: u64,
#[envconfig(from = "GRAPH_EXPERIMENTAL_SUBGRAPH_SETTINGS")]
Expand Down
30 changes: 27 additions & 3 deletions node/src/manager/commands/rewind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@ use std::thread;
use std::time::Duration;
use std::{collections::HashSet, convert::TryFrom};

use crate::manager::commands::assign::pause_or_resume;
use crate::manager::deployment::{Deployment, DeploymentSearch};
use graph::anyhow::bail;
use graph::components::store::{BlockStore as _, ChainStore as _};
use graph::env::ENV_VARS;
use graph::prelude::{anyhow, BlockNumber, BlockPtr};
use graph_store_postgres::command_support::catalog::{self as store_catalog};
use graph_store_postgres::{connection_pool::ConnectionPool, Store};
use graph_store_postgres::{BlockStore, NotificationSender};

use crate::manager::commands::assign::pause_or_resume;
use crate::manager::deployment::{Deployment, DeploymentSearch};

async fn block_ptr(
store: Arc<BlockStore>,
searches: &[DeploymentSearch],
Expand Down Expand Up @@ -71,6 +72,8 @@ pub async fn run(
if !start_block && (block_hash.is_none() || block_number.is_none()) {
bail!("--block-hash and --block-number must be specified when --start-block is not set");
}
let pconn = primary.get()?;
let mut conn = store_catalog::Connection::new(pconn);

let subgraph_store = store.subgraph_store();
let block_store = store.block_store();
Expand Down Expand Up @@ -108,6 +111,27 @@ pub async fn run(
)
};

println!("Checking if its safe to rewind deployments");
for deployment in &deployments {
let locator = &deployment.locator();
let site = conn
.locate_site(locator.clone())?
.ok_or_else(|| anyhow!("failed to locate site for {locator}"))?;
let deployment_store = subgraph_store.for_site(&site)?;
let deployment_details = deployment_store.deployment_details_for_id(locator)?;
let block_number_to = block_ptr_to.as_ref().map(|b| b.number).unwrap_or(0);

if block_number_to < deployment_details.earliest_block_number + ENV_VARS.reorg_threshold {
bail!(
"The block number {} is not safe to rewind to for deployment {}. The earliest block number of this deployment is {}. You can only safely rewind to block number {}",
block_ptr_to.as_ref().map(|b| b.number).unwrap_or(0),
locator,
deployment_details.earliest_block_number,
deployment_details.earliest_block_number + ENV_VARS.reorg_threshold
);
}
}

println!("Pausing deployments");
for deployment in &deployments {
pause_or_resume(primary.clone(), &sender, &deployment.locator(), true)?;
Expand Down
39 changes: 26 additions & 13 deletions store/postgres/src/deployment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use diesel::{
sql_types::{Nullable, Text},
};
use graph::{
blockchain::block_stream::FirehoseCursor, data::subgraph::schema::SubgraphError,
blockchain::block_stream::FirehoseCursor, data::subgraph::schema::SubgraphError, env::ENV_VARS,
schema::EntityType,
};
use graph::{
Expand Down Expand Up @@ -539,18 +539,31 @@ pub fn revert_block_ptr(
// Work around a Diesel issue with serializing BigDecimals to numeric
let number = format!("{}::numeric", ptr.number);

update(d::table.filter(d::deployment.eq(id.as_str())))
.set((
d::latest_ethereum_block_number.eq(sql(&number)),
d::latest_ethereum_block_hash.eq(ptr.hash_slice()),
d::firehose_cursor.eq(firehose_cursor.as_ref()),
d::reorg_count.eq(d::reorg_count + 1),
d::current_reorg_depth.eq(d::current_reorg_depth + 1),
d::max_reorg_depth.eq(sql("greatest(current_reorg_depth + 1, max_reorg_depth)")),
))
.execute(conn)
.map(|_| ())
.map_err(|e| e.into())
let affected_rows = update(
d::table
.filter(d::deployment.eq(id.as_str()))
.filter(d::earliest_block_number.le(ptr.number - ENV_VARS.reorg_threshold)),
)
.set((
d::latest_ethereum_block_number.eq(sql(&number)),
d::latest_ethereum_block_hash.eq(ptr.hash_slice()),
d::firehose_cursor.eq(firehose_cursor.as_ref()),
d::reorg_count.eq(d::reorg_count + 1),
d::current_reorg_depth.eq(d::current_reorg_depth + 1),
d::max_reorg_depth.eq(sql("greatest(current_reorg_depth + 1, max_reorg_depth)")),
))
.execute(conn)?;

match affected_rows {
1 => Ok(()),
0 => Err(StoreError::Unknown(anyhow!(
"No rows affected. This could be due to an attempt to revert beyond earliest_block + reorg_threshold",
))),
_ => Err(StoreError::Unknown(anyhow!(
"Expected to update 1 row, but {} rows were affected",
affected_rows
))),
}
}

pub fn block_ptr(
Expand Down
15 changes: 13 additions & 2 deletions store/postgres/src/deployment_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use graph::blockchain::block_stream::FirehoseCursor;
use graph::blockchain::BlockTime;
use graph::components::store::write::RowGroup;
use graph::components::store::{
Batch, DerivedEntityQuery, PrunePhase, PruneReporter, PruneRequest, PruningStrategy,
QueryPermit, StoredDynamicDataSource, VersionStats,
Batch, DeploymentLocator, DerivedEntityQuery, PrunePhase, PruneReporter, PruneRequest,
PruningStrategy, QueryPermit, StoredDynamicDataSource, VersionStats,
};
use graph::components::versions::VERSIONS;
use graph::data::query::Trace;
Expand Down Expand Up @@ -527,6 +527,17 @@ impl DeploymentStore {
conn.transaction(|conn| -> Result<_, StoreError> { detail::deployment_details(conn, ids) })
}

/// Look up the persisted `DeploymentDetail` row for the deployment that
/// `locator` points to.
///
/// Mirrors the sibling `deployment_details` helper above, but resolves a
/// single deployment instead of a batch.
pub fn deployment_details_for_id(
    &self,
    locator: &DeploymentLocator,
) -> Result<DeploymentDetail, StoreError> {
    // Convert the locator into the numeric id keyed by the
    // `subgraph_deployment` table.
    let deployment_id = DeploymentId::from(locator.clone());
    let conn = &mut *self.get_conn()?;
    conn.transaction(|conn| -> Result<_, StoreError> {
        detail::deployment_details_for_id(conn, &deployment_id)
    })
}

pub(crate) fn deployment_statuses(
&self,
sites: &[Arc<Site>],
Expand Down
14 changes: 13 additions & 1 deletion store/postgres/src/detail.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub struct DeploymentDetail {
fatal_error: Option<String>,
non_fatal_errors: Vec<String>,
/// The earliest block for which we have history
earliest_block_number: i32,
pub earliest_block_number: i32,
pub latest_ethereum_block_hash: Option<Bytes>,
pub latest_ethereum_block_number: Option<BigDecimal>,
last_healthy_ethereum_block_hash: Option<Bytes>,
Expand Down Expand Up @@ -268,6 +268,18 @@ pub(crate) fn deployment_details(
Ok(details)
}

/// Load the `DeploymentDetail` row for a single `deployment`.
///
/// Errors from Diesel (including a missing row) are converted into
/// `StoreError` via its `From<diesel::result::Error>` impl.
pub(crate) fn deployment_details_for_id(
    conn: &mut PgConnection,
    deployment: &DeploymentId,
) -> Result<DeploymentDetail, StoreError> {
    use subgraph_deployment as d;

    let detail = d::table
        .filter(d::id.eq(&deployment))
        .first::<DeploymentDetail>(conn)?;
    Ok(detail)
}

pub(crate) fn deployment_statuses(
conn: &mut PgConnection,
sites: &[Arc<Site>],
Expand Down
4 changes: 4 additions & 0 deletions store/postgres/src/subgraph_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,10 @@ impl SubgraphStore {
pub fn notification_sender(&self) -> Arc<NotificationSender> {
self.sender.clone()
}

/// Return the `DeploymentStore` that hosts the given `site`.
///
/// Pure delegation to the inner store's `for_site`; exposed publicly so
/// code outside this crate (e.g. the rewind graphman command) can resolve
/// the store a deployment lives in.
pub fn for_site(&self, site: &Site) -> Result<&Arc<DeploymentStore>, StoreError> {
    self.inner.for_site(site)
}
}

impl std::ops::Deref for SubgraphStore {
Expand Down