diff --git a/pubsub.go b/pubsub.go index 8b4a8a7..1a85642 100644 --- a/pubsub.go +++ b/pubsub.go @@ -288,6 +288,12 @@ func (p *PubSub) processLoop(ctx context.Context) { continue } + _, ok = p.blacklist[pid] + if ok { + log.Warning("ignoring connection from blacklisted peer: ", pid) + continue + } + messages := make(chan *RPC, 32) messages <- p.getHelloPacket() go p.handleNewPeer(ctx, pid, messages) @@ -296,13 +302,21 @@ func (p *PubSub) processLoop(ctx context.Context) { case s := <-p.newPeerStream: pid := s.Conn().RemotePeer() - _, ok := p.peers[pid] + ch, ok := p.peers[pid] if !ok { log.Warning("new stream for unknown peer: ", pid) s.Reset() continue } + _, ok = p.blacklist[pid] + if ok { + log.Warning("closing stream for blacklisted peer: ", pid) + close(ch) + s.Reset() + continue + } + p.rt.AddPeer(pid, s.Protocol()) case pid := <-p.newPeerError: @@ -384,6 +398,16 @@ func (p *PubSub) processLoop(ctx context.Context) { log.Infof("Blacklisting peer %s", pid) p.blacklist[pid] = struct{}{} + ch, ok := p.peers[pid] + if ok { + close(ch) + delete(p.peers, pid) + for _, t := range p.topics { + delete(t, pid) + } + p.rt.RemovePeer(pid) + } + case <-ctx.Done(): log.Info("pubsub processloop shutting down") return