diff --git a/eth/protocol.go b/eth/protocol.go index 992fc7550..380bcc8d2 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -7,18 +7,16 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethutil" - ethlogger "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/rlp" ) -var logger = ethlogger.NewLogger("SERV") - // ethProtocol represents the ethereum wire protocol // instance is running on each peer type ethProtocol struct { eth backend - td *big.Int peer *p2p.Peer + id string rw p2p.MsgReadWriter } @@ -26,28 +24,21 @@ type ethProtocol struct { // used as an argument to EthProtocol type backend interface { GetTransactions() (txs []*types.Transaction) - AddTransactions(txs []*types.Transaction) + AddTransactions([]*types.Transaction) GetBlockHashes(hash []byte, amount uint32) (hashes [][]byte) - AddHash(hash []byte, peer *p2p.Peer) (more bool) + AddBlockHashes(next func() ([]byte, bool), peerId string) GetBlock(hash []byte) (block *types.Block) - AddBlock(td *big.Int, block *types.Block, peer *p2p.Peer) (fetchHashes bool, err error) - AddPeer(td *big.Int, currentBlock []byte, peer *p2p.Peer) (fetchHashes bool) + AddBlock(block *types.Block, peerId string) (err error) + AddPeer(td *big.Int, currentBlock []byte, peerId string, requestHashes func([]byte) error, requestBlocks func([][]byte) error, invalidBlock func(error)) (best bool) + RemovePeer(peerId string) Status() (td *big.Int, currentBlock []byte, genesisBlock []byte) } const ( - ProtocolVersion = 43 - // 0x00 // PoC-1 - // 0x01 // PoC-2 - // 0x07 // PoC-3 - // 0x09 // PoC-4 - // 0x17 // PoC-5 - // 0x1c // PoC-6 + ProtocolVersion = 43 NetworkId = 0 ProtocolLength = uint64(8) ProtocolMaxMsgSize = 10 * 1024 * 1024 - - blockHashesBatchSize = 256 ) // eth protocol message codes @@ -74,7 +65,8 @@ type getBlockHashesMsgData struct { } // main entrypoint, wrappers starting a server running the eth protocol -// use this constructor to attach the protocol (class) to server caps +// use this constructor to attach the protocol ("class") to server caps +// the Dev p2p layer then runs the protocol instance on each peer func EthProtocol(eth backend) *p2p.Protocol { return &p2p.Protocol{ Name: "eth", @@ -86,11 +78,14 @@ func EthProtocol(eth backend) *p2p.Protocol { } } +// the main loop that handles incoming messages +// note RemovePeer in the post-disconnect hook func runEthProtocol(eth backend, peer *p2p.Peer, rw p2p.MsgReadWriter) (err error) { self := ðProtocol{ eth: eth, rw: rw, peer: peer, + id: (string)(peer.Identity().Pubkey()), } err = self.handleStatus() if err == nil { @@ -98,6 +93,7 @@ func runEthProtocol(eth backend, peer *p2p.Peer, rw p2p.MsgReadWriter) (err erro for { err = self.handle() if err != nil { + self.eth.RemovePeer(self.id) break } } @@ -132,6 +128,7 @@ func (self *ethProtocol) handle() error { return self.rw.EncodeMsg(TxMsg, txsInterface...) case TxMsg: + // TODO: rework using lazy RLP stream var txs []*types.Transaction if err := msg.Decode(&txs); err != nil { return ProtocolError(ErrDecode, "%v", err) @@ -148,29 +145,26 @@ func (self *ethProtocol) handle() error { case BlockHashesMsg: // TODO: redo using lazy decode , this way very inefficient on known chains - // s := rlp.NewListStream(msg.Payload, uint64(msg.Size)) - var blockHashes [][]byte - if err := msg.Decode(&blockHashes); err != nil { - return ProtocolError(ErrDecode, "%v", err) - } - fetchMore := true - for _, hash := range blockHashes { - fetchMore = self.eth.AddHash(hash, self.peer) - if !fetchMore { - break + msgStream := rlp.NewListStream(msg.Payload, uint64(msg.Size)) + var err error + iter := func() (hash []byte, ok bool) { + hash, err = msgStream.Bytes() + if err == nil { + ok = true } + return } - if fetchMore { - return self.FetchHashes(blockHashes[len(blockHashes)-1]) + self.eth.AddBlockHashes(iter, self.id) + if err != nil && err != rlp.EOL { + return ProtocolError(ErrDecode, "%v", err) } case GetBlocksMsg: - // Limit to max 300 blocks var blockHashes [][]byte if err := msg.Decode(&blockHashes); err != nil { return ProtocolError(ErrDecode, "%v", err) } - max := int(math.Min(float64(len(blockHashes)), 300.0)) + max := int(math.Min(float64(len(blockHashes)), blockHashesBatchSize)) var blocks []interface{} for i, hash := range blockHashes { if i >= max { @@ -184,20 +178,19 @@ func (self *ethProtocol) handle() error { return self.rw.EncodeMsg(BlocksMsg, blocks...) case BlocksMsg: - var blocks []*types.Block - if err := msg.Decode(&blocks); err != nil { - return ProtocolError(ErrDecode, "%v", err) - } - for _, block := range blocks { - fetchHashes, err := self.eth.AddBlock(nil, block, self.peer) - if err != nil { - return ProtocolError(ErrInvalidBlock, "%v", err) - } - if fetchHashes { - if err := self.FetchHashes(block.Hash()); err != nil { - return err + msgStream := rlp.NewListStream(msg.Payload, uint64(msg.Size)) + for { + var block *types.Block + if err := msgStream.Decode(&block); err != nil { + if err == rlp.EOL { + break + } else { + return ProtocolError(ErrDecode, "%v", err) } } + if err := self.eth.AddBlock(block, self.id); err != nil { + return ProtocolError(ErrInvalidBlock, "%v", err) + } } case NewBlockMsg: @@ -205,13 +198,24 @@ func (self *ethProtocol) handle() error { if err := msg.Decode(&request); err != nil { return ProtocolError(ErrDecode, "%v", err) } - var fetchHashes bool - // this should reset td and offer blockpool as candidate new peer? - if fetchHashes, err = self.eth.AddBlock(request.TD, request.Block, self.peer); err != nil { - return ProtocolError(ErrInvalidBlock, "%v", err) - } - if fetchHashes { - return self.FetchHashes(request.Block.Hash()) + hash := request.Block.Hash() + // to simplify backend interface adding a new block + // uses AddPeer followed by AddHashes, AddBlock only if peer is the best peer + // (or selected as new best peer) + if self.eth.AddPeer(request.TD, hash, self.id, self.requestBlockHashes, self.requestBlocks, self.invalidBlock) { + called := true + iter := func() (hash []byte, ok bool) { + if called { + called = false + return hash, true + } else { + return + } + } + self.eth.AddBlockHashes(iter, self.id) + if err := self.eth.AddBlock(request.Block, self.id); err != nil { + return ProtocolError(ErrInvalidBlock, "%v", err) + } } default: @@ -279,16 +283,34 @@ func (self *ethProtocol) handleStatus() error { return ProtocolError(ErrProtocolVersionMismatch, "%d (!= %d)", status.ProtocolVersion, ProtocolVersion) } - logger.Infof("Peer is [eth] capable (%d/%d). TD = %v ~ %x", status.ProtocolVersion, status.NetworkId, status.CurrentBlock) + self.peer.Infof("Peer is [eth] capable (%d/%d). TD = %v ~ %x", status.ProtocolVersion, status.NetworkId, status.CurrentBlock) - if self.eth.AddPeer(status.TD, status.CurrentBlock, self.peer) { - return self.FetchHashes(status.CurrentBlock) - } + self.eth.AddPeer(status.TD, status.CurrentBlock, self.id, self.requestBlockHashes, self.requestBlocks, self.invalidBlock) return nil } -func (self *ethProtocol) FetchHashes(from []byte) error { - logger.Debugf("Fetching hashes (%d) %x...\n", blockHashesBatchSize, from[0:4]) +func (self *ethProtocol) requestBlockHashes(from []byte) error { + self.peer.Debugf("fetching hashes (%d) %x...\n", blockHashesBatchSize, from[0:4]) return self.rw.EncodeMsg(GetBlockHashesMsg, from, blockHashesBatchSize) } + +func (self *ethProtocol) requestBlocks(hashes [][]byte) error { + self.peer.Debugf("fetching %v blocks", len(hashes)) + return self.rw.EncodeMsg(GetBlocksMsg, ethutil.ByteSliceToInterface(hashes)) +} + +func (self *ethProtocol) invalidBlock(err error) { + ProtocolError(ErrInvalidBlock, "%v", err) + self.peer.Disconnect(p2p.DiscSubprotocolError) +} + +func (self *ethProtocol) protoError(code int, format string, params ...interface{}) (err *protocolError) { + err = ProtocolError(code, format, params...) + if err.Fatal() { + self.peer.Errorln(err) + } else { + self.peer.Debugln(err) + } + return +} diff --git a/eth/protocol_test.go b/eth/protocol_test.go index 757e87fa4..a166ea6cd 100644 --- a/eth/protocol_test.go +++ b/eth/protocol_test.go @@ -6,6 +6,7 @@ import ( "testing" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/p2p" ) @@ -55,10 +56,11 @@ type TestBackend struct { getTransactions func() []*types.Transaction addTransactions func(txs []*types.Transaction) getBlockHashes func(hash []byte, amount uint32) (hashes [][]byte) - addHash func(hash []byte, peer *p2p.Peer) (more bool) + addBlockHashes func(next func() ([]byte, bool), peerId string) getBlock func(hash []byte) *types.Block - addBlock func(td *big.Int, block *types.Block, peer *p2p.Peer) (fetchHashes bool, err error) - addPeer func(td *big.Int, currentBlock []byte, peer *p2p.Peer) (fetchHashes bool) + addBlock func(block *types.Block, peerId string) (err error) + addPeer func(td *big.Int, currentBlock []byte, peerId string, requestHashes func([]byte) error, requestBlocks func([][]byte) error, invalidBlock func(error)) (best bool) + removePeer func(peerId string) status func() (td *big.Int, currentBlock []byte, genesisBlock []byte) } @@ -82,12 +84,12 @@ func (self *TestBackend) GetBlockHashes(hash []byte, amount uint32) (hashes [][] return } -func (self *TestBackend) AddHash(hash []byte, peer *p2p.Peer) (more bool) { - if self.addHash != nil { - more = self.addHash(hash, peer) +func (self *TestBackend) AddBlockHashes(next func() ([]byte, bool), peerId string) { + if self.addBlockHashes != nil { + self.addBlockHashes(next, peerId) } - return } + func (self *TestBackend) GetBlock(hash []byte) (block *types.Block) { if self.getBlock != nil { block = self.getBlock(hash) @@ -95,20 +97,26 @@ func (self *TestBackend) GetBlock(hash []byte) (block *types.Block) { return } -func (self *TestBackend) AddBlock(td *big.Int, block *types.Block, peer *p2p.Peer) (fetchHashes bool, err error) { +func (self *TestBackend) AddBlock(block *types.Block, peerId string) (err error) { if self.addBlock != nil { - fetchHashes, err = self.addBlock(td, block, peer) + err = self.addBlock(block, peerId) } return } -func (self *TestBackend) AddPeer(td *big.Int, currentBlock []byte, peer *p2p.Peer) (fetchHashes bool) { +func (self *TestBackend) AddPeer(td *big.Int, currentBlock []byte, peerId string, requestBlockHashes func([]byte) error, requestBlocks func([][]byte) error, invalidBlock func(error)) (best bool) { if self.addPeer != nil { - fetchHashes = self.addPeer(td, currentBlock, peer) + best = self.addPeer(td, currentBlock, peerId, requestBlockHashes, requestBlocks, invalidBlock) } return } +func (self *TestBackend) RemovePeer(peerId string) { + if self.removePeer != nil { + self.removePeer(peerId) + } +} + func (self *TestBackend) Status() (td *big.Int, currentBlock []byte, genesisBlock []byte) { if self.status != nil { td, currentBlock, genesisBlock = self.status() @@ -116,13 +124,35 @@ func (self *TestBackend) Status() (td *big.Int, currentBlock []byte, genesisBloc return } -func TestEth(t *testing.T) { +// TODO: refactor this into p2p/client_identity +type peerId struct { + pubkey []byte +} + +func (self *peerId) String() string { + return "test peer" +} + +func (self *peerId) Pubkey() (pubkey []byte) { + pubkey = self.pubkey + if len(pubkey) == 0 { + pubkey = crypto.GenerateNewKeyPair().PublicKey + self.pubkey = pubkey + } + return +} + +func testPeer() *p2p.Peer { + return p2p.NewPeer(&peerId{}, []p2p.Cap{}) +} + +func TestErrNoStatusMsg(t *testing.T) { quit := make(chan bool) rw := &testMsgReadWriter{make(chan p2p.Msg, 10), make(chan p2p.Msg, 10)} testBackend := &TestBackend{} var err error go func() { - err = runEthProtocol(testBackend, nil, rw) + err = runEthProtocol(testBackend, testPeer(), rw) close(quit) }() statusMsg := p2p.NewMsg(4)