From 6fdf9fc66e7a8965f0d23a6d01b5fd9a33cbf615 Mon Sep 17 00:00:00 2001 From: "Michael Bradley, Jr" Date: Tue, 16 Aug 2022 09:33:21 -0500 Subject: [PATCH] [wip] code from uncommitted testground-nim-sdk example --- testground/hello_codex/hello_codex.nim | 118 ++++++++++++++++++++++++- testground/hello_codex/manifest.toml | 2 +- 2 files changed, 116 insertions(+), 4 deletions(-) diff --git a/testground/hello_codex/hello_codex.nim b/testground/hello_codex/hello_codex.nim index f47eae10..e421f986 100644 --- a/testground/hello_codex/hello_codex.nim +++ b/testground/hello_codex/hello_codex.nim @@ -1,3 +1,115 @@ -echo "HELLO FROM CODEX" -echo "HELLO FROM CODEX" -echo "HELLO FROM CODEX" +import std/strutils, stew/byteutils, parseutils, hashes, tables, sequtils, sets +import libp2p +import libp2p/protocols/pubsub/rpc/messages +import libp2p/protocols/pubsub/peertable +import testground_sdk + +proc msgIdProvider(m: Message): Result[MessageID, ValidationResult] = + return ok(($m.data.hash).toBytes()) + +testground(client): + let + myId = await client.signalAndWait("setup", client.testInstanceCount) + # Ugly IP generation + myIp = client.testSubnet.split('.')[0..1].join(".") & ".1." & $myId + await client.updateNetworkParameter( + NetworkConf( + network: "default", + ipv4: some myIp & "/24", + enable: true, + callback_state: "network_setup", + callback_target: some client.testInstanceCount, + routing_policy: "accept_all", + default: LinkShape( + latency: 100000000, + # jitter: 100000000, + ) + + ) + ) + + # Subscribing early to the pubsub topic to make sure we don't + # miss any address + let addressQueue = client.subscribe("addresses") + + # The sidecar will signal for "network_setup" when the network is ready + # (callback_state above) + await client.waitForBarrier("network_setup", client.testInstanceCount) + + let + ma = MultiAddress.init("/ip4/" & myIp & "/tcp/0").tryGet() + rng = libp2p.crypto.newRng() + switch = + SwitchBuilder.new() + .withRng(rng) + .withAddress(ma) + .withNoise() + .withMplex() + .withTcpTransport() + .withMaxConnections(client.testInstanceCount * 2) + .build() + + pubsub = GossipSub.init( + switch = switch, + msgIdProvider = msgIdProvider, + verifySignature = false, + anonymize = true, # Signing take a few milliseconds + triggerSelf = true) + switch.mount(pubsub) + + await switch.start() + await pubsub.start() + + let gotMsg = newFuture[void]() + + pubsub.subscribe("letopic", + proc (topic: string, data: seq[byte]) {.async.} = + doAssert data[0] == 12 + gotMsg.complete() + ) + + # Ugly broadcast + await client.publish("addresses", $myId & "," & $switch.peerInfo.addrs[0] & "," & $switch.peerInfo.peerId) + var + peersInfo: seq[string] + toConnect = client.testInstanceCount + # Retrieve every addresses + while peersInfo.len < toConnect: + peersInfo.add(await addressQueue.popLast()) + + rng.shuffle(peersInfo) + # Connect to 15 peers max (in local scenario I had issues when creating + # too many connections (>2500), caused "No route to host" errors.) + if peersInfo.len > 15: + peersInfo = peersInfo[0..<15] + for p in peersInfo: + let dress = p.split(",") + let pid = PeerId.init(dress[2]).tryGet() + var otherId: int + discard parseInt(dress[0], otherId) + toConnect.dec + if otherId < myId and pid != switch.peerInfo.peerId: + try: + await switch.connect(pid, @[MultiAddress.init(dress[1]).tryGet()]) + echo "Connected to ", dress, " from ", myId + except CatchableError as exc: + echo "CAN'T DIAL ", dress, " from ", myId + echo exc.msg + + # safety sleep + await sleepAsync(1.seconds) + discard await client.signalAndWait("readyforaction", client.testInstanceCount) + + let start = Moment.now() + if myId == 1: + # First node publishes + discard await pubsub.publish("letopic", @[12.byte]) + + if await gotMsg.withTimeout(60.seconds): + echo "FINISHED ",myId, ": ", Moment.now() - start + else: + echo myId, " never got the message :'(" + + discard await client.signalAndWait("finished", client.testInstanceCount) + + await switch.stop() diff --git a/testground/hello_codex/manifest.toml b/testground/hello_codex/manifest.toml index 013ab6a7..44fb1dd6 100644 --- a/testground/hello_codex/manifest.toml +++ b/testground/hello_codex/manifest.toml @@ -11,7 +11,7 @@ enabled = true [[testcases]] name= "hello_codex" -instances = { min = 2, default = 2, max = 2 } +instances = { min = 2, default = 2, max = 200 } # [testcases.params] # payload = { type = "string", default = "Hello!" }