-
Notifications
You must be signed in to change notification settings - Fork 41
fix: scale-to-zero and shutdown improvements #150
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,9 +6,11 @@ import ( | |
| "fmt" | ||
| "os" | ||
| "os/exec" | ||
| "path/filepath" | ||
| "sync" | ||
| "time" | ||
|
|
||
| "github.com/hashicorp/go-multierror" | ||
| "github.com/onkernel/kernel-images/server/lib/devtoolsproxy" | ||
| "github.com/onkernel/kernel-images/server/lib/logger" | ||
| "github.com/onkernel/kernel-images/server/lib/nekoclient" | ||
|
|
@@ -29,8 +31,9 @@ type ApiService struct { | |
| watches map[string]*fsWatch | ||
|
|
||
| // Process management | ||
| procMu sync.RWMutex | ||
| procs map[string]*processHandle | ||
| procMu sync.RWMutex | ||
| procs map[string]*processHandle | ||
| shuttingDown bool | ||
|
|
||
| // Neko authenticated client | ||
| nekoAuthClient *nekoclient.AuthClient | ||
|
|
@@ -297,6 +300,55 @@ func (s *ApiService) ListRecorders(ctx context.Context, _ oapi.ListRecordersRequ | |
| return oapi.ListRecorders200JSONResponse(infos), nil | ||
| } | ||
|
|
||
| // killAllProcesses sends SIGKILL to every tracked process that is still running. | ||
| // It acquires the write lock and sets shuttingDown so that ProcessSpawn rejects | ||
| // new processes once the kill pass begins. | ||
| func (s *ApiService) killAllProcesses(ctx context.Context) error { | ||
| log := logger.FromContext(ctx) | ||
| s.procMu.Lock() | ||
| defer s.procMu.Unlock() | ||
| s.shuttingDown = true | ||
|
|
||
| var result *multierror.Error | ||
| for id, h := range s.procs { | ||
| if h.state() != "running" { | ||
| continue | ||
| } | ||
| if h.cmd.Process == nil { | ||
| continue | ||
| } | ||
| // supervisorctl handles the lifecycle of long running processes so we don't want to kill | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Not sure we should make an exception for supervisorctl here. If someone sent a process exec for a supervisorctl command it shouldn't matter — we're going to hard reset supervisor services anyway, right? Or are we doing our own supervisorctl hard-reset of things like chromium during server shutdown? That feels a little weird, but I could live with it — I just want to make sure the reasoning is clear. |
||
| // any active supervisorctl processes. For example it is used to restart kernel-images-api | ||
| // and killing that process would break the restart process. | ||
| if filepath.Base(h.cmd.Path) == "supervisorctl" { | ||
| continue | ||
| } | ||
| if err := h.cmd.Process.Kill(); err != nil { | ||
| // A process may already have exited between the state check and the | ||
| // kill call; treat that as a benign race rather than a fatal error. | ||
| if !errors.Is(err, os.ErrProcessDone) { | ||
| result = multierror.Append(result, fmt.Errorf("process %s: %w", id, err)) | ||
| log.Error("failed to kill process", "process_id", id, "err", err) | ||
| } | ||
| } | ||
| } | ||
| return result.ErrorOrNil() | ||
| } | ||
|
|
||
| func (s *ApiService) Shutdown(ctx context.Context) error { | ||
| return s.recordManager.StopAll(ctx) | ||
| var wg sync.WaitGroup | ||
| var killErr, stopErr error | ||
|
|
||
| wg.Add(2) | ||
| go func() { | ||
| defer wg.Done() | ||
| killErr = s.killAllProcesses(ctx) | ||
| }() | ||
| go func() { | ||
| defer wg.Done() | ||
| stopErr = s.recordManager.StopAll(ctx) | ||
| }() | ||
| wg.Wait() | ||
cursor[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| return multierror.Append(killErr, stopErr).ErrorOrNil() | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -323,8 +323,14 @@ func (s *ApiService) ProcessSpawn(ctx context.Context, request oapi.ProcessSpawn | |
| doneCh: make(chan struct{}), | ||
| } | ||
|
|
||
| // Store handle | ||
| // Store handle; reject if the server is shutting down. | ||
| s.procMu.Lock() | ||
| if s.shuttingDown { | ||
| s.procMu.Unlock() | ||
| // The process was already started; kill it immediately. | ||
| _ = cmd.Process.Kill() | ||
| return oapi.ProcessSpawn500JSONResponse{InternalErrorJSONResponse: oapi.InternalErrorJSONResponse{Message: "server is shutting down"}}, nil | ||
| } | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Shutdown spawn path leaks process lifecycle — Medium Severity. When |
||
| if s.procs == nil { | ||
| s.procs = make(map[string]*processHandle) | ||
| } | ||
|
|
@@ -624,7 +630,6 @@ func (s *ApiService) ProcessResize(ctx context.Context, request oapi.ProcessResi | |
| return oapi.ProcessResize200JSONResponse(oapi.OkResponse{Ok: true}), nil | ||
| } | ||
|
|
||
|
|
||
| // writeJSON writes a JSON response with the given status code. | ||
| // Unlike http.Error, this sets the correct Content-Type for JSON. | ||
| func writeJSON(w http.ResponseWriter, status int, body string) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -274,6 +274,9 @@ func main() { | |
| defer shutdownCancel() | ||
| g, _ := errgroup.WithContext(shutdownCtx) | ||
|
|
||
| g.Go(func() error { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
|
||
| return stz.Drain(shutdownCtx) | ||
| }) | ||
| g.Go(func() error { | ||
| return srv.Shutdown(shutdownCtx) | ||
| }) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,114 @@ | ||
| package scaletozero | ||
|
|
||
| import ( | ||
| "net/http" | ||
| "net/http/httptest" | ||
| "testing" | ||
|
|
||
| "github.com/stretchr/testify/assert" | ||
| "github.com/stretchr/testify/require" | ||
| ) | ||
|
|
||
| func TestMiddlewareDisablesAndEnablesForExternalAddr(t *testing.T) { | ||
| t.Parallel() | ||
| mock := &mockScaleToZeroer{} | ||
| handler := Middleware(mock)(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||
| w.WriteHeader(http.StatusOK) | ||
| })) | ||
|
|
||
| req := httptest.NewRequest(http.MethodGet, "/", nil) | ||
| req.RemoteAddr = "203.0.113.50:12345" | ||
| rec := httptest.NewRecorder() | ||
|
|
||
| handler.ServeHTTP(rec, req) | ||
|
|
||
| assert.Equal(t, http.StatusOK, rec.Code) | ||
| assert.Equal(t, 1, mock.disableCalls) | ||
| assert.Equal(t, 1, mock.enableCalls) | ||
| } | ||
|
|
||
| func TestMiddlewareSkipsLoopbackAddrs(t *testing.T) { | ||
| t.Parallel() | ||
|
|
||
| loopbackAddrs := []struct { | ||
| name string | ||
| addr string | ||
| }{ | ||
| {"loopback-v4", "127.0.0.1:8080"}, | ||
| {"loopback-v6", "[::1]:8080"}, | ||
| } | ||
|
|
||
| for _, tc := range loopbackAddrs { | ||
| t.Run(tc.name, func(t *testing.T) { | ||
| t.Parallel() | ||
| mock := &mockScaleToZeroer{} | ||
| var called bool | ||
| handler := Middleware(mock)(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||
| called = true | ||
| w.WriteHeader(http.StatusOK) | ||
| })) | ||
|
|
||
| req := httptest.NewRequest(http.MethodGet, "/", nil) | ||
| req.RemoteAddr = tc.addr | ||
| rec := httptest.NewRecorder() | ||
|
|
||
| handler.ServeHTTP(rec, req) | ||
|
|
||
| assert.True(t, called, "handler should still be called") | ||
| assert.Equal(t, http.StatusOK, rec.Code) | ||
| assert.Equal(t, 0, mock.disableCalls, "should not disable for loopback addr") | ||
| assert.Equal(t, 0, mock.enableCalls, "should not enable for loopback addr") | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| func TestMiddlewareDisableError(t *testing.T) { | ||
| t.Parallel() | ||
| mock := &mockScaleToZeroer{disableErr: assert.AnError} | ||
| var called bool | ||
| handler := Middleware(mock)(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||
| called = true | ||
| })) | ||
|
|
||
| req := httptest.NewRequest(http.MethodGet, "/", nil) | ||
| req.RemoteAddr = "203.0.113.50:12345" | ||
| rec := httptest.NewRecorder() | ||
|
|
||
| handler.ServeHTTP(rec, req) | ||
|
|
||
| assert.False(t, called, "handler should not be called on disable error") | ||
| assert.Equal(t, http.StatusInternalServerError, rec.Code) | ||
| assert.Equal(t, 0, mock.enableCalls) | ||
| } | ||
|
|
||
| func TestIsLoopbackAddr(t *testing.T) { | ||
| t.Parallel() | ||
|
|
||
| tests := []struct { | ||
| addr string | ||
| loopback bool | ||
| }{ | ||
| // Loopback | ||
| {"127.0.0.1:80", true}, | ||
| {"[::1]:80", true}, | ||
| {"127.0.0.1", true}, | ||
| {"::1", true}, | ||
| // Non-loopback | ||
| {"10.0.0.1:80", false}, | ||
| {"172.16.0.1:80", false}, | ||
| {"192.168.1.1:80", false}, | ||
| {"203.0.113.50:80", false}, | ||
| {"8.8.8.8:53", false}, | ||
| {"[2001:db8::1]:80", false}, | ||
| // Unparseable | ||
| {"not-an-ip:80", false}, | ||
| {"", false}, | ||
| } | ||
|
|
||
| for _, tc := range tests { | ||
| t.Run(tc.addr, func(t *testing.T) { | ||
| t.Parallel() | ||
| require.Equal(t, tc.loopback, isLoopbackAddr(tc.addr)) | ||
| }) | ||
| } | ||
| } |


There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is new behavior — we'll need to reach out to heavy browser pool users and make sure they don't depend on process execs carrying over between session re-use.