close streams and ignore blacklisted peers
This commit is contained in:
parent
cfb9a1dc96
commit
654b4e9bf3
26
pubsub.go
26
pubsub.go
|
@ -288,6 +288,12 @@ func (p *PubSub) processLoop(ctx context.Context) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_, ok = p.blacklist[pid]
|
||||||
|
if ok {
|
||||||
|
log.Warning("ignoring connection from blacklisted peer: ", pid)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
messages := make(chan *RPC, 32)
|
messages := make(chan *RPC, 32)
|
||||||
messages <- p.getHelloPacket()
|
messages <- p.getHelloPacket()
|
||||||
go p.handleNewPeer(ctx, pid, messages)
|
go p.handleNewPeer(ctx, pid, messages)
|
||||||
|
@ -296,13 +302,21 @@ func (p *PubSub) processLoop(ctx context.Context) {
|
||||||
case s := <-p.newPeerStream:
|
case s := <-p.newPeerStream:
|
||||||
pid := s.Conn().RemotePeer()
|
pid := s.Conn().RemotePeer()
|
||||||
|
|
||||||
_, ok := p.peers[pid]
|
ch, ok := p.peers[pid]
|
||||||
if !ok {
|
if !ok {
|
||||||
log.Warning("new stream for unknown peer: ", pid)
|
log.Warning("new stream for unknown peer: ", pid)
|
||||||
s.Reset()
|
s.Reset()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_, ok = p.blacklist[pid]
|
||||||
|
if ok {
|
||||||
|
log.Warning("closing stream for blacklisted peer: ", pid)
|
||||||
|
close(ch)
|
||||||
|
s.Reset()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
p.rt.AddPeer(pid, s.Protocol())
|
p.rt.AddPeer(pid, s.Protocol())
|
||||||
|
|
||||||
case pid := <-p.newPeerError:
|
case pid := <-p.newPeerError:
|
||||||
|
@ -384,6 +398,16 @@ func (p *PubSub) processLoop(ctx context.Context) {
|
||||||
log.Infof("Blacklisting peer %s", pid)
|
log.Infof("Blacklisting peer %s", pid)
|
||||||
p.blacklist[pid] = struct{}{}
|
p.blacklist[pid] = struct{}{}
|
||||||
|
|
||||||
|
ch, ok := p.peers[pid]
|
||||||
|
if ok {
|
||||||
|
close(ch)
|
||||||
|
delete(p.peers, pid)
|
||||||
|
for _, t := range p.topics {
|
||||||
|
delete(t, pid)
|
||||||
|
}
|
||||||
|
p.rt.RemovePeer(pid)
|
||||||
|
}
|
||||||
|
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
log.Info("pubsub processloop shutting down")
|
log.Info("pubsub processloop shutting down")
|
||||||
return
|
return
|
||||||
|
|
Loading…
Reference in New Issue