diff --git a/pkg/rest/autoplace.go b/pkg/rest/autoplace.go index 170e86e5..13a08b58 100644 --- a/pkg/rest/autoplace.go +++ b/pkg/rest/autoplace.go @@ -185,7 +185,12 @@ func (s *Server) handleResourceGet(w http.ResponseWriter, r *http.Request) { rdName := r.PathValue("rd") node := r.PathValue("node") - _, err := s.Store.ResourceDefinitions().Get(r.Context(), rdName) + // CreateVolume / Attach hot path: autoplace, spawn and + // make-available create replicas server-side and the caller reads + // the replica back immediately; both the RD probe and the replica + // read may be served by a cache that still trails those writes — + // see pkg/rest/cache_retry.go. + _, err := getRDWithCacheRetry(r.Context(), s.Store, rdName) if err != nil { if errors.Is(err, store.ErrNotFound) { writeError(w, http.StatusNotFound, @@ -200,7 +205,7 @@ func (s *Server) handleResourceGet(w http.ResponseWriter, r *http.Request) { return } - res, err := s.Store.Resources().Get(r.Context(), rdName, node) + res, err := getResourceWithCacheRetry(r.Context(), s.Store, rdName, node) if err != nil { writeStoreError(w, err) diff --git a/pkg/rest/cache_retry.go b/pkg/rest/cache_retry.go index 1da7d31f..d197901f 100644 --- a/pkg/rest/cache_retry.go +++ b/pkg/rest/cache_retry.go @@ -20,6 +20,7 @@ package rest import ( "context" + "fmt" "time" "github.com/cockroachdb/errors" @@ -57,25 +58,32 @@ const ( cacheRetryDelay = 200 * time.Millisecond ) -// getRGWithCacheRetry returns the ResourceGroup `name`, retrying on -// store.ErrNotFound to absorb informer-cache lag after a fresh -// write that landed on a sibling apiserver replica. Any non-NotFound -// error (transport, decode, …) is returned immediately. Context -// cancellation aborts the retry loop. -func getRGWithCacheRetry(ctx context.Context, st store.Store, name string) (apiv1.ResourceGroup, error) { +// getWithCacheRetry is the shared retry-on-NotFound loop behind every +// get*WithCacheRetry helper. 
`read` is invoked up to +// cacheRetryAttempts times while it keeps returning store.ErrNotFound, +// with cacheRetryDelay between attempts; the first success or +// non-NotFound error (transport, decode, …) returns immediately. +// Context cancellation aborts the retry loop. `what` names the read +// for error wrapping (e.g. `get resource group "rg-1"`). +// +// A real NotFound (object never existed) still surfaces after the +// budget — callers keep their 404 contract, just (attempts-1)*delay +// later on the miss path. Steady-state hits pay zero extra latency. +func getWithCacheRetry[T any](ctx context.Context, what string, read func(context.Context) (T, error)) (T, error) { var ( - rg apiv1.ResourceGroup - err error + out T + zero T + err error ) for attempt := range cacheRetryAttempts { - rg, err = st.ResourceGroups().Get(ctx, name) + out, err = read(ctx) if err == nil { - return rg, nil + return out, nil } if !errors.Is(err, store.ErrNotFound) { - return apiv1.ResourceGroup{}, errors.Wrapf(err, "get resource group %q", name) + return zero, errors.Wrap(err, what) } if attempt == cacheRetryAttempts-1 { @@ -84,12 +92,24 @@ func getRGWithCacheRetry(ctx context.Context, st store.Store, name string) (apiv select { case <-ctx.Done(): - return apiv1.ResourceGroup{}, errors.Wrap(ctx.Err(), "get resource group: context cancelled") + return zero, errors.Wrap(ctx.Err(), what+": context cancelled") case <-time.After(cacheRetryDelay): } } - return apiv1.ResourceGroup{}, errors.Wrapf(err, "get resource group %q after %d retries", name, cacheRetryAttempts) + return zero, errors.Wrapf(err, "%s after %d retries", what, cacheRetryAttempts) +} + +// getRGWithCacheRetry returns the ResourceGroup `name`, retrying on +// store.ErrNotFound to absorb informer-cache lag after a fresh +// write that landed on a sibling apiserver replica. Any non-NotFound +// error (transport, decode, …) is returned immediately. Context +// cancellation aborts the retry loop.
+func getRGWithCacheRetry(ctx context.Context, st store.Store, name string) (apiv1.ResourceGroup, error) { + return getWithCacheRetry(ctx, fmt.Sprintf("get resource group %q", name), + func(ctx context.Context) (apiv1.ResourceGroup, error) { + return st.ResourceGroups().Get(ctx, name) + }) } // getRDWithCacheRetry returns the ResourceDefinition `name`, retrying @@ -97,33 +117,10 @@ func getRGWithCacheRetry(ctx context.Context, st store.Store, name string) (apiv // write that landed on a sibling apiserver replica. Same semantics as // getRGWithCacheRetry. func getRDWithCacheRetry(ctx context.Context, st store.Store, name string) (apiv1.ResourceDefinition, error) { - var ( - rd apiv1.ResourceDefinition - err error - ) - - for attempt := range cacheRetryAttempts { - rd, err = st.ResourceDefinitions().Get(ctx, name) - if err == nil { - return rd, nil - } - - if !errors.Is(err, store.ErrNotFound) { - return apiv1.ResourceDefinition{}, errors.Wrapf(err, "get resource definition %q", name) - } - - if attempt == cacheRetryAttempts-1 { - break - } - - select { - case <-ctx.Done(): - return apiv1.ResourceDefinition{}, errors.Wrap(ctx.Err(), "get resource definition: context cancelled") - case <-time.After(cacheRetryDelay): - } - } - - return apiv1.ResourceDefinition{}, errors.Wrapf(err, "get resource definition %q after %d retries", name, cacheRetryAttempts) + return getWithCacheRetry(ctx, fmt.Sprintf("get resource definition %q", name), + func(ctx context.Context) (apiv1.ResourceDefinition, error) { + return st.ResourceDefinitions().Get(ctx, name) + }) } // getSnapshotWithCacheRetry returns the Snapshot `snapName` on RD @@ -140,19 +137,129 @@ func getRDWithCacheRetry(ctx context.Context, st store.Store, name string) (apiv // cache that may not have observed the write yet — and a spurious // 404 fails the whole CreateVolume. 
func getSnapshotWithCacheRetry(ctx context.Context, st store.Store, rdName, snapName string) (apiv1.Snapshot, error) { + return getWithCacheRetry(ctx, fmt.Sprintf("get snapshot %s/%s", rdName, snapName), + func(ctx context.Context) (apiv1.Snapshot, error) { + return st.Snapshots().Get(ctx, rdName, snapName) + }) +} + +// getNodeWithCacheRetry returns the Node `name`, retrying on +// store.ErrNotFound to absorb informer-cache lag after a fresh +// write that landed on a sibling apiserver replica. Same semantics +// as getRGWithCacheRetry. +// +// Node registration (`POST /v1/nodes` — piraeus-operator's reconcile, +// satellite re-register on restart) is immediately followed by reads +// of the same node: `GET /v1/nodes/{node}` and `POST +// /v1/nodes/{node}/storage-pools` in the same reconcile pass. The +// register wrote straight to the apiserver; the follow-up read is +// served from a cache that may not have observed the write yet. +func getNodeWithCacheRetry(ctx context.Context, st store.Store, name string) (apiv1.Node, error) { + return getWithCacheRetry(ctx, fmt.Sprintf("get node %q", name), + func(ctx context.Context) (apiv1.Node, error) { + return st.Nodes().Get(ctx, name) + }) +} + +// getStoragePoolWithCacheRetry returns the StoragePool (node, pool), +// retrying on store.ErrNotFound to absorb informer-cache lag after a +// fresh write that landed on a sibling apiserver replica. Same +// semantics as getRGWithCacheRetry. +// +// `linstor sp c` / the CDP one-shot create the pool, and piraeus- +// operator / linstor-csi GetCapacity read it back immediately via +// `GET /v1/nodes/{node}/storage-pools/{pool}`. 
+func getStoragePoolWithCacheRetry(ctx context.Context, st store.Store, node, pool string) (apiv1.StoragePool, error) { + return getWithCacheRetry(ctx, fmt.Sprintf("get storage pool %s/%s", node, pool), + func(ctx context.Context) (apiv1.StoragePool, error) { + return st.StoragePools().Get(ctx, node, pool) + }) +} + +// getVDWithCacheRetry returns VolumeDefinition (rdName, vn), retrying +// on store.ErrNotFound to absorb informer-cache lag after a fresh +// write that landed on a sibling apiserver replica. Same semantics as +// getRGWithCacheRetry. +// +// VolumeDefinitions live inline on the parent RD CRD, so the lag has +// two faces: the parent RD itself not yet observed, or an old RD +// revision without the just-created VD. Both surface as ErrNotFound +// from the store and both are absorbed here. +func getVDWithCacheRetry(ctx context.Context, st store.Store, rdName string, vn int32) (apiv1.VolumeDefinition, error) { + return getWithCacheRetry(ctx, fmt.Sprintf("get volume definition %s/%d", rdName, vn), + func(ctx context.Context) (apiv1.VolumeDefinition, error) { + return st.VolumeDefinitions().Get(ctx, rdName, vn) + }) +} + +// getResourceWithCacheRetry returns the Resource (rdName, node), +// retrying on store.ErrNotFound to absorb informer-cache lag after a +// fresh write that landed on a sibling apiserver replica. Same +// semantics as getRGWithCacheRetry. +// +// Autoplace / spawn / make-available create replicas server-side and +// linstor-csi (ControllerPublishVolume) plus the CLI (`linstor r l`, +// `linstor v l`) read the replica back immediately via +// `GET /v1/resource-definitions/{rd}/resources/{node}` and the +// per-resource volumes endpoints.
+func getResourceWithCacheRetry(ctx context.Context, st store.Store, rdName, node string) (apiv1.Resource, error) { + return getWithCacheRetry(ctx, fmt.Sprintf("get resource %s/%s", rdName, node), + func(ctx context.Context) (apiv1.Resource, error) { + return st.Resources().Get(ctx, rdName, node) + }) +} + +// patchNodeSpecWithCacheRetry runs PatchNodeSpec, retrying on +// store.ErrNotFound to absorb informer-cache lag. The only caller is +// the node-register upsert fall-through: `Nodes().Create` just came +// back with ErrAlreadyExists — an authoritative apiserver-side proof +// the Node exists — so an ErrNotFound from the patch helper's +// cache-served read is definitionally lag, not a missing object. +// Without the retry a satellite re-register racing its own original +// registration surfaces a spurious 404 into the registration loop. +func patchNodeSpecWithCacheRetry(ctx context.Context, st store.Store, name string, mutate func(*apiv1.Node) error) error { + _, err := getWithCacheRetry(ctx, fmt.Sprintf("patch node %q", name), + func(ctx context.Context) (struct{}, error) { + return struct{}{}, st.Nodes().PatchNodeSpec(ctx, name, mutate) + }) + + return err +} + +// pickCDPDevicesWithCacheRetry resolves the `linstor physical-storage +// create-device-pool` target set, retrying while the device list has +// no match for the requested paths. +// +// The satellite discovery loop creates PhysicalDevice CRDs (and +// stamps Status.DevicePath via the Status subresource) straight on +// the apiserver; the operator sees the device in `ps l` (possibly via +// a sibling replica or kubectl) and fires `ps cdp` immediately. The +// REST replica serving the POST reads PhysicalDevices().ListForNode +// through its informer cache, which may not have observed the device +// — or its Status update — yet, so the match comes back empty and the +// pre-fix handler 404'd a perfectly valid request +// (TestGroupB/PhysicalStorageCDPPropsPerKind regression). 
+// +// An empty match (no busy device) is retried under the standard +// budget (3 attempts, 200 ms apart); a genuinely-bogus device path still +// surfaces the 404 after the budget. A busy device (Bug 89) returns +// immediately — the device IS observable, the cache cannot be trailing +// on it. List errors of any kind return immediately, without a retry. +func pickCDPDevicesWithCacheRetry(ctx context.Context, st store.Store, node string, paths []string) ([]apiv1.PhysicalDevice, *apiv1.PhysicalDevice, error) { var ( - snap apiv1.Snapshot - err error + targets []apiv1.PhysicalDevice + busy *apiv1.PhysicalDevice ) for attempt := range cacheRetryAttempts { - snap, err = st.Snapshots().Get(ctx, rdName, snapName) - if err == nil { - return snap, nil + devs, err := st.PhysicalDevices().ListForNode(ctx, node) + if err != nil { + return nil, nil, errors.Wrapf(err, "list physical devices on node %q", node) } - if !errors.Is(err, store.ErrNotFound) { - return apiv1.Snapshot{}, errors.Wrapf(err, "get snapshot %s/%s", rdName, snapName) + targets, busy = pickFreeDeviceForAttach(devs, paths) + if busy != nil || len(targets) > 0 { + return targets, busy, nil } if attempt == cacheRetryAttempts-1 { @@ -161,10 +268,10 @@ func getSnapshotWithCacheRetry(ctx context.Context, st store.Store, rdName, snap select { case <-ctx.Done(): - return apiv1.Snapshot{}, errors.Wrap(ctx.Err(), "get snapshot: context cancelled") + return nil, nil, errors.Wrapf(ctx.Err(), "list physical devices on node %q: context cancelled", node) case <-time.After(cacheRetryDelay): } } - return apiv1.Snapshot{}, errors.Wrapf(err, "get snapshot %s/%s after %d retries", rdName, snapName, cacheRetryAttempts) + return nil, nil, nil } diff --git a/pkg/rest/cache_retry_test.go b/pkg/rest/cache_retry_test.go index 65ead14f..cc263fd5 100644 --- a/pkg/rest/cache_retry_test.go +++ b/pkg/rest/cache_retry_test.go @@ -129,14 +129,89 @@ func (f *lagSnapshotStore) Get(ctx context.Context, rdName, snapName string) (ap return f.SnapshotStore.Get(ctx, rdName,
snapName) //nolint:wrapcheck // pass-through } -// flakyStore lets us substitute the RG / RD / Snapshot views with -// flaky ones while everything else keeps using the wrapped InMemory. +// lagPhysicalDeviceStore wraps an underlying PhysicalDeviceStore and +// models the CDP-side informer-cache lag: the first `missBudget` +// ListForNode() calls hide the named device from the result (the +// cache has not observed the discovery loop's write yet), then reads +// delegate to the real store (the watch event arrived). This is the +// exact shape TestGroupB/PhysicalStorageCDPPropsPerKind trips over: +// the PhysicalDevice CRD + Status land straight on the apiserver, +// then `POST /v1/physical-storage/{node}` lists devices through a +// cache that may still trail. +type lagPhysicalDeviceStore struct { + store.PhysicalDeviceStore + + hideName string + missBudget int + + mu sync.Mutex + calls int +} + +func (f *lagPhysicalDeviceStore) ListForNode(ctx context.Context, nodeName string) ([]apiv1.PhysicalDevice, error) { + devs, err := f.PhysicalDeviceStore.ListForNode(ctx, nodeName) + if err != nil { + return nil, err //nolint:wrapcheck // pass-through + } + + f.mu.Lock() + defer f.mu.Unlock() + + if f.calls >= f.missBudget { + return devs, nil + } + + f.calls++ + + out := make([]apiv1.PhysicalDevice, 0, len(devs)) + + for i := range devs { + if devs[i].Name == f.hideName { + continue + } + + out = append(out, devs[i]) + } + + return out, nil +} + +// lagNodePatchStore wraps an underlying NodeStore and returns +// store.ErrNotFound from the first `missBudget` PatchNodeSpec() +// calls on the configured name. Models the re-register fall-through: +// Nodes().Create just returned ErrAlreadyExists (apiserver-side, +// authoritative), but the patch helper's read is served by an +// informer cache that has not observed the original registration yet. 
+type lagNodePatchStore struct { + store.NodeStore + + target string + notFoundUntil int + calls atomic.Int32 +} + +func (f *lagNodePatchStore) PatchNodeSpec(ctx context.Context, name string, mutate func(*apiv1.Node) error) error { + if name == f.target { + n := f.calls.Add(1) + if int(n) <= f.notFoundUntil { + return errors.Wrapf(store.ErrNotFound, "node %q", name) + } + } + + return f.NodeStore.PatchNodeSpec(ctx, name, mutate) //nolint:wrapcheck // pass-through +} + +// flakyStore lets us substitute the RG / RD / Snapshot / Node / +// PhysicalDevice views with flaky ones while everything else keeps +// using the wrapped InMemory. type flakyStore struct { store.Store rgs *flakyRGStore rds *flakyRDStore snaps *lagSnapshotStore + pds *lagPhysicalDeviceStore + nodes *lagNodePatchStore } func (f *flakyStore) ResourceGroups() store.ResourceGroupStore { @@ -163,6 +238,22 @@ func (f *flakyStore) Snapshots() store.SnapshotStore { return f.snaps } +func (f *flakyStore) PhysicalDevices() store.PhysicalDeviceStore { + if f.pds == nil { + return f.Store.PhysicalDevices() + } + + return f.pds +} + +func (f *flakyStore) Nodes() store.NodeStore { + if f.nodes == nil { + return f.Store.Nodes() + } + + return f.nodes +} + func TestGetRGWithCacheRetry_SucceedsAfterCacheMiss(t *testing.T) { t.Parallel() @@ -485,3 +576,169 @@ func TestSnapshotRestore_SurvivesCacheMissAfterCreate(t *testing.T) { restoreResp.StatusCode) } } + +// TestPhysicalStorageCDP_SurvivesCacheMissOnDeviceList pins the +// TestGroupB/PhysicalStorageCDPPropsPerKind regression on the wire: +// the satellite discovery loop writes the PhysicalDevice CRD (and its +// Status.DevicePath) straight to the apiserver; the operator's +// `linstor ps cdp` lands moments later on a REST replica whose +// informer cache has not observed the device yet. The CDP handler +// must absorb the lag instead of 404-ing the create-device-pool +// (`status: got 404, want 202`). 
+func TestPhysicalStorageCDP_SurvivesCacheMissOnDeviceList(t *testing.T) { + t.Parallel() + + st := store.NewInMemory() + + if err := st.PhysicalDevices().Create(t.Context(), &apiv1.PhysicalDevice{ + Name: "n1-cdp-lag", + NodeName: "n1", + DevicePath: "/dev/disk/by-id/scsi-cdp-lag", + Phase: "Available", + }); err != nil { + t.Fatalf("seed PhysicalDevice: %v", err) + } + + flaky := &flakyStore{ + Store: st, + pds: &lagPhysicalDeviceStore{ + PhysicalDeviceStore: st.PhysicalDevices(), + hideName: "n1-cdp-lag", + missBudget: 1, // first list → device invisible, then real + }, + } + + base, stop := startServerWithStore(t, flaky) + defer stop() + + resp := httpPost(t, base+"/v1/physical-storage/n1", []byte(`{ + "provider_kind": "FILE_THIN", + "pool_name": "/var/lib/blockstor/cdp-filethin", + "device_paths": ["/dev/disk/by-id/scsi-cdp-lag"], + "with_storage_pool": {"name": "cdp-filethin"} + }`)) + defer func() { _ = resp.Body.Close() }() + + if resp.StatusCode != http.StatusAccepted { + t.Fatalf("cdp under cache-miss status: got %d, want 202", resp.StatusCode) + } + + // The attach must have landed on the real device, not been + // silently dropped. + got, err := st.PhysicalDevices().Get(t.Context(), "n1-cdp-lag") + if err != nil { + t.Fatalf("get device: %v", err) + } + + if got.AttachTo == nil || got.AttachTo.StoragePoolName != "cdp-filethin" { + t.Fatalf("AttachTo after cdp: got %+v, want StoragePoolName=cdp-filethin", got.AttachTo) + } +} + +// TestPickCDPDevicesWithCacheRetry_RealNoMatchStillEmpty pins the +// other half of the contract: a genuinely-bogus device path keeps +// the 404 behaviour, surfacing only after the retry budget elapsed. 
+func TestPickCDPDevicesWithCacheRetry_RealNoMatchStillEmpty(t *testing.T) { + t.Parallel() + + st := store.NewInMemory() + + start := time.Now() + + targets, busy, err := pickCDPDevicesWithCacheRetry(t.Context(), st, "n1", []string{"/dev/never-there"}) + if err != nil { + t.Fatalf("pickCDPDevicesWithCacheRetry: %v", err) + } + + if len(targets) != 0 || busy != nil { + t.Fatalf("expected no match, got targets=%v busy=%v", targets, busy) + } + + minWait := time.Duration(cacheRetryAttempts-1) * cacheRetryDelay + if elapsed := time.Since(start); elapsed < minWait { + t.Fatalf("retry loop returned in %s, expected at least %s", elapsed, minWait) + } +} + +// TestPickCDPDevicesWithCacheRetry_BusyDeviceReturnsImmediately pins +// the Bug 89 interplay: a busy device IS observable in the cache, so +// the 409 must not pay the retry budget. +func TestPickCDPDevicesWithCacheRetry_BusyDeviceReturnsImmediately(t *testing.T) { + t.Parallel() + + st := store.NewInMemory() + + free := false + if err := st.PhysicalDevices().Create(t.Context(), &apiv1.PhysicalDevice{ + Name: "n1-busy", + NodeName: "n1", + DevicePath: "/dev/disk/by-id/scsi-busy", + Phase: "Available", + Free: &free, + }); err != nil { + t.Fatalf("seed PhysicalDevice: %v", err) + } + + start := time.Now() + + targets, busy, err := pickCDPDevicesWithCacheRetry(t.Context(), st, "n1", []string{"/dev/disk/by-id/scsi-busy"}) + if err != nil { + t.Fatalf("pickCDPDevicesWithCacheRetry: %v", err) + } + + if busy == nil || len(targets) != 0 { + t.Fatalf("expected busy device, got targets=%v busy=%v", targets, busy) + } + + if elapsed := time.Since(start); elapsed >= cacheRetryDelay { + t.Fatalf("busy short-circuit took %s, expected < one cacheRetryDelay (%s)", elapsed, cacheRetryDelay) + } +} + +// TestNodeReRegister_SurvivesCacheMissOnPatch pins the node +// re-register fall-through: the second `POST /v1/nodes` gets +// ErrAlreadyExists from Create (authoritative proof the Node exists), +// but the PatchNodeSpec read is 
served by a cache that has not +// observed the original registration yet. The registration loop must +// not see a spurious 404. +func TestNodeReRegister_SurvivesCacheMissOnPatch(t *testing.T) { + t.Parallel() + + st := store.NewInMemory() + + flaky := &flakyStore{ + Store: st, + nodes: &lagNodePatchStore{ + NodeStore: st.Nodes(), + target: "n1", + notFoundUntil: 1, // first patch → NotFound, then real + }, + } + + base, stop := startServerWithStore(t, flaky) + defer stop() + + body, err := json.Marshal(apiv1.Node{Name: "n1", Type: apiv1.NodeTypeSatellite}) + if err != nil { + t.Fatalf("marshal node: %v", err) + } + + first := httpPost(t, base+"/v1/nodes", body) + _ = first.Body.Close() + + if first.StatusCode != http.StatusCreated { + t.Fatalf("first register status: got %d, want 201", first.StatusCode) + } + + second := httpPost(t, base+"/v1/nodes", body) + defer func() { _ = second.Body.Close() }() + + if second.StatusCode != http.StatusCreated { + t.Fatalf("re-register under cache-miss status: got %d, want 201 (idempotent upsert; cache-lag 404 must be absorbed)", + second.StatusCode) + } + + if got := flaky.nodes.calls.Load(); got < 2 { + t.Fatalf("expected at least 2 PatchNodeSpec attempts (NotFound, then hit), got %d", got) + } +} diff --git a/pkg/rest/nodes.go b/pkg/rest/nodes.go index 8a153926..a93b96e3 100644 --- a/pkg/rest/nodes.go +++ b/pkg/rest/nodes.go @@ -421,7 +421,11 @@ func (s *Server) handleNodesList(w http.ResponseWriter, r *http.Request) { func (s *Server) handleNodeGet(w http.ResponseWriter, r *http.Request) { name := r.PathValue("node") - n, err := s.Store.Nodes().Get(r.Context(), name) + // Node-register hot path: `POST /v1/nodes` is immediately followed + // by this GET (piraeus-operator reconcile, linstor-wait-node-online) + // while the local informer cache may still trail the write — see + // pkg/rest/cache_retry.go. 
+ n, err := getNodeWithCacheRetry(r.Context(), s.Store, name) if err != nil { writeStoreError(w, err) @@ -578,7 +582,15 @@ func (s *Server) upsertNodeAndDiskless(w http.ResponseWriter, r *http.Request, n // landing between the AlreadyExists probe and the persist // re-applies the wire snapshot under RetryOnConflict // instead of leaking a 409 into the registration loop. - err = s.Store.Nodes().PatchNodeSpec(r.Context(), n.Name, func(live *apiv1.Node) error { + // + // Cache-lag guard: Create just returned ErrAlreadyExists — + // authoritative apiserver-side proof the Node exists — but + // PatchNodeSpec's read is served from the informer cache, + // which may not have observed the original registration yet. + // Retry the NotFound under the standard budget so a fast + // re-register doesn't leak a spurious 404 into the + // registration loop — see pkg/rest/cache_retry.go. + err = patchNodeSpecWithCacheRetry(r.Context(), s.Store, n.Name, func(live *apiv1.Node) error { *live = *n return nil diff --git a/pkg/rest/physical_storage.go b/pkg/rest/physical_storage.go index faf24546..126b856d 100644 --- a/pkg/rest/physical_storage.go +++ b/pkg/rest/physical_storage.go @@ -159,14 +159,19 @@ func (s *Server) handlePhysicalStorageCreate(w http.ResponseWriter, r *http.Requ return } - devs, err := s.Store.PhysicalDevices().ListForNode(r.Context(), node) + // The satellite discovery loop writes PhysicalDevice CRDs (and + // their Status.DevicePath) straight to the apiserver; this + // handler's list is served from the informer cache, which may not + // have observed the just-discovered device yet. Retry the match + // under the standard cache-lag budget instead of 404-ing a valid + // `ps cdp` — see pkg/rest/cache_retry.go. 
+ targets, busy, err := pickCDPDevicesWithCacheRetry(r.Context(), s.Store, node, req.DevicePaths) if err != nil { writeStoreError(w, err) return } - targets, busy := pickFreeDeviceForAttach(devs, req.DevicePaths) if busy != nil { writePhysicalStorageBusyDevice(w, node, busy) diff --git a/pkg/rest/query_size_info.go b/pkg/rest/query_size_info.go index c41d8bc3..49e90f2f 100644 --- a/pkg/rest/query_size_info.go +++ b/pkg/rest/query_size_info.go @@ -49,7 +49,11 @@ func (s *Server) registerQuerySizeInfo(mux *http.ServeMux) { func (s *Server) handleQuerySizeInfo(w http.ResponseWriter, r *http.Request) { rgName := r.PathValue("rg") - rg, err := s.Store.ResourceGroups().Get(r.Context(), rgName) + // CreateVolume hot path: linstor-csi runs the capacity preflight + // right after ensuring the StorageClass's RG exists; the RG read + // may be served from a cache that still trails the RG create — + // see pkg/rest/cache_retry.go. + rg, err := getRGWithCacheRetry(r.Context(), s.Store, rgName) if err != nil { writeStoreError(w, err) diff --git a/pkg/rest/resource_definitions.go b/pkg/rest/resource_definitions.go index b57f4750..0ff2931f 100644 --- a/pkg/rest/resource_definitions.go +++ b/pkg/rest/resource_definitions.go @@ -366,7 +366,14 @@ func (s *Server) refuseRDCreateOnRGDeletedRace(w http.ResponseWriter, r *http.Re return true } - _, err := s.Store.ResourceGroups().Get(r.Context(), rd.ResourceGroupName) + // CreateVolume hot path: the RG may have been created moments ago + // (first volume of a fresh StorageClass) and the local informer + // cache may not have observed it yet. Without the retry this + // post-write re-check would mistake cache lag for the Bug 174 + // delete race, ROLL BACK the just-persisted RD, and fail the whole + // CreateVolume — see pkg/rest/cache_retry.go. A real concurrent + // `rg d` still trips the rollback after the budget. 
+ _, err := getRGWithCacheRetry(r.Context(), s.Store, rd.ResourceGroupName) if err == nil { return true } @@ -466,7 +473,13 @@ func (s *Server) refuseRDCreateOnUnknownRG(w http.ResponseWriter, r *http.Reques return true } - _, err := s.Store.ResourceGroups().Get(r.Context(), rd.ResourceGroupName) + // CreateVolume hot path: linstor-csi ensures the StorageClass's RG + // (POST /v1/resource-groups) and POSTs the RD referencing it + // back-to-back; the local informer cache may not have observed the + // RG write yet. Retry the NotFound under the standard budget so + // the Bug 134 gate doesn't refuse a perfectly valid create — see + // pkg/rest/cache_retry.go. A real typo still 404s after the budget. + _, err := getRGWithCacheRetry(r.Context(), s.Store, rd.ResourceGroupName) if err == nil { return true } diff --git a/pkg/rest/snapshots.go b/pkg/rest/snapshots.go index f2ae91fc..0cce3a55 100644 --- a/pkg/rest/snapshots.go +++ b/pkg/rest/snapshots.go @@ -222,7 +222,11 @@ func (s *Server) handleSnapshotList(w http.ResponseWriter, r *http.Request) { rd := r.PathValue("rd") // Verify the parent RD exists so missing RD is 404, not []. - _, err := s.Store.ResourceDefinitions().Get(r.Context(), rd) + // + // CreateSnapshot hot path: linstor-csi lists snapshots immediately + // after the create while the local informer cache may still trail + // the RD / snapshot writes — see pkg/rest/cache_retry.go. + _, err := getRDWithCacheRetry(r.Context(), s.Store, rd) if err != nil { writeStoreError(w, err) @@ -905,9 +909,13 @@ func makeSnapshotPerNode(name string, nodes []string, vds []apiv1.SnapshotVolume // of the parent RD's props) — both are surfaced via the wire // DTO so CLI consumers don't need a second round-trip. 
func (s *Server) hydrateSnapshotFromRD(ctx context.Context, snap *apiv1.Snapshot, rd string) error { - srcRD, err := s.Store.ResourceDefinitions().Get(ctx, rd) + // CreateSnapshot hot path: csi-sanity (and impatient operators) + // snapshot a volume right after CreateVolume returns; the source-RD + // read may be served from a cache that still trails the RD write — + // see pkg/rest/cache_retry.go. + srcRD, err := getRDWithCacheRetry(ctx, s.Store, rd) if err != nil { - return err //nolint:wrapcheck // surfaced via writeStoreError + return err } vds, err := s.Store.VolumeDefinitions().List(ctx, rd) diff --git a/pkg/rest/storage_pools.go b/pkg/rest/storage_pools.go index 2d34179d..6ef3bc1a 100644 --- a/pkg/rest/storage_pools.go +++ b/pkg/rest/storage_pools.go @@ -512,7 +512,11 @@ func (s *Server) handleNodeStoragePoolGet(w http.ResponseWriter, r *http.Request node := r.PathValue("node") pool := r.PathValue("pool") - sp, err := s.Store.StoragePools().Get(r.Context(), node, pool) + // Pool-create hot path: `linstor sp c` / the CDP one-shot create + // the pool and piraeus-operator / GetCapacity read it back + // immediately, while the local informer cache may still trail the + // write — see pkg/rest/cache_retry.go. + sp, err := getStoragePoolWithCacheRetry(r.Context(), s.Store, node, pool) if err != nil { writeStoreError(w, err) @@ -592,7 +596,13 @@ func (s *Server) handleNodeStoragePoolCreate(w http.ResponseWriter, r *http.Requ // (node, pool) and doesn't FK to NodeStore) and the satellite // would learn about a pool on a node the controller doesn't // know — a permanent orphan from the controller's POV. - nodeObj, nodeErr := s.Store.Nodes().Get(r.Context(), node) + // + // Node-register hot path: piraeus-operator registers the node and + // creates its pools in the same reconcile pass; the local informer + // cache may still trail the registration write, so retry the + // NotFound under the standard budget before refusing — see + // pkg/rest/cache_retry.go. 
+ nodeObj, nodeErr := getNodeWithCacheRetry(r.Context(), s.Store, node) if nodeErr != nil { if errors.Is(nodeErr, store.ErrNotFound) { writeError(w, http.StatusNotFound, "node not found: "+node) diff --git a/pkg/rest/volume_definitions.go b/pkg/rest/volume_definitions.go index d4624f53..0d10be23 100644 --- a/pkg/rest/volume_definitions.go +++ b/pkg/rest/volume_definitions.go @@ -192,7 +192,11 @@ func (s *Server) handleVDList(w http.ResponseWriter, r *http.Request) { // Verify the parent RD exists so a missing RD is 404, not 200 with []. // k8s store does this internally; in-memory does not, so we do it here. - _, err := s.Store.ResourceDefinitions().Get(r.Context(), rd) + // + // CreateVolume hot path: `vd l` / golinstor VD reads land right + // after the RD create while the local informer cache may still + // trail the write — see pkg/rest/cache_retry.go. + _, err := getRDWithCacheRetry(r.Context(), s.Store, rd) if err != nil { writeStoreError(w, err) @@ -240,7 +244,12 @@ func (s *Server) handleVDGet(w http.ResponseWriter, r *http.Request) { return } - vd, err := s.Store.VolumeDefinitions().Get(r.Context(), rd, vn) + // VD-create hot path: VolumeDefinitions live inline on the parent + // RD, so a `vd c` immediately followed by this GET can be served + // from a cache that still holds the pre-create RD revision — + // retry the NotFound under the standard budget. See + // pkg/rest/cache_retry.go. + vd, err := getVDWithCacheRetry(r.Context(), s.Store, rd, vn) if err != nil { writeStoreError(w, err) @@ -272,6 +281,20 @@ func (s *Server) handleVDGet(w http.ResponseWriter, r *http.Request) { func (s *Server) handleVDCreate(w http.ResponseWriter, r *http.Request) { rd := r.PathValue("rd") + // CreateVolume hot path: linstor-csi POSTs the RD and this VD + // create back-to-back (sub-ms gap); the auto-assign walk and the + // store-level Create both read the parent RD through the informer + // cache, which may not have observed the RD write yet. 
Probe with + // the standard retry budget so the whole CreateVolume doesn't fail + // on a spurious "resource definition not found" — see + // pkg/rest/cache_retry.go. + _, err := getRDWithCacheRetry(r.Context(), s.Store, rd) + if err != nil { + writeStoreError(w, err) + + return + } + rawBody, err := io.ReadAll(r.Body) if err != nil { writeDecodeError(w, err) diff --git a/pkg/rest/volumes_per_resource.go b/pkg/rest/volumes_per_resource.go index d4390aaf..aaa43ac0 100644 --- a/pkg/rest/volumes_per_resource.go +++ b/pkg/rest/volumes_per_resource.go @@ -63,7 +63,11 @@ func (s *Server) handleVolumesPerResourceList(w http.ResponseWriter, r *http.Req rdName := r.PathValue("rd") node := r.PathValue("node") - res, err := s.Store.Resources().Get(r.Context(), rdName, node) + // Replica-create hot path: `linstor v l` / golinstor volume reads + // land right after autoplace / make-available wrote the replica, + // while the local informer cache may still trail the write — see + // pkg/rest/cache_retry.go. + res, err := getResourceWithCacheRetry(r.Context(), s.Store, rdName, node) if err != nil { writeStoreError(w, err) @@ -95,7 +99,8 @@ func (s *Server) handleVolumesPerResourceGet(w http.ResponseWriter, r *http.Requ return } - res, err := s.Store.Resources().Get(r.Context(), rdName, node) + // Same replica-create hot path as the list sibling above. + res, err := getResourceWithCacheRetry(r.Context(), s.Store, rdName, node) if err != nil { writeStoreError(w, err)