Several fixes:

+ sync fix for NEW_BLOCK got in the middle of sync process
+ sync for rollback
+ apply transaction adjustments
This commit is contained in:
romanman 2014-12-11 09:50:42 +01:00
parent a8d407ab28
commit 70fc04104c
10 changed files with 153 additions and 78 deletions

View File

@ -152,51 +152,30 @@ public class BlockchainImpl implements Blockchain {
return;
}
// case when one of the alt chain probably
// going to connect this block
if (!hasParentOnTheChain(block)){
if (!hasParentOnTheChain(block) && block.getNumber() > bestBlock.getNumber()){
Iterator<Chain> iterAltChains = altChains.iterator();
boolean connected = false;
while (iterAltChains.hasNext() && !connected){
logger.info("*** Blockchain will rollback and resynchronise now ");
Chain chain = iterAltChains.next();
connected = chain.tryToConnect(block);
if (connected &&
chain.getTotalDifficulty().subtract(totalDifficulty).longValue() > 5000){
long rollbackIdx = bestBlock.getNumber() - 30;
if (rollbackIdx <= 0) rollbackIdx = bestBlock.getNumber() - bestBlock.getNumber() / 10;
Block rollbackBlock = blockStore.getBlockByNumber(rollbackIdx);
repository.syncToRoot(rollbackBlock.getStateRoot());
// todo: replay the alt on the main chain
}
}
if (connected) return;
}
BigInteger deltaTD = blockStore.getTotalDifficultySince(rollbackBlock.getNumber());
totalDifficulty = totalDifficulty.subtract(deltaTD);
bestBlock = rollbackBlock;
// The uncle block case: it is
// start of alt chain: different
// version of block we already
// got on the main chain
long gap = bestBlock.getNumber() - block.getNumber();
if (hasParentOnTheChain(block) && gap >=0){
blockStore.deleteBlocksSince(rollbackBlock.getNumber());
logger.info("created alt chain by block.hash: [{}] ", block.getShortHash());
Chain chain = new Chain();
chain.setTotalDifficulty(totalDifficulty);
chain.tryToConnect(block);
altChains.add(chain);
channelManager.ethSync();
return;
}
// provisional, by the garbage will be
// defined how to deal with it in the
// future.
garbage.add(block);
// if there is too much garbage ask for re-sync
if (garbage.size() > 20){
worldManager.reset();
}
}
@ -466,6 +445,9 @@ public class BlockchainImpl implements Blockchain {
} else {
receiverAddress = tx.getReceiveAddress();
code = track.getCode(receiverAddress);
// on invocation the contract is created event if doesn't exist.
track.addBalance(receiverAddress, BigInteger.ZERO);
if (code != EMPTY_BYTE_ARRAY) {
if (stateLogger.isDebugEnabled())
stateLogger.debug("calling for existing contract: address={}",
@ -475,8 +457,8 @@ public class BlockchainImpl implements Blockchain {
// THE SIMPLE VALUE/BALANCE CHANGE
boolean isValueTx = tx.getValue() != null;
if (isValueTx) {
BigInteger txValue = new BigInteger(1, tx.getValue());
BigInteger txValue = new BigInteger(1, tx.getValue());
if (isValueTx && !isContractCreation) {
if (track.getBalance(senderAddress).compareTo(txValue) >= 0) {
track.addBalance(receiverAddress, txValue); // balance will be read again below
@ -495,7 +477,6 @@ public class BlockchainImpl implements Blockchain {
}
// GET TOTAL ETHER VALUE AVAILABLE FOR TX FEE
// TODO: performance improve multiply without BigInteger
BigInteger gasPrice = new BigInteger(1, tx.getGasPrice());
BigInteger gasDebit = new BigInteger(1, tx.getGasLimit()).multiply(gasPrice);
logger.info("Gas price limited to [{} wei]", gasDebit.toString());
@ -531,13 +512,16 @@ public class BlockchainImpl implements Blockchain {
// START TRACKING FOR REVERT CHANGES OPTION
Repository trackTx = track.startTracking();
trackTx.addBalance(receiverAddress, BigInteger.ZERO); // the contract created for anycase but SUICIDE call
trackTx.addBalance(receiverAddress, txValue);
track.addBalance(senderAddress, txValue.negate()); // will not be reverted
logger.info("Start tracking VM run");
try {
// CREATE NEW CONTRACT ADDRESS AND ADD TX VALUE
if(isContractCreation) {
trackTx.addBalance(receiverAddress, BigInteger.ZERO); // also creates account
if(stateLogger.isDebugEnabled())
stateLogger.debug("new contract created address={}",
Hex.toHexString(receiverAddress));
@ -563,7 +547,6 @@ public class BlockchainImpl implements Blockchain {
List<LogInfo> logs = result.getLogInfoList();
receipt.setLogInfoList(logs);
} catch (RuntimeException e) {
trackTx.rollback();
receipt.setCumulativeGas(tx.getGasLimit());

View File

@ -12,8 +12,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spongycastle.util.BigIntegers;
import java.math.BigInteger;
import java.security.SignatureException;
import static org.ethereum.util.ByteUtil.EMPTY_BYTE_ARRAY;
import static org.ethereum.util.ByteUtil.ZERO_BYTE_ARRAY;
/**
* A transaction (formally, T) is a single cryptographically
* signed instruction sent by an actor external to Ethereum.
@ -109,9 +113,6 @@ public class Transaction {
byte[] r = ((RLPItem) transaction.get(7)).getRLPData();
byte[] s = ((RLPItem) transaction.get(8)).getRLPData();
this.signature = ECDSASignature.fromComponents(r, s, v);
} else {
logger.debug("RLP encoded tx is not signed!");
}
@ -124,21 +125,27 @@ public class Transaction {
}
public byte[] getHash() {
if (!parsed) rlpParse();
byte[] plainMsg = this.getEncoded();
return HashUtil.sha3(plainMsg);
}
public byte[] getRawHash() {
if (!parsed) rlpParse();
byte[] plainMsg = this.getEncodedRaw();
return HashUtil.sha3(plainMsg);
}
public byte[] getNonce() {
if (!parsed) rlpParse();
if (nonce == null) return new byte[]{0};
return nonce;
return nonce == null ? ZERO_BYTE_ARRAY : nonce ;
}
public byte[] getValue() {
if (!parsed) rlpParse();
return value;
return value == null ? ZERO_BYTE_ARRAY : value;
}
public byte[] getReceiveAddress() {
@ -181,14 +188,14 @@ public class Transaction {
*/
public ECKey getKey() {
byte[] hash = this.getHash();
byte[] hash = this.getRawHash();
return ECKey.recoverFromSignature(signature.v, signature, hash, true);
}
public byte[] getSender() {
try {
if (sendAddress == null) {
ECKey key = ECKey.signatureToKey(getHash(), getSignature().toBase64());
ECKey key = ECKey.signatureToKey(getRawHash(), getSignature().toBase64());
sendAddress = key.getAddress();
}
return sendAddress;
@ -199,7 +206,7 @@ public class Transaction {
}
public void sign(byte[] privKeyBytes) throws MissingPrivateKeyException {
byte[] hash = this.getHash();
byte[] hash = this.getRawHash();
ECKey key = ECKey.fromPrivate(privKeyBytes).decompress();
this.signature = key.sign(hash);
this.rlpEncoded = null;
@ -272,9 +279,9 @@ public class Transaction {
r = RLP.encodeElement(BigIntegers.asUnsignedByteArray(signature.r));
s = RLP.encodeElement(BigIntegers.asUnsignedByteArray(signature.s));
} else {
v = RLP.encodeElement(new byte[0]);
r = RLP.encodeElement(new byte[0]);
s = RLP.encodeElement(new byte[0]);
v = RLP.encodeElement(EMPTY_BYTE_ARRAY);
r = RLP.encodeElement(EMPTY_BYTE_ARRAY);
s = RLP.encodeElement(EMPTY_BYTE_ARRAY);
}
this.rlpEncoded = RLP.encodeList(nonce, gasPrice, gasLimit,

View File

@ -78,6 +78,15 @@ public class BlockStore {
return hashes;
}
@Transactional
public void deleteBlocksSince(long number){
sessionFactory.getCurrentSession().
createQuery("delete from BlockVO where number > :number").
setParameter("number", number).
executeUpdate();
}
@Transactional
public void saveBlock(Block block, List<TransactionReceipt> receipts) {
@ -97,6 +106,17 @@ public class BlockStore {
sessionFactory.getCurrentSession().persist(blockVO);
}
@Transactional(readOnly = true)
public BigInteger getTotalDifficultySince(long number){
BigInteger result = (BigInteger)sessionFactory.getCurrentSession().
createQuery("select sum(cummulativeDifficulty) from BlockVO where number > :number").
setParameter("number", number).
uniqueResult();
return result;
}
@Transactional(readOnly = true)
public BigInteger getTotalDifficulty(){

View File

@ -5,6 +5,7 @@ import io.netty.channel.ChannelHandlerContext;
import org.ethereum.listener.EthereumListener;
import org.ethereum.manager.WorldManager;
import org.ethereum.net.message.*;
import org.ethereum.net.p2p.PingMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -41,6 +42,7 @@ public class MessageQueue {
@Autowired
WorldManager worldManager;
boolean hasPing = false;
public MessageQueue(){}
@ -54,6 +56,12 @@ public class MessageQueue {
}
public void sendMessage(Message msg) {
if (msg instanceof PingMessage && hasPing)
return;
if (msg instanceof PingMessage && !hasPing)
hasPing = true;
messageQueue.add(new MessageRoundtrip(msg));
}
@ -65,6 +73,8 @@ public class MessageQueue {
MessageRoundtrip messageRoundtrip = messageQueue.peek();
Message waitingMessage = messageRoundtrip.getMsg();
if (waitingMessage instanceof PingMessage) hasPing = false;
if (waitingMessage.getAnswerMessage() != null
&& msg.getClass() == waitingMessage.getAnswerMessage()) {
messageRoundtrip.answer();

View File

@ -3,6 +3,7 @@ package org.ethereum.net.eth;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.ethereum.core.Block;
import org.ethereum.core.Genesis;
import org.ethereum.core.Transaction;
import org.ethereum.facade.Blockchain;
import org.ethereum.manager.WorldManager;
@ -57,6 +58,8 @@ public class EthHandler extends SimpleChannelInboundHandler<EthMessage> {
private boolean active = false;
private StatusMessage handshakeStatusMessage = null;
private BigInteger totalDifficulty = Genesis.getInstance().getCumulativeDifficulty();
private boolean peerDiscoveryMode = false;
private Timer getBlocksTimer = new Timer("GetBlocksTimer");
@ -68,6 +71,7 @@ public class EthHandler extends SimpleChannelInboundHandler<EthMessage> {
@Autowired
private WorldManager worldManager;
private List<byte[]> sentHashes;
private Block lastBlock = Genesis.getInstance();
public EthHandler(){
this.peerDiscoveryMode = false;
@ -261,6 +265,9 @@ public class EthHandler extends SimpleChannelInboundHandler<EthMessage> {
List<Block> blockList = blocksMessage.getBlocks();
if (!blockList.isEmpty())
lastBlock = blockList.get(blockList.size()-1);
// check if you got less blocks than you asked
if (blockList.size() < sentHashes.size()){
for (int i = 0; i < blockList.size(); ++i)
@ -282,6 +289,10 @@ public class EthHandler extends SimpleChannelInboundHandler<EthMessage> {
blockchain.getQueue().logHashQueueSize();
sendGetBlocks();
}
for (Block block : blockList){
totalDifficulty.add(block.getCumulativeDifficulty());
}
}
@ -292,11 +303,14 @@ public class EthHandler extends SimpleChannelInboundHandler<EthMessage> {
public void procesNewBlock(NewBlockMessage newBlockMessage){
Block newBlock = newBlockMessage.getBlock();
this.lastBlock = newBlock;
// If the hashes still being downloaded ignore the NEW_BLOCKs
// that block hash will be retrieved by the others and letter the block itself
if (syncStatus == SyncSatus.INIT || syncStatus == SyncSatus.HASH_RETRIEVING) {
logger.debug("Sync status INIT or HASH_RETREIVING ignore new block.index: [{}]", newBlock.getNumber());
logger.debug("Sync status INIT or HASH_RETREIVING adding to hashes new block.index: [{}]",
newBlock.getNumber());
blockchain.getQueue().addNewBlockHash(newBlock.getHash());
return;
}
@ -312,32 +326,12 @@ public class EthHandler extends SimpleChannelInboundHandler<EthMessage> {
// here is post sync process
logger.info("New block received: block.index [{}]", newBlock.getNumber());
/*
if (blockchain.hasParentOnTheChain(newBlock) && gap <=0){
//todo: here we create an alternative chain.
return;
}
if (!blockchain.hasParentOnTheChain(newBlock)){
//todo: here we check if one of alt chains is connecting this guy
return;
}
if (blockchain.hasParentOnTheChain(newBlock) && gap > 1){
logger.error("Gap in the chain, go out of sync");
this.syncStatus = SyncSatus.HASH_RETRIEVING;
blockchain.getQueue().addHash(newBlock.getHash());
sendGetBlockHashes();
return;
}
*/
// adding block to the queue
// there will be decided how to
// connect it to the chain
blockchain.getQueue().addBlock(newBlockMessage.getBlock());
blockchain.getQueue().logHashQueueSize();
totalDifficulty = new BigInteger(1, newBlockMessage.getDifficulty());
}
private void sendStatus(){
@ -489,11 +483,15 @@ public class EthHandler extends SimpleChannelInboundHandler<EthMessage> {
SYNC_DONE;
}
public void doSync(){
public void setBestHash(byte[] hash){
blockchain.getQueue().addHash(hash);
}
public void doSync(){
logger.info("Sync force activated");
syncStatus = SyncSatus.INIT;
sendStatus();
syncStatus = SyncSatus.HASH_RETRIEVING;
setBestHash(lastBlock.getHash());
sendGetBlockHashes();
}
public StatusMessage getHandshakeStatusMessage(){
@ -507,4 +505,8 @@ public class EthHandler extends SimpleChannelInboundHandler<EthMessage> {
public void setPeerDiscoveryMode(boolean peerDiscoveryMode) {
this.peerDiscoveryMode = peerDiscoveryMode;
}
public BigInteger getTotalDifficulty() {
return totalDifficulty;
}
}

View File

@ -126,7 +126,7 @@ public class P2pHandler extends SimpleChannelInboundHandler<P2pMessage> {
break;
case PING:
msgQueue.receivedMessage(msg);
msgQueue.sendMessage(PONG_MESSAGE);
ctx.writeAndFlush(PONG_MESSAGE);
break;
case PONG:
msgQueue.receivedMessage(msg);
@ -284,6 +284,8 @@ public class P2pHandler extends SimpleChannelInboundHandler<P2pMessage> {
timer.scheduleAtFixedRate(new TimerTask() {
public void run() {
if (tearDown) cancel();
msgQueue.sendMessage(PING_MESSAGE);
}
}, 2000, 5000);

View File

@ -12,6 +12,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import java.math.BigInteger;
/**
* www.etherj.com
*
@ -91,5 +93,12 @@ public class Channel {
}
public BigInteger getTotalDifficulty(){
return ethHandler.getTotalDifficulty();
}
public void ethSync(){
ethHandler.doSync();
}
}

View File

@ -98,4 +98,17 @@ public class ChannelManager {
}
}, 2000, 5000);
}
public void ethSync() {
Channel bestChannel = channels.get(0);
for (Channel channel : channels){
if (bestChannel.getTotalDifficulty().
compareTo(channel.getTotalDifficulty()) < 0 ){
bestChannel = channel;
}
}
bestChannel.ethSync();
}
}

View File

@ -11,7 +11,8 @@ import org.spongycastle.util.encoders.Hex;
public class ByteUtil {
public static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
public static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
public static final byte[] ZERO_BYTE_ARRAY = new byte[]{0};
/**
* Creates a copy of bytes and appends b to the end of it

View File

@ -204,6 +204,34 @@ public class BlockTest {
Hex.toHexString(worldManager.getRepository().getRoot()));
}
@Test
public void testScenario2() throws URISyntaxException, IOException {
BlockchainImpl blockchain = (BlockchainImpl)worldManager.getBlockchain();
URL scenario1 = ClassLoader
.getSystemResource("blockload/scenario2.dmp");
File file = new File(scenario1.toURI());
List<String> strData = Files.readAllLines(file.toPath(), StandardCharsets.UTF_8);
byte[] root = Genesis.getInstance().getStateRoot();
for(String blockRLP : strData){
Block block = new Block(
Hex.decode(blockRLP));
logger.info("sending block.hash: {}", Hex.toHexString( block.getHash() ));
blockchain.tryToConnect(block);
root = block.getStateRoot();
}
logger.info("asserting root state is: {}", Hex.toHexString( root ));
//expected root: a5e2a18bdbc4ab97775f44852382ff5585b948ccb15b1d69f0abb71e2d8f727d
assertEquals(Hex.toHexString(root),
Hex.toHexString(worldManager.getRepository().getRoot()));
}
@Test