Replace fragmentRPC with appendOrMergeRPC (#557)

This lets us add more logic around when we split or merge messages. It
also lets us build the outgoing RPCs incrementally as we go, rather than
building one giant RPC and then splitting it.
Marco Munizaga, 2024-05-02 09:40:54 -07:00
parent 048a4d30d0, commit c0a528ee7b
3 changed files with 322 additions and 107 deletions
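For illustration, here is a minimal sketch of the "build as we go" pattern the new helper enables. This is a hypothetical caller, not part of the commit; it assumes it compiles inside the pubsub package, where RPC and appendOrMergeRPC are defined.

package pubsub

import pb "github.com/libp2p/go-libp2p-pubsub/pb"

// buildOutgoing is a hypothetical example: each call to appendOrMergeRPC
// merges the new element into the last RPC in out when it fits, or starts
// a new RPC once the size limit would be exceeded, so no giant
// intermediate RPC is ever constructed.
func buildOutgoing(msgs []*pb.Message, limit int) []*RPC {
	var out []*RPC
	for _, msg := range msgs {
		out = appendOrMergeRPC(out, limit, RPC{RPC: pb.RPC{Publish: []*pb.Message{msg}}})
	}
	return out
}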

fuzz_helpers_test.go (new file, 149 additions)

@@ -0,0 +1,149 @@
package pubsub

import (
	"encoding/binary"

	pb "github.com/libp2p/go-libp2p-pubsub/pb"
)

func generateU16(data *[]byte) uint16 {
	if len(*data) < 2 {
		return 0
	}
	out := binary.LittleEndian.Uint16((*data)[:2])
	*data = (*data)[2:]
	return out
}

func generateBool(data *[]byte) bool {
	if len(*data) < 1 {
		return false
	}
	out := (*data)[0]&1 == 1
	*data = (*data)[1:]
	return out
}

func generateMessage(data []byte, limit int) *pb.Message {
	msgSize := int(generateU16(&data)) % limit
	return &pb.Message{Data: make([]byte, msgSize)}
}

func generateSub(data []byte, limit int) *pb.RPC_SubOpts {
	topicIDSize := int(generateU16(&data)) % limit
	subscribe := generateBool(&data)

	str := string(make([]byte, topicIDSize))
	return &pb.RPC_SubOpts{Subscribe: &subscribe, Topicid: &str}
}

func generateControl(data []byte, limit int) *pb.ControlMessage {
	numIWANTMsgs := int(generateU16(&data)) % (limit / 2)
	numIHAVEMsgs := int(generateU16(&data)) % (limit / 2)

	ctl := &pb.ControlMessage{}

	ctl.Iwant = make([]*pb.ControlIWant, 0, numIWANTMsgs)
	for i := 0; i < numIWANTMsgs; i++ {
		msgSize := int(generateU16(&data)) % limit
		msgCount := int(generateU16(&data)) % limit
		ctl.Iwant = append(ctl.Iwant, &pb.ControlIWant{})
		ctl.Iwant[i].MessageIDs = make([]string, 0, msgCount)
		for j := 0; j < msgCount; j++ {
			ctl.Iwant[i].MessageIDs = append(ctl.Iwant[i].MessageIDs, string(make([]byte, msgSize)))
		}
	}
	if ctl.Size() > limit {
		return &pb.ControlMessage{}
	}

	ctl.Ihave = make([]*pb.ControlIHave, 0, numIHAVEMsgs)
	for i := 0; i < numIHAVEMsgs; i++ {
		msgSize := int(generateU16(&data)) % limit
		msgCount := int(generateU16(&data)) % limit
		topicSize := int(generateU16(&data)) % limit
		topic := string(make([]byte, topicSize))
		ctl.Ihave = append(ctl.Ihave, &pb.ControlIHave{TopicID: &topic})
		ctl.Ihave[i].MessageIDs = make([]string, 0, msgCount)
		for j := 0; j < msgCount; j++ {
			ctl.Ihave[i].MessageIDs = append(ctl.Ihave[i].MessageIDs, string(make([]byte, msgSize)))
		}
	}
	if ctl.Size() > limit {
		return &pb.ControlMessage{}
	}

	numGraft := int(generateU16(&data)) % limit
	ctl.Graft = make([]*pb.ControlGraft, 0, numGraft)
	for i := 0; i < numGraft; i++ {
		topicSize := int(generateU16(&data)) % limit
		topic := string(make([]byte, topicSize))
		ctl.Graft = append(ctl.Graft, &pb.ControlGraft{TopicID: &topic})
	}
	if ctl.Size() > limit {
		return &pb.ControlMessage{}
	}

	numPrune := int(generateU16(&data)) % limit
	ctl.Prune = make([]*pb.ControlPrune, 0, numPrune)
	for i := 0; i < numPrune; i++ {
		topicSize := int(generateU16(&data)) % limit
		topic := string(make([]byte, topicSize))
		ctl.Prune = append(ctl.Prune, &pb.ControlPrune{TopicID: &topic})
	}
	if ctl.Size() > limit {
		return &pb.ControlMessage{}
	}

	return ctl
}

func generateRPC(data []byte, limit int) *RPC {
	rpc := &RPC{RPC: pb.RPC{}}
	sizeTester := RPC{RPC: pb.RPC{}}

	msgCount := int(generateU16(&data)) % (limit / 2)
	rpc.Publish = make([]*pb.Message, 0, msgCount)
	for i := 0; i < msgCount; i++ {
		msg := generateMessage(data, limit)
		sizeTester.Publish = []*pb.Message{msg}
		size := sizeTester.Size()
		sizeTester.Publish = nil
		if size > limit {
			continue
		}
		rpc.Publish = append(rpc.Publish, msg)
	}

	subCount := int(generateU16(&data)) % (limit / 2)
	rpc.Subscriptions = make([]*pb.RPC_SubOpts, 0, subCount)
	for i := 0; i < subCount; i++ {
		sub := generateSub(data, limit)
		sizeTester.Subscriptions = []*pb.RPC_SubOpts{sub}
		size := sizeTester.Size()
		sizeTester.Subscriptions = nil
		if size > limit {
			continue
		}
		rpc.Subscriptions = append(rpc.Subscriptions, sub)
	}

	ctl := generateControl(data, limit)
	sizeTester.Control = ctl
	size := sizeTester.Size()
	sizeTester.Control = nil
	if size <= limit {
		rpc.Control = ctl
	}

	return rpc
}
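The generators above consume the fuzz input from the front, so a single byte slice deterministically drives every size and count. A hypothetical illustration of that contract (not part of the commit; same package assumed):

package pubsub

import "testing"

func TestGenerateHelpers(t *testing.T) {
	data := []byte{0x2A, 0x00, 0x01}
	if n := generateU16(&data); n != 42 { // reads two bytes, little-endian
		t.Fatalf("got %d, want 42", n)
	}
	if !generateBool(&data) { // 0x01: low bit set, so true
		t.Fatal("want true")
	}
	if z := generateU16(&data); z != 0 { // fewer than two bytes left, so zero
		t.Fatalf("got %d, want 0", z)
	}
}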

gossipsub.go

@@ -1170,14 +1170,14 @@ func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC) {
 		return
 	}
 
-	// If we're too big, fragment into multiple RPCs and send each sequentially
-	outRPCs, err := fragmentRPC(out, gs.p.maxMessageSize)
-	if err != nil {
-		gs.doDropRPC(out, p, fmt.Sprintf("unable to fragment RPC: %s", err))
-		return
-	}
+	// Potentially split the RPC into multiple RPCs that are below the max message size
+	outRPCs := appendOrMergeRPC(nil, gs.p.maxMessageSize, *out)
 	for _, rpc := range outRPCs {
+		if rpc.Size() > gs.p.maxMessageSize {
+			// This should only happen if a single message/control is above the maxMessageSize.
+			gs.doDropRPC(out, p, fmt.Sprintf("Dropping oversized RPC. Size: %d, limit: %d. (Over by %d bytes)", rpc.Size(), gs.p.maxMessageSize, rpc.Size()-gs.p.maxMessageSize))
+			continue
+		}
 		gs.doSendRPC(rpc, p, mch)
 	}
 }
@@ -1201,119 +1201,134 @@ func (gs *GossipSubRouter) doSendRPC(rpc *RPC, p peer.ID, mch chan *RPC) {
 	}
 }
 
-func fragmentRPC(rpc *RPC, limit int) ([]*RPC, error) {
-	if rpc.Size() < limit {
-		return []*RPC{rpc}, nil
-	}
-
-	c := (rpc.Size() / limit) + 1
-	rpcs := make([]*RPC, 1, c)
-	rpcs[0] = &RPC{RPC: pb.RPC{}, from: rpc.from}
-
-	// outRPC returns the current RPC message if it will fit sizeToAdd more bytes
-	// otherwise, it will create a new RPC message and add it to the list.
-	// if withCtl is true, the returned message will have a non-nil empty Control message.
-	outRPC := func(sizeToAdd int, withCtl bool) *RPC {
-		current := rpcs[len(rpcs)-1]
-		// check if we can fit the new data, plus an extra byte for the protobuf field tag
-		if current.Size()+sizeToAdd+1 < limit {
-			if withCtl && current.Control == nil {
-				current.Control = &pb.ControlMessage{}
-			}
-			return current
-		}
-		var ctl *pb.ControlMessage
-		if withCtl {
-			ctl = &pb.ControlMessage{}
-		}
-		next := &RPC{RPC: pb.RPC{Control: ctl}, from: rpc.from}
-		rpcs = append(rpcs, next)
-		return next
-	}
-
-	for _, msg := range rpc.GetPublish() {
-		s := msg.Size()
-		// if an individual message is too large, we can't fragment it and have to fail entirely
-		if s > limit {
-			return nil, fmt.Errorf("message with len=%d exceeds limit %d", s, limit)
-		}
-		out := outRPC(s, false)
-		out.Publish = append(out.Publish, msg)
-	}
-
-	for _, sub := range rpc.GetSubscriptions() {
-		out := outRPC(sub.Size(), false)
-		out.Subscriptions = append(out.Subscriptions, sub)
-	}
-
-	ctl := rpc.GetControl()
-	if ctl == nil {
-		// if there were no control messages, we're done
-		return rpcs, nil
-	}
-
-	// if all the control messages fit into one RPC, we just add it to the end and return
-	ctlOut := &RPC{RPC: pb.RPC{Control: ctl}, from: rpc.from}
-	if ctlOut.Size() < limit {
-		rpcs = append(rpcs, ctlOut)
-		return rpcs, nil
-	}
-
-	// we need to split up the control messages into multiple RPCs
-	for _, graft := range ctl.Graft {
-		out := outRPC(graft.Size(), true)
-		out.Control.Graft = append(out.Control.Graft, graft)
-	}
-	for _, prune := range ctl.Prune {
-		out := outRPC(prune.Size(), true)
-		out.Control.Prune = append(out.Control.Prune, prune)
-	}
-
-	// An individual IWANT or IHAVE message could be larger than the limit if we have
-	// a lot of message IDs. fragmentMessageIds will split them into buckets that
-	// fit within the limit, with some overhead for the control messages themselves
-	for _, iwant := range ctl.Iwant {
-		const protobufOverhead = 6
-		idBuckets := fragmentMessageIds(iwant.MessageIDs, limit-protobufOverhead)
-		for _, ids := range idBuckets {
-			iwant := &pb.ControlIWant{MessageIDs: ids}
-			out := outRPC(iwant.Size(), true)
-			out.Control.Iwant = append(out.Control.Iwant, iwant)
-		}
-	}
-	for _, ihave := range ctl.Ihave {
-		const protobufOverhead = 6
-		idBuckets := fragmentMessageIds(ihave.MessageIDs, limit-protobufOverhead)
-		for _, ids := range idBuckets {
-			ihave := &pb.ControlIHave{MessageIDs: ids}
-			out := outRPC(ihave.Size(), true)
-			out.Control.Ihave = append(out.Control.Ihave, ihave)
-		}
-	}
-	return rpcs, nil
-}
-
-func fragmentMessageIds(msgIds []string, limit int) [][]string {
-	// account for two bytes of protobuf overhead per array element
-	const protobufOverhead = 2
-
-	out := [][]string{{}}
-	var currentBucket int
-	var bucketLen int
-	for i := 0; i < len(msgIds); i++ {
-		size := len(msgIds[i]) + protobufOverhead
-		if size > limit {
-			// pathological case where a single message ID exceeds the limit.
-			log.Warnf("message ID length %d exceeds limit %d, removing from outgoing gossip", size, limit)
-			continue
-		}
-		bucketLen += size
-		if bucketLen > limit {
-			out = append(out, []string{})
-			currentBucket++
-			bucketLen = size
-		}
-		out[currentBucket] = append(out[currentBucket], msgIds[i])
-	}
-	return out
-}
+// appendOrMergeRPC appends the given RPCs to the slice, merging them if possible.
+// If any elem is too large to fit in a single RPC, it will be split into multiple RPCs.
+// If an RPC is too large and can't be split further (e.g. Message data is
+// bigger than the RPC limit), then it will be returned as an oversized RPC.
+// The caller should filter out oversized RPCs.
+func appendOrMergeRPC(slice []*RPC, limit int, elems ...RPC) []*RPC {
+	if len(elems) == 0 {
+		return slice
+	}
+
+	if len(slice) == 0 && len(elems) == 1 && elems[0].Size() < limit {
+		// Fast path: no merging needed and only one element
+		return append(slice, &elems[0])
+	}
+
+	out := slice
+	if len(out) == 0 {
+		out = append(out, &RPC{RPC: pb.RPC{}})
+		out[0].from = elems[0].from
+	}
+
+	for _, elem := range elems {
+		lastRPC := out[len(out)-1]
+
+		// Merge/Append publish messages
+		// TODO: Never merge messages. The current behavior is the same as the
+		// old behavior. In the future let's not merge messages. Since,
+		// it may increase message latency.
+		for _, msg := range elem.GetPublish() {
+			if lastRPC.Publish = append(lastRPC.Publish, msg); lastRPC.Size() > limit {
+				lastRPC.Publish = lastRPC.Publish[:len(lastRPC.Publish)-1]
+				lastRPC = &RPC{RPC: pb.RPC{}, from: elem.from}
+				lastRPC.Publish = append(lastRPC.Publish, msg)
+				out = append(out, lastRPC)
+			}
+		}
+
+		// Merge/Append Subscriptions
+		for _, sub := range elem.GetSubscriptions() {
+			if lastRPC.Subscriptions = append(lastRPC.Subscriptions, sub); lastRPC.Size() > limit {
+				lastRPC.Subscriptions = lastRPC.Subscriptions[:len(lastRPC.Subscriptions)-1]
+				lastRPC = &RPC{RPC: pb.RPC{}, from: elem.from}
+				lastRPC.Subscriptions = append(lastRPC.Subscriptions, sub)
+				out = append(out, lastRPC)
+			}
+		}
+
+		// Merge/Append Control messages
+		if ctl := elem.GetControl(); ctl != nil {
+			if lastRPC.Control == nil {
+				lastRPC.Control = &pb.ControlMessage{}
+				if lastRPC.Size() > limit {
+					lastRPC.Control = nil
+					lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{}}, from: elem.from}
+					out = append(out, lastRPC)
+				}
+			}
+
+			for _, graft := range ctl.GetGraft() {
+				if lastRPC.Control.Graft = append(lastRPC.Control.Graft, graft); lastRPC.Size() > limit {
+					lastRPC.Control.Graft = lastRPC.Control.Graft[:len(lastRPC.Control.Graft)-1]
+					lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{}}, from: elem.from}
+					lastRPC.Control.Graft = append(lastRPC.Control.Graft, graft)
+					out = append(out, lastRPC)
+				}
+			}
+
+			for _, prune := range ctl.GetPrune() {
+				if lastRPC.Control.Prune = append(lastRPC.Control.Prune, prune); lastRPC.Size() > limit {
+					lastRPC.Control.Prune = lastRPC.Control.Prune[:len(lastRPC.Control.Prune)-1]
+					lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{}}, from: elem.from}
+					lastRPC.Control.Prune = append(lastRPC.Control.Prune, prune)
+					out = append(out, lastRPC)
+				}
+			}
+
+			for _, iwant := range ctl.GetIwant() {
+				if len(lastRPC.Control.Iwant) == 0 {
+					// Initialize with a single IWANT.
+					// For IWANTs we don't need more than a single one,
+					// since there are no topic IDs here.
+					newIWant := &pb.ControlIWant{}
+					if lastRPC.Control.Iwant = append(lastRPC.Control.Iwant, newIWant); lastRPC.Size() > limit {
+						lastRPC.Control.Iwant = lastRPC.Control.Iwant[:len(lastRPC.Control.Iwant)-1]
+						lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{
+							Iwant: []*pb.ControlIWant{newIWant},
+						}}, from: elem.from}
+						out = append(out, lastRPC)
+					}
+				}
+				for _, msgID := range iwant.GetMessageIDs() {
+					if lastRPC.Control.Iwant[0].MessageIDs = append(lastRPC.Control.Iwant[0].MessageIDs, msgID); lastRPC.Size() > limit {
+						lastRPC.Control.Iwant[0].MessageIDs = lastRPC.Control.Iwant[0].MessageIDs[:len(lastRPC.Control.Iwant[0].MessageIDs)-1]
+						lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{
+							Iwant: []*pb.ControlIWant{{MessageIDs: []string{msgID}}},
+						}}, from: elem.from}
+						out = append(out, lastRPC)
+					}
+				}
+			}
+
+			for _, ihave := range ctl.GetIhave() {
+				if len(lastRPC.Control.Ihave) == 0 ||
+					lastRPC.Control.Ihave[len(lastRPC.Control.Ihave)-1].TopicID != ihave.TopicID {
+					// Start a new IHAVE if we are referencing a new topic ID
+					newIhave := &pb.ControlIHave{TopicID: ihave.TopicID}
+					if lastRPC.Control.Ihave = append(lastRPC.Control.Ihave, newIhave); lastRPC.Size() > limit {
+						lastRPC.Control.Ihave = lastRPC.Control.Ihave[:len(lastRPC.Control.Ihave)-1]
+						lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{
+							Ihave: []*pb.ControlIHave{newIhave},
+						}}, from: elem.from}
+						out = append(out, lastRPC)
+					}
+				}
+				for _, msgID := range ihave.GetMessageIDs() {
+					lastIHave := lastRPC.Control.Ihave[len(lastRPC.Control.Ihave)-1]
+					if lastIHave.MessageIDs = append(lastIHave.MessageIDs, msgID); lastRPC.Size() > limit {
+						lastIHave.MessageIDs = lastIHave.MessageIDs[:len(lastIHave.MessageIDs)-1]
+						lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{
+							Ihave: []*pb.ControlIHave{{TopicID: ihave.TopicID, MessageIDs: []string{msgID}}},
+						}}, from: elem.from}
+						out = append(out, lastRPC)
+					}
+				}
+			}
+		}
+	}
+
+	return out
+}
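Because appendOrMergeRPC surfaces oversized RPCs rather than returning an error, every caller is expected to filter them out. A hedged sketch of that pattern follows; mergeAndFilter is a hypothetical helper, mirroring what sendRPC now does inline via doDropRPC.

func mergeAndFilter(limit int, elems ...RPC) []*RPC {
	merged := appendOrMergeRPC(nil, limit, elems...)
	kept := merged[:0]
	for _, rpc := range merged {
		if rpc.Size() > limit {
			// A single message or control entry exceeded the limit on
			// its own and cannot be split further; drop it here.
			continue
		}
		kept = append(kept, rpc)
	}
	return kept
}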

gossipsub_test.go

@@ -2335,7 +2335,24 @@ func (iwe *iwantEverything) handleStream(s network.Stream) {
 	}
 }
 
+func validRPCSizes(slice []*RPC, limit int) bool {
+	for _, rpc := range slice {
+		if rpc.Size() > limit {
+			return false
+		}
+	}
+	return true
+}
+
 func TestFragmentRPCFunction(t *testing.T) {
+	fragmentRPC := func(rpc *RPC, limit int) ([]*RPC, error) {
+		rpcs := appendOrMergeRPC(nil, limit, *rpc)
+		if allValid := validRPCSizes(rpcs, limit); !allValid {
+			return rpcs, fmt.Errorf("RPC size exceeds limit")
+		}
+		return rpcs, nil
+	}
+
 	p := peer.ID("some-peer")
 	topic := "test"
 	rpc := &RPC{from: p}
@@ -2485,7 +2502,24 @@ func TestFragmentRPCFunction(t *testing.T) {
 			{MessageIDs: []string{"hello", string(giantIdBytes)}},
 		},
 	}
-	results, err = fragmentRPC(rpc, limit)
+	results, _ = fragmentRPC(rpc, limit)
+	// The old behavior would silently drop the giant ID. Now we return the
+	// giant ID in an RPC by itself so that it can be dropped before actually
+	// sending the RPC. This lets us log the anomaly. To keep this test
+	// useful, we implement the old behavior here.
+	filtered := make([]*RPC, 0, len(results))
+	for _, r := range results {
+		if r.Size() < limit {
+			filtered = append(filtered, r)
+		}
+	}
+	results = filtered
+	err = nil
+	if !validRPCSizes(results, limit) {
+		err = fmt.Errorf("RPC size exceeds limit")
+	}
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -2500,3 +2534,20 @@ func TestFragmentRPCFunction(t *testing.T) {
 			results[0].Control.Iwant[0].MessageIDs[0])
 	}
 }
+
+func FuzzAppendOrMergeRPC(f *testing.F) {
+	minMaxMsgSize := 100
+	maxMaxMsgSize := 2048
+	f.Fuzz(func(t *testing.T, data []byte) {
+		maxSize := int(generateU16(&data)) % maxMaxMsgSize
+		if maxSize < minMaxMsgSize {
+			maxSize = minMaxMsgSize
+		}
+		rpc := generateRPC(data, maxSize)
+		rpcs := appendOrMergeRPC(nil, maxSize, *rpc)
+		if !validRPCSizes(rpcs, maxSize) {
+			t.Fatalf("invalid RPC size")
+		}
+	})
+}
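This target uses Go's native fuzzing support (Go 1.18+), so it can be run locally with the standard toolchain, e.g. go test -fuzz=FuzzAppendOrMergeRPC; that invocation is standard go tooling, not something configured by this commit.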