mirror of
https://github.com/logos-messaging/go-libp2p-pubsub.git
synced 2026-01-02 12:53:09 +00:00
## GossipSub v1.2 implementation Specification: libp2p/specs#548 ### Work Summary Sending IDONTWANT Implement a smart queue Add priorities to the smart queue Put IDONTWANT packets into the smart priority queue as soon as the node gets the packets Handling IDONTWANT Use a map to remember the message ids whose IDONTWANT packets have been received Implement max_idontwant_messages (ignore the IDONWANT packets if the max is reached) Clear the message IDs from the cache after 3 heartbeats Hash the message IDs before putting them into the cache. More requested features Add a feature test to not send IDONTWANT if the other side doesnt support it ### Commit Summary * Replace sending channel with the smart rpcQueue Since we want to implement a priority queue later, we need to replace the normal sending channels with the new smart structures first. * Implement UrgentPush in the smart rpcQueue UrgentPush allows you to push an rpc packet to the front of the queue so that it will be popped out fast. * Add IDONTWANT to rpc.proto and trace.proto * Send IDONTWANT right before validation step Most importantly, this commit adds a new method called PreValidation to the interface PubSubRouter, which will be called right before validating the gossipsub message. In GossipSubRouter, PreValidation will send the IDONTWANT controll messages to all the mesh peers of the topics of the received messages. * Test GossipSub IDONWANT sending * Send IDONWANT only for large messages * Handle IDONTWANT control messages When receiving IDONTWANTs, the host should remember the message ids contained in IDONTWANTs using a hash map. When receiving messages with those ids, it shouldn't forward them to the peers who already sent the IDONTWANTs. When the maximum number of IDONTWANTs is reached for any particular peer, the host should ignore any excessive IDONTWANTs from that peer. * Clear expired message IDs from the IDONTWANT cache If the messages IDs received from IDONTWANTs are older than 3 heartbeats, they should be removed from the IDONTWANT cache. * Keep the hashes of IDONTWANT message ids instead Rather than keeping the raw message ids, keep their hashes instead to save memory and protect again memory DoS attacks. * Increase GossipSubMaxIHaveMessages to 1000 * fixup! Clear expired message IDs from the IDONTWANT cache * Not send IDONTWANT if the receiver doesn't support * fixup! Replace sending channel with the smart rpcQueue * Not use pointers in rpcQueue * Simply rcpQueue by using only one mutex * Check ctx error in rpc sending worker Co-authored-by: Steven Allen <steven@stebalien.com> * fixup! Simply rcpQueue by using only one mutex * fixup! Keep the hashes of IDONTWANT message ids instead * Use AfterFunc instead implementing our own * Fix misc lint errors * fixup! Fix misc lint errors * Revert "Increase GossipSubMaxIHaveMessages to 1000" This reverts commit 6fabcdd068a5f5238c5280a3460af9c3998418ec. * Increase GossipSubMaxIDontWantMessages to 1000 * fixup! Handle IDONTWANT control messages * Skip TestGossipsubConnTagMessageDeliveries * Skip FuzzAppendOrMergeRPC * Revert "Skip FuzzAppendOrMergeRPC" This reverts commit f141e13234de0960d139339acb636a1afea9e219. * fixup! Send IDONWANT only for large messages * fixup! fixup! Keep the hashes of IDONTWANT message ids instead * fixup! Implement UrgentPush in the smart rpcQueue * fixup! Use AfterFunc instead implementing our own --------- Co-authored-by: Steven Allen <steven@stebalien.com>
1443 lines
39 KiB
Go
1443 lines
39 KiB
Go
package pubsub
|
|
|
|
import (
|
|
"context"
|
|
"encoding/binary"
|
|
"errors"
|
|
"fmt"
|
|
"math/rand"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
pb "github.com/libp2p/go-libp2p-pubsub/pb"
|
|
"github.com/libp2p/go-libp2p-pubsub/timecache"
|
|
|
|
"github.com/libp2p/go-libp2p/core/crypto"
|
|
"github.com/libp2p/go-libp2p/core/discovery"
|
|
"github.com/libp2p/go-libp2p/core/host"
|
|
"github.com/libp2p/go-libp2p/core/network"
|
|
"github.com/libp2p/go-libp2p/core/peer"
|
|
"github.com/libp2p/go-libp2p/core/protocol"
|
|
|
|
logging "github.com/ipfs/go-log/v2"
|
|
)
|
|
|
|
// DefaultMaximumMessageSize is 1mb.
|
|
const DefaultMaxMessageSize = 1 << 20
|
|
|
|
var (
|
|
// TimeCacheDuration specifies how long a message ID will be remembered as seen.
|
|
// Use WithSeenMessagesTTL to configure this per pubsub instance, instead of overriding the global default.
|
|
TimeCacheDuration = 120 * time.Second
|
|
|
|
// TimeCacheStrategy specifies which type of lookup/cleanup strategy is used by the seen messages cache.
|
|
// Use WithSeenMessagesStrategy to configure this per pubsub instance, instead of overriding the global default.
|
|
TimeCacheStrategy = timecache.Strategy_FirstSeen
|
|
|
|
// ErrSubscriptionCancelled may be returned when a subscription Next() is called after the
|
|
// subscription has been cancelled.
|
|
ErrSubscriptionCancelled = errors.New("subscription cancelled")
|
|
)
|
|
|
|
var log = logging.Logger("pubsub")
|
|
|
|
type ProtocolMatchFn = func(protocol.ID) func(protocol.ID) bool
|
|
|
|
// PubSub is the implementation of the pubsub system.
|
|
type PubSub struct {
|
|
// atomic counter for seqnos
|
|
// NOTE: Must be declared at the top of the struct as we perform atomic
|
|
// operations on this field.
|
|
//
|
|
// See: https://golang.org/pkg/sync/atomic/#pkg-note-BUG
|
|
counter uint64
|
|
|
|
host host.Host
|
|
|
|
rt PubSubRouter
|
|
|
|
val *validation
|
|
|
|
disc *discover
|
|
|
|
tracer *pubsubTracer
|
|
|
|
peerFilter PeerFilter
|
|
|
|
// maxMessageSize is the maximum message size; it applies globally to all
|
|
// topics.
|
|
maxMessageSize int
|
|
|
|
// size of the outbound message channel that we maintain for each peer
|
|
peerOutboundQueueSize int
|
|
|
|
// incoming messages from other peers
|
|
incoming chan *RPC
|
|
|
|
// addSub is a control channel for us to add and remove subscriptions
|
|
addSub chan *addSubReq
|
|
|
|
// addRelay is a control channel for us to add and remove relays
|
|
addRelay chan *addRelayReq
|
|
|
|
// rmRelay is a relay cancellation channel
|
|
rmRelay chan string
|
|
|
|
// get list of topics we are subscribed to
|
|
getTopics chan *topicReq
|
|
|
|
// get chan of peers we are connected to
|
|
getPeers chan *listPeerReq
|
|
|
|
// send subscription here to cancel it
|
|
cancelCh chan *Subscription
|
|
|
|
// addSub is a channel for us to add a topic
|
|
addTopic chan *addTopicReq
|
|
|
|
// removeTopic is a topic cancellation channel
|
|
rmTopic chan *rmTopicReq
|
|
|
|
// a notification channel for new peer connections accumulated
|
|
newPeers chan struct{}
|
|
newPeersPrioLk sync.RWMutex
|
|
newPeersMx sync.Mutex
|
|
newPeersPend map[peer.ID]struct{}
|
|
|
|
// a notification channel for new outoging peer streams
|
|
newPeerStream chan network.Stream
|
|
|
|
// a notification channel for errors opening new peer streams
|
|
newPeerError chan peer.ID
|
|
|
|
// a notification channel for when our peers die
|
|
peerDead chan struct{}
|
|
peerDeadPrioLk sync.RWMutex
|
|
peerDeadMx sync.Mutex
|
|
peerDeadPend map[peer.ID]struct{}
|
|
// backoff for retrying new connections to dead peers
|
|
deadPeerBackoff *backoff
|
|
|
|
// The set of topics we are subscribed to
|
|
mySubs map[string]map[*Subscription]struct{}
|
|
|
|
// The set of topics we are relaying for
|
|
myRelays map[string]int
|
|
|
|
// The set of topics we are interested in
|
|
myTopics map[string]*Topic
|
|
|
|
// topics tracks which topics each of our peers are subscribed to
|
|
topics map[string]map[peer.ID]struct{}
|
|
|
|
// sendMsg handles messages that have been validated
|
|
sendMsg chan *Message
|
|
|
|
// addVal handles validator registration requests
|
|
addVal chan *addValReq
|
|
|
|
// rmVal handles validator unregistration requests
|
|
rmVal chan *rmValReq
|
|
|
|
// eval thunk in event loop
|
|
eval chan func()
|
|
|
|
// peer blacklist
|
|
blacklist Blacklist
|
|
blacklistPeer chan peer.ID
|
|
|
|
peers map[peer.ID]*rpcQueue
|
|
|
|
inboundStreamsMx sync.Mutex
|
|
inboundStreams map[peer.ID]network.Stream
|
|
|
|
seenMessages timecache.TimeCache
|
|
seenMsgTTL time.Duration
|
|
seenMsgStrategy timecache.Strategy
|
|
|
|
// generator used to compute the ID for a message
|
|
idGen *msgIDGenerator
|
|
|
|
// key for signing messages; nil when signing is disabled
|
|
signKey crypto.PrivKey
|
|
// source ID for signed messages; corresponds to signKey, empty when signing is disabled.
|
|
// If empty, the author and seq-nr are completely omitted from the messages.
|
|
signID peer.ID
|
|
// strict mode rejects all unsigned messages prior to validation
|
|
signPolicy MessageSignaturePolicy
|
|
|
|
// filter for tracking subscriptions in topics of interest; if nil, then we track all subscriptions
|
|
subFilter SubscriptionFilter
|
|
|
|
// protoMatchFunc is a matching function for protocol selection.
|
|
protoMatchFunc ProtocolMatchFn
|
|
|
|
ctx context.Context
|
|
|
|
// appSpecificRpcInspector is an auxiliary that may be set by the application to inspect incoming RPCs prior to
|
|
// processing them. The inspector is invoked on an accepted RPC right prior to handling it.
|
|
// The return value of the inspector function is an error indicating whether the RPC should be processed or not.
|
|
// If the error is nil, the RPC is processed as usual. If the error is non-nil, the RPC is dropped.
|
|
appSpecificRpcInspector func(peer.ID, *RPC) error
|
|
}
|
|
|
|
// PubSubRouter is the message router component of PubSub.
|
|
type PubSubRouter interface {
|
|
// Protocols returns the list of protocols supported by the router.
|
|
Protocols() []protocol.ID
|
|
// Attach is invoked by the PubSub constructor to attach the router to a
|
|
// freshly initialized PubSub instance.
|
|
Attach(*PubSub)
|
|
// AddPeer notifies the router that a new peer has been connected.
|
|
AddPeer(peer.ID, protocol.ID)
|
|
// RemovePeer notifies the router that a peer has been disconnected.
|
|
RemovePeer(peer.ID)
|
|
// EnoughPeers returns whether the router needs more peers before it's ready to publish new records.
|
|
// Suggested (if greater than 0) is a suggested number of peers that the router should need.
|
|
EnoughPeers(topic string, suggested int) bool
|
|
// AcceptFrom is invoked on any RPC envelope before pushing it to the validation pipeline
|
|
// or processing control information.
|
|
// Allows routers with internal scoring to vet peers before committing any processing resources
|
|
// to the message and implement an effective graylist and react to validation queue overload.
|
|
AcceptFrom(peer.ID) AcceptStatus
|
|
// PreValidation is invoked on messages in the RPC envelope right before pushing it to
|
|
// the validation pipeline
|
|
PreValidation([]*Message)
|
|
// HandleRPC is invoked to process control messages in the RPC envelope.
|
|
// It is invoked after subscriptions and payload messages have been processed.
|
|
HandleRPC(*RPC)
|
|
// Publish is invoked to forward a new message that has been validated.
|
|
Publish(*Message)
|
|
// Join notifies the router that we want to receive and forward messages in a topic.
|
|
// It is invoked after the subscription announcement.
|
|
Join(topic string)
|
|
// Leave notifies the router that we are no longer interested in a topic.
|
|
// It is invoked after the unsubscription announcement.
|
|
Leave(topic string)
|
|
}
|
|
|
|
type AcceptStatus int
|
|
|
|
const (
|
|
// AcceptNone signals to drop the incoming RPC
|
|
AcceptNone AcceptStatus = iota
|
|
// AcceptControl signals to accept the incoming RPC only for control message processing by
|
|
// the router. Included payload messages will _not_ be pushed to the validation queue.
|
|
AcceptControl
|
|
// AcceptAll signals to accept the incoming RPC for full processing
|
|
AcceptAll
|
|
)
|
|
|
|
type Message struct {
|
|
*pb.Message
|
|
ID string
|
|
ReceivedFrom peer.ID
|
|
ValidatorData interface{}
|
|
Local bool
|
|
}
|
|
|
|
func (m *Message) GetFrom() peer.ID {
|
|
return peer.ID(m.Message.GetFrom())
|
|
}
|
|
|
|
type RPC struct {
|
|
pb.RPC
|
|
|
|
// unexported on purpose, not sending this over the wire
|
|
from peer.ID
|
|
}
|
|
|
|
type Option func(*PubSub) error
|
|
|
|
// NewPubSub returns a new PubSub management object.
|
|
func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option) (*PubSub, error) {
|
|
ps := &PubSub{
|
|
host: h,
|
|
ctx: ctx,
|
|
rt: rt,
|
|
val: newValidation(),
|
|
peerFilter: DefaultPeerFilter,
|
|
disc: &discover{},
|
|
maxMessageSize: DefaultMaxMessageSize,
|
|
peerOutboundQueueSize: 32,
|
|
signID: h.ID(),
|
|
signKey: nil,
|
|
signPolicy: StrictSign,
|
|
incoming: make(chan *RPC, 32),
|
|
newPeers: make(chan struct{}, 1),
|
|
newPeersPend: make(map[peer.ID]struct{}),
|
|
newPeerStream: make(chan network.Stream),
|
|
newPeerError: make(chan peer.ID),
|
|
peerDead: make(chan struct{}, 1),
|
|
peerDeadPend: make(map[peer.ID]struct{}),
|
|
deadPeerBackoff: newBackoff(ctx, 1000, BackoffCleanupInterval, MaxBackoffAttempts),
|
|
cancelCh: make(chan *Subscription),
|
|
getPeers: make(chan *listPeerReq),
|
|
addSub: make(chan *addSubReq),
|
|
addRelay: make(chan *addRelayReq),
|
|
rmRelay: make(chan string),
|
|
addTopic: make(chan *addTopicReq),
|
|
rmTopic: make(chan *rmTopicReq),
|
|
getTopics: make(chan *topicReq),
|
|
sendMsg: make(chan *Message, 32),
|
|
addVal: make(chan *addValReq),
|
|
rmVal: make(chan *rmValReq),
|
|
eval: make(chan func()),
|
|
myTopics: make(map[string]*Topic),
|
|
mySubs: make(map[string]map[*Subscription]struct{}),
|
|
myRelays: make(map[string]int),
|
|
topics: make(map[string]map[peer.ID]struct{}),
|
|
peers: make(map[peer.ID]*rpcQueue),
|
|
inboundStreams: make(map[peer.ID]network.Stream),
|
|
blacklist: NewMapBlacklist(),
|
|
blacklistPeer: make(chan peer.ID),
|
|
seenMsgTTL: TimeCacheDuration,
|
|
seenMsgStrategy: TimeCacheStrategy,
|
|
idGen: newMsgIdGenerator(),
|
|
counter: uint64(time.Now().UnixNano()),
|
|
}
|
|
|
|
for _, opt := range opts {
|
|
err := opt(ps)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
if ps.signPolicy.mustSign() {
|
|
if ps.signID == "" {
|
|
return nil, fmt.Errorf("strict signature usage enabled but message author was disabled")
|
|
}
|
|
ps.signKey = ps.host.Peerstore().PrivKey(ps.signID)
|
|
if ps.signKey == nil {
|
|
return nil, fmt.Errorf("can't sign for peer %s: no private key", ps.signID)
|
|
}
|
|
}
|
|
|
|
ps.seenMessages = timecache.NewTimeCacheWithStrategy(ps.seenMsgStrategy, ps.seenMsgTTL)
|
|
|
|
if err := ps.disc.Start(ps); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
rt.Attach(ps)
|
|
|
|
for _, id := range rt.Protocols() {
|
|
if ps.protoMatchFunc != nil {
|
|
h.SetStreamHandlerMatch(id, ps.protoMatchFunc(id), ps.handleNewStream)
|
|
} else {
|
|
h.SetStreamHandler(id, ps.handleNewStream)
|
|
}
|
|
}
|
|
go ps.watchForNewPeers(ctx)
|
|
|
|
ps.val.Start(ps)
|
|
|
|
go ps.processLoop(ctx)
|
|
|
|
return ps, nil
|
|
}
|
|
|
|
// MsgIdFunction returns a unique ID for the passed Message, and PubSub can be customized to use any
|
|
// implementation of this function by configuring it with the Option from WithMessageIdFn.
|
|
type MsgIdFunction func(pmsg *pb.Message) string
|
|
|
|
// WithMessageIdFn is an option to customize the way a message ID is computed for a pubsub message.
|
|
// The default ID function is DefaultMsgIdFn (concatenate source and seq nr.),
|
|
// but it can be customized to e.g. the hash of the message.
|
|
func WithMessageIdFn(fn MsgIdFunction) Option {
|
|
return func(p *PubSub) error {
|
|
p.idGen.Default = fn
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// PeerFilter is used to filter pubsub peers. It should return true for peers that are accepted for
|
|
// a given topic. PubSub can be customized to use any implementation of this function by configuring
|
|
// it with the Option from WithPeerFilter.
|
|
type PeerFilter func(pid peer.ID, topic string) bool
|
|
|
|
// WithPeerFilter is an option to set a filter for pubsub peers.
|
|
// The default peer filter is DefaultPeerFilter (which always returns true), but it can be customized
|
|
// to any custom implementation.
|
|
func WithPeerFilter(filter PeerFilter) Option {
|
|
return func(p *PubSub) error {
|
|
p.peerFilter = filter
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// WithPeerOutboundQueueSize is an option to set the buffer size for outbound messages to a peer
|
|
// We start dropping messages to a peer if the outbound queue if full
|
|
func WithPeerOutboundQueueSize(size int) Option {
|
|
return func(p *PubSub) error {
|
|
if size <= 0 {
|
|
return errors.New("outbound queue size must always be positive")
|
|
}
|
|
p.peerOutboundQueueSize = size
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// WithMessageSignaturePolicy sets the mode of operation for producing and verifying message signatures.
|
|
func WithMessageSignaturePolicy(policy MessageSignaturePolicy) Option {
|
|
return func(p *PubSub) error {
|
|
p.signPolicy = policy
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// WithMessageSigning enables or disables message signing (enabled by default).
|
|
// Deprecated: signature verification without message signing,
|
|
// or message signing without verification, are not recommended.
|
|
func WithMessageSigning(enabled bool) Option {
|
|
return func(p *PubSub) error {
|
|
if enabled {
|
|
p.signPolicy |= msgSigning
|
|
} else {
|
|
p.signPolicy &^= msgSigning
|
|
}
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// WithMessageAuthor sets the author for outbound messages to the given peer ID
|
|
// (defaults to the host's ID). If message signing is enabled, the private key
|
|
// must be available in the host's peerstore.
|
|
func WithMessageAuthor(author peer.ID) Option {
|
|
return func(p *PubSub) error {
|
|
author := author
|
|
if author == "" {
|
|
author = p.host.ID()
|
|
}
|
|
p.signID = author
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// WithNoAuthor omits the author and seq-number data of messages, and disables the use of signatures.
|
|
// Not recommended to use with the default message ID function, see WithMessageIdFn.
|
|
func WithNoAuthor() Option {
|
|
return func(p *PubSub) error {
|
|
p.signID = ""
|
|
p.signPolicy &^= msgSigning
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// WithStrictSignatureVerification is an option to enable or disable strict message signing.
|
|
// When enabled (which is the default), unsigned messages will be discarded.
|
|
// Deprecated: signature verification without message signing,
|
|
// or message signing without verification, are not recommended.
|
|
func WithStrictSignatureVerification(required bool) Option {
|
|
return func(p *PubSub) error {
|
|
if required {
|
|
p.signPolicy |= msgVerification
|
|
} else {
|
|
p.signPolicy &^= msgVerification
|
|
}
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// WithBlacklist provides an implementation of the blacklist; the default is a
|
|
// MapBlacklist
|
|
func WithBlacklist(b Blacklist) Option {
|
|
return func(p *PubSub) error {
|
|
p.blacklist = b
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// WithDiscovery provides a discovery mechanism used to bootstrap and provide peers into PubSub
|
|
func WithDiscovery(d discovery.Discovery, opts ...DiscoverOpt) Option {
|
|
return func(p *PubSub) error {
|
|
discoverOpts := defaultDiscoverOptions()
|
|
for _, opt := range opts {
|
|
err := opt(discoverOpts)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
p.disc.discovery = &pubSubDiscovery{Discovery: d, opts: discoverOpts.opts}
|
|
p.disc.options = discoverOpts
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// WithEventTracer provides a tracer for the pubsub system
|
|
func WithEventTracer(tracer EventTracer) Option {
|
|
return func(p *PubSub) error {
|
|
if p.tracer != nil {
|
|
p.tracer.tracer = tracer
|
|
} else {
|
|
p.tracer = &pubsubTracer{tracer: tracer, pid: p.host.ID(), idGen: p.idGen}
|
|
}
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// WithRawTracer adds a raw tracer to the pubsub system.
|
|
// Multiple tracers can be added using multiple invocations of the option.
|
|
func WithRawTracer(tracer RawTracer) Option {
|
|
return func(p *PubSub) error {
|
|
if p.tracer != nil {
|
|
p.tracer.raw = append(p.tracer.raw, tracer)
|
|
} else {
|
|
p.tracer = &pubsubTracer{raw: []RawTracer{tracer}, pid: p.host.ID(), idGen: p.idGen}
|
|
}
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// WithMaxMessageSize sets the global maximum message size for pubsub wire
|
|
// messages. The default value is 1MiB (DefaultMaxMessageSize).
|
|
//
|
|
// Observe the following warnings when setting this option.
|
|
//
|
|
// WARNING #1: Make sure to change the default protocol prefixes for floodsub
|
|
// (FloodSubID) and gossipsub (GossipSubID). This avoids accidentally joining
|
|
// the public default network, which uses the default max message size, and
|
|
// therefore will cause messages to be dropped.
|
|
//
|
|
// WARNING #2: Reducing the default max message limit is fine, if you are
|
|
// certain that your application messages will not exceed the new limit.
|
|
// However, be wary of increasing the limit, as pubsub networks are naturally
|
|
// write-amplifying, i.e. for every message we receive, we send D copies of the
|
|
// message to our peers. If those messages are large, the bandwidth requirements
|
|
// will grow linearly. Note that propagation is sent on the uplink, which
|
|
// traditionally is more constrained than the downlink. Instead, consider
|
|
// out-of-band retrieval for large messages, by sending a CID (Content-ID) or
|
|
// another type of locator, such that messages can be fetched on-demand, rather
|
|
// than being pushed proactively. Under this design, you'd use the pubsub layer
|
|
// as a signalling system, rather than a data delivery system.
|
|
func WithMaxMessageSize(maxMessageSize int) Option {
|
|
return func(ps *PubSub) error {
|
|
ps.maxMessageSize = maxMessageSize
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// WithProtocolMatchFn sets a custom matching function for protocol selection to
|
|
// be used by the protocol handler on the Host's Mux. Should be combined with
|
|
// WithGossipSubProtocols feature function for checking if certain protocol features
|
|
// are supported
|
|
func WithProtocolMatchFn(m ProtocolMatchFn) Option {
|
|
return func(ps *PubSub) error {
|
|
ps.protoMatchFunc = m
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// WithSeenMessagesTTL configures when a previously seen message ID can be forgotten about
|
|
func WithSeenMessagesTTL(ttl time.Duration) Option {
|
|
return func(ps *PubSub) error {
|
|
ps.seenMsgTTL = ttl
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// WithSeenMessagesStrategy configures which type of lookup/cleanup strategy is used by the seen messages cache
|
|
func WithSeenMessagesStrategy(strategy timecache.Strategy) Option {
|
|
return func(ps *PubSub) error {
|
|
ps.seenMsgStrategy = strategy
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// WithAppSpecificRpcInspector sets a hook that inspect incomings RPCs prior to
|
|
// processing them. The inspector is invoked on an accepted RPC just before it
|
|
// is handled. If inspector's error is nil, the RPC is handled. Otherwise, it
|
|
// is dropped.
|
|
func WithAppSpecificRpcInspector(inspector func(peer.ID, *RPC) error) Option {
|
|
return func(ps *PubSub) error {
|
|
ps.appSpecificRpcInspector = inspector
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// processLoop handles all inputs arriving on the channels
|
|
func (p *PubSub) processLoop(ctx context.Context) {
|
|
defer func() {
|
|
// Clean up go routines.
|
|
for _, queue := range p.peers {
|
|
queue.Close()
|
|
}
|
|
p.peers = nil
|
|
p.topics = nil
|
|
p.seenMessages.Done()
|
|
}()
|
|
|
|
for {
|
|
select {
|
|
case <-p.newPeers:
|
|
p.handlePendingPeers()
|
|
|
|
case s := <-p.newPeerStream:
|
|
pid := s.Conn().RemotePeer()
|
|
|
|
q, ok := p.peers[pid]
|
|
if !ok {
|
|
log.Warn("new stream for unknown peer: ", pid)
|
|
s.Reset()
|
|
continue
|
|
}
|
|
|
|
if p.blacklist.Contains(pid) {
|
|
log.Warn("closing stream for blacklisted peer: ", pid)
|
|
q.Close()
|
|
delete(p.peers, pid)
|
|
s.Reset()
|
|
continue
|
|
}
|
|
|
|
p.rt.AddPeer(pid, s.Protocol())
|
|
|
|
case pid := <-p.newPeerError:
|
|
delete(p.peers, pid)
|
|
|
|
case <-p.peerDead:
|
|
p.handleDeadPeers()
|
|
|
|
case treq := <-p.getTopics:
|
|
var out []string
|
|
for t := range p.mySubs {
|
|
out = append(out, t)
|
|
}
|
|
treq.resp <- out
|
|
case topic := <-p.addTopic:
|
|
p.handleAddTopic(topic)
|
|
case topic := <-p.rmTopic:
|
|
p.handleRemoveTopic(topic)
|
|
case sub := <-p.cancelCh:
|
|
p.handleRemoveSubscription(sub)
|
|
case sub := <-p.addSub:
|
|
p.handleAddSubscription(sub)
|
|
case relay := <-p.addRelay:
|
|
p.handleAddRelay(relay)
|
|
case topic := <-p.rmRelay:
|
|
p.handleRemoveRelay(topic)
|
|
case preq := <-p.getPeers:
|
|
tmap, ok := p.topics[preq.topic]
|
|
if preq.topic != "" && !ok {
|
|
preq.resp <- nil
|
|
continue
|
|
}
|
|
var peers []peer.ID
|
|
for p := range p.peers {
|
|
if preq.topic != "" {
|
|
_, ok := tmap[p]
|
|
if !ok {
|
|
continue
|
|
}
|
|
}
|
|
peers = append(peers, p)
|
|
}
|
|
preq.resp <- peers
|
|
case rpc := <-p.incoming:
|
|
p.handleIncomingRPC(rpc)
|
|
|
|
case msg := <-p.sendMsg:
|
|
p.publishMessage(msg)
|
|
|
|
case req := <-p.addVal:
|
|
p.val.AddValidator(req)
|
|
|
|
case req := <-p.rmVal:
|
|
p.val.RemoveValidator(req)
|
|
|
|
case thunk := <-p.eval:
|
|
thunk()
|
|
|
|
case pid := <-p.blacklistPeer:
|
|
log.Infof("Blacklisting peer %s", pid)
|
|
p.blacklist.Add(pid)
|
|
|
|
q, ok := p.peers[pid]
|
|
if ok {
|
|
q.Close()
|
|
delete(p.peers, pid)
|
|
for t, tmap := range p.topics {
|
|
if _, ok := tmap[pid]; ok {
|
|
delete(tmap, pid)
|
|
p.notifyLeave(t, pid)
|
|
}
|
|
}
|
|
p.rt.RemovePeer(pid)
|
|
}
|
|
|
|
case <-ctx.Done():
|
|
log.Info("pubsub processloop shutting down")
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (p *PubSub) handlePendingPeers() {
|
|
p.newPeersPrioLk.Lock()
|
|
|
|
if len(p.newPeersPend) == 0 {
|
|
p.newPeersPrioLk.Unlock()
|
|
return
|
|
}
|
|
|
|
newPeers := p.newPeersPend
|
|
p.newPeersPend = make(map[peer.ID]struct{})
|
|
p.newPeersPrioLk.Unlock()
|
|
|
|
for pid := range newPeers {
|
|
// Make sure we have a non-limited connection. We do this late because we may have
|
|
// disconnected in the meantime.
|
|
if p.host.Network().Connectedness(pid) != network.Connected {
|
|
continue
|
|
}
|
|
|
|
if _, ok := p.peers[pid]; ok {
|
|
log.Debug("already have connection to peer: ", pid)
|
|
continue
|
|
}
|
|
|
|
if p.blacklist.Contains(pid) {
|
|
log.Warn("ignoring connection from blacklisted peer: ", pid)
|
|
continue
|
|
}
|
|
|
|
rpcQueue := newRpcQueue(p.peerOutboundQueueSize)
|
|
rpcQueue.Push(p.getHelloPacket(), true)
|
|
go p.handleNewPeer(p.ctx, pid, rpcQueue)
|
|
p.peers[pid] = rpcQueue
|
|
}
|
|
}
|
|
|
|
func (p *PubSub) handleDeadPeers() {
|
|
p.peerDeadPrioLk.Lock()
|
|
|
|
if len(p.peerDeadPend) == 0 {
|
|
p.peerDeadPrioLk.Unlock()
|
|
return
|
|
}
|
|
|
|
deadPeers := p.peerDeadPend
|
|
p.peerDeadPend = make(map[peer.ID]struct{})
|
|
p.peerDeadPrioLk.Unlock()
|
|
|
|
for pid := range deadPeers {
|
|
q, ok := p.peers[pid]
|
|
if !ok {
|
|
continue
|
|
}
|
|
|
|
q.Close()
|
|
delete(p.peers, pid)
|
|
|
|
for t, tmap := range p.topics {
|
|
if _, ok := tmap[pid]; ok {
|
|
delete(tmap, pid)
|
|
p.notifyLeave(t, pid)
|
|
}
|
|
}
|
|
|
|
p.rt.RemovePeer(pid)
|
|
|
|
if p.host.Network().Connectedness(pid) == network.Connected {
|
|
backoffDelay, err := p.deadPeerBackoff.updateAndGet(pid)
|
|
if err != nil {
|
|
log.Debug(err)
|
|
continue
|
|
}
|
|
|
|
// still connected, must be a duplicate connection being closed.
|
|
// we respawn the writer as we need to ensure there is a stream active
|
|
log.Debugf("peer declared dead but still connected; respawning writer: %s", pid)
|
|
rpcQueue := newRpcQueue(p.peerOutboundQueueSize)
|
|
rpcQueue.Push(p.getHelloPacket(), true)
|
|
p.peers[pid] = rpcQueue
|
|
go p.handleNewPeerWithBackoff(p.ctx, pid, backoffDelay, rpcQueue)
|
|
}
|
|
}
|
|
}
|
|
|
|
// handleAddTopic adds a tracker for a particular topic.
|
|
// Only called from processLoop.
|
|
func (p *PubSub) handleAddTopic(req *addTopicReq) {
|
|
topic := req.topic
|
|
topicID := topic.topic
|
|
|
|
t, ok := p.myTopics[topicID]
|
|
if ok {
|
|
req.resp <- t
|
|
return
|
|
}
|
|
|
|
p.myTopics[topicID] = topic
|
|
req.resp <- topic
|
|
}
|
|
|
|
// handleRemoveTopic removes Topic tracker from bookkeeping.
|
|
// Only called from processLoop.
|
|
func (p *PubSub) handleRemoveTopic(req *rmTopicReq) {
|
|
topic := p.myTopics[req.topic.topic]
|
|
|
|
if topic == nil {
|
|
req.resp <- nil
|
|
return
|
|
}
|
|
|
|
if len(topic.evtHandlers) == 0 &&
|
|
len(p.mySubs[req.topic.topic]) == 0 &&
|
|
p.myRelays[req.topic.topic] == 0 {
|
|
delete(p.myTopics, topic.topic)
|
|
req.resp <- nil
|
|
return
|
|
}
|
|
|
|
req.resp <- fmt.Errorf("cannot close topic: outstanding event handlers or subscriptions")
|
|
}
|
|
|
|
// handleRemoveSubscription removes Subscription sub from bookeeping.
|
|
// If this was the last subscription and no more relays exist for a given topic,
|
|
// it will also announce that this node is not subscribing to this topic anymore.
|
|
// Only called from processLoop.
|
|
func (p *PubSub) handleRemoveSubscription(sub *Subscription) {
|
|
subs := p.mySubs[sub.topic]
|
|
|
|
if subs == nil {
|
|
return
|
|
}
|
|
|
|
sub.err = ErrSubscriptionCancelled
|
|
sub.close()
|
|
delete(subs, sub)
|
|
|
|
if len(subs) == 0 {
|
|
delete(p.mySubs, sub.topic)
|
|
|
|
// stop announcing only if there are no more subs and relays
|
|
if p.myRelays[sub.topic] == 0 {
|
|
p.disc.StopAdvertise(sub.topic)
|
|
p.announce(sub.topic, false)
|
|
p.rt.Leave(sub.topic)
|
|
}
|
|
}
|
|
}
|
|
|
|
// handleAddSubscription adds a Subscription for a particular topic. If it is
|
|
// the first subscription and no relays exist so far for the topic, it will
|
|
// announce that this node subscribes to the topic.
|
|
// Only called from processLoop.
|
|
func (p *PubSub) handleAddSubscription(req *addSubReq) {
|
|
sub := req.sub
|
|
subs := p.mySubs[sub.topic]
|
|
|
|
// announce we want this topic if neither subs nor relays exist so far
|
|
if len(subs) == 0 && p.myRelays[sub.topic] == 0 {
|
|
p.disc.Advertise(sub.topic)
|
|
p.announce(sub.topic, true)
|
|
p.rt.Join(sub.topic)
|
|
}
|
|
|
|
// make new if not there
|
|
if subs == nil {
|
|
p.mySubs[sub.topic] = make(map[*Subscription]struct{})
|
|
}
|
|
|
|
sub.cancelCh = p.cancelCh
|
|
|
|
p.mySubs[sub.topic][sub] = struct{}{}
|
|
|
|
req.resp <- sub
|
|
}
|
|
|
|
// handleAddRelay adds a relay for a particular topic. If it is
|
|
// the first relay and no subscriptions exist so far for the topic , it will
|
|
// announce that this node relays for the topic.
|
|
// Only called from processLoop.
|
|
func (p *PubSub) handleAddRelay(req *addRelayReq) {
|
|
topic := req.topic
|
|
|
|
p.myRelays[topic]++
|
|
|
|
// announce we want this topic if neither relays nor subs exist so far
|
|
if p.myRelays[topic] == 1 && len(p.mySubs[topic]) == 0 {
|
|
p.disc.Advertise(topic)
|
|
p.announce(topic, true)
|
|
p.rt.Join(topic)
|
|
}
|
|
|
|
// flag used to prevent calling cancel function multiple times
|
|
isCancelled := false
|
|
|
|
relayCancelFunc := func() {
|
|
if isCancelled {
|
|
return
|
|
}
|
|
|
|
select {
|
|
case p.rmRelay <- topic:
|
|
isCancelled = true
|
|
case <-p.ctx.Done():
|
|
}
|
|
}
|
|
|
|
req.resp <- relayCancelFunc
|
|
}
|
|
|
|
// handleRemoveRelay removes one relay reference from bookkeeping.
|
|
// If this was the last relay reference and no more subscriptions exist
|
|
// for a given topic, it will also announce that this node is not relaying
|
|
// for this topic anymore.
|
|
// Only called from processLoop.
|
|
func (p *PubSub) handleRemoveRelay(topic string) {
|
|
if p.myRelays[topic] == 0 {
|
|
return
|
|
}
|
|
|
|
p.myRelays[topic]--
|
|
|
|
if p.myRelays[topic] == 0 {
|
|
delete(p.myRelays, topic)
|
|
|
|
// stop announcing only if there are no more relays and subs
|
|
if len(p.mySubs[topic]) == 0 {
|
|
p.disc.StopAdvertise(topic)
|
|
p.announce(topic, false)
|
|
p.rt.Leave(topic)
|
|
}
|
|
}
|
|
}
|
|
|
|
// announce announces whether or not this node is interested in a given topic
|
|
// Only called from processLoop.
|
|
func (p *PubSub) announce(topic string, sub bool) {
|
|
subopt := &pb.RPC_SubOpts{
|
|
Topicid: &topic,
|
|
Subscribe: &sub,
|
|
}
|
|
|
|
out := rpcWithSubs(subopt)
|
|
for pid, peer := range p.peers {
|
|
err := peer.Push(out, false)
|
|
if err != nil {
|
|
log.Infof("Can't send announce message to peer %s: queue full; scheduling retry", pid)
|
|
p.tracer.DropRPC(out, pid)
|
|
go p.announceRetry(pid, topic, sub)
|
|
continue
|
|
}
|
|
p.tracer.SendRPC(out, pid)
|
|
}
|
|
}
|
|
|
|
func (p *PubSub) announceRetry(pid peer.ID, topic string, sub bool) {
|
|
time.Sleep(time.Duration(1+rand.Intn(1000)) * time.Millisecond)
|
|
|
|
retry := func() {
|
|
_, okSubs := p.mySubs[topic]
|
|
_, okRelays := p.myRelays[topic]
|
|
|
|
ok := okSubs || okRelays
|
|
|
|
if (ok && sub) || (!ok && !sub) {
|
|
p.doAnnounceRetry(pid, topic, sub)
|
|
}
|
|
}
|
|
|
|
select {
|
|
case p.eval <- retry:
|
|
case <-p.ctx.Done():
|
|
}
|
|
}
|
|
|
|
func (p *PubSub) doAnnounceRetry(pid peer.ID, topic string, sub bool) {
|
|
peer, ok := p.peers[pid]
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
subopt := &pb.RPC_SubOpts{
|
|
Topicid: &topic,
|
|
Subscribe: &sub,
|
|
}
|
|
|
|
out := rpcWithSubs(subopt)
|
|
err := peer.Push(out, false)
|
|
if err != nil {
|
|
log.Infof("Can't send announce message to peer %s: queue full; scheduling retry", pid)
|
|
p.tracer.DropRPC(out, pid)
|
|
go p.announceRetry(pid, topic, sub)
|
|
return
|
|
}
|
|
p.tracer.SendRPC(out, pid)
|
|
}
|
|
|
|
// notifySubs sends a given message to all corresponding subscribers.
|
|
// Only called from processLoop.
|
|
func (p *PubSub) notifySubs(msg *Message) {
|
|
topic := msg.GetTopic()
|
|
subs := p.mySubs[topic]
|
|
for f := range subs {
|
|
select {
|
|
case f.ch <- msg:
|
|
default:
|
|
p.tracer.UndeliverableMessage(msg)
|
|
log.Infof("Can't deliver message to subscription for topic %s; subscriber too slow", topic)
|
|
}
|
|
}
|
|
}
|
|
|
|
// seenMessage returns whether we already saw this message before
|
|
func (p *PubSub) seenMessage(id string) bool {
|
|
return p.seenMessages.Has(id)
|
|
}
|
|
|
|
// markSeen marks a message as seen such that seenMessage returns `true' for the given id
|
|
// returns true if the message was freshly marked
|
|
func (p *PubSub) markSeen(id string) bool {
|
|
return p.seenMessages.Add(id)
|
|
}
|
|
|
|
// subscribedToMessage returns whether we are subscribed to one of the topics
|
|
// of a given message
|
|
func (p *PubSub) subscribedToMsg(msg *pb.Message) bool {
|
|
if len(p.mySubs) == 0 {
|
|
return false
|
|
}
|
|
|
|
topic := msg.GetTopic()
|
|
_, ok := p.mySubs[topic]
|
|
|
|
return ok
|
|
}
|
|
|
|
// canRelayMsg returns whether we are able to relay for one of the topics
|
|
// of a given message
|
|
func (p *PubSub) canRelayMsg(msg *pb.Message) bool {
|
|
if len(p.myRelays) == 0 {
|
|
return false
|
|
}
|
|
|
|
topic := msg.GetTopic()
|
|
relays := p.myRelays[topic]
|
|
|
|
return relays > 0
|
|
}
|
|
|
|
func (p *PubSub) notifyLeave(topic string, pid peer.ID) {
|
|
if t, ok := p.myTopics[topic]; ok {
|
|
t.sendNotification(PeerEvent{PeerLeave, pid})
|
|
}
|
|
}
|
|
|
|
func (p *PubSub) handleIncomingRPC(rpc *RPC) {
|
|
// pass the rpc through app specific validation (if any available).
|
|
if p.appSpecificRpcInspector != nil {
|
|
// check if the RPC is allowed by the external inspector
|
|
if err := p.appSpecificRpcInspector(rpc.from, rpc); err != nil {
|
|
log.Debugf("application-specific inspection failed, rejecting incoming rpc: %s", err)
|
|
return // reject the RPC
|
|
}
|
|
}
|
|
|
|
p.tracer.RecvRPC(rpc)
|
|
|
|
subs := rpc.GetSubscriptions()
|
|
if len(subs) != 0 && p.subFilter != nil {
|
|
var err error
|
|
subs, err = p.subFilter.FilterIncomingSubscriptions(rpc.from, subs)
|
|
if err != nil {
|
|
log.Debugf("subscription filter error: %s; ignoring RPC", err)
|
|
return
|
|
}
|
|
}
|
|
|
|
for _, subopt := range subs {
|
|
t := subopt.GetTopicid()
|
|
|
|
if subopt.GetSubscribe() {
|
|
tmap, ok := p.topics[t]
|
|
if !ok {
|
|
tmap = make(map[peer.ID]struct{})
|
|
p.topics[t] = tmap
|
|
}
|
|
|
|
if _, ok = tmap[rpc.from]; !ok {
|
|
tmap[rpc.from] = struct{}{}
|
|
if topic, ok := p.myTopics[t]; ok {
|
|
peer := rpc.from
|
|
topic.sendNotification(PeerEvent{PeerJoin, peer})
|
|
}
|
|
}
|
|
} else {
|
|
tmap, ok := p.topics[t]
|
|
if !ok {
|
|
continue
|
|
}
|
|
|
|
if _, ok := tmap[rpc.from]; ok {
|
|
delete(tmap, rpc.from)
|
|
p.notifyLeave(t, rpc.from)
|
|
}
|
|
}
|
|
}
|
|
|
|
// ask the router to vet the peer before commiting any processing resources
|
|
switch p.rt.AcceptFrom(rpc.from) {
|
|
case AcceptNone:
|
|
log.Debugf("received RPC from router graylisted peer %s; dropping RPC", rpc.from)
|
|
return
|
|
|
|
case AcceptControl:
|
|
if len(rpc.GetPublish()) > 0 {
|
|
log.Debugf("peer %s was throttled by router; ignoring %d payload messages", rpc.from, len(rpc.GetPublish()))
|
|
}
|
|
p.tracer.ThrottlePeer(rpc.from)
|
|
|
|
case AcceptAll:
|
|
var toPush []*Message
|
|
for _, pmsg := range rpc.GetPublish() {
|
|
if !(p.subscribedToMsg(pmsg) || p.canRelayMsg(pmsg)) {
|
|
log.Debug("received message in topic we didn't subscribe to; ignoring message")
|
|
continue
|
|
}
|
|
|
|
msg := &Message{pmsg, "", rpc.from, nil, false}
|
|
if p.shouldPush(msg) {
|
|
toPush = append(toPush, msg)
|
|
}
|
|
}
|
|
p.rt.PreValidation(toPush)
|
|
for _, msg := range toPush {
|
|
p.pushMsg(msg)
|
|
}
|
|
}
|
|
|
|
p.rt.HandleRPC(rpc)
|
|
}
|
|
|
|
// DefaultMsgIdFn returns a unique ID of the passed Message
|
|
func DefaultMsgIdFn(pmsg *pb.Message) string {
|
|
return string(pmsg.GetFrom()) + string(pmsg.GetSeqno())
|
|
}
|
|
|
|
// DefaultPeerFilter accepts all peers on all topics
|
|
func DefaultPeerFilter(pid peer.ID, topic string) bool {
|
|
return true
|
|
}
|
|
|
|
// shouldPush filters a message before validating and pushing it
|
|
// It returns true if the message can be further validated and pushed
|
|
func (p *PubSub) shouldPush(msg *Message) bool {
|
|
src := msg.ReceivedFrom
|
|
// reject messages from blacklisted peers
|
|
if p.blacklist.Contains(src) {
|
|
log.Debugf("dropping message from blacklisted peer %s", src)
|
|
p.tracer.RejectMessage(msg, RejectBlacklstedPeer)
|
|
return false
|
|
}
|
|
|
|
// even if they are forwarded by good peers
|
|
if p.blacklist.Contains(msg.GetFrom()) {
|
|
log.Debugf("dropping message from blacklisted source %s", src)
|
|
p.tracer.RejectMessage(msg, RejectBlacklistedSource)
|
|
return false
|
|
}
|
|
|
|
err := p.checkSigningPolicy(msg)
|
|
if err != nil {
|
|
log.Debugf("dropping message from %s: %s", src, err)
|
|
return false
|
|
}
|
|
|
|
// reject messages claiming to be from ourselves but not locally published
|
|
self := p.host.ID()
|
|
if peer.ID(msg.GetFrom()) == self && src != self {
|
|
log.Debugf("dropping message claiming to be from self but forwarded from %s", src)
|
|
p.tracer.RejectMessage(msg, RejectSelfOrigin)
|
|
return false
|
|
}
|
|
|
|
// have we already seen and validated this message?
|
|
id := p.idGen.ID(msg)
|
|
if p.seenMessage(id) {
|
|
p.tracer.DuplicateMessage(msg)
|
|
return false
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
// pushMsg pushes a message performing validation as necessary
|
|
func (p *PubSub) pushMsg(msg *Message) {
|
|
src := msg.ReceivedFrom
|
|
id := p.idGen.ID(msg)
|
|
|
|
if !p.val.Push(src, msg) {
|
|
return
|
|
}
|
|
|
|
if p.markSeen(id) {
|
|
p.publishMessage(msg)
|
|
}
|
|
}
|
|
|
|
func (p *PubSub) checkSigningPolicy(msg *Message) error {
|
|
// reject unsigned messages when strict before we even process the id
|
|
if p.signPolicy.mustVerify() {
|
|
if p.signPolicy.mustSign() {
|
|
if msg.Signature == nil {
|
|
p.tracer.RejectMessage(msg, RejectMissingSignature)
|
|
return ValidationError{Reason: RejectMissingSignature}
|
|
}
|
|
// Actual signature verification happens in the validation pipeline,
|
|
// after checking if the message was already seen or not,
|
|
// to avoid unnecessary signature verification processing-cost.
|
|
} else {
|
|
if msg.Signature != nil {
|
|
p.tracer.RejectMessage(msg, RejectUnexpectedSignature)
|
|
return ValidationError{Reason: RejectUnexpectedSignature}
|
|
}
|
|
// If we are expecting signed messages, and not authoring messages,
|
|
// then do no accept seq numbers, from data, or key data.
|
|
// The default msgID function still relies on Seqno and From,
|
|
// but is not used if we are not authoring messages ourselves.
|
|
if p.signID == "" {
|
|
if msg.Seqno != nil || msg.From != nil || msg.Key != nil {
|
|
p.tracer.RejectMessage(msg, RejectUnexpectedAuthInfo)
|
|
return ValidationError{Reason: RejectUnexpectedAuthInfo}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (p *PubSub) publishMessage(msg *Message) {
|
|
p.tracer.DeliverMessage(msg)
|
|
p.notifySubs(msg)
|
|
if !msg.Local {
|
|
p.rt.Publish(msg)
|
|
}
|
|
}
|
|
|
|
type addTopicReq struct {
|
|
topic *Topic
|
|
resp chan *Topic
|
|
}
|
|
|
|
type rmTopicReq struct {
|
|
topic *Topic
|
|
resp chan error
|
|
}
|
|
|
|
type TopicOptions struct{}
|
|
|
|
type TopicOpt func(t *Topic) error
|
|
|
|
// WithTopicMessageIdFn sets custom MsgIdFunction for a Topic, enabling topics to have own msg id generation rules.
|
|
func WithTopicMessageIdFn(msgId MsgIdFunction) TopicOpt {
|
|
return func(t *Topic) error {
|
|
t.p.idGen.Set(t.topic, msgId)
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// Join joins the topic and returns a Topic handle. Only one Topic handle should exist per topic, and Join will error if
|
|
// the Topic handle already exists.
|
|
func (p *PubSub) Join(topic string, opts ...TopicOpt) (*Topic, error) {
|
|
t, ok, err := p.tryJoin(topic, opts...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if !ok {
|
|
return nil, fmt.Errorf("topic already exists")
|
|
}
|
|
|
|
return t, nil
|
|
}
|
|
|
|
// tryJoin is an internal function that tries to join a topic
|
|
// Returns the topic if it can be created or found
|
|
// Returns true if the topic was newly created, false otherwise
|
|
// Can be removed once pubsub.Publish() and pubsub.Subscribe() are removed
|
|
func (p *PubSub) tryJoin(topic string, opts ...TopicOpt) (*Topic, bool, error) {
|
|
if p.subFilter != nil && !p.subFilter.CanSubscribe(topic) {
|
|
return nil, false, fmt.Errorf("topic is not allowed by the subscription filter")
|
|
}
|
|
|
|
t := &Topic{
|
|
p: p,
|
|
topic: topic,
|
|
evtHandlers: make(map[*TopicEventHandler]struct{}),
|
|
}
|
|
|
|
for _, opt := range opts {
|
|
err := opt(t)
|
|
if err != nil {
|
|
return nil, false, err
|
|
}
|
|
}
|
|
|
|
resp := make(chan *Topic, 1)
|
|
select {
|
|
case t.p.addTopic <- &addTopicReq{
|
|
topic: t,
|
|
resp: resp,
|
|
}:
|
|
case <-t.p.ctx.Done():
|
|
return nil, false, t.p.ctx.Err()
|
|
}
|
|
returnedTopic := <-resp
|
|
|
|
if returnedTopic != t {
|
|
return returnedTopic, false, nil
|
|
}
|
|
|
|
return t, true, nil
|
|
}
|
|
|
|
type addSubReq struct {
|
|
sub *Subscription
|
|
resp chan *Subscription
|
|
}
|
|
|
|
type SubOpt func(sub *Subscription) error
|
|
|
|
// Subscribe returns a new Subscription for the given topic.
|
|
// Note that subscription is not an instantaneous operation. It may take some time
|
|
// before the subscription is processed by the pubsub main loop and propagated to our peers.
|
|
//
|
|
// Deprecated: use pubsub.Join() and topic.Subscribe() instead
|
|
func (p *PubSub) Subscribe(topic string, opts ...SubOpt) (*Subscription, error) {
|
|
// ignore whether the topic was newly created or not, since either way we have a valid topic to work with
|
|
topicHandle, _, err := p.tryJoin(topic)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return topicHandle.Subscribe(opts...)
|
|
}
|
|
|
|
// WithBufferSize is a Subscribe option to customize the size of the subscribe output buffer.
|
|
// The default length is 32 but it can be configured to avoid dropping messages if the consumer is not reading fast
|
|
// enough.
|
|
func WithBufferSize(size int) SubOpt {
|
|
return func(sub *Subscription) error {
|
|
sub.ch = make(chan *Message, size)
|
|
return nil
|
|
}
|
|
}
|
|
|
|
type topicReq struct {
|
|
resp chan []string
|
|
}
|
|
|
|
// GetTopics returns the topics this node is subscribed to.
|
|
func (p *PubSub) GetTopics() []string {
|
|
out := make(chan []string, 1)
|
|
select {
|
|
case p.getTopics <- &topicReq{resp: out}:
|
|
case <-p.ctx.Done():
|
|
return nil
|
|
}
|
|
return <-out
|
|
}
|
|
|
|
// Publish publishes data to the given topic.
|
|
//
|
|
// Deprecated: use pubsub.Join() and topic.Publish() instead
|
|
func (p *PubSub) Publish(topic string, data []byte, opts ...PubOpt) error {
|
|
// ignore whether the topic was newly created or not, since either way we have a valid topic to work with
|
|
t, _, err := p.tryJoin(topic)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return t.Publish(context.TODO(), data, opts...)
|
|
}
|
|
|
|
func (p *PubSub) nextSeqno() []byte {
|
|
seqno := make([]byte, 8)
|
|
counter := atomic.AddUint64(&p.counter, 1)
|
|
binary.BigEndian.PutUint64(seqno, counter)
|
|
return seqno
|
|
}
|
|
|
|
type listPeerReq struct {
|
|
resp chan []peer.ID
|
|
topic string
|
|
}
|
|
|
|
// ListPeers returns a list of peers we are connected to in the given topic.
|
|
func (p *PubSub) ListPeers(topic string) []peer.ID {
|
|
out := make(chan []peer.ID)
|
|
select {
|
|
case p.getPeers <- &listPeerReq{
|
|
resp: out,
|
|
topic: topic,
|
|
}:
|
|
case <-p.ctx.Done():
|
|
return nil
|
|
}
|
|
return <-out
|
|
}
|
|
|
|
// BlacklistPeer blacklists a peer; all messages from this peer will be unconditionally dropped.
|
|
func (p *PubSub) BlacklistPeer(pid peer.ID) {
|
|
select {
|
|
case p.blacklistPeer <- pid:
|
|
case <-p.ctx.Done():
|
|
}
|
|
}
|
|
|
|
// RegisterTopicValidator registers a validator for topic.
|
|
// By default validators are asynchronous, which means they will run in a separate goroutine.
|
|
// The number of active goroutines is controlled by global and per topic validator
|
|
// throttles; if it exceeds the throttle threshold, messages will be dropped.
|
|
func (p *PubSub) RegisterTopicValidator(topic string, val interface{}, opts ...ValidatorOpt) error {
|
|
addVal := &addValReq{
|
|
topic: topic,
|
|
validate: val,
|
|
resp: make(chan error, 1),
|
|
}
|
|
|
|
for _, opt := range opts {
|
|
err := opt(addVal)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
select {
|
|
case p.addVal <- addVal:
|
|
case <-p.ctx.Done():
|
|
return p.ctx.Err()
|
|
}
|
|
return <-addVal.resp
|
|
}
|
|
|
|
// UnregisterTopicValidator removes a validator from a topic.
|
|
// Returns an error if there was no validator registered with the topic.
|
|
func (p *PubSub) UnregisterTopicValidator(topic string) error {
|
|
rmVal := &rmValReq{
|
|
topic: topic,
|
|
resp: make(chan error, 1),
|
|
}
|
|
|
|
select {
|
|
case p.rmVal <- rmVal:
|
|
case <-p.ctx.Done():
|
|
return p.ctx.Err()
|
|
}
|
|
return <-rmVal.resp
|
|
}
|
|
|
|
type RelayCancelFunc func()
|
|
|
|
type addRelayReq struct {
|
|
topic string
|
|
resp chan RelayCancelFunc
|
|
}
|