refactor: 10x faster RPC splitting (#615)

Builds on #582. 10x faster than current master. 0 allocs.

The basic logic is the same as the old version, except we return an
`iter.Seq[RPC]` and yield `RPC` values instead of building a slice of
`*RPC`. This avoids heap-allocating a pointer for each RPC.
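
For reviewers, a minimal sketch of the consuming side (`send` is a
hypothetical stand-in for whatever the caller does with each part, and
`limit` for the configured max message size):

    for rpc := range out.split(limit) {
        // rpc is yielded by value; &rpc escapes to the heap only if
        // the callee retains the pointer.
        send(&rpc)
    }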

Please review @algorandskiy, and let me know if this improves your use
case.
This commit is contained in:
Marco Munizaga 2025-05-28 22:32:25 -07:00 committed by GitHub
parent 38ad16a687
commit c405ca8028
3 changed files with 378 additions and 146 deletions

gossipsub.go

@@ -1375,14 +1375,13 @@ func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC, urgent bool) {
}
// Potentially split the RPC into multiple RPCs that are below the max message size
- outRPCs := appendOrMergeRPC(nil, gs.p.maxMessageSize, *out)
- for _, rpc := range outRPCs {
+ for rpc := range out.split(gs.p.maxMessageSize) {
if rpc.Size() > gs.p.maxMessageSize {
// This should only happen if a single message/control is above the maxMessageSize.
gs.doDropRPC(out, p, fmt.Sprintf("Dropping oversized RPC. Size: %d, limit: %d. (Over by %d bytes)", rpc.Size(), gs.p.maxMessageSize, rpc.Size()-gs.p.maxMessageSize))
continue
}
- gs.doSendRPC(rpc, p, q, urgent)
+ gs.doSendRPC(&rpc, p, q, urgent)
}
}
@@ -1412,137 +1411,6 @@ func (gs *GossipSubRouter) doSendRPC(rpc *RPC, p peer.ID, q *rpcQueue, urgent bool) {
gs.tracer.SendRPC(rpc, p)
}
// appendOrMergeRPC appends the given RPCs to the slice, merging them if possible.
// If any elem is too large to fit in a single RPC, it will be split into multiple RPCs.
// If an RPC is too large and can't be split further (e.g. Message data is
// bigger than the RPC limit), then it will be returned as an oversized RPC.
// The caller should filter out oversized RPCs.
func appendOrMergeRPC(slice []*RPC, limit int, elems ...RPC) []*RPC {
if len(elems) == 0 {
return slice
}
if len(slice) == 0 && len(elems) == 1 && elems[0].Size() < limit {
// Fast path: no merging needed and only one element
return append(slice, &elems[0])
}
out := slice
if len(out) == 0 {
out = append(out, &RPC{RPC: pb.RPC{}})
out[0].from = elems[0].from
}
for _, elem := range elems {
lastRPC := out[len(out)-1]
// Merge/Append publish messages
// TODO: Never merge messages. The current behavior is the same as the
// old behavior. In the future let's not merge messages, since merging
// may increase message latency.
for _, msg := range elem.GetPublish() {
if lastRPC.Publish = append(lastRPC.Publish, msg); lastRPC.Size() > limit {
lastRPC.Publish = lastRPC.Publish[:len(lastRPC.Publish)-1]
lastRPC = &RPC{RPC: pb.RPC{}, from: elem.from}
lastRPC.Publish = append(lastRPC.Publish, msg)
out = append(out, lastRPC)
}
}
// Merge/Append Subscriptions
for _, sub := range elem.GetSubscriptions() {
if lastRPC.Subscriptions = append(lastRPC.Subscriptions, sub); lastRPC.Size() > limit {
lastRPC.Subscriptions = lastRPC.Subscriptions[:len(lastRPC.Subscriptions)-1]
lastRPC = &RPC{RPC: pb.RPC{}, from: elem.from}
lastRPC.Subscriptions = append(lastRPC.Subscriptions, sub)
out = append(out, lastRPC)
}
}
// Merge/Append Control messages
if ctl := elem.GetControl(); ctl != nil {
if lastRPC.Control == nil {
lastRPC.Control = &pb.ControlMessage{}
if lastRPC.Size() > limit {
lastRPC.Control = nil
lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{}}, from: elem.from}
out = append(out, lastRPC)
}
}
for _, graft := range ctl.GetGraft() {
if lastRPC.Control.Graft = append(lastRPC.Control.Graft, graft); lastRPC.Size() > limit {
lastRPC.Control.Graft = lastRPC.Control.Graft[:len(lastRPC.Control.Graft)-1]
lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{}}, from: elem.from}
lastRPC.Control.Graft = append(lastRPC.Control.Graft, graft)
out = append(out, lastRPC)
}
}
for _, prune := range ctl.GetPrune() {
if lastRPC.Control.Prune = append(lastRPC.Control.Prune, prune); lastRPC.Size() > limit {
lastRPC.Control.Prune = lastRPC.Control.Prune[:len(lastRPC.Control.Prune)-1]
lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{}}, from: elem.from}
lastRPC.Control.Prune = append(lastRPC.Control.Prune, prune)
out = append(out, lastRPC)
}
}
for _, iwant := range ctl.GetIwant() {
if len(lastRPC.Control.Iwant) == 0 {
// Initialize with a single IWANT.
// For IWANTs we don't need more than a single one,
// since there are no topic IDs here.
newIWant := &pb.ControlIWant{}
if lastRPC.Control.Iwant = append(lastRPC.Control.Iwant, newIWant); lastRPC.Size() > limit {
lastRPC.Control.Iwant = lastRPC.Control.Iwant[:len(lastRPC.Control.Iwant)-1]
lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{
Iwant: []*pb.ControlIWant{newIWant},
}}, from: elem.from}
out = append(out, lastRPC)
}
}
for _, msgID := range iwant.GetMessageIDs() {
if lastRPC.Control.Iwant[0].MessageIDs = append(lastRPC.Control.Iwant[0].MessageIDs, msgID); lastRPC.Size() > limit {
lastRPC.Control.Iwant[0].MessageIDs = lastRPC.Control.Iwant[0].MessageIDs[:len(lastRPC.Control.Iwant[0].MessageIDs)-1]
lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{
Iwant: []*pb.ControlIWant{{MessageIDs: []string{msgID}}},
}}, from: elem.from}
out = append(out, lastRPC)
}
}
}
for _, ihave := range ctl.GetIhave() {
if len(lastRPC.Control.Ihave) == 0 ||
lastRPC.Control.Ihave[len(lastRPC.Control.Ihave)-1].TopicID != ihave.TopicID {
// Start a new IHAVE if we are referencing a new topic ID
newIhave := &pb.ControlIHave{TopicID: ihave.TopicID}
if lastRPC.Control.Ihave = append(lastRPC.Control.Ihave, newIhave); lastRPC.Size() > limit {
lastRPC.Control.Ihave = lastRPC.Control.Ihave[:len(lastRPC.Control.Ihave)-1]
lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{
Ihave: []*pb.ControlIHave{newIhave},
}}, from: elem.from}
out = append(out, lastRPC)
}
}
for _, msgID := range ihave.GetMessageIDs() {
lastIHave := lastRPC.Control.Ihave[len(lastRPC.Control.Ihave)-1]
if lastIHave.MessageIDs = append(lastIHave.MessageIDs, msgID); lastRPC.Size() > limit {
lastIHave.MessageIDs = lastIHave.MessageIDs[:len(lastIHave.MessageIDs)-1]
lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{
Ihave: []*pb.ControlIHave{{TopicID: ihave.TopicID, MessageIDs: []string{msgID}}},
}}, from: elem.from}
out = append(out, lastRPC)
}
}
}
}
}
return out
}
func (gs *GossipSubRouter) heartbeatTimer() {
time.Sleep(gs.params.HeartbeatInitialDelay)
select {

gossipsub_test.go

@@ -8,7 +8,10 @@ import (
"fmt"
"io"
mrand "math/rand"
mrand2 "math/rand/v2"
"slices"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
@@ -2341,7 +2344,7 @@ func (iwe *iwantEverything) handleStream(s network.Stream) {
}
}
- func validRPCSizes(slice []*RPC, limit int) bool {
+ func validRPCSizes(slice []RPC, limit int) bool {
for _, rpc := range slice {
if rpc.Size() > limit {
return false
@@ -2351,8 +2354,8 @@ func validRPCSizes(slice []*RPC, limit int) bool {
}
func TestFragmentRPCFunction(t *testing.T) {
- fragmentRPC := func(rpc *RPC, limit int) ([]*RPC, error) {
- rpcs := appendOrMergeRPC(nil, limit, *rpc)
+ fragmentRPC := func(rpc *RPC, limit int) ([]RPC, error) {
+ rpcs := slices.Collect(rpc.split(limit))
if allValid := validRPCSizes(rpcs, limit); !allValid {
return rpcs, fmt.Errorf("RPC size exceeds limit")
}
@@ -2371,7 +2374,7 @@ func TestFragmentRPCFunction(t *testing.T) {
return msg
}
- ensureBelowLimit := func(rpcs []*RPC) {
+ ensureBelowLimit := func(rpcs []RPC) {
for _, r := range rpcs {
if r.Size() > limit {
t.Fatalf("expected fragmented RPC to be below %d bytes, was %d", limit, r.Size())
@@ -2387,7 +2390,7 @@ func TestFragmentRPCFunction(t *testing.T) {
t.Fatal(err)
}
if len(results) != 1 {
t.Fatalf("expected single RPC if input is < limit, got %d", len(results))
t.Fatalf("expected single RPC if input is < limit, got %d %#v", len(results), results)
}
// if there's a message larger than the limit, we should fail
@@ -2418,8 +2421,8 @@ func TestFragmentRPCFunction(t *testing.T) {
ensureBelowLimit(results)
msgsPerRPC := limit / msgSize
expectedRPCs := nMessages / msgsPerRPC
- if len(results) != expectedRPCs {
- t.Fatalf("expected %d RPC messages in output, got %d", expectedRPCs, len(results))
+ if len(results) > expectedRPCs+1 {
+ t.Fatalf("expected around %d RPC messages in output, got %d", expectedRPCs, len(results))
}
var nMessagesFragmented int
var nSubscriptions int
@@ -2514,7 +2517,7 @@ func TestFragmentRPCFunction(t *testing.T) {
// Now we return the giant ID in an RPC by itself so that it can be
// dropped before actually sending the RPC. This lets us log the anomaly.
// To keep this test useful, we implement the old behavior here.
- filtered := make([]*RPC, 0, len(results))
+ filtered := make([]RPC, 0, len(results))
for _, r := range results {
if r.Size() < limit {
filtered = append(filtered, r)
@@ -2541,7 +2544,7 @@ func TestFragmentRPCFunction(t *testing.T) {
}
}
- func FuzzAppendOrMergeRPC(f *testing.F) {
+ func FuzzRPCSplit(f *testing.F) {
minMaxMsgSize := 100
maxMaxMsgSize := 2048
f.Fuzz(func(t *testing.T, data []byte) {
@@ -2550,14 +2553,102 @@ func FuzzAppendOrMergeRPC(f *testing.F) {
maxSize = minMaxMsgSize
}
rpc := generateRPC(data, maxSize)
- rpcs := appendOrMergeRPC(nil, maxSize, *rpc)
- if !validRPCSizes(rpcs, maxSize) {
- t.Fatalf("invalid RPC size")
originalControl := compressedRPC{ihave: make(map[string][]string)}
originalControl.append(&rpc.RPC)
mergedControl := compressedRPC{ihave: make(map[string][]string)}
for rpc := range rpc.split(maxSize) {
if rpc.Size() > maxSize {
t.Fatalf("invalid RPC size %v %d (max=%d)", rpc, rpc.Size(), maxSize)
}
mergedControl.append(&rpc.RPC)
}
if !originalControl.equal(&mergedControl) {
t.Fatalf("control mismatch: \n%#v\n%#v\n", originalControl, mergedControl)
}
})
}
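// compressedRPC flattens an RPC's publish messages and control entries
// into plain lists, so the contents of an original RPC can be compared
// against the union of its split parts.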
type compressedRPC struct {
msgs [][]byte
iwant []string
ihave map[string][]string // topic -> message IDs
idontwant []string
prune [][]byte
graft []string // list of topics
}
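// equal reports whether two compressedRPCs hold the same flattened
// messages and control entries, in the same order.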
func (c *compressedRPC) equal(o *compressedRPC) bool {
equalBytesSlices := func(a, b [][]byte) bool {
return slices.EqualFunc(a, b, func(e1 []byte, e2 []byte) bool {
return bytes.Equal(e1, e2)
})
}
if !equalBytesSlices(c.msgs, o.msgs) {
return false
}
if !slices.Equal(c.iwant, o.iwant) ||
!slices.Equal(c.idontwant, o.idontwant) ||
!equalBytesSlices(c.prune, o.prune) ||
!slices.Equal(c.graft, o.graft) {
return false
}
if len(c.ihave) != len(o.ihave) {
return false
}
for topic, ids := range c.ihave {
if !slices.Equal(ids, o.ihave[topic]) {
return false
}
}
return true
}
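// append flattens rpc into c. Empty message IDs and topics are dropped,
// since they carry no information and need not be preserved exactly by
// splitting.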
func (c *compressedRPC) append(rpc *pb.RPC) {
for _, m := range rpc.Publish {
d, err := m.Marshal()
if err != nil {
panic(err)
}
c.msgs = append(c.msgs, d)
}
ctrl := rpc.Control
if ctrl == nil {
return
}
for _, iwant := range ctrl.Iwant {
c.iwant = append(c.iwant, iwant.MessageIDs...)
c.iwant = slices.DeleteFunc(c.iwant, func(e string) bool { return len(e) == 0 })
}
for _, ihave := range ctrl.Ihave {
c.ihave[*ihave.TopicID] = append(c.ihave[*ihave.TopicID], ihave.MessageIDs...)
c.ihave[*ihave.TopicID] = slices.DeleteFunc(c.ihave[*ihave.TopicID], func(e string) bool { return len(e) == 0 })
}
for _, idontwant := range ctrl.Idontwant {
c.idontwant = append(c.idontwant, idontwant.MessageIDs...)
c.idontwant = slices.DeleteFunc(c.idontwant, func(e string) bool { return len(e) == 0 })
}
for _, prune := range ctrl.Prune {
d, err := prune.Marshal()
if err != nil {
panic(err)
}
c.prune = append(c.prune, d)
}
for _, graft := range ctrl.Graft {
c.graft = append(c.graft, *graft.TopicID)
c.graft = slices.DeleteFunc(c.graft, func(e string) bool { return len(e) == 0 })
}
}
func TestGossipsubManagesAnAddressBook(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -3675,3 +3766,78 @@ func TestPublishDuplicateMessage(t *testing.T) {
t.Fatal("Duplicate message should not return an error")
}
}
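// genNRpcs deterministically generates n pseudo-random RPCs for
// benchmarks, using a fixed-seed ChaCha8 source.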
func genNRpcs(tb testing.TB, n int, maxSize int) []*RPC {
r := mrand2.NewChaCha8([32]byte{})
rpcs := make([]*RPC, n)
for i := range rpcs {
var data [64]byte
_, err := r.Read(data[:])
if err != nil {
tb.Fatal(err)
}
rpcs[i] = generateRPC(data[:], maxSize)
}
return rpcs
}
func BenchmarkSplitRPC(b *testing.B) {
maxSize := 2048
rpcs := genNRpcs(b, 100, maxSize)
b.ResetTimer()
for i := 0; i < b.N; i++ {
rpc := rpcs[i%len(rpcs)]
// split is lazy; drain the iterator so the splitting work is measured.
for range rpc.split(maxSize) {
}
}
}
func BenchmarkSplitRPCLargeMessages(b *testing.B) {
addToRPC := func(rpc *RPC, numMsgs int, msgSize int) {
payload := make([]byte, msgSize)
for i := 0; i < numMsgs; i++ {
rpc.Publish = append(rpc.Publish, &pb.Message{
Data: payload,
From: []byte(strconv.Itoa(i)),
})
}
}
b.Run("Many large messages", func(b *testing.B) {
r := mrand.New(mrand.NewSource(99))
const numRPCs = 30
const msgSize = 50 * 1024
rpc := &RPC{}
for i := 0; i < numRPCs; i++ {
addToRPC(rpc, 20, msgSize+r.Intn(100))
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
for range rpc.split(DefaultMaxMessageSize) {
}
}
})
b.Run("2 large messages", func(b *testing.B) {
const numRPCs = 2
const msgSize = DefaultMaxMessageSize - 100
rpc := &RPC{}
for i := 0; i < numRPCs; i++ {
addToRPC(rpc, 1, msgSize)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
count := 0
for range rpc.split(DefaultMaxMessageSize) {
count++
}
if count != 2 {
b.Fatalf("expected 2 RPCs, got %d", count)
}
}
})
}

pubsub.go

@@ -5,6 +5,8 @@ import (
"encoding/binary"
"errors"
"fmt"
"iter"
"math/bits"
"math/rand"
"sync"
"sync/atomic"
@@ -255,6 +257,202 @@ type RPC struct {
from peer.ID
}
// split splits the given RPC into multiple RPCs, each at or below the
// given size limit. If a sub-RPC is too large and can't be split further
// (e.g. a single Message's data is bigger than the limit), it is yielded
// as an oversized RPC. The caller should filter out oversized RPCs.
func (rpc *RPC) split(limit int) iter.Seq[RPC] {
return func(yield func(RPC) bool) {
nextRPC := RPC{from: rpc.from}
{
nextRPCSize := 0
messagesInNextRPC := 0
messageSlice := rpc.Publish
// Merge/Append publish messages. This path is optimized compared to
// the paths for the other fields because publish messages are the
// most common reason an RPC needs splitting.
for _, msg := range rpc.Publish {
// We know the message field number is <15 so this is safe.
incrementalSize := pbFieldNumberLT15Size + sizeOfEmbeddedMsg(msg.Size())
if nextRPCSize+incrementalSize > limit {
// The message doesn't fit. Assign the messages that did fit to
// this RPC, yield it, then start a new one.
nextRPC.Publish = messageSlice[:messagesInNextRPC]
messageSlice = messageSlice[messagesInNextRPC:]
if !yield(nextRPC) {
return
}
nextRPC = RPC{from: rpc.from}
nextRPCSize = 0
messagesInNextRPC = 0
}
messagesInNextRPC++
nextRPCSize += incrementalSize
}
if nextRPCSize > 0 {
// Yield the remaining messages here for simplicity. We aren't
// optimally packing this RPC, but we avoid repeatedly calling
// .Size() on the messages while handling the remaining fields.
nextRPC.Publish = messageSlice[:messagesInNextRPC]
if !yield(nextRPC) {
return
}
nextRPC = RPC{from: rpc.from}
}
}
// Fast path check. It's possible the original RPC is now small
// enough once the publish messages are removed.
nextRPC = *rpc
nextRPC.Publish = nil
if s := nextRPC.Size(); s < limit {
if s != 0 {
yield(nextRPC)
}
return
}
// We have to split the RPC into multiple parts
nextRPC = RPC{from: rpc.from}
// Merge/Append Subscriptions
for _, sub := range rpc.Subscriptions {
if nextRPC.Subscriptions = append(nextRPC.Subscriptions, sub); nextRPC.Size() > limit {
nextRPC.Subscriptions = nextRPC.Subscriptions[:len(nextRPC.Subscriptions)-1]
if !yield(nextRPC) {
return
}
nextRPC = RPC{from: rpc.from}
nextRPC.Subscriptions = append(nextRPC.Subscriptions, sub)
}
}
// Merge/Append Control messages
if ctl := rpc.Control; ctl != nil {
if nextRPC.Control == nil {
nextRPC.Control = &pb.ControlMessage{}
if nextRPC.Size() > limit {
nextRPC.Control = nil
if !yield(nextRPC) {
return
}
nextRPC = RPC{RPC: pb.RPC{Control: &pb.ControlMessage{}}, from: rpc.from}
}
}
for _, graft := range ctl.GetGraft() {
if nextRPC.Control.Graft = append(nextRPC.Control.Graft, graft); nextRPC.Size() > limit {
nextRPC.Control.Graft = nextRPC.Control.Graft[:len(nextRPC.Control.Graft)-1]
if !yield(nextRPC) {
return
}
nextRPC = RPC{RPC: pb.RPC{Control: &pb.ControlMessage{}}, from: rpc.from}
nextRPC.Control.Graft = append(nextRPC.Control.Graft, graft)
}
}
for _, prune := range ctl.GetPrune() {
if nextRPC.Control.Prune = append(nextRPC.Control.Prune, prune); nextRPC.Size() > limit {
nextRPC.Control.Prune = nextRPC.Control.Prune[:len(nextRPC.Control.Prune)-1]
if !yield(nextRPC) {
return
}
nextRPC = RPC{RPC: pb.RPC{Control: &pb.ControlMessage{}}, from: rpc.from}
nextRPC.Control.Prune = append(nextRPC.Control.Prune, prune)
}
}
for _, iwant := range ctl.GetIwant() {
if len(nextRPC.Control.Iwant) == 0 {
// Initialize with a single IWANT.
// For IWANTs we don't need more than a single one,
// since there are no topic IDs here.
newIWant := &pb.ControlIWant{}
if nextRPC.Control.Iwant = append(nextRPC.Control.Iwant, newIWant); nextRPC.Size() > limit {
nextRPC.Control.Iwant = nextRPC.Control.Iwant[:len(nextRPC.Control.Iwant)-1]
if !yield(nextRPC) {
return
}
nextRPC = RPC{RPC: pb.RPC{Control: &pb.ControlMessage{
Iwant: []*pb.ControlIWant{newIWant},
}}, from: rpc.from}
}
}
for _, msgID := range iwant.GetMessageIDs() {
if nextRPC.Control.Iwant[0].MessageIDs = append(nextRPC.Control.Iwant[0].MessageIDs, msgID); nextRPC.Size() > limit {
nextRPC.Control.Iwant[0].MessageIDs = nextRPC.Control.Iwant[0].MessageIDs[:len(nextRPC.Control.Iwant[0].MessageIDs)-1]
if !yield(nextRPC) {
return
}
nextRPC = RPC{RPC: pb.RPC{Control: &pb.ControlMessage{
Iwant: []*pb.ControlIWant{{MessageIDs: []string{msgID}}},
}}, from: rpc.from}
}
}
}
for _, ihave := range ctl.GetIhave() {
if len(nextRPC.Control.Ihave) == 0 ||
nextRPC.Control.Ihave[len(nextRPC.Control.Ihave)-1].TopicID != ihave.TopicID {
// Start a new IHAVE if we are referencing a new topic ID
newIhave := &pb.ControlIHave{TopicID: ihave.TopicID}
if nextRPC.Control.Ihave = append(nextRPC.Control.Ihave, newIhave); nextRPC.Size() > limit {
nextRPC.Control.Ihave = nextRPC.Control.Ihave[:len(nextRPC.Control.Ihave)-1]
if !yield(nextRPC) {
return
}
nextRPC = RPC{RPC: pb.RPC{Control: &pb.ControlMessage{
Ihave: []*pb.ControlIHave{newIhave},
}}, from: rpc.from}
}
}
for _, msgID := range ihave.GetMessageIDs() {
lastIHave := nextRPC.Control.Ihave[len(nextRPC.Control.Ihave)-1]
if lastIHave.MessageIDs = append(lastIHave.MessageIDs, msgID); nextRPC.Size() > limit {
lastIHave.MessageIDs = lastIHave.MessageIDs[:len(lastIHave.MessageIDs)-1]
if !yield(nextRPC) {
return
}
nextRPC = RPC{RPC: pb.RPC{Control: &pb.ControlMessage{
Ihave: []*pb.ControlIHave{{TopicID: ihave.TopicID, MessageIDs: []string{msgID}}},
}}, from: rpc.from}
}
}
}
}
if nextRPC.Size() > 0 {
if !yield(nextRPC) {
return
}
}
}
}
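// Note: callers that want a slice instead of a streaming sequence can
// materialize the results, as the updated tests do:
//
//	rpcs := slices.Collect(rpc.split(limit))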
// pbFieldNumberLT15Size is the number of bytes required to encode a protobuf
// field number less than or equal to 15 along with its wire type. This is 1
// byte because the protobuf encoding of field numbers is a varint encoding of:
// fieldNumber << 3 | wireType
// Refer to https://protobuf.dev/programming-guides/encoding/#structure
// for more details on the encoding of messages. You may also reference the
// concrete implementation of pb.RPC.Size()
const pbFieldNumberLT15Size = 1
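// sovRpc returns the number of bytes needed to varint-encode x (one
// byte per 7 bits), mirroring the sizing helpers in the generated
// protobuf code.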
func sovRpc(x uint64) (n int) {
return (bits.Len64(x|1) + 6) / 7
}
func sizeOfEmbeddedMsg(msgSize int) int {
prefixSize := sovRpc(uint64(msgSize))
return prefixSize + msgSize
}
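// A quick sanity check of the arithmetic above (illustrative values,
// not part of this change): a 300-byte message embedded under a field
// number <= 15 costs one tag byte, a two-byte length prefix, and the
// payload itself:
//
//	size := pbFieldNumberLT15Size + sizeOfEmbeddedMsg(300)
//	// sovRpc(300) == 2, so size == 1 + 2 + 300 == 303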
type Option func(*PubSub) error
// NewPubSub returns a new PubSub management object.