sets up loopmeasure for upload and download

This commit is contained in:
benbierens 2023-07-03 11:51:38 +02:00
parent 67b54592e4
commit d70f8ee79c
No known key found for this signature in database
GPG Key ID: FE44815D96D0A1AA
4 changed files with 95 additions and 14 deletions

View File

@ -23,6 +23,7 @@ import ./codex/conf
import ./codex/codex import ./codex/codex
import ./codex/units import ./codex/units
import ./codex/utils/keyutils import ./codex/utils/keyutils
import ./codex/loopmeasure
export codex, conf, libp2p, chronos, chronicles export codex, conf, libp2p, chronos, chronicles
@ -90,7 +91,8 @@ when isMainModule:
config.dataDir / config.netPrivKeyFile config.dataDir / config.netPrivKeyFile
privateKey = setupKey(keyPath).expect("Should setup private key!") privateKey = setupKey(keyPath).expect("Should setup private key!")
server = CodexServer.new(config, privateKey) loopMeasure = LoopMeasure.new()
server = CodexServer.new(config, privateKey, loopMeasure)
## Ctrl+C handling ## Ctrl+C handling
proc controlCHandler() {.noconv.} = proc controlCHandler() {.noconv.} =
@ -101,7 +103,7 @@ when isMainModule:
except Exception as exc: raiseAssert exc.msg # shouldn't happen except Exception as exc: raiseAssert exc.msg # shouldn't happen
notice "Shutting down after having received SIGINT" notice "Shutting down after having received SIGINT"
# pendingFuts.add(server.stop()) pendingFuts.add(server.stop())
state = CodexStatus.Stopping state = CodexStatus.Stopping
notice "Stopping Codex" notice "Stopping Codex"
@ -116,7 +118,7 @@ when isMainModule:
proc SIGTERMHandler(signal: cint) {.noconv.} = proc SIGTERMHandler(signal: cint) {.noconv.} =
notice "Shutting down after having received SIGTERM" notice "Shutting down after having received SIGTERM"
# pendingFuts.add(server.stop()) pendingFuts.add(server.stop())
state = CodexStatus.Stopping state = CodexStatus.Stopping
notice "Stopping Codex" notice "Stopping Codex"
@ -128,15 +130,17 @@ when isMainModule:
state = CodexStatus.Running state = CodexStatus.Running
while state == CodexStatus.Running: while state == CodexStatus.Running:
# poll chronos # poll chronos
loopMeasure.startMeasure()
chronos.poll() chronos.poll()
loopMeasure.stopMeasure()
# wait fot futures to finish # wait fot futures to finish
# let res = waitFor allFinished(pendingFuts) let res = waitFor allFinished(pendingFuts)
state = CodexStatus.Stopped state = CodexStatus.Stopped
# if res.anyIt( it.failed ): if res.anyIt( it.failed ):
# error "Codex didn't shutdown correctly" error "Codex didn't shutdown correctly"
# quit QuitFailure quit QuitFailure
notice "Exited codex" notice "Exited codex"

View File

@ -22,8 +22,6 @@ import pkg/stew/io2
import pkg/stew/shims/net as stewnet import pkg/stew/shims/net as stewnet
import pkg/datastore import pkg/datastore
import std/nimprof
import ./node import ./node
import ./conf import ./conf
import ./rng import ./rng
@ -38,6 +36,7 @@ import ./contracts/clock
import ./contracts/deployment import ./contracts/deployment
import ./utils/addrutils import ./utils/addrutils
import ./namespaces import ./namespaces
import ./loopmeasure
logScope: logScope:
topics = "codex node" topics = "codex node"
@ -158,7 +157,8 @@ proc stop*(s: CodexServer) {.async.} =
proc new*( proc new*(
T: type CodexServer, T: type CodexServer,
config: CodexConf, config: CodexConf,
privateKey: CodexPrivateKey privateKey: CodexPrivateKey,
loopMeasure: LoopMeasure
): CodexServer = ): CodexServer =
## create CodexServer including setting up datastore, repostore, etc ## create CodexServer including setting up datastore, repostore, etc
let let
@ -232,7 +232,7 @@ proc new*(
erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider) erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider)
codexNode = CodexNodeRef.new(switch, store, engine, erasure, discovery) codexNode = CodexNodeRef.new(switch, store, engine, erasure, discovery)
restServer = RestServerRef.new( restServer = RestServerRef.new(
codexNode.initRestApi(config), codexNode.initRestApi(config, loopMeasure),
initTAddress(config.apiBindAddress , config.apiPort), initTAddress(config.apiBindAddress , config.apiPort),
bufferSize = (1024 * 64), bufferSize = (1024 * 64),
maxRequestBodySize = int.high) maxRequestBodySize = int.high)

62
codex/loopmeasure.nim Normal file
View File

@ -0,0 +1,62 @@
import pkg/chronicles
import std/monotimes
type
LoopMeasure* = ref object of RootObj
minUs: int64
maxUs: int64
avgCount: int64
avgUs: int64
current: int64
isArmed: bool
proc new*(T: type LoopMeasure): LoopMeasure =
LoopMeasure(
minUs: 1_000_000_000,
maxUs: 0,
avgCount: 0,
avgUs: 0,
current: 0,
isArmed: false
)
proc loopArm*(loop: LoopMeasure) =
loop.minUs = 1_000_000_000
loop.maxUs = 0
loop.avgCount = 0
loop.avgUs = 0
loop.current = 0
loop.isArmed = true
proc loopDisarm*(loop: LoopMeasure, name: string) =
loop.isArmed = false
trace "LoopMeasure", name, min=loop.minUs, max=loop.maxUs, avg=loop.avgUs, count=loop.avgCount
if loop.avgUs > 100_000:
error "LoopMeasure: suspiciously high average"
if loop.maxUs > 250_000:
error "LoopMeasure upper threshold breached"
raiseAssert "AAA"
proc startMeasure*(loop: LoopMeasure) =
loop.current = getMonoTime().ticks
proc stopMeasure*(loop: LoopMeasure) =
if not loop.isArmed:
return
if loop.current == 0:
return
let durationNs = (getMonoTime().ticks - loop.current).int64 # < nanoseconds?
let durationUs = (durationNs div 1_000).int64 # microseconds?
loop.avgUs = ((loop.avgUs * loop.avgCount) + durationUs) div (loop.avgCount + 1)
inc loop.avgCount
if durationUs > loop.maxUs:
loop.maxUs = durationUs
if durationUs < loop.minUs:
loop.minUs = durationUs

View File

@ -35,6 +35,7 @@ import ../blocktype
import ../conf import ../conf
import ../contracts import ../contracts
import ../streams import ../streams
import ../loopmeasure
import ./coders import ./coders
import ./json import ./json
@ -89,7 +90,7 @@ proc formatPeerRecord(peerRecord: PeerRecord): JsonNode =
} }
return jobj return jobj
proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter = proc initRestApi*(node: CodexNodeRef, conf: CodexConf, loopMeasure: LoopMeasure): RestRouter =
var router = RestRouter.init(validate) var router = RestRouter.init(validate)
router.api( router.api(
MethodGet, MethodGet,
@ -131,11 +132,13 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter =
MethodGet, MethodGet,
"/api/codex/v1/download/{id}") do ( "/api/codex/v1/download/{id}") do (
id: Cid, resp: HttpResponseRef) -> RestApiResponse: id: Cid, resp: HttpResponseRef) -> RestApiResponse:
loopMeasure.loopArm()
## Download a file from the node in a streaming ## Download a file from the node in a streaming
## manner ## manner
## ##
if id.isErr: if id.isErr:
loopMeasure.loopDisarm("download error")
return RestApiResponse.error( return RestApiResponse.error(
Http400, Http400,
$id.error()) $id.error())
@ -146,6 +149,7 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter =
var bytes = 0 var bytes = 0
try: try:
without stream =? (await node.retrieve(id.get())), error: without stream =? (await node.retrieve(id.get())), error:
loopMeasure.loopDisarm("download error")
return RestApiResponse.error(Http404, error.msg) return RestApiResponse.error(Http404, error.msg)
resp.addHeader("Content-Type", "application/octet-stream") resp.addHeader("Content-Type", "application/octet-stream")
@ -166,11 +170,13 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter =
await resp.finish() await resp.finish()
except CatchableError as exc: except CatchableError as exc:
trace "Excepting streaming blocks", exc = exc.msg trace "Excepting streaming blocks", exc = exc.msg
loopMeasure.loopDisarm("download error")
return RestApiResponse.error(Http500) return RestApiResponse.error(Http500)
finally: finally:
trace "Sent bytes", cid = id.get(), bytes trace "Sent bytes", cid = id.get(), bytes
if not stream.isNil: if not stream.isNil:
await stream.close() await stream.close()
loopMeasure.loopDisarm("download finished")
router.rawApi( router.rawApi(
MethodPost, MethodPost,
@ -215,12 +221,14 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter =
MethodPost, MethodPost,
"/api/codex/v1/upload") do ( "/api/codex/v1/upload") do (
) -> RestApiResponse: ) -> RestApiResponse:
loopMeasure.loopArm()
## Upload a file in a streaming manner ## Upload a file in a streaming manner
## ##
trace "Handling file upload" trace "Handling file upload"
var bodyReader = request.getBodyReader() var bodyReader = request.getBodyReader()
if bodyReader.isErr(): if bodyReader.isErr():
loopMeasure.loopDisarm("upload error")
return RestApiResponse.error(Http500) return RestApiResponse.error(Http500)
# Attempt to handle `Expect` header # Attempt to handle `Expect` header
@ -236,18 +244,23 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter =
without cid =? ( without cid =? (
await node.store(AsyncStreamWrapper.new(reader = AsyncStreamReader(reader)))), error: await node.store(AsyncStreamWrapper.new(reader = AsyncStreamReader(reader)))), error:
trace "Error uploading file", exc = error.msg trace "Error uploading file", exc = error.msg
loopMeasure.loopDisarm("upload error")
return RestApiResponse.error(Http500, error.msg) return RestApiResponse.error(Http500, error.msg)
trace "Uploaded file", cid trace "Uploaded file", cid
loopMeasure.loopDisarm("upload finished")
return RestApiResponse.response($cid) return RestApiResponse.response($cid)
except CancelledError: except CancelledError:
loopMeasure.loopDisarm("upload error")
return RestApiResponse.error(Http500) return RestApiResponse.error(Http500)
except AsyncStreamError: except AsyncStreamError:
loopMeasure.loopDisarm("upload error")
return RestApiResponse.error(Http500) return RestApiResponse.error(Http500)
finally: finally:
await reader.closeWait() await reader.closeWait()
# if we got here something went wrong? # if we got here something went wrong?
loopMeasure.loopDisarm("upload error")
return RestApiResponse.error(Http500) return RestApiResponse.error(Http500)
router.api( router.api(
@ -275,6 +288,7 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter =
router.api( router.api(
MethodGet, MethodGet,
"/api/codex/v1/debug/info") do () -> RestApiResponse: "/api/codex/v1/debug/info") do () -> RestApiResponse:
loopMeasure.loopArm()
## Print rudimentary node information ## Print rudimentary node information
## ##
@ -295,6 +309,7 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter =
} }
} }
loopMeasure.loopDisarm("debug/info")
return RestApiResponse.response($json, contentType="application/json") return RestApiResponse.response($json, contentType="application/json")
router.api( router.api(