chore: bum go-waku (#2430)
This commit is contained in:
parent
53bbfb3f08
commit
f24cc2e9fc
2
go.mod
2
go.mod
|
@ -49,7 +49,7 @@ require (
|
|||
github.com/russolsen/same v0.0.0-20160222130632-f089df61f51d // indirect
|
||||
github.com/russolsen/transit v0.0.0-20180705123435-0794b4c4505a
|
||||
github.com/status-im/doubleratchet v3.0.0+incompatible
|
||||
github.com/status-im/go-waku v0.0.0-20211109161857-9426cd133acc
|
||||
github.com/status-im/go-waku v0.0.0-20211112132622-e176975aede1
|
||||
github.com/status-im/go-waku-rendezvous v0.0.0-20211018070416-a93f3b70c432
|
||||
github.com/status-im/markdown v0.0.0-20210405121740-32e5a5055fb6
|
||||
github.com/status-im/migrate/v4 v4.6.2-status.2
|
||||
|
|
4
go.sum
4
go.sum
|
@ -1207,8 +1207,8 @@ github.com/status-im/go-ethereum v1.10.4-status.3 h1:RF618iSCvqJtXu3ZSg7XNg6MJaS
|
|||
github.com/status-im/go-ethereum v1.10.4-status.3/go.mod h1:GvIhpdCOgMHI6i5xVPEZOrv/qSMeOFHbZh77AoyZUoE=
|
||||
github.com/status-im/go-multiaddr-ethv4 v1.2.1 h1:09v9n6426NAojNOvdgegqrAotgffWW/UPDwrpJ85DNE=
|
||||
github.com/status-im/go-multiaddr-ethv4 v1.2.1/go.mod h1:SlBebvQcSUM5+/R/YfpfMuu5WyraW47XFmIqLYBmlKU=
|
||||
github.com/status-im/go-waku v0.0.0-20211109161857-9426cd133acc h1:OBoMUanISPnSAoMg0GIGGz6raeohIbHyhCjFbfSuea4=
|
||||
github.com/status-im/go-waku v0.0.0-20211109161857-9426cd133acc/go.mod h1:egfHY9n6ATRVAJ8atFPUuBqlECqDcHemfzD7VOgwZv8=
|
||||
github.com/status-im/go-waku v0.0.0-20211112132622-e176975aede1 h1:SlnFFjgrrtI2XKRWWa2ZQNqJ1qJ2/X0fYVKPoBI2c5Q=
|
||||
github.com/status-im/go-waku v0.0.0-20211112132622-e176975aede1/go.mod h1:egfHY9n6ATRVAJ8atFPUuBqlECqDcHemfzD7VOgwZv8=
|
||||
github.com/status-im/go-waku-rendezvous v0.0.0-20211018070416-a93f3b70c432 h1:cbNFU38iimo9fY4B7CdF/fvIF6tNPJIZjBbpfmW2EY4=
|
||||
github.com/status-im/go-waku-rendezvous v0.0.0-20211018070416-a93f3b70c432/go.mod h1:A8t3i0CUGtXCA0aiLsP7iyikmk/KaD/2XVvNJqGCU20=
|
||||
github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4/go.mod h1:RZLeN1LMWmRsyYjvAu+I6Dm9QmlDaIIt+Y+4Kd7Tp+Q=
|
||||
|
|
|
@ -40,7 +40,6 @@ type WakuNode struct {
|
|||
filter *filter.WakuFilter
|
||||
lightPush *lightpush.WakuLightPush
|
||||
rendezvous *rendezvous.RendezvousService
|
||||
ping *ping.PingService
|
||||
store *store.WakuStore
|
||||
|
||||
bcaster v2.Broadcaster
|
||||
|
@ -402,7 +401,6 @@ func (w *WakuNode) Peers() PeerStats {
|
|||
func (w *WakuNode) startKeepAlive(t time.Duration) {
|
||||
log.Info("Setting up ping protocol with duration of ", t)
|
||||
|
||||
w.ping = ping.NewPingService(w.host)
|
||||
ticker := time.NewTicker(t)
|
||||
|
||||
go func() {
|
||||
|
@ -416,7 +414,9 @@ func (w *WakuNode) startKeepAlive(t time.Duration) {
|
|||
// which is not possible when iterating
|
||||
// through Network's peer collection, as it will be empty
|
||||
for _, p := range w.host.Peerstore().Peers() {
|
||||
go pingPeer(w.ctx, w.ping, p)
|
||||
if p != w.host.ID() {
|
||||
go pingPeer(w.ctx, w.host, p)
|
||||
}
|
||||
}
|
||||
case <-w.quit:
|
||||
ticker.Stop()
|
||||
|
@ -426,12 +426,12 @@ func (w *WakuNode) startKeepAlive(t time.Duration) {
|
|||
}()
|
||||
}
|
||||
|
||||
func pingPeer(ctx context.Context, pingService *ping.PingService, peer peer.ID) {
|
||||
func pingPeer(ctx context.Context, host host.Host, peer peer.ID) {
|
||||
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
|
||||
defer cancel()
|
||||
|
||||
log.Debug("Pinging ", peer)
|
||||
pr := pingService.Ping(ctx, peer)
|
||||
pr := ping.Ping(ctx, host, peer)
|
||||
select {
|
||||
case res := <-pr:
|
||||
if res.Error != nil {
|
||||
|
|
|
@ -85,6 +85,17 @@ func WithAutomaticPeerSelection() FilterSubscribeOption {
|
|||
}
|
||||
}
|
||||
|
||||
func WithFastestPeerSelection(ctx context.Context) FilterSubscribeOption {
|
||||
return func(params *FilterSubscribeParameters) {
|
||||
p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(FilterID_v20beta1))
|
||||
if err == nil {
|
||||
params.selectedPeer = *p
|
||||
} else {
|
||||
log.Info("Error selecting peer: ", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func DefaultOptions() []FilterSubscribeOption {
|
||||
return []FilterSubscribeOption{
|
||||
WithAutomaticPeerSelection(),
|
||||
|
|
1
vendor/github.com/status-im/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go
generated
vendored
1
vendor/github.com/status-im/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go
generated
vendored
|
@ -122,6 +122,7 @@ func (wakuLP *WakuLightPush) onRequest(s network.Stream) {
|
|||
|
||||
func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, opts ...LightPushOption) (*pb.PushResponse, error) {
|
||||
params := new(LightPushParameters)
|
||||
params.host = wakuLP.h
|
||||
|
||||
optList := DefaultOptions(wakuLP.h)
|
||||
optList = append(optList, opts...)
|
||||
|
|
14
vendor/github.com/status-im/go-waku/waku/v2/protocol/lightpush/waku_lightpush_option.go
generated
vendored
14
vendor/github.com/status-im/go-waku/waku/v2/protocol/lightpush/waku_lightpush_option.go
generated
vendored
|
@ -1,6 +1,8 @@
|
|||
package lightpush
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/host"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"github.com/status-im/go-waku/waku/v2/protocol"
|
||||
|
@ -8,6 +10,7 @@ import (
|
|||
)
|
||||
|
||||
type LightPushParameters struct {
|
||||
host host.Host
|
||||
selectedPeer peer.ID
|
||||
requestId []byte
|
||||
}
|
||||
|
@ -31,6 +34,17 @@ func WithAutomaticPeerSelection(host host.Host) LightPushOption {
|
|||
}
|
||||
}
|
||||
|
||||
func WithFastestPeerSelection(ctx context.Context) LightPushOption {
|
||||
return func(params *LightPushParameters) {
|
||||
p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(LightPushID_v20beta1))
|
||||
if err == nil {
|
||||
params.selectedPeer = *p
|
||||
} else {
|
||||
log.Info("Error selecting peer: ", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func WithRequestId(requestId []byte) LightPushOption {
|
||||
return func(params *LightPushParameters) {
|
||||
params.requestId = requestId
|
||||
|
|
|
@ -27,7 +27,8 @@ var log = logging.Logger("wakurelay")
|
|||
type Topic string
|
||||
|
||||
const WakuRelayID_v200 = protocol.ID("/vac/waku/relay/2.0.0")
|
||||
const DefaultWakuTopic Topic = "/waku/2/default-waku/proto"
|
||||
|
||||
var DefaultWakuTopic Topic = Topic(waku_proto.DefaultPubsubTopic().String())
|
||||
|
||||
type WakuRelay struct {
|
||||
host host.Host
|
||||
|
|
|
@ -442,6 +442,17 @@ func WithAutomaticPeerSelection() HistoryRequestOption {
|
|||
}
|
||||
}
|
||||
|
||||
func WithFastestPeerSelection(ctx context.Context) HistoryRequestOption {
|
||||
return func(params *HistoryRequestParameters) {
|
||||
p, err := utils.SelectPeerWithLowestRTT(ctx, params.s.h, string(StoreID_v20beta3))
|
||||
if err == nil {
|
||||
params.selectedPeer = *p
|
||||
} else {
|
||||
log.Info("Error selecting peer: ", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func WithRequestId(requestId []byte) HistoryRequestOption {
|
||||
return func(params *HistoryRequestParameters) {
|
||||
params.requestId = requestId
|
||||
|
|
|
@ -0,0 +1,91 @@
|
|||
package protocol
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
var ErrInvalidFormat = errors.New("invalid format")
|
||||
|
||||
type ContentTopic struct {
|
||||
ApplicationName string
|
||||
ApplicationVersion uint
|
||||
ContentTopicName string
|
||||
Encoding string
|
||||
}
|
||||
|
||||
func (ct ContentTopic) String() string {
|
||||
return fmt.Sprintf("/%s/%d/%s/%s", ct.ApplicationName, ct.ApplicationVersion, ct.ContentTopicName, ct.Encoding)
|
||||
}
|
||||
|
||||
func NewContentTopic(applicationName string, applicationVersion uint, contentTopicName string, encoding string) ContentTopic {
|
||||
return ContentTopic{
|
||||
ApplicationName: applicationName,
|
||||
ApplicationVersion: applicationVersion,
|
||||
ContentTopicName: contentTopicName,
|
||||
Encoding: encoding,
|
||||
}
|
||||
}
|
||||
|
||||
func (ct ContentTopic) Equal(ct2 ContentTopic) bool {
|
||||
return ct.ApplicationName == ct2.ApplicationName && ct.ApplicationVersion == ct2.ApplicationVersion &&
|
||||
ct.ContentTopicName == ct2.ContentTopicName && ct.Encoding == ct2.Encoding
|
||||
}
|
||||
|
||||
func StringToContentTopic(s string) (ContentTopic, error) {
|
||||
p := strings.Split(s, "/")
|
||||
|
||||
if len(p) != 5 || p[0] != "" || p[1] == "" || p[2] == "" || p[3] == "" || p[4] == "" {
|
||||
return ContentTopic{}, ErrInvalidFormat
|
||||
}
|
||||
|
||||
vNum, err := strconv.ParseUint(p[2], 10, 32)
|
||||
if err != nil {
|
||||
return ContentTopic{}, ErrInvalidFormat
|
||||
}
|
||||
|
||||
return ContentTopic{
|
||||
ApplicationName: p[1],
|
||||
ApplicationVersion: uint(vNum),
|
||||
ContentTopicName: p[3],
|
||||
Encoding: p[4],
|
||||
}, nil
|
||||
}
|
||||
|
||||
type PubsubTopic struct {
|
||||
Name string
|
||||
Encoding string
|
||||
}
|
||||
|
||||
func (t PubsubTopic) String() string {
|
||||
return fmt.Sprintf("/waku/2/%s/%s", t.Name, t.Encoding)
|
||||
}
|
||||
|
||||
func DefaultPubsubTopic() PubsubTopic {
|
||||
return NewPubsubTopic("default-waku", "proto")
|
||||
}
|
||||
|
||||
func NewPubsubTopic(name string, encoding string) PubsubTopic {
|
||||
return PubsubTopic{
|
||||
Name: name,
|
||||
Encoding: encoding,
|
||||
}
|
||||
}
|
||||
|
||||
func (t PubsubTopic) Equal(t2 PubsubTopic) bool {
|
||||
return t.Name == t2.Name && t.Encoding == t2.Encoding
|
||||
}
|
||||
|
||||
func StringToPubsubTopic(s string) (PubsubTopic, error) {
|
||||
p := strings.Split(s, "/")
|
||||
if len(p) != 5 || p[0] != "" || p[1] != "waku" || p[2] != "2" || p[3] == "" || p[4] == "" {
|
||||
return PubsubTopic{}, ErrInvalidFormat
|
||||
}
|
||||
|
||||
return PubsubTopic{
|
||||
Name: p[3],
|
||||
Encoding: p[4],
|
||||
}, nil
|
||||
}
|
|
@ -1,16 +1,23 @@
|
|||
package utils
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
logging "github.com/ipfs/go-log"
|
||||
"github.com/libp2p/go-libp2p-core/host"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
|
||||
)
|
||||
|
||||
var log = logging.Logger("utils")
|
||||
|
||||
var ErrNoPeersAvailable = errors.New("no suitable peers found")
|
||||
var PingServiceNotAvailable = errors.New("ping service not available")
|
||||
|
||||
// SelectPeer is used to return a random peer that supports a given protocol.
|
||||
func SelectPeer(host host.Host, protocolId string) (*peer.ID, error) {
|
||||
// @TODO We need to be more strategic about which peers we dial. Right now we just set one on the service.
|
||||
|
@ -37,5 +44,71 @@ func SelectPeer(host host.Host, protocolId string) (*peer.ID, error) {
|
|||
return &peers[rand.Intn(len(peers))], nil // nolint: gosec
|
||||
}
|
||||
|
||||
return nil, errors.New("no suitable peers found")
|
||||
return nil, ErrNoPeersAvailable
|
||||
}
|
||||
|
||||
type pingResult struct {
|
||||
p peer.ID
|
||||
rtt time.Duration
|
||||
}
|
||||
|
||||
func SelectPeerWithLowestRTT(ctx context.Context, host host.Host, protocolId string) (*peer.ID, error) {
|
||||
var peers peer.IDSlice
|
||||
for _, peer := range host.Peerstore().Peers() {
|
||||
protocols, err := host.Peerstore().SupportsProtocols(peer, protocolId)
|
||||
if err != nil {
|
||||
log.Error("error obtaining the protocols supported by peers", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(protocols) > 0 {
|
||||
peers = append(peers, peer)
|
||||
}
|
||||
}
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
waitCh := make(chan struct{})
|
||||
pingCh := make(chan pingResult, 1000)
|
||||
|
||||
go func() {
|
||||
for _, p := range peers {
|
||||
wg.Add(1)
|
||||
go func(p peer.ID) {
|
||||
defer wg.Done()
|
||||
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
|
||||
defer cancel()
|
||||
result := <-ping.Ping(ctx, host, p)
|
||||
if result.Error == nil {
|
||||
pingCh <- pingResult{
|
||||
p: p,
|
||||
rtt: result.RTT,
|
||||
}
|
||||
}
|
||||
}(p)
|
||||
}
|
||||
wg.Wait()
|
||||
close(waitCh)
|
||||
close(pingCh)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-waitCh:
|
||||
var min *pingResult
|
||||
for p := range pingCh {
|
||||
if min == nil {
|
||||
min = &p
|
||||
} else {
|
||||
if p.rtt < min.rtt {
|
||||
min = &p
|
||||
}
|
||||
}
|
||||
}
|
||||
if min == nil {
|
||||
return nil, ErrNoPeersAvailable
|
||||
} else {
|
||||
return &min.p, nil
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return nil, ErrNoPeersAvailable
|
||||
}
|
||||
}
|
||||
|
|
|
@ -447,7 +447,7 @@ github.com/spacemonkeygo/spacelog
|
|||
github.com/status-im/doubleratchet
|
||||
# github.com/status-im/go-multiaddr-ethv4 v1.2.1
|
||||
github.com/status-im/go-multiaddr-ethv4
|
||||
# github.com/status-im/go-waku v0.0.0-20211109161857-9426cd133acc
|
||||
# github.com/status-im/go-waku v0.0.0-20211112132622-e176975aede1
|
||||
github.com/status-im/go-waku/waku/persistence
|
||||
github.com/status-im/go-waku/waku/v2
|
||||
github.com/status-im/go-waku/waku/v2/discovery
|
||||
|
|
Loading…
Reference in New Issue