diff --git a/benchmarks/codex/config.py b/benchmarks/codex/config.py index 3f02233..c2688f4 100644 --- a/benchmarks/codex/config.py +++ b/benchmarks/codex/config.py @@ -114,6 +114,7 @@ class CodexExperimentConfig( seed=random.randint(0, 2**16), meta=CodexMeta(f"dataset-{seeder_set}-{experiment_run}"), logging_cooldown=self.logging_cooldown, + stagger_delay=self.stagger_delay, ) ) diff --git a/benchmarks/core/experiments/dissemination_experiment/config.py b/benchmarks/core/experiments/dissemination_experiment/config.py index fcc424a..1d969e1 100644 --- a/benchmarks/core/experiments/dissemination_experiment/config.py +++ b/benchmarks/core/experiments/dissemination_experiment/config.py @@ -32,6 +32,12 @@ class DisseminationExperimentConfig(ConfigModel, Generic[TNodeConfig, TNodeSetCo description="Time to wait after the last download completes before tearing down the experiment.", ) + stagger_delay: float = Field( + ge=0, + default=0, + description="Delay in seconds between starting each leecher (0 = simultaneous).", + ) + @computed_field # type: ignore @property def experiment_type(self) -> str: diff --git a/benchmarks/core/experiments/dissemination_experiment/static.py b/benchmarks/core/experiments/dissemination_experiment/static.py index 4b05049..992fd3f 100644 --- a/benchmarks/core/experiments/dissemination_experiment/static.py +++ b/benchmarks/core/experiments/dissemination_experiment/static.py @@ -32,6 +32,7 @@ class StaticDisseminationExperiment( seed: int, concurrency: Optional[int] = None, logging_cooldown: int = 0, + stagger_delay: float = 0, experiment_id: Optional[str] = None, ) -> None: self.nodes = network @@ -48,6 +49,7 @@ class StaticDisseminationExperiment( ) self._cid: Optional[TNetworkHandle] = None self.logging_cooldown = logging_cooldown + self.stagger_delay = stagger_delay def experiment_id(self) -> Optional[str]: return self._experiment_id @@ -77,14 +79,19 @@ class StaticDisseminationExperiment( f"Setting up leechers: {[str(leecher) for leecher in leechers]}" ) - def _leech(leecher): + def _leech(leecher, delay): + if delay > 0: + sleep(delay) _log_request(leecher, "leech", str(self.meta), EventBoundary.start) download = leecher.leech(self._cid) _log_request(leecher, "leech", str(self.meta), EventBoundary.end) return download downloads = ensure_successful( - [self._executor.submit(_leech, leecher) for leecher in leechers] + [ + self._executor.submit(_leech, leecher, i * self.stagger_delay) + for i, leecher in enumerate(leechers) + ] ) with experiment_stage(self, "downloading"): diff --git a/config/codex/experiments.k8s.yaml b/config/codex/experiments.k8s.yaml index e4c32d2..bc05ba5 100644 --- a/config/codex/experiments.k8s.yaml +++ b/config/codex/experiments.k8s.yaml @@ -9,6 +9,7 @@ codex_experiment: remove_data: ${REMOVE_DATA} # No need for cooldown as Codex takes forever to remove files, so there's plenty of time to log stuff. :-) logging_cooldown: 0 + stagger_delay: ${STAGGER_DELAY} nodes: network_size: ${NETWORK_SIZE} diff --git a/k8s/charts/codex/templates/testrunner-job.yaml b/k8s/charts/codex/templates/testrunner-job.yaml index 2ceb34d..75f26b9 100644 --- a/k8s/charts/codex/templates/testrunner-job.yaml +++ b/k8s/charts/codex/templates/testrunner-job.yaml @@ -50,6 +50,8 @@ spec: valueFrom: fieldRef: fieldPath: metadata.namespace + - name: STAGGER_DELAY + value: {{ .Values.experiment.staggerDelay | quote }} resources: requests: diff --git a/k8s/charts/codex/values.yaml b/k8s/charts/codex/values.yaml index d24a70f..99bc418 100644 --- a/k8s/charts/codex/values.yaml +++ b/k8s/charts/codex/values.yaml @@ -25,6 +25,9 @@ experiment: # than deleting it, but requires more space. removeData: true + # Delay in seconds between starting each leecher (0 = simultaneous). + staggerDelay: 0 + deployment: appName: ""