Initial commit

This commit is contained in:
Ivan Danyliuk 2018-04-17 13:10:35 +02:00
commit 0af715b62b
No known key found for this signature in database
GPG Key ID: 97ED33CE024E1DBF
12 changed files with 726 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
*.json

3
cmd/simulator/.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
simulator
network.json
propagation.json

4
cmd/simulator/README.md Normal file
View File

@ -0,0 +1,4 @@
# Graph Data Generator
---
Data generator generates different kinds of graph data, ready to use with this library.

64
cmd/simulator/main.go Normal file
View File

@ -0,0 +1,64 @@
package main
import (
"encoding/json"
"flag"
"log"
"os"
"time"
"github.com/divan/graph-experiments/graph"
gethlog "github.com/ethereum/go-ethereum/log"
"github.com/status-im/simulator/simulation"
"github.com/status-im/simulator/simulation/naivep2p"
"github.com/status-im/simulator/simulation/whisperv6"
)
func main() {
var (
simType = flag.String("type", "whisperv6", "Type of simulators (naivep2p, whisperv6)")
ttl = flag.Int("ttl", 10, "Message TTL for simulation")
naiveP2PN = flag.Int("naivep2p.N", 3, "Number of peers to propagate (0..N of peers)")
naiveP2PDelay = flag.Duration("naivep2p.delay", 10*time.Millisecond, "Delay for each step")
input = flag.String("i", "network.json", "Input filename for pregenerated data to be used with simulation")
output = flag.String("o", "propagation.json", "Output filename for p2p sending data")
ggethlogLevel = flag.String("loglevel", "crit", "Geth log level for whisper simulator (crti, error, warn, info, debug, trace)")
)
flag.Parse()
data, err := graph.NewGraphFromJSON(*input)
if err != nil {
log.Fatal("Opening input file failed: ", err)
}
fd, err := os.Create(*output)
if err != nil {
log.Fatal("Opening output file failed: ", err)
}
defer fd.Close()
var sim simulation.Simulator
switch *simType {
case "naivep2p":
sim = naivep2p.NewSimulator(data, *naiveP2PN, *naiveP2PDelay)
case "whisperv6":
lvl, err := gethlog.LvlFromString(*ggethlogLevel)
if err != nil {
lvl = gethlog.LvlCrit
}
gethlog.Root().SetHandler(gethlog.LvlFilterHandler(lvl, gethlog.StreamHandler(os.Stderr, gethlog.TerminalFormat(true))))
sim = whisperv6.NewSimulator(data)
default:
log.Fatal("Unknown simulation type: ", *simType)
}
defer sim.Stop()
// Start simulation by sending single message
log.Printf("Starting message sending %s simulation for graph with %d nodes...", *simType, len(data.Nodes()))
sendData := sim.SendMessage(0, *ttl)
err = json.NewEncoder(fd).Encode(sendData)
if err != nil {
log.Fatal(err)
}
log.Printf("Written %s propagation data into %s", *simType, *output)
}

48
recorder/recorder.go Normal file
View File

@ -0,0 +1,48 @@
package simulations
import (
"fmt"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/discover"
)
// Recorder records send/receive events for
// generating further propagation log.
type Recorder struct {
Log []*LogEntry
start time.Time
nodeMap map[discover.NodeID]int
}
// NewRecorder inits new recorder.
func NewRecorder(nodeMap map[discover.NodeID]int) *Recorder {
return &Recorder{
start: time.Now(),
nodeMap: nodeMap,
}
}
func (r *Recorder) Reset() {
r.start = time.Now()
}
func (r *Recorder) Send(from, to discover.NodeID) {
fromIdx, ok := r.nodeMap[from]
if !ok {
panic("node not found")
}
toIdx, ok := r.nodeMap[to]
if !ok {
panic("node not found")
}
log.Error("NewLogEntry", "start", r.start, "from", fromIdx, "to", toIdx)
e := NewLogEntry(r.start, fromIdx, toIdx)
r.Log = append(r.Log, e)
}
func (r *Recorder) Receive(from, to discover.NodeID) {
log.Info("DidReceive", "from", from, "to", to)
}

View File

@ -0,0 +1,38 @@
package naivep2p
import (
"github.com/divan/graph-experiments/graph"
)
// LinkIndex stores link information in form of indexes, rather than nodes IP.
type LinkIndex struct {
From int
To int
}
// PrecalculatePeers creates map with peers indexes for faster lookup.
func PrecalculatePeers(data *graph.Graph) map[int][]int {
links := data.Links()
ret := make(map[int][]int)
for _, link := range links {
if link.From == link.To {
continue
}
if _, ok := ret[link.From]; !ok {
ret[link.From] = make([]int, 0)
}
if _, ok := ret[link.To]; !ok {
ret[link.To] = make([]int, 0)
}
peers := ret[link.From]
peers = append(peers, link.To)
ret[link.From] = peers
peers = ret[link.To]
peers = append(peers, link.From)
ret[link.To] = peers
}
return ret
}

View File

@ -0,0 +1,89 @@
package naivep2p
import (
"fmt"
"log"
"time"
"github.com/status-im/simulator/simulation"
)
// LogEntry defines the reporting log entry for one
// p2p message sending.
type LogEntry struct {
From int
To int
Ts time.Duration
}
// String implements Stringer interface for LogEntry.
func (l LogEntry) String() string {
return fmt.Sprintf("%s: %d -> %d", l.Ts.String(), l.From, l.To)
}
// NewLogEntry creates new log entry.
func NewLogEntry(start time.Time, from, to int) LogEntry {
return LogEntry{
Ts: time.Since(start) / time.Millisecond,
From: from,
To: to,
}
}
// LogEntries2PropagationLog converts raw slice of LogEntries to PropagationLog,
// aggregating by timestamps and converting nodes indices to link indices.
// We expect that timestamps already bucketed into Nms groups.
func (s *Simulator) LogEntries2PropagationLog(entries []*LogEntry) *simulation.Log {
findLink := func(from, to int) int {
links := s.data.Links()
for i := range links {
if links[i].From == from && links[i].To == to ||
links[i].To == from && links[i].From == to {
return i
}
}
return -1
}
tss := make(map[time.Duration][]int)
tsnodes := make(map[time.Duration][]int)
for _, entry := range entries {
idx := findLink(entry.From, entry.To)
if idx == -1 {
log.Println("[EE] Wrong link", entry)
continue
}
// fill links map
if _, ok := tss[entry.Ts]; !ok {
tss[entry.Ts] = make([]int, 0)
}
values := tss[entry.Ts]
values = append(values, idx)
tss[entry.Ts] = values
// fill tsnodes map
if _, ok := tsnodes[entry.Ts]; !ok {
tsnodes[entry.Ts] = make([]int, 0)
}
nnodes := tsnodes[entry.Ts]
nnodes = append(nnodes, entry.From, entry.To)
tsnodes[entry.Ts] = nnodes
}
var ret = &simulation.Log{
Timestamps: make([]int, 0, len(tss)),
Indices: make([][]int, 0, len(tss)),
Nodes: make([][]int, 0, len(tss)),
}
for ts, links := range tss {
ret.Timestamps = append(ret.Timestamps, int(ts))
ret.Indices = append(ret.Indices, links)
ret.Nodes = append(ret.Nodes, tsnodes[ts])
fmt.Println("Adding", ts*time.Millisecond, int(ts), links, tsnodes[ts])
}
return ret
}

View File

@ -0,0 +1,122 @@
package naivep2p
import (
"sync"
"time"
"github.com/divan/graph-experiments/graph"
"github.com/status-im/simulator/simulation"
)
// Simulator is responsible for running propagation simulation.
type Simulator struct {
data *graph.Graph
delay time.Duration
peers map[int][]int
nodesCh []chan Message
reportCh chan LogEntry
peersToSendTo int // number of peers to propagate message
wg *sync.WaitGroup
simulationStart time.Time
}
// Message represents the message propagated in the simulation.
type Message struct {
Content string
TTL int
}
// NewSimulator initializes new simulator for the given graph data.
func NewSimulator(data *graph.Graph, N int, delay time.Duration) *Simulator {
nodeCount := len(data.Nodes())
sim := &Simulator{
data: data,
delay: delay,
peers: PrecalculatePeers(data),
peersToSendTo: N,
reportCh: make(chan LogEntry),
nodesCh: make([]chan Message, nodeCount), // one channel per node
wg: new(sync.WaitGroup),
}
sim.wg.Add(nodeCount)
for i := 0; i < nodeCount; i++ {
ch := sim.startNode(i)
sim.nodesCh[i] = ch // this channel will be used to talk to node by index
}
return sim
}
// Stop stops simulator and frees all resources if any.
func (s *Simulator) Stop() error {
return nil
}
func (s *Simulator) SendMessage(startNodeIdx, ttl int) *simulation.Log {
message := Message{
Content: "dummy",
TTL: ttl,
}
s.simulationStart = time.Now()
s.propagateMessage(startNodeIdx, message)
done := make(chan bool)
go func() {
s.wg.Wait()
done <- true
}()
var ret []*LogEntry
for {
select {
case val := <-s.reportCh:
ret = append(ret, &val)
case <-done:
return s.LogEntries2PropagationLog(ret)
}
}
}
func (s *Simulator) startNode(i int) chan Message {
ch := make(chan Message)
go s.runNode(i, ch)
return ch
}
// runNode does actual node processing part
func (s *Simulator) runNode(i int, ch chan Message) {
defer s.wg.Done()
t := time.NewTimer(10 * time.Second)
cache := make(map[string]bool)
for {
select {
case message := <-ch:
if cache[message.Content] {
continue
}
cache[message.Content] = true
message.TTL--
if message.TTL == 0 {
return
}
s.propagateMessage(i, message)
case <-t.C:
return
}
}
}
// propagateMessage simulates message sending from node to its peers.
func (s *Simulator) propagateMessage(from int, message Message) {
time.Sleep(s.delay)
peers := s.peers[from]
for i := range peers {
go s.sendMessage(from, peers[i], message)
}
}
// sendMessage simulates message sending for given from and to indexes.
func (s *Simulator) sendMessage(from, to int, message Message) {
s.nodesCh[to] <- message
s.reportCh <- NewLogEntry(s.simulationStart, from, to)
}

15
simulation/simulation.go Normal file
View File

@ -0,0 +1,15 @@
package simulation
// Simulator defines the simulators for message propagation within the graph.
type Simulator interface {
SendMessage(idx, ttl int) *Log
Stop() error
}
// Log represnts log of p2p message propagation
// with relative timestamps (starting from T0).
type Log struct {
Timestamps []int // timestamps in milliseconds starting from T0
Indices [][]int // indices of links for each step, len should be equal to len of Timestamps field
Nodes [][]int // indices of nodes involved in each step
}

View File

@ -0,0 +1,28 @@
package whisperv6
import (
"fmt"
"time"
)
// LogEntry defines the reporting log entry for one
// p2p message sending.
type LogEntry struct {
From int
To int
Ts time.Duration
}
// String implements Stringer interface for LogEntry.
func (l LogEntry) String() string {
return fmt.Sprintf("%s: %d -> %d", l.Ts.String(), l.From, l.To)
}
// NewLogEntry creates new log entry.
func NewLogEntry(start time.Time, from, to int) *LogEntry {
return &LogEntry{
Ts: time.Since(start) / time.Millisecond,
From: from,
To: to,
}
}

View File

@ -0,0 +1,31 @@
package whisperv6
import (
"math/rand"
"github.com/ethereum/go-ethereum/whisper/whisperv6"
)
// const from github.com/ethereum/go-ethereum/whisper/whisperv5/doc.go
const (
aesKeyLength = 32
)
func generateMessage(ttl int, symkeyID string) *whisperv6.NewMessage {
// set all the parameters except p.Dst and p.Padding
buf := make([]byte, 4)
rand.Read(buf)
sz := rand.Intn(400)
msg := &whisperv6.NewMessage{
PowTarget: 0.01,
PowTime: 1,
Payload: make([]byte, sz),
SymKeyID: symkeyID,
Topic: whisperv6.BytesToTopic(buf),
TTL: uint32(ttl),
}
rand.Read(msg.Payload)
return msg
}

View File

@ -0,0 +1,283 @@
package whisperv6
import (
"fmt"
"log"
"math/rand"
"time"
"github.com/divan/graph-experiments/graph"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/simulations"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
whisper "github.com/ethereum/go-ethereum/whisper/whisperv6"
"github.com/status-im/simulator/simulation"
)
// Simulator simulates WhisperV6 message propagation through the
// given p2p network.
type Simulator struct {
data *graph.Graph
network *simulations.Network
whispers map[discover.NodeID]*whisper.Whisper
}
// NewSimulator intializes simulator for the given graph data.
func NewSimulator(data *graph.Graph) *Simulator {
rand.Seed(time.Now().UnixNano())
cfg := &whisper.Config{
MaxMessageSize: whisper.DefaultMaxMessageSize,
MinimumAcceptedPOW: 0.001,
}
whispers := make(map[discover.NodeID]*whisper.Whisper, len(data.Nodes()))
services := map[string]adapters.ServiceFunc{
"shh": func(ctx *adapters.ServiceContext) (node.Service, error) {
return whispers[ctx.Config.ID], nil
},
}
adapters.RegisterServices(services)
adapter := adapters.NewSimAdapter(services)
network := simulations.NewNetwork(adapter, &simulations.NetworkConfig{
DefaultService: "shh",
})
nodes := data.Nodes()
nodeCount := len(nodes)
sim := &Simulator{
data: data,
network: network,
}
log.Println("Creating nodes...")
for i := 0; i < nodeCount; i++ {
node, err := sim.network.NewNodeWithConfig(nodeConfig(i))
if err != nil {
panic(err)
}
// it's important to init whisper service here, as it
// be initialized for each peer
log.Println("Generating new whisper: ", node.ID())
service := whisper.New(cfg)
whispers[node.ID()] = service
}
log.Println("Starting nodes...")
if err := network.StartAll(); err != nil {
panic(err)
}
// subscribing to network events
events := make(chan *simulations.Event)
sub := sim.network.Events().Subscribe(events)
defer sub.Unsubscribe()
go func() {
log.Println("Connecting nodes...")
for _, link := range data.Links() {
node1 := sim.network.Nodes[link.From]
node2 := sim.network.Nodes[link.To]
// if connection already exists, skip it, as network.Connect will fail
if network.GetConn(node1.ID(), node2.ID()) != nil {
continue
}
if err := network.Connect(node1.ID(), node2.ID()); err != nil {
panic(err)
}
}
}()
// wait for all nodes to establish connections
var connected int
var subErr error
for connected < len(data.Links()) && subErr == nil {
select {
case event := <-events:
if event.Type == simulations.EventTypeConn {
if event.Conn.Up {
fmt.Println("Got connection", event)
connected++
}
}
case e := <-sub.Err():
subErr = e
log.Fatal("Failed to connect nodes", subErr)
}
}
log.Println("All connections established")
return sim
}
// Stop stops simulator and frees all resources if any.
func (s *Simulator) Stop() error {
log.Println("Shutting down simulation nodes...")
s.network.Shutdown()
return nil
}
// SendMessage sends single message and tracks propagation. Implements simulator.Interface.
func (s *Simulator) SendMessage(startNodeIdx, ttl int) *simulation.Log {
node := s.network.Nodes[startNodeIdx]
// the easiest way to send a message through the node is
// by using its public RPC methods - ssh_post.
client, err := node.Client()
if err != nil {
log.Fatal("Failed getting client", err)
}
log.Printf(" Sending Whisper message from %s...\n", node.ID().String())
var symkeyID string
symKey := make([]byte, aesKeyLength)
rand.Read(symKey)
err = client.Call(&symkeyID, "shh_addSymKey", hexutil.Bytes(symKey))
// subscribing to network events
events := make(chan *simulations.Event)
sub := s.network.Events().Subscribe(events)
defer sub.Unsubscribe()
ticker := time.NewTicker(500 * time.Millisecond)
start := time.Now()
msg := generateMessage(ttl, symkeyID)
var ignored bool
err = client.Call(&ignored, "shh_post", msg)
// pre-cache node indexes
var ncache = make(map[discover.NodeID]int)
for i := range s.network.Nodes {
ncache[s.network.Nodes[i].ID()] = i
}
var (
subErr error
done bool
count int
plog []*LogEntry
)
for subErr == nil && !done {
select {
case event := <-events:
if event.Type == simulations.EventTypeMsg {
msg := event.Msg
if msg.Code == 1 && msg.Protocol == "shh" && msg.Received == false {
from := ncache[msg.One]
to := ncache[msg.Other]
entry := NewLogEntry(start, from, to)
plog = append(plog, entry)
count++
}
}
case <-ticker.C:
if count == 0 {
done = true
} else {
count = 0
}
case e := <-sub.Err():
subErr = e
}
}
if subErr != nil {
log.Fatal("Failed to collect propagation info", subErr)
}
return s.LogEntries2PropagationLog(plog)
}
// LogEntries2PropagationLog converts raw slice of LogEntries to PropagationLog,
// aggregating by timestamps and converting nodes indices to link indices.
// We expect that timestamps already bucketed into Nms groups.
func (s *Simulator) LogEntries2PropagationLog(entries []*LogEntry) *simulation.Log {
links := s.data.Links()
findLink := func(from, to int) int {
for i := range links {
if links[i].From == from && links[i].To == to ||
links[i].To == from && links[i].From == to {
return i
}
}
return -1
}
tss := make(map[time.Duration][]int)
tsnodes := make(map[time.Duration][]int)
for _, entry := range entries {
idx := findLink(entry.From, entry.To)
if idx == -1 {
log.Println("[EE] Wrong link", entry)
continue
}
// fill links map
if _, ok := tss[entry.Ts]; !ok {
tss[entry.Ts] = make([]int, 0)
}
values := tss[entry.Ts]
values = append(values, idx)
tss[entry.Ts] = values
// fill tsnodes map
if _, ok := tsnodes[entry.Ts]; !ok {
tsnodes[entry.Ts] = make([]int, 0)
}
nnodes := tsnodes[entry.Ts]
nnodes = append(nnodes, entry.From, entry.To)
tsnodes[entry.Ts] = nnodes
}
var ret = &simulation.Log{
Timestamps: make([]int, 0, len(tss)),
Indices: make([][]int, 0, len(tss)),
Nodes: make([][]int, 0, len(tss)),
}
for ts, links := range tss {
ret.Timestamps = append(ret.Timestamps, int(ts))
ret.Indices = append(ret.Indices, links)
ret.Nodes = append(ret.Nodes, tsnodes[ts])
fmt.Println("Adding", ts*time.Millisecond, int(ts), links, tsnodes[ts])
}
return ret
}
// nodeConfig generates config for simulated node with random key.
func nodeConfig(idx int) *adapters.NodeConfig {
key, err := crypto.GenerateKey()
if err != nil {
panic("unable to generate key")
}
var id discover.NodeID
pubkey := crypto.FromECDSAPub(&key.PublicKey)
copy(id[:], pubkey[1:])
return &adapters.NodeConfig{
ID: id,
PrivateKey: key,
Name: nodeIdxToName(idx),
}
}
func nodeIdxToName(id int) string {
return fmt.Sprintf("Node %d", id)
}
// findNode is a helper for finding node index by it's ID.
// TODO: remove this when links replaces into indexes.
func findNode(nodes []graph.Node, ID string) (int, error) {
for i := range nodes {
if nodes[i].ID() == ID {
return i, nil
}
}
return -1, fmt.Errorf("Node with ID '%s' not found", ID)
}