use broadcaster for subscriptions and minor code reorg

This commit is contained in:
Richard Ramos 2021-04-14 22:19:31 -04:00
parent e1f10d2099
commit ed9ea40668
No known key found for this signature in database
GPG Key ID: 80D4B01265FDFE8F
3 changed files with 180 additions and 88 deletions

View File

@ -98,7 +98,7 @@ var rootCmd = &cobra.Command{
dbStore, err := persistence.NewDBStore(persistence.WithDB(db))
checkError(err, "DBStore")
err = wakuNode.MountStore(dbStore)
err = wakuNode.MountStore(true, dbStore)
checkError(err, "Error mounting store")
wakuNode.StartStore()

83
waku/v2/node/broadcast.go Normal file
View File

@ -0,0 +1,83 @@
package node
import (
"github.com/status-im/go-waku/waku/common"
)
// Adapted from https://github.com/dustin/go-broadcast/commit/f664265f5a662fb4d1df7f3533b1e8d0e0277120 which was released under MIT license
type broadcaster struct {
input chan *common.Envelope
reg chan chan<- *common.Envelope
unreg chan chan<- *common.Envelope
outputs map[chan<- *common.Envelope]bool
}
// The Broadcaster interface describes the main entry points to
// broadcasters.
type Broadcaster interface {
// Register a new channel to receive broadcasts
Register(chan<- *common.Envelope)
// Unregister a channel so that it no longer receives broadcasts.
Unregister(chan<- *common.Envelope)
// Shut this broadcaster down.
Close() error
// Submit a new object to all subscribers
Submit(*common.Envelope)
}
func (b *broadcaster) broadcast(m *common.Envelope) {
for ch := range b.outputs {
ch <- m
}
}
func (b *broadcaster) run() {
for {
select {
case m := <-b.input:
b.broadcast(m)
case ch, ok := <-b.reg:
if ok {
b.outputs[ch] = true
} else {
return
}
case ch := <-b.unreg:
delete(b.outputs, ch)
}
}
}
func NewBroadcaster(buflen int) Broadcaster {
b := &broadcaster{
input: make(chan *common.Envelope, buflen),
reg: make(chan chan<- *common.Envelope),
unreg: make(chan chan<- *common.Envelope),
outputs: make(map[chan<- *common.Envelope]bool),
}
go b.run()
return b
}
func (b *broadcaster) Register(newch chan<- *common.Envelope) {
b.reg <- newch
}
func (b *broadcaster) Unregister(newch chan<- *common.Envelope) {
b.unreg <- newch
}
func (b *broadcaster) Close() error {
close(b.reg)
return nil
}
func (b *broadcaster) Submit(m *common.Envelope) {
if b != nil {
b.input <- m
}
}

View File

@ -36,25 +36,23 @@ const DefaultWakuTopic Topic = "/waku/2/default-waku/proto"
type Message []byte
type Subscription struct {
C chan *common.Envelope
closed bool
mutex sync.Mutex
pubSubscription *wakurelay.Subscription
quit chan struct{}
}
type WakuNode struct {
host host.Host
pubsub *wakurelay.PubSub
store *store.WakuStore
isStore bool
topics map[Topic]*wakurelay.Topic
topics map[Topic]bool
topicsMutex sync.Mutex
wakuRelayTopics map[Topic]*wakurelay.Topic
subscriptions []*Subscription
subscriptions map[Topic][]*Subscription
subscriptionsMutex sync.Mutex
bcaster Broadcaster
relaySubs map[Topic]*wakurelay.Subscription
ctx context.Context
cancel context.CancelFunc
privKey crypto.PrivKey
@ -95,12 +93,16 @@ func New(ctx context.Context, privKey *ecdsa.PrivateKey, hostAddr []net.Addr, op
}
w := new(WakuNode)
w.bcaster = NewBroadcaster(1024)
w.pubsub = nil
w.host = host
w.cancel = cancel
w.privKey = nodeKey
w.ctx = ctx
w.topics = make(map[Topic]*wakurelay.Topic)
w.topics = make(map[Topic]bool)
w.wakuRelayTopics = make(map[Topic]*wakurelay.Topic)
w.relaySubs = make(map[Topic]*wakurelay.Subscription)
w.subscriptions = make(map[Topic][]*Subscription)
for _, addr := range w.ListenAddresses() {
log.Info("Listening on ", addr)
@ -114,9 +116,11 @@ func (w *WakuNode) Stop() {
defer w.subscriptionsMutex.Unlock()
defer w.cancel()
for _, sub := range w.subscriptions {
for topic, _ := range w.topics {
for _, sub := range w.subscriptions[topic] {
sub.Unsubscribe()
}
}
w.subscriptions = nil
}
@ -159,12 +163,9 @@ func (w *WakuNode) MountRelay(opts ...wakurelay.Option) error {
return nil
}
func (w *WakuNode) MountStore(s store.MessageProvider) error {
sub, err := w.Subscribe(nil)
if err != nil {
return err
}
w.store = store.NewWakuStore(w.ctx, w.host, sub.C, s)
func (w *WakuNode) MountStore(isStore bool, s store.MessageProvider) error {
w.store = store.NewWakuStore(w.ctx, w.host, s)
w.isStore = isStore
return nil
}
@ -173,76 +174,86 @@ func (w *WakuNode) StartStore() error {
return errors.New("WakuStore is not set")
}
_, err := w.Subscribe(nil)
if err != nil {
return err
}
w.store.Start()
return nil
}
func (w *WakuNode) AddStorePeer(address string) error {
func (w *WakuNode) AddStorePeer(address string) (*peer.ID, error) {
if w.store == nil {
return errors.New("WakuStore is not set")
return nil, errors.New("WakuStore is not set")
}
storePeer, err := ma.NewMultiaddr(address)
if err != nil {
return err
return nil, err
}
// Extract the peer ID from the multiaddr.
info, err := peer.AddrInfoFromP2pAddr(storePeer)
if err != nil {
return err
return nil, err
}
return w.store.AddPeer(info.ID, info.Addrs)
return &info.ID, w.store.AddPeer(info.ID, info.Addrs)
}
func (w *WakuNode) Query(contentTopic string, asc bool, pageSize uint64) (*protocol.HistoryResponse, error) {
func (w *WakuNode) Query(contentTopics []string, startTime float64, endTime float64, opts ...store.HistoryRequestOption) (*protocol.HistoryResponse, error) {
if w.store == nil {
return nil, errors.New("WakuStore is not set")
}
query := new(protocol.HistoryQuery)
query.Topics = append(query.Topics, contentTopic)
query.Topics = contentTopics
query.StartTime = startTime
query.EndTime = endTime
query.PagingInfo = new(protocol.PagingInfo)
if asc {
query.PagingInfo.Direction = protocol.PagingInfo_FORWARD
} else {
query.PagingInfo.Direction = protocol.PagingInfo_BACKWARD
}
query.PagingInfo.PageSize = pageSize
result, err := w.store.Query(query)
result, err := w.store.Query(query, opts...)
if err != nil {
return nil, err
}
return result, nil
}
func getTopic(topic *Topic) Topic {
var t Topic = DefaultWakuTopic
if topic != nil {
t = *topic
}
return t
}
func (node *WakuNode) Subscribe(topic *Topic) (*Subscription, error) {
// Subscribes to a PubSub topic.
// NOTE The data field SHOULD be decoded as a WakuMessage.
if node.pubsub == nil {
return nil, errors.New("PubSub hasn't been set. Execute mountWakuRelay() or setPubSub() first")
}
pubSubTopic, err := node.upsertTopic(topic)
if err != nil {
return nil, err
}
sub, err := pubSubTopic.Subscribe()
t := getTopic(topic)
sub, err := node.upsertSubscription(t)
if err != nil {
return nil, err
}
// Create client subscription
subscription := new(Subscription)
subscription.closed = false
subscription.pubSubscription = sub
subscription.C = make(chan *common.Envelope)
subscription.C = make(chan *common.Envelope, 1024) // To avoid blocking
subscription.quit = make(chan struct{})
go func(ctx context.Context, sub *wakurelay.Subscription) {
node.subscriptionsMutex.Lock()
defer node.subscriptionsMutex.Unlock()
node.subscriptions[t] = append(node.subscriptions[t], subscription)
node.bcaster.Register(subscription.C)
go func() {
nextMsgTicker := time.NewTicker(time.Millisecond * 10)
defer nextMsgTicker.Stop()
@ -250,19 +261,19 @@ func (node *WakuNode) Subscribe(topic *Topic) (*Subscription, error) {
select {
case <-subscription.quit:
subscription.mutex.Lock()
defer subscription.mutex.Unlock()
node.bcaster.Unregister(subscription.C) // Remove from broadcast list
close(subscription.C)
subscription.closed = true
return
subscription.mutex.Unlock()
case <-nextMsgTicker.C:
msg, err := sub.Next(ctx)
msg, err := sub.Next(node.ctx)
if err != nil {
subscription.mutex.Lock()
defer subscription.mutex.Unlock()
if !subscription.closed {
subscription.closed = true
close(subscription.quit)
node.topicsMutex.Lock()
for _, subscription := range node.subscriptions[t] {
subscription.Unsubscribe()
}
node.topicsMutex.Unlock()
subscription.mutex.Unlock()
return
}
@ -273,56 +284,54 @@ func (node *WakuNode) Subscribe(topic *Topic) (*Subscription, error) {
}
envelope := common.NewEnvelope(wakuMessage, len(msg.Data), gcrypto.Keccak256(msg.Data))
subscription.C <- envelope
node.bcaster.Submit(envelope)
}
}
}(node.ctx, sub)
node.subscriptionsMutex.Lock()
defer node.subscriptionsMutex.Unlock()
node.subscriptions = append(node.subscriptions, subscription)
}()
return subscription, nil
}
func (subs *Subscription) Unsubscribe() {
// Unsubscribes a handler from a PubSub topic.
subs.mutex.Lock()
defer subs.mutex.Unlock()
if !subs.closed {
subs.closed = true
close(subs.quit)
}
}
func (subs *Subscription) IsClosed() bool {
subs.mutex.Lock()
defer subs.mutex.Unlock()
return subs.closed
}
func (node *WakuNode) upsertTopic(topic *Topic) (*wakurelay.Topic, error) {
func (node *WakuNode) upsertTopic(topic Topic) (*wakurelay.Topic, error) {
defer node.topicsMutex.Unlock()
node.topicsMutex.Lock()
var t Topic = DefaultWakuTopic
if topic != nil {
t = *topic
}
pubSubTopic, ok := node.topics[t]
node.topics[topic] = true
pubSubTopic, ok := node.wakuRelayTopics[topic]
if !ok { // Joins topic if node hasn't joined yet
newTopic, err := node.pubsub.Join(string(t))
newTopic, err := node.pubsub.Join(string(topic))
if err != nil {
return nil, err
}
node.topics[t] = newTopic
node.wakuRelayTopics[topic] = newTopic
pubSubTopic = newTopic
}
return pubSubTopic, nil
}
func (node *WakuNode) upsertSubscription(topic Topic) (*wakurelay.Subscription, error) {
sub, ok := node.relaySubs[topic]
if !ok {
pubSubTopic, err := node.upsertTopic(topic)
if err != nil {
return nil, err
}
sub, err = pubSubTopic.Subscribe()
if err != nil {
return nil, err
}
node.relaySubs[topic] = sub
}
if node.store != nil && node.isStore {
node.bcaster.Register(node.store.MsgC)
}
return sub, nil
}
func (node *WakuNode) Publish(message *protocol.WakuMessage, topic *Topic) ([]byte, error) {
// Publish a `WakuMessage` to a PubSub topic. `WakuMessage` should contain a
// `contentTopic` field for light node functionality. This field may be also
@ -336,7 +345,7 @@ func (node *WakuNode) Publish(message *protocol.WakuMessage, topic *Topic) ([]by
return nil, errors.New("message can't be null")
}
pubSubTopic, err := node.upsertTopic(topic)
pubSubTopic, err := node.upsertTopic(getTopic(topic))
if err != nil {
return nil, err