cleanup and avoid lockups (#85)

This commit is contained in:
Dmitriy Ryajov 2023-09-20 10:20:26 -06:00 committed by GitHub
parent ed7caa119d
commit ee5d8acb05
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -48,15 +48,14 @@ proc cleanupExpired*(
keys = newSeq[Key]() keys = newSeq[Key]()
for item in iter: for item in iter:
if pair =? (await item) and pair.key.isSome: if (key, data) =? (await item) and key.isSome:
let let
(key, data) = (pair.key.get(), pair.data)
expired = Moment.init(uint64.fromBytesBE(data).int64, Microsecond) expired = Moment.init(uint64.fromBytesBE(data).int64, Microsecond)
if now >= expired: if now >= expired:
trace "Found expired record", key trace "Found expired record", key
keys.add(key) keys.add(key.get)
without pairs =? key.fromCidKey(), err: without pairs =? key.get.fromCidKey(), err:
trace "Error extracting parts from cid key", key trace "Error extracting parts from cid key", key
continue continue
@ -74,7 +73,7 @@ proc cleanupOrphaned*(
trace "Cleaning up orphaned records" trace "Cleaning up orphaned records"
let let
providersQuery = Query.init(ProvidersKey, limit = batchSize) providersQuery = Query.init(ProvidersKey, limit = batchSize, value = false)
block: block:
without iter =? (await store.query(providersQuery)), err: without iter =? (await store.query(providersQuery)), err:
@ -83,7 +82,7 @@ proc cleanupOrphaned*(
defer: defer:
if not isNil(iter): if not isNil(iter):
trace "Cleaning up query iterator" trace "Cleaning up orphaned query iterator"
discard (await iter.dispose()) discard (await iter.dispose())
var count = 0 var count = 0
@ -92,27 +91,26 @@ proc cleanupOrphaned*(
trace "Batch cleaned up", size = batchSize trace "Batch cleaned up", size = batchSize
count.inc count.inc
if pair =? (await item) and pair.key.isSome: if (key, _) =? (await item) and key.isSome:
let without peerId =? key.get.fromProvKey(), err:
key = pair.key.get() trace "Error extracting parts from cid key", key = key.get
without peerId =? key.fromProvKey(), err:
trace "Error extracting parts from cid key", key
continue continue
without cidKey =? (CidKey / "*" / $peerId), err: without cidKey =? (CidKey / "*" / $peerId), err:
trace "Error building cid key", err = err.msg trace "Error building cid key", err = err.msg
continue continue
without cidIter =? (await store.query(Query.init(cidKey, limit = 1))), err: without cidIter =? (await store.query(Query.init(cidKey, limit = 1, value = false))), err:
trace "Error querying key", cidKey trace "Error querying key", cidKey, err = err.msg
continue continue
let let
res = (await allFinished(toSeq(cidIter))) res = block:
.filterIt( it.completed ) var count = 0
.mapIt( it.read.get ) for item in cidIter:
.filterIt( it.key.isSome ).len if (key, _) =? (await item) and key.isSome:
count.inc
count
if not isNil(cidIter): if not isNil(cidIter):
trace "Disposing cid iter" trace "Disposing cid iter"
@ -122,7 +120,7 @@ proc cleanupOrphaned*(
trace "Peer not orphaned, skipping", peerId trace "Peer not orphaned, skipping", peerId
continue continue
if err =? (await store.delete(key)).errorOption: if err =? (await store.delete(key.get)).errorOption:
trace "Error deleting orphaned peer", err = err.msg trace "Error deleting orphaned peer", err = err.msg
continue continue