Merge pull request #100 from davassi/thread-safety

Thread-safety and data structures fixes
This commit is contained in:
romanman 2014-08-27 13:40:32 +03:00
commit d45b4ad555
15 changed files with 152 additions and 159 deletions

View File

@ -32,7 +32,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
*/
public class Block {
private static Logger logger = LoggerFactory.getLogger(Block.class);
private static Logger logger = LoggerFactory.getLogger("block");
/* A scalar value equal to the mininum limit of gas expenditure per block */
private static long MIN_GAS_LIMIT = 125000L;

View File

@ -52,46 +52,30 @@ import static org.ethereum.core.Denomination.SZABO;
*/
public class Blockchain {
private static Logger logger = LoggerFactory.getLogger("blockchain");
private static Logger stateLogger = LoggerFactory.getLogger("state");
private static final Logger logger = LoggerFactory.getLogger("blockchain");
private static final Logger stateLogger = LoggerFactory.getLogger("state");
// to avoid using minGasPrice=0 from Genesis for the wallet
private static long INITIAL_MIN_GAS_PRICE = 10 * SZABO.longValue();
private static final long INITIAL_MIN_GAS_PRICE = 10 * SZABO.longValue();
private Repository repository;
private Block lastBlock;
// keep the index of the chain for
// convenient usage, <block_number, block_hash>
private Map<Long, byte[]> blockCache = new HashMap<>();
private final Map<Long, byte[]> blockCache = new HashMap<>();
private BlockQueue blockQueue = new BlockQueue();
private final BlockQueue blockQueue = new BlockQueue();
public Blockchain(Repository repository) {
this.repository = repository;
}
public BlockQueue getBlockQueue() {
return blockQueue;
}
public Map<Long, byte[]> getBlockCache() {
return this.blockCache;
}
public long getGasPrice() {
// In case of the genesis block we don't want to rely on the min gas price
return lastBlock.isGenesis() ? lastBlock.getMinGasPrice() : INITIAL_MIN_GAS_PRICE;
}
public Block getLastBlock() {
return lastBlock;
}
public void setLastBlock(Block block) {
this.lastBlock = block;
}
public byte[] getLatestBlockHash() {
if (blockCache.isEmpty())
return Genesis.getInstance().getHash();
@ -450,4 +434,21 @@ public class Blockchain {
repository.delete(address.getNoLeadZeroesData());
}
}
public BlockQueue getBlockQueue() {
return blockQueue;
}
public Map<Long, byte[]> getBlockCache() {
return this.blockCache;
}
public Block getLastBlock() {
return lastBlock;
}
public void setLastBlock(Block block) {
this.lastBlock = block;
}
}

View File

@ -15,8 +15,8 @@ import java.net.InetAddress;
public interface Ethereum {
public PeerData findPeer(PeerData peerData);
public PeerData findPeer();
public PeerData findPeer(PeerData peerData) throws InterruptedException;
public PeerData findPeer() throws InterruptedException;
public void stopPeerDiscover();

View File

@ -1,5 +1,8 @@
package org.ethereum.facade;
import java.net.InetAddress;
import java.util.concurrent.BlockingQueue;
import org.ethereum.core.Block;
import org.ethereum.listener.EthereumListener;
import org.ethereum.manager.WorldManager;
@ -8,9 +11,6 @@ import org.ethereum.net.client.PeerData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetAddress;
import java.util.List;
/**
* www.ethereumJ.com
*
@ -20,36 +20,34 @@ import java.util.List;
public class EthereumImpl implements Ethereum {
private Logger logger = LoggerFactory.getLogger("facade");
private static final Logger logger = LoggerFactory.getLogger("facade");
public EthereumImpl() {
}
public EthereumImpl() {}
/**
* Find a peer but not this one
* @param peerData - peer to exclude
* @return online peer
* @throws InterruptedException
*/
@Override
public PeerData findPeer(PeerData peerData){
public PeerData findPeer(PeerData peerData) throws InterruptedException {
logger.info("Looking for online peer");
EthereumListener listener = WorldManager.getInstance().getListener();
if (listener != null)
logger.info("Looking for online peers...");
final EthereumListener listener = WorldManager.getInstance().getListener();
if (listener != null) {
listener.trace("Looking for online peer");
}
WorldManager.getInstance().startPeerDiscovery();
List<PeerData> peers = WorldManager.getInstance().getPeers();
boolean found = false;
int i = 0;
while (!found){
if (peers.isEmpty()) { sleep10Milli(); continue;}
if (peers.size()<= i) { i=0; continue;}
PeerData peer = peers.get(i);
final BlockingQueue<PeerData> peers = WorldManager.getInstance().getPeers();
PeerData peer = null;
while ((peer = peers.take()) != null) { // it blocks until a peer is available.
if (peer.isOnline() && !peer.equals(peerData)){
logger.info("Found peer: {}", peer.toString());
@ -59,13 +57,12 @@ public class EthereumImpl implements Ethereum {
return peer;
}
++i;
}
return null;
}
@Override
public PeerData findPeer(){
public PeerData findPeer() throws InterruptedException {
return findPeer(null);
}
@ -103,16 +100,6 @@ public class EthereumImpl implements Ethereum {
WorldManager.getInstance().addListener(listener);
}
private void sleep10Milli(){
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void loadBlockChain() {
WorldManager.getInstance().loadBlockchain();

View File

@ -111,10 +111,8 @@ public class PeersTableModel extends AbstractTableModel {
synchronized (peerInfoList) {
peerInfoList.clear();
List<PeerData> peers = WorldManager.getInstance().getPeers();
for (int i = 0; i < peers.size(); ++i) {
PeerData peer = peers.get(i);
final Queue<PeerData> peers = WorldManager.getInstance().getPeers();
for (PeerData peer : peers) {
InetAddress addr = peer.getInetAddress();
Location cr = IpGeoDB.getLocationForIp(addr);
peerInfoList.add(new PeerInfo(cr, addr, peer.isOnline(), peer.getHandshake(), peer.getLastCheckTime()));

View File

@ -5,8 +5,12 @@ import static org.ethereum.config.SystemProperties.CONFIG;
import java.math.BigInteger;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.ethereum.core.AccountState;
import org.ethereum.core.Blockchain;
@ -30,23 +34,27 @@ import org.ethereum.net.peerdiscovery.PeerDiscovery;
public class WorldManager {
private Blockchain blockchain;
private Repository repository;
private final Repository repository;
private Wallet wallet;
private PeerDiscovery peerDiscovery;
private List<PeerData> peers = new CopyOnWriteArrayList<PeerData>();
private final BlockingQueue<PeerData> peers = new LinkedBlockingQueue<>();
private ClientPeer activePeer;
private EthereumListener listener;
private static WorldManager instance;
private static final class WorldManagerHolder {
private static final WorldManager instance = new WorldManager();
}
private WorldManager() {
this.repository = new Repository();
this.blockchain = new Blockchain(repository);
// Initialize PeerData
List<PeerData> peerDataList = parsePeerDiscoveryIpList(CONFIG.peerDiscoveryIPList());
List<PeerData> peerDataList = parsePeerDiscoveryIpList(CONFIG.peerDiscoveryIPList());
peers.addAll(peerDataList);
peerDiscovery = new PeerDiscovery(peers);
@ -66,18 +74,66 @@ public class WorldManager {
}
public static WorldManager getInstance() {
if(instance == null) {
instance = new WorldManager();
}
return instance;
return WorldManagerHolder.instance;
}
public void setRepository(Repository repository) {
this.repository = repository;
}
public void setBlockchain(Blockchain blockchain) {
this.blockchain = blockchain;
public void addListener(EthereumListener listener){
this.listener = listener;
}
public void addPeers(final Set<PeerData> newPeers) {
for (final PeerData peer : newPeers) {
peers.add(peer);
if (peerDiscovery.isStarted()) {
peerDiscovery.addNewPeerData(peer);
}
}
}
public void startPeerDiscovery() {
if (!peerDiscovery.isStarted())
peerDiscovery.start();
};
public void stopPeerDiscover(){
if (listener != null)
listener.trace("Stopping peer discovery");
if (peerDiscovery.isStarted())
peerDiscovery.stop();
}
public void close() {
repository.close();
}
public EthereumListener getListener() {
return listener;
}
public List<PeerData> parsePeerDiscoveryIpList(final String peerDiscoveryIpList){
final List<String> ipList = Arrays.asList( peerDiscoveryIpList.split(",") );
final List<PeerData> peers = new ArrayList<>();
for (String ip : ipList){
String[] addr = ip.trim().split(":");
String ip_trim = addr[0];
String port_trim = addr[1];
try {
InetAddress iAddr = InetAddress.getByName(ip_trim);
int port = Integer.parseInt(port_trim);
PeerData peerData = new PeerData(iAddr.getAddress(), port, new byte[]{00});
peers.add(peerData);
} catch (UnknownHostException e) {
e.printStackTrace();
}
}
return peers;
}
public void setWallet(Wallet wallet) {
@ -108,68 +164,8 @@ public class WorldManager {
return activePeer;
}
public List<PeerData> getPeers() {
return peers;
}
public void addListener(EthereumListener listener){
this.listener = listener;
}
public void addPeers(List<PeerData> newPeers) {
for (PeerData peer : newPeers) {
if (this.peers.indexOf(peer) == -1) {
this.peers.add(peer);
if (peerDiscovery.isStarted())
peerDiscovery.addNewPeerData(peer);
}
}
}
public void startPeerDiscovery() {
if (!peerDiscovery.isStarted())
peerDiscovery.start();
};
public void stopPeerDiscover(){
if (listener != null)
listener.trace("Stopping peer discovery");
if (peerDiscovery.isStarted())
peerDiscovery.stop();
}
public void close() {
repository.close();
public BlockingQueue<PeerData> getPeers() {
return peers;
}
public EthereumListener getListener() {
return listener;
}
public List<PeerData> parsePeerDiscoveryIpList(String peerDiscoveryIpList){
List<String> ipList = Arrays.asList( peerDiscoveryIpList.split(",") );
List<PeerData> peers = new ArrayList<>();
for (String ip : ipList){
String[] addr = ip.trim().split(":");
String ip_trim = addr[0];
String port_trim = addr[1];
try {
InetAddress iAddr = InetAddress.getByName(ip_trim);
int port = Integer.parseInt(port_trim);
PeerData peerData = new PeerData(iAddr.getAddress(), port, new byte[]{00});
peers.add(peerData);
} catch (UnknownHostException e) {
e.printStackTrace();
}
}
return peers;
}
}

View File

@ -16,8 +16,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spongycastle.util.encoders.Hex;
import java.util.AbstractQueue;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import static org.ethereum.config.SystemProperties.CONFIG;
@ -86,7 +89,7 @@ public class ClientPeer {
handler.killTimers();
List<PeerData> peers = WorldManager.getInstance().getPeers();
final Queue<PeerData> peers = WorldManager.getInstance().getPeers();
for (PeerData peer : peers){
if (host.equals(peer.getInetAddress().getHostAddress()) &&

View File

@ -2,7 +2,9 @@ package org.ethereum.net.message;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.ethereum.net.Command.PEERS;
@ -21,7 +23,7 @@ public class PeersMessage extends Message {
private boolean parsed = false;
private List<PeerData> peers = new ArrayList<PeerData>();
private final Set<PeerData> peers = new HashSet<PeerData>();
public PeersMessage(byte[] payload) {
super(RLP.decode2(payload));
@ -67,7 +69,7 @@ public class PeersMessage extends Message {
return payload;
}
public List<PeerData> getPeers() {
public Set<PeerData> getPeers() {
if (!parsed)
parseRLP();
return peers;

View File

@ -28,7 +28,7 @@ import static org.ethereum.net.Command.*;
*/
public class EthereumPeerTasterHandler extends ChannelInboundHandlerAdapter {
private Logger logger = LoggerFactory.getLogger("peerdiscovery");
private final static Logger logger = LoggerFactory.getLogger("peerdiscovery");
private final static byte[] MAGIC_PREFIX = {(byte)0x22, (byte)0x40, (byte)0x08, (byte)0x91};

View File

@ -4,8 +4,11 @@ import org.ethereum.net.client.PeerData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.ethereum.config.SystemProperties.CONFIG;
@ -15,18 +18,19 @@ import static org.ethereum.config.SystemProperties.CONFIG;
* Created on: 22/05/2014 09:10
*/
public class PeerDiscovery {
private static final Logger logger = LoggerFactory.getLogger("peerdiscovery");
private RejectedExecutionHandlerImpl rejectionHandler;
private ThreadFactory threadFactory;
private ThreadPoolExecutor executorPool;
private PeerDiscoveryMonitorThread monitor;
private List<PeerData> peers;
private final Queue<PeerData> peers;
Logger logger = LoggerFactory.getLogger("peerdiscovery");
private final AtomicBoolean started = new AtomicBoolean(false);
private boolean started = false;
public PeerDiscovery(List<PeerData> peers) {
public PeerDiscovery(Queue<PeerData> peers) {
this.peers = peers;
}
@ -50,11 +54,11 @@ public class PeerDiscovery {
for (PeerData peerData : this.peers) {
executorPool.execute(new WorkerThread(peerData, executorPool));
}
started = true;
started.set(true);
}
public void addNewPeerData(PeerData peerData) {
logger.debug("add new peer for discovery: {}", peerData);
executorPool.execute(new WorkerThread(peerData, executorPool));
}
@ -65,7 +69,7 @@ public class PeerDiscovery {
}
public boolean isStarted() {
return started;
return started.get();
}
}

View File

@ -7,11 +7,11 @@ import org.slf4j.LoggerFactory;
import java.util.concurrent.ThreadPoolExecutor;
public class PeerDiscoveryMonitorThread implements Runnable {
private Logger logger = LoggerFactory.getLogger("peerdiscovery");
private final static Logger logger = LoggerFactory.getLogger("peerdiscovery");
private ThreadPoolExecutor executor;
private int seconds;
private boolean run=true;
private volatile boolean run = true;
public PeerDiscoveryMonitorThread(ThreadPoolExecutor executor, int delay) {
this.executor = executor;

View File

@ -19,7 +19,7 @@ import static org.ethereum.config.SystemProperties.CONFIG;
*/
public class PeerTaster {
private Logger logger = LoggerFactory.getLogger("peerdiscovery");
private final static Logger logger = LoggerFactory.getLogger("peerdiscovery");
final EthereumPeerTasterHandler handler = new EthereumPeerTasterHandler();
public PeerTaster() {

View File

@ -14,7 +14,7 @@ import java.util.concurrent.ThreadPoolExecutor;
public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {
Logger logger = LoggerFactory.getLogger("peerdiscovery");
private static final Logger logger = LoggerFactory.getLogger("peerdiscovery");
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {

View File

@ -13,7 +13,7 @@ import java.util.concurrent.ThreadPoolExecutor;
*/
public class WorkerThread implements Runnable {
Logger logger = LoggerFactory.getLogger("peerdiscovery");
private final static Logger logger = LoggerFactory.getLogger("peerdiscovery");
ThreadPoolExecutor poolExecutor;
private PeerData peerData;

View File

@ -21,6 +21,7 @@ import java.net.URL;
import java.net.UnknownHostException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import static org.junit.Assert.assertEquals;
@ -121,7 +122,8 @@ public class MessagesTest {
assertEquals(2, peersMessage.getPeers().size());
PeerData peerData = peersMessage.getPeers().get(1);
Iterator<PeerData> it = peersMessage.getPeers().iterator(); it.next();
PeerData peerData = it.next();
assertEquals("/81.99.225.18", peerData.getInetAddress().toString());
assertEquals(30303, peerData.getPort());