test_: disable store request for wakuv1

This commit is contained in:
Richard Ramos 2024-08-20 16:27:42 -04:00
parent 629291dbf7
commit f0ce9a712e
No known key found for this signature in database
GPG Key ID: 1CE87DB518195760

View File

@ -1,12 +1,10 @@
package transport
import (
"bytes"
"context"
"crypto/ecdsa"
"database/sql"
"encoding/hex"
"fmt"
"sync"
"time"
@ -463,40 +461,6 @@ func (t *Transport) Peers() types.PeerStats {
return t.waku.Peers()
}
func (t *Transport) createMessagesRequestV1(
ctx context.Context,
peerID []byte,
from, to uint32,
previousCursor []byte,
topics []types.TopicType,
waitForResponse bool,
) (cursor []byte, err error) {
r := createMessagesRequest(from, to, previousCursor, nil, "", topics, 1000)
events := make(chan types.EnvelopeEvent, 10)
sub := t.waku.SubscribeEnvelopeEvents(events)
defer sub.Unsubscribe()
err = t.waku.SendMessagesRequest(peerID, r)
if err != nil {
return
}
if !waitForResponse {
return
}
var resp *types.MailServerResponse
resp, err = t.waitForRequestCompleted(ctx, r.ID, events)
if err == nil && resp != nil && resp.Error != nil {
err = resp.Error
} else if err == nil && resp != nil {
cursor = resp.Cursor
}
return
}
func (t *Transport) createMessagesRequestV2(
ctx context.Context,
peerID []byte,
@ -556,14 +520,7 @@ func (t *Transport) SendMessagesRequestForTopics(
waitForResponse bool,
processEnvelopes bool,
) (cursor []byte, storeCursor types.StoreRequestCursor, envelopesCount int, err error) {
switch t.waku.Version() {
case 2:
storeCursor, envelopesCount, err = t.createMessagesRequestV2(ctx, peerID, from, to, previousStoreCursor, pubsubTopic, contentTopics, limit, waitForResponse, processEnvelopes)
case 1:
cursor, err = t.createMessagesRequestV1(ctx, peerID, from, to, previousCursor, contentTopics, waitForResponse)
default:
err = fmt.Errorf("unsupported version %d", t.waku.Version())
}
storeCursor, envelopesCount, err = t.createMessagesRequestV2(ctx, peerID, from, to, previousStoreCursor, pubsubTopic, contentTopics, limit, waitForResponse, processEnvelopes)
return
}
@ -587,26 +544,6 @@ func createMessagesRequest(from, to uint32, cursor []byte, storeCursor types.Sto
}
}
func (t *Transport) waitForRequestCompleted(ctx context.Context, requestID []byte, events chan types.EnvelopeEvent) (*types.MailServerResponse, error) {
for {
select {
case ev := <-events:
if !bytes.Equal(ev.Hash.Bytes(), requestID) {
continue
}
if ev.Event != types.EventMailServerRequestCompleted {
continue
}
data, ok := ev.Data.(*types.MailServerResponse)
if ok {
return data, nil
}
case <-ctx.Done():
return nil, ctx.Err()
}
}
}
// ConfirmMessagesProcessed marks the messages as processed in the cache so
// they won't be passed to the next layer anymore
func (t *Transport) ConfirmMessagesProcessed(ids []string, timestamp uint64) error {