initial half of the changes

This commit is contained in:
Gabriel mermelstein 2024-12-03 18:34:51 +01:00
parent 22c28a243a
commit e0f960c5a7
No known key found for this signature in database
GPG Key ID: 82B8134785FEAE0D

View File

@ -14,10 +14,13 @@ package waku
int ret;
char* msg;
size_t len;
void* wg;
} Resp;
static void* allocResp() {
return calloc(1, sizeof(Resp));
static void* allocResp(void* wg) {
Resp* r = calloc(1, sizeof(Resp));
r->wg = wg;
return r;
}
static void freeResp(void* resp) {
@ -51,52 +54,45 @@ package waku
}
// resp must be set != NULL in case interest on retrieving data from the callback
static void callback(int ret, char* msg, size_t len, void* resp) {
if (resp != NULL) {
Resp* m = (Resp*) resp;
m->ret = ret;
m->msg = msg;
m->len = len;
}
}
void GoCallback(int ret, char* msg, size_t len, void* resp);
#define WAKU_CALL(call) \
do { \
int ret = call; \
if (ret != 0) { \
printf("Failed the call to: %s. Returned code: %d\n", #call, ret); \
exit(1); \
} \
int ret = call; \
if (ret != 0) { \
printf("Failed the call to: %s. Returned code: %d\n", #call, ret); \
exit(1); \
} \
} while (0)
static void* cGoWakuNew(const char* configJson, void* resp) {
// We pass NULL because we are not interested in retrieving data from this callback
void* ret = waku_new(configJson, (WakuCallBack) callback, resp);
void* ret = waku_new(configJson, (WakuCallBack) GoCallback, resp);
return ret;
}
static void cGoWakuStart(void* wakuCtx, void* resp) {
WAKU_CALL(waku_start(wakuCtx, (WakuCallBack) callback, resp));
WAKU_CALL(waku_start(wakuCtx, (WakuCallBack) GoCallback, resp));
}
static void cGoWakuStop(void* wakuCtx, void* resp) {
WAKU_CALL(waku_stop(wakuCtx, (WakuCallBack) callback, resp));
WAKU_CALL(waku_stop(wakuCtx, (WakuCallBack) GoCallback, resp));
}
static void cGoWakuDestroy(void* wakuCtx, void* resp) {
WAKU_CALL(waku_destroy(wakuCtx, (WakuCallBack) callback, resp));
WAKU_CALL(waku_destroy(wakuCtx, (WakuCallBack) GoCallback, resp));
}
static void cGoWakuStartDiscV5(void* wakuCtx, void* resp) {
WAKU_CALL(waku_start_discv5(wakuCtx, (WakuCallBack) callback, resp));
WAKU_CALL(waku_start_discv5(wakuCtx, (WakuCallBack) GoCallback, resp));
}
static void cGoWakuStopDiscV5(void* wakuCtx, void* resp) {
WAKU_CALL(waku_stop_discv5(wakuCtx, (WakuCallBack) callback, resp));
WAKU_CALL(waku_stop_discv5(wakuCtx, (WakuCallBack) GoCallback, resp));
}
static void cGoWakuVersion(void* wakuCtx, void* resp) {
WAKU_CALL(waku_version(wakuCtx, (WakuCallBack) callback, resp));
WAKU_CALL(waku_version(wakuCtx, (WakuCallBack) GoCallback, resp));
}
static void cGoWakuSetEventCallback(void* wakuCtx) {
@ -127,16 +123,16 @@ package waku
appVersion,
contentTopicName,
encoding,
(WakuCallBack) callback,
(WakuCallBack) GoCallback,
resp) );
}
static void cGoWakuPubsubTopic(void* wakuCtx, char* topicName, void* resp) {
WAKU_CALL( waku_pubsub_topic(wakuCtx, topicName, (WakuCallBack) callback, resp) );
WAKU_CALL( waku_pubsub_topic(wakuCtx, topicName, (WakuCallBack) GoCallback, resp) );
}
static void cGoWakuDefaultPubsubTopic(void* wakuCtx, void* resp) {
WAKU_CALL (waku_default_pubsub_topic(wakuCtx, (WakuCallBack) callback, resp));
WAKU_CALL (waku_default_pubsub_topic(wakuCtx, (WakuCallBack) GoCallback, resp));
}
static void cGoWakuRelayPublish(void* wakuCtx,
@ -149,14 +145,14 @@ package waku
pubSubTopic,
jsonWakuMessage,
timeoutMs,
(WakuCallBack) callback,
(WakuCallBack) GoCallback,
resp));
}
static void cGoWakuRelaySubscribe(void* wakuCtx, char* pubSubTopic, void* resp) {
WAKU_CALL ( waku_relay_subscribe(wakuCtx,
pubSubTopic,
(WakuCallBack) callback,
(WakuCallBack) GoCallback,
resp) );
}
@ -164,7 +160,7 @@ package waku
WAKU_CALL ( waku_relay_unsubscribe(wakuCtx,
pubSubTopic,
(WakuCallBack) callback,
(WakuCallBack) GoCallback,
resp) );
}
@ -172,7 +168,7 @@ package waku
WAKU_CALL( waku_connect(wakuCtx,
peerMultiAddr,
timeoutMs,
(WakuCallBack) callback,
(WakuCallBack) GoCallback,
resp) );
}
@ -186,7 +182,7 @@ package waku
peerMultiAddr,
protocol,
timeoutMs,
(WakuCallBack) callback,
(WakuCallBack) GoCallback,
resp) );
}
@ -200,47 +196,47 @@ package waku
peerId,
protocol,
timeoutMs,
(WakuCallBack) callback,
(WakuCallBack) GoCallback,
resp) );
}
static void cGoWakuDisconnectPeerById(void* wakuCtx, char* peerId, void* resp) {
WAKU_CALL( waku_disconnect_peer_by_id(wakuCtx,
peerId,
(WakuCallBack) callback,
(WakuCallBack) GoCallback,
resp) );
}
static void cGoWakuListenAddresses(void* wakuCtx, void* resp) {
WAKU_CALL (waku_listen_addresses(wakuCtx, (WakuCallBack) callback, resp) );
WAKU_CALL (waku_listen_addresses(wakuCtx, (WakuCallBack) GoCallback, resp) );
}
static void cGoWakuGetMyENR(void* ctx, void* resp) {
WAKU_CALL (waku_get_my_enr(ctx, (WakuCallBack) callback, resp) );
WAKU_CALL (waku_get_my_enr(ctx, (WakuCallBack) GoCallback, resp) );
}
static void cGoWakuGetMyPeerId(void* ctx, void* resp) {
WAKU_CALL (waku_get_my_peerid(ctx, (WakuCallBack) callback, resp) );
WAKU_CALL (waku_get_my_peerid(ctx, (WakuCallBack) GoCallback, resp) );
}
static void cGoWakuPingPeer(void* ctx, char* peerAddr, int timeoutMs, void* resp) {
WAKU_CALL (waku_ping_peer(ctx, peerAddr, timeoutMs, (WakuCallBack) callback, resp) );
WAKU_CALL (waku_ping_peer(ctx, peerAddr, timeoutMs, (WakuCallBack) GoCallback, resp) );
}
static void cGoWakuGetNumPeersInMesh(void* ctx, char* pubSubTopic, void* resp) {
WAKU_CALL (waku_relay_get_num_peers_in_mesh(ctx, pubSubTopic, (WakuCallBack) callback, resp) );
WAKU_CALL (waku_relay_get_num_peers_in_mesh(ctx, pubSubTopic, (WakuCallBack) GoCallback, resp) );
}
static void cGoWakuGetNumConnectedRelayPeers(void* ctx, char* pubSubTopic, void* resp) {
WAKU_CALL (waku_relay_get_num_connected_peers(ctx, pubSubTopic, (WakuCallBack) callback, resp) );
WAKU_CALL (waku_relay_get_num_connected_peers(ctx, pubSubTopic, (WakuCallBack) GoCallback, resp) );
}
static void cGoWakuGetConnectedPeers(void* wakuCtx, void* resp) {
WAKU_CALL (waku_get_connected_peers(wakuCtx, (WakuCallBack) callback, resp) );
WAKU_CALL (waku_get_connected_peers(wakuCtx, (WakuCallBack) GoCallback, resp) );
}
static void cGoWakuGetPeerIdsFromPeerStore(void* wakuCtx, void* resp) {
WAKU_CALL (waku_get_peerids_from_peerstore(wakuCtx, (WakuCallBack) callback, resp) );
WAKU_CALL (waku_get_peerids_from_peerstore(wakuCtx, (WakuCallBack) GoCallback, resp) );
}
static void cGoWakuLightpushPublish(void* wakuCtx,
@ -251,7 +247,7 @@ package waku
WAKU_CALL (waku_lightpush_publish(wakuCtx,
pubSubTopic,
jsonWakuMessage,
(WakuCallBack) callback,
(WakuCallBack) GoCallback,
resp));
}
@ -265,7 +261,7 @@ package waku
jsonQuery,
peerAddr,
timeoutMs,
(WakuCallBack) callback,
(WakuCallBack) GoCallback,
resp));
}
@ -275,7 +271,7 @@ package waku
WAKU_CALL (waku_peer_exchange_request(wakuCtx,
numPeers,
(WakuCallBack) callback,
(WakuCallBack) GoCallback,
resp));
}
@ -285,7 +281,7 @@ package waku
WAKU_CALL (waku_get_peerids_by_protocol(wakuCtx,
protocol,
(WakuCallBack) callback,
(WakuCallBack) GoCallback,
resp));
}
@ -299,7 +295,7 @@ package waku
entTreeUrl,
nameDnsServer,
timeoutMs,
(WakuCallBack) callback,
(WakuCallBack) GoCallback,
resp));
}
@ -313,6 +309,7 @@ import (
"runtime"
"strconv"
"strings"
"sync"
"time"
"unsafe"
@ -421,84 +418,75 @@ func (w *Waku) DropPeer(peerID peer.ID) error {
return w.node.DisconnectPeerByID(peerID)
}
type request struct {
id string
reqType requestType
input any
responseCh chan response
}
type response struct {
err error
value any
}
// WakuNode represents an instance of an nwaku node
type WakuNode struct {
wakuCtx unsafe.Pointer
cancel context.CancelFunc
requestCh chan *request
//export GoCallback
func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) {
if resp != nil {
m := (*C.Resp)(resp)
m.ret = ret
m.msg = msg
m.len = len
wg := (*sync.WaitGroup)(m.wg)
wg.Done()
}
}
type requestType int
const (
requestTypeNew requestType = iota + 1
requestTypePing
requestTypeStart
requestTypeRelayPublish
requestTypeStoreQuery
requestTypePeerID
requestTypeStop
requestTypeDestroy
requestTypeStartDiscV5
requestTypeStopDiscV5
requestTypeVersion
requestTypeRelaySubscribe
requestTypeRelayUnsubscribe
requestTypePeerExchangeRequest
requestTypeConnect
requestTypeDialPeerByID
requestTypeListenAddresses
requestTypeENR
requestTypeGetNumPeersInMesh
requestTypeGetConnectedPeers
requestTypeGetPeerIDsFromPeerStore
requestTypeGetPeerIDsByProtocol
requestTypeDisconnectPeerByID
requestTypeDnsDiscovery
requestTypeDialPeer
requestTypeGetNumConnectedRelayPeers
)
// WakuNode represents an instance of an nwaku node
type WakuNode struct {
wakuCtx unsafe.Pointer
cancel context.CancelFunc
}
func newWakuNode(ctx context.Context, config *WakuConfig) (*WakuNode, error) {
ctx, cancel := context.WithCancel(ctx)
n := &WakuNode{
requestCh: make(chan *request),
cancel: cancel,
cancel: cancel,
}
// Notice this runs insto a separate goroutine. This is because we can't be sure
// from which OS thread will go call nwaku operations (They need to be done from
// the same thread that started nwaku). Communication with the goroutine to send
// operations to nwaku will be done via channels
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
// defer gocommon.LogOnPanic() TODO-nwaku
// defer gocommon.LogOnPanic()
runtime.LockOSThread()
defer runtime.UnlockOSThread()
C.waku_setup()
wg.Done()
n.processLoop(ctx)
<-ctx.Done()
}()
_, err := n.postTask(requestTypeNew, config)
wg.Wait()
jsonConfig, err := json.Marshal(config)
if err != nil {
cancel()
return nil, err
}
var cJsonConfig = C.CString(string(jsonConfig))
var resp = C.allocResp(unsafe.Pointer(&wg))
defer C.free(unsafe.Pointer(cJsonConfig))
defer C.freeResp(resp)
if C.getRet(resp) != C.RET_OK {
errMsg := "error wakuNew: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return nil, errors.New(errMsg)
}
wg.Add(1)
n.wakuCtx = C.cGoWakuNew(cJsonConfig, resp)
wg.Wait()
// Notice that the events for self node are handled by the 'MyEventCallback' method
C.cGoWakuSetEventCallback(n.wakuCtx)
return n, nil
}
@ -530,20 +518,6 @@ func New(nwakuCfg *WakuConfig, logger *zap.Logger) (*Waku, error) {
}, nil
}
func (n *WakuNode) postTask(reqType requestType, input any) (any, error) {
responseCh := make(chan response)
n.requestCh <- &request{
reqType: reqType,
input: input,
responseCh: responseCh,
}
response := <-responseCh
if response.err != nil {
return nil, response.err
}
return response.value, nil
}
//export globalEventCallback
func globalEventCallback(callerRet C.int, msg *C.char, len C.size_t, userData unsafe.Pointer) {
// This is shared among all Golang instances
@ -552,81 +526,6 @@ func globalEventCallback(callerRet C.int, msg *C.char, len C.size_t, userData un
// self.MyEventCallback(callerRet, msg, len)
}
func (n *WakuNode) processLoop(ctx context.Context) {
for req := range n.requestCh {
switch req.reqType {
case requestTypeNew:
req.responseCh <- response{err: n.newNode(req.input.(*WakuConfig))}
case requestTypePing:
duration, err := n.pingPeer(req.input.(pingRequest))
req.responseCh <- response{value: duration, err: err}
case requestTypeStart:
req.responseCh <- response{err: n.start()}
case requestTypeRelayPublish:
hash, err := n.relayPublish(req.input.(relayPublishRequest))
req.responseCh <- response{value: hash, err: err}
case requestTypeStoreQuery:
results, err := n.storeQuery(req.input.(storeQueryRequest))
req.responseCh <- response{value: results, err: err}
case requestTypeDestroy:
req.responseCh <- response{err: n.destroy()}
case requestTypePeerID:
peerID, err := n.peerID()
req.responseCh <- response{value: peerID, err: err}
case requestTypeStop:
req.responseCh <- response{err: n.stop()}
case requestTypeStartDiscV5:
req.responseCh <- response{err: n.startDiscV5()}
case requestTypeStopDiscV5:
req.responseCh <- response{err: n.stopDiscV5()}
case requestTypeVersion:
version, err := n.version()
req.responseCh <- response{value: version, err: err}
case requestTypePeerExchangeRequest:
numPeers, err := n.peerExchangeRequest(req.input.(uint64))
req.responseCh <- response{value: numPeers, err: err}
case requestTypeRelaySubscribe:
req.responseCh <- response{err: n.relaySubscribe(req.input.(string))}
case requestTypeRelayUnsubscribe:
req.responseCh <- response{err: n.relayUnsubscribe(req.input.(string))}
case requestTypeConnect:
req.responseCh <- response{err: n.connect(req.input.(connectRequest))}
case requestTypeDialPeerByID:
req.responseCh <- response{err: n.dialPeerById(req.input.(dialPeerByIDRequest))}
case requestTypeListenAddresses:
addrs, err := n.listenAddresses()
req.responseCh <- response{value: addrs, err: err}
case requestTypeENR:
enr, err := n.enr()
req.responseCh <- response{value: enr, err: err}
case requestTypeGetNumPeersInMesh:
numPeers, err := n.getNumPeersInMesh(req.input.(string))
req.responseCh <- response{value: numPeers, err: err}
case requestTypeGetConnectedPeers:
peers, err := n.getConnectedPeers()
req.responseCh <- response{value: peers, err: err}
case requestTypeGetPeerIDsFromPeerStore:
peers, err := n.getPeerIDsFromPeerStore()
req.responseCh <- response{value: peers, err: err}
case requestTypeGetPeerIDsByProtocol:
peers, err := n.getPeerIDsByProtocol(req.input.(libp2pproto.ID))
req.responseCh <- response{value: peers, err: err}
case requestTypeDisconnectPeerByID:
req.responseCh <- response{err: n.disconnectPeerByID(req.input.(peer.ID))}
case requestTypeDnsDiscovery:
addrs, err := n.dnsDiscovery(req.input.(dnsDiscoveryRequest))
req.responseCh <- response{value: addrs, err: err}
case requestTypeDialPeer:
req.responseCh <- response{err: n.dialPeer(req.input.(dialPeerRequest))}
case requestTypeGetNumConnectedRelayPeers:
numPeers, err := n.getNumConnectedRelayPeers(req.input.([]string)...)
req.responseCh <- response{value: numPeers, err: err}
default:
req.responseCh <- response{err: errors.New("invalid operation")}
}
}
}
func (n *WakuNode) getNumConnectedRelayPeers(optPubsubTopic ...string) (int, error) {
var pubsubTopic string
if len(optPubsubTopic) == 0 {