add ctx to sub.Next for cancellation

This commit is contained in:
Jan Winkelmann 2016-11-17 11:27:57 +01:00
parent 539e4b6b45
commit 25b8aad61f
2 changed files with 16 additions and 9 deletions

View File

@ -127,7 +127,7 @@ func TestBasicFloodsub(t *testing.T) {
psubs[owner].Publish("foobar", msg) psubs[owner].Publish("foobar", msg)
for _, sub := range msgs { for _, sub := range msgs {
got, err := sub.Next() got, err := sub.Next(ctx)
if err != nil { if err != nil {
t.Fatal(sub.err) t.Fatal(sub.err)
} }
@ -525,7 +525,7 @@ func TestSubscribeMultipleTimes(t *testing.T) {
psubs[1].Publish("foo", []byte("bar")) psubs[1].Publish("foo", []byte("bar"))
msg, err := sub1.Next() msg, err := sub1.Next(ctx)
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v.", err) t.Fatalf("unexpected error: %v.", err)
} }
@ -536,7 +536,7 @@ func TestSubscribeMultipleTimes(t *testing.T) {
t.Fatalf("data is %s, expected %s.", data, "bar") t.Fatalf("data is %s, expected %s.", data, "bar")
} }
msg, err = sub2.Next() msg, err = sub2.Next(ctx)
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v.", err) t.Fatalf("unexpected error: %v.", err)
} }

View File

@ -1,5 +1,9 @@
package floodsub package floodsub
import (
"context"
)
type Subscription struct { type Subscription struct {
topic string topic string
ch chan *Message ch chan *Message
@ -11,14 +15,17 @@ func (sub *Subscription) Topic() string {
return sub.topic return sub.topic
} }
func (sub *Subscription) Next() (*Message, error) { func (sub *Subscription) Next(ctx context.Context) (*Message, error) {
msg, ok := <-sub.ch select {
case msg, ok := <-sub.ch:
if !ok {
return msg, sub.err
}
if !ok { return msg, nil
return msg, sub.err case <-ctx.Done():
return nil, ctx.Err()
} }
return msg, nil
} }
func (sub *Subscription) Cancel() { func (sub *Subscription) Cancel() {