New p2p protocol. NOTE: Needs major refactoring. See #50

This commit is contained in:
obscuren 2014-09-10 11:22:19 +02:00
parent 2949990016
commit 2f362509b8
2 changed files with 103 additions and 56 deletions

View File

@ -31,20 +31,16 @@ const (
MsgDiscTy = 0x01 MsgDiscTy = 0x01
MsgPingTy = 0x02 MsgPingTy = 0x02
MsgPongTy = 0x03 MsgPongTy = 0x03
MsgGetPeersTy = 0x10 MsgGetPeersTy = 0x04
MsgPeersTy = 0x11 MsgPeersTy = 0x05
MsgStatusTy = 0x10
MsgGetTxsTy = 0x11
MsgTxTy = 0x12 MsgTxTy = 0x12
MsgGetChainTy = 0x14 MsgGetBlockHashesTy = 0x13
MsgNotInChainTy = 0x15 MsgBlockHashesTy = 0x14
MsgGetTxsTy = 0x16 MsgGetBlocksTy = 0x15
MsgGetBlockHashesTy = 0x17 MsgBlockTy = 0x16
MsgBlockHashesTy = 0x18
MsgGetBlocksTy = 0x19
MsgBlockTy = 0x13
MsgOldBlockTy = 0xbb
MsgTalkTy = 0xff
) )
var msgTypeToString = map[MsgType]string{ var msgTypeToString = map[MsgType]string{
@ -56,9 +52,7 @@ var msgTypeToString = map[MsgType]string{
MsgPeersTy: "Peers", MsgPeersTy: "Peers",
MsgTxTy: "Transactions", MsgTxTy: "Transactions",
MsgBlockTy: "Blocks", MsgBlockTy: "Blocks",
MsgGetChainTy: "Get chain",
MsgGetTxsTy: "Get Txs", MsgGetTxsTy: "Get Txs",
MsgNotInChainTy: "Not in chain",
MsgGetBlockHashesTy: "Get block hashes", MsgGetBlockHashesTy: "Get block hashes",
MsgBlockHashesTy: "Block hashes", MsgBlockHashesTy: "Block hashes",
MsgGetBlocksTy: "Get blocks", MsgGetBlocksTy: "Get blocks",

125
peer.go
View File

@ -25,6 +25,8 @@ const (
outputBufferSize = 50 outputBufferSize = 50
// Current protocol version // Current protocol version
ProtocolVersion = 28 ProtocolVersion = 28
// Current P2P version
P2PVersion = 0
// Interval for ping/pong message // Interval for ping/pong message
pingPongTimer = 2 * time.Second pingPongTimer = 2 * time.Second
) )
@ -122,6 +124,7 @@ type Peer struct {
// This flag is used by writeMessage to check if messages are allowed // This flag is used by writeMessage to check if messages are allowed
// to be send or not. If no version is known all messages are ignored. // to be send or not. If no version is known all messages are ignored.
versionKnown bool versionKnown bool
statusKnown bool
// Last received pong message // Last received pong message
lastPong int64 lastPong int64
@ -271,6 +274,14 @@ func (p *Peer) writeMessage(msg *ethwire.Msg) {
default: // Anything but ack is allowed default: // Anything but ack is allowed
return return
} }
} else {
if !p.statusKnown {
switch msg.Type {
case ethwire.MsgStatusTy: // Ok
default: // Anything but ack is allowed
return
}
}
} }
peerlogger.DebugDetailf("(%v) <= %v %v\n", p.conn.RemoteAddr(), msg.Type, msg.Data) peerlogger.DebugDetailf("(%v) <= %v %v\n", p.conn.RemoteAddr(), msg.Type, msg.Data)
@ -356,9 +367,9 @@ func (p *Peer) HandleInbound() {
// Version message // Version message
p.handleHandshake(msg) p.handleHandshake(msg)
if p.caps.IsCap(CapPeerDiscTy) { //if p.caps.IsCap(CapPeerDiscTy) {
p.QueueMessage(ethwire.NewMessage(ethwire.MsgGetPeersTy, "")) p.QueueMessage(ethwire.NewMessage(ethwire.MsgGetPeersTy, ""))
} //}
case ethwire.MsgDiscTy: case ethwire.MsgDiscTy:
p.Stop() p.Stop()
@ -396,6 +407,10 @@ func (p *Peer) HandleInbound() {
// Connect to the list of peers // Connect to the list of peers
p.ethereum.ProcessPeerList(peers) p.ethereum.ProcessPeerList(peers)
case ethwire.MsgStatusTy:
// Handle peer's status msg
p.handleStatus(msg)
case ethwire.MsgGetTxsTy: case ethwire.MsgGetTxsTy:
// Get the current transactions of the pool // Get the current transactions of the pool
txs := p.ethereum.TxPool().CurrentTransactions() txs := p.ethereum.TxPool().CurrentTransactions()
@ -581,6 +596,7 @@ func (p *Peer) Stop() {
p.ethereum.RemovePeer(p) p.ethereum.RemovePeer(p)
} }
/*
func (p *Peer) pushHandshake() error { func (p *Peer) pushHandshake() error {
pubkey := p.ethereum.KeyManager().PublicKey() pubkey := p.ethereum.KeyManager().PublicKey()
msg := ethwire.NewMessage(ethwire.MsgHandshakeTy, []interface{}{ msg := ethwire.NewMessage(ethwire.MsgHandshakeTy, []interface{}{
@ -592,6 +608,7 @@ func (p *Peer) pushHandshake() error {
return nil return nil
} }
*/
func (p *Peer) peersMessage() *ethwire.Msg { func (p *Peer) peersMessage() *ethwire.Msg {
outPeers := make([]interface{}, len(p.ethereum.InOutPeers())) outPeers := make([]interface{}, len(p.ethereum.InOutPeers()))
@ -612,13 +629,72 @@ func (p *Peer) pushPeers() {
p.QueueMessage(p.peersMessage()) p.QueueMessage(p.peersMessage())
} }
func (p *Peer) pushHandshake() error {
pubkey := p.ethereum.KeyManager().PublicKey()
msg := ethwire.NewMessage(ethwire.MsgHandshakeTy, []interface{}{
uint32(0), []byte(p.version), []string{"eth"}, p.port, pubkey[1:],
})
p.QueueMessage(msg)
return nil
}
func (self *Peer) pushStatus() {
const netVersion = 0
msg := ethwire.NewMessage(ethwire.MsgStatusTy, []interface{}{
uint32(ProtocolVersion),
netVersion,
self.ethereum.BlockChain().TD.Uint64(),
self.ethereum.BlockChain().CurrentBlock.Hash(),
self.ethereum.BlockChain().Genesis().Hash(),
})
self.QueueMessage(msg)
}
func (self *Peer) handleStatus(msg *ethwire.Msg) {
c := msg.Data
// Set the peer's caps
//p.caps = Caps(c.Get(3).Byte())
// Get the td and last hash
self.td = c.Get(6).BigInt()
self.bestHash = c.Get(7).Bytes()
self.lastReceivedHash = self.bestHash
// Compare the total TD with the blockchain TD. If remote is higher
// fetch hashes from highest TD node.
if self.td.Cmp(self.ethereum.BlockChain().TD) > 0 {
self.ethereum.blockPool.AddHash(self.lastReceivedHash)
self.FetchHashes()
}
ethlogger.Infof("Peer is [ETH] capable. (TD = %v ~ %x", self.td, self.bestHash)
}
func (p *Peer) handleHandshake(msg *ethwire.Msg) { func (p *Peer) handleHandshake(msg *ethwire.Msg) {
c := msg.Data c := msg.Data
// Set pubkey var (
p.pubkey = c.Get(5).Bytes() p2pVersion = c.Get(0).Uint()
clientId = c.Get(1).Str()
caps = c.Get(2).Raw()
port = c.Get(3).Uint()
pub = c.Get(4).Bytes()
)
if p.pubkey == nil { fmt.Println("PEER CAPS", caps)
// Check correctness of p2p protocol version
if p2pVersion != P2PVersion {
peerlogger.Debugf("Invalid P2P version. Require protocol %d, received %d\n", P2PVersion, p2pVersion)
p.Stop()
return
}
// Handle the pub key (validation, uniqueness)
if pub == nil || len(pub) == 0 {
peerlogger.Warnln("Pubkey required, not supplied in handshake.") peerlogger.Warnln("Pubkey required, not supplied in handshake.")
p.Stop() p.Stop()
return return
@ -627,7 +703,7 @@ func (p *Peer) handleHandshake(msg *ethwire.Msg) {
usedPub := 0 usedPub := 0
// This peer is already added to the peerlist so we expect to find a double pubkey at least once // This peer is already added to the peerlist so we expect to find a double pubkey at least once
eachPeer(p.ethereum.Peers(), func(peer *Peer, e *list.Element) { eachPeer(p.ethereum.Peers(), func(peer *Peer, e *list.Element) {
if bytes.Compare(p.pubkey, peer.pubkey) == 0 { if bytes.Compare(pub, peer.pubkey) == 0 {
usedPub++ usedPub++
} }
}) })
@ -637,18 +713,11 @@ func (p *Peer) handleHandshake(msg *ethwire.Msg) {
p.Stop() p.Stop()
return return
} }
p.pubkey = pub
if c.Get(0).Uint() != ProtocolVersion {
peerlogger.Debugf("Invalid peer version. Require protocol: %d. Received: %d\n", ProtocolVersion, c.Get(0).Uint())
p.Stop()
return
}
p.versionKnown = true
// If this is an inbound connection send an ack back // If this is an inbound connection send an ack back
if p.inbound { if p.inbound {
p.port = uint16(c.Get(4).Uint()) p.port = uint16(port)
// Self connect detection // Self connect detection
pubkey := p.ethereum.KeyManager().PublicKey() pubkey := p.ethereum.KeyManager().PublicKey()
@ -659,34 +728,18 @@ func (p *Peer) handleHandshake(msg *ethwire.Msg) {
} }
} }
p.SetVersion(clientId)
// Set the peer's caps p.versionKnown = true
p.caps = Caps(c.Get(3).Byte())
// Get a reference to the peers version
versionString := c.Get(2).Str()
if len(versionString) > 0 {
p.SetVersion(c.Get(2).Str())
}
// Get the td and last hash
p.td = c.Get(6).BigInt()
p.bestHash = c.Get(7).Bytes()
p.lastReceivedHash = p.bestHash
p.ethereum.PushPeer(p) p.ethereum.PushPeer(p)
p.ethereum.reactor.Post("peerList", p.ethereum.Peers()) p.ethereum.reactor.Post("peerList", p.ethereum.Peers())
ethlogger.Infof("Added peer (%s) %d / %d (TD = %v ~ %x)\n", p.conn.RemoteAddr(), p.ethereum.Peers().Len(), p.ethereum.MaxPeers, p.td, p.bestHash) ethlogger.Infof("Added peer (%s) %d / %d \n", p.conn.RemoteAddr(), p.ethereum.Peers().Len(), p.ethereum.MaxPeers)
// Compare the total TD with the blockchain TD. If remote is higher
// fetch hashes from highest TD node.
if p.td.Cmp(p.ethereum.BlockChain().TD) > 0 {
p.ethereum.blockPool.AddHash(p.lastReceivedHash)
p.FetchHashes()
}
peerlogger.Debugln(p) peerlogger.Debugln(p)
p.pushStatus()
} }
func (p *Peer) String() string { func (p *Peer) String() string {