NexusgRPC API
gRPC Code Examples
Production-ready gRPC implementations for common blockchain use cases
gRPC Code Examples
Complete, production-ready examples for common gRPC streaming use cases.
Jupiter Swap Monitoring
Monitor all Jupiter swaps in real-time:
const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');
const JUPITER_PROGRAM = 'JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4';
// Load proto
const packageDef = protoLoader.loadSync('geyser.proto');
const proto = grpc.loadPackageDefinition(packageDef);
// Create client
const client = new proto.geyser.Geyser(
'grpc.fortiblox.com:443',
grpc.credentials.createSsl()
);
const metadata = new grpc.Metadata();
metadata.add('x-api-key', process.env.FORTIBLOX_API_KEY);
// Create stream
const stream = client.subscribe(metadata);
// Subscribe to Jupiter transactions
stream.write({
transactions: {
'jupiter_swaps': {
accountInclude: [JUPITER_PROGRAM],
vote: false,
failed: false
}
},
commitment: 'CONFIRMED'
});
// Process swaps
stream.on('data', (update) => {
if (update.transaction) {
const tx = update.transaction.transaction;
const signature = Buffer.from(tx.signature).toString('base64');
console.log('Jupiter Swap:', {
signature: signature.slice(0, 16) + '...',
slot: update.transaction.slot,
accounts: tx.message.accountKeys.length
});
// Parse swap details from transaction
parseJupiterSwap(tx);
}
});
function parseJupiterSwap(tx) {
// Extract token transfers and swap amounts
// Implementation depends on your needs
console.log('Processing Jupiter swap...');
}
// Error handling
stream.on('error', (err) => {
console.error('Stream error:', err.message);
reconnect();
});
function reconnect() {
console.log('Reconnecting in 5 seconds...');
setTimeout(() => {
createStream();
}, 5000);
}import grpc
import os
import base64
from geyser_pb2 import SubscribeRequest, SubscribeRequestFilterTransactions
from geyser_pb2_grpc import GeyserStub
import geyser_pb2
JUPITER_PROGRAM = 'JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4'
def monitor_jupiter_swaps():
# Create channel
credentials = grpc.ssl_channel_credentials()
channel = grpc.secure_channel('grpc.fortiblox.com:443', credentials)
stub = GeyserStub(channel)
# Metadata
metadata = [('x-api-key', os.getenv('FORTIBLOX_API_KEY'))]
# Create subscription
def request_generator():
yield SubscribeRequest(
transactions={
'jupiter_swaps': SubscribeRequestFilterTransactions(
account_include=[JUPITER_PROGRAM],
vote=False,
failed=False
)
},
commitment=geyser_pb2.CommitmentLevel.CONFIRMED
)
try:
for update in stub.Subscribe(request_generator(), metadata=metadata):
if update.HasField('transaction'):
tx = update.transaction.transaction
sig = base64.b64encode(tx.signature).decode('utf-8')
print(f"Jupiter Swap: {sig[:16]}... at slot {update.transaction.slot}")
# Parse swap details
parse_jupiter_swap(tx)
except grpc.RpcError as e:
print(f"gRPC error: {e.code()} - {e.details()}")
reconnect()
def parse_jupiter_swap(tx):
# Extract and process swap data
print("Processing Jupiter swap...")
pass
def reconnect():
import time
print("Reconnecting in 5 seconds...")
time.sleep(5)
monitor_jupiter_swaps()
if __name__ == '__main__':
monitor_jupiter_swaps()use tonic::{transport::Channel, metadata::MetadataValue, Request};
use tokio_stream::StreamExt;
pub mod geyser {
tonic::include_proto!("geyser");
}
const JUPITER_PROGRAM: &str = "JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4";
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
monitor_jupiter_swaps().await
}
async fn monitor_jupiter_swaps() -> Result<(), Box<dyn std::error::Error>> {
let channel = Channel::from_static("https://grpc.fortiblox.com:443")
.connect()
.await?;
let mut client = geyser::geyser_client::GeyserClient::new(channel);
let api_key = std::env::var("FORTIBLOX_API_KEY")?;
let token = MetadataValue::try_from(api_key)?;
let mut request = Request::new(tokio_stream::iter(vec![
geyser::SubscribeRequest {
transactions: std::collections::HashMap::from([(
"jupiter_swaps".to_string(),
geyser::SubscribeRequestFilterTransactions {
account_include: vec![JUPITER_PROGRAM.to_string()],
vote: Some(false),
failed: Some(false),
..Default::default()
}
)]),
commitment: Some(geyser::CommitmentLevel::Confirmed as i32),
..Default::default()
}
]));
request.metadata_mut().insert("x-api-key", token);
let mut stream = client.subscribe(request).await?.into_inner();
while let Some(update) = stream.next().await {
match update {
Ok(msg) => {
if let Some(tx_update) = msg.transaction {
if let Some(tx) = tx_update.transaction {
let sig = hex::encode(&tx.signature);
println!("Jupiter Swap: {}... at slot {}",
&sig[..16], tx_update.slot);
parse_jupiter_swap(&tx);
}
}
}
Err(e) => {
eprintln!("Stream error: {}", e);
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
return monitor_jupiter_swaps().await;
}
}
}
Ok(())
}
fn parse_jupiter_swap(tx: &geyser::Transaction) {
println!("Processing Jupiter swap...");
// Parse swap details
}Block Explorer Real-time Feed
Stream blocks and transactions for a block explorer:
const grpc = require('@grpc/grpc-js');
const WebSocket = require('ws');
// WebSocket server for frontend
const wss = new WebSocket.Server({ port: 8080 });
const clients = new Set();
wss.on('connection', (ws) => {
clients.add(ws);
ws.on('close', () => clients.delete(ws));
});
// gRPC client
const metadata = new grpc.Metadata();
metadata.add('x-api-key', process.env.FORTIBLOX_API_KEY);
const stream = client.subscribe(metadata);
// Subscribe to blocks and transactions
stream.write({
blocks: {
'new_blocks': {
accountInclude: []
}
},
transactions: {
'recent_txs': {
vote: false,
failed: false
}
},
slots: {
'slot_updates': {}
}
});
// Forward updates to websocket clients
stream.on('data', (update) => {
let message = null;
if (update.block) {
message = {
type: 'block',
data: {
slot: update.block.slot,
blockhash: Buffer.from(update.block.blockhash).toString('base64'),
transactions: update.block.transactions.length,
blockTime: update.block.blockTime
}
};
} else if (update.transaction) {
message = {
type: 'transaction',
data: {
signature: Buffer.from(update.transaction.transaction.signature).toString('base64'),
slot: update.transaction.slot
}
};
} else if (update.slot) {
message = {
type: 'slot',
data: {
slot: update.slot.slot,
parent: update.slot.parent
}
};
}
if (message) {
const json = JSON.stringify(message);
clients.forEach(client => {
if (client.readyState === WebSocket.OPEN) {
client.send(json);
}
});
}
});NFT Sales Tracker
Track NFT sales across multiple marketplaces:
import grpc
import os
from geyser_pb2 import SubscribeRequest, SubscribeRequestFilterTransactions
from geyser_pb2_grpc import GeyserStub
import geyser_pb2
# Marketplace programs
MARKETPLACES = {
'magic_eden': 'M2mx93ekt1fmXSVkTrUL9xVFHkmME8HTUi5Cyc5aF7K',
'tensor': 'TSWAPaqyCSx2KABk68Shruf4rp7CxcNi8hAsbdwmHbN',
'solanart': 'CJsLwbP1iu5DuUikHEJnLfANgKy6stB2uFgvBBHoyxwz'
}
def track_nft_sales():
credentials = grpc.ssl_channel_credentials()
channel = grpc.secure_channel('grpc.fortiblox.com:443', credentials)
stub = GeyserStub(channel)
metadata = [('x-api-key', os.getenv('FORTIBLOX_API_KEY'))]
def request_generator():
# Subscribe to all marketplaces
transactions = {}
for name, program in MARKETPLACES.items():
transactions[name] = SubscribeRequestFilterTransactions(
account_include=[program],
vote=False,
failed=False
)
yield SubscribeRequest(
transactions=transactions,
commitment=geyser_pb2.CommitmentLevel.CONFIRMED
)
try:
for update in stub.Subscribe(request_generator(), metadata=metadata):
if update.HasField('transaction'):
tx = update.transaction
# Detect which marketplace
marketplace = detect_marketplace(tx)
if marketplace:
sale = parse_nft_sale(tx, marketplace)
if sale:
print(f"NFT Sale on {marketplace}:")
print(f" Price: {sale['price']} SOL")
print(f" NFT: {sale['mint']}")
print(f" Buyer: {sale['buyer'][:8]}...")
# Store in database
store_sale(sale)
# Send notifications
notify_sale(sale)
except grpc.RpcError as e:
print(f"Error: {e}")
def detect_marketplace(tx):
# Detect marketplace from transaction accounts
pass
def parse_nft_sale(tx, marketplace):
# Parse sale details from transaction
return {
'marketplace': marketplace,
'mint': 'NFTMintAddress...',
'price': 1.5,
'buyer': 'BuyerAddress...',
'seller': 'SellerAddress...',
'signature': tx.transaction.signature.hex()
}
def store_sale(sale):
# Store in database
pass
def notify_sale(sale):
# Send Discord/Telegram notification
pass
if __name__ == '__main__':
track_nft_sales()Wallet Activity Monitor
Monitor specific wallet addresses:
package main
import (
"context"
"fmt"
"log"
"os"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
pb "your-module/geyser"
)
var MONITORED_WALLETS = []string{
"Wallet1111111111111111111111111111111111111",
"Wallet2222222222222222222222222222222222222",
}
func main() {
creds := credentials.NewTLS(nil)
conn, err := grpc.Dial("grpc.fortiblox.com:443",
grpc.WithTransportCredentials(creds))
if err != nil {
log.Fatal(err)
}
defer conn.Close()
client := pb.NewGeyserClient(conn)
md := metadata.New(map[string]string{
"x-api-key": os.Getenv("FORTIBLOX_API_KEY"),
})
ctx := metadata.NewOutgoingContext(context.Background(), md)
stream, err := client.Subscribe(ctx)
if err != nil {
log.Fatal(err)
}
// Subscribe to wallet transactions
req := &pb.SubscribeRequest{
Transactions: map[string]*pb.SubscribeRequestFilterTransactions{
"wallet_txs": {
AccountInclude: MONITORED_WALLETS,
Vote: false,
Failed: false,
},
},
Accounts: map[string]*pb.SubscribeRequestFilterAccounts{
"wallet_updates": {
Account: MONITORED_WALLETS,
},
},
Commitment: pb.CommitmentLevel_CONFIRMED,
}
if err := stream.Send(req); err != nil {
log.Fatal(err)
}
// Process updates
for {
update, err := stream.Recv()
if err != nil {
log.Printf("Stream error: %v", err)
break
}
if tx := update.GetTransaction(); tx != nil {
fmt.Printf("Transaction: %x at slot %d\n",
tx.Transaction.Signature[:8], tx.Slot)
// Analyze transaction
analyzeTransaction(tx)
}
if acct := update.GetAccount(); acct != nil {
fmt.Printf("Account updated: %s, balance: %d lamports\n",
acct.Account.Pubkey, acct.Account.Lamports)
// Check for large balance changes
checkBalanceChange(acct)
}
}
}
func analyzeTransaction(tx *pb.SubscribeUpdateTransaction) {
// Analyze transaction details
}
func checkBalanceChange(acct *pb.SubscribeUpdateAccount) {
// Check for significant balance changes
}Historical Data Replay
Replay past transactions for backtesting:
const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');
async function replayHistory(startSlot, endSlot) {
const packageDef = protoLoader.loadSync('geyser.proto');
const proto = grpc.loadPackageDefinition(packageDef);
const client = new proto.geyser.Geyser(
'grpc.fortiblox.com:443',
grpc.credentials.createSsl()
);
const metadata = new grpc.Metadata();
metadata.add('x-api-key', process.env.FORTIBLOX_API_KEY);
const stream = client.subscribe(metadata);
// Subscribe with slot range for historical replay
stream.write({
transactions: {
'historical': {
accountInclude: ['JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4'],
vote: false
}
},
slots: {
'slot_range': {
filterByCommitment: true
}
},
commitment: 'FINALIZED'
});
let txCount = 0;
let currentSlot = startSlot;
stream.on('data', (update) => {
if (update.transaction) {
txCount++;
if (txCount % 100 === 0) {
console.log(`Processed ${txCount} transactions at slot ${update.transaction.slot}`);
}
// Process historical transaction
processHistoricalTx(update.transaction);
}
if (update.slot) {
currentSlot = update.slot.slot;
if (currentSlot >= endSlot) {
console.log(`Reached end slot ${endSlot}, processed ${txCount} transactions`);
stream.end();
}
}
});
stream.on('end', () => {
console.log(`Historical replay complete. Total transactions: ${txCount}`);
});
}
function processHistoricalTx(tx) {
// Backtest trading strategy
// Analyze historical patterns
// Generate reports
}
// Replay last 1000 slots
const currentSlot = 150234567; // Get from getSlot() first
replayHistory(currentSlot - 1000, currentSlot);Next Steps
Protocol Reference
Complete protobuf definitions
Getting Started
Install and connect to gRPC
Streaming Guide
Learn streaming patterns
Overview
gRPC capabilities and benefits
Support
- Discord: discord.gg/fortiblox
- GitHub: github.com/fortiblox/geyser-grpc/examples
- Email: [email protected]