From f87e7b72d70c370117afffba21aa8657fc55cc78 Mon Sep 17 00:00:00 2001 From: Yusef Napora Date: Fri, 8 May 2020 21:34:34 -0400 Subject: [PATCH] test delivery tags vs sybil storm --- floodsub_test.go | 8 +- gossipsub_connmgr_test.go | 166 +++++++++++++++++++++++++++----------- 2 files changed, 121 insertions(+), 53 deletions(-) diff --git a/floodsub_test.go b/floodsub_test.go index 0ddd158..6245c12 100644 --- a/floodsub_test.go +++ b/floodsub_test.go @@ -41,16 +41,12 @@ func checkMessageRouting(t *testing.T, topic string, pubs []*PubSub, subs []*Sub } } -func getNetHosts(t *testing.T, ctx context.Context, n int, options ...func() bhost.Option) []host.Host { +func getNetHosts(t *testing.T, ctx context.Context, n int) []host.Host { var out []host.Host for i := 0; i < n; i++ { netw := swarmt.GenSwarm(t, ctx) - opts := make([]bhost.Option, len(options)) - for i, optFn := range options { - opts[i] = optFn() - } - h := bhost.NewBlankHost(netw, opts...) + h := bhost.NewBlankHost(netw) out = append(out, h) } diff --git a/gossipsub_connmgr_test.go b/gossipsub_connmgr_test.go index 952bf1d..48fada9 100644 --- a/gossipsub_connmgr_test.go +++ b/gossipsub_connmgr_test.go @@ -2,6 +2,9 @@ package pubsub import ( "context" + "github.com/benbjohnson/clock" + "github.com/libp2p/go-libp2p-core/host" + swarmt "github.com/libp2p/go-libp2p-swarm/testing" "testing" "time" @@ -10,59 +13,90 @@ import ( "github.com/libp2p/go-libp2p-core/peer" ) -// This file has tests for gossipsub's interaction with the libp2p connection manager. -// We tag connections for three reasons: -// -// - direct peers get a `pubsub:direct` tag with a value of GossipSubConnTagValueDirectPeer -// - mesh members get a `pubsub:$topic` tag with a value of GossipSubConnTagValueMeshPeer (default 20) -// - applies for each topic they're a mesh member of -// - anyone who delivers a message first gets a bump to a decaying `pubsub:deliveries:$topic` tag - -func TestGossipsubConnTagDirectPeers(t *testing.T) { - // test that direct peers get tagged with GossipSubConnTagValueDirectPeer - t.Skip("coming soon") -} - -func TestGossipsubConnTagMeshPeers(t *testing.T) { - // test that mesh peers get tagged with GossipSubConnTagValueMeshPeer - t.Skip("coming soon") -} - func TestGossipsubConnTagMessageDeliveries(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - // set the gossipsub D parameters low, so that not all - // test peers will be in a mesh together oldGossipSubD := GossipSubD oldGossipSubDHi := GossipSubDhi + oldGossipSubConnTagDecayInterval := GossipSubConnTagDecayInterval + oldGossipSubConnTagMessageDeliveryCap := GossipSubConnTagMessageDeliveryCap + + oldSilencePeriod := connmgr.SilencePeriod + + // set the gossipsub D parameters low, so that we have some peers outside the mesh GossipSubD = 4 - GossipSubDhi = 4 + GossipSubDhi = 5 + // also set the tag decay interval so we don't have to wait forever for tests + GossipSubConnTagDecayInterval = time.Second + + // set the cap for deliveries above GossipSubConnTagValueMeshPeer, so the sybils + // will be forced out even if they end up in someone's mesh + GossipSubConnTagMessageDeliveryCap = 50 + + connmgr.SilencePeriod = time.Millisecond + // reset globals after test defer func() { GossipSubD = oldGossipSubD GossipSubDhi = oldGossipSubDHi + GossipSubConnTagDecayInterval = oldGossipSubConnTagDecayInterval + GossipSubConnTagMessageDeliveryCap = oldGossipSubConnTagMessageDeliveryCap + connmgr.SilencePeriod = oldSilencePeriod }() + decayClock := clock.NewMock() decayCfg := connmgr.DecayerCfg{ Resolution: time.Second, + Clock: decayClock, } - hosts := getNetHosts(t, ctx, 20, func() bhost.Option { - return bhost.WithConnectionManager( - connmgr.NewConnManager(1, 30, 10*time.Millisecond, - connmgr.DecayerConfig(&decayCfg))) - }) + nHonest := 20 + nSquatter := 60 + connLimit := 30 + + connmgrs := make([]*connmgr.BasicConnMgr, nHonest) + honestHosts := make([]host.Host, nHonest) + honestPeers := make(map[peer.ID]struct{}) + + for i := 0; i < nHonest; i++ { + connmgrs[i] = connmgr.NewConnManager(nHonest, connLimit, 0, + connmgr.DecayerConfig(&decayCfg)) + + netw := swarmt.GenSwarm(t, ctx) + netw.Notify(connmgrs[i].Notifee()) // TODO: move this to go-libp2p-blankhost + h := bhost.NewBlankHost(netw, bhost.WithConnectionManager(connmgrs[i])) + honestHosts[i] = h + honestPeers[h.ID()] = struct{}{} + } // use flood publishing, so non-mesh peers will still be delivering messages // to everyone - psubs := getGossipsubs(ctx, hosts, + psubs := getGossipsubs(ctx, honestHosts, WithFloodPublish(true)) - denseConnect(t, hosts) + + // sybil squatters to be connected later + sybilHosts := getNetHosts(t, ctx, nSquatter) + squatters := make([]*sybilSquatter, 0, nSquatter) + for _, h := range sybilHosts { + squatter := &sybilSquatter{h: h} + h.SetStreamHandler(GossipSubID_v10, squatter.handleStream) + squatters = append(squatters, squatter) + } + + // connect the honest hosts + connectAll(t, honestHosts) + + for _, h := range honestHosts { + if len(h.Network().Conns()) != nHonest-1 { + t.Errorf("expected to have conns to all honest peers, have %d", len(h.Network().Conns())) + } + } // subscribe everyone to the topic + topic := "test" var msgs []*Subscription for _, ps := range psubs { - subch, err := ps.Subscribe("foobar") + subch, err := ps.Subscribe(topic) if err != nil { t.Fatal(err) } @@ -70,28 +104,66 @@ func TestGossipsubConnTagMessageDeliveries(t *testing.T) { msgs = append(msgs, subch) } - // wait a few heartbeats for meshes to form + // sleep to allow meshes to form time.Sleep(2 * time.Second) - // have all the hosts publish messages - -} - -func getMeshPeers(ps PubSub, topic string) []peer.ID { - gs := ps.rt.(*GossipSubRouter) - - peerCh := make(chan peer.ID) - ps.eval <- func() { - peers := gs.mesh[topic] - for pid, _ := range peers { - peerCh <- pid + // have all the hosts publish enough messages to ensure that they get some delivery credit + nMessages := 1000 + for _, ps := range psubs { + for i := 0; i < nMessages; i++ { + ps.Publish(topic, []byte("hello")) } - close(peerCh) } - var out []peer.ID - for pid := range peerCh { - out = append(out, pid) + // advance the fake time for the tag decay + decayClock.Add(time.Second) + + // verify that they've given each other delivery connection tags + tag := "pubsub-deliveries:test" + for _, h := range honestHosts { + for _, h2 := range honestHosts { + if h.ID() == h2.ID() { + continue + } + val := getTagValue(h.ConnManager(), h2.ID(), tag) + if val == 0 { + t.Errorf("Expected non-zero delivery tag value for peer %s", h2.ID()) + } + } + } + + // now connect the sybils to put pressure on the real hosts' connection managers + allHosts := append(honestHosts, sybilHosts...) + connectAll(t, allHosts) + + // verify that we have a bunch of connections + for _, h := range honestHosts { + if len(h.Network().Conns()) != nHonest+nSquatter-1 { + t.Errorf("expected to have conns to all peers, have %d", len(h.Network().Conns())) + } + } + + // force the connection managers to trim, so we don't need to muck about with timing as much + for _, cm := range connmgrs { + cm.TrimOpenConns(ctx) + } + + // we should still have conns to all the honest peers, but not the sybils + for _, h := range honestHosts { + nHonestConns := 0 + nDishonestConns := 0 + for _, conn := range h.Network().Conns() { + if _, ok := honestPeers[conn.RemotePeer()]; !ok { + nDishonestConns++ + } else { + nHonestConns++ + } + } + if nDishonestConns > 10 { + t.Errorf("expected most dishonest conns to be pruned, have %d", nDishonestConns) + } + if nHonestConns != nHonest-1 { + t.Errorf("expected all honest conns to be preserved, have %d", nHonestConns) + } } - return out }