more changes of the async pr

This commit is contained in:
Gabriel mermelstein 2024-12-03 18:51:02 +01:00
parent e0f960c5a7
commit d1179a0687
No known key found for this signature in database
GPG Key ID: 82B8134785FEAE0D

View File

@ -526,7 +526,7 @@ func globalEventCallback(callerRet C.int, msg *C.char, len C.size_t, userData un
// self.MyEventCallback(callerRet, msg, len)
}
func (n *WakuNode) getNumConnectedRelayPeers(optPubsubTopic ...string) (int, error) {
func (n *WakuNode) GetNumConnectedRelayPeers(optPubsubTopic ...string) (int, error) {
var pubsubTopic string
if len(optPubsubTopic) == 0 {
pubsubTopic = ""
@ -534,9 +534,12 @@ func (n *WakuNode) getNumConnectedRelayPeers(optPubsubTopic ...string) (int, err
pubsubTopic = optPubsubTopic[0]
}
var resp = C.allocResp()
var cPubsubTopic = C.CString(pubsubTopic)
wg := sync.WaitGroup{}
var resp = C.allocResp(unsafe.Pointer(&wg))
defer C.freeResp(resp)
var cPubsubTopic = C.CString(pubsubTopic)
defer C.free(unsafe.Pointer(cPubsubTopic))
C.cGoWakuGetNumConnectedRelayPeers(n.wakuCtx, cPubsubTopic, resp)
@ -555,13 +558,17 @@ func (n *WakuNode) getNumConnectedRelayPeers(optPubsubTopic ...string) (int, err
return 0, errors.New(errMsg)
}
func (n *WakuNode) disconnectPeerByID(peerID peer.ID) error {
var resp = C.allocResp()
func (n *WakuNode) DisconnectPeerByID(peerID peer.ID) error {
wg := sync.WaitGroup{}
var resp = C.allocResp(unsafe.Pointer(&wg))
var cPeerId = C.CString(peerID.String())
defer C.freeResp(resp)
defer C.free(unsafe.Pointer(cPeerId))
wg.Add(1)
C.cGoWakuDisconnectPeerById(n.wakuCtx, cPeerId, resp)
wg.Wait()
if C.getRet(resp) == C.RET_OK {
return nil
@ -570,10 +577,16 @@ func (n *WakuNode) disconnectPeerByID(peerID peer.ID) error {
return errors.New(errMsg)
}
func (n *WakuNode) getConnectedPeers() (peer.IDSlice, error) {
var resp = C.allocResp()
func (n *WakuNode) GetConnectedPeers() (peer.IDSlice, error) {
wg := sync.WaitGroup{}
var resp = C.allocResp(unsafe.Pointer(&wg))
defer C.freeResp(resp)
wg.Add(1)
C.cGoWakuGetConnectedPeers(n.wakuCtx, resp)
wg.Wait()
if C.getRet(resp) == C.RET_OK {
peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
if peersStr == "" {
@ -596,12 +609,14 @@ func (n *WakuNode) getConnectedPeers() (peer.IDSlice, error) {
}
func (n *WakuNode) relaySubscribe(pubsubTopic string) error {
func (n *WakuNode) RelaySubscribe(pubsubTopic string) error {
if pubsubTopic == "" {
return errors.New("pubsub topic is empty")
}
var resp = C.allocResp()
wg := sync.WaitGroup{}
var resp = C.allocResp(unsafe.Pointer(&wg))
var cPubsubTopic = C.CString(pubsubTopic)
defer C.freeResp(resp)
@ -611,7 +626,9 @@ func (n *WakuNode) relaySubscribe(pubsubTopic string) error {
return errors.New("wakuCtx is nil")
}
wg.Add(1)
C.cGoWakuRelaySubscribe(n.wakuCtx, cPubsubTopic, resp)
wg.Wait()
if C.getRet(resp) == C.RET_OK {
return nil
@ -621,12 +638,14 @@ func (n *WakuNode) relaySubscribe(pubsubTopic string) error {
return errors.New(errMsg)
}
func (n *WakuNode) relayUnsubscribe(pubsubTopic string) error {
func (n *WakuNode) RelayUnsubscribe(pubsubTopic string) error {
if pubsubTopic == "" {
return errors.New("pubsub topic is empty")
}
var resp = C.allocResp()
wg := sync.WaitGroup{}
var resp = C.allocResp(unsafe.Pointer(&wg))
var cPubsubTopic = C.CString(pubsubTopic)
defer C.freeResp(resp)
@ -636,7 +655,9 @@ func (n *WakuNode) relayUnsubscribe(pubsubTopic string) error {
return errors.New("wakuCtx is nil")
}
wg.Add(1)
C.cGoWakuRelayUnsubscribe(n.wakuCtx, cPubsubTopic, resp)
wg.Wait()
if C.getRet(resp) == C.RET_OK {
return nil
@ -664,11 +685,15 @@ func (n *WakuNode) peerExchangeRequest(numPeers uint64) (uint64, error) {
return 0, errors.New(errMsg)
}
func (n *WakuNode) startDiscV5() error {
var resp = C.allocResp()
defer C.freeResp(resp)
C.cGoWakuStartDiscV5(n.wakuCtx, resp)
func (n *WakuNode) StartDiscV5() error {
wg := sync.WaitGroup{}
var resp = C.allocResp(unsafe.Pointer(&wg))
defer C.freeResp(resp)
wg.Add(1)
C.cGoWakuStartDiscV5(n.wakuCtx, resp)
wg.Wait()
if C.getRet(resp) == C.RET_OK {
return nil
}
@ -676,10 +701,15 @@ func (n *WakuNode) startDiscV5() error {
return errors.New(errMsg)
}
func (n *WakuNode) stopDiscV5() error {
var resp = C.allocResp()
func (n *WakuNode) StopDiscV5() error {
wg := sync.WaitGroup{}
var resp = C.allocResp(unsafe.Pointer(&wg))
defer C.freeResp(resp)
wg.Add(1)
C.cGoWakuStopDiscV5(n.wakuCtx, resp)
wg.Wait()
if C.getRet(resp) == C.RET_OK {
return nil
@ -688,11 +718,15 @@ func (n *WakuNode) stopDiscV5() error {
return errors.New(errMsg)
}
func (n *WakuNode) version() (string, error) {
var resp = C.allocResp()
func (n *WakuNode) Version() (string, error) {
wg := sync.WaitGroup{}
var resp = C.allocResp(unsafe.Pointer(&wg))
defer C.freeResp(resp)
wg.Add(1)
C.cGoWakuVersion(n.wakuCtx, resp)
wg.Wait()
if C.getRet(resp) == C.RET_OK {
var version = C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
@ -704,29 +738,34 @@ func (n *WakuNode) version() (string, error) {
return "", errors.New(errMsg)
}
func (n *WakuNode) storeQuery(storeQueryRequest storeQueryRequest) (any, error) {
func (n *WakuNode) StoreQuery(ctx context.Context, storeRequest *storepb.StoreQueryRequest, peerInfo peer.AddrInfo) (*storepb.StoreQueryResponse, error) {
// TODO: extract timeout from context
timeoutMs := time.Minute.Milliseconds()
b, err := json.Marshal(storeQueryRequest.storeRequest)
b, err := json.Marshal(storeRequest)
if err != nil {
return nil, err
}
addrs := make([]string, len(storeQueryRequest.peerInfo.Addrs))
for i, addr := range utils.EncapsulatePeerID(storeQueryRequest.peerInfo.ID, storeQueryRequest.peerInfo.Addrs...) {
addrs := make([]string, len(peerInfo.Addrs))
for i, addr := range utils.EncapsulatePeerID(peerInfo.ID, peerInfo.Addrs...) {
addrs[i] = addr.String()
}
var cJsonQuery = C.CString(string(b))
var cPeerAddr = C.CString(strings.Join(addrs, ","))
var resp = C.allocResp()
wg := sync.WaitGroup{}
var resp = C.allocResp(unsafe.Pointer(&wg))
defer C.free(unsafe.Pointer(cJsonQuery))
defer C.free(unsafe.Pointer(cPeerAddr))
defer C.freeResp(resp)
wg.Add(1)
C.cGoWakuStoreQuery(n.wakuCtx, cJsonQuery, cPeerAddr, C.int(timeoutMs), resp)
wg.Wait()
if C.getRet(resp) == C.RET_OK {
jsonResponseStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
storeQueryResponse := &storepb.StoreQueryResponse{}
@ -741,24 +780,27 @@ func (n *WakuNode) storeQuery(storeQueryRequest storeQueryRequest) (any, error)
return nil, errors.New(errMsg)
}
func (n *WakuNode) relayPublish(relayPublishRequest relayPublishRequest) (pb.MessageHash, error) {
func (n *WakuNode) RelayPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string) (pb.MessageHash, error) {
// TODO: extract timeout from context
timeoutMs := time.Minute.Milliseconds()
jsonMsg, err := json.Marshal(relayPublishRequest.message)
jsonMsg, err := json.Marshal(message)
if err != nil {
return pb.MessageHash{}, err
}
var cPubsubTopic = C.CString(relayPublishRequest.pubsubTopic)
var msg = C.CString(string(jsonMsg))
var resp = C.allocResp()
wg := sync.WaitGroup{}
var resp = C.allocResp(unsafe.Pointer(&wg))
var cPubsubTopic = C.CString(pubsubTopic)
var msg = C.CString(string(jsonMsg))
defer C.freeResp(resp)
defer C.free(unsafe.Pointer(cPubsubTopic))
defer C.free(unsafe.Pointer(msg))
wg.Add(1)
C.cGoWakuRelayPublish(n.wakuCtx, cPubsubTopic, msg, C.int(timeoutMs), resp)
wg.Wait()
if C.getRet(resp) == C.RET_OK {
msgHash := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
msgHashBytes, err := hexutil.Decode(msgHash)
@ -771,15 +813,20 @@ func (n *WakuNode) relayPublish(relayPublishRequest relayPublishRequest) (pb.Mes
return pb.MessageHash{}, errors.New(errMsg)
}
func (n *WakuNode) dnsDiscovery(dnsDiscRequest dnsDiscoveryRequest) ([]multiaddr.Multiaddr, error) {
var resp = C.allocResp()
var cEnrTree = C.CString(dnsDiscRequest.enrTreeUrl)
var cDnsServer = C.CString(dnsDiscRequest.nameDnsServer)
func (n *WakuNode) DnsDiscovery(ctx context.Context, enrTreeUrl string, nameDnsServer string) ([]multiaddr.Multiaddr, error) {
wg := sync.WaitGroup{}
var resp = C.allocResp(unsafe.Pointer(&wg))
var cEnrTree = C.CString(enrTreeUrl)
var cDnsServer = C.CString(nameDnsServer)
defer C.freeResp(resp)
defer C.free(unsafe.Pointer(cEnrTree))
defer C.free(unsafe.Pointer(cDnsServer))
// TODO: extract timeout from context
wg.Add(1)
C.cGoWakuDnsDiscovery(n.wakuCtx, cEnrTree, cDnsServer, C.int(time.Minute.Milliseconds()), resp)
wg.Wait()
if C.getRet(resp) == C.RET_OK {
var addrsRet []multiaddr.Multiaddr
nodeAddresses := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
@ -797,21 +844,24 @@ func (n *WakuNode) dnsDiscovery(dnsDiscRequest dnsDiscoveryRequest) ([]multiaddr
return nil, errors.New(errMsg)
}
func (n *WakuNode) pingPeer(request pingRequest) (time.Duration, error) {
peerInfo := request.peerInfo
func (n *WakuNode) PingPeer(ctx context.Context, peerInfo peer.AddrInfo) (time.Duration, error) {
addrs := make([]string, len(peerInfo.Addrs))
for i, addr := range utils.EncapsulatePeerID(peerInfo.ID, peerInfo.Addrs...) {
addrs[i] = addr.String()
}
var resp = C.allocResp()
var cPeerId = C.CString(strings.Join(addrs, ","))
wg := sync.WaitGroup{}
var resp = C.allocResp(unsafe.Pointer(&wg))
defer C.freeResp(resp)
var cPeerId = C.CString(strings.Join(addrs, ","))
defer C.free(unsafe.Pointer(cPeerId))
// TODO: extract timeout from ctx
wg.Add(1)
C.cGoWakuPingPeer(n.wakuCtx, cPeerId, C.int(time.Minute.Milliseconds()), resp)
wg.Wait()
if C.getRet(resp) == C.RET_OK {
rttStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
rttInt, err := strconv.ParseInt(rttStr, 10, 64)
@ -825,12 +875,15 @@ func (n *WakuNode) pingPeer(request pingRequest) (time.Duration, error) {
return 0, fmt.Errorf("PingPeer: %s", errMsg)
}
func (n *WakuNode) start() error {
var resp = C.allocResp()
func (n *WakuNode) Start() error {
wg := sync.WaitGroup{}
var resp = C.allocResp(unsafe.Pointer(&wg))
defer C.freeResp(resp)
wg.Add(1)
C.cGoWakuStart(n.wakuCtx, resp)
wg.Wait()
if C.getRet(resp) == C.RET_OK {
return nil
}
@ -839,12 +892,15 @@ func (n *WakuNode) start() error {
return errors.New(errMsg)
}
func (n *WakuNode) stop() error {
var resp = C.allocResp()
func (n *WakuNode) Stop() error {
wg := sync.WaitGroup{}
var resp = C.allocResp(unsafe.Pointer(&wg))
defer C.freeResp(resp)
wg.Add(1)
C.cGoWakuStop(n.wakuCtx, resp)
wg.Wait()
if C.getRet(resp) == C.RET_OK {
return nil
}
@ -853,12 +909,15 @@ func (n *WakuNode) stop() error {
return errors.New(errMsg)
}
func (n *WakuNode) destroy() error {
var resp = C.allocResp()
func (n *WakuNode) Destroy() error {
wg := sync.WaitGroup{}
var resp = C.allocResp(unsafe.Pointer(&wg))
defer C.freeResp(resp)
wg.Add(1)
C.cGoWakuDestroy(n.wakuCtx, resp)
wg.Wait()
if C.getRet(resp) == C.RET_OK {
return nil
}
@ -867,12 +926,15 @@ func (n *WakuNode) destroy() error {
return errors.New(errMsg)
}
func (n *WakuNode) peerID() (peer.ID, error) {
var resp = C.allocResp()
func (n *WakuNode) PeerID() (peer.ID, error) {
wg := sync.WaitGroup{}
var resp = C.allocResp(unsafe.Pointer(&wg))
defer C.freeResp(resp)
wg.Add(1)
C.cGoWakuGetMyPeerId(n.wakuCtx, resp)
wg.Wait()
if C.getRet(resp) == C.RET_OK {
peerIdStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
id, err := peer.Decode(peerIdStr)
@ -886,15 +948,18 @@ func (n *WakuNode) peerID() (peer.ID, error) {
return "", errors.New(errMsg)
}
func (n *WakuNode) connect(connReq connectRequest) error {
var resp = C.allocResp()
var cPeerMultiAddr = C.CString(connReq.addr.String())
func (n *WakuNode) Connect(ctx context.Context, addr multiaddr.Multiaddr) error {
wg := sync.WaitGroup{}
var resp = C.allocResp(unsafe.Pointer(&wg))
var cPeerMultiAddr = C.CString(addr.String())
defer C.freeResp(resp)
defer C.free(unsafe.Pointer(cPeerMultiAddr))
// TODO: extract timeout from ctx
wg.Add(1)
C.cGoWakuConnect(n.wakuCtx, cPeerMultiAddr, C.int(time.Minute.Milliseconds()), resp)
wg.Wait()
if C.getRet(resp) == C.RET_OK {
return nil
}
@ -903,17 +968,20 @@ func (n *WakuNode) connect(connReq connectRequest) error {
return errors.New(errMsg)
}
func (n *WakuNode) dialPeerById(dialPeerByIDReq dialPeerByIDRequest) error {
var resp = C.allocResp()
var cPeerId = C.CString(dialPeerByIDReq.peerID.String())
var cProtocol = C.CString(string(dialPeerByIDReq.protocol))
func (n *WakuNode) DialPeerByID(ctx context.Context, peerID peer.ID, protocol libp2pproto.ID) error {
wg := sync.WaitGroup{}
var resp = C.allocResp(unsafe.Pointer(&wg))
var cPeerId = C.CString(peerID.String())
var cProtocol = C.CString(string(protocol))
defer C.freeResp(resp)
defer C.free(unsafe.Pointer(cPeerId))
defer C.free(unsafe.Pointer(cProtocol))
// TODO: extract timeout from ctx
wg.Add(1)
C.cGoWakuDialPeerById(n.wakuCtx, cPeerId, cProtocol, C.int(time.Minute.Milliseconds()), resp)
wg.Wait()
if C.getRet(resp) == C.RET_OK {
return nil
}
@ -922,11 +990,15 @@ func (n *WakuNode) dialPeerById(dialPeerByIDReq dialPeerByIDRequest) error {
return errors.New(errMsg)
}
func (n *WakuNode) listenAddresses() ([]multiaddr.Multiaddr, error) {
var resp = C.allocResp()
defer C.freeResp(resp)
C.cGoWakuListenAddresses(n.wakuCtx, resp)
func (n *WakuNode) ListenAddresses() ([]multiaddr.Multiaddr, error) {
wg := sync.WaitGroup{}
var resp = C.allocResp(unsafe.Pointer(&wg))
defer C.freeResp(resp)
wg.Add(1)
C.cGoWakuListenAddresses(n.wakuCtx, resp)
wg.Wait()
if C.getRet(resp) == C.RET_OK {
var addrsRet []multiaddr.Multiaddr
listenAddresses := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
@ -944,11 +1016,15 @@ func (n *WakuNode) listenAddresses() ([]multiaddr.Multiaddr, error) {
return nil, errors.New(errMsg)
}
func (n *WakuNode) enr() (*enode.Node, error) {
var resp = C.allocResp()
defer C.freeResp(resp)
C.cGoWakuGetMyENR(n.wakuCtx, resp)
func (n *WakuNode) ENR() (*enode.Node, error) {
wg := sync.WaitGroup{}
var resp = C.allocResp(unsafe.Pointer(&wg))
defer C.freeResp(resp)
wg.Add(1)
C.cGoWakuGetMyENR(n.wakuCtx, resp)
wg.Wait()
if C.getRet(resp) == C.RET_OK {
enrStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
n, err := enode.Parse(enode.ValidSchemes, enrStr)
@ -962,14 +1038,17 @@ func (n *WakuNode) enr() (*enode.Node, error) {
return nil, errors.New(errMsg)
}
func (n *WakuNode) getNumPeersInMesh(pubsubTopic string) (int, error) {
var resp = C.allocResp()
func (n *WakuNode) GetNumPeersInMesh(pubsubTopic string) (int, error) {
wg := sync.WaitGroup{}
var resp = C.allocResp(unsafe.Pointer(&wg))
var cPubsubTopic = C.CString(pubsubTopic)
defer C.freeResp(resp)
defer C.free(unsafe.Pointer(cPubsubTopic))
wg.Add(1)
C.cGoWakuGetNumPeersInMesh(n.wakuCtx, cPubsubTopic, resp)
wg.Wait()
if C.getRet(resp) == C.RET_OK {
numPeersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
numPeers, err := strconv.Atoi(numPeersStr)
@ -984,11 +1063,15 @@ func (n *WakuNode) getNumPeersInMesh(pubsubTopic string) (int, error) {
return 0, errors.New(errMsg)
}
func (n *WakuNode) getPeerIDsFromPeerStore() (peer.IDSlice, error) {
var resp = C.allocResp()
defer C.freeResp(resp)
C.cGoWakuGetPeerIdsFromPeerStore(n.wakuCtx, resp)
func (n *WakuNode) GetPeerIDsFromPeerStore() (peer.IDSlice, error) {
wg := sync.WaitGroup{}
var resp = C.allocResp(unsafe.Pointer(&wg))
defer C.freeResp(resp)
wg.Add(1)
C.cGoWakuGetPeerIdsFromPeerStore(n.wakuCtx, resp)
wg.Wait()
if C.getRet(resp) == C.RET_OK {
peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
if peersStr == "" {
@ -1011,14 +1094,17 @@ func (n *WakuNode) getPeerIDsFromPeerStore() (peer.IDSlice, error) {
return nil, fmt.Errorf("GetPeerIdsFromPeerStore: %s", errMsg)
}
func (n *WakuNode) getPeerIDsByProtocol(protocolID libp2pproto.ID) (peer.IDSlice, error) {
var resp = C.allocResp()
var cProtocol = C.CString(string(protocolID))
func (n *WakuNode) GetPeerIDsByProtocol(protocol libp2pproto.ID) (peer.IDSlice, error) {
wg := sync.WaitGroup{}
var resp = C.allocResp(unsafe.Pointer(&wg))
var cProtocol = C.CString(string(protocol))
defer C.freeResp(resp)
defer C.free(unsafe.Pointer(cProtocol))
wg.Add(1)
C.cGoWakuGetPeerIdsByProtocol(n.wakuCtx, cProtocol, resp)
wg.Wait()
if C.getRet(resp) == C.RET_OK {
peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
if peersStr == "" {
@ -1042,41 +1128,19 @@ func (n *WakuNode) getPeerIDsByProtocol(protocolID libp2pproto.ID) (peer.IDSlice
return nil, fmt.Errorf("GetPeerIdsByProtocol: %s", errMsg)
}
func (n *WakuNode) newNode(config *WakuConfig) error {
jsonConfig, err := json.Marshal(config)
if err != nil {
return err
}
func (n *WakuNode) DialPeer(ctx context.Context, peerAddr multiaddr.Multiaddr, protocol libp2pproto.ID) error {
wg := sync.WaitGroup{}
var cJsonConfig = C.CString(string(jsonConfig))
var resp = C.allocResp()
defer C.free(unsafe.Pointer(cJsonConfig))
defer C.freeResp(resp)
if C.getRet(resp) != C.RET_OK {
errMsg := "error wakuNew: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return errors.New(errMsg)
}
wakuCtx := C.cGoWakuNew(cJsonConfig, resp)
n.wakuCtx = unsafe.Pointer(wakuCtx)
// Notice that the events for self node are handled by the 'MyEventCallback' method
C.cGoWakuSetEventCallback(n.wakuCtx)
return nil
}
func (n *WakuNode) dialPeer(dialPeerReq dialPeerRequest) error {
var resp = C.allocResp()
var cPeerMultiAddr = C.CString(dialPeerReq.peerAddr.String())
var cProtocol = C.CString(string(dialPeerReq.protocol))
var resp = C.allocResp(unsafe.Pointer(&wg))
var cPeerMultiAddr = C.CString(peerAddr.String())
var cProtocol = C.CString(string(protocol))
defer C.freeResp(resp)
defer C.free(unsafe.Pointer(cPeerMultiAddr))
defer C.free(unsafe.Pointer(cProtocol))
// TODO: extract timeout from context
wg.Add(1)
C.cGoWakuDialPeer(n.wakuCtx, cPeerMultiAddr, cProtocol, C.int(requestTimeout.Milliseconds()), resp)
wg.Wait()
if C.getRet(resp) == C.RET_OK {
return nil
}