Account Streaming

Stream real-time account updates from X1 Blockchain with sub-second latency

Subscribe to real-time account updates from FortiBlox gRPC streaming. Monitor wallet balances, track token account changes, watch vote accounts, and build sophisticated blockchain applications with live account state updates.

Fully Operational: Account streaming is live with 26,000+ entries in Redis (geyser:accounts) providing real-time balance and data updates.

Overview

Account streaming allows you to:

  • Monitor wallet balances - Track SOL balance changes in real-time
  • Watch token accounts - Stream token balance updates for SPL tokens
  • Track program accounts - Monitor program-owned account state changes
  • Vote account monitoring - Watch validator vote account activity
  • Real-time notifications - Build instant alerts for account changes
  • Sub-second latency - Get updates from Redis pub/sub in <100ms

Data Source

  • Redis Stream: geyser:accounts
  • Current Entries: 26,000+
  • Freshness: Real-time (<100ms)
  • Endpoint: grpc.fortiblox.com:10002 (HTTP/2 plaintext)

Why Use Account Streaming?

Traditional polling wastes resources and introduces latency:

// ❌ BAD: Polling every second
setInterval(async () => {
  const balance = await connection.getBalance(wallet);
  console.log(`Current balance: ${balance}`);
}, 1000);

Account streaming provides instant updates only when accounts change:

// ✅ GOOD: Real-time streaming
stream.write({
  accounts: {
    'wallet_watch': {
      account: ['YourWalletAddress...']
    }
  },
  commitment: 'CONFIRMED'
});

stream.on('data', (update) => {
  if (update.account) {
    console.log(`Balance changed: ${update.account.lamports}`);
  }
});

Request Parameters

SubscribeAccounts Filter Options

| Parameter | Type | Required | Description | Example |
|---|---|---|---|---|
| account | string[] | Yes | Account public keys to watch | ['9B5Xs...', 'vines1...'] |
| commitment | enum | No | Commitment level (default: CONFIRMED) | CONFIRMED or FINALIZED |
| include_data | bool | No | Include account data in updates (default: false) | true |

Required Parameter: You must specify at least one account pubkey to watch. Empty account lists will result in an error.
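
Because an empty account list is rejected, it can help to validate the list before opening a stream. The following is a minimal sketch, not part of the FortiBlox API: `validate_accounts` is a hypothetical helper, and its length and alphabet checks are assumptions based on the usual base58 encoding of 32-byte public keys rather than on documented server-side rules.

```python
# Hypothetical client-side check; the server's own validation rules may differ.
BASE58_ALPHABET = set("123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz")


def validate_accounts(accounts):
    """Return the de-duplicated account list, or raise ValueError if it is unusable."""
    if not accounts:
        raise ValueError("at least one account pubkey is required")

    unique = []
    for pubkey in accounts:
        # A base58-encoded 32-byte key is between 32 and 44 characters long.
        if not 32 <= len(pubkey) <= 44:
            raise ValueError(f"unexpected pubkey length: {pubkey!r}")
        if not set(pubkey) <= BASE58_ALPHABET:
            raise ValueError(f"non-base58 character in pubkey: {pubkey!r}")
        if pubkey not in unique:
            unique.append(pubkey)
    return unique
```

Calling `validate_accounts` on the list you intend to subscribe to surfaces typos and truncated placeholder keys locally instead of as a stream error.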

Commitment Levels

| Level | Description | Use Case | Latency |
|---|---|---|---|
| PROCESSED | Optimistically confirmed by cluster | Real-time monitoring | ~400ms |
| CONFIRMED | Voted on by supermajority | Most applications | ~1s |
| FINALIZED | Finalized by cluster | Critical balance checks | ~30s |

Recommended: Use CONFIRMED commitment for most applications. It provides a good balance between speed and certainty.
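
One way to apply the latency figures above is to pick the strongest commitment level that still fits an application's latency budget. This is an illustrative sketch only: `strongest_commitment_within` is a hypothetical helper, and the latencies are the approximate values quoted in the commitment table, not measured guarantees.

```python
# Approximate latencies in seconds, taken from the commitment table.
COMMITMENT_LATENCY_S = {"PROCESSED": 0.4, "CONFIRMED": 1.0, "FINALIZED": 30.0}

# Ordered from weakest to strongest guarantee.
COMMITMENT_ORDER = ["PROCESSED", "CONFIRMED", "FINALIZED"]


def strongest_commitment_within(budget_s):
    """Pick the strongest commitment level whose typical latency fits the budget."""
    chosen = None
    for level in COMMITMENT_ORDER:
        if COMMITMENT_LATENCY_S[level] <= budget_s:
            chosen = level
    if chosen is None:
        raise ValueError(f"no commitment level fits a {budget_s}s budget")
    return chosen
```

A UI that must react within about two seconds would end up on CONFIRMED, while a settlement job that can wait a minute would get FINALIZED.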

Response Fields

AccountUpdate Message

| Field | Type | Description |
|---|---|---|
| pubkey | string | Account public key (base58) |
| owner | string | Owner program public key |
| lamports | uint64 | Account balance in lamports (1 SOL = 1,000,000,000 lamports) |
| slot | uint64 | Slot number when update occurred |
| data | bytes | Account data (if requested) |
| commitment | enum | Commitment level of this update |
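
The fields above map naturally onto a small immutable record. The sketch below is an assumption about how a client might model an update, not a generated protobuf class: `AccountSnapshot` and `snapshot_from_fields` are hypothetical names. It accepts uint64 values as strings (some loaders decode 64-bit integers that way) and converts lamports with `Decimal` so large balances do not lose precision.

```python
from dataclasses import dataclass
from decimal import Decimal

LAMPORTS_PER_SOL = 1_000_000_000


@dataclass(frozen=True)
class AccountSnapshot:
    """Hypothetical client-side view of one AccountUpdate message."""
    pubkey: str
    owner: str
    lamports: int
    slot: int
    data: bytes = b""
    commitment: str = "CONFIRMED"

    @property
    def sol(self) -> Decimal:
        # Decimal division keeps all nine fractional digits exact.
        return Decimal(self.lamports) / LAMPORTS_PER_SOL


def snapshot_from_fields(fields: dict) -> AccountSnapshot:
    """Build a snapshot from decoded fields; uint64 values may arrive as strings."""
    return AccountSnapshot(
        pubkey=fields["pubkey"],
        owner=fields["owner"],
        lamports=int(fields["lamports"]),
        slot=int(fields["slot"]),
        data=fields.get("data", b""),
        commitment=fields.get("commitment", "CONFIRMED"),
    )
```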

Complete Code Examples

Wallet Balance Monitor

// wallet-balance-monitor.js
const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');

// Load proto
const packageDefinition = protoLoader.loadSync('geyser.proto', {
  keepCase: true,
  longs: String,
  enums: String,
  defaults: true,
  oneofs: true
});

const proto = grpc.loadPackageDefinition(packageDefinition);

// Create client (plaintext/insecure)
const client = new proto.geyser.Geyser(
  'grpc.fortiblox.com:10002',
  grpc.credentials.createInsecure()
);

// Add API key metadata
const metadata = new grpc.Metadata();
metadata.add('x-api-key', process.env.FORTIBLOX_API_KEY);

// Wallets to monitor
const WATCHED_WALLETS = [
  '9B5XszUGdMaxCZ7uSQhPzdks5ZQSmWxrmzCSvtJ6Ns6g',
  'vines1vzrYbzLMRdu58ou5XTby4qAqVRLmqo36NKPTg',
  'AjozzgE83A3x1sHNUR64hfH7zaEBWeMaFuAN9kQgujrc'
];

// Track balance history
const balanceHistory = new Map();
let updateCount = 0;

// Initialize balance history
WATCHED_WALLETS.forEach(wallet => {
  balanceHistory.set(wallet, {
    current: 0,
    previous: 0,
    changes: 0,
    lastUpdate: null
  });
});

// Create subscription request
const request = {
  accounts: WATCHED_WALLETS,
  commitment: 'CONFIRMED',
  include_data: false
};

// Start streaming
const call = client.SubscribeAccounts(request, metadata);

console.log('🚀 Wallet balance monitor started');
console.log('   Endpoint: grpc.fortiblox.com:10002');
console.log(`   Watching: ${WATCHED_WALLETS.length} wallets`);
console.log('   Commitment: CONFIRMED\n');

// Process account updates
call.on('data', (update) => {
  updateCount++;

  const wallet = update.pubkey;
  const lamports = BigInt(update.lamports);
  const sol = Number(lamports) / 1_000_000_000;

  // Get balance history
  const history = balanceHistory.get(wallet);
  if (!history) return;

  // Calculate change
  const previousSol = history.current;
  const changeSol = sol - previousSol;
  const changePercent = previousSol > 0
    ? ((changeSol / previousSol) * 100).toFixed(2)
    : 0;

  // Update history
  history.previous = history.current;
  history.current = sol;
  history.changes++;
  history.lastUpdate = new Date();

  // Determine change type
  let changeIcon = '💰';
  let changeType = 'unchanged';
  if (changeSol > 0) {
    changeIcon = '📈';
    changeType = 'increase';
  } else if (changeSol < 0) {
    changeIcon = '📉';
    changeType = 'decrease';
  }

  // Log update
  console.log(`\n${changeIcon} Balance Update #${updateCount}`);
  console.log(`   Wallet: ${wallet.slice(0, 8)}...${wallet.slice(-4)}`);
  console.log(`   Balance: ${sol.toFixed(9)} SOL`);

  if (changeSol !== 0) {
    console.log(`   Change: ${changeSol > 0 ? '+' : ''}${changeSol.toFixed(9)} SOL (${changePercent}%)`);
  }

  console.log(`   Slot: ${update.slot}`);
  console.log(`   Owner: ${update.owner.slice(0, 20)}...`);
  console.log(`   Updates: ${history.changes}`);

  // Alert on large changes
  if (Math.abs(changeSol) > 1.0) {
    console.log(`\n🚨 LARGE CHANGE DETECTED!`);
    console.log(`   Amount: ${changeSol > 0 ? '+' : ''}${changeSol.toFixed(9)} SOL`);
    console.log(`   Wallet: ${wallet}`);

    // Send notification (webhook, email, etc.)
    sendAlert(wallet, changeSol, changeType);
  }

  // Show summary every 10 updates
  if (updateCount % 10 === 0) {
    printSummary();
  }
});

function printSummary() {
  console.log(`\n📊 Summary Statistics:`);
  console.log(`   Total Updates: ${updateCount}`);

  let totalBalance = 0;
  let totalChanges = 0;

  balanceHistory.forEach((history, wallet) => {
    totalBalance += history.current;
    totalChanges += history.changes;

    if (history.changes > 0) {
      console.log(`\n   ${wallet.slice(0, 8)}...:`);
      console.log(`     Balance: ${history.current.toFixed(9)} SOL`);
      console.log(`     Changes: ${history.changes}`);
      console.log(`     Last: ${history.lastUpdate?.toLocaleTimeString()}`);
    }
  });

  console.log(`\n   Total Portfolio: ${totalBalance.toFixed(9)} SOL`);
  console.log(`   Total Changes: ${totalChanges}\n`);
}

function sendAlert(wallet, change, type) {
  // Implement your alert logic here
  // Examples:
  // - Send webhook to Discord/Slack
  // - Send email notification
  // - Trigger SMS alert
  // - Update dashboard
  console.log(`🔔 Alert sent for ${wallet}`);
}

// Error handling with reconnection
call.on('error', (err) => {
  console.error('❌ Stream error:', err.message);

  if (err.code === grpc.status.UNAVAILABLE) {
    console.log('🔄 Reconnecting in 5 seconds...');
    setTimeout(reconnect, 5000);
  }
});

call.on('end', () => {
  console.log('Stream ended, reconnecting...');
  setTimeout(reconnect, 1000);
});

// Reconnection hook (stub). To resume streaming, wrap the subscription setup
// (client.SubscribeAccounts plus the handlers above) in a function and call it here.
function reconnect() {
  console.log('🔌 Attempting reconnection...');
}

// Graceful shutdown
process.on('SIGINT', () => {
  console.log('\n👋 Shutting down...');
  printSummary();
  console.log(`\nFinal stats: ${updateCount} updates processed`);
  // A server-streaming call is closed from the client side with cancel()
  call.cancel();
  process.exit(0);
});

Run

export FORTIBLOX_API_KEY="your_api_key_here"
node wallet-balance-monitor.js
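
The monitor above retries after a fixed delay. If many clients reconnect at once, a capped exponential backoff with jitter tends to be gentler on the endpoint. The following sketch shows only the delay schedule; `backoff_delays` and its default values are illustrative assumptions, not FortiBlox recommendations.

```python
import random


def backoff_delays(base_s=1.0, cap_s=30.0, attempts=6, rng=random.random):
    """Yield one capped exponential delay with full jitter per reconnect attempt."""
    for attempt in range(attempts):
        # The ceiling doubles each attempt until it reaches the cap.
        ceiling = min(cap_s, base_s * (2 ** attempt))
        # Full jitter: pick a random point between zero and the ceiling.
        yield rng() * ceiling
```

A reconnect loop would sleep for each yielded delay before trying to reopen the stream, and restart the schedule once an update has been received successfully.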

Multi-Account Dashboard

// dashboard.js
const express = require('express');
const app = express();

// Store latest account states
const accountStates = new Map();

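// Set up gRPC streaming as in wallet-balance-monitor.js;
// `call` is the stream returned by client.SubscribeAccounts(request, metadata)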
call.on('data', (update) => {
  accountStates.set(update.pubkey, {
    lamports: update.lamports,
    sol: Number(update.lamports) / 1e9,
    slot: update.slot,
    owner: update.owner,
    lastUpdate: Date.now()
  });
});

// REST API endpoint
app.get('/api/accounts', (req, res) => {
  const accounts = Array.from(accountStates.entries()).map(([pubkey, state]) => ({
    pubkey,
    ...state
  }));

  res.json({
    count: accounts.length,
    accounts
  });
});

app.get('/api/accounts/:pubkey', (req, res) => {
  const state = accountStates.get(req.params.pubkey);

  if (!state) {
    return res.status(404).json({ error: 'Account not found' });
  }

  res.json({
    pubkey: req.params.pubkey,
    ...state
  });
});

app.listen(3000, () => {
  console.log('📊 Dashboard API running on http://localhost:3000');
});

Token Account Tracker

# token_account_tracker.py
import grpc
import os
import base64
from datetime import datetime
from decimal import Decimal
import geyser_pb2
import geyser_pb2_grpc

class TokenAccountTracker:
    def __init__(self, token_accounts, api_key):
        self.token_accounts = token_accounts
        self.api_key = api_key
        self.update_count = 0
        self.account_states = {}

        # Initialize states
        for account in token_accounts:
            self.account_states[account] = {
                'balance': 0,
                'previous_balance': 0,
                'updates': 0,
                'last_update': None
            }

    def connect(self):
        """Create insecure channel (plaintext)"""
        channel = grpc.insecure_channel('grpc.fortiblox.com:10002')
        stub = geyser_pb2_grpc.GeyserStub(channel)
        return stub

    def monitor(self):
        print(f"🚀 Token account tracker started")
        print(f"   Endpoint: grpc.fortiblox.com:10002")
        print(f"   Watching: {len(self.token_accounts)} accounts")
        print(f"   Commitment: CONFIRMED\n")

        stub = self.connect()
        metadata = [('x-api-key', self.api_key)]

        # Create subscription request
        request = geyser_pb2.SubscribeAccountsRequest(
            accounts=self.token_accounts,
            commitment=geyser_pb2.CommitmentLevel.CONFIRMED,
            include_data=True  # Include data to parse token amounts
        )

        import time

        try:
            while True:
                try:
                    for update in stub.SubscribeAccounts(request, metadata=metadata):
                        self.process_account_update(update)
                    print("Stream ended by server")

                except grpc.RpcError as e:
                    print(f"❌ gRPC error: {e.code()} - {e.details()}")

                # Loop instead of recursing so repeated reconnects
                # do not grow the call stack
                print("🔄 Reconnecting in 5 seconds...")
                time.sleep(5)
                stub = self.connect()

        except KeyboardInterrupt:
            print("\n👋 Shutting down...")
            self.print_summary()
            print(f"Final stats: {self.update_count} updates processed")

    def process_account_update(self, update):
        self.update_count += 1

        pubkey = update.pubkey
        lamports = update.lamports
        slot = update.slot
        owner = update.owner

        # Get account state
        state = self.account_states.get(pubkey)
        if not state:
            return

        # Update state
        state['previous_balance'] = state['balance']
        state['balance'] = lamports
        state['updates'] += 1
        state['last_update'] = datetime.now()

        # Calculate change
        change = lamports - state['previous_balance']
        sol_balance = lamports / 1_000_000_000
        sol_change = change / 1_000_000_000

        # Determine change type
        if change > 0:
            change_icon = "📈"
            change_type = "INCREASE"
        elif change < 0:
            change_icon = "📉"
            change_type = "DECREASE"
        else:
            change_icon = "💰"
            change_type = "UNCHANGED"

        # Parse token data if available
        token_info = None
        if update.data:
            token_info = self.parse_token_account(update.data)

        print(f"\n{change_icon} Account Update #{self.update_count}")
        print(f"   Account: {pubkey[:8]}...{pubkey[-4:]}")
        print(f"   Balance: {sol_balance:.9f} SOL")

        if change != 0:
            print(f"   Change: {'+' if change > 0 else ''}{sol_change:.9f} SOL")

        print(f"   Slot: {slot}")
        print(f"   Owner: {owner[:20]}...")
        print(f"   Updates: {state['updates']}")

        # Show token info if parsed
        if token_info:
            print(f"\n   Token Info:")
            print(f"     Mint: {token_info['mint'][:8]}...")
            print(f"     Amount: {token_info['amount']}")
            print(f"     Decimals: {token_info['decimals']}")

        # Alert on large changes
        if abs(sol_change) > 0.1:
            self.send_alert(pubkey, sol_change, change_type)

        # Show summary every 10 updates
        if self.update_count % 10 == 0:
            self.print_summary()

    def parse_token_account(self, data):
        """Parse SPL token account data"""
        try:
            # Token account layout (165 bytes):
            # mint: 32 bytes (offset 0)
            # owner: 32 bytes (offset 32)
            # amount: 8 bytes (offset 64)
            # delegate: 36 bytes optional (offset 72)
            # state: 1 byte (offset 108)
            # is_native: 12 bytes optional (offset 109)
            # delegated_amount: 8 bytes (offset 121)
            # close_authority: 36 bytes optional (offset 129)

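            # Public keys are displayed as base58, not base64
            # (requires: pip install base58)
            import base58
            mint = base58.b58encode(bytes(data[0:32])).decode('utf-8')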
            amount = int.from_bytes(data[64:72], byteorder='little')

            # Determine decimals (usually 6 for USDC, 9 for SOL)
            # In real implementation, fetch from mint account
            decimals = 9

            return {
                'mint': mint,
                'amount': amount / (10 ** decimals),
                'decimals': decimals
            }
        except Exception as e:
            print(f"   ⚠️  Failed to parse token data: {e}")
            return None

    def send_alert(self, account, change, change_type):
        """Send alert for large changes"""
        print(f"\n🚨 LARGE CHANGE DETECTED!")
        print(f"   Account: {account}")
        print(f"   Change: {'+' if change > 0 else ''}{change:.9f} SOL")
        print(f"   Type: {change_type}")

        # Implement your alert logic here
        # - Webhook to Discord/Slack
        # - Email notification
        # - SMS alert
        # - Database logging

    def print_summary(self):
        """Print summary statistics"""
        print(f"\n📊 Summary Statistics:")
        print(f"   Total Updates: {self.update_count}")

        total_balance = 0
        active_accounts = 0

        for pubkey, state in self.account_states.items():
            balance_sol = state['balance'] / 1_000_000_000
            total_balance += balance_sol

            if state['updates'] > 0:
                active_accounts += 1
                print(f"\n   {pubkey[:8]}...:")
                print(f"     Balance: {balance_sol:.9f} SOL")
                print(f"     Updates: {state['updates']}")
                if state['last_update']:
                    print(f"     Last: {state['last_update'].strftime('%H:%M:%S')}")

        print(f"\n   Active Accounts: {active_accounts}/{len(self.token_accounts)}")
        print(f"   Total Balance: {total_balance:.9f} SOL\n")

if __name__ == '__main__':
    api_key = os.getenv('FORTIBLOX_API_KEY')
    if not api_key:
        print("Error: FORTIBLOX_API_KEY environment variable not set")
        exit(1)

    # Token accounts to monitor
    token_accounts = [
        'TokenAccount1PublicKey...',
        'TokenAccount2PublicKey...',
        'TokenAccount3PublicKey...'
    ]

    tracker = TokenAccountTracker(token_accounts, api_key)
    tracker.monitor()

Setup & Run

# Install dependencies
pip install grpcio grpcio-tools

# Download and compile proto
curl -O https://raw.githubusercontent.com/rpcpool/yellowstone-grpc/master/yellowstone-grpc-proto/proto/geyser.proto
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. geyser.proto

# Run
export FORTIBLOX_API_KEY="your_api_key_here"
python token_account_tracker.py
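
The layout comment in `parse_token_account` can be expanded into a parser for all fields of the 165-byte account. This is a sketch under stated assumptions: it follows the offsets listed in that comment, treats each optional field as a 4-byte little-endian tag (1 means present) followed by its value, and includes a small dependency-free base58 encoder. The function and dictionary key names are hypothetical.

```python
import struct

B58_ALPHABET = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz"
TOKEN_ACCOUNT_LEN = 165
STATES = {0: "uninitialized", 1: "initialized", 2: "frozen"}


def b58encode(raw: bytes) -> str:
    """Encode bytes as base58, keeping each leading zero byte as a '1'."""
    n = int.from_bytes(raw, "big")
    out = ""
    while n > 0:
        n, rem = divmod(n, 58)
        out = B58_ALPHABET[rem] + out
    pad = len(raw) - len(raw.lstrip(b"\x00"))
    return "1" * pad + out


def _optional_pubkey(tag: int, key: bytes):
    # Tag 1 marks the field as present; anything else is treated as absent.
    return b58encode(key) if tag == 1 else None


def parse_full_token_account(data: bytes) -> dict:
    """Decode every field of an SPL token account from its raw bytes."""
    if len(data) < TOKEN_ACCOUNT_LEN:
        raise ValueError(f"expected {TOKEN_ACCOUNT_LEN} bytes, got {len(data)}")

    mint, owner, amount = struct.unpack_from("<32s32sQ", data, 0)
    delegate_tag, delegate = struct.unpack_from("<I32s", data, 72)
    (state,) = struct.unpack_from("<B", data, 108)
    native_tag, native_amount = struct.unpack_from("<IQ", data, 109)
    (delegated_amount,) = struct.unpack_from("<Q", data, 121)
    close_tag, close_authority = struct.unpack_from("<I32s", data, 129)

    return {
        "mint": b58encode(mint),
        "owner": b58encode(owner),
        "amount": amount,  # raw base units; divide by 10**decimals from the mint
        "delegate": _optional_pubkey(delegate_tag, delegate),
        "state": STATES.get(state, "unknown"),
        "is_native": native_amount if native_tag == 1 else None,
        "delegated_amount": delegated_amount,
        "close_authority": _optional_pubkey(close_tag, close_authority),
    }
```

The raw `amount` is left unscaled on purpose: the number of decimals belongs to the mint account, so it has to be looked up there rather than assumed.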

Balance Alert System

# balance_alerts.py
import smtplib
from email.mime.text import MIMEText

class BalanceAlertSystem:
    def __init__(self, thresholds):
        self.thresholds = thresholds  # {account: min_balance}

    def check_balance(self, account, balance_sol):
        """Check if balance is below threshold"""
        threshold = self.thresholds.get(account)

        if threshold and balance_sol < threshold:
            self.send_low_balance_alert(account, balance_sol, threshold)

    def send_low_balance_alert(self, account, balance, threshold):
        """Send low balance alert"""
        print(f"\n⚠️  LOW BALANCE ALERT!")
        print(f"   Account: {account}")
        print(f"   Balance: {balance:.9f} SOL")
        print(f"   Threshold: {threshold:.9f} SOL")

        # Send email
        self.send_email_alert(account, balance, threshold)

        # Send webhook
        self.send_webhook_alert(account, balance, threshold)

    def send_email_alert(self, account, balance, threshold):
        """Send email notification"""
        msg = MIMEText(f"""
        Low Balance Alert

        Account: {account}
        Current Balance: {balance:.9f} SOL
        Threshold: {threshold:.9f} SOL

        Please add funds to maintain operations.
        """)

        msg['Subject'] = f'⚠️ Low Balance Alert: {account[:8]}...'
        msg['From'] = 'alerts@example.com'  # placeholder sender address
        msg['To'] = 'ops@example.com'  # placeholder recipient address

        # Send email (configure SMTP)
        # smtp = smtplib.SMTP('smtp.gmail.com', 587)
        # smtp.send_message(msg)
        # smtp.quit()

    def send_webhook_alert(self, account, balance, threshold):
        """Send webhook to Discord/Slack"""
        import requests

        webhook_url = "https://discord.com/api/webhooks/YOUR_WEBHOOK"

        data = {
            "content": f"⚠️ **Low Balance Alert**\n"
                      f"Account: `{account[:8]}...`\n"
                      f"Balance: {balance:.9f} SOL\n"
                      f"Threshold: {threshold:.9f} SOL"
        }

        # requests.post(webhook_url, json=data)
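
An account that stays below its threshold would trigger `check_balance` on every update, so a cooldown between alerts is usually worth adding. The sketch below is one possible approach, not part of the alert system above: `AlertCooldown` and its five-minute default are assumptions chosen for illustration.

```python
import time


class AlertCooldown:
    """Suppress repeat alerts for the same account within a cooldown window."""

    def __init__(self, cooldown_s=300.0, clock=time.monotonic):
        self.cooldown_s = cooldown_s
        self.clock = clock  # injectable so the behavior can be tested
        self._last_sent = {}  # account -> time the last alert was allowed

    def should_send(self, account):
        """Return True and start a new window if no alert was sent recently."""
        now = self.clock()
        last = self._last_sent.get(account)
        if last is not None and now - last < self.cooldown_s:
            return False
        self._last_sent[account] = now
        return True
```

In `send_low_balance_alert`, a guard such as `if not cooldown.should_send(account): return` placed before the email and webhook calls would limit each account to one alert per window.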

Vote Account Monitor

// src/main.rs
use tonic::{transport::Channel, metadata::MetadataValue, Request};
use tokio_stream::StreamExt;
use std::collections::HashMap;
use chrono::{DateTime, Utc};

pub mod geyser {
    tonic::include_proto!("geyser");
}

use geyser::{
    geyser_client::GeyserClient,
    SubscribeAccountsRequest,
    CommitmentLevel
};

#[derive(Debug, Clone)]
struct AccountState {
    balance: u64,
    previous_balance: u64,
    updates: u64,
    last_update: DateTime<Utc>,
    owner: String,
}

struct VoteAccountMonitor {
    vote_accounts: Vec<String>,
    account_states: HashMap<String, AccountState>,
    update_count: u64,
}

impl VoteAccountMonitor {
    fn new(vote_accounts: Vec<String>) -> Self {
        Self {
            vote_accounts,
            account_states: HashMap::new(),
            update_count: 0,
        }
    }

    fn process_update(&mut self, update: geyser::AccountUpdate) {
        self.update_count += 1;

        let pubkey = update.pubkey.clone();
        let lamports = update.lamports;
        let slot = update.slot;
        let owner = update.owner;

        // Get or create account state
        let state = self.account_states.entry(pubkey.clone())
            .or_insert(AccountState {
                balance: 0,
                previous_balance: 0,
                updates: 0,
                last_update: Utc::now(),
                owner: owner.clone(),
            });

        // Update state
        state.previous_balance = state.balance;
        state.balance = lamports;
        state.updates += 1;
        state.last_update = Utc::now();

        // Calculate change
        let change = lamports as i128 - state.previous_balance as i128;
        let sol_balance = lamports as f64 / 1_000_000_000.0;
        let sol_change = change as f64 / 1_000_000_000.0;

        // Determine change type
        let (icon, change_type) = if change > 0 {
            ("📈", "INCREASE")
        } else if change < 0 {
            ("📉", "DECREASE")
        } else {
            ("💰", "UNCHANGED")
        };

        // Log update
        println!("\n{} Vote Account Update #{}", icon, self.update_count);
        println!("   Account: {}...{}", &pubkey[..8], &pubkey[pubkey.len()-4..]);
        println!("   Balance: {:.9} SOL", sol_balance);

        if change != 0 {
            println!("   Change: {:+.9} SOL", sol_change);
        }

        println!("   Slot: {}", slot);
        println!("   Owner: {}...", &owner[..20]);
        println!("   Updates: {}", state.updates);

        // Alert on significant changes
        if sol_change.abs() > 5.0 {
            self.send_alert(&pubkey, sol_change, change_type);
        }

        // Show summary every 10 updates
        if self.update_count % 10 == 0 {
            self.print_summary();
        }
    }

    fn send_alert(&self, account: &str, change: f64, change_type: &str) {
        println!("\n🚨 SIGNIFICANT CHANGE DETECTED!");
        println!("   Account: {}", account);
        println!("   Change: {:+.9} SOL", change);
        println!("   Type: {}", change_type);

        // Implement your alert logic here
        // - Send to monitoring system
        // - Log to database
        // - Trigger webhook
    }

    fn print_summary(&self) {
        println!("\n📊 Summary Statistics:");
        println!("   Total Updates: {}", self.update_count);

        let mut total_balance = 0u64;
        let mut active_accounts = 0;

        for (pubkey, state) in &self.account_states {
            total_balance += state.balance;

            if state.updates > 0 {
                active_accounts += 1;
                let sol_balance = state.balance as f64 / 1_000_000_000.0;

                println!("\n   {}...:", &pubkey[..8]);
                println!("     Balance: {:.9} SOL", sol_balance);
                println!("     Updates: {}", state.updates);
                println!("     Last: {}", state.last_update.format("%H:%M:%S"));
            }
        }

        let total_sol = total_balance as f64 / 1_000_000_000.0;
        println!("\n   Active Accounts: {}/{}", active_accounts, self.vote_accounts.len());
        println!("   Total Balance: {:.9} SOL\n", total_sol);
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    println!("🚀 Vote account monitor started");
    println!("   Endpoint: grpc.fortiblox.com:10002");
    println!("   Commitment: CONFIRMED\n");

    // Vote accounts to monitor
    let vote_accounts = vec![
        "VoteAccount1PublicKey...".to_string(),
        "VoteAccount2PublicKey...".to_string(),
        "VoteAccount3PublicKey...".to_string(),
    ];

    let mut monitor = VoteAccountMonitor::new(vote_accounts.clone());

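    loop {
        // Back off before reconnecting whether the stream failed or simply ended
        match monitor_accounts(&mut monitor, &vote_accounts).await {
            Ok(()) => eprintln!("Stream ended"),
            Err(e) => eprintln!("❌ Error: {}", e),
        }
        eprintln!("🔄 Reconnecting in 5 seconds...");
        tokio::time::sleep(std::time::Duration::from_secs(5)).await;
    }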
}

async fn monitor_accounts(
    monitor: &mut VoteAccountMonitor,
    vote_accounts: &[String]
) -> Result<(), Box<dyn std::error::Error>> {
    // Create channel (insecure/plaintext)
    let channel = Channel::from_static("http://grpc.fortiblox.com:10002")
        .connect()
        .await?;

    let mut client = GeyserClient::new(channel);

    // Create request with API key
    let api_key = std::env::var("FORTIBLOX_API_KEY")?;
    let token = MetadataValue::try_from(api_key)?;

    let mut request = Request::new(SubscribeAccountsRequest {
        accounts: vote_accounts.to_vec(),
        commitment: Some(CommitmentLevel::Confirmed as i32),
        include_data: Some(false),
    });

    request.metadata_mut().insert("x-api-key", token);

    // Subscribe
    let mut stream = client.subscribe_accounts(request).await?.into_inner();

    // Process updates
    while let Some(update) = stream.next().await {
        match update {
            Ok(account_update) => {
                monitor.process_update(account_update);
            }
            Err(e) => {
                eprintln!("Stream error: {}", e);
                return Err(Box::new(e));
            }
        }
    }

    Ok(())
}

Cargo.toml

[package]
name = "vote-account-monitor"
version = "0.1.0"
edition = "2021"

[dependencies]
tonic = "0.11"
prost = "0.12"
tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1"
chrono = "0.4"

[build-dependencies]
tonic-build = "0.11"

Run

export FORTIBLOX_API_KEY="your_api_key_here"
cargo run --release
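
The monitors in this guide start every balance at zero, so the first update for an account is reported as a change equal to its whole balance and can trip the large-change alert. After a reconnect, an older update might also arrive after a newer one. The sketch below handles both cases. It assumes that updates for one account can be ordered by `slot`, which this guide does not state explicitly, and `LatestBySlot` is a hypothetical helper.

```python
class LatestBySlot:
    """Track the newest balance per account and compute deltas safely."""

    def __init__(self):
        self._latest = {}  # pubkey -> (slot, lamports)

    def apply(self, pubkey, slot, lamports):
        """Record an update; return the lamport delta, or None if it is stale."""
        prev = self._latest.get(pubkey)
        if prev is not None and slot < prev[0]:
            # Older than what we already hold: ignore it.
            return None
        self._latest[pubkey] = (slot, lamports)
        # The first observation is a baseline, not a change.
        return 0 if prev is None else lamports - prev[1]
```

Alert logic would then act only on a non-zero, non-`None` return value, keeping the comparison in integer lamports until display time.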

Multi-Wallet Portfolio Tracker

// main.go
package main

import (
    "context"
    "fmt"
    "io"
    "log"
    "os"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    "google.golang.org/grpc/metadata"

    pb "your-module/geyser"
)

type AccountState struct {
    Balance         uint64
    PreviousBalance uint64
    Updates         int
    LastUpdate      time.Time
    Owner           string
}

type PortfolioTracker struct {
    Accounts      []string
    States        map[string]*AccountState
    UpdateCount   int
    TotalValue    float64
}

func NewPortfolioTracker(accounts []string) *PortfolioTracker {
    states := make(map[string]*AccountState)

    for _, account := range accounts {
        states[account] = &AccountState{
            Balance:         0,
            PreviousBalance: 0,
            Updates:         0,
            LastUpdate:      time.Now(),
        }
    }

    return &PortfolioTracker{
        Accounts:    accounts,
        States:      states,
        UpdateCount: 0,
        TotalValue:  0,
    }
}

func main() {
    fmt.Println("🚀 Portfolio tracker started")
    fmt.Println("   Endpoint: grpc.fortiblox.com:10002")
    fmt.Println("   Commitment: CONFIRMED\n")

    apiKey := os.Getenv("FORTIBLOX_API_KEY")
    if apiKey == "" {
        log.Fatal("FORTIBLOX_API_KEY environment variable not set")
    }

    // Accounts to track
    accounts := []string{
        "Wallet1PublicKey...",
        "Wallet2PublicKey...",
        "Wallet3PublicKey...",
    }

    tracker := NewPortfolioTracker(accounts)

    fmt.Printf("   Watching: %d wallets\n\n", len(accounts))

    for {
        if err := tracker.Monitor(apiKey); err != nil {
            log.Printf("❌ Error: %v", err)
            log.Println("🔄 Reconnecting in 5 seconds...")
            time.Sleep(5 * time.Second)
        }
    }
}

func (pt *PortfolioTracker) Monitor(apiKey string) error {
    // Create insecure connection (plaintext)
    conn, err := grpc.Dial(
        "grpc.fortiblox.com:10002",
        grpc.WithTransportCredentials(insecure.NewCredentials()),
    )
    if err != nil {
        return fmt.Errorf("failed to connect: %w", err)
    }
    defer conn.Close()

    // Create client
    client := pb.NewGeyserClient(conn)

    // Add API key to context
    md := metadata.New(map[string]string{
        "x-api-key": apiKey,
    })
    ctx := metadata.NewOutgoingContext(context.Background(), md)

    // Create subscription request
    req := &pb.SubscribeAccountsRequest{
        Accounts:    pt.Accounts,
        Commitment:  pb.CommitmentLevel_CONFIRMED,
        IncludeData: false,
    }

    // Subscribe
    stream, err := client.SubscribeAccounts(ctx, req)
    if err != nil {
        return fmt.Errorf("failed to subscribe: %w", err)
    }

    // Receive updates
    for {
        update, err := stream.Recv()
        if err == io.EOF {
            return fmt.Errorf("stream ended")
        }
        if err != nil {
            return fmt.Errorf("stream error: %w", err)
        }

        pt.ProcessUpdate(update)
    }
}

func (pt *PortfolioTracker) ProcessUpdate(update *pb.AccountUpdate) {
    pt.UpdateCount++

    pubkey := update.Pubkey
    lamports := update.Lamports
    slot := update.Slot
    owner := update.Owner

    // Get account state
    state, exists := pt.States[pubkey]
    if !exists {
        return
    }

    // Update state
    state.PreviousBalance = state.Balance
    state.Balance = lamports
    state.Updates++
    state.LastUpdate = time.Now()
    state.Owner = owner

    // Calculate change
    change := int64(lamports) - int64(state.PreviousBalance)
    solBalance := float64(lamports) / 1_000_000_000
    solChange := float64(change) / 1_000_000_000

    // Determine change type
    var icon string
    var changeType string

    if change > 0 {
        icon = "📈"
        changeType = "INCREASE"
    } else if change < 0 {
        icon = "📉"
        changeType = "DECREASE"
    } else {
        icon = "💰"
        changeType = "UNCHANGED"
    }

    // Log update
    fmt.Printf("\n%s Account Update #%d\n", icon, pt.UpdateCount)
    fmt.Printf("   Account: %s...%s\n", pubkey[:8], pubkey[len(pubkey)-4:])
    fmt.Printf("   Balance: %.9f SOL\n", solBalance)

    if change != 0 {
        sign := ""
        if change > 0 {
            sign = "+"
        }
        fmt.Printf("   Change: %s%.9f SOL\n", sign, solChange)
    }

    fmt.Printf("   Slot: %d\n", slot)
    fmt.Printf("   Owner: %s...\n", owner[:20])
    fmt.Printf("   Updates: %d\n", state.Updates)

    // Update total portfolio value
    pt.CalculateTotalValue()

    // Alert on large changes
    if solChange > 10.0 || solChange < -10.0 {
        pt.SendAlert(pubkey, solChange, changeType)
    }

    // Show summary every 10 updates
    if pt.UpdateCount%10 == 0 {
        pt.PrintSummary()
    }
}

func (pt *PortfolioTracker) CalculateTotalValue() {
    total := 0.0

    for _, state := range pt.States {
        total += float64(state.Balance) / 1_000_000_000
    }

    pt.TotalValue = total
}

func (pt *PortfolioTracker) SendAlert(account string, change float64, changeType string) {
    fmt.Println("\n🚨 LARGE CHANGE DETECTED!")
    fmt.Printf("   Account: %s\n", account)
    fmt.Printf("   Change: %+.9f SOL\n", change)
    fmt.Printf("   Type: %s\n", changeType)

    // Implement your alert logic here
    // - Send webhook
    // - Log to database
    // - Send email/SMS
}

func (pt *PortfolioTracker) PrintSummary() {
    fmt.Println("\n📊 Portfolio Summary:")
    fmt.Printf("   Total Updates: %d\n", pt.UpdateCount)
    fmt.Printf("   Total Value: %.9f SOL\n", pt.TotalValue)

    activeAccounts := 0

    for pubkey, state := range pt.States {
        if state.Updates > 0 {
            activeAccounts++
            solBalance := float64(state.Balance) / 1_000_000_000

            fmt.Printf("\n   %s...:\n", pubkey[:8])
            fmt.Printf("     Balance: %.9f SOL\n", solBalance)
            fmt.Printf("     Updates: %d\n", state.Updates)
            fmt.Printf("     Last: %s\n", state.LastUpdate.Format("15:04:05"))
        }
    }

    fmt.Printf("\n   Active Accounts: %d/%d\n", activeAccounts, len(pt.Accounts))
    fmt.Println()
}

Run

export FORTIBLOX_API_KEY="your_api_key_here"
go run main.go

Program Account Monitor

// ProgramAccountMonitor.java
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.stub.MetadataUtils;
import io.grpc.stub.StreamObserver;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.*;

public class ProgramAccountMonitor {
    private final GeyserGrpc.GeyserStub asyncStub;
    private final List<String> programAccounts;
    private final Map<String, AccountState> accountStates;
    private int updateCount = 0;

    static class AccountState {
        long balance;
        long previousBalance;
        int updates;
        LocalDateTime lastUpdate;
        String owner;

        AccountState() {
            this.balance = 0;
            this.previousBalance = 0;
            this.updates = 0;
            this.lastUpdate = LocalDateTime.now();
            this.owner = "";
        }
    }

    public ProgramAccountMonitor(String host, int port, String apiKey, List<String> accounts) {
        // Create insecure channel (plaintext)
        ManagedChannel channel = ManagedChannelBuilder
            .forAddress(host, port)
            .usePlaintext()
            .build();

        // Add API key metadata
        Metadata metadata = new Metadata();
        Metadata.Key<String> key = Metadata.Key.of("x-api-key", Metadata.ASCII_STRING_MARSHALLER);
        metadata.put(key, apiKey);

        this.asyncStub = GeyserGrpc.newStub(channel)
            .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata));

        this.programAccounts = accounts;
        this.accountStates = new HashMap<>();

        // Initialize account states
        for (String account : accounts) {
            accountStates.put(account, new AccountState());
        }
    }

    public void startMonitoring() {
        System.out.println("🚀 Program account monitor started");
        System.out.println("   Endpoint: grpc.fortiblox.com:10002");
        System.out.println("   Watching: " + programAccounts.size() + " accounts");
        System.out.println("   Commitment: CONFIRMED\n");

        subscribe();

        // Block the calling thread until it is interrupted
        try {
            Thread.currentThread().join();
        } catch (InterruptedException e) {
            System.out.println("\n👋 Shutting down...");
            printSummary();
        }
    }

    // Opens the stream. Kept separate from startMonitoring() so that a
    // reconnect from onError does not block the gRPC callback thread.
    private void subscribe() {
        // Create subscription request
        SubscribeAccountsRequest request = SubscribeAccountsRequest.newBuilder()
            .addAllAccounts(programAccounts)
            .setCommitment(CommitmentLevel.CONFIRMED)
            .setIncludeData(false)
            .build();

        // Subscribe
        asyncStub.subscribeAccounts(request, new StreamObserver<AccountUpdate>() {
            @Override
            public void onNext(AccountUpdate update) {
                processAccountUpdate(update);
            }

            @Override
            public void onError(Throwable t) {
                System.err.println("❌ Stream error: " + t.getMessage());
                System.out.println("🔄 Reconnecting in 5 seconds...");
                try {
                    Thread.sleep(5000);
                    subscribe(); // Reconnect
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }

            @Override
            public void onCompleted() {
                System.out.println("Stream completed");
            }
        });
    }

    private void processAccountUpdate(AccountUpdate update) {
        updateCount++;

        String pubkey = update.getPubkey();
        long lamports = update.getLamports();
        long slot = update.getSlot();
        String owner = update.getOwner();

        // Get account state
        AccountState state = accountStates.get(pubkey);
        if (state == null) {
            return;
        }

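        // Update state. On the first update there is no earlier balance,
        // so use the current one as the baseline (change = 0).
        state.previousBalance = state.updates == 0 ? lamports : state.balance;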
        state.balance = lamports;
        state.updates++;
        state.lastUpdate = LocalDateTime.now();
        state.owner = owner;

        // Calculate change
        long change = lamports - state.previousBalance;
        double solBalance = lamports / 1_000_000_000.0;
        double solChange = change / 1_000_000_000.0;

        // Determine change type
        String icon;
        String changeType;

        if (change > 0) {
            icon = "📈";
            changeType = "INCREASE";
        } else if (change < 0) {
            icon = "📉";
            changeType = "DECREASE";
        } else {
            icon = "💰";
            changeType = "UNCHANGED";
        }

        // Log update
        System.out.printf("\n%s Account Update #%d\n", icon, updateCount);
        System.out.printf("   Account: %s...%s\n",
            pubkey.substring(0, 8),
            pubkey.substring(pubkey.length() - 4));
        System.out.printf("   Balance: %.9f SOL\n", solBalance);

        if (change != 0) {
            System.out.printf("   Change: %+.9f SOL\n", solChange);
        }

        System.out.printf("   Slot: %d\n", slot);
        System.out.printf("   Owner: %s...\n", owner.substring(0, Math.min(20, owner.length())));
        System.out.printf("   Updates: %d\n", state.updates);

        // Alert on large changes
        if (Math.abs(solChange) > 1.0) {
            sendAlert(pubkey, solChange, changeType);
        }

        // Show summary every 10 updates
        if (updateCount % 10 == 0) {
            printSummary();
        }
    }

    private void sendAlert(String account, double change, String changeType) {
        System.out.println("\n🚨 LARGE CHANGE DETECTED!");
        System.out.printf("   Account: %s\n", account);
        System.out.printf("   Change: %+.9f SOL\n", change);
        System.out.printf("   Type: %s\n", changeType);

        // Implement your alert logic here
        // - Send webhook
        // - Log to database
        // - Send email/SMS
    }

    private void printSummary() {
        System.out.println("\n📊 Summary Statistics:");
        System.out.printf("   Total Updates: %d\n", updateCount);

        long totalBalance = 0;
        int activeAccounts = 0;
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss");

        for (Map.Entry<String, AccountState> entry : accountStates.entrySet()) {
            String pubkey = entry.getKey();
            AccountState state = entry.getValue();

            totalBalance += state.balance;

            if (state.updates > 0) {
                activeAccounts++;
                double solBalance = state.balance / 1_000_000_000.0;

                System.out.printf("\n   %s...:\n", pubkey.substring(0, 8));
                System.out.printf("     Balance: %.9f SOL\n", solBalance);
                System.out.printf("     Updates: %d\n", state.updates);
                System.out.printf("     Last: %s\n", state.lastUpdate.format(formatter));
            }
        }

        double totalSol = totalBalance / 1_000_000_000.0;
        System.out.printf("\n   Active Accounts: %d/%d\n", activeAccounts, programAccounts.size());
        System.out.printf("   Total Balance: %.9f SOL\n\n", totalSol);
    }

    public static void main(String[] args) {
        String apiKey = System.getenv("FORTIBLOX_API_KEY");
        if (apiKey == null || apiKey.isEmpty()) {
            System.err.println("Error: FORTIBLOX_API_KEY environment variable not set");
            System.exit(1);
        }

        // Program accounts to monitor
        List<String> accounts = Arrays.asList(
            "ProgramAccount1PublicKey...",
            "ProgramAccount2PublicKey...",
            "ProgramAccount3PublicKey..."
        );

        ProgramAccountMonitor monitor = new ProgramAccountMonitor(
            "grpc.fortiblox.com",
            10002,
            apiKey,
            accounts
        );

        monitor.startMonitoring();
    }
}

Run

export FORTIBLOX_API_KEY="your_api_key_here"
mvn compile exec:java -Dexec.mainClass="ProgramAccountMonitor"

Real-World Use Cases

1. Payment Notification System

Monitor a payment wallet and send notifications when payments are received:

const PAYMENT_WALLET = 'YourPaymentWalletAddress...';
const MINIMUM_PAYMENT = 0.1; // 0.1 SOL minimum

stream.write({
  accounts: [PAYMENT_WALLET],
  commitment: 'CONFIRMED'
});

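let previousBalance = null; // Unknown until the first update arrives

stream.on('data', async (update) => {
  if (update.pubkey === PAYMENT_WALLET) {
    const currentBalance = Number(update.lamports) / 1e9;
    // The first update only establishes the baseline, so it yields no change
    const change = previousBalance === null ? 0 : currentBalance - previousBalance;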

    if (change >= MINIMUM_PAYMENT) {
      console.log(`✅ Payment received: ${change.toFixed(9)} SOL`);

      // Find pending order
      const order = await findPendingOrder(change);

      if (order) {
        await markOrderAsPaid(order.id, {
          amount: change,
          slot: update.slot,
          timestamp: Date.now()
        });

        await sendCustomerNotification(order.customer_email, change);
      }
    }

    previousBalance = currentBalance;
  }
});
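
Balances arrive as integer lamports (the examples on this page divide by 1,000,000,000 to get SOL), and converting to floating point before subtracting can introduce rounding error when an amount has to match an order exactly. A minimal Python sketch that keeps the comparison in integer lamports follows; `detect_payment` and `format_sol` are hypothetical helper names, not part of the FortiBlox API:

```python
LAMPORTS_PER_SOL = 1_000_000_000


def detect_payment(previous_lamports, current_lamports, minimum_lamports):
    """Return the received amount in lamports, or None if nothing qualifies.

    previous_lamports is None until the first update establishes a baseline.
    """
    if previous_lamports is None:
        return None  # first update: nothing to compare against yet
    change = current_lamports - previous_lamports
    if change >= minimum_lamports:
        return change
    return None


def format_sol(lamports):
    """Render lamports as a SOL string with 9 decimals, without float math."""
    sign = "-" if lamports < 0 else ""
    whole, frac = divmod(abs(lamports), LAMPORTS_PER_SOL)
    return f"{sign}{whole}.{frac:09d}"
```

Convert to SOL only at the display boundary, for example `format_sol(change)` when logging or notifying a customer.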

2. Low Balance Alert System

Monitor operational wallets and alert when balances drop below thresholds:

OPERATIONAL_WALLETS = {
    'TradingBot...': 10.0,  # Minimum 10 SOL
    'FeeAccount...': 5.0,   # Minimum 5 SOL
    'HotWallet...': 50.0    # Minimum 50 SOL
}

def check_low_balance(update):
    pubkey = update.pubkey
    threshold = OPERATIONAL_WALLETS.get(pubkey)

    if threshold is None:  # Not one of the monitored wallets
        return

    balance_sol = update.lamports / 1_000_000_000

    if balance_sol < threshold:
        send_urgent_alert(
            title="⚠️ Low Balance Warning",
            message=f"Wallet {pubkey[:8]}... has {balance_sol:.9f} SOL "
                   f"(threshold: {threshold:.9f} SOL)",
            severity="HIGH"
        )

        # Auto-refill from cold storage
        if balance_sol < threshold * 0.5:
            trigger_auto_refill(pubkey, threshold * 2)
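
Because every update to a wallet that is already below its threshold would raise the same alert again, it can help to suppress repeats for a while. The sketch below is one way to do that; the `AlertCooldown` class, its 300-second default, and the injectable `clock` are assumptions made for illustration:

```python
import time


class AlertCooldown:
    """Suppress repeat alerts for the same wallet within a cooldown window."""

    def __init__(self, cooldown_seconds=300.0, clock=time.monotonic):
        self.cooldown_seconds = cooldown_seconds
        self.clock = clock        # injectable so the logic can be tested
        self._last_alert = {}     # pubkey -> time the last alert was sent

    def should_alert(self, pubkey):
        """Return True if an alert may be sent now, and record that it was."""
        now = self.clock()
        last = self._last_alert.get(pubkey)
        if last is not None and now - last < self.cooldown_seconds:
            return False
        self._last_alert[pubkey] = now
        return True
```

A caller would wrap the alert in a check such as `if cooldown.should_alert(pubkey): send_urgent_alert(...)`.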

3. Token Account Portfolio Tracker

Monitor all token accounts for a portfolio with real-time value updates:

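// `stream` is the account subscription opened for `token_accounts`;
// `parse_token_account`, `update_token_prices`, and `update_dashboard` are
// application-specific helpers that are not shown here.
async fn track_token_portfolio(token_accounts: Vec<String>) {
    // Latest value per token account, so a repeated update replaces the old
    // value instead of being added on top of it
    let mut account_values: HashMap<String, f64> = HashMap::new();
    let mut token_prices = HashMap::new();

    // Fetch current token prices
    update_token_prices(&mut token_prices).await;

    // Requires futures::StreamExt (or tokio_stream::StreamExt) for `next()`
    while let Some(update) = stream.next().await {
        if let Some(token_data) = parse_token_account(&update.data) {
            // Calculate token value
            let price = token_prices.get(&token_data.mint).copied().unwrap_or(0.0);
            let value = token_data.amount * price;

            println!("Token: {} - Amount: {} - Value: ${:.2}",
                token_data.mint_symbol,
                token_data.amount,
                value
            );

            // Recompute total portfolio value from the per-account values
            account_values.insert(update.pubkey.clone(), value);
            let portfolio_value: f64 = account_values.values().sum();
            println!("Portfolio total: ${:.2}", portfolio_value);

            // Update dashboard
            update_dashboard(&token_data.mint, token_data.amount, value).await;
        }
    }
}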

4. Vote Account Health Monitor

Monitor validator vote accounts for health and performance tracking:

type VoteAccountHealth struct {
    LastVoteSlot     uint64
    TotalVotes       int
    MissedVotes      int
    BalanceChanges   []float64
    HealthScore      float64
}

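func monitorVoteAccountHealth(update *pb.AccountUpdate) {
    // voteAccountHealth is assumed to be a map[string]*VoteAccountHealth
    health, ok := voteAccountHealth[update.Pubkey]
    if !ok {
        health = &VoteAccountHealth{}
        voteAccountHealth[update.Pubkey] = health
    }

    // Record the latest balance, keeping only the 10 most recent samples
    solBalance := float64(update.Lamports) / 1_000_000_000
    health.BalanceChanges = append(health.BalanceChanges, solBalance)
    if len(health.BalanceChanges) > 10 {
        health.BalanceChanges = health.BalanceChanges[len(health.BalanceChanges)-10:]
    }

    // Check for concerning patterns once a full window is available
    if len(health.BalanceChanges) == 10 {
        // Calculate trend
        trend := calculateTrend(health.BalanceChanges)

        if trend < -5.0 {
            // Validator is bleeding SOL
            sendValidatorAlert(
                update.Pubkey,
                "Negative balance trend detected",
                fmt.Sprintf("Trend: %.2f%% over last 10 updates", trend),
            )
        }
    }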

    // Calculate health score
    health.HealthScore = calculateHealthScore(health)

    // Update monitoring dashboard
    updateValidatorDashboard(update.Pubkey, health)
}
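
The monitor above leaves `calculateTrend` undefined. One simple interpretation, sketched here in Python, is the percentage change from the oldest to the newest balance sample in the window. This definition is an assumption; the page does not say how the trend is actually computed:

```python
def calculate_trend(balances):
    """Percentage change from the oldest to the newest balance sample.

    Returns 0.0 when there are fewer than two samples or the oldest
    sample is zero (a percentage change is undefined in that case).
    """
    if len(balances) < 2 or balances[0] == 0:
        return 0.0
    return (balances[-1] - balances[0]) * 100.0 / balances[0]
```

With this definition, a window that starts at 10 SOL and ends at 9 SOL yields a trend of -10, which would cross the -5.0 alert threshold used above.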

5. Escrow Account Monitor

Monitor escrow accounts for DeFi protocols or marketplaces:

public class EscrowMonitor {
    private Map<String, EscrowState> escrows = new HashMap<>();

    private void monitorEscrow(AccountUpdate update) {
        String pubkey = update.getPubkey();
        EscrowState escrow = escrows.get(pubkey);

        if (escrow == null) {
            return;
        }

        long lamports = update.getLamports();
        double solBalance = lamports / 1_000_000_000.0;

        // Check if escrow was funded
        if (!escrow.isFunded() && solBalance >= escrow.getRequiredAmount()) {
            System.out.println("✅ Escrow funded: " + pubkey);
            escrow.setFunded(true);

            // Notify parties
            notifyBuyer(escrow.getBuyerId(), "Escrow funded, waiting for seller");
            notifySeller(escrow.getSellerId(), "Payment received, ship item");
        }

        // Check if escrow was released
        // Check isReleased() so the release is processed only once
        if (escrow.isFunded() && !escrow.isReleased() && solBalance < escrow.getRequiredAmount() * 0.1) {
            System.out.println("✅ Escrow released: " + pubkey);
            escrow.setReleased(true);

            // Complete transaction
            completeTransaction(escrow.getTransactionId());

            // Notify parties
            notifyBuyer(escrow.getBuyerId(), "Transaction complete");
            notifySeller(escrow.getSellerId(), "Payment released");
        }
    }
}

6. NFT Royalty Tracker

Monitor NFT creator accounts for royalty payments:

const NFT_CREATOR_ACCOUNTS = [
  'CreatorAccount1...',
  'CreatorAccount2...',
  'CreatorAccount3...'
];

const royaltyHistory = new Map();

stream.on('data', (update) => {
  const account = update.pubkey;

  if (!NFT_CREATOR_ACCOUNTS.includes(account)) {
    return;
  }

  const currentBalance = Number(update.lamports) / 1e9;
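  // On the first update, use the current balance as the baseline so the
  // existing balance is not counted as a royalty payment
  const history = royaltyHistory.get(account) || { balance: currentBalance, earned: 0 };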

  const royaltyPayment = currentBalance - history.balance;

  if (royaltyPayment > 0) {
    history.earned += royaltyPayment;

    console.log(`💰 Royalty Payment Received`);
    console.log(`   Creator: ${account.slice(0, 8)}...`);
    console.log(`   Amount: ${royaltyPayment.toFixed(9)} SOL`);
    console.log(`   Total Earned: ${history.earned.toFixed(9)} SOL`);

    // Update analytics
    updateCreatorAnalytics(account, royaltyPayment);

    // Send notification
    sendCreatorNotification(account, royaltyPayment);
  }

  history.balance = currentBalance;
  royaltyHistory.set(account, history);
});

Performance Optimization

1. Minimize Data Transfer

Only request account data when you need it:

// ❌ BAD - Always includes data (wastes bandwidth)
stream.write({
  accounts: wallets,
  commitment: 'CONFIRMED',
  include_data: true
});

// ✅ GOOD - Only request data when needed
stream.write({
  accounts: wallets,
  commitment: 'CONFIRMED',
  include_data: false  // Just get balance updates
});

2. Batch Account Monitoring

Monitor multiple accounts in a single stream:

// ❌ BAD - Multiple streams for each account
wallets.forEach(wallet => {
  const stream = client.SubscribeAccounts({
    accounts: [wallet],
    commitment: 'CONFIRMED'
  }, metadata);
});

// ✅ GOOD - Single stream for all accounts
const stream = client.SubscribeAccounts({
  accounts: wallets,  // All wallets in one request
  commitment: 'CONFIRMED'
}, metadata);

3. Efficient State Management

Use efficient data structures for tracking account states:

# ✅ GOOD - Dictionary for O(1) lookups
account_states = {}

def process_update(update):
    state = account_states.get(update.pubkey)

    if not state:
        account_states[update.pubkey] = {
            'balance': update.lamports,
            'updates': 1
        }
    else:
        state['balance'] = update.lamports
        state['updates'] += 1

4. Implement Smart Caching

Cache account states to reduce processing:

use std::time::{Duration, Instant};

struct CachedAccountState {
    balance: u64,
    last_update: Instant,
    cache_duration: Duration,
}

impl CachedAccountState {
    fn is_fresh(&self) -> bool {
        self.last_update.elapsed() < self.cache_duration
    }

    fn update(&mut self, balance: u64) {
        self.balance = balance;
        self.last_update = Instant::now();
    }
}

5. Use Connection Pooling

Keep a fixed set of persistent connections and hand them out round-robin:

type ConnectionPool struct {
    connections []*grpc.ClientConn
    current     int
    mutex       sync.Mutex
}

func (cp *ConnectionPool) GetConnection() *grpc.ClientConn {
    cp.mutex.Lock()
    defer cp.mutex.Unlock()

    conn := cp.connections[cp.current]
    cp.current = (cp.current + 1) % len(cp.connections)

    return conn
}

Error Handling

Reconnection with Exponential Backoff

class AccountStreamManager {
  constructor(accounts, apiKey) {
    this.accounts = accounts;
    this.apiKey = apiKey;
    this.retryCount = 0;
    this.maxRetries = 10;
    this.baseDelay = 1000;
  }

  async connect() {
    try {
      await this.createStream();
    } catch (err) {
      await this.handleError(err);
    }
  }

  async handleError(err) {
    // 'error' is normally followed by 'end'; handle only the first event
    if (this.reconnecting) return;
    this.reconnecting = true;
    this.retryCount++;

    if (this.retryCount > this.maxRetries) {
      console.error('❌ Max retries reached, giving up');
      return;
    }

    // Exponential backoff: 1s, 2s, 4s, 8s, 16s, 32s, then capped at 60s
    const delay = Math.min(
      this.baseDelay * Math.pow(2, this.retryCount - 1),
      60000
    );

    console.log(`🔄 Retry ${this.retryCount}/${this.maxRetries} in ${delay/1000}s`);
    console.log(`   Error: ${err.message}`);

    await new Promise(resolve => setTimeout(resolve, delay));
    this.reconnecting = false;
    await this.connect();
  }

  async createStream() {
    const call = client.SubscribeAccounts({
      accounts: this.accounts,
      commitment: 'CONFIRMED'
    }, this.getMetadata());

    call.on('data', (update) => {
      // Reset the backoff only once the stream is delivering data again
      this.retryCount = 0;
      this.handleUpdate(update);
    });
    call.on('error', (err) => this.handleError(err));
    call.on('end', () => this.handleError(new Error('Stream ended')));
  }

  getMetadata() {
    const metadata = new grpc.Metadata();
    metadata.add('x-api-key', this.apiKey);
    return metadata;
  }

  handleUpdate(update) {
    // Process account update
  }
}
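
When many clients lose the stream at the same moment, identical backoff delays can make them all reconnect in lockstep. A common mitigation is to randomize each delay ("full jitter"). The Python sketch below illustrates the idea; the `backoff_delay` function and its defaults are illustrative assumptions that mirror the 1-second base and 60-second cap used above:

```python
import random


def backoff_delay(attempt, base=1.0, cap=60.0, rng=random.random):
    """Seconds to wait before retry number `attempt` (1-based), with full jitter.

    The upper bound doubles with each attempt up to `cap`; the actual delay
    is drawn uniformly between 0 and that bound.
    """
    ceiling = min(cap, base * 2 ** (attempt - 1))
    return rng() * ceiling
```

The `rng` parameter exists only so the schedule can be tested deterministically; production code can rely on the default.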

Graceful Shutdown

import signal
import sys

class AccountStreamMonitor:
    def __init__(self):
        self.running = True
        self.setup_signal_handlers()

    def setup_signal_handlers(self):
        signal.signal(signal.SIGINT, self.shutdown)
        signal.signal(signal.SIGTERM, self.shutdown)

    def shutdown(self, signum, frame):
        print("\n👋 Shutting down gracefully...")
        self.running = False

        # Save state
        self.save_account_states()

        # Close connections
        self.close_connections()

        print("✅ Shutdown complete")
        sys.exit(0)

    def monitor(self):
        while self.running:
            try:
                for update in self.stream:
                    if not self.running:
                        break
                    self.process_update(update)
            except Exception as e:
                if self.running:
                    self.handle_error(e)

Health Checks

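import java.util.Timer;
import java.util.TimerTask;

public class HealthCheckManager {
    // volatile: written by the stream thread, read by the timer thread
    private volatile long lastUpdateTime = System.currentTimeMillis();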
    private static final long HEALTH_CHECK_INTERVAL = 30000; // 30 seconds
    private static final long UPDATE_TIMEOUT = 60000; // 60 seconds

    public void recordUpdate() {
        this.lastUpdateTime = System.currentTimeMillis();
    }

    public boolean isHealthy() {
        long timeSinceUpdate = System.currentTimeMillis() - lastUpdateTime;
        return timeSinceUpdate < UPDATE_TIMEOUT;
    }

    public void startHealthCheckMonitor() {
        Timer timer = new Timer();
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                if (!isHealthy()) {
                    System.err.println("⚠️  No updates received in " +
                        (System.currentTimeMillis() - lastUpdateTime) / 1000 + " seconds");
                    System.err.println("🔄 Triggering reconnection...");

                    // Trigger reconnection
                    reconnect();
                }
            }
        }, HEALTH_CHECK_INTERVAL, HEALTH_CHECK_INTERVAL);
    }
}

Best Practices

1. Always Specify Accounts

Account streaming requires explicit account addresses:

// ❌ BAD - Empty accounts array
stream.write({
  accounts: [],
  commitment: 'CONFIRMED'
});
// Error: At least one account required

// ✅ GOOD - Specific accounts
stream.write({
  accounts: ['Wallet1...', 'Wallet2...'],
  commitment: 'CONFIRMED'
});

2. Choose Appropriate Commitment

Select commitment level based on your use case:

# Real-time dashboard (speed matters)
request = SubscribeAccountsRequest(
    accounts=wallets,
    commitment=CommitmentLevel.CONFIRMED  # ~1 second
)

# Financial settlement (safety matters)
request = SubscribeAccountsRequest(
    accounts=payment_accounts,
    commitment=CommitmentLevel.FINALIZED  # ~30 seconds
)

3. Implement Proper Error Handling

Always handle stream errors and implement reconnection:

loop {
    match monitor_accounts().await {
        Ok(_) => {
            eprintln!("Stream ended normally");
        }
        Err(e) => {
            eprintln!("❌ Error: {}", e);
            eprintln!("🔄 Reconnecting in 5 seconds...");
            tokio::time::sleep(Duration::from_secs(5)).await;
        }
    }
}

4. Monitor Stream Health

Track updates to detect connection issues:

type StreamHealthMonitor struct {
    startTime     time.Time // When monitoring began
    lastUpdate    time.Time // When the most recent update arrived
    updateCount   int
    expectedRate  float64   // Updates per minute
}

func (shm *StreamHealthMonitor) CheckHealth() bool {
    if time.Since(shm.lastUpdate) > 5*time.Minute {
        // No updates for 5 minutes
        log.Println("⚠️  Stream appears stalled")
        return false
    }

    // Average rate over the whole monitoring period
    elapsed := time.Since(shm.startTime).Minutes()
    if elapsed < 1.0 {
        return true // Too early to judge the rate
    }

    actualRate := float64(shm.updateCount) / elapsed

    if actualRate < shm.expectedRate * 0.5 {
        // Update rate dropped significantly
        log.Printf("⚠️  Update rate dropped: %.1f/min (expected: %.1f/min)",
            actualRate, shm.expectedRate)
        return false
    }

    return true
}

5. Use Structured Logging

Implement structured logging for better observability:

const winston = require('winston');

const logger = winston.createLogger({
  format: winston.format.json(),
  transports: [
    new winston.transports.File({ filename: 'account-updates.log' })
  ]
});

call.on('data', (update) => {
  logger.info('account_update', {
    pubkey: update.pubkey,
    lamports: update.lamports,
    slot: update.slot,
    commitment: update.commitment,
    timestamp: Date.now()
  });
});

Troubleshooting

Issue: No Updates Received

Symptoms:

  • Stream connects successfully
  • No account updates received

Solutions:

  1. Verify accounts are active:
# Check if account exists
curl -X POST https://nexus.fortiblox.com/rpc \
  -H "X-API-Key: $FORTIBLOX_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "jsonrpc": "2.0",
    "id": 1,
    "method": "getAccountInfo",
    "params": ["YourAccountAddress..."]
  }'
  2. Check that the account has activity:
  • Inactive accounts won't generate updates
  • Test with a known active wallet
  3. Verify the commitment level:
// Try PROCESSED for faster updates
stream.write({
  accounts: wallets,
  commitment: 'PROCESSED'  // Earliest notification; may later be rolled back
});

Issue: High CPU Usage

Solutions:

  1. Reduce logging verbosity:
# Only log important events
if change > threshold:
    log_update(update)
  2. Optimize data parsing:
// Skip parsing if data not needed
if !needs_parsing(&update) {
    return;
}
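  3. Use worker threads:
const { Worker } = require('worker_threads');

// './process-update.js' is a placeholder for your own worker script
const worker = new Worker('./process-update.js');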

call.on('data', (update) => {
  // Offload processing to worker
  worker.postMessage(update);
});

Issue: Memory Leaks

Solutions:

  1. Limit history storage:
# Use circular buffer
from collections import deque

update_history = deque(maxlen=1000)  # Only keep last 1000
  2. Clear old data periodically:
setInterval(() => {
  const cutoff = Date.now() - 3600000; // 1 hour ago

  for (const [key, state] of accountStates) {
    if (state.lastUpdate < cutoff) {
      accountStates.delete(key);
    }
  }
}, 300000); // Clean every 5 minutes

Issue: Connection Drops

Solutions:

  1. Implement keep-alive:
// Send periodic pings
ticker := time.NewTicker(30 * time.Second)

go func() {
    for range ticker.C {
        if err := stream.Send(&pb.PingRequest{}); err != nil {
            log.Println("Keep-alive failed:", err)
            reconnect()
        }
    }
}()
  2. Configure channel keep-alive:
// Set connection timeouts
ManagedChannel channel = ManagedChannelBuilder
    .forAddress("grpc.fortiblox.com", 10002)
    .usePlaintext()
    .keepAliveTime(30, TimeUnit.SECONDS)
    .keepAliveTimeout(10, TimeUnit.SECONDS)
    .build();

Advanced Patterns

Multi-Region Failover

const ENDPOINTS = [
  'grpc.fortiblox.com:10002',
  'grpc-backup.fortiblox.com:10002'
];

let currentEndpoint = 0;

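async function connectWithFailover() {
  while (currentEndpoint < ENDPOINTS.length) {
    const client = new proto.geyser.Geyser(
      ENDPOINTS[currentEndpoint],
      grpc.credentials.createInsecure()
    );

    try {
      // Creating a client does not connect; wait up to 5s for the channel
      await new Promise((resolve, reject) => {
        client.waitForReady(Date.now() + 5000, (err) => (err ? reject(err) : resolve()));
      });

      return client;
    } catch (err) {
      console.error(`Failed to connect to ${ENDPOINTS[currentEndpoint]}`);
      client.close();
      currentEndpoint++;
    }
  }

  throw new Error('All endpoints failed');
}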

State Persistence

import json
import os
from datetime import datetime

class PersistentAccountTracker:
    def __init__(self, state_file='account_states.json'):
        self.state_file = state_file
        self.states = self.load_state()

    def load_state(self):
        """Load state from disk"""
        if os.path.exists(self.state_file):
            with open(self.state_file, 'r') as f:
                return json.load(f)
        return {}

    def save_state(self):
        """Save state to disk"""
        with open(self.state_file, 'w') as f:
            json.dump(self.states, f, indent=2)

    def update_account(self, pubkey, lamports):
        """Update account and persist"""
        self.states[pubkey] = {
            'balance': lamports,
            'last_update': datetime.now().isoformat()
        }
        self.save_state()
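
Writing the state file in place means a crash in the middle of a write can leave truncated JSON that `load_state` then fails to parse. One way to avoid this, sketched below under the assumption that the state file lives on a local filesystem, is to write to a temporary file in the same directory and rename it over the target. `save_state_atomic` is a hypothetical helper name:

```python
import json
import os
import tempfile


def save_state_atomic(states, state_file):
    """Write `states` as JSON so readers never observe a partial file."""
    directory = os.path.dirname(os.path.abspath(state_file))
    # The temp file must be in the same directory for the rename to be atomic
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(states, f, indent=2)
            f.flush()
            os.fsync(f.fileno())  # push the data to disk before renaming
        os.replace(tmp_path, state_file)
    except BaseException:
        # Clean up the temp file if anything failed before the rename
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
```

Persisting on every update also adds disk I/O to the hot path, so saving on a timer or every N updates may suit high-volume accounts better.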

Rate Limiting

use std::time::{Duration, Instant};

struct RateLimiter {
    max_updates_per_second: u32,
    updates_this_second: u32,
    last_reset: Instant,
}

impl RateLimiter {
    fn new(max_updates_per_second: u32) -> Self {
        Self {
            max_updates_per_second,
            updates_this_second: 0,
            last_reset: Instant::now(),
        }
    }

    fn should_process(&mut self) -> bool {
        // Reset counter every second
        if self.last_reset.elapsed() >= Duration::from_secs(1) {
            self.updates_this_second = 0;
            self.last_reset = Instant::now();
        }

        if self.updates_this_second < self.max_updates_per_second {
            self.updates_this_second += 1;
            true
        } else {
            false
        }
    }
}
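
A limiter like the one above discards updates once the per-second budget is spent, so the discarded update may be the most recent state of an account. An alternative is to coalesce: keep only the newest pending update per account and process the batch at a fixed interval. The Python sketch below illustrates this; the `UpdateCoalescer` class and its `(slot, lamports)` tuple layout are assumptions for illustration:

```python
class UpdateCoalescer:
    """Keep only the newest pending update for each account."""

    def __init__(self):
        self._pending = {}  # pubkey -> (slot, lamports)

    def offer(self, pubkey, slot, lamports):
        """Record an update unless a newer slot is already pending."""
        current = self._pending.get(pubkey)
        if current is None or slot >= current[0]:
            self._pending[pubkey] = (slot, lamports)

    def drain(self):
        """Return all pending updates and clear the buffer."""
        pending, self._pending = self._pending, {}
        return pending
```

The stream handler calls `offer` for every update, while a timer calls `drain` and processes at most one entry per account per interval.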

Support


Technical Details:

  • Redis Stream: geyser:accounts
  • Current Entries: 26,000+
  • Freshness: Real-time (<100ms)
  • Protocol: HTTP/2 plaintext
  • Endpoint: grpc.fortiblox.com:10002
  • Authentication: SHA-256 via x-api-key header