more fixes

This commit is contained in:
Gabriel mermelstein 2024-11-27 16:41:30 +02:00
parent c8284a342e
commit dd035cbbf5
No known key found for this signature in database
GPG Key ID: 82B8134785FEAE0D

202
nwaku.go
View File

@ -310,6 +310,7 @@ package wakuv2
import "C"
import (
"context"
"encoding/json"
"errors"
"fmt"
"runtime"
@ -321,6 +322,9 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
libp2pproto "github.com/libp2p/go-libp2p/core/protocol"
"github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
storepb "github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)
@ -420,7 +424,7 @@ func newWakuNode(ctx context.Context, config *WakuConfig) (*WakuNode, error) {
// the same thread that started nwaku). Communication with the goroutine to send
// operations to nwaku will be done via channels
go func() {
defer gocommon.LogOnPanic()
// defer gocommon.LogOnPanic() TODO-nwaku
runtime.LockOSThread()
defer runtime.UnlockOSThread()
@ -448,7 +452,7 @@ func New(nwakuCfg *WakuConfig, logger *zap.Logger) (*Waku, error) {
}
}
logger.Info("starting wakuv2 with config", zap.Any("nwakuCfg", nwakuCfg), zap.Any("wakuCfg", cfg))
logger.Info("starting wakuv2 with config", zap.Any("nwakuCfg", nwakuCfg))
wakunode, err := newWakuNode(ctx, nwakuCfg)
if err != nil {
@ -569,6 +573,66 @@ func (w *Waku) Start() error {
return nil
}
func (n *WakuNode) start() error {
var resp = C.allocResp()
defer C.freeResp(resp)
C.cGoWakuStart(n.wakuCtx, resp)
if C.getRet(resp) == C.RET_OK {
return nil
}
errMsg := "error WakuStart: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return errors.New(errMsg)
}
// Stop implements node.Service, stopping the background data propagation thread
// of the Waku protocol.
func (w *Waku) Stop() error {
w.cancel()
err := w.node.Stop()
if err != nil {
return err
}
// w.wg.Wait()
w.ctx = nil
w.cancel = nil
return nil
}
func (n *WakuNode) stop() error {
var resp = C.allocResp()
defer C.freeResp(resp)
C.cGoWakuStop(n.wakuCtx, resp)
if C.getRet(resp) == C.RET_OK {
return nil
}
errMsg := "error WakuStop: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return errors.New(errMsg)
}
func (n *WakuNode) destroy() error {
var resp = C.allocResp()
defer C.freeResp(resp)
C.cGoWakuDestroy(n.wakuCtx, resp)
if C.getRet(resp) == C.RET_OK {
return nil
}
errMsg := "error WakuDestroy: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return errors.New(errMsg)
}
func (n *WakuNode) listenAddresses() ([]multiaddr.Multiaddr, error) {
var resp = C.allocResp()
defer C.freeResp(resp)
@ -694,6 +758,11 @@ func (n *WakuNode) dialPeer(dialPeerReq dialPeerRequest) error {
return errors.New(errMsg)
}
type connectRequest struct {
ctx context.Context
addr multiaddr.Multiaddr
}
func (n *WakuNode) Connect(ctx context.Context, addr multiaddr.Multiaddr) error {
_, err := n.postTask(requestTypeConnect, connectRequest{
ctx: ctx,
@ -718,3 +787,132 @@ func (n *WakuNode) connect(connReq connectRequest) error {
C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return errors.New(errMsg)
}
type storeQueryRequest struct {
ctx context.Context
storeRequest *storepb.StoreQueryRequest
peerInfo peer.AddrInfo
}
func (n *WakuNode) StoreQuery(ctx context.Context, storeRequest *storepb.StoreQueryRequest, peerInfo peer.AddrInfo) (*storepb.StoreQueryResponse, error) {
response, err := n.postTask(requestTypeStoreQuery, storeQueryRequest{
ctx: ctx,
peerInfo: peerInfo,
storeRequest: storeRequest,
})
if err != nil {
return nil, err
}
return response.(*storepb.StoreQueryResponse), nil
}
func (n *WakuNode) storeQuery(storeQueryRequest storeQueryRequest) (any, error) {
// TODO: extract timeout from context
timeoutMs := time.Minute.Milliseconds()
b, err := json.Marshal(storeQueryRequest.storeRequest)
if err != nil {
return nil, err
}
addrs := make([]string, len(storeQueryRequest.peerInfo.Addrs))
for i, addr := range utils.EncapsulatePeerID(storeQueryRequest.peerInfo.ID, storeQueryRequest.peerInfo.Addrs...) {
addrs[i] = addr.String()
}
var cJsonQuery = C.CString(string(b))
var cPeerAddr = C.CString(strings.Join(addrs, ","))
var resp = C.allocResp()
defer C.free(unsafe.Pointer(cJsonQuery))
defer C.free(unsafe.Pointer(cPeerAddr))
defer C.freeResp(resp)
C.cGoWakuStoreQuery(n.wakuCtx, cJsonQuery, cPeerAddr, C.int(timeoutMs), resp)
if C.getRet(resp) == C.RET_OK {
jsonResponseStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
storeQueryResponse := &storepb.StoreQueryResponse{}
err = json.Unmarshal([]byte(jsonResponseStr), storeQueryResponse)
if err != nil {
return nil, err
}
return storeQueryResponse, nil
}
errMsg := "error WakuStoreQuery: " +
C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return nil, errors.New(errMsg)
}
type relayPublishRequest struct {
ctx context.Context
pubsubTopic string
message *pb.WakuMessage
}
func (n *WakuNode) RelayPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string) (pb.MessageHash, error) {
response, err := n.postTask(requestTypeRelayPublish, relayPublishRequest{
ctx: ctx,
pubsubTopic: pubsubTopic,
message: message,
})
if err != nil {
return pb.MessageHash{}, err
}
return response.(pb.MessageHash), nil
}
func (n *WakuNode) relayPublish(relayPublishRequest relayPublishRequest) (pb.MessageHash, error) {
// TODO: extract timeout from context
timeoutMs := time.Minute.Milliseconds()
jsonMsg, err := json.Marshal(relayPublishRequest.message)
if err != nil {
return pb.MessageHash{}, err
}
var cPubsubTopic = C.CString(relayPublishRequest.pubsubTopic)
var msg = C.CString(string(jsonMsg))
var resp = C.allocResp()
defer C.freeResp(resp)
defer C.free(unsafe.Pointer(cPubsubTopic))
defer C.free(unsafe.Pointer(msg))
C.cGoWakuRelayPublish(n.wakuCtx, cPubsubTopic, msg, C.int(timeoutMs), resp)
if C.getRet(resp) == C.RET_OK {
msgHash := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
msgHashBytes, err := hexutil.Decode(msgHash)
if err != nil {
return pb.MessageHash{}, err
}
return pb.ToMessageHash(msgHashBytes), nil
}
errMsg := "WakuRelayPublish: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return pb.MessageHash{}, errors.New(errMsg)
}
func (n *WakuNode) newNode(config *WakuConfig) error {
jsonConfig, err := json.Marshal(config)
if err != nil {
return err
}
var cJsonConfig = C.CString(string(jsonConfig))
var resp = C.allocResp()
defer C.free(unsafe.Pointer(cJsonConfig))
defer C.freeResp(resp)
if C.getRet(resp) != C.RET_OK {
errMsg := "error wakuNew: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return errors.New(errMsg)
}
wakuCtx := C.cGoWakuNew(cJsonConfig, resp)
n.wakuCtx = unsafe.Pointer(wakuCtx)
// Notice that the events for self node are handled by the 'MyEventCallback' method
C.cGoWakuSetEventCallback(n.wakuCtx)
return nil
}