diff --git a/internal/ffi/doc.go b/internal/ffi/doc.go index a9586c2..de4ccfe 100644 --- a/internal/ffi/doc.go +++ b/internal/ffi/doc.go @@ -1,8 +1,7 @@ -// Package ffi holds the cgo bridge over the logos-delivery C libraries: the -// synchronous request/callback plumbing, the global event callback, and the -// handle registry. It exposes Go-typed primitives so the public packages -// (e.g. messaging) stay pure Go. +// Package ffi groups the cgo bridges over the logos-delivery C libraries. // -// Scaffolding only for now; the Messaging API bindings land here (over -// liblogosdelivery) in a follow-up. +// Each C library gets its own subpackage (libwaku now; liblogosdelivery in a +// follow-up) so that a binary links exactly the libraries it imports — the +// two .so files carry overlapping symbols and must never be linked together +// (until logos-delivery#3851 consolidates them). package ffi diff --git a/internal/ffi/libwaku/libwaku.go b/internal/ffi/libwaku/libwaku.go new file mode 100644 index 0000000..5e8ecd0 --- /dev/null +++ b/internal/ffi/libwaku/libwaku.go @@ -0,0 +1,474 @@ +// Package libwaku is the cgo bridge over libwaku (the legacy Kernel API +// library): the synchronous request/callback plumbing, the global event +// callback, and the handle registry. It exposes Go-typed primitives so +// pkg/kernel stays pure Go. +package libwaku + +/* +#include +#include + +// wakuGoCallback (sync request/response) and wakuEventCallback (async events) +// are implemented in Go and exported below. +extern void wakuGoCallback(int ret, char* msg, size_t len, void* resp); +extern void wakuEventCallback(int ret, char* msg, size_t len, void* userData); + +// wakuResp carries a single synchronous call's result back from the callback, +// plus a pointer to the Go sync.WaitGroup the caller blocks on. +typedef struct { + int ret; + char* msg; + size_t len; + void* wg; +} wakuResp; + +static void* allocWakuResp(void* wg) { + wakuResp* r = (wakuResp*) calloc(1, sizeof(wakuResp)); + r->wg = wg; + return r; +} +static void freeWakuResp(void* resp) { if (resp != NULL) free(resp); } +static char* wakuRespMsg(void* resp) { return resp ? ((wakuResp*)resp)->msg : NULL; } +static size_t wakuRespLen(void* resp) { return resp ? ((wakuResp*)resp)->len : 0; } +static int wakuRespRet(void* resp) { return resp ? ((wakuResp*)resp)->ret : RET_ERR; } + +// Thin wrappers binding the shared Go callback to each libwaku entry point. +static void* cGoWakuNew(const char* configJson, void* resp) { + return waku_new(configJson, (FFICallBack) wakuGoCallback, resp); +} +static void cGoWakuStart(void* ctx, void* resp) { + waku_start(ctx, (FFICallBack) wakuGoCallback, resp); +} +static void cGoWakuStop(void* ctx, void* resp) { + waku_stop(ctx, (FFICallBack) wakuGoCallback, resp); +} +static void cGoWakuDestroy(void* ctx, void* resp) { + waku_destroy(ctx, (FFICallBack) wakuGoCallback, resp); +} +static void cGoWakuStartDiscV5(void* ctx, void* resp) { + waku_start_discv5(ctx, (FFICallBack) wakuGoCallback, resp); +} +static void cGoWakuStopDiscV5(void* ctx, void* resp) { + waku_stop_discv5(ctx, (FFICallBack) wakuGoCallback, resp); +} +static void cGoWakuVersion(void* ctx, void* resp) { + waku_version(ctx, (FFICallBack) wakuGoCallback, resp); +} +static void cGoWakuSetEventCallback(void* ctx) { + // The ctx doubles as userData so the shared event callback can route the + // event to the right registered handler. + set_event_callback(ctx, (FFICallBack) wakuEventCallback, ctx); +} +static void cGoWakuRelayPublish(void* ctx, const char* pubSubTopic, const char* jsonWakuMessage, int timeoutMs, void* resp) { + waku_relay_publish(ctx, (FFICallBack) wakuGoCallback, resp, pubSubTopic, jsonWakuMessage, timeoutMs); +} +static void cGoWakuRelaySubscribe(void* ctx, const char* pubSubTopic, void* resp) { + waku_relay_subscribe(ctx, (FFICallBack) wakuGoCallback, resp, pubSubTopic); +} +static void cGoWakuRelayAddProtectedShard(void* ctx, int clusterId, int shardId, char* publicKey, void* resp) { + waku_relay_add_protected_shard(ctx, (FFICallBack) wakuGoCallback, resp, clusterId, shardId, publicKey); +} +static void cGoWakuRelayUnsubscribe(void* ctx, const char* pubSubTopic, void* resp) { + waku_relay_unsubscribe(ctx, (FFICallBack) wakuGoCallback, resp, pubSubTopic); +} +static void cGoWakuConnect(void* ctx, const char* peerMultiAddr, int timeoutMs, void* resp) { + waku_connect(ctx, (FFICallBack) wakuGoCallback, resp, peerMultiAddr, timeoutMs); +} +static void cGoWakuDialPeer(void* ctx, const char* peerMultiAddr, const char* protocol, int timeoutMs, void* resp) { + waku_dial_peer(ctx, (FFICallBack) wakuGoCallback, resp, peerMultiAddr, protocol, timeoutMs); +} +static void cGoWakuDialPeerById(void* ctx, const char* peerId, const char* protocol, int timeoutMs, void* resp) { + waku_dial_peer_by_id(ctx, (FFICallBack) wakuGoCallback, resp, peerId, protocol, timeoutMs); +} +static void cGoWakuDisconnectPeerById(void* ctx, const char* peerId, void* resp) { + waku_disconnect_peer_by_id(ctx, (FFICallBack) wakuGoCallback, resp, peerId); +} +static void cGoWakuDisconnectAllPeers(void* ctx, void* resp) { + waku_disconnect_all_peers(ctx, (FFICallBack) wakuGoCallback, resp); +} +static void cGoWakuListenAddresses(void* ctx, void* resp) { + waku_listen_addresses(ctx, (FFICallBack) wakuGoCallback, resp); +} +static void cGoWakuGetMyENR(void* ctx, void* resp) { + waku_get_my_enr(ctx, (FFICallBack) wakuGoCallback, resp); +} +static void cGoWakuGetMyPeerId(void* ctx, void* resp) { + waku_get_my_peerid(ctx, (FFICallBack) wakuGoCallback, resp); +} +static void cGoWakuPingPeer(void* ctx, const char* peerAddr, int timeoutMs, void* resp) { + waku_ping_peer(ctx, (FFICallBack) wakuGoCallback, resp, peerAddr, timeoutMs); +} +static void cGoWakuGetPeersInMesh(void* ctx, const char* pubSubTopic, void* resp) { + waku_relay_get_peers_in_mesh(ctx, (FFICallBack) wakuGoCallback, resp, pubSubTopic); +} +static void cGoWakuGetNumPeersInMesh(void* ctx, const char* pubSubTopic, void* resp) { + waku_relay_get_num_peers_in_mesh(ctx, (FFICallBack) wakuGoCallback, resp, pubSubTopic); +} +static void cGoWakuGetNumConnectedRelayPeers(void* ctx, const char* pubSubTopic, void* resp) { + waku_relay_get_num_connected_peers(ctx, (FFICallBack) wakuGoCallback, resp, pubSubTopic); +} +static void cGoWakuGetConnectedRelayPeers(void* ctx, const char* pubSubTopic, void* resp) { + waku_relay_get_connected_peers(ctx, (FFICallBack) wakuGoCallback, resp, pubSubTopic); +} +static void cGoWakuGetConnectedPeers(void* ctx, void* resp) { + waku_get_connected_peers(ctx, (FFICallBack) wakuGoCallback, resp); +} +static void cGoWakuGetPeerIdsFromPeerStore(void* ctx, void* resp) { + waku_get_peerids_from_peerstore(ctx, (FFICallBack) wakuGoCallback, resp); +} +static void cGoWakuGetConnectedPeersInfo(void* ctx, void* resp) { + waku_get_connected_peers_info(ctx, (FFICallBack) wakuGoCallback, resp); +} +static void cGoWakuStoreQuery(void* ctx, const char* jsonQuery, const char* peerAddr, int timeoutMs, void* resp) { + waku_store_query(ctx, (FFICallBack) wakuGoCallback, resp, jsonQuery, peerAddr, timeoutMs); +} +static void cGoWakuPeerExchangeQuery(void* ctx, uint64_t numPeers, void* resp) { + waku_peer_exchange_request(ctx, (FFICallBack) wakuGoCallback, resp, numPeers); +} +static void cGoWakuGetPeerIdsByProtocol(void* ctx, const char* protocol, void* resp) { + waku_get_peerids_by_protocol(ctx, (FFICallBack) wakuGoCallback, resp, protocol); +} +static void cGoWakuDnsDiscovery(void* ctx, const char* entTreeUrl, const char* nameDnsServer, int timeoutMs, void* resp) { + waku_dns_discovery(ctx, (FFICallBack) wakuGoCallback, resp, entTreeUrl, nameDnsServer, timeoutMs); +} +static void cGoWakuIsOnline(void* ctx, void* resp) { + waku_is_online(ctx, (FFICallBack) wakuGoCallback, resp); +} +static void cGoWakuGetMetrics(void* ctx, void* resp) { + waku_get_metrics(ctx, (FFICallBack) wakuGoCallback, resp); +} +*/ +import "C" + +import ( + "errors" + "sync" + "unsafe" +) + +// Handle is an opaque pointer to a node context owned by the C library. +type Handle = unsafe.Pointer + +// RetOK is the return code callbacks report on success. +const RetOK = C.RET_OK + +// EventHandler receives every event libwaku emits for a node: the raw +// event JSON when ret == RetOK, an error message otherwise. +type EventHandler func(ret int, msg string) + +// eventHandlers maps a node handle to the Go function that receives its +// events. The shared C event callback looks the handler up by handle. +var ( + eventHandlersMu sync.RWMutex + eventHandlers = make(map[Handle]EventHandler) +) + +//export wakuGoCallback +func wakuGoCallback(ret C.int, msg *C.char, length C.size_t, resp unsafe.Pointer) { + if resp == nil { + return + } + r := (*C.wakuResp)(resp) + r.ret = ret + r.msg = msg + r.len = length + wg := (*sync.WaitGroup)(r.wg) + wg.Done() +} + +//export wakuEventCallback +func wakuEventCallback(ret C.int, msg *C.char, length C.size_t, userData unsafe.Pointer) { + eventHandlersMu.RLock() + fn := eventHandlers[userData] // userData carries the node's handle + eventHandlersMu.RUnlock() + if fn != nil { + fn(int(ret), C.GoStringN(msg, C.int(length))) + } +} + +// call runs a synchronous libwaku entry point that reports its result +// through the response callback, blocks until it completes, and returns the +// callback message (on RetOK) or an error built from it. +func call(invoke func(resp unsafe.Pointer)) (string, error) { + var wg sync.WaitGroup + wg.Add(1) + resp := C.allocWakuResp(unsafe.Pointer(&wg)) + defer C.freeWakuResp(resp) + + invoke(resp) + wg.Wait() + + msg := C.GoStringN(C.wakuRespMsg(resp), C.int(C.wakuRespLen(resp))) + if C.wakuRespRet(resp) != C.RET_OK { + return "", errors.New(msg) + } + return msg, nil +} + +// New builds a node from a WakuConfig JSON string and returns its handle. +// The handle must be released with Destroy. +func New(configJSON string) (Handle, error) { + cCfg := C.CString(configJSON) + defer C.free(unsafe.Pointer(cCfg)) + + var wg sync.WaitGroup + wg.Add(1) + resp := C.allocWakuResp(unsafe.Pointer(&wg)) + defer C.freeWakuResp(resp) + + ctx := C.cGoWakuNew(cCfg, resp) + wg.Wait() + + if C.wakuRespRet(resp) != C.RET_OK || ctx == nil { + msg := C.GoStringN(C.wakuRespMsg(resp), C.int(C.wakuRespLen(resp))) + if msg == "" { + msg = "waku_new returned no context" + } + return nil, errors.New(msg) + } + return Handle(ctx), nil +} + +// SetEventHandler registers fn to receive events for the node and wires up +// the underlying C event callback. +func SetEventHandler(h Handle, fn EventHandler) { + eventHandlersMu.Lock() + eventHandlers[h] = fn + eventHandlersMu.Unlock() + C.cGoWakuSetEventCallback(h) +} + +// Start starts the node. +func Start(h Handle) error { + _, err := call(func(resp unsafe.Pointer) { C.cGoWakuStart(h, resp) }) + return err +} + +// Stop stops the node. +func Stop(h Handle) error { + _, err := call(func(resp unsafe.Pointer) { C.cGoWakuStop(h, resp) }) + return err +} + +// Destroy releases the node context and unregisters its event handler. +func Destroy(h Handle) error { + _, err := call(func(resp unsafe.Pointer) { C.cGoWakuDestroy(h, resp) }) + if err == nil { + eventHandlersMu.Lock() + delete(eventHandlers, h) + eventHandlersMu.Unlock() + } + return err +} + +// StartDiscV5 starts DiscV5 peer discovery. +func StartDiscV5(h Handle) error { + _, err := call(func(resp unsafe.Pointer) { C.cGoWakuStartDiscV5(h, resp) }) + return err +} + +// StopDiscV5 stops DiscV5 peer discovery. +func StopDiscV5(h Handle) error { + _, err := call(func(resp unsafe.Pointer) { C.cGoWakuStopDiscV5(h, resp) }) + return err +} + +// Version returns the libwaku version string. +func Version(h Handle) (string, error) { + return call(func(resp unsafe.Pointer) { C.cGoWakuVersion(h, resp) }) +} + +// RelayPublish publishes a WakuMessage JSON on a pubsub topic and returns +// the message hash. +func RelayPublish(h Handle, pubsubTopic, messageJSON string, timeoutMs int) (string, error) { + cTopic := C.CString(pubsubTopic) + cMsg := C.CString(messageJSON) + defer C.free(unsafe.Pointer(cTopic)) + defer C.free(unsafe.Pointer(cMsg)) + return call(func(resp unsafe.Pointer) { C.cGoWakuRelayPublish(h, cTopic, cMsg, C.int(timeoutMs), resp) }) +} + +// RelaySubscribe subscribes the node to a pubsub topic. +func RelaySubscribe(h Handle, pubsubTopic string) error { + cTopic := C.CString(pubsubTopic) + defer C.free(unsafe.Pointer(cTopic)) + _, err := call(func(resp unsafe.Pointer) { C.cGoWakuRelaySubscribe(h, cTopic, resp) }) + return err +} + +// RelayAddProtectedShard registers the hex-encoded public key allowed to +// sign messages on a protected shard. +func RelayAddProtectedShard(h Handle, clusterID, shardID int, publicKeyHex string) error { + cPublicKey := C.CString(publicKeyHex) + defer C.free(unsafe.Pointer(cPublicKey)) + _, err := call(func(resp unsafe.Pointer) { + C.cGoWakuRelayAddProtectedShard(h, C.int(clusterID), C.int(shardID), cPublicKey, resp) + }) + return err +} + +// RelayUnsubscribe unsubscribes the node from a pubsub topic. +func RelayUnsubscribe(h Handle, pubsubTopic string) error { + cTopic := C.CString(pubsubTopic) + defer C.free(unsafe.Pointer(cTopic)) + _, err := call(func(resp unsafe.Pointer) { C.cGoWakuRelayUnsubscribe(h, cTopic, resp) }) + return err +} + +// Connect dials a peer multiaddress. +func Connect(h Handle, peerMultiAddr string, timeoutMs int) error { + cAddr := C.CString(peerMultiAddr) + defer C.free(unsafe.Pointer(cAddr)) + _, err := call(func(resp unsafe.Pointer) { C.cGoWakuConnect(h, cAddr, C.int(timeoutMs), resp) }) + return err +} + +// DialPeer dials a peer multiaddress over a specific protocol. +func DialPeer(h Handle, peerMultiAddr, protocol string, timeoutMs int) error { + cAddr := C.CString(peerMultiAddr) + cProtocol := C.CString(protocol) + defer C.free(unsafe.Pointer(cAddr)) + defer C.free(unsafe.Pointer(cProtocol)) + _, err := call(func(resp unsafe.Pointer) { C.cGoWakuDialPeer(h, cAddr, cProtocol, C.int(timeoutMs), resp) }) + return err +} + +// DialPeerByID dials a known peer id over a specific protocol. +func DialPeerByID(h Handle, peerID, protocol string, timeoutMs int) error { + cPeerID := C.CString(peerID) + cProtocol := C.CString(protocol) + defer C.free(unsafe.Pointer(cPeerID)) + defer C.free(unsafe.Pointer(cProtocol)) + _, err := call(func(resp unsafe.Pointer) { C.cGoWakuDialPeerById(h, cPeerID, cProtocol, C.int(timeoutMs), resp) }) + return err +} + +// DisconnectPeerByID drops the connection to a peer. +func DisconnectPeerByID(h Handle, peerID string) error { + cPeerID := C.CString(peerID) + defer C.free(unsafe.Pointer(cPeerID)) + _, err := call(func(resp unsafe.Pointer) { C.cGoWakuDisconnectPeerById(h, cPeerID, resp) }) + return err +} + +// DisconnectAllPeers drops all peer connections. +func DisconnectAllPeers(h Handle) error { + _, err := call(func(resp unsafe.Pointer) { C.cGoWakuDisconnectAllPeers(h, resp) }) + return err +} + +// ListenAddresses returns the node's listen multiaddresses as a +// comma-separated list. +func ListenAddresses(h Handle) (string, error) { + return call(func(resp unsafe.Pointer) { C.cGoWakuListenAddresses(h, resp) }) +} + +// GetMyENR returns the node's ENR record. +func GetMyENR(h Handle) (string, error) { + return call(func(resp unsafe.Pointer) { C.cGoWakuGetMyENR(h, resp) }) +} + +// GetMyPeerID returns the node's peer id. +func GetMyPeerID(h Handle) (string, error) { + return call(func(resp unsafe.Pointer) { C.cGoWakuGetMyPeerId(h, resp) }) +} + +// PingPeer pings a peer (comma-separated multiaddresses) and returns the +// round-trip time in nanoseconds. +func PingPeer(h Handle, peerAddrs string, timeoutMs int) (string, error) { + cAddr := C.CString(peerAddrs) + defer C.free(unsafe.Pointer(cAddr)) + return call(func(resp unsafe.Pointer) { C.cGoWakuPingPeer(h, cAddr, C.int(timeoutMs), resp) }) +} + +// GetPeersInMesh returns the relay mesh peer ids for a pubsub topic as a +// comma-separated list. +func GetPeersInMesh(h Handle, pubsubTopic string) (string, error) { + cTopic := C.CString(pubsubTopic) + defer C.free(unsafe.Pointer(cTopic)) + return call(func(resp unsafe.Pointer) { C.cGoWakuGetPeersInMesh(h, cTopic, resp) }) +} + +// GetNumPeersInMesh returns the relay mesh peer count for a pubsub topic. +func GetNumPeersInMesh(h Handle, pubsubTopic string) (string, error) { + cTopic := C.CString(pubsubTopic) + defer C.free(unsafe.Pointer(cTopic)) + return call(func(resp unsafe.Pointer) { C.cGoWakuGetNumPeersInMesh(h, cTopic, resp) }) +} + +// GetNumConnectedRelayPeers returns the connected relay peer count for a +// pubsub topic. +func GetNumConnectedRelayPeers(h Handle, pubsubTopic string) (string, error) { + cTopic := C.CString(pubsubTopic) + defer C.free(unsafe.Pointer(cTopic)) + return call(func(resp unsafe.Pointer) { C.cGoWakuGetNumConnectedRelayPeers(h, cTopic, resp) }) +} + +// GetConnectedRelayPeers returns the connected relay peer ids for a pubsub +// topic as a comma-separated list. +func GetConnectedRelayPeers(h Handle, pubsubTopic string) (string, error) { + cTopic := C.CString(pubsubTopic) + defer C.free(unsafe.Pointer(cTopic)) + return call(func(resp unsafe.Pointer) { C.cGoWakuGetConnectedRelayPeers(h, cTopic, resp) }) +} + +// GetConnectedPeers returns the connected peer ids as a comma-separated +// list. +func GetConnectedPeers(h Handle) (string, error) { + return call(func(resp unsafe.Pointer) { C.cGoWakuGetConnectedPeers(h, resp) }) +} + +// GetPeerIDsFromPeerStore returns the peer-store peer ids as a +// comma-separated list. +func GetPeerIDsFromPeerStore(h Handle) (string, error) { + return call(func(resp unsafe.Pointer) { C.cGoWakuGetPeerIdsFromPeerStore(h, resp) }) +} + +// GetConnectedPeersInfo returns the connected peers' info as JSON. +func GetConnectedPeersInfo(h Handle) (string, error) { + return call(func(resp unsafe.Pointer) { C.cGoWakuGetConnectedPeersInfo(h, resp) }) +} + +// StoreQuery runs a store query (JSON) against a peer (comma-separated +// multiaddresses) and returns the response JSON. +func StoreQuery(h Handle, queryJSON, peerAddrs string, timeoutMs int) (string, error) { + cQuery := C.CString(queryJSON) + cAddr := C.CString(peerAddrs) + defer C.free(unsafe.Pointer(cQuery)) + defer C.free(unsafe.Pointer(cAddr)) + return call(func(resp unsafe.Pointer) { C.cGoWakuStoreQuery(h, cQuery, cAddr, C.int(timeoutMs), resp) }) +} + +// PeerExchangeRequest asks peer exchange for numPeers peers and returns +// the number of received peers. +func PeerExchangeRequest(h Handle, numPeers uint64) (string, error) { + return call(func(resp unsafe.Pointer) { C.cGoWakuPeerExchangeQuery(h, C.uint64_t(numPeers), resp) }) +} + +// GetPeerIDsByProtocol returns the peer ids supporting a protocol as a +// comma-separated list. +func GetPeerIDsByProtocol(h Handle, protocol string) (string, error) { + cProtocol := C.CString(protocol) + defer C.free(unsafe.Pointer(cProtocol)) + return call(func(resp unsafe.Pointer) { C.cGoWakuGetPeerIdsByProtocol(h, cProtocol, resp) }) +} + +// DnsDiscovery resolves an ENR tree URL via DNS discovery and returns the +// discovered multiaddresses as a comma-separated list. +func DnsDiscovery(h Handle, enrTreeURL, nameDNSServer string, timeoutMs int) (string, error) { + cEnrTree := C.CString(enrTreeURL) + cDNSServer := C.CString(nameDNSServer) + defer C.free(unsafe.Pointer(cEnrTree)) + defer C.free(unsafe.Pointer(cDNSServer)) + return call(func(resp unsafe.Pointer) { C.cGoWakuDnsDiscovery(h, cEnrTree, cDNSServer, C.int(timeoutMs), resp) }) +} + +// IsOnline reports the node's online state ("true"/"false"). +func IsOnline(h Handle) (string, error) { + return call(func(resp unsafe.Pointer) { C.cGoWakuIsOnline(h, resp) }) +} + +// GetMetrics returns the node's metrics in Prometheus text format. +func GetMetrics(h Handle) (string, error) { + return call(func(resp unsafe.Pointer) { C.cGoWakuGetMetrics(h, resp) }) +} diff --git a/pkg/kernel/nwaku.go b/pkg/kernel/nwaku.go index cc81114..42baeba 100644 --- a/pkg/kernel/nwaku.go +++ b/pkg/kernel/nwaku.go @@ -1,343 +1,5 @@ package kernel -/* - #include - #include - #include - - extern void wakuGlobalEventCallback(int ret, char* msg, size_t len, void* userData); - - typedef struct { - int ret; - char* msg; - size_t len; - void* ffiWg; - } WakuResp; - - static void* allocResp(void* wg) { - WakuResp* r = calloc(1, sizeof(WakuResp)); - r->ffiWg = wg; - return r; - } - - static void freeResp(void* resp) { - if (resp != NULL) { - free(resp); - } - } - - static char* getMyCharPtr(void* resp) { - if (resp == NULL) { - return NULL; - } - WakuResp* m = (WakuResp*) resp; - return m->msg; - } - - static size_t getMyCharLen(void* resp) { - if (resp == NULL) { - return 0; - } - WakuResp* m = (WakuResp*) resp; - return m->len; - } - - static int getRet(void* resp) { - if (resp == NULL) { - return 0; - } - WakuResp* m = (WakuResp*) resp; - return m->ret; - } - - // resp must be set != NULL in case interest on retrieving data from the callback - void WakuGoCallback(int ret, char* msg, size_t len, void* resp); - - static void* cGoWakuNew(const char* configJson, void* resp) { - // We pass NULL because we are not interested in retrieving data from this callback - void* ret = waku_new(configJson, (FFICallBack) WakuGoCallback, resp); - return ret; - } - - static void cGoWakuStart(void* wakuCtx, void* resp) { - waku_start(wakuCtx, (FFICallBack) WakuGoCallback, resp); - } - - static void cGoWakuStop(void* wakuCtx, void* resp) { - waku_stop(wakuCtx, (FFICallBack) WakuGoCallback, resp); - } - - static void cGoWakuDestroy(void* wakuCtx, void* resp) { - waku_destroy(wakuCtx, (FFICallBack) WakuGoCallback, resp); - } - - static void cGoWakuStartDiscV5(void* wakuCtx, void* resp) { - waku_start_discv5(wakuCtx, (FFICallBack) WakuGoCallback, resp); - } - - static void cGoWakuStopDiscV5(void* wakuCtx, void* resp) { - waku_stop_discv5(wakuCtx, (FFICallBack) WakuGoCallback, resp); - } - - static void cGoWakuVersion(void* wakuCtx, void* resp) { - waku_version(wakuCtx, (FFICallBack) WakuGoCallback, resp); - } - - static void cGoWakuSetEventCallback(void* wakuCtx) { - // The 'wakuGlobalEventCallback' Go function is shared amongst all possible Waku instances. - - // Given that the 'wakuGlobalEventCallback' is shared, we pass again the - // wakuCtx instance but in this case is needed to pick up the correct method - // that will handle the event. - - // In other words, for every call the libwaku makes to wakuGlobalEventCallback, - // the 'userData' parameter will bring the context of the node that registered - // that wakuGlobalEventCallback. - - // This technique is needed because cgo only allows to export Go functions and not methods. - - set_event_callback(wakuCtx, (FFICallBack) wakuGlobalEventCallback, wakuCtx); - } - - static void cGoWakuContentTopic(void* wakuCtx, - char* appName, - int appVersion, - char* contentTopicName, - char* encoding, - void* resp) { - - waku_content_topic(wakuCtx, - (FFICallBack) WakuGoCallback, - resp, - appName, - appVersion, - contentTopicName, - encoding - ); - } - - static void cGoWakuPubsubTopic(void* wakuCtx, char* topicName, void* resp) { - waku_pubsub_topic(wakuCtx, (FFICallBack) WakuGoCallback, resp, topicName); - } - - static void cGoWakuDefaultPubsubTopic(void* wakuCtx, void* resp) { - waku_default_pubsub_topic(wakuCtx, (FFICallBack) WakuGoCallback, resp); - } - - static void cGoWakuRelayPublish(void* wakuCtx, - const char* pubSubTopic, - const char* jsonWakuMessage, - int timeoutMs, - void* resp) { - - waku_relay_publish(wakuCtx, - (FFICallBack) WakuGoCallback, - resp, - pubSubTopic, - jsonWakuMessage, - timeoutMs - ); - } - - static void cGoWakuRelaySubscribe(void* wakuCtx, char* pubSubTopic, void* resp) { - waku_relay_subscribe(wakuCtx, - (FFICallBack) WakuGoCallback, - resp, - pubSubTopic - ); - } - - static void cGoWakuRelayAddProtectedShard(void* wakuCtx, int clusterId, int shardId, char* publicKey, void* resp) { - waku_relay_add_protected_shard(wakuCtx, - (FFICallBack) WakuGoCallback, - resp, - clusterId, - shardId, - publicKey - ); - } - - static void cGoWakuRelayUnsubscribe(void* wakuCtx, char* pubSubTopic, void* resp) { - - waku_relay_unsubscribe(wakuCtx, - (FFICallBack) WakuGoCallback, - resp, - pubSubTopic - ); - } - - static void cGoWakuConnect(void* wakuCtx, char* peerMultiAddr, int timeoutMs, void* resp) { - waku_connect(wakuCtx, - (FFICallBack) WakuGoCallback, - resp, - peerMultiAddr, - timeoutMs - ); - } - - static void cGoWakuDialPeer(void* wakuCtx, - char* peerMultiAddr, - char* protocol, - int timeoutMs, - void* resp) { - - waku_dial_peer(wakuCtx, - (FFICallBack) WakuGoCallback, - resp, - peerMultiAddr, - protocol, - timeoutMs - ); - } - - static void cGoWakuDialPeerById(void* wakuCtx, - char* peerId, - char* protocol, - int timeoutMs, - void* resp) { - - waku_dial_peer_by_id(wakuCtx, - (FFICallBack) WakuGoCallback, - resp, - peerId, - protocol, - timeoutMs - ); - } - - static void cGoWakuDisconnectPeerById(void* wakuCtx, char* peerId, void* resp) { - waku_disconnect_peer_by_id(wakuCtx, - (FFICallBack) WakuGoCallback, - resp, - peerId - ); - } - - static void cGoWakuDisconnectAllPeers(void* wakuCtx, void* resp) { - waku_disconnect_all_peers(wakuCtx, - (FFICallBack) WakuGoCallback, - resp); - } - - static void cGoWakuListenAddresses(void* wakuCtx, void* resp) { - waku_listen_addresses(wakuCtx, (FFICallBack) WakuGoCallback, resp); - } - - static void cGoWakuGetMyENR(void* ctx, void* resp) { - waku_get_my_enr(ctx, (FFICallBack) WakuGoCallback, resp); - } - - static void cGoWakuGetMyPeerId(void* ctx, void* resp) { - waku_get_my_peerid(ctx, (FFICallBack) WakuGoCallback, resp); - } - - static void cGoWakuPingPeer(void* ctx, char* peerAddr, int timeoutMs, void* resp) { - waku_ping_peer(ctx, (FFICallBack) WakuGoCallback, resp, peerAddr, timeoutMs); - } - - static void cGoWakuGetPeersInMesh(void* ctx, char* pubSubTopic, void* resp) { - waku_relay_get_peers_in_mesh(ctx, (FFICallBack) WakuGoCallback, resp, pubSubTopic); - } - - static void cGoWakuGetNumPeersInMesh(void* ctx, char* pubSubTopic, void* resp) { - waku_relay_get_num_peers_in_mesh(ctx, (FFICallBack) WakuGoCallback, resp, pubSubTopic); - } - - static void cGoWakuGetNumConnectedRelayPeers(void* ctx, char* pubSubTopic, void* resp) { - waku_relay_get_num_connected_peers(ctx, (FFICallBack) WakuGoCallback, resp, pubSubTopic); - } - - static void cGoWakuGetConnectedRelayPeers(void* ctx, char* pubSubTopic, void* resp) { - waku_relay_get_connected_peers(ctx, (FFICallBack) WakuGoCallback, resp, pubSubTopic); - } - - static void cGoWakuGetConnectedPeers(void* wakuCtx, void* resp) { - waku_get_connected_peers(wakuCtx, (FFICallBack) WakuGoCallback, resp); - } - - static void cGoWakuGetPeerIdsFromPeerStore(void* wakuCtx, void* resp) { - waku_get_peerids_from_peerstore(wakuCtx, (FFICallBack) WakuGoCallback, resp); - } - - static void cGoWakuGetConnectedPeersInfo(void* wakuCtx, void* resp) { - waku_get_connected_peers_info(wakuCtx, (FFICallBack) WakuGoCallback, resp); - } - - static void cGoWakuLightpushPublish(void* wakuCtx, - const char* pubSubTopic, - const char* jsonWakuMessage, - void* resp) { - - waku_lightpush_publish(wakuCtx, - (FFICallBack) WakuGoCallback, - resp, - pubSubTopic, - jsonWakuMessage - ); - } - - static void cGoWakuStoreQuery(void* wakuCtx, - const char* jsonQuery, - const char* peerAddr, - int timeoutMs, - void* resp) { - - waku_store_query(wakuCtx, - (FFICallBack) WakuGoCallback, - resp, - jsonQuery, - peerAddr, - timeoutMs - ); - } - - static void cGoWakuPeerExchangeQuery(void* wakuCtx, - uint64_t numPeers, - void* resp) { - - waku_peer_exchange_request(wakuCtx, - (FFICallBack) WakuGoCallback, - resp, - numPeers - ); - } - - static void cGoWakuGetPeerIdsByProtocol(void* wakuCtx, - const char* protocol, - void* resp) { - - waku_get_peerids_by_protocol(wakuCtx, - (FFICallBack) WakuGoCallback, - resp, - protocol - ); - } - - static void cGoWakuDnsDiscovery(void* wakuCtx, - const char* entTreeUrl, - const char* nameDnsServer, - int timeoutMs, - void* resp) { - - waku_dns_discovery(wakuCtx, - (FFICallBack) WakuGoCallback, - resp, - entTreeUrl, - nameDnsServer, - timeoutMs - ); - } - - static void cGoWakuIsOnline(void* wakuCtx, void* resp) { - waku_is_online(wakuCtx, (FFICallBack) WakuGoCallback, resp); - } - - static void cGoWakuGetMetrics(void* wakuCtx, void* resp) { - waku_get_metrics(wakuCtx, (FFICallBack) WakuGoCallback, resp); - } - -*/ -import "C" import ( "context" "crypto/ecdsa" @@ -348,10 +10,9 @@ import ( "net" "strconv" "strings" - "sync" "time" - "unsafe" + "github.com/logos-messaging/logos-delivery-go-bindings/internal/ffi/libwaku" "github.com/logos-messaging/logos-delivery-go-bindings/pkg/kernel/timesource" "github.com/ethereum/go-ethereum/crypto" @@ -370,21 +31,9 @@ const MsgChanBufferSize = 1024 const TopicHealthChanBufferSize = 1024 const ConnectionChangeChanBufferSize = 1024 -//export WakuGoCallback -func WakuGoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) { - if resp != nil { - m := (*C.WakuResp)(resp) - m.ret = ret - m.msg = msg - m.len = len - wg := (*sync.WaitGroup)(m.ffiWg) - wg.Done() - } -} - // WakuNode represents an instance of an nwaku node type WakuNode struct { - wakuCtx unsafe.Pointer + wakuCtx libwaku.Handle config *common.WakuConfig MsgChan chan common.Envelope TopicHealthChan chan topicHealth @@ -400,78 +49,37 @@ func NewWakuNode(config *common.WakuConfig, nodeName string) (*WakuNode, error) nodeName: nodeName, } - wg := sync.WaitGroup{} - jsonConfig, err := json.Marshal(config) if err != nil { return nil, err } - var cJsonConfig = C.CString(string(jsonConfig)) - var resp = C.allocResp(unsafe.Pointer(&wg)) - - defer C.free(unsafe.Pointer(cJsonConfig)) - defer C.freeResp(resp) - - wg.Add(1) - n.wakuCtx = C.cGoWakuNew(cJsonConfig, resp) - wg.Wait() - - if C.getRet(resp) != C.RET_OK { - errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - Error("error wakuNew for %s: %v", nodeName, errMsg) - return nil, errors.New(errMsg) + n.wakuCtx, err = libwaku.New(string(jsonConfig)) + if err != nil { + Error("error wakuNew for %s: %v", nodeName, err) + return nil, err } n.MsgChan = make(chan common.Envelope, MsgChanBufferSize) n.TopicHealthChan = make(chan topicHealth, TopicHealthChanBufferSize) n.ConnectionChangeChan = make(chan connectionChange, ConnectionChangeChanBufferSize) - C.cGoWakuSetEventCallback(n.wakuCtx) - registerNode(n) + libwaku.SetEventHandler(n.wakuCtx, n.onRawEvent) Debug("Successfully created WakuNode: %s", nodeName) return n, nil } -// The event callback sends back the node's ctx to know to which -// node is the event being emited for. Since we only have a global -// callback in the go side, We register all the nodes that we create -// so we can later obtain which instance of `WakuNode` is should -// be invoked depending on the ctx received - -var nodeRegistry map[unsafe.Pointer]*WakuNode - -func init() { - nodeRegistry = make(map[unsafe.Pointer]*WakuNode) -} - -func registerNode(node *WakuNode) { - _, ok := nodeRegistry[node.wakuCtx] - if !ok { - nodeRegistry[node.wakuCtx] = node +// onRawEvent receives every libwaku event for this node from the ffi bridge. +func (n *WakuNode) onRawEvent(ret int, msg string) { + if ret == libwaku.RetOK { + n.OnEvent(msg) + return } -} - -func unregisterNode(node *WakuNode) { - delete(nodeRegistry, node.wakuCtx) -} - -//export wakuGlobalEventCallback -func wakuGlobalEventCallback(callerRet C.int, msg *C.char, len C.size_t, userData unsafe.Pointer) { - if callerRet == C.RET_OK { - eventStr := C.GoStringN(msg, C.int(len)) - node, ok := nodeRegistry[userData] // userData contains node's ctx - if ok { - node.OnEvent(eventStr) - } + if msg != "" { + Error("wakuEventCallback retCode not ok, retCode: %v: %v", ret, msg) } else { - if len != 0 { - errMsg := C.GoStringN(msg, C.int(len)) - Error("wakuGlobalEventCallback retCode not ok, retCode: %v: %v", callerRet, errMsg) - } else { - Error("wakuGlobalEventCallback retCode not ok, retCode: %v", callerRet) - } + Error("wakuEventCallback retCode not ok, retCode: %v", ret) } } @@ -560,32 +168,20 @@ func (n *WakuNode) GetNumConnectedRelayPeers(optPubsubTopic ...string) (int, err pubsubTopic = optPubsubTopic[0] } - wg := sync.WaitGroup{} - var resp = C.allocResp(unsafe.Pointer(&wg)) - defer C.freeResp(resp) - - var cPubsubTopic = C.CString(pubsubTopic) - defer C.free(unsafe.Pointer(cPubsubTopic)) - - wg.Add(1) - C.cGoWakuGetNumConnectedRelayPeers(n.wakuCtx, cPubsubTopic, resp) - wg.Wait() - - if C.getRet(resp) == C.RET_OK { - numPeersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - numPeers, err := strconv.Atoi(numPeersStr) - if err != nil { - Error("Failed to convert relay peer count for %s: %v", n.nodeName, err) - return 0, err - } - Debug("Successfully fetched number of connected relay peers for %s: %d", n.nodeName, numPeers) - return numPeers, nil + numPeersStr, err := libwaku.GetNumConnectedRelayPeers(n.wakuCtx, pubsubTopic) + if err != nil { + errMsg := "error GetNumConnectedRelayPeers: " + err.Error() + Error("Failed to get number of connected relay peers for %s: %s", n.nodeName, errMsg) + return 0, errors.New(errMsg) } - errMsg := "error GetNumConnectedRelayPeers: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - Error("Failed to get number of connected relay peers for %s: %s", n.nodeName, errMsg) - - return 0, errors.New(errMsg) + numPeers, err := strconv.Atoi(numPeersStr) + if err != nil { + Error("Failed to convert relay peer count for %s: %v", n.nodeName, err) + return 0, err + } + Debug("Successfully fetched number of connected relay peers for %s: %d", n.nodeName, numPeers) + return numPeers, nil } func (n *WakuNode) GetConnectedRelayPeers(optPubsubTopic ...string) (peer.IDSlice, error) { @@ -603,79 +199,45 @@ func (n *WakuNode) GetConnectedRelayPeers(optPubsubTopic ...string) (peer.IDSlic Debug("Fetching connected relay peers for pubsubTopic: %v, node: %v", pubsubTopic, n.nodeName) - wg := sync.WaitGroup{} - var resp = C.allocResp(unsafe.Pointer(&wg)) - defer C.freeResp(resp) - - var cPubsubTopic = C.CString(pubsubTopic) - defer C.free(unsafe.Pointer(cPubsubTopic)) - - wg.Add(1) - C.cGoWakuGetConnectedRelayPeers(n.wakuCtx, cPubsubTopic, resp) - wg.Wait() - - if C.getRet(resp) == C.RET_OK { - peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - if peersStr == "" { - Debug("No connected relay peers found for pubsubTopic: %v, node: %v", pubsubTopic, n.nodeName) - return nil, nil - } - - peerIDs := strings.Split(peersStr, ",") - var peers peer.IDSlice - for _, peerID := range peerIDs { - id, err := peer.Decode(peerID) - if err != nil { - Error("Failed to decode peer ID for %v: %v", n.nodeName, err) - return nil, err - } - peers = append(peers, id) - } - - Debug("Successfully fetched connected relay peers for pubsubTopic: %v, node: %v count: %v", pubsubTopic, n.nodeName, len(peers)) - return peers, nil + peersStr, err := libwaku.GetConnectedRelayPeers(n.wakuCtx, pubsubTopic) + if err != nil { + errMsg := "error GetConnectedRelayPeers: " + err.Error() + Error("Failed to get connected relay peers for pubsubTopic: %v:, node: %v. %v", pubsubTopic, n.nodeName, errMsg) + return nil, errors.New(errMsg) } - errMsg := "error GetConnectedRelayPeers: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - Error("Failed to get connected relay peers for pubsubTopic: %v:, node: %v. %v", pubsubTopic, n.nodeName, errMsg) + if peersStr == "" { + Debug("No connected relay peers found for pubsubTopic: %v, node: %v", pubsubTopic, n.nodeName) + return nil, nil + } - return nil, errors.New(errMsg) + peerIDs := strings.Split(peersStr, ",") + var peers peer.IDSlice + for _, peerID := range peerIDs { + id, err := peer.Decode(peerID) + if err != nil { + Error("Failed to decode peer ID for %v: %v", n.nodeName, err) + return nil, err + } + peers = append(peers, id) + } + + Debug("Successfully fetched connected relay peers for pubsubTopic: %v, node: %v count: %v", pubsubTopic, n.nodeName, len(peers)) + return peers, nil } func (n *WakuNode) DisconnectPeerByID(peerID peer.ID) error { - wg := sync.WaitGroup{} - - var resp = C.allocResp(unsafe.Pointer(&wg)) - var cPeerId = C.CString(peerID.String()) - defer C.freeResp(resp) - defer C.free(unsafe.Pointer(cPeerId)) - - wg.Add(1) - C.cGoWakuDisconnectPeerById(n.wakuCtx, cPeerId, resp) - wg.Wait() - - if C.getRet(resp) == C.RET_OK { - return nil + if err := libwaku.DisconnectPeerByID(n.wakuCtx, peerID.String()); err != nil { + return fmt.Errorf("error DisconnectPeerById: %w", err) } - errMsg := "error DisconnectPeerById: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return errors.New(errMsg) + return nil } func (n *WakuNode) DisconnectAllPeers() error { - wg := sync.WaitGroup{} - - var resp = C.allocResp(unsafe.Pointer(&wg)) - defer C.freeResp(resp) - - wg.Add(1) - C.cGoWakuDisconnectAllPeers(n.wakuCtx, resp) - wg.Wait() - - if C.getRet(resp) == C.RET_OK { - return nil + if err := libwaku.DisconnectAllPeers(n.wakuCtx); err != nil { + return fmt.Errorf("error DisconnectAllPeers: %w", err) } - errMsg := "error DisconnectAllPeers: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return errors.New(errMsg) + return nil } func (n *WakuNode) GetConnectedPeers() (peer.IDSlice, error) { @@ -687,40 +249,31 @@ func (n *WakuNode) GetConnectedPeers() (peer.IDSlice, error) { Debug("Fetching connected peers for %v", n.nodeName) - wg := sync.WaitGroup{} - var resp = C.allocResp(unsafe.Pointer(&wg)) - defer C.freeResp(resp) - - wg.Add(1) - C.cGoWakuGetConnectedPeers(n.wakuCtx, resp) - wg.Wait() - - if C.getRet(resp) == C.RET_OK { - peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - if peersStr == "" { - Debug("No connected peers found for %v", n.nodeName) - return nil, nil - } - - peerIDs := strings.Split(peersStr, ",") - var peers peer.IDSlice - for _, peerID := range peerIDs { - id, err := peer.Decode(peerID) - if err != nil { - Error("Failed to decode peer ID for %v: %v", n.nodeName, err) - return nil, err - } - peers = append(peers, id) - } - - Debug("Successfully fetched connected peers for %v, count: %v", n.nodeName, len(peers)) - return peers, nil + peersStr, err := libwaku.GetConnectedPeers(n.wakuCtx) + if err != nil { + errMsg := "error GetConnectedPeers: " + err.Error() + Error("Failed to get connected peers for %v: %v", n.nodeName, errMsg) + return nil, errors.New(errMsg) } - errMsg := "error GetConnectedPeers: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - Error("Failed to get connected peers for %v: %v", n.nodeName, errMsg) + if peersStr == "" { + Debug("No connected peers found for %v", n.nodeName) + return nil, nil + } - return nil, errors.New(errMsg) + peerIDs := strings.Split(peersStr, ",") + var peers peer.IDSlice + for _, peerID := range peerIDs { + id, err := peer.Decode(peerID) + if err != nil { + Error("Failed to decode peer ID for %v: %v", n.nodeName, err) + return nil, err + } + peers = append(peers, id) + } + + Debug("Successfully fetched connected peers for %v, count: %v", n.nodeName, len(peers)) + return peers, nil } func (n *WakuNode) GetPeersInMesh(pubsubTopic string) (peer.IDSlice, error) { @@ -732,43 +285,31 @@ func (n *WakuNode) GetPeersInMesh(pubsubTopic string) (peer.IDSlice, error) { Debug("Fetching peers in mesh peers for pubsubTopic: %v, node: %v", pubsubTopic, n.nodeName) - wg := sync.WaitGroup{} - var resp = C.allocResp(unsafe.Pointer(&wg)) - defer C.freeResp(resp) - - var cPubsubTopic = C.CString(pubsubTopic) - defer C.free(unsafe.Pointer(cPubsubTopic)) - - wg.Add(1) - C.cGoWakuGetPeersInMesh(n.wakuCtx, cPubsubTopic, resp) - wg.Wait() - - if C.getRet(resp) == C.RET_OK { - peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - if peersStr == "" { - Debug("No peers in mesh found for pubsubTopic: %v, node: %v", pubsubTopic, n.nodeName) - return nil, nil - } - - peerIDs := strings.Split(peersStr, ",") - var peers peer.IDSlice - for _, peerID := range peerIDs { - id, err := peer.Decode(peerID) - if err != nil { - Error("Failed to decode peer ID for %v: %v", n.nodeName, err) - return nil, err - } - peers = append(peers, id) - } - - Debug("Successfully fetched mesh peers for pubsubTopic: %v, node: %v count: %v", pubsubTopic, n.nodeName, len(peers)) - return peers, nil + peersStr, err := libwaku.GetPeersInMesh(n.wakuCtx, pubsubTopic) + if err != nil { + errMsg := "error GetPeersInMesh: " + err.Error() + Error("Failed to get peers in mesh for pubsubTopic: %v:, node: %v. %v", pubsubTopic, n.nodeName, errMsg) + return nil, errors.New(errMsg) } - errMsg := "error GetPeersInMesh: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - Error("Failed to get peers in mesh for pubsubTopic: %v:, node: %v. %v", pubsubTopic, n.nodeName, errMsg) + if peersStr == "" { + Debug("No peers in mesh found for pubsubTopic: %v, node: %v", pubsubTopic, n.nodeName) + return nil, nil + } - return nil, errors.New(errMsg) + peerIDs := strings.Split(peersStr, ",") + var peers peer.IDSlice + for _, peerID := range peerIDs { + id, err := peer.Decode(peerID) + if err != nil { + Error("Failed to decode peer ID for %v: %v", n.nodeName, err) + return nil, err + } + peers = append(peers, id) + } + + Debug("Successfully fetched mesh peers for pubsubTopic: %v, node: %v count: %v", pubsubTopic, n.nodeName, len(peers)) + return peers, nil } func (n *WakuNode) RelaySubscribe(pubsubTopic string) error { @@ -776,30 +317,17 @@ func (n *WakuNode) RelaySubscribe(pubsubTopic string) error { return errors.New("pubsub topic is empty") } - wg := sync.WaitGroup{} - - var resp = C.allocResp(unsafe.Pointer(&wg)) - var cPubsubTopic = C.CString(pubsubTopic) - - defer C.freeResp(resp) - defer C.free(unsafe.Pointer(cPubsubTopic)) - if n.wakuCtx == nil { return errors.New("wakuCtx is nil") } - wg.Add(1) - C.cGoWakuRelaySubscribe(n.wakuCtx, cPubsubTopic, resp) - wg.Wait() - - if C.getRet(resp) == C.RET_OK { - Debug("Successfully subscribed to relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic) - return nil + if err := libwaku.RelaySubscribe(n.wakuCtx, pubsubTopic); err != nil { + Error("Failed to subscribe to relay on node %s, pubsubTopic: %s, error: %v", n.nodeName, pubsubTopic, err) + return fmt.Errorf("error WakuRelaySubscribe: %w", err) } - errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - Error("Failed to subscribe to relay on node %s, pubsubTopic: %s, error: %v", n.nodeName, pubsubTopic, errMsg) - return errors.New("error WakuRelaySubscribe: " + errMsg) + Debug("Successfully subscribed to relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic) + return nil } func (n *WakuNode) RelayAddProtectedShard(clusterId uint16, shardId uint16, pubkey *ecdsa.PublicKey) error { @@ -807,30 +335,16 @@ func (n *WakuNode) RelayAddProtectedShard(clusterId uint16, shardId uint16, pubk return errors.New("error WakuRelayAddProtectedShard: pubkey can't be nil") } - keyHexStr := hex.EncodeToString(crypto.FromECDSAPub(pubkey)) - - wg := sync.WaitGroup{} - - var resp = C.allocResp(unsafe.Pointer(&wg)) - var cPublicKey = C.CString(keyHexStr) - - defer C.freeResp(resp) - defer C.free(unsafe.Pointer(cPublicKey)) - if n.wakuCtx == nil { return errors.New("wakuCtx is nil") } - wg.Add(1) - C.cGoWakuRelayAddProtectedShard(n.wakuCtx, C.int(clusterId), C.int(shardId), cPublicKey, resp) - wg.Wait() + keyHexStr := hex.EncodeToString(crypto.FromECDSAPub(pubkey)) - if C.getRet(resp) == C.RET_OK { - return nil + if err := libwaku.RelayAddProtectedShard(n.wakuCtx, int(clusterId), int(shardId), keyHexStr); err != nil { + return fmt.Errorf("error WakuRelayAddProtectedShard: %w", err) } - - errMsg := "error WakuRelayAddProtectedShard: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return errors.New(errMsg) + return nil } func (n *WakuNode) RelayUnsubscribe(pubsubTopic string) error { @@ -840,117 +354,67 @@ func (n *WakuNode) RelayUnsubscribe(pubsubTopic string) error { return err } - wg := sync.WaitGroup{} - - var resp = C.allocResp(unsafe.Pointer(&wg)) - var cPubsubTopic = C.CString(pubsubTopic) - - defer C.freeResp(resp) - defer C.free(unsafe.Pointer(cPubsubTopic)) - if n.wakuCtx == nil { return errors.New("wakuCtx is nil") } - wg.Add(1) Debug("Attempting to unsubscribe from relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic) - C.cGoWakuRelayUnsubscribe(n.wakuCtx, cPubsubTopic, resp) - wg.Wait() - - if C.getRet(resp) == C.RET_OK { - - Debug("Successfully unsubscribed from relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic) - return nil + if err := libwaku.RelayUnsubscribe(n.wakuCtx, pubsubTopic); err != nil { + Error("Failed to unsubscribe from relay on node %s, pubsubTopic: %s, error: %v", n.nodeName, pubsubTopic, err) + return fmt.Errorf("error WakuRelayUnsubscribe: %w", err) } - errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - Error("Failed to unsubscribe from relay on node %s, pubsubTopic: %s, error: %v", n.nodeName, pubsubTopic, errMsg) - return errors.New("error WakuRelayUnsubscribe: " + errMsg) + Debug("Successfully unsubscribed from relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic) + return nil } func (n *WakuNode) PeerExchangeRequest(numPeers uint64) (uint64, error) { - wg := sync.WaitGroup{} - - var resp = C.allocResp(unsafe.Pointer(&wg)) - defer C.freeResp(resp) - - wg.Add(1) - C.cGoWakuPeerExchangeQuery(n.wakuCtx, C.uint64_t(numPeers), resp) - wg.Wait() - if C.getRet(resp) == C.RET_OK { - numRecvPeersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - numRecvPeers, err := strconv.ParseUint(numRecvPeersStr, 10, 64) - if err != nil { - Error("Failed to parse number of received peers: %v", err) - return 0, err - } - return numRecvPeers, nil + numRecvPeersStr, err := libwaku.PeerExchangeRequest(n.wakuCtx, numPeers) + if err != nil { + Error("PeerExchangeRequest failed: %v", err) + return 0, err } - errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - Error("PeerExchangeRequest failed: %v", errMsg) - return 0, errors.New(errMsg) + numRecvPeers, err := strconv.ParseUint(numRecvPeersStr, 10, 64) + if err != nil { + Error("Failed to parse number of received peers: %v", err) + return 0, err + } + return numRecvPeers, nil } func (n *WakuNode) StartDiscV5() error { Debug("Starting DiscV5 for node: %s", n.nodeName) - wg := sync.WaitGroup{} - - var resp = C.allocResp(unsafe.Pointer(&wg)) - defer C.freeResp(resp) - - wg.Add(1) - C.cGoWakuStartDiscV5(n.wakuCtx, resp) - wg.Wait() - if C.getRet(resp) == C.RET_OK { - Debug("Successfully started DiscV5 for node: %s", n.nodeName) - return nil + if err := libwaku.StartDiscV5(n.wakuCtx); err != nil { + errMsg := "error WakuStartDiscV5: " + err.Error() + Error("Failed to start DiscV5 for node %s: %v", n.nodeName, errMsg) + return errors.New(errMsg) } - errMsg := "error WakuStartDiscV5: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - Error("Failed to start DiscV5 for node %s: %v", n.nodeName, errMsg) - return errors.New(errMsg) + Debug("Successfully started DiscV5 for node: %s", n.nodeName) + return nil } func (n *WakuNode) StopDiscV5() error { - wg := sync.WaitGroup{} - - var resp = C.allocResp(unsafe.Pointer(&wg)) - defer C.freeResp(resp) - - wg.Add(1) - C.cGoWakuStopDiscV5(n.wakuCtx, resp) - wg.Wait() - - if C.getRet(resp) == C.RET_OK { - Debug("Successfully stopped DiscV5 for node: %s", n.nodeName) - return nil + if err := libwaku.StopDiscV5(n.wakuCtx); err != nil { + errMsg := "error WakuStopDiscV5: " + err.Error() + Error("Failed to stop DiscV5 for node %s: %v", n.nodeName, errMsg) + return errors.New(errMsg) } - errMsg := "error WakuStopDiscV5: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - Error("Failed to stop DiscV5 for node %s: %v", n.nodeName, errMsg) - return errors.New(errMsg) + Debug("Successfully stopped DiscV5 for node: %s", n.nodeName) + return nil } func (n *WakuNode) Version() (string, error) { - wg := sync.WaitGroup{} - - var resp = C.allocResp(unsafe.Pointer(&wg)) - defer C.freeResp(resp) - - wg.Add(1) - C.cGoWakuVersion(n.wakuCtx, resp) - wg.Wait() - - if C.getRet(resp) == C.RET_OK { - var version = C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - Debug("Successfully fetched Waku version for node %s: %s", n.nodeName, version) - return version, nil + version, err := libwaku.Version(n.wakuCtx) + if err != nil { + errMsg := "error WakuVersion: " + err.Error() + Error("Failed to fetch Waku version for node %s: %v", n.nodeName, errMsg) + return "", errors.New(errMsg) } - errMsg := "error WakuVersion: " + - C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - Error("Failed to fetch Waku version for node %s: %v", n.nodeName, errMsg) - return "", errors.New(errMsg) + Debug("Successfully fetched Waku version for node %s: %s", n.nodeName, version) + return version, nil } func (n *WakuNode) StoreQuery(ctx context.Context, storeRequest *common.StoreQueryRequest, peerInfo peer.AddrInfo) (*common.StoreQueryResponse, error) { @@ -966,32 +430,17 @@ func (n *WakuNode) StoreQuery(ctx context.Context, storeRequest *common.StoreQue addrs[i] = addr.String() } - var cJsonQuery = C.CString(string(b)) - var cPeerAddr = C.CString(strings.Join(addrs, ",")) - wg := sync.WaitGroup{} - - var resp = C.allocResp(unsafe.Pointer(&wg)) - - defer C.free(unsafe.Pointer(cJsonQuery)) - defer C.free(unsafe.Pointer(cPeerAddr)) - defer C.freeResp(resp) - - wg.Add(1) - C.cGoWakuStoreQuery(n.wakuCtx, cJsonQuery, cPeerAddr, C.int(timeoutMs), resp) - wg.Wait() - - if C.getRet(resp) == C.RET_OK { - jsonResponseStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - storeQueryResponse := common.StoreQueryResponse{} - err = json.Unmarshal([]byte(jsonResponseStr), &storeQueryResponse) - if err != nil { - return nil, err - } - return &storeQueryResponse, nil + jsonResponseStr, err := libwaku.StoreQuery(n.wakuCtx, string(b), strings.Join(addrs, ","), timeoutMs) + if err != nil { + return nil, fmt.Errorf("error WakuStoreQuery: %w", err) } - errMsg := "error WakuStoreQuery: " + - C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return nil, errors.New(errMsg) + + storeQueryResponse := common.StoreQueryResponse{} + err = json.Unmarshal([]byte(jsonResponseStr), &storeQueryResponse) + if err != nil { + return nil, err + } + return &storeQueryResponse, nil } func (n *WakuNode) RelayPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string) (common.MessageHash, error) { @@ -1002,28 +451,16 @@ func (n *WakuNode) RelayPublish(ctx context.Context, message *pb.WakuMessage, pu return common.MessageHash(""), err } - wg := sync.WaitGroup{} - - var resp = C.allocResp(unsafe.Pointer(&wg)) - var cPubsubTopic = C.CString(pubsubTopic) - var msg = C.CString(string(jsonMsg)) - defer C.freeResp(resp) - defer C.free(unsafe.Pointer(cPubsubTopic)) - defer C.free(unsafe.Pointer(msg)) - - wg.Add(1) - C.cGoWakuRelayPublish(n.wakuCtx, cPubsubTopic, msg, C.int(timeoutMs), resp) - wg.Wait() - if C.getRet(resp) == C.RET_OK { - msgHash := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - parsedMsgHash, err := common.ToMessageHash(msgHash) - if err != nil { - return common.MessageHash(""), err - } - return parsedMsgHash, nil + msgHash, err := libwaku.RelayPublish(n.wakuCtx, pubsubTopic, string(jsonMsg), timeoutMs) + if err != nil { + return common.MessageHash(""), fmt.Errorf("WakuRelayPublish: %w", err) } - errMsg := "WakuRelayPublish: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return common.MessageHash(""), errors.New(errMsg) + + parsedMsgHash, err := common.ToMessageHash(msgHash) + if err != nil { + return common.MessageHash(""), err + } + return parsedMsgHash, nil } func (n *WakuNode) RelayPublishNoCTX(pubsubTopic string, message *pb.WakuMessage) (common.MessageHash, error) { @@ -1050,34 +487,23 @@ func (n *WakuNode) RelayPublishNoCTX(pubsubTopic string, message *pb.WakuMessage } func (n *WakuNode) DnsDiscovery(ctx context.Context, enrTreeUrl string, nameDnsServer string) ([]multiaddr.Multiaddr, error) { - wg := sync.WaitGroup{} - - var resp = C.allocResp(unsafe.Pointer(&wg)) - var cEnrTree = C.CString(enrTreeUrl) - var cDnsServer = C.CString(nameDnsServer) - defer C.freeResp(resp) - defer C.free(unsafe.Pointer(cEnrTree)) - defer C.free(unsafe.Pointer(cDnsServer)) - timeoutMs := getContextTimeoutMilliseconds(ctx) - wg.Add(1) - C.cGoWakuDnsDiscovery(n.wakuCtx, cEnrTree, cDnsServer, C.int(timeoutMs), resp) - wg.Wait() - if C.getRet(resp) == C.RET_OK { - var addrsRet []multiaddr.Multiaddr - nodeAddresses := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - addrss := strings.Split(nodeAddresses, ",") - for _, addr := range addrss { - addr, err := multiaddr.NewMultiaddr(addr) - if err != nil { - return nil, err - } - addrsRet = append(addrsRet, addr) - } - return addrsRet, nil + + nodeAddresses, err := libwaku.DnsDiscovery(n.wakuCtx, enrTreeUrl, nameDnsServer, timeoutMs) + if err != nil { + return nil, fmt.Errorf("error WakuDnsDiscovery: %w", err) } - errMsg := "error WakuDnsDiscovery: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return nil, errors.New(errMsg) + + var addrsRet []multiaddr.Multiaddr + addrss := strings.Split(nodeAddresses, ",") + for _, addr := range addrss { + addr, err := multiaddr.NewMultiaddr(addr) + if err != nil { + return nil, err + } + addrsRet = append(addrsRet, addr) + } + return addrsRet, nil } func (n *WakuNode) PingPeer(ctx context.Context, peerInfo peer.AddrInfo) (time.Duration, error) { @@ -1086,73 +512,44 @@ func (n *WakuNode) PingPeer(ctx context.Context, peerInfo peer.AddrInfo) (time.D addrs[i] = addr.String() } - wg := sync.WaitGroup{} - - var resp = C.allocResp(unsafe.Pointer(&wg)) - defer C.freeResp(resp) - - var cPeerId = C.CString(strings.Join(addrs, ",")) - defer C.free(unsafe.Pointer(cPeerId)) - timeoutMs := getContextTimeoutMilliseconds(ctx) - wg.Add(1) - C.cGoWakuPingPeer(n.wakuCtx, cPeerId, C.int(timeoutMs), resp) - wg.Wait() - if C.getRet(resp) == C.RET_OK { - rttStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - rttInt, err := strconv.ParseInt(rttStr, 10, 64) - if err != nil { - return 0, err - } - return time.Duration(rttInt), nil + + rttStr, err := libwaku.PingPeer(n.wakuCtx, strings.Join(addrs, ","), timeoutMs) + if err != nil { + return 0, fmt.Errorf("PingPeer: %w", err) } - errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return 0, fmt.Errorf("PingPeer: %s", errMsg) + rttInt, err := strconv.ParseInt(rttStr, 10, 64) + if err != nil { + return 0, err + } + return time.Duration(rttInt), nil } func (n *WakuNode) Start() error { Debug("Starting %s", n.nodeName) - wg := sync.WaitGroup{} - var resp = C.allocResp(unsafe.Pointer(&wg)) - defer C.freeResp(resp) - - wg.Add(1) - C.cGoWakuStart(n.wakuCtx, resp) - wg.Wait() - - if C.getRet(resp) == C.RET_OK { - Debug("Successfully started %s", n.nodeName) - return nil + if err := libwaku.Start(n.wakuCtx); err != nil { + errMsg := "error WakuStart: " + err.Error() + Error("Failed to start %s: %s", n.nodeName, errMsg) + return errors.New(errMsg) } - errMsg := "error WakuStart: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - Error("Failed to start %s: %s", n.nodeName, errMsg) - - return errors.New(errMsg) + Debug("Successfully started %s", n.nodeName) + return nil } func (n *WakuNode) Stop() error { Debug("Stopping %s", n.nodeName) - wg := sync.WaitGroup{} - var resp = C.allocResp(unsafe.Pointer(&wg)) - defer C.freeResp(resp) - - wg.Add(1) - C.cGoWakuStop(n.wakuCtx, resp) - wg.Wait() - - if C.getRet(resp) == C.RET_OK { - Debug("Successfully stopped %s", n.nodeName) - return nil + if err := libwaku.Stop(n.wakuCtx); err != nil { + errMsg := "error WakuStop: " + err.Error() + Error("Failed to stop %s: %s", n.nodeName, errMsg) + return errors.New(errMsg) } - errMsg := "error WakuStop: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - Error("Failed to stop %s: %s", n.nodeName, errMsg) - - return errors.New(errMsg) + Debug("Successfully stopped %s", n.nodeName) + return nil } func (n *WakuNode) Destroy() error { @@ -1164,274 +561,165 @@ func (n *WakuNode) Destroy() error { Debug("Destroying %v", n.nodeName) - wg := sync.WaitGroup{} - var resp = C.allocResp(unsafe.Pointer(&wg)) - defer C.freeResp(resp) - - wg.Add(1) - C.cGoWakuDestroy(n.wakuCtx, resp) - wg.Wait() - - if C.getRet(resp) == C.RET_OK { - unregisterNode(n) - Debug("Successfully destroyed %s", n.nodeName) - return nil + if err := libwaku.Destroy(n.wakuCtx); err != nil { + errMsg := "error WakuDestroy: " + err.Error() + Error("Failed to destroy %v: %v", n.nodeName, errMsg) + return errors.New(errMsg) } - errMsg := "error WakuDestroy: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - Error("Failed to destroy %v: %v", n.nodeName, errMsg) - - return errors.New(errMsg) + Debug("Successfully destroyed %s", n.nodeName) + return nil } func (n *WakuNode) PeerID() (peer.ID, error) { - wg := sync.WaitGroup{} - - var resp = C.allocResp(unsafe.Pointer(&wg)) - defer C.freeResp(resp) - - wg.Add(1) - C.cGoWakuGetMyPeerId(n.wakuCtx, resp) - wg.Wait() - if C.getRet(resp) == C.RET_OK { - peerIdStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - id, err := peer.Decode(peerIdStr) - if err != nil { - errMsg := "WakuGetMyPeerId - decoding peerId: %w" - return "", fmt.Errorf(errMsg, err) - } - return id, nil + peerIdStr, err := libwaku.GetMyPeerID(n.wakuCtx) + if err != nil { + return "", err } - errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return "", errors.New(errMsg) + + id, err := peer.Decode(peerIdStr) + if err != nil { + errMsg := "WakuGetMyPeerId - decoding peerId: %w" + return "", fmt.Errorf(errMsg, err) + } + return id, nil } func (n *WakuNode) Connect(ctx context.Context, addr multiaddr.Multiaddr) error { - wg := sync.WaitGroup{} - - var resp = C.allocResp(unsafe.Pointer(&wg)) - var cPeerMultiAddr = C.CString(addr.String()) - defer C.freeResp(resp) - defer C.free(unsafe.Pointer(cPeerMultiAddr)) - timeoutMs := getContextTimeoutMilliseconds(ctx) - wg.Add(1) - C.cGoWakuConnect(n.wakuCtx, cPeerMultiAddr, C.int(timeoutMs), resp) - wg.Wait() - if C.getRet(resp) == C.RET_OK { - return nil + + if err := libwaku.Connect(n.wakuCtx, addr.String(), timeoutMs); err != nil { + return fmt.Errorf("error WakuConnect: %w", err) } - errMsg := "error WakuConnect: " + - C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return errors.New(errMsg) + return nil } func (n *WakuNode) DialPeerByID(ctx context.Context, peerID peer.ID, protocol libp2pproto.ID) error { - wg := sync.WaitGroup{} - - var resp = C.allocResp(unsafe.Pointer(&wg)) - var cPeerId = C.CString(peerID.String()) - var cProtocol = C.CString(string(protocol)) - defer C.freeResp(resp) - defer C.free(unsafe.Pointer(cPeerId)) - defer C.free(unsafe.Pointer(cProtocol)) - timeoutMs := getContextTimeoutMilliseconds(ctx) - wg.Add(1) - C.cGoWakuDialPeerById(n.wakuCtx, cPeerId, cProtocol, C.int(timeoutMs), resp) - wg.Wait() - if C.getRet(resp) == C.RET_OK { - return nil + + if err := libwaku.DialPeerByID(n.wakuCtx, peerID.String(), string(protocol), timeoutMs); err != nil { + return fmt.Errorf("error DialPeerById: %w", err) } - errMsg := "error DialPeerById: " + - C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return errors.New(errMsg) + return nil } func (n *WakuNode) ListenAddresses() ([]multiaddr.Multiaddr, error) { - wg := sync.WaitGroup{} - - var resp = C.allocResp(unsafe.Pointer(&wg)) - defer C.freeResp(resp) - - wg.Add(1) - C.cGoWakuListenAddresses(n.wakuCtx, resp) - wg.Wait() - if C.getRet(resp) == C.RET_OK { - var addrsRet []multiaddr.Multiaddr - listenAddresses := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - addrss := strings.Split(listenAddresses, ",") - for _, addr := range addrss { - addr, err := multiaddr.NewMultiaddr(addr) - if err != nil { - return nil, err - } - addrsRet = append(addrsRet, addr) - } - return addrsRet, nil + listenAddresses, err := libwaku.ListenAddresses(n.wakuCtx) + if err != nil { + return nil, fmt.Errorf("error WakuListenAddresses: %w", err) } - errMsg := "error WakuListenAddresses: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return nil, errors.New(errMsg) -} -func (n *WakuNode) ENR() (*enode.Node, error) { - wg := sync.WaitGroup{} - - var resp = C.allocResp(unsafe.Pointer(&wg)) - defer C.freeResp(resp) - - wg.Add(1) - C.cGoWakuGetMyENR(n.wakuCtx, resp) - wg.Wait() - if C.getRet(resp) == C.RET_OK { - enrStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - n, err := enode.Parse(enode.ValidSchemes, enrStr) + var addrsRet []multiaddr.Multiaddr + addrss := strings.Split(listenAddresses, ",") + for _, addr := range addrss { + addr, err := multiaddr.NewMultiaddr(addr) if err != nil { return nil, err } - return n, nil + addrsRet = append(addrsRet, addr) } - errMsg := "error WakuGetMyENR: " + - C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return nil, errors.New(errMsg) + return addrsRet, nil +} + +func (n *WakuNode) ENR() (*enode.Node, error) { + enrStr, err := libwaku.GetMyENR(n.wakuCtx) + if err != nil { + return nil, fmt.Errorf("error WakuGetMyENR: %w", err) + } + + node, err := enode.Parse(enode.ValidSchemes, enrStr) + if err != nil { + return nil, err + } + return node, nil } func (n *WakuNode) GetNumPeersInMesh(pubsubTopic string) (int, error) { - wg := sync.WaitGroup{} - - var resp = C.allocResp(unsafe.Pointer(&wg)) - var cPubsubTopic = C.CString(pubsubTopic) - defer C.freeResp(resp) - defer C.free(unsafe.Pointer(cPubsubTopic)) - - wg.Add(1) - C.cGoWakuGetNumPeersInMesh(n.wakuCtx, cPubsubTopic, resp) - wg.Wait() - if C.getRet(resp) == C.RET_OK { - numPeersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - numPeers, err := strconv.Atoi(numPeersStr) - if err != nil { - errMsg := "GetNumPeersInMesh - error converting string to int: " + err.Error() - return 0, errors.New(errMsg) - } - return numPeers, nil + numPeersStr, err := libwaku.GetNumPeersInMesh(n.wakuCtx, pubsubTopic) + if err != nil { + return 0, fmt.Errorf("error GetNumPeersInMesh: %w", err) } - errMsg := "error GetNumPeersInMesh: " + - C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return 0, errors.New(errMsg) + + numPeers, err := strconv.Atoi(numPeersStr) + if err != nil { + errMsg := "GetNumPeersInMesh - error converting string to int: " + err.Error() + return 0, errors.New(errMsg) + } + return numPeers, nil } func (n *WakuNode) GetPeerIDsFromPeerStore() (peer.IDSlice, error) { - wg := sync.WaitGroup{} - - var resp = C.allocResp(unsafe.Pointer(&wg)) - defer C.freeResp(resp) - - wg.Add(1) - C.cGoWakuGetPeerIdsFromPeerStore(n.wakuCtx, resp) - wg.Wait() - if C.getRet(resp) == C.RET_OK { - peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - if peersStr == "" { - return nil, nil - } - // peersStr contains a comma-separated list of peer ids - itemsPeerIds := strings.Split(peersStr, ",") - - var peers peer.IDSlice - for _, peerId := range itemsPeerIds { - id, err := peer.Decode(peerId) - if err != nil { - return nil, fmt.Errorf("GetPeerIdsFromPeerStore - decoding peerId: %w", err) - } - peers = append(peers, id) - } - return peers, nil + peersStr, err := libwaku.GetPeerIDsFromPeerStore(n.wakuCtx) + if err != nil { + return nil, fmt.Errorf("GetPeerIdsFromPeerStore: %s", err.Error()) } - errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return nil, fmt.Errorf("GetPeerIdsFromPeerStore: %s", errMsg) + + if peersStr == "" { + return nil, nil + } + // peersStr contains a comma-separated list of peer ids + itemsPeerIds := strings.Split(peersStr, ",") + + var peers peer.IDSlice + for _, peerId := range itemsPeerIds { + id, err := peer.Decode(peerId) + if err != nil { + return nil, fmt.Errorf("GetPeerIdsFromPeerStore - decoding peerId: %w", err) + } + peers = append(peers, id) + } + return peers, nil } func (n *WakuNode) GetConnectedPeersInfo() (common.PeersData, error) { - wg := sync.WaitGroup{} - - var resp = C.allocResp(unsafe.Pointer(&wg)) - defer C.freeResp(resp) - - wg.Add(1) - C.cGoWakuGetConnectedPeersInfo(n.wakuCtx, resp) - wg.Wait() - if C.getRet(resp) == C.RET_OK { - jsonStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - if jsonStr == "" { - return nil, nil - } - - peerData, err := common.ParsePeerInfoFromJSON(jsonStr) - - if err != nil { - return nil, fmt.Errorf("GetConnectedPeersInfo - failed parsing JSON: %w", err) - } - - return peerData, nil + jsonStr, err := libwaku.GetConnectedPeersInfo(n.wakuCtx) + if err != nil { + return nil, fmt.Errorf("GetConnectedPeersInfo: %s", err.Error()) } - errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return nil, fmt.Errorf("GetConnectedPeersInfo: %s", errMsg) + + if jsonStr == "" { + return nil, nil + } + + peerData, err := common.ParsePeerInfoFromJSON(jsonStr) + + if err != nil { + return nil, fmt.Errorf("GetConnectedPeersInfo - failed parsing JSON: %w", err) + } + + return peerData, nil } func (n *WakuNode) GetPeerIDsByProtocol(protocol libp2pproto.ID) (peer.IDSlice, error) { - wg := sync.WaitGroup{} - - var resp = C.allocResp(unsafe.Pointer(&wg)) - var cProtocol = C.CString(string(protocol)) - defer C.freeResp(resp) - defer C.free(unsafe.Pointer(cProtocol)) - - wg.Add(1) - C.cGoWakuGetPeerIdsByProtocol(n.wakuCtx, cProtocol, resp) - wg.Wait() - if C.getRet(resp) == C.RET_OK { - peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - if peersStr == "" { - return nil, nil - } - // peersStr contains a comma-separated list of peer ids - itemsPeerIds := strings.Split(peersStr, ",") - - var peers peer.IDSlice - for _, p := range itemsPeerIds { - id, err := peer.Decode(p) - if err != nil { - return nil, fmt.Errorf("GetPeerIdsByProtocol - decoding peerId: %w", err) - } - peers = append(peers, id) - } - return peers, nil + peersStr, err := libwaku.GetPeerIDsByProtocol(n.wakuCtx, string(protocol)) + if err != nil { + return nil, fmt.Errorf("GetPeerIdsByProtocol: error GetPeerIdsByProtocol: %s", err.Error()) } - errMsg := "error GetPeerIdsByProtocol: " + - C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return nil, fmt.Errorf("GetPeerIdsByProtocol: %s", errMsg) + + if peersStr == "" { + return nil, nil + } + // peersStr contains a comma-separated list of peer ids + itemsPeerIds := strings.Split(peersStr, ",") + + var peers peer.IDSlice + for _, p := range itemsPeerIds { + id, err := peer.Decode(p) + if err != nil { + return nil, fmt.Errorf("GetPeerIdsByProtocol - decoding peerId: %w", err) + } + peers = append(peers, id) + } + return peers, nil } func (n *WakuNode) DialPeer(ctx context.Context, peerAddr multiaddr.Multiaddr, protocol libp2pproto.ID) error { - wg := sync.WaitGroup{} - - var resp = C.allocResp(unsafe.Pointer(&wg)) - var cPeerMultiAddr = C.CString(peerAddr.String()) - var cProtocol = C.CString(string(protocol)) - defer C.freeResp(resp) - defer C.free(unsafe.Pointer(cPeerMultiAddr)) - defer C.free(unsafe.Pointer(cProtocol)) - timeoutMs := getContextTimeoutMilliseconds(ctx) - wg.Add(1) - C.cGoWakuDialPeer(n.wakuCtx, cPeerMultiAddr, cProtocol, C.int(timeoutMs), resp) - wg.Wait() - if C.getRet(resp) == C.RET_OK { - return nil + + if err := libwaku.DialPeer(n.wakuCtx, peerAddr.String(), string(protocol), timeoutMs); err != nil { + return fmt.Errorf("error DialPeer: %w", err) } - errMsg := "error DialPeer: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return errors.New(errMsg) + return nil } func (n *WakuNode) GetNumConnectedPeers() (int, error) { @@ -1646,29 +934,14 @@ func (n *WakuNode) IsOnline() (bool, error) { Debug("Querying online state for %v", n.nodeName) - wg := sync.WaitGroup{} - var resp = C.allocResp(unsafe.Pointer(&wg)) - defer C.freeResp(resp) - - wg.Add(1) - C.cGoWakuIsOnline(n.wakuCtx, resp) - wg.Wait() - - if C.getRet(resp) == C.RET_OK { - onlineStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - - if onlineStr == "true" { - return true, nil - } - - return false, nil - + onlineStr, err := libwaku.IsOnline(n.wakuCtx) + if err != nil { + errMsg := "error IsOnline: " + err.Error() + Error("Failed to query online state for %v: %v", n.nodeName, errMsg) + return false, errors.New(errMsg) } - errMsg := "error IsOnline: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - Error("Failed to query online state for %v: %v", n.nodeName, errMsg) - - return false, errors.New(errMsg) + return onlineStr == "true", nil } func (n *WakuNode) GetMetrics() (string, error) { @@ -1680,29 +953,18 @@ func (n *WakuNode) GetMetrics() (string, error) { Debug("Querying metrics for %v", n.nodeName) - wg := sync.WaitGroup{} - var resp = C.allocResp(unsafe.Pointer(&wg)) - defer C.freeResp(resp) - - wg.Add(1) - C.cGoWakuGetMetrics(n.wakuCtx, resp) - wg.Wait() - - if C.getRet(resp) == C.RET_OK { - metricsStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - - if metricsStr == "" { - errMsg := "received empty metrics response" - Error(errMsg) - return "", errors.New(errMsg) - } - - return metricsStr, nil - + metricsStr, err := libwaku.GetMetrics(n.wakuCtx) + if err != nil { + errMsg := "error GetMetrics: " + err.Error() + Error("Failed to query metrics for %v: %v", n.nodeName, errMsg) + return "", errors.New(errMsg) } - errMsg := "error GetMetrics: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - Error("Failed to query metrics for %v: %v", n.nodeName, errMsg) + if metricsStr == "" { + errMsg := "received empty metrics response" + Error(errMsg) + return "", errors.New(errMsg) + } - return "", errors.New(errMsg) + return metricsStr, nil }