Make unix socket name generation more reliable (depending on process id).

Add examples of bootstrap node and gossipsub node.
This commit is contained in:
cheatfate 2018-12-18 16:31:58 +02:00
parent 79fbd74acd
commit e9785bb509
4 changed files with 243 additions and 60 deletions

43
examples/bootstrap.nim Normal file
View File

@ -0,0 +1,43 @@
import asyncdispatch2, nimcrypto, strutils
import ../libp2p/daemon/daemonapi, ../libp2p/[base58, multicodec, multiaddress]
import hexdump
const
PubSubTopic = "test-net"
proc dumpSubscribedPeers(api: DaemonAPI) {.async.} =
var peers = await api.pubsubListPeers(PubSubTopic)
echo "= List of connected and subscribed peers:"
for item in peers:
echo Base58.encode(item)
proc main() {.async.} =
echo "= Starting P2P bootnode"
var api = await newDaemonApi({DHTFull, PSGossipSub})
var id = await api.identity()
let tpeerid = Base58.encode(id.peer)
echo "= P2P bootnode ", tpeerid, " started."
let mcip4 = multiCodec("ip4")
let mcip6 = multiCodec("ip6")
echo "= You can use one of this addresses to bootstrap your nodes:"
for item in id.addresses:
if item.protoCode() == mcip4 or item.protoCode() == mcip6:
echo $item & "/ipfs/" & tpeerid
proc pubsubLogger(api: DaemonAPI,
ticket: PubsubTicket,
message: PubSubMessage): Future[bool] {.async.} =
let bpeer = Base58.encode(message.peer)
let msglen = len(message.data)
echo "= Recieved pubsub message wit length ", msglen,
" bytes from peer ", bpeer
echo dumpHex(message.data)
await api.dumpSubscribedPeers()
result = true
var ticket = await api.pubsubSubscribe(PubSubTopic, pubsubLogger)
when isMainModule:
waitFor(main())
while true:
poll()

92
examples/hexdump.nim Normal file
View File

@ -0,0 +1,92 @@
#
# Copyright (c) 2016 Eugene Kabanov <ka@hardcore.kiev.ua>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
from strutils import toHex, repeat
proc dumpHex*(pbytes: pointer, nbytes: int, items = 1, ascii = true): string =
## Return hexadecimal memory dump representation pointed by ``p``.
## ``nbytes`` - number of bytes to show
## ``items`` - number of bytes in group (supported ``items`` count is
## 1, 2, 4, 8)
## ``ascii`` - if ``true`` show ASCII representation of memory dump.
result = ""
let hexSize = items * 2
var i = 0
var slider = pbytes
var asciiText = ""
while i < nbytes:
if i %% 16 == 0:
result = result & toHex(cast[BiggestInt](slider),
sizeof(BiggestInt) * 2) & ": "
var k = 0
while k < items:
var ch = cast[ptr char](cast[uint](slider) + k.uint)[]
if ord(ch) > 31 and ord(ch) < 127: asciiText &= ch else: asciiText &= "."
inc(k)
case items:
of 1:
result = result & toHex(cast[BiggestInt](cast[ptr uint8](slider)[]),
hexSize)
of 2:
result = result & toHex(cast[BiggestInt](cast[ptr uint16](slider)[]),
hexSize)
of 4:
result = result & toHex(cast[BiggestInt](cast[ptr uint32](slider)[]),
hexSize)
of 8:
result = result & toHex(cast[BiggestInt](cast[ptr uint64](slider)[]),
hexSize)
else:
raise newException(ValueError, "Wrong items size!")
result = result & " "
slider = cast[pointer](cast[uint](slider) + items.uint)
i = i + items
if i %% 16 == 0:
result = result & " " & asciiText
asciiText.setLen(0)
result = result & "\n"
if i %% 16 != 0:
var spacesCount = ((16 - (i %% 16)) div items) * (hexSize + 1) + 1
result = result & repeat(' ', spacesCount)
result = result & asciiText
result = result & "\n"
proc dumpHex*[T](v: openarray[T], items: int = 0, ascii = true): string =
## Return hexadecimal memory dump representation of openarray[T] ``v``.
## ``items`` - number of bytes in group (supported ``items`` count is
## 0, 1, 2, 4, 8). If ``items`` is ``0`` group size will depend on
## ``sizeof(T)``.
## ``ascii`` - if ``true`` show ASCII representation of memory dump.
var i = 0
if items == 0:
when sizeof(T) == 2:
i = 2
elif sizeof(T) == 4:
i = 4
elif sizeof(T) == 8:
i = 8
else:
i = 1
else:
i = items
result = dumpHex(unsafeAddr v[0], sizeof(T) * len(v), i, ascii)

36
examples/node.nim Normal file
View File

@ -0,0 +1,36 @@
import asyncdispatch2, nimcrypto, strutils, os
import ../libp2p/daemon/daemonapi, ../libp2p/[base58, multiaddress]
proc main(bn: string) {.async.} =
echo "= Starting P2P node"
var bootnodes = bn.split(",")
var api = await newDaemonApi({DHTFull, PSGossipSub, WaitBootstrap},
bootstrapNodes = bootnodes,
peersRequired = 1)
var id = await api.identity()
echo "= P2P node ", Base58.encode(id.peer), " started:"
for item in id.addresses:
echo item
proc pubsubLogger(api: DaemonAPI,
ticket: PubsubTicket,
message: PubSubMessage): Future[bool] {.async.} =
let bpeer = Base58.encode(message.peer)
let msglen = len(message.data)
echo "= Recieved pubsub message wit length ", msglen,
" bytes from peer ", bpeer
result = true
var ticket = await api.pubsubSubscribe("test-net", pubsubLogger)
var data = "HELLO\r\n"
var msgData = cast[seq[byte]](data)
await api.pubsubPublish("test-net", msgData)
when isMainModule:
if paramCount() != 1:
echo "Please supply bootnodes!"
else:
waitFor(main(paramStr(1)))
while true:
poll()

View File

@ -517,13 +517,36 @@ when not defined(windows):
register(AsyncFD(pfd)) register(AsyncFD(pfd))
addReader(fd, readOutputLoop, nil) addReader(fd, readOutputLoop, nil)
result = retFuture result = retFuture
proc getProcessId(): int =
result = posix.getpid()
else: else:
proc socketExists(filename: string): bool = false proc socketExists(filename: string): bool =
# Not ready yet
false
proc loggingHandler(api: DaemonAPI): Future[void] = proc loggingHandler(api: DaemonAPI): Future[void] =
# Not ready yet. # Not ready yet.
discard discard
proc getProcessId(): int =
# Not ready yet
discard
proc getSocketName(pattern: string): string =
var sockname = ""
var pid = $getProcessId()
while true:
inc(daemonsCount)
sockname = pattern % [pid, $daemonsCount]
if socketExists(sockname):
if tryRemoveFile(sockname):
result = sockname
break
else:
result = sockname
break
# This is forward declaration needed for newDaemonApi() # This is forward declaration needed for newDaemonApi()
proc listPeers*(api: DaemonAPI): Future[seq[PeerInfo]] {.async.} proc listPeers*(api: DaemonAPI): Future[seq[PeerInfo]] {.async.}
@ -532,7 +555,8 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {},
id: string = "", id: string = "",
daemon = DefaultDaemonFile, daemon = DefaultDaemonFile,
sockpath = DefaultSocketPath, sockpath = DefaultSocketPath,
pattern = "/tmp/nim-p2pd-$1.sock", patternSock = "/tmp/nim-p2pd-$1-$2.sock",
patternHandler = "/tmp/nim-p2pd-handle-$1-$2.sock",
poolSize = 10, poolSize = 10,
gossipsubHeartbeatInterval = 0, gossipsubHeartbeatInterval = 0,
gossipsubHeartbeatDelay = 0, gossipsubHeartbeatDelay = 0,
@ -544,77 +568,65 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {},
api.flags = flags api.flags = flags
api.servers = newSeq[P2PServer]() api.servers = newSeq[P2PServer]()
api.pattern = pattern api.pattern = patternHandler
api.ucounter = 1 api.ucounter = 1
api.handlers = initTable[string, P2PStreamCallback]() api.handlers = initTable[string, P2PStreamCallback]()
api.sockname = sockpath api.sockname = sockpath
if api.sockname == DefaultSocketPath: if api.sockname == DefaultSocketPath:
# If client not specify `sockpath` but tries to spawn many daemons, we will api.sockname = getSocketName(patternSock)
# replace sockname. else:
if daemonsCount != 0: if not socketExists(api.sockname):
api.sockname = DefaultSocketPattern % [$daemonsCount] raise newException(DaemonLocalError, "Unix socket is not found!")
api.address = initTAddress(api.sockname) api.address = initTAddress(api.sockname)
inc(daemonsCount) inc(daemonsCount)
# We will start daemon process only when control socket path is not default or # DHTFull and DHTClient could not be present at the same time
# options are specified. if DHTFull in flags and DHTClient in flags:
if flags == {} and api.sockname == DefaultSocketPath: api.flags.excl(DHTClient)
discard # PSGossipSub and PSFloodSub could not be present at the same time
else: if PSGossipSub in flags and PSFloodSub in flags:
# DHTFull and DHTClient could not be present at the same time api.flags.excl(PSFloodSub)
if DHTFull in flags and DHTClient in flags: if DHTFull in api.flags:
api.flags.excl(DHTClient) args.add("-dht")
# PSGossipSub and PSFloodSub could not be present at the same time if DHTClient in api.flags:
if PSGossipSub in flags and PSFloodSub in flags: args.add("-dhtClient")
api.flags.excl(PSFloodSub) if {Bootstrap, WaitBootstrap} * api.flags != {}:
if DHTFull in api.flags: args.add("-b")
args.add("-dht") if Verbose in api.flags:
if DHTClient in api.flags: env = newStringTable("IPFS_LOGGING", "debug", modeCaseSensitive)
args.add("-dhtClient") if PSGossipSub in api.flags:
if {Bootstrap, WaitBootstrap} * api.flags != {}: args.add("-pubsub")
args.add("-b") args.add("-pubsubRouter=gossipsub")
if Verbose in api.flags: if gossipsubHeartbeatInterval != 0:
env = newStringTable("IPFS_LOGGING", "debug", modeCaseSensitive) let param = $gossipsubHeartbeatInterval & "ms"
if PSGossipSub in api.flags: args.add("-gossipsubHeartbeatInterval=" & param)
args.add("-pubsub") if gossipsubHeartbeatDelay != 0:
args.add("-pubsubRouter=gossipsub") let param = $gossipsubHeartbeatDelay & "ms"
if gossipsubHeartbeatInterval != 0: args.add("-gossipsubHeartbeatInitialDelay=" & param)
let param = $gossipsubHeartbeatInterval & "ms" if PSFloodSub in api.flags:
args.add("-gossipsubHeartbeatInterval=" & param) args.add("-pubsub")
if gossipsubHeartbeatDelay != 0: args.add("-pubsubRouter=floodsub")
let param = $gossipsubHeartbeatDelay & "ms" if api.flags * {PSFloodSub, PSGossipSub} != {}:
args.add("-gossipsubHeartbeatInitialDelay=" & param) if PSNoSign in api.flags:
if PSFloodSub in api.flags: args.add("-pubsubSign=false")
args.add("-pubsub") if PSStrictSign in api.flags:
args.add("-pubsubRouter=floodsub") args.add("-pubsubSignStrict=true")
if api.flags * {PSFloodSub, PSGossipSub} != {}: if NATPortMap in api.flags:
if PSNoSign in api.flags: args.add("-natPortMap=true")
args.add("-pubsubSign=false") if len(bootstrapNodes) > 0:
if PSStrictSign in api.flags: args.add("-bootstrapPeers=" & bootstrapNodes.join(","))
args.add("-pubsubSignStrict=true") if len(id) != 0:
if NATPortMap in api.flags: args.add("-id=" & id)
args.add("-natPortMap=true") if api.sockname != DefaultSocketPath:
if len(bootstrapNodes) > 0: args.add("-sock=" & api.sockname)
args.add("-bootstrapPeers=" & bootstrapNodes.join(","))
if len(id) != 0:
args.add("-id=" & id)
if api.sockname != DefaultSocketPath:
args.add("-sock=" & api.sockname)
# We are trying to get absolute daemon path. # We are trying to get absolute daemon path.
let cmd = findExe(daemon) let cmd = findExe(daemon)
if len(cmd) == 0: if len(cmd) == 0:
raise newException(DaemonLocalError, "Could not find daemon executable!") raise newException(DaemonLocalError, "Could not find daemon executable!")
# We will try to remove control socket file, because daemon will fail
# if its not able to create new socket control file.
# We can't use `existsFile()` because it do not support unix-domain socket
# endpoints.
if socketExists(api.sockname):
if not tryRemoveFile(api.sockname):
if api.sockname != sockpath:
raise newException(DaemonLocalError, "Socket is already bound!")
# Starting daemon process # Starting daemon process
api.process = startProcess(cmd, "", args, env, {poStdErrToStdOut}) api.process = startProcess(cmd, "", args, env, {poStdErrToStdOut})
# Waiting until daemon will not be bound to control socket. # Waiting until daemon will not be bound to control socket.
@ -797,7 +809,7 @@ proc addHandler*(api: DaemonAPI, protocols: seq[string],
handler: P2PStreamCallback) {.async.} = handler: P2PStreamCallback) {.async.} =
## Add stream handler ``handler`` for set of protocols ``protocols``. ## Add stream handler ``handler`` for set of protocols ``protocols``.
var transp = await api.newConnection() var transp = await api.newConnection()
var sockname = api.pattern % [$api.ucounter] var sockname = api.pattern % [$getProcessId(), $api.ucounter]
var localaddr = initTAddress(sockname) var localaddr = initTAddress(sockname)
inc(api.ucounter) inc(api.ucounter)
var server = createStreamServer(localaddr, streamHandler, udata = api) var server = createStreamServer(localaddr, streamHandler, udata = api)