Skip to content

Web socket in Rust#3039

Merged
vigoo merged 30 commits intomainfrom
web_socket
Apr 1, 2026
Merged

Web socket in Rust#3039
vigoo merged 30 commits intomainfrom
web_socket

Conversation

@afsalthaj
Copy link
Copy Markdown
Contributor

@afsalthaj afsalthaj commented Mar 19, 2026

Fixes #3053

Rust Agent Websocket

// In test-components/host-api-tests/components-rust/golem-it-host-api-tests/src/websocket.rs
use golem_rust::{agent_definition, agent_implementation};
use golem_rust::{WebsocketConnection, WebsocketMessage};

#[agent_definition]
pub trait WebsocketTest {
    fn new(name: String) -> Self;
    fn echo(&self, url: String, msg: String) -> String;
}

pub struct WebsocketTestImpl { _name: String }

#[agent_implementation]
impl WebsocketTest for WebsocketTestImpl {
    fn new(name: String) -> Self { Self { _name: name } }

    fn echo(&self, url: String, msg: String) -> String {
        let ws = WebsocketConnection::connect(&url, None).expect("connect");
        ws.send(&WebsocketMessage::Text(msg)).expect("send");
        match ws.receive().expect("receive") {
            Message::Text(t) => t,
            Message::Binary(b) => format!("{} bytes", b.len()),
        }
    }
}
image

@vigoo
Copy link
Copy Markdown
Contributor

vigoo commented Mar 20, 2026

Looks good so far. Let's have a per-executor semaphor to constraint the number of parallel websocket connections (we already have it for regular http connections). This way we protect the executor from overloading with ws sockets. (Let the connect rather wait than the whole server overloading or running out of sockets)

@afsalthaj afsalthaj changed the title Web socket (Rust) Web socket Mar 23, 2026
@afsalthaj afsalthaj changed the title Web socket Web socket in Rust Mar 25, 2026
@afsalthaj afsalthaj marked this pull request as ready for review March 25, 2026 04:35
Comment thread golem-worker-executor/src/durable_host/websocket/client.rs Outdated
Comment thread integration-tests/tests/worker.rs Outdated
#[test]
#[tracing::instrument]
#[timeout("2m")]
async fn websocket_echo_rust(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are these not worker-executor tests?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I tried to place these tests in ehre, and was failing with registry service errors.. I can try again

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be a better place, as we can much quicker and easier iterate on it and this is an executor feature (not depending on any other service)

@mschuwalow
Copy link
Copy Markdown
Contributor

This has no durability at all, right? Would it make sense to only allow using websocket connections within a persist-nothing region? Otherwise we are going to reexecute (the potentially side-effecting) connections during replay

@vigoo
Copy link
Copy Markdown
Contributor

vigoo commented Mar 26, 2026

This has no durability at all, right? Would it make sense to only allow using websocket connections within a persist-nothing region? Otherwise we are going to reexecute (the potentially side-effecting) connections during replay

It will have durability, I asked for adding it in two new phases on top of this.


#[async_trait::async_trait]
impl wasmtime_wasi::p2::Pollable for WebSocketConnectionEntry {
async fn ready(&mut self) {}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is no-op, we cannot to async websocket communication in our agents.
A minimal version would be something like:

  • a separate blocking_receive (the current receive) and non-blocking receive (returns None if there is no current message to receive)
  • this ready() function async-blocks until we know that the next call to receive will return Some

With that it's possible to mix async ws reads with other operations in the agent. (And send is still always blocking).

Depending on how send is implemented it may make sense to make an async send too (if there is backpressure by some limited output queue). In that case we may need two different subscribes.

The most advanced and idiomatic version would probably be if a connected websocket would materialize in a wasi InputStream and OutputStream (with properly implemented read() for each - like wasi-http input/output streams do). I don't remember if this is possible without touching the wasmtime fork though.

Comment thread golem-worker-executor/src/durable_host/websocket/client.rs Outdated
Comment thread golem-worker-executor/src/durable_host/websocket/client.rs Outdated
Comment thread golem-worker-executor/src/durable_host/websocket/client.rs Outdated
}
}

fn poll_for_message(&self, url: String, timeout_ms: u64) -> Result<String, String> {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's write a "real" async test as well, which is an async agent method and uses wstd to convert the pollable to a future, and does real bidiretional messaging (multiple sends, multiple receives, receive being async through wstd)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

.into_return_value()
.ok_or_else(|| anyhow!("expected return value"))?;

assert_eq!(result, Value::Result(Ok(Some(Box::new(Value::String(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Small note (only fix if you do other changes anyway): the only reason we do explicit drop of the executor and stop of the servers in the end of the test is to put the assertions after them. this way logs coming from stopping the executor are always before the actual assertion failure

.map_err(|e| format!("Send error: {:?}", e))?;

// Convert websocket pollable to an async future via wstd.
let pollable = ws.subscribe();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you just told me a few hours ago to do a single subscribe outside the loop :)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:D hehe yes. I can move that out

@vigoo vigoo merged commit f5da4b7 into main Apr 1, 2026
40 checks passed
@vigoo vigoo deleted the web_socket branch April 1, 2026 13:02
@github-actions github-actions bot locked and limited conversation to collaborators Apr 1, 2026
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Implement Websocket for Rust agents

3 participants