Merge pull request #98 from logos-messaging/adapt-for-nim-ffi

adapt nwaku.go to latest nim-ffi ctx callback and userData go first
This commit is contained in:
Darshan 2025-12-14 01:37:35 +05:30 committed by GitHub
commit c1dfce50bf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 158 additions and 108 deletions

View File

@ -5,7 +5,7 @@ Go bindings for the Waku library.
## Install
```
go get -u github.com/waku-org/waku-go-bindings
go get -u github.com/logos-messaging/logos-messaging-go-bindings
```
## Dependencies

2
go.mod
View File

@ -1,4 +1,4 @@
module github.com/waku-org/waku-go-bindings
module github.com/logos-messaging/logos-messaging-go-bindings
go 1.21

View File

@ -6,7 +6,7 @@ import (
"os"
"runtime"
"github.com/waku-org/waku-go-bindings/utils"
"github.com/logos-messaging/logos-messaging-go-bindings/utils"
)
var (

View File

@ -1,30 +1,53 @@
# Makefile for Waku Go Bindings
# Directories
THIRD_PARTY_DIR := $(shell pwd)/../third_party
NWAKU_REPO := https://github.com/waku-org/nwaku
NWAKU_DIR := $(THIRD_PARTY_DIR)/nwaku
# The LIBWAKU_LIB_PATH and LIBWAKU_HEADER_PATH env vars should be defined beforehand
# Therefore, we assume the libwaku library and headers are located in the specified paths
export CGO_CFLAGS="-I${LIBWAKU_HEADER_PATH}/"
export CGO_LDFLAGS="-L${LIBWAKU_LIB_PATH}/ -lsds -Wl,-rpath,${LIBWAKU_LIB_PATH}/"
.PHONY: all clean build-libwaku build
# Expected files
HEADER_FILE := $(LIBWAKU_HEADER_PATH)/libwaku.h
LIB_FILES := $(wildcard $(LIBWAKU_LIB_PATH)/libwaku.*)
.PHONY: all clean prepare build
# Default target
all: build
# Build libwaku
build-libwaku:
@echo "Building libwaku..."
@cd $(NWAKU_DIR) && make libwaku
# Validate necessary folders and files
check-folders:
@echo Checking libwaku header directory ...
@if [ -z "$(LIBWAKU_HEADER_PATH)" ]; then \
echo "ERROR: LIBWAKU_HEADER_PATH not set"; exit 1; \
fi
@if [ ! -d "$(LIBWAKU_HEADER_PATH)" ]; then \
echo "ERROR: Header path does not exist: $(LIBWAKU_HEADER_PATH)"; exit 1; \
fi
# Build Waku Go Bindings
@echo Checking libwaku lib directory ...
@if [ -z "$(LIBWAKU_LIB_PATH)" ]; then \
echo "ERROR: LIBWAKU_LIB_PATH not set"; exit 1; \
fi
@if [ ! -d "$(LIBWAKU_LIB_PATH)" ]; then \
echo "ERROR: Library path does not exist: $(LIBWAKU_LIB_PATH)"; exit 1; \
fi
build: export CGO_CFLAGS = "-I${NWAKU_DIR}/library/"
build: export CGO_LDFLAGS = "-L${NWAKU_DIR}/build/ -lwaku -L${NWAKU_DIR} -Wl,-rpath,${NWAKU_DIR}/build/"
build:
@echo Checking for libwaku.h ...
@if [ ! -f "$(HEADER_FILE)" ]; then \
echo "ERROR: libwaku.h not found at: $(HEADER_FILE)"; exit 1; \
fi
@echo Checking for libwaku library file ...
@if [ -z "$(LIB_FILES)" ]; then \
echo "ERROR: No libwaku library file found in: $(LIBWAKU_LIB_PATH)"; exit 1; \
fi
# Build SDS Go Bindings
build: check-folders
@echo "Building Waku Go Bindings..."
go build ./...
# Clean up generated files
clean:
@echo "Cleaning up..."
@rm -rf $(THIRD_PARTY_DIR)
@rm -f waku-go-bindings
@rm -f waku-go-bindings

View File

@ -4,8 +4,8 @@ import (
"testing"
"time"
"github.com/logos-messaging/logos-messaging-go-bindings/waku/common"
"github.com/stretchr/testify/require"
"github.com/waku-org/waku-go-bindings/waku/common"
)
func TestBasicWakuNodes(t *testing.T) {
@ -29,7 +29,7 @@ func TestBasicWakuNodes(t *testing.T) {
Debug("TestBasicWakuNodes completed successfully")
}
/* artifact https://github.com/waku-org/waku-go-bindings/issues/40 */
/* artifact https://github.com/logos-messaging/logos-messaging-go-bindings/issues/40 */
func TestNodeRestart(t *testing.T) {
t.Skip("Skipping test for open artifact ")
Debug("Starting TestNodeRestart")

View File

@ -55,32 +55,32 @@ package waku
static void* cGoWakuNew(const char* configJson, void* resp) {
// We pass NULL because we are not interested in retrieving data from this callback
void* ret = waku_new(configJson, (WakuCallBack) WakuGoCallback, resp);
void* ret = waku_new(configJson, (FFICallBack) WakuGoCallback, resp);
return ret;
}
static void cGoWakuStart(void* wakuCtx, void* resp) {
waku_start(wakuCtx, (WakuCallBack) WakuGoCallback, resp);
waku_start(wakuCtx, (FFICallBack) WakuGoCallback, resp);
}
static void cGoWakuStop(void* wakuCtx, void* resp) {
waku_stop(wakuCtx, (WakuCallBack) WakuGoCallback, resp);
waku_stop(wakuCtx, (FFICallBack) WakuGoCallback, resp);
}
static void cGoWakuDestroy(void* wakuCtx, void* resp) {
waku_destroy(wakuCtx, (WakuCallBack) WakuGoCallback, resp);
waku_destroy(wakuCtx, (FFICallBack) WakuGoCallback, resp);
}
static void cGoWakuStartDiscV5(void* wakuCtx, void* resp) {
waku_start_discv5(wakuCtx, (WakuCallBack) WakuGoCallback, resp);
waku_start_discv5(wakuCtx, (FFICallBack) WakuGoCallback, resp);
}
static void cGoWakuStopDiscV5(void* wakuCtx, void* resp) {
waku_stop_discv5(wakuCtx, (WakuCallBack) WakuGoCallback, resp);
waku_stop_discv5(wakuCtx, (FFICallBack) WakuGoCallback, resp);
}
static void cGoWakuVersion(void* wakuCtx, void* resp) {
waku_version(wakuCtx, (WakuCallBack) WakuGoCallback, resp);
waku_version(wakuCtx, (FFICallBack) WakuGoCallback, resp);
}
static void cGoWakuSetEventCallback(void* wakuCtx) {
@ -96,7 +96,7 @@ package waku
// This technique is needed because cgo only allows to export Go functions and not methods.
waku_set_event_callback(wakuCtx, (WakuCallBack) wakuGlobalEventCallback, wakuCtx);
set_event_callback(wakuCtx, (FFICallBack) wakuGlobalEventCallback, wakuCtx);
}
static void cGoWakuContentTopic(void* wakuCtx,
@ -107,20 +107,21 @@ package waku
void* resp) {
waku_content_topic(wakuCtx,
(FFICallBack) WakuGoCallback,
resp,
appName,
appVersion,
contentTopicName,
encoding,
(WakuCallBack) WakuGoCallback,
resp);
encoding
);
}
static void cGoWakuPubsubTopic(void* wakuCtx, char* topicName, void* resp) {
waku_pubsub_topic(wakuCtx, topicName, (WakuCallBack) WakuGoCallback, resp);
waku_pubsub_topic(wakuCtx, (FFICallBack) WakuGoCallback, resp, topicName);
}
static void cGoWakuDefaultPubsubTopic(void* wakuCtx, void* resp) {
waku_default_pubsub_topic(wakuCtx, (WakuCallBack) WakuGoCallback, resp);
waku_default_pubsub_topic(wakuCtx, (FFICallBack) WakuGoCallback, resp);
}
static void cGoWakuRelayPublish(void* wakuCtx,
@ -130,43 +131,48 @@ package waku
void* resp) {
waku_relay_publish(wakuCtx,
(FFICallBack) WakuGoCallback,
resp,
pubSubTopic,
jsonWakuMessage,
timeoutMs,
(WakuCallBack) WakuGoCallback,
resp);
timeoutMs
);
}
static void cGoWakuRelaySubscribe(void* wakuCtx, char* pubSubTopic, void* resp) {
waku_relay_subscribe(wakuCtx,
pubSubTopic,
(WakuCallBack) WakuGoCallback,
resp);
(FFICallBack) WakuGoCallback,
resp,
pubSubTopic
);
}
static void cGoWakuRelayAddProtectedShard(void* wakuCtx, int clusterId, int shardId, char* publicKey, void* resp) {
waku_relay_add_protected_shard(wakuCtx,
(FFICallBack) WakuGoCallback,
resp,
clusterId,
shardId,
publicKey,
(WakuCallBack) WakuGoCallback,
resp);
publicKey
);
}
static void cGoWakuRelayUnsubscribe(void* wakuCtx, char* pubSubTopic, void* resp) {
waku_relay_unsubscribe(wakuCtx,
pubSubTopic,
(WakuCallBack) WakuGoCallback,
resp);
(FFICallBack) WakuGoCallback,
resp,
pubSubTopic
);
}
static void cGoWakuConnect(void* wakuCtx, char* peerMultiAddr, int timeoutMs, void* resp) {
waku_connect(wakuCtx,
(FFICallBack) WakuGoCallback,
resp,
peerMultiAddr,
timeoutMs,
(WakuCallBack) WakuGoCallback,
resp);
timeoutMs
);
}
static void cGoWakuDialPeer(void* wakuCtx,
@ -176,11 +182,12 @@ package waku
void* resp) {
waku_dial_peer(wakuCtx,
(FFICallBack) WakuGoCallback,
resp,
peerMultiAddr,
protocol,
timeoutMs,
(WakuCallBack) WakuGoCallback,
resp);
timeoutMs
);
}
static void cGoWakuDialPeerById(void* wakuCtx,
@ -190,68 +197,70 @@ package waku
void* resp) {
waku_dial_peer_by_id(wakuCtx,
(FFICallBack) WakuGoCallback,
resp,
peerId,
protocol,
timeoutMs,
(WakuCallBack) WakuGoCallback,
resp);
timeoutMs
);
}
static void cGoWakuDisconnectPeerById(void* wakuCtx, char* peerId, void* resp) {
waku_disconnect_peer_by_id(wakuCtx,
peerId,
(WakuCallBack) WakuGoCallback,
resp);
(FFICallBack) WakuGoCallback,
resp,
peerId
);
}
static void cGoWakuDisconnectAllPeers(void* wakuCtx, void* resp) {
waku_disconnect_all_peers(wakuCtx,
(WakuCallBack) WakuGoCallback,
(FFICallBack) WakuGoCallback,
resp);
}
static void cGoWakuListenAddresses(void* wakuCtx, void* resp) {
waku_listen_addresses(wakuCtx, (WakuCallBack) WakuGoCallback, resp);
waku_listen_addresses(wakuCtx, (FFICallBack) WakuGoCallback, resp);
}
static void cGoWakuGetMyENR(void* ctx, void* resp) {
waku_get_my_enr(ctx, (WakuCallBack) WakuGoCallback, resp);
waku_get_my_enr(ctx, (FFICallBack) WakuGoCallback, resp);
}
static void cGoWakuGetMyPeerId(void* ctx, void* resp) {
waku_get_my_peerid(ctx, (WakuCallBack) WakuGoCallback, resp);
waku_get_my_peerid(ctx, (FFICallBack) WakuGoCallback, resp);
}
static void cGoWakuPingPeer(void* ctx, char* peerAddr, int timeoutMs, void* resp) {
waku_ping_peer(ctx, peerAddr, timeoutMs, (WakuCallBack) WakuGoCallback, resp);
waku_ping_peer(ctx, (FFICallBack) WakuGoCallback, resp, peerAddr, timeoutMs);
}
static void cGoWakuGetPeersInMesh(void* ctx, char* pubSubTopic, void* resp) {
waku_relay_get_peers_in_mesh(ctx, pubSubTopic, (WakuCallBack) WakuGoCallback, resp);
waku_relay_get_peers_in_mesh(ctx, (FFICallBack) WakuGoCallback, resp, pubSubTopic);
}
static void cGoWakuGetNumPeersInMesh(void* ctx, char* pubSubTopic, void* resp) {
waku_relay_get_num_peers_in_mesh(ctx, pubSubTopic, (WakuCallBack) WakuGoCallback, resp);
waku_relay_get_num_peers_in_mesh(ctx, (FFICallBack) WakuGoCallback, resp, pubSubTopic);
}
static void cGoWakuGetNumConnectedRelayPeers(void* ctx, char* pubSubTopic, void* resp) {
waku_relay_get_num_connected_peers(ctx, pubSubTopic, (WakuCallBack) WakuGoCallback, resp);
waku_relay_get_num_connected_peers(ctx, (FFICallBack) WakuGoCallback, resp, pubSubTopic);
}
static void cGoWakuGetConnectedRelayPeers(void* ctx, char* pubSubTopic, void* resp) {
waku_relay_get_connected_peers(ctx, pubSubTopic, (WakuCallBack) WakuGoCallback, resp);
waku_relay_get_connected_peers(ctx, (FFICallBack) WakuGoCallback, resp, pubSubTopic);
}
static void cGoWakuGetConnectedPeers(void* wakuCtx, void* resp) {
waku_get_connected_peers(wakuCtx, (WakuCallBack) WakuGoCallback, resp);
waku_get_connected_peers(wakuCtx, (FFICallBack) WakuGoCallback, resp);
}
static void cGoWakuGetPeerIdsFromPeerStore(void* wakuCtx, void* resp) {
waku_get_peerids_from_peerstore(wakuCtx, (WakuCallBack) WakuGoCallback, resp);
waku_get_peerids_from_peerstore(wakuCtx, (FFICallBack) WakuGoCallback, resp);
}
static void cGoWakuGetConnectedPeersInfo(void* wakuCtx, void* resp) {
waku_get_connected_peers_info(wakuCtx, (WakuCallBack) WakuGoCallback, resp);
waku_get_connected_peers_info(wakuCtx, (FFICallBack) WakuGoCallback, resp);
}
static void cGoWakuLightpushPublish(void* wakuCtx,
@ -260,10 +269,11 @@ package waku
void* resp) {
waku_lightpush_publish(wakuCtx,
(FFICallBack) WakuGoCallback,
resp,
pubSubTopic,
jsonWakuMessage,
(WakuCallBack) WakuGoCallback,
resp);
jsonWakuMessage
);
}
static void cGoWakuStoreQuery(void* wakuCtx,
@ -273,11 +283,12 @@ package waku
void* resp) {
waku_store_query(wakuCtx,
(FFICallBack) WakuGoCallback,
resp,
jsonQuery,
peerAddr,
timeoutMs,
(WakuCallBack) WakuGoCallback,
resp);
timeoutMs
);
}
static void cGoWakuPeerExchangeQuery(void* wakuCtx,
@ -285,9 +296,10 @@ package waku
void* resp) {
waku_peer_exchange_request(wakuCtx,
numPeers,
(WakuCallBack) WakuGoCallback,
resp);
(FFICallBack) WakuGoCallback,
resp,
numPeers
);
}
static void cGoWakuGetPeerIdsByProtocol(void* wakuCtx,
@ -295,9 +307,10 @@ package waku
void* resp) {
waku_get_peerids_by_protocol(wakuCtx,
protocol,
(WakuCallBack) WakuGoCallback,
resp);
(FFICallBack) WakuGoCallback,
resp,
protocol
);
}
static void cGoWakuDnsDiscovery(void* wakuCtx,
@ -307,19 +320,20 @@ package waku
void* resp) {
waku_dns_discovery(wakuCtx,
(FFICallBack) WakuGoCallback,
resp,
entTreeUrl,
nameDnsServer,
timeoutMs,
(WakuCallBack) WakuGoCallback,
resp);
timeoutMs
);
}
static void cGoWakuIsOnline(void* wakuCtx, void* resp) {
waku_is_online(wakuCtx, (WakuCallBack) WakuGoCallback, resp);
waku_is_online(wakuCtx, (FFICallBack) WakuGoCallback, resp);
}
static void cGoWakuGetMetrics(void* wakuCtx, void* resp) {
waku_get_metrics(wakuCtx, (WakuCallBack) WakuGoCallback, resp);
waku_get_metrics(wakuCtx, (FFICallBack) WakuGoCallback, resp);
}
*/
@ -338,6 +352,8 @@ import (
"time"
"unsafe"
"github.com/logos-messaging/logos-messaging-go-bindings/waku/timesource"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/libp2p/go-libp2p/core/peer"
@ -346,7 +362,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/utils"
"github.com/waku-org/waku-go-bindings/waku/common"
"github.com/logos-messaging/logos-messaging-go-bindings/waku/common"
)
const requestTimeout = 30 * time.Second
@ -374,6 +390,7 @@ type WakuNode struct {
TopicHealthChan chan topicHealth
ConnectionChangeChan chan connectionChange
nodeName string
_ timesource.Timesource
}
func NewWakuNode(config *common.WakuConfig, nodeName string) (*WakuNode, error) {

View File

@ -13,11 +13,11 @@ import (
"github.com/cenkalti/backoff/v3"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/logos-messaging/logos-messaging-go-bindings/waku/common"
ma "github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
"github.com/waku-org/waku-go-bindings/waku/common"
)
// In order to run this test, you must run an nwaku node

View File

@ -16,9 +16,9 @@ import (
"github.com/cenkalti/backoff/v3"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/logos-messaging/logos-messaging-go-bindings/utils"
"github.com/logos-messaging/logos-messaging-go-bindings/waku/common"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/waku-go-bindings/utils"
"github.com/waku-org/waku-go-bindings/waku/common"
"google.golang.org/protobuf/proto"
)
@ -296,9 +296,9 @@ func captureMemory(testName, phase string) {
runtime.ReadMemStats(&ms)
heapKB := ms.HeapAlloc / 1024
rssKB, _ := utils.GetRSSKB()
rssKB, _ := utils.GetRSSKB()
Debug("[%s] Memory usage (%s): %d KB (RSS %d KB)", testName, phase, heapKB, rssKB)
_ = recordMemoryMetricsPX(testName, phase, heapKB, rssKB)
}
}

View File

@ -5,8 +5,8 @@ import (
"testing"
"time"
"github.com/logos-messaging/logos-messaging-go-bindings/waku/common"
"github.com/stretchr/testify/require"
"github.com/waku-org/waku-go-bindings/waku/common"
)
// Test node connect & disconnect peers

View File

@ -7,9 +7,9 @@ import (
"time"
"github.com/cenkalti/backoff/v3"
"github.com/logos-messaging/logos-messaging-go-bindings/waku/common"
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/waku-go-bindings/waku/common"
"google.golang.org/protobuf/proto"
)

View File

@ -9,7 +9,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/waku-go-bindings/waku/common"
"github.com/logos-messaging/logos-messaging-go-bindings/waku/common"
"google.golang.org/protobuf/proto"
)

View File

@ -10,8 +10,8 @@ import (
"testing"
"time"
"github.com/logos-messaging/logos-messaging-go-bindings/waku/common"
"github.com/stretchr/testify/require"
"github.com/waku-org/waku-go-bindings/waku/common"
// "go.uber.org/zap/zapcore"
"google.golang.org/protobuf/proto"
@ -107,9 +107,9 @@ func TestStressStoreQuery5kMessagesWithPagination(t *testing.T) {
if i%10 == 0 {
storeQueryRequest := &common.StoreQueryRequest{
TimeStart: queryTimestamp,
IncludeData: true,
PaginationLimit: proto.Uint64(50),
TimeStart: queryTimestamp,
IncludeData: true,
PaginationLimit: proto.Uint64(50),
PaginationForward: false,
}
@ -117,7 +117,7 @@ func TestStressStoreQuery5kMessagesWithPagination(t *testing.T) {
require.NoError(t, err, "Failed to query store messages")
require.Greater(t, len(*storedmsgs.Messages), 0, "Expected at least one stored message")
}
Debug("##Iteration #%d",i)
Debug("##Iteration #%d", i)
}
captureMemory(t.Name(), "at end")
@ -144,8 +144,8 @@ func TestStressHighThroughput10kPublish(t *testing.T) {
captureMemory(t.Name(), "at start")
const totalMessages = 1000
var pubsubTopic = DefaultPubsubTopic
const totalMessages = 1000
var pubsubTopic = DefaultPubsubTopic
for i := 0; i < totalMessages; i++ {
msg := node1.CreateMessage()
@ -153,17 +153,15 @@ func TestStressHighThroughput10kPublish(t *testing.T) {
hash, err := node1.RelayPublishNoCTX(pubsubTopic, msg)
require.NoError(t, err, "publish failed @%d", i)
Debug("Iteration-10kpublish #%d",i)
err = node2.VerifyMessageReceived(msg, hash )
Debug("Iteration-10kpublish #%d", i)
err = node2.VerifyMessageReceived(msg, hash)
require.NoError(t, err, "verification failed @%d", i)
}
captureMemory(t.Name(), "at end")
}
func TestStressConnectDisconnect1kIteration(t *testing.T) {
captureMemory(t.Name(), "at start")
@ -342,7 +340,7 @@ func TestStressLargePayloadEphemeralMessagesEndurance(t *testing.T) {
func TestStress2Nodes2kIterationTearDown(t *testing.T) {
captureMemory(t.Name(), "at start")
var err error
var err error
totalIterations := 2000
for i := 1; i <= totalIterations; i++ {
var nodes []*WakuNode

View File

@ -4,7 +4,7 @@ import (
"fmt"
"time"
"github.com/waku-org/waku-go-bindings/waku/common"
"github.com/logos-messaging/logos-messaging-go-bindings/waku/common"
"google.golang.org/protobuf/proto"
)

View File

@ -0,0 +1,12 @@
package timesource
import (
"context"
"time"
)
type Timesource interface {
Now() time.Time
Start(ctx context.Context) error
Stop()
}