9 changes: 6 additions & 3 deletions block/internal/da/client.go
@@ -54,7 +54,7 @@ type Config struct {
// NewClient creates a new DA client with pre-calculated namespace bytes.
func NewClient(cfg Config) *client {
if cfg.DefaultTimeout == 0 {
cfg.DefaultTimeout = 30 * time.Second
cfg.DefaultTimeout = 60 * time.Second
}
if cfg.RetrieveBatchSize <= 0 {
cfg.RetrieveBatchSize = defaultRetrieveBatchSize
@@ -72,7 +72,10 @@ func NewClient(cfg Config) *client {

// Submit submits blobs to the DA layer with the specified options.
func (c *client) Submit(ctx context.Context, data [][]byte, gasPrice float64, namespace []byte, options []byte) coreda.ResultSubmit {
ids, err := c.da.SubmitWithOptions(ctx, data, gasPrice, namespace, options)
submitCtx, cancel := context.WithTimeout(ctx, c.defaultTimeout)
defer cancel()

ids, err := c.da.SubmitWithOptions(submitCtx, data, gasPrice, namespace, options)
Comment on lines +75 to +78

Contributor comment (severity: high):
While adding a timeout here is a great improvement, I've noticed two related points:

  1. Error handling: the error handling in this function does not specifically handle context.DeadlineExceeded, which c.da.SubmitWithOptions will return when the new timeout fires. As a result, a timeout is reported as a generic error (coreda.StatusError) instead of the more specific coreda.StatusContextDeadline. Please consider extending the switch statement that maps errors to statuses with a case for context.DeadlineExceeded:

    case errors.Is(err, context.DeadlineExceeded):
        status = coreda.StatusContextDeadline
  2. Missing test: the timeout mechanism for Submit is not tested. TestClient_Submit has a "context deadline error" case, but it only checks the mapping of a pre-canned coreda.ErrContextDeadline and does not verify that the context.WithTimeout works as expected. A dedicated test, similar to TestClient_Retrieve_Timeout, should be added to client_test.go to ensure the timeout in Submit is effective; see the sketch below.
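
For the second point, here is a minimal sketch of what such a test could look like. It assumes client_test.go already imports context, time, testmocks, mock, assert, and coreda, that Config wires the mock DA in through a DA field, and that ResultSubmit exposes the mapped status as res.Code — none of this is confirmed by the diff, so adjust the names to the actual helpers used in client_test.go:

    // TestClient_Submit_Timeout is a sketch of a test verifying that Submit's
    // internal context.WithTimeout actually bounds the call (hypothetical
    // names: Config.DA and ResultSubmit.Code).
    func TestClient_Submit_Timeout(t *testing.T) {
        mockDA := testmocks.NewMockDA(t)

        // Block SubmitWithOptions until the per-call context created inside
        // Submit is cancelled, then surface the cancellation error.
        mockDA.EXPECT().
            SubmitWithOptions(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
            Run(func(ctx context.Context, data [][]byte, gasPrice float64, namespace []byte, options []byte) {
                <-ctx.Done()
            }).
            Return(nil, context.DeadlineExceeded)

        c := NewClient(Config{
            DA:             mockDA,                // assumed field name
            DefaultTimeout: 50 * time.Millisecond, // short timeout keeps the test fast
        })

        start := time.Now()
        res := c.Submit(context.Background(), [][]byte{[]byte("blob")}, 1.0, []byte("ns"), nil)
        elapsed := time.Since(start)

        // The call should return at roughly DefaultTimeout rather than hanging on
        // the parent context; with the case from point 1 in place, the timeout
        // maps to StatusContextDeadline.
        assert.Less(t, elapsed, time.Second)
        assert.Equal(t, coreda.StatusContextDeadline, res.Code) // assumed result field
    }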


// calculate blob size
var blobSize uint64
@@ -160,9 +163,9 @@ func (c *client) Submit(ctx context.Context, data [][]byte, gasPrice float64, namespace []byte, options []byte) coreda.ResultSubmit {

// Retrieve retrieves blobs from the DA layer at the specified height and namespace.
func (c *client) Retrieve(ctx context.Context, height uint64, namespace []byte) coreda.ResultRetrieve {
// 1. Get IDs
getIDsCtx, cancel := context.WithTimeout(ctx, c.defaultTimeout)
defer cancel()

idsResult, err := c.da.GetIDs(getIDsCtx, height, namespace)
if err != nil {
// Handle specific "not found" error
2 changes: 1 addition & 1 deletion block/internal/da/client_test.go
@@ -106,7 +106,7 @@ func TestNewClient(t *testing.T) {

expectedTimeout := tt.cfg.DefaultTimeout
if expectedTimeout == 0 {
expectedTimeout = 30 * time.Second
expectedTimeout = 60 * time.Second
}
assert.Equal(t, client.defaultTimeout, expectedTimeout)
})
5 changes: 1 addition & 4 deletions block/internal/submitting/da_submitter.go
@@ -22,10 +22,7 @@ import (
"github.com/evstack/ev-node/types"
)

const (
submissionTimeout = 60 * time.Second
initialBackoff = 100 * time.Millisecond
)
const initialBackoff = 100 * time.Millisecond

// retryPolicy defines clamped bounds for retries and backoff.
type retryPolicy struct {
31 changes: 0 additions & 31 deletions block/internal/syncing/da_retriever_test.go
@@ -110,38 +110,7 @@ func TestDARetriever_RetrieveFromDA_HeightFromFuture(t *testing.T) {
assert.Nil(t, events)
}

func TestDARetriever_RetrieveFromDA_Timeout(t *testing.T) {
t.Skip("Skipping flaky timeout test - timing is now controlled by DA client")

mockDA := testmocks.NewMockDA(t)

// Mock GetIDs to hang longer than the timeout
mockDA.EXPECT().GetIDs(mock.Anything, mock.Anything, mock.Anything).
Run(func(ctx context.Context, height uint64, namespace []byte) {
<-ctx.Done()
}).
Return(nil, context.DeadlineExceeded).Maybe()

r := newTestDARetriever(t, mockDA, config.DefaultConfig(), genesis.Genesis{})

start := time.Now()
events, err := r.RetrieveFromDA(context.Background(), 42)
duration := time.Since(start)

// Verify error is returned and contains deadline exceeded information
require.Error(t, err)
assert.Contains(t, err.Error(), "DA retrieval failed")
assert.Contains(t, err.Error(), "context deadline exceeded")
assert.Len(t, events, 0)

// Verify timeout occurred approximately at expected time (with some tolerance)
// DA client has a 30-second default timeout
assert.Greater(t, duration, 29*time.Second, "should timeout after approximately 30 seconds")
assert.Less(t, duration, 35*time.Second, "should not take much longer than timeout")
}

func TestDARetriever_RetrieveFromDA_TimeoutFast(t *testing.T) {

mockDA := testmocks.NewMockDA(t)

// Mock GetIDs to immediately return context deadline exceeded
6 changes: 3 additions & 3 deletions pkg/config/config.go
@@ -166,8 +166,8 @@ type DAConfig struct {
BlockTime DurationWrapper `mapstructure:"block_time" yaml:"block_time" comment:"Average block time of the DA chain (duration). Determines frequency of DA layer syncing, maximum backoff time for retries, and is multiplied by MempoolTTL to calculate transaction expiration. Examples: \"15s\", \"30s\", \"1m\", \"2m30s\", \"10m\"."`
MempoolTTL uint64 `mapstructure:"mempool_ttl" yaml:"mempool_ttl" comment:"Number of DA blocks after which a transaction is considered expired and dropped from the mempool. Controls retry backoff timing."`
MaxSubmitAttempts int `mapstructure:"max_submit_attempts" yaml:"max_submit_attempts" comment:"Maximum number of attempts to submit data to the DA layer before giving up. Higher values provide more resilience but can delay error reporting."`
RetrieveBatchSize int `mapstructure:"retrieve_batch_size" yaml:"retrieve_batch_size" comment:"Number of IDs to request per DA Get call when retrieving blobs. Smaller batches lower per-request latency; larger batches reduce the number of RPC round trips. Default: 100."`
RequestTimeout DurationWrapper `mapstructure:"request_timeout" yaml:"request_timeout" comment:"Per-request timeout applied to DA GetIDs/Get calls when retrieving blobs. Larger values tolerate slower DA nodes at the cost of waiting longer before failing. Default: 30s."`
RetrieveBatchSize int `mapstructure:"retrieve_batch_size" yaml:"retrieve_batch_size" comment:"Number of IDs to request per DA Get call when retrieving blobs. Smaller batches lower per-request latency; larger batches reduce the number of RPC round trips."`
RequestTimeout DurationWrapper `mapstructure:"request_timeout" yaml:"request_timeout" comment:"Per-request timeout applied to DA interactions. Larger values tolerate slower DA nodes at the cost of waiting longer before failing."`
}

// GetNamespace returns the namespace for header submissions.
@@ -327,7 +327,7 @@ func AddFlags(cmd *cobra.Command) {
cmd.Flags().Uint64(FlagDAMempoolTTL, def.DA.MempoolTTL, "number of DA blocks until transaction is dropped from the mempool")
cmd.Flags().Int(FlagDAMaxSubmitAttempts, def.DA.MaxSubmitAttempts, "maximum number of attempts to submit data to the DA layer before giving up")
cmd.Flags().Int(FlagDARetrieveBatchSize, def.DA.RetrieveBatchSize, "number of IDs to request per DA Get call when retrieving blobs")
cmd.Flags().Duration(FlagDARequestTimeout, def.DA.RequestTimeout.Duration, "per-request timeout when retrieving blobs from the DA layer")
cmd.Flags().Duration(FlagDARequestTimeout, def.DA.RequestTimeout.Duration, "per-request timeout when interacting with the DA layer")

// P2P configuration flags
cmd.Flags().String(FlagP2PListenAddress, def.P2P.ListenAddress, "P2P listen address (host:port)")
2 changes: 1 addition & 1 deletion pkg/config/config_test.go
@@ -27,7 +27,7 @@ func TestDefaultConfig(t *testing.T) {
assert.Equal(t, "", def.DA.SubmitOptions)
assert.NotEmpty(t, def.DA.Namespace)
assert.Equal(t, 100, def.DA.RetrieveBatchSize)
assert.Equal(t, 30*time.Second, def.DA.RequestTimeout.Duration)
assert.Equal(t, 60*time.Second, def.DA.RequestTimeout.Duration)
assert.Equal(t, 1*time.Second, def.Node.BlockTime.Duration)
assert.Equal(t, 6*time.Second, def.DA.BlockTime.Duration)
assert.Equal(t, uint64(0), def.DA.MempoolTTL)
2 changes: 1 addition & 1 deletion pkg/config/defaults.go
@@ -76,7 +76,7 @@ func DefaultConfig() Config {
Namespace: randString(10),
DataNamespace: "",
RetrieveBatchSize: 100,
RequestTimeout: DurationWrapper{30 * time.Second},
RequestTimeout: DurationWrapper{60 * time.Second},
},
Instrumentation: DefaultInstrumentationConfig(),
Log: LogConfig{