Attempt to fix tests.

This commit is contained in:
cheatfate 2018-12-10 22:55:06 +02:00
parent 337c6c932c
commit 01a268a440
2 changed files with 152 additions and 102 deletions

View File

@ -10,7 +10,7 @@
## This module implementes API for `go-libp2p-daemon`.
import os, osproc, strutils, tables, streams
import asyncdispatch2
import ../varint, ../multiaddress, ../protobuf/minprotobuf, transpool
import ../varint, ../multiaddress, ../protobuf/minprotobuf
when not defined(windows):
import posix
@ -97,7 +97,7 @@ type
address*: TransportAddress
DaemonAPI* = ref object
pool*: TransportPool
# pool*: TransportPool
flags*: set[P2PDaemonFlags]
address*: TransportAddress
sockname*: string
@ -470,6 +470,15 @@ proc recvMessage(conn: StreamTransport): Future[seq[byte]] {.async.} =
result = buffer
proc newConnection*(api: DaemonAPI): Future[StreamTransport] =
echo "Establish new connection to daemon [", $api.address, "]"
result = connect(api.address)
proc closeConnection*(api: DaemonAPI, transp: StreamTransport) {.async.} =
echo "Close connection with daemon [", $api.address, "]"
transp.close()
await transp.join()
proc socketExists(filename: string): bool =
var res: Stat
result = stat(filename, res) >= 0'i32
@ -485,6 +494,7 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {},
gossipsubHeartbeatDelay = 0): Future[DaemonAPI] {.async.} =
## Initialize connections to `go-libp2p-daemon` control socket.
var api = new DaemonAPI
var args = newSeq[string]()
api.flags = flags
api.servers = newSeq[P2PServer]()
api.pattern = pattern
@ -504,9 +514,8 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {},
# We will start daemon process only when control socket path is not default or
# options are specified.
if flags == {} and api.sockname == DefaultSocketPath:
api.pool = await newPool(initTAddress(api.sockname), poolsize = poolSize)
discard
else:
var args = newSeq[string]()
# DHTFull and DHTClient could not be present at the same time
if P2PDaemonFlags.DHTFull in flags and P2PDaemonFlags.DHTClient in flags:
api.flags.excl(DHTClient)
@ -523,15 +532,15 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {},
if P2PDaemonFlags.PSGossipSub in api.flags:
args.add("-pubsub")
args.add("-pubsubRouter=gossipsub")
if P2PDaemonFlags.PSFloodSub in api.flags:
args.add("-pubsub")
args.add("-pubsubRouter=floodsub")
if gossipsubHeartbeatInterval != 0:
let param = $gossipsubHeartbeatInterval & "ms"
args.add("-gossipsubHeartbeatInterval=" & param)
if gossipsubHeartbeatDelay != 0:
let param = $gossipsubHeartbeatDelay & "ms"
args.add("-gossipsubHeartbeatInitialDelay=" & param)
if P2PDaemonFlags.PSFloodSub in api.flags:
args.add("-pubsub")
args.add("-pubsubRouter=floodsub")
if api.flags * {P2PDaemonFlags.PSFloodSub, P2PDaemonFlags.PSFloodSub} != {}:
if P2PDaemonFlags.PSSign in api.flags:
args.add("-pubsubSign=true")
@ -543,31 +552,32 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {},
args.add("-id=" & id)
if api.sockname != DefaultSocketPath:
args.add("-sock=" & api.sockname)
# We are trying to get absolute daemon path.
let cmd = findExe(daemon)
if len(cmd) == 0:
raise newException(DaemonLocalError, "Could not find daemon executable!")
# We will try to remove control socket file, because daemon will fail
# if its not able to create new socket control file.
# We can't use `existsFile()` because it do not support unix-domain socket
# endpoints.
if socketExists(api.sockname):
if not tryRemoveFile(api.sockname):
if api.sockname != sockpath:
raise newException(DaemonLocalError, "Socket is already bound!")
# Starting daemon process
api.process = startProcess(cmd, "", args, options = {poStdErrToStdOut})
# Waiting until daemon will not be bound to control socket.
while true:
if not api.process.running():
echo api.process.errorStream.readAll()
raise newException(DaemonLocalError,
"Daemon executable could not be started!")
if socketExists(api.sockname):
break
await sleepAsync(100)
api.pool = await newPool(initTAddress(api.sockname), poolsize = poolSize)
# We are trying to get absolute daemon path.
let cmd = findExe(daemon)
if len(cmd) == 0:
raise newException(DaemonLocalError, "Could not find daemon executable!")
# We will try to remove control socket file, because daemon will fail
# if its not able to create new socket control file.
# We can't use `existsFile()` because it do not support unix-domain socket
# endpoints.
if socketExists(api.sockname):
if not tryRemoveFile(api.sockname):
if api.sockname != sockpath:
raise newException(DaemonLocalError, "Socket is already bound!")
# Starting daemon process
echo "Spawn [", cmd, " ", args.join(" "), "]"
api.process = startProcess(cmd, "", args, options = {poStdErrToStdOut})
# Waiting until daemon will not be bound to control socket.
while true:
if not api.process.running():
echo api.process.errorStream.readAll()
raise newException(DaemonLocalError,
"Daemon executable could not be started!")
if socketExists(api.sockname):
break
await sleepAsync(100)
# api.pool = await newPool(api.address, poolsize = poolSize)
result = api
proc close*(stream: P2PStream) {.async.} =
@ -582,7 +592,7 @@ proc close*(stream: P2PStream) {.async.} =
proc close*(api: DaemonAPI) {.async.} =
## Shutdown connections to `go-libp2p-daemon` control socket.
await api.pool.close()
# await api.pool.close()
# Closing all pending servers.
if len(api.servers) > 0:
var pending = newSeq[Future[void]]()
@ -636,7 +646,7 @@ proc getPeerInfo(pb: var ProtoBuffer): PeerInfo =
proc identity*(api: DaemonAPI): Future[PeerInfo] {.async.} =
## Get Node identity information
var transp = await api.pool.acquire()
var transp = await api.newConnection()
try:
var pb = await transactMessage(transp, requestIdentity())
pb.withMessage() do:
@ -644,37 +654,37 @@ proc identity*(api: DaemonAPI): Future[PeerInfo] {.async.} =
if res == cast[int](ResponseType.IDENTITY):
result = pb.getPeerInfo()
finally:
api.pool.release(transp)
await api.closeConnection(transp)
proc connect*(api: DaemonAPI, peer: PeerID,
addresses: seq[MultiAddress],
timeout = 0) {.async.} =
## Connect to remote peer with id ``peer`` and addresses ``addresses``.
var transp = await api.pool.acquire()
var transp = await api.newConnection()
try:
var pb = await transp.transactMessage(requestConnect(peer, addresses,
timeout))
pb.withMessage() do:
discard
finally:
api.pool.release(transp)
await api.closeConnection(transp)
proc disconnect*(api: DaemonAPI, peer: PeerID) {.async.} =
## Disconnect from remote peer with id ``peer``.
var transp = await api.pool.acquire()
var transp = await api.newConnection()
try:
var pb = await transp.transactMessage(requestDisconnect(peer))
pb.withMessage() do:
discard
finally:
api.pool.release(transp)
await api.closeConnection(transp)
proc openStream*(api: DaemonAPI, peer: PeerID,
protocols: seq[string],
timeout = 0): Future[P2PStream] {.async.} =
## Open new stream to peer ``peer`` using one of the protocols in
## ``protocols``. Returns ``StreamTransport`` for the stream.
var transp = await connect(api.address)
var transp = await api.newConnection()
var stream = new P2PStream
try:
var pb = await transp.transactMessage(requestStreamOpen(peer, protocols,
@ -696,8 +706,7 @@ proc openStream*(api: DaemonAPI, peer: PeerID,
stream.transp = transp
result = stream
except:
transp.close()
await transp.join()
await api.closeConnection(transp)
raise getCurrentException()
proc streamHandler(server: StreamServer, transp: StreamTransport) {.async.} =
@ -725,7 +734,7 @@ proc streamHandler(server: StreamServer, transp: StreamTransport) {.async.} =
proc addHandler*(api: DaemonAPI, protocols: seq[string],
handler: P2PStreamCallback) {.async.} =
## Add stream handler ``handler`` for set of protocols ``protocols``.
var transp = await api.pool.acquire()
var transp = await api.newConnection()
var sockname = api.pattern % [$api.ucounter]
var localaddr = initTAddress(sockname)
inc(api.ucounter)
@ -746,11 +755,11 @@ proc addHandler*(api: DaemonAPI, protocols: seq[string],
await server.join()
raise getCurrentException()
finally:
api.pool.release(transp)
await api.closeConnection(transp)
proc listPeers*(api: DaemonAPI): Future[seq[PeerInfo]] {.async.} =
## Get list of remote peers to which we are currently connected.
var transp = await api.pool.acquire()
var transp = await api.newConnection()
try:
var pb = await transp.transactMessage(requestListPeers())
pb.withMessage() do:
@ -765,38 +774,38 @@ proc listPeers*(api: DaemonAPI): Future[seq[PeerInfo]] {.async.} =
pb.skipSubmessage()
res = pb.enterSubmessage()
finally:
api.pool.release(transp)
await api.closeConnection(transp)
proc cmTagPeer*(api: DaemonAPI, peer: PeerID, tag: string,
weight: int) {.async.} =
## Tag peer with id ``peer`` using ``tag`` and ``weight``.
var transp = await api.pool.acquire()
var transp = await api.newConnection()
try:
var pb = await transp.transactMessage(requestCMTagPeer(peer, tag, weight))
withMessage(pb) do:
discard
finally:
api.pool.release(transp)
await api.closeConnection(transp)
proc cmUntagPeer*(api: DaemonAPI, peer: PeerID, tag: string) {.async.} =
## Remove tag ``tag`` from peer with id ``peer``.
var transp = await api.pool.acquire()
var transp = await api.newConnection()
try:
var pb = await transp.transactMessage(requestCMUntagPeer(peer, tag))
withMessage(pb) do:
discard
finally:
api.pool.release(transp)
await api.closeConnection(transp)
proc cmTrimPeers*(api: DaemonAPI) {.async.} =
## Trim all connections.
var transp = await api.pool.acquire()
var transp = await api.newConnection()
try:
var pb = await transp.transactMessage(requestCMTrim())
withMessage(pb) do:
discard
finally:
api.pool.release(transp)
await api.closeConnection(transp)
proc dhtGetSinglePeerInfo(pb: var ProtoBuffer): PeerInfo =
if pb.enterSubmessage() == 2:
@ -842,14 +851,14 @@ proc dhtFindPeer*(api: DaemonAPI, peer: PeerID,
##
## You can specify timeout for DHT request with ``timeout`` value. ``0`` value
## means no timeout.
var transp = await api.pool.acquire()
var transp = await api.newConnection()
try:
var pb = await transp.transactMessage(requestDHTFindPeer(peer, timeout))
withMessage(pb) do:
pb.enterDhtMessage(DHTResponseType.VALUE)
result = pb.dhtGetSinglePeerInfo()
finally:
api.pool.release(transp)
await api.closeConnection(transp)
proc dhtGetPublicKey*(api: DaemonAPI, peer: PeerID,
timeout = 0): Future[LibP2PPublicKey] {.async.} =
@ -857,14 +866,14 @@ proc dhtGetPublicKey*(api: DaemonAPI, peer: PeerID,
##
## You can specify timeout for DHT request with ``timeout`` value. ``0`` value
## means no timeout.
var transp = await api.pool.acquire()
var transp = await api.newConnection()
try:
var pb = await transp.transactMessage(requestDHTGetPublicKey(peer, timeout))
withMessage(pb) do:
pb.enterDhtMessage(DHTResponseType.VALUE)
result = pb.dhtGetSingleValue()
finally:
api.pool.release(transp)
await api.closeConnection(transp)
proc dhtGetValue*(api: DaemonAPI, key: string,
timeout = 0): Future[seq[byte]] {.async.} =
@ -872,14 +881,14 @@ proc dhtGetValue*(api: DaemonAPI, key: string,
##
## You can specify timeout for DHT request with ``timeout`` value. ``0`` value
## means no timeout.
var transp = await api.pool.acquire()
var transp = await api.newConnection()
try:
var pb = await transp.transactMessage(requestDHTGetValue(key, timeout))
withMessage(pb) do:
pb.enterDhtMessage(DHTResponseType.VALUE)
result = pb.dhtGetSingleValue()
finally:
api.pool.release(transp)
await api.closeConnection(transp)
proc dhtPutValue*(api: DaemonAPI, key: string, value: seq[byte],
timeout = 0) {.async.} =
@ -887,27 +896,27 @@ proc dhtPutValue*(api: DaemonAPI, key: string, value: seq[byte],
##
## You can specify timeout for DHT request with ``timeout`` value. ``0`` value
## means no timeout.
var transp = await api.pool.acquire()
var transp = await api.newConnection()
try:
var pb = await transp.transactMessage(requestDHTPutValue(key, value,
timeout))
withMessage(pb) do:
discard
finally:
api.pool.release(transp)
await api.closeConnection(transp)
proc dhtProvide*(api: DaemonAPI, cid: CID, timeout = 0) {.async.} =
## Provide content with id ``cid``.
##
## You can specify timeout for DHT request with ``timeout`` value. ``0`` value
## means no timeout.
var transp = await api.pool.acquire()
var transp = await api.newConnection()
try:
var pb = await transp.transactMessage(requestDHTProvide(cid, timeout))
withMessage(pb) do:
discard
finally:
api.pool.release(transp)
await api.closeConnection(transp)
proc dhtFindPeersConnectedToPeer*(api: DaemonAPI, peer: PeerID,
timeout = 0): Future[seq[PeerInfo]] {.async.} =
@ -915,7 +924,7 @@ proc dhtFindPeersConnectedToPeer*(api: DaemonAPI, peer: PeerID,
##
## You can specify timeout for DHT request with ``timeout`` value. ``0`` value
## means no timeout.
var transp = await api.pool.acquire()
var transp = await api.newConnection()
var list = newSeq[PeerInfo]()
try:
let spb = requestDHTFindPeersConnectedToPeer(peer, timeout)
@ -932,7 +941,7 @@ proc dhtFindPeersConnectedToPeer*(api: DaemonAPI, peer: PeerID,
list.add(cpb.dhtGetSinglePeerInfo())
result = list
finally:
api.pool.release(transp)
await api.closeConnection(transp)
proc dhtGetClosestPeers*(api: DaemonAPI, key: string,
timeout = 0): Future[seq[PeerID]] {.async.} =
@ -940,7 +949,7 @@ proc dhtGetClosestPeers*(api: DaemonAPI, key: string,
##
## You can specify timeout for DHT request with ``timeout`` value. ``0`` value
## means no timeout.
var transp = await api.pool.acquire()
var transp = await api.newConnection()
var list = newSeq[PeerID]()
try:
let spb = requestDHTGetClosestPeers(key, timeout)
@ -957,7 +966,7 @@ proc dhtGetClosestPeers*(api: DaemonAPI, key: string,
list.add(cpb.dhtGetSingleValue())
result = list
finally:
api.pool.release(transp)
await api.closeConnection(transp)
proc dhtFindProviders*(api: DaemonAPI, cid: CID, count: uint32,
timeout = 0): Future[seq[PeerInfo]] {.async.} =
@ -965,7 +974,7 @@ proc dhtFindProviders*(api: DaemonAPI, cid: CID, count: uint32,
##
## You can specify timeout for DHT request with ``timeout`` value. ``0`` value
## means no timeout.
var transp = await api.pool.acquire()
var transp = await api.newConnection()
var list = newSeq[PeerInfo]()
try:
let spb = requestDHTFindProviders(cid, count, timeout)
@ -982,7 +991,7 @@ proc dhtFindProviders*(api: DaemonAPI, cid: CID, count: uint32,
list.add(cpb.dhtGetSinglePeerInfo())
result = list
finally:
api.pool.release(transp)
await api.closeConnection(transp)
proc dhtSearchValue*(api: DaemonAPI, key: string,
timeout = 0): Future[seq[seq[byte]]] {.async.} =
@ -990,7 +999,7 @@ proc dhtSearchValue*(api: DaemonAPI, key: string,
##
## You can specify timeout for DHT request with ``timeout`` value. ``0`` value
## means no timeout.
var transp = await api.pool.acquire()
var transp = await api.newConnection()
var list = newSeq[seq[byte]]()
try:
var pb = await transp.transactMessage(requestDHTSearchValue(key, timeout))
@ -1006,11 +1015,13 @@ proc dhtSearchValue*(api: DaemonAPI, key: string,
list.add(cpb.dhtGetSingleValue())
result = list
finally:
api.pool.release(transp)
await api.closeConnection(transp)
proc pubsubGetTopics*(api: DaemonAPI): Future[seq[string]] {.async.} =
## Get list of topics this node is subscribed to.
var transp = await api.pool.acquire()
# var transp = await api.pool.acquire()
echo "pubsubGetTopics()"
var transp = await api.newConnection()
try:
var pb = await transp.transactMessage(requestPSGetTopics())
withMessage(pb) do:
@ -1022,13 +1033,16 @@ proc pubsubGetTopics*(api: DaemonAPI): Future[seq[string]] {.async.} =
topic.setLen(0)
result = topics
finally:
api.pool.release(transp)
await api.closeConnection(transp)
# api.pool.release(transp)
proc pubsubListPeers*(api: DaemonAPI,
topic: string): Future[seq[PeerID]] {.async.} =
## Get list of peers we are connected to and which also subscribed to topic
## ``topic``.
var transp = await api.pool.acquire()
# var transp = await api.pool.acquire()
echo "pubsubListPeers()"
var transp = await api.newConnection()
try:
var pb = await transp.transactMessage(requestPSListPeers(topic))
withMessage(pb) do:
@ -1040,18 +1054,22 @@ proc pubsubListPeers*(api: DaemonAPI,
peer.setLen(0)
result = peers
finally:
api.pool.release(transp)
await api.closeConnection(transp)
# api.pool.release(transp)
proc pubsubPublish*(api: DaemonAPI, topic: string,
value: seq[byte]) {.async.} =
## Get list of peer identifiers which are subscribed to topic ``topic``.
var transp = await api.pool.acquire()
# var transp = await api.pool.acquire()
echo "pubsubPublish()"
var transp = await api.newConnection()
try:
var pb = await transp.transactMessage(requestPSPublish(topic, value))
withMessage(pb) do:
discard
finally:
api.pool.release(transp)
await api.closeConnection(transp)
# api.pool.release(transp)
proc getPubsubMessage*(pb: var ProtoBuffer): PubSubMessage =
var item = newSeq[byte]()
@ -1094,7 +1112,8 @@ proc pubsubLoop(api: DaemonAPI, ticket: PubsubTicket) {.async.} =
proc pubsubSubscribe*(api: DaemonAPI, topic: string,
handler: P2PPubSubCallback): Future[PubsubTicket] {.async.} =
## Subscribe to topic ``topic``.
var transp = await connect(api.address)
echo "pubsubSubscribe()"
var transp = await api.newConnection()
try:
var pb = await transp.transactMessage(requestPSSubscribe(topic))
pb.withMessage() do:

View File

@ -3,14 +3,14 @@ import asyncdispatch2
import ../libp2p/daemon/daemonapi
proc identitySpawnTest(): Future[bool] {.async.} =
var api = await newDaemonApi(sockpath = "/tmp/p2pd-1.sock")
var api = await newDaemonApi()
var data = await api.identity()
await api.close()
result = true
proc connectStreamTest(): Future[bool] {.async.} =
var api1 = await newDaemonApi(sockpath = "/tmp/p2pd-1.sock")
var api2 = await newDaemonApi(sockpath = "/tmp/p2pd-2.sock")
var api1 = await newDaemonApi()
var api2 = await newDaemonApi()
var id1 = await api1.identity()
var id2 = await api2.identity()
@ -26,6 +26,7 @@ proc connectStreamTest(): Future[bool] {.async.} =
await api2.addHandler(protos, streamHandler)
await api1.connect(id2.peer, id2.addresses)
echo await api1.listPeers()
var stream = await api1.openStream(id2.peer, protos)
let sent = await stream.transp.write(test & "\r\n")
doAssert(sent == len(test) + 2)
@ -50,8 +51,15 @@ proc provideBadCidTest(): Future[bool] {.async.} =
proc pubsubTest(f: set[P2PDaemonFlags]): Future[bool] {.async.} =
var pubsubData = "TEST MESSAGE"
var msgData = cast[seq[byte]](pubsubData)
var api1 = await newDaemonApi(f)
var api2 = await newDaemonApi(f)
var api1, api2: DaemonAPI
if PSGossipSub in f:
api1 = await newDaemonApi(f, gossipsubHeartbeatInterval = 100,
gossipsubHeartbeatDelay = 100)
api2 = await newDaemonApi(f, gossipsubHeartbeatInterval = 100,
gossipsubHeartbeatDelay = 100)
else:
api1 = await newDaemonApi(f)
api2 = await newDaemonApi(f)
var id1 = await api1.identity()
var id2 = await api2.identity()
@ -60,23 +68,31 @@ proc pubsubTest(f: set[P2PDaemonFlags]): Future[bool] {.async.} =
var topics10 = await api1.pubsubGetTopics()
var peers10 = await api1.pubsubListPeers("test-topic")
var topics20 = await api1.pubsubGetTopics()
var peers20 = await api1.pubsubListPeers("test-topic")
var topics20 = await api2.pubsubGetTopics()
var peers20 = await api2.pubsubListPeers("test-topic")
var handlerFuture1 = newFuture[void]()
var handlerFuture2 = newFuture[void]()
proc pubsubHandler1(api: DaemonAPI,
ticket: PubsubTicket,
message: PubSubMessage): Future[bool] {.async.} =
echo "handler1"
let smsg = cast[string](message.data)
if smsg == pubsubData:
inc(resultsCount)
handlerFuture1.complete()
# Callback must return `false` to close subscription channel.
result = false
proc pubsubHandler2(api: DaemonAPI,
ticket: PubsubTicket,
message: PubSubMessage): Future[bool] {.async.} =
echo "handler2"
let smsg = cast[string](message.data)
if smsg == pubsubData:
inc(resultsCount)
handlerFuture2.complete()
# Callback must return `false` to close subscription channel.
result = false
@ -84,31 +100,46 @@ proc pubsubTest(f: set[P2PDaemonFlags]): Future[bool] {.async.} =
len(topics20) == 0 and len(peers20) == 0:
# Not subscribed to any topics everything must be 0.
await api1.connect(id2.peer, id2.addresses)
await api2.connect(id1.peer, id1.addresses)
var gpeers1 = await api1.listPeers()
var gpeers2 = await api2.listPeers()
echo "globalPeers1 = ", gpeers1
echo "globalPeers2 = ", gpeers2
var ticket1 = await api1.pubsubSubscribe("test-topic", pubsubHandler1)
var ticket2 = await api2.pubsubSubscribe("test-topic", pubsubHandler2)
var topics11 = await api1.pubsubGetTopics()
var peers11 = await api1.pubsubListPeers("test-topic")
var topics21 = await api2.pubsubGetTopics()
var peers21 = await api2.pubsubListPeers("test-topic")
var topics1 = await api1.pubsubGetTopics()
var topics2 = await api2.pubsubGetTopics()
if len(topics11) == 1 and len(peers11) == 0 and
len(topics21) == 1 and len(peers21) == 0:
# Subscribed but not yet connected to each other
await sleepAsync(5000)
await api1.connect(id2.peer, id2.addresses)
await sleepAsync(5000)
if len(topics1) == 1 and len(topics2) == 1:
echo "connecting"
var topics12 = await api1.pubsubGetTopics()
var peers12 = await api1.pubsubListPeers("test-topic")
var topics22 = await api2.pubsubGetTopics()
var peers22 = await api2.pubsubListPeers("test-topic")
while true:
var peers1 = await api1.pubsubListPeers("test-topic")
var peers2 = await api2.pubsubListPeers("test-topic")
if len(peers1) == 1 and len(peers2) == 1:
break
echo "pubsubPeers1 = ", peers1
echo "pubsubPeers2 = ", peers2
# var gpeers1 = await api1.listPeers()
# var gpeers2 = await api2.listPeers()
# echo "globalPeers1 = ", gpeers1
# echo "globalPeers2 = ", gpeers2
await sleepAsync(500)
if len(topics12) == 1 and len(peers12) == 1 and
len(topics22) == 1 and len(peers22) == 1:
# Publish test data via api1.
await api1.pubsubPublish("test-topic", msgData)
await sleepAsync(5000)
# if len(topics12) == 1 and len(peers12) == 1 and
# len(topics22) == 1 and len(peers22) == 1:
echo "Publishing"
# Publish test data via api1.
await sleepAsync(500)
await api1.pubsubPublish("test-topic", msgData)
var andfut = handlerFuture1 and handlerFuture2
await andfut or sleepAsync(10000)
await api1.close()
await api2.close()
if resultsCount == 2: