FortiBlox LogoFortiBlox Docs
NexusgRPC API

gRPC Code Examples

Production-ready gRPC implementations for common blockchain use cases

gRPC Code Examples

Complete, production-ready examples for common gRPC streaming use cases.

Jupiter Swap Monitoring

Monitor all Jupiter swaps in real-time:

const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');

const JUPITER_PROGRAM = 'JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4';

// Load proto
const packageDef = protoLoader.loadSync('geyser.proto');
const proto = grpc.loadPackageDefinition(packageDef);

// Create client
const client = new proto.geyser.Geyser(
  'grpc.fortiblox.com:443',
  grpc.credentials.createSsl()
);

const metadata = new grpc.Metadata();
metadata.add('x-api-key', process.env.FORTIBLOX_API_KEY);

// Create stream
const stream = client.subscribe(metadata);

// Subscribe to Jupiter transactions
stream.write({
  transactions: {
    'jupiter_swaps': {
      accountInclude: [JUPITER_PROGRAM],
      vote: false,
      failed: false
    }
  },
  commitment: 'CONFIRMED'
});

// Process swaps
stream.on('data', (update) => {
  if (update.transaction) {
    const tx = update.transaction.transaction;
    const signature = Buffer.from(tx.signature).toString('base64');

    console.log('Jupiter Swap:', {
      signature: signature.slice(0, 16) + '...',
      slot: update.transaction.slot,
      accounts: tx.message.accountKeys.length
    });

    // Parse swap details from transaction
    parseJupiterSwap(tx);
  }
});

function parseJupiterSwap(tx) {
  // Extract token transfers and swap amounts
  // Implementation depends on your needs
  console.log('Processing Jupiter swap...');
}

// Error handling
stream.on('error', (err) => {
  console.error('Stream error:', err.message);
  reconnect();
});

function reconnect() {
  console.log('Reconnecting in 5 seconds...');
  setTimeout(() => {
    createStream();
  }, 5000);
}
import grpc
import os
import base64
from geyser_pb2 import SubscribeRequest, SubscribeRequestFilterTransactions
from geyser_pb2_grpc import GeyserStub
import geyser_pb2

JUPITER_PROGRAM = 'JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4'

def monitor_jupiter_swaps():
    # Create channel
    credentials = grpc.ssl_channel_credentials()
    channel = grpc.secure_channel('grpc.fortiblox.com:443', credentials)
    stub = GeyserStub(channel)

    # Metadata
    metadata = [('x-api-key', os.getenv('FORTIBLOX_API_KEY'))]

    # Create subscription
    def request_generator():
        yield SubscribeRequest(
            transactions={
                'jupiter_swaps': SubscribeRequestFilterTransactions(
                    account_include=[JUPITER_PROGRAM],
                    vote=False,
                    failed=False
                )
            },
            commitment=geyser_pb2.CommitmentLevel.CONFIRMED
        )

    try:
        for update in stub.Subscribe(request_generator(), metadata=metadata):
            if update.HasField('transaction'):
                tx = update.transaction.transaction
                sig = base64.b64encode(tx.signature).decode('utf-8')

                print(f"Jupiter Swap: {sig[:16]}... at slot {update.transaction.slot}")

                # Parse swap details
                parse_jupiter_swap(tx)

    except grpc.RpcError as e:
        print(f"gRPC error: {e.code()} - {e.details()}")
        reconnect()

def parse_jupiter_swap(tx):
    # Extract and process swap data
    print("Processing Jupiter swap...")
    pass

def reconnect():
    import time
    print("Reconnecting in 5 seconds...")
    time.sleep(5)
    monitor_jupiter_swaps()

if __name__ == '__main__':
    monitor_jupiter_swaps()
use tonic::{transport::Channel, metadata::MetadataValue, Request};
use tokio_stream::StreamExt;

pub mod geyser {
    tonic::include_proto!("geyser");
}

const JUPITER_PROGRAM: &str = "JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4";

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    monitor_jupiter_swaps().await
}

async fn monitor_jupiter_swaps() -> Result<(), Box<dyn std::error::Error>> {
    let channel = Channel::from_static("https://grpc.fortiblox.com:443")
        .connect()
        .await?;

    let mut client = geyser::geyser_client::GeyserClient::new(channel);

    let api_key = std::env::var("FORTIBLOX_API_KEY")?;
    let token = MetadataValue::try_from(api_key)?;

    let mut request = Request::new(tokio_stream::iter(vec![
        geyser::SubscribeRequest {
            transactions: std::collections::HashMap::from([(
                "jupiter_swaps".to_string(),
                geyser::SubscribeRequestFilterTransactions {
                    account_include: vec![JUPITER_PROGRAM.to_string()],
                    vote: Some(false),
                    failed: Some(false),
                    ..Default::default()
                }
            )]),
            commitment: Some(geyser::CommitmentLevel::Confirmed as i32),
            ..Default::default()
        }
    ]));

    request.metadata_mut().insert("x-api-key", token);

    let mut stream = client.subscribe(request).await?.into_inner();

    while let Some(update) = stream.next().await {
        match update {
            Ok(msg) => {
                if let Some(tx_update) = msg.transaction {
                    if let Some(tx) = tx_update.transaction {
                        let sig = hex::encode(&tx.signature);
                        println!("Jupiter Swap: {}... at slot {}",
                            &sig[..16], tx_update.slot);

                        parse_jupiter_swap(&tx);
                    }
                }
            }
            Err(e) => {
                eprintln!("Stream error: {}", e);
                tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
                return monitor_jupiter_swaps().await;
            }
        }
    }

    Ok(())
}

fn parse_jupiter_swap(tx: &geyser::Transaction) {
    println!("Processing Jupiter swap...");
    // Parse swap details
}

Block Explorer Real-time Feed

Stream blocks and transactions for a block explorer:

const grpc = require('@grpc/grpc-js');
const WebSocket = require('ws');

// WebSocket server for frontend
const wss = new WebSocket.Server({ port: 8080 });
const clients = new Set();

wss.on('connection', (ws) => {
  clients.add(ws);
  ws.on('close', () => clients.delete(ws));
});

// gRPC client
const metadata = new grpc.Metadata();
metadata.add('x-api-key', process.env.FORTIBLOX_API_KEY);

const stream = client.subscribe(metadata);

// Subscribe to blocks and transactions
stream.write({
  blocks: {
    'new_blocks': {
      accountInclude: []
    }
  },
  transactions: {
    'recent_txs': {
      vote: false,
      failed: false
    }
  },
  slots: {
    'slot_updates': {}
  }
});

// Forward updates to websocket clients
stream.on('data', (update) => {
  let message = null;

  if (update.block) {
    message = {
      type: 'block',
      data: {
        slot: update.block.slot,
        blockhash: Buffer.from(update.block.blockhash).toString('base64'),
        transactions: update.block.transactions.length,
        blockTime: update.block.blockTime
      }
    };
  } else if (update.transaction) {
    message = {
      type: 'transaction',
      data: {
        signature: Buffer.from(update.transaction.transaction.signature).toString('base64'),
        slot: update.transaction.slot
      }
    };
  } else if (update.slot) {
    message = {
      type: 'slot',
      data: {
        slot: update.slot.slot,
        parent: update.slot.parent
      }
    };
  }

  if (message) {
    const json = JSON.stringify(message);
    clients.forEach(client => {
      if (client.readyState === WebSocket.OPEN) {
        client.send(json);
      }
    });
  }
});

NFT Sales Tracker

Track NFT sales across multiple marketplaces:

import grpc
import os
from geyser_pb2 import SubscribeRequest, SubscribeRequestFilterTransactions
from geyser_pb2_grpc import GeyserStub
import geyser_pb2

# Marketplace programs
MARKETPLACES = {
    'magic_eden': 'M2mx93ekt1fmXSVkTrUL9xVFHkmME8HTUi5Cyc5aF7K',
    'tensor': 'TSWAPaqyCSx2KABk68Shruf4rp7CxcNi8hAsbdwmHbN',
    'solanart': 'CJsLwbP1iu5DuUikHEJnLfANgKy6stB2uFgvBBHoyxwz'
}

def track_nft_sales():
    credentials = grpc.ssl_channel_credentials()
    channel = grpc.secure_channel('grpc.fortiblox.com:443', credentials)
    stub = GeyserStub(channel)

    metadata = [('x-api-key', os.getenv('FORTIBLOX_API_KEY'))]

    def request_generator():
        # Subscribe to all marketplaces
        transactions = {}
        for name, program in MARKETPLACES.items():
            transactions[name] = SubscribeRequestFilterTransactions(
                account_include=[program],
                vote=False,
                failed=False
            )

        yield SubscribeRequest(
            transactions=transactions,
            commitment=geyser_pb2.CommitmentLevel.CONFIRMED
        )

    try:
        for update in stub.Subscribe(request_generator(), metadata=metadata):
            if update.HasField('transaction'):
                tx = update.transaction

                # Detect which marketplace
                marketplace = detect_marketplace(tx)

                if marketplace:
                    sale = parse_nft_sale(tx, marketplace)
                    if sale:
                        print(f"NFT Sale on {marketplace}:")
                        print(f"  Price: {sale['price']} SOL")
                        print(f"  NFT: {sale['mint']}")
                        print(f"  Buyer: {sale['buyer'][:8]}...")

                        # Store in database
                        store_sale(sale)

                        # Send notifications
                        notify_sale(sale)

    except grpc.RpcError as e:
        print(f"Error: {e}")

def detect_marketplace(tx):
    # Detect marketplace from transaction accounts
    pass

def parse_nft_sale(tx, marketplace):
    # Parse sale details from transaction
    return {
        'marketplace': marketplace,
        'mint': 'NFTMintAddress...',
        'price': 1.5,
        'buyer': 'BuyerAddress...',
        'seller': 'SellerAddress...',
        'signature': tx.transaction.signature.hex()
    }

def store_sale(sale):
    # Store in database
    pass

def notify_sale(sale):
    # Send Discord/Telegram notification
    pass

if __name__ == '__main__':
    track_nft_sales()

Wallet Activity Monitor

Monitor specific wallet addresses:

package main

import (
    "context"
    "fmt"
    "log"
    "os"

    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials"
    "google.golang.org/grpc/metadata"

    pb "your-module/geyser"
)

var MONITORED_WALLETS = []string{
    "Wallet1111111111111111111111111111111111111",
    "Wallet2222222222222222222222222222222222222",
}

func main() {
    creds := credentials.NewTLS(nil)
    conn, err := grpc.Dial("grpc.fortiblox.com:443",
        grpc.WithTransportCredentials(creds))
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    client := pb.NewGeyserClient(conn)

    md := metadata.New(map[string]string{
        "x-api-key": os.Getenv("FORTIBLOX_API_KEY"),
    })
    ctx := metadata.NewOutgoingContext(context.Background(), md)

    stream, err := client.Subscribe(ctx)
    if err != nil {
        log.Fatal(err)
    }

    // Subscribe to wallet transactions
    req := &pb.SubscribeRequest{
        Transactions: map[string]*pb.SubscribeRequestFilterTransactions{
            "wallet_txs": {
                AccountInclude: MONITORED_WALLETS,
                Vote:          false,
                Failed:        false,
            },
        },
        Accounts: map[string]*pb.SubscribeRequestFilterAccounts{
            "wallet_updates": {
                Account: MONITORED_WALLETS,
            },
        },
        Commitment: pb.CommitmentLevel_CONFIRMED,
    }

    if err := stream.Send(req); err != nil {
        log.Fatal(err)
    }

    // Process updates
    for {
        update, err := stream.Recv()
        if err != nil {
            log.Printf("Stream error: %v", err)
            break
        }

        if tx := update.GetTransaction(); tx != nil {
            fmt.Printf("Transaction: %x at slot %d\n",
                tx.Transaction.Signature[:8], tx.Slot)

            // Analyze transaction
            analyzeTransaction(tx)
        }

        if acct := update.GetAccount(); acct != nil {
            fmt.Printf("Account updated: %s, balance: %d lamports\n",
                acct.Account.Pubkey, acct.Account.Lamports)

            // Check for large balance changes
            checkBalanceChange(acct)
        }
    }
}

func analyzeTransaction(tx *pb.SubscribeUpdateTransaction) {
    // Analyze transaction details
}

func checkBalanceChange(acct *pb.SubscribeUpdateAccount) {
    // Check for significant balance changes
}

Historical Data Replay

Replay past transactions for backtesting:

const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');

async function replayHistory(startSlot, endSlot) {
  const packageDef = protoLoader.loadSync('geyser.proto');
  const proto = grpc.loadPackageDefinition(packageDef);

  const client = new proto.geyser.Geyser(
    'grpc.fortiblox.com:443',
    grpc.credentials.createSsl()
  );

  const metadata = new grpc.Metadata();
  metadata.add('x-api-key', process.env.FORTIBLOX_API_KEY);

  const stream = client.subscribe(metadata);

  // Subscribe with slot range for historical replay
  stream.write({
    transactions: {
      'historical': {
        accountInclude: ['JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4'],
        vote: false
      }
    },
    slots: {
      'slot_range': {
        filterByCommitment: true
      }
    },
    commitment: 'FINALIZED'
  });

  let txCount = 0;
  let currentSlot = startSlot;

  stream.on('data', (update) => {
    if (update.transaction) {
      txCount++;

      if (txCount % 100 === 0) {
        console.log(`Processed ${txCount} transactions at slot ${update.transaction.slot}`);
      }

      // Process historical transaction
      processHistoricalTx(update.transaction);
    }

    if (update.slot) {
      currentSlot = update.slot.slot;

      if (currentSlot >= endSlot) {
        console.log(`Reached end slot ${endSlot}, processed ${txCount} transactions`);
        stream.end();
      }
    }
  });

  stream.on('end', () => {
    console.log(`Historical replay complete. Total transactions: ${txCount}`);
  });
}

function processHistoricalTx(tx) {
  // Backtest trading strategy
  // Analyze historical patterns
  // Generate reports
}

// Replay last 1000 slots
const currentSlot = 150234567; // Get from getSlot() first
replayHistory(currentSlot - 1000, currentSlot);

Next Steps

Support