mirror of
https://github.com/logos-messaging/logos-messaging-go-bindings.git
synced 2026-01-10 09:53:12 +00:00
Add store tests
This commit is contained in:
parent
f18c532f70
commit
a0cb52cc2b
452
waku/nwaku.go
452
waku/nwaku.go
@ -56,6 +56,15 @@ package waku
|
||||
// resp must be set != NULL in case interest on retrieving data from the callback
|
||||
void GoCallback(int ret, char* msg, size_t len, void* resp);
|
||||
|
||||
#define WAKU_CALL(call) \
|
||||
do { \
|
||||
int ret = call; \
|
||||
if (ret != 0) { \
|
||||
printf("Failed the call to: %s. Returned code: %d\n", #call, ret); \
|
||||
exit(1); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
static void* cGoWakuNew(const char* configJson, void* resp) {
|
||||
// We pass NULL because we are not interested in retrieving data from this callback
|
||||
void* ret = waku_new(configJson, (WakuCallBack) GoCallback, resp);
|
||||
@ -63,27 +72,27 @@ package waku
|
||||
}
|
||||
|
||||
static void cGoWakuStart(void* wakuCtx, void* resp) {
|
||||
waku_start(wakuCtx, (WakuCallBack) GoCallback, resp);
|
||||
WAKU_CALL(waku_start(wakuCtx, (WakuCallBack) GoCallback, resp));
|
||||
}
|
||||
|
||||
static void cGoWakuStop(void* wakuCtx, void* resp) {
|
||||
waku_stop(wakuCtx, (WakuCallBack) GoCallback, resp);
|
||||
WAKU_CALL(waku_stop(wakuCtx, (WakuCallBack) GoCallback, resp));
|
||||
}
|
||||
|
||||
static void cGoWakuDestroy(void* wakuCtx, void* resp) {
|
||||
waku_destroy(wakuCtx, (WakuCallBack) GoCallback, resp);
|
||||
WAKU_CALL(waku_destroy(wakuCtx, (WakuCallBack) GoCallback, resp));
|
||||
}
|
||||
|
||||
static void cGoWakuStartDiscV5(void* wakuCtx, void* resp) {
|
||||
waku_start_discv5(wakuCtx, (WakuCallBack) GoCallback, resp);
|
||||
WAKU_CALL(waku_start_discv5(wakuCtx, (WakuCallBack) GoCallback, resp));
|
||||
}
|
||||
|
||||
static void cGoWakuStopDiscV5(void* wakuCtx, void* resp) {
|
||||
waku_stop_discv5(wakuCtx, (WakuCallBack) GoCallback, resp);
|
||||
WAKU_CALL(waku_stop_discv5(wakuCtx, (WakuCallBack) GoCallback, resp));
|
||||
}
|
||||
|
||||
static void cGoWakuVersion(void* wakuCtx, void* resp) {
|
||||
waku_version(wakuCtx, (WakuCallBack) GoCallback, resp);
|
||||
WAKU_CALL(waku_version(wakuCtx, (WakuCallBack) GoCallback, resp));
|
||||
}
|
||||
|
||||
static void cGoWakuSetEventCallback(void* wakuCtx) {
|
||||
@ -109,21 +118,21 @@ package waku
|
||||
char* encoding,
|
||||
void* resp) {
|
||||
|
||||
waku_content_topic(wakuCtx,
|
||||
WAKU_CALL( waku_content_topic(wakuCtx,
|
||||
appName,
|
||||
appVersion,
|
||||
contentTopicName,
|
||||
encoding,
|
||||
(WakuCallBack) GoCallback,
|
||||
resp);
|
||||
resp) );
|
||||
}
|
||||
|
||||
static void cGoWakuPubsubTopic(void* wakuCtx, char* topicName, void* resp) {
|
||||
waku_pubsub_topic(wakuCtx, topicName, (WakuCallBack) GoCallback, resp);
|
||||
WAKU_CALL( waku_pubsub_topic(wakuCtx, topicName, (WakuCallBack) GoCallback, resp) );
|
||||
}
|
||||
|
||||
static void cGoWakuDefaultPubsubTopic(void* wakuCtx, void* resp) {
|
||||
waku_default_pubsub_topic(wakuCtx, (WakuCallBack) GoCallback, resp);
|
||||
WAKU_CALL (waku_default_pubsub_topic(wakuCtx, (WakuCallBack) GoCallback, resp));
|
||||
}
|
||||
|
||||
static void cGoWakuRelayPublish(void* wakuCtx,
|
||||
@ -132,44 +141,44 @@ package waku
|
||||
int timeoutMs,
|
||||
void* resp) {
|
||||
|
||||
waku_relay_publish(wakuCtx,
|
||||
WAKU_CALL (waku_relay_publish(wakuCtx,
|
||||
pubSubTopic,
|
||||
jsonWakuMessage,
|
||||
timeoutMs,
|
||||
(WakuCallBack) GoCallback,
|
||||
resp);
|
||||
resp));
|
||||
}
|
||||
|
||||
static void cGoWakuRelaySubscribe(void* wakuCtx, char* pubSubTopic, void* resp) {
|
||||
waku_relay_subscribe(wakuCtx,
|
||||
WAKU_CALL ( waku_relay_subscribe(wakuCtx,
|
||||
pubSubTopic,
|
||||
(WakuCallBack) GoCallback,
|
||||
resp);
|
||||
resp) );
|
||||
}
|
||||
|
||||
static void cGoWakuRelayAddProtectedShard(void* wakuCtx, int clusterId, int shardId, char* publicKey, void* resp) {
|
||||
waku_relay_add_protected_shard(wakuCtx,
|
||||
WAKU_CALL ( waku_relay_add_protected_shard(wakuCtx,
|
||||
clusterId,
|
||||
shardId,
|
||||
publicKey,
|
||||
(WakuCallBack) GoCallback,
|
||||
resp);
|
||||
resp) );
|
||||
}
|
||||
|
||||
static void cGoWakuRelayUnsubscribe(void* wakuCtx, char* pubSubTopic, void* resp) {
|
||||
|
||||
waku_relay_unsubscribe(wakuCtx,
|
||||
WAKU_CALL ( waku_relay_unsubscribe(wakuCtx,
|
||||
pubSubTopic,
|
||||
(WakuCallBack) GoCallback,
|
||||
resp);
|
||||
resp) );
|
||||
}
|
||||
|
||||
static void cGoWakuConnect(void* wakuCtx, char* peerMultiAddr, int timeoutMs, void* resp) {
|
||||
waku_connect(wakuCtx,
|
||||
WAKU_CALL( waku_connect(wakuCtx,
|
||||
peerMultiAddr,
|
||||
timeoutMs,
|
||||
(WakuCallBack) GoCallback,
|
||||
resp);
|
||||
resp) );
|
||||
}
|
||||
|
||||
static void cGoWakuDialPeer(void* wakuCtx,
|
||||
@ -178,12 +187,12 @@ package waku
|
||||
int timeoutMs,
|
||||
void* resp) {
|
||||
|
||||
waku_dial_peer(wakuCtx,
|
||||
WAKU_CALL( waku_dial_peer(wakuCtx,
|
||||
peerMultiAddr,
|
||||
protocol,
|
||||
timeoutMs,
|
||||
(WakuCallBack) GoCallback,
|
||||
resp);
|
||||
resp) );
|
||||
}
|
||||
|
||||
static void cGoWakuDialPeerById(void* wakuCtx,
|
||||
@ -192,51 +201,51 @@ package waku
|
||||
int timeoutMs,
|
||||
void* resp) {
|
||||
|
||||
waku_dial_peer_by_id(wakuCtx,
|
||||
WAKU_CALL( waku_dial_peer_by_id(wakuCtx,
|
||||
peerId,
|
||||
protocol,
|
||||
timeoutMs,
|
||||
(WakuCallBack) GoCallback,
|
||||
resp);
|
||||
resp) );
|
||||
}
|
||||
|
||||
static void cGoWakuDisconnectPeerById(void* wakuCtx, char* peerId, void* resp) {
|
||||
waku_disconnect_peer_by_id(wakuCtx,
|
||||
WAKU_CALL( waku_disconnect_peer_by_id(wakuCtx,
|
||||
peerId,
|
||||
(WakuCallBack) GoCallback,
|
||||
resp);
|
||||
resp) );
|
||||
}
|
||||
|
||||
static void cGoWakuListenAddresses(void* wakuCtx, void* resp) {
|
||||
waku_listen_addresses(wakuCtx, (WakuCallBack) GoCallback, resp);
|
||||
WAKU_CALL (waku_listen_addresses(wakuCtx, (WakuCallBack) GoCallback, resp) );
|
||||
}
|
||||
|
||||
static void cGoWakuGetMyENR(void* ctx, void* resp) {
|
||||
waku_get_my_enr(ctx, (WakuCallBack) GoCallback, resp);
|
||||
WAKU_CALL (waku_get_my_enr(ctx, (WakuCallBack) GoCallback, resp) );
|
||||
}
|
||||
|
||||
static void cGoWakuGetMyPeerId(void* ctx, void* resp) {
|
||||
waku_get_my_peerid(ctx, (WakuCallBack) GoCallback, resp);
|
||||
WAKU_CALL (waku_get_my_peerid(ctx, (WakuCallBack) GoCallback, resp) );
|
||||
}
|
||||
|
||||
static void cGoWakuPingPeer(void* ctx, char* peerAddr, int timeoutMs, void* resp) {
|
||||
waku_ping_peer(ctx, peerAddr, timeoutMs, (WakuCallBack) GoCallback, resp);
|
||||
WAKU_CALL (waku_ping_peer(ctx, peerAddr, timeoutMs, (WakuCallBack) GoCallback, resp) );
|
||||
}
|
||||
|
||||
static void cGoWakuGetNumPeersInMesh(void* ctx, char* pubSubTopic, void* resp) {
|
||||
waku_relay_get_num_peers_in_mesh(ctx, pubSubTopic, (WakuCallBack) GoCallback, resp);
|
||||
WAKU_CALL (waku_relay_get_num_peers_in_mesh(ctx, pubSubTopic, (WakuCallBack) GoCallback, resp) );
|
||||
}
|
||||
|
||||
static void cGoWakuGetNumConnectedRelayPeers(void* ctx, char* pubSubTopic, void* resp) {
|
||||
waku_relay_get_num_connected_peers(ctx, pubSubTopic, (WakuCallBack) GoCallback, resp);
|
||||
WAKU_CALL (waku_relay_get_num_connected_peers(ctx, pubSubTopic, (WakuCallBack) GoCallback, resp) );
|
||||
}
|
||||
|
||||
static void cGoWakuGetConnectedPeers(void* wakuCtx, void* resp) {
|
||||
waku_get_connected_peers(wakuCtx, (WakuCallBack) GoCallback, resp);
|
||||
WAKU_CALL (waku_get_connected_peers(wakuCtx, (WakuCallBack) GoCallback, resp) );
|
||||
}
|
||||
|
||||
static void cGoWakuGetPeerIdsFromPeerStore(void* wakuCtx, void* resp) {
|
||||
waku_get_peerids_from_peerstore(wakuCtx, (WakuCallBack) GoCallback, resp);
|
||||
WAKU_CALL (waku_get_peerids_from_peerstore(wakuCtx, (WakuCallBack) GoCallback, resp) );
|
||||
}
|
||||
|
||||
static void cGoWakuLightpushPublish(void* wakuCtx,
|
||||
@ -244,11 +253,11 @@ package waku
|
||||
const char* jsonWakuMessage,
|
||||
void* resp) {
|
||||
|
||||
waku_lightpush_publish(wakuCtx,
|
||||
WAKU_CALL (waku_lightpush_publish(wakuCtx,
|
||||
pubSubTopic,
|
||||
jsonWakuMessage,
|
||||
(WakuCallBack) GoCallback,
|
||||
resp);
|
||||
resp));
|
||||
}
|
||||
|
||||
static void cGoWakuStoreQuery(void* wakuCtx,
|
||||
@ -257,32 +266,32 @@ package waku
|
||||
int timeoutMs,
|
||||
void* resp) {
|
||||
|
||||
waku_store_query(wakuCtx,
|
||||
jsonQuery,
|
||||
peerAddr,
|
||||
timeoutMs,
|
||||
(WakuCallBack) GoCallback,
|
||||
resp);
|
||||
WAKU_CALL (waku_store_query(wakuCtx,
|
||||
jsonQuery,
|
||||
peerAddr,
|
||||
timeoutMs,
|
||||
(WakuCallBack) GoCallback,
|
||||
resp));
|
||||
}
|
||||
|
||||
static void cGoWakuPeerExchangeQuery(void* wakuCtx,
|
||||
uint64_t numPeers,
|
||||
void* resp) {
|
||||
|
||||
waku_peer_exchange_request(wakuCtx,
|
||||
WAKU_CALL (waku_peer_exchange_request(wakuCtx,
|
||||
numPeers,
|
||||
(WakuCallBack) GoCallback,
|
||||
resp);
|
||||
resp));
|
||||
}
|
||||
|
||||
static void cGoWakuGetPeerIdsByProtocol(void* wakuCtx,
|
||||
const char* protocol,
|
||||
void* resp) {
|
||||
|
||||
waku_get_peerids_by_protocol(wakuCtx,
|
||||
WAKU_CALL (waku_get_peerids_by_protocol(wakuCtx,
|
||||
protocol,
|
||||
(WakuCallBack) GoCallback,
|
||||
resp);
|
||||
resp));
|
||||
}
|
||||
|
||||
static void cGoWakuDnsDiscovery(void* wakuCtx,
|
||||
@ -291,12 +300,12 @@ package waku
|
||||
int timeoutMs,
|
||||
void* resp) {
|
||||
|
||||
waku_dns_discovery(wakuCtx,
|
||||
entTreeUrl,
|
||||
nameDnsServer,
|
||||
timeoutMs,
|
||||
(WakuCallBack) GoCallback,
|
||||
resp);
|
||||
WAKU_CALL (waku_dns_discovery(wakuCtx,
|
||||
entTreeUrl,
|
||||
nameDnsServer,
|
||||
timeoutMs,
|
||||
(WakuCallBack) GoCallback,
|
||||
resp));
|
||||
}
|
||||
|
||||
*/
|
||||
@ -325,12 +334,89 @@ import (
|
||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||
"github.com/waku-org/waku-go-bindings/waku/common"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/protobuf/proto"
|
||||
)
|
||||
|
||||
const requestTimeout = 30 * time.Second
|
||||
const MsgChanBufferSize = 1024
|
||||
const TopicHealthChanBufferSize = 1024
|
||||
const ConnectionChangeChanBufferSize = 1024
|
||||
const MsgChanBufferSize = 100
|
||||
const TopicHealthChanBufferSize = 100
|
||||
const ConnectionChangeChanBufferSize = 100
|
||||
|
||||
type WakuConfig struct {
|
||||
Host string `json:"host,omitempty"`
|
||||
Nodekey string `json:"nodekey,omitempty"`
|
||||
Relay bool `json:"relay"`
|
||||
Store bool `json:"store,omitempty"`
|
||||
LegacyStore bool `json:"legacyStore"`
|
||||
Storenode string `json:"storenode,omitempty"`
|
||||
StoreMessageRetentionPolicy string `json:"storeMessageRetentionPolicy,omitempty"`
|
||||
StoreMessageDbUrl string `json:"storeMessageDbUrl,omitempty"`
|
||||
StoreMessageDbVacuum bool `json:"storeMessageDbVacuum,omitempty"`
|
||||
StoreMaxNumDbConnections int `json:"storeMaxNumDbConnections,omitempty"`
|
||||
StoreResume bool `json:"storeResume,omitempty"`
|
||||
Filter bool `json:"filter,omitempty"`
|
||||
Filternode string `json:"filternode,omitempty"`
|
||||
FilterSubscriptionTimeout int64 `json:"filterSubscriptionTimeout,omitempty"`
|
||||
FilterMaxPeersToServe uint32 `json:"filterMaxPeersToServe,omitempty"`
|
||||
FilterMaxCriteria uint32 `json:"filterMaxCriteria,omitempty"`
|
||||
Lightpush bool `json:"lightpush,omitempty"`
|
||||
LightpushNode string `json:"lightpushnode,omitempty"`
|
||||
LogLevel string `json:"logLevel,omitempty"`
|
||||
DnsDiscovery bool `json:"dnsDiscovery,omitempty"`
|
||||
DnsDiscoveryUrl string `json:"dnsDiscoveryUrl,omitempty"`
|
||||
MaxMessageSize string `json:"maxMessageSize,omitempty"`
|
||||
Staticnodes []string `json:"staticnodes,omitempty"`
|
||||
Discv5BootstrapNodes []string `json:"discv5BootstrapNodes,omitempty"`
|
||||
Discv5Discovery bool `json:"discv5Discovery,omitempty"`
|
||||
Discv5UdpPort int `json:"discv5UdpPort,omitempty"`
|
||||
ClusterID uint16 `json:"clusterId,omitempty"`
|
||||
Shards []uint16 `json:"shards,omitempty"`
|
||||
PeerExchange bool `json:"peerExchange,omitempty"`
|
||||
PeerExchangeNode string `json:"peerExchangeNode,omitempty"`
|
||||
TcpPort int `json:"tcpPort,omitempty"`
|
||||
RateLimits RateLimitsConfig `json:"rateLimits,omitempty"`
|
||||
}
|
||||
|
||||
type RateLimitsConfig struct {
|
||||
Filter *RateLimit `json:"-"`
|
||||
Lightpush *RateLimit `json:"-"`
|
||||
PeerExchange *RateLimit `json:"-"`
|
||||
}
|
||||
|
||||
func (rlc RateLimitsConfig) MarshalJSON() ([]byte, error) {
|
||||
output := []string{}
|
||||
if rlc.Filter != nil {
|
||||
output = append(output, fmt.Sprintf("filter:%s", rlc.Filter.String()))
|
||||
}
|
||||
if rlc.Lightpush != nil {
|
||||
output = append(output, fmt.Sprintf("lightpush:%s", rlc.Lightpush.String()))
|
||||
}
|
||||
if rlc.PeerExchange != nil {
|
||||
output = append(output, fmt.Sprintf("px:%s", rlc.PeerExchange.String()))
|
||||
}
|
||||
return json.Marshal(output)
|
||||
}
|
||||
|
||||
type RateLimitTimeUnit string
|
||||
|
||||
const Hour RateLimitTimeUnit = "h"
|
||||
const Minute RateLimitTimeUnit = "m"
|
||||
const Second RateLimitTimeUnit = "s"
|
||||
const Millisecond RateLimitTimeUnit = "ms"
|
||||
|
||||
type RateLimit struct {
|
||||
Volume int // Number of allowed messages per period
|
||||
Period int // Length of each rate-limit period (in TimeUnit)
|
||||
TimeUnit RateLimitTimeUnit // Time unit of the period
|
||||
}
|
||||
|
||||
func (rl RateLimit) String() string {
|
||||
return fmt.Sprintf("%d/%d%s", rl.Volume, rl.Period, rl.TimeUnit)
|
||||
}
|
||||
|
||||
func (rl RateLimit) MarshalJSON() ([]byte, error) {
|
||||
return json.Marshal(rl.String())
|
||||
}
|
||||
|
||||
//export GoCallback
|
||||
func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) {
|
||||
@ -347,14 +433,14 @@ func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) {
|
||||
// WakuNode represents an instance of an nwaku node
|
||||
type WakuNode struct {
|
||||
wakuCtx unsafe.Pointer
|
||||
config *common.WakuConfig
|
||||
config *WakuConfig
|
||||
MsgChan chan common.Envelope
|
||||
TopicHealthChan chan topicHealth
|
||||
ConnectionChangeChan chan connectionChange
|
||||
nodeName string
|
||||
}
|
||||
|
||||
func NewWakuNode(config *common.WakuConfig, nodeName string) (*WakuNode, error) {
|
||||
func NewWakuNode(config *WakuConfig, nodeName string) (*WakuNode, error) {
|
||||
Debug("Creating new WakuNode: %v", nodeName)
|
||||
n := &WakuNode{
|
||||
config: config,
|
||||
@ -571,7 +657,7 @@ func (n *WakuNode) GetConnectedPeers() (peer.IDSlice, error) {
|
||||
if C.getRet(resp) == C.RET_OK {
|
||||
peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
|
||||
if peersStr == "" {
|
||||
Debug("No connected peers found for " + n.nodeName)
|
||||
Debug("No connected peers found for %s", n.nodeName)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
@ -598,10 +684,21 @@ func (n *WakuNode) GetConnectedPeers() (peer.IDSlice, error) {
|
||||
|
||||
func (n *WakuNode) RelaySubscribe(pubsubTopic string) error {
|
||||
if pubsubTopic == "" {
|
||||
return errors.New("pubsub topic is empty")
|
||||
err := errors.New("pubsub topic is empty")
|
||||
Error("Failed to subscribe to relay: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
if n.wakuCtx == nil {
|
||||
err := errors.New("wakuCtx is nil")
|
||||
Error("Failed to subscribe to relay on node %s: %v", n.nodeName, err)
|
||||
return err
|
||||
}
|
||||
|
||||
Debug("Attempting to subscribe to relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic)
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
|
||||
var resp = C.allocResp(unsafe.Pointer(&wg))
|
||||
var cPubsubTopic = C.CString(pubsubTopic)
|
||||
@ -609,20 +706,20 @@ func (n *WakuNode) RelaySubscribe(pubsubTopic string) error {
|
||||
defer C.freeResp(resp)
|
||||
defer C.free(unsafe.Pointer(cPubsubTopic))
|
||||
|
||||
if n.wakuCtx == nil {
|
||||
return errors.New("wakuCtx is nil")
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
Debug("Calling cGoWakuRelaySubscribe on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic)
|
||||
C.cGoWakuRelaySubscribe(n.wakuCtx, cPubsubTopic, resp)
|
||||
wg.Wait()
|
||||
|
||||
Debug("Waiting for response from cGoWakuRelaySubscribe on node %s", n.nodeName)
|
||||
wg.Wait() // Ensures the function completes before proceeding
|
||||
|
||||
if C.getRet(resp) == C.RET_OK {
|
||||
Debug("Successfully subscribed to relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic)
|
||||
return nil
|
||||
}
|
||||
|
||||
errMsg := "error WakuRelaySubscribe: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
|
||||
return errors.New(errMsg)
|
||||
errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
|
||||
Error("Failed to subscribe to relay on node %s, pubsubTopic: %s, error: %v", n.nodeName, pubsubTopic, errMsg)
|
||||
return errors.New("error WakuRelaySubscribe: " + errMsg)
|
||||
}
|
||||
|
||||
func (n *WakuNode) RelayAddProtectedShard(clusterId uint16, shardId uint16, pubkey *ecdsa.PublicKey) error {
|
||||
@ -658,7 +755,9 @@ func (n *WakuNode) RelayAddProtectedShard(clusterId uint16, shardId uint16, pubk
|
||||
|
||||
func (n *WakuNode) RelayUnsubscribe(pubsubTopic string) error {
|
||||
if pubsubTopic == "" {
|
||||
return errors.New("pubsub topic is empty")
|
||||
err := errors.New("pubsub topic is empty")
|
||||
Error("Failed to unsubscribe from relay: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
@ -674,15 +773,19 @@ func (n *WakuNode) RelayUnsubscribe(pubsubTopic string) error {
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
Debug("Attempting to unsubscribe from relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic)
|
||||
C.cGoWakuRelayUnsubscribe(n.wakuCtx, cPubsubTopic, resp)
|
||||
wg.Wait()
|
||||
|
||||
if C.getRet(resp) == C.RET_OK {
|
||||
|
||||
Debug("Successfully unsubscribed from relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic)
|
||||
return nil
|
||||
}
|
||||
|
||||
errMsg := "error WakuRelayUnsubscribe: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
|
||||
return errors.New(errMsg)
|
||||
errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
|
||||
Error("Failed to unsubscribe from relay on node %s, pubsubTopic: %s, error: %v", n.nodeName, pubsubTopic, errMsg)
|
||||
return errors.New("error WakuRelayUnsubscribe: " + errMsg)
|
||||
}
|
||||
|
||||
func (n *WakuNode) PeerExchangeRequest(numPeers uint64) (uint64, error) {
|
||||
@ -698,18 +801,21 @@ func (n *WakuNode) PeerExchangeRequest(numPeers uint64) (uint64, error) {
|
||||
numRecvPeersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
|
||||
numRecvPeers, err := strconv.ParseUint(numRecvPeersStr, 10, 64)
|
||||
if err != nil {
|
||||
Error("Failed to parse number of received peers: %v", err)
|
||||
return 0, err
|
||||
}
|
||||
return numRecvPeers, nil
|
||||
}
|
||||
|
||||
errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
|
||||
Error("PeerExchangeRequest failed: %v", errMsg)
|
||||
return 0, errors.New(errMsg)
|
||||
}
|
||||
|
||||
func (n *WakuNode) StartDiscV5() error {
|
||||
wg := sync.WaitGroup{}
|
||||
|
||||
Debug("Starting DiscV5 for node: %s", n.nodeName)
|
||||
wg := sync.WaitGroup{}
|
||||
var resp = C.allocResp(unsafe.Pointer(&wg))
|
||||
defer C.freeResp(resp)
|
||||
|
||||
@ -717,9 +823,11 @@ func (n *WakuNode) StartDiscV5() error {
|
||||
C.cGoWakuStartDiscV5(n.wakuCtx, resp)
|
||||
wg.Wait()
|
||||
if C.getRet(resp) == C.RET_OK {
|
||||
Debug("Successfully started DiscV5 for node: %s", n.nodeName)
|
||||
return nil
|
||||
}
|
||||
errMsg := "error WakuStartDiscV5: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
|
||||
Error("Failed to start DiscV5 for node %s: %v", n.nodeName, errMsg)
|
||||
return errors.New(errMsg)
|
||||
}
|
||||
|
||||
@ -734,9 +842,11 @@ func (n *WakuNode) StopDiscV5() error {
|
||||
wg.Wait()
|
||||
|
||||
if C.getRet(resp) == C.RET_OK {
|
||||
Debug("Successfully stopped DiscV5 for node: %s", n.nodeName)
|
||||
return nil
|
||||
}
|
||||
errMsg := "error WakuStopDiscV5: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
|
||||
Error("Failed to stop DiscV5 for node %s: %v", n.nodeName, errMsg)
|
||||
return errors.New(errMsg)
|
||||
}
|
||||
|
||||
@ -752,11 +862,13 @@ func (n *WakuNode) Version() (string, error) {
|
||||
|
||||
if C.getRet(resp) == C.RET_OK {
|
||||
var version = C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
|
||||
Debug("Successfully fetched Waku version for node %s: %s", n.nodeName, version)
|
||||
return version, nil
|
||||
}
|
||||
|
||||
errMsg := "error WakuVersion: " +
|
||||
C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
|
||||
Error("Failed to fetch Waku version for node %s: %v", n.nodeName, errMsg)
|
||||
return "", errors.New(errMsg)
|
||||
}
|
||||
|
||||
@ -833,6 +945,29 @@ func (n *WakuNode) RelayPublish(ctx context.Context, message *pb.WakuMessage, pu
|
||||
return common.MessageHash(""), errors.New(errMsg)
|
||||
}
|
||||
|
||||
func (n *WakuNode) RelayPublishNoCTX(pubsubTopic string, message *pb.WakuMessage) (common.MessageHash, error) {
|
||||
if n == nil {
|
||||
err := errors.New("cannot publish message; node is nil")
|
||||
Error("Failed to publish message via relay: %v", err)
|
||||
return "", err
|
||||
}
|
||||
|
||||
// Handling context internally with a timeout
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
Debug("Attempting to publish message via relay on node %s", n.nodeName)
|
||||
|
||||
msgHash, err := n.RelayPublish(ctx, message, pubsubTopic)
|
||||
if err != nil {
|
||||
Error("Failed to publish message via relay on node %s: %v", n.nodeName, err)
|
||||
return "", err
|
||||
}
|
||||
|
||||
Debug("Successfully published message via relay on node %s, messageHash: %s", n.nodeName, msgHash.String())
|
||||
return msgHash, nil
|
||||
}
|
||||
|
||||
func (n *WakuNode) DnsDiscovery(ctx context.Context, enrTreeUrl string, nameDnsServer string) ([]multiaddr.Multiaddr, error) {
|
||||
wg := sync.WaitGroup{}
|
||||
|
||||
@ -957,7 +1092,7 @@ func (n *WakuNode) Destroy() error {
|
||||
wg.Wait()
|
||||
|
||||
if C.getRet(resp) == C.RET_OK {
|
||||
Debug("Successfully destroyed " + n.nodeName)
|
||||
Debug("Successfully destroyed %s", n.nodeName)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -1274,17 +1409,27 @@ func GetFreePortIfNeeded(tcpPort int, discV5UDPPort int) (int, int, error) {
|
||||
}
|
||||
|
||||
// Create & start node
|
||||
func StartWakuNode(nodeName string, customCfg *common.WakuConfig) (*WakuNode, error) {
|
||||
func StartWakuNode(nodeName string, customCfg *WakuConfig) (*WakuNode, error) {
|
||||
|
||||
Debug("Initializing %s", nodeName)
|
||||
|
||||
var nodeCfg common.WakuConfig
|
||||
var nodeCfg WakuConfig
|
||||
if customCfg == nil {
|
||||
nodeCfg = DefaultWakuConfig
|
||||
|
||||
} else {
|
||||
nodeCfg = *customCfg
|
||||
}
|
||||
|
||||
tcpPort, udpPort, err := GetFreePortIfNeeded(0, 0)
|
||||
if err != nil {
|
||||
Error("Failed to allocate unique ports: %v", err)
|
||||
tcpPort, udpPort = 0, 0 // Fallback to OS-assigned ports
|
||||
}
|
||||
|
||||
nodeCfg.TcpPort = tcpPort
|
||||
nodeCfg.Discv5UdpPort = udpPort
|
||||
|
||||
Debug("Creating %s", nodeName)
|
||||
node, err := NewWakuNode(&nodeCfg, nodeName)
|
||||
if err != nil {
|
||||
@ -1378,3 +1523,162 @@ func (n *WakuNode) DisconnectPeer(target *WakuNode) error {
|
||||
Debug("Successfully disconnected %s from %s", n.nodeName, target.nodeName)
|
||||
return nil
|
||||
}
|
||||
|
||||
func ConnectAllPeers(nodes []*WakuNode) error {
|
||||
if len(nodes) == 0 {
|
||||
Error("Cannot connect peers: node list is empty")
|
||||
return errors.New("node list is empty")
|
||||
}
|
||||
|
||||
timeout := time.Duration(len(nodes)*2) * time.Second
|
||||
Debug("Connecting nodes in a relay chain with timeout: %v", timeout)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||
defer cancel()
|
||||
|
||||
for i := 0; i < len(nodes)-1; i++ {
|
||||
Debug("Connecting node %d to node %d", i, i+1)
|
||||
err := nodes[i].ConnectPeer(nodes[i+1])
|
||||
if err != nil {
|
||||
Error("Failed to connect node %d to node %d: %v", i, i+1, err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
<-ctx.Done()
|
||||
Debug("Connections stabilized")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *WakuNode) VerifyMessageReceived(expectedMessage *pb.WakuMessage, expectedHash common.MessageHash) error {
|
||||
timeout := 3 * time.Second
|
||||
Debug("Verifying if the message was received on node %s, timeout: %v", n.nodeName, timeout)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||
defer cancel()
|
||||
|
||||
select {
|
||||
case envelope := <-n.MsgChan:
|
||||
if envelope == nil {
|
||||
Error("Received envelope is nil on node %s", n.nodeName)
|
||||
return errors.New("received envelope is nil")
|
||||
}
|
||||
if string(expectedMessage.Payload) != string(envelope.Message().Payload) {
|
||||
Error("Payload does not match on node %s", n.nodeName)
|
||||
return errors.New("payload does not match")
|
||||
}
|
||||
if expectedMessage.ContentTopic != envelope.Message().ContentTopic {
|
||||
Error("Content topic does not match on node %s", n.nodeName)
|
||||
return errors.New("content topic does not match")
|
||||
}
|
||||
if expectedHash != envelope.Hash() {
|
||||
Error("Message hash does not match on node %s", n.nodeName)
|
||||
return errors.New("message hash does not match")
|
||||
}
|
||||
Debug("Message received and verified successfully on node %s, Message: %s", n.nodeName, string(envelope.Message().Payload))
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
Error("Timeout: message not received within %v on node %s", timeout, n.nodeName)
|
||||
return errors.New("timeout: message not received within the given duration")
|
||||
}
|
||||
}
|
||||
|
||||
func (n *WakuNode) CreateMessage(customMessage ...*pb.WakuMessage) *pb.WakuMessage {
|
||||
Debug("Creating a WakuMessage on node %s", n.nodeName)
|
||||
|
||||
if len(customMessage) > 0 && customMessage[0] != nil {
|
||||
Debug("Using provided custom message on node %s", n.nodeName)
|
||||
return customMessage[0]
|
||||
}
|
||||
|
||||
Debug("Using default message format on node %s", n.nodeName)
|
||||
defaultMessage := &pb.WakuMessage{
|
||||
Payload: []byte("This is a default Waku message payload"),
|
||||
ContentTopic: "test-content-topic",
|
||||
Version: proto.Uint32(0),
|
||||
Timestamp: proto.Int64(time.Now().UnixNano()),
|
||||
}
|
||||
|
||||
Debug("Successfully created a default WakuMessage on node %s", n.nodeName)
|
||||
return defaultMessage
|
||||
}
|
||||
|
||||
func WaitForAutoConnection(nodeList []*WakuNode) error {
|
||||
Debug("Waiting for auto-connection of nodes...")
|
||||
|
||||
var hardWait = 30 * time.Second
|
||||
Debug("Applying hard wait of %v seconds before checking connections", hardWait.Seconds())
|
||||
time.Sleep(hardWait)
|
||||
|
||||
for _, node := range nodeList {
|
||||
peers, err := node.GetConnectedPeers()
|
||||
if err != nil {
|
||||
Error("Failed to get connected peers for node %s: %v", node.nodeName, err)
|
||||
return err
|
||||
}
|
||||
|
||||
if len(peers) < 1 {
|
||||
Error("Node %s has no connected peers, expected at least 1", node.nodeName)
|
||||
return errors.New("expected at least one connected peer")
|
||||
}
|
||||
|
||||
Debug("Node %s has %d connected peers", node.nodeName, len(peers))
|
||||
}
|
||||
|
||||
Debug("Auto-connection check completed successfully")
|
||||
return nil
|
||||
}
|
||||
|
||||
func SubscribeNodesToTopic(nodes []*WakuNode, topic string) error {
|
||||
for _, node := range nodes {
|
||||
Debug("Subscribing node %s to topic %s", node.nodeName, topic)
|
||||
err := RetryWithBackOff(func() error {
|
||||
return node.RelaySubscribe(topic)
|
||||
})
|
||||
if err != nil {
|
||||
Error("Failed to subscribe node %s to topic %s: %v", node.nodeName, topic, err)
|
||||
return err
|
||||
}
|
||||
Debug("Node %s successfully subscribed to topic %s", node.nodeName, topic)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *WakuNode) GetStoredMessages(storeNode *WakuNode, storeRequest *common.StoreQueryRequest) (*common.StoreQueryResponse, error) {
|
||||
Debug("Starting store query request")
|
||||
|
||||
if storeRequest == nil {
|
||||
Debug("Using DefaultStoreQueryRequest")
|
||||
storeRequest = &DefaultStoreQueryRequest
|
||||
}
|
||||
|
||||
storeMultiaddr, err := storeNode.ListenAddresses()
|
||||
if err != nil {
|
||||
Error("Failed to retrieve listen addresses for store node: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(storeMultiaddr) == 0 {
|
||||
Error("Store node has no available listen addresses")
|
||||
return nil, fmt.Errorf("store node has no available listen addresses")
|
||||
}
|
||||
|
||||
storeNodeAddrInfo, err := peer.AddrInfoFromString(storeMultiaddr[0].String())
|
||||
if err != nil {
|
||||
Error("Failed to convert store node address to AddrInfo: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
Debug("Querying store node for messages")
|
||||
res, err := n.StoreQuery(ctx, storeRequest, *storeNodeAddrInfo)
|
||||
if err != nil {
|
||||
Error("StoreQuery failed: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
Debug("Store query successful, retrieved %d messages", len(*res.Messages))
|
||||
return res, nil
|
||||
}
|
||||
|
||||
83
waku/store_test.go
Normal file
83
waku/store_test.go
Normal file
@ -0,0 +1,83 @@
|
||||
package waku
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||
|
||||
//"github.com/waku-org/waku-go-bindings/waku/common"
|
||||
"google.golang.org/protobuf/proto"
|
||||
)
|
||||
|
||||
func TestStoreQueryFromPeer(t *testing.T) {
|
||||
Debug("Starting test to verify store query from a peer using direct peer connections")
|
||||
|
||||
node1Config := DefaultWakuConfig
|
||||
node1Config.Relay = true
|
||||
|
||||
Debug("Creating Node1 (Relay enabled)")
|
||||
node1, err := StartWakuNode("Node1", &node1Config)
|
||||
require.NoError(t, err, "Failed to start Node1")
|
||||
|
||||
node2Config := DefaultWakuConfig
|
||||
node2Config.Relay = true
|
||||
node2Config.Store = true
|
||||
|
||||
Debug("Creating Node2 (Relay & Store enabled)")
|
||||
node2, err := StartWakuNode("Node2", &node2Config)
|
||||
require.NoError(t, err, "Failed to start Node2")
|
||||
|
||||
node3Config := DefaultWakuConfig
|
||||
node3Config.Relay = false
|
||||
|
||||
Debug("Creating Node3 (Peer connected to Node2)")
|
||||
node3, err := StartWakuNode("Node3", &node3Config)
|
||||
require.NoError(t, err, "Failed to start Node3")
|
||||
|
||||
defer func() {
|
||||
Debug("Stopping and destroying all Waku nodes")
|
||||
node1.StopAndDestroy()
|
||||
node2.StopAndDestroy()
|
||||
node3.StopAndDestroy()
|
||||
}()
|
||||
|
||||
Debug("Connecting Node1 to Node2")
|
||||
err = node1.ConnectPeer(node2)
|
||||
require.NoError(t, err, "Failed to connect Node1 to Node2")
|
||||
|
||||
Debug("Connecting Node3 to Node2")
|
||||
err = node3.ConnectPeer(node2)
|
||||
require.NoError(t, err, "Failed to connect Node3 to Node2")
|
||||
|
||||
Debug("Waiting for peer connections to stabilize")
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
Debug("Publishing message from Node1 using RelayPublish")
|
||||
message := node1.CreateMessage(&pb.WakuMessage{
|
||||
Payload: []byte("test-message"),
|
||||
ContentTopic: "test-content-topic",
|
||||
Timestamp: proto.Int64(time.Now().UnixNano()),
|
||||
})
|
||||
|
||||
defaultPubsubTopic := DefaultPubsubTopic
|
||||
msgHash, err := node1.RelayPublishNoCTX(defaultPubsubTopic, message)
|
||||
require.NoError(t, err, "Failed to publish message from Node1")
|
||||
|
||||
Debug("Waiting for message delivery to Node2")
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
Debug("Verifying that Node2 received the message")
|
||||
err = node2.VerifyMessageReceived(message, msgHash)
|
||||
require.NoError(t, err, "Node2 should have received the message")
|
||||
|
||||
Debug("Node3 querying stored messages from Node2")
|
||||
res, err := node3.GetStoredMessages(node2, nil)
|
||||
var storedMessages = *res.Messages
|
||||
require.NoError(t, err, "Failed to retrieve stored messages from Node2")
|
||||
require.NotEmpty(t, storedMessages, "Expected at least one stored message")
|
||||
Debug("Verifying stored message matches the published message")
|
||||
require.Equal(t, message.Payload, storedMessages[0].WakuMessage.Payload, "Stored message payload does not match")
|
||||
Debug("Test successfully verified store query from a peer using direct peer connections")
|
||||
}
|
||||
@ -4,9 +4,11 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/waku-org/waku-go-bindings/waku/common"
|
||||
"google.golang.org/protobuf/proto"
|
||||
)
|
||||
|
||||
var DefaultWakuConfig common.WakuConfig
|
||||
var DefaultWakuConfig WakuConfig
|
||||
var DefaultStoreQueryRequest common.StoreQueryRequest
|
||||
|
||||
func init() {
|
||||
|
||||
@ -17,7 +19,7 @@ func init() {
|
||||
Error("Failed to get free ports %v %v", err1, err2)
|
||||
}
|
||||
|
||||
DefaultWakuConfig = common.WakuConfig{
|
||||
DefaultWakuConfig = WakuConfig{
|
||||
Relay: false,
|
||||
LogLevel: "DEBUG",
|
||||
Discv5Discovery: true,
|
||||
@ -30,6 +32,14 @@ func init() {
|
||||
Discv5UdpPort: udpPort,
|
||||
TcpPort: tcpPort,
|
||||
}
|
||||
|
||||
DefaultStoreQueryRequest = common.StoreQueryRequest{
|
||||
IncludeData: true,
|
||||
ContentTopics: &[]string{"test-content-topic"},
|
||||
PaginationLimit: proto.Uint64(uint64(50)),
|
||||
PaginationForward: true,
|
||||
TimeStart: proto.Int64(time.Now().Add(-5 * time.Minute).UnixNano()), // 5 mins before now
|
||||
}
|
||||
}
|
||||
|
||||
const ConnectPeerTimeout = 10 * time.Second //default timeout for node to connect to another node
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user