POC-7: networking

+ Status msg fix
+ NewBlock msg support - after the sync process (GET_BLOCK_HASHES & GET_BLOCKS) the new block received by NEW_BLOCK announcement
This commit is contained in:
romanman 2014-10-17 23:12:11 -05:00
parent e0e99baa29
commit 12d014272e
13 changed files with 287 additions and 116 deletions

View File

@ -216,7 +216,7 @@ public class BlockchainImpl implements Blockchain {
this.setLastBlock(block);
if (logger.isDebugEnabled())
logger.debug("block added {}", block.toFlatString());
logger.debug("block added to the blockChain {}", block.toFlatString());
if (block.getNumber() % 100 == 0)
logger.info("*** Last block added [ #{} ]", block.getNumber());
}

View File

@ -56,6 +56,8 @@ public class BlockQueue {
logger.debug("BlockQueue size: {}", blockReceivedQueue.size());
Block block = blockReceivedQueue.poll();
logger.info("Processing block index: {}", block.getNumber());
WorldManager.getInstance().getBlockchain().add(block);
}
@ -72,22 +74,38 @@ public class BlockQueue {
public void addBlocks(List<Block> blockList) {
Block lastReceivedBlock = blockList.get(0);
if (lastReceivedBlock.getNumber() != getLastBlock().getNumber() + 1)
return;
if (lastReceivedBlock.getNumber() != getLastBlock().getNumber() + 1){
logger.error("Block download out of sync");
return;
}
for (Block block : blockList) {
if (blockReceivedQueue.size() >= SystemProperties.CONFIG.maxBlocksQueued())
return;
blockReceivedQueue.addAll(blockList);
lastBlock = blockList.get(blockList.size() - 1);
this.lastBlock = block;
logger.debug("Last block now index: [{}]", lastBlock.getNumber());
blockReceivedQueue.add(lastBlock);
blockHashQueue.removeLast();
}
logger.debug("Blocks waiting to be proceed in the queue: [{}]",
blockReceivedQueue.size());
logger.debug("Blocks waiting to be proceed: queue.size: [{}] lastBlock.number: [{}]" ,
blockReceivedQueue.size(),
lastBlock.getNumber());
}
/**
* adding single block to the queue usually
* a result of a NEW_BLOCK message announce.
* @param block - new block
*/
public void addBlock(Block block){
if (block.getNumber() != getLastBlock().getNumber() + 1){
logger.error("Block download out of sync");
return;
}
blockReceivedQueue.add(block);
lastBlock = block;
logger.debug("Blocks waiting to be proceed: queue.size: [{}] lastBlock.number: [{}]" ,
blockReceivedQueue.size(),
lastBlock.getNumber());
}
/**
* Returns the last block in the queue. If the queue is empty,
@ -127,22 +145,30 @@ public class BlockQueue {
public void addHash(byte[] hash) {
blockHashQueue.addLast(hash);
}
public void addNewBlockHash(byte[] hash){
blockHashQueue.addFirst(hash);
}
/**
* Return a list of hashes from blocks that still need to be downloaded.
*
* @param amount - the number of hashes to return
* @return A list of hashes for which blocks need to be retrieved.
*/
public List<byte[]> getHashes() {
List<byte[]> hashes = new ArrayList<>();
Iterator<byte[]> hashIterator = blockHashQueue.descendingIterator();
while (hashIterator.hasNext() && hashes.size() <= CONFIG.maxBlocksAsk()) {
hashes.add(hashIterator.next());
while (!blockHashQueue.isEmpty() && hashes.size() < CONFIG.maxBlocksAsk()) {
hashes.add(blockHashQueue.removeLast());
}
return hashes;
}
// a bit ugly but really gives
// good result
public void logHashQueueSize(){
logger.trace("Block hashes list size: [{}]", blockHashQueue.size());
}
private class BlockByIndexComparator implements Comparator<Block> {
@Override

View File

@ -11,7 +11,7 @@ import org.ethereum.net.MessageQueue;
import org.ethereum.net.PeerListener;
import org.ethereum.net.message.ReasonCode;
import org.ethereum.net.p2p.DisconnectMessage;
import org.ethereum.net.p2p.UserMessage;
import org.ethereum.util.ByteUtil;
import org.ethereum.util.FastByteComparisons;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -41,15 +41,17 @@ public class EthHandler extends SimpleChannelInboundHandler<EthMessage> {
private final static Logger logger = LoggerFactory.getLogger("net");
public static byte version = 0x23;
private int secToAskForBlocks = 1;
private String peerId;
private PeerListener peerListener;
private static String hashRetrievalLock;
private MessageQueue msgQueue = null;
private SyncSatus syncStatus = SyncSatus.INIT;
private Timer getBlocksTimer = new Timer("GetBlocksTimer");
//
private Timer getTxTimer = new Timer("GetTransactionsTimer");
public EthHandler(MessageQueue msgQueue) {
@ -65,6 +67,7 @@ public class EthHandler extends SimpleChannelInboundHandler<EthMessage> {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
logger.info("ETH protocol activated");
sendStatus();
}
@Override
@ -76,12 +79,12 @@ public class EthHandler extends SimpleChannelInboundHandler<EthMessage> {
public void channelRead0(final ChannelHandlerContext ctx, EthMessage msg) throws InterruptedException {
if (EthMessageCodes.inRange(msg.getCommand().asByte()))
logger.info("EthHandler invoke: {}", msg.getCommand());
logger.info("EthHandler invoke: [{}]", msg.getCommand());
switch (msg.getCommand()) {
case STATUS:
msgQueue.receivedMessage(msg);
// processStatus((StatusMessage)msg, ctx);
processStatus((StatusMessage) msg, ctx);
break;
case GET_TRANSACTIONS:
msgQueue.receivedMessage(msg);
@ -111,6 +114,8 @@ public class EthHandler extends SimpleChannelInboundHandler<EthMessage> {
msgQueue.receivedMessage(msg);
processBlocks((BlocksMessage) msg);
break;
case NEW_BLOCK:
procesNewBlock((NewBlockMessage)msg);
default:
break;
}
@ -140,7 +145,7 @@ public class EthHandler extends SimpleChannelInboundHandler<EthMessage> {
* @param msg is the StatusMessage
* @param ctx the ChannelHandlerContext
*/
public void processUser(UserMessage msg, ChannelHandlerContext ctx) {
public void processStatus(StatusMessage msg, ChannelHandlerContext ctx) {
Blockchain blockchain = WorldManager.getInstance().getBlockchain();
@ -160,13 +165,13 @@ public class EthHandler extends SimpleChannelInboundHandler<EthMessage> {
hashRetrievalLock = this.peerId;
chainQueue.setHighestTotalDifficulty(peerTotalDifficulty);
chainQueue.setBestHash(msg.getBestHash());
syncStatus = SyncSatus.HASH_RETREIVING;
sendGetBlockHashes();
} else
startGetBlockTimer();
}
}
private void processBlockHashes(BlockHashesMessage blockHashesMessage) {
Blockchain blockchain = WorldManager.getInstance().getBlockchain();
@ -185,8 +190,9 @@ public class EthHandler extends SimpleChannelInboundHandler<EthMessage> {
byte[] foundHash, latestHash = blockchain.getLatestBlockHash();
while (hashIterator.hasNext()) {
foundHash = hashIterator.next();
if (FastByteComparisons.compareTo(foundHash, 0, 32, latestHash, 0, 32) != 0)
if (FastByteComparisons.compareTo(foundHash, 0, 32, latestHash, 0, 32) != 0){
chainQueue.addHash(foundHash); // store unknown hashes in queue until known hash is found
}
else {
// if known hash is found, ignore the rest
startGetBlockTimer(); // start getting blocks from hash queue
@ -194,30 +200,67 @@ public class EthHandler extends SimpleChannelInboundHandler<EthMessage> {
}
}
// no known hash has been reached
chainQueue.logHashQueueSize();
sendGetBlockHashes(); // another getBlockHashes with last received hash.
}
private void processBlocks(BlocksMessage blocksMessage) {
Blockchain blockchain = WorldManager.getInstance().getBlockchain();
List<Block> blockList = blocksMessage.getBlocks();
// If we get one block from a peer we ask less greedy
if (blockList.size() <= 1 && secToAskForBlocks != 10) {
logger.info("Now we ask for blocks every 10 seconds");
updateGetBlocksTimer(10);
}
// If we get more blocks from a peer we ask more greedy
if (blockList.size() > 2 && secToAskForBlocks != 1) {
logger.info("Now we ask for a chain every 1 seconds");
updateGetBlocksTimer(1);
// If we got less blocks then we could get,
// it the correct indication that we are in sync we
// the chain from here there will be NEW_BLOCK only
// message expectation
if (blockList.size() < CONFIG.maxBlocksAsk()) {
logger.info(" *** The chain sync process fully complete ***");
syncStatus = SyncSatus.SYNC_DONE;
stopGetBlocksTimer();
}
if (blockList.isEmpty()) return;
blockchain.getQueue().addBlocks(blockList);
blockchain.getQueue().logHashQueueSize();
}
private void sendStatus() {
StatusMessage msg = new StatusMessage();
/**
* Processing NEW_BLOCK announce message
* @param newBlockMessage - new block message
*/
private void procesNewBlock(NewBlockMessage newBlockMessage){
Blockchain blockchain = WorldManager.getInstance().getBlockchain();
Block newBlock = newBlockMessage.getBlock();
// If the hashes still being downloaded ignore the NEW_BLOCKs
// that block hash will be retrieved by the others and letter the block itself
if (syncStatus == SyncSatus.INIT || syncStatus == SyncSatus.HASH_RETREIVING) {
logger.debug("Sync status INIT or HASH_RETREIVING ignore new block.index: [{}]", newBlock.getNumber());
return;
}
// If the GET_BLOCKs stage started add hash to the end of the hash list
// then the block will be retrieved in it's turn;
if (syncStatus == SyncSatus.BLOCK_RETREIVING){
logger.debug("Sync status BLOCK_RETREIVING add to the end of hash list: block.index: [{}]", newBlock.getNumber());
blockchain.getQueue().addNewBlockHash(newBlockMessage.getBlock().getHash());
return;
}
logger.info("New block received: block.index [{}]", newBlockMessage.getBlock().getNumber());
blockchain.getQueue().addBlock(newBlockMessage.getBlock());
blockchain.getQueue().logHashQueueSize();
}
private void sendStatus(){
Blockchain blockChain= WorldManager.getInstance().getBlockchain();
byte protocolVersion = EthHandler.version, networkId = 0;
BigInteger totalDifficulty = blockChain.getTotalDifficulty();
byte[] bestHash = blockChain.getLatestBlockHash();
StatusMessage msg = new StatusMessage(protocolVersion, networkId,
ByteUtil.bigIntegerToBytes(totalDifficulty), bestHash, Blockchain.GENESIS_HASH);
msgQueue.sendMessage(msg);
}
@ -250,6 +293,10 @@ public class EthHandler extends SimpleChannelInboundHandler<EthMessage> {
// retrieve list of block hashes from queue
List<byte[]> hashes = queue.getHashes();
if (hashes.isEmpty()) {
stopGetBlocksTimer();
return;
}
GetBlocksMessage msg = new GetBlocksMessage(hashes);
msgQueue.sendMessage(msg);
@ -279,23 +326,17 @@ public class EthHandler extends SimpleChannelInboundHandler<EthMessage> {
}
public void startGetBlockTimer() {
syncStatus = SyncSatus.BLOCK_RETREIVING;
getBlocksTimer.scheduleAtFixedRate(new TimerTask() {
public void run() {
BlockQueue blockQueue = WorldManager.getInstance().getBlockchain().getQueue();
if (blockQueue.size() > CONFIG.maxBlocksQueued()) {
logger.info("Blocks queue too big temporary postpone blocks request");
return;
}
sendGetBlocks();
}
}, 1000, secToAskForBlocks * 1000);
}
private void updateGetBlocksTimer(int seconds) {
secToAskForBlocks = seconds;
getBlocksTimer.cancel();
getBlocksTimer.purge();
getBlocksTimer = new Timer();
getBlocksTimer.scheduleAtFixedRate(new TimerTask() {
public void run() {
sendGetBlocks();
}
}, 3000, secToAskForBlocks * 1000);
}, 1000, 300);
}
private void stopGetBlocksTimer() {
@ -312,4 +353,12 @@ public class EthHandler extends SimpleChannelInboundHandler<EthMessage> {
stopGetBlocksTimer();
stopGetTxTimer();
}
private enum SyncSatus{
INIT,
HASH_RETREIVING,
BLOCK_RETREIVING,
SYNC_DONE;
}
}

View File

@ -40,7 +40,7 @@ public class GetBlockHashesMessage extends EthMessage {
}
private void parse() {
RLPList paramsList = (RLPList) RLP.decode2(encoded).get(0);
RLPList paramsList = (RLPList) RLP.decode2(encoded).get(1);
this.bestHash = paramsList.get(1).getRLPData();
byte[] maxBlocksBytes = paramsList.get(2).getRLPData();

View File

@ -1,16 +1,11 @@
package org.ethereum.net.eth;
import org.ethereum.core.Block;
import org.ethereum.net.eth.EthMessage;
import org.ethereum.core.BlockHeader;
import org.ethereum.util.RLP;
import org.ethereum.util.RLPItem;
import org.ethereum.util.RLPList;
import java.util.ArrayList;
import java.util.List;
import static org.ethereum.net.eth.EthMessageCodes.NEW_BLOCK;
/**
* Wrapper around an Ethereum Blocks message on the network
*/
@ -26,14 +21,18 @@ public class NewBlockMessage extends EthMessage {
private void parse() {
RLPList paramsList = (RLPList) RLP.decode2(encoded).get(0);
RLPItem blockItem = ((RLPItem) paramsList.get(0));
block = new Block(blockItem.getRLPData());
difficulty = ((RLPItem) paramsList.get(1)).getRLPData();
RLPList blockRLP = ((RLPList) paramsList.get(1));
block = new Block(blockRLP.getRLPData());
difficulty = paramsList.get(2).getRLPData();
parsed = true;
}
public Block getBlock(){
if (!parsed) parse();
return block;
}
@Override
public byte[] getEncoded() {
return encoded;

View File

@ -1,44 +1,120 @@
package org.ethereum.net.eth;
import org.ethereum.net.p2p.PongMessage;
import org.ethereum.util.ByteUtil;
import org.ethereum.util.RLP;
import org.ethereum.util.RLPItem;
import org.ethereum.util.RLPList;
import org.spongycastle.util.encoders.Hex;
import static org.ethereum.net.eth.EthMessageCodes.STATUS;
/**
* Wrapper around an Ethereum Ping message on the network
* Wrapper around an Ethereum Status message on the network
*
*/
public class StatusMessage extends EthMessage {
public StatusMessage() {
encode();
private byte protocolVersion;
private byte networkId;
/** Total difficulty of the best chain as found in block header. */
private byte[] totalDifficulty;
/** The hash of the best (i.e. highest TD) known block. */
private byte[] bestHash;
/** The hash of the Genesis block */
private byte[] genesisHash;
public StatusMessage(byte[] encoded) {
super(encoded);
}
public StatusMessage(byte[] payload) {
this.encoded = payload;
public StatusMessage(byte protocolVersion, byte networkId,
byte[] totalDifficulty, byte[] bestHash, byte[] genesisHash) {
this.protocolVersion = protocolVersion;
this.networkId = networkId;
this.totalDifficulty = totalDifficulty;
this.bestHash = bestHash;
this.genesisHash = genesisHash;
this.parsed = true;
}
private void parse() {
RLPList paramsList = (RLPList) RLP.decode2(encoded).get(0);
public byte[] getEncoded() {
return this.encoded;
}
this.protocolVersion = ((RLPItem) paramsList.get(1)).getRLPData()[0];
byte[] networkIdBytes = ((RLPItem) paramsList.get(2)).getRLPData();
this.networkId = networkIdBytes == null ? 0 : networkIdBytes[0];
this.totalDifficulty = ((RLPItem) paramsList.get(3)).getRLPData();
this.bestHash = ((RLPItem) paramsList.get(4)).getRLPData();
this.genesisHash = ((RLPItem) paramsList.get(5)).getRLPData();
parsed = true;
}
private void encode() {
this.encoded = RLP.encodeList(new byte[] {EthMessageCodes.STATUS.asByte()} );
byte[] command = RLP.encodeByte(STATUS.asByte());
byte[] protocolVersion = RLP.encodeByte(this.protocolVersion);
byte[] networkId = RLP.encodeByte(this.networkId);
byte[] totalDifficulty = RLP.encodeElement(this.totalDifficulty);
byte[] bestHash = RLP.encodeElement(this.bestHash);
byte[] genesisHash = RLP.encodeElement(this.genesisHash);
this.encoded = RLP.encodeList(command, protocolVersion, networkId,
totalDifficulty, bestHash, genesisHash);
}
@Override
public byte[] getEncoded() {
if (encoded == null) encode();
return encoded;
}
@Override
public Class<PongMessage> getAnswerMessage() {
return PongMessage.class;
}
@Override
public Class<?> getAnswerMessage() {
return null;
}
public byte getProtocolVersion() {
if (!parsed) parse();
return protocolVersion;
}
public byte getNetworkId() {
if (!parsed) parse();
return networkId;
}
public byte[] getTotalDifficulty() {
if (!parsed) parse();
return totalDifficulty;
}
public byte[] getBestHash() {
if (!parsed) parse();
return bestHash;
}
public byte[] getGenesisHash() {
if (!parsed) parse();
return genesisHash;
}
@Override
public EthMessageCodes getCommand(){
return EthMessageCodes.STATUS;
return STATUS;
}
@Override
public String toString() {
return "[" + getCommand().name() + "]";
}
}
@Override
public String toString() {
if (!parsed) parse();
return "[" + this.getCommand().name() +
" protocolVersion=" + this.protocolVersion +
" networkId=" + this.networkId +
" totalDifficulty=" + ByteUtil.toHexString(this.totalDifficulty) +
" bestHash=" + Hex.toHexString(this.bestHash) +
" genesisHash=" + Hex.toHexString(this.genesisHash) +
"]";
}
}

View File

@ -86,14 +86,13 @@ public class P2pHandler extends SimpleChannelInboundHandler<P2pMessage> {
public void channelRead0(final ChannelHandlerContext ctx, P2pMessage msg) throws InterruptedException {
if (P2pMessageCodes.inRange(msg.getCommand().asByte()))
logger.info("P2PHandler invoke: {}", msg.getCommand());
logger.info("P2PHandler invoke: [{}]", msg.getCommand());
switch (msg.getCommand()) {
case HELLO:
msgQueue.receivedMessage(msg);
if (!peerDiscoveryMode)
setHandshake((HelloMessage) msg, ctx);
sendUser();
break;
case DISCONNECT:
msgQueue.receivedMessage(msg);
@ -122,7 +121,7 @@ public class P2pHandler extends SimpleChannelInboundHandler<P2pMessage> {
break;
case USER:
processUser((UserMessage)msg, ctx);
processUser((UserMessage) msg);
break;
default:
ctx.fireChannelRead(msg);
@ -131,12 +130,7 @@ public class P2pHandler extends SimpleChannelInboundHandler<P2pMessage> {
}
public void processUser(UserMessage msg, ChannelHandlerContext ctx){
ChannelHandler handler = ctx.pipeline().get("eth");
if (handler != null){
((EthHandler)handler).processUser(msg, ctx);
}
public void processUser(UserMessage msg){
}
@Override
@ -161,18 +155,7 @@ public class P2pHandler extends SimpleChannelInboundHandler<P2pMessage> {
msgQueue.sendMessage(msg);
}
private void sendUser(){
Blockchain blockChain= WorldManager.getInstance().getBlockchain();
byte protocolVersion = EthHandler.version, networkId = 0;
BigInteger totalDifficulty = blockChain.getTotalDifficulty();
byte[] bestHash = blockChain.getLatestBlockHash();
UserMessage msg = new UserMessage(protocolVersion, networkId,
ByteUtil.bigIntegerToBytes(totalDifficulty), bestHash, Blockchain.GENESIS_HASH);
msgQueue.sendMessage(msg);
}
private void setHandshake(HelloMessage msg, ChannelHandlerContext ctx) {
if (msg.getP2PVersion() != HELLO_MESSAGE.getP2PVersion())
msgQueue.sendMessage(new DisconnectMessage(ReasonCode.INCOMPATIBLE_PROTOCOL));
@ -203,7 +186,7 @@ public class P2pHandler extends SimpleChannelInboundHandler<P2pMessage> {
public void adaptMessageIds(List<String> capabilities) {
byte offset = 0x10;
byte offset = (byte) (P2pMessageCodes.USER.asByte() + 1);
for (String capability : capabilities){
if (capability.equals("eth")){

View File

@ -48,7 +48,7 @@ public enum P2pMessageCodes {
/**
*
*/
USER(0x10);
USER(0x0F);
private int cmd;

View File

@ -38,7 +38,7 @@ public class ShhHandler extends SimpleChannelInboundHandler<ShhMessage> {
public void channelRead0(final ChannelHandlerContext ctx, ShhMessage msg) throws InterruptedException {
if (ShhMessageCodes.inRange(msg.getCommand().asByte()))
logger.info("ShhHandler invoke: {}", msg.getCommand());
logger.info("ShhHandler invoke: [{}]", msg.getCommand());
switch (msg.getCommand()) {
case STATUS:

View File

@ -21,22 +21,22 @@ log4j.appender.file.RollingPolicy=org.apache.log4j.rolling.TimeBasedRollingPolic
log4j.appender.file.RollingPolicy.FileNamePattern=./logs/ethereum_%d{yyyy-MM-dd}_h%d{HH}.log
# filter noisy classes
log4j.logger.block = ERROR
log4j.logger.blockqueue = ERROR
log4j.logger.wallet = ERROR
log4j.logger.block = ERROR
log4j.logger.blockqueue = TRACE
log4j.logger.wallet = ERROR
log4j.logger.net = TRACE
log4j.logger.db = ERROR
log4j.logger.db = ERROR
log4j.logger.peerdiscovery = ERROR
log4j.logger.java.nio = ERROR
log4j.logger.io.netty = ERROR
log4j.logger.wire = TRACE
log4j.logger.VM = ERROR
log4j.logger.main = ERROR
log4j.logger.trie = ERROR
log4j.logger.wire = ERROR
log4j.logger.VM = ERROR
log4j.logger.main = ERROR
log4j.logger.trie = ERROR
log4j.logger.state = INFO
log4j.logger.repository = INFO
log4j.logger.blockchain = INFO
log4j.logger.blockchain = TRACE
log4j.logger.txs = ERROR
log4j.logger.ui = ERROR
log4j.logger.ui = ERROR
log4j.logger.gas = ERROR

View File

@ -0,0 +1,38 @@
package org.ethereum.net;
import org.ethereum.core.Block;
import org.ethereum.net.eth.BlocksMessage;
import org.ethereum.net.eth.NewBlockMessage;
import org.junit.Ignore;
import org.junit.Test;
import org.spongycastle.util.encoders.Hex;
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Files;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
public class NewBlockMessageTest {
/* NEW_BLOCK */
@Test
public void test_1() {
String blocksRaw = "f8c017f8b7f8b3a0d8faffbc4c4213d35db9007de41cece45d95db7fd6c0f129e158baa888c48eefa01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d4934794baedba0480e1b882b606cd302d8c4f5701cabac7a0c7d4565fb7b3d98e54a0dec8b76f8c001a784a5689954ce0aedcc1bbe8d130958083063477825fc88609184e72a0008301e8488084543ffee680a00de0b9d4a0f0c23546d31f1f70db00d25cf6a7af79365b4e058e4a6a3b69527bc0c0850177ddbebe";
byte[] payload = Hex.decode(blocksRaw);
NewBlockMessage newBlockMessage = new NewBlockMessage(payload);
newBlockMessage.toString();
}
}

View File

@ -71,7 +71,7 @@
<dependency>
<groupId>org.ethereum</groupId>
<artifactId>ethereumj</artifactId>
<version>0.6.1.20141014.0918</version>
<version>0.7.1</version>
<type>jar</type>
</dependency>

View File

@ -138,7 +138,7 @@ max.blocks.ask = 100
max.blocks.queued = 300
# project version auto copied during build phase
project.version = 0.6.1
project.version = 0.7.1
# hello phrase will be included in
# the hello message of the peer