mirror of
https://github.com/logos-storage/logos-storage-nim.git
synced 2026-01-06 07:23:10 +00:00
Add peer connect
This commit is contained in:
parent
429e6d77a7
commit
86da158c48
@ -100,6 +100,10 @@ package main
|
|||||||
return codex_log_level(codexCtx, logLevel, (CodexCallback) callback, resp);
|
return codex_log_level(codexCtx, logLevel, (CodexCallback) callback, resp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int cGoCodexConnect(void* codexCtx, char* peerId, const char** peerAddresses, uintptr_t peerAddressesSize, void* resp) {
|
||||||
|
return codex_connect(codexCtx, peerId, peerAddresses, peerAddressesSize, (CodexCallback) callback, resp);
|
||||||
|
}
|
||||||
|
|
||||||
static int cGoCodexStart(void* codexCtx, void* resp) {
|
static int cGoCodexStart(void* codexCtx, void* resp) {
|
||||||
return codex_start(codexCtx, (CodexCallback) callback, resp);
|
return codex_start(codexCtx, (CodexCallback) callback, resp);
|
||||||
}
|
}
|
||||||
@ -423,6 +427,27 @@ func (self *CodexNode) CodexLogLevel(logLevel LogLevel) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (self *CodexNode) CodexConnect(peerId string, peerAddresses []string) error {
|
||||||
|
bridge := newBridgeCtx()
|
||||||
|
defer bridge.free()
|
||||||
|
|
||||||
|
var cPeerId = C.CString(peerId)
|
||||||
|
defer C.free(unsafe.Pointer(cPeerId))
|
||||||
|
|
||||||
|
var cAddresses = make([]*C.char, len(peerAddresses))
|
||||||
|
for i, addr := range peerAddresses {
|
||||||
|
cAddresses[i] = C.CString(addr)
|
||||||
|
defer C.free(unsafe.Pointer(cAddresses[i]))
|
||||||
|
}
|
||||||
|
|
||||||
|
if C.cGoCodexConnect(self.ctx, cPeerId, &cAddresses[0], C.uintptr_t(len(peerAddresses)), bridge.resp) != C.RET_OK {
|
||||||
|
return bridge.CallError("cGoCodexConnect")
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := bridge.wait()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
func (self *CodexNode) CodexStart() error {
|
func (self *CodexNode) CodexStart() error {
|
||||||
bridge := newBridgeCtx()
|
bridge := newBridgeCtx()
|
||||||
defer bridge.free()
|
defer bridge.free()
|
||||||
|
|||||||
@ -9,6 +9,7 @@ import ../ffi_types
|
|||||||
import ./requests/node_lifecycle_request
|
import ./requests/node_lifecycle_request
|
||||||
import ./requests/node_info_request
|
import ./requests/node_info_request
|
||||||
import ./requests/node_debug_request
|
import ./requests/node_debug_request
|
||||||
|
import ./requests/node_p2p_request
|
||||||
|
|
||||||
from ../../codex/codex import CodexServer
|
from ../../codex/codex import CodexServer
|
||||||
|
|
||||||
@ -16,6 +17,7 @@ type RequestType* {.pure.} = enum
|
|||||||
LIFECYCLE
|
LIFECYCLE
|
||||||
INFO
|
INFO
|
||||||
DEBUG
|
DEBUG
|
||||||
|
P2P
|
||||||
|
|
||||||
type CodexThreadRequest* = object
|
type CodexThreadRequest* = object
|
||||||
reqType: RequestType
|
reqType: RequestType
|
||||||
@ -88,6 +90,8 @@ proc process*(
|
|||||||
cast[ptr NodeInfoRequest](request[].reqContent).process(codex)
|
cast[ptr NodeInfoRequest](request[].reqContent).process(codex)
|
||||||
of RequestType.DEBUG:
|
of RequestType.DEBUG:
|
||||||
cast[ptr NodeDebugRequest](request[].reqContent).process(codex)
|
cast[ptr NodeDebugRequest](request[].reqContent).process(codex)
|
||||||
|
of P2P:
|
||||||
|
cast[ptr NodeP2PRequest](request[].reqContent).process(codex)
|
||||||
|
|
||||||
handleRes(await retFut, request)
|
handleRes(await retFut, request)
|
||||||
|
|
||||||
|
|||||||
91
library/codex_thread_requests/requests/node_p2p_request.nim
Normal file
91
library/codex_thread_requests/requests/node_p2p_request.nim
Normal file
@ -0,0 +1,91 @@
|
|||||||
|
## This file contains the lifecycle request type that will be handled.
|
||||||
|
|
||||||
|
import std/[options]
|
||||||
|
import chronos
|
||||||
|
import chronicles
|
||||||
|
import ../../alloc
|
||||||
|
import libp2p
|
||||||
|
import ../../../codex/node
|
||||||
|
|
||||||
|
from ../../../codex/codex import CodexServer, node
|
||||||
|
|
||||||
|
type NodeP2PMsgType* = enum
|
||||||
|
CONNECT
|
||||||
|
|
||||||
|
type NodeP2PRequest* = object
|
||||||
|
operation: NodeP2PMsgType
|
||||||
|
peerId: cstring
|
||||||
|
peerAddresses: seq[cstring]
|
||||||
|
|
||||||
|
proc createShared*(
|
||||||
|
T: type NodeP2PRequest,
|
||||||
|
op: NodeP2PMsgType,
|
||||||
|
peerId: cstring = "",
|
||||||
|
peerAddresses: seq[cstring] = @[],
|
||||||
|
): ptr type T =
|
||||||
|
var ret = createShared(T)
|
||||||
|
ret[].operation = op
|
||||||
|
ret[].peerId = peerId.alloc()
|
||||||
|
ret[].peerAddresses = peerAddresses
|
||||||
|
return ret
|
||||||
|
|
||||||
|
proc destroyShared(self: ptr NodeP2PRequest) =
|
||||||
|
deallocShared(self[].peerId)
|
||||||
|
deallocShared(self)
|
||||||
|
|
||||||
|
proc connect(
|
||||||
|
codex: ptr CodexServer, peerId: cstring, peerAddresses: seq[cstring] = @[]
|
||||||
|
): Future[Result[string, string]] {.async: (raises: []).} =
|
||||||
|
let node = codex[].node
|
||||||
|
let res = PeerId.init($peerId)
|
||||||
|
if res.isErr:
|
||||||
|
return err("Invalid peer ID: " & $res.error())
|
||||||
|
|
||||||
|
let id = res.get()
|
||||||
|
|
||||||
|
let addresses =
|
||||||
|
if peerAddresses.len > 0:
|
||||||
|
var addrs: seq[MultiAddress]
|
||||||
|
for addrStr in peerAddresses:
|
||||||
|
let res = MultiAddress.init($addrStr)
|
||||||
|
if res.isOk:
|
||||||
|
addrs.add(res[])
|
||||||
|
else:
|
||||||
|
return err("Invalid address: " & $addrStr)
|
||||||
|
addrs
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
let peerRecord = await node.findPeer(id)
|
||||||
|
if peerRecord.isNone:
|
||||||
|
return err("Peer not found")
|
||||||
|
|
||||||
|
peerRecord.get().addresses.mapIt(it.address)
|
||||||
|
except CancelledError as e:
|
||||||
|
return err("Operation cancelled")
|
||||||
|
except CatchableError as e:
|
||||||
|
return err("Error finding peer: " & $e.msg)
|
||||||
|
|
||||||
|
try:
|
||||||
|
await node.connect(id, addresses)
|
||||||
|
except CancelledError as e:
|
||||||
|
return err("Operation cancelled")
|
||||||
|
except CatchableError as e:
|
||||||
|
return err("Connection failed: " & $e.msg)
|
||||||
|
|
||||||
|
return ok("")
|
||||||
|
|
||||||
|
proc process*(
|
||||||
|
self: ptr NodeP2PRequest, codex: ptr CodexServer
|
||||||
|
): Future[Result[string, string]] {.async: (raises: []).} =
|
||||||
|
defer:
|
||||||
|
destroyShared(self)
|
||||||
|
|
||||||
|
case self.operation
|
||||||
|
of NodeP2PMsgType.CONNECT:
|
||||||
|
let res = (await connect(codex, self.peerId))
|
||||||
|
if res.isErr:
|
||||||
|
error "CONNECT failed", error = res.error
|
||||||
|
return err($res.error)
|
||||||
|
return res
|
||||||
|
|
||||||
|
return ok("")
|
||||||
@ -66,6 +66,14 @@ int codex_log_level(
|
|||||||
CodexCallback callback,
|
CodexCallback callback,
|
||||||
void* userData);
|
void* userData);
|
||||||
|
|
||||||
|
int codex_connect(
|
||||||
|
void* ctx,
|
||||||
|
const char* peerId,
|
||||||
|
const char** peerAddresses,
|
||||||
|
size_t peerAddressesSize,
|
||||||
|
CodexCallback callback,
|
||||||
|
void* userData);
|
||||||
|
|
||||||
int codex_start(void* ctx,
|
int codex_start(void* ctx,
|
||||||
CodexCallback callback,
|
CodexCallback callback,
|
||||||
void* userData);
|
void* userData);
|
||||||
|
|||||||
@ -32,6 +32,7 @@ import ./codex_thread_requests/codex_thread_request
|
|||||||
import ./codex_thread_requests/requests/node_lifecycle_request
|
import ./codex_thread_requests/requests/node_lifecycle_request
|
||||||
import ./codex_thread_requests/requests/node_info_request
|
import ./codex_thread_requests/requests/node_info_request
|
||||||
import ./codex_thread_requests/requests/node_debug_request
|
import ./codex_thread_requests/requests/node_debug_request
|
||||||
|
import ./codex_thread_requests/requests/node_p2p_request
|
||||||
import ./ffi_types
|
import ./ffi_types
|
||||||
|
|
||||||
from ../codex/conf import codexVersion, updateLogLevel
|
from ../codex/conf import codexVersion, updateLogLevel
|
||||||
@ -218,6 +219,35 @@ proc codex_log_level(
|
|||||||
|
|
||||||
return RET_OK
|
return RET_OK
|
||||||
|
|
||||||
|
proc codex_connect(
|
||||||
|
ctx: ptr CodexContext,
|
||||||
|
peerId: cstring,
|
||||||
|
peerAddressesPtr: ptr cstring,
|
||||||
|
peerAddressesLength: csize_t,
|
||||||
|
callback: CodexCallback,
|
||||||
|
userData: pointer,
|
||||||
|
): cint {.dynlib, exportc.} =
|
||||||
|
initializeLibrary()
|
||||||
|
checkLibcodexParams(ctx, callback, userData)
|
||||||
|
|
||||||
|
var peerAddresses = newSeq[cstring](peerAddressesLength)
|
||||||
|
let peers = cast[ptr UncheckedArray[cstring]](peerAddressesPtr)
|
||||||
|
for i in 0 ..< peerAddressesLength:
|
||||||
|
peerAddresses[i] = peers[i]
|
||||||
|
|
||||||
|
let reqContent = NodeP2PRequest.createShared(
|
||||||
|
NodeP2PMsgType.CONNECT, peerId = peerId, peerAddresses = peerAddresses
|
||||||
|
)
|
||||||
|
|
||||||
|
codex_context.sendRequestToCodexThread(
|
||||||
|
ctx, RequestType.P2P, reqContent, callback, userData
|
||||||
|
).isOkOr:
|
||||||
|
let msg = "libcodex error: " & $error
|
||||||
|
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
|
||||||
|
return RET_ERR
|
||||||
|
|
||||||
|
return RET_OK
|
||||||
|
|
||||||
proc codex_destroy(
|
proc codex_destroy(
|
||||||
ctx: ptr CodexContext, callback: CodexCallback, userData: pointer
|
ctx: ptr CodexContext, callback: CodexCallback, userData: pointer
|
||||||
): cint {.dynlib, exportc.} =
|
): cint {.dynlib, exportc.} =
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user