diff --git a/waku/nwaku.go b/waku/nwaku.go index 1f29fd6..a150ab3 100644 --- a/waku/nwaku.go +++ b/waku/nwaku.go @@ -56,6 +56,15 @@ package waku // resp must be set != NULL in case interest on retrieving data from the callback void GoCallback(int ret, char* msg, size_t len, void* resp); + #define WAKU_CALL(call) \ + do { \ + int ret = call; \ + if (ret != 0) { \ + printf("Failed the call to: %s. Returned code: %d\n", #call, ret); \ + exit(1); \ + } \ + } while (0) + static void* cGoWakuNew(const char* configJson, void* resp) { // We pass NULL because we are not interested in retrieving data from this callback void* ret = waku_new(configJson, (WakuCallBack) GoCallback, resp); @@ -63,27 +72,27 @@ package waku } static void cGoWakuStart(void* wakuCtx, void* resp) { - waku_start(wakuCtx, (WakuCallBack) GoCallback, resp); + WAKU_CALL(waku_start(wakuCtx, (WakuCallBack) GoCallback, resp)); } static void cGoWakuStop(void* wakuCtx, void* resp) { - waku_stop(wakuCtx, (WakuCallBack) GoCallback, resp); + WAKU_CALL(waku_stop(wakuCtx, (WakuCallBack) GoCallback, resp)); } static void cGoWakuDestroy(void* wakuCtx, void* resp) { - waku_destroy(wakuCtx, (WakuCallBack) GoCallback, resp); + WAKU_CALL(waku_destroy(wakuCtx, (WakuCallBack) GoCallback, resp)); } static void cGoWakuStartDiscV5(void* wakuCtx, void* resp) { - waku_start_discv5(wakuCtx, (WakuCallBack) GoCallback, resp); + WAKU_CALL(waku_start_discv5(wakuCtx, (WakuCallBack) GoCallback, resp)); } static void cGoWakuStopDiscV5(void* wakuCtx, void* resp) { - waku_stop_discv5(wakuCtx, (WakuCallBack) GoCallback, resp); + WAKU_CALL(waku_stop_discv5(wakuCtx, (WakuCallBack) GoCallback, resp)); } static void cGoWakuVersion(void* wakuCtx, void* resp) { - waku_version(wakuCtx, (WakuCallBack) GoCallback, resp); + WAKU_CALL(waku_version(wakuCtx, (WakuCallBack) GoCallback, resp)); } static void cGoWakuSetEventCallback(void* wakuCtx) { @@ -109,21 +118,21 @@ package waku char* encoding, void* resp) { - waku_content_topic(wakuCtx, + WAKU_CALL( waku_content_topic(wakuCtx, appName, appVersion, contentTopicName, encoding, (WakuCallBack) GoCallback, - resp); + resp) ); } static void cGoWakuPubsubTopic(void* wakuCtx, char* topicName, void* resp) { - waku_pubsub_topic(wakuCtx, topicName, (WakuCallBack) GoCallback, resp); + WAKU_CALL( waku_pubsub_topic(wakuCtx, topicName, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuDefaultPubsubTopic(void* wakuCtx, void* resp) { - waku_default_pubsub_topic(wakuCtx, (WakuCallBack) GoCallback, resp); + WAKU_CALL (waku_default_pubsub_topic(wakuCtx, (WakuCallBack) GoCallback, resp)); } static void cGoWakuRelayPublish(void* wakuCtx, @@ -132,44 +141,44 @@ package waku int timeoutMs, void* resp) { - waku_relay_publish(wakuCtx, + WAKU_CALL (waku_relay_publish(wakuCtx, pubSubTopic, jsonWakuMessage, timeoutMs, (WakuCallBack) GoCallback, - resp); + resp)); } static void cGoWakuRelaySubscribe(void* wakuCtx, char* pubSubTopic, void* resp) { - waku_relay_subscribe(wakuCtx, + WAKU_CALL ( waku_relay_subscribe(wakuCtx, pubSubTopic, (WakuCallBack) GoCallback, - resp); + resp) ); } static void cGoWakuRelayAddProtectedShard(void* wakuCtx, int clusterId, int shardId, char* publicKey, void* resp) { - waku_relay_add_protected_shard(wakuCtx, + WAKU_CALL ( waku_relay_add_protected_shard(wakuCtx, clusterId, shardId, publicKey, (WakuCallBack) GoCallback, - resp); + resp) ); } static void cGoWakuRelayUnsubscribe(void* wakuCtx, char* pubSubTopic, void* resp) { - waku_relay_unsubscribe(wakuCtx, + WAKU_CALL ( waku_relay_unsubscribe(wakuCtx, pubSubTopic, (WakuCallBack) GoCallback, - resp); + resp) ); } static void cGoWakuConnect(void* wakuCtx, char* peerMultiAddr, int timeoutMs, void* resp) { - waku_connect(wakuCtx, + WAKU_CALL( waku_connect(wakuCtx, peerMultiAddr, timeoutMs, (WakuCallBack) GoCallback, - resp); + resp) ); } static void cGoWakuDialPeer(void* wakuCtx, @@ -178,12 +187,12 @@ package waku int timeoutMs, void* resp) { - waku_dial_peer(wakuCtx, + WAKU_CALL( waku_dial_peer(wakuCtx, peerMultiAddr, protocol, timeoutMs, (WakuCallBack) GoCallback, - resp); + resp) ); } static void cGoWakuDialPeerById(void* wakuCtx, @@ -192,51 +201,51 @@ package waku int timeoutMs, void* resp) { - waku_dial_peer_by_id(wakuCtx, + WAKU_CALL( waku_dial_peer_by_id(wakuCtx, peerId, protocol, timeoutMs, (WakuCallBack) GoCallback, - resp); + resp) ); } static void cGoWakuDisconnectPeerById(void* wakuCtx, char* peerId, void* resp) { - waku_disconnect_peer_by_id(wakuCtx, + WAKU_CALL( waku_disconnect_peer_by_id(wakuCtx, peerId, (WakuCallBack) GoCallback, - resp); + resp) ); } static void cGoWakuListenAddresses(void* wakuCtx, void* resp) { - waku_listen_addresses(wakuCtx, (WakuCallBack) GoCallback, resp); + WAKU_CALL (waku_listen_addresses(wakuCtx, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuGetMyENR(void* ctx, void* resp) { - waku_get_my_enr(ctx, (WakuCallBack) GoCallback, resp); + WAKU_CALL (waku_get_my_enr(ctx, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuGetMyPeerId(void* ctx, void* resp) { - waku_get_my_peerid(ctx, (WakuCallBack) GoCallback, resp); + WAKU_CALL (waku_get_my_peerid(ctx, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuPingPeer(void* ctx, char* peerAddr, int timeoutMs, void* resp) { - waku_ping_peer(ctx, peerAddr, timeoutMs, (WakuCallBack) GoCallback, resp); + WAKU_CALL (waku_ping_peer(ctx, peerAddr, timeoutMs, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuGetNumPeersInMesh(void* ctx, char* pubSubTopic, void* resp) { - waku_relay_get_num_peers_in_mesh(ctx, pubSubTopic, (WakuCallBack) GoCallback, resp); + WAKU_CALL (waku_relay_get_num_peers_in_mesh(ctx, pubSubTopic, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuGetNumConnectedRelayPeers(void* ctx, char* pubSubTopic, void* resp) { - waku_relay_get_num_connected_peers(ctx, pubSubTopic, (WakuCallBack) GoCallback, resp); + WAKU_CALL (waku_relay_get_num_connected_peers(ctx, pubSubTopic, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuGetConnectedPeers(void* wakuCtx, void* resp) { - waku_get_connected_peers(wakuCtx, (WakuCallBack) GoCallback, resp); + WAKU_CALL (waku_get_connected_peers(wakuCtx, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuGetPeerIdsFromPeerStore(void* wakuCtx, void* resp) { - waku_get_peerids_from_peerstore(wakuCtx, (WakuCallBack) GoCallback, resp); + WAKU_CALL (waku_get_peerids_from_peerstore(wakuCtx, (WakuCallBack) GoCallback, resp) ); } static void cGoWakuLightpushPublish(void* wakuCtx, @@ -244,11 +253,11 @@ package waku const char* jsonWakuMessage, void* resp) { - waku_lightpush_publish(wakuCtx, + WAKU_CALL (waku_lightpush_publish(wakuCtx, pubSubTopic, jsonWakuMessage, (WakuCallBack) GoCallback, - resp); + resp)); } static void cGoWakuStoreQuery(void* wakuCtx, @@ -257,32 +266,32 @@ package waku int timeoutMs, void* resp) { - waku_store_query(wakuCtx, - jsonQuery, - peerAddr, - timeoutMs, - (WakuCallBack) GoCallback, - resp); + WAKU_CALL (waku_store_query(wakuCtx, + jsonQuery, + peerAddr, + timeoutMs, + (WakuCallBack) GoCallback, + resp)); } static void cGoWakuPeerExchangeQuery(void* wakuCtx, uint64_t numPeers, void* resp) { - waku_peer_exchange_request(wakuCtx, + WAKU_CALL (waku_peer_exchange_request(wakuCtx, numPeers, (WakuCallBack) GoCallback, - resp); + resp)); } static void cGoWakuGetPeerIdsByProtocol(void* wakuCtx, const char* protocol, void* resp) { - waku_get_peerids_by_protocol(wakuCtx, + WAKU_CALL (waku_get_peerids_by_protocol(wakuCtx, protocol, (WakuCallBack) GoCallback, - resp); + resp)); } static void cGoWakuDnsDiscovery(void* wakuCtx, @@ -291,12 +300,12 @@ package waku int timeoutMs, void* resp) { - waku_dns_discovery(wakuCtx, - entTreeUrl, - nameDnsServer, - timeoutMs, - (WakuCallBack) GoCallback, - resp); + WAKU_CALL (waku_dns_discovery(wakuCtx, + entTreeUrl, + nameDnsServer, + timeoutMs, + (WakuCallBack) GoCallback, + resp)); } */ @@ -325,12 +334,89 @@ import ( "github.com/waku-org/go-waku/waku/v2/utils" "github.com/waku-org/waku-go-bindings/waku/common" "go.uber.org/zap" + "google.golang.org/protobuf/proto" ) const requestTimeout = 30 * time.Second -const MsgChanBufferSize = 1024 -const TopicHealthChanBufferSize = 1024 -const ConnectionChangeChanBufferSize = 1024 +const MsgChanBufferSize = 100 +const TopicHealthChanBufferSize = 100 +const ConnectionChangeChanBufferSize = 100 + +type WakuConfig struct { + Host string `json:"host,omitempty"` + Nodekey string `json:"nodekey,omitempty"` + Relay bool `json:"relay"` + Store bool `json:"store,omitempty"` + LegacyStore bool `json:"legacyStore"` + Storenode string `json:"storenode,omitempty"` + StoreMessageRetentionPolicy string `json:"storeMessageRetentionPolicy,omitempty"` + StoreMessageDbUrl string `json:"storeMessageDbUrl,omitempty"` + StoreMessageDbVacuum bool `json:"storeMessageDbVacuum,omitempty"` + StoreMaxNumDbConnections int `json:"storeMaxNumDbConnections,omitempty"` + StoreResume bool `json:"storeResume,omitempty"` + Filter bool `json:"filter,omitempty"` + Filternode string `json:"filternode,omitempty"` + FilterSubscriptionTimeout int64 `json:"filterSubscriptionTimeout,omitempty"` + FilterMaxPeersToServe uint32 `json:"filterMaxPeersToServe,omitempty"` + FilterMaxCriteria uint32 `json:"filterMaxCriteria,omitempty"` + Lightpush bool `json:"lightpush,omitempty"` + LightpushNode string `json:"lightpushnode,omitempty"` + LogLevel string `json:"logLevel,omitempty"` + DnsDiscovery bool `json:"dnsDiscovery,omitempty"` + DnsDiscoveryUrl string `json:"dnsDiscoveryUrl,omitempty"` + MaxMessageSize string `json:"maxMessageSize,omitempty"` + Staticnodes []string `json:"staticnodes,omitempty"` + Discv5BootstrapNodes []string `json:"discv5BootstrapNodes,omitempty"` + Discv5Discovery bool `json:"discv5Discovery,omitempty"` + Discv5UdpPort int `json:"discv5UdpPort,omitempty"` + ClusterID uint16 `json:"clusterId,omitempty"` + Shards []uint16 `json:"shards,omitempty"` + PeerExchange bool `json:"peerExchange,omitempty"` + PeerExchangeNode string `json:"peerExchangeNode,omitempty"` + TcpPort int `json:"tcpPort,omitempty"` + RateLimits RateLimitsConfig `json:"rateLimits,omitempty"` +} + +type RateLimitsConfig struct { + Filter *RateLimit `json:"-"` + Lightpush *RateLimit `json:"-"` + PeerExchange *RateLimit `json:"-"` +} + +func (rlc RateLimitsConfig) MarshalJSON() ([]byte, error) { + output := []string{} + if rlc.Filter != nil { + output = append(output, fmt.Sprintf("filter:%s", rlc.Filter.String())) + } + if rlc.Lightpush != nil { + output = append(output, fmt.Sprintf("lightpush:%s", rlc.Lightpush.String())) + } + if rlc.PeerExchange != nil { + output = append(output, fmt.Sprintf("px:%s", rlc.PeerExchange.String())) + } + return json.Marshal(output) +} + +type RateLimitTimeUnit string + +const Hour RateLimitTimeUnit = "h" +const Minute RateLimitTimeUnit = "m" +const Second RateLimitTimeUnit = "s" +const Millisecond RateLimitTimeUnit = "ms" + +type RateLimit struct { + Volume int // Number of allowed messages per period + Period int // Length of each rate-limit period (in TimeUnit) + TimeUnit RateLimitTimeUnit // Time unit of the period +} + +func (rl RateLimit) String() string { + return fmt.Sprintf("%d/%d%s", rl.Volume, rl.Period, rl.TimeUnit) +} + +func (rl RateLimit) MarshalJSON() ([]byte, error) { + return json.Marshal(rl.String()) +} //export GoCallback func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) { @@ -347,14 +433,14 @@ func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) { // WakuNode represents an instance of an nwaku node type WakuNode struct { wakuCtx unsafe.Pointer - config *common.WakuConfig + config *WakuConfig MsgChan chan common.Envelope TopicHealthChan chan topicHealth ConnectionChangeChan chan connectionChange nodeName string } -func NewWakuNode(config *common.WakuConfig, nodeName string) (*WakuNode, error) { +func NewWakuNode(config *WakuConfig, nodeName string) (*WakuNode, error) { Debug("Creating new WakuNode: %v", nodeName) n := &WakuNode{ config: config, @@ -571,7 +657,7 @@ func (n *WakuNode) GetConnectedPeers() (peer.IDSlice, error) { if C.getRet(resp) == C.RET_OK { peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) if peersStr == "" { - Debug("No connected peers found for " + n.nodeName) + Debug("No connected peers found for %s", n.nodeName) return nil, nil } @@ -598,10 +684,21 @@ func (n *WakuNode) GetConnectedPeers() (peer.IDSlice, error) { func (n *WakuNode) RelaySubscribe(pubsubTopic string) error { if pubsubTopic == "" { - return errors.New("pubsub topic is empty") + err := errors.New("pubsub topic is empty") + Error("Failed to subscribe to relay: %v", err) + return err } + if n.wakuCtx == nil { + err := errors.New("wakuCtx is nil") + Error("Failed to subscribe to relay on node %s: %v", n.nodeName, err) + return err + } + + Debug("Attempting to subscribe to relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic) + wg := sync.WaitGroup{} + wg.Add(1) var resp = C.allocResp(unsafe.Pointer(&wg)) var cPubsubTopic = C.CString(pubsubTopic) @@ -609,20 +706,20 @@ func (n *WakuNode) RelaySubscribe(pubsubTopic string) error { defer C.freeResp(resp) defer C.free(unsafe.Pointer(cPubsubTopic)) - if n.wakuCtx == nil { - return errors.New("wakuCtx is nil") - } - - wg.Add(1) + Debug("Calling cGoWakuRelaySubscribe on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic) C.cGoWakuRelaySubscribe(n.wakuCtx, cPubsubTopic, resp) - wg.Wait() + + Debug("Waiting for response from cGoWakuRelaySubscribe on node %s", n.nodeName) + wg.Wait() // Ensures the function completes before proceeding if C.getRet(resp) == C.RET_OK { + Debug("Successfully subscribed to relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic) return nil } - errMsg := "error WakuRelaySubscribe: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return errors.New(errMsg) + errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Error("Failed to subscribe to relay on node %s, pubsubTopic: %s, error: %v", n.nodeName, pubsubTopic, errMsg) + return errors.New("error WakuRelaySubscribe: " + errMsg) } func (n *WakuNode) RelayAddProtectedShard(clusterId uint16, shardId uint16, pubkey *ecdsa.PublicKey) error { @@ -658,7 +755,9 @@ func (n *WakuNode) RelayAddProtectedShard(clusterId uint16, shardId uint16, pubk func (n *WakuNode) RelayUnsubscribe(pubsubTopic string) error { if pubsubTopic == "" { - return errors.New("pubsub topic is empty") + err := errors.New("pubsub topic is empty") + Error("Failed to unsubscribe from relay: %v", err) + return err } wg := sync.WaitGroup{} @@ -674,15 +773,19 @@ func (n *WakuNode) RelayUnsubscribe(pubsubTopic string) error { } wg.Add(1) + Debug("Attempting to unsubscribe from relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic) C.cGoWakuRelayUnsubscribe(n.wakuCtx, cPubsubTopic, resp) wg.Wait() if C.getRet(resp) == C.RET_OK { + + Debug("Successfully unsubscribed from relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic) return nil } - errMsg := "error WakuRelayUnsubscribe: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return errors.New(errMsg) + errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Error("Failed to unsubscribe from relay on node %s, pubsubTopic: %s, error: %v", n.nodeName, pubsubTopic, errMsg) + return errors.New("error WakuRelayUnsubscribe: " + errMsg) } func (n *WakuNode) PeerExchangeRequest(numPeers uint64) (uint64, error) { @@ -698,18 +801,21 @@ func (n *WakuNode) PeerExchangeRequest(numPeers uint64) (uint64, error) { numRecvPeersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) numRecvPeers, err := strconv.ParseUint(numRecvPeersStr, 10, 64) if err != nil { + Error("Failed to parse number of received peers: %v", err) return 0, err } return numRecvPeers, nil } errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Error("PeerExchangeRequest failed: %v", errMsg) return 0, errors.New(errMsg) } func (n *WakuNode) StartDiscV5() error { - wg := sync.WaitGroup{} + Debug("Starting DiscV5 for node: %s", n.nodeName) + wg := sync.WaitGroup{} var resp = C.allocResp(unsafe.Pointer(&wg)) defer C.freeResp(resp) @@ -717,9 +823,11 @@ func (n *WakuNode) StartDiscV5() error { C.cGoWakuStartDiscV5(n.wakuCtx, resp) wg.Wait() if C.getRet(resp) == C.RET_OK { + Debug("Successfully started DiscV5 for node: %s", n.nodeName) return nil } errMsg := "error WakuStartDiscV5: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Error("Failed to start DiscV5 for node %s: %v", n.nodeName, errMsg) return errors.New(errMsg) } @@ -734,9 +842,11 @@ func (n *WakuNode) StopDiscV5() error { wg.Wait() if C.getRet(resp) == C.RET_OK { + Debug("Successfully stopped DiscV5 for node: %s", n.nodeName) return nil } errMsg := "error WakuStopDiscV5: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Error("Failed to stop DiscV5 for node %s: %v", n.nodeName, errMsg) return errors.New(errMsg) } @@ -752,11 +862,13 @@ func (n *WakuNode) Version() (string, error) { if C.getRet(resp) == C.RET_OK { var version = C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Debug("Successfully fetched Waku version for node %s: %s", n.nodeName, version) return version, nil } errMsg := "error WakuVersion: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Error("Failed to fetch Waku version for node %s: %v", n.nodeName, errMsg) return "", errors.New(errMsg) } @@ -833,6 +945,29 @@ func (n *WakuNode) RelayPublish(ctx context.Context, message *pb.WakuMessage, pu return common.MessageHash(""), errors.New(errMsg) } +func (n *WakuNode) RelayPublishNoCTX(pubsubTopic string, message *pb.WakuMessage) (common.MessageHash, error) { + if n == nil { + err := errors.New("cannot publish message; node is nil") + Error("Failed to publish message via relay: %v", err) + return "", err + } + + // Handling context internally with a timeout + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + Debug("Attempting to publish message via relay on node %s", n.nodeName) + + msgHash, err := n.RelayPublish(ctx, message, pubsubTopic) + if err != nil { + Error("Failed to publish message via relay on node %s: %v", n.nodeName, err) + return "", err + } + + Debug("Successfully published message via relay on node %s, messageHash: %s", n.nodeName, msgHash.String()) + return msgHash, nil +} + func (n *WakuNode) DnsDiscovery(ctx context.Context, enrTreeUrl string, nameDnsServer string) ([]multiaddr.Multiaddr, error) { wg := sync.WaitGroup{} @@ -957,7 +1092,7 @@ func (n *WakuNode) Destroy() error { wg.Wait() if C.getRet(resp) == C.RET_OK { - Debug("Successfully destroyed " + n.nodeName) + Debug("Successfully destroyed %s", n.nodeName) return nil } @@ -1274,17 +1409,27 @@ func GetFreePortIfNeeded(tcpPort int, discV5UDPPort int) (int, int, error) { } // Create & start node -func StartWakuNode(nodeName string, customCfg *common.WakuConfig) (*WakuNode, error) { +func StartWakuNode(nodeName string, customCfg *WakuConfig) (*WakuNode, error) { Debug("Initializing %s", nodeName) - var nodeCfg common.WakuConfig + var nodeCfg WakuConfig if customCfg == nil { nodeCfg = DefaultWakuConfig + } else { nodeCfg = *customCfg } + tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0) + if err != nil { + Error("Failed to allocate unique ports: %v", err) + tcpPort, udpPort = 0, 0 // Fallback to OS-assigned ports + } + + nodeCfg.TcpPort = tcpPort + nodeCfg.Discv5UdpPort = udpPort + Debug("Creating %s", nodeName) node, err := NewWakuNode(&nodeCfg, nodeName) if err != nil { @@ -1378,3 +1523,162 @@ func (n *WakuNode) DisconnectPeer(target *WakuNode) error { Debug("Successfully disconnected %s from %s", n.nodeName, target.nodeName) return nil } + +func ConnectAllPeers(nodes []*WakuNode) error { + if len(nodes) == 0 { + Error("Cannot connect peers: node list is empty") + return errors.New("node list is empty") + } + + timeout := time.Duration(len(nodes)*2) * time.Second + Debug("Connecting nodes in a relay chain with timeout: %v", timeout) + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + for i := 0; i < len(nodes)-1; i++ { + Debug("Connecting node %d to node %d", i, i+1) + err := nodes[i].ConnectPeer(nodes[i+1]) + if err != nil { + Error("Failed to connect node %d to node %d: %v", i, i+1, err) + return err + } + } + + <-ctx.Done() + Debug("Connections stabilized") + return nil +} + +func (n *WakuNode) VerifyMessageReceived(expectedMessage *pb.WakuMessage, expectedHash common.MessageHash) error { + timeout := 3 * time.Second + Debug("Verifying if the message was received on node %s, timeout: %v", n.nodeName, timeout) + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + select { + case envelope := <-n.MsgChan: + if envelope == nil { + Error("Received envelope is nil on node %s", n.nodeName) + return errors.New("received envelope is nil") + } + if string(expectedMessage.Payload) != string(envelope.Message().Payload) { + Error("Payload does not match on node %s", n.nodeName) + return errors.New("payload does not match") + } + if expectedMessage.ContentTopic != envelope.Message().ContentTopic { + Error("Content topic does not match on node %s", n.nodeName) + return errors.New("content topic does not match") + } + if expectedHash != envelope.Hash() { + Error("Message hash does not match on node %s", n.nodeName) + return errors.New("message hash does not match") + } + Debug("Message received and verified successfully on node %s, Message: %s", n.nodeName, string(envelope.Message().Payload)) + return nil + case <-ctx.Done(): + Error("Timeout: message not received within %v on node %s", timeout, n.nodeName) + return errors.New("timeout: message not received within the given duration") + } +} + +func (n *WakuNode) CreateMessage(customMessage ...*pb.WakuMessage) *pb.WakuMessage { + Debug("Creating a WakuMessage on node %s", n.nodeName) + + if len(customMessage) > 0 && customMessage[0] != nil { + Debug("Using provided custom message on node %s", n.nodeName) + return customMessage[0] + } + + Debug("Using default message format on node %s", n.nodeName) + defaultMessage := &pb.WakuMessage{ + Payload: []byte("This is a default Waku message payload"), + ContentTopic: "test-content-topic", + Version: proto.Uint32(0), + Timestamp: proto.Int64(time.Now().UnixNano()), + } + + Debug("Successfully created a default WakuMessage on node %s", n.nodeName) + return defaultMessage +} + +func WaitForAutoConnection(nodeList []*WakuNode) error { + Debug("Waiting for auto-connection of nodes...") + + var hardWait = 30 * time.Second + Debug("Applying hard wait of %v seconds before checking connections", hardWait.Seconds()) + time.Sleep(hardWait) + + for _, node := range nodeList { + peers, err := node.GetConnectedPeers() + if err != nil { + Error("Failed to get connected peers for node %s: %v", node.nodeName, err) + return err + } + + if len(peers) < 1 { + Error("Node %s has no connected peers, expected at least 1", node.nodeName) + return errors.New("expected at least one connected peer") + } + + Debug("Node %s has %d connected peers", node.nodeName, len(peers)) + } + + Debug("Auto-connection check completed successfully") + return nil +} + +func SubscribeNodesToTopic(nodes []*WakuNode, topic string) error { + for _, node := range nodes { + Debug("Subscribing node %s to topic %s", node.nodeName, topic) + err := RetryWithBackOff(func() error { + return node.RelaySubscribe(topic) + }) + if err != nil { + Error("Failed to subscribe node %s to topic %s: %v", node.nodeName, topic, err) + return err + } + Debug("Node %s successfully subscribed to topic %s", node.nodeName, topic) + } + return nil +} + +func (n *WakuNode) GetStoredMessages(storeNode *WakuNode, storeRequest *common.StoreQueryRequest) (*common.StoreQueryResponse, error) { + Debug("Starting store query request") + + if storeRequest == nil { + Debug("Using DefaultStoreQueryRequest") + storeRequest = &DefaultStoreQueryRequest + } + + storeMultiaddr, err := storeNode.ListenAddresses() + if err != nil { + Error("Failed to retrieve listen addresses for store node: %v", err) + return nil, err + } + + if len(storeMultiaddr) == 0 { + Error("Store node has no available listen addresses") + return nil, fmt.Errorf("store node has no available listen addresses") + } + + storeNodeAddrInfo, err := peer.AddrInfoFromString(storeMultiaddr[0].String()) + if err != nil { + Error("Failed to convert store node address to AddrInfo: %v", err) + return nil, err + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + Debug("Querying store node for messages") + res, err := n.StoreQuery(ctx, storeRequest, *storeNodeAddrInfo) + if err != nil { + Error("StoreQuery failed: %v", err) + return nil, err + } + + Debug("Store query successful, retrieved %d messages", len(*res.Messages)) + return res, nil +} diff --git a/waku/store_test.go b/waku/store_test.go new file mode 100644 index 0000000..3c2488e --- /dev/null +++ b/waku/store_test.go @@ -0,0 +1,83 @@ +package waku + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/waku-org/go-waku/waku/v2/protocol/pb" + + //"github.com/waku-org/waku-go-bindings/waku/common" + "google.golang.org/protobuf/proto" +) + +func TestStoreQueryFromPeer(t *testing.T) { + Debug("Starting test to verify store query from a peer using direct peer connections") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1 (Relay enabled)") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Store = true + + Debug("Creating Node2 (Relay & Store enabled)") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + node3Config := DefaultWakuConfig + node3Config.Relay = false + + Debug("Creating Node3 (Peer connected to Node2)") + node3, err := StartWakuNode("Node3", &node3Config) + require.NoError(t, err, "Failed to start Node3") + + defer func() { + Debug("Stopping and destroying all Waku nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + node3.StopAndDestroy() + }() + + Debug("Connecting Node1 to Node2") + err = node1.ConnectPeer(node2) + require.NoError(t, err, "Failed to connect Node1 to Node2") + + Debug("Connecting Node3 to Node2") + err = node3.ConnectPeer(node2) + require.NoError(t, err, "Failed to connect Node3 to Node2") + + Debug("Waiting for peer connections to stabilize") + time.Sleep(2 * time.Second) + + Debug("Publishing message from Node1 using RelayPublish") + message := node1.CreateMessage(&pb.WakuMessage{ + Payload: []byte("test-message"), + ContentTopic: "test-content-topic", + Timestamp: proto.Int64(time.Now().UnixNano()), + }) + + defaultPubsubTopic := DefaultPubsubTopic + msgHash, err := node1.RelayPublishNoCTX(defaultPubsubTopic, message) + require.NoError(t, err, "Failed to publish message from Node1") + + Debug("Waiting for message delivery to Node2") + time.Sleep(2 * time.Second) + + Debug("Verifying that Node2 received the message") + err = node2.VerifyMessageReceived(message, msgHash) + require.NoError(t, err, "Node2 should have received the message") + + Debug("Node3 querying stored messages from Node2") + res, err := node3.GetStoredMessages(node2, nil) + var storedMessages = *res.Messages + require.NoError(t, err, "Failed to retrieve stored messages from Node2") + require.NotEmpty(t, storedMessages, "Expected at least one stored message") + Debug("Verifying stored message matches the published message") + require.Equal(t, message.Payload, storedMessages[0].WakuMessage.Payload, "Stored message payload does not match") + Debug("Test successfully verified store query from a peer using direct peer connections") +} diff --git a/waku/test_data.go b/waku/test_data.go index 121e1f2..ff8859d 100644 --- a/waku/test_data.go +++ b/waku/test_data.go @@ -4,9 +4,11 @@ import ( "time" "github.com/waku-org/waku-go-bindings/waku/common" + "google.golang.org/protobuf/proto" ) -var DefaultWakuConfig common.WakuConfig +var DefaultWakuConfig WakuConfig +var DefaultStoreQueryRequest common.StoreQueryRequest func init() { @@ -17,7 +19,7 @@ func init() { Error("Failed to get free ports %v %v", err1, err2) } - DefaultWakuConfig = common.WakuConfig{ + DefaultWakuConfig = WakuConfig{ Relay: false, LogLevel: "DEBUG", Discv5Discovery: true, @@ -30,6 +32,14 @@ func init() { Discv5UdpPort: udpPort, TcpPort: tcpPort, } + + DefaultStoreQueryRequest = common.StoreQueryRequest{ + IncludeData: true, + ContentTopics: &[]string{"test-content-topic"}, + PaginationLimit: proto.Uint64(uint64(50)), + PaginationForward: true, + TimeStart: proto.Int64(time.Now().Add(-5 * time.Minute).UnixNano()), // 5 mins before now + } } const ConnectPeerTimeout = 10 * time.Second //default timeout for node to connect to another node