From e9c89c8f00f5cd53187d4423756f863a5fb0392a Mon Sep 17 00:00:00 2001 From: Vitaliy Vlasov Date: Fri, 11 Jun 2021 14:16:36 +0300 Subject: [PATCH] Add EvtPeerConnectednessChanged --- pubsub.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/pubsub.go b/pubsub.go index fe60d9c..666c336 100644 --- a/pubsub.go +++ b/pubsub.go @@ -14,6 +14,7 @@ import ( "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/discovery" + "github.com/libp2p/go-libp2p-core/event" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" @@ -472,6 +473,8 @@ func WithMaxMessageSize(maxMessageSize int) Option { // processLoop handles all inputs arriving on the channels func (p *PubSub) processLoop(ctx context.Context) { + em, _ := p.host.EventBus().Emitter(new(event.EvtPeerConnectednessChanged)) + defer func() { // Clean up go routines. for _, ch := range p.peers { @@ -479,6 +482,7 @@ func (p *PubSub) processLoop(ctx context.Context) { } p.peers = nil p.topics = nil + em.Close() // MUST call this after being done with the emitter }() for { @@ -499,6 +503,8 @@ func (p *PubSub) processLoop(ctx context.Context) { go p.handleNewPeer(ctx, pid, messages) p.peers[pid] = messages + em.Emit(event.EvtPeerConnectednessChanged{Peer: pid, Connectedness: network.Connected}) + case s := <-p.newPeerStream: pid := s.Conn().RemotePeer() @@ -520,6 +526,7 @@ func (p *PubSub) processLoop(ctx context.Context) { case pid := <-p.newPeerError: delete(p.peers, pid) + em.Emit(event.EvtPeerConnectednessChanged{Peer: pid, Connectedness: network.NotConnected}) case pid := <-p.peerDead: ch, ok := p.peers[pid] @@ -549,6 +556,7 @@ func (p *PubSub) processLoop(ctx context.Context) { } p.rt.RemovePeer(pid) + em.Emit(event.EvtPeerConnectednessChanged{Peer: pid, Connectedness: network.NotConnected}) case treq := <-p.getTopics: var out []string