From 8e49df14007e27370cd1ce77edb2843783b45e6d Mon Sep 17 00:00:00 2001 From: Eugene Kabanov Date: Sun, 7 Apr 2024 07:03:12 +0300 Subject: [PATCH] Ensure that all buffers used inside HTTP client will follow original buffer size. (#530) Ensure that buffer size cannot be lower than default size. --- chronos/apps/http/httpclient.nim | 18 ++++++++++++------ chronos/streams/asyncstream.nim | 6 ++++-- chronos/transports/stream.nim | 12 ++++++++---- 3 files changed, 24 insertions(+), 12 deletions(-) diff --git a/chronos/apps/http/httpclient.nim b/chronos/apps/http/httpclient.nim index 33a6b7f..414b1d3 100644 --- a/chronos/apps/http/httpclient.nim +++ b/chronos/apps/http/httpclient.nim @@ -567,7 +567,8 @@ proc new( tls = try: newTLSClientAsyncStream(treader, twriter, ha.hostname, - flags = session.flags.getTLSFlags()) + flags = session.flags.getTLSFlags(), + bufferSize = session.connectionBufferSize) except TLSStreamInitError as exc: return err(exc.msg) @@ -1327,13 +1328,18 @@ proc getBodyReader*(response: HttpClientResponseRef): HttpBodyReader {. let reader = case response.bodyFlag of HttpClientBodyFlag.Sized: - let bstream = newBoundedStreamReader(response.connection.reader, - response.contentLength) - newHttpBodyReader(bstream) + newHttpBodyReader( + newBoundedStreamReader( + response.connection.reader, response.contentLength, + bufferSize = response.session.connectionBufferSize)) of HttpClientBodyFlag.Chunked: - newHttpBodyReader(newChunkedStreamReader(response.connection.reader)) + newHttpBodyReader( + newChunkedStreamReader( + response.connection.reader, + bufferSize = response.session.connectionBufferSize)) of HttpClientBodyFlag.Custom: - newHttpBodyReader(newAsyncStreamReader(response.connection.reader)) + newHttpBodyReader( + newAsyncStreamReader(response.connection.reader)) response.connection.state = HttpClientConnectionState.ResponseBodyReceiving response.reader = reader response.reader diff --git a/chronos/streams/asyncstream.nim b/chronos/streams/asyncstream.nim index bb878db..301b086 100644 --- a/chronos/streams/asyncstream.nim +++ b/chronos/streams/asyncstream.nim @@ -929,7 +929,8 @@ proc init*(child, rsource: AsyncStreamReader, loop: StreamReaderLoop, child.readerLoop = loop child.rsource = rsource child.tsource = rsource.tsource - child.buffer = AsyncBufferRef.new(bufferSize) + let size = max(AsyncStreamDefaultBufferSize, bufferSize) + child.buffer = AsyncBufferRef.new(size) trackCounter(AsyncStreamReaderTrackerName) child.startReader() @@ -941,7 +942,8 @@ proc init*[T](child, rsource: AsyncStreamReader, loop: StreamReaderLoop, child.readerLoop = loop child.rsource = rsource child.tsource = rsource.tsource - child.buffer = AsyncBufferRef.new(bufferSize) + let size = max(AsyncStreamDefaultBufferSize, bufferSize) + child.buffer = AsyncBufferRef.new(size) if not isNil(udata): GC_ref(udata) child.udata = cast[pointer](udata) diff --git a/chronos/transports/stream.nim b/chronos/transports/stream.nim index 7b5925b..0f006b8 100644 --- a/chronos/transports/stream.nim +++ b/chronos/transports/stream.nim @@ -585,7 +585,8 @@ when defined(windows): udata: cast[pointer](transp)) transp.wovl.data = CompletionData(cb: writeStreamLoop, udata: cast[pointer](transp)) - transp.buffer = BipBuffer.init(bufsize) + let size = max(bufsize, DefaultStreamBufferSize) + transp.buffer = BipBuffer.init(size) transp.state = {ReadPaused, WritePaused} transp.queue = initDeque[StreamVector]() transp.future = Future[void].Raising([]).init( @@ -606,7 +607,8 @@ when defined(windows): udata: cast[pointer](transp)) transp.wovl.data = CompletionData(cb: writeStreamLoop, udata: cast[pointer](transp)) - transp.buffer = BipBuffer.init(bufsize) + let size = max(bufsize, DefaultStreamBufferSize) + transp.buffer = BipBuffer.init(size) transp.flags = flags transp.state = {ReadPaused, WritePaused} transp.queue = initDeque[StreamVector]() @@ -1452,7 +1454,8 @@ else: transp = StreamTransport(kind: TransportKind.Socket) transp.fd = sock - transp.buffer = BipBuffer.init(bufsize) + let size = max(bufsize, DefaultStreamBufferSize) + transp.buffer = BipBuffer.init(size) transp.state = {ReadPaused, WritePaused} transp.queue = initDeque[StreamVector]() transp.future = Future[void].Raising([]).init( @@ -1469,7 +1472,8 @@ else: transp = StreamTransport(kind: TransportKind.Pipe) transp.fd = fd - transp.buffer = BipBuffer.init(bufsize) + let size = max(bufsize, DefaultStreamBufferSize) + transp.buffer = BipBuffer.init(size) transp.state = {ReadPaused, WritePaused} transp.queue = initDeque[StreamVector]() transp.future = Future[void].Raising([]).init(