diff --git a/waku/nodes_basic_test.go b/waku/nodes_basic_test.go new file mode 100644 index 0000000..e69de29 diff --git a/waku/nwaku.go b/waku/nwaku.go index e3dc669..b3ff294 100644 --- a/waku/nwaku.go +++ b/waku/nwaku.go @@ -334,6 +334,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/utils" "github.com/waku-org/waku-go-bindings/waku/common" "go.uber.org/zap" + "google.golang.org/protobuf/proto" ) const requestTimeout = 30 * time.Second @@ -683,7 +684,9 @@ func (n *WakuNode) GetConnectedPeers() (peer.IDSlice, error) { func (n *WakuNode) RelaySubscribe(pubsubTopic string) error { if pubsubTopic == "" { - return errors.New("pubsub topic is empty") + err := errors.New("pubsub topic is empty") + Error("Failed to subscribe to relay: %v", err) + return err } wg := sync.WaitGroup{} @@ -695,18 +698,24 @@ func (n *WakuNode) RelaySubscribe(pubsubTopic string) error { defer C.free(unsafe.Pointer(cPubsubTopic)) if n.wakuCtx == nil { + err := errors.New("wakuCtx is nil") + Error("Failed to subscribe to relay on node %s: %v", n.nodeName, err) return errors.New("wakuCtx is nil") } wg.Add(1) + Debug("Attempting to subscribe to relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic) C.cGoWakuRelaySubscribe(n.wakuCtx, cPubsubTopic, resp) wg.Wait() if C.getRet(resp) == C.RET_OK { + + Debug("Successfully subscribed to relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic) return nil } errMsg := "error WakuRelaySubscribe: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Error("Failed to subscribe to relay on node %s, pubsubTopic: %s, error: %v", n.nodeName, pubsubTopic, errMsg) return errors.New(errMsg) } @@ -743,7 +752,9 @@ func (n *WakuNode) RelayAddProtectedShard(clusterId uint16, shardId uint16, pubk func (n *WakuNode) RelayUnsubscribe(pubsubTopic string) error { if pubsubTopic == "" { - return errors.New("pubsub topic is empty") + err := errors.New("pubsub topic is empty") + Error("Failed to unsubscribe from relay: %v", err) + return err } wg := sync.WaitGroup{} @@ -759,15 +770,19 @@ func (n *WakuNode) RelayUnsubscribe(pubsubTopic string) error { } wg.Add(1) + Debug("Attempting to unsubscribe from relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic) C.cGoWakuRelayUnsubscribe(n.wakuCtx, cPubsubTopic, resp) wg.Wait() if C.getRet(resp) == C.RET_OK { + + Debug("Successfully unsubscribed from relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic) return nil } - errMsg := "error WakuRelayUnsubscribe: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return errors.New(errMsg) + errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Error("Failed to unsubscribe from relay on node %s, pubsubTopic: %s, error: %v", n.nodeName, pubsubTopic, errMsg) + return errors.New("error WakuRelayUnsubscribe: " + errMsg) } func (n *WakuNode) PeerExchangeRequest(numPeers uint64) (uint64, error) { @@ -783,18 +798,21 @@ func (n *WakuNode) PeerExchangeRequest(numPeers uint64) (uint64, error) { numRecvPeersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) numRecvPeers, err := strconv.ParseUint(numRecvPeersStr, 10, 64) if err != nil { + Error("Failed to parse number of received peers: %v", err) return 0, err } return numRecvPeers, nil } errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Error("PeerExchangeRequest failed: %v", errMsg) return 0, errors.New(errMsg) } func (n *WakuNode) StartDiscV5() error { - wg := sync.WaitGroup{} + Debug("Starting DiscV5 for node: %s", n.nodeName) + wg := sync.WaitGroup{} var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) @@ -802,9 +820,11 @@ func (n *WakuNode) StartDiscV5() error { C.cGoWakuStartDiscV5(n.wakuCtx, resp) wg.Wait() if C.getRet(resp) == C.RET_OK { + Debug("Successfully started DiscV5 for node: %s", n.nodeName) return nil } errMsg := "error WakuStartDiscV5: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Error("Failed to start DiscV5 for node %s: %v", n.nodeName, errMsg) return errors.New(errMsg) } @@ -819,9 +839,11 @@ func (n *WakuNode) StopDiscV5() error { wg.Wait() if C.getRet(resp) == C.RET_OK { + Debug("Successfully stopped DiscV5 for node: %s", n.nodeName) return nil } errMsg := "error WakuStopDiscV5: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Error("Failed to stop DiscV5 for node %s: %v", n.nodeName, errMsg) return errors.New(errMsg) } @@ -837,11 +859,13 @@ func (n *WakuNode) Version() (string, error) { if C.getRet(resp) == C.RET_OK { var version = C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Debug("Successfully fetched Waku version for node %s: %s", n.nodeName, version) return version, nil } errMsg := "error WakuVersion: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Error("Failed to fetch Waku version for node %s: %v", n.nodeName, errMsg) return "", errors.New(errMsg) } @@ -918,6 +942,29 @@ func (n *WakuNode) RelayPublish(ctx context.Context, message *pb.WakuMessage, pu return common.MessageHash(""), errors.New(errMsg) } +func (n *WakuNode) RelayPublishNoCTX(pubsubTopic string, message *pb.WakuMessage) (common.MessageHash, error) { + if n == nil { + err := errors.New("cannot publish message; node is nil") + Error("Failed to publish message via relay: %v", err) + return "", err + } + + // Handling context internally with a timeout + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + Debug("Attempting to publish message via relay on node %s", n.nodeName) + + msgHash, err := n.RelayPublish(ctx, message, pubsubTopic) + if err != nil { + Error("Failed to publish message via relay on node %s: %v", n.nodeName, err) + return "", err + } + + Debug("Successfully published message via relay on node %s, messageHash: %s", n.nodeName, msgHash.String()) + return msgHash, nil +} + func (n *WakuNode) DnsDiscovery(ctx context.Context, enrTreeUrl string, nameDnsServer string) ([]multiaddr.Multiaddr, error) { wg := sync.WaitGroup{} @@ -1463,3 +1510,70 @@ func (n *WakuNode) DisconnectPeer(target *WakuNode) error { Debug("Successfully disconnected %s from %s", n.nodeName, target.nodeName) return nil } + +func ConnectAllPeers(nodes []*WakuNode) error { + Debug("Connecting nodes in a relay chain") + + for i := 0; i < len(nodes)-1; i++ { + Debug("Connecting node %d to node %d", i, i+1) + err := nodes[i].ConnectPeer(nodes[i+1]) + if err != nil { + Error("Failed to connect node %d to node %d: %v", i, i+1, err) + return err + } + } + + time.Sleep(4 * time.Second) + Debug("Waiting for connections to stabilize") + + return nil +} + +func (n *WakuNode) VerifyMessageReceived(expectedMessage *pb.WakuMessage, expectedHash common.MessageHash) error { + Debug("Verifying if the message was received on node %s", n.nodeName) + + select { + case envelope := <-n.MsgChan: + if envelope == nil { + Error("Received envelope is nil on node %s", n.nodeName) + return errors.New("received envelope is nil") + } + if string(expectedMessage.Payload) != string(envelope.Message().Payload) { + Error("Payload does not match on node %s", n.nodeName) + return errors.New("payload does not match") + } + if expectedMessage.ContentTopic != envelope.Message().ContentTopic { + Error("Content topic does not match on node %s", n.nodeName) + return errors.New("content topic does not match") + } + if expectedHash != envelope.Hash() { + Error("Message hash does not match on node %s", n.nodeName) + return errors.New("message hash does not match") + } + Debug("Message received and verified successfully on node %s, Message: %s", n.nodeName, string(envelope.Message().Payload)) + return nil + case <-time.After(5 * time.Second): + Error("Timeout: message not received within 5 seconds on node %s", n.nodeName) + return errors.New("timeout: message not received within 5 seconds") + } +} + +func (n *WakuNode) CreateMessage(customMessage ...*pb.WakuMessage) *pb.WakuMessage { + Debug("Creating a WakuMessage on node %s", n.nodeName) + + if len(customMessage) > 0 && customMessage[0] != nil { + Debug("Using provided custom message on node %s", n.nodeName) + return customMessage[0] + } + + Debug("Using default message format on node %s", n.nodeName) + defaultMessage := &pb.WakuMessage{ + Payload: []byte("This is a default Waku message payload"), + ContentTopic: "test-content-topic", + Version: proto.Uint32(0), + Timestamp: proto.Int64(time.Now().UnixNano()), + } + + Debug("Successfully created a default WakuMessage on node %s", n.nodeName) + return defaultMessage +}