fix: use parent ctx (of 30s instead of 20s) and log requestID in case of failures

This commit is contained in:
Richard Ramos 2023-01-26 12:29:00 -04:00 committed by RichΛrd
parent db74eaa0d9
commit 3553761cb5
5 changed files with 11 additions and 11 deletions

View File

@ -1,6 +1,7 @@
package gethbridge package gethbridge
import ( import (
"context"
"crypto/ecdsa" "crypto/ecdsa"
"errors" "errors"
"time" "time"
@ -254,7 +255,7 @@ func (w *gethWakuWrapper) MarkP2PMessageAsProcessed(hash common.Hash) {
w.waku.MarkP2PMessageAsProcessed(hash) w.waku.MarkP2PMessageAsProcessed(hash)
} }
func (w *gethWakuWrapper) RequestStoreMessages(peerID []byte, r types.MessagesRequest) (*types.StoreRequestCursor, error) { func (w *gethWakuWrapper) RequestStoreMessages(ctx context.Context, peerID []byte, r types.MessagesRequest) (*types.StoreRequestCursor, error) {
return nil, errors.New("not implemented") return nil, errors.New("not implemented")
} }
func (w *gethWakuWrapper) ConnectionChanged(_ connection.State) {} func (w *gethWakuWrapper) ConnectionChanged(_ connection.State) {}

View File

@ -1,6 +1,7 @@
package gethbridge package gethbridge
import ( import (
"context"
"crypto/ecdsa" "crypto/ecdsa"
"errors" "errors"
"time" "time"
@ -178,7 +179,7 @@ func (w *gethWakuV2Wrapper) SendMessagesRequest(peerID []byte, r types.MessagesR
return errors.New("DEPRECATED") return errors.New("DEPRECATED")
} }
func (w *gethWakuV2Wrapper) RequestStoreMessages(peerID []byte, r types.MessagesRequest) (*types.StoreRequestCursor, error) { func (w *gethWakuV2Wrapper) RequestStoreMessages(ctx context.Context, peerID []byte, r types.MessagesRequest) (*types.StoreRequestCursor, error) {
var options []store.HistoryRequestOption var options []store.HistoryRequestOption
peer, err := peer.Decode(string(peerID)) peer, err := peer.Decode(string(peerID))
@ -203,7 +204,7 @@ func (w *gethWakuV2Wrapper) RequestStoreMessages(peerID []byte, r types.Messages
topics = append(topics, wakucommon.BytesToTopic(topic)) topics = append(topics, wakucommon.BytesToTopic(topic))
} }
pbCursor, err := w.waku.Query(peer, topics, uint64(r.From), uint64(r.To), options) pbCursor, err := w.waku.Query(ctx, peer, topics, uint64(r.From), uint64(r.To), options)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -1,6 +1,7 @@
package types package types
import ( import (
"context"
"crypto/ecdsa" "crypto/ecdsa"
"sync" "sync"
"time" "time"
@ -143,7 +144,7 @@ type Waku interface {
SendMessagesRequest(peerID []byte, request MessagesRequest) error SendMessagesRequest(peerID []byte, request MessagesRequest) error
// RequestStoreMessages uses the WAKU2-STORE protocol to request historic messages // RequestStoreMessages uses the WAKU2-STORE protocol to request historic messages
RequestStoreMessages(peerID []byte, request MessagesRequest) (*StoreRequestCursor, error) RequestStoreMessages(ctx context.Context, peerID []byte, request MessagesRequest) (*StoreRequestCursor, error)
// ProcessingP2PMessages indicates whether there are in-flight p2p messages // ProcessingP2PMessages indicates whether there are in-flight p2p messages
ProcessingP2PMessages() bool ProcessingP2PMessages() bool

View File

@ -492,7 +492,7 @@ func (t *Transport) createMessagesRequestV2(
}) })
go func() { go func() {
storeCursor, err = t.waku.RequestStoreMessages(peerID, r) storeCursor, err = t.waku.RequestStoreMessages(ctx, peerID, r)
resultCh <- struct { resultCh <- struct {
storeCursor *types.StoreRequestCursor storeCursor *types.StoreRequestCursor
err error err error
@ -507,7 +507,7 @@ func (t *Transport) createMessagesRequestV2(
} }
} else { } else {
go func() { go func() {
_, err = t.waku.RequestStoreMessages(peerID, r) _, err = t.waku.RequestStoreMessages(ctx, peerID, r)
if err != nil { if err != nil {
t.logger.Error("failed to request store messages", zap.Error(err)) t.logger.Error("failed to request store messages", zap.Error(err))
} }

View File

@ -1118,7 +1118,7 @@ func (w *Waku) Send(msg *pb.WakuMessage) ([]byte, error) {
return hash, nil return hash, nil
} }
func (w *Waku) Query(peerID peer.ID, topics []common.TopicType, from uint64, to uint64, opts []store.HistoryRequestOption) (cursor *pb.Index, err error) { func (w *Waku) Query(ctx context.Context, peerID peer.ID, topics []common.TopicType, from uint64, to uint64, opts []store.HistoryRequestOption) (cursor *pb.Index, err error) {
strTopics := make([]string, len(topics)) strTopics := make([]string, len(topics))
for i, t := range topics { for i, t := range topics {
strTopics[i] = t.ContentTopic() strTopics[i] = t.ContentTopic()
@ -1136,15 +1136,12 @@ func (w *Waku) Query(peerID peer.ID, topics []common.TopicType, from uint64, to
Topic: relay.DefaultWakuTopic, Topic: relay.DefaultWakuTopic,
} }
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
result, err := w.node.Store().Query(ctx, query, opts...) result, err := w.node.Store().Query(ctx, query, opts...)
if err != nil && errors.Is(err, store.ErrEmptyResponse) { if err != nil && errors.Is(err, store.ErrEmptyResponse) {
// No messages // No messages
return nil, nil return nil, nil
} else if err != nil { } else if err != nil {
w.logger.Error("error querying storenode", zap.String("peerID", peerID.String()), zap.Error(err)) w.logger.Error("error querying storenode", zap.String("requestID", hexutil.Encode(requestID)), zap.String("peerID", peerID.String()), zap.Error(err))
signal.SendHistoricMessagesRequestFailed(requestID, peerID, err) signal.SendHistoricMessagesRequestFailed(requestID, peerID, err)
return nil, err return nil, err
} }