Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
bundle:
name: nested_notebooks
11 changes: 11 additions & 0 deletions acceptance/bundle/generate/job_nested_notebooks/out.job.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
resources:
jobs:
out:
name: dev.my_repo.my_job
tasks:
- task_key: my_notebook_task
notebook_task:
notebook_path: src/my_folder/my_notebook.py
- task_key: other_notebook_task
notebook_task:
notebook_path: src/other_folder/other_notebook.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions acceptance/bundle/generate/job_nested_notebooks/output.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
File successfully saved to src/my_folder/my_notebook.py
File successfully saved to src/other_folder/other_notebook.py
Job configuration successfully saved to out.job.yml
src/my_folder/my_notebook.py
src/other_folder/other_notebook.py
2 changes: 2 additions & 0 deletions acceptance/bundle/generate/job_nested_notebooks/script
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
$CLI bundle generate job --existing-job-id 1234 --config-dir . --key out --force --source-dir src 2>&1 | sort
find src -type f | sort
42 changes: 42 additions & 0 deletions acceptance/bundle/generate/job_nested_notebooks/test.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
Ignore = ["src"]

[[Server]]
Pattern = "GET /api/2.2/jobs/get"
Response.Body = '''
{
"job_id": 11223344,
"settings": {
"name": "dev.my_repo.my_job",
"tasks": [
{
"task_key": "my_notebook_task",
"notebook_task": {
"notebook_path": "/my_data_product/dev/my_folder/my_notebook"
}
},
{
"task_key": "other_notebook_task",
"notebook_task": {
"notebook_path": "/my_data_product/dev/other_folder/other_notebook"
}
}
]
}
}
'''

[[Server]]
Pattern = "GET /api/2.0/workspace/get-status"
Response.Body = '''
{
"object_type": "NOTEBOOK",
"language": "PYTHON",
"repos_export_format": "SOURCE"
}
'''

[[Server]]
Pattern = "GET /api/2.0/workspace/export"
Response.Body = '''
print("Hello, World!")
'''
46 changes: 46 additions & 0 deletions bundle/generate/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,52 @@ func (n *Downloader) markNotebookForDownload(ctx context.Context, notebookPath *
return nil
}

func (n *Downloader) MarkTasksForDownload(ctx context.Context, tasks []jobs.Task) error {
var paths []string
for _, task := range tasks {
if task.NotebookTask != nil {
paths = append(paths, task.NotebookTask.NotebookPath)
}
}
if len(paths) > 0 {
n.basePath = commonDirPrefix(paths)
}
for i := range tasks {
if err := n.MarkTaskForDownload(ctx, &tasks[i]); err != nil {
return err
}
}
return nil
}

// commonDirPrefix returns the longest common directory-aligned prefix of the given paths.
func commonDirPrefix(paths []string) string {
if len(paths) == 0 {
return ""
}
if len(paths) == 1 {
return path.Dir(paths[0])
}

prefix := paths[0]
for _, p := range paths[1:] {
for !strings.HasPrefix(p, prefix) {
prefix = prefix[:len(prefix)-1]
if prefix == "" {
return ""
}
}
}

// Truncate to last '/' to ensure directory alignment.
if i := strings.LastIndex(prefix, "/"); i >= 0 {
prefix = prefix[:i]
} else {
prefix = ""
}
return prefix
}

func (n *Downloader) relativePath(fullPath string) string {
basePath := path.Dir(fullPath)
if n.basePath != "" {
Expand Down
160 changes: 160 additions & 0 deletions bundle/generate/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,15 @@ package generate

import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"path/filepath"
"testing"

"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/experimental/mocks"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/databricks/databricks-sdk-go/service/workspace"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -94,3 +99,158 @@ func TestDownloader_DoesNotRecurseIntoNodeModules(t *testing.T) {
assert.Contains(t, downloader.files, filepath.Join(sourceDir, "app.py"))
assert.Contains(t, downloader.files, filepath.Join(sourceDir, "src/index.js"))
}

func TestCommonDirPrefix(t *testing.T) {
tests := []struct {
name string
paths []string
want string
}{
{
name: "empty",
paths: nil,
want: "",
},
{
name: "single path",
paths: []string{"/a/b/c"},
want: "/a/b",
},
{
name: "shared parent",
paths: []string{"/a/b/c", "/a/b/d"},
want: "/a/b",
},
{
name: "root divergence",
paths: []string{"/x/y", "/z/w"},
want: "",
},
{
name: "partial dir name safety",
paths: []string{"/a/bc/d", "/a/bd/e"},
want: "/a",
},
{
name: "nested shared prefix",
paths: []string{"/Users/user/project/etl/extract", "/Users/user/project/reporting/dashboard"},
want: "/Users/user/project",
},
{
name: "identical paths",
paths: []string{"/a/b/c", "/a/b/c"},
want: "/a/b",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equal(t, tt.want, commonDirPrefix(tt.paths))
})
}
}

func newTestWorkspaceClient(t *testing.T, handler http.HandlerFunc) *databricks.WorkspaceClient {
server := httptest.NewServer(handler)
t.Cleanup(server.Close)

w, err := databricks.NewWorkspaceClient(&databricks.Config{
Host: server.URL,
Token: "test-token",
})
require.NoError(t, err)
return w
}

func notebookStatusHandler(t *testing.T) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/api/2.0/workspace/get-status" {
t.Fatalf("unexpected request path: %s", r.URL.Path)
}
resp := workspaceStatus{
Language: workspace.LanguagePython,
ObjectType: workspace.ObjectTypeNotebook,
ExportFormat: workspace.ExportFormatSource,
}
w.Header().Set("Content-Type", "application/json")
err := json.NewEncoder(w).Encode(resp)
if err != nil {
t.Fatal(err)
}
}
}

func TestDownloader_MarkTasksForDownload_PreservesStructure(t *testing.T) {
ctx := context.Background()
w := newTestWorkspaceClient(t, notebookStatusHandler(t))

dir := "base/dir"
sourceDir := filepath.Join(dir, "source")
configDir := filepath.Join(dir, "config")
downloader := NewDownloader(w, sourceDir, configDir)

tasks := []jobs.Task{
{
TaskKey: "extract_task",
NotebookTask: &jobs.NotebookTask{
NotebookPath: "/Users/user/project/etl/extract",
},
},
{
TaskKey: "dashboard_task",
NotebookTask: &jobs.NotebookTask{
NotebookPath: "/Users/user/project/reporting/dashboard",
},
},
}

err := downloader.MarkTasksForDownload(ctx, tasks)
require.NoError(t, err)

assert.Equal(t, filepath.FromSlash("../source/etl/extract.py"), tasks[0].NotebookTask.NotebookPath)
assert.Equal(t, filepath.FromSlash("../source/reporting/dashboard.py"), tasks[1].NotebookTask.NotebookPath)
assert.Len(t, downloader.files, 2)
}

func TestDownloader_MarkTasksForDownload_SingleNotebook(t *testing.T) {
ctx := context.Background()
w := newTestWorkspaceClient(t, notebookStatusHandler(t))

dir := "base/dir"
sourceDir := filepath.Join(dir, "source")
configDir := filepath.Join(dir, "config")
downloader := NewDownloader(w, sourceDir, configDir)

tasks := []jobs.Task{
{
TaskKey: "task1",
NotebookTask: &jobs.NotebookTask{
NotebookPath: "/Users/user/project/notebook",
},
},
}

err := downloader.MarkTasksForDownload(ctx, tasks)
require.NoError(t, err)

// Single notebook: basePath = path.Dir => same as old behavior.
assert.Equal(t, filepath.FromSlash("../source/notebook.py"), tasks[0].NotebookTask.NotebookPath)
assert.Len(t, downloader.files, 1)
}

func TestDownloader_MarkTasksForDownload_NoNotebooks(t *testing.T) {
ctx := context.Background()
w := newTestWorkspaceClient(t, func(w http.ResponseWriter, r *http.Request) {
t.Fatalf("unexpected request: %s %s", r.Method, r.URL.Path)
})

downloader := NewDownloader(w, "source", "config")

tasks := []jobs.Task{
{TaskKey: "spark_task"},
{TaskKey: "python_wheel_task"},
}

err := downloader.MarkTasksForDownload(ctx, tasks)
require.NoError(t, err)
assert.Empty(t, downloader.files)
}
8 changes: 3 additions & 5 deletions cmd/bundle/generate/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,9 @@ After generation, you can deploy this job to other targets using:
if job.Settings.GitSource != nil {
cmdio.LogString(ctx, "Job is using Git source, skipping downloading files")
} else {
for _, task := range job.Settings.Tasks {
err := downloader.MarkTaskForDownload(ctx, &task)
if err != nil {
return err
}
err = downloader.MarkTasksForDownload(ctx, job.Settings.Tasks)
if err != nil {
return err
}
}

Expand Down
Loading