refactor: move rate limiter and priority queue from status-go to api package (#1171)

This commit is contained in:
richΛrd 2024-08-01 09:15:05 -04:00 committed by GitHub
parent 04a9af931f
commit 0fc5bcc953
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 332 additions and 3 deletions

2
go.mod
View File

@ -154,7 +154,7 @@ require (
github.com/multiformats/go-multibase v0.2.0 // indirect
github.com/multiformats/go-multicodec v0.9.0 // indirect
github.com/multiformats/go-multihash v0.2.3 // indirect
github.com/multiformats/go-multistream v0.5.0
github.com/multiformats/go-multistream v0.5.0 // indirect
github.com/multiformats/go-varint v0.0.7 // indirect
github.com/opencontainers/runtime-spec v1.2.0 // indirect
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58

View File

@ -1,4 +1,4 @@
package api
package filter
import (
"context"

View File

@ -1,4 +1,4 @@
package api
package filter
import (
"context"

View File

@ -0,0 +1,9 @@
package publish
import (
"github.com/waku-org/go-waku/waku/v2/protocol"
"go.uber.org/zap"
)
// PublishFn represents a function that will publish a message.
type PublishFn = func(envelope *protocol.Envelope, logger *zap.Logger) error

View File

@ -0,0 +1,156 @@
package publish
import (
"container/heap"
"context"
"github.com/waku-org/go-waku/waku/v2/protocol"
)
// MessagePriority determines the ordering for the message priority queue
type MessagePriority = int
const (
LowPriority MessagePriority = 1
NormalPriority MessagePriority = 2
HighPriority MessagePriority = 3
)
type envelopePriority struct {
envelope *protocol.Envelope
priority int
index int
}
type envelopePriorityQueue []*envelopePriority
func (pq envelopePriorityQueue) Len() int { return len(pq) }
func (pq envelopePriorityQueue) Less(i, j int) bool {
if pq[i].priority > pq[j].priority {
return true
} else if pq[i].priority == pq[j].priority {
return pq[i].envelope.Message().GetTimestamp() < pq[j].envelope.Message().GetTimestamp()
}
return false
}
func (pq envelopePriorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i
pq[j].index = j
}
func (pq *envelopePriorityQueue) Push(x any) {
n := len(*pq)
item := x.(*envelopePriority)
item.index = n
*pq = append(*pq, item)
}
func (pq *envelopePriorityQueue) Pop() any {
old := *pq
n := len(old)
item := old[n-1]
old[n-1] = nil // avoid memory leak
item.index = -1 // for safety
*pq = old[0 : n-1]
return item
}
// MessageQueue is a structure used to handle the ordering of the messages to publish
type MessageQueue struct {
usePriorityQueue bool
toSendChan chan *protocol.Envelope
throttledPrioritySendQueue chan *envelopePriority
envelopeAvailableOnPriorityQueueSignal chan struct{}
envelopePriorityQueue envelopePriorityQueue
}
// NewMessageQueue returns a new instance of MessageQueue. The MessageQueue can internally use a
// priority queue to handle the ordering of the messages, or use a simple FIFO queue.
func NewMessageQueue(bufferSize int, usePriorityQueue bool) *MessageQueue {
m := &MessageQueue{
usePriorityQueue: usePriorityQueue,
}
if m.usePriorityQueue {
m.envelopePriorityQueue = make(envelopePriorityQueue, 0)
m.throttledPrioritySendQueue = make(chan *envelopePriority, bufferSize)
m.envelopeAvailableOnPriorityQueueSignal = make(chan struct{}, bufferSize)
heap.Init(&m.envelopePriorityQueue)
} else {
m.toSendChan = make(chan *protocol.Envelope, bufferSize)
}
return m
}
// Start must be called to handle the lifetime of the internals of the message queue
func (m *MessageQueue) Start(ctx context.Context) {
for {
select {
case envelopePriority, ok := <-m.throttledPrioritySendQueue:
if !ok {
continue
}
heap.Push(&m.envelopePriorityQueue, envelopePriority)
m.envelopeAvailableOnPriorityQueueSignal <- struct{}{}
case <-ctx.Done():
if m.usePriorityQueue {
close(m.throttledPrioritySendQueue)
close(m.envelopeAvailableOnPriorityQueueSignal)
} else {
close(m.toSendChan)
}
return
}
}
}
// Push an envelope into the message queue. The priority is optional, and will be ignored
// if the message queue does not use a priority queue
func (m *MessageQueue) Push(envelope *protocol.Envelope, priority ...MessagePriority) {
if m.usePriorityQueue {
msgPriority := NormalPriority
if len(priority) != 0 {
msgPriority = priority[0]
}
m.throttledPrioritySendQueue <- &envelopePriority{
envelope: envelope,
priority: msgPriority,
}
} else {
m.toSendChan <- envelope
}
}
// Pop will return a channel on which a message can be retrieved from the message queue
func (m *MessageQueue) Pop() <-chan *protocol.Envelope {
ch := make(chan *protocol.Envelope)
go func() {
select {
case _, ok := <-m.envelopeAvailableOnPriorityQueueSignal:
if ok {
ch <- heap.Pop(&m.envelopePriorityQueue).(*envelopePriority).envelope
}
case envelope, ok := <-m.toSendChan:
if ok {
ch <- envelope
}
}
close(ch)
}()
return ch
}

View File

@ -0,0 +1,91 @@
package publish
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"google.golang.org/protobuf/proto"
)
func TestFifoQueue(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
queue := NewMessageQueue(10, false)
go queue.Start(ctx)
queue.Push(protocol.NewEnvelope(&pb.WakuMessage{}, 0, "A"))
queue.Push(protocol.NewEnvelope(&pb.WakuMessage{}, 0, "B"))
queue.Push(protocol.NewEnvelope(&pb.WakuMessage{}, 0, "C"))
envelope, ok := <-queue.Pop()
require.True(t, ok)
require.Equal(t, "A", envelope.PubsubTopic())
envelope, ok = <-queue.Pop()
require.True(t, ok)
require.Equal(t, "B", envelope.PubsubTopic())
envelope, ok = <-queue.Pop()
require.True(t, ok)
require.Equal(t, "C", envelope.PubsubTopic())
cancel()
_, ok = <-queue.Pop()
require.False(t, ok)
}
func TestPriorityQueue(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
queue := NewMessageQueue(10, true)
go queue.Start(ctx)
queue.Push(protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(0)}, 0, "A"), LowPriority)
queue.Push(protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(1)}, 0, "B"), LowPriority)
queue.Push(protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(2)}, 0, "C"), HighPriority)
queue.Push(protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(3)}, 0, "D"), NormalPriority)
queue.Push(protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(4)}, 0, "E"), HighPriority)
queue.Push(protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(5)}, 0, "F"), LowPriority)
queue.Push(protocol.NewEnvelope(&pb.WakuMessage{Timestamp: proto.Int64(6)}, 0, "G"), NormalPriority)
time.Sleep(2 * time.Second)
envelope, ok := <-queue.Pop()
require.True(t, ok)
require.Equal(t, "C", envelope.PubsubTopic())
envelope, ok = <-queue.Pop()
require.True(t, ok)
require.Equal(t, "E", envelope.PubsubTopic())
envelope, ok = <-queue.Pop()
require.True(t, ok)
require.Equal(t, "D", envelope.PubsubTopic())
envelope, ok = <-queue.Pop()
require.True(t, ok)
require.Equal(t, "G", envelope.PubsubTopic())
envelope, ok = <-queue.Pop()
require.True(t, ok)
require.Equal(t, "A", envelope.PubsubTopic())
envelope, ok = <-queue.Pop()
require.True(t, ok)
require.Equal(t, "B", envelope.PubsubTopic())
envelope, ok = <-queue.Pop()
require.True(t, ok)
require.Equal(t, "F", envelope.PubsubTopic())
cancel()
_, ok = <-queue.Pop()
require.False(t, ok)
}

View File

@ -0,0 +1,37 @@
package publish
import (
"context"
"errors"
"github.com/waku-org/go-waku/waku/v2/protocol"
"go.uber.org/zap"
"golang.org/x/time/rate"
)
// PublishRateLimiter is used to decorate publish functions to limit the
// number of messages per second that can be published
type PublishRateLimiter struct {
limiter *rate.Limiter
}
// NewPublishRateLimiter will create a new instance of PublishRateLimiter.
// You can specify an rate.Inf value to in practice ignore the rate limiting
func NewPublishRateLimiter(r rate.Limit) *PublishRateLimiter {
return &PublishRateLimiter{
limiter: rate.NewLimiter(r, 1),
}
}
// ThrottlePublishFn is used to decorate a PublishFn so rate limiting is applied
func (p *PublishRateLimiter) ThrottlePublishFn(ctx context.Context, publishFn PublishFn) PublishFn {
return func(envelope *protocol.Envelope, logger *zap.Logger) error {
if err := p.limiter.Wait(ctx); err != nil {
if !errors.Is(err, context.Canceled) {
logger.Error("could not send message (limiter)", zap.Error(err))
}
return err
}
return publishFn(envelope, logger)
}
}

View File

@ -0,0 +1,36 @@
package publish
import (
"context"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
"golang.org/x/time/rate"
)
func TestRateLimit(t *testing.T) {
r := NewPublishRateLimiter(rate.Limit(1))
l := utils.Logger()
var counter atomic.Int32
fn := r.ThrottlePublishFn(context.Background(), func(envelope *protocol.Envelope, logger *zap.Logger) error {
counter.Add(1)
return nil
})
go func() {
for i := 0; i <= 10; i++ {
err := fn(nil, l)
require.NoError(t, err)
}
}()
<-time.After(2 * time.Second)
require.LessOrEqual(t, counter.Load(), int32(3))
}