From 2621f893e6eda293afac270a48ef031685038c0f Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 13 Dec 2018 16:58:32 +0200 Subject: [PATCH] rework peer tracking logic to handle multiple connections --- comm.go | 27 +++++++++++++------ floodsub_test.go | 4 +-- notify.go | 15 +++++------ pubsub.go | 69 +++++++++++++++++++++++++++++++++++++++--------- 4 files changed, 83 insertions(+), 32 deletions(-) diff --git a/comm.go b/comm.go index b2a9786..9bdcd7c 100644 --- a/comm.go +++ b/comm.go @@ -10,6 +10,7 @@ import ( ggio "github.com/gogo/protobuf/io" proto "github.com/gogo/protobuf/proto" inet "github.com/libp2p/go-libp2p-net" + peer "github.com/libp2p/go-libp2p-peer" ) // get the initial RPC containing all of our subscriptions to send to new peers @@ -39,10 +40,6 @@ func (p *PubSub) handleNewStream(s inet.Stream) { // but it doesn't hurt to send it. s.Close() } - select { - case p.peerDead <- s.Conn().RemotePeer(): - case <-p.ctx.Done(): - } return } @@ -57,6 +54,24 @@ func (p *PubSub) handleNewStream(s inet.Stream) { } } +func (p *PubSub) handleNewPeer(ctx context.Context, pid peer.ID, outgoing <-chan *RPC) { + s, err := p.host.NewStream(p.ctx, pid, p.rt.Protocols()...) + if err != nil { + log.Warning("opening new stream to peer: ", err, pid) + select { + case p.newPeerError <- pid: + case <-ctx.Done(): + } + return + } + + go p.handleSendingMessages(ctx, s, outgoing) + select { + case p.newPeerStream <- s: + case <-ctx.Done(): + } +} + func (p *PubSub) handleSendingMessages(ctx context.Context, s inet.Stream, outgoing <-chan *RPC) { bufw := bufio.NewWriter(s) wc := ggio.NewDelimitedWriter(bufw) @@ -82,10 +97,6 @@ func (p *PubSub) handleSendingMessages(ctx context.Context, s inet.Stream, outgo if err != nil { s.Reset() log.Infof("writing message to %s: %s", s.Conn().RemotePeer(), err) - select { - case p.peerDead <- s.Conn().RemotePeer(): - case <-ctx.Done(): - } return } case <-ctx.Done(): diff --git a/floodsub_test.go b/floodsub_test.go index df01076..3bf446d 100644 --- a/floodsub_test.go +++ b/floodsub_test.go @@ -874,9 +874,7 @@ func TestPeerDisconnect(t *testing.T) { peers := psubs[0].ListPeers("foo") assertPeerList(t, peers, hosts[1].ID()) for _, c := range hosts[1].Network().ConnsToPeer(hosts[0].ID()) { - for _, s := range c.GetStreams() { - s.Close() - } + c.Close() } time.Sleep(time.Millisecond * 10) diff --git a/notify.go b/notify.go index 7abe75f..efe89b7 100644 --- a/notify.go +++ b/notify.go @@ -17,21 +17,20 @@ func (p *PubSubNotif) ClosedStream(n inet.Network, s inet.Stream) { func (p *PubSubNotif) Connected(n inet.Network, c inet.Conn) { go func() { - s, err := p.host.NewStream(p.ctx, c.RemotePeer(), p.rt.Protocols()...) - if err != nil { - log.Warning("opening new stream to peer: ", err, c.LocalPeer(), c.RemotePeer()) - return - } - select { - case p.newPeers <- s: + case p.newPeers <- c.RemotePeer(): case <-p.ctx.Done(): - s.Reset() } }() } func (p *PubSubNotif) Disconnected(n inet.Network, c inet.Conn) { + go func() { + select { + case p.peerDead <- c.RemotePeer(): + case <-p.ctx.Done(): + } + }() } func (p *PubSubNotif) Listen(n inet.Network, _ ma.Multiaddr) { diff --git a/pubsub.go b/pubsub.go index d90249f..a59b388 100644 --- a/pubsub.go +++ b/pubsub.go @@ -57,8 +57,14 @@ type PubSub struct { // send subscription here to cancel it cancelCh chan *Subscription - // a notification channel for incoming streams from other peers - newPeers chan inet.Stream + // a notification channel for new peer connections + newPeers chan peer.ID + + // a notification channel for new outoging peer streams + newPeerStream chan inet.Stream + + // a notification channel for errors opening new peer streams + newPeerError chan peer.ID // a notification channel for when our peers die peerDead chan peer.ID @@ -151,7 +157,9 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option signKey: h.Peerstore().PrivKey(h.ID()), incoming: make(chan *RPC, 32), publish: make(chan *Message), - newPeers: make(chan inet.Stream), + newPeers: make(chan peer.ID), + newPeerStream: make(chan inet.Stream), + newPeerError: make(chan peer.ID), peerDead: make(chan peer.ID), cancelCh: make(chan *Subscription), getPeers: make(chan *listPeerReq), @@ -259,30 +267,65 @@ func (p *PubSub) processLoop(ctx context.Context) { p.peers = nil p.topics = nil }() + for { select { - case s := <-p.newPeers: - pid := s.Conn().RemotePeer() - ch, ok := p.peers[pid] + case pid := <-p.newPeers: + _, ok := p.peers[pid] if ok { - log.Error("already have connection to peer: ", pid) - close(ch) + log.Warning("already have connection to peer: ", pid) + continue } messages := make(chan *RPC, 32) - go p.handleSendingMessages(ctx, s, messages) - messages <- p.getHelloPacket() - + go p.handleNewPeer(ctx, pid, messages) p.peers[pid] = messages + case s := <-p.newPeerStream: + pid := s.Conn().RemotePeer() + + ch, ok := p.peers[pid] + if !ok { + log.Warning("new stream for unknown peer: ", pid) + s.Reset() + continue + } + + select { + case ch <- p.getHelloPacket(): + default: + log.Warning("error sending hello packet; buffer full: ", pid) + go func() { + time.Sleep(time.Duration(1+rand.Intn(1000)) * time.Millisecond) + select { + case p.newPeerStream <- s: + case <-ctx.Done(): + } + }() + continue + } + p.rt.AddPeer(pid, s.Protocol()) + case pid := <-p.newPeerError: + delete(p.peers, pid) + case pid := <-p.peerDead: ch, ok := p.peers[pid] - if ok { - close(ch) + if !ok { + continue } + if p.host.Network().Connectedness(pid) == inet.Connected { + // still connected, must be a duplicate connection being closed. + // we respawn the writer as we need to ensure there is at leat one active + // at worst we can end with two writers pushing messages from the same channel. + log.Warning("peer declared dead but still connected; respawning writer: ", pid) + go p.handleNewPeer(ctx, pid, ch) + continue + } + + close(ch) delete(p.peers, pid) for _, t := range p.topics { delete(t, pid)