add ping protocol (#584)
* add ping protocol * add ping protocol handler * ping styling * more ping tests * switch ping to bearssl rng * update ping style * new cancellation test
This commit is contained in:
parent
caac8191d2
commit
55a3606ecb
|
@ -0,0 +1,95 @@
|
|||
## Nim-LibP2P
|
||||
## Copyright (c) 2021 Status Research & Development GmbH
|
||||
## Licensed under either of
|
||||
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||
## at your option.
|
||||
## This file may not be copied, modified, or distributed except according to
|
||||
## those terms.
|
||||
|
||||
import chronos, chronicles, bearssl
|
||||
import ../protobuf/minprotobuf,
|
||||
../peerinfo,
|
||||
../stream/connection,
|
||||
../peerid,
|
||||
../crypto/crypto,
|
||||
../multiaddress,
|
||||
../protocols/protocol,
|
||||
../errors
|
||||
|
||||
logScope:
|
||||
topics = "libp2p ping"
|
||||
|
||||
const
|
||||
PingCodec* = "/ipfs/ping/1.0.0"
|
||||
PingSize = 32
|
||||
|
||||
type
|
||||
PingError* = object of LPError
|
||||
WrongPingAckError* = object of LPError
|
||||
|
||||
PingHandler* = proc (
|
||||
peer: PeerInfo):
|
||||
Future[void]
|
||||
{.gcsafe, raises: [Defect].}
|
||||
|
||||
Ping* = ref object of LPProtocol
|
||||
pingHandler*: PingHandler
|
||||
rng: ref BrHmacDrbgContext
|
||||
|
||||
proc new*(T: typedesc[Ping], handler: PingHandler = nil, rng: ref BrHmacDrbgContext = newRng()): T =
|
||||
let ping = Ping(pinghandler: handler, rng: rng)
|
||||
ping.init()
|
||||
ping
|
||||
|
||||
method init*(p: Ping) =
|
||||
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
|
||||
try:
|
||||
trace "handling ping", conn
|
||||
var buf: array[PingSize, byte]
|
||||
await conn.readExactly(addr buf[0], PingSize)
|
||||
trace "echoing ping", conn
|
||||
await conn.write(addr buf[0], PingSize)
|
||||
if not isNil(p.pingHandler):
|
||||
await p.pingHandler(conn.peerInfo)
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
trace "exception in ping handler", exc = exc.msg, conn
|
||||
|
||||
p.handler = handle
|
||||
p.codec = PingCodec
|
||||
|
||||
proc ping*(
|
||||
p: Ping,
|
||||
conn: Connection,
|
||||
): Future[Duration] {.async, gcsafe.} =
|
||||
## Sends ping to `conn`
|
||||
## Returns the delay
|
||||
##
|
||||
|
||||
trace "initiating ping", conn
|
||||
|
||||
var
|
||||
randomBuf: array[PingSize, byte]
|
||||
resultBuf: array[PingSize, byte]
|
||||
|
||||
p.rng[].brHmacDrbgGenerate(randomBuf)
|
||||
|
||||
let startTime = Moment.now()
|
||||
|
||||
trace "sending ping", conn
|
||||
await conn.write(addr randomBuf[0], randomBuf.len)
|
||||
|
||||
await conn.readExactly(addr resultBuf[0], PingSize)
|
||||
|
||||
let responseDur = Moment.now() - startTime
|
||||
|
||||
trace "got ping response", conn, responseDur
|
||||
|
||||
for i in 0..<randomBuf.len:
|
||||
if randomBuf[i] != resultBuf[i]:
|
||||
raise newException(WrongPingAckError, "Incorrect ping data from peer!")
|
||||
|
||||
trace "valid ping response", conn
|
||||
return responseDur
|
|
@ -0,0 +1,147 @@
|
|||
import options, bearssl
|
||||
import chronos, strutils
|
||||
import ../libp2p/[protocols/identify,
|
||||
protocols/ping,
|
||||
multiaddress,
|
||||
peerinfo,
|
||||
wire,
|
||||
peerid,
|
||||
stream/connection,
|
||||
multistream,
|
||||
transports/transport,
|
||||
transports/tcptransport,
|
||||
crypto/crypto,
|
||||
upgrademngrs/upgrade]
|
||||
import ./helpers
|
||||
|
||||
when defined(nimHasUsed): {.used.}
|
||||
|
||||
suite "Ping":
|
||||
teardown:
|
||||
checkTrackers()
|
||||
|
||||
suite "handle ping message":
|
||||
var
|
||||
ma {.threadvar.}: MultiAddress
|
||||
serverFut {.threadvar.}: Future[void]
|
||||
acceptFut {.threadvar.}: Future[void]
|
||||
pingProto1 {.threadvar.}: Ping
|
||||
pingProto2 {.threadvar.}: Ping
|
||||
transport1 {.threadvar.}: Transport
|
||||
transport2 {.threadvar.}: Transport
|
||||
msListen {.threadvar.}: MultistreamSelect
|
||||
msDial {.threadvar.}: MultistreamSelect
|
||||
conn {.threadvar.}: Connection
|
||||
pingReceivedCount {.threadvar.}: int
|
||||
|
||||
asyncSetup:
|
||||
ma = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
||||
|
||||
transport1 = TcpTransport.init(upgrade = Upgrade())
|
||||
transport2 = TcpTransport.init(upgrade = Upgrade())
|
||||
|
||||
proc handlePing(peer: PeerInfo) {.async, gcsafe, closure.} =
|
||||
inc pingReceivedCount
|
||||
pingProto1 = Ping.new()
|
||||
pingProto2 = Ping.new(handlePing)
|
||||
|
||||
msListen = newMultistream()
|
||||
msDial = newMultistream()
|
||||
|
||||
pingReceivedCount = 0
|
||||
|
||||
asyncTeardown:
|
||||
await conn.close()
|
||||
await acceptFut
|
||||
await transport1.stop()
|
||||
await serverFut
|
||||
await transport2.stop()
|
||||
|
||||
asyncTest "simple ping":
|
||||
msListen.addHandler(PingCodec, pingProto1)
|
||||
serverFut = transport1.start(ma)
|
||||
proc acceptHandler(): Future[void] {.async, gcsafe.} =
|
||||
let c = await transport1.accept()
|
||||
await msListen.handle(c)
|
||||
|
||||
acceptFut = acceptHandler()
|
||||
conn = await transport2.dial(transport1.ma)
|
||||
|
||||
discard await msDial.select(conn, PingCodec)
|
||||
let time = await pingProto2.ping(conn)
|
||||
|
||||
check not time.isZero()
|
||||
|
||||
asyncTest "networked cancel ping":
|
||||
proc testPing(): Future[void] {.async.} =
|
||||
let baseMa = Multiaddress.init("/ip4/127.0.0.1/tcp/0").tryGet()
|
||||
|
||||
let transport: TcpTransport = TcpTransport.init(upgrade = Upgrade())
|
||||
let transportdialer: TcpTransport = TcpTransport.init(upgrade = Upgrade())
|
||||
asyncCheck transport.start(baseMa)
|
||||
|
||||
proc acceptHandler() {.async, gcsafe.} =
|
||||
let handler = Ping.new().handler
|
||||
let conn = await transport.accept()
|
||||
await handler(conn, "na")
|
||||
|
||||
let handlerWait = acceptHandler()
|
||||
|
||||
let streamTransport = await transportdialer.dial(transport.ma)
|
||||
|
||||
discard await pingProto2.ping(streamTransport)
|
||||
|
||||
for pollCount in 0..20:
|
||||
#echo "Polling ", pollCount, " times"
|
||||
let p = testPing()
|
||||
for _ in 0..<pollCount:
|
||||
if p.finished: break
|
||||
poll()
|
||||
if p.finished: break #We actually finished the sequence
|
||||
await p.cancelAndWait()
|
||||
check p.cancelled
|
||||
|
||||
asyncTest "ping callback":
|
||||
msDial.addHandler(PingCodec, pingProto2)
|
||||
serverFut = transport1.start(ma)
|
||||
proc acceptHandler(): Future[void] {.async, gcsafe.} =
|
||||
let c = await transport1.accept()
|
||||
discard await msListen.select(c, PingCodec)
|
||||
discard await pingProto1.ping(c)
|
||||
|
||||
acceptFut = acceptHandler()
|
||||
conn = await transport2.dial(transport1.ma)
|
||||
|
||||
await msDial.handle(conn)
|
||||
check pingReceivedCount == 1
|
||||
|
||||
asyncTest "bad ping data ack":
|
||||
type FakePing = ref object of LPProtocol
|
||||
let fakePingProto = FakePing()
|
||||
proc fakeHandle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
|
||||
var
|
||||
buf: array[32, byte]
|
||||
fakebuf: array[32, byte]
|
||||
await conn.readExactly(addr buf[0], 32)
|
||||
await conn.write(addr fakebuf[0], 32)
|
||||
fakePingProto.codec = PingCodec
|
||||
fakePingProto.handler = fakeHandle
|
||||
|
||||
msListen.addHandler(PingCodec, fakePingProto)
|
||||
serverFut = transport1.start(ma)
|
||||
proc acceptHandler(): Future[void] {.async, gcsafe.} =
|
||||
let c = await transport1.accept()
|
||||
await msListen.handle(c)
|
||||
|
||||
acceptFut = acceptHandler()
|
||||
conn = await transport2.dial(transport1.ma)
|
||||
|
||||
discard await msDial.select(conn, PingCodec)
|
||||
let p = pingProto2.ping(conn)
|
||||
var raised = false
|
||||
try:
|
||||
discard await p
|
||||
check false #should have raised
|
||||
except WrongPingAckError:
|
||||
raised = true
|
||||
check raised
|
Loading…
Reference in New Issue