mirror of https://github.com/status-im/go-waku.git
chore: peermanager tests coverage improvement (#1035)
This commit is contained in:
parent
3074cdb11c
commit
4b9e3635a2
|
@ -0,0 +1,86 @@
|
|||
package peermanager
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/libp2p/go-libp2p/core/control"
|
||||
"github.com/libp2p/go-libp2p/core/network"
|
||||
"github.com/libp2p/go-libp2p/core/peerstore"
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
manet "github.com/multiformats/go-multiaddr/net"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||
"testing"
|
||||
)
|
||||
|
||||
type mockConnMultiaddrs struct {
|
||||
local, remote ma.Multiaddr
|
||||
}
|
||||
|
||||
func (m mockConnMultiaddrs) LocalMultiaddr() ma.Multiaddr {
|
||||
return m.local
|
||||
}
|
||||
|
||||
func (m mockConnMultiaddrs) RemoteMultiaddr() ma.Multiaddr {
|
||||
return m.remote
|
||||
}
|
||||
|
||||
func TestConnectionGater(t *testing.T) {
|
||||
|
||||
log := utils.Logger()
|
||||
|
||||
_, h1, _ := makeWakuRelay(t, log)
|
||||
_, h2, _ := makeWakuRelay(t, log)
|
||||
|
||||
h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), peerstore.PermanentAddrTTL)
|
||||
err := h1.Connect(context.Background(), h2.Peerstore().PeerInfo(h2.ID()))
|
||||
require.NoError(t, err)
|
||||
|
||||
h2.Peerstore().AddAddrs(h1.ID(), h1.Addrs(), peerstore.PermanentAddrTTL)
|
||||
err = h2.Connect(context.Background(), h1.Peerstore().PeerInfo(h1.ID()))
|
||||
require.NoError(t, err)
|
||||
|
||||
peerA := h1.ID()
|
||||
|
||||
remoteAddr1 := ma.StringCast("/ip4/1.2.3.4/tcp/1234")
|
||||
|
||||
connGater := NewConnectionGater(2, log)
|
||||
|
||||
// Test peer blocking
|
||||
allow := connGater.InterceptPeerDial(peerA)
|
||||
require.True(t, allow)
|
||||
|
||||
// Test connection was secured and upgraded
|
||||
allow = connGater.InterceptSecured(network.DirInbound, peerA, &mockConnMultiaddrs{local: nil, remote: nil})
|
||||
require.True(t, allow)
|
||||
|
||||
connection1 := h1.Network().Conns()[0]
|
||||
|
||||
allow, reason := connGater.InterceptUpgraded(connection1)
|
||||
require.True(t, allow)
|
||||
require.Equal(t, control.DisconnectReason(0), reason)
|
||||
|
||||
// Test addr and subnet blocking
|
||||
allow = connGater.InterceptAddrDial(peerA, remoteAddr1)
|
||||
require.True(t, allow)
|
||||
|
||||
// Bellow the connection limit
|
||||
allow = connGater.InterceptAccept(&mockConnMultiaddrs{local: nil, remote: remoteAddr1})
|
||||
require.True(t, allow)
|
||||
|
||||
ip, err := manet.ToIP(remoteAddr1)
|
||||
require.NoError(t, err)
|
||||
connGater.limiter[ip.String()] = 3
|
||||
|
||||
// Above the connection limit
|
||||
allow = connGater.InterceptAccept(&mockConnMultiaddrs{local: nil, remote: remoteAddr1})
|
||||
require.False(t, allow)
|
||||
|
||||
// Call twice NotifyDisconnect to get bellow the limit(2): 3 -> 1
|
||||
connGater.NotifyDisconnect(remoteAddr1)
|
||||
connGater.NotifyDisconnect(remoteAddr1)
|
||||
|
||||
// Bellow the connection limit again
|
||||
allow = connGater.validateInboundConn(remoteAddr1)
|
||||
require.True(t, allow)
|
||||
|
||||
}
|
|
@ -22,12 +22,34 @@ func TestServiceSlot(t *testing.T) {
|
|||
fetchedPeers, err := slots.getPeers(protocol).getRandom(1)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, peerID, maps.Keys(fetchedPeers)[0])
|
||||
//TODO: Add test to get more than 1 peers
|
||||
//
|
||||
slots.getPeers(protocol).remove(peerID)
|
||||
//
|
||||
_, err = slots.getPeers(protocol).getRandom(1)
|
||||
require.Equal(t, err, ErrNoPeersAvailable)
|
||||
|
||||
// Test with more peers
|
||||
peerID2 := peer.ID("peerId2")
|
||||
peerID3 := peer.ID("peerId3")
|
||||
|
||||
//
|
||||
slots.getPeers(protocol).add(peerID2)
|
||||
slots.getPeers(protocol).add(peerID3)
|
||||
//
|
||||
|
||||
fetchedPeers, err = slots.getPeers(protocol).getRandom(2)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 2, len(maps.Keys(fetchedPeers)))
|
||||
|
||||
// Check for uniqueness
|
||||
require.NotEqual(t, maps.Keys(fetchedPeers)[0], maps.Keys(fetchedPeers)[1])
|
||||
|
||||
slots.getPeers(protocol).remove(peerID2)
|
||||
|
||||
fetchedPeers, err = slots.getPeers(protocol).getRandom(10)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, peerID3, maps.Keys(fetchedPeers)[0])
|
||||
|
||||
}
|
||||
|
||||
func TestServiceSlotRemovePeerFromAll(t *testing.T) {
|
||||
|
|
|
@ -0,0 +1,213 @@
|
|||
package peermanager
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"github.com/libp2p/go-libp2p/core/event"
|
||||
"github.com/libp2p/go-libp2p/core/host"
|
||||
"github.com/libp2p/go-libp2p/core/network"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/libp2p/go-libp2p/core/peerstore"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/waku-org/go-waku/tests"
|
||||
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
||||
"github.com/waku-org/go-waku/waku/v2/timesource"
|
||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||
"go.uber.org/zap"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func makeWakuRelay(t *testing.T, log *zap.Logger) (*relay.WakuRelay, host.Host, relay.Broadcaster) {
|
||||
|
||||
broadcaster := relay.NewBroadcaster(10)
|
||||
require.NoError(t, broadcaster.Start(context.Background()))
|
||||
|
||||
port, err := tests.FindFreePort(t, "", 5)
|
||||
require.NoError(t, err)
|
||||
|
||||
h, err := tests.MakeHost(context.Background(), port, rand.Reader)
|
||||
require.NoError(t, err)
|
||||
|
||||
broadcaster.RegisterForAll()
|
||||
|
||||
r := relay.NewWakuRelay(broadcaster, 0, timesource.NewDefaultClock(),
|
||||
prometheus.DefaultRegisterer, log)
|
||||
|
||||
r.SetHost(h)
|
||||
|
||||
return r, h, broadcaster
|
||||
}
|
||||
|
||||
func makePeerManagerWithEventBus(t *testing.T, r *relay.WakuRelay, h *host.Host) (*PeerManager, event.Bus) {
|
||||
// Host 1 used by peer manager
|
||||
pm := NewPeerManager(10, 20, nil, utils.Logger())
|
||||
pm.SetHost(*h)
|
||||
|
||||
// Create a new relay event bus
|
||||
relayEvtBus := r.Events()
|
||||
|
||||
// Subscribe to EventBus
|
||||
err := pm.SubscribeToRelayEvtBus(relayEvtBus)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Register necessary protocols
|
||||
pm.RegisterWakuProtocol(relay.WakuRelayID_v200, relay.WakuRelayENRField)
|
||||
return pm, relayEvtBus
|
||||
}
|
||||
|
||||
func emitTopicEvent(pubSubTopic string, peerID peer.ID, emitter event.Emitter, state relay.PeerTopicState) error {
|
||||
|
||||
peerEvt := relay.EvtPeerTopic{
|
||||
PubsubTopic: pubSubTopic,
|
||||
PeerID: peerID,
|
||||
State: state,
|
||||
}
|
||||
|
||||
return emitter.Emit(peerEvt)
|
||||
}
|
||||
|
||||
func TestSubscribeToRelayEvtBus(t *testing.T) {
|
||||
log := utils.Logger()
|
||||
|
||||
// Host 1
|
||||
r, h1, _ := makeWakuRelay(t, log)
|
||||
|
||||
// Host 1 used by peer manager
|
||||
pm := NewPeerManager(10, 20, nil, utils.Logger())
|
||||
pm.SetHost(h1)
|
||||
|
||||
// Create a new relay event bus
|
||||
relayEvtBus := r.Events()
|
||||
|
||||
// Subscribe to EventBus
|
||||
err := pm.SubscribeToRelayEvtBus(relayEvtBus)
|
||||
require.NoError(t, err)
|
||||
|
||||
}
|
||||
|
||||
func TestHandleRelayTopicSubscription(t *testing.T) {
|
||||
log := utils.Logger()
|
||||
pubSubTopic := "/waku/2/go/pm/test"
|
||||
ctx := context.Background()
|
||||
|
||||
// Relay and Host
|
||||
r, h1, _ := makeWakuRelay(t, log)
|
||||
err := r.Start(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Peer manager with event bus
|
||||
pm, _ := makePeerManagerWithEventBus(t, r, &h1)
|
||||
pm.ctx = ctx
|
||||
|
||||
// Start event loop to listen to events
|
||||
ctxEventLoop, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
go pm.peerEventLoop(ctxEventLoop)
|
||||
|
||||
// Subscribe to Pubsub topic
|
||||
_, err = r.Subscribe(ctx, protocol.NewContentFilter(pubSubTopic))
|
||||
require.NoError(t, err)
|
||||
|
||||
// Wait for event loop to call handler
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
// Check Peer Manager knows about the topic
|
||||
pm.topicMutex.RLock()
|
||||
_, ok := pm.subRelayTopics[pubSubTopic]
|
||||
require.True(t, ok)
|
||||
pm.topicMutex.RUnlock()
|
||||
|
||||
// UnSubscribe from Pubsub topic
|
||||
err = r.Unsubscribe(ctx, protocol.NewContentFilter(pubSubTopic))
|
||||
require.NoError(t, err)
|
||||
|
||||
// Wait for event loop to call handler
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
// Check the original topic was removed from Peer Manager
|
||||
pm.topicMutex.RLock()
|
||||
_, ok = pm.subRelayTopics[pubSubTopic]
|
||||
require.False(t, ok)
|
||||
pm.topicMutex.RUnlock()
|
||||
|
||||
r.Stop()
|
||||
}
|
||||
|
||||
func TestHandlePeerTopicEvent(t *testing.T) {
|
||||
log := utils.Logger()
|
||||
pubSubTopic := "/waku/2/go/pm/test"
|
||||
ctx := context.Background()
|
||||
|
||||
hosts := make([]host.Host, 5)
|
||||
relays := make([]*relay.WakuRelay, 5)
|
||||
|
||||
for i := 0; i < 5; i++ {
|
||||
relays[i], hosts[i], _ = makeWakuRelay(t, log)
|
||||
err := relays[i].Start(ctx)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
// Create peer manager instance with the first hosts
|
||||
pm, eventBus := makePeerManagerWithEventBus(t, relays[0], &hosts[0])
|
||||
pm.ctx = ctx
|
||||
pm.RegisterWakuProtocol(relay.WakuRelayID_v200, relay.WakuRelayENRField)
|
||||
|
||||
// Connect host[0] with all other hosts to reach 4 connections
|
||||
for i := 1; i < 5; i++ {
|
||||
pm.host.Peerstore().AddAddrs(hosts[i].ID(), hosts[i].Addrs(), peerstore.PermanentAddrTTL)
|
||||
err := pm.host.Connect(ctx, hosts[i].Peerstore().PeerInfo(hosts[i].ID()))
|
||||
require.NoError(t, err)
|
||||
err = pm.host.Peerstore().(wps.WakuPeerstore).SetDirection(hosts[i].ID(), network.DirOutbound)
|
||||
require.NoError(t, err)
|
||||
|
||||
}
|
||||
|
||||
// Wait for connections to settle
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
if len(pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubSubTopic)) == 0 {
|
||||
log.Info("No peers for the topic yet")
|
||||
}
|
||||
|
||||
// Subscribe to Pubsub topic on first host only
|
||||
_, err := relays[0].Subscribe(ctx, protocol.NewContentFilter(pubSubTopic))
|
||||
require.NoError(t, err)
|
||||
|
||||
// Start event loop to listen to events
|
||||
ctxEventLoop := context.Background()
|
||||
go pm.peerEventLoop(ctxEventLoop)
|
||||
|
||||
// Prepare emitter
|
||||
emitter, err := eventBus.Emitter(new(relay.EvtPeerTopic))
|
||||
require.NoError(t, err)
|
||||
|
||||
// Send PEER_JOINED events for hosts 2-5
|
||||
for i := 1; i < 5; i++ {
|
||||
err = emitTopicEvent(pubSubTopic, hosts[i].ID(), emitter, relay.PEER_JOINED)
|
||||
require.NoError(t, err)
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
|
||||
// Check four hosts have joined the topic
|
||||
require.Equal(t, 4, len(pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubSubTopic)))
|
||||
|
||||
// Check all hosts have been connected
|
||||
for _, peer := range pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubSubTopic) {
|
||||
require.Equal(t, network.Connected, pm.host.Network().Connectedness(peer))
|
||||
}
|
||||
|
||||
// Send PEER_LEFT events for hosts 2-5
|
||||
for i := 1; i < 5; i++ {
|
||||
err = emitTopicEvent(pubSubTopic, hosts[i].ID(), emitter, relay.PEER_LEFT)
|
||||
require.NoError(t, err)
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
|
||||
// Check all hosts have left the topic
|
||||
require.Equal(t, 0, len(pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubSubTopic)))
|
||||
|
||||
}
|
Loading…
Reference in New Issue