diff --git a/server/cmd/api/api/display.go b/server/cmd/api/api/display.go index 534f2a01..20fe337c 100644 --- a/server/cmd/api/api/display.go +++ b/server/cmd/api/api/display.go @@ -4,14 +4,18 @@ import ( "context" "encoding/base64" "fmt" + "log/slog" "os" "os/exec" + "path/filepath" "strconv" "strings" + "time" nekooapi "github.com/m1k1o/neko/server/lib/oapi" "github.com/onkernel/kernel-images/server/lib/logger" oapi "github.com/onkernel/kernel-images/server/lib/oapi" + "github.com/onkernel/kernel-images/server/lib/recorder" ) // PatchDisplay updates the display configuration. When require_idle @@ -79,6 +83,26 @@ func (s *ApiService) PatchDisplay(ctx context.Context, req oapi.PatchDisplayRequ } } + // Gracefully stop active recordings so the resize can proceed. + // Recordings are always restarted (via defer) regardless of whether the + // resize succeeds — losing recording data is worse than a brief gap. If + // the resize fails the display is still at the old resolution, so + // restarting at the "old" resolution is correct. + stopped, stopErr := s.stopActiveRecordings(ctx) + if stopErr != nil { + log.Error("failed to stop recordings for resize", "error", stopErr) + return oapi.PatchDisplay500JSONResponse{ + InternalErrorJSONResponse: oapi.InternalErrorJSONResponse{ + Message: fmt.Sprintf("failed to stop recordings for resize: %s", stopErr), + }, + }, nil + } + if len(stopped) > 0 { + defer func() { + go s.startNewRecordingSegments(context.WithoutCancel(ctx), stopped) + }() + } + // Detect display mode (xorg or xvfb) displayMode := s.detectDisplayMode(ctx) @@ -361,6 +385,129 @@ func (s *ApiService) getCurrentResolution(ctx context.Context) (int, int, int, e return width, height, refreshRate, nil } +// stoppedRecordingInfo holds state captured from a recording that was stopped +// so it can be restarted after a display resize. +type stoppedRecordingInfo struct { + id string + params recorder.FFmpegRecordingParams + metadata *recorder.RecordingMetadata +} + +// stopActiveRecordings gracefully stops every recording that is currently in +// progress. The old recorders remain registered in the manager so their +// finalized files stay discoverable and downloadable. It returns info needed +// to start a new recording segment for each stopped recorder. +func (s *ApiService) stopActiveRecordings(ctx context.Context) ([]stoppedRecordingInfo, error) { + log := logger.FromContext(ctx) + var stopped []stoppedRecordingInfo + + for _, rec := range s.recordManager.ListActiveRecorders(ctx) { + if !rec.IsRecording(ctx) { + continue + } + + id := rec.ID() + + ffmpegRec, ok := rec.(*recorder.FFmpegRecorder) + if !ok { + log.Warn("cannot capture params from non-FFmpeg recorder, skipping", "id", id) + continue + } + + params := ffmpegRec.Params() + + log.Info("stopping recording for resize", "id", id) + if err := rec.Stop(ctx); err != nil { + // Stop() returns finalization errors even when the process was + // successfully terminated. Only treat it as a hard failure if + // the process is still running. + if rec.IsRecording(ctx) { + log.Error("failed to stop recording for resize", "id", id, "error", err) + return stopped, fmt.Errorf("failed to stop recording %s: %w", id, err) + } + log.Warn("recording stopped with finalization warning", "id", id, "error", err) + } + + stopped = append(stopped, stoppedRecordingInfo{ + id: id, + params: params, + metadata: rec.Metadata(), + }) + log.Info("recording stopped for resize, old segment preserved", "id", id) + } + + return stopped, nil +} + +// adjustParamsForRemainingBudget reduces MaxDurationInSeconds and MaxSizeInMB +// in the cloned params to reflect what the previous segment already consumed. +// This keeps cumulative duration and disk usage within the originally requested limits. +func adjustParamsForRemainingBudget(log *slog.Logger, info stoppedRecordingInfo) recorder.FFmpegRecordingParams { + params := info.params + + if params.MaxDurationInSeconds != nil && info.metadata != nil && !info.metadata.EndTime.IsZero() { + elapsed := int(info.metadata.EndTime.Sub(info.metadata.StartTime).Seconds()) + remaining := *params.MaxDurationInSeconds - elapsed + if remaining < 1 { + remaining = 1 + } + params.MaxDurationInSeconds = &remaining + log.Info("adjusted max duration for new segment", "id", info.id, "elapsed_s", elapsed, "remaining_s", remaining) + } + + if params.MaxSizeInMB != nil && params.OutputDir != nil { + segmentPath := filepath.Join(*params.OutputDir, info.id+".mp4") + if fi, err := os.Stat(segmentPath); err == nil { + consumedMB := int((fi.Size() + 1024*1024 - 1) / (1024 * 1024)) + remaining := *params.MaxSizeInMB - consumedMB + if remaining < 1 { + remaining = 1 + } + params.MaxSizeInMB = &remaining + log.Info("adjusted max size for new segment", "id", info.id, "consumed_mb", consumedMB, "remaining_mb", remaining) + } + } + + return params +} + +// startNewRecordingSegments creates and starts a new recording segment for +// each previously-stopped recorder. Each new segment gets a unique suffixed +// ID so the old (stopped) recorder and its finalized file remain accessible +// in the manager. +// +// Duration and size limits are adjusted to account for what the previous +// segment already consumed, so the cumulative totals stay within the +// originally requested bounds. +func (s *ApiService) startNewRecordingSegments(ctx context.Context, stopped []stoppedRecordingInfo) { + log := logger.FromContext(ctx) + + for _, info := range stopped { + newID := fmt.Sprintf("%s-%d", info.id, time.Now().UnixMilli()) + + params := adjustParamsForRemainingBudget(log, info) + + rec, err := s.factory(newID, params) + if err != nil { + log.Error("failed to create recorder for new segment", "old_id", info.id, "new_id", newID, "error", err) + continue + } + + if err := s.recordManager.RegisterRecorder(ctx, rec); err != nil { + log.Error("failed to register new segment recorder", "old_id", info.id, "new_id", newID, "error", err) + continue + } + + if err := rec.Start(ctx); err != nil { + log.Error("failed to start new segment recording", "old_id", info.id, "new_id", newID, "error", err) + _ = s.recordManager.DeregisterRecorder(ctx, rec) + continue + } + + log.Info("new recording segment started after resize", "old_id", info.id, "new_id", newID) + } +} + // isNekoEnabled checks if Neko service is enabled func (s *ApiService) isNekoEnabled() bool { return os.Getenv("ENABLE_WEBRTC") == "true" diff --git a/server/cmd/api/api/display_test.go b/server/cmd/api/api/display_test.go new file mode 100644 index 00000000..687d65fc --- /dev/null +++ b/server/cmd/api/api/display_test.go @@ -0,0 +1,413 @@ +package api + +import ( + "context" + "log/slog" + "os" + "path/filepath" + "testing" + "time" + + "github.com/onkernel/kernel-images/server/lib/recorder" + "github.com/onkernel/kernel-images/server/lib/scaletozero" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var testMockFFmpegBin = filepath.Join("..", "..", "..", "lib", "recorder", "testdata", "mock_ffmpeg.sh") + +func testFFmpegFactory(t *testing.T, tempDir string) recorder.FFmpegRecorderFactory { + t.Helper() + fr := 5 + disp := 0 + size := 1 + config := recorder.FFmpegRecordingParams{ + FrameRate: &fr, + DisplayNum: &disp, + MaxSizeInMB: &size, + OutputDir: &tempDir, + } + return recorder.NewFFmpegRecorderFactory(testMockFFmpegBin, config, scaletozero.NewNoopController()) +} + +func newTestServiceWithFactory(t *testing.T, mgr recorder.RecordManager, factory recorder.FFmpegRecorderFactory) *ApiService { + t.Helper() + svc, err := New(mgr, factory, newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t)) + require.NoError(t, err) + return svc +} + +func TestStopActiveRecordings(t *testing.T) { + t.Run("stops recording but keeps it registered", func(t *testing.T) { + ctx := context.Background() + tempDir := t.TempDir() + factory := testFFmpegFactory(t, tempDir) + mgr := recorder.NewFFmpegManager() + svc := newTestServiceWithFactory(t, mgr, factory) + + rec, err := factory("test-rec", recorder.FFmpegRecordingParams{}) + require.NoError(t, err) + require.NoError(t, mgr.RegisterRecorder(ctx, rec)) + require.NoError(t, rec.Start(ctx)) + time.Sleep(50 * time.Millisecond) + require.True(t, rec.IsRecording(ctx)) + + stopped, err := svc.stopActiveRecordings(ctx) + require.NoError(t, err) + require.Len(t, stopped, 1) + assert.Equal(t, "test-rec", stopped[0].id) + assert.NotNil(t, stopped[0].params.FrameRate) + require.NotNil(t, stopped[0].metadata, "metadata should be captured") + assert.False(t, stopped[0].metadata.StartTime.IsZero(), "start time should be set") + + oldRec, exists := mgr.GetRecorder("test-rec") + assert.True(t, exists, "old recorder should remain registered") + assert.False(t, oldRec.IsRecording(ctx), "old recorder should be stopped") + }) + + t.Run("stops multiple active recordings", func(t *testing.T) { + ctx := context.Background() + tempDir := t.TempDir() + factory := testFFmpegFactory(t, tempDir) + mgr := recorder.NewFFmpegManager() + svc := newTestServiceWithFactory(t, mgr, factory) + + ids := []string{"rec-a", "rec-b"} + for _, id := range ids { + rec, err := factory(id, recorder.FFmpegRecordingParams{}) + require.NoError(t, err) + require.NoError(t, mgr.RegisterRecorder(ctx, rec)) + require.NoError(t, rec.Start(ctx)) + } + time.Sleep(50 * time.Millisecond) + + stopped, err := svc.stopActiveRecordings(ctx) + require.NoError(t, err) + assert.Len(t, stopped, 2) + + for _, id := range ids { + oldRec, exists := mgr.GetRecorder(id) + assert.True(t, exists, "recorder %s should remain registered", id) + assert.False(t, oldRec.IsRecording(ctx), "recorder %s should be stopped", id) + } + }) + + t.Run("skips non-recording recorders", func(t *testing.T) { + ctx := context.Background() + tempDir := t.TempDir() + factory := testFFmpegFactory(t, tempDir) + mgr := recorder.NewFFmpegManager() + svc := newTestServiceWithFactory(t, mgr, factory) + + mock := &mockRecorder{id: "idle-rec", isRecordingFlag: false} + require.NoError(t, mgr.RegisterRecorder(ctx, mock)) + + stopped, err := svc.stopActiveRecordings(ctx) + require.NoError(t, err) + assert.Empty(t, stopped) + + _, exists := mgr.GetRecorder("idle-rec") + assert.True(t, exists, "non-recording recorder should remain registered") + }) + + t.Run("returns empty when no recorders exist", func(t *testing.T) { + ctx := context.Background() + tempDir := t.TempDir() + factory := testFFmpegFactory(t, tempDir) + mgr := recorder.NewFFmpegManager() + svc := newTestServiceWithFactory(t, mgr, factory) + + stopped, err := svc.stopActiveRecordings(ctx) + require.NoError(t, err) + assert.Empty(t, stopped) + }) +} + +func TestStartNewRecordingSegments(t *testing.T) { + t.Run("creates new segment with suffixed ID", func(t *testing.T) { + ctx := context.Background() + tempDir := t.TempDir() + factory := testFFmpegFactory(t, tempDir) + mgr := recorder.NewFFmpegManager() + svc := newTestServiceWithFactory(t, mgr, factory) + + fr := 5 + disp := 0 + size := 1 + info := stoppedRecordingInfo{ + id: "test-rec", + params: recorder.FFmpegRecordingParams{ + FrameRate: &fr, + DisplayNum: &disp, + MaxSizeInMB: &size, + OutputDir: &tempDir, + }, + } + + svc.startNewRecordingSegments(ctx, []stoppedRecordingInfo{info}) + + // The new recorder should have a suffixed ID, not the original + _, existsOld := mgr.GetRecorder("test-rec") + assert.False(t, existsOld, "original ID should not be re-registered") + + // Find the new segment by iterating active recorders + var newRec recorder.Recorder + for _, r := range mgr.ListActiveRecorders(ctx) { + if r.IsRecording(ctx) { + newRec = r + break + } + } + require.NotNil(t, newRec, "a new recording segment should be active") + assert.Contains(t, newRec.ID(), "test-rec-", "new ID should be prefixed with the original ID") + assert.True(t, newRec.IsRecording(ctx)) + + _ = newRec.Stop(ctx) + }) + + t.Run("starts segment even when no old recorder exists in manager", func(t *testing.T) { + ctx := context.Background() + tempDir := t.TempDir() + factory := testFFmpegFactory(t, tempDir) + mgr := recorder.NewFFmpegManager() + svc := newTestServiceWithFactory(t, mgr, factory) + + fr := 5 + disp := 0 + size := 1 + info := stoppedRecordingInfo{ + id: "fresh-rec", + params: recorder.FFmpegRecordingParams{ + FrameRate: &fr, + DisplayNum: &disp, + MaxSizeInMB: &size, + OutputDir: &tempDir, + }, + } + + svc.startNewRecordingSegments(ctx, []stoppedRecordingInfo{info}) + + var newRec recorder.Recorder + for _, r := range mgr.ListActiveRecorders(ctx) { + if r.IsRecording(ctx) { + newRec = r + break + } + } + require.NotNil(t, newRec, "new segment should be active") + assert.Contains(t, newRec.ID(), "fresh-rec-") + + _ = newRec.Stop(ctx) + }) +} + +func TestStopAndStartNewSegment_RoundTrip(t *testing.T) { + ctx := context.Background() + tempDir := t.TempDir() + factory := testFFmpegFactory(t, tempDir) + mgr := recorder.NewFFmpegManager() + svc := newTestServiceWithFactory(t, mgr, factory) + + // Start a recording + rec, err := factory("round-trip", recorder.FFmpegRecordingParams{}) + require.NoError(t, err) + require.NoError(t, mgr.RegisterRecorder(ctx, rec)) + require.NoError(t, rec.Start(ctx)) + time.Sleep(50 * time.Millisecond) + require.True(t, rec.IsRecording(ctx)) + + // Stop active recordings (simulating resize) + stopped, err := svc.stopActiveRecordings(ctx) + require.NoError(t, err) + require.Len(t, stopped, 1) + assert.Equal(t, "round-trip", stopped[0].id) + + // Old recorder should still be registered but stopped + oldRec, exists := mgr.GetRecorder("round-trip") + require.True(t, exists, "old recorder should remain registered") + assert.False(t, oldRec.IsRecording(ctx), "old recorder should be stopped") + + // Start new segments + svc.startNewRecordingSegments(ctx, stopped) + + // Old recorder should still be there + oldRec2, exists := mgr.GetRecorder("round-trip") + require.True(t, exists, "old recorder should still be registered after new segment starts") + assert.False(t, oldRec2.IsRecording(ctx)) + + // New recorder should be active with a different ID + var newRec recorder.Recorder + for _, r := range mgr.ListActiveRecorders(ctx) { + if r.ID() != "round-trip" && r.IsRecording(ctx) { + newRec = r + break + } + } + require.NotNil(t, newRec, "new segment recorder should exist") + assert.Contains(t, newRec.ID(), "round-trip-", "new ID should be suffixed") + assert.True(t, newRec.IsRecording(ctx)) + + _ = newRec.Stop(ctx) +} + +func TestAdjustParamsForRemainingBudget(t *testing.T) { + log := slog.Default() + + t.Run("reduces MaxDurationInSeconds by elapsed time", func(t *testing.T) { + maxDur := 60 + fr := 5 + disp := 0 + size := 500 + dir := t.TempDir() + + info := stoppedRecordingInfo{ + id: "dur-test", + params: recorder.FFmpegRecordingParams{ + FrameRate: &fr, + DisplayNum: &disp, + MaxSizeInMB: &size, + MaxDurationInSeconds: &maxDur, + OutputDir: &dir, + }, + metadata: &recorder.RecordingMetadata{ + StartTime: time.Now().Add(-25 * time.Second), + EndTime: time.Now(), + }, + } + + adjusted := adjustParamsForRemainingBudget(log, info) + require.NotNil(t, adjusted.MaxDurationInSeconds) + assert.InDelta(t, 35, *adjusted.MaxDurationInSeconds, 2, "remaining duration should be ~35s") + assert.Equal(t, 60, maxDur, "original param should not be mutated") + }) + + t.Run("clamps remaining duration to 1 when budget exhausted", func(t *testing.T) { + maxDur := 10 + fr := 5 + disp := 0 + size := 500 + dir := t.TempDir() + + info := stoppedRecordingInfo{ + id: "exhausted-dur", + params: recorder.FFmpegRecordingParams{ + FrameRate: &fr, + DisplayNum: &disp, + MaxSizeInMB: &size, + MaxDurationInSeconds: &maxDur, + OutputDir: &dir, + }, + metadata: &recorder.RecordingMetadata{ + StartTime: time.Now().Add(-30 * time.Second), + EndTime: time.Now(), + }, + } + + adjusted := adjustParamsForRemainingBudget(log, info) + require.NotNil(t, adjusted.MaxDurationInSeconds) + assert.Equal(t, 1, *adjusted.MaxDurationInSeconds) + }) + + t.Run("reduces MaxSizeInMB by consumed file size", func(t *testing.T) { + maxSize := 10 + fr := 5 + disp := 0 + dir := t.TempDir() + + segmentFile := filepath.Join(dir, "size-test.mp4") + data := make([]byte, 3*1024*1024) // 3 MB + require.NoError(t, os.WriteFile(segmentFile, data, 0644)) + + info := stoppedRecordingInfo{ + id: "size-test", + params: recorder.FFmpegRecordingParams{ + FrameRate: &fr, + DisplayNum: &disp, + MaxSizeInMB: &maxSize, + OutputDir: &dir, + }, + metadata: &recorder.RecordingMetadata{}, + } + + adjusted := adjustParamsForRemainingBudget(log, info) + require.NotNil(t, adjusted.MaxSizeInMB) + assert.Equal(t, 7, *adjusted.MaxSizeInMB) + assert.Equal(t, 10, maxSize, "original param should not be mutated") + }) + + t.Run("clamps remaining size to 1 when budget exhausted", func(t *testing.T) { + maxSize := 2 + fr := 5 + disp := 0 + dir := t.TempDir() + + segmentFile := filepath.Join(dir, "big-test.mp4") + data := make([]byte, 5*1024*1024) // 5 MB > 2 MB limit + require.NoError(t, os.WriteFile(segmentFile, data, 0644)) + + info := stoppedRecordingInfo{ + id: "big-test", + params: recorder.FFmpegRecordingParams{ + FrameRate: &fr, + DisplayNum: &disp, + MaxSizeInMB: &maxSize, + OutputDir: &dir, + }, + metadata: &recorder.RecordingMetadata{}, + } + + adjusted := adjustParamsForRemainingBudget(log, info) + require.NotNil(t, adjusted.MaxSizeInMB) + assert.Equal(t, 1, *adjusted.MaxSizeInMB) + }) + + t.Run("rounds up fractional MB when computing consumed size", func(t *testing.T) { + maxSize := 10 + fr := 5 + disp := 0 + dir := t.TempDir() + + segmentFile := filepath.Join(dir, "frac-test.mp4") + data := make([]byte, 3*1024*1024+512*1024) // 3.5 MB → rounds up to 4 MB consumed + require.NoError(t, os.WriteFile(segmentFile, data, 0644)) + + info := stoppedRecordingInfo{ + id: "frac-test", + params: recorder.FFmpegRecordingParams{ + FrameRate: &fr, + DisplayNum: &disp, + MaxSizeInMB: &maxSize, + OutputDir: &dir, + }, + metadata: &recorder.RecordingMetadata{}, + } + + adjusted := adjustParamsForRemainingBudget(log, info) + require.NotNil(t, adjusted.MaxSizeInMB) + assert.Equal(t, 6, *adjusted.MaxSizeInMB) // 10 - 4 = 6 + }) + + t.Run("no adjustment when limits are nil", func(t *testing.T) { + fr := 5 + disp := 0 + size := 500 + dir := t.TempDir() + + info := stoppedRecordingInfo{ + id: "no-limits", + params: recorder.FFmpegRecordingParams{ + FrameRate: &fr, + DisplayNum: &disp, + MaxSizeInMB: &size, + OutputDir: &dir, + }, + metadata: &recorder.RecordingMetadata{ + StartTime: time.Now().Add(-10 * time.Second), + EndTime: time.Now(), + }, + } + + adjusted := adjustParamsForRemainingBudget(log, info) + assert.Nil(t, adjusted.MaxDurationInSeconds, "should remain nil when not set") + }) +} diff --git a/server/lib/recorder/ffmeg_test.go b/server/lib/recorder/ffmeg_test.go index 22aea379..bb739bb1 100644 --- a/server/lib/recorder/ffmeg_test.go +++ b/server/lib/recorder/ffmeg_test.go @@ -50,6 +50,24 @@ func TestFFmpegRecorder_StartAndStop(t *testing.T) { require.False(t, rec.IsRecording(t.Context())) } +func TestFFmpegRecorder_Params(t *testing.T) { + tempDir := t.TempDir() + params := defaultParams(tempDir) + rec := &FFmpegRecorder{ + id: "params-test", + binaryPath: mockBin, + params: params, + outputPath: filepath.Join(tempDir, "params-test.mp4"), + stz: scaletozero.NewOncer(scaletozero.NewNoopController()), + } + + got := rec.Params() + assert.Equal(t, *params.FrameRate, *got.FrameRate) + assert.Equal(t, *params.DisplayNum, *got.DisplayNum) + assert.Equal(t, *params.MaxSizeInMB, *got.MaxSizeInMB) + assert.Equal(t, *params.OutputDir, *got.OutputDir) +} + func TestFFmpegRecorder_ForceStop(t *testing.T) { tempDir := t.TempDir() rec := &FFmpegRecorder{ diff --git a/server/lib/recorder/ffmpeg.go b/server/lib/recorder/ffmpeg.go index 52e6b054..d3d2dc80 100644 --- a/server/lib/recorder/ffmpeg.go +++ b/server/lib/recorder/ffmpeg.go @@ -141,6 +141,38 @@ func (fr *FFmpegRecorder) ID() string { return fr.id } +// Params returns a deep copy of the merged recording parameters. +func (fr *FFmpegRecorder) Params() FFmpegRecordingParams { + fr.mu.Lock() + defer fr.mu.Unlock() + return fr.params.clone() +} + +func (p FFmpegRecordingParams) clone() FFmpegRecordingParams { + c := p + if p.FrameRate != nil { + v := *p.FrameRate + c.FrameRate = &v + } + if p.DisplayNum != nil { + v := *p.DisplayNum + c.DisplayNum = &v + } + if p.MaxSizeInMB != nil { + v := *p.MaxSizeInMB + c.MaxSizeInMB = &v + } + if p.MaxDurationInSeconds != nil { + v := *p.MaxDurationInSeconds + c.MaxDurationInSeconds = &v + } + if p.OutputDir != nil { + v := *p.OutputDir + c.OutputDir = &v + } + return c +} + // Start begins the recording process by launching ffmpeg with the configured parameters. func (fr *FFmpegRecorder) Start(ctx context.Context) error { log := logger.FromContext(ctx)