mirror of
https://github.com/status-im/status-go.git
synced 2025-01-21 20:20:29 +00:00
refactor(nwaku)_: ping
This commit is contained in:
parent
730d11821a
commit
948c6d3497
@ -331,7 +331,9 @@ func (w *gethWakuV2Wrapper) OnStorenodeAvailable() <-chan peer.ID {
|
||||
}
|
||||
|
||||
func (w *gethWakuV2Wrapper) WaitForAvailableStoreNode(timeout time.Duration) bool {
|
||||
return w.waku.StorenodeCycle.WaitForAvailableStoreNode(context.TODO(), timeout)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||
defer cancel()
|
||||
return w.waku.StorenodeCycle.WaitForAvailableStoreNode(ctx)
|
||||
}
|
||||
|
||||
func (w *gethWakuV2Wrapper) SetStorenodeConfigProvider(c history.StorenodeConfigProvider) {
|
||||
|
2
go.mod
2
go.mod
@ -95,7 +95,7 @@ require (
|
||||
github.com/schollz/peerdiscovery v1.7.0
|
||||
github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7
|
||||
github.com/urfave/cli/v2 v2.27.2
|
||||
github.com/waku-org/go-waku v0.8.1-0.20241022133615-5dc634be1056
|
||||
github.com/waku-org/go-waku v0.8.1-0.20241024184757-fdb3c3d0b35a
|
||||
github.com/wk8/go-ordered-map/v2 v2.1.7
|
||||
github.com/yeqown/go-qrcode/v2 v2.2.1
|
||||
github.com/yeqown/go-qrcode/writer/standard v1.2.1
|
||||
|
4
go.sum
4
go.sum
@ -2140,8 +2140,8 @@ github.com/waku-org/go-libp2p-pubsub v0.12.0-gowaku.0.20240823143342-b0f2429ca27
|
||||
github.com/waku-org/go-libp2p-pubsub v0.12.0-gowaku.0.20240823143342-b0f2429ca27f/go.mod h1:Oi0zw9aw8/Y5GC99zt+Ef2gYAl+0nZlwdJonDyOz/sE=
|
||||
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo=
|
||||
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY=
|
||||
github.com/waku-org/go-waku v0.8.1-0.20241022133615-5dc634be1056 h1:R2LscQHxKdVVdRIz7zcZWOkjcZDz753fflW5TPunJN0=
|
||||
github.com/waku-org/go-waku v0.8.1-0.20241022133615-5dc634be1056/go.mod h1:1BRnyg2mQ2aBNLTBaPq6vEvobzywGykPOhGQFbHGf74=
|
||||
github.com/waku-org/go-waku v0.8.1-0.20241024184757-fdb3c3d0b35a h1:epN2bp1mPzdg3S7S2iR72GsUPix7irc3UgM4W9NZJpU=
|
||||
github.com/waku-org/go-waku v0.8.1-0.20241024184757-fdb3c3d0b35a/go.mod h1:1BRnyg2mQ2aBNLTBaPq6vEvobzywGykPOhGQFbHGf74=
|
||||
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA=
|
||||
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E=
|
||||
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE=
|
||||
|
37
vendor/github.com/waku-org/go-waku/waku/v2/api/common/pinger.go
generated
vendored
Normal file
37
vendor/github.com/waku-org/go-waku/waku/v2/api/common/pinger.go
generated
vendored
Normal file
@ -0,0 +1,37 @@
|
||||
package common
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/libp2p/go-libp2p/core/host"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
|
||||
)
|
||||
|
||||
type Pinger interface {
|
||||
PingPeer(ctx context.Context, peerID peer.ID) (time.Duration, error)
|
||||
}
|
||||
|
||||
type defaultPingImpl struct {
|
||||
host host.Host
|
||||
}
|
||||
|
||||
func NewDefaultPinger(host host.Host) Pinger {
|
||||
return &defaultPingImpl{
|
||||
host: host,
|
||||
}
|
||||
}
|
||||
|
||||
func (d *defaultPingImpl) PingPeer(ctx context.Context, peerID peer.ID) (time.Duration, error) {
|
||||
pingResultCh := ping.Ping(ctx, d.host, peerID)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return 0, ctx.Err()
|
||||
case r := <-pingResultCh:
|
||||
if r.Error != nil {
|
||||
return 0, r.Error
|
||||
}
|
||||
return r.RTT, nil
|
||||
}
|
||||
}
|
26
vendor/github.com/waku-org/go-waku/waku/v2/api/history/cycle.go
generated
vendored
26
vendor/github.com/waku-org/go-waku/waku/v2/api/history/cycle.go
generated
vendored
@ -14,9 +14,8 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/libp2p/go-libp2p/core/host"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
|
||||
"github.com/waku-org/go-waku/waku/v2/api/common"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/store"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
@ -55,9 +54,8 @@ type StorenodeCycle struct {
|
||||
|
||||
logger *zap.Logger
|
||||
|
||||
host host.Host
|
||||
|
||||
storenodeConfigProvider StorenodeConfigProvider
|
||||
pinger common.Pinger
|
||||
|
||||
StorenodeAvailableOneshotEmitter *OneShotEmitter[struct{}]
|
||||
StorenodeChangedEmitter *Emitter[peer.ID]
|
||||
@ -71,7 +69,7 @@ type StorenodeCycle struct {
|
||||
peers map[peer.ID]peerStatus
|
||||
}
|
||||
|
||||
func NewStorenodeCycle(logger *zap.Logger) *StorenodeCycle {
|
||||
func NewStorenodeCycle(logger *zap.Logger, pinger common.Pinger) *StorenodeCycle {
|
||||
return &StorenodeCycle{
|
||||
StorenodeAvailableOneshotEmitter: NewOneshotEmitter[struct{}](),
|
||||
StorenodeChangedEmitter: NewEmitter[peer.ID](),
|
||||
@ -81,9 +79,8 @@ func NewStorenodeCycle(logger *zap.Logger) *StorenodeCycle {
|
||||
}
|
||||
}
|
||||
|
||||
func (m *StorenodeCycle) Start(ctx context.Context, h host.Host) {
|
||||
func (m *StorenodeCycle) Start(ctx context.Context) {
|
||||
m.logger.Debug("starting storenode cycle")
|
||||
m.host = h
|
||||
m.failedRequests = make(map[peer.ID]uint)
|
||||
m.peers = make(map[peer.ID]peerStatus)
|
||||
|
||||
@ -194,7 +191,7 @@ func (m *StorenodeCycle) getAvailableStorenodesSortedByRTT(ctx context.Context,
|
||||
ctx, cancel := context.WithTimeout(ctx, 4*time.Second)
|
||||
defer cancel()
|
||||
|
||||
rtt, err := m.pingPeer(ctx, peerID)
|
||||
rtt, err := m.pinger.PingPeer(ctx, peerID)
|
||||
if err == nil { // pinging storenodes might fail, but we don't care
|
||||
availableStorenodesMutex.Lock()
|
||||
availableStorenodes[peerID] = rtt
|
||||
@ -233,19 +230,6 @@ func (m *StorenodeCycle) getAvailableStorenodesSortedByRTT(ctx context.Context,
|
||||
return result
|
||||
}
|
||||
|
||||
func (m *StorenodeCycle) pingPeer(ctx context.Context, peerID peer.ID) (time.Duration, error) {
|
||||
pingResultCh := ping.Ping(ctx, m.host, peerID)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return 0, ctx.Err()
|
||||
case r := <-pingResultCh:
|
||||
if r.Error != nil {
|
||||
return 0, r.Error
|
||||
}
|
||||
return r.RTT, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (m *StorenodeCycle) findNewStorenode(ctx context.Context) error {
|
||||
// we have to override DNS manually because of https://github.com/status-im/status-mobile/issues/19581
|
||||
if overrideDNS {
|
||||
|
2
vendor/modules.txt
vendored
2
vendor/modules.txt
vendored
@ -1031,7 +1031,7 @@ github.com/waku-org/go-discover/discover/v5wire
|
||||
github.com/waku-org/go-libp2p-rendezvous
|
||||
github.com/waku-org/go-libp2p-rendezvous/db
|
||||
github.com/waku-org/go-libp2p-rendezvous/pb
|
||||
# github.com/waku-org/go-waku v0.8.1-0.20241022133615-5dc634be1056
|
||||
# github.com/waku-org/go-waku v0.8.1-0.20241024184757-fdb3c3d0b35a
|
||||
## explicit; go 1.21
|
||||
github.com/waku-org/go-waku/logging
|
||||
github.com/waku-org/go-waku/tests
|
||||
|
@ -75,6 +75,8 @@ import (
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/store"
|
||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||
|
||||
commonapi "github.com/waku-org/go-waku/waku/v2/api/common"
|
||||
|
||||
gocommon "github.com/status-im/status-go/common"
|
||||
"github.com/status-im/status-go/connection"
|
||||
"github.com/status-im/status-go/eth-node/types"
|
||||
@ -1067,10 +1069,10 @@ func (w *Waku) Start() error {
|
||||
return fmt.Errorf("failed to start go-waku node: %v", err)
|
||||
}
|
||||
|
||||
w.StorenodeCycle = history.NewStorenodeCycle(w.logger)
|
||||
w.HistoryRetriever = history.NewHistoryRetriever(w.node.Store(), NewHistoryProcessorWrapper(w), w.logger)
|
||||
w.StorenodeCycle = history.NewStorenodeCycle(w.logger, commonapi.NewDefaultPinger(w.node.Host()))
|
||||
w.HistoryRetriever = history.NewHistoryRetriever(missing.NewDefaultStorenodeRequestor(w.node.Store()), NewHistoryProcessorWrapper(w), w.logger)
|
||||
|
||||
w.StorenodeCycle.Start(w.ctx, w.node.Host())
|
||||
w.StorenodeCycle.Start(w.ctx)
|
||||
|
||||
w.logger.Info("WakuV2 PeerID", zap.Stringer("id", w.node.Host().ID()))
|
||||
|
||||
|
@ -212,6 +212,10 @@ package wakuv2
|
||||
WAKU_CALL (waku_get_my_peerid(ctx, (WakuCallBack) callback, resp) );
|
||||
}
|
||||
|
||||
static void cGoWakuPingPeer(void* ctx, char* peerAddr, int timeoutMs, void* resp) {
|
||||
WAKU_CALL (waku_ping_peer(ctx, peerAddr, timeoutMs, (WakuCallBack) callback, resp) );
|
||||
}
|
||||
|
||||
static void cGoWakuListPeersInMesh(void* ctx, char* pubSubTopic, void* resp) {
|
||||
WAKU_CALL (waku_relay_get_num_peers_in_mesh(ctx, pubSubTopic, (WakuCallBack) callback, resp) );
|
||||
}
|
||||
@ -1374,10 +1378,10 @@ func (w *Waku) Start() error {
|
||||
return fmt.Errorf("failed to start go-waku node: %v", err)
|
||||
}
|
||||
*/
|
||||
w.StorenodeCycle = history.NewStorenodeCycle(w.logger)
|
||||
w.StorenodeCycle = history.NewStorenodeCycle(w.logger, newPinger(w.wakuCtx))
|
||||
|
||||
w.HistoryRetriever = history.NewHistoryRetriever(newStorenodeRequestor(w.wakuCtx, w.logger), NewHistoryProcessorWrapper(w), w.logger)
|
||||
w.StorenodeCycle.Start(w.ctx, newPinger(w.wakuCtx))
|
||||
w.StorenodeCycle.Start(w.ctx)
|
||||
|
||||
w.logger.Info("WakuV2 PeerID", zap.Stringer("id", w.PeerID()))
|
||||
|
||||
@ -3064,18 +3068,48 @@ func (d *storenodeMessageVerifier) MessageHashesExist(ctx context.Context, reque
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func newStorenodeRequestor(wakuCtx unsafe.Pointer, logger *zap.Logger) commonapi.StorenodeRequestor {
|
||||
return &storenodeRequestor{
|
||||
type pinger struct {
|
||||
wakuCtx unsafe.Pointer
|
||||
}
|
||||
|
||||
func newPinger(wakuCtx unsafe.Pointer) commonapi.Pinger {
|
||||
return &pinger{
|
||||
wakuCtx: wakuCtx,
|
||||
logger: logger.Named("storenodeRequestor"),
|
||||
}
|
||||
}
|
||||
|
||||
func (p *pinger) PingPeer(ctx context.Context, peerID peer.ID) (time.Duration, error) {
|
||||
var resp = C.allocResp()
|
||||
var cPeerId = C.CString(peerID.String())
|
||||
defer C.freeResp(resp)
|
||||
defer C.free(unsafe.Pointer(cPeerId))
|
||||
|
||||
C.cGoWakuPingPeer(p.wakuCtx, cPeerId, C.int(time.Minute.Milliseconds()), resp)
|
||||
if C.getRet(resp) == C.RET_OK {
|
||||
rttStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
|
||||
rttInt, err := strconv.ParseInt(rttStr, 10, 64)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return time.Duration(rttInt), nil
|
||||
}
|
||||
|
||||
errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
|
||||
return 0, fmt.Errorf("PingPeer: %s", errMsg)
|
||||
}
|
||||
|
||||
type storenodeRequestor struct {
|
||||
wakuCtx unsafe.Pointer
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
func newStorenodeRequestor(wakuCtx unsafe.Pointer, logger *zap.Logger) commonapi.StorenodeRequestor {
|
||||
return &storenodeRequestor{
|
||||
wakuCtx: wakuCtx,
|
||||
logger: logger.Named("storenodeRequestor"),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *storenodeRequestor) GetMessagesByHash(ctx context.Context, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) (commonapi.StoreRequestResult, error) {
|
||||
requestIDStr := hex.EncodeToString(protocol.GenerateRequestID())
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user