Add peer connect

This commit is contained in:
Arnaud 2025-09-17 13:25:30 +02:00 committed by Eric
parent 65a595c3fe
commit 3f2c7b776e
No known key found for this signature in database
5 changed files with 158 additions and 0 deletions

View File

@ -100,6 +100,10 @@ package main
return codex_log_level(codexCtx, logLevel, (CodexCallback) callback, resp);
}
static int cGoCodexConnect(void* codexCtx, char* peerId, const char** peerAddresses, uintptr_t peerAddressesSize, void* resp) {
return codex_connect(codexCtx, peerId, peerAddresses, peerAddressesSize, (CodexCallback) callback, resp);
}
static int cGoCodexStart(void* codexCtx, void* resp) {
return codex_start(codexCtx, (CodexCallback) callback, resp);
}
@ -423,6 +427,27 @@ func (self *CodexNode) CodexLogLevel(logLevel LogLevel) error {
return err
}
func (self *CodexNode) CodexConnect(peerId string, peerAddresses []string) error {
bridge := newBridgeCtx()
defer bridge.free()
var cPeerId = C.CString(peerId)
defer C.free(unsafe.Pointer(cPeerId))
var cAddresses = make([]*C.char, len(peerAddresses))
for i, addr := range peerAddresses {
cAddresses[i] = C.CString(addr)
defer C.free(unsafe.Pointer(cAddresses[i]))
}
if C.cGoCodexConnect(self.ctx, cPeerId, &cAddresses[0], C.uintptr_t(len(peerAddresses)), bridge.resp) != C.RET_OK {
return bridge.CallError("cGoCodexConnect")
}
_, err := bridge.wait()
return err
}
func (self *CodexNode) CodexStart() error {
bridge := newBridgeCtx()
defer bridge.free()

View File

@ -9,6 +9,7 @@ import ../ffi_types
import ./requests/node_lifecycle_request
import ./requests/node_info_request
import ./requests/node_debug_request
import ./requests/node_p2p_request
from ../../codex/codex import CodexServer
@ -16,6 +17,7 @@ type RequestType* {.pure.} = enum
LIFECYCLE
INFO
DEBUG
P2P
type CodexThreadRequest* = object
reqType: RequestType
@ -88,6 +90,8 @@ proc process*(
cast[ptr NodeInfoRequest](request[].reqContent).process(codex)
of RequestType.DEBUG:
cast[ptr NodeDebugRequest](request[].reqContent).process(codex)
of P2P:
cast[ptr NodeP2PRequest](request[].reqContent).process(codex)
handleRes(await retFut, request)

View File

@ -0,0 +1,91 @@
## This file contains the lifecycle request type that will be handled.
import std/[options]
import chronos
import chronicles
import ../../alloc
import libp2p
import ../../../codex/node
from ../../../codex/codex import CodexServer, node
type NodeP2PMsgType* = enum
CONNECT
type NodeP2PRequest* = object
operation: NodeP2PMsgType
peerId: cstring
peerAddresses: seq[cstring]
proc createShared*(
T: type NodeP2PRequest,
op: NodeP2PMsgType,
peerId: cstring = "",
peerAddresses: seq[cstring] = @[],
): ptr type T =
var ret = createShared(T)
ret[].operation = op
ret[].peerId = peerId.alloc()
ret[].peerAddresses = peerAddresses
return ret
proc destroyShared(self: ptr NodeP2PRequest) =
deallocShared(self[].peerId)
deallocShared(self)
proc connect(
codex: ptr CodexServer, peerId: cstring, peerAddresses: seq[cstring] = @[]
): Future[Result[string, string]] {.async: (raises: []).} =
let node = codex[].node
let res = PeerId.init($peerId)
if res.isErr:
return err("Invalid peer ID: " & $res.error())
let id = res.get()
let addresses =
if peerAddresses.len > 0:
var addrs: seq[MultiAddress]
for addrStr in peerAddresses:
let res = MultiAddress.init($addrStr)
if res.isOk:
addrs.add(res[])
else:
return err("Invalid address: " & $addrStr)
addrs
else:
try:
let peerRecord = await node.findPeer(id)
if peerRecord.isNone:
return err("Peer not found")
peerRecord.get().addresses.mapIt(it.address)
except CancelledError as e:
return err("Operation cancelled")
except CatchableError as e:
return err("Error finding peer: " & $e.msg)
try:
await node.connect(id, addresses)
except CancelledError as e:
return err("Operation cancelled")
except CatchableError as e:
return err("Connection failed: " & $e.msg)
return ok("")
proc process*(
self: ptr NodeP2PRequest, codex: ptr CodexServer
): Future[Result[string, string]] {.async: (raises: []).} =
defer:
destroyShared(self)
case self.operation
of NodeP2PMsgType.CONNECT:
let res = (await connect(codex, self.peerId))
if res.isErr:
error "CONNECT failed", error = res.error
return err($res.error)
return res
return ok("")

View File

@ -66,6 +66,14 @@ int codex_log_level(
CodexCallback callback,
void* userData);
int codex_connect(
void* ctx,
const char* peerId,
const char** peerAddresses,
size_t peerAddressesSize,
CodexCallback callback,
void* userData);
int codex_start(void* ctx,
CodexCallback callback,
void* userData);

View File

@ -32,6 +32,7 @@ import ./codex_thread_requests/codex_thread_request
import ./codex_thread_requests/requests/node_lifecycle_request
import ./codex_thread_requests/requests/node_info_request
import ./codex_thread_requests/requests/node_debug_request
import ./codex_thread_requests/requests/node_p2p_request
import ./ffi_types
from ../codex/conf import codexVersion, updateLogLevel
@ -218,6 +219,35 @@ proc codex_log_level(
return RET_OK
proc codex_connect(
ctx: ptr CodexContext,
peerId: cstring,
peerAddressesPtr: ptr cstring,
peerAddressesLength: csize_t,
callback: CodexCallback,
userData: pointer,
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibcodexParams(ctx, callback, userData)
var peerAddresses = newSeq[cstring](peerAddressesLength)
let peers = cast[ptr UncheckedArray[cstring]](peerAddressesPtr)
for i in 0 ..< peerAddressesLength:
peerAddresses[i] = peers[i]
let reqContent = NodeP2PRequest.createShared(
NodeP2PMsgType.CONNECT, peerId = peerId, peerAddresses = peerAddresses
)
codex_context.sendRequestToCodexThread(
ctx, RequestType.P2P, reqContent, callback, userData
).isOkOr:
let msg = "libcodex error: " & $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR
return RET_OK
proc codex_destroy(
ctx: ptr CodexContext, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} =