diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index 73916e1..bb99af0 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -76,6 +76,10 @@ method rpcHandler*(f: FloodSub, toSendPeers.incl(f.floodsub[t]) # get all the peers interested in this topic if t in f.topics: # check that we're subscribed to it for h in f.topics[t].handler: + trace "calling handler for message", msg = msg.msgId, + topicId = t, + localPeer = f.peerInfo.id, + fromPeer = msg.fromPeerId().pretty await h(t, msg.data) # trigger user provided handler # forward the message to all peers interested in it diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index e925079..0db63b0 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -191,6 +191,7 @@ method publish*(p: PubSub, ## publish to a ``topic`` if p.triggerSelf and topic in p.topics: for h in p.topics[topic].handler: + trace "triggering handler", topicID = topic await h(topic, data) method initPubSub*(p: PubSub) {.base.} = diff --git a/libp2p/protocols/secure/secio.nim b/libp2p/protocols/secure/secio.nim index 55ea089..9d1f649 100644 --- a/libp2p/protocols/secure/secio.nim +++ b/libp2p/protocols/secure/secio.nim @@ -182,7 +182,7 @@ proc readMessage*(sconn: SecureConnection): Future[seq[byte]] {.async.} = var buf = newSeq[byte](4) await sconn.readExactly(addr buf[0], 4) let length = (int(buf[0]) shl 24) or (int(buf[1]) shl 16) or - (int(buf[2]) shl 8) or (int(buf[3])) + (int(buf[2]) shl 8) or (int(buf[3])) trace "Recieved message header", header = toHex(buf), length = length if length <= SecioMaxMessageSize: buf.setLen(length) @@ -261,12 +261,14 @@ proc transactMessage(conn: Connection, await conn.write(msg) await conn.readExactly(addr buf[0], 4) let length = (int(buf[0]) shl 24) or (int(buf[1]) shl 16) or - (int(buf[2]) shl 8) or (int(buf[3])) + (int(buf[2]) shl 8) or (int(buf[3])) trace "Recieved message header", header = toHex(buf), length = length if length <= SecioMaxMessageSize: buf.setLen(length) await conn.readExactly(addr buf[0], length) - trace "Received message body", conn = conn, length = length + trace "Received message body", conn = conn, + length = length, + buff = buf result = buf else: trace "Received size of message exceed limits", conn = conn, @@ -364,15 +366,15 @@ proc handshake*(s: Secio, conn: Connection): Future[SecureConnection] {.async.} raise newException(SecioError, "Remote exchange decoding failed") if not remoteESignature.init(remoteEBytesSig): - trace "Remote signature incorrect or corrupted", - signature = toHex(remoteEBytesSig) + trace "Remote signature incorrect or corrupted", signature = toHex(remoteEBytesSig) raise newException(SecioError, "Remote signature incorrect or corrupted") var remoteCorpus = answer & request[4..^1] & remoteEBytesPubkey if not remoteESignature.verify(remoteCorpus, remotePubkey): trace "Signature verification failed", scheme = remotePubkey.scheme, - signature = remoteESignature, pubkey = remotePubkey, - corpus = remoteCorpus + signature = remoteESignature, + pubkey = remotePubkey, + corpus = remoteCorpus raise newException(SecioError, "Signature verification failed") trace "Signature verified", scheme = remotePubkey.scheme @@ -402,7 +404,6 @@ proc handshake*(s: Secio, conn: Connection): Future[SecureConnection] {.async.} # Perform Nonce exchange over encrypted channel. result = newSecureConnection(conn, hash, cipher, keys, order, remotePubkey) - await result.writeMessage(remoteNonce) var res = await result.readMessage() @@ -440,9 +441,9 @@ proc handleConn(s: Secio, conn: Connection): Future[Connection] {.async, gcsafe. result = newConnection(stream) result.closeEvent.wait() .addCallback do (udata: pointer): - trace "wrapped connection closed, closing upstream" - if not isNil(sconn) and not sconn.closed: - asyncCheck sconn.close() + trace "wrapped connection closed, closing upstream" + if not isNil(sconn) and not sconn.closed: + asyncCheck sconn.close() result.peerInfo = PeerInfo.init(sconn.peerInfo.publicKey.get()) @@ -453,8 +454,9 @@ method init(s: Secio) {.gcsafe.} = discard await s.handleConn(conn) trace "connection secured" except CatchableError as exc: - trace "securing connection failed", msg = exc.msg - await conn.close() + if not conn.closed(): + warn "securing connection failed", msg = exc.msg + await conn.close() s.codec = SecioCodec s.handler = handle @@ -463,7 +465,8 @@ method secure*(s: Secio, conn: Connection): Future[Connection] {.async, gcsafe.} try: result = await s.handleConn(conn) except CatchableError as exc: - trace "securing connection failed", msg = exc.msg + warn "securing connection failed", msg = exc.msg + if not conn.closed(): await conn.close() proc newSecio*(localPrivateKey: PrivateKey): Secio = diff --git a/libp2p/standard_setup.nim b/libp2p/standard_setup.nim index 30faa07..f2809ee 100644 --- a/libp2p/standard_setup.nim +++ b/libp2p/standard_setup.nim @@ -11,7 +11,8 @@ export proc newStandardSwitch*(privKey = none(PrivateKey), address = MultiAddress.init("/ip4/127.0.0.1/tcp/0"), - triggerSelf = false, gossip = false): Switch = + triggerSelf = false, + gossip = false): Switch = proc createMplex(conn: Connection): Muxer = result = newMplex(conn) diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 958b68d..b6219bd 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -18,7 +18,6 @@ import utils, ../../libp2p/[peer, protocols/pubsub/gossipsub, protocols/pubsub/rpc/messages] - proc createGossipSub(): GossipSub = var peerInfo = PeerInfo.init(PrivateKey.random(RSA)) result = newPubSub(GossipSub, peerInfo) diff --git a/tests/testinterop.nim b/tests/testinterop.nim index 76086f7..f4c8618 100644 --- a/tests/testinterop.nim +++ b/tests/testinterop.nim @@ -93,7 +93,8 @@ proc createNode*(privKey: Option[PrivateKey] = none(PrivateKey), secureManagers = secureManagers, pubSub = pubSub) -proc testPubSubDaemonPublish(gossip: bool = false): Future[bool] {.async.} = +proc testPubSubDaemonPublish(gossip: bool = false, count: int = 1): Future[ + bool] {.async.} = var pubsubData = "TEST MESSAGE" var testTopic = "test-topic" var msgData = cast[seq[byte]](pubsubData) @@ -109,10 +110,13 @@ proc testPubSubDaemonPublish(gossip: bool = false): Future[bool] {.async.} = let nativePeer = nativeNode.peerInfo var handlerFuture = newFuture[bool]() + var times = 0 proc nativeHandler(topic: string, data: seq[byte]) {.async.} = let smsg = cast[string](data) check smsg == pubsubData - handlerFuture.complete(true) + times.inc() + if times >= count: + handlerFuture.complete(true) await nativeNode.subscribeToPeer(NativePeerInfo.init(daemonPeer.peer, daemonPeer.addresses)) @@ -126,15 +130,18 @@ proc testPubSubDaemonPublish(gossip: bool = false): Future[bool] {.async.} = asyncDiscard daemonNode.pubsubSubscribe(testTopic, pubsubHandler) await nativeNode.subscribe(testTopic, nativeHandler) - await sleepAsync(1.seconds) - await daemonNode.pubsubPublish(testTopic, msgData) + while times < count: + await sleepAsync(1.seconds) + await daemonNode.pubsubPublish(testTopic, msgData) + await sleepAsync(100.millis) result = await handlerFuture await nativeNode.stop() await allFutures(awaiters) await daemonNode.close() -proc testPubSubNodePublish(gossip: bool = false): Future[bool] {.async.} = +proc testPubSubNodePublish(gossip: bool = false, count: int = 1): Future[ + bool] {.async.} = var pubsubData = "TEST MESSAGE" var testTopic = "test-topic" var msgData = cast[seq[byte]](pubsubData) @@ -156,17 +163,25 @@ proc testPubSubNodePublish(gossip: bool = false): Future[bool] {.async.} = await sleepAsync(1.seconds) await daemonNode.connect(nativePeer.peerId, nativePeer.addrs) + var times = 0 proc pubsubHandler(api: DaemonAPI, ticket: PubsubTicket, message: PubSubMessage): Future[bool] {.async.} = let smsg = cast[string](message.data) check smsg == pubsubData - handlerFuture.complete(true) + times.inc() + if times >= count: + handlerFuture.complete(true) result = true # don't cancel subscription - asyncDiscard daemonNode.pubsubSubscribe(testTopic, pubsubHandler) + discard await daemonNode.pubsubSubscribe(testTopic, pubsubHandler) + proc nativeHandler(topic: string, data: seq[byte]) {.async.} = discard + await nativeNode.subscribe(testTopic, nativeHandler) await sleepAsync(1.seconds) - await nativeNode.publish(testTopic, msgData) + while times < count: + await sleepAsync(1.seconds) + await nativeNode.publish(testTopic, msgData) + await sleepAsync(100.millis) result = await handlerFuture await nativeNode.stop() @@ -370,18 +385,34 @@ suite "Interop": check: waitFor(runTests()) == true - test "floodsub: daemon publish": + test "floodsub: daemon publish one": check: waitFor(testPubSubDaemonPublish()) == true - test "gossipsub: daemon publish": + test "floodsub: daemon publish many": check: - waitFor(testPubSubDaemonPublish(true)) == true + waitFor(testPubSubDaemonPublish(count = 10)) == true - test "floodsub: node publish": + test "gossipsub: daemon publish one": + check: + waitFor(testPubSubDaemonPublish(gossip = true)) == true + + test "gossipsub: daemon publish many": + check: + waitFor(testPubSubDaemonPublish(gossip = true, count = 10)) == true + + test "floodsub: node publish one": check: waitFor(testPubSubNodePublish()) == true - test "gossipsub: node publish": + test "floodsub: node publish many": check: - waitFor(testPubSubNodePublish(true)) == true + waitFor(testPubSubNodePublish(count = 10)) == true + + test "gossipsub: node publish one": + check: + waitFor(testPubSubNodePublish(gossip = true)) == true + + test "gossipsub: node publish many": + check: + waitFor(testPubSubNodePublish(gossip = true, count = 10)) == true