diff --git a/pubsub.go b/pubsub.go index 9836614..185386b 100644 --- a/pubsub.go +++ b/pubsub.go @@ -1150,6 +1150,16 @@ func (p *PubSub) Subscribe(topic string, opts ...SubOpt) (*Subscription, error) return topicHandle.Subscribe(opts...) } +// WithBufferSize is a Subscribe option to customize the size of the subscribe output buffer. +// The default length is 32 but it can be configured to avoid dropping messages if the consumer is not reading fast +// enough. +func WithBufferSize(size int) SubOpt { + return func(sub *Subscription) error { + sub.ch = make(chan *Message, size) + return nil + } +} + type topicReq struct { resp chan []string } diff --git a/topic.go b/topic.go index a7d0bbd..6460782 100644 --- a/topic.go +++ b/topic.go @@ -141,7 +141,6 @@ func (t *Topic) Subscribe(opts ...SubOpt) (*Subscription, error) { sub := &Subscription{ topic: t.topic, - ch: make(chan *Message, 32), ctx: t.p.ctx, } @@ -152,6 +151,11 @@ func (t *Topic) Subscribe(opts ...SubOpt) (*Subscription, error) { } } + if sub.ch == nil { + // apply the default size + sub.ch = make(chan *Message, 32) + } + out := make(chan *Subscription, 1) t.p.disc.Discover(sub.topic)