diff --git a/waku/nwaku.go b/waku/nwaku.go index 7bd40bd..f6c7d65 100644 --- a/waku/nwaku.go +++ b/waku/nwaku.go @@ -526,7 +526,7 @@ func globalEventCallback(callerRet C.int, msg *C.char, len C.size_t, userData un // self.MyEventCallback(callerRet, msg, len) } -func (n *WakuNode) getNumConnectedRelayPeers(optPubsubTopic ...string) (int, error) { +func (n *WakuNode) GetNumConnectedRelayPeers(optPubsubTopic ...string) (int, error) { var pubsubTopic string if len(optPubsubTopic) == 0 { pubsubTopic = "" @@ -534,9 +534,12 @@ func (n *WakuNode) getNumConnectedRelayPeers(optPubsubTopic ...string) (int, err pubsubTopic = optPubsubTopic[0] } - var resp = C.allocResp() - var cPubsubTopic = C.CString(pubsubTopic) + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) + + var cPubsubTopic = C.CString(pubsubTopic) defer C.free(unsafe.Pointer(cPubsubTopic)) C.cGoWakuGetNumConnectedRelayPeers(n.wakuCtx, cPubsubTopic, resp) @@ -555,13 +558,17 @@ func (n *WakuNode) getNumConnectedRelayPeers(optPubsubTopic ...string) (int, err return 0, errors.New(errMsg) } -func (n *WakuNode) disconnectPeerByID(peerID peer.ID) error { - var resp = C.allocResp() +func (n *WakuNode) DisconnectPeerByID(peerID peer.ID) error { + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) var cPeerId = C.CString(peerID.String()) defer C.freeResp(resp) defer C.free(unsafe.Pointer(cPeerId)) + wg.Add(1) C.cGoWakuDisconnectPeerById(n.wakuCtx, cPeerId, resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { return nil @@ -570,10 +577,16 @@ func (n *WakuNode) disconnectPeerByID(peerID peer.ID) error { return errors.New(errMsg) } -func (n *WakuNode) getConnectedPeers() (peer.IDSlice, error) { - var resp = C.allocResp() +func (n *WakuNode) GetConnectedPeers() (peer.IDSlice, error) { + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) + + wg.Add(1) C.cGoWakuGetConnectedPeers(n.wakuCtx, resp) + wg.Wait() + if C.getRet(resp) == C.RET_OK { peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) if peersStr == "" { @@ -596,12 +609,14 @@ func (n *WakuNode) getConnectedPeers() (peer.IDSlice, error) { } -func (n *WakuNode) relaySubscribe(pubsubTopic string) error { +func (n *WakuNode) RelaySubscribe(pubsubTopic string) error { if pubsubTopic == "" { return errors.New("pubsub topic is empty") } - var resp = C.allocResp() + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) var cPubsubTopic = C.CString(pubsubTopic) defer C.freeResp(resp) @@ -611,7 +626,9 @@ func (n *WakuNode) relaySubscribe(pubsubTopic string) error { return errors.New("wakuCtx is nil") } + wg.Add(1) C.cGoWakuRelaySubscribe(n.wakuCtx, cPubsubTopic, resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { return nil @@ -621,12 +638,14 @@ func (n *WakuNode) relaySubscribe(pubsubTopic string) error { return errors.New(errMsg) } -func (n *WakuNode) relayUnsubscribe(pubsubTopic string) error { +func (n *WakuNode) RelayUnsubscribe(pubsubTopic string) error { if pubsubTopic == "" { return errors.New("pubsub topic is empty") } - var resp = C.allocResp() + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) var cPubsubTopic = C.CString(pubsubTopic) defer C.freeResp(resp) @@ -636,7 +655,9 @@ func (n *WakuNode) relayUnsubscribe(pubsubTopic string) error { return errors.New("wakuCtx is nil") } + wg.Add(1) C.cGoWakuRelayUnsubscribe(n.wakuCtx, cPubsubTopic, resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { return nil @@ -664,11 +685,15 @@ func (n *WakuNode) peerExchangeRequest(numPeers uint64) (uint64, error) { return 0, errors.New(errMsg) } -func (n *WakuNode) startDiscV5() error { - var resp = C.allocResp() - defer C.freeResp(resp) - C.cGoWakuStartDiscV5(n.wakuCtx, resp) +func (n *WakuNode) StartDiscV5() error { + wg := sync.WaitGroup{} + var resp = C.allocResp(unsafe.Pointer(&wg)) + defer C.freeResp(resp) + + wg.Add(1) + C.cGoWakuStartDiscV5(n.wakuCtx, resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { return nil } @@ -676,10 +701,15 @@ func (n *WakuNode) startDiscV5() error { return errors.New(errMsg) } -func (n *WakuNode) stopDiscV5() error { - var resp = C.allocResp() +func (n *WakuNode) StopDiscV5() error { + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) + + wg.Add(1) C.cGoWakuStopDiscV5(n.wakuCtx, resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { return nil @@ -688,11 +718,15 @@ func (n *WakuNode) stopDiscV5() error { return errors.New(errMsg) } -func (n *WakuNode) version() (string, error) { - var resp = C.allocResp() +func (n *WakuNode) Version() (string, error) { + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) + wg.Add(1) C.cGoWakuVersion(n.wakuCtx, resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { var version = C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) @@ -704,29 +738,34 @@ func (n *WakuNode) version() (string, error) { return "", errors.New(errMsg) } -func (n *WakuNode) storeQuery(storeQueryRequest storeQueryRequest) (any, error) { +func (n *WakuNode) StoreQuery(ctx context.Context, storeRequest *storepb.StoreQueryRequest, peerInfo peer.AddrInfo) (*storepb.StoreQueryResponse, error) { // TODO: extract timeout from context timeoutMs := time.Minute.Milliseconds() - b, err := json.Marshal(storeQueryRequest.storeRequest) + b, err := json.Marshal(storeRequest) if err != nil { return nil, err } - addrs := make([]string, len(storeQueryRequest.peerInfo.Addrs)) - for i, addr := range utils.EncapsulatePeerID(storeQueryRequest.peerInfo.ID, storeQueryRequest.peerInfo.Addrs...) { + addrs := make([]string, len(peerInfo.Addrs)) + for i, addr := range utils.EncapsulatePeerID(peerInfo.ID, peerInfo.Addrs...) { addrs[i] = addr.String() } var cJsonQuery = C.CString(string(b)) var cPeerAddr = C.CString(strings.Join(addrs, ",")) - var resp = C.allocResp() + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.free(unsafe.Pointer(cJsonQuery)) defer C.free(unsafe.Pointer(cPeerAddr)) defer C.freeResp(resp) + wg.Add(1) C.cGoWakuStoreQuery(n.wakuCtx, cJsonQuery, cPeerAddr, C.int(timeoutMs), resp) + wg.Wait() + if C.getRet(resp) == C.RET_OK { jsonResponseStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) storeQueryResponse := &storepb.StoreQueryResponse{} @@ -741,24 +780,27 @@ func (n *WakuNode) storeQuery(storeQueryRequest storeQueryRequest) (any, error) return nil, errors.New(errMsg) } -func (n *WakuNode) relayPublish(relayPublishRequest relayPublishRequest) (pb.MessageHash, error) { +func (n *WakuNode) RelayPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string) (pb.MessageHash, error) { // TODO: extract timeout from context timeoutMs := time.Minute.Milliseconds() - jsonMsg, err := json.Marshal(relayPublishRequest.message) + jsonMsg, err := json.Marshal(message) if err != nil { return pb.MessageHash{}, err } - var cPubsubTopic = C.CString(relayPublishRequest.pubsubTopic) - var msg = C.CString(string(jsonMsg)) - var resp = C.allocResp() + wg := sync.WaitGroup{} + var resp = C.allocResp(unsafe.Pointer(&wg)) + var cPubsubTopic = C.CString(pubsubTopic) + var msg = C.CString(string(jsonMsg)) defer C.freeResp(resp) defer C.free(unsafe.Pointer(cPubsubTopic)) defer C.free(unsafe.Pointer(msg)) + wg.Add(1) C.cGoWakuRelayPublish(n.wakuCtx, cPubsubTopic, msg, C.int(timeoutMs), resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { msgHash := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) msgHashBytes, err := hexutil.Decode(msgHash) @@ -771,15 +813,20 @@ func (n *WakuNode) relayPublish(relayPublishRequest relayPublishRequest) (pb.Mes return pb.MessageHash{}, errors.New(errMsg) } -func (n *WakuNode) dnsDiscovery(dnsDiscRequest dnsDiscoveryRequest) ([]multiaddr.Multiaddr, error) { - var resp = C.allocResp() - var cEnrTree = C.CString(dnsDiscRequest.enrTreeUrl) - var cDnsServer = C.CString(dnsDiscRequest.nameDnsServer) +func (n *WakuNode) DnsDiscovery(ctx context.Context, enrTreeUrl string, nameDnsServer string) ([]multiaddr.Multiaddr, error) { + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) + var cEnrTree = C.CString(enrTreeUrl) + var cDnsServer = C.CString(nameDnsServer) defer C.freeResp(resp) defer C.free(unsafe.Pointer(cEnrTree)) defer C.free(unsafe.Pointer(cDnsServer)) + // TODO: extract timeout from context + wg.Add(1) C.cGoWakuDnsDiscovery(n.wakuCtx, cEnrTree, cDnsServer, C.int(time.Minute.Milliseconds()), resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { var addrsRet []multiaddr.Multiaddr nodeAddresses := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) @@ -797,21 +844,24 @@ func (n *WakuNode) dnsDiscovery(dnsDiscRequest dnsDiscoveryRequest) ([]multiaddr return nil, errors.New(errMsg) } -func (n *WakuNode) pingPeer(request pingRequest) (time.Duration, error) { - peerInfo := request.peerInfo - +func (n *WakuNode) PingPeer(ctx context.Context, peerInfo peer.AddrInfo) (time.Duration, error) { addrs := make([]string, len(peerInfo.Addrs)) for i, addr := range utils.EncapsulatePeerID(peerInfo.ID, peerInfo.Addrs...) { addrs[i] = addr.String() } - var resp = C.allocResp() - var cPeerId = C.CString(strings.Join(addrs, ",")) + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) + + var cPeerId = C.CString(strings.Join(addrs, ",")) defer C.free(unsafe.Pointer(cPeerId)) // TODO: extract timeout from ctx + wg.Add(1) C.cGoWakuPingPeer(n.wakuCtx, cPeerId, C.int(time.Minute.Milliseconds()), resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { rttStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) rttInt, err := strconv.ParseInt(rttStr, 10, 64) @@ -825,12 +875,15 @@ func (n *WakuNode) pingPeer(request pingRequest) (time.Duration, error) { return 0, fmt.Errorf("PingPeer: %s", errMsg) } -func (n *WakuNode) start() error { - var resp = C.allocResp() +func (n *WakuNode) Start() error { + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) + wg.Add(1) C.cGoWakuStart(n.wakuCtx, resp) - + wg.Wait() if C.getRet(resp) == C.RET_OK { return nil } @@ -839,12 +892,15 @@ func (n *WakuNode) start() error { return errors.New(errMsg) } -func (n *WakuNode) stop() error { - var resp = C.allocResp() +func (n *WakuNode) Stop() error { + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) + wg.Add(1) C.cGoWakuStop(n.wakuCtx, resp) - + wg.Wait() if C.getRet(resp) == C.RET_OK { return nil } @@ -853,12 +909,15 @@ func (n *WakuNode) stop() error { return errors.New(errMsg) } -func (n *WakuNode) destroy() error { - var resp = C.allocResp() +func (n *WakuNode) Destroy() error { + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) + wg.Add(1) C.cGoWakuDestroy(n.wakuCtx, resp) - + wg.Wait() if C.getRet(resp) == C.RET_OK { return nil } @@ -867,12 +926,15 @@ func (n *WakuNode) destroy() error { return errors.New(errMsg) } -func (n *WakuNode) peerID() (peer.ID, error) { - var resp = C.allocResp() +func (n *WakuNode) PeerID() (peer.ID, error) { + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) + wg.Add(1) C.cGoWakuGetMyPeerId(n.wakuCtx, resp) - + wg.Wait() if C.getRet(resp) == C.RET_OK { peerIdStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) id, err := peer.Decode(peerIdStr) @@ -886,15 +948,18 @@ func (n *WakuNode) peerID() (peer.ID, error) { return "", errors.New(errMsg) } -func (n *WakuNode) connect(connReq connectRequest) error { - var resp = C.allocResp() - var cPeerMultiAddr = C.CString(connReq.addr.String()) +func (n *WakuNode) Connect(ctx context.Context, addr multiaddr.Multiaddr) error { + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) + var cPeerMultiAddr = C.CString(addr.String()) defer C.freeResp(resp) defer C.free(unsafe.Pointer(cPeerMultiAddr)) // TODO: extract timeout from ctx + wg.Add(1) C.cGoWakuConnect(n.wakuCtx, cPeerMultiAddr, C.int(time.Minute.Milliseconds()), resp) - + wg.Wait() if C.getRet(resp) == C.RET_OK { return nil } @@ -903,17 +968,20 @@ func (n *WakuNode) connect(connReq connectRequest) error { return errors.New(errMsg) } -func (n *WakuNode) dialPeerById(dialPeerByIDReq dialPeerByIDRequest) error { - var resp = C.allocResp() - var cPeerId = C.CString(dialPeerByIDReq.peerID.String()) - var cProtocol = C.CString(string(dialPeerByIDReq.protocol)) +func (n *WakuNode) DialPeerByID(ctx context.Context, peerID peer.ID, protocol libp2pproto.ID) error { + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) + var cPeerId = C.CString(peerID.String()) + var cProtocol = C.CString(string(protocol)) defer C.freeResp(resp) defer C.free(unsafe.Pointer(cPeerId)) defer C.free(unsafe.Pointer(cProtocol)) // TODO: extract timeout from ctx + wg.Add(1) C.cGoWakuDialPeerById(n.wakuCtx, cPeerId, cProtocol, C.int(time.Minute.Milliseconds()), resp) - + wg.Wait() if C.getRet(resp) == C.RET_OK { return nil } @@ -922,11 +990,15 @@ func (n *WakuNode) dialPeerById(dialPeerByIDReq dialPeerByIDRequest) error { return errors.New(errMsg) } -func (n *WakuNode) listenAddresses() ([]multiaddr.Multiaddr, error) { - var resp = C.allocResp() - defer C.freeResp(resp) - C.cGoWakuListenAddresses(n.wakuCtx, resp) +func (n *WakuNode) ListenAddresses() ([]multiaddr.Multiaddr, error) { + wg := sync.WaitGroup{} + var resp = C.allocResp(unsafe.Pointer(&wg)) + defer C.freeResp(resp) + + wg.Add(1) + C.cGoWakuListenAddresses(n.wakuCtx, resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { var addrsRet []multiaddr.Multiaddr listenAddresses := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) @@ -944,11 +1016,15 @@ func (n *WakuNode) listenAddresses() ([]multiaddr.Multiaddr, error) { return nil, errors.New(errMsg) } -func (n *WakuNode) enr() (*enode.Node, error) { - var resp = C.allocResp() - defer C.freeResp(resp) - C.cGoWakuGetMyENR(n.wakuCtx, resp) +func (n *WakuNode) ENR() (*enode.Node, error) { + wg := sync.WaitGroup{} + var resp = C.allocResp(unsafe.Pointer(&wg)) + defer C.freeResp(resp) + + wg.Add(1) + C.cGoWakuGetMyENR(n.wakuCtx, resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { enrStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) n, err := enode.Parse(enode.ValidSchemes, enrStr) @@ -962,14 +1038,17 @@ func (n *WakuNode) enr() (*enode.Node, error) { return nil, errors.New(errMsg) } -func (n *WakuNode) getNumPeersInMesh(pubsubTopic string) (int, error) { - var resp = C.allocResp() +func (n *WakuNode) GetNumPeersInMesh(pubsubTopic string) (int, error) { + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) var cPubsubTopic = C.CString(pubsubTopic) defer C.freeResp(resp) defer C.free(unsafe.Pointer(cPubsubTopic)) + wg.Add(1) C.cGoWakuGetNumPeersInMesh(n.wakuCtx, cPubsubTopic, resp) - + wg.Wait() if C.getRet(resp) == C.RET_OK { numPeersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) numPeers, err := strconv.Atoi(numPeersStr) @@ -984,11 +1063,15 @@ func (n *WakuNode) getNumPeersInMesh(pubsubTopic string) (int, error) { return 0, errors.New(errMsg) } -func (n *WakuNode) getPeerIDsFromPeerStore() (peer.IDSlice, error) { - var resp = C.allocResp() - defer C.freeResp(resp) - C.cGoWakuGetPeerIdsFromPeerStore(n.wakuCtx, resp) +func (n *WakuNode) GetPeerIDsFromPeerStore() (peer.IDSlice, error) { + wg := sync.WaitGroup{} + var resp = C.allocResp(unsafe.Pointer(&wg)) + defer C.freeResp(resp) + + wg.Add(1) + C.cGoWakuGetPeerIdsFromPeerStore(n.wakuCtx, resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) if peersStr == "" { @@ -1011,14 +1094,17 @@ func (n *WakuNode) getPeerIDsFromPeerStore() (peer.IDSlice, error) { return nil, fmt.Errorf("GetPeerIdsFromPeerStore: %s", errMsg) } -func (n *WakuNode) getPeerIDsByProtocol(protocolID libp2pproto.ID) (peer.IDSlice, error) { - var resp = C.allocResp() - var cProtocol = C.CString(string(protocolID)) +func (n *WakuNode) GetPeerIDsByProtocol(protocol libp2pproto.ID) (peer.IDSlice, error) { + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) + var cProtocol = C.CString(string(protocol)) defer C.freeResp(resp) defer C.free(unsafe.Pointer(cProtocol)) + wg.Add(1) C.cGoWakuGetPeerIdsByProtocol(n.wakuCtx, cProtocol, resp) - + wg.Wait() if C.getRet(resp) == C.RET_OK { peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) if peersStr == "" { @@ -1042,41 +1128,19 @@ func (n *WakuNode) getPeerIDsByProtocol(protocolID libp2pproto.ID) (peer.IDSlice return nil, fmt.Errorf("GetPeerIdsByProtocol: %s", errMsg) } -func (n *WakuNode) newNode(config *WakuConfig) error { - jsonConfig, err := json.Marshal(config) - if err != nil { - return err - } +func (n *WakuNode) DialPeer(ctx context.Context, peerAddr multiaddr.Multiaddr, protocol libp2pproto.ID) error { + wg := sync.WaitGroup{} - var cJsonConfig = C.CString(string(jsonConfig)) - var resp = C.allocResp() - - defer C.free(unsafe.Pointer(cJsonConfig)) - defer C.freeResp(resp) - - if C.getRet(resp) != C.RET_OK { - errMsg := "error wakuNew: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return errors.New(errMsg) - } - - wakuCtx := C.cGoWakuNew(cJsonConfig, resp) - n.wakuCtx = unsafe.Pointer(wakuCtx) - - // Notice that the events for self node are handled by the 'MyEventCallback' method - C.cGoWakuSetEventCallback(n.wakuCtx) - - return nil -} - -func (n *WakuNode) dialPeer(dialPeerReq dialPeerRequest) error { - var resp = C.allocResp() - var cPeerMultiAddr = C.CString(dialPeerReq.peerAddr.String()) - var cProtocol = C.CString(string(dialPeerReq.protocol)) + var resp = C.allocResp(unsafe.Pointer(&wg)) + var cPeerMultiAddr = C.CString(peerAddr.String()) + var cProtocol = C.CString(string(protocol)) defer C.freeResp(resp) defer C.free(unsafe.Pointer(cPeerMultiAddr)) defer C.free(unsafe.Pointer(cProtocol)) // TODO: extract timeout from context + wg.Add(1) C.cGoWakuDialPeer(n.wakuCtx, cPeerMultiAddr, cProtocol, C.int(requestTimeout.Milliseconds()), resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { return nil }