FortiBlox LogoFortiBlox Docs
NexusGrpc streaming

Slot Streaming

Stream real-time slot updates for chain head tracking and block production monitoring

Slot Streaming

Subscribe to real-time slot updates from FortiBlox gRPC streaming. Monitor blockchain slot production, track chain head, and detect missed slots with <30 second freshness guaranteed by Redis pub/sub.

Now Live! Slot streaming is fully operational with 227+ entries in Redis and <30s freshness. Perfect for real-time blockchain monitoring applications.

What are Slots?

In Solana and X1 Blockchain, a slot is a fixed period of time (approximately 400ms) during which a leader validator can produce a block. Each slot represents an opportunity for block production, though not every slot results in a block (some slots are "skipped" if the validator is offline or slow).

Slot Data Structure

{
  "slot": 11163364,
  "parent": 11163363,
  "status": "confirmed",
  "timestamp": 1764035684
}
FieldTypeDescription
slotuint64Current slot number
parentuint64Parent slot number (typically slot - 1)
statusstringCommitment level: processed, confirmed, or finalized
timestampint64Unix timestamp when slot was produced

Why Stream Slots?

Use Cases

1. Block Explorer & Analytics

  • Display real-time chain head
  • Calculate block production rates
  • Visualize network health
  • Track validator performance

2. Trading & MEV Bots

  • Precise timing for transaction submission
  • Track block production timing
  • Monitor network congestion
  • Optimize transaction ordering

3. Validator Monitoring

  • Detect missed slots in real-time
  • Calculate uptime metrics
  • Monitor slot performance
  • Alert on validator issues

4. Network Health Dashboards

  • Real-time slot production visualization
  • Network speed metrics
  • Historical slot production charts
  • Chain finality tracking

5. DeFi Applications

  • Trigger time-sensitive operations
  • Monitor block timing for liquidations
  • Track confirmation timing
  • Optimize transaction submission timing

Connection Setup

Endpoint Details

ParameterValue
Hostgrpc.fortiblox.com
Port10002
ProtocolHTTP/2 (plaintext, not TLS)
AuthenticationSHA-256 based via x-api-key header
Tier RequiredProfessional+
Data SourceRedis pub/sub (geyser:slots stream)
Freshness<30 seconds
Entries227+ in Redis

Important: The endpoint uses HTTP/2 plaintext (insecure), not HTTPS/TLS. Make sure to configure your client accordingly.

Code Examples

Complete Slot Streaming Example

// slot-stream.js
const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');

// Load protocol buffers
const packageDefinition = protoLoader.loadSync('geyser.proto', {
  keepCase: true,
  longs: String,
  enums: String,
  defaults: true,
  oneofs: true
});

const proto = grpc.loadPackageDefinition(packageDefinition);

// Create client (plaintext/insecure)
const client = new proto.geyser.Geyser(
  'grpc.fortiblox.com:10002',
  grpc.credentials.createInsecure()
);

// Add API key metadata
const metadata = new grpc.Metadata();
metadata.add('x-api-key', process.env.FORTIBLOX_API_KEY);

// Create bidirectional stream
const stream = client.subscribe(metadata);

// Subscribe to slots
stream.write({
  slots: {
    'slot_updates': {}  // Empty filter subscribes to all slots
  },
  commitment: 'CONFIRMED'
});

// Track slot metrics
let lastSlot = 0;
let missedSlots = 0;
let startTime = Date.now();
let slotCount = 0;

// Handle slot updates
stream.on('data', (update) => {
  if (update.slot) {
    const slot = update.slot;
    slotCount++;

    // Detect missed slots
    if (lastSlot > 0 && slot.slot > lastSlot + 1) {
      const missed = slot.slot - lastSlot - 1;
      missedSlots += missed;
      console.log(`⚠️  Missed ${missed} slot(s) between ${lastSlot} and ${slot.slot}`);
    }

    console.log(`📊 Slot ${slot.slot} | Parent: ${slot.parent} | Status: ${slot.status}`);

    // Calculate metrics every 100 slots
    if (slotCount % 100 === 0) {
      const elapsed = (Date.now() - startTime) / 1000;
      const slotsPerSecond = slotCount / elapsed;
      const missRate = (missedSlots / slotCount * 100).toFixed(2);

      console.log(`\n📈 Metrics (last ${slotCount} slots):`);
      console.log(`   Rate: ${slotsPerSecond.toFixed(2)} slots/sec`);
      console.log(`   Missed: ${missedSlots} (${missRate}%)\n`);
    }

    lastSlot = slot.slot;
  }
});

// Handle errors with reconnection
stream.on('error', (err) => {
  console.error('❌ Stream error:', err.message);

  // Reconnect after 5 seconds
  if (err.code === grpc.status.UNAVAILABLE) {
    console.log('🔄 Reconnecting in 5 seconds...');
    setTimeout(() => reconnect(), 5000);
  }
});

stream.on('end', () => {
  console.log('Stream ended, reconnecting...');
  setTimeout(() => reconnect(), 1000);
});

function reconnect() {
  console.log('🔌 Reconnecting...');
  // Restart connection
  const newStream = client.subscribe(metadata);
  // ... (resubscribe with same logic)
}

// Graceful shutdown
process.on('SIGINT', () => {
  console.log('\n👋 Shutting down...');
  stream.end();
  process.exit(0);
});

console.log('🚀 Slot streaming started...');
console.log('   Endpoint: grpc.fortiblox.com:10002');
console.log('   Press Ctrl+C to stop\n');

Run

export FORTIBLOX_API_KEY="your_api_key_here"
node slot-stream.js

Complete Slot Streaming Example

# slot_stream.py
import grpc
import os
import time
from datetime import datetime
import geyser_pb2
import geyser_pb2_grpc

class SlotMonitor:
    def __init__(self, api_key):
        self.api_key = api_key
        self.last_slot = 0
        self.missed_slots = 0
        self.slot_count = 0
        self.start_time = time.time()

    def connect(self):
        # Create insecure channel (plaintext)
        channel = grpc.insecure_channel('grpc.fortiblox.com:10002')
        stub = geyser_pb2_grpc.GeyserStub(channel)
        return stub

    def request_generator(self):
        """Generate subscription request for slots"""
        yield geyser_pb2.SubscribeRequest(
            slots={
                'slot_updates': geyser_pb2.SubscribeRequestFilterSlots()
            },
            commitment=geyser_pb2.CommitmentLevel.CONFIRMED
        )

    def monitor(self):
        print("🚀 Slot streaming started...")
        print("   Endpoint: grpc.fortiblox.com:10002")
        print("   Press Ctrl+C to stop\n")

        stub = self.connect()
        metadata = [('x-api-key', self.api_key)]

        try:
            for update in stub.Subscribe(self.request_generator(), metadata=metadata):
                if update.HasField('slot'):
                    self.process_slot(update.slot)

        except grpc.RpcError as e:
            print(f"❌ gRPC error: {e.code()} - {e.details()}")
            print("🔄 Reconnecting in 5 seconds...")
            time.sleep(5)
            self.monitor()  # Reconnect

        except KeyboardInterrupt:
            print("\n👋 Shutting down...")

    def process_slot(self, slot):
        self.slot_count += 1

        # Detect missed slots
        if self.last_slot > 0 and slot.slot > self.last_slot + 1:
            missed = slot.slot - self.last_slot - 1
            self.missed_slots += missed
            print(f"⚠️  Missed {missed} slot(s) between {self.last_slot} and {slot.slot}")

        # Format timestamp
        timestamp = datetime.fromtimestamp(slot.timestamp).strftime('%H:%M:%S')

        print(f"📊 Slot {slot.slot} | Parent: {slot.parent} | Status: {slot.status} | Time: {timestamp}")

        # Calculate metrics every 100 slots
        if self.slot_count % 100 == 0:
            elapsed = time.time() - self.start_time
            slots_per_second = self.slot_count / elapsed
            miss_rate = (self.missed_slots / self.slot_count * 100)

            print(f"\n📈 Metrics (last {self.slot_count} slots):")
            print(f"   Rate: {slots_per_second:.2f} slots/sec")
            print(f"   Missed: {self.missed_slots} ({miss_rate:.2f}%)\n")

        self.last_slot = slot.slot

if __name__ == '__main__':
    api_key = os.getenv('FORTIBLOX_API_KEY')
    if not api_key:
        print("Error: FORTIBLOX_API_KEY environment variable not set")
        exit(1)

    monitor = SlotMonitor(api_key)
    monitor.monitor()

Setup & Run

# Install dependencies
pip install grpcio grpcio-tools

# Download and compile proto
curl -O https://raw.githubusercontent.com/rpcpool/yellowstone-grpc/master/yellowstone-grpc-proto/proto/geyser.proto
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. geyser.proto

# Run
export FORTIBLOX_API_KEY="your_api_key_here"
python slot_stream.py

Complete Slot Streaming Example

// src/main.rs
use tonic::{transport::Channel, metadata::MetadataValue, Request};
use tokio_stream::StreamExt;
use std::time::{Duration, Instant};

pub mod geyser {
    tonic::include_proto!("geyser");
}

use geyser::{geyser_client::GeyserClient, SubscribeRequest};

struct SlotMetrics {
    last_slot: u64,
    missed_slots: u64,
    slot_count: u64,
    start_time: Instant,
}

impl SlotMetrics {
    fn new() -> Self {
        Self {
            last_slot: 0,
            missed_slots: 0,
            slot_count: 0,
            start_time: Instant::now(),
        }
    }

    fn process_slot(&mut self, slot: u64, parent: u64, status: &str) {
        self.slot_count += 1;

        // Detect missed slots
        if self.last_slot > 0 && slot > self.last_slot + 1 {
            let missed = slot - self.last_slot - 1;
            self.missed_slots += missed;
            println!("⚠️  Missed {} slot(s) between {} and {}", missed, self.last_slot, slot);
        }

        println!("📊 Slot {} | Parent: {} | Status: {}", slot, parent, status);

        // Print metrics every 100 slots
        if self.slot_count % 100 == 0 {
            let elapsed = self.start_time.elapsed().as_secs_f64();
            let slots_per_second = self.slot_count as f64 / elapsed;
            let miss_rate = (self.missed_slots as f64 / self.slot_count as f64) * 100.0;

            println!("\n📈 Metrics (last {} slots):", self.slot_count);
            println!("   Rate: {:.2} slots/sec", slots_per_second);
            println!("   Missed: {} ({:.2}%)\n", self.missed_slots, miss_rate);
        }

        self.last_slot = slot;
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    println!("🚀 Slot streaming started...");
    println!("   Endpoint: grpc.fortiblox.com:10002");
    println!("   Press Ctrl+C to stop\n");

    let mut metrics = SlotMetrics::new();

    loop {
        if let Err(e) = run_slot_stream(&mut metrics).await {
            eprintln!("❌ Error: {}", e);
            eprintln!("🔄 Reconnecting in 5 seconds...");
            tokio::time::sleep(Duration::from_secs(5)).await;
        }
    }
}

async fn run_slot_stream(metrics: &mut SlotMetrics) -> Result<(), Box<dyn std::error::Error>> {
    // Create channel (insecure/plaintext)
    let channel = Channel::from_static("http://grpc.fortiblox.com:10002")
        .connect()
        .await?;

    let mut client = GeyserClient::new(channel);

    // Create request with API key
    let api_key = std::env::var("FORTIBLOX_API_KEY")?;
    let token = MetadataValue::try_from(api_key)?;

    let mut request = Request::new(tokio_stream::iter(vec![
        SubscribeRequest {
            slots: std::collections::HashMap::from([(
                "slot_updates".to_string(),
                geyser::SubscribeRequestFilterSlots::default()
            )]),
            commitment: Some(geyser::CommitmentLevel::Confirmed as i32),
            ..Default::default()
        }
    ]));

    request.metadata_mut().insert("x-api-key", token);

    // Subscribe
    let mut stream = client.subscribe(request).await?.into_inner();

    // Process updates
    while let Some(update) = stream.next().await {
        match update {
            Ok(msg) => {
                if let Some(slot) = msg.slot {
                    metrics.process_slot(
                        slot.slot,
                        slot.parent,
                        &format!("{:?}", slot.status)
                    );
                }
            }
            Err(e) => {
                eprintln!("Stream error: {}", e);
                return Err(Box::new(e));
            }
        }
    }

    Ok(())
}

Cargo.toml

[dependencies]
tonic = "0.11"
prost = "0.12"
tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1"

[build-dependencies]
tonic-build = "0.11"

Build & Run

export FORTIBLOX_API_KEY="your_api_key_here"
cargo run --release

Complete Slot Streaming Example

// main.go
package main

import (
    "context"
    "fmt"
    "io"
    "log"
    "os"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    "google.golang.org/grpc/metadata"

    pb "your-module/geyser"
)

type SlotMetrics struct {
    lastSlot     uint64
    missedSlots  uint64
    slotCount    uint64
    startTime    time.Time
}

func NewSlotMetrics() *SlotMetrics {
    return &SlotMetrics{
        startTime: time.Now(),
    }
}

func (m *SlotMetrics) ProcessSlot(slot, parent uint64, status string) {
    m.slotCount++

    // Detect missed slots
    if m.lastSlot > 0 && slot > m.lastSlot+1 {
        missed := slot - m.lastSlot - 1
        m.missedSlots += missed
        fmt.Printf("⚠️  Missed %d slot(s) between %d and %d\n", missed, m.lastSlot, slot)
    }

    fmt.Printf("📊 Slot %d | Parent: %d | Status: %s\n", slot, parent, status)

    // Print metrics every 100 slots
    if m.slotCount%100 == 0 {
        elapsed := time.Since(m.startTime).Seconds()
        slotsPerSecond := float64(m.slotCount) / elapsed
        missRate := (float64(m.missedSlots) / float64(m.slotCount)) * 100

        fmt.Printf("\n📈 Metrics (last %d slots):\n", m.slotCount)
        fmt.Printf("   Rate: %.2f slots/sec\n", slotsPerSecond)
        fmt.Printf("   Missed: %d (%.2f%%)\n\n", m.missedSlots, missRate)
    }

    m.lastSlot = slot
}

func main() {
    fmt.Println("🚀 Slot streaming started...")
    fmt.Println("   Endpoint: grpc.fortiblox.com:10002")
    fmt.Println("   Press Ctrl+C to stop\n")

    apiKey := os.Getenv("FORTIBLOX_API_KEY")
    if apiKey == "" {
        log.Fatal("FORTIBLOX_API_KEY environment variable not set")
    }

    metrics := NewSlotMetrics()

    for {
        if err := runSlotStream(apiKey, metrics); err != nil {
            log.Printf("❌ Error: %v", err)
            log.Println("🔄 Reconnecting in 5 seconds...")
            time.Sleep(5 * time.Second)
        }
    }
}

func runSlotStream(apiKey string, metrics *SlotMetrics) error {
    // Create insecure connection (plaintext)
    conn, err := grpc.Dial(
        "grpc.fortiblox.com:10002",
        grpc.WithTransportCredentials(insecure.NewCredentials()),
    )
    if err != nil {
        return fmt.Errorf("failed to connect: %w", err)
    }
    defer conn.Close()

    // Create client
    client := pb.NewGeyserClient(conn)

    // Add API key to context
    md := metadata.New(map[string]string{
        "x-api-key": apiKey,
    })
    ctx := metadata.NewOutgoingContext(context.Background(), md)

    // Create stream
    stream, err := client.Subscribe(ctx)
    if err != nil {
        return fmt.Errorf("failed to subscribe: %w", err)
    }

    // Send subscription request
    req := &pb.SubscribeRequest{
        Slots: map[string]*pb.SubscribeRequestFilterSlots{
            "slot_updates": {},
        },
        Commitment: pb.CommitmentLevel_CONFIRMED,
    }

    if err := stream.Send(req); err != nil {
        return fmt.Errorf("failed to send request: %w", err)
    }

    // Receive updates
    for {
        update, err := stream.Recv()
        if err == io.EOF {
            return fmt.Errorf("stream ended")
        }
        if err != nil {
            return fmt.Errorf("stream error: %w", err)
        }

        if slot := update.GetSlot(); slot != nil {
            metrics.ProcessSlot(slot.Slot, slot.Parent, slot.Status.String())
        }
    }
}

Run

export FORTIBLOX_API_KEY="your_api_key_here"
go run main.go

Complete Slot Streaming Example

// SlotStreamClient.java
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.stub.MetadataUtils;
import io.grpc.stub.StreamObserver;

import java.time.Instant;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;

public class SlotStreamClient {
    private final GeyserGrpc.GeyserStub asyncStub;
    private long lastSlot = 0;
    private long missedSlots = 0;
    private long slotCount = 0;
    private long startTime = System.currentTimeMillis();

    public SlotStreamClient(String host, int port, String apiKey) {
        // Create insecure channel (plaintext)
        ManagedChannel channel = ManagedChannelBuilder
            .forAddress(host, port)
            .usePlaintext()
            .build();

        // Add API key metadata
        Metadata metadata = new Metadata();
        Metadata.Key<String> key = Metadata.Key.of("x-api-key", Metadata.ASCII_STRING_MARSHALLER);
        metadata.put(key, apiKey);

        this.asyncStub = GeyserGrpc.newStub(channel)
            .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata));
    }

    public void startStreaming() {
        System.out.println("🚀 Slot streaming started...");
        System.out.println("   Endpoint: grpc.fortiblox.com:10002");
        System.out.println("   Press Ctrl+C to stop\n");

        StreamObserver<SubscribeRequest> requestObserver = asyncStub.subscribe(
            new StreamObserver<SubscribeUpdate>() {
                @Override
                public void onNext(SubscribeUpdate update) {
                    if (update.hasSlot()) {
                        processSlot(update.getSlot());
                    }
                }

                @Override
                public void onError(Throwable t) {
                    System.err.println("❌ Stream error: " + t.getMessage());
                    System.out.println("🔄 Reconnecting in 5 seconds...");
                    try {
                        Thread.sleep(5000);
                        startStreaming(); // Reconnect
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }

                @Override
                public void onCompleted() {
                    System.out.println("Stream completed");
                }
            }
        );

        // Send subscription request
        SubscribeRequest request = SubscribeRequest.newBuilder()
            .putSlots("slot_updates", SubscribeRequestFilterSlots.getDefaultInstance())
            .setCommitment(CommitmentLevel.CONFIRMED)
            .build();

        requestObserver.onNext(request);
    }

    private void processSlot(SubscribeUpdateSlot slot) {
        slotCount++;

        // Detect missed slots
        if (lastSlot > 0 && slot.getSlot() > lastSlot + 1) {
            long missed = slot.getSlot() - lastSlot - 1;
            missedSlots += missed;
            System.out.printf("⚠️  Missed %d slot(s) between %d and %d%n",
                missed, lastSlot, slot.getSlot());
        }

        System.out.printf("📊 Slot %d | Parent: %d | Status: %s%n",
            slot.getSlot(), slot.getParent(), slot.getStatus());

        // Print metrics every 100 slots
        if (slotCount % 100 == 0) {
            long elapsed = System.currentTimeMillis() - startTime;
            double slotsPerSecond = slotCount / (elapsed / 1000.0);
            double missRate = (missedSlots / (double) slotCount) * 100;

            System.out.printf("%n📈 Metrics (last %d slots):%n", slotCount);
            System.out.printf("   Rate: %.2f slots/sec%n", slotsPerSecond);
            System.out.printf("   Missed: %d (%.2f%%)%n%n", missedSlots, missRate);
        }

        lastSlot = slot.getSlot();
    }

    public static void main(String[] args) {
        String apiKey = System.getenv("FORTIBLOX_API_KEY");
        if (apiKey == null || apiKey.isEmpty()) {
            System.err.println("Error: FORTIBLOX_API_KEY environment variable not set");
            System.exit(1);
        }

        SlotStreamClient client = new SlotStreamClient("grpc.fortiblox.com", 10002, apiKey);
        client.startStreaming();

        // Keep alive
        try {
            Thread.currentThread().join();
        } catch (InterruptedException e) {
            System.out.println("\n👋 Shutting down...");
        }
    }
}

pom.xml

<dependencies>
    <dependency>
        <groupId>io.grpc</groupId>
        <artifactId>grpc-netty-shaded</artifactId>
        <version>1.58.0</version>
    </dependency>
    <dependency>
        <groupId>io.grpc</groupId>
        <artifactId>grpc-protobuf</artifactId>
        <version>1.58.0</version>
    </dependency>
    <dependency>
        <groupId>io.grpc</groupId>
        <artifactId>grpc-stub</artifactId>
        <version>1.58.0</version>
    </dependency>
</dependencies>

Run

export FORTIBLOX_API_KEY="your_api_key_here"
mvn compile exec:java -Dexec.mainClass="SlotStreamClient"

Real-World Scenarios

1. Block Explorer Chain Head Display

Display real-time chain head with slot updates:

// chain-head-tracker.js
class ChainHeadTracker {
  constructor() {
    this.currentSlot = 0;
    this.confirmedSlot = 0;
    this.finalizedSlot = 0;
  }

  async start() {
    const stream = await this.createStream();

    stream.on('data', (update) => {
      if (update.slot) {
        const { slot, status } = update.slot;

        // Update tracked slots based on commitment
        switch (status) {
          case 'processed':
            this.currentSlot = Math.max(this.currentSlot, slot);
            break;
          case 'confirmed':
            this.confirmedSlot = Math.max(this.confirmedSlot, slot);
            break;
          case 'finalized':
            this.finalizedSlot = Math.max(this.finalizedSlot, slot);
            break;
        }

        // Update UI
        this.updateUI({
          current: this.currentSlot,
          confirmed: this.confirmedSlot,
          finalized: this.finalizedSlot,
          lag: this.currentSlot - this.finalizedSlot
        });
      }
    });
  }

  updateUI(data) {
    // Push to websocket clients, update dashboard, etc.
    console.log(`Current: ${data.current} | Confirmed: ${data.confirmed} | Finalized: ${data.finalized} | Lag: ${data.lag}`);
  }
}

2. Validator Performance Monitor

Track validator slot performance:

# validator_monitor.py
class ValidatorMonitor:
    def __init__(self, validator_identity):
        self.validator_identity = validator_identity
        self.slots_produced = 0
        self.slots_missed = 0
        self.expected_slots = []

    async def monitor(self):
        async for update in self.stream_slots():
            if update.HasField('slot'):
                slot_number = update.slot.slot
                leader = await self.get_slot_leader(slot_number)

                if leader == self.validator_identity:
                    # This validator was the leader
                    if self.is_slot_produced(slot_number):
                        self.slots_produced += 1
                        print(f"✅ Validator produced slot {slot_number}")
                    else:
                        self.slots_missed += 1
                        print(f"❌ Validator missed slot {slot_number}")
                        # Send alert
                        await self.send_alert(f"Missed slot {slot_number}")

                    # Calculate uptime
                    uptime = (self.slots_produced / (self.slots_produced + self.slots_missed)) * 100
                    print(f"📊 Uptime: {uptime:.2f}%")

3. MEV Bot Timing Optimization

Optimize transaction submission timing:

// mev_bot.rs
struct MevBot {
    current_slot: u64,
    next_slot_estimate: Instant,
}

impl MevBot {
    async fn monitor_slots(&mut self) {
        let mut stream = self.create_slot_stream().await;

        while let Some(update) = stream.next().await {
            if let Some(slot) = update.slot {
                self.current_slot = slot.slot;

                // Estimate next slot timing (400ms per slot)
                self.next_slot_estimate = Instant::now() + Duration::from_millis(400);

                // Check if we should submit pending transactions
                self.check_pending_transactions().await;
            }
        }
    }

    async fn check_pending_transactions(&self) {
        // Submit transactions optimally timed for next slot
        if self.has_pending_profitable_transaction() {
            // Calculate optimal submission time
            let submit_time = self.next_slot_estimate - Duration::from_millis(100);

            tokio::time::sleep_until(submit_time.into()).await;
            self.submit_transaction().await;
        }
    }
}

4. Network Health Dashboard

Real-time network metrics:

// network_dashboard.go
type NetworkMetrics struct {
    SlotRate          float64
    AverageBlockTime  float64
    MissedSlotRate    float64
    LastSlot          uint64
}

func (d *Dashboard) MonitorNetwork() {
    var slots []uint64
    var timestamps []time.Time

    for update := range d.slotStream {
        if slot := update.GetSlot(); slot != nil {
            slots = append(slots, slot.Slot)
            timestamps = append(timestamps, time.Now())

            // Keep last 1000 slots
            if len(slots) > 1000 {
                slots = slots[1:]
                timestamps = timestamps[1:]
            }

            // Calculate metrics
            metrics := d.calculateMetrics(slots, timestamps)

            // Update dashboard
            d.updateMetrics(metrics)

            // Check for anomalies
            if metrics.SlotRate < 2.0 {
                d.sendAlert("⚠️ Network slow: " + fmt.Sprintf("%.2f slots/sec", metrics.SlotRate))
            }
        }
    }
}

func (d *Dashboard) calculateMetrics(slots []uint64, timestamps []time.Time) NetworkMetrics {
    // Calculate slot production rate
    elapsed := timestamps[len(timestamps)-1].Sub(timestamps[0]).Seconds()
    slotRate := float64(len(slots)) / elapsed

    // Calculate missed slots
    expectedSlots := slots[len(slots)-1] - slots[0]
    actualSlots := uint64(len(slots))
    missedSlots := expectedSlots - actualSlots
    missRate := float64(missedSlots) / float64(expectedSlots) * 100

    return NetworkMetrics{
        SlotRate:       slotRate,
        MissedSlotRate: missRate,
        LastSlot:       slots[len(slots)-1],
    }
}

Performance & Best Practices

Connection Management

1. Use Persistent Connections

// ✅ Good: Single persistent connection
const stream = client.subscribe(metadata);
stream.on('data', handleSlot);

// ❌ Bad: Creating new connections repeatedly
setInterval(() => {
  const stream = client.subscribe(metadata); // Don't do this!
}, 1000);

2. Implement Reconnection Logic

function createSlotStream() {
  const stream = client.subscribe(metadata);

  stream.on('error', (err) => {
    console.error('Stream error:', err);
    setTimeout(() => createSlotStream(), 5000);
  });

  stream.on('end', () => {
    console.log('Stream ended');
    setTimeout(() => createSlotStream(), 1000);
  });

  return stream;
}

3. Handle Backpressure

# Implement buffering for high-throughput scenarios
from collections import deque

slot_buffer = deque(maxlen=1000)

for update in stub.Subscribe(request_generator(), metadata=metadata):
    if update.HasField('slot'):
        slot_buffer.append(update.slot)

        # Process in batches
        if len(slot_buffer) >= 100:
            process_slot_batch(list(slot_buffer))
            slot_buffer.clear()

Data Processing

1. Efficient Slot Tracking

// Use circular buffer for memory efficiency
class SlotTracker {
  constructor(size = 1000) {
    this.slots = new Array(size);
    this.index = 0;
    this.size = size;
  }

  add(slot) {
    this.slots[this.index % this.size] = slot;
    this.index++;
  }

  getRecent(count) {
    const start = Math.max(0, this.index - count);
    return this.slots.slice(start, this.index);
  }
}

2. Detect Slot Gaps Efficiently

struct SlotGapDetector {
    last_slot: Option<u64>,
    gaps: Vec<(u64, u64)>, // (from, to)
}

impl SlotGapDetector {
    fn process(&mut self, slot: u64) -> Option<(u64, u64)> {
        if let Some(last) = self.last_slot {
            if slot > last + 1 {
                let gap = (last + 1, slot - 1);
                self.gaps.push(gap);
                self.last_slot = Some(slot);
                return Some(gap);
            }
        }
        self.last_slot = Some(slot);
        None
    }
}

Error Handling

1. Exponential Backoff

import time

def connect_with_retry(max_retries=5):
    retry = 0
    while retry < max_retries:
        try:
            return create_slot_stream()
        except grpc.RpcError as e:
            retry += 1
            wait = min(2 ** retry, 60)  # Cap at 60 seconds
            print(f"Retry {retry}/{max_retries} in {wait}s...")
            time.sleep(wait)

    raise Exception("Max retries exceeded")

2. Graceful Degradation

class ResilientSlotMonitor {
  constructor() {
    this.fallbackEnabled = true;
    this.lastSlot = 0;
  }

  async monitor() {
    try {
      await this.streamSlots();
    } catch (err) {
      console.error('Streaming failed, falling back to polling');
      if (this.fallbackEnabled) {
        await this.pollSlots();
      }
    }
  }

  async pollSlots() {
    // Fallback to HTTP polling if streaming fails
    setInterval(async () => {
      const slot = await this.fetchLatestSlot();
      this.processSlot(slot);
    }, 1000);
  }
}

Filtering & Subscription Options

Basic Subscription

Subscribe to all slots (recommended):

stream.write({
  slots: {
    'all_slots': {}
  }
});

Commitment Level Filtering

Filter by commitment level:

# Only confirmed slots
request = geyser_pb2.SubscribeRequest(
    slots={
        'confirmed_slots': geyser_pb2.SubscribeRequestFilterSlots()
    },
    commitment=geyser_pb2.CommitmentLevel.CONFIRMED
)

# Only finalized slots
request = geyser_pb2.SubscribeRequest(
    slots={
        'finalized_slots': geyser_pb2.SubscribeRequestFilterSlots()
    },
    commitment=geyser_pb2.CommitmentLevel.FINALIZED
)

Troubleshooting

Common Issues

1. Connection Refused

Error: 14 UNAVAILABLE: Connection refused

Solution:

  • Verify endpoint: grpc.fortiblox.com:10002
  • Ensure using insecure/plaintext credentials
  • Check Professional+ tier access
  • Verify firewall allows outbound port 10002

2. Authentication Errors

Error: 16 UNAUTHENTICATED: Invalid API key

Solution:

  • Verify API key in metadata header
  • Check header name is exactly x-api-key
  • Ensure Professional+ tier subscription
  • Regenerate API key if needed

3. No Data Received

Solution:

  • Check subscription request format
  • Verify commitment level
  • Ensure stream is properly initialized
  • Check for error callbacks

4. High Memory Usage

Solution:

  • Implement circular buffers
  • Process slots in batches
  • Clear old data periodically
  • Use efficient data structures

Debug Mode

Enable debug logging:

// Node.js
process.env.GRPC_VERBOSITY = 'DEBUG';
process.env.GRPC_TRACE = 'all';

// Python
import logging
logging.basicConfig(level=logging.DEBUG)

Rate Limits & Quotas

TierConcurrent StreamsSlots/SecondMonthly Quota
Professional10Unlimited10M slots
Enterprise100UnlimitedUnlimited

Slot streaming counts toward your monthly streaming quota. Each slot update consumed counts as 1 event.

Next Steps

Support

Need help with slot streaming?


Technical Details:

  • Redis Stream: geyser:slots
  • Current Entries: 227+
  • Freshness: <30 seconds
  • Latency: <50ms avg
  • Protocol: HTTP/2 plaintext
  • Authentication: SHA-256 via x-api-key header