-
Notifications
You must be signed in to change notification settings - Fork 0
feat: add SSE for live block updates #19
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
pthmas
wants to merge
8
commits into
main
Choose a base branch
from
pthmas/websocket-live-updates
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
f2d30cd
feat: add SSE for live block updates
pthmas cc1a48e
Fix SSE bootstrap query for empty blocks table
pthmas bc193c3
fix: address code review comments
pthmas 2a0cea0
fix: defer setState calls in effects to satisfy react-hooks/set-state…
pthmas 5a61a1f
fix: disable polling entirely while SSE is connected
pthmas 8437389
fix: stop polling when SSE connected, fix cursor advance on failed fetch
pthmas 67bfb14
docs: fix /api/height reference to actual /api/status endpoint, docum…
pthmas 4d24fcc
fix: batch SSE prepend per frame to prevent burst block loss
pthmas File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,116 @@ | ||
| use axum::{ | ||
| extract::State, | ||
| response::sse::{Event, Sse}, | ||
| }; | ||
| use futures::stream::Stream; | ||
| use serde::Serialize; | ||
| use std::convert::Infallible; | ||
| use std::sync::Arc; | ||
| use std::time::Duration; | ||
| use tokio::time::interval; | ||
|
|
||
| use crate::AppState; | ||
| use atlas_common::Block; | ||
| use tracing::warn; | ||
|
|
||
| #[derive(Serialize)] | ||
| struct NewBlockEvent { | ||
| block: Block, | ||
| } | ||
|
|
||
| /// GET /api/events — Server-Sent Events stream for live block updates. | ||
| /// Polls the DB every 200ms and emits one `new_block` event per block, in order. | ||
| /// Never skips blocks — fetches all blocks since the last one sent. | ||
| pub async fn block_events( | ||
| State(state): State<Arc<AppState>>, | ||
| ) -> Sse<impl Stream<Item = Result<Event, Infallible>>> { | ||
| let stream = async_stream::stream! { | ||
| let mut last_block_number: Option<i64> = None; | ||
| let mut tick = interval(Duration::from_millis(200)); | ||
| let mut ping_counter: u32 = 0; | ||
|
|
||
| loop { | ||
| tick.tick().await; | ||
| ping_counter += 1; | ||
|
|
||
| // On first tick, seed with the latest block number | ||
| if last_block_number.is_none() { | ||
| let latest: Option<i64> = match sqlx::query_scalar("SELECT MAX(number) FROM blocks") | ||
| .fetch_one(&state.pool) | ||
| .await | ||
| { | ||
| Ok(v) => v, | ||
| Err(e) => { warn!(error = ?e, "sse: failed to query latest block number"); continue; } | ||
| }; | ||
|
|
||
| if let Some(max_num) = latest { | ||
| // Emit the current latest block as the initial event. | ||
| // Only advance the cursor after a successful fetch-and-emit so the | ||
| // block is not skipped if the fetch fails. | ||
| let block: Option<Block> = match sqlx::query_as( | ||
| "SELECT number, hash, parent_hash, timestamp, gas_used, gas_limit, transaction_count, indexed_at | ||
| FROM blocks WHERE number = $1" | ||
| ) | ||
| .bind(max_num) | ||
| .fetch_optional(&state.pool) | ||
| .await | ||
| { | ||
| Ok(v) => v, | ||
| Err(e) => { warn!(error = ?e, "sse: failed to fetch initial block"); continue; } | ||
| }; | ||
|
|
||
| if let Some(block) = block { | ||
| last_block_number = Some(block.number); | ||
| let event = NewBlockEvent { block }; | ||
| if let Ok(json) = serde_json::to_string(&event) { | ||
| yield Ok(Event::default().event("new_block").data(json)); | ||
| } | ||
| ping_counter = 0; | ||
| } | ||
| } | ||
| continue; | ||
| } | ||
|
|
||
| let cursor = last_block_number.unwrap(); | ||
|
|
||
| // Fetch ALL new blocks since last sent, in ascending order | ||
| let new_blocks: Vec<Block> = match sqlx::query_as( | ||
| "SELECT number, hash, parent_hash, timestamp, gas_used, gas_limit, transaction_count, indexed_at | ||
| FROM blocks WHERE number > $1 ORDER BY number ASC" | ||
| ) | ||
| .bind(cursor) | ||
| .fetch_all(&state.pool) | ||
| .await | ||
| { | ||
| Ok(rows) => rows, | ||
| Err(e) => { warn!(error = ?e, cursor, "sse: failed to fetch new blocks"); continue; } | ||
| }; | ||
|
|
||
| if !new_blocks.is_empty() { | ||
| ping_counter = 0; | ||
| } | ||
|
|
||
| // Emit one event per block, in order | ||
| for block in new_blocks { | ||
| last_block_number = Some(block.number); | ||
|
|
||
| let event = NewBlockEvent { block }; | ||
| if let Ok(json) = serde_json::to_string(&event) { | ||
| yield Ok(Event::default().event("new_block").data(json)); | ||
| } | ||
| } | ||
|
|
||
| // Send keep-alive ping every ~15s (75 ticks * 200ms) | ||
| if ping_counter >= 75 { | ||
| ping_counter = 0; | ||
| yield Ok(Event::default().comment("keep-alive")); | ||
| } | ||
| } | ||
| }; | ||
|
|
||
| Sse::new(stream).keep_alive( | ||
| axum::response::sse::KeepAlive::new() | ||
| .interval(Duration::from_secs(15)) | ||
| .text("keep-alive"), | ||
| ) | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -66,3 +66,5 @@ services: | |
|
|
||
| volumes: | ||
| pgdata: | ||
| external: true | ||
| name: atlas_pgdata | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,9 +1,17 @@ | ||
| import { createContext } from 'react'; | ||
| import type { NewBlockEvent } from '../hooks/useBlockSSE'; | ||
|
|
||
| export interface BlockStats { | ||
| bps: number | null; | ||
| height: number | null; | ||
| latestBlockEvent: NewBlockEvent | null; | ||
| sseConnected: boolean; | ||
| } | ||
|
|
||
| export const BlockStatsContext = createContext<BlockStats>({ bps: null, height: null }); | ||
| export const BlockStatsContext = createContext<BlockStats>({ | ||
| bps: null, | ||
| height: null, | ||
| latestBlockEvent: null, | ||
| sseConnected: false, | ||
| }); | ||
|
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.