From 0af715b62b0d809f3d1abd357bbaf672f48b341f Mon Sep 17 00:00:00 2001 From: Ivan Danyliuk Date: Tue, 17 Apr 2018 13:10:35 +0200 Subject: [PATCH] Initial commit --- .gitignore | 1 + cmd/simulator/.gitignore | 3 + cmd/simulator/README.md | 4 + cmd/simulator/main.go | 64 +++++++ recorder/recorder.go | 48 +++++ simulation/naivep2p/links.go | 38 ++++ simulation/naivep2p/logentry.go | 89 ++++++++++ simulation/naivep2p/simulator.go | 122 +++++++++++++ simulation/simulation.go | 15 ++ simulation/whisperv6/logentry.go | 28 +++ simulation/whisperv6/message.go | 31 ++++ simulation/whisperv6/simulator.go | 283 ++++++++++++++++++++++++++++++ 12 files changed, 726 insertions(+) create mode 100644 .gitignore create mode 100644 cmd/simulator/.gitignore create mode 100644 cmd/simulator/README.md create mode 100644 cmd/simulator/main.go create mode 100644 recorder/recorder.go create mode 100644 simulation/naivep2p/links.go create mode 100644 simulation/naivep2p/logentry.go create mode 100644 simulation/naivep2p/simulator.go create mode 100644 simulation/simulation.go create mode 100644 simulation/whisperv6/logentry.go create mode 100644 simulation/whisperv6/message.go create mode 100644 simulation/whisperv6/simulator.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a6c57f5 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +*.json diff --git a/cmd/simulator/.gitignore b/cmd/simulator/.gitignore new file mode 100644 index 0000000..ece16d0 --- /dev/null +++ b/cmd/simulator/.gitignore @@ -0,0 +1,3 @@ +simulator +network.json +propagation.json diff --git a/cmd/simulator/README.md b/cmd/simulator/README.md new file mode 100644 index 0000000..ce624d9 --- /dev/null +++ b/cmd/simulator/README.md @@ -0,0 +1,4 @@ +# Graph Data Generator +--- + +Data generator generates different kinds of graph data, ready to use with this library. diff --git a/cmd/simulator/main.go b/cmd/simulator/main.go new file mode 100644 index 0000000..408c2ed --- /dev/null +++ b/cmd/simulator/main.go @@ -0,0 +1,64 @@ +package main + +import ( + "encoding/json" + "flag" + "log" + "os" + "time" + + "github.com/divan/graph-experiments/graph" + gethlog "github.com/ethereum/go-ethereum/log" + "github.com/status-im/simulator/simulation" + "github.com/status-im/simulator/simulation/naivep2p" + "github.com/status-im/simulator/simulation/whisperv6" +) + +func main() { + var ( + simType = flag.String("type", "whisperv6", "Type of simulators (naivep2p, whisperv6)") + ttl = flag.Int("ttl", 10, "Message TTL for simulation") + naiveP2PN = flag.Int("naivep2p.N", 3, "Number of peers to propagate (0..N of peers)") + naiveP2PDelay = flag.Duration("naivep2p.delay", 10*time.Millisecond, "Delay for each step") + input = flag.String("i", "network.json", "Input filename for pregenerated data to be used with simulation") + output = flag.String("o", "propagation.json", "Output filename for p2p sending data") + ggethlogLevel = flag.String("loglevel", "crit", "Geth log level for whisper simulator (crti, error, warn, info, debug, trace)") + ) + flag.Parse() + + data, err := graph.NewGraphFromJSON(*input) + if err != nil { + log.Fatal("Opening input file failed: ", err) + } + + fd, err := os.Create(*output) + if err != nil { + log.Fatal("Opening output file failed: ", err) + } + defer fd.Close() + + var sim simulation.Simulator + switch *simType { + case "naivep2p": + sim = naivep2p.NewSimulator(data, *naiveP2PN, *naiveP2PDelay) + case "whisperv6": + lvl, err := gethlog.LvlFromString(*ggethlogLevel) + if err != nil { + lvl = gethlog.LvlCrit + } + gethlog.Root().SetHandler(gethlog.LvlFilterHandler(lvl, gethlog.StreamHandler(os.Stderr, gethlog.TerminalFormat(true)))) + sim = whisperv6.NewSimulator(data) + default: + log.Fatal("Unknown simulation type: ", *simType) + } + defer sim.Stop() + + // Start simulation by sending single message + log.Printf("Starting message sending %s simulation for graph with %d nodes...", *simType, len(data.Nodes())) + sendData := sim.SendMessage(0, *ttl) + err = json.NewEncoder(fd).Encode(sendData) + if err != nil { + log.Fatal(err) + } + log.Printf("Written %s propagation data into %s", *simType, *output) +} diff --git a/recorder/recorder.go b/recorder/recorder.go new file mode 100644 index 0000000..f28c2d8 --- /dev/null +++ b/recorder/recorder.go @@ -0,0 +1,48 @@ +package simulations + +import ( + "fmt" + "time" + + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p/discover" +) + +// Recorder records send/receive events for +// generating further propagation log. +type Recorder struct { + Log []*LogEntry + + start time.Time + nodeMap map[discover.NodeID]int +} + +// NewRecorder inits new recorder. +func NewRecorder(nodeMap map[discover.NodeID]int) *Recorder { + return &Recorder{ + start: time.Now(), + nodeMap: nodeMap, + } +} + +func (r *Recorder) Reset() { + r.start = time.Now() +} + +func (r *Recorder) Send(from, to discover.NodeID) { + fromIdx, ok := r.nodeMap[from] + if !ok { + panic("node not found") + } + toIdx, ok := r.nodeMap[to] + if !ok { + panic("node not found") + } + log.Error("NewLogEntry", "start", r.start, "from", fromIdx, "to", toIdx) + e := NewLogEntry(r.start, fromIdx, toIdx) + r.Log = append(r.Log, e) +} + +func (r *Recorder) Receive(from, to discover.NodeID) { + log.Info("DidReceive", "from", from, "to", to) +} diff --git a/simulation/naivep2p/links.go b/simulation/naivep2p/links.go new file mode 100644 index 0000000..73d1d03 --- /dev/null +++ b/simulation/naivep2p/links.go @@ -0,0 +1,38 @@ +package naivep2p + +import ( + "github.com/divan/graph-experiments/graph" +) + +// LinkIndex stores link information in form of indexes, rather than nodes IP. +type LinkIndex struct { + From int + To int +} + +// PrecalculatePeers creates map with peers indexes for faster lookup. +func PrecalculatePeers(data *graph.Graph) map[int][]int { + links := data.Links() + + ret := make(map[int][]int) + for _, link := range links { + if link.From == link.To { + continue + } + if _, ok := ret[link.From]; !ok { + ret[link.From] = make([]int, 0) + } + if _, ok := ret[link.To]; !ok { + ret[link.To] = make([]int, 0) + } + + peers := ret[link.From] + peers = append(peers, link.To) + ret[link.From] = peers + + peers = ret[link.To] + peers = append(peers, link.From) + ret[link.To] = peers + } + return ret +} diff --git a/simulation/naivep2p/logentry.go b/simulation/naivep2p/logentry.go new file mode 100644 index 0000000..e169047 --- /dev/null +++ b/simulation/naivep2p/logentry.go @@ -0,0 +1,89 @@ +package naivep2p + +import ( + "fmt" + "log" + "time" + + "github.com/status-im/simulator/simulation" +) + +// LogEntry defines the reporting log entry for one +// p2p message sending. +type LogEntry struct { + From int + To int + Ts time.Duration +} + +// String implements Stringer interface for LogEntry. +func (l LogEntry) String() string { + return fmt.Sprintf("%s: %d -> %d", l.Ts.String(), l.From, l.To) +} + +// NewLogEntry creates new log entry. +func NewLogEntry(start time.Time, from, to int) LogEntry { + return LogEntry{ + Ts: time.Since(start) / time.Millisecond, + From: from, + To: to, + } +} + +// LogEntries2PropagationLog converts raw slice of LogEntries to PropagationLog, +// aggregating by timestamps and converting nodes indices to link indices. +// We expect that timestamps already bucketed into Nms groups. +func (s *Simulator) LogEntries2PropagationLog(entries []*LogEntry) *simulation.Log { + findLink := func(from, to int) int { + links := s.data.Links() + for i := range links { + if links[i].From == from && links[i].To == to || + links[i].To == from && links[i].From == to { + return i + } + } + return -1 + } + + tss := make(map[time.Duration][]int) + tsnodes := make(map[time.Duration][]int) + for _, entry := range entries { + idx := findLink(entry.From, entry.To) + if idx == -1 { + log.Println("[EE] Wrong link", entry) + continue + } + + // fill links map + if _, ok := tss[entry.Ts]; !ok { + tss[entry.Ts] = make([]int, 0) + } + + values := tss[entry.Ts] + values = append(values, idx) + tss[entry.Ts] = values + + // fill tsnodes map + if _, ok := tsnodes[entry.Ts]; !ok { + tsnodes[entry.Ts] = make([]int, 0) + } + nnodes := tsnodes[entry.Ts] + nnodes = append(nnodes, entry.From, entry.To) + tsnodes[entry.Ts] = nnodes + } + + var ret = &simulation.Log{ + Timestamps: make([]int, 0, len(tss)), + Indices: make([][]int, 0, len(tss)), + Nodes: make([][]int, 0, len(tss)), + } + + for ts, links := range tss { + ret.Timestamps = append(ret.Timestamps, int(ts)) + ret.Indices = append(ret.Indices, links) + ret.Nodes = append(ret.Nodes, tsnodes[ts]) + fmt.Println("Adding", ts*time.Millisecond, int(ts), links, tsnodes[ts]) + } + + return ret +} diff --git a/simulation/naivep2p/simulator.go b/simulation/naivep2p/simulator.go new file mode 100644 index 0000000..3c5e141 --- /dev/null +++ b/simulation/naivep2p/simulator.go @@ -0,0 +1,122 @@ +package naivep2p + +import ( + "sync" + "time" + + "github.com/divan/graph-experiments/graph" + "github.com/status-im/simulator/simulation" +) + +// Simulator is responsible for running propagation simulation. +type Simulator struct { + data *graph.Graph + delay time.Duration + peers map[int][]int + nodesCh []chan Message + reportCh chan LogEntry + peersToSendTo int // number of peers to propagate message + wg *sync.WaitGroup + simulationStart time.Time +} + +// Message represents the message propagated in the simulation. +type Message struct { + Content string + TTL int +} + +// NewSimulator initializes new simulator for the given graph data. +func NewSimulator(data *graph.Graph, N int, delay time.Duration) *Simulator { + nodeCount := len(data.Nodes()) + sim := &Simulator{ + data: data, + delay: delay, + peers: PrecalculatePeers(data), + peersToSendTo: N, + reportCh: make(chan LogEntry), + nodesCh: make([]chan Message, nodeCount), // one channel per node + wg: new(sync.WaitGroup), + } + sim.wg.Add(nodeCount) + for i := 0; i < nodeCount; i++ { + ch := sim.startNode(i) + sim.nodesCh[i] = ch // this channel will be used to talk to node by index + } + return sim +} + +// Stop stops simulator and frees all resources if any. +func (s *Simulator) Stop() error { + return nil +} + +func (s *Simulator) SendMessage(startNodeIdx, ttl int) *simulation.Log { + message := Message{ + Content: "dummy", + TTL: ttl, + } + s.simulationStart = time.Now() + s.propagateMessage(startNodeIdx, message) + + done := make(chan bool) + go func() { + s.wg.Wait() + done <- true + }() + + var ret []*LogEntry + for { + select { + case val := <-s.reportCh: + ret = append(ret, &val) + case <-done: + return s.LogEntries2PropagationLog(ret) + } + } +} + +func (s *Simulator) startNode(i int) chan Message { + ch := make(chan Message) + go s.runNode(i, ch) + return ch +} + +// runNode does actual node processing part +func (s *Simulator) runNode(i int, ch chan Message) { + defer s.wg.Done() + t := time.NewTimer(10 * time.Second) + + cache := make(map[string]bool) + for { + select { + case message := <-ch: + if cache[message.Content] { + continue + } + cache[message.Content] = true + message.TTL-- + if message.TTL == 0 { + return + } + s.propagateMessage(i, message) + case <-t.C: + return + } + } +} + +// propagateMessage simulates message sending from node to its peers. +func (s *Simulator) propagateMessage(from int, message Message) { + time.Sleep(s.delay) + peers := s.peers[from] + for i := range peers { + go s.sendMessage(from, peers[i], message) + } +} + +// sendMessage simulates message sending for given from and to indexes. +func (s *Simulator) sendMessage(from, to int, message Message) { + s.nodesCh[to] <- message + s.reportCh <- NewLogEntry(s.simulationStart, from, to) +} diff --git a/simulation/simulation.go b/simulation/simulation.go new file mode 100644 index 0000000..25c5502 --- /dev/null +++ b/simulation/simulation.go @@ -0,0 +1,15 @@ +package simulation + +// Simulator defines the simulators for message propagation within the graph. +type Simulator interface { + SendMessage(idx, ttl int) *Log + Stop() error +} + +// Log represnts log of p2p message propagation +// with relative timestamps (starting from T0). +type Log struct { + Timestamps []int // timestamps in milliseconds starting from T0 + Indices [][]int // indices of links for each step, len should be equal to len of Timestamps field + Nodes [][]int // indices of nodes involved in each step +} diff --git a/simulation/whisperv6/logentry.go b/simulation/whisperv6/logentry.go new file mode 100644 index 0000000..79e3dd6 --- /dev/null +++ b/simulation/whisperv6/logentry.go @@ -0,0 +1,28 @@ +package whisperv6 + +import ( + "fmt" + "time" +) + +// LogEntry defines the reporting log entry for one +// p2p message sending. +type LogEntry struct { + From int + To int + Ts time.Duration +} + +// String implements Stringer interface for LogEntry. +func (l LogEntry) String() string { + return fmt.Sprintf("%s: %d -> %d", l.Ts.String(), l.From, l.To) +} + +// NewLogEntry creates new log entry. +func NewLogEntry(start time.Time, from, to int) *LogEntry { + return &LogEntry{ + Ts: time.Since(start) / time.Millisecond, + From: from, + To: to, + } +} diff --git a/simulation/whisperv6/message.go b/simulation/whisperv6/message.go new file mode 100644 index 0000000..b3603f5 --- /dev/null +++ b/simulation/whisperv6/message.go @@ -0,0 +1,31 @@ +package whisperv6 + +import ( + "math/rand" + + "github.com/ethereum/go-ethereum/whisper/whisperv6" +) + +// const from github.com/ethereum/go-ethereum/whisper/whisperv5/doc.go +const ( + aesKeyLength = 32 +) + +func generateMessage(ttl int, symkeyID string) *whisperv6.NewMessage { + // set all the parameters except p.Dst and p.Padding + buf := make([]byte, 4) + rand.Read(buf) + sz := rand.Intn(400) + + msg := &whisperv6.NewMessage{ + PowTarget: 0.01, + PowTime: 1, + Payload: make([]byte, sz), + SymKeyID: symkeyID, + Topic: whisperv6.BytesToTopic(buf), + TTL: uint32(ttl), + } + rand.Read(msg.Payload) + + return msg +} diff --git a/simulation/whisperv6/simulator.go b/simulation/whisperv6/simulator.go new file mode 100644 index 0000000..bd13c6f --- /dev/null +++ b/simulation/whisperv6/simulator.go @@ -0,0 +1,283 @@ +package whisperv6 + +import ( + "fmt" + "log" + "math/rand" + "time" + + "github.com/divan/graph-experiments/graph" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/p2p/simulations" + "github.com/ethereum/go-ethereum/p2p/simulations/adapters" + whisper "github.com/ethereum/go-ethereum/whisper/whisperv6" + "github.com/status-im/simulator/simulation" +) + +// Simulator simulates WhisperV6 message propagation through the +// given p2p network. +type Simulator struct { + data *graph.Graph + network *simulations.Network + whispers map[discover.NodeID]*whisper.Whisper +} + +// NewSimulator intializes simulator for the given graph data. +func NewSimulator(data *graph.Graph) *Simulator { + rand.Seed(time.Now().UnixNano()) + + cfg := &whisper.Config{ + MaxMessageSize: whisper.DefaultMaxMessageSize, + MinimumAcceptedPOW: 0.001, + } + + whispers := make(map[discover.NodeID]*whisper.Whisper, len(data.Nodes())) + services := map[string]adapters.ServiceFunc{ + "shh": func(ctx *adapters.ServiceContext) (node.Service, error) { + return whispers[ctx.Config.ID], nil + }, + } + adapters.RegisterServices(services) + + adapter := adapters.NewSimAdapter(services) + network := simulations.NewNetwork(adapter, &simulations.NetworkConfig{ + DefaultService: "shh", + }) + + nodes := data.Nodes() + nodeCount := len(nodes) + sim := &Simulator{ + data: data, + network: network, + } + + log.Println("Creating nodes...") + for i := 0; i < nodeCount; i++ { + node, err := sim.network.NewNodeWithConfig(nodeConfig(i)) + if err != nil { + panic(err) + } + // it's important to init whisper service here, as it + // be initialized for each peer + log.Println("Generating new whisper: ", node.ID()) + service := whisper.New(cfg) + whispers[node.ID()] = service + } + + log.Println("Starting nodes...") + if err := network.StartAll(); err != nil { + panic(err) + } + + // subscribing to network events + events := make(chan *simulations.Event) + sub := sim.network.Events().Subscribe(events) + defer sub.Unsubscribe() + + go func() { + log.Println("Connecting nodes...") + for _, link := range data.Links() { + node1 := sim.network.Nodes[link.From] + node2 := sim.network.Nodes[link.To] + // if connection already exists, skip it, as network.Connect will fail + if network.GetConn(node1.ID(), node2.ID()) != nil { + continue + } + if err := network.Connect(node1.ID(), node2.ID()); err != nil { + panic(err) + } + } + }() + + // wait for all nodes to establish connections + var connected int + var subErr error + for connected < len(data.Links()) && subErr == nil { + select { + case event := <-events: + if event.Type == simulations.EventTypeConn { + if event.Conn.Up { + fmt.Println("Got connection", event) + connected++ + } + } + case e := <-sub.Err(): + subErr = e + log.Fatal("Failed to connect nodes", subErr) + } + } + log.Println("All connections established") + + return sim +} + +// Stop stops simulator and frees all resources if any. +func (s *Simulator) Stop() error { + log.Println("Shutting down simulation nodes...") + s.network.Shutdown() + return nil +} + +// SendMessage sends single message and tracks propagation. Implements simulator.Interface. +func (s *Simulator) SendMessage(startNodeIdx, ttl int) *simulation.Log { + node := s.network.Nodes[startNodeIdx] + + // the easiest way to send a message through the node is + // by using its public RPC methods - ssh_post. + client, err := node.Client() + if err != nil { + log.Fatal("Failed getting client", err) + } + + log.Printf(" Sending Whisper message from %s...\n", node.ID().String()) + + var symkeyID string + symKey := make([]byte, aesKeyLength) + rand.Read(symKey) + + err = client.Call(&symkeyID, "shh_addSymKey", hexutil.Bytes(symKey)) + + // subscribing to network events + events := make(chan *simulations.Event) + sub := s.network.Events().Subscribe(events) + defer sub.Unsubscribe() + + ticker := time.NewTicker(500 * time.Millisecond) + start := time.Now() + + msg := generateMessage(ttl, symkeyID) + var ignored bool + err = client.Call(&ignored, "shh_post", msg) + + // pre-cache node indexes + var ncache = make(map[discover.NodeID]int) + for i := range s.network.Nodes { + ncache[s.network.Nodes[i].ID()] = i + } + + var ( + subErr error + done bool + count int + plog []*LogEntry + ) + for subErr == nil && !done { + select { + case event := <-events: + if event.Type == simulations.EventTypeMsg { + msg := event.Msg + if msg.Code == 1 && msg.Protocol == "shh" && msg.Received == false { + from := ncache[msg.One] + to := ncache[msg.Other] + entry := NewLogEntry(start, from, to) + plog = append(plog, entry) + count++ + } + } + case <-ticker.C: + if count == 0 { + done = true + } else { + count = 0 + } + case e := <-sub.Err(): + subErr = e + } + } + if subErr != nil { + log.Fatal("Failed to collect propagation info", subErr) + } + return s.LogEntries2PropagationLog(plog) +} + +// LogEntries2PropagationLog converts raw slice of LogEntries to PropagationLog, +// aggregating by timestamps and converting nodes indices to link indices. +// We expect that timestamps already bucketed into Nms groups. +func (s *Simulator) LogEntries2PropagationLog(entries []*LogEntry) *simulation.Log { + links := s.data.Links() + findLink := func(from, to int) int { + for i := range links { + if links[i].From == from && links[i].To == to || + links[i].To == from && links[i].From == to { + return i + } + } + return -1 + } + + tss := make(map[time.Duration][]int) + tsnodes := make(map[time.Duration][]int) + for _, entry := range entries { + idx := findLink(entry.From, entry.To) + if idx == -1 { + log.Println("[EE] Wrong link", entry) + continue + } + + // fill links map + if _, ok := tss[entry.Ts]; !ok { + tss[entry.Ts] = make([]int, 0) + } + + values := tss[entry.Ts] + values = append(values, idx) + tss[entry.Ts] = values + + // fill tsnodes map + if _, ok := tsnodes[entry.Ts]; !ok { + tsnodes[entry.Ts] = make([]int, 0) + } + nnodes := tsnodes[entry.Ts] + nnodes = append(nnodes, entry.From, entry.To) + tsnodes[entry.Ts] = nnodes + } + + var ret = &simulation.Log{ + Timestamps: make([]int, 0, len(tss)), + Indices: make([][]int, 0, len(tss)), + Nodes: make([][]int, 0, len(tss)), + } + + for ts, links := range tss { + ret.Timestamps = append(ret.Timestamps, int(ts)) + ret.Indices = append(ret.Indices, links) + ret.Nodes = append(ret.Nodes, tsnodes[ts]) + fmt.Println("Adding", ts*time.Millisecond, int(ts), links, tsnodes[ts]) + } + + return ret +} + +// nodeConfig generates config for simulated node with random key. +func nodeConfig(idx int) *adapters.NodeConfig { + key, err := crypto.GenerateKey() + if err != nil { + panic("unable to generate key") + } + var id discover.NodeID + pubkey := crypto.FromECDSAPub(&key.PublicKey) + copy(id[:], pubkey[1:]) + return &adapters.NodeConfig{ + ID: id, + PrivateKey: key, + Name: nodeIdxToName(idx), + } +} + +func nodeIdxToName(id int) string { + return fmt.Sprintf("Node %d", id) +} + +// findNode is a helper for finding node index by it's ID. +// TODO: remove this when links replaces into indexes. +func findNode(nodes []graph.Node, ID string) (int, error) { + for i := range nodes { + if nodes[i].ID() == ID { + return i, nil + } + } + return -1, fmt.Errorf("Node with ID '%s' not found", ID) +}