Adding additional APIs

This commit is contained in:
aya 2025-02-12 12:57:38 +02:00
parent 1187190ee0
commit 0b007a2202
2 changed files with 119 additions and 5 deletions

0
waku/nodes_basic_test.go Normal file
View File

View File

@ -334,6 +334,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/utils"
"github.com/waku-org/waku-go-bindings/waku/common"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
)
const requestTimeout = 30 * time.Second
@ -683,7 +684,9 @@ func (n *WakuNode) GetConnectedPeers() (peer.IDSlice, error) {
func (n *WakuNode) RelaySubscribe(pubsubTopic string) error {
if pubsubTopic == "" {
return errors.New("pubsub topic is empty")
err := errors.New("pubsub topic is empty")
Error("Failed to subscribe to relay: %v", err)
return err
}
wg := sync.WaitGroup{}
@ -695,18 +698,24 @@ func (n *WakuNode) RelaySubscribe(pubsubTopic string) error {
defer C.free(unsafe.Pointer(cPubsubTopic))
if n.wakuCtx == nil {
err := errors.New("wakuCtx is nil")
Error("Failed to subscribe to relay on node %s: %v", n.nodeName, err)
return errors.New("wakuCtx is nil")
}
wg.Add(1)
Debug("Attempting to subscribe to relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic)
C.cGoWakuRelaySubscribe(n.wakuCtx, cPubsubTopic, resp)
wg.Wait()
if C.getRet(resp) == C.RET_OK {
Debug("Successfully subscribed to relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic)
return nil
}
errMsg := "error WakuRelaySubscribe: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
Error("Failed to subscribe to relay on node %s, pubsubTopic: %s, error: %v", n.nodeName, pubsubTopic, errMsg)
return errors.New(errMsg)
}
@ -743,7 +752,9 @@ func (n *WakuNode) RelayAddProtectedShard(clusterId uint16, shardId uint16, pubk
func (n *WakuNode) RelayUnsubscribe(pubsubTopic string) error {
if pubsubTopic == "" {
return errors.New("pubsub topic is empty")
err := errors.New("pubsub topic is empty")
Error("Failed to unsubscribe from relay: %v", err)
return err
}
wg := sync.WaitGroup{}
@ -759,15 +770,19 @@ func (n *WakuNode) RelayUnsubscribe(pubsubTopic string) error {
}
wg.Add(1)
Debug("Attempting to unsubscribe from relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic)
C.cGoWakuRelayUnsubscribe(n.wakuCtx, cPubsubTopic, resp)
wg.Wait()
if C.getRet(resp) == C.RET_OK {
Debug("Successfully unsubscribed from relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic)
return nil
}
errMsg := "error WakuRelayUnsubscribe: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return errors.New(errMsg)
errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
Error("Failed to unsubscribe from relay on node %s, pubsubTopic: %s, error: %v", n.nodeName, pubsubTopic, errMsg)
return errors.New("error WakuRelayUnsubscribe: " + errMsg)
}
func (n *WakuNode) PeerExchangeRequest(numPeers uint64) (uint64, error) {
@ -783,18 +798,21 @@ func (n *WakuNode) PeerExchangeRequest(numPeers uint64) (uint64, error) {
numRecvPeersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
numRecvPeers, err := strconv.ParseUint(numRecvPeersStr, 10, 64)
if err != nil {
Error("Failed to parse number of received peers: %v", err)
return 0, err
}
return numRecvPeers, nil
}
errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
Error("PeerExchangeRequest failed: %v", errMsg)
return 0, errors.New(errMsg)
}
func (n *WakuNode) StartDiscV5() error {
wg := sync.WaitGroup{}
Debug("Starting DiscV5 for node: %s", n.nodeName)
wg := sync.WaitGroup{}
var resp = C.allocResp(unsafe.Pointer(&wg))
defer C.freeResp(resp)
@ -802,9 +820,11 @@ func (n *WakuNode) StartDiscV5() error {
C.cGoWakuStartDiscV5(n.wakuCtx, resp)
wg.Wait()
if C.getRet(resp) == C.RET_OK {
Debug("Successfully started DiscV5 for node: %s", n.nodeName)
return nil
}
errMsg := "error WakuStartDiscV5: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
Error("Failed to start DiscV5 for node %s: %v", n.nodeName, errMsg)
return errors.New(errMsg)
}
@ -819,9 +839,11 @@ func (n *WakuNode) StopDiscV5() error {
wg.Wait()
if C.getRet(resp) == C.RET_OK {
Debug("Successfully stopped DiscV5 for node: %s", n.nodeName)
return nil
}
errMsg := "error WakuStopDiscV5: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
Error("Failed to stop DiscV5 for node %s: %v", n.nodeName, errMsg)
return errors.New(errMsg)
}
@ -837,11 +859,13 @@ func (n *WakuNode) Version() (string, error) {
if C.getRet(resp) == C.RET_OK {
var version = C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
Debug("Successfully fetched Waku version for node %s: %s", n.nodeName, version)
return version, nil
}
errMsg := "error WakuVersion: " +
C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
Error("Failed to fetch Waku version for node %s: %v", n.nodeName, errMsg)
return "", errors.New(errMsg)
}
@ -918,6 +942,29 @@ func (n *WakuNode) RelayPublish(ctx context.Context, message *pb.WakuMessage, pu
return common.MessageHash(""), errors.New(errMsg)
}
func (n *WakuNode) RelayPublishNoCTX(pubsubTopic string, message *pb.WakuMessage) (common.MessageHash, error) {
if n == nil {
err := errors.New("cannot publish message; node is nil")
Error("Failed to publish message via relay: %v", err)
return "", err
}
// Handling context internally with a timeout
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
Debug("Attempting to publish message via relay on node %s", n.nodeName)
msgHash, err := n.RelayPublish(ctx, message, pubsubTopic)
if err != nil {
Error("Failed to publish message via relay on node %s: %v", n.nodeName, err)
return "", err
}
Debug("Successfully published message via relay on node %s, messageHash: %s", n.nodeName, msgHash.String())
return msgHash, nil
}
func (n *WakuNode) DnsDiscovery(ctx context.Context, enrTreeUrl string, nameDnsServer string) ([]multiaddr.Multiaddr, error) {
wg := sync.WaitGroup{}
@ -1463,3 +1510,70 @@ func (n *WakuNode) DisconnectPeer(target *WakuNode) error {
Debug("Successfully disconnected %s from %s", n.nodeName, target.nodeName)
return nil
}
func ConnectAllPeers(nodes []*WakuNode) error {
Debug("Connecting nodes in a relay chain")
for i := 0; i < len(nodes)-1; i++ {
Debug("Connecting node %d to node %d", i, i+1)
err := nodes[i].ConnectPeer(nodes[i+1])
if err != nil {
Error("Failed to connect node %d to node %d: %v", i, i+1, err)
return err
}
}
time.Sleep(4 * time.Second)
Debug("Waiting for connections to stabilize")
return nil
}
func (n *WakuNode) VerifyMessageReceived(expectedMessage *pb.WakuMessage, expectedHash common.MessageHash) error {
Debug("Verifying if the message was received on node %s", n.nodeName)
select {
case envelope := <-n.MsgChan:
if envelope == nil {
Error("Received envelope is nil on node %s", n.nodeName)
return errors.New("received envelope is nil")
}
if string(expectedMessage.Payload) != string(envelope.Message().Payload) {
Error("Payload does not match on node %s", n.nodeName)
return errors.New("payload does not match")
}
if expectedMessage.ContentTopic != envelope.Message().ContentTopic {
Error("Content topic does not match on node %s", n.nodeName)
return errors.New("content topic does not match")
}
if expectedHash != envelope.Hash() {
Error("Message hash does not match on node %s", n.nodeName)
return errors.New("message hash does not match")
}
Debug("Message received and verified successfully on node %s, Message: %s", n.nodeName, string(envelope.Message().Payload))
return nil
case <-time.After(5 * time.Second):
Error("Timeout: message not received within 5 seconds on node %s", n.nodeName)
return errors.New("timeout: message not received within 5 seconds")
}
}
func (n *WakuNode) CreateMessage(customMessage ...*pb.WakuMessage) *pb.WakuMessage {
Debug("Creating a WakuMessage on node %s", n.nodeName)
if len(customMessage) > 0 && customMessage[0] != nil {
Debug("Using provided custom message on node %s", n.nodeName)
return customMessage[0]
}
Debug("Using default message format on node %s", n.nodeName)
defaultMessage := &pb.WakuMessage{
Payload: []byte("This is a default Waku message payload"),
ContentTopic: "test-content-topic",
Version: proto.Uint32(0),
Timestamp: proto.Int64(time.Now().UnixNano()),
}
Debug("Successfully created a default WakuMessage on node %s", n.nodeName)
return defaultMessage
}