From bac5d5910ca100d57ae20905413a6085dfa15e75 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 22 Apr 2020 18:31:12 +0300 Subject: [PATCH] add test for opportunistic grafting --- gossipsub_test.go | 150 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 150 insertions(+) diff --git a/gossipsub_test.go b/gossipsub_test.go index 88dda36..298830a 100644 --- a/gossipsub_test.go +++ b/gossipsub_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "io" "math/rand" "testing" "time" @@ -11,11 +12,14 @@ import ( pb "github.com/libp2p/go-libp2p-pubsub/pb" "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peerstore" bhost "github.com/libp2p/go-libp2p-blankhost" swarmt "github.com/libp2p/go-libp2p-swarm/testing" + + ggio "github.com/gogo/protobuf/io" ) func getGossipsub(ctx context.Context, h host.Host, opts ...Option) *PubSub { @@ -1269,3 +1273,149 @@ func TestGossipsubPiggybackControl(t *testing.T) { t.Fatal("expected test3 as prune topic ID") } } + +func TestGossipsubOpportunisticGrafting(t *testing.T) { + originalGossipSubOpportunisticGraftTicks := GossipSubOpportunisticGraftTicks + GossipSubOpportunisticGraftTicks = 2 + defer func() { + GossipSubOpportunisticGraftTicks = originalGossipSubOpportunisticGraftTicks + }() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + hosts := getNetHosts(t, ctx, 50) + // pubsubs for the first 10 hosts + psubs := getGossipsubs(ctx, hosts[:10], + WithFloodPublish(true), + WithPeerScore( + &PeerScoreParams{ + AppSpecificScore: func(peer.ID) float64 { return 0 }, + AppSpecificWeight: 0, + DecayInterval: time.Second, + DecayToZero: 0.01, + Topics: map[string]*TopicScoreParams{ + "test": &TopicScoreParams{ + TopicWeight: 1, + TimeInMeshWeight: 0.0002777, + TimeInMeshQuantum: time.Second, + TimeInMeshCap: 3600, + FirstMessageDeliveriesWeight: 1, + FirstMessageDeliveriesDecay: 0.9997, + FirstMessageDeliveriesCap: 100, + InvalidMessageDeliveriesDecay: 0.99997, + }, + }, + }, + &PeerScoreThresholds{ + GossipThreshold: -10, + PublishThreshold: -100, + GraylistThreshold: -10000, + OpportunisticGraftThreshold: 1, + })) + + // connect the real hosts with degree 5 + connectSome(t, hosts[:10], 5) + + // sybil squatters for the remaining 40 hosts + squatters := make([]*sybilSquatter, 0, 40) + for _, h := range hosts[10:] { + squatter := &sybilSquatter{h: h} + h.SetStreamHandler(GossipSubID_v10, squatter.handleStream) + squatters = append(squatters, squatter) + } + + // connect all squatters to every real host + for _, squatter := range hosts[10:] { + for _, real := range hosts[:10] { + connect(t, squatter, real) + } + } + + // wait a bit for the connections to propagate events to the pubsubs + time.Sleep(time.Second) + + // ask the real pubsus to join the topic + for _, ps := range psubs { + sub, err := ps.Subscribe("test") + if err != nil { + t.Fatal(err) + } + // consume the messages + go func(sub *Subscription) { + for { + _, err := sub.Next(ctx) + if err != nil { + return + } + } + }(sub) + } + + // publish a bunch of messages from the real hosts + for i := 0; i < 1000; i++ { + msg := []byte(fmt.Sprintf("message %d", i)) + psubs[i%10].Publish("test", msg) + time.Sleep(20 * time.Millisecond) + } + + // now wait a few of oppgraft cycles + time.Sleep(7 * time.Second) + + // check the honest peer meshes, they should have at least 3 honest peers each + res := make(chan int, 1) + for _, ps := range psubs { + ps.eval <- func() { + gs := ps.rt.(*GossipSubRouter) + count := 0 + for _, h := range hosts[:10] { + _, ok := gs.mesh["test"][h.ID()] + if ok { + count++ + } + } + res <- count + } + + count := <-res + if count < 3 { + t.Fatalf("expected at least 3 honest peers, got %d", count) + } + } +} + +type sybilSquatter struct { + h host.Host +} + +func (sq *sybilSquatter) handleStream(s network.Stream) { + defer s.Close() + + os, err := sq.h.NewStream(context.Background(), s.Conn().RemotePeer(), GossipSubID_v10) + if err != nil { + panic(err) + } + + // send a subscription for test in the output stream to become candidate for GRAFT + // and then just read and ignore the incoming RPCs + r := ggio.NewDelimitedReader(s, 1<<20) + w := ggio.NewDelimitedWriter(os) + truth := true + topic := "test" + err = w.WriteMsg(&pb.RPC{Subscriptions: []*pb.RPC_SubOpts{&pb.RPC_SubOpts{Subscribe: &truth, Topicid: &topic}}}) + if err != nil { + panic(err) + } + + var rpc pb.RPC + for { + rpc.Reset() + err = r.ReadMsg(&rpc) + if err != nil { + if err != io.EOF { + s.Reset() + } + return + } + } +}