Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ pub struct AppState {

### Frontend API client
- Base URL: `/api` (proxied by nginx to `atlas-api:3000`)
- Fast polling endpoint: `GET /api/height` → `{ block_height, indexed_at }` — used by navbar every 2s
- Chain status: `GET /api/status` → full chain info, fetched once on page load
- `GET /api/status` → `{ block_height, indexed_at }` — single key-value lookup from `indexer_state`, sub-ms. This is the **only** chain status endpoint; there is no separate "full chain info" endpoint. Used by the navbar as a polling fallback when SSE is disconnected.
- `GET /api/events` → SSE stream of `new_block` events, one per block in order. Primary live-update path for navbar counter and blocks page. Falls back to `/api/status` polling on disconnect.

## Important Conventions

Expand Down
5 changes: 5 additions & 0 deletions backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
# Config
dotenvy = "0.15"

# Streaming
tokio-stream = "0.1"
futures = "0.3"
async-stream = "0.3"

# Utilities
bigdecimal = { version = "0.4", features = ["serde"] }
hex = "0.4"
Expand Down
3 changes: 3 additions & 0 deletions backend/crates/atlas-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,6 @@ bigdecimal = { workspace = true }
hex = { workspace = true }
chrono = { workspace = true }
reqwest = { workspace = true }
tokio-stream = { workspace = true }
futures = { workspace = true }
async-stream = { workspace = true }
1 change: 1 addition & 0 deletions backend/crates/atlas-api/src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub mod logs;
pub mod nfts;
pub mod proxy;
pub mod search;
pub mod sse;
pub mod status;
pub mod tokens;
pub mod transactions;
Expand Down
116 changes: 116 additions & 0 deletions backend/crates/atlas-api/src/handlers/sse.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
use axum::{
extract::State,
response::sse::{Event, Sse},
};
use futures::stream::Stream;
use serde::Serialize;
use std::convert::Infallible;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::interval;

use crate::AppState;
use atlas_common::Block;
use tracing::warn;

#[derive(Serialize)]
struct NewBlockEvent {
block: Block,
}

/// GET /api/events — Server-Sent Events stream for live block updates.
/// Polls the DB every 200ms and emits one `new_block` event per block, in order.
/// Never skips blocks — fetches all blocks since the last one sent.
pub async fn block_events(
State(state): State<Arc<AppState>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
let stream = async_stream::stream! {
let mut last_block_number: Option<i64> = None;
let mut tick = interval(Duration::from_millis(200));
let mut ping_counter: u32 = 0;

loop {
tick.tick().await;
ping_counter += 1;

// On first tick, seed with the latest block number
if last_block_number.is_none() {
let latest: Option<i64> = match sqlx::query_scalar("SELECT MAX(number) FROM blocks")
.fetch_one(&state.pool)
.await
{
Ok(v) => v,
Err(e) => { warn!(error = ?e, "sse: failed to query latest block number"); continue; }
};

if let Some(max_num) = latest {
// Emit the current latest block as the initial event.
// Only advance the cursor after a successful fetch-and-emit so the
// block is not skipped if the fetch fails.
let block: Option<Block> = match sqlx::query_as(
"SELECT number, hash, parent_hash, timestamp, gas_used, gas_limit, transaction_count, indexed_at
FROM blocks WHERE number = $1"
)
.bind(max_num)
.fetch_optional(&state.pool)
.await
{
Ok(v) => v,
Err(e) => { warn!(error = ?e, "sse: failed to fetch initial block"); continue; }
};

if let Some(block) = block {
last_block_number = Some(block.number);
let event = NewBlockEvent { block };
if let Ok(json) = serde_json::to_string(&event) {
yield Ok(Event::default().event("new_block").data(json));
}
ping_counter = 0;
}
}
continue;
}

let cursor = last_block_number.unwrap();

// Fetch ALL new blocks since last sent, in ascending order
let new_blocks: Vec<Block> = match sqlx::query_as(
"SELECT number, hash, parent_hash, timestamp, gas_used, gas_limit, transaction_count, indexed_at
FROM blocks WHERE number > $1 ORDER BY number ASC"
)
.bind(cursor)
.fetch_all(&state.pool)
.await
{
Ok(rows) => rows,
Err(e) => { warn!(error = ?e, cursor, "sse: failed to fetch new blocks"); continue; }
};

if !new_blocks.is_empty() {
ping_counter = 0;
}

// Emit one event per block, in order
for block in new_blocks {
last_block_number = Some(block.number);

let event = NewBlockEvent { block };
if let Ok(json) = serde_json::to_string(&event) {
yield Ok(Event::default().event("new_block").data(json));
}
}

// Send keep-alive ping every ~15s (75 ticks * 200ms)
if ping_counter >= 75 {
ping_counter = 0;
yield Ok(Event::default().comment("keep-alive"));
}
}
};

Sse::new(stream).keep_alive(
axum::response::sse::KeepAlive::new()
.interval(Duration::from_secs(15))
.text("keep-alive"),
)
}
12 changes: 10 additions & 2 deletions backend/crates/atlas-api/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ async fn main() -> Result<()> {
admin_api_key,
});

// SSE route — excluded from TimeoutLayer so connections stay alive
let sse_routes = Router::new()
.route("/api/events", get(handlers::sse::block_events))
.with_state(state.clone());

// Build router
let app = Router::new()
// Blocks
Expand Down Expand Up @@ -215,14 +220,17 @@ async fn main() -> Result<()> {
axum::http::StatusCode::REQUEST_TIMEOUT,
Duration::from_secs(10),
))
.with_state(state)
// Merge SSE routes (no TimeoutLayer so connections stay alive)
.merge(sse_routes)
// Shared layers applied to all routes
.layer(
CorsLayer::new()
.allow_origin(Any)
.allow_methods(Any)
.allow_headers(Any),
)
.layer(TraceLayer::new_for_http())
.with_state(state);
.layer(TraceLayer::new_for_http());

let addr = format!("{}:{}", host, port);
tracing::info!("Listening on {}", addr);
Expand Down
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,5 @@ services:

volumes:
pgdata:
external: true
name: atlas_pgdata
13 changes: 13 additions & 0 deletions frontend/nginx.conf
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,19 @@ server {
try_files $uri $uri/ /index.html;
}

# SSE endpoint — disable buffering so events stream through immediately
location /api/events {
proxy_pass http://atlas-api:3000/api/events;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header Connection '';
proxy_http_version 1.1;
chunked_transfer_encoding off;
proxy_buffering off;
proxy_cache off;
}

# Proxy API requests to atlas-api service
location /api/ {
proxy_pass http://atlas-api:3000/api/;
Expand Down
28 changes: 19 additions & 9 deletions frontend/src/components/Layout.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Link, NavLink, Outlet, useLocation } from 'react-router-dom';
import { useEffect, useMemo, useRef, useState } from 'react';
import SearchBar from './SearchBar';
import useLatestBlockHeight from '../hooks/useLatestBlockHeight';
import useBlockSSE from '../hooks/useBlockSSE';
import SmoothCounter from './SmoothCounter';
import logoImg from '../assets/logo.png';
import { BlockStatsContext } from '../context/BlockStatsContext';
Expand All @@ -10,7 +11,8 @@ import { useTheme } from '../hooks/useTheme';
export default function Layout() {
const location = useLocation();
const isHome = location.pathname === '/';
const { height, lastUpdatedAt, bps } = useLatestBlockHeight(2000, 1000000);
const sse = useBlockSSE();
const { height, lastUpdatedAt, bps } = useLatestBlockHeight(2000, 1000000, sse.height, sse.connected, sse.bps);
const [now, setNow] = useState(() => Date.now());
const recentlyUpdated = lastUpdatedAt ? (now - lastUpdatedAt) < 10000 : false;
const [displayedHeight, setDisplayedHeight] = useState<number | null>(null);
Expand All @@ -25,7 +27,9 @@ export default function Layout() {
return () => window.clearInterval(id);
}, []);

// Smoothly increment displayed height using bps
// Update displayed height
// When SSE is connected: show exact height from SSE (increments one-by-one)
// When polling: use bps prediction to smooth between poll intervals
useEffect(() => {
if (height == null) {
if (displayRafRef.current !== null) {
Expand All @@ -52,9 +56,17 @@ export default function Layout() {
});
}

// When SSE is connected, just track the real height directly — no prediction.
// The initialization block above already scheduled a RAF to call setDisplayedHeight
// whenever height changes, so no synchronous setState needed here.
if (sse.connected) {
displayedRef.current = height;
return;
}

// Polling mode: use bps prediction to smooth between poll intervals
const loop = (t: number) => {
if (!bps || bps <= 0) {
// No rate info; just stick to the last known real height
if (displayedRef.current !== height) {
displayedRef.current = height;
setDisplayedHeight(displayedRef.current);
Expand All @@ -64,9 +76,7 @@ export default function Layout() {
const dt = lastFrameRef.current ? (now - lastFrameRef.current) / 1000 : 0;
lastFrameRef.current = now;

// Increase predicted height smoothly by bps * dt
const predicted = displayedRef.current + bps * dt;
// Always at least the latest known chain height
const next = Math.max(height, Math.floor(predicted));
if (next !== displayedRef.current) {
displayedRef.current = next;
Expand All @@ -84,7 +94,7 @@ export default function Layout() {
displayRafRef.current = null;
lastFrameRef.current = 0;
};
}, [height, bps]);
}, [height, bps, sse.connected]);
const blockTimeLabel = useMemo(() => {
if (bps !== null && bps > 0) {
const secs = 1 / bps;
Expand Down Expand Up @@ -175,8 +185,8 @@ export default function Layout() {
</button>
<div className="flex items-center gap-3 text-sm text-gray-300">
<span
className={`inline-block w-2.5 h-2.5 rounded-full ${recentlyUpdated ? 'bg-red-500 live-dot' : 'bg-gray-600'}`}
title={recentlyUpdated ? 'Live updates' : 'Idle'}
className={`inline-block w-2.5 h-2.5 rounded-full ${sse.connected ? 'bg-green-500 live-dot' : recentlyUpdated ? 'bg-red-500 live-dot' : 'bg-gray-600'}`}
title={sse.connected ? 'SSE connected' : recentlyUpdated ? 'Polling' : 'Idle'}
/>
<SmoothCounter value={displayedHeight} />
<span className="text-gray-600">|</span>
Expand Down Expand Up @@ -259,7 +269,7 @@ export default function Layout() {
{/* Main content */}
<main className="flex-1">
<div className="max-w-7xl mx-auto px-4 sm:px-6 lg:px-8 py-8">
<BlockStatsContext.Provider value={{ bps, height: displayedHeight }}>
<BlockStatsContext.Provider value={{ bps, height: displayedHeight, latestBlockEvent: sse.latestBlock, sseConnected: sse.connected }}>
<Outlet />
</BlockStatsContext.Provider>
</div>
Expand Down
5 changes: 1 addition & 4 deletions frontend/src/components/SmoothCounter.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@ interface SmoothCounterProps {

export default function SmoothCounter({ value, className = '' }: SmoothCounterProps) {
const text = value !== null ? new Intl.NumberFormat('en-US').format(Math.floor(value)) : '—';
// Key on value so the animation restarts on change
return (
<span className={`font-mono ${className}`}>
<span key={text} className="fade-in-up inline-block align-bottom">{text}</span>
</span>
<span className={`font-mono tabular-nums ${className}`}>{text}</span>
);
}
10 changes: 9 additions & 1 deletion frontend/src/context/BlockStatsContext.tsx
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
import { createContext } from 'react';
import type { NewBlockEvent } from '../hooks/useBlockSSE';

export interface BlockStats {
bps: number | null;
height: number | null;
latestBlockEvent: NewBlockEvent | null;
sseConnected: boolean;
}

export const BlockStatsContext = createContext<BlockStats>({ bps: null, height: null });
export const BlockStatsContext = createContext<BlockStats>({
bps: null,
height: null,
latestBlockEvent: null,
sseConnected: false,
});

Loading