diff --git a/pubsub.go b/pubsub.go index cc0ce3f..aeb076a 100644 --- a/pubsub.go +++ b/pubsub.go @@ -316,15 +316,18 @@ func (p *PubSub) processLoop(ctx context.Context) { continue } + close(ch) + if p.host.Network().Connectedness(pid) == inet.Connected { // still connected, must be a duplicate connection being closed. // we respawn the writer as we need to ensure there is a stream active log.Warning("peer declared dead but still connected; respawning writer: ", pid) - go p.handleNewPeer(ctx, pid, ch) + messages := make(chan *RPC, 32) + go p.handleNewPeer(ctx, pid, messages) + p.peers[pid] = messages continue } - close(ch) delete(p.peers, pid) for _, t := range p.topics { delete(t, pid)