Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ bin/
pkg/
target/
.wrangler/
.edgezero/

# env
.env
Expand Down
17 changes: 17 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ log = "0.4"
log-fastly = "0.11"
matchit = "0.9"
once_cell = "1"
redb = "3.1.0"
reqwest = { version = "0.13", default-features = false, features = ["rustls"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
Expand Down
16 changes: 14 additions & 2 deletions crates/edgezero-adapter-axum/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,18 @@ license = { workspace = true }
[features]
default = ["axum"]
axum = ["dep:axum", "dep:tokio", "dep:tower", "dep:futures-util", "dep:reqwest"]
cli = ["dep:edgezero-adapter", "edgezero-adapter/cli", "dep:ctor", "dep:toml", "dep:walkdir"]
cli = [
"dep:edgezero-adapter",
"edgezero-adapter/cli",
"dep:ctor",
"dep:toml",
"dep:walkdir",
]

[dependencies]
edgezero-adapter = { path = "../edgezero-adapter", optional = true, features = ["cli"] }
edgezero-adapter = { path = "../edgezero-adapter", optional = true, features = [
"cli",
] }
edgezero-core = { path = "../edgezero-core" }
anyhow = { workspace = true }
async-trait = { workspace = true }
Expand All @@ -22,17 +30,21 @@ futures = { workspace = true }
futures-util = { workspace = true, optional = true }
http = { workspace = true }
log = { workspace = true }
redb = { workspace = true }
reqwest = { workspace = true, optional = true }
simple_logger = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, optional = true }
toml = { workspace = true, optional = true }
tower = { workspace = true, optional = true }
serde_json = { workspace = true }
tracing = { workspace = true }
walkdir = { workspace = true, optional = true }
web-time = { workspace = true }

[dev-dependencies]
async-trait = { workspace = true }
axum = { workspace = true, features = ["macros"] }
serde = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt", "rt-multi-thread"] }
222 changes: 218 additions & 4 deletions crates/edgezero-adapter-axum/src/dev_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,13 @@ impl AxumDevServer {
}

#[cfg(test)]
async fn run_with_listener(self, listener: tokio::net::TcpListener) -> anyhow::Result<()> {
async fn run_with_listener(
self,
listener: tokio::net::TcpListener,
kv_path: &str,
) -> anyhow::Result<()> {
let AxumDevServer { router, config } = self;
serve_with_listener(router, listener, config.enable_ctrl_c).await
serve_with_listener_and_kv_path(router, listener, config.enable_ctrl_c, kv_path).await
}
}

Expand All @@ -85,7 +89,25 @@ async fn serve_with_listener(
listener: tokio::net::TcpListener,
enable_ctrl_c: bool,
) -> anyhow::Result<()> {
let service = EdgeZeroAxumService::new(router);
serve_with_listener_and_kv_path(router, listener, enable_ctrl_c, ".edgezero/kv.redb").await
}

async fn serve_with_listener_and_kv_path(
router: RouterService,
listener: tokio::net::TcpListener,
enable_ctrl_c: bool,
kv_path: &str,
) -> anyhow::Result<()> {
// Create a persistent KV store
if let Some(parent) = std::path::Path::new(kv_path).parent() {
std::fs::create_dir_all(parent).context("failed to create KV store directory")?;
}
let kv_store = std::sync::Arc::new(
crate::kv::PersistentKvStore::new(kv_path).context("failed to create KV store")?,
);
let kv_handle = edgezero_core::kv::KvHandle::new(kv_store);

let service = EdgeZeroAxumService::new(router).with_kv_handle(kv_handle);
let router = Router::new().fallback_service(service_fn(move |req| {
let mut svc = service.clone();
async move { svc.call(req).await }
Expand Down Expand Up @@ -204,6 +226,7 @@ mod integration_tests {
struct TestServer {
base_url: String,
handle: tokio::task::JoinHandle<()>,
_temp_dir: tempfile::TempDir,
}

async fn start_test_server(router: RouterService) -> TestServer {
Expand All @@ -217,13 +240,19 @@ mod integration_tests {
};
let server = AxumDevServer::with_config(router, config);

// Use a unique temp directory for each test server
let temp_dir = tempfile::tempdir().expect("create temp dir");
let kv_path = temp_dir.path().join("kv.redb");
let kv_path_str = kv_path.to_str().expect("valid path").to_string();

let handle = tokio::spawn(async move {
let _ = server.run_with_listener(listener).await;
let _ = server.run_with_listener(listener, &kv_path_str).await;
});

TestServer {
base_url: format!("http://{}", addr),
handle,
_temp_dir: temp_dir,
}
}

Expand Down Expand Up @@ -358,4 +387,189 @@ mod integration_tests {

drop(listener);
}

#[tokio::test(flavor = "multi_thread")]
async fn kv_store_persists_across_requests() {
async fn write_handler(ctx: RequestContext) -> Result<&'static str, EdgeError> {
let store = ctx.kv_handle().expect("kv configured");
store.put("counter", &42i32).await?;
Ok("written")
}

async fn read_handler(ctx: RequestContext) -> Result<String, EdgeError> {
let store = ctx.kv_handle().expect("kv configured");
let val: i32 = store.get_or("counter", 0).await?;
Ok(val.to_string())
}

let router = RouterService::builder()
.post("/write", write_handler)
.get("/read", read_handler)
.build();
let server = start_test_server(router).await;

let client = reqwest::Client::new();

// Write a value
let write_url = format!("{}/write", server.base_url);
let response = send_with_retry(&client, |client| client.post(write_url.as_str())).await;
assert_eq!(response.status(), reqwest::StatusCode::OK);
assert_eq!(response.text().await.unwrap(), "written");

// Read it back — proves shared state across requests
let read_url = format!("{}/read", server.base_url);
let response = send_with_retry(&client, |client| client.get(read_url.as_str())).await;
assert_eq!(response.status(), reqwest::StatusCode::OK);
assert_eq!(response.text().await.unwrap(), "42");

server.handle.abort();
}

#[tokio::test(flavor = "multi_thread")]
async fn kv_store_delete_across_requests() {
async fn write_handler(ctx: RequestContext) -> Result<&'static str, EdgeError> {
let kv = ctx.kv_handle().expect("kv configured");
kv.put("temp", &"to_delete").await?;
Ok("written")
}

async fn delete_handler(ctx: RequestContext) -> Result<&'static str, EdgeError> {
let kv = ctx.kv_handle().expect("kv configured");
kv.delete("temp").await?;
Ok("deleted")
}

async fn check_handler(ctx: RequestContext) -> Result<String, EdgeError> {
let kv = ctx.kv_handle().expect("kv configured");
let exists = kv.exists("temp").await?;
Ok(format!("exists={exists}"))
}

let router = RouterService::builder()
.post("/write", write_handler)
.post("/delete", delete_handler)
.get("/check", check_handler)
.build();
let server = start_test_server(router).await;
let client = reqwest::Client::new();

// Write
let url = format!("{}/write", server.base_url);
send_with_retry(&client, |c| c.post(url.as_str())).await;

// Verify exists
let url = format!("{}/check", server.base_url);
let resp = send_with_retry(&client, |c| c.get(url.as_str())).await;
assert_eq!(resp.text().await.unwrap(), "exists=true");

// Delete
let url = format!("{}/delete", server.base_url);
send_with_retry(&client, |c| c.post(url.as_str())).await;

// Verify gone
let url = format!("{}/check", server.base_url);
let resp = send_with_retry(&client, |c| c.get(url.as_str())).await;
assert_eq!(resp.text().await.unwrap(), "exists=false");

server.handle.abort();
}

#[tokio::test(flavor = "multi_thread")]
async fn kv_store_update_across_requests() {
async fn increment_handler(ctx: RequestContext) -> Result<String, EdgeError> {
let kv = ctx.kv_handle().expect("kv configured");
let val = kv.update("counter", 0i32, |n| n + 1).await?;
Ok(val.to_string())
}

let router = RouterService::builder()
.post("/inc", increment_handler)
.build();
let server = start_test_server(router).await;
let client = reqwest::Client::new();
let url = format!("{}/inc", server.base_url);

// Increment 5 times, each should return incremented value
for expected in 1..=5i32 {
let resp = send_with_retry(&client, |c| c.post(url.as_str())).await;
assert_eq!(
resp.text().await.unwrap(),
expected.to_string(),
"increment #{expected}"
);
}

server.handle.abort();
}

#[tokio::test(flavor = "multi_thread")]
async fn kv_store_returns_not_found_gracefully() {
async fn read_handler(ctx: RequestContext) -> Result<String, EdgeError> {
let kv = ctx.kv_handle().expect("kv configured");
let val: i32 = kv.get_or("nonexistent", -1).await?;
Ok(val.to_string())
}

let router = RouterService::builder().get("/read", read_handler).build();
let server = start_test_server(router).await;
let client = reqwest::Client::new();

let url = format!("{}/read", server.base_url);
let resp = send_with_retry(&client, |c| c.get(url.as_str())).await;
assert_eq!(resp.status(), reqwest::StatusCode::OK);
assert_eq!(resp.text().await.unwrap(), "-1");

server.handle.abort();
}

#[tokio::test(flavor = "multi_thread")]
async fn kv_store_handles_typed_data() {
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, PartialEq, Debug)]
struct UserProfile {
name: String,
age: u32,
active: bool,
}

async fn write_handler(ctx: RequestContext) -> Result<&'static str, EdgeError> {
let kv = ctx.kv_handle().expect("kv configured");
let profile = UserProfile {
name: "Alice".to_string(),
age: 30,
active: true,
};
kv.put("user:alice", &profile).await?;
Ok("saved")
}

async fn read_handler(ctx: RequestContext) -> Result<String, EdgeError> {
let kv = ctx.kv_handle().expect("kv configured");
let profile: Option<UserProfile> = kv.get("user:alice").await?;
match profile {
Some(p) => Ok(format!("{}:{}", p.name, p.age)),
None => Ok("not found".to_string()),
}
}

let router = RouterService::builder()
.post("/save", write_handler)
.get("/load", read_handler)
.build();
let server = start_test_server(router).await;
let client = reqwest::Client::new();

// Save profile
let url = format!("{}/save", server.base_url);
let resp = send_with_retry(&client, |c| c.post(url.as_str())).await;
assert_eq!(resp.text().await.unwrap(), "saved");

// Load profile
let url = format!("{}/load", server.base_url);
let resp = send_with_retry(&client, |c| c.get(url.as_str())).await;
assert_eq!(resp.text().await.unwrap(), "Alice:30");

server.handle.abort();
}
}
Loading