package pubsub

import (
	"context"
	"encoding/binary"
	"errors"
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"

	pb "github.com/libp2p/go-libp2p-pubsub/pb"

	"github.com/libp2p/go-libp2p-core/crypto"
	"github.com/libp2p/go-libp2p-core/discovery"
	"github.com/libp2p/go-libp2p-core/host"
	"github.com/libp2p/go-libp2p-core/network"
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/protocol"

	logging "github.com/ipfs/go-log"
	timecache "github.com/whyrusleeping/timecache"
)

// DefaultMaxMessageSize is the default maximum message size: 1 MiB.
const DefaultMaxMessageSize = 1 << 20

var (
	// TimeCacheDuration specifies how long a message ID is remembered as seen.
	TimeCacheDuration = 120 * time.Second
)

var log = logging.Logger("pubsub")

// PubSub is the implementation of the pubsub system.
type PubSub struct {
	// atomic counter for seqnos
	// NOTE: Must be declared at the top of the struct as we perform atomic
	// operations on this field.
	//
	// See: https://golang.org/pkg/sync/atomic/#pkg-note-BUG
	counter uint64

	host host.Host

	rt PubSubRouter

	val *validation

	disc *discover

	tracer *pubsubTracer

	// maxMessageSize is the maximum message size; it applies globally to all
	// topics.
	maxMessageSize int

	// size of the outbound message channel that we maintain for each peer
	peerOutboundQueueSize int

	// incoming messages from other peers
	incoming chan *RPC

	// messages we are publishing out to our peers
	publish chan *Message

	// addSub is a control channel for us to add and remove subscriptions
	addSub chan *addSubReq

	// addRelay is a control channel for us to add and remove relays
	addRelay chan *addRelayReq

	// rmRelay is a relay cancellation channel
	rmRelay chan string

	// get list of topics we are subscribed to
	getTopics chan *topicReq

	// get chan of peers we are connected to
	getPeers chan *listPeerReq

	// send subscription here to cancel it
	cancelCh chan *Subscription

	// addTopic is a control channel for us to add a topic
	addTopic chan *addTopicReq

	// rmTopic is a topic cancellation channel
	rmTopic chan *rmTopicReq

	// a notification channel for new peer connections
	newPeers chan peer.ID

	// a notification channel for new outgoing peer streams
	newPeerStream chan network.Stream

	// a notification channel for errors opening new peer streams
	newPeerError chan peer.ID

	// a notification channel for when our peers die
	peerDead chan peer.ID

	// The set of topics we are subscribed to
	mySubs map[string]map[*Subscription]struct{}

	// The set of topics we are relaying for
	myRelays map[string]int

	// The set of topics we are interested in
	myTopics map[string]*Topic

	// topics tracks which topics each of our peers are subscribed to
	topics map[string]map[peer.ID]struct{}

	// sendMsg handles messages that have been validated
	sendMsg chan *Message

	// addVal handles validator registration requests
	addVal chan *addValReq

	// rmVal handles validator unregistration requests
	rmVal chan *rmValReq

	// eval thunk in event loop
	eval chan func()

	// peer blacklist
	blacklist     Blacklist
	blacklistPeer chan peer.ID

	peers map[peer.ID]chan *RPC

	seenMessagesMx sync.Mutex
	seenMessages   *timecache.TimeCache

	// function used to compute the ID for a message
	msgID MsgIdFunction

	// key for signing messages; nil when signing is disabled
	signKey crypto.PrivKey
	// source ID for signed messages; corresponds to signKey
	signID peer.ID
	// strict mode rejects all unsigned messages prior to validation
	signStrict bool

	ctx context.Context
}

// PubSubRouter is the message router component of PubSub.
type PubSubRouter interface {
	// Protocols returns the list of protocols supported by the router.
	Protocols() []protocol.ID
	// Attach is invoked by the PubSub constructor to attach the router to a
	// freshly initialized PubSub instance.
	Attach(*PubSub)
	// AddPeer notifies the router that a new peer has been connected.
	AddPeer(peer.ID, protocol.ID)
	// RemovePeer notifies the router that a peer has been disconnected.
	RemovePeer(peer.ID)
	// EnoughPeers returns whether the router needs more peers before it's ready to publish new records.
	// Suggested (if greater than 0) is a suggested number of peers that the router should need.
	EnoughPeers(topic string, suggested int) bool
	// AcceptFrom is invoked on any incoming message before pushing it to the validation pipeline
	// or processing control information.
	// Allows routers with internal scoring to vet peers before committing any processing resources
	// to the message and implement an effective graylist.
	AcceptFrom(peer.ID) bool
	// HandleRPC is invoked to process control messages in the RPC envelope.
	// It is invoked after subscriptions and payload messages have been processed.
	HandleRPC(*RPC)
	// Publish is invoked to forward a new message that has been validated.
	Publish(*Message)
	// Join notifies the router that we want to receive and forward messages in a topic.
	// It is invoked after the subscription announcement.
	Join(topic string)
	// Leave notifies the router that we are no longer interested in a topic.
	// It is invoked after the unsubscription announcement.
	Leave(topic string)
}
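
// To make the contract above concrete, a minimal no-op router satisfying
// PubSubRouter could look like the sketch below (illustrative only, not part
// of this package; the routers shipped in this package, such as floodsub and
// gossipsub, implement the same methods with actual propagation logic):
//
//	type nullRouter struct{ p *PubSub }
//
//	func (r *nullRouter) Protocols() []protocol.ID     { return nil }
//	func (r *nullRouter) Attach(p *PubSub)             { r.p = p }
//	func (r *nullRouter) AddPeer(peer.ID, protocol.ID) {}
//	func (r *nullRouter) RemovePeer(peer.ID)           {}
//	func (r *nullRouter) EnoughPeers(string, int) bool { return false }
//	func (r *nullRouter) AcceptFrom(peer.ID) bool      { return true }
//	func (r *nullRouter) HandleRPC(*RPC)               {}
//	func (r *nullRouter) Publish(*Message)             {}
//	func (r *nullRouter) Join(topic string)            {}
//	func (r *nullRouter) Leave(topic string)           {}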

// Message wraps a pubsub protobuf message with local delivery metadata.
type Message struct {
	*pb.Message
	ReceivedFrom  peer.ID
	ValidatorData interface{}
}

func (m *Message) GetFrom() peer.ID {
	return peer.ID(m.Message.GetFrom())
}

type RPC struct {
	pb.RPC

	// unexported on purpose, not sending this over the wire
	from peer.ID
}

type Option func(*PubSub) error

// NewPubSub returns a new PubSub management object.
func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option) (*PubSub, error) {
	ps := &PubSub{
		host:                  h,
		ctx:                   ctx,
		rt:                    rt,
		val:                   newValidation(),
		disc:                  &discover{},
		maxMessageSize:        DefaultMaxMessageSize,
		peerOutboundQueueSize: 32,
		signID:                h.ID(),
		signKey:               h.Peerstore().PrivKey(h.ID()),
		signStrict:            true,
		incoming:              make(chan *RPC, 32),
		publish:               make(chan *Message),
		newPeers:              make(chan peer.ID),
		newPeerStream:         make(chan network.Stream),
		newPeerError:          make(chan peer.ID),
		peerDead:              make(chan peer.ID),
		cancelCh:              make(chan *Subscription),
		getPeers:              make(chan *listPeerReq),
		addSub:                make(chan *addSubReq),
		addRelay:              make(chan *addRelayReq),
		rmRelay:               make(chan string),
		addTopic:              make(chan *addTopicReq),
		rmTopic:               make(chan *rmTopicReq),
		getTopics:             make(chan *topicReq),
		sendMsg:               make(chan *Message, 32),
		addVal:                make(chan *addValReq),
		rmVal:                 make(chan *rmValReq),
		eval:                  make(chan func()),
		myTopics:              make(map[string]*Topic),
		mySubs:                make(map[string]map[*Subscription]struct{}),
		myRelays:              make(map[string]int),
		topics:                make(map[string]map[peer.ID]struct{}),
		peers:                 make(map[peer.ID]chan *RPC),
		blacklist:             NewMapBlacklist(),
		blacklistPeer:         make(chan peer.ID),
		seenMessages:          timecache.NewTimeCache(TimeCacheDuration),
		msgID:                 DefaultMsgIdFn,
		counter:               uint64(time.Now().UnixNano()),
	}

	for _, opt := range opts {
		err := opt(ps)
		if err != nil {
			return nil, err
		}
	}

	if ps.signStrict && ps.signKey == nil {
		return nil, fmt.Errorf("strict signature verification enabled but message signing is disabled")
	}

	if err := ps.disc.Start(ps); err != nil {
		return nil, err
	}

	rt.Attach(ps)

	for _, id := range rt.Protocols() {
		h.SetStreamHandler(id, ps.handleNewStream)
	}
	h.Network().Notify((*PubSubNotif)(ps))

	ps.val.Start(ps)

	go ps.processLoop(ctx)

	return ps, nil
}
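
// A minimal construction sketch. NewGossipSub and NewFloodSub, defined
// elsewhere in this package, wrap NewPubSub with a concrete router, so most
// callers never invoke NewPubSub directly:
//
//	ps, err := pubsub.NewGossipSub(ctx, host,
//		pubsub.WithMessageSigning(true),
//	)
//	if err != nil {
//		// handle error
//	}
//
// Options are applied in order, before the event loop starts.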

// MsgIdFunction returns a unique ID for the passed Message, and PubSub can be customized to use any
// implementation of this function by configuring it with the Option from WithMessageIdFn.
type MsgIdFunction func(pmsg *pb.Message) string

// WithMessageIdFn is an option to customize the way a message ID is computed for a pubsub message.
// The default ID function is DefaultMsgIdFn (concatenate source and seq nr.),
// but it can be customized to e.g. the hash of the message.
func WithMessageIdFn(fn MsgIdFunction) Option {
	return func(p *PubSub) error {
		p.msgID = fn
		// the tracer Option may already be set. Update its message ID function to make options order-independent.
		if p.tracer != nil {
			p.tracer.msgID = fn
		}
		return nil
	}
}
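
// For example, a content-addressed ID can be computed by hashing the payload.
// This is only a sketch (sha256 via crypto/sha256 is assumed); any stable
// digest over the fields you care about works:
//
//	pubsub.WithMessageIdFn(func(pmsg *pb.Message) string {
//		h := sha256.Sum256(pmsg.Data)
//		return string(h[:])
//	})
//
// All peers in a topic should use the same ID function, otherwise seen-message
// deduplication and gossip will not line up across the network.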

// WithPeerOutboundQueueSize is an option to set the buffer size for outbound messages to a peer.
// We start dropping messages to a peer if the outbound queue is full.
func WithPeerOutboundQueueSize(size int) Option {
	return func(p *PubSub) error {
		if size <= 0 {
			return errors.New("outbound queue size must always be positive")
		}
		p.peerOutboundQueueSize = size
		return nil
	}
}

// WithMessageSigning enables or disables message signing (enabled by default).
func WithMessageSigning(enabled bool) Option {
	return func(p *PubSub) error {
		if enabled {
			p.signKey = p.host.Peerstore().PrivKey(p.signID)
			if p.signKey == nil {
				return fmt.Errorf("can't sign for peer %s: no private key", p.signID)
			}
		} else {
			p.signKey = nil
			p.signStrict = false
		}
		return nil
	}
}

// WithMessageAuthor sets the author for outbound messages to the given peer ID
// (defaults to the host's ID). If message signing is enabled, the private key
// must be available in the host's peerstore.
func WithMessageAuthor(author peer.ID) Option {
	return func(p *PubSub) error {
		// shadow the parameter so that applying the option to multiple
		// instances does not reuse a previously defaulted value
		author := author
		if author == "" {
			author = p.host.ID()
		}
		if p.signKey != nil {
			newSignKey := p.host.Peerstore().PrivKey(author)
			if newSignKey == nil {
				return fmt.Errorf("can't sign for peer %s: no private key", author)
			}
			p.signKey = newSignKey
		}
		p.signID = author
		return nil
	}
}
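
// For example, signing can be turned off entirely in a closed test network;
// per WithMessageSigning above, disabling signing also relaxes strict
// verification (a sketch, assuming the NewGossipSub constructor):
//
//	ps, err := pubsub.NewGossipSub(ctx, host, pubsub.WithMessageSigning(false))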

// WithStrictSignatureVerification is an option to enable or disable strict message signing.
// When enabled (which is the default), unsigned messages will be discarded.
func WithStrictSignatureVerification(required bool) Option {
	return func(p *PubSub) error {
		p.signStrict = required
		return nil
	}
}

// WithBlacklist provides an implementation of the blacklist; the default is a
// MapBlacklist
func WithBlacklist(b Blacklist) Option {
	return func(p *PubSub) error {
		p.blacklist = b
		return nil
	}
}

// WithDiscovery provides a discovery mechanism used to bootstrap and provide peers into PubSub
func WithDiscovery(d discovery.Discovery, opts ...DiscoverOpt) Option {
	return func(p *PubSub) error {
		discoverOpts := defaultDiscoverOptions()
		for _, opt := range opts {
			err := opt(discoverOpts)
			if err != nil {
				return err
			}
		}

		p.disc.discovery = &pubSubDiscovery{Discovery: d, opts: discoverOpts.opts}
		p.disc.options = discoverOpts
		return nil
	}
}

// WithEventTracer provides a tracer for the pubsub system
func WithEventTracer(tracer EventTracer) Option {
	return func(p *PubSub) error {
		if p.tracer != nil {
			p.tracer.tracer = tracer
		} else {
			p.tracer = &pubsubTracer{tracer: tracer, pid: p.host.ID(), msgID: p.msgID}
		}
		return nil
	}
}

// withInternalTracer adds an internal event tracer to the pubsub system
func withInternalTracer(tracer internalTracer) Option {
	return func(p *PubSub) error {
		if p.tracer != nil {
			p.tracer.internal = append(p.tracer.internal, tracer)
		} else {
			p.tracer = &pubsubTracer{internal: []internalTracer{tracer}, pid: p.host.ID(), msgID: p.msgID}
		}
		return nil
	}
}

// WithMaxMessageSize sets the global maximum message size for pubsub wire
// messages. The default value is 1MiB (DefaultMaxMessageSize).
//
// Observe the following warnings when setting this option.
//
// WARNING #1: Make sure to change the default protocol prefixes for floodsub
// (FloodSubID) and gossipsub (GossipSubID). This avoids accidentally joining
// the public default network, which uses the default max message size, and
// therefore will cause messages to be dropped.
//
// WARNING #2: Reducing the default max message limit is fine, if you are
// certain that your application messages will not exceed the new limit.
// However, be wary of increasing the limit, as pubsub networks are naturally
// write-amplifying, i.e. for every message we receive, we send D copies of the
// message to our peers. If those messages are large, the bandwidth requirements
// will grow linearly. Note that propagation is sent on the uplink, which
// traditionally is more constrained than the downlink. Instead, consider
// out-of-band retrieval for large messages, by sending a CID (Content-ID) or
// another type of locator, such that messages can be fetched on-demand, rather
// than being pushed proactively. Under this design, you'd use the pubsub layer
// as a signalling system, rather than a data delivery system.
func WithMaxMessageSize(maxMessageSize int) Option {
	return func(ps *PubSub) error {
		ps.maxMessageSize = maxMessageSize
		return nil
	}
}
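
// A usage sketch (NewGossipSub is assumed; it forwards options to NewPubSub):
//
//	ps, err := pubsub.NewGossipSub(ctx, host,
//		pubsub.WithMaxMessageSize(2<<20), // 2 MiB
//	)
//
// Per the warnings above, a non-default limit should be paired with custom
// protocol IDs so that nodes with mismatched limits don't silently drop each
// other's messages.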

// processLoop handles all inputs arriving on the channels
func (p *PubSub) processLoop(ctx context.Context) {
	defer func() {
		// Clean up go routines.
		for _, ch := range p.peers {
			close(ch)
		}
		p.peers = nil
		p.topics = nil
	}()

	for {
		select {
		case pid := <-p.newPeers:
			if _, ok := p.peers[pid]; ok {
				log.Warn("already have connection to peer: ", pid)
				continue
			}

			if p.blacklist.Contains(pid) {
				log.Warn("ignoring connection from blacklisted peer: ", pid)
				continue
			}

			messages := make(chan *RPC, p.peerOutboundQueueSize)
			messages <- p.getHelloPacket()
			go p.handleNewPeer(ctx, pid, messages)
			p.peers[pid] = messages

		case s := <-p.newPeerStream:
			pid := s.Conn().RemotePeer()

			ch, ok := p.peers[pid]
			if !ok {
				log.Warn("new stream for unknown peer: ", pid)
				s.Reset()
				continue
			}

			if p.blacklist.Contains(pid) {
				log.Warn("closing stream for blacklisted peer: ", pid)
				close(ch)
				s.Reset()
				continue
			}

			p.rt.AddPeer(pid, s.Protocol())

		case pid := <-p.newPeerError:
			delete(p.peers, pid)

		case pid := <-p.peerDead:
			ch, ok := p.peers[pid]
			if !ok {
				continue
			}

			close(ch)

			if p.host.Network().Connectedness(pid) == network.Connected {
				// still connected, must be a duplicate connection being closed.
				// we respawn the writer as we need to ensure there is a stream active
				log.Warn("peer declared dead but still connected; respawning writer: ", pid)
				messages := make(chan *RPC, p.peerOutboundQueueSize)
				messages <- p.getHelloPacket()
				go p.handleNewPeer(ctx, pid, messages)
				p.peers[pid] = messages
				continue
			}

			delete(p.peers, pid)
			for t, tmap := range p.topics {
				if _, ok := tmap[pid]; ok {
					delete(tmap, pid)
					p.notifyLeave(t, pid)
				}
			}

			p.rt.RemovePeer(pid)

		case treq := <-p.getTopics:
			var out []string
			for t := range p.mySubs {
				out = append(out, t)
			}
			treq.resp <- out
		case topic := <-p.addTopic:
			p.handleAddTopic(topic)
		case topic := <-p.rmTopic:
			p.handleRemoveTopic(topic)
		case sub := <-p.cancelCh:
			p.handleRemoveSubscription(sub)
		case sub := <-p.addSub:
			p.handleAddSubscription(sub)
		case relay := <-p.addRelay:
			p.handleAddRelay(relay)
		case topic := <-p.rmRelay:
			p.handleRemoveRelay(topic)
		case preq := <-p.getPeers:
			tmap, ok := p.topics[preq.topic]
			if preq.topic != "" && !ok {
				preq.resp <- nil
				continue
			}
			var peers []peer.ID
			for p := range p.peers {
				if preq.topic != "" {
					_, ok := tmap[p]
					if !ok {
						continue
					}
				}
				peers = append(peers, p)
			}
			preq.resp <- peers
		case rpc := <-p.incoming:
			p.handleIncomingRPC(rpc)

		case msg := <-p.publish:
			p.tracer.PublishMessage(msg)
			p.pushMsg(msg)

		case msg := <-p.sendMsg:
			p.publishMessage(msg)

		case req := <-p.addVal:
			p.val.AddValidator(req)

		case req := <-p.rmVal:
			p.val.RemoveValidator(req)

		case thunk := <-p.eval:
			thunk()

		case pid := <-p.blacklistPeer:
			log.Infof("Blacklisting peer %s", pid)
			p.blacklist.Add(pid)

			ch, ok := p.peers[pid]
			if ok {
				close(ch)
				delete(p.peers, pid)
				for t, tmap := range p.topics {
					if _, ok := tmap[pid]; ok {
						delete(tmap, pid)
						p.notifyLeave(t, pid)
					}
				}
				p.rt.RemovePeer(pid)
			}

		case <-ctx.Done():
			log.Info("pubsub processloop shutting down")
			return
		}
	}
}

// handleAddTopic adds a tracker for a particular topic.
// Only called from processLoop.
func (p *PubSub) handleAddTopic(req *addTopicReq) {
	topic := req.topic
	topicID := topic.topic

	t, ok := p.myTopics[topicID]
	if ok {
		req.resp <- t
		return
	}

	p.myTopics[topicID] = topic
	req.resp <- topic
}

// handleRemoveTopic removes Topic tracker from bookkeeping.
// Only called from processLoop.
func (p *PubSub) handleRemoveTopic(req *rmTopicReq) {
	topic := p.myTopics[req.topic.topic]

	if topic == nil {
		req.resp <- nil
		return
	}

	if len(topic.evtHandlers) == 0 &&
		len(p.mySubs[req.topic.topic]) == 0 &&
		p.myRelays[req.topic.topic] == 0 {
		delete(p.myTopics, topic.topic)
		req.resp <- nil
		return
	}

	req.resp <- fmt.Errorf("cannot close topic: outstanding event handlers or subscriptions")
}

// handleRemoveSubscription removes Subscription sub from bookkeeping.
// If this was the last subscription and no more relays exist for a given topic,
// it will also announce that this node is not subscribing to this topic anymore.
// Only called from processLoop.
func (p *PubSub) handleRemoveSubscription(sub *Subscription) {
	subs := p.mySubs[sub.topic]

	if subs == nil {
		return
	}

	sub.err = fmt.Errorf("subscription cancelled by calling sub.Cancel()")
	sub.close()
	delete(subs, sub)

	if len(subs) == 0 {
		delete(p.mySubs, sub.topic)

		// stop announcing only if there are no more subs and relays
		if p.myRelays[sub.topic] == 0 {
			p.disc.StopAdvertise(sub.topic)
			p.announce(sub.topic, false)
			p.rt.Leave(sub.topic)
		}
	}
}

// handleAddSubscription adds a Subscription for a particular topic. If it is
// the first subscription and no relays exist so far for the topic, it will
// announce that this node subscribes to the topic.
// Only called from processLoop.
func (p *PubSub) handleAddSubscription(req *addSubReq) {
	sub := req.sub
	subs := p.mySubs[sub.topic]

	// announce we want this topic if neither subs nor relays exist so far
	if len(subs) == 0 && p.myRelays[sub.topic] == 0 {
		p.disc.Advertise(sub.topic)
		p.announce(sub.topic, true)
		p.rt.Join(sub.topic)
	}

	// make new if not there
	if subs == nil {
		p.mySubs[sub.topic] = make(map[*Subscription]struct{})
	}

	sub.cancelCh = p.cancelCh

	p.mySubs[sub.topic][sub] = struct{}{}

	req.resp <- sub
}

// handleAddRelay adds a relay for a particular topic. If it is
// the first relay and no subscriptions exist so far for the topic, it will
// announce that this node relays for the topic.
// Only called from processLoop.
func (p *PubSub) handleAddRelay(req *addRelayReq) {
	topic := req.topic

	p.myRelays[topic]++

	// announce we want this topic if neither relays nor subs exist so far
	if p.myRelays[topic] == 1 && len(p.mySubs[topic]) == 0 {
		p.disc.Advertise(topic)
		p.announce(topic, true)
		p.rt.Join(topic)
	}

	// flag used to prevent calling cancel function multiple times
	isCancelled := false

	relayCancelFunc := func() {
		if isCancelled {
			return
		}

		select {
		case p.rmRelay <- topic:
			isCancelled = true
		case <-p.ctx.Done():
		}
	}

	req.resp <- relayCancelFunc
}

// handleRemoveRelay removes one relay reference from bookkeeping.
// If this was the last relay reference and no more subscriptions exist
// for a given topic, it will also announce that this node is not relaying
// for this topic anymore.
// Only called from processLoop.
func (p *PubSub) handleRemoveRelay(topic string) {
	if p.myRelays[topic] == 0 {
		return
	}

	p.myRelays[topic]--

	if p.myRelays[topic] == 0 {
		delete(p.myRelays, topic)

		// stop announcing only if there are no more relays and subs
		if len(p.mySubs[topic]) == 0 {
			p.disc.StopAdvertise(topic)
			p.announce(topic, false)
			p.rt.Leave(topic)
		}
	}
}

// announce announces whether or not this node is interested in a given topic
// Only called from processLoop.
func (p *PubSub) announce(topic string, sub bool) {
	subopt := &pb.RPC_SubOpts{
		Topicid:   &topic,
		Subscribe: &sub,
	}

	out := rpcWithSubs(subopt)
	for pid, peer := range p.peers {
		select {
		case peer <- out:
			p.tracer.SendRPC(out, pid)
		default:
			log.Infof("Can't send announce message to peer %s: queue full; scheduling retry", pid)
			p.tracer.DropRPC(out, pid)
			go p.announceRetry(pid, topic, sub)
		}
	}
}

// announceRetry schedules a retry of a subscription announcement to a peer
// on the event loop, after a short random delay.
func (p *PubSub) announceRetry(pid peer.ID, topic string, sub bool) {
	time.Sleep(time.Duration(1+rand.Intn(1000)) * time.Millisecond)

	retry := func() {
		_, okSubs := p.mySubs[topic]
		_, okRelays := p.myRelays[topic]

		ok := okSubs || okRelays

		if (ok && sub) || (!ok && !sub) {
			p.doAnnounceRetry(pid, topic, sub)
		}
	}

	select {
	case p.eval <- retry:
	case <-p.ctx.Done():
	}
}

// doAnnounceRetry re-sends the subscription announcement to a peer.
// Only called from the event loop, via the eval channel.
func (p *PubSub) doAnnounceRetry(pid peer.ID, topic string, sub bool) {
	peer, ok := p.peers[pid]
	if !ok {
		return
	}

	subopt := &pb.RPC_SubOpts{
		Topicid:   &topic,
		Subscribe: &sub,
	}

	out := rpcWithSubs(subopt)
	select {
	case peer <- out:
		p.tracer.SendRPC(out, pid)
	default:
		log.Infof("Can't send announce message to peer %s: queue full; scheduling retry", pid)
		p.tracer.DropRPC(out, pid)
		go p.announceRetry(pid, topic, sub)
	}
}

// notifySubs sends a given message to all corresponding subscribers.
// Only called from processLoop.
func (p *PubSub) notifySubs(msg *Message) {
	for _, topic := range msg.GetTopicIDs() {
		subs := p.mySubs[topic]
		for f := range subs {
			select {
			case f.ch <- msg:
			default:
				log.Infof("Can't deliver message to subscription for topic %s; subscriber too slow", topic)
			}
		}
	}
}

// seenMessage returns whether we already saw this message before
func (p *PubSub) seenMessage(id string) bool {
	p.seenMessagesMx.Lock()
	defer p.seenMessagesMx.Unlock()
	return p.seenMessages.Has(id)
}

// markSeen marks a message as seen such that seenMessage returns `true` for the given id
// returns true if the message was freshly marked
func (p *PubSub) markSeen(id string) bool {
	p.seenMessagesMx.Lock()
	defer p.seenMessagesMx.Unlock()
	if p.seenMessages.Has(id) {
		return false
	}

	p.seenMessages.Add(id)
	return true
}

// subscribedToMsg returns whether we are subscribed to one of the topics
// of a given message
func (p *PubSub) subscribedToMsg(msg *pb.Message) bool {
	if len(p.mySubs) == 0 {
		return false
	}

	for _, t := range msg.GetTopicIDs() {
		if _, ok := p.mySubs[t]; ok {
			return true
		}
	}
	return false
}

// canRelayMsg returns whether we are able to relay for one of the topics
// of a given message
func (p *PubSub) canRelayMsg(msg *pb.Message) bool {
	if len(p.myRelays) == 0 {
		return false
	}

	for _, t := range msg.GetTopicIDs() {
		if relays := p.myRelays[t]; relays != 0 {
			return true
		}
	}
	return false
}

// notifyLeave notifies a topic's event handlers that a peer has left the topic.
func (p *PubSub) notifyLeave(topic string, pid peer.ID) {
	if t, ok := p.myTopics[topic]; ok {
		t.sendNotification(PeerEvent{PeerLeave, pid})
	}
}

// handleIncomingRPC processes subscription changes and payload messages
// contained in a peer's RPC. Only called from processLoop.
func (p *PubSub) handleIncomingRPC(rpc *RPC) {
	p.tracer.RecvRPC(rpc)

	for _, subopt := range rpc.GetSubscriptions() {
		t := subopt.GetTopicid()
		if subopt.GetSubscribe() {
			tmap, ok := p.topics[t]
			if !ok {
				tmap = make(map[peer.ID]struct{})
				p.topics[t] = tmap
			}

			if _, ok = tmap[rpc.from]; !ok {
				tmap[rpc.from] = struct{}{}
				if topic, ok := p.myTopics[t]; ok {
					peer := rpc.from
					topic.sendNotification(PeerEvent{PeerJoin, peer})
				}
			}
		} else {
			tmap, ok := p.topics[t]
			if !ok {
				continue
			}

			if _, ok := tmap[rpc.from]; ok {
				delete(tmap, rpc.from)
				p.notifyLeave(t, rpc.from)
			}
		}
	}

	// ask the router to vet the peer before committing any processing resources
	if !p.rt.AcceptFrom(rpc.from) {
		log.Infof("received message from router graylisted peer %s. Dropping RPC", rpc.from)
		return
	}

	for _, pmsg := range rpc.GetPublish() {
		if !(p.subscribedToMsg(pmsg) || p.canRelayMsg(pmsg)) {
			log.Warn("received message we didn't subscribe to. Dropping.")
			continue
		}

		msg := &Message{pmsg, rpc.from, nil}
		p.pushMsg(msg)
	}

	p.rt.HandleRPC(rpc)
}

// DefaultMsgIdFn returns a unique ID of the passed Message
func DefaultMsgIdFn(pmsg *pb.Message) string {
	return string(pmsg.GetFrom()) + string(pmsg.GetSeqno())
}

// pushMsg pushes a message performing validation as necessary
func (p *PubSub) pushMsg(msg *Message) {
	src := msg.ReceivedFrom
	// reject messages from blacklisted peers
	if p.blacklist.Contains(src) {
		log.Warnf("dropping message from blacklisted peer %s", src)
		p.tracer.RejectMessage(msg, rejectBlacklstedPeer)
		return
	}

	// also reject messages from blacklisted sources, even if they are forwarded by good peers
	if p.blacklist.Contains(msg.GetFrom()) {
		log.Warnf("dropping message from blacklisted source %s", src)
		p.tracer.RejectMessage(msg, rejectBlacklistedSource)
		return
	}

	// reject unsigned messages when strict before we even process the id
	if p.signStrict && msg.Signature == nil {
		log.Debugf("dropping unsigned message from %s", src)
		p.tracer.RejectMessage(msg, rejectMissingSignature)
		return
	}

	// reject messages claiming to be from ourselves but not locally published
	self := p.host.ID()
	if peer.ID(msg.GetFrom()) == self && src != self {
		log.Debugf("dropping message claiming to be from self but forwarded from %s", src)
		p.tracer.RejectMessage(msg, rejectSelfOrigin)
		return
	}

	// have we already seen and validated this message?
	id := p.msgID(msg.Message)
	if p.seenMessage(id) {
		p.tracer.DuplicateMessage(msg)
		return
	}

	if !p.val.Push(src, msg) {
		return
	}

	if p.markSeen(id) {
		p.publishMessage(msg)
	}
}

// publishMessage delivers a validated message to local subscribers and hands
// it to the router for propagation to our peers.
func (p *PubSub) publishMessage(msg *Message) {
	p.tracer.DeliverMessage(msg)
	p.notifySubs(msg)
	p.rt.Publish(msg)
}

type addTopicReq struct {
	topic *Topic
	resp  chan *Topic
}

type rmTopicReq struct {
	topic *Topic
	resp  chan error
}

type TopicOptions struct{}

type TopicOpt func(t *Topic) error

// Join joins the topic and returns a Topic handle. Only one Topic handle should exist per topic, and Join will error if
// the Topic handle already exists.
func (p *PubSub) Join(topic string, opts ...TopicOpt) (*Topic, error) {
	t, ok, err := p.tryJoin(topic, opts...)
	if err != nil {
		return nil, err
	}

	if !ok {
		return nil, fmt.Errorf("topic already exists")
	}

	return t, nil
}
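
// A typical flow with the Topic-based API (a sketch; Topic.Subscribe,
// Subscription.Next and Topic.Publish are defined elsewhere in this package):
//
//	topic, err := ps.Join("my-topic")
//	if err != nil {
//		return err
//	}
//	sub, err := topic.Subscribe()
//	if err != nil {
//		return err
//	}
//
//	go func() {
//		for {
//			msg, err := sub.Next(ctx)
//			if err != nil {
//				return
//			}
//			handle(msg.Data) // handle is application code
//		}
//	}()
//
//	return topic.Publish(ctx, []byte("hello"))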

// tryJoin is an internal function that tries to join a topic
// Returns the topic if it can be created or found
// Returns true if the topic was newly created, false otherwise
// Can be removed once pubsub.Publish() and pubsub.Subscribe() are removed
func (p *PubSub) tryJoin(topic string, opts ...TopicOpt) (*Topic, bool, error) {
	t := &Topic{
		p:           p,
		topic:       topic,
		evtHandlers: make(map[*TopicEventHandler]struct{}),
	}

	for _, opt := range opts {
		err := opt(t)
		if err != nil {
			return nil, false, err
		}
	}

	resp := make(chan *Topic, 1)
	select {
	case t.p.addTopic <- &addTopicReq{
		topic: t,
		resp:  resp,
	}:
	case <-t.p.ctx.Done():
		return nil, false, t.p.ctx.Err()
	}
	returnedTopic := <-resp

	if returnedTopic != t {
		return returnedTopic, false, nil
	}

	return t, true, nil
}

type addSubReq struct {
	sub  *Subscription
	resp chan *Subscription
}

type SubOpt func(sub *Subscription) error

// Subscribe returns a new Subscription for the given topic.
// Note that subscription is not an instantaneous operation. It may take some time
// before the subscription is processed by the pubsub main loop and propagated to our peers.
//
// Deprecated: use pubsub.Join() and topic.Subscribe() instead
func (p *PubSub) Subscribe(topic string, opts ...SubOpt) (*Subscription, error) {
	td := pb.TopicDescriptor{Name: &topic}

	return p.SubscribeByTopicDescriptor(&td, opts...)
}

// SubscribeByTopicDescriptor lets you subscribe to a topic using a pb.TopicDescriptor.
//
// Deprecated: use pubsub.Join() and topic.Subscribe() instead
func (p *PubSub) SubscribeByTopicDescriptor(td *pb.TopicDescriptor, opts ...SubOpt) (*Subscription, error) {
	if td.GetAuth().GetMode() != pb.TopicDescriptor_AuthOpts_NONE {
		return nil, fmt.Errorf("auth mode not yet supported")
	}

	if td.GetEnc().GetMode() != pb.TopicDescriptor_EncOpts_NONE {
		return nil, fmt.Errorf("encryption mode not yet supported")
	}

	// ignore whether the topic was newly created or not, since either way we have a valid topic to work with
	topic, _, err := p.tryJoin(td.GetName())
	if err != nil {
		return nil, err
	}

	return topic.Subscribe(opts...)
}

type topicReq struct {
	resp chan []string
}

// GetTopics returns the topics this node is subscribed to.
func (p *PubSub) GetTopics() []string {
	out := make(chan []string, 1)
	select {
	case p.getTopics <- &topicReq{resp: out}:
	case <-p.ctx.Done():
		return nil
	}
	return <-out
}

// Publish publishes data to the given topic.
//
// Deprecated: use pubsub.Join() and topic.Publish() instead
func (p *PubSub) Publish(topic string, data []byte, opts ...PubOpt) error {
	// ignore whether the topic was newly created or not, since either way we have a valid topic to work with
	t, _, err := p.tryJoin(topic)
	if err != nil {
		return err
	}

	return t.Publish(context.TODO(), data, opts...)
}

// nextSeqno returns the next message sequence number as an 8-byte big-endian
// value; together with the source peer ID it forms the default message ID
// (see DefaultMsgIdFn).
func (p *PubSub) nextSeqno() []byte {
	seqno := make([]byte, 8)
	counter := atomic.AddUint64(&p.counter, 1)
	binary.BigEndian.PutUint64(seqno, counter)
	return seqno
}

type listPeerReq struct {
	resp  chan []peer.ID
	topic string
}

// ListPeers returns a list of peers we are connected to in the given topic.
func (p *PubSub) ListPeers(topic string) []peer.ID {
	out := make(chan []peer.ID)
	select {
	case p.getPeers <- &listPeerReq{
		resp:  out,
		topic: topic,
	}:
	case <-p.ctx.Done():
		return nil
	}
	return <-out
}

// BlacklistPeer blacklists a peer; all messages from this peer will be unconditionally dropped.
func (p *PubSub) BlacklistPeer(pid peer.ID) {
	select {
	case p.blacklistPeer <- pid:
	case <-p.ctx.Done():
	}
}

// RegisterTopicValidator registers a validator for topic.
// By default validators are asynchronous, which means they will run in a separate goroutine.
// The number of active goroutines is controlled by global and per topic validator
// throttles; if it exceeds the throttle threshold, messages will be dropped.
func (p *PubSub) RegisterTopicValidator(topic string, val interface{}, opts ...ValidatorOpt) error {
	addVal := &addValReq{
		topic:    topic,
		validate: val,
		resp:     make(chan error, 1),
	}

	for _, opt := range opts {
		err := opt(addVal)
		if err != nil {
			return err
		}
	}

	select {
	case p.addVal <- addVal:
	case <-p.ctx.Done():
		return p.ctx.Err()
	}
	return <-addVal.resp
}
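
// A registration sketch, assuming the basic boolean validator signature
// accepted by the validation pipeline (an extended form returning a
// ValidationResult is also accepted):
//
//	err := ps.RegisterTopicValidator("my-topic",
//		func(ctx context.Context, from peer.ID, msg *pubsub.Message) bool {
//			// accept only small payloads
//			return len(msg.Data) <= 1024
//		},
//		pubsub.WithValidatorTimeout(time.Second),
//	)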

// UnregisterTopicValidator removes a validator from a topic.
// Returns an error if there was no validator registered with the topic.
func (p *PubSub) UnregisterTopicValidator(topic string) error {
	rmVal := &rmValReq{
		topic: topic,
		resp:  make(chan error, 1),
	}

	select {
	case p.rmVal <- rmVal:
	case <-p.ctx.Done():
		return p.ctx.Err()
	}
	return <-rmVal.resp
}

// RelayCancelFunc is a function that cancels a relay reference for a topic.
type RelayCancelFunc func()

type addRelayReq struct {
	topic string
	resp  chan RelayCancelFunc
}