mirror of
https://github.com/logos-messaging/logos-messaging-go-bindings.git
synced 2026-05-28 20:49:41 +00:00
work on merge conflicts
This commit is contained in:
parent
f395ed4766
commit
21eae98e8d
308
waku/nwaku.go
308
waku/nwaku.go
@ -56,15 +56,6 @@ package waku
|
|||||||
// resp must be set != NULL in case interest on retrieving data from the callback
|
// resp must be set != NULL in case interest on retrieving data from the callback
|
||||||
void GoCallback(int ret, char* msg, size_t len, void* resp);
|
void GoCallback(int ret, char* msg, size_t len, void* resp);
|
||||||
|
|
||||||
#define WAKU_CALL(call) \
|
|
||||||
do { \
|
|
||||||
int ret = call; \
|
|
||||||
if (ret != 0) { \
|
|
||||||
printf("Failed the call to: %s. Returned code: %d\n", #call, ret); \
|
|
||||||
exit(1); \
|
|
||||||
} \
|
|
||||||
} while (0)
|
|
||||||
|
|
||||||
static void* cGoWakuNew(const char* configJson, void* resp) {
|
static void* cGoWakuNew(const char* configJson, void* resp) {
|
||||||
// We pass NULL because we are not interested in retrieving data from this callback
|
// We pass NULL because we are not interested in retrieving data from this callback
|
||||||
void* ret = waku_new(configJson, (WakuCallBack) GoCallback, resp);
|
void* ret = waku_new(configJson, (WakuCallBack) GoCallback, resp);
|
||||||
@ -72,27 +63,27 @@ package waku
|
|||||||
}
|
}
|
||||||
|
|
||||||
static void cGoWakuStart(void* wakuCtx, void* resp) {
|
static void cGoWakuStart(void* wakuCtx, void* resp) {
|
||||||
WAKU_CALL(waku_start(wakuCtx, (WakuCallBack) GoCallback, resp));
|
waku_start(wakuCtx, (WakuCallBack) GoCallback, resp);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void cGoWakuStop(void* wakuCtx, void* resp) {
|
static void cGoWakuStop(void* wakuCtx, void* resp) {
|
||||||
WAKU_CALL(waku_stop(wakuCtx, (WakuCallBack) GoCallback, resp));
|
waku_stop(wakuCtx, (WakuCallBack) GoCallback, resp);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void cGoWakuDestroy(void* wakuCtx, void* resp) {
|
static void cGoWakuDestroy(void* wakuCtx, void* resp) {
|
||||||
WAKU_CALL(waku_destroy(wakuCtx, (WakuCallBack) GoCallback, resp));
|
waku_destroy(wakuCtx, (WakuCallBack) GoCallback, resp);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void cGoWakuStartDiscV5(void* wakuCtx, void* resp) {
|
static void cGoWakuStartDiscV5(void* wakuCtx, void* resp) {
|
||||||
WAKU_CALL(waku_start_discv5(wakuCtx, (WakuCallBack) GoCallback, resp));
|
waku_start_discv5(wakuCtx, (WakuCallBack) GoCallback, resp);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void cGoWakuStopDiscV5(void* wakuCtx, void* resp) {
|
static void cGoWakuStopDiscV5(void* wakuCtx, void* resp) {
|
||||||
WAKU_CALL(waku_stop_discv5(wakuCtx, (WakuCallBack) GoCallback, resp));
|
waku_stop_discv5(wakuCtx, (WakuCallBack) GoCallback, resp);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void cGoWakuVersion(void* wakuCtx, void* resp) {
|
static void cGoWakuVersion(void* wakuCtx, void* resp) {
|
||||||
WAKU_CALL(waku_version(wakuCtx, (WakuCallBack) GoCallback, resp));
|
waku_version(wakuCtx, (WakuCallBack) GoCallback, resp);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void cGoWakuSetEventCallback(void* wakuCtx) {
|
static void cGoWakuSetEventCallback(void* wakuCtx) {
|
||||||
@ -118,21 +109,21 @@ package waku
|
|||||||
char* encoding,
|
char* encoding,
|
||||||
void* resp) {
|
void* resp) {
|
||||||
|
|
||||||
WAKU_CALL( waku_content_topic(wakuCtx,
|
waku_content_topic(wakuCtx,
|
||||||
appName,
|
appName,
|
||||||
appVersion,
|
appVersion,
|
||||||
contentTopicName,
|
contentTopicName,
|
||||||
encoding,
|
encoding,
|
||||||
(WakuCallBack) GoCallback,
|
(WakuCallBack) GoCallback,
|
||||||
resp) );
|
resp);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void cGoWakuPubsubTopic(void* wakuCtx, char* topicName, void* resp) {
|
static void cGoWakuPubsubTopic(void* wakuCtx, char* topicName, void* resp) {
|
||||||
WAKU_CALL( waku_pubsub_topic(wakuCtx, topicName, (WakuCallBack) GoCallback, resp) );
|
waku_pubsub_topic(wakuCtx, topicName, (WakuCallBack) GoCallback, resp);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void cGoWakuDefaultPubsubTopic(void* wakuCtx, void* resp) {
|
static void cGoWakuDefaultPubsubTopic(void* wakuCtx, void* resp) {
|
||||||
WAKU_CALL (waku_default_pubsub_topic(wakuCtx, (WakuCallBack) GoCallback, resp));
|
waku_default_pubsub_topic(wakuCtx, (WakuCallBack) GoCallback, resp);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void cGoWakuRelayPublish(void* wakuCtx,
|
static void cGoWakuRelayPublish(void* wakuCtx,
|
||||||
@ -141,44 +132,44 @@ package waku
|
|||||||
int timeoutMs,
|
int timeoutMs,
|
||||||
void* resp) {
|
void* resp) {
|
||||||
|
|
||||||
WAKU_CALL (waku_relay_publish(wakuCtx,
|
waku_relay_publish(wakuCtx,
|
||||||
pubSubTopic,
|
pubSubTopic,
|
||||||
jsonWakuMessage,
|
jsonWakuMessage,
|
||||||
timeoutMs,
|
timeoutMs,
|
||||||
(WakuCallBack) GoCallback,
|
(WakuCallBack) GoCallback,
|
||||||
resp));
|
resp);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void cGoWakuRelaySubscribe(void* wakuCtx, char* pubSubTopic, void* resp) {
|
static void cGoWakuRelaySubscribe(void* wakuCtx, char* pubSubTopic, void* resp) {
|
||||||
WAKU_CALL ( waku_relay_subscribe(wakuCtx,
|
waku_relay_subscribe(wakuCtx,
|
||||||
pubSubTopic,
|
pubSubTopic,
|
||||||
(WakuCallBack) GoCallback,
|
(WakuCallBack) GoCallback,
|
||||||
resp) );
|
resp);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void cGoWakuRelayAddProtectedShard(void* wakuCtx, int clusterId, int shardId, char* publicKey, void* resp) {
|
static void cGoWakuRelayAddProtectedShard(void* wakuCtx, int clusterId, int shardId, char* publicKey, void* resp) {
|
||||||
WAKU_CALL ( waku_relay_add_protected_shard(wakuCtx,
|
waku_relay_add_protected_shard(wakuCtx,
|
||||||
clusterId,
|
clusterId,
|
||||||
shardId,
|
shardId,
|
||||||
publicKey,
|
publicKey,
|
||||||
(WakuCallBack) GoCallback,
|
(WakuCallBack) GoCallback,
|
||||||
resp) );
|
resp);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void cGoWakuRelayUnsubscribe(void* wakuCtx, char* pubSubTopic, void* resp) {
|
static void cGoWakuRelayUnsubscribe(void* wakuCtx, char* pubSubTopic, void* resp) {
|
||||||
|
|
||||||
WAKU_CALL ( waku_relay_unsubscribe(wakuCtx,
|
waku_relay_unsubscribe(wakuCtx,
|
||||||
pubSubTopic,
|
pubSubTopic,
|
||||||
(WakuCallBack) GoCallback,
|
(WakuCallBack) GoCallback,
|
||||||
resp) );
|
resp);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void cGoWakuConnect(void* wakuCtx, char* peerMultiAddr, int timeoutMs, void* resp) {
|
static void cGoWakuConnect(void* wakuCtx, char* peerMultiAddr, int timeoutMs, void* resp) {
|
||||||
WAKU_CALL( waku_connect(wakuCtx,
|
waku_connect(wakuCtx,
|
||||||
peerMultiAddr,
|
peerMultiAddr,
|
||||||
timeoutMs,
|
timeoutMs,
|
||||||
(WakuCallBack) GoCallback,
|
(WakuCallBack) GoCallback,
|
||||||
resp) );
|
resp);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void cGoWakuDialPeer(void* wakuCtx,
|
static void cGoWakuDialPeer(void* wakuCtx,
|
||||||
@ -187,12 +178,12 @@ package waku
|
|||||||
int timeoutMs,
|
int timeoutMs,
|
||||||
void* resp) {
|
void* resp) {
|
||||||
|
|
||||||
WAKU_CALL( waku_dial_peer(wakuCtx,
|
waku_dial_peer(wakuCtx,
|
||||||
peerMultiAddr,
|
peerMultiAddr,
|
||||||
protocol,
|
protocol,
|
||||||
timeoutMs,
|
timeoutMs,
|
||||||
(WakuCallBack) GoCallback,
|
(WakuCallBack) GoCallback,
|
||||||
resp) );
|
resp);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void cGoWakuDialPeerById(void* wakuCtx,
|
static void cGoWakuDialPeerById(void* wakuCtx,
|
||||||
@ -201,51 +192,51 @@ package waku
|
|||||||
int timeoutMs,
|
int timeoutMs,
|
||||||
void* resp) {
|
void* resp) {
|
||||||
|
|
||||||
WAKU_CALL( waku_dial_peer_by_id(wakuCtx,
|
waku_dial_peer_by_id(wakuCtx,
|
||||||
peerId,
|
peerId,
|
||||||
protocol,
|
protocol,
|
||||||
timeoutMs,
|
timeoutMs,
|
||||||
(WakuCallBack) GoCallback,
|
(WakuCallBack) GoCallback,
|
||||||
resp) );
|
resp);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void cGoWakuDisconnectPeerById(void* wakuCtx, char* peerId, void* resp) {
|
static void cGoWakuDisconnectPeerById(void* wakuCtx, char* peerId, void* resp) {
|
||||||
WAKU_CALL( waku_disconnect_peer_by_id(wakuCtx,
|
waku_disconnect_peer_by_id(wakuCtx,
|
||||||
peerId,
|
peerId,
|
||||||
(WakuCallBack) GoCallback,
|
(WakuCallBack) GoCallback,
|
||||||
resp) );
|
resp);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void cGoWakuListenAddresses(void* wakuCtx, void* resp) {
|
static void cGoWakuListenAddresses(void* wakuCtx, void* resp) {
|
||||||
WAKU_CALL (waku_listen_addresses(wakuCtx, (WakuCallBack) GoCallback, resp) );
|
waku_listen_addresses(wakuCtx, (WakuCallBack) GoCallback, resp);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void cGoWakuGetMyENR(void* ctx, void* resp) {
|
static void cGoWakuGetMyENR(void* ctx, void* resp) {
|
||||||
WAKU_CALL (waku_get_my_enr(ctx, (WakuCallBack) GoCallback, resp) );
|
waku_get_my_enr(ctx, (WakuCallBack) GoCallback, resp);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void cGoWakuGetMyPeerId(void* ctx, void* resp) {
|
static void cGoWakuGetMyPeerId(void* ctx, void* resp) {
|
||||||
WAKU_CALL (waku_get_my_peerid(ctx, (WakuCallBack) GoCallback, resp) );
|
waku_get_my_peerid(ctx, (WakuCallBack) GoCallback, resp);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void cGoWakuPingPeer(void* ctx, char* peerAddr, int timeoutMs, void* resp) {
|
static void cGoWakuPingPeer(void* ctx, char* peerAddr, int timeoutMs, void* resp) {
|
||||||
WAKU_CALL (waku_ping_peer(ctx, peerAddr, timeoutMs, (WakuCallBack) GoCallback, resp) );
|
waku_ping_peer(ctx, peerAddr, timeoutMs, (WakuCallBack) GoCallback, resp);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void cGoWakuGetNumPeersInMesh(void* ctx, char* pubSubTopic, void* resp) {
|
static void cGoWakuGetNumPeersInMesh(void* ctx, char* pubSubTopic, void* resp) {
|
||||||
WAKU_CALL (waku_relay_get_num_peers_in_mesh(ctx, pubSubTopic, (WakuCallBack) GoCallback, resp) );
|
waku_relay_get_num_peers_in_mesh(ctx, pubSubTopic, (WakuCallBack) GoCallback, resp);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void cGoWakuGetNumConnectedRelayPeers(void* ctx, char* pubSubTopic, void* resp) {
|
static void cGoWakuGetNumConnectedRelayPeers(void* ctx, char* pubSubTopic, void* resp) {
|
||||||
WAKU_CALL (waku_relay_get_num_connected_peers(ctx, pubSubTopic, (WakuCallBack) GoCallback, resp) );
|
waku_relay_get_num_connected_peers(ctx, pubSubTopic, (WakuCallBack) GoCallback, resp);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void cGoWakuGetConnectedPeers(void* wakuCtx, void* resp) {
|
static void cGoWakuGetConnectedPeers(void* wakuCtx, void* resp) {
|
||||||
WAKU_CALL (waku_get_connected_peers(wakuCtx, (WakuCallBack) GoCallback, resp) );
|
waku_get_connected_peers(wakuCtx, (WakuCallBack) GoCallback, resp);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void cGoWakuGetPeerIdsFromPeerStore(void* wakuCtx, void* resp) {
|
static void cGoWakuGetPeerIdsFromPeerStore(void* wakuCtx, void* resp) {
|
||||||
WAKU_CALL (waku_get_peerids_from_peerstore(wakuCtx, (WakuCallBack) GoCallback, resp) );
|
waku_get_peerids_from_peerstore(wakuCtx, (WakuCallBack) GoCallback, resp);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void cGoWakuLightpushPublish(void* wakuCtx,
|
static void cGoWakuLightpushPublish(void* wakuCtx,
|
||||||
@ -253,11 +244,11 @@ package waku
|
|||||||
const char* jsonWakuMessage,
|
const char* jsonWakuMessage,
|
||||||
void* resp) {
|
void* resp) {
|
||||||
|
|
||||||
WAKU_CALL (waku_lightpush_publish(wakuCtx,
|
waku_lightpush_publish(wakuCtx,
|
||||||
pubSubTopic,
|
pubSubTopic,
|
||||||
jsonWakuMessage,
|
jsonWakuMessage,
|
||||||
(WakuCallBack) GoCallback,
|
(WakuCallBack) GoCallback,
|
||||||
resp));
|
resp);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void cGoWakuStoreQuery(void* wakuCtx,
|
static void cGoWakuStoreQuery(void* wakuCtx,
|
||||||
@ -266,32 +257,32 @@ package waku
|
|||||||
int timeoutMs,
|
int timeoutMs,
|
||||||
void* resp) {
|
void* resp) {
|
||||||
|
|
||||||
WAKU_CALL (waku_store_query(wakuCtx,
|
waku_store_query(wakuCtx,
|
||||||
jsonQuery,
|
jsonQuery,
|
||||||
peerAddr,
|
peerAddr,
|
||||||
timeoutMs,
|
timeoutMs,
|
||||||
(WakuCallBack) GoCallback,
|
(WakuCallBack) GoCallback,
|
||||||
resp));
|
resp);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void cGoWakuPeerExchangeQuery(void* wakuCtx,
|
static void cGoWakuPeerExchangeQuery(void* wakuCtx,
|
||||||
uint64_t numPeers,
|
uint64_t numPeers,
|
||||||
void* resp) {
|
void* resp) {
|
||||||
|
|
||||||
WAKU_CALL (waku_peer_exchange_request(wakuCtx,
|
waku_peer_exchange_request(wakuCtx,
|
||||||
numPeers,
|
numPeers,
|
||||||
(WakuCallBack) GoCallback,
|
(WakuCallBack) GoCallback,
|
||||||
resp));
|
resp);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void cGoWakuGetPeerIdsByProtocol(void* wakuCtx,
|
static void cGoWakuGetPeerIdsByProtocol(void* wakuCtx,
|
||||||
const char* protocol,
|
const char* protocol,
|
||||||
void* resp) {
|
void* resp) {
|
||||||
|
|
||||||
WAKU_CALL (waku_get_peerids_by_protocol(wakuCtx,
|
waku_get_peerids_by_protocol(wakuCtx,
|
||||||
protocol,
|
protocol,
|
||||||
(WakuCallBack) GoCallback,
|
(WakuCallBack) GoCallback,
|
||||||
resp));
|
resp);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void cGoWakuDnsDiscovery(void* wakuCtx,
|
static void cGoWakuDnsDiscovery(void* wakuCtx,
|
||||||
@ -300,12 +291,12 @@ package waku
|
|||||||
int timeoutMs,
|
int timeoutMs,
|
||||||
void* resp) {
|
void* resp) {
|
||||||
|
|
||||||
WAKU_CALL (waku_dns_discovery(wakuCtx,
|
waku_dns_discovery(wakuCtx,
|
||||||
entTreeUrl,
|
entTreeUrl,
|
||||||
nameDnsServer,
|
nameDnsServer,
|
||||||
timeoutMs,
|
timeoutMs,
|
||||||
(WakuCallBack) GoCallback,
|
(WakuCallBack) GoCallback,
|
||||||
resp));
|
resp);
|
||||||
}
|
}
|
||||||
|
|
||||||
*/
|
*/
|
||||||
@ -325,7 +316,6 @@ import (
|
|||||||
"unsafe"
|
"unsafe"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/crypto"
|
"github.com/ethereum/go-ethereum/crypto"
|
||||||
"github.com/ethereum/go-ethereum/log"
|
|
||||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||||
"github.com/libp2p/go-libp2p/core/peer"
|
"github.com/libp2p/go-libp2p/core/peer"
|
||||||
libp2pproto "github.com/libp2p/go-libp2p/core/protocol"
|
libp2pproto "github.com/libp2p/go-libp2p/core/protocol"
|
||||||
@ -333,89 +323,12 @@ import (
|
|||||||
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||||
"github.com/waku-org/waku-go-bindings/waku/common"
|
"github.com/waku-org/waku-go-bindings/waku/common"
|
||||||
"go.uber.org/zap"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const requestTimeout = 30 * time.Second
|
const requestTimeout = 30 * time.Second
|
||||||
const MsgChanBufferSize = 100
|
const MsgChanBufferSize = 1024
|
||||||
const TopicHealthChanBufferSize = 100
|
const TopicHealthChanBufferSize = 1024
|
||||||
const ConnectionChangeChanBufferSize = 100
|
const ConnectionChangeChanBufferSize = 1024
|
||||||
|
|
||||||
type WakuConfig struct {
|
|
||||||
Host string `json:"host,omitempty"`
|
|
||||||
Nodekey string `json:"nodekey,omitempty"`
|
|
||||||
Relay bool `json:"relay"`
|
|
||||||
Store bool `json:"store,omitempty"`
|
|
||||||
LegacyStore bool `json:"legacyStore"`
|
|
||||||
Storenode string `json:"storenode,omitempty"`
|
|
||||||
StoreMessageRetentionPolicy string `json:"storeMessageRetentionPolicy,omitempty"`
|
|
||||||
StoreMessageDbUrl string `json:"storeMessageDbUrl,omitempty"`
|
|
||||||
StoreMessageDbVacuum bool `json:"storeMessageDbVacuum,omitempty"`
|
|
||||||
StoreMaxNumDbConnections int `json:"storeMaxNumDbConnections,omitempty"`
|
|
||||||
StoreResume bool `json:"storeResume,omitempty"`
|
|
||||||
Filter bool `json:"filter,omitempty"`
|
|
||||||
Filternode string `json:"filternode,omitempty"`
|
|
||||||
FilterSubscriptionTimeout int64 `json:"filterSubscriptionTimeout,omitempty"`
|
|
||||||
FilterMaxPeersToServe uint32 `json:"filterMaxPeersToServe,omitempty"`
|
|
||||||
FilterMaxCriteria uint32 `json:"filterMaxCriteria,omitempty"`
|
|
||||||
Lightpush bool `json:"lightpush,omitempty"`
|
|
||||||
LightpushNode string `json:"lightpushnode,omitempty"`
|
|
||||||
LogLevel string `json:"logLevel,omitempty"`
|
|
||||||
DnsDiscovery bool `json:"dnsDiscovery,omitempty"`
|
|
||||||
DnsDiscoveryUrl string `json:"dnsDiscoveryUrl,omitempty"`
|
|
||||||
MaxMessageSize string `json:"maxMessageSize,omitempty"`
|
|
||||||
Staticnodes []string `json:"staticnodes,omitempty"`
|
|
||||||
Discv5BootstrapNodes []string `json:"discv5BootstrapNodes,omitempty"`
|
|
||||||
Discv5Discovery bool `json:"discv5Discovery,omitempty"`
|
|
||||||
Discv5UdpPort int `json:"discv5UdpPort,omitempty"`
|
|
||||||
ClusterID uint16 `json:"clusterId,omitempty"`
|
|
||||||
Shards []uint16 `json:"shards,omitempty"`
|
|
||||||
PeerExchange bool `json:"peerExchange,omitempty"`
|
|
||||||
PeerExchangeNode string `json:"peerExchangeNode,omitempty"`
|
|
||||||
TcpPort int `json:"tcpPort,omitempty"`
|
|
||||||
RateLimits RateLimitsConfig `json:"rateLimits,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type RateLimitsConfig struct {
|
|
||||||
Filter *RateLimit `json:"-"`
|
|
||||||
Lightpush *RateLimit `json:"-"`
|
|
||||||
PeerExchange *RateLimit `json:"-"`
|
|
||||||
}
|
|
||||||
|
|
||||||
func (rlc RateLimitsConfig) MarshalJSON() ([]byte, error) {
|
|
||||||
output := []string{}
|
|
||||||
if rlc.Filter != nil {
|
|
||||||
output = append(output, fmt.Sprintf("filter:%s", rlc.Filter.String()))
|
|
||||||
}
|
|
||||||
if rlc.Lightpush != nil {
|
|
||||||
output = append(output, fmt.Sprintf("lightpush:%s", rlc.Lightpush.String()))
|
|
||||||
}
|
|
||||||
if rlc.PeerExchange != nil {
|
|
||||||
output = append(output, fmt.Sprintf("px:%s", rlc.PeerExchange.String()))
|
|
||||||
}
|
|
||||||
return json.Marshal(output)
|
|
||||||
}
|
|
||||||
|
|
||||||
type RateLimitTimeUnit string
|
|
||||||
|
|
||||||
const Hour RateLimitTimeUnit = "h"
|
|
||||||
const Minute RateLimitTimeUnit = "m"
|
|
||||||
const Second RateLimitTimeUnit = "s"
|
|
||||||
const Millisecond RateLimitTimeUnit = "ms"
|
|
||||||
|
|
||||||
type RateLimit struct {
|
|
||||||
Volume int // Number of allowed messages per period
|
|
||||||
Period int // Length of each rate-limit period (in TimeUnit)
|
|
||||||
TimeUnit RateLimitTimeUnit // Time unit of the period
|
|
||||||
}
|
|
||||||
|
|
||||||
func (rl RateLimit) String() string {
|
|
||||||
return fmt.Sprintf("%d/%d%s", rl.Volume, rl.Period, rl.TimeUnit)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (rl RateLimit) MarshalJSON() ([]byte, error) {
|
|
||||||
return json.Marshal(rl.String())
|
|
||||||
}
|
|
||||||
|
|
||||||
//export GoCallback
|
//export GoCallback
|
||||||
func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) {
|
func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) {
|
||||||
@ -432,14 +345,14 @@ func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) {
|
|||||||
// WakuNode represents an instance of an nwaku node
|
// WakuNode represents an instance of an nwaku node
|
||||||
type WakuNode struct {
|
type WakuNode struct {
|
||||||
wakuCtx unsafe.Pointer
|
wakuCtx unsafe.Pointer
|
||||||
config *WakuConfig
|
config *common.WakuConfig
|
||||||
MsgChan chan common.Envelope
|
MsgChan chan common.Envelope
|
||||||
TopicHealthChan chan topicHealth
|
TopicHealthChan chan topicHealth
|
||||||
ConnectionChangeChan chan connectionChange
|
ConnectionChangeChan chan connectionChange
|
||||||
nodeName string
|
nodeName string
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewWakuNode(config *WakuConfig, nodeName string) (*WakuNode, error) {
|
func NewWakuNode(config *common.WakuConfig, nodeName string) (*WakuNode, error) {
|
||||||
Debug("Creating new WakuNode: %v", nodeName)
|
Debug("Creating new WakuNode: %v", nodeName)
|
||||||
n := &WakuNode{
|
n := &WakuNode{
|
||||||
config: config,
|
config: config,
|
||||||
@ -512,11 +425,12 @@ func globalEventCallback(callerRet C.int, msg *C.char, len C.size_t, userData un
|
|||||||
node.OnEvent(eventStr)
|
node.OnEvent(eventStr)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
errMsgField := zap.Skip()
|
|
||||||
if len != 0 {
|
if len != 0 {
|
||||||
errMsgField = zap.String("error", C.GoStringN(msg, C.int(len)))
|
errMsg := C.GoStringN(msg, C.int(len))
|
||||||
|
Error("globalEventCallback retCode not ok, retCode: %v: %v", callerRet, errMsg)
|
||||||
|
} else {
|
||||||
|
Error("globalEventCallback retCode not ok, retCode: %v", callerRet)
|
||||||
}
|
}
|
||||||
log.Error("globalEventCallback retCode not ok", zap.Int("retCode", int(callerRet)), errMsgField)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -656,7 +570,7 @@ func (n *WakuNode) GetConnectedPeers() (peer.IDSlice, error) {
|
|||||||
if C.getRet(resp) == C.RET_OK {
|
if C.getRet(resp) == C.RET_OK {
|
||||||
peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
|
peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
|
||||||
if peersStr == "" {
|
if peersStr == "" {
|
||||||
Debug("No connected peers found for %s", n.nodeName)
|
Debug("No connected peers found for %v", n.nodeName)
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -665,44 +579,39 @@ func (n *WakuNode) GetConnectedPeers() (peer.IDSlice, error) {
|
|||||||
for _, peerID := range peerIDs {
|
for _, peerID := range peerIDs {
|
||||||
id, err := peer.Decode(peerID)
|
id, err := peer.Decode(peerID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
Error("Failed to decode peer ID for "+n.nodeName, zap.Error(err))
|
Error("Failed to decode peer ID for %v: %v", n.nodeName, err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
peers = append(peers, id)
|
peers = append(peers, id)
|
||||||
}
|
}
|
||||||
|
|
||||||
Debug("Successfully fetched connected peers for "+n.nodeName, zap.Int("count", len(peers)))
|
Debug("Successfully fetched connected peers for %v, count: %v", n.nodeName, len(peers))
|
||||||
return peers, nil
|
return peers, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
errMsg := "error GetConnectedPeers: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
|
errMsg := "error GetConnectedPeers: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
|
||||||
Error("Failed to get connected peers for "+n.nodeName, zap.String("error", errMsg))
|
Error("Failed to get connected peers for %v: %v", n.nodeName, errMsg)
|
||||||
|
|
||||||
return nil, errors.New(errMsg)
|
return nil, errors.New(errMsg)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *WakuNode) RelaySubscribe(pubsubTopic string) error {
|
func (n *WakuNode) RelaySubscribe(pubsubTopic string) error {
|
||||||
wg := sync.WaitGroup{}
|
|
||||||
if pubsubTopic == "" {
|
if pubsubTopic == "" {
|
||||||
err := errors.New("pubsub topic is empty")
|
return errors.New("pubsub topic is empty")
|
||||||
Error("Failed to subscribe to relay: %v", err)
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if n.wakuCtx == nil {
|
wg := sync.WaitGroup{}
|
||||||
err := errors.New("wakuCtx is nil")
|
|
||||||
Error("Failed to subscribe to relay on node %s: %v", n.nodeName, err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
Debug("Attempting to subscribe to relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic)
|
|
||||||
var resp = C.allocResp(unsafe.Pointer(&wg))
|
var resp = C.allocResp(unsafe.Pointer(&wg))
|
||||||
var cPubsubTopic = C.CString(pubsubTopic)
|
var cPubsubTopic = C.CString(pubsubTopic)
|
||||||
|
|
||||||
defer C.freeResp(resp)
|
defer C.freeResp(resp)
|
||||||
defer C.free(unsafe.Pointer(cPubsubTopic))
|
defer C.free(unsafe.Pointer(cPubsubTopic))
|
||||||
|
|
||||||
Debug("Calling cGoWakuRelaySubscribe on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic)
|
if n.wakuCtx == nil {
|
||||||
|
return errors.New("wakuCtx is nil")
|
||||||
|
}
|
||||||
|
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
C.cGoWakuRelaySubscribe(n.wakuCtx, cPubsubTopic, resp)
|
C.cGoWakuRelaySubscribe(n.wakuCtx, cPubsubTopic, resp)
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
@ -811,6 +720,7 @@ func (n *WakuNode) StartDiscV5() error {
|
|||||||
|
|
||||||
Debug("Starting DiscV5 for node: %s", n.nodeName)
|
Debug("Starting DiscV5 for node: %s", n.nodeName)
|
||||||
wg := sync.WaitGroup{}
|
wg := sync.WaitGroup{}
|
||||||
|
|
||||||
var resp = C.allocResp(unsafe.Pointer(&wg))
|
var resp = C.allocResp(unsafe.Pointer(&wg))
|
||||||
defer C.freeResp(resp)
|
defer C.freeResp(resp)
|
||||||
|
|
||||||
@ -1092,7 +1002,7 @@ func (n *WakuNode) Destroy() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
errMsg := "error WakuDestroy: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
|
errMsg := "error WakuDestroy: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
|
||||||
Error("Failed to destroy "+n.nodeName, zap.String("error", errMsg))
|
Error("Failed to destroy %v: %v", n.nodeName, errMsg)
|
||||||
|
|
||||||
return errors.New(errMsg)
|
return errors.New(errMsg)
|
||||||
}
|
}
|
||||||
@ -1336,7 +1246,7 @@ func (n *WakuNode) GetNumConnectedPeers() (int, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
numPeers := len(peers)
|
numPeers := len(peers)
|
||||||
Debug("Successfully fetched number of connected peers for "+n.nodeName, zap.Int("count", numPeers))
|
Debug("Successfully fetched number of connected peers for %v, count: %v", n.nodeName, numPeers)
|
||||||
|
|
||||||
return numPeers, nil
|
return numPeers, nil
|
||||||
}
|
}
|
||||||
@ -1358,7 +1268,7 @@ func GetFreePortIfNeeded(tcpPort int, discV5UDPPort int) (int, int, error) {
|
|||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
tcpAddr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort("localhost", "0"))
|
tcpAddr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort("localhost", "0"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
Warn("unable to resolve tcp addr: %v", zap.Error(err))
|
Warn("unable to resolve tcp addr: %v", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
tcpListener, err := net.ListenTCP("tcp", tcpAddr)
|
tcpListener, err := net.ListenTCP("tcp", tcpAddr)
|
||||||
@ -1404,29 +1314,29 @@ func GetFreePortIfNeeded(tcpPort int, discV5UDPPort int) (int, int, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Create & start node
|
// Create & start node
|
||||||
func StartWakuNode(nodeName string, customCfg *WakuConfig) (*WakuNode, error) {
|
func StartWakuNode(nodeName string, customCfg *common.WakuConfig) (*WakuNode, error) {
|
||||||
|
|
||||||
Debug("Initializing %s", nodeName)
|
Debug("Initializing %s", nodeName)
|
||||||
|
|
||||||
var nodeCfg WakuConfig
|
var nodeCfg common.WakuConfig
|
||||||
if customCfg == nil {
|
if customCfg == nil {
|
||||||
nodeCfg = DefaultWakuConfig
|
nodeCfg = DefaultWakuConfig
|
||||||
} else {
|
} else {
|
||||||
nodeCfg = *customCfg
|
nodeCfg = *customCfg
|
||||||
}
|
}
|
||||||
|
|
||||||
tcpPort, udpPort, err := GetFreePortIfNeeded(nodeCfg.TcpPort, nodeCfg.Discv5UdpPort)
|
tcpPort, udpPort, err := GetFreePortIfNeeded(nodeCfg.TcpPort, nodeCfg.Discv5UdpPort)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
Error("Failed to allocate unique ports: %v", err)
|
Error("Failed to allocate unique ports: %v", err)
|
||||||
tcpPort, udpPort = 0, 0
|
tcpPort, udpPort = 0, 0
|
||||||
}
|
}
|
||||||
|
|
||||||
if nodeCfg.TcpPort == 0 {
|
if nodeCfg.TcpPort == 0 {
|
||||||
nodeCfg.TcpPort = tcpPort
|
nodeCfg.TcpPort = tcpPort
|
||||||
}
|
}
|
||||||
if nodeCfg.Discv5UdpPort == 0 {
|
if nodeCfg.Discv5UdpPort == 0 {
|
||||||
nodeCfg.Discv5UdpPort = udpPort
|
nodeCfg.Discv5UdpPort = udpPort
|
||||||
}
|
}
|
||||||
|
|
||||||
Debug("Creating %s", nodeName)
|
Debug("Creating %s", nodeName)
|
||||||
node, err := NewWakuNode(&nodeCfg, nodeName)
|
node, err := NewWakuNode(&nodeCfg, nodeName)
|
||||||
@ -1522,43 +1432,3 @@ func (n *WakuNode) DisconnectPeer(target *WakuNode) error {
|
|||||||
Debug("Successfully disconnected %s from %s", n.nodeName, target.nodeName)
|
Debug("Successfully disconnected %s from %s", n.nodeName, target.nodeName)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func ConnectAllPeers(nodes []*WakuNode) error {
|
|
||||||
if len(nodes) == 0 {
|
|
||||||
Error("Cannot connect peers: node list is empty")
|
|
||||||
return errors.New("node list is empty")
|
|
||||||
}
|
|
||||||
|
|
||||||
timeout := time.Duration(len(nodes)*2) * time.Second
|
|
||||||
Debug("Connecting nodes in a relay chain with timeout: %v", timeout)
|
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
for i := 0; i < len(nodes)-1; i++ {
|
|
||||||
Debug("Connecting node %d to node %d", i, i+1)
|
|
||||||
err := nodes[i].ConnectPeer(nodes[i+1])
|
|
||||||
if err != nil {
|
|
||||||
Error("Failed to connect node %d to node %d: %v", i, i+1, err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
<-ctx.Done()
|
|
||||||
Debug("Connections stabilized")
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func SubscribeNodesToTopic(nodes []*WakuNode, topic string) error {
|
|
||||||
for _, node := range nodes {
|
|
||||||
Debug("Subscribing node %s to topic %s", node.nodeName, topic)
|
|
||||||
err := node.RelaySubscribe(topic)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
Error("Failed to subscribe node %s to topic %s: %v", node.nodeName, topic, err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
Debug("Node %s successfully subscribed to topic %s", node.nodeName, topic)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|||||||
@ -175,3 +175,43 @@ func (n *WakuNode) VerifyMessageReceived(expectedMessage *pb.WakuMessage, expect
|
|||||||
return errors.New("timeout: message not received within the given duration")
|
return errors.New("timeout: message not received within the given duration")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func ConnectAllPeers(nodes []*WakuNode) error {
|
||||||
|
if len(nodes) == 0 {
|
||||||
|
Error("Cannot connect peers: node list is empty")
|
||||||
|
return errors.New("node list is empty")
|
||||||
|
}
|
||||||
|
|
||||||
|
timeout := time.Duration(len(nodes)*2) * time.Second
|
||||||
|
Debug("Connecting nodes in a relay chain with timeout: %v", timeout)
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
for i := 0; i < len(nodes)-1; i++ {
|
||||||
|
Debug("Connecting node %d to node %d", i, i+1)
|
||||||
|
err := nodes[i].ConnectPeer(nodes[i+1])
|
||||||
|
if err != nil {
|
||||||
|
Error("Failed to connect node %d to node %d: %v", i, i+1, err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
<-ctx.Done()
|
||||||
|
Debug("Connections stabilized")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func SubscribeNodesToTopic(nodes []*WakuNode, topic string) error {
|
||||||
|
for _, node := range nodes {
|
||||||
|
Debug("Subscribing node %s to topic %s", node.nodeName, topic)
|
||||||
|
err := node.RelaySubscribe(topic)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
Error("Failed to subscribe node %s to topic %s: %v", node.nodeName, topic, err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
Debug("Node %s successfully subscribed to topic %s", node.nodeName, topic)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user