fix: interrupt running sticker task when exiting

Fixes: #2318.

Currently, when exiting the app and the sticker packs task is being run, the app will wait for the sticker packs task to completely finish before exiting the app. This causes a beachball to be displayed and can cause a long delay while waiting for the task to finish.

This PR passes an interrupt signal to the sticker pack loading task in the threadpool thread. The task loads the interrupt on each iteration of its loop (each sticker pack to load) and also during the processing of each sticker pack (getting of sticker pack data). This allows the application to exit neatly.

fix: Handle shutdown of long-running (marathon) task before login
Currently, we wait for the “loggedIn” message to be passed to the marathon task runner before initialising the mailserver model. We were, however, blocking the thread until “loggedIn” was received, meaning that if “shutdown” was received (as it is when cmd+q is pressed), this message would be ignored and the application would have to be forced to quit.

This PR adds logic that not only listens for the “loggedIn” message, but also for the “shudown” message. Once the “shutdown” message is received, the marathon worker thread exits.
This commit is contained in:
Eric Mastro 2021-05-27 16:12:12 +10:00 committed by Eric Mastro
parent fb47d73e79
commit c415f3b989
7 changed files with 85 additions and 47 deletions

View File

@ -1,10 +1,14 @@
import NimQml, tables, json, chronicles, sets, strutils import # std libs
import ../../../status/[status, stickers] atomics, json, sets, strutils, tables
import ../../../status/libstatus/[types, utils]
import ../../../status/libstatus/stickers as status_stickers import # vendor libs
import ../../../status/libstatus/wallet as status_wallet chronicles, NimQml
import sticker_pack_list, sticker_list, chat_item
import ../../../status/tasks/[qt, task_runner_impl] import # status-desktop libs
../../../status/[status, stickers], ../../../status/libstatus/[types, utils],
../../../status/libstatus/stickers as status_stickers,
../../../status/libstatus/wallet as status_wallet, sticker_pack_list,
sticker_list, chat_item, ../../../status/tasks/[qt, task_runner_impl]
logScope: logScope:
topics = "stickers-view" topics = "stickers-view"
@ -16,6 +20,7 @@ type
price: string price: string
uuid: string uuid: string
ObtainAvailableStickerPacksTaskArg = ref object of QObjectTaskArg ObtainAvailableStickerPacksTaskArg = ref object of QObjectTaskArg
running*: ByteAddress # pointer to threadpool's `.running` Atomic[bool]
# The pragmas `{.gcsafe, nimcall.}` in this context do not force the compiler # The pragmas `{.gcsafe, nimcall.}` in this context do not force the compiler
# to accept unsafe code, rather they work in conjunction with the proc # to accept unsafe code, rather they work in conjunction with the proc
@ -49,7 +54,8 @@ proc estimate[T](self: T, slot: string, packId: int, address: string, price: str
const obtainAvailableStickerPacksTask: Task = proc(argEncoded: string) {.gcsafe, nimcall.} = const obtainAvailableStickerPacksTask: Task = proc(argEncoded: string) {.gcsafe, nimcall.} =
let arg = decode[ObtainAvailableStickerPacksTaskArg](argEncoded) let arg = decode[ObtainAvailableStickerPacksTaskArg](argEncoded)
let availableStickerPacks = status_stickers.getAvailableStickerPacks() var running = cast[ptr Atomic[bool]](arg.running)
let availableStickerPacks = status_stickers.getAvailableStickerPacks(running[])
var packs: seq[StickerPack] = @[] var packs: seq[StickerPack] = @[]
for packId, stickerPack in availableStickerPacks.pairs: for packId, stickerPack in availableStickerPacks.pairs:
packs.add(stickerPack) packs.add(stickerPack)
@ -59,7 +65,9 @@ proc obtainAvailableStickerPacks[T](self: T, slot: string) =
let arg = ObtainAvailableStickerPacksTaskArg( let arg = ObtainAvailableStickerPacksTaskArg(
tptr: cast[ByteAddress](obtainAvailableStickerPacksTask), tptr: cast[ByteAddress](obtainAvailableStickerPacksTask),
vptr: cast[ByteAddress](self.vptr), vptr: cast[ByteAddress](self.vptr),
slot: slot) slot: slot,
running: cast[ByteAddress](addr self.status.tasks.threadpool.running)
)
self.status.tasks.threadpool.start(arg) self.status.tasks.threadpool.start(arg)
QtObject: QtObject:

View File

@ -1,17 +1,20 @@
import NimQml, Tables, strformat, strutils, chronicles, sequtils, json, std/wrapnils, parseUtils, stint, tables import # std libs
import ../../status/[status, wallet] atomics, strformat, strutils, sequtils, json, std/wrapnils, parseUtils, tables
import ../../status/wallet/collectibles as status_collectibles
import ../../status/libstatus/accounts/constants import # vendor libs
import ../../status/libstatus/wallet as status_wallet NimQml, chronicles, stint
import ../../status/libstatus/settings as status_settings
import ../../status/libstatus/tokens import # status-desktop libs
import ../../status/libstatus/types ../../status/[status, wallet],
import ../../status/libstatus/utils as status_utils ../../status/wallet/collectibles as status_collectibles,
import ../../status/libstatus/eth/contracts ../../status/libstatus/accounts/constants,
import ../../status/ens as status_ens ../../status/libstatus/wallet as status_wallet,
import views/[asset_list, account_list, account_item, token_list, transaction_list, collectibles_list] ../../status/libstatus/settings as status_settings,
import ../../status/tasks/[qt, task_runner_impl] ../../status/libstatus/tokens, ../../status/libstatus/types,
import ../../status/signals/types as signal_types ../../status/libstatus/utils as status_utils,
../../status/libstatus/eth/contracts, ../../status/ens as status_ens,
views/[asset_list, account_list, account_item, token_list, transaction_list, collectibles_list],
../../status/tasks/[qt, task_runner_impl], ../../status/signals/types as signal_types
type type
SendTransactionTaskArg = ref object of QObjectTaskArg SendTransactionTaskArg = ref object of QObjectTaskArg
@ -29,6 +32,7 @@ type
LoadCollectiblesTaskArg = ref object of QObjectTaskArg LoadCollectiblesTaskArg = ref object of QObjectTaskArg
address: string address: string
collectiblesType: string collectiblesType: string
running*: ByteAddress # pointer to threadpool's `.running` Atomic[bool]
GasPredictionsTaskArg = ref object of QObjectTaskArg GasPredictionsTaskArg = ref object of QObjectTaskArg
LoadTransactionsTaskArg = ref object of QObjectTaskArg LoadTransactionsTaskArg = ref object of QObjectTaskArg
address: string address: string
@ -91,6 +95,7 @@ proc initBalances[T](self: T, slot: string, address: string, tokenList: seq[stri
const loadCollectiblesTask: Task = proc(argEncoded: string) {.gcsafe, nimcall.} = const loadCollectiblesTask: Task = proc(argEncoded: string) {.gcsafe, nimcall.} =
let arg = decode[LoadCollectiblesTaskArg](argEncoded) let arg = decode[LoadCollectiblesTaskArg](argEncoded)
var running = cast[ptr Atomic[bool]](arg.running)
var collectiblesOrError = "" var collectiblesOrError = ""
case arg.collectiblesType: case arg.collectiblesType:
of status_collectibles.CRYPTOKITTY: of status_collectibles.CRYPTOKITTY:
@ -100,7 +105,7 @@ const loadCollectiblesTask: Task = proc(argEncoded: string) {.gcsafe, nimcall.}
of status_collectibles.ETHERMON: of status_collectibles.ETHERMON:
collectiblesOrError = status_collectibles.getEthermons(arg.address) collectiblesOrError = status_collectibles.getEthermons(arg.address)
of status_collectibles.STICKER: of status_collectibles.STICKER:
collectiblesOrError = status_collectibles.getStickers(arg.address) collectiblesOrError = status_collectibles.getStickers(arg.address, running[])
let output = %*{ let output = %*{
"address": arg.address, "address": arg.address,
@ -116,6 +121,7 @@ proc loadCollectibles[T](self: T, slot: string, address: string, collectiblesTyp
slot: slot, slot: slot,
address: address, address: address,
collectiblesType: collectiblesType, collectiblesType: collectiblesType,
running: cast[ByteAddress](addr self.status.tasks.threadpool.running)
) )
self.status.tasks.threadpool.start(arg) self.status.tasks.threadpool.start(arg)

View File

@ -1,10 +1,15 @@
import ./core as status, ./types, ./eth/contracts, ./settings, ./edn_helpers import # std libs
import atomics, json, tables, sequtils, httpclient, net
json, json_serialization, tables, chronicles, sequtils, httpclient, net,
stint, libp2p/[multihash, multicodec, cid], web3/[ethtypes, conversions]
from strutils import parseHexInt, parseInt from strutils import parseHexInt, parseInt
import # vendor libs
json_serialization, chronicles, libp2p/[multihash, multicodec, cid], stint,
web3/[ethtypes, conversions]
from nimcrypto import fromHex from nimcrypto import fromHex
import # status-desktop libs
./core as status, ./types, ./eth/contracts, ./settings, ./edn_helpers
proc decodeContentHash*(value: string): string = proc decodeContentHash*(value: string): string =
if value == "": if value == "":
return "" return ""
@ -95,7 +100,7 @@ proc getPackCount*(): int =
result = parseHexInt(response.result) result = parseHexInt(response.result)
# Gets sticker pack data # Gets sticker pack data
proc getPackData*(id: Stuint[256]): StickerPack = proc getPackData*(id: Stuint[256], running: var Atomic[bool]): StickerPack =
let let
contract = contracts.getContract("stickers") contract = contracts.getContract("stickers")
contractMethod = contract.methods["getPackData"] contractMethod = contract.methods["getPackData"]
@ -111,6 +116,10 @@ proc getPackData*(id: Stuint[256]): StickerPack =
let packData = contracts.decodeContractResponse[PackData](response.result) let packData = contracts.decodeContractResponse[PackData](response.result)
if not running.load():
trace "Sticker pack task interrupted, exiting sticker pack loading"
return
# contract response includes a contenthash, which needs to be decoded to reveal # contract response includes a contenthash, which needs to be decoded to reveal
# an IPFS identifier. Once decoded, download the content from IPFS. This content # an IPFS identifier. Once decoded, download the content from IPFS. This content
# is in EDN format, ie https://ipfs.infura.io/ipfs/QmWVVLwVKCwkVNjYJrRzQWREVvEk917PhbHYAUhA1gECTM # is in EDN format, ie https://ipfs.infura.io/ipfs/QmWVVLwVKCwkVNjYJrRzQWREVvEk917PhbHYAUhA1gECTM
@ -200,13 +209,16 @@ proc getRecentStickers*(): seq[Sticker] =
# inserting recent stickers at the front of the list # inserting recent stickers at the front of the list
result.insert(Sticker(hash: $hash, packId: packId), 0) result.insert(Sticker(hash: $hash, packId: packId), 0)
proc getAvailableStickerPacks*(): Table[int, StickerPack] = proc getAvailableStickerPacks*(running: var Atomic[bool]): Table[int, StickerPack] =
var availableStickerPacks = initTable[int, StickerPack]() var availableStickerPacks = initTable[int, StickerPack]()
try: try:
let numPacks = getPackCount() let numPacks = getPackCount()
for i in 0..<numPacks: for i in 0..<numPacks:
if not running.load():
trace "Sticker pack task interrupted, exiting sticker pack loading"
break
try: try:
let stickerPack = getPackData(i.u256) let stickerPack = getPackData(i.u256, running)
availableStickerPacks[stickerPack.id] = stickerPack availableStickerPacks[stickerPack.id] = stickerPack
except: except:
continue continue

View File

@ -1,5 +1,5 @@
import # global deps import # global deps
tables, strutils, sequtils atomics, sequtils, strutils, tables
import # project deps import # project deps
chronicles, web3/[ethtypes, conversions], stint chronicles, web3/[ethtypes, conversions], stint
@ -107,7 +107,7 @@ proc getInstalledStickerPacks*(self: StickersModel): Table[int, StickerPack] =
self.installedStickerPacks = status_stickers.getInstalledStickerPacks() self.installedStickerPacks = status_stickers.getInstalledStickerPacks()
result = self.installedStickerPacks result = self.installedStickerPacks
proc getAvailableStickerPacks*(): Table[int, StickerPack] = status_stickers.getAvailableStickerPacks() proc getAvailableStickerPacks*(running: var Atomic[bool]): Table[int, StickerPack] = status_stickers.getAvailableStickerPacks(running)
proc getRecentStickers*(self: StickersModel): seq[Sticker] = proc getRecentStickers*(self: StickersModel): seq[Sticker] =
result = status_stickers.getRecentStickers() result = status_stickers.getRecentStickers()

View File

@ -122,14 +122,19 @@ proc worker(arg: WorkerThreadArg) {.async, gcsafe, nimcall.} =
let mailserverModel = newMailserverModel(arg.vptr) let mailserverModel = newMailserverModel(arg.vptr)
var unprocessedMsgs: seq[string] = @[] var unprocessedMsgs: seq[string] = @[]
# wait for "loggedIn" before initing mailserverModel and continuing
while true: while true:
let received = $(await chanRecvFromMain.recv()) let received = $(await chanRecvFromMain.recv())
if received == "loggedIn": if received == "loggedIn":
mailserverModel.init()
break break
elif received == "shutdown":
trace "received 'shutdown'"
trace "stopping worker"
return
else: else:
unprocessedMsgs.add received unprocessedMsgs.add received
mailserverModel.init()
discard mailserverModel.checkConnection() discard mailserverModel.checkConnection()
for msg in unprocessedMsgs.items: for msg in unprocessedMsgs.items:

View File

@ -1,5 +1,5 @@
import # std libs import # std libs
json, sequtils, tables atomics, json, sequtils, tables
import # vendor libs import # vendor libs
chronicles, chronos, json_serialization, task_runner chronicles, chronos, json_serialization, task_runner
@ -19,6 +19,7 @@ type
chanSendToPool: AsyncChannel[ThreadSafeString] chanSendToPool: AsyncChannel[ThreadSafeString]
thread: Thread[PoolThreadArg] thread: Thread[PoolThreadArg]
size: int size: int
running*: Atomic[bool]
PoolThreadArg = object PoolThreadArg = object
chanSendToMain: AsyncChannel[ThreadSafeString] chanSendToMain: AsyncChannel[ThreadSafeString]
chanRecvFromMain: AsyncChannel[ThreadSafeString] chanRecvFromMain: AsyncChannel[ThreadSafeString]
@ -42,6 +43,7 @@ proc newThreadPool*(size: int = MaxThreadPoolSize): ThreadPool =
result.chanSendToPool = newAsyncChannel[ThreadSafeString](-1) result.chanSendToPool = newAsyncChannel[ThreadSafeString](-1)
result.thread = Thread[PoolThreadArg]() result.thread = Thread[PoolThreadArg]()
result.size = size result.size = size
result.running.store(false)
proc init*(self: ThreadPool) = proc init*(self: ThreadPool) =
self.chanRecvFromPool.open() self.chanRecvFromPool.open()
@ -56,6 +58,7 @@ proc init*(self: ThreadPool) =
discard $(self.chanRecvFromPool.recvSync()) discard $(self.chanRecvFromPool.recvSync())
proc teardown*(self: ThreadPool) = proc teardown*(self: ThreadPool) =
self.running.store(false)
self.chanSendToPool.sendSync("shutdown".safe) self.chanSendToPool.sendSync("shutdown".safe)
self.chanRecvFromPool.close() self.chanRecvFromPool.close()
self.chanSendToPool.close() self.chanSendToPool.close()
@ -64,6 +67,7 @@ proc teardown*(self: ThreadPool) =
proc start*[T: TaskArg](self: Threadpool, arg: T) = proc start*[T: TaskArg](self: Threadpool, arg: T) =
self.chanSendToPool.sendSync(arg.encode.safe) self.chanSendToPool.sendSync(arg.encode.safe)
self.running.store(true)
proc runner(arg: TaskThreadArg) {.async.} = proc runner(arg: TaskThreadArg) {.async.} =
arg.chanRecvFromPool.open() arg.chanRecvFromPool.open()

View File

@ -1,11 +1,14 @@
import strformat, httpclient, json, chronicles, sequtils, strutils, tables, sugar, net import # std libs
import ../libstatus/core as status atomics, strformat, httpclient, json, chronicles, sequtils, strutils, tables,
import ../libstatus/eth/contracts as contracts sugar, net
import ../libstatus/stickers as status_stickers
import ../libstatus/types import # vendor libs
import web3/[conversions, ethtypes], stint stint
import ../libstatus/utils
import account import # status-desktop libs
../libstatus/core as status, ../libstatus/eth/contracts as contracts,
../libstatus/stickers as status_stickers, ../libstatus/types,
web3/[conversions, ethtypes], ../libstatus/utils, account
const CRYPTOKITTY* = "cryptokitty" const CRYPTOKITTY* = "cryptokitty"
const KUDO* = "kudo" const KUDO* = "kudo"
@ -198,7 +201,7 @@ proc getKudos*(address: string): string =
let eth_address = parseAddress(address) let eth_address = parseAddress(address)
result = getKudos(eth_address) result = getKudos(eth_address)
proc getStickers*(address: Address): string = proc getStickers*(address: Address, running: var Atomic[bool]): string =
try: try:
var stickers: seq[Collectible] var stickers: seq[Collectible]
stickers = @[] stickers = @[]
@ -215,7 +218,7 @@ proc getStickers*(address: Address): string =
if (purchasedStickerPacks.len == 0): if (purchasedStickerPacks.len == 0):
return $(%*stickers) return $(%*stickers)
# TODO find a way to keep those in memory so as not to reload it each time # TODO find a way to keep those in memory so as not to reload it each time
let availableStickerPacks = getAvailableStickerPacks() let availableStickerPacks = getAvailableStickerPacks(running)
var index = 0 var index = 0
for stickerId in purchasedStickerPacks: for stickerId in purchasedStickerPacks:
@ -234,6 +237,6 @@ proc getStickers*(address: Address): string =
error "Error getting Stickers", msg = e.msg error "Error getting Stickers", msg = e.msg
result = e.msg result = e.msg
proc getStickers*(address: string): string = proc getStickers*(address: string, running: var Atomic[bool]): string =
let eth_address = parseAddress(address) let eth_address = parseAddress(address)
result = getStickers(eth_address) result = getStickers(eth_address, running)