wip: initial bootstrap implementation

This commit is contained in:
Dmitriy Ryajov 2020-02-21 17:52:39 -06:00
parent 568a82013e
commit 8809af9c9d
9 changed files with 154 additions and 15 deletions

View File

@ -44,26 +44,30 @@ template postInit(peerinfo: PeerInfo,
peerinfo.protocols = @protocols
peerinfo.lifefut = newFuture[void]("libp2p.peerinfo.lifetime")
proc init*(p: typedesc[PeerInfo], key: PrivateKey,
proc init*(p: typedesc[PeerInfo],
key: PrivateKey,
addrs: openarray[MultiAddress] = [],
protocols: openarray[string] = []): PeerInfo {.inline.} =
result = PeerInfo(keyType: HasPrivate, peerId: PeerID.init(key),
privateKey: key)
result.postInit(addrs, protocols)
proc init*(p: typedesc[PeerInfo], peerId: PeerID,
proc init*(p: typedesc[PeerInfo],
peerId: PeerID,
addrs: openarray[MultiAddress] = [],
protocols: openarray[string] = []): PeerInfo {.inline.} =
result = PeerInfo(keyType: HasPublic, peerId: peerId)
result.postInit(addrs, protocols)
proc init*(p: typedesc[PeerInfo], peerId: string,
proc init*(p: typedesc[PeerInfo],
peerId: string,
addrs: openarray[MultiAddress] = [],
protocols: openarray[string] = []): PeerInfo {.inline.} =
result = PeerInfo(keyType: HasPublic, peerId: PeerID.init(peerId))
result.postInit(addrs, protocols)
proc init*(p: typedesc[PeerInfo], key: PublicKey,
proc init*(p: typedesc[PeerInfo],
key: PublicKey,
addrs: openarray[MultiAddress] = [],
protocols: openarray[string] = []): PeerInfo {.inline.} =
result = PeerInfo(keyType: HasPublic, peerId: PeerID.init(key),

View File

@ -7,4 +7,113 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
import options, sequtils, tables
import chronos, chronicles
import discovery,
../utils/timedcache,
../../multiaddress,
../../protobuf/minprotobuf,
../../crypto/crypto,
../../connection,
../../peerinfo,
../../switch
const Codec* = "/eth2/bootstrap/1.0"
const MaxPeers* = 100
type
Bootstrap* = ref object of Discovery
switch: Switch
bootstrapPeers*: seq[PeerInfo]
maxPeers: int
refreshTask: Future[void]
running: bool
sentPeers: TimedCache[PeerInfo]
proc encode(msg: seq[PeerInfo]): ProtoBuffer =
result = initProtoBuffer()
var peerMsg: ProtoBuffer
for m in msg:
peerMsg = initProtoBuffer()
peerMsg.write(initProtoField(1, m.publicKey().get().getBytes()))
for ma in m.addrs:
peerMsg.write(initProtoField(2, ma.data.buffer))
peerMsg.finish()
result.write(initProtoField(1, peerMsg))
result.finish()
proc decode(msg: seq[byte]): seq[PeerInfo] =
var pb = initProtoBuffer(msg)
while pb.enterSubMessage() > 0:
var pubKey: PublicKey
if pb.getValue(1, pubKey) < 0:
warn "unable to read pubkey"
continue
var address = newSeq[byte]()
var mas: seq[MultiAddress]
while pb.getBytes(2, address) > 0:
if len(address) != 0:
var ma = MultiAddress.init(address)
mas.add(ma)
trace "read address bytes from message", address = ma
address.setLen(0)
result.add(PeerInfo.init(pubKey, mas))
method init(b: Bootstrap) =
proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} =
var msg = encode(toSeq(b.switch.connections.values).mapIt( it.peerInfo ))
await conn.writeLp(msg.buffer)
b.codec = Codec
b.handler = handler
proc newBootstrap*(b: type[Bootstrap],
bootstrapPeers: seq[PeerInfo],
switch: Switch,
onNewPeers: NewPeersHandler,
interval: Duration = DefaultDiscoveryInterval,
peersTimeout: Duration = DefaultPeersTimeout,
maxPeers: int = MaxPeers): b =
Bootstrap(switch: switch,
bootstrapPeers: bootstrapPeers,
interval: interval,
onNewPeers: onNewPeers,
peersTimeout: peersTimeout,
maxPeers: maxPeers)
proc getNewPeers(b: Bootstrap) {.async.} =
await b.switch.onStarted.wait() # wait for the switch to start
while b.running:
var dials: seq[Future[Connection]]
for p in b.bootstrapPeers:
dials.add(b.switch.dial(p, Codec))
await allFutures(dials)
var bootstraps: seq[Connection]
for d in dials:
try:
bootstraps.add(d.read())
except CatchableError as exc:
warn "unable to connect to bootstrap", exc = exc.msg
continue
for bootstrap in bootstraps:
let msg = await bootstrap.readLp()
await bootstrap.close()
for p in msg.decode():
b.peers.put(p.id, p)
await b.onNewPeers(b.getPeers) # notify listeners of new peers
await sleepAsync(b.interval)
method start*(b: Bootstrap) {.async.} =
b.running = true
b.refreshTask = b.getNewPeers()
method stop*(b: Bootstrap) {.async.} =
b.running = false
await b.refreshTask

View File

@ -7,12 +7,13 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
import chronos
import chronos, sequtils
import ../protocol,
../../peerinfo,
../utils/timedcache
const DefaultDiscoveryInterval* = 10.seconds
const DefaultPeersTimeout* = 5.minutes
type
NewPeersHandler* = proc(peers: seq[PeerInfo]): Future[void]
@ -24,11 +25,15 @@ type
proc newDiscovery*(d: type[Discovery],
onNewPeers: NewPeersHandler,
interval: Duration = DefaultDiscoveryInterval): d =
interval: Duration = DefaultDiscoveryInterval,
peersTimeout: Duration = DefaultPeersTimeout): d =
Discovery(onNewPeers: onNewPeers,
interval: interval,
peers: newTimedCache[PeerInfo]())
proc getPeers*(d: Discovery): seq[PeerInfo] =
d.peers.entries().mapIt( it.val )
method start*(d: Discovery) {.base, async.} =
doAssert(false, "Not implmented!")

View File

@ -7,12 +7,12 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
import tables, sets, options, sequtils
import tables, sets, options, sequtils, random
import chronos, chronicles
import pubsub,
floodsub,
pubsubpeer,
../utils/mcache,
mcache,
rpc/[messages, message],
../utils/timedcache,
../../crypto/crypto,

View File

@ -195,13 +195,12 @@ proc decodeMessages*(pb: var ProtoBuffer): seq[Message] {.gcsafe.} =
break
trace "read message field", seqno = msg.seqno
var topic: string
while true:
var topic: string
if pb.getString(4, topic) < 0:
break
msg.topicIDs.add(topic)
trace "read message field", topicName = topic
topic = ""
discard pb.getBytes(5, msg.signature)
trace "read message field", signature = msg.signature

View File

@ -7,7 +7,7 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
import tables
import tables, sequtils
import chronos, chronicles
logScope:
@ -18,8 +18,8 @@ const Timeout* = 10.seconds # default timeout in ms
type
ExpireHandler*[V] = proc(key: string, val: V) {.gcsafe.}
TimedEntry*[V] = object of RootObj
val: V
handler: ExpireHandler[V]
val*: V
handler*: ExpireHandler[V]
TimedCache*[V] = ref object of RootObj
cache: Table[string, TimedEntry[V]]
@ -73,6 +73,9 @@ proc `[]`*[V](t: TimedCache[V], key: string): V =
proc `[]=`*[V](t: TimedCache[V], key: string, val: V): V =
t.put(key, val)
proc entries*[V](t: TimedCache[V]): seq[TimedEntry[V]] =
toSeq(t.cache.values)
proc newTimedCache*[V](timeout: Duration = Timeout): TimedCache[V] =
TimedCache(cache: initTable[string, TimedEntry[V]](),
TimedCache[V](cache: initTable[string, TimedEntry[V]](),
timeout: timeout)

View File

@ -46,6 +46,7 @@ type
secureManagers*: Table[string, Secure]
pubSub*: Option[PubSub]
dialedPubSubPeers: HashSet[string]
onStarted*: AsyncEvent
proc newNoPubSubException(): ref Exception {.inline.} =
result = newException(NoPubSubException, "no pubsub provided!")
@ -279,7 +280,7 @@ proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} =
await conn.close()
await s.cleanupConn(conn)
var startFuts: seq[Future[void]]
for t in s.transports: # for each transport
for i, a in s.peerInfo.addrs:
@ -292,6 +293,7 @@ proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} =
await s.pubSub.get().start()
result = startFuts # listen for incoming connections
s.onStarted.fire()
proc stop*(s: Switch) {.async.} =
trace "stopping switch"
@ -368,6 +370,7 @@ proc newSwitch*(peerInfo: PeerInfo,
result.muxers = muxers
result.secureManagers = initTable[string, Secure]()
result.dialedPubSubPeers = initHashSet[string]()
result.onStarted = newAsyncEvent()
let s = result # can't capture result
result.streamHandler = proc(stream: Connection) {.async, gcsafe.} =

16
tests/testdiscovery.nim Normal file
View File

@ -0,0 +1,16 @@
import unittest, tables
import chronos
import ../libp2p/[protocols/discovery/discovery,
protocols/discovery/bootstrap,
switch,
connection,
peerinfo]
suite "Discovery":
suite "Bootstrap":
test "should get peers":
proc test(): Future[bool] {.async.} =
check:
waitFor(test()) == true