From 86da158c486eff9d0bb285781e23959c32b24a5a Mon Sep 17 00:00:00 2001 From: Arnaud Date: Wed, 17 Sep 2025 13:25:30 +0200 Subject: [PATCH] Add peer connect --- examples/golang/codex.go | 25 +++++ .../codex_thread_request.nim | 4 + .../requests/node_p2p_request.nim | 91 +++++++++++++++++++ library/libcodex.h | 8 ++ library/libcodex.nim | 30 ++++++ 5 files changed, 158 insertions(+) create mode 100644 library/codex_thread_requests/requests/node_p2p_request.nim diff --git a/examples/golang/codex.go b/examples/golang/codex.go index a5ed015f..01a88a02 100644 --- a/examples/golang/codex.go +++ b/examples/golang/codex.go @@ -100,6 +100,10 @@ package main return codex_log_level(codexCtx, logLevel, (CodexCallback) callback, resp); } + static int cGoCodexConnect(void* codexCtx, char* peerId, const char** peerAddresses, uintptr_t peerAddressesSize, void* resp) { + return codex_connect(codexCtx, peerId, peerAddresses, peerAddressesSize, (CodexCallback) callback, resp); + } + static int cGoCodexStart(void* codexCtx, void* resp) { return codex_start(codexCtx, (CodexCallback) callback, resp); } @@ -423,6 +427,27 @@ func (self *CodexNode) CodexLogLevel(logLevel LogLevel) error { return err } +func (self *CodexNode) CodexConnect(peerId string, peerAddresses []string) error { + bridge := newBridgeCtx() + defer bridge.free() + + var cPeerId = C.CString(peerId) + defer C.free(unsafe.Pointer(cPeerId)) + + var cAddresses = make([]*C.char, len(peerAddresses)) + for i, addr := range peerAddresses { + cAddresses[i] = C.CString(addr) + defer C.free(unsafe.Pointer(cAddresses[i])) + } + + if C.cGoCodexConnect(self.ctx, cPeerId, &cAddresses[0], C.uintptr_t(len(peerAddresses)), bridge.resp) != C.RET_OK { + return bridge.CallError("cGoCodexConnect") + } + + _, err := bridge.wait() + return err +} + func (self *CodexNode) CodexStart() error { bridge := newBridgeCtx() defer bridge.free() diff --git a/library/codex_thread_requests/codex_thread_request.nim b/library/codex_thread_requests/codex_thread_request.nim index ecf75448..c49622b1 100644 --- a/library/codex_thread_requests/codex_thread_request.nim +++ b/library/codex_thread_requests/codex_thread_request.nim @@ -9,6 +9,7 @@ import ../ffi_types import ./requests/node_lifecycle_request import ./requests/node_info_request import ./requests/node_debug_request +import ./requests/node_p2p_request from ../../codex/codex import CodexServer @@ -16,6 +17,7 @@ type RequestType* {.pure.} = enum LIFECYCLE INFO DEBUG + P2P type CodexThreadRequest* = object reqType: RequestType @@ -88,6 +90,8 @@ proc process*( cast[ptr NodeInfoRequest](request[].reqContent).process(codex) of RequestType.DEBUG: cast[ptr NodeDebugRequest](request[].reqContent).process(codex) + of P2P: + cast[ptr NodeP2PRequest](request[].reqContent).process(codex) handleRes(await retFut, request) diff --git a/library/codex_thread_requests/requests/node_p2p_request.nim b/library/codex_thread_requests/requests/node_p2p_request.nim new file mode 100644 index 00000000..5291c138 --- /dev/null +++ b/library/codex_thread_requests/requests/node_p2p_request.nim @@ -0,0 +1,91 @@ +## This file contains the lifecycle request type that will be handled. + +import std/[options] +import chronos +import chronicles +import ../../alloc +import libp2p +import ../../../codex/node + +from ../../../codex/codex import CodexServer, node + +type NodeP2PMsgType* = enum + CONNECT + +type NodeP2PRequest* = object + operation: NodeP2PMsgType + peerId: cstring + peerAddresses: seq[cstring] + +proc createShared*( + T: type NodeP2PRequest, + op: NodeP2PMsgType, + peerId: cstring = "", + peerAddresses: seq[cstring] = @[], +): ptr type T = + var ret = createShared(T) + ret[].operation = op + ret[].peerId = peerId.alloc() + ret[].peerAddresses = peerAddresses + return ret + +proc destroyShared(self: ptr NodeP2PRequest) = + deallocShared(self[].peerId) + deallocShared(self) + +proc connect( + codex: ptr CodexServer, peerId: cstring, peerAddresses: seq[cstring] = @[] +): Future[Result[string, string]] {.async: (raises: []).} = + let node = codex[].node + let res = PeerId.init($peerId) + if res.isErr: + return err("Invalid peer ID: " & $res.error()) + + let id = res.get() + + let addresses = + if peerAddresses.len > 0: + var addrs: seq[MultiAddress] + for addrStr in peerAddresses: + let res = MultiAddress.init($addrStr) + if res.isOk: + addrs.add(res[]) + else: + return err("Invalid address: " & $addrStr) + addrs + else: + try: + let peerRecord = await node.findPeer(id) + if peerRecord.isNone: + return err("Peer not found") + + peerRecord.get().addresses.mapIt(it.address) + except CancelledError as e: + return err("Operation cancelled") + except CatchableError as e: + return err("Error finding peer: " & $e.msg) + + try: + await node.connect(id, addresses) + except CancelledError as e: + return err("Operation cancelled") + except CatchableError as e: + return err("Connection failed: " & $e.msg) + + return ok("") + +proc process*( + self: ptr NodeP2PRequest, codex: ptr CodexServer +): Future[Result[string, string]] {.async: (raises: []).} = + defer: + destroyShared(self) + + case self.operation + of NodeP2PMsgType.CONNECT: + let res = (await connect(codex, self.peerId)) + if res.isErr: + error "CONNECT failed", error = res.error + return err($res.error) + return res + + return ok("") diff --git a/library/libcodex.h b/library/libcodex.h index 3623d446..026a317c 100644 --- a/library/libcodex.h +++ b/library/libcodex.h @@ -66,6 +66,14 @@ int codex_log_level( CodexCallback callback, void* userData); +int codex_connect( + void* ctx, + const char* peerId, + const char** peerAddresses, + size_t peerAddressesSize, + CodexCallback callback, + void* userData); + int codex_start(void* ctx, CodexCallback callback, void* userData); diff --git a/library/libcodex.nim b/library/libcodex.nim index eb1326d8..a4402024 100644 --- a/library/libcodex.nim +++ b/library/libcodex.nim @@ -32,6 +32,7 @@ import ./codex_thread_requests/codex_thread_request import ./codex_thread_requests/requests/node_lifecycle_request import ./codex_thread_requests/requests/node_info_request import ./codex_thread_requests/requests/node_debug_request +import ./codex_thread_requests/requests/node_p2p_request import ./ffi_types from ../codex/conf import codexVersion, updateLogLevel @@ -218,6 +219,35 @@ proc codex_log_level( return RET_OK +proc codex_connect( + ctx: ptr CodexContext, + peerId: cstring, + peerAddressesPtr: ptr cstring, + peerAddressesLength: csize_t, + callback: CodexCallback, + userData: pointer, +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + var peerAddresses = newSeq[cstring](peerAddressesLength) + let peers = cast[ptr UncheckedArray[cstring]](peerAddressesPtr) + for i in 0 ..< peerAddressesLength: + peerAddresses[i] = peers[i] + + let reqContent = NodeP2PRequest.createShared( + NodeP2PMsgType.CONNECT, peerId = peerId, peerAddresses = peerAddresses + ) + + codex_context.sendRequestToCodexThread( + ctx, RequestType.P2P, reqContent, callback, userData + ).isOkOr: + let msg = "libcodex error: " & $error + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + return RET_ERR + + return RET_OK + proc codex_destroy( ctx: ptr CodexContext, callback: CodexCallback, userData: pointer ): cint {.dynlib, exportc.} =