waku2: enable message confirmations (#2416)

This commit is contained in:
Richard Ramos 2021-11-02 14:27:37 -04:00 committed by GitHub
parent 4a0ad1cf69
commit 4602982c77
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 723 additions and 355 deletions

2
go.mod
View File

@ -49,7 +49,7 @@ require (
github.com/russolsen/same v0.0.0-20160222130632-f089df61f51d // indirect
github.com/russolsen/transit v0.0.0-20180705123435-0794b4c4505a
github.com/status-im/doubleratchet v3.0.0+incompatible
github.com/status-im/go-waku v0.0.0-20211018124348-4227a9d69d19
github.com/status-im/go-waku v0.0.0-20211101173358-268767262b60
github.com/status-im/go-waku-rendezvous v0.0.0-20211018070416-a93f3b70c432
github.com/status-im/markdown v0.0.0-20210405121740-32e5a5055fb6
github.com/status-im/migrate/v4 v4.6.2-status.2

4
go.sum
View File

@ -1206,8 +1206,8 @@ github.com/status-im/go-ethereum v1.10.4-status.3 h1:RF618iSCvqJtXu3ZSg7XNg6MJaS
github.com/status-im/go-ethereum v1.10.4-status.3/go.mod h1:GvIhpdCOgMHI6i5xVPEZOrv/qSMeOFHbZh77AoyZUoE=
github.com/status-im/go-multiaddr-ethv4 v1.2.1 h1:09v9n6426NAojNOvdgegqrAotgffWW/UPDwrpJ85DNE=
github.com/status-im/go-multiaddr-ethv4 v1.2.1/go.mod h1:SlBebvQcSUM5+/R/YfpfMuu5WyraW47XFmIqLYBmlKU=
github.com/status-im/go-waku v0.0.0-20211018124348-4227a9d69d19 h1:z5xRto9SI+VgPNA52Wf+NTYs1i75hfW5gGhJC4k53Vs=
github.com/status-im/go-waku v0.0.0-20211018124348-4227a9d69d19/go.mod h1:A0lI3uZYLKrXiviVkwGgBdT8b9HLcW3U/xUcE/4665k=
github.com/status-im/go-waku v0.0.0-20211101173358-268767262b60 h1:ymq5jgtepOPqNbs8yA9g2hkFn5811snImNqNbQzpcDA=
github.com/status-im/go-waku v0.0.0-20211101173358-268767262b60/go.mod h1:A0lI3uZYLKrXiviVkwGgBdT8b9HLcW3U/xUcE/4665k=
github.com/status-im/go-waku-rendezvous v0.0.0-20211018070416-a93f3b70c432 h1:cbNFU38iimo9fY4B7CdF/fvIF6tNPJIZjBbpfmW2EY4=
github.com/status-im/go-waku-rendezvous v0.0.0-20211018070416-a93f3b70c432/go.mod h1:A8t3i0CUGtXCA0aiLsP7iyikmk/KaD/2XVvNJqGCU20=
github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4/go.mod h1:RZLeN1LMWmRsyYjvAu+I6Dm9QmlDaIIt+Y+4Kd7Tp+Q=

View File

@ -163,7 +163,8 @@ func (m *EnvelopesMonitor) handleEvent(event types.EnvelopeEvent) {
}
func (m *EnvelopesMonitor) handleEventEnvelopeSent(event types.EnvelopeEvent) {
if m.mailServerConfirmation {
// Mailserver confirmations for WakuV2 are disabled
if (m.w == nil || m.w.Version() < 2) && m.mailServerConfirmation {
if !m.isMailserver(event.Peer) {
return
}
@ -173,6 +174,7 @@ func (m *EnvelopesMonitor) handleEventEnvelopeSent(event types.EnvelopeEvent) {
defer m.mu.Unlock()
state, ok := m.envelopes[event.Hash]
// if we didn't send a message using extension - skip it
// if message was already confirmed - skip it
if !ok || state == EnvelopeSent {

View File

@ -0,0 +1,34 @@
package persistence
import (
rendezvous "github.com/status-im/go-waku-rendezvous"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/util"
)
type RendezVousLevelDB struct {
db *leveldb.DB
}
func NewRendezVousLevelDB(dBPath string) (*RendezVousLevelDB, error) {
db, err := leveldb.OpenFile(dBPath, &opt.Options{OpenFilesCacheCapacity: 3})
if err != nil {
return nil, err
}
return &RendezVousLevelDB{db}, nil
}
func (r *RendezVousLevelDB) Delete(key []byte) error {
return r.db.Delete(key, nil)
}
func (r *RendezVousLevelDB) Put(key []byte, value []byte) error {
return r.db.Put(key, value, nil)
}
func (r *RendezVousLevelDB) NewIterator(prefix []byte) rendezvous.Iterator {
return r.db.NewIterator(util.BytesPrefix(prefix), nil)
}

View File

@ -0,0 +1,153 @@
package persistence
import (
"database/sql"
"log"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
)
type MessageProvider interface {
GetAll() ([]StoredMessage, error)
Put(cursor *pb.Index, pubsubTopic string, message *pb.WakuMessage) error
Stop()
}
// DBStore is a MessageProvider that has a *sql.DB connection
type DBStore struct {
MessageProvider
db *sql.DB
}
type StoredMessage struct {
ID []byte
PubsubTopic string
ReceiverTime float64
Message *pb.WakuMessage
}
// DBOption is an optional setting that can be used to configure the DBStore
type DBOption func(*DBStore) error
// WithDB is a DBOption that lets you use any custom *sql.DB with a DBStore.
func WithDB(db *sql.DB) DBOption {
return func(d *DBStore) error {
d.db = db
return nil
}
}
// WithDriver is a DBOption that will open a *sql.DB connection
func WithDriver(driverName string, datasourceName string) DBOption {
return func(d *DBStore) error {
db, err := sql.Open(driverName, datasourceName)
if err != nil {
return err
}
d.db = db
return nil
}
}
// Creates a new DB store using the db specified via options.
// It will create a messages table if it does not exist
func NewDBStore(opt DBOption) (*DBStore, error) {
result := new(DBStore)
err := opt(result)
if err != nil {
return nil, err
}
err = result.createTable()
if err != nil {
return nil, err
}
return result, nil
}
func (d *DBStore) createTable() error {
sqlStmt := `CREATE TABLE IF NOT EXISTS message (
id BLOB PRIMARY KEY,
receiverTimestamp REAL NOT NULL,
senderTimestamp REAL NOT NULL,
contentTopic BLOB NOT NULL,
pubsubTopic BLOB NOT NULL,
payload BLOB,
version INTEGER NOT NULL DEFAULT 0
) WITHOUT ROWID;`
_, err := d.db.Exec(sqlStmt)
if err != nil {
return err
}
return nil
}
// Closes a DB connection
func (d *DBStore) Stop() {
d.db.Close()
}
// Inserts a WakuMessage into the DB
func (d *DBStore) Put(cursor *pb.Index, pubsubTopic string, message *pb.WakuMessage) error {
stmt, err := d.db.Prepare("INSERT INTO message (id, receiverTimestamp, senderTimestamp, contentTopic, pubsubTopic, payload, version) VALUES (?, ?, ?, ?, ?, ?, ?)")
if err != nil {
return err
}
_, err = stmt.Exec(cursor.Digest, cursor.ReceiverTime, message.Timestamp, message.ContentTopic, pubsubTopic, message.Payload, message.Version)
if err != nil {
return err
}
return nil
}
// Returns all the stored WakuMessages
func (d *DBStore) GetAll() ([]StoredMessage, error) {
rows, err := d.db.Query("SELECT id, receiverTimestamp, senderTimestamp, contentTopic, pubsubTopic, payload, version FROM message ORDER BY senderTimestamp ASC")
if err != nil {
return nil, err
}
var result []StoredMessage
defer rows.Close()
for rows.Next() {
var id []byte
var receiverTimestamp float64
var senderTimestamp float64
var contentTopic string
var payload []byte
var version uint32
var pubsubTopic string
err = rows.Scan(&id, &receiverTimestamp, &senderTimestamp, &contentTopic, &pubsubTopic, &payload, &version)
if err != nil {
log.Fatal(err)
}
msg := new(pb.WakuMessage)
msg.ContentTopic = contentTopic
msg.Payload = payload
msg.Timestamp = senderTimestamp
msg.Version = version
record := StoredMessage{
ID: id,
PubsubTopic: pubsubTopic,
ReceiverTime: receiverTimestamp,
Message: msg,
}
result = append(result, record)
}
err = rows.Err()
if err != nil {
return nil, err
}
return result, nil
}

View File

@ -1,4 +1,4 @@
package node
package v2
import (
"github.com/status-im/go-waku/waku/v2/protocol"
@ -23,7 +23,7 @@ type Broadcaster interface {
// Unregister a channel so that it no longer receives broadcasts.
Unregister(chan<- *protocol.Envelope)
// Shut this broadcaster down.
Close() error
Close()
// Submit a new object to all subscribers
Submit(*protocol.Envelope)
}
@ -78,9 +78,8 @@ func (b *broadcaster) Unregister(newch chan<- *protocol.Envelope) {
}
// Closes the broadcaster. Used to stop receiving new subscribers
func (b *broadcaster) Close() error {
func (b *broadcaster) Close() {
close(b.reg)
return nil
}
// Submits an Envelope to be broadcasted among all registered subscriber channels

View File

@ -1,23 +1,29 @@
package metrics
import (
"context"
logging "github.com/ipfs/go-log"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
)
var log = logging.Logger("metrics")
var (
Messages = stats.Int64("node_messages", "Number of messages received", stats.UnitDimensionless)
Peers = stats.Int64("peers", "Number of connected peers", stats.UnitDimensionless)
Dials = stats.Int64("dials", "Number of peer dials", stats.UnitDimensionless)
StoreMessages = stats.Int64("store_messages", "Number of historical messages", stats.UnitDimensionless)
FilterSubscriptions = stats.Int64("filter_subscriptions", "Number of filter subscriptions", stats.UnitDimensionless)
Errors = stats.Int64("errors", "Number of errors", stats.UnitDimensionless)
StoreErrors = stats.Int64("errors", "Number of errors in store protocol", stats.UnitDimensionless)
LightpushErrors = stats.Int64("errors", "Number of errors in lightpush protocol", stats.UnitDimensionless)
)
var (
KeyType, _ = tag.NewKey("type")
KeyStoreErrorType, _ = tag.NewKey("store_error_type")
KeyType, _ = tag.NewKey("type")
ErrorType, _ = tag.NewKey("error_type")
)
var (
@ -44,6 +50,7 @@ var (
Measure: StoreMessages,
Description: "The distribution of the store protocol messages",
Aggregation: view.LastValue(),
TagKeys: []tag.Key{KeyType},
}
FilterSubscriptionsView = &view.View{
Name: "gowaku_filter_subscriptions",
@ -53,9 +60,34 @@ var (
}
StoreErrorTypesView = &view.View{
Name: "gowaku_store_errors",
Measure: Errors,
Measure: StoreErrors,
Description: "The distribution of the store protocol errors",
Aggregation: view.Count(),
TagKeys: []tag.Key{KeyType},
}
LightpushErrorTypesView = &view.View{
Name: "gowaku_lightpush_errors",
Measure: LightpushErrors,
Description: "The distribution of the lightpush protocol errors",
Aggregation: view.Count(),
TagKeys: []tag.Key{KeyType},
}
)
func RecordLightpushError(ctx context.Context, tagType string) {
if err := stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(tag.Key(ErrorType), tagType)}, LightpushErrors.M(1)); err != nil {
log.Error("failed to record with tags", err)
}
}
func RecordMessage(ctx context.Context, tagType string, len int) {
if err := stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(KeyType, tagType)}, StoreMessages.M(int64(len))); err != nil {
log.Error("failed to record with tags", err)
}
}
func RecordStoreError(ctx context.Context, tagType string) {
if err := stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(ErrorType, tagType)}, StoreErrors.M(1)); err != nil {
log.Error("failed to record with tags", err)
}
}

View File

@ -6,6 +6,7 @@ import (
"crypto/ecdsa"
crand "crypto/rand"
"encoding/binary"
"fmt"
mrand "math/rand"
"errors"
@ -70,22 +71,38 @@ func (payload Payload) Encode(version uint32) ([]byte, error) {
case Symmetric:
encoded, err := encryptSymmetric(data, payload.Key.SymKey)
if err != nil {
return nil, errors.New("Couldn't encrypt using symmetric key")
return nil, fmt.Errorf("couldn't encrypt using symmetric key: %w", err)
} else {
return encoded, nil
}
case Asymmetric:
encoded, err := encryptAsymmetric(data, &payload.Key.PubKey)
if err != nil {
return nil, errors.New("Couldn't encrypt using asymmetric key")
return nil, fmt.Errorf("couldn't encrypt using asymmetric key: %w", err)
} else {
return encoded, nil
}
case None:
return nil, errors.New("Non supported KeyKind")
return nil, errors.New("non supported KeyKind")
}
}
return nil, errors.New("Unsupported WakuMessage version")
return nil, errors.New("unsupported wakumessage version")
}
func EncodeWakuMessage(message *pb.WakuMessage, keyInfo *KeyInfo) error {
msgPayload := message.Payload
payload := Payload{
Data: msgPayload,
Key: keyInfo,
}
encodedBytes, err := payload.Encode(message.Version)
if err != nil {
return err
}
message.Payload = encodedBytes
return nil
}
// Decodes a WakuMessage depending on the version parameter.
@ -98,12 +115,12 @@ func DecodePayload(message *pb.WakuMessage, keyInfo *KeyInfo) (*DecodedPayload,
switch keyInfo.Kind {
case Symmetric:
if keyInfo.SymKey == nil {
return nil, errors.New("Symmetric key is required")
return nil, errors.New("symmetric key is required")
}
decodedData, err := decryptSymmetric(message.Payload, keyInfo.SymKey)
if err != nil {
return nil, errors.New("Couldn't decrypt using symmetric key")
return nil, fmt.Errorf("couldn't decrypt using symmetric key: %w", err)
}
decodedPayload, err := validateAndParse(decodedData)
@ -114,12 +131,12 @@ func DecodePayload(message *pb.WakuMessage, keyInfo *KeyInfo) (*DecodedPayload,
return decodedPayload, nil
case Asymmetric:
if keyInfo.PrivKey == nil {
return nil, errors.New("Private key is required")
return nil, errors.New("private key is required")
}
decodedData, err := decryptAsymmetric(message.Payload, keyInfo.PrivKey)
if err != nil {
return nil, errors.New("Couldn't decrypt using asymmetric key")
return nil, fmt.Errorf("couldn't decrypt using asymmetric key: %w", err)
}
decodedPayload, err := validateAndParse(decodedData)
@ -129,10 +146,20 @@ func DecodePayload(message *pb.WakuMessage, keyInfo *KeyInfo) (*DecodedPayload,
return decodedPayload, nil
case None:
return nil, errors.New("Non supported KeyKind")
return nil, errors.New("non supported KeyKind")
}
}
return nil, errors.New("Unsupported WakuMessage version")
return nil, errors.New("unsupported wakumessage version")
}
func DecodeWakuMessage(message *pb.WakuMessage, keyInfo *KeyInfo) error {
decodedPayload, err := DecodePayload(message, keyInfo)
if err != nil {
return err
}
message.Payload = decodedPayload.Data
return nil
}
const aesNonceLength = 12

View File

@ -4,10 +4,8 @@ import (
"context"
"errors"
"fmt"
"sync"
"time"
proto "github.com/golang/protobuf/proto"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p"
@ -20,9 +18,9 @@ import (
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
ma "github.com/multiformats/go-multiaddr"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
rendezvous "github.com/status-im/go-waku-rendezvous"
v2 "github.com/status-im/go-waku/waku/v2"
"github.com/status-im/go-waku/waku/v2/metrics"
"github.com/status-im/go-waku/waku/v2/protocol"
"github.com/status-im/go-waku/waku/v2/protocol/filter"
@ -48,10 +46,7 @@ type WakuNode struct {
ping *ping.PingService
store *store.WakuStore
subscriptions map[relay.Topic][]*Subscription
subscriptionsMutex sync.Mutex
bcaster Broadcaster
bcaster v2.Broadcaster
filters filter.Filters
@ -102,11 +97,10 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) {
}
w := new(WakuNode)
w.bcaster = NewBroadcaster(1024)
w.bcaster = v2.NewBroadcaster(1024)
w.host = host
w.cancel = cancel
w.ctx = ctx
w.subscriptions = make(map[relay.Topic][]*Subscription)
w.opts = params
w.quit = make(chan struct{})
@ -124,6 +118,7 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) {
w.connectionNotif = NewConnectionNotifier(ctx, host)
w.host.Network().Notify(w.connectionNotif)
go w.connectednessListener()
if w.opts.keepAliveInterval > time.Duration(0) {
@ -138,6 +133,7 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) {
}
func (w *WakuNode) Start() error {
w.store = store.NewWakuStore(w.opts.messageProvider)
if w.opts.enableStore {
w.startStore()
}
@ -155,13 +151,16 @@ func (w *WakuNode) Start() error {
w.opts.wOpts = append(w.opts.wOpts, pubsub.WithDiscovery(rendezvous, w.opts.rendezvousOpts...))
}
err := w.mountRelay(w.opts.enableRelay, w.opts.wOpts...)
err := w.mountRelay(w.opts.wOpts...)
if err != nil {
return err
}
w.lightPush = lightpush.NewWakuLightPush(w.ctx, w.host, w.relay)
if w.opts.enableLightPush {
w.mountLightPush()
if err := w.lightPush.Start(); err != nil {
return err
}
}
if w.opts.enableRendezvousServer {
@ -171,16 +170,27 @@ func (w *WakuNode) Start() error {
}
}
// Subscribe store to topic
if w.opts.storeMsgs {
log.Info("Subscribing store to broadcaster")
w.bcaster.Register(w.store.MsgC)
}
if w.filter != nil {
log.Info("Subscribing filter to broadcaster")
w.bcaster.Register(w.filter.MsgC)
}
return nil
}
func (w *WakuNode) Stop() {
w.subscriptionsMutex.Lock()
defer w.subscriptionsMutex.Unlock()
defer w.cancel()
close(w.quit)
w.bcaster.Close()
defer w.connectionNotif.Close()
defer w.protocolEventSub.Close()
defer w.identificationEventSub.Close()
@ -189,15 +199,6 @@ func (w *WakuNode) Stop() {
w.rendezvous.Stop()
}
if w.relay != nil {
for _, topic := range w.relay.Topics() {
for _, sub := range w.subscriptions[topic] {
sub.Unsubscribe()
}
}
w.subscriptions = nil
}
if w.filter != nil {
w.filter.Stop()
for _, filter := range w.filters {
@ -206,13 +207,9 @@ func (w *WakuNode) Stop() {
w.filters = nil
}
if w.lightPush != nil {
w.lightPush.Stop()
}
if w.store != nil {
w.store.Stop()
}
w.relay.Stop()
w.lightPush.Stop()
w.store.Stop()
w.host.Close()
}
@ -238,16 +235,27 @@ func (w *WakuNode) Relay() *relay.WakuRelay {
return w.relay
}
func (w *WakuNode) Store() *store.WakuStore {
return w.store
}
func (w *WakuNode) Filter() *filter.WakuFilter {
return w.filter
}
func (w *WakuNode) mountRelay(shouldRelayMessages bool, opts ...pubsub.Option) error {
var err error
w.relay, err = relay.NewWakuRelay(w.ctx, w.host, opts...)
func (w *WakuNode) Lightpush() *lightpush.WakuLightPush {
return w.lightPush
}
if shouldRelayMessages {
_, err := w.Subscribe(w.ctx, nil)
func (w *WakuNode) mountRelay(opts ...pubsub.Option) error {
var err error
w.relay, err = relay.NewWakuRelay(w.ctx, w.host, w.bcaster, opts...)
if err != nil {
return err
}
if w.opts.enableRelay {
_, err = w.relay.Subscribe(w.ctx, nil)
if err != nil {
return err
}
@ -265,15 +273,11 @@ func (w *WakuNode) mountFilter() error {
}
}
w.filter = filter.NewWakuFilter(w.ctx, w.host, filterHandler)
w.filter = filter.NewWakuFilter(w.ctx, w.host, w.opts.isFilterFullNode, filterHandler)
return nil
}
func (w *WakuNode) mountLightPush() {
w.lightPush = lightpush.NewWakuLightPush(w.ctx, w.host, w.relay)
}
func (w *WakuNode) mountRendezvous() error {
w.rendezvous = rendezvous.NewRendezvousService(w.host, w.opts.rendevousStorage)
@ -286,7 +290,6 @@ func (w *WakuNode) mountRendezvous() error {
}
func (w *WakuNode) startStore() {
w.store = w.opts.store
w.store.Start(w.ctx, w.host)
if w.opts.shouldResume {
@ -310,7 +313,7 @@ func (w *WakuNode) startStore() {
ctxWithTimeout, ctxCancel := context.WithTimeout(w.ctx, 20*time.Second)
defer ctxCancel()
if err := w.Resume(ctxWithTimeout, nil); err != nil {
if _, err := w.store.Resume(ctxWithTimeout, string(relay.DefaultWakuTopic), nil); err != nil {
log.Info("Retrying in 10s...")
time.Sleep(10 * time.Second)
} else {
@ -341,129 +344,6 @@ func (w *WakuNode) AddPeer(address ma.Multiaddr, protocolID p2pproto.ID) (*peer.
return &info.ID, w.addPeer(info, protocolID)
}
func (w *WakuNode) Query(ctx context.Context, contentTopics []string, startTime float64, endTime float64, opts ...store.HistoryRequestOption) (*pb.HistoryResponse, error) {
if w.store == nil {
return nil, errors.New("WakuStore is not set")
}
query := new(pb.HistoryQuery)
for _, ct := range contentTopics {
query.ContentFilters = append(query.ContentFilters, &pb.ContentFilter{ContentTopic: ct})
}
query.StartTime = startTime
query.EndTime = endTime
query.PagingInfo = new(pb.PagingInfo)
result, err := w.store.Query(ctx, query, opts...)
if err != nil {
return nil, err
}
return result, nil
}
func (w *WakuNode) Resume(ctx context.Context, peerList []peer.ID) error {
if w.store == nil {
return errors.New("WakuStore is not set")
}
result, err := w.store.Resume(ctx, string(relay.DefaultWakuTopic), peerList)
if err != nil {
return err
}
log.Info("Retrieved messages since the last online time: ", result)
return nil
}
func (node *WakuNode) Subscribe(ctx context.Context, topic *relay.Topic) (*Subscription, error) {
// Subscribes to a PubSub topic.
// NOTE The data field SHOULD be decoded as a WakuMessage.
if node.relay == nil {
return nil, errors.New("WakuRelay hasn't been set")
}
t := relay.GetTopic(topic)
sub, isNew, err := node.relay.Subscribe(t)
// Subscribe store to topic
if isNew && node.opts.store != nil && node.opts.storeMsgs {
log.Info("Subscribing store to topic ", t)
node.bcaster.Register(node.opts.store.MsgC)
}
// Subscribe filter
if isNew && node.filter != nil {
log.Info("Subscribing filter to topic ", t)
node.bcaster.Register(node.filter.MsgC)
}
if err != nil {
return nil, err
}
// Create client subscription
subscription := new(Subscription)
subscription.closed = false
subscription.C = make(chan *protocol.Envelope, 1024) // To avoid blocking
subscription.quit = make(chan struct{})
node.subscriptionsMutex.Lock()
defer node.subscriptionsMutex.Unlock()
node.subscriptions[t] = append(node.subscriptions[t], subscription)
node.bcaster.Register(subscription.C)
go node.subscribeToTopic(t, subscription, sub)
return subscription, nil
}
func (node *WakuNode) subscribeToTopic(t relay.Topic, subscription *Subscription, sub *pubsub.Subscription) {
nextMsgTicker := time.NewTicker(time.Millisecond * 10)
defer nextMsgTicker.Stop()
ctx, err := tag.New(node.ctx, tag.Insert(metrics.KeyType, "relay"))
if err != nil {
log.Error(err)
return
}
for {
select {
case <-subscription.quit:
subscription.mutex.Lock()
node.bcaster.Unregister(subscription.C) // Remove from broadcast list
close(subscription.C)
subscription.mutex.Unlock()
case <-nextMsgTicker.C:
msg, err := sub.Next(ctx)
if err != nil {
subscription.mutex.Lock()
for _, subscription := range node.subscriptions[t] {
subscription.Unsubscribe()
}
subscription.mutex.Unlock()
return
}
stats.Record(ctx, metrics.Messages.M(1))
wakuMessage := &pb.WakuMessage{}
if err := proto.Unmarshal(msg.Data, wakuMessage); err != nil {
log.Error("could not decode message", err)
return
}
envelope := protocol.NewEnvelope(wakuMessage, string(t))
node.bcaster.Submit(envelope)
}
}
}
// Wrapper around WakuFilter.Subscribe
// that adds a Filter object to node.filters
func (node *WakuNode) SubscribeFilter(ctx context.Context, f filter.ContentFilter) (filterID string, ch chan *protocol.Envelope, err error) {
@ -478,7 +358,7 @@ func (node *WakuNode) SubscribeFilter(ctx context.Context, f filter.ContentFilte
// Registers for messages that match a specific filter. Triggers the handler whenever a message is received.
// ContentFilterChan takes MessagePush structs
subs, err := node.filter.Subscribe(ctx, f)
if subs.RequestID == "" || err != nil {
if err != nil || subs.RequestID == "" {
// Failed to subscribe
log.Error("remote subscription to filter failed", err)
return
@ -571,52 +451,6 @@ func (node *WakuNode) UnsubscribeFilter(ctx context.Context, cf filter.ContentFi
return nil
}
func (node *WakuNode) Publish(ctx context.Context, message *pb.WakuMessage, topic *relay.Topic) ([]byte, error) {
if node.relay == nil {
return nil, errors.New("WakuRelay hasn't been set")
}
if message == nil {
return nil, errors.New("message can't be null")
}
if node.lightPush != nil {
return node.LightPush(ctx, message, topic)
}
hash, err := node.relay.Publish(ctx, message, topic)
if err != nil {
return nil, err
}
return hash, nil
}
func (node *WakuNode) LightPush(ctx context.Context, message *pb.WakuMessage, topic *relay.Topic, opts ...lightpush.LightPushOption) ([]byte, error) {
if node.lightPush == nil {
return nil, errors.New("WakuLightPush hasn't been set")
}
if message == nil {
return nil, errors.New("message can't be null")
}
req := new(pb.PushRequest)
req.Message = message
req.PubsubTopic = string(relay.GetTopic(topic))
response, err := node.lightPush.Request(ctx, req, opts...)
if err != nil {
return nil, err
}
if response.IsSuccess {
hash, _ := message.Hash()
return hash, nil
} else {
return nil, errors.New(response.Info)
}
}
func (w *WakuNode) DialPeerWithMultiAddress(ctx context.Context, address ma.Multiaddr) error {
info, err := peer.AddrInfoFromP2pAddr(address)
if err != nil {
@ -694,6 +528,9 @@ func (w *WakuNode) Peers() PeerStats {
return p
}
// startKeepAlive creates a go routine that periodically pings connected peers.
// This is necessary because TCP connections are automatically closed due to inactivity,
// and doing a ping will avoid this (with a small bandwidth cost)
func (w *WakuNode) startKeepAlive(t time.Duration) {
log.Info("Setting up ping protocol with duration of ", t)
@ -704,21 +541,14 @@ func (w *WakuNode) startKeepAlive(t time.Duration) {
for {
select {
case <-ticker.C:
for _, p := range w.host.Network().Peers() {
log.Debug("Pinging ", p)
go func(peer peer.ID) {
ctx, cancel := context.WithTimeout(w.ctx, 3*time.Second)
defer cancel()
pr := w.ping.Ping(ctx, peer)
select {
case res := <-pr:
if res.Error != nil {
log.Error(fmt.Sprintf("Could not ping %s: %s", peer, res.Error.Error()))
}
case <-ctx.Done():
log.Error(fmt.Sprintf("Could not ping %s: %s", peer, ctx.Err()))
}
}(p)
// Compared to Network's peers collection,
// Peerstore contains all peers ever connected to,
// thus if a host goes down and back again,
// pinging a peer will trigger identification process,
// which is not possible when iterating
// through Network's peer collection, as it will be empty
for _, p := range w.host.Peerstore().Peers() {
go pingPeer(w.ctx, w.ping, p)
}
case <-w.quit:
ticker.Stop()
@ -727,3 +557,19 @@ func (w *WakuNode) startKeepAlive(t time.Duration) {
}
}()
}
func pingPeer(ctx context.Context, pingService *ping.PingService, peer peer.ID) {
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
log.Debug("Pinging ", peer)
pr := pingService.Ping(ctx, peer)
select {
case res := <-pr:
if res.Error != nil {
log.Error(fmt.Sprintf("Could not ping %s: %s", peer, res.Error.Error()))
}
case <-ctx.Done():
log.Error(fmt.Sprintf("Could not ping %s: %s", peer, ctx.Err()))
}
}

View File

@ -28,14 +28,15 @@ type WakuNodeParameters struct {
privKey *crypto.PrivKey
libP2POpts []libp2p.Option
enableRelay bool
enableFilter bool
wOpts []pubsub.Option
enableRelay bool
enableFilter bool
isFilterFullNode bool
wOpts []pubsub.Option
enableStore bool
shouldResume bool
storeMsgs bool
store *store.WakuStore
enableStore bool
shouldResume bool
storeMsgs bool
messageProvider store.MessageProvider
enableRendezvous bool
enableRendezvousServer bool
@ -151,9 +152,10 @@ func WithRendezvousServer(storage rendezvous.Storage) WakuNodeOption {
// WithWakuFilter enables the Waku V2 Filter protocol. This WakuNodeOption
// accepts a list of WakuFilter gossipsub options to setup the protocol
func WithWakuFilter(opts ...pubsub.Option) WakuNodeOption {
func WithWakuFilter(fullNode bool) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.enableFilter = true
params.isFilterFullNode = fullNode
return nil
}
}
@ -164,7 +166,6 @@ func WithWakuStore(shouldStoreMessages bool, shouldResume bool) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.enableStore = true
params.storeMsgs = shouldStoreMessages
params.store = store.NewWakuStore(shouldStoreMessages, nil)
params.shouldResume = shouldResume
return nil
}
@ -174,11 +175,7 @@ func WithWakuStore(shouldStoreMessages bool, shouldResume bool) WakuNodeOption {
// used to store and retrieve persisted messages
func WithMessageProvider(s store.MessageProvider) WakuNodeOption {
return func(params *WakuNodeParameters) error {
if params.store != nil {
params.store.SetMsgProvider(s)
} else {
params.store = store.NewWakuStore(true, s)
}
params.messageProvider = s
return nil
}
}

View File

@ -66,6 +66,7 @@ type (
ctx context.Context
h host.Host
subscribers []Subscriber
isFullNode bool
pushHandler MessagePushHandler
MsgC chan *protocol.Envelope
}
@ -145,7 +146,7 @@ func (wf *WakuFilter) onRequest(s network.Stream) {
log.Info("filter light node, received a message push. ", len(filterRPCRequest.Push.Messages), " messages")
stats.Record(wf.ctx, metrics.Messages.M(int64(len(filterRPCRequest.Push.Messages))))
} else if filterRPCRequest.Request != nil {
} else if filterRPCRequest.Request != nil && wf.isFullNode {
// We're on a full node.
// This is a filter request coming from a light node.
if filterRPCRequest.Request.Subscribe {
@ -197,10 +198,13 @@ func (wf *WakuFilter) onRequest(s network.Stream) {
stats.Record(wf.ctx, metrics.FilterSubscriptions.M(int64(len(wf.subscribers))))
}
} else {
log.Error("can't serve request")
return
}
}
func NewWakuFilter(ctx context.Context, host host.Host, handler MessagePushHandler) *WakuFilter {
func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool, handler MessagePushHandler) *WakuFilter {
ctx, err := tag.New(ctx, tag.Insert(metrics.KeyType, "filter"))
if err != nil {
log.Error(err)
@ -211,15 +215,21 @@ func NewWakuFilter(ctx context.Context, host host.Host, handler MessagePushHandl
wf.MsgC = make(chan *protocol.Envelope)
wf.h = host
wf.pushHandler = handler
wf.isFullNode = isFullNode
wf.h.SetStreamHandlerMatch(FilterID_v20beta1, protocol.PrefixTextMatch(string(FilterID_v20beta1)), wf.onRequest)
go wf.FilterListener()
if wf.isFullNode {
log.Info("Filter protocol started")
} else {
log.Info("Filter protocol started (only client mode)")
}
return wf
}
func (wf *WakuFilter) FilterListener() {
// This function is invoked for each message received
// on the full node in context of Waku2-Filter
handle := func(envelope *protocol.Envelope) error { // async

View File

@ -12,6 +12,7 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
libp2pProtocol "github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-msgio/protoio"
"github.com/status-im/go-waku/waku/v2/metrics"
"github.com/status-im/go-waku/waku/v2/protocol"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/status-im/go-waku/waku/v2/protocol/relay"
@ -39,10 +40,18 @@ func NewWakuLightPush(ctx context.Context, h host.Host, relay *relay.WakuRelay)
wakuLP.ctx = ctx
wakuLP.h = h
return wakuLP
}
func (wakuLP *WakuLightPush) Start() error {
if wakuLP.relay == nil {
return errors.New("relay is required")
}
wakuLP.h.SetStreamHandlerMatch(LightPushID_v20beta1, protocol.PrefixTextMatch(string(LightPushID_v20beta1)), wakuLP.onRequest)
log.Info("Light Push protocol started")
return wakuLP
return nil
}
func (wakuLP *WakuLightPush) onRequest(s network.Stream) {
@ -56,6 +65,7 @@ func (wakuLP *WakuLightPush) onRequest(s network.Stream) {
err := reader.ReadMsg(requestPushRPC)
if err != nil {
log.Error("error reading request", err)
metrics.RecordLightpushError(wakuLP.ctx, "decodeRpcFailure")
return
}
@ -68,7 +78,9 @@ func (wakuLP *WakuLightPush) onRequest(s network.Stream) {
response := new(pb.PushResponse)
if wakuLP.relay != nil {
// XXX Assumes success, should probably be extended to check for network, peers, etc
// TODO: Assumes success, should probably be extended to check for network, peers, etc
// It might make sense to use WithReadiness option here?
_, err := wakuLP.relay.Publish(wakuLP.ctx, message, &pubSubTopic)
if err != nil {
@ -151,7 +163,7 @@ func DefaultOptions() []LightPushOption {
}
}
func (wakuLP *WakuLightPush) Request(ctx context.Context, req *pb.PushRequest, opts ...LightPushOption) (*pb.PushResponse, error) {
func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, opts ...LightPushOption) (*pb.PushResponse, error) {
params := new(LightPushParameters)
params.lp = wakuLP
@ -162,6 +174,7 @@ func (wakuLP *WakuLightPush) Request(ctx context.Context, req *pb.PushRequest, o
}
if params.selectedPeer == "" {
metrics.RecordLightpushError(wakuLP.ctx, "dialError")
return nil, ErrNoPeersAvailable
}
@ -172,6 +185,7 @@ func (wakuLP *WakuLightPush) Request(ctx context.Context, req *pb.PushRequest, o
connOpt, err := wakuLP.h.NewStream(ctx, params.selectedPeer, LightPushID_v20beta1)
if err != nil {
log.Info("failed to connect to remote peer", err)
metrics.RecordLightpushError(wakuLP.ctx, "dialError")
return nil, err
}
@ -179,6 +193,7 @@ func (wakuLP *WakuLightPush) Request(ctx context.Context, req *pb.PushRequest, o
defer func() {
err := connOpt.Reset()
if err != nil {
metrics.RecordLightpushError(wakuLP.ctx, "dialError")
log.Error("failed to reset connection", err)
}
}()
@ -198,6 +213,7 @@ func (wakuLP *WakuLightPush) Request(ctx context.Context, req *pb.PushRequest, o
err = reader.ReadMsg(pushResponseRPC)
if err != nil {
log.Error("could not read response", err)
metrics.RecordLightpushError(wakuLP.ctx, "decodeRPCFailure")
return nil, err
}
@ -207,3 +223,25 @@ func (wakuLP *WakuLightPush) Request(ctx context.Context, req *pb.PushRequest, o
func (w *WakuLightPush) Stop() {
w.h.RemoveStreamHandler(LightPushID_v20beta1)
}
func (w *WakuLightPush) Publish(ctx context.Context, message *pb.WakuMessage, topic *relay.Topic, opts ...LightPushOption) ([]byte, error) {
if message == nil {
return nil, errors.New("message can't be null")
}
req := new(pb.PushRequest)
req.Message = message
req.PubsubTopic = string(relay.GetTopic(topic))
response, err := w.request(ctx, req, opts...)
if err != nil {
return nil, err
}
if response.IsSuccess {
hash, _ := message.Hash()
return hash, nil
} else {
return nil, errors.New(response.Info)
}
}

View File

@ -1,4 +1,4 @@
package node
package relay
import (
"sync"
@ -12,23 +12,21 @@ type Subscription struct {
C chan *protocol.Envelope
closed bool
mutex sync.Mutex
once sync.Once
quit chan struct{}
}
// Unsubscribe will close a subscription from a pubsub topic. Will close the message channel
func (subs *Subscription) Unsubscribe() {
subs.mutex.Lock()
defer subs.mutex.Unlock()
if !subs.closed {
close(subs.quit)
subs.once.Do(func() {
subs.closed = true
}
close(subs.quit)
close(subs.C)
})
}
// IsClosed determine whether a Subscription is still open for receiving messages
func (subs *Subscription) IsClosed() bool {
subs.mutex.Lock()
defer subs.mutex.Unlock()
return subs.closed
}

View File

@ -4,17 +4,22 @@ import (
"context"
"crypto/sha256"
"errors"
"fmt"
"sync"
proto "github.com/golang/protobuf/proto"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/protocol"
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
pubsub "github.com/libp2p/go-libp2p-pubsub"
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
v2 "github.com/status-im/go-waku/waku/v2"
"github.com/status-im/go-waku/waku/v2/metrics"
waku_proto "github.com/status-im/go-waku/waku/v2/protocol"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
)
var log = logging.Logger("wakurelay")
@ -32,6 +37,11 @@ type WakuRelay struct {
topicsMutex sync.Mutex
wakuRelayTopics map[Topic]*pubsub.Topic
relaySubs map[Topic]*pubsub.Subscription
bcaster v2.Broadcaster
subscriptions map[Topic][]*Subscription
subscriptionsMutex sync.Mutex
}
// Once https://github.com/status-im/nim-waku/issues/420 is fixed, implement a custom messageIdFn
@ -40,12 +50,14 @@ func msgIdFn(pmsg *pubsub_pb.Message) string {
return string(hash[:])
}
func NewWakuRelay(ctx context.Context, h host.Host, opts ...pubsub.Option) (*WakuRelay, error) {
func NewWakuRelay(ctx context.Context, h host.Host, bcaster v2.Broadcaster, opts ...pubsub.Option) (*WakuRelay, error) {
w := new(WakuRelay)
w.host = h
w.topics = make(map[Topic]bool)
w.wakuRelayTopics = make(map[Topic]*pubsub.Topic)
w.relaySubs = make(map[Topic]*pubsub.Subscription)
w.subscriptions = make(map[Topic][]*Subscription)
w.bcaster = bcaster
// default options required by WakuRelay
opts = append(opts, pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign))
@ -113,31 +125,28 @@ func (w *WakuRelay) upsertTopic(topic Topic) (*pubsub.Topic, error) {
return pubSubTopic, nil
}
func (w *WakuRelay) Subscribe(topic Topic) (subs *pubsub.Subscription, isNew bool, err error) {
func (w *WakuRelay) subscribe(topic Topic) (subs *pubsub.Subscription, err error) {
sub, ok := w.relaySubs[topic]
if !ok {
pubSubTopic, err := w.upsertTopic(topic)
if err != nil {
return nil, false, err
return nil, err
}
sub, err = pubSubTopic.Subscribe()
if err != nil {
return nil, false, err
return nil, err
}
w.relaySubs[topic] = sub
log.Info("Subscribing to topic ", topic)
}
isNew = !ok // ok will be true if subscription already exists
return sub, isNew, nil
return sub, nil
}
func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, topic *Topic) ([]byte, error) {
// Publish a `WakuMessage` to a PubSub topic.
if w.pubsub == nil {
return nil, errors.New("PubSub hasn't been set")
}
@ -158,7 +167,6 @@ func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, topic
}
err = pubSubTopic.Publish(ctx, out)
if err != nil {
return nil, err
}
@ -178,4 +186,96 @@ func GetTopic(topic *Topic) Topic {
func (w *WakuRelay) Stop() {
w.host.RemoveStreamHandler(WakuRelayID_v200)
w.subscriptionsMutex.Lock()
defer w.subscriptionsMutex.Unlock()
for _, topic := range w.Topics() {
for _, sub := range w.subscriptions[topic] {
sub.Unsubscribe()
}
}
w.subscriptions = nil
}
func (w *WakuRelay) Subscribe(ctx context.Context, topic *Topic) (*Subscription, error) {
// Subscribes to a PubSub topic.
// NOTE The data field SHOULD be decoded as a WakuMessage.
t := GetTopic(topic)
sub, err := w.subscribe(t)
if err != nil {
return nil, err
}
// Create client subscription
subscription := new(Subscription)
subscription.closed = false
subscription.C = make(chan *waku_proto.Envelope, 1024) // To avoid blocking
subscription.quit = make(chan struct{})
w.subscriptionsMutex.Lock()
defer w.subscriptionsMutex.Unlock()
w.subscriptions[t] = append(w.subscriptions[t], subscription)
if w.bcaster != nil {
w.bcaster.Register(subscription.C)
}
go w.subscribeToTopic(t, subscription, sub)
return subscription, nil
}
func (w *WakuRelay) nextMessage(ctx context.Context, sub *pubsub.Subscription) <-chan *pubsub.Message {
msgChannel := make(chan *pubsub.Message, 1024)
go func(msgChannel chan *pubsub.Message) {
for {
msg, err := sub.Next(ctx)
if err != nil {
log.Error(fmt.Errorf("subscription failed: %w", err))
sub.Cancel()
close(msgChannel)
for _, subscription := range w.subscriptions[Topic(sub.Topic())] {
subscription.Unsubscribe()
}
}
msgChannel <- msg
}
}(msgChannel)
return msgChannel
}
func (w *WakuRelay) subscribeToTopic(t Topic, subscription *Subscription, sub *pubsub.Subscription) {
ctx, err := tag.New(context.Background(), tag.Insert(metrics.KeyType, "relay"))
if err != nil {
log.Error(err)
return
}
subChannel := w.nextMessage(ctx, sub)
for {
select {
case <-subscription.quit:
if w.bcaster != nil {
w.bcaster.Unregister(subscription.C) // Remove from broadcast list
}
// TODO: if there are no more relay subscriptions, close the pubsub subscription
case msg := <-subChannel:
stats.Record(ctx, metrics.Messages.M(1))
wakuMessage := &pb.WakuMessage{}
if err := proto.Unmarshal(msg.Data, wakuMessage); err != nil {
log.Error("could not decode message", err)
return
}
envelope := waku_proto.NewEnvelope(wakuMessage, string(t))
if w.bcaster != nil {
w.bcaster.Submit(envelope)
}
}
}
}

View File

@ -17,9 +17,8 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
libp2pProtocol "github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-msgio/protoio"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"github.com/status-im/go-waku/waku/persistence"
"github.com/status-im/go-waku/waku/v2/metrics"
"github.com/status-im/go-waku/waku/v2/protocol"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
@ -51,6 +50,10 @@ func minOf(vars ...int) int {
}
func paginateWithIndex(list []IndexedWakuMessage, pinfo *pb.PagingInfo) (resMessages []IndexedWakuMessage, resPagingInfo *pb.PagingInfo) {
if pinfo == nil {
pinfo = new(pb.PagingInfo)
}
// takes list, and performs paging based on pinfo
// returns the page i.e, a sequence of IndexedWakuMessage and the new paging info to be used for the next paging request
cursor := pinfo.Cursor
@ -179,19 +182,39 @@ func (w *WakuStore) FindMessages(query *pb.HistoryQuery) *pb.HistoryResponse {
return result
}
type StoredMessage struct {
ID []byte
PubsubTopic string
ReceiverTime float64
Message *pb.WakuMessage
}
type MessageProvider interface {
GetAll() ([]StoredMessage, error)
GetAll() ([]persistence.StoredMessage, error)
Put(cursor *pb.Index, pubsubTopic string, message *pb.WakuMessage) error
Stop()
}
type Query struct {
Topic string
ContentTopics []string
StartTime float64
EndTime float64
}
type Result struct {
Messages []*pb.WakuMessage
query *pb.HistoryQuery
cursor *pb.Index
peerId peer.ID
}
func (r *Result) Cursor() *pb.Index {
return r.cursor
}
func (r *Result) PeerID() peer.ID {
return r.peerId
}
func (r *Result) Query() *pb.HistoryQuery {
return r.query
}
type IndexedWakuMessage struct {
msg *pb.WakuMessage
index *pb.Index
@ -204,18 +227,17 @@ type WakuStore struct {
messages []IndexedWakuMessage
seen map[[32]byte]struct{}
started bool
messagesMutex sync.Mutex
storeMsgs bool
msgProvider MessageProvider
h host.Host
}
func NewWakuStore(shouldStoreMessages bool, p MessageProvider) *WakuStore {
func NewWakuStore(p MessageProvider) *WakuStore {
wakuStore := new(WakuStore)
wakuStore.MsgC = make(chan *protocol.Envelope)
wakuStore.msgProvider = p
wakuStore.storeMsgs = shouldStoreMessages
wakuStore.seen = make(map[[32]byte]struct{})
return wakuStore
@ -226,14 +248,15 @@ func (store *WakuStore) SetMsgProvider(p MessageProvider) {
}
func (store *WakuStore) Start(ctx context.Context, h host.Host) {
store.h = h
store.ctx = ctx
if !store.storeMsgs {
log.Info("Store protocol started (messages aren't stored)")
if store.started {
return
}
store.started = true
store.h = h
store.ctx = ctx
store.MsgC = make(chan *protocol.Envelope)
store.h.SetStreamHandlerMatch(StoreID_v20beta3, protocol.PrefixTextMatch(string(StoreID_v20beta3)), store.onRequest)
go store.storeIncomingMessages(ctx)
@ -243,13 +266,20 @@ func (store *WakuStore) Start(ctx context.Context, h host.Host) {
return
}
storedMessages, err := store.msgProvider.GetAll()
store.fetchDBRecords(ctx)
log.Info("Store protocol started")
}
func (store *WakuStore) fetchDBRecords(ctx context.Context) {
if store.msgProvider == nil {
return
}
storedMessages, err := (store.msgProvider).GetAll()
if err != nil {
log.Error("could not load DBProvider messages", err)
err := stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(metrics.KeyStoreErrorType, "store_load_failure")}, metrics.Errors.M(1))
if err != nil {
log.Error("failed to record with tags")
}
metrics.RecordStoreError(ctx, "store_load_failure")
return
}
@ -261,12 +291,8 @@ func (store *WakuStore) Start(ctx context.Context, h host.Host) {
store.storeMessageWithIndex(storedMessage.PubsubTopic, idx, storedMessage.Message)
if err := stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(metrics.KeyType, "stored")}, metrics.StoreMessages.M(int64(len(store.messages)))); err != nil {
log.Error("failed to record with tags")
}
metrics.RecordMessage(ctx, "stored", len(store.messages))
}
log.Info("Store protocol started")
}
func (store *WakuStore) storeMessageWithIndex(pubsubTopic string, idx *pb.Index, msg *pb.WakuMessage) {
@ -294,6 +320,7 @@ func (store *WakuStore) storeMessage(pubSubTopic string, msg *pb.WakuMessage) {
store.storeMessageWithIndex(pubSubTopic, index, msg)
if store.msgProvider == nil {
metrics.RecordMessage(store.ctx, "stored", len(store.messages))
return
}
@ -301,16 +328,11 @@ func (store *WakuStore) storeMessage(pubSubTopic string, msg *pb.WakuMessage) {
if err != nil {
log.Error("could not store message", err)
if err := stats.RecordWithTags(store.ctx, []tag.Mutator{tag.Insert(metrics.KeyStoreErrorType, "store_failure")}, metrics.Errors.M(1)); err != nil {
log.Error("failed to record with tags", err)
}
metrics.RecordStoreError(store.ctx, "store_failure")
return
}
if err := stats.RecordWithTags(store.ctx, []tag.Mutator{tag.Insert(metrics.KeyType, "stored")}, metrics.StoreMessages.M(int64(len(store.messages)))); err != nil {
log.Error("failed to record with tags", err)
}
metrics.RecordMessage(store.ctx, "stored", len(store.messages))
}
func (store *WakuStore) storeIncomingMessages(ctx context.Context) {
@ -330,9 +352,7 @@ func (store *WakuStore) onRequest(s network.Stream) {
err := reader.ReadMsg(historyRPCRequest)
if err != nil {
log.Error("error reading request", err)
if err := stats.RecordWithTags(store.ctx, []tag.Mutator{tag.Insert(metrics.KeyStoreErrorType, "decodeRPCFailure")}, metrics.Errors.M(1)); err != nil {
log.Error("failed to record with tags", err)
}
metrics.RecordStoreError(store.ctx, "decodeRPCFailure")
return
}
@ -495,20 +515,28 @@ func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selec
err = reader.ReadMsg(historyResponseRPC)
if err != nil {
log.Error("could not read response", err)
if err := stats.RecordWithTags(store.ctx, []tag.Mutator{tag.Insert(metrics.KeyStoreErrorType, "decodeRPCFailure")}, metrics.Errors.M(1)); err != nil {
log.Error("failed to record with tags")
}
metrics.RecordStoreError(store.ctx, "decodeRPCFailure")
return nil, err
}
if err := stats.RecordWithTags(store.ctx, []tag.Mutator{tag.Insert(metrics.KeyType, "retrieved")}, metrics.StoreMessages.M(int64(len(store.messages)))); err != nil {
log.Error("failed to record with tags", err)
}
metrics.RecordMessage(ctx, "retrieved", len(store.messages))
return historyResponseRPC.Response, nil
}
func (store *WakuStore) Query(ctx context.Context, q *pb.HistoryQuery, opts ...HistoryRequestOption) (*pb.HistoryResponse, error) {
func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryRequestOption) (*Result, error) {
q := &pb.HistoryQuery{
PubsubTopic: query.Topic,
ContentFilters: []*pb.ContentFilter{},
StartTime: query.StartTime,
EndTime: query.EndTime,
PagingInfo: &pb.PagingInfo{},
}
for _, cf := range query.ContentTopics {
q.ContentFilters = append(q.ContentFilters, &pb.ContentFilter{ContentTopic: cf})
}
params := new(HistoryRequestParameters)
params.s = store
@ -538,7 +566,55 @@ func (store *WakuStore) Query(ctx context.Context, q *pb.HistoryQuery, opts ...H
q.PagingInfo.PageSize = params.pageSize
return store.queryFrom(ctx, q, params.selectedPeer, params.requestId)
response, err := store.queryFrom(ctx, q, params.selectedPeer, params.requestId)
if err != nil {
return nil, err
}
if response.Error == pb.HistoryResponse_INVALID_CURSOR {
return nil, errors.New("invalid cursor")
}
return &Result{
Messages: response.Messages,
cursor: response.PagingInfo.Cursor,
query: q,
peerId: params.selectedPeer,
}, nil
}
func (store *WakuStore) Next(ctx context.Context, r *Result) (*Result, error) {
q := &pb.HistoryQuery{
PubsubTopic: r.query.PubsubTopic,
ContentFilters: r.query.ContentFilters,
StartTime: r.query.StartTime,
EndTime: r.query.EndTime,
PagingInfo: &pb.PagingInfo{
PageSize: r.query.PagingInfo.PageSize,
Direction: r.query.PagingInfo.Direction,
Cursor: &pb.Index{
Digest: r.cursor.Digest,
ReceiverTime: r.cursor.ReceiverTime,
SenderTime: r.cursor.SenderTime,
},
},
}
response, err := store.queryFrom(ctx, q, r.peerId, protocol.GenerateRequestId())
if err != nil {
return nil, err
}
if response.Error == pb.HistoryResponse_INVALID_CURSOR {
return nil, errors.New("invalid cursor")
}
return &Result{
Messages: response.Messages,
cursor: response.PagingInfo.Cursor,
query: q,
peerId: r.peerId,
}, nil
}
func (store *WakuStore) queryLoop(ctx context.Context, query *pb.HistoryQuery, candidateList []peer.ID) (*pb.HistoryResponse, error) {
@ -546,9 +622,10 @@ func (store *WakuStore) queryLoop(ctx context.Context, query *pb.HistoryQuery, c
// returns the number of retrieved messages, or error if all the requests fail
for _, peer := range candidateList {
result, err := store.queryFrom(ctx, query, peer, protocol.GenerateRequestId())
if err != nil {
if err == nil {
return result, nil
}
log.Error(fmt.Errorf("resume history with peer %s failed: %w", peer, err))
}
return nil, ErrFailedQuery
@ -574,6 +651,10 @@ func (store *WakuStore) findLastSeen() float64 {
// the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string
func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList []peer.ID) (int, error) {
if !store.started {
return 0, errors.New("can't resume: store has not started")
}
currentTime := utils.GetUnixEpoch()
lastSeenTime := store.findLastSeen()
@ -617,11 +698,21 @@ func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList
store.storeMessage(pubsubTopic, msg)
}
log.Info("Retrieved messages since the last online time: ", len(response.Messages))
return len(response.Messages), nil
}
// TODO: queryWithAccounting
func (w *WakuStore) Stop() {
w.h.RemoveStreamHandler(StoreID_v20beta3)
w.started = false
if w.MsgC != nil {
close(w.MsgC)
}
if w.h != nil {
w.h.RemoveStreamHandler(StoreID_v20beta3)
}
}

View File

@ -2,6 +2,7 @@ package utils
import (
"errors"
"math/rand"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/host"
@ -10,8 +11,7 @@ import (
var log = logging.Logger("utils")
// SelectPeer is used to return a peer that supports a given protocol.
// It currently selects the first peer returned by the peerstore
// SelectPeer is used to return a random peer that supports a given protocol.
func SelectPeer(host host.Host, protocolId string) (*peer.ID, error) {
// @TODO We need to be more strategic about which peers we dial. Right now we just set one on the service.
// Ideally depending on the query and our set of peers we take a subset of ideal peers.
@ -19,9 +19,6 @@ func SelectPeer(host host.Host, protocolId string) (*peer.ID, error) {
// - which topics they track
// - latency?
// - default store peer?
// Selects the best peer for a given protocol
var peers peer.IDSlice
for _, peer := range host.Peerstore().Peers() {
protocols, err := host.Peerstore().SupportsProtocols(peer, protocolId)
@ -36,8 +33,8 @@ func SelectPeer(host host.Host, protocolId string) (*peer.ID, error) {
}
if len(peers) >= 1 {
// TODO: proper heuristic here that compares peer scores and selects "best" one. For now the first peer for the given protocol is returned
return &peers[0], nil
// TODO: proper heuristic here that compares peer scores and selects "best" one. For now a random peer for the given protocol is returned
return &peers[rand.Intn(len(peers))], nil // nolint: gosec
}
return nil, errors.New("no suitable peers found")

View File

@ -2,6 +2,10 @@ package utils
import "time"
func GetUnixEpoch() float64 {
return float64(time.Now().UnixNano()) / float64(time.Second)
func GetUnixEpochFrom(now func() time.Time) float64 {
return float64(now().UnixNano()) / float64(time.Second)
}
func GetUnixEpoch() float64 {
return GetUnixEpochFrom(time.Now)
}

4
vendor/modules.txt vendored
View File

@ -446,7 +446,9 @@ github.com/spacemonkeygo/spacelog
github.com/status-im/doubleratchet
# github.com/status-im/go-multiaddr-ethv4 v1.2.1
github.com/status-im/go-multiaddr-ethv4
# github.com/status-im/go-waku v0.0.0-20211018124348-4227a9d69d19
# github.com/status-im/go-waku v0.0.0-20211101173358-268767262b60
github.com/status-im/go-waku/waku/persistence
github.com/status-im/go-waku/waku/v2
github.com/status-im/go-waku/waku/v2/metrics
github.com/status-im/go-waku/waku/v2/node
github.com/status-im/go-waku/waku/v2/protocol

View File

@ -45,6 +45,6 @@ var DefaultConfig = Config{
MaxMessageSize: common.DefaultMaxMessageSize,
Host: "0.0.0.0",
Port: 60000,
KeepAliveInterval: 1, // second
KeepAliveInterval: 10, // second
DiscoveryLimit: 40,
}

View File

@ -217,8 +217,7 @@ func New(nodeKey string, cfg *Config, logger *zap.Logger, appdb *sql.DB) (*Waku,
}
if cfg.LightClient {
opts = append(opts, node.WithLightPush())
opts = append(opts, node.WithWakuFilter())
opts = append(opts, node.WithWakuFilter(false))
} else {
relayOpts := []pubsub.Option{
pubsub.WithMaxMessageSize(int(waku.settings.MaxMsgSize)),
@ -318,7 +317,7 @@ func (w *Waku) runRelayMsgLoop() {
return
}
sub, err := w.node.Subscribe(context.Background(), nil)
sub, err := w.node.Relay().Subscribe(context.Background(), nil)
if err != nil {
fmt.Println("Could not subscribe:", err)
return
@ -714,7 +713,36 @@ func (w *Waku) UnsubscribeMany(ids []string) error {
// Send injects a message into the waku send queue, to be distributed in the
// network in the coming cycles.
func (w *Waku) Send(msg *pb.WakuMessage) ([]byte, error) {
return w.node.Publish(context.Background(), msg, nil)
var err error
var hash []byte
if w.settings.LightClient {
hash, err = w.node.Lightpush().Publish(context.Background(), msg, nil)
} else {
hash, err = w.node.Relay().Publish(context.Background(), msg, nil)
}
if err != nil {
return nil, err
}
w.poolMu.Lock()
_, alreadyCached := w.envelopes[gethcommon.BytesToHash(hash)]
w.poolMu.Unlock()
if !alreadyCached {
envelope := wakuprotocol.NewEnvelope(msg, string(relay.GetTopic(nil)))
recvMessage := common.NewReceivedMessage(envelope)
w.addEnvelope(recvMessage)
}
event := common.EnvelopeEvent{
Event: common.EventEnvelopeSent,
Hash: gethcommon.BytesToHash(hash),
}
w.SendEnvelopeEvent(event)
return hash, nil
}
func (w *Waku) Query(topics []common.TopicType, from uint64, to uint64, opts []store.HistoryRequestOption) (cursor *pb.Index, err error) {
@ -723,7 +751,17 @@ func (w *Waku) Query(topics []common.TopicType, from uint64, to uint64, opts []s
strTopics[i] = t.ContentTopic()
}
result, err := w.node.Query(context.Background(), strTopics, float64(from), float64(to), opts...)
query := store.Query{
StartTime: float64(from),
EndTime: float64(to),
ContentTopics: strTopics,
Topic: string(relay.DefaultWakuTopic),
}
result, err := w.node.Store().Query(context.Background(), query, opts...)
if err != nil {
return
}
for _, msg := range result.Messages {
envelope := wakuprotocol.NewEnvelope(msg, string(relay.DefaultWakuTopic))
@ -734,7 +772,7 @@ func (w *Waku) Query(topics []common.TopicType, from uint64, to uint64, opts []s
}
if len(result.Messages) != 0 {
cursor = result.PagingInfo.Cursor
cursor = result.Cursor()
}
return