Add EvtPeerConnectednessChanged
This commit is contained in:
parent
27056d061f
commit
e9c89c8f00
@ -14,6 +14,7 @@ import (
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/crypto"
|
||||
"github.com/libp2p/go-libp2p-core/discovery"
|
||||
"github.com/libp2p/go-libp2p-core/event"
|
||||
"github.com/libp2p/go-libp2p-core/host"
|
||||
"github.com/libp2p/go-libp2p-core/network"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
@ -472,6 +473,8 @@ func WithMaxMessageSize(maxMessageSize int) Option {
|
||||
|
||||
// processLoop handles all inputs arriving on the channels
|
||||
func (p *PubSub) processLoop(ctx context.Context) {
|
||||
em, _ := p.host.EventBus().Emitter(new(event.EvtPeerConnectednessChanged))
|
||||
|
||||
defer func() {
|
||||
// Clean up go routines.
|
||||
for _, ch := range p.peers {
|
||||
@ -479,6 +482,7 @@ func (p *PubSub) processLoop(ctx context.Context) {
|
||||
}
|
||||
p.peers = nil
|
||||
p.topics = nil
|
||||
em.Close() // MUST call this after being done with the emitter
|
||||
}()
|
||||
|
||||
for {
|
||||
@ -499,6 +503,8 @@ func (p *PubSub) processLoop(ctx context.Context) {
|
||||
go p.handleNewPeer(ctx, pid, messages)
|
||||
p.peers[pid] = messages
|
||||
|
||||
em.Emit(event.EvtPeerConnectednessChanged{Peer: pid, Connectedness: network.Connected})
|
||||
|
||||
case s := <-p.newPeerStream:
|
||||
pid := s.Conn().RemotePeer()
|
||||
|
||||
@ -520,6 +526,7 @@ func (p *PubSub) processLoop(ctx context.Context) {
|
||||
|
||||
case pid := <-p.newPeerError:
|
||||
delete(p.peers, pid)
|
||||
em.Emit(event.EvtPeerConnectednessChanged{Peer: pid, Connectedness: network.NotConnected})
|
||||
|
||||
case pid := <-p.peerDead:
|
||||
ch, ok := p.peers[pid]
|
||||
@ -549,6 +556,7 @@ func (p *PubSub) processLoop(ctx context.Context) {
|
||||
}
|
||||
|
||||
p.rt.RemovePeer(pid)
|
||||
em.Emit(event.EvtPeerConnectednessChanged{Peer: pid, Connectedness: network.NotConnected})
|
||||
|
||||
case treq := <-p.getTopics:
|
||||
var out []string
|
||||
|
Loading…
x
Reference in New Issue
Block a user