Merge pull request #132 from libp2p/fix/peer-tracking
rework peer tracking logic to handle multiple connections
This commit is contained in:
commit
a12c523316
53
comm.go
53
comm.go
|
@ -10,6 +10,8 @@ import (
|
|||
ggio "github.com/gogo/protobuf/io"
|
||||
proto "github.com/gogo/protobuf/proto"
|
||||
inet "github.com/libp2p/go-libp2p-net"
|
||||
peer "github.com/libp2p/go-libp2p-peer"
|
||||
ms "github.com/multiformats/go-multistream"
|
||||
)
|
||||
|
||||
// get the initial RPC containing all of our subscriptions to send to new peers
|
||||
|
@ -39,10 +41,6 @@ func (p *PubSub) handleNewStream(s inet.Stream) {
|
|||
// but it doesn't hurt to send it.
|
||||
s.Close()
|
||||
}
|
||||
select {
|
||||
case p.peerDead <- s.Conn().RemotePeer():
|
||||
case <-p.ctx.Done():
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -57,6 +55,49 @@ func (p *PubSub) handleNewStream(s inet.Stream) {
|
|||
}
|
||||
}
|
||||
|
||||
func (p *PubSub) handleNewPeer(ctx context.Context, pid peer.ID, outgoing <-chan *RPC) {
|
||||
s, err := p.host.NewStream(p.ctx, pid, p.rt.Protocols()...)
|
||||
if err != nil {
|
||||
log.Warning("opening new stream to peer: ", err, pid)
|
||||
|
||||
var ch chan peer.ID
|
||||
if err == ms.ErrNotSupported {
|
||||
ch = p.newPeerError
|
||||
} else {
|
||||
ch = p.peerDead
|
||||
}
|
||||
|
||||
select {
|
||||
case ch <- pid:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
go p.handleSendingMessages(ctx, s, outgoing)
|
||||
go p.handlePeerEOF(ctx, s)
|
||||
select {
|
||||
case p.newPeerStream <- s:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
}
|
||||
|
||||
func (p *PubSub) handlePeerEOF(ctx context.Context, s inet.Stream) {
|
||||
r := ggio.NewDelimitedReader(s, 1<<20)
|
||||
rpc := new(RPC)
|
||||
for {
|
||||
err := r.ReadMsg(&rpc.RPC)
|
||||
if err != nil {
|
||||
select {
|
||||
case p.peerDead <- s.Conn().RemotePeer():
|
||||
case <-ctx.Done():
|
||||
}
|
||||
return
|
||||
}
|
||||
log.Warning("unexpected message from ", s.Conn().RemotePeer())
|
||||
}
|
||||
}
|
||||
|
||||
func (p *PubSub) handleSendingMessages(ctx context.Context, s inet.Stream, outgoing <-chan *RPC) {
|
||||
bufw := bufio.NewWriter(s)
|
||||
wc := ggio.NewDelimitedWriter(bufw)
|
||||
|
@ -82,10 +123,6 @@ func (p *PubSub) handleSendingMessages(ctx context.Context, s inet.Stream, outgo
|
|||
if err != nil {
|
||||
s.Reset()
|
||||
log.Infof("writing message to %s: %s", s.Conn().RemotePeer(), err)
|
||||
select {
|
||||
case p.peerDead <- s.Conn().RemotePeer():
|
||||
case <-ctx.Done():
|
||||
}
|
||||
return
|
||||
}
|
||||
case <-ctx.Done():
|
||||
|
|
|
@ -874,9 +874,7 @@ func TestPeerDisconnect(t *testing.T) {
|
|||
peers := psubs[0].ListPeers("foo")
|
||||
assertPeerList(t, peers, hosts[1].ID())
|
||||
for _, c := range hosts[1].Network().ConnsToPeer(hosts[0].ID()) {
|
||||
for _, s := range c.GetStreams() {
|
||||
s.Close()
|
||||
}
|
||||
c.Close()
|
||||
}
|
||||
|
||||
time.Sleep(time.Millisecond * 10)
|
||||
|
|
|
@ -17,16 +17,9 @@ func (p *PubSubNotif) ClosedStream(n inet.Network, s inet.Stream) {
|
|||
|
||||
func (p *PubSubNotif) Connected(n inet.Network, c inet.Conn) {
|
||||
go func() {
|
||||
s, err := p.host.NewStream(p.ctx, c.RemotePeer(), p.rt.Protocols()...)
|
||||
if err != nil {
|
||||
log.Warning("opening new stream to peer: ", err, c.LocalPeer(), c.RemotePeer())
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case p.newPeers <- s:
|
||||
case p.newPeers <- c.RemotePeer():
|
||||
case <-p.ctx.Done():
|
||||
s.Reset()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
|
|
@ -71,6 +71,12 @@
|
|||
"hash": "QmNiJiXwWE3kRhZrC5ej3kSjWHm337pYfhjLGSCDNKJP2s",
|
||||
"name": "go-libp2p-crypto",
|
||||
"version": "2.0.4"
|
||||
},
|
||||
{
|
||||
"author": "whyrusleeping",
|
||||
"hash": "QmabLh8TrJ3emfAoQk5AbqbLTbMyj7XqumMFmAFxa9epo8",
|
||||
"name": "go-multistream",
|
||||
"version": "0.3.9"
|
||||
}
|
||||
],
|
||||
"gxVersion": "0.9.0",
|
||||
|
|
57
pubsub.go
57
pubsub.go
|
@ -57,8 +57,14 @@ type PubSub struct {
|
|||
// send subscription here to cancel it
|
||||
cancelCh chan *Subscription
|
||||
|
||||
// a notification channel for incoming streams from other peers
|
||||
newPeers chan inet.Stream
|
||||
// a notification channel for new peer connections
|
||||
newPeers chan peer.ID
|
||||
|
||||
// a notification channel for new outoging peer streams
|
||||
newPeerStream chan inet.Stream
|
||||
|
||||
// a notification channel for errors opening new peer streams
|
||||
newPeerError chan peer.ID
|
||||
|
||||
// a notification channel for when our peers die
|
||||
peerDead chan peer.ID
|
||||
|
@ -151,7 +157,9 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
|
|||
signKey: h.Peerstore().PrivKey(h.ID()),
|
||||
incoming: make(chan *RPC, 32),
|
||||
publish: make(chan *Message),
|
||||
newPeers: make(chan inet.Stream),
|
||||
newPeers: make(chan peer.ID),
|
||||
newPeerStream: make(chan inet.Stream),
|
||||
newPeerError: make(chan peer.ID),
|
||||
peerDead: make(chan peer.ID),
|
||||
cancelCh: make(chan *Subscription),
|
||||
getPeers: make(chan *listPeerReq),
|
||||
|
@ -259,28 +267,53 @@ func (p *PubSub) processLoop(ctx context.Context) {
|
|||
p.peers = nil
|
||||
p.topics = nil
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case s := <-p.newPeers:
|
||||
pid := s.Conn().RemotePeer()
|
||||
ch, ok := p.peers[pid]
|
||||
case pid := <-p.newPeers:
|
||||
_, ok := p.peers[pid]
|
||||
if ok {
|
||||
log.Error("already have connection to peer: ", pid)
|
||||
close(ch)
|
||||
log.Warning("already have connection to peer: ", pid)
|
||||
continue
|
||||
}
|
||||
|
||||
messages := make(chan *RPC, 32)
|
||||
go p.handleSendingMessages(ctx, s, messages)
|
||||
messages <- p.getHelloPacket()
|
||||
|
||||
go p.handleNewPeer(ctx, pid, messages)
|
||||
p.peers[pid] = messages
|
||||
|
||||
case s := <-p.newPeerStream:
|
||||
pid := s.Conn().RemotePeer()
|
||||
|
||||
_, ok := p.peers[pid]
|
||||
if !ok {
|
||||
log.Warning("new stream for unknown peer: ", pid)
|
||||
s.Reset()
|
||||
continue
|
||||
}
|
||||
|
||||
p.rt.AddPeer(pid, s.Protocol())
|
||||
|
||||
case pid := <-p.newPeerError:
|
||||
delete(p.peers, pid)
|
||||
|
||||
case pid := <-p.peerDead:
|
||||
ch, ok := p.peers[pid]
|
||||
if ok {
|
||||
close(ch)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
close(ch)
|
||||
|
||||
if p.host.Network().Connectedness(pid) == inet.Connected {
|
||||
// still connected, must be a duplicate connection being closed.
|
||||
// we respawn the writer as we need to ensure there is a stream active
|
||||
log.Warning("peer declared dead but still connected; respawning writer: ", pid)
|
||||
messages := make(chan *RPC, 32)
|
||||
messages <- p.getHelloPacket()
|
||||
go p.handleNewPeer(ctx, pid, messages)
|
||||
p.peers[pid] = messages
|
||||
continue
|
||||
}
|
||||
|
||||
delete(p.peers, pid)
|
||||
|
|
Loading…
Reference in New Issue