diff --git a/libp2p/daemon/daemonapi.nim b/libp2p/daemon/daemonapi.nim index 3c4f13b59..4c3d57835 100644 --- a/libp2p/daemon/daemonapi.nim +++ b/libp2p/daemon/daemonapi.nim @@ -10,7 +10,7 @@ ## This module implementes API for `go-libp2p-daemon`. import os, osproc, strutils, tables, streams import asyncdispatch2 -import ../varint, ../multiaddress, ../protobuf/minprotobuf, transpool +import ../varint, ../multiaddress, ../protobuf/minprotobuf when not defined(windows): import posix @@ -97,7 +97,7 @@ type address*: TransportAddress DaemonAPI* = ref object - pool*: TransportPool + # pool*: TransportPool flags*: set[P2PDaemonFlags] address*: TransportAddress sockname*: string @@ -470,6 +470,15 @@ proc recvMessage(conn: StreamTransport): Future[seq[byte]] {.async.} = result = buffer +proc newConnection*(api: DaemonAPI): Future[StreamTransport] = + echo "Establish new connection to daemon [", $api.address, "]" + result = connect(api.address) + +proc closeConnection*(api: DaemonAPI, transp: StreamTransport) {.async.} = + echo "Close connection with daemon [", $api.address, "]" + transp.close() + await transp.join() + proc socketExists(filename: string): bool = var res: Stat result = stat(filename, res) >= 0'i32 @@ -485,6 +494,7 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {}, gossipsubHeartbeatDelay = 0): Future[DaemonAPI] {.async.} = ## Initialize connections to `go-libp2p-daemon` control socket. var api = new DaemonAPI + var args = newSeq[string]() api.flags = flags api.servers = newSeq[P2PServer]() api.pattern = pattern @@ -504,9 +514,8 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {}, # We will start daemon process only when control socket path is not default or # options are specified. if flags == {} and api.sockname == DefaultSocketPath: - api.pool = await newPool(initTAddress(api.sockname), poolsize = poolSize) + discard else: - var args = newSeq[string]() # DHTFull and DHTClient could not be present at the same time if P2PDaemonFlags.DHTFull in flags and P2PDaemonFlags.DHTClient in flags: api.flags.excl(DHTClient) @@ -523,15 +532,15 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {}, if P2PDaemonFlags.PSGossipSub in api.flags: args.add("-pubsub") args.add("-pubsubRouter=gossipsub") - if P2PDaemonFlags.PSFloodSub in api.flags: - args.add("-pubsub") - args.add("-pubsubRouter=floodsub") if gossipsubHeartbeatInterval != 0: let param = $gossipsubHeartbeatInterval & "ms" args.add("-gossipsubHeartbeatInterval=" & param) if gossipsubHeartbeatDelay != 0: let param = $gossipsubHeartbeatDelay & "ms" args.add("-gossipsubHeartbeatInitialDelay=" & param) + if P2PDaemonFlags.PSFloodSub in api.flags: + args.add("-pubsub") + args.add("-pubsubRouter=floodsub") if api.flags * {P2PDaemonFlags.PSFloodSub, P2PDaemonFlags.PSFloodSub} != {}: if P2PDaemonFlags.PSSign in api.flags: args.add("-pubsubSign=true") @@ -543,31 +552,32 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {}, args.add("-id=" & id) if api.sockname != DefaultSocketPath: args.add("-sock=" & api.sockname) - # We are trying to get absolute daemon path. - let cmd = findExe(daemon) - if len(cmd) == 0: - raise newException(DaemonLocalError, "Could not find daemon executable!") - # We will try to remove control socket file, because daemon will fail - # if its not able to create new socket control file. - # We can't use `existsFile()` because it do not support unix-domain socket - # endpoints. - if socketExists(api.sockname): - if not tryRemoveFile(api.sockname): - if api.sockname != sockpath: - raise newException(DaemonLocalError, "Socket is already bound!") - # Starting daemon process - api.process = startProcess(cmd, "", args, options = {poStdErrToStdOut}) - # Waiting until daemon will not be bound to control socket. - while true: - if not api.process.running(): - echo api.process.errorStream.readAll() - raise newException(DaemonLocalError, - "Daemon executable could not be started!") - if socketExists(api.sockname): - break - await sleepAsync(100) - api.pool = await newPool(initTAddress(api.sockname), poolsize = poolSize) + # We are trying to get absolute daemon path. + let cmd = findExe(daemon) + if len(cmd) == 0: + raise newException(DaemonLocalError, "Could not find daemon executable!") + # We will try to remove control socket file, because daemon will fail + # if its not able to create new socket control file. + # We can't use `existsFile()` because it do not support unix-domain socket + # endpoints. + if socketExists(api.sockname): + if not tryRemoveFile(api.sockname): + if api.sockname != sockpath: + raise newException(DaemonLocalError, "Socket is already bound!") + # Starting daemon process + echo "Spawn [", cmd, " ", args.join(" "), "]" + api.process = startProcess(cmd, "", args, options = {poStdErrToStdOut}) + # Waiting until daemon will not be bound to control socket. + while true: + if not api.process.running(): + echo api.process.errorStream.readAll() + raise newException(DaemonLocalError, + "Daemon executable could not be started!") + if socketExists(api.sockname): + break + await sleepAsync(100) + # api.pool = await newPool(api.address, poolsize = poolSize) result = api proc close*(stream: P2PStream) {.async.} = @@ -582,7 +592,7 @@ proc close*(stream: P2PStream) {.async.} = proc close*(api: DaemonAPI) {.async.} = ## Shutdown connections to `go-libp2p-daemon` control socket. - await api.pool.close() + # await api.pool.close() # Closing all pending servers. if len(api.servers) > 0: var pending = newSeq[Future[void]]() @@ -636,7 +646,7 @@ proc getPeerInfo(pb: var ProtoBuffer): PeerInfo = proc identity*(api: DaemonAPI): Future[PeerInfo] {.async.} = ## Get Node identity information - var transp = await api.pool.acquire() + var transp = await api.newConnection() try: var pb = await transactMessage(transp, requestIdentity()) pb.withMessage() do: @@ -644,37 +654,37 @@ proc identity*(api: DaemonAPI): Future[PeerInfo] {.async.} = if res == cast[int](ResponseType.IDENTITY): result = pb.getPeerInfo() finally: - api.pool.release(transp) + await api.closeConnection(transp) proc connect*(api: DaemonAPI, peer: PeerID, addresses: seq[MultiAddress], timeout = 0) {.async.} = ## Connect to remote peer with id ``peer`` and addresses ``addresses``. - var transp = await api.pool.acquire() + var transp = await api.newConnection() try: var pb = await transp.transactMessage(requestConnect(peer, addresses, timeout)) pb.withMessage() do: discard finally: - api.pool.release(transp) + await api.closeConnection(transp) proc disconnect*(api: DaemonAPI, peer: PeerID) {.async.} = ## Disconnect from remote peer with id ``peer``. - var transp = await api.pool.acquire() + var transp = await api.newConnection() try: var pb = await transp.transactMessage(requestDisconnect(peer)) pb.withMessage() do: discard finally: - api.pool.release(transp) + await api.closeConnection(transp) proc openStream*(api: DaemonAPI, peer: PeerID, protocols: seq[string], timeout = 0): Future[P2PStream] {.async.} = ## Open new stream to peer ``peer`` using one of the protocols in ## ``protocols``. Returns ``StreamTransport`` for the stream. - var transp = await connect(api.address) + var transp = await api.newConnection() var stream = new P2PStream try: var pb = await transp.transactMessage(requestStreamOpen(peer, protocols, @@ -696,8 +706,7 @@ proc openStream*(api: DaemonAPI, peer: PeerID, stream.transp = transp result = stream except: - transp.close() - await transp.join() + await api.closeConnection(transp) raise getCurrentException() proc streamHandler(server: StreamServer, transp: StreamTransport) {.async.} = @@ -725,7 +734,7 @@ proc streamHandler(server: StreamServer, transp: StreamTransport) {.async.} = proc addHandler*(api: DaemonAPI, protocols: seq[string], handler: P2PStreamCallback) {.async.} = ## Add stream handler ``handler`` for set of protocols ``protocols``. - var transp = await api.pool.acquire() + var transp = await api.newConnection() var sockname = api.pattern % [$api.ucounter] var localaddr = initTAddress(sockname) inc(api.ucounter) @@ -746,11 +755,11 @@ proc addHandler*(api: DaemonAPI, protocols: seq[string], await server.join() raise getCurrentException() finally: - api.pool.release(transp) + await api.closeConnection(transp) proc listPeers*(api: DaemonAPI): Future[seq[PeerInfo]] {.async.} = ## Get list of remote peers to which we are currently connected. - var transp = await api.pool.acquire() + var transp = await api.newConnection() try: var pb = await transp.transactMessage(requestListPeers()) pb.withMessage() do: @@ -765,38 +774,38 @@ proc listPeers*(api: DaemonAPI): Future[seq[PeerInfo]] {.async.} = pb.skipSubmessage() res = pb.enterSubmessage() finally: - api.pool.release(transp) + await api.closeConnection(transp) proc cmTagPeer*(api: DaemonAPI, peer: PeerID, tag: string, weight: int) {.async.} = ## Tag peer with id ``peer`` using ``tag`` and ``weight``. - var transp = await api.pool.acquire() + var transp = await api.newConnection() try: var pb = await transp.transactMessage(requestCMTagPeer(peer, tag, weight)) withMessage(pb) do: discard finally: - api.pool.release(transp) + await api.closeConnection(transp) proc cmUntagPeer*(api: DaemonAPI, peer: PeerID, tag: string) {.async.} = ## Remove tag ``tag`` from peer with id ``peer``. - var transp = await api.pool.acquire() + var transp = await api.newConnection() try: var pb = await transp.transactMessage(requestCMUntagPeer(peer, tag)) withMessage(pb) do: discard finally: - api.pool.release(transp) + await api.closeConnection(transp) proc cmTrimPeers*(api: DaemonAPI) {.async.} = ## Trim all connections. - var transp = await api.pool.acquire() + var transp = await api.newConnection() try: var pb = await transp.transactMessage(requestCMTrim()) withMessage(pb) do: discard finally: - api.pool.release(transp) + await api.closeConnection(transp) proc dhtGetSinglePeerInfo(pb: var ProtoBuffer): PeerInfo = if pb.enterSubmessage() == 2: @@ -842,14 +851,14 @@ proc dhtFindPeer*(api: DaemonAPI, peer: PeerID, ## ## You can specify timeout for DHT request with ``timeout`` value. ``0`` value ## means no timeout. - var transp = await api.pool.acquire() + var transp = await api.newConnection() try: var pb = await transp.transactMessage(requestDHTFindPeer(peer, timeout)) withMessage(pb) do: pb.enterDhtMessage(DHTResponseType.VALUE) result = pb.dhtGetSinglePeerInfo() finally: - api.pool.release(transp) + await api.closeConnection(transp) proc dhtGetPublicKey*(api: DaemonAPI, peer: PeerID, timeout = 0): Future[LibP2PPublicKey] {.async.} = @@ -857,14 +866,14 @@ proc dhtGetPublicKey*(api: DaemonAPI, peer: PeerID, ## ## You can specify timeout for DHT request with ``timeout`` value. ``0`` value ## means no timeout. - var transp = await api.pool.acquire() + var transp = await api.newConnection() try: var pb = await transp.transactMessage(requestDHTGetPublicKey(peer, timeout)) withMessage(pb) do: pb.enterDhtMessage(DHTResponseType.VALUE) result = pb.dhtGetSingleValue() finally: - api.pool.release(transp) + await api.closeConnection(transp) proc dhtGetValue*(api: DaemonAPI, key: string, timeout = 0): Future[seq[byte]] {.async.} = @@ -872,14 +881,14 @@ proc dhtGetValue*(api: DaemonAPI, key: string, ## ## You can specify timeout for DHT request with ``timeout`` value. ``0`` value ## means no timeout. - var transp = await api.pool.acquire() + var transp = await api.newConnection() try: var pb = await transp.transactMessage(requestDHTGetValue(key, timeout)) withMessage(pb) do: pb.enterDhtMessage(DHTResponseType.VALUE) result = pb.dhtGetSingleValue() finally: - api.pool.release(transp) + await api.closeConnection(transp) proc dhtPutValue*(api: DaemonAPI, key: string, value: seq[byte], timeout = 0) {.async.} = @@ -887,27 +896,27 @@ proc dhtPutValue*(api: DaemonAPI, key: string, value: seq[byte], ## ## You can specify timeout for DHT request with ``timeout`` value. ``0`` value ## means no timeout. - var transp = await api.pool.acquire() + var transp = await api.newConnection() try: var pb = await transp.transactMessage(requestDHTPutValue(key, value, timeout)) withMessage(pb) do: discard finally: - api.pool.release(transp) + await api.closeConnection(transp) proc dhtProvide*(api: DaemonAPI, cid: CID, timeout = 0) {.async.} = ## Provide content with id ``cid``. ## ## You can specify timeout for DHT request with ``timeout`` value. ``0`` value ## means no timeout. - var transp = await api.pool.acquire() + var transp = await api.newConnection() try: var pb = await transp.transactMessage(requestDHTProvide(cid, timeout)) withMessage(pb) do: discard finally: - api.pool.release(transp) + await api.closeConnection(transp) proc dhtFindPeersConnectedToPeer*(api: DaemonAPI, peer: PeerID, timeout = 0): Future[seq[PeerInfo]] {.async.} = @@ -915,7 +924,7 @@ proc dhtFindPeersConnectedToPeer*(api: DaemonAPI, peer: PeerID, ## ## You can specify timeout for DHT request with ``timeout`` value. ``0`` value ## means no timeout. - var transp = await api.pool.acquire() + var transp = await api.newConnection() var list = newSeq[PeerInfo]() try: let spb = requestDHTFindPeersConnectedToPeer(peer, timeout) @@ -932,7 +941,7 @@ proc dhtFindPeersConnectedToPeer*(api: DaemonAPI, peer: PeerID, list.add(cpb.dhtGetSinglePeerInfo()) result = list finally: - api.pool.release(transp) + await api.closeConnection(transp) proc dhtGetClosestPeers*(api: DaemonAPI, key: string, timeout = 0): Future[seq[PeerID]] {.async.} = @@ -940,7 +949,7 @@ proc dhtGetClosestPeers*(api: DaemonAPI, key: string, ## ## You can specify timeout for DHT request with ``timeout`` value. ``0`` value ## means no timeout. - var transp = await api.pool.acquire() + var transp = await api.newConnection() var list = newSeq[PeerID]() try: let spb = requestDHTGetClosestPeers(key, timeout) @@ -957,7 +966,7 @@ proc dhtGetClosestPeers*(api: DaemonAPI, key: string, list.add(cpb.dhtGetSingleValue()) result = list finally: - api.pool.release(transp) + await api.closeConnection(transp) proc dhtFindProviders*(api: DaemonAPI, cid: CID, count: uint32, timeout = 0): Future[seq[PeerInfo]] {.async.} = @@ -965,7 +974,7 @@ proc dhtFindProviders*(api: DaemonAPI, cid: CID, count: uint32, ## ## You can specify timeout for DHT request with ``timeout`` value. ``0`` value ## means no timeout. - var transp = await api.pool.acquire() + var transp = await api.newConnection() var list = newSeq[PeerInfo]() try: let spb = requestDHTFindProviders(cid, count, timeout) @@ -982,7 +991,7 @@ proc dhtFindProviders*(api: DaemonAPI, cid: CID, count: uint32, list.add(cpb.dhtGetSinglePeerInfo()) result = list finally: - api.pool.release(transp) + await api.closeConnection(transp) proc dhtSearchValue*(api: DaemonAPI, key: string, timeout = 0): Future[seq[seq[byte]]] {.async.} = @@ -990,7 +999,7 @@ proc dhtSearchValue*(api: DaemonAPI, key: string, ## ## You can specify timeout for DHT request with ``timeout`` value. ``0`` value ## means no timeout. - var transp = await api.pool.acquire() + var transp = await api.newConnection() var list = newSeq[seq[byte]]() try: var pb = await transp.transactMessage(requestDHTSearchValue(key, timeout)) @@ -1006,11 +1015,13 @@ proc dhtSearchValue*(api: DaemonAPI, key: string, list.add(cpb.dhtGetSingleValue()) result = list finally: - api.pool.release(transp) + await api.closeConnection(transp) proc pubsubGetTopics*(api: DaemonAPI): Future[seq[string]] {.async.} = ## Get list of topics this node is subscribed to. - var transp = await api.pool.acquire() + # var transp = await api.pool.acquire() + echo "pubsubGetTopics()" + var transp = await api.newConnection() try: var pb = await transp.transactMessage(requestPSGetTopics()) withMessage(pb) do: @@ -1022,13 +1033,16 @@ proc pubsubGetTopics*(api: DaemonAPI): Future[seq[string]] {.async.} = topic.setLen(0) result = topics finally: - api.pool.release(transp) + await api.closeConnection(transp) + # api.pool.release(transp) proc pubsubListPeers*(api: DaemonAPI, topic: string): Future[seq[PeerID]] {.async.} = ## Get list of peers we are connected to and which also subscribed to topic ## ``topic``. - var transp = await api.pool.acquire() + # var transp = await api.pool.acquire() + echo "pubsubListPeers()" + var transp = await api.newConnection() try: var pb = await transp.transactMessage(requestPSListPeers(topic)) withMessage(pb) do: @@ -1040,18 +1054,22 @@ proc pubsubListPeers*(api: DaemonAPI, peer.setLen(0) result = peers finally: - api.pool.release(transp) + await api.closeConnection(transp) + # api.pool.release(transp) proc pubsubPublish*(api: DaemonAPI, topic: string, value: seq[byte]) {.async.} = ## Get list of peer identifiers which are subscribed to topic ``topic``. - var transp = await api.pool.acquire() + # var transp = await api.pool.acquire() + echo "pubsubPublish()" + var transp = await api.newConnection() try: var pb = await transp.transactMessage(requestPSPublish(topic, value)) withMessage(pb) do: discard finally: - api.pool.release(transp) + await api.closeConnection(transp) + # api.pool.release(transp) proc getPubsubMessage*(pb: var ProtoBuffer): PubSubMessage = var item = newSeq[byte]() @@ -1094,7 +1112,8 @@ proc pubsubLoop(api: DaemonAPI, ticket: PubsubTicket) {.async.} = proc pubsubSubscribe*(api: DaemonAPI, topic: string, handler: P2PPubSubCallback): Future[PubsubTicket] {.async.} = ## Subscribe to topic ``topic``. - var transp = await connect(api.address) + echo "pubsubSubscribe()" + var transp = await api.newConnection() try: var pb = await transp.transactMessage(requestPSSubscribe(topic)) pb.withMessage() do: diff --git a/tests/testdaemon.nim b/tests/testdaemon.nim index 2b88ab98c..1952d30a0 100644 --- a/tests/testdaemon.nim +++ b/tests/testdaemon.nim @@ -3,14 +3,14 @@ import asyncdispatch2 import ../libp2p/daemon/daemonapi proc identitySpawnTest(): Future[bool] {.async.} = - var api = await newDaemonApi(sockpath = "/tmp/p2pd-1.sock") + var api = await newDaemonApi() var data = await api.identity() await api.close() result = true proc connectStreamTest(): Future[bool] {.async.} = - var api1 = await newDaemonApi(sockpath = "/tmp/p2pd-1.sock") - var api2 = await newDaemonApi(sockpath = "/tmp/p2pd-2.sock") + var api1 = await newDaemonApi() + var api2 = await newDaemonApi() var id1 = await api1.identity() var id2 = await api2.identity() @@ -26,6 +26,7 @@ proc connectStreamTest(): Future[bool] {.async.} = await api2.addHandler(protos, streamHandler) await api1.connect(id2.peer, id2.addresses) + echo await api1.listPeers() var stream = await api1.openStream(id2.peer, protos) let sent = await stream.transp.write(test & "\r\n") doAssert(sent == len(test) + 2) @@ -50,8 +51,15 @@ proc provideBadCidTest(): Future[bool] {.async.} = proc pubsubTest(f: set[P2PDaemonFlags]): Future[bool] {.async.} = var pubsubData = "TEST MESSAGE" var msgData = cast[seq[byte]](pubsubData) - var api1 = await newDaemonApi(f) - var api2 = await newDaemonApi(f) + var api1, api2: DaemonAPI + if PSGossipSub in f: + api1 = await newDaemonApi(f, gossipsubHeartbeatInterval = 100, + gossipsubHeartbeatDelay = 100) + api2 = await newDaemonApi(f, gossipsubHeartbeatInterval = 100, + gossipsubHeartbeatDelay = 100) + else: + api1 = await newDaemonApi(f) + api2 = await newDaemonApi(f) var id1 = await api1.identity() var id2 = await api2.identity() @@ -60,23 +68,31 @@ proc pubsubTest(f: set[P2PDaemonFlags]): Future[bool] {.async.} = var topics10 = await api1.pubsubGetTopics() var peers10 = await api1.pubsubListPeers("test-topic") - var topics20 = await api1.pubsubGetTopics() - var peers20 = await api1.pubsubListPeers("test-topic") + var topics20 = await api2.pubsubGetTopics() + var peers20 = await api2.pubsubListPeers("test-topic") + + var handlerFuture1 = newFuture[void]() + var handlerFuture2 = newFuture[void]() proc pubsubHandler1(api: DaemonAPI, ticket: PubsubTicket, message: PubSubMessage): Future[bool] {.async.} = + echo "handler1" let smsg = cast[string](message.data) if smsg == pubsubData: inc(resultsCount) + handlerFuture1.complete() # Callback must return `false` to close subscription channel. + result = false proc pubsubHandler2(api: DaemonAPI, ticket: PubsubTicket, message: PubSubMessage): Future[bool] {.async.} = + echo "handler2" let smsg = cast[string](message.data) if smsg == pubsubData: inc(resultsCount) + handlerFuture2.complete() # Callback must return `false` to close subscription channel. result = false @@ -84,31 +100,46 @@ proc pubsubTest(f: set[P2PDaemonFlags]): Future[bool] {.async.} = len(topics20) == 0 and len(peers20) == 0: # Not subscribed to any topics everything must be 0. + await api1.connect(id2.peer, id2.addresses) + await api2.connect(id1.peer, id1.addresses) + + var gpeers1 = await api1.listPeers() + var gpeers2 = await api2.listPeers() + echo "globalPeers1 = ", gpeers1 + echo "globalPeers2 = ", gpeers2 + var ticket1 = await api1.pubsubSubscribe("test-topic", pubsubHandler1) var ticket2 = await api2.pubsubSubscribe("test-topic", pubsubHandler2) - var topics11 = await api1.pubsubGetTopics() - var peers11 = await api1.pubsubListPeers("test-topic") - var topics21 = await api2.pubsubGetTopics() - var peers21 = await api2.pubsubListPeers("test-topic") + var topics1 = await api1.pubsubGetTopics() + var topics2 = await api2.pubsubGetTopics() - if len(topics11) == 1 and len(peers11) == 0 and - len(topics21) == 1 and len(peers21) == 0: - # Subscribed but not yet connected to each other - await sleepAsync(5000) - await api1.connect(id2.peer, id2.addresses) - await sleepAsync(5000) + if len(topics1) == 1 and len(topics2) == 1: + echo "connecting" - var topics12 = await api1.pubsubGetTopics() - var peers12 = await api1.pubsubListPeers("test-topic") - var topics22 = await api2.pubsubGetTopics() - var peers22 = await api2.pubsubListPeers("test-topic") + while true: + var peers1 = await api1.pubsubListPeers("test-topic") + var peers2 = await api2.pubsubListPeers("test-topic") + if len(peers1) == 1 and len(peers2) == 1: + break + echo "pubsubPeers1 = ", peers1 + echo "pubsubPeers2 = ", peers2 + # var gpeers1 = await api1.listPeers() + # var gpeers2 = await api2.listPeers() + # echo "globalPeers1 = ", gpeers1 + # echo "globalPeers2 = ", gpeers2 + await sleepAsync(500) + - if len(topics12) == 1 and len(peers12) == 1 and - len(topics22) == 1 and len(peers22) == 1: - # Publish test data via api1. - await api1.pubsubPublish("test-topic", msgData) - await sleepAsync(5000) + # if len(topics12) == 1 and len(peers12) == 1 and + # len(topics22) == 1 and len(peers22) == 1: + echo "Publishing" + # Publish test data via api1. + await sleepAsync(500) + await api1.pubsubPublish("test-topic", msgData) + var andfut = handlerFuture1 and handlerFuture2 + await andfut or sleepAsync(10000) + await api1.close() await api2.close() if resultsCount == 2: