From b5b7dde0bde67b638f598d5a7bead112f75d55cf Mon Sep 17 00:00:00 2001 From: nicksavers Date: Fri, 3 Oct 2014 16:19:02 +0200 Subject: [PATCH] Prepare for block hash retrieval and move genesis hash to Blockchain interface --- .../org/ethereum/config/SystemProperties.java | 7 +- .../java/org/ethereum/core/BlockQueue.java | 49 ++++-- .../org/ethereum/core/BlockchainImpl.java | 7 +- .../java/org/ethereum/facade/Blockchain.java | 8 +- .../org/ethereum/facade/EthereumImpl.java | 4 +- .../org/ethereum/manager/WorldManager.java | 35 +---- .../java/org/ethereum/net/MessageQueue.java | 28 +++- .../org/ethereum/net/client/PeerClient.java | 4 +- .../ethereum/net/client/PeerDiscovery.java | 39 ++++- .../net/client/PeerMonitorThread.java | 3 +- .../ethereum/net/client/ProtocolHandler.java | 145 ++++++++++-------- .../org/ethereum/net/client/WorkerThread.java | 8 +- .../ethereum/net/message/StaticMessages.java | 2 - .../src/main/resources/system.properties | 11 +- 14 files changed, 215 insertions(+), 135 deletions(-) diff --git a/ethereumj-core/src/main/java/org/ethereum/config/SystemProperties.java b/ethereumj-core/src/main/java/org/ethereum/config/SystemProperties.java index f3f59f38..d47fadc8 100644 --- a/ethereumj-core/src/main/java/org/ethereum/config/SystemProperties.java +++ b/ethereumj-core/src/main/java/org/ethereum/config/SystemProperties.java @@ -37,10 +37,11 @@ public class SystemProperties { private static Boolean DEFAULT_PLAY_VM = true; private static Boolean DEFAULT_BLOCKCHAIN_ONLY = false; private static int DEFAULT_TRACE_STARTBLOCK = -1; + private static int DEFAULT_MAX_HASHES_ASK = -1; // unlimited private static byte DEFAULT_MAX_BLOCKS_ASK = 10; private static int DEFAULT_MAX_BLOCKS_QUEUED = 300; private static String DEFAULT_PROJECT_VERSION = ""; - private static String DEFAULT_HELLO_PHRASE = "RJ"; + private static String DEFAULT_HELLO_PHRASE = "Dev"; public static SystemProperties CONFIG = new SystemProperties(); @@ -195,6 +196,10 @@ public class SystemProperties { return Boolean.parseBoolean(prop.getProperty("blockchain.only")); } + public Integer maxHashesAsk() { + if(prop.isEmpty()) return DEFAULT_MAX_HASHES_ASK; + return Integer.parseInt(prop.getProperty("max.hashes.ask")); + } public Byte maxBlocksAsk() { if(prop.isEmpty()) return DEFAULT_MAX_BLOCKS_ASK; diff --git a/ethereumj-core/src/main/java/org/ethereum/core/BlockQueue.java b/ethereumj-core/src/main/java/org/ethereum/core/BlockQueue.java index bfa07148..7beb065b 100644 --- a/ethereumj-core/src/main/java/org/ethereum/core/BlockQueue.java +++ b/ethereumj-core/src/main/java/org/ethereum/core/BlockQueue.java @@ -5,6 +5,7 @@ import org.ethereum.manager.WorldManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.math.BigInteger; import java.util.*; import java.util.concurrent.ConcurrentLinkedQueue; @@ -16,7 +17,10 @@ public class BlockQueue { private static Logger logger = LoggerFactory.getLogger("blockchain"); - private Queue blockQueue = new ConcurrentLinkedQueue<>(); + private Queue blockHashQueue = new ConcurrentLinkedQueue<>(); + private Queue blockReceivedQueue = new ConcurrentLinkedQueue<>(); + private BigInteger highestTotalDifficulty; + private byte[] bestHash; private Block lastBlock; private Timer timer = new Timer("BlockQueueTimer"); @@ -30,38 +34,63 @@ public class BlockQueue { } private void nudgeQueue() { - if (blockQueue.isEmpty()) + if (blockReceivedQueue.isEmpty()) return; - Block block = blockQueue.poll(); + Block block = blockReceivedQueue.poll(); WorldManager.getInstance().getBlockchain().add(block); } public void addBlocks(List blockList) { Block lastReceivedBlock = blockList.get(blockList.size() - 1); - if (lastReceivedBlock.getNumber() != getLast().getNumber() + 1) + if (lastReceivedBlock.getNumber() != getLastBlock().getNumber() + 1) return; for (int i = blockList.size() - 1; i >= 0; --i) { - if (blockQueue.size() > SystemProperties.CONFIG.maxBlocksQueued()) + if (blockReceivedQueue.size() > SystemProperties.CONFIG.maxBlocksQueued()) return; this.lastBlock = blockList.get(i); logger.trace("Last block now index: [{}]", lastBlock.getNumber()); - blockQueue.add(lastBlock); + blockReceivedQueue.add(lastBlock); } logger.trace("Blocks waiting to be proceed in the queue: [{}]", - blockQueue.size()); + blockReceivedQueue.size()); + } + + public BigInteger getHighestTotalDifficulty() { + return highestTotalDifficulty; } - public Block getLast() { - if (blockQueue.isEmpty()) + public void setHighestTotalDifficulty(BigInteger highestTotalDifficulty) { + this.highestTotalDifficulty = highestTotalDifficulty; + } + + public Block getLastBlock() { + if (blockReceivedQueue.isEmpty()) return WorldManager.getInstance().getBlockchain().getLastBlock(); return lastBlock; } + public void setBestHash(byte[] bestHash) { + this.bestHash = bestHash; + } + + public byte[] getBestHash() { + return bestHash; + } + public List getHashes(int amount) { + List hashes = new ArrayList<>(); + for (int i = 0; i < amount; i++) { + if (!blockHashQueue.isEmpty()) + hashes.add(blockHashQueue.poll()); + else break; + } + return hashes; + } + private class BlockByIndexComparator implements Comparator { @Override @@ -80,7 +109,7 @@ public class BlockQueue { } public int size() { - return blockQueue.size(); + return blockReceivedQueue.size(); } public void close() { diff --git a/ethereumj-core/src/main/java/org/ethereum/core/BlockchainImpl.java b/ethereumj-core/src/main/java/org/ethereum/core/BlockchainImpl.java index bd62c0ee..4b4e7135 100644 --- a/ethereumj-core/src/main/java/org/ethereum/core/BlockchainImpl.java +++ b/ethereumj-core/src/main/java/org/ethereum/core/BlockchainImpl.java @@ -5,7 +5,6 @@ import org.ethereum.facade.Repository; import org.ethereum.listener.EthereumListener; import org.ethereum.manager.WorldManager; import org.ethereum.util.AdvancedDeviceUtils; -import org.ethereum.util.ByteUtil; import org.ethereum.vm.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -449,7 +448,7 @@ public class BlockchainImpl implements Blockchain { } @Override - public BlockQueue getBlockQueue() { + public BlockQueue getQueue() { return blockQueue; } @@ -474,7 +473,7 @@ public class BlockchainImpl implements Blockchain { } @Override - public byte[] getTotalDifficulty() { - return ByteUtil.bigIntegerToBytes(totalDifficulty); + public BigInteger getTotalDifficulty() { + return totalDifficulty; } } diff --git a/ethereumj-core/src/main/java/org/ethereum/facade/Blockchain.java b/ethereumj-core/src/main/java/org/ethereum/facade/Blockchain.java index 5915805c..98043612 100644 --- a/ethereumj-core/src/main/java/org/ethereum/facade/Blockchain.java +++ b/ethereumj-core/src/main/java/org/ethereum/facade/Blockchain.java @@ -1,12 +1,16 @@ package org.ethereum.facade; +import java.math.BigInteger; import java.util.Map; import org.ethereum.core.Block; import org.ethereum.core.BlockQueue; +import org.ethereum.core.Genesis; public interface Blockchain { + public static final byte[] GENESIS_HASH = Genesis.getInstance().getHash(); + public int getSize(); public void add(Block block); public void storeBlock(Block block); @@ -15,8 +19,8 @@ public interface Blockchain { public long getGasPrice(); public void setLastBlock(Block block); public Block getLastBlock(); - public BlockQueue getBlockQueue(); + public BlockQueue getQueue(); public void close(); - public byte[] getTotalDifficulty(); + public BigInteger getTotalDifficulty(); public byte[] getLatestBlockHash(); } diff --git a/ethereumj-core/src/main/java/org/ethereum/facade/EthereumImpl.java b/ethereumj-core/src/main/java/org/ethereum/facade/EthereumImpl.java index 9ef9a9d4..97baaf86 100644 --- a/ethereumj-core/src/main/java/org/ethereum/facade/EthereumImpl.java +++ b/ethereumj-core/src/main/java/org/ethereum/facade/EthereumImpl.java @@ -60,7 +60,7 @@ public class EthereumImpl implements Ethereum { WorldManager.getInstance().startPeerDiscovery(); - final Set peers = WorldManager.getInstance().getPeers(); + final Set peers = WorldManager.getInstance().getPeerDiscovery().getPeers(); synchronized (peers) { for (Peer peer : peers) { // it blocks until a peer is available. if (peer.isOnline() && !excludePeers.contains(peer)) { @@ -91,7 +91,7 @@ public class EthereumImpl implements Ethereum { @Override public Set getPeers() { - return WorldManager.getInstance().getPeers(); + return WorldManager.getInstance().getPeerDiscovery().getPeers(); } @Override diff --git a/ethereumj-core/src/main/java/org/ethereum/manager/WorldManager.java b/ethereumj-core/src/main/java/org/ethereum/manager/WorldManager.java index 4f836481..55ef303f 100644 --- a/ethereumj-core/src/main/java/org/ethereum/manager/WorldManager.java +++ b/ethereumj-core/src/main/java/org/ethereum/manager/WorldManager.java @@ -33,9 +33,9 @@ public class WorldManager { private PeerClient activePeer; private PeerDiscovery peerDiscovery; - private final Set peers = Collections.synchronizedSet(new HashSet()); private final Set pendingTransactions = Collections.synchronizedSet(new HashSet()); + private EthereumListener listener; private static final class WorldManagerHolder { @@ -75,33 +75,19 @@ public class WorldManager { this.listener = listener; } - /** - * Update list of known peers with new peers - * This method checks for duplicate peer id's and addresses - * - * @param newPeers to be added to the list of known peers - */ - public void addPeers(final Set newPeers) { - synchronized (peers) { - for (final Peer newPeer : newPeers) { - if(!peers.contains(newPeer)) - peerDiscovery.addNewPeer(newPeer); - peers.add(newPeer); - } - } - } - public void startPeerDiscovery() { if (!peerDiscovery.isStarted()) peerDiscovery.start(); } public void stopPeerDiscovery() { - if (listener != null) - listener.trace("Stopping peer discovery"); if (peerDiscovery.isStarted()) peerDiscovery.stop(); } + + public PeerDiscovery getPeerDiscovery() { + return peerDiscovery; + } public EthereumListener getListener() { return listener; @@ -158,16 +144,12 @@ public class WorldManager { return activePeer; } - public Set getPeers() { - return peers; - } - public Set getPendingTransactions() { return pendingTransactions; } - - public boolean isBlockchainLoading(){ - return blockchain.getBlockQueue().size() > 2; + + public boolean isBlockchainLoading(){ + return blockchain.getQueue().size() > 2; } public void close() { @@ -175,5 +157,4 @@ public class WorldManager { repository.close(); blockchain.close(); } - } diff --git a/ethereumj-core/src/main/java/org/ethereum/net/MessageQueue.java b/ethereumj-core/src/main/java/org/ethereum/net/MessageQueue.java index 207eab6c..714d3c45 100644 --- a/ethereumj-core/src/main/java/org/ethereum/net/MessageQueue.java +++ b/ethereumj-core/src/main/java/org/ethereum/net/MessageQueue.java @@ -55,7 +55,11 @@ public class MessageQueue { if (listener != null) listener.console("[Recv: " + msg + "]"); - if (logger.isInfoEnabled()) + if (logger.isInfoEnabled() + && msg.getCommand() != Command.PING + && msg.getCommand() != Command.PONG + && msg.getCommand() != Command.PEERS + && msg.getCommand() != Command.GET_PEERS) logger.info("From: \t{} \tRecv: \t{}", ctx.channel().remoteAddress(), msg); if (logger.isDebugEnabled()) logger.debug("Encoded: [{}]", Hex.toHexString(msg.getEncoded())); @@ -70,11 +74,9 @@ public class MessageQueue { logger.debug("Message round trip covered: [{}] ", messageRoundtrip.getMsg().getCommand()); } - if (msg instanceof DisconnectMessage) { - ctx.close().sync(); - ctx.disconnect().sync(); - } } + if (msg instanceof DisconnectMessage) + disconnect(); } private void nudgeQueue() { @@ -111,7 +113,11 @@ public class MessageQueue { if (listener != null) listener.console("[Send: " + msg + "]"); - if (logger.isInfoEnabled()) + if (logger.isInfoEnabled() + && msg.getCommand() != Command.PING + && msg.getCommand() != Command.PONG + && msg.getCommand() != Command.PEERS + && msg.getCommand() != Command.GET_PEERS) logger.info("To: \t{} \tSend: \t{}", ctx.channel().remoteAddress(), msg); if (logger.isDebugEnabled()) logger.debug("Encdoded: [{}]", Hex.toHexString(msg.getEncoded())); @@ -122,5 +128,15 @@ public class MessageQueue { buffer.writeBytes(ByteUtil.calcPacketLength(msg.getEncoded())); buffer.writeBytes(msg.getEncoded()); ctx.writeAndFlush(buffer); + + if(msg instanceof DisconnectMessage) + disconnect(); } + + private void disconnect() { + throw new DisconnectException(); + } + + @SuppressWarnings("serial") + public class DisconnectException extends RuntimeException {} } diff --git a/ethereumj-core/src/main/java/org/ethereum/net/client/PeerClient.java b/ethereumj-core/src/main/java/org/ethereum/net/client/PeerClient.java index 3e4a5729..db7d9653 100644 --- a/ethereumj-core/src/main/java/org/ethereum/net/client/PeerClient.java +++ b/ethereumj-core/src/main/java/org/ethereum/net/client/PeerClient.java @@ -26,6 +26,7 @@ public class PeerClient { private Logger logger = LoggerFactory.getLogger("wire"); + private PeerDiscovery peerDiscovery; private PeerListener peerListener; private ProtocolHandler handler; @@ -84,7 +85,8 @@ public class PeerClient { handler.killTimers(); - final Set peers = WorldManager.getInstance().getPeers(); + final Set peers = WorldManager.getInstance() + .getPeerDiscovery().getPeers(); synchronized (peers) { for (Peer peer : peers) { diff --git a/ethereumj-core/src/main/java/org/ethereum/net/client/PeerDiscovery.java b/ethereumj-core/src/main/java/org/ethereum/net/client/PeerDiscovery.java index 4a45c37f..9b39f9dc 100644 --- a/ethereumj-core/src/main/java/org/ethereum/net/client/PeerDiscovery.java +++ b/ethereumj-core/src/main/java/org/ethereum/net/client/PeerDiscovery.java @@ -3,6 +3,9 @@ package org.ethereum.net.client; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; @@ -14,9 +17,10 @@ import static org.ethereum.config.SystemProperties.CONFIG; */ public class PeerDiscovery { - private static final Logger logger = LoggerFactory - .getLogger("peerdiscovery"); + private static final Logger logger = LoggerFactory.getLogger("peerdiscovery"); + private final Set peers = Collections.synchronizedSet(new HashSet()); + private PeerMonitorThread monitor; private ThreadFactory threadFactory; private ThreadPoolExecutor executorPool; @@ -45,11 +49,6 @@ public class PeerDiscovery { started.set(true); } - public void addNewPeer(Peer peer) { - logger.debug("Add new peer for discovery: {}", peer); - executorPool.execute(new WorkerThread(peer, executorPool)); - } - public void stop() { executorPool.shutdown(); monitor.shutdown(); @@ -59,4 +58,30 @@ public class PeerDiscovery { public boolean isStarted() { return started.get(); } + + /** + * Update list of known peers with new peers + * This method checks for duplicate peer id's and addresses + * + * @param newPeers to be added to the list of known peers + */ + public void addPeers(final Set newPeers) { + synchronized (peers) { + for (final Peer newPeer : newPeers) { + if(!peers.contains(newPeer)) + addNewPeer(newPeer); + peers.add(newPeer); + } + } + } + + public void addNewPeer(Peer peer) { + logger.debug("Add new peer for discovery: {}", peer); + executorPool.execute(new WorkerThread(peer, executorPool)); + } + + public Set getPeers() { + return peers; + } + } \ No newline at end of file diff --git a/ethereumj-core/src/main/java/org/ethereum/net/client/PeerMonitorThread.java b/ethereumj-core/src/main/java/org/ethereum/net/client/PeerMonitorThread.java index 4523e331..1724d2b0 100644 --- a/ethereumj-core/src/main/java/org/ethereum/net/client/PeerMonitorThread.java +++ b/ethereumj-core/src/main/java/org/ethereum/net/client/PeerMonitorThread.java @@ -44,7 +44,8 @@ public class PeerMonitorThread implements Runnable { toStringBuff.append(", isTerminated: "); toStringBuff.append(this.executor.isTerminated()); toStringBuff.append(", peersDiscovered: "); - toStringBuff.append(WorldManager.getInstance().getPeers().size()); + toStringBuff.append(WorldManager.getInstance() + .getPeerDiscovery().getPeers().size()); logger.trace(toStringBuff.toString()); } try { diff --git a/ethereumj-core/src/main/java/org/ethereum/net/client/ProtocolHandler.java b/ethereumj-core/src/main/java/org/ethereum/net/client/ProtocolHandler.java index 49849349..22451617 100644 --- a/ethereumj-core/src/main/java/org/ethereum/net/client/ProtocolHandler.java +++ b/ethereumj-core/src/main/java/org/ethereum/net/client/ProtocolHandler.java @@ -5,10 +5,11 @@ import static org.ethereum.net.message.StaticMessages.PING_MESSAGE; import static org.ethereum.net.message.StaticMessages.PONG_MESSAGE; import static org.ethereum.net.message.StaticMessages.HELLO_MESSAGE; import static org.ethereum.net.message.StaticMessages.GET_PEERS_MESSAGE; +import static org.ethereum.config.SystemProperties.CONFIG; +import java.math.BigInteger; import java.net.InetAddress; import java.net.InetSocketAddress; -import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; import java.util.List; @@ -21,9 +22,10 @@ import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelOption; import io.netty.channel.FixedRecvByteBufAllocator; -import org.ethereum.config.SystemProperties; import org.ethereum.core.Block; +import org.ethereum.core.BlockQueue; import org.ethereum.core.Transaction; +import org.ethereum.facade.Blockchain; import org.ethereum.listener.EthereumListener; import org.ethereum.manager.WorldManager; import org.ethereum.net.MessageQueue; @@ -38,9 +40,9 @@ import org.ethereum.net.message.HelloMessage; import org.ethereum.net.message.Message; import org.ethereum.net.message.PeersMessage; import org.ethereum.net.message.ReasonCode; -import org.ethereum.net.message.StaticMessages; import org.ethereum.net.message.StatusMessage; import org.ethereum.net.message.TransactionsMessage; +import org.ethereum.util.ByteUtil; import org.ethereum.util.RLP; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,6 +82,7 @@ public class ProtocolHandler extends ChannelInboundHandlerAdapter { private Timer blocksAskTimer = new Timer("ChainAskTimer"); private int secToAskForBlocks = 1; + private PeerDiscovery peerDiscovery; private PeerListener peerListener; private EthereumListener listener; @@ -87,11 +90,13 @@ public class ProtocolHandler extends ChannelInboundHandlerAdapter { private final Timer timer = new Timer("MiscMessageTimer"); public ProtocolHandler() { + this.listener = WorldManager.getInstance().getListener(); + this.peerDiscovery = WorldManager.getInstance().getPeerDiscovery(); } public ProtocolHandler(PeerListener peerListener) { + this(); this.peerListener = peerListener; - this.listener = WorldManager.getInstance().getListener(); } @Override @@ -127,9 +132,6 @@ public class ProtocolHandler extends ChannelInboundHandlerAdapter { DisconnectMessage disconnectMessage = new DisconnectMessage(payload); msgQueue.receivedMessage(disconnectMessage); - ctx.close().sync(); - ctx.disconnect().sync(); - if (listener != null) listener.onRecvMessage(disconnectMessage); break; case PING: @@ -216,9 +218,28 @@ public class ProtocolHandler extends ChannelInboundHandlerAdapter { break; } } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws InterruptedException { + tearDown = true; + logger.warn("Lost connection to {}", ctx.channel().remoteAddress()); + cause.printStackTrace(); + logger.info("Reason: {} ({})", cause.getMessage(), cause.getClass().getName()); + logger.debug("Stacktrace", cause); + ctx.close().sync(); + ctx.disconnect().sync(); + killTimers(); + } + + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + // limit the size of recieving buffer to 1024 + ctx.channel().config().setRecvByteBufAllocator(new FixedRecvByteBufAllocator(32368)); + ctx.channel().config().setOption(ChannelOption.SO_RCVBUF, 32368); + } private void processPeers(PeersMessage peersMessage) { - WorldManager.getInstance().addPeers(peersMessage.getPeers()); + peerDiscovery.addPeers(peersMessage.getPeers()); } /** @@ -229,18 +250,24 @@ public class ProtocolHandler extends ChannelInboundHandlerAdapter { * @param msg is the StatusMessage */ private void processStatus(StatusMessage msg) { - if (!Arrays.equals(msg.getGenesisHash(), StaticMessages.GENESIS_HASH)) + if (!Arrays.equals(msg.getGenesisHash(), Blockchain.GENESIS_HASH)) sendDisconnect(ReasonCode.WRONG_GENESIS); else if (msg.getProtocolVersion() != 33) sendDisconnect(ReasonCode.INCOMPATIBLE_PROTOCOL); else if (msg.getNetworkId() != 0) sendDisconnect(ReasonCode.INCOMPATIBLE_PROTOCOL); else { - // if totalDifficulty is highest known from all peers - // - update bestHash - // sendGetBlockHashes(msg.getBestHash()); +// BlockQueue chainQueue = WorldManager.getInstance().getBlockchain().getQueue(); +// BigInteger peerTotalDifficulty = new BigInteger(1, msg.getTotalDifficulty()); +// BigInteger highestKnownTotalDifficulty = chainQueue.getHighestTotalDifficulty(); +// if (peerTotalDifficulty.compareTo(highestKnownTotalDifficulty) > 0) { +// WorldManager.getInstance().getBlockchain().getQueue() +// .setHighestTotalDifficulty(peerTotalDifficulty); +// WorldManager.getInstance().getBlockchain().getQueue() +// .setBestHash(msg.getBestHash()); +// sendGetBlockHashes(msg.getBestHash()); +// } } - // TODO: discard peer from pool } private void processBlockHashes(BlockHashesMessage blockHashesMessage) { @@ -265,7 +292,7 @@ public class ProtocolHandler extends ChannelInboundHandlerAdapter { } if (blockList.isEmpty()) return; - WorldManager.getInstance().getBlockchain().getBlockQueue().addBlocks(blockList); + WorldManager.getInstance().getBlockchain().getQueue().addBlocks(blockList); } @@ -282,7 +309,6 @@ public class ProtocolHandler extends ChannelInboundHandlerAdapter { private void sendDisconnect(ReasonCode reason) { DisconnectMessage msg = new DisconnectMessage(reason); sendMsg(msg); - // TODO actually disconnect from peer (and remove from list of peers) } private void sendPing() { @@ -298,47 +324,17 @@ public class ProtocolHandler extends ChannelInboundHandlerAdapter { } private void sendPeers() { - Set peers = WorldManager.getInstance().getPeers(); - PeersMessage msg = new PeersMessage(peers); + PeersMessage msg = new PeersMessage(peerDiscovery.getPeers()); sendMsg(msg); } - - private void updateBlockAskTimer(int seconds) { - secToAskForBlocks = seconds; - blocksAskTimer.cancel(); - blocksAskTimer.purge(); - blocksAskTimer = new Timer(); - blocksAskTimer.scheduleAtFixedRate(new TimerTask() { - public void run() { - sendGetBlocks(); - } - }, 3000, secToAskForBlocks * 1000); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws InterruptedException { - tearDown = true; - logger.warn("Lost connection to {}", ctx.channel().remoteAddress()); - logger.info("Reason: {} ({})", cause.getMessage(), cause.getClass().getName()); - logger.debug("Stacktrace", cause); - ctx.close().sync(); - ctx.disconnect().sync(); - killTimers(); - } - - @Override - public void handlerAdded(ChannelHandlerContext ctx) throws Exception { - // limit the size of recieving buffer to 1024 - ctx.channel().config().setRecvByteBufAllocator(new FixedRecvByteBufAllocator(32368)); - ctx.channel().config().setOption(ChannelOption.SO_RCVBUF, 32368); - } private void sendStatus() { + Blockchain chain = WorldManager.getInstance().getBlockchain(); byte protocolVersion = 33, networkId = 0; - byte[] totalDifficulty = WorldManager.getInstance().getBlockchain().getTotalDifficulty(); - byte[] bestHash = WorldManager.getInstance().getBlockchain().getLatestBlockHash(); + BigInteger totalDifficulty = chain.getTotalDifficulty(); + byte[] bestHash = chain.getLatestBlockHash(); StatusMessage msg = new StatusMessage(protocolVersion, networkId, - totalDifficulty, bestHash, StaticMessages.GENESIS_HASH); + ByteUtil.bigIntegerToBytes(totalDifficulty), bestHash, Blockchain.GENESIS_HASH); sendMsg(msg); } @@ -356,21 +352,26 @@ public class ProtocolHandler extends ChannelInboundHandlerAdapter { sendMsg(GET_TRANSACTIONS_MESSAGE); } - private void sendGetBlockHashes(byte[] bestHash) { - GetBlockHashesMessage msg = new GetBlockHashesMessage(bestHash, 128); + private void sendGetBlockHashes(byte[] bestHash) { + GetBlockHashesMessage msg = new GetBlockHashesMessage(bestHash, CONFIG.maxHashesAsk()); sendMsg(msg); } private void sendGetBlocks() { - if (WorldManager.getInstance().getBlockchain().getBlockQueue().size() > - SystemProperties.CONFIG.maxBlocksQueued()) return; + if (WorldManager.getInstance().getBlockchain().getQueue().size() > + CONFIG.maxBlocksQueued()) return; - Block lastBlock = WorldManager.getInstance().getBlockchain().getBlockQueue().getLast(); + Block lastBlock = WorldManager.getInstance().getBlockchain().getQueue() + .getLastBlock(); if (lastBlock == null) return; - List hashes = new ArrayList<>(); // retrieve list from block-hashes queue - GetBlocksMessage msg = new GetBlocksMessage(hashes); + // retrieve list of block hashes from queue + int blocksPerPeer = CONFIG.maxBlocksAsk(); + List hashes = WorldManager.getInstance().getBlockchain() + .getQueue().getHashes(blocksPerPeer); + + GetBlocksMessage msg = new GetBlocksMessage(hashes); sendMsg(msg); } @@ -388,14 +389,6 @@ public class ProtocolHandler extends ChannelInboundHandlerAdapter { private void sendBlockHashes() { // TODO: Send block hashes } - - public void killTimers(){ - blocksAskTimer.cancel(); - blocksAskTimer.purge(); - - timer.cancel(); - timer.purge(); - } private void setHandshake(HelloMessage msg, ChannelHandlerContext ctx) { // TODO validate p2pVersion @@ -406,7 +399,7 @@ public class ProtocolHandler extends ChannelInboundHandlerAdapter { Peer confirmedPeer = new Peer(address, port, peerId); confirmedPeer.setOnline(true); confirmedPeer.setHandshake(handshake); - WorldManager.getInstance().getPeers().add(confirmedPeer); + WorldManager.getInstance().getPeerDiscovery().getPeers().add(confirmedPeer); startTimers(); } @@ -444,6 +437,26 @@ public class ProtocolHandler extends ChannelInboundHandlerAdapter { } } + private void updateBlockAskTimer(int seconds) { + secToAskForBlocks = seconds; + blocksAskTimer.cancel(); + blocksAskTimer.purge(); + blocksAskTimer = new Timer(); + blocksAskTimer.scheduleAtFixedRate(new TimerTask() { + public void run() { + sendGetBlocks(); + } + }, 3000, secToAskForBlocks * 1000); + } + + public void killTimers(){ + blocksAskTimer.cancel(); + blocksAskTimer.purge(); + + timer.cancel(); + timer.purge(); + } + protected HelloMessage getHandshake() { return handshake; } diff --git a/ethereumj-core/src/main/java/org/ethereum/net/client/WorkerThread.java b/ethereumj-core/src/main/java/org/ethereum/net/client/WorkerThread.java index 7a58bfa9..5ad4a126 100644 --- a/ethereumj-core/src/main/java/org/ethereum/net/client/WorkerThread.java +++ b/ethereumj-core/src/main/java/org/ethereum/net/client/WorkerThread.java @@ -11,7 +11,7 @@ import java.util.concurrent.ThreadPoolExecutor; */ public class WorkerThread implements Runnable { - private final static Logger logger = LoggerFactory.getLogger("wire"); + private final static Logger logger = LoggerFactory.getLogger("peerdiscovery"); private Peer peer; private PeerClient clientPeer; @@ -41,13 +41,13 @@ public class WorkerThread implements Runnable { peer.setOnline(true); peer.setHandshake(clientPeer.getHandler().getHandshake()); } catch (Throwable e) { - if (peer.isOnline() == true) + if (peer.isOnline()) logger.info("Peer: [{}] went offline, due to: [{}]", peer .getAddress().getHostAddress(), e); peer.setOnline(false); } finally { - logger.info("Peer: " + peer.toString() + " isOnline: " - + peer.isOnline()); + logger.info("Peer: " + peer.toString() + " is " + + (peer.isOnline() ? "online" : "offline")); peer.setLastCheckTime(System.currentTimeMillis()); } } diff --git a/ethereumj-core/src/main/java/org/ethereum/net/message/StaticMessages.java b/ethereumj-core/src/main/java/org/ethereum/net/message/StaticMessages.java index d422ffe9..5dfcb350 100644 --- a/ethereumj-core/src/main/java/org/ethereum/net/message/StaticMessages.java +++ b/ethereumj-core/src/main/java/org/ethereum/net/message/StaticMessages.java @@ -5,7 +5,6 @@ import java.util.Arrays; import java.util.List; import org.ethereum.config.SystemProperties; -import org.ethereum.core.Genesis; import org.ethereum.crypto.HashUtil; import org.spongycastle.util.encoders.Hex; @@ -25,7 +24,6 @@ public class StaticMessages { public final static GetTransactionsMessage GET_TRANSACTIONS_MESSAGE = new GetTransactionsMessage(); public static final byte[] SYNC_TOKEN = Hex.decode("22400891"); - public static final byte[] GENESIS_HASH = Genesis.getInstance().getHash(); private static HelloMessage generateHelloMessage() { String helloAnnouncement = buildHelloAnnouncement(); diff --git a/ethereumj-studio/src/main/resources/system.properties b/ethereumj-studio/src/main/resources/system.properties index 6ce3dd13..be82ffd0 100644 --- a/ethereumj-studio/src/main/resources/system.properties +++ b/ethereumj-studio/src/main/resources/system.properties @@ -112,10 +112,17 @@ trace.startblock = -1 # occurs anyway [true/false] play.vm = true +# maximum blocks hashes to ask. +# sending GET_BLOCK_HASHES msg +# we specify number of block we want +# to get, recomendec value [1..1000] +# Default: unlimited +max.hashes.ask = 1000 + # maximum blocks to ask, # when downloading the chain -# sequenteally sending GET_CHAIN msg -# we specify number of block we want +# sequenteally sending GET_BLOCKS msg +# we specify number of blocks we want # to get, recomendec value [1..100] max.blocks.ask = 100