gRPC Streaming Guide

Learn how to use gRPC streaming for blocks, transactions, accounts, and slots.

FortiBlox gRPC provides powerful streaming capabilities for real-time blockchain data. This guide covers all streaming modes and best practices.
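All of the snippets in this guide assume an open bidirectional stream named stream, created from a gRPC client with authenticated metadata. The sketch below shows a minimal setup using @grpc/grpc-js and @grpc/proto-loader; the endpoint, proto file, service name, and API-key header are placeholders, so substitute the values from the FortiBlox API reference.

const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');

// Placeholder proto path and load options; use the proto file from the API reference
const packageDefinition = protoLoader.loadSync('geyser.proto', {
  keepCase: true,
  longs: String,
  enums: String,
  defaults: true,
  oneofs: true
});
const proto = grpc.loadPackageDefinition(packageDefinition);

// Hypothetical endpoint and service name, for illustration only
const client = new proto.geyser.Geyser(
  'grpc.fortiblox.example:443',
  grpc.credentials.createSsl()
);

// API key passed as gRPC metadata (the header name here is an assumption)
const metadata = new grpc.Metadata();
metadata.add('x-api-key', process.env.FORTIBLOX_API_KEY);

// subscribe() opens the bidirectional stream used throughout this guide
const stream = client.subscribe(metadata);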

Stream Types

Transaction Streaming

Stream transactions with advanced filtering:

stream.write({
  transactions: {
    'jupiter_swaps': {
      accountInclude: ['JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4'],
      vote: false,
      failed: false
    }
  },
  commitment: 'CONFIRMED'
});

stream.on('data', (update) => {
  if (update.transaction) {
    console.log('New transaction:', update.transaction);
  }
});

Block Streaming

Stream complete blocks with transactions:

stream.write({
  blocks: {
    'new_blocks': {
      includeTransactions: true,
      includeAccounts: false,
      includeEntries: false
    }
  }
});

stream.on('data', (update) => {
  if (update.block) {
    console.log(`Block ${update.block.slot}: ${update.block.transactions.length} txs`);
  }
});

Slot Streaming

Track slot progression:

stream.write({
  slots: {
    'slot_updates': {
      filterByCommitment: true
    }
  }
});

stream.on('data', (update) => {
  if (update.slot) {
    console.log(`Slot ${update.slot.slot} (parent: ${update.slot.parent})`);
  }
});

Account Streaming

Monitor account changes:

stream.write({
  accounts: {
    'wallet_updates': {
      account: ['YourWalletAddress...'],
      owner: [] // or filter by owner program
    }
  }
});

stream.on('data', (update) => {
  if (update.account) {
    console.log(`Account ${update.account.account.pubkey}: ${update.account.account.lamports} lamports`);
  }
});

Filtering Strategies

Account Inclusion

Include specific accounts:

{
  transactions: {
    'swap_programs': {
      accountInclude: [
        'JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4', // Jupiter
        '9W959DqEETiGZocYWCQPaJ6sBmUzgfxXfqGeTEdp3aQP'  // Orca
      ],
      vote: false
    }
  }
}

Account Exclusion

Exclude specific accounts:

{
  transactions: {
    'non_vote': {
      accountExclude: [
        'Vote111111111111111111111111111111111111111'
      ]
    }
  }
}

Required Accounts

Match only transactions that include every specified account:

{
  transactions: {
    'token_transfers': {
      accountRequired: [
        'TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA',
        'YourTokenMint...'
      ]
    }
  }
}

Vote and Failed Filters

{
  transactions: {
    'successful_only': {
      vote: false,    // Exclude vote transactions
      failed: false   // Exclude failed transactions
    }
  }
}

Commitment Levels

Processed (Fastest)

stream.write({
  transactions: { 'fast': { vote: false } },
  commitment: 'PROCESSED'
});
  • Latency: ~5ms
  • Finality: Not guaranteed (may be rolled back)
  • Use case: Real-time dashboards

Confirmed (Recommended)

stream.write({
  transactions: { 'reliable': { vote: false } },
  commitment: 'CONFIRMED'
});
  • Latency: ~10ms
  • Finality: 99%+ probability
  • Use case: Most applications

Finalized (Most Secure)

stream.write({
  transactions: { 'final': { vote: false } },
  commitment: 'FINALIZED'
});
  • Latency: ~30-60 seconds
  • Finality: 100% guaranteed
  • Use case: Financial applications

Multiple Subscriptions

Subscribe to different event types simultaneously:

stream.write({
  transactions: {
    'swaps': {
      accountInclude: ['JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4'],
      vote: false
    },
    'token_mints': {
      accountInclude: ['TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA'],
      vote: false
    }
  },
  slots: {
    'slot_updates': {}
  },
  blocks: {
    'new_blocks': {
      includeTransactions: false
    }
  },
  commitment: 'CONFIRMED'
});

stream.on('data', (update) => {
  if (update.transaction) {
    const sub = update.transaction.subscription;
    console.log(`Transaction from ${sub}:`, update.transaction);
  } else if (update.slot) {
    console.log('Slot update:', update.slot);
  } else if (update.block) {
    console.log('Block update:', update.block);
  }
});

Stream Management

Connection Management

function createStream() {
  const stream = client.subscribe(metadata);

  stream.on('error', (err) => {
    console.error('Stream error:', err);
    setTimeout(createStream, 5000); // Reconnect after 5s
  });

  stream.on('end', () => {
    console.log('Stream ended');
    setTimeout(createStream, 1000); // Reconnect after 1s
  });

  // Initialize subscriptions
  stream.write({
    transactions: { 'all': { vote: false } }
  });

  return stream;
}

let stream = createStream();

Graceful Shutdown

process.on('SIGTERM', () => {
  console.log('Shutting down...');
  stream.end();
  process.exit(0);
});
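If you constructed the client yourself, as in the setup sketch at the top of this guide, you can also release the underlying channel once the stream is closed; client.close() is the standard @grpc/grpc-js call for that.

// Also clean up on Ctrl+C during local development
process.on('SIGINT', () => {
  console.log('Shutting down...');
  stream.end();   // half-close the subscription stream
  client.close(); // release the underlying gRPC channel
  process.exit(0);
});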

Backpressure Handling

let buffer = [];
let processing = false;

stream.on('data', (update) => {
  buffer.push(update);

  if (!processing) {
    processing = true;
    processBuffer();
  }
});

async function processBuffer() {
  while (buffer.length > 0) {
    const update = buffer.shift();
    await processUpdate(update); // processUpdate is your own async handler
  }
  processing = false;
}
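If the buffer grows faster than your handler can drain it, you can also lean on Node's built-in flow control. The sketch below assumes the gRPC call object behaves as a standard Node.js object-mode stream (it does with @grpc/grpc-js) and that processUpdate is your own async handler; it uses separate variable names so it can stand alone next to the example above.

const MAX_BUFFERED = 1000;
const pending = [];
let draining = false;

stream.on('data', (update) => {
  pending.push(update);

  // Stop reading from the socket once the local buffer gets large
  if (pending.length >= MAX_BUFFERED) {
    stream.pause();
  }

  if (!draining) {
    draining = true;
    drain();
  }
});

async function drain() {
  while (pending.length > 0) {
    await processUpdate(pending.shift()); // your own async handler
  }
  draining = false;
  stream.resume(); // safe to call even if the stream was never paused
}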

Error Handling

Common Errors

stream.on('error', (err) => {
  switch (err.code) {
    case grpc.status.UNAUTHENTICATED:
      console.error('Invalid API key');
      break;

    case grpc.status.UNAVAILABLE:
      console.error('Service unavailable, reconnecting...');
      reconnect(); // your own reconnection routine (see Retry Strategy below)
      break;

    case grpc.status.RESOURCE_EXHAUSTED:
      console.error('Rate limit exceeded');
      break;

    case grpc.status.DEADLINE_EXCEEDED:
      console.error('Request timeout');
      break;

    default:
      console.error('Unknown error:', err);
  }
});

Retry Strategy

class GrpcClient {
  constructor() {
    this.retryCount = 0;
    this.maxRetries = 5;
    this.baseDelay = 1000;
  }

  connect() {
    const stream = client.subscribe(metadata);

    stream.on('error', (err) => {
      if (this.retryCount < this.maxRetries) {
        const delay = this.baseDelay * Math.pow(2, this.retryCount);
        this.retryCount++;

        console.log(`Retry ${this.retryCount}/${this.maxRetries} in ${delay}ms`);
        setTimeout(() => this.connect(), delay);
      } else {
        console.error('Max retries exceeded');
      }
    });

    stream.on('data', () => {
      this.retryCount = 0; // Reset on successful data
    });

    return stream;
  }
}

Performance Optimization

Selective Data

Request only the fields you need:

stream.write({
  transactions: {
    'minimal': {
      vote: false,
      signature: true,      // Include signature
      transaction: false,   // Exclude full transaction
      accountKeys: false    // Exclude account keys
    }
  }
});

Batching Updates

Process updates in batches:

let batch = [];
const BATCH_SIZE = 100;
const BATCH_INTERVAL = 1000; // 1 second

stream.on('data', (update) => {
  batch.push(update);

  if (batch.length >= BATCH_SIZE) {
    processBatch(batch);
    batch = [];
  }
});

setInterval(() => {
  if (batch.length > 0) {
    processBatch(batch);
    batch = [];
  }
}, BATCH_INTERVAL);

async function processBatch(updates) {
  // Process multiple updates at once
  console.log(`Processing batch of ${updates.length} updates`);
}

Connection Pooling

Use multiple connections for high throughput:

const POOL_SIZE = 5;
const streams = [];

for (let i = 0; i < POOL_SIZE; i++) {
  const stream = client.subscribe(metadata);

  stream.write({
    transactions: {
      [`stream_${i}`]: {
        accountInclude: getAccountsForShard(i, POOL_SIZE),
        vote: false
      }
    }
  });

  streams.push(stream);
}

function getAccountsForShard(shard, totalShards) {
  // ALL_ACCOUNTS is your full list of accounts to monitor,
  // distributed evenly across the connection pool
  return ALL_ACCOUNTS.filter((_, i) => i % totalShards === shard);
}

Monitoring

Track Stream Health

let lastUpdate = Date.now();
let updateCount = 0;

stream.on('data', () => {
  lastUpdate = Date.now();
  updateCount++;
});

// Health check every 10 seconds
setInterval(() => {
  const timeSinceUpdate = Date.now() - lastUpdate;

  if (timeSinceUpdate > 30000) {
    console.warn('No updates received in 30s, stream may be stalled');
  }

  console.log(`Updates/sec: ${updateCount / 10}`);
  updateCount = 0;
}, 10000);

Metrics Collection

const metrics = {
  transactions: 0,
  blocks: 0,
  slots: 0,
  errors: 0
};

stream.on('data', (update) => {
  if (update.transaction) metrics.transactions++;
  if (update.block) metrics.blocks++;
  if (update.slot) metrics.slots++;
});

stream.on('error', () => {
  metrics.errors++;
});

// Report metrics
setInterval(() => {
  console.log('Metrics:', metrics);
}, 60000);

Best Practices

  1. Use CONFIRMED commitment for most use cases
  2. Filter aggressively to reduce bandwidth
  3. Implement reconnection logic for reliability
  4. Monitor stream health to detect issues
  5. Handle backpressure to avoid memory issues
  6. Use multiple streams for high throughput
  7. Process updates in batches for better performance
  8. Set appropriate timeouts and keepalives to detect hangs (see the sketch after this list)
  9. Log errors and metrics for debugging
  10. Test failure scenarios before production
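For the timeout recommendation, one way to detect a silently dead connection at the transport level is to enable gRPC keepalive pings when constructing the client. The channel options below are standard @grpc/grpc-js channel arguments; the endpoint and service name reuse the placeholders from the setup sketch, and the values are only suggestions.

// Keepalive pings surface hung connections without waiting for application-level health checks
const client = new proto.geyser.Geyser(
  'grpc.fortiblox.example:443', // placeholder endpoint
  grpc.credentials.createSsl(),
  {
    'grpc.keepalive_time_ms': 10000,         // ping every 10s when idle
    'grpc.keepalive_timeout_ms': 5000,       // fail if no ack within 5s
    'grpc.keepalive_permit_without_calls': 1 // keep pinging even with no active RPCs
  }
);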
