Pending transaction save in external datasource
This commit is contained in:
parent
2c2a0f2f37
commit
43b956a106
|
@ -50,6 +50,7 @@ public class SystemProperties {
|
|||
private final static String DEFAULT_VM_TRACE_DIR = "dmp";
|
||||
private final static int DEFAULT_PEER_LISTEN_PORT = 30303;
|
||||
private final static String DEFAULT_KEY_VALUE_DATA_SOURCE = "leveldb";
|
||||
private final static boolean DEFAULT_REDIS_ENABLED = true;
|
||||
|
||||
|
||||
/* Testing */
|
||||
|
@ -239,7 +240,11 @@ public class SystemProperties {
|
|||
}
|
||||
|
||||
public boolean vmTrace() {
|
||||
return Boolean.parseBoolean(prop.getProperty("vm.structured.trace", String.valueOf(DEFAULT_VM_TRACE)));
|
||||
return boolProperty("vm.structured.trace", DEFAULT_VM_TRACE);
|
||||
}
|
||||
|
||||
private boolean boolProperty(String key, Boolean defaultValue) {
|
||||
return Boolean.parseBoolean(prop.getProperty(key, String.valueOf(defaultValue)));
|
||||
}
|
||||
|
||||
public String vmTraceDir() {
|
||||
|
@ -250,10 +255,13 @@ public class SystemProperties {
|
|||
return Integer.parseInt(prop.getProperty("peer.listen.port", String.valueOf(DEFAULT_PEER_LISTEN_PORT)));
|
||||
}
|
||||
|
||||
|
||||
public String getKeyValueDataSource() {
|
||||
return prop.getProperty("keyvalue.datasource", DEFAULT_KEY_VALUE_DATA_SOURCE);
|
||||
}
|
||||
|
||||
public boolean isRedisEnabled() {
|
||||
return boolProperty("redis.enabled", DEFAULT_REDIS_ENABLED);
|
||||
}
|
||||
|
||||
public void setListenPort(Integer port) {
|
||||
prop.setProperty("peer.listen.port", port.toString());
|
||||
|
|
|
@ -16,9 +16,11 @@ import org.slf4j.LoggerFactory;
|
|||
import org.spongycastle.util.encoders.Hex;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.FileSystemUtils;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.io.BufferedWriter;
|
||||
import java.io.File;
|
||||
import java.io.FileWriter;
|
||||
|
@ -72,7 +74,9 @@ public class BlockchainImpl implements Blockchain {
|
|||
// to avoid using minGasPrice=0 from Genesis for the wallet
|
||||
private static final long INITIAL_MIN_GAS_PRICE = 10 * SZABO.longValue();
|
||||
|
||||
private final Set<Transaction> pendingTransactions = Collections.synchronizedSet(new HashSet<Transaction>());
|
||||
@Resource
|
||||
@Qualifier("pendingTransactions")
|
||||
private Set<Transaction> pendingTransactions;
|
||||
|
||||
@Autowired
|
||||
private Repository repository;
|
||||
|
|
|
@ -0,0 +1,17 @@
|
|||
package org.ethereum.datasource.redis;
|
||||
|
||||
import org.ethereum.core.Transaction;
|
||||
import org.ethereum.datasource.KeyValueDataSource;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
public interface RedisConnection {
|
||||
|
||||
boolean isAvailable();
|
||||
|
||||
<T> Set<T> createSetFor(Class<T> clazz, String name);
|
||||
|
||||
Set<Transaction> createTransactionSet(String name);
|
||||
|
||||
KeyValueDataSource createKeyValueDataSource(String name);
|
||||
}
|
|
@ -0,0 +1,104 @@
|
|||
package org.ethereum.datasource.redis;
|
||||
|
||||
import org.ethereum.config.SystemProperties;
|
||||
import org.ethereum.core.Transaction;
|
||||
import org.ethereum.datasource.KeyValueDataSource;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.StringUtils;
|
||||
import redis.clients.jedis.Jedis;
|
||||
import redis.clients.jedis.JedisPool;
|
||||
import redis.clients.jedis.JedisPoolConfig;
|
||||
import redis.clients.jedis.Protocol;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.PreDestroy;
|
||||
import java.net.URI;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.springframework.util.StringUtils.isEmpty;
|
||||
|
||||
@Component
|
||||
public class RedisConnectionImpl implements RedisConnection {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger("db");
|
||||
|
||||
private JedisPool jedisPool;
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
if (!SystemProperties.CONFIG.isRedisEnabled()) return;
|
||||
|
||||
String redisCloudUrl = System.getenv("REDISCLOUD_URL");
|
||||
if (isEmpty(redisCloudUrl)) {
|
||||
logger.info("Cannot connect to Redis. 'REDISCLOUD_URL' environment variable is not defined.");
|
||||
return;
|
||||
}
|
||||
|
||||
logger.info("Redis pool creating: " + redisCloudUrl);
|
||||
try {
|
||||
jedisPool = createJedisPool(new URI(redisCloudUrl));
|
||||
} catch (Exception e) {
|
||||
logger.warn("Cannot connect to Redis cloud: ", e);
|
||||
} finally {
|
||||
logger.info(isAvailable() ? "Redis cloud connected successfully." : "Redis cloud connection failed.");
|
||||
}
|
||||
}
|
||||
|
||||
private static JedisPool createJedisPool(URI redisUri) {
|
||||
String userInfo = redisUri.getUserInfo();
|
||||
if (StringUtils.hasText(userInfo)) {
|
||||
return new JedisPool(new JedisPoolConfig(),
|
||||
redisUri.getHost(),
|
||||
redisUri.getPort(),
|
||||
Protocol.DEFAULT_TIMEOUT,
|
||||
userInfo.split(":", 2)[1]);
|
||||
}
|
||||
|
||||
return new JedisPool(new JedisPoolConfig(),
|
||||
redisUri.getHost(),
|
||||
redisUri.getPort(),
|
||||
Protocol.DEFAULT_TIMEOUT);
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
public void destroy() {
|
||||
if (jedisPool != null) {
|
||||
jedisPool.destroy();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAvailable() {
|
||||
boolean available = jedisPool != null;
|
||||
if (available) {
|
||||
Jedis jedis = jedisPool.getResource();
|
||||
try {
|
||||
available = jedis.isConnected();
|
||||
} catch (Throwable t) {
|
||||
logger.warn("Connection testing failed: ", t);
|
||||
available = false;
|
||||
} finally {
|
||||
jedisPool.returnResource(jedis);
|
||||
}
|
||||
}
|
||||
return available;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Set<T> createSetFor(Class<T> clazz, String name) {
|
||||
return new RedisSet<T>(name, jedisPool, Serializers.forClass(clazz));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<Transaction> createTransactionSet(String name) {
|
||||
return createSetFor(Transaction.class, name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public KeyValueDataSource createKeyValueDataSource(String name) {
|
||||
return new RedisKeyValueDataSource(name, jedisPool, null);
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,91 @@
|
|||
package org.ethereum.datasource.redis;
|
||||
|
||||
import org.ethereum.datasource.KeyValueDataSource;
|
||||
import redis.clients.jedis.Jedis;
|
||||
import redis.clients.jedis.JedisPool;
|
||||
import redis.clients.jedis.Pipeline;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.ethereum.config.SystemProperties.CONFIG;
|
||||
import static org.ethereum.util.Functional.*;
|
||||
|
||||
public class RedisKeyValueDataSource extends RedisStorage<byte[]> implements KeyValueDataSource {
|
||||
|
||||
RedisKeyValueDataSource(String namespace, JedisPool pool, RedisSerializer<byte[]> serializer) {
|
||||
super(namespace, pool, serializer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init() {
|
||||
if (CONFIG.databaseReset()) {
|
||||
pooled(new Consumer<Jedis>() {
|
||||
@Override
|
||||
public void accept(Jedis jedis) {
|
||||
jedis.flushAll();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setName(String name) {
|
||||
setNamespace(name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] get(final byte[] key) {
|
||||
return pooledWithResult(new Function<Jedis, byte[]>() {
|
||||
@Override
|
||||
public byte[] apply(Jedis jedis) {
|
||||
return jedis.get(getNamespace());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void put(final byte[] key, final byte[] value) {
|
||||
pooled(new Consumer<Jedis>() {
|
||||
@Override
|
||||
public void accept(Jedis jedis) {
|
||||
jedis.set(formatKey(key), value);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void delete(final byte[] key) {
|
||||
pooled(new Consumer<Jedis>() {
|
||||
@Override
|
||||
public void accept(Jedis jedis) {
|
||||
jedis.del(formatKey(key));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<byte[]> keys() {
|
||||
return pooledWithResult(new Function<Jedis, Set<byte[]>>() {
|
||||
@Override
|
||||
public Set<byte[]> apply(Jedis jedis) {
|
||||
return jedis.keys(formatKey("*".getBytes()));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateBatch(final Map<byte[], byte[]> rows) {
|
||||
doInPipeline(rows.entrySet(), new BiConsumer<Map.Entry<byte[], byte[]>, Pipeline>() {
|
||||
@Override
|
||||
public void accept(Map.Entry<byte[], byte[]> entry, Pipeline pipeline) {
|
||||
pipeline.set(formatKey(entry.getKey()), entry.getValue());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,152 @@
|
|||
package org.ethereum.datasource.redis;
|
||||
|
||||
import org.apache.commons.collections4.Transformer;
|
||||
import org.apache.commons.collections4.keyvalue.AbstractMapEntry;
|
||||
import redis.clients.jedis.Jedis;
|
||||
import redis.clients.jedis.JedisPool;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.apache.commons.collections4.CollectionUtils.collect;
|
||||
import static org.ethereum.util.Functional.Consumer;
|
||||
import static org.ethereum.util.Functional.Function;
|
||||
|
||||
public class RedisMap<K, V> extends RedisStorage<V> implements Map<K, V> {
|
||||
|
||||
private final RedisSerializer<K> keySerializer;
|
||||
|
||||
RedisMap(String namespace, JedisPool pool, RedisSerializer<K> keySerializer, RedisSerializer<V> valueSerializer) {
|
||||
super(namespace, pool, valueSerializer);
|
||||
this.keySerializer = keySerializer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int size() {
|
||||
return pooledWithResult(new Function<Jedis, Integer>() {
|
||||
@Override
|
||||
public Integer apply(Jedis jedis) {
|
||||
return jedis.hlen(getNamespace()).intValue();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isEmpty() {
|
||||
return size() == 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean containsKey(final Object key) {
|
||||
return pooledWithResult(new Function<Jedis, Boolean>() {
|
||||
@Override
|
||||
public Boolean apply(Jedis jedis) {
|
||||
return jedis.hexists(getNamespace(), keySerializer.serialize((K) key));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean containsValue(Object value) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public V get(final Object key) {
|
||||
return pooledWithResult(new Function<Jedis, V>() {
|
||||
@Override
|
||||
public V apply(Jedis jedis) {
|
||||
byte[] value = jedis.hget(getNamespace(), keySerializer.serialize((K) key));
|
||||
return deserialize(value);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public V put(K key, V value) {
|
||||
/*
|
||||
return pooledWithResult(new Function<Jedis, V>() {
|
||||
@Override
|
||||
public V apply(Jedis jedis) {
|
||||
return jedis.hset(getNamespace(), keySerializer.serialize(key), serialize(value));
|
||||
}
|
||||
});
|
||||
*/
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public V remove(Object key) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void putAll(Map<? extends K, ? extends V> m) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clear() {
|
||||
pooled(new Consumer<Jedis>() {
|
||||
@Override
|
||||
public void accept(Jedis jedis) {
|
||||
jedis.del(getNamespace());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<K> keySet() {
|
||||
return pooledWithResult(new Function<Jedis, Set<K>>() {
|
||||
@Override
|
||||
public Set<K> apply(Jedis jedis) {
|
||||
Set<K> result = new HashSet<K>();
|
||||
collect(jedis.hkeys(getNamespace()), new Transformer<byte[], K>() {
|
||||
@Override
|
||||
public K transform(byte[] input) {
|
||||
return keySerializer.deserialize(input);
|
||||
}
|
||||
}, result);
|
||||
return result;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<V> values() {
|
||||
return pooledWithResult(new Function<Jedis, Collection<V>>() {
|
||||
@Override
|
||||
public Collection<V> apply(Jedis jedis) {
|
||||
return deserialize(jedis.hvals(getNamespace()));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<Entry<K, V>> entrySet() {
|
||||
return pooledWithResult(new Function<Jedis, Set<Entry<K, V>>>() {
|
||||
@Override
|
||||
public Set<Entry<K, V>> apply(Jedis jedis) {
|
||||
Set<Entry<K, V>> result = new HashSet<Entry<K, V>>();
|
||||
collect(jedis.hgetAll(getNamespace()).entrySet(), new Transformer<Entry<byte[], byte[]>, Entry<K, V>>() {
|
||||
@Override
|
||||
public Entry<K, V> transform(Entry<byte[], byte[]> input) {
|
||||
K key = keySerializer.deserialize(input.getKey());
|
||||
V value = deserialize(input.getValue());
|
||||
return new RedisMapEntry<K, V>(key, value);
|
||||
}
|
||||
}, result);
|
||||
return result;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private class RedisMapEntry<K, V> extends AbstractMapEntry<K, V> {
|
||||
|
||||
RedisMapEntry(K key, V value) {
|
||||
super(key, value);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,10 @@
|
|||
package org.ethereum.datasource.redis;
|
||||
|
||||
public interface RedisSerializer<T> {
|
||||
|
||||
boolean canSerialize(Object o);
|
||||
|
||||
byte[] serialize(T t);
|
||||
|
||||
T deserialize(byte[] bytes);
|
||||
}
|
|
@ -0,0 +1,120 @@
|
|||
package org.ethereum.datasource.redis;
|
||||
|
||||
import redis.clients.jedis.Jedis;
|
||||
import redis.clients.jedis.JedisPool;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Iterator;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.ethereum.util.Functional.Consumer;
|
||||
import static org.ethereum.util.Functional.Function;
|
||||
|
||||
public class RedisSet<T> extends RedisStorage<T> implements Set<T> {
|
||||
|
||||
RedisSet(String namespace, JedisPool pool, RedisSerializer<T> serializer) {
|
||||
super(namespace, pool, serializer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int size() {
|
||||
return pooledWithResult(new Function<Jedis, Integer>() {
|
||||
@Override
|
||||
public Integer apply(Jedis jedis) {
|
||||
return jedis.scard(getNamespace()).intValue();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isEmpty() {
|
||||
return size() == 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean contains(final Object o) {
|
||||
return pooledWithResult(new Function<Jedis, Boolean>() {
|
||||
@Override
|
||||
public Boolean apply(Jedis jedis) {
|
||||
return jedis.sismember(getNamespace(), serialize(o));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<T> iterator() {
|
||||
Set<byte[]> members = pooledWithResult(new Function<Jedis, Set<byte[]>>() {
|
||||
@Override
|
||||
public Set<byte[]> apply(Jedis jedis) {
|
||||
return jedis.smembers(getNamespace());
|
||||
}
|
||||
});
|
||||
return deserialize(members).iterator();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object[] toArray() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T1> T1[] toArray(T1[] a) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean add(T t) {
|
||||
return addAll(Arrays.asList(t));
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean remove(Object o) {
|
||||
return removeAll(Arrays.asList(o));
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean containsAll(Collection<?> c) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean addAll(final Collection<? extends T> c) {
|
||||
return pooledWithResult(new Function<Jedis, Boolean>() {
|
||||
@Override
|
||||
public Boolean apply(Jedis jedis) {
|
||||
return jedis.sadd(getNamespace(), serialize(c)) == 1;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean retainAll(final Collection<?> c) {
|
||||
return pooledWithResult(new Function<Jedis, Boolean>() {
|
||||
@Override
|
||||
public Boolean apply(Jedis jedis) {
|
||||
return jedis.sinterstore(getNamespace(), serialize(c)) == c.size();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean removeAll(final Collection<?> c) {
|
||||
return pooledWithResult(new Function<Jedis, Boolean>() {
|
||||
@Override
|
||||
public Boolean apply(Jedis jedis) {
|
||||
return jedis.srem(getNamespace(), serialize(c)) == 1;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clear() {
|
||||
pooled(new Consumer<Jedis>() {
|
||||
@Override
|
||||
public void accept(Jedis jedis) {
|
||||
jedis.del(getNamespace());
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
|
@ -0,0 +1,114 @@
|
|||
package org.ethereum.datasource.redis;
|
||||
|
||||
import org.apache.commons.collections4.Transformer;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import redis.clients.jedis.Jedis;
|
||||
import redis.clients.jedis.JedisPool;
|
||||
import redis.clients.jedis.Pipeline;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
import static org.apache.commons.collections4.CollectionUtils.collect;
|
||||
import static org.ethereum.util.Functional.*;
|
||||
|
||||
public abstract class RedisStorage<T> {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger("db");
|
||||
|
||||
private String namespace;
|
||||
private final JedisPool pool;
|
||||
private final RedisSerializer<T> serializer;
|
||||
|
||||
RedisStorage(String namespace, JedisPool pool, RedisSerializer<T> serializer) {
|
||||
this.pool = pool;
|
||||
this.serializer = serializer;
|
||||
|
||||
setNamespace(namespace);
|
||||
}
|
||||
|
||||
protected byte[] getNamespace() {
|
||||
return namespace.getBytes();
|
||||
}
|
||||
|
||||
protected void setNamespace(String namespace) {
|
||||
this.namespace = namespace + ":";
|
||||
}
|
||||
|
||||
protected byte[] formatKey(byte[] key) {
|
||||
byte[] prefix = getNamespace();
|
||||
|
||||
int length = prefix.length + key.length;
|
||||
byte[] result = new byte[length];
|
||||
System.arraycopy(prefix, 0, result, 0, prefix.length);
|
||||
System.arraycopy(key, 0, result, prefix.length, key.length);
|
||||
return result;
|
||||
}
|
||||
|
||||
protected byte[] serialize(Object o) {
|
||||
if (serializer.canSerialize(o)) {
|
||||
return serializer.serialize((T) o);
|
||||
}
|
||||
|
||||
logger.warn("Cannot serialize '%s'.", o.getClass());
|
||||
return new byte[0];
|
||||
}
|
||||
|
||||
protected byte[][] serialize(Collection<?> collection) {
|
||||
return collect(collection, new Transformer<Object, byte[]>() {
|
||||
@Override
|
||||
public byte[] transform(Object input) {
|
||||
return serialize(input);
|
||||
}
|
||||
}).toArray(new byte[][]{});
|
||||
}
|
||||
|
||||
protected T deserialize(byte[] bytes) {
|
||||
return serializer.deserialize(bytes);
|
||||
}
|
||||
|
||||
|
||||
protected Collection<T> deserialize(Collection<byte[]> bytesCollection) {
|
||||
return collect(bytesCollection, new Transformer<byte[], T>() {
|
||||
@Override
|
||||
public T transform(byte[] input) {
|
||||
return deserialize(input);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
protected <P> void doInPipeline(final Collection<P> collection, final BiConsumer<P, Pipeline> consumer) {
|
||||
pooled(new Consumer<Jedis>() {
|
||||
@Override
|
||||
public void accept(Jedis jedis) {
|
||||
Pipeline pipeline = jedis.pipelined();
|
||||
try {
|
||||
for (P el : collection) {
|
||||
consumer.accept(el, pipeline);
|
||||
}
|
||||
} finally {
|
||||
pipeline.sync();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
protected void pooled(final Consumer<Jedis> consumer) {
|
||||
pooledWithResult(new Function<Jedis, Object>() {
|
||||
@Override
|
||||
public Object apply(Jedis jedis) {
|
||||
consumer.accept(jedis);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
protected <R> R pooledWithResult(Function<Jedis, R> function) {
|
||||
Jedis jedis = pool.getResource();
|
||||
try {
|
||||
return function.apply(jedis);
|
||||
} finally {
|
||||
pool.returnResource(jedis);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,117 @@
|
|||
package org.ethereum.datasource.redis;
|
||||
|
||||
import com.fasterxml.jackson.databind.JavaType;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.type.TypeFactory;
|
||||
import org.ethereum.core.Transaction;
|
||||
import org.ethereum.core.TransactionReceipt;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
public final class Serializers {
|
||||
|
||||
private static abstract class BaseRedisSerializer<T> implements RedisSerializer<T> {
|
||||
|
||||
public abstract boolean supports(Class<?> aClass);
|
||||
|
||||
@Override
|
||||
public boolean canSerialize(Object o) {
|
||||
return (o != null) && supports(o.getClass());
|
||||
}
|
||||
}
|
||||
|
||||
private static class JacksonJsonRedisSerializer<T> implements RedisSerializer<T> {
|
||||
|
||||
private final ObjectMapper objectMapper = new ObjectMapper();
|
||||
private final JavaType javaType;
|
||||
|
||||
JacksonJsonRedisSerializer(Class<T> clazz) {
|
||||
this.javaType = TypeFactory.defaultInstance().constructType(clazz);
|
||||
}
|
||||
|
||||
@Override
|
||||
public T deserialize(byte[] bytes) {
|
||||
if (isEmpty(bytes)) return null;
|
||||
|
||||
try {
|
||||
return (T) this.objectMapper.readValue(bytes, 0, bytes.length, javaType);
|
||||
} catch (Exception ex) {
|
||||
throw new RuntimeException("Cannot read JSON: " + ex.getMessage(), ex);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] serialize(Object t) {
|
||||
if (t == null) return EMPTY_ARRAY;
|
||||
|
||||
try {
|
||||
return this.objectMapper.writeValueAsBytes(t);
|
||||
} catch (Exception ex) {
|
||||
throw new RuntimeException("Cannot write JSON: " + ex.getMessage(), ex);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean canSerialize(Object o) {
|
||||
return (o != null) && javaType.hasRawClass(o.getClass());
|
||||
}
|
||||
}
|
||||
|
||||
private static class TransactionReceiptSerializer extends BaseRedisSerializer<TransactionReceipt> {
|
||||
|
||||
@Override
|
||||
public boolean supports(Class<?> aClass) {
|
||||
return TransactionReceipt.class.isAssignableFrom(aClass);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public byte[] serialize(TransactionReceipt transactionReceipt) {
|
||||
return transactionReceipt.getEncoded();
|
||||
}
|
||||
|
||||
@Override
|
||||
public TransactionReceipt deserialize(byte[] bytes) {
|
||||
return new TransactionReceipt(bytes);
|
||||
}
|
||||
}
|
||||
|
||||
private static class TransactionSerializer extends BaseRedisSerializer<Transaction> {
|
||||
|
||||
@Override
|
||||
public boolean supports(Class<?> aClass) {
|
||||
return Transaction.class.isAssignableFrom(aClass);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] serialize(Transaction transaction) {
|
||||
return transaction.getEncoded();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Transaction deserialize(byte[] bytes) {
|
||||
return new Transaction(bytes);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private static final byte[] EMPTY_ARRAY = new byte[0];
|
||||
private static final Set<? extends BaseRedisSerializer> SERIALIZERS = new HashSet<BaseRedisSerializer>() {{
|
||||
add(new TransactionSerializer());
|
||||
add(new TransactionReceiptSerializer());
|
||||
}};
|
||||
|
||||
public static <T> RedisSerializer<T> forClass(Class<T> clazz) {
|
||||
for (BaseRedisSerializer serializer : SERIALIZERS) {
|
||||
if (serializer.supports(clazz)) {
|
||||
return serializer;
|
||||
}
|
||||
}
|
||||
return new JacksonJsonRedisSerializer(clazz);
|
||||
}
|
||||
|
||||
private static boolean isEmpty(byte[] data) {
|
||||
return (data == null || data.length == 0);
|
||||
}
|
||||
}
|
|
@ -1,13 +1,15 @@
|
|||
package org.ethereum.facade;
|
||||
|
||||
import org.ethereum.config.SystemProperties;
|
||||
import org.ethereum.core.Transaction;
|
||||
import org.ethereum.datasource.KeyValueDataSource;
|
||||
import org.ethereum.datasource.LevelDbDataSource;
|
||||
import org.ethereum.datasource.RedisDataSource;
|
||||
import org.ethereum.datasource.redis.RedisConnection;
|
||||
import org.ethereum.datasource.redis.RedisConnectionImpl;
|
||||
import org.ethereum.db.BlockStore;
|
||||
import org.ethereum.db.BlockStoreImpl;
|
||||
import org.ethereum.db.RepositoryImpl;
|
||||
import org.hibernate.Session;
|
||||
import org.hibernate.SessionFactory;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -24,7 +26,10 @@ import org.springframework.transaction.annotation.Propagation;
|
|||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.sql.SQLException;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.ethereum.config.SystemProperties.CONFIG;
|
||||
|
||||
|
@ -59,6 +64,21 @@ public class DefaultConfig {
|
|||
return new LevelDbDataSource();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public RedisConnection redisConnection() {
|
||||
RedisConnectionImpl connection = new RedisConnectionImpl();
|
||||
connection.init();
|
||||
return connection;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Set<Transaction> pendingTransactions() {
|
||||
RedisConnection connection = redisConnection();
|
||||
return connection.isAvailable()
|
||||
? connection.createTransactionSet("pendingTransactions")
|
||||
: Collections.synchronizedSet(new HashSet<Transaction>());
|
||||
}
|
||||
|
||||
@Bean
|
||||
@Transactional(propagation = Propagation.SUPPORTS)
|
||||
public BlockStore blockStore(SessionFactory sessionFactory){
|
||||
|
|
|
@ -1,11 +1,13 @@
|
|||
package org.ethereum.facade;
|
||||
|
||||
import org.ethereum.config.SystemProperties;
|
||||
import org.ethereum.core.Transaction;
|
||||
import org.ethereum.datasource.KeyValueDataSource;
|
||||
import org.ethereum.datasource.LevelDbDataSource;
|
||||
import org.ethereum.datasource.RedisDataSource;
|
||||
import org.ethereum.datasource.redis.RedisConnection;
|
||||
import org.ethereum.datasource.redis.RedisConnectionImpl;
|
||||
import org.ethereum.db.BlockStore;
|
||||
import org.ethereum.db.BlockStoreImpl;
|
||||
import org.ethereum.db.InMemoryBlockStore;
|
||||
import org.ethereum.db.RepositoryImpl;
|
||||
import org.hibernate.SessionFactory;
|
||||
|
@ -24,7 +26,10 @@ import org.springframework.transaction.annotation.Propagation;
|
|||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.sql.SQLException;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.ethereum.config.SystemProperties.CONFIG;
|
||||
|
||||
|
@ -62,6 +67,20 @@ public class RemoteConfig {
|
|||
return new LevelDbDataSource();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public RedisConnection redisConnection() {
|
||||
RedisConnectionImpl connection = new RedisConnectionImpl();
|
||||
connection.init();
|
||||
return connection;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Set<Transaction> pendingTransactions() {
|
||||
RedisConnection connection = redisConnection();
|
||||
return connection.isAvailable()
|
||||
? connection.createTransactionSet("pendingTransactions")
|
||||
: Collections.synchronizedSet(new HashSet<Transaction>());
|
||||
}
|
||||
|
||||
@Bean
|
||||
@Transactional(propagation = Propagation.SUPPORTS)
|
||||
|
|
|
@ -0,0 +1,63 @@
|
|||
package org.ethereum.util;
|
||||
|
||||
|
||||
public interface Functional {
|
||||
|
||||
/**
|
||||
* Represents an operation that accepts a single input argument and returns no
|
||||
* result. Unlike most other functional interfaces, {@code Consumer} is expected
|
||||
* to operate via side-effects.
|
||||
*
|
||||
* @param <T> the type of the input to the operation
|
||||
*/
|
||||
public static interface Consumer<T> {
|
||||
|
||||
/**
|
||||
* Performs this operation on the given argument.
|
||||
*
|
||||
* @param t the input argument
|
||||
*/
|
||||
void accept(T t);
|
||||
}
|
||||
|
||||
/**
|
||||
* Represents an operation that accepts two input arguments and returns no
|
||||
* result. This is the two-arity specialization of {@link java.util.function.Consumer}.
|
||||
* Unlike most other functional interfaces, {@code BiConsumer} is expected
|
||||
* to operate via side-effects.
|
||||
*
|
||||
* @param <T> the type of the first argument to the operation
|
||||
* @param <U> the type of the second argument to the operation
|
||||
*
|
||||
* @see org.ethereum.util.Functional.Consumer
|
||||
*/
|
||||
public interface BiConsumer<T, U> {
|
||||
|
||||
/**
|
||||
* Performs this operation on the given arguments.
|
||||
*
|
||||
* @param t the first input argument
|
||||
* @param u the second input argument
|
||||
*/
|
||||
void accept(T t, U u);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Represents a function that accepts one argument and produces a result.
|
||||
*
|
||||
* @param <T> the type of the input to the function
|
||||
* @param <R> the type of the result of the function
|
||||
*/
|
||||
public static interface Function<T, R> {
|
||||
|
||||
/**
|
||||
* Applies this function to the given argument.
|
||||
*
|
||||
* @param t the function argument
|
||||
* @return the function result
|
||||
*/
|
||||
R apply(T t);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,109 @@
|
|||
package test.ethereum.datasource;
|
||||
|
||||
import org.ethereum.core.Transaction;
|
||||
import org.ethereum.crypto.ECKey;
|
||||
import org.ethereum.crypto.HashUtil;
|
||||
import org.ethereum.datasource.redis.RedisConnection;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.spongycastle.util.encoders.Hex;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.ComponentScan;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.test.context.ContextConfiguration;
|
||||
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
|
||||
import org.springframework.test.context.support.AnnotationConfigContextLoader;
|
||||
import test.ethereum.TestContext;
|
||||
|
||||
import java.math.BigInteger;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
@RunWith(SpringJUnit4ClassRunner.class)
|
||||
@ContextConfiguration(loader = AnnotationConfigContextLoader.class)
|
||||
public class RedisConnectionTest {
|
||||
|
||||
@Configuration
|
||||
@ComponentScan(basePackages = "org.ethereum")
|
||||
static class ContextConfiguration extends TestContext { }
|
||||
|
||||
@Autowired
|
||||
private RedisConnection redisConnection;
|
||||
|
||||
@Test
|
||||
public void test() {
|
||||
if (!redisConnection.isAvailable()) return;
|
||||
|
||||
Set<Pojo> pojos = redisConnection.createSetFor(Pojo.class, "pojos");
|
||||
|
||||
Pojo pojo = Pojo.create(1L, "test");
|
||||
pojos.add(pojo);
|
||||
assertTrue(pojos.contains(pojo));
|
||||
assertEquals(1, pojos.size());
|
||||
Pojo next = pojos.iterator().next();
|
||||
assertNotNull(next);
|
||||
assertEquals(pojo.getId(), next.getId());
|
||||
assertEquals(pojo.getName(), next.getName());
|
||||
assertTrue(pojos.remove(pojo));
|
||||
assertTrue(pojos.isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void transactionStorageTest() {
|
||||
if (!redisConnection.isAvailable()) return;
|
||||
|
||||
String namespace = "txnNamespace";
|
||||
Set<Transaction> transactions = redisConnection.createTransactionSet(namespace);
|
||||
transactions.add(createTransaction("09184e72a000", "4255", "1000000000000000000000", "cat"));
|
||||
transactions.add(createTransaction("09184e72a000", "4255", "1000000000000000000000", "dog"));
|
||||
transactions.add(createTransaction("09184e72a000", "4255", "1000000000000000000000", "rabbit"));
|
||||
|
||||
Set<Transaction> transactions1 = redisConnection.createTransactionSet(namespace);
|
||||
transactions1.add(createTransaction("09184e72a000", "4255", "1000000000000000000000", "duck"));
|
||||
transactions1.add(createTransaction("09184e72a000", "4255", "1000000000000000000000", "chicken"));
|
||||
transactions1.add(createTransaction("09184e72a000", "4255", "1000000000000000000000", "cow"));
|
||||
|
||||
assertEquals(6, transactions1.size());
|
||||
transactions.clear();
|
||||
assertTrue(transactions1.isEmpty());
|
||||
}
|
||||
|
||||
private static class Pojo {
|
||||
private long id;
|
||||
private String name;
|
||||
|
||||
public long getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public void setId(long id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
public void setName(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
public static Pojo create(long id, String name) {
|
||||
Pojo result = new Pojo();
|
||||
result.setId(id);
|
||||
result.setName(name);
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
public static Transaction createTransaction(String gasPrice, String gas, String val, String secret) {
|
||||
|
||||
ECKey ecKey = ECKey.fromPrivate(HashUtil.sha3(secret.getBytes()));
|
||||
|
||||
// Tn (nonce); Tp(pgas); Tg(gaslimi); Tt(value); Tv(value); Ti(sender); Tw; Tr; Ts
|
||||
return new Transaction(null, Hex.decode(gasPrice), Hex.decode(gas), ecKey.getAddress(),
|
||||
new BigInteger(val).toByteArray(),
|
||||
null);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue