refactor: move libwaku bridge into internal/ffi/libwaku subpackage

One subpackage per C library so a binary links exactly the libraries it
imports: libwaku and liblogosdelivery carry overlapping symbols and must never
link together (until logos-delivery#3851). With the package named libwaku the
Waku* prefixes became stutter, so the exported primitives drop them
(libwaku.New/Start/RelayPublish/...). internal/ffi keeps an umbrella doc.go
stating the rule.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
Igor Sirotin 2026-06-10 14:24:02 +03:00
parent ec187d085d
commit e1f0200c94
No known key found for this signature in database
GPG Key ID: 0EABBCB40CB9AD4A
3 changed files with 164 additions and 161 deletions

View File

@ -1,8 +1,7 @@
// Package ffi holds the cgo bridge over the logos-delivery C libraries: the
// synchronous request/callback plumbing, the global event callback, and the
// handle registry. It exposes Go-typed primitives so the public packages
// (e.g. messaging) stay pure Go.
// Package ffi groups the cgo bridges over the logos-delivery C libraries.
//
// Currently holds the Kernel API bridge (libwaku); the Messaging API bindings
// (over liblogosdelivery) land here in a follow-up.
// Each C library gets its own subpackage (libwaku now; liblogosdelivery in a
// follow-up) so that a binary links exactly the libraries it imports — the
// two .so files carry overlapping symbols and must never be linked together
// (until logos-delivery#3851 consolidates them).
package ffi

View File

@ -1,4 +1,8 @@
package ffi
// Package libwaku is the cgo bridge over libwaku (the legacy Kernel API
// library): the synchronous request/callback plumbing, the global event
// callback, and the handle registry. It exposes Go-typed primitives so
// pkg/kernel stays pure Go.
package libwaku
/*
#include <libwaku.h>
@ -148,15 +152,15 @@ type Handle = unsafe.Pointer
// RetOK is the return code callbacks report on success.
const RetOK = C.RET_OK
// WakuEventHandler receives every event libwaku emits for a node: the raw
// EventHandler receives every event libwaku emits for a node: the raw
// event JSON when ret == RetOK, an error message otherwise.
type WakuEventHandler func(ret int, msg string)
type EventHandler func(ret int, msg string)
// wakuEventHandlers maps a node handle to the Go function that receives its
// eventHandlers maps a node handle to the Go function that receives its
// events. The shared C event callback looks the handler up by handle.
var (
wakuEventHandlersMu sync.RWMutex
wakuEventHandlers = make(map[Handle]WakuEventHandler)
eventHandlersMu sync.RWMutex
eventHandlers = make(map[Handle]EventHandler)
)
//export wakuGoCallback
@ -174,18 +178,18 @@ func wakuGoCallback(ret C.int, msg *C.char, length C.size_t, resp unsafe.Pointer
//export wakuEventCallback
func wakuEventCallback(ret C.int, msg *C.char, length C.size_t, userData unsafe.Pointer) {
wakuEventHandlersMu.RLock()
fn := wakuEventHandlers[userData] // userData carries the node's handle
wakuEventHandlersMu.RUnlock()
eventHandlersMu.RLock()
fn := eventHandlers[userData] // userData carries the node's handle
eventHandlersMu.RUnlock()
if fn != nil {
fn(int(ret), C.GoStringN(msg, C.int(length)))
}
}
// wakuCall runs a synchronous libwaku entry point that reports its result
// call runs a synchronous libwaku entry point that reports its result
// through the response callback, blocks until it completes, and returns the
// callback message (on RetOK) or an error built from it.
func wakuCall(invoke func(resp unsafe.Pointer)) (string, error) {
func call(invoke func(resp unsafe.Pointer)) (string, error) {
var wg sync.WaitGroup
wg.Add(1)
resp := C.allocWakuResp(unsafe.Pointer(&wg))
@ -201,9 +205,9 @@ func wakuCall(invoke func(resp unsafe.Pointer)) (string, error) {
return msg, nil
}
// WakuNew builds a node from a WakuConfig JSON string and returns its handle.
// The handle must be released with WakuDestroy.
func WakuNew(configJSON string) (Handle, error) {
// New builds a node from a WakuConfig JSON string and returns its handle.
// The handle must be released with Destroy.
func New(configJSON string) (Handle, error) {
cCfg := C.CString(configJSON)
defer C.free(unsafe.Pointer(cCfg))
@ -225,246 +229,246 @@ func WakuNew(configJSON string) (Handle, error) {
return Handle(ctx), nil
}
// SetWakuEventHandler registers fn to receive events for the node and wires up
// SetEventHandler registers fn to receive events for the node and wires up
// the underlying C event callback.
func SetWakuEventHandler(h Handle, fn WakuEventHandler) {
wakuEventHandlersMu.Lock()
wakuEventHandlers[h] = fn
wakuEventHandlersMu.Unlock()
func SetEventHandler(h Handle, fn EventHandler) {
eventHandlersMu.Lock()
eventHandlers[h] = fn
eventHandlersMu.Unlock()
C.cGoWakuSetEventCallback(h)
}
// WakuStart starts the node.
func WakuStart(h Handle) error {
_, err := wakuCall(func(resp unsafe.Pointer) { C.cGoWakuStart(h, resp) })
// Start starts the node.
func Start(h Handle) error {
_, err := call(func(resp unsafe.Pointer) { C.cGoWakuStart(h, resp) })
return err
}
// WakuStop stops the node.
func WakuStop(h Handle) error {
_, err := wakuCall(func(resp unsafe.Pointer) { C.cGoWakuStop(h, resp) })
// Stop stops the node.
func Stop(h Handle) error {
_, err := call(func(resp unsafe.Pointer) { C.cGoWakuStop(h, resp) })
return err
}
// WakuDestroy releases the node context and unregisters its event handler.
func WakuDestroy(h Handle) error {
_, err := wakuCall(func(resp unsafe.Pointer) { C.cGoWakuDestroy(h, resp) })
// Destroy releases the node context and unregisters its event handler.
func Destroy(h Handle) error {
_, err := call(func(resp unsafe.Pointer) { C.cGoWakuDestroy(h, resp) })
if err == nil {
wakuEventHandlersMu.Lock()
delete(wakuEventHandlers, h)
wakuEventHandlersMu.Unlock()
eventHandlersMu.Lock()
delete(eventHandlers, h)
eventHandlersMu.Unlock()
}
return err
}
// WakuStartDiscV5 starts DiscV5 peer discovery.
func WakuStartDiscV5(h Handle) error {
_, err := wakuCall(func(resp unsafe.Pointer) { C.cGoWakuStartDiscV5(h, resp) })
// StartDiscV5 starts DiscV5 peer discovery.
func StartDiscV5(h Handle) error {
_, err := call(func(resp unsafe.Pointer) { C.cGoWakuStartDiscV5(h, resp) })
return err
}
// WakuStopDiscV5 stops DiscV5 peer discovery.
func WakuStopDiscV5(h Handle) error {
_, err := wakuCall(func(resp unsafe.Pointer) { C.cGoWakuStopDiscV5(h, resp) })
// StopDiscV5 stops DiscV5 peer discovery.
func StopDiscV5(h Handle) error {
_, err := call(func(resp unsafe.Pointer) { C.cGoWakuStopDiscV5(h, resp) })
return err
}
// WakuVersion returns the libwaku version string.
func WakuVersion(h Handle) (string, error) {
return wakuCall(func(resp unsafe.Pointer) { C.cGoWakuVersion(h, resp) })
// Version returns the libwaku version string.
func Version(h Handle) (string, error) {
return call(func(resp unsafe.Pointer) { C.cGoWakuVersion(h, resp) })
}
// WakuRelayPublish publishes a WakuMessage JSON on a pubsub topic and returns
// RelayPublish publishes a WakuMessage JSON on a pubsub topic and returns
// the message hash.
func WakuRelayPublish(h Handle, pubsubTopic, messageJSON string, timeoutMs int) (string, error) {
func RelayPublish(h Handle, pubsubTopic, messageJSON string, timeoutMs int) (string, error) {
cTopic := C.CString(pubsubTopic)
cMsg := C.CString(messageJSON)
defer C.free(unsafe.Pointer(cTopic))
defer C.free(unsafe.Pointer(cMsg))
return wakuCall(func(resp unsafe.Pointer) { C.cGoWakuRelayPublish(h, cTopic, cMsg, C.int(timeoutMs), resp) })
return call(func(resp unsafe.Pointer) { C.cGoWakuRelayPublish(h, cTopic, cMsg, C.int(timeoutMs), resp) })
}
// WakuRelaySubscribe subscribes the node to a pubsub topic.
func WakuRelaySubscribe(h Handle, pubsubTopic string) error {
// RelaySubscribe subscribes the node to a pubsub topic.
func RelaySubscribe(h Handle, pubsubTopic string) error {
cTopic := C.CString(pubsubTopic)
defer C.free(unsafe.Pointer(cTopic))
_, err := wakuCall(func(resp unsafe.Pointer) { C.cGoWakuRelaySubscribe(h, cTopic, resp) })
_, err := call(func(resp unsafe.Pointer) { C.cGoWakuRelaySubscribe(h, cTopic, resp) })
return err
}
// WakuRelayAddProtectedShard registers the hex-encoded public key allowed to
// RelayAddProtectedShard registers the hex-encoded public key allowed to
// sign messages on a protected shard.
func WakuRelayAddProtectedShard(h Handle, clusterID, shardID int, publicKeyHex string) error {
func RelayAddProtectedShard(h Handle, clusterID, shardID int, publicKeyHex string) error {
cPublicKey := C.CString(publicKeyHex)
defer C.free(unsafe.Pointer(cPublicKey))
_, err := wakuCall(func(resp unsafe.Pointer) {
_, err := call(func(resp unsafe.Pointer) {
C.cGoWakuRelayAddProtectedShard(h, C.int(clusterID), C.int(shardID), cPublicKey, resp)
})
return err
}
// WakuRelayUnsubscribe unsubscribes the node from a pubsub topic.
func WakuRelayUnsubscribe(h Handle, pubsubTopic string) error {
// RelayUnsubscribe unsubscribes the node from a pubsub topic.
func RelayUnsubscribe(h Handle, pubsubTopic string) error {
cTopic := C.CString(pubsubTopic)
defer C.free(unsafe.Pointer(cTopic))
_, err := wakuCall(func(resp unsafe.Pointer) { C.cGoWakuRelayUnsubscribe(h, cTopic, resp) })
_, err := call(func(resp unsafe.Pointer) { C.cGoWakuRelayUnsubscribe(h, cTopic, resp) })
return err
}
// WakuConnect dials a peer multiaddress.
func WakuConnect(h Handle, peerMultiAddr string, timeoutMs int) error {
// Connect dials a peer multiaddress.
func Connect(h Handle, peerMultiAddr string, timeoutMs int) error {
cAddr := C.CString(peerMultiAddr)
defer C.free(unsafe.Pointer(cAddr))
_, err := wakuCall(func(resp unsafe.Pointer) { C.cGoWakuConnect(h, cAddr, C.int(timeoutMs), resp) })
_, err := call(func(resp unsafe.Pointer) { C.cGoWakuConnect(h, cAddr, C.int(timeoutMs), resp) })
return err
}
// WakuDialPeer dials a peer multiaddress over a specific protocol.
func WakuDialPeer(h Handle, peerMultiAddr, protocol string, timeoutMs int) error {
// DialPeer dials a peer multiaddress over a specific protocol.
func DialPeer(h Handle, peerMultiAddr, protocol string, timeoutMs int) error {
cAddr := C.CString(peerMultiAddr)
cProtocol := C.CString(protocol)
defer C.free(unsafe.Pointer(cAddr))
defer C.free(unsafe.Pointer(cProtocol))
_, err := wakuCall(func(resp unsafe.Pointer) { C.cGoWakuDialPeer(h, cAddr, cProtocol, C.int(timeoutMs), resp) })
_, err := call(func(resp unsafe.Pointer) { C.cGoWakuDialPeer(h, cAddr, cProtocol, C.int(timeoutMs), resp) })
return err
}
// WakuDialPeerByID dials a known peer id over a specific protocol.
func WakuDialPeerByID(h Handle, peerID, protocol string, timeoutMs int) error {
// DialPeerByID dials a known peer id over a specific protocol.
func DialPeerByID(h Handle, peerID, protocol string, timeoutMs int) error {
cPeerID := C.CString(peerID)
cProtocol := C.CString(protocol)
defer C.free(unsafe.Pointer(cPeerID))
defer C.free(unsafe.Pointer(cProtocol))
_, err := wakuCall(func(resp unsafe.Pointer) { C.cGoWakuDialPeerById(h, cPeerID, cProtocol, C.int(timeoutMs), resp) })
_, err := call(func(resp unsafe.Pointer) { C.cGoWakuDialPeerById(h, cPeerID, cProtocol, C.int(timeoutMs), resp) })
return err
}
// WakuDisconnectPeerByID drops the connection to a peer.
func WakuDisconnectPeerByID(h Handle, peerID string) error {
// DisconnectPeerByID drops the connection to a peer.
func DisconnectPeerByID(h Handle, peerID string) error {
cPeerID := C.CString(peerID)
defer C.free(unsafe.Pointer(cPeerID))
_, err := wakuCall(func(resp unsafe.Pointer) { C.cGoWakuDisconnectPeerById(h, cPeerID, resp) })
_, err := call(func(resp unsafe.Pointer) { C.cGoWakuDisconnectPeerById(h, cPeerID, resp) })
return err
}
// WakuDisconnectAllPeers drops all peer connections.
func WakuDisconnectAllPeers(h Handle) error {
_, err := wakuCall(func(resp unsafe.Pointer) { C.cGoWakuDisconnectAllPeers(h, resp) })
// DisconnectAllPeers drops all peer connections.
func DisconnectAllPeers(h Handle) error {
_, err := call(func(resp unsafe.Pointer) { C.cGoWakuDisconnectAllPeers(h, resp) })
return err
}
// WakuListenAddresses returns the node's listen multiaddresses as a
// ListenAddresses returns the node's listen multiaddresses as a
// comma-separated list.
func WakuListenAddresses(h Handle) (string, error) {
return wakuCall(func(resp unsafe.Pointer) { C.cGoWakuListenAddresses(h, resp) })
func ListenAddresses(h Handle) (string, error) {
return call(func(resp unsafe.Pointer) { C.cGoWakuListenAddresses(h, resp) })
}
// WakuGetMyENR returns the node's ENR record.
func WakuGetMyENR(h Handle) (string, error) {
return wakuCall(func(resp unsafe.Pointer) { C.cGoWakuGetMyENR(h, resp) })
// GetMyENR returns the node's ENR record.
func GetMyENR(h Handle) (string, error) {
return call(func(resp unsafe.Pointer) { C.cGoWakuGetMyENR(h, resp) })
}
// WakuGetMyPeerID returns the node's peer id.
func WakuGetMyPeerID(h Handle) (string, error) {
return wakuCall(func(resp unsafe.Pointer) { C.cGoWakuGetMyPeerId(h, resp) })
// GetMyPeerID returns the node's peer id.
func GetMyPeerID(h Handle) (string, error) {
return call(func(resp unsafe.Pointer) { C.cGoWakuGetMyPeerId(h, resp) })
}
// WakuPingPeer pings a peer (comma-separated multiaddresses) and returns the
// PingPeer pings a peer (comma-separated multiaddresses) and returns the
// round-trip time in nanoseconds.
func WakuPingPeer(h Handle, peerAddrs string, timeoutMs int) (string, error) {
func PingPeer(h Handle, peerAddrs string, timeoutMs int) (string, error) {
cAddr := C.CString(peerAddrs)
defer C.free(unsafe.Pointer(cAddr))
return wakuCall(func(resp unsafe.Pointer) { C.cGoWakuPingPeer(h, cAddr, C.int(timeoutMs), resp) })
return call(func(resp unsafe.Pointer) { C.cGoWakuPingPeer(h, cAddr, C.int(timeoutMs), resp) })
}
// WakuGetPeersInMesh returns the relay mesh peer ids for a pubsub topic as a
// GetPeersInMesh returns the relay mesh peer ids for a pubsub topic as a
// comma-separated list.
func WakuGetPeersInMesh(h Handle, pubsubTopic string) (string, error) {
func GetPeersInMesh(h Handle, pubsubTopic string) (string, error) {
cTopic := C.CString(pubsubTopic)
defer C.free(unsafe.Pointer(cTopic))
return wakuCall(func(resp unsafe.Pointer) { C.cGoWakuGetPeersInMesh(h, cTopic, resp) })
return call(func(resp unsafe.Pointer) { C.cGoWakuGetPeersInMesh(h, cTopic, resp) })
}
// WakuGetNumPeersInMesh returns the relay mesh peer count for a pubsub topic.
func WakuGetNumPeersInMesh(h Handle, pubsubTopic string) (string, error) {
// GetNumPeersInMesh returns the relay mesh peer count for a pubsub topic.
func GetNumPeersInMesh(h Handle, pubsubTopic string) (string, error) {
cTopic := C.CString(pubsubTopic)
defer C.free(unsafe.Pointer(cTopic))
return wakuCall(func(resp unsafe.Pointer) { C.cGoWakuGetNumPeersInMesh(h, cTopic, resp) })
return call(func(resp unsafe.Pointer) { C.cGoWakuGetNumPeersInMesh(h, cTopic, resp) })
}
// WakuGetNumConnectedRelayPeers returns the connected relay peer count for a
// GetNumConnectedRelayPeers returns the connected relay peer count for a
// pubsub topic.
func WakuGetNumConnectedRelayPeers(h Handle, pubsubTopic string) (string, error) {
func GetNumConnectedRelayPeers(h Handle, pubsubTopic string) (string, error) {
cTopic := C.CString(pubsubTopic)
defer C.free(unsafe.Pointer(cTopic))
return wakuCall(func(resp unsafe.Pointer) { C.cGoWakuGetNumConnectedRelayPeers(h, cTopic, resp) })
return call(func(resp unsafe.Pointer) { C.cGoWakuGetNumConnectedRelayPeers(h, cTopic, resp) })
}
// WakuGetConnectedRelayPeers returns the connected relay peer ids for a pubsub
// GetConnectedRelayPeers returns the connected relay peer ids for a pubsub
// topic as a comma-separated list.
func WakuGetConnectedRelayPeers(h Handle, pubsubTopic string) (string, error) {
func GetConnectedRelayPeers(h Handle, pubsubTopic string) (string, error) {
cTopic := C.CString(pubsubTopic)
defer C.free(unsafe.Pointer(cTopic))
return wakuCall(func(resp unsafe.Pointer) { C.cGoWakuGetConnectedRelayPeers(h, cTopic, resp) })
return call(func(resp unsafe.Pointer) { C.cGoWakuGetConnectedRelayPeers(h, cTopic, resp) })
}
// WakuGetConnectedPeers returns the connected peer ids as a comma-separated
// GetConnectedPeers returns the connected peer ids as a comma-separated
// list.
func WakuGetConnectedPeers(h Handle) (string, error) {
return wakuCall(func(resp unsafe.Pointer) { C.cGoWakuGetConnectedPeers(h, resp) })
func GetConnectedPeers(h Handle) (string, error) {
return call(func(resp unsafe.Pointer) { C.cGoWakuGetConnectedPeers(h, resp) })
}
// WakuGetPeerIDsFromPeerStore returns the peer-store peer ids as a
// GetPeerIDsFromPeerStore returns the peer-store peer ids as a
// comma-separated list.
func WakuGetPeerIDsFromPeerStore(h Handle) (string, error) {
return wakuCall(func(resp unsafe.Pointer) { C.cGoWakuGetPeerIdsFromPeerStore(h, resp) })
func GetPeerIDsFromPeerStore(h Handle) (string, error) {
return call(func(resp unsafe.Pointer) { C.cGoWakuGetPeerIdsFromPeerStore(h, resp) })
}
// WakuGetConnectedPeersInfo returns the connected peers' info as JSON.
func WakuGetConnectedPeersInfo(h Handle) (string, error) {
return wakuCall(func(resp unsafe.Pointer) { C.cGoWakuGetConnectedPeersInfo(h, resp) })
// GetConnectedPeersInfo returns the connected peers' info as JSON.
func GetConnectedPeersInfo(h Handle) (string, error) {
return call(func(resp unsafe.Pointer) { C.cGoWakuGetConnectedPeersInfo(h, resp) })
}
// WakuStoreQuery runs a store query (JSON) against a peer (comma-separated
// StoreQuery runs a store query (JSON) against a peer (comma-separated
// multiaddresses) and returns the response JSON.
func WakuStoreQuery(h Handle, queryJSON, peerAddrs string, timeoutMs int) (string, error) {
func StoreQuery(h Handle, queryJSON, peerAddrs string, timeoutMs int) (string, error) {
cQuery := C.CString(queryJSON)
cAddr := C.CString(peerAddrs)
defer C.free(unsafe.Pointer(cQuery))
defer C.free(unsafe.Pointer(cAddr))
return wakuCall(func(resp unsafe.Pointer) { C.cGoWakuStoreQuery(h, cQuery, cAddr, C.int(timeoutMs), resp) })
return call(func(resp unsafe.Pointer) { C.cGoWakuStoreQuery(h, cQuery, cAddr, C.int(timeoutMs), resp) })
}
// WakuPeerExchangeRequest asks peer exchange for numPeers peers and returns
// PeerExchangeRequest asks peer exchange for numPeers peers and returns
// the number of received peers.
func WakuPeerExchangeRequest(h Handle, numPeers uint64) (string, error) {
return wakuCall(func(resp unsafe.Pointer) { C.cGoWakuPeerExchangeQuery(h, C.uint64_t(numPeers), resp) })
func PeerExchangeRequest(h Handle, numPeers uint64) (string, error) {
return call(func(resp unsafe.Pointer) { C.cGoWakuPeerExchangeQuery(h, C.uint64_t(numPeers), resp) })
}
// WakuGetPeerIDsByProtocol returns the peer ids supporting a protocol as a
// GetPeerIDsByProtocol returns the peer ids supporting a protocol as a
// comma-separated list.
func WakuGetPeerIDsByProtocol(h Handle, protocol string) (string, error) {
func GetPeerIDsByProtocol(h Handle, protocol string) (string, error) {
cProtocol := C.CString(protocol)
defer C.free(unsafe.Pointer(cProtocol))
return wakuCall(func(resp unsafe.Pointer) { C.cGoWakuGetPeerIdsByProtocol(h, cProtocol, resp) })
return call(func(resp unsafe.Pointer) { C.cGoWakuGetPeerIdsByProtocol(h, cProtocol, resp) })
}
// WakuDnsDiscovery resolves an ENR tree URL via DNS discovery and returns the
// DnsDiscovery resolves an ENR tree URL via DNS discovery and returns the
// discovered multiaddresses as a comma-separated list.
func WakuDnsDiscovery(h Handle, enrTreeURL, nameDNSServer string, timeoutMs int) (string, error) {
func DnsDiscovery(h Handle, enrTreeURL, nameDNSServer string, timeoutMs int) (string, error) {
cEnrTree := C.CString(enrTreeURL)
cDNSServer := C.CString(nameDNSServer)
defer C.free(unsafe.Pointer(cEnrTree))
defer C.free(unsafe.Pointer(cDNSServer))
return wakuCall(func(resp unsafe.Pointer) { C.cGoWakuDnsDiscovery(h, cEnrTree, cDNSServer, C.int(timeoutMs), resp) })
return call(func(resp unsafe.Pointer) { C.cGoWakuDnsDiscovery(h, cEnrTree, cDNSServer, C.int(timeoutMs), resp) })
}
// WakuIsOnline reports the node's online state ("true"/"false").
func WakuIsOnline(h Handle) (string, error) {
return wakuCall(func(resp unsafe.Pointer) { C.cGoWakuIsOnline(h, resp) })
// IsOnline reports the node's online state ("true"/"false").
func IsOnline(h Handle) (string, error) {
return call(func(resp unsafe.Pointer) { C.cGoWakuIsOnline(h, resp) })
}
// WakuGetMetrics returns the node's metrics in Prometheus text format.
func WakuGetMetrics(h Handle) (string, error) {
return wakuCall(func(resp unsafe.Pointer) { C.cGoWakuGetMetrics(h, resp) })
// GetMetrics returns the node's metrics in Prometheus text format.
func GetMetrics(h Handle) (string, error) {
return call(func(resp unsafe.Pointer) { C.cGoWakuGetMetrics(h, resp) })
}

View File

@ -12,7 +12,7 @@ import (
"strings"
"time"
"github.com/logos-messaging/logos-delivery-go-bindings/internal/ffi"
"github.com/logos-messaging/logos-delivery-go-bindings/internal/ffi/libwaku"
"github.com/logos-messaging/logos-delivery-go-bindings/pkg/kernel/timesource"
"github.com/ethereum/go-ethereum/crypto"
@ -33,7 +33,7 @@ const ConnectionChangeChanBufferSize = 1024
// WakuNode represents an instance of an nwaku node
type WakuNode struct {
wakuCtx ffi.Handle
wakuCtx libwaku.Handle
config *common.WakuConfig
MsgChan chan common.Envelope
TopicHealthChan chan topicHealth
@ -54,7 +54,7 @@ func NewWakuNode(config *common.WakuConfig, nodeName string) (*WakuNode, error)
return nil, err
}
n.wakuCtx, err = ffi.WakuNew(string(jsonConfig))
n.wakuCtx, err = libwaku.New(string(jsonConfig))
if err != nil {
Error("error wakuNew for %s: %v", nodeName, err)
return nil, err
@ -64,7 +64,7 @@ func NewWakuNode(config *common.WakuConfig, nodeName string) (*WakuNode, error)
n.TopicHealthChan = make(chan topicHealth, TopicHealthChanBufferSize)
n.ConnectionChangeChan = make(chan connectionChange, ConnectionChangeChanBufferSize)
ffi.SetWakuEventHandler(n.wakuCtx, n.onRawEvent)
libwaku.SetEventHandler(n.wakuCtx, n.onRawEvent)
Debug("Successfully created WakuNode: %s", nodeName)
return n, nil
@ -72,7 +72,7 @@ func NewWakuNode(config *common.WakuConfig, nodeName string) (*WakuNode, error)
// onRawEvent receives every libwaku event for this node from the ffi bridge.
func (n *WakuNode) onRawEvent(ret int, msg string) {
if ret == ffi.RetOK {
if ret == libwaku.RetOK {
n.OnEvent(msg)
return
}
@ -168,7 +168,7 @@ func (n *WakuNode) GetNumConnectedRelayPeers(optPubsubTopic ...string) (int, err
pubsubTopic = optPubsubTopic[0]
}
numPeersStr, err := ffi.WakuGetNumConnectedRelayPeers(n.wakuCtx, pubsubTopic)
numPeersStr, err := libwaku.GetNumConnectedRelayPeers(n.wakuCtx, pubsubTopic)
if err != nil {
errMsg := "error GetNumConnectedRelayPeers: " + err.Error()
Error("Failed to get number of connected relay peers for %s: %s", n.nodeName, errMsg)
@ -199,7 +199,7 @@ func (n *WakuNode) GetConnectedRelayPeers(optPubsubTopic ...string) (peer.IDSlic
Debug("Fetching connected relay peers for pubsubTopic: %v, node: %v", pubsubTopic, n.nodeName)
peersStr, err := ffi.WakuGetConnectedRelayPeers(n.wakuCtx, pubsubTopic)
peersStr, err := libwaku.GetConnectedRelayPeers(n.wakuCtx, pubsubTopic)
if err != nil {
errMsg := "error GetConnectedRelayPeers: " + err.Error()
Error("Failed to get connected relay peers for pubsubTopic: %v:, node: %v. %v", pubsubTopic, n.nodeName, errMsg)
@ -227,14 +227,14 @@ func (n *WakuNode) GetConnectedRelayPeers(optPubsubTopic ...string) (peer.IDSlic
}
func (n *WakuNode) DisconnectPeerByID(peerID peer.ID) error {
if err := ffi.WakuDisconnectPeerByID(n.wakuCtx, peerID.String()); err != nil {
if err := libwaku.DisconnectPeerByID(n.wakuCtx, peerID.String()); err != nil {
return fmt.Errorf("error DisconnectPeerById: %w", err)
}
return nil
}
func (n *WakuNode) DisconnectAllPeers() error {
if err := ffi.WakuDisconnectAllPeers(n.wakuCtx); err != nil {
if err := libwaku.DisconnectAllPeers(n.wakuCtx); err != nil {
return fmt.Errorf("error DisconnectAllPeers: %w", err)
}
return nil
@ -249,7 +249,7 @@ func (n *WakuNode) GetConnectedPeers() (peer.IDSlice, error) {
Debug("Fetching connected peers for %v", n.nodeName)
peersStr, err := ffi.WakuGetConnectedPeers(n.wakuCtx)
peersStr, err := libwaku.GetConnectedPeers(n.wakuCtx)
if err != nil {
errMsg := "error GetConnectedPeers: " + err.Error()
Error("Failed to get connected peers for %v: %v", n.nodeName, errMsg)
@ -285,7 +285,7 @@ func (n *WakuNode) GetPeersInMesh(pubsubTopic string) (peer.IDSlice, error) {
Debug("Fetching peers in mesh peers for pubsubTopic: %v, node: %v", pubsubTopic, n.nodeName)
peersStr, err := ffi.WakuGetPeersInMesh(n.wakuCtx, pubsubTopic)
peersStr, err := libwaku.GetPeersInMesh(n.wakuCtx, pubsubTopic)
if err != nil {
errMsg := "error GetPeersInMesh: " + err.Error()
Error("Failed to get peers in mesh for pubsubTopic: %v:, node: %v. %v", pubsubTopic, n.nodeName, errMsg)
@ -321,7 +321,7 @@ func (n *WakuNode) RelaySubscribe(pubsubTopic string) error {
return errors.New("wakuCtx is nil")
}
if err := ffi.WakuRelaySubscribe(n.wakuCtx, pubsubTopic); err != nil {
if err := libwaku.RelaySubscribe(n.wakuCtx, pubsubTopic); err != nil {
Error("Failed to subscribe to relay on node %s, pubsubTopic: %s, error: %v", n.nodeName, pubsubTopic, err)
return fmt.Errorf("error WakuRelaySubscribe: %w", err)
}
@ -341,7 +341,7 @@ func (n *WakuNode) RelayAddProtectedShard(clusterId uint16, shardId uint16, pubk
keyHexStr := hex.EncodeToString(crypto.FromECDSAPub(pubkey))
if err := ffi.WakuRelayAddProtectedShard(n.wakuCtx, int(clusterId), int(shardId), keyHexStr); err != nil {
if err := libwaku.RelayAddProtectedShard(n.wakuCtx, int(clusterId), int(shardId), keyHexStr); err != nil {
return fmt.Errorf("error WakuRelayAddProtectedShard: %w", err)
}
return nil
@ -359,7 +359,7 @@ func (n *WakuNode) RelayUnsubscribe(pubsubTopic string) error {
}
Debug("Attempting to unsubscribe from relay on node %s, pubsubTopic: %s", n.nodeName, pubsubTopic)
if err := ffi.WakuRelayUnsubscribe(n.wakuCtx, pubsubTopic); err != nil {
if err := libwaku.RelayUnsubscribe(n.wakuCtx, pubsubTopic); err != nil {
Error("Failed to unsubscribe from relay on node %s, pubsubTopic: %s, error: %v", n.nodeName, pubsubTopic, err)
return fmt.Errorf("error WakuRelayUnsubscribe: %w", err)
}
@ -369,7 +369,7 @@ func (n *WakuNode) RelayUnsubscribe(pubsubTopic string) error {
}
func (n *WakuNode) PeerExchangeRequest(numPeers uint64) (uint64, error) {
numRecvPeersStr, err := ffi.WakuPeerExchangeRequest(n.wakuCtx, numPeers)
numRecvPeersStr, err := libwaku.PeerExchangeRequest(n.wakuCtx, numPeers)
if err != nil {
Error("PeerExchangeRequest failed: %v", err)
return 0, err
@ -386,7 +386,7 @@ func (n *WakuNode) PeerExchangeRequest(numPeers uint64) (uint64, error) {
func (n *WakuNode) StartDiscV5() error {
Debug("Starting DiscV5 for node: %s", n.nodeName)
if err := ffi.WakuStartDiscV5(n.wakuCtx); err != nil {
if err := libwaku.StartDiscV5(n.wakuCtx); err != nil {
errMsg := "error WakuStartDiscV5: " + err.Error()
Error("Failed to start DiscV5 for node %s: %v", n.nodeName, errMsg)
return errors.New(errMsg)
@ -396,7 +396,7 @@ func (n *WakuNode) StartDiscV5() error {
}
func (n *WakuNode) StopDiscV5() error {
if err := ffi.WakuStopDiscV5(n.wakuCtx); err != nil {
if err := libwaku.StopDiscV5(n.wakuCtx); err != nil {
errMsg := "error WakuStopDiscV5: " + err.Error()
Error("Failed to stop DiscV5 for node %s: %v", n.nodeName, errMsg)
return errors.New(errMsg)
@ -406,7 +406,7 @@ func (n *WakuNode) StopDiscV5() error {
}
func (n *WakuNode) Version() (string, error) {
version, err := ffi.WakuVersion(n.wakuCtx)
version, err := libwaku.Version(n.wakuCtx)
if err != nil {
errMsg := "error WakuVersion: " + err.Error()
Error("Failed to fetch Waku version for node %s: %v", n.nodeName, errMsg)
@ -430,7 +430,7 @@ func (n *WakuNode) StoreQuery(ctx context.Context, storeRequest *common.StoreQue
addrs[i] = addr.String()
}
jsonResponseStr, err := ffi.WakuStoreQuery(n.wakuCtx, string(b), strings.Join(addrs, ","), timeoutMs)
jsonResponseStr, err := libwaku.StoreQuery(n.wakuCtx, string(b), strings.Join(addrs, ","), timeoutMs)
if err != nil {
return nil, fmt.Errorf("error WakuStoreQuery: %w", err)
}
@ -451,7 +451,7 @@ func (n *WakuNode) RelayPublish(ctx context.Context, message *pb.WakuMessage, pu
return common.MessageHash(""), err
}
msgHash, err := ffi.WakuRelayPublish(n.wakuCtx, pubsubTopic, string(jsonMsg), timeoutMs)
msgHash, err := libwaku.RelayPublish(n.wakuCtx, pubsubTopic, string(jsonMsg), timeoutMs)
if err != nil {
return common.MessageHash(""), fmt.Errorf("WakuRelayPublish: %w", err)
}
@ -489,7 +489,7 @@ func (n *WakuNode) RelayPublishNoCTX(pubsubTopic string, message *pb.WakuMessage
func (n *WakuNode) DnsDiscovery(ctx context.Context, enrTreeUrl string, nameDnsServer string) ([]multiaddr.Multiaddr, error) {
timeoutMs := getContextTimeoutMilliseconds(ctx)
nodeAddresses, err := ffi.WakuDnsDiscovery(n.wakuCtx, enrTreeUrl, nameDnsServer, timeoutMs)
nodeAddresses, err := libwaku.DnsDiscovery(n.wakuCtx, enrTreeUrl, nameDnsServer, timeoutMs)
if err != nil {
return nil, fmt.Errorf("error WakuDnsDiscovery: %w", err)
}
@ -514,7 +514,7 @@ func (n *WakuNode) PingPeer(ctx context.Context, peerInfo peer.AddrInfo) (time.D
timeoutMs := getContextTimeoutMilliseconds(ctx)
rttStr, err := ffi.WakuPingPeer(n.wakuCtx, strings.Join(addrs, ","), timeoutMs)
rttStr, err := libwaku.PingPeer(n.wakuCtx, strings.Join(addrs, ","), timeoutMs)
if err != nil {
return 0, fmt.Errorf("PingPeer: %w", err)
}
@ -529,7 +529,7 @@ func (n *WakuNode) PingPeer(ctx context.Context, peerInfo peer.AddrInfo) (time.D
func (n *WakuNode) Start() error {
Debug("Starting %s", n.nodeName)
if err := ffi.WakuStart(n.wakuCtx); err != nil {
if err := libwaku.Start(n.wakuCtx); err != nil {
errMsg := "error WakuStart: " + err.Error()
Error("Failed to start %s: %s", n.nodeName, errMsg)
return errors.New(errMsg)
@ -542,7 +542,7 @@ func (n *WakuNode) Start() error {
func (n *WakuNode) Stop() error {
Debug("Stopping %s", n.nodeName)
if err := ffi.WakuStop(n.wakuCtx); err != nil {
if err := libwaku.Stop(n.wakuCtx); err != nil {
errMsg := "error WakuStop: " + err.Error()
Error("Failed to stop %s: %s", n.nodeName, errMsg)
return errors.New(errMsg)
@ -561,7 +561,7 @@ func (n *WakuNode) Destroy() error {
Debug("Destroying %v", n.nodeName)
if err := ffi.WakuDestroy(n.wakuCtx); err != nil {
if err := libwaku.Destroy(n.wakuCtx); err != nil {
errMsg := "error WakuDestroy: " + err.Error()
Error("Failed to destroy %v: %v", n.nodeName, errMsg)
return errors.New(errMsg)
@ -572,7 +572,7 @@ func (n *WakuNode) Destroy() error {
}
func (n *WakuNode) PeerID() (peer.ID, error) {
peerIdStr, err := ffi.WakuGetMyPeerID(n.wakuCtx)
peerIdStr, err := libwaku.GetMyPeerID(n.wakuCtx)
if err != nil {
return "", err
}
@ -588,7 +588,7 @@ func (n *WakuNode) PeerID() (peer.ID, error) {
func (n *WakuNode) Connect(ctx context.Context, addr multiaddr.Multiaddr) error {
timeoutMs := getContextTimeoutMilliseconds(ctx)
if err := ffi.WakuConnect(n.wakuCtx, addr.String(), timeoutMs); err != nil {
if err := libwaku.Connect(n.wakuCtx, addr.String(), timeoutMs); err != nil {
return fmt.Errorf("error WakuConnect: %w", err)
}
return nil
@ -597,14 +597,14 @@ func (n *WakuNode) Connect(ctx context.Context, addr multiaddr.Multiaddr) error
func (n *WakuNode) DialPeerByID(ctx context.Context, peerID peer.ID, protocol libp2pproto.ID) error {
timeoutMs := getContextTimeoutMilliseconds(ctx)
if err := ffi.WakuDialPeerByID(n.wakuCtx, peerID.String(), string(protocol), timeoutMs); err != nil {
if err := libwaku.DialPeerByID(n.wakuCtx, peerID.String(), string(protocol), timeoutMs); err != nil {
return fmt.Errorf("error DialPeerById: %w", err)
}
return nil
}
func (n *WakuNode) ListenAddresses() ([]multiaddr.Multiaddr, error) {
listenAddresses, err := ffi.WakuListenAddresses(n.wakuCtx)
listenAddresses, err := libwaku.ListenAddresses(n.wakuCtx)
if err != nil {
return nil, fmt.Errorf("error WakuListenAddresses: %w", err)
}
@ -622,7 +622,7 @@ func (n *WakuNode) ListenAddresses() ([]multiaddr.Multiaddr, error) {
}
func (n *WakuNode) ENR() (*enode.Node, error) {
enrStr, err := ffi.WakuGetMyENR(n.wakuCtx)
enrStr, err := libwaku.GetMyENR(n.wakuCtx)
if err != nil {
return nil, fmt.Errorf("error WakuGetMyENR: %w", err)
}
@ -635,7 +635,7 @@ func (n *WakuNode) ENR() (*enode.Node, error) {
}
func (n *WakuNode) GetNumPeersInMesh(pubsubTopic string) (int, error) {
numPeersStr, err := ffi.WakuGetNumPeersInMesh(n.wakuCtx, pubsubTopic)
numPeersStr, err := libwaku.GetNumPeersInMesh(n.wakuCtx, pubsubTopic)
if err != nil {
return 0, fmt.Errorf("error GetNumPeersInMesh: %w", err)
}
@ -649,7 +649,7 @@ func (n *WakuNode) GetNumPeersInMesh(pubsubTopic string) (int, error) {
}
func (n *WakuNode) GetPeerIDsFromPeerStore() (peer.IDSlice, error) {
peersStr, err := ffi.WakuGetPeerIDsFromPeerStore(n.wakuCtx)
peersStr, err := libwaku.GetPeerIDsFromPeerStore(n.wakuCtx)
if err != nil {
return nil, fmt.Errorf("GetPeerIdsFromPeerStore: %s", err.Error())
}
@ -672,7 +672,7 @@ func (n *WakuNode) GetPeerIDsFromPeerStore() (peer.IDSlice, error) {
}
func (n *WakuNode) GetConnectedPeersInfo() (common.PeersData, error) {
jsonStr, err := ffi.WakuGetConnectedPeersInfo(n.wakuCtx)
jsonStr, err := libwaku.GetConnectedPeersInfo(n.wakuCtx)
if err != nil {
return nil, fmt.Errorf("GetConnectedPeersInfo: %s", err.Error())
}
@ -691,7 +691,7 @@ func (n *WakuNode) GetConnectedPeersInfo() (common.PeersData, error) {
}
func (n *WakuNode) GetPeerIDsByProtocol(protocol libp2pproto.ID) (peer.IDSlice, error) {
peersStr, err := ffi.WakuGetPeerIDsByProtocol(n.wakuCtx, string(protocol))
peersStr, err := libwaku.GetPeerIDsByProtocol(n.wakuCtx, string(protocol))
if err != nil {
return nil, fmt.Errorf("GetPeerIdsByProtocol: error GetPeerIdsByProtocol: %s", err.Error())
}
@ -716,7 +716,7 @@ func (n *WakuNode) GetPeerIDsByProtocol(protocol libp2pproto.ID) (peer.IDSlice,
func (n *WakuNode) DialPeer(ctx context.Context, peerAddr multiaddr.Multiaddr, protocol libp2pproto.ID) error {
timeoutMs := getContextTimeoutMilliseconds(ctx)
if err := ffi.WakuDialPeer(n.wakuCtx, peerAddr.String(), string(protocol), timeoutMs); err != nil {
if err := libwaku.DialPeer(n.wakuCtx, peerAddr.String(), string(protocol), timeoutMs); err != nil {
return fmt.Errorf("error DialPeer: %w", err)
}
return nil
@ -934,7 +934,7 @@ func (n *WakuNode) IsOnline() (bool, error) {
Debug("Querying online state for %v", n.nodeName)
onlineStr, err := ffi.WakuIsOnline(n.wakuCtx)
onlineStr, err := libwaku.IsOnline(n.wakuCtx)
if err != nil {
errMsg := "error IsOnline: " + err.Error()
Error("Failed to query online state for %v: %v", n.nodeName, errMsg)
@ -953,7 +953,7 @@ func (n *WakuNode) GetMetrics() (string, error) {
Debug("Querying metrics for %v", n.nodeName)
metricsStr, err := ffi.WakuGetMetrics(n.wakuCtx)
metricsStr, err := libwaku.GetMetrics(n.wakuCtx)
if err != nil {
errMsg := "error GetMetrics: " + err.Error()
Error("Failed to query metrics for %v: %v", n.nodeName, errMsg)