+ catch up on lost NEW_BLOCK packet

- temporary cancel rollback on fork
This commit is contained in:
Roman Mandeleil 2014-12-28 22:59:24 +02:00
parent e5d2a32a13
commit 4ab4d4d919
8 changed files with 109 additions and 84 deletions

View File

@ -8,8 +8,8 @@ INDENT, DEDENT }
@lexer::header { @lexer::header {
import com.yuvalshavit.antlr4.DenterHelper; import com.yuvalshavit.antlr4.DenterHelper;
} }
@lexer::members { @lexer::members {
private final DenterHelper denter = new DenterHelper(NL, SerpentParser.INDENT, SerpentParser.DEDENT) { private final DenterHelper denter = new DenterHelper(NL, SerpentParser.INDENT, SerpentParser.DEDENT) {
@Override @Override

View File

@ -9,12 +9,9 @@ import org.ethereum.net.BlockQueue;
import org.ethereum.net.server.ChannelManager; import org.ethereum.net.server.ChannelManager;
import org.ethereum.util.AdvancedDeviceUtils; import org.ethereum.util.AdvancedDeviceUtils;
import org.ethereum.vm.ProgramInvokeFactory; import org.ethereum.vm.ProgramInvokeFactory;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.spongycastle.util.encoders.Hex; import org.spongycastle.util.encoders.Hex;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.FileSystemUtils; import org.springframework.util.FileSystemUtils;
@ -23,9 +20,7 @@ import java.io.BufferedWriter;
import java.io.File; import java.io.File;
import java.io.FileWriter; import java.io.FileWriter;
import java.io.IOException; import java.io.IOException;
import java.math.BigInteger; import java.math.BigInteger;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
@ -166,8 +161,15 @@ public class BlockchainImpl implements Blockchain {
return; return;
} }
if (!hasParentOnTheChain(block) && block.getNumber() > bestBlock.getNumber()) { // cut on the chain got lastBlock + 1 > n
if (block.getNumber() > bestBlock.getNumber() + 1){
channelManager.ethSync();
}
if (!hasParentOnTheChain(block) && block.getNumber() > bestBlock.getNumber()){
if (1==1)return; // todo: temporary cancel the rollback
logger.info("*** Blockchain will rollback and resynchronise now "); logger.info("*** Blockchain will rollback and resynchronise now ");
long rollbackIdx = bestBlock.getNumber() - 30; long rollbackIdx = bestBlock.getNumber() - 30;
@ -182,6 +184,7 @@ public class BlockchainImpl implements Blockchain {
blockStore.deleteBlocksSince(rollbackBlock.getNumber()); blockStore.deleteBlocksSince(rollbackBlock.getNumber());
blockQueue.clear();
channelManager.ethSync(); channelManager.ethSync();
return; return;
} }
@ -230,6 +233,7 @@ public class BlockchainImpl implements Blockchain {
logger.info("Sync done"); logger.info("Sync done");
syncDoneCalled = true; syncDoneCalled = true;
listener.onSyncDone(); listener.onSyncDone();
} }
} }
@ -412,8 +416,9 @@ public class BlockchainImpl implements Blockchain {
if (logger.isDebugEnabled()) if (logger.isDebugEnabled())
logger.debug("block added to the blockChain: index: [{}]", block.getNumber()); logger.debug("block added to the blockChain: index: [{}]", block.getNumber());
if (block.getNumber() % 100 == 0) if (block.getNumber() % 100 == 0)
logger.info("*** Last block added [ #{} ]", block.getNumber()); logger.info("*** Last block added [ #{} ]", block.getNumber());
}
}
public boolean hasParentOnTheChain(Block block) { public boolean hasParentOnTheChain(Block block) {

View File

@ -65,29 +65,32 @@ public class BlockQueue {
@Autowired @Autowired
Blockchain blockchain; Blockchain blockchain;
public BlockQueue() { public BlockQueue() {
timer.scheduleAtFixedRate(new TimerTask() { timer.scheduleAtFixedRate(new TimerTask() {
public void run() { public void run() {
nudgeQueue(); nudgeQueue();
}
}, 10, 10);
}
/**
* Processing the queue adding blocks to the chain.
*/
private void nudgeQueue() {
try {
if (blockReceivedQueue.isEmpty())
return;
logger.info("BlockQueue size: {}", blockReceivedQueue.size());
while(!blockReceivedQueue.isEmpty()){
Block block = blockReceivedQueue.poll();
logger.info("Processing block index: {}", block.getNumber());
blockchain.tryToConnect(block);
} }
}, 10, 10); } catch (Throwable e) {
} logger.error("Error: ", e.getMessage());
/**
* Processing the queue adding blocks to the chain.
*/
private void nudgeQueue() {
if (blockReceivedQueue.isEmpty())
return;
logger.info("BlockQueue size: {}", blockReceivedQueue.size());
while (!blockReceivedQueue.isEmpty()) {
Block block = blockReceivedQueue.poll();
logger.info("Processing block index: {}", block.getNumber());
blockchain.tryToConnect(block);
} }
} }
/** /**

View File

@ -19,6 +19,8 @@ import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import static org.ethereum.net.message.StaticMessages.DISCONNECT_MESSAGE;
/** /**
* This class contains the logic for sending messages in a queue * This class contains the logic for sending messages in a queue
* *
@ -70,7 +72,12 @@ public class MessageQueue {
messageQueue.add(new MessageRoundtrip(msg)); messageQueue.add(new MessageRoundtrip(msg));
} }
public void receivedMessage(Message msg) throws InterruptedException { public void disconnect(){
ctx.writeAndFlush(DISCONNECT_MESSAGE);
ctx.close();
}
public void receivedMessage(Message msg) throws InterruptedException {
worldManager.getListener().trace("[Recv: " + msg + "]"); worldManager.getListener().trace("[Recv: " + msg + "]");

View File

@ -17,7 +17,6 @@ import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope; import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -158,11 +157,15 @@ public class EthHandler extends SimpleChannelInboundHandler<EthMessage> {
case NEW_BLOCK: case NEW_BLOCK:
msgQueue.receivedMessage(msg); msgQueue.receivedMessage(msg);
procesNewBlock((NewBlockMessage) msg); procesNewBlock((NewBlockMessage) msg);
case PACKET_COUNT:
break;
default: default:
break; break;
} }
} }
private void processTransactions(TransactionsMessage msg) { private void processTransactions(TransactionsMessage msg) {
Set<Transaction> txSet = msg.getTransactions(); Set<Transaction> txSet = msg.getTransactions();
@ -246,6 +249,9 @@ public class EthHandler extends SimpleChannelInboundHandler<EthMessage> {
private void processBlockHashes(BlockHashesMessage blockHashesMessage) { private void processBlockHashes(BlockHashesMessage blockHashesMessage) {
List<byte[]> receivedHashes = blockHashesMessage.getBlockHashes(); List<byte[]> receivedHashes = blockHashesMessage.getBlockHashes();
// receivedHashes.forEach(hash -> System.out.println(Hex.toHexString(hash)));
BlockQueue chainQueue = blockchain.getQueue(); BlockQueue chainQueue = blockchain.getQueue();
// result is empty, peer has no more hashes // result is empty, peer has no more hashes
@ -279,8 +285,11 @@ public class EthHandler extends SimpleChannelInboundHandler<EthMessage> {
List<Block> blockList = blocksMessage.getBlocks(); List<Block> blockList = blocksMessage.getBlocks();
if (!blockList.isEmpty()) if (!blockList.isEmpty()){
lastBlock = blockList.get(blockList.size() - 1); Block block = blockList.get(blockList.size()-1);
if (block.getNumber() > lastBlock.getNumber())
lastBlock = blockList.get(blockList.size()-1);
}
// check if you got less blocks than you asked // check if you got less blocks than you asked
if (blockList.size() < sentHashes.size()) { if (blockList.size() < sentHashes.size()) {
@ -318,7 +327,9 @@ public class EthHandler extends SimpleChannelInboundHandler<EthMessage> {
public void procesNewBlock(NewBlockMessage newBlockMessage) { public void procesNewBlock(NewBlockMessage newBlockMessage) {
Block newBlock = newBlockMessage.getBlock(); Block newBlock = newBlockMessage.getBlock();
this.lastBlock = newBlock;
if (newBlock.getNumber() > this.lastBlock.getNumber())
this.lastBlock = newBlock;
// If the hashes still being downloaded ignore the NEW_BLOCKs // If the hashes still being downloaded ignore the NEW_BLOCKs
// that block hash will be retrieved by the others and letter the block itself // that block hash will be retrieved by the others and letter the block itself
@ -502,8 +513,8 @@ public class EthHandler extends SimpleChannelInboundHandler<EthMessage> {
blockchain.getQueue().addHash(hash); blockchain.getQueue().addHash(hash);
} }
public void doSync() { public void doSync(){
logger.info("Sync force activated"); logger.info("Sync force activated, block: {}", lastBlock);
syncStatus = SyncSatus.HASH_RETRIEVING; syncStatus = SyncSatus.HASH_RETRIEVING;
setBestHash(lastBlock.getHash()); setBestHash(lastBlock.getHash());
sendGetBlockHashes(); sendGetBlockHashes();

View File

@ -5,11 +5,7 @@ import org.ethereum.crypto.HashUtil;
import org.ethereum.net.client.Capability; import org.ethereum.net.client.Capability;
import org.ethereum.net.eth.EthHandler; import org.ethereum.net.eth.EthHandler;
import org.ethereum.net.eth.GetTransactionsMessage; import org.ethereum.net.eth.GetTransactionsMessage;
import org.ethereum.net.p2p.GetPeersMessage; import org.ethereum.net.p2p.*;
import org.ethereum.net.p2p.HelloMessage;
import org.ethereum.net.p2p.P2pHandler;
import org.ethereum.net.p2p.PingMessage;
import org.ethereum.net.p2p.PongMessage;
import org.ethereum.net.shh.ShhHandler; import org.ethereum.net.shh.ShhHandler;
import org.spongycastle.util.encoders.Hex; import org.spongycastle.util.encoders.Hex;
@ -26,37 +22,38 @@ import java.util.List;
*/ */
public class StaticMessages { public class StaticMessages {
public static final String PEER_ID = Hex.toHexString(HashUtil.randomPeerId()); public static final String PEER_ID = Hex.toHexString(HashUtil.randomPeerId());
public final static PingMessage PING_MESSAGE = new PingMessage();
public final static PongMessage PONG_MESSAGE = new PongMessage();
public final static HelloMessage HELLO_MESSAGE = generateHelloMessage();
public final static GetPeersMessage GET_PEERS_MESSAGE = new GetPeersMessage();
public final static GetTransactionsMessage GET_TRANSACTIONS_MESSAGE = new GetTransactionsMessage();
public final static DisconnectMessage DISCONNECT_MESSAGE = new DisconnectMessage(ReasonCode.REQUESTED);
public final static PingMessage PING_MESSAGE = new PingMessage(); public static final byte[] SYNC_TOKEN = Hex.decode("22400891");
public final static PongMessage PONG_MESSAGE = new PongMessage();
public final static HelloMessage HELLO_MESSAGE = generateHelloMessage();
public final static GetPeersMessage GET_PEERS_MESSAGE = new GetPeersMessage();
public final static GetTransactionsMessage GET_TRANSACTIONS_MESSAGE = new GetTransactionsMessage();
public static final byte[] SYNC_TOKEN = Hex.decode("22400891"); private static HelloMessage generateHelloMessage() {
String helloAnnouncement = buildHelloAnnouncement();
byte p2pVersion = P2pHandler.VERSION;
List<Capability> capabilities = Arrays.asList(
new Capability(Capability.ETH, EthHandler.VERSION),
new Capability(Capability.SHH, ShhHandler.VERSION));
int listenPort = SystemProperties.CONFIG.listenPort();
private static HelloMessage generateHelloMessage() { return new HelloMessage(p2pVersion, helloAnnouncement,
String helloAnnouncement = buildHelloAnnouncement(); capabilities, listenPort, PEER_ID);
byte p2pVersion = P2pHandler.VERSION; }
List<Capability> capabilities = Arrays.asList(
new Capability(Capability.ETH, EthHandler.VERSION),
new Capability(Capability.SHH, ShhHandler.VERSION));
int listenPort = SystemProperties.CONFIG.listenPort();
return new HelloMessage(p2pVersion, helloAnnouncement, private static String buildHelloAnnouncement() {
capabilities, listenPort, PEER_ID); String version = SystemProperties.CONFIG.projectVersion();
} String system = System.getProperty("os.name");
if (system.contains(" "))
system = system.substring(0, system.indexOf(" "));
if (System.getProperty("java.vm.vendor").contains("Android"))
system = "Android";
String phrase = SystemProperties.CONFIG.helloPhrase();
private static String buildHelloAnnouncement() { return String.format("Ethereum(J)/v%s/%s/%s/Java", version, phrase, system);
String version = SystemProperties.CONFIG.projectVersion(); }
String system = System.getProperty("os.name");
if (system.contains(" "))
system = system.substring(0, system.indexOf(" "));
if (System.getProperty("java.vm.vendor").contains("Android"))
system = "Android";
String phrase = SystemProperties.CONFIG.helloPhrase();
return String.format("Ethereum(J)/v%s/%s/%s/Java", version, phrase, system);
}
} }

View File

@ -1,5 +1,7 @@
package org.ethereum.net.p2p; package org.ethereum.net.p2p;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.ethereum.core.Block; import org.ethereum.core.Block;
import org.ethereum.core.Transaction; import org.ethereum.core.Transaction;
import org.ethereum.manager.WorldManager; import org.ethereum.manager.WorldManager;
@ -14,30 +16,19 @@ import org.ethereum.net.message.StaticMessages;
import org.ethereum.net.peerdiscovery.PeerInfo; import org.ethereum.net.peerdiscovery.PeerInfo;
import org.ethereum.net.shh.ShhHandler; import org.ethereum.net.shh.ShhHandler;
import org.ethereum.net.shh.ShhMessageCodes; import org.ethereum.net.shh.ShhMessageCodes;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope; import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import static org.ethereum.net.message.StaticMessages.*; import static org.ethereum.net.message.StaticMessages.*;
/** /**
* Process the basic protocol messages between every peer on the network. * Process the basic protocol messages between every peer on the network.
* *
@ -259,6 +250,9 @@ public class P2pHandler extends SimpleChannelInboundHandler<P2pMessage> {
msgQueue.sendMessage(msg); msgQueue.sendMessage(msg);
} }
public void sendDisconnect(){
msgQueue.disconnect();
}
public void adaptMessageIds(List<Capability> capabilities) { public void adaptMessageIds(List<Capability> capabilities) {
@ -283,6 +277,8 @@ public class P2pHandler extends SimpleChannelInboundHandler<P2pMessage> {
return handshakeHelloMessage; return handshakeHelloMessage;
} }
private void startTimers() { private void startTimers() {
// sample for pinging in background // sample for pinging in background

View File

@ -107,6 +107,12 @@ public class ChannelManager {
}, 2000, 5000); }, 2000, 5000);
} }
public void reconnect(){
for (Channel channel : channels){
channel.p2pHandler.sendDisconnect();
}
}
public void ethSync() { public void ethSync() {
Channel bestChannel = channels.get(0); Channel bestChannel = channels.get(0);