mirror of
https://github.com/logos-messaging/go-libp2p-pubsub.git
synced 2026-01-03 21:33:07 +00:00
rework peer tracking logic to handle multiple connections
This commit is contained in:
parent
c7528f3c99
commit
2621f893e6
27
comm.go
27
comm.go
@ -10,6 +10,7 @@ import (
|
||||
ggio "github.com/gogo/protobuf/io"
|
||||
proto "github.com/gogo/protobuf/proto"
|
||||
inet "github.com/libp2p/go-libp2p-net"
|
||||
peer "github.com/libp2p/go-libp2p-peer"
|
||||
)
|
||||
|
||||
// get the initial RPC containing all of our subscriptions to send to new peers
|
||||
@ -39,10 +40,6 @@ func (p *PubSub) handleNewStream(s inet.Stream) {
|
||||
// but it doesn't hurt to send it.
|
||||
s.Close()
|
||||
}
|
||||
select {
|
||||
case p.peerDead <- s.Conn().RemotePeer():
|
||||
case <-p.ctx.Done():
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@ -57,6 +54,24 @@ func (p *PubSub) handleNewStream(s inet.Stream) {
|
||||
}
|
||||
}
|
||||
|
||||
func (p *PubSub) handleNewPeer(ctx context.Context, pid peer.ID, outgoing <-chan *RPC) {
|
||||
s, err := p.host.NewStream(p.ctx, pid, p.rt.Protocols()...)
|
||||
if err != nil {
|
||||
log.Warning("opening new stream to peer: ", err, pid)
|
||||
select {
|
||||
case p.newPeerError <- pid:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
go p.handleSendingMessages(ctx, s, outgoing)
|
||||
select {
|
||||
case p.newPeerStream <- s:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
}
|
||||
|
||||
func (p *PubSub) handleSendingMessages(ctx context.Context, s inet.Stream, outgoing <-chan *RPC) {
|
||||
bufw := bufio.NewWriter(s)
|
||||
wc := ggio.NewDelimitedWriter(bufw)
|
||||
@ -82,10 +97,6 @@ func (p *PubSub) handleSendingMessages(ctx context.Context, s inet.Stream, outgo
|
||||
if err != nil {
|
||||
s.Reset()
|
||||
log.Infof("writing message to %s: %s", s.Conn().RemotePeer(), err)
|
||||
select {
|
||||
case p.peerDead <- s.Conn().RemotePeer():
|
||||
case <-ctx.Done():
|
||||
}
|
||||
return
|
||||
}
|
||||
case <-ctx.Done():
|
||||
|
||||
@ -874,9 +874,7 @@ func TestPeerDisconnect(t *testing.T) {
|
||||
peers := psubs[0].ListPeers("foo")
|
||||
assertPeerList(t, peers, hosts[1].ID())
|
||||
for _, c := range hosts[1].Network().ConnsToPeer(hosts[0].ID()) {
|
||||
for _, s := range c.GetStreams() {
|
||||
s.Close()
|
||||
}
|
||||
c.Close()
|
||||
}
|
||||
|
||||
time.Sleep(time.Millisecond * 10)
|
||||
|
||||
15
notify.go
15
notify.go
@ -17,21 +17,20 @@ func (p *PubSubNotif) ClosedStream(n inet.Network, s inet.Stream) {
|
||||
|
||||
func (p *PubSubNotif) Connected(n inet.Network, c inet.Conn) {
|
||||
go func() {
|
||||
s, err := p.host.NewStream(p.ctx, c.RemotePeer(), p.rt.Protocols()...)
|
||||
if err != nil {
|
||||
log.Warning("opening new stream to peer: ", err, c.LocalPeer(), c.RemotePeer())
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case p.newPeers <- s:
|
||||
case p.newPeers <- c.RemotePeer():
|
||||
case <-p.ctx.Done():
|
||||
s.Reset()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (p *PubSubNotif) Disconnected(n inet.Network, c inet.Conn) {
|
||||
go func() {
|
||||
select {
|
||||
case p.peerDead <- c.RemotePeer():
|
||||
case <-p.ctx.Done():
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (p *PubSubNotif) Listen(n inet.Network, _ ma.Multiaddr) {
|
||||
|
||||
69
pubsub.go
69
pubsub.go
@ -57,8 +57,14 @@ type PubSub struct {
|
||||
// send subscription here to cancel it
|
||||
cancelCh chan *Subscription
|
||||
|
||||
// a notification channel for incoming streams from other peers
|
||||
newPeers chan inet.Stream
|
||||
// a notification channel for new peer connections
|
||||
newPeers chan peer.ID
|
||||
|
||||
// a notification channel for new outoging peer streams
|
||||
newPeerStream chan inet.Stream
|
||||
|
||||
// a notification channel for errors opening new peer streams
|
||||
newPeerError chan peer.ID
|
||||
|
||||
// a notification channel for when our peers die
|
||||
peerDead chan peer.ID
|
||||
@ -151,7 +157,9 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
|
||||
signKey: h.Peerstore().PrivKey(h.ID()),
|
||||
incoming: make(chan *RPC, 32),
|
||||
publish: make(chan *Message),
|
||||
newPeers: make(chan inet.Stream),
|
||||
newPeers: make(chan peer.ID),
|
||||
newPeerStream: make(chan inet.Stream),
|
||||
newPeerError: make(chan peer.ID),
|
||||
peerDead: make(chan peer.ID),
|
||||
cancelCh: make(chan *Subscription),
|
||||
getPeers: make(chan *listPeerReq),
|
||||
@ -259,30 +267,65 @@ func (p *PubSub) processLoop(ctx context.Context) {
|
||||
p.peers = nil
|
||||
p.topics = nil
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case s := <-p.newPeers:
|
||||
pid := s.Conn().RemotePeer()
|
||||
ch, ok := p.peers[pid]
|
||||
case pid := <-p.newPeers:
|
||||
_, ok := p.peers[pid]
|
||||
if ok {
|
||||
log.Error("already have connection to peer: ", pid)
|
||||
close(ch)
|
||||
log.Warning("already have connection to peer: ", pid)
|
||||
continue
|
||||
}
|
||||
|
||||
messages := make(chan *RPC, 32)
|
||||
go p.handleSendingMessages(ctx, s, messages)
|
||||
messages <- p.getHelloPacket()
|
||||
|
||||
go p.handleNewPeer(ctx, pid, messages)
|
||||
p.peers[pid] = messages
|
||||
|
||||
case s := <-p.newPeerStream:
|
||||
pid := s.Conn().RemotePeer()
|
||||
|
||||
ch, ok := p.peers[pid]
|
||||
if !ok {
|
||||
log.Warning("new stream for unknown peer: ", pid)
|
||||
s.Reset()
|
||||
continue
|
||||
}
|
||||
|
||||
select {
|
||||
case ch <- p.getHelloPacket():
|
||||
default:
|
||||
log.Warning("error sending hello packet; buffer full: ", pid)
|
||||
go func() {
|
||||
time.Sleep(time.Duration(1+rand.Intn(1000)) * time.Millisecond)
|
||||
select {
|
||||
case p.newPeerStream <- s:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
}()
|
||||
continue
|
||||
}
|
||||
|
||||
p.rt.AddPeer(pid, s.Protocol())
|
||||
|
||||
case pid := <-p.newPeerError:
|
||||
delete(p.peers, pid)
|
||||
|
||||
case pid := <-p.peerDead:
|
||||
ch, ok := p.peers[pid]
|
||||
if ok {
|
||||
close(ch)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
if p.host.Network().Connectedness(pid) == inet.Connected {
|
||||
// still connected, must be a duplicate connection being closed.
|
||||
// we respawn the writer as we need to ensure there is at leat one active
|
||||
// at worst we can end with two writers pushing messages from the same channel.
|
||||
log.Warning("peer declared dead but still connected; respawning writer: ", pid)
|
||||
go p.handleNewPeer(ctx, pid, ch)
|
||||
continue
|
||||
}
|
||||
|
||||
close(ch)
|
||||
delete(p.peers, pid)
|
||||
for _, t := range p.topics {
|
||||
delete(t, pid)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user