fix(BatchPublishing): Make topic.AddToBatch threadsafe (#622)

topic.Publish is already thread-safe, and topic.AddToBatch should strive to
follow similar semantics.

Looking at how this would integrate with Prysm: Prysm uses a separate
goroutine per message it wants to batch.
Marco Munizaga, 2025-06-25 12:38:21 -07:00 (committed by GitHub)
parent 3f89e4331c
commit fedbccc0c6
4 changed files with 86 additions and 53 deletions
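
For context, here is a minimal sketch of the goroutine-per-message pattern described above, written against the batch publishing API this change touches (MessageBatch, Topic.AddToBatch, PubSub.PublishBatch). It is an illustration only: the topic name and payloads are placeholders, not taken from Prysm, and the batch API may only exist on branches that include the batch publishing work.

// Sketch only: one goroutine per message, all appending to a shared batch.
// Assumes the batch publishing API from this branch (MessageBatch,
// Topic.AddToBatch, PubSub.PublishBatch); topic name and payloads are
// placeholders and error handling is abbreviated.
package main

import (
	"context"
	"fmt"
	"sync"

	"github.com/libp2p/go-libp2p"
	pubsub "github.com/libp2p/go-libp2p-pubsub"
)

func main() {
	ctx := context.Background()

	h, err := libp2p.New()
	if err != nil {
		panic(err)
	}
	defer h.Close()

	ps, err := pubsub.NewGossipSub(ctx, h)
	if err != nil {
		panic(err)
	}

	topic, err := ps.Join("batch-example") // placeholder topic name
	if err != nil {
		panic(err)
	}

	// Each goroutine adds its own message to the same shared batch; this is
	// the concurrent access pattern AddToBatch now has to tolerate.
	var batch pubsub.MessageBatch
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			if err := topic.AddToBatch(ctx, &batch, []byte(fmt.Sprintf("message %d", i))); err != nil {
				fmt.Println("add to batch:", err)
			}
		}(i)
	}
	wg.Wait()

	// Hand the accumulated messages to the router in a single call.
	if err := ps.PublishBatch(&batch); err != nil {
		fmt.Println("publish batch:", err)
	}
}

PublishBatch is only called after wg.Wait(), so the batch is drained once every goroutine has finished adding, mirroring the WithConcurrentAdd=true path in the test below.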


@@ -3682,66 +3682,85 @@ func BenchmarkRoundRobinMessageIDScheduler(b *testing.B) {
 }

 func TestMessageBatchPublish(t *testing.T) {
-	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
-	defer cancel()
-	hosts := getDefaultHosts(t, 20)
-
-	msgIDFn := func(msg *pb.Message) string {
-		hdr := string(msg.Data[0:16])
-		msgID := strings.SplitN(hdr, " ", 2)
-		return msgID[0]
-	}
-	const numMessages = 100
-	// +8 to account for the gossiping overhead
-	psubs := getGossipsubs(ctx, hosts, WithMessageIdFn(msgIDFn), WithPeerOutboundQueueSize(numMessages+8), WithValidateQueueSize(numMessages+8))
-
-	var topics []*Topic
-	var msgs []*Subscription
-	for _, ps := range psubs {
-		topic, err := ps.Join("foobar")
-		if err != nil {
-			t.Fatal(err)
-		}
-		topics = append(topics, topic)
-
-		subch, err := topic.Subscribe(WithBufferSize(numMessages + 8))
-		if err != nil {
-			t.Fatal(err)
-		}
-
-		msgs = append(msgs, subch)
-	}
-
-	sparseConnect(t, hosts)
-
-	// wait for heartbeats to build mesh
-	time.Sleep(time.Second * 2)
-
-	var batch MessageBatch
-	for i := 0; i < numMessages; i++ {
-		msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i))
-		err := topics[0].AddToBatch(ctx, &batch, msg)
-		if err != nil {
-			t.Fatal(err)
-		}
-	}
-	err := psubs[0].PublishBatch(&batch)
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	for range numMessages {
-		for _, sub := range msgs {
-			got, err := sub.Next(ctx)
-			if err != nil {
-				t.Fatal(err)
-			}
-			id := msgIDFn(got.Message)
-			expected := []byte(fmt.Sprintf("%s it's not a floooooood %s", id, id))
-			if !bytes.Equal(expected, got.Data) {
-				t.Fatal("got wrong message!")
-			}
-		}
-	}
+	concurrentAdds := []bool{false, true}
+	for _, concurrentAdd := range concurrentAdds {
+		t.Run(fmt.Sprintf("WithConcurrentAdd=%v", concurrentAdd), func(t *testing.T) {
+			ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+			defer cancel()
+			hosts := getDefaultHosts(t, 20)
+
+			msgIDFn := func(msg *pb.Message) string {
+				hdr := string(msg.Data[0:16])
+				msgID := strings.SplitN(hdr, " ", 2)
+				return msgID[0]
+			}
+			const numMessages = 100
+			// +8 to account for the gossiping overhead
+			psubs := getGossipsubs(ctx, hosts, WithMessageIdFn(msgIDFn), WithPeerOutboundQueueSize(numMessages+8), WithValidateQueueSize(numMessages+8))
+
+			var topics []*Topic
+			var msgs []*Subscription
+			for _, ps := range psubs {
+				topic, err := ps.Join("foobar")
+				if err != nil {
+					t.Fatal(err)
+				}
+				topics = append(topics, topic)
+
+				subch, err := topic.Subscribe(WithBufferSize(numMessages + 8))
+				if err != nil {
+					t.Fatal(err)
+				}
+
+				msgs = append(msgs, subch)
+			}
+
+			sparseConnect(t, hosts)
+
+			// wait for heartbeats to build mesh
+			time.Sleep(time.Second * 2)
+
+			var batch MessageBatch
+			var wg sync.WaitGroup
+			for i := 0; i < numMessages; i++ {
+				msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i))
+				if concurrentAdd {
+					wg.Add(1)
+					go func() {
+						defer wg.Done()
+						err := topics[0].AddToBatch(ctx, &batch, msg)
+						if err != nil {
+							t.Log(err)
+							t.Fail()
+						}
+					}()
+				} else {
+					err := topics[0].AddToBatch(ctx, &batch, msg)
+					if err != nil {
+						t.Fatal(err)
+					}
+				}
+			}
+			wg.Wait()
+			err := psubs[0].PublishBatch(&batch)
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			for range numMessages {
+				for _, sub := range msgs {
+					got, err := sub.Next(ctx)
+					if err != nil {
+						t.Fatal(err)
+					}
+					id := msgIDFn(got.Message)
+					expected := []byte(fmt.Sprintf("%s it's not a floooooood %s", id, id))
+					if !bytes.Equal(expected, got.Data) {
+						t.Fatal("got wrong message!")
+					}
+				}
+			}
+		})
+	}
 }


@@ -2,6 +2,7 @@ package pubsub

 import (
 	"iter"
+	"sync"

 	"github.com/libp2p/go-libp2p/core/peer"
 )
@@ -10,9 +11,24 @@ import (
 // once. This allows the Scheduler to define an order for outgoing RPCs.
 // This helps bandwidth constrained peers.
 type MessageBatch struct {
+	mu       sync.Mutex
 	messages []*Message
 }

+func (mb *MessageBatch) add(msg *Message) {
+	mb.mu.Lock()
+	defer mb.mu.Unlock()
+	mb.messages = append(mb.messages, msg)
+}
+
+func (mb *MessageBatch) take() []*Message {
+	mb.mu.Lock()
+	defer mb.mu.Unlock()
+	messages := mb.messages
+	mb.messages = nil
+	return messages
+}
+
 type messageBatchAndPublishOptions struct {
 	messages []*Message
 	opts     *BatchPublishOptions
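
The add/take pair keeps every access to the shared messages slice behind the mutex: add appends under the lock, while take swaps the slice out and resets it to nil, so PublishBatch works on a private snapshot and the same MessageBatch value can be reused for a later batch without racing against concurrent AddToBatch calls.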


@@ -1600,12 +1600,10 @@ func (p *PubSub) PublishBatch(batch *MessageBatch, opts ...BatchPubOpt) error {
 	setDefaultBatchPublishOptions(publishOptions)

 	p.sendMessageBatch <- messageBatchAndPublishOptions{
-		messages: batch.messages,
+		messages: batch.take(),
 		opts:     publishOptions,
 	}

-	// Clear the batch's messages in case a user reuses the same batch object
-	batch.messages = nil
 	return nil
 }


@@ -257,7 +257,7 @@ func (t *Topic) AddToBatch(ctx context.Context, batch *MessageBatch, data []byte
 		}
 		return err
 	}
-	batch.messages = append(batch.messages, msg)
+	batch.add(msg)
 	return nil
 }