implement peer blacklist
This commit is contained in:
parent
fef794ab97
commit
cfb9a1dc96
21
pubsub.go
21
pubsub.go
|
@ -98,6 +98,10 @@ type PubSub struct {
|
||||||
// eval thunk in event loop
|
// eval thunk in event loop
|
||||||
eval chan func()
|
eval chan func()
|
||||||
|
|
||||||
|
// peer blacklist
|
||||||
|
blacklist map[peer.ID]struct{}
|
||||||
|
blacklistPeer chan peer.ID
|
||||||
|
|
||||||
peers map[peer.ID]chan *RPC
|
peers map[peer.ID]chan *RPC
|
||||||
seenMessages *timecache.TimeCache
|
seenMessages *timecache.TimeCache
|
||||||
|
|
||||||
|
@ -179,6 +183,8 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
|
||||||
topics: make(map[string]map[peer.ID]struct{}),
|
topics: make(map[string]map[peer.ID]struct{}),
|
||||||
peers: make(map[peer.ID]chan *RPC),
|
peers: make(map[peer.ID]chan *RPC),
|
||||||
topicVals: make(map[string]*topicVal),
|
topicVals: make(map[string]*topicVal),
|
||||||
|
blacklist: make(map[peer.ID]struct{}),
|
||||||
|
blacklistPeer: make(chan peer.ID),
|
||||||
seenMessages: timecache.NewTimeCache(TimeCacheDuration),
|
seenMessages: timecache.NewTimeCache(TimeCacheDuration),
|
||||||
counter: uint64(time.Now().UnixNano()),
|
counter: uint64(time.Now().UnixNano()),
|
||||||
}
|
}
|
||||||
|
@ -374,6 +380,10 @@ func (p *PubSub) processLoop(ctx context.Context) {
|
||||||
case thunk := <-p.eval:
|
case thunk := <-p.eval:
|
||||||
thunk()
|
thunk()
|
||||||
|
|
||||||
|
case pid := <-p.blacklistPeer:
|
||||||
|
log.Infof("Blacklisting peer %s", pid)
|
||||||
|
p.blacklist[pid] = struct{}{}
|
||||||
|
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
log.Info("pubsub processloop shutting down")
|
log.Info("pubsub processloop shutting down")
|
||||||
return
|
return
|
||||||
|
@ -567,6 +577,12 @@ func msgID(pmsg *pb.Message) string {
|
||||||
|
|
||||||
// pushMsg pushes a message performing validation as necessary
|
// pushMsg pushes a message performing validation as necessary
|
||||||
func (p *PubSub) pushMsg(vals []*topicVal, src peer.ID, msg *Message) {
|
func (p *PubSub) pushMsg(vals []*topicVal, src peer.ID, msg *Message) {
|
||||||
|
// reject messages from blacklisted peers
|
||||||
|
if _, ok := p.blacklist[src]; ok {
|
||||||
|
log.Warningf("dropping message from blacklisted peer %s", src)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// reject unsigned messages when strict before we even process the id
|
// reject unsigned messages when strict before we even process the id
|
||||||
if p.signStrict && msg.Signature == nil {
|
if p.signStrict && msg.Signature == nil {
|
||||||
log.Debugf("dropping unsigned message from %s", src)
|
log.Debugf("dropping unsigned message from %s", src)
|
||||||
|
@ -821,6 +837,11 @@ func (p *PubSub) ListPeers(topic string) []peer.ID {
|
||||||
return <-out
|
return <-out
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// BlacklistPeer blacklists a peer; all messages from this peer will be unconditionally dropped.
|
||||||
|
func (p *PubSub) BlacklistPeer(pid peer.ID) {
|
||||||
|
p.blacklistPeer <- pid
|
||||||
|
}
|
||||||
|
|
||||||
// per topic validators
|
// per topic validators
|
||||||
type addValReq struct {
|
type addValReq struct {
|
||||||
topic string
|
topic string
|
||||||
|
|
Loading…
Reference in New Issue