diff --git a/nwaku.go b/nwaku.go index 46d0aa5..3377ac7 100644 --- a/nwaku.go +++ b/nwaku.go @@ -310,6 +310,7 @@ package wakuv2 import "C" import ( "context" + "encoding/json" "errors" "fmt" "runtime" @@ -321,6 +322,9 @@ import ( "github.com/libp2p/go-libp2p/core/peer" libp2pproto "github.com/libp2p/go-libp2p/core/protocol" "github.com/multiformats/go-multiaddr" + "github.com/waku-org/go-waku/waku/v2/protocol/pb" + storepb "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -420,7 +424,7 @@ func newWakuNode(ctx context.Context, config *WakuConfig) (*WakuNode, error) { // the same thread that started nwaku). Communication with the goroutine to send // operations to nwaku will be done via channels go func() { - defer gocommon.LogOnPanic() + // defer gocommon.LogOnPanic() TODO-nwaku runtime.LockOSThread() defer runtime.UnlockOSThread() @@ -448,7 +452,7 @@ func New(nwakuCfg *WakuConfig, logger *zap.Logger) (*Waku, error) { } } - logger.Info("starting wakuv2 with config", zap.Any("nwakuCfg", nwakuCfg), zap.Any("wakuCfg", cfg)) + logger.Info("starting wakuv2 with config", zap.Any("nwakuCfg", nwakuCfg)) wakunode, err := newWakuNode(ctx, nwakuCfg) if err != nil { @@ -569,6 +573,66 @@ func (w *Waku) Start() error { return nil } +func (n *WakuNode) start() error { + var resp = C.allocResp() + defer C.freeResp(resp) + + C.cGoWakuStart(n.wakuCtx, resp) + + if C.getRet(resp) == C.RET_OK { + return nil + } + + errMsg := "error WakuStart: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return errors.New(errMsg) +} + +// Stop implements node.Service, stopping the background data propagation thread +// of the Waku protocol. +func (w *Waku) Stop() error { + w.cancel() + + err := w.node.Stop() + if err != nil { + return err + } + + // w.wg.Wait() + + w.ctx = nil + w.cancel = nil + + return nil +} + +func (n *WakuNode) stop() error { + var resp = C.allocResp() + defer C.freeResp(resp) + + C.cGoWakuStop(n.wakuCtx, resp) + + if C.getRet(resp) == C.RET_OK { + return nil + } + + errMsg := "error WakuStop: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return errors.New(errMsg) +} + +func (n *WakuNode) destroy() error { + var resp = C.allocResp() + defer C.freeResp(resp) + + C.cGoWakuDestroy(n.wakuCtx, resp) + + if C.getRet(resp) == C.RET_OK { + return nil + } + + errMsg := "error WakuDestroy: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return errors.New(errMsg) +} + func (n *WakuNode) listenAddresses() ([]multiaddr.Multiaddr, error) { var resp = C.allocResp() defer C.freeResp(resp) @@ -694,6 +758,11 @@ func (n *WakuNode) dialPeer(dialPeerReq dialPeerRequest) error { return errors.New(errMsg) } +type connectRequest struct { + ctx context.Context + addr multiaddr.Multiaddr +} + func (n *WakuNode) Connect(ctx context.Context, addr multiaddr.Multiaddr) error { _, err := n.postTask(requestTypeConnect, connectRequest{ ctx: ctx, @@ -718,3 +787,132 @@ func (n *WakuNode) connect(connReq connectRequest) error { C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) return errors.New(errMsg) } + +type storeQueryRequest struct { + ctx context.Context + storeRequest *storepb.StoreQueryRequest + peerInfo peer.AddrInfo +} + +func (n *WakuNode) StoreQuery(ctx context.Context, storeRequest *storepb.StoreQueryRequest, peerInfo peer.AddrInfo) (*storepb.StoreQueryResponse, error) { + response, err := n.postTask(requestTypeStoreQuery, storeQueryRequest{ + ctx: ctx, + peerInfo: peerInfo, + storeRequest: storeRequest, + }) + if err != nil { + return nil, err + } + return response.(*storepb.StoreQueryResponse), nil +} + +func (n *WakuNode) storeQuery(storeQueryRequest storeQueryRequest) (any, error) { + // TODO: extract timeout from context + timeoutMs := time.Minute.Milliseconds() + + b, err := json.Marshal(storeQueryRequest.storeRequest) + if err != nil { + return nil, err + } + + addrs := make([]string, len(storeQueryRequest.peerInfo.Addrs)) + for i, addr := range utils.EncapsulatePeerID(storeQueryRequest.peerInfo.ID, storeQueryRequest.peerInfo.Addrs...) { + addrs[i] = addr.String() + } + + var cJsonQuery = C.CString(string(b)) + var cPeerAddr = C.CString(strings.Join(addrs, ",")) + var resp = C.allocResp() + + defer C.free(unsafe.Pointer(cJsonQuery)) + defer C.free(unsafe.Pointer(cPeerAddr)) + defer C.freeResp(resp) + + C.cGoWakuStoreQuery(n.wakuCtx, cJsonQuery, cPeerAddr, C.int(timeoutMs), resp) + if C.getRet(resp) == C.RET_OK { + jsonResponseStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + storeQueryResponse := &storepb.StoreQueryResponse{} + err = json.Unmarshal([]byte(jsonResponseStr), storeQueryResponse) + if err != nil { + return nil, err + } + return storeQueryResponse, nil + } + errMsg := "error WakuStoreQuery: " + + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return nil, errors.New(errMsg) +} + +type relayPublishRequest struct { + ctx context.Context + pubsubTopic string + message *pb.WakuMessage +} + +func (n *WakuNode) RelayPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string) (pb.MessageHash, error) { + response, err := n.postTask(requestTypeRelayPublish, relayPublishRequest{ + ctx: ctx, + pubsubTopic: pubsubTopic, + message: message, + }) + if err != nil { + return pb.MessageHash{}, err + } + return response.(pb.MessageHash), nil +} + +func (n *WakuNode) relayPublish(relayPublishRequest relayPublishRequest) (pb.MessageHash, error) { + // TODO: extract timeout from context + timeoutMs := time.Minute.Milliseconds() + + jsonMsg, err := json.Marshal(relayPublishRequest.message) + if err != nil { + return pb.MessageHash{}, err + } + + var cPubsubTopic = C.CString(relayPublishRequest.pubsubTopic) + var msg = C.CString(string(jsonMsg)) + var resp = C.allocResp() + + defer C.freeResp(resp) + defer C.free(unsafe.Pointer(cPubsubTopic)) + defer C.free(unsafe.Pointer(msg)) + + C.cGoWakuRelayPublish(n.wakuCtx, cPubsubTopic, msg, C.int(timeoutMs), resp) + if C.getRet(resp) == C.RET_OK { + msgHash := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + msgHashBytes, err := hexutil.Decode(msgHash) + if err != nil { + return pb.MessageHash{}, err + } + return pb.ToMessageHash(msgHashBytes), nil + } + errMsg := "WakuRelayPublish: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return pb.MessageHash{}, errors.New(errMsg) +} + +func (n *WakuNode) newNode(config *WakuConfig) error { + jsonConfig, err := json.Marshal(config) + if err != nil { + return err + } + + var cJsonConfig = C.CString(string(jsonConfig)) + var resp = C.allocResp() + + defer C.free(unsafe.Pointer(cJsonConfig)) + defer C.freeResp(resp) + + if C.getRet(resp) != C.RET_OK { + errMsg := "error wakuNew: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return errors.New(errMsg) + } + + wakuCtx := C.cGoWakuNew(cJsonConfig, resp) + n.wakuCtx = unsafe.Pointer(wakuCtx) + + // Notice that the events for self node are handled by the 'MyEventCallback' method + C.cGoWakuSetEventCallback(n.wakuCtx) + + return nil +}