Nexus gRPC API
gRPC Streaming Guide
Learn how to use gRPC streaming for blocks, transactions, accounts, and slots
gRPC Streaming Guide
FortiBlox gRPC provides powerful streaming capabilities for real-time blockchain data. This guide covers all streaming modes and best practices.
Stream Types
Transaction Streaming
Stream transactions with advanced filtering:
stream.write({
transactions: {
'jupiter_swaps': {
accountInclude: ['JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4'],
vote: false,
failed: false
}
},
commitment: 'CONFIRMED'
});
stream.on('data', (update) => {
if (update.transaction) {
console.log('New transaction:', update.transaction);
}
});
Block Streaming
Stream complete blocks with transactions:
stream.write({
blocks: {
'new_blocks': {
includeTransactions: true,
includeAccounts: false,
includeEntries: false
}
}
});
stream.on('data', (update) => {
if (update.block) {
console.log(`Block ${update.block.slot}: ${update.block.transactions.length} txs`);
}
});
Slot Streaming
Track slot progression:
stream.write({
slots: {
'slot_updates': {
filterByCommitment: true
}
}
});
stream.on('data', (update) => {
if (update.slot) {
console.log(`Slot ${update.slot.slot} (parent: ${update.slot.parent})`);
}
});
Account Streaming
Monitor account changes:
stream.write({
accounts: {
'wallet_updates': {
account: ['YourWalletAddress...'],
owner: [] // or filter by owner program
}
}
});
stream.on('data', (update) => {
if (update.account) {
console.log(`Account ${update.account.account.pubkey}: ${update.account.account.lamports} lamports`);
}
});
Filtering Strategies
Account Inclusion
Include specific accounts:
{
transactions: {
'swap_programs': {
accountInclude: [
'JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4', // Jupiter
'9W959DqEETiGZocYWCQPaJ6sBmUzgfxXfqGeTEdp3aQP' // Orca
],
vote: false
}
}
}
Account Exclusion
Exclude specific accounts:
{
transactions: {
'non_vote': {
accountExclude: [
'Vote111111111111111111111111111111111111111'
]
}
}
}
Required Accounts
Transaction must include all specified accounts:
{
transactions: {
'token_transfers': {
accountRequired: [
'TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA',
'YourTokenMint...'
]
}
}
}
Vote and Failed Filters
{
transactions: {
'successful_only': {
vote: false, // Exclude vote transactions
failed: false // Exclude failed transactions
}
}
}
Commitment Levels
Processed (Fastest)
stream.write({
transactions: { 'fast': { vote: false } },
commitment: 'PROCESSED'
});
- Latency: ~5ms
- Finality: Not guaranteed (may be rolled back)
- Use case: Real-time dashboards
Confirmed (Recommended)
stream.write({
transactions: { 'reliable': { vote: false } },
commitment: 'CONFIRMED'
});
- Latency: ~10ms
- Finality: 99%+ probability
- Use case: Most applications
Finalized (Most Secure)
stream.write({
transactions: { 'final': { vote: false } },
commitment: 'FINALIZED'
});
- Latency: ~30-60 seconds
- Finality: 100% guaranteed
- Use case: Financial applications
Multiple Subscriptions
Subscribe to different event types simultaneously:
stream.write({
transactions: {
'swaps': {
accountInclude: ['JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4'],
vote: false
},
'token_mints': {
accountInclude: ['TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA'],
vote: false
}
},
slots: {
'slot_updates': {}
},
blocks: {
'new_blocks': {
includeTransactions: false
}
},
commitment: 'CONFIRMED'
});
stream.on('data', (update) => {
if (update.transaction) {
const sub = update.transaction.subscription;
console.log(`Transaction from ${sub}:`, update.transaction);
} else if (update.slot) {
console.log('Slot update:', update.slot);
} else if (update.block) {
console.log('Block update:', update.block);
}
});
Stream Management
Connection Management
/**
 * Open a subscription stream, send the initial transaction filter, and
 * schedule a replacement stream when this one errors or ends.
 *
 * Relies on `client` and `metadata` being defined by the surrounding code.
 *
 * NOTE: the replacement stream is created from a timer callback, so its
 * return value is not handed back to whoever called createStream() first;
 * a caller that stores the returned stream keeps a reference to the first
 * stream only.
 *
 * @returns {object} The stream returned by `client.subscribe(metadata)`.
 */
function createStream() {
  const stream = client.subscribe(metadata);

  // A failing stream may emit both 'error' and 'end'. Without this guard
  // each handler would schedule its own reconnect, and the number of live
  // streams would grow with every failure.
  let reconnectScheduled = false;
  const scheduleReconnect = (delayMs) => {
    if (reconnectScheduled) {
      return;
    }
    reconnectScheduled = true;
    setTimeout(createStream, delayMs);
  };

  stream.on('error', (err) => {
    console.error('Stream error:', err);
    scheduleReconnect(5000); // Reconnect after 5s
  });
  stream.on('end', () => {
    console.log('Stream ended');
    scheduleReconnect(1000); // Reconnect after 1s
  });

  // Initialize subscriptions
  stream.write({
    transactions: { 'all': { vote: false } }
  });
  return stream;
}
let stream = createStream();
Graceful Shutdown
process.on('SIGTERM', () => {
console.log('Shutting down...');
stream.end();
process.exit(0);
});
Backpressure Handling
let buffer = [];
let processing = false;
stream.on('data', (update) => {
buffer.push(update);
if (!processing) {
processing = true;
processBuffer();
}
});
async function processBuffer() {
while (buffer.length > 0) {
const update = buffer.shift();
await processUpdate(update);
}
processing = false;
}Error Handling
Common Errors
stream.on('error', (err) => {
switch (err.code) {
case grpc.status.UNAUTHENTICATED:
console.error('Invalid API key');
break;
case grpc.status.UNAVAILABLE:
console.error('Service unavailable, reconnecting...');
reconnect();
break;
case grpc.status.RESOURCE_EXHAUSTED:
console.error('Rate limit exceeded');
break;
case grpc.status.DEADLINE_EXCEEDED:
console.error('Request timeout');
break;
default:
console.error('Unknown error:', err);
}
});
Retry Strategy
class GrpcClient {
constructor() {
this.retryCount = 0;
this.maxRetries = 5;
this.baseDelay = 1000;
}
connect() {
const stream = client.subscribe(metadata);
stream.on('error', (err) => {
if (this.retryCount < this.maxRetries) {
const delay = this.baseDelay * Math.pow(2, this.retryCount);
this.retryCount++;
console.log(`Retry ${this.retryCount}/${this.maxRetries} in ${delay}ms`);
setTimeout(() => this.connect(), delay);
} else {
console.error('Max retries exceeded');
}
});
stream.on('data', () => {
this.retryCount = 0; // Reset on successful data
});
return stream;
}
}Performance Optimization
Selective Data
Request only needed fields:
stream.write({
transactions: {
'minimal': {
vote: false,
signature: true, // Include signature
transaction: false, // Exclude full transaction
accountKeys: false // Exclude account keys
}
}
});
Batching Updates
Process updates in batches:
let batch = [];
const BATCH_SIZE = 100;
const BATCH_INTERVAL = 1000; // 1 second
stream.on('data', (update) => {
batch.push(update);
if (batch.length >= BATCH_SIZE) {
processBatch(batch);
batch = [];
}
});
setInterval(() => {
if (batch.length > 0) {
processBatch(batch);
batch = [];
}
}, BATCH_INTERVAL);
async function processBatch(updates) {
// Process multiple updates at once
console.log(`Processing batch of ${updates.length} updates`);
}Connection Pooling
Use multiple connections for high throughput:
const POOL_SIZE = 5;
const streams = [];
for (let i = 0; i < POOL_SIZE; i++) {
const stream = client.subscribe(metadata);
stream.write({
transactions: {
[`stream_${i}`]: {
accountInclude: getAccountsForShard(i, POOL_SIZE),
vote: false
}
}
});
streams.push(stream);
}
function getAccountsForShard(shard, totalShards) {
// Distribute accounts across shards
return ALL_ACCOUNTS.filter((_, i) => i % totalShards === shard);
}
Monitoring
Track Stream Health
let lastUpdate = Date.now();
let updateCount = 0;
stream.on('data', () => {
lastUpdate = Date.now();
updateCount++;
});
// Health check every 10 seconds
setInterval(() => {
const timeSinceUpdate = Date.now() - lastUpdate;
if (timeSinceUpdate > 30000) {
console.warn('No updates received in 30s, stream may be stalled');
}
console.log(`Updates/sec: ${updateCount / 10}`);
updateCount = 0;
}, 10000);Metrics Collection
const metrics = {
transactions: 0,
blocks: 0,
slots: 0,
errors: 0
};
stream.on('data', (update) => {
if (update.transaction) metrics.transactions++;
if (update.block) metrics.blocks++;
if (update.slot) metrics.slots++;
});
stream.on('error', () => {
metrics.errors++;
});
// Report metrics
setInterval(() => {
console.log('Metrics:', metrics);
}, 60000);
Best Practices
- Use CONFIRMED commitment for most use cases
- Filter aggressively to reduce bandwidth
- Implement reconnection logic for reliability
- Monitor stream health to detect issues
- Handle backpressure to avoid memory issues
- Use multiple streams for high throughput
- Batch processing for better performance
- Set appropriate timeouts to detect hangs
- Log errors and metrics for debugging
- Test failure scenarios before production
Next Steps
Code Examples
Production-ready streaming implementations
Protocol Reference
Complete protobuf definitions
Getting Started
Install and connect to gRPC
Overview
gRPC capabilities and benefits
Support
- Discord: discord.gg/fortiblox
- Email: [email protected]
- GitHub: github.com/fortiblox/geyser-grpc