diff --git a/go.mod b/go.mod index 57f5baa80..617ee45f7 100644 --- a/go.mod +++ b/go.mod @@ -49,7 +49,7 @@ require ( github.com/russolsen/same v0.0.0-20160222130632-f089df61f51d // indirect github.com/russolsen/transit v0.0.0-20180705123435-0794b4c4505a github.com/status-im/doubleratchet v3.0.0+incompatible - github.com/status-im/go-waku v0.0.0-20211109161857-9426cd133acc + github.com/status-im/go-waku v0.0.0-20211112132622-e176975aede1 github.com/status-im/go-waku-rendezvous v0.0.0-20211018070416-a93f3b70c432 github.com/status-im/markdown v0.0.0-20210405121740-32e5a5055fb6 github.com/status-im/migrate/v4 v4.6.2-status.2 diff --git a/go.sum b/go.sum index d85568412..05925648a 100644 --- a/go.sum +++ b/go.sum @@ -1207,8 +1207,8 @@ github.com/status-im/go-ethereum v1.10.4-status.3 h1:RF618iSCvqJtXu3ZSg7XNg6MJaS github.com/status-im/go-ethereum v1.10.4-status.3/go.mod h1:GvIhpdCOgMHI6i5xVPEZOrv/qSMeOFHbZh77AoyZUoE= github.com/status-im/go-multiaddr-ethv4 v1.2.1 h1:09v9n6426NAojNOvdgegqrAotgffWW/UPDwrpJ85DNE= github.com/status-im/go-multiaddr-ethv4 v1.2.1/go.mod h1:SlBebvQcSUM5+/R/YfpfMuu5WyraW47XFmIqLYBmlKU= -github.com/status-im/go-waku v0.0.0-20211109161857-9426cd133acc h1:OBoMUanISPnSAoMg0GIGGz6raeohIbHyhCjFbfSuea4= -github.com/status-im/go-waku v0.0.0-20211109161857-9426cd133acc/go.mod h1:egfHY9n6ATRVAJ8atFPUuBqlECqDcHemfzD7VOgwZv8= +github.com/status-im/go-waku v0.0.0-20211112132622-e176975aede1 h1:SlnFFjgrrtI2XKRWWa2ZQNqJ1qJ2/X0fYVKPoBI2c5Q= +github.com/status-im/go-waku v0.0.0-20211112132622-e176975aede1/go.mod h1:egfHY9n6ATRVAJ8atFPUuBqlECqDcHemfzD7VOgwZv8= github.com/status-im/go-waku-rendezvous v0.0.0-20211018070416-a93f3b70c432 h1:cbNFU38iimo9fY4B7CdF/fvIF6tNPJIZjBbpfmW2EY4= github.com/status-im/go-waku-rendezvous v0.0.0-20211018070416-a93f3b70c432/go.mod h1:A8t3i0CUGtXCA0aiLsP7iyikmk/KaD/2XVvNJqGCU20= github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4/go.mod h1:RZLeN1LMWmRsyYjvAu+I6Dm9QmlDaIIt+Y+4Kd7Tp+Q= diff --git a/vendor/github.com/status-im/go-waku/waku/v2/node/wakunode2.go b/vendor/github.com/status-im/go-waku/waku/v2/node/wakunode2.go index 8d64967cb..32b3de513 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/node/wakunode2.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/node/wakunode2.go @@ -40,7 +40,6 @@ type WakuNode struct { filter *filter.WakuFilter lightPush *lightpush.WakuLightPush rendezvous *rendezvous.RendezvousService - ping *ping.PingService store *store.WakuStore bcaster v2.Broadcaster @@ -402,7 +401,6 @@ func (w *WakuNode) Peers() PeerStats { func (w *WakuNode) startKeepAlive(t time.Duration) { log.Info("Setting up ping protocol with duration of ", t) - w.ping = ping.NewPingService(w.host) ticker := time.NewTicker(t) go func() { @@ -416,7 +414,9 @@ func (w *WakuNode) startKeepAlive(t time.Duration) { // which is not possible when iterating // through Network's peer collection, as it will be empty for _, p := range w.host.Peerstore().Peers() { - go pingPeer(w.ctx, w.ping, p) + if p != w.host.ID() { + go pingPeer(w.ctx, w.host, p) + } } case <-w.quit: ticker.Stop() @@ -426,12 +426,12 @@ func (w *WakuNode) startKeepAlive(t time.Duration) { }() } -func pingPeer(ctx context.Context, pingService *ping.PingService, peer peer.ID) { +func pingPeer(ctx context.Context, host host.Host, peer peer.ID) { ctx, cancel := context.WithTimeout(ctx, 3*time.Second) defer cancel() log.Debug("Pinging ", peer) - pr := pingService.Ping(ctx, peer) + pr := ping.Ping(ctx, host, peer) select { case res := <-pr: if res.Error != nil { diff --git a/vendor/github.com/status-im/go-waku/waku/v2/protocol/filter/waku_filter.go b/vendor/github.com/status-im/go-waku/waku/v2/protocol/filter/waku_filter.go index 303a0aeb8..161a9782c 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/protocol/filter/waku_filter.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/protocol/filter/waku_filter.go @@ -85,6 +85,17 @@ func WithAutomaticPeerSelection() FilterSubscribeOption { } } +func WithFastestPeerSelection(ctx context.Context) FilterSubscribeOption { + return func(params *FilterSubscribeParameters) { + p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(FilterID_v20beta1)) + if err == nil { + params.selectedPeer = *p + } else { + log.Info("Error selecting peer: ", err) + } + } +} + func DefaultOptions() []FilterSubscribeOption { return []FilterSubscribeOption{ WithAutomaticPeerSelection(), diff --git a/vendor/github.com/status-im/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go b/vendor/github.com/status-im/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go index df90caa60..053364b45 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go @@ -122,6 +122,7 @@ func (wakuLP *WakuLightPush) onRequest(s network.Stream) { func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, opts ...LightPushOption) (*pb.PushResponse, error) { params := new(LightPushParameters) + params.host = wakuLP.h optList := DefaultOptions(wakuLP.h) optList = append(optList, opts...) diff --git a/vendor/github.com/status-im/go-waku/waku/v2/protocol/lightpush/waku_lightpush_option.go b/vendor/github.com/status-im/go-waku/waku/v2/protocol/lightpush/waku_lightpush_option.go index 13758d567..d880f5978 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/protocol/lightpush/waku_lightpush_option.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/protocol/lightpush/waku_lightpush_option.go @@ -1,6 +1,8 @@ package lightpush import ( + "context" + "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" "github.com/status-im/go-waku/waku/v2/protocol" @@ -8,6 +10,7 @@ import ( ) type LightPushParameters struct { + host host.Host selectedPeer peer.ID requestId []byte } @@ -31,6 +34,17 @@ func WithAutomaticPeerSelection(host host.Host) LightPushOption { } } +func WithFastestPeerSelection(ctx context.Context) LightPushOption { + return func(params *LightPushParameters) { + p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(LightPushID_v20beta1)) + if err == nil { + params.selectedPeer = *p + } else { + log.Info("Error selecting peer: ", err) + } + } +} + func WithRequestId(requestId []byte) LightPushOption { return func(params *LightPushParameters) { params.requestId = requestId diff --git a/vendor/github.com/status-im/go-waku/waku/v2/protocol/relay/waku_relay.go b/vendor/github.com/status-im/go-waku/waku/v2/protocol/relay/waku_relay.go index 8e4c79329..be1e2cf42 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/protocol/relay/waku_relay.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/protocol/relay/waku_relay.go @@ -27,7 +27,8 @@ var log = logging.Logger("wakurelay") type Topic string const WakuRelayID_v200 = protocol.ID("/vac/waku/relay/2.0.0") -const DefaultWakuTopic Topic = "/waku/2/default-waku/proto" + +var DefaultWakuTopic Topic = Topic(waku_proto.DefaultPubsubTopic().String()) type WakuRelay struct { host host.Host diff --git a/vendor/github.com/status-im/go-waku/waku/v2/protocol/store/waku_store.go b/vendor/github.com/status-im/go-waku/waku/v2/protocol/store/waku_store.go index 5bccc0c99..90ec1a5a0 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/protocol/store/waku_store.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/protocol/store/waku_store.go @@ -442,6 +442,17 @@ func WithAutomaticPeerSelection() HistoryRequestOption { } } +func WithFastestPeerSelection(ctx context.Context) HistoryRequestOption { + return func(params *HistoryRequestParameters) { + p, err := utils.SelectPeerWithLowestRTT(ctx, params.s.h, string(StoreID_v20beta3)) + if err == nil { + params.selectedPeer = *p + } else { + log.Info("Error selecting peer: ", err) + } + } +} + func WithRequestId(requestId []byte) HistoryRequestOption { return func(params *HistoryRequestParameters) { params.requestId = requestId diff --git a/vendor/github.com/status-im/go-waku/waku/v2/protocol/topic.go b/vendor/github.com/status-im/go-waku/waku/v2/protocol/topic.go new file mode 100644 index 000000000..2c9aba4ff --- /dev/null +++ b/vendor/github.com/status-im/go-waku/waku/v2/protocol/topic.go @@ -0,0 +1,91 @@ +package protocol + +import ( + "errors" + "fmt" + "strconv" + "strings" +) + +var ErrInvalidFormat = errors.New("invalid format") + +type ContentTopic struct { + ApplicationName string + ApplicationVersion uint + ContentTopicName string + Encoding string +} + +func (ct ContentTopic) String() string { + return fmt.Sprintf("/%s/%d/%s/%s", ct.ApplicationName, ct.ApplicationVersion, ct.ContentTopicName, ct.Encoding) +} + +func NewContentTopic(applicationName string, applicationVersion uint, contentTopicName string, encoding string) ContentTopic { + return ContentTopic{ + ApplicationName: applicationName, + ApplicationVersion: applicationVersion, + ContentTopicName: contentTopicName, + Encoding: encoding, + } +} + +func (ct ContentTopic) Equal(ct2 ContentTopic) bool { + return ct.ApplicationName == ct2.ApplicationName && ct.ApplicationVersion == ct2.ApplicationVersion && + ct.ContentTopicName == ct2.ContentTopicName && ct.Encoding == ct2.Encoding +} + +func StringToContentTopic(s string) (ContentTopic, error) { + p := strings.Split(s, "/") + + if len(p) != 5 || p[0] != "" || p[1] == "" || p[2] == "" || p[3] == "" || p[4] == "" { + return ContentTopic{}, ErrInvalidFormat + } + + vNum, err := strconv.ParseUint(p[2], 10, 32) + if err != nil { + return ContentTopic{}, ErrInvalidFormat + } + + return ContentTopic{ + ApplicationName: p[1], + ApplicationVersion: uint(vNum), + ContentTopicName: p[3], + Encoding: p[4], + }, nil +} + +type PubsubTopic struct { + Name string + Encoding string +} + +func (t PubsubTopic) String() string { + return fmt.Sprintf("/waku/2/%s/%s", t.Name, t.Encoding) +} + +func DefaultPubsubTopic() PubsubTopic { + return NewPubsubTopic("default-waku", "proto") +} + +func NewPubsubTopic(name string, encoding string) PubsubTopic { + return PubsubTopic{ + Name: name, + Encoding: encoding, + } +} + +func (t PubsubTopic) Equal(t2 PubsubTopic) bool { + return t.Name == t2.Name && t.Encoding == t2.Encoding +} + +func StringToPubsubTopic(s string) (PubsubTopic, error) { + p := strings.Split(s, "/") + if len(p) != 5 || p[0] != "" || p[1] != "waku" || p[2] != "2" || p[3] == "" || p[4] == "" { + return PubsubTopic{}, ErrInvalidFormat + } + + return PubsubTopic{ + Name: p[3], + Encoding: p[4], + }, nil +} diff --git a/vendor/github.com/status-im/go-waku/waku/v2/utils/peer.go b/vendor/github.com/status-im/go-waku/waku/v2/utils/peer.go index 3cc65c8f0..862363a4d 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/utils/peer.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/utils/peer.go @@ -1,16 +1,23 @@ package utils import ( + "context" "errors" "math/rand" + "sync" + "time" logging "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p/p2p/protocol/ping" ) var log = logging.Logger("utils") +var ErrNoPeersAvailable = errors.New("no suitable peers found") +var PingServiceNotAvailable = errors.New("ping service not available") + // SelectPeer is used to return a random peer that supports a given protocol. func SelectPeer(host host.Host, protocolId string) (*peer.ID, error) { // @TODO We need to be more strategic about which peers we dial. Right now we just set one on the service. @@ -37,5 +44,71 @@ func SelectPeer(host host.Host, protocolId string) (*peer.ID, error) { return &peers[rand.Intn(len(peers))], nil // nolint: gosec } - return nil, errors.New("no suitable peers found") + return nil, ErrNoPeersAvailable +} + +type pingResult struct { + p peer.ID + rtt time.Duration +} + +func SelectPeerWithLowestRTT(ctx context.Context, host host.Host, protocolId string) (*peer.ID, error) { + var peers peer.IDSlice + for _, peer := range host.Peerstore().Peers() { + protocols, err := host.Peerstore().SupportsProtocols(peer, protocolId) + if err != nil { + log.Error("error obtaining the protocols supported by peers", err) + return nil, err + } + + if len(protocols) > 0 { + peers = append(peers, peer) + } + } + + wg := sync.WaitGroup{} + waitCh := make(chan struct{}) + pingCh := make(chan pingResult, 1000) + + go func() { + for _, p := range peers { + wg.Add(1) + go func(p peer.ID) { + defer wg.Done() + ctx, cancel := context.WithTimeout(ctx, 3*time.Second) + defer cancel() + result := <-ping.Ping(ctx, host, p) + if result.Error == nil { + pingCh <- pingResult{ + p: p, + rtt: result.RTT, + } + } + }(p) + } + wg.Wait() + close(waitCh) + close(pingCh) + }() + + select { + case <-waitCh: + var min *pingResult + for p := range pingCh { + if min == nil { + min = &p + } else { + if p.rtt < min.rtt { + min = &p + } + } + } + if min == nil { + return nil, ErrNoPeersAvailable + } else { + return &min.p, nil + } + case <-ctx.Done(): + return nil, ErrNoPeersAvailable + } } diff --git a/vendor/modules.txt b/vendor/modules.txt index 8a28f991f..503a6898b 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -447,7 +447,7 @@ github.com/spacemonkeygo/spacelog github.com/status-im/doubleratchet # github.com/status-im/go-multiaddr-ethv4 v1.2.1 github.com/status-im/go-multiaddr-ethv4 -# github.com/status-im/go-waku v0.0.0-20211109161857-9426cd133acc +# github.com/status-im/go-waku v0.0.0-20211112132622-e176975aede1 github.com/status-im/go-waku/waku/persistence github.com/status-im/go-waku/waku/v2 github.com/status-im/go-waku/waku/v2/discovery