diff --git a/pubsub.go b/pubsub.go index a769a68..e0db9d0 100644 --- a/pubsub.go +++ b/pubsub.go @@ -3,6 +3,7 @@ package pubsub import ( "context" "encoding/binary" + "errors" "fmt" "math/rand" "sync" @@ -45,6 +46,9 @@ type PubSub struct { disc *discover + // size of the outbound message channel that we maintain for each peer + peerOutboundQueueSize int + // incoming messages from other peers incoming chan *RPC @@ -169,38 +173,39 @@ type Option func(*PubSub) error // NewPubSub returns a new PubSub management object. func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option) (*PubSub, error) { ps := &PubSub{ - host: h, - ctx: ctx, - rt: rt, - val: newValidation(), - disc: &discover{}, - signID: h.ID(), - signKey: h.Peerstore().PrivKey(h.ID()), - signStrict: true, - incoming: make(chan *RPC, 32), - publish: make(chan *Message), - newPeers: make(chan peer.ID), - newPeerStream: make(chan network.Stream), - newPeerError: make(chan peer.ID), - peerDead: make(chan peer.ID), - cancelCh: make(chan *Subscription), - getPeers: make(chan *listPeerReq), - addSub: make(chan *addSubReq), - addTopic: make(chan *addTopicReq), - rmTopic: make(chan *rmTopicReq), - getTopics: make(chan *topicReq), - sendMsg: make(chan *Message, 32), - addVal: make(chan *addValReq), - rmVal: make(chan *rmValReq), - eval: make(chan func()), - myTopics: make(map[string]*Topic), - mySubs: make(map[string]map[*Subscription]struct{}), - topics: make(map[string]map[peer.ID]struct{}), - peers: make(map[peer.ID]chan *RPC), - blacklist: NewMapBlacklist(), - blacklistPeer: make(chan peer.ID), - seenMessages: timecache.NewTimeCache(TimeCacheDuration), - counter: uint64(time.Now().UnixNano()), + host: h, + ctx: ctx, + rt: rt, + val: newValidation(), + disc: &discover{}, + peerOutboundQueueSize: 32, + signID: h.ID(), + signKey: h.Peerstore().PrivKey(h.ID()), + signStrict: true, + incoming: make(chan *RPC, 32), + publish: make(chan *Message), + newPeers: make(chan peer.ID), + newPeerStream: make(chan network.Stream), + newPeerError: make(chan peer.ID), + peerDead: make(chan peer.ID), + cancelCh: make(chan *Subscription), + getPeers: make(chan *listPeerReq), + addSub: make(chan *addSubReq), + addTopic: make(chan *addTopicReq), + rmTopic: make(chan *rmTopicReq), + getTopics: make(chan *topicReq), + sendMsg: make(chan *Message, 32), + addVal: make(chan *addValReq), + rmVal: make(chan *rmValReq), + eval: make(chan func()), + myTopics: make(map[string]*Topic), + mySubs: make(map[string]map[*Subscription]struct{}), + topics: make(map[string]map[peer.ID]struct{}), + peers: make(map[peer.ID]chan *RPC), + blacklist: NewMapBlacklist(), + blacklistPeer: make(chan peer.ID), + seenMessages: timecache.NewTimeCache(TimeCacheDuration), + counter: uint64(time.Now().UnixNano()), } for _, opt := range opts { @@ -232,6 +237,18 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option return ps, nil } +// WithPeerOutboundQueueSize is an option to set the buffer size for outbound messages to a peer +// We start dropping messages to a peer if the outbound queue if full +func WithPeerOutboundQueueSize(size int) Option { + return func(p *PubSub) error { + if size < 0 { + return errors.New("outbound queue size can't be negative") + } + p.peerOutboundQueueSize = size + return nil + } +} + // WithMessageSigning enables or disables message signing (enabled by default). func WithMessageSigning(enabled bool) Option { return func(p *PubSub) error { @@ -327,7 +344,7 @@ func (p *PubSub) processLoop(ctx context.Context) { continue } - messages := make(chan *RPC, 32) + messages := make(chan *RPC, p.peerOutboundQueueSize) messages <- p.getHelloPacket() go p.handleNewPeer(ctx, pid, messages) p.peers[pid] = messages @@ -366,7 +383,7 @@ func (p *PubSub) processLoop(ctx context.Context) { // still connected, must be a duplicate connection being closed. // we respawn the writer as we need to ensure there is a stream active log.Warning("peer declared dead but still connected; respawning writer: ", pid) - messages := make(chan *RPC, 32) + messages := make(chan *RPC, p.peerOutboundQueueSize) messages <- p.getHelloPacket() go p.handleNewPeer(ctx, pid, messages) p.peers[pid] = messages