diff --git a/waku/nwaku.go b/waku/nwaku.go index 23ed66f..7bd40bd 100644 --- a/waku/nwaku.go +++ b/waku/nwaku.go @@ -14,10 +14,13 @@ package waku int ret; char* msg; size_t len; + void* wg; } Resp; - static void* allocResp() { - return calloc(1, sizeof(Resp)); + static void* allocResp(void* wg) { + Resp* r = calloc(1, sizeof(Resp)); + r->wg = wg; + return r; } static void freeResp(void* resp) { @@ -51,52 +54,45 @@ package waku } // resp must be set != NULL in case interest on retrieving data from the callback - static void callback(int ret, char* msg, size_t len, void* resp) { - if (resp != NULL) { - Resp* m = (Resp*) resp; - m->ret = ret; - m->msg = msg; - m->len = len; - } - } + void GoCallback(int ret, char* msg, size_t len, void* resp); #define WAKU_CALL(call) \ do { \ - int ret = call; \ - if (ret != 0) { \ - printf("Failed the call to: %s. Returned code: %d\n", #call, ret); \ - exit(1); \ - } \ + int ret = call; \ + if (ret != 0) { \ + printf("Failed the call to: %s. Returned code: %d\n", #call, ret); \ + exit(1); \ + } \ } while (0) static void* cGoWakuNew(const char* configJson, void* resp) { // We pass NULL because we are not interested in retrieving data from this callback - void* ret = waku_new(configJson, (WakuCallBack) callback, resp); + void* ret = waku_new(configJson, (WakuCallBack) GoCallback, resp); return ret; } static void cGoWakuStart(void* wakuCtx, void* resp) { - WAKU_CALL(waku_start(wakuCtx, (WakuCallBack) callback, resp)); + WAKU_CALL(waku_start(wakuCtx, (WakuCallBack) GoCallback, resp)); } static void cGoWakuStop(void* wakuCtx, void* resp) { - WAKU_CALL(waku_stop(wakuCtx, (WakuCallBack) callback, resp)); + WAKU_CALL(waku_stop(wakuCtx, (WakuCallBack) GoCallback, resp)); } static void cGoWakuDestroy(void* wakuCtx, void* resp) { - WAKU_CALL(waku_destroy(wakuCtx, (WakuCallBack) callback, resp)); + WAKU_CALL(waku_destroy(wakuCtx, (WakuCallBack) GoCallback, resp)); } static void cGoWakuStartDiscV5(void* wakuCtx, void* resp) { - WAKU_CALL(waku_start_discv5(wakuCtx, (WakuCallBack) callback, resp)); + WAKU_CALL(waku_start_discv5(wakuCtx, (WakuCallBack) GoCallback, resp)); } static void cGoWakuStopDiscV5(void* wakuCtx, void* resp) { - WAKU_CALL(waku_stop_discv5(wakuCtx, (WakuCallBack) callback, resp)); + WAKU_CALL(waku_stop_discv5(wakuCtx, (WakuCallBack) GoCallback, resp)); } static void cGoWakuVersion(void* wakuCtx, void* resp) { - WAKU_CALL(waku_version(wakuCtx, (WakuCallBack) callback, resp)); + WAKU_CALL(waku_version(wakuCtx, (WakuCallBack) GoCallback, resp)); } static void cGoWakuSetEventCallback(void* wakuCtx) { @@ -127,16 +123,16 @@ package waku appVersion, contentTopicName, encoding, - (WakuCallBack) callback, + (WakuCallBack) GoCallback, resp) ); } static void cGoWakuPubsubTopic(void* wakuCtx, char* topicName, void* resp) { - WAKU_CALL( waku_pubsub_topic(wakuCtx, topicName, (WakuCallBack) callback, resp) ); + WAKU_CALL( waku_pubsub_topic(wakuCtx, topicName, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuDefaultPubsubTopic(void* wakuCtx, void* resp) { - WAKU_CALL (waku_default_pubsub_topic(wakuCtx, (WakuCallBack) callback, resp)); + WAKU_CALL (waku_default_pubsub_topic(wakuCtx, (WakuCallBack) GoCallback, resp)); } static void cGoWakuRelayPublish(void* wakuCtx, @@ -149,14 +145,14 @@ package waku pubSubTopic, jsonWakuMessage, timeoutMs, - (WakuCallBack) callback, + (WakuCallBack) GoCallback, resp)); } static void cGoWakuRelaySubscribe(void* wakuCtx, char* pubSubTopic, void* resp) { WAKU_CALL ( waku_relay_subscribe(wakuCtx, pubSubTopic, - (WakuCallBack) callback, + (WakuCallBack) GoCallback, resp) ); } @@ -164,7 +160,7 @@ package waku WAKU_CALL ( waku_relay_unsubscribe(wakuCtx, pubSubTopic, - (WakuCallBack) callback, + (WakuCallBack) GoCallback, resp) ); } @@ -172,7 +168,7 @@ package waku WAKU_CALL( waku_connect(wakuCtx, peerMultiAddr, timeoutMs, - (WakuCallBack) callback, + (WakuCallBack) GoCallback, resp) ); } @@ -186,7 +182,7 @@ package waku peerMultiAddr, protocol, timeoutMs, - (WakuCallBack) callback, + (WakuCallBack) GoCallback, resp) ); } @@ -200,47 +196,47 @@ package waku peerId, protocol, timeoutMs, - (WakuCallBack) callback, + (WakuCallBack) GoCallback, resp) ); } static void cGoWakuDisconnectPeerById(void* wakuCtx, char* peerId, void* resp) { WAKU_CALL( waku_disconnect_peer_by_id(wakuCtx, peerId, - (WakuCallBack) callback, + (WakuCallBack) GoCallback, resp) ); } static void cGoWakuListenAddresses(void* wakuCtx, void* resp) { - WAKU_CALL (waku_listen_addresses(wakuCtx, (WakuCallBack) callback, resp) ); + WAKU_CALL (waku_listen_addresses(wakuCtx, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuGetMyENR(void* ctx, void* resp) { - WAKU_CALL (waku_get_my_enr(ctx, (WakuCallBack) callback, resp) ); + WAKU_CALL (waku_get_my_enr(ctx, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuGetMyPeerId(void* ctx, void* resp) { - WAKU_CALL (waku_get_my_peerid(ctx, (WakuCallBack) callback, resp) ); + WAKU_CALL (waku_get_my_peerid(ctx, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuPingPeer(void* ctx, char* peerAddr, int timeoutMs, void* resp) { - WAKU_CALL (waku_ping_peer(ctx, peerAddr, timeoutMs, (WakuCallBack) callback, resp) ); + WAKU_CALL (waku_ping_peer(ctx, peerAddr, timeoutMs, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuGetNumPeersInMesh(void* ctx, char* pubSubTopic, void* resp) { - WAKU_CALL (waku_relay_get_num_peers_in_mesh(ctx, pubSubTopic, (WakuCallBack) callback, resp) ); + WAKU_CALL (waku_relay_get_num_peers_in_mesh(ctx, pubSubTopic, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuGetNumConnectedRelayPeers(void* ctx, char* pubSubTopic, void* resp) { - WAKU_CALL (waku_relay_get_num_connected_peers(ctx, pubSubTopic, (WakuCallBack) callback, resp) ); + WAKU_CALL (waku_relay_get_num_connected_peers(ctx, pubSubTopic, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuGetConnectedPeers(void* wakuCtx, void* resp) { - WAKU_CALL (waku_get_connected_peers(wakuCtx, (WakuCallBack) callback, resp) ); + WAKU_CALL (waku_get_connected_peers(wakuCtx, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuGetPeerIdsFromPeerStore(void* wakuCtx, void* resp) { - WAKU_CALL (waku_get_peerids_from_peerstore(wakuCtx, (WakuCallBack) callback, resp) ); + WAKU_CALL (waku_get_peerids_from_peerstore(wakuCtx, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuLightpushPublish(void* wakuCtx, @@ -251,7 +247,7 @@ package waku WAKU_CALL (waku_lightpush_publish(wakuCtx, pubSubTopic, jsonWakuMessage, - (WakuCallBack) callback, + (WakuCallBack) GoCallback, resp)); } @@ -265,7 +261,7 @@ package waku jsonQuery, peerAddr, timeoutMs, - (WakuCallBack) callback, + (WakuCallBack) GoCallback, resp)); } @@ -275,7 +271,7 @@ package waku WAKU_CALL (waku_peer_exchange_request(wakuCtx, numPeers, - (WakuCallBack) callback, + (WakuCallBack) GoCallback, resp)); } @@ -285,7 +281,7 @@ package waku WAKU_CALL (waku_get_peerids_by_protocol(wakuCtx, protocol, - (WakuCallBack) callback, + (WakuCallBack) GoCallback, resp)); } @@ -299,7 +295,7 @@ package waku entTreeUrl, nameDnsServer, timeoutMs, - (WakuCallBack) callback, + (WakuCallBack) GoCallback, resp)); } @@ -313,6 +309,7 @@ import ( "runtime" "strconv" "strings" + "sync" "time" "unsafe" @@ -421,84 +418,75 @@ func (w *Waku) DropPeer(peerID peer.ID) error { return w.node.DisconnectPeerByID(peerID) } -type request struct { - id string - reqType requestType - input any - responseCh chan response -} - type response struct { err error value any } -// WakuNode represents an instance of an nwaku node -type WakuNode struct { - wakuCtx unsafe.Pointer - cancel context.CancelFunc - requestCh chan *request +//export GoCallback +func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) { + if resp != nil { + m := (*C.Resp)(resp) + m.ret = ret + m.msg = msg + m.len = len + wg := (*sync.WaitGroup)(m.wg) + wg.Done() + } } -type requestType int - -const ( - requestTypeNew requestType = iota + 1 - requestTypePing - requestTypeStart - requestTypeRelayPublish - requestTypeStoreQuery - requestTypePeerID - requestTypeStop - requestTypeDestroy - requestTypeStartDiscV5 - requestTypeStopDiscV5 - requestTypeVersion - requestTypeRelaySubscribe - requestTypeRelayUnsubscribe - requestTypePeerExchangeRequest - requestTypeConnect - requestTypeDialPeerByID - requestTypeListenAddresses - requestTypeENR - requestTypeGetNumPeersInMesh - requestTypeGetConnectedPeers - requestTypeGetPeerIDsFromPeerStore - requestTypeGetPeerIDsByProtocol - requestTypeDisconnectPeerByID - requestTypeDnsDiscovery - requestTypeDialPeer - requestTypeGetNumConnectedRelayPeers -) +// WakuNode represents an instance of an nwaku node +type WakuNode struct { + wakuCtx unsafe.Pointer + cancel context.CancelFunc +} func newWakuNode(ctx context.Context, config *WakuConfig) (*WakuNode, error) { ctx, cancel := context.WithCancel(ctx) n := &WakuNode{ - requestCh: make(chan *request), - cancel: cancel, + cancel: cancel, } - // Notice this runs insto a separate goroutine. This is because we can't be sure - // from which OS thread will go call nwaku operations (They need to be done from - // the same thread that started nwaku). Communication with the goroutine to send - // operations to nwaku will be done via channels + wg := sync.WaitGroup{} + wg.Add(1) go func() { - // defer gocommon.LogOnPanic() TODO-nwaku + // defer gocommon.LogOnPanic() runtime.LockOSThread() defer runtime.UnlockOSThread() - C.waku_setup() + wg.Done() - n.processLoop(ctx) + <-ctx.Done() }() - _, err := n.postTask(requestTypeNew, config) + wg.Wait() + + jsonConfig, err := json.Marshal(config) if err != nil { - cancel() return nil, err } + + var cJsonConfig = C.CString(string(jsonConfig)) + + var resp = C.allocResp(unsafe.Pointer(&wg)) + + defer C.free(unsafe.Pointer(cJsonConfig)) + defer C.freeResp(resp) + + if C.getRet(resp) != C.RET_OK { + errMsg := "error wakuNew: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return nil, errors.New(errMsg) + } + + wg.Add(1) + n.wakuCtx = C.cGoWakuNew(cJsonConfig, resp) + wg.Wait() + + // Notice that the events for self node are handled by the 'MyEventCallback' method + C.cGoWakuSetEventCallback(n.wakuCtx) + return n, nil } @@ -530,20 +518,6 @@ func New(nwakuCfg *WakuConfig, logger *zap.Logger) (*Waku, error) { }, nil } -func (n *WakuNode) postTask(reqType requestType, input any) (any, error) { - responseCh := make(chan response) - n.requestCh <- &request{ - reqType: reqType, - input: input, - responseCh: responseCh, - } - response := <-responseCh - if response.err != nil { - return nil, response.err - } - return response.value, nil -} - //export globalEventCallback func globalEventCallback(callerRet C.int, msg *C.char, len C.size_t, userData unsafe.Pointer) { // This is shared among all Golang instances @@ -552,81 +526,6 @@ func globalEventCallback(callerRet C.int, msg *C.char, len C.size_t, userData un // self.MyEventCallback(callerRet, msg, len) } -func (n *WakuNode) processLoop(ctx context.Context) { - for req := range n.requestCh { - switch req.reqType { - case requestTypeNew: - req.responseCh <- response{err: n.newNode(req.input.(*WakuConfig))} - case requestTypePing: - duration, err := n.pingPeer(req.input.(pingRequest)) - req.responseCh <- response{value: duration, err: err} - case requestTypeStart: - req.responseCh <- response{err: n.start()} - case requestTypeRelayPublish: - hash, err := n.relayPublish(req.input.(relayPublishRequest)) - req.responseCh <- response{value: hash, err: err} - case requestTypeStoreQuery: - results, err := n.storeQuery(req.input.(storeQueryRequest)) - req.responseCh <- response{value: results, err: err} - case requestTypeDestroy: - req.responseCh <- response{err: n.destroy()} - case requestTypePeerID: - peerID, err := n.peerID() - req.responseCh <- response{value: peerID, err: err} - case requestTypeStop: - req.responseCh <- response{err: n.stop()} - case requestTypeStartDiscV5: - req.responseCh <- response{err: n.startDiscV5()} - case requestTypeStopDiscV5: - req.responseCh <- response{err: n.stopDiscV5()} - case requestTypeVersion: - version, err := n.version() - req.responseCh <- response{value: version, err: err} - case requestTypePeerExchangeRequest: - numPeers, err := n.peerExchangeRequest(req.input.(uint64)) - req.responseCh <- response{value: numPeers, err: err} - case requestTypeRelaySubscribe: - req.responseCh <- response{err: n.relaySubscribe(req.input.(string))} - case requestTypeRelayUnsubscribe: - req.responseCh <- response{err: n.relayUnsubscribe(req.input.(string))} - case requestTypeConnect: - req.responseCh <- response{err: n.connect(req.input.(connectRequest))} - case requestTypeDialPeerByID: - req.responseCh <- response{err: n.dialPeerById(req.input.(dialPeerByIDRequest))} - case requestTypeListenAddresses: - addrs, err := n.listenAddresses() - req.responseCh <- response{value: addrs, err: err} - case requestTypeENR: - enr, err := n.enr() - req.responseCh <- response{value: enr, err: err} - case requestTypeGetNumPeersInMesh: - numPeers, err := n.getNumPeersInMesh(req.input.(string)) - req.responseCh <- response{value: numPeers, err: err} - case requestTypeGetConnectedPeers: - peers, err := n.getConnectedPeers() - req.responseCh <- response{value: peers, err: err} - case requestTypeGetPeerIDsFromPeerStore: - peers, err := n.getPeerIDsFromPeerStore() - req.responseCh <- response{value: peers, err: err} - case requestTypeGetPeerIDsByProtocol: - peers, err := n.getPeerIDsByProtocol(req.input.(libp2pproto.ID)) - req.responseCh <- response{value: peers, err: err} - case requestTypeDisconnectPeerByID: - req.responseCh <- response{err: n.disconnectPeerByID(req.input.(peer.ID))} - case requestTypeDnsDiscovery: - addrs, err := n.dnsDiscovery(req.input.(dnsDiscoveryRequest)) - req.responseCh <- response{value: addrs, err: err} - case requestTypeDialPeer: - req.responseCh <- response{err: n.dialPeer(req.input.(dialPeerRequest))} - case requestTypeGetNumConnectedRelayPeers: - numPeers, err := n.getNumConnectedRelayPeers(req.input.([]string)...) - req.responseCh <- response{value: numPeers, err: err} - default: - req.responseCh <- response{err: errors.New("invalid operation")} - } - } -} - func (n *WakuNode) getNumConnectedRelayPeers(optPubsubTopic ...string) (int, error) { var pubsubTopic string if len(optPubsubTopic) == 0 {