Bump go-waku

This commit is contained in:
Andrea Maria Piana 2023-07-07 11:26:03 +01:00
parent 5a2757e785
commit 67050429df
24 changed files with 397 additions and 276 deletions

4
go.mod
View File

@ -62,7 +62,7 @@ require (
github.com/xeipuuv/gojsonschema v1.2.0
github.com/zenthangplus/goccm v0.0.0-20211005163543-2f2e522aca15
go.uber.org/zap v1.24.0
golang.org/x/crypto v0.7.0
golang.org/x/crypto v0.9.0
golang.org/x/image v0.0.0-20210220032944-ac19c3e999fb
google.golang.org/protobuf v1.31.0
gopkg.in/go-playground/assert.v1 v1.2.1 // indirect
@ -83,7 +83,7 @@ require (
github.com/mutecomm/go-sqlcipher/v4 v4.4.2
github.com/schollz/peerdiscovery v1.7.0
github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7
github.com/waku-org/go-waku v0.7.1-0.20230703171720-78ed073c7542
github.com/waku-org/go-waku v0.7.1-0.20230707130306-0d4ff8bff7f4
github.com/wk8/go-ordered-map/v2 v2.1.7
github.com/yeqown/go-qrcode/v2 v2.2.1
github.com/yeqown/go-qrcode/writer/standard v1.2.1

8
go.sum
View File

@ -2090,8 +2090,8 @@ github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98 h1:xwY0kW5XZF
github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98/go.mod h1:eBHgM6T4EG0RZzxpxKy+rGz/6Dw2Nd8DWxS0lm9ESDw=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20230628220917-7b4e5ae4c0e7 h1:0e1h+p84yBp0IN7AqgbZlV7lgFBjm214lgSOE7CeJmE=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20230628220917-7b4e5ae4c0e7/go.mod h1:pFvOZ9YTFsW0o5zJW7a0B5tr1owAijRWJctXJ2toL04=
github.com/waku-org/go-waku v0.7.1-0.20230703171720-78ed073c7542 h1:AFsXEAMdzd3v1QmPjWWlnrWGS2bM+cMxLdVJdtO5j3c=
github.com/waku-org/go-waku v0.7.1-0.20230703171720-78ed073c7542/go.mod h1:AxFv7fMqSEEi23O7Kq87Hmlqnx4QVVbh0Rvc3f71NQM=
github.com/waku-org/go-waku v0.7.1-0.20230707130306-0d4ff8bff7f4 h1:FHoWj4WpAaQ334xWY1a5MRqPIE62fNZ5hdUB90jicLw=
github.com/waku-org/go-waku v0.7.1-0.20230707130306-0d4ff8bff7f4/go.mod h1:52Ulzg6Y4zmQexM41I9DqVR+8d91JJGDUUp23Vw6HNg=
github.com/waku-org/go-zerokit-rln v0.1.12 h1:66+tU6sTlmUpuUlEv7kCFOGZ37MwZYFJBXHcm8QquwU=
github.com/waku-org/go-zerokit-rln v0.1.12/go.mod h1:MUW+wB6Yj7UBMdZrhko7oHfUZeY2wchggXYjpUiMoac=
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230331231302-258cacb91327 h1:Q5XQqo+PEmvrybT8D7BEsKCwIYDi80s+00Q49cfm9Gs=
@ -2282,8 +2282,8 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y
golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220331220935-ae2d96664a29/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.7.0 h1:AvwMYaRytfdeVt3u6mLaxYtErKYjxA2OXjJ1HHq6t3A=
golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU=
golang.org/x/crypto v0.9.0 h1:LF6fAI+IutBocDJ2OT0Q1g8plpYljMZ4+lty+dsqw3g=
golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0=
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=

View File

@ -1,36 +0,0 @@
package try
import "errors"
// MaxRetries is the maximum number of retries before bailing.
var MaxRetries = 10
var errMaxRetriesReached = errors.New("exceeded retry limit")
// Func represents functions that can be retried.
type Func func(attempt int) (retry bool, err error)
// Do keeps trying the function until the second argument
// returns false, or no error is returned.
func Do(fn Func) error {
var err error
var cont bool
attempt := 1
for {
cont, err = fn(attempt)
if !cont || err == nil {
break
}
attempt++
if attempt > MaxRetries {
return errMaxRetriesReached
}
}
return err
}
// IsMaxRetries checks whether the error is due to hitting the
// maximum number of retries or not.
func IsMaxRetries(err error) bool {
return err == errMaxRetriesReached
}

View File

@ -35,11 +35,12 @@ type PeerConnectionStrategy struct {
workerCtx context.Context
workerCancel context.CancelFunc
wg sync.WaitGroup
minPeers int
dialTimeout time.Duration
peerCh chan PeerData
dialCh chan peer.AddrInfo
wg sync.WaitGroup
minPeers int
dialTimeout time.Duration
peerCh chan PeerData
dialCh chan peer.AddrInfo
subscriptions []<-chan PeerData
backoff backoff.BackoffFactory
mux sync.Mutex
@ -78,9 +79,33 @@ type PeerData struct {
ENR *enode.Node
}
// PeerChannel exposes the channel on which discovered peers should be pushed
func (c *PeerConnectionStrategy) PeerChannel() chan<- PeerData {
return c.peerCh
// PeerChannel receives channels on which discovered peers should be pushed
func (c *PeerConnectionStrategy) Subscribe(ctx context.Context, ch <-chan PeerData) {
if c.cancel != nil {
c.wg.Add(1)
go func() {
defer c.wg.Done()
c.consumeSubscription(ctx, ch)
}()
} else {
c.subscriptions = append(c.subscriptions, ch)
}
}
func (c *PeerConnectionStrategy) consumeSubscription(ctx context.Context, ch <-chan PeerData) {
for {
select {
case <-ctx.Done():
return
case p := <-ch:
select {
case <-ctx.Done():
return
case c.peerCh <- p:
}
}
}
}
// Sets the host to be able to mount or consume a protocol
@ -104,6 +129,8 @@ func (c *PeerConnectionStrategy) Start(ctx context.Context) error {
go c.workPublisher(ctx)
go c.dialPeers(ctx)
c.consumeSubscriptions(ctx)
return nil
}
@ -117,6 +144,9 @@ func (c *PeerConnectionStrategy) Stop() {
close(c.peerCh)
close(c.dialCh)
c.subscriptions = nil
c.cancel = nil
}
func (c *PeerConnectionStrategy) isPaused() bool {
@ -157,6 +187,16 @@ func (c *PeerConnectionStrategy) shouldDialPeers(ctx context.Context) {
}
}
func (c *PeerConnectionStrategy) consumeSubscriptions(ctx context.Context) {
for _, subs := range c.subscriptions {
c.wg.Add(1)
go func(s <-chan PeerData) {
defer c.wg.Done()
c.consumeSubscription(ctx, s)
}(subs)
}
}
func (c *PeerConnectionStrategy) publishWork(ctx context.Context, p peer.AddrInfo) {
select {
case c.dialCh <- p:

View File

@ -28,14 +28,20 @@ import (
var ErrNoDiscV5Listener = errors.New("no discv5 listener")
type PeerConnector interface {
Subscribe(context.Context, <-chan v2.PeerData)
}
type DiscoveryV5 struct {
params *discV5Parameters
host host.Host
config discover.Config
udpAddr *net.UDPAddr
listener *discover.UDPv5
localnode *enode.LocalNode
params *discV5Parameters
host host.Host
config discover.Config
udpAddr *net.UDPAddr
listener *discover.UDPv5
localnode *enode.LocalNode
peerConnector PeerConnector
peerCh chan v2.PeerData
NAT nat.Interface
log *zap.Logger
@ -101,10 +107,6 @@ func DefaultOptions() []DiscoveryV5Option {
}
}
type PeerConnector interface {
PeerChannel() chan<- v2.PeerData
}
func NewDiscoveryV5(priv *ecdsa.PrivateKey, localnode *enode.LocalNode, peerConnector PeerConnector, log *zap.Logger, opts ...DiscoveryV5Option) (*DiscoveryV5, error) {
params := new(discV5Parameters)
optList := DefaultOptions()
@ -121,8 +123,8 @@ func NewDiscoveryV5(priv *ecdsa.PrivateKey, localnode *enode.LocalNode, peerConn
}
return &DiscoveryV5{
peerConnector: peerConnector,
params: params,
peerConnector: peerConnector,
NAT: NAT,
wg: &sync.WaitGroup{},
localnode: localnode,
@ -195,6 +197,9 @@ func (d *DiscoveryV5) Start(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
d.cancel = cancel
d.peerCh = make(chan v2.PeerData)
d.peerConnector.Subscribe(ctx, d.peerCh)
err := d.listen(ctx)
if err != nil {
return err
@ -225,6 +230,7 @@ func (d *DiscoveryV5) Stop() {
if !d.started.CompareAndSwap(true, false) { // if Discoveryv5 is running, set started to false
return
}
d.cancel()
if d.listener != nil {
@ -234,6 +240,8 @@ func (d *DiscoveryV5) Stop() {
}
d.wg.Wait()
close(d.peerCh)
}
/*
@ -373,7 +381,7 @@ func (d *DiscoveryV5) peerLoop(ctx context.Context) error {
return true
}
nodeRS, err := enr.RelaySharding(d.localnode.Node().Record())
nodeRS, err := enr.RelaySharding(n.Record())
if err != nil || nodeRS == nil {
return false
}
@ -383,7 +391,7 @@ func (d *DiscoveryV5) peerLoop(ctx context.Context) error {
}
// Contains any
for _, idx := range nodeRS.Indices {
for _, idx := range localRS.Indices {
if nodeRS.Contains(localRS.Cluster, idx) {
return true
}
@ -402,7 +410,7 @@ func (d *DiscoveryV5) peerLoop(ctx context.Context) error {
}
select {
case d.peerConnector.PeerChannel() <- peer:
case d.peerCh <- peer:
case <-ctx.Done():
return nil
}
@ -437,3 +445,7 @@ restartLoop:
func (d *DiscoveryV5) IsStarted() bool {
return d.started.Load()
}
func (d *DiscoveryV5) PeerChannel() <-chan v2.PeerData {
return d.peerCh
}

View File

@ -14,7 +14,6 @@ import (
)
const maxAllowedPingFailures = 2
const maxPublishAttempt = 5
func disconnectPeers(host host.Host, logger *zap.Logger) {
logger.Warn("keep alive hasnt been executed recently. Killing all connections to peers")

View File

@ -9,7 +9,9 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
ma "github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-waku/waku/v2/protocol"
wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"go.uber.org/zap"
)
@ -56,6 +58,8 @@ func (w *WakuNode) updateLocalNode(localnode *enode.LocalNode, multiaddrs []ma.M
}
}
localnode.SetFallbackIP(net.IP{127, 0, 0, 1})
return wenr.Update(localnode, options...)
}
@ -269,5 +273,62 @@ func (w *WakuNode) setupENR(ctx context.Context, addrs []ma.Multiaddr) error {
return err
}
if w.Relay() != nil {
err = w.watchTopicShards(ctx)
if err != nil {
return err
}
}
return nil
}
func (w *WakuNode) watchTopicShards(ctx context.Context) error {
evtRelaySubscribed, err := w.Relay().Events().Subscribe(new(relay.EvtRelaySubscribed))
if err != nil {
return err
}
evtRelayUnsubscribed, err := w.Relay().Events().Subscribe(new(relay.EvtRelayUnsubscribed))
if err != nil {
return err
}
go func() {
defer evtRelaySubscribed.Close()
defer evtRelayUnsubscribed.Close()
for {
select {
case <-ctx.Done():
return
case <-evtRelayUnsubscribed.Out():
case <-evtRelaySubscribed.Out():
rs, err := protocol.TopicsToRelayShards(w.Relay().Topics()...)
if err != nil {
w.log.Warn("could not set ENR shard info", zap.Error(err))
continue
}
if len(rs) > 1 {
w.log.Warn("use sharded topics within the same cluster")
continue
}
if len(rs) == 1 {
w.log.Info("updating advertised relay shards in ENR")
err = wenr.Update(w.localNode, wenr.WithWakuRelaySharding(rs[0]))
if err != nil {
w.log.Warn("could not set ENR shard info", zap.Error(err))
continue
}
w.enrChangeCh <- struct{}{}
}
}
}
}()
return nil
}

View File

@ -4,7 +4,6 @@ import (
"context"
"github.com/libp2p/go-libp2p/core/host"
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
)
@ -19,8 +18,3 @@ type ReceptorService interface {
Stop()
Start(context.Context, relay.Subscription) error
}
type PeerConnectorService interface {
Service
PeerChannel() chan<- v2.PeerData
}

View File

@ -2,7 +2,6 @@ package node
import (
"context"
"errors"
"fmt"
"math/rand"
"net"
@ -32,7 +31,6 @@ import (
"go.opencensus.io/stats"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/try"
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/discv5"
"github.com/waku-org/go-waku/waku/v2/metrics"
@ -82,11 +80,11 @@ type WakuNode struct {
log *zap.Logger
timesource timesource.Timesource
peerstore peerstore.Peerstore
peerstore peerstore.Peerstore
peerConnector *v2.PeerConnectionStrategy
relay Service
lightPush Service
peerConnector PeerConnectorService
discoveryV5 Service
peerExchange Service
rendezvous Service
@ -274,7 +272,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
rendezvousPoints = append(rendezvousPoints, peerID)
}
w.rendezvous = rendezvous.NewRendezvous(w.opts.enableRendezvousServer, w.opts.rendezvousDB, rendezvousPoints, w.peerConnector, w.log)
w.rendezvous = rendezvous.NewRendezvous(w.opts.enableRendezvousServer, w.opts.rendezvousDB, rendezvousPoints, w.log)
w.relay = relay.NewWakuRelay(w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.log, w.opts.wOpts...)
w.legacyFilter = legacy_filter.NewWakuFilter(w.bcaster, w.opts.isLegacyFilterFullnode, w.timesource, w.log, w.opts.legacyFilterOpts...)
w.filterFullnode = filter.NewWakuFilterFullnode(w.timesource, w.log, w.opts.filterOpts...)
@ -336,9 +334,10 @@ func (w *WakuNode) Start(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
w.cancel = cancel
w.opts.libP2POpts = append(w.opts.libP2POpts, libp2p.ConnectionGater(connGater))
host, err := libp2p.New(w.opts.libP2POpts...)
libP2POpts := append(w.opts.libP2POpts, libp2p.ConnectionGater(connGater))
host, err := libp2p.New(libP2POpts...)
if err != nil {
return err
}
@ -403,14 +402,6 @@ func (w *WakuNode) Start(ctx context.Context) error {
if err != nil {
return err
}
if !w.opts.noDefaultWakuTopic {
sub, err := w.Relay().Subscribe(ctx)
if err != nil {
return err
}
sub.Unsubscribe()
}
}
w.store = w.storeFactory(w)
@ -666,34 +657,6 @@ func (w *WakuNode) Broadcaster() relay.Broadcaster {
return w.bcaster
}
// Publish will attempt to publish a message via WakuRelay if there are enough
// peers available, otherwise it will attempt to publish via Lightpush protocol
func (w *WakuNode) Publish(ctx context.Context, msg *pb.WakuMessage) error {
if !w.opts.enableLightPush && !w.opts.enableRelay {
return errors.New("cannot publish message, relay and lightpush are disabled")
}
hash := msg.Hash(relay.DefaultWakuTopic)
err := try.Do(func(attempt int) (bool, error) {
var err error
relay := w.Relay()
lightpush := w.Lightpush()
if relay == nil || !relay.EnoughPeersToPublish() {
w.log.Debug("publishing message via lightpush", logging.HexBytes("hash", hash))
_, err = lightpush.Publish(ctx, msg)
} else {
w.log.Debug("publishing message via relay", logging.HexBytes("hash", hash))
_, err = relay.Publish(ctx, msg)
}
return attempt < maxPublishAttempt, err
})
return err
}
func (w *WakuNode) mountDiscV5() error {
discV5Options := []discv5.DiscoveryV5Option{
discv5.WithBootnodes(w.opts.discV5bootnodes),
@ -718,33 +681,6 @@ func (w *WakuNode) startStore(ctx context.Context, sub relay.Subscription) error
return err
}
if len(w.opts.resumeNodes) != 0 {
// TODO: extract this to a function and run it when you go offline
// TODO: determine if a store is listening to a topic
var peerIDs []peer.ID
for _, n := range w.opts.resumeNodes {
pID, err := w.AddPeer(n, peers.Static, store.StoreID_v20beta4)
if err != nil {
w.log.Warn("adding peer to peerstore", logging.MultiAddrs("peer", n), zap.Error(err))
}
peerIDs = append(peerIDs, pID)
}
if !w.opts.noDefaultWakuTopic {
w.wg.Add(1)
go func() {
defer w.wg.Done()
ctxWithTimeout, ctxCancel := context.WithTimeout(ctx, 20*time.Second)
defer ctxCancel()
if _, err := w.store.(store.Store).Resume(ctxWithTimeout, string(relay.DefaultWakuTopic), peerIDs); err != nil {
w.log.Error("Could not resume history", zap.Error(err))
time.Sleep(10 * time.Second)
}
}()
}
}
return nil
}

View File

@ -66,7 +66,6 @@ type WakuNodeParameters struct {
logger *zap.Logger
logLevel logging.LogLevel
noDefaultWakuTopic bool
enableRelay bool
enableLegacyFilter bool
isLegacyFilterFullnode bool
@ -79,7 +78,6 @@ type WakuNodeParameters struct {
minRelayPeersToPublish int
enableStore bool
resumeNodes []multiaddr.Multiaddr
messageProvider store.MessageProvider
rendezvousNodes []multiaddr.Multiaddr
@ -318,15 +316,6 @@ func WithPeerStore(ps peerstore.Peerstore) WakuNodeOption {
}
}
// NoDefaultWakuTopic will stop the node from subscribing to the default
// pubsub topic automatically
func NoDefaultWakuTopic() WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.noDefaultWakuTopic = true
return nil
}
}
// WithWakuRelay enables the Waku V2 Relay protocol. This WakuNodeOption
// accepts a list of WakuRelay gossipsub option to setup the protocol
func WithWakuRelay(opts ...pubsub.Option) WakuNodeOption {
@ -400,12 +389,10 @@ func WithWakuFilterFullNode(filterOpts ...filter.Option) WakuNodeOption {
}
// WithWakuStore enables the Waku V2 Store protocol and if the messages should
// be stored or not in a message provider. If resumeNodes are specified, the
// store will attempt to resume message history using those nodes
func WithWakuStore(resumeNodes ...multiaddr.Multiaddr) WakuNodeOption {
// be stored or not in a message provider.
func WithWakuStore() WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.enableStore = true
params.resumeNodes = resumeNodes
return nil
}
}

View File

@ -91,7 +91,6 @@ func WithUDPPort(udpPort uint) ENROption {
}
func Update(localnode *enode.LocalNode, enrOptions ...ENROption) error {
localnode.SetFallbackIP(net.IP{127, 0, 0, 1})
for _, opt := range enrOptions {
err := opt(localnode)
if err != nil {

View File

@ -8,12 +8,18 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol"
)
func deleteShardingENREntries(localnode *enode.LocalNode) {
localnode.Delete(enr.WithEntry(ShardingBitVectorEnrField, struct{}{}))
localnode.Delete(enr.WithEntry(ShardingIndicesListEnrField, struct{}{}))
}
func WithWakuRelayShardingIndicesList(rs protocol.RelayShards) ENROption {
return func(localnode *enode.LocalNode) error {
value, err := rs.IndicesList()
if err != nil {
return err
}
deleteShardingENREntries(localnode)
localnode.Set(enr.WithEntry(ShardingIndicesListEnrField, value))
return nil
}
@ -21,6 +27,7 @@ func WithWakuRelayShardingIndicesList(rs protocol.RelayShards) ENROption {
func WithWakuRelayShardingBitVector(rs protocol.RelayShards) ENROption {
return func(localnode *enode.LocalNode) error {
deleteShardingENREntries(localnode)
localnode.Set(enr.WithEntry(ShardingBitVectorEnrField, rs.BitVector()))
return nil
}
@ -55,8 +62,11 @@ func WithWakuRelayShardingTopics(topics ...string) ENROption {
func RelayShardingIndicesList(record *enr.Record) (*protocol.RelayShards, error) {
var field []byte
if err := record.Load(enr.WithEntry(ShardingIndicesListEnrField, field)); err != nil {
return nil, nil
if err := record.Load(enr.WithEntry(ShardingIndicesListEnrField, &field)); err != nil {
if enr.IsNotFound(err) {
return nil, nil
}
return nil, err
}
res, err := protocol.FromIndicesList(field)
@ -69,7 +79,7 @@ func RelayShardingIndicesList(record *enr.Record) (*protocol.RelayShards, error)
func RelayShardingBitVector(record *enr.Record) (*protocol.RelayShards, error) {
var field []byte
if err := record.Load(enr.WithEntry(ShardingBitVectorEnrField, field)); err != nil {
if err := record.Load(enr.WithEntry(ShardingBitVectorEnrField, &field)); err != nil {
if enr.IsNotFound(err) {
return nil, nil
}

View File

@ -424,9 +424,7 @@ func (wf *WakuFilterLightnode) UnsubscribeAll(ctx context.Context, opts ...Filte
wf.log.Error("could not unsubscribe from peer", logging.HostID("peerID", peerID), zap.Error(err))
}
wf.subscriptions.Lock()
delete(wf.subscriptions.items, peerID)
defer wf.subscriptions.Unlock()
resultChan <- WakuFilterPushResult{
Err: err,

View File

@ -103,6 +103,10 @@ func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb
wakuPX.wg.Add(1)
go func() {
defer wakuPX.wg.Done()
peerCh := make(chan v2.PeerData)
defer close(peerCh)
wakuPX.peerConnector.Subscribe(ctx, peerCh)
for _, p := range discoveredPeers {
peer := v2.PeerData{
Origin: peers.PeerExchange,
@ -112,7 +116,7 @@ func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb
select {
case <-ctx.Done():
return
case wakuPX.peerConnector.PeerChannel() <- peer:
case peerCh <- peer:
}
}
}()

View File

@ -31,6 +31,10 @@ var (
ErrInvalidId = errors.New("invalid request id")
)
type PeerConnector interface {
Subscribe(context.Context, <-chan v2.PeerData)
}
type WakuPeerExchange struct {
h host.Host
disc *discv5.DiscoveryV5
@ -44,10 +48,6 @@ type WakuPeerExchange struct {
enrCache *enrCache
}
type PeerConnector interface {
PeerChannel() chan<- v2.PeerData
}
// NewWakuPeerExchange returns a new instance of WakuPeerExchange struct
func NewWakuPeerExchange(disc *discv5.DiscoveryV5, peerConnector PeerConnector, log *zap.Logger) (*WakuPeerExchange, error) {
newEnrCache, err := newEnrCache(MaxCacheSize)
@ -59,6 +59,7 @@ func NewWakuPeerExchange(disc *discv5.DiscoveryV5, peerConnector PeerConnector,
wakuPX.log = log.Named("wakupx")
wakuPX.enrCache = newEnrCache
wakuPX.peerConnector = peerConnector
return wakuPX, nil
}

View File

@ -7,9 +7,11 @@ import (
"sync"
"time"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.uber.org/zap"
@ -50,11 +52,27 @@ type WakuRelay struct {
wakuRelayTopics map[string]*pubsub.Topic
relaySubs map[string]*pubsub.Subscription
events event.Bus
emitters struct {
EvtRelaySubscribed event.Emitter
EvtRelayUnsubscribed event.Emitter
}
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}
// EvtRelaySubscribed is an event emitted when a new subscription to a pubsub topic is created
type EvtRelaySubscribed struct {
Topic string
}
// EvtRelayUnsubscribed is an event emitted when a subscription to a pubsub topic is closed
type EvtRelayUnsubscribed struct {
Topic string
}
func msgIdFn(pmsg *pubsub_pb.Message) string {
return string(hash.SHA256(pmsg.Data))
}
@ -69,6 +87,7 @@ func NewWakuRelay(bcaster Broadcaster, minPeersToPublish int, timesource timesou
w.minPeersToPublish = minPeersToPublish
w.wg = sync.WaitGroup{}
w.log = log.Named("relay")
w.events = eventbus.NewBus()
cfg := pubsub.DefaultGossipSubParams()
cfg.PruneBackoff = time.Minute
@ -200,6 +219,15 @@ func (w *WakuRelay) Start(ctx context.Context) error {
}
w.pubsub = ps
w.emitters.EvtRelaySubscribed, err = w.events.Emitter(new(EvtRelaySubscribed))
if err != nil {
return err
}
w.emitters.EvtRelayUnsubscribed, err = w.events.Emitter(new(EvtRelayUnsubscribed))
if err != nil {
return err
}
w.log.Info("Relay protocol started")
return nil
}
@ -234,8 +262,8 @@ func (w *WakuRelay) SetPubSub(pubSub *pubsub.PubSub) {
}
func (w *WakuRelay) upsertTopic(topic string) (*pubsub.Topic, error) {
defer w.topicsMutex.Unlock()
w.topicsMutex.Lock()
defer w.topicsMutex.Unlock()
pubSubTopic, ok := w.wakuRelayTopics[topic]
if !ok { // Joins topic if node hasn't joined yet
@ -267,7 +295,14 @@ func (w *WakuRelay) subscribe(topic string) (subs *pubsub.Subscription, err erro
if err != nil {
return nil, err
}
w.relaySubs[topic] = sub
err = w.emitters.EvtRelaySubscribed.Emit(EvtRelaySubscribed{topic})
if err != nil {
return nil, err
}
if w.bcaster != nil {
w.wg.Add(1)
go w.subscribeToTopic(topic, sub)
@ -328,7 +363,8 @@ func (w *WakuRelay) Stop() {
}
w.host.RemoveStreamHandler(WakuRelayID_v200)
w.emitters.EvtRelaySubscribed.Close()
w.emitters.EvtRelayUnsubscribed.Close()
w.cancel()
w.wg.Wait()
}
@ -384,6 +420,11 @@ func (w *WakuRelay) Unsubscribe(ctx context.Context, topic string) error {
}
delete(w.wakuRelayTopics, topic)
err = w.emitters.EvtRelayUnsubscribed.Emit(EvtRelayUnsubscribed{topic})
if err != nil {
return err
}
return nil
}
@ -449,3 +490,8 @@ func (w *WakuRelay) subscribeToTopic(pubsubTopic string, sub *pubsub.Subscriptio
func (w *WakuRelay) Params() pubsub.GossipSubParams {
return w.params
}
// Events returns the event bus on which WakuRelay events will be emitted
func (w *WakuRelay) Events() event.Bus {
return w.events
}

View File

@ -53,8 +53,8 @@ func (rs RelayShards) Contains(cluster uint16, index uint16) bool {
}
found := false
for _, i := range rs.Indices {
if i == index {
for _, idx := range rs.Indices {
if idx == index {
found = true
}
}

View File

@ -159,7 +159,6 @@ func (n StaticShardingPubsubTopic) String() string {
func (s *StaticShardingPubsubTopic) Parse(topic string) error {
if !strings.HasPrefix(topic, StaticShardingPubsubTopicPrefix) {
fmt.Println(topic, StaticShardingPubsubTopicPrefix)
return ErrInvalidShardedTopicPrefix
}

View File

@ -34,18 +34,14 @@ type Rendezvous struct {
rendezvousSvc *rvs.RendezvousService
rendezvousPoints []*rendezvousPoint
peerConnector PeerConnector
peerCh chan v2.PeerData
log *zap.Logger
wg sync.WaitGroup
cancel context.CancelFunc
}
type PeerConnector interface {
PeerChannel() chan<- v2.PeerData
}
func NewRendezvous(enableServer bool, db *DB, rendezvousPoints []peer.ID, peerConnector PeerConnector, log *zap.Logger) *Rendezvous {
func NewRendezvous(enableServer bool, db *DB, rendezvousPoints []peer.ID, log *zap.Logger) *Rendezvous {
logger := log.Named("rendezvous")
var rendevousPoints []*rendezvousPoint
@ -59,7 +55,7 @@ func NewRendezvous(enableServer bool, db *DB, rendezvousPoints []peer.ID, peerCo
enableServer: enableServer,
db: db,
rendezvousPoints: rendevousPoints,
peerConnector: peerConnector,
peerCh: make(chan v2.PeerData),
log: logger,
}
}
@ -124,7 +120,7 @@ func (r *Rendezvous) Discover(ctx context.Context, topic string, numPeers int) {
AddrInfo: addr,
}
select {
case r.peerConnector.PeerChannel() <- peer:
case r.peerCh <- peer:
case <-ctx.Done():
return
}
@ -194,6 +190,10 @@ func (r *Rendezvous) RegisterRelayShards(ctx context.Context, rs protocol.RelayS
}
}
func (r *Rendezvous) PeerChannel() <-chan v2.PeerData {
return r.peerCh
}
func (r *Rendezvous) Stop() {
r.cancel()
r.wg.Wait()

View File

@ -431,6 +431,14 @@ func (s *String) readBase128Int(out *int) bool {
}
ret <<= 7
b := s.read(1)[0]
// ITU-T X.690, section 8.19.2:
// The subidentifier shall be encoded in the fewest possible octets,
// that is, the leading octet of the subidentifier shall not have the value 0x80.
if i == 0 && b == 0x80 {
return false
}
ret |= int(b & 0x7f)
if b&0x80 == 0 {
*out = ret

View File

@ -5,71 +5,18 @@
// Package curve25519 provides an implementation of the X25519 function, which
// performs scalar multiplication on the elliptic curve known as Curve25519.
// See RFC 7748.
//
// Starting in Go 1.20, this package is a wrapper for the X25519 implementation
// in the crypto/ecdh package.
package curve25519 // import "golang.org/x/crypto/curve25519"
import (
"crypto/subtle"
"errors"
"strconv"
"golang.org/x/crypto/curve25519/internal/field"
)
// ScalarMult sets dst to the product scalar * point.
//
// Deprecated: when provided a low-order point, ScalarMult will set dst to all
// zeroes, irrespective of the scalar. Instead, use the X25519 function, which
// will return an error.
func ScalarMult(dst, scalar, point *[32]byte) {
var e [32]byte
copy(e[:], scalar[:])
e[0] &= 248
e[31] &= 127
e[31] |= 64
var x1, x2, z2, x3, z3, tmp0, tmp1 field.Element
x1.SetBytes(point[:])
x2.One()
x3.Set(&x1)
z3.One()
swap := 0
for pos := 254; pos >= 0; pos-- {
b := e[pos/8] >> uint(pos&7)
b &= 1
swap ^= int(b)
x2.Swap(&x3, swap)
z2.Swap(&z3, swap)
swap = int(b)
tmp0.Subtract(&x3, &z3)
tmp1.Subtract(&x2, &z2)
x2.Add(&x2, &z2)
z2.Add(&x3, &z3)
z3.Multiply(&tmp0, &x2)
z2.Multiply(&z2, &tmp1)
tmp0.Square(&tmp1)
tmp1.Square(&x2)
x3.Add(&z3, &z2)
z2.Subtract(&z3, &z2)
x2.Multiply(&tmp1, &tmp0)
tmp1.Subtract(&tmp1, &tmp0)
z2.Square(&z2)
z3.Mult32(&tmp1, 121666)
x3.Square(&x3)
tmp0.Add(&tmp0, &z3)
z3.Multiply(&x1, &z2)
z2.Multiply(&tmp1, &tmp0)
}
x2.Swap(&x3, swap)
z2.Swap(&z3, swap)
z2.Invert(&z2)
x2.Multiply(&x2, &z2)
copy(dst[:], x2.Bytes())
scalarMult(dst, scalar, point)
}
// ScalarBaseMult sets dst to the product scalar * base where base is the
@ -78,7 +25,7 @@ func ScalarMult(dst, scalar, point *[32]byte) {
// It is recommended to use the X25519 function with Basepoint instead, as
// copying into fixed size arrays can lead to unexpected bugs.
func ScalarBaseMult(dst, scalar *[32]byte) {
ScalarMult(dst, scalar, &basePoint)
scalarBaseMult(dst, scalar)
}
const (
@ -91,21 +38,10 @@ const (
// Basepoint is the canonical Curve25519 generator.
var Basepoint []byte
var basePoint = [32]byte{9, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
var basePoint = [32]byte{9}
func init() { Basepoint = basePoint[:] }
func checkBasepoint() {
if subtle.ConstantTimeCompare(Basepoint, []byte{
0x09, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
}) != 1 {
panic("curve25519: global Basepoint value was modified")
}
}
// X25519 returns the result of the scalar multiplication (scalar * point),
// according to RFC 7748, Section 5. scalar, point and the return value are
// slices of 32 bytes.
@ -121,26 +57,3 @@ func X25519(scalar, point []byte) ([]byte, error) {
var dst [32]byte
return x25519(&dst, scalar, point)
}
func x25519(dst *[32]byte, scalar, point []byte) ([]byte, error) {
var in [32]byte
if l := len(scalar); l != 32 {
return nil, errors.New("bad scalar length: " + strconv.Itoa(l) + ", expected 32")
}
if l := len(point); l != 32 {
return nil, errors.New("bad point length: " + strconv.Itoa(l) + ", expected 32")
}
copy(in[:], scalar)
if &point[0] == &Basepoint[0] {
checkBasepoint()
ScalarBaseMult(dst, &in)
} else {
var base, zero [32]byte
copy(base[:], point)
ScalarMult(dst, &in, &base)
if subtle.ConstantTimeCompare(dst[:], zero[:]) == 1 {
return nil, errors.New("bad input point: low order point")
}
}
return dst[:], nil
}

View File

@ -0,0 +1,105 @@
// Copyright 2019 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build !go1.20
package curve25519
import (
"crypto/subtle"
"errors"
"strconv"
"golang.org/x/crypto/curve25519/internal/field"
)
func scalarMult(dst, scalar, point *[32]byte) {
var e [32]byte
copy(e[:], scalar[:])
e[0] &= 248
e[31] &= 127
e[31] |= 64
var x1, x2, z2, x3, z3, tmp0, tmp1 field.Element
x1.SetBytes(point[:])
x2.One()
x3.Set(&x1)
z3.One()
swap := 0
for pos := 254; pos >= 0; pos-- {
b := e[pos/8] >> uint(pos&7)
b &= 1
swap ^= int(b)
x2.Swap(&x3, swap)
z2.Swap(&z3, swap)
swap = int(b)
tmp0.Subtract(&x3, &z3)
tmp1.Subtract(&x2, &z2)
x2.Add(&x2, &z2)
z2.Add(&x3, &z3)
z3.Multiply(&tmp0, &x2)
z2.Multiply(&z2, &tmp1)
tmp0.Square(&tmp1)
tmp1.Square(&x2)
x3.Add(&z3, &z2)
z2.Subtract(&z3, &z2)
x2.Multiply(&tmp1, &tmp0)
tmp1.Subtract(&tmp1, &tmp0)
z2.Square(&z2)
z3.Mult32(&tmp1, 121666)
x3.Square(&x3)
tmp0.Add(&tmp0, &z3)
z3.Multiply(&x1, &z2)
z2.Multiply(&tmp1, &tmp0)
}
x2.Swap(&x3, swap)
z2.Swap(&z3, swap)
z2.Invert(&z2)
x2.Multiply(&x2, &z2)
copy(dst[:], x2.Bytes())
}
func scalarBaseMult(dst, scalar *[32]byte) {
checkBasepoint()
scalarMult(dst, scalar, &basePoint)
}
func x25519(dst *[32]byte, scalar, point []byte) ([]byte, error) {
var in [32]byte
if l := len(scalar); l != 32 {
return nil, errors.New("bad scalar length: " + strconv.Itoa(l) + ", expected 32")
}
if l := len(point); l != 32 {
return nil, errors.New("bad point length: " + strconv.Itoa(l) + ", expected 32")
}
copy(in[:], scalar)
if &point[0] == &Basepoint[0] {
scalarBaseMult(dst, &in)
} else {
var base, zero [32]byte
copy(base[:], point)
scalarMult(dst, &in, &base)
if subtle.ConstantTimeCompare(dst[:], zero[:]) == 1 {
return nil, errors.New("bad input point: low order point")
}
}
return dst[:], nil
}
func checkBasepoint() {
if subtle.ConstantTimeCompare(Basepoint, []byte{
0x09, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
}) != 1 {
panic("curve25519: global Basepoint value was modified")
}
}

View File

@ -0,0 +1,46 @@
// Copyright 2022 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build go1.20
package curve25519
import "crypto/ecdh"
func x25519(dst *[32]byte, scalar, point []byte) ([]byte, error) {
curve := ecdh.X25519()
pub, err := curve.NewPublicKey(point)
if err != nil {
return nil, err
}
priv, err := curve.NewPrivateKey(scalar)
if err != nil {
return nil, err
}
out, err := priv.ECDH(pub)
if err != nil {
return nil, err
}
copy(dst[:], out)
return dst[:], nil
}
func scalarMult(dst, scalar, point *[32]byte) {
if _, err := x25519(dst, scalar[:], point[:]); err != nil {
// The only error condition for x25519 when the inputs are 32 bytes long
// is if the output would have been the all-zero value.
for i := range dst {
dst[i] = 0
}
}
}
func scalarBaseMult(dst, scalar *[32]byte) {
curve := ecdh.X25519()
priv, err := curve.NewPrivateKey(scalar[:])
if err != nil {
panic("curve25519: internal error: scalarBaseMult was not 32 bytes")
}
copy(dst[:], priv.PublicKey().Bytes())
}

5
vendor/modules.txt vendored
View File

@ -999,11 +999,10 @@ github.com/waku-org/go-discover/discover/v5wire
github.com/waku-org/go-libp2p-rendezvous
github.com/waku-org/go-libp2p-rendezvous/db
github.com/waku-org/go-libp2p-rendezvous/pb
# github.com/waku-org/go-waku v0.7.1-0.20230703171720-78ed073c7542
# github.com/waku-org/go-waku v0.7.1-0.20230707130306-0d4ff8bff7f4
## explicit; go 1.19
github.com/waku-org/go-waku/logging
github.com/waku-org/go-waku/waku/persistence
github.com/waku-org/go-waku/waku/try
github.com/waku-org/go-waku/waku/v2
github.com/waku-org/go-waku/waku/v2/discv5
github.com/waku-org/go-waku/waku/v2/dnsdisc
@ -1142,7 +1141,7 @@ go.uber.org/zap/internal/bufferpool
go.uber.org/zap/internal/color
go.uber.org/zap/internal/exit
go.uber.org/zap/zapcore
# golang.org/x/crypto v0.7.0
# golang.org/x/crypto v0.9.0
## explicit; go 1.17
golang.org/x/crypto/blake2b
golang.org/x/crypto/blake2s