2
0
mirror of https://github.com/status-im/status-go.git synced 2025-01-11 07:07:24 +00:00

feat: add dns discovery to wakuv2 ()

This commit is contained in:
Richard Ramos 2021-11-09 08:22:34 -04:00 committed by GitHub
parent dbac362bc7
commit f47229a466
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 2833 additions and 422 deletions

2
go.mod

@ -49,7 +49,7 @@ require (
github.com/russolsen/same v0.0.0-20160222130632-f089df61f51d // indirect
github.com/russolsen/transit v0.0.0-20180705123435-0794b4c4505a
github.com/status-im/doubleratchet v3.0.0+incompatible
github.com/status-im/go-waku v0.0.0-20211101173358-268767262b60
github.com/status-im/go-waku v0.0.0-20211108125814-49737780ea8d
github.com/status-im/go-waku-rendezvous v0.0.0-20211018070416-a93f3b70c432
github.com/status-im/markdown v0.0.0-20210405121740-32e5a5055fb6
github.com/status-im/migrate/v4 v4.6.2-status.2

5
go.sum

@ -432,6 +432,7 @@ github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2z
github.com/gorilla/mux v1.7.1/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/mux v1.7.3 h1:gnP5JzjVOuiZD07fKKToCAOjS0yOpj/qPETTXCCS6hw=
github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/rpc v1.2.0/go.mod h1:V4h9r+4sF5HnzqbwIez0fKSpANP0zlYd3qR7p36jkTQ=
github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
@ -1206,8 +1207,8 @@ github.com/status-im/go-ethereum v1.10.4-status.3 h1:RF618iSCvqJtXu3ZSg7XNg6MJaS
github.com/status-im/go-ethereum v1.10.4-status.3/go.mod h1:GvIhpdCOgMHI6i5xVPEZOrv/qSMeOFHbZh77AoyZUoE=
github.com/status-im/go-multiaddr-ethv4 v1.2.1 h1:09v9n6426NAojNOvdgegqrAotgffWW/UPDwrpJ85DNE=
github.com/status-im/go-multiaddr-ethv4 v1.2.1/go.mod h1:SlBebvQcSUM5+/R/YfpfMuu5WyraW47XFmIqLYBmlKU=
github.com/status-im/go-waku v0.0.0-20211101173358-268767262b60 h1:ymq5jgtepOPqNbs8yA9g2hkFn5811snImNqNbQzpcDA=
github.com/status-im/go-waku v0.0.0-20211101173358-268767262b60/go.mod h1:A0lI3uZYLKrXiviVkwGgBdT8b9HLcW3U/xUcE/4665k=
github.com/status-im/go-waku v0.0.0-20211108125814-49737780ea8d h1:mWatRmDv+xopBdnd4SYj6I4mrqS0fTgMy2Q0r79LD0w=
github.com/status-im/go-waku v0.0.0-20211108125814-49737780ea8d/go.mod h1:egfHY9n6ATRVAJ8atFPUuBqlECqDcHemfzD7VOgwZv8=
github.com/status-im/go-waku-rendezvous v0.0.0-20211018070416-a93f3b70c432 h1:cbNFU38iimo9fY4B7CdF/fvIF6tNPJIZjBbpfmW2EY4=
github.com/status-im/go-waku-rendezvous v0.0.0-20211018070416-a93f3b70c432/go.mod h1:A8t3i0CUGtXCA0aiLsP7iyikmk/KaD/2XVvNJqGCU20=
github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4/go.mod h1:RZLeN1LMWmRsyYjvAu+I6Dm9QmlDaIIt+Y+4Kd7Tp+Q=

@ -0,0 +1,392 @@
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package dnsdisc
import (
"bytes"
"context"
"fmt"
"math/rand"
"net"
"strings"
"sync"
"time"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
lru "github.com/hashicorp/golang-lru"
"golang.org/x/sync/singleflight"
"golang.org/x/time/rate"
)
// Client discovers nodes by querying DNS servers.
type Client struct {
cfg Config
clock mclock.Clock
entries *lru.Cache
ratelimit *rate.Limiter
singleflight singleflight.Group
}
// Config holds configuration options for the client.
type Config struct {
Timeout time.Duration // timeout used for DNS lookups (default 5s)
RecheckInterval time.Duration // time between tree root update checks (default 30min)
CacheLimit int // maximum number of cached records (default 1000)
RateLimit float64 // maximum DNS requests / second (default 3)
ValidSchemes enr.IdentityScheme // acceptable ENR identity schemes (default enode.ValidSchemes)
Resolver Resolver // the DNS resolver to use (defaults to system DNS)
Logger log.Logger // destination of client log messages (defaults to root logger)
}
// Resolver is a DNS resolver that can query TXT records.
type Resolver interface {
LookupTXT(ctx context.Context, domain string) ([]string, error)
}
func (cfg Config) withDefaults() Config {
const (
defaultTimeout = 5 * time.Second
defaultRecheck = 30 * time.Minute
defaultRateLimit = 3
defaultCache = 1000
)
if cfg.Timeout == 0 {
cfg.Timeout = defaultTimeout
}
if cfg.RecheckInterval == 0 {
cfg.RecheckInterval = defaultRecheck
}
if cfg.CacheLimit == 0 {
cfg.CacheLimit = defaultCache
}
if cfg.RateLimit == 0 {
cfg.RateLimit = defaultRateLimit
}
if cfg.ValidSchemes == nil {
cfg.ValidSchemes = enode.ValidSchemes
}
if cfg.Resolver == nil {
cfg.Resolver = new(net.Resolver)
}
if cfg.Logger == nil {
cfg.Logger = log.Root()
}
return cfg
}
// NewClient creates a client.
func NewClient(cfg Config) *Client {
cfg = cfg.withDefaults()
cache, err := lru.New(cfg.CacheLimit)
if err != nil {
panic(err)
}
rlimit := rate.NewLimiter(rate.Limit(cfg.RateLimit), 10)
return &Client{
cfg: cfg,
entries: cache,
clock: mclock.System{},
ratelimit: rlimit,
}
}
// SyncTree downloads the entire node tree at the given URL.
func (c *Client) SyncTree(url string) (*Tree, error) {
le, err := parseLink(url)
if err != nil {
return nil, fmt.Errorf("invalid enrtree URL: %v", err)
}
ct := newClientTree(c, new(linkCache), le)
t := &Tree{entries: make(map[string]entry)}
if err := ct.syncAll(t.entries); err != nil {
return nil, err
}
t.root = ct.root
return t, nil
}
// NewIterator creates an iterator that visits all nodes at the
// given tree URLs.
func (c *Client) NewIterator(urls ...string) (enode.Iterator, error) {
it := c.newRandomIterator()
for _, url := range urls {
if err := it.addTree(url); err != nil {
return nil, err
}
}
return it, nil
}
// resolveRoot retrieves a root entry via DNS.
func (c *Client) resolveRoot(ctx context.Context, loc *linkEntry) (rootEntry, error) {
e, err, _ := c.singleflight.Do(loc.str, func() (interface{}, error) {
txts, err := c.cfg.Resolver.LookupTXT(ctx, loc.domain)
c.cfg.Logger.Trace("Updating DNS discovery root", "tree", loc.domain, "err", err)
if err != nil {
return rootEntry{}, err
}
for _, txt := range txts {
if strings.HasPrefix(txt, rootPrefix) {
return parseAndVerifyRoot(txt, loc)
}
}
return rootEntry{}, nameError{loc.domain, errNoRoot}
})
return e.(rootEntry), err
}
func parseAndVerifyRoot(txt string, loc *linkEntry) (rootEntry, error) {
e, err := parseRoot(txt)
if err != nil {
return e, err
}
if !e.verifySignature(loc.pubkey) {
return e, entryError{typ: "root", err: errInvalidSig}
}
return e, nil
}
// resolveEntry retrieves an entry from the cache or fetches it from the network
// if it isn't cached.
func (c *Client) resolveEntry(ctx context.Context, domain, hash string) (entry, error) {
// The rate limit always applies, even when the result might be cached. This is
// important because it avoids hot-spinning in consumers of node iterators created on
// this client.
if err := c.ratelimit.Wait(ctx); err != nil {
return nil, err
}
cacheKey := truncateHash(hash)
if e, ok := c.entries.Get(cacheKey); ok {
return e.(entry), nil
}
ei, err, _ := c.singleflight.Do(cacheKey, func() (interface{}, error) {
e, err := c.doResolveEntry(ctx, domain, hash)
if err != nil {
return nil, err
}
c.entries.Add(cacheKey, e)
return e, nil
})
e, _ := ei.(entry)
return e, err
}
// doResolveEntry fetches an entry via DNS.
func (c *Client) doResolveEntry(ctx context.Context, domain, hash string) (entry, error) {
wantHash, err := b32format.DecodeString(hash)
if err != nil {
return nil, fmt.Errorf("invalid base32 hash")
}
name := hash + "." + domain
txts, err := c.cfg.Resolver.LookupTXT(ctx, hash+"."+domain)
c.cfg.Logger.Trace("DNS discovery lookup", "name", name, "err", err)
if err != nil {
return nil, err
}
for _, txt := range txts {
e, err := parseEntry(txt, c.cfg.ValidSchemes)
if err == errUnknownEntry {
continue
}
if !bytes.HasPrefix(crypto.Keccak256([]byte(txt)), wantHash) {
err = nameError{name, errHashMismatch}
} else if err != nil {
err = nameError{name, err}
}
return e, err
}
return nil, nameError{name, errNoEntry}
}
// randomIterator traverses a set of trees and returns nodes found in them.
type randomIterator struct {
cur *enode.Node
ctx context.Context
cancelFn context.CancelFunc
c *Client
mu sync.Mutex
lc linkCache // tracks tree dependencies
trees map[string]*clientTree // all trees
// buffers for syncableTrees
syncableList []*clientTree
disabledList []*clientTree
}
func (c *Client) newRandomIterator() *randomIterator {
ctx, cancel := context.WithCancel(context.Background())
return &randomIterator{
c: c,
ctx: ctx,
cancelFn: cancel,
trees: make(map[string]*clientTree),
}
}
// Node returns the current node.
func (it *randomIterator) Node() *enode.Node {
return it.cur
}
// Close closes the iterator.
func (it *randomIterator) Close() {
it.cancelFn()
it.mu.Lock()
defer it.mu.Unlock()
it.trees = nil
}
// Next moves the iterator to the next node.
func (it *randomIterator) Next() bool {
it.cur = it.nextNode()
return it.cur != nil
}
// addTree adds an enrtree:// URL to the iterator.
func (it *randomIterator) addTree(url string) error {
le, err := parseLink(url)
if err != nil {
return fmt.Errorf("invalid enrtree URL: %v", err)
}
it.lc.addLink("", le.str)
return nil
}
// nextNode syncs random tree entries until it finds a node.
func (it *randomIterator) nextNode() *enode.Node {
for {
ct := it.pickTree()
if ct == nil {
return nil
}
n, err := ct.syncRandom(it.ctx)
if err != nil {
if err == it.ctx.Err() {
return nil // context canceled.
}
it.c.cfg.Logger.Debug("Error in DNS random node sync", "tree", ct.loc.domain, "err", err)
continue
}
if n != nil {
return n
}
}
}
// pickTree returns a random tree to sync from.
func (it *randomIterator) pickTree() *clientTree {
it.mu.Lock()
defer it.mu.Unlock()
// First check if iterator was closed.
// Need to do this here to avoid nil map access in rebuildTrees.
if it.trees == nil {
return nil
}
// Rebuild the trees map if any links have changed.
if it.lc.changed {
it.rebuildTrees()
it.lc.changed = false
}
for {
canSync, trees := it.syncableTrees()
switch {
case canSync:
// Pick a random tree.
return trees[rand.Intn(len(trees))]
case len(trees) > 0:
// No sync action can be performed on any tree right now. The only meaningful
// thing to do is waiting for any root record to get updated.
if !it.waitForRootUpdates(trees) {
// Iterator was closed while waiting.
return nil
}
default:
// There are no trees left, the iterator was closed.
return nil
}
}
}
// syncableTrees finds trees on which any meaningful sync action can be performed.
func (it *randomIterator) syncableTrees() (canSync bool, trees []*clientTree) {
// Resize tree lists.
it.syncableList = it.syncableList[:0]
it.disabledList = it.disabledList[:0]
// Partition them into the two lists.
for _, ct := range it.trees {
if ct.canSyncRandom() {
it.syncableList = append(it.syncableList, ct)
} else {
it.disabledList = append(it.disabledList, ct)
}
}
if len(it.syncableList) > 0 {
return true, it.syncableList
}
return false, it.disabledList
}
// waitForRootUpdates waits for the closest scheduled root check time on the given trees.
func (it *randomIterator) waitForRootUpdates(trees []*clientTree) bool {
var minTree *clientTree
var nextCheck mclock.AbsTime
for _, ct := range trees {
check := ct.nextScheduledRootCheck()
if minTree == nil || check < nextCheck {
minTree = ct
nextCheck = check
}
}
sleep := nextCheck.Sub(it.c.clock.Now())
it.c.cfg.Logger.Debug("DNS iterator waiting for root updates", "sleep", sleep, "tree", minTree.loc.domain)
timeout := it.c.clock.NewTimer(sleep)
defer timeout.Stop()
select {
case <-timeout.C():
return true
case <-it.ctx.Done():
return false // Iterator was closed.
}
}
// rebuildTrees rebuilds the 'trees' map.
func (it *randomIterator) rebuildTrees() {
// Delete removed trees.
for loc := range it.trees {
if !it.lc.isReferenced(loc) {
delete(it.trees, loc)
}
}
// Add new trees.
for loc := range it.lc.backrefs {
if it.trees[loc] == nil {
link, _ := parseLink(linkPrefix + loc)
it.trees[loc] = newClientTree(it.c, &it.lc, link)
}
}
}

@ -0,0 +1,18 @@
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// Package dnsdisc implements node discovery via DNS (EIP-1459).
package dnsdisc

@ -0,0 +1,63 @@
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package dnsdisc
import (
"errors"
"fmt"
)
// Entry parse errors.
var (
errUnknownEntry = errors.New("unknown entry type")
errNoPubkey = errors.New("missing public key")
errBadPubkey = errors.New("invalid public key")
errInvalidENR = errors.New("invalid node record")
errInvalidChild = errors.New("invalid child hash")
errInvalidSig = errors.New("invalid base64 signature")
errSyntax = errors.New("invalid syntax")
)
// Resolver/sync errors
var (
errNoRoot = errors.New("no valid root found")
errNoEntry = errors.New("no valid tree entry found")
errHashMismatch = errors.New("hash mismatch")
errENRInLinkTree = errors.New("enr entry in link tree")
errLinkInENRTree = errors.New("link entry in ENR tree")
)
type nameError struct {
name string
err error
}
func (err nameError) Error() string {
if ee, ok := err.err.(entryError); ok {
return fmt.Sprintf("invalid %s entry at %s: %v", ee.typ, err.name, ee.err)
}
return err.name + ": " + err.err.Error()
}
type entryError struct {
typ string
err error
}
func (err entryError) Error() string {
return fmt.Sprintf("invalid %s entry: %v", err.typ, err.err)
}

@ -0,0 +1,329 @@
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package dnsdisc
import (
"context"
"math/rand"
"time"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/p2p/enode"
)
// This is the number of consecutive leaf requests that may fail before
// we consider re-resolving the tree root.
const rootRecheckFailCount = 5
// clientTree is a full tree being synced.
type clientTree struct {
c *Client
loc *linkEntry // link to this tree
lastRootCheck mclock.AbsTime // last revalidation of root
leafFailCount int
rootFailCount int
root *rootEntry
enrs *subtreeSync
links *subtreeSync
lc *linkCache // tracks all links between all trees
curLinks map[string]struct{} // links contained in this tree
linkGCRoot string // root on which last link GC has run
}
func newClientTree(c *Client, lc *linkCache, loc *linkEntry) *clientTree {
return &clientTree{c: c, lc: lc, loc: loc}
}
// syncAll retrieves all entries of the tree.
func (ct *clientTree) syncAll(dest map[string]entry) error {
if err := ct.updateRoot(context.Background()); err != nil {
return err
}
if err := ct.links.resolveAll(dest); err != nil {
return err
}
if err := ct.enrs.resolveAll(dest); err != nil {
return err
}
return nil
}
// syncRandom retrieves a single entry of the tree. The Node return value
// is non-nil if the entry was a node.
func (ct *clientTree) syncRandom(ctx context.Context) (n *enode.Node, err error) {
if ct.rootUpdateDue() {
if err := ct.updateRoot(ctx); err != nil {
return nil, err
}
}
// Update fail counter for leaf request errors.
defer func() {
if err != nil {
ct.leafFailCount++
}
}()
// Link tree sync has priority, run it to completion before syncing ENRs.
if !ct.links.done() {
err := ct.syncNextLink(ctx)
return nil, err
}
ct.gcLinks()
// Sync next random entry in ENR tree. Once every node has been visited, we simply
// start over. This is fine because entries are cached internally by the client LRU
// also by DNS resolvers.
if ct.enrs.done() {
ct.enrs = newSubtreeSync(ct.c, ct.loc, ct.root.eroot, false)
}
return ct.syncNextRandomENR(ctx)
}
// canSyncRandom checks if any meaningful action can be performed by syncRandom.
func (ct *clientTree) canSyncRandom() bool {
// Note: the check for non-zero leaf count is very important here.
// If we're done syncing all nodes, and no leaves were found, the tree
// is empty and we can't use it for sync.
return ct.rootUpdateDue() || !ct.links.done() || !ct.enrs.done() || ct.enrs.leaves != 0
}
// gcLinks removes outdated links from the global link cache. GC runs once
// when the link sync finishes.
func (ct *clientTree) gcLinks() {
if !ct.links.done() || ct.root.lroot == ct.linkGCRoot {
return
}
ct.lc.resetLinks(ct.loc.str, ct.curLinks)
ct.linkGCRoot = ct.root.lroot
}
func (ct *clientTree) syncNextLink(ctx context.Context) error {
hash := ct.links.missing[0]
e, err := ct.links.resolveNext(ctx, hash)
if err != nil {
return err
}
ct.links.missing = ct.links.missing[1:]
if dest, ok := e.(*linkEntry); ok {
ct.lc.addLink(ct.loc.str, dest.str)
ct.curLinks[dest.str] = struct{}{}
}
return nil
}
func (ct *clientTree) syncNextRandomENR(ctx context.Context) (*enode.Node, error) {
index := rand.Intn(len(ct.enrs.missing))
hash := ct.enrs.missing[index]
e, err := ct.enrs.resolveNext(ctx, hash)
if err != nil {
return nil, err
}
ct.enrs.missing = removeHash(ct.enrs.missing, index)
if ee, ok := e.(*enrEntry); ok {
return ee.node, nil
}
return nil, nil
}
func (ct *clientTree) String() string {
return ct.loc.String()
}
// removeHash removes the element at index from h.
func removeHash(h []string, index int) []string {
if len(h) == 1 {
return nil
}
last := len(h) - 1
if index < last {
h[index] = h[last]
h[last] = ""
}
return h[:last]
}
// updateRoot ensures that the given tree has an up-to-date root.
func (ct *clientTree) updateRoot(ctx context.Context) error {
if !ct.slowdownRootUpdate(ctx) {
return ctx.Err()
}
ct.lastRootCheck = ct.c.clock.Now()
ctx, cancel := context.WithTimeout(ctx, ct.c.cfg.Timeout)
defer cancel()
root, err := ct.c.resolveRoot(ctx, ct.loc)
if err != nil {
ct.rootFailCount++
return err
}
ct.root = &root
ct.rootFailCount = 0
ct.leafFailCount = 0
// Invalidate subtrees if changed.
if ct.links == nil || root.lroot != ct.links.root {
ct.links = newSubtreeSync(ct.c, ct.loc, root.lroot, true)
ct.curLinks = make(map[string]struct{})
}
if ct.enrs == nil || root.eroot != ct.enrs.root {
ct.enrs = newSubtreeSync(ct.c, ct.loc, root.eroot, false)
}
return nil
}
// rootUpdateDue returns true when a root update is needed.
func (ct *clientTree) rootUpdateDue() bool {
tooManyFailures := ct.leafFailCount > rootRecheckFailCount
scheduledCheck := ct.c.clock.Now() >= ct.nextScheduledRootCheck()
return ct.root == nil || tooManyFailures || scheduledCheck
}
func (ct *clientTree) nextScheduledRootCheck() mclock.AbsTime {
return ct.lastRootCheck.Add(ct.c.cfg.RecheckInterval)
}
// slowdownRootUpdate applies a delay to root resolution if is tried
// too frequently. This avoids busy polling when the client is offline.
// Returns true if the timeout passed, false if sync was canceled.
func (ct *clientTree) slowdownRootUpdate(ctx context.Context) bool {
var delay time.Duration
switch {
case ct.rootFailCount > 20:
delay = 10 * time.Second
case ct.rootFailCount > 5:
delay = 5 * time.Second
default:
return true
}
timeout := ct.c.clock.NewTimer(delay)
defer timeout.Stop()
select {
case <-timeout.C():
return true
case <-ctx.Done():
return false
}
}
// subtreeSync is the sync of an ENR or link subtree.
type subtreeSync struct {
c *Client
loc *linkEntry
root string
missing []string // missing tree node hashes
link bool // true if this sync is for the link tree
leaves int // counter of synced leaves
}
func newSubtreeSync(c *Client, loc *linkEntry, root string, link bool) *subtreeSync {
return &subtreeSync{c, loc, root, []string{root}, link, 0}
}
func (ts *subtreeSync) done() bool {
return len(ts.missing) == 0
}
func (ts *subtreeSync) resolveAll(dest map[string]entry) error {
for !ts.done() {
hash := ts.missing[0]
ctx, cancel := context.WithTimeout(context.Background(), ts.c.cfg.Timeout)
e, err := ts.resolveNext(ctx, hash)
cancel()
if err != nil {
return err
}
dest[hash] = e
ts.missing = ts.missing[1:]
}
return nil
}
func (ts *subtreeSync) resolveNext(ctx context.Context, hash string) (entry, error) {
e, err := ts.c.resolveEntry(ctx, ts.loc.domain, hash)
if err != nil {
return nil, err
}
switch e := e.(type) {
case *enrEntry:
if ts.link {
return nil, errENRInLinkTree
}
ts.leaves++
case *linkEntry:
if !ts.link {
return nil, errLinkInENRTree
}
ts.leaves++
case *branchEntry:
ts.missing = append(ts.missing, e.children...)
}
return e, nil
}
// linkCache tracks links between trees.
type linkCache struct {
backrefs map[string]map[string]struct{}
changed bool
}
func (lc *linkCache) isReferenced(r string) bool {
return len(lc.backrefs[r]) != 0
}
func (lc *linkCache) addLink(from, to string) {
if _, ok := lc.backrefs[to][from]; ok {
return
}
if lc.backrefs == nil {
lc.backrefs = make(map[string]map[string]struct{})
}
if _, ok := lc.backrefs[to]; !ok {
lc.backrefs[to] = make(map[string]struct{})
}
lc.backrefs[to][from] = struct{}{}
lc.changed = true
}
// resetLinks clears all links of the given tree.
func (lc *linkCache) resetLinks(from string, keep map[string]struct{}) {
stk := []string{from}
for len(stk) > 0 {
item := stk[len(stk)-1]
stk = stk[:len(stk)-1]
for r, refs := range lc.backrefs {
if _, ok := keep[r]; ok {
continue
}
if _, ok := refs[item]; !ok {
continue
}
lc.changed = true
delete(refs, item)
if len(refs) == 0 {
delete(lc.backrefs, r)
stk = append(stk, r)
}
}
}
}

@ -0,0 +1,423 @@
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package dnsdisc
import (
"bytes"
"crypto/ecdsa"
"encoding/base32"
"encoding/base64"
"fmt"
"io"
"sort"
"strings"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/ethereum/go-ethereum/rlp"
"golang.org/x/crypto/sha3"
)
// Tree is a merkle tree of node records.
type Tree struct {
root *rootEntry
entries map[string]entry
}
// Sign signs the tree with the given private key and sets the sequence number.
func (t *Tree) Sign(key *ecdsa.PrivateKey, domain string) (url string, err error) {
root := *t.root
sig, err := crypto.Sign(root.sigHash(), key)
if err != nil {
return "", err
}
root.sig = sig
t.root = &root
link := newLinkEntry(domain, &key.PublicKey)
return link.String(), nil
}
// SetSignature verifies the given signature and assigns it as the tree's current
// signature if valid.
func (t *Tree) SetSignature(pubkey *ecdsa.PublicKey, signature string) error {
sig, err := b64format.DecodeString(signature)
if err != nil || len(sig) != crypto.SignatureLength {
return errInvalidSig
}
root := *t.root
root.sig = sig
if !root.verifySignature(pubkey) {
return errInvalidSig
}
t.root = &root
return nil
}
// Seq returns the sequence number of the tree.
func (t *Tree) Seq() uint {
return t.root.seq
}
// Signature returns the signature of the tree.
func (t *Tree) Signature() string {
return b64format.EncodeToString(t.root.sig)
}
// ToTXT returns all DNS TXT records required for the tree.
func (t *Tree) ToTXT(domain string) map[string]string {
records := map[string]string{domain: t.root.String()}
for _, e := range t.entries {
sd := subdomain(e)
if domain != "" {
sd = sd + "." + domain
}
records[sd] = e.String()
}
return records
}
// Links returns all links contained in the tree.
func (t *Tree) Links() []string {
var links []string
for _, e := range t.entries {
if le, ok := e.(*linkEntry); ok {
links = append(links, le.String())
}
}
return links
}
// Nodes returns all nodes contained in the tree.
func (t *Tree) Nodes() []*enode.Node {
var nodes []*enode.Node
for _, e := range t.entries {
if ee, ok := e.(*enrEntry); ok {
nodes = append(nodes, ee.node)
}
}
return nodes
}
/*
We want to keep the UDP size below 512 bytes. The UDP size is roughly:
UDP length = 8 + UDP payload length ( 229 )
UPD Payload length:
- dns.id 2
- dns.flags 2
- dns.count.queries 2
- dns.count.answers 2
- dns.count.auth_rr 2
- dns.count.add_rr 2
- queries (query-size + 6)
- answers :
- dns.resp.name 2
- dns.resp.type 2
- dns.resp.class 2
- dns.resp.ttl 4
- dns.resp.len 2
- dns.txt.length 1
- dns.txt resp_data_size
So the total size is roughly a fixed overhead of `39`, and the size of the
query (domain name) and response.
The query size is, for example, FVY6INQ6LZ33WLCHO3BPR3FH6Y.snap.mainnet.ethdisco.net (52)
We also have some static data in the response, such as `enrtree-branch:`, and potentially
splitting the response up with `" "`, leaving us with a size of roughly `400` that we need
to stay below.
The number `370` is used to have some margin for extra overhead (for example, the dns query
may be larger - more subdomains).
*/
const (
hashAbbrevSize = 1 + 16*13/8 // Size of an encoded hash (plus comma)
maxChildren = 370 / hashAbbrevSize // 13 children
minHashLength = 12
)
// MakeTree creates a tree containing the given nodes and links.
func MakeTree(seq uint, nodes []*enode.Node, links []string) (*Tree, error) {
// Sort records by ID and ensure all nodes have a valid record.
records := make([]*enode.Node, len(nodes))
copy(records, nodes)
sortByID(records)
for _, n := range records {
if len(n.Record().Signature()) == 0 {
return nil, fmt.Errorf("can't add node %v: unsigned node record", n.ID())
}
}
// Create the leaf list.
enrEntries := make([]entry, len(records))
for i, r := range records {
enrEntries[i] = &enrEntry{r}
}
linkEntries := make([]entry, len(links))
for i, l := range links {
le, err := parseLink(l)
if err != nil {
return nil, err
}
linkEntries[i] = le
}
// Create intermediate nodes.
t := &Tree{entries: make(map[string]entry)}
eroot := t.build(enrEntries)
t.entries[subdomain(eroot)] = eroot
lroot := t.build(linkEntries)
t.entries[subdomain(lroot)] = lroot
t.root = &rootEntry{seq: seq, eroot: subdomain(eroot), lroot: subdomain(lroot)}
return t, nil
}
func (t *Tree) build(entries []entry) entry {
if len(entries) == 1 {
return entries[0]
}
if len(entries) <= maxChildren {
hashes := make([]string, len(entries))
for i, e := range entries {
hashes[i] = subdomain(e)
t.entries[hashes[i]] = e
}
return &branchEntry{hashes}
}
var subtrees []entry
for len(entries) > 0 {
n := maxChildren
if len(entries) < n {
n = len(entries)
}
sub := t.build(entries[:n])
entries = entries[n:]
subtrees = append(subtrees, sub)
t.entries[subdomain(sub)] = sub
}
return t.build(subtrees)
}
func sortByID(nodes []*enode.Node) []*enode.Node {
sort.Slice(nodes, func(i, j int) bool {
return bytes.Compare(nodes[i].ID().Bytes(), nodes[j].ID().Bytes()) < 0
})
return nodes
}
// Entry Types
type entry interface {
fmt.Stringer
}
type (
rootEntry struct {
eroot string
lroot string
seq uint
sig []byte
}
branchEntry struct {
children []string
}
enrEntry struct {
node *enode.Node
}
linkEntry struct {
str string
domain string
pubkey *ecdsa.PublicKey
}
)
// Entry Encoding
var (
b32format = base32.StdEncoding.WithPadding(base32.NoPadding)
b64format = base64.RawURLEncoding
)
const (
rootPrefix = "enrtree-root:v1"
linkPrefix = "enrtree://"
branchPrefix = "enrtree-branch:"
enrPrefix = "enr:"
)
func subdomain(e entry) string {
h := sha3.NewLegacyKeccak256()
io.WriteString(h, e.String())
return b32format.EncodeToString(h.Sum(nil)[:16])
}
func (e *rootEntry) String() string {
return fmt.Sprintf(rootPrefix+" e=%s l=%s seq=%d sig=%s", e.eroot, e.lroot, e.seq, b64format.EncodeToString(e.sig))
}
func (e *rootEntry) sigHash() []byte {
h := sha3.NewLegacyKeccak256()
fmt.Fprintf(h, rootPrefix+" e=%s l=%s seq=%d", e.eroot, e.lroot, e.seq)
return h.Sum(nil)
}
func (e *rootEntry) verifySignature(pubkey *ecdsa.PublicKey) bool {
sig := e.sig[:crypto.RecoveryIDOffset] // remove recovery id
enckey := crypto.FromECDSAPub(pubkey)
return crypto.VerifySignature(enckey, e.sigHash(), sig)
}
func (e *branchEntry) String() string {
return branchPrefix + strings.Join(e.children, ",")
}
func (e *enrEntry) String() string {
return e.node.String()
}
func (e *linkEntry) String() string {
return linkPrefix + e.str
}
func newLinkEntry(domain string, pubkey *ecdsa.PublicKey) *linkEntry {
key := b32format.EncodeToString(crypto.CompressPubkey(pubkey))
str := key + "@" + domain
return &linkEntry{str, domain, pubkey}
}
// Entry Parsing
func parseEntry(e string, validSchemes enr.IdentityScheme) (entry, error) {
switch {
case strings.HasPrefix(e, linkPrefix):
return parseLinkEntry(e)
case strings.HasPrefix(e, branchPrefix):
return parseBranch(e)
case strings.HasPrefix(e, enrPrefix):
return parseENR(e, validSchemes)
default:
return nil, errUnknownEntry
}
}
func parseRoot(e string) (rootEntry, error) {
var eroot, lroot, sig string
var seq uint
if _, err := fmt.Sscanf(e, rootPrefix+" e=%s l=%s seq=%d sig=%s", &eroot, &lroot, &seq, &sig); err != nil {
return rootEntry{}, entryError{"root", errSyntax}
}
if !isValidHash(eroot) || !isValidHash(lroot) {
return rootEntry{}, entryError{"root", errInvalidChild}
}
sigb, err := b64format.DecodeString(sig)
if err != nil || len(sigb) != crypto.SignatureLength {
return rootEntry{}, entryError{"root", errInvalidSig}
}
return rootEntry{eroot, lroot, seq, sigb}, nil
}
func parseLinkEntry(e string) (entry, error) {
le, err := parseLink(e)
if err != nil {
return nil, err
}
return le, nil
}
func parseLink(e string) (*linkEntry, error) {
if !strings.HasPrefix(e, linkPrefix) {
return nil, fmt.Errorf("wrong/missing scheme 'enrtree' in URL")
}
e = e[len(linkPrefix):]
pos := strings.IndexByte(e, '@')
if pos == -1 {
return nil, entryError{"link", errNoPubkey}
}
keystring, domain := e[:pos], e[pos+1:]
keybytes, err := b32format.DecodeString(keystring)
if err != nil {
return nil, entryError{"link", errBadPubkey}
}
key, err := crypto.DecompressPubkey(keybytes)
if err != nil {
return nil, entryError{"link", errBadPubkey}
}
return &linkEntry{e, domain, key}, nil
}
func parseBranch(e string) (entry, error) {
e = e[len(branchPrefix):]
if e == "" {
return &branchEntry{}, nil // empty entry is OK
}
hashes := make([]string, 0, strings.Count(e, ","))
for _, c := range strings.Split(e, ",") {
if !isValidHash(c) {
return nil, entryError{"branch", errInvalidChild}
}
hashes = append(hashes, c)
}
return &branchEntry{hashes}, nil
}
func parseENR(e string, validSchemes enr.IdentityScheme) (entry, error) {
e = e[len(enrPrefix):]
enc, err := b64format.DecodeString(e)
if err != nil {
return nil, entryError{"enr", errInvalidENR}
}
var rec enr.Record
if err := rlp.DecodeBytes(enc, &rec); err != nil {
return nil, entryError{"enr", err}
}
n, err := enode.New(validSchemes, &rec)
if err != nil {
return nil, entryError{"enr", err}
}
return &enrEntry{n}, nil
}
func isValidHash(s string) bool {
dlen := b32format.DecodedLen(len(s))
if dlen < minHashLength || dlen > 32 || strings.ContainsAny(s, "\n\r") {
return false
}
buf := make([]byte, 32)
_, err := b32format.Decode(buf, []byte(s))
return err == nil
}
// truncateHash truncates the given base32 hash string to the minimum acceptable length.
func truncateHash(hash string) string {
maxLen := b32format.EncodedLen(minHashLength)
if len(hash) < maxLen {
panic(fmt.Errorf("dnsdisc: hash %q is too short", hash))
}
return hash[:maxLen]
}
// URL encoding
// ParseURL parses an enrtree:// URL and returns its components.
func ParseURL(url string) (domain string, pubkey *ecdsa.PublicKey, err error) {
le, err := parseLink(url)
if err != nil {
return "", nil, err
}
return le.domain, le.pubkey, nil
}

@ -3,8 +3,10 @@ package persistence
import (
"database/sql"
"log"
"time"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/status-im/go-waku/waku/v2/utils"
)
type MessageProvider interface {
@ -17,6 +19,9 @@ type MessageProvider interface {
type DBStore struct {
MessageProvider
db *sql.DB
maxMessages int
maxDuration time.Duration
}
type StoredMessage struct {
@ -49,17 +54,33 @@ func WithDriver(driverName string, datasourceName string) DBOption {
}
}
func WithRetentionPolicy(maxMessages int, maxDuration time.Duration) DBOption {
return func(d *DBStore) error {
d.maxDuration = maxDuration
d.maxMessages = maxMessages
return nil
}
}
// Creates a new DB store using the db specified via options.
// It will create a messages table if it does not exist
func NewDBStore(opt DBOption) (*DBStore, error) {
// It will create a messages table if it does not exist and
// clean up records according to the retention policy used
func NewDBStore(options ...DBOption) (*DBStore, error) {
result := new(DBStore)
err := opt(result)
for _, opt := range options {
err := opt(result)
if err != nil {
return nil, err
}
}
err := result.createTable()
if err != nil {
return nil, err
}
err = result.createTable()
err = result.cleanOlderRecords()
if err != nil {
return nil, err
}
@ -84,6 +105,28 @@ func (d *DBStore) createTable() error {
return nil
}
func (d *DBStore) cleanOlderRecords() error {
// Delete older messages
if d.maxDuration > 0 {
sqlStmt := `DELETE FROM message WHERE receiverTimestamp < ?`
_, err := d.db.Exec(sqlStmt, utils.GetUnixEpochFrom(time.Now().Add(-d.maxDuration)))
if err != nil {
return err
}
}
// Limit number of records to a max N
if d.maxMessages > 0 {
sqlStmt := `DELETE FROM message WHERE id IN (SELECT id FROM message ORDER BY receiverTimestamp DESC LIMIT -1 OFFSET 5)`
_, err := d.db.Exec(sqlStmt, d.maxMessages)
if err != nil {
return err
}
}
return nil
}
// Closes a DB connection
func (d *DBStore) Stop() {
d.db.Close()

@ -0,0 +1,65 @@
package discovery
import (
"context"
"fmt"
"github.com/ethereum/go-ethereum/p2p/dnsdisc"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
)
type DnsDiscoveryParameters struct {
nameserver string
}
type DnsDiscoveryOption func(*DnsDiscoveryParameters)
// WithMultiaddress is a WakuNodeOption that configures libp2p to listen on a list of multiaddresses
func WithNameserver(nameserver string) DnsDiscoveryOption {
return func(params *DnsDiscoveryParameters) {
params.nameserver = nameserver
}
}
// RetrieveNodes returns a list of multiaddress given a url to a DNS discoverable
// ENR tree
func RetrieveNodes(ctx context.Context, url string, opts ...DnsDiscoveryOption) ([]ma.Multiaddr, error) {
var multiAddrs []ma.Multiaddr
params := new(DnsDiscoveryParameters)
for _, opt := range opts {
opt(params)
}
client := dnsdisc.NewClient(dnsdisc.Config{
Resolver: GetResolver(ctx, params.nameserver),
})
tree, err := client.SyncTree(url)
if err != nil {
return nil, err
}
for _, node := range tree.Nodes() {
m, err := EnodeToMultiAddr(node)
if err != nil {
return nil, err
}
multiAddrs = append(multiAddrs, m)
}
return multiAddrs, nil
}
func EnodeToMultiAddr(node *enode.Node) (ma.Multiaddr, error) {
peerID, err := peer.IDFromPublicKey(&ECDSAPublicKey{node.Pubkey()})
if err != nil {
return nil, err
}
return ma.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d/p2p/%s", node.IP(), node.TCP(), peerID))
}

@ -0,0 +1,79 @@
package discovery
import (
"crypto/ecdsa"
"crypto/subtle"
"encoding/asn1"
"errors"
"math/big"
ethcrypto "github.com/ethereum/go-ethereum/crypto"
"github.com/libp2p/go-libp2p-core/crypto"
pb "github.com/libp2p/go-libp2p-core/crypto/pb"
"github.com/minio/sha256-simd"
)
// Taken from: https://github.com/libp2p/go-libp2p-core/blob/094b0d3f8ba2934339cb35e1a875b11ab6d08839/crypto/ecdsa.go as
// they don't provide a way to set the key
var ErrNilSig = errors.New("sig is nil")
// ECDSASig holds the r and s values of an ECDSA signature
type ECDSASig struct {
R, S *big.Int
}
// ECDSAPublicKey is an implementation of an ECDSA public key
type ECDSAPublicKey struct {
pub *ecdsa.PublicKey
}
// Type returns the key type
func (ePub *ECDSAPublicKey) Type() pb.KeyType {
return pb.KeyType_Secp256k1
}
// Raw returns x509 bytes from a public key
func (ePub *ECDSAPublicKey) Raw() ([]byte, error) {
return ethcrypto.CompressPubkey(ePub.pub), nil
}
// Bytes returns the public key as protobuf bytes
func (ePub *ECDSAPublicKey) Bytes() ([]byte, error) {
return crypto.MarshalPublicKey(ePub)
}
// Equals compares to public keys
func (ePub *ECDSAPublicKey) Equals(o crypto.Key) bool {
return basicEquals(ePub, o)
}
// Verify compares data to a signature
func (ePub *ECDSAPublicKey) Verify(data, sigBytes []byte) (bool, error) {
sig := new(ECDSASig)
if _, err := asn1.Unmarshal(sigBytes, sig); err != nil {
return false, err
}
if sig == nil {
return false, ErrNilSig
}
hash := sha256.Sum256(data)
return ecdsa.Verify(ePub.pub, hash[:], sig.R, sig.S), nil
}
func basicEquals(k1, k2 crypto.Key) bool {
if k1.Type() != k2.Type() {
return false
}
a, err := k1.Raw()
if err != nil {
return false
}
b, err := k2.Raw()
if err != nil {
return false
}
return subtle.ConstantTimeCompare(a, b) == 1
}

@ -0,0 +1,22 @@
package discovery
import (
"context"
"net"
)
// GetResolver returns a *net.Resolver object using a custom nameserver, or
// the default system resolver if no nameserver is specified
func GetResolver(ctx context.Context, nameserver string) *net.Resolver {
if nameserver == "" {
return net.DefaultResolver
}
return &net.Resolver{
PreferGo: true,
Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
d := net.Dialer{}
return d.DialContext(ctx, network, net.JoinHostPort(nameserver, "53"))
},
}
}

@ -48,7 +48,7 @@ type KeyInfo struct {
}
// Encodes a payload depending on the version parameter.
// Encode encodes a payload depending on the version parameter.
// 0 for raw unencrypted data, and 1 for using WakuV1 encoding.
func (payload Payload) Encode(version uint32) ([]byte, error) {
switch version {
@ -105,7 +105,7 @@ func EncodeWakuMessage(message *pb.WakuMessage, keyInfo *KeyInfo) error {
return nil
}
// Decodes a WakuMessage depending on the version parameter.
// DecodePayload decodes a WakuMessage depending on the version parameter.
// 0 for raw unencrypted data, and 1 for using WakuV1 decoding
func DecodePayload(message *pb.WakuMessage, keyInfo *KeyInfo) (*DecodedPayload, error) {
switch message.Version {

@ -2,7 +2,6 @@ package node
import (
"context"
"errors"
"fmt"
"time"
@ -22,10 +21,8 @@ import (
rendezvous "github.com/status-im/go-waku-rendezvous"
v2 "github.com/status-im/go-waku/waku/v2"
"github.com/status-im/go-waku/waku/v2/metrics"
"github.com/status-im/go-waku/waku/v2/protocol"
"github.com/status-im/go-waku/waku/v2/protocol/filter"
"github.com/status-im/go-waku/waku/v2/protocol/lightpush"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/status-im/go-waku/waku/v2/protocol/relay"
"github.com/status-im/go-waku/waku/v2/protocol/store"
"github.com/status-im/go-waku/waku/v2/utils"
@ -48,8 +45,6 @@ type WakuNode struct {
bcaster v2.Broadcaster
filters filter.Filters
connectionNotif ConnectionNotifier
protocolEventSub event.Subscription
identificationEventSub event.Subscription
@ -112,8 +107,8 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) {
return nil, err
}
if params.connStatusChan != nil {
w.connStatusChan = params.connStatusChan
if params.connStatusC != nil {
w.connStatusChan = params.connStatusC
}
w.connectionNotif = NewConnectionNotifier(ctx, host)
@ -133,17 +128,13 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) {
}
func (w *WakuNode) Start() error {
w.store = store.NewWakuStore(w.opts.messageProvider)
w.store = store.NewWakuStore(w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration)
if w.opts.enableStore {
w.startStore()
}
if w.opts.enableFilter {
w.filters = make(filter.Filters)
err := w.mountFilter()
if err != nil {
return err
}
w.filter = filter.NewWakuFilter(w.ctx, w.host, w.opts.isFilterFullNode)
}
if w.opts.enableRendezvous {
@ -201,10 +192,6 @@ func (w *WakuNode) Stop() {
if w.filter != nil {
w.filter.Stop()
for _, filter := range w.filters {
close(filter.Chan)
}
w.filters = nil
}
w.relay.Stop()
@ -266,18 +253,6 @@ func (w *WakuNode) mountRelay(opts ...pubsub.Option) error {
return err
}
func (w *WakuNode) mountFilter() error {
filterHandler := func(requestId string, msg pb.MessagePush) {
for _, message := range msg.Messages {
w.filters.Notify(message, requestId) // Trigger filter handlers on a light node
}
}
w.filter = filter.NewWakuFilter(w.ctx, w.host, w.opts.isFilterFullNode, filterHandler)
return nil
}
func (w *WakuNode) mountRendezvous() error {
w.rendezvous = rendezvous.NewRendezvousService(w.host, w.opts.rendevousStorage)
@ -344,113 +319,6 @@ func (w *WakuNode) AddPeer(address ma.Multiaddr, protocolID p2pproto.ID) (*peer.
return &info.ID, w.addPeer(info, protocolID)
}
// Wrapper around WakuFilter.Subscribe
// that adds a Filter object to node.filters
func (node *WakuNode) SubscribeFilter(ctx context.Context, f filter.ContentFilter) (filterID string, ch chan *protocol.Envelope, err error) {
if node.filter == nil {
err = errors.New("WakuFilter is not set")
return
}
// TODO: should be possible to pass the peerID as option or autoselect peer.
// TODO: check if there's an existing pubsub topic that uses the same peer. If so, reuse filter, and return same channel and filterID
// Registers for messages that match a specific filter. Triggers the handler whenever a message is received.
// ContentFilterChan takes MessagePush structs
subs, err := node.filter.Subscribe(ctx, f)
if err != nil || subs.RequestID == "" {
// Failed to subscribe
log.Error("remote subscription to filter failed", err)
return
}
ch = make(chan *protocol.Envelope, 1024) // To avoid blocking
// Register handler for filter, whether remote subscription succeeded or not
node.filters[subs.RequestID] = filter.Filter{
PeerID: subs.Peer,
Topic: f.Topic,
ContentFilters: f.ContentTopics,
Chan: ch,
}
return subs.RequestID, ch, nil
}
// UnsubscribeFilterByID removes a subscription to a filter node completely
// using the filterID returned when the subscription was created
func (node *WakuNode) UnsubscribeFilterByID(ctx context.Context, filterID string) error {
var f filter.Filter
var ok bool
if f, ok = node.filters[filterID]; !ok {
return errors.New("filter not found")
}
cf := filter.ContentFilter{
Topic: f.Topic,
ContentTopics: f.ContentFilters,
}
err := node.filter.Unsubscribe(ctx, cf, f.PeerID)
if err != nil {
return err
}
close(f.Chan)
delete(node.filters, filterID)
return nil
}
// Unsubscribe filter removes content topics from a filter subscription. If all
// the contentTopics are removed the subscription is dropped completely
func (node *WakuNode) UnsubscribeFilter(ctx context.Context, cf filter.ContentFilter) error {
// Remove local filter
var idsToRemove []string
for id, f := range node.filters {
if f.Topic != cf.Topic {
continue
}
// Send message to full node in order to unsubscribe
err := node.filter.Unsubscribe(ctx, cf, f.PeerID)
if err != nil {
return err
}
// Iterate filter entries to remove matching content topics
// make sure we delete the content filter
// if no more topics are left
for _, cfToDelete := range cf.ContentTopics {
for i, cf := range f.ContentFilters {
if cf == cfToDelete {
l := len(f.ContentFilters) - 1
f.ContentFilters[l], f.ContentFilters[i] = f.ContentFilters[i], f.ContentFilters[l]
f.ContentFilters = f.ContentFilters[:l]
break
}
}
if len(f.ContentFilters) == 0 {
idsToRemove = append(idsToRemove, id)
}
}
}
for _, rId := range idsToRemove {
for id := range node.filters {
if id == rId {
close(node.filters[id].Chan)
delete(node.filters, id)
break
}
}
}
return nil
}
func (w *WakuNode) DialPeerWithMultiAddress(ctx context.Context, address ma.Multiaddr) error {
info, err := peer.AddrInfoFromP2pAddr(address)
if err != nil {

@ -37,6 +37,8 @@ type WakuNodeParameters struct {
shouldResume bool
storeMsgs bool
messageProvider store.MessageProvider
maxMessages int
maxDuration time.Duration
enableRendezvous bool
enableRendezvousServer bool
@ -47,15 +49,17 @@ type WakuNodeParameters struct {
enableLightPush bool
connStatusChan chan ConnStatus
connStatusC chan ConnStatus
}
type WakuNodeOption func(*WakuNodeParameters) error
// MultiAddresses return the list of multiaddresses configured in the node
func (w WakuNodeParameters) MultiAddresses() []ma.Multiaddr {
return w.multiAddr
}
// Identity returns a libp2p option containing the identity used by the node
func (w WakuNodeParameters) Identity() config.Option {
return libp2p.Identity(*w.privKey)
}
@ -134,6 +138,8 @@ func WithWakuRelay(opts ...pubsub.Option) WakuNodeOption {
}
}
// WithRendezvous is a WakuOption used to enable go-waku-rendezvous discovery.
// It accepts an optional list of DiscoveryOpt options
func WithRendezvous(discoverOpts ...pubsub.DiscoverOpt) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.enableRendezvous = true
@ -142,6 +148,8 @@ func WithRendezvous(discoverOpts ...pubsub.DiscoverOpt) WakuNodeOption {
}
}
// WithRendezvousServer is a WakuOption used to set the node as a rendezvous
// point, using an specific storage for the peer information
func WithRendezvousServer(storage rendezvous.Storage) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.enableRendezvousServer = true
@ -171,6 +179,19 @@ func WithWakuStore(shouldStoreMessages bool, shouldResume bool) WakuNodeOption {
}
}
// WithWakuStoreAndRetentionPolicy enables the Waku V2 Store protocol, storing them in an optional message provider
// applying an specific retention policy
func WithWakuStoreAndRetentionPolicy(shouldResume bool, maxDuration time.Duration, maxMessages int) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.enableStore = true
params.storeMsgs = true
params.shouldResume = shouldResume
params.maxDuration = maxDuration
params.maxMessages = maxMessages
return nil
}
}
// WithMessageProvider is a WakuNodeOption that sets the MessageProvider
// used to store and retrieve persisted messages
func WithMessageProvider(s store.MessageProvider) WakuNodeOption {
@ -188,6 +209,8 @@ func WithLightPush() WakuNodeOption {
}
}
// WithKeepAlive is a WakuNodeOption used to set the interval of time when
// each peer will be ping to keep the TCP connection alive
func WithKeepAlive(t time.Duration) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.keepAliveInterval = t
@ -195,9 +218,12 @@ func WithKeepAlive(t time.Duration) WakuNodeOption {
}
}
func WithConnStatusChan(connStatusChan chan ConnStatus) WakuNodeOption {
// WithConnectionStatusChannel is a WakuNodeOption used to set a channel where the
// connection status changes will be pushed to. It's useful to identify when peer
// connections and disconnections occur
func WithConnectionStatusChannel(connStatus chan ConnStatus) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.connStatusChan = connStatusChan
params.connStatusC = connStatus
return nil
}
}

@ -2,6 +2,9 @@ package protocol
import "github.com/status-im/go-waku/waku/v2/protocol/pb"
// Envelope contains information about the pubsub topic of a WakuMessage
// and a hash used to identify a message based on the bytes of a WakuMessage
// protobuffer
type Envelope struct {
msg *pb.WakuMessage
pubsubTopic string

@ -0,0 +1,101 @@
package filter
import (
"sync"
"github.com/status-im/go-waku/waku/v2/protocol"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
)
type FilterMap struct {
sync.RWMutex
items map[string]Filter
}
type FilterMapItem struct {
Key string
Value Filter
}
func NewFilterMap() *FilterMap {
return &FilterMap{
items: make(map[string]Filter),
}
}
func (fm *FilterMap) Set(key string, value Filter) {
fm.Lock()
defer fm.Unlock()
fm.items[key] = value
}
func (fm *FilterMap) Get(key string) (Filter, bool) {
fm.Lock()
defer fm.Unlock()
value, ok := fm.items[key]
return value, ok
}
func (fm *FilterMap) Delete(key string) {
fm.Lock()
defer fm.Unlock()
close(fm.items[key].Chan)
delete(fm.items, key)
}
func (fm *FilterMap) RemoveAll() {
fm.Lock()
defer fm.Unlock()
for k, v := range fm.items {
close(v.Chan)
delete(fm.items, k)
}
}
func (fm *FilterMap) Items() <-chan FilterMapItem {
c := make(chan FilterMapItem)
f := func() {
fm.RLock()
defer fm.RUnlock()
for k, v := range fm.items {
c <- FilterMapItem{k, v}
}
close(c)
}
go f()
return c
}
func (fm *FilterMap) Notify(msg *pb.WakuMessage, requestId string) {
fm.RLock()
defer fm.RUnlock()
for key, filter := range fm.items {
envelope := protocol.NewEnvelope(msg, filter.Topic)
// We do this because the key for the filter is set to the requestId received from the filter protocol.
// This means we do not need to check the content filter explicitly as all MessagePushs already contain
// the requestId of the coresponding filter.
if requestId != "" && requestId == key {
filter.Chan <- envelope
continue
}
// TODO: In case of no topics we should either trigger here for all messages,
// or we should not allow such filter to exist in the first place.
for _, contentTopic := range filter.ContentFilters {
if msg.ContentTopic == contentTopic {
filter.Chan <- envelope
break
}
}
}
}

@ -0,0 +1,94 @@
package filter
import (
"sync"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
)
type Subscriber struct {
peer peer.ID
requestId string
filter pb.FilterRequest // @TODO MAKE THIS A SEQUENCE AGAIN?
}
type Subscribers struct {
sync.RWMutex
subscribers []Subscriber
}
func NewSubscribers() *Subscribers {
return &Subscribers{}
}
func (self *Subscribers) Append(s Subscriber) int {
self.Lock()
defer self.Unlock()
self.subscribers = append(self.subscribers, s)
return len(self.subscribers)
}
func (self *Subscribers) Items() <-chan Subscriber {
c := make(chan Subscriber)
f := func() {
self.RLock()
defer self.RUnlock()
for _, value := range self.subscribers {
c <- value
}
close(c)
}
go f()
return c
}
func (self *Subscribers) Length() int {
self.RLock()
defer self.RUnlock()
return len(self.subscribers)
}
func (self *Subscribers) RemoveContentFilters(peerID peer.ID, contentFilters []*pb.FilterRequest_ContentFilter) {
var peerIdsToRemove []peer.ID
for _, subscriber := range self.subscribers {
if subscriber.peer != peerID {
continue
}
// make sure we delete the content filter
// if no more topics are left
for i, contentFilter := range contentFilters {
subCfs := subscriber.filter.ContentFilters
for _, cf := range subCfs {
if cf.ContentTopic == contentFilter.ContentTopic {
l := len(subCfs) - 1
subCfs[l], subCfs[i] = subCfs[i], subCfs[l]
subscriber.filter.ContentFilters = subCfs[:l]
}
}
}
if len(subscriber.filter.ContentFilters) == 0 {
peerIdsToRemove = append(peerIdsToRemove, subscriber.peer)
}
}
// make sure we delete the subscriber
// if no more content filters left
for _, peerId := range peerIdsToRemove {
for i, s := range self.subscribers {
if s.peer == peerId {
l := len(self.subscribers) - 1
self.subscribers[l], self.subscribers[i] = self.subscribers[i], self.subscribers[l]
self.subscribers = self.subscribers[:l]
break
}
}
}
}

@ -46,29 +46,19 @@ type (
ContentTopics []string
}
// @TODO MAYBE MORE INFO?
Filters map[string]Filter
Subscriber struct {
peer peer.ID
requestId string
filter pb.FilterRequest // @TODO MAKE THIS A SEQUENCE AGAIN?
}
FilterSubscription struct {
RequestID string
Peer peer.ID
}
MessagePushHandler func(requestId string, msg pb.MessagePush)
WakuFilter struct {
ctx context.Context
h host.Host
subscribers []Subscriber
isFullNode bool
pushHandler MessagePushHandler
MsgC chan *protocol.Envelope
ctx context.Context
h host.Host
isFullNode bool
MsgC chan *protocol.Envelope
filters *FilterMap
subscribers *Subscribers
}
)
@ -101,29 +91,6 @@ func DefaultOptions() []FilterSubscribeOption {
}
}
func (filters *Filters) Notify(msg *pb.WakuMessage, requestId string) {
for key, filter := range *filters {
envelope := protocol.NewEnvelope(msg, filter.Topic)
// We do this because the key for the filter is set to the requestId received from the filter protocol.
// This means we do not need to check the content filter explicitly as all MessagePushs already contain
// the requestId of the coresponding filter.
if requestId != "" && requestId == key {
filter.Chan <- envelope
continue
}
// TODO: In case of no topics we should either trigger here for all messages,
// or we should not allow such filter to exist in the first place.
for _, contentTopic := range filter.ContentFilters {
if msg.ContentTopic == contentTopic {
filter.Chan <- envelope
break
}
}
}
}
func (wf *WakuFilter) onRequest(s network.Stream) {
defer s.Close()
@ -142,7 +109,9 @@ func (wf *WakuFilter) onRequest(s network.Stream) {
if filterRPCRequest.Push != nil && len(filterRPCRequest.Push.Messages) > 0 {
// We're on a light node.
// This is a message push coming from a full node.
wf.pushHandler(filterRPCRequest.RequestId, *filterRPCRequest.Push)
for _, message := range filterRPCRequest.Push.Messages {
wf.filters.Notify(message, filterRPCRequest.RequestId) // Trigger filter handlers on a light node
}
log.Info("filter light node, received a message push. ", len(filterRPCRequest.Push.Messages), " messages")
stats.Record(wf.ctx, metrics.Messages.M(int64(len(filterRPCRequest.Push.Messages))))
@ -151,52 +120,16 @@ func (wf *WakuFilter) onRequest(s network.Stream) {
// This is a filter request coming from a light node.
if filterRPCRequest.Request.Subscribe {
subscriber := Subscriber{peer: s.Conn().RemotePeer(), requestId: filterRPCRequest.RequestId, filter: *filterRPCRequest.Request}
wf.subscribers = append(wf.subscribers, subscriber)
log.Info("filter full node, add a filter subscriber: ", subscriber.peer)
len := wf.subscribers.Append(subscriber)
stats.Record(wf.ctx, metrics.FilterSubscriptions.M(int64(len(wf.subscribers))))
log.Info("filter full node, add a filter subscriber: ", subscriber.peer)
stats.Record(wf.ctx, metrics.FilterSubscriptions.M(int64(len)))
} else {
peerId := s.Conn().RemotePeer()
wf.subscribers.RemoveContentFilters(peerId, filterRPCRequest.Request.ContentFilters)
log.Info("filter full node, remove a filter subscriber: ", peerId.Pretty())
contentFilters := filterRPCRequest.Request.ContentFilters
var peerIdsToRemove []peer.ID
for _, subscriber := range wf.subscribers {
if subscriber.peer != peerId {
continue
}
// make sure we delete the content filter
// if no more topics are left
for i, contentFilter := range contentFilters {
subCfs := subscriber.filter.ContentFilters
for _, cf := range subCfs {
if cf.ContentTopic == contentFilter.ContentTopic {
l := len(subCfs) - 1
subCfs[l], subCfs[i] = subCfs[i], subCfs[l]
subscriber.filter.ContentFilters = subCfs[:l]
}
}
}
if len(subscriber.filter.ContentFilters) == 0 {
peerIdsToRemove = append(peerIdsToRemove, subscriber.peer)
}
}
// make sure we delete the subscriber
// if no more content filters left
for _, peerId := range peerIdsToRemove {
for i, s := range wf.subscribers {
if s.peer == peerId {
l := len(wf.subscribers) - 1
wf.subscribers[l], wf.subscribers[i] = wf.subscribers[i], wf.subscribers[l]
wf.subscribers = wf.subscribers[:l]
break
}
}
}
stats.Record(wf.ctx, metrics.FilterSubscriptions.M(int64(len(wf.subscribers))))
stats.Record(wf.ctx, metrics.FilterSubscriptions.M(int64(wf.subscribers.Length())))
}
} else {
log.Error("can't serve request")
@ -204,7 +137,7 @@ func (wf *WakuFilter) onRequest(s network.Stream) {
}
}
func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool, handler MessagePushHandler) *WakuFilter {
func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool) *WakuFilter {
ctx, err := tag.New(ctx, tag.Insert(metrics.KeyType, "filter"))
if err != nil {
log.Error(err)
@ -214,8 +147,9 @@ func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool, handler
wf.ctx = ctx
wf.MsgC = make(chan *protocol.Envelope)
wf.h = host
wf.pushHandler = handler
wf.isFullNode = isFullNode
wf.filters = NewFilterMap()
wf.subscribers = NewSubscribers()
wf.h.SetStreamHandlerMatch(FilterID_v20beta1, protocol.PrefixTextMatch(string(FilterID_v20beta1)), wf.onRequest)
go wf.FilterListener()
@ -229,6 +163,29 @@ func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool, handler
return wf
}
func (wf *WakuFilter) pushMessage(subscriber Subscriber, msg *pb.WakuMessage) error {
pushRPC := &pb.FilterRPC{RequestId: subscriber.requestId, Push: &pb.MessagePush{Messages: []*pb.WakuMessage{msg}}}
conn, err := wf.h.NewStream(wf.ctx, peer.ID(subscriber.peer), FilterID_v20beta1)
// TODO: keep track of errors to automatically unsubscribe a peer?
if err != nil {
// @TODO more sophisticated error handling here
log.Error("failed to open peer stream")
//waku_filter_errors.inc(labelValues = [dialFailure])
return err
}
defer conn.Close()
writer := protoio.NewDelimitedWriter(conn)
err = writer.WriteMsg(pushRPC)
if err != nil {
log.Error("failed to push messages to remote peer")
return nil
}
return nil
}
func (wf *WakuFilter) FilterListener() {
// This function is invoked for each message received
// on the full node in context of Waku2-Filter
@ -237,7 +194,7 @@ func (wf *WakuFilter) FilterListener() {
topic := envelope.PubsubTopic()
// Each subscriber is a light node that earlier on invoked
// a FilterRequest on this node
for _, subscriber := range wf.subscribers {
for subscriber := range wf.subscribers.Items() {
if subscriber.filter.Topic != "" && subscriber.filter.Topic != topic {
log.Info("Subscriber's filter pubsubTopic does not match message topic", subscriber.filter.Topic, topic)
continue
@ -246,28 +203,12 @@ func (wf *WakuFilter) FilterListener() {
for _, filter := range subscriber.filter.ContentFilters {
if msg.ContentTopic == filter.ContentTopic {
log.Info("found matching contentTopic ", filter, msg)
msgArr := []*pb.WakuMessage{msg}
// Do a message push to light node
pushRPC := &pb.FilterRPC{RequestId: subscriber.requestId, Push: &pb.MessagePush{Messages: msgArr}}
log.Info("pushing a message to light node: ", pushRPC)
conn, err := wf.h.NewStream(wf.ctx, peer.ID(subscriber.peer), FilterID_v20beta1)
// TODO: keep track of errors to automatically unsubscribe a peer?
if err != nil {
// @TODO more sophisticated error handling here
log.Error("failed to open peer stream")
//waku_filter_errors.inc(labelValues = [dialFailure])
log.Info("pushing messages to light node: ", subscriber.peer)
if err := wf.pushMessage(subscriber, msg); err != nil {
return err
}
defer conn.Close()
writer := protoio.NewDelimitedWriter(conn)
err = writer.WriteMsg(pushRPC)
if err != nil {
log.Error("failed to push messages to remote peer")
return nil
}
}
}
}
@ -286,7 +227,7 @@ func (wf *WakuFilter) FilterListener() {
// Having a FilterRequest struct,
// select a peer with filter support, dial it,
// and submit FilterRequest wrapped in FilterRPC
func (wf *WakuFilter) Subscribe(ctx context.Context, filter ContentFilter, opts ...FilterSubscribeOption) (subscription *FilterSubscription, err error) {
func (wf *WakuFilter) requestSubscription(ctx context.Context, filter ContentFilter, opts ...FilterSubscribeOption) (subscription *FilterSubscription, err error) {
params := new(FilterSubscribeParameters)
params.host = wf.h
@ -338,7 +279,7 @@ func (wf *WakuFilter) Subscribe(ctx context.Context, filter ContentFilter, opts
return
}
func (wf *WakuFilter) Unsubscribe(ctx context.Context, filter ContentFilter, peer peer.ID) error {
func (wf *WakuFilter) Unsubscribe(ctx context.Context, contentFilter ContentFilter, peer peer.ID) error {
conn, err := wf.h.NewStream(ctx, peer, FilterID_v20beta1)
if err != nil {
@ -351,13 +292,13 @@ func (wf *WakuFilter) Unsubscribe(ctx context.Context, filter ContentFilter, pee
id := protocol.GenerateRequestId()
var contentFilters []*pb.FilterRequest_ContentFilter
for _, ct := range filter.ContentTopics {
for _, ct := range contentFilter.ContentTopics {
contentFilters = append(contentFilters, &pb.FilterRequest_ContentFilter{ContentTopic: ct})
}
request := pb.FilterRequest{
Subscribe: false,
Topic: filter.Topic,
Topic: contentFilter.Topic,
ContentFilters: contentFilters,
}
@ -373,4 +314,103 @@ func (wf *WakuFilter) Unsubscribe(ctx context.Context, filter ContentFilter, pee
func (wf *WakuFilter) Stop() {
wf.h.RemoveStreamHandler(FilterID_v20beta1)
wf.filters.RemoveAll()
}
func (wf *WakuFilter) Subscribe(ctx context.Context, f ContentFilter, opts ...FilterSubscribeOption) (filterID string, theFilter Filter, err error) {
// TODO: check if there's an existing pubsub topic that uses the same peer. If so, reuse filter, and return same channel and filterID
// Registers for messages that match a specific filter. Triggers the handler whenever a message is received.
// ContentFilterChan takes MessagePush structs
remoteSubs, err := wf.requestSubscription(ctx, f, opts...)
if err != nil || remoteSubs.RequestID == "" {
// Failed to subscribe
log.Error("remote subscription to filter failed", err)
return
}
// Register handler for filter, whether remote subscription succeeded or not
filterID = remoteSubs.RequestID
theFilter = Filter{
PeerID: remoteSubs.Peer,
Topic: f.Topic,
ContentFilters: f.ContentTopics,
Chan: make(chan *protocol.Envelope, 1024), // To avoid blocking
}
wf.filters.Set(filterID, theFilter)
return
}
// UnsubscribeFilterByID removes a subscription to a filter node completely
// using the filterID returned when the subscription was created
func (wf *WakuFilter) UnsubscribeFilterByID(ctx context.Context, filterID string) error {
var f Filter
var ok bool
if f, ok = wf.filters.Get(filterID); !ok {
return errors.New("filter not found")
}
cf := ContentFilter{
Topic: f.Topic,
ContentTopics: f.ContentFilters,
}
err := wf.Unsubscribe(ctx, cf, f.PeerID)
if err != nil {
return err
}
wf.filters.Delete(filterID)
return nil
}
// Unsubscribe filter removes content topics from a filter subscription. If all
// the contentTopics are removed the subscription is dropped completely
func (wf *WakuFilter) UnsubscribeFilter(ctx context.Context, cf ContentFilter) error {
// Remove local filter
var idsToRemove []string
for filterMapItem := range wf.filters.Items() {
f := filterMapItem.Value
id := filterMapItem.Key
if f.Topic != cf.Topic {
continue
}
// Send message to full node in order to unsubscribe
err := wf.Unsubscribe(ctx, cf, f.PeerID)
if err != nil {
return err
}
// Iterate filter entries to remove matching content topics
// make sure we delete the content filter
// if no more topics are left
for _, cfToDelete := range cf.ContentTopics {
for i, cf := range f.ContentFilters {
if cf == cfToDelete {
l := len(f.ContentFilters) - 1
f.ContentFilters[l], f.ContentFilters[i] = f.ContentFilters[i], f.ContentFilters[l]
f.ContentFilters = f.ContentFilters[:l]
break
}
}
if len(f.ContentFilters) == 0 {
idsToRemove = append(idsToRemove, id)
}
}
}
for _, rId := range idsToRemove {
wf.filters.Delete(rId)
}
return nil
}

@ -9,14 +9,12 @@ import (
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
libp2pProtocol "github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-msgio/protoio"
"github.com/status-im/go-waku/waku/v2/metrics"
"github.com/status-im/go-waku/waku/v2/protocol"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/status-im/go-waku/waku/v2/protocol/relay"
utils "github.com/status-im/go-waku/waku/v2/utils"
)
var log = logging.Logger("waku_lightpush")
@ -44,8 +42,8 @@ func NewWakuLightPush(ctx context.Context, h host.Host, relay *relay.WakuRelay)
}
func (wakuLP *WakuLightPush) Start() error {
if wakuLP.relay == nil {
return errors.New("relay is required")
if wakuLP.IsClientOnly() {
return errors.New("relay is required, without it, it is only a client and cannot be started")
}
wakuLP.h.SetStreamHandlerMatch(LightPushID_v20beta1, protocol.PrefixTextMatch(string(LightPushID_v20beta1)), wakuLP.onRequest)
@ -54,6 +52,10 @@ func (wakuLP *WakuLightPush) Start() error {
return nil
}
func (wakuLp *WakuLightPush) IsClientOnly() bool {
return wakuLp.relay == nil
}
func (wakuLP *WakuLightPush) onRequest(s network.Stream) {
defer s.Close()
@ -73,11 +75,11 @@ func (wakuLP *WakuLightPush) onRequest(s network.Stream) {
if requestPushRPC.Query != nil {
log.Info("lightpush push request")
pubSubTopic := relay.Topic(requestPushRPC.Query.PubsubTopic)
message := requestPushRPC.Query.Message
response := new(pb.PushResponse)
if wakuLP.relay != nil {
if !wakuLP.IsClientOnly() {
pubSubTopic := relay.Topic(requestPushRPC.Query.PubsubTopic)
message := requestPushRPC.Query.Message
// TODO: Assumes success, should probably be extended to check for network, peers, etc
// It might make sense to use WithReadiness option here?
@ -118,56 +120,10 @@ func (wakuLP *WakuLightPush) onRequest(s network.Stream) {
}
}
type LightPushParameters struct {
selectedPeer peer.ID
requestId []byte
lp *WakuLightPush
}
type LightPushOption func(*LightPushParameters)
func WithPeer(p peer.ID) LightPushOption {
return func(params *LightPushParameters) {
params.selectedPeer = p
}
}
func WithAutomaticPeerSelection() LightPushOption {
return func(params *LightPushParameters) {
p, err := utils.SelectPeer(params.lp.h, string(LightPushID_v20beta1))
if err == nil {
params.selectedPeer = *p
} else {
log.Info("Error selecting peer: ", err)
}
}
}
func WithRequestId(requestId []byte) LightPushOption {
return func(params *LightPushParameters) {
params.requestId = requestId
}
}
func WithAutomaticRequestId() LightPushOption {
return func(params *LightPushParameters) {
params.requestId = protocol.GenerateRequestId()
}
}
func DefaultOptions() []LightPushOption {
return []LightPushOption{
WithAutomaticRequestId(),
WithAutomaticPeerSelection(),
}
}
func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, opts ...LightPushOption) (*pb.PushResponse, error) {
params := new(LightPushParameters)
params.lp = wakuLP
optList := DefaultOptions()
optList := DefaultOptions(wakuLP.h)
optList = append(optList, opts...)
for _, opt := range optList {
opt(params)
@ -220,11 +176,11 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, o
return pushResponseRPC.Response, nil
}
func (w *WakuLightPush) Stop() {
w.h.RemoveStreamHandler(LightPushID_v20beta1)
func (wakuLP *WakuLightPush) Stop() {
wakuLP.h.RemoveStreamHandler(LightPushID_v20beta1)
}
func (w *WakuLightPush) Publish(ctx context.Context, message *pb.WakuMessage, topic *relay.Topic, opts ...LightPushOption) ([]byte, error) {
func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *pb.WakuMessage, topic *relay.Topic, opts ...LightPushOption) ([]byte, error) {
if message == nil {
return nil, errors.New("message can't be null")
}
@ -233,7 +189,7 @@ func (w *WakuLightPush) Publish(ctx context.Context, message *pb.WakuMessage, to
req.Message = message
req.PubsubTopic = string(relay.GetTopic(topic))
response, err := w.request(ctx, req, opts...)
response, err := wakuLP.request(ctx, req, opts...)
if err != nil {
return nil, err
}

@ -0,0 +1,51 @@
package lightpush
import (
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/status-im/go-waku/waku/v2/protocol"
"github.com/status-im/go-waku/waku/v2/utils"
)
type LightPushParameters struct {
selectedPeer peer.ID
requestId []byte
}
type LightPushOption func(*LightPushParameters)
func WithPeer(p peer.ID) LightPushOption {
return func(params *LightPushParameters) {
params.selectedPeer = p
}
}
func WithAutomaticPeerSelection(host host.Host) LightPushOption {
return func(params *LightPushParameters) {
p, err := utils.SelectPeer(host, string(LightPushID_v20beta1))
if err == nil {
params.selectedPeer = *p
} else {
log.Info("Error selecting peer: ", err)
}
}
}
func WithRequestId(requestId []byte) LightPushOption {
return func(params *LightPushParameters) {
params.requestId = requestId
}
}
func WithAutomaticRequestId() LightPushOption {
return func(params *LightPushParameters) {
params.requestId = protocol.GenerateRequestId()
}
}
func DefaultOptions(host host.Host) []LightPushOption {
return []LightPushOption{
WithAutomaticRequestId(),
WithAutomaticPeerSelection(host),
}
}

@ -33,13 +33,15 @@ type WakuRelay struct {
host host.Host
pubsub *pubsub.PubSub
topics map[Topic]bool
bcaster v2.Broadcaster
// TODO: convert to concurrent maps
topics map[Topic]struct{}
topicsMutex sync.Mutex
wakuRelayTopics map[Topic]*pubsub.Topic
relaySubs map[Topic]*pubsub.Subscription
bcaster v2.Broadcaster
// TODO: convert to concurrent maps
subscriptions map[Topic][]*Subscription
subscriptionsMutex sync.Mutex
}
@ -53,7 +55,7 @@ func msgIdFn(pmsg *pubsub_pb.Message) string {
func NewWakuRelay(ctx context.Context, h host.Host, bcaster v2.Broadcaster, opts ...pubsub.Option) (*WakuRelay, error) {
w := new(WakuRelay)
w.host = h
w.topics = make(map[Topic]bool)
w.topics = make(map[Topic]struct{})
w.wakuRelayTopics = make(map[Topic]*pubsub.Topic)
w.relaySubs = make(map[Topic]*pubsub.Subscription)
w.subscriptions = make(map[Topic][]*Subscription)
@ -112,7 +114,7 @@ func (w *WakuRelay) upsertTopic(topic Topic) (*pubsub.Topic, error) {
defer w.topicsMutex.Unlock()
w.topicsMutex.Lock()
w.topics[topic] = true
w.topics[topic] = struct{}{}
pubSubTopic, ok := w.wakuRelayTopics[topic]
if !ok { // Joins topic if node hasn't joined yet
newTopic, err := w.pubsub.Join(string(topic))

@ -0,0 +1,109 @@
package store
import (
"sync"
"time"
"github.com/status-im/go-waku/waku/v2/utils"
)
type MessageQueue struct {
sync.RWMutex
seen map[[32]byte]struct{}
messages []IndexedWakuMessage
maxMessages int
maxDuration time.Duration
quit chan struct{}
}
func (self *MessageQueue) Push(msg IndexedWakuMessage) {
self.Lock()
defer self.Unlock()
var k [32]byte
copy(k[:], msg.index.Digest)
if _, ok := self.seen[k]; ok {
return
}
self.seen[k] = struct{}{}
self.messages = append(self.messages, msg)
if self.maxMessages != 0 && len(self.messages) > self.maxMessages {
numToPop := len(self.messages) - self.maxMessages
self.messages = self.messages[numToPop:len(self.messages)]
}
}
func (self *MessageQueue) Messages() <-chan IndexedWakuMessage {
c := make(chan IndexedWakuMessage)
f := func() {
self.RLock()
defer self.RUnlock()
for _, value := range self.messages {
c <- value
}
close(c)
}
go f()
return c
}
func (self *MessageQueue) cleanOlderRecords() {
self.Lock()
defer self.Unlock()
// TODO: check if retention days was set
t := utils.GetUnixEpochFrom(time.Now().Add(-self.maxDuration))
var idx int
for i := 0; i < len(self.messages); i++ {
if self.messages[i].index.ReceiverTime >= t {
idx = i
break
}
}
self.messages = self.messages[idx:]
}
func (self *MessageQueue) checkForOlderRecords(d time.Duration) {
ticker := time.NewTicker(d)
select {
case <-self.quit:
return
case <-ticker.C:
self.cleanOlderRecords()
}
}
func (self *MessageQueue) Length() int {
self.RLock()
defer self.RUnlock()
return len(self.messages)
}
func NewMessageQueue(maxMessages int, maxDuration time.Duration) *MessageQueue {
result := &MessageQueue{
maxMessages: maxMessages,
maxDuration: maxDuration,
seen: make(map[[32]byte]struct{}),
quit: make(chan struct{}),
}
if maxDuration != 0 {
go result.checkForOlderRecords(10 * time.Second) // is 10s okay?
}
return result
}
func (self *MessageQueue) Stop() {
close(self.quit)
}

@ -3,13 +3,12 @@ package store
import (
"bytes"
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"math"
"sort"
"sync"
"time"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/host"
@ -27,8 +26,11 @@ import (
var log = logging.Logger("wakustore")
// StoreID_v20beta3 is the current Waku Store protocol identifier
const StoreID_v20beta3 = libp2pProtocol.ID("/vac/waku/store/2.0.0-beta3")
const MaxPageSize = 100 // Maximum number of waku messages in each page
// MaxPageSize is the maximum number of waku messages to return per page
const MaxPageSize = 100
var (
ErrNoPeersAvailable = errors.New("no suitable remote peers")
@ -138,11 +140,11 @@ func paginateWithoutIndex(list []IndexedWakuMessage, pinfo *pb.PagingInfo) (resM
return
}
func (w *WakuStore) FindMessages(query *pb.HistoryQuery) *pb.HistoryResponse {
func (store *WakuStore) FindMessages(query *pb.HistoryQuery) *pb.HistoryResponse {
result := new(pb.HistoryResponse)
// data holds IndexedWakuMessage whose topics match the query
var data []IndexedWakuMessage
for _, indexedMsg := range w.messages {
for indexedMsg := range store.messageQueue.Messages() {
// temporal filtering
// check whether the history query contains a time filter
if query.StartTime != 0 && query.EndTime != 0 {
@ -195,6 +197,7 @@ type Query struct {
EndTime float64
}
// Result represents a valid response from a store node
type Result struct {
Messages []*pb.WakuMessage
@ -222,31 +225,30 @@ type IndexedWakuMessage struct {
}
type WakuStore struct {
ctx context.Context
MsgC chan *protocol.Envelope
messages []IndexedWakuMessage
seen map[[32]byte]struct{}
ctx context.Context
MsgC chan *protocol.Envelope
started bool
messagesMutex sync.Mutex
msgProvider MessageProvider
h host.Host
messageQueue *MessageQueue
msgProvider MessageProvider
h host.Host
}
func NewWakuStore(p MessageProvider) *WakuStore {
// NewWakuStore creates a WakuStore using an specific MessageProvider for storing the messages
func NewWakuStore(p MessageProvider, maxNumberOfMessages int, maxRetentionDuration time.Duration) *WakuStore {
wakuStore := new(WakuStore)
wakuStore.msgProvider = p
wakuStore.seen = make(map[[32]byte]struct{})
wakuStore.messageQueue = NewMessageQueue(maxNumberOfMessages, maxRetentionDuration)
return wakuStore
}
func (store *WakuStore) SetMsgProvider(p MessageProvider) {
// SetMessageProvider allows switching the message provider used with a WakuStore
func (store *WakuStore) SetMessageProvider(p MessageProvider) {
store.msgProvider = p
}
// Start initializes the WakuStore by enabling the protocol and fetching records from a message provider
func (store *WakuStore) Start(ctx context.Context, h host.Host) {
if store.started {
return
@ -255,7 +257,7 @@ func (store *WakuStore) Start(ctx context.Context, h host.Host) {
store.started = true
store.h = h
store.ctx = ctx
store.MsgC = make(chan *protocol.Envelope)
store.MsgC = make(chan *protocol.Envelope, 1024)
store.h.SetStreamHandlerMatch(StoreID_v20beta3, protocol.PrefixTextMatch(string(StoreID_v20beta3)), store.onRequest)
@ -291,53 +293,42 @@ func (store *WakuStore) fetchDBRecords(ctx context.Context) {
store.storeMessageWithIndex(storedMessage.PubsubTopic, idx, storedMessage.Message)
metrics.RecordMessage(ctx, "stored", len(store.messages))
metrics.RecordMessage(ctx, "stored", store.messageQueue.Length())
}
}
func (store *WakuStore) storeMessageWithIndex(pubsubTopic string, idx *pb.Index, msg *pb.WakuMessage) {
var k [32]byte
copy(k[:], idx.Digest)
if _, ok := store.seen[k]; ok {
return
}
store.seen[k] = struct{}{}
store.messages = append(store.messages, IndexedWakuMessage{msg: msg, index: idx, pubsubTopic: pubsubTopic})
store.messageQueue.Push(IndexedWakuMessage{msg: msg, index: idx, pubsubTopic: pubsubTopic})
}
func (store *WakuStore) storeMessage(pubSubTopic string, msg *pb.WakuMessage) {
index, err := computeIndex(msg)
func (store *WakuStore) storeMessage(env *protocol.Envelope) {
index, err := computeIndex(env)
if err != nil {
log.Error("could not calculate message index", err)
return
}
store.messagesMutex.Lock()
defer store.messagesMutex.Unlock()
store.storeMessageWithIndex(pubSubTopic, index, msg)
store.storeMessageWithIndex(env.PubsubTopic(), index, env.Message())
if store.msgProvider == nil {
metrics.RecordMessage(store.ctx, "stored", len(store.messages))
metrics.RecordMessage(store.ctx, "stored", store.messageQueue.Length())
return
}
err = store.msgProvider.Put(index, pubSubTopic, msg) // Should the index be stored?
// TODO: Move this to a separate go routine if DB writes becomes a bottleneck
err = store.msgProvider.Put(index, env.PubsubTopic(), env.Message()) // Should the index be stored?
if err != nil {
log.Error("could not store message", err)
metrics.RecordStoreError(store.ctx, "store_failure")
return
}
metrics.RecordMessage(store.ctx, "stored", len(store.messages))
metrics.RecordMessage(store.ctx, "stored", store.messageQueue.Length())
}
func (store *WakuStore) storeIncomingMessages(ctx context.Context) {
for envelope := range store.MsgC {
store.storeMessage(envelope.PubsubTopic(), envelope.Message())
store.storeMessage(envelope)
}
}
@ -371,16 +362,11 @@ func (store *WakuStore) onRequest(s network.Stream) {
}
}
func computeIndex(msg *pb.WakuMessage) (*pb.Index, error) {
data, err := msg.Marshal()
if err != nil {
return nil, err
}
digest := sha256.Sum256(data)
func computeIndex(env *protocol.Envelope) (*pb.Index, error) {
return &pb.Index{
Digest: digest[:],
Digest: env.Hash(),
ReceiverTime: utils.GetUnixEpoch(),
SenderTime: msg.Timestamp,
SenderTime: env.Message().Timestamp,
}, nil
}
@ -436,12 +422,15 @@ type HistoryRequestParameters struct {
type HistoryRequestOption func(*HistoryRequestParameters)
// WithPeer is an option used to specify the peerID to request the message history
func WithPeer(p peer.ID) HistoryRequestOption {
return func(params *HistoryRequestParameters) {
params.selectedPeer = p
}
}
// WithAutomaticPeerSelection is an option used to randomly select a peer from the store
// to request the message history
func WithAutomaticPeerSelection() HistoryRequestOption {
return func(params *HistoryRequestParameters) {
p, err := utils.SelectPeer(params.s.h, string(StoreID_v20beta3))
@ -471,6 +460,7 @@ func WithCursor(c *pb.Index) HistoryRequestOption {
}
}
// WithPaging is an option used to specify the order and maximum number of records to return
func WithPaging(asc bool, pageSize uint64) HistoryRequestOption {
return func(params *HistoryRequestParameters) {
params.asc = asc
@ -478,16 +468,17 @@ func WithPaging(asc bool, pageSize uint64) HistoryRequestOption {
}
}
// Default options to be used when querying a store node for results
func DefaultOptions() []HistoryRequestOption {
return []HistoryRequestOption{
WithAutomaticRequestId(),
WithAutomaticPeerSelection(),
WithPaging(true, 0),
WithPaging(true, MaxPageSize),
}
}
func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selectedPeer peer.ID, requestId []byte) (*pb.HistoryResponse, error) {
log.Info(fmt.Sprintf("Resuming message history with peer %s", selectedPeer))
log.Info(fmt.Sprintf("Querying message history with peer %s", selectedPeer))
connOpt, err := store.h.NewStream(ctx, selectedPeer, StoreID_v20beta3)
if err != nil {
@ -519,7 +510,7 @@ func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selec
return nil, err
}
metrics.RecordMessage(ctx, "retrieved", len(store.messages))
metrics.RecordMessage(ctx, "retrieved", store.messageQueue.Length())
return historyResponseRPC.Response, nil
}
@ -583,6 +574,10 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR
}, nil
}
// Next is used with to retrieve the next page of rows from a query response.
// If no more records are found, the result will not contain any messages.
// This function is useful for iterating over results without having to manually
// specify the cursor and pagination order and max number of results
func (store *WakuStore) Next(ctx context.Context, r *Result) (*Result, error) {
q := &pb.HistoryQuery{
PubsubTopic: r.query.PubsubTopic,
@ -633,7 +628,7 @@ func (store *WakuStore) queryLoop(ctx context.Context, query *pb.HistoryQuery, c
func (store *WakuStore) findLastSeen() float64 {
var lastSeenTime float64 = 0
for _, imsg := range store.messages {
for imsg := range store.messageQueue.Messages() {
if imsg.msg.Timestamp > lastSeenTime {
lastSeenTime = imsg.msg.Timestamp
}
@ -641,7 +636,7 @@ func (store *WakuStore) findLastSeen() float64 {
return lastSeenTime
}
// resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online
// Resume retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online
// messages are stored in the store node's messages field and in the message db
// the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message
// an offset of 20 second is added to the time window to count for nodes asynchrony
@ -649,7 +644,6 @@ func (store *WakuStore) findLastSeen() float64 {
// peerList indicates the list of peers to query from. The history is fetched from the first available peer in this list. Such candidates should be found through a discovery method (to be developed).
// if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from. The history gets fetched successfully if the dialed peer has been online during the queried time window.
// the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string
func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList []peer.ID) (int, error) {
if !store.started {
return 0, errors.New("can't resume: store has not started")
@ -695,7 +689,7 @@ func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList
}
for _, msg := range response.Messages {
store.storeMessage(pubsubTopic, msg)
store.storeMessage(protocol.NewEnvelope(msg, pubsubTopic))
}
log.Info("Retrieved messages since the last online time: ", len(response.Messages))
@ -705,14 +699,15 @@ func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList
// TODO: queryWithAccounting
func (w *WakuStore) Stop() {
w.started = false
// Stop closes the store message channel and removes the protocol stream handler
func (store *WakuStore) Stop() {
store.started = false
if w.MsgC != nil {
close(w.MsgC)
if store.MsgC != nil {
close(store.MsgC)
}
if w.h != nil {
w.h.RemoveStreamHandler(StoreID_v20beta3)
if store.h != nil {
store.h.RemoveStreamHandler(StoreID_v20beta3)
}
}

@ -2,10 +2,14 @@ package utils
import "time"
func GetUnixEpochFrom(now func() time.Time) float64 {
return float64(now().UnixNano()) / float64(time.Second)
// GetUnixEpoch converts a time into a unix timestamp with the integer part
// representing seconds and the decimal part representing subseconds
func GetUnixEpochFrom(now time.Time) float64 {
return float64(now.UnixNano()) / float64(time.Second)
}
// GetUnixEpoch returns the current time in unix timestamp with the integer part
// representing seconds and the decimal part representing subseconds
func GetUnixEpoch() float64 {
return GetUnixEpochFrom(time.Now)
return GetUnixEpochFrom(time.Now())
}

212
vendor/golang.org/x/sync/singleflight/singleflight.go generated vendored Normal file

@ -0,0 +1,212 @@
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package singleflight provides a duplicate function call suppression
// mechanism.
package singleflight // import "golang.org/x/sync/singleflight"
import (
"bytes"
"errors"
"fmt"
"runtime"
"runtime/debug"
"sync"
)
// errGoexit indicates the runtime.Goexit was called in
// the user given function.
var errGoexit = errors.New("runtime.Goexit was called")
// A panicError is an arbitrary value recovered from a panic
// with the stack trace during the execution of given function.
type panicError struct {
value interface{}
stack []byte
}
// Error implements error interface.
func (p *panicError) Error() string {
return fmt.Sprintf("%v\n\n%s", p.value, p.stack)
}
func newPanicError(v interface{}) error {
stack := debug.Stack()
// The first line of the stack trace is of the form "goroutine N [status]:"
// but by the time the panic reaches Do the goroutine may no longer exist
// and its status will have changed. Trim out the misleading line.
if line := bytes.IndexByte(stack[:], '\n'); line >= 0 {
stack = stack[line+1:]
}
return &panicError{value: v, stack: stack}
}
// call is an in-flight or completed singleflight.Do call
type call struct {
wg sync.WaitGroup
// These fields are written once before the WaitGroup is done
// and are only read after the WaitGroup is done.
val interface{}
err error
// forgotten indicates whether Forget was called with this call's key
// while the call was still in flight.
forgotten bool
// These fields are read and written with the singleflight
// mutex held before the WaitGroup is done, and are read but
// not written after the WaitGroup is done.
dups int
chans []chan<- Result
}
// Group represents a class of work and forms a namespace in
// which units of work can be executed with duplicate suppression.
type Group struct {
mu sync.Mutex // protects m
m map[string]*call // lazily initialized
}
// Result holds the results of Do, so they can be passed
// on a channel.
type Result struct {
Val interface{}
Err error
Shared bool
}
// Do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
// The return value shared indicates whether v was given to multiple callers.
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
g.mu.Unlock()
c.wg.Wait()
if e, ok := c.err.(*panicError); ok {
panic(e)
} else if c.err == errGoexit {
runtime.Goexit()
}
return c.val, c.err, true
}
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
g.doCall(c, key, fn)
return c.val, c.err, c.dups > 0
}
// DoChan is like Do but returns a channel that will receive the
// results when they are ready.
//
// The returned channel will not be closed.
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
ch := make(chan Result, 1)
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
c.chans = append(c.chans, ch)
g.mu.Unlock()
return ch
}
c := &call{chans: []chan<- Result{ch}}
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
go g.doCall(c, key, fn)
return ch
}
// doCall handles the single call for a key.
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
normalReturn := false
recovered := false
// use double-defer to distinguish panic from runtime.Goexit,
// more details see https://golang.org/cl/134395
defer func() {
// the given function invoked runtime.Goexit
if !normalReturn && !recovered {
c.err = errGoexit
}
c.wg.Done()
g.mu.Lock()
defer g.mu.Unlock()
if !c.forgotten {
delete(g.m, key)
}
if e, ok := c.err.(*panicError); ok {
// In order to prevent the waiting channels from being blocked forever,
// needs to ensure that this panic cannot be recovered.
if len(c.chans) > 0 {
go panic(e)
select {} // Keep this goroutine around so that it will appear in the crash dump.
} else {
panic(e)
}
} else if c.err == errGoexit {
// Already in the process of goexit, no need to call again
} else {
// Normal return
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
}
}()
func() {
defer func() {
if !normalReturn {
// Ideally, we would wait to take a stack trace until we've determined
// whether this is a panic or a runtime.Goexit.
//
// Unfortunately, the only way we can distinguish the two is to see
// whether the recover stopped the goroutine from terminating, and by
// the time we know that, the part of the stack trace relevant to the
// panic has been discarded.
if r := recover(); r != nil {
c.err = newPanicError(r)
}
}
}()
c.val, c.err = fn()
normalReturn = true
}()
if !normalReturn {
recovered = true
}
}
// Forget tells the singleflight to forget about a key. Future calls
// to Do for this key will call the function rather than waiting for
// an earlier call to complete.
func (g *Group) Forget(key string) {
g.mu.Lock()
if c, ok := g.m[key]; ok {
c.forgotten = true
}
delete(g.m, key)
g.mu.Unlock()
}

3
vendor/golang.org/x/time/AUTHORS generated vendored Normal file

@ -0,0 +1,3 @@
# This source code refers to The Go Authors for copyright purposes.
# The master list of authors is in the main Go distribution,
# visible at http://tip.golang.org/AUTHORS.

3
vendor/golang.org/x/time/CONTRIBUTORS generated vendored Normal file

@ -0,0 +1,3 @@
# This source code was written by the Go contributors.
# The master list of contributors is in the main Go distribution,
# visible at http://tip.golang.org/CONTRIBUTORS.

27
vendor/golang.org/x/time/LICENSE generated vendored Normal file

@ -0,0 +1,27 @@
Copyright (c) 2009 The Go Authors. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
* Neither the name of Google Inc. nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

22
vendor/golang.org/x/time/PATENTS generated vendored Normal file

@ -0,0 +1,22 @@
Additional IP Rights Grant (Patents)
"This implementation" means the copyrightable works distributed by
Google as part of the Go project.
Google hereby grants to You a perpetual, worldwide, non-exclusive,
no-charge, royalty-free, irrevocable (except as stated in this section)
patent license to make, have made, use, offer to sell, sell, import,
transfer and otherwise run, modify and propagate the contents of this
implementation of Go, where such license applies only to those patent
claims, both currently owned or controlled by Google and acquired in
the future, licensable by Google that are necessarily infringed by this
implementation of Go. This grant does not include claims that would be
infringed only as a consequence of further modification of this
implementation. If you or your agent or exclusive licensee institute or
order or agree to the institution of patent litigation against any
entity (including a cross-claim or counterclaim in a lawsuit) alleging
that this implementation of Go or any code incorporated within this
implementation of Go constitutes direct or contributory patent
infringement, or inducement of patent infringement, then any patent
rights granted to you under this License for this implementation of Go
shall terminate as of the date such litigation is filed.

402
vendor/golang.org/x/time/rate/rate.go generated vendored Normal file

@ -0,0 +1,402 @@
// Copyright 2015 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package rate provides a rate limiter.
package rate
import (
"context"
"fmt"
"math"
"sync"
"time"
)
// Limit defines the maximum frequency of some events.
// Limit is represented as number of events per second.
// A zero Limit allows no events.
type Limit float64
// Inf is the infinite rate limit; it allows all events (even if burst is zero).
const Inf = Limit(math.MaxFloat64)
// Every converts a minimum time interval between events to a Limit.
func Every(interval time.Duration) Limit {
if interval <= 0 {
return Inf
}
return 1 / Limit(interval.Seconds())
}
// A Limiter controls how frequently events are allowed to happen.
// It implements a "token bucket" of size b, initially full and refilled
// at rate r tokens per second.
// Informally, in any large enough time interval, the Limiter limits the
// rate to r tokens per second, with a maximum burst size of b events.
// As a special case, if r == Inf (the infinite rate), b is ignored.
// See https://en.wikipedia.org/wiki/Token_bucket for more about token buckets.
//
// The zero value is a valid Limiter, but it will reject all events.
// Use NewLimiter to create non-zero Limiters.
//
// Limiter has three main methods, Allow, Reserve, and Wait.
// Most callers should use Wait.
//
// Each of the three methods consumes a single token.
// They differ in their behavior when no token is available.
// If no token is available, Allow returns false.
// If no token is available, Reserve returns a reservation for a future token
// and the amount of time the caller must wait before using it.
// If no token is available, Wait blocks until one can be obtained
// or its associated context.Context is canceled.
//
// The methods AllowN, ReserveN, and WaitN consume n tokens.
type Limiter struct {
mu sync.Mutex
limit Limit
burst int
tokens float64
// last is the last time the limiter's tokens field was updated
last time.Time
// lastEvent is the latest time of a rate-limited event (past or future)
lastEvent time.Time
}
// Limit returns the maximum overall event rate.
func (lim *Limiter) Limit() Limit {
lim.mu.Lock()
defer lim.mu.Unlock()
return lim.limit
}
// Burst returns the maximum burst size. Burst is the maximum number of tokens
// that can be consumed in a single call to Allow, Reserve, or Wait, so higher
// Burst values allow more events to happen at once.
// A zero Burst allows no events, unless limit == Inf.
func (lim *Limiter) Burst() int {
lim.mu.Lock()
defer lim.mu.Unlock()
return lim.burst
}
// NewLimiter returns a new Limiter that allows events up to rate r and permits
// bursts of at most b tokens.
func NewLimiter(r Limit, b int) *Limiter {
return &Limiter{
limit: r,
burst: b,
}
}
// Allow is shorthand for AllowN(time.Now(), 1).
func (lim *Limiter) Allow() bool {
return lim.AllowN(time.Now(), 1)
}
// AllowN reports whether n events may happen at time now.
// Use this method if you intend to drop / skip events that exceed the rate limit.
// Otherwise use Reserve or Wait.
func (lim *Limiter) AllowN(now time.Time, n int) bool {
return lim.reserveN(now, n, 0).ok
}
// A Reservation holds information about events that are permitted by a Limiter to happen after a delay.
// A Reservation may be canceled, which may enable the Limiter to permit additional events.
type Reservation struct {
ok bool
lim *Limiter
tokens int
timeToAct time.Time
// This is the Limit at reservation time, it can change later.
limit Limit
}
// OK returns whether the limiter can provide the requested number of tokens
// within the maximum wait time. If OK is false, Delay returns InfDuration, and
// Cancel does nothing.
func (r *Reservation) OK() bool {
return r.ok
}
// Delay is shorthand for DelayFrom(time.Now()).
func (r *Reservation) Delay() time.Duration {
return r.DelayFrom(time.Now())
}
// InfDuration is the duration returned by Delay when a Reservation is not OK.
const InfDuration = time.Duration(1<<63 - 1)
// DelayFrom returns the duration for which the reservation holder must wait
// before taking the reserved action. Zero duration means act immediately.
// InfDuration means the limiter cannot grant the tokens requested in this
// Reservation within the maximum wait time.
func (r *Reservation) DelayFrom(now time.Time) time.Duration {
if !r.ok {
return InfDuration
}
delay := r.timeToAct.Sub(now)
if delay < 0 {
return 0
}
return delay
}
// Cancel is shorthand for CancelAt(time.Now()).
func (r *Reservation) Cancel() {
r.CancelAt(time.Now())
return
}
// CancelAt indicates that the reservation holder will not perform the reserved action
// and reverses the effects of this Reservation on the rate limit as much as possible,
// considering that other reservations may have already been made.
func (r *Reservation) CancelAt(now time.Time) {
if !r.ok {
return
}
r.lim.mu.Lock()
defer r.lim.mu.Unlock()
if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(now) {
return
}
// calculate tokens to restore
// The duration between lim.lastEvent and r.timeToAct tells us how many tokens were reserved
// after r was obtained. These tokens should not be restored.
restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))
if restoreTokens <= 0 {
return
}
// advance time to now
now, _, tokens := r.lim.advance(now)
// calculate new number of tokens
tokens += restoreTokens
if burst := float64(r.lim.burst); tokens > burst {
tokens = burst
}
// update state
r.lim.last = now
r.lim.tokens = tokens
if r.timeToAct == r.lim.lastEvent {
prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens)))
if !prevEvent.Before(now) {
r.lim.lastEvent = prevEvent
}
}
return
}
// Reserve is shorthand for ReserveN(time.Now(), 1).
func (lim *Limiter) Reserve() *Reservation {
return lim.ReserveN(time.Now(), 1)
}
// ReserveN returns a Reservation that indicates how long the caller must wait before n events happen.
// The Limiter takes this Reservation into account when allowing future events.
// The returned Reservations OK() method returns false if n exceeds the Limiter's burst size.
// Usage example:
// r := lim.ReserveN(time.Now(), 1)
// if !r.OK() {
// // Not allowed to act! Did you remember to set lim.burst to be > 0 ?
// return
// }
// time.Sleep(r.Delay())
// Act()
// Use this method if you wish to wait and slow down in accordance with the rate limit without dropping events.
// If you need to respect a deadline or cancel the delay, use Wait instead.
// To drop or skip events exceeding rate limit, use Allow instead.
func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation {
r := lim.reserveN(now, n, InfDuration)
return &r
}
// Wait is shorthand for WaitN(ctx, 1).
func (lim *Limiter) Wait(ctx context.Context) (err error) {
return lim.WaitN(ctx, 1)
}
// WaitN blocks until lim permits n events to happen.
// It returns an error if n exceeds the Limiter's burst size, the Context is
// canceled, or the expected wait time exceeds the Context's Deadline.
// The burst limit is ignored if the rate limit is Inf.
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {
lim.mu.Lock()
burst := lim.burst
limit := lim.limit
lim.mu.Unlock()
if n > burst && limit != Inf {
return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, burst)
}
// Check if ctx is already cancelled
select {
case <-ctx.Done():
return ctx.Err()
default:
}
// Determine wait limit
now := time.Now()
waitLimit := InfDuration
if deadline, ok := ctx.Deadline(); ok {
waitLimit = deadline.Sub(now)
}
// Reserve
r := lim.reserveN(now, n, waitLimit)
if !r.ok {
return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)
}
// Wait if necessary
delay := r.DelayFrom(now)
if delay == 0 {
return nil
}
t := time.NewTimer(delay)
defer t.Stop()
select {
case <-t.C:
// We can proceed.
return nil
case <-ctx.Done():
// Context was canceled before we could proceed. Cancel the
// reservation, which may permit other events to proceed sooner.
r.Cancel()
return ctx.Err()
}
}
// SetLimit is shorthand for SetLimitAt(time.Now(), newLimit).
func (lim *Limiter) SetLimit(newLimit Limit) {
lim.SetLimitAt(time.Now(), newLimit)
}
// SetLimitAt sets a new Limit for the limiter. The new Limit, and Burst, may be violated
// or underutilized by those which reserved (using Reserve or Wait) but did not yet act
// before SetLimitAt was called.
func (lim *Limiter) SetLimitAt(now time.Time, newLimit Limit) {
lim.mu.Lock()
defer lim.mu.Unlock()
now, _, tokens := lim.advance(now)
lim.last = now
lim.tokens = tokens
lim.limit = newLimit
}
// SetBurst is shorthand for SetBurstAt(time.Now(), newBurst).
func (lim *Limiter) SetBurst(newBurst int) {
lim.SetBurstAt(time.Now(), newBurst)
}
// SetBurstAt sets a new burst size for the limiter.
func (lim *Limiter) SetBurstAt(now time.Time, newBurst int) {
lim.mu.Lock()
defer lim.mu.Unlock()
now, _, tokens := lim.advance(now)
lim.last = now
lim.tokens = tokens
lim.burst = newBurst
}
// reserveN is a helper method for AllowN, ReserveN, and WaitN.
// maxFutureReserve specifies the maximum reservation wait duration allowed.
// reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN.
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
lim.mu.Lock()
if lim.limit == Inf {
lim.mu.Unlock()
return Reservation{
ok: true,
lim: lim,
tokens: n,
timeToAct: now,
}
}
now, last, tokens := lim.advance(now)
// Calculate the remaining number of tokens resulting from the request.
tokens -= float64(n)
// Calculate the wait duration
var waitDuration time.Duration
if tokens < 0 {
waitDuration = lim.limit.durationFromTokens(-tokens)
}
// Decide result
ok := n <= lim.burst && waitDuration <= maxFutureReserve
// Prepare reservation
r := Reservation{
ok: ok,
lim: lim,
limit: lim.limit,
}
if ok {
r.tokens = n
r.timeToAct = now.Add(waitDuration)
}
// Update state
if ok {
lim.last = now
lim.tokens = tokens
lim.lastEvent = r.timeToAct
} else {
lim.last = last
}
lim.mu.Unlock()
return r
}
// advance calculates and returns an updated state for lim resulting from the passage of time.
// lim is not changed.
// advance requires that lim.mu is held.
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
last := lim.last
if now.Before(last) {
last = now
}
// Avoid making delta overflow below when last is very old.
maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)
elapsed := now.Sub(last)
if elapsed > maxElapsed {
elapsed = maxElapsed
}
// Calculate the new number of tokens, due to time that passed.
delta := lim.limit.tokensFromDuration(elapsed)
tokens := lim.tokens + delta
if burst := float64(lim.burst); tokens > burst {
tokens = burst
}
return now, last, tokens
}
// durationFromTokens is a unit conversion function from the number of tokens to the duration
// of time it takes to accumulate them at a rate of limit tokens per second.
func (limit Limit) durationFromTokens(tokens float64) time.Duration {
seconds := tokens / float64(limit)
return time.Nanosecond * time.Duration(1e9*seconds)
}
// tokensFromDuration is a unit conversion function from a time duration to the number of tokens
// which could be accumulated during that duration at a rate of limit tokens per second.
func (limit Limit) tokensFromDuration(d time.Duration) float64 {
// Split the integer and fractional parts ourself to minimize rounding errors.
// See golang.org/issues/34861.
sec := float64(d/time.Second) * float64(limit)
nsec := float64(d%time.Second) * float64(limit)
return sec + nsec/1e9
}

7
vendor/modules.txt vendored

@ -101,6 +101,7 @@ github.com/ethereum/go-ethereum/p2p/discover
github.com/ethereum/go-ethereum/p2p/discover/v4wire
github.com/ethereum/go-ethereum/p2p/discover/v5wire
github.com/ethereum/go-ethereum/p2p/discv5
github.com/ethereum/go-ethereum/p2p/dnsdisc
github.com/ethereum/go-ethereum/p2p/enode
github.com/ethereum/go-ethereum/p2p/enr
github.com/ethereum/go-ethereum/p2p/msgrate
@ -446,9 +447,10 @@ github.com/spacemonkeygo/spacelog
github.com/status-im/doubleratchet
# github.com/status-im/go-multiaddr-ethv4 v1.2.1
github.com/status-im/go-multiaddr-ethv4
# github.com/status-im/go-waku v0.0.0-20211101173358-268767262b60
# github.com/status-im/go-waku v0.0.0-20211108125814-49737780ea8d
github.com/status-im/go-waku/waku/persistence
github.com/status-im/go-waku/waku/v2
github.com/status-im/go-waku/waku/v2/discovery
github.com/status-im/go-waku/waku/v2/metrics
github.com/status-im/go-waku/waku/v2/node
github.com/status-im/go-waku/waku/v2/protocol
@ -602,6 +604,7 @@ golang.org/x/net/publicsuffix
golang.org/x/net/route
# golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sync/errgroup
golang.org/x/sync/singleflight
# golang.org/x/sys v0.0.0-20210816183151-1e6c022a8912
golang.org/x/sys/cpu
golang.org/x/sys/internal/unsafeheader
@ -631,6 +634,8 @@ golang.org/x/text/secure/bidirule
golang.org/x/text/transform
golang.org/x/text/unicode/bidi
golang.org/x/text/unicode/norm
# golang.org/x/time v0.0.0-20201208040808-7e3f01d25324
golang.org/x/time/rate
# google.golang.org/protobuf v1.27.1
google.golang.org/protobuf/encoding/prototext
google.golang.org/protobuf/encoding/protowire

@ -27,6 +27,7 @@ import (
"fmt"
"net"
"runtime"
"strings"
"sync"
"time"
@ -56,6 +57,7 @@ import (
libp2pproto "github.com/libp2p/go-libp2p-core/protocol"
rendezvous "github.com/status-im/go-waku-rendezvous"
"github.com/status-im/go-waku/waku/v2/discovery"
"github.com/status-im/go-waku/waku/v2/protocol"
wakuprotocol "github.com/status-im/go-waku/waku/v2/protocol"
"github.com/status-im/go-waku/waku/v2/protocol/filter"
@ -94,6 +96,9 @@ type ConnStatus struct {
type Waku struct {
node *node.WakuNode // reference to a libp2p waku node
dnsAddressCache map[string][]multiaddr.Multiaddr // Map to store the multiaddresses returned by dns discovery
dnsAddressCacheLock *sync.RWMutex // lock to handle access to the map
filters *common.Filters // Message filters installed with Subscribe function
filterMsgChannel chan *protocol.Envelope // Channel for wakuv2 filter messages
@ -133,14 +138,16 @@ func New(nodeKey string, cfg *Config, logger *zap.Logger, appdb *sql.DB) (*Waku,
}
waku := &Waku{
privateKeys: make(map[string]*ecdsa.PrivateKey),
symKeys: make(map[string][]byte),
envelopes: make(map[gethcommon.Hash]*common.ReceivedMessage),
expirations: make(map[uint32]mapset.Set),
msgQueue: make(chan *common.ReceivedMessage, messageQueueLimit),
quit: make(chan struct{}),
timeSource: time.Now,
logger: logger,
privateKeys: make(map[string]*ecdsa.PrivateKey),
symKeys: make(map[string][]byte),
envelopes: make(map[gethcommon.Hash]*common.ReceivedMessage),
expirations: make(map[uint32]mapset.Set),
msgQueue: make(chan *common.ReceivedMessage, messageQueueLimit),
quit: make(chan struct{}),
dnsAddressCache: make(map[string][]multiaddr.Multiaddr),
dnsAddressCacheLock: &sync.RWMutex{},
timeSource: time.Now,
logger: logger,
}
waku.settings = settings{
@ -202,13 +209,10 @@ func New(nodeKey string, cfg *Config, logger *zap.Logger, appdb *sql.DB) (*Waku,
}
opts := []node.WakuNodeOption{
node.WithLibP2POptions(
libp2pOpts...,
),
node.WithLibP2POptions(libp2pOpts...),
node.WithPrivateKey(privateKey),
node.WithHostAddress([]*net.TCPAddr{hostAddr}),
node.WithWakuStore(false, false), // Mounts the store protocol (without storing the messages)
node.WithConnStatusChan(connStatusChan),
node.WithConnectionStatusChannel(connStatusChan),
node.WithKeepAlive(time.Duration(cfg.KeepAliveInterval) * time.Second),
}
@ -260,48 +264,89 @@ func New(nodeKey string, cfg *Config, logger *zap.Logger, appdb *sql.DB) (*Waku,
return waku, nil
}
func (w *Waku) addPeers(addresses []string, protocol libp2pproto.ID) {
type fnApplyToEachPeer func(ma multiaddr.Multiaddr, protocol libp2pproto.ID)
func (w *Waku) addPeers(addresses []string, protocol libp2pproto.ID, apply fnApplyToEachPeer) {
for _, addrString := range addresses {
if addrString == "" {
continue
}
addr, err := multiaddr.NewMultiaddr(addrString)
if err != nil {
log.Warn("invalid peer multiaddress", addrString, err)
continue
if strings.HasPrefix(addrString, "enrtree://") {
// Use DNS Discovery
go w.dnsDiscover(addrString, protocol, apply)
} else {
// It's a normal multiaddress
w.addPeerFromString(addrString, protocol, apply)
}
peerID, err := w.node.AddPeer(addr, protocol)
if err != nil {
log.Warn("could not add peer", addr, err)
continue
}
log.Info("peer added successfully", peerID)
}
}
func (w *Waku) dnsDiscover(enrtreeAddress string, protocol libp2pproto.ID, apply fnApplyToEachPeer) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
w.dnsAddressCacheLock.RLock()
multiaddresses, ok := w.dnsAddressCache[enrtreeAddress]
w.dnsAddressCacheLock.RUnlock()
if !ok {
w.dnsAddressCacheLock.Lock()
var err error
multiaddresses, err = discovery.RetrieveNodes(ctx, enrtreeAddress)
w.dnsAddressCache[enrtreeAddress] = multiaddresses
w.dnsAddressCacheLock.Unlock()
if err != nil {
log.Warn("dns discovery error ", err)
return
}
}
for _, m := range multiaddresses {
apply(m, protocol)
}
}
func (w *Waku) addPeerFromString(addrString string, protocol libp2pproto.ID, apply fnApplyToEachPeer) {
addr, err := multiaddr.NewMultiaddr(addrString)
if err != nil {
log.Warn("invalid peer multiaddress", addrString, err)
return
}
apply(addr, protocol)
}
func (w *Waku) addWakuV2Peers(cfg *Config) {
if !cfg.LightClient {
for _, relaynode := range cfg.RelayNodes {
go func(node string) {
addRelayPeer := func(m multiaddr.Multiaddr, protocol libp2pproto.ID) {
go func(node multiaddr.Multiaddr) {
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
defer cancel()
err := w.node.DialPeer(ctx, node)
err := w.node.DialPeerWithMultiAddress(ctx, node)
if err != nil {
log.Warn("could not dial peer", err)
} else {
log.Info("relay peer dialed successfully", node)
}
}(relaynode)
}(m)
}
w.addPeers(cfg.RelayNodes, relay.WakuRelayID_v200, addRelayPeer)
}
w.addPeers(cfg.StoreNodes, store.StoreID_v20beta3)
w.addPeers(cfg.FilterNodes, filter.FilterID_v20beta1)
w.addPeers(cfg.LightpushNodes, lightpush.LightPushID_v20beta1)
w.addPeers(cfg.WakuRendezvousNodes, rendezvous.RendezvousID_v001)
addToStore := func(m multiaddr.Multiaddr, protocol libp2pproto.ID) {
peerID, err := w.node.AddPeer(m, protocol)
if err != nil {
log.Warn("could not add peer", m, err)
return
}
log.Info("peer added successfully", peerID)
}
w.addPeers(cfg.StoreNodes, store.StoreID_v20beta3, addToStore)
w.addPeers(cfg.FilterNodes, filter.FilterID_v20beta1, addToStore)
w.addPeers(cfg.LightpushNodes, lightpush.LightPushID_v20beta1, addToStore)
w.addPeers(cfg.WakuRendezvousNodes, rendezvous.RendezvousID_v001, addToStore)
}
func (w *Waku) GetStats() types.StatsSummary {
@ -358,15 +403,19 @@ func (w *Waku) subscribeWakuFilterTopic(topics [][]byte) {
}
var err error
filter := filter.ContentFilter{
contentFilter := filter.ContentFilter{
Topic: string(pubsubTopic),
ContentTopics: contentTopics,
}
_, w.filterMsgChannel, err = w.node.SubscribeFilter(context.Background(), filter)
var wakuFilter filter.Filter
_, wakuFilter, err = w.node.Filter().Subscribe(context.Background(), contentFilter)
if err != nil {
w.logger.Warn("could not add wakuv2 filter for topics", zap.Any("topics", topics))
return
}
w.filterMsgChannel = wakuFilter.Chan
}
// MaxMessageSize returns the maximum accepted message size.
@ -686,7 +735,8 @@ func (w *Waku) Unsubscribe(id string) error {
for _, topic := range f.Topics {
contentFilter.ContentTopics = append(contentFilter.ContentTopics, common.BytesToTopic(topic).ContentTopic())
}
if err := w.node.UnsubscribeFilter(context.Background(), contentFilter); err != nil {
if err := w.node.Filter().UnsubscribeFilter(context.Background(), contentFilter); err != nil {
return fmt.Errorf("failed to unsubscribe: %w", err)
}
}
@ -758,7 +808,10 @@ func (w *Waku) Query(topics []common.TopicType, from uint64, to uint64, opts []s
Topic: string(relay.DefaultWakuTopic),
}
result, err := w.node.Store().Query(context.Background(), query, opts...)
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
result, err := w.node.Store().Query(ctx, query, opts...)
if err != nil {
return
}