From 47793b606c33e93d9a4295ba4db12165182d9e72 Mon Sep 17 00:00:00 2001 From: zelig Date: Fri, 5 Dec 2014 21:14:55 +0000 Subject: [PATCH] initial commit for eth-p2p integration --- eth/error.go | 73 +++++++++++ eth/protocol.go | 294 +++++++++++++++++++++++++++++++++++++++++++ eth/protocol_test.go | 133 ++++++++++++++++++++ 3 files changed, 500 insertions(+) create mode 100644 eth/error.go create mode 100644 eth/protocol.go create mode 100644 eth/protocol_test.go diff --git a/eth/error.go b/eth/error.go new file mode 100644 index 000000000..cb4459435 --- /dev/null +++ b/eth/error.go @@ -0,0 +1,73 @@ +package eth + +import ( + "fmt" + // "github.com/ethereum/go-ethereum/logger" +) + +const ( + ErrMsgTooLarge = iota + ErrDecode + ErrInvalidMsgCode + ErrProtocolVersionMismatch + ErrNetworkIdMismatch + ErrGenesisBlockMismatch + ErrNoStatusMsg + ErrExtraStatusMsg + ErrInvalidBlock +) + +var errorToString = map[int]string{ + ErrMsgTooLarge: "Message too long", + ErrDecode: "Invalid message", + ErrInvalidMsgCode: "Invalid message code", + ErrProtocolVersionMismatch: "Protocol version mismatch", + ErrNetworkIdMismatch: "NetworkId mismatch", + ErrGenesisBlockMismatch: "Genesis block mismatch", + ErrNoStatusMsg: "No status message", + ErrExtraStatusMsg: "Extra status message", + ErrInvalidBlock: "Invalid block", +} + +type protocolError struct { + Code int + fatal bool + message string + format string + params []interface{} + // size int +} + +func newProtocolError(code int, format string, params ...interface{}) *protocolError { + return &protocolError{Code: code, format: format, params: params} +} + +func ProtocolError(code int, format string, params ...interface{}) (err *protocolError) { + err = newProtocolError(code, format, params...) + // report(err) + if err.Fatal() { + logger.Errorln(err) + } else { + logger.Debugln(err) + } + return +} + +func (self protocolError) Error() (message string) { + message = self.message + if message == "" { + message, ok := errorToString[self.Code] + if !ok { + panic("invalid error code") + } + if self.format != "" { + message += ": " + fmt.Sprintf(self.format, self.params...) + } + self.message = message + } + return +} + +func (self *protocolError) Fatal() bool { + return self.fatal +} diff --git a/eth/protocol.go b/eth/protocol.go new file mode 100644 index 000000000..992fc7550 --- /dev/null +++ b/eth/protocol.go @@ -0,0 +1,294 @@ +package eth + +import ( + "bytes" + "math" + "math/big" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethutil" + ethlogger "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/p2p" +) + +var logger = ethlogger.NewLogger("SERV") + +// ethProtocol represents the ethereum wire protocol +// instance is running on each peer +type ethProtocol struct { + eth backend + td *big.Int + peer *p2p.Peer + rw p2p.MsgReadWriter +} + +// backend is the interface the ethereum protocol backend should implement +// used as an argument to EthProtocol +type backend interface { + GetTransactions() (txs []*types.Transaction) + AddTransactions(txs []*types.Transaction) + GetBlockHashes(hash []byte, amount uint32) (hashes [][]byte) + AddHash(hash []byte, peer *p2p.Peer) (more bool) + GetBlock(hash []byte) (block *types.Block) + AddBlock(td *big.Int, block *types.Block, peer *p2p.Peer) (fetchHashes bool, err error) + AddPeer(td *big.Int, currentBlock []byte, peer *p2p.Peer) (fetchHashes bool) + Status() (td *big.Int, currentBlock []byte, genesisBlock []byte) +} + +const ( + ProtocolVersion = 43 + // 0x00 // PoC-1 + // 0x01 // PoC-2 + // 0x07 // PoC-3 + // 0x09 // PoC-4 + // 0x17 // PoC-5 + // 0x1c // PoC-6 + NetworkId = 0 + ProtocolLength = uint64(8) + ProtocolMaxMsgSize = 10 * 1024 * 1024 + + blockHashesBatchSize = 256 +) + +// eth protocol message codes +const ( + StatusMsg = iota + GetTxMsg // unused + TxMsg + GetBlockHashesMsg + BlockHashesMsg + GetBlocksMsg + BlocksMsg + NewBlockMsg +) + +// message structs used for rlp decoding +type newBlockMsgData struct { + Block *types.Block + TD *big.Int +} + +type getBlockHashesMsgData struct { + Hash []byte + Amount uint32 +} + +// main entrypoint, wrappers starting a server running the eth protocol +// use this constructor to attach the protocol (class) to server caps +func EthProtocol(eth backend) *p2p.Protocol { + return &p2p.Protocol{ + Name: "eth", + Version: ProtocolVersion, + Length: ProtocolLength, + Run: func(peer *p2p.Peer, rw p2p.MsgReadWriter) error { + return runEthProtocol(eth, peer, rw) + }, + } +} + +func runEthProtocol(eth backend, peer *p2p.Peer, rw p2p.MsgReadWriter) (err error) { + self := ðProtocol{ + eth: eth, + rw: rw, + peer: peer, + } + err = self.handleStatus() + if err == nil { + go func() { + for { + err = self.handle() + if err != nil { + break + } + } + }() + } + return +} + +func (self *ethProtocol) handle() error { + msg, err := self.rw.ReadMsg() + if err != nil { + return err + } + if msg.Size > ProtocolMaxMsgSize { + return ProtocolError(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize) + } + // make sure that the payload has been fully consumed + defer msg.Discard() + + switch msg.Code { + + case StatusMsg: + return ProtocolError(ErrExtraStatusMsg, "") + + case GetTxMsg: + txs := self.eth.GetTransactions() + // TODO: rewrite using rlp flat + txsInterface := make([]interface{}, len(txs)) + for i, tx := range txs { + txsInterface[i] = tx.RlpData() + } + return self.rw.EncodeMsg(TxMsg, txsInterface...) + + case TxMsg: + var txs []*types.Transaction + if err := msg.Decode(&txs); err != nil { + return ProtocolError(ErrDecode, "%v", err) + } + self.eth.AddTransactions(txs) + + case GetBlockHashesMsg: + var request getBlockHashesMsgData + if err := msg.Decode(&request); err != nil { + return ProtocolError(ErrDecode, "%v", err) + } + hashes := self.eth.GetBlockHashes(request.Hash, request.Amount) + return self.rw.EncodeMsg(BlockHashesMsg, ethutil.ByteSliceToInterface(hashes)...) + + case BlockHashesMsg: + // TODO: redo using lazy decode , this way very inefficient on known chains + // s := rlp.NewListStream(msg.Payload, uint64(msg.Size)) + var blockHashes [][]byte + if err := msg.Decode(&blockHashes); err != nil { + return ProtocolError(ErrDecode, "%v", err) + } + fetchMore := true + for _, hash := range blockHashes { + fetchMore = self.eth.AddHash(hash, self.peer) + if !fetchMore { + break + } + } + if fetchMore { + return self.FetchHashes(blockHashes[len(blockHashes)-1]) + } + + case GetBlocksMsg: + // Limit to max 300 blocks + var blockHashes [][]byte + if err := msg.Decode(&blockHashes); err != nil { + return ProtocolError(ErrDecode, "%v", err) + } + max := int(math.Min(float64(len(blockHashes)), 300.0)) + var blocks []interface{} + for i, hash := range blockHashes { + if i >= max { + break + } + block := self.eth.GetBlock(hash) + if block != nil { + blocks = append(blocks, block.Value().Raw()) + } + } + return self.rw.EncodeMsg(BlocksMsg, blocks...) + + case BlocksMsg: + var blocks []*types.Block + if err := msg.Decode(&blocks); err != nil { + return ProtocolError(ErrDecode, "%v", err) + } + for _, block := range blocks { + fetchHashes, err := self.eth.AddBlock(nil, block, self.peer) + if err != nil { + return ProtocolError(ErrInvalidBlock, "%v", err) + } + if fetchHashes { + if err := self.FetchHashes(block.Hash()); err != nil { + return err + } + } + } + + case NewBlockMsg: + var request newBlockMsgData + if err := msg.Decode(&request); err != nil { + return ProtocolError(ErrDecode, "%v", err) + } + var fetchHashes bool + // this should reset td and offer blockpool as candidate new peer? + if fetchHashes, err = self.eth.AddBlock(request.TD, request.Block, self.peer); err != nil { + return ProtocolError(ErrInvalidBlock, "%v", err) + } + if fetchHashes { + return self.FetchHashes(request.Block.Hash()) + } + + default: + return ProtocolError(ErrInvalidMsgCode, "%v", msg.Code) + } + return nil +} + +type statusMsgData struct { + ProtocolVersion uint + NetworkId uint + TD *big.Int + CurrentBlock []byte + GenesisBlock []byte +} + +func (self *ethProtocol) statusMsg() p2p.Msg { + td, currentBlock, genesisBlock := self.eth.Status() + + return p2p.NewMsg(StatusMsg, + uint32(ProtocolVersion), + uint32(NetworkId), + td, + currentBlock, + genesisBlock, + ) +} + +func (self *ethProtocol) handleStatus() error { + // send precanned status message + if err := self.rw.WriteMsg(self.statusMsg()); err != nil { + return err + } + + // read and handle remote status + msg, err := self.rw.ReadMsg() + if err != nil { + return err + } + + if msg.Code != StatusMsg { + return ProtocolError(ErrNoStatusMsg, "first msg has code %x (!= %x)", msg.Code, StatusMsg) + } + + if msg.Size > ProtocolMaxMsgSize { + return ProtocolError(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize) + } + + var status statusMsgData + if err := msg.Decode(&status); err != nil { + return ProtocolError(ErrDecode, "%v", err) + } + + _, _, genesisBlock := self.eth.Status() + + if bytes.Compare(status.GenesisBlock, genesisBlock) != 0 { + return ProtocolError(ErrGenesisBlockMismatch, "%x (!= %x)", status.GenesisBlock, genesisBlock) + } + + if status.NetworkId != NetworkId { + return ProtocolError(ErrNetworkIdMismatch, "%d (!= %d)", status.NetworkId, NetworkId) + } + + if ProtocolVersion != status.ProtocolVersion { + return ProtocolError(ErrProtocolVersionMismatch, "%d (!= %d)", status.ProtocolVersion, ProtocolVersion) + } + + logger.Infof("Peer is [eth] capable (%d/%d). TD = %v ~ %x", status.ProtocolVersion, status.NetworkId, status.CurrentBlock) + + if self.eth.AddPeer(status.TD, status.CurrentBlock, self.peer) { + return self.FetchHashes(status.CurrentBlock) + } + + return nil +} + +func (self *ethProtocol) FetchHashes(from []byte) error { + logger.Debugf("Fetching hashes (%d) %x...\n", blockHashesBatchSize, from[0:4]) + return self.rw.EncodeMsg(GetBlockHashesMsg, from, blockHashesBatchSize) +} diff --git a/eth/protocol_test.go b/eth/protocol_test.go new file mode 100644 index 000000000..757e87fa4 --- /dev/null +++ b/eth/protocol_test.go @@ -0,0 +1,133 @@ +package eth + +import ( + "io" + "math/big" + "testing" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/p2p" +) + +type testMsgReadWriter struct { + in chan p2p.Msg + out chan p2p.Msg +} + +func (self *testMsgReadWriter) In(msg p2p.Msg) { + self.in <- msg +} + +func (self *testMsgReadWriter) Out(msg p2p.Msg) { + self.in <- msg +} + +func (self *testMsgReadWriter) WriteMsg(msg p2p.Msg) error { + self.out <- msg + return nil +} + +func (self *testMsgReadWriter) EncodeMsg(code uint64, data ...interface{}) error { + return self.WriteMsg(p2p.NewMsg(code, data)) +} + +func (self *testMsgReadWriter) ReadMsg() (p2p.Msg, error) { + msg, ok := <-self.in + if !ok { + return msg, io.EOF + } + return msg, nil +} + +func errorCheck(t *testing.T, expCode int, err error) { + perr, ok := err.(*protocolError) + if ok && perr != nil { + if code := perr.Code; code != expCode { + ok = false + } + } + if !ok { + t.Errorf("expected error code %v, got %v", ErrNoStatusMsg, err) + } +} + +type TestBackend struct { + getTransactions func() []*types.Transaction + addTransactions func(txs []*types.Transaction) + getBlockHashes func(hash []byte, amount uint32) (hashes [][]byte) + addHash func(hash []byte, peer *p2p.Peer) (more bool) + getBlock func(hash []byte) *types.Block + addBlock func(td *big.Int, block *types.Block, peer *p2p.Peer) (fetchHashes bool, err error) + addPeer func(td *big.Int, currentBlock []byte, peer *p2p.Peer) (fetchHashes bool) + status func() (td *big.Int, currentBlock []byte, genesisBlock []byte) +} + +func (self *TestBackend) GetTransactions() (txs []*types.Transaction) { + if self.getTransactions != nil { + txs = self.getTransactions() + } + return +} + +func (self *TestBackend) AddTransactions(txs []*types.Transaction) { + if self.addTransactions != nil { + self.addTransactions(txs) + } +} + +func (self *TestBackend) GetBlockHashes(hash []byte, amount uint32) (hashes [][]byte) { + if self.getBlockHashes != nil { + hashes = self.getBlockHashes(hash, amount) + } + return +} + +func (self *TestBackend) AddHash(hash []byte, peer *p2p.Peer) (more bool) { + if self.addHash != nil { + more = self.addHash(hash, peer) + } + return +} +func (self *TestBackend) GetBlock(hash []byte) (block *types.Block) { + if self.getBlock != nil { + block = self.getBlock(hash) + } + return +} + +func (self *TestBackend) AddBlock(td *big.Int, block *types.Block, peer *p2p.Peer) (fetchHashes bool, err error) { + if self.addBlock != nil { + fetchHashes, err = self.addBlock(td, block, peer) + } + return +} + +func (self *TestBackend) AddPeer(td *big.Int, currentBlock []byte, peer *p2p.Peer) (fetchHashes bool) { + if self.addPeer != nil { + fetchHashes = self.addPeer(td, currentBlock, peer) + } + return +} + +func (self *TestBackend) Status() (td *big.Int, currentBlock []byte, genesisBlock []byte) { + if self.status != nil { + td, currentBlock, genesisBlock = self.status() + } + return +} + +func TestEth(t *testing.T) { + quit := make(chan bool) + rw := &testMsgReadWriter{make(chan p2p.Msg, 10), make(chan p2p.Msg, 10)} + testBackend := &TestBackend{} + var err error + go func() { + err = runEthProtocol(testBackend, nil, rw) + close(quit) + }() + statusMsg := p2p.NewMsg(4) + rw.In(statusMsg) + <-quit + errorCheck(t, ErrNoStatusMsg, err) + // read(t, remote, []byte("hello, world"), nil) +}