From d2bc29f32e0ba5dceddc7d5df7ebf7103de18ada Mon Sep 17 00:00:00 2001 From: nicksavers Date: Wed, 8 Oct 2014 09:56:39 +0200 Subject: [PATCH] Dynamically add handler and use String for peerId --- .../org/ethereum/config/SystemProperties.java | 6 +-- .../java/org/ethereum/core/BlockQueue.java | 19 ++++++---- .../org/ethereum/core/BlockchainImpl.java | 3 +- .../java/org/ethereum/net/client/Peer.java | 12 +++--- .../org/ethereum/net/client/PeerClient.java | 6 +-- .../org/ethereum/net/handler/EthHandler.java | 37 +++++++++---------- .../org/ethereum/net/handler/P2pHandler.java | 4 +- .../ethereum/net/message/HelloMessage.java | 14 +++---- .../ethereum/net/message/PeersMessage.java | 3 +- .../ethereum/net/message/StaticMessages.java | 2 +- .../org/ethereum/net/HelloMessageTest.java | 10 ++--- .../org/ethereum/net/PeersMessageTest.java | 4 +- 12 files changed, 60 insertions(+), 60 deletions(-) diff --git a/ethereumj-core/src/main/java/org/ethereum/config/SystemProperties.java b/ethereumj-core/src/main/java/org/ethereum/config/SystemProperties.java index 1bb5cf66..590e71aa 100644 --- a/ethereumj-core/src/main/java/org/ethereum/config/SystemProperties.java +++ b/ethereumj-core/src/main/java/org/ethereum/config/SystemProperties.java @@ -39,7 +39,7 @@ public class SystemProperties { private static Boolean DEFAULT_BLOCKCHAIN_ONLY = false; private static int DEFAULT_TRACE_STARTBLOCK = -1; private static int DEFAULT_MAX_HASHES_ASK = -1; // unlimited - private static byte DEFAULT_MAX_BLOCKS_ASK = 10; + private static int DEFAULT_MAX_BLOCKS_ASK = 10; private static int DEFAULT_MAX_BLOCKS_QUEUED = 300; private static String DEFAULT_PROJECT_VERSION = ""; private static String DEFAULT_HELLO_PHRASE = "Dev"; @@ -192,9 +192,9 @@ public class SystemProperties { return Integer.parseInt(prop.getProperty("max.hashes.ask")); } - public Byte maxBlocksAsk() { + public Integer maxBlocksAsk() { if (prop.isEmpty()) return DEFAULT_MAX_BLOCKS_ASK; - return Byte.parseByte(prop.getProperty("max.blocks.ask")); + return Integer.parseInt(prop.getProperty("max.blocks.ask")); } public Integer maxBlocksQueued() { diff --git a/ethereumj-core/src/main/java/org/ethereum/core/BlockQueue.java b/ethereumj-core/src/main/java/org/ethereum/core/BlockQueue.java index c1490231..0b0ab6ed 100644 --- a/ethereumj-core/src/main/java/org/ethereum/core/BlockQueue.java +++ b/ethereumj-core/src/main/java/org/ethereum/core/BlockQueue.java @@ -1,5 +1,7 @@ package org.ethereum.core; +import static org.ethereum.config.SystemProperties.CONFIG; + import org.ethereum.config.SystemProperties; import org.ethereum.manager.WorldManager; import org.slf4j.Logger; @@ -20,7 +22,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; */ public class BlockQueue { - private static Logger logger = LoggerFactory.getLogger("blockchain"); + private static Logger logger = LoggerFactory.getLogger("blockqueue"); /** The list of hashes of the heaviest chain on the network, * for which this client doesn't have the blocks yet */ @@ -52,6 +54,7 @@ public class BlockQueue { if (blockReceivedQueue.isEmpty()) return; + logger.debug("BlockQueue size: {}", blockReceivedQueue.size()); Block block = blockReceivedQueue.poll(); WorldManager.getInstance().getBlockchain().add(block); } @@ -74,14 +77,14 @@ public class BlockQueue { for (Block block : blockList) { - if (blockReceivedQueue.size() > SystemProperties.CONFIG.maxBlocksQueued()) + if (blockReceivedQueue.size() <= SystemProperties.CONFIG.maxBlocksQueued()) return; this.lastBlock = block; - logger.trace("Last block now index: [{}]", lastBlock.getNumber()); + logger.debug("Last block now index: [{}]", lastBlock.getNumber()); blockReceivedQueue.add(lastBlock); } - logger.trace("Blocks waiting to be proceed in the queue: [{}]", + logger.debug("Blocks waiting to be proceed in the queue: [{}]", blockReceivedQueue.size()); } @@ -90,6 +93,8 @@ public class BlockQueue { * this will return the last block added to the blockchain. * * @return The last known block this client on the network + * and will never return null as there is + * always the Genesis block at the start of the chain. */ public Block getLastBlock() { if (blockReceivedQueue.isEmpty()) @@ -128,12 +133,12 @@ public class BlockQueue { * @param amount - the number of hashes to return * @return A list of hashes for which blocks need to be retrieved. */ - public List getHashes(int amount) { + public List getHashes() { + int amount = CONFIG.maxBlocksAsk(); List hashes = new ArrayList<>(); for (int i = 0; i < amount; i++) { - if (!blockHashQueue.isEmpty()) + while (!blockHashQueue.isEmpty()) hashes.add(blockHashQueue.pollLast()); - else break; } return hashes; } diff --git a/ethereumj-core/src/main/java/org/ethereum/core/BlockchainImpl.java b/ethereumj-core/src/main/java/org/ethereum/core/BlockchainImpl.java index d24aa111..95e5b1b4 100644 --- a/ethereumj-core/src/main/java/org/ethereum/core/BlockchainImpl.java +++ b/ethereumj-core/src/main/java/org/ethereum/core/BlockchainImpl.java @@ -216,7 +216,8 @@ public class BlockchainImpl implements Blockchain { if (logger.isDebugEnabled()) logger.debug("block added {}", block.toFlatString()); - logger.info("*** Last block added [ #{} ]", block.getNumber()); + if (block.getNumber() % 100 == 0) + logger.info("*** Last block added [ #{} ]", block.getNumber()); } /** diff --git a/ethereumj-core/src/main/java/org/ethereum/net/client/Peer.java b/ethereumj-core/src/main/java/org/ethereum/net/client/Peer.java index 72915429..59324945 100644 --- a/ethereumj-core/src/main/java/org/ethereum/net/client/Peer.java +++ b/ethereumj-core/src/main/java/org/ethereum/net/client/Peer.java @@ -3,8 +3,6 @@ package org.ethereum.net.client; import org.ethereum.util.RLP; import org.spongycastle.util.encoders.Hex; -import java.util.Arrays; - import java.net.InetAddress; import java.util.ArrayList; import java.util.List; @@ -16,14 +14,14 @@ public class Peer { private InetAddress address; private int port; - private byte[] peerId; + private String peerId; private List capabilities; private transient boolean isOnline = false; private transient long lastCheckTime = 0; - public Peer(InetAddress ip, int port, byte[] peerId) { + public Peer(InetAddress ip, int port, String peerId) { this.address = ip; this.port = port; this.peerId = peerId; @@ -39,7 +37,7 @@ public class Peer { } public String getPeerId() { - return peerId == null ? "" : Hex.toHexString(peerId); + return peerId == null ? "" : peerId; } public boolean isOnline() { @@ -67,7 +65,7 @@ public class Peer { public byte[] getEncoded() { byte[] ip = RLP.encodeElement(this.address.getAddress()); byte[] port = RLP.encodeInt(this.port); - byte[] peerId = RLP.encodeElement(this.peerId); + byte[] peerId = RLP.encodeElement(Hex.decode(this.peerId)); byte[][] encodedCaps = new byte[this.capabilities.size()][]; for (int i = 0; i < this.capabilities.size(); i++) { encodedCaps[i] = RLP.encodeString(this.capabilities.get(i)); @@ -87,7 +85,7 @@ public class Peer { public boolean equals(Object obj) { if (obj == null) return false; Peer peerData = (Peer) obj; - return Arrays.equals(peerData.peerId, this.peerId) + return peerData.peerId.equals(this.peerId) || this.getAddress().equals(peerData.getAddress()); } diff --git a/ethereumj-core/src/main/java/org/ethereum/net/client/PeerClient.java b/ethereumj-core/src/main/java/org/ethereum/net/client/PeerClient.java index c898ee37..6e1d3baa 100644 --- a/ethereumj-core/src/main/java/org/ethereum/net/client/PeerClient.java +++ b/ethereumj-core/src/main/java/org/ethereum/net/client/PeerClient.java @@ -8,7 +8,6 @@ import io.netty.handler.timeout.ReadTimeoutHandler; import org.ethereum.manager.WorldManager; import org.ethereum.net.PeerListener; -import org.ethereum.net.handler.EthHandler; import org.ethereum.net.handler.P2pHandler; import org.ethereum.net.handler.PacketDecoder; import org.ethereum.net.handler.PacketEncoder; @@ -30,7 +29,6 @@ public class PeerClient { private PeerListener peerListener; private P2pHandler p2pHandler; - private EthHandler ethHandler; public PeerClient() { } @@ -47,7 +45,6 @@ public class PeerClient { peerListener.console("Connecting to: " + host + ":" + port); p2pHandler = new P2pHandler(peerListener); - ethHandler = new EthHandler(peerListener); try { Bootstrap b = new Bootstrap(); @@ -66,8 +63,7 @@ public class PeerClient { new ReadTimeoutHandler(CONFIG.peerChannelReadTimeout(), TimeUnit.SECONDS)); ch.pipeline().addLast(new PacketEncoder()); ch.pipeline().addLast(new PacketDecoder()); - ch.pipeline().addLast("p2p", p2pHandler); - ch.pipeline().addLast("eth", ethHandler); + ch.pipeline().addLast(p2pHandler); // limit the size of receiving buffer to 1024 ch.config().setRecvByteBufAllocator(new FixedRecvByteBufAllocator(32368)); diff --git a/ethereumj-core/src/main/java/org/ethereum/net/handler/EthHandler.java b/ethereumj-core/src/main/java/org/ethereum/net/handler/EthHandler.java index e81c5abd..76d5e90c 100644 --- a/ethereumj-core/src/main/java/org/ethereum/net/handler/EthHandler.java +++ b/ethereumj-core/src/main/java/org/ethereum/net/handler/EthHandler.java @@ -58,10 +58,11 @@ public class EthHandler extends SimpleChannelInboundHandler { private int secToAskForBlocks = 1; + private String peerId; private Blockchain blockchain; private PeerListener peerListener; - private static boolean hashRetrievalLocked = false; + private static String hashRetrievalLock; private MessageQueue msgQueue = null; private Timer getBlocksTimer = new Timer("GetBlocksTimer"); @@ -72,8 +73,9 @@ public class EthHandler extends SimpleChannelInboundHandler { this.blockchain = WorldManager.getInstance().getBlockchain(); } - public EthHandler(PeerListener peerListener) { + public EthHandler(String peerId, PeerListener peerListener) { this(); + this.peerId = peerId; this.peerListener = peerListener; } @@ -159,17 +161,16 @@ public class EthHandler extends SimpleChannelInboundHandler { msgQueue.sendMessage(new DisconnectMessage(ReasonCode.INCOMPATIBLE_NETWORK)); else { BlockQueue chainQueue = this.blockchain.getQueue(); - if(!hashRetrievalLocked) { - BigInteger peerTotalDifficulty = new BigInteger(1, msg.getTotalDifficulty()); - BigInteger highestKnownTotalDifficulty = chainQueue.getHighestTotalDifficulty(); - if (highestKnownTotalDifficulty == null - || peerTotalDifficulty.compareTo(highestKnownTotalDifficulty) > 0) { - chainQueue.setHighestTotalDifficulty(peerTotalDifficulty); - chainQueue.setBestHash(msg.getBestHash()); - } - hashRetrievalLocked = true; + BigInteger peerTotalDifficulty = new BigInteger(1, msg.getTotalDifficulty()); + BigInteger highestKnownTotalDifficulty = chainQueue.getHighestTotalDifficulty(); + if (highestKnownTotalDifficulty == null + || peerTotalDifficulty.compareTo(highestKnownTotalDifficulty) > 0) { + hashRetrievalLock = this.peerId; + chainQueue.setHighestTotalDifficulty(peerTotalDifficulty); + chainQueue.setBestHash(msg.getBestHash()); sendGetBlockHashes(); - } + } else + startGetBlockTimer(); } } @@ -178,7 +179,9 @@ public class EthHandler extends SimpleChannelInboundHandler { BlockQueue chainQueue = this.blockchain.getQueue(); // result is empty, peer has no more hashes - if (receivedHashes.isEmpty()) { + // or peer doesn't have the best hash anymore + if (receivedHashes.isEmpty() + || !this.peerId.equals(hashRetrievalLock)) { startGetBlockTimer(); // start getting blocks from hash queue return; } @@ -250,13 +253,9 @@ public class EthHandler extends SimpleChannelInboundHandler { private void sendGetBlocks() { BlockQueue queue = this.blockchain.getQueue(); if (queue.size() > CONFIG.maxBlocksQueued()) return; - - Block lastBlock = queue.getLastBlock(); - if (lastBlock == null) return; - + // retrieve list of block hashes from queue - int blocksPerPeer = CONFIG.maxBlocksAsk(); - List hashes = queue.getHashes(blocksPerPeer); + List hashes = queue.getHashes(); GetBlocksMessage msg = new GetBlocksMessage(hashes); msgQueue.sendMessage(msg); diff --git a/ethereumj-core/src/main/java/org/ethereum/net/handler/P2pHandler.java b/ethereumj-core/src/main/java/org/ethereum/net/handler/P2pHandler.java index 6985d871..bbc69865 100644 --- a/ethereumj-core/src/main/java/org/ethereum/net/handler/P2pHandler.java +++ b/ethereumj-core/src/main/java/org/ethereum/net/handler/P2pHandler.java @@ -130,13 +130,13 @@ public class P2pHandler extends SimpleChannelInboundHandler { else { if (msg.getCapabilities().contains("eth")) { // Activate EthHandler for this peer + ctx.pipeline().addLast(new EthHandler(msg.getPeerId(), peerListener)); ctx.fireChannelActive(); } InetAddress address = ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress(); int port = msg.getListenPort(); - byte[] peerId = msg.getPeerId(); - Peer confirmedPeer = new Peer(address, port, peerId); + Peer confirmedPeer = new Peer(address, port, msg.getPeerId()); confirmedPeer.setOnline(true); confirmedPeer.getCapabilities().addAll(msg.getCapabilities()); WorldManager.getInstance().getPeerDiscovery().getPeers().add(confirmedPeer); diff --git a/ethereumj-core/src/main/java/org/ethereum/net/message/HelloMessage.java b/ethereumj-core/src/main/java/org/ethereum/net/message/HelloMessage.java index 3547f33c..c0919d52 100644 --- a/ethereumj-core/src/main/java/org/ethereum/net/message/HelloMessage.java +++ b/ethereumj-core/src/main/java/org/ethereum/net/message/HelloMessage.java @@ -31,14 +31,14 @@ public class HelloMessage extends P2pMessage { /** The port on which the peer is listening for an incoming connection */ private int listenPort; /** The identity and public key of the peer */ - private byte[] peerId; + private String peerId; public HelloMessage(byte[] encoded) { super(encoded); } public HelloMessage(byte p2pVersion, String clientId, - List capabilities, int listenPort, byte[] peerId) { + List capabilities, int listenPort, String peerId) { this.p2pVersion = p2pVersion; this.clientId = clientId; this.capabilities = capabilities; @@ -69,8 +69,8 @@ public class HelloMessage extends P2pMessage { byte[] peerPortBytes = ((RLPItem) paramsList.get(4)).getRLPData(); this.listenPort = ByteUtil.byteArrayToInt(peerPortBytes); - this.peerId = ((RLPItem) paramsList.get(5)).getRLPData(); - + byte[] peerIdBytes = ((RLPItem) paramsList.get(5)).getRLPData(); + this.peerId = Hex.toHexString(peerIdBytes); this.parsed = true; } @@ -84,7 +84,7 @@ public class HelloMessage extends P2pMessage { } byte[] capabilityList = RLP.encodeList(capabilities); byte[] peerPort = RLP.encodeInt(this.listenPort); - byte[] peerId = RLP.encodeElement(this.peerId); + byte[] peerId = RLP.encodeElement(Hex.decode(this.peerId)); this.encoded = RLP.encodeList(command, p2pVersion, clientId, capabilityList, peerPort, peerId); @@ -121,7 +121,7 @@ public class HelloMessage extends P2pMessage { return listenPort; } - public byte[] getPeerId() { + public String getPeerId() { if (!parsed) parse(); return peerId; } @@ -137,6 +137,6 @@ public class HelloMessage extends P2pMessage { + this.p2pVersion + " clientId=" + this.clientId + " capabilities=[" + Joiner.on(" ").join(this.capabilities) + "]" + " peerPort=" + this.listenPort + " peerId=" - + Hex.toHexString(this.peerId) + "]"; + + this.peerId + "]"; } } \ No newline at end of file diff --git a/ethereumj-core/src/main/java/org/ethereum/net/message/PeersMessage.java b/ethereumj-core/src/main/java/org/ethereum/net/message/PeersMessage.java index aa342c96..123de535 100644 --- a/ethereumj-core/src/main/java/org/ethereum/net/message/PeersMessage.java +++ b/ethereumj-core/src/main/java/org/ethereum/net/message/PeersMessage.java @@ -14,6 +14,7 @@ import org.ethereum.util.ByteUtil; import org.ethereum.util.RLP; import org.ethereum.util.RLPItem; import org.ethereum.util.RLPList; +import org.spongycastle.util.encoders.Hex; /** * Wrapper around an Ethereum Peers message on the network @@ -49,7 +50,7 @@ public class PeersMessage extends P2pMessage { try { int peerPort = ByteUtil.byteArrayToInt(portBytes); InetAddress address = InetAddress.getByAddress(ipBytes); - Peer peer = new Peer(address, peerPort, peerId); + Peer peer = new Peer(address, peerPort, Hex.toHexString(peerId)); peers.add(peer); } catch (UnknownHostException e) { throw new RuntimeException("Malformed ip", e); diff --git a/ethereumj-core/src/main/java/org/ethereum/net/message/StaticMessages.java b/ethereumj-core/src/main/java/org/ethereum/net/message/StaticMessages.java index 78b1a5ec..05763cef 100644 --- a/ethereumj-core/src/main/java/org/ethereum/net/message/StaticMessages.java +++ b/ethereumj-core/src/main/java/org/ethereum/net/message/StaticMessages.java @@ -33,7 +33,7 @@ public class StaticMessages { int listenPort = 30303; return new HelloMessage(p2pVersion, helloAnnouncement, - capabilities, listenPort, Hex.decode(PEER_ID)); + capabilities, listenPort, PEER_ID); } private static String buildHelloAnnouncement() { diff --git a/ethereumj-core/src/test/java/org/ethereum/net/HelloMessageTest.java b/ethereumj-core/src/test/java/org/ethereum/net/HelloMessageTest.java index 2cbe591a..6e68bdfa 100644 --- a/ethereumj-core/src/test/java/org/ethereum/net/HelloMessageTest.java +++ b/ethereumj-core/src/test/java/org/ethereum/net/HelloMessageTest.java @@ -38,7 +38,7 @@ public class HelloMessageTest { assertEquals(30303, helloMessage.getListenPort()); assertEquals( "2017B95D5586ADD053E7C5DCC8112DB1D557ED83589C4E0BD25442F39E4111655A487257AA7E4ED309E8B4D55BE5FA8D8D6E97B72C67D76AA03EB69AD981ED60", - Hex.toHexString(helloMessage.getPeerId()).toUpperCase()); + helloMessage.getPeerId().toUpperCase()); } @Test /* HelloMessage 2 from Node */ @@ -63,7 +63,7 @@ public class HelloMessageTest { assertEquals("eth", helloMessage.getCapabilities().get(0)); assertEquals(30303, helloMessage.getListenPort()); assertEquals("cadfb93d2bb5fbe2943584d93ed90e374667c9e8b2502e974693ccc6b3d370bd4cde7738d0b626e3d2f3caecc59e1302d1711bf595711060d7b4921e18b97656", - Hex.toHexString(helloMessage.getPeerId())); + helloMessage.getPeerId()); } @Test /* HelloMessage 3 from new */ @@ -72,12 +72,12 @@ public class HelloMessageTest { byte p2pVersion = 0x00; List capabilities = new ArrayList<>(Arrays.asList("eth", "shh")); int listenPort = 30303; - byte[] peerIdBytes = Hex.decode("CAB0D93EEE1F44EF1286367101F1553450E3DDCE" + String peerId = "CAB0D93EEE1F44EF1286367101F1553450E3DDCE" + "EA45ABCAB0AC21E1EFB48A6610EBE88CE7317EB09229558311BA8B7250911D" - + "7E49562C3988CA3143329DA3EA"); + + "7E49562C3988CA3143329DA3EA"; HelloMessage helloMessage = new HelloMessage(p2pVersion, helloAnnouncement, - capabilities, listenPort, peerIdBytes); + capabilities, listenPort, peerId); System.out.println(helloMessage); // rlp encoded hello message String expected = "F8738080A2457468657265756D284A292F302E362E302F6465762F5" diff --git a/ethereumj-core/src/test/java/org/ethereum/net/PeersMessageTest.java b/ethereumj-core/src/test/java/org/ethereum/net/PeersMessageTest.java index 06df2636..0fa4dacb 100644 --- a/ethereumj-core/src/test/java/org/ethereum/net/PeersMessageTest.java +++ b/ethereumj-core/src/test/java/org/ethereum/net/PeersMessageTest.java @@ -64,8 +64,8 @@ public class PeersMessageTest { @Test /* PeersMessage 2 from constructor */ public void testPeers_2() throws UnknownHostException { Set peers = new HashSet<>(); - peers.add(new Peer(InetAddress.getByName("82.217.72.169"), 30303, Hex.decode("585764a3c49a3838c69ad0855abfeb5672f71b072af62082b5679961781100814b8de88a8fbc1da7c73791f88159d73b5d2a13a5579535d603e045c3db5cbb75"))); - peers.add(new Peer(InetAddress.getByName("192.168.1.193"), 30303, new byte[0])); + peers.add(new Peer(InetAddress.getByName("82.217.72.169"), 30303, "585764a3c49a3838c69ad0855abfeb5672f71b072af62082b5679961781100814b8de88a8fbc1da7c73791f88159d73b5d2a13a5579535d603e045c3db5cbb75")); + peers.add(new Peer(InetAddress.getByName("192.168.1.193"), 30303, "")); PeersMessage peersMessage = new PeersMessage(peers); System.out.println(peersMessage.toString());