Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion lib/instances/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/kernel/hypeman/lib/resources"
"github.com/kernel/hypeman/lib/system"
"github.com/kernel/hypeman/lib/volumes"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
)
Expand Down Expand Up @@ -293,6 +294,10 @@ func (m *manager) maybePersistExitInfo(ctx context.Context, id string) {

// maybePersistBootMarkers persists boot markers to metadata under lock.
func (m *manager) maybePersistBootMarkers(ctx context.Context, id string) {
ctx, span := m.tracerOrDefault().Start(ctx, "instances.persist_boot_markers",
traceWithInstanceID(id),
)
defer span.End()
lock := m.getInstanceLock(id)
lock.Lock()
defer lock.Unlock()
Expand Down Expand Up @@ -526,6 +531,8 @@ func (m *manager) UpdateInstance(ctx context.Context, id string, req UpdateInsta
// ListInstances returns instances, optionally filtered by the given criteria.
// Pass nil to return all instances.
func (m *manager) ListInstances(ctx context.Context, filter *ListInstancesFilter) ([]Instance, error) {
ctx, span := m.tracerOrDefault().Start(ctx, "instances.list")
defer span.End()
// No lock - eventual consistency is acceptable for list operations.
// State is derived dynamically, so list is always reasonably current.
all, err := m.listInstances(ctx)
Expand All @@ -542,13 +549,19 @@ func (m *manager) ListInstances(ctx context.Context, filter *ListInstancesFilter
}
result = filtered
}
span.SetAttributes(attribute.Int("instances", len(result)))

persistCtx, persistSpan := m.tracerOrDefault().Start(ctx, "instances.list.persist_boot_markers")
persisted := 0
for i := range result {
inst := result[i]
if (inst.State == StateRunning || inst.State == StateInitializing) && inst.BootMarkersHydrated {
m.maybePersistBootMarkers(ctx, inst.Id)
m.maybePersistBootMarkers(persistCtx, inst.Id)
persisted++
}
}
persistSpan.SetAttributes(attribute.Int("persisted", persisted))
persistSpan.End()

return result, nil
}
Expand Down
39 changes: 32 additions & 7 deletions lib/instances/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/kernel/hypeman/lib/hypervisor"
"github.com/kernel/hypeman/lib/logger"
"go.opentelemetry.io/otel/attribute"
)

// exitSentinelPrefix is the machine-parseable prefix written by init to serial console.
Expand Down Expand Up @@ -46,6 +47,10 @@ func (m *manager) deriveStateWithoutHydration(ctx context.Context, stored *Store
}

func (m *manager) deriveStateWithOptions(ctx context.Context, stored *StoredMetadata, hydrateBootMarkers bool) stateResult {
ctx, span := m.tracerOrDefault().Start(ctx, "instances.derive_state",
traceWithInstanceID(stored.Id),
)
defer span.End()
log := logger.FromContext(ctx)

// 1. Check if socket exists
Expand Down Expand Up @@ -89,7 +94,7 @@ func (m *manager) deriveStateWithOptions(ctx context.Context, stored *StoredMeta
case hypervisor.StateRunning:
hydrated := false
if hydrateBootMarkers {
hydrated = m.hydrateBootMarkersFromLogs(stored)
hydrated = m.hydrateBootMarkersFromLogs(ctx, stored)
}
return stateResult{
State: deriveRunningState(stored),
Expand Down Expand Up @@ -125,7 +130,7 @@ func deriveRunningState(stored *StoredMetadata) State {

// hydrateBootMarkersFromLogs fills missing boot markers from serial logs.
// Returns true when at least one missing marker was found and populated.
func (m *manager) hydrateBootMarkersFromLogs(stored *StoredMetadata) bool {
func (m *manager) hydrateBootMarkersFromLogs(ctx context.Context, stored *StoredMetadata) bool {
needProgram := stored.ProgramStartedAt == nil
needAgent := !stored.SkipGuestAgent && stored.GuestAgentReadyAt == nil
if !needProgram && !needAgent {
Expand All @@ -136,7 +141,12 @@ func (m *manager) hydrateBootMarkersFromLogs(stored *StoredMetadata) bool {
return false
}

programStartedAt, guestAgentReadyAt := m.parseBootMarkers(stored.Id, needProgram, needAgent, stored.StartedAt)
ctx, span := m.tracerOrDefault().Start(ctx, "instances.hydrate_boot_markers",
traceWithInstanceID(stored.Id),
)
defer span.End()

programStartedAt, guestAgentReadyAt := m.parseBootMarkers(ctx, stored.Id, needProgram, needAgent, stored.StartedAt)
hydrated := false
if needProgram && programStartedAt != nil {
stored.ProgramStartedAt = programStartedAt
Expand All @@ -157,8 +167,14 @@ func (m *manager) hydrateBootMarkersFromLogs(stored *StoredMetadata) bool {
// parseBootMarkers scans app logs (including rotated files) and returns the
// newest observed program-start and guest-agent-ready marker timestamps.
// When startedAt is provided, files last modified before this boot start are ignored.
func (m *manager) parseBootMarkers(id string, needProgram bool, needAgent bool, startedAt *time.Time) (*time.Time, *time.Time) {
func (m *manager) parseBootMarkers(ctx context.Context, id string, needProgram bool, needAgent bool, startedAt *time.Time) (*time.Time, *time.Time) {
_, span := m.tracerOrDefault().Start(ctx, "instances.parse_boot_markers",
traceWithInstanceID(id),
)
defer span.End()

logPaths := m.appLogPathsForMarkerScan(id)
span.SetAttributes(attribute.Int("log_paths", len(logPaths)))

var programStartedAt *time.Time
var guestAgentReadyAt *time.Time
Expand Down Expand Up @@ -480,7 +496,7 @@ func (m *manager) persistBootMarkers(ctx context.Context, id string) {
return
}

programStartedAt, guestAgentReadyAt := m.parseBootMarkers(id, needProgram, needAgent, meta.StartedAt)
programStartedAt, guestAgentReadyAt := m.parseBootMarkers(ctx, id, needProgram, needAgent, meta.StartedAt)
updated := false
if needProgram && programStartedAt != nil {
meta.ProgramStartedAt = programStartedAt
Expand Down Expand Up @@ -619,6 +635,8 @@ func parseSentinelTimestamp(line, sentinelPrefix string) (time.Time, bool) {

// listInstances returns all instances
func (m *manager) listInstances(ctx context.Context) ([]Instance, error) {
ctx, span := m.tracerOrDefault().Start(ctx, "instances.list_metadata")
defer span.End()
log := logger.FromContext(ctx)
log.DebugContext(ctx, "listing all instances")

Expand All @@ -627,25 +645,32 @@ func (m *manager) listInstances(ctx context.Context) ([]Instance, error) {
log.ErrorContext(ctx, "failed to list metadata files", "error", err)
return nil, err
}
span.SetAttributes(attribute.Int("metadata_files", len(files)))

result := make([]Instance, 0, len(files))
for _, file := range files {
// Extract instance ID from path
// Path format: {dataDir}/guests/{id}/metadata.json
id := filepath.Base(filepath.Dir(file))

hydrateCtx, hydrateSpan := m.tracerOrDefault().Start(ctx, "instances.list_metadata.hydrate_one",
traceWithInstanceID(id),
)
meta, err := m.loadMetadata(id)
if err != nil {
// Skip instances with invalid metadata
log.WarnContext(ctx, "skipping instance with invalid metadata", "instance_id", id, "error", err)
log.WarnContext(hydrateCtx, "skipping instance with invalid metadata", "instance_id", id, "error", err)
hydrateSpan.End()
continue
}

inst := m.toInstance(ctx, meta)
inst := m.toInstance(hydrateCtx, meta)
result = append(result, inst)
hydrateSpan.End()
}

log.DebugContext(ctx, "listed instances", "count", len(result))
span.SetAttributes(attribute.Int("instances", len(result)))
return result, nil
}

Expand Down
10 changes: 5 additions & 5 deletions lib/instances/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func TestHydrateBootMarkersFromLogs_RescanThrottle(t *testing.T) {
}

// First call finds nothing and schedules a deferred rescan.
hydrated := m.hydrateBootMarkersFromLogs(meta)
hydrated := m.hydrateBootMarkersFromLogs(t.Context(), meta)
require.False(t, hydrated)
require.Nil(t, meta.ProgramStartedAt)
require.Nil(t, meta.GuestAgentReadyAt)
Expand All @@ -200,14 +200,14 @@ func TestHydrateBootMarkersFromLogs_RescanThrottle(t *testing.T) {
require.NoError(t, err)

// Immediate second call should be throttled and skip scanning.
hydrated = m.hydrateBootMarkersFromLogs(meta)
hydrated = m.hydrateBootMarkersFromLogs(t.Context(), meta)
require.False(t, hydrated)
require.Nil(t, meta.ProgramStartedAt)
require.Nil(t, meta.GuestAgentReadyAt)

// Once the rescan interval has elapsed, markers are hydrated.
now = now.Add(bootMarkerRescanInterval + time.Millisecond)
hydrated = m.hydrateBootMarkersFromLogs(meta)
hydrated = m.hydrateBootMarkersFromLogs(t.Context(), meta)
require.True(t, hydrated)
require.NotNil(t, meta.ProgramStartedAt)
require.NotNil(t, meta.GuestAgentReadyAt)
Expand Down Expand Up @@ -244,7 +244,7 @@ func TestParseBootMarkers_IgnoresStaleMarkersBeforeBootStart(t *testing.T) {
require.NoError(t, os.WriteFile(logPath, []byte(freshData), 0o644))
require.NoError(t, os.Chtimes(logPath, bootStart.Add(time.Second), bootStart.Add(time.Second)))

programStartedAt, guestAgentReadyAt := m.parseBootMarkers(id, true, true, &bootStart)
programStartedAt, guestAgentReadyAt := m.parseBootMarkers(t.Context(), id, true, true, &bootStart)
require.NotNil(t, programStartedAt)
require.NotNil(t, guestAgentReadyAt)
assert.Equal(t, freshProgram.Format(time.RFC3339Nano), programStartedAt.UTC().Format(time.RFC3339Nano))
Expand Down Expand Up @@ -281,7 +281,7 @@ func TestParseBootMarkers_ReturnsLatestMarkerFromNewestLog(t *testing.T) {
"HYPEMAN-PROGRAM-START ts="+newProgramLatest.Format(time.RFC3339Nano)+" mode=exec\n",
), 0o644))

programStartedAt, guestAgentReadyAt := m.parseBootMarkers(id, true, true, nil)
programStartedAt, guestAgentReadyAt := m.parseBootMarkers(t.Context(), id, true, true, nil)
require.NotNil(t, programStartedAt)
require.NotNil(t, guestAgentReadyAt)
assert.Equal(t, newProgramLatest.Format(time.RFC3339Nano), programStartedAt.UTC().Format(time.RFC3339Nano))
Expand Down
4 changes: 4 additions & 0 deletions lib/instances/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ func startInstancesSpan(ctx context.Context, tracer trace.Tracer, name string, a
return tracer.Start(ctx, name, trace.WithAttributes(attrs...))
}

func traceWithInstanceID(id string) trace.SpanStartOption {
return trace.WithAttributes(attribute.String("instance_id", id))
}

func propagatedTraceAttributes(attrs ...attribute.KeyValue) []attribute.KeyValue {
out := make([]attribute.KeyValue, 0, len(attrs))
for _, attr := range attrs {
Expand Down
Loading