From 298aceb799760cfb093fa3b175c48367ec0d79ed Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Wed, 4 Dec 2024 00:07:46 +0100 Subject: [PATCH] porting rest of PR - tests not passing --- waku/nwaku.go | 241 ++------------------------------------------- waku/nwaku_test.go | 10 +- 2 files changed, 12 insertions(+), 239 deletions(-) diff --git a/waku/nwaku.go b/waku/nwaku.go index f6c7d65..8c57d72 100644 --- a/waku/nwaku.go +++ b/waku/nwaku.go @@ -667,11 +667,15 @@ func (n *WakuNode) RelayUnsubscribe(pubsubTopic string) error { return errors.New(errMsg) } -func (n *WakuNode) peerExchangeRequest(numPeers uint64) (uint64, error) { - var resp = C.allocResp() +func (n *WakuNode) PeerExchangeRequest(numPeers uint64) (uint64, error) { + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) + wg.Add(1) C.cGoWakuPeerExchangeQuery(n.wakuCtx, C.uint64_t(numPeers), resp) + wg.Wait() if C.getRet(resp) == C.RET_OK { numRecvPeersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) numRecvPeers, err := strconv.ParseUint(numRecvPeersStr, 10, 64) @@ -1148,177 +1152,6 @@ func (n *WakuNode) DialPeer(ctx context.Context, peerAddr multiaddr.Multiaddr, p return errors.New(errMsg) } -type pingRequest struct { - ctx context.Context - peerInfo peer.AddrInfo -} - -func (n *WakuNode) PingPeer(ctx context.Context, info peer.AddrInfo) (time.Duration, error) { - response, err := n.postTask(requestTypePing, pingRequest{ - ctx: ctx, - peerInfo: info, - }) - if err != nil { - return 0, err - } - return response.(time.Duration), nil -} - -func (n *WakuNode) Start() error { - _, err := n.postTask(requestTypeStart, nil) - return err -} - -type relayPublishRequest struct { - ctx context.Context - pubsubTopic string - message *pb.WakuMessage -} - -func (n *WakuNode) RelayPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string) (pb.MessageHash, error) { - response, err := n.postTask(requestTypeRelayPublish, relayPublishRequest{ - ctx: ctx, - pubsubTopic: pubsubTopic, - message: message, - }) - if err != nil { - return pb.MessageHash{}, err - } - return response.(pb.MessageHash), nil -} - -type storeQueryRequest struct { - ctx context.Context - storeRequest *storepb.StoreQueryRequest - peerInfo peer.AddrInfo -} - -func (n *WakuNode) StoreQuery(ctx context.Context, storeRequest *storepb.StoreQueryRequest, peerInfo peer.AddrInfo) (*storepb.StoreQueryResponse, error) { - response, err := n.postTask(requestTypeStoreQuery, storeQueryRequest{ - ctx: ctx, - peerInfo: peerInfo, - storeRequest: storeRequest, - }) - if err != nil { - return nil, err - } - return response.(*storepb.StoreQueryResponse), nil -} - -func (n *WakuNode) PeerID() (peer.ID, error) { - response, err := n.postTask(requestTypePeerID, nil) - if err != nil { - return "", err - } - return response.(peer.ID), nil -} - -func (n *WakuNode) Stop() error { - _, err := n.postTask(requestTypeStop, nil) - return err -} - -func (n *WakuNode) Destroy() error { - _, err := n.postTask(requestTypeDestroy, nil) - return err -} - -func (n *WakuNode) StartDiscV5() error { - _, err := n.postTask(requestTypeStartDiscV5, nil) - return err -} - -func (n *WakuNode) StopDiscV5() error { - _, err := n.postTask(requestTypeStopDiscV5, nil) - return err -} - -func (n *WakuNode) Version() (string, error) { - response, err := n.postTask(requestTypeVersion, nil) - if err != nil { - return "", err - } - return response.(string), nil -} - -func (n *WakuNode) RelaySubscribe(pubsubTopic string) error { - _, err := n.postTask(requestTypeRelaySubscribe, pubsubTopic) - return err -} - -func (n *WakuNode) RelayUnsubscribe(pubsubTopic string) error { - _, err := n.postTask(requestTypeRelayUnsubscribe, pubsubTopic) - return err -} - -func (n *WakuNode) PeerExchangeRequest(numPeers uint64) (uint64, error) { - response, err := n.postTask(requestTypePeerExchangeRequest, numPeers) - if err != nil { - return 0, err - } - return response.(uint64), nil -} - -type connectRequest struct { - ctx context.Context - addr multiaddr.Multiaddr -} - -func (n *WakuNode) Connect(ctx context.Context, addr multiaddr.Multiaddr) error { - _, err := n.postTask(requestTypeConnect, connectRequest{ - ctx: ctx, - addr: addr, - }) - return err -} - -type dialPeerByIDRequest struct { - ctx context.Context - peerID peer.ID - protocol libp2pproto.ID -} - -func (n *WakuNode) DialPeerByID(ctx context.Context, peerID peer.ID, protocol libp2pproto.ID) error { - _, err := n.postTask(requestTypeDialPeerByID, dialPeerByIDRequest{ - ctx: ctx, - peerID: peerID, - protocol: protocol, - }) - return err -} - -func (n *WakuNode) ListenAddresses() ([]multiaddr.Multiaddr, error) { - response, err := n.postTask(requestTypeListenAddresses, nil) - if err != nil { - return nil, err - } - return response.([]multiaddr.Multiaddr), nil -} - -func (n *WakuNode) ENR() (*enode.Node, error) { - response, err := n.postTask(requestTypeENR, nil) - if err != nil { - return nil, err - } - return response.(*enode.Node), nil -} - -func (n *WakuNode) GetNumPeersInMesh(pubsubTopic string) (int, error) { - response, err := n.postTask(requestTypeGetNumPeersInMesh, pubsubTopic) - if err != nil { - return 0, err - } - return response.(int), nil -} - -func (n *WakuNode) GetConnectedPeers() (peer.IDSlice, error) { - response, err := n.postTask(requestTypeGetConnectedPeers, nil) - if err != nil { - return nil, err - } - return response.(peer.IDSlice), nil -} - func (n *WakuNode) GetNumConnectedPeers() (int, error) { peers, err := n.GetConnectedPeers() if err != nil { @@ -1326,65 +1159,3 @@ func (n *WakuNode) GetNumConnectedPeers() (int, error) { } return len(peers), nil } - -func (n *WakuNode) GetPeerIDsFromPeerStore() (peer.IDSlice, error) { - response, err := n.postTask(requestTypeGetPeerIDsFromPeerStore, nil) - if err != nil { - return nil, err - } - return response.(peer.IDSlice), nil -} - -func (n *WakuNode) GetPeerIDsByProtocol(protocol libp2pproto.ID) (peer.IDSlice, error) { - response, err := n.postTask(requestTypeGetPeerIDsByProtocol, protocol) - if err != nil { - return nil, err - } - return response.(peer.IDSlice), nil -} - -func (n *WakuNode) DisconnectPeerByID(peerID peer.ID) error { - _, err := n.postTask(requestTypeDisconnectPeerByID, peerID) - return err -} - -type dnsDiscoveryRequest struct { - ctx context.Context - enrTreeUrl string - nameDnsServer string -} - -func (n *WakuNode) DnsDiscovery(ctx context.Context, enrTreeUrl string, nameDnsServer string) ([]multiaddr.Multiaddr, error) { - response, err := n.postTask(requestTypeDnsDiscovery, dnsDiscoveryRequest{ - ctx: ctx, - enrTreeUrl: enrTreeUrl, - nameDnsServer: nameDnsServer, - }) - if err != nil { - return nil, err - } - return response.([]multiaddr.Multiaddr), nil -} - -type dialPeerRequest struct { - ctx context.Context - peerAddr multiaddr.Multiaddr - protocol libp2pproto.ID -} - -func (n *WakuNode) DialPeer(ctx context.Context, peerAddr multiaddr.Multiaddr, protocol libp2pproto.ID) error { - _, err := n.postTask(requestTypeDialPeer, dialPeerRequest{ - ctx: ctx, - peerAddr: peerAddr, - protocol: protocol, - }) - return err -} - -func (n *WakuNode) GetNumConnectedRelayPeers(paramPubsubTopic ...string) (int, error) { - response, err := n.postTask(requestTypeGetNumConnectedRelayPeers, paramPubsubTopic) - if err != nil { - return 0, err - } - return response.(int), nil -} diff --git a/waku/nwaku_test.go b/waku/nwaku_test.go index f368628..9b9736d 100644 --- a/waku/nwaku_test.go +++ b/waku/nwaku_test.go @@ -31,6 +31,8 @@ func TestBasicWaku(t *testing.T) { storeNodeInfo, err := GetNwakuInfo(nil, &extNodeRestPort) require.NoError(t, err) + // ctx := context.Background() + nwakuConfig := WakuConfig{ Port: 30303, NodeKey: "11d0dcea28e86f81937a3bd1163473c7fbc0a0db54fd72914849bc47bdf78710", @@ -77,11 +79,11 @@ func TestBasicWaku(t *testing.T) { storeNode, err := peer.AddrInfoFromString(storeNodeInfo.ListenAddresses[0]) require.NoError(t, err) - /* for i := 0; i <= 100; i++ { - time.Sleep(2 * time.Second) - } + /* + w.node.DialPeer(ctx, storeNode.Addrs[0], "") - w.StorenodeCycle.SetStorenodeConfigProvider(newTestStorenodeConfigProvider(*storeNode)) */ + w.StorenodeCycle.SetStorenodeConfigProvider(newTestStorenodeConfigProvider(*storeNode)) + */ // Check that we are indeed connected to the store node connectedStoreNodes, err := w.node.GetPeerIDsByProtocol(store.StoreQueryID_v300)