chore: make `createMessagesRequestV2` respect `mailserverRequestTimeout`

closes: #3053
This commit is contained in:
Patryk Osmaczko 2023-01-10 21:59:10 +01:00 committed by osmaczko
parent 490570c0ec
commit 186454528b
1 changed files with 32 additions and 4 deletions

View File

@ -476,16 +476,44 @@ func (t *Transport) createMessagesRequestV1(
} }
func (t *Transport) createMessagesRequestV2( func (t *Transport) createMessagesRequestV2(
ctx context.Context,
peerID []byte, peerID []byte,
from, to uint32, from, to uint32,
previousStoreCursor *types.StoreRequestCursor, previousStoreCursor *types.StoreRequestCursor,
topics []types.TopicType, topics []types.TopicType,
waitForResponse bool,
) (storeCursor *types.StoreRequestCursor, err error) { ) (storeCursor *types.StoreRequestCursor, err error) {
r := createMessagesRequest(from, to, nil, previousStoreCursor, topics) r := createMessagesRequest(from, to, nil, previousStoreCursor, topics)
if waitForResponse {
resultCh := make(chan struct {
storeCursor *types.StoreRequestCursor
err error
})
go func() {
storeCursor, err = t.waku.RequestStoreMessages(peerID, r) storeCursor, err = t.waku.RequestStoreMessages(peerID, r)
if err != nil { resultCh <- struct {
return storeCursor *types.StoreRequestCursor
err error
}{storeCursor, err}
}()
select {
case result := <-resultCh:
return result.storeCursor, result.err
case <-ctx.Done():
return nil, ctx.Err()
} }
} else {
go func() {
_, err = t.waku.RequestStoreMessages(peerID, r)
if err != nil {
t.logger.Error("failed to request store messages", zap.Error(err))
}
}()
}
return return
} }
@ -500,7 +528,7 @@ func (t *Transport) SendMessagesRequestForTopics(
) (cursor []byte, storeCursor *types.StoreRequestCursor, err error) { ) (cursor []byte, storeCursor *types.StoreRequestCursor, err error) {
switch t.waku.Version() { switch t.waku.Version() {
case 2: case 2:
storeCursor, err = t.createMessagesRequestV2(peerID, from, to, previousStoreCursor, topics) storeCursor, err = t.createMessagesRequestV2(ctx, peerID, from, to, previousStoreCursor, topics, waitForResponse)
case 1: case 1:
cursor, err = t.createMessagesRequestV1(ctx, peerID, from, to, previousCursor, topics, waitForResponse) cursor, err = t.createMessagesRequestV1(ctx, peerID, from, to, previousCursor, topics, waitForResponse)
default: default: