From ed9ea4066839c27738516a9f177d81d8eaba97ef Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Wed, 14 Apr 2021 22:19:31 -0400 Subject: [PATCH] use broadcaster for subscriptions and minor code reorg --- waku/node.go | 2 +- waku/v2/node/broadcast.go | 83 +++++++++++++++++ waku/v2/node/wakunode2.go | 183 ++++++++++++++++++++------------------ 3 files changed, 180 insertions(+), 88 deletions(-) create mode 100644 waku/v2/node/broadcast.go diff --git a/waku/node.go b/waku/node.go index 6f844290..477a2ee7 100644 --- a/waku/node.go +++ b/waku/node.go @@ -98,7 +98,7 @@ var rootCmd = &cobra.Command{ dbStore, err := persistence.NewDBStore(persistence.WithDB(db)) checkError(err, "DBStore") - err = wakuNode.MountStore(dbStore) + err = wakuNode.MountStore(true, dbStore) checkError(err, "Error mounting store") wakuNode.StartStore() diff --git a/waku/v2/node/broadcast.go b/waku/v2/node/broadcast.go new file mode 100644 index 00000000..23f55b83 --- /dev/null +++ b/waku/v2/node/broadcast.go @@ -0,0 +1,83 @@ +package node + +import ( + "github.com/status-im/go-waku/waku/common" +) + +// Adapted from https://github.com/dustin/go-broadcast/commit/f664265f5a662fb4d1df7f3533b1e8d0e0277120 which was released under MIT license + +type broadcaster struct { + input chan *common.Envelope + reg chan chan<- *common.Envelope + unreg chan chan<- *common.Envelope + + outputs map[chan<- *common.Envelope]bool +} + +// The Broadcaster interface describes the main entry points to +// broadcasters. +type Broadcaster interface { + // Register a new channel to receive broadcasts + Register(chan<- *common.Envelope) + // Unregister a channel so that it no longer receives broadcasts. + Unregister(chan<- *common.Envelope) + // Shut this broadcaster down. + Close() error + // Submit a new object to all subscribers + Submit(*common.Envelope) +} + +func (b *broadcaster) broadcast(m *common.Envelope) { + for ch := range b.outputs { + ch <- m + } +} + +func (b *broadcaster) run() { + for { + select { + case m := <-b.input: + b.broadcast(m) + case ch, ok := <-b.reg: + if ok { + b.outputs[ch] = true + } else { + return + } + case ch := <-b.unreg: + delete(b.outputs, ch) + } + } +} + +func NewBroadcaster(buflen int) Broadcaster { + b := &broadcaster{ + input: make(chan *common.Envelope, buflen), + reg: make(chan chan<- *common.Envelope), + unreg: make(chan chan<- *common.Envelope), + outputs: make(map[chan<- *common.Envelope]bool), + } + + go b.run() + + return b +} + +func (b *broadcaster) Register(newch chan<- *common.Envelope) { + b.reg <- newch +} + +func (b *broadcaster) Unregister(newch chan<- *common.Envelope) { + b.unreg <- newch +} + +func (b *broadcaster) Close() error { + close(b.reg) + return nil +} + +func (b *broadcaster) Submit(m *common.Envelope) { + if b != nil { + b.input <- m + } +} diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index fecdb323..8529d0c6 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -36,25 +36,23 @@ const DefaultWakuTopic Topic = "/waku/2/default-waku/proto" type Message []byte -type Subscription struct { - C chan *common.Envelope - closed bool - mutex sync.Mutex - pubSubscription *wakurelay.Subscription - quit chan struct{} -} - type WakuNode struct { host host.Host pubsub *wakurelay.PubSub - store *store.WakuStore - topics map[Topic]*wakurelay.Topic - topicsMutex sync.Mutex + store *store.WakuStore + isStore bool - subscriptions []*Subscription + topics map[Topic]bool + topicsMutex sync.Mutex + wakuRelayTopics map[Topic]*wakurelay.Topic + + subscriptions map[Topic][]*Subscription subscriptionsMutex sync.Mutex + bcaster Broadcaster + relaySubs map[Topic]*wakurelay.Subscription + ctx context.Context cancel context.CancelFunc privKey crypto.PrivKey @@ -95,12 +93,16 @@ func New(ctx context.Context, privKey *ecdsa.PrivateKey, hostAddr []net.Addr, op } w := new(WakuNode) + w.bcaster = NewBroadcaster(1024) w.pubsub = nil w.host = host w.cancel = cancel w.privKey = nodeKey w.ctx = ctx - w.topics = make(map[Topic]*wakurelay.Topic) + w.topics = make(map[Topic]bool) + w.wakuRelayTopics = make(map[Topic]*wakurelay.Topic) + w.relaySubs = make(map[Topic]*wakurelay.Subscription) + w.subscriptions = make(map[Topic][]*Subscription) for _, addr := range w.ListenAddresses() { log.Info("Listening on ", addr) @@ -114,8 +116,10 @@ func (w *WakuNode) Stop() { defer w.subscriptionsMutex.Unlock() defer w.cancel() - for _, sub := range w.subscriptions { - sub.Unsubscribe() + for topic, _ := range w.topics { + for _, sub := range w.subscriptions[topic] { + sub.Unsubscribe() + } } w.subscriptions = nil @@ -159,12 +163,9 @@ func (w *WakuNode) MountRelay(opts ...wakurelay.Option) error { return nil } -func (w *WakuNode) MountStore(s store.MessageProvider) error { - sub, err := w.Subscribe(nil) - if err != nil { - return err - } - w.store = store.NewWakuStore(w.ctx, w.host, sub.C, s) +func (w *WakuNode) MountStore(isStore bool, s store.MessageProvider) error { + w.store = store.NewWakuStore(w.ctx, w.host, s) + w.isStore = isStore return nil } @@ -173,76 +174,86 @@ func (w *WakuNode) StartStore() error { return errors.New("WakuStore is not set") } + _, err := w.Subscribe(nil) + if err != nil { + return err + } + w.store.Start() return nil } -func (w *WakuNode) AddStorePeer(address string) error { +func (w *WakuNode) AddStorePeer(address string) (*peer.ID, error) { if w.store == nil { - return errors.New("WakuStore is not set") + return nil, errors.New("WakuStore is not set") } storePeer, err := ma.NewMultiaddr(address) if err != nil { - return err + return nil, err } // Extract the peer ID from the multiaddr. info, err := peer.AddrInfoFromP2pAddr(storePeer) if err != nil { - return err + return nil, err } - return w.store.AddPeer(info.ID, info.Addrs) + return &info.ID, w.store.AddPeer(info.ID, info.Addrs) } -func (w *WakuNode) Query(contentTopic string, asc bool, pageSize uint64) (*protocol.HistoryResponse, error) { +func (w *WakuNode) Query(contentTopics []string, startTime float64, endTime float64, opts ...store.HistoryRequestOption) (*protocol.HistoryResponse, error) { if w.store == nil { return nil, errors.New("WakuStore is not set") } query := new(protocol.HistoryQuery) - query.Topics = append(query.Topics, contentTopic) + query.Topics = contentTopics + query.StartTime = startTime + query.EndTime = endTime query.PagingInfo = new(protocol.PagingInfo) - if asc { - query.PagingInfo.Direction = protocol.PagingInfo_FORWARD - } else { - query.PagingInfo.Direction = protocol.PagingInfo_BACKWARD - } - query.PagingInfo.PageSize = pageSize - - result, err := w.store.Query(query) + result, err := w.store.Query(query, opts...) if err != nil { return nil, err } return result, nil } +func getTopic(topic *Topic) Topic { + var t Topic = DefaultWakuTopic + if topic != nil { + t = *topic + } + return t +} + func (node *WakuNode) Subscribe(topic *Topic) (*Subscription, error) { // Subscribes to a PubSub topic. // NOTE The data field SHOULD be decoded as a WakuMessage. - if node.pubsub == nil { return nil, errors.New("PubSub hasn't been set. Execute mountWakuRelay() or setPubSub() first") } - pubSubTopic, err := node.upsertTopic(topic) - if err != nil { - return nil, err - } - - sub, err := pubSubTopic.Subscribe() + t := getTopic(topic) + + sub, err := node.upsertSubscription(t) if err != nil { return nil, err } + // Create client subscription subscription := new(Subscription) subscription.closed = false - subscription.pubSubscription = sub - subscription.C = make(chan *common.Envelope) + subscription.C = make(chan *common.Envelope, 1024) // To avoid blocking subscription.quit = make(chan struct{}) - go func(ctx context.Context, sub *wakurelay.Subscription) { + node.subscriptionsMutex.Lock() + defer node.subscriptionsMutex.Unlock() + node.subscriptions[t] = append(node.subscriptions[t], subscription) + + node.bcaster.Register(subscription.C) + + go func() { nextMsgTicker := time.NewTicker(time.Millisecond * 10) defer nextMsgTicker.Stop() @@ -250,19 +261,19 @@ func (node *WakuNode) Subscribe(topic *Topic) (*Subscription, error) { select { case <-subscription.quit: subscription.mutex.Lock() - defer subscription.mutex.Unlock() + node.bcaster.Unregister(subscription.C) // Remove from broadcast list close(subscription.C) - subscription.closed = true - return + subscription.mutex.Unlock() case <-nextMsgTicker.C: - msg, err := sub.Next(ctx) + msg, err := sub.Next(node.ctx) if err != nil { subscription.mutex.Lock() - defer subscription.mutex.Unlock() - if !subscription.closed { - subscription.closed = true - close(subscription.quit) + node.topicsMutex.Lock() + for _, subscription := range node.subscriptions[t] { + subscription.Unsubscribe() } + node.topicsMutex.Unlock() + subscription.mutex.Unlock() return } @@ -273,56 +284,54 @@ func (node *WakuNode) Subscribe(topic *Topic) (*Subscription, error) { } envelope := common.NewEnvelope(wakuMessage, len(msg.Data), gcrypto.Keccak256(msg.Data)) - subscription.C <- envelope + + node.bcaster.Submit(envelope) } } - }(node.ctx, sub) - - node.subscriptionsMutex.Lock() - defer node.subscriptionsMutex.Unlock() - - node.subscriptions = append(node.subscriptions, subscription) + }() return subscription, nil } -func (subs *Subscription) Unsubscribe() { - // Unsubscribes a handler from a PubSub topic. - subs.mutex.Lock() - defer subs.mutex.Unlock() - if !subs.closed { - subs.closed = true - close(subs.quit) - } -} - -func (subs *Subscription) IsClosed() bool { - subs.mutex.Lock() - defer subs.mutex.Unlock() - return subs.closed -} - -func (node *WakuNode) upsertTopic(topic *Topic) (*wakurelay.Topic, error) { +func (node *WakuNode) upsertTopic(topic Topic) (*wakurelay.Topic, error) { defer node.topicsMutex.Unlock() node.topicsMutex.Lock() - var t Topic = DefaultWakuTopic - if topic != nil { - t = *topic - } - - pubSubTopic, ok := node.topics[t] + node.topics[topic] = true + pubSubTopic, ok := node.wakuRelayTopics[topic] if !ok { // Joins topic if node hasn't joined yet - newTopic, err := node.pubsub.Join(string(t)) + newTopic, err := node.pubsub.Join(string(topic)) if err != nil { return nil, err } - node.topics[t] = newTopic + node.wakuRelayTopics[topic] = newTopic pubSubTopic = newTopic } return pubSubTopic, nil } +func (node *WakuNode) upsertSubscription(topic Topic) (*wakurelay.Subscription, error) { + sub, ok := node.relaySubs[topic] + if !ok { + pubSubTopic, err := node.upsertTopic(topic) + if err != nil { + return nil, err + } + + sub, err = pubSubTopic.Subscribe() + if err != nil { + return nil, err + } + node.relaySubs[topic] = sub + } + + if node.store != nil && node.isStore { + node.bcaster.Register(node.store.MsgC) + } + + return sub, nil +} + func (node *WakuNode) Publish(message *protocol.WakuMessage, topic *Topic) ([]byte, error) { // Publish a `WakuMessage` to a PubSub topic. `WakuMessage` should contain a // `contentTopic` field for light node functionality. This field may be also @@ -336,7 +345,7 @@ func (node *WakuNode) Publish(message *protocol.WakuMessage, topic *Topic) ([]by return nil, errors.New("message can't be null") } - pubSubTopic, err := node.upsertTopic(topic) + pubSubTopic, err := node.upsertTopic(getTopic(topic)) if err != nil { return nil, err