mirror of
https://github.com/vacp2p/nim-libp2p.git
synced 2025-03-02 17:10:44 +00:00
Finish the autorelay & add a simple test
This commit is contained in:
parent
2d8be01db9
commit
5bf4fa8953
@ -28,15 +28,16 @@ type
|
|||||||
client: RelayClient
|
client: RelayClient
|
||||||
npeers: int
|
npeers: int
|
||||||
peers: Table[PeerId, Future[void]]
|
peers: Table[PeerId, Future[void]]
|
||||||
|
peerJoined: AsyncEvent
|
||||||
onReservation: OnReservationHandler
|
onReservation: OnReservationHandler
|
||||||
|
|
||||||
proc reserveAndUpdate(self: AutoRelay, peerId: PeerId) {.async.} =
|
proc reserveAndUpdate(self: AutoRelay, relayPid: PeerId, selfPid: PeerId) {.async.} =
|
||||||
while self.running:
|
while self.running:
|
||||||
let
|
let
|
||||||
rsvp = await self.client.reserve(peerId).wait(chronos.seconds(5))
|
rsvp = await self.client.reserve(relayPid).wait(chronos.seconds(5))
|
||||||
relayedAddr = MultiAddress.init("").tryGet() # TODO
|
relayedAddr = MultiAddress.init($(rsvp.addrs[0]) &
|
||||||
# relayedAddr = MultiAddress.init($(rsvp.addrs[0]) & "/p2p-circuit/p2p/" &
|
"/p2p-circuit/p2p/" &
|
||||||
# $(switch.peerInfo.peerId).tryGet()
|
$selfPid).tryGet()
|
||||||
await self.onReservation(relayedAddr)
|
await self.onReservation(relayedAddr)
|
||||||
await sleepAsync chronos.seconds(rsvp.expire.int64 - times.now().utc.toTime.toUnix)
|
await sleepAsync chronos.seconds(rsvp.expire.int64 - times.now().utc.toTime.toUnix)
|
||||||
|
|
||||||
@ -48,16 +49,22 @@ method setup*(self: AutoRelay, switch: Switch) {.async, gcsafe.} =
|
|||||||
proc handlePeer(peerId: PeerId, event: PeerEvent) {.async.} =
|
proc handlePeer(peerId: PeerId, event: PeerEvent) {.async.} =
|
||||||
if event.kind == Left and peerId in self.peers:
|
if event.kind == Left and peerId in self.peers:
|
||||||
self.peers[peerId].cancel()
|
self.peers[peerId].cancel()
|
||||||
|
elif event.kind == Joined and self.peers.len < self.npeers:
|
||||||
|
self.peerJoined.fire()
|
||||||
|
switch.addPeerEventHandler(handlePeer, Joined)
|
||||||
|
switch.addPeerEventHandler(handlePeer, Left)
|
||||||
|
|
||||||
method innerRun(self: AutoRelay, switch: Switch) {.async, gcsafe.} =
|
method innerRun(self: AutoRelay, switch: Switch) {.async, gcsafe.} =
|
||||||
while true:
|
while true:
|
||||||
# Remove peers that failed
|
# Remove peers that failed
|
||||||
var peersToRemove: seq[PeerId]
|
var peersToRemove: seq[PeerId]
|
||||||
for k, v in self.peers:
|
for k, v in self.peers:
|
||||||
if v.failed():
|
if v.failed() or v.cancelled():
|
||||||
peersToRemove.add(k)
|
peersToRemove.add(k)
|
||||||
for k in peersToRemove:
|
for k in peersToRemove:
|
||||||
self.peers.del(k)
|
self.peers.del(k)
|
||||||
|
if peersToRemove.len() > 0:
|
||||||
|
await sleepAsync(500.millis) # To avoid ddosing our peers in certain condition
|
||||||
|
|
||||||
# Get all connected peers
|
# Get all connected peers
|
||||||
let rng = newRng()
|
let rng = newRng()
|
||||||
@ -65,13 +72,18 @@ method innerRun(self: AutoRelay, switch: Switch) {.async, gcsafe.} =
|
|||||||
connectedPeers.keepItIf(RelayV2HopCodec in switch.peerStore[ProtoBook][it])
|
connectedPeers.keepItIf(RelayV2HopCodec in switch.peerStore[ProtoBook][it])
|
||||||
rng.shuffle(connectedPeers)
|
rng.shuffle(connectedPeers)
|
||||||
|
|
||||||
for peerId in switch.connectedPeers(Direction.Out):
|
for relayPid in switch.connectedPeers(Direction.Out):
|
||||||
if self.peers.len() >= self.npeers:
|
if self.peers.len() >= self.npeers:
|
||||||
break
|
break
|
||||||
if RelayV2HopCodec in switch.peerStore[ProtoBook][peerId]:
|
if RelayV2HopCodec in switch.peerStore[ProtoBook][relayPid]:
|
||||||
self.peers[peerId] = self.reserveAndUpdate(peerId)
|
self.peers[relayPid] = self.reserveAndUpdate(relayPid, switch.peerInfo.peerId)
|
||||||
let peersFutures = toSeq(self.peers.values())
|
let peersFutures = toSeq(self.peers.values())
|
||||||
# await race(peersFutures) TODO
|
|
||||||
|
if self.peers.len() < self.npeers:
|
||||||
|
self.peerJoined.clear()
|
||||||
|
await one(peersFutures) or self.peerJoined.wait()
|
||||||
|
else:
|
||||||
|
discard await one(peersFutures)
|
||||||
|
|
||||||
method run*(self: AutoRelay, switch: Switch) {.async, gcsafe.} =
|
method run*(self: AutoRelay, switch: Switch) {.async, gcsafe.} =
|
||||||
if self.running:
|
if self.running:
|
||||||
@ -87,8 +99,11 @@ method stop*(self: AutoRelay, switch: Switch) {.async, gcsafe.} =
|
|||||||
self.running = false
|
self.running = false
|
||||||
self.runner.cancel()
|
self.runner.cancel()
|
||||||
|
|
||||||
method new(T: typedesc[AutoRelay],
|
proc new*(T: typedesc[AutoRelay],
|
||||||
npeers: int,
|
npeers: int,
|
||||||
client: RelayClient,
|
client: RelayClient,
|
||||||
onReservation: OnReservationHandler): T =
|
onReservation: OnReservationHandler): T =
|
||||||
T(npeers: npeers, client: client, onReservation: onReservation)
|
T(npeers: npeers,
|
||||||
|
client: client,
|
||||||
|
onReservation: onReservation,
|
||||||
|
peerJoined: newAsyncEvent())
|
||||||
|
49
tests/testautorelay.nim
Normal file
49
tests/testautorelay.nim
Normal file
@ -0,0 +1,49 @@
|
|||||||
|
{.used.}
|
||||||
|
|
||||||
|
import bearssl, chronos, options
|
||||||
|
import ../libp2p
|
||||||
|
import ../libp2p/[protocols/connectivity/relay/relay,
|
||||||
|
protocols/connectivity/relay/messages,
|
||||||
|
protocols/connectivity/relay/utils,
|
||||||
|
protocols/connectivity/relay/client,
|
||||||
|
protocols/connectivity/autorelay]
|
||||||
|
import ./helpers
|
||||||
|
import std/times
|
||||||
|
import stew/byteutils
|
||||||
|
|
||||||
|
proc createSwitch(r: Relay): Switch =
|
||||||
|
result = SwitchBuilder.new()
|
||||||
|
.withRng(newRng())
|
||||||
|
.withAddresses(@[ MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet() ])
|
||||||
|
.withTcpTransport()
|
||||||
|
.withMplex()
|
||||||
|
.withNoise()
|
||||||
|
.withCircuitRelay(r)
|
||||||
|
.build()
|
||||||
|
|
||||||
|
suite "Autorelay":
|
||||||
|
asyncTeardown:
|
||||||
|
checkTrackers()
|
||||||
|
|
||||||
|
asyncTest "Simple test":
|
||||||
|
let relay = createSwitch(Relay.new())
|
||||||
|
let client = RelayClient.new()
|
||||||
|
let switch = createSwitch(client)
|
||||||
|
let fut = newFuture[void]()
|
||||||
|
proc checkMA(address: MultiAddress) {.async.} =
|
||||||
|
check: address == MultiAddress.init($relay.peerInfo.addrs[0] & "/p2p/" &
|
||||||
|
$relay.peerInfo.peerId & "/p2p-circuit/p2p/" &
|
||||||
|
$switch.peerInfo.peerId).get()
|
||||||
|
fut.complete()
|
||||||
|
let autorelay = AutoRelay.new(3, client, checkMA)
|
||||||
|
switch.addService(autorelay)
|
||||||
|
await switch.start()
|
||||||
|
await relay.start()
|
||||||
|
await switch.connect(relay.peerInfo.peerId, relay.peerInfo.addrs)
|
||||||
|
|
||||||
|
discard autorelay.run(switch)
|
||||||
|
|
||||||
|
await fut
|
||||||
|
await switch.stop()
|
||||||
|
await relay.stop()
|
||||||
|
|
Loading…
x
Reference in New Issue
Block a user