以太坊数据导出全攻略,轻松将链上数据转为CSV格式
在区块链数据分析、智能合约审计或DeFi研究等场景中,将以太坊链上数据导出为结构化的CSV格式,是提升数据处理效率的关键一步,CSV(逗号分隔值)文件因其通用性强、兼容Excel等工具,成为数据分析师和开发者的首选格式,本文将详细介绍以太坊数据导出的核心方法、具体步骤及实用技巧,助你轻松实现链上数据的结构化存储与后续分析。
明确导出需求:你需要什么以太坊数据
在开始导出前,首先需明确数据类型和数据范围,这直接影响工具选择和操作复杂度,常见的以太坊数据包括:
交易数据
- 基础信息:交易哈希(hash)、区块高度(block number)、时间戳(timestamp)、发送方(from)、接收方(to)、交易金额(value,单位为wei)、 Gas消耗(gas used)、Gas价格(gas price)、交易费(transaction fee,即value*gas price)。
- 高级信息:输入数据(input data,包含合约交互参数)、nonce、区块哈希(block hash)等。
账户数据
- 账户余额:指定地址的以太币余额(历史或实时)。
- 交易记录:地址作为发送方或接收方的所有交易列表。
合约数据
- 合约状态变量:特定合约的存储值(如ERC20代币的totalSupply、用户余额)。
- 事件日志(Event Logs):合约触发的事件(如Transfer事件、Deposit事件),包含事件名称、参数、区块号等。
区块数据
- 区块头信息:区块高度、时间戳、矿工地址(miner)、交易数量(transaction count)、难度(difficulty)等。
导出方法全解析:从入门到进阶
根据数据量和需求复杂度,可选择以下三种主流方法,覆盖从小白到开发者的不同场景。
使用区块链浏览器(适合新手,零代码基础)
区块链浏览器(如Etherscan、Ethtx、BscScan)是最直观的数据导出工具,适合获取少量特定地址或交易的数据。
操作步骤(以Etherscan为例):
-
选择数据类型:
- 导出交易记录:打开Etherscan,在搜索框输入地址,点击“Transactions”标签页,右上角选择“Export”→“Download CSV”。
- 导出事件日志:进入合约页面(如某ERC20代币合约),点击“Events”标签页,筛选事件类型(如“Transfer”),点击“Export”→“Download CSV”。
- 导出区块数据:搜索区块高度(如“15000000”),进入区块详情页,点击“Export”→“Download CSV”。
-
筛选数据范围:
在导出前,可通过时间筛选、交易类型(如成功/失败)、事件参数等条件缩小数据范围,避免导出无用数据。
优点:
- 无需编程,操作简单,适合非技术人员。
- 数据已脱敏处理,可直接查看基础信息。
缺点:
- 导出量受限(单次最多约1万条数据)。
- 无法自定义字段,难以获取复杂合约数据(如动态状态变量)。
借助第三方API(适合开发者,灵活高效)
通过以太坊节点服务商的API(如Infura、Alchemy、Moralis)获取数据,再通过代码处理为CSV格式,适合批量导出和自定义字段。
核心工具:
- API服务:Infura(免费额度)、Alchemy(免费额度)、Moralis(支持Web3数据聚合)。
- 编程语言:Python(推荐,库丰富)、JavaScript(Node.js)。

Python实现示例(导出指定地址的交易记录):
import requests
import csv
from datetime import datetime
# 配置API参数(以Infura为例)
API_KEY = "YOUR_INFURA_API_KEY"
address = "0x742d35Cc6634C0532925a3b844Bc9e7595f8e5a8" # 示例地址
url = f"https://mainnet.infura.io/v3/{API_KEY}"
# 构造请求参数(获取地址交易记录)
params = {
"jsonrpc": "2.0",
"method": "eth_getLogs",
"params": [
{
"fromBlock": "0x0", # 起始区块(可设为"latest"或具体高度)
"toBlock": "latest", # 结束区块
"address": address, # 目标地址(空则查询所有交易)
"topics": [] # 主题(用于筛选事件日志,空则查询所有交易)
}
],
"id": 1
}
# 发送请求并解析数据
response = requests.post(url, json=params)
transactions = response.json().get("result", [])
# 提取交易关键信息
data_to_export = []
for tx in transactions:
tx_hash = tx.get("transactionHash")
block_num = int(tx.get("blockNumber"), 16)
timestamp = datetime.fromtimestamp(int(tx.get("timeStamp"), 16)).strftime("%Y-%m-%d %H:%M:%S")
from_addr = tx.get("from")
to_addr = tx.get("to")
value = int(tx.get("value"), 16) / 1e18 # wei转ETH
data_to_export.append([tx_hash, block_num, timestamp, from_addr, to_addr, value])
# 写入CSV文件
with open("ethereum_transactions.csv", "w", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
writer.writerow(["Transaction Hash", "Block Number", "Timestamp", "From", "To", "Value (ETH)"]) # 表头
writer.writerows(data_to_export)
print("数据导出成功!文件名:ethereum_transactions.csv")
操作步骤:
- 注册API服务商(如Infura),创建项目获取API Key。
- 安装Python依赖库:
pip install requests。 - 修改代码中的API Key、地址等参数,运行脚本即可生成CSV文件。
优点:
- 可自定义字段(如添加Gas价格、输入数据),支持批量导出。
- 可结合其他库(如Pandas)进行数据清洗和分析。
缺点:
- 需基础编程能力,API调用频率受限(免费额度下有QPS限制)。
本地同步节点+数据库查询(适合高级用户,数据自主可控)
对于需要全量数据或高频查询的场景,可通过本地同步以太坊全节点(如Geth、Nethermind),将数据存入数据库(如PostgreSQL、MongoDB),再通过SQL查询导出CSV。
核心工具:
- 以太坊客户端:Geth(命令行工具)、Nethermind(.NET客户端)。
- 数据库:PostgreSQL(支持JSON字段,适合存储链上数据)、MongoDB(文档型数据库,适合存储事件日志)。
- 查询工具:pgAdmin(PostgreSQL管理工具)、MongoDB Compass。
操作步骤(以Geth+PostgreSQL为例):
-
同步以太坊节点:
安装Geth,执行同步命令(首次同步需较长时间,建议SSD硬盘):geth --syncmode full --http --http.addr 0.0.0.0 --http.port 8545
同步完成后,通过HTTP API访问节点数据。
-
监听数据并写入数据库:
使用Python脚本监听新区块,解析交易和事件数据,存入PostgreSQL:import requests import psycopg2 import time # 连接PostgreSQL conn = psycopg2.connect( dbname="ethereum", user="postgres", password="your_password", host="localhost" ) cur = conn.cursor() # 创建交易表 cur.execute(""" CREATE TABLE IF NOT EXISTS transactions ( tx_hash VARCHAR(66) PRIMARY KEY, block_num BIGINT, timestamp TIMESTAMP, from_addr VARCHAR(42), to_addr VARCHAR(42), value NUMERIC(36, 18) ) """) conn.commit() # 监听新区块(通过轮询最新区块号) last_block = 0 while True: url = "https://mainnet.infura.io/v3/YOUR_INFURA_API_KEY" params = {"jsonrpc": "2.0", "method": "eth_blockNumber", "params": [], "id": 1} response = requests.post(url, json=params) current_block = int(response.json().get("result"), 16) if current_block > last_block: for block_num in range(last_block + 1, current_block + 1): block_params = { "jsonrpc": "2