From 186454528ba030a793cdeec254272578a2c70f49 Mon Sep 17 00:00:00 2001 From: Patryk Osmaczko Date: Tue, 10 Jan 2023 21:59:10 +0100 Subject: [PATCH] chore: make `createMessagesRequestV2` respect `mailserverRequestTimeout` closes: #3053 --- protocol/transport/transport.go | 36 +++++++++++++++++++++++++++++---- 1 file changed, 32 insertions(+), 4 deletions(-) diff --git a/protocol/transport/transport.go b/protocol/transport/transport.go index 2eddfec3e..b4e2bcf76 100644 --- a/protocol/transport/transport.go +++ b/protocol/transport/transport.go @@ -476,16 +476,44 @@ func (t *Transport) createMessagesRequestV1( } func (t *Transport) createMessagesRequestV2( + ctx context.Context, peerID []byte, from, to uint32, previousStoreCursor *types.StoreRequestCursor, topics []types.TopicType, + waitForResponse bool, ) (storeCursor *types.StoreRequestCursor, err error) { r := createMessagesRequest(from, to, nil, previousStoreCursor, topics) - storeCursor, err = t.waku.RequestStoreMessages(peerID, r) - if err != nil { - return + + if waitForResponse { + resultCh := make(chan struct { + storeCursor *types.StoreRequestCursor + err error + }) + + go func() { + storeCursor, err = t.waku.RequestStoreMessages(peerID, r) + resultCh <- struct { + storeCursor *types.StoreRequestCursor + err error + }{storeCursor, err} + }() + + select { + case result := <-resultCh: + return result.storeCursor, result.err + case <-ctx.Done(): + return nil, ctx.Err() + } + } else { + go func() { + _, err = t.waku.RequestStoreMessages(peerID, r) + if err != nil { + t.logger.Error("failed to request store messages", zap.Error(err)) + } + }() } + return } @@ -500,7 +528,7 @@ func (t *Transport) SendMessagesRequestForTopics( ) (cursor []byte, storeCursor *types.StoreRequestCursor, err error) { switch t.waku.Version() { case 2: - storeCursor, err = t.createMessagesRequestV2(peerID, from, to, previousStoreCursor, topics) + storeCursor, err = t.createMessagesRequestV2(ctx, peerID, from, to, previousStoreCursor, topics, waitForResponse) case 1: cursor, err = t.createMessagesRequestV1(ctx, peerID, from, to, previousCursor, topics, waitForResponse) default: