# 2025-01-02 20:00:49 +01:00
#
# 334 lines
# 11 KiB
# Python

import numpy as np
import pandas as pd
from typing import List, Tuple, Dict
from dataclasses import dataclass, field
import uuid
import copy
# Protocol-wide parameters read by the fee mechanisms in this file.
# Gas quantities are in gas units. Prices appear to be expressed in Gwei
# (the initial base fee of 1.0 was annotated as "10^9 wei = 1 Gwei").
# NOTE(review): earlier comments annotated MIN_PRICE as 0.1 Gwei and MAX_PRICE
# as 100 Gwei, which disagrees with the values 0.5 and 10.0 below — confirm
# which is intended.
PROTOCOL_CONSTANTS = dict(
    TARGET_GAS_USED=12_500_000.0,  # 12.5 million gas
    MAX_GAS_ALLOWED=25_000_000.0,  # 25 million gas
    INITIAL_BASEFEE=1.0,  # 1 Gwei = 10^9 wei
    MIN_PRICE=0.5,  # lower clipping bound for the price (0.5 in the same unit)
    MAX_PRICE=10.0,  # upper clipping bound for the price (10.0 in the same unit)
)
class Transaction:
    """A single transaction offered to the simulated fee market.

    Attributes:
        gas_used: Gas consumed by the transaction.
        fee_cap: Fee-cap level of the transaction.
        tip: Tip level; the Stable Fee mechanism does not read it.
        tx_hash: Random integer identifier, drawn from a fresh UUID4.
    """

    def __init__(self, gas_used: float, fee_cap: float, tip: float):
        self.gas_used, self.fee_cap, self.tip = gas_used, fee_cap, tip
        # A UUID4 converted to its integer value serves as a unique key.
        self.tx_hash = int(uuid.uuid4())
@dataclass
class MemPool:
    """Pending transactions, indexed by their ``tx_hash``."""

    # Maps tx_hash -> transaction; every instance starts with its own empty dict.
    pool: Dict[int, "Transaction"] = field(default_factory=dict)

    def add_tx(self, tx: "Transaction"):
        """Store ``tx`` under its hash, replacing any entry with the same hash."""
        self.pool.update({tx.tx_hash: tx})

    def add_txs(self, txs: List["Transaction"]):
        """Store every transaction of ``txs`` via ``add_tx``."""
        for incoming in txs:
            self.add_tx(incoming)

    def remove_tx(self, tx: "Transaction"):
        """Drop ``tx`` from the pool.

        Raises:
            KeyError: if no entry is stored under ``tx.tx_hash``.
        """
        del self.pool[tx.tx_hash]

    def remove_txs(self, txs: List["Transaction"]):
        """Drop every transaction of ``txs`` via ``remove_tx``."""
        for outgoing in txs:
            self.remove_tx(outgoing)

    def __len__(self):
        """Return the number of pending transactions."""
        return len(self.pool)
@dataclass
class Block:
    """Container for the transactions included in one block.

    ``txs`` is annotated as a mapping from ``tx_hash`` to transaction, but
    other code in this file iterates ``block.txs`` directly as a sequence of
    transactions. Both containers are therefore supported here: a ``dict``
    keyed by hash, or a list of transactions.
    """

    txs: Dict[int, "Transaction"]

    def _transactions(self) -> List["Transaction"]:
        """Return the stored transactions as a list, for either container type."""
        if isinstance(self.txs, dict):
            return list(self.txs.values())
        return list(self.txs)

    def add_tx(self, tx: "Transaction"):
        """Add ``tx`` to the block (keyed by hash for a dict, appended for a list)."""
        if isinstance(self.txs, dict):
            self.txs[tx.tx_hash] = tx
        else:
            self.txs.append(tx)

    def add_txs(self, txs: List["Transaction"]):
        """Add every transaction of ``txs`` via ``add_tx``."""
        for tx in txs:
            self.add_tx(tx)

    def print_dataframe(self) -> pd.DataFrame:
        """Return one row per transaction with its gas used, fee cap and tip.

        The frame is indexed 0..n-1 with the index named ``"tx"``. Despite the
        method name, nothing is printed; the frame is returned.
        """
        # Iterate the transactions themselves: iterating a dict directly would
        # yield the integer hashes, which have no gas_used/fee_cap/tip.
        rows = [[tx.gas_used, tx.fee_cap, tx.tip] for tx in self._transactions()]
        _df = pd.DataFrame(
            rows,
            columns=['gas_used', 'fee_cap', 'tip'],
            index=range(len(rows)),
        )
        _df.index.name = "tx"
        return _df
class Blockchain:
    """Ordered list of blocks with per-block summary statistics."""

    def __init__(self, blocks: List["Block"] = None):
        """Create a chain, optionally pre-populated with ``blocks``."""
        self.blocks: List["Block"] = []
        if blocks:
            self.add_blocks(blocks)

    def add_block(self, block: "Block"):
        """Append ``block`` to the end of the chain."""
        self.blocks.append(block)

    def add_blocks(self, blocks: List["Block"]):
        """Append every block of ``blocks`` in order."""
        for block in blocks:
            self.add_block(block)

    def get_last_block(self):
        """Return the most recently added block.

        Raises:
            IndexError: if the chain has no blocks.
        """
        return self.blocks[-1]

    def compute_stats(self, gas_target: float = None) -> pd.DataFrame:
        """Summarise every block: gas, fee-cap and tip totals plus averages.

        Args:
            gas_target: Value written to the ``gas_target`` column. Defaults to
                ``PROTOCOL_CONSTANTS["TARGET_GAS_USED"]``.

        Returns:
            A float DataFrame with one row per block and the columns
            ``tot_gas_used``, ``tot_fee_cap``, ``tot_tips``, ``avg_fee_cap``,
            ``avg_tips`` and ``gas_target``. A block without transactions has
            zero totals and NaN averages.
        """
        if gas_target is None:
            gas_target = PROTOCOL_CONSTANTS["TARGET_GAS_USED"]
        columns = ['tot_gas_used', 'tot_fee_cap', 'tot_tips', 'avg_fee_cap', 'avg_tips', 'gas_target']
        rows = []
        for block in self.blocks:
            # Block.txs may be a dict keyed by hash or a plain sequence;
            # either way the statistics need the transaction objects.
            txs = block.txs
            txs = list(txs.values()) if isinstance(txs, dict) else list(txs)
            if txs:
                tot_gas_used, tot_fee_cap, tot_tips = np.sum(
                    [[tx.gas_used, tx.fee_cap, tx.tip] for tx in txs],
                    axis=0,
                )
                num_tx = float(len(txs))
                avg_fee_cap, avg_tips = tot_fee_cap / num_tx, tot_tips / num_tx
            else:
                # np.sum over an empty list yields a scalar that cannot be
                # unpacked, and the averages would divide by zero.
                tot_gas_used = tot_fee_cap = tot_tips = 0.0
                avg_fee_cap = avg_tips = float("nan")
            rows.append([tot_gas_used, tot_fee_cap, tot_tips, avg_fee_cap, avg_tips, gas_target])
        return pd.DataFrame(rows, columns=columns, index=range(len(rows)), dtype=float)
def create_demand(
    num_txs: int,
    fee_cap_range:Tuple[float]=(-0.1, 0.1),
    max_tip_pct:float=0.1,
    variable_gas:bool=True
) -> List["Transaction"]:
    """Draw ``num_txs`` random transactions with unit-relative fee levels.

    The fee caps and tips produced here are levels that still have to be
    scaled by the price/base fee of the specific transaction fee model.

    Args:
        num_txs: Number of transactions to generate.
        fee_cap_range: Two values used as the mean (index 0) and standard
            deviation (index 1) of the normal draw that is added to 1.0 to
            form each fee cap. Despite the name it is not treated as a
            (low, high) range.
        max_tip_pct: Upper bound of the uniform draw for the tip (lower bound 0).
        variable_gas: If true, gas per transaction is sampled per transaction;
            otherwise every transaction gets the same gas amount.

    Returns:
        The generated transactions.
    """
    center, spread = fee_cap_range[0], fee_cap_range[1]
    # Draw order (fee caps first, then tips) is kept so seeded runs reproduce.
    fee_caps = 1. + np.random.normal(center, spread, num_txs)
    tip = np.random.uniform(0., max_tip_pct, num_txs)
    build = _create_demand_variable_gas if variable_gas else _create_demand_const_gas
    return build(fee_caps, tip)
def _create_demand_const_gas(fee_caps:np.ndarray, tip:np.ndarray) -> List["Transaction"]:
    """Build one transaction per (fee cap, tip) pair, all with the same gas.

    The common gas amount is the weighted average of the per-type gas costs
    below, i.e. 80,150 gas with the listed shares.

    Args:
        fee_caps: Fee-cap level for each transaction.
        tip: Tip level for each transaction.

    Returns:
        A list with ``min(len(fee_caps), len(tip))`` transactions.
    """
    gas_by_type = [
        21_000,   # eth transfer
        45_000,   # erc20 transfer
        50_000,   # token approval
        200_000,  # token swap
        150_000,  # NFT (ERC721) minting
        75_000,   # NFT transfer
        120_000,  # NFT (ERC1155) minting
        500_000,  # smart contract deployment
    ]
    share_by_type = [0.3, 0.3, 0.1, 0.2, 0.03, 0.03, 0.03, 0.01]
    # Weighted average, not np.mean of the weighted terms: the latter divides
    # the weighted sum by the number of types (8) and yields ~10k gas, less
    # than a plain 21k transfer.
    gas_used = float(np.average(gas_by_type, weights=share_by_type))
    return [
        Transaction(gas_used=gas_used, fee_cap=fc, tip=tp)
        for fc, tp in zip(fee_caps, tip)
    ]
def _create_demand_variable_gas(fee_caps:np.ndarray, tip:np.ndarray) -> List["Transaction"]:
    """Build one transaction per (fee cap, tip) pair with randomly sampled gas.

    Each transaction's gas is drawn independently from a fixed menu of
    transaction types with the probabilities listed next to them.

    Args:
        fee_caps: Fee-cap level for each transaction; its length sets the
            number of gas samples drawn.
        tip: Tip level for each transaction.

    Returns:
        The generated transactions, in input order.
    """
    gas_menu = [
        21_000,   # eth transfer
        45_000,   # erc20 transfer
        50_000,   # token approval
        200_000,  # token swap
        150_000,  # NFT (ERC721) minting
        75_000,   # NFT transfer
        120_000,  # NFT (ERC1155) minting
        500_000,  # smart contract deployment
    ]
    menu_probabilities = (0.3, 0.3, 0.1, 0.2, 0.03, 0.03, 0.03, 0.01)
    sampled_gas = np.random.choice(
        gas_menu,
        size=len(fee_caps),
        p=menu_probabilities,
    )
    return [
        Transaction(gas_used=gas, fee_cap=cap, tip=premium)
        for gas, cap, premium in zip(sampled_gas, fee_caps, tip)
    ]
class TransactionFeeMechanism:
    """Base class for fee mechanisms: price history plus shared selection logic.

    Subclasses are expected to implement ``update_price``,
    ``select_transactions`` and ``total_paid_fees``; the versions here raise
    ``NotImplementedError``.
    """

    def __init__(self):
        # Price history; it starts with the protocol's initial base fee.
        self.price: List[float] = [PROTOCOL_CONSTANTS["INITIAL_BASEFEE"]]

    def update_price(self, blockchain: "Blockchain"):
        """Append a new price derived from ``blockchain`` (subclass hook)."""
        raise NotImplementedError

    def get_current_price(self) -> float:
        """Return the latest entry of the price history."""
        return self.price[-1]

    def scale_demand(self, demand: List["Transaction"]) -> List["Transaction"]:
        """Return a deep copy of ``demand`` with fee caps and tips multiplied
        by the current price; the input transactions are left untouched."""
        multiplier: float = self.get_current_price()
        rescaled = copy.deepcopy(demand)
        for item in rescaled:
            item.fee_cap *= multiplier
            item.tip *= multiplier
        return rescaled

    def _select_from_sorted_txs(
        self,
        sorted_txs: List["Transaction"],
        stop_below_gas_limit: bool = False,
        scale_block_size: float = 1.0, purge_after: int = 4
    ) -> Tuple[List["Transaction"], List["Transaction"]]:
        """Split an already-sorted transaction list into block content and purge tail.

        Walks ``sorted_txs`` while tracking cumulative gas. A transaction counts
        as selected while the cumulative gas including it stays strictly below
        ``fac * TARGET_GAS_USED``, where
        ``fac = 1 + (1 - 2 * stop_below_gas_limit) * scale_block_size``
        (so ``1 - scale`` when the flag is set and ``1 + scale`` otherwise).
        The walk stops at the first transaction whose inclusion would reach
        ``purge_after * MAX_GAS_ALLOWED``.

        NOTE(review): nothing here caps ``fac * TARGET_GAS_USED`` at
        ``MAX_GAS_ALLOWED``; with ``scale_block_size > 1`` the selection budget
        exceeds it — confirm callers keep the scale within [0, 1].

        Returns:
            ``(selected, to_be_purged)``: the leading transactions that fit the
            block budget, and every transaction from the stopping position on.
        """
        # The budget is randomised by the caller through the flag and the scale,
        # to mimic miners that do not always fill blocks the same way.
        fac = 1.0 + (1. - 2.*stop_below_gas_limit) * scale_block_size
        block_budget = fac * PROTOCOL_CONSTANTS["TARGET_GAS_USED"]
        # Enough space for `purge_after` full blocks.
        backlog_budget = purge_after * PROTOCOL_CONSTANTS["MAX_GAS_ALLOWED"]

        n_selected = 0
        n_retained = 0
        running_gas = 0
        for candidate in sorted_txs:
            projected = running_gas + candidate.gas_used
            if projected < block_budget:
                n_selected += 1
            if not (projected < backlog_budget):
                break
            n_retained += 1
            running_gas = projected
        return sorted_txs[:n_selected], sorted_txs[n_retained:]

    def select_transactions(
        self, mempool: "MemPool", stop_below_gas_limit: bool = False, scale_block_size: float = 1.0, purge_after: int = 4
    ) -> Tuple[List["Transaction"], List["Transaction"]]:
        """Choose transactions from ``mempool`` for the next block (subclass hook)."""
        raise NotImplementedError

    def total_paid_fees(self, txs: List["Transaction"]) -> float:
        """Return the total fees paid by ``txs`` (subclass hook)."""
        raise NotImplementedError
class EIP1559(TransactionFeeMechanism):
    """Fee mechanism with a base fee that reacts to the last block's gas usage.

    Two histories are kept: ``base_fee`` (updated exponentially from the gas
    used relative to the target) and the inherited ``price`` (average
    effective per-gas price of the last block, clipped to the protocol bounds).
    """

    def __init__(self):
        # Sensitivity of the base fee to the relative gas deviation.
        self.base_factor: float = 1./8.
        self.base_fee: List[float] = [PROTOCOL_CONSTANTS["INITIAL_BASEFEE"]]
        super().__init__()

    def update_price(self, blockchain: "Blockchain") -> None:
        """Append the next base fee and price, based on the chain's last block.

        The base fee is multiplied by ``exp(delta * base_factor)`` where
        ``delta`` is the last block's gas used relative to the target. The
        price is the mean of ``min(fee_cap, base_fee + tip)`` over the block's
        transactions, clipped to ``[MIN_PRICE, MAX_PRICE]``. When the last
        block has no transactions the previous price is carried forward.
        """
        base_fee: float = self.base_fee[-1]
        last_txs = blockchain.get_last_block().txs
        # Block.txs may be a dict keyed by hash; the transactions are its values.
        if isinstance(last_txs, dict):
            last_txs = list(last_txs.values())

        target: float = PROTOCOL_CONSTANTS["TARGET_GAS_USED"]
        gas_used: float = sum(tx.gas_used for tx in last_txs)
        delta: float = (gas_used - target) / target
        self.base_fee.append(
            base_fee * np.exp(delta * self.base_factor)  # (1. + delta * self.base_factor)
        )

        if len(last_txs) == 0:
            # No transaction paid anything: averaging would divide by zero,
            # so keep the last known price.
            self.price.append(self.price[-1])
            return

        sum_price: float = sum(min(tx.fee_cap, base_fee + tx.tip) for tx in last_txs)
        self.price.append(
            np.clip(
                sum_price / float(len(last_txs)),
                a_min=PROTOCOL_CONSTANTS["MIN_PRICE"],
                a_max=PROTOCOL_CONSTANTS["MAX_PRICE"]
            )
        )

    def select_transactions(
        self, mempool: "MemPool", stop_below_gas_limit: bool = False, scale_block_size: float = 1.0, purge_after: int = 4
    ) -> Tuple[List["Transaction"], List["Transaction"]]:
        """Rank the mempool by total effective fee and split it into
        ``(selected, to_be_purged)`` via ``_select_from_sorted_txs``."""
        base_fee: float = self.base_fee[-1]
        # Highest total effective fee first: per-gas price capped by the fee
        # cap, multiplied by the gas used.
        sorted_txs: List[Transaction] = sorted(
            mempool.pool.values(),
            key=lambda tx: min(tx.fee_cap, base_fee + tx.tip) * tx.gas_used,
            reverse=True
        )
        return self._select_from_sorted_txs(
            sorted_txs,
            stop_below_gas_limit=stop_below_gas_limit,
            scale_block_size=scale_block_size,
            purge_after=purge_after
        )

    def total_paid_fees(self, txs: List["Transaction"]) -> float:
        """Return the sum of ``min(fee_cap, base_fee + tip) * gas_used`` over
        ``txs``, using the latest base fee."""
        base_fee: float = self.base_fee[-1]
        return sum(min(tx.fee_cap, base_fee + tx.tip) * tx.gas_used for tx in txs)
class StableFee(TransactionFeeMechanism):
    """Fee mechanism where every transaction pays the same per-gas price.

    The price follows the lowest fee cap included in the last block, clipped
    to the protocol bounds. Tips are not used.
    """

    def __init__(self):
        super().__init__()

    def update_price(self, blockchain: "Blockchain"):
        """Append the next price, based on the chain's last block.

        The new price is the minimum fee cap among the last block's
        transactions, clipped to ``[MIN_PRICE, MAX_PRICE]``. When the last
        block has no transactions the previous price is carried forward.
        """
        last_txs = blockchain.get_last_block().txs
        # Block.txs may be a dict keyed by hash; the transactions are its values.
        if isinstance(last_txs, dict):
            last_txs = list(last_txs.values())

        if len(last_txs) == 0:
            # np.min of an empty list raises; there is no new information,
            # so keep the last known price.
            self.price.append(self.get_current_price())
            return

        new_price: float = np.min([tx.fee_cap for tx in last_txs])
        self.price.append(
            np.clip(
                new_price,
                a_min=PROTOCOL_CONSTANTS["MIN_PRICE"],
                a_max=PROTOCOL_CONSTANTS["MAX_PRICE"]
            )
        )

    def select_transactions(
        self, mempool: "MemPool", stop_below_gas_limit: bool = False, scale_block_size: float = 1.0, purge_after: int = 4
    ) -> Tuple[List["Transaction"], List["Transaction"]]:
        """Rank the mempool by ``fee_cap * gas_used`` and split it into
        ``(selected, to_be_purged)`` via ``_select_from_sorted_txs``."""
        # Highest fee_cap * gas_used first.
        sorted_txs = sorted(
            mempool.pool.values(),
            key=lambda tx: tx.fee_cap * tx.gas_used,
            reverse=True
        )
        return self._select_from_sorted_txs(
            sorted_txs,
            stop_below_gas_limit=stop_below_gas_limit,
            scale_block_size=scale_block_size,
            purge_after=purge_after
        )

    def total_paid_fees(self, txs: List["Transaction"]) -> float:
        """Return the current price times the total gas used by ``txs``."""
        price: float = self.get_current_price()
        return sum(price * tx.gas_used for tx in txs)