add an option to configure the Subscription output queue length

This commit is contained in:
Michael Muré 2021-07-12 21:56:55 +02:00 committed by Adin Schmahmann
parent c02d4c7179
commit 3c7689d482
2 changed files with 15 additions and 1 deletions

View File

@ -1150,6 +1150,16 @@ func (p *PubSub) Subscribe(topic string, opts ...SubOpt) (*Subscription, error)
return topicHandle.Subscribe(opts...)
}
// WithBufferSize is a Subscribe option to customize the size of the subscribe output buffer.
// The default length is 32 but it can be configured to avoid dropping messages if the consumer is not reading fast
// enough.
func WithBufferSize(size int) SubOpt {
return func(sub *Subscription) error {
sub.ch = make(chan *Message, size)
return nil
}
}
type topicReq struct {
resp chan []string
}

View File

@ -141,7 +141,6 @@ func (t *Topic) Subscribe(opts ...SubOpt) (*Subscription, error) {
sub := &Subscription{
topic: t.topic,
ch: make(chan *Message, 32),
ctx: t.p.ctx,
}
@ -152,6 +151,11 @@ func (t *Topic) Subscribe(opts ...SubOpt) (*Subscription, error) {
}
}
if sub.ch == nil {
// apply the default size
sub.ch = make(chan *Message, 32)
}
out := make(chan *Subscription, 1)
t.p.disc.Discover(sub.topic)