From 997bc4f2d86d9036c2c8417d2e6dfa8e74c64d92 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Wed, 21 Apr 2021 20:09:37 -0400 Subject: [PATCH] refactor: organize code --- waku/persistence/store.go | 12 +-- waku/v2/node/broadcast.go | 32 +++--- waku/v2/node/subscription.go | 4 +- waku/v2/node/waku_payload.go | 4 +- waku/v2/node/wakunode2.go | 24 ++--- waku/v2/node/wakuoptions.go | 9 +- waku/{common => v2/protocol}/envelope.go | 10 +- waku/v2/protocol/message_notifier.go | 72 -------------- waku/v2/protocol/{ => pb}/waku_message.pb.go | 2 +- waku/v2/protocol/{ => pb}/waku_message.proto | 2 +- waku/v2/protocol/{ => pb}/waku_store.pb.go | 2 +- waku/v2/protocol/{ => pb}/waku_store.proto | 2 +- .../{waku_store => store}/waku_store.go | 97 +++++++------------ waku/v2/protocol/waku_filter/README.md | 3 - waku/v2/protocol/waku_store/README.md | 3 - 15 files changed, 86 insertions(+), 192 deletions(-) rename waku/{common => v2/protocol}/envelope.go (50%) delete mode 100644 waku/v2/protocol/message_notifier.go rename waku/v2/protocol/{ => pb}/waku_message.pb.go (99%) rename waku/v2/protocol/{ => pb}/waku_message.proto (89%) rename waku/v2/protocol/{ => pb}/waku_store.pb.go (99%) rename waku/v2/protocol/{ => pb}/waku_store.proto (97%) rename waku/v2/protocol/{waku_store => store}/waku_store.go (80%) delete mode 100644 waku/v2/protocol/waku_filter/README.md delete mode 100644 waku/v2/protocol/waku_store/README.md diff --git a/waku/persistence/store.go b/waku/persistence/store.go index 19c06ce..9b72216 100644 --- a/waku/persistence/store.go +++ b/waku/persistence/store.go @@ -4,8 +4,8 @@ import ( "database/sql" "log" - "github.com/status-im/go-waku/waku/v2/protocol" - store "github.com/status-im/go-waku/waku/v2/protocol/waku_store" + "github.com/status-im/go-waku/waku/v2/protocol/pb" + "github.com/status-im/go-waku/waku/v2/protocol/store" ) type DBStore struct { @@ -68,7 +68,7 @@ func (d *DBStore) Stop() { d.db.Close() } -func (d *DBStore) Put(cursor *protocol.Index, message *protocol.WakuMessage) error { +func (d *DBStore) Put(cursor *pb.Index, message *pb.WakuMessage) error { stmt, err := d.db.Prepare("INSERT INTO messages (id, timestamp, contentTopic, payload, version) VALUES (?, ?, ?, ?, ?)") if err != nil { return err @@ -81,13 +81,13 @@ func (d *DBStore) Put(cursor *protocol.Index, message *protocol.WakuMessage) err return nil } -func (d *DBStore) GetAll() ([]*protocol.WakuMessage, error) { +func (d *DBStore) GetAll() ([]*pb.WakuMessage, error) { rows, err := d.db.Query("SELECT timestamp, contentTopic, payload, version FROM messages ORDER BY timestamp ASC") if err != nil { return nil, err } - var result []*protocol.WakuMessage + var result []*pb.WakuMessage defer rows.Close() @@ -102,7 +102,7 @@ func (d *DBStore) GetAll() ([]*protocol.WakuMessage, error) { log.Fatal(err) } - msg := new(protocol.WakuMessage) + msg := new(pb.WakuMessage) msg.ContentTopic = contentTopic msg.Payload = payload msg.Timestamp = float64(timestamp) diff --git a/waku/v2/node/broadcast.go b/waku/v2/node/broadcast.go index 23f55b8..18a441d 100644 --- a/waku/v2/node/broadcast.go +++ b/waku/v2/node/broadcast.go @@ -1,33 +1,33 @@ package node import ( - "github.com/status-im/go-waku/waku/common" + "github.com/status-im/go-waku/waku/v2/protocol" ) // Adapted from https://github.com/dustin/go-broadcast/commit/f664265f5a662fb4d1df7f3533b1e8d0e0277120 which was released under MIT license type broadcaster struct { - input chan *common.Envelope - reg chan chan<- *common.Envelope - unreg chan chan<- *common.Envelope + input chan *protocol.Envelope + reg chan chan<- *protocol.Envelope + unreg chan chan<- *protocol.Envelope - outputs map[chan<- *common.Envelope]bool + outputs map[chan<- *protocol.Envelope]bool } // The Broadcaster interface describes the main entry points to // broadcasters. type Broadcaster interface { // Register a new channel to receive broadcasts - Register(chan<- *common.Envelope) + Register(chan<- *protocol.Envelope) // Unregister a channel so that it no longer receives broadcasts. - Unregister(chan<- *common.Envelope) + Unregister(chan<- *protocol.Envelope) // Shut this broadcaster down. Close() error // Submit a new object to all subscribers - Submit(*common.Envelope) + Submit(*protocol.Envelope) } -func (b *broadcaster) broadcast(m *common.Envelope) { +func (b *broadcaster) broadcast(m *protocol.Envelope) { for ch := range b.outputs { ch <- m } @@ -52,10 +52,10 @@ func (b *broadcaster) run() { func NewBroadcaster(buflen int) Broadcaster { b := &broadcaster{ - input: make(chan *common.Envelope, buflen), - reg: make(chan chan<- *common.Envelope), - unreg: make(chan chan<- *common.Envelope), - outputs: make(map[chan<- *common.Envelope]bool), + input: make(chan *protocol.Envelope, buflen), + reg: make(chan chan<- *protocol.Envelope), + unreg: make(chan chan<- *protocol.Envelope), + outputs: make(map[chan<- *protocol.Envelope]bool), } go b.run() @@ -63,11 +63,11 @@ func NewBroadcaster(buflen int) Broadcaster { return b } -func (b *broadcaster) Register(newch chan<- *common.Envelope) { +func (b *broadcaster) Register(newch chan<- *protocol.Envelope) { b.reg <- newch } -func (b *broadcaster) Unregister(newch chan<- *common.Envelope) { +func (b *broadcaster) Unregister(newch chan<- *protocol.Envelope) { b.unreg <- newch } @@ -76,7 +76,7 @@ func (b *broadcaster) Close() error { return nil } -func (b *broadcaster) Submit(m *common.Envelope) { +func (b *broadcaster) Submit(m *protocol.Envelope) { if b != nil { b.input <- m } diff --git a/waku/v2/node/subscription.go b/waku/v2/node/subscription.go index 1afe77b..1c144e0 100644 --- a/waku/v2/node/subscription.go +++ b/waku/v2/node/subscription.go @@ -3,11 +3,11 @@ package node import ( "sync" - "github.com/status-im/go-waku/waku/common" + "github.com/status-im/go-waku/waku/v2/protocol" ) type Subscription struct { - C chan *common.Envelope + C chan *protocol.Envelope closed bool mutex sync.Mutex quit chan struct{} diff --git a/waku/v2/node/waku_payload.go b/waku/v2/node/waku_payload.go index 776b0de..49253d2 100644 --- a/waku/v2/node/waku_payload.go +++ b/waku/v2/node/waku_payload.go @@ -13,7 +13,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto/ecies" - "github.com/status-im/go-waku/waku/v2/protocol" + "github.com/status-im/go-waku/waku/v2/protocol/pb" ) type KeyKind string @@ -86,7 +86,7 @@ func (payload Payload) Encode(version uint32) ([]byte, error) { return nil, errors.New("Unsupported WakuMessage version") } -func DecodePayload(message *protocol.WakuMessage, keyInfo *KeyInfo) (*DecodedPayload, error) { +func DecodePayload(message *pb.WakuMessage, keyInfo *KeyInfo) (*DecodedPayload, error) { switch message.Version { case uint32(0): return &DecodedPayload{Data: message.Payload}, nil diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 582f367..fc524fe 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -14,9 +14,10 @@ import ( "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" ma "github.com/multiformats/go-multiaddr" - common "github.com/status-im/go-waku/waku/common" + "github.com/status-im/go-waku/waku/v2/protocol" - store "github.com/status-im/go-waku/waku/v2/protocol/waku_store" + "github.com/status-im/go-waku/waku/v2/protocol/pb" + "github.com/status-im/go-waku/waku/v2/protocol/store" wakurelay "github.com/status-im/go-wakurelay-pubsub" ) @@ -56,7 +57,6 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) { ctx, cancel := context.WithCancel(ctx) _ = cancel - params.ctx = ctx params.libP2POpts = DefaultLibP2POptions for _, opt := range opts { @@ -195,21 +195,21 @@ func (w *WakuNode) AddStorePeer(address string) (*peer.ID, error) { return &info.ID, w.opts.store.AddPeer(info.ID, info.Addrs) } -func (w *WakuNode) Query(contentTopics []string, startTime float64, endTime float64, opts ...store.HistoryRequestOption) (*protocol.HistoryResponse, error) { +func (w *WakuNode) Query(ctx context.Context, contentTopics []string, startTime float64, endTime float64, opts ...store.HistoryRequestOption) (*pb.HistoryResponse, error) { if w.opts.store == nil { return nil, errors.New("WakuStore is not set") } - query := new(protocol.HistoryQuery) + query := new(pb.HistoryQuery) for _, ct := range contentTopics { - query.ContentFilters = append(query.ContentFilters, &protocol.ContentFilter{ContentTopic: ct}) + query.ContentFilters = append(query.ContentFilters, &pb.ContentFilter{ContentTopic: ct}) } query.StartTime = startTime query.EndTime = endTime - query.PagingInfo = new(protocol.PagingInfo) - result, err := w.opts.store.Query(query, opts...) + query.PagingInfo = new(pb.PagingInfo) + result, err := w.opts.store.Query(ctx, query, opts...) if err != nil { return nil, err } @@ -241,7 +241,7 @@ func (node *WakuNode) Subscribe(topic *Topic) (*Subscription, error) { // Create client subscription subscription := new(Subscription) subscription.closed = false - subscription.C = make(chan *common.Envelope, 1024) // To avoid blocking + subscription.C = make(chan *protocol.Envelope, 1024) // To avoid blocking subscription.quit = make(chan struct{}) node.subscriptionsMutex.Lock() @@ -274,13 +274,13 @@ func (node *WakuNode) Subscribe(topic *Topic) (*Subscription, error) { return } - wakuMessage := &protocol.WakuMessage{} + wakuMessage := &pb.WakuMessage{} if err := proto.Unmarshal(msg.Data, wakuMessage); err != nil { log.Error("could not decode message", err) return } - envelope := common.NewEnvelope(wakuMessage, len(msg.Data), gcrypto.Keccak256(msg.Data)) + envelope := protocol.NewEnvelope(wakuMessage, len(msg.Data), gcrypto.Keccak256(msg.Data)) node.bcaster.Submit(envelope) } @@ -332,7 +332,7 @@ func (node *WakuNode) upsertSubscription(topic Topic) (*wakurelay.Subscription, return sub, nil } -func (node *WakuNode) Publish(message *protocol.WakuMessage, topic *Topic) ([]byte, error) { +func (node *WakuNode) Publish(message *pb.WakuMessage, topic *Topic) ([]byte, error) { // Publish a `WakuMessage` to a PubSub topic. `WakuMessage` should contain a // `contentTopic` field for light node functionality. This field may be also // be omitted. diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index 25a4515..b3ba45e 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -1,7 +1,6 @@ package node import ( - "context" "crypto/ecdsa" "net" @@ -10,7 +9,7 @@ import ( "github.com/libp2p/go-libp2p-core/crypto" ma "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr-net" - store "github.com/status-im/go-waku/waku/v2/protocol/waku_store" + "github.com/status-im/go-waku/waku/v2/protocol/store" wakurelay "github.com/status-im/go-wakurelay-pubsub" ) @@ -25,8 +24,6 @@ type WakuNodeParameters struct { enableStore bool storeMsgs bool store *store.WakuStore - - ctx context.Context } type WakuNodeOption func(*WakuNodeParameters) error @@ -75,7 +72,7 @@ func WithWakuStore(shouldStoreMessages bool) WakuNodeOption { return func(params *WakuNodeParameters) error { params.enableStore = true params.storeMsgs = shouldStoreMessages - params.store = store.NewWakuStore(params.ctx, shouldStoreMessages, nil) + params.store = store.NewWakuStore(shouldStoreMessages, nil) return nil } } @@ -85,7 +82,7 @@ func WithMessageProvider(s store.MessageProvider) WakuNodeOption { if params.store != nil { params.store.SetMsgProvider(s) } else { - params.store = store.NewWakuStore(params.ctx, true, s) + params.store = store.NewWakuStore(true, s) } return nil } diff --git a/waku/common/envelope.go b/waku/v2/protocol/envelope.go similarity index 50% rename from waku/common/envelope.go rename to waku/v2/protocol/envelope.go index fbc50db..949941c 100644 --- a/waku/common/envelope.go +++ b/waku/v2/protocol/envelope.go @@ -1,14 +1,14 @@ -package common +package protocol -import "github.com/status-im/go-waku/waku/v2/protocol" +import "github.com/status-im/go-waku/waku/v2/protocol/pb" type Envelope struct { - msg *protocol.WakuMessage + msg *pb.WakuMessage size int hash []byte } -func NewEnvelope(msg *protocol.WakuMessage, size int, hash []byte) *Envelope { +func NewEnvelope(msg *pb.WakuMessage, size int, hash []byte) *Envelope { return &Envelope{ msg: msg, size: size, @@ -16,7 +16,7 @@ func NewEnvelope(msg *protocol.WakuMessage, size int, hash []byte) *Envelope { } } -func (e *Envelope) Message() *protocol.WakuMessage { +func (e *Envelope) Message() *pb.WakuMessage { return e.msg } diff --git a/waku/v2/protocol/message_notifier.go b/waku/v2/protocol/message_notifier.go deleted file mode 100644 index 834a03d..0000000 --- a/waku/v2/protocol/message_notifier.go +++ /dev/null @@ -1,72 +0,0 @@ -// The Message Notification system is a method to notify various protocols -// running on a node when a new message was received. -// -// Protocols can subscribe to messages of specific topics, then when one is received -// The notification handler function will be called. - -package protocol - -import ( - "sync" -) - -type MessageNotificationHandler func(topic string, msg *WakuMessage) - -type MessageNotificationSubscriptionIdentifier string - -type MessageNotificationSubscription struct { - topics []string // @TODO TOPIC (?) - handler MessageNotificationHandler -} - -type MessageNotificationSubscriptions map[string]MessageNotificationSubscription - -func (subscriptions MessageNotificationSubscriptions) subscribe(name string, subscription MessageNotificationSubscription) { - subscriptions[name] = subscription -} - -func Init(topics []string, handler MessageNotificationHandler) MessageNotificationSubscription { - result := MessageNotificationSubscription{} - result.topics = topics - result.handler = handler - return result -} - -func containsMatch(lhs []string, rhs []string) bool { - for _, l := range lhs { - for _, r := range rhs { - if l == r { - return true - } - } - } - return false -} - -func (subscriptions MessageNotificationSubscriptions) notify(topic string, msg *WakuMessage) { - var wg sync.WaitGroup - - for _, subscription := range subscriptions { - // @TODO WILL NEED TO CHECK SUBTOPICS IN FUTURE FOR WAKU TOPICS NOT LIBP2P ONES - - found := false - for _, subscriptionTopic := range subscription.topics { - if subscriptionTopic == topic { - found = true - break - } - } - - if !found { - continue - } - - wg.Add(1) - go func(subs MessageNotificationSubscription) { - subs.handler(topic, msg) - wg.Done() - }(subscription) - } - - wg.Wait() -} diff --git a/waku/v2/protocol/waku_message.pb.go b/waku/v2/protocol/pb/waku_message.pb.go similarity index 99% rename from waku/v2/protocol/waku_message.pb.go rename to waku/v2/protocol/pb/waku_message.pb.go index 47b47c2..0c0d680 100644 --- a/waku/v2/protocol/waku_message.pb.go +++ b/waku/v2/protocol/pb/waku_message.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-gogo. DO NOT EDIT. // source: waku_message.proto -package protocol +package pb import ( encoding_binary "encoding/binary" diff --git a/waku/v2/protocol/waku_message.proto b/waku/v2/protocol/pb/waku_message.proto similarity index 89% rename from waku/v2/protocol/waku_message.proto rename to waku/v2/protocol/pb/waku_message.proto index 3f92496..8b689cb 100644 --- a/waku/v2/protocol/waku_message.proto +++ b/waku/v2/protocol/pb/waku_message.proto @@ -1,6 +1,6 @@ syntax = "proto3"; -package protocol; +package pb; message WakuMessage { bytes payload = 1; diff --git a/waku/v2/protocol/waku_store.pb.go b/waku/v2/protocol/pb/waku_store.pb.go similarity index 99% rename from waku/v2/protocol/waku_store.pb.go rename to waku/v2/protocol/pb/waku_store.pb.go index f1303ae..3022070 100644 --- a/waku/v2/protocol/waku_store.pb.go +++ b/waku/v2/protocol/pb/waku_store.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-gogo. DO NOT EDIT. // source: waku_store.proto -package protocol +package pb import ( encoding_binary "encoding/binary" diff --git a/waku/v2/protocol/waku_store.proto b/waku/v2/protocol/pb/waku_store.proto similarity index 97% rename from waku/v2/protocol/waku_store.proto rename to waku/v2/protocol/pb/waku_store.proto index 19b367e..41c602c 100644 --- a/waku/v2/protocol/waku_store.proto +++ b/waku/v2/protocol/pb/waku_store.proto @@ -1,6 +1,6 @@ syntax = "proto3"; -package protocol; +package pb; import "waku_message.proto"; diff --git a/waku/v2/protocol/waku_store/waku_store.go b/waku/v2/protocol/store/waku_store.go similarity index 80% rename from waku/v2/protocol/waku_store/waku_store.go rename to waku/v2/protocol/store/waku_store.go index 29c985c..f3995dc 100644 --- a/waku/v2/protocol/waku_store/waku_store.go +++ b/waku/v2/protocol/store/waku_store.go @@ -22,8 +22,8 @@ import ( "github.com/libp2p/go-msgio/protoio" ma "github.com/multiformats/go-multiaddr" - "github.com/status-im/go-waku/waku/common" "github.com/status-im/go-waku/waku/v2/protocol" + "github.com/status-im/go-waku/waku/v2/protocol/pb" ) var log = logging.Logger("wakustore") @@ -50,7 +50,7 @@ func minOf(vars ...int) int { return min } -func paginateWithIndex(list []IndexedWakuMessage, pinfo *protocol.PagingInfo) (resMessages []IndexedWakuMessage, resPagingInfo *protocol.PagingInfo) { +func paginateWithIndex(list []IndexedWakuMessage, pinfo *pb.PagingInfo) (resMessages []IndexedWakuMessage, resPagingInfo *pb.PagingInfo) { // takes list, and performs paging based on pinfo // returns the page i.e, a sequence of IndexedWakuMessage and the new paging info to be used for the next paging request cursor := pinfo.Cursor @@ -62,7 +62,7 @@ func paginateWithIndex(list []IndexedWakuMessage, pinfo *protocol.PagingInfo) (r } if len(list) == 0 { // no pagination is needed for an empty list - return list, &protocol.PagingInfo{PageSize: 0, Cursor: pinfo.Cursor, Direction: pinfo.Direction} + return list, &pb.PagingInfo{PageSize: 0, Cursor: pinfo.Cursor, Direction: pinfo.Direction} } msgList := make([]IndexedWakuMessage, len(list)) @@ -76,22 +76,22 @@ func paginateWithIndex(list []IndexedWakuMessage, pinfo *protocol.PagingInfo) (r if cursor == nil { initQuery = true // an empty cursor means it is an initial query switch dir { - case protocol.PagingInfo_FORWARD: + case pb.PagingInfo_FORWARD: cursor = list[0].index // perform paging from the begining of the list - case protocol.PagingInfo_BACKWARD: + case pb.PagingInfo_BACKWARD: cursor = list[len(list)-1].index // perform paging from the end of the list } } foundIndex := findIndex(msgList, cursor) if foundIndex == -1 { // the cursor is not valid - return nil, &protocol.PagingInfo{PageSize: 0, Cursor: pinfo.Cursor, Direction: pinfo.Direction} + return nil, &pb.PagingInfo{PageSize: 0, Cursor: pinfo.Cursor, Direction: pinfo.Direction} } var retrievedPageSize, s, e int - var newCursor *protocol.Index // to be returned as part of the new paging info + var newCursor *pb.Index // to be returned as part of the new paging info switch dir { - case protocol.PagingInfo_FORWARD: // forward pagination + case pb.PagingInfo_FORWARD: // forward pagination remainingMessages := len(msgList) - foundIndex - 1 if initQuery { remainingMessages = remainingMessages + 1 @@ -102,7 +102,7 @@ func paginateWithIndex(list []IndexedWakuMessage, pinfo *protocol.PagingInfo) (r s = foundIndex + 1 // non inclusive e = foundIndex + retrievedPageSize newCursor = msgList[e].index // the new cursor points to the end of the page - case protocol.PagingInfo_BACKWARD: // backward pagination + case pb.PagingInfo_BACKWARD: // backward pagination remainingMessages := foundIndex if initQuery { remainingMessages = remainingMessages + 1 @@ -119,12 +119,12 @@ func paginateWithIndex(list []IndexedWakuMessage, pinfo *protocol.PagingInfo) (r for i := s; i <= e; i++ { resMessages = append(resMessages, msgList[i]) } - resPagingInfo = &protocol.PagingInfo{PageSize: uint64(retrievedPageSize), Cursor: newCursor, Direction: pinfo.Direction} + resPagingInfo = &pb.PagingInfo{PageSize: uint64(retrievedPageSize), Cursor: newCursor, Direction: pinfo.Direction} return } -func paginateWithoutIndex(list []IndexedWakuMessage, pinfo *protocol.PagingInfo) (resMessages []*protocol.WakuMessage, resPinfo *protocol.PagingInfo) { +func paginateWithoutIndex(list []IndexedWakuMessage, pinfo *pb.PagingInfo) (resMessages []*pb.WakuMessage, resPinfo *pb.PagingInfo) { // takes list, and performs paging based on pinfo // returns the page i.e, a sequence of WakuMessage and the new paging info to be used for the next paging request indexedData, updatedPagingInfo := paginateWithIndex(list, pinfo) @@ -135,8 +135,8 @@ func paginateWithoutIndex(list []IndexedWakuMessage, pinfo *protocol.PagingInfo) return } -func (w *WakuStore) FindMessages(query *protocol.HistoryQuery) *protocol.HistoryResponse { - result := new(protocol.HistoryResponse) +func (w *WakuStore) FindMessages(query *pb.HistoryQuery) *pb.HistoryResponse { + result := new(pb.HistoryResponse) // data holds IndexedWakuMessage whose topics match the query var data []IndexedWakuMessage for _, indexedMsg := range w.messages { @@ -161,32 +161,30 @@ func (w *WakuStore) FindMessages(query *protocol.HistoryQuery) *protocol.History } type MessageProvider interface { - GetAll() ([]*protocol.WakuMessage, error) - Put(cursor *protocol.Index, message *protocol.WakuMessage) error + GetAll() ([]*pb.WakuMessage, error) + Put(cursor *pb.Index, message *pb.WakuMessage) error Stop() } type IndexedWakuMessage struct { - msg *protocol.WakuMessage - index *protocol.Index + msg *pb.WakuMessage + index *pb.Index } type WakuStore struct { - MsgC chan *common.Envelope + MsgC chan *protocol.Envelope messages []IndexedWakuMessage messagesMutex sync.Mutex storeMsgs bool msgProvider MessageProvider h host.Host - ctx context.Context } -func NewWakuStore(ctx context.Context, shouldStoreMessages bool, p MessageProvider) *WakuStore { +func NewWakuStore(shouldStoreMessages bool, p MessageProvider) *WakuStore { wakuStore := new(WakuStore) - wakuStore.MsgC = make(chan *common.Envelope) + wakuStore.MsgC = make(chan *protocol.Envelope) wakuStore.msgProvider = p - wakuStore.ctx = ctx wakuStore.storeMsgs = shouldStoreMessages return wakuStore @@ -258,7 +256,7 @@ func (store *WakuStore) storeIncomingMessages() { func (store *WakuStore) onRequest(s network.Stream) { defer s.Close() - historyRPCRequest := &protocol.HistoryRPC{} + historyRPCRequest := &pb.HistoryRPC{} writer := protoio.NewDelimitedWriter(s) reader := protoio.NewDelimitedReader(s, 64*1024) @@ -271,7 +269,7 @@ func (store *WakuStore) onRequest(s network.Stream) { log.Info(fmt.Sprintf("%s: Received query from %s", s.Conn().LocalPeer(), s.Conn().RemotePeer())) - historyResponseRPC := &protocol.HistoryRPC{} + historyResponseRPC := &pb.HistoryRPC{} historyResponseRPC.RequestId = historyRPCRequest.RequestId historyResponseRPC.Response = store.FindMessages(historyRPCRequest.Query) @@ -284,19 +282,19 @@ func (store *WakuStore) onRequest(s network.Stream) { } } -func computeIndex(msg *protocol.WakuMessage) (*protocol.Index, error) { +func computeIndex(msg *pb.WakuMessage) (*pb.Index, error) { data, err := msg.Marshal() if err != nil { return nil, err } digest := sha256.Sum256(data) - return &protocol.Index{ + return &pb.Index{ Digest: digest[:], ReceivedTime: float64(time.Now().UnixNano()), }, nil } -func indexComparison(x, y *protocol.Index) int { +func indexComparison(x, y *pb.Index) int { // compares x and y // returns 0 if they are equal // returns -1 if x < y @@ -325,7 +323,7 @@ func indexedWakuMessageComparison(x, y IndexedWakuMessage) int { return indexComparison(x.index, y.index) } -func findIndex(msgList []IndexedWakuMessage, index *protocol.Index) int { +func findIndex(msgList []IndexedWakuMessage, index *pb.Index) int { // returns the position of an IndexedWakuMessage in msgList whose index value matches the given index // returns -1 if no match is found for i, indexedWakuMessage := range msgList { @@ -415,12 +413,9 @@ type HistoryRequestParameters struct { selectedPeer peer.ID requestId []byte timeout *time.Duration - ctx context.Context - cancelFunc context.CancelFunc - - cursor *protocol.Index - pageSize uint64 - asc bool + cursor *pb.Index + pageSize uint64 + asc bool s *WakuStore } @@ -452,14 +447,7 @@ func WithAutomaticRequestId() HistoryRequestOption { } } -func WithTimeout(t time.Duration) HistoryRequestOption { - return func(params *HistoryRequestParameters) { - params.timeout = &t - params.ctx, params.cancelFunc = context.WithTimeout(params.s.ctx, t) - } -} - -func WithCursor(c *protocol.Index) HistoryRequestOption { +func WithCursor(c *pb.Index) HistoryRequestOption { return func(params *HistoryRequestParameters) { params.cursor = c } @@ -476,12 +464,11 @@ func DefaultOptions() []HistoryRequestOption { return []HistoryRequestOption{ WithAutomaticRequestId(), WithAutomaticPeerSelection(), - WithTimeout(ConnectionTimeout), WithPaging(true, 0), } } -func (store *WakuStore) Query(q *protocol.HistoryQuery, opts ...HistoryRequestOption) (*protocol.HistoryResponse, error) { +func (store *WakuStore) Query(ctx context.Context, q *pb.HistoryQuery, opts ...HistoryRequestOption) (*pb.HistoryResponse, error) { params := new(HistoryRequestParameters) params.s = store for _, opt := range opts { @@ -496,31 +483,19 @@ func (store *WakuStore) Query(q *protocol.HistoryQuery, opts ...HistoryRequestOp return nil, ErrInvalidId } - // Setting default timeout if none is specified - if params.timeout == nil { - timeoutF := WithTimeout(ConnectionTimeout) - timeoutF(params) - } - - if *params.timeout == 0 { - params.ctx = store.ctx - } else { - defer params.cancelFunc() - } - if params.cursor != nil { q.PagingInfo.Cursor = params.cursor } if params.asc { - q.PagingInfo.Direction = protocol.PagingInfo_FORWARD + q.PagingInfo.Direction = pb.PagingInfo_FORWARD } else { - q.PagingInfo.Direction = protocol.PagingInfo_BACKWARD + q.PagingInfo.Direction = pb.PagingInfo_BACKWARD } q.PagingInfo.PageSize = params.pageSize - connOpt, err := store.h.NewStream(params.ctx, params.selectedPeer, WakuStoreProtocolId) + connOpt, err := store.h.NewStream(ctx, params.selectedPeer, WakuStoreProtocolId) if err != nil { log.Info("failed to connect to remote peer", err) return nil, err @@ -529,7 +504,7 @@ func (store *WakuStore) Query(q *protocol.HistoryQuery, opts ...HistoryRequestOp defer connOpt.Close() defer connOpt.Reset() - historyRequest := &protocol.HistoryRPC{Query: q, RequestId: hex.EncodeToString(params.requestId)} + historyRequest := &pb.HistoryRPC{Query: q, RequestId: hex.EncodeToString(params.requestId)} writer := protoio.NewDelimitedWriter(connOpt) reader := protoio.NewDelimitedReader(connOpt, 64*1024) @@ -540,7 +515,7 @@ func (store *WakuStore) Query(q *protocol.HistoryQuery, opts ...HistoryRequestOp return nil, err } - historyResponseRPC := &protocol.HistoryRPC{} + historyResponseRPC := &pb.HistoryRPC{} err = reader.ReadMsg(historyResponseRPC) if err != nil { log.Error("could not read response", err) diff --git a/waku/v2/protocol/waku_filter/README.md b/waku/v2/protocol/waku_filter/README.md deleted file mode 100644 index 8fd8446..0000000 --- a/waku/v2/protocol/waku_filter/README.md +++ /dev/null @@ -1,3 +0,0 @@ -# Waku Filter protocol - -The filter protocol implements bandwidth preserving filtering for light nodes. See https://github.com/vacp2p/specs/blob/master/specs/waku/v2/waku-store.md for more information. diff --git a/waku/v2/protocol/waku_store/README.md b/waku/v2/protocol/waku_store/README.md deleted file mode 100644 index f5adaf4..0000000 --- a/waku/v2/protocol/waku_store/README.md +++ /dev/null @@ -1,3 +0,0 @@ -# Waku Store protocol - -The store protocol implements historical message support. See https://github.com/vacp2p/specs/blob/master/specs/waku/v2/waku-store.md for more information.