modularized pubsub; Flooding is just a routing method.
parent c036ca0c01
commit e7b1fe6e75

floodsub.go | 646
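The change below splits the old monolithic floodsub into a generic PubSub engine plus a pluggable PubSubRouter; flooding becomes just one router implementation (FloodSubRouter). As a rough sketch of that seam (not part of the commit; the nullRouter name and body are hypothetical, only the PubSubRouter interface is taken from the new file below), an alternative router would look like this:

package floodsub

import (
	pb "github.com/libp2p/go-floodsub/pb"

	peer "github.com/libp2p/go-libp2p-peer"
	protocol "github.com/libp2p/go-libp2p-protocol"
)

// nullRouter is a hypothetical do-nothing router, shown only to illustrate
// the PubSubRouter seam; a real router (e.g. FloodSubRouter below) decides
// how validated messages propagate to peers.
type nullRouter struct{}

func (r *nullRouter) Protocols() []protocol.ID           { return nil }
func (r *nullRouter) Attach(*PubSub)                     {}
func (r *nullRouter) AddPeer(peer.ID, protocol.ID)       {}
func (r *nullRouter) RemovePeer(peer.ID)                 {}
func (r *nullRouter) HandleRPC(*RPC) error               { return nil }
func (r *nullRouter) Publish(peer.ID, *pb.Message) error { return nil }

// Wiring mirrors what NewFloodSub now does with &FloodSubRouter{}:
//
//	ps, err := NewPubSub(ctx, h, &nullRouter{})

The point of the split is that the routing strategy can be swapped without touching the PubSub bookkeeping core (subscriptions, validation, seen-message cache).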
@ -2,454 +2,48 @@ package floodsub

import (
	"context"
	"encoding/binary"
	"fmt"
	"sync/atomic"
	"time"

	pb "github.com/libp2p/go-floodsub/pb"

	logging "github.com/ipfs/go-log"
	host "github.com/libp2p/go-libp2p-host"
	inet "github.com/libp2p/go-libp2p-net"
	peer "github.com/libp2p/go-libp2p-peer"
	protocol "github.com/libp2p/go-libp2p-protocol"
	timecache "github.com/whyrusleeping/timecache"
)

const (
	ID = protocol.ID("/floodsub/1.0.0")
	defaultValidateTimeout = 150 * time.Millisecond
	defaultValidateConcurrency = 100
	defaultValidateThrottle = 8192
	FloodSubID = protocol.ID("/floodsub/1.0.0")
)

var log = logging.Logger("floodsub")

type PubSub struct {
	host host.Host

	// incoming messages from other peers
	incoming chan *RPC

	// messages we are publishing out to our peers
	publish chan *Message

	// addSub is a control channel for us to add and remove subscriptions
	addSub chan *addSubReq

	// get list of topics we are subscribed to
	getTopics chan *topicReq

	// get chan of peers we are connected to
	getPeers chan *listPeerReq

	// send subscription here to cancel it
	cancelCh chan *Subscription

	// a notification channel for incoming streams from other peers
	newPeers chan inet.Stream

	// a notification channel for when our peers die
	peerDead chan peer.ID

	// The set of topics we are subscribed to
	myTopics map[string]map[*Subscription]struct{}

	// topics tracks which topics each of our peers are subscribed to
	topics map[string]map[peer.ID]struct{}

	// sendMsg handles messages that have been validated
	sendMsg chan *sendReq

	// addVal handles validator registration requests
	addVal chan *addValReq

	// topicVals tracks per topic validators
	topicVals map[string]*topicVal

	// validateThrottle limits the number of active validation goroutines
	validateThrottle chan struct{}

	peers        map[peer.ID]chan *RPC
	seenMessages *timecache.TimeCache

	ctx context.Context

	// atomic counter for seqnos
	counter uint64
}

type Message struct {
	*pb.Message
}

func (m *Message) GetFrom() peer.ID {
	return peer.ID(m.Message.GetFrom())
}

type RPC struct {
	pb.RPC

	// unexported on purpose, not sending this over the wire
	from peer.ID
}

type Option func(*PubSub) error

// NewFloodSub returns a new FloodSub management object
// NewFloodSub returns a new PubSub object using the FloodSubRouter
func NewFloodSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error) {
	ps := &PubSub{
		host:             h,
		ctx:              ctx,
		incoming:         make(chan *RPC, 32),
		publish:          make(chan *Message),
		newPeers:         make(chan inet.Stream),
		peerDead:         make(chan peer.ID),
		cancelCh:         make(chan *Subscription),
		getPeers:         make(chan *listPeerReq),
		addSub:           make(chan *addSubReq),
		getTopics:        make(chan *topicReq),
		sendMsg:          make(chan *sendReq, 32),
		addVal:           make(chan *addValReq),
		validateThrottle: make(chan struct{}, defaultValidateThrottle),
		myTopics:         make(map[string]map[*Subscription]struct{}),
		topics:           make(map[string]map[peer.ID]struct{}),
		peers:            make(map[peer.ID]chan *RPC),
		topicVals:        make(map[string]*topicVal),
		seenMessages:     timecache.NewTimeCache(time.Second * 30),
		counter:          uint64(time.Now().UnixNano()),
	}

	for _, opt := range opts {
		err := opt(ps)
		if err != nil {
			return nil, err
		}
	}

	h.SetStreamHandler(ID, ps.handleNewStream)
	h.Network().Notify((*PubSubNotif)(ps))

	go ps.processLoop(ctx)

	return ps, nil
	rt := &FloodSubRouter{}
	return NewPubSub(ctx, h, rt, opts...)
}

func WithValidateThrottle(n int) Option {
	return func(ps *PubSub) error {
		ps.validateThrottle = make(chan struct{}, n)
		return nil
	}
type FloodSubRouter struct {
	p *PubSub
}

// processLoop handles all inputs arriving on the channels
func (p *PubSub) processLoop(ctx context.Context) {
	defer func() {
		// Clean up go routines.
		for _, ch := range p.peers {
			close(ch)
		}
		p.peers = nil
		p.topics = nil
	}()
	for {
		select {
		case s := <-p.newPeers:
			pid := s.Conn().RemotePeer()
			ch, ok := p.peers[pid]
			if ok {
				log.Error("already have connection to peer: ", pid)
				close(ch)
			}

			messages := make(chan *RPC, 32)
			go p.handleSendingMessages(ctx, s, messages)
			messages <- p.getHelloPacket()

			p.peers[pid] = messages

		case pid := <-p.peerDead:
			ch, ok := p.peers[pid]
			if ok {
				close(ch)
			}

			delete(p.peers, pid)
			for _, t := range p.topics {
				delete(t, pid)
			}
		case treq := <-p.getTopics:
			var out []string
			for t := range p.myTopics {
				out = append(out, t)
			}
			treq.resp <- out
		case sub := <-p.cancelCh:
			p.handleRemoveSubscription(sub)
		case sub := <-p.addSub:
			p.handleAddSubscription(sub)
		case preq := <-p.getPeers:
			tmap, ok := p.topics[preq.topic]
			if preq.topic != "" && !ok {
				preq.resp <- nil
				continue
			}
			var peers []peer.ID
			for p := range p.peers {
				if preq.topic != "" {
					_, ok := tmap[p]
					if !ok {
						continue
					}
				}
				peers = append(peers, p)
			}
			preq.resp <- peers
		case rpc := <-p.incoming:
			p.handleIncomingRPC(rpc)
		case msg := <-p.publish:
			vals := p.getValidators(msg)
			p.pushMsg(vals, p.host.ID(), msg)

		case req := <-p.sendMsg:
			p.maybePublishMessage(req.from, req.msg.Message)

		case req := <-p.addVal:
			p.addValidator(req)

		case <-ctx.Done():
			log.Info("pubsub processloop shutting down")
			return
		}
	}
func (fs *FloodSubRouter) Protocols() []protocol.ID {
	return []protocol.ID{FloodSubID}
}

// handleRemoveSubscription removes Subscription sub from bookeeping.
// If this was the last Subscription for a given topic, it will also announce
// that this node is not subscribing to this topic anymore.
// Only called from processLoop.
func (p *PubSub) handleRemoveSubscription(sub *Subscription) {
	subs := p.myTopics[sub.topic]

	if subs == nil {
		return
	}

	sub.err = fmt.Errorf("subscription cancelled by calling sub.Cancel()")
	close(sub.ch)
	delete(subs, sub)

	if len(subs) == 0 {
		delete(p.myTopics, sub.topic)
		p.announce(sub.topic, false)
	}
func (fs *FloodSubRouter) Attach(p *PubSub) {
	fs.p = p
}

// handleAddSubscription adds a Subscription for a particular topic. If it is
// the first Subscription for the topic, it will announce that this node
// subscribes to the topic.
// Only called from processLoop.
func (p *PubSub) handleAddSubscription(req *addSubReq) {
	sub := req.sub
	subs := p.myTopics[sub.topic]
func (fs *FloodSubRouter) AddPeer(peer.ID, protocol.ID) {}

	// announce we want this topic
	if len(subs) == 0 {
		p.announce(sub.topic, true)
	}
func (fs *FloodSubRouter) RemovePeer(peer.ID) {}

	// make new if not there
	if subs == nil {
		p.myTopics[sub.topic] = make(map[*Subscription]struct{})
		subs = p.myTopics[sub.topic]
	}

	sub.ch = make(chan *Message, 32)
	sub.cancelCh = p.cancelCh

	p.myTopics[sub.topic][sub] = struct{}{}

	req.resp <- sub
func (fs *FloodSubRouter) HandleRPC(rpc *RPC) error {
	return nil
}

// announce announces whether or not this node is interested in a given topic
// Only called from processLoop.
func (p *PubSub) announce(topic string, sub bool) {
	subopt := &pb.RPC_SubOpts{
		Topicid:   &topic,
		Subscribe: &sub,
	}

	out := rpcWithSubs(subopt)
	for pid, peer := range p.peers {
		select {
		case peer <- out:
		default:
			log.Infof("dropping announce message to peer %s: queue full", pid)
		}
	}
}

// notifySubs sends a given message to all corresponding subscribers.
// Only called from processLoop.
func (p *PubSub) notifySubs(msg *pb.Message) {
	for _, topic := range msg.GetTopicIDs() {
		subs := p.myTopics[topic]
		for f := range subs {
			f.ch <- &Message{msg}
		}
	}
}

// seenMessage returns whether we already saw this message before
func (p *PubSub) seenMessage(id string) bool {
	return p.seenMessages.Has(id)
}

// markSeen marks a message as seen such that seenMessage returns `true' for the given id
func (p *PubSub) markSeen(id string) {
	p.seenMessages.Add(id)
}

// subscribedToMessage returns whether we are subscribed to one of the topics
// of a given message
func (p *PubSub) subscribedToMsg(msg *pb.Message) bool {
	if len(p.myTopics) == 0 {
		return false
	}

	for _, t := range msg.GetTopicIDs() {
		if _, ok := p.myTopics[t]; ok {
			return true
		}
	}
	return false
}

func (p *PubSub) handleIncomingRPC(rpc *RPC) {
	for _, subopt := range rpc.GetSubscriptions() {
		t := subopt.GetTopicid()
		if subopt.GetSubscribe() {
			tmap, ok := p.topics[t]
			if !ok {
				tmap = make(map[peer.ID]struct{})
				p.topics[t] = tmap
			}

			tmap[rpc.from] = struct{}{}
		} else {
			tmap, ok := p.topics[t]
			if !ok {
				continue
			}
			delete(tmap, rpc.from)
		}
	}

	for _, pmsg := range rpc.GetPublish() {
		if !p.subscribedToMsg(pmsg) {
			log.Warning("received message we didn't subscribe to. Dropping.")
			continue
		}

		msg := &Message{pmsg}
		vals := p.getValidators(msg)
		p.pushMsg(vals, rpc.from, msg)
	}
}

// msgID returns a unique ID of the passed Message
func msgID(pmsg *pb.Message) string {
	return string(pmsg.GetFrom()) + string(pmsg.GetSeqno())
}

// pushMsg pushes a message performing validation as necessary
func (p *PubSub) pushMsg(vals []*topicVal, src peer.ID, msg *Message) {
	if len(vals) > 0 {
		// validation is asynchronous and globally throttled with the throttleValidate semaphore.
		// the purpose of the global throttle is to bound the goncurrency possible from incoming
		// network traffic; each validator also has an individual throttle to preclude
		// slow (or faulty) validators from starving other topics; see validate below.
		select {
		case p.validateThrottle <- struct{}{}:
			go func() {
				p.validate(vals, src, msg)
				<-p.validateThrottle
			}()
		default:
			log.Warningf("message validation throttled; dropping message from %s", src)
		}
		return
	}

	p.maybePublishMessage(src, msg.Message)
}

// validate performs validation and only sends the message if all validators succeed
func (p *PubSub) validate(vals []*topicVal, src peer.ID, msg *Message) {
	ctx, cancel := context.WithCancel(p.ctx)
	defer cancel()

	rch := make(chan bool, len(vals))
	rcount := 0
	throttle := false

loop:
	for _, val := range vals {
		rcount++

		select {
		case val.validateThrottle <- struct{}{}:
			go func(val *topicVal) {
				rch <- val.validateMsg(ctx, msg)
				<-val.validateThrottle
			}(val)

		default:
			log.Debugf("validation throttled for topic %s", val.topic)
			throttle = true
			break loop
		}
	}

	if throttle {
		log.Warningf("message validation throttled; dropping message from %s", src)
		return
	}

	for i := 0; i < rcount; i++ {
		valid := <-rch
		if !valid {
			log.Warningf("message validation failed; dropping message from %s", src)
			return
		}
	}

	// all validators were successful, send the message
	p.sendMsg <- &sendReq{
		from: src,
		msg:  msg,
	}
}

func (p *PubSub) maybePublishMessage(from peer.ID, pmsg *pb.Message) {
	id := msgID(pmsg)
	if p.seenMessage(id) {
		return
	}

	p.markSeen(id)

	p.notifySubs(pmsg)

	p.publishMessage(from, pmsg)
}

func (p *PubSub) publishMessage(from peer.ID, msg *pb.Message) {
func (fs *FloodSubRouter) Publish(from peer.ID, msg *pb.Message) error {
	tosend := make(map[peer.ID]struct{})
	for _, topic := range msg.GetTopicIDs() {
		tmap, ok := p.topics[topic]
		tmap, ok := fs.p.topics[topic]
		if !ok {
			continue
		}

@ -465,7 +59,7 @@ func (p *PubSub) publishMessage(from peer.ID, msg *pb.Message) {
			continue
		}

		mch, ok := p.peers[pid]
		mch, ok := fs.p.peers[pid]
		if !ok {
			continue
		}

@ -478,209 +72,3 @@ func (p *PubSub) publishMessage(from peer.ID, msg *pb.Message) {
		}
	}
}

// getValidators returns all validators that apply to a given message
func (p *PubSub) getValidators(msg *Message) []*topicVal {
	var vals []*topicVal

	for _, topic := range msg.GetTopicIDs() {
		val, ok := p.topicVals[topic]
		if !ok {
			continue
		}

		vals = append(vals, val)
	}

	return vals
}

type addSubReq struct {
	sub  *Subscription
	resp chan *Subscription
}

type SubOpt func(sub *Subscription) error

// Subscribe returns a new Subscription for the given topic
func (p *PubSub) Subscribe(topic string, opts ...SubOpt) (*Subscription, error) {
	td := pb.TopicDescriptor{Name: &topic}

	return p.SubscribeByTopicDescriptor(&td, opts...)
}

// SubscribeByTopicDescriptor lets you subscribe a topic using a pb.TopicDescriptor
func (p *PubSub) SubscribeByTopicDescriptor(td *pb.TopicDescriptor, opts ...SubOpt) (*Subscription, error) {
	if td.GetAuth().GetMode() != pb.TopicDescriptor_AuthOpts_NONE {
		return nil, fmt.Errorf("auth mode not yet supported")
	}

	if td.GetEnc().GetMode() != pb.TopicDescriptor_EncOpts_NONE {
		return nil, fmt.Errorf("encryption mode not yet supported")
	}

	sub := &Subscription{
		topic: td.GetName(),
	}

	for _, opt := range opts {
		err := opt(sub)
		if err != nil {
			return nil, err
		}
	}

	out := make(chan *Subscription, 1)
	p.addSub <- &addSubReq{
		sub:  sub,
		resp: out,
	}

	return <-out, nil
}

type topicReq struct {
	resp chan []string
}

// GetTopics returns the topics this node is subscribed to
func (p *PubSub) GetTopics() []string {
	out := make(chan []string, 1)
	p.getTopics <- &topicReq{resp: out}
	return <-out
}

// Publish publishes data under the given topic
func (p *PubSub) Publish(topic string, data []byte) error {
	seqno := make([]byte, 8)
	counter := atomic.AddUint64(&p.counter, 1)
	binary.BigEndian.PutUint64(seqno, counter)

	p.publish <- &Message{
		&pb.Message{
			Data:     data,
			TopicIDs: []string{topic},
			From:     []byte(p.host.ID()),
			Seqno:    seqno,
		},
	}
	return nil
}

type listPeerReq struct {
	resp  chan []peer.ID
	topic string
}

// sendReq is a request to call maybePublishMessage. It is issued after the subscription verification is done.
type sendReq struct {
	from peer.ID
	msg  *Message
}

// ListPeers returns a list of peers we are connected to.
func (p *PubSub) ListPeers(topic string) []peer.ID {
	out := make(chan []peer.ID)
	p.getPeers <- &listPeerReq{
		resp:  out,
		topic: topic,
	}
	return <-out
}

// per topic validators
type addValReq struct {
	topic    string
	validate Validator
	timeout  time.Duration
	throttle int
	resp     chan error
}

type topicVal struct {
	topic            string
	validate         Validator
	validateTimeout  time.Duration
	validateThrottle chan struct{}
}

// Validator is a function that validates a message
type Validator func(context.Context, *Message) bool

// ValidatorOpt is an option for RegisterTopicValidator
type ValidatorOpt func(addVal *addValReq) error

// WithValidatorTimeout is an option that sets the topic validator timeout
func WithValidatorTimeout(timeout time.Duration) ValidatorOpt {
	return func(addVal *addValReq) error {
		addVal.timeout = timeout
		return nil
	}
}

// WithValidatorConcurrency is an option that sets topic validator throttle
func WithValidatorConcurrency(n int) ValidatorOpt {
	return func(addVal *addValReq) error {
		addVal.throttle = n
		return nil
	}
}

// RegisterTopicValidator registers a validator for topic
func (p *PubSub) RegisterTopicValidator(topic string, val Validator, opts ...ValidatorOpt) error {
	addVal := &addValReq{
		topic:    topic,
		validate: val,
		resp:     make(chan error, 1),
	}

	for _, opt := range opts {
		err := opt(addVal)
		if err != nil {
			return err
		}
	}

	p.addVal <- addVal
	return <-addVal.resp
}

func (ps *PubSub) addValidator(req *addValReq) {
	topic := req.topic

	_, ok := ps.topicVals[topic]
	if ok {
		req.resp <- fmt.Errorf("Duplicate validator for topic %s", topic)
		return
	}

	val := &topicVal{
		topic:            topic,
		validate:         req.validate,
		validateTimeout:  defaultValidateTimeout,
		validateThrottle: make(chan struct{}, defaultValidateConcurrency),
	}

	if req.timeout > 0 {
		val.validateTimeout = req.timeout
	}

	if req.throttle > 0 {
		val.validateThrottle = make(chan struct{}, req.throttle)
	}

	ps.topicVals[topic] = val
	req.resp <- nil
}

func (val *topicVal) validateMsg(ctx context.Context, msg *Message) bool {
	vctx, cancel := context.WithTimeout(ctx, val.validateTimeout)
	defer cancel()

	valid := val.validate(vctx, msg)
	if !valid {
		log.Debugf("validation failed for topic %s", val.topic)
	}

	return valid
}

@ -17,7 +17,7 @@ func (p *PubSubNotif) ClosedStream(n inet.Network, s inet.Stream) {

func (p *PubSubNotif) Connected(n inet.Network, c inet.Conn) {
	go func() {
		s, err := p.host.NewStream(p.ctx, c.RemotePeer(), ID)
		s, err := p.host.NewStream(p.ctx, c.RemotePeer(), p.rt.Protocols()...)
		if err != nil {
			log.Warning("opening new stream to peer: ", err, c.LocalPeer(), c.RemotePeer())
			return
@ -0,0 +1,683 @@
package floodsub

import (
	"context"
	"encoding/binary"
	"fmt"
	"sync/atomic"
	"time"

	pb "github.com/libp2p/go-floodsub/pb"

	logging "github.com/ipfs/go-log"
	host "github.com/libp2p/go-libp2p-host"
	inet "github.com/libp2p/go-libp2p-net"
	peer "github.com/libp2p/go-libp2p-peer"
	protocol "github.com/libp2p/go-libp2p-protocol"
	timecache "github.com/whyrusleeping/timecache"
)

const (
	defaultValidateTimeout = 150 * time.Millisecond
	defaultValidateConcurrency = 100
	defaultValidateThrottle = 8192
)

var log = logging.Logger("pubsub")

type PubSub struct {
	host host.Host

	rt PubSubRouter

	// incoming messages from other peers
	incoming chan *RPC

	// messages we are publishing out to our peers
	publish chan *Message

	// addSub is a control channel for us to add and remove subscriptions
	addSub chan *addSubReq

	// get list of topics we are subscribed to
	getTopics chan *topicReq

	// get chan of peers we are connected to
	getPeers chan *listPeerReq

	// send subscription here to cancel it
	cancelCh chan *Subscription

	// a notification channel for incoming streams from other peers
	newPeers chan inet.Stream

	// a notification channel for when our peers die
	peerDead chan peer.ID

	// The set of topics we are subscribed to
	myTopics map[string]map[*Subscription]struct{}

	// topics tracks which topics each of our peers are subscribed to
	topics map[string]map[peer.ID]struct{}

	// sendMsg handles messages that have been validated
	sendMsg chan *sendReq

	// addVal handles validator registration requests
	addVal chan *addValReq

	// topicVals tracks per topic validators
	topicVals map[string]*topicVal

	// validateThrottle limits the number of active validation goroutines
	validateThrottle chan struct{}

	peers        map[peer.ID]chan *RPC
	seenMessages *timecache.TimeCache

	ctx context.Context

	// atomic counter for seqnos
	counter uint64
}

// PubSubRouter is the message router component of PubSub
type PubSubRouter interface {
	Protocols() []protocol.ID
	Attach(*PubSub)
	AddPeer(peer.ID, protocol.ID)
	RemovePeer(peer.ID)
	HandleRPC(*RPC) error
	Publish(peer.ID, *pb.Message) error
}

type Message struct {
	*pb.Message
}

func (m *Message) GetFrom() peer.ID {
	return peer.ID(m.Message.GetFrom())
}

type RPC struct {
	pb.RPC

	// unexported on purpose, not sending this over the wire
	from peer.ID
}

type Option func(*PubSub) error

// NewFloodSub returns a new PubSub management object
func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option) (*PubSub, error) {
	ps := &PubSub{
		host:             h,
		ctx:              ctx,
		rt:               rt,
		incoming:         make(chan *RPC, 32),
		publish:          make(chan *Message),
		newPeers:         make(chan inet.Stream),
		peerDead:         make(chan peer.ID),
		cancelCh:         make(chan *Subscription),
		getPeers:         make(chan *listPeerReq),
		addSub:           make(chan *addSubReq),
		getTopics:        make(chan *topicReq),
		sendMsg:          make(chan *sendReq, 32),
		addVal:           make(chan *addValReq),
		validateThrottle: make(chan struct{}, defaultValidateThrottle),
		myTopics:         make(map[string]map[*Subscription]struct{}),
		topics:           make(map[string]map[peer.ID]struct{}),
		peers:            make(map[peer.ID]chan *RPC),
		topicVals:        make(map[string]*topicVal),
		seenMessages:     timecache.NewTimeCache(time.Second * 30),
		counter:          uint64(time.Now().UnixNano()),
	}

	for _, opt := range opts {
		err := opt(ps)
		if err != nil {
			return nil, err
		}
	}

	rt.Attach(ps)

	for _, id := range rt.Protocols() {
		h.SetStreamHandler(id, ps.handleNewStream)
	}
	h.Network().Notify((*PubSubNotif)(ps))

	go ps.processLoop(ctx)

	return ps, nil
}

func WithValidateThrottle(n int) Option {
	return func(ps *PubSub) error {
		ps.validateThrottle = make(chan struct{}, n)
		return nil
	}
}

// processLoop handles all inputs arriving on the channels
func (p *PubSub) processLoop(ctx context.Context) {
	defer func() {
		// Clean up go routines.
		for _, ch := range p.peers {
			close(ch)
		}
		p.peers = nil
		p.topics = nil
	}()
	for {
		select {
		case s := <-p.newPeers:
			pid := s.Conn().RemotePeer()
			ch, ok := p.peers[pid]
			if ok {
				log.Error("already have connection to peer: ", pid)
				close(ch)
			}

			messages := make(chan *RPC, 32)
			go p.handleSendingMessages(ctx, s, messages)
			messages <- p.getHelloPacket()

			p.peers[pid] = messages

			p.rt.AddPeer(pid, s.Protocol())

		case pid := <-p.peerDead:
			ch, ok := p.peers[pid]
			if ok {
				close(ch)
			}

			delete(p.peers, pid)
			for _, t := range p.topics {
				delete(t, pid)
			}

			p.rt.RemovePeer(pid)

		case treq := <-p.getTopics:
			var out []string
			for t := range p.myTopics {
				out = append(out, t)
			}
			treq.resp <- out
		case sub := <-p.cancelCh:
			p.handleRemoveSubscription(sub)
		case sub := <-p.addSub:
			p.handleAddSubscription(sub)
		case preq := <-p.getPeers:
			tmap, ok := p.topics[preq.topic]
			if preq.topic != "" && !ok {
				preq.resp <- nil
				continue
			}
			var peers []peer.ID
			for p := range p.peers {
				if preq.topic != "" {
					_, ok := tmap[p]
					if !ok {
						continue
					}
				}
				peers = append(peers, p)
			}
			preq.resp <- peers
		case rpc := <-p.incoming:
			err := p.handleIncomingRPC(rpc)
			if err != nil {
				log.Error("handling RPC: ", err)
				continue
			}
		case msg := <-p.publish:
			vals := p.getValidators(msg)
			p.pushMsg(vals, p.host.ID(), msg)

		case req := <-p.sendMsg:
			p.maybePublishMessage(req.from, req.msg.Message)

		case req := <-p.addVal:
			p.addValidator(req)

		case <-ctx.Done():
			log.Info("pubsub processloop shutting down")
			return
		}
	}
}

// handleRemoveSubscription removes Subscription sub from bookeeping.
// If this was the last Subscription for a given topic, it will also announce
// that this node is not subscribing to this topic anymore.
// Only called from processLoop.
func (p *PubSub) handleRemoveSubscription(sub *Subscription) {
	subs := p.myTopics[sub.topic]

	if subs == nil {
		return
	}

	sub.err = fmt.Errorf("subscription cancelled by calling sub.Cancel()")
	close(sub.ch)
	delete(subs, sub)

	if len(subs) == 0 {
		delete(p.myTopics, sub.topic)
		p.announce(sub.topic, false)
	}
}

// handleAddSubscription adds a Subscription for a particular topic. If it is
// the first Subscription for the topic, it will announce that this node
// subscribes to the topic.
// Only called from processLoop.
func (p *PubSub) handleAddSubscription(req *addSubReq) {
	sub := req.sub
	subs := p.myTopics[sub.topic]

	// announce we want this topic
	if len(subs) == 0 {
		p.announce(sub.topic, true)
	}

	// make new if not there
	if subs == nil {
		p.myTopics[sub.topic] = make(map[*Subscription]struct{})
		subs = p.myTopics[sub.topic]
	}

	sub.ch = make(chan *Message, 32)
	sub.cancelCh = p.cancelCh

	p.myTopics[sub.topic][sub] = struct{}{}

	req.resp <- sub
}

// announce announces whether or not this node is interested in a given topic
// Only called from processLoop.
func (p *PubSub) announce(topic string, sub bool) {
	subopt := &pb.RPC_SubOpts{
		Topicid:   &topic,
		Subscribe: &sub,
	}

	out := rpcWithSubs(subopt)
	for pid, peer := range p.peers {
		select {
		case peer <- out:
		default:
			log.Infof("dropping announce message to peer %s: queue full", pid)
		}
	}
}

// notifySubs sends a given message to all corresponding subscribers.
// Only called from processLoop.
func (p *PubSub) notifySubs(msg *pb.Message) {
	for _, topic := range msg.GetTopicIDs() {
		subs := p.myTopics[topic]
		for f := range subs {
			f.ch <- &Message{msg}
		}
	}
}

// seenMessage returns whether we already saw this message before
func (p *PubSub) seenMessage(id string) bool {
	return p.seenMessages.Has(id)
}

// markSeen marks a message as seen such that seenMessage returns `true' for the given id
func (p *PubSub) markSeen(id string) {
	p.seenMessages.Add(id)
}

// subscribedToMessage returns whether we are subscribed to one of the topics
// of a given message
func (p *PubSub) subscribedToMsg(msg *pb.Message) bool {
	if len(p.myTopics) == 0 {
		return false
	}

	for _, t := range msg.GetTopicIDs() {
		if _, ok := p.myTopics[t]; ok {
			return true
		}
	}
	return false
}

func (p *PubSub) handleIncomingRPC(rpc *RPC) error {
	for _, subopt := range rpc.GetSubscriptions() {
		t := subopt.GetTopicid()
		if subopt.GetSubscribe() {
			tmap, ok := p.topics[t]
			if !ok {
				tmap = make(map[peer.ID]struct{})
				p.topics[t] = tmap
			}

			tmap[rpc.from] = struct{}{}
		} else {
			tmap, ok := p.topics[t]
			if !ok {
				continue
			}
			delete(tmap, rpc.from)
		}
	}

	for _, pmsg := range rpc.GetPublish() {
		if !p.subscribedToMsg(pmsg) {
			log.Warning("received message we didn't subscribe to. Dropping.")
			continue
		}

		msg := &Message{pmsg}
		vals := p.getValidators(msg)
		p.pushMsg(vals, rpc.from, msg)
	}

	return p.rt.HandleRPC(rpc)
}

// msgID returns a unique ID of the passed Message
func msgID(pmsg *pb.Message) string {
	return string(pmsg.GetFrom()) + string(pmsg.GetSeqno())
}

// pushMsg pushes a message performing validation as necessary
func (p *PubSub) pushMsg(vals []*topicVal, src peer.ID, msg *Message) {
	if len(vals) > 0 {
		// validation is asynchronous and globally throttled with the throttleValidate semaphore.
		// the purpose of the global throttle is to bound the goncurrency possible from incoming
		// network traffic; each validator also has an individual throttle to preclude
		// slow (or faulty) validators from starving other topics; see validate below.
		select {
		case p.validateThrottle <- struct{}{}:
			go func() {
				p.validate(vals, src, msg)
				<-p.validateThrottle
			}()
		default:
			log.Warningf("message validation throttled; dropping message from %s", src)
		}
		return
	}

	p.maybePublishMessage(src, msg.Message)
}

// validate performs validation and only sends the message if all validators succeed
func (p *PubSub) validate(vals []*topicVal, src peer.ID, msg *Message) {
	ctx, cancel := context.WithCancel(p.ctx)
	defer cancel()

	rch := make(chan bool, len(vals))
	rcount := 0
	throttle := false

loop:
	for _, val := range vals {
		rcount++

		select {
		case val.validateThrottle <- struct{}{}:
			go func(val *topicVal) {
				rch <- val.validateMsg(ctx, msg)
				<-val.validateThrottle
			}(val)

		default:
			log.Debugf("validation throttled for topic %s", val.topic)
			throttle = true
			break loop
		}
	}

	if throttle {
		log.Warningf("message validation throttled; dropping message from %s", src)
		return
	}

	for i := 0; i < rcount; i++ {
		valid := <-rch
		if !valid {
			log.Warningf("message validation failed; dropping message from %s", src)
			return
		}
	}

	// all validators were successful, send the message
	p.sendMsg <- &sendReq{
		from: src,
		msg:  msg,
	}
}

func (p *PubSub) maybePublishMessage(from peer.ID, pmsg *pb.Message) {
	id := msgID(pmsg)
	if p.seenMessage(id) {
		return
	}

	p.markSeen(id)

	p.notifySubs(pmsg)

	err := p.rt.Publish(from, pmsg)
	if err != nil {
		log.Error("publish message: ", err)
	}
}

// getValidators returns all validators that apply to a given message
func (p *PubSub) getValidators(msg *Message) []*topicVal {
	var vals []*topicVal

	for _, topic := range msg.GetTopicIDs() {
		val, ok := p.topicVals[topic]
		if !ok {
			continue
		}

		vals = append(vals, val)
	}

	return vals
}

type addSubReq struct {
	sub  *Subscription
	resp chan *Subscription
}

type SubOpt func(sub *Subscription) error

// Subscribe returns a new Subscription for the given topic
func (p *PubSub) Subscribe(topic string, opts ...SubOpt) (*Subscription, error) {
	td := pb.TopicDescriptor{Name: &topic}

	return p.SubscribeByTopicDescriptor(&td, opts...)
}

// SubscribeByTopicDescriptor lets you subscribe a topic using a pb.TopicDescriptor
func (p *PubSub) SubscribeByTopicDescriptor(td *pb.TopicDescriptor, opts ...SubOpt) (*Subscription, error) {
	if td.GetAuth().GetMode() != pb.TopicDescriptor_AuthOpts_NONE {
		return nil, fmt.Errorf("auth mode not yet supported")
	}

	if td.GetEnc().GetMode() != pb.TopicDescriptor_EncOpts_NONE {
		return nil, fmt.Errorf("encryption mode not yet supported")
	}

	sub := &Subscription{
		topic: td.GetName(),
	}

	for _, opt := range opts {
		err := opt(sub)
		if err != nil {
			return nil, err
		}
	}

	out := make(chan *Subscription, 1)
	p.addSub <- &addSubReq{
		sub:  sub,
		resp: out,
	}

	return <-out, nil
}

type topicReq struct {
	resp chan []string
}

// GetTopics returns the topics this node is subscribed to
func (p *PubSub) GetTopics() []string {
	out := make(chan []string, 1)
	p.getTopics <- &topicReq{resp: out}
	return <-out
}

// Publish publishes data under the given topic
func (p *PubSub) Publish(topic string, data []byte) error {
	seqno := make([]byte, 8)
	counter := atomic.AddUint64(&p.counter, 1)
	binary.BigEndian.PutUint64(seqno, counter)

	p.publish <- &Message{
		&pb.Message{
			Data:     data,
			TopicIDs: []string{topic},
			From:     []byte(p.host.ID()),
			Seqno:    seqno,
		},
	}
	return nil
}

type listPeerReq struct {
	resp  chan []peer.ID
	topic string
}

// sendReq is a request to call maybePublishMessage. It is issued after the subscription verification is done.
type sendReq struct {
	from peer.ID
	msg  *Message
}

// ListPeers returns a list of peers we are connected to.
func (p *PubSub) ListPeers(topic string) []peer.ID {
	out := make(chan []peer.ID)
	p.getPeers <- &listPeerReq{
		resp:  out,
		topic: topic,
	}
	return <-out
}

// per topic validators
type addValReq struct {
	topic    string
	validate Validator
	timeout  time.Duration
	throttle int
	resp     chan error
}

type topicVal struct {
	topic            string
	validate         Validator
	validateTimeout  time.Duration
	validateThrottle chan struct{}
}

// Validator is a function that validates a message
type Validator func(context.Context, *Message) bool

// ValidatorOpt is an option for RegisterTopicValidator
type ValidatorOpt func(addVal *addValReq) error

// WithValidatorTimeout is an option that sets the topic validator timeout
func WithValidatorTimeout(timeout time.Duration) ValidatorOpt {
	return func(addVal *addValReq) error {
		addVal.timeout = timeout
		return nil
	}
}

// WithValidatorConcurrency is an option that sets topic validator throttle
func WithValidatorConcurrency(n int) ValidatorOpt {
	return func(addVal *addValReq) error {
		addVal.throttle = n
		return nil
	}
}

// RegisterTopicValidator registers a validator for topic
func (p *PubSub) RegisterTopicValidator(topic string, val Validator, opts ...ValidatorOpt) error {
	addVal := &addValReq{
		topic:    topic,
		validate: val,
		resp:     make(chan error, 1),
	}

	for _, opt := range opts {
		err := opt(addVal)
		if err != nil {
			return err
		}
	}

	p.addVal <- addVal
	return <-addVal.resp
}

func (ps *PubSub) addValidator(req *addValReq) {
	topic := req.topic

	_, ok := ps.topicVals[topic]
	if ok {
		req.resp <- fmt.Errorf("Duplicate validator for topic %s", topic)
		return
	}

	val := &topicVal{
		topic:            topic,
		validate:         req.validate,
		validateTimeout:  defaultValidateTimeout,
		validateThrottle: make(chan struct{}, defaultValidateConcurrency),
	}

	if req.timeout > 0 {
		val.validateTimeout = req.timeout
	}

	if req.throttle > 0 {
		val.validateThrottle = make(chan struct{}, req.throttle)
	}

	ps.topicVals[topic] = val
	req.resp <- nil
}

func (val *topicVal) validateMsg(ctx context.Context, msg *Message) bool {
	vctx, cancel := context.WithTimeout(ctx, val.validateTimeout)
	defer cancel()

	valid := val.validate(vctx, msg)
	if !valid {
		log.Debugf("validation failed for topic %s", val.topic)
	}

	return valid
}
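For orientation, a usage sketch of the modularized API as it stands after this commit (the exampleUsage helper is hypothetical and not part of the change; ctx and h are assumed to be an existing context and libp2p host):

package floodsub

import (
	"context"
	"time"

	host "github.com/libp2p/go-libp2p-host"
)

// exampleUsage shows how the pieces in this diff fit together: construct a
// PubSub with the flooding router, register a per-topic validator, subscribe,
// and publish.
func exampleUsage(ctx context.Context, h host.Host) error {
	ps, err := NewFloodSub(ctx, h, WithValidateThrottle(1024))
	if err != nil {
		return err
	}

	// Per-topic validation: time-bounded and individually throttled.
	err = ps.RegisterTopicValidator("example-topic",
		func(ctx context.Context, m *Message) bool { return len(m.GetData()) > 0 },
		WithValidatorTimeout(100*time.Millisecond),
		WithValidatorConcurrency(16),
	)
	if err != nil {
		return err
	}

	if _, err := ps.Subscribe("example-topic"); err != nil {
		return err
	}

	return ps.Publish("example-topic", []byte("hello"))
}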