diff --git a/third_party/nwaku b/third_party/nwaku index 3665991a6..507b1fc4d 160000 --- a/third_party/nwaku +++ b/third_party/nwaku @@ -1 +1 @@ -Subproject commit 3665991a655495a4e47c92596e7d9e156f4ed693 +Subproject commit 507b1fc4d97a01ee5695a205f7f981bd4accc694 diff --git a/wakuv2/nwaku.go b/wakuv2/nwaku.go index c86148dd5..ce078c1e8 100644 --- a/wakuv2/nwaku.go +++ b/wakuv2/nwaku.go @@ -179,6 +179,20 @@ package wakuv2 resp) ); } + static void cGoWakuDialPeer(void* wakuCtx, + char* peerMultiAddr, + char* protocol, + int timeoutMs, + void* resp) { + + WAKU_CALL( waku_dial_peer(wakuCtx, + peerMultiAddr, + protocol, + timeoutMs, + (WakuCallBack) callback, + resp) ); + } + static void cGoWakuDialPeerById(void* wakuCtx, char* peerId, char* protocol, @@ -220,10 +234,14 @@ package wakuv2 WAKU_CALL (waku_relay_get_num_peers_in_mesh(ctx, pubSubTopic, (WakuCallBack) callback, resp) ); } - static void cGoWakuGetNumConnectedPeers(void* ctx, char* pubSubTopic, void* resp) { + static void cGoWakuGetNumConnectedRelayPeers(void* ctx, char* pubSubTopic, void* resp) { WAKU_CALL (waku_relay_get_num_connected_peers(ctx, pubSubTopic, (WakuCallBack) callback, resp) ); } + static void cGoWakuGetConnectedPeers(void* wakuCtx, void* resp) { + WAKU_CALL (waku_get_connected_peers(wakuCtx, (WakuCallBack) callback, resp) ); + } + static void cGoWakuGetPeerIdsFromPeerStore(void* wakuCtx, void* resp) { WAKU_CALL (waku_get_peerids_from_peerstore(wakuCtx, (WakuCallBack) callback, resp) ); } @@ -328,7 +346,6 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store" - "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/protocol/store" storepb "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" "github.com/waku-org/go-waku/waku/v2/utils" @@ -1905,10 +1922,13 @@ func (w *Waku) ClearEnvelopesCache() { w.envelopeCache = newTTLCache() } -// TODO-nwaku -func (w *Waku) PeerCount() int { - return 0 - // return w.node.PeerCount() +func (w *Waku) PeerCount() (int, error) { + peerCount, err := w.GetNumConnectedPeers() + if err != nil { + return 0, err + } + + return peerCount, nil } // TODO-nwaku @@ -2164,22 +2184,24 @@ func (w *Waku) AddRelayPeer(address multiaddr.Multiaddr) (peer.ID, error) { } func (w *Waku) DialPeer(address multiaddr.Multiaddr) error { - // TODO-nwaku - /* - ctx, cancel := context.WithTimeout(w.ctx, requestTimeout) - defer cancel() - return w.node.DialPeerWithMultiAddress(ctx, address) */ - return nil + // Using WakuConnect so it matches the go-waku's behavior and terminology + return w.WakuConnect(address.String(), int(requestTimeout/time.Millisecond)) } -func (w *Waku) DialPeerByID(peerID peer.ID) error { - return w.WakuDialPeerById(peerID, string(relay.WakuRelayID_v200), 1000) -} +func (self *Waku) DropPeer(peerId peer.ID) error { + var resp = C.allocResp() + var cPeerId = C.CString(peerId.String()) + defer C.freeResp(resp) + defer C.free(unsafe.Pointer(cPeerId)) -func (w *Waku) DropPeer(peerID peer.ID) error { - // TODO-nwaku - // return w.node.ClosePeerById(peerID) - return nil + C.cGoWakuDisconnectPeerById(self.wakuCtx, cPeerId, resp) + + if C.getRet(resp) == C.RET_OK { + return nil + } + errMsg := "error DisconnectPeerById: " + + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return errors.New(errMsg) } func (w *Waku) ProcessingP2PMessages() bool { @@ -2668,6 +2690,24 @@ func (self *Waku) WakuConnect(peerMultiAddr string, timeoutMs int) error { return errors.New(errMsg) } +func (self *Waku) WakuDialPeer(peerMultiAddr multiaddr.Multiaddr, protocol string, timeoutMs int) error { + var resp = C.allocResp() + var cPeerMultiAddr = C.CString(peerMultiAddr.String()) + var cProtocol = C.CString(protocol) + defer C.freeResp(resp) + defer C.free(unsafe.Pointer(cPeerMultiAddr)) + defer C.free(unsafe.Pointer(cProtocol)) + + C.cGoWakuDialPeer(self.wakuCtx, cPeerMultiAddr, cProtocol, C.int(timeoutMs), resp) + + if C.getRet(resp) == C.RET_OK { + return nil + } + errMsg := "error DialPeer: " + + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return errors.New(errMsg) +} + func (self *Waku) WakuDialPeerById(peerId peer.ID, protocol string, timeoutMs int) error { var resp = C.allocResp() var cPeerId = C.CString(peerId.String()) @@ -2753,7 +2793,7 @@ func (self *Waku) ListPeersInMesh(pubsubTopic string) (int, error) { return 0, errors.New(errMsg) } -func (self *Waku) GetNumConnectedPeers(paramPubsubTopic ...string) (int, error) { +func (self *Waku) GetNumConnectedRelayPeers(paramPubsubTopic ...string) (int, error) { var pubsubTopic string if len(paramPubsubTopic) == 0 { pubsubTopic = "" @@ -2766,22 +2806,58 @@ func (self *Waku) GetNumConnectedPeers(paramPubsubTopic ...string) (int, error) defer C.freeResp(resp) defer C.free(unsafe.Pointer(cPubsubTopic)) - C.cGoWakuGetNumConnectedPeers(self.wakuCtx, cPubsubTopic, resp) + C.cGoWakuGetNumConnectedRelayPeers(self.wakuCtx, cPubsubTopic, resp) if C.getRet(resp) == C.RET_OK { numPeersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) numPeers, err := strconv.Atoi(numPeersStr) if err != nil { - errMsg := "GetNumConnectedPeers - error converting string to int: " + err.Error() + errMsg := "GetNumConnectedRelayPeers - error converting string to int: " + err.Error() return 0, errors.New(errMsg) } return numPeers, nil } - errMsg := "error GetNumConnectedPeers: " + + errMsg := "error GetNumConnectedRelayPeers: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) return 0, errors.New(errMsg) } +func (self *Waku) GetConnectedPeers() (peer.IDSlice, error) { + var resp = C.allocResp() + defer C.freeResp(resp) + C.cGoWakuGetConnectedPeers(self.wakuCtx, resp) + + if C.getRet(resp) == C.RET_OK { + peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + if peersStr == "" { + return peer.IDSlice{}, nil + } + // peersStr contains a comma-separated list of peer ids + itemsPeerIds := strings.Split(peersStr, ",") + + var peers peer.IDSlice + for _, peerId := range itemsPeerIds { + id, err := peer.Decode(peerId) + if err != nil { + return nil, fmt.Errorf("GetConnectedPeers - decoding peerId: %w", err) + } + peers = append(peers, id) + } + + return peers, nil + } + errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return nil, fmt.Errorf("GetConnectedPeers: %s", errMsg) +} + +func (self *Waku) GetNumConnectedPeers() (int, error) { + connecterPeers, err := self.GetConnectedPeers() + if err != nil { + return 0, err + } + return len(connecterPeers), nil +} + func (self *Waku) GetPeerIdsFromPeerStore() (peer.IDSlice, error) { var resp = C.allocResp() defer C.freeResp(resp) @@ -2842,22 +2918,6 @@ func (self *Waku) GetPeerIdsByProtocol(protocol string) (peer.IDSlice, error) { return nil, errors.New(errMsg) } -func (self *Waku) DisconnectPeerById(peerId peer.ID) error { - var resp = C.allocResp() - var cPeerId = C.CString(peerId.String()) - defer C.freeResp(resp) - defer C.free(unsafe.Pointer(cPeerId)) - - C.cGoWakuDisconnectPeerById(self.wakuCtx, cPeerId, resp) - - if C.getRet(resp) == C.RET_OK { - return nil - } - errMsg := "error DisconnectPeerById: " + - C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return errors.New(errMsg) -} - // func main() { // config := WakuConfig{ diff --git a/wakuv2/nwaku_test.go b/wakuv2/nwaku_test.go index 417348e02..0a58054ec 100644 --- a/wakuv2/nwaku_test.go +++ b/wakuv2/nwaku_test.go @@ -12,6 +12,7 @@ import ( "github.com/cenkalti/backoff/v3" "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multiaddr" "github.com/waku-org/go-waku/waku/v2/protocol/store" "go.uber.org/zap" @@ -196,7 +197,7 @@ func TestBasicWakuV2(t *testing.T) { // Sanity check, not great, but it's probably helpful err = tt.RetryWithBackOff(func() error { - numConnected, err := w.GetNumConnectedPeers() + numConnected, err := w.GetNumConnectedRelayPeers() if err != nil { return err } @@ -219,7 +220,7 @@ func TestBasicWakuV2(t *testing.T) { require.True(t, slices.Contains(connectedStoreNodes, storeNode.ID), "nwaku should be connected to the store node") // Disconnect from the store node - err = w.DisconnectPeerById(storeNode.ID) + err = w.DropPeer(storeNode.ID) require.NoError(t, err) // Check that we are indeed disconnected @@ -228,10 +229,15 @@ func TestBasicWakuV2(t *testing.T) { isDisconnected := !slices.Contains(connectedStoreNodes, storeNode.ID) require.True(t, isDisconnected, "nwaku should be disconnected from the store node") - // Re-connect - err = w.DialPeerByID(storeNode.ID) + storeNodeMultiadd, err := multiaddr.NewMultiaddr(storeNodeInfo.ListenAddresses[0]) require.NoError(t, err) + // Re-connect + err = w.DialPeer(storeNodeMultiadd) + require.NoError(t, err) + + time.Sleep(1 * time.Second) + // Check that we are connected again connectedStoreNodes, err = w.GetPeerIdsByProtocol(string(store.StoreQueryID_v300)) require.NoError(t, err) @@ -545,6 +551,88 @@ func TestPeerExchange(t *testing.T) { require.NoError(t, discV5Node.Stop()) */ } +func TestDial(t *testing.T) { + logger, err := zap.NewDevelopment() + require.NoError(t, err) + + dialerNodeConfig := Config{ + UseThrottledPublish: true, + ClusterID: 16, + } + + // start node that will initiate the dial + dialerNodeWakuConfig := WakuConfig{ + EnableRelay: true, + LogLevel: "DEBUG", + Discv5Discovery: false, + ClusterID: 16, + Shards: []uint16{64}, + Discv5UdpPort: 9020, + TcpPort: 60020, + } + + dialerNode, err := New(nil, "", &dialerNodeConfig, &dialerNodeWakuConfig, logger.Named("dialerNode"), nil, nil, nil, nil) + require.NoError(t, err) + require.NoError(t, dialerNode.Start()) + + time.Sleep(1 * time.Second) + + receiverNodeConfig := Config{ + UseThrottledPublish: true, + ClusterID: 16, + } + + // start node that will receive the dial + receiverNodeWakuConfig := WakuConfig{ + EnableRelay: true, + LogLevel: "DEBUG", + Discv5Discovery: false, + ClusterID: 16, + Shards: []uint16{64}, + Discv5UdpPort: 9021, + TcpPort: 60021, + } + + receiverNode, err := New(nil, "", &receiverNodeConfig, &receiverNodeWakuConfig, logger.Named("receiverNode"), nil, nil, nil, nil) + require.NoError(t, err) + require.NoError(t, receiverNode.Start()) + + time.Sleep(1 * time.Second) + + receiverMultiaddr, err := receiverNode.ListenAddresses() + require.NoError(t, err) + require.NotNil(t, receiverMultiaddr) + + // Check that both nodes start with no connected peers + dialerPeerCount, err := dialerNode.PeerCount() + require.NoError(t, err) + require.True(t, dialerPeerCount == 0, "Dialer node should have no connected peers") + + receiverPeerCount, err := receiverNode.PeerCount() + require.NoError(t, err) + require.True(t, receiverPeerCount == 0, "Receiver node should have no connected peers") + + // Dial + err = dialerNode.DialPeer(receiverMultiaddr[0]) + require.NoError(t, err) + + time.Sleep(1 * time.Second) + + // Check that both nodes now have one connected peer + dialerPeerCount, err = dialerNode.PeerCount() + require.NoError(t, err) + require.True(t, dialerPeerCount == 1, "Dialer node should have 1 peer") + + receiverPeerCount, err = receiverNode.PeerCount() + require.NoError(t, err) + require.True(t, receiverPeerCount == 1, "Receiver node should have 1 peer") + + // Stop nodes + require.NoError(t, dialerNode.Stop()) + require.NoError(t, receiverNode.Stop()) + +} + /* func TestWakuV2Filter(t *testing.T) {