diff --git a/pubsub/nim/Dockerfile b/pubsub/nim/Dockerfile index e8f79f8..9cac4d3 100644 --- a/pubsub/nim/Dockerfile +++ b/pubsub/nim/Dockerfile @@ -9,6 +9,9 @@ FROM builder ARG PLAN_PATH="./" COPY ./plan/${PLAN_PATH} ./plan -RUN cd plan && nimble install -dy && nim c -d:chronicles_log_level=NOTICE -d:release main.nim +RUN cd plan && nimble install -dy && \ + nim c -d:chronicles_log_level=NOTICE \ + -d:release \ + main.nim ENTRYPOINT ["plan/main"] diff --git a/pubsub/nim/main.nim b/pubsub/nim/main.nim index 3d8811b..e1c61d8 100644 --- a/pubsub/nim/main.nim +++ b/pubsub/nim/main.nim @@ -2,7 +2,7 @@ import serialization, json_serialization, stew/endians2, stew/byteutils import libp2p, testground_sdk, libp2p/protocols/pubsub/rpc/messages import chronos import sequtils, hashes -from times import getTime, toUnix, fromUnix, `-`, initTime, `$` +from times import getTime, toUnix, fromUnix, `-`, initTime, `$`, inMilliseconds type PeerData = object @@ -22,6 +22,7 @@ testground(client): let myId = await client.signal("initialized_global") isPublisher = myId <= client.param(int, "publisher_count") + isAttacker = myId - client.param(int, "publisher_count") <= client.param(int, "attacker_count") rng = libp2p.newRng() address = addresses[0][0].host switch = @@ -29,8 +30,8 @@ testground(client): .new() .withAddress(MultiAddress.init(address).tryGet()) .withRng(rng) - #.withYamux() - .withMplex() + .withYamux() + #.withMplex() .withTcpTransport(flags = {ServerFlags.TcpNoDelay}) #.withPlainText() .withNoise() @@ -43,17 +44,39 @@ testground(client): anonymize = true, ) gossipSub.parameters.floodPublish = false - gossipSub.parameters.heartbeatInterval = 5.minutes + gossipSub.parameters.opportunisticGraftThreshold = 10000 + gossipSub.parameters.heartbeatInterval = 500.milliseconds + gossipSub.parameters.pruneBackoff = 5.seconds + gossipSub.topicParams["test"] = TopicParams( + topicWeight: 1, + firstMessageDeliveriesWeight: 1, + firstMessageDeliveriesCap: 30, + firstMessageDeliveriesDecay: 0.6 + ) proc messageHandler(topic: string, data: seq[byte]) {.async.} = + let sentUint = uint64.fromBytesLE(data) + # warm-up + if sentUint < 1000000 or isAttacker: return let - sentUint = uint64.fromBytesLE(data) sentMoment = nanoseconds(int64(uint64.fromBytesLE(data))) sentNanosecs = nanoseconds(sentMoment - seconds(sentMoment.seconds)) sentDate = initTime(sentMoment.seconds, sentNanosecs) diff = getTime() - sentDate - echo sentUint, ": ", diff + echo sentUint, " milliseconds: ", diff.inMilliseconds() + + + var receivedMessage = 0 + proc messageValidator(topic: string, msg: Message): Future[ValidationResult] {.async.} = + receivedMessage.inc() + return + if receivedMessage >= client.param(int, "attack_after"): + ValidationResult.Ignore + else: + ValidationResult.Accept gossipSub.subscribe("test", messageHandler) + if isAttacker: + gossipSub.addValidator(["test"], messageValidator) switch.mount(gossipSub) await switch.start() #TODO @@ -66,7 +89,7 @@ testground(client): ) ) echo "Listening on ", switch.peerInfo.addrs - echo myId, ", ", isPublisher + echo myId, ", ", isPublisher, ", ", switch.peerInfo.peerId var peersInfo: seq[PeerData] while peersInfo.len < client.testInstanceCount: @@ -107,7 +130,12 @@ testground(client): await client.waitForBarrier("connected", client.testInstanceCount) if isPublisher: + # wait for mesh to be setup let maxMessageDelay = client.param(int, "max_message_delay") + for i in 0 ..< client.param(int, "warmup_messages"): + await sleepAsync(milliseconds(rng.rand(maxMessageDelay))) + doAssert((await gossipSub.publish("test", @(toBytesLE(uint64(myId * 1000 + i))))) > 0) + for _ in 0 ..< client.param(int, "message_count"): await sleepAsync(milliseconds(rng.rand(maxMessageDelay))) let diff --git a/pubsub/nim/manifest.toml b/pubsub/nim/manifest.toml index f1d5b21..9c69f33 100644 --- a/pubsub/nim/manifest.toml +++ b/pubsub/nim/manifest.toml @@ -17,5 +17,8 @@ instances = { min = 2, max = 100000, default = 20 } [testcases.params] connection_count = { type = "int", desc = "number of connections per node", unit = "nodes", default = 20 } publisher_count = { type = "int", desc = "number of nodes publishing", unit = "nodes", default = 20 } + attacker_count = { type = "int", desc = "number of nodes trying to swallow messages", unit = "nodes", default = 0 } + attack_after = { type = "int", desc = "start the attack after receiving N messages", unit = "msg", default = 0 } message_count = { type = "int", desc = "number of messages to send per publisher", unit = "msg", default = 20 } + warmup_messages = { type = "int", desc = "empty messages to send before test", unit = "msg", default = 10 } max_message_delay = { type = "int", desc = "max delay between messages per publisher", unit = "ms", default = 500 }