rewrite test for rpc fragmentation

This commit is contained in:
Yusef Napora 2020-05-05 11:47:46 -04:00 committed by vyzo
parent 27f009a9c7
commit 8642662340

View File

@ -1487,46 +1487,6 @@ func TestGossipsubPiggybackControl(t *testing.T) {
}
}
func TestGossipsubRPCFragmentation(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
h := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
ps := getGossipsub(ctx, h)
ps.maxMessageSize = 500
blah := peer.ID("bogotr0n")
msgch := make(chan *RPC, 10)
ps.peers[blah] = msgch
ps.eval <- func() {
gs := ps.rt.(*GossipSubRouter)
topic := "test1"
gs.mesh[topic] = make(map[peer.ID]struct{})
gs.mesh[topic][blah] = struct{}{}
rpc := &RPC{RPC: pb.RPC{}, from: h.ID()}
msgs := make([]*pb.Message, 10)
for i := 0; i < 10; i++ {
msgs[i] = makeTestMessage(i)
msgs[i].Data = make([]byte, 100)
rand.Read(msgs[i].Data)
}
rpc.Publish = msgs
gs.sendRPC(blah, rpc)
close(msgch)
delete(ps.peers, blah)
}
rpcs := make([]*RPC, 0, 10)
for rpc := range msgch {
rpcs = append(rpcs, rpc)
}
if len(rpcs) < 2 {
t.Fatalf("expected large rpc message to be fragmented before sending on output channel, got %d rpcs", len(rpcs))
}
}
func TestGossipsubOpportunisticGrafting(t *testing.T) {
originalGossipSubPruneBackoff := GossipSubPruneBackoff
GossipSubPruneBackoff = 500 * time.Millisecond
@ -1757,3 +1717,140 @@ func (ps *mockPeerScoreInspector) score(p peer.ID) float64 {
defer ps.mx.Unlock()
return ps.scores[p]
}
func TestGossipsubRPCFragmentation(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 2)
ps := getGossipsub(ctx, hosts[0])
// make a fake peer that requests everything through IWANT gossip
iwe := iwantEverything{h: hosts[1]}
iwe.h.SetStreamHandler(GossipSubID_v10, iwe.handleStream)
connect(t, hosts[0], hosts[1])
// have the real pubsub join the test topic
_, err := ps.Subscribe("test")
if err != nil {
t.Fatal(err)
}
// wait for the real pubsub to connect and try to graft to the faker
time.Sleep(time.Second)
// publish a bunch of fairly large messages from the real host
nMessages := 1000
msgSize := 20000
for i := 0; i < nMessages; i++ {
msg := make([]byte, msgSize)
rand.Read(msg)
ps.Publish("test", msg)
time.Sleep(20 * time.Millisecond)
}
// wait a bit for them to be received via gossip by the fake peer
time.Sleep(5 * time.Second)
// we should have received all the messages
if iwe.msgsReceived != nMessages {
t.Fatalf("expected fake gossipsub peer to receive all messages, got %d / %d", iwe.msgsReceived, nMessages)
}
// and we should have seen an IHAVE message for each of them
if iwe.ihavesReceived != nMessages {
t.Fatalf("expected to get IHAVEs for every message, got %d / %d", iwe.ihavesReceived, nMessages)
}
// If everything were fragmented with maximum efficiency, we would expect to get
// (nMessages * msgSize) / ps.maxMessageSize total RPCs containing the messages we sent IWANTs for.
// The actual number will probably be larger, since there's some overhead for the RPC itself, and
// we probably aren't packing each RPC to it's maximum size
minExpectedRPCS := (nMessages * msgSize) / ps.maxMessageSize
if iwe.rpcsWithMessages < minExpectedRPCS {
t.Fatalf("expected to receive at least %d RPCs containing messages, got %d", minExpectedRPCS, iwe.rpcsWithMessages)
}
}
// iwantEverything is a simple gossipsub client that never grafts onto a mesh,
// instead requesting everything through IWANT gossip messages. It is used to
// test that large responses to IWANT requests are fragmented into multiple RPCs.
type iwantEverything struct {
h host.Host
rpcsWithMessages int
msgsReceived int
ihavesReceived int
}
func (iwe *iwantEverything) handleStream(s network.Stream) {
os, err := iwe.h.NewStream(context.Background(), s.Conn().RemotePeer(), GossipSubID_v10)
if err != nil {
panic(err)
}
msgIdsReceived := make(map[string]struct{})
gossipMsgIdsReceived := make(map[string]struct{})
// send a subscription for test in the output stream to become candidate for gossip
r := ggio.NewDelimitedReader(s, 1<<20)
w := ggio.NewDelimitedWriter(os)
truth := true
topic := "test"
err = w.WriteMsg(&pb.RPC{Subscriptions: []*pb.RPC_SubOpts{&pb.RPC_SubOpts{Subscribe: &truth, Topicid: &topic}}})
remotePidBytes, err := s.Conn().RemotePeer().MarshalBinary()
if err != nil {
panic(err)
}
toPrune := []*pb.PeerInfo{{PeerID: remotePidBytes}}
var rpc pb.RPC
for {
rpc.Reset()
err = r.ReadMsg(&rpc)
if err != nil {
if err != io.EOF {
s.Reset()
}
return
}
if len(rpc.Publish) != 0 {
iwe.rpcsWithMessages++
}
// keep track of unique message ids received
for _, msg := range rpc.Publish {
id := string(msg.Seqno)
if _, seen := msgIdsReceived[id]; !seen {
iwe.msgsReceived++
}
msgIdsReceived[id] = struct{}{}
}
if rpc.Control != nil {
// send a PRUNE for all grafts, so we don't get direct message deliveries
var prunes []*pb.ControlPrune
for _, graft := range rpc.Control.Graft {
prunes = append(prunes, &pb.ControlPrune{TopicID: graft.TopicID, Peers: toPrune})
}
var iwants []*pb.ControlIWant
for _, ihave := range rpc.Control.Ihave {
iwants = append(iwants, &pb.ControlIWant{MessageIDs: ihave.MessageIDs})
for _, msgId := range ihave.MessageIDs {
if _, seen := gossipMsgIdsReceived[msgId]; !seen {
iwe.ihavesReceived++
}
gossipMsgIdsReceived[msgId] = struct{}{}
}
}
out := rpcWithControl(nil, nil, iwants, nil, prunes)
err = w.WriteMsg(out)
if err != nil {
panic(err)
}
}
}
}