mirror of
https://github.com/logos-messaging/go-libp2p-pubsub.git
synced 2026-01-07 23:33:08 +00:00
improve handling of dead peers (#508)
This commit is contained in:
parent
aed7fc42c1
commit
76124145d0
18
comm.go
18
comm.go
@ -13,7 +13,6 @@ import (
|
||||
"github.com/libp2p/go-libp2p/core/network"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/libp2p/go-msgio"
|
||||
"github.com/libp2p/go-msgio/protoio"
|
||||
|
||||
pb "github.com/libp2p/go-libp2p-pubsub/pb"
|
||||
)
|
||||
@ -126,7 +125,7 @@ func (p *PubSub) handleNewPeer(ctx context.Context, pid peer.ID, outgoing <-chan
|
||||
}
|
||||
|
||||
go p.handleSendingMessages(ctx, s, outgoing)
|
||||
go p.handlePeerEOF(ctx, s)
|
||||
go p.handlePeerDead(s)
|
||||
select {
|
||||
case p.newPeerStream <- s:
|
||||
case <-ctx.Done():
|
||||
@ -142,19 +141,16 @@ func (p *PubSub) handleNewPeerWithBackoff(ctx context.Context, pid peer.ID, back
|
||||
}
|
||||
}
|
||||
|
||||
func (p *PubSub) handlePeerEOF(ctx context.Context, s network.Stream) {
|
||||
func (p *PubSub) handlePeerDead(s network.Stream) {
|
||||
pid := s.Conn().RemotePeer()
|
||||
r := protoio.NewDelimitedReader(s, p.maxMessageSize)
|
||||
rpc := new(RPC)
|
||||
for {
|
||||
err := r.ReadMsg(&rpc.RPC)
|
||||
if err != nil {
|
||||
p.notifyPeerDead(pid)
|
||||
return
|
||||
}
|
||||
|
||||
_, err := s.Read([]byte{0})
|
||||
if err == nil {
|
||||
log.Debugf("unexpected message from %s", pid)
|
||||
}
|
||||
|
||||
s.Reset()
|
||||
p.notifyPeerDead(pid)
|
||||
}
|
||||
|
||||
func (p *PubSub) handleSendingMessages(ctx context.Context, s network.Stream, outgoing <-chan *RPC) {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user