Switch to async process I/O

This commit is contained in:
Zahary Karadjov 2020-06-08 22:44:08 +03:00
parent f0451ca847
commit c487352586
No known key found for this signature in database
GPG Key ID: C8936F8A3073D609
5 changed files with 83 additions and 72 deletions

View File

@ -1,6 +1,9 @@
import
macros, os, chronicles, strformat
const
nim_compiler_path {.strdefine.} = "nim"
type
InstantiationInfo = tuple[filename: string, line: int, column: int]
@ -10,7 +13,7 @@ macro compileJsImpl(callerInfo: static InstantiationInfo,
var
fullSourceFile = callerInfo.filename.splitFile.dir / sourceFile
targetFile = fullSourceFile & ".js"
nimCmd = &"nim {nimOptions} js -o:{targetFile} {fullSourceFile}"
nimCmd = &"{nim_compiler_path} {nimOptions} js -o:{targetFile} {fullSourceFile}"
let (output, status) = gorgeEx nimCmd
if status != 0: error &"Compiling {sourceFile} failed:\n" & output

135
ctail.nim
View File

@ -1,7 +1,8 @@
import
tables, macros, parseopt, strutils, sequtils, unicode, algorithm, json,
re, terminal, os, osproc, streams, threadpool, parsesql, asyncdispatch,
browsers, prompt, chronicles, chronicles/topics_registry,
re, terminal, os, streams, threadpool, parsesql, asyncdispatch, browsers,
asynctools/asyncproc, faststreams/asynctools_adapters, faststreams/textio,
prompt, chronicles, chronicles/topics_registry,
pipes, webui/server
type
@ -50,10 +51,6 @@ var
activeTopics = ""
webuiPort = -1
template postToMainLoop(msg: string) =
{.gcsafe.}:
mainLoopPipe.writePipeFrame(msg)
template jsKind(value): JsonNodeKind =
when value is string: JString
elif value is int: JInt
@ -446,21 +443,14 @@ proc handleCommand(line: string) =
let s = setTopicState(topic, Required)
assert s
var process = startProcess(command = program & " " & commandLine,
workingDir = getCurrentDir(),
options = {poUsePath, poEvalCommand})
var tailedPrograms: seq[AsyncProcess]
proc quitProc(){.noconv.} =
for p in tailedPrograms:
terminate(p)
proc quitProc(){.noconv.} = terminate(process)
system.addQuitProc(quitProc)
# Transform json input into TextBlockRecord/TextLineRecord or JsonRecord
proc processTailingThread(process: Process) =
for line in outputStream(process).lines:
var line = line & " L"
postToMainLoop line
spawn processTailingThread(process)
proc checkType(j: JsonNode, key: string, kind: JsonNodeKind): bool =
j.hasKey(key) and j[key].kind == kind
@ -480,8 +470,8 @@ proc inputThread =
try:
while true:
var line = pAddr[].readLine()
line.add " C"
postToMainLoop line
{.gcsafe.}:
mainLoopPipe.writePipeFrame(line)
except:
let ex = getCurrentException()
# print ex.getStackTrace
@ -490,6 +480,12 @@ proc inputThread =
spawn inputThread()
proc handleCommandsLoop {.async.} =
while true:
var cmd = await mainLoopPipe.readPipeFrame()
handleCommand cmd
setStatus()
proc extractAttr[T](j: JsonNode, key: string, default: T): T =
if j.hasKey(key):
var val = j[key]
@ -503,60 +499,65 @@ proc extractAttr[T](j: JsonNode, key: string, default: T): T =
return default
proc programTailingLoop(server: WebuiServer,
program, commandLine: string) {.async.} =
var process = try:
startProcess(command = program & " " & commandLine,
workingDir = getCurrentDir(),
options = {poUsePath, poEvalCommand})
except CatchableError as err:
echo "Failed to start process"
echo err.msg
quit 1
proc mainLoop {.async.} =
let input = asyncPipeInput(process.outputHandle)
while input.readable:
let logLine = await input.readLine()
if matchRE(logLine, regex):
var j = try: parseJson(logLine)
except CatchableError:
print(logLine)
continue
if not (j.kind == JObject):
print(logLine)
continue
let msg = j.extractAttr("msg", "")
if msg == "$chronicles":
if j.checkType("cmd", JString):
case j["cmd"].str
of "loadPlugin":
if server != nil and j.checkType("pluginSrc", JString):
server.plugins.add j["pluginSrc"].str
else:
discard
continue
if server != nil:
server.broadcastLine(logLine)
var
level = parseLogLevel j.extractAttr("level", "")
topics = j.extractAttr("topics", "")
topicStates = map(topics.split(Whitespace + {',', ';'}), createTopicState)
discard j.extractAttr("ts", "")
if topicsMatch(level, topicStates) and matches(filter, j, false):
activeRecordPrinter(level, msg, topics, j)
proc main {.async.} =
var server: WebuiServer
if webuiPort != -1:
server = newServer(webuiPort)
asyncCheck server.serve()
# openDefaultBrowser("http://localhost:" & $webuiPort)
while true:
var logLine = await mainLoopPipe.readPipeFrame()
let msgType = logLine[^1]
logLine.setLen(logLine.len - 2)
asyncCheck handleCommandsLoop()
case msgType
of 'C':
handleCommand(logLine)
setStatus()
of 'L':
if matchRE(logLine, regex):
var j: JsonNode
try:
j = parseJson(logLine)
except:
print(logLine)
continue
await programTailingLoop(server, program, commandLine)
if not (j.kind == JObject):
print(logLine)
continue
waitFor main()
let msg = j.extractAttr("msg", "")
if msg == "$chronicles":
if j.checkType("cmd", JString):
case j["cmd"].str
of "loadPlugin":
if server != nil and j.checkType("pluginSrc", JString):
server.plugins.add j["pluginSrc"].str
else:
discard
continue
if server != nil:
server.broadcastLine(logLine)
var
level = parseLogLevel j.extractAttr("level", "")
topics = j.extractAttr("topics", "")
topicStates = map(topics.split(Whitespace + {',', ';'}), createTopicState)
discard j.extractAttr("ts", "")
if topicsMatch(level, topicStates) and matches(filter, j, false):
activeRecordPrinter(level, msg, topics, j)
else:
discard
waitFor mainLoop()

View File

@ -1 +1,3 @@
--threads:on
--define:"faststreams_async_backend=std"

View File

@ -1,5 +1,5 @@
import
os, asyncdispatch, asynctools/asyncpipe, ranges/ptr_arith
os, asyncdispatch, asynctools/asyncpipe
export
asyncpipe
@ -20,8 +20,9 @@ else:
proc writePipeFrame*(p: AsyncPipe, data: string) =
var dataLen = data.len
p.writeToPipe(addr(dataLen), sizeof(dataLen))
p.writeToPipe(data.baseAddr, data.len)
if dataLen > 0:
p.writeToPipe(addr(dataLen), sizeof(dataLen))
p.writeToPipe(unsafeAddr data[0], data.len)
proc readPipeFrame*(p: AsyncPipe): Future[string] {.async.} =
var frameSize: int
@ -31,7 +32,7 @@ proc readPipeFrame*(p: AsyncPipe): Future[string] {.async.} =
result = newString(frameSize)
bytesRead = await p.readInto(result.baseAddr, frameSize)
bytesRead = await p.readInto(addr result[0], frameSize)
if bytesRead != frameSize:
raiseOsError(osLastError())

4
webui/karax_app.nim.cfg Normal file
View File

@ -0,0 +1,4 @@
--threads:off
--stacktrace:off
--linetrace:off