From ee5d8acb05780aa9f591303fc9a10ea0f6a7bbe9 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Wed, 20 Sep 2023 10:20:26 -0600 Subject: [PATCH] cleanup and avoid lockups (#85) --- .../p2p/discoveryv5/providers/maintenance.nim | 36 +++++++++---------- 1 file changed, 17 insertions(+), 19 deletions(-) diff --git a/codexdht/private/eth/p2p/discoveryv5/providers/maintenance.nim b/codexdht/private/eth/p2p/discoveryv5/providers/maintenance.nim index 92ca75d..713029e 100644 --- a/codexdht/private/eth/p2p/discoveryv5/providers/maintenance.nim +++ b/codexdht/private/eth/p2p/discoveryv5/providers/maintenance.nim @@ -48,15 +48,14 @@ proc cleanupExpired*( keys = newSeq[Key]() for item in iter: - if pair =? (await item) and pair.key.isSome: + if (key, data) =? (await item) and key.isSome: let - (key, data) = (pair.key.get(), pair.data) expired = Moment.init(uint64.fromBytesBE(data).int64, Microsecond) if now >= expired: trace "Found expired record", key - keys.add(key) - without pairs =? key.fromCidKey(), err: + keys.add(key.get) + without pairs =? key.get.fromCidKey(), err: trace "Error extracting parts from cid key", key continue @@ -74,7 +73,7 @@ proc cleanupOrphaned*( trace "Cleaning up orphaned records" let - providersQuery = Query.init(ProvidersKey, limit = batchSize) + providersQuery = Query.init(ProvidersKey, limit = batchSize, value = false) block: without iter =? (await store.query(providersQuery)), err: @@ -83,7 +82,7 @@ proc cleanupOrphaned*( defer: if not isNil(iter): - trace "Cleaning up query iterator" + trace "Cleaning up orphaned query iterator" discard (await iter.dispose()) var count = 0 @@ -92,27 +91,26 @@ proc cleanupOrphaned*( trace "Batch cleaned up", size = batchSize count.inc - if pair =? (await item) and pair.key.isSome: - let - key = pair.key.get() - - without peerId =? key.fromProvKey(), err: - trace "Error extracting parts from cid key", key + if (key, _) =? (await item) and key.isSome: + without peerId =? key.get.fromProvKey(), err: + trace "Error extracting parts from cid key", key = key.get continue without cidKey =? (CidKey / "*" / $peerId), err: trace "Error building cid key", err = err.msg continue - without cidIter =? (await store.query(Query.init(cidKey, limit = 1))), err: - trace "Error querying key", cidKey + without cidIter =? (await store.query(Query.init(cidKey, limit = 1, value = false))), err: + trace "Error querying key", cidKey, err = err.msg continue let - res = (await allFinished(toSeq(cidIter))) - .filterIt( it.completed ) - .mapIt( it.read.get ) - .filterIt( it.key.isSome ).len + res = block: + var count = 0 + for item in cidIter: + if (key, _) =? (await item) and key.isSome: + count.inc + count if not isNil(cidIter): trace "Disposing cid iter" @@ -122,7 +120,7 @@ proc cleanupOrphaned*( trace "Peer not orphaned, skipping", peerId continue - if err =? (await store.delete(key)).errorOption: + if err =? (await store.delete(key.get)).errorOption: trace "Error deleting orphaned peer", err = err.msg continue