From 8c92ab1daf64fa31c51dcd6e1c10aa469961f9a1 Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Fri, 12 Dec 2025 17:41:02 +0100 Subject: [PATCH 1/7] adapt nwaku.go to latest nim-ffi ctx callback and userData go first --- waku/nwaku.go | 146 +++++++++++++++++++++++++++----------------------- 1 file changed, 80 insertions(+), 66 deletions(-) diff --git a/waku/nwaku.go b/waku/nwaku.go index 5fa6287..fe7f315 100644 --- a/waku/nwaku.go +++ b/waku/nwaku.go @@ -55,32 +55,32 @@ package waku static void* cGoWakuNew(const char* configJson, void* resp) { // We pass NULL because we are not interested in retrieving data from this callback - void* ret = waku_new(configJson, (WakuCallBack) WakuGoCallback, resp); + void* ret = waku_new(configJson, (FFICallBack) WakuGoCallback, resp); return ret; } static void cGoWakuStart(void* wakuCtx, void* resp) { - waku_start(wakuCtx, (WakuCallBack) WakuGoCallback, resp); + waku_start(wakuCtx, (FFICallBack) WakuGoCallback, resp); } static void cGoWakuStop(void* wakuCtx, void* resp) { - waku_stop(wakuCtx, (WakuCallBack) WakuGoCallback, resp); + waku_stop(wakuCtx, (FFICallBack) WakuGoCallback, resp); } static void cGoWakuDestroy(void* wakuCtx, void* resp) { - waku_destroy(wakuCtx, (WakuCallBack) WakuGoCallback, resp); + waku_destroy(wakuCtx, (FFICallBack) WakuGoCallback, resp); } static void cGoWakuStartDiscV5(void* wakuCtx, void* resp) { - waku_start_discv5(wakuCtx, (WakuCallBack) WakuGoCallback, resp); + waku_start_discv5(wakuCtx, (FFICallBack) WakuGoCallback, resp); } static void cGoWakuStopDiscV5(void* wakuCtx, void* resp) { - waku_stop_discv5(wakuCtx, (WakuCallBack) WakuGoCallback, resp); + waku_stop_discv5(wakuCtx, (FFICallBack) WakuGoCallback, resp); } static void cGoWakuVersion(void* wakuCtx, void* resp) { - waku_version(wakuCtx, (WakuCallBack) WakuGoCallback, resp); + waku_version(wakuCtx, (FFICallBack) WakuGoCallback, resp); } static void cGoWakuSetEventCallback(void* wakuCtx) { @@ -96,7 +96,7 @@ package waku // This technique is needed because cgo only allows to export Go functions and not methods. - waku_set_event_callback(wakuCtx, (WakuCallBack) wakuGlobalEventCallback, wakuCtx); + set_event_callback(wakuCtx, (FFICallBack) wakuGlobalEventCallback, wakuCtx); } static void cGoWakuContentTopic(void* wakuCtx, @@ -107,20 +107,21 @@ package waku void* resp) { waku_content_topic(wakuCtx, + (FFICallBack) WakuGoCallback, + resp, appName, appVersion, contentTopicName, - encoding, - (WakuCallBack) WakuGoCallback, - resp); + encoding + ); } static void cGoWakuPubsubTopic(void* wakuCtx, char* topicName, void* resp) { - waku_pubsub_topic(wakuCtx, topicName, (WakuCallBack) WakuGoCallback, resp); + waku_pubsub_topic(wakuCtx, (FFICallBack) WakuGoCallback, resp, topicName); } static void cGoWakuDefaultPubsubTopic(void* wakuCtx, void* resp) { - waku_default_pubsub_topic(wakuCtx, (WakuCallBack) WakuGoCallback, resp); + waku_default_pubsub_topic(wakuCtx, (FFICallBack) WakuGoCallback, resp); } static void cGoWakuRelayPublish(void* wakuCtx, @@ -130,43 +131,48 @@ package waku void* resp) { waku_relay_publish(wakuCtx, + (FFICallBack) WakuGoCallback, + resp, pubSubTopic, jsonWakuMessage, - timeoutMs, - (WakuCallBack) WakuGoCallback, - resp); + timeoutMs + ); } static void cGoWakuRelaySubscribe(void* wakuCtx, char* pubSubTopic, void* resp) { waku_relay_subscribe(wakuCtx, - pubSubTopic, - (WakuCallBack) WakuGoCallback, - resp); + (FFICallBack) WakuGoCallback, + resp, + pubSubTopic + ); } static void cGoWakuRelayAddProtectedShard(void* wakuCtx, int clusterId, int shardId, char* publicKey, void* resp) { waku_relay_add_protected_shard(wakuCtx, + (FFICallBack) WakuGoCallback, + resp, clusterId, shardId, - publicKey, - (WakuCallBack) WakuGoCallback, - resp); + publicKey + ); } static void cGoWakuRelayUnsubscribe(void* wakuCtx, char* pubSubTopic, void* resp) { waku_relay_unsubscribe(wakuCtx, - pubSubTopic, - (WakuCallBack) WakuGoCallback, - resp); + (FFICallBack) WakuGoCallback, + resp, + pubSubTopic + ); } static void cGoWakuConnect(void* wakuCtx, char* peerMultiAddr, int timeoutMs, void* resp) { waku_connect(wakuCtx, + (FFICallBack) WakuGoCallback, + resp, peerMultiAddr, - timeoutMs, - (WakuCallBack) WakuGoCallback, - resp); + timeoutMs + ); } static void cGoWakuDialPeer(void* wakuCtx, @@ -176,11 +182,12 @@ package waku void* resp) { waku_dial_peer(wakuCtx, + (FFICallBack) WakuGoCallback, + resp, peerMultiAddr, protocol, - timeoutMs, - (WakuCallBack) WakuGoCallback, - resp); + timeoutMs + ); } static void cGoWakuDialPeerById(void* wakuCtx, @@ -190,68 +197,70 @@ package waku void* resp) { waku_dial_peer_by_id(wakuCtx, + (FFICallBack) WakuGoCallback, + resp, peerId, protocol, - timeoutMs, - (WakuCallBack) WakuGoCallback, - resp); + timeoutMs + ); } static void cGoWakuDisconnectPeerById(void* wakuCtx, char* peerId, void* resp) { waku_disconnect_peer_by_id(wakuCtx, - peerId, - (WakuCallBack) WakuGoCallback, - resp); + (FFICallBack) WakuGoCallback, + resp, + peerId + ); } static void cGoWakuDisconnectAllPeers(void* wakuCtx, void* resp) { waku_disconnect_all_peers(wakuCtx, - (WakuCallBack) WakuGoCallback, + (FFICallBack) WakuGoCallback, resp); } static void cGoWakuListenAddresses(void* wakuCtx, void* resp) { - waku_listen_addresses(wakuCtx, (WakuCallBack) WakuGoCallback, resp); + waku_listen_addresses(wakuCtx, (FFICallBack) WakuGoCallback, resp); } static void cGoWakuGetMyENR(void* ctx, void* resp) { - waku_get_my_enr(ctx, (WakuCallBack) WakuGoCallback, resp); + waku_get_my_enr(ctx, (FFICallBack) WakuGoCallback, resp); } static void cGoWakuGetMyPeerId(void* ctx, void* resp) { - waku_get_my_peerid(ctx, (WakuCallBack) WakuGoCallback, resp); + waku_get_my_peerid(ctx, (FFICallBack) WakuGoCallback, resp); } static void cGoWakuPingPeer(void* ctx, char* peerAddr, int timeoutMs, void* resp) { - waku_ping_peer(ctx, peerAddr, timeoutMs, (WakuCallBack) WakuGoCallback, resp); + waku_ping_peer(ctx, (FFICallBack) WakuGoCallback, resp, peerAddr, timeoutMs); } static void cGoWakuGetPeersInMesh(void* ctx, char* pubSubTopic, void* resp) { - waku_relay_get_peers_in_mesh(ctx, pubSubTopic, (WakuCallBack) WakuGoCallback, resp); + waku_relay_get_peers_in_mesh(ctx, (FFICallBack) WakuGoCallback, resp, pubSubTopic); } static void cGoWakuGetNumPeersInMesh(void* ctx, char* pubSubTopic, void* resp) { - waku_relay_get_num_peers_in_mesh(ctx, pubSubTopic, (WakuCallBack) WakuGoCallback, resp); + waku_relay_get_num_peers_in_mesh(ctx, (FFICallBack) WakuGoCallback, resp, pubSubTopic); } static void cGoWakuGetNumConnectedRelayPeers(void* ctx, char* pubSubTopic, void* resp) { - waku_relay_get_num_connected_peers(ctx, pubSubTopic, (WakuCallBack) WakuGoCallback, resp); + waku_relay_get_num_connected_peers(ctx, (FFICallBack) WakuGoCallback, resp, pubSubTopic); } static void cGoWakuGetConnectedRelayPeers(void* ctx, char* pubSubTopic, void* resp) { - waku_relay_get_connected_peers(ctx, pubSubTopic, (WakuCallBack) WakuGoCallback, resp); + waku_relay_get_connected_peers(ctx, (FFICallBack) WakuGoCallback, resp, pubSubTopic); } static void cGoWakuGetConnectedPeers(void* wakuCtx, void* resp) { - waku_get_connected_peers(wakuCtx, (WakuCallBack) WakuGoCallback, resp); + waku_get_connected_peers(wakuCtx, (FFICallBack) WakuGoCallback, resp); } static void cGoWakuGetPeerIdsFromPeerStore(void* wakuCtx, void* resp) { - waku_get_peerids_from_peerstore(wakuCtx, (WakuCallBack) WakuGoCallback, resp); + waku_get_peerids_from_peerstore(wakuCtx, (FFICallBack) WakuGoCallback, resp); } static void cGoWakuGetConnectedPeersInfo(void* wakuCtx, void* resp) { - waku_get_connected_peers_info(wakuCtx, (WakuCallBack) WakuGoCallback, resp); + waku_get_connected_peers_info(wakuCtx, (FFICallBack) WakuGoCallback, resp); } static void cGoWakuLightpushPublish(void* wakuCtx, @@ -260,10 +269,11 @@ package waku void* resp) { waku_lightpush_publish(wakuCtx, + (FFICallBack) WakuGoCallback, + resp, pubSubTopic, - jsonWakuMessage, - (WakuCallBack) WakuGoCallback, - resp); + jsonWakuMessage + ); } static void cGoWakuStoreQuery(void* wakuCtx, @@ -273,11 +283,12 @@ package waku void* resp) { waku_store_query(wakuCtx, + (FFICallBack) WakuGoCallback, + resp, jsonQuery, peerAddr, - timeoutMs, - (WakuCallBack) WakuGoCallback, - resp); + timeoutMs + ); } static void cGoWakuPeerExchangeQuery(void* wakuCtx, @@ -285,9 +296,10 @@ package waku void* resp) { waku_peer_exchange_request(wakuCtx, - numPeers, - (WakuCallBack) WakuGoCallback, - resp); + (FFICallBack) WakuGoCallback, + resp, + numPeers + ); } static void cGoWakuGetPeerIdsByProtocol(void* wakuCtx, @@ -295,9 +307,10 @@ package waku void* resp) { waku_get_peerids_by_protocol(wakuCtx, - protocol, - (WakuCallBack) WakuGoCallback, - resp); + (FFICallBack) WakuGoCallback, + resp, + protocol + ); } static void cGoWakuDnsDiscovery(void* wakuCtx, @@ -307,19 +320,20 @@ package waku void* resp) { waku_dns_discovery(wakuCtx, + (FFICallBack) WakuGoCallback, + resp, entTreeUrl, nameDnsServer, - timeoutMs, - (WakuCallBack) WakuGoCallback, - resp); + timeoutMs + ); } static void cGoWakuIsOnline(void* wakuCtx, void* resp) { - waku_is_online(wakuCtx, (WakuCallBack) WakuGoCallback, resp); + waku_is_online(wakuCtx, (FFICallBack) WakuGoCallback, resp); } static void cGoWakuGetMetrics(void* wakuCtx, void* resp) { - waku_get_metrics(wakuCtx, (WakuCallBack) WakuGoCallback, resp); + waku_get_metrics(wakuCtx, (FFICallBack) WakuGoCallback, resp); } */ From 9a9a9e5c88c8ece454418ead4444eb00d98a3615 Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Fri, 12 Dec 2025 22:27:02 +0100 Subject: [PATCH 2/7] update go module name --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index dcfaad8..bbfeb1d 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module github.com/waku-org/waku-go-bindings +module github.com/logos-messaging/logos-messaging-go-bindings go 1.21 From 3d5e8e550beb7d93d42fdaeee7be0baa6f074d26 Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Fri, 12 Dec 2025 22:48:53 +0100 Subject: [PATCH 3/7] chore: waku-go-bindings -> logos-messaging-go-bindings --- README.md | 2 +- tools/memory_record.go | 2 +- waku/nodes_basic_test.go | 4 ++-- waku/nwaku.go | 2 +- waku/nwaku_test.go | 2 +- waku/nwaku_test_utils.go | 8 ++++---- waku/peer_connections_test.go | 2 +- waku/relay_test.go | 2 +- waku/store_test.go | 2 +- waku/stress_test.go | 22 ++++++++++------------ waku/test_data.go | 2 +- 11 files changed, 24 insertions(+), 26 deletions(-) diff --git a/README.md b/README.md index 77b812d..4496047 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ Go bindings for the Waku library. ## Install ``` -go get -u github.com/waku-org/waku-go-bindings +go get -u github.com/logos-messaging/logos-messaging-go-bindings ``` ## Dependencies diff --git a/tools/memory_record.go b/tools/memory_record.go index 6ad63af..540a72a 100644 --- a/tools/memory_record.go +++ b/tools/memory_record.go @@ -6,7 +6,7 @@ import ( "os" "runtime" - "github.com/waku-org/waku-go-bindings/utils" + "github.com/logos-messaging/logos-messaging-go-bindings/utils" ) var ( diff --git a/waku/nodes_basic_test.go b/waku/nodes_basic_test.go index f486945..93cba02 100644 --- a/waku/nodes_basic_test.go +++ b/waku/nodes_basic_test.go @@ -4,8 +4,8 @@ import ( "testing" "time" + "github.com/logos-messaging/logos-messaging-go-bindings/waku/common" "github.com/stretchr/testify/require" - "github.com/waku-org/waku-go-bindings/waku/common" ) func TestBasicWakuNodes(t *testing.T) { @@ -29,7 +29,7 @@ func TestBasicWakuNodes(t *testing.T) { Debug("TestBasicWakuNodes completed successfully") } -/* artifact https://github.com/waku-org/waku-go-bindings/issues/40 */ +/* artifact https://github.com/logos-messaging/logos-messaging-go-bindings/issues/40 */ func TestNodeRestart(t *testing.T) { t.Skip("Skipping test for open artifact ") Debug("Starting TestNodeRestart") diff --git a/waku/nwaku.go b/waku/nwaku.go index fe7f315..28e2bd9 100644 --- a/waku/nwaku.go +++ b/waku/nwaku.go @@ -360,7 +360,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/utils" - "github.com/waku-org/waku-go-bindings/waku/common" + "github.com/logos-messaging/logos-messaging-go-bindings/waku/common" ) const requestTimeout = 30 * time.Second diff --git a/waku/nwaku_test.go b/waku/nwaku_test.go index 61a7fcd..59ccb09 100644 --- a/waku/nwaku_test.go +++ b/waku/nwaku_test.go @@ -13,11 +13,11 @@ import ( "github.com/cenkalti/backoff/v3" "github.com/libp2p/go-libp2p/core/peer" + "github.com/logos-messaging/logos-messaging-go-bindings/waku/common" ma "github.com/multiformats/go-multiaddr" "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/store" - "github.com/waku-org/waku-go-bindings/waku/common" ) // In order to run this test, you must run an nwaku node diff --git a/waku/nwaku_test_utils.go b/waku/nwaku_test_utils.go index 4da8272..75c07c0 100644 --- a/waku/nwaku_test_utils.go +++ b/waku/nwaku_test_utils.go @@ -16,9 +16,9 @@ import ( "github.com/cenkalti/backoff/v3" "github.com/libp2p/go-libp2p/core/peer" + "github.com/logos-messaging/logos-messaging-go-bindings/utils" + "github.com/logos-messaging/logos-messaging-go-bindings/waku/common" "github.com/waku-org/go-waku/waku/v2/protocol/pb" - "github.com/waku-org/waku-go-bindings/utils" - "github.com/waku-org/waku-go-bindings/waku/common" "google.golang.org/protobuf/proto" ) @@ -296,9 +296,9 @@ func captureMemory(testName, phase string) { runtime.ReadMemStats(&ms) heapKB := ms.HeapAlloc / 1024 - rssKB, _ := utils.GetRSSKB() + rssKB, _ := utils.GetRSSKB() Debug("[%s] Memory usage (%s): %d KB (RSS %d KB)", testName, phase, heapKB, rssKB) _ = recordMemoryMetricsPX(testName, phase, heapKB, rssKB) -} \ No newline at end of file +} diff --git a/waku/peer_connections_test.go b/waku/peer_connections_test.go index c18bcea..fad825c 100644 --- a/waku/peer_connections_test.go +++ b/waku/peer_connections_test.go @@ -5,8 +5,8 @@ import ( "testing" "time" + "github.com/logos-messaging/logos-messaging-go-bindings/waku/common" "github.com/stretchr/testify/require" - "github.com/waku-org/waku-go-bindings/waku/common" ) // Test node connect & disconnect peers diff --git a/waku/relay_test.go b/waku/relay_test.go index 4c4f5f9..414bbbe 100644 --- a/waku/relay_test.go +++ b/waku/relay_test.go @@ -7,9 +7,9 @@ import ( "time" "github.com/cenkalti/backoff/v3" + "github.com/logos-messaging/logos-messaging-go-bindings/waku/common" "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/waku/v2/protocol/pb" - "github.com/waku-org/waku-go-bindings/waku/common" "google.golang.org/protobuf/proto" ) diff --git a/waku/store_test.go b/waku/store_test.go index 2609c8b..befedc4 100644 --- a/waku/store_test.go +++ b/waku/store_test.go @@ -9,7 +9,7 @@ import ( "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/waku/v2/protocol/pb" - "github.com/waku-org/waku-go-bindings/waku/common" + "github.com/logos-messaging/logos-messaging-go-bindings/waku/common" "google.golang.org/protobuf/proto" ) diff --git a/waku/stress_test.go b/waku/stress_test.go index ceea537..142df92 100644 --- a/waku/stress_test.go +++ b/waku/stress_test.go @@ -10,8 +10,8 @@ import ( "testing" "time" + "github.com/logos-messaging/logos-messaging-go-bindings/waku/common" "github.com/stretchr/testify/require" - "github.com/waku-org/waku-go-bindings/waku/common" // "go.uber.org/zap/zapcore" "google.golang.org/protobuf/proto" @@ -107,9 +107,9 @@ func TestStressStoreQuery5kMessagesWithPagination(t *testing.T) { if i%10 == 0 { storeQueryRequest := &common.StoreQueryRequest{ - TimeStart: queryTimestamp, - IncludeData: true, - PaginationLimit: proto.Uint64(50), + TimeStart: queryTimestamp, + IncludeData: true, + PaginationLimit: proto.Uint64(50), PaginationForward: false, } @@ -117,7 +117,7 @@ func TestStressStoreQuery5kMessagesWithPagination(t *testing.T) { require.NoError(t, err, "Failed to query store messages") require.Greater(t, len(*storedmsgs.Messages), 0, "Expected at least one stored message") } - Debug("##Iteration #%d",i) + Debug("##Iteration #%d", i) } captureMemory(t.Name(), "at end") @@ -144,8 +144,8 @@ func TestStressHighThroughput10kPublish(t *testing.T) { captureMemory(t.Name(), "at start") - const totalMessages = 1000 - var pubsubTopic = DefaultPubsubTopic + const totalMessages = 1000 + var pubsubTopic = DefaultPubsubTopic for i := 0; i < totalMessages; i++ { msg := node1.CreateMessage() @@ -153,17 +153,15 @@ func TestStressHighThroughput10kPublish(t *testing.T) { hash, err := node1.RelayPublishNoCTX(pubsubTopic, msg) require.NoError(t, err, "publish failed @%d", i) - Debug("Iteration-10kpublish #%d",i) - err = node2.VerifyMessageReceived(msg, hash ) + Debug("Iteration-10kpublish #%d", i) + err = node2.VerifyMessageReceived(msg, hash) require.NoError(t, err, "verification failed @%d", i) - } captureMemory(t.Name(), "at end") } - func TestStressConnectDisconnect1kIteration(t *testing.T) { captureMemory(t.Name(), "at start") @@ -342,7 +340,7 @@ func TestStressLargePayloadEphemeralMessagesEndurance(t *testing.T) { func TestStress2Nodes2kIterationTearDown(t *testing.T) { captureMemory(t.Name(), "at start") - var err error + var err error totalIterations := 2000 for i := 1; i <= totalIterations; i++ { var nodes []*WakuNode diff --git a/waku/test_data.go b/waku/test_data.go index 920713c..463e17e 100644 --- a/waku/test_data.go +++ b/waku/test_data.go @@ -4,7 +4,7 @@ import ( "fmt" "time" - "github.com/waku-org/waku-go-bindings/waku/common" + "github.com/logos-messaging/logos-messaging-go-bindings/waku/common" "google.golang.org/protobuf/proto" ) From 5ec2ab11de075000c453741e9067f377c920a8fc Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Tue, 25 Nov 2025 17:49:42 +0100 Subject: [PATCH 4/7] waku/Makefile now assumes lib already exists --- waku/Makefile | 53 ++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 38 insertions(+), 15 deletions(-) diff --git a/waku/Makefile b/waku/Makefile index b0f68a9..71bcae5 100644 --- a/waku/Makefile +++ b/waku/Makefile @@ -1,30 +1,53 @@ # Makefile for Waku Go Bindings -# Directories -THIRD_PARTY_DIR := $(shell pwd)/../third_party -NWAKU_REPO := https://github.com/waku-org/nwaku -NWAKU_DIR := $(THIRD_PARTY_DIR)/nwaku +# The LIBWAKU_LIB_PATH and LIBWAKU_HEADER_PATH env vars should be defined beforehand +# Therefore, we assume the libwaku library and headers are located in the specified paths +export CGO_CFLAGS="-I${LIBWAKU_HEADER_PATH}/" +export CGO_LDFLAGS="-L${LIBWAKU_LIB_PATH}/ -lsds -Wl,-rpath,${LIBWAKU_LIB_PATH}/" -.PHONY: all clean build-libwaku build +# Expected files +HEADER_FILE := $(LIBWAKU_HEADER_PATH)/libwaku.h +LIB_FILES := $(wildcard $(LIBWAKU_LIB_PATH)/libwaku.*) + +.PHONY: all clean prepare build # Default target all: build -# Build libwaku -build-libwaku: - @echo "Building libwaku..." - @cd $(NWAKU_DIR) && make libwaku +# Validate necessary folders and files +check-folders: + @echo Checking libwaku header directory ... + @if [ -z "$(LIBWAKU_HEADER_PATH)" ]; then \ + echo "ERROR: LIBWAKU_HEADER_PATH not set"; exit 1; \ + fi + @if [ ! -d "$(LIBWAKU_HEADER_PATH)" ]; then \ + echo "ERROR: Header path does not exist: $(LIBWAKU_HEADER_PATH)"; exit 1; \ + fi -# Build Waku Go Bindings + @echo Checking libwaku lib directory ... + @if [ -z "$(LIBWAKU_LIB_PATH)" ]; then \ + echo "ERROR: LIBWAKU_LIB_PATH not set"; exit 1; \ + fi + @if [ ! -d "$(LIBWAKU_LIB_PATH)" ]; then \ + echo "ERROR: Library path does not exist: $(LIBWAKU_LIB_PATH)"; exit 1; \ + fi -build: export CGO_CFLAGS = "-I${NWAKU_DIR}/library/" -build: export CGO_LDFLAGS = "-L${NWAKU_DIR}/build/ -lwaku -L${NWAKU_DIR} -Wl,-rpath,${NWAKU_DIR}/build/" -build: + @echo Checking for libwaku.h ... + @if [ ! -f "$(HEADER_FILE)" ]; then \ + echo "ERROR: libwaku.h not found at: $(HEADER_FILE)"; exit 1; \ + fi + + @echo Checking for libwaku library file ... + @if [ -z "$(LIB_FILES)" ]; then \ + echo "ERROR: No libwaku library file found in: $(LIBWAKU_LIB_PATH)"; exit 1; \ + fi + +# Build SDS Go Bindings +build: check-folders @echo "Building Waku Go Bindings..." go build ./... # Clean up generated files clean: @echo "Cleaning up..." - @rm -rf $(THIRD_PARTY_DIR) - @rm -f waku-go-bindings \ No newline at end of file + @rm -f waku-go-bindings From 00a74606d37b91cc21c096bd0efbbf244de36045 Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Tue, 25 Nov 2025 22:08:04 +0100 Subject: [PATCH 5/7] adding timesource package --- waku/nwaku.go | 3 + waku/timesource/ntp.go | 210 ++++++++++++++++++++++++++++ waku/timesource/ntp_test.go | 254 ++++++++++++++++++++++++++++++++++ waku/timesource/timesource.go | 12 ++ waku/timesource/wall.go | 26 ++++ 5 files changed, 505 insertions(+) create mode 100644 waku/timesource/ntp.go create mode 100644 waku/timesource/ntp_test.go create mode 100644 waku/timesource/timesource.go create mode 100644 waku/timesource/wall.go diff --git a/waku/nwaku.go b/waku/nwaku.go index 28e2bd9..0cd2fdb 100644 --- a/waku/nwaku.go +++ b/waku/nwaku.go @@ -352,6 +352,8 @@ import ( "time" "unsafe" + "github.com/waku-org/waku-go-bindings/waku/timesource" + "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/libp2p/go-libp2p/core/peer" @@ -388,6 +390,7 @@ type WakuNode struct { TopicHealthChan chan topicHealth ConnectionChangeChan chan connectionChange nodeName string + _ timesource.Timesource } func NewWakuNode(config *common.WakuConfig, nodeName string) (*WakuNode, error) { diff --git a/waku/timesource/ntp.go b/waku/timesource/ntp.go new file mode 100644 index 0000000..3454631 --- /dev/null +++ b/waku/timesource/ntp.go @@ -0,0 +1,210 @@ +package timesource + +import ( + "bytes" + "context" + "errors" + "sort" + "sync" + "time" + + "github.com/beevik/ntp" + "go.uber.org/zap" +) + +const ( + // DefaultMaxAllowedFailures defines how many failures will be tolerated. + DefaultMaxAllowedFailures = 1 + + // FastNTPSyncPeriod period between ntp synchronizations before the first + // successful connection. + FastNTPSyncPeriod = 2 * time.Minute + + // SlowNTPSyncPeriod period between ntp synchronizations after the first + // successful connection. + SlowNTPSyncPeriod = 1 * time.Hour + + // DefaultRPCTimeout defines write deadline for single ntp server request. + DefaultRPCTimeout = 2 * time.Second +) + +// DefaultServers will be resolved to the closest available, +// and with high probability resolved to the different IPs +var DefaultServers = []string{ + "0.pool.ntp.org", + "1.pool.ntp.org", + "2.pool.ntp.org", + "3.pool.ntp.org", +} +var errUpdateOffset = errors.New("failed to compute offset") + +type ntpQuery func(string, ntp.QueryOptions) (*ntp.Response, error) + +type queryResponse struct { + Offset time.Duration + Error error +} + +type multiRPCError []error + +func (e multiRPCError) Error() string { + var b bytes.Buffer + b.WriteString("RPC failed: ") + more := false + for _, err := range e { + if more { + b.WriteString("; ") + } + b.WriteString(err.Error()) + more = true + } + b.WriteString(".") + return b.String() +} + +func computeOffset(timeQuery ntpQuery, servers []string, allowedFailures int) (time.Duration, error) { + if len(servers) == 0 { + return 0, nil + } + responses := make(chan queryResponse, len(servers)) + for _, server := range servers { + go func(server string) { + response, err := timeQuery(server, ntp.QueryOptions{ + Timeout: DefaultRPCTimeout, + }) + if err == nil { + err = response.Validate() + } + if err != nil { + responses <- queryResponse{Error: err} + return + } + responses <- queryResponse{Offset: response.ClockOffset} + }(server) + } + var ( + rpcErrors multiRPCError + offsets []time.Duration + collected int + ) + for response := range responses { + if response.Error != nil { + rpcErrors = append(rpcErrors, response.Error) + } else { + offsets = append(offsets, response.Offset) + } + collected++ + if collected == len(servers) { + break + } + } + if lth := len(rpcErrors); lth > allowedFailures { + return 0, rpcErrors + } else if lth == len(servers) { + return 0, rpcErrors + } + sort.SliceStable(offsets, func(i, j int) bool { + return offsets[i] > offsets[j] + }) + mid := len(offsets) / 2 + if len(offsets)%2 == 0 { + return (offsets[mid-1] + offsets[mid]) / 2, nil + } + return offsets[mid], nil +} + +// NewNTPTimesource creates a timesource that uses NTP +func NewNTPTimesource(ntpServers []string, log *zap.Logger) *NTPTimeSource { + return &NTPTimeSource{ + servers: ntpServers, + allowedFailures: DefaultMaxAllowedFailures, + fastNTPSyncPeriod: FastNTPSyncPeriod, + slowNTPSyncPeriod: SlowNTPSyncPeriod, + timeQuery: ntp.QueryWithOptions, + log: log.Named("timesource"), + } +} + +// NTPTimeSource provides source of time that tries to be resistant to time skews. +// It does so by periodically querying time offset from ntp servers. +type NTPTimeSource struct { + servers []string + allowedFailures int + fastNTPSyncPeriod time.Duration + slowNTPSyncPeriod time.Duration + timeQuery ntpQuery // for ease of testing + log *zap.Logger + + cancel context.CancelFunc + wg sync.WaitGroup + + mu sync.RWMutex + latestOffset time.Duration +} + +// Now returns time adjusted by latest known offset +func (s *NTPTimeSource) Now() time.Time { + s.mu.RLock() + defer s.mu.RUnlock() + return time.Now().Add(s.latestOffset) +} + +func (s *NTPTimeSource) updateOffset() error { + offset, err := computeOffset(s.timeQuery, s.servers, s.allowedFailures) + if err != nil { + s.log.Error("failed to compute offset", zap.Error(err)) + return errUpdateOffset + } + s.log.Info("Difference with ntp servers", zap.Duration("offset", offset)) + s.mu.Lock() + s.latestOffset = offset + s.mu.Unlock() + return nil +} + +// runPeriodically runs periodically the given function based on NTPTimeSource +// synchronization limits (fastNTPSyncPeriod / slowNTPSyncPeriod) +func (s *NTPTimeSource) runPeriodically(ctx context.Context, fn func() error) error { + var period time.Duration + + s.log.Info("starting service") + + // we try to do it synchronously so that user can have reliable messages right away + s.wg.Add(1) + go func() { + for { + select { + case <-time.After(period): + if err := fn(); err == nil { + period = s.slowNTPSyncPeriod + } else if period != s.slowNTPSyncPeriod { + period = s.fastNTPSyncPeriod + } + + case <-ctx.Done(): + s.log.Info("stopping service") + s.wg.Done() + return + } + } + }() + + return nil +} + +// Start runs a goroutine that updates local offset every updatePeriod. +func (s *NTPTimeSource) Start(ctx context.Context) error { + s.wg.Wait() // Waiting for other go routines to stop + ctx, cancel := context.WithCancel(ctx) + s.cancel = cancel + return s.runPeriodically(ctx, s.updateOffset) +} + +// Stop goroutine that updates time source. +func (s *NTPTimeSource) Stop() { + if s.cancel == nil { + return + } + s.cancel() + s.wg.Wait() +} diff --git a/waku/timesource/ntp_test.go b/waku/timesource/ntp_test.go new file mode 100644 index 0000000..5e22fee --- /dev/null +++ b/waku/timesource/ntp_test.go @@ -0,0 +1,254 @@ +package timesource + +import ( + "context" + "errors" + "sync" + "testing" + "time" + + "github.com/beevik/ntp" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const ( + // clockCompareDelta declares time required between multiple calls to time.Now + clockCompareDelta = 100 * time.Microsecond +) + +// we don't user real servers for tests, but logic depends on +// actual number of involved NTP servers. +var mockedServers = []string{"ntp1", "ntp2", "ntp3", "ntp4"} + +type testCase struct { + description string + servers []string + allowedFailures int + responses []queryResponse + expected time.Duration + expectError bool + + // actual attempts are mutable + mu sync.Mutex + actualAttempts int +} + +func (tc *testCase) query(string, ntp.QueryOptions) (*ntp.Response, error) { + tc.mu.Lock() + defer func() { + tc.actualAttempts++ + tc.mu.Unlock() + }() + response := &ntp.Response{ + ClockOffset: tc.responses[tc.actualAttempts].Offset, + Stratum: 1, + } + return response, tc.responses[tc.actualAttempts].Error +} + +func newTestCases() []*testCase { + return []*testCase{ + { + description: "SameResponse", + servers: mockedServers, + responses: []queryResponse{ + {Offset: 10 * time.Second}, + {Offset: 10 * time.Second}, + {Offset: 10 * time.Second}, + {Offset: 10 * time.Second}, + }, + expected: 10 * time.Second, + }, + { + description: "Median", + servers: mockedServers, + responses: []queryResponse{ + {Offset: 10 * time.Second}, + {Offset: 20 * time.Second}, + {Offset: 20 * time.Second}, + {Offset: 30 * time.Second}, + }, + expected: 20 * time.Second, + }, + { + description: "EvenMedian", + servers: mockedServers[:2], + responses: []queryResponse{ + {Offset: 10 * time.Second}, + {Offset: 20 * time.Second}, + }, + expected: 15 * time.Second, + }, + { + description: "Error", + servers: mockedServers, + responses: []queryResponse{ + {Offset: 10 * time.Second}, + {Error: errors.New("test")}, + {Offset: 30 * time.Second}, + {Offset: 30 * time.Second}, + }, + expected: time.Duration(0), + expectError: true, + }, + { + description: "MultiError", + servers: mockedServers, + responses: []queryResponse{ + {Error: errors.New("test 1")}, + {Error: errors.New("test 2")}, + {Error: errors.New("test 3")}, + {Error: errors.New("test 3")}, + }, + expected: time.Duration(0), + expectError: true, + }, + { + description: "TolerableError", + servers: mockedServers, + allowedFailures: 1, + responses: []queryResponse{ + {Offset: 10 * time.Second}, + {Error: errors.New("test")}, + {Offset: 20 * time.Second}, + {Offset: 30 * time.Second}, + }, + expected: 20 * time.Second, + }, + { + description: "NonTolerableError", + servers: mockedServers, + allowedFailures: 1, + responses: []queryResponse{ + {Offset: 10 * time.Second}, + {Error: errors.New("test")}, + {Error: errors.New("test")}, + {Error: errors.New("test")}, + }, + expected: time.Duration(0), + expectError: true, + }, + { + description: "AllFailed", + servers: mockedServers, + allowedFailures: 4, + responses: []queryResponse{ + {Error: errors.New("test")}, + {Error: errors.New("test")}, + {Error: errors.New("test")}, + {Error: errors.New("test")}, + }, + expected: time.Duration(0), + expectError: true, + }, + { + description: "HalfTolerable", + servers: mockedServers, + allowedFailures: 2, + responses: []queryResponse{ + {Offset: 10 * time.Second}, + {Offset: 20 * time.Second}, + {Error: errors.New("test")}, + {Error: errors.New("test")}, + }, + expected: 15 * time.Second, + }, + } +} + +func TestComputeOffset(t *testing.T) { + for _, tc := range newTestCases() { + t.Run(tc.description, func(t *testing.T) { + offset, err := computeOffset(tc.query, tc.servers, tc.allowedFailures) + if tc.expectError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + assert.Equal(t, tc.expected, offset) + }) + } +} + +func TestNTPTimeSource(t *testing.T) { + for _, tc := range newTestCases() { + t.Run(tc.description, func(t *testing.T) { + _, cancel := context.WithCancel(context.Background()) + source := &NTPTimeSource{ + servers: tc.servers, + allowedFailures: tc.allowedFailures, + timeQuery: tc.query, + log: utils.Logger(), + cancel: cancel, + } + + assert.WithinDuration(t, time.Now(), source.Now(), clockCompareDelta) + err := source.updateOffset() + if tc.expectError { + assert.Equal(t, errUpdateOffset, err) + } else { + assert.NoError(t, err) + } + assert.WithinDuration(t, time.Now().Add(tc.expected), source.Now(), clockCompareDelta) + }) + } +} + +func TestRunningPeriodically(t *testing.T) { + var hits int + var mu sync.RWMutex + periods := make([]time.Duration, 0) + + tc := newTestCases()[0] + fastHits := 3 + slowHits := 1 + + t.Run(tc.description, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + source := &NTPTimeSource{ + servers: tc.servers, + allowedFailures: tc.allowedFailures, + timeQuery: tc.query, + fastNTPSyncPeriod: time.Duration(fastHits*10) * time.Millisecond, + slowNTPSyncPeriod: time.Duration(slowHits*10) * time.Millisecond, + log: utils.Logger(), + cancel: cancel, + } + lastCall := time.Now() + // we're simulating a calls to updateOffset, testing ntp calls happens + // on NTPTimeSource specified periods (fastNTPSyncPeriod & slowNTPSyncPeriod) + err := source.runPeriodically(ctx, func() error { + mu.Lock() + periods = append(periods, time.Since(lastCall)) + mu.Unlock() + hits++ + if hits < 3 { + return errUpdateOffset + } + if hits == 6 { + source.wg.Done() + } + return nil + }) + + source.wg.Wait() + require.NoError(t, err) + + mu.Lock() + require.Len(t, periods, 6) + defer mu.Unlock() + prev := 0 + for _, period := range periods[1:3] { + p := int(period.Seconds() * 100) + require.True(t, fastHits <= (p-prev)) + prev = p + } + + for _, period := range periods[3:] { + p := int(period.Seconds() * 100) + require.True(t, slowHits <= (p-prev)) + prev = p + } + }) +} diff --git a/waku/timesource/timesource.go b/waku/timesource/timesource.go new file mode 100644 index 0000000..2d0a86b --- /dev/null +++ b/waku/timesource/timesource.go @@ -0,0 +1,12 @@ +package timesource + +import ( + "context" + "time" +) + +type Timesource interface { + Now() time.Time + Start(ctx context.Context) error + Stop() +} diff --git a/waku/timesource/wall.go b/waku/timesource/wall.go new file mode 100644 index 0000000..778f2d3 --- /dev/null +++ b/waku/timesource/wall.go @@ -0,0 +1,26 @@ +package timesource + +import ( + "context" + "time" +) + +type WallClockTimeSource struct { +} + +func NewDefaultClock() *WallClockTimeSource { + return &WallClockTimeSource{} +} + +func (t *WallClockTimeSource) Now() time.Time { + return time.Now() +} + +func (t *WallClockTimeSource) Start(ctx context.Context) error { + // Do nothing + return nil +} + +func (t *WallClockTimeSource) Stop() { + // Do nothing +} From 3a7b42d93c84accf3b31e8873297bc30d3043e4a Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Tue, 2 Dec 2025 10:59:47 +0100 Subject: [PATCH 6/7] rm ntp and wall from timesource --- waku/timesource/ntp.go | 210 ----------------------------- waku/timesource/ntp_test.go | 254 ------------------------------------ waku/timesource/wall.go | 26 ---- 3 files changed, 490 deletions(-) delete mode 100644 waku/timesource/ntp.go delete mode 100644 waku/timesource/ntp_test.go delete mode 100644 waku/timesource/wall.go diff --git a/waku/timesource/ntp.go b/waku/timesource/ntp.go deleted file mode 100644 index 3454631..0000000 --- a/waku/timesource/ntp.go +++ /dev/null @@ -1,210 +0,0 @@ -package timesource - -import ( - "bytes" - "context" - "errors" - "sort" - "sync" - "time" - - "github.com/beevik/ntp" - "go.uber.org/zap" -) - -const ( - // DefaultMaxAllowedFailures defines how many failures will be tolerated. - DefaultMaxAllowedFailures = 1 - - // FastNTPSyncPeriod period between ntp synchronizations before the first - // successful connection. - FastNTPSyncPeriod = 2 * time.Minute - - // SlowNTPSyncPeriod period between ntp synchronizations after the first - // successful connection. - SlowNTPSyncPeriod = 1 * time.Hour - - // DefaultRPCTimeout defines write deadline for single ntp server request. - DefaultRPCTimeout = 2 * time.Second -) - -// DefaultServers will be resolved to the closest available, -// and with high probability resolved to the different IPs -var DefaultServers = []string{ - "0.pool.ntp.org", - "1.pool.ntp.org", - "2.pool.ntp.org", - "3.pool.ntp.org", -} -var errUpdateOffset = errors.New("failed to compute offset") - -type ntpQuery func(string, ntp.QueryOptions) (*ntp.Response, error) - -type queryResponse struct { - Offset time.Duration - Error error -} - -type multiRPCError []error - -func (e multiRPCError) Error() string { - var b bytes.Buffer - b.WriteString("RPC failed: ") - more := false - for _, err := range e { - if more { - b.WriteString("; ") - } - b.WriteString(err.Error()) - more = true - } - b.WriteString(".") - return b.String() -} - -func computeOffset(timeQuery ntpQuery, servers []string, allowedFailures int) (time.Duration, error) { - if len(servers) == 0 { - return 0, nil - } - responses := make(chan queryResponse, len(servers)) - for _, server := range servers { - go func(server string) { - response, err := timeQuery(server, ntp.QueryOptions{ - Timeout: DefaultRPCTimeout, - }) - if err == nil { - err = response.Validate() - } - if err != nil { - responses <- queryResponse{Error: err} - return - } - responses <- queryResponse{Offset: response.ClockOffset} - }(server) - } - var ( - rpcErrors multiRPCError - offsets []time.Duration - collected int - ) - for response := range responses { - if response.Error != nil { - rpcErrors = append(rpcErrors, response.Error) - } else { - offsets = append(offsets, response.Offset) - } - collected++ - if collected == len(servers) { - break - } - } - if lth := len(rpcErrors); lth > allowedFailures { - return 0, rpcErrors - } else if lth == len(servers) { - return 0, rpcErrors - } - sort.SliceStable(offsets, func(i, j int) bool { - return offsets[i] > offsets[j] - }) - mid := len(offsets) / 2 - if len(offsets)%2 == 0 { - return (offsets[mid-1] + offsets[mid]) / 2, nil - } - return offsets[mid], nil -} - -// NewNTPTimesource creates a timesource that uses NTP -func NewNTPTimesource(ntpServers []string, log *zap.Logger) *NTPTimeSource { - return &NTPTimeSource{ - servers: ntpServers, - allowedFailures: DefaultMaxAllowedFailures, - fastNTPSyncPeriod: FastNTPSyncPeriod, - slowNTPSyncPeriod: SlowNTPSyncPeriod, - timeQuery: ntp.QueryWithOptions, - log: log.Named("timesource"), - } -} - -// NTPTimeSource provides source of time that tries to be resistant to time skews. -// It does so by periodically querying time offset from ntp servers. -type NTPTimeSource struct { - servers []string - allowedFailures int - fastNTPSyncPeriod time.Duration - slowNTPSyncPeriod time.Duration - timeQuery ntpQuery // for ease of testing - log *zap.Logger - - cancel context.CancelFunc - wg sync.WaitGroup - - mu sync.RWMutex - latestOffset time.Duration -} - -// Now returns time adjusted by latest known offset -func (s *NTPTimeSource) Now() time.Time { - s.mu.RLock() - defer s.mu.RUnlock() - return time.Now().Add(s.latestOffset) -} - -func (s *NTPTimeSource) updateOffset() error { - offset, err := computeOffset(s.timeQuery, s.servers, s.allowedFailures) - if err != nil { - s.log.Error("failed to compute offset", zap.Error(err)) - return errUpdateOffset - } - s.log.Info("Difference with ntp servers", zap.Duration("offset", offset)) - s.mu.Lock() - s.latestOffset = offset - s.mu.Unlock() - return nil -} - -// runPeriodically runs periodically the given function based on NTPTimeSource -// synchronization limits (fastNTPSyncPeriod / slowNTPSyncPeriod) -func (s *NTPTimeSource) runPeriodically(ctx context.Context, fn func() error) error { - var period time.Duration - - s.log.Info("starting service") - - // we try to do it synchronously so that user can have reliable messages right away - s.wg.Add(1) - go func() { - for { - select { - case <-time.After(period): - if err := fn(); err == nil { - period = s.slowNTPSyncPeriod - } else if period != s.slowNTPSyncPeriod { - period = s.fastNTPSyncPeriod - } - - case <-ctx.Done(): - s.log.Info("stopping service") - s.wg.Done() - return - } - } - }() - - return nil -} - -// Start runs a goroutine that updates local offset every updatePeriod. -func (s *NTPTimeSource) Start(ctx context.Context) error { - s.wg.Wait() // Waiting for other go routines to stop - ctx, cancel := context.WithCancel(ctx) - s.cancel = cancel - return s.runPeriodically(ctx, s.updateOffset) -} - -// Stop goroutine that updates time source. -func (s *NTPTimeSource) Stop() { - if s.cancel == nil { - return - } - s.cancel() - s.wg.Wait() -} diff --git a/waku/timesource/ntp_test.go b/waku/timesource/ntp_test.go deleted file mode 100644 index 5e22fee..0000000 --- a/waku/timesource/ntp_test.go +++ /dev/null @@ -1,254 +0,0 @@ -package timesource - -import ( - "context" - "errors" - "sync" - "testing" - "time" - - "github.com/beevik/ntp" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -const ( - // clockCompareDelta declares time required between multiple calls to time.Now - clockCompareDelta = 100 * time.Microsecond -) - -// we don't user real servers for tests, but logic depends on -// actual number of involved NTP servers. -var mockedServers = []string{"ntp1", "ntp2", "ntp3", "ntp4"} - -type testCase struct { - description string - servers []string - allowedFailures int - responses []queryResponse - expected time.Duration - expectError bool - - // actual attempts are mutable - mu sync.Mutex - actualAttempts int -} - -func (tc *testCase) query(string, ntp.QueryOptions) (*ntp.Response, error) { - tc.mu.Lock() - defer func() { - tc.actualAttempts++ - tc.mu.Unlock() - }() - response := &ntp.Response{ - ClockOffset: tc.responses[tc.actualAttempts].Offset, - Stratum: 1, - } - return response, tc.responses[tc.actualAttempts].Error -} - -func newTestCases() []*testCase { - return []*testCase{ - { - description: "SameResponse", - servers: mockedServers, - responses: []queryResponse{ - {Offset: 10 * time.Second}, - {Offset: 10 * time.Second}, - {Offset: 10 * time.Second}, - {Offset: 10 * time.Second}, - }, - expected: 10 * time.Second, - }, - { - description: "Median", - servers: mockedServers, - responses: []queryResponse{ - {Offset: 10 * time.Second}, - {Offset: 20 * time.Second}, - {Offset: 20 * time.Second}, - {Offset: 30 * time.Second}, - }, - expected: 20 * time.Second, - }, - { - description: "EvenMedian", - servers: mockedServers[:2], - responses: []queryResponse{ - {Offset: 10 * time.Second}, - {Offset: 20 * time.Second}, - }, - expected: 15 * time.Second, - }, - { - description: "Error", - servers: mockedServers, - responses: []queryResponse{ - {Offset: 10 * time.Second}, - {Error: errors.New("test")}, - {Offset: 30 * time.Second}, - {Offset: 30 * time.Second}, - }, - expected: time.Duration(0), - expectError: true, - }, - { - description: "MultiError", - servers: mockedServers, - responses: []queryResponse{ - {Error: errors.New("test 1")}, - {Error: errors.New("test 2")}, - {Error: errors.New("test 3")}, - {Error: errors.New("test 3")}, - }, - expected: time.Duration(0), - expectError: true, - }, - { - description: "TolerableError", - servers: mockedServers, - allowedFailures: 1, - responses: []queryResponse{ - {Offset: 10 * time.Second}, - {Error: errors.New("test")}, - {Offset: 20 * time.Second}, - {Offset: 30 * time.Second}, - }, - expected: 20 * time.Second, - }, - { - description: "NonTolerableError", - servers: mockedServers, - allowedFailures: 1, - responses: []queryResponse{ - {Offset: 10 * time.Second}, - {Error: errors.New("test")}, - {Error: errors.New("test")}, - {Error: errors.New("test")}, - }, - expected: time.Duration(0), - expectError: true, - }, - { - description: "AllFailed", - servers: mockedServers, - allowedFailures: 4, - responses: []queryResponse{ - {Error: errors.New("test")}, - {Error: errors.New("test")}, - {Error: errors.New("test")}, - {Error: errors.New("test")}, - }, - expected: time.Duration(0), - expectError: true, - }, - { - description: "HalfTolerable", - servers: mockedServers, - allowedFailures: 2, - responses: []queryResponse{ - {Offset: 10 * time.Second}, - {Offset: 20 * time.Second}, - {Error: errors.New("test")}, - {Error: errors.New("test")}, - }, - expected: 15 * time.Second, - }, - } -} - -func TestComputeOffset(t *testing.T) { - for _, tc := range newTestCases() { - t.Run(tc.description, func(t *testing.T) { - offset, err := computeOffset(tc.query, tc.servers, tc.allowedFailures) - if tc.expectError { - assert.Error(t, err) - } else { - assert.NoError(t, err) - } - assert.Equal(t, tc.expected, offset) - }) - } -} - -func TestNTPTimeSource(t *testing.T) { - for _, tc := range newTestCases() { - t.Run(tc.description, func(t *testing.T) { - _, cancel := context.WithCancel(context.Background()) - source := &NTPTimeSource{ - servers: tc.servers, - allowedFailures: tc.allowedFailures, - timeQuery: tc.query, - log: utils.Logger(), - cancel: cancel, - } - - assert.WithinDuration(t, time.Now(), source.Now(), clockCompareDelta) - err := source.updateOffset() - if tc.expectError { - assert.Equal(t, errUpdateOffset, err) - } else { - assert.NoError(t, err) - } - assert.WithinDuration(t, time.Now().Add(tc.expected), source.Now(), clockCompareDelta) - }) - } -} - -func TestRunningPeriodically(t *testing.T) { - var hits int - var mu sync.RWMutex - periods := make([]time.Duration, 0) - - tc := newTestCases()[0] - fastHits := 3 - slowHits := 1 - - t.Run(tc.description, func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - source := &NTPTimeSource{ - servers: tc.servers, - allowedFailures: tc.allowedFailures, - timeQuery: tc.query, - fastNTPSyncPeriod: time.Duration(fastHits*10) * time.Millisecond, - slowNTPSyncPeriod: time.Duration(slowHits*10) * time.Millisecond, - log: utils.Logger(), - cancel: cancel, - } - lastCall := time.Now() - // we're simulating a calls to updateOffset, testing ntp calls happens - // on NTPTimeSource specified periods (fastNTPSyncPeriod & slowNTPSyncPeriod) - err := source.runPeriodically(ctx, func() error { - mu.Lock() - periods = append(periods, time.Since(lastCall)) - mu.Unlock() - hits++ - if hits < 3 { - return errUpdateOffset - } - if hits == 6 { - source.wg.Done() - } - return nil - }) - - source.wg.Wait() - require.NoError(t, err) - - mu.Lock() - require.Len(t, periods, 6) - defer mu.Unlock() - prev := 0 - for _, period := range periods[1:3] { - p := int(period.Seconds() * 100) - require.True(t, fastHits <= (p-prev)) - prev = p - } - - for _, period := range periods[3:] { - p := int(period.Seconds() * 100) - require.True(t, slowHits <= (p-prev)) - prev = p - } - }) -} diff --git a/waku/timesource/wall.go b/waku/timesource/wall.go deleted file mode 100644 index 778f2d3..0000000 --- a/waku/timesource/wall.go +++ /dev/null @@ -1,26 +0,0 @@ -package timesource - -import ( - "context" - "time" -) - -type WallClockTimeSource struct { -} - -func NewDefaultClock() *WallClockTimeSource { - return &WallClockTimeSource{} -} - -func (t *WallClockTimeSource) Now() time.Time { - return time.Now() -} - -func (t *WallClockTimeSource) Start(ctx context.Context) error { - // Do nothing - return nil -} - -func (t *WallClockTimeSource) Stop() { - // Do nothing -} From de7d4cf3c9e4ebf8e13be8c0e1a2674e40336cc6 Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Fri, 12 Dec 2025 22:56:00 +0100 Subject: [PATCH 7/7] chore: waku-go-bindings -> logos-messaging-go-bindings --- waku/nwaku.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/waku/nwaku.go b/waku/nwaku.go index 0cd2fdb..212cd09 100644 --- a/waku/nwaku.go +++ b/waku/nwaku.go @@ -352,7 +352,7 @@ import ( "time" "unsafe" - "github.com/waku-org/waku-go-bindings/waku/timesource" + "github.com/logos-messaging/logos-messaging-go-bindings/waku/timesource" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/p2p/enode"