Tanguy 2022-11-03 10:52:57 +01:00
parent a3e9d1ed80
commit aaf13aeddf
No known key found for this signature in database
GPG Key ID: 7DD8EC6B6CE6C45E
19 changed files with 28 additions and 26 deletions

View File

@ -5,6 +5,7 @@ if dirExists("nimbledeps/pkgs"):
switch("warning", "CaseTransition:off")
switch("warning", "ObservableStores:off")
switch("warning", "LockLevel:off")
switch("warningAsError", "UseBase:on")
--define:chronosStrictException
--styleCheck:usages
if (NimMajor, NimMinor) < (1, 6):
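Side note on the new switch: Nim's UseBase warning fires when a method that does not override anything is declared without the {.base.} pragma; promoting it to an error makes such omissions fail the build. A minimal sketch of what the flag now rejects if the pragma is dropped (type and method names are made up):

type Animal = ref object of RootObj

# Omitting {.base.} on this first, non-overriding declaration triggers the
# UseBase warning, which the switch above now turns into a compile error.
method speak(a: Animal): string {.base.} =
  "generic animal sound"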

View File

@ -61,7 +61,7 @@ proc readMsg*(conn: Connection): Future[Msg] {.async, gcsafe.} =
proc writeMsg*(conn: Connection,
id: uint64,
msgType: MessageType,
- data: seq[byte] = @[]): Future[void] =
+ data: sink seq[byte] = @[]): Future[void] =
var
left = data.len
offset = 0
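The change repeated throughout this commit is marking seq[byte] payload parameters as sink. A rough illustration of the intent, with made-up names (hedged: the move optimisation applies under --mm:arc/--mm:orc; with the older refc GC the annotation is accepted but effectively a no-op). When the argument is at its last use, the caller's buffer is moved into the sink parameter instead of being copied:

proc enqueue(queue: var seq[seq[byte]], data: sink seq[byte]) =
  # the proc owns `data`; appending transfers the buffer instead of copying it
  queue.add data

var pending: seq[seq[byte]]
var payload = @[byte 1, 2, 3]
enqueue(pending, payload)        # last use of `payload`: moved, not copied
enqueue(pending, @[byte 4, 5])   # temporaries are moved as well
echo pending.len                 # 2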

View File

@ -197,7 +197,7 @@ method readOnce*(s: LPChannel,
await s.reset()
raise newLPStreamConnDownError(exc)
- proc prepareWrite(s: LPChannel, msg: seq[byte]): Future[void] {.async.} =
+ proc prepareWrite(s: LPChannel, msg: sink seq[byte]): Future[void] {.async.} =
# prepareWrite is the slow path of writing a message - see conditions in
# write
if s.remoteReset:
@ -254,7 +254,7 @@ proc completeWrite(
finally:
s.writes -= 1
- method write*(s: LPChannel, msg: seq[byte]): Future[void] =
+ method write*(s: LPChannel, msg: sink seq[byte]): Future[void] =
## Write to mplex channel - there may be up to MaxWrite concurrent writes
## pending after which the peer is disconnected

View File

@ -265,7 +265,7 @@ method readOnce*(
channel.activity = true
return toRead
- proc gotDataFromRemote(channel: YamuxChannel, b: seq[byte]) {.async.} =
+ proc gotDataFromRemote(channel: YamuxChannel, b: sink seq[byte]) {.async.} =
channel.recvWindow -= b.len
channel.recvQueue = channel.recvQueue.concat(b)
channel.receivedData.fire()
@ -333,7 +333,7 @@ proc trySend(channel: YamuxChannel) {.async.} =
fut.complete()
channel.activity = true
- method write*(channel: YamuxChannel, msg: seq[byte]): Future[void] =
+ method write*(channel: YamuxChannel, msg: sink seq[byte]): Future[void] =
result = newFuture[void]("Yamux Send")
if channel.remoteReset:
result.fail(newLPStreamResetError())

View File

@ -30,7 +30,7 @@ method readOnce*(
self.activity = true
return await self.conn.readOnce(pbytes, nbytes)
- method write*(self: RelayConnection, msg: seq[byte]): Future[void] {.async.} =
+ method write*(self: RelayConnection, msg: sink seq[byte]): Future[void] {.async.} =
self.dataSent.inc(msg.len)
if self.limitData != 0 and self.dataSent > self.limitData:
await self.close()

View File

@ -111,7 +111,7 @@ proc encodeMsg(peerInfo: PeerInfo, observedAddr: Opt[MultiAddress], sendSpr: boo
result.finish()
- proc decodeMsg*(buf: seq[byte]): Option[IdentifyInfo] =
+ proc decodeMsg*(buf: sink seq[byte]): Option[IdentifyInfo] =
var
iinfo: IdentifyInfo
pubkey: PublicKey

View File

@ -176,7 +176,7 @@ method init*(f: FloodSub) =
method publish*(f: FloodSub,
topic: string,
- data: seq[byte]): Future[int] {.async.} =
+ data: sink seq[byte]): Future[int] {.async.} =
# base returns always 0
discard await procCall PubSub(f).publish(topic, data)
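The `discard await procCall PubSub(f).publish(topic, data)` line above uses procCall, which invokes a specific method implementation statically instead of going through dynamic dispatch — here, the base PubSub bookkeeping before FloodSub does its own work. A hedged toy version with invented types:

type
  Base = ref object of RootObj
  Derived = ref object of Base

method publish(b: Base, data: seq[byte]): int {.base.} =
  0  # as the comment above puts it, the base always returns 0

method publish(d: Derived, data: seq[byte]): int =
  # procCall plus an explicit upcast selects the Base implementation
  discard procCall Base(d).publish(data)
  data.len

echo publish(Derived(), @[byte 1, 2, 3])  # 3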

View File

@ -477,7 +477,7 @@ method onTopicSubscription*(g: GossipSub, topic: string, subscribed: bool) =
method publish*(g: GossipSub,
topic: string,
- data: seq[byte]): Future[int] {.async.} =
+ data: sink seq[byte]): Future[int] {.async.} =
# base returns always 0
discard await procCall PubSub(g).publish(topic, data)

View File

@ -84,6 +84,7 @@ declarePublicCounter(libp2p_pubsub_received_prune, "pubsub broadcast prune", lab
type
InitializationError* = object of LPError
+ #TODO sink?
TopicHandler* {.public.} = proc(topic: string,
data: seq[byte]): Future[void] {.gcsafe, raises: [Defect].}
@ -311,7 +312,7 @@ proc getOrCreatePeer*(
return pubSubPeer
- proc handleData*(p: PubSub, topic: string, data: seq[byte]): Future[void] =
+ proc handleData*(p: PubSub, topic: string, data: sink seq[byte]): Future[void] =
# Start work on all data handlers without copying data into closure like
# happens on {.async.} transformation
p.topics.withValue(topic, handlers) do:
@ -474,7 +475,7 @@ proc subscribe*(p: PubSub,
method publish*(p: PubSub,
topic: string,
- data: seq[byte]): Future[int] {.base, async, public.} =
+ data: sink seq[byte]): Future[int] {.base, async, public.} =
## publish to a ``topic``
##
## The return value is the number of neighbours that we attempted to send the
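Several of these entry points, publish included, are {.async.}, so the payload is also captured by the environment the async macro generates; with a sink parameter the caller can hand the buffer over instead of copying it (hedged: an actual move requires --mm:arc/--mm:orc and depends on how the async transformation captures parameters). A self-contained caller-side sketch using chronos, with made-up proc names:

import chronos

proc publishLike(data: sink seq[byte]): Future[int] {.async.} =
  # stand-in for a publish-style entry point; just reports the payload size
  return data.len

proc main() {.async.} =
  var msg = newSeq[byte](1024)
  let sent = await publishLike(move(msg))  # explicit move: `msg` is left empty
  echo sent                                # 1024

waitFor main()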

View File

@ -210,7 +210,7 @@ proc connectImpl(p: PubSubPeer) {.async.} =
proc connect*(p: PubSubPeer) =
asyncSpawn connectImpl(p)
- proc sendImpl(conn: Connection, encoded: seq[byte]): Future[void] {.raises: [Defect].} =
+ proc sendImpl(conn: Connection, encoded: sink seq[byte]): Future[void] {.raises: [Defect].} =
trace "sending encoded msgs to peer", conn, encoded = shortLog(encoded)
let fut = conn.writeLp(encoded) # Avoid copying `encoded` into future
@ -237,7 +237,7 @@ template sendMetrics(msg: RPCMsg): untyped =
# metrics
libp2p_pubsub_sent_messages.inc(labelValues = [$p.peerId, t])
- proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.raises: [Defect].} =
+ proc sendEncoded*(p: PubSubPeer, msg: sink seq[byte]) {.raises: [Defect].} =
doAssert(not isNil(p), "pubsubpeer nil!")
if msg.len <= 0:

View File

@ -322,7 +322,7 @@ proc encodeRpcMsg*(msg: RPCMsg, anonymize: bool): seq[byte] =
pb.finish()
pb.buffer
- proc decodeRpcMsg*(msg: seq[byte]): ProtoResult[RPCMsg] {.inline.} =
+ proc decodeRpcMsg*(msg: sink seq[byte]): ProtoResult[RPCMsg] {.inline.} =
trace "decodeRpcMsg: decoding message", msg = msg.shortLog()
var pb = initProtoBuffer(msg)
var rpcMsg = ok(RPCMsg())

View File

@ -333,7 +333,7 @@ proc sendHSMessage(sconn: Connection, buf: openArray[byte]): Future[void] =
proc handshakeXXOutbound(
p: Noise, conn: Connection,
- p2pSecret: seq[byte]): Future[HandshakeResult] {.async.} =
+ p2pSecret: sink seq[byte]): Future[HandshakeResult] {.async.} =
const initiator = true
var
hs = HandshakeState.init()
@ -381,7 +381,7 @@ proc handshakeXXOutbound(
proc handshakeXXInbound(
p: Noise, conn: Connection,
- p2pSecret: seq[byte]): Future[HandshakeResult] {.async.} =
+ p2pSecret: sink seq[byte]): Future[HandshakeResult] {.async.} =
const initiator = false
var
@ -461,7 +461,7 @@ proc encryptFrame(
cipherFrame[2 + src.len()..<cipherFrame.len] = tag
- method write*(sconn: NoiseConnection, message: seq[byte]): Future[void] =
+ method write*(sconn: NoiseConnection, message: sink seq[byte]): Future[void] =
# Fast path: `{.async.}` would introduce a copy of `message`
const FramingSize = 2 + sizeof(ChaChaPolyTag)

View File

@ -226,7 +226,7 @@ method readMessage*(sconn: SecioConn): Future[seq[byte]] {.async.} =
trace "Message MAC verification failed", buf = buf.shortLog
raise (ref SecioError)(msg: "message failed MAC verification")
- method write*(sconn: SecioConn, message: seq[byte]) {.async.} =
+ method write*(sconn: SecioConn, message: sink seq[byte]) {.async.} =
## Write message ``message`` to secure connection ``sconn``.
if message.len == 0:
return

View File

@ -68,7 +68,7 @@ proc new*(
bufferStream.initStream()
bufferStream
- method pushData*(s: BufferStream, data: seq[byte]) {.base, async.} =
+ method pushData*(s: BufferStream, data: sink seq[byte]) {.base, async.} =
## Write bytes to internal read buffer, use this to fill up the
## buffer with data.
##

View File

@ -127,7 +127,7 @@ proc completeWrite(
if s.tracked:
libp2p_peers_traffic_write.inc(msgLen.int64, labelValues = [s.shortAgent])
- method write*(s: ChronosStream, msg: seq[byte]): Future[void] =
+ method write*(s: ChronosStream, msg: sink seq[byte]): Future[void] =
# Avoid a copy of msg being kept in the closure created by `{.async.}` as this
# drives up memory usage
if msg.len == 0:
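The comment here describes the complementary trick used on the hot write paths: the proc is deliberately not {.async.}, the underlying future is returned as-is, and the payload is never captured by a generated closure. A rough sketch of the two shapes; `rawSend` is a made-up placeholder for whatever primitive already returns a Future[void]:

import chronos

proc rawSend(msg: seq[byte]): Future[void] =
  # placeholder that completes immediately; stands in for the real transport write
  result = newFuture[void]("rawSend")
  result.complete()

proc writeFast(msg: sink seq[byte]): Future[void] =
  # no {.async.}: rawSend's future is returned directly, so `msg` does not
  # end up in an async-generated environment
  rawSend(msg)

proc writeSlow(msg: seq[byte]): Future[void] {.async.} =
  # the async macro rewrites this body into a closure iterator that keeps
  # `msg` alive in its environment until the write finishes - the copy the
  # comments above are avoiding
  await rawSend(msg)

waitFor writeFast(@[byte 1, 2, 3])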

View File

@ -283,7 +283,7 @@ proc readLp*(s: LPStream, maxSize: int): Future[seq[byte]] {.async, gcsafe, publ
await s.readExactly(addr res[0], res.len)
return res
- method write*(s: LPStream, msg: seq[byte]): Future[void] {.base, public.} =
+ method write*(s: LPStream, msg: sink seq[byte]): Future[void] {.base, public.} =
# Write `msg` to stream, waiting for the write to be finished
doAssert(false, "not implemented!")

View File

@ -83,7 +83,7 @@ method readOnce*(
method write*(
s: WsStream,
- msg: seq[byte]): Future[void] {.async.} =
+ msg: sink seq[byte]): Future[void] {.async.} =
mapExceptions(await s.session.send(msg, Opcode.Binary))
s.activity = true # reset activity flag

View File

@ -81,7 +81,7 @@ type
TestBufferStream* = ref object of BufferStream
writeHandler*: WriteHandler
- method write*(s: TestBufferStream, msg: seq[byte]): Future[void] =
+ method write*(s: TestBufferStream, msg: sink seq[byte]): Future[void] =
s.writeHandler(msg)
proc new*(T: typedesc[TestBufferStream], writeHandler: WriteHandler): T =

View File

@ -57,7 +57,7 @@ method readOnce*(s: TestSelectStream,
return "\0x3na\n".len()
- method write*(s: TestSelectStream, msg: seq[byte]) {.async, gcsafe.} = discard
+ method write*(s: TestSelectStream, msg: sink seq[byte]) {.async, gcsafe.} = discard
method close(s: TestSelectStream) {.async, gcsafe.} =
s.isClosed = true
@ -106,7 +106,7 @@ method readOnce*(s: TestLsStream,
copyMem(pbytes, addr buf[0], buf.len())
return buf.len()
- method write*(s: TestLsStream, msg: seq[byte]) {.async, gcsafe.} =
+ method write*(s: TestLsStream, msg: sink seq[byte]) {.async, gcsafe.} =
if s.step == 4:
await s.ls(msg)
@ -160,7 +160,7 @@ method readOnce*(s: TestNaStream,
return "\0x3na\n".len()
- method write*(s: TestNaStream, msg: seq[byte]) {.async, gcsafe.} =
+ method write*(s: TestNaStream, msg: sink seq[byte]) {.async, gcsafe.} =
if s.step == 4:
await s.na(string.fromBytes(msg))