Conversation
|
Looks good so far. Let's have a per-executor semaphor to constraint the number of parallel websocket connections (we already have it for regular http connections). This way we protect the executor from overloading with ws sockets. (Let the |
| #[test] | ||
| #[tracing::instrument] | ||
| #[timeout("2m")] | ||
| async fn websocket_echo_rust( |
There was a problem hiding this comment.
Why are these not worker-executor tests?
There was a problem hiding this comment.
I think I tried to place these tests in ehre, and was failing with registry service errors.. I can try again
There was a problem hiding this comment.
It would be a better place, as we can much quicker and easier iterate on it and this is an executor feature (not depending on any other service)
|
This has no durability at all, right? Would it make sense to only allow using websocket connections within a persist-nothing region? Otherwise we are going to reexecute (the potentially side-effecting) connections during replay |
It will have durability, I asked for adding it in two new phases on top of this. |
|
|
||
| #[async_trait::async_trait] | ||
| impl wasmtime_wasi::p2::Pollable for WebSocketConnectionEntry { | ||
| async fn ready(&mut self) {} |
There was a problem hiding this comment.
If this is no-op, we cannot to async websocket communication in our agents.
A minimal version would be something like:
- a separate
blocking_receive(the currentreceive) and non-blockingreceive(returnsNoneif there is no current message to receive) - this
ready()function async-blocks until we know that the next call toreceivewill returnSome
With that it's possible to mix async ws reads with other operations in the agent. (And send is still always blocking).
Depending on how send is implemented it may make sense to make an async send too (if there is backpressure by some limited output queue). In that case we may need two different subscribes.
The most advanced and idiomatic version would probably be if a connected websocket would materialize in a wasi InputStream and OutputStream (with properly implemented read() for each - like wasi-http input/output streams do). I don't remember if this is possible without touching the wasmtime fork though.
| } | ||
| } | ||
|
|
||
| fn poll_for_message(&self, url: String, timeout_ms: u64) -> Result<String, String> { |
There was a problem hiding this comment.
Let's write a "real" async test as well, which is an async agent method and uses wstd to convert the pollable to a future, and does real bidiretional messaging (multiple sends, multiple receives, receive being async through wstd)
| .into_return_value() | ||
| .ok_or_else(|| anyhow!("expected return value"))?; | ||
|
|
||
| assert_eq!(result, Value::Result(Ok(Some(Box::new(Value::String( |
There was a problem hiding this comment.
Small note (only fix if you do other changes anyway): the only reason we do explicit drop of the executor and stop of the servers in the end of the test is to put the assertions after them. this way logs coming from stopping the executor are always before the actual assertion failure
| .map_err(|e| format!("Send error: {:?}", e))?; | ||
|
|
||
| // Convert websocket pollable to an async future via wstd. | ||
| let pollable = ws.subscribe(); |
There was a problem hiding this comment.
you just told me a few hours ago to do a single subscribe outside the loop :)
There was a problem hiding this comment.
:D hehe yes. I can move that out
Fixes #3053
Rust Agent Websocket