strenghten pubsub interop testing

This commit is contained in:
Dmitriy Ryajov 2019-12-23 12:45:12 -06:00
parent 8714c66353
commit 0fb1f1c5b8
6 changed files with 69 additions and 30 deletions

View File

@ -76,6 +76,10 @@ method rpcHandler*(f: FloodSub,
toSendPeers.incl(f.floodsub[t]) # get all the peers interested in this topic
if t in f.topics: # check that we're subscribed to it
for h in f.topics[t].handler:
trace "calling handler for message", msg = msg.msgId,
topicId = t,
localPeer = f.peerInfo.id,
fromPeer = msg.fromPeerId().pretty
await h(t, msg.data) # trigger user provided handler
# forward the message to all peers interested in it

View File

@ -191,6 +191,7 @@ method publish*(p: PubSub,
## publish to a ``topic``
if p.triggerSelf and topic in p.topics:
for h in p.topics[topic].handler:
trace "triggering handler", topicID = topic
await h(topic, data)
method initPubSub*(p: PubSub) {.base.} =

View File

@ -182,7 +182,7 @@ proc readMessage*(sconn: SecureConnection): Future[seq[byte]] {.async.} =
var buf = newSeq[byte](4)
await sconn.readExactly(addr buf[0], 4)
let length = (int(buf[0]) shl 24) or (int(buf[1]) shl 16) or
(int(buf[2]) shl 8) or (int(buf[3]))
(int(buf[2]) shl 8) or (int(buf[3]))
trace "Recieved message header", header = toHex(buf), length = length
if length <= SecioMaxMessageSize:
buf.setLen(length)
@ -261,12 +261,14 @@ proc transactMessage(conn: Connection,
await conn.write(msg)
await conn.readExactly(addr buf[0], 4)
let length = (int(buf[0]) shl 24) or (int(buf[1]) shl 16) or
(int(buf[2]) shl 8) or (int(buf[3]))
(int(buf[2]) shl 8) or (int(buf[3]))
trace "Recieved message header", header = toHex(buf), length = length
if length <= SecioMaxMessageSize:
buf.setLen(length)
await conn.readExactly(addr buf[0], length)
trace "Received message body", conn = conn, length = length
trace "Received message body", conn = conn,
length = length,
buff = buf
result = buf
else:
trace "Received size of message exceed limits", conn = conn,
@ -364,15 +366,15 @@ proc handshake*(s: Secio, conn: Connection): Future[SecureConnection] {.async.}
raise newException(SecioError, "Remote exchange decoding failed")
if not remoteESignature.init(remoteEBytesSig):
trace "Remote signature incorrect or corrupted",
signature = toHex(remoteEBytesSig)
trace "Remote signature incorrect or corrupted", signature = toHex(remoteEBytesSig)
raise newException(SecioError, "Remote signature incorrect or corrupted")
var remoteCorpus = answer & request[4..^1] & remoteEBytesPubkey
if not remoteESignature.verify(remoteCorpus, remotePubkey):
trace "Signature verification failed", scheme = remotePubkey.scheme,
signature = remoteESignature, pubkey = remotePubkey,
corpus = remoteCorpus
signature = remoteESignature,
pubkey = remotePubkey,
corpus = remoteCorpus
raise newException(SecioError, "Signature verification failed")
trace "Signature verified", scheme = remotePubkey.scheme
@ -402,7 +404,6 @@ proc handshake*(s: Secio, conn: Connection): Future[SecureConnection] {.async.}
# Perform Nonce exchange over encrypted channel.
result = newSecureConnection(conn, hash, cipher, keys, order, remotePubkey)
await result.writeMessage(remoteNonce)
var res = await result.readMessage()
@ -440,9 +441,9 @@ proc handleConn(s: Secio, conn: Connection): Future[Connection] {.async, gcsafe.
result = newConnection(stream)
result.closeEvent.wait()
.addCallback do (udata: pointer):
trace "wrapped connection closed, closing upstream"
if not isNil(sconn) and not sconn.closed:
asyncCheck sconn.close()
trace "wrapped connection closed, closing upstream"
if not isNil(sconn) and not sconn.closed:
asyncCheck sconn.close()
result.peerInfo = PeerInfo.init(sconn.peerInfo.publicKey.get())
@ -453,8 +454,9 @@ method init(s: Secio) {.gcsafe.} =
discard await s.handleConn(conn)
trace "connection secured"
except CatchableError as exc:
trace "securing connection failed", msg = exc.msg
await conn.close()
if not conn.closed():
warn "securing connection failed", msg = exc.msg
await conn.close()
s.codec = SecioCodec
s.handler = handle
@ -463,7 +465,8 @@ method secure*(s: Secio, conn: Connection): Future[Connection] {.async, gcsafe.}
try:
result = await s.handleConn(conn)
except CatchableError as exc:
trace "securing connection failed", msg = exc.msg
warn "securing connection failed", msg = exc.msg
if not conn.closed():
await conn.close()
proc newSecio*(localPrivateKey: PrivateKey): Secio =

View File

@ -11,7 +11,8 @@ export
proc newStandardSwitch*(privKey = none(PrivateKey),
address = MultiAddress.init("/ip4/127.0.0.1/tcp/0"),
triggerSelf = false, gossip = false): Switch =
triggerSelf = false,
gossip = false): Switch =
proc createMplex(conn: Connection): Muxer =
result = newMplex(conn)

View File

@ -18,7 +18,6 @@ import utils, ../../libp2p/[peer,
protocols/pubsub/gossipsub,
protocols/pubsub/rpc/messages]
proc createGossipSub(): GossipSub =
var peerInfo = PeerInfo.init(PrivateKey.random(RSA))
result = newPubSub(GossipSub, peerInfo)

View File

@ -93,7 +93,8 @@ proc createNode*(privKey: Option[PrivateKey] = none(PrivateKey),
secureManagers = secureManagers,
pubSub = pubSub)
proc testPubSubDaemonPublish(gossip: bool = false): Future[bool] {.async.} =
proc testPubSubDaemonPublish(gossip: bool = false, count: int = 1): Future[
bool] {.async.} =
var pubsubData = "TEST MESSAGE"
var testTopic = "test-topic"
var msgData = cast[seq[byte]](pubsubData)
@ -109,10 +110,13 @@ proc testPubSubDaemonPublish(gossip: bool = false): Future[bool] {.async.} =
let nativePeer = nativeNode.peerInfo
var handlerFuture = newFuture[bool]()
var times = 0
proc nativeHandler(topic: string, data: seq[byte]) {.async.} =
let smsg = cast[string](data)
check smsg == pubsubData
handlerFuture.complete(true)
times.inc()
if times >= count:
handlerFuture.complete(true)
await nativeNode.subscribeToPeer(NativePeerInfo.init(daemonPeer.peer,
daemonPeer.addresses))
@ -126,15 +130,18 @@ proc testPubSubDaemonPublish(gossip: bool = false): Future[bool] {.async.} =
asyncDiscard daemonNode.pubsubSubscribe(testTopic, pubsubHandler)
await nativeNode.subscribe(testTopic, nativeHandler)
await sleepAsync(1.seconds)
await daemonNode.pubsubPublish(testTopic, msgData)
while times < count:
await sleepAsync(1.seconds)
await daemonNode.pubsubPublish(testTopic, msgData)
await sleepAsync(100.millis)
result = await handlerFuture
await nativeNode.stop()
await allFutures(awaiters)
await daemonNode.close()
proc testPubSubNodePublish(gossip: bool = false): Future[bool] {.async.} =
proc testPubSubNodePublish(gossip: bool = false, count: int = 1): Future[
bool] {.async.} =
var pubsubData = "TEST MESSAGE"
var testTopic = "test-topic"
var msgData = cast[seq[byte]](pubsubData)
@ -156,17 +163,25 @@ proc testPubSubNodePublish(gossip: bool = false): Future[bool] {.async.} =
await sleepAsync(1.seconds)
await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)
var times = 0
proc pubsubHandler(api: DaemonAPI,
ticket: PubsubTicket,
message: PubSubMessage): Future[bool] {.async.} =
let smsg = cast[string](message.data)
check smsg == pubsubData
handlerFuture.complete(true)
times.inc()
if times >= count:
handlerFuture.complete(true)
result = true # don't cancel subscription
asyncDiscard daemonNode.pubsubSubscribe(testTopic, pubsubHandler)
discard await daemonNode.pubsubSubscribe(testTopic, pubsubHandler)
proc nativeHandler(topic: string, data: seq[byte]) {.async.} = discard
await nativeNode.subscribe(testTopic, nativeHandler)
await sleepAsync(1.seconds)
await nativeNode.publish(testTopic, msgData)
while times < count:
await sleepAsync(1.seconds)
await nativeNode.publish(testTopic, msgData)
await sleepAsync(100.millis)
result = await handlerFuture
await nativeNode.stop()
@ -370,18 +385,34 @@ suite "Interop":
check:
waitFor(runTests()) == true
test "floodsub: daemon publish":
test "floodsub: daemon publish one":
check:
waitFor(testPubSubDaemonPublish()) == true
test "gossipsub: daemon publish":
test "floodsub: daemon publish many":
check:
waitFor(testPubSubDaemonPublish(true)) == true
waitFor(testPubSubDaemonPublish(count = 10)) == true
test "floodsub: node publish":
test "gossipsub: daemon publish one":
check:
waitFor(testPubSubDaemonPublish(gossip = true)) == true
test "gossipsub: daemon publish many":
check:
waitFor(testPubSubDaemonPublish(gossip = true, count = 10)) == true
test "floodsub: node publish one":
check:
waitFor(testPubSubNodePublish()) == true
test "gossipsub: node publish":
test "floodsub: node publish many":
check:
waitFor(testPubSubNodePublish(true)) == true
waitFor(testPubSubNodePublish(count = 10)) == true
test "gossipsub: node publish one":
check:
waitFor(testPubSubNodePublish(gossip = true)) == true
test "gossipsub: node publish many":
check:
waitFor(testPubSubNodePublish(gossip = true, count = 10)) == true