Fixes for pubsub methods and full tests for both methods.

This commit is contained in:
cheatfate 2018-12-09 18:44:20 +02:00
parent b486410ac0
commit 7e5f52afff
3 changed files with 350 additions and 49 deletions

View File

@ -17,6 +17,7 @@ when not defined(windows):
const
DefaultSocketPath* = "/tmp/p2pd.sock"
DefaultSocketPattern* = "/tmp/p2pd-$1.sock"
DefaultDaemonFile* = "p2pd"
type
@ -47,6 +48,12 @@ type
UNTAG_PEER = 1,
TRIM = 2
PSRequestType* {.pure.} = enum
GET_TOPICS = 0,
LIST_PEERS = 1,
PUBLISH = 2,
SUBSCRIBE = 3
ResponseKind* = enum
Malformed,
Error,
@ -65,15 +72,8 @@ type
VALUE = 1,
END = 2
PSResponseType* {.pure.} = enum
GET_TOPIC = 0,
LIST_PEERS = 1,
PUBLISH = 2,
SUBSCRIBE = 3
PeerID* = seq[byte]
MultiProtocol* = string
# MultiAddress* = seq[byte]
CID* = seq[byte]
LibP2PPublicKey* = seq[byte]
DHTValue* = seq[byte]
@ -82,7 +82,8 @@ type
None, Closed, Inbound, Outbound
P2PDaemonFlags* {.pure.} = enum
DHTClient, DHTFull, Bootstrap
DHTClient, DHTFull, Bootstrap,
PSFloodSub, PSGossipSub, PSSign, PSStrictSign
P2PStream* = ref object
flags*: set[P2PStreamFlags]
@ -106,12 +107,31 @@ type
peer*: PeerID
addresses*: seq[MultiAddress]
PubsubTicket* = ref object
topic*: string
handler*: P2PPubSubCallback
transp*: StreamTransport
PubSubMessage* = object
peer*: PeerID
data*: seq[byte]
seqno*: seq[byte]
topics*: seq[string]
signature*: seq[byte]
key*: seq[byte]
P2PStreamCallback* = proc(api: DaemonAPI,
stream: P2PStream): Future[void] {.gcsafe.}
P2PPubSubCallback* = proc(api: DaemonAPI,
ticket: PubsubTicket,
message: PubSubMessage): Future[bool] {.gcsafe.}
DaemonRemoteError* = object of Exception
DaemonLocalError* = object of Exception
var daemonsCount {.threadvar.}: int
proc requestIdentity(): ProtoBuffer =
## https://github.com/libp2p/go-libp2p-daemon/blob/master/conn.go
## Processing function `doIdentify(req *pb.Request)`.
@ -359,6 +379,58 @@ proc requestCMTrim(): ProtoBuffer =
result.write(initProtoField(6, msg))
result.finish()
proc requestPSGetTopics(): ProtoBuffer =
## https://github.com/libp2p/go-libp2p-daemon/blob/master/pubsub.go
## Processing function `doPubsubGetTopics(req *pb.PSRequest)`.
let msgid = cast[uint](PSRequestType.GET_TOPICS)
result = initProtoBuffer({WithVarintLength})
var msg = initProtoBuffer()
msg.write(initProtoField(1, msgid))
msg.finish()
result.write(initProtoField(1, cast[uint](RequestType.PUBSUB)))
result.write(initProtoField(8, msg))
result.finish()
proc requestPSListPeers(topic: string): ProtoBuffer =
## https://github.com/libp2p/go-libp2p-daemon/blob/master/pubsub.go
## Processing function `doPubsubListPeers(req *pb.PSRequest)`.
let msgid = cast[uint](PSRequestType.LIST_PEERS)
result = initProtoBuffer({WithVarintLength})
var msg = initProtoBuffer()
msg.write(initProtoField(1, msgid))
msg.write(initProtoField(2, topic))
msg.finish()
result.write(initProtoField(1, cast[uint](RequestType.PUBSUB)))
result.write(initProtoField(8, msg))
result.finish()
proc requestPSPublish(topic: string, data: openarray[byte]): ProtoBuffer =
## https://github.com/libp2p/go-libp2p-daemon/blob/master/pubsub.go
## Processing function `doPubsubPublish(req *pb.PSRequest)`.
let msgid = cast[uint](PSRequestType.PUBLISH)
result = initProtoBuffer({WithVarintLength})
var msg = initProtoBuffer()
msg.write(initProtoField(1, msgid))
msg.write(initProtoField(2, topic))
msg.write(initProtoField(3, data))
msg.finish()
result.write(initProtoField(1, cast[uint](RequestType.PUBSUB)))
result.write(initProtoField(8, msg))
result.finish()
proc requestPSSubscribe(topic: string): ProtoBuffer =
## https://github.com/libp2p/go-libp2p-daemon/blob/master/pubsub.go
## Processing function `doPubsubSubscribe(req *pb.PSRequest)`.
let msgid = cast[uint](PSRequestType.SUBSCRIBE)
result = initProtoBuffer({WithVarintLength})
var msg = initProtoBuffer()
msg.write(initProtoField(1, msgid))
msg.write(initProtoField(2, topic))
msg.finish()
result.write(initProtoField(1, cast[uint](RequestType.PUBSUB)))
result.write(initProtoField(8, msg))
result.finish()
proc checkResponse(pb: var ProtoBuffer): ResponseKind {.inline.} =
result = ResponseKind.Malformed
var value: uint64
@ -379,15 +451,19 @@ proc recvMessage(conn: StreamTransport): Future[seq[byte]] {.async.} =
length: int
res: VarintStatus
var buffer = newSeq[byte](10)
for i in 0..<len(buffer):
await conn.readExactly(addr buffer[i], 1)
res = PB.getUVarint(buffer.toOpenArray(0, i), length, size)
if res == VarintStatus.Success:
break
if res != VarintStatus.Success or size > MaxMessageSize:
raise newException(ValueError, "Invalid message size")
buffer.setLen(size)
await conn.readExactly(addr buffer[0], int(size))
try:
for i in 0..<len(buffer):
await conn.readExactly(addr buffer[i], 1)
res = PB.getUVarint(buffer.toOpenArray(0, i), length, size)
if res == VarintStatus.Success:
break
if res != VarintStatus.Success or size > MaxMessageSize:
buffer.setLen(0)
buffer.setLen(size)
await conn.readExactly(addr buffer[0], int(size))
except TransportIncompleteError:
buffer.setLen(0)
result = buffer
proc socketExists(filename: string): bool =
@ -400,36 +476,67 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {},
daemon = DefaultDaemonFile,
sockpath = DefaultSocketPath,
pattern = "/tmp/nim-p2pd-$1.sock",
poolSize = 10): Future[DaemonAPI] {.async.} =
poolSize = 10,
gossipsubHeartbeatInterval = 0,
gossipsubHeartbeatDelay = 0): Future[DaemonAPI] {.async.} =
## Initialize connections to `go-libp2p-daemon` control socket.
result = new DaemonAPI
result.flags = flags
result.servers = newSeq[StreamServer]()
result.address = initTAddress(sockpath)
result.pattern = pattern
result.ucounter = 1
result.handlers = initTable[string, P2PStreamCallback]()
var api = new DaemonAPI
api.flags = flags
api.servers = newSeq[StreamServer]()
api.pattern = pattern
api.ucounter = 1
api.handlers = initTable[string, P2PStreamCallback]()
api.sockname = sockpath
if api.sockname == DefaultSocketPath:
# If client not specify `sockpath` but tries to spawn many daemons, we will
# replace sockname.
if daemonsCount != 0:
api.sockname = DefaultSocketPattern % [$daemonsCount]
api.address = initTAddress(api.sockname)
inc(daemonsCount)
# We will start daemon process only when control socket path is not default or
# options are specified.
if flags == {} and sockpath == DefaultSocketPath:
result.pool = await newPool(initTAddress(sockpath), poolsize = poolSize)
if flags == {} and api.sockname == DefaultSocketPath:
api.pool = await newPool(initTAddress(api.sockname), poolsize = poolSize)
else:
var args = newSeq[string]()
# DHTFull and DHTClient could not be present at the same time
if P2PDaemonFlags.DHTFull in flags and P2PDaemonFlags.DHTClient in flags:
result.flags.excl(DHTClient)
if P2PDaemonFlags.DHTFull in result.flags:
api.flags.excl(DHTClient)
# PSGossipSub and PSFloodSub could not be present at the same time
if P2PDaemonFlags.PSGossipSub in flags and
P2PDaemonFlags.PSFloodSub in flags:
api.flags.excl(PSFloodSub)
if P2PDaemonFlags.DHTFull in api.flags:
args.add("-dht")
if P2PDaemonFlags.DHTClient in result.flags:
if P2PDaemonFlags.DHTClient in api.flags:
args.add("-dhtClient")
if P2PDaemonFlags.Bootstrap in result.flags:
if P2PDaemonFlags.Bootstrap in api.flags:
args.add("-b")
if P2PDaemonFlags.PSGossipSub in api.flags:
args.add("-pubsub")
args.add("-pubsubRouter=gossipsub")
if P2PDaemonFlags.PSFloodSub in api.flags:
args.add("-pubsub")
args.add("-pubsubRouter=floodsub")
if gossipsubHeartbeatInterval != 0:
args.add("-gossipsubHeartbeatInterval=" & $gossipsubHeartbeatInterval)
if gossipsubHeartbeatDelay != 0:
args.add("-gossipsubHeartbeatInitialDelay=" & $gossipsubHeartbeatDelay)
if api.flags * {P2PDaemonFlags.PSFloodSub, P2PDaemonFlags.PSFloodSub} != {}:
if P2PDaemonFlags.PSSign in api.flags:
args.add("-pubsubSign=true")
if P2PDaemonFlags.PSStrictSign in api.flags:
args.add("-pubsubSignStrict=true")
if len(bootstrapNodes) > 0:
args.add("-bootstrapPeers=" & bootstrapNodes.join(","))
if len(id) != 0:
args.add("-id=" & id)
if sockpath != DefaultSocketPath:
args.add("-sock=" & sockpath)
if api.sockname != DefaultSocketPath:
args.add("-sock=" & api.sockname)
# We are trying to get absolute daemon path.
let cmd = findExe(daemon)
if len(cmd) == 0:
@ -438,21 +545,22 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {},
# if its not able to create new socket control file.
# We can't use `existsFile()` because it do not support unix-domain socket
# endpoints.
if socketExists(sockpath):
discard tryRemoveFile(sockpath)
if socketExists(api.sockname):
discard tryRemoveFile(api.sockname)
# Starting daemon process
result.process = startProcess(cmd, "", args, options = {poStdErrToStdOut})
api.process = startProcess(cmd, "", args, options = {poStdErrToStdOut})
# Waiting until daemon will not be bound to control socket.
while true:
if not result.process.running():
echo result.process.errorStream.readAll()
if not api.process.running():
echo api.process.errorStream.readAll()
raise newException(DaemonLocalError,
"Daemon executable could not be started!")
if socketExists(sockpath):
if socketExists(api.sockname):
break
await sleepAsync(100)
result.sockname = sockpath
result.pool = await newPool(initTAddress(sockpath), poolsize = poolSize)
api.pool = await newPool(initTAddress(api.sockname), poolsize = poolSize)
result = api
proc close*(api: DaemonAPI, stream: P2PStream) {.async.} =
## Close ``stream``.
@ -498,6 +606,8 @@ proc transactMessage(transp: StreamTransport,
if res != length:
raise newException(DaemonLocalError, "Could not send message to daemon!")
var message = await transp.recvMessage()
if len(message) == 0:
raise newException(DaemonLocalError, "Incorrect or empty message received!")
result = initProtoBuffer(message)
proc getPeerInfo(pb: var ProtoBuffer): PeerInfo =
@ -699,6 +809,11 @@ proc enterDhtMessage(pb: var ProtoBuffer, rt: DHTResponseType) {.inline.} =
else:
raise newException(DaemonLocalError, "Wrong message type!")
proc enterPsMessage(pb: var ProtoBuffer) {.inline.} =
var res = pb.enterSubmessage()
if res != cast[int](ResponseType.PUBSUB):
raise newException(DaemonLocalError, "Wrong message type!")
proc getDhtMessageType(pb: var ProtoBuffer): DHTResponseType {.inline.} =
var dtype: uint
if pb.getVarintValue(1, dtype) == 0:
@ -798,6 +913,8 @@ proc dhtFindPeersConnectedToPeer*(api: DaemonAPI, peer: PeerID,
pb.enterDhtMessage(DHTResponseType.BEGIN)
while true:
var message = await transp.recvMessage()
if len(message) == 0:
break
var cpb = initProtoBuffer(message)
if cpb.getDhtMessageType() == DHTResponseType.END:
break
@ -821,6 +938,8 @@ proc dhtGetClosestPeers*(api: DaemonAPI, key: string,
pb.enterDhtMessage(DHTResponseType.BEGIN)
while true:
var message = await transp.recvMessage()
if len(message) == 0:
break
var cpb = initProtoBuffer(message)
if cpb.getDhtMessageType() == DHTResponseType.END:
break
@ -830,7 +949,7 @@ proc dhtGetClosestPeers*(api: DaemonAPI, key: string,
api.pool.release(transp)
proc dhtFindProviders*(api: DaemonAPI, cid: CID, count: uint32,
timeout = 0): Future[seq[PeerInfo]] {.async.} =
timeout = 0): Future[seq[PeerInfo]] {.async.} =
## Get ``count`` providers for content with id ``cid``.
##
## You can specify timeout for DHT request with ``timeout`` value. ``0`` value
@ -844,6 +963,8 @@ proc dhtFindProviders*(api: DaemonAPI, cid: CID, count: uint32,
pb.enterDhtMessage(DHTResponseType.BEGIN)
while true:
var message = await transp.recvMessage()
if len(message) == 0:
break
var cpb = initProtoBuffer(message)
if cpb.getDhtMessageType() == DHTResponseType.END:
break
@ -866,6 +987,8 @@ proc dhtSearchValue*(api: DaemonAPI, key: string,
pb.enterDhtMessage(DHTResponseType.BEGIN)
while true:
var message = await transp.recvMessage()
if len(message) == 0:
break
var cpb = initProtoBuffer(message)
if cpb.getDhtMessageType() == DHTResponseType.END:
break
@ -873,3 +996,104 @@ proc dhtSearchValue*(api: DaemonAPI, key: string,
result = list
finally:
api.pool.release(transp)
proc pubsubGetTopics*(api: DaemonAPI): Future[seq[string]] {.async.} =
## Get list of topics this node is subscribed to.
var transp = await api.pool.acquire()
try:
var pb = await transp.transactMessage(requestPSGetTopics())
withMessage(pb) do:
pb.enterPsMessage()
var topics = newSeq[string]()
var topic = ""
while pb.getString(1, topic) != -1:
topics.add(topic)
topic.setLen(0)
result = topics
finally:
api.pool.release(transp)
proc pubsubListPeers*(api: DaemonAPI,
topic: string): Future[seq[PeerID]] {.async.} =
## Get list of peers we are connected to and which also subscribed to topic
## ``topic``.
var transp = await api.pool.acquire()
try:
var pb = await transp.transactMessage(requestPSListPeers(topic))
withMessage(pb) do:
pb.enterPsMessage()
var peers = newSeq[PeerID]()
var peer = newSeq[byte]()
while pb.getBytes(2, peer) != -1:
peers.add(peer)
peer.setLen(0)
result = peers
finally:
api.pool.release(transp)
proc pubsubPublish*(api: DaemonAPI, topic: string,
value: seq[byte]) {.async.} =
## Get list of peer identifiers which are subscribed to topic ``topic``.
var transp = await api.pool.acquire()
try:
var pb = await transp.transactMessage(requestPSPublish(topic, value))
withMessage(pb) do:
discard
finally:
api.pool.release(transp)
proc getPubsubMessage*(pb: var ProtoBuffer): PubSubMessage =
var item = newSeq[byte]()
for field in 1..6:
while true:
if pb.getBytes(field, item) == -1:
break
if field == 1:
result.peer = item
elif field == 2:
result.data = item
elif field == 3:
result.seqno = item
elif field == 4:
var copyitem = item
var stritem = cast[string](copyitem)
if len(result.topics) == 0:
result.topics = newSeq[string]()
result.topics.add(stritem)
elif field == 5:
result.signature = item
elif field == 6:
result.key = item
item.setLen(0)
proc pubsubLoop(api: DaemonAPI, ticket: PubsubTicket) {.async.} =
while true:
var pbmessage = await ticket.transp.recvMessage()
if len(pbmessage) == 0:
break
var pb = initProtoBuffer(pbmessage)
var message = pb.getPubsubMessage()
## We can do here `await` too
let res = await ticket.handler(api, ticket, message)
if not res:
ticket.transp.close()
await ticket.transp.join()
break
proc pubsubSubscribe*(api: DaemonAPI, topic: string,
handler: P2PPubSubCallback): Future[PubsubTicket] {.async.} =
## Subscribe to topic ``topic``.
var transp = await connect(api.address)
try:
var pb = await transp.transactMessage(requestPSSubscribe(topic))
pb.withMessage() do:
var ticket = new PubsubTicket
ticket.topic = topic
ticket.handler = handler
ticket.transp = transp
asyncCheck pubsubLoop(api, ticket)
result = ticket
except:
transp.close()
await transp.join()
raise getCurrentException()

View File

@ -52,11 +52,11 @@ proc newPool*(address: TransportAddress, poolsize: int = DefaultPoolSize,
): Future[TransportPool] {.async.} =
## Establish pool of connections to address ``address`` with size
## ``poolsize``.
result = new TransportPool
result.bufferSize = bufferSize
result.transports = newSeq[PoolItem](poolsize)
var pool = new TransportPool
pool.bufferSize = bufferSize
pool.transports = newSeq[PoolItem](poolsize)
var conns = newSeq[Future[StreamTransport]](poolsize)
result.state = Connecting
pool.state = Connecting
for i in 0..<poolsize:
conns[i] = connect(address, bufferSize)
# Waiting for all connections to be established.
@ -68,10 +68,11 @@ proc newPool*(address: TransportAddress, poolsize: int = DefaultPoolSize,
else:
let transp = conns[i].read()
let item = PoolItem(transp: transp)
result.transports[i] = item
pool.transports[i] = item
# Setup available connections event
result.event = newAsyncEvent()
result.state = Connected
pool.event = newAsyncEvent()
pool.state = Connected
result = pool
proc acquire*(pool: TransportPool): Future[StreamTransport] {.async.} =
## Acquire non-busy connection from pool ``pool``.

View File

@ -43,6 +43,75 @@ proc provideBadCidTest(): Future[bool] {.async.} =
result = false
except DaemonRemoteError:
result = true
finally:
await api.close()
proc pubsubTest(f: set[P2PDaemonFlags]): Future[bool] {.async.} =
var pubsubData = "TEST MESSAGE"
var msgData = cast[seq[byte]](pubsubData)
var api1 = await newDaemonApi(f)
var api2 = await newDaemonApi(f)
var id1 = await api1.identity()
var id2 = await api2.identity()
var resultsCount = 0
var topics10 = await api1.pubsubGetTopics()
var peers10 = await api1.pubsubListPeers("test-topic")
var topics20 = await api1.pubsubGetTopics()
var peers20 = await api1.pubsubListPeers("test-topic")
proc pubsubHandler1(api: DaemonAPI,
ticket: PubsubTicket,
message: PubSubMessage): Future[bool] {.async.} =
let smsg = cast[string](message.data)
if smsg == pubsubData:
inc(resultsCount)
# Callback must return `false` to close subscription channel.
proc pubsubHandler2(api: DaemonAPI,
ticket: PubsubTicket,
message: PubSubMessage): Future[bool] {.async.} =
let smsg = cast[string](message.data)
if smsg == pubsubData:
inc(resultsCount)
# Callback must return `false` to close subscription channel.
result = false
if len(topics10) == 0 and len(peers10) == 0 and
len(topics20) == 0 and len(peers20) == 0:
# Not subscribed to any topics everything must be 0.
var ticket1 = await api1.pubsubSubscribe("test-topic", pubsubHandler1)
var ticket2 = await api2.pubsubSubscribe("test-topic", pubsubHandler2)
var topics11 = await api1.pubsubGetTopics()
var peers11 = await api1.pubsubListPeers("test-topic")
var topics21 = await api2.pubsubGetTopics()
var peers21 = await api2.pubsubListPeers("test-topic")
if len(topics11) == 1 and len(peers11) == 0 and
len(topics21) == 1 and len(peers21) == 0:
# Subscribed but not yet connected to each other
await sleepAsync(5000)
await api1.connect(id2.peer, id2.addresses)
await sleepAsync(5000)
var topics12 = await api1.pubsubGetTopics()
var peers12 = await api1.pubsubListPeers("test-topic")
var topics22 = await api2.pubsubGetTopics()
var peers22 = await api2.pubsubListPeers("test-topic")
if len(topics12) == 1 and len(peers12) == 1 and
len(topics22) == 1 and len(peers22) == 1:
# Publish test data via api1.
await api1.pubsubPublish("test-topic", msgData)
await sleepAsync(5000)
await api1.close()
await api2.close()
if resultsCount == 2:
result = true
when isMainModule:
suite "libp2p-daemon test suite":
@ -55,3 +124,10 @@ when isMainModule:
test "DHT provide bad CID test":
check:
waitFor(provideBadCidTest()) == true
test "GossipSub test":
check:
waitFor(pubsubTest({PSGossipSub})) == true
test "FloodSub test":
check:
waitFor(pubsubTest({PSFloodSub})) == true