From 7e5f52afffe1aea827b0d0da8c3347563bf05a99 Mon Sep 17 00:00:00 2001 From: cheatfate Date: Sun, 9 Dec 2018 18:44:20 +0200 Subject: [PATCH] Fixes for pubsub methods and full tests for both methods. --- libp2p/daemon/daemonapi.nim | 308 +++++++++++++++++++++++++++++++----- libp2p/daemon/transpool.nim | 15 +- tests/testdaemon.nim | 76 +++++++++ 3 files changed, 350 insertions(+), 49 deletions(-) diff --git a/libp2p/daemon/daemonapi.nim b/libp2p/daemon/daemonapi.nim index a75e3702d..c19f4e407 100644 --- a/libp2p/daemon/daemonapi.nim +++ b/libp2p/daemon/daemonapi.nim @@ -17,6 +17,7 @@ when not defined(windows): const DefaultSocketPath* = "/tmp/p2pd.sock" + DefaultSocketPattern* = "/tmp/p2pd-$1.sock" DefaultDaemonFile* = "p2pd" type @@ -47,6 +48,12 @@ type UNTAG_PEER = 1, TRIM = 2 + PSRequestType* {.pure.} = enum + GET_TOPICS = 0, + LIST_PEERS = 1, + PUBLISH = 2, + SUBSCRIBE = 3 + ResponseKind* = enum Malformed, Error, @@ -65,15 +72,8 @@ type VALUE = 1, END = 2 - PSResponseType* {.pure.} = enum - GET_TOPIC = 0, - LIST_PEERS = 1, - PUBLISH = 2, - SUBSCRIBE = 3 - PeerID* = seq[byte] MultiProtocol* = string - # MultiAddress* = seq[byte] CID* = seq[byte] LibP2PPublicKey* = seq[byte] DHTValue* = seq[byte] @@ -82,7 +82,8 @@ type None, Closed, Inbound, Outbound P2PDaemonFlags* {.pure.} = enum - DHTClient, DHTFull, Bootstrap + DHTClient, DHTFull, Bootstrap, + PSFloodSub, PSGossipSub, PSSign, PSStrictSign P2PStream* = ref object flags*: set[P2PStreamFlags] @@ -106,12 +107,31 @@ type peer*: PeerID addresses*: seq[MultiAddress] + PubsubTicket* = ref object + topic*: string + handler*: P2PPubSubCallback + transp*: StreamTransport + + PubSubMessage* = object + peer*: PeerID + data*: seq[byte] + seqno*: seq[byte] + topics*: seq[string] + signature*: seq[byte] + key*: seq[byte] + P2PStreamCallback* = proc(api: DaemonAPI, stream: P2PStream): Future[void] {.gcsafe.} + P2PPubSubCallback* = proc(api: DaemonAPI, + ticket: PubsubTicket, + message: PubSubMessage): Future[bool] {.gcsafe.} DaemonRemoteError* = object of Exception DaemonLocalError* = object of Exception + +var daemonsCount {.threadvar.}: int + proc requestIdentity(): ProtoBuffer = ## https://github.com/libp2p/go-libp2p-daemon/blob/master/conn.go ## Processing function `doIdentify(req *pb.Request)`. @@ -359,6 +379,58 @@ proc requestCMTrim(): ProtoBuffer = result.write(initProtoField(6, msg)) result.finish() +proc requestPSGetTopics(): ProtoBuffer = + ## https://github.com/libp2p/go-libp2p-daemon/blob/master/pubsub.go + ## Processing function `doPubsubGetTopics(req *pb.PSRequest)`. + let msgid = cast[uint](PSRequestType.GET_TOPICS) + result = initProtoBuffer({WithVarintLength}) + var msg = initProtoBuffer() + msg.write(initProtoField(1, msgid)) + msg.finish() + result.write(initProtoField(1, cast[uint](RequestType.PUBSUB))) + result.write(initProtoField(8, msg)) + result.finish() + +proc requestPSListPeers(topic: string): ProtoBuffer = + ## https://github.com/libp2p/go-libp2p-daemon/blob/master/pubsub.go + ## Processing function `doPubsubListPeers(req *pb.PSRequest)`. + let msgid = cast[uint](PSRequestType.LIST_PEERS) + result = initProtoBuffer({WithVarintLength}) + var msg = initProtoBuffer() + msg.write(initProtoField(1, msgid)) + msg.write(initProtoField(2, topic)) + msg.finish() + result.write(initProtoField(1, cast[uint](RequestType.PUBSUB))) + result.write(initProtoField(8, msg)) + result.finish() + +proc requestPSPublish(topic: string, data: openarray[byte]): ProtoBuffer = + ## https://github.com/libp2p/go-libp2p-daemon/blob/master/pubsub.go + ## Processing function `doPubsubPublish(req *pb.PSRequest)`. + let msgid = cast[uint](PSRequestType.PUBLISH) + result = initProtoBuffer({WithVarintLength}) + var msg = initProtoBuffer() + msg.write(initProtoField(1, msgid)) + msg.write(initProtoField(2, topic)) + msg.write(initProtoField(3, data)) + msg.finish() + result.write(initProtoField(1, cast[uint](RequestType.PUBSUB))) + result.write(initProtoField(8, msg)) + result.finish() + +proc requestPSSubscribe(topic: string): ProtoBuffer = + ## https://github.com/libp2p/go-libp2p-daemon/blob/master/pubsub.go + ## Processing function `doPubsubSubscribe(req *pb.PSRequest)`. + let msgid = cast[uint](PSRequestType.SUBSCRIBE) + result = initProtoBuffer({WithVarintLength}) + var msg = initProtoBuffer() + msg.write(initProtoField(1, msgid)) + msg.write(initProtoField(2, topic)) + msg.finish() + result.write(initProtoField(1, cast[uint](RequestType.PUBSUB))) + result.write(initProtoField(8, msg)) + result.finish() + proc checkResponse(pb: var ProtoBuffer): ResponseKind {.inline.} = result = ResponseKind.Malformed var value: uint64 @@ -379,15 +451,19 @@ proc recvMessage(conn: StreamTransport): Future[seq[byte]] {.async.} = length: int res: VarintStatus var buffer = newSeq[byte](10) - for i in 0.. MaxMessageSize: - raise newException(ValueError, "Invalid message size") - buffer.setLen(size) - await conn.readExactly(addr buffer[0], int(size)) + try: + for i in 0.. MaxMessageSize: + buffer.setLen(0) + buffer.setLen(size) + await conn.readExactly(addr buffer[0], int(size)) + except TransportIncompleteError: + buffer.setLen(0) + result = buffer proc socketExists(filename: string): bool = @@ -400,36 +476,67 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {}, daemon = DefaultDaemonFile, sockpath = DefaultSocketPath, pattern = "/tmp/nim-p2pd-$1.sock", - poolSize = 10): Future[DaemonAPI] {.async.} = + poolSize = 10, + gossipsubHeartbeatInterval = 0, + gossipsubHeartbeatDelay = 0): Future[DaemonAPI] {.async.} = ## Initialize connections to `go-libp2p-daemon` control socket. - result = new DaemonAPI - result.flags = flags - result.servers = newSeq[StreamServer]() - result.address = initTAddress(sockpath) - result.pattern = pattern - result.ucounter = 1 - result.handlers = initTable[string, P2PStreamCallback]() + var api = new DaemonAPI + api.flags = flags + api.servers = newSeq[StreamServer]() + api.pattern = pattern + api.ucounter = 1 + api.handlers = initTable[string, P2PStreamCallback]() + api.sockname = sockpath + + if api.sockname == DefaultSocketPath: + # If client not specify `sockpath` but tries to spawn many daemons, we will + # replace sockname. + if daemonsCount != 0: + api.sockname = DefaultSocketPattern % [$daemonsCount] + + api.address = initTAddress(api.sockname) + inc(daemonsCount) + # We will start daemon process only when control socket path is not default or # options are specified. - if flags == {} and sockpath == DefaultSocketPath: - result.pool = await newPool(initTAddress(sockpath), poolsize = poolSize) + if flags == {} and api.sockname == DefaultSocketPath: + api.pool = await newPool(initTAddress(api.sockname), poolsize = poolSize) else: var args = newSeq[string]() # DHTFull and DHTClient could not be present at the same time if P2PDaemonFlags.DHTFull in flags and P2PDaemonFlags.DHTClient in flags: - result.flags.excl(DHTClient) - if P2PDaemonFlags.DHTFull in result.flags: + api.flags.excl(DHTClient) + # PSGossipSub and PSFloodSub could not be present at the same time + if P2PDaemonFlags.PSGossipSub in flags and + P2PDaemonFlags.PSFloodSub in flags: + api.flags.excl(PSFloodSub) + if P2PDaemonFlags.DHTFull in api.flags: args.add("-dht") - if P2PDaemonFlags.DHTClient in result.flags: + if P2PDaemonFlags.DHTClient in api.flags: args.add("-dhtClient") - if P2PDaemonFlags.Bootstrap in result.flags: + if P2PDaemonFlags.Bootstrap in api.flags: args.add("-b") + if P2PDaemonFlags.PSGossipSub in api.flags: + args.add("-pubsub") + args.add("-pubsubRouter=gossipsub") + if P2PDaemonFlags.PSFloodSub in api.flags: + args.add("-pubsub") + args.add("-pubsubRouter=floodsub") + if gossipsubHeartbeatInterval != 0: + args.add("-gossipsubHeartbeatInterval=" & $gossipsubHeartbeatInterval) + if gossipsubHeartbeatDelay != 0: + args.add("-gossipsubHeartbeatInitialDelay=" & $gossipsubHeartbeatDelay) + if api.flags * {P2PDaemonFlags.PSFloodSub, P2PDaemonFlags.PSFloodSub} != {}: + if P2PDaemonFlags.PSSign in api.flags: + args.add("-pubsubSign=true") + if P2PDaemonFlags.PSStrictSign in api.flags: + args.add("-pubsubSignStrict=true") if len(bootstrapNodes) > 0: args.add("-bootstrapPeers=" & bootstrapNodes.join(",")) if len(id) != 0: args.add("-id=" & id) - if sockpath != DefaultSocketPath: - args.add("-sock=" & sockpath) + if api.sockname != DefaultSocketPath: + args.add("-sock=" & api.sockname) # We are trying to get absolute daemon path. let cmd = findExe(daemon) if len(cmd) == 0: @@ -438,21 +545,22 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {}, # if its not able to create new socket control file. # We can't use `existsFile()` because it do not support unix-domain socket # endpoints. - if socketExists(sockpath): - discard tryRemoveFile(sockpath) + if socketExists(api.sockname): + discard tryRemoveFile(api.sockname) # Starting daemon process - result.process = startProcess(cmd, "", args, options = {poStdErrToStdOut}) + api.process = startProcess(cmd, "", args, options = {poStdErrToStdOut}) # Waiting until daemon will not be bound to control socket. while true: - if not result.process.running(): - echo result.process.errorStream.readAll() + if not api.process.running(): + echo api.process.errorStream.readAll() raise newException(DaemonLocalError, "Daemon executable could not be started!") - if socketExists(sockpath): + if socketExists(api.sockname): break await sleepAsync(100) - result.sockname = sockpath - result.pool = await newPool(initTAddress(sockpath), poolsize = poolSize) + api.pool = await newPool(initTAddress(api.sockname), poolsize = poolSize) + + result = api proc close*(api: DaemonAPI, stream: P2PStream) {.async.} = ## Close ``stream``. @@ -498,6 +606,8 @@ proc transactMessage(transp: StreamTransport, if res != length: raise newException(DaemonLocalError, "Could not send message to daemon!") var message = await transp.recvMessage() + if len(message) == 0: + raise newException(DaemonLocalError, "Incorrect or empty message received!") result = initProtoBuffer(message) proc getPeerInfo(pb: var ProtoBuffer): PeerInfo = @@ -699,6 +809,11 @@ proc enterDhtMessage(pb: var ProtoBuffer, rt: DHTResponseType) {.inline.} = else: raise newException(DaemonLocalError, "Wrong message type!") +proc enterPsMessage(pb: var ProtoBuffer) {.inline.} = + var res = pb.enterSubmessage() + if res != cast[int](ResponseType.PUBSUB): + raise newException(DaemonLocalError, "Wrong message type!") + proc getDhtMessageType(pb: var ProtoBuffer): DHTResponseType {.inline.} = var dtype: uint if pb.getVarintValue(1, dtype) == 0: @@ -798,6 +913,8 @@ proc dhtFindPeersConnectedToPeer*(api: DaemonAPI, peer: PeerID, pb.enterDhtMessage(DHTResponseType.BEGIN) while true: var message = await transp.recvMessage() + if len(message) == 0: + break var cpb = initProtoBuffer(message) if cpb.getDhtMessageType() == DHTResponseType.END: break @@ -821,6 +938,8 @@ proc dhtGetClosestPeers*(api: DaemonAPI, key: string, pb.enterDhtMessage(DHTResponseType.BEGIN) while true: var message = await transp.recvMessage() + if len(message) == 0: + break var cpb = initProtoBuffer(message) if cpb.getDhtMessageType() == DHTResponseType.END: break @@ -830,7 +949,7 @@ proc dhtGetClosestPeers*(api: DaemonAPI, key: string, api.pool.release(transp) proc dhtFindProviders*(api: DaemonAPI, cid: CID, count: uint32, - timeout = 0): Future[seq[PeerInfo]] {.async.} = + timeout = 0): Future[seq[PeerInfo]] {.async.} = ## Get ``count`` providers for content with id ``cid``. ## ## You can specify timeout for DHT request with ``timeout`` value. ``0`` value @@ -844,6 +963,8 @@ proc dhtFindProviders*(api: DaemonAPI, cid: CID, count: uint32, pb.enterDhtMessage(DHTResponseType.BEGIN) while true: var message = await transp.recvMessage() + if len(message) == 0: + break var cpb = initProtoBuffer(message) if cpb.getDhtMessageType() == DHTResponseType.END: break @@ -866,6 +987,8 @@ proc dhtSearchValue*(api: DaemonAPI, key: string, pb.enterDhtMessage(DHTResponseType.BEGIN) while true: var message = await transp.recvMessage() + if len(message) == 0: + break var cpb = initProtoBuffer(message) if cpb.getDhtMessageType() == DHTResponseType.END: break @@ -873,3 +996,104 @@ proc dhtSearchValue*(api: DaemonAPI, key: string, result = list finally: api.pool.release(transp) + +proc pubsubGetTopics*(api: DaemonAPI): Future[seq[string]] {.async.} = + ## Get list of topics this node is subscribed to. + var transp = await api.pool.acquire() + try: + var pb = await transp.transactMessage(requestPSGetTopics()) + withMessage(pb) do: + pb.enterPsMessage() + var topics = newSeq[string]() + var topic = "" + while pb.getString(1, topic) != -1: + topics.add(topic) + topic.setLen(0) + result = topics + finally: + api.pool.release(transp) + +proc pubsubListPeers*(api: DaemonAPI, + topic: string): Future[seq[PeerID]] {.async.} = + ## Get list of peers we are connected to and which also subscribed to topic + ## ``topic``. + var transp = await api.pool.acquire() + try: + var pb = await transp.transactMessage(requestPSListPeers(topic)) + withMessage(pb) do: + pb.enterPsMessage() + var peers = newSeq[PeerID]() + var peer = newSeq[byte]() + while pb.getBytes(2, peer) != -1: + peers.add(peer) + peer.setLen(0) + result = peers + finally: + api.pool.release(transp) + +proc pubsubPublish*(api: DaemonAPI, topic: string, + value: seq[byte]) {.async.} = + ## Get list of peer identifiers which are subscribed to topic ``topic``. + var transp = await api.pool.acquire() + try: + var pb = await transp.transactMessage(requestPSPublish(topic, value)) + withMessage(pb) do: + discard + finally: + api.pool.release(transp) + +proc getPubsubMessage*(pb: var ProtoBuffer): PubSubMessage = + var item = newSeq[byte]() + for field in 1..6: + while true: + if pb.getBytes(field, item) == -1: + break + if field == 1: + result.peer = item + elif field == 2: + result.data = item + elif field == 3: + result.seqno = item + elif field == 4: + var copyitem = item + var stritem = cast[string](copyitem) + if len(result.topics) == 0: + result.topics = newSeq[string]() + result.topics.add(stritem) + elif field == 5: + result.signature = item + elif field == 6: + result.key = item + item.setLen(0) + +proc pubsubLoop(api: DaemonAPI, ticket: PubsubTicket) {.async.} = + while true: + var pbmessage = await ticket.transp.recvMessage() + if len(pbmessage) == 0: + break + var pb = initProtoBuffer(pbmessage) + var message = pb.getPubsubMessage() + ## We can do here `await` too + let res = await ticket.handler(api, ticket, message) + if not res: + ticket.transp.close() + await ticket.transp.join() + break + +proc pubsubSubscribe*(api: DaemonAPI, topic: string, + handler: P2PPubSubCallback): Future[PubsubTicket] {.async.} = + ## Subscribe to topic ``topic``. + var transp = await connect(api.address) + try: + var pb = await transp.transactMessage(requestPSSubscribe(topic)) + pb.withMessage() do: + var ticket = new PubsubTicket + ticket.topic = topic + ticket.handler = handler + ticket.transp = transp + asyncCheck pubsubLoop(api, ticket) + result = ticket + except: + transp.close() + await transp.join() + raise getCurrentException() diff --git a/libp2p/daemon/transpool.nim b/libp2p/daemon/transpool.nim index cae8d2cb8..a12ea62aa 100644 --- a/libp2p/daemon/transpool.nim +++ b/libp2p/daemon/transpool.nim @@ -52,11 +52,11 @@ proc newPool*(address: TransportAddress, poolsize: int = DefaultPoolSize, ): Future[TransportPool] {.async.} = ## Establish pool of connections to address ``address`` with size ## ``poolsize``. - result = new TransportPool - result.bufferSize = bufferSize - result.transports = newSeq[PoolItem](poolsize) + var pool = new TransportPool + pool.bufferSize = bufferSize + pool.transports = newSeq[PoolItem](poolsize) var conns = newSeq[Future[StreamTransport]](poolsize) - result.state = Connecting + pool.state = Connecting for i in 0..