Net layer packet interfere bug:

+ temporary turn off all the parallel traffic, in order to fully download the chain
This commit is contained in:
romanman 2014-07-08 20:25:32 +01:00
parent 982c500909
commit 8dc0a56644
2 changed files with 29 additions and 3 deletions

View File

@ -3,6 +3,9 @@ package org.ethereum.net.client;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.CorruptedFrameException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List; import java.util.List;
@ -13,6 +16,8 @@ import java.util.List;
*/ */
public class EthereumFrameDecoder extends ByteToMessageDecoder { public class EthereumFrameDecoder extends ByteToMessageDecoder {
private Logger logger = LoggerFactory.getLogger("wire");
@Override @Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
@ -27,21 +32,26 @@ public class EthereumFrameDecoder extends ByteToMessageDecoder {
(magicBytes >> 8 & 0xFF) == 0x08 && (magicBytes >> 8 & 0xFF) == 0x08 &&
(magicBytes & 0xFF) == 0x91 )) { (magicBytes & 0xFF) == 0x91 )) {
logger.error("abandon garbage, wrong magic bytes: [ {} ] msgSize: [ {} ]", magicBytes, msgSize);
ctx.close(); ctx.close();
} }
// Don't have the full packet yet // Don't have the full packet yet
if (msgSize > in.readableBytes()) { if (msgSize > in.readableBytes()) {
logger.debug("msg decode: magicBytes: [ {} ], readBytes: [ {} ] / msgSize: [ {} ] ", magicBytes, in.readableBytes(), msgSize);
in.resetReaderIndex(); in.resetReaderIndex();
return; return;
} }
logger.debug("message fully constructed go handle it: readBytes: [ {} ] / msgSize: [ {} ]", in.readableBytes(), msgSize);
byte[] decoded = new byte[(int)msgSize]; byte[] decoded = new byte[(int)msgSize];
in.readBytes(decoded); in.readBytes(decoded);
out.add(decoded); out.add(decoded);
// Chop the achieved data.
in.markReaderIndex(); in.markReaderIndex();
} }
} }

View File

@ -43,7 +43,7 @@ public class EthereumProtocolHandler extends ChannelInboundHandlerAdapter {
private Logger logger = LoggerFactory.getLogger("wire"); private Logger logger = LoggerFactory.getLogger("wire");
private Timer chainAskTimer = new Timer(); private Timer chainAskTimer = new Timer();
private int secToAskForChain = 1; private int secToAskForChain = 7;
private final Timer timer = new Timer(); private final Timer timer = new Timer();
private final static byte[] MAGIC_PREFIX = {(byte)0x22, (byte)0x40, (byte)0x08, (byte)0x91}; private final static byte[] MAGIC_PREFIX = {(byte)0x22, (byte)0x40, (byte)0x08, (byte)0x91};
@ -76,6 +76,7 @@ public class EthereumProtocolHandler extends ChannelInboundHandlerAdapter {
ctx.writeAndFlush(buffer); ctx.writeAndFlush(buffer);
// sample for pinging in background // sample for pinging in background
/*
timer.scheduleAtFixedRate(new TimerTask() { timer.scheduleAtFixedRate(new TimerTask() {
public void run() { public void run() {
@ -94,7 +95,9 @@ public class EthereumProtocolHandler extends ChannelInboundHandlerAdapter {
sendPing(ctx); sendPing(ctx);
} }
}, 2000, 5000); }, 2000, 5000);
*/
/*
timer.scheduleAtFixedRate(new TimerTask() { timer.scheduleAtFixedRate(new TimerTask() {
public void run() { public void run() {
@ -110,14 +113,18 @@ public class EthereumProtocolHandler extends ChannelInboundHandlerAdapter {
sendGetTransactions(ctx); sendGetTransactions(ctx);
} }
}, 2000, 30000); }, 2000, 30000);
*/
/*
chainAskTimer.schedule(new TimerTask() { chainAskTimer.schedule(new TimerTask() {
public void run() { public void run() {
logger.info("[Send: GET_CHAIN]");
sendGetChain(ctx); sendGetChain(ctx);
} }
}, 3000, secToAskForChain * 1000); }, 3000, secToAskForChain * 1000);
*/
sendGetChain(ctx);
} }
@ -128,6 +135,7 @@ public class EthereumProtocolHandler extends ChannelInboundHandlerAdapter {
logger.info("[Recv msg: [{}] ]", Hex.toHexString(payload)); logger.info("[Recv msg: [{}] ]", Hex.toHexString(payload));
byte command = RLP.getCommandCode(payload); byte command = RLP.getCommandCode(payload);
// got HELLO // got HELLO
if (Command.fromInt(command) == HELLO) { if (Command.fromInt(command) == HELLO) {
logger.info("[Recv: HELLO]" ); logger.info("[Recv: HELLO]" );
@ -223,6 +231,7 @@ public class EthereumProtocolHandler extends ChannelInboundHandlerAdapter {
// If we get one block from a peer // If we get one block from a peer
// we ask less greedy // we ask less greedy
/*
if (blockList.size() <= 1 && secToAskForChain != 10) { if (blockList.size() <= 1 && secToAskForChain != 10) {
logger.info("Now we ask for a chain each 10 seconds"); logger.info("Now we ask for a chain each 10 seconds");
@ -239,9 +248,11 @@ public class EthereumProtocolHandler extends ChannelInboundHandlerAdapter {
} }
}, 3000, secToAskForChain * 1000); }, 3000, secToAskForChain * 1000);
} }
*/
// If we get more blocks from a peer // If we get more blocks from a peer
// we ask more greedy // we ask more greedy
/*
if (blockList.size() > 2 && secToAskForChain != 1) { if (blockList.size() > 2 && secToAskForChain != 1) {
logger.info("Now we ask for a chain each 1 seconds"); logger.info("Now we ask for a chain each 1 seconds");
@ -258,9 +269,13 @@ public class EthereumProtocolHandler extends ChannelInboundHandlerAdapter {
} }
}, 3000, secToAskForChain * 1000); }, 3000, secToAskForChain * 1000);
} }
*/
WorldManager.getInstance().getBlockChain().addBlocks(blockList); WorldManager.getInstance().getBlockChain().addBlocks(blockList);
if (peerListener != null) peerListener.console(blocksMessage.toString()); if (peerListener != null) peerListener.console(blocksMessage.toString());
if (blockList.size() > 1)
sendGetChain(ctx);
} }
// got GETCHAIN // got GETCHAIN
@ -359,6 +374,7 @@ public class EthereumProtocolHandler extends ChannelInboundHandlerAdapter {
private void sendGetChain(ChannelHandlerContext ctx) { private void sendGetChain(ChannelHandlerContext ctx) {
logger.info("[Send: GET_CHAIN]");
byte[] hash = WorldManager.getInstance().getBlockChain().getLastBlock().getHash(); byte[] hash = WorldManager.getInstance().getBlockChain().getLastBlock().getHash();
GetChainMessage chainMessage = new GetChainMessage((byte)100, hash); GetChainMessage chainMessage = new GetChainMessage((byte)100, hash);
chainMessage.toString(); chainMessage.toString();