mirror of
https://github.com/status-im/nim-chronos.git
synced 2025-03-03 04:10:38 +00:00
futures: sinkify and lentify
this avoids copies here and there throughout the pipeline - ie `copyString` and friends can often be avoided when moving things into and out of futures annoyingly, one has to sprinkle the codebase liberally with `sink` and `move` for the pipeline to work well - sink stuff _generally_ works better in orc/arc
This commit is contained in:
parent
1306170255
commit
686e4a3d26
@ -202,14 +202,14 @@ proc finish(fut: FutureBase, state: FutureState) =
|
|||||||
when chronosFutureTracking:
|
when chronosFutureTracking:
|
||||||
scheduleDestructor(fut)
|
scheduleDestructor(fut)
|
||||||
|
|
||||||
proc complete[T](future: Future[T], val: T, loc: ptr SrcLoc) =
|
proc complete[T](future: Future[T], val: sink T, loc: ptr SrcLoc) =
|
||||||
if not(future.cancelled()):
|
if not(future.cancelled()):
|
||||||
checkFinished(future, loc)
|
checkFinished(future, loc)
|
||||||
doAssert(isNil(future.internalError))
|
doAssert(isNil(future.internalError))
|
||||||
future.internalValue = val
|
future.internalValue = move(val)
|
||||||
future.finish(FutureState.Completed)
|
future.finish(FutureState.Completed)
|
||||||
|
|
||||||
template complete*[T](future: Future[T], val: T) =
|
template complete*[T](future: Future[T], val: sink T) =
|
||||||
## Completes ``future`` with value ``val``.
|
## Completes ``future`` with value ``val``.
|
||||||
complete(future, val, getSrcLocation())
|
complete(future, val, getSrcLocation())
|
||||||
|
|
||||||
|
@ -157,7 +157,7 @@ proc wrapInTryFinally(
|
|||||||
newCall(ident "complete", fut)
|
newCall(ident "complete", fut)
|
||||||
),
|
),
|
||||||
nnkElseExpr.newTree(
|
nnkElseExpr.newTree(
|
||||||
newCall(ident "complete", fut, ident "result")
|
newCall(ident "complete", fut, newCall(ident "move", ident "result"))
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
@ -755,7 +755,7 @@ proc write*(wstream: AsyncStreamWriter, sbytes: sink seq[byte],
|
|||||||
if isNil(wstream.wsource):
|
if isNil(wstream.wsource):
|
||||||
var res: int
|
var res: int
|
||||||
try:
|
try:
|
||||||
res = await write(wstream.tsource, sbytes, length)
|
res = await write(wstream.tsource, move(sbytes), length)
|
||||||
except CancelledError as exc:
|
except CancelledError as exc:
|
||||||
raise exc
|
raise exc
|
||||||
except CatchableError as exc:
|
except CatchableError as exc:
|
||||||
@ -765,17 +765,11 @@ proc write*(wstream: AsyncStreamWriter, sbytes: sink seq[byte],
|
|||||||
wstream.bytesCount = wstream.bytesCount + uint64(length)
|
wstream.bytesCount = wstream.bytesCount + uint64(length)
|
||||||
else:
|
else:
|
||||||
if isNil(wstream.writerLoop):
|
if isNil(wstream.writerLoop):
|
||||||
await write(wstream.wsource, sbytes, length)
|
await write(wstream.wsource, move(sbytes), length)
|
||||||
wstream.bytesCount = wstream.bytesCount + uint64(length)
|
wstream.bytesCount = wstream.bytesCount + uint64(length)
|
||||||
else:
|
else:
|
||||||
var item = WriteItem(kind: Sequence)
|
var item = WriteItem(kind: Sequence)
|
||||||
when declared(shallowCopy):
|
item.dataSeq = move(sbytes)
|
||||||
if not(isLiteral(sbytes)):
|
|
||||||
shallowCopy(item.dataSeq, sbytes)
|
|
||||||
else:
|
|
||||||
item.dataSeq = sbytes
|
|
||||||
else:
|
|
||||||
item.dataSeq = sbytes
|
|
||||||
item.size = length
|
item.size = length
|
||||||
item.future = newFuture[void]("async.stream.write(seq)")
|
item.future = newFuture[void]("async.stream.write(seq)")
|
||||||
try:
|
try:
|
||||||
@ -808,7 +802,7 @@ proc write*(wstream: AsyncStreamWriter, sbytes: sink string,
|
|||||||
if isNil(wstream.wsource):
|
if isNil(wstream.wsource):
|
||||||
var res: int
|
var res: int
|
||||||
try:
|
try:
|
||||||
res = await write(wstream.tsource, sbytes, length)
|
res = await write(wstream.tsource, move(sbytes), length)
|
||||||
except CancelledError as exc:
|
except CancelledError as exc:
|
||||||
raise exc
|
raise exc
|
||||||
except CatchableError as exc:
|
except CatchableError as exc:
|
||||||
@ -818,17 +812,11 @@ proc write*(wstream: AsyncStreamWriter, sbytes: sink string,
|
|||||||
wstream.bytesCount = wstream.bytesCount + uint64(length)
|
wstream.bytesCount = wstream.bytesCount + uint64(length)
|
||||||
else:
|
else:
|
||||||
if isNil(wstream.writerLoop):
|
if isNil(wstream.writerLoop):
|
||||||
await write(wstream.wsource, sbytes, length)
|
await write(wstream.wsource, move(sbytes), length)
|
||||||
wstream.bytesCount = wstream.bytesCount + uint64(length)
|
wstream.bytesCount = wstream.bytesCount + uint64(length)
|
||||||
else:
|
else:
|
||||||
var item = WriteItem(kind: String)
|
var item = WriteItem(kind: String)
|
||||||
when declared(shallowCopy):
|
item.dataStr = move(sbytes)
|
||||||
if not(isLiteral(sbytes)):
|
|
||||||
shallowCopy(item.dataStr, sbytes)
|
|
||||||
else:
|
|
||||||
item.dataStr = sbytes
|
|
||||||
else:
|
|
||||||
item.dataStr = sbytes
|
|
||||||
item.size = length
|
item.size = length
|
||||||
item.future = newFuture[void]("async.stream.write(string)")
|
item.future = newFuture[void]("async.stream.write(string)")
|
||||||
try:
|
try:
|
||||||
|
@ -59,7 +59,7 @@ type
|
|||||||
|
|
||||||
PEMContext = ref object
|
PEMContext = ref object
|
||||||
data: seq[byte]
|
data: seq[byte]
|
||||||
|
|
||||||
TrustAnchorStore* = ref object
|
TrustAnchorStore* = ref object
|
||||||
anchors: seq[X509TrustAnchor]
|
anchors: seq[X509TrustAnchor]
|
||||||
|
|
||||||
@ -155,7 +155,7 @@ proc tlsWriteRec(engine: ptr SslEngineContext,
|
|||||||
var length = 0'u
|
var length = 0'u
|
||||||
var buf = sslEngineSendrecBuf(engine[], length)
|
var buf = sslEngineSendrecBuf(engine[], length)
|
||||||
doAssert(length != 0 and not isNil(buf))
|
doAssert(length != 0 and not isNil(buf))
|
||||||
await writer.wsource.write(buf, int(length))
|
await writer.wsource.write(move(buf), int(length))
|
||||||
sslEngineSendrecAck(engine[], length)
|
sslEngineSendrecAck(engine[], length)
|
||||||
TLSResult.Success
|
TLSResult.Success
|
||||||
except AsyncStreamError as exc:
|
except AsyncStreamError as exc:
|
||||||
@ -468,7 +468,7 @@ proc newTLSClientAsyncStream*(
|
|||||||
## ``minVersion`` of bigger then ``maxVersion`` you will get an error.
|
## ``minVersion`` of bigger then ``maxVersion`` you will get an error.
|
||||||
##
|
##
|
||||||
## ``flags`` - custom TLS connection flags.
|
## ``flags`` - custom TLS connection flags.
|
||||||
##
|
##
|
||||||
## ``trustAnchors`` - use this if you want to use certificate trust
|
## ``trustAnchors`` - use this if you want to use certificate trust
|
||||||
## anchors other than the default Mozilla trust anchors. If you pass
|
## anchors other than the default Mozilla trust anchors. If you pass
|
||||||
## a ``TrustAnchorStore`` you should reuse the same instance for
|
## a ``TrustAnchorStore`` you should reuse the same instance for
|
||||||
|
@ -10,8 +10,9 @@
|
|||||||
{.push raises: [].}
|
{.push raises: [].}
|
||||||
|
|
||||||
import std/deques
|
import std/deques
|
||||||
|
import stew/ptrops
|
||||||
import ".."/[asyncloop, handles, osdefs, osutils, oserrno]
|
import ".."/[asyncloop, handles, osdefs, osutils, oserrno]
|
||||||
import common
|
import ./common
|
||||||
|
|
||||||
type
|
type
|
||||||
VectorKind = enum
|
VectorKind = enum
|
||||||
@ -770,7 +771,7 @@ when defined(windows):
|
|||||||
# Continue only if `retFuture` is not cancelled.
|
# Continue only if `retFuture` is not cancelled.
|
||||||
if not(retFuture.finished()):
|
if not(retFuture.finished()):
|
||||||
let
|
let
|
||||||
pipeSuffix = $cast[cstring](unsafeAddr address.address_un[0])
|
pipeSuffix = $cast[cstring](baseAddr address.address_un)
|
||||||
pipeAsciiName = PipeHeaderName & pipeSuffix[1 .. ^1]
|
pipeAsciiName = PipeHeaderName & pipeSuffix[1 .. ^1]
|
||||||
pipeName = toWideString(pipeAsciiName).valueOr:
|
pipeName = toWideString(pipeAsciiName).valueOr:
|
||||||
retFuture.fail(getTransportOsError(error))
|
retFuture.fail(getTransportOsError(error))
|
||||||
@ -806,7 +807,7 @@ when defined(windows):
|
|||||||
|
|
||||||
proc createAcceptPipe(server: StreamServer): Result[AsyncFD, OSErrorCode] =
|
proc createAcceptPipe(server: StreamServer): Result[AsyncFD, OSErrorCode] =
|
||||||
let
|
let
|
||||||
pipeSuffix = $cast[cstring](addr server.local.address_un)
|
pipeSuffix = $cast[cstring](baseAddr server.local.address_un)
|
||||||
pipeName = ? toWideString(PipeHeaderName & pipeSuffix)
|
pipeName = ? toWideString(PipeHeaderName & pipeSuffix)
|
||||||
openMode =
|
openMode =
|
||||||
if FirstPipe notin server.flags:
|
if FirstPipe notin server.flags:
|
||||||
@ -878,7 +879,7 @@ when defined(windows):
|
|||||||
if server.status notin {ServerStatus.Stopped, ServerStatus.Closed}:
|
if server.status notin {ServerStatus.Stopped, ServerStatus.Closed}:
|
||||||
server.apending = true
|
server.apending = true
|
||||||
let
|
let
|
||||||
pipeSuffix = $cast[cstring](addr server.local.address_un)
|
pipeSuffix = $cast[cstring](baseAddr server.local.address_un)
|
||||||
pipeAsciiName = PipeHeaderName & pipeSuffix
|
pipeAsciiName = PipeHeaderName & pipeSuffix
|
||||||
pipeName = toWideString(pipeAsciiName).valueOr:
|
pipeName = toWideString(pipeAsciiName).valueOr:
|
||||||
raiseOsDefect(error, "acceptPipeLoop(): Unable to create name " &
|
raiseOsDefect(error, "acceptPipeLoop(): Unable to create name " &
|
||||||
@ -2011,7 +2012,7 @@ proc createStreamServer*(host: TransportAddress,
|
|||||||
elif host.family in {AddressFamily.Unix}:
|
elif host.family in {AddressFamily.Unix}:
|
||||||
# We do not care about result here, because if file cannot be removed,
|
# We do not care about result here, because if file cannot be removed,
|
||||||
# `bindSocket` will return EADDRINUSE.
|
# `bindSocket` will return EADDRINUSE.
|
||||||
discard osdefs.unlink(cast[cstring](unsafeAddr host.address_un[0]))
|
discard osdefs.unlink(cast[cstring](baseAddr host.address_un))
|
||||||
|
|
||||||
host.toSAddr(saddr, slen)
|
host.toSAddr(saddr, slen)
|
||||||
if osdefs.bindSocket(SocketHandle(serverSocket),
|
if osdefs.bindSocket(SocketHandle(serverSocket),
|
||||||
@ -2240,12 +2241,11 @@ proc write*(transp: StreamTransport, msg: sink string,
|
|||||||
var retFuture = newFuture[int]("stream.transport.write(string)")
|
var retFuture = newFuture[int]("stream.transport.write(string)")
|
||||||
transp.checkClosed(retFuture)
|
transp.checkClosed(retFuture)
|
||||||
transp.checkWriteEof(retFuture)
|
transp.checkWriteEof(retFuture)
|
||||||
|
|
||||||
let
|
let
|
||||||
nbytes = if msglen <= 0: len(msg) else: msglen
|
nbytes = if msglen <= 0: len(msg) else: msglen
|
||||||
|
|
||||||
var
|
var
|
||||||
pbytes = cast[ptr byte](unsafeAddr msg[0])
|
pbytes = cast[ptr byte](baseAddr msg)
|
||||||
rbytes = nbytes
|
rbytes = nbytes
|
||||||
|
|
||||||
fastWrite(transp, pbytes, rbytes, nbytes)
|
fastWrite(transp, pbytes, rbytes, nbytes)
|
||||||
@ -2253,7 +2253,7 @@ proc write*(transp: StreamTransport, msg: sink string,
|
|||||||
let
|
let
|
||||||
written = nbytes - rbytes # In case fastWrite wrote some
|
written = nbytes - rbytes # In case fastWrite wrote some
|
||||||
|
|
||||||
var localCopy = msg
|
var localCopy = move(msg)
|
||||||
retFuture.addCallback(proc(_: pointer) = reset(localCopy))
|
retFuture.addCallback(proc(_: pointer) = reset(localCopy))
|
||||||
|
|
||||||
pbytes = cast[ptr byte](addr localCopy[written])
|
pbytes = cast[ptr byte](addr localCopy[written])
|
||||||
@ -2278,7 +2278,7 @@ proc write*[T](transp: StreamTransport, msg: sink seq[T],
|
|||||||
nbytes = if msglen <= 0: (len(msg) * sizeof(T)) else: (msglen * sizeof(T))
|
nbytes = if msglen <= 0: (len(msg) * sizeof(T)) else: (msglen * sizeof(T))
|
||||||
|
|
||||||
var
|
var
|
||||||
pbytes = cast[ptr byte](unsafeAddr msg[0])
|
pbytes = cast[ptr byte](baseAddr msg)
|
||||||
rbytes = nbytes
|
rbytes = nbytes
|
||||||
|
|
||||||
fastWrite(transp, pbytes, rbytes, nbytes)
|
fastWrite(transp, pbytes, rbytes, nbytes)
|
||||||
@ -2286,7 +2286,7 @@ proc write*[T](transp: StreamTransport, msg: sink seq[T],
|
|||||||
let
|
let
|
||||||
written = nbytes - rbytes # In case fastWrite wrote some
|
written = nbytes - rbytes # In case fastWrite wrote some
|
||||||
|
|
||||||
var localCopy = msg
|
var localCopy = move(msg)
|
||||||
retFuture.addCallback(proc(_: pointer) = reset(localCopy))
|
retFuture.addCallback(proc(_: pointer) = reset(localCopy))
|
||||||
|
|
||||||
pbytes = cast[ptr byte](addr localCopy[written])
|
pbytes = cast[ptr byte](addr localCopy[written])
|
||||||
|
Loading…
x
Reference in New Issue
Block a user