posix: fast path for write (#244)

When `write` is called on a `StreamTransport`, the current sequence of
operations is:

* copy data to queue
* register for "write" event notification
* return unfinished future to `write` caller
* wait for "write" notification (in `poll`)
* perform one `send`
* wait for notification again if there's more data to write
* complete the future

In this PR, we introduce a fast path for writing:

* If the queue is empty, try to send as much data as possible
  * If all data is sent, return completed future without `poll` round
  * If there's more data to send than can be sent in one go, add the
rest to queue
* If the queue is not empty, enqueue as above
* When notified that write is possible, keep writing until OS buffer is
full before waiting for event again

The fast path provides significant performance benefits when there are
many small writes, such as when sending gossip to many peers, by
avoiding the poll loop and data copy on each send.

Also fixes an issue where the socket would not be removed from the
writer set if there were pending writes on close.
This commit is contained in:
Jacek Sieka 2021-12-08 11:35:27 +01:00 committed by GitHub
parent 7da1f5d4d2
commit c25fa1f6cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 246 additions and 243 deletions

View File

@ -1,5 +1,5 @@
packageName = "chronos"
version = "3.0.9"
version = "3.0.10"
author = "Status Research & Development GmbH"
description = "Chronos"
license = "Apache License 2.0 or MIT"

View File

@ -29,7 +29,8 @@ when defined(nimdoc):
##
## ``count`` is the number of bytes to copy between the file descriptors.
## On exit ``count`` will hold number of bytes actually transferred between
## file descriptors.
## file descriptors. May be >0 even in the case of error return, if some
## bytes were sent before the error occurred.
##
## If the transfer was successful, the number of bytes written to ``outfd``
## is stored in ``count``, and ``0`` returned. Note that a successful call
@ -45,10 +46,13 @@ when defined(linux) or defined(android):
proc sendfile*(outfd, infd: int, offset: int, count: var int): int =
var o = offset
result = osSendFile(cint(outfd), cint(infd), addr o, count)
if result >= 0:
count = result
result = 0
let res = osSendFile(cint(outfd), cint(infd), addr o, count)
if res >= 0:
count = res
0
else:
count = 0
-1
elif defined(freebsd) or defined(openbsd) or defined(netbsd) or
defined(dragonflybsd):
@ -69,18 +73,17 @@ elif defined(freebsd) or defined(openbsd) or defined(netbsd) or
proc sendfile*(outfd, infd: int, offset: int, count: var int): int =
var o = 0'u
result = osSendFile(cint(infd), cint(outfd), uint(offset), uint(count), nil,
let res = osSendFile(cint(infd), cint(outfd), uint(offset), uint(count), nil,
addr o, 0)
if result >= 0:
if res >= 0:
count = int(o)
result = 0
0
else:
let err = osLastError()
if int(err) == EAGAIN:
count = int(o)
result = 0
else:
result = -1
count =
if int(err) == EAGAIN: int(o)
else: 0
-1
elif defined(macosx):
import posix, os
@ -100,14 +103,13 @@ elif defined(macosx):
proc sendfile*(outfd, infd: int, offset: int, count: var int): int =
var o = count
result = osSendFile(cint(infd), cint(outfd), offset, addr o, nil, 0)
if result >= 0:
let res = osSendFile(cint(infd), cint(outfd), offset, addr o, nil, 0)
if res >= 0:
count = int(o)
result = 0
0
else:
let err = osLastError()
if int(err) == EAGAIN:
count = int(o)
result = 0
else:
result = -1
count =
if int(err) == EAGAIN: int(o)
else: 0
-1

View File

@ -191,11 +191,11 @@ template shiftBuffer(t, c: untyped) =
else:
(t).offset = 0
template shiftVectorBuffer(v, o: untyped) =
template shiftVectorBuffer(v: var StreamVector, o: untyped) =
(v).buf = cast[pointer](cast[uint]((v).buf) + uint(o))
(v).buflen -= int(o)
template shiftVectorFile(v, o: untyped) =
template shiftVectorFile(v: var StreamVector, o: untyped) =
(v).buf = cast[pointer](cast[uint]((v).buf) - cast[uint](o))
(v).offset += cast[uint]((o))
@ -1239,8 +1239,17 @@ else:
result = (err == OSErrorCode(ECONNRESET)) or
(err == OSErrorCode(EPIPE))
proc removeWriter(transp: StreamTransport) =
try:
transp.fd.removeWriter()
# For debugging, record that we're no longer getting write notifications
transp.state.incl WritePaused
except IOSelectorsException as exc:
raiseAsDefect exc, "removeWriter"
except ValueError as exc:
raiseAsDefect exc, "removeWriter"
proc writeStreamLoop(udata: pointer) =
# TODO fix Defect raises - they "shouldn't" happen
var cdata = cast[ptr CompletionData](udata)
var transp = cast[StreamTransport](cdata.udata)
let fd = SocketHandle(cdata.fd)
@ -1251,171 +1260,91 @@ else:
return
if WriteClosed in transp.state:
transp.state.incl({WritePaused})
let error = getTransportUseClosedError()
failPendingWriteQueue(transp.queue, error)
else:
if len(transp.queue) > 0:
var vector = transp.queue.popFirst()
while true:
if transp.kind == TransportKind.Socket:
if vector.kind == VectorKind.DataBuffer:
let res = posix.send(fd, vector.buf, vector.buflen, MSG_NOSIGNAL)
if res >= 0:
if vector.buflen - res == 0:
if not(vector.writer.finished()):
vector.writer.complete(vector.size)
else:
vector.shiftVectorBuffer(res)
transp.queue.addFirst(vector)
else:
let err = osLastError()
if int(err) == EINTR:
continue
else:
try:
transp.fd.removeWriter()
except IOSelectorsException as exc:
raiseAsDefect exc, "removeWriter"
except ValueError as exc:
raiseAsDefect exc, "removeWriter"
if transp.queue.len > 0:
transp.removeWriter()
if isConnResetError(err):
# Soft error happens which indicates that remote peer got
# disconnected, complete all pending writes in queue with 0.
transp.state.incl({WriteEof, WritePaused})
if not(vector.writer.finished()):
vector.writer.complete(0)
completePendingWriteQueue(transp.queue, 0)
else:
transp.state.incl({WriteError, WritePaused})
let error = getTransportOsError(err)
if not(vector.writer.finished()):
vector.writer.fail(error)
failPendingWriteQueue(transp.queue, error)
else:
var nbytes = cast[int](vector.buf)
let res = sendfile(int(fd), cast[int](vector.buflen),
int(vector.offset),
nbytes)
if res >= 0:
if cast[int](vector.buf) - nbytes == 0:
vector.size += nbytes
if not(vector.writer.finished()):
vector.writer.complete(vector.size)
else:
vector.size += nbytes
vector.shiftVectorFile(nbytes)
transp.queue.addFirst(vector)
else:
let err = osLastError()
if int(err) == EINTR:
continue
else:
try:
transp.fd.removeWriter()
except IOSelectorsException as exc:
raiseAsDefect exc, "removeWriter"
except ValueError as exc:
raiseAsDefect exc, "removeWriter"
let error = getTransportUseClosedError()
failPendingWriteQueue(transp.queue, error)
return
if isConnResetError(err):
# Soft error happens which indicates that remote peer got
# disconnected, complete all pending writes in queue with 0.
transp.state.incl({WriteEof, WritePaused})
if not(vector.writer.finished()):
vector.writer.complete(0)
completePendingWriteQueue(transp.queue, 0)
else:
transp.state.incl({WriteError, WritePaused})
let error = getTransportOsError(err)
if not(vector.writer.finished()):
vector.writer.fail(error)
failPendingWriteQueue(transp.queue, error)
break
# We exit this loop in two ways:
# * The queue is empty: we call removeWriter to disable further callbacks
# * EWOULDBLOCK is returned and we need to wait for a new notification
elif transp.kind == TransportKind.Pipe:
if vector.kind == VectorKind.DataBuffer:
let res = posix.write(cint(fd), vector.buf, vector.buflen)
if res >= 0:
if vector.buflen - res == 0:
if not(vector.writer.finished()):
vector.writer.complete(vector.size)
else:
vector.shiftVectorBuffer(res)
transp.queue.addFirst(vector)
else:
let err = osLastError()
if int(err) == EINTR:
continue
else:
try:
transp.fd.removeWriter()
except IOSelectorsException as exc:
raiseAsDefect exc, "removeWriter"
except ValueError as exc:
raiseAsDefect exc, "removeWriter"
while len(transp.queue) > 0:
template handleError() =
let err = osLastError()
if isConnResetError(err):
# Soft error happens which indicates that remote peer got
# disconnected, complete all pending writes in queue with 0.
transp.state.incl({WriteEof, WritePaused})
if not(vector.writer.finished()):
vector.writer.complete(0)
completePendingWriteQueue(transp.queue, 0)
else:
transp.state.incl({WriteError, WritePaused})
let error = getTransportOsError(err)
if not(vector.writer.finished()):
vector.writer.fail(error)
failPendingWriteQueue(transp.queue, error)
else:
var nbytes = cast[int](vector.buf)
let res = sendfile(int(fd), cast[int](vector.buflen),
int(vector.offset),
nbytes)
if res >= 0:
if cast[int](vector.buf) - nbytes == 0:
vector.size += nbytes
if not(vector.writer.finished()):
vector.writer.complete(vector.size)
else:
vector.size += nbytes
vector.shiftVectorFile(nbytes)
transp.queue.addFirst(vector)
else:
let err = osLastError()
if int(err) == EINTR:
continue
else:
try:
transp.fd.removeWriter()
except IOSelectorsException as exc:
raiseAsDefect exc, "removeWriter"
except ValueError as exc:
raiseAsDefect exc, "removeWriter"
if isConnResetError(err):
# Soft error happens which indicates that remote peer got
# disconnected, complete all pending writes in queue with 0.
transp.state.incl({WriteEof, WritePaused})
if not(vector.writer.finished()):
vector.writer.complete(0)
completePendingWriteQueue(transp.queue, 0)
else:
transp.state.incl({WriteError, WritePaused})
let error = getTransportOsError(err)
if not(vector.writer.finished()):
vector.writer.fail(error)
failPendingWriteQueue(transp.queue, error)
break
else:
transp.state.incl(WritePaused)
try:
transp.fd.removeWriter()
except IOSelectorsException as exc:
raiseAsDefect exc, "removeWriter"
except ValueError as exc:
raiseAsDefect exc, "removeWriter"
if cint(err) == EINTR:
# Signal happened while writing - try again with all data
transp.queue.addFirst(vector)
continue
if cint(err) in [EWOULDBLOCK, EAGAIN]:
# Socket buffer is full - wait until next write notification - in
# particular, ensure removeWriter is not called
transp.queue.addFirst(vector)
return
# The errors below will clear the write queue, meaning we'll exit the
# loop
if isConnResetError(err):
# Soft error happens which indicates that remote peer got
# disconnected, complete all pending writes in queue with 0.
transp.state.incl({WriteEof})
if not(vector.writer.finished()):
vector.writer.complete(0)
completePendingWriteQueue(transp.queue, 0)
else:
transp.state.incl({WriteError})
let error = getTransportOsError(err)
if not(vector.writer.finished()):
vector.writer.fail(error)
failPendingWriteQueue(transp.queue, error)
var vector = transp.queue.popFirst()
case vector.kind
of VectorKind.DataBuffer:
let res =
case transp.kind
of TransportKind.Socket:
posix.send(fd, vector.buf, vector.buflen, MSG_NOSIGNAL)
of TransportKind.Pipe:
posix.write(cint(fd), vector.buf, vector.buflen)
else: raiseAssert "Unsupported transport kind: " & $transp.kind
if res >= 0:
if vector.buflen == res:
if not(vector.writer.finished()):
vector.writer.complete(vector.size)
else:
vector.shiftVectorBuffer(res)
transp.queue.addFirst(vector) # Try again with rest of data
else:
handleError()
of VectorKind.DataFile:
var nbytes = cast[int](vector.buf)
let res = sendfile(int(fd), cast[int](vector.buflen),
int(vector.offset), nbytes)
# In case of some errors on some systems, some bytes may have been
# written (see sendfile.nim)
vector.size += nbytes
if res >= 0:
if cast[int](vector.buf) == nbytes:
if not(vector.writer.finished()):
vector.writer.complete(vector.size)
else:
vector.shiftVectorFile(nbytes)
transp.queue.addFirst(vector)
else:
vector.shiftVectorFile(nbytes)
handleError()
# Nothing left in the queue - no need for further write notifications
transp.removeWriter()
proc readStreamLoop(udata: pointer) =
# TODO fix Defect raises - they "shouldn't" happen
@ -1700,11 +1629,17 @@ else:
raiseAsDefect exc, "addReader"
proc resumeWrite(transp: StreamTransport) {.inline.} =
if WritePaused in transp.state:
transp.state.excl(WritePaused)
# TODO reset flag on exception??
if transp.queue.len() == 1:
# writeStreamLoop keeps writing until queue is empty - we should not call
# resumeWrite under any other condition than when the items are
# added to a queue - if the flag is not set here, it means that the socket
# was not removed from write notifications at the right time, and this
# would mean an imbalance in registration and deregistration
doAssert WritePaused in transp.state
try:
addWriter(transp.fd, writeStreamLoop, cast[pointer](transp))
transp.state.excl WritePaused
except IOSelectorsException as exc:
raiseAsDefect exc, "addWriter"
except ValueError as exc:
@ -2101,6 +2036,40 @@ proc getUserData*[T](server: StreamServer): T {.inline.} =
## Obtain user data stored in ``server`` object.
result = cast[T](server.udata)
template fastWrite(fd: auto, pbytes: var ptr byte, rbytes: var int, nbytes: int) =
# On windows, the write could be initiated here if there is no other write
# ongoing, but the queue is still needed due to the mechanics of iocp
when not defined(windows):
if transp.queue.len == 0:
while rbytes > 0:
let res = posix.send(SocketHandle(fd), pbytes, rbytes, MSG_NOSIGNAL)
if res > 0:
pbytes = cast[ptr byte](cast[uint](pbytes) + cast[uint](res))
rbytes -= res
if rbytes == 0:
retFuture.complete(nbytes)
return retFuture
# Not all bytes written - keep going
else:
let err = osLastError()
if cint(err) in [EAGAIN, EWOULDBLOCK]:
break # No bytes written, add to queue
if cint(err) == EINTR:
continue
if isConnResetError(err):
transp.state.incl({WriteEof})
retFuture.complete(0)
return retFuture
else:
transp.state.incl({WriteError})
let error = getTransportOsError(err)
retFuture.fail(error)
return retFuture
proc write*(transp: StreamTransport, pbytes: pointer,
nbytes: int): Future[int] =
## Write data from buffer ``pbytes`` with size ``nbytes`` using transport
@ -2108,8 +2077,15 @@ proc write*(transp: StreamTransport, pbytes: pointer,
var retFuture = newFuture[int]("stream.transport.write(pointer)")
transp.checkClosed(retFuture)
transp.checkWriteEof(retFuture)
var
pbytes = cast[ptr byte](pbytes)
rbytes = nbytes # Remaining bytes
fastWrite(transp.fd, pbytes, rbytes, nbytes)
var vector = StreamVector(kind: DataBuffer, writer: retFuture,
buf: pbytes, buflen: nbytes, size: nbytes)
buf: pbytes, buflen: rbytes, size: nbytes)
transp.queue.addLast(vector)
transp.resumeWrite()
return retFuture
@ -2119,15 +2095,28 @@ proc write*(transp: StreamTransport, msg: string, msglen = -1): Future[int] =
var retFuture = newFutureStr[int]("stream.transport.write(string)")
transp.checkClosed(retFuture)
transp.checkWriteEof(retFuture)
if not(isLiteral(msg)):
let
nbytes = if msglen <= 0: len(msg) else: msglen
var
pbytes = cast[ptr byte](unsafeAddr msg[0])
rbytes = nbytes
fastWrite(transp.fd, pbytes, rbytes, nbytes)
let
written = nbytes - rbytes # In case fastWrite wrote some
pbytes = if not(isLiteral(msg)):
shallowCopy(retFuture.gcholder, msg)
cast[ptr byte](addr retFuture.gcholder[written])
else:
retFuture.gcholder = msg
let length = if msglen <= 0: len(msg) else: msglen
var vector = StreamVector(kind: DataBuffer,
writer: cast[Future[int]](retFuture),
buf: addr retFuture.gcholder[0], buflen: length,
size: length)
retFuture.gcholder = msg[written..<nbytes]
cast[ptr byte](addr retFuture.gcholder[0])
var vector = StreamVector(kind: DataBuffer, writer: retFuture,
buf: pbytes, buflen: rbytes, size: nbytes)
transp.queue.addLast(vector)
transp.resumeWrite()
return retFuture
@ -2137,15 +2126,28 @@ proc write*[T](transp: StreamTransport, msg: seq[T], msglen = -1): Future[int] =
var retFuture = newFutureSeq[int, T]("stream.transport.write(seq)")
transp.checkClosed(retFuture)
transp.checkWriteEof(retFuture)
if not(isLiteral(msg)):
let
nbytes = if msglen <= 0: (len(msg) * sizeof(T)) else: (msglen * sizeof(T))
var
pbytes = cast[ptr byte](unsafeAddr msg[0])
rbytes = nbytes
fastWrite(transp.fd, pbytes, rbytes, nbytes)
let
written = nbytes - rbytes # In case fastWrite wrote some
pbytes = if not(isLiteral(msg)):
shallowCopy(retFuture.gcholder, msg)
cast[ptr byte](addr retFuture.gcholder[written])
else:
retFuture.gcholder = msg
let length = if msglen <= 0: (len(msg) * sizeof(T)) else: (msglen * sizeof(T))
var vector = StreamVector(kind: DataBuffer,
writer: cast[Future[int]](retFuture),
buf: addr retFuture.gcholder[0],
buflen: length, size: length)
retFuture.gcholder = msg[written..<nbytes]
cast[ptr byte](addr retFuture.gcholder[0])
var vector = StreamVector(kind: DataBuffer, writer: retFuture,
buf: pbytes, buflen: rbytes, size: nbytes)
transp.queue.addLast(vector)
transp.resumeWrite()
return retFuture

View File

@ -28,27 +28,6 @@ suite "Stream Transport test suite":
FilesCount = 10
TestsCount = 100
m1 = "readLine() multiple clients with messages (" & $ClientsCount &
" clients x " & $MessagesCount & " messages)"
m2 = "readExactly() multiple clients with messages (" & $ClientsCount &
" clients x " & $MessagesCount & " messages)"
m3 = "readUntil() multiple clients with messages (" & $ClientsCount &
" clients x " & $MessagesCount & " messages)"
m4 = "writeFile() multiple clients (" & $FilesCount & " files)"
m5 = "write(string)/read(int) multiple clients (" & $ClientsCount &
" clients x " & $MessagesCount & " messages)"
m6 = "write(seq[byte])/consume(int)/read(int) multiple clients (" &
$ClientsCount & " clients x " & $MessagesCount & " messages)"
m7 = "readLine() buffer overflow test"
m8 = "readUntil() buffer overflow test"
m11 = "readExactly() unexpected disconnect test"
m12 = "readUntil() unexpected disconnect test"
m13 = "readLine() unexpected disconnect empty string test"
m14 = "Closing socket while operation pending test (issue #8)"
m15 = "Connection refused test"
m16 = "readOnce() read until atEof() test"
m17 = "0.0.0.0/::0 (INADDR_ANY) test"
when defined(windows):
let addresses = [
initTAddress("127.0.0.1:33335"),
@ -1159,15 +1138,30 @@ suite "Stream Transport test suite":
proc acceptTask(server: StreamServer) {.async.} =
let transp = await server.accept()
var futs = newSeq[Future[int]](TestsCount)
var futs = newSeq[Future[int]]()
var msg = createBigMessage(1024)
for i in 0 ..< len(futs):
futs[i] = transp.write(msg)
var tries = 0
while futs.len() < TestsCount:
let fut = transp.write(msg)
# `write` has a fast path that puts the data in the OS socket buffer -
# we'll keep writing until we get EAGAIN from the OS so that we have
# data in the in-chronos queue to fail on close
if not fut.completed():
futs.add(fut)
else:
tries += 1
if tries > 65*1024:
# We've queued 64mb on the socket and it still allows writing,
# something is wrong - we'll break here which will cause the test
# to fail
break
await transp.closeWait()
await sleepAsync(100.milliseconds)
for i in 0 ..< len(futs):
# writes may complete via fast write
if futs[i].failed() and (futs[i].error of TransportUseClosedError):
inc(res)
@ -1244,29 +1238,34 @@ suite "Stream Transport test suite":
for i in 0..<len(addresses):
test prefixes[i] & "close(transport) test":
check waitFor(testCloseTransport(addresses[i])) == 1
test prefixes[i] & m8:
test prefixes[i] & "readUntil() buffer overflow test":
check waitFor(test8(addresses[i])) == 1
test prefixes[i] & m7:
test prefixes[i] & "readLine() buffer overflow test":
check waitFor(test7(addresses[i])) == 1
test prefixes[i] & m11:
test prefixes[i] & "readExactly() unexpected disconnect test":
check waitFor(test11(addresses[i])) == 1
test prefixes[i] & m12:
test prefixes[i] & "readUntil() unexpected disconnect test":
check waitFor(test12(addresses[i])) == 1
test prefixes[i] & m13:
test prefixes[i] & "readLine() unexpected disconnect empty string test":
check waitFor(test13(addresses[i])) == 1
test prefixes[i] & m14:
test prefixes[i] & "Closing socket while operation pending test (issue #8)":
check waitFor(test14(addresses[i])) == 1
test prefixes[i] & m1:
test prefixes[i] & "readLine() multiple clients with messages (" &
$ClientsCount & " clients x " & $MessagesCount & " messages)":
check waitFor(test1(addresses[i])) == ClientsCount * MessagesCount
test prefixes[i] & m2:
test prefixes[i] & "readExactly() multiple clients with messages (" &
$ClientsCount & " clients x " & $MessagesCount & " messages)":
check waitFor(test2(addresses[i])) == ClientsCount * MessagesCount
test prefixes[i] & m3:
test prefixes[i] & "readUntil() multiple clients with messages (" &
$ClientsCount & " clients x " & $MessagesCount & " messages)":
check waitFor(test3(addresses[i])) == ClientsCount * MessagesCount
test prefixes[i] & m5:
test prefixes[i] & "write(string)/read(int) multiple clients (" &
$ClientsCount & " clients x " & $MessagesCount & " messages)":
check waitFor(testWR(addresses[i])) == ClientsCount * MessagesCount
test prefixes[i] & m6:
test prefixes[i] & "write(seq[byte])/consume(int)/read(int) multiple clients (" &
$ClientsCount & " clients x " & $MessagesCount & " messages)":
check waitFor(testWCR(addresses[i])) == ClientsCount * MessagesCount
test prefixes[i] & m4:
test prefixes[i] & "writeFile() multiple clients (" & $FilesCount & " files)":
when defined(windows):
if addresses[i].family == AddressFamily.IPv4:
check waitFor(testSendFile(addresses[i])) == FilesCount
@ -1274,21 +1273,21 @@ suite "Stream Transport test suite":
skip()
else:
check waitFor(testSendFile(addresses[i])) == FilesCount
test prefixes[i] & m15:
test prefixes[i] & "Connection refused test":
var address: TransportAddress
if addresses[i].family == AddressFamily.Unix:
address = initTAddress("/tmp/notexistingtestpipe")
else:
address = initTAddress("127.0.0.1:43335")
check waitFor(testConnectionRefused(address)) == true
test prefixes[i] & m16:
test prefixes[i] & "readOnce() read until atEof() test":
check waitFor(test16(addresses[i])) == 1
test prefixes[i] & "Connection reset test on send() only":
when defined(macosx):
skip()
else:
check waitFor(testWriteConnReset(addresses[i])) == 1
test prefixes[i] & m17:
test prefixes[i] & "0.0.0.0/::0 (INADDR_ANY) test":
if addresses[i].family == AddressFamily.IPv4:
check waitFor(testAnyAddress()) == true
else: