diff --git a/libp2p/daemon/daemonapi.nim b/libp2p/daemon/daemonapi.nim index 4c3d578..b56f9f9 100644 --- a/libp2p/daemon/daemonapi.nim +++ b/libp2p/daemon/daemonapi.nim @@ -10,7 +10,7 @@ ## This module implementes API for `go-libp2p-daemon`. import os, osproc, strutils, tables, streams import asyncdispatch2 -import ../varint, ../multiaddress, ../protobuf/minprotobuf +import ../varint, ../multiaddress, ../protobuf/minprotobuf, ../base58 when not defined(windows): import posix @@ -471,11 +471,11 @@ proc recvMessage(conn: StreamTransport): Future[seq[byte]] {.async.} = result = buffer proc newConnection*(api: DaemonAPI): Future[StreamTransport] = - echo "Establish new connection to daemon [", $api.address, "]" + # echo "Establish new connection to daemon [", $api.address, "]" result = connect(api.address) proc closeConnection*(api: DaemonAPI, transp: StreamTransport) {.async.} = - echo "Close connection with daemon [", $api.address, "]" + # echo "Close connection with daemon [", $api.address, "]" transp.close() await transp.join() @@ -566,7 +566,7 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {}, if api.sockname != sockpath: raise newException(DaemonLocalError, "Socket is already bound!") # Starting daemon process - echo "Spawn [", cmd, " ", args.join(" "), "]" + # echo "Spawn [", cmd, " ", args.join(" "), "]" api.process = startProcess(cmd, "", args, options = {poStdErrToStdOut}) # Waiting until daemon will not be bound to control socket. while true: @@ -1019,8 +1019,6 @@ proc dhtSearchValue*(api: DaemonAPI, key: string, proc pubsubGetTopics*(api: DaemonAPI): Future[seq[string]] {.async.} = ## Get list of topics this node is subscribed to. - # var transp = await api.pool.acquire() - echo "pubsubGetTopics()" var transp = await api.newConnection() try: var pb = await transp.transactMessage(requestPSGetTopics()) @@ -1034,14 +1032,11 @@ proc pubsubGetTopics*(api: DaemonAPI): Future[seq[string]] {.async.} = result = topics finally: await api.closeConnection(transp) - # api.pool.release(transp) proc pubsubListPeers*(api: DaemonAPI, topic: string): Future[seq[PeerID]] {.async.} = ## Get list of peers we are connected to and which also subscribed to topic ## ``topic``. - # var transp = await api.pool.acquire() - echo "pubsubListPeers()" var transp = await api.newConnection() try: var pb = await transp.transactMessage(requestPSListPeers(topic)) @@ -1055,13 +1050,10 @@ proc pubsubListPeers*(api: DaemonAPI, result = peers finally: await api.closeConnection(transp) - # api.pool.release(transp) proc pubsubPublish*(api: DaemonAPI, topic: string, value: seq[byte]) {.async.} = ## Get list of peer identifiers which are subscribed to topic ``topic``. - # var transp = await api.pool.acquire() - echo "pubsubPublish()" var transp = await api.newConnection() try: var pb = await transp.transactMessage(requestPSPublish(topic, value)) @@ -1069,7 +1061,6 @@ proc pubsubPublish*(api: DaemonAPI, topic: string, discard finally: await api.closeConnection(transp) - # api.pool.release(transp) proc getPubsubMessage*(pb: var ProtoBuffer): PubSubMessage = var item = newSeq[byte]() @@ -1112,7 +1103,6 @@ proc pubsubLoop(api: DaemonAPI, ticket: PubsubTicket) {.async.} = proc pubsubSubscribe*(api: DaemonAPI, topic: string, handler: P2PPubSubCallback): Future[PubsubTicket] {.async.} = ## Subscribe to topic ``topic``. - echo "pubsubSubscribe()" var transp = await api.newConnection() try: var pb = await transp.transactMessage(requestPSSubscribe(topic)) @@ -1124,6 +1114,22 @@ proc pubsubSubscribe*(api: DaemonAPI, topic: string, asyncCheck pubsubLoop(api, ticket) result = ticket except: - transp.close() - await transp.join() + await api.closeConnection(transp) raise getCurrentException() + +proc `$`*(pinfo: PeerInfo): string = + ## Get string representation of ``PeerInfo`` object. + result = newStringOfCap(128) + result.add("{PeerID: '") + result.add(Base58.encode(pinfo.peer)) + result.add("' Addresses: [") + let length = len(pinfo.addresses) + for i in 0.. 0: + result = result diff --git a/tests/testdaemon.nim b/tests/testdaemon.nim index 1952d30..8d70309 100644 --- a/tests/testdaemon.nim +++ b/tests/testdaemon.nim @@ -1,6 +1,6 @@ import unittest import asyncdispatch2 -import ../libp2p/daemon/daemonapi +import ../libp2p/daemon/daemonapi, ../libp2p/multiaddress, ../libp2p/multicodec proc identitySpawnTest(): Future[bool] {.async.} = var api = await newDaemonApi() @@ -48,22 +48,28 @@ proc provideBadCidTest(): Future[bool] {.async.} = finally: await api.close() +proc getOnlyIPv4Addresses(addresses: seq[MultiAddress]): seq[MultiAddress] = + if len(addresses) > 0: + result = newSeqOfCap[MultiAddress](len(addresses)) + let ip4 = multiCodec("ip4") + for item in addresses: + if item.protoCode() == ip4: + result.add(item) + proc pubsubTest(f: set[P2PDaemonFlags]): Future[bool] {.async.} = var pubsubData = "TEST MESSAGE" var msgData = cast[seq[byte]](pubsubData) var api1, api2: DaemonAPI - if PSGossipSub in f: - api1 = await newDaemonApi(f, gossipsubHeartbeatInterval = 100, - gossipsubHeartbeatDelay = 100) - api2 = await newDaemonApi(f, gossipsubHeartbeatInterval = 100, - gossipsubHeartbeatDelay = 100) - else: - api1 = await newDaemonApi(f) - api2 = await newDaemonApi(f) + + api1 = await newDaemonApi(f) + api2 = await newDaemonApi(f) var id1 = await api1.identity() var id2 = await api2.identity() + echo $id1 + echo $id2 + var resultsCount = 0 var topics10 = await api1.pubsubGetTopics() @@ -77,7 +83,6 @@ proc pubsubTest(f: set[P2PDaemonFlags]): Future[bool] {.async.} = proc pubsubHandler1(api: DaemonAPI, ticket: PubsubTicket, message: PubSubMessage): Future[bool] {.async.} = - echo "handler1" let smsg = cast[string](message.data) if smsg == pubsubData: inc(resultsCount) @@ -88,7 +93,6 @@ proc pubsubTest(f: set[P2PDaemonFlags]): Future[bool] {.async.} = proc pubsubHandler2(api: DaemonAPI, ticket: PubsubTicket, message: PubSubMessage): Future[bool] {.async.} = - echo "handler2" let smsg = cast[string](message.data) if smsg == pubsubData: inc(resultsCount) @@ -100,13 +104,8 @@ proc pubsubTest(f: set[P2PDaemonFlags]): Future[bool] {.async.} = len(topics20) == 0 and len(peers20) == 0: # Not subscribed to any topics everything must be 0. - await api1.connect(id2.peer, id2.addresses) - await api2.connect(id1.peer, id1.addresses) - - var gpeers1 = await api1.listPeers() - var gpeers2 = await api2.listPeers() - echo "globalPeers1 = ", gpeers1 - echo "globalPeers2 = ", gpeers2 + await api1.connect(id2.peer, getOnlyIPv4Addresses(id2.addresses)) + await api2.connect(id1.peer, getOnlyIPv4Addresses(id1.addresses)) var ticket1 = await api1.pubsubSubscribe("test-topic", pubsubHandler1) var ticket2 = await api2.pubsubSubscribe("test-topic", pubsubHandler2) @@ -115,31 +114,15 @@ proc pubsubTest(f: set[P2PDaemonFlags]): Future[bool] {.async.} = var topics2 = await api2.pubsubGetTopics() if len(topics1) == 1 and len(topics2) == 1: - echo "connecting" - - while true: - var peers1 = await api1.pubsubListPeers("test-topic") - var peers2 = await api2.pubsubListPeers("test-topic") - if len(peers1) == 1 and len(peers2) == 1: - break - echo "pubsubPeers1 = ", peers1 - echo "pubsubPeers2 = ", peers2 - # var gpeers1 = await api1.listPeers() - # var gpeers2 = await api2.listPeers() - # echo "globalPeers1 = ", gpeers1 - # echo "globalPeers2 = ", gpeers2 + var peers1 = await api1.pubsubListPeers("test-topic") + var peers2 = await api2.pubsubListPeers("test-topic") + if len(peers1) == 1 and len(peers2) == 1: + # Publish test data via api1. await sleepAsync(500) - + await api1.pubsubPublish("test-topic", msgData) + var andfut = handlerFuture1 and handlerFuture2 + await andfut or sleepAsync(10000) - # if len(topics12) == 1 and len(peers12) == 1 and - # len(topics22) == 1 and len(peers22) == 1: - echo "Publishing" - # Publish test data via api1. - await sleepAsync(500) - await api1.pubsubPublish("test-topic", msgData) - var andfut = handlerFuture1 and handlerFuture2 - await andfut or sleepAsync(10000) - await api1.close() await api2.close() if resultsCount == 2: @@ -162,4 +145,3 @@ when isMainModule: test "FloodSub test": check: waitFor(pubsubTest({PSFloodSub})) == true -