Various cleanups part 1 (#632)

* raise -> raise exc
* replace stdlib random with bearssl
* object init -> new
* Remove deprecated procs
* getMandatoryField
This commit is contained in:
Tanguy 2021-10-25 10:26:32 +02:00 committed by GitHub
parent 3669b90ceb
commit 846baf3853
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
38 changed files with 365 additions and 621 deletions

View File

@ -80,7 +80,7 @@ proc withAddresses*(b: SwitchBuilder, addresses: seq[MultiAddress]): SwitchBuild
proc withMplex*(b: SwitchBuilder, inTimeout = 5.minutes, outTimeout = 5.minutes): SwitchBuilder =
proc newMuxer(conn: Connection): Muxer =
Mplex.init(
Mplex.new(
conn,
inTimeout = inTimeout,
outTimeout = outTimeout)
@ -151,7 +151,7 @@ proc build*(b: SwitchBuilder): Switch
secureManagerInstances.add(Noise.new(b.rng, seckey).Secure)
let
peerInfo = PeerInfo.init(
peerInfo = PeerInfo.new(
seckey,
b.addresses,
protoVersion = b.protoVersion,
@ -166,9 +166,9 @@ proc build*(b: SwitchBuilder): Switch
let
identify = Identify.new(peerInfo)
connManager = ConnManager.init(b.maxConnsPerPeer, b.maxConnections, b.maxIn, b.maxOut)
connManager = ConnManager.new(b.maxConnsPerPeer, b.maxConnections, b.maxIn, b.maxOut)
ms = MultistreamSelect.new()
muxedUpgrade = MuxedUpgrade.init(identify, muxers, secureManagerInstances, connManager, ms)
muxedUpgrade = MuxedUpgrade.new(identify, muxers, secureManagerInstances, connManager, ms)
let
transports = block:

View File

@ -84,7 +84,7 @@ type
proc newTooManyConnectionsError(): ref TooManyConnectionsError {.inline.} =
result = newException(TooManyConnectionsError, "Too many connections")
proc init*(C: type ConnManager,
proc new*(C: type ConnManager,
maxConnsPerPeer = MaxConnectionsPerPeer,
maxConnections = MaxConnections,
maxIn = -1,

View File

@ -174,6 +174,19 @@ proc newRng*(): ref BrHmacDrbgContext =
return nil
rng
proc shuffle*[T](
rng: ref BrHmacDrbgContext,
x: var openArray[T]) =
var randValues = newSeqUninitialized[byte](len(x) * 2)
brHmacDrbgGenerate(rng[], randValues)
for i in countdown(x.high, 1):
let
rand = randValues[i * 2].int32 or (randValues[i * 2 + 1].int32 shl 8)
y = rand mod i
swap(x[i], x[y])
proc random*(T: typedesc[PrivateKey], scheme: PKScheme,
rng: var BrHmacDrbgContext,
bits = RsaDefaultKeySize): CryptoResult[PrivateKey] =
@ -331,9 +344,6 @@ proc getPublicKey*(key: PrivateKey): CryptoResult[PublicKey] =
else:
err(SchemeError)
proc getKey*(key: PrivateKey): CryptoResult[PublicKey] {.deprecated: "use getPublicKey".} =
key.getPublicKey()
proc toRawBytes*(key: PrivateKey | PublicKey,
data: var openarray[byte]): CryptoResult[int] =
## Serialize private key ``key`` (using scheme's own serialization) and store
@ -1013,39 +1023,6 @@ proc write*(pb: var ProtoBuffer, field: int, sig: Signature) {.
inline, raises: [Defect].} =
write(pb, field, sig.getBytes())
proc initProtoField*(index: int, key: PublicKey|PrivateKey): ProtoField {.
deprecated, raises: [Defect, ResultError[CryptoError]].} =
## Initialize ProtoField with PublicKey/PrivateKey ``key``.
result = initProtoField(index, key.getBytes().tryGet())
proc initProtoField*(index: int, sig: Signature): ProtoField {.deprecated.} =
## Initialize ProtoField with Signature ``sig``.
result = initProtoField(index, sig.getBytes())
proc getValue*[T: PublicKey|PrivateKey](data: var ProtoBuffer, field: int,
value: var T): int {.deprecated.} =
## Read PublicKey/PrivateKey from ProtoBuf's message and validate it.
var buf: seq[byte]
var key: PublicKey
result = getLengthValue(data, field, buf)
if result > 0:
if not key.init(buf):
result = -1
else:
value = key
proc getValue*(data: var ProtoBuffer, field: int, value: var Signature): int {.
deprecated.} =
## Read ``Signature`` from ProtoBuf's message and validate it.
var buf: seq[byte]
var sig: Signature
result = getLengthValue(data, field, buf)
if result > 0:
if not sig.init(buf):
result = -1
else:
value = sig
proc getField*[T: PublicKey|PrivateKey](pb: ProtoBuffer, field: int,
value: var T): ProtoResult[bool] =
## Deserialize public/private key from protobuf's message ``pb`` using field

View File

@ -10,7 +10,7 @@
{.push raises: [Defect].}
## This module implementes API for `go-libp2p-daemon`.
import std/[os, osproc, strutils, tables, strtabs]
import std/[os, osproc, strutils, tables, strtabs, sequtils]
import pkg/[chronos, chronicles]
import ../varint, ../multiaddress, ../multicodec, ../cid, ../peerid
import ../wire, ../multihash, ../protobuf/minprotobuf, ../errors
@ -35,7 +35,7 @@ type
Critical, Error, Warning, Notice, Info, Debug, Trace
RequestType* {.pure.} = enum
IDENTITY = 0,
IDENTIFY = 0,
CONNECT = 1,
STREAM_OPEN = 2,
STREAM_HANDLER = 3,
@ -167,7 +167,7 @@ proc requestIdentity(): ProtoBuffer =
## https://github.com/libp2p/go-libp2p-daemon/blob/master/conn.go
## Processing function `doIdentify(req *pb.Request)`.
result = initProtoBuffer({WithVarintLength})
result.write(initProtoField(1, cast[uint](RequestType.IDENTITY)))
result.write(1, cast[uint](RequestType.IDENTIFY))
result.finish()
proc requestConnect(peerid: PeerID,
@ -177,13 +177,13 @@ proc requestConnect(peerid: PeerID,
## Processing function `doConnect(req *pb.Request)`.
result = initProtoBuffer({WithVarintLength})
var msg = initProtoBuffer()
msg.write(initProtoField(1, peerid))
msg.write(1, peerid)
for item in addresses:
msg.write(initProtoField(2, item.data.buffer))
msg.write(2, item.data.buffer)
if timeout > 0:
msg.write(initProtoField(3, hint64(timeout)))
result.write(initProtoField(1, cast[uint](RequestType.CONNECT)))
result.write(initProtoField(2, msg))
msg.write(3, hint64(timeout))
result.write(1, cast[uint](RequestType.CONNECT))
result.write(2, msg)
result.finish()
proc requestDisconnect(peerid: PeerID): ProtoBuffer =
@ -191,9 +191,9 @@ proc requestDisconnect(peerid: PeerID): ProtoBuffer =
## Processing function `doDisconnect(req *pb.Request)`.
result = initProtoBuffer({WithVarintLength})
var msg = initProtoBuffer()
msg.write(initProtoField(1, peerid))
result.write(initProtoField(1, cast[uint](RequestType.DISCONNECT)))
result.write(initProtoField(7, msg))
msg.write(1, peerid)
result.write(1, cast[uint](RequestType.DISCONNECT))
result.write(7, msg)
result.finish()
proc requestStreamOpen(peerid: PeerID,
@ -203,13 +203,13 @@ proc requestStreamOpen(peerid: PeerID,
## Processing function `doStreamOpen(req *pb.Request)`.
result = initProtoBuffer({WithVarintLength})
var msg = initProtoBuffer()
msg.write(initProtoField(1, peerid))
msg.write(1, peerid)
for item in protocols:
msg.write(initProtoField(2, item))
msg.write(2, item)
if timeout > 0:
msg.write(initProtoField(3, hint64(timeout)))
result.write(initProtoField(1, cast[uint](RequestType.STREAM_OPEN)))
result.write(initProtoField(3, msg))
msg.write(3, hint64(timeout))
result.write(1, cast[uint](RequestType.STREAM_OPEN))
result.write(3, msg)
result.finish()
proc requestStreamHandler(address: MultiAddress,
@ -218,18 +218,18 @@ proc requestStreamHandler(address: MultiAddress,
## Processing function `doStreamHandler(req *pb.Request)`.
result = initProtoBuffer({WithVarintLength})
var msg = initProtoBuffer()
msg.write(initProtoField(1, address.data.buffer))
msg.write(1, address.data.buffer)
for item in protocols:
msg.write(initProtoField(2, item))
result.write(initProtoField(1, cast[uint](RequestType.STREAM_HANDLER)))
result.write(initProtoField(4, msg))
msg.write(2, item)
result.write(1, cast[uint](RequestType.STREAM_HANDLER))
result.write(4, msg)
result.finish()
proc requestListPeers(): ProtoBuffer =
## https://github.com/libp2p/go-libp2p-daemon/blob/master/conn.go
## Processing function `doListPeers(req *pb.Request)`
result = initProtoBuffer({WithVarintLength})
result.write(initProtoField(1, cast[uint](RequestType.LIST_PEERS)))
result.write(1, cast[uint](RequestType.LIST_PEERS))
result.finish()
proc requestDHTFindPeer(peer: PeerID, timeout = 0): ProtoBuffer =
@ -238,13 +238,13 @@ proc requestDHTFindPeer(peer: PeerID, timeout = 0): ProtoBuffer =
let msgid = cast[uint](DHTRequestType.FIND_PEER)
result = initProtoBuffer({WithVarintLength})
var msg = initProtoBuffer()
msg.write(initProtoField(1, msgid))
msg.write(initProtoField(2, peer))
msg.write(1, msgid)
msg.write(2, peer)
if timeout > 0:
msg.write(initProtoField(7, hint64(timeout)))
msg.write(7, hint64(timeout))
msg.finish()
result.write(initProtoField(1, cast[uint](RequestType.DHT)))
result.write(initProtoField(5, msg))
result.write(1, cast[uint](RequestType.DHT))
result.write(5, msg)
result.finish()
proc requestDHTFindPeersConnectedToPeer(peer: PeerID,
@ -254,13 +254,13 @@ proc requestDHTFindPeersConnectedToPeer(peer: PeerID,
let msgid = cast[uint](DHTRequestType.FIND_PEERS_CONNECTED_TO_PEER)
result = initProtoBuffer({WithVarintLength})
var msg = initProtoBuffer()
msg.write(initProtoField(1, msgid))
msg.write(initProtoField(2, peer))
msg.write(1, msgid)
msg.write(2, peer)
if timeout > 0:
msg.write(initProtoField(7, hint64(timeout)))
msg.write(7, hint64(timeout))
msg.finish()
result.write(initProtoField(1, cast[uint](RequestType.DHT)))
result.write(initProtoField(5, msg))
result.write(1, cast[uint](RequestType.DHT))
result.write(5, msg)
result.finish()
proc requestDHTFindProviders(cid: Cid,
@ -270,14 +270,14 @@ proc requestDHTFindProviders(cid: Cid,
let msgid = cast[uint](DHTRequestType.FIND_PROVIDERS)
result = initProtoBuffer({WithVarintLength})
var msg = initProtoBuffer()
msg.write(initProtoField(1, msgid))
msg.write(initProtoField(3, cid.data.buffer))
msg.write(initProtoField(6, count))
msg.write(1, msgid)
msg.write(3, cid.data.buffer)
msg.write(6, count)
if timeout > 0:
msg.write(initProtoField(7, hint64(timeout)))
msg.write(7, hint64(timeout))
msg.finish()
result.write(initProtoField(1, cast[uint](RequestType.DHT)))
result.write(initProtoField(5, msg))
result.write(1, cast[uint](RequestType.DHT))
result.write(5, msg)
result.finish()
proc requestDHTGetClosestPeers(key: string, timeout = 0): ProtoBuffer =
@ -286,13 +286,13 @@ proc requestDHTGetClosestPeers(key: string, timeout = 0): ProtoBuffer =
let msgid = cast[uint](DHTRequestType.GET_CLOSEST_PEERS)
result = initProtoBuffer({WithVarintLength})
var msg = initProtoBuffer()
msg.write(initProtoField(1, msgid))
msg.write(initProtoField(4, key))
msg.write(1, msgid)
msg.write(4, key)
if timeout > 0:
msg.write(initProtoField(7, hint64(timeout)))
msg.write(7, hint64(timeout))
msg.finish()
result.write(initProtoField(1, cast[uint](RequestType.DHT)))
result.write(initProtoField(5, msg))
result.write(1, cast[uint](RequestType.DHT))
result.write(5, msg)
result.finish()
proc requestDHTGetPublicKey(peer: PeerID, timeout = 0): ProtoBuffer =
@ -301,13 +301,13 @@ proc requestDHTGetPublicKey(peer: PeerID, timeout = 0): ProtoBuffer =
let msgid = cast[uint](DHTRequestType.GET_PUBLIC_KEY)
result = initProtoBuffer({WithVarintLength})
var msg = initProtoBuffer()
msg.write(initProtoField(1, msgid))
msg.write(initProtoField(2, peer))
msg.write(1, msgid)
msg.write(2, peer)
if timeout > 0:
msg.write(initProtoField(7, hint64(timeout)))
msg.write(7, hint64(timeout))
msg.finish()
result.write(initProtoField(1, cast[uint](RequestType.DHT)))
result.write(initProtoField(5, msg))
result.write(1, cast[uint](RequestType.DHT))
result.write(5, msg)
result.finish()
proc requestDHTGetValue(key: string, timeout = 0): ProtoBuffer =
@ -316,13 +316,13 @@ proc requestDHTGetValue(key: string, timeout = 0): ProtoBuffer =
let msgid = cast[uint](DHTRequestType.GET_VALUE)
result = initProtoBuffer({WithVarintLength})
var msg = initProtoBuffer()
msg.write(initProtoField(1, msgid))
msg.write(initProtoField(4, key))
msg.write(1, msgid)
msg.write(4, key)
if timeout > 0:
msg.write(initProtoField(7, hint64(timeout)))
msg.write(7, hint64(timeout))
msg.finish()
result.write(initProtoField(1, cast[uint](RequestType.DHT)))
result.write(initProtoField(5, msg))
result.write(1, cast[uint](RequestType.DHT))
result.write(5, msg)
result.finish()
proc requestDHTSearchValue(key: string, timeout = 0): ProtoBuffer =
@ -331,13 +331,13 @@ proc requestDHTSearchValue(key: string, timeout = 0): ProtoBuffer =
let msgid = cast[uint](DHTRequestType.SEARCH_VALUE)
result = initProtoBuffer({WithVarintLength})
var msg = initProtoBuffer()
msg.write(initProtoField(1, msgid))
msg.write(initProtoField(4, key))
msg.write(1, msgid)
msg.write(4, key)
if timeout > 0:
msg.write(initProtoField(7, hint64(timeout)))
msg.write(7, hint64(timeout))
msg.finish()
result.write(initProtoField(1, cast[uint](RequestType.DHT)))
result.write(initProtoField(5, msg))
result.write(1, cast[uint](RequestType.DHT))
result.write(5, msg)
result.finish()
proc requestDHTPutValue(key: string, value: openarray[byte],
@ -347,14 +347,14 @@ proc requestDHTPutValue(key: string, value: openarray[byte],
let msgid = cast[uint](DHTRequestType.PUT_VALUE)
result = initProtoBuffer({WithVarintLength})
var msg = initProtoBuffer()
msg.write(initProtoField(1, msgid))
msg.write(initProtoField(4, key))
msg.write(initProtoField(5, value))
msg.write(1, msgid)
msg.write(4, key)
msg.write(5, value)
if timeout > 0:
msg.write(initProtoField(7, hint64(timeout)))
msg.write(7, hint64(timeout))
msg.finish()
result.write(initProtoField(1, cast[uint](RequestType.DHT)))
result.write(initProtoField(5, msg))
result.write(1, cast[uint](RequestType.DHT))
result.write(5, msg)
result.finish()
proc requestDHTProvide(cid: Cid, timeout = 0): ProtoBuffer =
@ -363,13 +363,13 @@ proc requestDHTProvide(cid: Cid, timeout = 0): ProtoBuffer =
let msgid = cast[uint](DHTRequestType.PROVIDE)
result = initProtoBuffer({WithVarintLength})
var msg = initProtoBuffer()
msg.write(initProtoField(1, msgid))
msg.write(initProtoField(3, cid.data.buffer))
msg.write(1, msgid)
msg.write(3, cid.data.buffer)
if timeout > 0:
msg.write(initProtoField(7, hint64(timeout)))
msg.write(7, hint64(timeout))
msg.finish()
result.write(initProtoField(1, cast[uint](RequestType.DHT)))
result.write(initProtoField(5, msg))
result.write(1, cast[uint](RequestType.DHT))
result.write(5, msg)
result.finish()
proc requestCMTagPeer(peer: PeerID, tag: string, weight: int): ProtoBuffer =
@ -377,13 +377,13 @@ proc requestCMTagPeer(peer: PeerID, tag: string, weight: int): ProtoBuffer =
let msgid = cast[uint](ConnManagerRequestType.TAG_PEER)
result = initProtoBuffer({WithVarintLength})
var msg = initProtoBuffer()
msg.write(initProtoField(1, msgid))
msg.write(initProtoField(2, peer))
msg.write(initProtoField(3, tag))
msg.write(initProtoField(4, hint64(weight)))
msg.write(1, msgid)
msg.write(2, peer)
msg.write(3, tag)
msg.write(4, hint64(weight))
msg.finish()
result.write(initProtoField(1, cast[uint](RequestType.CONNMANAGER)))
result.write(initProtoField(6, msg))
result.write(1, cast[uint](RequestType.CONNMANAGER))
result.write(6, msg)
result.finish()
proc requestCMUntagPeer(peer: PeerID, tag: string): ProtoBuffer =
@ -391,12 +391,12 @@ proc requestCMUntagPeer(peer: PeerID, tag: string): ProtoBuffer =
let msgid = cast[uint](ConnManagerRequestType.UNTAG_PEER)
result = initProtoBuffer({WithVarintLength})
var msg = initProtoBuffer()
msg.write(initProtoField(1, msgid))
msg.write(initProtoField(2, peer))
msg.write(initProtoField(3, tag))
msg.write(1, msgid)
msg.write(2, peer)
msg.write(3, tag)
msg.finish()
result.write(initProtoField(1, cast[uint](RequestType.CONNMANAGER)))
result.write(initProtoField(6, msg))
result.write(1, cast[uint](RequestType.CONNMANAGER))
result.write(6, msg)
result.finish()
proc requestCMTrim(): ProtoBuffer =
@ -404,10 +404,10 @@ proc requestCMTrim(): ProtoBuffer =
let msgid = cast[uint](ConnManagerRequestType.TRIM)
result = initProtoBuffer({WithVarintLength})
var msg = initProtoBuffer()
msg.write(initProtoField(1, msgid))
msg.write(1, msgid)
msg.finish()
result.write(initProtoField(1, cast[uint](RequestType.CONNMANAGER)))
result.write(initProtoField(6, msg))
result.write(1, cast[uint](RequestType.CONNMANAGER))
result.write(6, msg)
result.finish()
proc requestPSGetTopics(): ProtoBuffer =
@ -416,10 +416,10 @@ proc requestPSGetTopics(): ProtoBuffer =
let msgid = cast[uint](PSRequestType.GET_TOPICS)
result = initProtoBuffer({WithVarintLength})
var msg = initProtoBuffer()
msg.write(initProtoField(1, msgid))
msg.write(1, msgid)
msg.finish()
result.write(initProtoField(1, cast[uint](RequestType.PUBSUB)))
result.write(initProtoField(8, msg))
result.write(1, cast[uint](RequestType.PUBSUB))
result.write(8, msg)
result.finish()
proc requestPSListPeers(topic: string): ProtoBuffer =
@ -428,11 +428,11 @@ proc requestPSListPeers(topic: string): ProtoBuffer =
let msgid = cast[uint](PSRequestType.LIST_PEERS)
result = initProtoBuffer({WithVarintLength})
var msg = initProtoBuffer()
msg.write(initProtoField(1, msgid))
msg.write(initProtoField(2, topic))
msg.write(1, msgid)
msg.write(2, topic)
msg.finish()
result.write(initProtoField(1, cast[uint](RequestType.PUBSUB)))
result.write(initProtoField(8, msg))
result.write(1, cast[uint](RequestType.PUBSUB))
result.write(8, msg)
result.finish()
proc requestPSPublish(topic: string, data: openarray[byte]): ProtoBuffer =
@ -441,12 +441,12 @@ proc requestPSPublish(topic: string, data: openarray[byte]): ProtoBuffer =
let msgid = cast[uint](PSRequestType.PUBLISH)
result = initProtoBuffer({WithVarintLength})
var msg = initProtoBuffer()
msg.write(initProtoField(1, msgid))
msg.write(initProtoField(2, topic))
msg.write(initProtoField(3, data))
msg.write(1, msgid)
msg.write(2, topic)
msg.write(3, data)
msg.finish()
result.write(initProtoField(1, cast[uint](RequestType.PUBSUB)))
result.write(initProtoField(8, msg))
result.write(1, cast[uint](RequestType.PUBSUB))
result.write(8, msg)
result.finish()
proc requestPSSubscribe(topic: string): ProtoBuffer =
@ -455,25 +455,26 @@ proc requestPSSubscribe(topic: string): ProtoBuffer =
let msgid = cast[uint](PSRequestType.SUBSCRIBE)
result = initProtoBuffer({WithVarintLength})
var msg = initProtoBuffer()
msg.write(initProtoField(1, msgid))
msg.write(initProtoField(2, topic))
msg.write(1, msgid)
msg.write(2, topic)
msg.finish()
result.write(initProtoField(1, cast[uint](RequestType.PUBSUB)))
result.write(initProtoField(8, msg))
result.write(1, cast[uint](RequestType.PUBSUB))
result.write(8, msg)
result.finish()
proc checkResponse(pb: var ProtoBuffer): ResponseKind {.inline.} =
proc checkResponse(pb: ProtoBuffer): ResponseKind {.inline.} =
result = ResponseKind.Malformed
var value: uint64
if getVarintValue(pb, 1, value) > 0:
if getRequiredField(pb, 1, value).isOk():
if value == 0:
result = ResponseKind.Success
else:
result = ResponseKind.Error
proc getErrorMessage(pb: var ProtoBuffer): string {.inline, raises: [Defect, DaemonLocalError].} =
if pb.enterSubmessage() == cast[int](ResponseType.ERROR):
if pb.getString(1, result) == -1:
proc getErrorMessage(pb: ProtoBuffer): string {.inline, raises: [Defect, DaemonLocalError].} =
var error: seq[byte]
if pb.getRequiredField(ResponseType.ERROR.int, error).isOk():
if initProtoBuffer(error).getRequiredField(1, result).isErr():
raise newException(DaemonLocalError, "Error message is missing!")
proc recvMessage(conn: StreamTransport): Future[seq[byte]] {.async.} =
@ -830,26 +831,14 @@ proc transactMessage(transp: StreamTransport,
raise newException(DaemonLocalError, "Incorrect or empty message received!")
result = initProtoBuffer(message)
proc getPeerInfo(pb: var ProtoBuffer): PeerInfo
proc getPeerInfo(pb: ProtoBuffer): PeerInfo
{.raises: [Defect, DaemonLocalError].} =
## Get PeerInfo object from ``pb``.
result.addresses = newSeq[MultiAddress]()
if pb.getValue(1, result.peer) == -1:
raise newException(DaemonLocalError, "Missing required field `peer`!")
if pb.getRequiredField(1, result.peer).isErr():
raise newException(DaemonLocalError, "Incorrect or empty message received!")
var address = newSeq[byte]()
while pb.getBytes(2, address) != -1:
if len(address) != 0:
var copyaddr = address
let addrRes = MultiAddress.init(copyaddr)
# TODO: for some reason `toException` doesn't
# work for this module
if addrRes.isErr:
raise newException(DaemonLocalError, addrRes.error)
result.addresses.add(addrRes.get())
address.setLen(0)
discard pb.getRepeatedField(2, result.addresses)
proc identity*(api: DaemonAPI): Future[PeerInfo] {.async.} =
## Get Node identity information
@ -857,9 +846,10 @@ proc identity*(api: DaemonAPI): Future[PeerInfo] {.async.} =
try:
var pb = await transactMessage(transp, requestIdentity())
pb.withMessage() do:
let res = pb.enterSubmessage()
if res == cast[int](ResponseType.IDENTITY):
result = pb.getPeerInfo()
var res: seq[byte]
if pb.getRequiredField(ResponseType.IDENTITY.int, res).isOk():
var resPb = initProtoBuffer(res)
result = getPeerInfo(resPb)
finally:
await api.closeConnection(transp)
@ -897,18 +887,16 @@ proc openStream*(api: DaemonAPI, peer: PeerID,
var pb = await transp.transactMessage(requestStreamOpen(peer, protocols,
timeout))
pb.withMessage() do:
var res = pb.enterSubmessage()
if res == cast[int](ResponseType.STREAMINFO):
var res: seq[byte]
if pb.getRequiredField(ResponseType.STREAMINFO.int, res).isOk():
let resPb = initProtoBuffer(res)
# stream.peer = newSeq[byte]()
var raddress = newSeq[byte]()
stream.protocol = ""
if pb.getValue(1, stream.peer) == -1:
raise newException(DaemonLocalError, "Missing `peer` field!")
if pb.getLengthValue(2, raddress) == -1:
raise newException(DaemonLocalError, "Missing `address` field!")
resPb.getRequiredField(1, stream.peer).tryGet()
resPb.getRequiredField(2, raddress).tryGet()
stream.raddress = MultiAddress.init(raddress).tryGet()
if pb.getLengthValue(3, stream.protocol) == -1:
raise newException(DaemonLocalError, "Missing `proto` field!")
resPb.getRequiredField(3, stream.protocol).tryGet()
stream.flags.incl(Outbound)
stream.transp = transp
result = stream
@ -923,13 +911,10 @@ proc streamHandler(server: StreamServer, transp: StreamTransport) {.async.} =
var stream = new P2PStream
var raddress = newSeq[byte]()
stream.protocol = ""
if pb.getValue(1, stream.peer) == -1:
raise newException(DaemonLocalError, "Missing `peer` field!")
if pb.getLengthValue(2, raddress) == -1:
raise newException(DaemonLocalError, "Missing `address` field!")
pb.getRequiredField(1, stream.peer).tryGet()
pb.getRequiredField(2, raddress).tryGet()
stream.raddress = MultiAddress.init(raddress).tryGet()
if pb.getLengthValue(3, stream.protocol) == -1:
raise newException(DaemonLocalError, "Missing `proto` field!")
pb.getRequiredField(3, stream.protocol).tryGet()
stream.flags.incl(Inbound)
stream.transp = transp
if len(stream.protocol) > 0:
@ -968,14 +953,11 @@ proc listPeers*(api: DaemonAPI): Future[seq[PeerInfo]] {.async.} =
var pb = await transp.transactMessage(requestListPeers())
pb.withMessage() do:
result = newSeq[PeerInfo]()
var res = pb.enterSubmessage()
while res != 0:
if res == cast[int](ResponseType.PEERINFO):
var peer = pb.getPeerInfo()
var ress: seq[seq[byte]]
if pb.getRequiredRepeatedField(ResponseType.PEERINFO.int, ress).isOk():
for p in ress:
let peer = initProtoBuffer(p).getPeerInfo()
result.add(peer)
else:
pb.skipSubmessage()
res = pb.enterSubmessage()
finally:
await api.closeConnection(transp)
@ -1010,51 +992,61 @@ proc cmTrimPeers*(api: DaemonAPI) {.async.} =
finally:
await api.closeConnection(transp)
proc dhtGetSinglePeerInfo(pb: var ProtoBuffer): PeerInfo
proc dhtGetSinglePeerInfo(pb: ProtoBuffer): PeerInfo
{.raises: [Defect, DaemonLocalError].} =
if pb.enterSubmessage() == 2:
result = pb.getPeerInfo()
var res: seq[byte]
if pb.getRequiredField(2, res).isOk():
result = initProtoBuffer(res).getPeerInfo()
else:
raise newException(DaemonLocalError, "Missing required field `peer`!")
proc dhtGetSingleValue(pb: var ProtoBuffer): seq[byte]
proc dhtGetSingleValue(pb: ProtoBuffer): seq[byte]
{.raises: [Defect, DaemonLocalError].} =
result = newSeq[byte]()
if pb.getLengthValue(3, result) == -1:
if pb.getRequiredField(3, result).isErr():
raise newException(DaemonLocalError, "Missing field `value`!")
proc dhtGetSinglePublicKey(pb: var ProtoBuffer): PublicKey
proc dhtGetSinglePublicKey(pb: ProtoBuffer): PublicKey
{.raises: [Defect, DaemonLocalError].} =
if pb.getValue(3, result) == -1:
if pb.getRequiredField(3, result).isErr():
raise newException(DaemonLocalError, "Missing field `value`!")
proc dhtGetSinglePeerID(pb: var ProtoBuffer): PeerID
proc dhtGetSinglePeerID(pb: ProtoBuffer): PeerID
{.raises: [Defect, DaemonLocalError].} =
if pb.getValue(3, result) == -1:
if pb.getRequiredField(3, result).isErr():
raise newException(DaemonLocalError, "Missing field `value`!")
proc enterDhtMessage(pb: var ProtoBuffer, rt: DHTResponseType)
proc enterDhtMessage(pb: ProtoBuffer, rt: DHTResponseType): Protobuffer
{.inline, raises: [Defect, DaemonLocalError].} =
var dtype: uint
var res = pb.enterSubmessage()
if res == cast[int](ResponseType.DHT):
if pb.getVarintValue(1, dtype) == 0:
var dhtResponse: seq[byte]
if pb.getRequiredField(ResponseType.DHT.int, dhtResponse).isOk():
var pbDhtResponse = initProtoBuffer(dhtResponse)
var dtype: uint
if pbDhtResponse.getRequiredField(1, dtype).isErr():
raise newException(DaemonLocalError, "Missing required DHT field `type`!")
if dtype != cast[uint](rt):
raise newException(DaemonLocalError, "Wrong DHT answer type! ")
var value: seq[byte]
if pbDhtResponse.getRequiredField(3, value).isErr():
raise newException(DaemonLocalError, "Missing required DHT field `value`!")
return initProtoBuffer(value)
else:
raise newException(DaemonLocalError, "Wrong message type!")
proc enterPsMessage(pb: var ProtoBuffer)
proc enterPsMessage(pb: ProtoBuffer): ProtoBuffer
{.inline, raises: [Defect, DaemonLocalError].} =
var res = pb.enterSubmessage()
if res != cast[int](ResponseType.PUBSUB):
var res: seq[byte]
if pb.getRequiredField(ResponseType.PUBSUB.int, res).isErr():
raise newException(DaemonLocalError, "Wrong message type!")
proc getDhtMessageType(pb: var ProtoBuffer): DHTResponseType
initProtoBuffer(res)
proc getDhtMessageType(pb: ProtoBuffer): DHTResponseType
{.inline, raises: [Defect, DaemonLocalError].} =
var dtype: uint
if pb.getVarintValue(1, dtype) == 0:
if pb.getRequiredField(1, dtype).isErr():
raise newException(DaemonLocalError, "Missing required DHT field `type`!")
if dtype == cast[uint](DHTResponseType.VALUE):
result = DHTResponseType.VALUE
@ -1073,8 +1065,7 @@ proc dhtFindPeer*(api: DaemonAPI, peer: PeerID,
try:
var pb = await transp.transactMessage(requestDHTFindPeer(peer, timeout))
withMessage(pb) do:
pb.enterDhtMessage(DHTResponseType.VALUE)
result = pb.dhtGetSinglePeerInfo()
result = pb.enterDhtMessage(DHTResponseType.VALUE).dhtGetSinglePeerInfo()
finally:
await api.closeConnection(transp)
@ -1088,8 +1079,7 @@ proc dhtGetPublicKey*(api: DaemonAPI, peer: PeerID,
try:
var pb = await transp.transactMessage(requestDHTGetPublicKey(peer, timeout))
withMessage(pb) do:
pb.enterDhtMessage(DHTResponseType.VALUE)
result = pb.dhtGetSinglePublicKey()
result = pb.enterDhtMessage(DHTResponseType.VALUE).dhtGetSinglePublicKey()
finally:
await api.closeConnection(transp)
@ -1103,8 +1093,7 @@ proc dhtGetValue*(api: DaemonAPI, key: string,
try:
var pb = await transp.transactMessage(requestDHTGetValue(key, timeout))
withMessage(pb) do:
pb.enterDhtMessage(DHTResponseType.VALUE)
result = pb.dhtGetSingleValue()
result = pb.enterDhtMessage(DHTResponseType.VALUE).dhtGetSingleValue()
finally:
await api.closeConnection(transp)
@ -1148,7 +1137,7 @@ proc dhtFindPeersConnectedToPeer*(api: DaemonAPI, peer: PeerID,
let spb = requestDHTFindPeersConnectedToPeer(peer, timeout)
var pb = await transp.transactMessage(spb)
withMessage(pb) do:
pb.enterDhtMessage(DHTResponseType.BEGIN)
discard pb.enterDhtMessage(DHTResponseType.BEGIN)
while true:
var message = await transp.recvMessage()
if len(message) == 0:
@ -1173,7 +1162,7 @@ proc dhtGetClosestPeers*(api: DaemonAPI, key: string,
let spb = requestDHTGetClosestPeers(key, timeout)
var pb = await transp.transactMessage(spb)
withMessage(pb) do:
pb.enterDhtMessage(DHTResponseType.BEGIN)
discard pb.enterDhtMessage(DHTResponseType.BEGIN)
while true:
var message = await transp.recvMessage()
if len(message) == 0:
@ -1198,7 +1187,7 @@ proc dhtFindProviders*(api: DaemonAPI, cid: Cid, count: uint32,
let spb = requestDHTFindProviders(cid, count, timeout)
var pb = await transp.transactMessage(spb)
withMessage(pb) do:
pb.enterDhtMessage(DHTResponseType.BEGIN)
discard pb.enterDhtMessage(DHTResponseType.BEGIN)
while true:
var message = await transp.recvMessage()
if len(message) == 0:
@ -1222,7 +1211,7 @@ proc dhtSearchValue*(api: DaemonAPI, key: string,
try:
var pb = await transp.transactMessage(requestDHTSearchValue(key, timeout))
withMessage(pb) do:
pb.enterDhtMessage(DHTResponseType.BEGIN)
discard pb.enterDhtMessage(DHTResponseType.BEGIN)
while true:
var message = await transp.recvMessage()
if len(message) == 0:
@ -1241,12 +1230,9 @@ proc pubsubGetTopics*(api: DaemonAPI): Future[seq[string]] {.async.} =
try:
var pb = await transp.transactMessage(requestPSGetTopics())
withMessage(pb) do:
pb.enterPsMessage()
let innerPb = pb.enterPsMessage()
var topics = newSeq[string]()
var topic = ""
while pb.getString(1, topic) != -1:
topics.add(topic)
topic.setLen(0)
discard innerPb.getRepeatedField(1, topics)
result = topics
finally:
await api.closeConnection(transp)
@ -1260,11 +1246,10 @@ proc pubsubListPeers*(api: DaemonAPI,
var pb = await transp.transactMessage(requestPSListPeers(topic))
withMessage(pb) do:
var peer: PeerID
pb.enterPsMessage()
var peers = newSeq[PeerID]()
while pb.getValue(2, peer) != -1:
peers.add(peer)
result = peers
let innerPb = pb.enterPsMessage()
var peers = newSeq[seq[byte]]()
discard innerPb.getRepeatedField(2, peers)
result = peers.mapIt(PeerId.init(it).get())
finally:
await api.closeConnection(transp)
@ -1279,24 +1264,15 @@ proc pubsubPublish*(api: DaemonAPI, topic: string,
finally:
await api.closeConnection(transp)
proc getPubsubMessage*(pb: var ProtoBuffer): PubSubMessage =
proc getPubsubMessage*(pb: ProtoBuffer): PubSubMessage =
result.data = newSeq[byte]()
result.seqno = newSeq[byte]()
discard pb.getValue(1, result.peer)
discard pb.getBytes(2, result.data)
discard pb.getBytes(3, result.seqno)
var item = newSeq[byte]()
while true:
if pb.getBytes(4, item) == -1:
break
var copyitem = item
var stritem = cast[string](copyitem)
if len(result.topics) == 0:
result.topics = newSeq[string]()
result.topics.add(stritem)
item.setLen(0)
discard pb.getValue(5, result.signature)
discard pb.getValue(6, result.key)
discard pb.getField(1, result.peer)
discard pb.getField(2, result.data)
discard pb.getField(3, result.seqno)
discard pb.getRepeatedField(4, result.topics)
discard pb.getField(5, result.signature)
discard pb.getField(6, result.key)
proc pubsubLoop(api: DaemonAPI, ticket: PubsubTicket) {.async.} =
while true:

View File

@ -1073,7 +1073,7 @@ proc getField*(pb: var ProtoBuffer, field: int,
else:
err(ProtoError.IncorrectBlob)
proc getRepeatedField*(pb: var ProtoBuffer, field: int,
proc getRepeatedField*(pb: ProtoBuffer, field: int,
value: var seq[MultiAddress]): ProtoResult[bool] {.
inline.} =
var items: seq[seq[byte]]

View File

@ -43,9 +43,6 @@ type
proc new*(T: typedesc[MultistreamSelect]): T =
T(codec: MSCodec)
proc newMultistream*(): MultistreamSelect {.deprecated: "use MultistreamSelect.new".} =
MultistreamSelect.new()
template validateSuffix(str: string): untyped =
if str.endsWith("\n"):
str.removeSuffix("\n")

View File

@ -190,7 +190,7 @@ method handle*(m: Mplex) {.async, gcsafe.} =
await m.close()
trace "Stopped mplex handler", m
proc init*(M: type Mplex,
proc new*(M: type Mplex,
conn: Connection,
inTimeout, outTimeout: Duration = DefaultChanTimeout,
maxChannCount: int = MaxChannelCount): Mplex =

View File

@ -58,9 +58,6 @@ proc new*(
muxerProvider.init()
muxerProvider
proc newMuxerProvider*(creator: MuxerConstructor, codec: string): MuxerProvider {.gcsafe, deprecated: "use MuxerProvider.new".} =
MuxerProvider.new(creator, codec)
method init(c: MuxerProvider) =
proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} =
trace "starting muxer handler", proto=proto, conn

View File

@ -196,21 +196,6 @@ func write*(vb: var VBuffer, pid: PeerID) =
## Write PeerID value ``peerid`` to buffer ``vb``.
vb.writeSeq(pid.data)
func initProtoField*(index: int, pid: PeerID): ProtoField {.deprecated.} =
## Initialize ProtoField with PeerID ``value``.
initProtoField(index, pid.data)
func getValue*(data: var ProtoBuffer, field: int, value: var PeerID): int {.
deprecated.} =
## Read ``PeerID`` from ProtoBuf's message and validate it.
var pid: PeerID
result = getLengthValue(data, field, pid.data)
if result > 0:
if not pid.validate():
result = -1
else:
value = pid
func write*(pb: var ProtoBuffer, field: int, pid: PeerID) =
## Write PeerID value ``peerid`` to object ``pb`` using ProtoBuf's encoding.
write(pb, field, pid.data)

View File

@ -39,7 +39,7 @@ func shortLog*(p: PeerInfo): auto =
)
chronicles.formatIt(PeerInfo): shortLog(it)
proc init*(
proc new*(
p: typedesc[PeerInfo],
key: PrivateKey,
addrs: openarray[MultiAddress] = [],
@ -49,7 +49,7 @@ proc init*(
{.raises: [Defect, PeerInfoError].} =
let pubkey = try:
key.getKey().tryGet()
key.getPublicKey().tryGet()
except CatchableError:
raise newException(PeerInfoError, "invalid private key")

View File

@ -58,7 +58,8 @@ type
BufferOverflow,
MessageTooBig,
BadWireType,
IncorrectBlob
IncorrectBlob,
RequiredFieldMissing
ProtoResult*[T] = Result[T, ProtoError]
@ -115,43 +116,6 @@ proc vsizeof*(field: ProtoField): int {.inline.} =
else:
0
proc initProtoField*(index: int, value: SomeVarint): ProtoField {.deprecated.} =
## Initialize ProtoField with integer value.
result = ProtoField(kind: Varint, index: index)
when type(value) is uint64:
result.vint = value
else:
result.vint = cast[uint64](value)
proc initProtoField*(index: int, value: bool): ProtoField {.deprecated.} =
## Initialize ProtoField with integer value.
result = ProtoField(kind: Varint, index: index)
result.vint = byte(value)
proc initProtoField*(index: int,
value: openarray[byte]): ProtoField {.deprecated.} =
## Initialize ProtoField with bytes array.
result = ProtoField(kind: Length, index: index)
if len(value) > 0:
result.vbuffer = newSeq[byte](len(value))
copyMem(addr result.vbuffer[0], unsafeAddr value[0], len(value))
proc initProtoField*(index: int, value: string): ProtoField {.deprecated.} =
## Initialize ProtoField with string.
result = ProtoField(kind: Length, index: index)
if len(value) > 0:
result.vbuffer = newSeq[byte](len(value))
copyMem(addr result.vbuffer[0], unsafeAddr value[0], len(value))
proc initProtoField*(index: int,
value: ProtoBuffer): ProtoField {.deprecated, inline.} =
## Initialize ProtoField with nested message stored in ``value``.
##
## Note: This procedure performs shallow copy of ``value`` sequence.
result = ProtoField(kind: Length, index: index)
if len(value.buffer) > 0:
shallowCopy(result.vbuffer, value.buffer)
proc initProtoBuffer*(data: seq[byte], offset = 0,
options: set[ProtoFlags] = {}): ProtoBuffer =
## Initialize ProtoBuffer with shallow copy of ``data``.
@ -299,51 +263,6 @@ proc write*(pb: var ProtoBuffer, field: int, value: ProtoBuffer) {.inline.} =
## ``pb`` with field number ``field``.
write(pb, field, value.buffer)
proc write*(pb: var ProtoBuffer, field: ProtoField) {.deprecated.} =
## Encode protobuf's field ``field`` and store it to protobuf's buffer ``pb``.
var length = 0
var res: VarintResult[void]
pb.buffer.setLen(len(pb.buffer) + vsizeof(field))
res = PB.putUVarint(pb.toOpenArray(), length, getProtoHeader(field))
doAssert(res.isOk())
pb.offset += length
case field.kind
of ProtoFieldKind.Varint:
res = PB.putUVarint(pb.toOpenArray(), length, field.vint)
doAssert(res.isOk())
pb.offset += length
of ProtoFieldKind.Fixed64:
doAssert(pb.isEnough(8))
var value = cast[uint64](field.vfloat64)
pb.buffer[pb.offset] = byte(value and 0xFF'u32)
pb.buffer[pb.offset + 1] = byte((value shr 8) and 0xFF'u64)
pb.buffer[pb.offset + 2] = byte((value shr 16) and 0xFF'u64)
pb.buffer[pb.offset + 3] = byte((value shr 24) and 0xFF'u64)
pb.buffer[pb.offset + 4] = byte((value shr 32) and 0xFF'u64)
pb.buffer[pb.offset + 5] = byte((value shr 40) and 0xFF'u64)
pb.buffer[pb.offset + 6] = byte((value shr 48) and 0xFF'u64)
pb.buffer[pb.offset + 7] = byte((value shr 56) and 0xFF'u64)
pb.offset += 8
of ProtoFieldKind.Fixed32:
doAssert(pb.isEnough(4))
var value = cast[uint32](field.vfloat32)
pb.buffer[pb.offset] = byte(value and 0xFF'u32)
pb.buffer[pb.offset + 1] = byte((value shr 8) and 0xFF'u32)
pb.buffer[pb.offset + 2] = byte((value shr 16) and 0xFF'u32)
pb.buffer[pb.offset + 3] = byte((value shr 24) and 0xFF'u32)
pb.offset += 4
of ProtoFieldKind.Length:
res = PB.putUVarint(pb.toOpenArray(), length, uint(len(field.vbuffer)))
doAssert(res.isOk())
pb.offset += length
doAssert(pb.isEnough(len(field.vbuffer)))
if len(field.vbuffer) > 0:
copyMem(addr pb.buffer[pb.offset], unsafeAddr field.vbuffer[0],
len(field.vbuffer))
pb.offset += len(field.vbuffer)
else:
discard
proc finish*(pb: var ProtoBuffer) =
## Prepare protobuf's buffer ``pb`` for writing to stream.
doAssert(len(pb.buffer) > 0)
@ -657,6 +576,17 @@ proc getField*(pb: ProtoBuffer, field: int,
else:
err(res.error)
proc getRequiredField*[T](pb: ProtoBuffer, field: int,
output: var T): ProtoResult[void] {.inline.} =
let res = pb.getField(field, output)
if res.isOk():
if res.get():
ok()
else:
err(RequiredFieldMissing)
else:
err(res.error)
proc getRepeatedField*[T: seq[byte]|string](data: ProtoBuffer, field: int,
output: var seq[T]): ProtoResult[bool] =
checkFieldNumber(field)
@ -733,6 +663,17 @@ proc getRepeatedField*[T: ProtoScalar](data: ProtoBuffer, field: int,
else:
ok(false)
proc getRequiredRepeatedField*[T](pb: ProtoBuffer, field: int,
output: var seq[T]): ProtoResult[void] {.inline.} =
let res = pb.getRepeatedField(field, output)
if res.isOk():
if res.get():
ok()
else:
err(RequiredFieldMissing)
else:
err(res.error)
proc getPackedRepeatedField*[T: ProtoScalar](data: ProtoBuffer, field: int,
output: var seq[T]): ProtoResult[bool] =
checkFieldNumber(field)
@ -787,93 +728,3 @@ proc getPackedRepeatedField*[T: ProtoScalar](data: ProtoBuffer, field: int,
ok(true)
else:
ok(false)
proc getVarintValue*(data: var ProtoBuffer, field: int,
value: var SomeVarint): int {.deprecated.} =
## Get value of `Varint` type.
var length = 0
var header = 0'u64
var soffset = data.offset
if not data.isEmpty() and PB.getUVarint(data.toOpenArray(),
length, header).isOk():
data.offset += length
if header == getProtoHeader(field, Varint):
if not data.isEmpty():
when type(value) is int32 or type(value) is int64 or type(value) is int:
let res = getSVarint(data.toOpenArray(), length, value)
else:
let res = PB.getUVarint(data.toOpenArray(), length, value)
if res.isOk():
data.offset += length
result = length
return
# Restore offset on error
data.offset = soffset
proc getLengthValue*[T: string|seq[byte]](data: var ProtoBuffer, field: int,
buffer: var T): int {.deprecated.} =
## Get value of `Length` type.
var length = 0
var header = 0'u64
var ssize = 0'u64
var soffset = data.offset
result = -1
buffer.setLen(0)
if not data.isEmpty() and PB.getUVarint(data.toOpenArray(),
length, header).isOk():
data.offset += length
if header == getProtoHeader(field, Length):
if not data.isEmpty() and PB.getUVarint(data.toOpenArray(),
length, ssize).isOk():
data.offset += length
if ssize <= MaxMessageSize and data.isEnough(int(ssize)):
buffer.setLen(ssize)
# Protobuf allow zero-length values.
if ssize > 0'u64:
copyMem(addr buffer[0], addr data.buffer[data.offset], ssize)
result = int(ssize)
data.offset += int(ssize)
return
# Restore offset on error
data.offset = soffset
proc getBytes*(data: var ProtoBuffer, field: int,
buffer: var seq[byte]): int {.deprecated, inline.} =
## Get value of `Length` type as bytes.
result = getLengthValue(data, field, buffer)
proc getString*(data: var ProtoBuffer, field: int,
buffer: var string): int {.deprecated, inline.} =
## Get value of `Length` type as string.
result = getLengthValue(data, field, buffer)
proc enterSubmessage*(pb: var ProtoBuffer): int {.deprecated.} =
## Processes protobuf's sub-message and adjust internal offset to enter
## inside of sub-message. Returns field index of sub-message field or
## ``0`` on error.
var length = 0
var header = 0'u64
var msize = 0'u64
var soffset = pb.offset
if not pb.isEmpty() and PB.getUVarint(pb.toOpenArray(),
length, header).isOk():
pb.offset += length
if (header and 0x07'u64) == cast[uint64](ProtoFieldKind.Length):
if not pb.isEmpty() and PB.getUVarint(pb.toOpenArray(),
length, msize).isOk():
pb.offset += length
if msize <= MaxMessageSize and pb.isEnough(int(msize)):
pb.length = int(msize)
result = int(header shr 3)
return
# Restore offset on error
pb.offset = soffset
proc skipSubmessage*(pb: var ProtoBuffer) {.deprecated.} =
## Skip current protobuf's sub-message and adjust internal offset to the
## end of sub-message.
doAssert(pb.length != 0)
pb.offset += pb.length
pb.length = 0

View File

@ -122,9 +122,6 @@ proc new*(T: typedesc[Identify], peerInfo: PeerInfo): T =
identify.init()
identify
proc newIdentify*(peerInfo: PeerInfo): Identify {.deprecated: "use Identify.new".} =
Identify.new(peerInfo)
method init*(p: Identify) =
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
try:

View File

@ -49,7 +49,7 @@ method init*(p: Ping) =
var buf: array[PingSize, byte]
await conn.readExactly(addr buf[0], PingSize)
trace "echoing ping", conn
await conn.write(addr buf[0], PingSize)
await conn.write(@buf)
if not isNil(p.pingHandler):
await p.pingHandler(conn.peerId)
except CancelledError as exc:
@ -79,7 +79,7 @@ proc ping*(
let startTime = Moment.now()
trace "sending ping", conn
await conn.write(addr randomBuf[0], randomBuf.len)
await conn.write(@randomBuf)
await conn.readExactly(addr resultBuf[0], PingSize)

View File

@ -16,6 +16,7 @@ import ./pubsub,
./timedcache,
./peertable,
./rpc/[message, messages],
../../crypto/crypto,
../../stream/connection,
../../peerid,
../../peerinfo,
@ -207,8 +208,7 @@ method initPubSub*(f: FloodSub)
{.raises: [Defect, InitializationError].} =
procCall PubSub(f).initPubSub()
f.seen = TimedCache[MessageID].init(2.minutes)
var rng = newRng()
f.seenSalt = newSeqUninitialized[byte](sizeof(Hash))
brHmacDrbgGenerate(rng[], f.seenSalt)
brHmacDrbgGenerate(f.rng[], f.seenSalt)
f.init()

View File

@ -9,7 +9,7 @@
{.push raises: [Defect].}
import std/[tables, sets, options, sequtils, random]
import std/[tables, sets, options, sequtils]
import chronos, chronicles, metrics
import ./pubsub,
./floodsub,
@ -297,9 +297,8 @@ method rpcHandler*(g: GossipSub,
template msg: untyped = rpcMsg.messages[i]
let msgId = g.msgIdProvider(msg)
# avoid the remote peer from controlling the seen table hashing
# by adding random bytes to the ID we ensure we randomize the IDs
# we do only for seen as this is the great filter from the external world
# addSeen adds salt to msgId to avoid
# remote attacking the hash function
if g.addSeen(msgId):
trace "Dropping already-seen message", msgId = shortLog(msgId), peer
# make sure to update score tho before continuing
@ -503,9 +502,9 @@ proc maintainDirectPeers(g: GossipSub) {.async.} =
let _ = await g.switch.dial(id, addrs, g.codecs)
# populate the peer after it's connected
discard g.getOrCreatePeer(id, g.codecs)
except CancelledError:
except CancelledError as exc:
trace "Direct peer dial canceled"
raise
raise exc
except CatchableError as exc:
debug "Direct peer error dialing", msg = exc.msg
@ -548,8 +547,6 @@ method initPubSub*(g: GossipSub)
if validationRes.isErr:
raise newException(InitializationError, $validationRes.error)
randomize()
# init the floodsub stuff here, we customize timedcache in gossip!
g.seen = TimedCache[MessageID].init(g.parameters.seenTTL)

View File

@ -10,7 +10,6 @@
{.push raises: [Defect].}
import std/[tables, sequtils, sets, algorithm]
import random # for shuffle
import chronos, chronicles, metrics
import "."/[types, scoring]
import ".."/[pubsubpeer, peertable, timedcache, mcache, floodsub, pubsub]
@ -215,7 +214,7 @@ proc handleIHave*(g: GossipSub,
break
# shuffling res.messageIDs before sending it out to increase the likelihood
# of getting an answer if the peer truncates the list due to internal size restrictions.
shuffle(res.messageIDs)
g.rng.shuffle(res.messageIDs)
return res
proc handleIWant*(g: GossipSub,
@ -282,7 +281,7 @@ proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil)
)
# shuffle anyway, score might be not used
shuffle(candidates)
g.rng.shuffle(candidates)
# sort peers by score, high score first since we graft
candidates.sort(byScore, SortOrder.Descending)
@ -318,7 +317,7 @@ proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil)
)
# shuffle anyway, score might be not used
shuffle(candidates)
g.rng.shuffle(candidates)
# sort peers by score, high score first, we are grafting
candidates.sort(byScore, SortOrder.Descending)
@ -350,7 +349,7 @@ proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil)
prunes.keepIf do (x: PubSubPeer) -> bool: x notin grafts
# shuffle anyway, score might be not used
shuffle(prunes)
g.rng.shuffle(prunes)
# sort peers by score (inverted), pruning, so low score peers are on top
prunes.sort(byScore, SortOrder.Ascending)
@ -382,7 +381,7 @@ proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil)
if pruneLen > 0:
# Ok we got some peers to prune,
# for this heartbeat let's prune those
shuffle(prunes)
g.rng.shuffle(prunes)
prunes.setLen(pruneLen)
trace "pruning", prunes = prunes.len
@ -519,7 +518,7 @@ proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] {.raises:
# similar to rust: https://github.com/sigp/rust-libp2p/blob/f53d02bc873fef2bf52cd31e3d5ce366a41d8a8c/protocols/gossipsub/src/behaviour.rs#L2101
# and go https://github.com/libp2p/go-libp2p-pubsub/blob/08c17398fb11b2ab06ca141dddc8ec97272eb772/gossipsub.go#L582
if midsSeq.len > IHaveMaxLength:
shuffle(midsSeq)
g.rng.shuffle(midsSeq)
midsSeq.setLen(IHaveMaxLength)
let
@ -540,7 +539,7 @@ proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] {.raises:
target = min(factor, allPeers.len)
if target < allPeers.len:
shuffle(allPeers)
g.rng.shuffle(allPeers)
allPeers.setLen(target)
for peer in allPeers:

View File

@ -10,11 +10,12 @@
{.push raises: [Defect].}
import std/[tables, sequtils, sets, strutils]
import chronos, chronicles, metrics
import chronos, chronicles, metrics, bearssl
import ./pubsubpeer,
./rpc/[message, messages, protobuf],
../../switch,
../protocol,
../../crypto/crypto,
../../stream/connection,
../../peerid,
../../peerinfo,
@ -106,6 +107,7 @@ type
anonymize*: bool # if we omit fromPeer and seqno from RPC messages we send
subscriptionValidator*: SubscriptionValidator # callback used to validate subscriptions
topicsHigh*: int # the maximum number of topics a peer is allowed to subscribe to
rng*: ref BrHmacDrbgContext
knownTopics*: HashSet[string]
@ -538,6 +540,7 @@ proc init*[PubParams: object | bool](
sign: bool = true,
msgIdProvider: MsgIdProvider = defaultMsgIdProvider,
subscriptionValidator: SubscriptionValidator = nil,
rng: ref BrHmacDrbgContext = newRng(),
parameters: PubParams = false): P
{.raises: [Defect, InitializationError].} =
let pubsub =
@ -550,6 +553,7 @@ proc init*[PubParams: object | bool](
sign: sign,
msgIdProvider: msgIdProvider,
subscriptionValidator: subscriptionValidator,
rng: rng,
topicsHigh: int.high)
else:
P(switch: switch,
@ -561,6 +565,7 @@ proc init*[PubParams: object | bool](
msgIdProvider: msgIdProvider,
subscriptionValidator: subscriptionValidator,
parameters: parameters,
rng: rng,
topicsHigh: int.high)
proc peerEventHandler(peerId: PeerId, event: PeerEvent) {.async.} =

View File

@ -186,8 +186,8 @@ proc connectOnce(p: PubSubPeer): Future[void] {.async.} =
try:
if p.onEvent != nil:
p.onEvent(p, PubsubPeerEvent(kind: PubSubPeerEventKind.Disconnected))
except CancelledError:
raise
except CancelledError as exc:
raise exc
except CatchableError as exc:
debug "Errors during diconnection events", error = exc.msg
@ -289,18 +289,3 @@ proc new*(
codec: codec,
peerId: peerId,
)
proc newPubSubPeer*(
peerId: PeerID,
getConn: GetConn,
dropConn: DropConn,
onEvent: OnEvent,
codec: string): PubSubPeer {.deprecated: "use PubSubPeer.new".} =
PubSubPeer.new(
peerId,
getConn,
dropConn,
onEvent,
codec
)

View File

@ -565,7 +565,7 @@ method handshake*(p: Noise, conn: Connection, initiator: bool): Future[SecureCon
raise newException(NoiseHandshakeError, "Invalid remote peer id")
conn.peerId = pid.get()
var tmp = NoiseConnection.init(conn, conn.peerId, conn.observedAddr)
var tmp = NoiseConnection.new(conn, conn.peerId, conn.observedAddr)
if initiator:
tmp.readCs = handshakeRes.cs2
@ -615,10 +615,3 @@ proc new*(
noise.init()
noise
proc newNoise*(
rng: ref BrHmacDrbgContext,
privateKey: PrivateKey,
outgoing: bool = true,
commonPrologue: seq[byte] = @[]): Noise {.deprecated: "use Noise.new".}=
Noise.new(rng, privateKey, outgoing, commonPrologue)

View File

@ -29,6 +29,3 @@ proc new*(T: typedesc[PlainText]): T =
let plainText = T()
plainText.init()
plainText
proc newPlainText*(): PlainText {.deprecated: "use PlainText.new".} =
PlainText.new()

View File

@ -263,7 +263,7 @@ proc newSecioConn(conn: Connection,
## cipher algorithm ``cipher``, stretched keys ``secrets`` and order
## ``order``.
result = SecioConn.init(conn, conn.peerId, conn.observedAddr)
result = SecioConn.new(conn, conn.peerId, conn.observedAddr)
let i0 = if order < 0: 1 else: 0
let i1 = if order < 0: 0 else: 1
@ -441,6 +441,3 @@ proc new*(
)
secio.init()
secio
proc newSecio*(rng: ref BrHmacDrbgContext, localPrivateKey: PrivateKey): Secio {.deprecated: "use Secio.new".} =
Secio.new(rng, localPrivateKey)

View File

@ -42,7 +42,7 @@ func shortLog*(conn: SecureConn): auto =
chronicles.formatIt(SecureConn): shortLog(it)
proc init*(T: type SecureConn,
proc new*(T: type SecureConn,
conn: Connection,
peerId: PeerId,
observedAddr: Multiaddress,

View File

@ -65,10 +65,6 @@ proc new*(
bufferStream.initStream()
bufferStream
proc newBufferStream*(
timeout: Duration = DefaultConnectionTimeout): BufferStream {.deprecated: "use BufferStream.new".} =
return BufferStream.new(timeout)
method pushData*(s: BufferStream, data: seq[byte]) {.base, async.} =
## Write bytes to internal read buffer, use this to fill up the
## buffer with data.

View File

@ -151,7 +151,7 @@ proc timeoutMonitor(s: Connection) {.async, gcsafe.} =
if not await s.pollActivity():
return
proc init*(C: type Connection,
proc new*(C: type Connection,
peerId: PeerId,
dir: Direction,
timeout: Duration = DefaultConnectionTimeout,

View File

@ -180,7 +180,7 @@ proc readExactly*(s: LPStream,
proc readLine*(s: LPStream,
limit = 0,
sep = "\r\n"): Future[string]
{.async, deprecated: "todo".} =
{.async.} =
# TODO replace with something that exploits buffering better
var lim = if limit <= 0: -1 else: limit
var state = 0
@ -255,9 +255,6 @@ proc writeLp*(s: LPStream, msg: openArray[byte]): Future[void] =
proc writeLp*(s: LPStream, msg: string): Future[void] =
writeLp(s, msg.toOpenArrayByte(0, msg.high))
proc write*(s: LPStream, pbytes: pointer, nbytes: int): Future[void] {.deprecated: "seq".} =
s.write(@(toOpenArray(cast[ptr UncheckedArray[byte]](pbytes), 0, nbytes - 1)))
proc write*(s: LPStream, msg: string): Future[void] =
s.write(msg.toBytes())

View File

@ -114,13 +114,6 @@ proc connHandler*(self: TcpTransport,
return conn
proc init*(
T: typedesc[TcpTransport],
flags: set[ServerFlags] = {},
upgrade: Upgrade): T {.deprecated: "use .new".} =
T.new(flags, upgrade)
proc new*(
T: typedesc[TcpTransport],
flags: set[ServerFlags] = {},

View File

@ -34,7 +34,7 @@ type
WsStream = ref object of Connection
session: WSSession
proc init*(T: type WsStream,
proc new*(T: type WsStream,
session: WSSession,
dir: Direction,
timeout = 10.minutes,
@ -170,7 +170,7 @@ proc connHandler(self: WsTransport,
await stream.close()
raise exc
let conn = WsStream.init(stream, dir)
let conn = WsStream.new(stream, dir)
conn.observedAddr = observedAddr
self.connections[dir].add(conn)

View File

@ -189,7 +189,7 @@ proc muxerHandler(
await muxer.close()
trace "Exception in muxer handler", conn, msg = exc.msg
proc init*(
proc new*(
T: type MuxedUpgrade,
identity: Identify,
muxers: Table[string, MuxerProvider],

View File

@ -83,9 +83,6 @@ proc new*(T: typedesc[TestBufferStream], writeHandler: WriteHandler): T =
testBufferStream.initStream()
testBufferStream
proc newBufferStream*(writeHandler: WriteHandler): TestBufferStream {.deprecated: "use TestBufferStream.new".}=
TestBufferStream.new(writeHandler)
proc checkExpiringInternal(cond: proc(): bool {.raises: [Defect].} ): Future[bool] {.async, gcsafe.} =
{.gcsafe.}:
let start = Moment.now()

View File

@ -14,7 +14,7 @@ suite "Message":
test "signature":
var seqno = 11'u64
let
peer = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get())
peer = PeerInfo.new(PrivateKey.random(ECDSA, rng[]).get())
msg = Message.init(some(peer), @[], "topic", some(seqno), sign = true)
check verify(msg)

View File

@ -18,16 +18,16 @@ method newStream*(
name: string = "",
lazy: bool = false):
Future[Connection] {.async, gcsafe.} =
result = Connection.init(m.peerId, Direction.Out)
result = Connection.new(m.peerId, Direction.Out)
suite "Connection Manager":
teardown:
checkTrackers()
asyncTest "add and retrieve a connection":
let connMngr = ConnManager.init()
let connMngr = ConnManager.new()
let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet()
let conn = Connection.init(peerId, Direction.In)
let conn = Connection.new(peerId, Direction.In)
connMngr.storeConn(conn)
check conn in connMngr
@ -39,9 +39,9 @@ suite "Connection Manager":
await connMngr.close()
asyncTest "shouldn't allow a closed connection":
let connMngr = ConnManager.init()
let connMngr = ConnManager.new()
let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet()
let conn = Connection.init(peerId, Direction.In)
let conn = Connection.new(peerId, Direction.In)
await conn.close()
expect CatchableError:
@ -50,9 +50,9 @@ suite "Connection Manager":
await connMngr.close()
asyncTest "shouldn't allow an EOFed connection":
let connMngr = ConnManager.init()
let connMngr = ConnManager.new()
let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet()
let conn = Connection.init(peerId, Direction.In)
let conn = Connection.new(peerId, Direction.In)
conn.isEof = true
expect CatchableError:
@ -62,9 +62,9 @@ suite "Connection Manager":
await connMngr.close()
asyncTest "add and retrieve a muxer":
let connMngr = ConnManager.init()
let connMngr = ConnManager.new()
let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet()
let conn = Connection.init(peerId, Direction.In)
let conn = Connection.new(peerId, Direction.In)
let muxer = new Muxer
muxer.connection = conn
@ -78,9 +78,9 @@ suite "Connection Manager":
await connMngr.close()
asyncTest "shouldn't allow a muxer for an untracked connection":
let connMngr = ConnManager.init()
let connMngr = ConnManager.new()
let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet()
let conn = Connection.init(peerId, Direction.In)
let conn = Connection.new(peerId, Direction.In)
let muxer = new Muxer
muxer.connection = conn
@ -92,10 +92,10 @@ suite "Connection Manager":
await connMngr.close()
asyncTest "get conn with direction":
let connMngr = ConnManager.init()
let connMngr = ConnManager.new()
let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet()
let conn1 = Connection.init(peerId, Direction.Out)
let conn2 = Connection.init(peerId, Direction.In)
let conn1 = Connection.new(peerId, Direction.Out)
let conn2 = Connection.new(peerId, Direction.In)
connMngr.storeConn(conn1)
connMngr.storeConn(conn2)
@ -112,9 +112,9 @@ suite "Connection Manager":
await connMngr.close()
asyncTest "get muxed stream for peer":
let connMngr = ConnManager.init()
let connMngr = ConnManager.new()
let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet()
let conn = Connection.init(peerId, Direction.In)
let conn = Connection.new(peerId, Direction.In)
let muxer = new TestMuxer
muxer.peerId = peerId
@ -132,9 +132,9 @@ suite "Connection Manager":
await stream.close()
asyncTest "get stream from directed connection":
let connMngr = ConnManager.init()
let connMngr = ConnManager.new()
let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet()
let conn = Connection.init(peerId, Direction.In)
let conn = Connection.new(peerId, Direction.In)
let muxer = new TestMuxer
muxer.peerId = peerId
@ -153,9 +153,9 @@ suite "Connection Manager":
await stream1.close()
asyncTest "get stream from any connection":
let connMngr = ConnManager.init()
let connMngr = ConnManager.new()
let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet()
let conn = Connection.init(peerId, Direction.In)
let conn = Connection.new(peerId, Direction.In)
let muxer = new TestMuxer
muxer.peerId = peerId
@ -172,14 +172,14 @@ suite "Connection Manager":
await stream.close()
asyncTest "should raise on too many connections":
let connMngr = ConnManager.init(maxConnsPerPeer = 1)
let connMngr = ConnManager.new(maxConnsPerPeer = 1)
let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet()
connMngr.storeConn(Connection.init(peerId, Direction.In))
connMngr.storeConn(Connection.new(peerId, Direction.In))
let conns = @[
Connection.init(peerId, Direction.In),
Connection.init(peerId, Direction.In)]
Connection.new(peerId, Direction.In),
Connection.new(peerId, Direction.In)]
expect TooManyConnectionsError:
connMngr.storeConn(conns[0])
@ -191,9 +191,9 @@ suite "Connection Manager":
allFutures(conns.mapIt( it.close() )))
asyncTest "cleanup on connection close":
let connMngr = ConnManager.init()
let connMngr = ConnManager.new()
let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet()
let conn = Connection.init(peerId, Direction.In)
let conn = Connection.new(peerId, Direction.In)
let muxer = new Muxer
muxer.connection = conn
@ -212,7 +212,7 @@ suite "Connection Manager":
await connMngr.close()
asyncTest "drop connections for peer":
let connMngr = ConnManager.init()
let connMngr = ConnManager.new()
let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet()
for i in 0..<2:
@ -220,7 +220,7 @@ suite "Connection Manager":
Direction.In else:
Direction.Out
let conn = Connection.init(peerId, dir)
let conn = Connection.new(peerId, dir)
let muxer = new Muxer
muxer.connection = conn
@ -241,13 +241,13 @@ suite "Connection Manager":
await connMngr.close()
asyncTest "track total incoming connection limits":
let connMngr = ConnManager.init(maxConnections = 3)
let connMngr = ConnManager.new(maxConnections = 3)
var conns: seq[Connection]
for i in 0..<3:
let conn = connMngr.trackIncomingConn(
proc(): Future[Connection] {.async.} =
return Connection.init(
return Connection.new(
PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet(),
Direction.In)
)
@ -258,7 +258,7 @@ suite "Connection Manager":
# should timeout adding a connection over the limit
let conn = connMngr.trackIncomingConn(
proc(): Future[Connection] {.async.} =
return Connection.init(
return Connection.new(
PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet(),
Direction.In)
)
@ -270,13 +270,13 @@ suite "Connection Manager":
allFutures(conns.mapIt( it.close() )))
asyncTest "track total outgoing connection limits":
let connMngr = ConnManager.init(maxConnections = 3)
let connMngr = ConnManager.new(maxConnections = 3)
var conns: seq[Connection]
for i in 0..<3:
let conn = await connMngr.trackOutgoingConn(
proc(): Future[Connection] {.async.} =
return Connection.init(
return Connection.new(
PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet(),
Direction.In)
)
@ -287,7 +287,7 @@ suite "Connection Manager":
expect TooManyConnectionsError:
discard await connMngr.trackOutgoingConn(
proc(): Future[Connection] {.async.} =
return Connection.init(
return Connection.new(
PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet(),
Direction.In)
)
@ -297,13 +297,13 @@ suite "Connection Manager":
allFutures(conns.mapIt( it.close() )))
asyncTest "track both incoming and outgoing total connections limits - fail on incoming":
let connMngr = ConnManager.init(maxConnections = 3)
let connMngr = ConnManager.new(maxConnections = 3)
var conns: seq[Connection]
for i in 0..<3:
let conn = await connMngr.trackOutgoingConn(
proc(): Future[Connection] {.async.} =
return Connection.init(
return Connection.new(
PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet(),
Direction.In)
)
@ -313,7 +313,7 @@ suite "Connection Manager":
# should timeout adding a connection over the limit
let conn = connMngr.trackIncomingConn(
proc(): Future[Connection] {.async.} =
return Connection.init(
return Connection.new(
PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet(),
Direction.In)
)
@ -325,13 +325,13 @@ suite "Connection Manager":
allFutures(conns.mapIt( it.close() )))
asyncTest "track both incoming and outgoing total connections limits - fail on outgoing":
let connMngr = ConnManager.init(maxConnections = 3)
let connMngr = ConnManager.new(maxConnections = 3)
var conns: seq[Connection]
for i in 0..<3:
let conn = connMngr.trackIncomingConn(
proc(): Future[Connection] {.async.} =
return Connection.init(
return Connection.new(
PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet(),
Direction.In)
)
@ -343,7 +343,7 @@ suite "Connection Manager":
expect TooManyConnectionsError:
discard await connMngr.trackOutgoingConn(
proc(): Future[Connection] {.async.} =
return Connection.init(
return Connection.new(
PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet(),
Direction.In)
)
@ -353,13 +353,13 @@ suite "Connection Manager":
allFutures(conns.mapIt( it.close() )))
asyncTest "track max incoming connection limits":
let connMngr = ConnManager.init(maxIn = 3)
let connMngr = ConnManager.new(maxIn = 3)
var conns: seq[Connection]
for i in 0..<3:
let conn = connMngr.trackIncomingConn(
proc(): Future[Connection] {.async.} =
return Connection.init(
return Connection.new(
PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet(),
Direction.In)
)
@ -370,7 +370,7 @@ suite "Connection Manager":
# should timeout adding a connection over the limit
let conn = connMngr.trackIncomingConn(
proc(): Future[Connection] {.async.} =
return Connection.init(
return Connection.new(
PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet(),
Direction.In)
)
@ -382,13 +382,13 @@ suite "Connection Manager":
allFutures(conns.mapIt( it.close() )))
asyncTest "track max outgoing connection limits":
let connMngr = ConnManager.init(maxOut = 3)
let connMngr = ConnManager.new(maxOut = 3)
var conns: seq[Connection]
for i in 0..<3:
let conn = await connMngr.trackOutgoingConn(
proc(): Future[Connection] {.async.} =
return Connection.init(
return Connection.new(
PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet(),
Direction.In)
)
@ -399,7 +399,7 @@ suite "Connection Manager":
expect TooManyConnectionsError:
discard await connMngr.trackOutgoingConn(
proc(): Future[Connection] {.async.} =
return Connection.init(
return Connection.new(
PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet(),
Direction.In)
)
@ -409,13 +409,13 @@ suite "Connection Manager":
allFutures(conns.mapIt( it.close() )))
asyncTest "track incoming max connections limits - fail on incoming":
let connMngr = ConnManager.init(maxOut = 3)
let connMngr = ConnManager.new(maxOut = 3)
var conns: seq[Connection]
for i in 0..<3:
let conn = await connMngr.trackOutgoingConn(
proc(): Future[Connection] {.async.} =
return Connection.init(
return Connection.new(
PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet(),
Direction.In)
)
@ -425,7 +425,7 @@ suite "Connection Manager":
# should timeout adding a connection over the limit
let conn = connMngr.trackIncomingConn(
proc(): Future[Connection] {.async.} =
return Connection.init(
return Connection.new(
PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet(),
Direction.In)
)
@ -437,13 +437,13 @@ suite "Connection Manager":
allFutures(conns.mapIt( it.close() )))
asyncTest "track incoming max connections limits - fail on outgoing":
let connMngr = ConnManager.init(maxIn = 3)
let connMngr = ConnManager.new(maxIn = 3)
var conns: seq[Connection]
for i in 0..<3:
let conn = connMngr.trackIncomingConn(
proc(): Future[Connection] {.async.} =
return Connection.init(
return Connection.new(
PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet(),
Direction.In)
)
@ -455,7 +455,7 @@ suite "Connection Manager":
expect TooManyConnectionsError:
discard await connMngr.trackOutgoingConn(
proc(): Future[Connection] {.async.} =
return Connection.init(
return Connection.new(
PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet(),
Direction.In)
)

View File

@ -12,6 +12,7 @@
import unittest2
import nimcrypto/[utils, sysrand]
import ../libp2p/crypto/[crypto, chacha20poly1305, curve25519, hkdf]
import bearssl
when defined(nimHasUsed): {.used.}
@ -545,3 +546,10 @@ suite "Key interface test suite":
sha256.hkdf(salt, ikm, info, output)
check output[0].toHex(true) == truth
test "shuffle":
var cards = ["Ace", "King", "Queen", "Jack", "Ten"]
var rng = (ref BrHmacDrbgContext)()
brHmacDrbgInit(addr rng[], addr sha256Vtable, nil, 0)
rng.shuffle(cards)
check cards == ["King", "Ten", "Ace", "Queen", "Jack"]

View File

@ -38,7 +38,7 @@ suite "Identify":
asyncSetup:
ma = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
remoteSecKey = PrivateKey.random(ECDSA, rng[]).get()
remotePeerInfo = PeerInfo.init(
remotePeerInfo = PeerInfo.new(
remoteSecKey, [ma], ["/test/proto1/1.0.0", "/test/proto2/1.0.0"])
transport1 = TcpTransport.new(upgrade = Upgrade())
@ -117,7 +117,7 @@ suite "Identify":
conn = await transport2.dial(transport1.ma)
expect IdentityNoMatchError:
let pi2 = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get())
let pi2 = PeerInfo.new(PrivateKey.random(ECDSA, rng[]).get())
discard await msDial.select(conn, IdentifyCodec)
discard await identifyProto2.identify(conn, pi2.peerId)
@ -192,7 +192,7 @@ suite "Identify":
switch1.peerStore.protoBook.get(switch2.peerInfo.peerId) != switch2.peerInfo.protocols.toHashSet()
let oldPeerId = switch2.peerInfo.peerId
switch2.peerInfo = PeerInfo.init(PrivateKey.random(newRng()[]).get())
switch2.peerInfo = PeerInfo.new(PrivateKey.random(newRng()[]).get())
await identifyPush2.push(switch2.peerInfo, conn)

View File

@ -385,7 +385,7 @@ suite "Mplex":
proc acceptHandler() {.async, gcsafe.} =
let conn = await transport1.accept()
let mplexListen = Mplex.init(conn)
let mplexListen = Mplex.new(conn)
mplexListen.streamHandler = proc(stream: Connection)
{.async, gcsafe.} =
let msg = await stream.readLp(1024)
@ -399,7 +399,7 @@ suite "Mplex":
let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma)
let mplexDial = Mplex.init(conn)
let mplexDial = Mplex.new(conn)
let mplexDialFut = mplexDial.handle()
let stream = await mplexDial.newStream()
await stream.writeLp("HELLO")
@ -422,7 +422,7 @@ suite "Mplex":
proc acceptHandler() {.async, gcsafe.} =
let conn = await transport1.accept()
let mplexListen = Mplex.init(conn)
let mplexListen = Mplex.new(conn)
mplexListen.streamHandler = proc(stream: Connection)
{.async, gcsafe.} =
let msg = await stream.readLp(1024)
@ -436,7 +436,7 @@ suite "Mplex":
let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma)
let mplexDial = Mplex.init(conn)
let mplexDial = Mplex.new(conn)
let stream = await mplexDial.newStream(lazy = true)
let mplexDialFut = mplexDial.handle()
check not LPChannel(stream).isOpen # assert lazy
@ -467,7 +467,7 @@ suite "Mplex":
proc acceptHandler() {.async, gcsafe.} =
try:
let conn = await transport1.accept()
let mplexListen = Mplex.init(conn)
let mplexListen = Mplex.new(conn)
mplexListen.streamHandler = proc(stream: Connection)
{.async, gcsafe.} =
let msg = await stream.readLp(MaxMsgSize)
@ -488,7 +488,7 @@ suite "Mplex":
let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma)
let mplexDial = Mplex.init(conn)
let mplexDial = Mplex.new(conn)
let mplexDialFut = mplexDial.handle()
let stream = await mplexDial.newStream()
@ -513,7 +513,7 @@ suite "Mplex":
proc acceptHandler() {.async, gcsafe.} =
let conn = await transport1.accept()
let mplexListen = Mplex.init(conn)
let mplexListen = Mplex.new(conn)
mplexListen.streamHandler = proc(stream: Connection)
{.async, gcsafe.} =
await stream.writeLp("Hello from stream!")
@ -526,7 +526,7 @@ suite "Mplex":
let conn = await transport2.dial(transport1.ma)
let acceptFut = acceptHandler()
let mplexDial = Mplex.init(conn)
let mplexDial = Mplex.new(conn)
let mplexDialFut = mplexDial.handle()
let stream = await mplexDial.newStream("DIALER")
let msg = string.fromBytes(await stream.readLp(1024))
@ -551,7 +551,7 @@ suite "Mplex":
proc acceptHandler() {.async, gcsafe.} =
var count = 1
let conn = await transport1.accept()
let mplexListen = Mplex.init(conn)
let mplexListen = Mplex.new(conn)
mplexListen.streamHandler = proc(stream: Connection)
{.async, gcsafe.} =
let msg = await stream.readLp(1024)
@ -568,7 +568,7 @@ suite "Mplex":
let conn = await transport2.dial(transport1.ma)
let acceptFut = acceptHandler()
let mplexDial = Mplex.init(conn)
let mplexDial = Mplex.new(conn)
# TODO: Reenable once half-closed is working properly
let mplexDialFut = mplexDial.handle()
for i in 1..10:
@ -595,7 +595,7 @@ suite "Mplex":
proc acceptHandler() {.async, gcsafe.} =
var count = 1
let conn = await transport1.accept()
let mplexListen = Mplex.init(conn)
let mplexListen = Mplex.new(conn)
mplexListen.streamHandler = proc(stream: Connection)
{.async, gcsafe.} =
let msg = await stream.readLp(1024)
@ -613,7 +613,7 @@ suite "Mplex":
let conn = await transport2.dial(transport1.ma)
let acceptFut = acceptHandler()
let mplexDial = Mplex.init(conn)
let mplexDial = Mplex.new(conn)
let mplexDialFut = mplexDial.handle()
for i in 1..10:
let stream = await mplexDial.newStream("dialer stream")
@ -639,7 +639,7 @@ suite "Mplex":
var listenStreams: seq[Connection]
proc acceptHandler() {.async, gcsafe.} =
let conn = await transport1.accept()
let mplexListen = Mplex.init(conn)
let mplexListen = Mplex.new(conn)
mplexListen.streamHandler = proc(stream: Connection)
{.async, gcsafe.} =
@ -660,7 +660,7 @@ suite "Mplex":
let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma)
let mplexDial = Mplex.init(conn)
let mplexDial = Mplex.new(conn)
let mplexDialFut = mplexDial.handle()
var dialStreams: seq[Connection]
for i in 0..9:
@ -689,7 +689,7 @@ suite "Mplex":
var listenStreams: seq[Connection]
proc acceptHandler() {.async, gcsafe.} =
let conn = await transport1.accept()
let mplexListen = Mplex.init(conn)
let mplexListen = Mplex.new(conn)
mplexListen.streamHandler = proc(stream: Connection)
{.async, gcsafe.} =
listenStreams.add(stream)
@ -708,7 +708,7 @@ suite "Mplex":
let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma)
let mplexDial = Mplex.init(conn)
let mplexDial = Mplex.new(conn)
let mplexDialFut = mplexDial.handle()
var dialStreams: seq[Connection]
for i in 0..9:
@ -752,7 +752,7 @@ suite "Mplex":
var listenStreams: seq[Connection]
proc acceptHandler() {.async, gcsafe.} =
let conn = await transport1.accept()
let mplexListen = Mplex.init(conn)
let mplexListen = Mplex.new(conn)
mplexListen.streamHandler = proc(stream: Connection)
{.async, gcsafe.} =
listenStreams.add(stream)
@ -767,7 +767,7 @@ suite "Mplex":
let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma)
let mplexDial = Mplex.init(conn)
let mplexDial = Mplex.new(conn)
let mplexDialFut = mplexDial.handle()
var dialStreams: seq[Connection]
for i in 0..9:
@ -795,7 +795,7 @@ suite "Mplex":
var listenStreams: seq[Connection]
proc acceptHandler() {.async, gcsafe.} =
let conn = await transport1.accept()
mplexListen = Mplex.init(conn)
mplexListen = Mplex.new(conn)
mplexListen.streamHandler = proc(stream: Connection)
{.async, gcsafe.} =
listenStreams.add(stream)
@ -810,7 +810,7 @@ suite "Mplex":
let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma)
let mplexDial = Mplex.init(conn)
let mplexDial = Mplex.new(conn)
let mplexDialFut = mplexDial.handle()
var dialStreams: seq[Connection]
for i in 0..9:
@ -838,7 +838,7 @@ suite "Mplex":
var listenStreams: seq[Connection]
proc acceptHandler() {.async, gcsafe.} =
let conn = await transport1.accept()
let mplexListen = Mplex.init(conn)
let mplexListen = Mplex.new(conn)
mplexListen.streamHandler = proc(stream: Connection)
{.async, gcsafe.} =
listenStreams.add(stream)
@ -854,7 +854,7 @@ suite "Mplex":
let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma)
let mplexDial = Mplex.init(conn)
let mplexDial = Mplex.new(conn)
let mplexDialFut = mplexDial.handle()
var dialStreams: seq[Connection]
for i in 0..9:
@ -880,7 +880,7 @@ suite "Mplex":
var listenStreams: seq[Connection]
proc acceptHandler() {.async, gcsafe.} =
let conn = await transport1.accept()
let mplexListen = Mplex.init(conn)
let mplexListen = Mplex.new(conn)
mplexListen.streamHandler = proc(stream: Connection)
{.async, gcsafe.} =
listenStreams.add(stream)
@ -895,7 +895,7 @@ suite "Mplex":
let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma)
let mplexDial = Mplex.init(conn)
let mplexDial = Mplex.new(conn)
let mplexDialFut = mplexDial.handle()
var dialStreams: seq[Connection]
for i in 0..9:
@ -923,7 +923,7 @@ suite "Mplex":
var listenStreams: seq[Connection]
proc acceptHandler() {.async, gcsafe.} =
listenConn = await transport1.accept()
let mplexListen = Mplex.init(listenConn)
let mplexListen = Mplex.new(listenConn)
mplexListen.streamHandler = proc(stream: Connection)
{.async, gcsafe.} =
listenStreams.add(stream)
@ -938,7 +938,7 @@ suite "Mplex":
let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma)
let mplexDial = Mplex.init(conn)
let mplexDial = Mplex.new(conn)
let mplexDialFut = mplexDial.handle()
var dialStreams: seq[Connection]
for i in 0..9:
@ -970,7 +970,7 @@ suite "Mplex":
const MsgSize = 1024
proc acceptHandler() {.async, gcsafe.} =
let conn = await transport1.accept()
let mplexListen = Mplex.init(conn)
let mplexListen = Mplex.new(conn)
mplexListen.streamHandler = proc(stream: Connection)
{.async, gcsafe.} =
try:
@ -988,7 +988,7 @@ suite "Mplex":
let conn = await transport2.dial(transport1.ma)
let acceptFut = acceptHandler()
let mplexDial = Mplex.init(conn)
let mplexDial = Mplex.new(conn)
let mplexDialFut = mplexDial.handle()
let stream = await mplexDial.newStream()
var bigseq = newSeqOfCap[uint8](MaxMsgSize + 1)
@ -1042,7 +1042,7 @@ suite "Mplex":
const MsgSize = 512
proc acceptHandler() {.async, gcsafe.} =
let conn = await transport1.accept()
let mplexListen = Mplex.init(conn)
let mplexListen = Mplex.new(conn)
mplexListen.streamHandler = proc(stream: Connection)
{.async, gcsafe.} =
let msg = await stream.readLp(MsgSize)
@ -1057,7 +1057,7 @@ suite "Mplex":
let conn = await transport2.dial(transport1.ma)
let acceptFut = acceptHandler()
let mplexDial = Mplex.init(conn)
let mplexDial = Mplex.new(conn)
let stream = await mplexDial.newStream()
let mplexDialFut = mplexDial.handle()
var bigseq = newSeqOfCap[uint8](MsgSize + 1)

View File

@ -53,11 +53,11 @@ method init(p: TestProto) {.gcsafe.} =
proc createSwitch(ma: MultiAddress; outgoing: bool, secio: bool = false): (Switch, PeerInfo) =
var
privateKey = PrivateKey.random(ECDSA, rng[]).get()
peerInfo = PeerInfo.init(privateKey)
peerInfo = PeerInfo.new(privateKey)
peerInfo.addrs.add(ma)
proc createMplex(conn: Connection): Muxer =
result = Mplex.init(conn)
result = Mplex.new(conn)
let
identify = Identify.new(peerInfo)
@ -67,9 +67,9 @@ proc createSwitch(ma: MultiAddress; outgoing: bool, secio: bool = false): (Switc
[Secure(Secio.new(rng, privateKey))]
else:
[Secure(Noise.new(rng, privateKey, outgoing = outgoing))]
connManager = ConnManager.init()
connManager = ConnManager.new()
ms = MultistreamSelect.new()
muxedUpgrade = MuxedUpgrade.init(identify, muxers, secureManagers, connManager, ms)
muxedUpgrade = MuxedUpgrade.new(identify, muxers, secureManagers, connManager, ms)
transports = @[Transport(TcpTransport.new(upgrade = muxedUpgrade))]
let switch = newSwitch(
@ -90,7 +90,7 @@ suite "Noise":
let
server = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
serverPrivKey = PrivateKey.random(ECDSA, rng[]).get()
serverInfo = PeerInfo.init(serverPrivKey, [server])
serverInfo = PeerInfo.new(serverPrivKey, [server])
serverNoise = Noise.new(rng, serverPrivKey, outgoing = false)
let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade())
@ -109,7 +109,7 @@ suite "Noise":
acceptFut = acceptHandler()
transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
clientPrivKey = PrivateKey.random(ECDSA, rng[]).get()
clientInfo = PeerInfo.init(clientPrivKey, [transport1.ma])
clientInfo = PeerInfo.new(clientPrivKey, [transport1.ma])
clientNoise = Noise.new(rng, clientPrivKey, outgoing = true)
conn = await transport2.dial(transport1.ma)
@ -131,7 +131,7 @@ suite "Noise":
let
server = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
serverPrivKey = PrivateKey.random(ECDSA, rng[]).get()
serverInfo = PeerInfo.init(serverPrivKey, [server])
serverInfo = PeerInfo.new(serverPrivKey, [server])
serverNoise = Noise.new(rng, serverPrivKey, outgoing = false)
let
@ -153,7 +153,7 @@ suite "Noise":
handlerWait = acceptHandler()
transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
clientPrivKey = PrivateKey.random(ECDSA, rng[]).get()
clientInfo = PeerInfo.init(clientPrivKey, [transport1.ma])
clientInfo = PeerInfo.new(clientPrivKey, [transport1.ma])
clientNoise = Noise.new(rng, clientPrivKey, outgoing = true, commonPrologue = @[1'u8, 2'u8, 3'u8])
conn = await transport2.dial(transport1.ma)
conn.peerId = serverInfo.peerId
@ -171,7 +171,7 @@ suite "Noise":
let
server = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
serverPrivKey = PrivateKey.random(ECDSA, rng[]).get()
serverInfo = PeerInfo.init(serverPrivKey, [server])
serverInfo = PeerInfo.new(serverPrivKey, [server])
serverNoise = Noise.new(rng, serverPrivKey, outgoing = false)
readTask = newFuture[void]()
@ -193,7 +193,7 @@ suite "Noise":
acceptFut = acceptHandler()
transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
clientPrivKey = PrivateKey.random(ECDSA, rng[]).get()
clientInfo = PeerInfo.init(clientPrivKey, [transport1.ma])
clientInfo = PeerInfo.new(clientPrivKey, [transport1.ma])
clientNoise = Noise.new(rng, clientPrivKey, outgoing = true)
conn = await transport2.dial(transport1.ma)
conn.peerId = serverInfo.peerId
@ -210,7 +210,7 @@ suite "Noise":
let
server = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
serverPrivKey = PrivateKey.random(ECDSA, rng[]).get()
serverInfo = PeerInfo.init(serverPrivKey, [server])
serverInfo = PeerInfo.new(serverPrivKey, [server])
serverNoise = Noise.new(rng, serverPrivKey, outgoing = false)
readTask = newFuture[void]()
@ -235,7 +235,7 @@ suite "Noise":
acceptFut = acceptHandler()
transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
clientPrivKey = PrivateKey.random(ECDSA, rng[]).get()
clientInfo = PeerInfo.init(clientPrivKey, [transport1.ma])
clientInfo = PeerInfo.new(clientPrivKey, [transport1.ma])
clientNoise = Noise.new(rng, clientPrivKey, outgoing = true)
conn = await transport2.dial(transport1.ma)
conn.peerId = serverInfo.peerId

View File

@ -11,7 +11,7 @@ import ./helpers
suite "PeerInfo":
test "Should init with private key":
let seckey = PrivateKey.random(ECDSA, rng[]).get()
var peerInfo = PeerInfo.init(seckey)
var peerInfo = PeerInfo.new(seckey)
var peerId = PeerID.init(seckey).get()
check peerId == peerInfo.peerId

View File

@ -41,8 +41,8 @@ suite "Ping":
pingProto1 = Ping.new()
pingProto2 = Ping.new(handlePing)
msListen = newMultistream()
msDial = newMultistream()
msListen = MultistreamSelect.new()
msDial = MultistreamSelect.new()
pingReceivedCount = 0
@ -91,7 +91,7 @@ suite "Ping":
buf: array[32, byte]
fakebuf: array[32, byte]
await conn.readExactly(addr buf[0], 32)
await conn.write(addr fakebuf[0], 32)
await conn.write(@fakebuf)
fakePingProto.codec = PingCodec
fakePingProto.handler = fakeHandle

View File

@ -534,7 +534,7 @@ suite "Switch":
# use same private keys to emulate two connection from same peer
let
privateKey = PrivateKey.random(rng[]).tryGet()
peerInfo = PeerInfo.init(privateKey)
peerInfo = PeerInfo.new(privateKey)
var switches: seq[Switch]
var done = newFuture[void]()
@ -577,7 +577,7 @@ suite "Switch":
# use same private keys to emulate two connection from same peer
let
privateKey = PrivateKey.random(rng[]).tryGet()
peerInfo = PeerInfo.init(privateKey)
peerInfo = PeerInfo.new(privateKey)
var conns = 1
var switches: seq[Switch]
@ -736,7 +736,7 @@ suite "Switch":
discard await switch2.start()
let someAddr = MultiAddress.init("/ip4/127.128.0.99").get()
let seckey = PrivateKey.random(ECDSA, rng[]).get()
let somePeer = PeerInfo.init(secKey, [someAddr])
let somePeer = PeerInfo.new(secKey, [someAddr])
expect(DialFailedError):
discard await switch2.dial(somePeer.peerId, somePeer.addrs, TestCodec)
await switch2.stop()