Historical Data Replay
Replay historical transactions and blocks from specific slot ranges
Replay historical blockchain data from TimescaleDB for analytics, backtesting, and data recovery.
Historical Transaction Replay
Use start_slot and end_slot parameters to replay transactions from a specific slot range.
Request
// Replay transactions from slot 100000 to 100100
const stream = client.SubscribeTransactions({
  start_slot: 100000,
  end_slot: 100100,
  filter_votes: true,
  commitment: 'FINALIZED'
}, metadata);

let count = 0;
stream.on('data', (tx) => {
  count++;
  console.log(`[${count}] Slot ${tx.slot}: ${tx.signature}`);
});
stream.on('end', () => {
  console.log(`Replay complete: ${count} transactions`);
});

# Replay transactions from a specific slot range
request = geyser_pb2.SubscribeTransactionsRequest(
    start_slot=100000,
    end_slot=100100,
    filter_votes=True,
    commitment=geyser_pb2.FINALIZED
)

count = 0
for tx in stub.SubscribeTransactions(request, metadata=metadata):
    count += 1
    print(f"[{count}] Slot {tx.slot}: {tx.signature}")

print(f"Replay complete: {count} transactions")

Historical Block Replay
Replay blocks from a slot range:
const stream = client.SubscribeBlocks({
  start_slot: 100000,
  end_slot: 100010,
  commitment: 'FINALIZED'
}, metadata);

stream.on('data', (block) => {
  console.log(`Block ${block.slot}: ${block.transaction_count} transactions`);
});

Use Cases
Backfill Missing Data
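Backfilling starts with knowing which slot ranges are absent from your own store. The sketch below is one way to derive them from a list of slots you already hold. The function name find_slot_gaps and its arguments are hypothetical and not part of the streaming API. It also assumes that every slot in the range should be present; on chains where some slots produce no block, a reported gap may simply be empty.

```python
from typing import Iterable, List, Tuple


def find_slot_gaps(
    stored_slots: Iterable[int], first: int, last: int
) -> List[Tuple[int, int]]:
    """Return inclusive (start, end) ranges within [first, last] that are
    not covered by stored_slots. Hypothetical helper, not part of the API."""
    gaps: List[Tuple[int, int]] = []
    expected = first  # next slot we expect to see
    in_range = sorted({s for s in stored_slots if first <= s <= last})
    for slot in in_range:
        if slot > expected:
            # Everything between the expected slot and this one is missing
            gaps.append((expected, slot - 1))
        expected = slot + 1
    if expected <= last:
        # Trailing gap after the last stored slot
        gaps.append((expected, last))
    return gaps


if __name__ == "__main__":
    have = [100000, 100001, 100005, 100006]
    print(find_slot_gaps(have, 100000, 100010))
    # [(100002, 100004), (100007, 100010)]
```

Each returned pair can then be used as the start and end of a replay request, as in the function that follows.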
async function backfillData(startSlot, endSlot) {
  return new Promise((resolve, reject) => {
    const transactions = [];
    const stream = client.SubscribeTransactions({
      start_slot: startSlot,
      end_slot: endSlot,
      filter_votes: true
    }, metadata);

    stream.on('data', (tx) => {
      transactions.push(tx);
    });
    // Reject on stream errors so the caller is not left waiting forever
    stream.on('error', reject);
    stream.on('end', () => {
      resolve(transactions);
    });
  });
}

const data = await backfillData(100000, 101000);
console.log(`Backfilled ${data.length} transactions`);

Analytics & Backtesting
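The analyze_slot_range function in this section passes its results to analyze_swaps, which this page does not define. The sketch below is one possible stand-in, not the actual helper: it assumes only that each transaction exposes a slot attribute, as in the replay examples above, and the summary fields it returns are illustrative.

```python
from collections import Counter
from types import SimpleNamespace


def analyze_swaps(swaps):
    """Summarize replayed transactions: total count, slot span, busiest slot.
    Hypothetical stand-in for the undefined helper used on this page."""
    if not swaps:
        return {"total": 0, "first_slot": None, "last_slot": None,
                "busiest_slot": None, "busiest_count": 0}
    # Count how many transactions landed in each slot
    per_slot = Counter(tx.slot for tx in swaps)
    busiest_slot, busiest_count = per_slot.most_common(1)[0]
    return {
        "total": len(swaps),
        "first_slot": min(per_slot),
        "last_slot": max(per_slot),
        "busiest_slot": busiest_slot,
        "busiest_count": busiest_count,
    }


if __name__ == "__main__":
    # Stand-in objects; real transactions would come from the replay stream
    sample = [
        SimpleNamespace(slot=100000, signature="sig-a"),
        SimpleNamespace(slot=100002, signature="sig-b"),
        SimpleNamespace(slot=100002, signature="sig-c"),
    ]
    print(analyze_swaps(sample))
```

A real analysis would likely decode instruction data or token balances, which depends on message fields this page does not describe.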
def analyze_slot_range(start, end):
    """Analyze transactions in a slot range"""
    request = geyser_pb2.SubscribeTransactionsRequest(
        start_slot=start,
        end_slot=end,
        account_include=['JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4'],
        filter_votes=True
    )

    swaps = []
    for tx in stub.SubscribeTransactions(request, metadata=metadata):
        swaps.append(tx)

    # analyze_swaps is your own analysis function; it is not defined here
    return analyze_swaps(swaps)

Performance Tips
- Use batching for large slot ranges
- Set filter_votes appropriately to reduce data volume
- Use FINALIZED commitment for historical data
- Historical queries are batched at 1000 transactions per batch
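One way to read the first tip is to split a large slot range into smaller windows and issue one replay request per window. The sketch below shows only the windowing arithmetic. The name slot_windows and the default window size are hypothetical, and it assumes end_slot is inclusive, which this page does not state.

```python
from typing import Iterator, Tuple


def slot_windows(
    start_slot: int, end_slot: int, window: int = 1000
) -> Iterator[Tuple[int, int]]:
    """Yield inclusive (start, end) sub-ranges covering [start_slot, end_slot].
    Hypothetical helper; the window size is an arbitrary illustrative default."""
    if window <= 0:
        raise ValueError("window must be positive")
    if end_slot < start_slot:
        raise ValueError("end_slot must not be smaller than start_slot")
    lo = start_slot
    while lo <= end_slot:
        # Clamp the last window so it never runs past end_slot
        hi = min(lo + window - 1, end_slot)
        yield lo, hi
        lo = hi + 1


if __name__ == "__main__":
    for lo, hi in slot_windows(100000, 100100, window=50):
        print(lo, hi)
    # 100000 100049
    # 100050 100099
    # 100100 100100
```

Each pair can be passed as start_slot and end_slot to a separate request, which keeps a single failed window cheap to retry.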
See gRPC Streaming Overview for more information.