FortiBlox LogoFortiBlox Docs
NexusgRPC Streaming

Transaction Streaming

Stream real-time X1 Blockchain transactions with advanced filtering and comprehensive examples

Transaction Streaming

Subscribe to real-time transaction updates from FortiBlox gRPC streaming with powerful filtering options. Monitor specific accounts, track program interactions, and build sophisticated blockchain applications with <100ms latency.

Fully Operational: Transaction streaming is live with 262,000+ entries in Redis (geyser:transactions) providing real-time updates.

Overview

Transaction streaming allows you to:

  • Monitor specific accounts - Track wallet activity, program interactions
  • Filter by program - Watch specific smart contract activity
  • Exclude vote transactions - Focus on meaningful transactions
  • Choose commitment levels - Balance speed vs finality
  • Real-time processing - Sub-second latency from Redis pub/sub

Data Source

  • Redis Stream: geyser:transactions
  • Current Entries: 262,000+
  • Freshness: Real-time (<100ms)
  • Endpoint: grpc.fortiblox.com:10002 (HTTP/2 plaintext)

Request Parameters

SubscribeTransactions Filter Options

ParameterTypeDescriptionExample
account_includestring[]Include transactions touching these accounts (OR logic)['JUP6...', 'TokenkegQ...']
account_excludestring[]Exclude transactions touching these accounts['Vote111...']
account_requiredstring[]Require ALL these accounts (AND logic)['Wallet1...', 'Wallet2...']
voteboolInclude vote transactions (default: true)false (recommended)
failedboolInclude failed transactions (default: true)false
commitmentenumCommitment levelCONFIRMED or FINALIZED
start_slotuint64Start from this slot (0 = real-time only)11163364
end_slotuint64End at this slot (0 = continue indefinitely)0

Commitment Levels

LevelDescriptionUse CaseLatency
PROCESSEDOptimistically confirmed by clusterReal-time monitoring~400ms
CONFIRMEDVoted on by supermajorityMost applications~1s
FINALIZEDFinalized by clusterHigh-value transactions~30s

Recommended: Use CONFIRMED commitment for most applications. It provides a good balance between speed and certainty.

Complete Code Examples

Monitor Jupiter Swaps

// jupiter-monitor.js
const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');

const JUPITER_PROGRAM = 'JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4';

// Load proto
const packageDefinition = protoLoader.loadSync('geyser.proto', {
  keepCase: true,
  longs: String,
  enums: String,
  defaults: true,
  oneofs: true
});

const proto = grpc.loadPackageDefinition(packageDefinition);

// Create client (plaintext/insecure)
const client = new proto.geyser.Geyser(
  'grpc.fortiblox.com:10002',
  grpc.credentials.createInsecure()
);

// Add API key metadata
const metadata = new grpc.Metadata();
metadata.add('x-api-key', process.env.FORTIBLOX_API_KEY);

// Create bidirectional stream
const stream = client.subscribe(metadata);

// Subscribe to Jupiter transactions
stream.write({
  transactions: {
    'jupiter_swaps': {
      accountInclude: [JUPITER_PROGRAM],
      vote: false,
      failed: false
    }
  },
  commitment: 'CONFIRMED'
});

// Track metrics
let swapCount = 0;
let totalVolume = 0;
const startTime = Date.now();

// Process swaps
stream.on('data', (update) => {
  if (update.transaction) {
    const tx = update.transaction;
    swapCount++;

    // Extract signature
    const signature = Buffer.from(tx.transaction.signature).toString('base64');

    console.log(`\n🔄 Jupiter Swap #${swapCount}`);
    console.log(`   Signature: ${signature.slice(0, 20)}...`);
    console.log(`   Slot: ${tx.slot}`);
    console.log(`   Accounts: ${tx.transaction.message.accountKeys.length}`);

    // Parse swap details
    const swapData = parseJupiterSwap(tx);
    if (swapData) {
      console.log(`   Input: ${swapData.inputAmount} ${swapData.inputToken}`);
      console.log(`   Output: ${swapData.outputAmount} ${swapData.outputToken}`);
      totalVolume += swapData.volumeUSD;
    }

    // Show metrics every 10 swaps
    if (swapCount % 10 === 0) {
      const elapsed = (Date.now() - startTime) / 1000;
      const swapsPerMin = (swapCount / elapsed) * 60;

      console.log(`\n📊 Metrics:`);
      console.log(`   Total Swaps: ${swapCount}`);
      console.log(`   Rate: ${swapsPerMin.toFixed(1)} swaps/min`);
      console.log(`   Total Volume: $${totalVolume.toFixed(2)}\n`);
    }
  }
});

function parseJupiterSwap(tx) {
  // Parse transaction data
  // This is simplified - actual parsing depends on Jupiter's instruction format
  try {
    const instructions = tx.transaction.message.instructions;
    // Extract swap details from instructions
    return {
      inputToken: 'USDC',
      outputToken: 'SOL',
      inputAmount: 100,
      outputAmount: 0.5,
      volumeUSD: 100
    };
  } catch (err) {
    console.error('Parse error:', err.message);
    return null;
  }
}

// Error handling with reconnection
stream.on('error', (err) => {
  console.error('❌ Stream error:', err.message);
  if (err.code === grpc.status.UNAVAILABLE) {
    console.log('🔄 Reconnecting in 5 seconds...');
    setTimeout(() => reconnect(), 5000);
  }
});

stream.on('end', () => {
  console.log('Stream ended, reconnecting...');
  setTimeout(() => reconnect(), 1000);
});

function reconnect() {
  console.log('🔌 Reconnecting...');
  // Create new stream and resubscribe
  // ... (same logic as above)
}

// Graceful shutdown
process.on('SIGINT', () => {
  console.log('\n👋 Shutting down...');
  console.log(`Final stats: ${swapCount} swaps processed`);
  stream.end();
  process.exit(0);
});

console.log('🚀 Jupiter swap monitor started');
console.log('   Endpoint: grpc.fortiblox.com:10002');
console.log('   Program: JUP6...');
console.log('   Commitment: CONFIRMED\n');

Run

export FORTIBLOX_API_KEY="your_api_key_here"
node jupiter-monitor.js

Wallet Activity Monitor

# wallet_monitor.py
import grpc
import os
import base64
from datetime import datetime
import geyser_pb2
import geyser_pb2_grpc

class WalletMonitor:
    def __init__(self, wallet_address, api_key):
        self.wallet_address = wallet_address
        self.api_key = api_key
        self.tx_count = 0
        self.total_fees = 0

    def connect(self):
        # Create insecure channel (plaintext)
        channel = grpc.insecure_channel('grpc.fortiblox.com:10002')
        stub = geyser_pb2_grpc.GeyserStub(channel)
        return stub

    def request_generator(self):
        """Generate subscription request"""
        yield geyser_pb2.SubscribeRequest(
            transactions={
                'wallet_txs': geyser_pb2.SubscribeRequestFilterTransactions(
                    account_include=[self.wallet_address],
                    vote=False,
                    failed=False
                )
            },
            commitment=geyser_pb2.CommitmentLevel.CONFIRMED
        )

    def monitor(self):
        print(f"🚀 Monitoring wallet: {self.wallet_address[:8]}...")
        print(f"   Endpoint: grpc.fortiblox.com:10002")
        print(f"   Commitment: CONFIRMED\n")

        stub = self.connect()
        metadata = [('x-api-key', self.api_key)]

        try:
            for update in stub.Subscribe(self.request_generator(), metadata=metadata):
                if update.HasField('transaction'):
                    self.process_transaction(update.transaction)

        except grpc.RpcError as e:
            print(f"❌ gRPC error: {e.code()} - {e.details()}")
            print("🔄 Reconnecting in 5 seconds...")
            import time
            time.sleep(5)
            self.monitor()  # Reconnect

        except KeyboardInterrupt:
            print("\n👋 Shutting down...")
            print(f"Final stats: {self.tx_count} transactions processed")

    def process_transaction(self, tx):
        self.tx_count += 1

        # Extract signature
        sig = base64.b64encode(tx.transaction.signature).decode('utf-8')

        # Get timestamp
        timestamp = datetime.fromtimestamp(tx.transaction.block_time).strftime('%H:%M:%S')

        # Extract fee
        fee_lamports = tx.transaction.meta.fee if tx.transaction.HasField('meta') else 0
        fee_sol = fee_lamports / 1_000_000_000
        self.total_fees += fee_sol

        # Determine transaction type
        tx_type = self.determine_tx_type(tx)

        print(f"\n💳 Transaction #{self.tx_count}")
        print(f"   Signature: {sig[:20]}...")
        print(f"   Time: {timestamp}")
        print(f"   Slot: {tx.slot}")
        print(f"   Type: {tx_type}")
        print(f"   Fee: {fee_sol:.6f} SOL")
        print(f"   Success: {'✅' if not tx.transaction.meta.err else '❌'}")

        # Show cumulative stats every 10 transactions
        if self.tx_count % 10 == 0:
            print(f"\n📊 Stats:")
            print(f"   Total Transactions: {self.tx_count}")
            print(f"   Total Fees: {self.total_fees:.6f} SOL")
            print(f"   Average Fee: {(self.total_fees/self.tx_count):.6f} SOL\n")

    def determine_tx_type(self, tx):
        """Determine transaction type from instructions"""
        # Simplified - actual implementation would parse instructions
        instructions = tx.transaction.message.instructions
        if len(instructions) == 0:
            return "Unknown"

        # Check for common program IDs
        program_ids = [ix.program_id_index for ix in instructions]
        # Map program IDs to types
        # ... (implementation specific)

        return "Transfer"  # Default

if __name__ == '__main__':
    api_key = os.getenv('FORTIBLOX_API_KEY')
    if not api_key:
        print("Error: FORTIBLOX_API_KEY environment variable not set")
        exit(1)

    # Replace with your wallet address
    wallet = '9B5XszUGdMaxCZ7uSQhPzdks5ZQSmWxrmzCSvtJ6Ns6g'

    monitor = WalletMonitor(wallet, api_key)
    monitor.monitor()

Setup & Run

# Install dependencies
pip install grpcio grpcio-tools

# Download and compile proto
curl -O https://raw.githubusercontent.com/rpcpool/yellowstone-grpc/master/yellowstone-grpc-proto/proto/geyser.proto
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. geyser.proto

# Run
export FORTIBLOX_API_KEY="your_api_key_here"
python wallet_monitor.py

Token Transfer Monitor

// src/main.rs
use tonic::{transport::Channel, metadata::MetadataValue, Request};
use tokio_stream::StreamExt;
use std::collections::HashMap;

pub mod geyser {
    tonic::include_proto!("geyser");
}

use geyser::{geyser_client::GeyserClient, SubscribeRequest};

const TOKEN_PROGRAM: &str = "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA";

struct TransferMetrics {
    transfer_count: u64,
    total_volume: u64,
    token_types: HashMap<String, u64>,
}

impl TransferMetrics {
    fn new() -> Self {
        Self {
            transfer_count: 0,
            total_volume: 0,
            token_types: HashMap::new(),
        }
    }

    fn process_transfer(&mut self, token: String, amount: u64) {
        self.transfer_count += 1;
        self.total_volume += amount;

        *self.token_types.entry(token).or_insert(0) += 1;

        // Print metrics every 100 transfers
        if self.transfer_count % 100 == 0 {
            self.print_metrics();
        }
    }

    fn print_metrics(&self) {
        println!("\n📊 Transfer Metrics:");
        println!("   Total Transfers: {}", self.transfer_count);
        println!("   Total Volume: {} tokens", self.total_volume);
        println!("   Unique Tokens: {}", self.token_types.len());

        // Top 5 tokens by transfer count
        let mut sorted: Vec<_> = self.token_types.iter().collect();
        sorted.sort_by(|a, b| b.1.cmp(a.1));

        println!("\n   Top Tokens:");
        for (token, count) in sorted.iter().take(5) {
            println!("     {} - {} transfers", &token[..8], count);
        }
        println!();
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    println!("🚀 Token transfer monitor started");
    println!("   Endpoint: grpc.fortiblox.com:10002");
    println!("   Program: Token Program");
    println!("   Commitment: CONFIRMED\n");

    let mut metrics = TransferMetrics::new();

    loop {
        if let Err(e) = monitor_transfers(&mut metrics).await {
            eprintln!("❌ Error: {}", e);
            eprintln!("🔄 Reconnecting in 5 seconds...");
            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
        }
    }
}

async fn monitor_transfers(metrics: &mut TransferMetrics) -> Result<(), Box<dyn std::error::Error>> {
    // Create channel (insecure/plaintext)
    let channel = Channel::from_static("http://grpc.fortiblox.com:10002")
        .connect()
        .await?;

    let mut client = GeyserClient::new(channel);

    // Create request with API key
    let api_key = std::env::var("FORTIBLOX_API_KEY")?;
    let token = MetadataValue::try_from(api_key)?;

    let mut request = Request::new(tokio_stream::iter(vec![
        SubscribeRequest {
            transactions: std::collections::HashMap::from([(
                "token_transfers".to_string(),
                geyser::SubscribeRequestFilterTransactions {
                    account_include: vec![TOKEN_PROGRAM.to_string()],
                    vote: Some(false),
                    failed: Some(false),
                    ..Default::default()
                }
            )]),
            commitment: Some(geyser::CommitmentLevel::Confirmed as i32),
            ..Default::default()
        }
    ]));

    request.metadata_mut().insert("x-api-key", token);

    // Subscribe
    let mut stream = client.subscribe(request).await?.into_inner();

    // Process updates
    while let Some(update) = stream.next().await {
        match update {
            Ok(msg) => {
                if let Some(tx) = msg.transaction {
                    process_transaction(tx, metrics);
                }
            }
            Err(e) => {
                eprintln!("Stream error: {}", e);
                return Err(Box::new(e));
            }
        }
    }

    Ok(())
}

fn process_transaction(
    tx: geyser::SubscribeUpdateTransaction,
    metrics: &mut TransferMetrics
) {
    if let Some(transaction) = tx.transaction {
        let sig = hex::encode(&transaction.signature[..8]);

        println!("\n💸 Token Transfer");
        println!("   Signature: {}...", sig);
        println!("   Slot: {}", tx.slot);

        // Parse transfer details
        if let Some(transfer) = parse_token_transfer(&transaction) {
            println!("   From: {}...", &transfer.from[..8]);
            println!("   To: {}...", &transfer.to[..8]);
            println!("   Amount: {} {}", transfer.amount, transfer.token);

            metrics.process_transfer(transfer.token, transfer.amount);
        }
    }
}

struct TokenTransfer {
    from: String,
    to: String,
    amount: u64,
    token: String,
}

fn parse_token_transfer(tx: &geyser::Transaction) -> Option<TokenTransfer> {
    // Simplified parser - actual implementation would decode instructions
    Some(TokenTransfer {
        from: "FromAddress...".to_string(),
        to: "ToAddress...".to_string(),
        amount: 1000000,
        token: "USDC".to_string(),
    })
}

Cargo.toml

[dependencies]
tonic = "0.11"
prost = "0.12"
tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1"
hex = "0.4"

[build-dependencies]
tonic-build = "0.11"

Run

export FORTIBLOX_API_KEY="your_api_key_here"
cargo run --release

NFT Sales Monitor

// main.go
package main

import (
    "context"
    "fmt"
    "io"
    "log"
    "os"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    "google.golang.org/grpc/metadata"

    pb "your-module/geyser"
)

const (
    MAGIC_EDEN = "M2mx93ekt1fmXSVkTrUL9xVFHkmME8HTUi5Cyc5aF7K"
    TENSOR     = "TSWAPaqyCSx2KABk68Shruf4rp7CxcNi8hAsbdwmHbN"
)

type NFTSale struct {
    Signature string
    Mint      string
    Price     float64
    Buyer     string
    Seller    string
    Slot      uint64
}

type SalesMetrics struct {
    TotalSales  int
    TotalVolume float64
    SalesByMarketplace map[string]int
}

func main() {
    fmt.Println("🚀 NFT sales monitor started")
    fmt.Println("   Endpoint: grpc.fortiblox.com:10002")
    fmt.Println("   Marketplaces: Magic Eden, Tensor")
    fmt.Println("   Commitment: CONFIRMED\n")

    apiKey := os.Getenv("FORTIBLOX_API_KEY")
    if apiKey == "" {
        log.Fatal("FORTIBLOX_API_KEY environment variable not set")
    }

    metrics := &SalesMetrics{
        SalesByMarketplace: make(map[string]int),
    }

    for {
        if err := monitorNFTSales(apiKey, metrics); err != nil {
            log.Printf("❌ Error: %v", err)
            log.Println("🔄 Reconnecting in 5 seconds...")
            time.Sleep(5 * time.Second)
        }
    }
}

func monitorNFTSales(apiKey string, metrics *SalesMetrics) error {
    // Create insecure connection (plaintext)
    conn, err := grpc.Dial(
        "grpc.fortiblox.com:10002",
        grpc.WithTransportCredentials(insecure.NewCredentials()),
    )
    if err != nil {
        return fmt.Errorf("failed to connect: %w", err)
    }
    defer conn.Close()

    // Create client
    client := pb.NewGeyserClient(conn)

    // Add API key to context
    md := metadata.New(map[string]string{
        "x-api-key": apiKey,
    })
    ctx := metadata.NewOutgoingContext(context.Background(), md)

    // Create stream
    stream, err := client.Subscribe(ctx)
    if err != nil {
        return fmt.Errorf("failed to subscribe: %w", err)
    }

    // Send subscription request for both marketplaces
    req := &pb.SubscribeRequest{
        Transactions: map[string]*pb.SubscribeRequestFilterTransactions{
            "nft_sales": {
                AccountInclude: []string{MAGIC_EDEN, TENSOR},
                Vote:           false,
                Failed:         false,
            },
        },
        Commitment: pb.CommitmentLevel_CONFIRMED,
    }

    if err := stream.Send(req); err != nil {
        return fmt.Errorf("failed to send request: %w", err)
    }

    // Receive updates
    for {
        update, err := stream.Recv()
        if err == io.EOF {
            return fmt.Errorf("stream ended")
        }
        if err != nil {
            return fmt.Errorf("stream error: %w", err)
        }

        if tx := update.GetTransaction(); tx != nil {
            processNFTSale(tx, metrics)
        }
    }
}

func processNFTSale(tx *pb.SubscribeUpdateTransaction, metrics *SalesMetrics) {
    // Detect marketplace
    marketplace := detectMarketplace(tx)
    if marketplace == "" {
        return
    }

    // Parse sale details
    sale := parseNFTSale(tx, marketplace)
    if sale == nil {
        return
    }

    // Update metrics
    metrics.TotalSales++
    metrics.TotalVolume += sale.Price
    metrics.SalesByMarketplace[marketplace]++

    // Print sale info
    fmt.Printf("\n🎨 NFT Sale #%d\n", metrics.TotalSales)
    fmt.Printf("   Marketplace: %s\n", marketplace)
    fmt.Printf("   NFT: %s...\n", sale.Mint[:8])
    fmt.Printf("   Price: %.2f SOL\n", sale.Price)
    fmt.Printf("   Buyer: %s...\n", sale.Buyer[:8])
    fmt.Printf("   Signature: %s...\n", sale.Signature[:16])

    // Print metrics every 10 sales
    if metrics.TotalSales%10 == 0 {
        printMetrics(metrics)
    }
}

func detectMarketplace(tx *pb.SubscribeUpdateTransaction) string {
    // Check accounts to determine marketplace
    if tx.Transaction == nil {
        return ""
    }

    accounts := tx.Transaction.Message.AccountKeys
    for _, account := range accounts {
        if string(account) == MAGIC_EDEN {
            return "Magic Eden"
        }
        if string(account) == TENSOR {
            return "Tensor"
        }
    }

    return ""
}

func parseNFTSale(tx *pb.SubscribeUpdateTransaction, marketplace string) *NFTSale {
    // Simplified parser - actual implementation would decode instructions
    signature := fmt.Sprintf("%x", tx.Transaction.Signature)

    return &NFTSale{
        Signature: signature,
        Mint:      "NFTMintAddress...",
        Price:     1.5,
        Buyer:     "BuyerAddress...",
        Seller:    "SellerAddress...",
        Slot:      tx.Slot,
    }
}

func printMetrics(metrics *SalesMetrics) {
    fmt.Println("\n📊 Sales Metrics:")
    fmt.Printf("   Total Sales: %d\n", metrics.TotalSales)
    fmt.Printf("   Total Volume: %.2f SOL\n", metrics.TotalVolume)
    fmt.Printf("   Average Price: %.2f SOL\n", metrics.TotalVolume/float64(metrics.TotalSales))

    fmt.Println("\n   Sales by Marketplace:")
    for marketplace, count := range metrics.SalesByMarketplace {
        fmt.Printf("     %s: %d\n", marketplace, count)
    }
    fmt.Println()
}

Run

export FORTIBLOX_API_KEY="your_api_key_here"
go run main.go

DeFi Protocol Monitor

// DeFiMonitor.java
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.stub.MetadataUtils;
import io.grpc.stub.StreamObserver;

import java.util.*;
import java.util.concurrent.TimeUnit;

public class DeFiMonitor {
    private static final String JUPITER_PROGRAM = "JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4";
    private static final String RAYDIUM_PROGRAM = "675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8";

    private final GeyserGrpc.GeyserStub asyncStub;
    private int swapCount = 0;
    private double totalVolume = 0;
    private Map<String, Integer> swapsByProtocol = new HashMap<>();

    public DeFiMonitor(String host, int port, String apiKey) {
        // Create insecure channel (plaintext)
        ManagedChannel channel = ManagedChannelBuilder
            .forAddress(host, port)
            .usePlaintext()
            .build();

        // Add API key metadata
        Metadata metadata = new Metadata();
        Metadata.Key<String> key = Metadata.Key.of("x-api-key", Metadata.ASCII_STRING_MARSHALLER);
        metadata.put(key, apiKey);

        this.asyncStub = GeyserGrpc.newStub(channel)
            .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata));

        swapsByProtocol.put("Jupiter", 0);
        swapsByProtocol.put("Raydium", 0);
    }

    public void startMonitoring() {
        System.out.println("🚀 DeFi protocol monitor started");
        System.out.println("   Endpoint: grpc.fortiblox.com:10002");
        System.out.println("   Protocols: Jupiter, Raydium");
        System.out.println("   Commitment: CONFIRMED\n");

        StreamObserver<SubscribeRequest> requestObserver = asyncStub.subscribe(
            new StreamObserver<SubscribeUpdate>() {
                @Override
                public void onNext(SubscribeUpdate update) {
                    if (update.hasTransaction()) {
                        processSwap(update.getTransaction());
                    }
                }

                @Override
                public void onError(Throwable t) {
                    System.err.println("❌ Stream error: " + t.getMessage());
                    System.out.println("🔄 Reconnecting in 5 seconds...");
                    try {
                        Thread.sleep(5000);
                        startMonitoring(); // Reconnect
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }

                @Override
                public void onCompleted() {
                    System.out.println("Stream completed");
                }
            }
        );

        // Send subscription request
        Map<String, SubscribeRequestFilterTransactions> filters = new HashMap<>();
        filters.put("defi_swaps",
            SubscribeRequestFilterTransactions.newBuilder()
                .addAccountInclude(JUPITER_PROGRAM)
                .addAccountInclude(RAYDIUM_PROGRAM)
                .setVote(false)
                .setFailed(false)
                .build()
        );

        SubscribeRequest request = SubscribeRequest.newBuilder()
            .putAllTransactions(filters)
            .setCommitment(CommitmentLevel.CONFIRMED)
            .build();

        requestObserver.onNext(request);
    }

    private void processSwap(SubscribeUpdateTransaction tx) {
        swapCount++;

        // Detect protocol
        String protocol = detectProtocol(tx);
        if (protocol != null) {
            swapsByProtocol.put(protocol, swapsByProtocol.get(protocol) + 1);
        }

        // Parse swap details
        SwapData swap = parseSwap(tx);

        System.out.printf("\n🔄 Swap #%d\n", swapCount);
        System.out.printf("   Protocol: %s\n", protocol);
        System.out.printf("   Slot: %d\n", tx.getSlot());

        if (swap != null) {
            System.out.printf("   Input: %s %s\n", swap.inputAmount, swap.inputToken);
            System.out.printf("   Output: %s %s\n", swap.outputAmount, swap.outputToken);
            System.out.printf("   Volume: $%.2f\n", swap.volumeUSD);
            totalVolume += swap.volumeUSD;
        }

        // Print metrics every 25 swaps
        if (swapCount % 25 == 0) {
            printMetrics();
        }
    }

    private String detectProtocol(SubscribeUpdateTransaction tx) {
        // Check accounts to determine protocol
        List<byte[]> accounts = tx.getTransaction().getMessage().getAccountKeysList();
        for (byte[] account : accounts) {
            String accountStr = new String(account);
            if (accountStr.contains(JUPITER_PROGRAM.substring(0, 10))) {
                return "Jupiter";
            }
            if (accountStr.contains(RAYDIUM_PROGRAM.substring(0, 10))) {
                return "Raydium";
            }
        }
        return "Unknown";
    }

    private SwapData parseSwap(SubscribeUpdateTransaction tx) {
        // Simplified parser
        SwapData swap = new SwapData();
        swap.inputToken = "USDC";
        swap.outputToken = "SOL";
        swap.inputAmount = "100";
        swap.outputAmount = "0.5";
        swap.volumeUSD = 100.0;
        return swap;
    }

    private void printMetrics() {
        System.out.println("\n📊 DeFi Metrics:");
        System.out.printf("   Total Swaps: %d\n", swapCount);
        System.out.printf("   Total Volume: $%.2f\n", totalVolume);
        System.out.printf("   Average: $%.2f per swap\n", totalVolume / swapCount);

        System.out.println("\n   Swaps by Protocol:");
        for (Map.Entry<String, Integer> entry : swapsByProtocol.entrySet()) {
            System.out.printf("     %s: %d\n", entry.getKey(), entry.getValue());
        }
        System.out.println();
    }

    static class SwapData {
        String inputToken;
        String outputToken;
        String inputAmount;
        String outputAmount;
        double volumeUSD;
    }

    public static void main(String[] args) {
        String apiKey = System.getenv("FORTIBLOX_API_KEY");
        if (apiKey == null || apiKey.isEmpty()) {
            System.err.println("Error: FORTIBLOX_API_KEY environment variable not set");
            System.exit(1);
        }

        DeFiMonitor monitor = new DeFiMonitor("grpc.fortiblox.com", 10002, apiKey);
        monitor.startMonitoring();

        // Keep alive
        try {
            Thread.currentThread().join();
        } catch (InterruptedException e) {
            System.out.println("\n👋 Shutting down...");
        }
    }
}

Run

export FORTIBLOX_API_KEY="your_api_key_here"
mvn compile exec:java -Dexec.mainClass="DeFiMonitor"

Real-World Use Cases

1. Payment Gateway

Monitor incoming payments and update order status:

const USDC_MINT = 'EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v';
const PAYMENT_WALLET = 'YourWalletAddress...';

stream.write({
  transactions: {
    'payments': {
      accountInclude: [PAYMENT_WALLET],
      vote: false,
      failed: false
    }
  },
  commitment: 'CONFIRMED'
});

stream.on('data', async (update) => {
  if (update.transaction) {
    const tokenTransfer = findTokenTransfer(update.transaction, USDC_MINT, PAYMENT_WALLET);

    if (tokenTransfer) {
      const amountUSDC = tokenTransfer.amount / 1_000_000;
      const order = await findOrderByAmount(amountUSDC);

      if (order) {
        await updateOrderStatus(order.id, 'paid', {
          transaction: tokenTransfer.signature,
          amount: amountUSDC
        });

        await sendConfirmationEmail(order.customer_email);
        console.log(`✅ Payment received: ${amountUSDC} USDC for order ${order.id}`);
      }
    }
  }
});

2. Trading Bot

Execute arbitrage based on DEX swaps:

MONITORED_POOLS = ['Pool1...', 'Pool2...', 'Pool3...']

async def handle_swap(tx):
    for pool in MONITORED_POOLS:
        if pool in tx.accounts:
            # Analyze price impact
            price_impact = await calculate_price_impact(tx, pool)

            if price_impact > ARBITRAGE_THRESHOLD:
                # Calculate optimal trade size
                amount = calculate_optimal_amount(price_impact)

                # Execute arbitrage
                await execute_trade({
                    'pool': pool,
                    'direction': 'buy' if price_impact > 0 else 'sell',
                    'amount': amount,
                    'slippage': 0.5
                })

                print(f"💰 Arbitrage executed: {price_impact:.2f}% impact")

3. NFT Sniper Bot

Snipe underpriced NFT listings:

async fn snipe_nfts(tx: SubscribeUpdateTransaction) {
    if let Some(listing) = parse_nft_listing(&tx) {
        // Check floor price
        let floor_price = get_collection_floor(& listing.collection).await;

        if listing.price < floor_price * 0.8 {
            // Listed 20% below floor!
            println!("🎯 Sniper opportunity!");
            println!("   Floor: {} SOL", floor_price);
            println!("   Listed: {} SOL", listing.price);
            println!("   Discount: {:.1}%", (1.0 - listing.price / floor_price) * 100.0);

            // Execute buy
            execute_nft_purchase(&listing).await;
        }
    }
}

4. Analytics Dashboard

Real-time blockchain analytics:

type Analytics struct {
    TxPerSecond float64
    UniqueWallets map[string]bool
    VolumeByProgram map[string]float64
    TopPrograms []ProgramStats
}

func (a *Analytics) ProcessTransaction(tx *pb.SubscribeUpdateTransaction) {
    // Count unique wallets
    for _, account := range tx.Transaction.Message.AccountKeys {
        a.UniqueWallets[string(account)] = true
    }

    // Track program usage
    programID := detectProgram(tx)
    volume := calculateVolume(tx)
    a.VolumeByProgram[programID] += volume

    // Update dashboard every second
    a.UpdateDashboard()
}

Performance Optimization

1. Efficient Account Filtering

Use account_include for OR logic, account_required for AND logic:

// Monitor transactions involving EITHER account
stream.write({
  transactions: {
    'either': {
      accountInclude: ['Account1...', 'Account2...']  // OR logic
    }
  }
});

// Monitor transactions involving BOTH accounts
stream.write({
  transactions: {
    'both': {
      accountRequired: ['Account1...', 'Account2...']  // AND logic
    }
  }
});

2. Batch Processing

Process transactions in batches for better throughput:

import asyncio
from collections import deque

batch_size = 100
tx_buffer = deque(maxlen=1000)

async def process_stream():
    for update in stub.Subscribe(request, metadata=metadata):
        if update.HasField('transaction'):
            tx_buffer.append(update.transaction)

            if len(tx_buffer) >= batch_size:
                # Process batch
                await process_batch(list(tx_buffer))
                tx_buffer.clear()

3. Parallel Processing

Use worker pools for CPU-intensive parsing:

use tokio::sync::mpsc;
use std::sync::Arc;

async fn parallel_processing() {
    let (tx_sender, mut tx_receiver) = mpsc::channel(1000);

    // Spawn worker tasks
    for _ in 0..num_cpus::get() {
        let mut receiver = tx_receiver.clone();

        tokio::spawn(async move {
            while let Some(tx) = receiver.recv().await {
                // Parse and process transaction
                process_transaction(tx).await;
            }
        });
    }

    // Feed transactions to workers
    while let Some(update) = stream.next().await {
        if let Some(tx) = update.transaction {
            tx_sender.send(tx).await.unwrap();
        }
    }
}

Error Handling

Reconnection with Exponential Backoff

class ResilientStream {
  constructor() {
    this.retryCount = 0;
    this.maxRetries = 10;
  }

  async connect() {
    try {
      this.retryCount = 0;
      await this.createStream();
    } catch (err) {
      await this.handleError(err);
    }
  }

  async handleError(err) {
    this.retryCount++;

    if (this.retryCount >= this.maxRetries) {
      console.error('Max retries reached, giving up');
      return;
    }

    // Exponential backoff: 1s, 2s, 4s, 8s, 16s, 32s, 60s (capped)
    const delay = Math.min(Math.pow(2, this.retryCount) * 1000, 60000);

    console.log(`Retry ${this.retryCount}/${this.maxRetries} in ${delay/1000}s`);
    await new Promise(resolve => setTimeout(resolve, delay));

    await this.connect();
  }
}

Troubleshooting

Issue: No Data Received

Symptoms:

  • Stream connects successfully
  • No transactions received

Solutions:

  1. Check filter configuration
  2. Verify accounts are active
  3. Try broader filters first
  4. Check commitment level
// Debug: Subscribe to ALL transactions (temporarily)
stream.write({
  transactions: {
    'debug': {
      vote: false
      // No account filters - will receive all non-vote transactions
    }
  },
  commitment: 'CONFIRMED'
});

Issue: High Memory Usage

Solutions:

  1. Implement circular buffers
  2. Process and discard data immediately
  3. Avoid storing full transaction objects
  4. Use streaming JSON parsers
# Good: Process immediately
for update in stub.Subscribe(request, metadata=metadata):
    if update.HasField('transaction'):
        process_immediately(update.transaction)  # Don't store!

# Bad: Store everything
transactions = []
for update in stub.Subscribe(request, metadata=metadata):
    transactions.append(update)  # Memory leak!

Issue: Falling Behind Stream

Symptoms:

  • Processing delay increasing
  • Memory growing
  • Eventual disconnection

Solutions:

  1. Optimize processing code
  2. Use parallel processing
  3. Reduce logging/IO operations
  4. Consider filtering more aggressively

Next Steps

Support


Technical Details:

  • Redis Stream: geyser:transactions
  • Current Entries: 262,000+
  • Freshness: Real-time (<100ms)
  • Protocol: HTTP/2 plaintext
  • Endpoint: grpc.fortiblox.com:10002
  • Authentication: SHA-256 via x-api-key header