Add PeerInfo string representation procedure.

Use only IPv4 for pubsub tests.
This commit is contained in:
cheatfate 2018-12-11 04:17:36 +02:00
parent 01a268a440
commit fc6902dda2
2 changed files with 46 additions and 58 deletions

View File

@ -10,7 +10,7 @@
## This module implementes API for `go-libp2p-daemon`. ## This module implementes API for `go-libp2p-daemon`.
import os, osproc, strutils, tables, streams import os, osproc, strutils, tables, streams
import asyncdispatch2 import asyncdispatch2
import ../varint, ../multiaddress, ../protobuf/minprotobuf import ../varint, ../multiaddress, ../protobuf/minprotobuf, ../base58
when not defined(windows): when not defined(windows):
import posix import posix
@ -471,11 +471,11 @@ proc recvMessage(conn: StreamTransport): Future[seq[byte]] {.async.} =
result = buffer result = buffer
proc newConnection*(api: DaemonAPI): Future[StreamTransport] = proc newConnection*(api: DaemonAPI): Future[StreamTransport] =
echo "Establish new connection to daemon [", $api.address, "]" # echo "Establish new connection to daemon [", $api.address, "]"
result = connect(api.address) result = connect(api.address)
proc closeConnection*(api: DaemonAPI, transp: StreamTransport) {.async.} = proc closeConnection*(api: DaemonAPI, transp: StreamTransport) {.async.} =
echo "Close connection with daemon [", $api.address, "]" # echo "Close connection with daemon [", $api.address, "]"
transp.close() transp.close()
await transp.join() await transp.join()
@ -566,7 +566,7 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {},
if api.sockname != sockpath: if api.sockname != sockpath:
raise newException(DaemonLocalError, "Socket is already bound!") raise newException(DaemonLocalError, "Socket is already bound!")
# Starting daemon process # Starting daemon process
echo "Spawn [", cmd, " ", args.join(" "), "]" # echo "Spawn [", cmd, " ", args.join(" "), "]"
api.process = startProcess(cmd, "", args, options = {poStdErrToStdOut}) api.process = startProcess(cmd, "", args, options = {poStdErrToStdOut})
# Waiting until daemon will not be bound to control socket. # Waiting until daemon will not be bound to control socket.
while true: while true:
@ -1019,8 +1019,6 @@ proc dhtSearchValue*(api: DaemonAPI, key: string,
proc pubsubGetTopics*(api: DaemonAPI): Future[seq[string]] {.async.} = proc pubsubGetTopics*(api: DaemonAPI): Future[seq[string]] {.async.} =
## Get list of topics this node is subscribed to. ## Get list of topics this node is subscribed to.
# var transp = await api.pool.acquire()
echo "pubsubGetTopics()"
var transp = await api.newConnection() var transp = await api.newConnection()
try: try:
var pb = await transp.transactMessage(requestPSGetTopics()) var pb = await transp.transactMessage(requestPSGetTopics())
@ -1034,14 +1032,11 @@ proc pubsubGetTopics*(api: DaemonAPI): Future[seq[string]] {.async.} =
result = topics result = topics
finally: finally:
await api.closeConnection(transp) await api.closeConnection(transp)
# api.pool.release(transp)
proc pubsubListPeers*(api: DaemonAPI, proc pubsubListPeers*(api: DaemonAPI,
topic: string): Future[seq[PeerID]] {.async.} = topic: string): Future[seq[PeerID]] {.async.} =
## Get list of peers we are connected to and which also subscribed to topic ## Get list of peers we are connected to and which also subscribed to topic
## ``topic``. ## ``topic``.
# var transp = await api.pool.acquire()
echo "pubsubListPeers()"
var transp = await api.newConnection() var transp = await api.newConnection()
try: try:
var pb = await transp.transactMessage(requestPSListPeers(topic)) var pb = await transp.transactMessage(requestPSListPeers(topic))
@ -1055,13 +1050,10 @@ proc pubsubListPeers*(api: DaemonAPI,
result = peers result = peers
finally: finally:
await api.closeConnection(transp) await api.closeConnection(transp)
# api.pool.release(transp)
proc pubsubPublish*(api: DaemonAPI, topic: string, proc pubsubPublish*(api: DaemonAPI, topic: string,
value: seq[byte]) {.async.} = value: seq[byte]) {.async.} =
## Get list of peer identifiers which are subscribed to topic ``topic``. ## Get list of peer identifiers which are subscribed to topic ``topic``.
# var transp = await api.pool.acquire()
echo "pubsubPublish()"
var transp = await api.newConnection() var transp = await api.newConnection()
try: try:
var pb = await transp.transactMessage(requestPSPublish(topic, value)) var pb = await transp.transactMessage(requestPSPublish(topic, value))
@ -1069,7 +1061,6 @@ proc pubsubPublish*(api: DaemonAPI, topic: string,
discard discard
finally: finally:
await api.closeConnection(transp) await api.closeConnection(transp)
# api.pool.release(transp)
proc getPubsubMessage*(pb: var ProtoBuffer): PubSubMessage = proc getPubsubMessage*(pb: var ProtoBuffer): PubSubMessage =
var item = newSeq[byte]() var item = newSeq[byte]()
@ -1112,7 +1103,6 @@ proc pubsubLoop(api: DaemonAPI, ticket: PubsubTicket) {.async.} =
proc pubsubSubscribe*(api: DaemonAPI, topic: string, proc pubsubSubscribe*(api: DaemonAPI, topic: string,
handler: P2PPubSubCallback): Future[PubsubTicket] {.async.} = handler: P2PPubSubCallback): Future[PubsubTicket] {.async.} =
## Subscribe to topic ``topic``. ## Subscribe to topic ``topic``.
echo "pubsubSubscribe()"
var transp = await api.newConnection() var transp = await api.newConnection()
try: try:
var pb = await transp.transactMessage(requestPSSubscribe(topic)) var pb = await transp.transactMessage(requestPSSubscribe(topic))
@ -1124,6 +1114,22 @@ proc pubsubSubscribe*(api: DaemonAPI, topic: string,
asyncCheck pubsubLoop(api, ticket) asyncCheck pubsubLoop(api, ticket)
result = ticket result = ticket
except: except:
transp.close() await api.closeConnection(transp)
await transp.join()
raise getCurrentException() raise getCurrentException()
proc `$`*(pinfo: PeerInfo): string =
## Get string representation of ``PeerInfo`` object.
result = newStringOfCap(128)
result.add("{PeerID: '")
result.add(Base58.encode(pinfo.peer))
result.add("' Addresses: [")
let length = len(pinfo.addresses)
for i in 0..<length:
result.add("'")
result.add($pinfo.addresses[i])
result.add("'")
if i < length - 1:
result.add(", ")
result.add("]}")
if len(pinfo.addresses) > 0:
result = result

View File

@ -1,6 +1,6 @@
import unittest import unittest
import asyncdispatch2 import asyncdispatch2
import ../libp2p/daemon/daemonapi import ../libp2p/daemon/daemonapi, ../libp2p/multiaddress, ../libp2p/multicodec
proc identitySpawnTest(): Future[bool] {.async.} = proc identitySpawnTest(): Future[bool] {.async.} =
var api = await newDaemonApi() var api = await newDaemonApi()
@ -48,22 +48,28 @@ proc provideBadCidTest(): Future[bool] {.async.} =
finally: finally:
await api.close() await api.close()
proc getOnlyIPv4Addresses(addresses: seq[MultiAddress]): seq[MultiAddress] =
if len(addresses) > 0:
result = newSeqOfCap[MultiAddress](len(addresses))
let ip4 = multiCodec("ip4")
for item in addresses:
if item.protoCode() == ip4:
result.add(item)
proc pubsubTest(f: set[P2PDaemonFlags]): Future[bool] {.async.} = proc pubsubTest(f: set[P2PDaemonFlags]): Future[bool] {.async.} =
var pubsubData = "TEST MESSAGE" var pubsubData = "TEST MESSAGE"
var msgData = cast[seq[byte]](pubsubData) var msgData = cast[seq[byte]](pubsubData)
var api1, api2: DaemonAPI var api1, api2: DaemonAPI
if PSGossipSub in f:
api1 = await newDaemonApi(f, gossipsubHeartbeatInterval = 100, api1 = await newDaemonApi(f)
gossipsubHeartbeatDelay = 100) api2 = await newDaemonApi(f)
api2 = await newDaemonApi(f, gossipsubHeartbeatInterval = 100,
gossipsubHeartbeatDelay = 100)
else:
api1 = await newDaemonApi(f)
api2 = await newDaemonApi(f)
var id1 = await api1.identity() var id1 = await api1.identity()
var id2 = await api2.identity() var id2 = await api2.identity()
echo $id1
echo $id2
var resultsCount = 0 var resultsCount = 0
var topics10 = await api1.pubsubGetTopics() var topics10 = await api1.pubsubGetTopics()
@ -77,7 +83,6 @@ proc pubsubTest(f: set[P2PDaemonFlags]): Future[bool] {.async.} =
proc pubsubHandler1(api: DaemonAPI, proc pubsubHandler1(api: DaemonAPI,
ticket: PubsubTicket, ticket: PubsubTicket,
message: PubSubMessage): Future[bool] {.async.} = message: PubSubMessage): Future[bool] {.async.} =
echo "handler1"
let smsg = cast[string](message.data) let smsg = cast[string](message.data)
if smsg == pubsubData: if smsg == pubsubData:
inc(resultsCount) inc(resultsCount)
@ -88,7 +93,6 @@ proc pubsubTest(f: set[P2PDaemonFlags]): Future[bool] {.async.} =
proc pubsubHandler2(api: DaemonAPI, proc pubsubHandler2(api: DaemonAPI,
ticket: PubsubTicket, ticket: PubsubTicket,
message: PubSubMessage): Future[bool] {.async.} = message: PubSubMessage): Future[bool] {.async.} =
echo "handler2"
let smsg = cast[string](message.data) let smsg = cast[string](message.data)
if smsg == pubsubData: if smsg == pubsubData:
inc(resultsCount) inc(resultsCount)
@ -100,13 +104,8 @@ proc pubsubTest(f: set[P2PDaemonFlags]): Future[bool] {.async.} =
len(topics20) == 0 and len(peers20) == 0: len(topics20) == 0 and len(peers20) == 0:
# Not subscribed to any topics everything must be 0. # Not subscribed to any topics everything must be 0.
await api1.connect(id2.peer, id2.addresses) await api1.connect(id2.peer, getOnlyIPv4Addresses(id2.addresses))
await api2.connect(id1.peer, id1.addresses) await api2.connect(id1.peer, getOnlyIPv4Addresses(id1.addresses))
var gpeers1 = await api1.listPeers()
var gpeers2 = await api2.listPeers()
echo "globalPeers1 = ", gpeers1
echo "globalPeers2 = ", gpeers2
var ticket1 = await api1.pubsubSubscribe("test-topic", pubsubHandler1) var ticket1 = await api1.pubsubSubscribe("test-topic", pubsubHandler1)
var ticket2 = await api2.pubsubSubscribe("test-topic", pubsubHandler2) var ticket2 = await api2.pubsubSubscribe("test-topic", pubsubHandler2)
@ -115,30 +114,14 @@ proc pubsubTest(f: set[P2PDaemonFlags]): Future[bool] {.async.} =
var topics2 = await api2.pubsubGetTopics() var topics2 = await api2.pubsubGetTopics()
if len(topics1) == 1 and len(topics2) == 1: if len(topics1) == 1 and len(topics2) == 1:
echo "connecting" var peers1 = await api1.pubsubListPeers("test-topic")
var peers2 = await api2.pubsubListPeers("test-topic")
while true: if len(peers1) == 1 and len(peers2) == 1:
var peers1 = await api1.pubsubListPeers("test-topic") # Publish test data via api1.
var peers2 = await api2.pubsubListPeers("test-topic")
if len(peers1) == 1 and len(peers2) == 1:
break
echo "pubsubPeers1 = ", peers1
echo "pubsubPeers2 = ", peers2
# var gpeers1 = await api1.listPeers()
# var gpeers2 = await api2.listPeers()
# echo "globalPeers1 = ", gpeers1
# echo "globalPeers2 = ", gpeers2
await sleepAsync(500) await sleepAsync(500)
await api1.pubsubPublish("test-topic", msgData)
var andfut = handlerFuture1 and handlerFuture2
# if len(topics12) == 1 and len(peers12) == 1 and await andfut or sleepAsync(10000)
# len(topics22) == 1 and len(peers22) == 1:
echo "Publishing"
# Publish test data via api1.
await sleepAsync(500)
await api1.pubsubPublish("test-topic", msgData)
var andfut = handlerFuture1 and handlerFuture2
await andfut or sleepAsync(10000)
await api1.close() await api1.close()
await api2.close() await api2.close()
@ -162,4 +145,3 @@ when isMainModule:
test "FloodSub test": test "FloodSub test":
check: check:
waitFor(pubsubTest({PSFloodSub})) == true waitFor(pubsubTest({PSFloodSub})) == true