
Building a Production eth_getLogs Pipeline: Indexing Blockchain Events at Scale

How to build a reliable, production-grade eth_getLogs pipeline. Covers chunking, deduplication, reorg handling, retry logic and storage patterns for indexing blockchain events at scale.

BoltRPC Team
8 min read

eth_getLogs is the backbone of blockchain data infrastructure. Every indexer, analytics platform, portfolio tracker, and protocol dashboard uses it. A single call returns all events matching a filter across a block range. Powerful, but unreliable without the right patterns around it.

This guide builds a production pipeline: chunked queries, deduplication, reorg handling, retry logic, and storage.


Why eth_getLogs Needs a Pipeline

A raw eth_getLogs call works fine for small block ranges. At production scale, three problems appear:

1. Timeouts: Large block ranges (100,000+ blocks) take too long. Providers time out or cap at a maximum range.

2. Reorgs: A block you already indexed may be replaced by a competing block. Your data becomes wrong.

3. Rate limits: Continuous high-frequency queries drain rate limits and budgets on compute-unit-priced providers.

A proper pipeline handles all three automatically.


The Basic eth_getLogs Call

import { ethers } from 'ethers';

const provider = new ethers.JsonRpcProvider(
  'https://eu.endpoints.matrixed.link/rpc/ethereum?auth=YOUR_KEY'
);

// Get all Transfer events for USDC in the last 1,000 blocks
const USDC_ADDRESS = '0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48'; // USDC on mainnet
const currentBlock = await provider.getBlockNumber();
const transferTopic = ethers.id('Transfer(address,address,uint256)');

const logs = await provider.getLogs({
  address: USDC_ADDRESS,
  topics: [transferTopic],
  fromBlock: currentBlock - 1000,
  toBlock: 'latest',
});

console.log(`Found ${logs.length} Transfer events`);

This works for small ranges. For historical indexing or continuous pipelines, you need the patterns below.


Chunked Queries

Most providers limit block ranges to 2,000–10,000 blocks per query. For larger ranges, chunk:

async function getLogsChunked(filter, fromBlock, toBlock, chunkSize = 2000) {
  const allLogs = [];
  let currentFrom = fromBlock;

  while (currentFrom <= toBlock) {
    const currentTo = Math.min(currentFrom + chunkSize - 1, toBlock);

    try {
      const logs = await provider.getLogs({
        ...filter,
        fromBlock: currentFrom,
        toBlock: currentTo,
      });

      allLogs.push(...logs);
      console.log(`Fetched blocks ${currentFrom}-${currentTo}: ${logs.length} logs`);
      currentFrom = currentTo + 1;

    } catch (error) {
      if (error.message?.includes('block range') || error.message?.includes('too many')) {
        // Provider rejected the range — halve the chunk size and retry
        chunkSize = Math.floor(chunkSize / 2);
        if (chunkSize < 10) throw new Error('Chunk size too small — query cannot complete');
        console.warn(`Range too large. Reducing chunk to ${chunkSize} blocks`);
        continue; // Retry same range with smaller chunk
      }
      throw error;
    }
  }

  return allLogs;
}

// Index 6 months of USDC transfers
const sixMonthsAgo = currentBlock - (6 * 30 * 24 * 300); // ~6 months at 12s/block
const historicalLogs = await getLogsChunked(
  { address: USDC_ADDRESS, topics: [transferTopic] },
  sixMonthsAgo,
  currentBlock
);

Retry with Exponential Backoff

Transient failures (network blips, provider timeouts) are normal. Retry automatically:

async function getLogsWithRetry(filter, maxRetries = 3, baseDelay = 1000) {
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      return await provider.getLogs(filter);
    } catch (error) {
      if (attempt === maxRetries) throw error;

      const isRetryable =
        error.message?.includes('timeout') ||
        error.message?.includes('rate limit') ||
        error.code === 'NETWORK_ERROR' ||
        error.code === 'SERVER_ERROR';

      if (!isRetryable) throw error;

      const delay = baseDelay * Math.pow(2, attempt);
      console.warn(`getLogs failed (attempt ${attempt + 1}/${maxRetries + 1}). Retrying in ${delay}ms`);
      await new Promise(resolve => setTimeout(resolve, delay));
    }
  }
}

The Full Indexer Class

class EventIndexer {
  constructor(config) {
    this.provider = config.provider;
    this.wsProvider = config.wsProvider;
    this.filter = config.filter;
    this.chunkSize = config.chunkSize || 2000;
    this.storage = config.storage; // Database adapter
    this.lastIndexedBlock = null;
    this.confirmations = config.confirmations || 12; // Wait for 12 blocks before finalizing
  }

  async start(fromBlock) {
    // Load last indexed block from storage
    this.lastIndexedBlock = await this.storage.getLastIndexedBlock() || fromBlock;

    console.log(`Starting indexer from block ${this.lastIndexedBlock}`);

    // Catch up on historical data
    await this.catchUp();

    // Switch to live mode
    await this.subscribeLive();
  }

  async catchUp() {
    const currentBlock = await this.provider.getBlockNumber();
    const safeBlock = currentBlock - this.confirmations; // Only index confirmed blocks

    if (this.lastIndexedBlock >= safeBlock) {
      console.log('Already caught up');
      return;
    }

    console.log(`Catching up: ${this.lastIndexedBlock} → ${safeBlock}`);

    const logs = await getLogsChunked(
      this.filter,
      this.lastIndexedBlock + 1,
      safeBlock,
      this.chunkSize
    );

    await this.processLogs(logs);
    this.lastIndexedBlock = safeBlock;
    await this.storage.saveLastIndexedBlock(safeBlock);

    console.log(`Caught up to block ${safeBlock}`);
  }

  async subscribeLive() {
    // Use WebSocket for real-time new blocks
    this.wsProvider.on('block', async (blockNumber) => {
      const safeBlock = blockNumber - this.confirmations;

      if (safeBlock <= this.lastIndexedBlock) return;

      // Index the new confirmed blocks
      const logs = await getLogsWithRetry({
        ...this.filter,
        fromBlock: this.lastIndexedBlock + 1,
        toBlock: safeBlock,
      });

      if (logs.length > 0) {
        await this.processLogs(logs);
      }

      this.lastIndexedBlock = safeBlock;
      await this.storage.saveLastIndexedBlock(safeBlock);
    });

    console.log('Live indexing started');
  }

  async processLogs(logs) {
    if (logs.length === 0) return;

    // Deduplicate by transaction hash + log index
    const unique = deduplicateLogs(logs);

    // Parse logs
    const parsed = unique.map(log => this.parseLog(log)).filter(Boolean);

    // Store in batches
    await this.storage.batchInsert(parsed);
    console.log(`Processed ${parsed.length} events`);
  }

  parseLog(log) {
    try {
      const parsed = this.filter.interface?.parseLog(log);
      return {
        blockNumber: log.blockNumber,
        transactionHash: log.transactionHash,
        logIndex: log.index,
        address: log.address,
        eventName: parsed?.name,
        args: parsed?.args,
        raw: log,
      };
    } catch {
      return null;
    }
  }
}

Deduplication

The same log can appear twice during reorgs or when chunk ranges overlap. Deduplicate by the combination of transactionHash + logIndex:

function deduplicateLogs(logs) {
  const seen = new Set();
  return logs.filter(log => {
    const key = `${log.transactionHash}-${log.index}`;
    if (seen.has(key)) return false;
    seen.add(key);
    return true;
  });
}

Reorg Handling

A reorg means blocks you indexed get replaced. Transactions in those blocks may be removed, reordered, or changed. To handle reorgs correctly:

async function handleReorg(provider, storage, filter, safeDepth = 12) {
  const currentBlock = await provider.getBlockNumber();

  // Re-verify the last N blocks
  for (let i = safeDepth; i >= 0; i--) {
    const blockNumber = currentBlock - i;
    const block = await provider.getBlock(blockNumber);
    const storedHash = await storage.getBlockHash(blockNumber);

    if (storedHash && storedHash !== block.hash) {
      console.warn(`Reorg detected at block ${blockNumber}`);

      // Remove all events from the reorged block onwards
      await storage.deleteFromBlock(blockNumber);

      // Re-index from the reorg point
      const logs = await getLogsChunked(filter, blockNumber, currentBlock - safeDepth);
      await storage.batchInsert(logs.map(parseLog));
      await storage.saveBlockHash(blockNumber, block.hash);

      break;
    }
  }
}

Simpler approach: only index blocks with 12+ confirmations. The this.confirmations = 12 setting in the indexer above does this automatically. Reorgs deeper than a handful of blocks are extremely rare on post-merge Ethereum, so 12 confirmations is a widely used safety margin.


Topic Filtering

Narrow your queries with specific topic filters to reduce result set size:

const transferTopic = ethers.id('Transfer(address,address,uint256)');

// All Transfer events
const allTransfers = { topics: [transferTopic] };

// Only transfers FROM a specific address
const fromAddress = ethers.zeroPadValue('0xYourAddress', 32);
const transfersFrom = { topics: [transferTopic, fromAddress] };

// Only transfers TO a specific address
const toAddress = ethers.zeroPadValue('0xYourAddress', 32);
const transfersTo = { topics: [transferTopic, null, toAddress] };

// Transfers between two specific addresses
const exactTransfer = { topics: [transferTopic, fromAddress, toAddress] };

// Multiple possible senders (OR logic)
const fromEither = {
  topics: [
    transferTopic,
    [
      ethers.zeroPadValue(ADDRESS_A, 32),
      ethers.zeroPadValue(ADDRESS_B, 32),
    ]
  ]
};

Storage Patterns

// Minimal storage adapter interface
class PostgresStorage {
  async getLastIndexedBlock() {
    const result = await db.query(
      'SELECT block_number FROM indexer_state WHERE id = 1'
    );
    return result.rows[0]?.block_number || null;
  }

  async saveLastIndexedBlock(blockNumber) {
    await db.query(
      'INSERT INTO indexer_state (id, block_number) VALUES (1, $1) ON CONFLICT (id) DO UPDATE SET block_number = $1',
      [blockNumber]
    );
  }

  async batchInsert(events) {
    if (events.length === 0) return;

    // Use ON CONFLICT DO NOTHING for idempotency
    const values = events.map((e, i) =>
      `($${i * 5 + 1}, $${i * 5 + 2}, $${i * 5 + 3}, $${i * 5 + 4}, $${i * 5 + 5})`
    ).join(',');

    await db.query(
      `INSERT INTO events (block_number, tx_hash, log_index, event_name, args)
       VALUES ${values}
       ON CONFLICT (tx_hash, log_index) DO NOTHING`,
      events.flatMap(e => [e.blockNumber, e.transactionHash, e.logIndex, e.eventName, JSON.stringify(e.args)])
    );
  }
}
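The adapter above assumes two tables. A hypothetical migration sketch (table and column names simply mirror the queries above; your real schema may differ):

```javascript
// Hypothetical schema for the PostgresStorage adapter above. The UNIQUE
// constraint on (tx_hash, log_index) is what makes the adapter's
// ON CONFLICT (tx_hash, log_index) DO NOTHING insert idempotent.
const MIGRATION_SQL = `
  CREATE TABLE IF NOT EXISTS indexer_state (
    id           INTEGER PRIMARY KEY,
    block_number BIGINT NOT NULL
  );

  CREATE TABLE IF NOT EXISTS events (
    block_number BIGINT  NOT NULL,
    tx_hash      TEXT    NOT NULL,
    log_index    INTEGER NOT NULL,
    event_name   TEXT,
    args         JSONB,
    UNIQUE (tx_hash, log_index)
  );

  -- Reorg cleanup and range queries filter by block number
  CREATE INDEX IF NOT EXISTS events_block_idx ON events (block_number);
`;

// Run once at startup, e.g.: await db.query(MIGRATION_SQL);
```

The index on block_number matters for deleteFromBlock during reorg cleanup; without it, the delete scans the whole table.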

FAQ

What is the maximum block range for eth_getLogs?

It varies by provider. Most managed providers cap at 2,000–10,000 blocks per query. Some providers (including those running Nethermind with Log Index enabled) can handle much larger ranges. The chunked query pattern in this guide handles any limit automatically.

How do I index from the genesis block?

Use the chunked query pattern with fromBlock: 0. For Ethereum mainnet this means indexing 20+ million blocks, which takes hours to days depending on event frequency. Start with a more recent block if you only need recent history.
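As a rough sizing exercise (block counts and chunk size here are illustrative, not provider guarantees), the number of chunked calls is just the range divided by the chunk size:

```javascript
// Back-of-envelope sizing for a full-history index. Numbers are
// illustrative; real limits depend on your provider.
function estimateQueryCount(fromBlock, toBlock, chunkSize) {
  return Math.ceil((toBlock - fromBlock + 1) / chunkSize);
}

// ~20 million mainnet blocks at 2,000 blocks per chunk
console.log(estimateQueryCount(0, 19_999_999, 2000)); // 10000 queries
```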

Can I run multiple indexers in parallel?

Yes. Shard by block range or by contract address. One indexer handles blocks 0-5,000,000, another handles 5,000,001-10,000,000, etc. Merge results in your storage layer. Ensure your deduplication handles overlapping ranges at shard boundaries.
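Range sharding can be sketched with a small helper (the function name is ours, not from any library):

```javascript
// Hypothetical helper: split a historical block range into equal
// sub-ranges, one per parallel indexer worker.
function shardRange(fromBlock, toBlock, shards) {
  const size = Math.ceil((toBlock - fromBlock + 1) / shards);
  const ranges = [];
  for (let start = fromBlock; start <= toBlock; start += size) {
    ranges.push({
      fromBlock: start,
      toBlock: Math.min(start + size - 1, toBlock),
    });
  }
  return ranges;
}

// Four workers over 10M blocks:
// shardRange(0, 9_999_999, 4) → boundaries at 2.5M, 5M, 7.5M
```

Each worker then runs getLogsChunked over its own sub-range and writes to the shared store, where the ON CONFLICT insert absorbs any boundary overlap.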

Is eth_getLogs affected by compute unit pricing?

Yes. On providers using compute unit pricing, eth_getLogs typically carries a significantly higher CU weight than simple reads, often 75 CUs or more per call depending on result size. A continuous indexer running many queries per hour can exhaust plans quickly on CU-priced providers. BoltRPC pricing is straightforward — fixed monthly plans with no surprise bills regardless of how heavily you use eth_getLogs.

How do I handle topics for anonymous events?

Anonymous events omit the event signature topic, so indexed parameters occupy topic positions starting at 0. You cannot filter them by signature; filter by contract address and by indexed parameter values in the topic slots, then decode the log's data field. Anonymous events are rare in modern contracts.
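A sketch of such a filter without a library (addressToTopic is our own helper; for addresses it produces the same value as ethers.zeroPadValue(addr, 32), and the contract address is hypothetical):

```javascript
// Hypothetical helper: pad a 20-byte address into a 32-byte topic value.
function addressToTopic(addr) {
  return '0x' + addr.toLowerCase().replace(/^0x/, '').padStart(64, '0');
}

// For an anonymous event, topic position 0 holds the first indexed
// parameter rather than the event signature hash.
const anonymousFilter = {
  address: '0x1111111111111111111111111111111111111111', // hypothetical contract
  topics: [addressToTopic('0xAbCd000000000000000000000000000000000001')],
};
```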


BoltRPC provides the eth_getLogs access your indexer needs. Fixed monthly plans mean predictable costs as your indexer scales.

Start your free 2-week trial: trial.boltrpc.io

Related: The Graph vs Direct RPC | Optimizing RPC Calls for DeFi


Ready to build with high-performance RPC?

Start your free trial today. No credit card required. Access 20+ networks instantly.

Disclaimer: The content in this article is for informational purposes only and does not constitute financial, legal, or technical advice. Code examples and configurations are provided as-is. Always verify information with official documentation and test thoroughly in your own environment before deploying to production.
