Building a Production eth_getLogs Pipeline: Indexing Blockchain Events at Scale
eth_getLogs is the backbone of blockchain data infrastructure. Every indexer, analytics platform, portfolio tracker, and protocol dashboard uses it. A single call returns all events matching a filter across a block range. Powerful, but unreliable without the right patterns around it.
This guide builds a production pipeline: chunked queries, deduplication, reorg handling, retry logic, and storage.
Why eth_getLogs Needs a Pipeline
A raw eth_getLogs call works fine for small block ranges. At production scale, three problems appear:
1. Timeouts: Large block ranges (100,000+ blocks) take too long. Providers time out or cap at a maximum range.
2. Reorgs: A block you already indexed may be replaced by a competing block. Your data becomes wrong.
3. Rate limits: Continuous high-frequency queries drain rate limits and budgets on compute-unit-priced providers.
A proper pipeline handles all three automatically.
The Basic eth_getLogs Call
import { ethers } from 'ethers';
const provider = new ethers.JsonRpcProvider(
'https://eu.endpoints.matrixed.link/rpc/ethereum?auth=YOUR_KEY'
);
// Get all Transfer events for USDC in the last 1,000 blocks
const USDC_ADDRESS = '0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48'; // USDC on Ethereum mainnet
const currentBlock = await provider.getBlockNumber();
const transferTopic = ethers.id('Transfer(address,address,uint256)');
const logs = await provider.getLogs({
address: USDC_ADDRESS,
topics: [transferTopic],
fromBlock: currentBlock - 1000,
toBlock: 'latest',
});
console.log(`Found ${logs.length} Transfer events`);
This works for small ranges. For historical indexing or continuous pipelines, you need the patterns below.
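Under the hood, ethers translates this into a raw eth_getLogs JSON-RPC request. If you call the endpoint directly (with fetch or curl), note that block numbers must be hex-encoded quantities. A sketch of the request construction; buildGetLogsRequest is our own helper, not part of any library:

```javascript
// Sketch: build a raw eth_getLogs JSON-RPC payload by hand.
// buildGetLogsRequest is a hypothetical helper for illustration.
function buildGetLogsRequest(address, topics, fromBlock, toBlock, id = 1) {
  // Numeric block numbers must be hex quantities; tags like 'latest' pass through
  const toHex = (b) => (typeof b === 'number' ? '0x' + b.toString(16) : b);
  return {
    jsonrpc: '2.0',
    id,
    method: 'eth_getLogs',
    params: [{ address, topics, fromBlock: toHex(fromBlock), toBlock: toHex(toBlock) }],
  };
}

const body = buildGetLogsRequest(
  '0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48', // USDC
  ['0xddf252ad1be2c89b69c2b068fc378daa952ba7f42163c4a11628f55a4df523b3'], // Transfer topic
  18000000,
  'latest'
);
// Send it with: fetch(RPC_URL, { method: 'POST',
//   headers: { 'Content-Type': 'application/json' }, body: JSON.stringify(body) })
```

The response's `result` array contains the same log objects ethers parses for you.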
Chunked Queries
Most providers limit block ranges to 2,000–10,000 blocks per query. For larger ranges, chunk:
async function getLogsChunked(filter, fromBlock, toBlock, chunkSize = 2000) {
const allLogs = [];
let currentFrom = fromBlock;
while (currentFrom <= toBlock) {
const currentTo = Math.min(currentFrom + chunkSize - 1, toBlock);
try {
const logs = await provider.getLogs({
...filter,
fromBlock: currentFrom,
toBlock: currentTo,
});
allLogs.push(...logs);
console.log(`Fetched blocks ${currentFrom}-${currentTo}: ${logs.length} logs`);
currentFrom = currentTo + 1;
} catch (error) {
if (error.message?.includes('block range') || error.message?.includes('too many')) {
// Provider rejected the range — halve the chunk size and retry
chunkSize = Math.floor(chunkSize / 2);
if (chunkSize < 10) throw new Error('Chunk size too small — query cannot complete');
console.warn(`Range too large. Reducing chunk to ${chunkSize} blocks`);
continue; // Retry same range with smaller chunk
}
throw error;
}
}
return allLogs;
}
// Index 6 months of USDC transfers
const sixMonthsAgo = currentBlock - (6 * 30 * 24 * 300); // ~6 months at 12s/block
const historicalLogs = await getLogsChunked(
{ address: USDC_ADDRESS, topics: [transferTopic] },
sixMonthsAgo,
currentBlock
);
Retry with Exponential Backoff
Transient failures (network blips, provider timeouts) are normal. Retry automatically:
async function getLogsWithRetry(filter, maxRetries = 3, baseDelay = 1000) {
for (let attempt = 0; attempt <= maxRetries; attempt++) {
try {
return await provider.getLogs(filter);
} catch (error) {
if (attempt === maxRetries) throw error;
const isRetryable =
error.message?.includes('timeout') ||
error.message?.includes('rate limit') ||
error.code === 'NETWORK_ERROR' ||
error.code === 'SERVER_ERROR';
if (!isRetryable) throw error;
const delay = baseDelay * Math.pow(2, attempt);
console.warn(`getLogs failed (attempt ${attempt + 1}/${maxRetries + 1}). Retrying in ${delay}ms`);
await new Promise(resolve => setTimeout(resolve, delay));
}
}
}
The Full Indexer Class
class EventIndexer {
constructor(config) {
this.provider = config.provider;
this.wsProvider = config.wsProvider;
this.filter = config.filter;
this.chunkSize = config.chunkSize || 2000;
this.storage = config.storage; // Database adapter
this.lastIndexedBlock = null;
this.confirmations = config.confirmations || 12; // Wait for 12 blocks before finalizing
}
async start(fromBlock) {
// Load last indexed block from storage
this.lastIndexedBlock = await this.storage.getLastIndexedBlock() || fromBlock;
console.log(`Starting indexer from block ${this.lastIndexedBlock}`);
// Catch up on historical data
await this.catchUp();
// Switch to live mode
await this.subscribeLive();
}
async catchUp() {
const currentBlock = await this.provider.getBlockNumber();
const safeBlock = currentBlock - this.confirmations; // Only index confirmed blocks
if (this.lastIndexedBlock >= safeBlock) {
console.log('Already caught up');
return;
}
console.log(`Catching up: ${this.lastIndexedBlock} → ${safeBlock}`);
const logs = await getLogsChunked(
this.filter,
this.lastIndexedBlock + 1,
safeBlock,
this.chunkSize
);
await this.processLogs(logs);
this.lastIndexedBlock = safeBlock;
await this.storage.saveLastIndexedBlock(safeBlock);
console.log(`Caught up to block ${safeBlock}`);
}
async subscribeLive() {
// Use WebSocket for real-time new blocks
this.wsProvider.on('block', async (blockNumber) => {
const safeBlock = blockNumber - this.confirmations;
if (safeBlock <= this.lastIndexedBlock) return;
// Index the new confirmed blocks
const logs = await getLogsWithRetry({
...this.filter,
fromBlock: this.lastIndexedBlock + 1,
toBlock: safeBlock,
});
if (logs.length > 0) {
await this.processLogs(logs);
}
this.lastIndexedBlock = safeBlock;
await this.storage.saveLastIndexedBlock(safeBlock);
});
console.log('Live indexing started');
}
async processLogs(logs) {
if (logs.length === 0) return;
// Deduplicate by transaction hash + log index
const unique = deduplicateLogs(logs);
// Parse logs
const parsed = unique.map(log => this.parseLog(log)).filter(Boolean);
// Store in batches
await this.storage.batchInsert(parsed);
console.log(`Processed ${parsed.length} events`);
}
parseLog(log) {
try {
const parsed = this.filter.interface?.parseLog(log);
return {
blockNumber: log.blockNumber,
transactionHash: log.transactionHash,
logIndex: log.index,
address: log.address,
eventName: parsed?.name,
args: parsed?.args,
raw: log,
};
} catch {
return null;
}
}
}
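Wiring the indexer up might look like the sketch below. The storage adapter and the filter shape (address, topics, plus an ethers Interface used by parseLog) are assumptions inferred from how the class uses them; provider construction is shown commented out:

```javascript
// Sketch: configuring the EventIndexer above (wiring is assumed, not prescribed).
// The topic hash is keccak256('Transfer(address,address,uint256)'), precomputed.
const TRANSFER_TOPIC =
  '0xddf252ad1be2c89b69c2b068fc378daa952ba7f42163c4a11628f55a4df523b3';

const config = {
  // provider: new ethers.JsonRpcProvider(HTTP_URL),   // HTTP for range queries
  // wsProvider: new ethers.WebSocketProvider(WS_URL), // WebSocket for new blocks
  filter: {
    address: '0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48', // USDC
    topics: [TRANSFER_TOPIC],
    // interface: new ethers.Interface(ERC20_ABI),     // used by parseLog()
  },
  chunkSize: 2000,
  confirmations: 12,
  // storage: new PostgresStorage(),
};

// const indexer = new EventIndexer(config);
// await indexer.start(STARTING_BLOCK);
```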
Deduplication
Logs can appear as duplicates during reorgs, or when query ranges overlap (for example at shard boundaries or when catch-up and live indexing briefly cover the same blocks). Deduplicate by the combination of transactionHash + logIndex:
function deduplicateLogs(logs) {
const seen = new Set();
return logs.filter(log => {
const key = `${log.transactionHash}-${log.index}`;
if (seen.has(key)) return false;
seen.add(key);
return true;
});
}
Reorg Handling
A reorg means blocks you indexed get replaced. Transactions in those blocks may be removed, reordered, or changed. To handle reorgs correctly:
async function handleReorg(provider, storage, safeDepth = 12) {
const currentBlock = await provider.getBlockNumber();
// Re-verify the last N blocks
for (let i = safeDepth; i >= 0; i--) {
const blockNumber = currentBlock - i;
const block = await provider.getBlock(blockNumber);
const storedHash = await storage.getBlockHash(blockNumber);
if (storedHash && storedHash !== block.hash) {
console.warn(`Reorg detected at block ${blockNumber}`);
// Remove all events from the reorged block onwards
await storage.deleteFromBlock(blockNumber);
// Re-index from the reorg point up to the current head
// (filter and parseLog are assumed in scope, from your indexer config)
const logs = await getLogsChunked(filter, blockNumber, currentBlock);
await storage.batchInsert(logs.map(parseLog));
await storage.saveBlockHash(blockNumber, block.hash);
break;
}
}
}
Simpler approach: only index blocks with 12+ confirmations. The confirmations: 12 setting in the indexer above does this automatically. On post-merge Ethereum, reorgs deeper than a couple of blocks are vanishingly rare, so 12 confirmations is a comfortable margin.
Topic Filtering
Narrow your queries with specific topic filters to reduce result set size:
const transferTopic = ethers.id('Transfer(address,address,uint256)');
// All Transfer events
const allTransfers = { topics: [transferTopic] };
// Only transfers FROM a specific address
const fromAddress = ethers.zeroPadValue('0xYourAddress', 32);
const transfersFrom = { topics: [transferTopic, fromAddress] };
// Only transfers TO a specific address
const toAddress = ethers.zeroPadValue('0xYourAddress', 32);
const transfersTo = { topics: [transferTopic, null, toAddress] };
// Transfers between two specific addresses
const exactTransfer = { topics: [transferTopic, fromAddress, toAddress] };
// Multiple possible senders (OR logic)
const fromEither = {
topics: [
transferTopic,
[
ethers.zeroPadValue(ADDRESS_A, 32),
ethers.zeroPadValue(ADDRESS_B, 32),
]
]
};
Storage Patterns
// Minimal storage adapter interface
class PostgresStorage {
async getLastIndexedBlock() {
const result = await db.query(
'SELECT block_number FROM indexer_state WHERE id = 1'
);
return result.rows[0]?.block_number || null;
}
async saveLastIndexedBlock(blockNumber) {
await db.query(
'INSERT INTO indexer_state (id, block_number) VALUES (1, $1) ON CONFLICT (id) DO UPDATE SET block_number = $1',
[blockNumber]
);
}
async batchInsert(events) {
if (events.length === 0) return;
// Use ON CONFLICT DO NOTHING for idempotency
const values = events.map((e, i) =>
`($${i * 5 + 1}, $${i * 5 + 2}, $${i * 5 + 3}, $${i * 5 + 4}, $${i * 5 + 5})`
).join(',');
await db.query(
`INSERT INTO events (block_number, tx_hash, log_index, event_name, args)
VALUES ${values}
ON CONFLICT (tx_hash, log_index) DO NOTHING`,
events.flatMap(e => [e.blockNumber, e.transactionHash, e.logIndex, e.eventName, JSON.stringify(e.args)])
);
}
}
FAQ
What is the maximum block range for eth_getLogs?
It varies by provider. Most managed providers cap at 2,000–10,000 blocks per query. Some providers (including those running Nethermind with Log Index enabled) can handle much larger ranges. The chunked query pattern in this guide handles any limit automatically.
How do I index from the genesis block?
Use the chunked query pattern with fromBlock: 0. For Ethereum mainnet this means indexing 20+ million blocks, which takes hours to days depending on event frequency. Start with a more recent block if you only need recent history.
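To size that job, divide the block range by your chunk size. The numbers below are illustrative, and estimateQueries is our own helper:

```javascript
// Rough sizing for a genesis-to-head index (illustrative numbers only).
function estimateQueries(fromBlock, toBlock, chunkSize) {
  return Math.ceil((toBlock - fromBlock + 1) / chunkSize);
}

const queries = estimateQueries(0, 20_000_000, 2000);
console.log(`~${queries} getLogs calls`); // ~10001 calls at 2,000 blocks per chunk
```

Multiply by your provider's per-call latency (and CU cost, if applicable) to budget the run.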
Can I run multiple indexers in parallel?
Yes. Shard by block range or by contract address. One indexer handles blocks 0-5,000,000, another handles 5,000,001-10,000,000, etc. Merge results in your storage layer. Ensure your deduplication handles overlapping ranges at shard boundaries.
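A minimal sketch of block-range sharding, assuming the getLogsChunked and deduplicateLogs helpers from earlier; shardRange is our own helper and the shard boundaries are arbitrary:

```javascript
// Sketch: split a block range into N shards to index in parallel.
// shardRange is a hypothetical helper; getLogsChunked is defined earlier.
function shardRange(fromBlock, toBlock, shards) {
  const size = Math.ceil((toBlock - fromBlock + 1) / shards);
  const ranges = [];
  for (let start = fromBlock; start <= toBlock; start += size) {
    ranges.push([start, Math.min(start + size - 1, toBlock)]);
  }
  return ranges;
}

// const results = await Promise.all(
//   shardRange(0, 10_000_000, 4).map(([from, to]) =>
//     getLogsChunked(filter, from, to)
//   )
// );
// const logs = deduplicateLogs(results.flat()); // dedupe across shard boundaries
```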
Is eth_getLogs affected by compute unit pricing?
Yes. On providers using compute unit pricing, eth_getLogs typically carries a significantly higher CU weight than simple reads, often 75 CUs or more per call depending on result size. A continuous indexer running many queries per hour can exhaust plans quickly on CU-priced providers. BoltRPC pricing is straightforward — fixed monthly plans with no surprise bills regardless of how heavily you use eth_getLogs.
How do I handle topics for anonymous events?
Anonymous events omit the event-signature hash, so topics[0] holds the first indexed parameter (if any) rather than the signature. You cannot filter them by signature topic; filter by contract address (plus any indexed parameters you know) and decode the log's data field with the contract ABI. Anonymous events are rare in modern contracts.
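A sketch of filtering an anonymous event, assuming you know the contract address and one indexed parameter. The addresses are placeholders and padTopic is a hand-rolled stand-in for ethers.zeroPadValue(addr, 32):

```javascript
// Sketch: filter for an anonymous event by address + one indexed parameter.
// padTopic left-pads a 20-byte address to a 32-byte topic value.
function padTopic(address) {
  return '0x' + address.toLowerCase().replace(/^0x/, '').padStart(64, '0');
}

const anonymousFilter = {
  address: '0x1111111111111111111111111111111111111111', // hypothetical contract
  // No signature topic: topics[0] matches the FIRST indexed parameter directly
  topics: [padTopic('0x2222222222222222222222222222222222222222')],
};

// const logs = await provider.getLogs({ ...anonymousFilter, fromBlock, toBlock });
// Decode log.data with the contract ABI, since there is no signature to match on.
```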
BoltRPC provides the eth_getLogs access your indexer needs. Fixed monthly plans mean predictable costs as your indexer scales.
Start your free 2-week trial: trial.boltrpc.io
Related: The Graph vs Direct RPC | Optimizing RPC Calls for DeFi