From 1ffd1cd3dc2145b6922eba465d29d1990e9d4254 Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Tue, 15 Sep 2020 09:55:43 +0200 Subject: [PATCH] release callback memory early (#130) * release callback memory early this fixes a memory leak where a deleted callback may keep references alive until the future is finished. In particular, when using helpers like `or` which try to remove themselves from the callback list when a dependent future is completed, create a reference chain between all futures in the expression - in the pathological case where one of the futures is completes only rarely (for example a timeout or a cancellation task), the buildup will be significant. * Removing unnecessary asserts, and place comments instead. --- chronos/asyncfutures2.nim | 73 ++++++++++++++++----------------------- chronos/asyncloop.nim | 38 ++++++++++---------- 2 files changed, 47 insertions(+), 64 deletions(-) diff --git a/chronos/asyncfutures2.nim b/chronos/asyncfutures2.nim index 5e812c5..84860c3 100644 --- a/chronos/asyncfutures2.nim +++ b/chronos/asyncfutures2.nim @@ -8,7 +8,7 @@ # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) -import os, tables, strutils, heapqueue, options, deques, cstrutils +import std/[os, tables, strutils, heapqueue, options, deques, cstrutils, sequtils] import srcloc export srcloc @@ -25,7 +25,7 @@ type FutureBase* = ref object of RootObj ## Untyped future. location*: array[2, ptr SrcLoc] - callbacks: Deque[AsyncCallback] + callbacks: seq[AsyncCallback] cancelcb*: CallbackFunc child*: FutureBase state*: FutureState @@ -142,6 +142,7 @@ template newFutureVar*[T](fromProc: static[string] = ""): auto = proc clean*[T](future: FutureVar[T]) = ## Resets the ``finished`` status of ``future``. Future[T](future).state = FutureState.Pending + Future[T](future).value = default(T) Future[T](future).error = nil proc finished*(future: FutureBase | FutureVar): bool {.inline.} = @@ -201,33 +202,27 @@ proc checkFinished(future: FutureBase, loc: ptr SrcLoc) = else: future.location[LocCompleteIndex] = loc -proc call(callbacks: var Deque[AsyncCallback]) = - var count = len(callbacks) - while count > 0: - var item = callbacks.popFirst() - if not(item.deleted): +proc finish(fut: FutureBase, state: FutureState) = + # We do not perform any checks here, because: + # 1. `finish()` is a private procedure and `state` is under our control. + # 2. `fut.state` is checked by `checkFinished()`. + fut.state = state + fut.cancelcb = nil # release cancellation callback memory + for item in fut.callbacks.mitems(): + if not(isNil(item.function)): callSoon(item.function, item.udata) - dec(count) + item = default(AsyncCallback) # release memory as early as possible + fut.callbacks = default(seq[AsyncCallback]) # release seq as well -proc add(callbacks: var Deque[AsyncCallback], item: AsyncCallback) = - if len(callbacks) == 0: - callbacks = initDeque[AsyncCallback]() - callbacks.addLast(item) - -proc remove(callbacks: var Deque[AsyncCallback], item: AsyncCallback) = - for p in callbacks.mitems(): - if p.function == item.function and p.udata == item.udata: - p.deleted = true + when defined(chronosFutureTracking): + scheduleDestructor(fut) proc complete[T](future: Future[T], val: T, loc: ptr SrcLoc) = if not(future.cancelled()): checkFinished(FutureBase(future), loc) doAssert(isNil(future.error)) future.value = val - future.state = FutureState.Finished - future.callbacks.call() - when defined(chronosFutureTracking): - scheduleDestructor(FutureBase(future)) + future.finish(FutureState.Finished) template complete*[T](future: Future[T], val: T) = ## Completes ``future`` with value ``val``. @@ -237,10 +232,7 @@ proc complete(future: Future[void], loc: ptr SrcLoc) = if not(future.cancelled()): checkFinished(FutureBase(future), loc) doAssert(isNil(future.error)) - future.state = FutureState.Finished - future.callbacks.call() - when defined(chronosFutureTracking): - scheduleDestructor(FutureBase(future)) + future.finish(FutureState.Finished) template complete*(future: Future[void]) = ## Completes a void ``future``. @@ -251,10 +243,7 @@ proc complete[T](future: FutureVar[T], loc: ptr SrcLoc) = template fut: untyped = Future[T](future) checkFinished(FutureBase(fut), loc) doAssert(isNil(fut.error)) - fut.state = FutureState.Finished - fut.callbacks.call() - when defined(chronosFutureTracking): - scheduleDestructor(FutureBase(future)) + fut.finish(FutureState.Finished) template complete*[T](futvar: FutureVar[T]) = ## Completes a ``FutureVar``. @@ -265,11 +254,8 @@ proc complete[T](futvar: FutureVar[T], val: T, loc: ptr SrcLoc) = template fut: untyped = Future[T](futvar) checkFinished(FutureBase(fut), loc) doAssert(isNil(fut.error)) - fut.state = FutureState.Finished fut.value = val - fut.callbacks.call() - when defined(chronosFutureTracking): - scheduleDestructor(FutureBase(fut)) + fut.finish(FutureState.Finished) template complete*[T](futvar: FutureVar[T], val: T) = ## Completes a ``FutureVar`` with value ``val``. @@ -280,16 +266,13 @@ template complete*[T](futvar: FutureVar[T], val: T) = proc fail[T](future: Future[T], error: ref Exception, loc: ptr SrcLoc) = if not(future.cancelled()): checkFinished(FutureBase(future), loc) - future.state = FutureState.Failed future.error = error when defined(chronosStackTrace): future.errorStackTrace = if getStackTrace(error) == "": getStackTrace() else: getStackTrace(error) - future.callbacks.call() - when defined(chronosFutureTracking): - scheduleDestructor(FutureBase(future)) + future.finish(FutureState.Failed) template fail*[T](future: Future[T], error: ref Exception) = ## Completes ``future`` with ``error``. @@ -301,13 +284,10 @@ template newCancelledError(): ref CancelledError = proc cancelAndSchedule(future: FutureBase, loc: ptr SrcLoc) = if not(future.finished()): checkFinished(future, loc) - future.state = FutureState.Cancelled future.error = newCancelledError() when defined(chronosStackTrace): future.errorStackTrace = getStackTrace() - future.callbacks.call() - when defined(chronosFutureTracking): - scheduleDestructor(future) + future.finish(FutureState.Cancelled) template cancelAndSchedule*[T](future: Future[T]) = cancelAndSchedule(FutureBase(future), getSrcLocation()) @@ -324,6 +304,7 @@ proc cancel(future: FutureBase, loc: ptr SrcLoc) = else: if not(isNil(future.cancelcb)): future.cancelcb(cast[pointer](future)) + future.cancelcb = nil cancelAndSchedule(future, getSrcLocation()) template cancel*[T](future: Future[T]) = @@ -331,7 +312,7 @@ template cancel*[T](future: Future[T]) = cancel(FutureBase(future), getSrcLocation()) proc clearCallbacks(future: FutureBase) = - future.callbacks.clear() + future.callbacks = default(seq[AsyncCallback]) proc addCallback*(future: FutureBase, cb: CallbackFunc, udata: pointer = nil) = ## Adds the callbacks proc to be called when the future completes. @@ -352,9 +333,13 @@ proc addCallback*[T](future: Future[T], cb: CallbackFunc) = proc removeCallback*(future: FutureBase, cb: CallbackFunc, udata: pointer = nil) = + ## Remove future from list of callbacks - this operation may be slow if there + ## are many registered callbacks! doAssert(not isNil(cb)) - let acb = AsyncCallback(function: cb, udata: udata) - future.callbacks.remove acb + # Make sure to release memory associated with callback, or reference chains + # may be created! + future.callbacks.keepItIf: + it.function != cb or it.udata != udata proc removeCallback*[T](future: Future[T], cb: CallbackFunc) = future.removeCallback(cb, cast[pointer](future)) diff --git a/chronos/asyncloop.nim b/chronos/asyncloop.nim index 84685ab..a1e9f94 100644 --- a/chronos/asyncloop.nim +++ b/chronos/asyncloop.nim @@ -183,7 +183,6 @@ type AsyncCallback* = object function*: CallbackFunc udata*: pointer - deleted*: bool AsyncError* = object of CatchableError ## Generic async exception @@ -193,7 +192,6 @@ type TimerCallback* = ref object finishAt*: Moment function*: AsyncCallback - deleted*: bool TrackerBase* = ref object of RootRef id*: string @@ -231,7 +229,7 @@ func getAsyncTimestamp*(a: Duration): auto {.inline.} = template processTimersGetTimeout(loop, timeout: untyped) = var lastFinish = curTime while loop.timers.len > 0: - if loop.timers[0].deleted: + if loop.timers[0].function.function.isNil: discard loop.timers.pop() continue @@ -256,7 +254,7 @@ template processTimersGetTimeout(loop, timeout: untyped) = template processTimers(loop: untyped) = var curTime = Moment.now() while loop.timers.len > 0: - if loop.timers[0].deleted: + if loop.timers[0].function.function.isNil: discard loop.timers.pop() continue @@ -581,7 +579,7 @@ elif unixPlatform: var newEvents: set[Event] withData(loop.selector, int(fd), adata) do: # We need to clear `reader` data, because `selectors` don't do it - adata.reader.function = nil + adata.reader = default(AsyncCallback) # adata.rdata = CompletionData() if not(isNil(adata.writer.function)): newEvents.incl(Event.Write) @@ -611,7 +609,7 @@ elif unixPlatform: var newEvents: set[Event] withData(loop.selector, int(fd), adata) do: # We need to clear `writer` data, because `selectors` don't do it - adata.writer.function = nil + adata.writer = default(AsyncCallback) # adata.wdata = CompletionData() if not(isNil(adata.reader.function)): newEvents.incl(Event.Read) @@ -638,16 +636,16 @@ elif unixPlatform: withData(loop.selector, int(fd), adata) do: # We are scheduling reader and writer callbacks to be called # explicitly, so they can get an error and continue work. - if not(isNil(adata.reader.function)): - if not adata.reader.deleted: - loop.callbacks.addLast(adata.reader) - if not(isNil(adata.writer.function)): - if not adata.writer.deleted: - loop.callbacks.addLast(adata.writer) - # Mark callbacks as deleted, we don't need to get REAL notifications + # Callbacks marked as deleted so we don't need to get REAL notifications # from system queue for this reader and writer. - adata.reader.deleted = true - adata.writer.deleted = true + + if not(isNil(adata.reader.function)): + loop.callbacks.addLast(adata.reader) + adata.reader = default(AsyncCallback) + + if not(isNil(adata.writer.function)): + loop.callbacks.addLast(adata.writer) + adata.writer = default(AsyncCallback) # We can't unregister file descriptor from system queue here, because # in such case processing queue will stuck on poll() call, because there @@ -707,20 +705,20 @@ elif unixPlatform: withData(loop.selector, fd, adata) do: if Event.Read in events or events == {Event.Error}: - if not adata.reader.deleted: + if not isNil(adata.reader.function): loop.callbacks.addLast(adata.reader) if Event.Write in events or events == {Event.Error}: - if not adata.writer.deleted: + if not isNil(adata.writer.function): loop.callbacks.addLast(adata.writer) if Event.User in events: - if not adata.reader.deleted: + if not isNil(adata.reader.function): loop.callbacks.addLast(adata.reader) when ioselSupportedPlatform: if customSet * events != {}: - if not adata.reader.deleted: + if not isNil(adata.reader.function): loop.callbacks.addLast(adata.reader) # Moving expired timers to `loop.callbacks`. @@ -744,7 +742,7 @@ proc setTimer*(at: Moment, cb: CallbackFunc, loop.timers.push(result) proc clearTimer*(timer: TimerCallback) {.inline.} = - timer.deleted = true + timer.function = default(AsyncCallback) proc addTimer*(at: Moment, cb: CallbackFunc, udata: pointer = nil) {. inline, deprecated: "Use setTimer/clearTimer instead".} =