diff --git a/codex.nim b/codex.nim index d6c17ee1..1d163bfc 100644 --- a/codex.nim +++ b/codex.nim @@ -23,6 +23,7 @@ import ./codex/conf import ./codex/codex import ./codex/units import ./codex/utils/keyutils +import ./codex/loopmeasure export codex, conf, libp2p, chronos, chronicles @@ -90,7 +91,8 @@ when isMainModule: config.dataDir / config.netPrivKeyFile privateKey = setupKey(keyPath).expect("Should setup private key!") - server = CodexServer.new(config, privateKey) + loopMeasure = LoopMeasure.new() + server = CodexServer.new(config, privateKey, loopMeasure) ## Ctrl+C handling proc controlCHandler() {.noconv.} = @@ -101,7 +103,7 @@ when isMainModule: except Exception as exc: raiseAssert exc.msg # shouldn't happen notice "Shutting down after having received SIGINT" - # pendingFuts.add(server.stop()) + pendingFuts.add(server.stop()) state = CodexStatus.Stopping notice "Stopping Codex" @@ -116,7 +118,7 @@ when isMainModule: proc SIGTERMHandler(signal: cint) {.noconv.} = notice "Shutting down after having received SIGTERM" - # pendingFuts.add(server.stop()) + pendingFuts.add(server.stop()) state = CodexStatus.Stopping notice "Stopping Codex" @@ -128,15 +130,17 @@ when isMainModule: state = CodexStatus.Running while state == CodexStatus.Running: # poll chronos + loopMeasure.startMeasure() chronos.poll() + loopMeasure.stopMeasure() # wait fot futures to finish - # let res = waitFor allFinished(pendingFuts) + let res = waitFor allFinished(pendingFuts) state = CodexStatus.Stopped - # if res.anyIt( it.failed ): - # error "Codex didn't shutdown correctly" - # quit QuitFailure + if res.anyIt( it.failed ): + error "Codex didn't shutdown correctly" + quit QuitFailure notice "Exited codex" diff --git a/codex/codex.nim b/codex/codex.nim index fd5dcd2f..692ccc81 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -22,8 +22,6 @@ import pkg/stew/io2 import pkg/stew/shims/net as stewnet import pkg/datastore -import std/nimprof - import ./node import ./conf import ./rng @@ -38,6 +36,7 @@ import ./contracts/clock import ./contracts/deployment import ./utils/addrutils import ./namespaces +import ./loopmeasure logScope: topics = "codex node" @@ -57,9 +56,9 @@ proc bootstrapInteractions( config: CodexConf, repo: RepoStore ): Future[Contracts] {.async.} = - ## bootstrap interactions and return contracts + ## bootstrap interactions and return contracts ## using clients, hosts, validators pairings - ## + ## if not config.persistence and not config.validator: if config.ethAccount.isSome: @@ -158,7 +157,8 @@ proc stop*(s: CodexServer) {.async.} = proc new*( T: type CodexServer, config: CodexConf, - privateKey: CodexPrivateKey + privateKey: CodexPrivateKey, + loopMeasure: LoopMeasure ): CodexServer = ## create CodexServer including setting up datastore, repostore, etc let @@ -232,7 +232,7 @@ proc new*( erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider) codexNode = CodexNodeRef.new(switch, store, engine, erasure, discovery) restServer = RestServerRef.new( - codexNode.initRestApi(config), + codexNode.initRestApi(config, loopMeasure), initTAddress(config.apiBindAddress , config.apiPort), bufferSize = (1024 * 64), maxRequestBodySize = int.high) diff --git a/codex/loopmeasure.nim b/codex/loopmeasure.nim new file mode 100644 index 00000000..c430e815 --- /dev/null +++ b/codex/loopmeasure.nim @@ -0,0 +1,62 @@ +import pkg/chronicles + +import std/monotimes + +type + LoopMeasure* = ref object of RootObj + minUs: int64 + maxUs: int64 + avgCount: int64 + avgUs: int64 + current: int64 + isArmed: bool + +proc new*(T: type LoopMeasure): LoopMeasure = + LoopMeasure( + minUs: 1_000_000_000, + maxUs: 0, + avgCount: 0, + avgUs: 0, + current: 0, + isArmed: false + ) + +proc loopArm*(loop: LoopMeasure) = + loop.minUs = 1_000_000_000 + loop.maxUs = 0 + loop.avgCount = 0 + loop.avgUs = 0 + loop.current = 0 + loop.isArmed = true + +proc loopDisarm*(loop: LoopMeasure, name: string) = + loop.isArmed = false + trace "LoopMeasure", name, min=loop.minUs, max=loop.maxUs, avg=loop.avgUs, count=loop.avgCount + + if loop.avgUs > 100_000: + error "LoopMeasure: suspiciously high average" + + if loop.maxUs > 250_000: + error "LoopMeasure upper threshold breached" + raiseAssert "AAA" + +proc startMeasure*(loop: LoopMeasure) = + loop.current = getMonoTime().ticks + +proc stopMeasure*(loop: LoopMeasure) = + if not loop.isArmed: + return + + if loop.current == 0: + return + + let durationNs = (getMonoTime().ticks - loop.current).int64 # < nanoseconds? + let durationUs = (durationNs div 1_000).int64 # microseconds? + + loop.avgUs = ((loop.avgUs * loop.avgCount) + durationUs) div (loop.avgCount + 1) + inc loop.avgCount + + if durationUs > loop.maxUs: + loop.maxUs = durationUs + if durationUs < loop.minUs: + loop.minUs = durationUs diff --git a/codex/rest/api.nim b/codex/rest/api.nim index 51a14749..84edfd3f 100644 --- a/codex/rest/api.nim +++ b/codex/rest/api.nim @@ -35,6 +35,7 @@ import ../blocktype import ../conf import ../contracts import ../streams +import ../loopmeasure import ./coders import ./json @@ -89,7 +90,7 @@ proc formatPeerRecord(peerRecord: PeerRecord): JsonNode = } return jobj -proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter = +proc initRestApi*(node: CodexNodeRef, conf: CodexConf, loopMeasure: LoopMeasure): RestRouter = var router = RestRouter.init(validate) router.api( MethodGet, @@ -131,11 +132,13 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter = MethodGet, "/api/codex/v1/download/{id}") do ( id: Cid, resp: HttpResponseRef) -> RestApiResponse: + loopMeasure.loopArm() ## Download a file from the node in a streaming ## manner ## if id.isErr: + loopMeasure.loopDisarm("download error") return RestApiResponse.error( Http400, $id.error()) @@ -146,6 +149,7 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter = var bytes = 0 try: without stream =? (await node.retrieve(id.get())), error: + loopMeasure.loopDisarm("download error") return RestApiResponse.error(Http404, error.msg) resp.addHeader("Content-Type", "application/octet-stream") @@ -166,11 +170,13 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter = await resp.finish() except CatchableError as exc: trace "Excepting streaming blocks", exc = exc.msg + loopMeasure.loopDisarm("download error") return RestApiResponse.error(Http500) finally: trace "Sent bytes", cid = id.get(), bytes if not stream.isNil: await stream.close() + loopMeasure.loopDisarm("download finished") router.rawApi( MethodPost, @@ -215,12 +221,14 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter = MethodPost, "/api/codex/v1/upload") do ( ) -> RestApiResponse: + loopMeasure.loopArm() ## Upload a file in a streaming manner ## trace "Handling file upload" var bodyReader = request.getBodyReader() if bodyReader.isErr(): + loopMeasure.loopDisarm("upload error") return RestApiResponse.error(Http500) # Attempt to handle `Expect` header @@ -236,18 +244,23 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter = without cid =? ( await node.store(AsyncStreamWrapper.new(reader = AsyncStreamReader(reader)))), error: trace "Error uploading file", exc = error.msg + loopMeasure.loopDisarm("upload error") return RestApiResponse.error(Http500, error.msg) trace "Uploaded file", cid + loopMeasure.loopDisarm("upload finished") return RestApiResponse.response($cid) except CancelledError: + loopMeasure.loopDisarm("upload error") return RestApiResponse.error(Http500) except AsyncStreamError: + loopMeasure.loopDisarm("upload error") return RestApiResponse.error(Http500) finally: await reader.closeWait() # if we got here something went wrong? + loopMeasure.loopDisarm("upload error") return RestApiResponse.error(Http500) router.api( @@ -275,6 +288,7 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter = router.api( MethodGet, "/api/codex/v1/debug/info") do () -> RestApiResponse: + loopMeasure.loopArm() ## Print rudimentary node information ## @@ -295,6 +309,7 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter = } } + loopMeasure.loopDisarm("debug/info") return RestApiResponse.response($json, contentType="application/json") router.api(