prefill outoging channel with hello packet
This commit is contained in:
parent
f82af595dc
commit
f31593e883
18
pubsub.go
18
pubsub.go
|
@ -278,33 +278,20 @@ func (p *PubSub) processLoop(ctx context.Context) {
|
||||||
}
|
}
|
||||||
|
|
||||||
messages := make(chan *RPC, 32)
|
messages := make(chan *RPC, 32)
|
||||||
|
messages <- p.getHelloPacket()
|
||||||
go p.handleNewPeer(ctx, pid, messages)
|
go p.handleNewPeer(ctx, pid, messages)
|
||||||
p.peers[pid] = messages
|
p.peers[pid] = messages
|
||||||
|
|
||||||
case s := <-p.newPeerStream:
|
case s := <-p.newPeerStream:
|
||||||
pid := s.Conn().RemotePeer()
|
pid := s.Conn().RemotePeer()
|
||||||
|
|
||||||
ch, ok := p.peers[pid]
|
_, ok := p.peers[pid]
|
||||||
if !ok {
|
if !ok {
|
||||||
log.Warning("new stream for unknown peer: ", pid)
|
log.Warning("new stream for unknown peer: ", pid)
|
||||||
s.Reset()
|
s.Reset()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
select {
|
|
||||||
case ch <- p.getHelloPacket():
|
|
||||||
default:
|
|
||||||
log.Warning("error sending hello packet; buffer full: ", pid)
|
|
||||||
go func() {
|
|
||||||
time.Sleep(time.Duration(1+rand.Intn(1000)) * time.Millisecond)
|
|
||||||
select {
|
|
||||||
case p.newPeerStream <- s:
|
|
||||||
case <-ctx.Done():
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
p.rt.AddPeer(pid, s.Protocol())
|
p.rt.AddPeer(pid, s.Protocol())
|
||||||
|
|
||||||
case pid := <-p.newPeerError:
|
case pid := <-p.newPeerError:
|
||||||
|
@ -323,6 +310,7 @@ func (p *PubSub) processLoop(ctx context.Context) {
|
||||||
// we respawn the writer as we need to ensure there is a stream active
|
// we respawn the writer as we need to ensure there is a stream active
|
||||||
log.Warning("peer declared dead but still connected; respawning writer: ", pid)
|
log.Warning("peer declared dead but still connected; respawning writer: ", pid)
|
||||||
messages := make(chan *RPC, 32)
|
messages := make(chan *RPC, 32)
|
||||||
|
messages <- p.getHelloPacket()
|
||||||
go p.handleNewPeer(ctx, pid, messages)
|
go p.handleNewPeer(ctx, pid, messages)
|
||||||
p.peers[pid] = messages
|
p.peers[pid] = messages
|
||||||
continue
|
continue
|
||||||
|
|
Loading…
Reference in New Issue