chore: bump go-waku

This commit is contained in:
Richard Ramos 2022-12-09 14:06:36 -04:00 committed by Andrea Maria Piana
parent dd8be1d8b0
commit 38a4bbf235
31 changed files with 1354 additions and 891 deletions

6
go.mod
View File

@ -16,7 +16,7 @@ replace github.com/flynn/noise v1.0.0 => github.com/status-im/noise v1.0.1-hands
require (
github.com/anacrolix/torrent v1.41.0
github.com/beevik/ntp v0.2.0
github.com/beevik/ntp v0.3.0
github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce
github.com/cenkalti/backoff/v3 v3.2.2
github.com/davecgh/go-spew v1.1.1
@ -80,7 +80,7 @@ require github.com/fogleman/gg v1.3.0
require (
github.com/gorilla/sessions v1.2.1
github.com/meirf/gopart v0.0.0-20180520194036-37e9492a85a8
github.com/waku-org/go-waku v0.2.3-0.20221205192014-05e33105c43f
github.com/waku-org/go-waku v0.2.3-test.0.20221209175307-685142e7b743
)
require (
@ -242,7 +242,7 @@ require (
github.com/tklauser/numcpus v0.2.2 // indirect
github.com/tyler-smith/go-bip39 v1.1.0 // indirect
github.com/urfave/cli/v2 v2.20.2 // indirect
github.com/waku-org/go-discover v0.0.0-20221027130446-2f43d5f6c73f // indirect
github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98 // indirect
github.com/waku-org/go-zerokit-rln v0.1.7-wakuorg // indirect
github.com/waku-org/noise v1.0.2 // indirect
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee // indirect

12
go.sum
View File

@ -382,8 +382,8 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.7.2/go.mod h1:8EzeIqfWt2wWT4rJVu3f21
github.com/aws/smithy-go v1.1.0/go.mod h1:EzMw8dbp/YJL4A5/sbhGddag+NPT7q084agLbB9LgIw=
github.com/aws/smithy-go v1.7.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
github.com/aws/smithy-go v1.8.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
github.com/beevik/ntp v0.2.0 h1:sGsd+kAXzT0bfVfzJfce04g+dSRfrs+tbQW8lweuYgw=
github.com/beevik/ntp v0.2.0/go.mod h1:hIHWr+l3+/clUnF44zdK+CWW7fO8dR5cIylAQ76NRpg=
github.com/beevik/ntp v0.3.0 h1:xzVrPrE4ziasFXgBVBZJDP0Wg/KpMwk2KHJ4Ba8GrDw=
github.com/beevik/ntp v0.3.0/go.mod h1:hIHWr+l3+/clUnF44zdK+CWW7fO8dR5cIylAQ76NRpg=
github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
@ -2063,10 +2063,10 @@ github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc/go.mod h1:ZjcWmF
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU=
github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0=
github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0=
github.com/waku-org/go-discover v0.0.0-20221027130446-2f43d5f6c73f h1:YHIrSqs8Aot1exhwx0+uwdshCp3RfZu5OY6Hvt3Hk8g=
github.com/waku-org/go-discover v0.0.0-20221027130446-2f43d5f6c73f/go.mod h1:eBHgM6T4EG0RZzxpxKy+rGz/6Dw2Nd8DWxS0lm9ESDw=
github.com/waku-org/go-waku v0.2.3-0.20221205192014-05e33105c43f h1:MGWZyvAizQs9BoNDh/GUbzJykBJKETXRvwbd8/VwglA=
github.com/waku-org/go-waku v0.2.3-0.20221205192014-05e33105c43f/go.mod h1:tJGCjHrNc8JjOX0df15Uv4YiLQMNhWrotKHJeeVl3AE=
github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98 h1:xwY0kW5XZFimdqfZb9cZwT1S3VJP9j3AE6bdNd9boXM=
github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98/go.mod h1:eBHgM6T4EG0RZzxpxKy+rGz/6Dw2Nd8DWxS0lm9ESDw=
github.com/waku-org/go-waku v0.2.3-test.0.20221209175307-685142e7b743 h1:Q6bNLLCE7+OGrRlsmcrglKeURXmaagemIWTrvrJTgK4=
github.com/waku-org/go-waku v0.2.3-test.0.20221209175307-685142e7b743/go.mod h1:MzmxeUFKOSGqI+3ditwJVmiDXtWW7p4vZhmFeAcwKyI=
github.com/waku-org/go-zerokit-rln v0.1.7-wakuorg h1:2vVIBCtBih2w1K9ll8YnToTDZvbxcgbsClsPlJS/kkg=
github.com/waku-org/go-zerokit-rln v0.1.7-wakuorg/go.mod h1:GlyaVeEWNEBxVJrWC6jFTvb4LNb9d9qnjdS6EiWVUvk=
github.com/waku-org/noise v1.0.2 h1:7WmlhpJ0eliBzwzKz6SoTqQznaEU2IuebHF3oCekqqs=

View File

@ -2,9 +2,8 @@ language: go
sudo: false
go:
- 1.7.x
- 1.8.x
- 1.9.x
- 1.12.x
- tip
matrix:

View File

@ -21,22 +21,23 @@ time, err := ntp.Time("0.beevik-ntp.pool.ntp.org")
## Querying time metadata
To obtain the current time as well as some additional metadata about the time,
use the `Query` function:
use the [`Query`](https://godoc.org/github.com/beevik/ntp#Query) function:
```go
response, err := ntp.Query("0.beevik-ntp.pool.ntp.org")
time := time.Now().Add(response.ClockOffset)
```
Alternatively, use the `QueryWithOptions` function if you want to change the
default behavior used by the `Query` function:
Alternatively, use the [`QueryWithOptions`](https://godoc.org/github.com/beevik/ntp#QueryWithOptions)
function if you want to change the default behavior used by the `Query`
function:
```go
options := ntp.QueryOptions{ Timeout: 30*time.Second, TTL: 5 }
response, err := ntp.QueryWithOptions("0.beevik-ntp.pool.ntp.org", options)
time := time.Now().Add(response.ClockOffset)
```
The `Response` structure returned by `Query` includes the following
information:
The [`Response`](https://godoc.org/github.com/beevik/ntp#Response) structure
returned by `Query` includes the following information:
* `Time`: The time the server transmitted its response, according to its own clock.
* `ClockOffset`: The estimated offset of the local system clock relative to the server's clock. For a more accurate time reading, you may add this offset to any subsequent system clock reading.
* `RTT`: An estimate of the round-trip-time delay between the client and the server.
@ -52,9 +53,9 @@ information:
* `KissCode`: A 4-character string describing the reason for a "kiss of death" response (stratum=0).
* `Poll`: The maximum polling interval between successive messages to the server.
The `Response` structure's `Validate` method performs additional sanity checks
to determine whether the response is suitable for time synchronization
purposes.
The `Response` structure's [`Validate`](https://godoc.org/github.com/beevik/ntp#Response.Validate)
method performs additional sanity checks to determine whether the response is
suitable for time synchronization purposes.
```go
err := response.Validate()
if err == nil {

View File

@ -1,3 +1,13 @@
Release v0.3.0
==============
There have been no breaking changes or further deprecations since the
previous release.
**Changes**
* Fixed a bug in the calculation of NTP timestamps.
Release v0.2.0
==============

28
vendor/github.com/beevik/ntp/ntp.go generated vendored
View File

@ -76,8 +76,12 @@ type ntpTime uint64
// and returns the corresponding time.Duration value.
func (t ntpTime) Duration() time.Duration {
sec := (t >> 32) * nanoPerSec
frac := (t & 0xffffffff) * nanoPerSec >> 32
return time.Duration(sec + frac)
frac := (t & 0xffffffff) * nanoPerSec
nsec := frac >> 32
if uint32(frac) >= 0x80000000 {
nsec++
}
return time.Duration(sec + nsec)
}
// Time interprets the fixed-point ntpTime as an absolute time and returns
@ -91,10 +95,11 @@ func (t ntpTime) Time() time.Time {
func toNtpTime(t time.Time) ntpTime {
nsec := uint64(t.Sub(ntpEpoch))
sec := nsec / nanoPerSec
// Round up the fractional component so that repeated conversions
// between time.Time and ntpTime do not yield continually decreasing
// results.
frac := (((nsec - sec*nanoPerSec) << 32) + nanoPerSec - 1) / nanoPerSec
nsec = uint64(nsec-sec*nanoPerSec) << 32
frac := uint64(nsec / nanoPerSec)
if nsec%nanoPerSec >= nanoPerSec/2 {
frac++
}
return ntpTime(sec<<32 | frac)
}
@ -105,10 +110,13 @@ type ntpTimeShort uint32
// Duration interprets the fixed-point ntpTimeShort as a number of elapsed
// seconds and returns the corresponding time.Duration value.
func (t ntpTimeShort) Duration() time.Duration {
t64 := uint64(t)
sec := (t64 >> 16) * nanoPerSec
frac := (t64 & 0xffff) * nanoPerSec >> 16
return time.Duration(sec + frac)
sec := uint64(t>>16) * nanoPerSec
frac := uint64(t&0xffff) * nanoPerSec
nsec := frac >> 16
if uint16(frac) >= 0x8000 {
nsec++
}
return time.Duration(sec + nsec)
}
// msg is an internal representation of an NTP packet.

View File

@ -29,12 +29,13 @@ import (
"sync"
"time"
"github.com/waku-org/go-discover/discover/v5wire"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/ethereum/go-ethereum/p2p/netutil"
"github.com/waku-org/go-discover/discover/v5wire"
)
const (
@ -846,6 +847,17 @@ func packNodes(reqid []byte, nodes []*enode.Node) []*v5wire.Nodes {
return resp
}
func (t *UDPv5) SetFallbackNodes(nodes []*enode.Node) error {
err := t.tab.setFallbackNodes(nodes)
if err != nil {
return err
}
refreshDone := make(chan struct{})
t.tab.doRefresh(refreshDone)
<-refreshDone
return nil
}
// handleTalkRequest runs the talk request handler of the requested protocol.
func (t *UDPv5) handleTalkRequest(p *v5wire.TalkRequest, fromID enode.ID, fromAddr *net.UDPAddr) {
t.trlock.Lock()

View File

@ -11,6 +11,7 @@ import (
"github.com/waku-org/go-waku/waku/persistence/migrations"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)
@ -20,6 +21,7 @@ type MessageProvider interface {
Put(env *protocol.Envelope) error
Query(query *pb.HistoryQuery) ([]StoredMessage, error)
MostRecentTimestamp() (int64, error)
Start(timesource timesource.Timesource) error
Stop()
}
@ -31,8 +33,9 @@ const WALMode = "wal"
// DBStore is a MessageProvider that has a *sql.DB connection
type DBStore struct {
MessageProvider
db *sql.DB
log *zap.Logger
db *sql.DB
timesource timesource.Timesource
log *zap.Logger
maxMessages int
maxDuration time.Duration
@ -146,25 +149,31 @@ func NewDBStore(log *zap.Logger, options ...DBOption) (*DBStore, error) {
}
}
err = result.cleanOlderRecords()
if err != nil {
return nil, err
}
result.wg.Add(1)
go result.checkForOlderRecords(10 * time.Second) // is 10s okay?
return result, nil
}
func (d *DBStore) Start(timesource timesource.Timesource) error {
d.timesource = timesource
err := d.cleanOlderRecords()
if err != nil {
return err
}
d.wg.Add(1)
go d.checkForOlderRecords(60 * time.Second)
return nil
}
func (d *DBStore) cleanOlderRecords() error {
d.log.Debug("Cleaning older records...")
d.log.Info("Cleaning older records...")
// Delete older messages
if d.maxDuration > 0 {
start := time.Now()
sqlStmt := `DELETE FROM message WHERE receiverTimestamp < ?`
_, err := d.db.Exec(sqlStmt, utils.GetUnixEpochFrom(time.Now().Add(-d.maxDuration)))
_, err := d.db.Exec(sqlStmt, utils.GetUnixEpochFrom(d.timesource.Now().Add(-d.maxDuration)))
if err != nil {
return err
}
@ -184,6 +193,8 @@ func (d *DBStore) cleanOlderRecords() error {
d.log.Debug("deleting excess records from the DB", zap.Duration("duration", elapsed))
}
d.log.Info("Older records removed")
return nil
}
@ -257,19 +268,6 @@ func (d *DBStore) Query(query *pb.HistoryQuery) (*pb.Index, []StoredMessage, err
parameters = append(parameters, query.PubsubTopic)
}
if query.StartTime != 0 {
conditions = append(conditions, "id >= ?")
startTimeDBKey := NewDBKey(uint64(query.StartTime), "", []byte{})
parameters = append(parameters, startTimeDBKey.Bytes())
}
if query.EndTime != 0 {
conditions = append(conditions, "id <= ?")
endTimeDBKey := NewDBKey(uint64(query.EndTime), "", []byte{})
parameters = append(parameters, endTimeDBKey.Bytes())
}
if len(query.ContentFilters) != 0 {
var ctPlaceHolder []string
for _, ct := range query.ContentFilters {
@ -281,7 +279,9 @@ func (d *DBStore) Query(query *pb.HistoryQuery) (*pb.Index, []StoredMessage, err
conditions = append(conditions, "contentTopic IN ("+strings.Join(ctPlaceHolder, ", ")+")")
}
usesCursor := false
if query.PagingInfo.Cursor != nil {
usesCursor = true
var exists bool
cursorDBKey := NewDBKey(uint64(query.PagingInfo.Cursor.SenderTime), query.PagingInfo.Cursor.PubsubTopic, query.PagingInfo.Cursor.Digest)
@ -306,6 +306,23 @@ func (d *DBStore) Query(query *pb.HistoryQuery) (*pb.Index, []StoredMessage, err
}
}
if query.StartTime != 0 {
if !usesCursor || query.PagingInfo.Direction == pb.PagingInfo_BACKWARD {
conditions = append(conditions, "id >= ?")
startTimeDBKey := NewDBKey(uint64(query.StartTime), "", []byte{})
parameters = append(parameters, startTimeDBKey.Bytes())
}
}
if query.EndTime != 0 {
if !usesCursor || query.PagingInfo.Direction == pb.PagingInfo_FORWARD {
conditions = append(conditions, "id <= ?")
endTimeDBKey := NewDBKey(uint64(query.EndTime), "", []byte{})
parameters = append(parameters, endTimeDBKey.Bytes())
}
}
conditionStr := ""
if len(conditions) != 0 {
conditionStr = "WHERE " + strings.Join(conditions, " AND ")
@ -342,7 +359,7 @@ func (d *DBStore) Query(query *pb.HistoryQuery) (*pb.Index, []StoredMessage, err
}
defer rows.Close()
cursor := &pb.Index{}
var cursor *pb.Index
if len(result) != 0 {
if len(result) > int(query.PagingInfo.PageSize) {
result = result[0:query.PagingInfo.PageSize]

View File

@ -207,6 +207,10 @@ func (d *DiscoveryV5) Start() error {
return nil
}
func (d *DiscoveryV5) SetBootnodes(nodes []*enode.Node) error {
return d.listener.SetFallbackNodes(nodes)
}
func (d *DiscoveryV5) Stop() {
d.Lock()
defer d.Unlock()

View File

@ -19,6 +19,7 @@ var (
StoreMessages = stats.Int64("store_messages", "Number of historical messages", stats.UnitDimensionless)
FilterSubscriptions = stats.Int64("filter_subscriptions", "Number of filter subscriptions", stats.UnitDimensionless)
StoreErrors = stats.Int64("errors", "Number of errors in store protocol", stats.UnitDimensionless)
StoreQueries = stats.Int64("store_queries", "Number of store queries", stats.UnitDimensionless)
LightpushErrors = stats.Int64("errors", "Number of errors in lightpush protocol", stats.UnitDimensionless)
PeerExchangeError = stats.Int64("errors", "Number of errors in peer exchange protocol", stats.UnitDimensionless)
)
@ -48,6 +49,12 @@ var (
Description: "The number of the messages received",
Aggregation: view.Count(),
}
StoreQueriesView = &view.View{
Name: "gowaku_store_queries",
Measure: StoreQueries,
Description: "The number of the store queries received",
Aggregation: view.Count(),
}
StoreMessagesView = &view.View{
Name: "gowaku_store_messages",
Measure: StoreMessages,
@ -102,6 +109,10 @@ func RecordMessage(ctx context.Context, tagType string, len int) {
}
}
func RecordStoreQuery(ctx context.Context) {
stats.Record(ctx, StoreQueries.M(1))
}
func RecordStoreError(ctx context.Context, tagType string) {
if err := stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(ErrorType, tagType)}, StoreErrors.M(1)); err != nil {
utils.Logger().Error("failed to record with tags", zap.Error(err))

View File

@ -24,13 +24,14 @@ func (w *WakuNode) startKeepAlive(t time.Duration) {
ticker := time.NewTicker(t)
defer ticker.Stop()
lastTimeExecuted := <-ticker.C
lastTimeExecuted := w.timesource.Now()
sleepDetectionInterval := int64(t) * 3
for {
select {
case <-ticker.C:
difference := time.Now().UnixNano() - lastTimeExecuted.UnixNano()
difference := w.timesource.Now().UnixNano() - lastTimeExecuted.UnixNano()
if difference > sleepDetectionInterval {
w.log.Warn("keep alive hasnt been executed recently. Killing all connections to peers")
for _, p := range w.host.Network().Peers() {
@ -39,7 +40,7 @@ func (w *WakuNode) startKeepAlive(t time.Duration) {
w.log.Warn("while disconnecting peer", zap.Error(err))
}
}
lastTimeExecuted = time.Now()
lastTimeExecuted = w.timesource.Now()
continue
}
@ -52,7 +53,7 @@ func (w *WakuNode) startKeepAlive(t time.Duration) {
}
}
lastTimeExecuted = time.Now()
lastTimeExecuted = w.timesource.Now()
case <-w.quit:
return
}

View File

@ -36,6 +36,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
"github.com/waku-org/go-waku/waku/v2/protocol/swap"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
)
@ -63,9 +64,10 @@ type RLNRelay interface {
}
type WakuNode struct {
host host.Host
opts *WakuNodeParameters
log *zap.Logger
host host.Host
opts *WakuNodeParameters
log *zap.Logger
timesource timesource.Timesource
relay *relay.WakuRelay
filter *filter.WakuFilter
@ -105,7 +107,7 @@ type WakuNode struct {
}
func defaultStoreFactory(w *WakuNode) store.Store {
return store.NewWakuStore(w.host, w.swap, w.opts.messageProvider, w.log)
return store.NewWakuStore(w.host, w.swap, w.opts.messageProvider, w.timesource, w.log)
}
// New is used to instantiate a WakuNode using a set of WakuNodeOptions
@ -174,6 +176,12 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) {
w.keepAliveFails = make(map[peer.ID]int)
w.wakuFlag = utils.NewWakuEnrBitfield(w.opts.enableLightPush, w.opts.enableFilter, w.opts.enableStore, w.opts.enableRelay)
if params.enableNTP {
w.timesource = timesource.NewNTPTimesource(w.opts.ntpURLs, w.log)
} else {
w.timesource = timesource.NewDefaultClock()
}
if params.storeFactory != nil {
w.storeFactory = params.storeFactory
} else {
@ -259,6 +267,13 @@ func (w *WakuNode) checkForAddressChanges() {
// Start initializes all the protocols that were setup in the WakuNode
func (w *WakuNode) Start() error {
if w.opts.enableNTP {
err := w.timesource.Start()
if err != nil {
return err
}
}
if w.opts.enableSwap {
w.swap = swap.NewWakuSwap(w.log, []swap.SwapOption{
swap.WithMode(w.opts.swapMode),
@ -268,11 +283,14 @@ func (w *WakuNode) Start() error {
w.store = w.storeFactory(w)
if w.opts.enableStore {
w.startStore()
err := w.startStore()
if err != nil {
return err
}
}
if w.opts.enableFilter {
filter, err := filter.NewWakuFilter(w.ctx, w.host, w.opts.isFilterFullNode, w.log, w.opts.filterOpts...)
filter, err := filter.NewWakuFilter(w.ctx, w.host, w.bcaster, w.opts.isFilterFullNode, w.timesource, w.log, w.opts.filterOpts...)
if err != nil {
return err
}
@ -302,9 +320,11 @@ func (w *WakuNode) Start() error {
w.opts.wOpts = append(w.opts.wOpts, pubsub.WithDiscovery(w.discoveryV5, w.opts.discV5Opts...))
}
err = w.mountRelay(w.opts.minRelayPeersToPublish, w.opts.wOpts...)
if err != nil {
return err
if w.opts.enableRelay {
err = w.mountRelay(w.opts.minRelayPeersToPublish, w.opts.wOpts...)
if err != nil {
return err
}
}
if w.opts.enableRLN {
@ -360,11 +380,19 @@ func (w *WakuNode) Stop() {
w.discoveryV5.Stop()
}
w.relay.Stop()
if w.opts.enableRelay {
w.relay.Stop()
}
w.lightPush.Stop()
w.store.Stop()
_ = w.stopRlnRelay()
err := w.timesource.Stop()
if err != nil {
w.log.Error("stopping timesource", zap.Error(err))
}
w.host.Close()
w.wg.Wait()
@ -395,6 +423,12 @@ func (w *WakuNode) ENR() *enode.Node {
return w.localNode.Node()
}
// Timesource returns the timesource used by this node to obtain the current wall time
// Depending on the configuration it will be the local time or a ntp syncd time
func (w *WakuNode) Timesource() timesource.Timesource {
return w.timesource
}
// Relay is used to access any operation related to Waku Relay protocol
func (w *WakuNode) Relay() *relay.WakuRelay {
return w.relay
@ -461,7 +495,7 @@ func (w *WakuNode) Publish(ctx context.Context, msg *pb.WakuMessage) error {
func (w *WakuNode) mountRelay(minRelayPeersToPublish int, opts ...pubsub.Option) error {
var err error
w.relay, err = relay.NewWakuRelay(w.ctx, w.host, w.bcaster, minRelayPeersToPublish, w.log, opts...)
w.relay, err = relay.NewWakuRelay(w.ctx, w.host, w.bcaster, minRelayPeersToPublish, w.timesource, w.log, opts...)
if err != nil {
return err
}
@ -499,44 +533,39 @@ func (w *WakuNode) mountPeerExchange() error {
return w.peerExchange.Start()
}
func (w *WakuNode) startStore() {
w.store.Start(w.ctx)
func (w *WakuNode) startStore() error {
err := w.store.Start(w.ctx)
if err != nil {
w.log.Error("starting store", zap.Error(err))
return err
}
if w.opts.shouldResume {
if len(w.opts.resumeNodes) != 0 {
// TODO: extract this to a function and run it when you go offline
// TODO: determine if a store is listening to a topic
var peerIDs []peer.ID
for _, n := range w.opts.resumeNodes {
pID, err := w.AddPeer(n, string(store.StoreID_v20beta4))
if err != nil {
w.log.Warn("adding peer to peerstore", logging.MultiAddrs("peer", n), zap.Error(err))
}
peerIDs = append(peerIDs, pID)
}
w.wg.Add(1)
go func() {
defer w.wg.Done()
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
peerVerif:
for {
select {
case <-w.quit:
return
case <-ticker.C:
_, err := utils.SelectPeer(w.host, string(store.StoreID_v20beta4), nil, w.log)
if err == nil {
break peerVerif
}
}
}
ctxWithTimeout, ctxCancel := context.WithTimeout(w.ctx, 20*time.Second)
defer ctxCancel()
if _, err := w.store.Resume(ctxWithTimeout, string(relay.DefaultWakuTopic), nil); err != nil {
w.log.Info("Retrying in 10s...")
time.Sleep(10 * time.Second)
} else {
break
}
ctxWithTimeout, ctxCancel := context.WithTimeout(w.ctx, 20*time.Second)
defer ctxCancel()
if _, err := w.store.Resume(ctxWithTimeout, string(relay.DefaultWakuTopic), peerIDs); err != nil {
w.log.Error("Could not resume history", zap.Error(err))
time.Sleep(10 * time.Second)
}
}()
}
return nil
}
func (w *WakuNode) addPeer(info *peer.AddrInfo, protocols ...string) error {
@ -551,13 +580,13 @@ func (w *WakuNode) addPeer(info *peer.AddrInfo, protocols ...string) error {
}
// AddPeer is used to add a peer and the protocols it support to the node peerstore
func (w *WakuNode) AddPeer(address ma.Multiaddr, protocols ...string) (*peer.ID, error) {
func (w *WakuNode) AddPeer(address ma.Multiaddr, protocols ...string) (peer.ID, error) {
info, err := peer.AddrInfoFromP2pAddr(address)
if err != nil {
return nil, err
return "", err
}
return &info.ID, w.addPeer(info, protocols...)
return info.ID, w.addPeer(info, protocols...)
}
// DialPeerWithMultiAddress is used to connect to a peer using a multiaddress

View File

@ -47,7 +47,7 @@ func (w *WakuNode) mountRlnRelay() error {
}
// mount rlnrelay in off-chain mode with a static group of users
rlnRelay, err := rln.RlnRelayStatic(w.ctx, w.relay, groupKeys, memKeyPair, memIndex, w.opts.rlnRelayPubsubTopic, w.opts.rlnRelayContentTopic, w.opts.rlnSpamHandler, w.log)
rlnRelay, err := rln.RlnRelayStatic(w.ctx, w.relay, groupKeys, memKeyPair, memIndex, w.opts.rlnRelayPubsubTopic, w.opts.rlnRelayContentTopic, w.opts.rlnSpamHandler, w.timesource, w.log)
if err != nil {
return err
}
@ -81,7 +81,7 @@ func (w *WakuNode) mountRlnRelay() error {
// mount the rln relay protocol in the on-chain/dynamic mode
var err error
w.rlnRelay, err = rln.RlnRelayDynamic(context.Background(), w.relay, w.opts.rlnETHClientAddress, w.opts.rlnETHPrivateKey, w.opts.rlnMembershipContractAddress, memKeyPair, w.opts.rlnRelayMemIndex, w.opts.rlnRelayPubsubTopic, w.opts.rlnRelayContentTopic, w.opts.rlnSpamHandler, w.opts.rlnRegistrationHandler, w.log)
w.rlnRelay, err = rln.RlnRelayDynamic(context.Background(), w.relay, w.opts.rlnETHClientAddress, w.opts.rlnETHPrivateKey, w.opts.rlnMembershipContractAddress, memKeyPair, w.opts.rlnRelayMemIndex, w.opts.rlnRelayPubsubTopic, w.opts.rlnRelayContentTopic, w.opts.rlnSpamHandler, w.opts.rlnRegistrationHandler, w.timesource, w.log)
if err != nil {
return err
}

View File

@ -26,6 +26,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)
@ -45,6 +46,9 @@ type WakuNodeParameters struct {
privKey *ecdsa.PrivateKey
libP2POpts []libp2p.Option
enableNTP bool
ntpURLs []string
enableWS bool
wsPort int
enableWSS bool
@ -63,8 +67,8 @@ type WakuNodeParameters struct {
enableStore bool
enableSwap bool
shouldResume bool
storeMsgs bool
resumeNodes []multiaddr.Multiaddr
messageProvider store.MessageProvider
swapMode int
@ -106,7 +110,6 @@ type WakuNodeOption func(*WakuNodeParameters) error
// Default options used in the libp2p node
var DefaultWakuNodeOptions = []WakuNodeOption{
WithLogger(utils.Logger()),
WithWakuRelay(),
}
// MultiAddresses return the list of multiaddresses configured in the node
@ -234,6 +237,21 @@ func WithPrivateKey(privKey *ecdsa.PrivateKey) WakuNodeOption {
}
}
// WithNTP is used to use ntp for any operation that requires obtaining time
// A list of ntp servers can be passed but if none is specified, some defaults
// will be used
func WithNTP(ntpURLs ...string) WakuNodeOption {
return func(params *WakuNodeParameters) error {
if len(ntpURLs) == 0 {
ntpURLs = timesource.DefaultServers
}
params.enableNTP = true
params.ntpURLs = ntpURLs
return nil
}
}
// GetPrivKey returns the private key used in the node
func (w *WakuNodeParameters) GetPrivKey() *crypto.PrivKey {
privKey := crypto.PrivKey(utils.EcdsaPrivKeyToSecp256k1PrivKey(w.privKey))
@ -300,11 +318,11 @@ func WithWakuFilter(fullNode bool, filterOpts ...filter.Option) WakuNodeOption {
// WithWakuStore enables the Waku V2 Store protocol and if the messages should
// be stored or not in a message provider
func WithWakuStore(shouldStoreMessages bool, shouldResume bool) WakuNodeOption {
func WithWakuStore(shouldStoreMessages bool, resumeNodes []multiaddr.Multiaddr) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.enableStore = true
params.storeMsgs = shouldStoreMessages
params.shouldResume = shouldResume
params.resumeNodes = resumeNodes
return nil
}
}

View File

@ -3,14 +3,17 @@ package filter
import (
"sync"
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/utils"
"github.com/waku-org/go-waku/waku/v2/timesource"
)
type FilterMap struct {
sync.RWMutex
items map[string]Filter
timesource timesource.Timesource
items map[string]Filter
broadcaster v2.Broadcaster
}
type FilterMapItem struct {
@ -18,9 +21,11 @@ type FilterMapItem struct {
Value Filter
}
func NewFilterMap() *FilterMap {
func NewFilterMap(broadcaster v2.Broadcaster, timesource timesource.Timesource) *FilterMap {
return &FilterMap{
items: make(map[string]Filter),
timesource: timesource,
items: make(map[string]Filter),
broadcaster: broadcaster,
}
}
@ -79,24 +84,29 @@ func (fm *FilterMap) Notify(msg *pb.WakuMessage, requestId string) {
fm.RLock()
defer fm.RUnlock()
for key, filter := range fm.items {
envelope := protocol.NewEnvelope(msg, utils.GetUnixEpoch(), filter.Topic)
filter, ok := fm.items[requestId]
if !ok {
// We do this because the key for the filter is set to the requestId received from the filter protocol.
// This means we do not need to check the content filter explicitly as all MessagePushs already contain
// the requestId of the coresponding filter.
if requestId != "" && requestId == key {
filter.Chan <- envelope
continue
}
return
}
// TODO: In case of no topics we should either trigger here for all messages,
// or we should not allow such filter to exist in the first place.
for _, contentTopic := range filter.ContentFilters {
if msg.ContentTopic == contentTopic {
filter.Chan <- envelope
break
}
envelope := protocol.NewEnvelope(msg, fm.timesource.Now().UnixNano(), filter.Topic)
// Broadcasting message so it's stored
fm.broadcaster.Submit(envelope)
if msg.ContentTopic == "" {
filter.Chan <- envelope
}
// TODO: In case of no topics we should either trigger here for all messages,
// or we should not allow such filter to exist in the first place.
for _, contentTopic := range filter.ContentFilters {
if msg.ContentTopic == contentTopic {
filter.Chan <- envelope
break
}
}
}

View File

@ -13,9 +13,12 @@ import (
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-msgio/protoio"
"github.com/waku-org/go-waku/logging"
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/metrics"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/timesource"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.uber.org/zap"
@ -62,7 +65,7 @@ type (
const FilterID_v20beta1 = libp2pProtocol.ID("/vac/waku/filter/2.0.0-beta1")
// NewWakuRelay returns a new instance of Waku Filter struct setup according to the chosen parameter and options
func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool, log *zap.Logger, opts ...Option) (*WakuFilter, error) {
func NewWakuFilter(ctx context.Context, host host.Host, broadcaster v2.Broadcaster, isFullNode bool, timesource timesource.Timesource, log *zap.Logger, opts ...Option) (*WakuFilter, error) {
wf := new(WakuFilter)
wf.log = log.Named("filter").With(zap.Bool("fullNode", isFullNode))
@ -84,7 +87,7 @@ func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool, log *za
wf.MsgC = make(chan *protocol.Envelope, 1024)
wf.h = host
wf.isFullNode = isFullNode
wf.filters = NewFilterMap()
wf.filters = NewFilterMap(broadcaster, timesource)
wf.subscribers = NewSubscribers(params.timeout)
wf.h.SetStreamHandlerMatch(FilterID_v20beta1, protocol.PrefixTextMatch(string(FilterID_v20beta1)), wf.onRequest)
@ -126,6 +129,10 @@ func (wf *WakuFilter) onRequest(s network.Stream) {
// This is a filter request coming from a light node.
if filterRPCRequest.Request.Subscribe {
subscriber := Subscriber{peer: s.Conn().RemotePeer(), requestId: filterRPCRequest.RequestId, filter: *filterRPCRequest.Request}
if subscriber.filter.Topic == "" { // @TODO: review if empty topic is possible
subscriber.filter.Topic = relay.DefaultWakuTopic
}
len := wf.subscribers.Append(subscriber)
logger.Info("adding subscriber")
@ -192,7 +199,7 @@ func (wf *WakuFilter) filterListener() {
for subscriber := range wf.subscribers.Items(&(msg.ContentTopic)) {
logger := logger.With(logging.HostID("subscriber", subscriber.peer))
subscriber := subscriber // https://golang.org/doc/faq#closures_and_goroutines
if subscriber.filter.Topic != "" && subscriber.filter.Topic != pubsubTopic {
if subscriber.filter.Topic != pubsubTopic {
logger.Info("pubsub topic mismatch",
zap.String("subscriberTopic", subscriber.filter.Topic),
zap.String("messageTopic", pubsubTopic))

View File

@ -22,7 +22,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/metrics"
waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/utils"
"github.com/waku-org/go-waku/waku/v2/timesource"
)
const WakuRelayID_v200 = protocol.ID("/vac/waku/relay/2.0.0")
@ -30,8 +30,9 @@ const WakuRelayID_v200 = protocol.ID("/vac/waku/relay/2.0.0")
var DefaultWakuTopic string = waku_proto.DefaultPubsubTopic().String()
type WakuRelay struct {
host host.Host
pubsub *pubsub.PubSub
host host.Host
pubsub *pubsub.PubSub
timesource timesource.Timesource
log *zap.Logger
@ -55,9 +56,10 @@ func msgIdFn(pmsg *pubsub_pb.Message) string {
}
// NewWakuRelay returns a new instance of a WakuRelay struct
func NewWakuRelay(ctx context.Context, h host.Host, bcaster v2.Broadcaster, minPeersToPublish int, log *zap.Logger, opts ...pubsub.Option) (*WakuRelay, error) {
func NewWakuRelay(ctx context.Context, h host.Host, bcaster v2.Broadcaster, minPeersToPublish int, timesource timesource.Timesource, log *zap.Logger, opts ...pubsub.Option) (*WakuRelay, error) {
w := new(WakuRelay)
w.host = h
w.timesource = timesource
w.wakuRelayTopics = make(map[string]*pubsub.Topic)
w.relaySubs = make(map[string]*pubsub.Subscription)
w.subscriptions = make(map[string][]*Subscription)
@ -343,7 +345,7 @@ func (w *WakuRelay) subscribeToTopic(t string, subscription *Subscription, sub *
return
}
envelope := waku_proto.NewEnvelope(wakuMessage, utils.GetUnixEpoch(), string(t))
envelope := waku_proto.NewEnvelope(wakuMessage, w.timesource.Now().UnixNano(), string(t))
w.log.Info("waku.relay received", logging.HexString("hash", envelope.Hash()))

View File

@ -7,6 +7,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/timesource"
r "github.com/waku-org/go-zerokit-rln/rln"
"go.uber.org/zap"
)
@ -20,6 +21,7 @@ func RlnRelayStatic(
pubsubTopic string,
contentTopic string,
spamHandler SpamHandler,
timesource timesource.Timesource,
log *zap.Logger,
) (*WakuRLNRelay, error) {
log = log.Named("rln-static")
@ -45,6 +47,7 @@ func RlnRelayStatic(
pubsubTopic: pubsubTopic,
contentTopic: contentTopic,
log: log,
timesource: timesource,
nullifierLog: make(map[r.Epoch][]r.ProofMetadata),
}
@ -87,6 +90,7 @@ func RlnRelayDynamic(
contentTopic string,
spamHandler SpamHandler,
registrationHandler RegistrationHandler,
timesource timesource.Timesource,
log *zap.Logger,
) (*WakuRLNRelay, error) {
log = log.Named("rln-dynamic")
@ -109,6 +113,7 @@ func RlnRelayDynamic(
pubsubTopic: pubsubTopic,
contentTopic: contentTopic,
log: log,
timesource: timesource,
nullifierLog: make(map[r.Epoch][]r.ProofMetadata),
registrationHandler: registrationHandler,
}

View File

@ -17,6 +17,7 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
r "github.com/waku-org/go-zerokit-rln/rln"
"go.uber.org/zap"
@ -33,7 +34,8 @@ type RegistrationHandler = func(tx *types.Transaction)
const AcceptableRootWindowSize = 5
type WakuRLNRelay struct {
ctx context.Context
ctx context.Context
timesource timesource.Timesource
membershipKeyPair *r.MembershipKeyPair
@ -206,7 +208,7 @@ func (rln *WakuRLNRelay) ValidateMessage(msg *pb.WakuMessage, optionalTime *time
epoch = r.CalcEpoch(*optionalTime)
} else {
// get current rln epoch
epoch = r.GetCurrentEpoch()
epoch = r.CalcEpoch(rln.timesource.Now())
}
msgProof := ToRateLimitProof(msg)

View File

@ -1,739 +0,0 @@
package store
import (
"context"
"encoding/hex"
"errors"
"math"
"sync"
"time"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-msgio/protoio"
"go.uber.org/zap"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/persistence"
"github.com/waku-org/go-waku/waku/v2/metrics"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/swap"
"github.com/waku-org/go-waku/waku/v2/utils"
)
// StoreID_v20beta4 is the current Waku Store protocol identifier
const StoreID_v20beta4 = libp2pProtocol.ID("/vac/waku/store/2.0.0-beta4")
// MaxPageSize is the maximum number of waku messages to return per page
const MaxPageSize = 100
// MaxContentFilters is the maximum number of allowed content filters in a query
const MaxContentFilters = 10
// MaxTimeVariance is the maximum duration in the future allowed for a message timestamp
const MaxTimeVariance = time.Duration(20) * time.Second
var (
// ErrMaxContentFilters is returned when the number of content topics in the query
// exceeds the limit
ErrMaxContentFilters = errors.New("exceeds the maximum number of content filters allowed")
// ErrNoPeersAvailable is returned when there are no store peers in the peer store
// that could be used to retrieve message history
ErrNoPeersAvailable = errors.New("no suitable remote peers")
// ErrInvalidId is returned when no RequestID is given
ErrInvalidId = errors.New("invalid request id")
// ErrFailedToResumeHistory is returned when the node attempted to retrieve historic
// messages to fill its own message history but for some reason it failed
ErrFailedToResumeHistory = errors.New("failed to resume the history")
// ErrFailedQuery is emitted when the query fails to return results
ErrFailedQuery = errors.New("failed to resolve the query")
ErrFutureMessage = errors.New("message timestamp in the future")
)
func findMessages(query *pb.HistoryQuery, msgProvider MessageProvider) ([]*pb.WakuMessage, *pb.PagingInfo, error) {
if query.PagingInfo == nil {
query.PagingInfo = &pb.PagingInfo{
Direction: pb.PagingInfo_FORWARD,
}
}
if query.PagingInfo.PageSize == 0 || query.PagingInfo.PageSize > uint64(MaxPageSize) {
query.PagingInfo.PageSize = MaxPageSize
}
if len(query.ContentFilters) > MaxContentFilters {
return nil, nil, ErrMaxContentFilters
}
cursor, queryResult, err := msgProvider.Query(query)
if err != nil {
return nil, nil, err
}
if len(queryResult) == 0 { // no pagination is needed for an empty list
newPagingInfo := &pb.PagingInfo{PageSize: 0, Cursor: query.PagingInfo.Cursor, Direction: query.PagingInfo.Direction}
return nil, newPagingInfo, nil
}
newPagingInfo := &pb.PagingInfo{PageSize: query.PagingInfo.PageSize, Cursor: cursor, Direction: query.PagingInfo.Direction}
if newPagingInfo.PageSize > uint64(len(queryResult)) {
newPagingInfo.PageSize = uint64(len(queryResult))
}
resultMessages := make([]*pb.WakuMessage, len(queryResult))
for i := range queryResult {
resultMessages[i] = queryResult[i].Message
}
return resultMessages, newPagingInfo, nil
}
func (store *WakuStore) FindMessages(query *pb.HistoryQuery) *pb.HistoryResponse {
result := new(pb.HistoryResponse)
messages, newPagingInfo, err := findMessages(query, store.msgProvider)
if err != nil {
if err == persistence.ErrInvalidCursor {
result.Error = pb.HistoryResponse_INVALID_CURSOR
} else {
// TODO: return error in pb.HistoryResponse
store.log.Error("obtaining messages from db", zap.Error(err))
}
}
result.Messages = messages
result.PagingInfo = newPagingInfo
return result
}
type MessageProvider interface {
GetAll() ([]persistence.StoredMessage, error)
Query(query *pb.HistoryQuery) (*pb.Index, []persistence.StoredMessage, error)
Put(env *protocol.Envelope) error
MostRecentTimestamp() (int64, error)
Stop()
Count() (int, error)
}
type Query struct {
Topic string
ContentTopics []string
StartTime int64
EndTime int64
}
// Result represents a valid response from a store node
type Result struct {
Messages []*pb.WakuMessage
query *pb.HistoryQuery
cursor *pb.Index
peerId peer.ID
}
func (r *Result) Cursor() *pb.Index {
return r.cursor
}
func (r *Result) IsComplete() bool {
return len(r.cursor.Digest) == 0
}
func (r *Result) PeerID() peer.ID {
return r.peerId
}
func (r *Result) Query() *pb.HistoryQuery {
return r.query
}
type WakuStore struct {
ctx context.Context
MsgC chan *protocol.Envelope
wg *sync.WaitGroup
log *zap.Logger
started bool
quit chan struct{}
msgProvider MessageProvider
h host.Host
swap *swap.WakuSwap
}
type criteriaFN = func(msg *pb.WakuMessage) (bool, error)
type Store interface {
Start(ctx context.Context)
Query(ctx context.Context, query Query, opts ...HistoryRequestOption) (*Result, error)
Find(ctx context.Context, query Query, cb criteriaFN, opts ...HistoryRequestOption) (*pb.WakuMessage, error)
Next(ctx context.Context, r *Result) (*Result, error)
Resume(ctx context.Context, pubsubTopic string, peerList []peer.ID) (int, error)
MessageChannel() chan *protocol.Envelope
Stop()
}
// NewWakuStore creates a WakuStore using an specific MessageProvider for storing the messages
func NewWakuStore(host host.Host, swap *swap.WakuSwap, p MessageProvider, log *zap.Logger) *WakuStore {
wakuStore := new(WakuStore)
wakuStore.msgProvider = p
wakuStore.h = host
wakuStore.swap = swap
wakuStore.wg = &sync.WaitGroup{}
wakuStore.log = log.Named("store")
wakuStore.quit = make(chan struct{})
return wakuStore
}
// SetMessageProvider allows switching the message provider used with a WakuStore
func (store *WakuStore) SetMessageProvider(p MessageProvider) {
store.msgProvider = p
}
// Start initializes the WakuStore by enabling the protocol and fetching records from a message provider
func (store *WakuStore) Start(ctx context.Context) {
if store.started {
return
}
if store.msgProvider == nil {
store.log.Info("Store protocol started (no message provider)")
return
}
store.started = true
store.ctx = ctx
store.MsgC = make(chan *protocol.Envelope, 1024)
store.h.SetStreamHandlerMatch(StoreID_v20beta4, protocol.PrefixTextMatch(string(StoreID_v20beta4)), store.onRequest)
store.wg.Add(2)
go store.storeIncomingMessages(ctx)
go store.updateMetrics(ctx)
store.log.Info("Store protocol started")
}
func (store *WakuStore) storeMessage(env *protocol.Envelope) error {
// Ensure that messages don't "jump" to the front of the queue with future timestamps
if env.Index().SenderTime-env.Index().ReceiverTime > int64(MaxTimeVariance) {
return ErrFutureMessage
}
if env.Message().Ephemeral {
return nil
}
err := store.msgProvider.Put(env)
if err != nil {
store.log.Error("storing message", zap.Error(err))
metrics.RecordStoreError(store.ctx, "store_failure")
return err
}
return nil
}
func (store *WakuStore) storeIncomingMessages(ctx context.Context) {
defer store.wg.Done()
for envelope := range store.MsgC {
_ = store.storeMessage(envelope)
}
}
func (store *WakuStore) updateMetrics(ctx context.Context) {
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
defer store.wg.Done()
for {
select {
case <-ticker.C:
msgCount, err := store.msgProvider.Count()
if err != nil {
store.log.Error("updating store metrics", zap.Error(err))
} else {
metrics.RecordMessage(store.ctx, "stored", msgCount)
}
case <-store.quit:
return
}
}
}
func (store *WakuStore) onRequest(s network.Stream) {
defer s.Close()
logger := store.log.With(logging.HostID("peer", s.Conn().RemotePeer()))
historyRPCRequest := &pb.HistoryRPC{}
writer := protoio.NewDelimitedWriter(s)
reader := protoio.NewDelimitedReader(s, math.MaxInt32)
err := reader.ReadMsg(historyRPCRequest)
if err != nil {
logger.Error("reading request", zap.Error(err))
metrics.RecordStoreError(store.ctx, "decodeRPCFailure")
return
}
logger = logger.With(zap.String("id", historyRPCRequest.RequestId))
if query := historyRPCRequest.Query; query != nil {
logger = logger.With(logging.Filters(query.GetContentFilters()))
}
logger.Info("received query")
historyResponseRPC := &pb.HistoryRPC{}
historyResponseRPC.RequestId = historyRPCRequest.RequestId
historyResponseRPC.Response = store.FindMessages(historyRPCRequest.Query)
logger = logger.With(zap.Int("messages", len(historyResponseRPC.Response.Messages)))
err = writer.WriteMsg(historyResponseRPC)
if err != nil {
logger.Error("writing response", zap.Error(err), logging.PagingInfo(historyResponseRPC.Response.PagingInfo))
_ = s.Reset()
} else {
logger.Info("response sent")
}
}
type HistoryRequestParameters struct {
selectedPeer peer.ID
requestId []byte
cursor *pb.Index
pageSize uint64
asc bool
s *WakuStore
}
type HistoryRequestOption func(*HistoryRequestParameters)
// WithPeer is an option used to specify the peerID to request the message history
func WithPeer(p peer.ID) HistoryRequestOption {
return func(params *HistoryRequestParameters) {
params.selectedPeer = p
}
}
// WithAutomaticPeerSelection is an option used to randomly select a peer from the peer store
// to request the message history. If a list of specific peers is passed, the peer will be chosen
// from that list assuming it supports the chosen protocol, otherwise it will chose a peer
// from the node peerstore
func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) HistoryRequestOption {
return func(params *HistoryRequestParameters) {
p, err := utils.SelectPeer(params.s.h, string(StoreID_v20beta4), fromThesePeers, params.s.log)
if err == nil {
params.selectedPeer = *p
} else {
params.s.log.Info("selecting peer", zap.Error(err))
}
}
}
// WithFastestPeerSelection is an option used to select a peer from the peer store
// with the lowest ping. If a list of specific peers is passed, the peer will be chosen
// from that list assuming it supports the chosen protocol, otherwise it will chose a peer
// from the node peerstore
func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) HistoryRequestOption {
return func(params *HistoryRequestParameters) {
p, err := utils.SelectPeerWithLowestRTT(ctx, params.s.h, string(StoreID_v20beta4), fromThesePeers, params.s.log)
if err == nil {
params.selectedPeer = *p
} else {
params.s.log.Info("selecting peer", zap.Error(err))
}
}
}
func WithRequestId(requestId []byte) HistoryRequestOption {
return func(params *HistoryRequestParameters) {
params.requestId = requestId
}
}
func WithAutomaticRequestId() HistoryRequestOption {
return func(params *HistoryRequestParameters) {
params.requestId = protocol.GenerateRequestId()
}
}
func WithCursor(c *pb.Index) HistoryRequestOption {
return func(params *HistoryRequestParameters) {
params.cursor = c
}
}
// WithPaging is an option used to specify the order and maximum number of records to return
func WithPaging(asc bool, pageSize uint64) HistoryRequestOption {
return func(params *HistoryRequestParameters) {
params.asc = asc
params.pageSize = pageSize
}
}
// Default options to be used when querying a store node for results
func DefaultOptions() []HistoryRequestOption {
return []HistoryRequestOption{
WithAutomaticRequestId(),
WithAutomaticPeerSelection(),
WithPaging(true, MaxPageSize),
}
}
func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selectedPeer peer.ID, requestId []byte) (*pb.HistoryResponse, error) {
logger := store.log.With(logging.HostID("peer", selectedPeer))
logger.Info("querying message history")
// We connect first so dns4 addresses are resolved (NewStream does not do it)
err := store.h.Connect(ctx, store.h.Peerstore().PeerInfo(selectedPeer))
if err != nil {
logger.Error("connecting to peer", zap.Error(err))
return nil, err
}
connOpt, err := store.h.NewStream(ctx, selectedPeer, StoreID_v20beta4)
if err != nil {
logger.Error("creating stream to peer", zap.Error(err))
return nil, err
}
defer connOpt.Close()
defer func() {
_ = connOpt.Reset()
}()
historyRequest := &pb.HistoryRPC{Query: q, RequestId: hex.EncodeToString(requestId)}
writer := protoio.NewDelimitedWriter(connOpt)
reader := protoio.NewDelimitedReader(connOpt, math.MaxInt32)
err = writer.WriteMsg(historyRequest)
if err != nil {
logger.Error("writing request", zap.Error(err))
return nil, err
}
historyResponseRPC := &pb.HistoryRPC{}
err = reader.ReadMsg(historyResponseRPC)
if err != nil {
logger.Error("reading response", zap.Error(err))
metrics.RecordStoreError(store.ctx, "decodeRPCFailure")
return nil, err
}
if historyResponseRPC.Response == nil {
historyResponseRPC.Response = new(pb.HistoryResponse)
}
if historyResponseRPC.Response.PagingInfo == nil {
historyResponseRPC.Response.PagingInfo = new(pb.PagingInfo)
historyResponseRPC.Response.PagingInfo.Cursor = new(pb.Index)
}
metrics.RecordMessage(ctx, "retrieved", len(historyResponseRPC.Response.Messages))
return historyResponseRPC.Response, nil
}
func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryRequestOption) (*Result, error) {
q := &pb.HistoryQuery{
PubsubTopic: query.Topic,
ContentFilters: []*pb.ContentFilter{},
StartTime: query.StartTime,
EndTime: query.EndTime,
PagingInfo: &pb.PagingInfo{},
}
for _, cf := range query.ContentTopics {
q.ContentFilters = append(q.ContentFilters, &pb.ContentFilter{ContentTopic: cf})
}
if len(q.ContentFilters) > MaxContentFilters {
return nil, ErrMaxContentFilters
}
params := new(HistoryRequestParameters)
params.s = store
optList := DefaultOptions()
optList = append(optList, opts...)
for _, opt := range optList {
opt(params)
}
if params.selectedPeer == "" {
return nil, ErrNoPeersAvailable
}
if len(params.requestId) == 0 {
return nil, ErrInvalidId
}
if params.cursor != nil {
q.PagingInfo.Cursor = params.cursor
}
if params.asc {
q.PagingInfo.Direction = pb.PagingInfo_FORWARD
} else {
q.PagingInfo.Direction = pb.PagingInfo_BACKWARD
}
pageSize := params.pageSize
if pageSize == 0 || pageSize > uint64(MaxPageSize) {
pageSize = MaxPageSize
}
q.PagingInfo.PageSize = pageSize
response, err := store.queryFrom(ctx, q, params.selectedPeer, params.requestId)
if err != nil {
return nil, err
}
if response.Error == pb.HistoryResponse_INVALID_CURSOR {
return nil, errors.New("invalid cursor")
}
var messageIDs [][]byte
for _, m := range response.Messages {
messageID, _, _ := m.Hash()
messageIDs = append(messageIDs, messageID)
}
store.log.Info("waku.store retrieved", logging.HexArray("hashes", messageIDs))
result := &Result{
Messages: response.Messages,
query: q,
cursor: response.PagingInfo.Cursor,
peerId: params.selectedPeer,
}
return result, nil
}
// Find the first message that matches a criteria. criteriaCB is a function that will be invoked for each message and returns true if the message matches the criteria
func (store *WakuStore) Find(ctx context.Context, query Query, cb criteriaFN, opts ...HistoryRequestOption) (*pb.WakuMessage, error) {
if cb == nil {
return nil, errors.New("callback can't be null")
}
result, err := store.Query(ctx, query, opts...)
if err != nil {
return nil, err
}
for {
for _, m := range result.Messages {
found, err := cb(m)
if err != nil {
return nil, err
}
if found {
return m, nil
}
}
if result.IsComplete() {
break
}
result, err = store.Next(ctx, result)
if err != nil {
return nil, err
}
}
return nil, nil
}
// Next is used with to retrieve the next page of rows from a query response.
// If no more records are found, the result will not contain any messages.
// This function is useful for iterating over results without having to manually
// specify the cursor and pagination order and max number of results
func (store *WakuStore) Next(ctx context.Context, r *Result) (*Result, error) {
if r.IsComplete() {
return &Result{
Messages: []*pb.WakuMessage{},
cursor: &pb.Index{},
query: r.query,
peerId: r.PeerID(),
}, nil
}
q := &pb.HistoryQuery{
PubsubTopic: r.Query().PubsubTopic,
ContentFilters: r.Query().ContentFilters,
StartTime: r.Query().StartTime,
EndTime: r.Query().EndTime,
PagingInfo: &pb.PagingInfo{
PageSize: r.Query().PagingInfo.PageSize,
Direction: r.Query().PagingInfo.Direction,
Cursor: &pb.Index{
Digest: r.Cursor().Digest,
ReceiverTime: r.Cursor().ReceiverTime,
SenderTime: r.Cursor().SenderTime,
PubsubTopic: r.Cursor().PubsubTopic,
},
},
}
response, err := store.queryFrom(ctx, q, r.PeerID(), protocol.GenerateRequestId())
if err != nil {
return nil, err
}
if response.Error == pb.HistoryResponse_INVALID_CURSOR {
return nil, errors.New("invalid cursor")
}
return &Result{
Messages: response.Messages,
cursor: response.PagingInfo.Cursor,
query: q,
peerId: r.PeerID(),
}, nil
}
func (store *WakuStore) queryLoop(ctx context.Context, query *pb.HistoryQuery, candidateList []peer.ID) ([]*pb.WakuMessage, error) {
// loops through the candidateList in order and sends the query to each until one of the query gets resolved successfully
// returns the number of retrieved messages, or error if all the requests fail
queryWg := sync.WaitGroup{}
queryWg.Add(len(candidateList))
resultChan := make(chan *pb.HistoryResponse, len(candidateList))
for _, peer := range candidateList {
func() {
defer queryWg.Done()
result, err := store.queryFrom(ctx, query, peer, protocol.GenerateRequestId())
if err == nil {
resultChan <- result
return
}
store.log.Error("resuming history", logging.HostID("peer", peer), zap.Error(err))
}()
}
queryWg.Wait()
close(resultChan)
var messages []*pb.WakuMessage
hasResults := false
for result := range resultChan {
hasResults = true
messages = append(messages, result.Messages...)
}
if hasResults {
return messages, nil
}
return nil, ErrFailedQuery
}
func (store *WakuStore) findLastSeen() (int64, error) {
return store.msgProvider.MostRecentTimestamp()
}
func max(x, y int64) int64 {
if x > y {
return x
}
return y
}
// Resume retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online
// messages are stored in the store node's messages field and in the message db
// the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message
// an offset of 20 second is added to the time window to count for nodes asynchrony
// the history is fetched from one of the peers persisted in the waku store node's peer manager unit
// peerList indicates the list of peers to query from. The history is fetched from the first available peer in this list. Such candidates should be found through a discovery method (to be developed).
// if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from. The history gets fetched successfully if the dialed peer has been online during the queried time window.
// the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string
func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList []peer.ID) (int, error) {
if !store.started {
return 0, errors.New("can't resume: store has not started")
}
currentTime := utils.GetUnixEpoch()
lastSeenTime, err := store.findLastSeen()
if err != nil {
return 0, err
}
var offset int64 = int64(20 * time.Nanosecond)
currentTime = currentTime + offset
lastSeenTime = max(lastSeenTime-offset, 0)
rpc := &pb.HistoryQuery{
PubsubTopic: pubsubTopic,
StartTime: lastSeenTime,
EndTime: currentTime,
PagingInfo: &pb.PagingInfo{
PageSize: 0,
Direction: pb.PagingInfo_BACKWARD,
},
}
if len(peerList) == 0 {
p, err := utils.SelectPeer(store.h, string(StoreID_v20beta4), nil, store.log)
if err != nil {
store.log.Info("selecting peer", zap.Error(err))
return -1, ErrNoPeersAvailable
}
peerList = append(peerList, *p)
}
messages, err := store.queryLoop(ctx, rpc, peerList)
if err != nil {
store.log.Error("resuming history", zap.Error(err))
return -1, ErrFailedToResumeHistory
}
msgCount := 0
for _, msg := range messages {
if err = store.storeMessage(protocol.NewEnvelope(msg, utils.GetUnixEpoch(), pubsubTopic)); err == nil {
msgCount++
}
}
store.log.Info("retrieved messages since the last online time", zap.Int("messages", len(messages)))
return msgCount, nil
}
func (store *WakuStore) MessageChannel() chan *protocol.Envelope {
return store.MsgC
}
// TODO: queryWithAccounting
// Stop closes the store message channel and removes the protocol stream handler
func (store *WakuStore) Stop() {
store.started = false
if store.MsgC != nil {
close(store.MsgC)
}
if store.msgProvider != nil {
store.quit <- struct{}{}
}
if store.h != nil {
store.h.RemoveStreamHandler(StoreID_v20beta4)
}
store.wg.Wait()
}

View File

@ -0,0 +1,348 @@
package store
import (
"context"
"encoding/hex"
"errors"
"math"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-msgio/protoio"
"go.uber.org/zap"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/metrics"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/utils"
)
type Query struct {
Topic string
ContentTopics []string
StartTime int64
EndTime int64
}
// Result represents a valid response from a store node
type Result struct {
Messages []*pb.WakuMessage
query *pb.HistoryQuery
cursor *pb.Index
peerId peer.ID
}
func (r *Result) Cursor() *pb.Index {
return r.cursor
}
func (r *Result) IsComplete() bool {
return r.cursor == nil
}
func (r *Result) PeerID() peer.ID {
return r.peerId
}
func (r *Result) Query() *pb.HistoryQuery {
return r.query
}
type criteriaFN = func(msg *pb.WakuMessage) (bool, error)
type HistoryRequestParameters struct {
selectedPeer peer.ID
requestId []byte
cursor *pb.Index
pageSize uint64
asc bool
s *WakuStore
}
type HistoryRequestOption func(*HistoryRequestParameters)
// WithPeer is an option used to specify the peerID to request the message history
func WithPeer(p peer.ID) HistoryRequestOption {
return func(params *HistoryRequestParameters) {
params.selectedPeer = p
}
}
// WithAutomaticPeerSelection is an option used to randomly select a peer from the peer store
// to request the message history. If a list of specific peers is passed, the peer will be chosen
// from that list assuming it supports the chosen protocol, otherwise it will chose a peer
// from the node peerstore
func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) HistoryRequestOption {
return func(params *HistoryRequestParameters) {
p, err := utils.SelectPeer(params.s.h, string(StoreID_v20beta4), fromThesePeers, params.s.log)
if err == nil {
params.selectedPeer = *p
} else {
params.s.log.Info("selecting peer", zap.Error(err))
}
}
}
// WithFastestPeerSelection is an option used to select a peer from the peer store
// with the lowest ping. If a list of specific peers is passed, the peer will be chosen
// from that list assuming it supports the chosen protocol, otherwise it will chose a peer
// from the node peerstore
func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) HistoryRequestOption {
return func(params *HistoryRequestParameters) {
p, err := utils.SelectPeerWithLowestRTT(ctx, params.s.h, string(StoreID_v20beta4), fromThesePeers, params.s.log)
if err == nil {
params.selectedPeer = *p
} else {
params.s.log.Info("selecting peer", zap.Error(err))
}
}
}
func WithRequestId(requestId []byte) HistoryRequestOption {
return func(params *HistoryRequestParameters) {
params.requestId = requestId
}
}
func WithAutomaticRequestId() HistoryRequestOption {
return func(params *HistoryRequestParameters) {
params.requestId = protocol.GenerateRequestId()
}
}
func WithCursor(c *pb.Index) HistoryRequestOption {
return func(params *HistoryRequestParameters) {
params.cursor = c
}
}
// WithPaging is an option used to specify the order and maximum number of records to return
func WithPaging(asc bool, pageSize uint64) HistoryRequestOption {
return func(params *HistoryRequestParameters) {
params.asc = asc
params.pageSize = pageSize
}
}
// Default options to be used when querying a store node for results
func DefaultOptions() []HistoryRequestOption {
return []HistoryRequestOption{
WithAutomaticRequestId(),
WithAutomaticPeerSelection(),
WithPaging(true, MaxPageSize),
}
}
func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selectedPeer peer.ID, requestId []byte) (*pb.HistoryResponse, error) {
logger := store.log.With(logging.HostID("peer", selectedPeer))
logger.Info("querying message history")
// We connect first so dns4 addresses are resolved (NewStream does not do it)
err := store.h.Connect(ctx, store.h.Peerstore().PeerInfo(selectedPeer))
if err != nil {
logger.Error("connecting to peer", zap.Error(err))
return nil, err
}
connOpt, err := store.h.NewStream(ctx, selectedPeer, StoreID_v20beta4)
if err != nil {
logger.Error("creating stream to peer", zap.Error(err))
return nil, err
}
defer connOpt.Close()
defer func() {
_ = connOpt.Reset()
}()
historyRequest := &pb.HistoryRPC{Query: q, RequestId: hex.EncodeToString(requestId)}
writer := protoio.NewDelimitedWriter(connOpt)
reader := protoio.NewDelimitedReader(connOpt, math.MaxInt32)
err = writer.WriteMsg(historyRequest)
if err != nil {
logger.Error("writing request", zap.Error(err))
return nil, err
}
historyResponseRPC := &pb.HistoryRPC{}
err = reader.ReadMsg(historyResponseRPC)
if err != nil {
logger.Error("reading response", zap.Error(err))
metrics.RecordStoreError(store.ctx, "decodeRPCFailure")
return nil, err
}
if historyResponseRPC.Response == nil {
logger.Error("empty response")
metrics.RecordStoreError(store.ctx, "emptyRpcResponseFailure")
return nil, ErrEmptyResponse
}
metrics.RecordMessage(ctx, "retrieved", len(historyResponseRPC.Response.Messages))
return historyResponseRPC.Response, nil
}
func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryRequestOption) (*Result, error) {
q := &pb.HistoryQuery{
PubsubTopic: query.Topic,
ContentFilters: []*pb.ContentFilter{},
StartTime: query.StartTime,
EndTime: query.EndTime,
PagingInfo: &pb.PagingInfo{},
}
for _, cf := range query.ContentTopics {
q.ContentFilters = append(q.ContentFilters, &pb.ContentFilter{ContentTopic: cf})
}
if len(q.ContentFilters) > MaxContentFilters {
return nil, ErrMaxContentFilters
}
params := new(HistoryRequestParameters)
params.s = store
optList := DefaultOptions()
optList = append(optList, opts...)
for _, opt := range optList {
opt(params)
}
if params.selectedPeer == "" {
return nil, ErrNoPeersAvailable
}
if len(params.requestId) == 0 {
return nil, ErrInvalidId
}
if params.cursor != nil {
q.PagingInfo.Cursor = params.cursor
}
if params.asc {
q.PagingInfo.Direction = pb.PagingInfo_FORWARD
} else {
q.PagingInfo.Direction = pb.PagingInfo_BACKWARD
}
pageSize := params.pageSize
if pageSize == 0 || pageSize > uint64(MaxPageSize) {
pageSize = MaxPageSize
}
q.PagingInfo.PageSize = pageSize
response, err := store.queryFrom(ctx, q, params.selectedPeer, params.requestId)
if err != nil {
return nil, err
}
if response.Error == pb.HistoryResponse_INVALID_CURSOR {
return nil, errors.New("invalid cursor")
}
var messageIDs [][]byte
for _, m := range response.Messages {
messageID, _, _ := m.Hash()
messageIDs = append(messageIDs, messageID)
}
store.log.Info("waku.store retrieved", logging.HexArray("hashes", messageIDs))
result := &Result{
Messages: response.Messages,
query: q,
cursor: response.PagingInfo.Cursor,
peerId: params.selectedPeer,
}
return result, nil
}
// Find the first message that matches a criteria. criteriaCB is a function that will be invoked for each message and returns true if the message matches the criteria
func (store *WakuStore) Find(ctx context.Context, query Query, cb criteriaFN, opts ...HistoryRequestOption) (*pb.WakuMessage, error) {
if cb == nil {
return nil, errors.New("callback can't be null")
}
result, err := store.Query(ctx, query, opts...)
if err != nil {
return nil, err
}
for {
for _, m := range result.Messages {
found, err := cb(m)
if err != nil {
return nil, err
}
if found {
return m, nil
}
}
if result.IsComplete() {
break
}
result, err = store.Next(ctx, result)
if err != nil {
return nil, err
}
}
return nil, nil
}
// Next is used with to retrieve the next page of rows from a query response.
// If no more records are found, the result will not contain any messages.
// This function is useful for iterating over results without having to manually
// specify the cursor and pagination order and max number of results
func (store *WakuStore) Next(ctx context.Context, r *Result) (*Result, error) {
if r.IsComplete() {
return &Result{
Messages: []*pb.WakuMessage{},
cursor: nil,
query: r.query,
peerId: r.PeerID(),
}, nil
}
q := &pb.HistoryQuery{
PubsubTopic: r.Query().PubsubTopic,
ContentFilters: r.Query().ContentFilters,
StartTime: r.Query().StartTime,
EndTime: r.Query().EndTime,
PagingInfo: &pb.PagingInfo{
PageSize: r.Query().PagingInfo.PageSize,
Direction: r.Query().PagingInfo.Direction,
Cursor: &pb.Index{
Digest: r.Cursor().Digest,
ReceiverTime: r.Cursor().ReceiverTime,
SenderTime: r.Cursor().SenderTime,
PubsubTopic: r.Cursor().PubsubTopic,
},
},
}
response, err := store.queryFrom(ctx, q, r.PeerID(), protocol.GenerateRequestId())
if err != nil {
return nil, err
}
if response.Error == pb.HistoryResponse_INVALID_CURSOR {
return nil, errors.New("invalid cursor")
}
return &Result{
Messages: response.Messages,
cursor: response.PagingInfo.Cursor,
query: q,
peerId: r.PeerID(),
}, nil
}

View File

@ -0,0 +1,77 @@
package store
import (
"context"
"errors"
"sync"
"github.com/libp2p/go-libp2p/core/host"
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/swap"
"github.com/waku-org/go-waku/waku/v2/timesource"
"go.uber.org/zap"
)
// StoreID_v20beta4 is the current Waku Store protocol identifier
const StoreID_v20beta4 = libp2pProtocol.ID("/vac/waku/store/2.0.0-beta4")
// MaxPageSize is the maximum number of waku messages to return per page
const MaxPageSize = 100
// MaxContentFilters is the maximum number of allowed content filters in a query
const MaxContentFilters = 10
var (
// ErrMaxContentFilters is returned when the number of content topics in the query
// exceeds the limit
ErrMaxContentFilters = errors.New("exceeds the maximum number of content filters allowed")
// ErrNoPeersAvailable is returned when there are no store peers in the peer store
// that could be used to retrieve message history
ErrNoPeersAvailable = errors.New("no suitable remote peers")
// ErrInvalidId is returned when no RequestID is given
ErrInvalidId = errors.New("invalid request id")
// ErrFailedToResumeHistory is returned when the node attempted to retrieve historic
// messages to fill its own message history but for some reason it failed
ErrFailedToResumeHistory = errors.New("failed to resume the history")
// ErrFailedQuery is emitted when the query fails to return results
ErrFailedQuery = errors.New("failed to resolve the query")
ErrFutureMessage = errors.New("message timestamp in the future")
ErrEmptyResponse = errors.New("empty store response")
)
type WakuStore struct {
ctx context.Context
timesource timesource.Timesource
MsgC chan *protocol.Envelope
wg *sync.WaitGroup
log *zap.Logger
started bool
quit chan struct{}
msgProvider MessageProvider
h host.Host
swap *swap.WakuSwap
}
// NewWakuStore creates a WakuStore using an specific MessageProvider for storing the messages
func NewWakuStore(host host.Host, swap *swap.WakuSwap, p MessageProvider, timesource timesource.Timesource, log *zap.Logger) *WakuStore {
wakuStore := new(WakuStore)
wakuStore.msgProvider = p
wakuStore.h = host
wakuStore.swap = swap
wakuStore.wg = &sync.WaitGroup{}
wakuStore.log = log.Named("store")
wakuStore.quit = make(chan struct{})
wakuStore.timesource = timesource
return wakuStore
}

View File

@ -0,0 +1,350 @@
package store
import (
"context"
"errors"
"math"
"sync"
"time"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-msgio/protoio"
"go.uber.org/zap"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/persistence"
"github.com/waku-org/go-waku/waku/v2/metrics"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/timesource"
)
// MaxTimeVariance is the maximum duration in the future allowed for a message timestamp
const MaxTimeVariance = time.Duration(20) * time.Second
func findMessages(query *pb.HistoryQuery, msgProvider MessageProvider) ([]*pb.WakuMessage, *pb.PagingInfo, error) {
if query.PagingInfo == nil {
query.PagingInfo = &pb.PagingInfo{
Direction: pb.PagingInfo_FORWARD,
}
}
if query.PagingInfo.PageSize == 0 || query.PagingInfo.PageSize > uint64(MaxPageSize) {
query.PagingInfo.PageSize = MaxPageSize
}
if len(query.ContentFilters) > MaxContentFilters {
return nil, nil, ErrMaxContentFilters
}
cursor, queryResult, err := msgProvider.Query(query)
if err != nil {
return nil, nil, err
}
if len(queryResult) == 0 { // no pagination is needed for an empty list
return nil, &pb.PagingInfo{Cursor: nil}, nil
}
resultMessages := make([]*pb.WakuMessage, len(queryResult))
for i := range queryResult {
resultMessages[i] = queryResult[i].Message
}
return resultMessages, &pb.PagingInfo{Cursor: cursor}, nil
}
func (store *WakuStore) FindMessages(query *pb.HistoryQuery) *pb.HistoryResponse {
result := new(pb.HistoryResponse)
messages, newPagingInfo, err := findMessages(query, store.msgProvider)
if err != nil {
if err == persistence.ErrInvalidCursor {
result.Error = pb.HistoryResponse_INVALID_CURSOR
} else {
// TODO: return error in pb.HistoryResponse
store.log.Error("obtaining messages from db", zap.Error(err))
}
}
result.Messages = messages
result.PagingInfo = newPagingInfo
return result
}
type MessageProvider interface {
GetAll() ([]persistence.StoredMessage, error)
Query(query *pb.HistoryQuery) (*pb.Index, []persistence.StoredMessage, error)
Put(env *protocol.Envelope) error
MostRecentTimestamp() (int64, error)
Start(timesource timesource.Timesource) error
Stop()
Count() (int, error)
}
type Store interface {
Start(ctx context.Context) error
Query(ctx context.Context, query Query, opts ...HistoryRequestOption) (*Result, error)
Find(ctx context.Context, query Query, cb criteriaFN, opts ...HistoryRequestOption) (*pb.WakuMessage, error)
Next(ctx context.Context, r *Result) (*Result, error)
Resume(ctx context.Context, pubsubTopic string, peerList []peer.ID) (int, error)
MessageChannel() chan *protocol.Envelope
Stop()
}
// SetMessageProvider allows switching the message provider used with a WakuStore
func (store *WakuStore) SetMessageProvider(p MessageProvider) {
store.msgProvider = p
}
// Start initializes the WakuStore by enabling the protocol and fetching records from a message provider
func (store *WakuStore) Start(ctx context.Context) error {
if store.started {
return nil
}
if store.msgProvider == nil {
store.log.Info("Store protocol started (no message provider)")
return nil
}
err := store.msgProvider.Start(store.timesource)
if err != nil {
store.log.Error("Error starting message provider", zap.Error(err))
return nil
}
store.started = true
store.ctx = ctx
store.MsgC = make(chan *protocol.Envelope, 1024)
store.h.SetStreamHandlerMatch(StoreID_v20beta4, protocol.PrefixTextMatch(string(StoreID_v20beta4)), store.onRequest)
store.wg.Add(2)
go store.storeIncomingMessages(ctx)
go store.updateMetrics(ctx)
store.log.Info("Store protocol started")
return nil
}
func (store *WakuStore) storeMessage(env *protocol.Envelope) error {
// Ensure that messages don't "jump" to the front of the queue with future timestamps
if env.Index().SenderTime-env.Index().ReceiverTime > int64(MaxTimeVariance) {
return ErrFutureMessage
}
if env.Message().Ephemeral {
return nil
}
err := store.msgProvider.Put(env)
if err != nil {
store.log.Error("storing message", zap.Error(err))
metrics.RecordStoreError(store.ctx, "store_failure")
return err
}
return nil
}
func (store *WakuStore) storeIncomingMessages(ctx context.Context) {
defer store.wg.Done()
for envelope := range store.MsgC {
go func(env *protocol.Envelope) {
_ = store.storeMessage(env)
}(envelope)
}
}
func (store *WakuStore) updateMetrics(ctx context.Context) {
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
defer store.wg.Done()
for {
select {
case <-ticker.C:
msgCount, err := store.msgProvider.Count()
if err != nil {
store.log.Error("updating store metrics", zap.Error(err))
} else {
metrics.RecordMessage(store.ctx, "stored", msgCount)
}
case <-store.quit:
return
}
}
}
func (store *WakuStore) onRequest(s network.Stream) {
defer s.Close()
logger := store.log.With(logging.HostID("peer", s.Conn().RemotePeer()))
historyRPCRequest := &pb.HistoryRPC{}
writer := protoio.NewDelimitedWriter(s)
reader := protoio.NewDelimitedReader(s, math.MaxInt32)
err := reader.ReadMsg(historyRPCRequest)
if err != nil {
logger.Error("reading request", zap.Error(err))
metrics.RecordStoreError(store.ctx, "decodeRPCFailure")
return
}
logger = logger.With(zap.String("id", historyRPCRequest.RequestId))
if query := historyRPCRequest.Query; query != nil {
logger = logger.With(logging.Filters(query.GetContentFilters()))
} else {
logger.Error("reading request", zap.Error(err))
metrics.RecordStoreError(store.ctx, "emptyRpcQueryFailure")
return
}
logger.Info("received history query")
metrics.RecordStoreQuery(store.ctx)
historyResponseRPC := &pb.HistoryRPC{}
historyResponseRPC.RequestId = historyRPCRequest.RequestId
historyResponseRPC.Response = store.FindMessages(historyRPCRequest.Query)
logger = logger.With(zap.Int("messages", len(historyResponseRPC.Response.Messages)))
err = writer.WriteMsg(historyResponseRPC)
if err != nil {
logger.Error("writing response", zap.Error(err), logging.PagingInfo(historyResponseRPC.Response.PagingInfo))
_ = s.Reset()
} else {
logger.Info("response sent")
}
}
func (store *WakuStore) MessageChannel() chan *protocol.Envelope {
return store.MsgC
}
// TODO: queryWithAccounting
// Stop closes the store message channel and removes the protocol stream handler
func (store *WakuStore) Stop() {
store.started = false
if store.MsgC != nil {
close(store.MsgC)
}
if store.msgProvider != nil {
store.msgProvider.Stop()
store.quit <- struct{}{}
}
if store.h != nil {
store.h.RemoveStreamHandler(StoreID_v20beta4)
}
store.wg.Wait()
}
func (store *WakuStore) queryLoop(ctx context.Context, query *pb.HistoryQuery, candidateList []peer.ID) ([]*pb.WakuMessage, error) {
// loops through the candidateList in order and sends the query to each until one of the query gets resolved successfully
// returns the number of retrieved messages, or error if all the requests fail
queryWg := sync.WaitGroup{}
queryWg.Add(len(candidateList))
resultChan := make(chan *pb.HistoryResponse, len(candidateList))
for _, peer := range candidateList {
func() {
defer queryWg.Done()
result, err := store.queryFrom(ctx, query, peer, protocol.GenerateRequestId())
if err == nil {
resultChan <- result
return
}
store.log.Error("resuming history", logging.HostID("peer", peer), zap.Error(err))
}()
}
queryWg.Wait()
close(resultChan)
var messages []*pb.WakuMessage
hasResults := false
for result := range resultChan {
hasResults = true
messages = append(messages, result.Messages...)
}
if hasResults {
return messages, nil
}
return nil, ErrFailedQuery
}
func (store *WakuStore) findLastSeen() (int64, error) {
return store.msgProvider.MostRecentTimestamp()
}
func max(x, y int64) int64 {
if x > y {
return x
}
return y
}
// Resume retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online
// messages are stored in the store node's messages field and in the message db
// the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message
// an offset of 20 second is added to the time window to count for nodes asynchrony
// the history is fetched from one of the peers persisted in the waku store node's peer manager unit
// peerList indicates the list of peers to query from. The history is fetched from the first available peer in this list. Such candidates should be found through a discovery method (to be developed).
// if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from. The history gets fetched successfully if the dialed peer has been online during the queried time window.
// the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string
func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList []peer.ID) (int, error) {
if !store.started {
return 0, errors.New("can't resume: store has not started")
}
lastSeenTime, err := store.findLastSeen()
if err != nil {
return 0, err
}
var offset int64 = int64(20 * time.Nanosecond)
currentTime := store.timesource.Now().UnixNano() + offset
lastSeenTime = max(lastSeenTime-offset, 0)
rpc := &pb.HistoryQuery{
PubsubTopic: pubsubTopic,
StartTime: lastSeenTime,
EndTime: currentTime,
PagingInfo: &pb.PagingInfo{
PageSize: 0,
Direction: pb.PagingInfo_BACKWARD,
},
}
if len(peerList) == 0 {
return -1, ErrNoPeersAvailable
}
messages, err := store.queryLoop(ctx, rpc, peerList)
if err != nil {
store.log.Error("resuming history", zap.Error(err))
return -1, ErrFailedToResumeHistory
}
msgCount := 0
for _, msg := range messages {
if err = store.storeMessage(protocol.NewEnvelope(msg, store.timesource.Now().UnixNano(), pubsubTopic)); err == nil {
msgCount++
}
}
store.log.Info("retrieved messages since the last online time", zap.Int("messages", len(messages)))
return msgCount, nil
}

View File

@ -0,0 +1,203 @@
package timesource
import (
"bytes"
"errors"
"sort"
"sync"
"time"
"github.com/beevik/ntp"
"go.uber.org/zap"
)
const (
// DefaultMaxAllowedFailures defines how many failures will be tolerated.
DefaultMaxAllowedFailures = 1
// FastNTPSyncPeriod period between ntp synchronizations before the first
// successful connection.
FastNTPSyncPeriod = 2 * time.Minute
// SlowNTPSyncPeriod period between ntp synchronizations after the first
// successful connection.
SlowNTPSyncPeriod = 1 * time.Hour
// DefaultRPCTimeout defines write deadline for single ntp server request.
DefaultRPCTimeout = 2 * time.Second
)
// DefaultServers will be resolved to the closest available,
// and with high probability resolved to the different IPs
var DefaultServers = []string{
"0.pool.ntp.org",
"1.pool.ntp.org",
"2.pool.ntp.org",
"3.pool.ntp.org",
}
var errUpdateOffset = errors.New("failed to compute offset")
type ntpQuery func(string, ntp.QueryOptions) (*ntp.Response, error)
type queryResponse struct {
Offset time.Duration
Error error
}
type multiRPCError []error
func (e multiRPCError) Error() string {
var b bytes.Buffer
b.WriteString("RPC failed: ")
more := false
for _, err := range e {
if more {
b.WriteString("; ")
}
b.WriteString(err.Error())
more = true
}
b.WriteString(".")
return b.String()
}
func computeOffset(timeQuery ntpQuery, servers []string, allowedFailures int) (time.Duration, error) {
if len(servers) == 0 {
return 0, nil
}
responses := make(chan queryResponse, len(servers))
for _, server := range servers {
go func(server string) {
response, err := timeQuery(server, ntp.QueryOptions{
Timeout: DefaultRPCTimeout,
})
if err == nil {
err = response.Validate()
}
if err != nil {
responses <- queryResponse{Error: err}
return
}
responses <- queryResponse{Offset: response.ClockOffset}
}(server)
}
var (
rpcErrors multiRPCError
offsets []time.Duration
collected int
)
for response := range responses {
if response.Error != nil {
rpcErrors = append(rpcErrors, response.Error)
} else {
offsets = append(offsets, response.Offset)
}
collected++
if collected == len(servers) {
break
}
}
if lth := len(rpcErrors); lth > allowedFailures {
return 0, rpcErrors
} else if lth == len(servers) {
return 0, rpcErrors
}
sort.SliceStable(offsets, func(i, j int) bool {
return offsets[i] > offsets[j]
})
mid := len(offsets) / 2
if len(offsets)%2 == 0 {
return (offsets[mid-1] + offsets[mid]) / 2, nil
}
return offsets[mid], nil
}
func NewNTPTimesource(ntpServers []string, log *zap.Logger) *NTPTimeSource {
return &NTPTimeSource{
servers: ntpServers,
allowedFailures: DefaultMaxAllowedFailures,
fastNTPSyncPeriod: FastNTPSyncPeriod,
slowNTPSyncPeriod: SlowNTPSyncPeriod,
timeQuery: ntp.QueryWithOptions,
log: log.Named("timesource"),
}
}
// NTPTimeSource provides source of time that tries to be resistant to time skews.
// It does so by periodically querying time offset from ntp servers.
type NTPTimeSource struct {
servers []string
allowedFailures int
fastNTPSyncPeriod time.Duration
slowNTPSyncPeriod time.Duration
timeQuery ntpQuery // for ease of testing
log *zap.Logger
quit chan struct{}
wg sync.WaitGroup
mu sync.RWMutex
latestOffset time.Duration
}
// Now returns time adjusted by latest known offset
func (s *NTPTimeSource) Now() time.Time {
s.mu.RLock()
defer s.mu.RUnlock()
return time.Now().Add(s.latestOffset)
}
func (s *NTPTimeSource) updateOffset() error {
offset, err := computeOffset(s.timeQuery, s.servers, s.allowedFailures)
if err != nil {
s.log.Error("failed to compute offset", zap.Error(err))
return errUpdateOffset
}
s.log.Info("Difference with ntp servers", zap.Duration("offset", offset))
s.mu.Lock()
s.latestOffset = offset
s.mu.Unlock()
return nil
}
// runPeriodically runs periodically the given function based on NTPTimeSource
// synchronization limits (fastNTPSyncPeriod / slowNTPSyncPeriod)
func (s *NTPTimeSource) runPeriodically(fn func() error) error {
var period time.Duration
s.quit = make(chan struct{})
// we try to do it synchronously so that user can have reliable messages right away
s.wg.Add(1)
go func() {
for {
select {
case <-time.After(period):
if err := fn(); err == nil {
period = s.slowNTPSyncPeriod
} else if period != s.slowNTPSyncPeriod {
period = s.fastNTPSyncPeriod
}
case <-s.quit:
s.wg.Done()
return
}
}
}()
return nil
}
// Start runs a goroutine that updates local offset every updatePeriod.
func (s *NTPTimeSource) Start() error {
return s.runPeriodically(s.updateOffset)
}
// Stop goroutine that updates time source.
func (s *NTPTimeSource) Stop() error {
if s.quit == nil {
return nil
}
close(s.quit)
s.wg.Wait()
return nil
}

View File

@ -0,0 +1,9 @@
package timesource
import "time"
type Timesource interface {
Now() time.Time
Start() error
Stop() error
}

View File

@ -0,0 +1,24 @@
package timesource
import "time"
type WallClockTimeSource struct {
}
func NewDefaultClock() *WallClockTimeSource {
return &WallClockTimeSource{}
}
func (t *WallClockTimeSource) Now() time.Time {
return time.Now()
}
func (t *WallClockTimeSource) Start() error {
// Do nothing
return nil
}
func (t *WallClockTimeSource) Stop() error {
// Do nothing
return nil
}

View File

@ -0,0 +1,38 @@
package utils
import (
"time"
"github.com/beevik/ntp"
)
var NTPServer = "pool.ntp.org"
func GetNTPTime() (time.Time, error) {
t, err := ntp.Time(NTPServer)
if err != nil {
return t, err
}
return t, nil
}
func GetNTPMetadata() (*ntp.Response, error) {
options := ntp.QueryOptions{Timeout: 60 * time.Second, TTL: 10}
response, err := ntp.QueryWithOptions(NTPServer, options)
if err != nil {
return nil, err
}
return response, nil
}
func GetTimeOffset() (time.Duration, error) {
options := ntp.QueryOptions{Timeout: 60 * time.Second, TTL: 10}
response, err := ntp.QueryWithOptions(NTPServer, options)
if err != nil {
return 0, err
}
return response.ClockOffset, nil
}

View File

@ -1,14 +1,25 @@
package utils
import "time"
import (
"time"
)
// GetUnixEpochFrom converts a time into a unix timestamp with nanoseconds
func GetUnixEpochFrom(now time.Time) int64 {
return now.UnixNano()
}
// GetUnixEpoch returns the current time in unix timestamp with the integer part
// representing seconds and the decimal part representing subseconds
func GetUnixEpoch() int64 {
return GetUnixEpochFrom(time.Now())
type Timesource interface {
Now() time.Time
}
// GetUnixEpoch returns the current time in unix timestamp with the integer part
// representing seconds and the decimal part representing subseconds.
// Optionally receives a timesource to obtain the time from
func GetUnixEpoch(timesource ...Timesource) int64 {
if len(timesource) != 0 {
return GetUnixEpochFrom(timesource[0].Now())
} else {
return GetUnixEpochFrom(time.Now())
}
}

7
vendor/modules.txt vendored
View File

@ -107,7 +107,7 @@ github.com/anacrolix/utp
# github.com/andybalholm/cascadia v1.2.0
## explicit; go 1.13
github.com/andybalholm/cascadia
# github.com/beevik/ntp v0.2.0
# github.com/beevik/ntp v0.3.0
## explicit
github.com/beevik/ntp
# github.com/benbjohnson/clock v1.3.0
@ -985,12 +985,12 @@ github.com/vacp2p/mvds/state/migrations
github.com/vacp2p/mvds/store
github.com/vacp2p/mvds/store/migrations
github.com/vacp2p/mvds/transport
# github.com/waku-org/go-discover v0.0.0-20221027130446-2f43d5f6c73f
# github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98
## explicit; go 1.15
github.com/waku-org/go-discover/discover
github.com/waku-org/go-discover/discover/v4wire
github.com/waku-org/go-discover/discover/v5wire
# github.com/waku-org/go-waku v0.2.3-0.20221205192014-05e33105c43f
# github.com/waku-org/go-waku v0.2.3-test.0.20221209175307-685142e7b743
## explicit; go 1.18
github.com/waku-org/go-waku/logging
github.com/waku-org/go-waku/waku/persistence
@ -1012,6 +1012,7 @@ github.com/waku-org/go-waku/waku/v2/protocol/rln
github.com/waku-org/go-waku/waku/v2/protocol/rln/contracts
github.com/waku-org/go-waku/waku/v2/protocol/store
github.com/waku-org/go-waku/waku/v2/protocol/swap
github.com/waku-org/go-waku/waku/v2/timesource
github.com/waku-org/go-waku/waku/v2/utils
# github.com/waku-org/go-zerokit-rln v0.1.7-wakuorg
## explicit; go 1.18

View File

@ -11,6 +11,7 @@ import (
gowakuPersistence "github.com/waku-org/go-waku/waku/persistence"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
@ -66,15 +67,19 @@ func NewDBStore(log *zap.Logger, options ...DBOption) (*DBStore, error) {
}
}
err := result.cleanOlderRecords()
return result, nil
}
func (d *DBStore) Start(timesource timesource.Timesource) error {
err := d.cleanOlderRecords()
if err != nil {
return nil, err
return err
}
result.wg.Add(1)
go result.checkForOlderRecords(10 * time.Second) // is 10s okay?
d.wg.Add(1)
go d.checkForOlderRecords(60 * time.Second)
return result, nil
return nil
}
func (d *DBStore) cleanOlderRecords() error {

View File

@ -248,7 +248,7 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s
}
if cfg.EnableStore {
opts = append(opts, node.WithWakuStore(true, true))
opts = append(opts, node.WithWakuStore(true, nil))
dbStore, err := persistence.NewDBStore(logger, persistence.WithDB(appDB), persistence.WithRetentionPolicy(cfg.StoreCapacity, time.Duration(cfg.StoreSeconds)*time.Second))
if err != nil {
return nil, err
@ -1277,7 +1277,7 @@ func (w *Waku) AddStorePeer(address string) (string, error) {
if err != nil {
return "", err
}
return string(*peerID), nil
return string(peerID), nil
}
func (w *Waku) timestamp() int64 {
@ -1340,7 +1340,7 @@ func (w *Waku) AddRelayPeer(address string) (string, error) {
if err != nil {
return "", err
}
return string(*peerID), nil
return string(peerID), nil
}
func (w *Waku) DialPeer(address string) error {