diff --git a/waku/v2/peermanager/connection_gater_test.go b/waku/v2/peermanager/connection_gater_test.go new file mode 100644 index 00000000..17f74f46 --- /dev/null +++ b/waku/v2/peermanager/connection_gater_test.go @@ -0,0 +1,86 @@ +package peermanager + +import ( + "context" + "github.com/libp2p/go-libp2p/core/control" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peerstore" + ma "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr/net" + "github.com/stretchr/testify/require" + "github.com/waku-org/go-waku/waku/v2/utils" + "testing" +) + +type mockConnMultiaddrs struct { + local, remote ma.Multiaddr +} + +func (m mockConnMultiaddrs) LocalMultiaddr() ma.Multiaddr { + return m.local +} + +func (m mockConnMultiaddrs) RemoteMultiaddr() ma.Multiaddr { + return m.remote +} + +func TestConnectionGater(t *testing.T) { + + log := utils.Logger() + + _, h1, _ := makeWakuRelay(t, log) + _, h2, _ := makeWakuRelay(t, log) + + h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), peerstore.PermanentAddrTTL) + err := h1.Connect(context.Background(), h2.Peerstore().PeerInfo(h2.ID())) + require.NoError(t, err) + + h2.Peerstore().AddAddrs(h1.ID(), h1.Addrs(), peerstore.PermanentAddrTTL) + err = h2.Connect(context.Background(), h1.Peerstore().PeerInfo(h1.ID())) + require.NoError(t, err) + + peerA := h1.ID() + + remoteAddr1 := ma.StringCast("/ip4/1.2.3.4/tcp/1234") + + connGater := NewConnectionGater(2, log) + + // Test peer blocking + allow := connGater.InterceptPeerDial(peerA) + require.True(t, allow) + + // Test connection was secured and upgraded + allow = connGater.InterceptSecured(network.DirInbound, peerA, &mockConnMultiaddrs{local: nil, remote: nil}) + require.True(t, allow) + + connection1 := h1.Network().Conns()[0] + + allow, reason := connGater.InterceptUpgraded(connection1) + require.True(t, allow) + require.Equal(t, control.DisconnectReason(0), reason) + + // Test addr and subnet blocking + allow = connGater.InterceptAddrDial(peerA, remoteAddr1) + require.True(t, allow) + + // Bellow the connection limit + allow = connGater.InterceptAccept(&mockConnMultiaddrs{local: nil, remote: remoteAddr1}) + require.True(t, allow) + + ip, err := manet.ToIP(remoteAddr1) + require.NoError(t, err) + connGater.limiter[ip.String()] = 3 + + // Above the connection limit + allow = connGater.InterceptAccept(&mockConnMultiaddrs{local: nil, remote: remoteAddr1}) + require.False(t, allow) + + // Call twice NotifyDisconnect to get bellow the limit(2): 3 -> 1 + connGater.NotifyDisconnect(remoteAddr1) + connGater.NotifyDisconnect(remoteAddr1) + + // Bellow the connection limit again + allow = connGater.validateInboundConn(remoteAddr1) + require.True(t, allow) + +} diff --git a/waku/v2/peermanager/service_slot_test.go b/waku/v2/peermanager/service_slot_test.go index a9d944a7..80687bbf 100644 --- a/waku/v2/peermanager/service_slot_test.go +++ b/waku/v2/peermanager/service_slot_test.go @@ -22,12 +22,34 @@ func TestServiceSlot(t *testing.T) { fetchedPeers, err := slots.getPeers(protocol).getRandom(1) require.NoError(t, err) require.Equal(t, peerID, maps.Keys(fetchedPeers)[0]) - //TODO: Add test to get more than 1 peers // slots.getPeers(protocol).remove(peerID) // _, err = slots.getPeers(protocol).getRandom(1) require.Equal(t, err, ErrNoPeersAvailable) + + // Test with more peers + peerID2 := peer.ID("peerId2") + peerID3 := peer.ID("peerId3") + + // + slots.getPeers(protocol).add(peerID2) + slots.getPeers(protocol).add(peerID3) + // + + fetchedPeers, err = slots.getPeers(protocol).getRandom(2) + require.NoError(t, err) + require.Equal(t, 2, len(maps.Keys(fetchedPeers))) + + // Check for uniqueness + require.NotEqual(t, maps.Keys(fetchedPeers)[0], maps.Keys(fetchedPeers)[1]) + + slots.getPeers(protocol).remove(peerID2) + + fetchedPeers, err = slots.getPeers(protocol).getRandom(10) + require.NoError(t, err) + require.Equal(t, peerID3, maps.Keys(fetchedPeers)[0]) + } func TestServiceSlotRemovePeerFromAll(t *testing.T) { diff --git a/waku/v2/peermanager/topic_event_handler_test.go b/waku/v2/peermanager/topic_event_handler_test.go new file mode 100644 index 00000000..8fc8da94 --- /dev/null +++ b/waku/v2/peermanager/topic_event_handler_test.go @@ -0,0 +1,213 @@ +package peermanager + +import ( + "context" + "crypto/rand" + "github.com/libp2p/go-libp2p/core/event" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + "github.com/waku-org/go-waku/tests" + wps "github.com/waku-org/go-waku/waku/v2/peerstore" + "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" + "github.com/waku-org/go-waku/waku/v2/timesource" + "github.com/waku-org/go-waku/waku/v2/utils" + "go.uber.org/zap" + "testing" + "time" +) + +func makeWakuRelay(t *testing.T, log *zap.Logger) (*relay.WakuRelay, host.Host, relay.Broadcaster) { + + broadcaster := relay.NewBroadcaster(10) + require.NoError(t, broadcaster.Start(context.Background())) + + port, err := tests.FindFreePort(t, "", 5) + require.NoError(t, err) + + h, err := tests.MakeHost(context.Background(), port, rand.Reader) + require.NoError(t, err) + + broadcaster.RegisterForAll() + + r := relay.NewWakuRelay(broadcaster, 0, timesource.NewDefaultClock(), + prometheus.DefaultRegisterer, log) + + r.SetHost(h) + + return r, h, broadcaster +} + +func makePeerManagerWithEventBus(t *testing.T, r *relay.WakuRelay, h *host.Host) (*PeerManager, event.Bus) { + // Host 1 used by peer manager + pm := NewPeerManager(10, 20, nil, utils.Logger()) + pm.SetHost(*h) + + // Create a new relay event bus + relayEvtBus := r.Events() + + // Subscribe to EventBus + err := pm.SubscribeToRelayEvtBus(relayEvtBus) + require.NoError(t, err) + + // Register necessary protocols + pm.RegisterWakuProtocol(relay.WakuRelayID_v200, relay.WakuRelayENRField) + return pm, relayEvtBus +} + +func emitTopicEvent(pubSubTopic string, peerID peer.ID, emitter event.Emitter, state relay.PeerTopicState) error { + + peerEvt := relay.EvtPeerTopic{ + PubsubTopic: pubSubTopic, + PeerID: peerID, + State: state, + } + + return emitter.Emit(peerEvt) +} + +func TestSubscribeToRelayEvtBus(t *testing.T) { + log := utils.Logger() + + // Host 1 + r, h1, _ := makeWakuRelay(t, log) + + // Host 1 used by peer manager + pm := NewPeerManager(10, 20, nil, utils.Logger()) + pm.SetHost(h1) + + // Create a new relay event bus + relayEvtBus := r.Events() + + // Subscribe to EventBus + err := pm.SubscribeToRelayEvtBus(relayEvtBus) + require.NoError(t, err) + +} + +func TestHandleRelayTopicSubscription(t *testing.T) { + log := utils.Logger() + pubSubTopic := "/waku/2/go/pm/test" + ctx := context.Background() + + // Relay and Host + r, h1, _ := makeWakuRelay(t, log) + err := r.Start(ctx) + require.NoError(t, err) + + // Peer manager with event bus + pm, _ := makePeerManagerWithEventBus(t, r, &h1) + pm.ctx = ctx + + // Start event loop to listen to events + ctxEventLoop, cancel := context.WithCancel(context.Background()) + defer cancel() + go pm.peerEventLoop(ctxEventLoop) + + // Subscribe to Pubsub topic + _, err = r.Subscribe(ctx, protocol.NewContentFilter(pubSubTopic)) + require.NoError(t, err) + + // Wait for event loop to call handler + time.Sleep(200 * time.Millisecond) + + // Check Peer Manager knows about the topic + pm.topicMutex.RLock() + _, ok := pm.subRelayTopics[pubSubTopic] + require.True(t, ok) + pm.topicMutex.RUnlock() + + // UnSubscribe from Pubsub topic + err = r.Unsubscribe(ctx, protocol.NewContentFilter(pubSubTopic)) + require.NoError(t, err) + + // Wait for event loop to call handler + time.Sleep(200 * time.Millisecond) + + // Check the original topic was removed from Peer Manager + pm.topicMutex.RLock() + _, ok = pm.subRelayTopics[pubSubTopic] + require.False(t, ok) + pm.topicMutex.RUnlock() + + r.Stop() +} + +func TestHandlePeerTopicEvent(t *testing.T) { + log := utils.Logger() + pubSubTopic := "/waku/2/go/pm/test" + ctx := context.Background() + + hosts := make([]host.Host, 5) + relays := make([]*relay.WakuRelay, 5) + + for i := 0; i < 5; i++ { + relays[i], hosts[i], _ = makeWakuRelay(t, log) + err := relays[i].Start(ctx) + require.NoError(t, err) + } + + // Create peer manager instance with the first hosts + pm, eventBus := makePeerManagerWithEventBus(t, relays[0], &hosts[0]) + pm.ctx = ctx + pm.RegisterWakuProtocol(relay.WakuRelayID_v200, relay.WakuRelayENRField) + + // Connect host[0] with all other hosts to reach 4 connections + for i := 1; i < 5; i++ { + pm.host.Peerstore().AddAddrs(hosts[i].ID(), hosts[i].Addrs(), peerstore.PermanentAddrTTL) + err := pm.host.Connect(ctx, hosts[i].Peerstore().PeerInfo(hosts[i].ID())) + require.NoError(t, err) + err = pm.host.Peerstore().(wps.WakuPeerstore).SetDirection(hosts[i].ID(), network.DirOutbound) + require.NoError(t, err) + + } + + // Wait for connections to settle + time.Sleep(2 * time.Second) + + if len(pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubSubTopic)) == 0 { + log.Info("No peers for the topic yet") + } + + // Subscribe to Pubsub topic on first host only + _, err := relays[0].Subscribe(ctx, protocol.NewContentFilter(pubSubTopic)) + require.NoError(t, err) + + // Start event loop to listen to events + ctxEventLoop := context.Background() + go pm.peerEventLoop(ctxEventLoop) + + // Prepare emitter + emitter, err := eventBus.Emitter(new(relay.EvtPeerTopic)) + require.NoError(t, err) + + // Send PEER_JOINED events for hosts 2-5 + for i := 1; i < 5; i++ { + err = emitTopicEvent(pubSubTopic, hosts[i].ID(), emitter, relay.PEER_JOINED) + require.NoError(t, err) + time.Sleep(100 * time.Millisecond) + } + + // Check four hosts have joined the topic + require.Equal(t, 4, len(pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubSubTopic))) + + // Check all hosts have been connected + for _, peer := range pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubSubTopic) { + require.Equal(t, network.Connected, pm.host.Network().Connectedness(peer)) + } + + // Send PEER_LEFT events for hosts 2-5 + for i := 1; i < 5; i++ { + err = emitTopicEvent(pubSubTopic, hosts[i].ID(), emitter, relay.PEER_LEFT) + require.NoError(t, err) + time.Sleep(100 * time.Millisecond) + } + + // Check all hosts have left the topic + require.Equal(t, 0, len(pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubSubTopic))) + +}