refactor: concurrent map and slices for filter

Richard Ramos 2021-11-06 18:46:58 -04:00
parent fcfe3568ab
commit 49737780ea
6 changed files with 253 additions and 148 deletions

View File

@@ -0,0 +1,101 @@
package filter
import (
"sync"
"github.com/status-im/go-waku/waku/v2/protocol"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
)
type FilterMap struct {
sync.RWMutex
items map[string]Filter
}
type FilterMapItem struct {
Key string
Value Filter
}
func NewFilterMap() *FilterMap {
return &FilterMap{
items: make(map[string]Filter),
}
}
func (fm *FilterMap) Set(key string, value Filter) {
fm.Lock()
defer fm.Unlock()
fm.items[key] = value
}
func (fm *FilterMap) Get(key string) (Filter, bool) {
fm.Lock()
defer fm.Unlock()
value, ok := fm.items[key]
return value, ok
}
func (fm *FilterMap) Delete(key string) {
fm.Lock()
defer fm.Unlock()
close(fm.items[key].Chan)
delete(fm.items, key)
}
func (fm *FilterMap) RemoveAll() {
fm.Lock()
defer fm.Unlock()
for k, v := range fm.items {
close(v.Chan)
delete(fm.items, k)
}
}
func (fm *FilterMap) Items() <-chan FilterMapItem {
c := make(chan FilterMapItem)
f := func() {
fm.RLock()
defer fm.RUnlock()
for k, v := range fm.items {
c <- FilterMapItem{k, v}
}
close(c)
}
go f()
return c
}
func (fm *FilterMap) Notify(msg *pb.WakuMessage, requestId string) {
fm.RLock()
defer fm.RUnlock()
for key, filter := range fm.items {
envelope := protocol.NewEnvelope(msg, filter.Topic)
// We do this because the key for the filter is set to the requestId received from the filter protocol.
// This means we do not need to check the content filter explicitly, as all MessagePush messages already contain
// the requestId of the corresponding filter.
if requestId != "" && requestId == key {
filter.Chan <- envelope
continue
}
// TODO: In case of no topics we should either trigger here for all messages,
// or we should not allow such a filter to exist in the first place.
for _, contentTopic := range filter.ContentFilters {
if msg.ContentTopic == contentTopic {
filter.Chan <- envelope
break
}
}
}
}
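
This new FilterMap replaces the plain Filters map that waku_filter.go used to lock at every call site: locking is now encapsulated behind methods, and Items() streams entries over a channel from a goroutine that holds the read lock until the consumer has drained it. A minimal standalone sketch of the same pattern, using plain strings instead of the go-waku Filter type (all names below are illustrative, not repository code):

package main

import (
	"fmt"
	"sync"
)

// concurrentMap sketches the FilterMap idea: every access goes through a
// method that takes the embedded RWMutex, so callers never lock explicitly.
type concurrentMap struct {
	sync.RWMutex
	items map[string]string
}

type item struct {
	Key, Value string
}

func newConcurrentMap() *concurrentMap {
	return &concurrentMap{items: make(map[string]string)}
}

func (m *concurrentMap) Set(key, value string) {
	m.Lock()
	defer m.Unlock()
	m.items[key] = value
}

// Items streams entries over an unbuffered channel. The read lock is held by
// the sending goroutine until the channel is closed, so writers block until
// the consumer has received every item.
func (m *concurrentMap) Items() <-chan item {
	c := make(chan item)
	go func() {
		m.RLock()
		defer m.RUnlock()
		for k, v := range m.items {
			c <- item{k, v}
		}
		close(c)
	}()
	return c
}

func main() {
	m := newConcurrentMap()
	m.Set("requestId-1", "filter-1")
	m.Set("requestId-2", "filter-2")
	for it := range m.Items() {
		fmt.Println(it.Key, "=>", it.Value)
	}
}

The trade-off of this design is that Set and Delete cannot proceed while an Items() consumer is still reading, which is acceptable here because iteration is short-lived.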

View File

@@ -0,0 +1,94 @@
package filter
import (
"sync"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
)
type Subscriber struct {
peer peer.ID
requestId string
filter pb.FilterRequest // @TODO MAKE THIS A SEQUENCE AGAIN?
}
type Subscribers struct {
sync.RWMutex
subscribers []Subscriber
}
func NewSubscribers() *Subscribers {
return &Subscribers{}
}
func (self *Subscribers) Append(s Subscriber) int {
self.Lock()
defer self.Unlock()
self.subscribers = append(self.subscribers, s)
return len(self.subscribers)
}
func (self *Subscribers) Items() <-chan Subscriber {
c := make(chan Subscriber)
f := func() {
self.RLock()
defer self.RUnlock()
for _, value := range self.subscribers {
c <- value
}
close(c)
}
go f()
return c
}
func (self *Subscribers) Length() int {
self.RLock()
defer self.RUnlock()
return len(self.subscribers)
}
func (self *Subscribers) RemoveContentFilters(peerID peer.ID, contentFilters []*pb.FilterRequest_ContentFilter) {
var peerIdsToRemove []peer.ID
for _, subscriber := range self.subscribers {
if subscriber.peer != peerID {
continue
}
// make sure we delete the content filter
// if no more topics are left
for i, contentFilter := range contentFilters {
subCfs := subscriber.filter.ContentFilters
for _, cf := range subCfs {
if cf.ContentTopic == contentFilter.ContentTopic {
l := len(subCfs) - 1
subCfs[l], subCfs[i] = subCfs[i], subCfs[l]
subscriber.filter.ContentFilters = subCfs[:l]
}
}
}
if len(subscriber.filter.ContentFilters) == 0 {
peerIdsToRemove = append(peerIdsToRemove, subscriber.peer)
}
}
// make sure we delete the subscriber
// if no more content filters left
for _, peerId := range peerIdsToRemove {
for i, s := range self.subscribers {
if s.peer == peerId {
l := len(self.subscribers) - 1
self.subscribers[l], self.subscribers[i] = self.subscribers[i], self.subscribers[l]
self.subscribers = self.subscribers[:l]
break
}
}
}
}
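
RemoveContentFilters uses the swap-with-last idiom twice, once for a subscriber's content filters and once for the subscribers slice itself, to delete an element in O(1) at the cost of element order. A standalone sketch of that idiom (illustrative names only):

package main

import "fmt"

// removeAt deletes the element at index i by swapping it with the last
// element and shrinking the slice by one; ordering is not preserved.
func removeAt(s []string, i int) []string {
	l := len(s) - 1
	s[l], s[i] = s[i], s[l]
	return s[:l]
}

func main() {
	subscribers := []string{"peerA", "peerB", "peerC"}
	subscribers = removeAt(subscribers, 0) // drop peerA
	fmt.Println(subscribers)               // [peerC peerB]
}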

View File

@@ -5,7 +5,6 @@ import (
"encoding/hex"
"errors"
"fmt"
"sync"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/host"
@@ -47,15 +46,6 @@ type (
ContentTopics []string
}
// TODO: MAYBE MORE INFO?
Filters map[string]Filter
Subscriber struct {
peer peer.ID
requestId string
filter pb.FilterRequest // @TODO MAKE THIS A SEQUENCE AGAIN?
}
FilterSubscription struct {
RequestID string
Peer peer.ID
@@ -67,11 +57,8 @@ type (
isFullNode bool
MsgC chan *protocol.Envelope
filtersMutex sync.RWMutex
filters Filters
subscriberMutex sync.Mutex
subscribers []Subscriber
filters *FilterMap
subscribers *Subscribers
}
)
@@ -104,29 +91,6 @@ func DefaultOptions() []FilterSubscribeOption {
}
}
func (filters *Filters) Notify(msg *pb.WakuMessage, requestId string) {
for key, filter := range *filters {
envelope := protocol.NewEnvelope(msg, filter.Topic)
// We do this because the key for the filter is set to the requestId received from the filter protocol.
// This means we do not need to check the content filter explicitly, as all MessagePush messages already contain
// the requestId of the corresponding filter.
if requestId != "" && requestId == key {
filter.Chan <- envelope
continue
}
// TODO: In case of no topics we should either trigger here for all messages,
// or we should not allow such a filter to exist in the first place.
for _, contentTopic := range filter.ContentFilters {
if msg.ContentTopic == contentTopic {
filter.Chan <- envelope
break
}
}
}
}
func (wf *WakuFilter) onRequest(s network.Stream) {
defer s.Close()
@@ -146,9 +110,7 @@ func (wf *WakuFilter) onRequest(s network.Stream) {
// We're on a light node.
// This is a message push coming from a full node.
for _, message := range filterRPCRequest.Push.Messages {
wf.filtersMutex.RLock()
wf.filters.Notify(message, filterRPCRequest.RequestId) // Trigger filter handlers on a light node
wf.filtersMutex.RUnlock()
}
log.Info("filter light node, received a message push. ", len(filterRPCRequest.Push.Messages), " messages")
@@ -157,58 +119,17 @@ func (wf *WakuFilter) onRequest(s network.Stream) {
// We're on a full node.
// This is a filter request coming from a light node.
if filterRPCRequest.Request.Subscribe {
wf.subscriberMutex.Lock()
defer wf.subscriberMutex.Unlock()
subscriber := Subscriber{peer: s.Conn().RemotePeer(), requestId: filterRPCRequest.RequestId, filter: *filterRPCRequest.Request}
wf.subscribers = append(wf.subscribers, subscriber)
len := wf.subscribers.Append(subscriber)
log.Info("filter full node, add a filter subscriber: ", subscriber.peer)
stats.Record(wf.ctx, metrics.FilterSubscriptions.M(int64(len(wf.subscribers))))
stats.Record(wf.ctx, metrics.FilterSubscriptions.M(int64(len)))
} else {
peerId := s.Conn().RemotePeer()
wf.subscribers.RemoveContentFilters(peerId, filterRPCRequest.Request.ContentFilters)
log.Info("filter full node, remove a filter subscriber: ", peerId.Pretty())
contentFilters := filterRPCRequest.Request.ContentFilters
var peerIdsToRemove []peer.ID
wf.subscriberMutex.Lock()
defer wf.subscriberMutex.Unlock()
for _, subscriber := range wf.subscribers {
if subscriber.peer != peerId {
continue
}
// make sure we delete the content filter
// if no more topics are left
for i, contentFilter := range contentFilters {
subCfs := subscriber.filter.ContentFilters
for _, cf := range subCfs {
if cf.ContentTopic == contentFilter.ContentTopic {
l := len(subCfs) - 1
subCfs[l], subCfs[i] = subCfs[i], subCfs[l]
subscriber.filter.ContentFilters = subCfs[:l]
}
}
}
if len(subscriber.filter.ContentFilters) == 0 {
peerIdsToRemove = append(peerIdsToRemove, subscriber.peer)
}
}
// make sure we delete the subscriber
// if no more content filters left
for _, peerId := range peerIdsToRemove {
for i, s := range wf.subscribers {
if s.peer == peerId {
l := len(wf.subscribers) - 1
wf.subscribers[l], wf.subscribers[i] = wf.subscribers[i], wf.subscribers[l]
wf.subscribers = wf.subscribers[:l]
break
}
}
}
stats.Record(wf.ctx, metrics.FilterSubscriptions.M(int64(len(wf.subscribers))))
stats.Record(wf.ctx, metrics.FilterSubscriptions.M(int64(wf.subscribers.Length())))
}
} else {
log.Error("can't serve request")
@@ -227,7 +148,8 @@ func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool) *WakuFi
wf.MsgC = make(chan *protocol.Envelope)
wf.h = host
wf.isFullNode = isFullNode
wf.filters = make(Filters)
wf.filters = NewFilterMap()
wf.subscribers = NewSubscribers()
wf.h.SetStreamHandlerMatch(FilterID_v20beta1, protocol.PrefixTextMatch(string(FilterID_v20beta1)), wf.onRequest)
go wf.FilterListener()
@@ -241,6 +163,29 @@ func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool) *WakuFi
return wf
}
func (wf *WakuFilter) pushMessage(subscriber Subscriber, msg *pb.WakuMessage) error {
pushRPC := &pb.FilterRPC{RequestId: subscriber.requestId, Push: &pb.MessagePush{Messages: []*pb.WakuMessage{msg}}}
conn, err := wf.h.NewStream(wf.ctx, peer.ID(subscriber.peer), FilterID_v20beta1)
// TODO: keep track of errors to automatically unsubscribe a peer?
if err != nil {
// @TODO more sophisticated error handling here
log.Error("failed to open peer stream")
//waku_filter_errors.inc(labelValues = [dialFailure])
return err
}
defer conn.Close()
writer := protoio.NewDelimitedWriter(conn)
err = writer.WriteMsg(pushRPC)
if err != nil {
log.Error("failed to push messages to remote peer")
return nil
}
return nil
}
func (wf *WakuFilter) FilterListener() {
// This function is invoked for each message received
// on the full node in the context of Waku2-Filter
@@ -249,7 +194,7 @@ func (wf *WakuFilter) FilterListener() {
topic := envelope.PubsubTopic()
// Each subscriber is a light node that earlier on invoked
// a FilterRequest on this node
for _, subscriber := range wf.subscribers {
for subscriber := range wf.subscribers.Items() {
if subscriber.filter.Topic != "" && subscriber.filter.Topic != topic {
log.Info("Subscriber's filter pubsubTopic does not match message topic", subscriber.filter.Topic, topic)
continue
@@ -258,28 +203,12 @@ func (wf *WakuFilter) FilterListener() {
for _, filter := range subscriber.filter.ContentFilters {
if msg.ContentTopic == filter.ContentTopic {
log.Info("found matching contentTopic ", filter, msg)
msgArr := []*pb.WakuMessage{msg}
// Do a message push to light node
pushRPC := &pb.FilterRPC{RequestId: subscriber.requestId, Push: &pb.MessagePush{Messages: msgArr}}
log.Info("pushing a message to light node: ", pushRPC)
conn, err := wf.h.NewStream(wf.ctx, peer.ID(subscriber.peer), FilterID_v20beta1)
// TODO: keep track of errors to automatically unsubscribe a peer?
if err != nil {
// @TODO more sophisticated error handling here
log.Error("failed to open peer stream")
//waku_filter_errors.inc(labelValues = [dialFailure])
log.Info("pushing messages to light node: ", subscriber.peer)
if err := wf.pushMessage(subscriber, msg); err != nil {
return err
}
defer conn.Close()
writer := protoio.NewDelimitedWriter(conn)
err = writer.WriteMsg(pushRPC)
if err != nil {
log.Error("failed to push messages to remote peer")
return nil
}
}
}
}
@@ -385,11 +314,7 @@ func (wf *WakuFilter) Unsubscribe(ctx context.Context, contentFilter ContentFilt
func (wf *WakuFilter) Stop() {
wf.h.RemoveStreamHandler(FilterID_v20beta1)
wf.filtersMutex.Lock()
defer wf.filtersMutex.Unlock()
for _, filter := range wf.filters {
close(filter.Chan)
}
wf.filters.RemoveAll()
}
func (wf *WakuFilter) Subscribe(ctx context.Context, f ContentFilter, opts ...FilterSubscribeOption) (filterID string, theFilter Filter, err error) {
@@ -405,8 +330,6 @@ func (wf *WakuFilter) Subscribe(ctx context.Context, f ContentFilter, opts ...Fi
}
// Register handler for filter, whether remote subscription succeeded or not
wf.filtersMutex.Lock()
defer wf.filtersMutex.Unlock()
filterID = remoteSubs.RequestID
theFilter = Filter{
@@ -416,7 +339,7 @@ func (wf *WakuFilter) Subscribe(ctx context.Context, f ContentFilter, opts ...Fi
Chan: make(chan *protocol.Envelope, 1024), // To avoid blocking
}
wf.filters[filterID] = theFilter
wf.filters.Set(filterID, theFilter)
return
}
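
On the light-node side, the Filter returned by Subscribe carries a channel buffered to 1024 envelopes, and a consumer is expected to drain it so pushes are not dropped. A hypothetical helper, assuming the package is importable as github.com/status-im/go-waku/waku/v2/protocol/filter and that the caller already has a configured *WakuFilter, ContentFilter, and subscription options (the helper name and log line are illustrative, not part of the repository):

package lightnode

import (
	"context"
	"log"

	"github.com/status-im/go-waku/waku/v2/protocol/filter"
)

// subscribeAndDrain subscribes through an existing WakuFilter and drains the
// returned Filter's buffered channel in its own goroutine. The channel is
// closed by FilterMap.Delete / RemoveAll when the filter is unsubscribed or
// the node stops, which ends the goroutine.
func subscribeAndDrain(ctx context.Context, wf *filter.WakuFilter, cf filter.ContentFilter, opts ...filter.FilterSubscribeOption) (string, error) {
	filterID, theFilter, err := wf.Subscribe(ctx, cf, opts...)
	if err != nil {
		return "", err
	}

	go func() {
		for env := range theFilter.Chan {
			log.Printf("received envelope on pubsub topic %s", env.PubsubTopic())
		}
	}()

	return filterID, nil
}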
@@ -428,10 +351,7 @@ func (wf *WakuFilter) UnsubscribeFilterByID(ctx context.Context, filterID string
var f Filter
var ok bool
wf.filtersMutex.Lock()
defer wf.filtersMutex.Unlock()
if f, ok = wf.filters[filterID]; !ok {
if f, ok = wf.filters.Get(filterID); !ok {
return errors.New("filter not found")
}
@@ -445,8 +365,7 @@ func (wf *WakuFilter) UnsubscribeFilterByID(ctx context.Context, filterID string
return err
}
close(f.Chan)
delete(wf.filters, filterID)
wf.filters.Delete(filterID)
return nil
}
@@ -454,12 +373,12 @@ func (wf *WakuFilter) UnsubscribeFilterByID(ctx context.Context, filterID string
// Unsubscribe filter removes content topics from a filter subscription. If all
// the contentTopics are removed the subscription is dropped completely
func (wf *WakuFilter) UnsubscribeFilter(ctx context.Context, cf ContentFilter) error {
wf.filtersMutex.Lock()
defer wf.filtersMutex.Unlock()
// Remove local filter
var idsToRemove []string
for id, f := range wf.filters {
for filterMapItem := range wf.filters.Items() {
f := filterMapItem.Value
id := filterMapItem.Key
if f.Topic != cf.Topic {
continue
}
@@ -490,13 +409,7 @@ func (wf *WakuFilter) UnsubscribeFilter(ctx context.Context, cf ContentFilter) e
}
for _, rId := range idsToRemove {
for id := range wf.filters {
if id == rId {
close(wf.filters[id].Chan)
delete(wf.filters, id)
break
}
}
wf.filters.Delete(rId)
}
return nil

View File

@@ -33,13 +33,15 @@ type WakuRelay struct {
host host.Host
pubsub *pubsub.PubSub
topics map[Topic]bool
bcaster v2.Broadcaster
// TODO: convert to concurrent maps
topics map[Topic]struct{}
topicsMutex sync.Mutex
wakuRelayTopics map[Topic]*pubsub.Topic
relaySubs map[Topic]*pubsub.Subscription
bcaster v2.Broadcaster
// TODO: convert to concurrent maps
subscriptions map[Topic][]*Subscription
subscriptionsMutex sync.Mutex
}
@@ -53,7 +55,7 @@ func msgIdFn(pmsg *pubsub_pb.Message) string {
func NewWakuRelay(ctx context.Context, h host.Host, bcaster v2.Broadcaster, opts ...pubsub.Option) (*WakuRelay, error) {
w := new(WakuRelay)
w.host = h
w.topics = make(map[Topic]bool)
w.topics = make(map[Topic]struct{})
w.wakuRelayTopics = make(map[Topic]*pubsub.Topic)
w.relaySubs = make(map[Topic]*pubsub.Subscription)
w.subscriptions = make(map[Topic][]*Subscription)
@@ -112,7 +114,7 @@ func (w *WakuRelay) upsertTopic(topic Topic) (*pubsub.Topic, error) {
defer w.topicsMutex.Unlock()
w.topicsMutex.Lock()
w.topics[topic] = true
w.topics[topic] = struct{}{}
pubSubTopic, ok := w.wakuRelayTopics[topic]
if !ok { // Joins topic if node hasn't joined yet
newTopic, err := w.pubsub.Join(string(topic))
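
In the relay struct, topics changes from map[Topic]bool to map[Topic]struct{}, the conventional Go representation of a set: the empty struct adds no per-entry storage and membership is checked with the comma-ok form. A standalone sketch of the idiom (the topic string is just an example):

package main

import "fmt"

type Topic string

func main() {
	topics := make(map[Topic]struct{})

	// Insert: the empty struct{}{} value signals "membership only".
	topics["/waku/2/default-waku/proto"] = struct{}{}

	// Membership test with the comma-ok form.
	if _, ok := topics["/waku/2/default-waku/proto"]; ok {
		fmt.Println("topic already joined")
	}
}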

View File

@@ -18,11 +18,6 @@ type MessageQueue struct {
quit chan struct{}
}
type MessageQueueItem struct {
Index int
Value IndexedWakuMessage
}
func (self *MessageQueue) Push(msg IndexedWakuMessage) {
self.Lock()
defer self.Unlock()
@@ -42,14 +37,14 @@ func (self *MessageQueue) Push(msg IndexedWakuMessage) {
}
}
func (self *MessageQueue) Messages() <-chan MessageQueueItem {
c := make(chan MessageQueueItem)
func (self *MessageQueue) Messages() <-chan IndexedWakuMessage {
c := make(chan IndexedWakuMessage)
f := func() {
self.RLock()
defer self.RUnlock()
for index, value := range self.messages {
c <- MessageQueueItem{index, value}
for _, value := range self.messages {
c <- value
}
close(c)
}
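
With this change, call sites that previously ranged over messageQueue.messages directly (as in the store file below) now range over the channel returned by Messages(), which is closed once every entry has been sent while a read lock is held by the producing goroutine. A simplified sketch of that consumer shape, mirroring the max-timestamp scan in the store's findLastSeen, with a stand-in type instead of IndexedWakuMessage:

package main

import "fmt"

type indexedMessage struct {
	timestamp float64
}

type messageQueue struct {
	messages []indexedMessage
}

// Messages mirrors the refactored accessor: entries are streamed over a
// channel so callers never touch the underlying slice (locking omitted here
// for brevity; the real implementation holds a read lock while sending).
func (q *messageQueue) Messages() <-chan indexedMessage {
	c := make(chan indexedMessage)
	go func() {
		for _, m := range q.messages {
			c <- m
		}
		close(c)
	}()
	return c
}

func main() {
	q := &messageQueue{messages: []indexedMessage{{1.0}, {3.0}, {2.0}}}

	// Track the maximum timestamp while draining the channel, the same shape
	// as the store's findLastSeen.
	var lastSeen float64
	for m := range q.Messages() {
		if m.timestamp > lastSeen {
			lastSeen = m.timestamp
		}
	}
	fmt.Println("last seen:", lastSeen) // last seen: 3
}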

View File

@@ -144,7 +144,7 @@ func (store *WakuStore) FindMessages(query *pb.HistoryQuery) *pb.HistoryResponse
result := new(pb.HistoryResponse)
// data holds the IndexedWakuMessage values whose topics match the query
var data []IndexedWakuMessage
for _, indexedMsg := range store.messageQueue.messages {
for indexedMsg := range store.messageQueue.Messages() {
// temporal filtering
// check whether the history query contains a time filter
if query.StartTime != 0 && query.EndTime != 0 {
@@ -628,7 +628,7 @@ func (store *WakuStore) queryLoop(ctx context.Context, query *pb.HistoryQuery, c
func (store *WakuStore) findLastSeen() float64 {
var lastSeenTime float64 = 0
for _, imsg := range store.messageQueue.messages {
for imsg := range store.messageQueue.Messages() {
if imsg.msg.Timestamp > lastSeenTime {
lastSeenTime = imsg.msg.Timestamp
}