From f47229a466722943ae11c7cdcd52a7ce5d8d2239 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Tue, 9 Nov 2021 08:22:34 -0400 Subject: [PATCH] feat: add dns discovery to wakuv2 (#2422) --- go.mod | 2 +- go.sum | 5 +- .../go-ethereum/p2p/dnsdisc/client.go | 392 ++++++++++++++++ .../ethereum/go-ethereum/p2p/dnsdisc/doc.go | 18 + .../ethereum/go-ethereum/p2p/dnsdisc/error.go | 63 +++ .../ethereum/go-ethereum/p2p/dnsdisc/sync.go | 329 ++++++++++++++ .../ethereum/go-ethereum/p2p/dnsdisc/tree.go | 423 ++++++++++++++++++ .../go-waku/waku/persistence/store.go | 51 ++- .../go-waku/waku/v2/discovery/enr.go | 65 +++ .../go-waku/waku/v2/discovery/public_key.go | 79 ++++ .../go-waku/waku/v2/discovery/resolver.go | 22 + .../go-waku/waku/v2/node/waku_payload.go | 4 +- .../go-waku/waku/v2/node/wakunode2.go | 140 +----- .../go-waku/waku/v2/node/wakuoptions.go | 32 +- .../go-waku/waku/v2/protocol/envelope.go | 3 + .../waku/v2/protocol/filter/filter_map.go | 101 +++++ .../v2/protocol/filter/filter_subscribers.go | 94 ++++ .../waku/v2/protocol/filter/waku_filter.go | 256 ++++++----- .../v2/protocol/lightpush/waku_lightpush.go | 74 +-- .../lightpush/waku_lightpush_option.go | 51 +++ .../waku/v2/protocol/relay/waku_relay.go | 12 +- .../waku/v2/protocol/store/message_queue.go | 109 +++++ .../waku/v2/protocol/store/waku_store.go | 115 +++-- .../status-im/go-waku/waku/v2/utils/time.go | 10 +- .../x/sync/singleflight/singleflight.go | 212 +++++++++ vendor/golang.org/x/time/AUTHORS | 3 + vendor/golang.org/x/time/CONTRIBUTORS | 3 + vendor/golang.org/x/time/LICENSE | 27 ++ vendor/golang.org/x/time/PATENTS | 22 + vendor/golang.org/x/time/rate/rate.go | 402 +++++++++++++++++ vendor/modules.txt | 7 +- wakuv2/waku.go | 129 ++++-- 32 files changed, 2833 insertions(+), 422 deletions(-) create mode 100644 vendor/github.com/ethereum/go-ethereum/p2p/dnsdisc/client.go create mode 100644 vendor/github.com/ethereum/go-ethereum/p2p/dnsdisc/doc.go create mode 100644 vendor/github.com/ethereum/go-ethereum/p2p/dnsdisc/error.go create mode 100644 vendor/github.com/ethereum/go-ethereum/p2p/dnsdisc/sync.go create mode 100644 vendor/github.com/ethereum/go-ethereum/p2p/dnsdisc/tree.go create mode 100644 vendor/github.com/status-im/go-waku/waku/v2/discovery/enr.go create mode 100644 vendor/github.com/status-im/go-waku/waku/v2/discovery/public_key.go create mode 100644 vendor/github.com/status-im/go-waku/waku/v2/discovery/resolver.go create mode 100644 vendor/github.com/status-im/go-waku/waku/v2/protocol/filter/filter_map.go create mode 100644 vendor/github.com/status-im/go-waku/waku/v2/protocol/filter/filter_subscribers.go create mode 100644 vendor/github.com/status-im/go-waku/waku/v2/protocol/lightpush/waku_lightpush_option.go create mode 100644 vendor/github.com/status-im/go-waku/waku/v2/protocol/store/message_queue.go create mode 100644 vendor/golang.org/x/sync/singleflight/singleflight.go create mode 100644 vendor/golang.org/x/time/AUTHORS create mode 100644 vendor/golang.org/x/time/CONTRIBUTORS create mode 100644 vendor/golang.org/x/time/LICENSE create mode 100644 vendor/golang.org/x/time/PATENTS create mode 100644 vendor/golang.org/x/time/rate/rate.go diff --git a/go.mod b/go.mod index 6a326d04b..63ce6da83 100644 --- a/go.mod +++ b/go.mod @@ -49,7 +49,7 @@ require ( github.com/russolsen/same v0.0.0-20160222130632-f089df61f51d // indirect github.com/russolsen/transit v0.0.0-20180705123435-0794b4c4505a github.com/status-im/doubleratchet v3.0.0+incompatible - github.com/status-im/go-waku v0.0.0-20211101173358-268767262b60 + github.com/status-im/go-waku v0.0.0-20211108125814-49737780ea8d github.com/status-im/go-waku-rendezvous v0.0.0-20211018070416-a93f3b70c432 github.com/status-im/markdown v0.0.0-20210405121740-32e5a5055fb6 github.com/status-im/migrate/v4 v4.6.2-status.2 diff --git a/go.sum b/go.sum index 1514b7a49..269563de3 100644 --- a/go.sum +++ b/go.sum @@ -432,6 +432,7 @@ github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2z github.com/gorilla/mux v1.7.1/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= github.com/gorilla/mux v1.7.3 h1:gnP5JzjVOuiZD07fKKToCAOjS0yOpj/qPETTXCCS6hw= github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= +github.com/gorilla/rpc v1.2.0/go.mod h1:V4h9r+4sF5HnzqbwIez0fKSpANP0zlYd3qR7p36jkTQ= github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= @@ -1206,8 +1207,8 @@ github.com/status-im/go-ethereum v1.10.4-status.3 h1:RF618iSCvqJtXu3ZSg7XNg6MJaS github.com/status-im/go-ethereum v1.10.4-status.3/go.mod h1:GvIhpdCOgMHI6i5xVPEZOrv/qSMeOFHbZh77AoyZUoE= github.com/status-im/go-multiaddr-ethv4 v1.2.1 h1:09v9n6426NAojNOvdgegqrAotgffWW/UPDwrpJ85DNE= github.com/status-im/go-multiaddr-ethv4 v1.2.1/go.mod h1:SlBebvQcSUM5+/R/YfpfMuu5WyraW47XFmIqLYBmlKU= -github.com/status-im/go-waku v0.0.0-20211101173358-268767262b60 h1:ymq5jgtepOPqNbs8yA9g2hkFn5811snImNqNbQzpcDA= -github.com/status-im/go-waku v0.0.0-20211101173358-268767262b60/go.mod h1:A0lI3uZYLKrXiviVkwGgBdT8b9HLcW3U/xUcE/4665k= +github.com/status-im/go-waku v0.0.0-20211108125814-49737780ea8d h1:mWatRmDv+xopBdnd4SYj6I4mrqS0fTgMy2Q0r79LD0w= +github.com/status-im/go-waku v0.0.0-20211108125814-49737780ea8d/go.mod h1:egfHY9n6ATRVAJ8atFPUuBqlECqDcHemfzD7VOgwZv8= github.com/status-im/go-waku-rendezvous v0.0.0-20211018070416-a93f3b70c432 h1:cbNFU38iimo9fY4B7CdF/fvIF6tNPJIZjBbpfmW2EY4= github.com/status-im/go-waku-rendezvous v0.0.0-20211018070416-a93f3b70c432/go.mod h1:A8t3i0CUGtXCA0aiLsP7iyikmk/KaD/2XVvNJqGCU20= github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4/go.mod h1:RZLeN1LMWmRsyYjvAu+I6Dm9QmlDaIIt+Y+4Kd7Tp+Q= diff --git a/vendor/github.com/ethereum/go-ethereum/p2p/dnsdisc/client.go b/vendor/github.com/ethereum/go-ethereum/p2p/dnsdisc/client.go new file mode 100644 index 000000000..096df06a5 --- /dev/null +++ b/vendor/github.com/ethereum/go-ethereum/p2p/dnsdisc/client.go @@ -0,0 +1,392 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package dnsdisc + +import ( + "bytes" + "context" + "fmt" + "math/rand" + "net" + "strings" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common/mclock" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/p2p/enr" + lru "github.com/hashicorp/golang-lru" + "golang.org/x/sync/singleflight" + "golang.org/x/time/rate" +) + +// Client discovers nodes by querying DNS servers. +type Client struct { + cfg Config + clock mclock.Clock + entries *lru.Cache + ratelimit *rate.Limiter + singleflight singleflight.Group +} + +// Config holds configuration options for the client. +type Config struct { + Timeout time.Duration // timeout used for DNS lookups (default 5s) + RecheckInterval time.Duration // time between tree root update checks (default 30min) + CacheLimit int // maximum number of cached records (default 1000) + RateLimit float64 // maximum DNS requests / second (default 3) + ValidSchemes enr.IdentityScheme // acceptable ENR identity schemes (default enode.ValidSchemes) + Resolver Resolver // the DNS resolver to use (defaults to system DNS) + Logger log.Logger // destination of client log messages (defaults to root logger) +} + +// Resolver is a DNS resolver that can query TXT records. +type Resolver interface { + LookupTXT(ctx context.Context, domain string) ([]string, error) +} + +func (cfg Config) withDefaults() Config { + const ( + defaultTimeout = 5 * time.Second + defaultRecheck = 30 * time.Minute + defaultRateLimit = 3 + defaultCache = 1000 + ) + if cfg.Timeout == 0 { + cfg.Timeout = defaultTimeout + } + if cfg.RecheckInterval == 0 { + cfg.RecheckInterval = defaultRecheck + } + if cfg.CacheLimit == 0 { + cfg.CacheLimit = defaultCache + } + if cfg.RateLimit == 0 { + cfg.RateLimit = defaultRateLimit + } + if cfg.ValidSchemes == nil { + cfg.ValidSchemes = enode.ValidSchemes + } + if cfg.Resolver == nil { + cfg.Resolver = new(net.Resolver) + } + if cfg.Logger == nil { + cfg.Logger = log.Root() + } + return cfg +} + +// NewClient creates a client. +func NewClient(cfg Config) *Client { + cfg = cfg.withDefaults() + cache, err := lru.New(cfg.CacheLimit) + if err != nil { + panic(err) + } + rlimit := rate.NewLimiter(rate.Limit(cfg.RateLimit), 10) + return &Client{ + cfg: cfg, + entries: cache, + clock: mclock.System{}, + ratelimit: rlimit, + } +} + +// SyncTree downloads the entire node tree at the given URL. +func (c *Client) SyncTree(url string) (*Tree, error) { + le, err := parseLink(url) + if err != nil { + return nil, fmt.Errorf("invalid enrtree URL: %v", err) + } + ct := newClientTree(c, new(linkCache), le) + t := &Tree{entries: make(map[string]entry)} + if err := ct.syncAll(t.entries); err != nil { + return nil, err + } + t.root = ct.root + return t, nil +} + +// NewIterator creates an iterator that visits all nodes at the +// given tree URLs. +func (c *Client) NewIterator(urls ...string) (enode.Iterator, error) { + it := c.newRandomIterator() + for _, url := range urls { + if err := it.addTree(url); err != nil { + return nil, err + } + } + return it, nil +} + +// resolveRoot retrieves a root entry via DNS. +func (c *Client) resolveRoot(ctx context.Context, loc *linkEntry) (rootEntry, error) { + e, err, _ := c.singleflight.Do(loc.str, func() (interface{}, error) { + txts, err := c.cfg.Resolver.LookupTXT(ctx, loc.domain) + c.cfg.Logger.Trace("Updating DNS discovery root", "tree", loc.domain, "err", err) + if err != nil { + return rootEntry{}, err + } + for _, txt := range txts { + if strings.HasPrefix(txt, rootPrefix) { + return parseAndVerifyRoot(txt, loc) + } + } + return rootEntry{}, nameError{loc.domain, errNoRoot} + }) + return e.(rootEntry), err +} + +func parseAndVerifyRoot(txt string, loc *linkEntry) (rootEntry, error) { + e, err := parseRoot(txt) + if err != nil { + return e, err + } + if !e.verifySignature(loc.pubkey) { + return e, entryError{typ: "root", err: errInvalidSig} + } + return e, nil +} + +// resolveEntry retrieves an entry from the cache or fetches it from the network +// if it isn't cached. +func (c *Client) resolveEntry(ctx context.Context, domain, hash string) (entry, error) { + // The rate limit always applies, even when the result might be cached. This is + // important because it avoids hot-spinning in consumers of node iterators created on + // this client. + if err := c.ratelimit.Wait(ctx); err != nil { + return nil, err + } + cacheKey := truncateHash(hash) + if e, ok := c.entries.Get(cacheKey); ok { + return e.(entry), nil + } + + ei, err, _ := c.singleflight.Do(cacheKey, func() (interface{}, error) { + e, err := c.doResolveEntry(ctx, domain, hash) + if err != nil { + return nil, err + } + c.entries.Add(cacheKey, e) + return e, nil + }) + e, _ := ei.(entry) + return e, err +} + +// doResolveEntry fetches an entry via DNS. +func (c *Client) doResolveEntry(ctx context.Context, domain, hash string) (entry, error) { + wantHash, err := b32format.DecodeString(hash) + if err != nil { + return nil, fmt.Errorf("invalid base32 hash") + } + name := hash + "." + domain + txts, err := c.cfg.Resolver.LookupTXT(ctx, hash+"."+domain) + c.cfg.Logger.Trace("DNS discovery lookup", "name", name, "err", err) + if err != nil { + return nil, err + } + for _, txt := range txts { + e, err := parseEntry(txt, c.cfg.ValidSchemes) + if err == errUnknownEntry { + continue + } + if !bytes.HasPrefix(crypto.Keccak256([]byte(txt)), wantHash) { + err = nameError{name, errHashMismatch} + } else if err != nil { + err = nameError{name, err} + } + return e, err + } + return nil, nameError{name, errNoEntry} +} + +// randomIterator traverses a set of trees and returns nodes found in them. +type randomIterator struct { + cur *enode.Node + ctx context.Context + cancelFn context.CancelFunc + c *Client + + mu sync.Mutex + lc linkCache // tracks tree dependencies + trees map[string]*clientTree // all trees + // buffers for syncableTrees + syncableList []*clientTree + disabledList []*clientTree +} + +func (c *Client) newRandomIterator() *randomIterator { + ctx, cancel := context.WithCancel(context.Background()) + return &randomIterator{ + c: c, + ctx: ctx, + cancelFn: cancel, + trees: make(map[string]*clientTree), + } +} + +// Node returns the current node. +func (it *randomIterator) Node() *enode.Node { + return it.cur +} + +// Close closes the iterator. +func (it *randomIterator) Close() { + it.cancelFn() + + it.mu.Lock() + defer it.mu.Unlock() + it.trees = nil +} + +// Next moves the iterator to the next node. +func (it *randomIterator) Next() bool { + it.cur = it.nextNode() + return it.cur != nil +} + +// addTree adds an enrtree:// URL to the iterator. +func (it *randomIterator) addTree(url string) error { + le, err := parseLink(url) + if err != nil { + return fmt.Errorf("invalid enrtree URL: %v", err) + } + it.lc.addLink("", le.str) + return nil +} + +// nextNode syncs random tree entries until it finds a node. +func (it *randomIterator) nextNode() *enode.Node { + for { + ct := it.pickTree() + if ct == nil { + return nil + } + n, err := ct.syncRandom(it.ctx) + if err != nil { + if err == it.ctx.Err() { + return nil // context canceled. + } + it.c.cfg.Logger.Debug("Error in DNS random node sync", "tree", ct.loc.domain, "err", err) + continue + } + if n != nil { + return n + } + } +} + +// pickTree returns a random tree to sync from. +func (it *randomIterator) pickTree() *clientTree { + it.mu.Lock() + defer it.mu.Unlock() + + // First check if iterator was closed. + // Need to do this here to avoid nil map access in rebuildTrees. + if it.trees == nil { + return nil + } + + // Rebuild the trees map if any links have changed. + if it.lc.changed { + it.rebuildTrees() + it.lc.changed = false + } + + for { + canSync, trees := it.syncableTrees() + switch { + case canSync: + // Pick a random tree. + return trees[rand.Intn(len(trees))] + case len(trees) > 0: + // No sync action can be performed on any tree right now. The only meaningful + // thing to do is waiting for any root record to get updated. + if !it.waitForRootUpdates(trees) { + // Iterator was closed while waiting. + return nil + } + default: + // There are no trees left, the iterator was closed. + return nil + } + } +} + +// syncableTrees finds trees on which any meaningful sync action can be performed. +func (it *randomIterator) syncableTrees() (canSync bool, trees []*clientTree) { + // Resize tree lists. + it.syncableList = it.syncableList[:0] + it.disabledList = it.disabledList[:0] + + // Partition them into the two lists. + for _, ct := range it.trees { + if ct.canSyncRandom() { + it.syncableList = append(it.syncableList, ct) + } else { + it.disabledList = append(it.disabledList, ct) + } + } + if len(it.syncableList) > 0 { + return true, it.syncableList + } + return false, it.disabledList +} + +// waitForRootUpdates waits for the closest scheduled root check time on the given trees. +func (it *randomIterator) waitForRootUpdates(trees []*clientTree) bool { + var minTree *clientTree + var nextCheck mclock.AbsTime + for _, ct := range trees { + check := ct.nextScheduledRootCheck() + if minTree == nil || check < nextCheck { + minTree = ct + nextCheck = check + } + } + + sleep := nextCheck.Sub(it.c.clock.Now()) + it.c.cfg.Logger.Debug("DNS iterator waiting for root updates", "sleep", sleep, "tree", minTree.loc.domain) + timeout := it.c.clock.NewTimer(sleep) + defer timeout.Stop() + select { + case <-timeout.C(): + return true + case <-it.ctx.Done(): + return false // Iterator was closed. + } +} + +// rebuildTrees rebuilds the 'trees' map. +func (it *randomIterator) rebuildTrees() { + // Delete removed trees. + for loc := range it.trees { + if !it.lc.isReferenced(loc) { + delete(it.trees, loc) + } + } + // Add new trees. + for loc := range it.lc.backrefs { + if it.trees[loc] == nil { + link, _ := parseLink(linkPrefix + loc) + it.trees[loc] = newClientTree(it.c, &it.lc, link) + } + } +} diff --git a/vendor/github.com/ethereum/go-ethereum/p2p/dnsdisc/doc.go b/vendor/github.com/ethereum/go-ethereum/p2p/dnsdisc/doc.go new file mode 100644 index 000000000..227467d08 --- /dev/null +++ b/vendor/github.com/ethereum/go-ethereum/p2p/dnsdisc/doc.go @@ -0,0 +1,18 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +// Package dnsdisc implements node discovery via DNS (EIP-1459). +package dnsdisc diff --git a/vendor/github.com/ethereum/go-ethereum/p2p/dnsdisc/error.go b/vendor/github.com/ethereum/go-ethereum/p2p/dnsdisc/error.go new file mode 100644 index 000000000..e0998c735 --- /dev/null +++ b/vendor/github.com/ethereum/go-ethereum/p2p/dnsdisc/error.go @@ -0,0 +1,63 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package dnsdisc + +import ( + "errors" + "fmt" +) + +// Entry parse errors. +var ( + errUnknownEntry = errors.New("unknown entry type") + errNoPubkey = errors.New("missing public key") + errBadPubkey = errors.New("invalid public key") + errInvalidENR = errors.New("invalid node record") + errInvalidChild = errors.New("invalid child hash") + errInvalidSig = errors.New("invalid base64 signature") + errSyntax = errors.New("invalid syntax") +) + +// Resolver/sync errors +var ( + errNoRoot = errors.New("no valid root found") + errNoEntry = errors.New("no valid tree entry found") + errHashMismatch = errors.New("hash mismatch") + errENRInLinkTree = errors.New("enr entry in link tree") + errLinkInENRTree = errors.New("link entry in ENR tree") +) + +type nameError struct { + name string + err error +} + +func (err nameError) Error() string { + if ee, ok := err.err.(entryError); ok { + return fmt.Sprintf("invalid %s entry at %s: %v", ee.typ, err.name, ee.err) + } + return err.name + ": " + err.err.Error() +} + +type entryError struct { + typ string + err error +} + +func (err entryError) Error() string { + return fmt.Sprintf("invalid %s entry: %v", err.typ, err.err) +} diff --git a/vendor/github.com/ethereum/go-ethereum/p2p/dnsdisc/sync.go b/vendor/github.com/ethereum/go-ethereum/p2p/dnsdisc/sync.go new file mode 100644 index 000000000..073547c90 --- /dev/null +++ b/vendor/github.com/ethereum/go-ethereum/p2p/dnsdisc/sync.go @@ -0,0 +1,329 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package dnsdisc + +import ( + "context" + "math/rand" + "time" + + "github.com/ethereum/go-ethereum/common/mclock" + "github.com/ethereum/go-ethereum/p2p/enode" +) + +// This is the number of consecutive leaf requests that may fail before +// we consider re-resolving the tree root. +const rootRecheckFailCount = 5 + +// clientTree is a full tree being synced. +type clientTree struct { + c *Client + loc *linkEntry // link to this tree + + lastRootCheck mclock.AbsTime // last revalidation of root + leafFailCount int + rootFailCount int + + root *rootEntry + enrs *subtreeSync + links *subtreeSync + + lc *linkCache // tracks all links between all trees + curLinks map[string]struct{} // links contained in this tree + linkGCRoot string // root on which last link GC has run +} + +func newClientTree(c *Client, lc *linkCache, loc *linkEntry) *clientTree { + return &clientTree{c: c, lc: lc, loc: loc} +} + +// syncAll retrieves all entries of the tree. +func (ct *clientTree) syncAll(dest map[string]entry) error { + if err := ct.updateRoot(context.Background()); err != nil { + return err + } + if err := ct.links.resolveAll(dest); err != nil { + return err + } + if err := ct.enrs.resolveAll(dest); err != nil { + return err + } + return nil +} + +// syncRandom retrieves a single entry of the tree. The Node return value +// is non-nil if the entry was a node. +func (ct *clientTree) syncRandom(ctx context.Context) (n *enode.Node, err error) { + if ct.rootUpdateDue() { + if err := ct.updateRoot(ctx); err != nil { + return nil, err + } + } + + // Update fail counter for leaf request errors. + defer func() { + if err != nil { + ct.leafFailCount++ + } + }() + + // Link tree sync has priority, run it to completion before syncing ENRs. + if !ct.links.done() { + err := ct.syncNextLink(ctx) + return nil, err + } + ct.gcLinks() + + // Sync next random entry in ENR tree. Once every node has been visited, we simply + // start over. This is fine because entries are cached internally by the client LRU + // also by DNS resolvers. + if ct.enrs.done() { + ct.enrs = newSubtreeSync(ct.c, ct.loc, ct.root.eroot, false) + } + return ct.syncNextRandomENR(ctx) +} + +// canSyncRandom checks if any meaningful action can be performed by syncRandom. +func (ct *clientTree) canSyncRandom() bool { + // Note: the check for non-zero leaf count is very important here. + // If we're done syncing all nodes, and no leaves were found, the tree + // is empty and we can't use it for sync. + return ct.rootUpdateDue() || !ct.links.done() || !ct.enrs.done() || ct.enrs.leaves != 0 +} + +// gcLinks removes outdated links from the global link cache. GC runs once +// when the link sync finishes. +func (ct *clientTree) gcLinks() { + if !ct.links.done() || ct.root.lroot == ct.linkGCRoot { + return + } + ct.lc.resetLinks(ct.loc.str, ct.curLinks) + ct.linkGCRoot = ct.root.lroot +} + +func (ct *clientTree) syncNextLink(ctx context.Context) error { + hash := ct.links.missing[0] + e, err := ct.links.resolveNext(ctx, hash) + if err != nil { + return err + } + ct.links.missing = ct.links.missing[1:] + + if dest, ok := e.(*linkEntry); ok { + ct.lc.addLink(ct.loc.str, dest.str) + ct.curLinks[dest.str] = struct{}{} + } + return nil +} + +func (ct *clientTree) syncNextRandomENR(ctx context.Context) (*enode.Node, error) { + index := rand.Intn(len(ct.enrs.missing)) + hash := ct.enrs.missing[index] + e, err := ct.enrs.resolveNext(ctx, hash) + if err != nil { + return nil, err + } + ct.enrs.missing = removeHash(ct.enrs.missing, index) + if ee, ok := e.(*enrEntry); ok { + return ee.node, nil + } + return nil, nil +} + +func (ct *clientTree) String() string { + return ct.loc.String() +} + +// removeHash removes the element at index from h. +func removeHash(h []string, index int) []string { + if len(h) == 1 { + return nil + } + last := len(h) - 1 + if index < last { + h[index] = h[last] + h[last] = "" + } + return h[:last] +} + +// updateRoot ensures that the given tree has an up-to-date root. +func (ct *clientTree) updateRoot(ctx context.Context) error { + if !ct.slowdownRootUpdate(ctx) { + return ctx.Err() + } + + ct.lastRootCheck = ct.c.clock.Now() + ctx, cancel := context.WithTimeout(ctx, ct.c.cfg.Timeout) + defer cancel() + root, err := ct.c.resolveRoot(ctx, ct.loc) + if err != nil { + ct.rootFailCount++ + return err + } + ct.root = &root + ct.rootFailCount = 0 + ct.leafFailCount = 0 + + // Invalidate subtrees if changed. + if ct.links == nil || root.lroot != ct.links.root { + ct.links = newSubtreeSync(ct.c, ct.loc, root.lroot, true) + ct.curLinks = make(map[string]struct{}) + } + if ct.enrs == nil || root.eroot != ct.enrs.root { + ct.enrs = newSubtreeSync(ct.c, ct.loc, root.eroot, false) + } + return nil +} + +// rootUpdateDue returns true when a root update is needed. +func (ct *clientTree) rootUpdateDue() bool { + tooManyFailures := ct.leafFailCount > rootRecheckFailCount + scheduledCheck := ct.c.clock.Now() >= ct.nextScheduledRootCheck() + return ct.root == nil || tooManyFailures || scheduledCheck +} + +func (ct *clientTree) nextScheduledRootCheck() mclock.AbsTime { + return ct.lastRootCheck.Add(ct.c.cfg.RecheckInterval) +} + +// slowdownRootUpdate applies a delay to root resolution if is tried +// too frequently. This avoids busy polling when the client is offline. +// Returns true if the timeout passed, false if sync was canceled. +func (ct *clientTree) slowdownRootUpdate(ctx context.Context) bool { + var delay time.Duration + switch { + case ct.rootFailCount > 20: + delay = 10 * time.Second + case ct.rootFailCount > 5: + delay = 5 * time.Second + default: + return true + } + timeout := ct.c.clock.NewTimer(delay) + defer timeout.Stop() + select { + case <-timeout.C(): + return true + case <-ctx.Done(): + return false + } +} + +// subtreeSync is the sync of an ENR or link subtree. +type subtreeSync struct { + c *Client + loc *linkEntry + root string + missing []string // missing tree node hashes + link bool // true if this sync is for the link tree + leaves int // counter of synced leaves +} + +func newSubtreeSync(c *Client, loc *linkEntry, root string, link bool) *subtreeSync { + return &subtreeSync{c, loc, root, []string{root}, link, 0} +} + +func (ts *subtreeSync) done() bool { + return len(ts.missing) == 0 +} + +func (ts *subtreeSync) resolveAll(dest map[string]entry) error { + for !ts.done() { + hash := ts.missing[0] + ctx, cancel := context.WithTimeout(context.Background(), ts.c.cfg.Timeout) + e, err := ts.resolveNext(ctx, hash) + cancel() + if err != nil { + return err + } + dest[hash] = e + ts.missing = ts.missing[1:] + } + return nil +} + +func (ts *subtreeSync) resolveNext(ctx context.Context, hash string) (entry, error) { + e, err := ts.c.resolveEntry(ctx, ts.loc.domain, hash) + if err != nil { + return nil, err + } + switch e := e.(type) { + case *enrEntry: + if ts.link { + return nil, errENRInLinkTree + } + ts.leaves++ + case *linkEntry: + if !ts.link { + return nil, errLinkInENRTree + } + ts.leaves++ + case *branchEntry: + ts.missing = append(ts.missing, e.children...) + } + return e, nil +} + +// linkCache tracks links between trees. +type linkCache struct { + backrefs map[string]map[string]struct{} + changed bool +} + +func (lc *linkCache) isReferenced(r string) bool { + return len(lc.backrefs[r]) != 0 +} + +func (lc *linkCache) addLink(from, to string) { + if _, ok := lc.backrefs[to][from]; ok { + return + } + + if lc.backrefs == nil { + lc.backrefs = make(map[string]map[string]struct{}) + } + if _, ok := lc.backrefs[to]; !ok { + lc.backrefs[to] = make(map[string]struct{}) + } + lc.backrefs[to][from] = struct{}{} + lc.changed = true +} + +// resetLinks clears all links of the given tree. +func (lc *linkCache) resetLinks(from string, keep map[string]struct{}) { + stk := []string{from} + for len(stk) > 0 { + item := stk[len(stk)-1] + stk = stk[:len(stk)-1] + + for r, refs := range lc.backrefs { + if _, ok := keep[r]; ok { + continue + } + if _, ok := refs[item]; !ok { + continue + } + lc.changed = true + delete(refs, item) + if len(refs) == 0 { + delete(lc.backrefs, r) + stk = append(stk, r) + } + } + } +} diff --git a/vendor/github.com/ethereum/go-ethereum/p2p/dnsdisc/tree.go b/vendor/github.com/ethereum/go-ethereum/p2p/dnsdisc/tree.go new file mode 100644 index 000000000..410ec3b85 --- /dev/null +++ b/vendor/github.com/ethereum/go-ethereum/p2p/dnsdisc/tree.go @@ -0,0 +1,423 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package dnsdisc + +import ( + "bytes" + "crypto/ecdsa" + "encoding/base32" + "encoding/base64" + "fmt" + "io" + "sort" + "strings" + + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/p2p/enr" + "github.com/ethereum/go-ethereum/rlp" + "golang.org/x/crypto/sha3" +) + +// Tree is a merkle tree of node records. +type Tree struct { + root *rootEntry + entries map[string]entry +} + +// Sign signs the tree with the given private key and sets the sequence number. +func (t *Tree) Sign(key *ecdsa.PrivateKey, domain string) (url string, err error) { + root := *t.root + sig, err := crypto.Sign(root.sigHash(), key) + if err != nil { + return "", err + } + root.sig = sig + t.root = &root + link := newLinkEntry(domain, &key.PublicKey) + return link.String(), nil +} + +// SetSignature verifies the given signature and assigns it as the tree's current +// signature if valid. +func (t *Tree) SetSignature(pubkey *ecdsa.PublicKey, signature string) error { + sig, err := b64format.DecodeString(signature) + if err != nil || len(sig) != crypto.SignatureLength { + return errInvalidSig + } + root := *t.root + root.sig = sig + if !root.verifySignature(pubkey) { + return errInvalidSig + } + t.root = &root + return nil +} + +// Seq returns the sequence number of the tree. +func (t *Tree) Seq() uint { + return t.root.seq +} + +// Signature returns the signature of the tree. +func (t *Tree) Signature() string { + return b64format.EncodeToString(t.root.sig) +} + +// ToTXT returns all DNS TXT records required for the tree. +func (t *Tree) ToTXT(domain string) map[string]string { + records := map[string]string{domain: t.root.String()} + for _, e := range t.entries { + sd := subdomain(e) + if domain != "" { + sd = sd + "." + domain + } + records[sd] = e.String() + } + return records +} + +// Links returns all links contained in the tree. +func (t *Tree) Links() []string { + var links []string + for _, e := range t.entries { + if le, ok := e.(*linkEntry); ok { + links = append(links, le.String()) + } + } + return links +} + +// Nodes returns all nodes contained in the tree. +func (t *Tree) Nodes() []*enode.Node { + var nodes []*enode.Node + for _, e := range t.entries { + if ee, ok := e.(*enrEntry); ok { + nodes = append(nodes, ee.node) + } + } + return nodes +} + +/* +We want to keep the UDP size below 512 bytes. The UDP size is roughly: +UDP length = 8 + UDP payload length ( 229 ) +UPD Payload length: + - dns.id 2 + - dns.flags 2 + - dns.count.queries 2 + - dns.count.answers 2 + - dns.count.auth_rr 2 + - dns.count.add_rr 2 + - queries (query-size + 6) + - answers : + - dns.resp.name 2 + - dns.resp.type 2 + - dns.resp.class 2 + - dns.resp.ttl 4 + - dns.resp.len 2 + - dns.txt.length 1 + - dns.txt resp_data_size + +So the total size is roughly a fixed overhead of `39`, and the size of the +query (domain name) and response. +The query size is, for example, FVY6INQ6LZ33WLCHO3BPR3FH6Y.snap.mainnet.ethdisco.net (52) + +We also have some static data in the response, such as `enrtree-branch:`, and potentially +splitting the response up with `" "`, leaving us with a size of roughly `400` that we need +to stay below. + +The number `370` is used to have some margin for extra overhead (for example, the dns query +may be larger - more subdomains). +*/ +const ( + hashAbbrevSize = 1 + 16*13/8 // Size of an encoded hash (plus comma) + maxChildren = 370 / hashAbbrevSize // 13 children + minHashLength = 12 +) + +// MakeTree creates a tree containing the given nodes and links. +func MakeTree(seq uint, nodes []*enode.Node, links []string) (*Tree, error) { + // Sort records by ID and ensure all nodes have a valid record. + records := make([]*enode.Node, len(nodes)) + + copy(records, nodes) + sortByID(records) + for _, n := range records { + if len(n.Record().Signature()) == 0 { + return nil, fmt.Errorf("can't add node %v: unsigned node record", n.ID()) + } + } + + // Create the leaf list. + enrEntries := make([]entry, len(records)) + for i, r := range records { + enrEntries[i] = &enrEntry{r} + } + linkEntries := make([]entry, len(links)) + for i, l := range links { + le, err := parseLink(l) + if err != nil { + return nil, err + } + linkEntries[i] = le + } + + // Create intermediate nodes. + t := &Tree{entries: make(map[string]entry)} + eroot := t.build(enrEntries) + t.entries[subdomain(eroot)] = eroot + lroot := t.build(linkEntries) + t.entries[subdomain(lroot)] = lroot + t.root = &rootEntry{seq: seq, eroot: subdomain(eroot), lroot: subdomain(lroot)} + return t, nil +} + +func (t *Tree) build(entries []entry) entry { + if len(entries) == 1 { + return entries[0] + } + if len(entries) <= maxChildren { + hashes := make([]string, len(entries)) + for i, e := range entries { + hashes[i] = subdomain(e) + t.entries[hashes[i]] = e + } + return &branchEntry{hashes} + } + var subtrees []entry + for len(entries) > 0 { + n := maxChildren + if len(entries) < n { + n = len(entries) + } + sub := t.build(entries[:n]) + entries = entries[n:] + subtrees = append(subtrees, sub) + t.entries[subdomain(sub)] = sub + } + return t.build(subtrees) +} + +func sortByID(nodes []*enode.Node) []*enode.Node { + sort.Slice(nodes, func(i, j int) bool { + return bytes.Compare(nodes[i].ID().Bytes(), nodes[j].ID().Bytes()) < 0 + }) + return nodes +} + +// Entry Types + +type entry interface { + fmt.Stringer +} + +type ( + rootEntry struct { + eroot string + lroot string + seq uint + sig []byte + } + branchEntry struct { + children []string + } + enrEntry struct { + node *enode.Node + } + linkEntry struct { + str string + domain string + pubkey *ecdsa.PublicKey + } +) + +// Entry Encoding + +var ( + b32format = base32.StdEncoding.WithPadding(base32.NoPadding) + b64format = base64.RawURLEncoding +) + +const ( + rootPrefix = "enrtree-root:v1" + linkPrefix = "enrtree://" + branchPrefix = "enrtree-branch:" + enrPrefix = "enr:" +) + +func subdomain(e entry) string { + h := sha3.NewLegacyKeccak256() + io.WriteString(h, e.String()) + return b32format.EncodeToString(h.Sum(nil)[:16]) +} + +func (e *rootEntry) String() string { + return fmt.Sprintf(rootPrefix+" e=%s l=%s seq=%d sig=%s", e.eroot, e.lroot, e.seq, b64format.EncodeToString(e.sig)) +} + +func (e *rootEntry) sigHash() []byte { + h := sha3.NewLegacyKeccak256() + fmt.Fprintf(h, rootPrefix+" e=%s l=%s seq=%d", e.eroot, e.lroot, e.seq) + return h.Sum(nil) +} + +func (e *rootEntry) verifySignature(pubkey *ecdsa.PublicKey) bool { + sig := e.sig[:crypto.RecoveryIDOffset] // remove recovery id + enckey := crypto.FromECDSAPub(pubkey) + return crypto.VerifySignature(enckey, e.sigHash(), sig) +} + +func (e *branchEntry) String() string { + return branchPrefix + strings.Join(e.children, ",") +} + +func (e *enrEntry) String() string { + return e.node.String() +} + +func (e *linkEntry) String() string { + return linkPrefix + e.str +} + +func newLinkEntry(domain string, pubkey *ecdsa.PublicKey) *linkEntry { + key := b32format.EncodeToString(crypto.CompressPubkey(pubkey)) + str := key + "@" + domain + return &linkEntry{str, domain, pubkey} +} + +// Entry Parsing + +func parseEntry(e string, validSchemes enr.IdentityScheme) (entry, error) { + switch { + case strings.HasPrefix(e, linkPrefix): + return parseLinkEntry(e) + case strings.HasPrefix(e, branchPrefix): + return parseBranch(e) + case strings.HasPrefix(e, enrPrefix): + return parseENR(e, validSchemes) + default: + return nil, errUnknownEntry + } +} + +func parseRoot(e string) (rootEntry, error) { + var eroot, lroot, sig string + var seq uint + if _, err := fmt.Sscanf(e, rootPrefix+" e=%s l=%s seq=%d sig=%s", &eroot, &lroot, &seq, &sig); err != nil { + return rootEntry{}, entryError{"root", errSyntax} + } + if !isValidHash(eroot) || !isValidHash(lroot) { + return rootEntry{}, entryError{"root", errInvalidChild} + } + sigb, err := b64format.DecodeString(sig) + if err != nil || len(sigb) != crypto.SignatureLength { + return rootEntry{}, entryError{"root", errInvalidSig} + } + return rootEntry{eroot, lroot, seq, sigb}, nil +} + +func parseLinkEntry(e string) (entry, error) { + le, err := parseLink(e) + if err != nil { + return nil, err + } + return le, nil +} + +func parseLink(e string) (*linkEntry, error) { + if !strings.HasPrefix(e, linkPrefix) { + return nil, fmt.Errorf("wrong/missing scheme 'enrtree' in URL") + } + e = e[len(linkPrefix):] + pos := strings.IndexByte(e, '@') + if pos == -1 { + return nil, entryError{"link", errNoPubkey} + } + keystring, domain := e[:pos], e[pos+1:] + keybytes, err := b32format.DecodeString(keystring) + if err != nil { + return nil, entryError{"link", errBadPubkey} + } + key, err := crypto.DecompressPubkey(keybytes) + if err != nil { + return nil, entryError{"link", errBadPubkey} + } + return &linkEntry{e, domain, key}, nil +} + +func parseBranch(e string) (entry, error) { + e = e[len(branchPrefix):] + if e == "" { + return &branchEntry{}, nil // empty entry is OK + } + hashes := make([]string, 0, strings.Count(e, ",")) + for _, c := range strings.Split(e, ",") { + if !isValidHash(c) { + return nil, entryError{"branch", errInvalidChild} + } + hashes = append(hashes, c) + } + return &branchEntry{hashes}, nil +} + +func parseENR(e string, validSchemes enr.IdentityScheme) (entry, error) { + e = e[len(enrPrefix):] + enc, err := b64format.DecodeString(e) + if err != nil { + return nil, entryError{"enr", errInvalidENR} + } + var rec enr.Record + if err := rlp.DecodeBytes(enc, &rec); err != nil { + return nil, entryError{"enr", err} + } + n, err := enode.New(validSchemes, &rec) + if err != nil { + return nil, entryError{"enr", err} + } + return &enrEntry{n}, nil +} + +func isValidHash(s string) bool { + dlen := b32format.DecodedLen(len(s)) + if dlen < minHashLength || dlen > 32 || strings.ContainsAny(s, "\n\r") { + return false + } + buf := make([]byte, 32) + _, err := b32format.Decode(buf, []byte(s)) + return err == nil +} + +// truncateHash truncates the given base32 hash string to the minimum acceptable length. +func truncateHash(hash string) string { + maxLen := b32format.EncodedLen(minHashLength) + if len(hash) < maxLen { + panic(fmt.Errorf("dnsdisc: hash %q is too short", hash)) + } + return hash[:maxLen] +} + +// URL encoding + +// ParseURL parses an enrtree:// URL and returns its components. +func ParseURL(url string) (domain string, pubkey *ecdsa.PublicKey, err error) { + le, err := parseLink(url) + if err != nil { + return "", nil, err + } + return le.domain, le.pubkey, nil +} diff --git a/vendor/github.com/status-im/go-waku/waku/persistence/store.go b/vendor/github.com/status-im/go-waku/waku/persistence/store.go index 8022d9d2c..e1a4fb2e9 100644 --- a/vendor/github.com/status-im/go-waku/waku/persistence/store.go +++ b/vendor/github.com/status-im/go-waku/waku/persistence/store.go @@ -3,8 +3,10 @@ package persistence import ( "database/sql" "log" + "time" "github.com/status-im/go-waku/waku/v2/protocol/pb" + "github.com/status-im/go-waku/waku/v2/utils" ) type MessageProvider interface { @@ -17,6 +19,9 @@ type MessageProvider interface { type DBStore struct { MessageProvider db *sql.DB + + maxMessages int + maxDuration time.Duration } type StoredMessage struct { @@ -49,17 +54,33 @@ func WithDriver(driverName string, datasourceName string) DBOption { } } +func WithRetentionPolicy(maxMessages int, maxDuration time.Duration) DBOption { + return func(d *DBStore) error { + d.maxDuration = maxDuration + d.maxMessages = maxMessages + return nil + } +} + // Creates a new DB store using the db specified via options. -// It will create a messages table if it does not exist -func NewDBStore(opt DBOption) (*DBStore, error) { +// It will create a messages table if it does not exist and +// clean up records according to the retention policy used +func NewDBStore(options ...DBOption) (*DBStore, error) { result := new(DBStore) - err := opt(result) + for _, opt := range options { + err := opt(result) + if err != nil { + return nil, err + } + } + + err := result.createTable() if err != nil { return nil, err } - err = result.createTable() + err = result.cleanOlderRecords() if err != nil { return nil, err } @@ -84,6 +105,28 @@ func (d *DBStore) createTable() error { return nil } +func (d *DBStore) cleanOlderRecords() error { + // Delete older messages + if d.maxDuration > 0 { + sqlStmt := `DELETE FROM message WHERE receiverTimestamp < ?` + _, err := d.db.Exec(sqlStmt, utils.GetUnixEpochFrom(time.Now().Add(-d.maxDuration))) + if err != nil { + return err + } + } + + // Limit number of records to a max N + if d.maxMessages > 0 { + sqlStmt := `DELETE FROM message WHERE id IN (SELECT id FROM message ORDER BY receiverTimestamp DESC LIMIT -1 OFFSET 5)` + _, err := d.db.Exec(sqlStmt, d.maxMessages) + if err != nil { + return err + } + } + + return nil +} + // Closes a DB connection func (d *DBStore) Stop() { d.db.Close() diff --git a/vendor/github.com/status-im/go-waku/waku/v2/discovery/enr.go b/vendor/github.com/status-im/go-waku/waku/v2/discovery/enr.go new file mode 100644 index 000000000..3fccba245 --- /dev/null +++ b/vendor/github.com/status-im/go-waku/waku/v2/discovery/enr.go @@ -0,0 +1,65 @@ +package discovery + +import ( + "context" + "fmt" + + "github.com/ethereum/go-ethereum/p2p/dnsdisc" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/libp2p/go-libp2p-core/peer" + + ma "github.com/multiformats/go-multiaddr" +) + +type DnsDiscoveryParameters struct { + nameserver string +} + +type DnsDiscoveryOption func(*DnsDiscoveryParameters) + +// WithMultiaddress is a WakuNodeOption that configures libp2p to listen on a list of multiaddresses +func WithNameserver(nameserver string) DnsDiscoveryOption { + return func(params *DnsDiscoveryParameters) { + params.nameserver = nameserver + } +} + +// RetrieveNodes returns a list of multiaddress given a url to a DNS discoverable +// ENR tree +func RetrieveNodes(ctx context.Context, url string, opts ...DnsDiscoveryOption) ([]ma.Multiaddr, error) { + var multiAddrs []ma.Multiaddr + + params := new(DnsDiscoveryParameters) + for _, opt := range opts { + opt(params) + } + + client := dnsdisc.NewClient(dnsdisc.Config{ + Resolver: GetResolver(ctx, params.nameserver), + }) + + tree, err := client.SyncTree(url) + if err != nil { + return nil, err + } + + for _, node := range tree.Nodes() { + m, err := EnodeToMultiAddr(node) + if err != nil { + return nil, err + } + + multiAddrs = append(multiAddrs, m) + } + + return multiAddrs, nil +} + +func EnodeToMultiAddr(node *enode.Node) (ma.Multiaddr, error) { + peerID, err := peer.IDFromPublicKey(&ECDSAPublicKey{node.Pubkey()}) + if err != nil { + return nil, err + } + + return ma.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d/p2p/%s", node.IP(), node.TCP(), peerID)) +} diff --git a/vendor/github.com/status-im/go-waku/waku/v2/discovery/public_key.go b/vendor/github.com/status-im/go-waku/waku/v2/discovery/public_key.go new file mode 100644 index 000000000..d80770c8b --- /dev/null +++ b/vendor/github.com/status-im/go-waku/waku/v2/discovery/public_key.go @@ -0,0 +1,79 @@ +package discovery + +import ( + "crypto/ecdsa" + "crypto/subtle" + "encoding/asn1" + "errors" + "math/big" + + ethcrypto "github.com/ethereum/go-ethereum/crypto" + "github.com/libp2p/go-libp2p-core/crypto" + pb "github.com/libp2p/go-libp2p-core/crypto/pb" + "github.com/minio/sha256-simd" +) + +// Taken from: https://github.com/libp2p/go-libp2p-core/blob/094b0d3f8ba2934339cb35e1a875b11ab6d08839/crypto/ecdsa.go as +// they don't provide a way to set the key +var ErrNilSig = errors.New("sig is nil") + +// ECDSASig holds the r and s values of an ECDSA signature +type ECDSASig struct { + R, S *big.Int +} + +// ECDSAPublicKey is an implementation of an ECDSA public key +type ECDSAPublicKey struct { + pub *ecdsa.PublicKey +} + +// Type returns the key type +func (ePub *ECDSAPublicKey) Type() pb.KeyType { + return pb.KeyType_Secp256k1 +} + +// Raw returns x509 bytes from a public key +func (ePub *ECDSAPublicKey) Raw() ([]byte, error) { + return ethcrypto.CompressPubkey(ePub.pub), nil +} + +// Bytes returns the public key as protobuf bytes +func (ePub *ECDSAPublicKey) Bytes() ([]byte, error) { + return crypto.MarshalPublicKey(ePub) +} + +// Equals compares to public keys +func (ePub *ECDSAPublicKey) Equals(o crypto.Key) bool { + return basicEquals(ePub, o) +} + +// Verify compares data to a signature +func (ePub *ECDSAPublicKey) Verify(data, sigBytes []byte) (bool, error) { + sig := new(ECDSASig) + if _, err := asn1.Unmarshal(sigBytes, sig); err != nil { + return false, err + } + if sig == nil { + return false, ErrNilSig + } + + hash := sha256.Sum256(data) + + return ecdsa.Verify(ePub.pub, hash[:], sig.R, sig.S), nil +} + +func basicEquals(k1, k2 crypto.Key) bool { + if k1.Type() != k2.Type() { + return false + } + + a, err := k1.Raw() + if err != nil { + return false + } + b, err := k2.Raw() + if err != nil { + return false + } + return subtle.ConstantTimeCompare(a, b) == 1 +} diff --git a/vendor/github.com/status-im/go-waku/waku/v2/discovery/resolver.go b/vendor/github.com/status-im/go-waku/waku/v2/discovery/resolver.go new file mode 100644 index 000000000..9708f42e0 --- /dev/null +++ b/vendor/github.com/status-im/go-waku/waku/v2/discovery/resolver.go @@ -0,0 +1,22 @@ +package discovery + +import ( + "context" + "net" +) + +// GetResolver returns a *net.Resolver object using a custom nameserver, or +// the default system resolver if no nameserver is specified +func GetResolver(ctx context.Context, nameserver string) *net.Resolver { + if nameserver == "" { + return net.DefaultResolver + } + + return &net.Resolver{ + PreferGo: true, + Dial: func(ctx context.Context, network, address string) (net.Conn, error) { + d := net.Dialer{} + return d.DialContext(ctx, network, net.JoinHostPort(nameserver, "53")) + }, + } +} diff --git a/vendor/github.com/status-im/go-waku/waku/v2/node/waku_payload.go b/vendor/github.com/status-im/go-waku/waku/v2/node/waku_payload.go index 23a04fab4..ad6d4d0ed 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/node/waku_payload.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/node/waku_payload.go @@ -48,7 +48,7 @@ type KeyInfo struct { } -// Encodes a payload depending on the version parameter. +// Encode encodes a payload depending on the version parameter. // 0 for raw unencrypted data, and 1 for using WakuV1 encoding. func (payload Payload) Encode(version uint32) ([]byte, error) { switch version { @@ -105,7 +105,7 @@ func EncodeWakuMessage(message *pb.WakuMessage, keyInfo *KeyInfo) error { return nil } -// Decodes a WakuMessage depending on the version parameter. +// DecodePayload decodes a WakuMessage depending on the version parameter. // 0 for raw unencrypted data, and 1 for using WakuV1 decoding func DecodePayload(message *pb.WakuMessage, keyInfo *KeyInfo) (*DecodedPayload, error) { switch message.Version { diff --git a/vendor/github.com/status-im/go-waku/waku/v2/node/wakunode2.go b/vendor/github.com/status-im/go-waku/waku/v2/node/wakunode2.go index c91050992..e09f85007 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/node/wakunode2.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/node/wakunode2.go @@ -2,7 +2,6 @@ package node import ( "context" - "errors" "fmt" "time" @@ -22,10 +21,8 @@ import ( rendezvous "github.com/status-im/go-waku-rendezvous" v2 "github.com/status-im/go-waku/waku/v2" "github.com/status-im/go-waku/waku/v2/metrics" - "github.com/status-im/go-waku/waku/v2/protocol" "github.com/status-im/go-waku/waku/v2/protocol/filter" "github.com/status-im/go-waku/waku/v2/protocol/lightpush" - "github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/protocol/relay" "github.com/status-im/go-waku/waku/v2/protocol/store" "github.com/status-im/go-waku/waku/v2/utils" @@ -48,8 +45,6 @@ type WakuNode struct { bcaster v2.Broadcaster - filters filter.Filters - connectionNotif ConnectionNotifier protocolEventSub event.Subscription identificationEventSub event.Subscription @@ -112,8 +107,8 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) { return nil, err } - if params.connStatusChan != nil { - w.connStatusChan = params.connStatusChan + if params.connStatusC != nil { + w.connStatusChan = params.connStatusC } w.connectionNotif = NewConnectionNotifier(ctx, host) @@ -133,17 +128,13 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) { } func (w *WakuNode) Start() error { - w.store = store.NewWakuStore(w.opts.messageProvider) + w.store = store.NewWakuStore(w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration) if w.opts.enableStore { w.startStore() } if w.opts.enableFilter { - w.filters = make(filter.Filters) - err := w.mountFilter() - if err != nil { - return err - } + w.filter = filter.NewWakuFilter(w.ctx, w.host, w.opts.isFilterFullNode) } if w.opts.enableRendezvous { @@ -201,10 +192,6 @@ func (w *WakuNode) Stop() { if w.filter != nil { w.filter.Stop() - for _, filter := range w.filters { - close(filter.Chan) - } - w.filters = nil } w.relay.Stop() @@ -266,18 +253,6 @@ func (w *WakuNode) mountRelay(opts ...pubsub.Option) error { return err } -func (w *WakuNode) mountFilter() error { - filterHandler := func(requestId string, msg pb.MessagePush) { - for _, message := range msg.Messages { - w.filters.Notify(message, requestId) // Trigger filter handlers on a light node - } - } - - w.filter = filter.NewWakuFilter(w.ctx, w.host, w.opts.isFilterFullNode, filterHandler) - - return nil -} - func (w *WakuNode) mountRendezvous() error { w.rendezvous = rendezvous.NewRendezvousService(w.host, w.opts.rendevousStorage) @@ -344,113 +319,6 @@ func (w *WakuNode) AddPeer(address ma.Multiaddr, protocolID p2pproto.ID) (*peer. return &info.ID, w.addPeer(info, protocolID) } -// Wrapper around WakuFilter.Subscribe -// that adds a Filter object to node.filters -func (node *WakuNode) SubscribeFilter(ctx context.Context, f filter.ContentFilter) (filterID string, ch chan *protocol.Envelope, err error) { - if node.filter == nil { - err = errors.New("WakuFilter is not set") - return - } - - // TODO: should be possible to pass the peerID as option or autoselect peer. - // TODO: check if there's an existing pubsub topic that uses the same peer. If so, reuse filter, and return same channel and filterID - - // Registers for messages that match a specific filter. Triggers the handler whenever a message is received. - // ContentFilterChan takes MessagePush structs - subs, err := node.filter.Subscribe(ctx, f) - if err != nil || subs.RequestID == "" { - // Failed to subscribe - log.Error("remote subscription to filter failed", err) - return - } - - ch = make(chan *protocol.Envelope, 1024) // To avoid blocking - - // Register handler for filter, whether remote subscription succeeded or not - node.filters[subs.RequestID] = filter.Filter{ - PeerID: subs.Peer, - Topic: f.Topic, - ContentFilters: f.ContentTopics, - Chan: ch, - } - - return subs.RequestID, ch, nil -} - -// UnsubscribeFilterByID removes a subscription to a filter node completely -// using the filterID returned when the subscription was created -func (node *WakuNode) UnsubscribeFilterByID(ctx context.Context, filterID string) error { - - var f filter.Filter - var ok bool - if f, ok = node.filters[filterID]; !ok { - return errors.New("filter not found") - } - - cf := filter.ContentFilter{ - Topic: f.Topic, - ContentTopics: f.ContentFilters, - } - - err := node.filter.Unsubscribe(ctx, cf, f.PeerID) - if err != nil { - return err - } - - close(f.Chan) - delete(node.filters, filterID) - - return nil -} - -// Unsubscribe filter removes content topics from a filter subscription. If all -// the contentTopics are removed the subscription is dropped completely -func (node *WakuNode) UnsubscribeFilter(ctx context.Context, cf filter.ContentFilter) error { - // Remove local filter - var idsToRemove []string - for id, f := range node.filters { - if f.Topic != cf.Topic { - continue - } - - // Send message to full node in order to unsubscribe - err := node.filter.Unsubscribe(ctx, cf, f.PeerID) - if err != nil { - return err - } - - // Iterate filter entries to remove matching content topics - // make sure we delete the content filter - // if no more topics are left - for _, cfToDelete := range cf.ContentTopics { - for i, cf := range f.ContentFilters { - if cf == cfToDelete { - l := len(f.ContentFilters) - 1 - f.ContentFilters[l], f.ContentFilters[i] = f.ContentFilters[i], f.ContentFilters[l] - f.ContentFilters = f.ContentFilters[:l] - break - } - - } - if len(f.ContentFilters) == 0 { - idsToRemove = append(idsToRemove, id) - } - } - } - - for _, rId := range idsToRemove { - for id := range node.filters { - if id == rId { - close(node.filters[id].Chan) - delete(node.filters, id) - break - } - } - } - - return nil -} - func (w *WakuNode) DialPeerWithMultiAddress(ctx context.Context, address ma.Multiaddr) error { info, err := peer.AddrInfoFromP2pAddr(address) if err != nil { diff --git a/vendor/github.com/status-im/go-waku/waku/v2/node/wakuoptions.go b/vendor/github.com/status-im/go-waku/waku/v2/node/wakuoptions.go index 47326ab69..1afd6e1a4 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/node/wakuoptions.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/node/wakuoptions.go @@ -37,6 +37,8 @@ type WakuNodeParameters struct { shouldResume bool storeMsgs bool messageProvider store.MessageProvider + maxMessages int + maxDuration time.Duration enableRendezvous bool enableRendezvousServer bool @@ -47,15 +49,17 @@ type WakuNodeParameters struct { enableLightPush bool - connStatusChan chan ConnStatus + connStatusC chan ConnStatus } type WakuNodeOption func(*WakuNodeParameters) error +// MultiAddresses return the list of multiaddresses configured in the node func (w WakuNodeParameters) MultiAddresses() []ma.Multiaddr { return w.multiAddr } +// Identity returns a libp2p option containing the identity used by the node func (w WakuNodeParameters) Identity() config.Option { return libp2p.Identity(*w.privKey) } @@ -134,6 +138,8 @@ func WithWakuRelay(opts ...pubsub.Option) WakuNodeOption { } } +// WithRendezvous is a WakuOption used to enable go-waku-rendezvous discovery. +// It accepts an optional list of DiscoveryOpt options func WithRendezvous(discoverOpts ...pubsub.DiscoverOpt) WakuNodeOption { return func(params *WakuNodeParameters) error { params.enableRendezvous = true @@ -142,6 +148,8 @@ func WithRendezvous(discoverOpts ...pubsub.DiscoverOpt) WakuNodeOption { } } +// WithRendezvousServer is a WakuOption used to set the node as a rendezvous +// point, using an specific storage for the peer information func WithRendezvousServer(storage rendezvous.Storage) WakuNodeOption { return func(params *WakuNodeParameters) error { params.enableRendezvousServer = true @@ -171,6 +179,19 @@ func WithWakuStore(shouldStoreMessages bool, shouldResume bool) WakuNodeOption { } } +// WithWakuStoreAndRetentionPolicy enables the Waku V2 Store protocol, storing them in an optional message provider +// applying an specific retention policy +func WithWakuStoreAndRetentionPolicy(shouldResume bool, maxDuration time.Duration, maxMessages int) WakuNodeOption { + return func(params *WakuNodeParameters) error { + params.enableStore = true + params.storeMsgs = true + params.shouldResume = shouldResume + params.maxDuration = maxDuration + params.maxMessages = maxMessages + return nil + } +} + // WithMessageProvider is a WakuNodeOption that sets the MessageProvider // used to store and retrieve persisted messages func WithMessageProvider(s store.MessageProvider) WakuNodeOption { @@ -188,6 +209,8 @@ func WithLightPush() WakuNodeOption { } } +// WithKeepAlive is a WakuNodeOption used to set the interval of time when +// each peer will be ping to keep the TCP connection alive func WithKeepAlive(t time.Duration) WakuNodeOption { return func(params *WakuNodeParameters) error { params.keepAliveInterval = t @@ -195,9 +218,12 @@ func WithKeepAlive(t time.Duration) WakuNodeOption { } } -func WithConnStatusChan(connStatusChan chan ConnStatus) WakuNodeOption { +// WithConnectionStatusChannel is a WakuNodeOption used to set a channel where the +// connection status changes will be pushed to. It's useful to identify when peer +// connections and disconnections occur +func WithConnectionStatusChannel(connStatus chan ConnStatus) WakuNodeOption { return func(params *WakuNodeParameters) error { - params.connStatusChan = connStatusChan + params.connStatusC = connStatus return nil } } diff --git a/vendor/github.com/status-im/go-waku/waku/v2/protocol/envelope.go b/vendor/github.com/status-im/go-waku/waku/v2/protocol/envelope.go index 93298fde3..bb26fd319 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/protocol/envelope.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/protocol/envelope.go @@ -2,6 +2,9 @@ package protocol import "github.com/status-im/go-waku/waku/v2/protocol/pb" +// Envelope contains information about the pubsub topic of a WakuMessage +// and a hash used to identify a message based on the bytes of a WakuMessage +// protobuffer type Envelope struct { msg *pb.WakuMessage pubsubTopic string diff --git a/vendor/github.com/status-im/go-waku/waku/v2/protocol/filter/filter_map.go b/vendor/github.com/status-im/go-waku/waku/v2/protocol/filter/filter_map.go new file mode 100644 index 000000000..34d982f83 --- /dev/null +++ b/vendor/github.com/status-im/go-waku/waku/v2/protocol/filter/filter_map.go @@ -0,0 +1,101 @@ +package filter + +import ( + "sync" + + "github.com/status-im/go-waku/waku/v2/protocol" + "github.com/status-im/go-waku/waku/v2/protocol/pb" +) + +type FilterMap struct { + sync.RWMutex + items map[string]Filter +} + +type FilterMapItem struct { + Key string + Value Filter +} + +func NewFilterMap() *FilterMap { + return &FilterMap{ + items: make(map[string]Filter), + } +} + +func (fm *FilterMap) Set(key string, value Filter) { + fm.Lock() + defer fm.Unlock() + + fm.items[key] = value +} + +func (fm *FilterMap) Get(key string) (Filter, bool) { + fm.Lock() + defer fm.Unlock() + + value, ok := fm.items[key] + + return value, ok +} + +func (fm *FilterMap) Delete(key string) { + fm.Lock() + defer fm.Unlock() + + close(fm.items[key].Chan) + delete(fm.items, key) +} + +func (fm *FilterMap) RemoveAll() { + fm.Lock() + defer fm.Unlock() + + for k, v := range fm.items { + close(v.Chan) + delete(fm.items, k) + } +} + +func (fm *FilterMap) Items() <-chan FilterMapItem { + c := make(chan FilterMapItem) + + f := func() { + fm.RLock() + defer fm.RUnlock() + + for k, v := range fm.items { + c <- FilterMapItem{k, v} + } + close(c) + } + go f() + + return c +} + +func (fm *FilterMap) Notify(msg *pb.WakuMessage, requestId string) { + fm.RLock() + defer fm.RUnlock() + + for key, filter := range fm.items { + envelope := protocol.NewEnvelope(msg, filter.Topic) + + // We do this because the key for the filter is set to the requestId received from the filter protocol. + // This means we do not need to check the content filter explicitly as all MessagePushs already contain + // the requestId of the coresponding filter. + if requestId != "" && requestId == key { + filter.Chan <- envelope + continue + } + + // TODO: In case of no topics we should either trigger here for all messages, + // or we should not allow such filter to exist in the first place. + for _, contentTopic := range filter.ContentFilters { + if msg.ContentTopic == contentTopic { + filter.Chan <- envelope + break + } + } + } +} diff --git a/vendor/github.com/status-im/go-waku/waku/v2/protocol/filter/filter_subscribers.go b/vendor/github.com/status-im/go-waku/waku/v2/protocol/filter/filter_subscribers.go new file mode 100644 index 000000000..7703e7360 --- /dev/null +++ b/vendor/github.com/status-im/go-waku/waku/v2/protocol/filter/filter_subscribers.go @@ -0,0 +1,94 @@ +package filter + +import ( + "sync" + + "github.com/libp2p/go-libp2p-core/peer" + "github.com/status-im/go-waku/waku/v2/protocol/pb" +) + +type Subscriber struct { + peer peer.ID + requestId string + filter pb.FilterRequest // @TODO MAKE THIS A SEQUENCE AGAIN? +} + +type Subscribers struct { + sync.RWMutex + subscribers []Subscriber +} + +func NewSubscribers() *Subscribers { + return &Subscribers{} +} + +func (self *Subscribers) Append(s Subscriber) int { + self.Lock() + defer self.Unlock() + + self.subscribers = append(self.subscribers, s) + return len(self.subscribers) +} + +func (self *Subscribers) Items() <-chan Subscriber { + c := make(chan Subscriber) + + f := func() { + self.RLock() + defer self.RUnlock() + for _, value := range self.subscribers { + c <- value + } + close(c) + } + go f() + + return c +} + +func (self *Subscribers) Length() int { + self.RLock() + defer self.RUnlock() + + return len(self.subscribers) +} + +func (self *Subscribers) RemoveContentFilters(peerID peer.ID, contentFilters []*pb.FilterRequest_ContentFilter) { + var peerIdsToRemove []peer.ID + + for _, subscriber := range self.subscribers { + if subscriber.peer != peerID { + continue + } + + // make sure we delete the content filter + // if no more topics are left + for i, contentFilter := range contentFilters { + subCfs := subscriber.filter.ContentFilters + for _, cf := range subCfs { + if cf.ContentTopic == contentFilter.ContentTopic { + l := len(subCfs) - 1 + subCfs[l], subCfs[i] = subCfs[i], subCfs[l] + subscriber.filter.ContentFilters = subCfs[:l] + } + } + } + + if len(subscriber.filter.ContentFilters) == 0 { + peerIdsToRemove = append(peerIdsToRemove, subscriber.peer) + } + } + + // make sure we delete the subscriber + // if no more content filters left + for _, peerId := range peerIdsToRemove { + for i, s := range self.subscribers { + if s.peer == peerId { + l := len(self.subscribers) - 1 + self.subscribers[l], self.subscribers[i] = self.subscribers[i], self.subscribers[l] + self.subscribers = self.subscribers[:l] + break + } + } + } +} diff --git a/vendor/github.com/status-im/go-waku/waku/v2/protocol/filter/waku_filter.go b/vendor/github.com/status-im/go-waku/waku/v2/protocol/filter/waku_filter.go index d0eff0e6d..303a0aeb8 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/protocol/filter/waku_filter.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/protocol/filter/waku_filter.go @@ -46,29 +46,19 @@ type ( ContentTopics []string } - // @TODO MAYBE MORE INFO? - Filters map[string]Filter - - Subscriber struct { - peer peer.ID - requestId string - filter pb.FilterRequest // @TODO MAKE THIS A SEQUENCE AGAIN? - } - FilterSubscription struct { RequestID string Peer peer.ID } - MessagePushHandler func(requestId string, msg pb.MessagePush) - WakuFilter struct { - ctx context.Context - h host.Host - subscribers []Subscriber - isFullNode bool - pushHandler MessagePushHandler - MsgC chan *protocol.Envelope + ctx context.Context + h host.Host + isFullNode bool + MsgC chan *protocol.Envelope + + filters *FilterMap + subscribers *Subscribers } ) @@ -101,29 +91,6 @@ func DefaultOptions() []FilterSubscribeOption { } } -func (filters *Filters) Notify(msg *pb.WakuMessage, requestId string) { - for key, filter := range *filters { - envelope := protocol.NewEnvelope(msg, filter.Topic) - - // We do this because the key for the filter is set to the requestId received from the filter protocol. - // This means we do not need to check the content filter explicitly as all MessagePushs already contain - // the requestId of the coresponding filter. - if requestId != "" && requestId == key { - filter.Chan <- envelope - continue - } - - // TODO: In case of no topics we should either trigger here for all messages, - // or we should not allow such filter to exist in the first place. - for _, contentTopic := range filter.ContentFilters { - if msg.ContentTopic == contentTopic { - filter.Chan <- envelope - break - } - } - } -} - func (wf *WakuFilter) onRequest(s network.Stream) { defer s.Close() @@ -142,7 +109,9 @@ func (wf *WakuFilter) onRequest(s network.Stream) { if filterRPCRequest.Push != nil && len(filterRPCRequest.Push.Messages) > 0 { // We're on a light node. // This is a message push coming from a full node. - wf.pushHandler(filterRPCRequest.RequestId, *filterRPCRequest.Push) + for _, message := range filterRPCRequest.Push.Messages { + wf.filters.Notify(message, filterRPCRequest.RequestId) // Trigger filter handlers on a light node + } log.Info("filter light node, received a message push. ", len(filterRPCRequest.Push.Messages), " messages") stats.Record(wf.ctx, metrics.Messages.M(int64(len(filterRPCRequest.Push.Messages)))) @@ -151,52 +120,16 @@ func (wf *WakuFilter) onRequest(s network.Stream) { // This is a filter request coming from a light node. if filterRPCRequest.Request.Subscribe { subscriber := Subscriber{peer: s.Conn().RemotePeer(), requestId: filterRPCRequest.RequestId, filter: *filterRPCRequest.Request} - wf.subscribers = append(wf.subscribers, subscriber) - log.Info("filter full node, add a filter subscriber: ", subscriber.peer) + len := wf.subscribers.Append(subscriber) - stats.Record(wf.ctx, metrics.FilterSubscriptions.M(int64(len(wf.subscribers)))) + log.Info("filter full node, add a filter subscriber: ", subscriber.peer) + stats.Record(wf.ctx, metrics.FilterSubscriptions.M(int64(len))) } else { peerId := s.Conn().RemotePeer() + wf.subscribers.RemoveContentFilters(peerId, filterRPCRequest.Request.ContentFilters) + log.Info("filter full node, remove a filter subscriber: ", peerId.Pretty()) - contentFilters := filterRPCRequest.Request.ContentFilters - var peerIdsToRemove []peer.ID - for _, subscriber := range wf.subscribers { - if subscriber.peer != peerId { - continue - } - - // make sure we delete the content filter - // if no more topics are left - for i, contentFilter := range contentFilters { - subCfs := subscriber.filter.ContentFilters - for _, cf := range subCfs { - if cf.ContentTopic == contentFilter.ContentTopic { - l := len(subCfs) - 1 - subCfs[l], subCfs[i] = subCfs[i], subCfs[l] - subscriber.filter.ContentFilters = subCfs[:l] - } - } - } - - if len(subscriber.filter.ContentFilters) == 0 { - peerIdsToRemove = append(peerIdsToRemove, subscriber.peer) - } - } - - // make sure we delete the subscriber - // if no more content filters left - for _, peerId := range peerIdsToRemove { - for i, s := range wf.subscribers { - if s.peer == peerId { - l := len(wf.subscribers) - 1 - wf.subscribers[l], wf.subscribers[i] = wf.subscribers[i], wf.subscribers[l] - wf.subscribers = wf.subscribers[:l] - break - } - } - } - - stats.Record(wf.ctx, metrics.FilterSubscriptions.M(int64(len(wf.subscribers)))) + stats.Record(wf.ctx, metrics.FilterSubscriptions.M(int64(wf.subscribers.Length()))) } } else { log.Error("can't serve request") @@ -204,7 +137,7 @@ func (wf *WakuFilter) onRequest(s network.Stream) { } } -func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool, handler MessagePushHandler) *WakuFilter { +func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool) *WakuFilter { ctx, err := tag.New(ctx, tag.Insert(metrics.KeyType, "filter")) if err != nil { log.Error(err) @@ -214,8 +147,9 @@ func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool, handler wf.ctx = ctx wf.MsgC = make(chan *protocol.Envelope) wf.h = host - wf.pushHandler = handler wf.isFullNode = isFullNode + wf.filters = NewFilterMap() + wf.subscribers = NewSubscribers() wf.h.SetStreamHandlerMatch(FilterID_v20beta1, protocol.PrefixTextMatch(string(FilterID_v20beta1)), wf.onRequest) go wf.FilterListener() @@ -229,6 +163,29 @@ func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool, handler return wf } +func (wf *WakuFilter) pushMessage(subscriber Subscriber, msg *pb.WakuMessage) error { + pushRPC := &pb.FilterRPC{RequestId: subscriber.requestId, Push: &pb.MessagePush{Messages: []*pb.WakuMessage{msg}}} + + conn, err := wf.h.NewStream(wf.ctx, peer.ID(subscriber.peer), FilterID_v20beta1) + // TODO: keep track of errors to automatically unsubscribe a peer? + if err != nil { + // @TODO more sophisticated error handling here + log.Error("failed to open peer stream") + //waku_filter_errors.inc(labelValues = [dialFailure]) + return err + } + + defer conn.Close() + writer := protoio.NewDelimitedWriter(conn) + err = writer.WriteMsg(pushRPC) + if err != nil { + log.Error("failed to push messages to remote peer") + return nil + } + + return nil +} + func (wf *WakuFilter) FilterListener() { // This function is invoked for each message received // on the full node in context of Waku2-Filter @@ -237,7 +194,7 @@ func (wf *WakuFilter) FilterListener() { topic := envelope.PubsubTopic() // Each subscriber is a light node that earlier on invoked // a FilterRequest on this node - for _, subscriber := range wf.subscribers { + for subscriber := range wf.subscribers.Items() { if subscriber.filter.Topic != "" && subscriber.filter.Topic != topic { log.Info("Subscriber's filter pubsubTopic does not match message topic", subscriber.filter.Topic, topic) continue @@ -246,28 +203,12 @@ func (wf *WakuFilter) FilterListener() { for _, filter := range subscriber.filter.ContentFilters { if msg.ContentTopic == filter.ContentTopic { log.Info("found matching contentTopic ", filter, msg) - msgArr := []*pb.WakuMessage{msg} // Do a message push to light node - pushRPC := &pb.FilterRPC{RequestId: subscriber.requestId, Push: &pb.MessagePush{Messages: msgArr}} - log.Info("pushing a message to light node: ", pushRPC) - - conn, err := wf.h.NewStream(wf.ctx, peer.ID(subscriber.peer), FilterID_v20beta1) - // TODO: keep track of errors to automatically unsubscribe a peer? - if err != nil { - // @TODO more sophisticated error handling here - log.Error("failed to open peer stream") - //waku_filter_errors.inc(labelValues = [dialFailure]) + log.Info("pushing messages to light node: ", subscriber.peer) + if err := wf.pushMessage(subscriber, msg); err != nil { return err } - defer conn.Close() - writer := protoio.NewDelimitedWriter(conn) - err = writer.WriteMsg(pushRPC) - if err != nil { - log.Error("failed to push messages to remote peer") - return nil - } - } } } @@ -286,7 +227,7 @@ func (wf *WakuFilter) FilterListener() { // Having a FilterRequest struct, // select a peer with filter support, dial it, // and submit FilterRequest wrapped in FilterRPC -func (wf *WakuFilter) Subscribe(ctx context.Context, filter ContentFilter, opts ...FilterSubscribeOption) (subscription *FilterSubscription, err error) { +func (wf *WakuFilter) requestSubscription(ctx context.Context, filter ContentFilter, opts ...FilterSubscribeOption) (subscription *FilterSubscription, err error) { params := new(FilterSubscribeParameters) params.host = wf.h @@ -338,7 +279,7 @@ func (wf *WakuFilter) Subscribe(ctx context.Context, filter ContentFilter, opts return } -func (wf *WakuFilter) Unsubscribe(ctx context.Context, filter ContentFilter, peer peer.ID) error { +func (wf *WakuFilter) Unsubscribe(ctx context.Context, contentFilter ContentFilter, peer peer.ID) error { conn, err := wf.h.NewStream(ctx, peer, FilterID_v20beta1) if err != nil { @@ -351,13 +292,13 @@ func (wf *WakuFilter) Unsubscribe(ctx context.Context, filter ContentFilter, pee id := protocol.GenerateRequestId() var contentFilters []*pb.FilterRequest_ContentFilter - for _, ct := range filter.ContentTopics { + for _, ct := range contentFilter.ContentTopics { contentFilters = append(contentFilters, &pb.FilterRequest_ContentFilter{ContentTopic: ct}) } request := pb.FilterRequest{ Subscribe: false, - Topic: filter.Topic, + Topic: contentFilter.Topic, ContentFilters: contentFilters, } @@ -373,4 +314,103 @@ func (wf *WakuFilter) Unsubscribe(ctx context.Context, filter ContentFilter, pee func (wf *WakuFilter) Stop() { wf.h.RemoveStreamHandler(FilterID_v20beta1) + wf.filters.RemoveAll() +} + +func (wf *WakuFilter) Subscribe(ctx context.Context, f ContentFilter, opts ...FilterSubscribeOption) (filterID string, theFilter Filter, err error) { + // TODO: check if there's an existing pubsub topic that uses the same peer. If so, reuse filter, and return same channel and filterID + + // Registers for messages that match a specific filter. Triggers the handler whenever a message is received. + // ContentFilterChan takes MessagePush structs + remoteSubs, err := wf.requestSubscription(ctx, f, opts...) + if err != nil || remoteSubs.RequestID == "" { + // Failed to subscribe + log.Error("remote subscription to filter failed", err) + return + } + + // Register handler for filter, whether remote subscription succeeded or not + + filterID = remoteSubs.RequestID + theFilter = Filter{ + PeerID: remoteSubs.Peer, + Topic: f.Topic, + ContentFilters: f.ContentTopics, + Chan: make(chan *protocol.Envelope, 1024), // To avoid blocking + } + + wf.filters.Set(filterID, theFilter) + + return +} + +// UnsubscribeFilterByID removes a subscription to a filter node completely +// using the filterID returned when the subscription was created +func (wf *WakuFilter) UnsubscribeFilterByID(ctx context.Context, filterID string) error { + + var f Filter + var ok bool + + if f, ok = wf.filters.Get(filterID); !ok { + return errors.New("filter not found") + } + + cf := ContentFilter{ + Topic: f.Topic, + ContentTopics: f.ContentFilters, + } + + err := wf.Unsubscribe(ctx, cf, f.PeerID) + if err != nil { + return err + } + + wf.filters.Delete(filterID) + + return nil +} + +// Unsubscribe filter removes content topics from a filter subscription. If all +// the contentTopics are removed the subscription is dropped completely +func (wf *WakuFilter) UnsubscribeFilter(ctx context.Context, cf ContentFilter) error { + // Remove local filter + var idsToRemove []string + for filterMapItem := range wf.filters.Items() { + f := filterMapItem.Value + id := filterMapItem.Key + + if f.Topic != cf.Topic { + continue + } + + // Send message to full node in order to unsubscribe + err := wf.Unsubscribe(ctx, cf, f.PeerID) + if err != nil { + return err + } + + // Iterate filter entries to remove matching content topics + // make sure we delete the content filter + // if no more topics are left + for _, cfToDelete := range cf.ContentTopics { + for i, cf := range f.ContentFilters { + if cf == cfToDelete { + l := len(f.ContentFilters) - 1 + f.ContentFilters[l], f.ContentFilters[i] = f.ContentFilters[i], f.ContentFilters[l] + f.ContentFilters = f.ContentFilters[:l] + break + } + + } + if len(f.ContentFilters) == 0 { + idsToRemove = append(idsToRemove, id) + } + } + } + + for _, rId := range idsToRemove { + wf.filters.Delete(rId) + } + + return nil } diff --git a/vendor/github.com/status-im/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go b/vendor/github.com/status-im/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go index e9e880773..df90caa60 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go @@ -9,14 +9,12 @@ import ( logging "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" - "github.com/libp2p/go-libp2p-core/peer" libp2pProtocol "github.com/libp2p/go-libp2p-core/protocol" "github.com/libp2p/go-msgio/protoio" "github.com/status-im/go-waku/waku/v2/metrics" "github.com/status-im/go-waku/waku/v2/protocol" "github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/protocol/relay" - utils "github.com/status-im/go-waku/waku/v2/utils" ) var log = logging.Logger("waku_lightpush") @@ -44,8 +42,8 @@ func NewWakuLightPush(ctx context.Context, h host.Host, relay *relay.WakuRelay) } func (wakuLP *WakuLightPush) Start() error { - if wakuLP.relay == nil { - return errors.New("relay is required") + if wakuLP.IsClientOnly() { + return errors.New("relay is required, without it, it is only a client and cannot be started") } wakuLP.h.SetStreamHandlerMatch(LightPushID_v20beta1, protocol.PrefixTextMatch(string(LightPushID_v20beta1)), wakuLP.onRequest) @@ -54,6 +52,10 @@ func (wakuLP *WakuLightPush) Start() error { return nil } +func (wakuLp *WakuLightPush) IsClientOnly() bool { + return wakuLp.relay == nil +} + func (wakuLP *WakuLightPush) onRequest(s network.Stream) { defer s.Close() @@ -73,11 +75,11 @@ func (wakuLP *WakuLightPush) onRequest(s network.Stream) { if requestPushRPC.Query != nil { log.Info("lightpush push request") - pubSubTopic := relay.Topic(requestPushRPC.Query.PubsubTopic) - message := requestPushRPC.Query.Message - response := new(pb.PushResponse) - if wakuLP.relay != nil { + if !wakuLP.IsClientOnly() { + pubSubTopic := relay.Topic(requestPushRPC.Query.PubsubTopic) + message := requestPushRPC.Query.Message + // TODO: Assumes success, should probably be extended to check for network, peers, etc // It might make sense to use WithReadiness option here? @@ -118,56 +120,10 @@ func (wakuLP *WakuLightPush) onRequest(s network.Stream) { } } -type LightPushParameters struct { - selectedPeer peer.ID - requestId []byte - - lp *WakuLightPush -} - -type LightPushOption func(*LightPushParameters) - -func WithPeer(p peer.ID) LightPushOption { - return func(params *LightPushParameters) { - params.selectedPeer = p - } -} - -func WithAutomaticPeerSelection() LightPushOption { - return func(params *LightPushParameters) { - p, err := utils.SelectPeer(params.lp.h, string(LightPushID_v20beta1)) - if err == nil { - params.selectedPeer = *p - } else { - log.Info("Error selecting peer: ", err) - } - } -} - -func WithRequestId(requestId []byte) LightPushOption { - return func(params *LightPushParameters) { - params.requestId = requestId - } -} - -func WithAutomaticRequestId() LightPushOption { - return func(params *LightPushParameters) { - params.requestId = protocol.GenerateRequestId() - } -} - -func DefaultOptions() []LightPushOption { - return []LightPushOption{ - WithAutomaticRequestId(), - WithAutomaticPeerSelection(), - } -} - func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, opts ...LightPushOption) (*pb.PushResponse, error) { params := new(LightPushParameters) - params.lp = wakuLP - optList := DefaultOptions() + optList := DefaultOptions(wakuLP.h) optList = append(optList, opts...) for _, opt := range optList { opt(params) @@ -220,11 +176,11 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, o return pushResponseRPC.Response, nil } -func (w *WakuLightPush) Stop() { - w.h.RemoveStreamHandler(LightPushID_v20beta1) +func (wakuLP *WakuLightPush) Stop() { + wakuLP.h.RemoveStreamHandler(LightPushID_v20beta1) } -func (w *WakuLightPush) Publish(ctx context.Context, message *pb.WakuMessage, topic *relay.Topic, opts ...LightPushOption) ([]byte, error) { +func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *pb.WakuMessage, topic *relay.Topic, opts ...LightPushOption) ([]byte, error) { if message == nil { return nil, errors.New("message can't be null") } @@ -233,7 +189,7 @@ func (w *WakuLightPush) Publish(ctx context.Context, message *pb.WakuMessage, to req.Message = message req.PubsubTopic = string(relay.GetTopic(topic)) - response, err := w.request(ctx, req, opts...) + response, err := wakuLP.request(ctx, req, opts...) if err != nil { return nil, err } diff --git a/vendor/github.com/status-im/go-waku/waku/v2/protocol/lightpush/waku_lightpush_option.go b/vendor/github.com/status-im/go-waku/waku/v2/protocol/lightpush/waku_lightpush_option.go new file mode 100644 index 000000000..13758d567 --- /dev/null +++ b/vendor/github.com/status-im/go-waku/waku/v2/protocol/lightpush/waku_lightpush_option.go @@ -0,0 +1,51 @@ +package lightpush + +import ( + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/status-im/go-waku/waku/v2/protocol" + "github.com/status-im/go-waku/waku/v2/utils" +) + +type LightPushParameters struct { + selectedPeer peer.ID + requestId []byte +} + +type LightPushOption func(*LightPushParameters) + +func WithPeer(p peer.ID) LightPushOption { + return func(params *LightPushParameters) { + params.selectedPeer = p + } +} + +func WithAutomaticPeerSelection(host host.Host) LightPushOption { + return func(params *LightPushParameters) { + p, err := utils.SelectPeer(host, string(LightPushID_v20beta1)) + if err == nil { + params.selectedPeer = *p + } else { + log.Info("Error selecting peer: ", err) + } + } +} + +func WithRequestId(requestId []byte) LightPushOption { + return func(params *LightPushParameters) { + params.requestId = requestId + } +} + +func WithAutomaticRequestId() LightPushOption { + return func(params *LightPushParameters) { + params.requestId = protocol.GenerateRequestId() + } +} + +func DefaultOptions(host host.Host) []LightPushOption { + return []LightPushOption{ + WithAutomaticRequestId(), + WithAutomaticPeerSelection(host), + } +} diff --git a/vendor/github.com/status-im/go-waku/waku/v2/protocol/relay/waku_relay.go b/vendor/github.com/status-im/go-waku/waku/v2/protocol/relay/waku_relay.go index 90b86eb26..8189d4ead 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/protocol/relay/waku_relay.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/protocol/relay/waku_relay.go @@ -33,13 +33,15 @@ type WakuRelay struct { host host.Host pubsub *pubsub.PubSub - topics map[Topic]bool + bcaster v2.Broadcaster + + // TODO: convert to concurrent maps + topics map[Topic]struct{} topicsMutex sync.Mutex wakuRelayTopics map[Topic]*pubsub.Topic relaySubs map[Topic]*pubsub.Subscription - bcaster v2.Broadcaster - + // TODO: convert to concurrent maps subscriptions map[Topic][]*Subscription subscriptionsMutex sync.Mutex } @@ -53,7 +55,7 @@ func msgIdFn(pmsg *pubsub_pb.Message) string { func NewWakuRelay(ctx context.Context, h host.Host, bcaster v2.Broadcaster, opts ...pubsub.Option) (*WakuRelay, error) { w := new(WakuRelay) w.host = h - w.topics = make(map[Topic]bool) + w.topics = make(map[Topic]struct{}) w.wakuRelayTopics = make(map[Topic]*pubsub.Topic) w.relaySubs = make(map[Topic]*pubsub.Subscription) w.subscriptions = make(map[Topic][]*Subscription) @@ -112,7 +114,7 @@ func (w *WakuRelay) upsertTopic(topic Topic) (*pubsub.Topic, error) { defer w.topicsMutex.Unlock() w.topicsMutex.Lock() - w.topics[topic] = true + w.topics[topic] = struct{}{} pubSubTopic, ok := w.wakuRelayTopics[topic] if !ok { // Joins topic if node hasn't joined yet newTopic, err := w.pubsub.Join(string(topic)) diff --git a/vendor/github.com/status-im/go-waku/waku/v2/protocol/store/message_queue.go b/vendor/github.com/status-im/go-waku/waku/v2/protocol/store/message_queue.go new file mode 100644 index 000000000..b47efdddc --- /dev/null +++ b/vendor/github.com/status-im/go-waku/waku/v2/protocol/store/message_queue.go @@ -0,0 +1,109 @@ +package store + +import ( + "sync" + "time" + + "github.com/status-im/go-waku/waku/v2/utils" +) + +type MessageQueue struct { + sync.RWMutex + + seen map[[32]byte]struct{} + messages []IndexedWakuMessage + maxMessages int + maxDuration time.Duration + + quit chan struct{} +} + +func (self *MessageQueue) Push(msg IndexedWakuMessage) { + self.Lock() + defer self.Unlock() + + var k [32]byte + copy(k[:], msg.index.Digest) + if _, ok := self.seen[k]; ok { + return + } + + self.seen[k] = struct{}{} + self.messages = append(self.messages, msg) + + if self.maxMessages != 0 && len(self.messages) > self.maxMessages { + numToPop := len(self.messages) - self.maxMessages + self.messages = self.messages[numToPop:len(self.messages)] + } +} + +func (self *MessageQueue) Messages() <-chan IndexedWakuMessage { + c := make(chan IndexedWakuMessage) + + f := func() { + self.RLock() + defer self.RUnlock() + for _, value := range self.messages { + c <- value + } + close(c) + } + go f() + + return c +} + +func (self *MessageQueue) cleanOlderRecords() { + self.Lock() + defer self.Unlock() + + // TODO: check if retention days was set + + t := utils.GetUnixEpochFrom(time.Now().Add(-self.maxDuration)) + + var idx int + for i := 0; i < len(self.messages); i++ { + if self.messages[i].index.ReceiverTime >= t { + idx = i + break + } + } + + self.messages = self.messages[idx:] +} + +func (self *MessageQueue) checkForOlderRecords(d time.Duration) { + ticker := time.NewTicker(d) + + select { + case <-self.quit: + return + case <-ticker.C: + self.cleanOlderRecords() + } +} + +func (self *MessageQueue) Length() int { + self.RLock() + defer self.RUnlock() + return len(self.messages) +} + +func NewMessageQueue(maxMessages int, maxDuration time.Duration) *MessageQueue { + result := &MessageQueue{ + maxMessages: maxMessages, + maxDuration: maxDuration, + seen: make(map[[32]byte]struct{}), + quit: make(chan struct{}), + } + + if maxDuration != 0 { + go result.checkForOlderRecords(10 * time.Second) // is 10s okay? + } + + return result +} + +func (self *MessageQueue) Stop() { + close(self.quit) +} diff --git a/vendor/github.com/status-im/go-waku/waku/v2/protocol/store/waku_store.go b/vendor/github.com/status-im/go-waku/waku/v2/protocol/store/waku_store.go index d138c0fc0..e1f64ca3a 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/protocol/store/waku_store.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/protocol/store/waku_store.go @@ -3,13 +3,12 @@ package store import ( "bytes" "context" - "crypto/sha256" "encoding/hex" "errors" "fmt" "math" "sort" - "sync" + "time" logging "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p-core/host" @@ -27,8 +26,11 @@ import ( var log = logging.Logger("wakustore") +// StoreID_v20beta3 is the current Waku Store protocol identifier const StoreID_v20beta3 = libp2pProtocol.ID("/vac/waku/store/2.0.0-beta3") -const MaxPageSize = 100 // Maximum number of waku messages in each page + +// MaxPageSize is the maximum number of waku messages to return per page +const MaxPageSize = 100 var ( ErrNoPeersAvailable = errors.New("no suitable remote peers") @@ -138,11 +140,11 @@ func paginateWithoutIndex(list []IndexedWakuMessage, pinfo *pb.PagingInfo) (resM return } -func (w *WakuStore) FindMessages(query *pb.HistoryQuery) *pb.HistoryResponse { +func (store *WakuStore) FindMessages(query *pb.HistoryQuery) *pb.HistoryResponse { result := new(pb.HistoryResponse) // data holds IndexedWakuMessage whose topics match the query var data []IndexedWakuMessage - for _, indexedMsg := range w.messages { + for indexedMsg := range store.messageQueue.Messages() { // temporal filtering // check whether the history query contains a time filter if query.StartTime != 0 && query.EndTime != 0 { @@ -195,6 +197,7 @@ type Query struct { EndTime float64 } +// Result represents a valid response from a store node type Result struct { Messages []*pb.WakuMessage @@ -222,31 +225,30 @@ type IndexedWakuMessage struct { } type WakuStore struct { - ctx context.Context - MsgC chan *protocol.Envelope - messages []IndexedWakuMessage - seen map[[32]byte]struct{} + ctx context.Context + MsgC chan *protocol.Envelope started bool - messagesMutex sync.Mutex - - msgProvider MessageProvider - h host.Host + messageQueue *MessageQueue + msgProvider MessageProvider + h host.Host } -func NewWakuStore(p MessageProvider) *WakuStore { +// NewWakuStore creates a WakuStore using an specific MessageProvider for storing the messages +func NewWakuStore(p MessageProvider, maxNumberOfMessages int, maxRetentionDuration time.Duration) *WakuStore { wakuStore := new(WakuStore) wakuStore.msgProvider = p - wakuStore.seen = make(map[[32]byte]struct{}) - + wakuStore.messageQueue = NewMessageQueue(maxNumberOfMessages, maxRetentionDuration) return wakuStore } -func (store *WakuStore) SetMsgProvider(p MessageProvider) { +// SetMessageProvider allows switching the message provider used with a WakuStore +func (store *WakuStore) SetMessageProvider(p MessageProvider) { store.msgProvider = p } +// Start initializes the WakuStore by enabling the protocol and fetching records from a message provider func (store *WakuStore) Start(ctx context.Context, h host.Host) { if store.started { return @@ -255,7 +257,7 @@ func (store *WakuStore) Start(ctx context.Context, h host.Host) { store.started = true store.h = h store.ctx = ctx - store.MsgC = make(chan *protocol.Envelope) + store.MsgC = make(chan *protocol.Envelope, 1024) store.h.SetStreamHandlerMatch(StoreID_v20beta3, protocol.PrefixTextMatch(string(StoreID_v20beta3)), store.onRequest) @@ -291,53 +293,42 @@ func (store *WakuStore) fetchDBRecords(ctx context.Context) { store.storeMessageWithIndex(storedMessage.PubsubTopic, idx, storedMessage.Message) - metrics.RecordMessage(ctx, "stored", len(store.messages)) + metrics.RecordMessage(ctx, "stored", store.messageQueue.Length()) } } func (store *WakuStore) storeMessageWithIndex(pubsubTopic string, idx *pb.Index, msg *pb.WakuMessage) { - var k [32]byte - copy(k[:], idx.Digest) - - if _, ok := store.seen[k]; ok { - return - } - - store.seen[k] = struct{}{} - store.messages = append(store.messages, IndexedWakuMessage{msg: msg, index: idx, pubsubTopic: pubsubTopic}) + store.messageQueue.Push(IndexedWakuMessage{msg: msg, index: idx, pubsubTopic: pubsubTopic}) } -func (store *WakuStore) storeMessage(pubSubTopic string, msg *pb.WakuMessage) { - index, err := computeIndex(msg) +func (store *WakuStore) storeMessage(env *protocol.Envelope) { + index, err := computeIndex(env) if err != nil { log.Error("could not calculate message index", err) return } - store.messagesMutex.Lock() - defer store.messagesMutex.Unlock() - - store.storeMessageWithIndex(pubSubTopic, index, msg) + store.storeMessageWithIndex(env.PubsubTopic(), index, env.Message()) if store.msgProvider == nil { - metrics.RecordMessage(store.ctx, "stored", len(store.messages)) + metrics.RecordMessage(store.ctx, "stored", store.messageQueue.Length()) return } - err = store.msgProvider.Put(index, pubSubTopic, msg) // Should the index be stored? - + // TODO: Move this to a separate go routine if DB writes becomes a bottleneck + err = store.msgProvider.Put(index, env.PubsubTopic(), env.Message()) // Should the index be stored? if err != nil { log.Error("could not store message", err) metrics.RecordStoreError(store.ctx, "store_failure") return } - metrics.RecordMessage(store.ctx, "stored", len(store.messages)) + metrics.RecordMessage(store.ctx, "stored", store.messageQueue.Length()) } func (store *WakuStore) storeIncomingMessages(ctx context.Context) { for envelope := range store.MsgC { - store.storeMessage(envelope.PubsubTopic(), envelope.Message()) + store.storeMessage(envelope) } } @@ -371,16 +362,11 @@ func (store *WakuStore) onRequest(s network.Stream) { } } -func computeIndex(msg *pb.WakuMessage) (*pb.Index, error) { - data, err := msg.Marshal() - if err != nil { - return nil, err - } - digest := sha256.Sum256(data) +func computeIndex(env *protocol.Envelope) (*pb.Index, error) { return &pb.Index{ - Digest: digest[:], + Digest: env.Hash(), ReceiverTime: utils.GetUnixEpoch(), - SenderTime: msg.Timestamp, + SenderTime: env.Message().Timestamp, }, nil } @@ -436,12 +422,15 @@ type HistoryRequestParameters struct { type HistoryRequestOption func(*HistoryRequestParameters) +// WithPeer is an option used to specify the peerID to request the message history func WithPeer(p peer.ID) HistoryRequestOption { return func(params *HistoryRequestParameters) { params.selectedPeer = p } } +// WithAutomaticPeerSelection is an option used to randomly select a peer from the store +// to request the message history func WithAutomaticPeerSelection() HistoryRequestOption { return func(params *HistoryRequestParameters) { p, err := utils.SelectPeer(params.s.h, string(StoreID_v20beta3)) @@ -471,6 +460,7 @@ func WithCursor(c *pb.Index) HistoryRequestOption { } } +// WithPaging is an option used to specify the order and maximum number of records to return func WithPaging(asc bool, pageSize uint64) HistoryRequestOption { return func(params *HistoryRequestParameters) { params.asc = asc @@ -478,16 +468,17 @@ func WithPaging(asc bool, pageSize uint64) HistoryRequestOption { } } +// Default options to be used when querying a store node for results func DefaultOptions() []HistoryRequestOption { return []HistoryRequestOption{ WithAutomaticRequestId(), WithAutomaticPeerSelection(), - WithPaging(true, 0), + WithPaging(true, MaxPageSize), } } func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selectedPeer peer.ID, requestId []byte) (*pb.HistoryResponse, error) { - log.Info(fmt.Sprintf("Resuming message history with peer %s", selectedPeer)) + log.Info(fmt.Sprintf("Querying message history with peer %s", selectedPeer)) connOpt, err := store.h.NewStream(ctx, selectedPeer, StoreID_v20beta3) if err != nil { @@ -519,7 +510,7 @@ func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selec return nil, err } - metrics.RecordMessage(ctx, "retrieved", len(store.messages)) + metrics.RecordMessage(ctx, "retrieved", store.messageQueue.Length()) return historyResponseRPC.Response, nil } @@ -583,6 +574,10 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR }, nil } +// Next is used with to retrieve the next page of rows from a query response. +// If no more records are found, the result will not contain any messages. +// This function is useful for iterating over results without having to manually +// specify the cursor and pagination order and max number of results func (store *WakuStore) Next(ctx context.Context, r *Result) (*Result, error) { q := &pb.HistoryQuery{ PubsubTopic: r.query.PubsubTopic, @@ -633,7 +628,7 @@ func (store *WakuStore) queryLoop(ctx context.Context, query *pb.HistoryQuery, c func (store *WakuStore) findLastSeen() float64 { var lastSeenTime float64 = 0 - for _, imsg := range store.messages { + for imsg := range store.messageQueue.Messages() { if imsg.msg.Timestamp > lastSeenTime { lastSeenTime = imsg.msg.Timestamp } @@ -641,7 +636,7 @@ func (store *WakuStore) findLastSeen() float64 { return lastSeenTime } -// resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online +// Resume retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online // messages are stored in the store node's messages field and in the message db // the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message // an offset of 20 second is added to the time window to count for nodes asynchrony @@ -649,7 +644,6 @@ func (store *WakuStore) findLastSeen() float64 { // peerList indicates the list of peers to query from. The history is fetched from the first available peer in this list. Such candidates should be found through a discovery method (to be developed). // if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from. The history gets fetched successfully if the dialed peer has been online during the queried time window. // the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string - func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList []peer.ID) (int, error) { if !store.started { return 0, errors.New("can't resume: store has not started") @@ -695,7 +689,7 @@ func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList } for _, msg := range response.Messages { - store.storeMessage(pubsubTopic, msg) + store.storeMessage(protocol.NewEnvelope(msg, pubsubTopic)) } log.Info("Retrieved messages since the last online time: ", len(response.Messages)) @@ -705,14 +699,15 @@ func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList // TODO: queryWithAccounting -func (w *WakuStore) Stop() { - w.started = false +// Stop closes the store message channel and removes the protocol stream handler +func (store *WakuStore) Stop() { + store.started = false - if w.MsgC != nil { - close(w.MsgC) + if store.MsgC != nil { + close(store.MsgC) } - if w.h != nil { - w.h.RemoveStreamHandler(StoreID_v20beta3) + if store.h != nil { + store.h.RemoveStreamHandler(StoreID_v20beta3) } } diff --git a/vendor/github.com/status-im/go-waku/waku/v2/utils/time.go b/vendor/github.com/status-im/go-waku/waku/v2/utils/time.go index 902277742..1b28756e2 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/utils/time.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/utils/time.go @@ -2,10 +2,14 @@ package utils import "time" -func GetUnixEpochFrom(now func() time.Time) float64 { - return float64(now().UnixNano()) / float64(time.Second) +// GetUnixEpoch converts a time into a unix timestamp with the integer part +// representing seconds and the decimal part representing subseconds +func GetUnixEpochFrom(now time.Time) float64 { + return float64(now.UnixNano()) / float64(time.Second) } +// GetUnixEpoch returns the current time in unix timestamp with the integer part +// representing seconds and the decimal part representing subseconds func GetUnixEpoch() float64 { - return GetUnixEpochFrom(time.Now) + return GetUnixEpochFrom(time.Now()) } diff --git a/vendor/golang.org/x/sync/singleflight/singleflight.go b/vendor/golang.org/x/sync/singleflight/singleflight.go new file mode 100644 index 000000000..690eb8501 --- /dev/null +++ b/vendor/golang.org/x/sync/singleflight/singleflight.go @@ -0,0 +1,212 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package singleflight provides a duplicate function call suppression +// mechanism. +package singleflight // import "golang.org/x/sync/singleflight" + +import ( + "bytes" + "errors" + "fmt" + "runtime" + "runtime/debug" + "sync" +) + +// errGoexit indicates the runtime.Goexit was called in +// the user given function. +var errGoexit = errors.New("runtime.Goexit was called") + +// A panicError is an arbitrary value recovered from a panic +// with the stack trace during the execution of given function. +type panicError struct { + value interface{} + stack []byte +} + +// Error implements error interface. +func (p *panicError) Error() string { + return fmt.Sprintf("%v\n\n%s", p.value, p.stack) +} + +func newPanicError(v interface{}) error { + stack := debug.Stack() + + // The first line of the stack trace is of the form "goroutine N [status]:" + // but by the time the panic reaches Do the goroutine may no longer exist + // and its status will have changed. Trim out the misleading line. + if line := bytes.IndexByte(stack[:], '\n'); line >= 0 { + stack = stack[line+1:] + } + return &panicError{value: v, stack: stack} +} + +// call is an in-flight or completed singleflight.Do call +type call struct { + wg sync.WaitGroup + + // These fields are written once before the WaitGroup is done + // and are only read after the WaitGroup is done. + val interface{} + err error + + // forgotten indicates whether Forget was called with this call's key + // while the call was still in flight. + forgotten bool + + // These fields are read and written with the singleflight + // mutex held before the WaitGroup is done, and are read but + // not written after the WaitGroup is done. + dups int + chans []chan<- Result +} + +// Group represents a class of work and forms a namespace in +// which units of work can be executed with duplicate suppression. +type Group struct { + mu sync.Mutex // protects m + m map[string]*call // lazily initialized +} + +// Result holds the results of Do, so they can be passed +// on a channel. +type Result struct { + Val interface{} + Err error + Shared bool +} + +// Do executes and returns the results of the given function, making +// sure that only one execution is in-flight for a given key at a +// time. If a duplicate comes in, the duplicate caller waits for the +// original to complete and receives the same results. +// The return value shared indicates whether v was given to multiple callers. +func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) { + g.mu.Lock() + if g.m == nil { + g.m = make(map[string]*call) + } + if c, ok := g.m[key]; ok { + c.dups++ + g.mu.Unlock() + c.wg.Wait() + + if e, ok := c.err.(*panicError); ok { + panic(e) + } else if c.err == errGoexit { + runtime.Goexit() + } + return c.val, c.err, true + } + c := new(call) + c.wg.Add(1) + g.m[key] = c + g.mu.Unlock() + + g.doCall(c, key, fn) + return c.val, c.err, c.dups > 0 +} + +// DoChan is like Do but returns a channel that will receive the +// results when they are ready. +// +// The returned channel will not be closed. +func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result { + ch := make(chan Result, 1) + g.mu.Lock() + if g.m == nil { + g.m = make(map[string]*call) + } + if c, ok := g.m[key]; ok { + c.dups++ + c.chans = append(c.chans, ch) + g.mu.Unlock() + return ch + } + c := &call{chans: []chan<- Result{ch}} + c.wg.Add(1) + g.m[key] = c + g.mu.Unlock() + + go g.doCall(c, key, fn) + + return ch +} + +// doCall handles the single call for a key. +func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) { + normalReturn := false + recovered := false + + // use double-defer to distinguish panic from runtime.Goexit, + // more details see https://golang.org/cl/134395 + defer func() { + // the given function invoked runtime.Goexit + if !normalReturn && !recovered { + c.err = errGoexit + } + + c.wg.Done() + g.mu.Lock() + defer g.mu.Unlock() + if !c.forgotten { + delete(g.m, key) + } + + if e, ok := c.err.(*panicError); ok { + // In order to prevent the waiting channels from being blocked forever, + // needs to ensure that this panic cannot be recovered. + if len(c.chans) > 0 { + go panic(e) + select {} // Keep this goroutine around so that it will appear in the crash dump. + } else { + panic(e) + } + } else if c.err == errGoexit { + // Already in the process of goexit, no need to call again + } else { + // Normal return + for _, ch := range c.chans { + ch <- Result{c.val, c.err, c.dups > 0} + } + } + }() + + func() { + defer func() { + if !normalReturn { + // Ideally, we would wait to take a stack trace until we've determined + // whether this is a panic or a runtime.Goexit. + // + // Unfortunately, the only way we can distinguish the two is to see + // whether the recover stopped the goroutine from terminating, and by + // the time we know that, the part of the stack trace relevant to the + // panic has been discarded. + if r := recover(); r != nil { + c.err = newPanicError(r) + } + } + }() + + c.val, c.err = fn() + normalReturn = true + }() + + if !normalReturn { + recovered = true + } +} + +// Forget tells the singleflight to forget about a key. Future calls +// to Do for this key will call the function rather than waiting for +// an earlier call to complete. +func (g *Group) Forget(key string) { + g.mu.Lock() + if c, ok := g.m[key]; ok { + c.forgotten = true + } + delete(g.m, key) + g.mu.Unlock() +} diff --git a/vendor/golang.org/x/time/AUTHORS b/vendor/golang.org/x/time/AUTHORS new file mode 100644 index 000000000..15167cd74 --- /dev/null +++ b/vendor/golang.org/x/time/AUTHORS @@ -0,0 +1,3 @@ +# This source code refers to The Go Authors for copyright purposes. +# The master list of authors is in the main Go distribution, +# visible at http://tip.golang.org/AUTHORS. diff --git a/vendor/golang.org/x/time/CONTRIBUTORS b/vendor/golang.org/x/time/CONTRIBUTORS new file mode 100644 index 000000000..1c4577e96 --- /dev/null +++ b/vendor/golang.org/x/time/CONTRIBUTORS @@ -0,0 +1,3 @@ +# This source code was written by the Go contributors. +# The master list of contributors is in the main Go distribution, +# visible at http://tip.golang.org/CONTRIBUTORS. diff --git a/vendor/golang.org/x/time/LICENSE b/vendor/golang.org/x/time/LICENSE new file mode 100644 index 000000000..6a66aea5e --- /dev/null +++ b/vendor/golang.org/x/time/LICENSE @@ -0,0 +1,27 @@ +Copyright (c) 2009 The Go Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/golang.org/x/time/PATENTS b/vendor/golang.org/x/time/PATENTS new file mode 100644 index 000000000..733099041 --- /dev/null +++ b/vendor/golang.org/x/time/PATENTS @@ -0,0 +1,22 @@ +Additional IP Rights Grant (Patents) + +"This implementation" means the copyrightable works distributed by +Google as part of the Go project. + +Google hereby grants to You a perpetual, worldwide, non-exclusive, +no-charge, royalty-free, irrevocable (except as stated in this section) +patent license to make, have made, use, offer to sell, sell, import, +transfer and otherwise run, modify and propagate the contents of this +implementation of Go, where such license applies only to those patent +claims, both currently owned or controlled by Google and acquired in +the future, licensable by Google that are necessarily infringed by this +implementation of Go. This grant does not include claims that would be +infringed only as a consequence of further modification of this +implementation. If you or your agent or exclusive licensee institute or +order or agree to the institution of patent litigation against any +entity (including a cross-claim or counterclaim in a lawsuit) alleging +that this implementation of Go or any code incorporated within this +implementation of Go constitutes direct or contributory patent +infringement, or inducement of patent infringement, then any patent +rights granted to you under this License for this implementation of Go +shall terminate as of the date such litigation is filed. diff --git a/vendor/golang.org/x/time/rate/rate.go b/vendor/golang.org/x/time/rate/rate.go new file mode 100644 index 000000000..a98fe7782 --- /dev/null +++ b/vendor/golang.org/x/time/rate/rate.go @@ -0,0 +1,402 @@ +// Copyright 2015 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package rate provides a rate limiter. +package rate + +import ( + "context" + "fmt" + "math" + "sync" + "time" +) + +// Limit defines the maximum frequency of some events. +// Limit is represented as number of events per second. +// A zero Limit allows no events. +type Limit float64 + +// Inf is the infinite rate limit; it allows all events (even if burst is zero). +const Inf = Limit(math.MaxFloat64) + +// Every converts a minimum time interval between events to a Limit. +func Every(interval time.Duration) Limit { + if interval <= 0 { + return Inf + } + return 1 / Limit(interval.Seconds()) +} + +// A Limiter controls how frequently events are allowed to happen. +// It implements a "token bucket" of size b, initially full and refilled +// at rate r tokens per second. +// Informally, in any large enough time interval, the Limiter limits the +// rate to r tokens per second, with a maximum burst size of b events. +// As a special case, if r == Inf (the infinite rate), b is ignored. +// See https://en.wikipedia.org/wiki/Token_bucket for more about token buckets. +// +// The zero value is a valid Limiter, but it will reject all events. +// Use NewLimiter to create non-zero Limiters. +// +// Limiter has three main methods, Allow, Reserve, and Wait. +// Most callers should use Wait. +// +// Each of the three methods consumes a single token. +// They differ in their behavior when no token is available. +// If no token is available, Allow returns false. +// If no token is available, Reserve returns a reservation for a future token +// and the amount of time the caller must wait before using it. +// If no token is available, Wait blocks until one can be obtained +// or its associated context.Context is canceled. +// +// The methods AllowN, ReserveN, and WaitN consume n tokens. +type Limiter struct { + mu sync.Mutex + limit Limit + burst int + tokens float64 + // last is the last time the limiter's tokens field was updated + last time.Time + // lastEvent is the latest time of a rate-limited event (past or future) + lastEvent time.Time +} + +// Limit returns the maximum overall event rate. +func (lim *Limiter) Limit() Limit { + lim.mu.Lock() + defer lim.mu.Unlock() + return lim.limit +} + +// Burst returns the maximum burst size. Burst is the maximum number of tokens +// that can be consumed in a single call to Allow, Reserve, or Wait, so higher +// Burst values allow more events to happen at once. +// A zero Burst allows no events, unless limit == Inf. +func (lim *Limiter) Burst() int { + lim.mu.Lock() + defer lim.mu.Unlock() + return lim.burst +} + +// NewLimiter returns a new Limiter that allows events up to rate r and permits +// bursts of at most b tokens. +func NewLimiter(r Limit, b int) *Limiter { + return &Limiter{ + limit: r, + burst: b, + } +} + +// Allow is shorthand for AllowN(time.Now(), 1). +func (lim *Limiter) Allow() bool { + return lim.AllowN(time.Now(), 1) +} + +// AllowN reports whether n events may happen at time now. +// Use this method if you intend to drop / skip events that exceed the rate limit. +// Otherwise use Reserve or Wait. +func (lim *Limiter) AllowN(now time.Time, n int) bool { + return lim.reserveN(now, n, 0).ok +} + +// A Reservation holds information about events that are permitted by a Limiter to happen after a delay. +// A Reservation may be canceled, which may enable the Limiter to permit additional events. +type Reservation struct { + ok bool + lim *Limiter + tokens int + timeToAct time.Time + // This is the Limit at reservation time, it can change later. + limit Limit +} + +// OK returns whether the limiter can provide the requested number of tokens +// within the maximum wait time. If OK is false, Delay returns InfDuration, and +// Cancel does nothing. +func (r *Reservation) OK() bool { + return r.ok +} + +// Delay is shorthand for DelayFrom(time.Now()). +func (r *Reservation) Delay() time.Duration { + return r.DelayFrom(time.Now()) +} + +// InfDuration is the duration returned by Delay when a Reservation is not OK. +const InfDuration = time.Duration(1<<63 - 1) + +// DelayFrom returns the duration for which the reservation holder must wait +// before taking the reserved action. Zero duration means act immediately. +// InfDuration means the limiter cannot grant the tokens requested in this +// Reservation within the maximum wait time. +func (r *Reservation) DelayFrom(now time.Time) time.Duration { + if !r.ok { + return InfDuration + } + delay := r.timeToAct.Sub(now) + if delay < 0 { + return 0 + } + return delay +} + +// Cancel is shorthand for CancelAt(time.Now()). +func (r *Reservation) Cancel() { + r.CancelAt(time.Now()) + return +} + +// CancelAt indicates that the reservation holder will not perform the reserved action +// and reverses the effects of this Reservation on the rate limit as much as possible, +// considering that other reservations may have already been made. +func (r *Reservation) CancelAt(now time.Time) { + if !r.ok { + return + } + + r.lim.mu.Lock() + defer r.lim.mu.Unlock() + + if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(now) { + return + } + + // calculate tokens to restore + // The duration between lim.lastEvent and r.timeToAct tells us how many tokens were reserved + // after r was obtained. These tokens should not be restored. + restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct)) + if restoreTokens <= 0 { + return + } + // advance time to now + now, _, tokens := r.lim.advance(now) + // calculate new number of tokens + tokens += restoreTokens + if burst := float64(r.lim.burst); tokens > burst { + tokens = burst + } + // update state + r.lim.last = now + r.lim.tokens = tokens + if r.timeToAct == r.lim.lastEvent { + prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens))) + if !prevEvent.Before(now) { + r.lim.lastEvent = prevEvent + } + } + + return +} + +// Reserve is shorthand for ReserveN(time.Now(), 1). +func (lim *Limiter) Reserve() *Reservation { + return lim.ReserveN(time.Now(), 1) +} + +// ReserveN returns a Reservation that indicates how long the caller must wait before n events happen. +// The Limiter takes this Reservation into account when allowing future events. +// The returned Reservation’s OK() method returns false if n exceeds the Limiter's burst size. +// Usage example: +// r := lim.ReserveN(time.Now(), 1) +// if !r.OK() { +// // Not allowed to act! Did you remember to set lim.burst to be > 0 ? +// return +// } +// time.Sleep(r.Delay()) +// Act() +// Use this method if you wish to wait and slow down in accordance with the rate limit without dropping events. +// If you need to respect a deadline or cancel the delay, use Wait instead. +// To drop or skip events exceeding rate limit, use Allow instead. +func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation { + r := lim.reserveN(now, n, InfDuration) + return &r +} + +// Wait is shorthand for WaitN(ctx, 1). +func (lim *Limiter) Wait(ctx context.Context) (err error) { + return lim.WaitN(ctx, 1) +} + +// WaitN blocks until lim permits n events to happen. +// It returns an error if n exceeds the Limiter's burst size, the Context is +// canceled, or the expected wait time exceeds the Context's Deadline. +// The burst limit is ignored if the rate limit is Inf. +func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) { + lim.mu.Lock() + burst := lim.burst + limit := lim.limit + lim.mu.Unlock() + + if n > burst && limit != Inf { + return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, burst) + } + // Check if ctx is already cancelled + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + // Determine wait limit + now := time.Now() + waitLimit := InfDuration + if deadline, ok := ctx.Deadline(); ok { + waitLimit = deadline.Sub(now) + } + // Reserve + r := lim.reserveN(now, n, waitLimit) + if !r.ok { + return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n) + } + // Wait if necessary + delay := r.DelayFrom(now) + if delay == 0 { + return nil + } + t := time.NewTimer(delay) + defer t.Stop() + select { + case <-t.C: + // We can proceed. + return nil + case <-ctx.Done(): + // Context was canceled before we could proceed. Cancel the + // reservation, which may permit other events to proceed sooner. + r.Cancel() + return ctx.Err() + } +} + +// SetLimit is shorthand for SetLimitAt(time.Now(), newLimit). +func (lim *Limiter) SetLimit(newLimit Limit) { + lim.SetLimitAt(time.Now(), newLimit) +} + +// SetLimitAt sets a new Limit for the limiter. The new Limit, and Burst, may be violated +// or underutilized by those which reserved (using Reserve or Wait) but did not yet act +// before SetLimitAt was called. +func (lim *Limiter) SetLimitAt(now time.Time, newLimit Limit) { + lim.mu.Lock() + defer lim.mu.Unlock() + + now, _, tokens := lim.advance(now) + + lim.last = now + lim.tokens = tokens + lim.limit = newLimit +} + +// SetBurst is shorthand for SetBurstAt(time.Now(), newBurst). +func (lim *Limiter) SetBurst(newBurst int) { + lim.SetBurstAt(time.Now(), newBurst) +} + +// SetBurstAt sets a new burst size for the limiter. +func (lim *Limiter) SetBurstAt(now time.Time, newBurst int) { + lim.mu.Lock() + defer lim.mu.Unlock() + + now, _, tokens := lim.advance(now) + + lim.last = now + lim.tokens = tokens + lim.burst = newBurst +} + +// reserveN is a helper method for AllowN, ReserveN, and WaitN. +// maxFutureReserve specifies the maximum reservation wait duration allowed. +// reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN. +func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation { + lim.mu.Lock() + + if lim.limit == Inf { + lim.mu.Unlock() + return Reservation{ + ok: true, + lim: lim, + tokens: n, + timeToAct: now, + } + } + + now, last, tokens := lim.advance(now) + + // Calculate the remaining number of tokens resulting from the request. + tokens -= float64(n) + + // Calculate the wait duration + var waitDuration time.Duration + if tokens < 0 { + waitDuration = lim.limit.durationFromTokens(-tokens) + } + + // Decide result + ok := n <= lim.burst && waitDuration <= maxFutureReserve + + // Prepare reservation + r := Reservation{ + ok: ok, + lim: lim, + limit: lim.limit, + } + if ok { + r.tokens = n + r.timeToAct = now.Add(waitDuration) + } + + // Update state + if ok { + lim.last = now + lim.tokens = tokens + lim.lastEvent = r.timeToAct + } else { + lim.last = last + } + + lim.mu.Unlock() + return r +} + +// advance calculates and returns an updated state for lim resulting from the passage of time. +// lim is not changed. +// advance requires that lim.mu is held. +func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) { + last := lim.last + if now.Before(last) { + last = now + } + + // Avoid making delta overflow below when last is very old. + maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens) + elapsed := now.Sub(last) + if elapsed > maxElapsed { + elapsed = maxElapsed + } + + // Calculate the new number of tokens, due to time that passed. + delta := lim.limit.tokensFromDuration(elapsed) + tokens := lim.tokens + delta + if burst := float64(lim.burst); tokens > burst { + tokens = burst + } + + return now, last, tokens +} + +// durationFromTokens is a unit conversion function from the number of tokens to the duration +// of time it takes to accumulate them at a rate of limit tokens per second. +func (limit Limit) durationFromTokens(tokens float64) time.Duration { + seconds := tokens / float64(limit) + return time.Nanosecond * time.Duration(1e9*seconds) +} + +// tokensFromDuration is a unit conversion function from a time duration to the number of tokens +// which could be accumulated during that duration at a rate of limit tokens per second. +func (limit Limit) tokensFromDuration(d time.Duration) float64 { + // Split the integer and fractional parts ourself to minimize rounding errors. + // See golang.org/issues/34861. + sec := float64(d/time.Second) * float64(limit) + nsec := float64(d%time.Second) * float64(limit) + return sec + nsec/1e9 +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 5373b4c2b..85be03ae5 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -101,6 +101,7 @@ github.com/ethereum/go-ethereum/p2p/discover github.com/ethereum/go-ethereum/p2p/discover/v4wire github.com/ethereum/go-ethereum/p2p/discover/v5wire github.com/ethereum/go-ethereum/p2p/discv5 +github.com/ethereum/go-ethereum/p2p/dnsdisc github.com/ethereum/go-ethereum/p2p/enode github.com/ethereum/go-ethereum/p2p/enr github.com/ethereum/go-ethereum/p2p/msgrate @@ -446,9 +447,10 @@ github.com/spacemonkeygo/spacelog github.com/status-im/doubleratchet # github.com/status-im/go-multiaddr-ethv4 v1.2.1 github.com/status-im/go-multiaddr-ethv4 -# github.com/status-im/go-waku v0.0.0-20211101173358-268767262b60 +# github.com/status-im/go-waku v0.0.0-20211108125814-49737780ea8d github.com/status-im/go-waku/waku/persistence github.com/status-im/go-waku/waku/v2 +github.com/status-im/go-waku/waku/v2/discovery github.com/status-im/go-waku/waku/v2/metrics github.com/status-im/go-waku/waku/v2/node github.com/status-im/go-waku/waku/v2/protocol @@ -602,6 +604,7 @@ golang.org/x/net/publicsuffix golang.org/x/net/route # golang.org/x/sync v0.0.0-20210220032951-036812b2e83c golang.org/x/sync/errgroup +golang.org/x/sync/singleflight # golang.org/x/sys v0.0.0-20210816183151-1e6c022a8912 golang.org/x/sys/cpu golang.org/x/sys/internal/unsafeheader @@ -631,6 +634,8 @@ golang.org/x/text/secure/bidirule golang.org/x/text/transform golang.org/x/text/unicode/bidi golang.org/x/text/unicode/norm +# golang.org/x/time v0.0.0-20201208040808-7e3f01d25324 +golang.org/x/time/rate # google.golang.org/protobuf v1.27.1 google.golang.org/protobuf/encoding/prototext google.golang.org/protobuf/encoding/protowire diff --git a/wakuv2/waku.go b/wakuv2/waku.go index d63ccb51c..71f1f1d70 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -27,6 +27,7 @@ import ( "fmt" "net" "runtime" + "strings" "sync" "time" @@ -56,6 +57,7 @@ import ( libp2pproto "github.com/libp2p/go-libp2p-core/protocol" rendezvous "github.com/status-im/go-waku-rendezvous" + "github.com/status-im/go-waku/waku/v2/discovery" "github.com/status-im/go-waku/waku/v2/protocol" wakuprotocol "github.com/status-im/go-waku/waku/v2/protocol" "github.com/status-im/go-waku/waku/v2/protocol/filter" @@ -94,6 +96,9 @@ type ConnStatus struct { type Waku struct { node *node.WakuNode // reference to a libp2p waku node + dnsAddressCache map[string][]multiaddr.Multiaddr // Map to store the multiaddresses returned by dns discovery + dnsAddressCacheLock *sync.RWMutex // lock to handle access to the map + filters *common.Filters // Message filters installed with Subscribe function filterMsgChannel chan *protocol.Envelope // Channel for wakuv2 filter messages @@ -133,14 +138,16 @@ func New(nodeKey string, cfg *Config, logger *zap.Logger, appdb *sql.DB) (*Waku, } waku := &Waku{ - privateKeys: make(map[string]*ecdsa.PrivateKey), - symKeys: make(map[string][]byte), - envelopes: make(map[gethcommon.Hash]*common.ReceivedMessage), - expirations: make(map[uint32]mapset.Set), - msgQueue: make(chan *common.ReceivedMessage, messageQueueLimit), - quit: make(chan struct{}), - timeSource: time.Now, - logger: logger, + privateKeys: make(map[string]*ecdsa.PrivateKey), + symKeys: make(map[string][]byte), + envelopes: make(map[gethcommon.Hash]*common.ReceivedMessage), + expirations: make(map[uint32]mapset.Set), + msgQueue: make(chan *common.ReceivedMessage, messageQueueLimit), + quit: make(chan struct{}), + dnsAddressCache: make(map[string][]multiaddr.Multiaddr), + dnsAddressCacheLock: &sync.RWMutex{}, + timeSource: time.Now, + logger: logger, } waku.settings = settings{ @@ -202,13 +209,10 @@ func New(nodeKey string, cfg *Config, logger *zap.Logger, appdb *sql.DB) (*Waku, } opts := []node.WakuNodeOption{ - node.WithLibP2POptions( - libp2pOpts..., - ), + node.WithLibP2POptions(libp2pOpts...), node.WithPrivateKey(privateKey), node.WithHostAddress([]*net.TCPAddr{hostAddr}), - node.WithWakuStore(false, false), // Mounts the store protocol (without storing the messages) - node.WithConnStatusChan(connStatusChan), + node.WithConnectionStatusChannel(connStatusChan), node.WithKeepAlive(time.Duration(cfg.KeepAliveInterval) * time.Second), } @@ -260,48 +264,89 @@ func New(nodeKey string, cfg *Config, logger *zap.Logger, appdb *sql.DB) (*Waku, return waku, nil } -func (w *Waku) addPeers(addresses []string, protocol libp2pproto.ID) { +type fnApplyToEachPeer func(ma multiaddr.Multiaddr, protocol libp2pproto.ID) + +func (w *Waku) addPeers(addresses []string, protocol libp2pproto.ID, apply fnApplyToEachPeer) { for _, addrString := range addresses { if addrString == "" { continue } - addr, err := multiaddr.NewMultiaddr(addrString) - if err != nil { - log.Warn("invalid peer multiaddress", addrString, err) - continue + if strings.HasPrefix(addrString, "enrtree://") { + // Use DNS Discovery + go w.dnsDiscover(addrString, protocol, apply) + } else { + // It's a normal multiaddress + w.addPeerFromString(addrString, protocol, apply) } - - peerID, err := w.node.AddPeer(addr, protocol) - if err != nil { - log.Warn("could not add peer", addr, err) - continue - } - - log.Info("peer added successfully", peerID) } } +func (w *Waku) dnsDiscover(enrtreeAddress string, protocol libp2pproto.ID, apply fnApplyToEachPeer) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + w.dnsAddressCacheLock.RLock() + multiaddresses, ok := w.dnsAddressCache[enrtreeAddress] + w.dnsAddressCacheLock.RUnlock() + + if !ok { + w.dnsAddressCacheLock.Lock() + var err error + multiaddresses, err = discovery.RetrieveNodes(ctx, enrtreeAddress) + w.dnsAddressCache[enrtreeAddress] = multiaddresses + w.dnsAddressCacheLock.Unlock() + if err != nil { + log.Warn("dns discovery error ", err) + return + } + } + + for _, m := range multiaddresses { + apply(m, protocol) + } +} + +func (w *Waku) addPeerFromString(addrString string, protocol libp2pproto.ID, apply fnApplyToEachPeer) { + addr, err := multiaddr.NewMultiaddr(addrString) + if err != nil { + log.Warn("invalid peer multiaddress", addrString, err) + return + } + + apply(addr, protocol) +} + func (w *Waku) addWakuV2Peers(cfg *Config) { if !cfg.LightClient { - for _, relaynode := range cfg.RelayNodes { - go func(node string) { + addRelayPeer := func(m multiaddr.Multiaddr, protocol libp2pproto.ID) { + go func(node multiaddr.Multiaddr) { ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) defer cancel() - err := w.node.DialPeer(ctx, node) + err := w.node.DialPeerWithMultiAddress(ctx, node) if err != nil { log.Warn("could not dial peer", err) } else { log.Info("relay peer dialed successfully", node) } - }(relaynode) + }(m) } + w.addPeers(cfg.RelayNodes, relay.WakuRelayID_v200, addRelayPeer) } - w.addPeers(cfg.StoreNodes, store.StoreID_v20beta3) - w.addPeers(cfg.FilterNodes, filter.FilterID_v20beta1) - w.addPeers(cfg.LightpushNodes, lightpush.LightPushID_v20beta1) - w.addPeers(cfg.WakuRendezvousNodes, rendezvous.RendezvousID_v001) + addToStore := func(m multiaddr.Multiaddr, protocol libp2pproto.ID) { + peerID, err := w.node.AddPeer(m, protocol) + if err != nil { + log.Warn("could not add peer", m, err) + return + } + log.Info("peer added successfully", peerID) + } + + w.addPeers(cfg.StoreNodes, store.StoreID_v20beta3, addToStore) + w.addPeers(cfg.FilterNodes, filter.FilterID_v20beta1, addToStore) + w.addPeers(cfg.LightpushNodes, lightpush.LightPushID_v20beta1, addToStore) + w.addPeers(cfg.WakuRendezvousNodes, rendezvous.RendezvousID_v001, addToStore) } func (w *Waku) GetStats() types.StatsSummary { @@ -358,15 +403,19 @@ func (w *Waku) subscribeWakuFilterTopic(topics [][]byte) { } var err error - filter := filter.ContentFilter{ + contentFilter := filter.ContentFilter{ Topic: string(pubsubTopic), ContentTopics: contentTopics, } - _, w.filterMsgChannel, err = w.node.SubscribeFilter(context.Background(), filter) + + var wakuFilter filter.Filter + _, wakuFilter, err = w.node.Filter().Subscribe(context.Background(), contentFilter) if err != nil { w.logger.Warn("could not add wakuv2 filter for topics", zap.Any("topics", topics)) return } + + w.filterMsgChannel = wakuFilter.Chan } // MaxMessageSize returns the maximum accepted message size. @@ -686,7 +735,8 @@ func (w *Waku) Unsubscribe(id string) error { for _, topic := range f.Topics { contentFilter.ContentTopics = append(contentFilter.ContentTopics, common.BytesToTopic(topic).ContentTopic()) } - if err := w.node.UnsubscribeFilter(context.Background(), contentFilter); err != nil { + + if err := w.node.Filter().UnsubscribeFilter(context.Background(), contentFilter); err != nil { return fmt.Errorf("failed to unsubscribe: %w", err) } } @@ -758,7 +808,10 @@ func (w *Waku) Query(topics []common.TopicType, from uint64, to uint64, opts []s Topic: string(relay.DefaultWakuTopic), } - result, err := w.node.Store().Query(context.Background(), query, opts...) + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + result, err := w.node.Store().Query(ctx, query, opts...) if err != nil { return }