mirror of
https://github.com/logos-storage/logos-storage-nim.git
synced 2026-01-07 16:03:13 +00:00
Provide better logs (hopefully)
This commit is contained in:
parent
e8b76e160a
commit
6bf2c48dea
@ -95,23 +95,28 @@ proc sendRequestToCodexThread*(
|
|||||||
let sentOk = ctx.reqChannel.trySend(req)
|
let sentOk = ctx.reqChannel.trySend(req)
|
||||||
if not sentOk:
|
if not sentOk:
|
||||||
deallocShared(req)
|
deallocShared(req)
|
||||||
return err("Couldn't send a request to the codex thread: " & $req[])
|
return err("Failed to send request to the codex thread: " & $req[])
|
||||||
|
|
||||||
# Notify the Codex thread that a request is available
|
# Notify the Codex thread that a request is available
|
||||||
let fireSyncRes = ctx.reqSignal.fireSync()
|
let fireSyncRes = ctx.reqSignal.fireSync()
|
||||||
if fireSyncRes.isErr():
|
if fireSyncRes.isErr():
|
||||||
deallocShared(req)
|
deallocShared(req)
|
||||||
return err("failed fireSync: " & $fireSyncRes.error)
|
return err(
|
||||||
|
"Failed to send request to the codex thread: unable to fireSync: " &
|
||||||
|
$fireSyncRes.error
|
||||||
|
)
|
||||||
|
|
||||||
if fireSyncRes.get() == false:
|
if fireSyncRes.get() == false:
|
||||||
deallocShared(req)
|
deallocShared(req)
|
||||||
return err("Couldn't fireSync in time")
|
return err("Failed to send request to the codex thread: fireSync timed out.")
|
||||||
|
|
||||||
# Wait until the Codex Thread properly received the request
|
# Wait until the Codex Thread properly received the request
|
||||||
let res = ctx.reqReceivedSignal.waitSync(timeout)
|
let res = ctx.reqReceivedSignal.waitSync(timeout)
|
||||||
if res.isErr():
|
if res.isErr():
|
||||||
deallocShared(req)
|
deallocShared(req)
|
||||||
return err("Couldn't receive reqReceivedSignal signal")
|
return err(
|
||||||
|
"Failed to send request to the codex thread: unable to receive reqReceivedSignal signal."
|
||||||
|
)
|
||||||
|
|
||||||
## Notice that in case of "ok", the deallocShared(req) is performed by the Codex Thread in the
|
## Notice that in case of "ok", the deallocShared(req) is performed by the Codex Thread in the
|
||||||
## process proc. See the 'codex_thread_request.nim' module for more details.
|
## process proc. See the 'codex_thread_request.nim' module for more details.
|
||||||
@ -125,7 +130,7 @@ proc runCodex(ctx: ptr CodexContext) {.async: (raises: []).} =
|
|||||||
# Wait until a request is available
|
# Wait until a request is available
|
||||||
await ctx.reqSignal.wait()
|
await ctx.reqSignal.wait()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
error "codex thread error while waiting for reqSignal", error = e.msg
|
error "Failed to run codex thread while waiting for reqSignal.", error = e.msg
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# If codex_destroy was called, exit the loop
|
# If codex_destroy was called, exit the loop
|
||||||
@ -137,7 +142,7 @@ proc runCodex(ctx: ptr CodexContext) {.async: (raises: []).} =
|
|||||||
# Pop a request from the channel
|
# Pop a request from the channel
|
||||||
let recvOk = ctx.reqChannel.tryRecv(request)
|
let recvOk = ctx.reqChannel.tryRecv(request)
|
||||||
if not recvOk:
|
if not recvOk:
|
||||||
error "codex thread could not receive a request"
|
error "Failed to run codex: unable to receive request in codex thread."
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# yield immediately to the event loop
|
# yield immediately to the event loop
|
||||||
@ -152,7 +157,8 @@ proc runCodex(ctx: ptr CodexContext) {.async: (raises: []).} =
|
|||||||
# Notify the main thread that we picked up the request
|
# Notify the main thread that we picked up the request
|
||||||
let fireRes = ctx.reqReceivedSignal.fireSync()
|
let fireRes = ctx.reqReceivedSignal.fireSync()
|
||||||
if fireRes.isErr():
|
if fireRes.isErr():
|
||||||
error "could not fireSync back to requester thread", error = fireRes.error
|
error "Failed to run codex: unable to fire back to requester thread.",
|
||||||
|
error = fireRes.error
|
||||||
|
|
||||||
proc run(ctx: ptr CodexContext) {.thread.} =
|
proc run(ctx: ptr CodexContext) {.thread.} =
|
||||||
waitFor runCodex(ctx)
|
waitFor runCodex(ctx)
|
||||||
@ -167,12 +173,15 @@ proc createCodexContext*(): Result[ptr CodexContext, string] =
|
|||||||
# This signal is used by the main side to wake the Codex thread
|
# This signal is used by the main side to wake the Codex thread
|
||||||
# when a new request is enqueued.
|
# when a new request is enqueued.
|
||||||
ctx.reqSignal = ThreadSignalPtr.new().valueOr:
|
ctx.reqSignal = ThreadSignalPtr.new().valueOr:
|
||||||
return err("couldn't create reqSignal ThreadSignalPtr")
|
return
|
||||||
|
err("Failed to create a context: unable to create reqSignal ThreadSignalPtr.")
|
||||||
|
|
||||||
# Used to let the caller know that the Codex thread has
|
# Used to let the caller know that the Codex thread has
|
||||||
# acknowledged / picked up a request (like a handshake).
|
# acknowledged / picked up a request (like a handshake).
|
||||||
ctx.reqReceivedSignal = ThreadSignalPtr.new().valueOr:
|
ctx.reqReceivedSignal = ThreadSignalPtr.new().valueOr:
|
||||||
return err("couldn't create reqReceivedSignal ThreadSignalPtr")
|
return err(
|
||||||
|
"Failed to create codex context: unable to create reqReceivedSignal ThreadSignalPtr."
|
||||||
|
)
|
||||||
|
|
||||||
# Protects shared state inside CodexContext
|
# Protects shared state inside CodexContext
|
||||||
ctx.lock.initLock()
|
ctx.lock.initLock()
|
||||||
@ -184,7 +193,10 @@ proc createCodexContext*(): Result[ptr CodexContext, string] =
|
|||||||
createThread(ctx.thread, run, ctx)
|
createThread(ctx.thread, run, ctx)
|
||||||
except ValueError, ResourceExhaustedError:
|
except ValueError, ResourceExhaustedError:
|
||||||
freeShared(ctx)
|
freeShared(ctx)
|
||||||
return err("failed to create the Codex thread: " & getCurrentExceptionMsg())
|
return err(
|
||||||
|
"Failed to create codex context: unable to create thread: " &
|
||||||
|
getCurrentExceptionMsg()
|
||||||
|
)
|
||||||
|
|
||||||
return ok(ctx)
|
return ok(ctx)
|
||||||
|
|
||||||
@ -194,10 +206,12 @@ proc destroyCodexContext*(ctx: ptr CodexContext): Result[void, string] =
|
|||||||
|
|
||||||
# Wake the worker up if it's waiting
|
# Wake the worker up if it's waiting
|
||||||
let signaledOnTime = ctx.reqSignal.fireSync().valueOr:
|
let signaledOnTime = ctx.reqSignal.fireSync().valueOr:
|
||||||
return err("error in destroyCodexContext: " & $error)
|
return err("Failed to destroy codex context: " & $error)
|
||||||
|
|
||||||
if not signaledOnTime:
|
if not signaledOnTime:
|
||||||
return err("failed to signal reqSignal on time in destroyCodexContext")
|
return err(
|
||||||
|
"Failed to destroy codex context: unable to get signal reqSignal on time in destroyCodexContext."
|
||||||
|
)
|
||||||
|
|
||||||
# Wait for the thread to finish
|
# Wait for the thread to finish
|
||||||
joinThread(ctx.thread)
|
joinThread(ctx.thread)
|
||||||
|
|||||||
@ -64,7 +64,7 @@ proc handleRes[T: string | void](
|
|||||||
|
|
||||||
if res.isErr():
|
if res.isErr():
|
||||||
foreignThreadGc:
|
foreignThreadGc:
|
||||||
let msg = "libcodex error: handleRes fireSyncRes error: " & $res.error
|
let msg = $res.error
|
||||||
request[].callback(
|
request[].callback(
|
||||||
RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), request[].userData
|
RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), request[].userData
|
||||||
)
|
)
|
||||||
|
|||||||
@ -64,22 +64,22 @@ proc getPeer(
|
|||||||
let node = codex[].node
|
let node = codex[].node
|
||||||
let res = PeerId.init($peerId)
|
let res = PeerId.init($peerId)
|
||||||
if res.isErr:
|
if res.isErr:
|
||||||
return err("Invalid peer ID " & $peerId & ": " & $res.error())
|
return err("Failed to get peer: invalid peer ID " & $peerId & ": " & $res.error())
|
||||||
|
|
||||||
let id = res.get()
|
let id = res.get()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
let peerRecord = await node.findPeer(id)
|
let peerRecord = await node.findPeer(id)
|
||||||
if peerRecord.isNone:
|
if peerRecord.isNone:
|
||||||
return err("Peer not found")
|
return err("Failed to get peer: peer not found")
|
||||||
|
|
||||||
return ok($ %RestPeerRecord.init(peerRecord.get()))
|
return ok($ %RestPeerRecord.init(peerRecord.get()))
|
||||||
except CancelledError:
|
except CancelledError:
|
||||||
return err("Operation cancelled")
|
return err("Failed to get peer: operation cancelled")
|
||||||
except CatchableError as e:
|
except CatchableError as e:
|
||||||
return err("Error when finding peer: " & e.msg)
|
return err("Failed to get peer: " & e.msg)
|
||||||
else:
|
else:
|
||||||
return err("Peer debug API is disabled")
|
return err("Failed to get peer: peer debug API is disabled")
|
||||||
|
|
||||||
proc process*(
|
proc process*(
|
||||||
self: ptr NodeDebugRequest, codex: ptr CodexServer
|
self: ptr NodeDebugRequest, codex: ptr CodexServer
|
||||||
@ -91,13 +91,13 @@ proc process*(
|
|||||||
of NodeDebugMsgType.DEBUG:
|
of NodeDebugMsgType.DEBUG:
|
||||||
let res = (await getDebug(codex))
|
let res = (await getDebug(codex))
|
||||||
if res.isErr:
|
if res.isErr:
|
||||||
error "DEBUG failed", error = res.error
|
error "Failed to get DEBUG.", error = res.error
|
||||||
return err($res.error)
|
return err($res.error)
|
||||||
return res
|
return res
|
||||||
of NodeDebugMsgType.PEER:
|
of NodeDebugMsgType.PEER:
|
||||||
let res = (await getPeer(codex, self.peerId))
|
let res = (await getPeer(codex, self.peerId))
|
||||||
if res.isErr:
|
if res.isErr:
|
||||||
error "PEER failed", error = res.error
|
error "Failed to get PEER.", error = res.error
|
||||||
return err($res.error)
|
return err($res.error)
|
||||||
return res
|
return res
|
||||||
|
|
||||||
|
|||||||
@ -40,7 +40,7 @@ proc getSpr(
|
|||||||
): Future[Result[string, string]] {.async: (raises: []).} =
|
): Future[Result[string, string]] {.async: (raises: []).} =
|
||||||
let spr = codex[].node.discovery.dhtRecord
|
let spr = codex[].node.discovery.dhtRecord
|
||||||
if spr.isNone:
|
if spr.isNone:
|
||||||
return err("No SPR record found")
|
return err("Failed to get SPR: no SPR record found.")
|
||||||
|
|
||||||
return ok(spr.get.toURI)
|
return ok(spr.get.toURI)
|
||||||
|
|
||||||
@ -59,19 +59,19 @@ proc process*(
|
|||||||
of REPO:
|
of REPO:
|
||||||
let res = (await getRepo(codex))
|
let res = (await getRepo(codex))
|
||||||
if res.isErr:
|
if res.isErr:
|
||||||
error "REPO failed", error = res.error
|
error "Failed to get REPO.", error = res.error
|
||||||
return err($res.error)
|
return err($res.error)
|
||||||
return res
|
return res
|
||||||
of SPR:
|
of SPR:
|
||||||
let res = (await getSpr(codex))
|
let res = (await getSpr(codex))
|
||||||
if res.isErr:
|
if res.isErr:
|
||||||
error "SPR failed", error = res.error
|
error "Failed to get SPR.", error = res.error
|
||||||
return err($res.error)
|
return err($res.error)
|
||||||
return res
|
return res
|
||||||
of PEERID:
|
of PEERID:
|
||||||
let res = (await getPeerId(codex))
|
let res = (await getPeerId(codex))
|
||||||
if res.isErr:
|
if res.isErr:
|
||||||
error "PEERID failed", error = res.error
|
error "Failed to get PEERID.", error = res.error
|
||||||
return err($res.error)
|
return err($res.error)
|
||||||
return res
|
return res
|
||||||
|
|
||||||
|
|||||||
@ -112,7 +112,7 @@ proc createCodex(
|
|||||||
,
|
,
|
||||||
)
|
)
|
||||||
except ConfigurationError as e:
|
except ConfigurationError as e:
|
||||||
return err("Failed to load configuration: " & e.msg)
|
return err("Failed to create codex: unable to load configuration: " & e.msg)
|
||||||
|
|
||||||
conf.setupLogging()
|
conf.setupLogging()
|
||||||
conf.setupMetrics()
|
conf.setupMetrics()
|
||||||
@ -121,30 +121,31 @@ proc createCodex(
|
|||||||
# We are unable to access/create data folder or data folder's
|
# We are unable to access/create data folder or data folder's
|
||||||
# permissions are insecure.
|
# permissions are insecure.
|
||||||
return err(
|
return err(
|
||||||
"Unable to access/create data folder or data folder's permissions are insecure."
|
"Failed to create codex: unable to access/create data folder or data folder's permissions are insecure."
|
||||||
)
|
)
|
||||||
|
|
||||||
if not (checkAndCreateDataDir((conf.dataDir / "repo"))):
|
if not (checkAndCreateDataDir((conf.dataDir / "repo"))):
|
||||||
# We are unable to access/create data folder or data folder's
|
# We are unable to access/create data folder or data folder's
|
||||||
# permissions are insecure.
|
# permissions are insecure.
|
||||||
return err(
|
return err(
|
||||||
"Unable to access/create data folder or data folder's permissions are insecure."
|
"Failed to create codex: unable to access/create data folder or data folder's permissions are insecure."
|
||||||
)
|
)
|
||||||
|
|
||||||
debug "Repo dir initialized", dir = conf.dataDir / "repo"
|
|
||||||
|
|
||||||
let keyPath =
|
let keyPath =
|
||||||
if isAbsolute(conf.netPrivKeyFile):
|
if isAbsolute(conf.netPrivKeyFile):
|
||||||
conf.netPrivKeyFile
|
conf.netPrivKeyFile
|
||||||
else:
|
else:
|
||||||
conf.dataDir / conf.netPrivKeyFile
|
conf.dataDir / conf.netPrivKeyFile
|
||||||
let privateKey = setupKey(keyPath).expect("Should setup private key!")
|
let privateKey = setupKey(keyPath)
|
||||||
|
if privateKey.isErr:
|
||||||
|
return err("Failed to create codex: unable to get the private key.")
|
||||||
|
let pk = privateKey.get()
|
||||||
|
|
||||||
let server =
|
let server =
|
||||||
try:
|
try:
|
||||||
CodexServer.new(conf, privateKey)
|
CodexServer.new(conf, pk)
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
return err("Failed to start Codex: " & exc.msg)
|
return err("Failed to create codex: " & exc.msg)
|
||||||
|
|
||||||
return ok(server)
|
return ok(server)
|
||||||
|
|
||||||
@ -161,19 +162,19 @@ proc process*(
|
|||||||
self.configJson # , self.appCallbacks
|
self.configJson # , self.appCallbacks
|
||||||
)
|
)
|
||||||
).valueOr:
|
).valueOr:
|
||||||
error "CREATE_NODE failed", error = error
|
error "Failed to CREATE_NODE.", error = error
|
||||||
return err($error)
|
return err($error)
|
||||||
of START_NODE:
|
of START_NODE:
|
||||||
try:
|
try:
|
||||||
await codex[].start()
|
await codex[].start()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
error "START_NODE failed", error = e.msg
|
error "Failed to START_NODE.", error = e.msg
|
||||||
return err(e.msg)
|
return err(e.msg)
|
||||||
of STOP_NODE:
|
of STOP_NODE:
|
||||||
try:
|
try:
|
||||||
await codex[].stop()
|
await codex[].stop()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
error "STOP_NODE failed", error = e.msg
|
error "Failed to STOP_NODE.", error = e.msg
|
||||||
return err(e.msg)
|
return err(e.msg)
|
||||||
|
|
||||||
return ok("")
|
return ok("")
|
||||||
|
|||||||
@ -45,7 +45,7 @@ proc connect(
|
|||||||
let node = codex[].node
|
let node = codex[].node
|
||||||
let res = PeerId.init($peerId)
|
let res = PeerId.init($peerId)
|
||||||
if res.isErr:
|
if res.isErr:
|
||||||
return err("Invalid peer ID: " & $res.error())
|
return err("Failed to connect to peer: invalid peer ID: " & $res.error())
|
||||||
|
|
||||||
let id = res.get()
|
let id = res.get()
|
||||||
|
|
||||||
@ -57,26 +57,26 @@ proc connect(
|
|||||||
if res.isOk:
|
if res.isOk:
|
||||||
addrs.add(res[])
|
addrs.add(res[])
|
||||||
else:
|
else:
|
||||||
return err("Invalid address: " & $addrStr)
|
return err("Failed to connect to peer: invalid address: " & $addrStr)
|
||||||
addrs
|
addrs
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
let peerRecord = await node.findPeer(id)
|
let peerRecord = await node.findPeer(id)
|
||||||
if peerRecord.isNone:
|
if peerRecord.isNone:
|
||||||
return err("Peer not found")
|
return err("Failed to connect to peer: peer not found.")
|
||||||
|
|
||||||
peerRecord.get().addresses.mapIt(it.address)
|
peerRecord.get().addresses.mapIt(it.address)
|
||||||
except CancelledError as e:
|
except CancelledError as e:
|
||||||
return err("Operation cancelled")
|
return err("Failed to connect to peer: operation cancelled.")
|
||||||
except CatchableError as e:
|
except CatchableError as e:
|
||||||
return err("Error finding peer: " & $e.msg)
|
return err("Failed to connect to peer: " & $e.msg)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
await node.connect(id, addresses)
|
await node.connect(id, addresses)
|
||||||
except CancelledError as e:
|
except CancelledError as e:
|
||||||
return err("Operation cancelled")
|
return err("Failed to connect to peer: operation cancelled.")
|
||||||
except CatchableError as e:
|
except CatchableError as e:
|
||||||
return err("Connection failed: " & $e.msg)
|
return err("Failed to connect to peer: " & $e.msg)
|
||||||
|
|
||||||
return ok("")
|
return ok("")
|
||||||
|
|
||||||
@ -90,7 +90,7 @@ proc process*(
|
|||||||
of NodeP2PMsgType.CONNECT:
|
of NodeP2PMsgType.CONNECT:
|
||||||
let res = (await connect(codex, self.peerId))
|
let res = (await connect(codex, self.peerId))
|
||||||
if res.isErr:
|
if res.isErr:
|
||||||
error "CONNECT failed", error = res.error
|
error "Failed to CONNECT.", error = res.error
|
||||||
return err($res.error)
|
return err($res.error)
|
||||||
return res
|
return res
|
||||||
|
|
||||||
|
|||||||
@ -85,11 +85,11 @@ proc codex_new(
|
|||||||
initializeLibrary()
|
initializeLibrary()
|
||||||
|
|
||||||
if isNil(callback):
|
if isNil(callback):
|
||||||
error "Missing callback in codex_new"
|
error "Failed to create codex instance: the callback is missing."
|
||||||
return nil
|
return nil
|
||||||
|
|
||||||
var ctx = codex_context.createCodexContext().valueOr:
|
var ctx = codex_context.createCodexContext().valueOr:
|
||||||
let msg = "Error in createCodexContext: " & $error
|
let msg = $error
|
||||||
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
|
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
|
||||||
return nil
|
return nil
|
||||||
|
|
||||||
@ -101,7 +101,7 @@ proc codex_new(
|
|||||||
codex_context.sendRequestToCodexThread(
|
codex_context.sendRequestToCodexThread(
|
||||||
ctx, RequestType.LIFECYCLE, reqContent, callback, userData
|
ctx, RequestType.LIFECYCLE, reqContent, callback, userData
|
||||||
).isOkOr:
|
).isOkOr:
|
||||||
let msg = "libcodex error: " & $error
|
let msg = $error
|
||||||
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
|
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
|
||||||
return nil
|
return nil
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user