fragment large RPCs in sendRPC

This commit is contained in:
Yusef Napora 2020-05-05 10:11:18 -04:00 committed by vyzo
parent 5bbe37191a
commit 27f009a9c7
2 changed files with 118 additions and 5 deletions

View File

@ -899,18 +899,91 @@ func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC) {
return
}
select {
case mch <- out:
gs.tracer.SendRPC(out, p)
default:
log.Infof("dropping message to peer %s: queue full", p)
// fragment into multiple RPCs if necessary
outRPCs, err := fragmentRPC(out, gs.p.maxMessageSize)
if err != nil {
log.Warnf("unable to fragment message to peer %s: %s", p, err)
gs.tracer.DropRPC(out, p)
// push control messages that need to be retried
ctl := out.GetControl()
if ctl != nil {
gs.pushControl(p, ctl)
}
return
}
for _, rpc := range outRPCs {
select {
case mch <- rpc:
gs.tracer.SendRPC(rpc, p)
default:
log.Infof("dropping message to peer %s: queue full", p)
gs.tracer.DropRPC(rpc, p)
// push control messages that need to be retried
ctl := rpc.GetControl()
if ctl != nil {
gs.pushControl(p, ctl)
}
}
}
}
func fragmentRPC(rpc *RPC, limit int) ([]*RPC, error) {
if rpc.Size() < limit {
return []*RPC{rpc}, nil
}
c := (rpc.Size() / limit) + 1
rpcs := make([]*RPC, 1, c)
rpcs[0] = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{}}, from: rpc.from}
outRPC := func(sizeToAdd int) *RPC {
current := rpcs[len(rpcs)-1]
if current.Size()+sizeToAdd < limit {
return current
}
next := &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{}}, from: rpc.from}
rpcs = append(rpcs, next)
return next
}
for _, msg := range rpc.GetPublish() {
s := msg.Size()
// if an individual message is too large, we can't fragment it and have to fail entirely
if s > limit {
return nil, fmt.Errorf("message with len=%d exceeds limit %d", s, limit)
}
out := outRPC(s)
out.Publish = append(out.Publish, msg)
}
for _, sub := range rpc.GetSubscriptions() {
out := outRPC(sub.Size())
out.Subscriptions = append(out.Subscriptions, sub)
}
ctl := rpc.GetControl()
if ctl == nil {
// if there were no control messages, we're done
return rpcs, nil
}
for _, graft := range ctl.Graft {
out := outRPC(graft.Size())
out.Control.Graft = append(out.Control.Graft, graft)
}
for _, prune := range ctl.Prune {
out := outRPC(prune.Size())
out.Control.Prune = append(out.Control.Prune, prune)
}
for _, ihave := range ctl.Ihave {
out := outRPC(ihave.Size())
out.Control.Ihave = append(out.Control.Ihave, ihave)
}
for _, iwant := range ctl.Iwant {
out := outRPC(iwant.Size())
out.Control.Iwant = append(out.Control.Iwant, iwant)
}
return rpcs, nil
}
func (gs *GossipSubRouter) heartbeatTimer() {

View File

@ -1487,6 +1487,46 @@ func TestGossipsubPiggybackControl(t *testing.T) {
}
}
func TestGossipsubRPCFragmentation(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
h := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
ps := getGossipsub(ctx, h)
ps.maxMessageSize = 500
blah := peer.ID("bogotr0n")
msgch := make(chan *RPC, 10)
ps.peers[blah] = msgch
ps.eval <- func() {
gs := ps.rt.(*GossipSubRouter)
topic := "test1"
gs.mesh[topic] = make(map[peer.ID]struct{})
gs.mesh[topic][blah] = struct{}{}
rpc := &RPC{RPC: pb.RPC{}, from: h.ID()}
msgs := make([]*pb.Message, 10)
for i := 0; i < 10; i++ {
msgs[i] = makeTestMessage(i)
msgs[i].Data = make([]byte, 100)
rand.Read(msgs[i].Data)
}
rpc.Publish = msgs
gs.sendRPC(blah, rpc)
close(msgch)
delete(ps.peers, blah)
}
rpcs := make([]*RPC, 0, 10)
for rpc := range msgch {
rpcs = append(rpcs, rpc)
}
if len(rpcs) < 2 {
t.Fatalf("expected large rpc message to be fragmented before sending on output channel, got %d rpcs", len(rpcs))
}
}
func TestGossipsubOpportunisticGrafting(t *testing.T) {
originalGossipSubPruneBackoff := GossipSubPruneBackoff
GossipSubPruneBackoff = 500 * time.Millisecond