feat_: dial, drop and retrieve connected peers (#6013)

This commit is contained in:
gabrielmer 2024-10-30 18:24:05 +02:00 committed by GitHub
parent fd9d9f384f
commit 6ac236d380
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 192 additions and 44 deletions

2
third_party/nwaku vendored

@ -1 +1 @@
Subproject commit 3665991a655495a4e47c92596e7d9e156f4ed693
Subproject commit 507b1fc4d97a01ee5695a205f7f981bd4accc694

View File

@ -179,6 +179,20 @@ package wakuv2
resp) );
}
static void cGoWakuDialPeer(void* wakuCtx,
char* peerMultiAddr,
char* protocol,
int timeoutMs,
void* resp) {
WAKU_CALL( waku_dial_peer(wakuCtx,
peerMultiAddr,
protocol,
timeoutMs,
(WakuCallBack) callback,
resp) );
}
static void cGoWakuDialPeerById(void* wakuCtx,
char* peerId,
char* protocol,
@ -220,10 +234,14 @@ package wakuv2
WAKU_CALL (waku_relay_get_num_peers_in_mesh(ctx, pubSubTopic, (WakuCallBack) callback, resp) );
}
static void cGoWakuGetNumConnectedPeers(void* ctx, char* pubSubTopic, void* resp) {
static void cGoWakuGetNumConnectedRelayPeers(void* ctx, char* pubSubTopic, void* resp) {
WAKU_CALL (waku_relay_get_num_connected_peers(ctx, pubSubTopic, (WakuCallBack) callback, resp) );
}
static void cGoWakuGetConnectedPeers(void* wakuCtx, void* resp) {
WAKU_CALL (waku_get_connected_peers(wakuCtx, (WakuCallBack) callback, resp) );
}
static void cGoWakuGetPeerIdsFromPeerStore(void* wakuCtx, void* resp) {
WAKU_CALL (waku_get_peerids_from_peerstore(wakuCtx, (WakuCallBack) callback, resp) );
}
@ -328,7 +346,6 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_store"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
storepb "github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
"github.com/waku-org/go-waku/waku/v2/utils"
@ -1905,10 +1922,13 @@ func (w *Waku) ClearEnvelopesCache() {
w.envelopeCache = newTTLCache()
}
// TODO-nwaku
func (w *Waku) PeerCount() int {
return 0
// return w.node.PeerCount()
func (w *Waku) PeerCount() (int, error) {
peerCount, err := w.GetNumConnectedPeers()
if err != nil {
return 0, err
}
return peerCount, nil
}
// TODO-nwaku
@ -2164,22 +2184,24 @@ func (w *Waku) AddRelayPeer(address multiaddr.Multiaddr) (peer.ID, error) {
}
func (w *Waku) DialPeer(address multiaddr.Multiaddr) error {
// TODO-nwaku
/*
ctx, cancel := context.WithTimeout(w.ctx, requestTimeout)
defer cancel()
return w.node.DialPeerWithMultiAddress(ctx, address) */
return nil
// Using WakuConnect so it matches the go-waku's behavior and terminology
return w.WakuConnect(address.String(), int(requestTimeout/time.Millisecond))
}
func (w *Waku) DialPeerByID(peerID peer.ID) error {
return w.WakuDialPeerById(peerID, string(relay.WakuRelayID_v200), 1000)
}
func (self *Waku) DropPeer(peerId peer.ID) error {
var resp = C.allocResp()
var cPeerId = C.CString(peerId.String())
defer C.freeResp(resp)
defer C.free(unsafe.Pointer(cPeerId))
func (w *Waku) DropPeer(peerID peer.ID) error {
// TODO-nwaku
// return w.node.ClosePeerById(peerID)
return nil
C.cGoWakuDisconnectPeerById(self.wakuCtx, cPeerId, resp)
if C.getRet(resp) == C.RET_OK {
return nil
}
errMsg := "error DisconnectPeerById: " +
C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return errors.New(errMsg)
}
func (w *Waku) ProcessingP2PMessages() bool {
@ -2668,6 +2690,24 @@ func (self *Waku) WakuConnect(peerMultiAddr string, timeoutMs int) error {
return errors.New(errMsg)
}
func (self *Waku) WakuDialPeer(peerMultiAddr multiaddr.Multiaddr, protocol string, timeoutMs int) error {
var resp = C.allocResp()
var cPeerMultiAddr = C.CString(peerMultiAddr.String())
var cProtocol = C.CString(protocol)
defer C.freeResp(resp)
defer C.free(unsafe.Pointer(cPeerMultiAddr))
defer C.free(unsafe.Pointer(cProtocol))
C.cGoWakuDialPeer(self.wakuCtx, cPeerMultiAddr, cProtocol, C.int(timeoutMs), resp)
if C.getRet(resp) == C.RET_OK {
return nil
}
errMsg := "error DialPeer: " +
C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return errors.New(errMsg)
}
func (self *Waku) WakuDialPeerById(peerId peer.ID, protocol string, timeoutMs int) error {
var resp = C.allocResp()
var cPeerId = C.CString(peerId.String())
@ -2753,7 +2793,7 @@ func (self *Waku) ListPeersInMesh(pubsubTopic string) (int, error) {
return 0, errors.New(errMsg)
}
func (self *Waku) GetNumConnectedPeers(paramPubsubTopic ...string) (int, error) {
func (self *Waku) GetNumConnectedRelayPeers(paramPubsubTopic ...string) (int, error) {
var pubsubTopic string
if len(paramPubsubTopic) == 0 {
pubsubTopic = ""
@ -2766,22 +2806,58 @@ func (self *Waku) GetNumConnectedPeers(paramPubsubTopic ...string) (int, error)
defer C.freeResp(resp)
defer C.free(unsafe.Pointer(cPubsubTopic))
C.cGoWakuGetNumConnectedPeers(self.wakuCtx, cPubsubTopic, resp)
C.cGoWakuGetNumConnectedRelayPeers(self.wakuCtx, cPubsubTopic, resp)
if C.getRet(resp) == C.RET_OK {
numPeersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
numPeers, err := strconv.Atoi(numPeersStr)
if err != nil {
errMsg := "GetNumConnectedPeers - error converting string to int: " + err.Error()
errMsg := "GetNumConnectedRelayPeers - error converting string to int: " + err.Error()
return 0, errors.New(errMsg)
}
return numPeers, nil
}
errMsg := "error GetNumConnectedPeers: " +
errMsg := "error GetNumConnectedRelayPeers: " +
C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return 0, errors.New(errMsg)
}
func (self *Waku) GetConnectedPeers() (peer.IDSlice, error) {
var resp = C.allocResp()
defer C.freeResp(resp)
C.cGoWakuGetConnectedPeers(self.wakuCtx, resp)
if C.getRet(resp) == C.RET_OK {
peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
if peersStr == "" {
return peer.IDSlice{}, nil
}
// peersStr contains a comma-separated list of peer ids
itemsPeerIds := strings.Split(peersStr, ",")
var peers peer.IDSlice
for _, peerId := range itemsPeerIds {
id, err := peer.Decode(peerId)
if err != nil {
return nil, fmt.Errorf("GetConnectedPeers - decoding peerId: %w", err)
}
peers = append(peers, id)
}
return peers, nil
}
errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return nil, fmt.Errorf("GetConnectedPeers: %s", errMsg)
}
func (self *Waku) GetNumConnectedPeers() (int, error) {
connecterPeers, err := self.GetConnectedPeers()
if err != nil {
return 0, err
}
return len(connecterPeers), nil
}
func (self *Waku) GetPeerIdsFromPeerStore() (peer.IDSlice, error) {
var resp = C.allocResp()
defer C.freeResp(resp)
@ -2842,22 +2918,6 @@ func (self *Waku) GetPeerIdsByProtocol(protocol string) (peer.IDSlice, error) {
return nil, errors.New(errMsg)
}
func (self *Waku) DisconnectPeerById(peerId peer.ID) error {
var resp = C.allocResp()
var cPeerId = C.CString(peerId.String())
defer C.freeResp(resp)
defer C.free(unsafe.Pointer(cPeerId))
C.cGoWakuDisconnectPeerById(self.wakuCtx, cPeerId, resp)
if C.getRet(resp) == C.RET_OK {
return nil
}
errMsg := "error DisconnectPeerById: " +
C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return errors.New(errMsg)
}
// func main() {
// config := WakuConfig{

View File

@ -12,6 +12,7 @@ import (
"github.com/cenkalti/backoff/v3"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
"go.uber.org/zap"
@ -196,7 +197,7 @@ func TestBasicWakuV2(t *testing.T) {
// Sanity check, not great, but it's probably helpful
err = tt.RetryWithBackOff(func() error {
numConnected, err := w.GetNumConnectedPeers()
numConnected, err := w.GetNumConnectedRelayPeers()
if err != nil {
return err
}
@ -219,7 +220,7 @@ func TestBasicWakuV2(t *testing.T) {
require.True(t, slices.Contains(connectedStoreNodes, storeNode.ID), "nwaku should be connected to the store node")
// Disconnect from the store node
err = w.DisconnectPeerById(storeNode.ID)
err = w.DropPeer(storeNode.ID)
require.NoError(t, err)
// Check that we are indeed disconnected
@ -228,10 +229,15 @@ func TestBasicWakuV2(t *testing.T) {
isDisconnected := !slices.Contains(connectedStoreNodes, storeNode.ID)
require.True(t, isDisconnected, "nwaku should be disconnected from the store node")
// Re-connect
err = w.DialPeerByID(storeNode.ID)
storeNodeMultiadd, err := multiaddr.NewMultiaddr(storeNodeInfo.ListenAddresses[0])
require.NoError(t, err)
// Re-connect
err = w.DialPeer(storeNodeMultiadd)
require.NoError(t, err)
time.Sleep(1 * time.Second)
// Check that we are connected again
connectedStoreNodes, err = w.GetPeerIdsByProtocol(string(store.StoreQueryID_v300))
require.NoError(t, err)
@ -545,6 +551,88 @@ func TestPeerExchange(t *testing.T) {
require.NoError(t, discV5Node.Stop()) */
}
func TestDial(t *testing.T) {
logger, err := zap.NewDevelopment()
require.NoError(t, err)
dialerNodeConfig := Config{
UseThrottledPublish: true,
ClusterID: 16,
}
// start node that will initiate the dial
dialerNodeWakuConfig := WakuConfig{
EnableRelay: true,
LogLevel: "DEBUG",
Discv5Discovery: false,
ClusterID: 16,
Shards: []uint16{64},
Discv5UdpPort: 9020,
TcpPort: 60020,
}
dialerNode, err := New(nil, "", &dialerNodeConfig, &dialerNodeWakuConfig, logger.Named("dialerNode"), nil, nil, nil, nil)
require.NoError(t, err)
require.NoError(t, dialerNode.Start())
time.Sleep(1 * time.Second)
receiverNodeConfig := Config{
UseThrottledPublish: true,
ClusterID: 16,
}
// start node that will receive the dial
receiverNodeWakuConfig := WakuConfig{
EnableRelay: true,
LogLevel: "DEBUG",
Discv5Discovery: false,
ClusterID: 16,
Shards: []uint16{64},
Discv5UdpPort: 9021,
TcpPort: 60021,
}
receiverNode, err := New(nil, "", &receiverNodeConfig, &receiverNodeWakuConfig, logger.Named("receiverNode"), nil, nil, nil, nil)
require.NoError(t, err)
require.NoError(t, receiverNode.Start())
time.Sleep(1 * time.Second)
receiverMultiaddr, err := receiverNode.ListenAddresses()
require.NoError(t, err)
require.NotNil(t, receiverMultiaddr)
// Check that both nodes start with no connected peers
dialerPeerCount, err := dialerNode.PeerCount()
require.NoError(t, err)
require.True(t, dialerPeerCount == 0, "Dialer node should have no connected peers")
receiverPeerCount, err := receiverNode.PeerCount()
require.NoError(t, err)
require.True(t, receiverPeerCount == 0, "Receiver node should have no connected peers")
// Dial
err = dialerNode.DialPeer(receiverMultiaddr[0])
require.NoError(t, err)
time.Sleep(1 * time.Second)
// Check that both nodes now have one connected peer
dialerPeerCount, err = dialerNode.PeerCount()
require.NoError(t, err)
require.True(t, dialerPeerCount == 1, "Dialer node should have 1 peer")
receiverPeerCount, err = receiverNode.PeerCount()
require.NoError(t, err)
require.True(t, receiverPeerCount == 1, "Receiver node should have 1 peer")
// Stop nodes
require.NoError(t, dialerNode.Stop())
require.NoError(t, receiverNode.Stop())
}
/*
func TestWakuV2Filter(t *testing.T) {