diff --git a/main.go b/main.go index e1f8b9ec..5be96605 100644 --- a/main.go +++ b/main.go @@ -49,7 +49,7 @@ func main() { } fmt.Println("Received message:", string(payload)) - //sub.Unsubscribe() + // sub.Unsubscribe() } } }() @@ -57,7 +57,8 @@ func main() { // Write loop go func() { for { - time.Sleep(2 * time.Second) + time.Sleep(1 * time.Second) + var contentTopic uint32 = 1 var version uint32 = 0 diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 23a02a76..f283dab4 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -8,6 +8,7 @@ import ( "log" "net" "sync" + "time" proto "github.com/golang/protobuf/proto" "github.com/libp2p/go-libp2p" @@ -181,6 +182,9 @@ func (node *WakuNode) Subscribe(topic *Topic) (*Subscription, error) { subscription.quit = make(chan struct{}) go func(ctx context.Context, sub *pubsub.Subscription) { + nextMsgTicker := time.NewTicker(time.Millisecond * 10) + defer nextMsgTicker.Stop() + for { select { case <-subscription.quit: @@ -189,10 +193,11 @@ func (node *WakuNode) Subscribe(topic *Topic) (*Subscription, error) { close(subscription.C) subscription.closed = true return - default: + case <-nextMsgTicker.C: msg, err := sub.Next(ctx) if err != nil { + fmt.Println("Error receiving message", err) return // Should close channel? }