mirror of
https://github.com/logos-messaging/logos-messaging-go-bindings.git
synced 2026-01-10 01:43:08 +00:00
partial version of basic test
This commit is contained in:
parent
b055996183
commit
b797662242
5
nwaku.go
5
nwaku.go
@ -325,7 +325,6 @@ import (
|
||||
libp2pproto "github.com/libp2p/go-libp2p/core/protocol"
|
||||
"github.com/multiformats/go-multiaddr"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
||||
storepb "github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
|
||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||
"go.uber.org/zap"
|
||||
@ -413,10 +412,10 @@ func (w *Waku) DialPeer(address multiaddr.Multiaddr) error {
|
||||
return w.node.Connect(ctx, address)
|
||||
}
|
||||
|
||||
func (w *Waku) DialPeerByID(peerID peer.ID) error {
|
||||
func (w *Waku) DialPeerByID(peerID peer.ID, protocol libp2pproto.ID) error {
|
||||
ctx, cancel := context.WithTimeout(w.ctx, requestTimeout)
|
||||
defer cancel()
|
||||
return w.node.DialPeerByID(ctx, peerID, relay.WakuRelayID_v200)
|
||||
return w.node.DialPeerByID(ctx, peerID, protocol)
|
||||
}
|
||||
|
||||
func (w *Waku) DropPeer(peerID peer.ID) error {
|
||||
|
||||
164
nwaku_test.go
164
nwaku_test.go
@ -13,9 +13,173 @@ import (
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/cenkalti/backoff/v3"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/store"
|
||||
)
|
||||
|
||||
// In order to run these tests, you must run an nwaku node
|
||||
//
|
||||
// Using Docker:
|
||||
//
|
||||
// IP_ADDRESS=$(hostname -I | awk '{print $1}');
|
||||
// docker run \
|
||||
// -p 61000:61000/tcp -p 8000:8000/udp -p 8646:8646/tcp harbor.status.im/wakuorg/nwaku:v0.33.0 \
|
||||
// --discv5-discovery=true --cluster-id=16 --log-level=DEBUG --shard=64 --tcp-port=61000 \
|
||||
// --nat=extip:${IP_ADDRESS} --discv5-udp-port=8000 --rest-address=0.0.0.0 --store --rest-port=8646
|
||||
|
||||
func TestBasicWakuV2(t *testing.T) {
|
||||
extNodeRestPort := 8646
|
||||
storeNodeInfo, err := GetNwakuInfo(nil, &extNodeRestPort)
|
||||
require.NoError(t, err)
|
||||
|
||||
nwakuConfig := WakuConfig{
|
||||
Port: 30303,
|
||||
NodeKey: "11d0dcea28e86f81937a3bd1163473c7fbc0a0db54fd72914849bc47bdf78710",
|
||||
EnableRelay: true,
|
||||
LogLevel: "DEBUG",
|
||||
DnsDiscoveryUrl: "enrtree://AMOJVZX4V6EXP7NTJPMAYJYST2QP6AJXYW76IU6VGJS7UVSNDYZG4@boot.prod.status.nodes.status.im",
|
||||
DnsDiscovery: true,
|
||||
Discv5Discovery: true,
|
||||
Staticnodes: []string{storeNodeInfo.ListenAddresses[0]},
|
||||
ClusterID: 16,
|
||||
Shards: []uint16{64},
|
||||
}
|
||||
|
||||
storeNodeMa, err := ma.NewMultiaddr(storeNodeInfo.ListenAddresses[0])
|
||||
require.NoError(t, err)
|
||||
|
||||
w, err := New(&nwakuConfig, nil)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, w.Start())
|
||||
|
||||
enr, err := w.node.ENR()
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, enr)
|
||||
|
||||
options := func(b *backoff.ExponentialBackOff) {
|
||||
b.MaxElapsedTime = 30 * time.Second
|
||||
}
|
||||
|
||||
// Sanity check, not great, but it's probably helpful
|
||||
err = RetryWithBackOff(func() error {
|
||||
numConnected, err := w.node.GetNumConnectedPeers()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Have to be connected to at least 3 nodes: the static node, the bootstrap node, and one discovered node
|
||||
if numConnected > 2 {
|
||||
return nil
|
||||
}
|
||||
return errors.New("no peers discovered")
|
||||
}, options)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Get local store node address
|
||||
storeNode, err := peer.AddrInfoFromString(storeNodeInfo.ListenAddresses[0])
|
||||
require.NoError(t, err)
|
||||
|
||||
/* for i := 0; i <= 100; i++ {
|
||||
time.Sleep(2 * time.Second)
|
||||
}
|
||||
|
||||
w.StorenodeCycle.SetStorenodeConfigProvider(newTestStorenodeConfigProvider(*storeNode)) */
|
||||
|
||||
// Check that we are indeed connected to the store node
|
||||
connectedStoreNodes, err := w.node.GetPeerIDsByProtocol(store.StoreQueryID_v300)
|
||||
require.NoError(t, err)
|
||||
require.True(t, slices.Contains(connectedStoreNodes, storeNode.ID), "nwaku should be connected to the store node")
|
||||
|
||||
// Disconnect from the store node
|
||||
err = w.node.DisconnectPeerByID(storeNode.ID)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Check that we are indeed disconnected
|
||||
connectedStoreNodes, err = w.node.GetPeerIDsByProtocol(store.StoreQueryID_v300)
|
||||
require.NoError(t, err)
|
||||
isDisconnected := !slices.Contains(connectedStoreNodes, storeNode.ID)
|
||||
require.True(t, isDisconnected, "nwaku should be disconnected from the store node")
|
||||
|
||||
// Re-connect
|
||||
err = w.DialPeer(storeNodeMa)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Check that we are connected again
|
||||
connectedStoreNodes, err = w.node.GetPeerIDsByProtocol(store.StoreQueryID_v300)
|
||||
require.NoError(t, err)
|
||||
require.True(t, slices.Contains(connectedStoreNodes, storeNode.ID), "nwaku should be connected to the store node")
|
||||
|
||||
/* filter := &common.Filter{
|
||||
PubsubTopic: w.cfg.DefaultShardPubsubTopic,
|
||||
Messages: common.NewMemoryMessageStore(),
|
||||
ContentTopics: common.NewTopicSetFromBytes([][]byte{{1, 2, 3, 4}}),
|
||||
}
|
||||
|
||||
_, err = w.Subscribe(filter)
|
||||
require.NoError(t, err)
|
||||
|
||||
msgTimestamp := w.timestamp()
|
||||
contentTopic := maps.Keys(filter.ContentTopics)[0]
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
msgID, err := w.Send(w.cfg.DefaultShardPubsubTopic, &pb.WakuMessage{
|
||||
Payload: []byte{1, 2, 3, 4, 5},
|
||||
ContentTopic: contentTopic.ContentTopic(),
|
||||
Version: proto.Uint32(0),
|
||||
Timestamp: &msgTimestamp,
|
||||
}, nil)
|
||||
|
||||
require.NoError(t, err)
|
||||
require.NotEqual(t, msgID, "1")
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
messages := filter.Retrieve()
|
||||
require.Len(t, messages, 1)
|
||||
|
||||
timestampInSeconds := msgTimestamp / int64(time.Second)
|
||||
marginInSeconds := 20
|
||||
|
||||
options = func(b *backoff.ExponentialBackOff) {
|
||||
b.MaxElapsedTime = 60 * time.Second
|
||||
b.InitialInterval = 500 * time.Millisecond
|
||||
}
|
||||
err = RetryWithBackOff(func() error {
|
||||
err := w.HistoryRetriever.Query(
|
||||
context.Background(),
|
||||
store.FilterCriteria{
|
||||
ContentFilter: protocol.NewContentFilter(w.cfg.DefaultShardPubsubTopic, contentTopic.ContentTopic()),
|
||||
TimeStart: proto.Int64((timestampInSeconds - int64(marginInSeconds)) * int64(time.Second)),
|
||||
TimeEnd: proto.Int64((timestampInSeconds + int64(marginInSeconds)) * int64(time.Second)),
|
||||
},
|
||||
*storeNode,
|
||||
10,
|
||||
nil, false,
|
||||
)
|
||||
|
||||
return err
|
||||
|
||||
// TODO-nwaku
|
||||
if err != nil || envelopeCount == 0 {
|
||||
// in case of failure extend timestamp margin up to 40secs
|
||||
if marginInSeconds < 40 {
|
||||
marginInSeconds += 5
|
||||
}
|
||||
return errors.New("no messages received from store node")
|
||||
}
|
||||
return nil
|
||||
|
||||
}, options)
|
||||
require.NoError(t, err)
|
||||
|
||||
time.Sleep(10 * time.Second)
|
||||
*/
|
||||
|
||||
require.NoError(t, w.Stop())
|
||||
}
|
||||
|
||||
func TestPeerExchange(t *testing.T) {
|
||||
logger, err := zap.NewDevelopment()
|
||||
require.NoError(t, err)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user