optimizations tests

This commit is contained in:
Tanguy 2023-03-22 10:47:27 +01:00
parent b51d7c832c
commit bc1e743e14
No known key found for this signature in database
GPG Key ID: 7DD8EC6B6CE6C45E
2 changed files with 40 additions and 23 deletions

View File

@ -11,6 +11,12 @@ ARG PLAN_PATH="./"
COPY ./plan/${PLAN_PATH} ./plan COPY ./plan/${PLAN_PATH} ./plan
RUN cd plan && nimble install -dy && \ RUN cd plan && nimble install -dy && \
nim c -d:chronicles_log_level=NOTICE \ nim c -d:chronicles_log_level=NOTICE \
-p:libp2p \
--NimblePath:libp2p/nimbledeps/pkgs \
-d:metrics \
--threads:on \
-d:withoutPCRE \
-d:libp2p_network_protocols_metrics \
-d:release \ -d:release \
main.nim main.nim

View File

@ -1,7 +1,8 @@
import serialization, json_serialization, stew/endians2, stew/byteutils import serialization, json_serialization, stew/endians2, stew/byteutils
import libp2p, testground_sdk, libp2p/protocols/pubsub/rpc/messages import libp2p, testground_sdk, libp2p/protocols/pubsub/rpc/messages
import libp2p/muxers/mplex/lpchannel
import chronos import chronos
import sequtils, hashes import sequtils, hashes, metrics
from times import getTime, toUnix, fromUnix, `-`, initTime, `$`, inMilliseconds from times import getTime, toUnix, fromUnix, `-`, initTime, `$`, inMilliseconds
type type
@ -21,8 +22,9 @@ testground(client):
let let
myId = await client.signal("initialized_global") myId = await client.signal("initialized_global")
isPublisher = myId <= client.param(int, "publisher_count") publisherCount = client.param(int, "publisher_count")
isAttacker = (not isPublisher) and myId - client.param(int, "publisher_count") <= client.param(int, "attacker_count") isPublisher = myId <= publisherCount
isAttacker = (not isPublisher) and myId - publisherCount <= client.param(int, "attacker_count")
rng = libp2p.newRng() rng = libp2p.newRng()
address = addresses[0][0].host address = addresses[0][0].host
switch = switch =
@ -44,7 +46,8 @@ testground(client):
verifySignature = false, verifySignature = false,
anonymize = true, anonymize = true,
) )
# gossipSub.parameters.floodPublish = false gossipSub.parameters.floodPublish = false
#gossipSub.parameters.lazyPushMinSize = 10000
gossipSub.parameters.opportunisticGraftThreshold = 10000 gossipSub.parameters.opportunisticGraftThreshold = 10000
gossipSub.parameters.heartbeatInterval = 700.milliseconds gossipSub.parameters.heartbeatInterval = 700.milliseconds
gossipSub.parameters.pruneBackoff = 3.seconds gossipSub.parameters.pruneBackoff = 3.seconds
@ -72,13 +75,13 @@ testground(client):
startOfTest: Moment startOfTest: Moment
attackAfter = seconds(client.param(int, "attack_after")) attackAfter = seconds(client.param(int, "attack_after"))
proc messageValidator(topic: string, msg: Message): Future[ValidationResult] {.async.} = proc messageValidator(topic: string, msg: Message): Future[ValidationResult] {.async.} =
return #await sleepAsync(milliseconds(rng.rand(50)))
if Moment.now - startOfTest >= attackAfter: if isAttacker and Moment.now - startOfTest >= attackAfter:
ValidationResult.Ignore return ValidationResult.Ignore
else:
ValidationResult.Accept return ValidationResult.Accept
gossipSub.subscribe("test", messageHandler) gossipSub.subscribe("test", messageHandler)
if isAttacker:
gossipSub.addValidator(["test"], messageValidator) gossipSub.addValidator(["test"], messageValidator)
switch.mount(gossipSub) switch.mount(gossipSub)
await switch.start() await switch.start()
@ -125,14 +128,15 @@ testground(client):
callback_target: some client.testInstanceCount, callback_target: some client.testInstanceCount,
routing_policy: "accept_all", routing_policy: "accept_all",
default: LinkShape( default: LinkShape(
latency: 100000000, # latency: 100000000,
# jitter: 100000000, jitter: 100_000_000, # in nanoseconds
bandwidth: 25_000_000, # bits per seconds
) )
) )
) )
await client.waitForBarrier("connected", client.testInstanceCount) await client.waitForBarrier("connected", client.testInstanceCount)
#discard await client.signalAndWait("connected", client.testInstanceCount)
let let
maxMessageDelay = client.param(int, "max_message_delay") maxMessageDelay = client.param(int, "max_message_delay")
@ -143,15 +147,22 @@ testground(client):
# wait for mesh to be setup # wait for mesh to be setup
for i in 0 ..< warmupMessages: for i in 0 ..< warmupMessages:
await sleepAsync(milliseconds(maxMessageDelay div 2)) await sleepAsync(milliseconds(maxMessageDelay div 2))
doAssert((await gossipSub.publish("test", @(toBytesLE(uint64(myId * 1000 + i))))) > 0) if i mod publisherCount == myId:
let warmupMsg = @(toBytesLE(uint64(myId * 1000 + i))) & newSeq[byte](500_000)
doAssert((await gossipSub.publish("test", warmupMsg)) > 0)
for _ in 0 ..< client.param(int, "message_count"): for msg in 0 ..< client.param(int, "message_count"):
await sleepAsync(milliseconds(rng.rand(maxMessageDelay))) #await sleepAsync(milliseconds(rng.rand(maxMessageDelay)))
await sleepAsync(6.seconds) # half a slot, for faster sims
if msg mod publisherCount == myId:
let let
now = getTime() now = getTime()
nowInt = seconds(now.toUnix()) + nanoseconds(times.nanosecond(now)) nowInt = seconds(now.toUnix()) + nanoseconds(times.nanosecond(now))
nowBytes = @(toBytesLE(uint64(nowInt.nanoseconds))) nowBytes = @(toBytesLE(uint64(nowInt.nanoseconds))) & newSeq[byte](500_000)
#echo "sending ", uint64(nowInt.nanoseconds) #echo "sending ", uint64(nowInt.nanoseconds)
doAssert((await gossipSub.publish("test", nowBytes)) > 0) doAssert((await gossipSub.publish("test", nowBytes)) > 0)
discard await client.signalAndWait("done", client.testInstanceCount) discard await client.signalAndWait("done", client.testInstanceCount)
echo "BW: ", libp2p_protocols_bytes.value(labelValues=["/meshsub/1.1.0", "in"]) + libp2p_protocols_bytes.value(labelValues=["/meshsub/1.1.0", "out"])
echo "DUPS: ", libp2p_gossipsub_duplicate.value(), " / ", libp2p_gossipsub_received.value()
echo "WAITED: ", libp2p_waited.value()