diff --git a/floodsub.go b/floodsub.go index a39f87f..e1a1d64 100644 --- a/floodsub.go +++ b/floodsub.go @@ -102,11 +102,10 @@ func (p *PubSub) processLoop(ctx context.Context) { select { case s := <-p.newPeers: pid := s.Conn().RemotePeer() - _, ok := p.peers[pid] + ch, ok := p.peers[pid] if ok { log.Error("already have connection to peer: ", pid) - s.Close() - continue + close(ch) } messages := make(chan *RPC, 32) @@ -116,6 +115,11 @@ func (p *PubSub) processLoop(ctx context.Context) { p.peers[pid] = messages case pid := <-p.peerDead: + ch, ok := p.peers[pid] + if ok { + close(ch) + } + delete(p.peers, pid) for _, t := range p.topics { delete(t, pid) @@ -238,8 +242,12 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) error { return nil } +func msgID(pmsg *pb.Message) string { + return string(pmsg.GetFrom()) + string(pmsg.GetSeqno()) +} + func (p *PubSub) maybePublishMessage(from peer.ID, pmsg *pb.Message) { - id := string(pmsg.GetFrom()) + string(pmsg.GetSeqno()) + id := msgID(pmsg) if p.seenMessage(id) { return }