import ipywidgets as widgets
import run as run_helpers
from bunch import Bunch
import functools
import operator
import stringcase
import subprocess
import os
import toml
import json
import time
from jupyter_ui_poll import ui_events
from IPython.display import display
import analyze
import shutil

# TODO: move constants
TEMPLATE = 'templates/baseline'

# set to path of testground bin if not in PATH
TESTGROUND = 'testground'


class RunButton(object):
    """A 'Run Test' button that blocks until clicked, then runs the test.

    `config` must provide `widgets`, `template_params()`, `composition()`
    and `snapshot()`, which are the members read by `_run`.
    """

    def __init__(self, config):
        self.config = config
        self.pressed = False
        self.button = widgets.Button(description='Run Test', button_style='primary')
        self.button.on_click(self._clicked)

    def _clicked(self, evt):
        """Click handler: record the press and lock the button."""
        self.pressed = True
        self.button.description = 'Running'
        self.button.button_style = 'info'
        self.button.disabled = True

    def wait(self):
        """Display the button, block until it is pressed, then run the test."""
        display(self.button)
        with ui_events() as poll:
            while self.pressed is False:
                poll(10)  # React to UI events (up to 10 at a time)
                time.sleep(0.1)
        self._run()

    def _run(self):
        """Write the run inputs to the output dir, invoke testground, and
        extract the collected outputs for analysis.

        Raises:
            ValueError: if the testground process exits with a non-zero code.
        """
        endpoint = self.config.widgets.testground.daemon_endpoint.value
        workdir = self.config.widgets.test_execution.output_dir.value
        failed_dir = self.config.widgets.test_execution.failed_dir.value
        run_helpers.mkdirp(workdir)
        run_helpers.mkdirp(failed_dir)

        params = self.config.template_params()
        comp = self.config.composition()

        comp_filename = os.path.join(workdir, 'composition.toml')
        params_filename = os.path.join(workdir, 'template-params.toml')
        config_snapshot_filename = os.path.join(workdir, 'config-snapshot.json')

        # the archive extension depends on the selected runner
        if 'k8s' in params['TEST_RUNNER']:
            archive_filename = os.path.join(workdir, 'test-output.tgz')
        else:
            archive_filename = os.path.join(workdir, 'test-output.zip')

        with open(comp_filename, 'wt') as f:
            f.write(comp)

        with open(params_filename, 'w') as f:
            toml.dump(params, f)

        with open(config_snapshot_filename, 'w') as f:
            json.dump(self.config.snapshot(), f)

        cmd = [TESTGROUND, '--vv',
               '--endpoint', endpoint,
               'run', 'composition',
               '-f', comp_filename,
               '--collect', '-o', archive_filename]

        # The context manager closes the stdout pipe once we are done with it.
        with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                              universal_newlines=True) as p:
            # Echo output until EOF. Bailing out as soon as the process has
            # exited would drop whatever is still buffered in the pipe, which
            # for a failing run is usually the error message itself.
            for line in iter(p.stdout.readline, ''):
                print(line, end='')
            return_code = p.wait()

        self.button.description = 'Done'
        self.button.button_style = 'danger'

        if return_code:
            try:
                shutil.move(workdir, failed_dir)
            except Exception as err:
                print('tried to move output from failed test to {}, but failed with error: {}'.format(failed_dir, err))
                # don't claim the outputs were moved when they were not
                raise ValueError(
                    'test execution failed, skipping analysis. outputs left in {}'.format(workdir)) from err
            raise ValueError('test execution failed, skipping analysis. moved outputs to {}'.format(failed_dir))

        print('test outputs saved to {}'.format(workdir))
        print('extracting test data for analysis...')
        analysis_dir = os.path.join(workdir, 'analysis')
        analyze.extract_test_outputs(archive_filename, analysis_dir, convert_to_pandas=False, prep_notebook=True)
        print('saved analysis outputs to {}'.format(analysis_dir))


# a collapsible panel for a single topic's params
class TopicConfigPanel(object):
    """Widgets for one topic: its name and message settings, its weight, and
    its per-topic peer score parameters."""

    def __init__(self):
        self.topic_widgets = Bunch(
            name=widgets.Text(description="Topic Name", value="blocks"),
            message_rate=widgets.Text(description="Message Rate (msg/sec)", value='120/s'),
            message_size=widgets.Text(description="Message Size", value="2 KiB"),
        )

        self.topic_weight = widgets.FloatText(description="Topic Weight", value=0.25)

        # NOTE: don't change the description values! they're used to derive the JSON keys when
        # collecting the param values later
        self.score_widgets = Bunch(
            time_in_mesh=Bunch(
                weight=widgets.FloatText(description="Time in Mesh Weight", value=0.0027),
                quantum=widgets.Text(description="Time in Mesh Quantum", value='1s'),
                cap=widgets.FloatText(description="Time in Mesh Cap", value=3600)
            ),

            first_message_deliveries=Bunch(
                weight=widgets.FloatText(description="First Message Deliveries Weight", value=0.664),
                decay=widgets.FloatText(description="First Message Deliveries Decay", value=0.9916),
                cap=widgets.FloatText(description="First Message Deliveries Cap", value=1500),
            ),

            mesh_message_deliveries=Bunch(
                weight=widgets.FloatText(description="Mesh Message Deliveries Weight", value=-0.25),
                decay=widgets.FloatText(description="Mesh Message Deliveries Decay", value=0.97),
                cap=widgets.FloatText(description="Mesh Message Deliveries Cap", value=400),
                threshold=widgets.FloatText(description="Mesh Message Deliveries Threshold", value=100),
                activation=widgets.Text(description="Mesh Message Deliveries Activation", value="30s"),
                # NOTE(review): this description says "Delivery" where its siblings say
                # "Deliveries", so the derived key is "MeshMessageDeliveryWindow" -
                # confirm that is the key the consumer of these params expects.
                window=widgets.Text(description="Mesh Message Delivery Window", value="5ms"),
            ),

            mesh_failure_penalty=Bunch(
                weight=widgets.FloatText(description="Mesh Failure Penalty Weight", value=-0.25),
                decay=widgets.FloatText(description="Mesh Failure Penalty Decay", value=0.997),
            ),

            invalid_message_deliveries=Bunch(
                weight=widgets.FloatText(description="Invalid Message Deliveries Weight", value=-99),
                decay=widgets.FloatText(description="Invalid Message Deliveries Decay", value=0.9994),
            )
        )

        topic_panel = widgets.VBox([
            labeled(self.topic_widgets.name),
            labeled(self.topic_widgets.message_rate),
            labeled(self.topic_widgets.message_size),
        ])

        score_panel = widgets.VBox([
            # NOTE(review): any heading markup around this text appears to have been
            # lost; the text is kept as found - confirm the intended HTML.
            widgets.HTML('\n\nPeer Score Params\n\n'),
            labeled(self.topic_weight),
            to_collapsible_sections(self.score_widgets)],
            layout={'width': '900px'})

        self.panel = widgets.VBox([topic_panel, score_panel], layout={'width': '900px'})

    def ui(self):
        """Return the top-level widget for this panel."""
        return self.panel

    def snapshot(self):
        """Return the current widget values as a nested, JSON-serializable dict."""
        return {
            'topic_weight': {'value': self.topic_weight.value},
            'topic': widget_snapshot(self.topic_widgets),
            'score': widget_snapshot(self.score_widgets),
        }

    def apply_snapshot(self, snapshot):
        """Restore widget values from a dict shaped like the output of `snapshot`.

        Sections missing from `snapshot` are left untouched.
        """
        if 'topic_weight' in snapshot and 'value' in snapshot['topic_weight']:
            self.topic_weight.value = snapshot['topic_weight']['value']
        if 'topic' in snapshot:
            apply_snapshot(self.topic_widgets, snapshot['topic'])
        if 'score' in snapshot:
            apply_snapshot(self.score_widgets, snapshot['score'])

    def topic_id(self):
        """Return the topic name entered in the UI."""
        return self.topic_widgets.name.value

    def topic_params(self):
        """Return the topic's id, message rate and message size as a dict."""
        return {
            'id': self.topic_widgets.name.value,
            'message_rate': self.topic_widgets.message_rate.value,
            'message_size': self.topic_widgets.message_size.value,
        }

    def score_params(self):
        """Return the topic score params as a flat dict.

        Each key is the widget's description with the spaces removed, which is
        why the descriptions must not be edited.
        """
        p = {
            'TopicWeight': self.topic_weight.value,
        }
        for group in self.score_widgets.values():
            for param in group.values():
                key = param.description.replace(' ', '')
                p[key] = param.value
        return p


# ConfigPanel is a collection of widgets to set the test parameters.
class ConfigPanel(object):
    """All widgets used to configure a test run, plus save/load of their values.

    `template_params()` turns the current widget values into the parameter dict
    passed to the composition template; `composition()` renders that template.
    """

    def __init__(self):
        # all the widgets used to configure the test

        default_out_dir = os.path.join('.', 'output', 'pubsub-test-{}'.format(time.strftime("%Y%m%d-%H%M%S")))
        default_failed_dir = os.path.join('.', 'output', 'failed')
        w = Bunch(
            test_execution=Bunch(
                output_dir=widgets.Text(description="Local directory to collect test outputs", value=default_out_dir),
                failed_dir=widgets.Text(description="Local dir to store output from failed runs", value=default_failed_dir)
            ),

            testground=Bunch(
                daemon_endpoint=widgets.Text(description="Daemon Endpoint", value='localhost:8080'),
                builder=widgets.Dropdown(description="Builder", options=['docker:go', 'exec:go']),
                runner=widgets.Dropdown(description="Runner", options=['cluster:k8s', 'local:docker', 'local:exec']),
                plan_dir=widgets.Text(description="Subdir of $TESTGROUND_HOME/plans containing pubsub plan", value="test-plans/pubsub/test"),
                keep_service=widgets.Checkbox(description="Keep pods after execution? (k8s only)", value=False),
                log_level=widgets.Dropdown(description="Log level to set on test instances", options=["info", "debug", "warn", "error"]),
            ),

            time=Bunch(
                setup=widgets.Text(description="Test Setup time", value='1m'),
                run=widgets.Text(description="Test Runtime", value='2m'),
                warm=widgets.Text(description="Warmup time", value='5s'),
                cool=widgets.Text(description="Cooldown time", value='10s'),
            ),

            node_counts=Bunch(
                total=widgets.IntText(description="Total number of test instances", disabled=True),
                total_peers=widgets.IntText(description="Total number of peers in all containers", disabled=True),
                publisher=widgets.IntText(description="Number of publisher nodes", value=100),
                lurker=widgets.IntText(description="Number of lurker nodes", value=50),
                honest_per_container=widgets.IntText(description="# of honest peers per container", value=1),
            ),

            pubsub=Bunch(
                branch=widgets.Text(description="go-libp2p-pubsub branch/tag/commit to target", value="master"),
                use_hardened_api=widgets.Checkbox(description="target hardening branch API", value=True),
                heartbeat=widgets.Text(description='Heartbeat interval', value='1s'),
                # NOTE: "hearbeat_delay" is misspelled, but it is also the key used in
                # saved snapshots, so renaming it would orphan that value on load.
                hearbeat_delay=widgets.Text(description='Initial heartbeat delay', value='100ms'),
                validate_queue_size=widgets.IntText(description='Size of validation queue', value=32),
                outbound_queue_size=widgets.IntText(description='Size of outbound RPC queue', value=32),
                score_inspect_period=widgets.Text(description='Interval to dump peer scores', value='5s'),
                full_traces=widgets.Checkbox(description='Capture full event traces)', value=False),
                degree=widgets.IntText(description='D: target mesh degree', value=10),
                degree_lo=widgets.IntText(description='D_lo: mesh degree low bound', value=8),
                degree_hi=widgets.IntText(description='D_hi: mesh degree upper bound', value=16),
                degree_score=widgets.IntText(description='D_score: peers to select by score', value=5),
                degree_lazy=widgets.IntText(description='D_lazy: lazy propagation degree', value=12),
                gossip_factor=widgets.FloatText(description='Gossip Factor', value=0.25),
                opportunistic_graft_ticks=widgets.IntText(description='Opportunistic Graft heartbeat ticks', value=60),
            ),

            network=Bunch(
                latency=widgets.Text(description="Min latency", value='5ms'),
                max_latency=widgets.Text(description="Max latency. If zero, latency will = min latency.", value='50ms'),
                jitter_pct=widgets.IntSlider(description="Latency jitter %", value=10, min=1, max=100),
                bandwidth_mb=widgets.IntText(description="Bandwidth (mb)", value=10240),
                degree=widgets.IntText(description="Degree (# of initial connections) for honest peers", value=20),

                # TODO: support upload of topology file
                # topology_file = widgets.FileUpload(description="Upload fixed topology file", accept='.json', multiple=False),
            ),

            honest_behavior=Bunch(
                flood_publishing=widgets.Checkbox(value=True, description='Flood Publishing', indent=False),
                connect_delay=widgets.Text(description='Honest peer connection delay. e.g. "30s" or "50@30s,30@1m"', value='0s'),
                connect_jitter_pct=widgets.BoundedIntText(description='Jitter % for honest connect delay', value=5, min=0, max=100),
            ),

            peer_score=Bunch(
                gossip_threshold=widgets.FloatText(description='Gossip Threshold', value=-4000),
                publish_threshold=widgets.FloatText(description='Publish Threshold', value=-5000),
                graylist_threshold=widgets.FloatText(description='Graylist Threshold', value=-10000),
                acceptpx_threshold=widgets.FloatText(description='Accept PX Threshold', value=0),
                opportunistic_graft_threshold=widgets.FloatText(description='Opportunistic Graft Threshold', value=0),
                ip_colocation_weight=widgets.FloatText(description='IP Colocation Factor Weight', value=0),
                ip_colocation_threshold=widgets.IntText(description='IP Colocation Factor Threshold', value=1),
                decay_interval=widgets.Text(description='Score Decay Interval', value='1s'),
                decay_to_zero=widgets.FloatText(description='Decay Zero Threshold', value=0.01),
                retain_score=widgets.Text(description="Time to Retain Score", value='30s'),
            )
        )

        # wire up node count widgets to calculate and show the total number of containers and peers
        # and update when the params they're derived from change
        sum_values(w.node_counts.total, w.node_counts.publisher, w.node_counts.lurker)
        mul_values(w.node_counts.total_peers, w.node_counts.total, w.node_counts.honest_per_container)

        self.topic_config = TopicConfigPanel()

        self.save_widgets = Bunch(
            save_button=widgets.Button(description='Save Config', button_style='primary'),
            load_button=widgets.Button(description='Load Saved Config', button_style='warning'),
            snapshot_filename=widgets.Text(description='Path:', value='configs/snapshot.json')
        )

        self.save_widgets.save_button.on_click(self.save_clicked)
        self.save_widgets.load_button.on_click(self.load_clicked)
        save_panel = widgets.HBox(list(self.save_widgets.values()))

        self.panel = widgets.VBox([
            to_collapsible_sections(w),
            collapsible("Topic Config", [self.topic_config.ui()]),
            save_panel,
        ])

        self.widgets = w

    def ui(self):
        """Return the top-level widget for this panel."""
        return self.panel

    def save_clicked(self, evt):
        """Click handler: write the current snapshot as JSON to the path in the UI."""
        filename = self.save_widgets.snapshot_filename.value
        # create the target directory if needed, so a fresh checkout can save
        # to a path such as configs/snapshot.json
        parent = os.path.dirname(filename)
        if parent:
            os.makedirs(parent, exist_ok=True)
        with open(filename, 'wt') as f:
            json.dump(self.snapshot(), f)
        print('saved config snapshot to {}'.format(filename))

    def load_clicked(self, evt):
        """Click handler: load a JSON snapshot from the path in the UI and apply it."""
        filename = self.save_widgets.snapshot_filename.value
        with open(filename, 'rt') as f:
            snap = json.load(f)

        # HACK: ignore the test_execution.output_dir param from the snapshot, to
        # avoid overwriting the output of a prior run
        main = snap.get('main', {})
        if 'test_execution' in main:
            # pop() rather than del: the snapshot may not contain output_dir
            main['test_execution'].pop('output_dir', None)
        self.apply_snapshot(snap)
        print('loaded config snapshot from {}'.format(filename))

    def snapshot(self):
        """Return the current values of all widgets as a nested dict."""
        return {
            'main': widget_snapshot(self.widgets),
            'topic': self.topic_config.snapshot(),
        }

    def apply_snapshot(self, snapshot):
        """Restore widget values from a dict shaped like the output of `snapshot`.

        Sections missing from `snapshot` are left untouched.
        """
        if 'main' in snapshot:
            apply_snapshot(self.widgets, snapshot['main'])
        if 'topic' in snapshot:
            self.topic_config.apply_snapshot(snapshot['topic'])

    def template_params(self):
        """Build the template parameter dict from the current widget values."""
        w = self.widgets

        n_nodes = w.node_counts.total.value
        n_publisher = w.node_counts.publisher.value
        n_nodes_cont_honest = w.node_counts.honest_per_container.value

        n_honest_nodes = n_nodes
        n_honest_peers_total = n_honest_nodes * n_nodes_cont_honest
        n_container_nodes_total = n_honest_peers_total

        p = {
            # testground
            'TEST_BUILDER': w.testground.builder.value,
            'TEST_RUNNER': w.testground.runner.value,
            'TEST_PLAN': w.testground.plan_dir.value,

            # time
            'T_SETUP': w.time.setup.value,
            'T_RUN': w.time.run.value,
            'T_WARM': w.time.warm.value,
            'T_COOL': w.time.cool.value,

            # node counts
            'N_NODES': n_nodes,
            'N_CONTAINER_NODES_TOTAL': n_container_nodes_total,
            'N_PUBLISHER': n_publisher,
            'N_HONEST_PEERS_PER_NODE': n_nodes_cont_honest,

            # pubsub
            'T_HEARTBEAT': w.pubsub.heartbeat.value,
            'T_HEARTBEAT_INITIAL_DELAY': w.pubsub.hearbeat_delay.value,
            'T_SCORE_INSPECT_PERIOD': w.pubsub.score_inspect_period.value,
            'VALIDATE_QUEUE_SIZE': w.pubsub.validate_queue_size.value,
            'OUTBOUND_QUEUE_SIZE': w.pubsub.outbound_queue_size.value,
            'FULL_TRACES': w.pubsub.full_traces.value,
            'OVERLAY_D': w.pubsub.degree.value,
            'OVERLAY_DLO': w.pubsub.degree_lo.value,
            'OVERLAY_DHI': w.pubsub.degree_hi.value,
            'OVERLAY_DSCORE': w.pubsub.degree_score.value,
            'OVERLAY_DLAZY': w.pubsub.degree_lazy.value,
            'GOSSIP_FACTOR': w.pubsub.gossip_factor.value,
            'OPPORTUNISTIC_GRAFT_TICKS': w.pubsub.opportunistic_graft_ticks.value,

            # network
            'T_LATENCY': w.network.latency.value,
            'T_LATENCY_MAX': w.network.max_latency.value,
            'JITTER_PCT': w.network.jitter_pct.value,
            'BANDWIDTH_MB': w.network.bandwidth_mb.value,
            'N_DEGREE': w.network.degree.value,

            # TODO: load topology file
            'TOPOLOGY': {},

            # honest behavior
            'FLOOD_PUBLISHING': w.honest_behavior.flood_publishing.value,
            'HONEST_CONNECT_DELAY_JITTER_PCT': w.honest_behavior.connect_jitter_pct.value,

            # topic & peer score configs
            'TOPIC_CONFIG': self._topic_config(),
            'PEER_SCORE_PARAMS': self._peer_score_params(),
        }

        if w.pubsub.use_hardened_api.value:
            p['BUILD_SELECTORS'] = [run_helpers.HARDENED_API_BUILD_TAG]
        else:
            p['BUILD_SELECTORS'] = []

        p['GS_VERSION'] = run_helpers.pubsub_commit(w.pubsub.branch.value)

        run_config = ['log_level="{}"'.format(w.testground.log_level.value)]
        if w.testground.runner.value == 'cluster:k8s':
            buildopts = ['push_registry=true', 'registry_type="aws"']
            p['BUILD_CONFIG'] = '\n'.join(buildopts)

            # keep_service is labelled "(k8s only)" in the UI, so it is only
            # honoured for the k8s runner
            if w.testground.keep_service.value:
                run_config.append('keep_service=true')

        p['RUN_CONFIG'] = '\n'.join(run_config)

        # if the connect_delay param doesn't specify a count,
        # make it apply to all honest nodes
        delay = w.honest_behavior.connect_delay.value
        if '@' not in delay:
            delay = '{}@{}'.format(n_honest_peers_total, delay)
        p['HONEST_CONNECT_DELAYS'] = delay

        return p

    def composition(self):
        """Render the composition template with the current template params."""
        return run_helpers.render_template(TEMPLATE, self.template_params())

    def _topic_config(self):
        """Return the list of topic param dicts (currently a single topic)."""
        # TODO: support multiple topics
        topics = [self.topic_config.topic_params()]
        return topics

    def _peer_score_params(self):
        """Return the peer score params, including the per-topic score params."""
        ps = self.widgets.peer_score
        p = {
            'Thresholds': {
                'GossipThreshold': ps.gossip_threshold.value,
                'PublishThreshold': ps.publish_threshold.value,
                'GraylistThreshold': ps.graylist_threshold.value,
                'AcceptPXThreshold': ps.acceptpx_threshold.value,
                'OpportunisticGraftThreshold': ps.opportunistic_graft_threshold.value,
            },

            'IPColocationFactorWeight': ps.ip_colocation_weight.value,
            'IPColocationFactorThreshold': ps.ip_colocation_threshold.value,
            'DecayInterval': ps.decay_interval.value,
            'DecayToZero': ps.decay_to_zero.value,
            'RetainScore': ps.retain_score.value,

            # TODO: support multiple topics
            'Topics': {self.topic_config.topic_id(): self.topic_config.score_params()}
        }
        return p


#### widget helpers ####

def labeled(widget):
    """Wrap `widget` in a VBox with its description shown as a label above it.

    A widget with no description is returned unchanged. The description text
    itself is left on the widget; only its displayed width is set to '0'.
    """
    if widget.description is None or widget.description == '':
        return widget
    label = widget.description
    widget.style.description_width = '0'
    return widgets.VBox([widgets.Label(value=label), widget])


def collapsible(title, params, expanded=False):
    """Return a one-section Accordion titled `title`, laying `params` out in a
    two-column grid. The section starts collapsed unless `expanded` is true."""
    grid = widgets.Layout(width='900px', grid_template_columns="repeat(2, 400px)")
    inner = widgets.GridBox(params, layout=grid)
    a = widgets.Accordion(children=[inner])
    a.set_title(0, title)
    a.selected_index = 0 if expanded else None
    return a


def to_collapsible_sections(w, expanded=False):
    """Build a VBox with one collapsible section per top-level entry of `w`.

    `w` maps section names to mappings of widgets; each section title is the
    sentence-cased section name.
    """
    sections = []
    for name, params in w.items():
        title = stringcase.sentencecase(name)
        children = [labeled(p) for p in params.values()]
        sections.append(collapsible(title, children, expanded=expanded))
    return widgets.VBox(sections, layout={'width': '900px'})


def _bind_reduced_value(target, op, initial, sources):
    """Keep target.value equal to `sources`' values folded with `op`, starting
    from `initial`, recomputing whenever a source reports a 'value' change."""
    def callback(change):
        if change['name'] != 'value':
            return
        target.value = functools.reduce(op, [s.value for s in sources], initial)

    for widget in sources:
        widget.observe(callback)

    # trigger callback to set initial value
    callback({'name': 'value'})


# sets the value of target widget to the sum of all arg widgets and updates when values change
def sum_values(target, *args):
    """With no source widgets, the target is set to 0 rather than raising."""
    _bind_reduced_value(target, operator.add, 0, args)


# sets the value of target widget to the product of all arg widgets and updates when values change
def mul_values(target, *args):
    """With no source widgets, the target is set to 1 rather than raising."""
    _bind_reduced_value(target, operator.mul, 1, args)


# takes a nested dict (or Bunch) whose leaves are widgets,
# and returns a dict with the same structure, but with widgets replaced with
# a snapshot of their current values
def widget_snapshot(widgets):
    # NOTE: the parameter shadows the module-level `widgets` import inside this
    # function; the name is kept because it is part of the signature.
    out = dict()
    for name, val in widgets.items():
        if isinstance(val, (Bunch, dict)):
            out[name] = widget_snapshot(val)
        else:
            out[name] = {'value': val.value}
    return out


# takes a nested dict or Bunch of widgets and the output of widget_snapshot,
# and sets the current widget values to the values from the snapshot
def apply_snapshot(widgets, snapshot):
    # NOTE: the parameter shadows the module-level `widgets` import inside this
    # function; the name is kept because it is part of the signature.
    for name, val in widgets.items():
        if name not in snapshot:
            continue
        if isinstance(val, (Bunch, dict)):
            apply_snapshot(val, snapshot[name])
        else:
            s = snapshot[name]
            if 'value' in s:
                val.value = s['value']