Retrieve block hashes and fix separated handlers

This commit is contained in:
nicksavers 2014-10-06 11:23:28 +02:00
parent ea5214912f
commit 687ac2d535
22 changed files with 111 additions and 76 deletions

View File

@ -7,6 +7,7 @@ import org.slf4j.LoggerFactory;
import java.math.BigInteger;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
@ -17,7 +18,7 @@ public class BlockQueue {
private static Logger logger = LoggerFactory.getLogger("blockchain");
private Deque<byte[]> blockHashQueue = new ArrayDeque<>();
private Deque<byte[]> blockHashQueue = new ConcurrentLinkedDeque<>();
private Queue<Block> blockReceivedQueue = new ConcurrentLinkedQueue<>();
private BigInteger highestTotalDifficulty;
private Block lastBlock;
@ -90,7 +91,7 @@ public class BlockQueue {
List<byte[]> hashes = new ArrayList<>();
for (int i = 0; i < amount; i++) {
if (!blockHashQueue.isEmpty())
hashes.add(blockHashQueue.poll());
hashes.add(blockHashQueue.pollLast());
else break;
}
return hashes;

View File

@ -19,7 +19,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
* GET_BLOCK_HASHES by BLOCK_HASHES
* GET_BLOCKS by BLOCKS
*
* The following messages will not be answered:
* The following messages will not be answered:
* PONG, PEERS, HELLO, STATUS, TRANSACTIONS, BLOCKS
*
* @author Roman Mandeleil

View File

@ -8,6 +8,7 @@ import io.netty.handler.timeout.ReadTimeoutHandler;
import org.ethereum.manager.WorldManager;
import org.ethereum.net.PeerListener;
import org.ethereum.net.handler.EthHandler;
import org.ethereum.net.handler.P2pHandler;
import org.ethereum.net.handler.PacketDecoder;
import org.ethereum.net.handler.PacketEncoder;
@ -29,6 +30,7 @@ public class PeerClient {
private PeerListener peerListener;
private P2pHandler p2pHandler;
private EthHandler ethHandler;
public PeerClient() {
}
@ -45,6 +47,7 @@ public class PeerClient {
peerListener.console("Connecting to: " + host + ":" + port);
p2pHandler = new P2pHandler(peerListener);
ethHandler = new EthHandler(peerListener);
try {
Bootstrap b = new Bootstrap();
@ -61,9 +64,11 @@ public class PeerClient {
public void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast("readTimeoutHandler",
new ReadTimeoutHandler(CONFIG.peerChannelReadTimeout(), TimeUnit.SECONDS));
ch.pipeline().addLast(new PacketDecoder());
ch.pipeline().addLast(new PacketEncoder());
ch.pipeline().addLast(p2pHandler);
ch.pipeline().addLast(new PacketDecoder());
ch.pipeline().addLast("p2p", p2pHandler);
ch.pipeline().addLast("eth", ethHandler);
// limit the size of receiving buffer to 1024
ch.config().setRecvByteBufAllocator(new FixedRecvByteBufAllocator(32368));
ch.config().setOption(ChannelOption.SO_RCVBUF, 32368);

View File

@ -1,5 +1,6 @@
package org.ethereum.net.client;
import org.ethereum.net.message.StaticMessages;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -75,11 +76,13 @@ public class PeerDiscovery {
*/
public void addPeers(final Set<Peer> newPeers) {
synchronized (peers) {
for (final Peer newPeer : newPeers) {
if(!peers.contains(newPeer))
startWorker(newPeer);
peers.add(newPeer);
}
for (final Peer newPeer : newPeers) {
if (!StaticMessages.PEER_ID.equals(newPeer.getPeerId())) {
if (!peers.contains(newPeer))
startWorker(newPeer);
peers.add(newPeer);
}
}
}
}

View File

@ -12,6 +12,7 @@ import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
@ -25,9 +26,9 @@ import org.ethereum.net.PeerListener;
import org.ethereum.net.message.BlockHashesMessage;
import org.ethereum.net.message.BlocksMessage;
import org.ethereum.net.message.DisconnectMessage;
import org.ethereum.net.message.EthMessage;
import org.ethereum.net.message.GetBlockHashesMessage;
import org.ethereum.net.message.GetBlocksMessage;
import org.ethereum.net.message.Message;
import org.ethereum.net.message.ReasonCode;
import org.ethereum.net.message.StatusMessage;
import org.ethereum.net.message.TransactionsMessage;
@ -50,7 +51,8 @@ import org.slf4j.LoggerFactory;
* <li>BLOCKS : Send a list of blocks</li>
* </ul>
*/
public class EthHandler extends SimpleChannelInboundHandler<Message> {
@ChannelHandler.Sharable
public class EthHandler extends SimpleChannelInboundHandler<EthMessage> {
private final static Logger logger = LoggerFactory.getLogger("wire");
@ -74,9 +76,15 @@ public class EthHandler extends SimpleChannelInboundHandler<Message> {
this();
this.peerListener = peerListener;
}
@Override
public void channelRead0(final ChannelHandlerContext ctx, Message msg) throws InterruptedException {
public void channelActive(ChannelHandlerContext ctx) throws Exception {
msgQueue = new MessageQueue(ctx, peerListener);
sendStatus();
}
@Override
public void channelRead0(final ChannelHandlerContext ctx, EthMessage msg) throws InterruptedException {
logger.trace("Read channel for {}", ctx.channel().remoteAddress());
switch (msg.getCommand()) {
@ -110,7 +118,7 @@ public class EthHandler extends SimpleChannelInboundHandler<Message> {
break;
case BLOCKS:
msgQueue.receivedMessage(msg);
// processBlocks((BlocksMessage)msg);
processBlocks((BlocksMessage)msg);
break;
default:
break;
@ -130,13 +138,6 @@ public class EthHandler extends SimpleChannelInboundHandler<Message> {
this.killTimers();
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
msgQueue = new MessageQueue(ctx, peerListener);
// Send STATUS once when channel connection has been established
sendStatus();
}
/**
* Processing:
* <ul>
@ -149,8 +150,10 @@ public class EthHandler extends SimpleChannelInboundHandler<Message> {
* @param ctx the ChannelHandlerContext
*/
private void processStatus(StatusMessage msg, ChannelHandlerContext ctx) {
if (!Arrays.equals(msg.getGenesisHash(), Blockchain.GENESIS_HASH) || msg.getProtocolVersion() != 33) {
logger.info("Removing EthHandler due to protocol incompatibility");
if (!Arrays.equals(msg.getGenesisHash(), Blockchain.GENESIS_HASH)
|| msg.getProtocolVersion() != CONFIG.protocolVersion()) {
logger.info("Removing EthHandler for {} due to protocol incompatibility", ctx.channel().remoteAddress());
// msgQueue.sendMessage(new DisconnectMessage(ReasonCode.INCOMPATIBLE_NETWORK));
ctx.pipeline().remove(this); // Peer is not compatible for the 'eth' sub-protocol
} else if (msg.getNetworkId() != 0)
msgQueue.sendMessage(new DisconnectMessage(ReasonCode.INCOMPATIBLE_NETWORK));
@ -176,7 +179,7 @@ public class EthHandler extends SimpleChannelInboundHandler<Message> {
// result is empty, peer has no more hashes
if (receivedHashes.isEmpty()) {
// startGetBlockTimer(); // start getting blocks from hash queue
startGetBlockTimer(); // start getting blocks from hash queue
return;
}
@ -188,7 +191,7 @@ public class EthHandler extends SimpleChannelInboundHandler<Message> {
chainQueue.addHash(foundHash); // store unknown hashes in queue until known hash is found
else {
// if known hash is found, ignore the rest
// startGetBlockTimer(); // start getting blocks from hash queue
startGetBlockTimer(); // start getting blocks from hash queue
return;
}
}
@ -213,7 +216,7 @@ public class EthHandler extends SimpleChannelInboundHandler<Message> {
if (blockList.isEmpty()) return;
this.blockchain.getQueue().addBlocks(blockList);
}
private void sendStatus() {
byte protocolVersion = 33, networkId = 0;
BigInteger totalDifficulty = this.blockchain.getTotalDifficulty();

View File

@ -10,6 +10,7 @@ import java.net.InetSocketAddress;
import java.util.Timer;
import java.util.TimerTask;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
@ -20,7 +21,7 @@ import org.ethereum.net.client.Peer;
import org.ethereum.net.client.PeerDiscovery;
import org.ethereum.net.message.DisconnectMessage;
import org.ethereum.net.message.HelloMessage;
import org.ethereum.net.message.Message;
import org.ethereum.net.message.P2pMessage;
import org.ethereum.net.message.PeersMessage;
import org.ethereum.net.message.ReasonCode;
import org.slf4j.Logger;
@ -39,7 +40,8 @@ import org.slf4j.LoggerFactory;
* <li>PONG : Confirm that they themselves are still alive</li>
* </ul>
*/
public class P2pHandler extends SimpleChannelInboundHandler<Message> {
@ChannelHandler.Sharable
public class P2pHandler extends SimpleChannelInboundHandler<P2pMessage> {
private final static Logger logger = LoggerFactory.getLogger("wire");
@ -50,7 +52,7 @@ public class P2pHandler extends SimpleChannelInboundHandler<Message> {
private MessageQueue msgQueue = null;
private boolean tearDown = false;
public P2pHandler() {
this.peerDiscovery = WorldManager.getInstance().getPeerDiscovery();
}
@ -65,16 +67,17 @@ public class P2pHandler extends SimpleChannelInboundHandler<Message> {
msgQueue = new MessageQueue(ctx, peerListener);
// Send HELLO once when channel connection has been established
msgQueue.sendMessage(HELLO_MESSAGE);
startTimers();
}
@Override
public void channelRead0(final ChannelHandlerContext ctx, Message msg) throws InterruptedException {
public void channelRead0(final ChannelHandlerContext ctx, P2pMessage msg) throws InterruptedException {
logger.trace("Read channel for {}", ctx.channel().remoteAddress());
switch (msg.getCommand()) {
case HELLO:
msgQueue.receivedMessage(msg);
setHandshake((HelloMessage)msg, ctx);
setHandshake((HelloMessage) msg, ctx);
break;
case DISCONNECT:
msgQueue.receivedMessage(msg);
@ -92,27 +95,27 @@ public class P2pHandler extends SimpleChannelInboundHandler<Message> {
break;
case PEERS:
msgQueue.receivedMessage(msg);
processPeers((PeersMessage)msg);
processPeers(ctx, (PeersMessage) msg);
break;
default:
ctx.fireChannelRead(msg);
break;
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
this.killTimers();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error(cause.getCause().toString());
super.exceptionCaught(ctx, cause);
ctx.close();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
this.killTimers();
}
private void processPeers(PeersMessage peersMessage) {
private void processPeers(ChannelHandlerContext ctx, PeersMessage peersMessage) {
peerDiscovery.addPeers(peersMessage.getPeers());
}
@ -121,23 +124,24 @@ public class P2pHandler extends SimpleChannelInboundHandler<Message> {
msgQueue.sendMessage(msg);
}
private void setHandshake(HelloMessage msg, ChannelHandlerContext ctx) {
if (msg.getP2PVersion() != 0)
msgQueue.sendMessage(new DisconnectMessage(ReasonCode.INCOMPATIBLE_PROTOCOL));
else {
if(msg.getCapabilities().contains("eth"))
ctx.pipeline().addLast(new EthHandler(peerListener)).fireChannelReadComplete();
private void setHandshake(HelloMessage msg, ChannelHandlerContext ctx) {
if (msg.getP2PVersion() != 0)
msgQueue.sendMessage(new DisconnectMessage(ReasonCode.INCOMPATIBLE_PROTOCOL));
else {
if (msg.getCapabilities().contains("eth")) {
// Activate EthHandler for this peer
ctx.fireChannelActive();
}
InetAddress address = ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress();
int port = msg.getListenPort();
byte[] peerId = msg.getPeerId();
Peer confirmedPeer = new Peer(address, port, peerId);
confirmedPeer.setOnline(true);
confirmedPeer.getCapabilities().addAll(msg.getCapabilities());
WorldManager.getInstance().getPeerDiscovery().getPeers().add(confirmedPeer);
startTimers();
}
}
InetAddress address = ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress();
int port = msg.getListenPort();
byte[] peerId = msg.getPeerId();
Peer confirmedPeer = new Peer(address, port, peerId);
confirmedPeer.setOnline(true);
confirmedPeer.getCapabilities().addAll(msg.getCapabilities());
WorldManager.getInstance().getPeerDiscovery().getPeers().add(confirmedPeer);
}
}
private void startTimers() {
// sample for pinging in background
@ -152,7 +156,7 @@ public class P2pHandler extends SimpleChannelInboundHandler<Message> {
public void run() {
msgQueue.sendMessage(GET_PEERS_MESSAGE);
}
}, 2000, 6000);
}, 2000, 25000);
}
public void killTimers(){

View File

@ -15,7 +15,7 @@ import org.ethereum.util.Utils;
*
* @see {@link org.ethereum.net.message.Command#BLOCK_HASHES}
*/
public class BlockHashesMessage extends Message {
public class BlockHashesMessage extends EthMessage {
/** List of block hashes from the peer ordered from child to parent */
private List<byte[]> blockHashes;

View File

@ -14,7 +14,7 @@ import org.ethereum.util.RLPList;
*
* @see {@link org.ethereum.net.message.Command#BLOCKS}
*/
public class BlocksMessage extends Message {
public class BlocksMessage extends EthMessage {
private List<Block> blocks;

View File

@ -12,7 +12,7 @@ import static org.ethereum.net.message.ReasonCode.REQUESTED;
*
* @see {@link org.ethereum.net.message.Command#DISCONNECT}
*/
public class DisconnectMessage extends Message {
public class DisconnectMessage extends P2pMessage {
private ReasonCode reason;

View File

@ -0,0 +1,10 @@
package org.ethereum.net.message;
public abstract class EthMessage extends Message {
public EthMessage() {}
public EthMessage(byte[] encoded) {
super(encoded);
}
}

View File

@ -13,7 +13,7 @@ import org.spongycastle.util.encoders.Hex;
*
* @see {@link org.ethereum.net.message.Command#GET_BLOCK_HASHES}
*/
public class GetBlockHashesMessage extends Message {
public class GetBlockHashesMessage extends EthMessage {
/** The newest block hash from which to start sending older hashes */
private byte[] bestHash;

View File

@ -15,7 +15,7 @@ import org.ethereum.util.Utils;
*
* @see {@link org.ethereum.net.message.Command#GET_BLOCKS}
*/
public class GetBlocksMessage extends Message {
public class GetBlocksMessage extends EthMessage {
/** List of block hashes for which to retrieve the blocks */
private List<byte[]> blockHashes;

View File

@ -9,7 +9,7 @@ import org.spongycastle.util.encoders.Hex;
*
* @see {@link org.ethereum.net.message.Command#GET_PEERS}
*/
public class GetPeersMessage extends Message {
public class GetPeersMessage extends P2pMessage {
/** GetPeers message is always a the same single command payload */
private final static byte[] FIXED_PAYLOAD = Hex.decode("C104");

View File

@ -9,7 +9,7 @@ import org.spongycastle.util.encoders.Hex;
*
* @see {@link org.ethereum.net.message.Command#GET_TRANSACTIONS}
*/
public class GetTransactionsMessage extends Message {
public class GetTransactionsMessage extends EthMessage {
/** GetTransactions message is always a the same single command payload */
private static byte[] FIXED_PAYLOAD = Hex.decode("C116");

View File

@ -19,7 +19,7 @@ import java.util.List;
*
* @see {@link org.ethereum.net.message.Command#HELLO}
*/
public class HelloMessage extends Message {
public class HelloMessage extends P2pMessage {
/** The implemented version of the P2P protocol. */
private byte p2pVersion;

View File

@ -0,0 +1,10 @@
package org.ethereum.net.message;
public abstract class P2pMessage extends Message {
public P2pMessage() {}
public P2pMessage(byte[] encoded) {
super(encoded);
}
}

View File

@ -20,7 +20,7 @@ import org.ethereum.util.RLPList;
*
* @see {@link org.ethereum.net.message.Command#PEERS}
*/
public class PeersMessage extends Message {
public class PeersMessage extends P2pMessage {
private boolean parsed = false;

View File

@ -9,7 +9,7 @@ import org.spongycastle.util.encoders.Hex;
*
* @see {@link org.ethereum.net.message.Command#PING}
*/
public class PingMessage extends Message {
public class PingMessage extends P2pMessage {
/** Ping message is always a the same single command payload */
private static byte[] FIXED_PAYLOAD = Hex.decode("C102");

View File

@ -9,7 +9,7 @@ import org.spongycastle.util.encoders.Hex;
*
* @see {@link org.ethereum.net.message.Command#PONG}
*/
public class PongMessage extends Message {
public class PongMessage extends P2pMessage {
/** Pong message is always a the same single command payload */
private static byte[] FIXED_PAYLOAD = Hex.decode("C103");

View File

@ -1,6 +1,5 @@
package org.ethereum.net.message;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@ -17,6 +16,8 @@ import org.spongycastle.util.encoders.Hex;
*/
public class StaticMessages {
public static final String PEER_ID = Hex.toHexString(HashUtil.randomPeerId());
public final static PingMessage PING_MESSAGE = new PingMessage();
public final static PongMessage PONG_MESSAGE = new PongMessage();
public final static HelloMessage HELLO_MESSAGE = generateHelloMessage();
@ -28,13 +29,11 @@ public class StaticMessages {
private static HelloMessage generateHelloMessage() {
String helloAnnouncement = buildHelloAnnouncement();
byte p2pVersion = 0x00;
List<String> capabilities = new ArrayList<>(Arrays.asList("eth"));
// List<String> capabilities = new ArrayList<>(Arrays.asList("eth"));
List<String> capabilities = Arrays.asList("eth");
int listenPort = 30303;
byte[] peerIdBytes = HashUtil.randomPeerId();
return new HelloMessage(p2pVersion, helloAnnouncement,
capabilities, listenPort, peerIdBytes);
capabilities, listenPort, Hex.decode(PEER_ID));
}
private static String buildHelloAnnouncement() {

View File

@ -13,7 +13,7 @@ import org.spongycastle.util.encoders.Hex;
*
* @see {@link org.ethereum.net.message.Command#STATUS}
*/
public class StatusMessage extends Message {
public class StatusMessage extends EthMessage {
private byte protocolVersion;
private byte networkId;

View File

@ -16,7 +16,7 @@ import java.util.Set;
*
* @see {@link org.ethereum.net.message.Command#TRANSACTIONS}
*/
public class TransactionsMessage extends Message {
public class TransactionsMessage extends EthMessage {
private Set<Transaction> transactions;