From 49737780ea8d0b1338532331fa5c9d4d716f65f2 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Sat, 6 Nov 2021 18:46:58 -0400 Subject: [PATCH] refactor: concurrent map and slices for filter --- waku/v2/protocol/filter/filter_map.go | 101 ++++++++++ waku/v2/protocol/filter/filter_subscribers.go | 94 ++++++++++ waku/v2/protocol/filter/waku_filter.go | 177 +++++------------- waku/v2/protocol/relay/waku_relay.go | 12 +- waku/v2/protocol/store/message_queue.go | 13 +- waku/v2/protocol/store/waku_store.go | 4 +- 6 files changed, 253 insertions(+), 148 deletions(-) create mode 100644 waku/v2/protocol/filter/filter_map.go create mode 100644 waku/v2/protocol/filter/filter_subscribers.go diff --git a/waku/v2/protocol/filter/filter_map.go b/waku/v2/protocol/filter/filter_map.go new file mode 100644 index 00000000..34d982f8 --- /dev/null +++ b/waku/v2/protocol/filter/filter_map.go @@ -0,0 +1,101 @@ +package filter + +import ( + "sync" + + "github.com/status-im/go-waku/waku/v2/protocol" + "github.com/status-im/go-waku/waku/v2/protocol/pb" +) + +type FilterMap struct { + sync.RWMutex + items map[string]Filter +} + +type FilterMapItem struct { + Key string + Value Filter +} + +func NewFilterMap() *FilterMap { + return &FilterMap{ + items: make(map[string]Filter), + } +} + +func (fm *FilterMap) Set(key string, value Filter) { + fm.Lock() + defer fm.Unlock() + + fm.items[key] = value +} + +func (fm *FilterMap) Get(key string) (Filter, bool) { + fm.Lock() + defer fm.Unlock() + + value, ok := fm.items[key] + + return value, ok +} + +func (fm *FilterMap) Delete(key string) { + fm.Lock() + defer fm.Unlock() + + close(fm.items[key].Chan) + delete(fm.items, key) +} + +func (fm *FilterMap) RemoveAll() { + fm.Lock() + defer fm.Unlock() + + for k, v := range fm.items { + close(v.Chan) + delete(fm.items, k) + } +} + +func (fm *FilterMap) Items() <-chan FilterMapItem { + c := make(chan FilterMapItem) + + f := func() { + fm.RLock() + defer fm.RUnlock() + + for k, v := range fm.items { + c <- FilterMapItem{k, v} + } + close(c) + } + go f() + + return c +} + +func (fm *FilterMap) Notify(msg *pb.WakuMessage, requestId string) { + fm.RLock() + defer fm.RUnlock() + + for key, filter := range fm.items { + envelope := protocol.NewEnvelope(msg, filter.Topic) + + // We do this because the key for the filter is set to the requestId received from the filter protocol. + // This means we do not need to check the content filter explicitly as all MessagePushs already contain + // the requestId of the coresponding filter. + if requestId != "" && requestId == key { + filter.Chan <- envelope + continue + } + + // TODO: In case of no topics we should either trigger here for all messages, + // or we should not allow such filter to exist in the first place. + for _, contentTopic := range filter.ContentFilters { + if msg.ContentTopic == contentTopic { + filter.Chan <- envelope + break + } + } + } +} diff --git a/waku/v2/protocol/filter/filter_subscribers.go b/waku/v2/protocol/filter/filter_subscribers.go new file mode 100644 index 00000000..7703e736 --- /dev/null +++ b/waku/v2/protocol/filter/filter_subscribers.go @@ -0,0 +1,94 @@ +package filter + +import ( + "sync" + + "github.com/libp2p/go-libp2p-core/peer" + "github.com/status-im/go-waku/waku/v2/protocol/pb" +) + +type Subscriber struct { + peer peer.ID + requestId string + filter pb.FilterRequest // @TODO MAKE THIS A SEQUENCE AGAIN? +} + +type Subscribers struct { + sync.RWMutex + subscribers []Subscriber +} + +func NewSubscribers() *Subscribers { + return &Subscribers{} +} + +func (self *Subscribers) Append(s Subscriber) int { + self.Lock() + defer self.Unlock() + + self.subscribers = append(self.subscribers, s) + return len(self.subscribers) +} + +func (self *Subscribers) Items() <-chan Subscriber { + c := make(chan Subscriber) + + f := func() { + self.RLock() + defer self.RUnlock() + for _, value := range self.subscribers { + c <- value + } + close(c) + } + go f() + + return c +} + +func (self *Subscribers) Length() int { + self.RLock() + defer self.RUnlock() + + return len(self.subscribers) +} + +func (self *Subscribers) RemoveContentFilters(peerID peer.ID, contentFilters []*pb.FilterRequest_ContentFilter) { + var peerIdsToRemove []peer.ID + + for _, subscriber := range self.subscribers { + if subscriber.peer != peerID { + continue + } + + // make sure we delete the content filter + // if no more topics are left + for i, contentFilter := range contentFilters { + subCfs := subscriber.filter.ContentFilters + for _, cf := range subCfs { + if cf.ContentTopic == contentFilter.ContentTopic { + l := len(subCfs) - 1 + subCfs[l], subCfs[i] = subCfs[i], subCfs[l] + subscriber.filter.ContentFilters = subCfs[:l] + } + } + } + + if len(subscriber.filter.ContentFilters) == 0 { + peerIdsToRemove = append(peerIdsToRemove, subscriber.peer) + } + } + + // make sure we delete the subscriber + // if no more content filters left + for _, peerId := range peerIdsToRemove { + for i, s := range self.subscribers { + if s.peer == peerId { + l := len(self.subscribers) - 1 + self.subscribers[l], self.subscribers[i] = self.subscribers[i], self.subscribers[l] + self.subscribers = self.subscribers[:l] + break + } + } + } +} diff --git a/waku/v2/protocol/filter/waku_filter.go b/waku/v2/protocol/filter/waku_filter.go index 6ad0e457..303a0aeb 100644 --- a/waku/v2/protocol/filter/waku_filter.go +++ b/waku/v2/protocol/filter/waku_filter.go @@ -5,7 +5,6 @@ import ( "encoding/hex" "errors" "fmt" - "sync" logging "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p-core/host" @@ -47,15 +46,6 @@ type ( ContentTopics []string } - // TODO: MAYBE MORE INFO? - Filters map[string]Filter - - Subscriber struct { - peer peer.ID - requestId string - filter pb.FilterRequest // @TODO MAKE THIS A SEQUENCE AGAIN? - } - FilterSubscription struct { RequestID string Peer peer.ID @@ -67,11 +57,8 @@ type ( isFullNode bool MsgC chan *protocol.Envelope - filtersMutex sync.RWMutex - filters Filters - - subscriberMutex sync.Mutex - subscribers []Subscriber + filters *FilterMap + subscribers *Subscribers } ) @@ -104,29 +91,6 @@ func DefaultOptions() []FilterSubscribeOption { } } -func (filters *Filters) Notify(msg *pb.WakuMessage, requestId string) { - for key, filter := range *filters { - envelope := protocol.NewEnvelope(msg, filter.Topic) - - // We do this because the key for the filter is set to the requestId received from the filter protocol. - // This means we do not need to check the content filter explicitly as all MessagePushs already contain - // the requestId of the coresponding filter. - if requestId != "" && requestId == key { - filter.Chan <- envelope - continue - } - - // TODO: In case of no topics we should either trigger here for all messages, - // or we should not allow such filter to exist in the first place. - for _, contentTopic := range filter.ContentFilters { - if msg.ContentTopic == contentTopic { - filter.Chan <- envelope - break - } - } - } -} - func (wf *WakuFilter) onRequest(s network.Stream) { defer s.Close() @@ -146,9 +110,7 @@ func (wf *WakuFilter) onRequest(s network.Stream) { // We're on a light node. // This is a message push coming from a full node. for _, message := range filterRPCRequest.Push.Messages { - wf.filtersMutex.RLock() wf.filters.Notify(message, filterRPCRequest.RequestId) // Trigger filter handlers on a light node - wf.filtersMutex.RUnlock() } log.Info("filter light node, received a message push. ", len(filterRPCRequest.Push.Messages), " messages") @@ -157,58 +119,17 @@ func (wf *WakuFilter) onRequest(s network.Stream) { // We're on a full node. // This is a filter request coming from a light node. if filterRPCRequest.Request.Subscribe { - wf.subscriberMutex.Lock() - defer wf.subscriberMutex.Unlock() - subscriber := Subscriber{peer: s.Conn().RemotePeer(), requestId: filterRPCRequest.RequestId, filter: *filterRPCRequest.Request} - wf.subscribers = append(wf.subscribers, subscriber) + len := wf.subscribers.Append(subscriber) + log.Info("filter full node, add a filter subscriber: ", subscriber.peer) - stats.Record(wf.ctx, metrics.FilterSubscriptions.M(int64(len(wf.subscribers)))) + stats.Record(wf.ctx, metrics.FilterSubscriptions.M(int64(len))) } else { peerId := s.Conn().RemotePeer() + wf.subscribers.RemoveContentFilters(peerId, filterRPCRequest.Request.ContentFilters) + log.Info("filter full node, remove a filter subscriber: ", peerId.Pretty()) - contentFilters := filterRPCRequest.Request.ContentFilters - var peerIdsToRemove []peer.ID - - wf.subscriberMutex.Lock() - defer wf.subscriberMutex.Unlock() - for _, subscriber := range wf.subscribers { - if subscriber.peer != peerId { - continue - } - - // make sure we delete the content filter - // if no more topics are left - for i, contentFilter := range contentFilters { - subCfs := subscriber.filter.ContentFilters - for _, cf := range subCfs { - if cf.ContentTopic == contentFilter.ContentTopic { - l := len(subCfs) - 1 - subCfs[l], subCfs[i] = subCfs[i], subCfs[l] - subscriber.filter.ContentFilters = subCfs[:l] - } - } - } - - if len(subscriber.filter.ContentFilters) == 0 { - peerIdsToRemove = append(peerIdsToRemove, subscriber.peer) - } - } - - // make sure we delete the subscriber - // if no more content filters left - for _, peerId := range peerIdsToRemove { - for i, s := range wf.subscribers { - if s.peer == peerId { - l := len(wf.subscribers) - 1 - wf.subscribers[l], wf.subscribers[i] = wf.subscribers[i], wf.subscribers[l] - wf.subscribers = wf.subscribers[:l] - break - } - } - } - - stats.Record(wf.ctx, metrics.FilterSubscriptions.M(int64(len(wf.subscribers)))) + stats.Record(wf.ctx, metrics.FilterSubscriptions.M(int64(wf.subscribers.Length()))) } } else { log.Error("can't serve request") @@ -227,7 +148,8 @@ func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool) *WakuFi wf.MsgC = make(chan *protocol.Envelope) wf.h = host wf.isFullNode = isFullNode - wf.filters = make(Filters) + wf.filters = NewFilterMap() + wf.subscribers = NewSubscribers() wf.h.SetStreamHandlerMatch(FilterID_v20beta1, protocol.PrefixTextMatch(string(FilterID_v20beta1)), wf.onRequest) go wf.FilterListener() @@ -241,6 +163,29 @@ func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool) *WakuFi return wf } +func (wf *WakuFilter) pushMessage(subscriber Subscriber, msg *pb.WakuMessage) error { + pushRPC := &pb.FilterRPC{RequestId: subscriber.requestId, Push: &pb.MessagePush{Messages: []*pb.WakuMessage{msg}}} + + conn, err := wf.h.NewStream(wf.ctx, peer.ID(subscriber.peer), FilterID_v20beta1) + // TODO: keep track of errors to automatically unsubscribe a peer? + if err != nil { + // @TODO more sophisticated error handling here + log.Error("failed to open peer stream") + //waku_filter_errors.inc(labelValues = [dialFailure]) + return err + } + + defer conn.Close() + writer := protoio.NewDelimitedWriter(conn) + err = writer.WriteMsg(pushRPC) + if err != nil { + log.Error("failed to push messages to remote peer") + return nil + } + + return nil +} + func (wf *WakuFilter) FilterListener() { // This function is invoked for each message received // on the full node in context of Waku2-Filter @@ -249,7 +194,7 @@ func (wf *WakuFilter) FilterListener() { topic := envelope.PubsubTopic() // Each subscriber is a light node that earlier on invoked // a FilterRequest on this node - for _, subscriber := range wf.subscribers { + for subscriber := range wf.subscribers.Items() { if subscriber.filter.Topic != "" && subscriber.filter.Topic != topic { log.Info("Subscriber's filter pubsubTopic does not match message topic", subscriber.filter.Topic, topic) continue @@ -258,28 +203,12 @@ func (wf *WakuFilter) FilterListener() { for _, filter := range subscriber.filter.ContentFilters { if msg.ContentTopic == filter.ContentTopic { log.Info("found matching contentTopic ", filter, msg) - msgArr := []*pb.WakuMessage{msg} // Do a message push to light node - pushRPC := &pb.FilterRPC{RequestId: subscriber.requestId, Push: &pb.MessagePush{Messages: msgArr}} - log.Info("pushing a message to light node: ", pushRPC) - - conn, err := wf.h.NewStream(wf.ctx, peer.ID(subscriber.peer), FilterID_v20beta1) - // TODO: keep track of errors to automatically unsubscribe a peer? - if err != nil { - // @TODO more sophisticated error handling here - log.Error("failed to open peer stream") - //waku_filter_errors.inc(labelValues = [dialFailure]) + log.Info("pushing messages to light node: ", subscriber.peer) + if err := wf.pushMessage(subscriber, msg); err != nil { return err } - defer conn.Close() - writer := protoio.NewDelimitedWriter(conn) - err = writer.WriteMsg(pushRPC) - if err != nil { - log.Error("failed to push messages to remote peer") - return nil - } - } } } @@ -385,11 +314,7 @@ func (wf *WakuFilter) Unsubscribe(ctx context.Context, contentFilter ContentFilt func (wf *WakuFilter) Stop() { wf.h.RemoveStreamHandler(FilterID_v20beta1) - wf.filtersMutex.Lock() - defer wf.filtersMutex.Unlock() - for _, filter := range wf.filters { - close(filter.Chan) - } + wf.filters.RemoveAll() } func (wf *WakuFilter) Subscribe(ctx context.Context, f ContentFilter, opts ...FilterSubscribeOption) (filterID string, theFilter Filter, err error) { @@ -405,8 +330,6 @@ func (wf *WakuFilter) Subscribe(ctx context.Context, f ContentFilter, opts ...Fi } // Register handler for filter, whether remote subscription succeeded or not - wf.filtersMutex.Lock() - defer wf.filtersMutex.Unlock() filterID = remoteSubs.RequestID theFilter = Filter{ @@ -416,7 +339,7 @@ func (wf *WakuFilter) Subscribe(ctx context.Context, f ContentFilter, opts ...Fi Chan: make(chan *protocol.Envelope, 1024), // To avoid blocking } - wf.filters[filterID] = theFilter + wf.filters.Set(filterID, theFilter) return } @@ -428,10 +351,7 @@ func (wf *WakuFilter) UnsubscribeFilterByID(ctx context.Context, filterID string var f Filter var ok bool - wf.filtersMutex.Lock() - defer wf.filtersMutex.Unlock() - - if f, ok = wf.filters[filterID]; !ok { + if f, ok = wf.filters.Get(filterID); !ok { return errors.New("filter not found") } @@ -445,8 +365,7 @@ func (wf *WakuFilter) UnsubscribeFilterByID(ctx context.Context, filterID string return err } - close(f.Chan) - delete(wf.filters, filterID) + wf.filters.Delete(filterID) return nil } @@ -454,12 +373,12 @@ func (wf *WakuFilter) UnsubscribeFilterByID(ctx context.Context, filterID string // Unsubscribe filter removes content topics from a filter subscription. If all // the contentTopics are removed the subscription is dropped completely func (wf *WakuFilter) UnsubscribeFilter(ctx context.Context, cf ContentFilter) error { - wf.filtersMutex.Lock() - defer wf.filtersMutex.Unlock() - // Remove local filter var idsToRemove []string - for id, f := range wf.filters { + for filterMapItem := range wf.filters.Items() { + f := filterMapItem.Value + id := filterMapItem.Key + if f.Topic != cf.Topic { continue } @@ -490,13 +409,7 @@ func (wf *WakuFilter) UnsubscribeFilter(ctx context.Context, cf ContentFilter) e } for _, rId := range idsToRemove { - for id := range wf.filters { - if id == rId { - close(wf.filters[id].Chan) - delete(wf.filters, id) - break - } - } + wf.filters.Delete(rId) } return nil diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index 90b86eb2..8189d4ea 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -33,13 +33,15 @@ type WakuRelay struct { host host.Host pubsub *pubsub.PubSub - topics map[Topic]bool + bcaster v2.Broadcaster + + // TODO: convert to concurrent maps + topics map[Topic]struct{} topicsMutex sync.Mutex wakuRelayTopics map[Topic]*pubsub.Topic relaySubs map[Topic]*pubsub.Subscription - bcaster v2.Broadcaster - + // TODO: convert to concurrent maps subscriptions map[Topic][]*Subscription subscriptionsMutex sync.Mutex } @@ -53,7 +55,7 @@ func msgIdFn(pmsg *pubsub_pb.Message) string { func NewWakuRelay(ctx context.Context, h host.Host, bcaster v2.Broadcaster, opts ...pubsub.Option) (*WakuRelay, error) { w := new(WakuRelay) w.host = h - w.topics = make(map[Topic]bool) + w.topics = make(map[Topic]struct{}) w.wakuRelayTopics = make(map[Topic]*pubsub.Topic) w.relaySubs = make(map[Topic]*pubsub.Subscription) w.subscriptions = make(map[Topic][]*Subscription) @@ -112,7 +114,7 @@ func (w *WakuRelay) upsertTopic(topic Topic) (*pubsub.Topic, error) { defer w.topicsMutex.Unlock() w.topicsMutex.Lock() - w.topics[topic] = true + w.topics[topic] = struct{}{} pubSubTopic, ok := w.wakuRelayTopics[topic] if !ok { // Joins topic if node hasn't joined yet newTopic, err := w.pubsub.Join(string(topic)) diff --git a/waku/v2/protocol/store/message_queue.go b/waku/v2/protocol/store/message_queue.go index 38f40441..b47efddd 100644 --- a/waku/v2/protocol/store/message_queue.go +++ b/waku/v2/protocol/store/message_queue.go @@ -18,11 +18,6 @@ type MessageQueue struct { quit chan struct{} } -type MessageQueueItem struct { - Index int - Value IndexedWakuMessage -} - func (self *MessageQueue) Push(msg IndexedWakuMessage) { self.Lock() defer self.Unlock() @@ -42,14 +37,14 @@ func (self *MessageQueue) Push(msg IndexedWakuMessage) { } } -func (self *MessageQueue) Messages() <-chan MessageQueueItem { - c := make(chan MessageQueueItem) +func (self *MessageQueue) Messages() <-chan IndexedWakuMessage { + c := make(chan IndexedWakuMessage) f := func() { self.RLock() defer self.RUnlock() - for index, value := range self.messages { - c <- MessageQueueItem{index, value} + for _, value := range self.messages { + c <- value } close(c) } diff --git a/waku/v2/protocol/store/waku_store.go b/waku/v2/protocol/store/waku_store.go index 191b0d6f..e1f64ca3 100644 --- a/waku/v2/protocol/store/waku_store.go +++ b/waku/v2/protocol/store/waku_store.go @@ -144,7 +144,7 @@ func (store *WakuStore) FindMessages(query *pb.HistoryQuery) *pb.HistoryResponse result := new(pb.HistoryResponse) // data holds IndexedWakuMessage whose topics match the query var data []IndexedWakuMessage - for _, indexedMsg := range store.messageQueue.messages { + for indexedMsg := range store.messageQueue.Messages() { // temporal filtering // check whether the history query contains a time filter if query.StartTime != 0 && query.EndTime != 0 { @@ -628,7 +628,7 @@ func (store *WakuStore) queryLoop(ctx context.Context, query *pb.HistoryQuery, c func (store *WakuStore) findLastSeen() float64 { var lastSeenTime float64 = 0 - for _, imsg := range store.messageQueue.messages { + for imsg := range store.messageQueue.Messages() { if imsg.msg.Timestamp > lastSeenTime { lastSeenTime = imsg.msg.Timestamp }