From 6271c7f8b817ce62e6457faa7c4dec1ee6e1275a Mon Sep 17 00:00:00 2001 From: Jeromy Date: Wed, 27 Jul 2016 00:35:41 -0700 Subject: [PATCH] fix handling of new streams on peer reconnects --- floodsub.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/floodsub.go b/floodsub.go index a39f87f..e1a1d64 100644 --- a/floodsub.go +++ b/floodsub.go @@ -102,11 +102,10 @@ func (p *PubSub) processLoop(ctx context.Context) { select { case s := <-p.newPeers: pid := s.Conn().RemotePeer() - _, ok := p.peers[pid] + ch, ok := p.peers[pid] if ok { log.Error("already have connection to peer: ", pid) - s.Close() - continue + close(ch) } messages := make(chan *RPC, 32) @@ -116,6 +115,11 @@ func (p *PubSub) processLoop(ctx context.Context) { p.peers[pid] = messages case pid := <-p.peerDead: + ch, ok := p.peers[pid] + if ok { + close(ch) + } + delete(p.peers, pid) for _, t := range p.topics { delete(t, pid) @@ -238,8 +242,12 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) error { return nil } +func msgID(pmsg *pb.Message) string { + return string(pmsg.GetFrom()) + string(pmsg.GetSeqno()) +} + func (p *PubSub) maybePublishMessage(from peer.ID, pmsg *pb.Message) { - id := string(pmsg.GetFrom()) + string(pmsg.GetSeqno()) + id := msgID(pmsg) if p.seenMessage(id) { return }