chore: extract asynchronous channels code from status-im/nim-chronos PR #45

Involves some adjustments necessary for the code to compile with the version of
Nim compiler (1.2.6) provided by latest status-im/nimbus-build-system and to be
compatible with latest status-im/nim-chronos.

Does not include "transport" code added with respect to asynchronous channels
introduced in https://github.com/status-im/nim-chronos/pull/45. That code will
likely be added to this repo in the future.

Note that at present vendor/nim-chronos points to a GitHub fork of
status-im/nim-chronos as two small changes were needed in
vendor/nim-chronos. See: https://github.com/status-im/nim-chronos/pull/154.
This commit is contained in:
Michael Bradley, Jr 2021-01-14 13:51:18 -06:00 committed by Michael Bradley
parent 8753fdd53a
commit b2442ecba8
21 changed files with 1370 additions and 36 deletions

View File

@ -37,6 +37,11 @@ jobs:
steps:
- name: Install Valgrind via APT
if: matrix.platform.os == 'ubuntu'
run: |
sudo apt install -y valgrind
- name: Install awk (gawk) and coreutils via Homebrew
if: matrix.platform.os == 'macos'
run: |
@ -75,3 +80,8 @@ jobs:
- name: Build and run tests
run: |
make -j${NPROC} NIMFLAGS="--parallelBuild:${NPROC}" V=1 test
- name: Build and run channel_helgrind
if: matrix.platform.os == 'ubuntu'
run: |
./env.sh nimble channel_helgrind

6
.gitmodules vendored
View File

@ -3,12 +3,12 @@
url = https://github.com/status-im/nimbus-build-system.git
branch = master
ignore = dirty
[submodule "vendor/nim-chronos"]
path = vendor/nim-chronos
url = https://github.com/status-im/nim-chronos.git
[submodule "vendor/nim-stew"]
path = vendor/nim-stew
url = https://github.com/status-im/nim-stew.git
[submodule "vendor/nim-bearssl"]
path = vendor/nim-bearssl
url = https://github.com/status-im/nim-bearssl.git
[submodule "vendor/nim-chronos"]
path = vendor/nim-chronos
url = https://github.com/michaelsbradleyjr/nim-chronos.git

201
LICENSE-APACHEv2 Normal file
View File

@ -0,0 +1,201 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright 2018-Present Status Research & Development GmbH
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View File

@ -1,6 +1,6 @@
MIT License
The MIT License (MIT)
Copyright (c) 2020 Status Research & Development GmbH
Copyright (c) 2020-Present Status Research & Development GmbH
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal

View File

@ -1,6 +1,20 @@
# nim-task-runner
[![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT)
[![License: Apache-2.0 or MIT](https://img.shields.io/badge/License-Apache%E2%80%932.0%20or%20MIT-blue)](https://github.com/status-im/nim-task-runner#license)
![Stability: experimental](https://img.shields.io/badge/Stability-experimental-orange.svg)
[![Tests (GitHub Actions)](https://github.com/status-im/nim-task-runner/workflows/Tests/badge.svg?branch=master)](https://github.com/status-im/nim-task-runner/actions?query=workflow%3ATests+branch%3Amaster)
General purpose background task runner for Nim programs
## License
Licensed and distributed under either of
* MIT license: [LICENSE-MIT](LICENSE-MIT) or http://opensource.org/licenses/MIT
or
* Apache License, Version 2.0, ([LICENSE-APACHEv2](LICENSE-APACHEv2) or
http://www.apache.org/licenses/LICENSE-2.0)
at your option. These files may not be copied, modified, or distributed except
according to those terms.

View File

@ -1,2 +1,14 @@
proc foo*(): string =
"bar"
# Task Runner
# adapted in parts from
# Chronos
# (github.com/status-im/nim-chronos/pull/45)
#
# (c) Copyright 2018-Present
# Status Research & Development GmbH
#
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
import ./task_runner/achannels
export achannels

View File

@ -3,14 +3,12 @@ mode = ScriptMode.Verbose
version = "0.1.0"
author = "Status Research & Development GmbH"
description = "General purpose background task runner for Nim programs"
license = "MIT"
license = "Apache License 2.0 or MIT"
skipDirs = @["test"]
requires "nim >= 1.2.0",
"chronos"
import strutils
proc buildAndRunTest(name: string,
srcDir = "test/",
outDir = "test/build/",
@ -28,8 +26,10 @@ proc buildAndRunTest(name: string,
" --debugger:native" &
" --define:chronicles_line_numbers" &
" --define:debug" &
" --linetrace:on" &
" --nimcache:nimcache/test/" & name &
" --out:" & outDir & name &
" --stacktrace:on" &
" --threads:on" &
" --tlsEmulation:off" &
" " &
@ -41,4 +41,22 @@ proc buildAndRunTest(name: string,
exec outDir & name
task tests, "Run all tests":
buildAndRunTest "all_tests"
buildAndRunTest "test_all"
task channel_helgrind, "Run channel implementation through helgrind to detect threading or lock errors":
rmDir "test/build/"
mkDir "test/build/"
var commands = [
"nim c" &
" --define:useMalloc" &
" --nimcache:nimcache/test/channel_helgrind" &
" --out:test/build/test_achannels" &
" --threads:on" &
" --tlsEmulation:off" &
" test/test_achannels.nim",
"valgrind --tool=helgrind test/build/test_achannels"
]
echo "\n" & commands[0]
exec commands[0]
echo "\n" & commands[1]
exec commands[1]

View File

258
task_runner/achannels.nim Normal file
View File

@ -0,0 +1,258 @@
# Task Runner multi-threading asynchronous channels
# adapted from
# Chronos multi-threading asynchronous channels
# (github.com/status-im/nim-chronos/pull/45)
#
# (c) Copyright 2018-Present
# Status Research & Development GmbH
#
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
import strutils
import chronos/[handles, transport]
import ./asyncloop, ./asyncsync
when hasThreadSupport:
import locks
type
RawAsyncChannelImpl {.pure, final.} = object
rd, wr, count, mask: int
maxItems: int
refCount: int
data: ptr UncheckedArray[byte]
when hasThreadSupport:
lock: Lock
eventNotEmpty: AsyncThreadEvent
eventNotFull: AsyncThreadEvent
RawAsyncChannel = ptr RawAsyncChannelImpl
AsyncChannel*[Msg] = RawAsyncChannel
AsyncChannelError* = object of CatchableError
proc initLocks(rchan: RawAsyncChannel) {.inline.} =
## Initialize and create OS locks.
when hasThreadSupport:
initLock(rchan.lock)
else:
discard
rchan.eventNotEmpty = newAsyncThreadEvent()
if rchan.maxItems > 0:
rchan.eventNotFull = newAsyncThreadEvent()
proc deinitLocks(rchan: RawAsyncChannel) {.inline.} =
## Deinitialize and close OS locks.
when hasThreadSupport:
deinitLock(rchan.lock)
else:
discard
close(rchan.eventNotEmpty)
if rchan.maxItems > 0:
close(rchan.eventNotFull)
proc acquireLock(rchan: RawAsyncChannel) {.inline.} =
## Acquire lock in multi-threaded mode and do nothing in
## single-threaded mode.
when hasThreadSupport:
acquire(rchan.lock)
else:
discard
proc releaseLock(rchan: RawAsyncChannel) {.inline.} =
## Release lock in multi-threaded mode and do nothing in
## single-threaded mode.
when hasThreadSupport:
release(rchan.lock)
else:
discard
proc newAsyncChannel*[Msg](maxItems: int = -1): AsyncChannel[Msg] =
## Create new AsyncChannel[Msg] with internal queue size ``maxItems``.
##
## If ``maxItems <= 0`` (default value), then queue size is unlimited.
let loop = getThreadDispatcher()
result = cast[AsyncChannel[Msg]](allocShared(sizeof(RawAsyncChannelImpl)))
result.mask = -1
if maxItems <= 0:
result.maxItems = -1
else:
result.maxItems = maxItems
result.count = 0
result.wr = 0
result.rd = 0
result.refCount = 0
result.data = cast[ptr UncheckedArray[byte]](0)
result.initLocks()
proc raiseChannelClosed() {.inline.} =
var err = newException(AsyncChannelError, "Channel closed or not opened")
raise err
proc raiseChannelFailed() {.inline.} =
var err = newException(AsyncChannelError, "Channel synchronization failed")
raise err
proc open*[Msg](chan: AsyncChannel[Msg]) =
## Open channel ``chan``.
let loop = getThreadDispatcher()
chan.acquireLock()
inc(chan.refCount)
chan.releaseLock()
proc close*[Msg](chan: AsyncChannel[Msg]) =
## Close channel ``chan``.
chan.acquireLock()
if chan.refCount == 0:
if not(isNil(chan.data)):
deallocShared(cast[pointer](chan.data))
chan.deinitLocks()
deallocShared(cast[pointer](chan))
else:
dec(chan.refCount)
chan.releaseLock()
proc `$`*[Msg](chan: AsyncChannel[Msg]): string =
## Dump channel ``chan`` debugging information as string.
chan.acquireLock()
result = "channel 0x" & toHex(cast[uint](chan)) & " ("
result.add("eventNotEmpty = 0x" & toHex(cast[uint](chan.eventNotEmpty)))
result.add(", eventNotFull = 0x" & toHex(cast[uint](chan.eventNotFull)))
result.add(", rd = " & $chan.rd)
result.add(", wr = " & $chan.wr)
result.add(", count = " & $chan.count)
result.add(", mask = 0x" & toHex(chan.mask))
result.add(", data = 0x" & toHex(cast[uint](chan.data)))
result.add(", maxItems = " & $chan.maxItems)
result.add(", refCount = " & $chan.refCount)
result.add(")")
chan.releaseLock()
proc rawSend(rchan: RawAsyncChannel, pbytes: pointer, nbytes: int) =
var cap = rchan.mask + 1
if rchan.count >= cap:
if cap == 0: cap = 1
var n = cast[ptr UncheckedArray[byte]](allocShared0(cap * 2 * nbytes))
var z = 0
var i = rchan.rd
var c = rchan.count
while c > 0:
dec(c)
copyMem(addr(n[z * nbytes]), addr(rchan.data[i * nbytes]), nbytes)
i = (i + 1) and rchan.mask
inc(z)
if not isNil(rchan.data):
deallocShared(rchan.data)
rchan.data = n
rchan.mask = (cap * 2) - 1
rchan.wr = rchan.count
rchan.rd = 0
copyMem(addr(rchan.data[rchan.wr * nbytes]), pbytes, nbytes)
inc(rchan.count)
rchan.wr = (rchan.wr + 1) and rchan.mask
proc send*[Msg](chan: AsyncChannel[Msg], msg: Msg) {.async.} =
## Send message ``msg`` over channel ``chan``. This procedure will wait if
## internal channel queue is full.
chan.acquireLock()
try:
if chan.refCount == 0:
raiseChannelClosed()
if chan.maxItems > 0:
# Wait until count is less then `maxItems`.
while chan.count >= chan.maxItems:
chan.releaseLock()
let res = await chan.eventNotFull.wait(InfiniteDuration)
chan.acquireLock()
if res == WaitFailed:
raiseChannelFailed()
rawSend(chan, unsafeAddr msg, sizeof(Msg))
chan.eventNotEmpty.fire()
finally:
chan.releaseLock()
proc sendSync*[Msg](chan: AsyncChannel[Msg], msg: Msg) =
## Immediately send message ``msg`` over channel ``chan``. This procedure will
## block until internal channel's queue is full.
chan.acquireLock()
try:
if chan.refCount == 0:
raiseChannelClosed()
if chan.maxItems > 0:
# Wait until count is less then `maxItems`.
while chan.count >= chan.maxItems:
chan.releaseLock()
let res = chan.eventNotFull.waitSync(InfiniteDuration)
chan.acquireLock()
if res == WaitFailed:
raiseChannelFailed()
rawSend(chan, unsafeAddr msg, sizeof(Msg))
chan.eventNotEmpty.fire()
finally:
chan.releaseLock()
proc rawRecv(rchan: RawAsyncChannel, pbytes: pointer, nbytes: int) {.inline.} =
doAssert(rchan.count > 0)
dec(rchan.count)
copyMem(pbytes, addr rchan.data[rchan.rd * nbytes], nbytes)
rchan.rd = (rchan.rd + 1) and rchan.mask
proc recv*[Msg](chan: AsyncChannel[Msg]): Future[Msg] {.async.} =
## Wait for message ``Msg`` in channel ``chan`` asynchronously and receive
## it when it become available.
var rmsg: Msg
chan.acquireLock()
try:
if chan.refCount == 0:
raiseChannelClosed()
while chan.count <= 0:
chan.releaseLock()
let res = await chan.eventNotEmpty.wait(InfiniteDuration)
chan.acquireLock()
if res == WaitFailed:
raiseChannelFailed()
rawRecv(chan, addr rmsg, sizeof(Msg))
result = rmsg
if chan.maxItems > 0:
chan.eventNotFull.fire()
finally:
chan.releaseLock()
proc recvSync*[Msg](chan: AsyncChannel[Msg]): Msg =
## Blocking receive message ``Msg`` from channel ``chan``.
chan.acquireLock()
try:
if chan.refCount == 0:
raiseChannelClosed()
while chan.count <= 0:
chan.releaseLock()
let res = chan.eventNotEmpty.waitSync(InfiniteDuration)
chan.acquireLock()
if res == WaitFailed:
raiseChannelFailed()
rawRecv(chan, addr result, sizeof(Msg))
if chan.maxItems > 0:
chan.eventNotFull.fire()
finally:
chan.releaseLock()

174
task_runner/asyncloop.nim Normal file
View File

@ -0,0 +1,174 @@
# Task Runner
# adapted from
# Chronos
# (github.com/status-im/nim-chronos/pull/45)
#
# (c) Copyright 2015 Dominik Picheta
# (c) Copyright 2018-Present Status Research & Development GmbH
#
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
import os, selectors
import chronos/asyncloop
export asyncloop
when defined(windows):
import winlean, sets, hashes
when defined(windows) or defined(nimdoc):
type
RwfsoOverlapped* = object of CustomOverlapped
ioPort*: Handle
handle*: Handle
waitFd*: Handle
timerOrWait*: WINBOOL
RefRwfsoOverlapped* = ref RwfsoOverlapped
when defined(windows):
{.push stackTrace:off.}
proc waitCallback(param: pointer,
timerOrWaitFired: WINBOOL): void {.stdcall.} =
var p = cast[RefRwfsoOverlapped](param)
p.timerOrWait = timerOrWaitFired
discard postQueuedCompletionStatus(p.ioPort, DWORD(timerOrWaitFired),
ULONG_PTR(p.handle),
cast[pointer](p))
{.pop.}
proc awaitForSingleObject*(handle: Handle, timeout: Duration): Future[bool] =
## Wait for Windows' waitable handle (handle which can be waited via
## WaitForSingleObject API call) in asynchronous way.
## Procedure returns ``true`` if state of handle ``handle`` become
## signalled, and ``false`` if timeout ``timeout`` was expired before
## handle ``handle`` become signaled.
##
## ``handle`` can be one of the listed types: Change notification,
## Console input, Event, Memory resource notification, Mutex, Process,
## Semaphore, Thread, Waitable timer.
##
## If timeout ``timeout`` is ``ZeroDuration`` procedure will check if
## handle is signalled and return immediately.
var retFuture = newFuture[bool]("chronos.awaitForSingleObject")
var loop = getThreadDispatcher()
var povl: RefRwfsoOverlapped
var flags = DWORD(WT_EXECUTEONLYONCE)
var timems: ULONG
if timeout == ZeroDuration:
let res = waitForSingleObject(handle, 0)
if res == WAIT_TIMEOUT:
retFuture.complete(false)
return retFuture
elif res == WAIT_OBJECT_0:
retFuture.complete(true)
return retFuture
else:
retFuture.fail(newException(AsyncError,
"Mutex object was not released"))
return retFuture
else:
if timeout == InfiniteDuration:
timems = INFINITE
else:
timems = ULONG(timeout.milliseconds)
povl = RefRwfsoOverlapped()
GC_ref(povl)
proc handleContinuation(udata: pointer) {.gcsafe.} =
if not(retFuture.finished()):
loop.handles.excl(AsyncFD(handle))
if unregisterWait(povl.waitFd) == 0:
let err = osLastError()
if int(err) != ERROR_IO_PENDING:
GC_unref(povl)
retFuture.fail(newException(OSError, osErrorMsg(err)))
return
if povl.timerOrWait != 0:
GC_unref(povl)
retFuture.complete(false)
else:
GC_unref(povl)
retFuture.complete(true)
proc cancel(udata: pointer) {.gcsafe.} =
if not(retFuture.finished()):
loop.handles.excl(AsyncFD(handle))
discard unregisterWait(povl.waitFd)
GC_unref(povl)
povl.data = CompletionData(fd: AsyncFD(handle), cb: handleContinuation)
povl.ioPort = loop.getIoHandler()
povl.handle = handle
loop.handles.incl(AsyncFD(handle))
if not registerWaitForSingleObject(addr povl.waitFd, povl.handle,
cast[WAITORTIMERCALLBACK](waitCallback),
cast[pointer](povl), timems, flags):
let err = osLastError()
GC_unref(povl)
loop.handles.excl(AsyncFD(handle))
retFuture.fail(newException(OSError, osErrorMsg(err)))
retFuture.cancelCallback = cancel
return retFuture
else:
proc getFd*(event: SelectEvent): cint =
type
EventType = object
fd: cint
PEventType = ptr EventType
var e = cast[PEventType](event)
result = e.fd
proc awaitForSelectEvent*(event: SelectEvent,
timeout: Duration): Future[bool] =
## Wait for Selectors' event SelectEvent in asynchronous way.
##
## Procedure returns ``true`` if state of event ``event`` become
## signalled, and ``false`` if timeout ``timeout`` occurs before
## event ``event`` become signaled.
var retFuture = newFuture[bool]("chronos.awaitForSelectEvent")
let loop = getThreadDispatcher()
var data: SelectorData
var moment: Moment
proc handleContinuation(udata: pointer) {.gcsafe.} =
if not(retFuture.finished()):
loop.selector.unregister(event)
if isNil(udata):
retFuture.complete(false)
else:
retFuture.complete(true)
proc cancel(udata: pointer) {.gcsafe.} =
if not(retFuture.finished()):
loop.selector.unregister(event)
if timeout != InfiniteDuration:
removeTimer(moment, handleContinuation, nil)
if timeout != InfiniteDuration:
moment = Moment.fromNow(timeout)
addTimer(moment, handleContinuation, nil)
let fd = event.getFd()
loop.selector.registerEvent(event, data)
withData(loop.selector, int(fd), adata) do:
adata.reader = AsyncCallback(function: handleContinuation,
udata: addr adata.rdata)
adata.rdata.fd = AsyncFD(fd)
adata.rdata.udata = nil
do:
retFuture.fail(newException(ValueError,
"Event descriptor not registered."))
retFuture.cancelCallback = cancel
return retFuture

378
task_runner/asyncsync.nim Normal file
View File

@ -0,0 +1,378 @@
# Task Runner synchronization primitives
# adapted from
# Chronos synchronization primitives
# (github.com/status-im/nim-chronos/pull/45)
#
# (c) Copyright 2018-Present Eugene Kabanov
# (c) Copyright 2018-Present Status Research & Development GmbH
#
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
import os
import chronos/[asyncloop, asyncsync, handles, timer]
export asyncsync
import ./osapi
when defined(windows):
from ./asyncloop as extracted_asyncloop import awaitForSingleObject
const hasThreadSupport* = compileOption("threads")
when hasThreadSupport:
import locks
when defined(windows):
import winlean
else:
import posix
type
AsyncThreadEventImpl = object
when defined(linux):
efd: AsyncFD
elif defined(windows):
event: Handle
else:
when hasThreadSupport:
# We need this lock to have behavior equal to Windows' event object and
# Linux' eventfd descriptor. Otherwise our Event becomes Semaphore on
# BSD/MacOS/Solaris.
lock: Lock
flag: bool
rfd: AsyncFD
wfd: AsyncFD
AsyncThreadEvent* = ptr AsyncThreadEventImpl
## A primitive event object which can be shared between threads.
##
## An event manages a flag that can be set to `true` with the ``fire()``
## procedure.
## The ``wait()`` coroutine blocks until the flag is `false`.
## The ``waitSync()`` procedure blocks until the flag is `false`.
##
## If more than one coroutine blocked in ``wait()`` waiting for event state
## to be signalled, when event get fired, only ``ONE`` coroutine proceeds.
WaitResult* = enum
WaitSuccess, WaitTimeout, WaitFailed
proc newAsyncThreadEvent*(): AsyncThreadEvent =
## Create new AsyncThreadEvent event.
when defined(linux):
# On Linux we are using `eventfd`.
let fd = eventfd(0, 0)
if fd == -1:
raiseOSError(osLastError())
if not(setSocketBlocking(SocketHandle(fd), false)):
raiseOSError(osLastError())
result = cast[AsyncThreadEvent](allocShared0(sizeof(AsyncThreadEventImpl)))
result.efd = AsyncFD(fd)
elif defined(windows):
# On Windows we are using kernel Event object.
let event = osapi.createEvent(nil, DWORD(0), DWORD(0), nil)
if event == Handle(0):
raiseOSError(osLastError())
result = cast[AsyncThreadEvent](allocShared0(sizeof(AsyncThreadEventImpl)))
result.event = event
else:
# On all other posix systems we are using anonymous pipe.
var (rfd, wfd) = createAsyncPipe()
# CHANGED :: cast to int32
if rfd.int32 == asyncInvalidPipe.int32 or wfd.int32 == asyncInvalidPipe.int32:
raiseOSError(osLastError())
if not(setSocketBlocking(SocketHandle(wfd), true)):
raiseOSError(osLastError())
result = cast[AsyncThreadEvent](allocShared0(sizeof(AsyncThreadEventImpl)))
# CHANGED :: cast to AsyncFD
result.rfd = cast[AsyncFD](rfd)
result.wfd = cast[AsyncFD](wfd)
result.flag = false
when hasThreadSupport:
initLock(result.lock)
proc close*(event: AsyncThreadEvent) =
## Close AsyncThreadEvent ``event`` and free all the resources.
when defined(linux):
let loop = getThreadDispatcher()
if event.efd in loop:
unregister(event.efd)
discard posix.close(cint(event.efd))
elif defined(windows):
discard winlean.closeHandle(event.event)
else:
let loop = getThreadDispatcher()
when hasThreadSupport:
acquire(event.lock)
if event.rfd in loop:
unregister(event.rfd)
discard posix.close(cint(event.rfd))
discard posix.close(cint(event.wfd))
when hasThreadSupport:
deinitLock(event.lock)
deallocShared(event)
proc fire*(event: AsyncThreadEvent) =
## Set state of AsyncThreadEvent ``event`` to signalled.
when defined(linux):
var data = 1'u64
while true:
if posix.write(cint(event.efd), addr data, sizeof(uint64)) == -1:
let err = osLastError()
if cint(err) == posix.EINTR:
continue
raiseOSError(osLastError())
break
elif defined(windows):
if setEvent(event.event) == 0:
raiseOSError(osLastError())
else:
var data = 1'u64
when hasThreadSupport:
acquire(event.lock)
try:
if not(event.flag):
while true:
if posix.write(cint(event.wfd), addr data, sizeof(uint64)) == -1:
let err = osLastError()
if cint(err) == posix.EINTR:
continue
raiseOSError(osLastError())
break
event.flag = true
finally:
release(event.lock)
else:
if not(event.flag):
while true:
if posix.write(cint(event.wfd), addr data, sizeof(uint64)) == -1:
let err = osLastError()
if cint(err) == posix.EINTR:
continue
raiseOSError(osLastError())
break
event.flag = true
when defined(windows):
proc wait*(event: AsyncThreadEvent,
timeout: Duration = InfiniteDuration): Future[WaitResult] {.async.} =
## Block until the internal flag of ``event`` is `true`. This procedure is
## coroutine.
##
## Procedure returns ``WaitSuccess`` when internal event's state is
## signaled. Returns ``WaitTimeout`` when timeout interval elapsed, and the
## event's state is nonsignaled. Returns ``WaitFailed`` if error happens
## while waiting.
try:
let res = await awaitForSingleObject(event.event, timeout)
if res:
result = WaitSuccess
else:
result = WaitTimeout
except OSError:
result = WaitFailed
except AsyncError:
result = WaitFailed
proc waitSync*(event: AsyncThreadEvent,
timeout: Duration = InfiniteDuration): WaitResult =
## Block until the internal flag of ``event`` is `true`. This procedure is
## ``NOT`` coroutine, so it is actually blocks, but this procedure do not
## need asynchronous event loop to be present.
##
## Procedure returns ``WaitSuccess`` when internal event's state is
## signaled. Returns ``WaitTimeout`` when timeout interval elapsed, and the
## event's state is nonsignaled. Returns ``WaitFailed`` if error happens
## while waiting.
var timeoutWin: DWORD
if timeout.isInfinite():
timeoutWin = INFINITE
else:
timeoutWin = DWORD(timeout.milliseconds)
let res = waitForSingleObject(event.event, timeoutWin)
if res == WAIT_OBJECT_0:
result = WaitSuccess
elif res == winlean.WAIT_TIMEOUT:
result = WaitTimeout
else:
result = WaitFailed
else:
proc wait*(event: AsyncThreadEvent,
timeout: Duration = InfiniteDuration): Future[WaitResult] =
## Block until the internal flag of ``event`` is `true`.
##
## Procedure returns ``WaitSuccess`` when internal event's state is
## signaled. Returns ``WaitTimeout`` when timeout interval elapsed, and the
## event's state is nonsignaled. Returns ``WaitFailed`` if error happens
## while waiting.
var moment: Moment
var retFuture = newFuture[WaitResult]("mtevent.wait")
let loop = getThreadDispatcher()
when defined(linux):
let fd = AsyncFD(event.efd)
else:
let fd = AsyncFD(event.rfd)
proc contiunuation(udata: pointer) {.gcsafe.} =
if not(retFuture.finished()):
var data: uint64 = 0
if isNil(udata):
removeReader(fd)
retFuture.complete(WaitTimeout)
else:
while true:
if posix.read(cint(fd), addr data,
sizeof(uint64)) != sizeof(uint64):
let err = osLastError()
if cint(err) == posix.EINTR:
# This error happens when interrupt signal was received by
# process so we need to repeat `read` syscall.
continue
elif cint(err) == posix.EAGAIN or
cint(err) == posix.EWOULDBLOCK:
# This error happens when there already pending `read` syscall
# in different thread for this descriptor. This is race
# condition, so to avoid it we will wait for another `read`
# event from system queue.
break
else:
# All other errors
removeReader(fd)
retFuture.complete(WaitFailed)
else:
removeReader(fd)
when not(defined(linux)):
when hasThreadSupport:
acquire(event.lock)
event.flag = false
when hasThreadSupport:
release(event.lock)
retFuture.complete(WaitSuccess)
break
proc cancel(udata: pointer) {.gcsafe.} =
if not(retFuture.finished()):
removeTimer(moment, contiunuation, nil)
removeReader(fd)
if fd notin loop:
register(fd)
addReader(fd, contiunuation, cast[pointer](retFuture))
if not(timeout.isInfinite()):
moment = Moment.fromNow(timeout)
addTimer(moment, contiunuation, nil)
retFuture.cancelCallback = cancel
return retFuture
proc waitReady(fd: int, timeout: var Duration): WaitResult {.inline.} =
var tv: Timeval
var ptv: ptr Timeval = addr tv
var rset: TFdSet
posix.FD_ZERO(rset)
posix.FD_SET(SocketHandle(fd), rset)
if timeout.isInfinite():
ptv = nil
else:
tv = timeout.toTimeval()
while true:
let nfd = fd + 1
var smoment = Moment.now()
let res = posix.select(cint(nfd), addr rset, nil, nil, ptv)
var emoment = Moment.now()
if res == 1:
result = WaitSuccess
if not(timeout.isInfinite()):
timeout = timeout - (emoment - smoment)
break
elif res == 0:
result = WaitTimeout
if not(timeout.isInfinite()):
timeout = ZeroDuration
break
elif res == -1:
let err = osLastError()
if int(err) == EINTR:
if not(timeout.isInfinite()):
tv = (emoment - smoment).toTimeval()
continue
proc waitSync*(event: AsyncThreadEvent,
timeout: Duration = InfiniteDuration): WaitResult =
## Block until the internal flag of ``event`` is `true`. This procedure is
## ``NOT`` coroutine, so it is actually blocks, but this procedure do not
## need asynchronous event loop to be present.
##
## Procedure returns ``WaitSuccess`` when internal event's state is
## signaled. Returns ``WaitTimeout`` when timeout interval elapsed, and the
## event's state is nonsignaled. Returns ``WaitFailed`` if error happens
## while waiting.
var data = 0'u64
when defined(linux):
var fd = int(event.efd)
else:
var fd = int(event.rfd)
var curtimeout = timeout
while true:
var repeat = false
let res = waitReady(fd, curtimeout)
if res == WaitSuccess:
# Updating timeout value for next iteration.
when defined(linux):
while true:
if posix.read(cint(fd), addr data,
sizeof(uint64)) != sizeof(uint64):
let err = osLastError()
if cint(err) == posix.EINTR:
continue
elif cint(err) == posix.EAGAIN or
cint(err) == posix.EWOULDBLOCK:
# This error happens when there already pending `read` syscall
# in different thread for this descriptor.
repeat = true
break
result = WaitFailed
else:
result = WaitSuccess
break
else:
when hasThreadSupport:
acquire(event.lock)
while true:
if posix.read(cint(fd), addr data,
sizeof(uint64)) != sizeof(uint64):
let err = osLastError()
if cint(err) == posix.EINTR:
continue
elif cint(err) == posix.EAGAIN or
cint(err) == posix.EWOULDBLOCK:
# This error happens when there already pending `read` syscall
# in different thread for this descriptor.
repeat = true
break
else:
result = WaitFailed
else:
result = WaitSuccess
break
if repeat:
when hasThreadSupport:
release(event.lock)
discard
else:
event.flag = false
when hasThreadSupport:
release(event.lock)
else:
result = res
if not(repeat):
break

69
task_runner/osapi.nim Normal file
View File

@ -0,0 +1,69 @@
# Task Runner OS API declarations
# adapted from
# Chronos OS API declarations
# (github.com/status-im/nim-chronos/pull/45)
#
# (c) Copyright 2018-Present
# Status Research & Development GmbH
#
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
## This module implements a small wrapper for some needed Windows/*nix API
## procedures, which are not defined in Nim stdlib modules, or its definition
## is wrong.
when defined(windows):
import winlean
const
TCP_NODELAY* = 1
IPPROTO_TCP* = 6
PIPE_TYPE_BYTE* = 0x00000000'i32
PIPE_READMODE_BYTE* = 0x00000000'i32
PIPE_WAIT* = 0x00000000'i32
DEFAULT_PIPE_SIZE* = 65536'i32
ERROR_PIPE_CONNECTED* = 535
ERROR_PIPE_BUSY* = 231
ERROR_OPERATION_ABORTED* = 995
ERROR_SUCCESS* = 0
ERROR_CONNECTION_REFUSED* = 1225
PIPE_TYPE_MESSAGE* = 0x4
PIPE_READMODE_MESSAGE* = 0x2
PIPE_UNLIMITED_INSTANCES* = 255
ERROR_BROKEN_PIPE* = 109
ERROR_PIPE_NOT_CONNECTED* = 233
ERROR_NO_DATA* = 232
ERROR_CONNECTION_ABORTED* = 1236
proc createEvent*(lpEventAttributes: ptr SECURITY_ATTRIBUTES,
bManualReset: DWORD, bInitialState: DWORD,
lpName: ptr Utf16Char): Handle
{.stdcall, dynlib: "kernel32", importc: "CreateEventW".}
proc connectNamedPipe*(hNamedPipe: Handle, lpOverlapped: pointer): WINBOOL
{.importc: "ConnectNamedPipe", stdcall, dynlib: "kernel32".}
proc cancelIo*(hFile: HANDLE): WINBOOL
{.stdcall, dynlib: "kernel32", importc: "CancelIo".}
proc disconnectNamedPipe*(hPipe: HANDLE): WINBOOL
{.stdcall, dynlib: "kernel32", importc: "DisconnectNamedPipe".}
proc setNamedPipeHandleState*(hPipe: HANDLE, lpMode, lpMaxCollectionCount,
lpCollectDataTimeout: ptr DWORD): WINBOOL
{.stdcall, dynlib: "kernel32", importc: "SetNamedPipeHandleState".}
proc resetEvent*(hEvent: HANDLE): WINBOOL
{.stdcall, dynlib: "kernel32", importc: "ResetEvent".}
else:
import posix
const
TCP_NODELAY* = 1
IPPROTO_TCP* = 6
when defined(linux):
proc eventfd*(count: cuint, flags: cint): cint
{.cdecl, importc: "eventfd", header: "<sys/eventfd.h>".}

View File

@ -1,2 +0,0 @@
import
./test_task_runner

191
test/test_achannels.nim Normal file
View File

@ -0,0 +1,191 @@
# Task Runner Test Suite
# adapted from
# Chronos Test Suite
# (github.com/status-im/nim-chronos/pull/45)
#
# (c) Copyright 2018-Present
# Status Research & Development GmbH
#
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
import unittest
import chronos
import ../task_runner
const
hasThreadSupport* = compileOption("threads")
TestRunsCount = 100
suite "Asynchronous channels test suite":
proc testStSync(runs: int, queue: int): bool =
var tun = newAsyncChannel[int](queue)
tun.open()
for i in 0..<runs:
tun.sendSync(i * 10)
var msg = tun.recvSync()
if msg != i * 10:
tun.close()
return false
tun.close()
return true
proc testStAsync(runs: int, queue: int): Future[bool] {.async.} =
var tun = newAsyncChannel[int](queue)
tun.open()
for i in 0..<runs:
await tun.send(i * 10)
var msg = await tun.recv()
if msg != i * 10:
tun.close()
return false
tun.close()
return true
proc testStCombined(runs: int, queue: int): Future[bool] {.async.} =
var tun = newAsyncChannel[int](queue)
tun.open()
for i in 0..<runs:
tun.sendSync(i * 10)
var msg = await tun.recv()
if msg != i * 10:
tun.close()
return false
for i in 0..<runs:
await tun.send(i * 10)
var msg = tun.recvSync()
if msg != i * 10:
tun.close()
return false
tun.close()
return true
when hasThreadSupport:
type
ThreadArg = object
runs: int
tun: AsyncChannel[int]
proc threadSyncFunc1(arg: ThreadArg) {.thread.} =
# echo "Sending thread started [", getThreadId(), "]"
arg.tun.open()
for i in 0..<arg.runs:
arg.tun.sendSync(i * 10)
arg.tun.close()
# echo "Sending thread finished [", getThreadId(), "]"
proc testMtSync(threads: int, runs: int, queue: int): bool =
var tun = newAsyncChannel[int](queue)
var arg = ThreadArg(tun: tun, runs: runs)
var thrs = newSeq[Thread[ThreadArg]](threads)
for i in 0..<threads:
createThread(thrs[i], threadSyncFunc1, arg)
var total = threads * runs
tun.open()
for i in 0..<total:
var msg = arg.tun.recvSync()
joinThreads(thrs)
tun.close()
result = true
proc threadAsyncFunc(arg: ThreadArg) {.async.} =
arg.tun.open()
for i in 0..<arg.runs:
await arg.tun.send(i * 10)
arg.tun.close()
proc threadSyncFunc2(arg: ThreadArg) {.thread.} =
waitFor threadAsyncFunc(arg)
proc asyncReceiver(arg: ThreadArg, total: int) {.async.} =
arg.tun.open()
for i in 0..<total:
var msg = await arg.tun.recv()
arg.tun.close()
proc testMtAsync(threads: int, runs: int, queue: int): bool =
var tun = newAsyncChannel[int](queue)
var arg = ThreadArg(tun: tun, runs: runs)
var thrs = newSeq[Thread[ThreadArg]](threads)
for i in 0..<threads:
createThread(thrs[i], threadSyncFunc2, arg)
var total = threads * runs
waitFor asyncReceiver(arg, total)
joinThreads(thrs)
result = true
proc threadCombinedAsyncFunc(arg: ThreadArg) {.async.} =
arg.tun.open()
for i in 0..<arg.runs:
await arg.tun.send(i * 10)
arg.tun.close()
proc threadCombinedFunc(arg: ThreadArg) {.thread.} =
arg.tun.open()
for i in 0..<arg.runs:
arg.tun.sendSync(i * 10)
arg.tun.close()
waitFor threadCombinedAsyncFunc(arg)
proc testMtCombined(threads: int, runs: int, queue: int): bool =
var tun = newAsyncChannel[int](queue)
var arg = ThreadArg(tun: tun, runs: runs)
var thrs = newSeq[Thread[ThreadArg]](threads)
for i in 0..<threads:
createThread(thrs[i], threadCombinedFunc, arg)
var total = threads * runs
tun.open()
waitFor asyncReceiver(arg, total)
for i in 0..<total:
var msg = arg.tun.recvSync()
joinThreads(thrs)
tun.close()
result = true
test "Single-threaded synchronous with infinite queue test":
check testStSync(500, -1) == true
test "Single-threaded synchronous with limited queue test":
check testStSync(500, 10) == true
test "Single-threaded asynchronous with infinite queue test":
check waitFor(testStAsync(500, -1)) == true
test "Single-threaded synchronous with limited queue test":
check waitFor(testStAsync(500, 10)) == true
test "Single-threaded sync + async with infinite queue test":
check waitFor(testStCombined(250, -1)) == true
test "Single-threaded sync + async with limited queue test":
check waitFor(testStCombined(250, 10)) == true
test "Multi-threaded synchronous with infinite queue test":
when hasThreadSupport:
check testMtSync(10, TestRunsCount, -1) == true
else:
skip()
test "Multi-threaded synchronous with limited queue test":
when hasThreadSupport:
check testMtSync(10, TestRunsCount, 10) == true
else:
skip()
test "Multi-threaded asynchronous with infinite queue test":
when hasThreadSupport:
check testMtAsync(10, TestRunsCount, -1) == true
else:
skip()
test "Multi-threaded asynchronous with limited queue test":
when hasThreadSupport:
check testMtAsync(10, TestRunsCount, 10) == true
else:
skip()
test "Multi-threaded sync + async with infinite queue test":
when hasThreadSupport:
check testMtCombined(10, TestRunsCount, -1) == true
else:
skip()
test "Multi-threaded sync + async with limited queue test":
when hasThreadSupport:
check testMtCombined(10, TestRunsCount, 10) == true
else:
skip()

13
test/test_all.nim Normal file
View File

@ -0,0 +1,13 @@
# Task Runner Test Suite
# adapted in parts from
# Chronos Test Suite
#
# (c) Copyright 2018-Present
# Status Research & Development GmbH
#
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
import
./test_achannels,
./test_task_runner

View File

@ -1,12 +0,0 @@
import chronos, unittest
template asyncTest*(name, body: untyped) =
test name:
proc scenario {.async.} = body
waitFor scenario()
template procSuite*(name, body: untyped) =
proc suitePayload =
suite name:
body
suitePayload()

View File

@ -1,11 +1,21 @@
# Task Runner Test Suite
# adapted in parts from
# Chronos Test Suite
#
# (c) Copyright 2018-Present
# Status Research & Development GmbH
#
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
import unittest
import
../task_runner,
./test_helpers
import chronos
suite "task_runner test stub":
test "foo test":
import ../task_runner
suite "Task runner test suite":
test "dummy test":
check:
foo() == "bar"
1 == 1

2
vendor/nim-bearssl vendored

@ -1 +1 @@
Subproject commit ba5f4687987817902c2727e30b35cb5ad1e61203
Subproject commit eebf730ccda5b5fade2a8f48b3da1496f2c47ba5

2
vendor/nim-chronos vendored

@ -1 +1 @@
Subproject commit 46c0bf3c5aff131ac437a88c00aa112133c94d54
Subproject commit c066bfcb16482d82ef5b6fdbac85ec8f7565d56c

2
vendor/nim-stew vendored

@ -1 +1 @@
Subproject commit 1401c34374fe7606dbf1252e361725415890f5f0
Subproject commit 932fa6cef129126064e8c6eda46dcca6fbd555ad

@ -1 +1 @@
Subproject commit e2de003ce634deca72dab3b3c79426698e7b8579
Subproject commit 53870bc6bccf0f09f9c50a923d1fa1fb235d2db0