Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@

- To prevent false positives, non-public email addresses (e.g. `user@localhost`) are no longer scrubbed by default. ([#5737](https://github.com/getsentry/relay/pull/5737))

**Bug Fixes**:

- Explicitly reject in-flight requests during shutdown. ([#5746](https://github.com/getsentry/relay/pull/5746))

**Features**:

- Set `sentry.segment.id` and `sentry.segment.name` attributes on OTLP segment spans. ([#5748](https://github.com/getsentry/relay/pull/5748))
Expand Down
7 changes: 6 additions & 1 deletion relay-server/src/endpoints/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,12 @@ impl IntoResponse for BadStoreRequest {
};

metric!(counter(RelayCounters::EnvelopeRejected) += 1);
if response.status().is_server_error() {
if response.status() == http::StatusCode::SERVICE_UNAVAILABLE {
relay_log::warn!(
error = &self as &dyn std::error::Error,
"not handling request: service unavailable"
);
Comment on lines +146 to +149
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Downgraded from error to warning here because events are not necessarily lost, as long as the client retries 503s. We will still see the warning in Sentry.

} else if response.status().is_server_error() {
relay_log::error!(
error = &self as &dyn std::error::Error,
"error handling request"
Expand Down
36 changes: 26 additions & 10 deletions relay-server/src/services/buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ use ahash::RandomState;
use chrono::DateTime;
use chrono::Utc;
use relay_config::Config;
use relay_system::Receiver;
use relay_system::ServiceSpawn;
use relay_system::ServiceSpawnExt as _;
use relay_system::{Addr, FromMessage, Interface, NoResponse, Service};
use relay_system::{Controller, Shutdown};
use relay_system::{Receiver, ShutdownHandle};
use tokio::sync::watch;
use tokio::time::{Instant, timeout};

Expand Down Expand Up @@ -181,6 +181,7 @@ pub struct EnvelopeBufferMetrics {
pub struct ObservableEnvelopeBuffer {
addr: Addr<EnvelopeBuffer>,
metrics: Arc<EnvelopeBufferMetrics>,
shutdown_handle: ShutdownHandle,
}

impl ObservableEnvelopeBuffer {
Expand All @@ -191,15 +192,26 @@ impl ObservableEnvelopeBuffer {

/// Attempts to push an envelope into the envelope buffer.
///
/// Returns `false`, if the envelope buffer does not have enough capacity.
/// Returns `false`, if the envelope buffer does not have enough capacity,
/// or if the process is shutting down.
pub fn try_push(&self, envelope: Managed<Box<Envelope>>) -> Result<(), Managed<Box<Envelope>>> {
if self.has_capacity() {
let envelope = envelope.into();
self.addr.send(EnvelopeBuffer::Push(envelope));
Ok(())
} else {
Err(envelope)
if self.shutdown_handle.shutting_down() {
relay_log::trace!("Shutting down, envelope rejected");
return Err(envelope);
}

if self.addr.is_closed() {
// This should not happen as it should be covered by the branch above.
relay_log::error!("Pushing envelope after envelope buffer dropped");
return Err(envelope);
}

if !self.has_capacity() {
return Err(envelope);
}

self.addr.send(EnvelopeBuffer::Push(envelope.into()));
Ok(())
}

/// Returns `true` if the buffer has the capacity to accept more elements.
Expand Down Expand Up @@ -271,10 +283,14 @@ impl EnvelopeBufferService {
/// Returns both the [`Addr`] to this service, and references to spooler metrics.
pub fn start_in(self, services: &dyn ServiceSpawn) -> ObservableEnvelopeBuffer {
    let metrics = self.metrics.clone();

    let addr = services.start(self);
    // Capture a shutdown handle up-front so `try_push` can cheaply reject
    // envelopes once the system starts shutting down.
    let shutdown_handle = Controller::shutdown_handle();

    ObservableEnvelopeBuffer {
        addr,
        metrics,
        shutdown_handle,
    }
}

/// Wait for the configured amount of time and make sure the project cache is ready to receive.
Expand Down
6 changes: 6 additions & 0 deletions relay-system/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ static MANUAL_SHUTDOWN: LazyLock<Channel<ShutdownMode>> = LazyLock::new(|| watch
///
/// This handle is returned by [`Controller::shutdown_handle`].
// TODO: The receiver of this message can not yet signal they have completed shutdown.
#[derive(Debug, Clone)]
pub struct ShutdownHandle(watch::Receiver<Option<Shutdown>>);

impl ShutdownHandle {
Expand Down Expand Up @@ -97,6 +98,11 @@ impl ShutdownHandle {
}
}
}

/// Returns `true` when the shutdown signal has been sent.
pub fn shutting_down(&self) -> bool {
    // The watch channel holds `Some(Shutdown)` once the controller has
    // broadcast the signal, and `None` before that.
    let signal = self.0.borrow();
    signal.is_some()
}
}

/// Service to start and gracefully stop the system runtime.
Expand Down
7 changes: 7 additions & 0 deletions relay-system/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -776,6 +776,13 @@ impl<I: Interface> Addr<I> {
self.len() == 0
}

/// Returns whether the queue is closed.
///
/// This happens when the receiving service has stopped running.
pub fn is_closed(&self) -> bool {
    // Delegates to the underlying channel sender, which reports closed
    // once the receiving side has been dropped.
    self.tx.is_closed()
}

/// Returns the current queue size.
pub fn len(&self) -> u64 {
self.queue_size.load(Ordering::Relaxed)
Expand Down
74 changes: 74 additions & 0 deletions tests/integration/test_shutdown.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import json
import os
import signal
import socket
import time
import tempfile


def test_graceful_shutdown(mini_sentry, relay):
    """
    On SIGTERM, relay completes any in-flight HTTP requests before shutting down.
    New connections are rejected once the TCP listener closes.

    We test this by holding a request in-flight at the TCP level: send the HTTP
    headers and the first byte of the body, then send SIGTERM, then send the rest
    of the body. Relay must process and respond to the request before it exits.
    """
    project_id = 42
    mini_sentry.add_basic_project_config(project_id)

    with tempfile.TemporaryDirectory() as db_dir:
        db_file_path = os.path.join(db_dir, "database.db")

        relay = relay(
            mini_sentry,
            options={
                "limits": {"shutdown_timeout": 5},
                "spool": {"envelopes": {"path": db_file_path}},
            },
        )

        host, port = relay.server_address
        dsn_key = relay.get_dsn_public_key(project_id)

        body = json.dumps({"message": "in-flight during shutdown"}).encode()
        request = (
            f"POST /api/{project_id}/store/ HTTP/1.1\r\n"
            f"Host: {host}:{port}\r\n"
            f"Content-Type: application/json\r\n"
            f"X-Sentry-Auth: Sentry sentry_version=7, sentry_key={dsn_key}\r\n"
            f"Content-Length: {len(body)}\r\n"
            f"Connection: close\r\n"
            f"\r\n"
        ).encode() + body

        # Open a raw TCP connection and send everything up to the last byte.
        # Relay is now holding the connection open waiting for the body to
        # complete — the request is in-flight. The context manager guarantees
        # the socket is closed even if an assertion below fails.
        with socket.create_connection((host, port)) as sock:
            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
            sock.sendall(request[:-1])

            # Trigger graceful shutdown while the request is in-flight.
            relay.process.send_signal(signal.SIGTERM)
            time.sleep(0.05)

            # Complete the request — relay will respond with Service Unavailable.
            sock.sendall(request[-1:])

            sock.settimeout(10)
            response = b""
            while chunk := sock.recv(4096):
                response += chunk

        assert b"HTTP/1.1 503" in response

        # After relay exits, new connections are refused.
        relay.wait_for_exit(timeout=10)
        try:
            probe = socket.create_connection((host, port))
        except ConnectionRefusedError:
            pass
        else:
            # Close the unexpectedly-accepted connection before failing.
            probe.close()
            assert False, "Expected connection to be refused after relay exited"
13 changes: 8 additions & 5 deletions tests/integration/test_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,18 +201,21 @@ def test_store_with_low_memory(mini_sentry, relay):
mini_sentry.add_basic_project_config(project_id)

try:
with pytest.raises(HTTPError):
with pytest.raises(HTTPError) as exc:
relay.send_event(project_id, {"message": "pls ignore"})

assert exc.value.response.status_code == 503

pytest.raises(queue.Empty, lambda: mini_sentry.get_captured_envelope(timeout=1))

found_queue_error = False
found_memory_error = False
for _, error in mini_sentry.current_test_failures():
assert isinstance(error, AssertionError)
if "failed to queue envelope" in str(error):
found_queue_error = True
if "Not enough memory" in str(error):
found_memory_error = True
break

assert found_queue_error
assert found_memory_error
finally:
mini_sentry.clear_test_failures()

Expand Down
4 changes: 2 additions & 2 deletions tests/integration/test_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -501,12 +501,12 @@ def do_upload():
assert r.json() == {
"detail": "upload error: loadshed",
"causes": ["loadshed"],
}
}, r.text
else:
assert r.json() == {
"detail": "upload error: request timeout: deadline has elapsed",
"causes": [
"request timeout: deadline has elapsed",
"deadline has elapsed",
],
}
}, r.text
Loading