Merge pull request #4 from libp2p/fix/reconnects
fix handling of new streams on peer reconnects
This commit is contained in:
commit
c61ae3500c
16
floodsub.go
16
floodsub.go
|
@ -102,11 +102,10 @@ func (p *PubSub) processLoop(ctx context.Context) {
|
|||
select {
|
||||
case s := <-p.newPeers:
|
||||
pid := s.Conn().RemotePeer()
|
||||
_, ok := p.peers[pid]
|
||||
ch, ok := p.peers[pid]
|
||||
if ok {
|
||||
log.Error("already have connection to peer: ", pid)
|
||||
s.Close()
|
||||
continue
|
||||
close(ch)
|
||||
}
|
||||
|
||||
messages := make(chan *RPC, 32)
|
||||
|
@ -116,6 +115,11 @@ func (p *PubSub) processLoop(ctx context.Context) {
|
|||
p.peers[pid] = messages
|
||||
|
||||
case pid := <-p.peerDead:
|
||||
ch, ok := p.peers[pid]
|
||||
if ok {
|
||||
close(ch)
|
||||
}
|
||||
|
||||
delete(p.peers, pid)
|
||||
for _, t := range p.topics {
|
||||
delete(t, pid)
|
||||
|
@ -238,8 +242,12 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func msgID(pmsg *pb.Message) string {
|
||||
return string(pmsg.GetFrom()) + string(pmsg.GetSeqno())
|
||||
}
|
||||
|
||||
func (p *PubSub) maybePublishMessage(from peer.ID, pmsg *pb.Message) {
|
||||
id := string(pmsg.GetFrom()) + string(pmsg.GetSeqno())
|
||||
id := msgID(pmsg)
|
||||
if p.seenMessage(id) {
|
||||
return
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue