diff --git a/gossipsub.go b/gossipsub.go
index aa499d9..fdb0054 100644
--- a/gossipsub.go
+++ b/gossipsub.go
@@ -899,18 +899,130 @@ func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC) {
 		return
 	}
 
-	select {
-	case mch <- out:
-		gs.tracer.SendRPC(out, p)
-	default:
-		log.Infof("dropping message to peer %s: queue full", p)
+	// fragment into multiple RPCs if necessary
+	outRPCs, err := fragmentRPC(out, gs.p.maxMessageSize)
+	if err != nil {
+		log.Warnf("unable to fragment message to peer %s: %s", p, err)
 		gs.tracer.DropRPC(out, p)
 		// push control messages that need to be retried
 		ctl := out.GetControl()
 		if ctl != nil {
 			gs.pushControl(p, ctl)
 		}
+		return
 	}
+
+	for _, rpc := range outRPCs {
+		select {
+		case mch <- rpc:
+			gs.tracer.SendRPC(rpc, p)
+		default:
+			log.Infof("dropping message to peer %s: queue full", p)
+			gs.tracer.DropRPC(rpc, p)
+			// push control messages that need to be retried
+			ctl := rpc.GetControl()
+			if ctl != nil {
+				gs.pushControl(p, ctl)
+			}
+		}
+	}
+}
+
+// fragmentRPC splits rpc into RPCs whose encoded size, as reported by Size,
+// stays within limit; sendRPC passes the configured maxMessageSize as limit.
+//
+// An rpc already below limit is returned unchanged as the only element.
+// Otherwise its publish messages, subscriptions and control entries (graft,
+// prune, ihave, iwant) are redistributed, in that order, over new RPCs that
+// carry the same from value. Entries are never split: a publish message that
+// cannot fit in an RPC of its own produces an error and no RPCs at all.
+func fragmentRPC(rpc *RPC, limit int) ([]*RPC, error) {
+	if rpc.Size() < limit {
+		return []*RPC{rpc}, nil
+	}
+
+	// framing is what protobuf adds around an embedded entry of n bytes: a tag
+	// and the varint-encoded length. An entry's own Size() excludes it, so it
+	// has to be budgeted separately or fragments come out larger than planned.
+	// NOTE(review): assumes one-byte tags, i.e. field numbers below 16 in the
+	// RPC and ControlMessage definitions; confirm against the .proto file.
+	framing := func(n int) int {
+		f := 2
+		for n >>= 7; n > 0; n >>= 7 {
+			f++
+		}
+		return f
+	}
+	// Control entries sit one level deeper: adding one may create the Control
+	// field or lengthen its size prefix. framing(limit) is an upper bound.
+	ctlHeadroom := framing(limit)
+
+	c := (rpc.Size() / limit) + 1
+	rpcs := make([]*RPC, 1, c)
+	rpcs[0] = &RPC{RPC: pb.RPC{}, from: rpc.from}
+
+	// outRPC returns the fragment the next entry of sizeToAdd bytes belongs in,
+	// starting a new fragment when the current one lacks room. With withCtl
+	// set, the returned fragment has a non-nil Control; it is created lazily so
+	// that fragments without control entries carry no empty ControlMessage.
+	outRPC := func(sizeToAdd int, withCtl bool) *RPC {
+		need := sizeToAdd + framing(sizeToAdd)
+		if withCtl {
+			need += ctlHeadroom
+		}
+		current := rpcs[len(rpcs)-1]
+		// An empty fragment always takes the entry: a new one has no more room.
+		if used := current.Size(); used > 0 && used+need >= limit {
+			current = &RPC{RPC: pb.RPC{}, from: rpc.from}
+			rpcs = append(rpcs, current)
+		}
+		if withCtl && current.Control == nil {
+			current.Control = &pb.ControlMessage{}
+		}
+		return current
+	}
+
+	for _, msg := range rpc.GetPublish() {
+		s := msg.Size()
+		// if an individual message is too large, we can't fragment it and have to fail entirely
+		if s+framing(s) > limit {
+			return nil, fmt.Errorf("message with len=%d plus framing exceeds limit %d", s, limit)
+		}
+		out := outRPC(s, false)
+		out.Publish = append(out.Publish, msg)
+	}
+
+	for _, sub := range rpc.GetSubscriptions() {
+		out := outRPC(sub.Size(), false)
+		out.Subscriptions = append(out.Subscriptions, sub)
+	}
+
+	ctl := rpc.GetControl()
+	if ctl == nil {
+		// if there were no control messages, we're done
+		return rpcs, nil
+	}
+	// NOTE(review): control entries are placed whole and, unlike publish
+	// messages, not checked against limit; an oversized entry still gets a
+	// fragment of its own.
+	for _, graft := range ctl.Graft {
+		out := outRPC(graft.Size(), true)
+		out.Control.Graft = append(out.Control.Graft, graft)
+	}
+	for _, prune := range ctl.Prune {
+		out := outRPC(prune.Size(), true)
+		out.Control.Prune = append(out.Control.Prune, prune)
+	}
+	for _, ihave := range ctl.Ihave {
+		out := outRPC(ihave.Size(), true)
+		out.Control.Ihave = append(out.Control.Ihave, ihave)
+	}
+	for _, iwant := range ctl.Iwant {
+		out := outRPC(iwant.Size(), true)
+		out.Control.Iwant = append(out.Control.Iwant, iwant)
+	}
+
+	return rpcs, nil
 }
 
 func (gs *GossipSubRouter) heartbeatTimer() {
diff --git a/gossipsub_test.go b/gossipsub_test.go
index df977fa..da5f630 100644
--- a/gossipsub_test.go
+++ b/gossipsub_test.go
@@ -1487,6 +1487,46 @@ func TestGossipsubPiggybackControl(t *testing.T) {
 	}
 }
 
+func TestGossipsubRPCFragmentation(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	h := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
+	ps := getGossipsub(ctx, h)
+	ps.maxMessageSize = 500
+
+	blah := peer.ID("bogotr0n")
+	msgch := make(chan *RPC, 10)
+	ps.peers[blah] = msgch
+	ps.eval <- func() {
+		gs := ps.rt.(*GossipSubRouter)
+		topic := "test1"
+		gs.mesh[topic] = make(map[peer.ID]struct{})
+		gs.mesh[topic][blah] = struct{}{}
+
+		rpc := &RPC{RPC: pb.RPC{}, from: h.ID()}
+		msgs := make([]*pb.Message, 10)
+		for i := 0; i < 10; i++ {
+			msgs[i] = makeTestMessage(i)
+			msgs[i].Data = make([]byte, 100)
+			rand.Read(msgs[i].Data)
+		}
+		rpc.Publish = msgs
+
+		gs.sendRPC(blah, rpc)
+		close(msgch)
+		delete(ps.peers, blah)
+	}
+
+	rpcs := make([]*RPC, 0, 10)
+	for rpc := range msgch {
+		rpcs = append(rpcs, rpc)
+	}
+	if len(rpcs) < 2 {
+		t.Fatalf("expected large rpc message to be fragmented before sending on output channel, got %d rpcs", len(rpcs))
+	}
+}
+
 func TestGossipsubOpportunisticGrafting(t *testing.T) {
 	originalGossipSubPruneBackoff := GossipSubPruneBackoff
 	GossipSubPruneBackoff = 500 * time.Millisecond