Transaction Streaming
Stream real-time X1 Blockchain transactions with advanced filtering and comprehensive examples
Transaction Streaming
Subscribe to real-time transaction updates from FortiBlox gRPC streaming with powerful filtering options. Monitor specific accounts, track program interactions, and build sophisticated blockchain applications with <100ms latency.
Fully Operational: Transaction streaming is live with 262,000+ entries in Redis (geyser:transactions) providing real-time updates.
Overview
Transaction streaming allows you to:
- Monitor specific accounts - Track wallet activity, program interactions
- Filter by program - Watch specific smart contract activity
- Exclude vote transactions - Focus on meaningful transactions
- Choose commitment levels - Balance speed vs finality
- Real-time processing - Sub-second latency from Redis pub/sub
Data Source
- Redis Stream:
geyser:transactions - Current Entries: 262,000+
- Freshness: Real-time (<100ms)
- Endpoint:
grpc.fortiblox.com:10002(HTTP/2 plaintext)
Request Parameters
SubscribeTransactions Filter Options
| Parameter | Type | Description | Example |
|---|---|---|---|
account_include | string[] | Include transactions touching these accounts (OR logic) | ['JUP6...', 'TokenkegQ...'] |
account_exclude | string[] | Exclude transactions touching these accounts | ['Vote111...'] |
account_required | string[] | Require ALL these accounts (AND logic) | ['Wallet1...', 'Wallet2...'] |
vote | bool | Include vote transactions (default: true) | false (recommended) |
failed | bool | Include failed transactions (default: true) | false |
commitment | enum | Commitment level | CONFIRMED or FINALIZED |
start_slot | uint64 | Start from this slot (0 = real-time only) | 11163364 |
end_slot | uint64 | End at this slot (0 = continue indefinitely) | 0 |
Commitment Levels
| Level | Description | Use Case | Latency |
|---|---|---|---|
PROCESSED | Optimistically confirmed by cluster | Real-time monitoring | ~400ms |
CONFIRMED | Voted on by supermajority | Most applications | ~1s |
FINALIZED | Finalized by cluster | High-value transactions | ~30s |
Recommended: Use CONFIRMED commitment for most applications. It provides a good balance between speed and certainty.
Complete Code Examples
Monitor Jupiter Swaps
// jupiter-monitor.js
const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');
const JUPITER_PROGRAM = 'JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4';
// Load proto
const packageDefinition = protoLoader.loadSync('geyser.proto', {
keepCase: true,
longs: String,
enums: String,
defaults: true,
oneofs: true
});
const proto = grpc.loadPackageDefinition(packageDefinition);
// Create client (plaintext/insecure)
const client = new proto.geyser.Geyser(
'grpc.fortiblox.com:10002',
grpc.credentials.createInsecure()
);
// Add API key metadata
const metadata = new grpc.Metadata();
metadata.add('x-api-key', process.env.FORTIBLOX_API_KEY);
// Create bidirectional stream
const stream = client.subscribe(metadata);
// Subscribe to Jupiter transactions
stream.write({
transactions: {
'jupiter_swaps': {
accountInclude: [JUPITER_PROGRAM],
vote: false,
failed: false
}
},
commitment: 'CONFIRMED'
});
// Track metrics
let swapCount = 0;
let totalVolume = 0;
const startTime = Date.now();
// Process swaps
stream.on('data', (update) => {
if (update.transaction) {
const tx = update.transaction;
swapCount++;
// Extract signature
const signature = Buffer.from(tx.transaction.signature).toString('base64');
console.log(`\n🔄 Jupiter Swap #${swapCount}`);
console.log(` Signature: ${signature.slice(0, 20)}...`);
console.log(` Slot: ${tx.slot}`);
console.log(` Accounts: ${tx.transaction.message.accountKeys.length}`);
// Parse swap details
const swapData = parseJupiterSwap(tx);
if (swapData) {
console.log(` Input: ${swapData.inputAmount} ${swapData.inputToken}`);
console.log(` Output: ${swapData.outputAmount} ${swapData.outputToken}`);
totalVolume += swapData.volumeUSD;
}
// Show metrics every 10 swaps
if (swapCount % 10 === 0) {
const elapsed = (Date.now() - startTime) / 1000;
const swapsPerMin = (swapCount / elapsed) * 60;
console.log(`\n📊 Metrics:`);
console.log(` Total Swaps: ${swapCount}`);
console.log(` Rate: ${swapsPerMin.toFixed(1)} swaps/min`);
console.log(` Total Volume: $${totalVolume.toFixed(2)}\n`);
}
}
});
function parseJupiterSwap(tx) {
// Parse transaction data
// This is simplified - actual parsing depends on Jupiter's instruction format
try {
const instructions = tx.transaction.message.instructions;
// Extract swap details from instructions
return {
inputToken: 'USDC',
outputToken: 'SOL',
inputAmount: 100,
outputAmount: 0.5,
volumeUSD: 100
};
} catch (err) {
console.error('Parse error:', err.message);
return null;
}
}
// Error handling with reconnection
stream.on('error', (err) => {
console.error('❌ Stream error:', err.message);
if (err.code === grpc.status.UNAVAILABLE) {
console.log('🔄 Reconnecting in 5 seconds...');
setTimeout(() => reconnect(), 5000);
}
});
stream.on('end', () => {
console.log('Stream ended, reconnecting...');
setTimeout(() => reconnect(), 1000);
});
function reconnect() {
console.log('🔌 Reconnecting...');
// Create new stream and resubscribe
// ... (same logic as above)
}
// Graceful shutdown
process.on('SIGINT', () => {
console.log('\n👋 Shutting down...');
console.log(`Final stats: ${swapCount} swaps processed`);
stream.end();
process.exit(0);
});
console.log('🚀 Jupiter swap monitor started');
console.log(' Endpoint: grpc.fortiblox.com:10002');
console.log(' Program: JUP6...');
console.log(' Commitment: CONFIRMED\n');Run
export FORTIBLOX_API_KEY="your_api_key_here"
node jupiter-monitor.jsWallet Activity Monitor
# wallet_monitor.py
import grpc
import os
import base64
from datetime import datetime
import geyser_pb2
import geyser_pb2_grpc
class WalletMonitor:
def __init__(self, wallet_address, api_key):
self.wallet_address = wallet_address
self.api_key = api_key
self.tx_count = 0
self.total_fees = 0
def connect(self):
# Create insecure channel (plaintext)
channel = grpc.insecure_channel('grpc.fortiblox.com:10002')
stub = geyser_pb2_grpc.GeyserStub(channel)
return stub
def request_generator(self):
"""Generate subscription request"""
yield geyser_pb2.SubscribeRequest(
transactions={
'wallet_txs': geyser_pb2.SubscribeRequestFilterTransactions(
account_include=[self.wallet_address],
vote=False,
failed=False
)
},
commitment=geyser_pb2.CommitmentLevel.CONFIRMED
)
def monitor(self):
print(f"🚀 Monitoring wallet: {self.wallet_address[:8]}...")
print(f" Endpoint: grpc.fortiblox.com:10002")
print(f" Commitment: CONFIRMED\n")
stub = self.connect()
metadata = [('x-api-key', self.api_key)]
try:
for update in stub.Subscribe(self.request_generator(), metadata=metadata):
if update.HasField('transaction'):
self.process_transaction(update.transaction)
except grpc.RpcError as e:
print(f"❌ gRPC error: {e.code()} - {e.details()}")
print("🔄 Reconnecting in 5 seconds...")
import time
time.sleep(5)
self.monitor() # Reconnect
except KeyboardInterrupt:
print("\n👋 Shutting down...")
print(f"Final stats: {self.tx_count} transactions processed")
def process_transaction(self, tx):
self.tx_count += 1
# Extract signature
sig = base64.b64encode(tx.transaction.signature).decode('utf-8')
# Get timestamp
timestamp = datetime.fromtimestamp(tx.transaction.block_time).strftime('%H:%M:%S')
# Extract fee
fee_lamports = tx.transaction.meta.fee if tx.transaction.HasField('meta') else 0
fee_sol = fee_lamports / 1_000_000_000
self.total_fees += fee_sol
# Determine transaction type
tx_type = self.determine_tx_type(tx)
print(f"\n💳 Transaction #{self.tx_count}")
print(f" Signature: {sig[:20]}...")
print(f" Time: {timestamp}")
print(f" Slot: {tx.slot}")
print(f" Type: {tx_type}")
print(f" Fee: {fee_sol:.6f} SOL")
print(f" Success: {'✅' if not tx.transaction.meta.err else '❌'}")
# Show cumulative stats every 10 transactions
if self.tx_count % 10 == 0:
print(f"\n📊 Stats:")
print(f" Total Transactions: {self.tx_count}")
print(f" Total Fees: {self.total_fees:.6f} SOL")
print(f" Average Fee: {(self.total_fees/self.tx_count):.6f} SOL\n")
def determine_tx_type(self, tx):
"""Determine transaction type from instructions"""
# Simplified - actual implementation would parse instructions
instructions = tx.transaction.message.instructions
if len(instructions) == 0:
return "Unknown"
# Check for common program IDs
program_ids = [ix.program_id_index for ix in instructions]
# Map program IDs to types
# ... (implementation specific)
return "Transfer" # Default
if __name__ == '__main__':
api_key = os.getenv('FORTIBLOX_API_KEY')
if not api_key:
print("Error: FORTIBLOX_API_KEY environment variable not set")
exit(1)
# Replace with your wallet address
wallet = '9B5XszUGdMaxCZ7uSQhPzdks5ZQSmWxrmzCSvtJ6Ns6g'
monitor = WalletMonitor(wallet, api_key)
monitor.monitor()Setup & Run
# Install dependencies
pip install grpcio grpcio-tools
# Download and compile proto
curl -O https://raw.githubusercontent.com/rpcpool/yellowstone-grpc/master/yellowstone-grpc-proto/proto/geyser.proto
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. geyser.proto
# Run
export FORTIBLOX_API_KEY="your_api_key_here"
python wallet_monitor.pyToken Transfer Monitor
// src/main.rs
use tonic::{transport::Channel, metadata::MetadataValue, Request};
use tokio_stream::StreamExt;
use std::collections::HashMap;
pub mod geyser {
tonic::include_proto!("geyser");
}
use geyser::{geyser_client::GeyserClient, SubscribeRequest};
const TOKEN_PROGRAM: &str = "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA";
struct TransferMetrics {
transfer_count: u64,
total_volume: u64,
token_types: HashMap<String, u64>,
}
impl TransferMetrics {
fn new() -> Self {
Self {
transfer_count: 0,
total_volume: 0,
token_types: HashMap::new(),
}
}
fn process_transfer(&mut self, token: String, amount: u64) {
self.transfer_count += 1;
self.total_volume += amount;
*self.token_types.entry(token).or_insert(0) += 1;
// Print metrics every 100 transfers
if self.transfer_count % 100 == 0 {
self.print_metrics();
}
}
fn print_metrics(&self) {
println!("\n📊 Transfer Metrics:");
println!(" Total Transfers: {}", self.transfer_count);
println!(" Total Volume: {} tokens", self.total_volume);
println!(" Unique Tokens: {}", self.token_types.len());
// Top 5 tokens by transfer count
let mut sorted: Vec<_> = self.token_types.iter().collect();
sorted.sort_by(|a, b| b.1.cmp(a.1));
println!("\n Top Tokens:");
for (token, count) in sorted.iter().take(5) {
println!(" {} - {} transfers", &token[..8], count);
}
println!();
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("🚀 Token transfer monitor started");
println!(" Endpoint: grpc.fortiblox.com:10002");
println!(" Program: Token Program");
println!(" Commitment: CONFIRMED\n");
let mut metrics = TransferMetrics::new();
loop {
if let Err(e) = monitor_transfers(&mut metrics).await {
eprintln!("❌ Error: {}", e);
eprintln!("🔄 Reconnecting in 5 seconds...");
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
}
}
async fn monitor_transfers(metrics: &mut TransferMetrics) -> Result<(), Box<dyn std::error::Error>> {
// Create channel (insecure/plaintext)
let channel = Channel::from_static("http://grpc.fortiblox.com:10002")
.connect()
.await?;
let mut client = GeyserClient::new(channel);
// Create request with API key
let api_key = std::env::var("FORTIBLOX_API_KEY")?;
let token = MetadataValue::try_from(api_key)?;
let mut request = Request::new(tokio_stream::iter(vec![
SubscribeRequest {
transactions: std::collections::HashMap::from([(
"token_transfers".to_string(),
geyser::SubscribeRequestFilterTransactions {
account_include: vec![TOKEN_PROGRAM.to_string()],
vote: Some(false),
failed: Some(false),
..Default::default()
}
)]),
commitment: Some(geyser::CommitmentLevel::Confirmed as i32),
..Default::default()
}
]));
request.metadata_mut().insert("x-api-key", token);
// Subscribe
let mut stream = client.subscribe(request).await?.into_inner();
// Process updates
while let Some(update) = stream.next().await {
match update {
Ok(msg) => {
if let Some(tx) = msg.transaction {
process_transaction(tx, metrics);
}
}
Err(e) => {
eprintln!("Stream error: {}", e);
return Err(Box::new(e));
}
}
}
Ok(())
}
fn process_transaction(
tx: geyser::SubscribeUpdateTransaction,
metrics: &mut TransferMetrics
) {
if let Some(transaction) = tx.transaction {
let sig = hex::encode(&transaction.signature[..8]);
println!("\n💸 Token Transfer");
println!(" Signature: {}...", sig);
println!(" Slot: {}", tx.slot);
// Parse transfer details
if let Some(transfer) = parse_token_transfer(&transaction) {
println!(" From: {}...", &transfer.from[..8]);
println!(" To: {}...", &transfer.to[..8]);
println!(" Amount: {} {}", transfer.amount, transfer.token);
metrics.process_transfer(transfer.token, transfer.amount);
}
}
}
struct TokenTransfer {
from: String,
to: String,
amount: u64,
token: String,
}
fn parse_token_transfer(tx: &geyser::Transaction) -> Option<TokenTransfer> {
// Simplified parser - actual implementation would decode instructions
Some(TokenTransfer {
from: "FromAddress...".to_string(),
to: "ToAddress...".to_string(),
amount: 1000000,
token: "USDC".to_string(),
})
}Cargo.toml
[dependencies]
tonic = "0.11"
prost = "0.12"
tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1"
hex = "0.4"
[build-dependencies]
tonic-build = "0.11"Run
export FORTIBLOX_API_KEY="your_api_key_here"
cargo run --releaseNFT Sales Monitor
// main.go
package main
import (
"context"
"fmt"
"io"
"log"
"os"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
pb "your-module/geyser"
)
const (
MAGIC_EDEN = "M2mx93ekt1fmXSVkTrUL9xVFHkmME8HTUi5Cyc5aF7K"
TENSOR = "TSWAPaqyCSx2KABk68Shruf4rp7CxcNi8hAsbdwmHbN"
)
type NFTSale struct {
Signature string
Mint string
Price float64
Buyer string
Seller string
Slot uint64
}
type SalesMetrics struct {
TotalSales int
TotalVolume float64
SalesByMarketplace map[string]int
}
func main() {
fmt.Println("🚀 NFT sales monitor started")
fmt.Println(" Endpoint: grpc.fortiblox.com:10002")
fmt.Println(" Marketplaces: Magic Eden, Tensor")
fmt.Println(" Commitment: CONFIRMED\n")
apiKey := os.Getenv("FORTIBLOX_API_KEY")
if apiKey == "" {
log.Fatal("FORTIBLOX_API_KEY environment variable not set")
}
metrics := &SalesMetrics{
SalesByMarketplace: make(map[string]int),
}
for {
if err := monitorNFTSales(apiKey, metrics); err != nil {
log.Printf("❌ Error: %v", err)
log.Println("🔄 Reconnecting in 5 seconds...")
time.Sleep(5 * time.Second)
}
}
}
func monitorNFTSales(apiKey string, metrics *SalesMetrics) error {
// Create insecure connection (plaintext)
conn, err := grpc.Dial(
"grpc.fortiblox.com:10002",
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
return fmt.Errorf("failed to connect: %w", err)
}
defer conn.Close()
// Create client
client := pb.NewGeyserClient(conn)
// Add API key to context
md := metadata.New(map[string]string{
"x-api-key": apiKey,
})
ctx := metadata.NewOutgoingContext(context.Background(), md)
// Create stream
stream, err := client.Subscribe(ctx)
if err != nil {
return fmt.Errorf("failed to subscribe: %w", err)
}
// Send subscription request for both marketplaces
req := &pb.SubscribeRequest{
Transactions: map[string]*pb.SubscribeRequestFilterTransactions{
"nft_sales": {
AccountInclude: []string{MAGIC_EDEN, TENSOR},
Vote: false,
Failed: false,
},
},
Commitment: pb.CommitmentLevel_CONFIRMED,
}
if err := stream.Send(req); err != nil {
return fmt.Errorf("failed to send request: %w", err)
}
// Receive updates
for {
update, err := stream.Recv()
if err == io.EOF {
return fmt.Errorf("stream ended")
}
if err != nil {
return fmt.Errorf("stream error: %w", err)
}
if tx := update.GetTransaction(); tx != nil {
processNFTSale(tx, metrics)
}
}
}
func processNFTSale(tx *pb.SubscribeUpdateTransaction, metrics *SalesMetrics) {
// Detect marketplace
marketplace := detectMarketplace(tx)
if marketplace == "" {
return
}
// Parse sale details
sale := parseNFTSale(tx, marketplace)
if sale == nil {
return
}
// Update metrics
metrics.TotalSales++
metrics.TotalVolume += sale.Price
metrics.SalesByMarketplace[marketplace]++
// Print sale info
fmt.Printf("\n🎨 NFT Sale #%d\n", metrics.TotalSales)
fmt.Printf(" Marketplace: %s\n", marketplace)
fmt.Printf(" NFT: %s...\n", sale.Mint[:8])
fmt.Printf(" Price: %.2f SOL\n", sale.Price)
fmt.Printf(" Buyer: %s...\n", sale.Buyer[:8])
fmt.Printf(" Signature: %s...\n", sale.Signature[:16])
// Print metrics every 10 sales
if metrics.TotalSales%10 == 0 {
printMetrics(metrics)
}
}
func detectMarketplace(tx *pb.SubscribeUpdateTransaction) string {
// Check accounts to determine marketplace
if tx.Transaction == nil {
return ""
}
accounts := tx.Transaction.Message.AccountKeys
for _, account := range accounts {
if string(account) == MAGIC_EDEN {
return "Magic Eden"
}
if string(account) == TENSOR {
return "Tensor"
}
}
return ""
}
func parseNFTSale(tx *pb.SubscribeUpdateTransaction, marketplace string) *NFTSale {
// Simplified parser - actual implementation would decode instructions
signature := fmt.Sprintf("%x", tx.Transaction.Signature)
return &NFTSale{
Signature: signature,
Mint: "NFTMintAddress...",
Price: 1.5,
Buyer: "BuyerAddress...",
Seller: "SellerAddress...",
Slot: tx.Slot,
}
}
func printMetrics(metrics *SalesMetrics) {
fmt.Println("\n📊 Sales Metrics:")
fmt.Printf(" Total Sales: %d\n", metrics.TotalSales)
fmt.Printf(" Total Volume: %.2f SOL\n", metrics.TotalVolume)
fmt.Printf(" Average Price: %.2f SOL\n", metrics.TotalVolume/float64(metrics.TotalSales))
fmt.Println("\n Sales by Marketplace:")
for marketplace, count := range metrics.SalesByMarketplace {
fmt.Printf(" %s: %d\n", marketplace, count)
}
fmt.Println()
}Run
export FORTIBLOX_API_KEY="your_api_key_here"
go run main.goDeFi Protocol Monitor
// DeFiMonitor.java
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.stub.MetadataUtils;
import io.grpc.stub.StreamObserver;
import java.util.*;
import java.util.concurrent.TimeUnit;
public class DeFiMonitor {
private static final String JUPITER_PROGRAM = "JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4";
private static final String RAYDIUM_PROGRAM = "675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8";
private final GeyserGrpc.GeyserStub asyncStub;
private int swapCount = 0;
private double totalVolume = 0;
private Map<String, Integer> swapsByProtocol = new HashMap<>();
public DeFiMonitor(String host, int port, String apiKey) {
// Create insecure channel (plaintext)
ManagedChannel channel = ManagedChannelBuilder
.forAddress(host, port)
.usePlaintext()
.build();
// Add API key metadata
Metadata metadata = new Metadata();
Metadata.Key<String> key = Metadata.Key.of("x-api-key", Metadata.ASCII_STRING_MARSHALLER);
metadata.put(key, apiKey);
this.asyncStub = GeyserGrpc.newStub(channel)
.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata));
swapsByProtocol.put("Jupiter", 0);
swapsByProtocol.put("Raydium", 0);
}
public void startMonitoring() {
System.out.println("🚀 DeFi protocol monitor started");
System.out.println(" Endpoint: grpc.fortiblox.com:10002");
System.out.println(" Protocols: Jupiter, Raydium");
System.out.println(" Commitment: CONFIRMED\n");
StreamObserver<SubscribeRequest> requestObserver = asyncStub.subscribe(
new StreamObserver<SubscribeUpdate>() {
@Override
public void onNext(SubscribeUpdate update) {
if (update.hasTransaction()) {
processSwap(update.getTransaction());
}
}
@Override
public void onError(Throwable t) {
System.err.println("❌ Stream error: " + t.getMessage());
System.out.println("🔄 Reconnecting in 5 seconds...");
try {
Thread.sleep(5000);
startMonitoring(); // Reconnect
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@Override
public void onCompleted() {
System.out.println("Stream completed");
}
}
);
// Send subscription request
Map<String, SubscribeRequestFilterTransactions> filters = new HashMap<>();
filters.put("defi_swaps",
SubscribeRequestFilterTransactions.newBuilder()
.addAccountInclude(JUPITER_PROGRAM)
.addAccountInclude(RAYDIUM_PROGRAM)
.setVote(false)
.setFailed(false)
.build()
);
SubscribeRequest request = SubscribeRequest.newBuilder()
.putAllTransactions(filters)
.setCommitment(CommitmentLevel.CONFIRMED)
.build();
requestObserver.onNext(request);
}
private void processSwap(SubscribeUpdateTransaction tx) {
swapCount++;
// Detect protocol
String protocol = detectProtocol(tx);
if (protocol != null) {
swapsByProtocol.put(protocol, swapsByProtocol.get(protocol) + 1);
}
// Parse swap details
SwapData swap = parseSwap(tx);
System.out.printf("\n🔄 Swap #%d\n", swapCount);
System.out.printf(" Protocol: %s\n", protocol);
System.out.printf(" Slot: %d\n", tx.getSlot());
if (swap != null) {
System.out.printf(" Input: %s %s\n", swap.inputAmount, swap.inputToken);
System.out.printf(" Output: %s %s\n", swap.outputAmount, swap.outputToken);
System.out.printf(" Volume: $%.2f\n", swap.volumeUSD);
totalVolume += swap.volumeUSD;
}
// Print metrics every 25 swaps
if (swapCount % 25 == 0) {
printMetrics();
}
}
private String detectProtocol(SubscribeUpdateTransaction tx) {
// Check accounts to determine protocol
List<byte[]> accounts = tx.getTransaction().getMessage().getAccountKeysList();
for (byte[] account : accounts) {
String accountStr = new String(account);
if (accountStr.contains(JUPITER_PROGRAM.substring(0, 10))) {
return "Jupiter";
}
if (accountStr.contains(RAYDIUM_PROGRAM.substring(0, 10))) {
return "Raydium";
}
}
return "Unknown";
}
private SwapData parseSwap(SubscribeUpdateTransaction tx) {
// Simplified parser
SwapData swap = new SwapData();
swap.inputToken = "USDC";
swap.outputToken = "SOL";
swap.inputAmount = "100";
swap.outputAmount = "0.5";
swap.volumeUSD = 100.0;
return swap;
}
private void printMetrics() {
System.out.println("\n📊 DeFi Metrics:");
System.out.printf(" Total Swaps: %d\n", swapCount);
System.out.printf(" Total Volume: $%.2f\n", totalVolume);
System.out.printf(" Average: $%.2f per swap\n", totalVolume / swapCount);
System.out.println("\n Swaps by Protocol:");
for (Map.Entry<String, Integer> entry : swapsByProtocol.entrySet()) {
System.out.printf(" %s: %d\n", entry.getKey(), entry.getValue());
}
System.out.println();
}
static class SwapData {
String inputToken;
String outputToken;
String inputAmount;
String outputAmount;
double volumeUSD;
}
public static void main(String[] args) {
String apiKey = System.getenv("FORTIBLOX_API_KEY");
if (apiKey == null || apiKey.isEmpty()) {
System.err.println("Error: FORTIBLOX_API_KEY environment variable not set");
System.exit(1);
}
DeFiMonitor monitor = new DeFiMonitor("grpc.fortiblox.com", 10002, apiKey);
monitor.startMonitoring();
// Keep alive
try {
Thread.currentThread().join();
} catch (InterruptedException e) {
System.out.println("\n👋 Shutting down...");
}
}
}Run
export FORTIBLOX_API_KEY="your_api_key_here"
mvn compile exec:java -Dexec.mainClass="DeFiMonitor"Real-World Use Cases
1. Payment Gateway
Monitor incoming payments and update order status:
const USDC_MINT = 'EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v';
const PAYMENT_WALLET = 'YourWalletAddress...';
stream.write({
transactions: {
'payments': {
accountInclude: [PAYMENT_WALLET],
vote: false,
failed: false
}
},
commitment: 'CONFIRMED'
});
stream.on('data', async (update) => {
if (update.transaction) {
const tokenTransfer = findTokenTransfer(update.transaction, USDC_MINT, PAYMENT_WALLET);
if (tokenTransfer) {
const amountUSDC = tokenTransfer.amount / 1_000_000;
const order = await findOrderByAmount(amountUSDC);
if (order) {
await updateOrderStatus(order.id, 'paid', {
transaction: tokenTransfer.signature,
amount: amountUSDC
});
await sendConfirmationEmail(order.customer_email);
console.log(`✅ Payment received: ${amountUSDC} USDC for order ${order.id}`);
}
}
}
});2. Trading Bot
Execute arbitrage based on DEX swaps:
MONITORED_POOLS = ['Pool1...', 'Pool2...', 'Pool3...']
async def handle_swap(tx):
for pool in MONITORED_POOLS:
if pool in tx.accounts:
# Analyze price impact
price_impact = await calculate_price_impact(tx, pool)
if price_impact > ARBITRAGE_THRESHOLD:
# Calculate optimal trade size
amount = calculate_optimal_amount(price_impact)
# Execute arbitrage
await execute_trade({
'pool': pool,
'direction': 'buy' if price_impact > 0 else 'sell',
'amount': amount,
'slippage': 0.5
})
print(f"💰 Arbitrage executed: {price_impact:.2f}% impact")3. NFT Sniper Bot
Snipe underpriced NFT listings:
async fn snipe_nfts(tx: SubscribeUpdateTransaction) {
if let Some(listing) = parse_nft_listing(&tx) {
// Check floor price
let floor_price = get_collection_floor(& listing.collection).await;
if listing.price < floor_price * 0.8 {
// Listed 20% below floor!
println!("🎯 Sniper opportunity!");
println!(" Floor: {} SOL", floor_price);
println!(" Listed: {} SOL", listing.price);
println!(" Discount: {:.1}%", (1.0 - listing.price / floor_price) * 100.0);
// Execute buy
execute_nft_purchase(&listing).await;
}
}
}4. Analytics Dashboard
Real-time blockchain analytics:
type Analytics struct {
TxPerSecond float64
UniqueWallets map[string]bool
VolumeByProgram map[string]float64
TopPrograms []ProgramStats
}
func (a *Analytics) ProcessTransaction(tx *pb.SubscribeUpdateTransaction) {
// Count unique wallets
for _, account := range tx.Transaction.Message.AccountKeys {
a.UniqueWallets[string(account)] = true
}
// Track program usage
programID := detectProgram(tx)
volume := calculateVolume(tx)
a.VolumeByProgram[programID] += volume
// Update dashboard every second
a.UpdateDashboard()
}Performance Optimization
1. Efficient Account Filtering
Use account_include for OR logic, account_required for AND logic:
// Monitor transactions involving EITHER account
stream.write({
transactions: {
'either': {
accountInclude: ['Account1...', 'Account2...'] // OR logic
}
}
});
// Monitor transactions involving BOTH accounts
stream.write({
transactions: {
'both': {
accountRequired: ['Account1...', 'Account2...'] // AND logic
}
}
});2. Batch Processing
Process transactions in batches for better throughput:
import asyncio
from collections import deque
batch_size = 100
tx_buffer = deque(maxlen=1000)
async def process_stream():
for update in stub.Subscribe(request, metadata=metadata):
if update.HasField('transaction'):
tx_buffer.append(update.transaction)
if len(tx_buffer) >= batch_size:
# Process batch
await process_batch(list(tx_buffer))
tx_buffer.clear()3. Parallel Processing
Use worker pools for CPU-intensive parsing:
use tokio::sync::mpsc;
use std::sync::Arc;
async fn parallel_processing() {
let (tx_sender, mut tx_receiver) = mpsc::channel(1000);
// Spawn worker tasks
for _ in 0..num_cpus::get() {
let mut receiver = tx_receiver.clone();
tokio::spawn(async move {
while let Some(tx) = receiver.recv().await {
// Parse and process transaction
process_transaction(tx).await;
}
});
}
// Feed transactions to workers
while let Some(update) = stream.next().await {
if let Some(tx) = update.transaction {
tx_sender.send(tx).await.unwrap();
}
}
}Error Handling
Reconnection with Exponential Backoff
class ResilientStream {
constructor() {
this.retryCount = 0;
this.maxRetries = 10;
}
async connect() {
try {
this.retryCount = 0;
await this.createStream();
} catch (err) {
await this.handleError(err);
}
}
async handleError(err) {
this.retryCount++;
if (this.retryCount >= this.maxRetries) {
console.error('Max retries reached, giving up');
return;
}
// Exponential backoff: 1s, 2s, 4s, 8s, 16s, 32s, 60s (capped)
const delay = Math.min(Math.pow(2, this.retryCount) * 1000, 60000);
console.log(`Retry ${this.retryCount}/${this.maxRetries} in ${delay/1000}s`);
await new Promise(resolve => setTimeout(resolve, delay));
await this.connect();
}
}Troubleshooting
Issue: No Data Received
Symptoms:
- Stream connects successfully
- No transactions received
Solutions:
- Check filter configuration
- Verify accounts are active
- Try broader filters first
- Check commitment level
// Debug: Subscribe to ALL transactions (temporarily)
stream.write({
transactions: {
'debug': {
vote: false
// No account filters - will receive all non-vote transactions
}
},
commitment: 'CONFIRMED'
});Issue: High Memory Usage
Solutions:
- Implement circular buffers
- Process and discard data immediately
- Avoid storing full transaction objects
- Use streaming JSON parsers
# Good: Process immediately
for update in stub.Subscribe(request, metadata=metadata):
if update.HasField('transaction'):
process_immediately(update.transaction) # Don't store!
# Bad: Store everything
transactions = []
for update in stub.Subscribe(request, metadata=metadata):
transactions.append(update) # Memory leak!Issue: Falling Behind Stream
Symptoms:
- Processing delay increasing
- Memory growing
- Eventual disconnection
Solutions:
- Optimize processing code
- Use parallel processing
- Reduce logging/IO operations
- Consider filtering more aggressively
Next Steps
Slot Streaming
Stream real-time slot updates
Getting Started
gRPC setup and installation
Historical Replay
Replay historical transactions
gRPC Overview
Learn about gRPC capabilities
Support
- Discord: discord.gg/fortiblox
- Email: [email protected]
- Status: status.fortiblox.com
- Examples: github.com/fortiblox/grpc-examples
Technical Details:
- Redis Stream: geyser:transactions
- Current Entries: 262,000+
- Freshness: Real-time (<100ms)
- Protocol: HTTP/2 plaintext
- Endpoint: grpc.fortiblox.com:10002
- Authentication: SHA-256 via x-api-key header