mirror of
https://github.com/logos-messaging/go-libp2p-pubsub.git
synced 2026-01-02 21:03:07 +00:00
topic.Publish is already thread safe. topic.AddToBatch should strive to follow similar semantics. Looking at how this would integrate with Prysm, they use separate goroutines per message they'd like to batch.
79 lines
1.7 KiB
Go
79 lines
1.7 KiB
Go
package pubsub
|
|
|
|
import (
|
|
"iter"
|
|
"sync"
|
|
|
|
"github.com/libp2p/go-libp2p/core/peer"
|
|
)
|
|
|
|
// MessageBatch allows a user to batch related messages and then publish them at
|
|
// once. This allows the Scheduler to define an order for outgoing RPCs.
|
|
// This helps bandwidth constrained peers.
|
|
type MessageBatch struct {
|
|
mu sync.Mutex
|
|
messages []*Message
|
|
}
|
|
|
|
func (mb *MessageBatch) add(msg *Message) {
|
|
mb.mu.Lock()
|
|
defer mb.mu.Unlock()
|
|
mb.messages = append(mb.messages, msg)
|
|
}
|
|
|
|
func (mb *MessageBatch) take() []*Message {
|
|
mb.mu.Lock()
|
|
defer mb.mu.Unlock()
|
|
messages := mb.messages
|
|
mb.messages = nil
|
|
return messages
|
|
}
|
|
|
|
type messageBatchAndPublishOptions struct {
|
|
messages []*Message
|
|
opts *BatchPublishOptions
|
|
}
|
|
|
|
// RPCScheduler schedules outgoing RPCs.
|
|
type RPCScheduler interface {
|
|
// AddRPC adds an RPC to the scheduler.
|
|
AddRPC(peer peer.ID, msgID string, rpc *RPC)
|
|
// All returns an ordered iterator of RPCs.
|
|
All() iter.Seq2[peer.ID, *RPC]
|
|
}
|
|
|
|
type pendingRPC struct {
|
|
peer peer.ID
|
|
rpc *RPC
|
|
}
|
|
|
|
// RoundRobinMessageIDScheduler schedules outgoing RPCs in round-robin order of message IDs.
|
|
type RoundRobinMessageIDScheduler struct {
|
|
rpcs map[string][]pendingRPC
|
|
}
|
|
|
|
func (s *RoundRobinMessageIDScheduler) AddRPC(peer peer.ID, msgID string, rpc *RPC) {
|
|
if s.rpcs == nil {
|
|
s.rpcs = make(map[string][]pendingRPC)
|
|
}
|
|
s.rpcs[msgID] = append(s.rpcs[msgID], pendingRPC{peer: peer, rpc: rpc})
|
|
}
|
|
|
|
func (s *RoundRobinMessageIDScheduler) All() iter.Seq2[peer.ID, *RPC] {
|
|
return func(yield func(peer.ID, *RPC) bool) {
|
|
for len(s.rpcs) > 0 {
|
|
for msgID, rpcs := range s.rpcs {
|
|
if len(rpcs) == 0 {
|
|
delete(s.rpcs, msgID)
|
|
continue
|
|
}
|
|
if !yield(rpcs[0].peer, rpcs[0].rpc) {
|
|
return
|
|
}
|
|
|
|
s.rpcs[msgID] = rpcs[1:]
|
|
}
|
|
}
|
|
}
|
|
}
|