Account Streaming
Stream real-time account updates from X1 Blockchain with sub-second latency
Subscribe to real-time account updates from FortiBlox gRPC streaming. Monitor wallet balances, track token account changes, watch vote accounts, and build sophisticated blockchain applications with live account state updates.
Fully Operational: Account streaming is live with 26,000+ entries in Redis (geyser:accounts) providing real-time balance and data updates.
Overview
Account streaming allows you to:
- Monitor wallet balances - Track SOL balance changes in real-time
- Watch token accounts - Stream token balance updates for SPL tokens
- Track program accounts - Monitor program-owned account state changes
- Vote account monitoring - Watch validator vote account activity
- Real-time notifications - Build instant alerts for account changes
- Sub-second latency - Get updates from Redis pub/sub in <100ms
Data Source
- Redis Stream: geyser:accounts
- Current Entries: 26,000+
- Freshness: Real-time (<100ms)
- Endpoint: grpc.fortiblox.com:10002 (HTTP/2 plaintext)
Why Use Account Streaming?
Traditional polling wastes resources and introduces latency:
// ❌ BAD: Polling every second
setInterval(async () => {
  const balance = await connection.getBalance(wallet);
  console.log(`Current balance: ${balance}`);
}, 1000);
Account streaming provides instant updates only when accounts change:
// ✅ GOOD: Real-time streaming
stream.write({
  accounts: {
    'wallet_watch': {
      account: ['YourWalletAddress...']
    }
  },
  commitment: 'CONFIRMED'
});
stream.on('data', (update) => {
  if (update.account) {
    console.log(`Balance changed: ${update.account.lamports}`);
  }
});
Request Parameters
SubscribeAccounts Filter Options
| Parameter | Type | Required | Description | Example |
|---|---|---|---|---|
| account | string[] | Yes | Account public keys to watch | ['9B5Xs...', 'vines1...'] |
| commitment | enum | No | Commitment level (default: CONFIRMED) | CONFIRMED or FINALIZED |
| include_data | bool | No | Include account data in updates (default: false) | true |
Required Parameter: You must specify at least one account pubkey to watch. Empty account lists will result in an error.
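Because an empty account list is rejected, it can help to validate the filter on the client before opening a stream. The following is a minimal sketch only: `build_accounts_request` and `VALID_COMMITMENTS` are hypothetical names, not part of any FortiBlox SDK, and the dictionary keys simply mirror the parameter names in the table above.

```python
# Hypothetical client-side helper; not part of any FortiBlox SDK.
# The accepted values are the levels listed under "Commitment Levels".
VALID_COMMITMENTS = {"PROCESSED", "CONFIRMED", "FINALIZED"}


def build_accounts_request(accounts, commitment="CONFIRMED", include_data=False):
    """Return a plain dict mirroring the filter options in the table above."""
    # Drop blank entries and duplicates while preserving the original order
    cleaned = list(dict.fromkeys(a.strip() for a in accounts if a and a.strip()))
    if not cleaned:
        raise ValueError("at least one account pubkey is required")
    if commitment not in VALID_COMMITMENTS:
        raise ValueError(f"unknown commitment level: {commitment!r}")
    return {
        "account": cleaned,
        "commitment": commitment,
        "include_data": bool(include_data),
    }


if __name__ == "__main__":
    print(build_accounts_request(["9B5Xs...", "vines1...", "9B5Xs..."]))
```

Failing fast on the client keeps a malformed filter from costing a round trip to the server.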
Commitment Levels
| Level | Description | Use Case | Latency |
|---|---|---|---|
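| PROCESSED | Processed by the connected node; not yet voted on and may still be rolled back | Real-time monitoring | ~400ms |
| CONFIRMED | Voted on by a supermajority of the cluster | Most applications | ~1s |
| FINALIZED | Confirmed by a supermajority and past maximum lockout; will not be rolled back | Critical balance checks | ~30s |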
Recommended: Use CONFIRMED commitment for most applications. It provides a good balance between speed and certainty.
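One way to reason about the trade-off is to start from a latency budget and pick the strongest level that fits it. The sketch below is illustrative only: the function name is hypothetical, and the latency figures are the approximate values from the table above, not measurements.

```python
# Approximate latencies in seconds, copied from the table above
COMMITMENT_LATENCY_S = {"PROCESSED": 0.4, "CONFIRMED": 1.0, "FINALIZED": 30.0}
# Ordered from weakest to strongest guarantee
COMMITMENT_ORDER = ["PROCESSED", "CONFIRMED", "FINALIZED"]


def strongest_commitment_within(max_latency_s):
    """Pick the strongest commitment level whose typical latency fits the budget."""
    candidates = [
        level for level in COMMITMENT_ORDER
        if COMMITMENT_LATENCY_S[level] <= max_latency_s
    ]
    if not candidates:
        raise ValueError(f"no commitment level fits within {max_latency_s}s")
    return candidates[-1]


if __name__ == "__main__":
    print(strongest_commitment_within(2))   # CONFIRMED
    print(strongest_commitment_within(60))  # FINALIZED
```

A UI ticker that tolerates occasional rollbacks might use a budget under one second, while a settlement job can afford to wait for FINALIZED.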
Response Fields
AccountUpdate Message
| Field | Type | Description |
|---|---|---|
| pubkey | string | Account public key (base58) |
| owner | string | Owner program public key |
| lamports | uint64 | Account balance in lamports (1 SOL = 1,000,000,000 lamports) |
| slot | uint64 | Slot number when update occurred |
| data | bytes | Account data (if requested) |
| commitment | enum | Commitment level of this update |
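Since `lamports` is a uint64, some clients deliver it as a string, and converting it through a float can lose precision for very large balances. The following sketch shows an exact conversion with `Decimal`. The helper names are hypothetical, and the update is modeled as a plain dict with the field names from the table above rather than a real protobuf message.

```python
from decimal import Decimal

LAMPORTS_PER_SOL = 1_000_000_000


def lamports_to_sol(lamports):
    """Convert lamports (int or numeric string) to SOL as an exact Decimal."""
    return Decimal(int(lamports)) / LAMPORTS_PER_SOL


def describe_update(update):
    """Format one update (a dict with pubkey, lamports, slot) for logging."""
    sol = lamports_to_sol(update["lamports"])
    return f"{update['pubkey'][:8]}... {sol:.9f} SOL @ slot {update['slot']}"


if __name__ == "__main__":
    sample = {
        "pubkey": "9B5XszUGdMaxCZ7uSQhPzdks5ZQSmWxrmzCSvtJ6Ns6g",
        "lamports": "2500000000",  # string form, as some clients deliver uint64
        "slot": 123,
    }
    print(describe_update(sample))  # 9B5XszUG... 2.500000000 SOL @ slot 123
```

Keeping balances as integers or `Decimal` values until display time avoids rounding drift when many changes are summed.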
Complete Code Examples
Wallet Balance Monitor
// wallet-balance-monitor.js
const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');
// Load proto
const packageDefinition = protoLoader.loadSync('geyser.proto', {
  keepCase: true,
  longs: String,
  enums: String,
  defaults: true,
  oneofs: true
});
const proto = grpc.loadPackageDefinition(packageDefinition);
// Create client (plaintext/insecure)
const client = new proto.geyser.Geyser(
  'grpc.fortiblox.com:10002',
  grpc.credentials.createInsecure()
);
// Add API key metadata
const metadata = new grpc.Metadata();
metadata.add('x-api-key', process.env.FORTIBLOX_API_KEY);
// Wallets to monitor
const WATCHED_WALLETS = [
  '9B5XszUGdMaxCZ7uSQhPzdks5ZQSmWxrmzCSvtJ6Ns6g',
  'vines1vzrYbzLMRdu58ou5XTby4qAqVRLmqo36NKPTg',
  'AjozzgE83A3x1sHNUR64hfH7zaEBWeMaFuAN9kQgujrc'
];
// Track balance history
const balanceHistory = new Map();
let updateCount = 0;
// Initialize balance history
WATCHED_WALLETS.forEach(wallet => {
  balanceHistory.set(wallet, {
    current: 0,
    previous: 0,
    changes: 0,
    lastUpdate: null
  });
});
// Create subscription request
const request = {
  accounts: WATCHED_WALLETS,
  commitment: 'CONFIRMED',
  include_data: false
};
// Start streaming
const call = client.SubscribeAccounts(request, metadata);
console.log('🚀 Wallet balance monitor started');
console.log(' Endpoint: grpc.fortiblox.com:10002');
console.log(` Watching: ${WATCHED_WALLETS.length} wallets`);
console.log(' Commitment: CONFIRMED\n');
// Process account updates
call.on('data', (update) => {
  updateCount++;
  const wallet = update.pubkey;
  const lamports = BigInt(update.lamports);
  const sol = Number(lamports) / 1_000_000_000;
  // Get balance history
  const history = balanceHistory.get(wallet);
  if (!history) return;
  // Calculate change (the first update only establishes the baseline,
  // so it is not reported as a change or an alert)
  const isFirstUpdate = history.lastUpdate === null;
  const previousSol = history.current;
  const changeSol = isFirstUpdate ? 0 : sol - previousSol;
  const changePercent = previousSol > 0
    ? ((changeSol / previousSol) * 100).toFixed(2)
    : 0;
  // Update history
  history.previous = history.current;
  history.current = sol;
  history.changes++;
  history.lastUpdate = new Date();
  // Determine change type
  let changeIcon = '💰';
  let changeType = 'unchanged';
  if (changeSol > 0) {
    changeIcon = '📈';
    changeType = 'increase';
  } else if (changeSol < 0) {
    changeIcon = '📉';
    changeType = 'decrease';
  }
  // Log update
  console.log(`\n${changeIcon} Balance Update #${updateCount}`);
  console.log(` Wallet: ${wallet.slice(0, 8)}...${wallet.slice(-4)}`);
  console.log(` Balance: ${sol.toFixed(9)} SOL`);
  if (changeSol !== 0) {
    console.log(` Change: ${changeSol > 0 ? '+' : ''}${changeSol.toFixed(9)} SOL (${changePercent}%)`);
  }
  console.log(` Slot: ${update.slot}`);
  console.log(` Owner: ${update.owner.slice(0, 20)}...`);
  console.log(` Updates: ${history.changes}`);
  // Alert on large changes
  if (Math.abs(changeSol) > 1.0) {
    console.log(`\n🚨 LARGE CHANGE DETECTED!`);
    console.log(` Amount: ${changeSol > 0 ? '+' : ''}${changeSol.toFixed(9)} SOL`);
    console.log(` Wallet: ${wallet}`);
    // Send notification (webhook, email, etc.)
    sendAlert(wallet, changeSol, changeType);
  }
  // Show summary every 10 updates
  if (updateCount % 10 === 0) {
    printSummary();
  }
});
function printSummary() {
  console.log(`\n📊 Summary Statistics:`);
  console.log(` Total Updates: ${updateCount}`);
  let totalBalance = 0;
  let totalChanges = 0;
  balanceHistory.forEach((history, wallet) => {
    totalBalance += history.current;
    totalChanges += history.changes;
    if (history.changes > 0) {
      console.log(`\n ${wallet.slice(0, 8)}...:`);
      console.log(` Balance: ${history.current.toFixed(9)} SOL`);
      console.log(` Changes: ${history.changes}`);
      console.log(` Last: ${history.lastUpdate?.toLocaleTimeString()}`);
    }
  });
  console.log(`\n Total Portfolio: ${totalBalance.toFixed(9)} SOL`);
  console.log(` Total Changes: ${totalChanges}\n`);
}
function sendAlert(wallet, change, type) {
  // Implement your alert logic here
  // Examples:
  // - Send webhook to Discord/Slack
  // - Send email notification
  // - Trigger SMS alert
  // - Update dashboard
  console.log(`🔔 Alert sent for ${wallet}`);
}
// Error handling with reconnection
call.on('error', (err) => {
  console.error('❌ Stream error:', err.message);
  if (err.code === grpc.status.UNAVAILABLE) {
    console.log('🔄 Reconnecting in 5 seconds...');
    setTimeout(reconnect, 5000);
  }
});
call.on('end', () => {
  console.log('Stream ended, reconnecting...');
  setTimeout(reconnect, 1000);
});
function reconnect() {
  console.log('🔌 Attempting reconnection...');
  // Placeholder: to resume streaming, wrap the subscription logic above
  // (client.SubscribeAccounts plus the 'data', 'error', and 'end' handlers)
  // in a function and call it from here
}
// Graceful shutdown
process.on('SIGINT', () => {
  console.log('\n👋 Shutting down...');
  printSummary();
  console.log(`\nFinal stats: ${updateCount} updates processed`);
  call.cancel(); // a server-streaming call is stopped with cancel(); it has no end()
  process.exit(0);
});
Run
export FORTIBLOX_API_KEY="your_api_key_here"
node wallet-balance-monitor.js
Multi-Account Dashboard
// dashboard.js
const express = require('express');
const app = express();
// Store latest account states
const accountStates = new Map();
// Set up gRPC streaming (as above)
call.on('data', (update) => {
  accountStates.set(update.pubkey, {
    lamports: update.lamports,
    sol: Number(update.lamports) / 1e9,
    slot: update.slot,
    owner: update.owner,
    lastUpdate: Date.now()
  });
});
// REST API endpoint
app.get('/api/accounts', (req, res) => {
  const accounts = Array.from(accountStates.entries()).map(([pubkey, state]) => ({
    pubkey,
    ...state
  }));
  res.json({
    count: accounts.length,
    accounts
  });
});
app.get('/api/accounts/:pubkey', (req, res) => {
  const state = accountStates.get(req.params.pubkey);
  if (!state) {
    return res.status(404).json({ error: 'Account not found' });
  }
  res.json({
    pubkey: req.params.pubkey,
    ...state
  });
});
app.listen(3000, () => {
  console.log('📊 Dashboard API running on http://localhost:3000');
});
Token Account Tracker
# token_account_tracker.py
import grpc
import os
import time
import base64
from datetime import datetime
from decimal import Decimal
import geyser_pb2
import geyser_pb2_grpc
class TokenAccountTracker:
    def __init__(self, token_accounts, api_key):
        self.token_accounts = token_accounts
        self.api_key = api_key
        self.update_count = 0
        self.account_states = {}
        # Initialize states
        for account in token_accounts:
            self.account_states[account] = {
                'balance': 0,
                'previous_balance': 0,
                'updates': 0,
                'last_update': None
            }
    def connect(self):
        """Create insecure channel (plaintext)"""
        channel = grpc.insecure_channel('grpc.fortiblox.com:10002')
        stub = geyser_pb2_grpc.GeyserStub(channel)
        return stub
    def monitor(self):
        print(f"🚀 Token account tracker started")
        print(f" Endpoint: grpc.fortiblox.com:10002")
        print(f" Watching: {len(self.token_accounts)} accounts")
        print(f" Commitment: CONFIRMED\n")
        stub = self.connect()
        metadata = [('x-api-key', self.api_key)]
        # Create subscription request
        request = geyser_pb2.SubscribeAccountsRequest(
            accounts=self.token_accounts,
            commitment=geyser_pb2.CommitmentLevel.CONFIRMED,
            include_data=True  # Include data to parse token amounts
        )
        try:
            # Reconnect in a loop rather than by recursion, so repeated
            # failures cannot exhaust the call stack
            while True:
                try:
                    for update in stub.SubscribeAccounts(request, metadata=metadata):
                        self.process_account_update(update)
                    print("Stream ended")
                except grpc.RpcError as e:
                    print(f"❌ gRPC error: {e.code()} - {e.details()}")
                print("🔄 Reconnecting in 5 seconds...")
                time.sleep(5)
        except KeyboardInterrupt:
            print("\n👋 Shutting down...")
            self.print_summary()
            print(f"Final stats: {self.update_count} updates processed")
    def process_account_update(self, update):
        self.update_count += 1
        pubkey = update.pubkey
        lamports = update.lamports
        slot = update.slot
        owner = update.owner
        # Get account state
        state = self.account_states.get(pubkey)
        if not state:
            return
        # Update state
        state['previous_balance'] = state['balance']
        state['balance'] = lamports
        state['updates'] += 1
        state['last_update'] = datetime.now()
        # Calculate change (the first update only sets the baseline)
        change = 0 if state['updates'] == 1 else lamports - state['previous_balance']
        sol_balance = lamports / 1_000_000_000
        sol_change = change / 1_000_000_000
        # Determine change type
        if change > 0:
            change_icon = "📈"
            change_type = "INCREASE"
        elif change < 0:
            change_icon = "📉"
            change_type = "DECREASE"
        else:
            change_icon = "💰"
            change_type = "UNCHANGED"
        # Parse token data if available
        token_info = None
        if update.data:
            token_info = self.parse_token_account(update.data)
        print(f"\n{change_icon} Account Update #{self.update_count}")
        print(f" Account: {pubkey[:8]}...{pubkey[-4:]}")
        print(f" Balance: {sol_balance:.9f} SOL")
        if change != 0:
            print(f" Change: {'+' if change > 0 else ''}{sol_change:.9f} SOL")
        print(f" Slot: {slot}")
        print(f" Owner: {owner[:20]}...")
        print(f" Updates: {state['updates']}")
        # Show token info if parsed
        if token_info:
            print(f"\n Token Info:")
            print(f" Mint: {token_info['mint'][:8]}...")
            print(f" Amount: {token_info['amount']}")
            print(f" Decimals: {token_info['decimals']}")
        # Alert on large changes
        if abs(sol_change) > 0.1:
            self.send_alert(pubkey, sol_change, change_type)
        # Show summary every 10 updates
        if self.update_count % 10 == 0:
            self.print_summary()
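    @staticmethod
    def _b58encode(raw):
        """Encode bytes as base58, the encoding used for account addresses"""
        alphabet = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'
        num = int.from_bytes(raw, byteorder='big')
        encoded = ''
        while num > 0:
            num, rem = divmod(num, 58)
            encoded = alphabet[rem] + encoded
        # Each leading zero byte is represented by a leading '1'
        leading_zeros = len(raw) - len(raw.lstrip(b'\x00'))
        return '1' * leading_zeros + encoded
    def parse_token_account(self, data):
        """Parse SPL token account data"""
        try:
            # Token account layout (165 bytes):
            # mint: 32 bytes (offset 0)
            # owner: 32 bytes (offset 32)
            # amount: 8 bytes (offset 64)
            # delegate: 36 bytes optional (offset 72)
            # state: 1 byte (offset 108)
            # is_native: 12 bytes optional (offset 109)
            # delegated_amount: 8 bytes (offset 121)
            # close_authority: 36 bytes optional (offset 129)
            if len(data) < 165:
                return None  # Too short to be an SPL token account
            # Public keys are displayed in base58, not base64
            mint = self._b58encode(data[0:32])
            amount = int.from_bytes(data[64:72], byteorder='little')
            # Determine decimals (usually 6 for USDC, 9 for SOL)
            # In real implementation, fetch from mint account
            decimals = 9
            return {
                'mint': mint,
                'amount': amount / (10 ** decimals),
                'decimals': decimals
            }
        except Exception as e:
            print(f" ⚠️ Failed to parse token data: {e}")
            return None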
    def send_alert(self, account, change, change_type):
        """Send alert for large changes"""
        print(f"\n🚨 LARGE CHANGE DETECTED!")
        print(f" Account: {account}")
        print(f" Change: {'+' if change > 0 else ''}{change:.9f} SOL")
        print(f" Type: {change_type}")
        # Implement your alert logic here
        # - Webhook to Discord/Slack
        # - Email notification
        # - SMS alert
        # - Database logging
    def print_summary(self):
        """Print summary statistics"""
        print(f"\n📊 Summary Statistics:")
        print(f" Total Updates: {self.update_count}")
        total_balance = 0
        active_accounts = 0
        for pubkey, state in self.account_states.items():
            balance_sol = state['balance'] / 1_000_000_000
            total_balance += balance_sol
            if state['updates'] > 0:
                active_accounts += 1
                print(f"\n {pubkey[:8]}...:")
                print(f" Balance: {balance_sol:.9f} SOL")
                print(f" Updates: {state['updates']}")
                if state['last_update']:
                    print(f" Last: {state['last_update'].strftime('%H:%M:%S')}")
        print(f"\n Active Accounts: {active_accounts}/{len(self.token_accounts)}")
        print(f" Total Balance: {total_balance:.9f} SOL\n")
if __name__ == '__main__':
    api_key = os.getenv('FORTIBLOX_API_KEY')
    if not api_key:
        print("Error: FORTIBLOX_API_KEY environment variable not set")
        exit(1)
    # Token accounts to monitor
    token_accounts = [
        'TokenAccount1PublicKey...',
        'TokenAccount2PublicKey...',
        'TokenAccount3PublicKey...'
    ]
    tracker = TokenAccountTracker(token_accounts, api_key)
    tracker.monitor()
Setup & Run
# Install dependencies
pip install grpcio grpcio-tools
# Download and compile proto
curl -O https://raw.githubusercontent.com/rpcpool/yellowstone-grpc/master/yellowstone-grpc-proto/proto/geyser.proto
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. geyser.proto
# Run
export FORTIBLOX_API_KEY="your_api_key_here"
python token_account_tracker.py
Balance Alert System
# balance_alerts.py
import smtplib
from email.mime.text import MIMEText
class BalanceAlertSystem:
    def __init__(self, thresholds):
        self.thresholds = thresholds  # {account: min_balance}
    def check_balance(self, account, balance_sol):
        """Check if balance is below threshold"""
        threshold = self.thresholds.get(account)
        if threshold and balance_sol < threshold:
            self.send_low_balance_alert(account, balance_sol, threshold)
    def send_low_balance_alert(self, account, balance, threshold):
        """Send low balance alert"""
        print(f"\n⚠️ LOW BALANCE ALERT!")
        print(f" Account: {account}")
        print(f" Balance: {balance:.9f} SOL")
        print(f" Threshold: {threshold:.9f} SOL")
        # Send email
        self.send_email_alert(account, balance, threshold)
        # Send webhook
        self.send_webhook_alert(account, balance, threshold)
    def send_email_alert(self, account, balance, threshold):
        """Send email notification"""
        msg = MIMEText(f"""
Low Balance Alert
Account: {account}
Current Balance: {balance:.9f} SOL
Threshold: {threshold:.9f} SOL
Please add funds to maintain operations.
""")
        msg['Subject'] = f'⚠️ Low Balance Alert: {account[:8]}...'
        msg['From'] = 'alerts@example.com'  # placeholder sender address
        msg['To'] = 'admin@example.com'  # placeholder recipient address
        # Send email (configure SMTP)
        # smtp = smtplib.SMTP('smtp.gmail.com', 587)
        # smtp.send_message(msg)
        # smtp.quit()
    def send_webhook_alert(self, account, balance, threshold):
        """Send webhook to Discord/Slack"""
        import requests
        webhook_url = "https://discord.com/api/webhooks/YOUR_WEBHOOK"
        data = {
            "content": f"⚠️ **Low Balance Alert**\n"
                       f"Account: `{account[:8]}...`\n"
                       f"Balance: {balance:.9f} SOL\n"
                       f"Threshold: {threshold:.9f} SOL"
        }
        # requests.post(webhook_url, json=data)
Vote Account Monitor
// src/main.rs
use tonic::{transport::Channel, metadata::MetadataValue, Request};
use tokio_stream::StreamExt;
use std::collections::HashMap;
use chrono::{DateTime, Utc};
pub mod geyser {
    tonic::include_proto!("geyser");
}
use geyser::{
    geyser_client::GeyserClient,
    SubscribeAccountsRequest,
    CommitmentLevel
};
#[derive(Debug, Clone)]
struct AccountState {
    balance: u64,
    previous_balance: u64,
    updates: u64,
    last_update: DateTime<Utc>,
    owner: String,
}
struct VoteAccountMonitor {
    vote_accounts: Vec<String>,
    account_states: HashMap<String, AccountState>,
    update_count: u64,
}
impl VoteAccountMonitor {
    fn new(vote_accounts: Vec<String>) -> Self {
        Self {
            vote_accounts,
            account_states: HashMap::new(),
            update_count: 0,
        }
    }
    fn process_update(&mut self, update: geyser::AccountUpdate) {
        self.update_count += 1;
        let pubkey = update.pubkey.clone();
        let lamports = update.lamports;
        let slot = update.slot;
        let owner = update.owner;
        // Get or create account state
        let state = self.account_states.entry(pubkey.clone())
            .or_insert(AccountState {
                balance: 0,
                previous_balance: 0,
                updates: 0,
                last_update: Utc::now(),
                owner: owner.clone(),
            });
        // Update state
        state.previous_balance = state.balance;
        state.balance = lamports;
        state.updates += 1;
        state.last_update = Utc::now();
        // Calculate change (the first update only sets the baseline)
        let change = if state.updates == 1 {
            0
        } else {
            lamports as i128 - state.previous_balance as i128
        };
        let sol_balance = lamports as f64 / 1_000_000_000.0;
        let sol_change = change as f64 / 1_000_000_000.0;
        // Determine change type
        let (icon, change_type) = if change > 0 {
            ("📈", "INCREASE")
        } else if change < 0 {
            ("📉", "DECREASE")
        } else {
            ("💰", "UNCHANGED")
        };
        // Log update
        println!("\n{} Vote Account Update #{}", icon, self.update_count);
        println!(" Account: {}...{}", &pubkey[..8], &pubkey[pubkey.len()-4..]);
        println!(" Balance: {:.9} SOL", sol_balance);
        if change != 0 {
            println!(" Change: {:+.9} SOL", sol_change);
        }
        println!(" Slot: {}", slot);
        println!(" Owner: {}...", &owner[..20]);
        println!(" Updates: {}", state.updates);
        // Alert on significant changes
        if sol_change.abs() > 5.0 {
            self.send_alert(&pubkey, sol_change, change_type);
        }
        // Show summary every 10 updates
        if self.update_count % 10 == 0 {
            self.print_summary();
        }
    }
    fn send_alert(&self, account: &str, change: f64, change_type: &str) {
        println!("\n🚨 SIGNIFICANT CHANGE DETECTED!");
        println!(" Account: {}", account);
        println!(" Change: {:+.9} SOL", change);
        println!(" Type: {}", change_type);
        // Implement your alert logic here
        // - Send to monitoring system
        // - Log to database
        // - Trigger webhook
    }
    fn print_summary(&self) {
        println!("\n📊 Summary Statistics:");
        println!(" Total Updates: {}", self.update_count);
        let mut total_balance = 0u64;
        let mut active_accounts = 0;
        for (pubkey, state) in &self.account_states {
            total_balance += state.balance;
            if state.updates > 0 {
                active_accounts += 1;
                let sol_balance = state.balance as f64 / 1_000_000_000.0;
                println!("\n {}...:", &pubkey[..8]);
                println!(" Balance: {:.9} SOL", sol_balance);
                println!(" Updates: {}", state.updates);
                println!(" Last: {}", state.last_update.format("%H:%M:%S"));
            }
        }
        let total_sol = total_balance as f64 / 1_000_000_000.0;
        println!("\n Active Accounts: {}/{}", active_accounts, self.vote_accounts.len());
        println!(" Total Staked: {:.9} SOL\n", total_sol);
    }
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    println!("🚀 Vote account monitor started");
    println!(" Endpoint: grpc.fortiblox.com:10002");
    println!(" Commitment: CONFIRMED\n");
    // Vote accounts to monitor
    let vote_accounts = vec![
        "VoteAccount1PublicKey...".to_string(),
        "VoteAccount2PublicKey...".to_string(),
        "VoteAccount3PublicKey...".to_string(),
    ];
    let mut monitor = VoteAccountMonitor::new(vote_accounts.clone());
    loop {
        if let Err(e) = monitor_accounts(&mut monitor, &vote_accounts).await {
            eprintln!("❌ Error: {}", e);
        }
        // Wait before every reconnect, including after a clean stream end,
        // so the loop never spins without a delay
        eprintln!("🔄 Reconnecting in 5 seconds...");
        tokio::time::sleep(std::time::Duration::from_secs(5)).await;
    }
}
async fn monitor_accounts(
    monitor: &mut VoteAccountMonitor,
    vote_accounts: &[String]
) -> Result<(), Box<dyn std::error::Error>> {
    // Create channel (insecure/plaintext)
    let channel = Channel::from_static("http://grpc.fortiblox.com:10002")
        .connect()
        .await?;
    let mut client = GeyserClient::new(channel);
    // Create request with API key
    let api_key = std::env::var("FORTIBLOX_API_KEY")?;
    let token = MetadataValue::try_from(api_key)?;
    let mut request = Request::new(SubscribeAccountsRequest {
        accounts: vote_accounts.to_vec(),
        commitment: Some(CommitmentLevel::Confirmed as i32),
        include_data: Some(false),
    });
    request.metadata_mut().insert("x-api-key", token);
    // Subscribe
    let mut stream = client.subscribe_accounts(request).await?.into_inner();
    // Process updates
    while let Some(update) = stream.next().await {
        match update {
            Ok(account_update) => {
                monitor.process_update(account_update);
            }
            Err(e) => {
                eprintln!("Stream error: {}", e);
                return Err(Box::new(e));
            }
        }
    }
    Ok(())
}
Cargo.toml
[package]
name = "vote-account-monitor"
version = "0.1.0"
edition = "2021"
[dependencies]
tonic = "0.11"
prost = "0.12"
tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1"
chrono = "0.4"
[build-dependencies]
tonic-build = "0.11"
Run
export FORTIBLOX_API_KEY="your_api_key_here"
cargo run --release
Multi-Wallet Portfolio Tracker
// main.go
package main
import (
	"context"
	"fmt"
	"io"
	"log"
	"os"
	"time"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/metadata"
	pb "your-module/geyser"
)
type AccountState struct {
	Balance         uint64
	PreviousBalance uint64
	Updates         int
	LastUpdate      time.Time
	Owner           string
}
type PortfolioTracker struct {
	Accounts    []string
	States      map[string]*AccountState
	UpdateCount int
	TotalValue  float64
}
func NewPortfolioTracker(accounts []string) *PortfolioTracker {
	states := make(map[string]*AccountState)
	for _, account := range accounts {
		states[account] = &AccountState{
			Balance:         0,
			PreviousBalance: 0,
			Updates:         0,
			LastUpdate:      time.Now(),
		}
	}
	return &PortfolioTracker{
		Accounts:    accounts,
		States:      states,
		UpdateCount: 0,
		TotalValue:  0,
	}
}
func main() {
	fmt.Println("🚀 Portfolio tracker started")
	fmt.Println(" Endpoint: grpc.fortiblox.com:10002")
	fmt.Println(" Commitment: CONFIRMED\n")
	apiKey := os.Getenv("FORTIBLOX_API_KEY")
	if apiKey == "" {
		log.Fatal("FORTIBLOX_API_KEY environment variable not set")
	}
	// Accounts to track
	accounts := []string{
		"Wallet1PublicKey...",
		"Wallet2PublicKey...",
		"Wallet3PublicKey...",
	}
	tracker := NewPortfolioTracker(accounts)
	fmt.Printf(" Watching: %d wallets\n\n", len(accounts))
	for {
		if err := tracker.Monitor(apiKey); err != nil {
			log.Printf("❌ Error: %v", err)
			log.Println("🔄 Reconnecting in 5 seconds...")
			time.Sleep(5 * time.Second)
		}
	}
}
func (pt *PortfolioTracker) Monitor(apiKey string) error {
	// Create insecure connection (plaintext)
	conn, err := grpc.Dial(
		"grpc.fortiblox.com:10002",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if err != nil {
		return fmt.Errorf("failed to connect: %w", err)
	}
	defer conn.Close()
	// Create client
	client := pb.NewGeyserClient(conn)
	// Add API key to context
	md := metadata.New(map[string]string{
		"x-api-key": apiKey,
	})
	ctx := metadata.NewOutgoingContext(context.Background(), md)
	// Create subscription request
	req := &pb.SubscribeAccountsRequest{
		Accounts:    pt.Accounts,
		Commitment:  pb.CommitmentLevel_CONFIRMED,
		IncludeData: false,
	}
	// Subscribe
	stream, err := client.SubscribeAccounts(ctx, req)
	if err != nil {
		return fmt.Errorf("failed to subscribe: %w", err)
	}
	// Receive updates
	for {
		update, err := stream.Recv()
		if err == io.EOF {
			return fmt.Errorf("stream ended")
		}
		if err != nil {
			return fmt.Errorf("stream error: %w", err)
		}
		pt.ProcessUpdate(update)
	}
}
func (pt *PortfolioTracker) ProcessUpdate(update *pb.AccountUpdate) {
	pt.UpdateCount++
	pubkey := update.Pubkey
	lamports := update.Lamports
	slot := update.Slot
	owner := update.Owner
	// Get account state
	state, exists := pt.States[pubkey]
	if !exists {
		return
	}
	// Update state
	state.PreviousBalance = state.Balance
	state.Balance = lamports
	state.Updates++
	state.LastUpdate = time.Now()
	state.Owner = owner
	// Calculate change (the first update only sets the baseline)
	change := int64(lamports) - int64(state.PreviousBalance)
	if state.Updates == 1 {
		change = 0
	}
	solBalance := float64(lamports) / 1_000_000_000
	solChange := float64(change) / 1_000_000_000
	// Determine change type
	var icon string
	var changeType string
	if change > 0 {
		icon = "📈"
		changeType = "INCREASE"
	} else if change < 0 {
		icon = "📉"
		changeType = "DECREASE"
	} else {
		icon = "💰"
		changeType = "UNCHANGED"
	}
	// Log update
	fmt.Printf("\n%s Account Update #%d\n", icon, pt.UpdateCount)
	fmt.Printf(" Account: %s...%s\n", pubkey[:8], pubkey[len(pubkey)-4:])
	fmt.Printf(" Balance: %.9f SOL\n", solBalance)
	if change != 0 {
		sign := ""
		if change > 0 {
			sign = "+"
		}
		fmt.Printf(" Change: %s%.9f SOL\n", sign, solChange)
	}
	fmt.Printf(" Slot: %d\n", slot)
	fmt.Printf(" Owner: %s...\n", owner[:20])
	fmt.Printf(" Updates: %d\n", state.Updates)
	// Update total portfolio value
	pt.CalculateTotalValue()
	// Alert on large changes
	if solChange > 10.0 || solChange < -10.0 {
		pt.SendAlert(pubkey, solChange, changeType)
	}
	// Show summary every 10 updates
	if pt.UpdateCount%10 == 0 {
		pt.PrintSummary()
	}
}
func (pt *PortfolioTracker) CalculateTotalValue() {
	total := 0.0
	for _, state := range pt.States {
		total += float64(state.Balance) / 1_000_000_000
	}
	pt.TotalValue = total
}
func (pt *PortfolioTracker) SendAlert(account string, change float64, changeType string) {
	fmt.Println("\n🚨 LARGE CHANGE DETECTED!")
	fmt.Printf(" Account: %s\n", account)
	fmt.Printf(" Change: %+.9f SOL\n", change)
	fmt.Printf(" Type: %s\n", changeType)
	// Implement your alert logic here
	// - Send webhook
	// - Log to database
	// - Send email/SMS
}
func (pt *PortfolioTracker) PrintSummary() {
	fmt.Println("\n📊 Portfolio Summary:")
	fmt.Printf(" Total Updates: %d\n", pt.UpdateCount)
	fmt.Printf(" Total Value: %.9f SOL\n", pt.TotalValue)
	activeAccounts := 0
	for pubkey, state := range pt.States {
		if state.Updates > 0 {
			activeAccounts++
			solBalance := float64(state.Balance) / 1_000_000_000
			fmt.Printf("\n %s...:\n", pubkey[:8])
			fmt.Printf(" Balance: %.9f SOL\n", solBalance)
			fmt.Printf(" Updates: %d\n", state.Updates)
			fmt.Printf(" Last: %s\n", state.LastUpdate.Format("15:04:05"))
		}
	}
	fmt.Printf("\n Active Accounts: %d/%d\n", activeAccounts, len(pt.Accounts))
	fmt.Println()
}
Run
export FORTIBLOX_API_KEY="your_api_key_here"
go run main.go
Program Account Monitor
// ProgramAccountMonitor.java
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.stub.MetadataUtils;
import io.grpc.stub.StreamObserver;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.*;
public class ProgramAccountMonitor {
    private final GeyserGrpc.GeyserStub asyncStub;
    private final List<String> programAccounts;
    private final Map<String, AccountState> accountStates;
    private int updateCount = 0;
    static class AccountState {
        long balance;
        long previousBalance;
        int updates;
        LocalDateTime lastUpdate;
        String owner;
        AccountState() {
            this.balance = 0;
            this.previousBalance = 0;
            this.updates = 0;
            this.lastUpdate = LocalDateTime.now();
            this.owner = "";
        }
    }
    public ProgramAccountMonitor(String host, int port, String apiKey, List<String> accounts) {
        // Create insecure channel (plaintext)
        ManagedChannel channel = ManagedChannelBuilder
            .forAddress(host, port)
            .usePlaintext()
            .build();
        // Add API key metadata
        Metadata metadata = new Metadata();
        Metadata.Key<String> key = Metadata.Key.of("x-api-key", Metadata.ASCII_STRING_MARSHALLER);
        metadata.put(key, apiKey);
        this.asyncStub = GeyserGrpc.newStub(channel)
            .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata));
        this.programAccounts = accounts;
        this.accountStates = new HashMap<>();
        // Initialize account states
        for (String account : accounts) {
            accountStates.put(account, new AccountState());
        }
    }
    public void startMonitoring() {
        System.out.println("🚀 Program account monitor started");
        System.out.println(" Endpoint: grpc.fortiblox.com:10002");
        System.out.println(" Watching: " + programAccounts.size() + " accounts");
        System.out.println(" Commitment: CONFIRMED\n");
        subscribe();
        // Keep the main thread alive while updates arrive on gRPC threads
        try {
            Thread.currentThread().join();
        } catch (InterruptedException e) {
            System.out.println("\n👋 Shutting down...");
            printSummary();
        }
    }
    // Opens the stream. Kept separate from startMonitoring() so that a reconnect
    // from onError does not block the gRPC callback thread on join()
    private void subscribe() {
        // Create subscription request
        SubscribeAccountsRequest request = SubscribeAccountsRequest.newBuilder()
            .addAllAccounts(programAccounts)
            .setCommitment(CommitmentLevel.CONFIRMED)
            .setIncludeData(false)
            .build();
        // Subscribe
        asyncStub.subscribeAccounts(request, new StreamObserver<AccountUpdate>() {
            @Override
            public void onNext(AccountUpdate update) {
                processAccountUpdate(update);
            }
            @Override
            public void onError(Throwable t) {
                System.err.println("❌ Stream error: " + t.getMessage());
                System.out.println("🔄 Reconnecting in 5 seconds...");
                try {
                    Thread.sleep(5000);
                    subscribe(); // Reconnect
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            @Override
            public void onCompleted() {
                System.out.println("Stream completed");
            }
        });
    }
    private void processAccountUpdate(AccountUpdate update) {
        updateCount++;
        String pubkey = update.getPubkey();
        long lamports = update.getLamports();
        long slot = update.getSlot();
        String owner = update.getOwner();
        // Get account state
        AccountState state = accountStates.get(pubkey);
        if (state == null) {
            return;
        }
        // Update state
        state.previousBalance = state.balance;
        state.balance = lamports;
        state.updates++;
        state.lastUpdate = LocalDateTime.now();
        state.owner = owner;
        // Calculate change (the first update only sets the baseline)
        long change = state.updates == 1 ? 0 : lamports - state.previousBalance;
        double solBalance = lamports / 1_000_000_000.0;
        double solChange = change / 1_000_000_000.0;
        // Determine change type
        String icon;
        String changeType;
        if (change > 0) {
            icon = "📈";
            changeType = "INCREASE";
        } else if (change < 0) {
            icon = "📉";
            changeType = "DECREASE";
        } else {
            icon = "💰";
            changeType = "UNCHANGED";
        }
        // Log update
        System.out.printf("\n%s Account Update #%d\n", icon, updateCount);
        System.out.printf(" Account: %s...%s\n",
            pubkey.substring(0, 8),
            pubkey.substring(pubkey.length() - 4));
        System.out.printf(" Balance: %.9f SOL\n", solBalance);
        if (change != 0) {
            System.out.printf(" Change: %+.9f SOL\n", solChange);
        }
        System.out.printf(" Slot: %d\n", slot);
        System.out.printf(" Owner: %s...\n", owner.substring(0, Math.min(20, owner.length())));
        System.out.printf(" Updates: %d\n", state.updates);
        // Alert on large changes
        if (Math.abs(solChange) > 1.0) {
            sendAlert(pubkey, solChange, changeType);
        }
        // Show summary every 10 updates
        if (updateCount % 10 == 0) {
            printSummary();
        }
    }
    private void sendAlert(String account, double change, String changeType) {
        System.out.println("\n🚨 LARGE CHANGE DETECTED!");
        System.out.printf(" Account: %s\n", account);
        System.out.printf(" Change: %+.9f SOL\n", change);
        System.out.printf(" Type: %s\n", changeType);
        // Implement your alert logic here
        // - Send webhook
        // - Log to database
        // - Send email/SMS
    }
    private void printSummary() {
        System.out.println("\n📊 Summary Statistics:");
        System.out.printf(" Total Updates: %d\n", updateCount);
        long totalBalance = 0;
        int activeAccounts = 0;
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss");
        for (Map.Entry<String, AccountState> entry : accountStates.entrySet()) {
            String pubkey = entry.getKey();
            AccountState state = entry.getValue();
            totalBalance += state.balance;
            if (state.updates > 0) {
                activeAccounts++;
                double solBalance = state.balance / 1_000_000_000.0;
                System.out.printf("\n %s...:\n", pubkey.substring(0, 8));
                System.out.printf(" Balance: %.9f SOL\n", solBalance);
                System.out.printf(" Updates: %d\n", state.updates);
                System.out.printf(" Last: %s\n", state.lastUpdate.format(formatter));
            }
        }
        double totalSol = totalBalance / 1_000_000_000.0;
        System.out.printf("\n Active Accounts: %d/%d\n", activeAccounts, programAccounts.size());
        System.out.printf(" Total Balance: %.9f SOL\n\n", totalSol);
    }
public static void main(String[] args) {
String apiKey = System.getenv("FORTIBLOX_API_KEY");
if (apiKey == null || apiKey.isEmpty()) {
System.err.println("Error: FORTIBLOX_API_KEY environment variable not set");
System.exit(1);
}
// Program accounts to monitor
List<String> accounts = Arrays.asList(
"ProgramAccount1PublicKey...",
"ProgramAccount2PublicKey...",
"ProgramAccount3PublicKey..."
);
ProgramAccountMonitor monitor = new ProgramAccountMonitor(
"grpc.fortiblox.com",
10002,
apiKey,
accounts
);
monitor.startMonitoring();
}
}
Run
export FORTIBLOX_API_KEY="your_api_key_here"
mvn compile exec:java -Dexec.mainClass="ProgramAccountMonitor"
Real-World Use Cases
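The examples in this section convert lamports to SOL by dividing by 1e9 and then compare floating-point amounts. That is fine for display, but threshold checks can misfire: in double precision, 0.3 - 0.2 evaluates to 0.09999999999999998, so a 0.1 SOL payment can fall just under a 0.1 SOL minimum. The Python sketch below shows one way to keep comparisons in integer lamports. The helper names (`sol_to_lamports`, `format_sol`, `detect_payment`) are illustrative and not part of any FortiBlox SDK.

```python
from decimal import Decimal

LAMPORTS_PER_SOL = 1_000_000_000

def sol_to_lamports(sol):
    """Convert a SOL amount given as a string or Decimal into integer lamports."""
    return int(Decimal(str(sol)) * LAMPORTS_PER_SOL)

def format_sol(lamports):
    """Render integer lamports as a 9-decimal SOL string without float rounding."""
    sign = "-" if lamports < 0 else ""
    whole, frac = divmod(abs(lamports), LAMPORTS_PER_SOL)
    return f"{sign}{whole}.{frac:09d}"

def detect_payment(previous_lamports, current_lamports, minimum_lamports):
    """Return the received amount in lamports, or None if there is no qualifying payment."""
    if previous_lamports is None:
        # First observation: no baseline yet, so nothing can be called a payment
        return None
    delta = current_lamports - previous_lamports
    return delta if delta >= minimum_lamports else None

minimum = sol_to_lamports("0.1")
received = detect_payment(2_500_000_000, 2_600_000_000, minimum)
print(minimum, received, format_sol(received))  # → 100000000 100000000 0.100000000
```

Converting to a SOL string only at the display boundary keeps every comparison exact.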
1. Payment Notification System
Monitor a payment wallet and send notifications when payments are received:
const PAYMENT_WALLET = 'YourPaymentWalletAddress...';
const MINIMUM_PAYMENT = 0.1; // 0.1 SOL minimum
stream.write({
accounts: [PAYMENT_WALLET],
commitment: 'CONFIRMED'
});
// Unknown until the first update arrives, so the starting balance is not mistaken for a payment
let previousBalance = null;
stream.on('data', async (update) => {
  if (update.pubkey === PAYMENT_WALLET) {
    const currentBalance = Number(update.lamports) / 1e9;
    if (previousBalance !== null) {
      const change = currentBalance - previousBalance;
      if (change >= MINIMUM_PAYMENT) {
        console.log(`✅ Payment received: ${change.toFixed(9)} SOL`);
        // Find pending order
        const order = await findPendingOrder(change);
        if (order) {
          await markOrderAsPaid(order.id, {
            amount: change,
            slot: update.slot,
            timestamp: Date.now()
          });
          await sendCustomerNotification(order.customer_email, change);
        }
      }
    }
    previousBalance = currentBalance;
  }
});
2. Low Balance Alert System
Monitor operational wallets and alert when balances drop below thresholds:
OPERATIONAL_WALLETS = {
    'TradingBot...': 10.0,  # Minimum 10 SOL
    'FeeAccount...': 5.0,   # Minimum 5 SOL
    'HotWallet...': 50.0    # Minimum 50 SOL
}
def check_low_balance(update):
    pubkey = update.pubkey
    threshold = OPERATIONAL_WALLETS.get(pubkey)
    if threshold is None:
        # Not one of the monitored wallets
        return
    balance_sol = update.lamports / 1_000_000_000
    if balance_sol < threshold:
        send_urgent_alert(
            title="⚠️ Low Balance Warning",
            message=f"Wallet {pubkey[:8]}... has {balance_sol:.9f} SOL "
                    f"(threshold: {threshold:.9f} SOL)",
            severity="HIGH"
        )
        # Auto-refill from cold storage
        if balance_sol < threshold * 0.5:
            trigger_auto_refill(pubkey, threshold * 2)
3. Token Account Portfolio Tracker
Monitor all token accounts for a portfolio with real-time value updates:
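async fn track_token_portfolio(token_accounts: Vec<String>) {
    // Latest value per token account; the portfolio total is recomputed from this map
    let mut position_values: HashMap<String, f64> = HashMap::new();
    let mut token_prices = HashMap::new();
    // Fetch current token prices
    update_token_prices(&mut token_prices).await;
    for update in stream {
        if let Some(token_data) = parse_token_account(&update.data) {
            // Calculate token value
            let price = token_prices.get(&token_data.mint).unwrap_or(&0.0);
            let value = token_data.amount * price;
            println!("Token: {} - Amount: {} - Value: ${:.2}",
                token_data.mint_symbol,
                token_data.amount,
                value
            );
            // Replace this account's previous value rather than adding to a running total,
            // so repeated updates for the same account are not double-counted
            position_values.insert(update.pubkey.clone(), value);
            let portfolio_value: f64 = position_values.values().sum();
            println!("Portfolio value: ${:.2}", portfolio_value);
            // Update dashboard
            update_dashboard(&token_data.mint, token_data.amount, value).await;
        }
    }
}
4. Vote Account Health Monitor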
Monitor validator vote accounts for health and performance tracking:
type VoteAccountHealth struct {
LastVoteSlot uint64
TotalVotes int
MissedVotes int
BalanceChanges []float64
HealthScore float64
}
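func monitorVoteAccountHealth(update *pb.AccountUpdate) {
    // Assumes voteAccountHealth is a map[string]*VoteAccountHealth; create the entry on first sight
    health, ok := voteAccountHealth[update.Pubkey]
    if !ok {
        health = &VoteAccountHealth{}
        voteAccountHealth[update.Pubkey] = health
    }
    // Track balance changes, keeping only the 10 most recent samples
    solBalance := float64(update.Lamports) / 1_000_000_000
    health.BalanceChanges = append(health.BalanceChanges, solBalance)
    if len(health.BalanceChanges) > 10 {
        health.BalanceChanges = health.BalanceChanges[len(health.BalanceChanges)-10:]
    }
    // Check for concerning patterns once a full window is available
    if len(health.BalanceChanges) == 10 {
        // Calculate trend
        trend := calculateTrend(health.BalanceChanges)
        if trend < -5.0 {
            // Validator is bleeding SOL
            sendValidatorAlert(
                update.Pubkey,
                "Negative balance trend detected",
                fmt.Sprintf("Trend: %.2f%% over last 10 updates", trend),
            )
        }
    }
    // Calculate health score
    health.HealthScore = calculateHealthScore(health)
    // Update monitoring dashboard
    updateValidatorDashboard(update.Pubkey, health)
}
5. Escrow Account Monitor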
Monitor escrow accounts for DeFi protocols or marketplaces:
public class EscrowMonitor {
private Map<String, EscrowState> escrows = new HashMap<>();
private void monitorEscrow(AccountUpdate update) {
String pubkey = update.getPubkey();
EscrowState escrow = escrows.get(pubkey);
if (escrow == null) {
return;
}
long lamports = update.getLamports();
double solBalance = lamports / 1_000_000_000.0;
// Check if escrow was funded
if (!escrow.isFunded() && solBalance >= escrow.getRequiredAmount()) {
System.out.println("✅ Escrow funded: " + pubkey);
escrow.setFunded(true);
// Notify parties
notifyBuyer(escrow.getBuyerId(), "Escrow funded, waiting for seller");
notifySeller(escrow.getSellerId(), "Payment received, ship item");
}
// Check if escrow was released (fires once; assumes an isReleased() getter alongside setReleased())
if (escrow.isFunded() && !escrow.isReleased() && solBalance < escrow.getRequiredAmount() * 0.1) {
System.out.println("✅ Escrow released: " + pubkey);
escrow.setReleased(true);
// Complete transaction
completeTransaction(escrow.getTransactionId());
// Notify parties
notifyBuyer(escrow.getBuyerId(), "Transaction complete");
notifySeller(escrow.getSellerId(), "Payment released");
}
}
}
6. NFT Royalty Tracker
Monitor NFT creator accounts for royalty payments:
const NFT_CREATOR_ACCOUNTS = [
'CreatorAccount1...',
'CreatorAccount2...',
'CreatorAccount3...'
];
const royaltyHistory = new Map();
stream.on('data', (update) => {
const account = update.pubkey;
if (!NFT_CREATOR_ACCOUNTS.includes(account)) {
return;
}
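const currentBalance = Number(update.lamports) / 1e9;
let history = royaltyHistory.get(account);
if (!history) {
  // First observation: record the baseline so the existing balance is not counted as a royalty
  royaltyHistory.set(account, { balance: currentBalance, earned: 0 });
  return;
}
const royaltyPayment = currentBalance - history.balance;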
if (royaltyPayment > 0) {
history.earned += royaltyPayment;
console.log(`💰 Royalty Payment Received`);
console.log(` Creator: ${account.slice(0, 8)}...`);
console.log(` Amount: ${royaltyPayment.toFixed(9)} SOL`);
console.log(` Total Earned: ${history.earned.toFixed(9)} SOL`);
// Update analytics
updateCreatorAnalytics(account, royaltyPayment);
// Send notification
sendCreatorNotification(account, royaltyPayment);
}
history.balance = currentBalance;
royaltyHistory.set(account, history);
});
Performance Optimization
1. Minimize Data Transfer
Only request account data when you need it:
// ❌ BAD - Always includes data (wastes bandwidth)
stream.write({
accounts: wallets,
commitment: 'CONFIRMED',
include_data: true
});
// ✅ GOOD - Only request data when needed
stream.write({
accounts: wallets,
commitment: 'CONFIRMED',
include_data: false // Just get balance updates
});
2. Batch Account Monitoring
Monitor multiple accounts in a single stream:
// ❌ BAD - Multiple streams for each account
wallets.forEach(wallet => {
const stream = client.SubscribeAccounts({
accounts: [wallet],
commitment: 'CONFIRMED'
}, metadata);
});
// ✅ GOOD - Single stream for all accounts
const stream = client.SubscribeAccounts({
accounts: wallets, // All wallets in one request
commitment: 'CONFIRMED'
}, metadata);
3. Efficient State Management
Use efficient data structures for tracking account states:
# ✅ GOOD - Dictionary for O(1) lookups
account_states = {}
def process_update(update):
    state = account_states.get(update.pubkey)
    if not state:
        account_states[update.pubkey] = {
            'balance': update.lamports,
            'updates': 1
        }
    else:
        state['balance'] = update.lamports
        state['updates'] += 1
4. Implement Smart Caching
Cache account states with a freshness window so that recently updated accounts are not reprocessed:
use std::time::{Duration, Instant};
struct CachedAccountState {
balance: u64,
last_update: Instant,
cache_duration: Duration,
}
impl CachedAccountState {
fn is_fresh(&self) -> bool {
self.last_update.elapsed() < self.cache_duration
}
fn update(&mut self, balance: u64) {
self.balance = balance;
self.last_update = Instant::now();
}
}
5. Use Connection Pooling
Reuse a pool of persistent connections, rotating through them round-robin (pair this with the reconnection logic under Error Handling):
type ConnectionPool struct {
connections []*grpc.ClientConn
current int
mutex sync.Mutex
}
func (cp *ConnectionPool) GetConnection() *grpc.ClientConn {
cp.mutex.Lock()
defer cp.mutex.Unlock()
conn := cp.connections[cp.current]
cp.current = (cp.current + 1) % len(cp.connections)
return conn
}
Error Handling
Reconnection with Exponential Backoff
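Exponential backoff doubles the wait after each consecutive failure, up to a cap, so a struggling endpoint is not flooded with reconnect attempts. The Python sketch below computes that delay schedule on its own, which makes it easy to check before wiring it into a stream manager. `backoff_delay` and its parameters are illustrative names, and the optional jitter is an addition of this sketch, not something the service requires. The reconnection manager that follows applies the same idea in JavaScript.

```python
import random

def backoff_delay(attempt, base=1.0, cap=60.0, jitter=0.0):
    """Seconds to wait before reconnect attempt number `attempt` (1-based).

    The delay doubles with each attempt and is capped at `cap`. A non-zero
    `jitter` adds up to that fraction of the delay as random extra wait, which
    helps keep many clients from reconnecting at the same instant.
    """
    if attempt < 1:
        raise ValueError("attempt must be >= 1")
    delay = min(base * 2 ** (attempt - 1), cap)
    if jitter:
        delay += random.uniform(0, jitter * delay)
    return delay

# Schedule for the first eight attempts with no jitter
schedule = [backoff_delay(n) for n in range(1, 9)]
print(schedule)  # → [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```

Resetting the attempt counter only after an update has actually arrived, rather than as soon as a connection is opened, keeps the delays growing while an endpoint is accepting connections but failing straight away.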
class AccountStreamManager {
constructor(accounts, apiKey) {
this.accounts = accounts;
this.apiKey = apiKey;
this.retryCount = 0;
this.maxRetries = 10;
this.baseDelay = 1000;
}
async connect() {
  try {
    await this.createStream();
  } catch (err) {
    await this.handleError(err);
  }
}
async handleError(err) {
  // 'error' and 'end' can both fire for one failure; schedule only one reconnect
  if (this.reconnecting) return;
  this.reconnecting = true;
  this.retryCount++;
  if (this.retryCount > this.maxRetries) {
    console.error('❌ Max retries reached, giving up');
    return;
  }
  // Exponential backoff: 1s, 2s, 4s, 8s, 16s, 32s, then capped at 60s
  const delay = Math.min(
    this.baseDelay * Math.pow(2, this.retryCount - 1),
    60000
  );
  console.log(`🔄 Retry ${this.retryCount}/${this.maxRetries} in ${delay/1000}s`);
  console.log(` Error: ${err.message}`);
  await new Promise(resolve => setTimeout(resolve, delay));
  this.reconnecting = false;
  await this.connect();
}
async createStream() {
  const call = client.SubscribeAccounts({
    accounts: this.accounts,
    commitment: 'CONFIRMED'
  }, this.getMetadata());
  call.on('data', (update) => {
    // A delivered update proves the stream is healthy, so reset the backoff here
    // (resetting in connect() would keep the delay from ever growing)
    this.retryCount = 0;
    this.handleUpdate(update);
  });
  call.on('error', (err) => this.handleError(err));
  call.on('end', () => this.handleError(new Error('Stream ended')));
}
getMetadata() {
const metadata = new grpc.Metadata();
metadata.add('x-api-key', this.apiKey);
return metadata;
}
handleUpdate(update) {
// Process account update
}
}
Graceful Shutdown
import signal
import sys
class AccountStreamMonitor:
    def __init__(self):
        self.running = True
        self.setup_signal_handlers()
    def setup_signal_handlers(self):
        signal.signal(signal.SIGINT, self.shutdown)
        signal.signal(signal.SIGTERM, self.shutdown)
    def shutdown(self, signum, frame):
        print("\n👋 Shutting down gracefully...")
        self.running = False
        # Save state
        self.save_account_states()
        # Close connections
        self.close_connections()
        print("✅ Shutdown complete")
        sys.exit(0)
    def monitor(self):
        while self.running:
            try:
                for update in self.stream:
                    if not self.running:
                        break
                    self.process_update(update)
            except Exception as e:
                if self.running:
                    self.handle_error(e)
Health Checks
public class HealthCheckManager {
// volatile: written by the stream thread, read by the health-check timer thread
private volatile long lastUpdateTime = System.currentTimeMillis();
private static final long HEALTH_CHECK_INTERVAL = 30000; // 30 seconds
private static final long UPDATE_TIMEOUT = 60000; // 60 seconds
public void recordUpdate() {
this.lastUpdateTime = System.currentTimeMillis();
}
public boolean isHealthy() {
long timeSinceUpdate = System.currentTimeMillis() - lastUpdateTime;
return timeSinceUpdate < UPDATE_TIMEOUT;
}
public void startHealthCheckMonitor() {
Timer timer = new Timer();
timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
if (!isHealthy()) {
System.err.println("⚠️ No updates received in " +
(System.currentTimeMillis() - lastUpdateTime) / 1000 + " seconds");
System.err.println("🔄 Triggering reconnection...");
// Trigger reconnection
reconnect();
}
}
}, HEALTH_CHECK_INTERVAL, HEALTH_CHECK_INTERVAL);
}
}
Best Practices
1. Always Specify Accounts
Account streaming requires explicit account addresses:
// ❌ BAD - Empty accounts array
stream.write({
accounts: [],
commitment: 'CONFIRMED'
});
// Error: At least one account required
// ✅ GOOD - Specific accounts
stream.write({
accounts: ['Wallet1...', 'Wallet2...'],
commitment: 'CONFIRMED'
});
2. Choose Appropriate Commitment
Select commitment level based on your use case:
# Real-time dashboard (speed matters)
request = SubscribeAccountsRequest(
accounts=wallets,
commitment=CommitmentLevel.CONFIRMED # ~1 second
)
# Financial settlement (safety matters)
request = SubscribeAccountsRequest(
accounts=payment_accounts,
commitment=CommitmentLevel.FINALIZED # ~30 seconds
)
3. Implement Proper Error Handling
Always handle stream errors and implement reconnection:
loop {
    match monitor_accounts().await {
        Ok(_) => {
            eprintln!("Stream ended normally");
        }
        Err(e) => {
            eprintln!("❌ Error: {}", e);
        }
    }
    // Pause in both cases so a stream that closes immediately cannot cause a tight reconnect loop
    eprintln!("🔄 Reconnecting in 5 seconds...");
    tokio::time::sleep(Duration::from_secs(5)).await;
}
4. Monitor Stream Health
Track updates to detect connection issues:
type StreamHealthMonitor struct {
    startTime    time.Time // When monitoring began
    lastUpdate   time.Time
    updateCount  int
    expectedRate float64 // Updates per minute
}
func (shm *StreamHealthMonitor) CheckHealth() bool {
    if time.Since(shm.lastUpdate).Minutes() > 5.0 {
        // No updates for 5 minutes
        log.Println("⚠️ Stream appears stalled")
        return false
    }
    // Average the rate over the whole monitoring period, not the gap since the last update
    monitored := time.Since(shm.startTime).Minutes()
    if monitored < 1.0 {
        // Too early to judge the rate
        return true
    }
    actualRate := float64(shm.updateCount) / monitored
    if actualRate < shm.expectedRate*0.5 {
        // Update rate dropped significantly
        log.Printf("⚠️ Update rate dropped: %.1f/min (expected: %.1f/min)",
            actualRate, shm.expectedRate)
        return false
    }
    return true
}
5. Use Structured Logging
Implement structured logging for better observability:
const winston = require('winston');
const logger = winston.createLogger({
format: winston.format.json(),
transports: [
new winston.transports.File({ filename: 'account-updates.log' })
]
});
call.on('data', (update) => {
logger.info('account_update', {
pubkey: update.pubkey,
lamports: update.lamports,
slot: update.slot,
commitment: update.commitment,
timestamp: Date.now()
});
});
Troubleshooting
Issue: No Updates Received
Symptoms:
- Stream connects successfully
- No account updates received
Solutions:
- Verify accounts are active:
# Check if account exists
curl -X POST https://nexus.fortiblox.com/rpc \
-H "X-API-Key: $FORTIBLOX_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"jsonrpc": "2.0",
"id": 1,
"method": "getAccountInfo",
"params": ["YourAccountAddress..."]
}'
- Check account has activity:
- Inactive accounts won't generate updates
- Test with a known active wallet
- Verify correct commitment level:
// Try PROCESSED for faster updates
stream.write({
  accounts: wallets,
  commitment: 'PROCESSED' // Earliest notifications, but these updates can still be rolled back
});
Issue: High CPU Usage
Solutions:
- Reduce logging verbosity:
# Only log important events (abs() so large decreases are logged too)
if abs(change) > threshold:
    log_update(update)
- Optimize data parsing:
// Skip parsing if data not needed
if !needs_parsing(&update) {
return;
}
- Use worker threads:
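const { Worker } = require('worker_threads');
// './process-update.js' is a placeholder for your own worker script
const worker = new Worker('./process-update.js');
call.on('data', (update) => {
  // Offload processing to worker
  worker.postMessage(update);
});
Issue: Memory Leaks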
Solutions:
- Limit history storage:
# Use circular buffer
from collections import deque
update_history = deque(maxlen=1000) # Only keep last 1000
- Clear old data periodically:
setInterval(() => {
const cutoff = Date.now() - 3600000; // 1 hour ago
for (const [key, state] of accountStates) {
if (state.lastUpdate < cutoff) {
accountStates.delete(key);
}
}
}, 300000); // Clean every 5 minutes
Issue: Connection Drops
Solutions:
- Implement keep-alive:
// Send periodic pings
ticker := time.NewTicker(30 * time.Second)
go func() {
for range ticker.C {
if err := stream.Send(&pb.PingRequest{}); err != nil {
log.Println("Keep-alive failed:", err)
reconnect()
}
}
}()
- Monitor connection health:
// Set connection timeouts
ManagedChannel channel = ManagedChannelBuilder
.forAddress("grpc.fortiblox.com", 10002)
.usePlaintext()
.keepAliveTime(30, TimeUnit.SECONDS)
.keepAliveTimeout(10, TimeUnit.SECONDS)
.build();
Advanced Patterns
Multi-Region Failover
const ENDPOINTS = [
'grpc.fortiblox.com:10002',
'grpc-backup.fortiblox.com:10002'
];
let currentEndpoint = 0;
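async function connectWithFailover() {
  while (currentEndpoint < ENDPOINTS.length) {
    const client = new proto.geyser.Geyser(
      ENDPOINTS[currentEndpoint],
      grpc.credentials.createInsecure()
    );
    try {
      // The constructor connects lazily and does not throw on an unreachable host,
      // so wait for the channel to become ready (5 second deadline) before using it
      await new Promise((resolve, reject) => {
        client.waitForReady(Date.now() + 5000, (err) => (err ? reject(err) : resolve()));
      });
      return client;
    } catch (err) {
      console.error(`Failed to connect to ${ENDPOINTS[currentEndpoint]}`);
      client.close();
      currentEndpoint++;
    }
  }
  throw new Error('All endpoints failed');
}
State Persistence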
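Persisting account state lets a monitor resume after a restart without treating every balance as new. One caveat applies to any file-based approach: if the process dies in the middle of a write, a plain `open(path, 'w')` can leave a truncated file that fails to load. The sketch below shows a write-then-rename save that avoids this, assuming a local filesystem. `save_state_atomic` is a hypothetical helper, not part of the tracker shown in this section.

```python
import json
import os
import tempfile

def save_state_atomic(states, state_file):
    """Write `states` as JSON to `state_file` without ever exposing a partial file."""
    directory = os.path.dirname(os.path.abspath(state_file))
    # Create the temp file in the same directory so the final rename stays on one filesystem
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(states, f, indent=2)
            f.flush()
            os.fsync(f.fileno())  # Push the bytes to disk before renaming
        os.replace(tmp_path, state_file)  # Atomically swap the new file into place
    except BaseException:
        # Clean up the temp file if anything failed before the rename
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise

# Demonstration in a throwaway directory
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "account_states.json")
    save_state_atomic({"Wallet1...": {"balance": 1_500_000_000}}, path)
    with open(path) as f:
        restored = json.load(f)
    leftovers = [name for name in os.listdir(tmp) if name.endswith(".tmp")]

print(restored, leftovers)
```

A helper like this could stand in for the body of a `save_state` method. Saving on a timer or every N updates, rather than on every update, would also reduce disk churn on busy accounts.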
import json
import os
from datetime import datetime
class PersistentAccountTracker:
    def __init__(self, state_file='account_states.json'):
        self.state_file = state_file
        self.states = self.load_state()
    def load_state(self):
        """Load state from disk"""
        if os.path.exists(self.state_file):
            with open(self.state_file, 'r') as f:
                return json.load(f)
        return {}
    def save_state(self):
        """Save state to disk"""
        with open(self.state_file, 'w') as f:
            json.dump(self.states, f, indent=2)
    def update_account(self, pubkey, lamports):
        """Update account and persist"""
        self.states[pubkey] = {
            'balance': lamports,
            'last_update': datetime.now().isoformat()
        }
        self.save_state()
Rate Limiting
use std::time::{Duration, Instant};
struct RateLimiter {
max_updates_per_second: u32,
updates_this_second: u32,
last_reset: Instant,
}
impl RateLimiter {
fn new(max_updates_per_second: u32) -> Self {
Self {
max_updates_per_second,
updates_this_second: 0,
last_reset: Instant::now(),
}
}
fn should_process(&mut self) -> bool {
// Reset counter every second
if self.last_reset.elapsed() >= Duration::from_secs(1) {
self.updates_this_second = 0;
self.last_reset = Instant::now();
}
if self.updates_this_second < self.max_updates_per_second {
self.updates_this_second += 1;
true
} else {
false
}
}
}
Next Steps
- Transaction Streaming - Stream real-time transaction updates
- Slot Streaming - Monitor blockchain slot progression
- Getting Started - gRPC setup and installation guide
- Historical Replay - Replay historical account states
Support
- Discord: discord.gg/fortiblox
- Email: [email protected]
- Status: status.fortiblox.com
- Examples: github.com/fortiblox/grpc-examples
Technical Details:
- Redis Stream: geyser:accounts
- Current Entries: 26,000+
- Freshness: Real-time (<100ms)
- Protocol: HTTP/2 plaintext
- Endpoint: grpc.fortiblox.com:10002
- Authentication: SHA-256 via x-api-key header