From ebe2d9d872c5482e02508f1d3e9c3a56e8a41d44 Mon Sep 17 00:00:00 2001 From: obscuren Date: Mon, 8 Dec 2014 12:43:33 +0100 Subject: [PATCH] First draft of Whisper messages relaying --- whisper/envelope.go | 96 ++++++++++++++++++++++++++ whisper/main.go | 46 +++++++++++++ whisper/message.go | 15 +++++ whisper/peer.go | 114 +++++++++++++++++++++++++++++++ whisper/sort.go | 25 +++++++ whisper/sort_test.go | 19 ++++++ whisper/whisper.go | 157 +++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 472 insertions(+) create mode 100644 whisper/envelope.go create mode 100644 whisper/main.go create mode 100644 whisper/message.go create mode 100644 whisper/peer.go create mode 100644 whisper/sort.go create mode 100644 whisper/sort_test.go create mode 100644 whisper/whisper.go diff --git a/whisper/envelope.go b/whisper/envelope.go new file mode 100644 index 000000000..f9254843c --- /dev/null +++ b/whisper/envelope.go @@ -0,0 +1,96 @@ +package whisper + +import ( + "bytes" + "encoding/binary" + "io" + "time" + + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethutil" + "github.com/ethereum/go-ethereum/rlp" +) + +const ( + DefaultTtl = 50 * time.Second +) + +type Envelope struct { + Expiry int32 // Whisper protocol specifies int32, really should be int64 + Ttl int32 // ^^^^^^ + Topics [][]byte + Data []byte + Nonce uint32 + + hash Hash +} + +func NewEnvelopeFromReader(reader io.Reader) (*Envelope, error) { + var envelope Envelope + + buf := new(bytes.Buffer) + buf.ReadFrom(reader) + + h := H(crypto.Sha3(buf.Bytes())) + if err := rlp.Decode(buf, &envelope); err != nil { + return nil, err + } + + envelope.hash = h + + return &envelope, nil +} + +func (self *Envelope) Hash() Hash { + if self.hash == EmptyHash { + self.hash = H(crypto.Sha3(ethutil.Encode(self))) + } + + return self.hash +} + +func NewEnvelope(ttl time.Duration, topics [][]byte, data *Message) *Envelope { + exp := time.Now().Add(ttl) + + return &Envelope{int32(exp.Unix()), int32(ttl.Seconds()), topics, data.Bytes(), 0, Hash{}} +} + +func (self *Envelope) Seal() { + self.proveWork(DefaultTtl) +} + +func (self *Envelope) proveWork(dura time.Duration) { + var bestBit int + d := make([]byte, 64) + copy(d[:32], ethutil.Encode(self.withoutNonce())) + + then := time.Now().Add(dura).UnixNano() + for n := uint32(0); time.Now().UnixNano() < then; { + for i := 0; i < 1024; i++ { + binary.BigEndian.PutUint32(d[60:], n) + + fbs := ethutil.FirstBitSet(ethutil.BigD(crypto.Sha3(d))) + if fbs > bestBit { + bestBit = fbs + self.Nonce = n + } + + n++ + } + } +} + +func (self *Envelope) valid() bool { + d := make([]byte, 64) + copy(d[:32], ethutil.Encode(self.withoutNonce())) + binary.BigEndian.PutUint32(d[60:], self.Nonce) + return ethutil.FirstBitSet(ethutil.BigD(crypto.Sha3(d))) > 0 +} + +func (self *Envelope) withoutNonce() interface{} { + return []interface{}{self.Expiry, self.Ttl, ethutil.ByteSliceToInterface(self.Topics), self.Data} +} + +func (self *Envelope) RlpData() interface{} { + return []interface{}{self.Expiry, self.Ttl, ethutil.ByteSliceToInterface(self.Topics), self.Data, self.Nonce} +} diff --git a/whisper/main.go b/whisper/main.go new file mode 100644 index 000000000..3868f604f --- /dev/null +++ b/whisper/main.go @@ -0,0 +1,46 @@ +// +build none + +package main + +import ( + "fmt" + "log" + "net" + "os" + + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/whisper" + "github.com/obscuren/secp256k1-go" +) + +func main() { + logger.AddLogSystem(logger.NewStdLogSystem(os.Stdout, log.LstdFlags, logger.InfoLevel)) + + pub, sec := secp256k1.GenerateKeyPair() + + whisper := whisper.New(pub, sec) + + srv := p2p.Server{ + MaxPeers: 10, + Identity: p2p.NewSimpleClientIdentity("whisper-go", "1.0", "", string(pub)), + ListenAddr: ":30303", + NAT: p2p.UPNP(), + + Protocols: []p2p.Protocol{whisper.Protocol()}, + } + if err := srv.Start(); err != nil { + fmt.Println("could not start server:", err) + os.Exit(1) + } + + // add seed peers + seed, err := net.ResolveTCPAddr("tcp", "poc-7.ethdev.com:30300") + if err != nil { + fmt.Println("couldn't resolve:", err) + os.Exit(1) + } + srv.SuggestPeer(seed.IP, seed.Port, nil) + + select {} +} diff --git a/whisper/message.go b/whisper/message.go new file mode 100644 index 000000000..21cf163e6 --- /dev/null +++ b/whisper/message.go @@ -0,0 +1,15 @@ +package whisper + +type Message struct { + Flags byte + Signature []byte + Payload []byte +} + +func NewMessage(payload []byte) *Message { + return &Message{Flags: 0, Payload: payload} +} + +func (self *Message) Bytes() []byte { + return append([]byte{self.Flags}, append(self.Signature, self.Payload...)...) +} diff --git a/whisper/peer.go b/whisper/peer.go new file mode 100644 index 000000000..5fe50ba59 --- /dev/null +++ b/whisper/peer.go @@ -0,0 +1,114 @@ +package whisper + +import ( + "fmt" + "io/ioutil" + "time" + + "github.com/ethereum/go-ethereum/p2p" + "gopkg.in/fatih/set.v0" +) + +const ( + protocolVersion = 0x02 +) + +type peer struct { + host *Whisper + peer *p2p.Peer + ws p2p.MsgReadWriter + + // XXX Eventually this is going to reach exceptional large space. We need an expiry here + known *set.Set + + quit chan struct{} +} + +func NewPeer(host *Whisper, p *p2p.Peer, ws p2p.MsgReadWriter) *peer { + return &peer{host, p, ws, set.New(), make(chan struct{})} +} + +func (self *peer) init() error { + if err := self.handleStatus(); err != nil { + return err + } + + return nil +} + +func (self *peer) start() { + go self.update() +} + +func (self *peer) update() { + relay := time.NewTicker(300 * time.Millisecond) +out: + for { + select { + case <-relay.C: + err := self.broadcast(self.host.envelopes()) + if err != nil { + self.peer.Infoln(err) + break out + } + + case <-self.quit: + break out + } + } +} + +func (self *peer) broadcast(envelopes []*Envelope) error { + envs := make([]interface{}, len(envelopes)) + i := 0 + for _, envelope := range envelopes { + if !self.known.Has(envelope.Hash()) { + envs[i] = envelope + self.known.Add(envelope.Hash()) + i++ + } + } + + msg := p2p.NewMsg(envelopesMsg, envs[:i]...) + if err := self.ws.WriteMsg(msg); err != nil { + return err + } + + return nil +} + +func (self *peer) handleStatus() error { + ws := self.ws + + if err := ws.WriteMsg(self.statusMsg()); err != nil { + return err + } + + msg, err := ws.ReadMsg() + if err != nil { + return err + } + + if msg.Code != statusMsg { + return fmt.Errorf("peer send %x before status msg", msg.Code) + } + + data, err := ioutil.ReadAll(msg.Payload) + if err != nil { + return err + } + + if len(data) == 0 { + return fmt.Errorf("malformed status. data len = 0") + } + + if pv := data[0]; pv != protocolVersion { + return fmt.Errorf("protocol version mismatch %d != %d", pv, protocolVersion) + } + + return nil +} + +func (self *peer) statusMsg() p2p.Msg { + return p2p.NewMsg(statusMsg, protocolVersion) +} diff --git a/whisper/sort.go b/whisper/sort.go new file mode 100644 index 000000000..8c5b46e9e --- /dev/null +++ b/whisper/sort.go @@ -0,0 +1,25 @@ +package whisper + +import "sort" + +type sortedKeys struct { + k []int32 +} + +func (self *sortedKeys) Len() int { return len(self.k) } +func (self *sortedKeys) Less(i, j int) bool { return self.k[i] < self.k[j] } +func (self *sortedKeys) Swap(i, j int) { self.k[i], self.k[j] = self.k[j], self.k[i] } + +func sortKeys(m map[int32]Hash) []int32 { + sorted := new(sortedKeys) + sorted.k = make([]int32, len(m)) + i := 0 + for key, _ := range m { + sorted.k[i] = key + i++ + } + + sort.Sort(sorted) + + return sorted.k +} diff --git a/whisper/sort_test.go b/whisper/sort_test.go new file mode 100644 index 000000000..5d8177d41 --- /dev/null +++ b/whisper/sort_test.go @@ -0,0 +1,19 @@ +package whisper + +import "testing" + +func TestSorting(t *testing.T) { + m := map[int32]Hash{ + 1: HS("1"), + 3: HS("3"), + 2: HS("2"), + 5: HS("5"), + } + exp := []int32{1, 2, 3, 5} + res := sortKeys(m) + for i, k := range res { + if k != exp[i] { + t.Error(k, "failed. Expected", exp[i]) + } + } +} diff --git a/whisper/whisper.go b/whisper/whisper.go new file mode 100644 index 000000000..692e6bc2c --- /dev/null +++ b/whisper/whisper.go @@ -0,0 +1,157 @@ +package whisper + +import ( + "fmt" + "sync" + "time" + + "github.com/ethereum/go-ethereum/p2p" + "gopkg.in/fatih/set.v0" +) + +// MOVE ME +type Hash struct { + hash string +} + +var EmptyHash Hash + +func H(hash []byte) Hash { + return Hash{string(hash)} +} +func HS(hash string) Hash { + return Hash{hash} +} + +// MOVE ME END + +const ( + statusMsg = 0x0 + envelopesMsg = 0x01 +) + +type Whisper struct { + pub, sec []byte + protocol p2p.Protocol + + mmu sync.RWMutex + messages map[Hash]*Envelope + expiry map[int32]*set.SetNonTS + + quit chan struct{} +} + +func New(pub, sec []byte) *Whisper { + whisper := &Whisper{ + pub: pub, + sec: sec, + messages: make(map[Hash]*Envelope), + expiry: make(map[int32]*set.SetNonTS), + quit: make(chan struct{}), + } + go whisper.update() + + // p2p whisper sub protocol handler + whisper.protocol = p2p.Protocol{ + Name: "shh", + Version: 2, + Length: 2, + Run: whisper.msgHandler, + } + + return whisper +} + +func (self *Whisper) Stop() { + close(self.quit) +} + +func (self *Whisper) Send(ttl time.Duration, topics [][]byte, data *Message) { + envelope := NewEnvelope(ttl, topics, data) + envelope.Seal() + + self.add(envelope) +} + +func (self *Whisper) msgHandler(peer *p2p.Peer, ws p2p.MsgReadWriter) error { + wpeer := NewPeer(self, peer, ws) + if err := wpeer.init(); err != nil { + return err + } + go wpeer.start() + + for { + msg, err := ws.ReadMsg() + if err != nil { + return err + } + + envelope, err := NewEnvelopeFromReader(msg.Payload) + if err != nil { + peer.Infoln(err) + continue + } + + self.add(envelope) + } +} + +func (self *Whisper) add(envelope *Envelope) { + self.mmu.Lock() + defer self.mmu.Unlock() + + fmt.Println("received envelope", envelope) + self.messages[envelope.Hash()] = envelope + if self.expiry[envelope.Expiry] == nil { + self.expiry[envelope.Expiry] = set.NewNonTS() + } + self.expiry[envelope.Expiry].Add(envelope.Hash()) +} + +func (self *Whisper) update() { + expire := time.NewTicker(800 * time.Millisecond) +out: + for { + select { + case <-expire.C: + self.expire() + case <-self.quit: + break out + } + } +} +func (self *Whisper) expire() { + self.mmu.Lock() + defer self.mmu.Unlock() + + now := int32(time.Now().Unix()) + for then, hashSet := range self.expiry { + if then > now { + continue + } + + hashSet.Each(func(v interface{}) bool { + delete(self.messages, v.(Hash)) + return true + }) + self.expiry[then].Clear() + } +} + +func (self *Whisper) envelopes() (envelopes []*Envelope) { + self.mmu.RLock() + defer self.mmu.RUnlock() + + envelopes = make([]*Envelope, len(self.messages)) + i := 0 + for _, envelope := range self.messages { + envelopes[i] = envelope + i++ + } + + return +} + +func (self *Whisper) Protocol() p2p.Protocol { + return self.protocol +}