nim-task-runner/task_runner/achannels.nim

259 lines
7.2 KiB
Nim

# Task Runner multi-threading asynchronous channels
# adapted from
# Chronos multi-threading asynchronous channels
# (github.com/status-im/nim-chronos/pull/45)
#
# (c) Copyright 2018-Present
# Status Research & Development GmbH
#
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
import strutils
import chronos/[handles, transport]
import ./asyncloop, ./asyncsync
when hasThreadSupport:
import locks
type
RawAsyncChannelImpl {.pure, final.} = object
rd, wr, count, mask: int
maxItems: int
refCount: int
data: ptr UncheckedArray[byte]
when hasThreadSupport:
lock: Lock
eventNotEmpty: AsyncThreadEvent
eventNotFull: AsyncThreadEvent
RawAsyncChannel = ptr RawAsyncChannelImpl
AsyncChannel*[Msg] = RawAsyncChannel
AsyncChannelError* = object of CatchableError
proc initLocks(rchan: RawAsyncChannel) {.inline.} =
## Initialize and create OS locks.
when hasThreadSupport:
initLock(rchan.lock)
else:
discard
rchan.eventNotEmpty = newAsyncThreadEvent()
if rchan.maxItems > 0:
rchan.eventNotFull = newAsyncThreadEvent()
proc deinitLocks(rchan: RawAsyncChannel) {.inline.} =
## Deinitialize and close OS locks.
when hasThreadSupport:
deinitLock(rchan.lock)
else:
discard
close(rchan.eventNotEmpty)
if rchan.maxItems > 0:
close(rchan.eventNotFull)
proc acquireLock(rchan: RawAsyncChannel) {.inline.} =
## Acquire lock in multi-threaded mode and do nothing in
## single-threaded mode.
when hasThreadSupport:
acquire(rchan.lock)
else:
discard
proc releaseLock(rchan: RawAsyncChannel) {.inline.} =
## Release lock in multi-threaded mode and do nothing in
## single-threaded mode.
when hasThreadSupport:
release(rchan.lock)
else:
discard
proc newAsyncChannel*[Msg](maxItems: int = -1): AsyncChannel[Msg] =
## Create new AsyncChannel[Msg] with internal queue size ``maxItems``.
##
## If ``maxItems <= 0`` (default value), then queue size is unlimited.
let loop = getThreadDispatcher()
result = cast[AsyncChannel[Msg]](allocShared(sizeof(RawAsyncChannelImpl)))
result.mask = -1
if maxItems <= 0:
result.maxItems = -1
else:
result.maxItems = maxItems
result.count = 0
result.wr = 0
result.rd = 0
result.refCount = 0
result.data = cast[ptr UncheckedArray[byte]](0)
result.initLocks()
proc raiseChannelClosed() {.inline.} =
var err = newException(AsyncChannelError, "Channel closed or not opened")
raise err
proc raiseChannelFailed() {.inline.} =
var err = newException(AsyncChannelError, "Channel synchronization failed")
raise err
proc open*[Msg](chan: AsyncChannel[Msg]) =
## Open channel ``chan``.
let loop = getThreadDispatcher()
chan.acquireLock()
inc(chan.refCount)
chan.releaseLock()
proc close*[Msg](chan: AsyncChannel[Msg]) =
## Close channel ``chan``.
chan.acquireLock()
if chan.refCount == 0:
if not(isNil(chan.data)):
deallocShared(cast[pointer](chan.data))
chan.deinitLocks()
deallocShared(cast[pointer](chan))
else:
dec(chan.refCount)
chan.releaseLock()
proc `$`*[Msg](chan: AsyncChannel[Msg]): string =
## Dump channel ``chan`` debugging information as string.
chan.acquireLock()
result = "channel 0x" & toHex(cast[uint](chan)) & " ("
result.add("eventNotEmpty = 0x" & toHex(cast[uint](chan.eventNotEmpty)))
result.add(", eventNotFull = 0x" & toHex(cast[uint](chan.eventNotFull)))
result.add(", rd = " & $chan.rd)
result.add(", wr = " & $chan.wr)
result.add(", count = " & $chan.count)
result.add(", mask = 0x" & toHex(chan.mask))
result.add(", data = 0x" & toHex(cast[uint](chan.data)))
result.add(", maxItems = " & $chan.maxItems)
result.add(", refCount = " & $chan.refCount)
result.add(")")
chan.releaseLock()
proc rawSend(rchan: RawAsyncChannel, pbytes: pointer, nbytes: int) =
var cap = rchan.mask + 1
if rchan.count >= cap:
if cap == 0: cap = 1
var n = cast[ptr UncheckedArray[byte]](allocShared0(cap * 2 * nbytes))
var z = 0
var i = rchan.rd
var c = rchan.count
while c > 0:
dec(c)
copyMem(addr(n[z * nbytes]), addr(rchan.data[i * nbytes]), nbytes)
i = (i + 1) and rchan.mask
inc(z)
if not isNil(rchan.data):
deallocShared(rchan.data)
rchan.data = n
rchan.mask = (cap * 2) - 1
rchan.wr = rchan.count
rchan.rd = 0
copyMem(addr(rchan.data[rchan.wr * nbytes]), pbytes, nbytes)
inc(rchan.count)
rchan.wr = (rchan.wr + 1) and rchan.mask
proc send*[Msg](chan: AsyncChannel[Msg], msg: Msg) {.async.} =
## Send message ``msg`` over channel ``chan``. This procedure will wait if
## internal channel queue is full.
chan.acquireLock()
try:
if chan.refCount == 0:
raiseChannelClosed()
if chan.maxItems > 0:
# Wait until count is less then `maxItems`.
while chan.count >= chan.maxItems:
chan.releaseLock()
let res = await chan.eventNotFull.wait(InfiniteDuration)
chan.acquireLock()
if res == WaitFailed:
raiseChannelFailed()
rawSend(chan, unsafeAddr msg, sizeof(Msg))
chan.eventNotEmpty.fire()
finally:
chan.releaseLock()
proc sendSync*[Msg](chan: AsyncChannel[Msg], msg: Msg) =
## Immediately send message ``msg`` over channel ``chan``. This procedure will
## block until internal channel's queue is full.
chan.acquireLock()
try:
if chan.refCount == 0:
raiseChannelClosed()
if chan.maxItems > 0:
# Wait until count is less then `maxItems`.
while chan.count >= chan.maxItems:
chan.releaseLock()
let res = chan.eventNotFull.waitSync(InfiniteDuration)
chan.acquireLock()
if res == WaitFailed:
raiseChannelFailed()
rawSend(chan, unsafeAddr msg, sizeof(Msg))
chan.eventNotEmpty.fire()
finally:
chan.releaseLock()
proc rawRecv(rchan: RawAsyncChannel, pbytes: pointer, nbytes: int) {.inline.} =
doAssert(rchan.count > 0)
dec(rchan.count)
copyMem(pbytes, addr rchan.data[rchan.rd * nbytes], nbytes)
rchan.rd = (rchan.rd + 1) and rchan.mask
proc recv*[Msg](chan: AsyncChannel[Msg]): Future[Msg] {.async.} =
## Wait for message ``Msg`` in channel ``chan`` asynchronously and receive
## it when it become available.
var rmsg: Msg
chan.acquireLock()
try:
if chan.refCount == 0:
raiseChannelClosed()
while chan.count <= 0:
chan.releaseLock()
let res = await chan.eventNotEmpty.wait(InfiniteDuration)
chan.acquireLock()
if res == WaitFailed:
raiseChannelFailed()
rawRecv(chan, addr rmsg, sizeof(Msg))
result = rmsg
if chan.maxItems > 0:
chan.eventNotFull.fire()
finally:
chan.releaseLock()
proc recvSync*[Msg](chan: AsyncChannel[Msg]): Msg =
## Blocking receive message ``Msg`` from channel ``chan``.
chan.acquireLock()
try:
if chan.refCount == 0:
raiseChannelClosed()
while chan.count <= 0:
chan.releaseLock()
let res = chan.eventNotEmpty.waitSync(InfiniteDuration)
chan.acquireLock()
if res == WaitFailed:
raiseChannelFailed()
rawRecv(chan, addr result, sizeof(Msg))
if chan.maxItems > 0:
chan.eventNotFull.fire()
finally:
chan.releaseLock()