try to send control messages in one RPC when fragmenting

This commit is contained in:
Yusef Napora 2020-05-05 14:15:16 -04:00 committed by vyzo
parent cb65238a39
commit b1de03b608
1 changed files with 31 additions and 13 deletions

View File

@ -935,13 +935,21 @@ func fragmentRPC(rpc *RPC, limit int) ([]*RPC, error) {
c := (rpc.Size() / limit) + 1
rpcs := make([]*RPC, 1, c)
rpcs[0] = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{}}, from: rpc.from}
outRPC := func(sizeToAdd int) *RPC {
rpcs[0] = &RPC{RPC: pb.RPC{}, from: rpc.from}
// outRPC returns the current RPC message if it will fit sizeToAdd more bytes
// otherwise, it will create a new RPC message and add it to the list.
// if withCtl is true, the new RPC message will have a non-nil empty Control message.
outRPC := func(sizeToAdd int, withCtl bool) *RPC {
current := rpcs[len(rpcs)-1]
if current.Size()+sizeToAdd < limit {
return current
}
next := &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{}}, from: rpc.from}
var ctl *pb.ControlMessage
if withCtl {
ctl = &pb.ControlMessage{}
}
next := &RPC{RPC: pb.RPC{Control: ctl}, from: rpc.from}
rpcs = append(rpcs, next)
return next
}
@ -952,12 +960,12 @@ func fragmentRPC(rpc *RPC, limit int) ([]*RPC, error) {
if s > limit {
return nil, fmt.Errorf("message with len=%d exceeds limit %d", s, limit)
}
out := outRPC(s)
out := outRPC(s, false)
out.Publish = append(out.Publish, msg)
}
for _, sub := range rpc.GetSubscriptions() {
out := outRPC(sub.Size())
out := outRPC(sub.Size(), false)
out.Subscriptions = append(out.Subscriptions, sub)
}
@ -966,23 +974,33 @@ func fragmentRPC(rpc *RPC, limit int) ([]*RPC, error) {
// if there were no control messages, we're done
return rpcs, nil
}
// if all the control messages fit into one RPC, we just add it to the end and return
ctlOut := &RPC{RPC: pb.RPC{Control: ctl}, from: rpc.from}
if ctlOut.Size() < limit {
rpcs = append(rpcs, ctlOut)
return rpcs, nil
}
// we need to split up the control messages into multiple RPCs
// add a blank rpc message to the end of the list, then use outRPC to get or create
// RPC messages to fit each control message
rpcs = append(rpcs, &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{}}, from: rpc.from})
for _, graft := range ctl.Graft {
out := outRPC(graft.Size())
out := outRPC(graft.Size(), true)
out.Control.Graft = append(out.Control.Graft, graft)
}
for _, prune := range ctl.Prune {
out := outRPC(prune.Size())
out := outRPC(prune.Size(), true)
out.Control.Prune = append(out.Control.Prune, prune)
}
for _, ihave := range ctl.Ihave {
out := outRPC(ihave.Size())
out.Control.Ihave = append(out.Control.Ihave, ihave)
}
for _, iwant := range ctl.Iwant {
out := outRPC(iwant.Size())
out := outRPC(iwant.Size(), true)
out.Control.Iwant = append(out.Control.Iwant, iwant)
}
for _, ihave := range ctl.Ihave {
out := outRPC(ihave.Size(), true)
out.Control.Ihave = append(out.Control.Ihave, ihave)
}
return rpcs, nil
}