fix_: rebase

This commit is contained in:
Richard Ramos 2024-10-17 17:54:13 -04:00
parent 3107c1eb0b
commit 8198ac1996
No known key found for this signature in database
GPG Key ID: 1CE87DB518195760
6 changed files with 1009 additions and 675 deletions

2
third_party/nwaku vendored

@ -1 +1 @@
Subproject commit c861fa9f7560068874570598c81b7a1425a9e931 Subproject commit c5a825e206c1cb3e6e0cf8c01410527804cb76c4

View File

@ -513,4 +513,4 @@ func (api *PublicWakuAPI) NewMessageFilter(req Criteria) (string, error) {
api.mu.Unlock() api.mu.Unlock()
return id, nil return id, nil
} */ } */

View File

@ -3,7 +3,6 @@ package wakuv2
import ( import (
"github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peer"
"github.com/status-im/status-go/wakuv2/common"
"github.com/waku-org/go-waku/waku/v2/api/history" "github.com/waku-org/go-waku/waku/v2/api/history"
"github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol"
) )
@ -17,7 +16,9 @@ func NewHistoryProcessorWrapper(waku *Waku) history.HistoryProcessor {
} }
func (hr *HistoryProcessorWrapper) OnEnvelope(env *protocol.Envelope, processEnvelopes bool) error { func (hr *HistoryProcessorWrapper) OnEnvelope(env *protocol.Envelope, processEnvelopes bool) error {
return hr.waku.OnNewEnvelopes(env, common.StoreMessageType, processEnvelopes) // TODO-nwaku
// return hr.waku.OnNewEnvelopes(env, common.StoreMessageType, processEnvelopes)
return nil
} }
func (hr *HistoryProcessorWrapper) OnRequestFailed(requestID []byte, peerID peer.ID, err error) { func (hr *HistoryProcessorWrapper) OnRequestFailed(requestID []byte, peerID peer.ID, err error) {

View File

@ -1,5 +1,6 @@
package wakuv2 package wakuv2
/* TODO-nwaku
import ( import (
"errors" "errors"
@ -116,3 +117,4 @@ func (w *Waku) publishEnvelope(envelope *protocol.Envelope) {
}) })
} }
} }
*/

File diff suppressed because it is too large Load Diff

View File

@ -162,20 +162,20 @@ func TestBasicWakuV2(t *testing.T) {
extNodeRestPort := 8646 extNodeRestPort := 8646
storeNodeInfo, err := GetNwakuInfo(nil, &extNodeRestPort) storeNodeInfo, err := GetNwakuInfo(nil, &extNodeRestPort)
require.NoError(t, err) require.NoError(t, err)
nwakuConfig := WakuConfig{ nwakuConfig := WakuConfig{
Port: 30303, Port: 30303,
NodeKey: "11d0dcea28e86f81937a3bd1163473c7fbc0a0db54fd72914849bc47bdf78710", NodeKey: "11d0dcea28e86f81937a3bd1163473c7fbc0a0db54fd72914849bc47bdf78710",
EnableRelay: true, EnableRelay: true,
LogLevel: "DEBUG", LogLevel: "DEBUG",
DnsDiscoveryUrl: "enrtree://AMOJVZX4V6EXP7NTJPMAYJYST2QP6AJXYW76IU6VGJS7UVSNDYZG4@boot.prod.status.nodes.status.im", DnsDiscoveryUrl: "enrtree://AMOJVZX4V6EXP7NTJPMAYJYST2QP6AJXYW76IU6VGJS7UVSNDYZG4@boot.prod.status.nodes.status.im",
DnsDiscovery: true, DnsDiscovery: true,
Discv5Discovery: true, Discv5Discovery: true,
Staticnodes: []string{storeNodeInfo.ListenAddresses[0]}, Staticnodes: []string{storeNodeInfo.ListenAddresses[0]},
ClusterID: 16, ClusterID: 16,
Shards: []uint16{64}, Shards: []uint16{64},
} }
w, err := New(nil, "", &nwakuConfig, nil, nil, nil, nil, nil) w, err := New(nil, "", &nwakuConfig, nil, nil, nil, nil, nil)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, w.Start()) require.NoError(t, w.Start())
@ -190,7 +190,7 @@ func TestBasicWakuV2(t *testing.T) {
// Sanity check, not great, but it's probably helpful // Sanity check, not great, but it's probably helpful
err = tt.RetryWithBackOff(func() error { err = tt.RetryWithBackOff(func() error {
numConnected, err := w.GetNumConnectedPeers() numConnected, err := w.GetNumConnectedPeers()
if err != nil { if err != nil {
return err return err
@ -204,7 +204,7 @@ func TestBasicWakuV2(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
// Get local store node address // Get local store node address
storeNode, err :=peer.AddrInfoFromString(storeNodeInfo.ListenAddresses[0]) storeNode, err := peer.AddrInfoFromString(storeNodeInfo.ListenAddresses[0])
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, err) require.NoError(t, err)
@ -222,7 +222,7 @@ func TestBasicWakuV2(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
isDisconnected := !slices.Contains(connectedStoreNodes, storeNode.ID) isDisconnected := !slices.Contains(connectedStoreNodes, storeNode.ID)
require.True(t, isDisconnected, "nwaku should be disconnected from the store node") require.True(t, isDisconnected, "nwaku should be disconnected from the store node")
// Re-connect // Re-connect
err = w.DialPeerByID(storeNode.ID) err = w.DialPeerByID(storeNode.ID)
require.NoError(t, err) require.NoError(t, err)
@ -231,66 +231,66 @@ func TestBasicWakuV2(t *testing.T) {
connectedStoreNodes, err = w.GetPeerIdsByProtocol(string(store.StoreQueryID_v300)) connectedStoreNodes, err = w.GetPeerIdsByProtocol(string(store.StoreQueryID_v300))
require.NoError(t, err) require.NoError(t, err)
require.True(t, slices.Contains(connectedStoreNodes, storeNode.ID), "nwaku should be connected to the store node") require.True(t, slices.Contains(connectedStoreNodes, storeNode.ID), "nwaku should be connected to the store node")
/*
filter := &common.Filter{
PubsubTopic: config.DefaultShardPubsubTopic,
Messages: common.NewMemoryMessageStore(),
ContentTopics: common.NewTopicSetFromBytes([][]byte{{1, 2, 3, 4}}),
}
_, err = w.Subscribe(filter) /*
require.NoError(t, err) filter := &common.Filter{
PubsubTopic: config.DefaultShardPubsubTopic,
msgTimestamp := w.timestamp() Messages: common.NewMemoryMessageStore(),
contentTopic := maps.Keys(filter.ContentTopics)[0] ContentTopics: common.NewTopicSetFromBytes([][]byte{{1, 2, 3, 4}}),
time.Sleep(2 * time.Second)
_, err = w.Send(config.DefaultShardPubsubTopic, &pb.WakuMessage{
Payload: []byte{1, 2, 3, 4, 5},
ContentTopic: contentTopic.ContentTopic(),
Version: proto.Uint32(0),
Timestamp: &msgTimestamp,
}, nil)
require.NoError(t, err)
time.Sleep(1 * time.Second)
messages := filter.Retrieve()
require.Len(t, messages, 1)
timestampInSeconds := msgTimestamp / int64(time.Second)
marginInSeconds := 20
options = func(b *backoff.ExponentialBackOff) {
b.MaxElapsedTime = 60 * time.Second
b.InitialInterval = 500 * time.Millisecond
}
err = tt.RetryWithBackOff(func() error {
_, envelopeCount, err := w.Query(
context.Background(),
storeNode.PeerID,
store.FilterCriteria{
ContentFilter: protocol.NewContentFilter(config.DefaultShardPubsubTopic, contentTopic.ContentTopic()),
TimeStart: proto.Int64((timestampInSeconds - int64(marginInSeconds)) * int64(time.Second)),
TimeEnd: proto.Int64((timestampInSeconds + int64(marginInSeconds)) * int64(time.Second)),
},
nil,
nil,
false,
)
if err != nil || envelopeCount == 0 {
// in case of failure extend timestamp margin up to 40secs
if marginInSeconds < 40 {
marginInSeconds += 5
}
return errors.New("no messages received from store node")
} }
return nil
}, options) _, err = w.Subscribe(filter)
require.NoError(t, err) */ require.NoError(t, err)
msgTimestamp := w.timestamp()
contentTopic := maps.Keys(filter.ContentTopics)[0]
time.Sleep(2 * time.Second)
_, err = w.Send(config.DefaultShardPubsubTopic, &pb.WakuMessage{
Payload: []byte{1, 2, 3, 4, 5},
ContentTopic: contentTopic.ContentTopic(),
Version: proto.Uint32(0),
Timestamp: &msgTimestamp,
}, nil)
require.NoError(t, err)
time.Sleep(1 * time.Second)
messages := filter.Retrieve()
require.Len(t, messages, 1)
timestampInSeconds := msgTimestamp / int64(time.Second)
marginInSeconds := 20
options = func(b *backoff.ExponentialBackOff) {
b.MaxElapsedTime = 60 * time.Second
b.InitialInterval = 500 * time.Millisecond
}
err = tt.RetryWithBackOff(func() error {
_, envelopeCount, err := w.Query(
context.Background(),
storeNode.PeerID,
store.FilterCriteria{
ContentFilter: protocol.NewContentFilter(config.DefaultShardPubsubTopic, contentTopic.ContentTopic()),
TimeStart: proto.Int64((timestampInSeconds - int64(marginInSeconds)) * int64(time.Second)),
TimeEnd: proto.Int64((timestampInSeconds + int64(marginInSeconds)) * int64(time.Second)),
},
nil,
nil,
false,
)
if err != nil || envelopeCount == 0 {
// in case of failure extend timestamp margin up to 40secs
if marginInSeconds < 40 {
marginInSeconds += 5
}
return errors.New("no messages received from store node")
}
return nil
}, options)
require.NoError(t, err) */
require.NoError(t, w.Stop()) require.NoError(t, w.Stop())
} }