From 59f8461895a143bb07ff312aa9a1812ca2b7cfe3 Mon Sep 17 00:00:00 2001 From: gabrielmer <101006718+gabrielmer@users.noreply.github.com> Date: Wed, 4 Dec 2024 17:41:28 +0100 Subject: [PATCH] feat: integrate async support (#5) --- waku/nwaku.go | 792 +++++++++++++++------------------------------ waku/nwaku_test.go | 10 +- 2 files changed, 269 insertions(+), 533 deletions(-) diff --git a/waku/nwaku.go b/waku/nwaku.go index 23ed66f..26d6aa3 100644 --- a/waku/nwaku.go +++ b/waku/nwaku.go @@ -14,10 +14,13 @@ package waku int ret; char* msg; size_t len; + void* ffiWg; } Resp; - static void* allocResp() { - return calloc(1, sizeof(Resp)); + static void* allocResp(void* wg) { + Resp* r = calloc(1, sizeof(Resp)); + r->ffiWg = wg; + return r; } static void freeResp(void* resp) { @@ -51,52 +54,45 @@ package waku } // resp must be set != NULL in case interest on retrieving data from the callback - static void callback(int ret, char* msg, size_t len, void* resp) { - if (resp != NULL) { - Resp* m = (Resp*) resp; - m->ret = ret; - m->msg = msg; - m->len = len; - } - } + void GoCallback(int ret, char* msg, size_t len, void* resp); #define WAKU_CALL(call) \ do { \ - int ret = call; \ - if (ret != 0) { \ - printf("Failed the call to: %s. Returned code: %d\n", #call, ret); \ - exit(1); \ - } \ + int ret = call; \ + if (ret != 0) { \ + printf("Failed the call to: %s. Returned code: %d\n", #call, ret); \ + exit(1); \ + } \ } while (0) static void* cGoWakuNew(const char* configJson, void* resp) { // We pass NULL because we are not interested in retrieving data from this callback - void* ret = waku_new(configJson, (WakuCallBack) callback, resp); + void* ret = waku_new(configJson, (WakuCallBack) GoCallback, resp); return ret; } static void cGoWakuStart(void* wakuCtx, void* resp) { - WAKU_CALL(waku_start(wakuCtx, (WakuCallBack) callback, resp)); + WAKU_CALL(waku_start(wakuCtx, (WakuCallBack) GoCallback, resp)); } static void cGoWakuStop(void* wakuCtx, void* resp) { - WAKU_CALL(waku_stop(wakuCtx, (WakuCallBack) callback, resp)); + WAKU_CALL(waku_stop(wakuCtx, (WakuCallBack) GoCallback, resp)); } static void cGoWakuDestroy(void* wakuCtx, void* resp) { - WAKU_CALL(waku_destroy(wakuCtx, (WakuCallBack) callback, resp)); + WAKU_CALL(waku_destroy(wakuCtx, (WakuCallBack) GoCallback, resp)); } static void cGoWakuStartDiscV5(void* wakuCtx, void* resp) { - WAKU_CALL(waku_start_discv5(wakuCtx, (WakuCallBack) callback, resp)); + WAKU_CALL(waku_start_discv5(wakuCtx, (WakuCallBack) GoCallback, resp)); } static void cGoWakuStopDiscV5(void* wakuCtx, void* resp) { - WAKU_CALL(waku_stop_discv5(wakuCtx, (WakuCallBack) callback, resp)); + WAKU_CALL(waku_stop_discv5(wakuCtx, (WakuCallBack) GoCallback, resp)); } static void cGoWakuVersion(void* wakuCtx, void* resp) { - WAKU_CALL(waku_version(wakuCtx, (WakuCallBack) callback, resp)); + WAKU_CALL(waku_version(wakuCtx, (WakuCallBack) GoCallback, resp)); } static void cGoWakuSetEventCallback(void* wakuCtx) { @@ -127,16 +123,16 @@ package waku appVersion, contentTopicName, encoding, - (WakuCallBack) callback, + (WakuCallBack) GoCallback, resp) ); } static void cGoWakuPubsubTopic(void* wakuCtx, char* topicName, void* resp) { - WAKU_CALL( waku_pubsub_topic(wakuCtx, topicName, (WakuCallBack) callback, resp) ); + WAKU_CALL( waku_pubsub_topic(wakuCtx, topicName, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuDefaultPubsubTopic(void* wakuCtx, void* resp) { - WAKU_CALL (waku_default_pubsub_topic(wakuCtx, (WakuCallBack) callback, resp)); + WAKU_CALL (waku_default_pubsub_topic(wakuCtx, (WakuCallBack) GoCallback, resp)); } static void cGoWakuRelayPublish(void* wakuCtx, @@ -149,14 +145,14 @@ package waku pubSubTopic, jsonWakuMessage, timeoutMs, - (WakuCallBack) callback, + (WakuCallBack) GoCallback, resp)); } static void cGoWakuRelaySubscribe(void* wakuCtx, char* pubSubTopic, void* resp) { WAKU_CALL ( waku_relay_subscribe(wakuCtx, pubSubTopic, - (WakuCallBack) callback, + (WakuCallBack) GoCallback, resp) ); } @@ -164,7 +160,7 @@ package waku WAKU_CALL ( waku_relay_unsubscribe(wakuCtx, pubSubTopic, - (WakuCallBack) callback, + (WakuCallBack) GoCallback, resp) ); } @@ -172,7 +168,7 @@ package waku WAKU_CALL( waku_connect(wakuCtx, peerMultiAddr, timeoutMs, - (WakuCallBack) callback, + (WakuCallBack) GoCallback, resp) ); } @@ -186,7 +182,7 @@ package waku peerMultiAddr, protocol, timeoutMs, - (WakuCallBack) callback, + (WakuCallBack) GoCallback, resp) ); } @@ -200,47 +196,47 @@ package waku peerId, protocol, timeoutMs, - (WakuCallBack) callback, + (WakuCallBack) GoCallback, resp) ); } static void cGoWakuDisconnectPeerById(void* wakuCtx, char* peerId, void* resp) { WAKU_CALL( waku_disconnect_peer_by_id(wakuCtx, peerId, - (WakuCallBack) callback, + (WakuCallBack) GoCallback, resp) ); } static void cGoWakuListenAddresses(void* wakuCtx, void* resp) { - WAKU_CALL (waku_listen_addresses(wakuCtx, (WakuCallBack) callback, resp) ); + WAKU_CALL (waku_listen_addresses(wakuCtx, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuGetMyENR(void* ctx, void* resp) { - WAKU_CALL (waku_get_my_enr(ctx, (WakuCallBack) callback, resp) ); + WAKU_CALL (waku_get_my_enr(ctx, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuGetMyPeerId(void* ctx, void* resp) { - WAKU_CALL (waku_get_my_peerid(ctx, (WakuCallBack) callback, resp) ); + WAKU_CALL (waku_get_my_peerid(ctx, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuPingPeer(void* ctx, char* peerAddr, int timeoutMs, void* resp) { - WAKU_CALL (waku_ping_peer(ctx, peerAddr, timeoutMs, (WakuCallBack) callback, resp) ); + WAKU_CALL (waku_ping_peer(ctx, peerAddr, timeoutMs, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuGetNumPeersInMesh(void* ctx, char* pubSubTopic, void* resp) { - WAKU_CALL (waku_relay_get_num_peers_in_mesh(ctx, pubSubTopic, (WakuCallBack) callback, resp) ); + WAKU_CALL (waku_relay_get_num_peers_in_mesh(ctx, pubSubTopic, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuGetNumConnectedRelayPeers(void* ctx, char* pubSubTopic, void* resp) { - WAKU_CALL (waku_relay_get_num_connected_peers(ctx, pubSubTopic, (WakuCallBack) callback, resp) ); + WAKU_CALL (waku_relay_get_num_connected_peers(ctx, pubSubTopic, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuGetConnectedPeers(void* wakuCtx, void* resp) { - WAKU_CALL (waku_get_connected_peers(wakuCtx, (WakuCallBack) callback, resp) ); + WAKU_CALL (waku_get_connected_peers(wakuCtx, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuGetPeerIdsFromPeerStore(void* wakuCtx, void* resp) { - WAKU_CALL (waku_get_peerids_from_peerstore(wakuCtx, (WakuCallBack) callback, resp) ); + WAKU_CALL (waku_get_peerids_from_peerstore(wakuCtx, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuLightpushPublish(void* wakuCtx, @@ -251,7 +247,7 @@ package waku WAKU_CALL (waku_lightpush_publish(wakuCtx, pubSubTopic, jsonWakuMessage, - (WakuCallBack) callback, + (WakuCallBack) GoCallback, resp)); } @@ -265,7 +261,7 @@ package waku jsonQuery, peerAddr, timeoutMs, - (WakuCallBack) callback, + (WakuCallBack) GoCallback, resp)); } @@ -275,7 +271,7 @@ package waku WAKU_CALL (waku_peer_exchange_request(wakuCtx, numPeers, - (WakuCallBack) callback, + (WakuCallBack) GoCallback, resp)); } @@ -285,7 +281,7 @@ package waku WAKU_CALL (waku_get_peerids_by_protocol(wakuCtx, protocol, - (WakuCallBack) callback, + (WakuCallBack) GoCallback, resp)); } @@ -299,7 +295,7 @@ package waku entTreeUrl, nameDnsServer, timeoutMs, - (WakuCallBack) callback, + (WakuCallBack) GoCallback, resp)); } @@ -313,6 +309,7 @@ import ( "runtime" "strconv" "strings" + "sync" "time" "unsafe" @@ -421,84 +418,75 @@ func (w *Waku) DropPeer(peerID peer.ID) error { return w.node.DisconnectPeerByID(peerID) } -type request struct { - id string - reqType requestType - input any - responseCh chan response -} - type response struct { err error value any } -// WakuNode represents an instance of an nwaku node -type WakuNode struct { - wakuCtx unsafe.Pointer - cancel context.CancelFunc - requestCh chan *request +//export GoCallback +func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) { + if resp != nil { + m := (*C.Resp)(resp) + m.ret = ret + m.msg = msg + m.len = len + wg := (*sync.WaitGroup)(m.ffiWg) + wg.Done() + } } -type requestType int - -const ( - requestTypeNew requestType = iota + 1 - requestTypePing - requestTypeStart - requestTypeRelayPublish - requestTypeStoreQuery - requestTypePeerID - requestTypeStop - requestTypeDestroy - requestTypeStartDiscV5 - requestTypeStopDiscV5 - requestTypeVersion - requestTypeRelaySubscribe - requestTypeRelayUnsubscribe - requestTypePeerExchangeRequest - requestTypeConnect - requestTypeDialPeerByID - requestTypeListenAddresses - requestTypeENR - requestTypeGetNumPeersInMesh - requestTypeGetConnectedPeers - requestTypeGetPeerIDsFromPeerStore - requestTypeGetPeerIDsByProtocol - requestTypeDisconnectPeerByID - requestTypeDnsDiscovery - requestTypeDialPeer - requestTypeGetNumConnectedRelayPeers -) +// WakuNode represents an instance of an nwaku node +type WakuNode struct { + wakuCtx unsafe.Pointer + cancel context.CancelFunc +} func newWakuNode(ctx context.Context, config *WakuConfig) (*WakuNode, error) { ctx, cancel := context.WithCancel(ctx) n := &WakuNode{ - requestCh: make(chan *request), - cancel: cancel, + cancel: cancel, } - // Notice this runs insto a separate goroutine. This is because we can't be sure - // from which OS thread will go call nwaku operations (They need to be done from - // the same thread that started nwaku). Communication with the goroutine to send - // operations to nwaku will be done via channels + wg := sync.WaitGroup{} + wg.Add(1) go func() { - // defer gocommon.LogOnPanic() TODO-nwaku + // defer gocommon.LogOnPanic() runtime.LockOSThread() defer runtime.UnlockOSThread() - C.waku_setup() + wg.Done() - n.processLoop(ctx) + <-ctx.Done() }() - _, err := n.postTask(requestTypeNew, config) + wg.Wait() + + jsonConfig, err := json.Marshal(config) if err != nil { - cancel() return nil, err } + + var cJsonConfig = C.CString(string(jsonConfig)) + + var resp = C.allocResp(unsafe.Pointer(&wg)) + + defer C.free(unsafe.Pointer(cJsonConfig)) + defer C.freeResp(resp) + + if C.getRet(resp) != C.RET_OK { + errMsg := "error wakuNew: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return nil, errors.New(errMsg) + } + + wg.Add(1) + n.wakuCtx = C.cGoWakuNew(cJsonConfig, resp) + wg.Wait() + + // Notice that the events for self node are handled by the 'MyEventCallback' method + C.cGoWakuSetEventCallback(n.wakuCtx) + return n, nil } @@ -530,20 +518,6 @@ func New(nwakuCfg *WakuConfig, logger *zap.Logger) (*Waku, error) { }, nil } -func (n *WakuNode) postTask(reqType requestType, input any) (any, error) { - responseCh := make(chan response) - n.requestCh <- &request{ - reqType: reqType, - input: input, - responseCh: responseCh, - } - response := <-responseCh - if response.err != nil { - return nil, response.err - } - return response.value, nil -} - //export globalEventCallback func globalEventCallback(callerRet C.int, msg *C.char, len C.size_t, userData unsafe.Pointer) { // This is shared among all Golang instances @@ -552,82 +526,7 @@ func globalEventCallback(callerRet C.int, msg *C.char, len C.size_t, userData un // self.MyEventCallback(callerRet, msg, len) } -func (n *WakuNode) processLoop(ctx context.Context) { - for req := range n.requestCh { - switch req.reqType { - case requestTypeNew: - req.responseCh <- response{err: n.newNode(req.input.(*WakuConfig))} - case requestTypePing: - duration, err := n.pingPeer(req.input.(pingRequest)) - req.responseCh <- response{value: duration, err: err} - case requestTypeStart: - req.responseCh <- response{err: n.start()} - case requestTypeRelayPublish: - hash, err := n.relayPublish(req.input.(relayPublishRequest)) - req.responseCh <- response{value: hash, err: err} - case requestTypeStoreQuery: - results, err := n.storeQuery(req.input.(storeQueryRequest)) - req.responseCh <- response{value: results, err: err} - case requestTypeDestroy: - req.responseCh <- response{err: n.destroy()} - case requestTypePeerID: - peerID, err := n.peerID() - req.responseCh <- response{value: peerID, err: err} - case requestTypeStop: - req.responseCh <- response{err: n.stop()} - case requestTypeStartDiscV5: - req.responseCh <- response{err: n.startDiscV5()} - case requestTypeStopDiscV5: - req.responseCh <- response{err: n.stopDiscV5()} - case requestTypeVersion: - version, err := n.version() - req.responseCh <- response{value: version, err: err} - case requestTypePeerExchangeRequest: - numPeers, err := n.peerExchangeRequest(req.input.(uint64)) - req.responseCh <- response{value: numPeers, err: err} - case requestTypeRelaySubscribe: - req.responseCh <- response{err: n.relaySubscribe(req.input.(string))} - case requestTypeRelayUnsubscribe: - req.responseCh <- response{err: n.relayUnsubscribe(req.input.(string))} - case requestTypeConnect: - req.responseCh <- response{err: n.connect(req.input.(connectRequest))} - case requestTypeDialPeerByID: - req.responseCh <- response{err: n.dialPeerById(req.input.(dialPeerByIDRequest))} - case requestTypeListenAddresses: - addrs, err := n.listenAddresses() - req.responseCh <- response{value: addrs, err: err} - case requestTypeENR: - enr, err := n.enr() - req.responseCh <- response{value: enr, err: err} - case requestTypeGetNumPeersInMesh: - numPeers, err := n.getNumPeersInMesh(req.input.(string)) - req.responseCh <- response{value: numPeers, err: err} - case requestTypeGetConnectedPeers: - peers, err := n.getConnectedPeers() - req.responseCh <- response{value: peers, err: err} - case requestTypeGetPeerIDsFromPeerStore: - peers, err := n.getPeerIDsFromPeerStore() - req.responseCh <- response{value: peers, err: err} - case requestTypeGetPeerIDsByProtocol: - peers, err := n.getPeerIDsByProtocol(req.input.(libp2pproto.ID)) - req.responseCh <- response{value: peers, err: err} - case requestTypeDisconnectPeerByID: - req.responseCh <- response{err: n.disconnectPeerByID(req.input.(peer.ID))} - case requestTypeDnsDiscovery: - addrs, err := n.dnsDiscovery(req.input.(dnsDiscoveryRequest)) - req.responseCh <- response{value: addrs, err: err} - case requestTypeDialPeer: - req.responseCh <- response{err: n.dialPeer(req.input.(dialPeerRequest))} - case requestTypeGetNumConnectedRelayPeers: - numPeers, err := n.getNumConnectedRelayPeers(req.input.([]string)...) - req.responseCh <- response{value: numPeers, err: err} - default: - req.responseCh <- response{err: errors.New("invalid operation")} - } - } -} - -func (n *WakuNode) getNumConnectedRelayPeers(optPubsubTopic ...string) (int, error) { +func (n *WakuNode) GetNumConnectedRelayPeers(optPubsubTopic ...string) (int, error) { var pubsubTopic string if len(optPubsubTopic) == 0 { pubsubTopic = "" @@ -635,9 +534,12 @@ func (n *WakuNode) getNumConnectedRelayPeers(optPubsubTopic ...string) (int, err pubsubTopic = optPubsubTopic[0] } - var resp = C.allocResp() - var cPubsubTopic = C.CString(pubsubTopic) + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) + + var cPubsubTopic = C.CString(pubsubTopic) defer C.free(unsafe.Pointer(cPubsubTopic)) C.cGoWakuGetNumConnectedRelayPeers(n.wakuCtx, cPubsubTopic, resp) @@ -656,13 +558,17 @@ func (n *WakuNode) getNumConnectedRelayPeers(optPubsubTopic ...string) (int, err return 0, errors.New(errMsg) } -func (n *WakuNode) disconnectPeerByID(peerID peer.ID) error { - var resp = C.allocResp() +func (n *WakuNode) DisconnectPeerByID(peerID peer.ID) error { + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) var cPeerId = C.CString(peerID.String()) defer C.freeResp(resp) defer C.free(unsafe.Pointer(cPeerId)) + wg.Add(1) C.cGoWakuDisconnectPeerById(n.wakuCtx, cPeerId, resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { return nil @@ -671,10 +577,16 @@ func (n *WakuNode) disconnectPeerByID(peerID peer.ID) error { return errors.New(errMsg) } -func (n *WakuNode) getConnectedPeers() (peer.IDSlice, error) { - var resp = C.allocResp() +func (n *WakuNode) GetConnectedPeers() (peer.IDSlice, error) { + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) + + wg.Add(1) C.cGoWakuGetConnectedPeers(n.wakuCtx, resp) + wg.Wait() + if C.getRet(resp) == C.RET_OK { peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) if peersStr == "" { @@ -697,12 +609,14 @@ func (n *WakuNode) getConnectedPeers() (peer.IDSlice, error) { } -func (n *WakuNode) relaySubscribe(pubsubTopic string) error { +func (n *WakuNode) RelaySubscribe(pubsubTopic string) error { if pubsubTopic == "" { return errors.New("pubsub topic is empty") } - var resp = C.allocResp() + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) var cPubsubTopic = C.CString(pubsubTopic) defer C.freeResp(resp) @@ -712,7 +626,9 @@ func (n *WakuNode) relaySubscribe(pubsubTopic string) error { return errors.New("wakuCtx is nil") } + wg.Add(1) C.cGoWakuRelaySubscribe(n.wakuCtx, cPubsubTopic, resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { return nil @@ -722,12 +638,14 @@ func (n *WakuNode) relaySubscribe(pubsubTopic string) error { return errors.New(errMsg) } -func (n *WakuNode) relayUnsubscribe(pubsubTopic string) error { +func (n *WakuNode) RelayUnsubscribe(pubsubTopic string) error { if pubsubTopic == "" { return errors.New("pubsub topic is empty") } - var resp = C.allocResp() + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) var cPubsubTopic = C.CString(pubsubTopic) defer C.freeResp(resp) @@ -737,7 +655,9 @@ func (n *WakuNode) relayUnsubscribe(pubsubTopic string) error { return errors.New("wakuCtx is nil") } + wg.Add(1) C.cGoWakuRelayUnsubscribe(n.wakuCtx, cPubsubTopic, resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { return nil @@ -747,11 +667,15 @@ func (n *WakuNode) relayUnsubscribe(pubsubTopic string) error { return errors.New(errMsg) } -func (n *WakuNode) peerExchangeRequest(numPeers uint64) (uint64, error) { - var resp = C.allocResp() +func (n *WakuNode) PeerExchangeRequest(numPeers uint64) (uint64, error) { + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) + wg.Add(1) C.cGoWakuPeerExchangeQuery(n.wakuCtx, C.uint64_t(numPeers), resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { numRecvPeersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) numRecvPeers, err := strconv.ParseUint(numRecvPeersStr, 10, 64) @@ -765,11 +689,15 @@ func (n *WakuNode) peerExchangeRequest(numPeers uint64) (uint64, error) { return 0, errors.New(errMsg) } -func (n *WakuNode) startDiscV5() error { - var resp = C.allocResp() - defer C.freeResp(resp) - C.cGoWakuStartDiscV5(n.wakuCtx, resp) +func (n *WakuNode) StartDiscV5() error { + wg := sync.WaitGroup{} + var resp = C.allocResp(unsafe.Pointer(&wg)) + defer C.freeResp(resp) + + wg.Add(1) + C.cGoWakuStartDiscV5(n.wakuCtx, resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { return nil } @@ -777,10 +705,15 @@ func (n *WakuNode) startDiscV5() error { return errors.New(errMsg) } -func (n *WakuNode) stopDiscV5() error { - var resp = C.allocResp() +func (n *WakuNode) StopDiscV5() error { + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) + + wg.Add(1) C.cGoWakuStopDiscV5(n.wakuCtx, resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { return nil @@ -789,11 +722,15 @@ func (n *WakuNode) stopDiscV5() error { return errors.New(errMsg) } -func (n *WakuNode) version() (string, error) { - var resp = C.allocResp() +func (n *WakuNode) Version() (string, error) { + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) + wg.Add(1) C.cGoWakuVersion(n.wakuCtx, resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { var version = C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) @@ -805,29 +742,34 @@ func (n *WakuNode) version() (string, error) { return "", errors.New(errMsg) } -func (n *WakuNode) storeQuery(storeQueryRequest storeQueryRequest) (any, error) { +func (n *WakuNode) StoreQuery(ctx context.Context, storeRequest *storepb.StoreQueryRequest, peerInfo peer.AddrInfo) (*storepb.StoreQueryResponse, error) { // TODO: extract timeout from context timeoutMs := time.Minute.Milliseconds() - b, err := json.Marshal(storeQueryRequest.storeRequest) + b, err := json.Marshal(storeRequest) if err != nil { return nil, err } - addrs := make([]string, len(storeQueryRequest.peerInfo.Addrs)) - for i, addr := range utils.EncapsulatePeerID(storeQueryRequest.peerInfo.ID, storeQueryRequest.peerInfo.Addrs...) { + addrs := make([]string, len(peerInfo.Addrs)) + for i, addr := range utils.EncapsulatePeerID(peerInfo.ID, peerInfo.Addrs...) { addrs[i] = addr.String() } var cJsonQuery = C.CString(string(b)) var cPeerAddr = C.CString(strings.Join(addrs, ",")) - var resp = C.allocResp() + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.free(unsafe.Pointer(cJsonQuery)) defer C.free(unsafe.Pointer(cPeerAddr)) defer C.freeResp(resp) + wg.Add(1) C.cGoWakuStoreQuery(n.wakuCtx, cJsonQuery, cPeerAddr, C.int(timeoutMs), resp) + wg.Wait() + if C.getRet(resp) == C.RET_OK { jsonResponseStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) storeQueryResponse := &storepb.StoreQueryResponse{} @@ -842,24 +784,27 @@ func (n *WakuNode) storeQuery(storeQueryRequest storeQueryRequest) (any, error) return nil, errors.New(errMsg) } -func (n *WakuNode) relayPublish(relayPublishRequest relayPublishRequest) (pb.MessageHash, error) { +func (n *WakuNode) RelayPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string) (pb.MessageHash, error) { // TODO: extract timeout from context timeoutMs := time.Minute.Milliseconds() - jsonMsg, err := json.Marshal(relayPublishRequest.message) + jsonMsg, err := json.Marshal(message) if err != nil { return pb.MessageHash{}, err } - var cPubsubTopic = C.CString(relayPublishRequest.pubsubTopic) - var msg = C.CString(string(jsonMsg)) - var resp = C.allocResp() + wg := sync.WaitGroup{} + var resp = C.allocResp(unsafe.Pointer(&wg)) + var cPubsubTopic = C.CString(pubsubTopic) + var msg = C.CString(string(jsonMsg)) defer C.freeResp(resp) defer C.free(unsafe.Pointer(cPubsubTopic)) defer C.free(unsafe.Pointer(msg)) + wg.Add(1) C.cGoWakuRelayPublish(n.wakuCtx, cPubsubTopic, msg, C.int(timeoutMs), resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { msgHash := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) msgHashBytes, err := hexutil.Decode(msgHash) @@ -872,15 +817,20 @@ func (n *WakuNode) relayPublish(relayPublishRequest relayPublishRequest) (pb.Mes return pb.MessageHash{}, errors.New(errMsg) } -func (n *WakuNode) dnsDiscovery(dnsDiscRequest dnsDiscoveryRequest) ([]multiaddr.Multiaddr, error) { - var resp = C.allocResp() - var cEnrTree = C.CString(dnsDiscRequest.enrTreeUrl) - var cDnsServer = C.CString(dnsDiscRequest.nameDnsServer) +func (n *WakuNode) DnsDiscovery(ctx context.Context, enrTreeUrl string, nameDnsServer string) ([]multiaddr.Multiaddr, error) { + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) + var cEnrTree = C.CString(enrTreeUrl) + var cDnsServer = C.CString(nameDnsServer) defer C.freeResp(resp) defer C.free(unsafe.Pointer(cEnrTree)) defer C.free(unsafe.Pointer(cDnsServer)) + // TODO: extract timeout from context + wg.Add(1) C.cGoWakuDnsDiscovery(n.wakuCtx, cEnrTree, cDnsServer, C.int(time.Minute.Milliseconds()), resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { var addrsRet []multiaddr.Multiaddr nodeAddresses := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) @@ -898,21 +848,24 @@ func (n *WakuNode) dnsDiscovery(dnsDiscRequest dnsDiscoveryRequest) ([]multiaddr return nil, errors.New(errMsg) } -func (n *WakuNode) pingPeer(request pingRequest) (time.Duration, error) { - peerInfo := request.peerInfo - +func (n *WakuNode) PingPeer(ctx context.Context, peerInfo peer.AddrInfo) (time.Duration, error) { addrs := make([]string, len(peerInfo.Addrs)) for i, addr := range utils.EncapsulatePeerID(peerInfo.ID, peerInfo.Addrs...) { addrs[i] = addr.String() } - var resp = C.allocResp() - var cPeerId = C.CString(strings.Join(addrs, ",")) + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) + + var cPeerId = C.CString(strings.Join(addrs, ",")) defer C.free(unsafe.Pointer(cPeerId)) // TODO: extract timeout from ctx + wg.Add(1) C.cGoWakuPingPeer(n.wakuCtx, cPeerId, C.int(time.Minute.Milliseconds()), resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { rttStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) rttInt, err := strconv.ParseInt(rttStr, 10, 64) @@ -926,12 +879,15 @@ func (n *WakuNode) pingPeer(request pingRequest) (time.Duration, error) { return 0, fmt.Errorf("PingPeer: %s", errMsg) } -func (n *WakuNode) start() error { - var resp = C.allocResp() +func (n *WakuNode) Start() error { + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) + wg.Add(1) C.cGoWakuStart(n.wakuCtx, resp) - + wg.Wait() if C.getRet(resp) == C.RET_OK { return nil } @@ -940,12 +896,15 @@ func (n *WakuNode) start() error { return errors.New(errMsg) } -func (n *WakuNode) stop() error { - var resp = C.allocResp() +func (n *WakuNode) Stop() error { + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) + wg.Add(1) C.cGoWakuStop(n.wakuCtx, resp) - + wg.Wait() if C.getRet(resp) == C.RET_OK { return nil } @@ -954,12 +913,15 @@ func (n *WakuNode) stop() error { return errors.New(errMsg) } -func (n *WakuNode) destroy() error { - var resp = C.allocResp() +func (n *WakuNode) Destroy() error { + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) + wg.Add(1) C.cGoWakuDestroy(n.wakuCtx, resp) - + wg.Wait() if C.getRet(resp) == C.RET_OK { return nil } @@ -968,12 +930,15 @@ func (n *WakuNode) destroy() error { return errors.New(errMsg) } -func (n *WakuNode) peerID() (peer.ID, error) { - var resp = C.allocResp() +func (n *WakuNode) PeerID() (peer.ID, error) { + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) + wg.Add(1) C.cGoWakuGetMyPeerId(n.wakuCtx, resp) - + wg.Wait() if C.getRet(resp) == C.RET_OK { peerIdStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) id, err := peer.Decode(peerIdStr) @@ -987,15 +952,18 @@ func (n *WakuNode) peerID() (peer.ID, error) { return "", errors.New(errMsg) } -func (n *WakuNode) connect(connReq connectRequest) error { - var resp = C.allocResp() - var cPeerMultiAddr = C.CString(connReq.addr.String()) +func (n *WakuNode) Connect(ctx context.Context, addr multiaddr.Multiaddr) error { + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) + var cPeerMultiAddr = C.CString(addr.String()) defer C.freeResp(resp) defer C.free(unsafe.Pointer(cPeerMultiAddr)) // TODO: extract timeout from ctx + wg.Add(1) C.cGoWakuConnect(n.wakuCtx, cPeerMultiAddr, C.int(time.Minute.Milliseconds()), resp) - + wg.Wait() if C.getRet(resp) == C.RET_OK { return nil } @@ -1004,17 +972,20 @@ func (n *WakuNode) connect(connReq connectRequest) error { return errors.New(errMsg) } -func (n *WakuNode) dialPeerById(dialPeerByIDReq dialPeerByIDRequest) error { - var resp = C.allocResp() - var cPeerId = C.CString(dialPeerByIDReq.peerID.String()) - var cProtocol = C.CString(string(dialPeerByIDReq.protocol)) +func (n *WakuNode) DialPeerByID(ctx context.Context, peerID peer.ID, protocol libp2pproto.ID) error { + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) + var cPeerId = C.CString(peerID.String()) + var cProtocol = C.CString(string(protocol)) defer C.freeResp(resp) defer C.free(unsafe.Pointer(cPeerId)) defer C.free(unsafe.Pointer(cProtocol)) // TODO: extract timeout from ctx + wg.Add(1) C.cGoWakuDialPeerById(n.wakuCtx, cPeerId, cProtocol, C.int(time.Minute.Milliseconds()), resp) - + wg.Wait() if C.getRet(resp) == C.RET_OK { return nil } @@ -1023,11 +994,15 @@ func (n *WakuNode) dialPeerById(dialPeerByIDReq dialPeerByIDRequest) error { return errors.New(errMsg) } -func (n *WakuNode) listenAddresses() ([]multiaddr.Multiaddr, error) { - var resp = C.allocResp() - defer C.freeResp(resp) - C.cGoWakuListenAddresses(n.wakuCtx, resp) +func (n *WakuNode) ListenAddresses() ([]multiaddr.Multiaddr, error) { + wg := sync.WaitGroup{} + var resp = C.allocResp(unsafe.Pointer(&wg)) + defer C.freeResp(resp) + + wg.Add(1) + C.cGoWakuListenAddresses(n.wakuCtx, resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { var addrsRet []multiaddr.Multiaddr listenAddresses := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) @@ -1045,11 +1020,15 @@ func (n *WakuNode) listenAddresses() ([]multiaddr.Multiaddr, error) { return nil, errors.New(errMsg) } -func (n *WakuNode) enr() (*enode.Node, error) { - var resp = C.allocResp() - defer C.freeResp(resp) - C.cGoWakuGetMyENR(n.wakuCtx, resp) +func (n *WakuNode) ENR() (*enode.Node, error) { + wg := sync.WaitGroup{} + var resp = C.allocResp(unsafe.Pointer(&wg)) + defer C.freeResp(resp) + + wg.Add(1) + C.cGoWakuGetMyENR(n.wakuCtx, resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { enrStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) n, err := enode.Parse(enode.ValidSchemes, enrStr) @@ -1063,14 +1042,17 @@ func (n *WakuNode) enr() (*enode.Node, error) { return nil, errors.New(errMsg) } -func (n *WakuNode) getNumPeersInMesh(pubsubTopic string) (int, error) { - var resp = C.allocResp() +func (n *WakuNode) GetNumPeersInMesh(pubsubTopic string) (int, error) { + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) var cPubsubTopic = C.CString(pubsubTopic) defer C.freeResp(resp) defer C.free(unsafe.Pointer(cPubsubTopic)) + wg.Add(1) C.cGoWakuGetNumPeersInMesh(n.wakuCtx, cPubsubTopic, resp) - + wg.Wait() if C.getRet(resp) == C.RET_OK { numPeersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) numPeers, err := strconv.Atoi(numPeersStr) @@ -1085,11 +1067,15 @@ func (n *WakuNode) getNumPeersInMesh(pubsubTopic string) (int, error) { return 0, errors.New(errMsg) } -func (n *WakuNode) getPeerIDsFromPeerStore() (peer.IDSlice, error) { - var resp = C.allocResp() - defer C.freeResp(resp) - C.cGoWakuGetPeerIdsFromPeerStore(n.wakuCtx, resp) +func (n *WakuNode) GetPeerIDsFromPeerStore() (peer.IDSlice, error) { + wg := sync.WaitGroup{} + var resp = C.allocResp(unsafe.Pointer(&wg)) + defer C.freeResp(resp) + + wg.Add(1) + C.cGoWakuGetPeerIdsFromPeerStore(n.wakuCtx, resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) if peersStr == "" { @@ -1112,14 +1098,17 @@ func (n *WakuNode) getPeerIDsFromPeerStore() (peer.IDSlice, error) { return nil, fmt.Errorf("GetPeerIdsFromPeerStore: %s", errMsg) } -func (n *WakuNode) getPeerIDsByProtocol(protocolID libp2pproto.ID) (peer.IDSlice, error) { - var resp = C.allocResp() - var cProtocol = C.CString(string(protocolID)) +func (n *WakuNode) GetPeerIDsByProtocol(protocol libp2pproto.ID) (peer.IDSlice, error) { + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) + var cProtocol = C.CString(string(protocol)) defer C.freeResp(resp) defer C.free(unsafe.Pointer(cProtocol)) + wg.Add(1) C.cGoWakuGetPeerIdsByProtocol(n.wakuCtx, cProtocol, resp) - + wg.Wait() if C.getRet(resp) == C.RET_OK { peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) if peersStr == "" { @@ -1143,41 +1132,19 @@ func (n *WakuNode) getPeerIDsByProtocol(protocolID libp2pproto.ID) (peer.IDSlice return nil, fmt.Errorf("GetPeerIdsByProtocol: %s", errMsg) } -func (n *WakuNode) newNode(config *WakuConfig) error { - jsonConfig, err := json.Marshal(config) - if err != nil { - return err - } +func (n *WakuNode) DialPeer(ctx context.Context, peerAddr multiaddr.Multiaddr, protocol libp2pproto.ID) error { + wg := sync.WaitGroup{} - var cJsonConfig = C.CString(string(jsonConfig)) - var resp = C.allocResp() - - defer C.free(unsafe.Pointer(cJsonConfig)) - defer C.freeResp(resp) - - if C.getRet(resp) != C.RET_OK { - errMsg := "error wakuNew: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return errors.New(errMsg) - } - - wakuCtx := C.cGoWakuNew(cJsonConfig, resp) - n.wakuCtx = unsafe.Pointer(wakuCtx) - - // Notice that the events for self node are handled by the 'MyEventCallback' method - C.cGoWakuSetEventCallback(n.wakuCtx) - - return nil -} - -func (n *WakuNode) dialPeer(dialPeerReq dialPeerRequest) error { - var resp = C.allocResp() - var cPeerMultiAddr = C.CString(dialPeerReq.peerAddr.String()) - var cProtocol = C.CString(string(dialPeerReq.protocol)) + var resp = C.allocResp(unsafe.Pointer(&wg)) + var cPeerMultiAddr = C.CString(peerAddr.String()) + var cProtocol = C.CString(string(protocol)) defer C.freeResp(resp) defer C.free(unsafe.Pointer(cPeerMultiAddr)) defer C.free(unsafe.Pointer(cProtocol)) // TODO: extract timeout from context + wg.Add(1) C.cGoWakuDialPeer(n.wakuCtx, cPeerMultiAddr, cProtocol, C.int(requestTimeout.Milliseconds()), resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { return nil } @@ -1185,177 +1152,6 @@ func (n *WakuNode) dialPeer(dialPeerReq dialPeerRequest) error { return errors.New(errMsg) } -type pingRequest struct { - ctx context.Context - peerInfo peer.AddrInfo -} - -func (n *WakuNode) PingPeer(ctx context.Context, info peer.AddrInfo) (time.Duration, error) { - response, err := n.postTask(requestTypePing, pingRequest{ - ctx: ctx, - peerInfo: info, - }) - if err != nil { - return 0, err - } - return response.(time.Duration), nil -} - -func (n *WakuNode) Start() error { - _, err := n.postTask(requestTypeStart, nil) - return err -} - -type relayPublishRequest struct { - ctx context.Context - pubsubTopic string - message *pb.WakuMessage -} - -func (n *WakuNode) RelayPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string) (pb.MessageHash, error) { - response, err := n.postTask(requestTypeRelayPublish, relayPublishRequest{ - ctx: ctx, - pubsubTopic: pubsubTopic, - message: message, - }) - if err != nil { - return pb.MessageHash{}, err - } - return response.(pb.MessageHash), nil -} - -type storeQueryRequest struct { - ctx context.Context - storeRequest *storepb.StoreQueryRequest - peerInfo peer.AddrInfo -} - -func (n *WakuNode) StoreQuery(ctx context.Context, storeRequest *storepb.StoreQueryRequest, peerInfo peer.AddrInfo) (*storepb.StoreQueryResponse, error) { - response, err := n.postTask(requestTypeStoreQuery, storeQueryRequest{ - ctx: ctx, - peerInfo: peerInfo, - storeRequest: storeRequest, - }) - if err != nil { - return nil, err - } - return response.(*storepb.StoreQueryResponse), nil -} - -func (n *WakuNode) PeerID() (peer.ID, error) { - response, err := n.postTask(requestTypePeerID, nil) - if err != nil { - return "", err - } - return response.(peer.ID), nil -} - -func (n *WakuNode) Stop() error { - _, err := n.postTask(requestTypeStop, nil) - return err -} - -func (n *WakuNode) Destroy() error { - _, err := n.postTask(requestTypeDestroy, nil) - return err -} - -func (n *WakuNode) StartDiscV5() error { - _, err := n.postTask(requestTypeStartDiscV5, nil) - return err -} - -func (n *WakuNode) StopDiscV5() error { - _, err := n.postTask(requestTypeStopDiscV5, nil) - return err -} - -func (n *WakuNode) Version() (string, error) { - response, err := n.postTask(requestTypeVersion, nil) - if err != nil { - return "", err - } - return response.(string), nil -} - -func (n *WakuNode) RelaySubscribe(pubsubTopic string) error { - _, err := n.postTask(requestTypeRelaySubscribe, pubsubTopic) - return err -} - -func (n *WakuNode) RelayUnsubscribe(pubsubTopic string) error { - _, err := n.postTask(requestTypeRelayUnsubscribe, pubsubTopic) - return err -} - -func (n *WakuNode) PeerExchangeRequest(numPeers uint64) (uint64, error) { - response, err := n.postTask(requestTypePeerExchangeRequest, numPeers) - if err != nil { - return 0, err - } - return response.(uint64), nil -} - -type connectRequest struct { - ctx context.Context - addr multiaddr.Multiaddr -} - -func (n *WakuNode) Connect(ctx context.Context, addr multiaddr.Multiaddr) error { - _, err := n.postTask(requestTypeConnect, connectRequest{ - ctx: ctx, - addr: addr, - }) - return err -} - -type dialPeerByIDRequest struct { - ctx context.Context - peerID peer.ID - protocol libp2pproto.ID -} - -func (n *WakuNode) DialPeerByID(ctx context.Context, peerID peer.ID, protocol libp2pproto.ID) error { - _, err := n.postTask(requestTypeDialPeerByID, dialPeerByIDRequest{ - ctx: ctx, - peerID: peerID, - protocol: protocol, - }) - return err -} - -func (n *WakuNode) ListenAddresses() ([]multiaddr.Multiaddr, error) { - response, err := n.postTask(requestTypeListenAddresses, nil) - if err != nil { - return nil, err - } - return response.([]multiaddr.Multiaddr), nil -} - -func (n *WakuNode) ENR() (*enode.Node, error) { - response, err := n.postTask(requestTypeENR, nil) - if err != nil { - return nil, err - } - return response.(*enode.Node), nil -} - -func (n *WakuNode) GetNumPeersInMesh(pubsubTopic string) (int, error) { - response, err := n.postTask(requestTypeGetNumPeersInMesh, pubsubTopic) - if err != nil { - return 0, err - } - return response.(int), nil -} - -func (n *WakuNode) GetConnectedPeers() (peer.IDSlice, error) { - response, err := n.postTask(requestTypeGetConnectedPeers, nil) - if err != nil { - return nil, err - } - return response.(peer.IDSlice), nil -} - func (n *WakuNode) GetNumConnectedPeers() (int, error) { peers, err := n.GetConnectedPeers() if err != nil { @@ -1363,65 +1159,3 @@ func (n *WakuNode) GetNumConnectedPeers() (int, error) { } return len(peers), nil } - -func (n *WakuNode) GetPeerIDsFromPeerStore() (peer.IDSlice, error) { - response, err := n.postTask(requestTypeGetPeerIDsFromPeerStore, nil) - if err != nil { - return nil, err - } - return response.(peer.IDSlice), nil -} - -func (n *WakuNode) GetPeerIDsByProtocol(protocol libp2pproto.ID) (peer.IDSlice, error) { - response, err := n.postTask(requestTypeGetPeerIDsByProtocol, protocol) - if err != nil { - return nil, err - } - return response.(peer.IDSlice), nil -} - -func (n *WakuNode) DisconnectPeerByID(peerID peer.ID) error { - _, err := n.postTask(requestTypeDisconnectPeerByID, peerID) - return err -} - -type dnsDiscoveryRequest struct { - ctx context.Context - enrTreeUrl string - nameDnsServer string -} - -func (n *WakuNode) DnsDiscovery(ctx context.Context, enrTreeUrl string, nameDnsServer string) ([]multiaddr.Multiaddr, error) { - response, err := n.postTask(requestTypeDnsDiscovery, dnsDiscoveryRequest{ - ctx: ctx, - enrTreeUrl: enrTreeUrl, - nameDnsServer: nameDnsServer, - }) - if err != nil { - return nil, err - } - return response.([]multiaddr.Multiaddr), nil -} - -type dialPeerRequest struct { - ctx context.Context - peerAddr multiaddr.Multiaddr - protocol libp2pproto.ID -} - -func (n *WakuNode) DialPeer(ctx context.Context, peerAddr multiaddr.Multiaddr, protocol libp2pproto.ID) error { - _, err := n.postTask(requestTypeDialPeer, dialPeerRequest{ - ctx: ctx, - peerAddr: peerAddr, - protocol: protocol, - }) - return err -} - -func (n *WakuNode) GetNumConnectedRelayPeers(paramPubsubTopic ...string) (int, error) { - response, err := n.postTask(requestTypeGetNumConnectedRelayPeers, paramPubsubTopic) - if err != nil { - return 0, err - } - return response.(int), nil -} diff --git a/waku/nwaku_test.go b/waku/nwaku_test.go index f368628..9b9736d 100644 --- a/waku/nwaku_test.go +++ b/waku/nwaku_test.go @@ -31,6 +31,8 @@ func TestBasicWaku(t *testing.T) { storeNodeInfo, err := GetNwakuInfo(nil, &extNodeRestPort) require.NoError(t, err) + // ctx := context.Background() + nwakuConfig := WakuConfig{ Port: 30303, NodeKey: "11d0dcea28e86f81937a3bd1163473c7fbc0a0db54fd72914849bc47bdf78710", @@ -77,11 +79,11 @@ func TestBasicWaku(t *testing.T) { storeNode, err := peer.AddrInfoFromString(storeNodeInfo.ListenAddresses[0]) require.NoError(t, err) - /* for i := 0; i <= 100; i++ { - time.Sleep(2 * time.Second) - } + /* + w.node.DialPeer(ctx, storeNode.Addrs[0], "") - w.StorenodeCycle.SetStorenodeConfigProvider(newTestStorenodeConfigProvider(*storeNode)) */ + w.StorenodeCycle.SetStorenodeConfigProvider(newTestStorenodeConfigProvider(*storeNode)) + */ // Check that we are indeed connected to the store node connectedStoreNodes, err := w.node.GetPeerIDsByProtocol(store.StoreQueryID_v300)