mirror of https://github.com/vacp2p/nim-libp2p.git
feat: move connection handling logic to base class
This commit is contained in:
parent
871d8d7478
commit
9aa9e97602
|
@ -10,6 +10,7 @@ import options
|
||||||
import chronos, chronicles
|
import chronos, chronicles
|
||||||
import nimcrypto/[sysrand, hmac, sha2, sha, hash, rijndael, twofish, bcmode]
|
import nimcrypto/[sysrand, hmac, sha2, sha, hash, rijndael, twofish, bcmode]
|
||||||
import secure,
|
import secure,
|
||||||
|
secureconn,
|
||||||
../../connection,
|
../../connection,
|
||||||
../../peerinfo,
|
../../peerinfo,
|
||||||
../../stream/lpstream,
|
../../stream/lpstream,
|
||||||
|
@ -62,7 +63,7 @@ type
|
||||||
of SecureMacType.Sha1:
|
of SecureMacType.Sha1:
|
||||||
ctxsha1: HMAC[sha1]
|
ctxsha1: HMAC[sha1]
|
||||||
|
|
||||||
SecureConnection = ref object of Connection
|
SecioConn = ref object of SecureConn
|
||||||
writerMac: SecureMac
|
writerMac: SecureMac
|
||||||
readerMac: SecureMac
|
readerMac: SecureMac
|
||||||
writerCoder: SecureCipher
|
writerCoder: SecureCipher
|
||||||
|
@ -149,7 +150,7 @@ proc decrypt(cipher: var SecureCipher, input: openarray[byte],
|
||||||
of SecureCipherType.Twofish:
|
of SecureCipherType.Twofish:
|
||||||
cipher.ctxtwofish256.decrypt(input, output)
|
cipher.ctxtwofish256.decrypt(input, output)
|
||||||
|
|
||||||
proc macCheckAndDecode(sconn: SecureConnection, data: var seq[byte]): bool =
|
proc macCheckAndDecode(sconn: SecioConn, data: var seq[byte]): bool =
|
||||||
## This procedure checks MAC of recieved message ``data`` and if message is
|
## This procedure checks MAC of recieved message ``data`` and if message is
|
||||||
## authenticated, then decrypt message.
|
## authenticated, then decrypt message.
|
||||||
##
|
##
|
||||||
|
@ -176,7 +177,7 @@ proc macCheckAndDecode(sconn: SecureConnection, data: var seq[byte]): bool =
|
||||||
data.setLen(mark)
|
data.setLen(mark)
|
||||||
result = true
|
result = true
|
||||||
|
|
||||||
proc readMessage(sconn: SecureConnection): Future[seq[byte]] {.async.} =
|
method readMessage(sconn: SecioConn): Future[seq[byte]] {.async.} =
|
||||||
## Read message from channel secure connection ``sconn``.
|
## Read message from channel secure connection ``sconn``.
|
||||||
try:
|
try:
|
||||||
var buf = newSeq[byte](4)
|
var buf = newSeq[byte](4)
|
||||||
|
@ -201,7 +202,7 @@ proc readMessage(sconn: SecureConnection): Future[seq[byte]] {.async.} =
|
||||||
except AsyncStreamReadError:
|
except AsyncStreamReadError:
|
||||||
trace "Error reading from connection"
|
trace "Error reading from connection"
|
||||||
|
|
||||||
proc writeMessage(sconn: SecureConnection, message: seq[byte]) {.async.} =
|
method writeMessage(sconn: SecioConn, message: seq[byte]) {.async.} =
|
||||||
## Write message ``message`` to secure connection ``sconn``.
|
## Write message ``message`` to secure connection ``sconn``.
|
||||||
let macsize = sconn.writerMac.sizeDigest()
|
let macsize = sconn.writerMac.sizeDigest()
|
||||||
var msg = newSeq[byte](len(message) + 4 + macsize)
|
var msg = newSeq[byte](len(message) + 4 + macsize)
|
||||||
|
@ -221,12 +222,12 @@ proc writeMessage(sconn: SecureConnection, message: seq[byte]) {.async.} =
|
||||||
except AsyncStreamWriteError:
|
except AsyncStreamWriteError:
|
||||||
trace "Could not write to connection"
|
trace "Could not write to connection"
|
||||||
|
|
||||||
proc newSecureConnection(conn: Connection,
|
proc newSecioConn(conn: Connection,
|
||||||
hash: string,
|
hash: string,
|
||||||
cipher: string,
|
cipher: string,
|
||||||
secrets: Secret,
|
secrets: Secret,
|
||||||
order: int,
|
order: int,
|
||||||
remotePubKey: PublicKey): SecureConnection =
|
remotePubKey: PublicKey): SecioConn =
|
||||||
## Create new secure connection, using specified hash algorithm ``hash``,
|
## Create new secure connection, using specified hash algorithm ``hash``,
|
||||||
## cipher algorithm ``cipher``, stretched keys ``secrets`` and order
|
## cipher algorithm ``cipher``, stretched keys ``secrets`` and order
|
||||||
## ``order``.
|
## ``order``.
|
||||||
|
@ -280,7 +281,7 @@ proc transactMessage(conn: Connection,
|
||||||
except AsyncStreamWriteError:
|
except AsyncStreamWriteError:
|
||||||
trace "Could not write to connection", conn = $conn
|
trace "Could not write to connection", conn = $conn
|
||||||
|
|
||||||
proc handshake(s: Secio, conn: Connection): Future[SecureConnection] {.async.} =
|
method handshake*(s: Secio, conn: Connection): Future[SecureConn] {.async.} =
|
||||||
var
|
var
|
||||||
localNonce: array[SecioNonceSize, byte]
|
localNonce: array[SecioNonceSize, byte]
|
||||||
remoteNonce: seq[byte]
|
remoteNonce: seq[byte]
|
||||||
|
@ -403,9 +404,10 @@ proc handshake(s: Secio, conn: Connection): Future[SecureConnection] {.async.} =
|
||||||
|
|
||||||
# Perform Nonce exchange over encrypted channel.
|
# Perform Nonce exchange over encrypted channel.
|
||||||
|
|
||||||
result = newSecureConnection(conn, hash, cipher, keys, order, remotePubkey)
|
var secioConn = newSecioConn(conn, hash, cipher, keys, order, remotePubkey)
|
||||||
await result.writeMessage(remoteNonce)
|
result = secioConn
|
||||||
var res = await result.readMessage()
|
await secioConn.writeMessage(remoteNonce)
|
||||||
|
var res = await secioConn.readMessage()
|
||||||
|
|
||||||
if res != @localNonce:
|
if res != @localNonce:
|
||||||
trace "Nonce verification failed", receivedNonce = toHex(res),
|
trace "Nonce verification failed", receivedNonce = toHex(res),
|
||||||
|
@ -414,52 +416,9 @@ proc handshake(s: Secio, conn: Connection): Future[SecureConnection] {.async.} =
|
||||||
else:
|
else:
|
||||||
trace "Secure handshake succeeded"
|
trace "Secure handshake succeeded"
|
||||||
|
|
||||||
proc readLoop(sconn: SecureConnection, stream: BufferStream) {.async.} =
|
|
||||||
try:
|
|
||||||
while not sconn.closed:
|
|
||||||
let msg = await sconn.readMessage()
|
|
||||||
if msg.len == 0:
|
|
||||||
trace "stream EOF"
|
|
||||||
return
|
|
||||||
|
|
||||||
await stream.pushTo(msg)
|
|
||||||
except CatchableError as exc:
|
|
||||||
trace "exception occurred SecureConnection.readLoop", exc = exc.msg
|
|
||||||
finally:
|
|
||||||
if not sconn.closed:
|
|
||||||
await sconn.close()
|
|
||||||
trace "ending secio readLoop", isclosed = sconn.closed()
|
|
||||||
|
|
||||||
proc handleConn(s: Secio, conn: Connection): Future[Connection] {.async, gcsafe.} =
|
|
||||||
var sconn = await s.handshake(conn)
|
|
||||||
proc writeHandler(data: seq[byte]) {.async, gcsafe.} =
|
|
||||||
trace "sending encrypted bytes", bytes = data.toHex()
|
|
||||||
await sconn.writeMessage(data)
|
|
||||||
|
|
||||||
var stream = newBufferStream(writeHandler)
|
|
||||||
asyncCheck readLoop(sconn, stream)
|
|
||||||
result = newConnection(stream)
|
|
||||||
result.closeEvent.wait()
|
|
||||||
.addCallback do (udata: pointer):
|
|
||||||
trace "wrapped connection closed, closing upstream"
|
|
||||||
if not isNil(sconn) and not sconn.closed:
|
|
||||||
asyncCheck sconn.close()
|
|
||||||
|
|
||||||
result.peerInfo = PeerInfo.init(sconn.peerInfo.publicKey.get())
|
|
||||||
|
|
||||||
method init(s: Secio) {.gcsafe.} =
|
method init(s: Secio) {.gcsafe.} =
|
||||||
proc handle(conn: Connection, proto: string) {.async, gcsafe.} =
|
procCall Secure(s).init()
|
||||||
trace "handling connection"
|
|
||||||
try:
|
|
||||||
asyncCheck s.handleConn(conn)
|
|
||||||
trace "connection secured"
|
|
||||||
except CatchableError as exc:
|
|
||||||
if not conn.closed():
|
|
||||||
warn "securing connection failed", msg = exc.msg
|
|
||||||
await conn.close()
|
|
||||||
|
|
||||||
s.codec = SecioCodec
|
s.codec = SecioCodec
|
||||||
s.handler = handle
|
|
||||||
|
|
||||||
method secure*(s: Secio, conn: Connection): Future[Connection] {.async, gcsafe.} =
|
method secure*(s: Secio, conn: Connection): Future[Connection] {.async, gcsafe.} =
|
||||||
try:
|
try:
|
||||||
|
|
|
@ -7,14 +7,70 @@
|
||||||
## This file may not be copied, modified, or distributed except according to
|
## This file may not be copied, modified, or distributed except according to
|
||||||
## those terms.
|
## those terms.
|
||||||
|
|
||||||
|
import options
|
||||||
import chronos
|
import chronos
|
||||||
import ../protocol,
|
import chronicles
|
||||||
../../connection
|
import secureconn,
|
||||||
|
../protocol,
|
||||||
|
../../stream/bufferstream,
|
||||||
|
../../crypto/crypto,
|
||||||
|
../../connection,
|
||||||
|
../../peerinfo
|
||||||
|
|
||||||
type
|
type
|
||||||
Secure* = ref object of LPProtocol # base type for secure managers
|
Secure* = ref object of LPProtocol # base type for secure managers
|
||||||
|
|
||||||
method secure*(p: Secure, conn: Connection): Future[Connection] {.base.} =
|
method handshake(s: Secure, conn: Connection): Future[SecureConn] {.async, base.} =
|
||||||
|
doAssert(false, "Not implemented!")
|
||||||
|
|
||||||
|
proc readLoop(sconn: SecureConn, stream: BufferStream) {.async.} =
|
||||||
|
try:
|
||||||
|
while not sconn.closed:
|
||||||
|
let msg = await sconn.readMessage()
|
||||||
|
if msg.len == 0:
|
||||||
|
trace "stream EOF"
|
||||||
|
return
|
||||||
|
|
||||||
|
await stream.pushTo(msg)
|
||||||
|
except CatchableError as exc:
|
||||||
|
trace "exception occurred SecioConn.readLoop", exc = exc.msg
|
||||||
|
finally:
|
||||||
|
if not sconn.closed:
|
||||||
|
await sconn.close()
|
||||||
|
trace "ending secio readLoop", isclosed = sconn.closed()
|
||||||
|
|
||||||
|
method handleConn*(s: Secure, conn: Connection): Future[Connection] {.async, base, gcsafe.} =
|
||||||
|
var sconn = await s.handshake(conn)
|
||||||
|
proc writeHandler(data: seq[byte]) {.async, gcsafe.} =
|
||||||
|
trace "sending encrypted bytes", bytes = data.toHex()
|
||||||
|
await sconn.writeMessage(data)
|
||||||
|
|
||||||
|
var stream = newBufferStream(writeHandler)
|
||||||
|
asyncCheck readLoop(sconn, stream)
|
||||||
|
result = newConnection(stream)
|
||||||
|
result.closeEvent.wait()
|
||||||
|
.addCallback do (udata: pointer):
|
||||||
|
trace "wrapped connection closed, closing upstream"
|
||||||
|
if not isNil(sconn) and not sconn.closed:
|
||||||
|
asyncCheck sconn.close()
|
||||||
|
|
||||||
|
result.peerInfo = PeerInfo.init(sconn.peerInfo.publicKey.get())
|
||||||
|
|
||||||
|
method init*(s: Secure) {.gcsafe.} =
|
||||||
|
proc handle(conn: Connection, proto: string) {.async, gcsafe.} =
|
||||||
|
trace "handling connection"
|
||||||
|
try:
|
||||||
|
asyncCheck s.handleConn(conn)
|
||||||
|
trace "connection secured"
|
||||||
|
except CatchableError as exc:
|
||||||
|
if not conn.closed():
|
||||||
|
warn "securing connection failed", msg = exc.msg
|
||||||
|
await conn.close()
|
||||||
|
|
||||||
|
s.handler = handle
|
||||||
|
|
||||||
|
method secure*(p: Secure, conn: Connection): Future[Connection]
|
||||||
|
{.base, async, gcsafe.} =
|
||||||
## default implementation matches plaintext
|
## default implementation matches plaintext
|
||||||
var retFuture = newFuture[Connection]("secure.secure")
|
var retFuture = newFuture[Connection]("secure.secure")
|
||||||
retFuture.complete(conn)
|
retFuture.complete(conn)
|
||||||
|
|
|
@ -0,0 +1,20 @@
|
||||||
|
## Nim-LibP2P
|
||||||
|
## Copyright (c) 2020 Status Research & Development GmbH
|
||||||
|
## Licensed under either of
|
||||||
|
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||||
|
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||||
|
## at your option.
|
||||||
|
## This file may not be copied, modified, or distributed except according to
|
||||||
|
## those terms.
|
||||||
|
|
||||||
|
import chronos
|
||||||
|
import ../../connection
|
||||||
|
|
||||||
|
type
|
||||||
|
SecureConn* = ref object of Connection
|
||||||
|
|
||||||
|
method readMessage*(c: SecureConn): Future[seq[byte]] {.async, base.} =
|
||||||
|
doAssert(false, "Not implemented!")
|
||||||
|
|
||||||
|
method writeMessage*(c: SecureConn, data: seq[byte]) {.async, base.} =
|
||||||
|
doAssert(false, "Not implemented!")
|
Loading…
Reference in New Issue