mirror of
https://github.com/logos-storage/logos-storage-nim-dht.git
synced 2026-05-21 17:19:27 +00:00
Add future callback to prevent unbound growth
This commit is contained in:
parent
02bf4427ed
commit
4635266869
@ -685,7 +685,11 @@ proc lookupWorker(d: Protocol, destNode: Node, target: NodeId, fast: bool):
|
||||
# Attempt to add all nodes discovered
|
||||
for n in result:
|
||||
if d.addNode(n):
|
||||
d.trackedFutures.add(d.removeIfClientMode(n))
|
||||
let fut = d.removeIfClientMode(n)
|
||||
fut.addCallback(proc(data: pointer) =
|
||||
d.trackedFutures.remove(fut)
|
||||
)
|
||||
d.trackedFutures.add(fut)
|
||||
|
||||
proc lookup*(d: Protocol, target: NodeId, fast: bool = false): Future[seq[Node]] {.async.} =
|
||||
## Perform a lookup for the given target, return the closest n nodes to the
|
||||
@ -971,9 +975,6 @@ proc populateTable*(d: Protocol) {.async.} =
|
||||
total = d.routingTable.len()
|
||||
|
||||
proc revalidateNode*(d: Protocol, n: Node) {.async.} =
|
||||
# Prune completed futures to avoid unbounded growth
|
||||
d.trackedFutures.keepItIf(not it.finished)
|
||||
|
||||
let pong = await d.ping(n)
|
||||
|
||||
if pong.isOk():
|
||||
@ -1252,8 +1253,6 @@ proc closeWait*(d: Protocol) {.async.} =
|
||||
if not d.ipMajorityLoop.isNil:
|
||||
await d.ipMajorityLoop.cancelAndWait()
|
||||
|
||||
for fut in d.trackedFutures:
|
||||
if not fut.finished:
|
||||
await fut.cancelAndWait()
|
||||
d.trackedFutures.cancelTracked()
|
||||
|
||||
await d.transport.closeWait()
|
||||
|
||||
@ -238,7 +238,10 @@ proc receive*(t: Transport, a: Address, packet: openArray[byte]) =
|
||||
# We keep adding the node in the line above in order to not break anything.
|
||||
# Then we remove the node if it using client mode.
|
||||
# The operation is async because the check is done over TalkProtocol.
|
||||
t.client.trackedFutures.add(t.client.removeIfClientMode(node))
|
||||
let fut = t.client.removeIfClientMode(node)
|
||||
fut.addCallback(proc(data: pointer) =
|
||||
t.client.trackedFutures.remove(fut))
|
||||
t.client.trackedFutures.add(fut)
|
||||
|
||||
discard t.sendPending(node)
|
||||
else:
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user