Configurable outbound peer queue sizes

This commit is contained in:
Aarsh Shah 2019-11-16 01:14:10 +08:00
parent 534fe2f382
commit d380477228

View File

@ -3,6 +3,7 @@ package pubsub
import ( import (
"context" "context"
"encoding/binary" "encoding/binary"
"errors"
"fmt" "fmt"
"math/rand" "math/rand"
"sync" "sync"
@ -45,6 +46,9 @@ type PubSub struct {
disc *discover disc *discover
// size of the outbound message channel that we maintain for each peer
peerOutboundQueueSize int
// incoming messages from other peers // incoming messages from other peers
incoming chan *RPC incoming chan *RPC
@ -174,6 +178,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
rt: rt, rt: rt,
val: newValidation(), val: newValidation(),
disc: &discover{}, disc: &discover{},
peerOutboundQueueSize: 32,
signID: h.ID(), signID: h.ID(),
signKey: h.Peerstore().PrivKey(h.ID()), signKey: h.Peerstore().PrivKey(h.ID()),
signStrict: true, signStrict: true,
@ -232,6 +237,18 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
return ps, nil return ps, nil
} }
// WithPeerOutboundQueueSize is an option to set the buffer size for outbound messages to a peer
// We start dropping messages to a peer if the outbound queue if full
func WithPeerOutboundQueueSize(size int) Option {
return func(p *PubSub) error {
if size < 0 {
return errors.New("outbound queue size can't be negative")
}
p.peerOutboundQueueSize = size
return nil
}
}
// WithMessageSigning enables or disables message signing (enabled by default). // WithMessageSigning enables or disables message signing (enabled by default).
func WithMessageSigning(enabled bool) Option { func WithMessageSigning(enabled bool) Option {
return func(p *PubSub) error { return func(p *PubSub) error {
@ -327,7 +344,7 @@ func (p *PubSub) processLoop(ctx context.Context) {
continue continue
} }
messages := make(chan *RPC, 32) messages := make(chan *RPC, p.peerOutboundQueueSize)
messages <- p.getHelloPacket() messages <- p.getHelloPacket()
go p.handleNewPeer(ctx, pid, messages) go p.handleNewPeer(ctx, pid, messages)
p.peers[pid] = messages p.peers[pid] = messages
@ -366,7 +383,7 @@ func (p *PubSub) processLoop(ctx context.Context) {
// still connected, must be a duplicate connection being closed. // still connected, must be a duplicate connection being closed.
// we respawn the writer as we need to ensure there is a stream active // we respawn the writer as we need to ensure there is a stream active
log.Warning("peer declared dead but still connected; respawning writer: ", pid) log.Warning("peer declared dead but still connected; respawning writer: ", pid)
messages := make(chan *RPC, 32) messages := make(chan *RPC, p.peerOutboundQueueSize)
messages <- p.getHelloPacket() messages <- p.getHelloPacket()
go p.handleNewPeer(ctx, pid, messages) go p.handleNewPeer(ctx, pid, messages)
p.peers[pid] = messages p.peers[pid] = messages