From c8284a342ecb95494b7518bdb110ab6234ad27b7 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Wed, 27 Nov 2024 16:28:27 +0200 Subject: [PATCH] more compilation errors --- nwaku.go | 187 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 187 insertions(+) diff --git a/nwaku.go b/nwaku.go index b38a123..46d0aa5 100644 --- a/nwaku.go +++ b/nwaku.go @@ -311,10 +311,16 @@ import "C" import ( "context" "errors" + "fmt" "runtime" + "strconv" + "strings" + "time" "unsafe" "github.com/libp2p/go-libp2p/core/peer" + libp2pproto "github.com/libp2p/go-libp2p/core/protocol" + "github.com/multiformats/go-multiaddr" "go.uber.org/zap" ) @@ -338,6 +344,19 @@ type WakuConfig struct { TcpPort uint16 `json:"tcpPort,omitempty"` } +// Waku represents a dark communication interface through the Ethereum +// network, using its very own P2P communication layer. +type Waku struct { + node *WakuNode + + ctx context.Context + cancel context.CancelFunc + + wakuCfg *WakuConfig + + logger *zap.Logger +} + type request struct { id string reqType requestType @@ -531,3 +550,171 @@ func (n *WakuNode) processLoop(ctx context.Context) { } } } + +// Start implements node.Service, starting the background data propagation thread +// of the Waku protocol. +func (w *Waku) Start() error { + err := w.node.Start() + if err != nil { + return fmt.Errorf("failed to start nwaku node: %v", err) + } + + peerID, err := w.node.PeerID() + if err != nil { + return err + } + + w.logger.Info("WakuV2 PeerID", zap.Stringer("id", peerID)) + + return nil +} + +func (n *WakuNode) listenAddresses() ([]multiaddr.Multiaddr, error) { + var resp = C.allocResp() + defer C.freeResp(resp) + C.cGoWakuListenAddresses(n.wakuCtx, resp) + + if C.getRet(resp) == C.RET_OK { + var addrsRet []multiaddr.Multiaddr + listenAddresses := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + addrss := strings.Split(listenAddresses, ",") + for _, addr := range addrss { + addr, err := multiaddr.NewMultiaddr(addr) + if err != nil { + return nil, err + } + addrsRet = append(addrsRet, addr) + } + return addrsRet, nil + } + errMsg := "error WakuListenAddresses: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return nil, errors.New(errMsg) +} + +func (n *WakuNode) ListenAddresses() ([]multiaddr.Multiaddr, error) { + response, err := n.postTask(requestTypeListenAddresses, nil) + if err != nil { + return nil, err + } + return response.([]multiaddr.Multiaddr), nil +} + +func (w *Waku) PeerCount() (int, error) { + return w.node.GetNumConnectedPeers() +} + +func (n *WakuNode) getNumConnectedRelayPeers(optPubsubTopic ...string) (int, error) { + var pubsubTopic string + if len(optPubsubTopic) == 0 { + pubsubTopic = "" + } else { + pubsubTopic = optPubsubTopic[0] + } + + var resp = C.allocResp() + var cPubsubTopic = C.CString(pubsubTopic) + defer C.freeResp(resp) + defer C.free(unsafe.Pointer(cPubsubTopic)) + + C.cGoWakuGetNumConnectedRelayPeers(n.wakuCtx, cPubsubTopic, resp) + + if C.getRet(resp) == C.RET_OK { + numPeersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + numPeers, err := strconv.Atoi(numPeersStr) + if err != nil { + errMsg := "GetNumConnectedRelayPeers - error converting string to int: " + err.Error() + return 0, errors.New(errMsg) + } + return numPeers, nil + } + errMsg := "error GetNumConnectedRelayPeers: " + + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return 0, errors.New(errMsg) +} + +func (n *WakuNode) GetConnectedPeers() (peer.IDSlice, error) { + response, err := n.postTask(requestTypeGetConnectedPeers, nil) + if err != nil { + return nil, err + } + return response.(peer.IDSlice), nil +} + +func (n *WakuNode) GetNumConnectedPeers() (int, error) { + peers, err := n.GetConnectedPeers() + if err != nil { + return 0, err + } + return len(peers), nil +} + +func (n *WakuNode) GetNumConnectedRelayPeers(paramPubsubTopic ...string) (int, error) { + response, err := n.postTask(requestTypeGetNumConnectedRelayPeers, paramPubsubTopic) + if err != nil { + return 0, err + } + return response.(int), nil +} + +type dialPeerRequest struct { + ctx context.Context + peerAddr multiaddr.Multiaddr + protocol libp2pproto.ID +} + +func (n *WakuNode) DialPeer(ctx context.Context, peerAddr multiaddr.Multiaddr, protocol libp2pproto.ID) error { + _, err := n.postTask(requestTypeDialPeer, dialPeerRequest{ + ctx: ctx, + peerAddr: peerAddr, + protocol: protocol, + }) + return err +} + +func (w *Waku) DialPeer(address multiaddr.Multiaddr) error { + // Using WakuConnect so it matches the go-waku's behavior and terminology + ctx, cancel := context.WithTimeout(w.ctx, requestTimeout) + defer cancel() + return w.node.Connect(ctx, address) +} + +func (n *WakuNode) dialPeer(dialPeerReq dialPeerRequest) error { + var resp = C.allocResp() + var cPeerMultiAddr = C.CString(dialPeerReq.peerAddr.String()) + var cProtocol = C.CString(string(dialPeerReq.protocol)) + defer C.freeResp(resp) + defer C.free(unsafe.Pointer(cPeerMultiAddr)) + defer C.free(unsafe.Pointer(cProtocol)) + // TODO: extract timeout from context + C.cGoWakuDialPeer(n.wakuCtx, cPeerMultiAddr, cProtocol, C.int(requestTimeout.Milliseconds()), resp) + if C.getRet(resp) == C.RET_OK { + return nil + } + errMsg := "error DialPeer: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return errors.New(errMsg) +} + +func (n *WakuNode) Connect(ctx context.Context, addr multiaddr.Multiaddr) error { + _, err := n.postTask(requestTypeConnect, connectRequest{ + ctx: ctx, + addr: addr, + }) + return err +} + +func (n *WakuNode) connect(connReq connectRequest) error { + var resp = C.allocResp() + var cPeerMultiAddr = C.CString(connReq.addr.String()) + defer C.freeResp(resp) + defer C.free(unsafe.Pointer(cPeerMultiAddr)) + + // TODO: extract timeout from ctx + C.cGoWakuConnect(n.wakuCtx, cPeerMultiAddr, C.int(time.Minute.Milliseconds()), resp) + + if C.getRet(resp) == C.RET_OK { + return nil + } + errMsg := "error WakuConnect: " + + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return errors.New(errMsg) +}