feat: introduce Task Manager threadpool

The `TaskManager` threadpool is a memory-safe replacement for the `spawnAndSend` operations that are currently causing memory issues in status-desktop.

From a fundamental memory management point of view, `libstatus/settings`, `libstatus/contracts`, and `libstatus/tokens` (custom tokens) have all been converted to `{.threadvar.}`s and `Atomic[bool]`s to maintain the cache and `dirty` flag across threads, respectively, eliminating the need for thread locks and incorrect `{.gcsafe.}` compiler overrides.

The successful [recyclable threadpool experiment from `nim-task-runner`](https://github.com/status-im/nim-task-runner/blob/test/use-cases/test/use_cases/test_sync.nim) using `AsyncChannel[ThreadSafeString]`s was brought over to `status-desktop` and implemented in somewhat of a hardcoded manner, as we knew this would save some time instead of trying to create a fully fleshed out `nim-task-runner` API and build a miraculous macro that may or may not be able to generate the needed API.

The threadpool is started by the `TaskManager` and both the `TaskManager` and the `TaskManager`'s threadpool are started as early as possible in the application lifecycle (in `nim_status_client.nim`). The `TaskManager` creates a thread to run the threadpool. During its initialization, the threadpool then spools up all the threads it will manage and puts them in an idle thread sequence. This is to prevent expensive thread creation and teardown happening during the app's lifetime as it is quite expensive and blocks the main thread. When tasks comes in to the pool, the task is sent to an idle thread, or put in a queue if all threads are busy. The idle thread is moved to the busy thread sequence. When a task is completed, the thread is taken out of the busy threads sequence and moved back in to the sequence of idle threads, effectively recycling it.

The first `spawnAndSend` we were able to change over to the new threadpool was `estimate`, which estimates the gas of a sticker purchase transaction.

From the consumer point of view, the existing `spawnAndSend` to achieve this looks like:
```nim
  proc estimate*(self: StickersView, packId: int, address: string, price: string, uuid: string) {.slot.} =
    let status_stickers = self.status.stickers
    spawnAndSend(self, "setGasEstimate") do:
      var success: bool
      var estimate = status_stickers.estimateGas(packId, address, price, success)
      if not success:
        estimate = 325000
      let result: tuple[estimate: int, uuid: string] = (estimate, uuid)
      Json.encode(result)
```
And the new syntax looks like this:
```nim
  proc estimate*(self: StickersView, packId: int, address: string, price: string, uuid: string) {.slot.} =
    self.status.taskManager.threadPool.stickers.stickerPackPurchaseGasEstimate(cast[pointer](self.vptr), "setGasEstimate", packId, address, price, uuid)
```
The logic inside the `spawnAndSend` body was moved to [src/status/tasks/stickers.nim](https://github.com/status-im/status-desktop/compare/experiment/tasks-3?expand=1#diff-09e57eef00b0cee5c4abdb9039f948d8372e7003e09e934a9b4c7e9167d47658).

This is just the first migration of `spawnAndSend`, however moving the majority of the remaining `spawnAndSend`s will likely just be an exercise in copy/pasta. There will be one or two that may require a bit more thinking, depending how they rely on data from the model.

Once the `spawnAndSend`s have been converted to the threadpool, we can start implementing the [long-running process from the task runner use case experiments](https://github.com/status-im/nim-task-runner/blob/test/use-cases/test/use_cases/test_long_running.nim).

And finally, we can then implement the [async tasks](https://github.com/status-im/nim-task-runner/blob/test/use-cases/test/use_cases/test_async.nim) if needed.

@michaelsbradleyjr and I spent many hours digging in to the depths of nim's memory management in an attempt to understand it. We have created [a presentation with our task runner experiment findings](https://docs.google.com/presentation/d/1ItCxAfsVTcIoH_E4bgvmHljhbU-tC3T6K2A6ahwAedk/edit?usp=sharing), and @michaelsbradleyjr has spent time [answering questions off the back of that presentation.](https://gist.github.com/michaelsbradleyjr/1eaa9937b3fbb4ffff3fb814f0dd82a9).

We have created a fork of `edn.nim` at `status-im/edn.nim` and we need the PR to be merged and the commit hash updated before we can merge this PR in to `status-desktop`.
This commit is contained in:
Eric Mastro 2021-03-17 17:25:41 +11:00 committed by Iuri Matias
parent 6b6a318a8c
commit 66912fd811
20 changed files with 681 additions and 330 deletions

1
.gitignore vendored
View File

@ -4,6 +4,7 @@ noBackup/
*.pro.user
*.pro.autosave
*.qml.autosave
.update.timestamp
.vscode
bin/
/bottles/

6
.gitmodules vendored
View File

@ -91,3 +91,9 @@
[submodule "vendor/nim-status-go"]
path = vendor/nim-status-go
url = https://github.com/status-im/nim-status-go.git
[submodule "vendor/nim-task-runner"]
path = vendor/nim-task-runner
url = https://github.com/status-im/nim-task-runner.git
[submodule "vendor/edn.nim"]
path = vendor/edn.nim
url = https://github.com/status-im/edn.nim.git

View File

@ -5,6 +5,7 @@ import ../../../status/libstatus/stickers as status_stickers
import ../../../status/libstatus/wallet as status_wallet
import sticker_pack_list, sticker_list, chat_item
import json_serialization
import ../../../status/tasks/task_manager
logScope:
topics = "stickers-view"
@ -45,14 +46,7 @@ QtObject:
proc transactionCompleted*(self: StickersView, success: bool, txHash: string, revertReason: string = "") {.signal.}
proc estimate*(self: StickersView, packId: int, address: string, price: string, uuid: string) {.slot.} =
let status_stickers = self.status.stickers
spawnAndSend(self, "setGasEstimate") do:
var success: bool
var estimate = status_stickers.estimateGas(packId, address, price, success)
if not success:
estimate = 325000
let result: tuple[estimate: int, uuid: string] = (estimate, uuid)
Json.encode(result)
self.status.taskManager.threadPool.stickers.stickerPackPurchaseGasEstimate(cast[pointer](self.vptr), "setGasEstimate", packId, address, price, uuid)
proc gasEstimateReturned*(self: StickersView, estimate: int, uuid: string) {.signal.}

View File

@ -491,9 +491,8 @@ QtObject:
proc gasPricePredictionsChanged*(self: WalletView) {.signal.}
proc getGasPricePredictions*(self: WalletView) {.slot.} =
let walletModel = self.status.wallet
spawnAndSend(self, "getGasPricePredictionsResult") do:
$ %walletModel.getGasPricePredictions()
$ %getGasPricePredictions2()
proc getGasPricePredictionsResult(self: WalletView, gasPricePredictionsJson: string) {.slot.} =
let prediction = Json.decode(gasPricePredictionsJson, GasPricePrediction)

View File

@ -10,11 +10,13 @@ import app/onboarding/core as onboarding
import app/login/core as login
import app/provider/core as provider
import status/signals/core as signals
import status/tasks/task_manager
import status/libstatus/types
import status/libstatus/accounts/constants
import status_go
import status/status as statuslib
import ./eventemitter
import chronos, task_runner
var signalsQObjPointer: pointer
@ -28,7 +30,9 @@ proc mainProc() =
else:
"/../fleets.json"
let status = statuslib.newStatusInstance(readFile(joinPath(getAppDir(), fleets)))
let taskManager = newTaskManager()
taskManager.init()
let status = statuslib.newStatusInstance(taskManager, readFile(joinPath(getAppDir(), fleets)))
status.initNode()
enableHDPI()
@ -132,6 +136,7 @@ proc mainProc() =
wallet.checkPendingTransactions()
wallet.start()
engine.setRootContextProperty("loginModel", login.variant)
engine.setRootContextProperty("onboardingModel", onboarding.variant)
@ -152,6 +157,7 @@ proc mainProc() =
profile.delete()
utilsController.delete()
browserController.delete()
taskManager.teardown()
# Initialize only controllers whose init functions
@ -188,9 +194,7 @@ proc mainProc() =
# it will be passed as a regular C function to libstatus. This means that
# we cannot capture any local variables here (we must rely on globals)
var callback: SignalCallback = proc(p0: cstring) {.cdecl.} =
setupForeignThreadGc()
signal_handler(signalsQObjPointer, p0, "receiveSignal")
tearDownForeignThreadGc()
status_go.setSignalEventCallback(callback)

View File

@ -295,8 +295,10 @@ proc decodeENSContentHash*(value: string): tuple[ensType: ENSType, output: strin
# 12 = identifies sha2-256 hash
# 20 = multihash length = 32
# ...rest = multihash digest
let multiHash = MultiHash.init(nimcrypto.fromHex(multiHashStr)).get()
return (ENSType.IPFS, $Cid.init(CIDv0, MultiCodec.codec(codec), multiHash))
let
multiHash = MultiHash.init(nimcrypto.fromHex(multiHashStr)).get()
decoded = Cid.init(CIDv0, MultiCodec.codec(codec), multiHash).get()
return (ENSType.IPFS, $decoded)
except Exception as e:
error "Error decoding ENS contenthash", hash=value, exception=e.msg
raise

View File

@ -1,5 +1,5 @@
import
sequtils, sugar, macros, tables, strutils, locks
sequtils, sugar, macros, tables, strutils
import
web3/ethtypes, stew/byteutils, nimcrypto, json_serialization, chronicles
@ -16,9 +16,6 @@ export
logScope:
topics = "contracts"
var contractsLock: Lock
initLock(contractsLock)
const ERC20_METHODS = @[
("name", Method(signature: "name()")),
("symbol", Method(signature: "symbol()")),
@ -80,7 +77,14 @@ proc newErc721Contract(name: string, network: Network, address: Address, symbol:
Erc721Contract(name: name, network: network, address: address, symbol: symbol, hasIcon: hasIcon, methods: ERC721_ENUMERABLE_METHODS.concat(addlMethods).toTable)
var ALL_CONTRACTS {.guard: contractsLock.}: seq[Contract] = @[
var
contracts {.threadvar.}: seq[Contract]
contractsInited {.threadvar.}: bool
proc allContracts(): seq[Contract] =
if contractsInited:
result = contracts
else:
contracts = @[
# Mainnet contracts
newErc20Contract("Status Network Token", Network.Mainnet, parseAddress("0x744d70fdbe2ba4cf95131626614a1763df805b9e"), "SNT", 18, true),
newErc20Contract("Dai Stablecoin", Network.Mainnet, parseAddress("0x6b175474e89094c44da98b954eedeac495271d0f"), "DAI", 18, true),
@ -254,11 +258,11 @@ var ALL_CONTRACTS {.guard: contractsLock.}: seq[Contract] = @[
newErc20Contract("Akropolis", Network.Mainnet, parseAddress("0x8ab7404063ec4dbcfd4598215992dc3f8ec853d7"), "AKRO", 18, true),
newErc20Contract("Orchid", Network.Mainnet, parseAddress("0x4575f41308EC1483f3d399aa9a2826d74Da13Deb"), "OXT", 18, false),
]
contractsInited = true
result = contracts
proc getContract(network: Network, name: string): Contract =
{.gcsafe.}:
withLock contractsLock:
let found = ALL_CONTRACTS.filter(contract => contract.name == name and contract.network == network)
let found = allContracts().filter(contract => contract.name == name and contract.network == network)
result = if found.len > 0: found[0] else: nil
proc getContract*(name: string): Contract =
@ -275,26 +279,18 @@ proc getErc20ContractByAddress*(contracts: seq[Erc20Contract], address: Address)
proc getErc20Contract*(symbol: string): Erc20Contract =
let network = settings.getCurrentNetwork()
{.gcsafe.}:
withLock contractsLock:
result = ALL_CONTRACTS.filter(contract => contract.network == network and contract of Erc20Contract).map(contract => Erc20Contract(contract)).getErc20ContractBySymbol(symbol)
result = allContracts().filter(contract => contract.network == network and contract of Erc20Contract).map(contract => Erc20Contract(contract)).getErc20ContractBySymbol(symbol)
proc getErc20Contract*(address: Address): Erc20Contract =
let network = settings.getCurrentNetwork()
{.gcsafe.}:
withLock contractsLock:
result = ALL_CONTRACTS.filter(contract => contract.network == network and contract of Erc20Contract).map(contract => Erc20Contract(contract)).getErc20ContractByAddress(address)
result = allContracts().filter(contract => contract.network == network and contract of Erc20Contract).map(contract => Erc20Contract(contract)).getErc20ContractByAddress(address)
proc getErc20Contracts*(): seq[Erc20Contract] =
let network = settings.getCurrentNetwork()
{.gcsafe.}:
withLock contractsLock:
result = ALL_CONTRACTS.filter(contract => contract of Erc20Contract and contract.network == network).map(contract => Erc20Contract(contract))
result = allContracts().filter(contract => contract of Erc20Contract and contract.network == network).map(contract => Erc20Contract(contract))
proc getErc721Contract(network: Network, name: string): Erc721Contract =
{.gcsafe.}:
withLock contractsLock:
let found = ALL_CONTRACTS.filter(contract => contract of Erc721Contract and Erc721Contract(contract).name.toLower == name.toLower and contract.network == network)
let found = allContracts().filter(contract => contract of Erc721Contract and Erc721Contract(contract).name.toLower == name.toLower and contract.network == network)
result = if found.len > 0: Erc721Contract(found[0]) else: nil
proc getErc721Contract*(name: string): Erc721Contract =
@ -303,9 +299,7 @@ proc getErc721Contract*(name: string): Erc721Contract =
proc getErc721Contracts*(): seq[Erc721Contract] =
let network = settings.getCurrentNetwork()
{.gcsafe.}:
withLock contractsLock:
result = ALL_CONTRACTS.filter(contract => contract of Erc721Contract and contract.network == network).map(contract => Erc721Contract(contract))
result = allContracts().filter(contract => contract of Erc721Contract and contract.network == network).map(contract => Erc721Contract(contract))
proc getSntContract*(): Erc20Contract =
if settings.getCurrentNetwork() == Network.Mainnet:

View File

@ -1,37 +1,46 @@
import core, ./types, ../signals/types as statusgo_types, ./accounts/constants, ./utils
import json, tables, sugar, sequtils, strutils
import json_serialization
import locks
import uuids
import
json, tables, sugar, sequtils, strutils, atomics
var settingsLock {.global.}: Lock
initLock(settingsLock)
import
json_serialization, chronicles, uuids
var settings = %*{}
var dirty = true
import
./core, ./types, ../signals/types as statusgo_types, ./accounts/constants,
./utils
var
settings {.threadvar.}: JsonNode
settingsInited {.threadvar.}: bool
dirty: Atomic[bool]
dirty.store(true)
settings = %* {}
proc saveSetting*(key: Setting, value: string | JsonNode): StatusGoError =
withLock settingsLock:
try:
let response = callPrivateRPC("settings_saveSetting", %* [key, value])
result = Json.decode($response, StatusGoError)
except:
dirty = true
let responseResult = $(response.parseJSON(){"result"})
if responseResult == "null":
result.error = ""
else: result = Json.decode(response, StatusGoError)
except Exception as e:
error "Error saving setting", key=key, value=value, msg=e.msg
dirty.store(true)
proc getWeb3ClientVersion*(): string =
parseJson(callPrivateRPC("web3_clientVersion"))["result"].getStr
proc getSettings*(useCached: bool = true, keepSensitiveData: bool = false): JsonNode =
withLock settingsLock:
{.gcsafe.}:
if useCached and not dirty and not keepSensitiveData:
let cacheIsDirty = (not settingsInited) or dirty.load
if useCached and (not cacheIsDirty) and (not keepSensitiveData):
result = settings
else:
result = callPrivateRPC("settings_getSettings").parseJSON()["result"]
if not keepSensitiveData:
dirty = false
dirty.store(false)
delete(result, "mnemonic")
settings = result
settingsInited = true
proc getSetting*[T](name: Setting, defaultValue: T, useCached: bool = true): T =
let settings: JsonNode = getSettings(useCached, $name == "mnemonic")

View File

@ -47,7 +47,8 @@ proc decodeContentHash*(value: string): string =
# 20 = multihash length = 32
# ...rest = multihash digest
let multiHash = MultiHash.init(nimcrypto.fromHex(multiHashStr)).get()
result = $Cid.init(CIDv0, MultiCodec.codec(codec), multiHash)
let resultTyped = Cid.init(CIDv0, MultiCodec.codec(codec), multiHash).get()
result = $resultTyped
trace "Decoded sticker hash", cid=result
except Exception as e:
error "Error decoding sticker", hash=value, exception=e.msg
@ -95,7 +96,6 @@ proc getPackCount*(): int =
# Gets sticker pack data
proc getPackData*(id: Stuint[256]): StickerPack =
{.gcsafe.}:
let
contract = contracts.getContract("stickers")
contractMethod = contract.methods["getPackData"]

View File

@ -1,26 +1,27 @@
import json, chronicles, strformat, stint, strutils, sequtils, tables
import core, wallet
import ./eth/contracts
import web3/[ethtypes, conversions]
import json_serialization
import settings
import
json, chronicles, strformat, stint, strutils, sequtils, tables, atomics
import
web3/[ethtypes, conversions], json_serialization
import
./settings, ./core, ./wallet, ./eth/contracts
from types import Setting, Network, RpcResponse, RpcException
from utils import parseAddress
import locks
logScope:
topics = "wallet"
var customTokensLock: Lock
initLock(customTokensLock)
var
customTokens {.threadvar.}: seq[Erc20Contract]
customTokensInited {.threadvar.}: bool
dirty: Atomic[bool]
var customTokens {.guard: customTokensLock.}: seq[Erc20Contract] = @[]
var dirty {.guard: customTokensLock.} = true
dirty.store(true)
proc getCustomTokens*(useCached: bool = true): seq[Erc20Contract] =
{.gcsafe.}:
withLock customTokensLock:
if useCached and not dirty:
let cacheIsDirty = not customTokensInited or dirty.load
if useCached and not cacheIsDirty:
result = customTokens
else:
let payload = %* []
@ -31,8 +32,9 @@ proc getCustomTokens*(useCached: bool = true): seq[Erc20Contract] =
if not response.error.isNil:
raise newException(RpcException, "Error getting custom tokens: " & response.error.message)
result = if response.result == "null": @[] else: Json.decode(response.result, seq[Erc20Contract])
dirty = false
dirty.store(false)
customTokens = result
customTokensInited = true
proc visibleTokensSNTDefault(): JsonNode =
let currentNetwork = getCurrentNetwork()
@ -85,14 +87,12 @@ proc getVisibleTokens*(): seq[Erc20Contract] =
proc addCustomToken*(address: string, name: string, symbol: string, decimals: int, color: string) =
let payload = %* [{"address": address, "name": name, "symbol": symbol, "decimals": decimals, "color": color}]
discard callPrivateRPC("wallet_addCustomToken", payload)
withLock customTokensLock:
dirty = true
dirty.store(true)
proc removeCustomToken*(address: string) =
let payload = %* [address]
echo callPrivateRPC("wallet_deleteCustomToken", payload)
withLock customTokensLock:
dirty = true
dirty.store(true)
proc getTokensBalances*(accounts: openArray[string], tokens: openArray[string]): JsonNode =
let payload = %* [accounts, tokens]

View File

@ -4,6 +4,7 @@ import libstatus/settings as libstatus_settings
import libstatus/types as libstatus_types
import chat, accounts, wallet, node, network, mailservers, messages, contacts, profile, stickers, permissions, fleet
import ../eventemitter
import tasks/task_manager
export chat, accounts, node, mailservers, messages, contacts, profile, network, permissions, fleet
@ -21,9 +22,11 @@ type Status* = ref object
network*: NetworkModel
stickers*: StickersModel
permissions*: PermissionsModel
taskManager*: TaskManager
proc newStatusInstance*(fleetConfig: string): Status =
proc newStatusInstance*(taskManager: TaskManager, fleetConfig: string): Status =
result = Status()
result.taskManager = taskManager
result.events = createEventEmitter()
result.fleet = fleet.newFleetModel(result.events, fleetConfig)
result.chat = chat.newChatModel(result.events)

View File

@ -42,7 +42,7 @@ proc init*(self: StickersModel) =
var evArgs = StickerArgs(e)
self.addStickerToRecent(evArgs.sticker, evArgs.save)
proc buildTransaction(self: StickersModel, packId: Uint256, address: Address, price: Uint256, approveAndCall: var ApproveAndCall[100], sntContract: var Erc20Contract, gas = "", gasPrice = ""): EthSend =
proc buildTransaction(packId: Uint256, address: Address, price: Uint256, approveAndCall: var ApproveAndCall[100], sntContract: var Erc20Contract, gas = "", gasPrice = ""): EthSend =
sntContract = status_contracts.getSntContract()
let
stickerMktContract = status_contracts.getContract("sticker-market")
@ -51,11 +51,11 @@ proc buildTransaction(self: StickersModel, packId: Uint256, address: Address, pr
approveAndCall = ApproveAndCall[100](to: stickerMktContract.address, value: price, data: DynamicBytes[100].fromHex(buyTxAbiEncoded))
transactions.buildTokenTransaction(address, sntContract.address, gas, gasPrice)
proc estimateGas*(self: StickersModel, packId: int, address: string, price: string, success: var bool): int =
proc estimateGas*(packId: int, address: string, price: string, success: var bool): int =
var
approveAndCall: ApproveAndCall[100]
sntContract = status_contracts.getSntContract()
tx = self.buildTransaction(
tx = buildTransaction(
packId.u256,
parseAddress(address),
eth2Wei(parseFloat(price), sntContract.decimals),
@ -71,7 +71,7 @@ proc buyPack*(self: StickersModel, packId: int, address, price, gas, gasPrice, p
var
sntContract: Erc20Contract
approveAndCall: ApproveAndCall[100]
tx = self.buildTransaction(
tx = buildTransaction(
packId.u256,
parseAddress(address),
eth2Wei(parseFloat(price), 18), # SNT

View File

@ -0,0 +1,40 @@
import
chronos, NimQml, json_serialization, task_runner
import
../stickers
type
StickerPackPurchaseGasEstimate* = object
vptr*: ByteAddress
slot*: string
packId*: int
address*: string
price*: string
uuid*: string
StickersTasks* = ref object
chanSendToPool: AsyncChannel[ThreadSafeString]
proc newStickersTasks*(chanSendToPool: AsyncChannel[ThreadSafeString]): StickersTasks =
new(result)
result.chanSendToPool = chanSendToPool
proc runTask*(stickerPackPurchaseGasEstimate: StickerPackPurchaseGasEstimate) =
var success: bool
var estimate = estimateGas(
stickerPackPurchaseGasEstimate.packId,
stickerPackPurchaseGasEstimate.address,
stickerPackPurchaseGasEstimate.price,
success
)
if not success:
estimate = 325000
let result: tuple[estimate: int, uuid: string] = (estimate, stickerPackPurchaseGasEstimate.uuid)
let resultPayload = Json.encode(result)
signal_handler(cast[pointer](stickerPackPurchaseGasEstimate.vptr), resultPayload, stickerPackPurchaseGasEstimate.slot)
proc stickerPackPurchaseGasEstimate*(self: StickersTasks, vptr: pointer, slot: string, packId: int, address: string, price: string, uuid: string) =
let task = StickerPackPurchaseGasEstimate(vptr: cast[ByteAddress](vptr), slot: slot, packId: packId, address: address, price: price, uuid: uuid)
let payload = task.toJson(typeAnnotations = true)
self.chanSendToPool.sendSync(payload.safe)

View File

@ -0,0 +1,27 @@
import # vendor libs
chronicles, task_runner
import # status-desktop libs
./threadpool
export threadpool
logScope:
topics = "task-manager"
type
TaskManager* = ref object
threadPool*: ThreadPool
proc newTaskManager*(): TaskManager =
new(result)
result.threadPool = newThreadPool()
proc init*(self: TaskManager) =
self.threadPool.init()
proc teardown*(self: TaskManager) =
self.threadPool.teardown()

View File

@ -0,0 +1,256 @@
import
chronicles, chronos, json, json_serialization, NimQml, sequtils, tables,
task_runner
import
./stickers
export
stickers
logScope:
topics = "task-threadpool"
type
ThreadPool* = ref object
chanRecvFromPool*: AsyncChannel[ThreadSafeString]
chanSendToPool*: AsyncChannel[ThreadSafeString]
thread: Thread[PoolThreadArg]
size: int
stickers*: StickersTasks
PoolThreadArg* = object
chanSendToMain*: AsyncChannel[ThreadSafeString]
chanRecvFromMain*: AsyncChannel[ThreadSafeString]
size*: int
TaskThreadArg = object
id: int
chanRecvFromPool: AsyncChannel[ThreadSafeString]
chanSendToPool: AsyncChannel[ThreadSafeString]
ThreadNotification = object
id: int
notice: string
# forward declarations
proc poolThread(arg: PoolThreadArg) {.thread.}
const MaxThreadPoolSize = 16
proc newThreadPool*(size: int = MaxThreadPoolSize): ThreadPool =
new(result)
result.chanRecvFromPool = newAsyncChannel[ThreadSafeString](-1)
result.chanSendToPool = newAsyncChannel[ThreadSafeString](-1)
result.thread = Thread[PoolThreadArg]()
result.size = size
result.stickers = newStickersTasks(result.chanSendToPool)
proc init*(self: ThreadPool) =
self.chanRecvFromPool.open()
self.chanSendToPool.open()
let arg = PoolThreadArg(
chanSendToMain: self.chanRecvFromPool,
chanRecvFromMain: self.chanSendToPool,
size: self.size
)
createThread(self.thread, poolThread, arg)
# block until we receive "ready"
let received = $(self.chanRecvFromPool.recvSync())
proc teardown*(self: ThreadPool) =
self.chanSendToPool.sendSync("shutdown".safe)
self.chanRecvFromPool.close()
self.chanSendToPool.close()
joinThread(self.thread)
proc task(arg: TaskThreadArg) {.async.} =
arg.chanRecvFromPool.open()
arg.chanSendToPool.open()
let noticeToPool = ThreadNotification(id: arg.id, notice: "ready")
info "[threadpool task thread] sending 'ready'", threadid=arg.id
await arg.chanSendToPool.send(noticeToPool.toJson(typeAnnotations = true).safe)
while true:
info "[threadpool task thread] waiting for message"
let received = $(await arg.chanRecvFromPool.recv())
if received == "shutdown":
info "[threadpool task thread] received 'shutdown'"
info "[threadpool task thread] breaking while loop"
break
let
jsonNode = parseJson(received)
messageType = jsonNode{"$type"}.getStr
info "[threadpool task thread] received task", messageType=messageType
info "[threadpool task thread] initiating task", messageType=messageType,
threadid=arg.id
try:
case messageType
of "StickerPackPurchaseGasEstimate":
let decoded = Json.decode(received, StickerPackPurchaseGasEstimate, allowUnknownFields = true)
decoded.runTask()
else:
error "[threadpool task thread] unknown message", message=received
except Exception as e:
error "[threadpool task thread] exception", error=e.msg
let noticeToPool = ThreadNotification(id: arg.id, notice: "done")
info "[threadpool task thread] sending 'done' notice to pool",
threadid=arg.id
await arg.chanSendToPool.send(noticeToPool.toJson(typeAnnotations = true).safe)
arg.chanRecvFromPool.close()
arg.chanSendToPool.close()
proc taskThread(arg: TaskThreadArg) {.thread.} =
waitFor task(arg)
proc pool(arg: PoolThreadArg) {.async.} =
let
chanSendToMain = arg.chanSendToMain
chanRecvFromMainOrTask = arg.chanRecvFromMain
var threadsBusy = newTable[int, tuple[thr: Thread[TaskThreadArg],
chanSendToTask: AsyncChannel[ThreadSafeString]]]()
var threadsIdle = newSeq[tuple[id: int, thr: Thread[TaskThreadArg],
chanSendToTask: AsyncChannel[ThreadSafeString]]](arg.size)
var taskQueue: seq[string] = @[] # FIFO queue
var allReady = 0
chanSendToMain.open()
chanRecvFromMainOrTask.open()
info "[threadpool] sending 'ready' to main thread"
await chanSendToMain.send("ready".safe)
for i in 0..<arg.size:
let id = i + 1
let chanSendToTask = newAsyncChannel[ThreadSafeString](-1)
chanSendToTask.open()
info "[threadpool] adding to threadsIdle", threadid=id
threadsIdle[i].id = id
createThread(
threadsIdle[i].thr,
taskThread,
TaskThreadArg(id: id, chanRecvFromPool: chanSendToTask,
chanSendToPool: chanRecvFromMainOrTask
)
)
threadsIdle[i].chanSendToTask = chanSendToTask
# when task received and number of busy threads == MaxThreadPoolSize,
# then put the task in a queue
# when task received and number of busy threads < MaxThreadPoolSize, pop
# a thread from threadsIdle, track that thread in threadsBusy, and run
# task in that thread
# if "done" received from a thread, remove thread from threadsBusy, and
# push thread into threadsIdle
while true:
info "[threadpool] waiting for message"
var task = $(await chanRecvFromMainOrTask.recv())
info "[threadpool] received message", msg=task
if task == "shutdown":
info "[threadpool] sending 'shutdown' to all task threads"
for tpl in threadsIdle:
await tpl.chanSendToTask.send("shutdown".safe)
for tpl in threadsBusy.values:
await tpl.chanSendToTask.send("shutdown".safe)
info "[threadpool] breaking while loop"
break
let
jsonNode = parseJson(task)
messageType = jsonNode{"$type"}.getStr
info "[threadpool] determined message type", messageType=messageType
case messageType
of "ThreadNotification":
try:
let notification = Json.decode(task, ThreadNotification, allowUnknownFields = true)
info "[threadpool] received notification",
notice=notification.notice, threadid=notification.id
if notification.notice == "ready":
info "[threadpool] received 'ready' from a task thread"
allReady = allReady + 1
elif notification.notice == "done":
let tpl = threadsBusy[notification.id]
info "[threadpool] adding to threadsIdle",
newlength=(threadsIdle.len + 1)
threadsIdle.add (notification.id, tpl.thr, tpl.chanSendToTask)
info "[threadpool] removing from threadsBusy",
newlength=(threadsBusy.len - 1), threadid=notification.id
threadsBusy.del notification.id
if taskQueue.len > 0:
info "[threadpool] removing from taskQueue",
newlength=(taskQueue.len - 1)
task = taskQueue[0]
taskQueue.delete 0, 0
info "[threadpool] removing from threadsIdle",
newlength=(threadsIdle.len - 1)
let tpl = threadsIdle[0]
threadsIdle.delete 0, 0
info "[threadpool] adding to threadsBusy",
newlength=(threadsBusy.len + 1), threadid=tpl.id
threadsBusy.add tpl.id, (tpl.thr, tpl.chanSendToTask)
await tpl.chanSendToTask.send(task.safe)
else:
error "[threadpool] unknown notification", notice=notification.notice
except Exception as e:
warn "[threadpool] unknown error in thread notification", message=task, error=e.msg
else: # must be a request to do task work
if allReady < arg.size or threadsBusy.len == arg.size:
# add to queue
info "[threadpool] adding to taskQueue",
newlength=(taskQueue.len + 1)
taskQueue.add task
# do we have available threads in the threadpool?
elif threadsBusy.len < arg.size:
# check if we have tasks waiting on queue
if taskQueue.len > 0:
# remove first element from the task queue
info "[threadpool] adding to taskQueue",
newlength=(taskQueue.len + 1)
taskQueue.add task
info "[threadpool] removing from taskQueue",
newlength=(taskQueue.len - 1)
task = taskQueue[0]
taskQueue.delete 0, 0
info "[threadpool] removing from threadsIdle",
newlength=(threadsIdle.len - 1)
let tpl = threadsIdle[0]
threadsIdle.delete 0, 0
info "[threadpool] adding to threadsBusy",
newlength=(threadsBusy.len + 1), threadid=tpl.id
threadsBusy.add tpl.id, (tpl.thr, tpl.chanSendToTask)
await tpl.chanSendToTask.send(task.safe)
var allTaskThreads: seq[Thread[TaskThreadArg]] = @[]
for tpl in threadsIdle:
tpl.chanSendToTask.close()
allTaskThreads.add tpl.thr
for tpl in threadsBusy.values:
tpl.chanSendToTask.close()
allTaskThreads.add tpl.thr
chanSendToMain.close()
chanRecvFromMainOrTask.close()
joinThreads(allTaskThreads)
proc poolThread(arg: PoolThreadArg) {.thread.} =
waitFor pool(arg)

View File

@ -349,3 +349,18 @@ proc getGasPricePredictions*(self: WalletModel): GasPricePrediction =
except Exception as e:
echo "error getting gas price predictions"
echo e.msg
proc getGasPricePredictions2*(): GasPricePrediction =
if status_settings.getCurrentNetwork() != Network.Mainnet:
# TODO: what about other chains like xdai?
return GasPricePrediction(safeLow: 1.0, standard: 2.0, fast: 3.0, fastest: 4.0)
try:
let url: string = fmt"https://etherchain.org/api/gasPriceOracle"
let secureSSLContext = newContext()
let client = newHttpClient(sslContext = secureSSLContext)
client.headers = newHttpHeaders({ "Content-Type": "application/json" })
let response = client.request(url)
result = Json.decode(response.body, GasPricePrediction)
except Exception as e:
echo "error getting gas price predictions"
echo e.msg

2
vendor/DOtherSide vendored

@ -1 +1 @@
Subproject commit 1cc16aaa5c643d0d33c31f9953fbe4a9f6bde151
Subproject commit 5177a129d45985bb61471d7afecd397a82ed9d42

2
vendor/edn.nim vendored

@ -1 +1 @@
Subproject commit 4cda60880e108f0cb7efe1c209308f2db03267d9
Subproject commit 3305e41f9da3f2f21c56bd23b74b0a3589f3bf3e

2
vendor/nim-libp2p vendored

@ -1 +1 @@
Subproject commit 8d9e231a74c1afc76b6745e05020f8d4e33501e7
Subproject commit 70deac9e0d16f7d00a0d4404ed191042dbf079be

1
vendor/nim-task-runner vendored Submodule

@ -0,0 +1 @@
Subproject commit a87f3f85be052fb3332358f95079a059cf1daf15