some minor progress to add nwaku in status-go

This commit is contained in:
Ivan Folgueira Bande 2024-09-08 20:38:52 +02:00
parent fb1b1a8605
commit 6954612f54
No known key found for this signature in database
GPG Key ID: 3C117481F89E24A7
4 changed files with 179 additions and 25 deletions

View File

@ -24,12 +24,12 @@ import (
"github.com/status-im/status-go/multiaccounts" "github.com/status-im/status-go/multiaccounts"
"github.com/status-im/status-go/multiaccounts/accounts" "github.com/status-im/status-go/multiaccounts/accounts"
"github.com/status-im/status-go/multiaccounts/settings" "github.com/status-im/status-go/multiaccounts/settings"
"github.com/status-im/status-go/wakuv2"
"github.com/status-im/status-go/logutils" "github.com/status-im/status-go/logutils"
"github.com/status-im/status-go/params" "github.com/status-im/status-go/params"
"github.com/status-im/status-go/protocol" "github.com/status-im/status-go/protocol"
"github.com/status-im/status-go/protocol/common" "github.com/status-im/status-go/protocol/common"
"github.com/status-im/status-go/protocol/common/shard"
"github.com/status-im/status-go/protocol/identity/alias" "github.com/status-im/status-go/protocol/identity/alias"
"github.com/status-im/status-go/protocol/protobuf" "github.com/status-im/status-go/protocol/protobuf"
wakuextn "github.com/status-im/status-go/services/wakuext" wakuextn "github.com/status-im/status-go/services/wakuext"
@ -48,8 +48,8 @@ var (
seedPhrase = flag.String("seed-phrase", "", "Seed phrase") seedPhrase = flag.String("seed-phrase", "", "Seed phrase")
version = flag.Bool("version", false, "Print version and dump configuration") version = flag.Bool("version", false, "Print version and dump configuration")
communityID = flag.String("community-id", "", "The id of the community") communityID = flag.String("community-id", "", "The id of the community")
shardCluster = flag.Int("shard-cluster", shard.MainStatusShardCluster, "The shard cluster in which the of the community is published") shardCluster = flag.Int("shard-cluster", wakuv2.MainStatusShardCluster, "The shard cluster in which the of the community is published")
shardIndex = flag.Int("shard-index", shard.DefaultShardIndex, "The shard index in which the community is published") shardIndex = flag.Int("shard-index", wakuv2.DefaultShardIndex, "The shard index in which the community is published")
chatID = flag.String("chat-id", "", "The id of the chat") chatID = flag.String("chat-id", "", "The id of the chat")
dataDir = flag.String("dir", getDefaultDataDir(), "Directory used by node to store data") dataDir = flag.String("dir", getDefaultDataDir(), "Directory used by node to store data")
@ -148,9 +148,9 @@ func main() {
messenger := wakuextservice.Messenger() messenger := wakuextservice.Messenger()
var s *shard.Shard = nil var s *wakuv2.Shard = nil
if shardCluster != nil && shardIndex != nil { if shardCluster != nil && shardIndex != nil {
s = &shard.Shard{ s = &wakuv2.Shard{
Cluster: uint16(*shardCluster), Cluster: uint16(*shardCluster),
Index: uint16(*shardIndex), Index: uint16(*shardIndex),
} }

View File

@ -24,7 +24,6 @@ import (
"golang.org/x/exp/maps" "golang.org/x/exp/maps"
"github.com/status-im/status-go/protocol/common/shard"
"github.com/status-im/status-go/wakuv2/common" "github.com/status-im/status-go/wakuv2/common"
) )
@ -57,7 +56,7 @@ func TestMultipleTopicCopyInNewMessageFilter(t *testing.T) {
} }
found := false found := false
candidates := w.filters.GetWatchersByTopic(shard.DefaultShardPubsubTopic(), t1) candidates := w.filters.GetWatchersByTopic(DefaultShardPubsubTopic(), t1)
for _, f := range candidates { for _, f := range candidates {
if maps.Equal(f.ContentTopics, common.NewTopicSet(crit.ContentTopics)) { if maps.Equal(f.ContentTopics, common.NewTopicSet(crit.ContentTopics)) {
found = true found = true

View File

@ -188,6 +188,10 @@ package wakuv2
WAKU_CALL (waku_relay_get_num_peers_in_mesh(ctx, pubSubTopic, (WakuCallBack) callback, resp) ); WAKU_CALL (waku_relay_get_num_peers_in_mesh(ctx, pubSubTopic, (WakuCallBack) callback, resp) );
} }
void cGoWakuGetNumConnectedPeers(void* ctx, char* pubSubTopic, void* resp) {
WAKU_CALL (waku_relay_get_num_connected_peers(ctx, pubSubTopic, (WakuCallBack) callback, resp) );
}
void cGoWakuLightpushPublish(void* wakuCtx, void cGoWakuLightpushPublish(void* wakuCtx,
const char* pubSubTopic, const char* pubSubTopic,
const char* jsonWakuMessage, const char* jsonWakuMessage,
@ -214,6 +218,26 @@ package wakuv2
resp)); resp));
} }
void cGoWakuPeerExchangeQuery(void* wakuCtx,
uint64_t numPeers,
void* resp) {
WAKU_CALL (waku_peer_exchange_request(wakuCtx,
numPeers,
(WakuCallBack) callback,
resp));
}
void cGoWakuGetPeerIdsByProtocol(void* wakuCtx,
const char* protocol,
void* resp) {
WAKU_CALL (waku_get_peerids_by_protocol(wakuCtx,
protocol,
(WakuCallBack) callback,
resp));
}
*/ */
import "C" import "C"
@ -226,6 +250,8 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"io"
"net/http"
"os" "os"
"os/signal" "os/signal"
"runtime" "runtime"
@ -246,6 +272,7 @@ import (
"github.com/jellydator/ttlcache/v3" "github.com/jellydator/ttlcache/v3"
"github.com/libp2p/go-libp2p/core/metrics" "github.com/libp2p/go-libp2p/core/metrics"
"github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peer"
peermod "github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/core/peerstore"
"github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multiaddr"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
@ -2092,6 +2119,20 @@ func (self *NWaku) wakuStoreQuery(
return "", errors.New(errMsg) return "", errors.New(errMsg)
} }
func (self *NWaku) WakuPeerExchangeRequest(numPeers uint64) (string, error) {
var resp = C.allocResp()
defer C.freeResp(resp)
C.cGoWakuPeerExchangeQuery(self.wakuCtx, C.uint64_t(numPeers), resp)
if C.getRet(resp) == C.RET_OK {
msg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return msg, nil
}
errMsg := "error WakuPeerExchangeRequest: " +
C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return "", errors.New(errMsg)
}
func (self *NWaku) WakuConnect(peerMultiAddr string, timeoutMs int) error { func (self *NWaku) WakuConnect(peerMultiAddr string, timeoutMs int) error {
var resp = C.allocResp() var resp = C.allocResp()
var cPeerMultiAddr = C.CString(peerMultiAddr) var cPeerMultiAddr = C.CString(peerMultiAddr)
@ -2144,7 +2185,11 @@ func (self *NWaku) ENR() (*enode.Node, error) {
if C.getRet(resp) == C.RET_OK { if C.getRet(resp) == C.RET_OK {
enrStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) enrStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return enode.Parse(enode.ValidSchemes, enrStr) n, err := enode.Parse(enode.ValidSchemes, enrStr)
if err != nil {
return nil, err
}
return n, nil
} }
errMsg := "error WakuGetMyENR: " + errMsg := "error WakuGetMyENR: " +
C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
@ -2174,6 +2219,59 @@ func (self *NWaku) ListPeersInMesh(pubsubTopic string) (int, error) {
return 0, errors.New(errMsg) return 0, errors.New(errMsg)
} }
func (self *NWaku) GetNumConnectedPeers(pubsubTopic string) (int, error) {
var resp = C.allocResp()
var cPubsubTopic = C.CString(pubsubTopic)
defer C.freeResp(resp)
defer C.free(unsafe.Pointer(cPubsubTopic))
C.cGoWakuGetNumConnectedPeers(self.wakuCtx, cPubsubTopic, resp)
if C.getRet(resp) == C.RET_OK {
numPeersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
numPeers, err := strconv.Atoi(numPeersStr)
if err != nil {
fmt.Println(":", err)
errMsg := "ListPeersInMesh - error converting string to int: " + err.Error()
return 0, errors.New(errMsg)
}
return numPeers, nil
}
errMsg := "error ListPeersInMesh: " +
C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return 0, errors.New(errMsg)
}
func (self *NWaku) GetPeerIdsByProtocol(protocol string) (peer.IDSlice, error) {
var resp = C.allocResp()
var cProtocol = C.CString(protocol)
defer C.freeResp(resp)
defer C.free(unsafe.Pointer(cProtocol))
C.cGoWakuGetPeerIdsByProtocol(self.wakuCtx, cProtocol, resp)
if C.getRet(resp) == C.RET_OK {
peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
// peersStr contains a comma-separated list of peer ids
itemsPeerIds := strings.Split(peersStr, ",")
var peers peer.IDSlice
for _, peer := range itemsPeerIds {
id, err := peermod.Decode(peer)
if err != nil {
errMsg := "ListPeersInMesh - error converting string to int: " + err.Error()
return nil, errors.New(errMsg)
}
peers = append(peers, id)
}
return peers, nil
}
errMsg := "error GetPeerIdsByProtocol: " +
C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return nil, errors.New(errMsg)
}
// func main() { // func main() {
// config := WakuConfig{ // config := WakuConfig{
@ -2424,3 +2522,51 @@ func New(nodeKey *ecdsa.PrivateKey,
// return waku, nil // return waku, nil
} }
type NwakuInfo struct {
ListenAddresses []string `json:"listenAddresses"`
EnrUri string `json:"enrUri"`
}
func GetNwakuInfo(host *string, port *int) (NwakuInfo, error) {
nwakuRestPort := 8645
if port != nil {
nwakuRestPort = *port
}
envNwakuRestPort := os.Getenv("NWAKU_REST_PORT")
if envNwakuRestPort != "" {
v, err := strconv.Atoi(envNwakuRestPort)
if err != nil {
return NwakuInfo{}, err
}
nwakuRestPort = v
}
nwakuRestHost := "localhost"
if host != nil {
nwakuRestHost = *host
}
envNwakuRestHost := os.Getenv("NWAKU_REST_HOST")
if envNwakuRestHost != "" {
nwakuRestHost = envNwakuRestHost
}
resp, err := http.Get(fmt.Sprintf("http://%s:%d/debug/v1/info", nwakuRestHost, nwakuRestPort))
if err != nil {
return NwakuInfo{}, err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return NwakuInfo{}, err
}
var data NwakuInfo
err = json.Unmarshal(body, &data)
if err != nil {
return NwakuInfo{}, err
}
return data, nil
}

View File

@ -352,7 +352,10 @@ func TestPeerExchange(t *testing.T) {
config.EnableDiscV5 = true config.EnableDiscV5 = true
config.EnablePeerExchangeServer = false config.EnablePeerExchangeServer = false
config.EnablePeerExchangeClient = false config.EnablePeerExchangeClient = false
config.DiscV5BootstrapNodes = []string{pxServerNode.node.ENR().String()} enr, err := pxServerNode.ENR()
require.NoError(t, err)
config.DiscV5BootstrapNodes = []string{enr.String()}
discV5Node, err := New(nil, "", config, logger.Named("discV5Node"), nil, nil, nil, nil) discV5Node, err := New(nil, "", config, logger.Named("discV5Node"), nil, nil, nil, nil)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, discV5Node.Start()) require.NoError(t, discV5Node.Start())
@ -360,7 +363,7 @@ func TestPeerExchange(t *testing.T) {
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
// start light node which use PeerExchange to discover peers // start light node which use PeerExchange to discover peers
enrNodes := []*enode.Node{pxServerNode.node.ENR()} enrNodes := []*enode.Node{enr}
tree, url := makeTestTree("n", enrNodes, nil) tree, url := makeTestTree("n", enrNodes, nil)
resolver := mapResolver(tree.ToTXT("n")) resolver := mapResolver(tree.ToTXT("n"))
@ -385,17 +388,23 @@ func TestPeerExchange(t *testing.T) {
// in light client mode,the peer will be closed via `w.node.Host().Network().ClosePeer(peerInfo.ID)` // in light client mode,the peer will be closed via `w.node.Host().Network().ClosePeer(peerInfo.ID)`
// after invoking identifyAndConnect, instead, we should check the peerStore, peers from peerStore // after invoking identifyAndConnect, instead, we should check the peerStore, peers from peerStore
// won't get deleted especially if they are statically added. // won't get deleted especially if they are statically added.
if len(lightNode.node.Host().Peerstore().Peers()) == 2 { numConnected, err := lightNode.GetNumConnectedPeers()
if err != nil {
return err
}
if numConnected == 2 {
return nil return nil
} }
return errors.New("no peers discovered") return errors.New("no peers discovered")
}, options) }, options)
require.NoError(t, err) require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background()) _, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
require.NoError(t, discV5Node.node.PeerExchange().Request(ctx, 1)) _, err = discV5Node.WakuPeerExchangeRequest(1)
require.Error(t, discV5Node.node.PeerExchange().Request(ctx, 1)) //should fail due to rate limit require.NoError(t, err)
_, err = discV5Node.WakuPeerExchangeRequest(1)
require.Error(t, err) //should fail due to rate limit
require.NoError(t, lightNode.Stop()) require.NoError(t, lightNode.Stop())
require.NoError(t, pxServerNode.Stop()) require.NoError(t, pxServerNode.Stop())
@ -430,7 +439,7 @@ func TestWakuV2Filter(t *testing.T) {
// Sanity check, not great, but it's probably helpful // Sanity check, not great, but it's probably helpful
err = tt.RetryWithBackOff(func() error { err = tt.RetryWithBackOff(func() error {
peers, err := w.node.PeerManager().FilterPeersByProto(nil, nil, filter.FilterSubscribeID_v20beta1) peers, err := w.GetPeerIdsByProtocol(string(filter.FilterSubscribeID_v20beta1))
if err != nil { if err != nil {
return err return err
} }
@ -466,20 +475,20 @@ func TestWakuV2Filter(t *testing.T) {
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
// Ensure there is at least 1 active filter subscription // Ensure there is at least 1 active filter subscription
subscriptions := w.node.FilterLightnode().Subscriptions() subscriptions := w.FilterLightnode().Subscriptions()
require.Greater(t, len(subscriptions), 0) require.Greater(t, len(subscriptions), 0)
messages := filter.Retrieve() messages := filter.Retrieve()
require.Len(t, messages, 1) require.Len(t, messages, 1)
// Mock peers going down // Mock peers going down
_, err = w.node.FilterLightnode().UnsubscribeWithSubscription(w.ctx, subscriptions[0]) _, err = w.FilterLightnode().UnsubscribeWithSubscription(w.ctx, subscriptions[0])
require.NoError(t, err) require.NoError(t, err)
time.Sleep(10 * time.Second) time.Sleep(10 * time.Second)
// Ensure there is at least 1 active filter subscription // Ensure there is at least 1 active filter subscription
subscriptions = w.node.FilterLightnode().Subscriptions() subscriptions = w.FilterLightnode().Subscriptions()
require.Greater(t, len(subscriptions), 0) require.Greater(t, len(subscriptions), 0)
// Ensure that messages are retrieved with a fresh sub // Ensure that messages are retrieved with a fresh sub
@ -551,11 +560,11 @@ func TestWakuV2Store(t *testing.T) {
}() }()
// Connect the two nodes directly // Connect the two nodes directly
peer2Addr := w2.node.ListenAddresses()[0].String() peer2Addr := w2.ListenAddresses()[0].String()
err = w1.node.DialPeer(context.Background(), peer2Addr) err = w1.DialPeer(context.Background(), peer2Addr)
require.NoError(t, err) require.NoError(t, err)
waitForPeerConnection(t, w2.node.Host().ID(), w1PeersCh) waitForPeerConnection(t, w2.Host().ID(), w1PeersCh)
// Create a filter for the second node to catch messages // Create a filter for the second node to catch messages
filter := &common.Filter{ filter := &common.Filter{
@ -591,7 +600,7 @@ func TestWakuV2Store(t *testing.T) {
// Query the second node's store for the message // Query the second node's store for the message
_, envelopeCount, err := w1.Query( _, envelopeCount, err := w1.Query(
context.Background(), context.Background(),
w2.node.Host().ID(), w2.Host().ID(),
store.FilterCriteria{ store.FilterCriteria{
TimeStart: proto.Int64((timestampInSeconds - int64(marginInSeconds)) * int64(time.Second)), TimeStart: proto.Int64((timestampInSeconds - int64(marginInSeconds)) * int64(time.Second)),
TimeEnd: proto.Int64((timestampInSeconds + int64(marginInSeconds)) * int64(time.Second)), TimeEnd: proto.Int64((timestampInSeconds + int64(marginInSeconds)) * int64(time.Second)),
@ -729,7 +738,7 @@ func TestLightpushRateLimit(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
//Connect the relay peer and full node //Connect the relay peer and full node
err = w1.node.DialPeer(ctx, w0.node.ListenAddresses()[0].String()) err = w1.DialPeer(ctx, w0.ListenAddresses()[0].String())
require.NoError(t, err) require.NoError(t, err)
err = tt.RetryWithBackOff(func() error { err = tt.RetryWithBackOff(func() error {
@ -756,9 +765,9 @@ func TestLightpushRateLimit(t *testing.T) {
}() }()
//Use this instead of DialPeer to make sure the peer is added to PeerStore and can be selected for Lighpush //Use this instead of DialPeer to make sure the peer is added to PeerStore and can be selected for Lighpush
w2.node.AddDiscoveredPeer(w1.PeerID(), w1.node.ListenAddresses(), wps.Static, w1.cfg.DefaultShardedPubsubTopics, w1.node.ENR(), true) w2.AddDiscoveredPeer(w1.PeerID(), w1.ListenAddresses(), wps.Static, w1.cfg.DefaultShardedPubsubTopics, w1.node.ENR(), true)
waitForPeerConnectionWithTimeout(t, w2.node.Host().ID(), w1PeersCh, 5*time.Second) waitForPeerConnectionWithTimeout(t, w2.Host().ID(), w1PeersCh, 5*time.Second)
event := make(chan common.EnvelopeEvent, 10) event := make(chan common.EnvelopeEvent, 10)
w2.SubscribeEnvelopeEvents(event) w2.SubscribeEnvelopeEvents(event)