fix handling of new streams on peer reconnects

This commit is contained in:
Jeromy 2016-07-27 00:35:41 -07:00
parent e4bd01ce63
commit 6271c7f8b8
1 changed files with 12 additions and 4 deletions

View File

@ -102,11 +102,10 @@ func (p *PubSub) processLoop(ctx context.Context) {
select { select {
case s := <-p.newPeers: case s := <-p.newPeers:
pid := s.Conn().RemotePeer() pid := s.Conn().RemotePeer()
_, ok := p.peers[pid] ch, ok := p.peers[pid]
if ok { if ok {
log.Error("already have connection to peer: ", pid) log.Error("already have connection to peer: ", pid)
s.Close() close(ch)
continue
} }
messages := make(chan *RPC, 32) messages := make(chan *RPC, 32)
@ -116,6 +115,11 @@ func (p *PubSub) processLoop(ctx context.Context) {
p.peers[pid] = messages p.peers[pid] = messages
case pid := <-p.peerDead: case pid := <-p.peerDead:
ch, ok := p.peers[pid]
if ok {
close(ch)
}
delete(p.peers, pid) delete(p.peers, pid)
for _, t := range p.topics { for _, t := range p.topics {
delete(t, pid) delete(t, pid)
@ -238,8 +242,12 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) error {
return nil return nil
} }
func msgID(pmsg *pb.Message) string {
return string(pmsg.GetFrom()) + string(pmsg.GetSeqno())
}
func (p *PubSub) maybePublishMessage(from peer.ID, pmsg *pb.Message) { func (p *PubSub) maybePublishMessage(from peer.ID, pmsg *pb.Message) {
id := string(pmsg.GetFrom()) + string(pmsg.GetSeqno()) id := msgID(pmsg)
if p.seenMessage(id) { if p.seenMessage(id) {
return return
} }