Slot Streaming
Stream real-time slot updates for chain head tracking and block production monitoring
Subscribe to real-time slot updates from FortiBlox gRPC streaming. Monitor blockchain slot production, track chain head, and detect missed slots with <30 second freshness guaranteed by Redis pub/sub.
Now Live! Slot streaming is fully operational with 227+ entries in Redis and <30s freshness. Perfect for real-time blockchain monitoring applications.
What are Slots?
In Solana and X1 Blockchain, a slot is a fixed period of time (approximately 400ms) during which a leader validator can produce a block. Each slot represents an opportunity for block production, though not every slot results in a block (some slots are "skipped" if the validator is offline or slow).
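As a rough illustration of the timing described above, the sketch below converts between slot counts and wall-clock time. It assumes the nominal 400 ms slot duration; real slot times vary, and the names `SLOT_DURATION_MS`, `slots_elapsed`, and `estimated_seconds` are illustrative rather than part of any FortiBlox API.

```python
SLOT_DURATION_MS = 400  # nominal slot time from the text; actual timing varies


def slots_elapsed(seconds: float) -> int:
    """Approximate number of whole slots that fit in a wall-clock interval."""
    return int(seconds * 1000 // SLOT_DURATION_MS)


def estimated_seconds(from_slot: int, to_slot: int) -> float:
    """Approximate wall-clock time between two slot numbers."""
    return (to_slot - from_slot) * SLOT_DURATION_MS / 1000


print(slots_elapsed(60))                        # slots in one minute
print(estimated_seconds(11163364, 11163464))    # seconds spanned by 100 slots
```

Estimates like these are only useful for rough planning, since skipped slots and network conditions change the observed rate.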
Slot Data Structure
{
"slot": 11163364,
"parent": 11163363,
"status": "confirmed",
"timestamp": 1764035684
}

| Field | Type | Description |
|---|---|---|
| slot | uint64 | Current slot number |
| parent | uint64 | Parent slot number (typically slot - 1) |
| status | string | Commitment level: processed, confirmed, or finalized |
| timestamp | int64 | Unix timestamp when slot was produced |
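The structure above maps naturally onto a small typed record. The following is a minimal sketch, assuming the update arrives as JSON text with exactly the four fields listed and that `timestamp` is in seconds (inferred from the magnitude of the sample value). `SlotUpdate` and `parse_slot_update` are hypothetical names used only for illustration.

```python
import json
from dataclasses import dataclass
from datetime import datetime, timezone

VALID_STATUSES = {"processed", "confirmed", "finalized"}


@dataclass(frozen=True)
class SlotUpdate:
    slot: int
    parent: int
    status: str
    timestamp: int  # assumed to be Unix seconds

    @property
    def produced_at(self) -> datetime:
        """Timestamp as a timezone-aware UTC datetime."""
        return datetime.fromtimestamp(self.timestamp, tz=timezone.utc)


def parse_slot_update(raw: str) -> SlotUpdate:
    """Parse one JSON slot update and reject unknown status values."""
    data = json.loads(raw)
    update = SlotUpdate(
        slot=int(data["slot"]),
        parent=int(data["parent"]),
        status=str(data["status"]),
        timestamp=int(data["timestamp"]),
    )
    if update.status not in VALID_STATUSES:
        raise ValueError(f"unexpected status: {update.status!r}")
    return update


sample = '{"slot": 11163364, "parent": 11163363, "status": "confirmed", "timestamp": 1764035684}'
update = parse_slot_update(sample)
print(update.slot, update.status, update.produced_at.isoformat())
```

Validating `status` up front keeps later code that switches on the commitment level from silently ignoring an unexpected value.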
Why Stream Slots?
Use Cases
1. Block Explorer & Analytics
- Display real-time chain head
- Calculate block production rates
- Visualize network health
- Track validator performance
2. Trading & MEV Bots
- Precise timing for transaction submission
- Track block production timing
- Monitor network congestion
- Optimize transaction ordering
3. Validator Monitoring
- Detect missed slots in real-time
- Calculate uptime metrics
- Monitor slot performance
- Alert on validator issues
4. Network Health Dashboards
- Real-time slot production visualization
- Network speed metrics
- Historical slot production charts
- Chain finality tracking
5. DeFi Applications
- Trigger time-sensitive operations
- Monitor block timing for liquidations
- Track confirmation timing
- Optimize transaction submission timing
Connection Setup
Endpoint Details
| Parameter | Value |
|---|---|
| Host | grpc.fortiblox.com |
| Port | 10002 |
| Protocol | HTTP/2 (plaintext, not TLS) |
| Authentication | SHA-256 based via x-api-key header |
| Tier Required | Professional+ |
| Data Source | Redis pub/sub (geyser:slots stream) |
| Freshness | <30 seconds |
| Entries | 227+ in Redis |
Important: The endpoint uses HTTP/2 plaintext (insecure), not HTTPS/TLS. Make sure to configure your client accordingly.
Code Examples
Complete Slot Streaming Example
// slot-stream.js
const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');

// Load protocol buffers
const packageDefinition = protoLoader.loadSync('geyser.proto', {
  keepCase: true,
  longs: String, // uint64 fields arrive as strings
  enums: String,
  defaults: true,
  oneofs: true
});
const proto = grpc.loadPackageDefinition(packageDefinition);

// Create client (plaintext/insecure)
const client = new proto.geyser.Geyser(
  'grpc.fortiblox.com:10002',
  grpc.credentials.createInsecure()
);

// Add API key metadata
const metadata = new grpc.Metadata();
metadata.add('x-api-key', process.env.FORTIBLOX_API_KEY);

// Track slot metrics
let lastSlot = 0;
let missedSlots = 0;
let slotCount = 0;
const startTime = Date.now();

let stream = null;
let reconnectTimer = null;

// Handle slot updates
function handleUpdate(update) {
  if (!update.slot) return;
  const slot = update.slot;
  // `longs: String` delivers uint64 values as strings, so convert before doing arithmetic
  const slotNumber = Number(slot.slot);
  slotCount++;
  // Detect missed slots
  if (lastSlot > 0 && slotNumber > lastSlot + 1) {
    const missed = slotNumber - lastSlot - 1;
    missedSlots += missed;
    console.log(`⚠️ Missed ${missed} slot(s) between ${lastSlot} and ${slotNumber}`);
  }
  console.log(`📊 Slot ${slotNumber} | Parent: ${slot.parent} | Status: ${slot.status}`);
  // Calculate metrics every 100 slots
  if (slotCount % 100 === 0) {
    const elapsed = (Date.now() - startTime) / 1000;
    const slotsPerSecond = slotCount / elapsed;
    const missRate = (missedSlots / slotCount * 100).toFixed(2);
    console.log(`\n📈 Metrics (last ${slotCount} slots):`);
    console.log(` Rate: ${slotsPerSecond.toFixed(2)} slots/sec`);
    console.log(` Missed: ${missedSlots} (${missRate}%)\n`);
  }
  lastSlot = slotNumber;
}

// Schedule a single reconnect, even if both 'error' and 'end' fire
function scheduleReconnect(delayMs) {
  if (reconnectTimer) return;
  console.log(`🔄 Reconnecting in ${delayMs / 1000} seconds...`);
  reconnectTimer = setTimeout(() => {
    reconnectTimer = null;
    connect();
  }, delayMs);
}

// Open the bidirectional stream, attach handlers, and subscribe
function connect() {
  console.log('🔌 Connecting...');
  stream = client.subscribe(metadata);
  stream.on('data', handleUpdate);
  // Handle errors with reconnection
  stream.on('error', (err) => {
    console.error('❌ Stream error:', err.message);
    if (err.code === grpc.status.UNAVAILABLE) {
      scheduleReconnect(5000);
    }
  });
  stream.on('end', () => {
    console.log('Stream ended');
    scheduleReconnect(1000);
  });
  // Subscribe to slots
  stream.write({
    slots: {
      'slot_updates': {} // Empty filter subscribes to all slots
    },
    commitment: 'CONFIRMED'
  });
}

// Graceful shutdown
process.on('SIGINT', () => {
  console.log('\n👋 Shutting down...');
  if (stream) stream.end();
  process.exit(0);
});

console.log('🚀 Slot streaming started...');
console.log(' Endpoint: grpc.fortiblox.com:10002');
console.log(' Press Ctrl+C to stop\n');
connect();
Run
export FORTIBLOX_API_KEY="your_api_key_here"
node slot-stream.js
Complete Slot Streaming Example
# slot_stream.py
import grpc
import os
import sys
import time
from datetime import datetime

import geyser_pb2
import geyser_pb2_grpc


class SlotMonitor:
    def __init__(self, api_key):
        self.api_key = api_key
        self.last_slot = 0
        self.missed_slots = 0
        self.slot_count = 0
        self.start_time = time.time()

    def connect(self):
        # Create insecure channel (plaintext)
        channel = grpc.insecure_channel('grpc.fortiblox.com:10002')
        stub = geyser_pb2_grpc.GeyserStub(channel)
        return stub

    def request_generator(self):
        """Generate subscription request for slots"""
        yield geyser_pb2.SubscribeRequest(
            slots={
                'slot_updates': geyser_pb2.SubscribeRequestFilterSlots()
            },
            commitment=geyser_pb2.CommitmentLevel.CONFIRMED
        )

    def monitor(self):
        print("🚀 Slot streaming started...")
        print(" Endpoint: grpc.fortiblox.com:10002")
        print(" Press Ctrl+C to stop\n")
        stub = self.connect()
        metadata = [('x-api-key', self.api_key)]
        try:
            # Loop instead of recursing so repeated reconnects cannot exhaust the call stack
            while True:
                try:
                    for update in stub.Subscribe(self.request_generator(), metadata=metadata):
                        if update.HasField('slot'):
                            self.process_slot(update.slot)
                except grpc.RpcError as e:
                    print(f"❌ gRPC error: {e.code()} - {e.details()}")
                print("🔄 Reconnecting in 5 seconds...")
                time.sleep(5)
        except KeyboardInterrupt:
            print("\n👋 Shutting down...")

    def process_slot(self, slot):
        self.slot_count += 1
        # Detect missed slots
        if self.last_slot > 0 and slot.slot > self.last_slot + 1:
            missed = slot.slot - self.last_slot - 1
            self.missed_slots += missed
            print(f"⚠️ Missed {missed} slot(s) between {self.last_slot} and {slot.slot}")
        # Format timestamp
        timestamp = datetime.fromtimestamp(slot.timestamp).strftime('%H:%M:%S')
        print(f"📊 Slot {slot.slot} | Parent: {slot.parent} | Status: {slot.status} | Time: {timestamp}")
        # Calculate metrics every 100 slots
        if self.slot_count % 100 == 0:
            elapsed = time.time() - self.start_time
            slots_per_second = self.slot_count / elapsed
            miss_rate = (self.missed_slots / self.slot_count * 100)
            print(f"\n📈 Metrics (last {self.slot_count} slots):")
            print(f" Rate: {slots_per_second:.2f} slots/sec")
            print(f" Missed: {self.missed_slots} ({miss_rate:.2f}%)\n")
        self.last_slot = slot.slot


if __name__ == '__main__':
    api_key = os.getenv('FORTIBLOX_API_KEY')
    if not api_key:
        print("Error: FORTIBLOX_API_KEY environment variable not set")
        sys.exit(1)
    monitor = SlotMonitor(api_key)
    monitor.monitor()
Setup & Run
# Install dependencies
pip install grpcio grpcio-tools
# Download and compile proto
curl -O https://raw.githubusercontent.com/rpcpool/yellowstone-grpc/master/yellowstone-grpc-proto/proto/geyser.proto
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. geyser.proto
# Run
export FORTIBLOX_API_KEY="your_api_key_here"
python slot_stream.py
Complete Slot Streaming Example
// src/main.rs
use tonic::{transport::Channel, metadata::MetadataValue, Request};
use tokio_stream::StreamExt;
use std::time::{Duration, Instant};
pub mod geyser {
tonic::include_proto!("geyser");
}
use geyser::{geyser_client::GeyserClient, SubscribeRequest};
struct SlotMetrics {
last_slot: u64,
missed_slots: u64,
slot_count: u64,
start_time: Instant,
}
impl SlotMetrics {
fn new() -> Self {
Self {
last_slot: 0,
missed_slots: 0,
slot_count: 0,
start_time: Instant::now(),
}
}
fn process_slot(&mut self, slot: u64, parent: u64, status: &str) {
self.slot_count += 1;
// Detect missed slots
if self.last_slot > 0 && slot > self.last_slot + 1 {
let missed = slot - self.last_slot - 1;
self.missed_slots += missed;
println!("⚠️ Missed {} slot(s) between {} and {}", missed, self.last_slot, slot);
}
println!("📊 Slot {} | Parent: {} | Status: {}", slot, parent, status);
// Print metrics every 100 slots
if self.slot_count % 100 == 0 {
let elapsed = self.start_time.elapsed().as_secs_f64();
let slots_per_second = self.slot_count as f64 / elapsed;
let miss_rate = (self.missed_slots as f64 / self.slot_count as f64) * 100.0;
println!("\n📈 Metrics (last {} slots):", self.slot_count);
println!(" Rate: {:.2} slots/sec", slots_per_second);
println!(" Missed: {} ({:.2}%)\n", self.missed_slots, miss_rate);
}
self.last_slot = slot;
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("🚀 Slot streaming started...");
println!(" Endpoint: grpc.fortiblox.com:10002");
println!(" Press Ctrl+C to stop\n");
let mut metrics = SlotMetrics::new();
loop {
if let Err(e) = run_slot_stream(&mut metrics).await {
eprintln!("❌ Error: {}", e);
eprintln!("🔄 Reconnecting in 5 seconds...");
tokio::time::sleep(Duration::from_secs(5)).await;
}
}
}
async fn run_slot_stream(metrics: &mut SlotMetrics) -> Result<(), Box<dyn std::error::Error>> {
// Create channel (insecure/plaintext)
let channel = Channel::from_static("http://grpc.fortiblox.com:10002")
.connect()
.await?;
let mut client = GeyserClient::new(channel);
// Create request with API key
let api_key = std::env::var("FORTIBLOX_API_KEY")?;
let token = MetadataValue::try_from(api_key)?;
let mut request = Request::new(tokio_stream::iter(vec![
SubscribeRequest {
slots: std::collections::HashMap::from([(
"slot_updates".to_string(),
geyser::SubscribeRequestFilterSlots::default()
)]),
commitment: Some(geyser::CommitmentLevel::Confirmed as i32),
..Default::default()
}
]));
request.metadata_mut().insert("x-api-key", token);
// Subscribe
let mut stream = client.subscribe(request).await?.into_inner();
// Process updates
while let Some(update) = stream.next().await {
match update {
Ok(msg) => {
if let Some(slot) = msg.slot {
metrics.process_slot(
slot.slot,
slot.parent,
&format!("{:?}", slot.status)
);
}
}
Err(e) => {
eprintln!("Stream error: {}", e);
return Err(Box::new(e));
}
}
}
Ok(())
}
Cargo.toml
[dependencies]
tonic = "0.11"
prost = "0.12"
tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1"
[build-dependencies]
tonic-build = "0.11"
Build & Run
export FORTIBLOX_API_KEY="your_api_key_here"
cargo run --release
Complete Slot Streaming Example
// main.go
package main
import (
"context"
"fmt"
"io"
"log"
"os"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
pb "your-module/geyser"
)
type SlotMetrics struct {
lastSlot uint64
missedSlots uint64
slotCount uint64
startTime time.Time
}
func NewSlotMetrics() *SlotMetrics {
return &SlotMetrics{
startTime: time.Now(),
}
}
func (m *SlotMetrics) ProcessSlot(slot, parent uint64, status string) {
m.slotCount++
// Detect missed slots
if m.lastSlot > 0 && slot > m.lastSlot+1 {
missed := slot - m.lastSlot - 1
m.missedSlots += missed
fmt.Printf("⚠️ Missed %d slot(s) between %d and %d\n", missed, m.lastSlot, slot)
}
fmt.Printf("📊 Slot %d | Parent: %d | Status: %s\n", slot, parent, status)
// Print metrics every 100 slots
if m.slotCount%100 == 0 {
elapsed := time.Since(m.startTime).Seconds()
slotsPerSecond := float64(m.slotCount) / elapsed
missRate := (float64(m.missedSlots) / float64(m.slotCount)) * 100
fmt.Printf("\n📈 Metrics (last %d slots):\n", m.slotCount)
fmt.Printf(" Rate: %.2f slots/sec\n", slotsPerSecond)
fmt.Printf(" Missed: %d (%.2f%%)\n\n", m.missedSlots, missRate)
}
m.lastSlot = slot
}
func main() {
fmt.Println("🚀 Slot streaming started...")
fmt.Println(" Endpoint: grpc.fortiblox.com:10002")
fmt.Println(" Press Ctrl+C to stop\n")
apiKey := os.Getenv("FORTIBLOX_API_KEY")
if apiKey == "" {
log.Fatal("FORTIBLOX_API_KEY environment variable not set")
}
metrics := NewSlotMetrics()
for {
if err := runSlotStream(apiKey, metrics); err != nil {
log.Printf("❌ Error: %v", err)
log.Println("🔄 Reconnecting in 5 seconds...")
time.Sleep(5 * time.Second)
}
}
}
func runSlotStream(apiKey string, metrics *SlotMetrics) error {
// Create insecure connection (plaintext)
conn, err := grpc.Dial(
"grpc.fortiblox.com:10002",
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
return fmt.Errorf("failed to connect: %w", err)
}
defer conn.Close()
// Create client
client := pb.NewGeyserClient(conn)
// Add API key to context
md := metadata.New(map[string]string{
"x-api-key": apiKey,
})
ctx := metadata.NewOutgoingContext(context.Background(), md)
// Create stream
stream, err := client.Subscribe(ctx)
if err != nil {
return fmt.Errorf("failed to subscribe: %w", err)
}
// Send subscription request
req := &pb.SubscribeRequest{
Slots: map[string]*pb.SubscribeRequestFilterSlots{
"slot_updates": {},
},
Commitment: pb.CommitmentLevel_CONFIRMED,
}
if err := stream.Send(req); err != nil {
return fmt.Errorf("failed to send request: %w", err)
}
// Receive updates
for {
update, err := stream.Recv()
if err == io.EOF {
return fmt.Errorf("stream ended")
}
if err != nil {
return fmt.Errorf("stream error: %w", err)
}
if slot := update.GetSlot(); slot != nil {
metrics.ProcessSlot(slot.Slot, slot.Parent, slot.Status.String())
}
}
}
Run
export FORTIBLOX_API_KEY="your_api_key_here"
go run main.go
Complete Slot Streaming Example
// SlotStreamClient.java
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.stub.MetadataUtils;
import io.grpc.stub.StreamObserver;
import java.time.Instant;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
public class SlotStreamClient {
private final GeyserGrpc.GeyserStub asyncStub;
private long lastSlot = 0;
private long missedSlots = 0;
private long slotCount = 0;
private long startTime = System.currentTimeMillis();
public SlotStreamClient(String host, int port, String apiKey) {
// Create insecure channel (plaintext)
ManagedChannel channel = ManagedChannelBuilder
.forAddress(host, port)
.usePlaintext()
.build();
// Add API key metadata
Metadata metadata = new Metadata();
Metadata.Key<String> key = Metadata.Key.of("x-api-key", Metadata.ASCII_STRING_MARSHALLER);
metadata.put(key, apiKey);
this.asyncStub = GeyserGrpc.newStub(channel)
.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata));
}
public void startStreaming() {
System.out.println("🚀 Slot streaming started...");
System.out.println(" Endpoint: grpc.fortiblox.com:10002");
System.out.println(" Press Ctrl+C to stop\n");
StreamObserver<SubscribeRequest> requestObserver = asyncStub.subscribe(
new StreamObserver<SubscribeUpdate>() {
@Override
public void onNext(SubscribeUpdate update) {
if (update.hasSlot()) {
processSlot(update.getSlot());
}
}
@Override
public void onError(Throwable t) {
System.err.println("❌ Stream error: " + t.getMessage());
System.out.println("🔄 Reconnecting in 5 seconds...");
try {
Thread.sleep(5000);
startStreaming(); // Reconnect
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@Override
public void onCompleted() {
System.out.println("Stream completed");
}
}
);
// Send subscription request
SubscribeRequest request = SubscribeRequest.newBuilder()
.putSlots("slot_updates", SubscribeRequestFilterSlots.getDefaultInstance())
.setCommitment(CommitmentLevel.CONFIRMED)
.build();
requestObserver.onNext(request);
}
private void processSlot(SubscribeUpdateSlot slot) {
slotCount++;
// Detect missed slots
if (lastSlot > 0 && slot.getSlot() > lastSlot + 1) {
long missed = slot.getSlot() - lastSlot - 1;
missedSlots += missed;
System.out.printf("⚠️ Missed %d slot(s) between %d and %d%n",
missed, lastSlot, slot.getSlot());
}
System.out.printf("📊 Slot %d | Parent: %d | Status: %s%n",
slot.getSlot(), slot.getParent(), slot.getStatus());
// Print metrics every 100 slots
if (slotCount % 100 == 0) {
long elapsed = System.currentTimeMillis() - startTime;
double slotsPerSecond = slotCount / (elapsed / 1000.0);
double missRate = (missedSlots / (double) slotCount) * 100;
System.out.printf("%n📈 Metrics (last %d slots):%n", slotCount);
System.out.printf(" Rate: %.2f slots/sec%n", slotsPerSecond);
System.out.printf(" Missed: %d (%.2f%%)%n%n", missedSlots, missRate);
}
lastSlot = slot.getSlot();
}
public static void main(String[] args) {
String apiKey = System.getenv("FORTIBLOX_API_KEY");
if (apiKey == null || apiKey.isEmpty()) {
System.err.println("Error: FORTIBLOX_API_KEY environment variable not set");
System.exit(1);
}
SlotStreamClient client = new SlotStreamClient("grpc.fortiblox.com", 10002, apiKey);
client.startStreaming();
// Keep alive
try {
Thread.currentThread().join();
} catch (InterruptedException e) {
System.out.println("\n👋 Shutting down...");
}
}
}
pom.xml
<dependencies>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>1.58.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>1.58.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>1.58.0</version>
</dependency>
</dependencies>
Run
export FORTIBLOX_API_KEY="your_api_key_here"
mvn compile exec:java -Dexec.mainClass="SlotStreamClient"
Real-World Scenarios
1. Block Explorer Chain Head Display
Display real-time chain head with slot updates:
// chain-head-tracker.js
class ChainHeadTracker {
constructor() {
this.currentSlot = 0;
this.confirmedSlot = 0;
this.finalizedSlot = 0;
}
async start() {
const stream = await this.createStream();
stream.on('data', (update) => {
if (update.slot) {
const { slot, status } = update.slot;
// Update tracked slots based on commitment
switch (status) {
case 'processed':
this.currentSlot = Math.max(this.currentSlot, slot);
break;
case 'confirmed':
this.confirmedSlot = Math.max(this.confirmedSlot, slot);
break;
case 'finalized':
this.finalizedSlot = Math.max(this.finalizedSlot, slot);
break;
}
// Update UI
this.updateUI({
current: this.currentSlot,
confirmed: this.confirmedSlot,
finalized: this.finalizedSlot,
lag: this.currentSlot - this.finalizedSlot
});
}
});
}
updateUI(data) {
// Push to websocket clients, update dashboard, etc.
console.log(`Current: ${data.current} | Confirmed: ${data.confirmed} | Finalized: ${data.finalized} | Lag: ${data.lag}`);
}
}
2. Validator Performance Monitor
Track validator slot performance:
# validator_monitor.py
class ValidatorMonitor:
    def __init__(self, validator_identity):
        self.validator_identity = validator_identity
        self.slots_produced = 0
        self.slots_missed = 0
        self.expected_slots = []

    # stream_slots(), get_slot_leader(), is_slot_produced(), and send_alert()
    # are application-specific helpers that you need to implement.
    async def monitor(self):
        async for update in self.stream_slots():
            if update.HasField('slot'):
                slot_number = update.slot.slot
                leader = await self.get_slot_leader(slot_number)
                if leader == self.validator_identity:
                    # This validator was the leader
                    if self.is_slot_produced(slot_number):
                        self.slots_produced += 1
                        print(f"✅ Validator produced slot {slot_number}")
                    else:
                        self.slots_missed += 1
                        print(f"❌ Validator missed slot {slot_number}")
                        # Send alert
                        await self.send_alert(f"Missed slot {slot_number}")
                    # Calculate uptime (at least one leader slot has been counted here)
                    uptime = (self.slots_produced / (self.slots_produced + self.slots_missed)) * 100
                    print(f"📊 Uptime: {uptime:.2f}%")
3. MEV Bot Timing Optimization
Optimize transaction submission timing:
// mev_bot.rs
struct MevBot {
current_slot: u64,
next_slot_estimate: Instant,
}
impl MevBot {
async fn monitor_slots(&mut self) {
let mut stream = self.create_slot_stream().await;
while let Some(update) = stream.next().await {
if let Some(slot) = update.slot {
self.current_slot = slot.slot;
// Estimate next slot timing (400ms per slot)
self.next_slot_estimate = Instant::now() + Duration::from_millis(400);
// Check if we should submit pending transactions
self.check_pending_transactions().await;
}
}
}
async fn check_pending_transactions(&self) {
// Submit transactions optimally timed for next slot
if self.has_pending_profitable_transaction() {
// Calculate optimal submission time
let submit_time = self.next_slot_estimate - Duration::from_millis(100);
tokio::time::sleep_until(submit_time.into()).await;
self.submit_transaction().await;
}
}
}
4. Network Health Dashboard
Real-time network metrics:
// network_dashboard.go
type NetworkMetrics struct {
	SlotRate         float64
	AverageBlockTime float64
	MissedSlotRate   float64
	LastSlot         uint64
}

func (d *Dashboard) MonitorNetwork() {
	var slots []uint64
	var timestamps []time.Time
	for update := range d.slotStream {
		if slot := update.GetSlot(); slot != nil {
			slots = append(slots, slot.Slot)
			timestamps = append(timestamps, time.Now())
			// Keep last 1000 slots
			if len(slots) > 1000 {
				slots = slots[1:]
				timestamps = timestamps[1:]
			}
			// Calculate metrics
			metrics := d.calculateMetrics(slots, timestamps)
			// Update dashboard
			d.updateMetrics(metrics)
			// Check for anomalies once at least two samples give a measurable rate
			if len(slots) >= 2 && metrics.SlotRate < 2.0 {
				d.sendAlert("⚠️ Network slow: " + fmt.Sprintf("%.2f slots/sec", metrics.SlotRate))
			}
		}
	}
}

func (d *Dashboard) calculateMetrics(slots []uint64, timestamps []time.Time) NetworkMetrics {
	metrics := NetworkMetrics{LastSlot: slots[len(slots)-1]}
	// A rate needs at least two samples
	if len(slots) < 2 {
		return metrics
	}
	// Calculate slot production rate from the intervals between samples
	intervals := float64(len(slots) - 1)
	elapsed := timestamps[len(timestamps)-1].Sub(timestamps[0]).Seconds()
	if elapsed > 0 {
		metrics.SlotRate = intervals / elapsed
		metrics.AverageBlockTime = elapsed / intervals
	}
	// Calculate missed slots: the window covers last-first+1 slot numbers.
	// Comparing before subtracting avoids unsigned underflow.
	expectedSlots := slots[len(slots)-1] - slots[0] + 1
	actualSlots := uint64(len(slots))
	if expectedSlots > actualSlots {
		metrics.MissedSlotRate = float64(expectedSlots-actualSlots) / float64(expectedSlots) * 100
	}
	return metrics
}
Performance & Best Practices
Connection Management
1. Use Persistent Connections
// ✅ Good: Single persistent connection
const stream = client.subscribe(metadata);
stream.on('data', handleSlot);
// ❌ Bad: Creating new connections repeatedly
setInterval(() => {
const stream = client.subscribe(metadata); // Don't do this!
}, 1000);
2. Implement Reconnection Logic
function createSlotStream() {
const stream = client.subscribe(metadata);
stream.on('error', (err) => {
console.error('Stream error:', err);
setTimeout(() => createSlotStream(), 5000);
});
stream.on('end', () => {
console.log('Stream ended');
setTimeout(() => createSlotStream(), 1000);
});
return stream;
}
3. Handle Backpressure
# Implement buffering for high-throughput scenarios
from collections import deque

slot_buffer = deque(maxlen=1000)
for update in stub.Subscribe(request_generator(), metadata=metadata):
    if update.HasField('slot'):
        slot_buffer.append(update.slot)
        # Process in batches
        if len(slot_buffer) >= 100:
            process_slot_batch(list(slot_buffer))
            slot_buffer.clear()
Data Processing
1. Efficient Slot Tracking
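// Use circular buffer for memory efficiency
class SlotTracker {
  constructor(size = 1000) {
    this.slots = new Array(size);
    this.index = 0; // Total number of slots added so far
    this.size = size;
  }
  add(slot) {
    this.slots[this.index % this.size] = slot;
    this.index++;
  }
  getRecent(count) {
    // Only the most recent `size` entries are still stored
    const n = Math.min(count, this.index, this.size);
    const recent = [];
    // Walk the logical positions and map each one into the ring
    for (let i = this.index - n; i < this.index; i++) {
      recent.push(this.slots[i % this.size]);
    }
    return recent; // Oldest first
  }
}
2. Detect Slot Gaps Efficiently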
struct SlotGapDetector {
last_slot: Option<u64>,
gaps: Vec<(u64, u64)>, // (from, to)
}
impl SlotGapDetector {
fn process(&mut self, slot: u64) -> Option<(u64, u64)> {
if let Some(last) = self.last_slot {
if slot > last + 1 {
let gap = (last + 1, slot - 1);
self.gaps.push(gap);
self.last_slot = Some(slot);
return Some(gap);
}
}
self.last_slot = Some(slot);
None
}
}
Error Handling
1. Exponential Backoff
import time

import grpc


def connect_with_retry(max_retries=5):
    # create_slot_stream() is your own function that opens the subscription
    retry = 0
    while retry < max_retries:
        try:
            return create_slot_stream()
        except grpc.RpcError as e:
            retry += 1
            wait = min(2 ** retry, 60)  # Cap at 60 seconds
            print(f"Retry {retry}/{max_retries} in {wait}s...")
            time.sleep(wait)
    raise Exception("Max retries exceeded")
2. Graceful Degradation
class ResilientSlotMonitor {
constructor() {
this.fallbackEnabled = true;
this.lastSlot = 0;
}
async monitor() {
try {
await this.streamSlots();
} catch (err) {
console.error('Streaming failed, falling back to polling');
if (this.fallbackEnabled) {
await this.pollSlots();
}
}
}
async pollSlots() {
// Fallback to HTTP polling if streaming fails
setInterval(async () => {
const slot = await this.fetchLatestSlot();
this.processSlot(slot);
}, 1000);
}
}
Filtering & Subscription Options
Basic Subscription
Subscribe to all slots (recommended):
stream.write({
slots: {
'all_slots': {}
}
});
Commitment Level Filtering
Filter by commitment level:
# Only confirmed slots
request = geyser_pb2.SubscribeRequest(
    slots={
        'confirmed_slots': geyser_pb2.SubscribeRequestFilterSlots()
    },
    commitment=geyser_pb2.CommitmentLevel.CONFIRMED
)
# Only finalized slots
request = geyser_pb2.SubscribeRequest(
    slots={
        'finalized_slots': geyser_pb2.SubscribeRequestFilterSlots()
    },
    commitment=geyser_pb2.CommitmentLevel.FINALIZED
)
Troubleshooting
Common Issues
1. Connection Refused
Error: 14 UNAVAILABLE: Connection refused
Solution:
- Verify endpoint: grpc.fortiblox.com:10002
- Ensure using insecure/plaintext credentials
- Check Professional+ tier access
- Verify firewall allows outbound port 10002
2. Authentication Errors
Error: 16 UNAUTHENTICATED: Invalid API key
Solution:
- Verify API key in metadata header
- Check header name is exactly x-api-key
- Ensure Professional+ tier subscription
- Regenerate API key if needed
3. No Data Received
Solution:
- Check subscription request format
- Verify commitment level
- Ensure stream is properly initialized
- Check for error callbacks
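A stream that connects but stays silent raises no error, so it can help to track how long it has been since the last update. The sketch below is one way to do that; `StalenessWatchdog` is a hypothetical helper, and the 30-second default simply mirrors the freshness figure quoted on this page rather than any documented timeout. The clock is injectable so the logic can be exercised without waiting.

```python
import time


class StalenessWatchdog:
    """Tracks when the last slot update arrived and reports prolonged silence."""

    def __init__(self, max_silence_s: float = 30.0, clock=time.monotonic):
        self.max_silence_s = max_silence_s
        self._clock = clock
        self._last_seen = clock()

    def mark(self) -> None:
        """Call this from the stream's data handler for every slot update."""
        self._last_seen = self._clock()

    def silence(self) -> float:
        """Seconds since the last update (or since construction)."""
        return self._clock() - self._last_seen

    def is_stale(self) -> bool:
        return self.silence() > self.max_silence_s


# Demonstration with a fake clock instead of real waiting
fake_now = [0.0]
watchdog = StalenessWatchdog(max_silence_s=30.0, clock=lambda: fake_now[0])
fake_now[0] = 10.0
watchdog.mark()        # a slot update arrived at t=10
fake_now[0] = 45.0     # 35 seconds pass with no updates
print(watchdog.is_stale())
```

In a real client, a periodic timer could check `is_stale()` and trigger the same reconnect path used for stream errors.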
4. High Memory Usage
Solution:
- Implement circular buffers
- Process slots in batches
- Clear old data periodically
- Use efficient data structures
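The suggestions above can be combined in a bounded window that discards old entries automatically. This is a minimal sketch using `collections.deque` with `maxlen`; `RollingSlotWindow` is an illustrative name, and the missed-slot count assumes slot numbers arrive unique and in increasing order.

```python
from collections import deque


class RollingSlotWindow:
    """Keeps only the most recent slot observations in bounded memory."""

    def __init__(self, max_entries: int = 1000):
        # deque drops the oldest entry automatically once maxlen is reached
        self._entries = deque(maxlen=max_entries)  # (slot, arrival_time_s)

    def add(self, slot: int, arrival_time: float) -> None:
        self._entries.append((slot, arrival_time))

    def __len__(self) -> int:
        return len(self._entries)

    def missed(self) -> int:
        """Slot numbers inside the window that were never observed."""
        if len(self._entries) < 2:
            return 0
        first_slot = self._entries[0][0]
        last_slot = self._entries[-1][0]
        return max(0, (last_slot - first_slot + 1) - len(self._entries))

    def rate(self) -> float:
        """Observed slots per second across the window (0.0 if unknown)."""
        if len(self._entries) < 2:
            return 0.0
        elapsed = self._entries[-1][1] - self._entries[0][1]
        return (len(self._entries) - 1) / elapsed if elapsed > 0 else 0.0


window = RollingSlotWindow(max_entries=3)
for slot, t in [(1, 0.0), (2, 0.4), (4, 1.2), (5, 1.6)]:
    window.add(slot, t)
print(len(window), window.missed(), round(window.rate(), 2))
```

Because the window never grows past `max_entries`, memory use stays constant no matter how long the stream runs.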
Debug Mode
Enable debug logging:
// Node.js (set these before requiring @grpc/grpc-js)
process.env.GRPC_VERBOSITY = 'DEBUG';
process.env.GRPC_TRACE = 'all';
# Python
import logging
logging.basicConfig(level=logging.DEBUG)
Rate Limits & Quotas
| Tier | Concurrent Streams | Slots/Second | Monthly Quota |
|---|---|---|---|
| Professional | 10 | Unlimited | 10M slots |
| Enterprise | 100 | Unlimited | Unlimited |
Slot streaming counts toward your monthly streaming quota. Each slot update consumed counts as 1 event.
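To get a feel for how the quota relates to slot timing, the sketch below estimates event consumption. It rests on several assumptions that are not confirmed by this page: a steady 400 ms slot time, exactly one update per slot per stream, and no skipped slots. The function names are illustrative, and the 10M figure is the Professional tier value from the table above.

```python
SLOT_DURATION_MS = 400                    # nominal slot time; real timing varies
PROFESSIONAL_MONTHLY_QUOTA = 10_000_000   # from the tier table
MS_PER_DAY = 24 * 60 * 60 * 1000


def monthly_slot_events(days: int = 30, streams: int = 1) -> int:
    """Estimated slot events consumed over a period, one update per slot."""
    return days * MS_PER_DAY // SLOT_DURATION_MS * streams


def days_until_quota(quota: int = PROFESSIONAL_MONTHLY_QUOTA, streams: int = 1) -> float:
    """Estimated days of continuous streaming before the quota is used up."""
    events_per_day = MS_PER_DAY // SLOT_DURATION_MS * streams
    return quota / events_per_day


print(monthly_slot_events())            # one stream for 30 days
print(round(days_until_quota(), 1))     # days one stream could run on 10M events
```

Under these assumptions a single continuous stream stays within the Professional quota for a 30-day month, while two continuous streams would not; actual usage depends on how updates are counted.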
Next Steps
Transaction Streaming
Stream transaction updates with filtering
Getting Started
gRPC installation and setup
gRPC Overview
Learn about gRPC streaming capabilities
Historical Replay
Replay historical slot data
Support
Need help with slot streaming?
- Discord: discord.gg/fortiblox
- Email: [email protected]
- Status: status.fortiblox.com
- Examples: github.com/fortiblox/grpc-examples
Technical Details:
- Redis Stream: geyser:slots
- Current Entries: 227+
- Freshness: <30 seconds
- Latency: <50ms avg
- Protocol: HTTP/2 plaintext
- Authentication: SHA-256 via x-api-key header