From 73321c41992a4e1300bf4f40073b240c1d382e77 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Wed, 27 Nov 2024 17:25:26 +0200 Subject: [PATCH] more --- nwaku.go | 940 ++++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 718 insertions(+), 222 deletions(-) diff --git a/nwaku.go b/nwaku.go index 3377ac7..10a6c94 100644 --- a/nwaku.go +++ b/nwaku.go @@ -319,10 +319,12 @@ import ( "time" "unsafe" + "github.com/ethereum/go-ethereum/p2p/enode" "github.com/libp2p/go-libp2p/core/peer" libp2pproto "github.com/libp2p/go-libp2p/core/protocol" "github.com/multiformats/go-multiaddr" "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" storepb "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" @@ -361,6 +363,63 @@ type Waku struct { logger *zap.Logger } +// Start implements node.Service, starting the background data propagation thread +// of the Waku protocol. +func (w *Waku) Start() error { + err := w.node.Start() + if err != nil { + return fmt.Errorf("failed to start nwaku node: %v", err) + } + + peerID, err := w.node.PeerID() + if err != nil { + return err + } + + w.logger.Info("WakuV2 PeerID", zap.Stringer("id", peerID)) + + return nil +} + +// Stop implements node.Service, stopping the background data propagation thread +// of the Waku protocol. +func (w *Waku) Stop() error { + w.cancel() + + err := w.node.Stop() + if err != nil { + return err + } + + // w.wg.Wait() + + w.ctx = nil + w.cancel = nil + + return nil +} + +func (w *Waku) PeerCount() (int, error) { + return w.node.GetNumConnectedPeers() +} + +func (w *Waku) DialPeer(address multiaddr.Multiaddr) error { + // Using WakuConnect so it matches the go-waku's behavior and terminology + ctx, cancel := context.WithTimeout(w.ctx, requestTimeout) + defer cancel() + return w.node.Connect(ctx, address) +} + +func (w *Waku) DialPeerByID(peerID peer.ID) error { + ctx, cancel := context.WithTimeout(w.ctx, requestTimeout) + defer cancel() + return w.node.DialPeerByID(ctx, peerID, relay.WakuRelayID_v200) +} + +func (w *Waku) DropPeer(peerID peer.ID) error { + return w.node.DisconnectPeerByID(peerID) +} + type request struct { id string reqType requestType @@ -555,118 +614,6 @@ func (n *WakuNode) processLoop(ctx context.Context) { } } -// Start implements node.Service, starting the background data propagation thread -// of the Waku protocol. -func (w *Waku) Start() error { - err := w.node.Start() - if err != nil { - return fmt.Errorf("failed to start nwaku node: %v", err) - } - - peerID, err := w.node.PeerID() - if err != nil { - return err - } - - w.logger.Info("WakuV2 PeerID", zap.Stringer("id", peerID)) - - return nil -} - -func (n *WakuNode) start() error { - var resp = C.allocResp() - defer C.freeResp(resp) - - C.cGoWakuStart(n.wakuCtx, resp) - - if C.getRet(resp) == C.RET_OK { - return nil - } - - errMsg := "error WakuStart: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return errors.New(errMsg) -} - -// Stop implements node.Service, stopping the background data propagation thread -// of the Waku protocol. -func (w *Waku) Stop() error { - w.cancel() - - err := w.node.Stop() - if err != nil { - return err - } - - // w.wg.Wait() - - w.ctx = nil - w.cancel = nil - - return nil -} - -func (n *WakuNode) stop() error { - var resp = C.allocResp() - defer C.freeResp(resp) - - C.cGoWakuStop(n.wakuCtx, resp) - - if C.getRet(resp) == C.RET_OK { - return nil - } - - errMsg := "error WakuStop: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return errors.New(errMsg) -} - -func (n *WakuNode) destroy() error { - var resp = C.allocResp() - defer C.freeResp(resp) - - C.cGoWakuDestroy(n.wakuCtx, resp) - - if C.getRet(resp) == C.RET_OK { - return nil - } - - errMsg := "error WakuDestroy: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return errors.New(errMsg) -} - -func (n *WakuNode) listenAddresses() ([]multiaddr.Multiaddr, error) { - var resp = C.allocResp() - defer C.freeResp(resp) - C.cGoWakuListenAddresses(n.wakuCtx, resp) - - if C.getRet(resp) == C.RET_OK { - var addrsRet []multiaddr.Multiaddr - listenAddresses := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - addrss := strings.Split(listenAddresses, ",") - for _, addr := range addrss { - addr, err := multiaddr.NewMultiaddr(addr) - if err != nil { - return nil, err - } - addrsRet = append(addrsRet, addr) - } - return addrsRet, nil - } - errMsg := "error WakuListenAddresses: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return nil, errors.New(errMsg) -} - -func (n *WakuNode) ListenAddresses() ([]multiaddr.Multiaddr, error) { - response, err := n.postTask(requestTypeListenAddresses, nil) - if err != nil { - return nil, err - } - return response.([]multiaddr.Multiaddr), nil -} - -func (w *Waku) PeerCount() (int, error) { - return w.node.GetNumConnectedPeers() -} - func (n *WakuNode) getNumConnectedRelayPeers(optPubsubTopic ...string) (int, error) { var pubsubTopic string if len(optPubsubTopic) == 0 { @@ -696,114 +643,153 @@ func (n *WakuNode) getNumConnectedRelayPeers(optPubsubTopic ...string) (int, err return 0, errors.New(errMsg) } -func (n *WakuNode) GetConnectedPeers() (peer.IDSlice, error) { - response, err := n.postTask(requestTypeGetConnectedPeers, nil) - if err != nil { - return nil, err - } - return response.(peer.IDSlice), nil -} - -func (n *WakuNode) GetNumConnectedPeers() (int, error) { - peers, err := n.GetConnectedPeers() - if err != nil { - return 0, err - } - return len(peers), nil -} - -func (n *WakuNode) GetNumConnectedRelayPeers(paramPubsubTopic ...string) (int, error) { - response, err := n.postTask(requestTypeGetNumConnectedRelayPeers, paramPubsubTopic) - if err != nil { - return 0, err - } - return response.(int), nil -} - -type dialPeerRequest struct { - ctx context.Context - peerAddr multiaddr.Multiaddr - protocol libp2pproto.ID -} - -func (n *WakuNode) DialPeer(ctx context.Context, peerAddr multiaddr.Multiaddr, protocol libp2pproto.ID) error { - _, err := n.postTask(requestTypeDialPeer, dialPeerRequest{ - ctx: ctx, - peerAddr: peerAddr, - protocol: protocol, - }) - return err -} - -func (w *Waku) DialPeer(address multiaddr.Multiaddr) error { - // Using WakuConnect so it matches the go-waku's behavior and terminology - ctx, cancel := context.WithTimeout(w.ctx, requestTimeout) - defer cancel() - return w.node.Connect(ctx, address) -} - -func (n *WakuNode) dialPeer(dialPeerReq dialPeerRequest) error { +func (n *WakuNode) disconnectPeerByID(peerID peer.ID) error { var resp = C.allocResp() - var cPeerMultiAddr = C.CString(dialPeerReq.peerAddr.String()) - var cProtocol = C.CString(string(dialPeerReq.protocol)) + var cPeerId = C.CString(peerID.String()) defer C.freeResp(resp) - defer C.free(unsafe.Pointer(cPeerMultiAddr)) - defer C.free(unsafe.Pointer(cProtocol)) - // TODO: extract timeout from context - C.cGoWakuDialPeer(n.wakuCtx, cPeerMultiAddr, cProtocol, C.int(requestTimeout.Milliseconds()), resp) + defer C.free(unsafe.Pointer(cPeerId)) + + C.cGoWakuDisconnectPeerById(n.wakuCtx, cPeerId, resp) + if C.getRet(resp) == C.RET_OK { return nil } - errMsg := "error DialPeer: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + errMsg := "error DisconnectPeerById: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) return errors.New(errMsg) } -type connectRequest struct { - ctx context.Context - addr multiaddr.Multiaddr -} - -func (n *WakuNode) Connect(ctx context.Context, addr multiaddr.Multiaddr) error { - _, err := n.postTask(requestTypeConnect, connectRequest{ - ctx: ctx, - addr: addr, - }) - return err -} - -func (n *WakuNode) connect(connReq connectRequest) error { +func (n *WakuNode) getConnectedPeers() (peer.IDSlice, error) { var resp = C.allocResp() - var cPeerMultiAddr = C.CString(connReq.addr.String()) defer C.freeResp(resp) - defer C.free(unsafe.Pointer(cPeerMultiAddr)) + C.cGoWakuGetConnectedPeers(n.wakuCtx, resp) + if C.getRet(resp) == C.RET_OK { + peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + if peersStr == "" { + return nil, nil + } + // peersStr contains a comma-separated list of peer ids + itemsPeerIds := strings.Split(peersStr, ",") + var peers peer.IDSlice + for _, peerId := range itemsPeerIds { + id, err := peer.Decode(peerId) + if err != nil { + return nil, fmt.Errorf("GetConnectedPeers - decoding peerId: %w", err) + } + peers = append(peers, id) + } + return peers, nil + } + errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return nil, fmt.Errorf("GetConnectedPeers: %s", errMsg) - // TODO: extract timeout from ctx - C.cGoWakuConnect(n.wakuCtx, cPeerMultiAddr, C.int(time.Minute.Milliseconds()), resp) +} + +func (n *WakuNode) relaySubscribe(pubsubTopic string) error { + if pubsubTopic == "" { + return errors.New("pubsub topic is empty") + } + + var resp = C.allocResp() + var cPubsubTopic = C.CString(pubsubTopic) + + defer C.freeResp(resp) + defer C.free(unsafe.Pointer(cPubsubTopic)) + + if n.wakuCtx == nil { + return errors.New("wakuCtx is nil") + } + + C.cGoWakuRelaySubscribe(n.wakuCtx, cPubsubTopic, resp) if C.getRet(resp) == C.RET_OK { return nil } - errMsg := "error WakuConnect: " + + + errMsg := "error WakuRelaySubscribe: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return errors.New(errMsg) +} + +func (n *WakuNode) relayUnsubscribe(pubsubTopic string) error { + if pubsubTopic == "" { + return errors.New("pubsub topic is empty") + } + + var resp = C.allocResp() + var cPubsubTopic = C.CString(pubsubTopic) + + defer C.freeResp(resp) + defer C.free(unsafe.Pointer(cPubsubTopic)) + + if n.wakuCtx == nil { + return errors.New("wakuCtx is nil") + } + + C.cGoWakuRelayUnsubscribe(n.wakuCtx, cPubsubTopic, resp) + + if C.getRet(resp) == C.RET_OK { + return nil + } + + errMsg := "error WakuRelayUnsubscribe: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return errors.New(errMsg) +} + +func (n *WakuNode) peerExchangeRequest(numPeers uint64) (uint64, error) { + var resp = C.allocResp() + defer C.freeResp(resp) + + C.cGoWakuPeerExchangeQuery(n.wakuCtx, C.uint64_t(numPeers), resp) + if C.getRet(resp) == C.RET_OK { + numRecvPeersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + numRecvPeers, err := strconv.ParseUint(numRecvPeersStr, 10, 64) + if err != nil { + return 0, err + } + return numRecvPeers, nil + } + + errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return 0, errors.New(errMsg) +} + +func (n *WakuNode) startDiscV5() error { + var resp = C.allocResp() + defer C.freeResp(resp) + C.cGoWakuStartDiscV5(n.wakuCtx, resp) + + if C.getRet(resp) == C.RET_OK { + return nil + } + errMsg := "error WakuStartDiscV5: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return errors.New(errMsg) +} + +func (n *WakuNode) stopDiscV5() error { + var resp = C.allocResp() + defer C.freeResp(resp) + C.cGoWakuStopDiscV5(n.wakuCtx, resp) + + if C.getRet(resp) == C.RET_OK { + return nil + } + errMsg := "error WakuStopDiscV5: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return errors.New(errMsg) +} + +func (n *WakuNode) version() (string, error) { + var resp = C.allocResp() + defer C.freeResp(resp) + + C.cGoWakuVersion(n.wakuCtx, resp) + + if C.getRet(resp) == C.RET_OK { + var version = C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return version, nil + } + + errMsg := "error WakuVersion: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return errors.New(errMsg) -} - -type storeQueryRequest struct { - ctx context.Context - storeRequest *storepb.StoreQueryRequest - peerInfo peer.AddrInfo -} - -func (n *WakuNode) StoreQuery(ctx context.Context, storeRequest *storepb.StoreQueryRequest, peerInfo peer.AddrInfo) (*storepb.StoreQueryResponse, error) { - response, err := n.postTask(requestTypeStoreQuery, storeQueryRequest{ - ctx: ctx, - peerInfo: peerInfo, - storeRequest: storeRequest, - }) - if err != nil { - return nil, err - } - return response.(*storepb.StoreQueryResponse), nil + return "", errors.New(errMsg) } func (n *WakuNode) storeQuery(storeQueryRequest storeQueryRequest) (any, error) { @@ -843,24 +829,6 @@ func (n *WakuNode) storeQuery(storeQueryRequest storeQueryRequest) (any, error) return nil, errors.New(errMsg) } -type relayPublishRequest struct { - ctx context.Context - pubsubTopic string - message *pb.WakuMessage -} - -func (n *WakuNode) RelayPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string) (pb.MessageHash, error) { - response, err := n.postTask(requestTypeRelayPublish, relayPublishRequest{ - ctx: ctx, - pubsubTopic: pubsubTopic, - message: message, - }) - if err != nil { - return pb.MessageHash{}, err - } - return response.(pb.MessageHash), nil -} - func (n *WakuNode) relayPublish(relayPublishRequest relayPublishRequest) (pb.MessageHash, error) { // TODO: extract timeout from context timeoutMs := time.Minute.Milliseconds() @@ -891,6 +859,277 @@ func (n *WakuNode) relayPublish(relayPublishRequest relayPublishRequest) (pb.Mes return pb.MessageHash{}, errors.New(errMsg) } +func (n *WakuNode) dnsDiscovery(dnsDiscRequest dnsDiscoveryRequest) ([]multiaddr.Multiaddr, error) { + var resp = C.allocResp() + var cEnrTree = C.CString(dnsDiscRequest.enrTreeUrl) + var cDnsServer = C.CString(dnsDiscRequest.nameDnsServer) + defer C.freeResp(resp) + defer C.free(unsafe.Pointer(cEnrTree)) + defer C.free(unsafe.Pointer(cDnsServer)) + // TODO: extract timeout from context + C.cGoWakuDnsDiscovery(n.wakuCtx, cEnrTree, cDnsServer, C.int(time.Minute.Milliseconds()), resp) + if C.getRet(resp) == C.RET_OK { + var addrsRet []multiaddr.Multiaddr + nodeAddresses := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + addrss := strings.Split(nodeAddresses, ",") + for _, addr := range addrss { + addr, err := multiaddr.NewMultiaddr(addr) + if err != nil { + return nil, err + } + addrsRet = append(addrsRet, addr) + } + return addrsRet, nil + } + errMsg := "error WakuDnsDiscovery: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return nil, errors.New(errMsg) +} + +func (n *WakuNode) pingPeer(request pingRequest) (time.Duration, error) { + peerInfo := request.peerInfo + + addrs := make([]string, len(peerInfo.Addrs)) + for i, addr := range utils.EncapsulatePeerID(peerInfo.ID, peerInfo.Addrs...) { + addrs[i] = addr.String() + } + + var resp = C.allocResp() + var cPeerId = C.CString(strings.Join(addrs, ",")) + defer C.freeResp(resp) + defer C.free(unsafe.Pointer(cPeerId)) + + // TODO: extract timeout from ctx + C.cGoWakuPingPeer(n.wakuCtx, cPeerId, C.int(time.Minute.Milliseconds()), resp) + if C.getRet(resp) == C.RET_OK { + rttStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + rttInt, err := strconv.ParseInt(rttStr, 10, 64) + if err != nil { + return 0, err + } + return time.Duration(rttInt), nil + } + + errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return 0, fmt.Errorf("PingPeer: %s", errMsg) +} + +func (n *WakuNode) start() error { + var resp = C.allocResp() + defer C.freeResp(resp) + + C.cGoWakuStart(n.wakuCtx, resp) + + if C.getRet(resp) == C.RET_OK { + return nil + } + + errMsg := "error WakuStart: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return errors.New(errMsg) +} + +func (n *WakuNode) stop() error { + var resp = C.allocResp() + defer C.freeResp(resp) + + C.cGoWakuStop(n.wakuCtx, resp) + + if C.getRet(resp) == C.RET_OK { + return nil + } + + errMsg := "error WakuStop: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return errors.New(errMsg) +} + +func (n *WakuNode) destroy() error { + var resp = C.allocResp() + defer C.freeResp(resp) + + C.cGoWakuDestroy(n.wakuCtx, resp) + + if C.getRet(resp) == C.RET_OK { + return nil + } + + errMsg := "error WakuDestroy: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return errors.New(errMsg) +} + +func (n *WakuNode) peerID() (peer.ID, error) { + var resp = C.allocResp() + defer C.freeResp(resp) + + C.cGoWakuGetMyPeerId(n.wakuCtx, resp) + + if C.getRet(resp) == C.RET_OK { + peerIdStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + id, err := peer.Decode(peerIdStr) + if err != nil { + errMsg := "WakuGetMyPeerId - decoding peerId: %w" + return "", fmt.Errorf(errMsg, err) + } + return id, nil + } + errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return "", errors.New(errMsg) +} + +func (n *WakuNode) connect(connReq connectRequest) error { + var resp = C.allocResp() + var cPeerMultiAddr = C.CString(connReq.addr.String()) + defer C.freeResp(resp) + defer C.free(unsafe.Pointer(cPeerMultiAddr)) + + // TODO: extract timeout from ctx + C.cGoWakuConnect(n.wakuCtx, cPeerMultiAddr, C.int(time.Minute.Milliseconds()), resp) + + if C.getRet(resp) == C.RET_OK { + return nil + } + errMsg := "error WakuConnect: " + + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return errors.New(errMsg) +} + +func (n *WakuNode) dialPeerById(dialPeerByIDReq dialPeerByIDRequest) error { + var resp = C.allocResp() + var cPeerId = C.CString(dialPeerByIDReq.peerID.String()) + var cProtocol = C.CString(string(dialPeerByIDReq.protocol)) + defer C.freeResp(resp) + defer C.free(unsafe.Pointer(cPeerId)) + defer C.free(unsafe.Pointer(cProtocol)) + + // TODO: extract timeout from ctx + C.cGoWakuDialPeerById(n.wakuCtx, cPeerId, cProtocol, C.int(time.Minute.Milliseconds()), resp) + + if C.getRet(resp) == C.RET_OK { + return nil + } + errMsg := "error DialPeerById: " + + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return errors.New(errMsg) +} + +func (n *WakuNode) listenAddresses() ([]multiaddr.Multiaddr, error) { + var resp = C.allocResp() + defer C.freeResp(resp) + C.cGoWakuListenAddresses(n.wakuCtx, resp) + + if C.getRet(resp) == C.RET_OK { + var addrsRet []multiaddr.Multiaddr + listenAddresses := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + addrss := strings.Split(listenAddresses, ",") + for _, addr := range addrss { + addr, err := multiaddr.NewMultiaddr(addr) + if err != nil { + return nil, err + } + addrsRet = append(addrsRet, addr) + } + return addrsRet, nil + } + errMsg := "error WakuListenAddresses: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return nil, errors.New(errMsg) +} + +func (n *WakuNode) enr() (*enode.Node, error) { + var resp = C.allocResp() + defer C.freeResp(resp) + C.cGoWakuGetMyENR(n.wakuCtx, resp) + + if C.getRet(resp) == C.RET_OK { + enrStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + n, err := enode.Parse(enode.ValidSchemes, enrStr) + if err != nil { + return nil, err + } + return n, nil + } + errMsg := "error WakuGetMyENR: " + + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return nil, errors.New(errMsg) +} + +func (n *WakuNode) listPeersInMesh(pubsubTopic string) (int, error) { + var resp = C.allocResp() + var cPubsubTopic = C.CString(pubsubTopic) + defer C.freeResp(resp) + defer C.free(unsafe.Pointer(cPubsubTopic)) + + C.cGoWakuListPeersInMesh(n.wakuCtx, cPubsubTopic, resp) + + if C.getRet(resp) == C.RET_OK { + numPeersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + numPeers, err := strconv.Atoi(numPeersStr) + if err != nil { + errMsg := "ListPeersInMesh - error converting string to int: " + err.Error() + return 0, errors.New(errMsg) + } + return numPeers, nil + } + errMsg := "error ListPeersInMesh: " + + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return 0, errors.New(errMsg) +} + +func (n *WakuNode) getPeerIDsFromPeerStore() (peer.IDSlice, error) { + var resp = C.allocResp() + defer C.freeResp(resp) + C.cGoWakuGetPeerIdsFromPeerStore(n.wakuCtx, resp) + + if C.getRet(resp) == C.RET_OK { + peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + if peersStr == "" { + return nil, nil + } + // peersStr contains a comma-separated list of peer ids + itemsPeerIds := strings.Split(peersStr, ",") + + var peers peer.IDSlice + for _, peerId := range itemsPeerIds { + id, err := peer.Decode(peerId) + if err != nil { + return nil, fmt.Errorf("GetPeerIdsFromPeerStore - decoding peerId: %w", err) + } + peers = append(peers, id) + } + return peers, nil + } + errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return nil, fmt.Errorf("GetPeerIdsFromPeerStore: %s", errMsg) +} + +func (n *WakuNode) getPeerIDsByProtocol(protocolID libp2pproto.ID) (peer.IDSlice, error) { + var resp = C.allocResp() + var cProtocol = C.CString(string(protocolID)) + defer C.freeResp(resp) + defer C.free(unsafe.Pointer(cProtocol)) + + C.cGoWakuGetPeerIdsByProtocol(n.wakuCtx, cProtocol, resp) + + if C.getRet(resp) == C.RET_OK { + peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + if peersStr == "" { + return nil, nil + } + // peersStr contains a comma-separated list of peer ids + itemsPeerIds := strings.Split(peersStr, ",") + + var peers peer.IDSlice + for _, p := range itemsPeerIds { + id, err := peer.Decode(p) + if err != nil { + return nil, fmt.Errorf("GetPeerIdsByProtocol - decoding peerId: %w", err) + } + peers = append(peers, id) + } + return peers, nil + } + errMsg := "error GetPeerIdsByProtocol: " + + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return nil, fmt.Errorf("GetPeerIdsByProtocol: %s", errMsg) +} + func (n *WakuNode) newNode(config *WakuConfig) error { jsonConfig, err := json.Marshal(config) if err != nil { @@ -916,3 +1155,260 @@ func (n *WakuNode) newNode(config *WakuConfig) error { return nil } + +func (n *WakuNode) dialPeer(dialPeerReq dialPeerRequest) error { + var resp = C.allocResp() + var cPeerMultiAddr = C.CString(dialPeerReq.peerAddr.String()) + var cProtocol = C.CString(string(dialPeerReq.protocol)) + defer C.freeResp(resp) + defer C.free(unsafe.Pointer(cPeerMultiAddr)) + defer C.free(unsafe.Pointer(cProtocol)) + // TODO: extract timeout from context + C.cGoWakuDialPeer(n.wakuCtx, cPeerMultiAddr, cProtocol, C.int(requestTimeout.Milliseconds()), resp) + if C.getRet(resp) == C.RET_OK { + return nil + } + errMsg := "error DialPeer: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return errors.New(errMsg) +} + +type pingRequest struct { + ctx context.Context + peerInfo peer.AddrInfo +} + +func (n *WakuNode) PingPeer(ctx context.Context, info peer.AddrInfo) (time.Duration, error) { + response, err := n.postTask(requestTypePing, pingRequest{ + ctx: ctx, + peerInfo: info, + }) + if err != nil { + return 0, err + } + return response.(time.Duration), nil +} + +func (n *WakuNode) Start() error { + _, err := n.postTask(requestTypeStart, nil) + return err +} + +type relayPublishRequest struct { + ctx context.Context + pubsubTopic string + message *pb.WakuMessage +} + +func (n *WakuNode) RelayPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string) (pb.MessageHash, error) { + response, err := n.postTask(requestTypeRelayPublish, relayPublishRequest{ + ctx: ctx, + pubsubTopic: pubsubTopic, + message: message, + }) + if err != nil { + return pb.MessageHash{}, err + } + return response.(pb.MessageHash), nil +} + +type storeQueryRequest struct { + ctx context.Context + storeRequest *storepb.StoreQueryRequest + peerInfo peer.AddrInfo +} + +func (n *WakuNode) StoreQuery(ctx context.Context, storeRequest *storepb.StoreQueryRequest, peerInfo peer.AddrInfo) (*storepb.StoreQueryResponse, error) { + response, err := n.postTask(requestTypeStoreQuery, storeQueryRequest{ + ctx: ctx, + peerInfo: peerInfo, + storeRequest: storeRequest, + }) + if err != nil { + return nil, err + } + return response.(*storepb.StoreQueryResponse), nil +} + +func (n *WakuNode) PeerID() (peer.ID, error) { + response, err := n.postTask(requestTypePeerID, nil) + if err != nil { + return "", err + } + return response.(peer.ID), nil +} + +func (n *WakuNode) Stop() error { + _, err := n.postTask(requestTypeStop, nil) + return err +} + +func (n *WakuNode) Destroy() error { + _, err := n.postTask(requestTypeDestroy, nil) + return err +} + +func (n *WakuNode) StartDiscV5() error { + _, err := n.postTask(requestTypeStartDiscV5, nil) + return err +} + +func (n *WakuNode) StopDiscV5() error { + _, err := n.postTask(requestTypeStopDiscV5, nil) + return err +} + +func (n *WakuNode) Version() (string, error) { + response, err := n.postTask(requestTypeVersion, nil) + if err != nil { + return "", err + } + return response.(string), nil +} + +func (n *WakuNode) RelaySubscribe(pubsubTopic string) error { + _, err := n.postTask(requestTypeRelaySubscribe, pubsubTopic) + return err +} + +func (n *WakuNode) RelayUnsubscribe(pubsubTopic string) error { + _, err := n.postTask(requestTypeRelayUnsubscribe, pubsubTopic) + return err +} + +func (n *WakuNode) PeerExchangeRequest(numPeers uint64) (uint64, error) { + response, err := n.postTask(requestTypePeerExchangeRequest, numPeers) + if err != nil { + return 0, err + } + return response.(uint64), nil +} + +type connectRequest struct { + ctx context.Context + addr multiaddr.Multiaddr +} + +func (n *WakuNode) Connect(ctx context.Context, addr multiaddr.Multiaddr) error { + _, err := n.postTask(requestTypeConnect, connectRequest{ + ctx: ctx, + addr: addr, + }) + return err +} + +type dialPeerByIDRequest struct { + ctx context.Context + peerID peer.ID + protocol libp2pproto.ID +} + +func (n *WakuNode) DialPeerByID(ctx context.Context, peerID peer.ID, protocol libp2pproto.ID) error { + _, err := n.postTask(requestTypeDialPeerByID, dialPeerByIDRequest{ + ctx: ctx, + peerID: peerID, + protocol: protocol, + }) + return err +} + +func (n *WakuNode) ListenAddresses() ([]multiaddr.Multiaddr, error) { + response, err := n.postTask(requestTypeListenAddresses, nil) + if err != nil { + return nil, err + } + return response.([]multiaddr.Multiaddr), nil +} + +func (n *WakuNode) ENR() (*enode.Node, error) { + response, err := n.postTask(requestTypeENR, nil) + if err != nil { + return nil, err + } + return response.(*enode.Node), nil +} + +func (n *WakuNode) ListPeersInMesh(pubsubTopic string) (int, error) { + response, err := n.postTask(requestTypeListPeersInMesh, pubsubTopic) + if err != nil { + return 0, err + } + return response.(int), nil +} + +func (n *WakuNode) GetConnectedPeers() (peer.IDSlice, error) { + response, err := n.postTask(requestTypeGetConnectedPeers, nil) + if err != nil { + return nil, err + } + return response.(peer.IDSlice), nil +} + +func (n *WakuNode) GetNumConnectedPeers() (int, error) { + peers, err := n.GetConnectedPeers() + if err != nil { + return 0, err + } + return len(peers), nil +} + +func (n *WakuNode) GetPeerIDsFromPeerStore() (peer.IDSlice, error) { + response, err := n.postTask(requestTypeGetPeerIDsFromPeerStore, nil) + if err != nil { + return nil, err + } + return response.(peer.IDSlice), nil +} + +func (n *WakuNode) GetPeerIDsByProtocol(protocol libp2pproto.ID) (peer.IDSlice, error) { + response, err := n.postTask(requestTypeGetPeerIDsByProtocol, protocol) + if err != nil { + return nil, err + } + return response.(peer.IDSlice), nil +} + +func (n *WakuNode) DisconnectPeerByID(peerID peer.ID) error { + _, err := n.postTask(requestTypeDisconnectPeerByID, peerID) + return err +} + +type dnsDiscoveryRequest struct { + ctx context.Context + enrTreeUrl string + nameDnsServer string +} + +func (n *WakuNode) DnsDiscovery(ctx context.Context, enrTreeUrl string, nameDnsServer string) ([]multiaddr.Multiaddr, error) { + response, err := n.postTask(requestTypeDnsDiscovery, dnsDiscoveryRequest{ + ctx: ctx, + enrTreeUrl: enrTreeUrl, + nameDnsServer: nameDnsServer, + }) + if err != nil { + return nil, err + } + return response.([]multiaddr.Multiaddr), nil +} + +type dialPeerRequest struct { + ctx context.Context + peerAddr multiaddr.Multiaddr + protocol libp2pproto.ID +} + +func (n *WakuNode) DialPeer(ctx context.Context, peerAddr multiaddr.Multiaddr, protocol libp2pproto.ID) error { + _, err := n.postTask(requestTypeDialPeer, dialPeerRequest{ + ctx: ctx, + peerAddr: peerAddr, + protocol: protocol, + }) + return err +} + +func (n *WakuNode) GetNumConnectedRelayPeers(paramPubsubTopic ...string) (int, error) { + response, err := n.postTask(requestTypeGetNumConnectedRelayPeers, paramPubsubTopic) + if err != nil { + return 0, err + } + return response.(int), nil +}