diff --git a/comm.go b/comm.go index b2a9786..48b620c 100644 --- a/comm.go +++ b/comm.go @@ -10,6 +10,8 @@ import ( ggio "github.com/gogo/protobuf/io" proto "github.com/gogo/protobuf/proto" inet "github.com/libp2p/go-libp2p-net" + peer "github.com/libp2p/go-libp2p-peer" + ms "github.com/multiformats/go-multistream" ) // get the initial RPC containing all of our subscriptions to send to new peers @@ -39,10 +41,6 @@ func (p *PubSub) handleNewStream(s inet.Stream) { // but it doesn't hurt to send it. s.Close() } - select { - case p.peerDead <- s.Conn().RemotePeer(): - case <-p.ctx.Done(): - } return } @@ -57,6 +55,49 @@ func (p *PubSub) handleNewStream(s inet.Stream) { } } +func (p *PubSub) handleNewPeer(ctx context.Context, pid peer.ID, outgoing <-chan *RPC) { + s, err := p.host.NewStream(p.ctx, pid, p.rt.Protocols()...) + if err != nil { + log.Warning("opening new stream to peer: ", err, pid) + + var ch chan peer.ID + if err == ms.ErrNotSupported { + ch = p.newPeerError + } else { + ch = p.peerDead + } + + select { + case ch <- pid: + case <-ctx.Done(): + } + return + } + + go p.handleSendingMessages(ctx, s, outgoing) + go p.handlePeerEOF(ctx, s) + select { + case p.newPeerStream <- s: + case <-ctx.Done(): + } +} + +func (p *PubSub) handlePeerEOF(ctx context.Context, s inet.Stream) { + r := ggio.NewDelimitedReader(s, 1<<20) + rpc := new(RPC) + for { + err := r.ReadMsg(&rpc.RPC) + if err != nil { + select { + case p.peerDead <- s.Conn().RemotePeer(): + case <-ctx.Done(): + } + return + } + log.Warning("unexpected message from ", s.Conn().RemotePeer()) + } +} + func (p *PubSub) handleSendingMessages(ctx context.Context, s inet.Stream, outgoing <-chan *RPC) { bufw := bufio.NewWriter(s) wc := ggio.NewDelimitedWriter(bufw) @@ -82,10 +123,6 @@ func (p *PubSub) handleSendingMessages(ctx context.Context, s inet.Stream, outgo if err != nil { s.Reset() log.Infof("writing message to %s: %s", s.Conn().RemotePeer(), err) - select { - case p.peerDead <- s.Conn().RemotePeer(): - case <-ctx.Done(): - } return } case <-ctx.Done(): diff --git a/floodsub_test.go b/floodsub_test.go index df01076..3bf446d 100644 --- a/floodsub_test.go +++ b/floodsub_test.go @@ -874,9 +874,7 @@ func TestPeerDisconnect(t *testing.T) { peers := psubs[0].ListPeers("foo") assertPeerList(t, peers, hosts[1].ID()) for _, c := range hosts[1].Network().ConnsToPeer(hosts[0].ID()) { - for _, s := range c.GetStreams() { - s.Close() - } + c.Close() } time.Sleep(time.Millisecond * 10) diff --git a/notify.go b/notify.go index 7abe75f..35f02cc 100644 --- a/notify.go +++ b/notify.go @@ -17,16 +17,9 @@ func (p *PubSubNotif) ClosedStream(n inet.Network, s inet.Stream) { func (p *PubSubNotif) Connected(n inet.Network, c inet.Conn) { go func() { - s, err := p.host.NewStream(p.ctx, c.RemotePeer(), p.rt.Protocols()...) - if err != nil { - log.Warning("opening new stream to peer: ", err, c.LocalPeer(), c.RemotePeer()) - return - } - select { - case p.newPeers <- s: + case p.newPeers <- c.RemotePeer(): case <-p.ctx.Done(): - s.Reset() } }() } diff --git a/package.json b/package.json index 984f80b..d1db2e2 100644 --- a/package.json +++ b/package.json @@ -71,6 +71,12 @@ "hash": "QmNiJiXwWE3kRhZrC5ej3kSjWHm337pYfhjLGSCDNKJP2s", "name": "go-libp2p-crypto", "version": "2.0.4" + }, + { + "author": "whyrusleeping", + "hash": "QmabLh8TrJ3emfAoQk5AbqbLTbMyj7XqumMFmAFxa9epo8", + "name": "go-multistream", + "version": "0.3.9" } ], "gxVersion": "0.9.0", diff --git a/pubsub.go b/pubsub.go index d90249f..1f396e8 100644 --- a/pubsub.go +++ b/pubsub.go @@ -57,8 +57,14 @@ type PubSub struct { // send subscription here to cancel it cancelCh chan *Subscription - // a notification channel for incoming streams from other peers - newPeers chan inet.Stream + // a notification channel for new peer connections + newPeers chan peer.ID + + // a notification channel for new outoging peer streams + newPeerStream chan inet.Stream + + // a notification channel for errors opening new peer streams + newPeerError chan peer.ID // a notification channel for when our peers die peerDead chan peer.ID @@ -151,7 +157,9 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option signKey: h.Peerstore().PrivKey(h.ID()), incoming: make(chan *RPC, 32), publish: make(chan *Message), - newPeers: make(chan inet.Stream), + newPeers: make(chan peer.ID), + newPeerStream: make(chan inet.Stream), + newPeerError: make(chan peer.ID), peerDead: make(chan peer.ID), cancelCh: make(chan *Subscription), getPeers: make(chan *listPeerReq), @@ -259,28 +267,53 @@ func (p *PubSub) processLoop(ctx context.Context) { p.peers = nil p.topics = nil }() + for { select { - case s := <-p.newPeers: - pid := s.Conn().RemotePeer() - ch, ok := p.peers[pid] + case pid := <-p.newPeers: + _, ok := p.peers[pid] if ok { - log.Error("already have connection to peer: ", pid) - close(ch) + log.Warning("already have connection to peer: ", pid) + continue } messages := make(chan *RPC, 32) - go p.handleSendingMessages(ctx, s, messages) messages <- p.getHelloPacket() - + go p.handleNewPeer(ctx, pid, messages) p.peers[pid] = messages + case s := <-p.newPeerStream: + pid := s.Conn().RemotePeer() + + _, ok := p.peers[pid] + if !ok { + log.Warning("new stream for unknown peer: ", pid) + s.Reset() + continue + } + p.rt.AddPeer(pid, s.Protocol()) + case pid := <-p.newPeerError: + delete(p.peers, pid) + case pid := <-p.peerDead: ch, ok := p.peers[pid] - if ok { - close(ch) + if !ok { + continue + } + + close(ch) + + if p.host.Network().Connectedness(pid) == inet.Connected { + // still connected, must be a duplicate connection being closed. + // we respawn the writer as we need to ensure there is a stream active + log.Warning("peer declared dead but still connected; respawning writer: ", pid) + messages := make(chan *RPC, 32) + messages <- p.getHelloPacket() + go p.handleNewPeer(ctx, pid, messages) + p.peers[pid] = messages + continue } delete(p.peers, pid)