mirror of
https://github.com/status-im/go-waku.git
synced 2025-01-27 14:05:18 +00:00
refactor: organize code
This commit is contained in:
parent
c3d9868e77
commit
997bc4f2d8
@ -4,8 +4,8 @@ import (
|
||||
"database/sql"
|
||||
"log"
|
||||
|
||||
"github.com/status-im/go-waku/waku/v2/protocol"
|
||||
store "github.com/status-im/go-waku/waku/v2/protocol/waku_store"
|
||||
"github.com/status-im/go-waku/waku/v2/protocol/pb"
|
||||
"github.com/status-im/go-waku/waku/v2/protocol/store"
|
||||
)
|
||||
|
||||
type DBStore struct {
|
||||
@ -68,7 +68,7 @@ func (d *DBStore) Stop() {
|
||||
d.db.Close()
|
||||
}
|
||||
|
||||
func (d *DBStore) Put(cursor *protocol.Index, message *protocol.WakuMessage) error {
|
||||
func (d *DBStore) Put(cursor *pb.Index, message *pb.WakuMessage) error {
|
||||
stmt, err := d.db.Prepare("INSERT INTO messages (id, timestamp, contentTopic, payload, version) VALUES (?, ?, ?, ?, ?)")
|
||||
if err != nil {
|
||||
return err
|
||||
@ -81,13 +81,13 @@ func (d *DBStore) Put(cursor *protocol.Index, message *protocol.WakuMessage) err
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *DBStore) GetAll() ([]*protocol.WakuMessage, error) {
|
||||
func (d *DBStore) GetAll() ([]*pb.WakuMessage, error) {
|
||||
rows, err := d.db.Query("SELECT timestamp, contentTopic, payload, version FROM messages ORDER BY timestamp ASC")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var result []*protocol.WakuMessage
|
||||
var result []*pb.WakuMessage
|
||||
|
||||
defer rows.Close()
|
||||
|
||||
@ -102,7 +102,7 @@ func (d *DBStore) GetAll() ([]*protocol.WakuMessage, error) {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
msg := new(protocol.WakuMessage)
|
||||
msg := new(pb.WakuMessage)
|
||||
msg.ContentTopic = contentTopic
|
||||
msg.Payload = payload
|
||||
msg.Timestamp = float64(timestamp)
|
||||
|
@ -1,33 +1,33 @@
|
||||
package node
|
||||
|
||||
import (
|
||||
"github.com/status-im/go-waku/waku/common"
|
||||
"github.com/status-im/go-waku/waku/v2/protocol"
|
||||
)
|
||||
|
||||
// Adapted from https://github.com/dustin/go-broadcast/commit/f664265f5a662fb4d1df7f3533b1e8d0e0277120 which was released under MIT license
|
||||
|
||||
type broadcaster struct {
|
||||
input chan *common.Envelope
|
||||
reg chan chan<- *common.Envelope
|
||||
unreg chan chan<- *common.Envelope
|
||||
input chan *protocol.Envelope
|
||||
reg chan chan<- *protocol.Envelope
|
||||
unreg chan chan<- *protocol.Envelope
|
||||
|
||||
outputs map[chan<- *common.Envelope]bool
|
||||
outputs map[chan<- *protocol.Envelope]bool
|
||||
}
|
||||
|
||||
// The Broadcaster interface describes the main entry points to
|
||||
// broadcasters.
|
||||
type Broadcaster interface {
|
||||
// Register a new channel to receive broadcasts
|
||||
Register(chan<- *common.Envelope)
|
||||
Register(chan<- *protocol.Envelope)
|
||||
// Unregister a channel so that it no longer receives broadcasts.
|
||||
Unregister(chan<- *common.Envelope)
|
||||
Unregister(chan<- *protocol.Envelope)
|
||||
// Shut this broadcaster down.
|
||||
Close() error
|
||||
// Submit a new object to all subscribers
|
||||
Submit(*common.Envelope)
|
||||
Submit(*protocol.Envelope)
|
||||
}
|
||||
|
||||
func (b *broadcaster) broadcast(m *common.Envelope) {
|
||||
func (b *broadcaster) broadcast(m *protocol.Envelope) {
|
||||
for ch := range b.outputs {
|
||||
ch <- m
|
||||
}
|
||||
@ -52,10 +52,10 @@ func (b *broadcaster) run() {
|
||||
|
||||
func NewBroadcaster(buflen int) Broadcaster {
|
||||
b := &broadcaster{
|
||||
input: make(chan *common.Envelope, buflen),
|
||||
reg: make(chan chan<- *common.Envelope),
|
||||
unreg: make(chan chan<- *common.Envelope),
|
||||
outputs: make(map[chan<- *common.Envelope]bool),
|
||||
input: make(chan *protocol.Envelope, buflen),
|
||||
reg: make(chan chan<- *protocol.Envelope),
|
||||
unreg: make(chan chan<- *protocol.Envelope),
|
||||
outputs: make(map[chan<- *protocol.Envelope]bool),
|
||||
}
|
||||
|
||||
go b.run()
|
||||
@ -63,11 +63,11 @@ func NewBroadcaster(buflen int) Broadcaster {
|
||||
return b
|
||||
}
|
||||
|
||||
func (b *broadcaster) Register(newch chan<- *common.Envelope) {
|
||||
func (b *broadcaster) Register(newch chan<- *protocol.Envelope) {
|
||||
b.reg <- newch
|
||||
}
|
||||
|
||||
func (b *broadcaster) Unregister(newch chan<- *common.Envelope) {
|
||||
func (b *broadcaster) Unregister(newch chan<- *protocol.Envelope) {
|
||||
b.unreg <- newch
|
||||
}
|
||||
|
||||
@ -76,7 +76,7 @@ func (b *broadcaster) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *broadcaster) Submit(m *common.Envelope) {
|
||||
func (b *broadcaster) Submit(m *protocol.Envelope) {
|
||||
if b != nil {
|
||||
b.input <- m
|
||||
}
|
||||
|
@ -3,11 +3,11 @@ package node
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/status-im/go-waku/waku/common"
|
||||
"github.com/status-im/go-waku/waku/v2/protocol"
|
||||
)
|
||||
|
||||
type Subscription struct {
|
||||
C chan *common.Envelope
|
||||
C chan *protocol.Envelope
|
||||
closed bool
|
||||
mutex sync.Mutex
|
||||
quit chan struct{}
|
||||
|
@ -13,7 +13,7 @@ import (
|
||||
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/ethereum/go-ethereum/crypto/ecies"
|
||||
"github.com/status-im/go-waku/waku/v2/protocol"
|
||||
"github.com/status-im/go-waku/waku/v2/protocol/pb"
|
||||
)
|
||||
|
||||
type KeyKind string
|
||||
@ -86,7 +86,7 @@ func (payload Payload) Encode(version uint32) ([]byte, error) {
|
||||
return nil, errors.New("Unsupported WakuMessage version")
|
||||
}
|
||||
|
||||
func DecodePayload(message *protocol.WakuMessage, keyInfo *KeyInfo) (*DecodedPayload, error) {
|
||||
func DecodePayload(message *pb.WakuMessage, keyInfo *KeyInfo) (*DecodedPayload, error) {
|
||||
switch message.Version {
|
||||
case uint32(0):
|
||||
return &DecodedPayload{Data: message.Payload}, nil
|
||||
|
@ -14,9 +14,10 @@ import (
|
||||
"github.com/libp2p/go-libp2p-core/host"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
common "github.com/status-im/go-waku/waku/common"
|
||||
|
||||
"github.com/status-im/go-waku/waku/v2/protocol"
|
||||
store "github.com/status-im/go-waku/waku/v2/protocol/waku_store"
|
||||
"github.com/status-im/go-waku/waku/v2/protocol/pb"
|
||||
"github.com/status-im/go-waku/waku/v2/protocol/store"
|
||||
wakurelay "github.com/status-im/go-wakurelay-pubsub"
|
||||
)
|
||||
|
||||
@ -56,7 +57,6 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
_ = cancel
|
||||
|
||||
params.ctx = ctx
|
||||
params.libP2POpts = DefaultLibP2POptions
|
||||
|
||||
for _, opt := range opts {
|
||||
@ -195,21 +195,21 @@ func (w *WakuNode) AddStorePeer(address string) (*peer.ID, error) {
|
||||
return &info.ID, w.opts.store.AddPeer(info.ID, info.Addrs)
|
||||
}
|
||||
|
||||
func (w *WakuNode) Query(contentTopics []string, startTime float64, endTime float64, opts ...store.HistoryRequestOption) (*protocol.HistoryResponse, error) {
|
||||
func (w *WakuNode) Query(ctx context.Context, contentTopics []string, startTime float64, endTime float64, opts ...store.HistoryRequestOption) (*pb.HistoryResponse, error) {
|
||||
if w.opts.store == nil {
|
||||
return nil, errors.New("WakuStore is not set")
|
||||
}
|
||||
|
||||
query := new(protocol.HistoryQuery)
|
||||
query := new(pb.HistoryQuery)
|
||||
|
||||
for _, ct := range contentTopics {
|
||||
query.ContentFilters = append(query.ContentFilters, &protocol.ContentFilter{ContentTopic: ct})
|
||||
query.ContentFilters = append(query.ContentFilters, &pb.ContentFilter{ContentTopic: ct})
|
||||
}
|
||||
|
||||
query.StartTime = startTime
|
||||
query.EndTime = endTime
|
||||
query.PagingInfo = new(protocol.PagingInfo)
|
||||
result, err := w.opts.store.Query(query, opts...)
|
||||
query.PagingInfo = new(pb.PagingInfo)
|
||||
result, err := w.opts.store.Query(ctx, query, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -241,7 +241,7 @@ func (node *WakuNode) Subscribe(topic *Topic) (*Subscription, error) {
|
||||
// Create client subscription
|
||||
subscription := new(Subscription)
|
||||
subscription.closed = false
|
||||
subscription.C = make(chan *common.Envelope, 1024) // To avoid blocking
|
||||
subscription.C = make(chan *protocol.Envelope, 1024) // To avoid blocking
|
||||
subscription.quit = make(chan struct{})
|
||||
|
||||
node.subscriptionsMutex.Lock()
|
||||
@ -274,13 +274,13 @@ func (node *WakuNode) Subscribe(topic *Topic) (*Subscription, error) {
|
||||
return
|
||||
}
|
||||
|
||||
wakuMessage := &protocol.WakuMessage{}
|
||||
wakuMessage := &pb.WakuMessage{}
|
||||
if err := proto.Unmarshal(msg.Data, wakuMessage); err != nil {
|
||||
log.Error("could not decode message", err)
|
||||
return
|
||||
}
|
||||
|
||||
envelope := common.NewEnvelope(wakuMessage, len(msg.Data), gcrypto.Keccak256(msg.Data))
|
||||
envelope := protocol.NewEnvelope(wakuMessage, len(msg.Data), gcrypto.Keccak256(msg.Data))
|
||||
|
||||
node.bcaster.Submit(envelope)
|
||||
}
|
||||
@ -332,7 +332,7 @@ func (node *WakuNode) upsertSubscription(topic Topic) (*wakurelay.Subscription,
|
||||
return sub, nil
|
||||
}
|
||||
|
||||
func (node *WakuNode) Publish(message *protocol.WakuMessage, topic *Topic) ([]byte, error) {
|
||||
func (node *WakuNode) Publish(message *pb.WakuMessage, topic *Topic) ([]byte, error) {
|
||||
// Publish a `WakuMessage` to a PubSub topic. `WakuMessage` should contain a
|
||||
// `contentTopic` field for light node functionality. This field may be also
|
||||
// be omitted.
|
||||
|
@ -1,7 +1,6 @@
|
||||
package node
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"net"
|
||||
|
||||
@ -10,7 +9,7 @@ import (
|
||||
"github.com/libp2p/go-libp2p-core/crypto"
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
manet "github.com/multiformats/go-multiaddr-net"
|
||||
store "github.com/status-im/go-waku/waku/v2/protocol/waku_store"
|
||||
"github.com/status-im/go-waku/waku/v2/protocol/store"
|
||||
wakurelay "github.com/status-im/go-wakurelay-pubsub"
|
||||
)
|
||||
|
||||
@ -25,8 +24,6 @@ type WakuNodeParameters struct {
|
||||
enableStore bool
|
||||
storeMsgs bool
|
||||
store *store.WakuStore
|
||||
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
type WakuNodeOption func(*WakuNodeParameters) error
|
||||
@ -75,7 +72,7 @@ func WithWakuStore(shouldStoreMessages bool) WakuNodeOption {
|
||||
return func(params *WakuNodeParameters) error {
|
||||
params.enableStore = true
|
||||
params.storeMsgs = shouldStoreMessages
|
||||
params.store = store.NewWakuStore(params.ctx, shouldStoreMessages, nil)
|
||||
params.store = store.NewWakuStore(shouldStoreMessages, nil)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
@ -85,7 +82,7 @@ func WithMessageProvider(s store.MessageProvider) WakuNodeOption {
|
||||
if params.store != nil {
|
||||
params.store.SetMsgProvider(s)
|
||||
} else {
|
||||
params.store = store.NewWakuStore(params.ctx, true, s)
|
||||
params.store = store.NewWakuStore(true, s)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -1,14 +1,14 @@
|
||||
package common
|
||||
package protocol
|
||||
|
||||
import "github.com/status-im/go-waku/waku/v2/protocol"
|
||||
import "github.com/status-im/go-waku/waku/v2/protocol/pb"
|
||||
|
||||
type Envelope struct {
|
||||
msg *protocol.WakuMessage
|
||||
msg *pb.WakuMessage
|
||||
size int
|
||||
hash []byte
|
||||
}
|
||||
|
||||
func NewEnvelope(msg *protocol.WakuMessage, size int, hash []byte) *Envelope {
|
||||
func NewEnvelope(msg *pb.WakuMessage, size int, hash []byte) *Envelope {
|
||||
return &Envelope{
|
||||
msg: msg,
|
||||
size: size,
|
||||
@ -16,7 +16,7 @@ func NewEnvelope(msg *protocol.WakuMessage, size int, hash []byte) *Envelope {
|
||||
}
|
||||
}
|
||||
|
||||
func (e *Envelope) Message() *protocol.WakuMessage {
|
||||
func (e *Envelope) Message() *pb.WakuMessage {
|
||||
return e.msg
|
||||
}
|
||||
|
@ -1,72 +0,0 @@
|
||||
// The Message Notification system is a method to notify various protocols
|
||||
// running on a node when a new message was received.
|
||||
//
|
||||
// Protocols can subscribe to messages of specific topics, then when one is received
|
||||
// The notification handler function will be called.
|
||||
|
||||
package protocol
|
||||
|
||||
import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
type MessageNotificationHandler func(topic string, msg *WakuMessage)
|
||||
|
||||
type MessageNotificationSubscriptionIdentifier string
|
||||
|
||||
type MessageNotificationSubscription struct {
|
||||
topics []string // @TODO TOPIC (?)
|
||||
handler MessageNotificationHandler
|
||||
}
|
||||
|
||||
type MessageNotificationSubscriptions map[string]MessageNotificationSubscription
|
||||
|
||||
func (subscriptions MessageNotificationSubscriptions) subscribe(name string, subscription MessageNotificationSubscription) {
|
||||
subscriptions[name] = subscription
|
||||
}
|
||||
|
||||
func Init(topics []string, handler MessageNotificationHandler) MessageNotificationSubscription {
|
||||
result := MessageNotificationSubscription{}
|
||||
result.topics = topics
|
||||
result.handler = handler
|
||||
return result
|
||||
}
|
||||
|
||||
func containsMatch(lhs []string, rhs []string) bool {
|
||||
for _, l := range lhs {
|
||||
for _, r := range rhs {
|
||||
if l == r {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (subscriptions MessageNotificationSubscriptions) notify(topic string, msg *WakuMessage) {
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for _, subscription := range subscriptions {
|
||||
// @TODO WILL NEED TO CHECK SUBTOPICS IN FUTURE FOR WAKU TOPICS NOT LIBP2P ONES
|
||||
|
||||
found := false
|
||||
for _, subscriptionTopic := range subscription.topics {
|
||||
if subscriptionTopic == topic {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !found {
|
||||
continue
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
go func(subs MessageNotificationSubscription) {
|
||||
subs.handler(topic, msg)
|
||||
wg.Done()
|
||||
}(subscription)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
}
|
@ -1,7 +1,7 @@
|
||||
// Code generated by protoc-gen-gogo. DO NOT EDIT.
|
||||
// source: waku_message.proto
|
||||
|
||||
package protocol
|
||||
package pb
|
||||
|
||||
import (
|
||||
encoding_binary "encoding/binary"
|
@ -1,6 +1,6 @@
|
||||
syntax = "proto3";
|
||||
|
||||
package protocol;
|
||||
package pb;
|
||||
|
||||
message WakuMessage {
|
||||
bytes payload = 1;
|
@ -1,7 +1,7 @@
|
||||
// Code generated by protoc-gen-gogo. DO NOT EDIT.
|
||||
// source: waku_store.proto
|
||||
|
||||
package protocol
|
||||
package pb
|
||||
|
||||
import (
|
||||
encoding_binary "encoding/binary"
|
@ -1,6 +1,6 @@
|
||||
syntax = "proto3";
|
||||
|
||||
package protocol;
|
||||
package pb;
|
||||
|
||||
import "waku_message.proto";
|
||||
|
@ -22,8 +22,8 @@ import (
|
||||
"github.com/libp2p/go-msgio/protoio"
|
||||
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
"github.com/status-im/go-waku/waku/common"
|
||||
"github.com/status-im/go-waku/waku/v2/protocol"
|
||||
"github.com/status-im/go-waku/waku/v2/protocol/pb"
|
||||
)
|
||||
|
||||
var log = logging.Logger("wakustore")
|
||||
@ -50,7 +50,7 @@ func minOf(vars ...int) int {
|
||||
return min
|
||||
}
|
||||
|
||||
func paginateWithIndex(list []IndexedWakuMessage, pinfo *protocol.PagingInfo) (resMessages []IndexedWakuMessage, resPagingInfo *protocol.PagingInfo) {
|
||||
func paginateWithIndex(list []IndexedWakuMessage, pinfo *pb.PagingInfo) (resMessages []IndexedWakuMessage, resPagingInfo *pb.PagingInfo) {
|
||||
// takes list, and performs paging based on pinfo
|
||||
// returns the page i.e, a sequence of IndexedWakuMessage and the new paging info to be used for the next paging request
|
||||
cursor := pinfo.Cursor
|
||||
@ -62,7 +62,7 @@ func paginateWithIndex(list []IndexedWakuMessage, pinfo *protocol.PagingInfo) (r
|
||||
}
|
||||
|
||||
if len(list) == 0 { // no pagination is needed for an empty list
|
||||
return list, &protocol.PagingInfo{PageSize: 0, Cursor: pinfo.Cursor, Direction: pinfo.Direction}
|
||||
return list, &pb.PagingInfo{PageSize: 0, Cursor: pinfo.Cursor, Direction: pinfo.Direction}
|
||||
}
|
||||
|
||||
msgList := make([]IndexedWakuMessage, len(list))
|
||||
@ -76,22 +76,22 @@ func paginateWithIndex(list []IndexedWakuMessage, pinfo *protocol.PagingInfo) (r
|
||||
if cursor == nil {
|
||||
initQuery = true // an empty cursor means it is an initial query
|
||||
switch dir {
|
||||
case protocol.PagingInfo_FORWARD:
|
||||
case pb.PagingInfo_FORWARD:
|
||||
cursor = list[0].index // perform paging from the begining of the list
|
||||
case protocol.PagingInfo_BACKWARD:
|
||||
case pb.PagingInfo_BACKWARD:
|
||||
cursor = list[len(list)-1].index // perform paging from the end of the list
|
||||
}
|
||||
}
|
||||
|
||||
foundIndex := findIndex(msgList, cursor)
|
||||
if foundIndex == -1 { // the cursor is not valid
|
||||
return nil, &protocol.PagingInfo{PageSize: 0, Cursor: pinfo.Cursor, Direction: pinfo.Direction}
|
||||
return nil, &pb.PagingInfo{PageSize: 0, Cursor: pinfo.Cursor, Direction: pinfo.Direction}
|
||||
}
|
||||
|
||||
var retrievedPageSize, s, e int
|
||||
var newCursor *protocol.Index // to be returned as part of the new paging info
|
||||
var newCursor *pb.Index // to be returned as part of the new paging info
|
||||
switch dir {
|
||||
case protocol.PagingInfo_FORWARD: // forward pagination
|
||||
case pb.PagingInfo_FORWARD: // forward pagination
|
||||
remainingMessages := len(msgList) - foundIndex - 1
|
||||
if initQuery {
|
||||
remainingMessages = remainingMessages + 1
|
||||
@ -102,7 +102,7 @@ func paginateWithIndex(list []IndexedWakuMessage, pinfo *protocol.PagingInfo) (r
|
||||
s = foundIndex + 1 // non inclusive
|
||||
e = foundIndex + retrievedPageSize
|
||||
newCursor = msgList[e].index // the new cursor points to the end of the page
|
||||
case protocol.PagingInfo_BACKWARD: // backward pagination
|
||||
case pb.PagingInfo_BACKWARD: // backward pagination
|
||||
remainingMessages := foundIndex
|
||||
if initQuery {
|
||||
remainingMessages = remainingMessages + 1
|
||||
@ -119,12 +119,12 @@ func paginateWithIndex(list []IndexedWakuMessage, pinfo *protocol.PagingInfo) (r
|
||||
for i := s; i <= e; i++ {
|
||||
resMessages = append(resMessages, msgList[i])
|
||||
}
|
||||
resPagingInfo = &protocol.PagingInfo{PageSize: uint64(retrievedPageSize), Cursor: newCursor, Direction: pinfo.Direction}
|
||||
resPagingInfo = &pb.PagingInfo{PageSize: uint64(retrievedPageSize), Cursor: newCursor, Direction: pinfo.Direction}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func paginateWithoutIndex(list []IndexedWakuMessage, pinfo *protocol.PagingInfo) (resMessages []*protocol.WakuMessage, resPinfo *protocol.PagingInfo) {
|
||||
func paginateWithoutIndex(list []IndexedWakuMessage, pinfo *pb.PagingInfo) (resMessages []*pb.WakuMessage, resPinfo *pb.PagingInfo) {
|
||||
// takes list, and performs paging based on pinfo
|
||||
// returns the page i.e, a sequence of WakuMessage and the new paging info to be used for the next paging request
|
||||
indexedData, updatedPagingInfo := paginateWithIndex(list, pinfo)
|
||||
@ -135,8 +135,8 @@ func paginateWithoutIndex(list []IndexedWakuMessage, pinfo *protocol.PagingInfo)
|
||||
return
|
||||
}
|
||||
|
||||
func (w *WakuStore) FindMessages(query *protocol.HistoryQuery) *protocol.HistoryResponse {
|
||||
result := new(protocol.HistoryResponse)
|
||||
func (w *WakuStore) FindMessages(query *pb.HistoryQuery) *pb.HistoryResponse {
|
||||
result := new(pb.HistoryResponse)
|
||||
// data holds IndexedWakuMessage whose topics match the query
|
||||
var data []IndexedWakuMessage
|
||||
for _, indexedMsg := range w.messages {
|
||||
@ -161,32 +161,30 @@ func (w *WakuStore) FindMessages(query *protocol.HistoryQuery) *protocol.History
|
||||
}
|
||||
|
||||
type MessageProvider interface {
|
||||
GetAll() ([]*protocol.WakuMessage, error)
|
||||
Put(cursor *protocol.Index, message *protocol.WakuMessage) error
|
||||
GetAll() ([]*pb.WakuMessage, error)
|
||||
Put(cursor *pb.Index, message *pb.WakuMessage) error
|
||||
Stop()
|
||||
}
|
||||
|
||||
type IndexedWakuMessage struct {
|
||||
msg *protocol.WakuMessage
|
||||
index *protocol.Index
|
||||
msg *pb.WakuMessage
|
||||
index *pb.Index
|
||||
}
|
||||
|
||||
type WakuStore struct {
|
||||
MsgC chan *common.Envelope
|
||||
MsgC chan *protocol.Envelope
|
||||
messages []IndexedWakuMessage
|
||||
messagesMutex sync.Mutex
|
||||
|
||||
storeMsgs bool
|
||||
msgProvider MessageProvider
|
||||
h host.Host
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
func NewWakuStore(ctx context.Context, shouldStoreMessages bool, p MessageProvider) *WakuStore {
|
||||
func NewWakuStore(shouldStoreMessages bool, p MessageProvider) *WakuStore {
|
||||
wakuStore := new(WakuStore)
|
||||
wakuStore.MsgC = make(chan *common.Envelope)
|
||||
wakuStore.MsgC = make(chan *protocol.Envelope)
|
||||
wakuStore.msgProvider = p
|
||||
wakuStore.ctx = ctx
|
||||
wakuStore.storeMsgs = shouldStoreMessages
|
||||
|
||||
return wakuStore
|
||||
@ -258,7 +256,7 @@ func (store *WakuStore) storeIncomingMessages() {
|
||||
func (store *WakuStore) onRequest(s network.Stream) {
|
||||
defer s.Close()
|
||||
|
||||
historyRPCRequest := &protocol.HistoryRPC{}
|
||||
historyRPCRequest := &pb.HistoryRPC{}
|
||||
|
||||
writer := protoio.NewDelimitedWriter(s)
|
||||
reader := protoio.NewDelimitedReader(s, 64*1024)
|
||||
@ -271,7 +269,7 @@ func (store *WakuStore) onRequest(s network.Stream) {
|
||||
|
||||
log.Info(fmt.Sprintf("%s: Received query from %s", s.Conn().LocalPeer(), s.Conn().RemotePeer()))
|
||||
|
||||
historyResponseRPC := &protocol.HistoryRPC{}
|
||||
historyResponseRPC := &pb.HistoryRPC{}
|
||||
historyResponseRPC.RequestId = historyRPCRequest.RequestId
|
||||
historyResponseRPC.Response = store.FindMessages(historyRPCRequest.Query)
|
||||
|
||||
@ -284,19 +282,19 @@ func (store *WakuStore) onRequest(s network.Stream) {
|
||||
}
|
||||
}
|
||||
|
||||
func computeIndex(msg *protocol.WakuMessage) (*protocol.Index, error) {
|
||||
func computeIndex(msg *pb.WakuMessage) (*pb.Index, error) {
|
||||
data, err := msg.Marshal()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
digest := sha256.Sum256(data)
|
||||
return &protocol.Index{
|
||||
return &pb.Index{
|
||||
Digest: digest[:],
|
||||
ReceivedTime: float64(time.Now().UnixNano()),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func indexComparison(x, y *protocol.Index) int {
|
||||
func indexComparison(x, y *pb.Index) int {
|
||||
// compares x and y
|
||||
// returns 0 if they are equal
|
||||
// returns -1 if x < y
|
||||
@ -325,7 +323,7 @@ func indexedWakuMessageComparison(x, y IndexedWakuMessage) int {
|
||||
return indexComparison(x.index, y.index)
|
||||
}
|
||||
|
||||
func findIndex(msgList []IndexedWakuMessage, index *protocol.Index) int {
|
||||
func findIndex(msgList []IndexedWakuMessage, index *pb.Index) int {
|
||||
// returns the position of an IndexedWakuMessage in msgList whose index value matches the given index
|
||||
// returns -1 if no match is found
|
||||
for i, indexedWakuMessage := range msgList {
|
||||
@ -415,12 +413,9 @@ type HistoryRequestParameters struct {
|
||||
selectedPeer peer.ID
|
||||
requestId []byte
|
||||
timeout *time.Duration
|
||||
ctx context.Context
|
||||
cancelFunc context.CancelFunc
|
||||
|
||||
cursor *protocol.Index
|
||||
pageSize uint64
|
||||
asc bool
|
||||
cursor *pb.Index
|
||||
pageSize uint64
|
||||
asc bool
|
||||
|
||||
s *WakuStore
|
||||
}
|
||||
@ -452,14 +447,7 @@ func WithAutomaticRequestId() HistoryRequestOption {
|
||||
}
|
||||
}
|
||||
|
||||
func WithTimeout(t time.Duration) HistoryRequestOption {
|
||||
return func(params *HistoryRequestParameters) {
|
||||
params.timeout = &t
|
||||
params.ctx, params.cancelFunc = context.WithTimeout(params.s.ctx, t)
|
||||
}
|
||||
}
|
||||
|
||||
func WithCursor(c *protocol.Index) HistoryRequestOption {
|
||||
func WithCursor(c *pb.Index) HistoryRequestOption {
|
||||
return func(params *HistoryRequestParameters) {
|
||||
params.cursor = c
|
||||
}
|
||||
@ -476,12 +464,11 @@ func DefaultOptions() []HistoryRequestOption {
|
||||
return []HistoryRequestOption{
|
||||
WithAutomaticRequestId(),
|
||||
WithAutomaticPeerSelection(),
|
||||
WithTimeout(ConnectionTimeout),
|
||||
WithPaging(true, 0),
|
||||
}
|
||||
}
|
||||
|
||||
func (store *WakuStore) Query(q *protocol.HistoryQuery, opts ...HistoryRequestOption) (*protocol.HistoryResponse, error) {
|
||||
func (store *WakuStore) Query(ctx context.Context, q *pb.HistoryQuery, opts ...HistoryRequestOption) (*pb.HistoryResponse, error) {
|
||||
params := new(HistoryRequestParameters)
|
||||
params.s = store
|
||||
for _, opt := range opts {
|
||||
@ -496,31 +483,19 @@ func (store *WakuStore) Query(q *protocol.HistoryQuery, opts ...HistoryRequestOp
|
||||
return nil, ErrInvalidId
|
||||
}
|
||||
|
||||
// Setting default timeout if none is specified
|
||||
if params.timeout == nil {
|
||||
timeoutF := WithTimeout(ConnectionTimeout)
|
||||
timeoutF(params)
|
||||
}
|
||||
|
||||
if *params.timeout == 0 {
|
||||
params.ctx = store.ctx
|
||||
} else {
|
||||
defer params.cancelFunc()
|
||||
}
|
||||
|
||||
if params.cursor != nil {
|
||||
q.PagingInfo.Cursor = params.cursor
|
||||
}
|
||||
|
||||
if params.asc {
|
||||
q.PagingInfo.Direction = protocol.PagingInfo_FORWARD
|
||||
q.PagingInfo.Direction = pb.PagingInfo_FORWARD
|
||||
} else {
|
||||
q.PagingInfo.Direction = protocol.PagingInfo_BACKWARD
|
||||
q.PagingInfo.Direction = pb.PagingInfo_BACKWARD
|
||||
}
|
||||
|
||||
q.PagingInfo.PageSize = params.pageSize
|
||||
|
||||
connOpt, err := store.h.NewStream(params.ctx, params.selectedPeer, WakuStoreProtocolId)
|
||||
connOpt, err := store.h.NewStream(ctx, params.selectedPeer, WakuStoreProtocolId)
|
||||
if err != nil {
|
||||
log.Info("failed to connect to remote peer", err)
|
||||
return nil, err
|
||||
@ -529,7 +504,7 @@ func (store *WakuStore) Query(q *protocol.HistoryQuery, opts ...HistoryRequestOp
|
||||
defer connOpt.Close()
|
||||
defer connOpt.Reset()
|
||||
|
||||
historyRequest := &protocol.HistoryRPC{Query: q, RequestId: hex.EncodeToString(params.requestId)}
|
||||
historyRequest := &pb.HistoryRPC{Query: q, RequestId: hex.EncodeToString(params.requestId)}
|
||||
|
||||
writer := protoio.NewDelimitedWriter(connOpt)
|
||||
reader := protoio.NewDelimitedReader(connOpt, 64*1024)
|
||||
@ -540,7 +515,7 @@ func (store *WakuStore) Query(q *protocol.HistoryQuery, opts ...HistoryRequestOp
|
||||
return nil, err
|
||||
}
|
||||
|
||||
historyResponseRPC := &protocol.HistoryRPC{}
|
||||
historyResponseRPC := &pb.HistoryRPC{}
|
||||
err = reader.ReadMsg(historyResponseRPC)
|
||||
if err != nil {
|
||||
log.Error("could not read response", err)
|
@ -1,3 +0,0 @@
|
||||
# Waku Filter protocol
|
||||
|
||||
The filter protocol implements bandwidth preserving filtering for light nodes. See https://github.com/vacp2p/specs/blob/master/specs/waku/v2/waku-store.md for more information.
|
@ -1,3 +0,0 @@
|
||||
# Waku Store protocol
|
||||
|
||||
The store protocol implements historical message support. See https://github.com/vacp2p/specs/blob/master/specs/waku/v2/waku-store.md for more information.
|
Loading…
x
Reference in New Issue
Block a user