diff --git a/gossipsub_test.go b/gossipsub_test.go index da5f630..3bbc0ad 100644 --- a/gossipsub_test.go +++ b/gossipsub_test.go @@ -1487,46 +1487,6 @@ func TestGossipsubPiggybackControl(t *testing.T) { } } -func TestGossipsubRPCFragmentation(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - h := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx)) - ps := getGossipsub(ctx, h) - ps.maxMessageSize = 500 - - blah := peer.ID("bogotr0n") - msgch := make(chan *RPC, 10) - ps.peers[blah] = msgch - ps.eval <- func() { - gs := ps.rt.(*GossipSubRouter) - topic := "test1" - gs.mesh[topic] = make(map[peer.ID]struct{}) - gs.mesh[topic][blah] = struct{}{} - - rpc := &RPC{RPC: pb.RPC{}, from: h.ID()} - msgs := make([]*pb.Message, 10) - for i := 0; i < 10; i++ { - msgs[i] = makeTestMessage(i) - msgs[i].Data = make([]byte, 100) - rand.Read(msgs[i].Data) - } - rpc.Publish = msgs - - gs.sendRPC(blah, rpc) - close(msgch) - delete(ps.peers, blah) - } - - rpcs := make([]*RPC, 0, 10) - for rpc := range msgch { - rpcs = append(rpcs, rpc) - } - if len(rpcs) < 2 { - t.Fatalf("expected large rpc message to be fragmented before sending on output channel, got %d rpcs", len(rpcs)) - } -} - func TestGossipsubOpportunisticGrafting(t *testing.T) { originalGossipSubPruneBackoff := GossipSubPruneBackoff GossipSubPruneBackoff = 500 * time.Millisecond @@ -1757,3 +1717,140 @@ func (ps *mockPeerScoreInspector) score(p peer.ID) float64 { defer ps.mx.Unlock() return ps.scores[p] } + +func TestGossipsubRPCFragmentation(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + hosts := getNetHosts(t, ctx, 2) + ps := getGossipsub(ctx, hosts[0]) + + // make a fake peer that requests everything through IWANT gossip + iwe := iwantEverything{h: hosts[1]} + iwe.h.SetStreamHandler(GossipSubID_v10, iwe.handleStream) + + connect(t, hosts[0], hosts[1]) + + // have the real pubsub join the test topic + _, err := ps.Subscribe("test") + if err != nil { + t.Fatal(err) + } + + // wait for the real pubsub to connect and try to graft to the faker + time.Sleep(time.Second) + + // publish a bunch of fairly large messages from the real host + nMessages := 1000 + msgSize := 20000 + for i := 0; i < nMessages; i++ { + msg := make([]byte, msgSize) + rand.Read(msg) + ps.Publish("test", msg) + time.Sleep(20 * time.Millisecond) + } + + // wait a bit for them to be received via gossip by the fake peer + time.Sleep(5 * time.Second) + + // we should have received all the messages + if iwe.msgsReceived != nMessages { + t.Fatalf("expected fake gossipsub peer to receive all messages, got %d / %d", iwe.msgsReceived, nMessages) + } + + // and we should have seen an IHAVE message for each of them + if iwe.ihavesReceived != nMessages { + t.Fatalf("expected to get IHAVEs for every message, got %d / %d", iwe.ihavesReceived, nMessages) + } + + // If everything were fragmented with maximum efficiency, we would expect to get + // (nMessages * msgSize) / ps.maxMessageSize total RPCs containing the messages we sent IWANTs for. + // The actual number will probably be larger, since there's some overhead for the RPC itself, and + // we probably aren't packing each RPC to it's maximum size + minExpectedRPCS := (nMessages * msgSize) / ps.maxMessageSize + if iwe.rpcsWithMessages < minExpectedRPCS { + t.Fatalf("expected to receive at least %d RPCs containing messages, got %d", minExpectedRPCS, iwe.rpcsWithMessages) + } +} + +// iwantEverything is a simple gossipsub client that never grafts onto a mesh, +// instead requesting everything through IWANT gossip messages. It is used to +// test that large responses to IWANT requests are fragmented into multiple RPCs. +type iwantEverything struct { + h host.Host + rpcsWithMessages int + msgsReceived int + ihavesReceived int +} + +func (iwe *iwantEverything) handleStream(s network.Stream) { + os, err := iwe.h.NewStream(context.Background(), s.Conn().RemotePeer(), GossipSubID_v10) + if err != nil { + panic(err) + } + + msgIdsReceived := make(map[string]struct{}) + gossipMsgIdsReceived := make(map[string]struct{}) + + // send a subscription for test in the output stream to become candidate for gossip + r := ggio.NewDelimitedReader(s, 1<<20) + w := ggio.NewDelimitedWriter(os) + truth := true + topic := "test" + err = w.WriteMsg(&pb.RPC{Subscriptions: []*pb.RPC_SubOpts{&pb.RPC_SubOpts{Subscribe: &truth, Topicid: &topic}}}) + + remotePidBytes, err := s.Conn().RemotePeer().MarshalBinary() + if err != nil { + panic(err) + } + toPrune := []*pb.PeerInfo{{PeerID: remotePidBytes}} + + var rpc pb.RPC + for { + rpc.Reset() + err = r.ReadMsg(&rpc) + if err != nil { + if err != io.EOF { + s.Reset() + } + return + } + + if len(rpc.Publish) != 0 { + iwe.rpcsWithMessages++ + } + // keep track of unique message ids received + for _, msg := range rpc.Publish { + id := string(msg.Seqno) + if _, seen := msgIdsReceived[id]; !seen { + iwe.msgsReceived++ + } + msgIdsReceived[id] = struct{}{} + } + + if rpc.Control != nil { + // send a PRUNE for all grafts, so we don't get direct message deliveries + var prunes []*pb.ControlPrune + for _, graft := range rpc.Control.Graft { + prunes = append(prunes, &pb.ControlPrune{TopicID: graft.TopicID, Peers: toPrune}) + } + + var iwants []*pb.ControlIWant + for _, ihave := range rpc.Control.Ihave { + iwants = append(iwants, &pb.ControlIWant{MessageIDs: ihave.MessageIDs}) + for _, msgId := range ihave.MessageIDs { + if _, seen := gossipMsgIdsReceived[msgId]; !seen { + iwe.ihavesReceived++ + } + gossipMsgIdsReceived[msgId] = struct{}{} + } + } + + out := rpcWithControl(nil, nil, iwants, nil, prunes) + err = w.WriteMsg(out) + if err != nil { + panic(err) + } + } + } +}