diff --git a/go.mod b/go.mod index 7e948001..3a181efc 100644 --- a/go.mod +++ b/go.mod @@ -154,7 +154,7 @@ require ( github.com/multiformats/go-multibase v0.2.0 // indirect github.com/multiformats/go-multicodec v0.9.0 // indirect github.com/multiformats/go-multihash v0.2.3 // indirect - github.com/multiformats/go-multistream v0.5.0 + github.com/multiformats/go-multistream v0.5.0 // indirect github.com/multiformats/go-varint v0.0.7 // indirect github.com/opencontainers/runtime-spec v1.2.0 // indirect github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 diff --git a/waku/v2/api/filter.go b/waku/v2/api/filter/filter.go similarity index 99% rename from waku/v2/api/filter.go rename to waku/v2/api/filter/filter.go index 1f9ea6be..6bd041e6 100644 --- a/waku/v2/api/filter.go +++ b/waku/v2/api/filter/filter.go @@ -1,4 +1,4 @@ -package api +package filter import ( "context" diff --git a/waku/v2/api/filter_test.go b/waku/v2/api/filter/filter_test.go similarity index 99% rename from waku/v2/api/filter_test.go rename to waku/v2/api/filter/filter_test.go index ff22fb75..af976a69 100644 --- a/waku/v2/api/filter_test.go +++ b/waku/v2/api/filter/filter_test.go @@ -1,4 +1,4 @@ -package api +package filter import ( "context" diff --git a/waku/v2/api/publish/common.go b/waku/v2/api/publish/common.go new file mode 100644 index 00000000..be72f4c1 --- /dev/null +++ b/waku/v2/api/publish/common.go @@ -0,0 +1,9 @@ +package publish + +import ( + "github.com/waku-org/go-waku/waku/v2/protocol" + "go.uber.org/zap" +) + +// PublishFn represents a function that will publish a message. +type PublishFn = func(envelope *protocol.Envelope, logger *zap.Logger) error diff --git a/waku/v2/api/publish/message_queue.go b/waku/v2/api/publish/message_queue.go new file mode 100644 index 00000000..fbd79df8 --- /dev/null +++ b/waku/v2/api/publish/message_queue.go @@ -0,0 +1,156 @@ +package publish + +import ( + "container/heap" + "context" + + "github.com/waku-org/go-waku/waku/v2/protocol" +) + +// MessagePriority determines the ordering for the message priority queue +type MessagePriority = int + +const ( + LowPriority MessagePriority = 1 + NormalPriority MessagePriority = 2 + HighPriority MessagePriority = 3 +) + +type envelopePriority struct { + envelope *protocol.Envelope + priority int + index int +} + +type envelopePriorityQueue []*envelopePriority + +func (pq envelopePriorityQueue) Len() int { return len(pq) } + +func (pq envelopePriorityQueue) Less(i, j int) bool { + if pq[i].priority > pq[j].priority { + return true + } else if pq[i].priority == pq[j].priority { + return pq[i].envelope.Message().GetTimestamp() < pq[j].envelope.Message().GetTimestamp() + } + + return false +} + +func (pq envelopePriorityQueue) Swap(i, j int) { + pq[i], pq[j] = pq[j], pq[i] + pq[i].index = i + pq[j].index = j +} + +func (pq *envelopePriorityQueue) Push(x any) { + n := len(*pq) + item := x.(*envelopePriority) + item.index = n + *pq = append(*pq, item) +} + +func (pq *envelopePriorityQueue) Pop() any { + old := *pq + n := len(old) + item := old[n-1] + old[n-1] = nil // avoid memory leak + item.index = -1 // for safety + *pq = old[0 : n-1] + return item +} + +// MessageQueue is a structure used to handle the ordering of the messages to publish +type MessageQueue struct { + usePriorityQueue bool + + toSendChan chan *protocol.Envelope + throttledPrioritySendQueue chan *envelopePriority + envelopeAvailableOnPriorityQueueSignal chan struct{} + envelopePriorityQueue envelopePriorityQueue +} + +// NewMessageQueue returns a new instance of MessageQueue. The MessageQueue can internally use a +// priority queue to handle the ordering of the messages, or use a simple FIFO queue. +func NewMessageQueue(bufferSize int, usePriorityQueue bool) *MessageQueue { + m := &MessageQueue{ + usePriorityQueue: usePriorityQueue, + } + + if m.usePriorityQueue { + m.envelopePriorityQueue = make(envelopePriorityQueue, 0) + m.throttledPrioritySendQueue = make(chan *envelopePriority, bufferSize) + m.envelopeAvailableOnPriorityQueueSignal = make(chan struct{}, bufferSize) + heap.Init(&m.envelopePriorityQueue) + } else { + m.toSendChan = make(chan *protocol.Envelope, bufferSize) + } + + return m +} + +// Start must be called to handle the lifetime of the internals of the message queue +func (m *MessageQueue) Start(ctx context.Context) { + + for { + select { + case envelopePriority, ok := <-m.throttledPrioritySendQueue: + if !ok { + continue + } + + heap.Push(&m.envelopePriorityQueue, envelopePriority) + + m.envelopeAvailableOnPriorityQueueSignal <- struct{}{} + + case <-ctx.Done(): + if m.usePriorityQueue { + close(m.throttledPrioritySendQueue) + close(m.envelopeAvailableOnPriorityQueueSignal) + } else { + close(m.toSendChan) + } + return + } + } +} + +// Push an envelope into the message queue. The priority is optional, and will be ignored +// if the message queue does not use a priority queue +func (m *MessageQueue) Push(envelope *protocol.Envelope, priority ...MessagePriority) { + if m.usePriorityQueue { + msgPriority := NormalPriority + if len(priority) != 0 { + msgPriority = priority[0] + } + + m.throttledPrioritySendQueue <- &envelopePriority{ + envelope: envelope, + priority: msgPriority, + } + } else { + m.toSendChan <- envelope + } +} + +// Pop will return a channel on which a message can be retrieved from the message queue +func (m *MessageQueue) Pop() <-chan *protocol.Envelope { + ch := make(chan *protocol.Envelope) + + go func() { + select { + case _, ok := <-m.envelopeAvailableOnPriorityQueueSignal: + if ok { + ch <- heap.Pop(&m.envelopePriorityQueue).(*envelopePriority).envelope + } + + case envelope, ok := <-m.toSendChan: + if ok { + ch <- envelope + } + } + + close(ch) + }() + + return ch +} diff --git a/waku/v2/api/publish/message_queue_test.go b/waku/v2/api/publish/message_queue_test.go new file mode 100644 index 00000000..15761c57 --- /dev/null +++ b/waku/v2/api/publish/message_queue_test.go @@ -0,0 +1,91 @@ +package publish + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "google.golang.org/protobuf/proto" +) + +func TestFifoQueue(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + + queue := NewMessageQueue(10, false) + go queue.Start(ctx) + + queue.Push(protocol.NewEnvelope(&pb.WakuMessage{}, 0, "A")) + queue.Push(protocol.NewEnvelope(&pb.WakuMessage{}, 0, "B")) + queue.Push(protocol.NewEnvelope(&pb.WakuMessage{}, 0, "C")) + + envelope, ok := <-queue.Pop() + require.True(t, ok) + require.Equal(t, "A", envelope.PubsubTopic()) + + envelope, ok = <-queue.Pop() + require.True(t, ok) + require.Equal(t, "B", envelope.PubsubTopic()) + + envelope, ok = <-queue.Pop() + require.True(t, ok) + require.Equal(t, "C", envelope.PubsubTopic()) + + cancel() + + _, ok = <-queue.Pop() + require.False(t, ok) +} + +func TestPriorityQueue(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + + queue := NewMessageQueue(10, true) + go queue.Start(ctx) + + queue.Push(protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(0)}, 0, "A"), LowPriority) + queue.Push(protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(1)}, 0, "B"), LowPriority) + queue.Push(protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(2)}, 0, "C"), HighPriority) + queue.Push(protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(3)}, 0, "D"), NormalPriority) + queue.Push(protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(4)}, 0, "E"), HighPriority) + queue.Push(protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(5)}, 0, "F"), LowPriority) + queue.Push(protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(6)}, 0, "G"), NormalPriority) + + time.Sleep(2 * time.Second) + + envelope, ok := <-queue.Pop() + require.True(t, ok) + require.Equal(t, "C", envelope.PubsubTopic()) + + envelope, ok = <-queue.Pop() + require.True(t, ok) + require.Equal(t, "E", envelope.PubsubTopic()) + + envelope, ok = <-queue.Pop() + require.True(t, ok) + require.Equal(t, "D", envelope.PubsubTopic()) + + envelope, ok = <-queue.Pop() + require.True(t, ok) + require.Equal(t, "G", envelope.PubsubTopic()) + + envelope, ok = <-queue.Pop() + require.True(t, ok) + require.Equal(t, "A", envelope.PubsubTopic()) + + envelope, ok = <-queue.Pop() + require.True(t, ok) + require.Equal(t, "B", envelope.PubsubTopic()) + + envelope, ok = <-queue.Pop() + require.True(t, ok) + require.Equal(t, "F", envelope.PubsubTopic()) + + cancel() + + _, ok = <-queue.Pop() + require.False(t, ok) + +} diff --git a/waku/v2/api/publish/rate_limiting.go b/waku/v2/api/publish/rate_limiting.go new file mode 100644 index 00000000..390bed95 --- /dev/null +++ b/waku/v2/api/publish/rate_limiting.go @@ -0,0 +1,37 @@ +package publish + +import ( + "context" + "errors" + + "github.com/waku-org/go-waku/waku/v2/protocol" + "go.uber.org/zap" + "golang.org/x/time/rate" +) + +// PublishRateLimiter is used to decorate publish functions to limit the +// number of messages per second that can be published +type PublishRateLimiter struct { + limiter *rate.Limiter +} + +// NewPublishRateLimiter will create a new instance of PublishRateLimiter. +// You can specify an rate.Inf value to in practice ignore the rate limiting +func NewPublishRateLimiter(r rate.Limit) *PublishRateLimiter { + return &PublishRateLimiter{ + limiter: rate.NewLimiter(r, 1), + } +} + +// ThrottlePublishFn is used to decorate a PublishFn so rate limiting is applied +func (p *PublishRateLimiter) ThrottlePublishFn(ctx context.Context, publishFn PublishFn) PublishFn { + return func(envelope *protocol.Envelope, logger *zap.Logger) error { + if err := p.limiter.Wait(ctx); err != nil { + if !errors.Is(err, context.Canceled) { + logger.Error("could not send message (limiter)", zap.Error(err)) + } + return err + } + return publishFn(envelope, logger) + } +} diff --git a/waku/v2/api/publish/rate_limiting_test.go b/waku/v2/api/publish/rate_limiting_test.go new file mode 100644 index 00000000..e516cbc9 --- /dev/null +++ b/waku/v2/api/publish/rate_limiting_test.go @@ -0,0 +1,36 @@ +package publish + +import ( + "context" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/utils" + "go.uber.org/zap" + "golang.org/x/time/rate" +) + +func TestRateLimit(t *testing.T) { + r := NewPublishRateLimiter(rate.Limit(1)) + l := utils.Logger() + + var counter atomic.Int32 + fn := r.ThrottlePublishFn(context.Background(), func(envelope *protocol.Envelope, logger *zap.Logger) error { + counter.Add(1) + return nil + }) + + go func() { + for i := 0; i <= 10; i++ { + err := fn(nil, l) + require.NoError(t, err) + } + }() + + <-time.After(2 * time.Second) + + require.LessOrEqual(t, counter.Load(), int32(3)) +}