Retrieve block hashes from single peer
This commit is contained in:
parent
f877d09f5a
commit
4f901c8d16
|
@ -17,10 +17,9 @@ public class BlockQueue {
|
|||
|
||||
private static Logger logger = LoggerFactory.getLogger("blockchain");
|
||||
|
||||
private Queue<byte[]> blockHashQueue = new ConcurrentLinkedQueue<>();
|
||||
private Deque<byte[]> blockHashQueue = new ArrayDeque<>();
|
||||
private Queue<Block> blockReceivedQueue = new ConcurrentLinkedQueue<>();
|
||||
private BigInteger highestTotalDifficulty;
|
||||
private byte[] bestHash;
|
||||
private Block lastBlock;
|
||||
|
||||
private Timer timer = new Timer("BlockQueueTimer");
|
||||
|
@ -74,13 +73,19 @@ public class BlockQueue {
|
|||
return lastBlock;
|
||||
}
|
||||
|
||||
public void setBestHash(byte[] bestHash) {
|
||||
this.bestHash = bestHash;
|
||||
public void setBestHash(byte[] hash) {
|
||||
blockHashQueue.clear();
|
||||
blockHashQueue.addLast(hash);
|
||||
}
|
||||
|
||||
public byte[] getBestHash() {
|
||||
return bestHash;
|
||||
return blockHashQueue.peekLast();
|
||||
}
|
||||
|
||||
public void addHash(byte[] hash) {
|
||||
blockHashQueue.addLast(hash);
|
||||
}
|
||||
|
||||
public List<byte[]> getHashes(int amount) {
|
||||
List<byte[]> hashes = new ArrayList<>();
|
||||
for (int i = 0; i < amount; i++) {
|
||||
|
|
|
@ -13,14 +13,14 @@ import java.util.concurrent.ConcurrentLinkedQueue;
|
|||
* This class contains the logic for sending messages in a queue
|
||||
*
|
||||
* Messages open by send and answered by receive of appropriate message
|
||||
* GET_BLOCK_HASHES by BLOCK_HASHES
|
||||
* GET_BLOCKS by BLOCKS
|
||||
* PING by PONG
|
||||
* GET_PEERS by PEERS
|
||||
* GET_TRANSACTIONS by TRANSACTIONS
|
||||
* GET_BLOCK_HASHES by BLOCK_HASHES
|
||||
* GET_BLOCKS by BLOCKS
|
||||
*
|
||||
* The following messages will not be answered:
|
||||
* PONG, PEERS, BLOCKS, TRANSACTIONS
|
||||
* PONG, PEERS, HELLO, STATUS, TRANSACTIONS, BLOCKS
|
||||
*
|
||||
* @author Roman Mandeleil
|
||||
*/
|
||||
|
@ -52,12 +52,6 @@ public class MessageQueue {
|
|||
|
||||
if (listener != null)
|
||||
listener.console("[Recv: " + msg + "]");
|
||||
if (logger.isInfoEnabled())
|
||||
// && msg.getCommand() != Command.PING
|
||||
// && msg.getCommand() != Command.PONG
|
||||
// && msg.getCommand() != Command.PEERS
|
||||
// && msg.getCommand() != Command.GET_PEERS)
|
||||
logger.info("From: \t{} \tRecv: \t{}", ctx.channel().remoteAddress(), msg);
|
||||
|
||||
if (messageQueue.peek() != null) {
|
||||
MessageRoundtrip messageRoundtrip = messageQueue.peek();
|
||||
|
@ -72,47 +66,33 @@ public class MessageQueue {
|
|||
}
|
||||
}
|
||||
|
||||
private void nudgeQueue() {
|
||||
|
||||
// The message was answered, remove from the queue
|
||||
if (messageQueue.peek() != null) {
|
||||
MessageRoundtrip messageRoundtrip = messageQueue.peek();
|
||||
if (messageRoundtrip.isAnswered()) {
|
||||
messageQueue.remove();
|
||||
}
|
||||
}
|
||||
|
||||
// Now send the next message
|
||||
if (messageQueue.peek() != null) {
|
||||
|
||||
MessageRoundtrip messageRoundtrip = messageQueue.peek();
|
||||
if (messageRoundtrip.getRetryTimes() == 0) {
|
||||
// TODO: retry logic || messageRoundtrip.hasToRetry()){
|
||||
|
||||
Message msg = messageRoundtrip.getMsg();
|
||||
sendToWire(msg);
|
||||
|
||||
if (msg.getAnswerMessage() == null)
|
||||
messageQueue.remove();
|
||||
else {
|
||||
messageRoundtrip.incRetryTimes();
|
||||
messageRoundtrip.saveTime();
|
||||
}
|
||||
}
|
||||
}
|
||||
private void removeAnsweredMessage(MessageRoundtrip messageRoundtrip) {
|
||||
if (messageRoundtrip != null && messageRoundtrip.isAnswered())
|
||||
messageQueue.remove();
|
||||
}
|
||||
|
||||
private void sendToWire(Message msg) {
|
||||
private void nudgeQueue() {
|
||||
// remove last answered message on the queue
|
||||
removeAnsweredMessage(messageQueue.peek());
|
||||
// Now send the next message
|
||||
sendToWire(messageQueue.peek());
|
||||
}
|
||||
|
||||
if (listener != null)
|
||||
listener.console("[Send: " + msg + "]");
|
||||
if (logger.isInfoEnabled())
|
||||
// && msg.getCommand() != Command.PING
|
||||
// && msg.getCommand() != Command.PONG
|
||||
// && msg.getCommand() != Command.PEERS
|
||||
// && msg.getCommand() != Command.GET_PEERS)
|
||||
logger.info("To: \t{} \tSend: \t{}", ctx.channel().remoteAddress(), msg);
|
||||
private void sendToWire(MessageRoundtrip messageRoundtrip) {
|
||||
|
||||
ctx.writeAndFlush(msg);
|
||||
if (messageRoundtrip != null && messageRoundtrip.getRetryTimes() == 0) {
|
||||
// TODO: retry logic || messageRoundtrip.hasToRetry()){
|
||||
|
||||
Message msg = messageRoundtrip.getMsg();
|
||||
|
||||
ctx.writeAndFlush(msg);
|
||||
|
||||
if (msg.getAnswerMessage() == null)
|
||||
messageQueue.remove();
|
||||
else {
|
||||
messageRoundtrip.incRetryTimes();
|
||||
messageRoundtrip.saveTime();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,7 +28,7 @@ public class PeerClient {
|
|||
private Logger logger = LoggerFactory.getLogger("wire");
|
||||
|
||||
private PeerListener peerListener;
|
||||
private P2pHandler handler;
|
||||
private P2pHandler p2pHandler;
|
||||
|
||||
public PeerClient() {
|
||||
}
|
||||
|
@ -44,6 +44,8 @@ public class PeerClient {
|
|||
if (peerListener != null)
|
||||
peerListener.console("Connecting to: " + host + ":" + port);
|
||||
|
||||
p2pHandler = new P2pHandler(peerListener);
|
||||
|
||||
try {
|
||||
Bootstrap b = new Bootstrap();
|
||||
b.group(workerGroup);
|
||||
|
@ -61,7 +63,7 @@ public class PeerClient {
|
|||
new ReadTimeoutHandler(CONFIG.peerChannelReadTimeout(), TimeUnit.SECONDS));
|
||||
ch.pipeline().addLast(new PacketDecoder());
|
||||
ch.pipeline().addLast(new PacketEncoder());
|
||||
ch.pipeline().addLast(new P2pHandler(peerListener));
|
||||
ch.pipeline().addLast(p2pHandler);
|
||||
// limit the size of receiving buffer to 1024
|
||||
ch.config().setRecvByteBufAllocator(new FixedRecvByteBufAllocator(32368));
|
||||
ch.config().setOption(ChannelOption.SO_RCVBUF, 32368);
|
||||
|
@ -81,7 +83,7 @@ public class PeerClient {
|
|||
} finally {
|
||||
workerGroup.shutdownGracefully();
|
||||
|
||||
handler.killTimers();
|
||||
p2pHandler.killTimers();
|
||||
|
||||
final Set<Peer> peers = WorldManager.getInstance().getPeerDiscovery().getPeers();
|
||||
|
||||
|
@ -99,7 +101,7 @@ public class PeerClient {
|
|||
this.peerListener = peerListener;
|
||||
}
|
||||
|
||||
public P2pHandler getHandler() {
|
||||
return handler;
|
||||
public P2pHandler getP2pHandler() {
|
||||
return p2pHandler;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,6 +6,7 @@ import static org.ethereum.config.SystemProperties.CONFIG;
|
|||
import java.math.BigInteger;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.Timer;
|
||||
|
@ -18,7 +19,6 @@ import org.ethereum.core.Block;
|
|||
import org.ethereum.core.BlockQueue;
|
||||
import org.ethereum.core.Transaction;
|
||||
import org.ethereum.facade.Blockchain;
|
||||
import org.ethereum.listener.EthereumListener;
|
||||
import org.ethereum.manager.WorldManager;
|
||||
import org.ethereum.net.MessageQueue;
|
||||
import org.ethereum.net.PeerListener;
|
||||
|
@ -32,6 +32,7 @@ import org.ethereum.net.message.ReasonCode;
|
|||
import org.ethereum.net.message.StatusMessage;
|
||||
import org.ethereum.net.message.TransactionsMessage;
|
||||
import org.ethereum.util.ByteUtil;
|
||||
import org.ethereum.util.FastByteComparisons;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -53,18 +54,19 @@ public class EthHandler extends SimpleChannelInboundHandler<Message> {
|
|||
|
||||
private final static Logger logger = LoggerFactory.getLogger("wire");
|
||||
|
||||
private Timer getBlocksTimer = new Timer("GetBlocksTimer");
|
||||
private int secToAskForBlocks = 1;
|
||||
|
||||
private Blockchain blockchain;
|
||||
private PeerListener peerListener;
|
||||
private EthereumListener listener;
|
||||
|
||||
private static boolean hashRetrievalLocked = false;
|
||||
private MessageQueue msgQueue = null;
|
||||
private final Timer timer = new Timer("MiscMessageTimer");
|
||||
|
||||
private Timer getBlocksTimer = new Timer("GetBlocksTimer");
|
||||
private Timer getHashesTimer = new Timer("GetBlockHashesTimer");
|
||||
private Timer getTxTimer = new Timer("GetTransactionsTimer");
|
||||
|
||||
public EthHandler() {
|
||||
this.listener = WorldManager.getInstance().getListener();
|
||||
this.blockchain = WorldManager.getInstance().getBlockchain();
|
||||
}
|
||||
|
||||
|
@ -77,34 +79,38 @@ public class EthHandler extends SimpleChannelInboundHandler<Message> {
|
|||
public void channelRead0(final ChannelHandlerContext ctx, Message msg) throws InterruptedException {
|
||||
logger.trace("Read channel for {}", ctx.channel().remoteAddress());
|
||||
|
||||
msgQueue.receivedMessage(msg);
|
||||
if (listener != null) listener.onRecvMessage(msg);
|
||||
|
||||
switch (msg.getCommand()) {
|
||||
case STATUS:
|
||||
msgQueue.receivedMessage(msg);
|
||||
processStatus((StatusMessage)msg, ctx);
|
||||
break;
|
||||
case GET_TRANSACTIONS:
|
||||
msgQueue.receivedMessage(msg);
|
||||
sendPendingTransactions();
|
||||
break;
|
||||
case TRANSACTIONS:
|
||||
msgQueue.receivedMessage(msg);
|
||||
// List<Transaction> txList = transactionsMessage.getTransactions();
|
||||
// for(Transaction tx : txList)
|
||||
// WorldManager.getInstance().getBlockchain().applyTransaction(null,
|
||||
// tx);
|
||||
// WorldManager.getInstance().getWallet().addTransaction(tx);
|
||||
break;
|
||||
case BLOCKS:
|
||||
processBlocks((BlocksMessage)msg);
|
||||
break;
|
||||
case GET_TRANSACTIONS:
|
||||
sendPendingTransactions();
|
||||
break;
|
||||
case GET_BLOCK_HASHES:
|
||||
sendBlockHashes();
|
||||
msgQueue.receivedMessage(msg);
|
||||
// sendBlockHashes();
|
||||
break;
|
||||
case BLOCK_HASHES:
|
||||
msgQueue.receivedMessage(msg);
|
||||
processBlockHashes((BlockHashesMessage)msg);
|
||||
break;
|
||||
case GET_BLOCKS:
|
||||
sendBlocks();
|
||||
msgQueue.receivedMessage(msg);
|
||||
// sendBlocks();
|
||||
break;
|
||||
case BLOCKS:
|
||||
msgQueue.receivedMessage(msg);
|
||||
// processBlocks((BlocksMessage)msg);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
|
@ -117,10 +123,17 @@ public class EthHandler extends SimpleChannelInboundHandler<Message> {
|
|||
super.exceptionCaught(ctx, cause);
|
||||
ctx.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
|
||||
logger.debug("handlerRemoved: kill timers in EthHandler");
|
||||
this.killTimers();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
|
||||
msgQueue = new MessageQueue(ctx, peerListener);
|
||||
// Send STATUS once when channel connection has been established
|
||||
sendStatus();
|
||||
}
|
||||
|
||||
|
@ -136,28 +149,51 @@ public class EthHandler extends SimpleChannelInboundHandler<Message> {
|
|||
* @param ctx the ChannelHandlerContext
|
||||
*/
|
||||
private void processStatus(StatusMessage msg, ChannelHandlerContext ctx) {
|
||||
if (!Arrays.equals(msg.getGenesisHash(), Blockchain.GENESIS_HASH) || msg.getProtocolVersion() != 33)
|
||||
if (!Arrays.equals(msg.getGenesisHash(), Blockchain.GENESIS_HASH) || msg.getProtocolVersion() != 33) {
|
||||
logger.info("Removing EthHandler due to protocol incompatibility");
|
||||
ctx.pipeline().remove(this); // Peer is not compatible for the 'eth' sub-protocol
|
||||
else if (msg.getNetworkId() != 0)
|
||||
} else if (msg.getNetworkId() != 0)
|
||||
msgQueue.sendMessage(new DisconnectMessage(ReasonCode.INCOMPATIBLE_NETWORK));
|
||||
else {
|
||||
BlockQueue chainQueue = this.blockchain.getQueue();
|
||||
BigInteger peerTotalDifficulty = new BigInteger(1, msg.getTotalDifficulty());
|
||||
BigInteger highestKnownTotalDifficulty = chainQueue.getHighestTotalDifficulty();
|
||||
// if (peerTotalDifficulty.compareTo(highestKnownTotalDifficulty) > 0) {
|
||||
// this.blockchain.getQueue().setHighestTotalDifficulty(peerTotalDifficulty);
|
||||
// this.blockchain.getQueue().setBestHash(msg.getBestHash());
|
||||
// sendGetBlockHashes(msg.getBestHash());
|
||||
// }
|
||||
startTimers();
|
||||
if(!hashRetrievalLocked) {
|
||||
BigInteger peerTotalDifficulty = new BigInteger(1, msg.getTotalDifficulty());
|
||||
BigInteger highestKnownTotalDifficulty = chainQueue.getHighestTotalDifficulty();
|
||||
if (highestKnownTotalDifficulty == null
|
||||
|| peerTotalDifficulty.compareTo(highestKnownTotalDifficulty) > 0) {
|
||||
chainQueue.setHighestTotalDifficulty(peerTotalDifficulty);
|
||||
chainQueue.setBestHash(msg.getBestHash());
|
||||
}
|
||||
hashRetrievalLocked = true;
|
||||
sendGetBlockHashes();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void processBlockHashes(BlockHashesMessage blockHashesMessage) {
|
||||
// for each block hash
|
||||
// check if blockHash != known hash
|
||||
// store blockhash
|
||||
// if no known hash has been reached, another getBlockHashes with last stored hash.
|
||||
List<byte[]> receivedHashes = blockHashesMessage.getBlockHashes();
|
||||
BlockQueue chainQueue = this.blockchain.getQueue();
|
||||
|
||||
// result is empty, peer has no more hashes
|
||||
if (receivedHashes.isEmpty()) {
|
||||
// startGetBlockTimer(); // start getting blocks from hash queue
|
||||
return;
|
||||
}
|
||||
|
||||
Iterator<byte[]> hashIterator = receivedHashes.iterator();
|
||||
byte[] foundHash, latestHash = this.blockchain.getLatestBlockHash();
|
||||
while(hashIterator.hasNext()) {
|
||||
foundHash = hashIterator.next();
|
||||
if (FastByteComparisons.compareTo(foundHash, 0, 32, latestHash, 0, 32) != 0)
|
||||
chainQueue.addHash(foundHash); // store unknown hashes in queue until known hash is found
|
||||
else {
|
||||
// if known hash is found, ignore the rest
|
||||
// startGetBlockTimer(); // start getting blocks from hash queue
|
||||
return;
|
||||
}
|
||||
}
|
||||
// no known hash has been reached
|
||||
sendGetBlockHashes(); // another getBlockHashes with last received hash.
|
||||
}
|
||||
|
||||
private void processBlocks(BlocksMessage blocksMessage) {
|
||||
|
@ -176,7 +212,6 @@ public class EthHandler extends SimpleChannelInboundHandler<Message> {
|
|||
|
||||
if (blockList.isEmpty()) return;
|
||||
this.blockchain.getQueue().addBlocks(blockList);
|
||||
|
||||
}
|
||||
|
||||
private void sendStatus() {
|
||||
|
@ -202,22 +237,23 @@ public class EthHandler extends SimpleChannelInboundHandler<Message> {
|
|||
msgQueue.sendMessage(GET_TRANSACTIONS_MESSAGE);
|
||||
}
|
||||
|
||||
private void sendGetBlockHashes(byte[] bestHash) {
|
||||
private void sendGetBlockHashes() {
|
||||
byte[] bestHash = this.blockchain.getQueue().getBestHash();
|
||||
GetBlockHashesMessage msg = new GetBlockHashesMessage(bestHash, CONFIG.maxHashesAsk());
|
||||
msgQueue.sendMessage(msg);
|
||||
}
|
||||
|
||||
// Parallel download blocks based on hashQueue
|
||||
private void sendGetBlocks() {
|
||||
BlockQueue queue = this.blockchain.getQueue();
|
||||
if (queue.size() > CONFIG.maxBlocksQueued()) return;
|
||||
|
||||
if (WorldManager.getInstance().getBlockchain().getQueue().size() >
|
||||
CONFIG.maxBlocksQueued()) return;
|
||||
|
||||
Block lastBlock = this.blockchain.getQueue().getLastBlock();
|
||||
Block lastBlock = queue.getLastBlock();
|
||||
if (lastBlock == null) return;
|
||||
|
||||
// retrieve list of block hashes from queue
|
||||
int blocksPerPeer = CONFIG.maxBlocksAsk();
|
||||
List<byte[]> hashes = this.blockchain.getQueue().getHashes(blocksPerPeer);
|
||||
List<byte[]> hashes = queue.getHashes(blocksPerPeer);
|
||||
|
||||
GetBlocksMessage msg = new GetBlocksMessage(hashes);
|
||||
msgQueue.sendMessage(msg);
|
||||
|
@ -238,20 +274,22 @@ public class EthHandler extends SimpleChannelInboundHandler<Message> {
|
|||
// TODO: Send block hashes
|
||||
}
|
||||
|
||||
private void startTimers() {
|
||||
timer.scheduleAtFixedRate(new TimerTask() {
|
||||
private void startTxTimer() {
|
||||
getTxTimer.scheduleAtFixedRate(new TimerTask() {
|
||||
public void run() {
|
||||
sendGetTransactions();
|
||||
}
|
||||
}, 2000, 10000);
|
||||
|
||||
}
|
||||
|
||||
public void startGetBlockTimer() {
|
||||
getBlocksTimer.scheduleAtFixedRate(new TimerTask() {
|
||||
public void run() {
|
||||
sendGetBlocks();
|
||||
}
|
||||
}, 1000, secToAskForBlocks * 1000);
|
||||
}
|
||||
|
||||
|
||||
private void updateGetBlocksTimer(int seconds) {
|
||||
secToAskForBlocks = seconds;
|
||||
getBlocksTimer.cancel();
|
||||
|
@ -263,12 +301,25 @@ public class EthHandler extends SimpleChannelInboundHandler<Message> {
|
|||
}
|
||||
}, 3000, secToAskForBlocks * 1000);
|
||||
}
|
||||
|
||||
public void killTimers(){
|
||||
|
||||
private void stopGetHashesTimer() {
|
||||
getHashesTimer.cancel();
|
||||
getHashesTimer.purge();
|
||||
}
|
||||
|
||||
private void stopGetBlocksTimer() {
|
||||
getBlocksTimer.cancel();
|
||||
getBlocksTimer.purge();
|
||||
}
|
||||
|
||||
timer.cancel();
|
||||
timer.purge();
|
||||
private void stopGetTxTimer() {
|
||||
getTxTimer.cancel();
|
||||
getTxTimer.purge();
|
||||
}
|
||||
|
||||
public void killTimers(){
|
||||
stopGetBlocksTimer();
|
||||
stopGetHashesTimer();
|
||||
stopGetTxTimer();
|
||||
}
|
||||
}
|
|
@ -59,28 +59,39 @@ public class P2pHandler extends SimpleChannelInboundHandler<Message> {
|
|||
this();
|
||||
this.peerListener = peerListener;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
||||
msgQueue = new MessageQueue(ctx, peerListener);
|
||||
// Send HELLO once when channel connection has been established
|
||||
msgQueue.sendMessage(HELLO_MESSAGE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelRead0(final ChannelHandlerContext ctx, Message msg) throws InterruptedException {
|
||||
logger.trace("Read channel for {}", ctx.channel().remoteAddress());
|
||||
|
||||
msgQueue.receivedMessage(msg);
|
||||
|
||||
switch (msg.getCommand()) {
|
||||
case HELLO:
|
||||
msgQueue.receivedMessage(msg);
|
||||
setHandshake((HelloMessage)msg, ctx);
|
||||
break;
|
||||
case DISCONNECT:
|
||||
msgQueue.receivedMessage(msg);
|
||||
break;
|
||||
case PING:
|
||||
msgQueue.receivedMessage(msg);
|
||||
msgQueue.sendMessage(PONG_MESSAGE);
|
||||
break;
|
||||
case PONG:
|
||||
msgQueue.receivedMessage(msg);
|
||||
break;
|
||||
case GET_PEERS:
|
||||
msgQueue.receivedMessage(msg);
|
||||
sendPeers();
|
||||
break;
|
||||
case PEERS:
|
||||
msgQueue.receivedMessage(msg);
|
||||
processPeers((PeersMessage)msg);
|
||||
break;
|
||||
default:
|
||||
|
@ -97,12 +108,10 @@ public class P2pHandler extends SimpleChannelInboundHandler<Message> {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void handlerAdded(ChannelHandlerContext ctx) {
|
||||
msgQueue = new MessageQueue(ctx, peerListener);
|
||||
// Send HELLO once when channel connection has been established
|
||||
msgQueue.sendMessage(HELLO_MESSAGE);
|
||||
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
||||
this.killTimers();
|
||||
}
|
||||
|
||||
|
||||
private void processPeers(PeersMessage peersMessage) {
|
||||
peerDiscovery.addPeers(peersMessage.getPeers());
|
||||
}
|
||||
|
@ -117,7 +126,7 @@ public class P2pHandler extends SimpleChannelInboundHandler<Message> {
|
|||
msgQueue.sendMessage(new DisconnectMessage(ReasonCode.INCOMPATIBLE_PROTOCOL));
|
||||
else {
|
||||
if(msg.getCapabilities().contains("eth"))
|
||||
ctx.pipeline().addLast(new EthHandler(peerListener)).fireChannelActive();
|
||||
ctx.pipeline().addLast(new EthHandler(peerListener)).fireChannelReadComplete();
|
||||
|
||||
InetAddress address = ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress();
|
||||
int port = msg.getListenPort();
|
||||
|
|
|
@ -4,6 +4,8 @@ import io.netty.buffer.ByteBuf;
|
|||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.handler.codec.ByteToMessageDecoder;
|
||||
|
||||
import org.ethereum.net.message.Command;
|
||||
import org.ethereum.net.message.Message;
|
||||
import org.ethereum.net.message.MessageFactory;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -27,11 +29,20 @@ public class PacketDecoder extends ByteToMessageDecoder {
|
|||
|
||||
byte[] encoded = new byte[in.readInt()];
|
||||
in.readBytes(encoded);
|
||||
|
||||
|
||||
if (logger.isDebugEnabled())
|
||||
logger.debug("Encoded: [{}]", Hex.toHexString(encoded));
|
||||
|
||||
out.add(MessageFactory.createMessage(encoded));
|
||||
Message msg = MessageFactory.createMessage(encoded);
|
||||
|
||||
if (logger.isInfoEnabled())
|
||||
// && msg.getCommand() != Command.PING
|
||||
// && msg.getCommand() != Command.PONG
|
||||
// && msg.getCommand() != Command.PEERS
|
||||
// && msg.getCommand() != Command.GET_PEERS)
|
||||
logger.info("From: \t{} \tRecv: \t{}", ctx.channel().remoteAddress(), msg);
|
||||
|
||||
out.add(msg);
|
||||
}
|
||||
|
||||
private boolean isValidEthereumPacket(ByteBuf in) {
|
||||
|
|
|
@ -1,10 +1,10 @@
|
|||
package org.ethereum.net.handler;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.ChannelHandler;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.handler.codec.MessageToByteEncoder;
|
||||
|
||||
import org.ethereum.net.message.Command;
|
||||
import org.ethereum.net.message.Message;
|
||||
import org.ethereum.net.message.StaticMessages;
|
||||
import org.ethereum.util.ByteUtil;
|
||||
|
@ -21,9 +21,19 @@ public class PacketEncoder extends MessageToByteEncoder<Message> {
|
|||
|
||||
@Override
|
||||
protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
|
||||
|
||||
if (logger.isInfoEnabled())
|
||||
// && msg.getCommand() != Command.PING
|
||||
// && msg.getCommand() != Command.PONG
|
||||
// && msg.getCommand() != Command.PEERS
|
||||
// && msg.getCommand() != Command.GET_PEERS)
|
||||
logger.info("To: \t{} \tSend: \t{}", ctx.channel().remoteAddress(), msg);
|
||||
|
||||
byte[] encoded = msg.getEncoded();
|
||||
|
||||
if (logger.isDebugEnabled())
|
||||
logger.debug("Encoded: [{}]", Hex.toHexString(encoded));
|
||||
|
||||
out.capacity(encoded.length + 8);
|
||||
out.writeBytes(StaticMessages.SYNC_TOKEN);
|
||||
out.writeBytes(ByteUtil.calcPacketLength(encoded));
|
||||
|
|
|
@ -8,7 +8,7 @@ import java.util.List;
|
|||
import org.ethereum.util.RLP;
|
||||
import org.ethereum.util.RLPItem;
|
||||
import org.ethereum.util.RLPList;
|
||||
import org.spongycastle.util.encoders.Hex;
|
||||
import org.ethereum.util.Utils;
|
||||
|
||||
/**
|
||||
* Wrapper around an Ethereum BlockHashes message on the network
|
||||
|
@ -17,14 +17,15 @@ import org.spongycastle.util.encoders.Hex;
|
|||
*/
|
||||
public class BlockHashesMessage extends Message {
|
||||
|
||||
private List<byte[]> hashes;
|
||||
/** List of block hashes from the peer ordered from child to parent */
|
||||
private List<byte[]> blockHashes;
|
||||
|
||||
public BlockHashesMessage(byte[] payload) {
|
||||
super(payload);
|
||||
}
|
||||
|
||||
public BlockHashesMessage(List<byte[]> blockHashes) {
|
||||
this.hashes = blockHashes;
|
||||
this.blockHashes = blockHashes;
|
||||
parsed = true;
|
||||
}
|
||||
|
||||
|
@ -32,10 +33,10 @@ public class BlockHashesMessage extends Message {
|
|||
RLPList paramsList = (RLPList) RLP.decode2(encoded).get(0);
|
||||
validateMessage(paramsList, BLOCK_HASHES);
|
||||
|
||||
hashes = new ArrayList<>();
|
||||
blockHashes = new ArrayList<>();
|
||||
for (int i = 1; i < paramsList.size(); ++i) {
|
||||
RLPItem rlpData = ((RLPItem) paramsList.get(i));
|
||||
hashes.add(rlpData.getRLPData());
|
||||
blockHashes.add(rlpData.getRLPData());
|
||||
}
|
||||
parsed = true;
|
||||
}
|
||||
|
@ -43,8 +44,8 @@ public class BlockHashesMessage extends Message {
|
|||
private void encode() {
|
||||
List<byte[]> encodedElements = new ArrayList<>();
|
||||
encodedElements.add(RLP.encodeByte(BLOCK_HASHES.asByte()));
|
||||
for (byte[] hash : hashes)
|
||||
encodedElements.add(RLP.encodeElement(hash));
|
||||
for (byte[] blockHash : blockHashes)
|
||||
encodedElements.add(RLP.encodeElement(blockHash));
|
||||
byte[][] encodedElementArray = encodedElements
|
||||
.toArray(new byte[encodedElements.size()][]);
|
||||
this.encoded = RLP.encodeList(encodedElementArray);
|
||||
|
@ -66,18 +67,16 @@ public class BlockHashesMessage extends Message {
|
|||
return null;
|
||||
}
|
||||
|
||||
public List<byte[]> getHashes() {
|
||||
public List<byte[]> getBlockHashes() {
|
||||
if (!parsed) parse();
|
||||
return hashes;
|
||||
return blockHashes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
if (!parsed) parse();
|
||||
StringBuffer sb = new StringBuffer();
|
||||
for (byte[] hash : this.hashes) {
|
||||
sb.append("\n ").append(Hex.toHexString(hash));
|
||||
}
|
||||
return "[" + this.getCommand().name() + sb.toString() + "]";
|
||||
|
||||
StringBuffer sb = Utils.getHashlistShort(this.blockHashes);
|
||||
return "[" + this.getCommand().name() + sb.toString() + "] (" + this.blockHashes.size() + ")";
|
||||
}
|
||||
}
|
||||
|
|
|
@ -15,8 +15,8 @@ import org.spongycastle.util.encoders.Hex;
|
|||
*/
|
||||
public class GetBlockHashesMessage extends Message {
|
||||
|
||||
/** The hash from the block of which parent hash to start sending */
|
||||
private byte[] hash;
|
||||
/** The newest block hash from which to start sending older hashes */
|
||||
private byte[] bestHash;
|
||||
|
||||
/** The maximum number of blocks to return.
|
||||
* <b>Note:</b> the peer could return fewer. */
|
||||
|
@ -27,7 +27,7 @@ public class GetBlockHashesMessage extends Message {
|
|||
}
|
||||
|
||||
public GetBlockHashesMessage(byte[] hash, int maxBlocks) {
|
||||
this.hash = hash;
|
||||
this.bestHash = hash;
|
||||
this.maxBlocks = maxBlocks;
|
||||
parsed = true;
|
||||
encode();
|
||||
|
@ -35,7 +35,7 @@ public class GetBlockHashesMessage extends Message {
|
|||
|
||||
private void encode() {
|
||||
byte[] command = RLP.encodeByte(GET_BLOCK_HASHES.asByte());
|
||||
byte[] hash = RLP.encodeElement(this.hash);
|
||||
byte[] hash = RLP.encodeElement(this.bestHash);
|
||||
byte[] maxBlocks = RLP.encodeInt(this.maxBlocks);
|
||||
this.encoded = RLP.encodeList(command, hash, maxBlocks);
|
||||
}
|
||||
|
@ -44,7 +44,7 @@ public class GetBlockHashesMessage extends Message {
|
|||
RLPList paramsList = (RLPList) RLP.decode2(encoded).get(0);
|
||||
validateMessage(paramsList, GET_BLOCK_HASHES);
|
||||
|
||||
this.hash = ((RLPItem) paramsList.get(1)).getRLPData();
|
||||
this.bestHash = ((RLPItem) paramsList.get(1)).getRLPData();
|
||||
byte[] maxBlocksBytes = ((RLPItem) paramsList.get(2)).getRLPData();
|
||||
this.maxBlocks = ByteUtil.byteArrayToInt(maxBlocksBytes);
|
||||
|
||||
|
@ -67,9 +67,9 @@ public class GetBlockHashesMessage extends Message {
|
|||
return BlockHashesMessage.class;
|
||||
}
|
||||
|
||||
public byte[] getHash() {
|
||||
public byte[] getBestHash() {
|
||||
if (!parsed) parse();
|
||||
return hash;
|
||||
return bestHash;
|
||||
}
|
||||
|
||||
public int getMaxBlocks() {
|
||||
|
@ -81,7 +81,7 @@ public class GetBlockHashesMessage extends Message {
|
|||
public String toString() {
|
||||
if (!parsed) parse();
|
||||
return "[" + this.getCommand().name() +
|
||||
" hash=" + Hex.toHexString(hash) +
|
||||
" bestHash=" + Hex.toHexString(bestHash) +
|
||||
" maxBlocks=" + maxBlocks + "]";
|
||||
}
|
||||
}
|
|
@ -8,7 +8,7 @@ import java.util.List;
|
|||
import org.ethereum.util.RLP;
|
||||
import org.ethereum.util.RLPItem;
|
||||
import org.ethereum.util.RLPList;
|
||||
import org.spongycastle.util.encoders.Hex;
|
||||
import org.ethereum.util.Utils;
|
||||
|
||||
/**
|
||||
* Wrapper around an Ethereum GetBlocks message on the network
|
||||
|
@ -17,6 +17,7 @@ import org.spongycastle.util.encoders.Hex;
|
|||
*/
|
||||
public class GetBlocksMessage extends Message {
|
||||
|
||||
/** List of block hashes for which to retrieve the blocks */
|
||||
private List<byte[]> blockHashes;
|
||||
|
||||
public GetBlocksMessage(byte[] encoded) {
|
||||
|
@ -73,10 +74,7 @@ public class GetBlocksMessage extends Message {
|
|||
public String toString() {
|
||||
if (!parsed) parse();
|
||||
|
||||
StringBuffer sb = new StringBuffer();
|
||||
for (byte[] blockHash : blockHashes) {
|
||||
sb.append(Hex.toHexString(blockHash)).append("\n ");
|
||||
}
|
||||
return "[" + this.getCommand().name() + sb.toString() + "]";
|
||||
StringBuffer sb = Utils.getHashlistShort(this.blockHashes);
|
||||
return "[" + this.getCommand().name() + sb.toString() + "] (" + this.blockHashes.size() + ")";
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,18 +26,18 @@ public class MessageFactory {
|
|||
|
||||
case STATUS:
|
||||
return new StatusMessage(encoded);
|
||||
case TRANSACTIONS:
|
||||
return new TransactionsMessage(encoded);
|
||||
case BLOCKS:
|
||||
return new BlocksMessage(encoded);
|
||||
case GET_TRANSACTIONS:
|
||||
return StaticMessages.GET_TRANSACTIONS_MESSAGE;
|
||||
case TRANSACTIONS:
|
||||
return new TransactionsMessage(encoded);
|
||||
case GET_BLOCK_HASHES:
|
||||
return new GetBlockHashesMessage(encoded);
|
||||
case BLOCK_HASHES:
|
||||
return new BlockHashesMessage(encoded);
|
||||
case GET_BLOCKS:
|
||||
return new GetBlocksMessage(encoded);
|
||||
case BLOCKS:
|
||||
return new BlocksMessage(encoded);
|
||||
default:
|
||||
throw new IllegalArgumentException("No such message");
|
||||
}
|
||||
|
|
|
@ -113,8 +113,8 @@ public class StatusMessage extends Message {
|
|||
" protocolVersion=" + this.protocolVersion +
|
||||
" networkId=" + this.networkId +
|
||||
" totalDifficulty=" + ByteUtil.toHexString(this.totalDifficulty) +
|
||||
" bestHash=" + Hex.toHexString(this.bestHash) + " " +
|
||||
" genesisHash=" + Hex.toHexString(this.genesisHash) + " " +
|
||||
" bestHash=" + Hex.toHexString(this.bestHash) +
|
||||
" genesisHash=" + Hex.toHexString(this.genesisHash) +
|
||||
"]";
|
||||
}
|
||||
}
|
||||
|
|
|
@ -10,6 +10,7 @@ import java.security.SecureRandom;
|
|||
import java.text.DateFormat;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import javax.swing.ImageIcon;
|
||||
|
@ -143,4 +144,11 @@ public class Utils {
|
|||
return result;
|
||||
}
|
||||
|
||||
public static StringBuffer getHashlistShort(List<byte[]> blockHashes) {
|
||||
StringBuffer sb = new StringBuffer();
|
||||
String firstHash = Hex.toHexString(blockHashes.get(0));
|
||||
String lastHash = Hex.toHexString(blockHashes.get(blockHashes.size()-1));
|
||||
return sb.append(" ").append(firstHash).append("...").append(lastHash);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -25,7 +25,7 @@ public class BlockHashesMessageTest {
|
|||
System.out.println(blockHashesMessage);
|
||||
|
||||
assertEquals(Command.BLOCK_HASHES, blockHashesMessage.getCommand());
|
||||
assertEquals(128, blockHashesMessage.getHashes().size());
|
||||
assertEquals(128, blockHashesMessage.getBlockHashes().size());
|
||||
// TODO maybe also assert values for individual hashes
|
||||
assertEquals(null, blockHashesMessage.getAnswerMessage());
|
||||
}
|
||||
|
@ -43,7 +43,7 @@ public class BlockHashesMessageTest {
|
|||
assertEquals(expected, Hex.toHexString(blockHashesMessage.getEncoded()));
|
||||
|
||||
assertEquals(Command.BLOCK_HASHES, blockHashesMessage.getCommand());
|
||||
assertEquals(2, blockHashesMessage.getHashes().size());
|
||||
assertEquals(2, blockHashesMessage.getBlockHashes().size());
|
||||
// TODO maybe also assert values for individual hashes
|
||||
assertEquals(null, blockHashesMessage.getAnswerMessage());
|
||||
}
|
||||
|
|
|
@ -21,7 +21,7 @@ public class GetBlockHashesMessageTest {
|
|||
System.out.println(getBlockHashesMessage);
|
||||
|
||||
assertEquals(Command.GET_BLOCK_HASHES, getBlockHashesMessage.getCommand());
|
||||
assertEquals("5ad1c9caeade4cdf5798e796dc87939231d9c76c20a6a33fea6dab8e9d6dd009", Hex.toHexString(getBlockHashesMessage.getHash()));
|
||||
assertEquals("5ad1c9caeade4cdf5798e796dc87939231d9c76c20a6a33fea6dab8e9d6dd009", Hex.toHexString(getBlockHashesMessage.getBestHash()));
|
||||
assertEquals(256, getBlockHashesMessage.getMaxBlocks());
|
||||
assertEquals(BlockHashesMessage.class, getBlockHashesMessage.getAnswerMessage());
|
||||
}
|
||||
|
@ -36,7 +36,7 @@ public class GetBlockHashesMessageTest {
|
|||
assertEquals(expected, Hex.toHexString(getBlockHashesMessage.getEncoded()));
|
||||
|
||||
assertEquals(Command.GET_BLOCK_HASHES, getBlockHashesMessage.getCommand());
|
||||
assertEquals(Hex.toHexString(bestHash), Hex.toHexString(getBlockHashesMessage.getHash()));
|
||||
assertEquals(Hex.toHexString(bestHash), Hex.toHexString(getBlockHashesMessage.getBestHash()));
|
||||
assertEquals(128, getBlockHashesMessage.getMaxBlocks());
|
||||
assertEquals(BlockHashesMessage.class, getBlockHashesMessage.getAnswerMessage());
|
||||
}
|
||||
|
|
|
@ -9,16 +9,16 @@ peer.discovery.ip.list = poc-6.ethdev.com:30303,\
|
|||
54.204.10.41:30303
|
||||
|
||||
# Peer Server Zero (poc-6.ethdev.com)
|
||||
peer.active.ip = 207.12.89.101
|
||||
peer.active.port = 30303
|
||||
#peer.active.ip = 207.12.89.101
|
||||
#peer.active.port = 30303
|
||||
|
||||
# ZeroGox
|
||||
#peer.active.ip = 54.204.10.41
|
||||
#peer.active.port = 30303
|
||||
|
||||
# Winslave
|
||||
#peer.active.ip = 185.43.109.23
|
||||
#peer.active.port = 30303
|
||||
peer.active.ip = 185.43.109.23
|
||||
peer.active.port = 30303
|
||||
|
||||
# Mist
|
||||
#peer.active.ip = 213.46.28.223
|
||||
|
@ -26,7 +26,7 @@ peer.active.port = 30303
|
|||
|
||||
# VM
|
||||
#peer.active.ip = 192.168.1.193
|
||||
#peer.active.port = 30303
|
||||
#peer.active.port = 30305
|
||||
|
||||
# Localhost
|
||||
#peer.active.ip = 127.0.0.1
|
||||
|
|
Loading…
Reference in New Issue