# Nim-LibP2P # Copyright (c) 2023 Status Research & Development GmbH # Licensed under either of # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) # * MIT license ([LICENSE-MIT](LICENSE-MIT)) # at your option. # This file may not be copied, modified, or distributed except according to # those terms. {.push raises: [].} ## This module implementes API for `go-libp2p-daemon`. import std/[os, osproc, strutils, tables, strtabs, sequtils] import pkg/[chronos, chronicles] import ../varint, ../multiaddress, ../multicodec, ../cid, ../peerid import ../wire, ../multihash, ../protobuf/minprotobuf, ../errors import ../crypto/crypto, ../utility export peerid, multiaddress, multicodec, multihash, cid, crypto, wire, errors when not defined(windows): import posix const DefaultSocketPath* = "/unix/tmp/p2pd.sock" DefaultUnixSocketPattern* = "/unix/tmp/nim-p2pd-$1-$2.sock" DefaultIpSocketPattern* = "/ip4/127.0.0.1/tcp/$2" DefaultUnixChildPattern* = "/unix/tmp/nim-p2pd-handle-$1-$2.sock" DefaultIpChildPattern* = "/ip4/127.0.0.1/tcp/$2" DefaultDaemonFile* = "p2pd" type IpfsLogLevel* {.pure.} = enum Critical, Error, Warning, Notice, Info, Debug, Trace RequestType* {.pure.} = enum IDENTIFY = 0, CONNECT = 1, STREAM_OPEN = 2, STREAM_HANDLER = 3, DHT = 4, LIST_PEERS = 5, CONNMANAGER = 6, DISCONNECT = 7 PUBSUB = 8 DHTRequestType* {.pure.} = enum FIND_PEER = 0, FIND_PEERS_CONNECTED_TO_PEER = 1, FIND_PROVIDERS = 2, GET_CLOSEST_PEERS = 3, GET_PUBLIC_KEY = 4, GET_VALUE = 5, SEARCH_VALUE = 6, PUT_VALUE = 7, PROVIDE = 8 ConnManagerRequestType* {.pure.} = enum TAG_PEER = 0, UNTAG_PEER = 1, TRIM = 2 PSRequestType* {.pure.} = enum GET_TOPICS = 0, LIST_PEERS = 1, PUBLISH = 2, SUBSCRIBE = 3 ResponseKind* = enum Malformed, Error, Success ResponseType* {.pure.} = enum ERROR = 2, STREAMINFO = 3, IDENTITY = 4, DHT = 5, PEERINFO = 6 PUBSUB = 7 DHTResponseType* {.pure.} = enum BEGIN = 0, VALUE = 1, END = 2 MultiProtocol* = string DHTValue* = seq[byte] P2PStreamFlags* {.pure.} = enum None, Closed, Inbound, Outbound P2PDaemonFlags* = enum DHTClient, ## Start daemon in DHT client mode DHTFull, ## Start daemon with full DHT support Bootstrap, ## Start daemon with bootstrap WaitBootstrap, ## Start daemon with bootstrap and wait until daemon ## establish connection to at least 2 peers PSFloodSub, ## Enable `FloodSub` protocol in daemon PSGossipSub, ## Enable `GossipSub` protocol in daemon PSNoSign, ## Disable pubsub message signing (default true) PSStrictSign, ## Force strict checking pubsub message signature NATPortMap, ## Force daemon to use NAT-PMP. AutoNAT, ## Force daemon to use AutoNAT. AutoRelay, ## Enables autorelay mode. RelayActive, ## Enables active mode for relay. RelayDiscovery,## Enables passive discovery for relay. RelayHop, ## Enables hop for relay. NoInlinePeerId,## Disable inlining of peer ID (not yet in #master). NoProcessCtrl ## Process was not spawned. P2PStream* = ref object flags*: set[P2PStreamFlags] peer*: PeerId raddress*: MultiAddress protocol*: string transp*: StreamTransport P2PServer = object server*: StreamServer address*: MultiAddress DaemonAPI* = ref object # pool*: TransportPool flags*: set[P2PDaemonFlags] address*: MultiAddress pattern*: string ucounter*: int process*: Process handlers*: Table[string, P2PStreamCallback] servers*: seq[P2PServer] userData*: RootRef PeerInfo* = object peer*: PeerId addresses*: seq[MultiAddress] PubsubTicket* = ref object topic*: string handler*: P2PPubSubCallback transp*: StreamTransport PubSubMessage* = object peer*: PeerId data*: seq[byte] seqno*: seq[byte] topics*: seq[string] signature*: Signature key*: PublicKey P2PStreamCallback* = proc(api: DaemonAPI, stream: P2PStream): Future[void] {.gcsafe, raises: [CatchableError].} P2PPubSubCallback* = proc(api: DaemonAPI, ticket: PubsubTicket, message: PubSubMessage): Future[bool] {.gcsafe, raises: [CatchableError].} DaemonError* = object of LPError DaemonRemoteError* = object of DaemonError DaemonLocalError* = object of DaemonError var daemonsCount {.threadvar.}: int chronicles.formatIt(PeerInfo): shortLog(it) proc requestIdentity(): ProtoBuffer = ## https://github.com/libp2p/go-libp2p-daemon/blob/master/conn.go ## Processing function `doIdentify(req *pb.Request)`. result = initProtoBuffer({WithVarintLength}) result.write(1, safeConvert[uint](RequestType.IDENTIFY)) result.finish() proc requestConnect(peerid: PeerId, addresses: openArray[MultiAddress], timeout = 0): ProtoBuffer = ## https://github.com/libp2p/go-libp2p-daemon/blob/master/conn.go ## Processing function `doConnect(req *pb.Request)`. result = initProtoBuffer({WithVarintLength}) var msg = initProtoBuffer() msg.write(1, peerid) for item in addresses: msg.write(2, item.data.buffer) if timeout > 0: msg.write(3, hint64(timeout)) result.write(1, safeConvert[uint](RequestType.CONNECT)) result.write(2, msg) result.finish() proc requestDisconnect(peerid: PeerId): ProtoBuffer = ## https://github.com/libp2p/go-libp2p-daemon/blob/master/conn.go ## Processing function `doDisconnect(req *pb.Request)`. result = initProtoBuffer({WithVarintLength}) var msg = initProtoBuffer() msg.write(1, peerid) result.write(1, safeConvert[uint](RequestType.DISCONNECT)) result.write(7, msg) result.finish() proc requestStreamOpen(peerid: PeerId, protocols: openArray[string], timeout = 0): ProtoBuffer = ## https://github.com/libp2p/go-libp2p-daemon/blob/master/conn.go ## Processing function `doStreamOpen(req *pb.Request)`. result = initProtoBuffer({WithVarintLength}) var msg = initProtoBuffer() msg.write(1, peerid) for item in protocols: msg.write(2, item) if timeout > 0: msg.write(3, hint64(timeout)) result.write(1, safeConvert[uint](RequestType.STREAM_OPEN)) result.write(3, msg) result.finish() proc requestStreamHandler(address: MultiAddress, protocols: openArray[MultiProtocol]): ProtoBuffer = ## https://github.com/libp2p/go-libp2p-daemon/blob/master/conn.go ## Processing function `doStreamHandler(req *pb.Request)`. result = initProtoBuffer({WithVarintLength}) var msg = initProtoBuffer() msg.write(1, address.data.buffer) for item in protocols: msg.write(2, item) result.write(1, safeConvert[uint](RequestType.STREAM_HANDLER)) result.write(4, msg) result.finish() proc requestListPeers(): ProtoBuffer = ## https://github.com/libp2p/go-libp2p-daemon/blob/master/conn.go ## Processing function `doListPeers(req *pb.Request)` result = initProtoBuffer({WithVarintLength}) result.write(1, safeConvert[uint](RequestType.LIST_PEERS)) result.finish() proc requestDHTFindPeer(peer: PeerId, timeout = 0): ProtoBuffer = ## https://github.com/libp2p/go-libp2p-daemon/blob/master/dht.go ## Processing function `doDHTFindPeer(req *pb.DHTRequest)`. let msgid = safeConvert[uint](DHTRequestType.FIND_PEER) result = initProtoBuffer({WithVarintLength}) var msg = initProtoBuffer() msg.write(1, msgid) msg.write(2, peer) if timeout > 0: msg.write(7, hint64(timeout)) msg.finish() result.write(1, safeConvert[uint](RequestType.DHT)) result.write(5, msg) result.finish() proc requestDHTFindPeersConnectedToPeer(peer: PeerId, timeout = 0): ProtoBuffer = ## https://github.com/libp2p/go-libp2p-daemon/blob/master/dht.go ## Processing function `doDHTFindPeersConnectedToPeer(req *pb.DHTRequest)`. let msgid = safeConvert[uint](DHTRequestType.FIND_PEERS_CONNECTED_TO_PEER) result = initProtoBuffer({WithVarintLength}) var msg = initProtoBuffer() msg.write(1, msgid) msg.write(2, peer) if timeout > 0: msg.write(7, hint64(timeout)) msg.finish() result.write(1, safeConvert[uint](RequestType.DHT)) result.write(5, msg) result.finish() proc requestDHTFindProviders(cid: Cid, count: uint32, timeout = 0): ProtoBuffer = ## https://github.com/libp2p/go-libp2p-daemon/blob/master/dht.go ## Processing function `doDHTFindProviders(req *pb.DHTRequest)`. let msgid = safeConvert[uint](DHTRequestType.FIND_PROVIDERS) result = initProtoBuffer({WithVarintLength}) var msg = initProtoBuffer() msg.write(1, msgid) msg.write(3, cid.data.buffer) msg.write(6, count) if timeout > 0: msg.write(7, hint64(timeout)) msg.finish() result.write(1, safeConvert[uint](RequestType.DHT)) result.write(5, msg) result.finish() proc requestDHTGetClosestPeers(key: string, timeout = 0): ProtoBuffer = ## https://github.com/libp2p/go-libp2p-daemon/blob/master/dht.go ## Processing function `doDHTGetClosestPeers(req *pb.DHTRequest)`. let msgid = safeConvert[uint](DHTRequestType.GET_CLOSEST_PEERS) result = initProtoBuffer({WithVarintLength}) var msg = initProtoBuffer() msg.write(1, msgid) msg.write(4, key) if timeout > 0: msg.write(7, hint64(timeout)) msg.finish() result.write(1, safeConvert[uint](RequestType.DHT)) result.write(5, msg) result.finish() proc requestDHTGetPublicKey(peer: PeerId, timeout = 0): ProtoBuffer = ## https://github.com/libp2p/go-libp2p-daemon/blob/master/dht.go ## Processing function `doDHTGetPublicKey(req *pb.DHTRequest)`. let msgid = safeConvert[uint](DHTRequestType.GET_PUBLIC_KEY) result = initProtoBuffer({WithVarintLength}) var msg = initProtoBuffer() msg.write(1, msgid) msg.write(2, peer) if timeout > 0: msg.write(7, hint64(timeout)) msg.finish() result.write(1, safeConvert[uint](RequestType.DHT)) result.write(5, msg) result.finish() proc requestDHTGetValue(key: string, timeout = 0): ProtoBuffer = ## https://github.com/libp2p/go-libp2p-daemon/blob/master/dht.go ## Processing function `doDHTGetValue(req *pb.DHTRequest)`. let msgid = safeConvert[uint](DHTRequestType.GET_VALUE) result = initProtoBuffer({WithVarintLength}) var msg = initProtoBuffer() msg.write(1, msgid) msg.write(4, key) if timeout > 0: msg.write(7, hint64(timeout)) msg.finish() result.write(1, safeConvert[uint](RequestType.DHT)) result.write(5, msg) result.finish() proc requestDHTSearchValue(key: string, timeout = 0): ProtoBuffer = ## https://github.com/libp2p/go-libp2p-daemon/blob/master/dht.go ## Processing function `doDHTSearchValue(req *pb.DHTRequest)`. let msgid = safeConvert[uint](DHTRequestType.SEARCH_VALUE) result = initProtoBuffer({WithVarintLength}) var msg = initProtoBuffer() msg.write(1, msgid) msg.write(4, key) if timeout > 0: msg.write(7, hint64(timeout)) msg.finish() result.write(1, safeConvert[uint](RequestType.DHT)) result.write(5, msg) result.finish() proc requestDHTPutValue(key: string, value: openArray[byte], timeout = 0): ProtoBuffer = ## https://github.com/libp2p/go-libp2p-daemon/blob/master/dht.go ## Processing function `doDHTPutValue(req *pb.DHTRequest)`. let msgid = safeConvert[uint](DHTRequestType.PUT_VALUE) result = initProtoBuffer({WithVarintLength}) var msg = initProtoBuffer() msg.write(1, msgid) msg.write(4, key) msg.write(5, value) if timeout > 0: msg.write(7, hint64(timeout)) msg.finish() result.write(1, uint(RequestType.DHT)) result.write(5, msg) result.finish() proc requestDHTProvide(cid: Cid, timeout = 0): ProtoBuffer = ## https://github.com/libp2p/go-libp2p-daemon/blob/master/dht.go ## Processing function `doDHTProvide(req *pb.DHTRequest)`. let msgid = safeConvert[uint](DHTRequestType.PROVIDE) result = initProtoBuffer({WithVarintLength}) var msg = initProtoBuffer() msg.write(1, msgid) msg.write(3, cid.data.buffer) if timeout > 0: msg.write(7, hint64(timeout)) msg.finish() result.write(1, safeConvert[uint](RequestType.DHT)) result.write(5, msg) result.finish() proc requestCMTagPeer(peer: PeerId, tag: string, weight: int): ProtoBuffer = ## https://github.com/libp2p/go-libp2p-daemon/blob/master/connmgr.go#L18 let msgid = safeConvert[uint](ConnManagerRequestType.TAG_PEER) result = initProtoBuffer({WithVarintLength}) var msg = initProtoBuffer() msg.write(1, msgid) msg.write(2, peer) msg.write(3, tag) msg.write(4, hint64(weight)) msg.finish() result.write(1, safeConvert[uint](RequestType.CONNMANAGER)) result.write(6, msg) result.finish() proc requestCMUntagPeer(peer: PeerId, tag: string): ProtoBuffer = ## https://github.com/libp2p/go-libp2p-daemon/blob/master/connmgr.go#L33 let msgid = safeConvert[uint](ConnManagerRequestType.UNTAG_PEER) result = initProtoBuffer({WithVarintLength}) var msg = initProtoBuffer() msg.write(1, msgid) msg.write(2, peer) msg.write(3, tag) msg.finish() result.write(1, safeConvert[uint](RequestType.CONNMANAGER)) result.write(6, msg) result.finish() proc requestCMTrim(): ProtoBuffer = ## https://github.com/libp2p/go-libp2p-daemon/blob/master/connmgr.go#L47 let msgid = safeConvert[uint](ConnManagerRequestType.TRIM) result = initProtoBuffer({WithVarintLength}) var msg = initProtoBuffer() msg.write(1, msgid) msg.finish() result.write(1, safeConvert[uint](RequestType.CONNMANAGER)) result.write(6, msg) result.finish() proc requestPSGetTopics(): ProtoBuffer = ## https://github.com/libp2p/go-libp2p-daemon/blob/master/pubsub.go ## Processing function `doPubsubGetTopics(req *pb.PSRequest)`. let msgid = safeConvert[uint](PSRequestType.GET_TOPICS) result = initProtoBuffer({WithVarintLength}) var msg = initProtoBuffer() msg.write(1, msgid) msg.finish() result.write(1, safeConvert[uint](RequestType.PUBSUB)) result.write(8, msg) result.finish() proc requestPSListPeers(topic: string): ProtoBuffer = ## https://github.com/libp2p/go-libp2p-daemon/blob/master/pubsub.go ## Processing function `doPubsubListPeers(req *pb.PSRequest)`. let msgid = safeConvert[uint](PSRequestType.LIST_PEERS) result = initProtoBuffer({WithVarintLength}) var msg = initProtoBuffer() msg.write(1, msgid) msg.write(2, topic) msg.finish() result.write(1, safeConvert[uint](RequestType.PUBSUB)) result.write(8, msg) result.finish() proc requestPSPublish(topic: string, data: openArray[byte]): ProtoBuffer = ## https://github.com/libp2p/go-libp2p-daemon/blob/master/pubsub.go ## Processing function `doPubsubPublish(req *pb.PSRequest)`. let msgid = safeConvert[uint](PSRequestType.PUBLISH) result = initProtoBuffer({WithVarintLength}) var msg = initProtoBuffer() msg.write(1, msgid) msg.write(2, topic) msg.write(3, data) msg.finish() result.write(1, safeConvert[uint](RequestType.PUBSUB)) result.write(8, msg) result.finish() proc requestPSSubscribe(topic: string): ProtoBuffer = ## https://github.com/libp2p/go-libp2p-daemon/blob/master/pubsub.go ## Processing function `doPubsubSubscribe(req *pb.PSRequest)`. let msgid = safeConvert[uint](PSRequestType.SUBSCRIBE) result = initProtoBuffer({WithVarintLength}) var msg = initProtoBuffer() msg.write(1, msgid) msg.write(2, topic) msg.finish() result.write(1, safeConvert[uint](RequestType.PUBSUB)) result.write(8, msg) result.finish() proc checkResponse(pb: ProtoBuffer): ResponseKind {.inline.} = result = ResponseKind.Malformed var value: uint64 if getRequiredField(pb, 1, value).isOk(): if value == 0: result = ResponseKind.Success else: result = ResponseKind.Error proc getErrorMessage(pb: ProtoBuffer): string {.inline, raises: [DaemonLocalError].} = var error: seq[byte] if pb.getRequiredField(ResponseType.ERROR.int, error).isOk(): if initProtoBuffer(error).getRequiredField(1, result).isErr(): raise newException(DaemonLocalError, "Error message is missing!") proc recvMessage(conn: StreamTransport): Future[seq[byte]] {.async.} = var size: uint length: int res: VarintResult[void] var buffer = newSeq[byte](10) try: for i in 0.. 1'u shl 22: buffer.setLen(0) result = buffer return buffer.setLen(size) await conn.readExactly(addr buffer[0], int(size)) except TransportIncompleteError: buffer.setLen(0) result = buffer proc newConnection*(api: DaemonAPI): Future[StreamTransport] {.raises: [LPError].} = result = connect(api.address) proc closeConnection*(api: DaemonAPI, transp: StreamTransport): Future[void] = result = transp.closeWait() proc socketExists(address: MultiAddress): Future[bool] {.async.} = try: var transp = await connect(address) await transp.closeWait() result = true except CatchableError: result = false when defined(windows): proc getCurrentProcessId(): uint32 {.stdcall, dynlib: "kernel32", importc: "GetCurrentProcessId".} proc getProcessId(): int = result = cast[int](getCurrentProcessId()) else: proc getProcessId(): int = result = int(posix.getpid()) proc getSocket(pattern: string, count: ptr int): Future[MultiAddress] {.async.} = var sockname = "" var pid = $getProcessId() sockname = pattern % [pid, $(count[])] let tmpma = MultiAddress.init(sockname).tryGet() if UNIX.match(tmpma): while true: count[] = count[] + 1 sockname = pattern % [pid, $(count[])] var ma = MultiAddress.init(sockname).tryGet() let res = await socketExists(ma) if not res: result = ma break elif TCP.match(tmpma): sockname = pattern % [pid, "0"] var ma = MultiAddress.init(sockname).tryGet() var sock = createAsyncSocket(ma) if sock.bindAsyncSocket(ma): # Socket was successfully bound, then its free to use count[] = count[] + 1 var ta = sock.getLocalAddress() sockname = pattern % [pid, $ta.port] result = MultiAddress.init(sockname).tryGet() closeSocket(sock) # This is forward declaration needed for newDaemonApi() proc listPeers*(api: DaemonAPI): Future[seq[PeerInfo]] {.async.} proc copyEnv(): StringTableRef = ## This procedure copy all environment variables into StringTable. result = newStringTable(modeStyleInsensitive) for key, val in envPairs(): result[key] = val proc newDaemonApi*(flags: set[P2PDaemonFlags] = {}, bootstrapNodes: seq[string] = @[], id: string = "", hostAddresses: seq[MultiAddress] = @[], announcedAddresses: seq[MultiAddress] = @[], daemon = DefaultDaemonFile, sockpath = "", patternSock = "", patternHandler = "", poolSize = 10, gossipsubHeartbeatInterval = 0, gossipsubHeartbeatDelay = 0, peersRequired = 2, logFile = "", logLevel = IpfsLogLevel.Debug): Future[DaemonAPI] {.async.} = ## Initialize connection to `go-libp2p-daemon` control socket. ## ## ``flags`` - set of P2PDaemonFlags. ## ## ``bootstrapNodes`` - list of bootnode's addresses in MultiAddress format. ## (default: @[], which means usage of default nodes inside of ## `go-libp2p-daemon`). ## ## ``id`` - path to file with identification information (default: "" which ## means - generate new random identity). ## ## ``hostAddresses`` - list of multiaddrs the host should listen on. ## (default: @[], the daemon will pick a listening port at random). ## ## ``announcedAddresses`` - list of multiaddrs the host should announce to ## the network (default: @[], the daemon will announce its own listening ## address). ## ## ``daemon`` - name of ``go-libp2p-daemon`` executable (default: "p2pd"). ## ## ``sockpath`` - default control socket MultiAddress ## (default: "/unix/tmp/p2pd.sock"). ## ## ``patternSock`` - MultiAddress pattern string, used to start multiple ## daemons (default on Unix: "/unix/tmp/nim-p2pd-$1-$2.sock", on Windows: ## "/ip4/127.0.0.1/tcp/$2"). ## ## ``patternHandler`` - MultiAddress pattern string, used to establish ## incoming channels (default on Unix: "/unix/tmp/nim-p2pd-handle-$1-$2.sock", ## on Windows: "/ip4/127.0.0.1/tcp/$2"). ## ## ``poolSize`` - size of connections pool (default: 10). ## ## ``gossipsubHeartbeatInterval`` - GossipSub protocol heartbeat interval in ## milliseconds (default: 0, use default `go-libp2p-daemon` values). ## ## ``gossipsubHeartbeatDelay`` - GossipSub protocol heartbeat delay in ## millseconds (default: 0, use default `go-libp2p-daemon` values). ## ## ``peersRequired`` - Wait until `go-libp2p-daemon` will connect to at least ## ``peersRequired`` peers before return from `newDaemonApi()` procedure ## (default: 2). ## ## ``logFile`` - Enable ``go-libp2p-daemon`` logging and store it to file ## ``logFile`` (default: "", no logging) ## ## ``logLevel`` - Set ``go-libp2p-daemon`` logging verbosity level to ## ``logLevel`` (default: Debug) var api = new DaemonAPI var args = newSeq[string]() var env: StringTableRef when defined(windows): var patternForSocket = if len(patternSock) > 0: patternSock else: DefaultIpSocketPattern var patternForChild = if len(patternHandler) > 0: patternHandler else: DefaultIpChildPattern else: var patternForSocket = if len(patternSock) > 0: patternSock else: DefaultUnixSocketPattern var patternForChild = if len(patternHandler) > 0: patternHandler else: DefaultUnixChildPattern api.flags = flags api.servers = newSeq[P2PServer]() api.pattern = patternForChild api.ucounter = 1 if len(sockpath) == 0: api.flags.excl(NoProcessCtrl) api.address = await getSocket(patternForSocket, addr daemonsCount) else: api.address = MultiAddress.init(sockpath).tryGet() api.flags.incl(NoProcessCtrl) let res = await socketExists(api.address) if not res: raise newException(DaemonLocalError, "Could not connect to remote daemon") result = api return # DHTFull and DHTClient could not be present at the same time if DHTFull in flags and DHTClient in flags: api.flags.excl(DHTClient) # PSGossipSub and PSFloodSub could not be present at the same time if PSGossipSub in flags and PSFloodSub in flags: api.flags.excl(PSFloodSub) if DHTFull in api.flags: args.add("-dht") if DHTClient in api.flags: args.add("-dhtClient") if {Bootstrap, WaitBootstrap} * api.flags != {}: args.add("-b") if len(logFile) != 0: env = copyEnv() env["IPFS_LOGGING_FMT"] = "nocolor" env["GOLOG_FILE"] = logFile case logLevel of IpfsLogLevel.Critical: env["IPFS_LOGGING"] = "CRITICAL" of IpfsLogLevel.Error: env["IPFS_LOGGING"] = "ERROR" of IpfsLogLevel.Warning: env["IPFS_LOGGING"] = "WARNING" of IpfsLogLevel.Notice: env["IPFS_LOGGING"] = "NOTICE" of IpfsLogLevel.Info: env["IPFS_LOGGING"] = "INFO" of IpfsLogLevel.Debug: env["IPFS_LOGGING"] = "DEBUG" of IpfsLogLevel.Trace: env["IPFS_LOGGING"] = "DEBUG" env["GOLOG_TRACING_FILE"] = logFile if PSGossipSub in api.flags: args.add("-pubsub") args.add("-pubsubRouter=gossipsub") if gossipsubHeartbeatInterval != 0: let param = $gossipsubHeartbeatInterval & "ms" args.add("-gossipsubHeartbeatInterval=" & param) if gossipsubHeartbeatDelay != 0: let param = $gossipsubHeartbeatDelay & "ms" args.add("-gossipsubHeartbeatInitialDelay=" & param) if PSFloodSub in api.flags: args.add("-pubsub") args.add("-pubsubRouter=floodsub") if api.flags * {PSFloodSub, PSGossipSub} != {}: if PSNoSign in api.flags: args.add("-pubsubSign=false") if PSStrictSign in api.flags: args.add("-pubsubSignStrict=true") if NATPortMap in api.flags: args.add("-natPortMap=true") if AutoNAT in api.flags: args.add("-autonat=true") if AutoRelay in api.flags: args.add("-autoRelay=true") if RelayActive in api.flags: args.add("-relayActive=true") if RelayDiscovery in api.flags: args.add("-relayDiscovery=true") if RelayHop in api.flags: args.add("-relayHop=true") if NoInlinePeerId in api.flags: args.add("-noInlinePeerId=true") if len(bootstrapNodes) > 0: args.add("-bootstrapPeers=" & bootstrapNodes.join(",")) if len(id) != 0: args.add("-id=" & id) if len(hostAddresses) > 0: var opt = "-hostAddrs=" for i, address in hostAddresses: if i > 0: opt.add "," opt.add $address args.add(opt) if len(announcedAddresses) > 0: var opt = "-announceAddrs=" for i, address in announcedAddresses: if i > 0: opt.add "," opt.add $address args.add(opt) args.add("-noise=true") args.add("-quic=false") args.add("-listen=" & $api.address) # We are trying to get absolute daemon path. let cmd = findExe(daemon) trace "p2pd cmd", cmd, args if len(cmd) == 0: raise newException(DaemonLocalError, "Could not find daemon executable!") # Starting daemon process # echo "Starting ", cmd, " ", args.join(" ") api.process = exceptionToAssert: startProcess(cmd, "", args, env, {poParentStreams}) # Waiting until daemon will not be bound to control socket. while true: if not api.process.running(): raise newException(DaemonLocalError, "Daemon executable could not be started!") let res = await socketExists(api.address) if res: break await sleepAsync(500.milliseconds) if WaitBootstrap in api.flags: while true: var peers = await listPeers(api) if len(peers) >= peersRequired: break await sleepAsync(1.seconds) result = api proc close*(stream: P2PStream) {.async.} = ## Close ``stream``. if P2PStreamFlags.Closed notin stream.flags: await stream.transp.closeWait() stream.transp = nil stream.flags.incl(P2PStreamFlags.Closed) else: raise newException(DaemonLocalError, "Stream is already closed!") proc close*(api: DaemonAPI) {.async.} = ## Shutdown connections to `go-libp2p-daemon` control socket. # await api.pool.close() # Closing all pending servers. if len(api.servers) > 0: var pending = newSeq[Future[void]]() for server in api.servers: server.server.stop() server.server.close() pending.add(server.server.join()) await allFutures(pending) for server in api.servers: let address = initTAddress(server.address).tryGet() discard tryRemoveFile($address) api.servers.setLen(0) # Closing daemon's process. if NoProcessCtrl notin api.flags: when defined(windows): api.process.kill() else: api.process.terminate() discard api.process.waitForExit() # Attempt to delete unix socket endpoint. let address = initTAddress(api.address).tryGet() if address.family == AddressFamily.Unix: discard tryRemoveFile($address) template withMessage(m, body: untyped): untyped = let kind = m.checkResponse() if kind == ResponseKind.Error: raise newException(DaemonRemoteError, m.getErrorMessage()) elif kind == ResponseKind.Malformed: raise newException(DaemonLocalError, "Malformed message received!") else: body proc transactMessage(transp: StreamTransport, pb: ProtoBuffer): Future[ProtoBuffer] {.async.} = let length = pb.getLen() let res = await transp.write(pb.getPtr(), length) if res != length: raise newException(DaemonLocalError, "Could not send message to daemon!") var message = await transp.recvMessage() if len(message) == 0: raise newException(DaemonLocalError, "Incorrect or empty message received!") result = initProtoBuffer(message) proc getPeerInfo(pb: ProtoBuffer): PeerInfo {.raises: [DaemonLocalError].} = ## Get PeerInfo object from ``pb``. result.addresses = newSeq[MultiAddress]() if pb.getRequiredField(1, result.peer).isErr(): raise newException(DaemonLocalError, "Incorrect or empty message received!") discard pb.getRepeatedField(2, result.addresses) proc identity*(api: DaemonAPI): Future[PeerInfo] {.async.} = ## Get Node identity information var transp = await api.newConnection() try: var pb = await transactMessage(transp, requestIdentity()) pb.withMessage() do: var res: seq[byte] if pb.getRequiredField(ResponseType.IDENTITY.int, res).isOk(): var resPb = initProtoBuffer(res) result = getPeerInfo(resPb) finally: await api.closeConnection(transp) proc connect*(api: DaemonAPI, peer: PeerId, addresses: seq[MultiAddress], timeout = 0) {.async.} = ## Connect to remote peer with id ``peer`` and addresses ``addresses``. var transp = await api.newConnection() try: var pb = await transp.transactMessage(requestConnect(peer, addresses, timeout)) pb.withMessage() do: discard except CatchableError: await api.closeConnection(transp) proc disconnect*(api: DaemonAPI, peer: PeerId) {.async.} = ## Disconnect from remote peer with id ``peer``. var transp = await api.newConnection() try: var pb = await transp.transactMessage(requestDisconnect(peer)) pb.withMessage() do: discard finally: await api.closeConnection(transp) proc openStream*(api: DaemonAPI, peer: PeerId, protocols: seq[string], timeout = 0): Future[P2PStream] {.async.} = ## Open new stream to peer ``peer`` using one of the protocols in ## ``protocols``. Returns ``StreamTransport`` for the stream. var transp = await api.newConnection() var stream = new P2PStream try: var pb = await transp.transactMessage(requestStreamOpen(peer, protocols, timeout)) pb.withMessage() do: var res: seq[byte] if pb.getRequiredField(ResponseType.STREAMINFO.int, res).isOk(): let resPb = initProtoBuffer(res) # stream.peer = newSeq[byte]() var raddress = newSeq[byte]() stream.protocol = "" resPb.getRequiredField(1, stream.peer).tryGet() resPb.getRequiredField(2, raddress).tryGet() stream.raddress = MultiAddress.init(raddress).tryGet() resPb.getRequiredField(3, stream.protocol).tryGet() stream.flags.incl(Outbound) stream.transp = transp result = stream except CatchableError as exc: await api.closeConnection(transp) raise exc proc streamHandler(server: StreamServer, transp: StreamTransport) {.async.} = var api = getUserData[DaemonAPI](server) var message = await transp.recvMessage() var pb = initProtoBuffer(message) var stream = new P2PStream var raddress = newSeq[byte]() stream.protocol = "" pb.getRequiredField(1, stream.peer).tryGet() pb.getRequiredField(2, raddress).tryGet() stream.raddress = MultiAddress.init(raddress).tryGet() pb.getRequiredField(3, stream.protocol).tryGet() stream.flags.incl(Inbound) stream.transp = transp if len(stream.protocol) > 0: var handler = api.handlers.getOrDefault(stream.protocol) if not isNil(handler): asyncSpawn handler(api, stream) proc addHandler*(api: DaemonAPI, protocols: seq[string], handler: P2PStreamCallback) {.async, raises: [LPError].} = ## Add stream handler ``handler`` for set of protocols ``protocols``. var transp = await api.newConnection() let maddress = await getSocket(api.pattern, addr api.ucounter) var server = createStreamServer(maddress, streamHandler, udata = api) try: for item in protocols: api.handlers[item] = handler server.start() var pb = await transp.transactMessage(requestStreamHandler(maddress, protocols)) pb.withMessage() do: api.servers.add(P2PServer(server: server, address: maddress)) except CatchableError as exc: for item in protocols: api.handlers.del(item) server.stop() server.close() await server.join() raise exc finally: await api.closeConnection(transp) proc listPeers*(api: DaemonAPI): Future[seq[PeerInfo]] {.async.} = ## Get list of remote peers to which we are currently connected. var transp = await api.newConnection() try: var pb = await transp.transactMessage(requestListPeers()) pb.withMessage() do: result = newSeq[PeerInfo]() var ress: seq[seq[byte]] if pb.getRequiredRepeatedField(ResponseType.PEERINFO.int, ress).isOk(): for p in ress: let peer = initProtoBuffer(p).getPeerInfo() result.add(peer) finally: await api.closeConnection(transp) proc cmTagPeer*(api: DaemonAPI, peer: PeerId, tag: string, weight: int) {.async.} = ## Tag peer with id ``peer`` using ``tag`` and ``weight``. var transp = await api.newConnection() try: var pb = await transp.transactMessage(requestCMTagPeer(peer, tag, weight)) withMessage(pb) do: discard finally: await api.closeConnection(transp) proc cmUntagPeer*(api: DaemonAPI, peer: PeerId, tag: string) {.async.} = ## Remove tag ``tag`` from peer with id ``peer``. var transp = await api.newConnection() try: var pb = await transp.transactMessage(requestCMUntagPeer(peer, tag)) withMessage(pb) do: discard finally: await api.closeConnection(transp) proc cmTrimPeers*(api: DaemonAPI) {.async.} = ## Trim all connections. var transp = await api.newConnection() try: var pb = await transp.transactMessage(requestCMTrim()) withMessage(pb) do: discard finally: await api.closeConnection(transp) proc dhtGetSinglePeerInfo(pb: ProtoBuffer): PeerInfo {.raises: [DaemonLocalError].} = var res: seq[byte] if pb.getRequiredField(2, res).isOk(): result = initProtoBuffer(res).getPeerInfo() else: raise newException(DaemonLocalError, "Missing required field `peer`!") proc dhtGetSingleValue(pb: ProtoBuffer): seq[byte] {.raises: [DaemonLocalError].} = result = newSeq[byte]() if pb.getRequiredField(3, result).isErr(): raise newException(DaemonLocalError, "Missing field `value`!") proc dhtGetSinglePublicKey(pb: ProtoBuffer): PublicKey {.raises: [DaemonLocalError].} = if pb.getRequiredField(3, result).isErr(): raise newException(DaemonLocalError, "Missing field `value`!") proc dhtGetSinglePeerId(pb: ProtoBuffer): PeerId {.raises: [DaemonLocalError].} = if pb.getRequiredField(3, result).isErr(): raise newException(DaemonLocalError, "Missing field `value`!") proc enterDhtMessage(pb: ProtoBuffer, rt: DHTResponseType): ProtoBuffer {.inline, raises: [DaemonLocalError].} = var dhtResponse: seq[byte] if pb.getRequiredField(ResponseType.DHT.int, dhtResponse).isOk(): var pbDhtResponse = initProtoBuffer(dhtResponse) var dtype: uint if pbDhtResponse.getRequiredField(1, dtype).isErr(): raise newException(DaemonLocalError, "Missing required DHT field `type`!") if dtype != safeConvert[uint](rt): raise newException(DaemonLocalError, "Wrong DHT answer type! ") var value: seq[byte] if pbDhtResponse.getRequiredField(3, value).isErr(): raise newException(DaemonLocalError, "Missing required DHT field `value`!") return initProtoBuffer(value) else: raise newException(DaemonLocalError, "Wrong message type!") proc enterPsMessage(pb: ProtoBuffer): ProtoBuffer {.inline, raises: [DaemonLocalError].} = var res: seq[byte] if pb.getRequiredField(ResponseType.PUBSUB.int, res).isErr(): raise newException(DaemonLocalError, "Wrong message type!") initProtoBuffer(res) proc getDhtMessageType(pb: ProtoBuffer): DHTResponseType {.inline, raises: [DaemonLocalError].} = var dtype: uint if pb.getRequiredField(1, dtype).isErr(): raise newException(DaemonLocalError, "Missing required DHT field `type`!") if dtype == safeConvert[uint](DHTResponseType.VALUE): result = DHTResponseType.VALUE elif dtype == safeConvert[uint](DHTResponseType.END): result = DHTResponseType.END else: raise newException(DaemonLocalError, "Wrong DHT answer type!") proc dhtFindPeer*(api: DaemonAPI, peer: PeerId, timeout = 0): Future[PeerInfo] {.async.} = ## Find peer with id ``peer`` and return peer information ``PeerInfo``. ## ## You can specify timeout for DHT request with ``timeout`` value. ``0`` value ## means no timeout. var transp = await api.newConnection() try: var pb = await transp.transactMessage(requestDHTFindPeer(peer, timeout)) withMessage(pb) do: result = pb.enterDhtMessage(DHTResponseType.VALUE).dhtGetSinglePeerInfo() finally: await api.closeConnection(transp) proc dhtGetPublicKey*(api: DaemonAPI, peer: PeerId, timeout = 0): Future[PublicKey] {.async.} = ## Get peer's public key from peer with id ``peer``. ## ## You can specify timeout for DHT request with ``timeout`` value. ``0`` value ## means no timeout. var transp = await api.newConnection() try: var pb = await transp.transactMessage(requestDHTGetPublicKey(peer, timeout)) withMessage(pb) do: result = pb.enterDhtMessage(DHTResponseType.VALUE).dhtGetSinglePublicKey() finally: await api.closeConnection(transp) proc dhtGetValue*(api: DaemonAPI, key: string, timeout = 0): Future[seq[byte]] {.async.} = ## Get value associated with ``key``. ## ## You can specify timeout for DHT request with ``timeout`` value. ``0`` value ## means no timeout. var transp = await api.newConnection() try: var pb = await transp.transactMessage(requestDHTGetValue(key, timeout)) withMessage(pb) do: result = pb.enterDhtMessage(DHTResponseType.VALUE).dhtGetSingleValue() finally: await api.closeConnection(transp) proc dhtPutValue*(api: DaemonAPI, key: string, value: seq[byte], timeout = 0) {.async.} = ## Associate ``value`` with ``key``. ## ## You can specify timeout for DHT request with ``timeout`` value. ``0`` value ## means no timeout. var transp = await api.newConnection() try: var pb = await transp.transactMessage(requestDHTPutValue(key, value, timeout)) withMessage(pb) do: discard finally: await api.closeConnection(transp) proc dhtProvide*(api: DaemonAPI, cid: Cid, timeout = 0) {.async.} = ## Provide content with id ``cid``. ## ## You can specify timeout for DHT request with ``timeout`` value. ``0`` value ## means no timeout. var transp = await api.newConnection() try: var pb = await transp.transactMessage(requestDHTProvide(cid, timeout)) withMessage(pb) do: discard finally: await api.closeConnection(transp) proc dhtFindPeersConnectedToPeer*(api: DaemonAPI, peer: PeerId, timeout = 0): Future[seq[PeerInfo]] {.async.} = ## Find peers which are connected to peer with id ``peer``. ## ## You can specify timeout for DHT request with ``timeout`` value. ``0`` value ## means no timeout. var transp = await api.newConnection() var list = newSeq[PeerInfo]() try: let spb = requestDHTFindPeersConnectedToPeer(peer, timeout) var pb = await transp.transactMessage(spb) withMessage(pb) do: discard pb.enterDhtMessage(DHTResponseType.BEGIN) while true: var message = await transp.recvMessage() if len(message) == 0: break var cpb = initProtoBuffer(message) if cpb.getDhtMessageType() == DHTResponseType.END: break list.add(cpb.dhtGetSinglePeerInfo()) result = list finally: await api.closeConnection(transp) proc dhtGetClosestPeers*(api: DaemonAPI, key: string, timeout = 0): Future[seq[PeerId]] {.async.} = ## Get closest peers for ``key``. ## ## You can specify timeout for DHT request with ``timeout`` value. ``0`` value ## means no timeout. var transp = await api.newConnection() var list = newSeq[PeerId]() try: let spb = requestDHTGetClosestPeers(key, timeout) var pb = await transp.transactMessage(spb) withMessage(pb) do: discard pb.enterDhtMessage(DHTResponseType.BEGIN) while true: var message = await transp.recvMessage() if len(message) == 0: break var cpb = initProtoBuffer(message) if cpb.getDhtMessageType() == DHTResponseType.END: break list.add(cpb.dhtGetSinglePeerId()) result = list finally: await api.closeConnection(transp) proc dhtFindProviders*(api: DaemonAPI, cid: Cid, count: uint32, timeout = 0): Future[seq[PeerInfo]] {.async.} = ## Get ``count`` providers for content with id ``cid``. ## ## You can specify timeout for DHT request with ``timeout`` value. ``0`` value ## means no timeout. var transp = await api.newConnection() var list = newSeq[PeerInfo]() try: let spb = requestDHTFindProviders(cid, count, timeout) var pb = await transp.transactMessage(spb) withMessage(pb) do: discard pb.enterDhtMessage(DHTResponseType.BEGIN) while true: var message = await transp.recvMessage() if len(message) == 0: break var cpb = initProtoBuffer(message) if cpb.getDhtMessageType() == DHTResponseType.END: break list.add(cpb.dhtGetSinglePeerInfo()) result = list finally: await api.closeConnection(transp) proc dhtSearchValue*(api: DaemonAPI, key: string, timeout = 0): Future[seq[seq[byte]]] {.async.} = ## Search for value with ``key``, return list of values found. ## ## You can specify timeout for DHT request with ``timeout`` value. ``0`` value ## means no timeout. var transp = await api.newConnection() var list = newSeq[seq[byte]]() try: var pb = await transp.transactMessage(requestDHTSearchValue(key, timeout)) withMessage(pb) do: discard pb.enterDhtMessage(DHTResponseType.BEGIN) while true: var message = await transp.recvMessage() if len(message) == 0: break var cpb = initProtoBuffer(message) if cpb.getDhtMessageType() == DHTResponseType.END: break list.add(cpb.dhtGetSingleValue()) result = list finally: await api.closeConnection(transp) proc pubsubGetTopics*(api: DaemonAPI): Future[seq[string]] {.async.} = ## Get list of topics this node is subscribed to. var transp = await api.newConnection() try: var pb = await transp.transactMessage(requestPSGetTopics()) withMessage(pb) do: let innerPb = pb.enterPsMessage() var topics = newSeq[string]() discard innerPb.getRepeatedField(1, topics) result = topics finally: await api.closeConnection(transp) proc pubsubListPeers*(api: DaemonAPI, topic: string): Future[seq[PeerId]] {.async.} = ## Get list of peers we are connected to and which also subscribed to topic ## ``topic``. var transp = await api.newConnection() try: var pb = await transp.transactMessage(requestPSListPeers(topic)) withMessage(pb) do: var peer: PeerId let innerPb = pb.enterPsMessage() var peers = newSeq[seq[byte]]() discard innerPb.getRepeatedField(2, peers) result = peers.mapIt(PeerId.init(it).get()) finally: await api.closeConnection(transp) proc pubsubPublish*(api: DaemonAPI, topic: string, value: seq[byte]) {.async.} = ## Get list of peer identifiers which are subscribed to topic ``topic``. var transp = await api.newConnection() try: var pb = await transp.transactMessage(requestPSPublish(topic, value)) withMessage(pb) do: discard finally: await api.closeConnection(transp) proc getPubsubMessage*(pb: ProtoBuffer): PubSubMessage = result.data = newSeq[byte]() result.seqno = newSeq[byte]() discard pb.getField(1, result.peer) discard pb.getField(2, result.data) discard pb.getField(3, result.seqno) discard pb.getRepeatedField(4, result.topics) discard pb.getField(5, result.signature) discard pb.getField(6, result.key) proc pubsubLoop(api: DaemonAPI, ticket: PubsubTicket) {.async.} = while true: var pbmessage = await ticket.transp.recvMessage() if len(pbmessage) == 0: break var pb = initProtoBuffer(pbmessage) var message = pb.getPubsubMessage() ## We can do here `await` too let res = await ticket.handler(api, ticket, message) if not res: ticket.transp.close() await ticket.transp.join() break proc pubsubSubscribe*(api: DaemonAPI, topic: string, handler: P2PPubSubCallback): Future[PubsubTicket] {.async.} = ## Subscribe to topic ``topic``. var transp = await api.newConnection() try: var pb = await transp.transactMessage(requestPSSubscribe(topic)) pb.withMessage() do: var ticket = new PubsubTicket ticket.topic = topic ticket.handler = handler ticket.transp = transp asyncSpawn pubsubLoop(api, ticket) result = ticket except CatchableError as exc: await api.closeConnection(transp) raise exc proc shortLog*(pinfo: PeerInfo): string = ## Get string representation of ``PeerInfo`` object. result = newStringOfCap(128) result.add("{PeerId: '") result.add($pinfo.peer.shortLog()) result.add("' Addresses: [") let length = len(pinfo.addresses) for i in 0.. 0: result = result