diff --git a/eth-node/bridge/geth/waku.go b/eth-node/bridge/geth/waku.go index 24c6ad5e5..5cd81ea0b 100644 --- a/eth-node/bridge/geth/waku.go +++ b/eth-node/bridge/geth/waku.go @@ -1,6 +1,7 @@ package gethbridge import ( + "context" "crypto/ecdsa" "errors" "time" @@ -254,7 +255,7 @@ func (w *gethWakuWrapper) MarkP2PMessageAsProcessed(hash common.Hash) { w.waku.MarkP2PMessageAsProcessed(hash) } -func (w *gethWakuWrapper) RequestStoreMessages(peerID []byte, r types.MessagesRequest) (*types.StoreRequestCursor, error) { +func (w *gethWakuWrapper) RequestStoreMessages(ctx context.Context, peerID []byte, r types.MessagesRequest) (*types.StoreRequestCursor, error) { return nil, errors.New("not implemented") } func (w *gethWakuWrapper) ConnectionChanged(_ connection.State) {} diff --git a/eth-node/bridge/geth/wakuv2.go b/eth-node/bridge/geth/wakuv2.go index ba04b6fe5..e5cfbd931 100644 --- a/eth-node/bridge/geth/wakuv2.go +++ b/eth-node/bridge/geth/wakuv2.go @@ -1,6 +1,7 @@ package gethbridge import ( + "context" "crypto/ecdsa" "errors" "time" @@ -178,7 +179,7 @@ func (w *gethWakuV2Wrapper) SendMessagesRequest(peerID []byte, r types.MessagesR return errors.New("DEPRECATED") } -func (w *gethWakuV2Wrapper) RequestStoreMessages(peerID []byte, r types.MessagesRequest) (*types.StoreRequestCursor, error) { +func (w *gethWakuV2Wrapper) RequestStoreMessages(ctx context.Context, peerID []byte, r types.MessagesRequest) (*types.StoreRequestCursor, error) { var options []store.HistoryRequestOption peer, err := peer.Decode(string(peerID)) @@ -203,7 +204,7 @@ func (w *gethWakuV2Wrapper) RequestStoreMessages(peerID []byte, r types.Messages topics = append(topics, wakucommon.BytesToTopic(topic)) } - pbCursor, err := w.waku.Query(peer, topics, uint64(r.From), uint64(r.To), options) + pbCursor, err := w.waku.Query(ctx, peer, topics, uint64(r.From), uint64(r.To), options) if err != nil { return nil, err } diff --git a/eth-node/types/waku.go b/eth-node/types/waku.go index e6a4bf6a9..3f478dba1 100644 --- a/eth-node/types/waku.go +++ b/eth-node/types/waku.go @@ -1,6 +1,7 @@ package types import ( + "context" "crypto/ecdsa" "sync" "time" @@ -143,7 +144,7 @@ type Waku interface { SendMessagesRequest(peerID []byte, request MessagesRequest) error // RequestStoreMessages uses the WAKU2-STORE protocol to request historic messages - RequestStoreMessages(peerID []byte, request MessagesRequest) (*StoreRequestCursor, error) + RequestStoreMessages(ctx context.Context, peerID []byte, request MessagesRequest) (*StoreRequestCursor, error) // ProcessingP2PMessages indicates whether there are in-flight p2p messages ProcessingP2PMessages() bool diff --git a/protocol/transport/transport.go b/protocol/transport/transport.go index b4e2bcf76..ea7bfa558 100644 --- a/protocol/transport/transport.go +++ b/protocol/transport/transport.go @@ -492,7 +492,7 @@ func (t *Transport) createMessagesRequestV2( }) go func() { - storeCursor, err = t.waku.RequestStoreMessages(peerID, r) + storeCursor, err = t.waku.RequestStoreMessages(ctx, peerID, r) resultCh <- struct { storeCursor *types.StoreRequestCursor err error @@ -507,7 +507,7 @@ func (t *Transport) createMessagesRequestV2( } } else { go func() { - _, err = t.waku.RequestStoreMessages(peerID, r) + _, err = t.waku.RequestStoreMessages(ctx, peerID, r) if err != nil { t.logger.Error("failed to request store messages", zap.Error(err)) } diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 47455f6d0..1a76fd653 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -1118,7 +1118,7 @@ func (w *Waku) Send(msg *pb.WakuMessage) ([]byte, error) { return hash, nil } -func (w *Waku) Query(peerID peer.ID, topics []common.TopicType, from uint64, to uint64, opts []store.HistoryRequestOption) (cursor *pb.Index, err error) { +func (w *Waku) Query(ctx context.Context, peerID peer.ID, topics []common.TopicType, from uint64, to uint64, opts []store.HistoryRequestOption) (cursor *pb.Index, err error) { strTopics := make([]string, len(topics)) for i, t := range topics { strTopics[i] = t.ContentTopic() @@ -1136,15 +1136,12 @@ func (w *Waku) Query(peerID peer.ID, topics []common.TopicType, from uint64, to Topic: relay.DefaultWakuTopic, } - ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) - defer cancel() - result, err := w.node.Store().Query(ctx, query, opts...) if err != nil && errors.Is(err, store.ErrEmptyResponse) { // No messages return nil, nil } else if err != nil { - w.logger.Error("error querying storenode", zap.String("peerID", peerID.String()), zap.Error(err)) + w.logger.Error("error querying storenode", zap.String("requestID", hexutil.Encode(requestID)), zap.String("peerID", peerID.String()), zap.Error(err)) signal.SendHistoricMessagesRequestFailed(requestID, peerID, err) return nil, err }