This commit is contained in:
Gabriel mermelstein 2024-11-27 17:25:26 +02:00
parent dd035cbbf5
commit 73321c4199
No known key found for this signature in database
GPG Key ID: 82B8134785FEAE0D

940
nwaku.go
View File

@ -319,10 +319,12 @@ import (
"time"
"unsafe"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/libp2p/go-libp2p/core/peer"
libp2pproto "github.com/libp2p/go-libp2p/core/protocol"
"github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
storepb "github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
@ -361,6 +363,63 @@ type Waku struct {
logger *zap.Logger
}
// Start implements node.Service, starting the background data propagation thread
// of the Waku protocol.
func (w *Waku) Start() error {
err := w.node.Start()
if err != nil {
return fmt.Errorf("failed to start nwaku node: %v", err)
}
peerID, err := w.node.PeerID()
if err != nil {
return err
}
w.logger.Info("WakuV2 PeerID", zap.Stringer("id", peerID))
return nil
}
// Stop implements node.Service, stopping the background data propagation thread
// of the Waku protocol.
func (w *Waku) Stop() error {
w.cancel()
err := w.node.Stop()
if err != nil {
return err
}
// w.wg.Wait()
w.ctx = nil
w.cancel = nil
return nil
}
func (w *Waku) PeerCount() (int, error) {
return w.node.GetNumConnectedPeers()
}
func (w *Waku) DialPeer(address multiaddr.Multiaddr) error {
// Using WakuConnect so it matches the go-waku's behavior and terminology
ctx, cancel := context.WithTimeout(w.ctx, requestTimeout)
defer cancel()
return w.node.Connect(ctx, address)
}
func (w *Waku) DialPeerByID(peerID peer.ID) error {
ctx, cancel := context.WithTimeout(w.ctx, requestTimeout)
defer cancel()
return w.node.DialPeerByID(ctx, peerID, relay.WakuRelayID_v200)
}
func (w *Waku) DropPeer(peerID peer.ID) error {
return w.node.DisconnectPeerByID(peerID)
}
type request struct {
id string
reqType requestType
@ -555,118 +614,6 @@ func (n *WakuNode) processLoop(ctx context.Context) {
}
}
// Start implements node.Service, starting the background data propagation thread
// of the Waku protocol.
func (w *Waku) Start() error {
err := w.node.Start()
if err != nil {
return fmt.Errorf("failed to start nwaku node: %v", err)
}
peerID, err := w.node.PeerID()
if err != nil {
return err
}
w.logger.Info("WakuV2 PeerID", zap.Stringer("id", peerID))
return nil
}
func (n *WakuNode) start() error {
var resp = C.allocResp()
defer C.freeResp(resp)
C.cGoWakuStart(n.wakuCtx, resp)
if C.getRet(resp) == C.RET_OK {
return nil
}
errMsg := "error WakuStart: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return errors.New(errMsg)
}
// Stop implements node.Service, stopping the background data propagation thread
// of the Waku protocol.
func (w *Waku) Stop() error {
w.cancel()
err := w.node.Stop()
if err != nil {
return err
}
// w.wg.Wait()
w.ctx = nil
w.cancel = nil
return nil
}
func (n *WakuNode) stop() error {
var resp = C.allocResp()
defer C.freeResp(resp)
C.cGoWakuStop(n.wakuCtx, resp)
if C.getRet(resp) == C.RET_OK {
return nil
}
errMsg := "error WakuStop: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return errors.New(errMsg)
}
func (n *WakuNode) destroy() error {
var resp = C.allocResp()
defer C.freeResp(resp)
C.cGoWakuDestroy(n.wakuCtx, resp)
if C.getRet(resp) == C.RET_OK {
return nil
}
errMsg := "error WakuDestroy: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return errors.New(errMsg)
}
func (n *WakuNode) listenAddresses() ([]multiaddr.Multiaddr, error) {
var resp = C.allocResp()
defer C.freeResp(resp)
C.cGoWakuListenAddresses(n.wakuCtx, resp)
if C.getRet(resp) == C.RET_OK {
var addrsRet []multiaddr.Multiaddr
listenAddresses := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
addrss := strings.Split(listenAddresses, ",")
for _, addr := range addrss {
addr, err := multiaddr.NewMultiaddr(addr)
if err != nil {
return nil, err
}
addrsRet = append(addrsRet, addr)
}
return addrsRet, nil
}
errMsg := "error WakuListenAddresses: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return nil, errors.New(errMsg)
}
func (n *WakuNode) ListenAddresses() ([]multiaddr.Multiaddr, error) {
response, err := n.postTask(requestTypeListenAddresses, nil)
if err != nil {
return nil, err
}
return response.([]multiaddr.Multiaddr), nil
}
func (w *Waku) PeerCount() (int, error) {
return w.node.GetNumConnectedPeers()
}
func (n *WakuNode) getNumConnectedRelayPeers(optPubsubTopic ...string) (int, error) {
var pubsubTopic string
if len(optPubsubTopic) == 0 {
@ -696,114 +643,153 @@ func (n *WakuNode) getNumConnectedRelayPeers(optPubsubTopic ...string) (int, err
return 0, errors.New(errMsg)
}
func (n *WakuNode) GetConnectedPeers() (peer.IDSlice, error) {
response, err := n.postTask(requestTypeGetConnectedPeers, nil)
if err != nil {
return nil, err
}
return response.(peer.IDSlice), nil
}
func (n *WakuNode) GetNumConnectedPeers() (int, error) {
peers, err := n.GetConnectedPeers()
if err != nil {
return 0, err
}
return len(peers), nil
}
func (n *WakuNode) GetNumConnectedRelayPeers(paramPubsubTopic ...string) (int, error) {
response, err := n.postTask(requestTypeGetNumConnectedRelayPeers, paramPubsubTopic)
if err != nil {
return 0, err
}
return response.(int), nil
}
type dialPeerRequest struct {
ctx context.Context
peerAddr multiaddr.Multiaddr
protocol libp2pproto.ID
}
func (n *WakuNode) DialPeer(ctx context.Context, peerAddr multiaddr.Multiaddr, protocol libp2pproto.ID) error {
_, err := n.postTask(requestTypeDialPeer, dialPeerRequest{
ctx: ctx,
peerAddr: peerAddr,
protocol: protocol,
})
return err
}
func (w *Waku) DialPeer(address multiaddr.Multiaddr) error {
// Using WakuConnect so it matches the go-waku's behavior and terminology
ctx, cancel := context.WithTimeout(w.ctx, requestTimeout)
defer cancel()
return w.node.Connect(ctx, address)
}
func (n *WakuNode) dialPeer(dialPeerReq dialPeerRequest) error {
func (n *WakuNode) disconnectPeerByID(peerID peer.ID) error {
var resp = C.allocResp()
var cPeerMultiAddr = C.CString(dialPeerReq.peerAddr.String())
var cProtocol = C.CString(string(dialPeerReq.protocol))
var cPeerId = C.CString(peerID.String())
defer C.freeResp(resp)
defer C.free(unsafe.Pointer(cPeerMultiAddr))
defer C.free(unsafe.Pointer(cProtocol))
// TODO: extract timeout from context
C.cGoWakuDialPeer(n.wakuCtx, cPeerMultiAddr, cProtocol, C.int(requestTimeout.Milliseconds()), resp)
defer C.free(unsafe.Pointer(cPeerId))
C.cGoWakuDisconnectPeerById(n.wakuCtx, cPeerId, resp)
if C.getRet(resp) == C.RET_OK {
return nil
}
errMsg := "error DialPeer: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
errMsg := "error DisconnectPeerById: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return errors.New(errMsg)
}
type connectRequest struct {
ctx context.Context
addr multiaddr.Multiaddr
}
func (n *WakuNode) Connect(ctx context.Context, addr multiaddr.Multiaddr) error {
_, err := n.postTask(requestTypeConnect, connectRequest{
ctx: ctx,
addr: addr,
})
return err
}
func (n *WakuNode) connect(connReq connectRequest) error {
func (n *WakuNode) getConnectedPeers() (peer.IDSlice, error) {
var resp = C.allocResp()
var cPeerMultiAddr = C.CString(connReq.addr.String())
defer C.freeResp(resp)
defer C.free(unsafe.Pointer(cPeerMultiAddr))
C.cGoWakuGetConnectedPeers(n.wakuCtx, resp)
if C.getRet(resp) == C.RET_OK {
peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
if peersStr == "" {
return nil, nil
}
// peersStr contains a comma-separated list of peer ids
itemsPeerIds := strings.Split(peersStr, ",")
var peers peer.IDSlice
for _, peerId := range itemsPeerIds {
id, err := peer.Decode(peerId)
if err != nil {
return nil, fmt.Errorf("GetConnectedPeers - decoding peerId: %w", err)
}
peers = append(peers, id)
}
return peers, nil
}
errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return nil, fmt.Errorf("GetConnectedPeers: %s", errMsg)
// TODO: extract timeout from ctx
C.cGoWakuConnect(n.wakuCtx, cPeerMultiAddr, C.int(time.Minute.Milliseconds()), resp)
}
func (n *WakuNode) relaySubscribe(pubsubTopic string) error {
if pubsubTopic == "" {
return errors.New("pubsub topic is empty")
}
var resp = C.allocResp()
var cPubsubTopic = C.CString(pubsubTopic)
defer C.freeResp(resp)
defer C.free(unsafe.Pointer(cPubsubTopic))
if n.wakuCtx == nil {
return errors.New("wakuCtx is nil")
}
C.cGoWakuRelaySubscribe(n.wakuCtx, cPubsubTopic, resp)
if C.getRet(resp) == C.RET_OK {
return nil
}
errMsg := "error WakuConnect: " +
errMsg := "error WakuRelaySubscribe: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return errors.New(errMsg)
}
func (n *WakuNode) relayUnsubscribe(pubsubTopic string) error {
if pubsubTopic == "" {
return errors.New("pubsub topic is empty")
}
var resp = C.allocResp()
var cPubsubTopic = C.CString(pubsubTopic)
defer C.freeResp(resp)
defer C.free(unsafe.Pointer(cPubsubTopic))
if n.wakuCtx == nil {
return errors.New("wakuCtx is nil")
}
C.cGoWakuRelayUnsubscribe(n.wakuCtx, cPubsubTopic, resp)
if C.getRet(resp) == C.RET_OK {
return nil
}
errMsg := "error WakuRelayUnsubscribe: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return errors.New(errMsg)
}
func (n *WakuNode) peerExchangeRequest(numPeers uint64) (uint64, error) {
var resp = C.allocResp()
defer C.freeResp(resp)
C.cGoWakuPeerExchangeQuery(n.wakuCtx, C.uint64_t(numPeers), resp)
if C.getRet(resp) == C.RET_OK {
numRecvPeersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
numRecvPeers, err := strconv.ParseUint(numRecvPeersStr, 10, 64)
if err != nil {
return 0, err
}
return numRecvPeers, nil
}
errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return 0, errors.New(errMsg)
}
func (n *WakuNode) startDiscV5() error {
var resp = C.allocResp()
defer C.freeResp(resp)
C.cGoWakuStartDiscV5(n.wakuCtx, resp)
if C.getRet(resp) == C.RET_OK {
return nil
}
errMsg := "error WakuStartDiscV5: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return errors.New(errMsg)
}
func (n *WakuNode) stopDiscV5() error {
var resp = C.allocResp()
defer C.freeResp(resp)
C.cGoWakuStopDiscV5(n.wakuCtx, resp)
if C.getRet(resp) == C.RET_OK {
return nil
}
errMsg := "error WakuStopDiscV5: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return errors.New(errMsg)
}
func (n *WakuNode) version() (string, error) {
var resp = C.allocResp()
defer C.freeResp(resp)
C.cGoWakuVersion(n.wakuCtx, resp)
if C.getRet(resp) == C.RET_OK {
var version = C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return version, nil
}
errMsg := "error WakuVersion: " +
C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return errors.New(errMsg)
}
type storeQueryRequest struct {
ctx context.Context
storeRequest *storepb.StoreQueryRequest
peerInfo peer.AddrInfo
}
func (n *WakuNode) StoreQuery(ctx context.Context, storeRequest *storepb.StoreQueryRequest, peerInfo peer.AddrInfo) (*storepb.StoreQueryResponse, error) {
response, err := n.postTask(requestTypeStoreQuery, storeQueryRequest{
ctx: ctx,
peerInfo: peerInfo,
storeRequest: storeRequest,
})
if err != nil {
return nil, err
}
return response.(*storepb.StoreQueryResponse), nil
return "", errors.New(errMsg)
}
func (n *WakuNode) storeQuery(storeQueryRequest storeQueryRequest) (any, error) {
@ -843,24 +829,6 @@ func (n *WakuNode) storeQuery(storeQueryRequest storeQueryRequest) (any, error)
return nil, errors.New(errMsg)
}
type relayPublishRequest struct {
ctx context.Context
pubsubTopic string
message *pb.WakuMessage
}
func (n *WakuNode) RelayPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string) (pb.MessageHash, error) {
response, err := n.postTask(requestTypeRelayPublish, relayPublishRequest{
ctx: ctx,
pubsubTopic: pubsubTopic,
message: message,
})
if err != nil {
return pb.MessageHash{}, err
}
return response.(pb.MessageHash), nil
}
func (n *WakuNode) relayPublish(relayPublishRequest relayPublishRequest) (pb.MessageHash, error) {
// TODO: extract timeout from context
timeoutMs := time.Minute.Milliseconds()
@ -891,6 +859,277 @@ func (n *WakuNode) relayPublish(relayPublishRequest relayPublishRequest) (pb.Mes
return pb.MessageHash{}, errors.New(errMsg)
}
func (n *WakuNode) dnsDiscovery(dnsDiscRequest dnsDiscoveryRequest) ([]multiaddr.Multiaddr, error) {
var resp = C.allocResp()
var cEnrTree = C.CString(dnsDiscRequest.enrTreeUrl)
var cDnsServer = C.CString(dnsDiscRequest.nameDnsServer)
defer C.freeResp(resp)
defer C.free(unsafe.Pointer(cEnrTree))
defer C.free(unsafe.Pointer(cDnsServer))
// TODO: extract timeout from context
C.cGoWakuDnsDiscovery(n.wakuCtx, cEnrTree, cDnsServer, C.int(time.Minute.Milliseconds()), resp)
if C.getRet(resp) == C.RET_OK {
var addrsRet []multiaddr.Multiaddr
nodeAddresses := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
addrss := strings.Split(nodeAddresses, ",")
for _, addr := range addrss {
addr, err := multiaddr.NewMultiaddr(addr)
if err != nil {
return nil, err
}
addrsRet = append(addrsRet, addr)
}
return addrsRet, nil
}
errMsg := "error WakuDnsDiscovery: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return nil, errors.New(errMsg)
}
func (n *WakuNode) pingPeer(request pingRequest) (time.Duration, error) {
peerInfo := request.peerInfo
addrs := make([]string, len(peerInfo.Addrs))
for i, addr := range utils.EncapsulatePeerID(peerInfo.ID, peerInfo.Addrs...) {
addrs[i] = addr.String()
}
var resp = C.allocResp()
var cPeerId = C.CString(strings.Join(addrs, ","))
defer C.freeResp(resp)
defer C.free(unsafe.Pointer(cPeerId))
// TODO: extract timeout from ctx
C.cGoWakuPingPeer(n.wakuCtx, cPeerId, C.int(time.Minute.Milliseconds()), resp)
if C.getRet(resp) == C.RET_OK {
rttStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
rttInt, err := strconv.ParseInt(rttStr, 10, 64)
if err != nil {
return 0, err
}
return time.Duration(rttInt), nil
}
errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return 0, fmt.Errorf("PingPeer: %s", errMsg)
}
func (n *WakuNode) start() error {
var resp = C.allocResp()
defer C.freeResp(resp)
C.cGoWakuStart(n.wakuCtx, resp)
if C.getRet(resp) == C.RET_OK {
return nil
}
errMsg := "error WakuStart: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return errors.New(errMsg)
}
func (n *WakuNode) stop() error {
var resp = C.allocResp()
defer C.freeResp(resp)
C.cGoWakuStop(n.wakuCtx, resp)
if C.getRet(resp) == C.RET_OK {
return nil
}
errMsg := "error WakuStop: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return errors.New(errMsg)
}
func (n *WakuNode) destroy() error {
var resp = C.allocResp()
defer C.freeResp(resp)
C.cGoWakuDestroy(n.wakuCtx, resp)
if C.getRet(resp) == C.RET_OK {
return nil
}
errMsg := "error WakuDestroy: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return errors.New(errMsg)
}
func (n *WakuNode) peerID() (peer.ID, error) {
var resp = C.allocResp()
defer C.freeResp(resp)
C.cGoWakuGetMyPeerId(n.wakuCtx, resp)
if C.getRet(resp) == C.RET_OK {
peerIdStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
id, err := peer.Decode(peerIdStr)
if err != nil {
errMsg := "WakuGetMyPeerId - decoding peerId: %w"
return "", fmt.Errorf(errMsg, err)
}
return id, nil
}
errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return "", errors.New(errMsg)
}
func (n *WakuNode) connect(connReq connectRequest) error {
var resp = C.allocResp()
var cPeerMultiAddr = C.CString(connReq.addr.String())
defer C.freeResp(resp)
defer C.free(unsafe.Pointer(cPeerMultiAddr))
// TODO: extract timeout from ctx
C.cGoWakuConnect(n.wakuCtx, cPeerMultiAddr, C.int(time.Minute.Milliseconds()), resp)
if C.getRet(resp) == C.RET_OK {
return nil
}
errMsg := "error WakuConnect: " +
C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return errors.New(errMsg)
}
func (n *WakuNode) dialPeerById(dialPeerByIDReq dialPeerByIDRequest) error {
var resp = C.allocResp()
var cPeerId = C.CString(dialPeerByIDReq.peerID.String())
var cProtocol = C.CString(string(dialPeerByIDReq.protocol))
defer C.freeResp(resp)
defer C.free(unsafe.Pointer(cPeerId))
defer C.free(unsafe.Pointer(cProtocol))
// TODO: extract timeout from ctx
C.cGoWakuDialPeerById(n.wakuCtx, cPeerId, cProtocol, C.int(time.Minute.Milliseconds()), resp)
if C.getRet(resp) == C.RET_OK {
return nil
}
errMsg := "error DialPeerById: " +
C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return errors.New(errMsg)
}
func (n *WakuNode) listenAddresses() ([]multiaddr.Multiaddr, error) {
var resp = C.allocResp()
defer C.freeResp(resp)
C.cGoWakuListenAddresses(n.wakuCtx, resp)
if C.getRet(resp) == C.RET_OK {
var addrsRet []multiaddr.Multiaddr
listenAddresses := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
addrss := strings.Split(listenAddresses, ",")
for _, addr := range addrss {
addr, err := multiaddr.NewMultiaddr(addr)
if err != nil {
return nil, err
}
addrsRet = append(addrsRet, addr)
}
return addrsRet, nil
}
errMsg := "error WakuListenAddresses: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return nil, errors.New(errMsg)
}
func (n *WakuNode) enr() (*enode.Node, error) {
var resp = C.allocResp()
defer C.freeResp(resp)
C.cGoWakuGetMyENR(n.wakuCtx, resp)
if C.getRet(resp) == C.RET_OK {
enrStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
n, err := enode.Parse(enode.ValidSchemes, enrStr)
if err != nil {
return nil, err
}
return n, nil
}
errMsg := "error WakuGetMyENR: " +
C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return nil, errors.New(errMsg)
}
func (n *WakuNode) listPeersInMesh(pubsubTopic string) (int, error) {
var resp = C.allocResp()
var cPubsubTopic = C.CString(pubsubTopic)
defer C.freeResp(resp)
defer C.free(unsafe.Pointer(cPubsubTopic))
C.cGoWakuListPeersInMesh(n.wakuCtx, cPubsubTopic, resp)
if C.getRet(resp) == C.RET_OK {
numPeersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
numPeers, err := strconv.Atoi(numPeersStr)
if err != nil {
errMsg := "ListPeersInMesh - error converting string to int: " + err.Error()
return 0, errors.New(errMsg)
}
return numPeers, nil
}
errMsg := "error ListPeersInMesh: " +
C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return 0, errors.New(errMsg)
}
func (n *WakuNode) getPeerIDsFromPeerStore() (peer.IDSlice, error) {
var resp = C.allocResp()
defer C.freeResp(resp)
C.cGoWakuGetPeerIdsFromPeerStore(n.wakuCtx, resp)
if C.getRet(resp) == C.RET_OK {
peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
if peersStr == "" {
return nil, nil
}
// peersStr contains a comma-separated list of peer ids
itemsPeerIds := strings.Split(peersStr, ",")
var peers peer.IDSlice
for _, peerId := range itemsPeerIds {
id, err := peer.Decode(peerId)
if err != nil {
return nil, fmt.Errorf("GetPeerIdsFromPeerStore - decoding peerId: %w", err)
}
peers = append(peers, id)
}
return peers, nil
}
errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return nil, fmt.Errorf("GetPeerIdsFromPeerStore: %s", errMsg)
}
func (n *WakuNode) getPeerIDsByProtocol(protocolID libp2pproto.ID) (peer.IDSlice, error) {
var resp = C.allocResp()
var cProtocol = C.CString(string(protocolID))
defer C.freeResp(resp)
defer C.free(unsafe.Pointer(cProtocol))
C.cGoWakuGetPeerIdsByProtocol(n.wakuCtx, cProtocol, resp)
if C.getRet(resp) == C.RET_OK {
peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
if peersStr == "" {
return nil, nil
}
// peersStr contains a comma-separated list of peer ids
itemsPeerIds := strings.Split(peersStr, ",")
var peers peer.IDSlice
for _, p := range itemsPeerIds {
id, err := peer.Decode(p)
if err != nil {
return nil, fmt.Errorf("GetPeerIdsByProtocol - decoding peerId: %w", err)
}
peers = append(peers, id)
}
return peers, nil
}
errMsg := "error GetPeerIdsByProtocol: " +
C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return nil, fmt.Errorf("GetPeerIdsByProtocol: %s", errMsg)
}
func (n *WakuNode) newNode(config *WakuConfig) error {
jsonConfig, err := json.Marshal(config)
if err != nil {
@ -916,3 +1155,260 @@ func (n *WakuNode) newNode(config *WakuConfig) error {
return nil
}
func (n *WakuNode) dialPeer(dialPeerReq dialPeerRequest) error {
var resp = C.allocResp()
var cPeerMultiAddr = C.CString(dialPeerReq.peerAddr.String())
var cProtocol = C.CString(string(dialPeerReq.protocol))
defer C.freeResp(resp)
defer C.free(unsafe.Pointer(cPeerMultiAddr))
defer C.free(unsafe.Pointer(cProtocol))
// TODO: extract timeout from context
C.cGoWakuDialPeer(n.wakuCtx, cPeerMultiAddr, cProtocol, C.int(requestTimeout.Milliseconds()), resp)
if C.getRet(resp) == C.RET_OK {
return nil
}
errMsg := "error DialPeer: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return errors.New(errMsg)
}
type pingRequest struct {
ctx context.Context
peerInfo peer.AddrInfo
}
func (n *WakuNode) PingPeer(ctx context.Context, info peer.AddrInfo) (time.Duration, error) {
response, err := n.postTask(requestTypePing, pingRequest{
ctx: ctx,
peerInfo: info,
})
if err != nil {
return 0, err
}
return response.(time.Duration), nil
}
func (n *WakuNode) Start() error {
_, err := n.postTask(requestTypeStart, nil)
return err
}
type relayPublishRequest struct {
ctx context.Context
pubsubTopic string
message *pb.WakuMessage
}
func (n *WakuNode) RelayPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string) (pb.MessageHash, error) {
response, err := n.postTask(requestTypeRelayPublish, relayPublishRequest{
ctx: ctx,
pubsubTopic: pubsubTopic,
message: message,
})
if err != nil {
return pb.MessageHash{}, err
}
return response.(pb.MessageHash), nil
}
type storeQueryRequest struct {
ctx context.Context
storeRequest *storepb.StoreQueryRequest
peerInfo peer.AddrInfo
}
func (n *WakuNode) StoreQuery(ctx context.Context, storeRequest *storepb.StoreQueryRequest, peerInfo peer.AddrInfo) (*storepb.StoreQueryResponse, error) {
response, err := n.postTask(requestTypeStoreQuery, storeQueryRequest{
ctx: ctx,
peerInfo: peerInfo,
storeRequest: storeRequest,
})
if err != nil {
return nil, err
}
return response.(*storepb.StoreQueryResponse), nil
}
func (n *WakuNode) PeerID() (peer.ID, error) {
response, err := n.postTask(requestTypePeerID, nil)
if err != nil {
return "", err
}
return response.(peer.ID), nil
}
func (n *WakuNode) Stop() error {
_, err := n.postTask(requestTypeStop, nil)
return err
}
func (n *WakuNode) Destroy() error {
_, err := n.postTask(requestTypeDestroy, nil)
return err
}
func (n *WakuNode) StartDiscV5() error {
_, err := n.postTask(requestTypeStartDiscV5, nil)
return err
}
func (n *WakuNode) StopDiscV5() error {
_, err := n.postTask(requestTypeStopDiscV5, nil)
return err
}
func (n *WakuNode) Version() (string, error) {
response, err := n.postTask(requestTypeVersion, nil)
if err != nil {
return "", err
}
return response.(string), nil
}
func (n *WakuNode) RelaySubscribe(pubsubTopic string) error {
_, err := n.postTask(requestTypeRelaySubscribe, pubsubTopic)
return err
}
func (n *WakuNode) RelayUnsubscribe(pubsubTopic string) error {
_, err := n.postTask(requestTypeRelayUnsubscribe, pubsubTopic)
return err
}
func (n *WakuNode) PeerExchangeRequest(numPeers uint64) (uint64, error) {
response, err := n.postTask(requestTypePeerExchangeRequest, numPeers)
if err != nil {
return 0, err
}
return response.(uint64), nil
}
type connectRequest struct {
ctx context.Context
addr multiaddr.Multiaddr
}
func (n *WakuNode) Connect(ctx context.Context, addr multiaddr.Multiaddr) error {
_, err := n.postTask(requestTypeConnect, connectRequest{
ctx: ctx,
addr: addr,
})
return err
}
type dialPeerByIDRequest struct {
ctx context.Context
peerID peer.ID
protocol libp2pproto.ID
}
func (n *WakuNode) DialPeerByID(ctx context.Context, peerID peer.ID, protocol libp2pproto.ID) error {
_, err := n.postTask(requestTypeDialPeerByID, dialPeerByIDRequest{
ctx: ctx,
peerID: peerID,
protocol: protocol,
})
return err
}
func (n *WakuNode) ListenAddresses() ([]multiaddr.Multiaddr, error) {
response, err := n.postTask(requestTypeListenAddresses, nil)
if err != nil {
return nil, err
}
return response.([]multiaddr.Multiaddr), nil
}
func (n *WakuNode) ENR() (*enode.Node, error) {
response, err := n.postTask(requestTypeENR, nil)
if err != nil {
return nil, err
}
return response.(*enode.Node), nil
}
func (n *WakuNode) ListPeersInMesh(pubsubTopic string) (int, error) {
response, err := n.postTask(requestTypeListPeersInMesh, pubsubTopic)
if err != nil {
return 0, err
}
return response.(int), nil
}
func (n *WakuNode) GetConnectedPeers() (peer.IDSlice, error) {
response, err := n.postTask(requestTypeGetConnectedPeers, nil)
if err != nil {
return nil, err
}
return response.(peer.IDSlice), nil
}
func (n *WakuNode) GetNumConnectedPeers() (int, error) {
peers, err := n.GetConnectedPeers()
if err != nil {
return 0, err
}
return len(peers), nil
}
func (n *WakuNode) GetPeerIDsFromPeerStore() (peer.IDSlice, error) {
response, err := n.postTask(requestTypeGetPeerIDsFromPeerStore, nil)
if err != nil {
return nil, err
}
return response.(peer.IDSlice), nil
}
func (n *WakuNode) GetPeerIDsByProtocol(protocol libp2pproto.ID) (peer.IDSlice, error) {
response, err := n.postTask(requestTypeGetPeerIDsByProtocol, protocol)
if err != nil {
return nil, err
}
return response.(peer.IDSlice), nil
}
func (n *WakuNode) DisconnectPeerByID(peerID peer.ID) error {
_, err := n.postTask(requestTypeDisconnectPeerByID, peerID)
return err
}
type dnsDiscoveryRequest struct {
ctx context.Context
enrTreeUrl string
nameDnsServer string
}
func (n *WakuNode) DnsDiscovery(ctx context.Context, enrTreeUrl string, nameDnsServer string) ([]multiaddr.Multiaddr, error) {
response, err := n.postTask(requestTypeDnsDiscovery, dnsDiscoveryRequest{
ctx: ctx,
enrTreeUrl: enrTreeUrl,
nameDnsServer: nameDnsServer,
})
if err != nil {
return nil, err
}
return response.([]multiaddr.Multiaddr), nil
}
type dialPeerRequest struct {
ctx context.Context
peerAddr multiaddr.Multiaddr
protocol libp2pproto.ID
}
func (n *WakuNode) DialPeer(ctx context.Context, peerAddr multiaddr.Multiaddr, protocol libp2pproto.ID) error {
_, err := n.postTask(requestTypeDialPeer, dialPeerRequest{
ctx: ctx,
peerAddr: peerAddr,
protocol: protocol,
})
return err
}
func (n *WakuNode) GetNumConnectedRelayPeers(paramPubsubTopic ...string) (int, error) {
response, err := n.postTask(requestTypeGetNumConnectedRelayPeers, paramPubsubTopic)
if err != nil {
return 0, err
}
return response.(int), nil
}