go-libp2p-pubsub/pubsub.go
Marco Munizaga 0c5ee7bbfe
feat(gossipsub): Add MessageBatch (#607)
to support batch publishing messages

Replaces #602.

Batch publishing lets the system know there are multiple related
messages to be published so it can prioritize sending different messages
before sending copies of messages. For example, with the default API,
when you publish two messages A and B, under the hood A gets sent to D=8
peers first, before B gets sent out. With this MessageBatch api we can
now send one copy of A _and then_ one copy of B before sending multiple
copies.

When a node has bandwidth constraints relative to the messages it is
publishing this improves dissemination time.

For more context see this post:
https://ethresear.ch/t/improving-das-performance-with-gossipsub-batch-publishing/21713
2025-05-08 10:23:02 -07:00

1496 lines
41 KiB
Go

package pubsub
import (
"context"
"encoding/binary"
"errors"
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/libp2p/go-libp2p-pubsub/timecache"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/discovery"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
logging "github.com/ipfs/go-log/v2"
)
// DefaultMaxMessageSize is the default maximum message size, 1 MiB (1 << 20).
// NewPubSub uses it as the initial maxMessageSize of every instance.
const DefaultMaxMessageSize = 1 << 20
// Package-level defaults and sentinel errors. NewPubSub copies the two
// TimeCache* values into each new instance before options are applied.
var (
// TimeCacheDuration specifies how long a message ID will be remembered as seen.
// Use WithSeenMessagesTTL to configure this per pubsub instance, instead of overriding the global default.
TimeCacheDuration = 120 * time.Second
// TimeCacheStrategy specifies which type of lookup/cleanup strategy is used by the seen messages cache.
// Use WithSeenMessagesStrategy to configure this per pubsub instance, instead of overriding the global default.
TimeCacheStrategy = timecache.Strategy_FirstSeen
// ErrSubscriptionCancelled may be returned when a subscription Next() is called after the
// subscription has been cancelled. handleRemoveSubscription stores it in the
// subscription's err field before closing the subscription.
ErrSubscriptionCancelled = errors.New("subscription cancelled")
)
// log is the package-level logger, created with the name "pubsub".
var log = logging.Logger("pubsub")
// ProtocolMatchFn takes a protocol ID and returns a predicate over protocol
// IDs. When one is configured, NewPubSub calls it once per router protocol and
// passes the resulting predicate to the host's SetStreamHandlerMatch.
type ProtocolMatchFn = func(protocol.ID) func(protocol.ID) bool
// PubSub is the implementation of the pubsub system.
//
// Most of the channel fields below are drained by processLoop, which runs in
// its own goroutine started by NewPubSub.
type PubSub struct {
// atomic counter for seqnos
// NOTE: Must be declared at the top of the struct as we perform atomic
// operations on this field.
//
// See: https://golang.org/pkg/sync/atomic/#pkg-note-BUG
counter uint64
host host.Host
rt PubSubRouter
val *validation
disc *discover
tracer *pubsubTracer
peerFilter PeerFilter
// maxMessageSize is the maximum message size; it applies globally to all
// topics.
maxMessageSize int
// size of the outbound message channel that we maintain for each peer
peerOutboundQueueSize int
// incoming messages from other peers
incoming chan *RPC
// addSub is a control channel for us to add subscriptions
// (removal goes through cancelCh)
addSub chan *addSubReq
// addRelay is a control channel for us to add relays
// (removal goes through rmRelay)
addRelay chan *addRelayReq
// rmRelay is a relay cancellation channel
rmRelay chan string
// get list of topics we are subscribed to
getTopics chan *topicReq
// get chan of peers we are connected to
getPeers chan *listPeerReq
// send subscription here to cancel it
cancelCh chan *Subscription
// addTopic is a channel for us to add a topic
addTopic chan *addTopicReq
// rmTopic is a topic cancellation channel
rmTopic chan *rmTopicReq
// a notification channel for new peer connections accumulated
newPeers chan struct{}
newPeersPrioLk sync.RWMutex
newPeersMx sync.Mutex
newPeersPend map[peer.ID]struct{}
// a notification channel for new outgoing peer streams
newPeerStream chan network.Stream
// a notification channel for errors opening new peer streams
newPeerError chan peer.ID
// a notification channel for when our peers die
peerDead chan struct{}
peerDeadPrioLk sync.RWMutex
peerDeadMx sync.Mutex
peerDeadPend map[peer.ID]struct{}
// backoff for retrying new connections to dead peers
deadPeerBackoff *backoff
// The set of topics we are subscribed to
mySubs map[string]map[*Subscription]struct{}
// The set of topics we are relaying for
myRelays map[string]int
// The set of topics we are interested in
myTopics map[string]*Topic
// topics tracks which topics each of our peers are subscribed to
topics map[string]map[peer.ID]struct{}
// sendMsg handles messages that have been validated
sendMsg chan *Message
// sendMessageBatch publishes a batch of messages
sendMessageBatch chan messageBatchAndPublishOptions
// addVal handles validator registration requests
addVal chan *addValReq
// rmVal handles validator unregistration requests
rmVal chan *rmValReq
// eval thunk in event loop
eval chan func()
// peer blacklist
blacklist Blacklist
blacklistPeer chan peer.ID
// peers maps each tracked peer to its outbound RPC queue
peers map[peer.ID]*rpcQueue
inboundStreamsMx sync.Mutex
inboundStreams map[peer.ID]network.Stream
seenMessages timecache.TimeCache
seenMsgTTL time.Duration
seenMsgStrategy timecache.Strategy
// generator used to compute the ID for a message
idGen *msgIDGenerator
// key for signing messages; nil when signing is disabled
signKey crypto.PrivKey
// source ID for signed messages; corresponds to signKey, empty when signing is disabled.
// If empty, the author and seq-nr are completely omitted from the messages.
signID peer.ID
// strict mode rejects all unsigned messages prior to validation
signPolicy MessageSignaturePolicy
// filter for tracking subscriptions in topics of interest; if nil, then we track all subscriptions
subFilter SubscriptionFilter
// protoMatchFunc is a matching function for protocol selection.
protoMatchFunc ProtocolMatchFn
ctx context.Context
// appSpecificRpcInspector is an auxiliary that may be set by the application to inspect incoming RPCs prior to
// processing them. The inspector is invoked on an accepted RPC right prior to handling it.
// The return value of the inspector function is an error indicating whether the RPC should be processed or not.
// If the error is nil, the RPC is processed as usual. If the error is non-nil, the RPC is dropped.
appSpecificRpcInspector func(peer.ID, *RPC) error
}
// PubSubRouter is the message router component of PubSub.
//
// NewPubSub calls Attach once and then registers a stream handler for every
// protocol returned by Protocols. The remaining methods are invoked by PubSub
// as peers, subscriptions and messages come and go.
type PubSubRouter interface {
// Protocols returns the list of protocols supported by the router.
Protocols() []protocol.ID
// Attach is invoked by the PubSub constructor to attach the router to a
// freshly initialized PubSub instance.
Attach(*PubSub)
// AddPeer notifies the router that a new peer has been connected.
AddPeer(peer.ID, protocol.ID)
// RemovePeer notifies the router that a peer has been disconnected.
RemovePeer(peer.ID)
// EnoughPeers returns whether the router needs more peers before it's ready to publish new records.
// Suggested (if greater than 0) is a suggested number of peers that the router should need.
EnoughPeers(topic string, suggested int) bool
// AcceptFrom is invoked on any RPC envelope before pushing it to the validation pipeline
// or processing control information.
// Allows routers with internal scoring to vet peers before committing any processing resources
// to the message and implement an effective graylist and react to validation queue overload.
AcceptFrom(peer.ID) AcceptStatus
// PreValidation is invoked on messages in the RPC envelope right before pushing it to
// the validation pipeline
PreValidation(from peer.ID, msgs []*Message)
// HandleRPC is invoked to process control messages in the RPC envelope.
// It is invoked after subscriptions and payload messages have been processed.
HandleRPC(*RPC)
// Publish is invoked to forward a new message that has been validated.
// It is not invoked for messages whose Local flag is set.
Publish(*Message)
// Join notifies the router that we want to receive and forward messages in a topic.
// It is invoked after the subscription announcement.
Join(topic string)
// Leave notifies the router that we are no longer interested in a topic.
// It is invoked after the unsubscription announcement.
Leave(topic string)
}
// BatchPublisher is implemented by routers that can publish several messages
// as one batch. publishMessageBatch type-asserts the attached router to this
// interface and hands it the whole batch together with the publish options.
type BatchPublisher interface {
// PublishBatch forwards all messages of a batch using the given options.
PublishBatch(messages []*Message, opts *BatchPublishOptions)
}
// AcceptStatus is the verdict returned by PubSubRouter.AcceptFrom for an
// incoming RPC; handleIncomingRPC switches on it.
type AcceptStatus int
const (
// AcceptNone signals to drop the incoming RPC
AcceptNone AcceptStatus = iota
// AcceptControl signals to accept the incoming RPC only for control message processing by
// the router. Included payload messages will _not_ be pushed to the validation queue.
AcceptControl
// AcceptAll signals to accept the incoming RPC for full processing
AcceptAll
)
// Message wraps a wire-level pb.Message with local bookkeeping.
type Message struct {
*pb.Message
// ID is left empty when handleIncomingRPC builds the message.
// NOTE(review): presumably filled in later by the message ID generator — confirm.
ID string
// ReceivedFrom is the peer the message was received from.
ReceivedFrom peer.ID
// ValidatorData is nil for freshly received messages.
// NOTE(review): presumably set by validators — confirm against the validation code.
ValidatorData interface{}
// Local, when true, makes publishMessage skip handing the message to the router.
Local bool
}
// GetFrom returns the From field of the embedded protobuf message converted
// to a peer.ID.
func (m *Message) GetFrom() peer.ID {
	rawFrom := m.Message.GetFrom()
	return peer.ID(rawFrom)
}
// RPC wraps a wire-level pb.RPC together with the peer it is attributed to.
type RPC struct {
pb.RPC
// unexported on purpose, not sending this over the wire
from peer.ID
}
// Option configures a PubSub instance. NewPubSub applies options in the order
// given and aborts construction on the first one that returns an error.
type Option func(*PubSub) error
// NewPubSub returns a new PubSub management object.
//
// It builds the instance with default settings, applies opts in order (the
// first option error aborts construction), resolves the signing key when the
// signature policy requires signing, creates the seen-messages cache, starts
// discovery, attaches the router, registers a stream handler for every router
// protocol, and finally starts the peer watcher, the validation pipeline and
// the processing loop.
func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option) (*PubSub, error) {
ps := &PubSub{
host: h,
ctx: ctx,
rt: rt,
val: newValidation(),
peerFilter: DefaultPeerFilter,
disc: &discover{},
maxMessageSize: DefaultMaxMessageSize,
peerOutboundQueueSize: 32,
signID: h.ID(),
signKey: nil,
signPolicy: StrictSign,
incoming: make(chan *RPC, 32),
newPeers: make(chan struct{}, 1),
newPeersPend: make(map[peer.ID]struct{}),
newPeerStream: make(chan network.Stream),
newPeerError: make(chan peer.ID),
peerDead: make(chan struct{}, 1),
peerDeadPend: make(map[peer.ID]struct{}),
deadPeerBackoff: newBackoff(ctx, 1000, BackoffCleanupInterval, MaxBackoffAttempts),
cancelCh: make(chan *Subscription),
getPeers: make(chan *listPeerReq),
addSub: make(chan *addSubReq),
addRelay: make(chan *addRelayReq),
rmRelay: make(chan string),
addTopic: make(chan *addTopicReq),
rmTopic: make(chan *rmTopicReq),
getTopics: make(chan *topicReq),
sendMsg: make(chan *Message, 32),
sendMessageBatch: make(chan messageBatchAndPublishOptions, 1),
addVal: make(chan *addValReq),
rmVal: make(chan *rmValReq),
eval: make(chan func()),
myTopics: make(map[string]*Topic),
mySubs: make(map[string]map[*Subscription]struct{}),
myRelays: make(map[string]int),
topics: make(map[string]map[peer.ID]struct{}),
peers: make(map[peer.ID]*rpcQueue),
inboundStreams: make(map[peer.ID]network.Stream),
blacklist: NewMapBlacklist(),
blacklistPeer: make(chan peer.ID),
seenMsgTTL: TimeCacheDuration,
seenMsgStrategy: TimeCacheStrategy,
idGen: newMsgIdGenerator(),
// seed the seqno counter from the current wall-clock time in nanoseconds
counter: uint64(time.Now().UnixNano()),
}
// Apply user options on top of the defaults; the first failure aborts.
for _, opt := range opts {
err := opt(ps)
if err != nil {
return nil, err
}
}
// When the policy requires signing we need both an author ID and its
// private key from the host's peerstore.
if ps.signPolicy.mustSign() {
if ps.signID == "" {
return nil, fmt.Errorf("strict signature usage enabled but message author was disabled")
}
ps.signKey = ps.host.Peerstore().PrivKey(ps.signID)
if ps.signKey == nil {
return nil, fmt.Errorf("can't sign for peer %s: no private key", ps.signID)
}
}
// The cache is created only now so that WithSeenMessagesTTL and
// WithSeenMessagesStrategy have already taken effect.
ps.seenMessages = timecache.NewTimeCacheWithStrategy(ps.seenMsgStrategy, ps.seenMsgTTL)
if err := ps.disc.Start(ps); err != nil {
return nil, err
}
rt.Attach(ps)
// Register the inbound stream handler for every protocol the router speaks,
// using the custom matcher when one was configured.
for _, id := range rt.Protocols() {
if ps.protoMatchFunc != nil {
h.SetStreamHandlerMatch(id, ps.protoMatchFunc(id), ps.handleNewStream)
} else {
h.SetStreamHandler(id, ps.handleNewStream)
}
}
go ps.watchForNewPeers(ctx)
ps.val.Start(ps)
go ps.processLoop(ctx)
return ps, nil
}
// MsgIdFunction returns a unique ID for the passed Message. PubSub can be
// customized to use any implementation of this function by configuring it
// with the Option returned by WithMessageIdFn.
type MsgIdFunction func(pmsg *pb.Message) string
// WithMessageIdFn is an option to customize the way a message ID is computed
// for a pubsub message. It installs fn as the default function of the
// instance's ID generator. The default ID function is DefaultMsgIdFn
// (concatenate source and seq nr.), but it can be customized to e.g. the hash
// of the message.
func WithMessageIdFn(fn MsgIdFunction) Option {
	setIDFn := func(ps *PubSub) error {
		ps.idGen.Default = fn
		return nil
	}
	return setIDFn
}
// PeerFilter is used to filter pubsub peers. It should return true for peers
// that are accepted for a given topic. PubSub can be customized to use any
// implementation of this function by configuring it with the Option returned
// by WithPeerFilter; NewPubSub installs DefaultPeerFilter otherwise.
type PeerFilter func(pid peer.ID, topic string) bool
// WithPeerFilter is an option to set a filter for pubsub peers.
// Without this option the instance uses DefaultPeerFilter, which accepts every
// peer on every topic.
func WithPeerFilter(filter PeerFilter) Option {
	apply := func(ps *PubSub) error {
		ps.peerFilter = filter
		return nil
	}
	return apply
}
// WithPeerOutboundQueueSize is an option to set the buffer size for outbound
// messages to a peer. We start dropping messages to a peer if the outbound
// queue is full. The option fails when size is not strictly positive.
func WithPeerOutboundQueueSize(size int) Option {
	return func(ps *PubSub) error {
		if size > 0 {
			ps.peerOutboundQueueSize = size
			return nil
		}
		return errors.New("outbound queue size must always be positive")
	}
}
// WithMessageSignaturePolicy sets the mode of operation for producing and
// verifying message signatures, replacing whatever policy was set before.
func WithMessageSignaturePolicy(policy MessageSignaturePolicy) Option {
	setPolicy := func(ps *PubSub) error {
		ps.signPolicy = policy
		return nil
	}
	return setPolicy
}
// WithMessageSigning enables or disables message signing (enabled by default)
// by setting or clearing the msgSigning bit of the signature policy.
// Deprecated: signature verification without message signing,
// or message signing without verification, are not recommended.
func WithMessageSigning(enabled bool) Option {
	return func(ps *PubSub) error {
		if !enabled {
			ps.signPolicy &^= msgSigning
			return nil
		}
		ps.signPolicy |= msgSigning
		return nil
	}
}
// WithMessageAuthor sets the author for outbound messages to the given peer ID
// (defaults to the host's ID). If message signing is enabled, the private key
// must be available in the host's peerstore.
func WithMessageAuthor(author peer.ID) Option {
	return func(ps *PubSub) error {
		// Resolve into a fresh local on every invocation so the captured
		// argument is never overwritten with a particular host's ID.
		signer := author
		if signer == "" {
			signer = ps.host.ID()
		}
		ps.signID = signer
		return nil
	}
}
// WithNoAuthor omits the author and seq-number data of messages, and disables
// the use of signatures: it clears the signing ID and the msgSigning policy bit.
// Not recommended to use with the default message ID function, see WithMessageIdFn.
func WithNoAuthor() Option {
	anonymize := func(ps *PubSub) error {
		ps.signPolicy &^= msgSigning
		ps.signID = ""
		return nil
	}
	return anonymize
}
// WithStrictSignatureVerification is an option to enable or disable strict
// message signing by setting or clearing the msgVerification policy bit.
// When enabled (which is the default), unsigned messages will be discarded.
// Deprecated: signature verification without message signing,
// or message signing without verification, are not recommended.
func WithStrictSignatureVerification(required bool) Option {
	return func(ps *PubSub) error {
		if !required {
			ps.signPolicy &^= msgVerification
			return nil
		}
		ps.signPolicy |= msgVerification
		return nil
	}
}
// WithBlacklist provides an implementation of the blacklist; the default is
// the one returned by NewMapBlacklist.
func WithBlacklist(b Blacklist) Option {
	useBlacklist := func(ps *PubSub) error {
		ps.blacklist = b
		return nil
	}
	return useBlacklist
}
// WithDiscovery provides a discovery mechanism used to bootstrap and provide
// peers into PubSub. The discover options start from defaultDiscoverOptions
// and are refined by opts in order; the first failing opt aborts the option.
func WithDiscovery(d discovery.Discovery, opts ...DiscoverOpt) Option {
	return func(ps *PubSub) error {
		cfg := defaultDiscoverOptions()
		for i := range opts {
			if err := opts[i](cfg); err != nil {
				return err
			}
		}

		ps.disc.discovery = &pubSubDiscovery{Discovery: d, opts: cfg.opts}
		ps.disc.options = cfg
		return nil
	}
}
// WithEventTracer provides a tracer for the pubsub system. If a tracer holder
// already exists (e.g. created by WithRawTracer) only its event tracer is
// replaced; otherwise a new holder is created.
func WithEventTracer(tracer EventTracer) Option {
	return func(ps *PubSub) error {
		if ps.tracer == nil {
			ps.tracer = &pubsubTracer{tracer: tracer, pid: ps.host.ID(), idGen: ps.idGen}
			return nil
		}
		ps.tracer.tracer = tracer
		return nil
	}
}
// WithRawTracer adds a raw tracer to the pubsub system.
// Multiple tracers can be added using multiple invocations of the option;
// each one is appended to the list of raw tracers.
func WithRawTracer(tracer RawTracer) Option {
	return func(ps *PubSub) error {
		if ps.tracer == nil {
			ps.tracer = &pubsubTracer{raw: []RawTracer{tracer}, pid: ps.host.ID(), idGen: ps.idGen}
			return nil
		}
		ps.tracer.raw = append(ps.tracer.raw, tracer)
		return nil
	}
}
// WithMaxMessageSize sets the global maximum message size for pubsub wire
// messages. The default value is 1MiB (DefaultMaxMessageSize).
//
// Observe the following warnings when setting this option.
//
// WARNING #1: Make sure to change the default protocol prefixes for floodsub
// (FloodSubID) and gossipsub (GossipSubID). This avoids accidentally joining
// the public default network, which uses the default max message size, and
// therefore will cause messages to be dropped.
//
// WARNING #2: Reducing the default max message limit is fine, if you are
// certain that your application messages will not exceed the new limit.
// However, be wary of increasing the limit, as pubsub networks are naturally
// write-amplifying, i.e. for every message we receive, we send D copies of the
// message to our peers. If those messages are large, the bandwidth requirements
// will grow linearly. Note that propagation is sent on the uplink, which
// traditionally is more constrained than the downlink. Instead, consider
// out-of-band retrieval for large messages, by sending a CID (Content-ID) or
// another type of locator, such that messages can be fetched on-demand, rather
// than being pushed proactively. Under this design, you'd use the pubsub layer
// as a signalling system, rather than a data delivery system.
func WithMaxMessageSize(maxMessageSize int) Option {
	// The value is stored as given; no range check is performed here.
	setLimit := func(ps *PubSub) error {
		ps.maxMessageSize = maxMessageSize
		return nil
	}
	return setLimit
}
// WithProtocolMatchFn sets a custom matching function for protocol selection to
// be used by the protocol handler on the Host's Mux. Should be combined with
// WithGossipSubProtocols feature function for checking if certain protocol features
// are supported
func WithProtocolMatchFn(m ProtocolMatchFn) Option {
	setMatcher := func(ps *PubSub) error {
		ps.protoMatchFunc = m
		return nil
	}
	return setMatcher
}
// WithSeenMessagesTTL configures when a previously seen message ID can be
// forgotten about. NewPubSub passes the value to the seen-messages cache.
func WithSeenMessagesTTL(ttl time.Duration) Option {
	setTTL := func(ps *PubSub) error {
		ps.seenMsgTTL = ttl
		return nil
	}
	return setTTL
}
// WithSeenMessagesStrategy configures which type of lookup/cleanup strategy is
// used by the seen messages cache.
func WithSeenMessagesStrategy(strategy timecache.Strategy) Option {
	setStrategy := func(ps *PubSub) error {
		ps.seenMsgStrategy = strategy
		return nil
	}
	return setStrategy
}
// WithAppSpecificRpcInspector sets a hook that inspects incoming RPCs prior to
// processing them. The inspector is invoked on an accepted RPC just before it
// is handled. If the inspector's error is nil, the RPC is handled. Otherwise,
// it is dropped.
func WithAppSpecificRpcInspector(inspector func(peer.ID, *RPC) error) Option {
	setInspector := func(ps *PubSub) error {
		ps.appSpecificRpcInspector = inspector
		return nil
	}
	return setInspector
}
// processLoop is the event loop of the PubSub instance: it serves every
// control and data channel until ctx is cancelled. On exit it closes all peer
// queues, drops the peer and topic maps and stops the seen-messages cache.
func (p *PubSub) processLoop(ctx context.Context) {
	defer func() {
		// Clean up go routines.
		for _, q := range p.peers {
			q.Close()
		}
		p.peers = nil
		p.topics = nil
		p.seenMessages.Done()
	}()

	for {
		select {
		case <-p.newPeers:
			p.handlePendingPeers()

		case stream := <-p.newPeerStream:
			remote := stream.Conn().RemotePeer()
			queue, known := p.peers[remote]
			switch {
			case !known:
				log.Warn("new stream for unknown peer: ", remote)
				stream.Reset()
			case p.blacklist.Contains(remote):
				log.Warn("closing stream for blacklisted peer: ", remote)
				queue.Close()
				delete(p.peers, remote)
				stream.Reset()
			default:
				p.rt.AddPeer(remote, stream.Protocol())
			}

		case failed := <-p.newPeerError:
			delete(p.peers, failed)

		case <-p.peerDead:
			p.handleDeadPeers()

		case treq := <-p.getTopics:
			var names []string
			for name := range p.mySubs {
				names = append(names, name)
			}
			treq.resp <- names

		case req := <-p.addTopic:
			p.handleAddTopic(req)

		case req := <-p.rmTopic:
			p.handleRemoveTopic(req)

		case sub := <-p.cancelCh:
			p.handleRemoveSubscription(sub)

		case req := <-p.addSub:
			p.handleAddSubscription(req)

		case req := <-p.addRelay:
			p.handleAddRelay(req)

		case topic := <-p.rmRelay:
			p.handleRemoveRelay(topic)

		case preq := <-p.getPeers:
			// An empty topic means "all peers"; otherwise only peers known to
			// be subscribed to that topic are listed.
			byTopic := preq.topic != ""
			subscribers, tracked := p.topics[preq.topic]
			if byTopic && !tracked {
				preq.resp <- nil
				continue
			}
			var matched []peer.ID
			for pid := range p.peers {
				if byTopic {
					if _, subscribed := subscribers[pid]; !subscribed {
						continue
					}
				}
				matched = append(matched, pid)
			}
			preq.resp <- matched

		case rpc := <-p.incoming:
			p.handleIncomingRPC(rpc)

		case msg := <-p.sendMsg:
			p.publishMessage(msg)

		case batch := <-p.sendMessageBatch:
			p.publishMessageBatch(batch)

		case req := <-p.addVal:
			p.val.AddValidator(req)

		case req := <-p.rmVal:
			p.val.RemoveValidator(req)

		case fn := <-p.eval:
			fn()

		case banned := <-p.blacklistPeer:
			log.Infof("Blacklisting peer %s", banned)
			p.blacklist.Add(banned)

			queue, connected := p.peers[banned]
			if !connected {
				continue
			}
			queue.Close()
			delete(p.peers, banned)
			for name, members := range p.topics {
				if _, present := members[banned]; present {
					delete(members, banned)
					p.notifyLeave(name, banned)
				}
			}
			p.rt.RemovePeer(banned)

		case <-ctx.Done():
			log.Info("pubsub processloop shutting down")
			return
		}
	}
}
// handlePendingPeers takes ownership of the set of pending new peers and, for
// each one that is still connected, not yet tracked and not blacklisted,
// creates an outbound queue primed with the hello packet and spawns the
// goroutine that handles the new peer.
func (p *PubSub) handlePendingPeers() {
	p.newPeersPrioLk.Lock()
	pending := p.newPeersPend
	if len(pending) == 0 {
		p.newPeersPrioLk.Unlock()
		return
	}
	// Swap in a fresh set so the lock is not held while processing.
	p.newPeersPend = make(map[peer.ID]struct{})
	p.newPeersPrioLk.Unlock()

	for pid := range pending {
		// Make sure we have a non-limited connection. We do this late because we may have
		// disconnected in the meantime.
		if p.host.Network().Connectedness(pid) != network.Connected {
			continue
		}

		if _, tracked := p.peers[pid]; tracked {
			log.Debug("already have connection to peer: ", pid)
			continue
		}

		if p.blacklist.Contains(pid) {
			log.Warn("ignoring connection from blacklisted peer: ", pid)
			continue
		}

		outbound := newRpcQueue(p.peerOutboundQueueSize)
		outbound.Push(p.getHelloPacket(), true)
		go p.handleNewPeer(p.ctx, pid, outbound)
		p.peers[pid] = outbound
	}
}
// handleDeadPeers takes ownership of the set of pending dead peers and removes
// each tracked one from the peer map, the per-topic peer sets (emitting leave
// notifications) and the router. A peer that is still connected gets a new
// outbound queue and a writer respawned after a backoff delay.
func (p *PubSub) handleDeadPeers() {
	p.peerDeadPrioLk.Lock()
	dead := p.peerDeadPend
	if len(dead) == 0 {
		p.peerDeadPrioLk.Unlock()
		return
	}
	// Swap in a fresh set so the lock is not held while processing.
	p.peerDeadPend = make(map[peer.ID]struct{})
	p.peerDeadPrioLk.Unlock()

	for pid := range dead {
		oldQueue, tracked := p.peers[pid]
		if !tracked {
			continue
		}

		oldQueue.Close()
		delete(p.peers, pid)

		for name, members := range p.topics {
			if _, present := members[pid]; present {
				delete(members, pid)
				p.notifyLeave(name, pid)
			}
		}

		p.rt.RemovePeer(pid)

		if p.host.Network().Connectedness(pid) != network.Connected {
			continue
		}

		delay, err := p.deadPeerBackoff.updateAndGet(pid)
		if err != nil {
			log.Debug(err)
			continue
		}

		// still connected, must be a duplicate connection being closed.
		// we respawn the writer as we need to ensure there is a stream active
		log.Debugf("peer declared dead but still connected; respawning writer: %s", pid)
		fresh := newRpcQueue(p.peerOutboundQueueSize)
		fresh.Push(p.getHelloPacket(), true)
		p.peers[pid] = fresh
		go p.handleNewPeerWithBackoff(p.ctx, pid, delay, fresh)
	}
}
// handleAddTopic adds a tracker for a particular topic. If the topic name is
// already tracked, the existing tracker is returned on the response channel
// instead of the requested one.
// Only called from processLoop.
func (p *PubSub) handleAddTopic(req *addTopicReq) {
	name := req.topic.topic

	if existing, found := p.myTopics[name]; found {
		req.resp <- existing
		return
	}

	p.myTopics[name] = req.topic
	req.resp <- req.topic
}
// handleRemoveTopic removes Topic tracker from bookkeeping. It replies nil when
// the topic is unknown or was removed, and an error while event handlers,
// subscriptions or relays are still outstanding.
// Only called from processLoop.
func (p *PubSub) handleRemoveTopic(req *rmTopicReq) {
	name := req.topic.topic

	tracked := p.myTopics[name]
	if tracked == nil {
		req.resp <- nil
		return
	}

	inUse := len(tracked.evtHandlers) != 0 ||
		len(p.mySubs[name]) != 0 ||
		p.myRelays[name] != 0
	if inUse {
		req.resp <- fmt.Errorf("cannot close topic: outstanding event handlers or subscriptions")
		return
	}

	delete(p.myTopics, tracked.topic)
	req.resp <- nil
}
// handleRemoveSubscription removes Subscription sub from bookkeeping.
// If this was the last subscription and no more relays exist for a given topic,
// it will also announce that this node is not subscribing to this topic anymore.
// Only called from processLoop.
func (p *PubSub) handleRemoveSubscription(sub *Subscription) {
	active := p.mySubs[sub.topic]
	if active == nil {
		return
	}

	sub.err = ErrSubscriptionCancelled
	sub.close()
	delete(active, sub)

	if len(active) != 0 {
		return
	}
	delete(p.mySubs, sub.topic)

	// stop announcing only if there are no more subs and relays
	if p.myRelays[sub.topic] != 0 {
		return
	}
	p.disc.StopAdvertise(sub.topic)
	p.announce(sub.topic, false)
	p.rt.Leave(sub.topic)
}
// handleAddSubscription adds a Subscription for a particular topic. If it is
// the first subscription and no relays exist so far for the topic, it will
// announce that this node subscribes to the topic.
// Only called from processLoop.
func (p *PubSub) handleAddSubscription(req *addSubReq) {
	sub := req.sub
	current := p.mySubs[sub.topic]

	// announce we want this topic if neither subs nor relays exist so far
	firstInterest := len(current) == 0 && p.myRelays[sub.topic] == 0
	if firstInterest {
		p.disc.Advertise(sub.topic)
		p.announce(sub.topic, true)
		p.rt.Join(sub.topic)
	}

	// make new if not there
	if current == nil {
		p.mySubs[sub.topic] = make(map[*Subscription]struct{})
	}

	sub.cancelCh = p.cancelCh
	p.mySubs[sub.topic][sub] = struct{}{}

	req.resp <- sub
}
// handleAddRelay adds a relay for a particular topic. If it is
// the first relay and no subscriptions exist so far for the topic, it will
// announce that this node relays for the topic.
//
// The cancel function sent back on req.resp removes one relay reference by
// sending the topic on rmRelay. It is safe to call repeatedly and from several
// goroutines: only the first call does any work.
// Only called from processLoop.
func (p *PubSub) handleAddRelay(req *addRelayReq) {
	topic := req.topic

	p.myRelays[topic]++

	// announce we want this topic if neither relays nor subs exist so far
	if p.myRelays[topic] == 1 && len(p.mySubs[topic]) == 0 {
		p.disc.Advertise(topic)
		p.announce(topic, true)
		p.rt.Join(topic)
	}

	// sync.Once replaces the former unsynchronised bool flag, which raced when
	// the cancel function was invoked concurrently.
	var once sync.Once

	relayCancelFunc := func() {
		once.Do(func() {
			select {
			case p.rmRelay <- topic:
			case <-p.ctx.Done():
				// shutting down; the relay bookkeeping no longer matters
			}
		})
	}

	req.resp <- relayCancelFunc
}
// handleRemoveRelay removes one relay reference from bookkeeping.
// If this was the last relay reference and no more subscriptions exist
// for a given topic, it will also announce that this node is not relaying
// for this topic anymore.
// Only called from processLoop.
func (p *PubSub) handleRemoveRelay(topic string) {
	refs := p.myRelays[topic]
	if refs == 0 {
		return
	}

	refs--
	if refs != 0 {
		p.myRelays[topic] = refs
		return
	}
	delete(p.myRelays, topic)

	// stop announcing only if there are no more relays and subs
	if len(p.mySubs[topic]) != 0 {
		return
	}
	p.disc.StopAdvertise(topic)
	p.announce(topic, false)
	p.rt.Leave(topic)
}
// announce announces whether or not this node is interested in a given topic
// by pushing a subscription RPC onto every peer's outbound queue. A failed
// push is traced as a drop and retried asynchronously via announceRetry.
// Only called from processLoop.
func (p *PubSub) announce(topic string, sub bool) {
	out := rpcWithSubs(&pb.RPC_SubOpts{
		Topicid:   &topic,
		Subscribe: &sub,
	})

	for pid, queue := range p.peers {
		if err := queue.Push(out, false); err != nil {
			log.Infof("Can't send announce message to peer %s: queue full; scheduling retry", pid)
			p.tracer.DropRPC(out, pid)
			go p.announceRetry(pid, topic, sub)
			continue
		}
		p.tracer.SendRPC(out, pid)
	}
}
// announceRetry waits a random delay between 1ms and 1s and then asks the
// event loop to re-send a subscription announcement to pid, provided the
// announcement still matches our current interest in the topic.
//
// The wait is abandoned as soon as the PubSub context is cancelled, so the
// goroutine does not outlive shutdown.
func (p *PubSub) announceRetry(pid peer.ID, topic string, sub bool) {
	delay := time.Duration(1+rand.Intn(1000)) * time.Millisecond
	timer := time.NewTimer(delay)
	defer timer.Stop()

	select {
	case <-timer.C:
	case <-p.ctx.Done():
		return
	}

	// Runs inside processLoop (via eval), so it may read the maps directly.
	retry := func() {
		_, okSubs := p.mySubs[topic]
		_, okRelays := p.myRelays[topic]

		ok := okSubs || okRelays

		// Only re-announce if our interest still agrees with the announcement.
		if (ok && sub) || (!ok && !sub) {
			p.doAnnounceRetry(pid, topic, sub)
		}
	}

	select {
	case p.eval <- retry:
	case <-p.ctx.Done():
	}
}
// doAnnounceRetry re-sends a subscription announcement to a single peer, if
// that peer is still tracked. A failed push schedules yet another retry.
func (p *PubSub) doAnnounceRetry(pid peer.ID, topic string, sub bool) {
	queue, tracked := p.peers[pid]
	if !tracked {
		return
	}

	out := rpcWithSubs(&pb.RPC_SubOpts{
		Topicid:   &topic,
		Subscribe: &sub,
	})

	if err := queue.Push(out, false); err != nil {
		log.Infof("Can't send announce message to peer %s: queue full; scheduling retry", pid)
		p.tracer.DropRPC(out, pid)
		go p.announceRetry(pid, topic, sub)
		return
	}
	p.tracer.SendRPC(out, pid)
}
// notifySubs sends a given message to all corresponding subscribers. Delivery
// is non-blocking: a subscription whose channel cannot accept the message
// right now is skipped and the miss is traced and logged.
// Only called from processLoop.
func (p *PubSub) notifySubs(msg *Message) {
	topic := msg.GetTopic()
	for sub := range p.mySubs[topic] {
		select {
		case sub.ch <- msg:
		default:
			p.tracer.UndeliverableMessage(msg)
			log.Infof("Can't deliver message to subscription for topic %s; subscriber too slow", topic)
		}
	}
}
// seenMessage returns whether we already saw this message before, i.e. whether
// the seen-messages cache holds the given id.
func (p *PubSub) seenMessage(id string) bool {
	seen := p.seenMessages.Has(id)
	return seen
}
// markSeen marks a message as seen such that seenMessage returns `true' for
// the given id. It returns true if the message was freshly marked.
func (p *PubSub) markSeen(id string) bool {
	fresh := p.seenMessages.Add(id)
	return fresh
}
// subscribedToMsg returns whether we are subscribed to the topic of a given
// message.
func (p *PubSub) subscribedToMsg(msg *pb.Message) bool {
	if len(p.mySubs) > 0 {
		_, subscribed := p.mySubs[msg.GetTopic()]
		return subscribed
	}
	return false
}
// canRelayMsg returns whether we hold at least one relay reference for the
// topic of a given message.
func (p *PubSub) canRelayMsg(msg *pb.Message) bool {
	if len(p.myRelays) > 0 {
		return p.myRelays[msg.GetTopic()] > 0
	}
	return false
}
// notifyLeave sends a PeerLeave event for pid to the tracker of the given
// topic, if we track that topic; otherwise it does nothing.
func (p *PubSub) notifyLeave(topic string, pid peer.ID) {
	tracker, tracked := p.myTopics[topic]
	if !tracked {
		return
	}
	tracker.sendNotification(PeerEvent{PeerLeave, pid})
}
// handleIncomingRPC processes one RPC received from a peer.
//
// Steps, in order: run the optional application inspector (a non-nil error
// drops the RPC), trace the receipt, apply the subscription filter if one is
// set, update the per-topic peer sets from the subscription announcements
// (emitting PeerJoin / PeerLeave notifications for topics in myTopics), ask the
// router whether to accept the peer, push acceptable payload messages to
// validation, and finally hand the RPC to the router's HandleRPC.
//
// Note that subscription changes are applied before the router is consulted,
// and that an AcceptNone verdict returns before HandleRPC is called.
func (p *PubSub) handleIncomingRPC(rpc *RPC) {
// pass the rpc through app specific validation (if any available).
if p.appSpecificRpcInspector != nil {
// check if the RPC is allowed by the external inspector
if err := p.appSpecificRpcInspector(rpc.from, rpc); err != nil {
log.Debugf("application-specific inspection failed, rejecting incoming rpc: %s", err)
return // reject the RPC
}
}
p.tracer.RecvRPC(rpc)
subs := rpc.GetSubscriptions()
if len(subs) != 0 && p.subFilter != nil {
var err error
subs, err = p.subFilter.FilterIncomingSubscriptions(rpc.from, subs)
if err != nil {
log.Debugf("subscription filter error: %s; ignoring RPC", err)
return
}
}
for _, subopt := range subs {
t := subopt.GetTopicid()
if subopt.GetSubscribe() {
// subscribe: record the peer under the topic, creating the set on first use
tmap, ok := p.topics[t]
if !ok {
tmap = make(map[peer.ID]struct{})
p.topics[t] = tmap
}
// notify only on the first announcement from this peer for the topic
if _, ok = tmap[rpc.from]; !ok {
tmap[rpc.from] = struct{}{}
if topic, ok := p.myTopics[t]; ok {
peer := rpc.from
topic.sendNotification(PeerEvent{PeerJoin, peer})
}
}
} else {
// unsubscribe: forget the peer for the topic if we knew about it
tmap, ok := p.topics[t]
if !ok {
continue
}
if _, ok := tmap[rpc.from]; ok {
delete(tmap, rpc.from)
p.notifyLeave(t, rpc.from)
}
}
}
// ask the router to vet the peer before committing any processing resources
switch p.rt.AcceptFrom(rpc.from) {
case AcceptNone:
log.Debugf("received RPC from router graylisted peer %s; dropping RPC", rpc.from)
return
case AcceptControl:
if len(rpc.GetPublish()) > 0 {
log.Debugf("peer %s was throttled by router; ignoring %d payload messages", rpc.from, len(rpc.GetPublish()))
}
p.tracer.ThrottlePeer(rpc.from)
case AcceptAll:
// collect the payload messages that pass the cheap pre-checks, let the
// router see them, then push each one into validation
var toPush []*Message
for _, pmsg := range rpc.GetPublish() {
if !(p.subscribedToMsg(pmsg) || p.canRelayMsg(pmsg)) {
log.Debug("received message in topic we didn't subscribe to; ignoring message")
continue
}
msg := &Message{pmsg, "", rpc.from, nil, false}
if p.shouldPush(msg) {
toPush = append(toPush, msg)
}
}
p.rt.PreValidation(rpc.from, toPush)
for _, msg := range toPush {
p.pushMsg(msg)
}
}
p.rt.HandleRPC(rpc)
}
// DefaultMsgIdFn returns the ID of the passed Message, built by concatenating
// its From bytes and its Seqno bytes.
func DefaultMsgIdFn(pmsg *pb.Message) string {
	from, seqno := pmsg.GetFrom(), pmsg.GetSeqno()
	return string(from) + string(seqno)
}
// DefaultPeerFilter accepts all peers on all topics: it ignores both
// arguments and always returns true. NewPubSub installs it as the initial
// peer filter.
func DefaultPeerFilter(pid peer.ID, topic string) bool {
return true
}
// shouldPush filters a message before validating and pushing it.
// It returns true if the message can be further validated and pushed.
//
// A message is rejected when its forwarder or its author is blacklisted, when
// it violates the signing policy, when it claims to originate from this host
// but was forwarded by someone else, or when its ID has already been seen.
func (p *PubSub) shouldPush(msg *Message) bool {
	src := msg.ReceivedFrom
	origin := msg.GetFrom()

	// reject messages from blacklisted peers
	if p.blacklist.Contains(src) {
		log.Debugf("dropping message from blacklisted peer %s", src)
		p.tracer.RejectMessage(msg, RejectBlacklstedPeer)
		return false
	}

	// even if they are forwarded by good peers
	if p.blacklist.Contains(origin) {
		// log the blacklisted author, not the (good) forwarding peer
		log.Debugf("dropping message from blacklisted source %s", origin)
		p.tracer.RejectMessage(msg, RejectBlacklistedSource)
		return false
	}

	if err := p.checkSigningPolicy(msg); err != nil {
		log.Debugf("dropping message from %s: %s", src, err)
		return false
	}

	// reject messages claiming to be from ourselves but not locally published
	self := p.host.ID()
	if origin == self && src != self {
		log.Debugf("dropping message claiming to be from self but forwarded from %s", src)
		p.tracer.RejectMessage(msg, RejectSelfOrigin)
		return false
	}

	// have we already seen and validated this message?
	id := p.idGen.ID(msg)
	if p.seenMessage(id) {
		p.tracer.DuplicateMessage(msg)
		return false
	}

	return true
}
// pushMsg hands msg to the validation pipeline and, when the pipeline lets it
// through synchronously, marks it as seen and publishes it.
//
// Nothing further happens when p.val.Push returns false, or when markSeen
// returns false for the message ID.
func (p *PubSub) pushMsg(msg *Message) {
	sender := msg.ReceivedFrom
	// The ID is resolved before the message enters validation.
	msgID := p.idGen.ID(msg)

	if accepted := p.val.Push(sender, msg); !accepted {
		return
	}
	if !p.markSeen(msgID) {
		return
	}
	p.publishMessage(msg)
}
// checkSigningPolicy applies the cheap, policy-level signature checks to msg.
// It returns nil when the message is acceptable; otherwise it reports the
// rejection to the tracer and returns a ValidationError carrying the reason.
//
// Nothing is checked unless the signing policy requires verification. When it
// does:
//   - if signing is required, a message without a signature is rejected;
//   - if signing is not required, a message carrying a signature is rejected,
//     and, when no signing identity is configured (p.signID is empty), so is a
//     message carrying Seqno, From or Key data.
func (p *PubSub) checkSigningPolicy(msg *Message) error {
	if !p.signPolicy.mustVerify() {
		return nil
	}

	if p.signPolicy.mustSign() {
		// Strict mode: reject unsigned messages before the ID is even processed.
		if msg.Signature == nil {
			p.tracer.RejectMessage(msg, RejectMissingSignature)
			return ValidationError{Reason: RejectMissingSignature}
		}
		// The signature itself is verified later, in the validation pipeline,
		// once we know the message has not been seen yet; this avoids paying
		// the verification cost for duplicates.
		return nil
	}

	// Signing is not part of the policy, so a signature is unexpected.
	if msg.Signature != nil {
		p.tracer.RejectMessage(msg, RejectUnexpectedSignature)
		return ValidationError{Reason: RejectUnexpectedSignature}
	}

	// When we expect unsigned messages and are not authoring messages
	// ourselves, do not accept sequence numbers, from data, or key data.
	// The default message ID function still relies on Seqno and From, but it
	// is not used if we are not authoring messages ourselves.
	if p.signID == "" && (msg.Seqno != nil || msg.From != nil || msg.Key != nil) {
		p.tracer.RejectMessage(msg, RejectUnexpectedAuthInfo)
		return ValidationError{Reason: RejectUnexpectedAuthInfo}
	}

	return nil
}
// publishMessage delivers msg locally and, unless the message is flagged as
// Local, passes it on to the router.
//
// The tracer is notified of the delivery first, then the local subscribers are
// notified, and only then is the router asked to publish.
func (p *PubSub) publishMessage(msg *Message) {
	p.tracer.DeliverMessage(msg)
	p.notifySubs(msg)

	if msg.Local {
		// Local messages are not handed to the router.
		return
	}
	p.rt.Publish(msg)
}
// publishMessageBatch delivers every message of the batch locally (tracer
// notification followed by subscriber notification, message by message) and
// then hands the whole batch, together with its publish options, to the router.
func (p *PubSub) publishMessageBatch(batchAndOpts messageBatchAndPublishOptions) {
	msgs := batchAndOpts.messages
	for i := range msgs {
		p.tracer.DeliverMessage(msgs[i])
		p.notifySubs(msgs[i])
	}

	// We type checked when pushing the batch to the channel
	router := p.rt.(BatchPublisher)
	router.PublishBatch(msgs, batchAndOpts.opts)
}
// addTopicReq is the request tryJoin sends on the PubSub addTopic channel to
// register a Topic handle.
//
// topic is the freshly constructed handle. resp receives the handle that ends
// up associated with the topic; tryJoin treats a response that differs from
// topic as "the topic already existed".
type addTopicReq struct {
topic *Topic
resp chan *Topic
}
// rmTopicReq pairs a Topic handle with a channel on which an error result is
// reported back.
//
// NOTE(review): its sender and handler are outside this part of the file;
// presumably it is the removal counterpart of addTopicReq — confirm.
type rmTopicReq struct {
topic *Topic
resp chan error
}
// TopicOptions is an empty struct with no fields.
//
// NOTE(review): no use of it is visible in this part of the file; Join and
// tryJoin configure topics through TopicOpt instead.
type TopicOptions struct{}
// TopicOpt is a functional option applied to a Topic handle. tryJoin runs every
// option against the newly constructed handle before submitting it for
// registration, and aborts the join with the option's error if one fails.
type TopicOpt func(t *Topic) error
// WithTopicMessageIdFn returns a TopicOpt that installs msgId as the message ID
// function for the topic being joined, so that individual topics can have their
// own message ID generation rules. The function is registered with the owning
// PubSub's ID generator under the topic's name; the option itself never fails.
func WithTopicMessageIdFn(msgId MsgIdFunction) TopicOpt {
	install := func(handle *Topic) error {
		handle.p.idGen.Set(handle.topic, msgId)
		return nil
	}
	return install
}
// Join joins the topic and returns a Topic handle. Only one Topic handle should
// exist per topic: Join returns an error if a handle for the topic already
// exists, and it also returns any error produced while trying to join.
func (p *PubSub) Join(topic string, opts ...TopicOpt) (*Topic, error) {
	handle, created, err := p.tryJoin(topic, opts...)
	switch {
	case err != nil:
		return nil, err
	case !created:
		return nil, fmt.Errorf("topic already exists")
	default:
		return handle, nil
	}
}
// tryJoin is an internal function that tries to join a topic.
//
// It returns the Topic handle registered for the topic and a flag that is true
// when the handle was newly created by this call and false when a handle
// already existed. An error is returned when the subscription filter rejects
// the topic, when one of opts fails, or when the PubSub context is done before
// the request could be submitted.
//
// Can be removed once pubsub.Publish() and pubsub.Subscribe() are removed.
func (p *PubSub) tryJoin(topic string, opts ...TopicOpt) (*Topic, bool, error) {
	if p.subFilter != nil && !p.subFilter.CanSubscribe(topic) {
		return nil, false, fmt.Errorf("topic is not allowed by the subscription filter")
	}

	candidate := &Topic{
		p:           p,
		topic:       topic,
		evtHandlers: make(map[*TopicEventHandler]struct{}),
	}
	// Options are applied to the candidate handle before it is submitted.
	for _, apply := range opts {
		if err := apply(candidate); err != nil {
			return nil, false, err
		}
	}

	reply := make(chan *Topic, 1)
	req := &addTopicReq{topic: candidate, resp: reply}
	select {
	case p.addTopic <- req:
	case <-p.ctx.Done():
		return nil, false, p.ctx.Err()
	}

	// Getting our own candidate back means it was registered as a new topic;
	// any other handle is the one that already existed.
	registered := <-reply
	return registered, registered == candidate, nil
}
// addSubReq pairs a Subscription with a channel on which a Subscription is
// reported back.
//
// NOTE(review): its sender and handler are outside this part of the file;
// presumably it is the request used to register a subscription — confirm.
type addSubReq struct {
sub *Subscription
resp chan *Subscription
}
// SubOpt is a functional option that configures a Subscription and may fail
// with an error. PubSub.Subscribe forwards its SubOpt arguments to
// Topic.Subscribe; WithBufferSize is one such option.
type SubOpt func(sub *Subscription) error
// Subscribe returns a new Subscription for the given topic, joining the topic
// first if no handle for it exists yet.
// Note that subscription is not an instantaneous operation. It may take some time
// before the subscription is processed by the pubsub main loop and propagated to our peers.
//
// Deprecated: use pubsub.Join() and topic.Subscribe() instead
func (p *PubSub) Subscribe(topic string, opts ...SubOpt) (*Subscription, error) {
	// Whether the topic was newly created or already existed does not matter
	// here: either way tryJoin gives us a valid handle to subscribe through.
	handle, _, err := p.tryJoin(topic)
	if err == nil {
		return handle.Subscribe(opts...)
	}
	return nil, err
}
// WithBufferSize is a Subscribe option to customize the size of the subscribe output buffer.
// The default length is 32 but it can be configured to avoid dropping messages if the consumer is not reading fast
// enough.
//
// The returned option replaces the subscription's output channel with one of
// capacity size. A negative size is reported as an error by the option (and the
// subscription is left untouched) instead of panicking inside make.
func WithBufferSize(size int) SubOpt {
	return func(sub *Subscription) error {
		// make(chan T, n) panics for n < 0; surface that as an option error.
		if size < 0 {
			return fmt.Errorf("invalid subscription buffer size %d: must not be negative", size)
		}
		sub.ch = make(chan *Message, size)
		return nil
	}
}
// topicReq is the request GetTopics sends on the PubSub getTopics channel; the
// resulting list of topic names is delivered on resp.
type topicReq struct {
resp chan []string
}
// GetTopics returns the topics this node is subscribed to.
// It returns nil if the PubSub context is done before the request could be
// submitted.
func (p *PubSub) GetTopics() []string {
	reply := make(chan []string, 1)
	req := &topicReq{resp: reply}

	select {
	case <-p.ctx.Done():
		return nil
	case p.getTopics <- req:
		// Request accepted; wait for the answer.
		return <-reply
	}
}
// Publish publishes data to the given topic, joining the topic first if no
// handle for it exists yet. The publish itself runs with context.TODO().
//
// Deprecated: use pubsub.Join() and topic.Publish() instead
func (p *PubSub) Publish(topic string, data []byte, opts ...PubOpt) error {
	// Whether the topic was newly created or already existed does not matter
	// here: either way tryJoin gives us a valid handle to publish through.
	handle, _, err := p.tryJoin(topic)
	if err == nil {
		return handle.Publish(context.TODO(), data, opts...)
	}
	return err
}
// PublishBatch publishes a batch of messages. This only works for routers that
// implement the BatchPublisher interface.
//
// Users should make sure there is enough space in the Peer's outbound queue to
// ensure messages are not dropped. WithPeerOutboundQueueSize should be set to
// at least the expected number of batched messages per peer plus some slack to
// account for gossip messages.
//
// The default publish strategy is RoundRobinMessageIDScheduler.
//
// PublishBatch returns an error if the router is not a BatchPublisher, if one
// of opts fails, or if the PubSub context is done before the batch could be
// handed over; in all of these cases the batch keeps its messages. On success
// the batch is emptied so that the same MessageBatch object can be reused.
func (p *PubSub) PublishBatch(batch *MessageBatch, opts ...BatchPubOpt) error {
	if _, ok := p.rt.(BatchPublisher); !ok {
		return fmt.Errorf("pubsub router is not a BatchPublisher")
	}

	publishOptions := &BatchPublishOptions{}
	for _, o := range opts {
		if err := o(publishOptions); err != nil {
			return err
		}
	}
	setDefaultBatchPublishOptions(publishOptions)

	// Do not block forever once the PubSub has been shut down: give up on the
	// hand-over as soon as the context is done, like the other request APIs.
	select {
	case p.sendMessageBatch <- messageBatchAndPublishOptions{
		messages: batch.messages,
		opts:     publishOptions,
	}:
	case <-p.ctx.Done():
		return p.ctx.Err()
	}

	// Clear the batch's messages in case a user reuses the same batch object
	batch.messages = nil
	return nil
}
// nextSeqno atomically increments the PubSub counter and returns the new value
// encoded as 8 big-endian bytes. Every call returns a freshly allocated slice.
func (p *PubSub) nextSeqno() []byte {
	var buf [8]byte
	binary.BigEndian.PutUint64(buf[:], atomic.AddUint64(&p.counter, 1))
	return buf[:]
}
// listPeerReq is the request ListPeers sends on the PubSub getPeers channel.
// topic is the topic being queried and resp receives the resulting peer IDs.
type listPeerReq struct {
resp chan []peer.ID
topic string
}
// ListPeers returns a list of peers we are connected to in the given topic.
// It returns nil if the PubSub context is done before the request could be
// submitted.
func (p *PubSub) ListPeers(topic string) []peer.ID {
	// The response channel is buffered, like the one used by GetTopics, so the
	// responder can deliver the answer without waiting for this goroutine to be
	// scheduled.
	out := make(chan []peer.ID, 1)
	select {
	case p.getPeers <- &listPeerReq{
		resp:  out,
		topic: topic,
	}:
	case <-p.ctx.Done():
		return nil
	}
	return <-out
}
// BlacklistPeer blacklists a peer; all messages from this peer will be unconditionally dropped.
// The request is submitted on the blacklistPeer channel; if the PubSub context
// is done first, the call returns without doing anything.
func (p *PubSub) BlacklistPeer(pid peer.ID) {
	select {
	case <-p.ctx.Done():
		// PubSub is shutting down; nothing to submit.
	case p.blacklistPeer <- pid:
	}
}
// RegisterTopicValidator registers a validator for topic.
// By default validators are asynchronous, which means they will run in a separate goroutine.
// The number of active goroutines is controlled by global and per topic validator
// throttles; if it exceeds the throttle threshold, messages will be dropped.
//
// An error is returned if one of opts fails, if the PubSub context is done
// before the request could be submitted, or if the registration itself reports
// an error.
func (p *PubSub) RegisterTopicValidator(topic string, val interface{}, opts ...ValidatorOpt) error {
	req := &addValReq{
		topic:    topic,
		validate: val,
		resp:     make(chan error, 1),
	}
	for _, apply := range opts {
		if err := apply(req); err != nil {
			return err
		}
	}

	select {
	case <-p.ctx.Done():
		return p.ctx.Err()
	case p.addVal <- req:
		// Request accepted; its outcome arrives on the response channel.
		return <-req.resp
	}
}
// UnregisterTopicValidator removes a validator from a topic.
// Returns an error if there was no validator registered with the topic, or if
// the PubSub context is done before the request could be submitted.
func (p *PubSub) UnregisterTopicValidator(topic string) error {
	req := &rmValReq{
		topic: topic,
		resp:  make(chan error, 1),
	}

	select {
	case <-p.ctx.Done():
		return p.ctx.Err()
	case p.rmVal <- req:
		// Request accepted; its outcome arrives on the response channel.
		return <-req.resp
	}
}
// RelayCancelFunc is a function that takes no arguments and returns nothing.
//
// NOTE(review): judging by its name and by addRelayReq, it presumably cancels a
// topic relay; its producer is outside this part of the file — confirm.
type RelayCancelFunc func()
// addRelayReq pairs a topic name with a channel on which a RelayCancelFunc is
// reported back.
//
// NOTE(review): its sender and handler are outside this part of the file;
// presumably it is the request used to enable relaying for a topic — confirm.
type addRelayReq struct {
topic string
resp chan RelayCancelFunc
}