Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 70 additions & 12 deletions experimental/ssh/internal/client/releases.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package client

import (
"context"
"crypto/tls"
"errors"
"fmt"
"io"
Expand All @@ -10,11 +11,16 @@ import (
"os"
"path/filepath"
"strings"
"syscall"

"github.com/databricks/cli/experimental/ssh/internal/workspace"
"github.com/databricks/cli/libs/filer"
"github.com/databricks/cli/libs/log"
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/apierr"
sdkclient "github.com/databricks/databricks-sdk-go/client"
"github.com/databricks/databricks-sdk-go/config"
"github.com/databricks/databricks-sdk-go/httpclient"
"golang.org/x/net/http2"
)

Expand All @@ -26,10 +32,15 @@ func UploadTunnelReleases(ctx context.Context, client *databricks.WorkspaceClien
return fmt.Errorf("failed to get versioned directory: %w", err)
}

workspaceFiler, err := filer.NewWorkspaceFilesClient(client, versionedDir)
// Upload the CLI bundle over HTTP/1.1. It is a single ~14 MB POST, so HTTP/2
// buys us nothing, and some corporate proxies reset large HTTP/2 request bodies
// with RST_STREAM(NO_ERROR), which aborts the upload (see DECO-27497). Forcing
// HTTP/1.1 only for this client keeps the rest of the connect flow on HTTP/2.
uploadClient, err := newHTTP11Client(client.Config)
if err != nil {
return fmt.Errorf("failed to create workspace files client: %w", err)
return fmt.Errorf("failed to create upload client: %w", err)
}
workspaceFiler := filer.NewWorkspaceFilesClientWithClient(client, versionedDir, uploadClient)

getRelease := getGithubRelease
if releasesDir != "" {
Expand All @@ -38,6 +49,43 @@ func UploadTunnelReleases(ctx context.Context, client *databricks.WorkspaceClien
return uploadReleases(ctx, workspaceFiler, getRelease, version, releasesDir)
}

// newHTTP11Client returns an SDK client derived from cfg that negotiates HTTP/1.1
// only. cfg is reused, not copied (it embeds a sync.Mutex); only the transport is
// overridden, mirroring how client.New builds its client from the same config.
func newHTTP11Client(cfg *config.Config) (*sdkclient.DatabricksClient, error) {
clientCfg, err := config.HTTPClientConfigFromConfig(cfg)
if err != nil {
return nil, err
}
clientCfg.Transport = newHTTP11Transport(cfg)
return sdkclient.NewWithClient(cfg, httpclient.NewApiClient(clientCfg))
}

// newHTTP11Transport clones cfg's transport (or the default) and disables HTTP/2.
// A non-nil, empty TLSNextProto map is the documented way to turn off the transport's
// automatic HTTP/2 support. See https://pkg.go.dev/net/http#Transport
func newHTTP11Transport(cfg *config.Config) *http.Transport {
t, ok := cfg.HTTPTransport.(*http.Transport)
if ok && t != nil {
t = t.Clone()
} else {
t = http.DefaultTransport.(*http.Transport).Clone()
}
t.ForceAttemptHTTP2 = false
t.TLSNextProto = map[string]func(string, *tls.Conn) http.RoundTripper{}
// Cloning http.DefaultTransport drops the InsecureSkipVerify the SDK would
// otherwise apply, so re-apply it here to honor the resolved config.
if cfg.InsecureSkipVerify {
if t.TLSClientConfig == nil {
t.TLSClientConfig = &tls.Config{}
} else {
t.TLSClientConfig = t.TLSClientConfig.Clone()
}
t.TLSClientConfig.InsecureSkipVerify = true
}
return t
}

func uploadReleases(ctx context.Context, workspaceFiler filer.Filer, getRelease releaseProvider, version, releasesDir string) error {
architectures := []string{"amd64", "arm64"}

Expand Down Expand Up @@ -66,12 +114,13 @@ func uploadReleases(ctx context.Context, workspaceFiler filer.Filer, getRelease
// producing the filerRoot/remoteSubFolder/*archive-contents* structure, with 'databricks' binary inside.
err = workspaceFiler.Write(ctx, remoteArchivePath, releaseReader, filer.OverwriteIfExists, filer.CreateParentDirectories)
if err != nil {
if isStreamResetError(err) {
if isProxyUploadError(err) {
return fmt.Errorf("failed to upload file %s to workspace: %w\n\n"+
"The connection was closed before the upload finished. "+
"This is usually caused by a network intermediary (corporate egress proxy, VPN, or firewall/WAF) "+
"The upload was rejected before it finished. The CLI already sends this upload over HTTP/1.1, "+
"so this is most likely a network intermediary (corporate egress proxy, VPN, or firewall/WAF) "+
"enforcing a request-body size limit on POSTs to *.cloud.databricks.com. "+
"Try running this command from a network without such restrictions",
"Ask your network administrator to allow large uploads to that path, "+
"or run this command from a network without such restrictions",
remoteArchivePath, err)
}
return fmt.Errorf("failed to upload file %s to workspace: %w", remoteArchivePath, err)
Expand All @@ -82,12 +131,21 @@ func uploadReleases(ctx context.Context, workspaceFiler filer.Filer, getRelease
return nil
}

// isStreamResetError reports whether err looks like an HTTP/2 stream reset from
// the server, which typically means an edge proxy or the workspace-files import
// endpoint rejected the request body (e.g. body-size limit). The string fallback
// catches cases where a transport layer re-formats the http2 error before it
// reaches us, losing the typed value but preserving the message shape.
func isStreamResetError(err error) bool {
// isProxyUploadError reports whether err looks like the binary upload was rejected
// or severed by a network intermediary (corporate proxy / VPN / firewall / WAF)
// rather than by Databricks — typically an enforced request-body size limit. Because
// the upload runs over HTTP/1.1 (see newHTTP11Transport), the usual signatures are a
// 413 response or a connection reset mid-body. The HTTP/2 stream-reset checks are
// kept as a guard in case the upload ever runs over HTTP/2 again; that error reaches
// us either as a typed http2.StreamError or, when a transport layer re-formats it,
// as a string that still preserves the "stream error ... stream ID" shape.
func isProxyUploadError(err error) bool {
if aerr, ok := errors.AsType[*apierr.APIError](err); ok {
return aerr.StatusCode == http.StatusRequestEntityTooLarge
}
if errors.Is(err, syscall.ECONNRESET) {
return true
}
if _, ok := errors.AsType[http2.StreamError](err); ok {
return true
}
Expand Down
30 changes: 28 additions & 2 deletions experimental/ssh/internal/client/releases_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,32 @@ package client
import (
"errors"
"fmt"
"net/http"
"syscall"
"testing"

"github.com/databricks/databricks-sdk-go/apierr"
"github.com/databricks/databricks-sdk-go/config"
"github.com/stretchr/testify/assert"
"golang.org/x/net/http2"
)

func TestIsStreamResetError(t *testing.T) {
func TestIsProxyUploadError(t *testing.T) {
tests := []struct {
name string
err error
want bool
}{
{
name: "413 request entity too large",
err: &apierr.APIError{StatusCode: http.StatusRequestEntityTooLarge, Message: "request too large"},
want: true,
},
{
name: "connection reset mid-body",
err: fmt.Errorf(`Post "https://example/...": write tcp: %w`, syscall.ECONNRESET),
want: true,
},
{
name: "typed http2.StreamError wrapped",
err: fmt.Errorf(`Post "https://example/api/2.0/workspace-files/import-file/...": %w`, http2.StreamError{StreamID: 15, Code: http2.ErrCodeNo}),
Expand All @@ -25,6 +39,11 @@ func TestIsStreamResetError(t *testing.T) {
err: errors.New("stream error: stream ID 15; NO_ERROR; received from peer"),
want: true,
},
{
name: "non-413 API error",
err: &apierr.APIError{StatusCode: http.StatusForbidden, Message: "permission denied"},
want: false,
},
{
name: "unrelated error",
err: errors.New("connection refused"),
Expand All @@ -39,7 +58,14 @@ func TestIsStreamResetError(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equal(t, tt.want, isStreamResetError(tt.err))
assert.Equal(t, tt.want, isProxyUploadError(tt.err))
})
}
}

func TestNewHTTP11TransportDisablesHTTP2(t *testing.T) {
tr := newHTTP11Transport(&config.Config{})
assert.False(t, tr.ForceAttemptHTTP2)
assert.NotNil(t, tr.TLSNextProto)
assert.Empty(t, tr.TLSNextProto)
}
12 changes: 12 additions & 0 deletions libs/filer/workspace_files_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,18 @@ func NewWorkspaceFilesClient(w *databricks.WorkspaceClient, root string) (Filer,
}, nil
}

// NewWorkspaceFilesClientWithClient is like [NewWorkspaceFilesClient] but uses the
// provided SDK client instead of building one from w.Config. Callers use it to route
// file operations over a custom HTTP transport (for example, forcing HTTP/1.1)
// without copying or mutating the shared w.Config.
func NewWorkspaceFilesClientWithClient(w *databricks.WorkspaceClient, root string, apiClient *client.DatabricksClient) Filer {
return &WorkspaceFilesClient{
workspaceClient: w,
apiClient: apiClient,
root: NewWorkspaceRootPath(root),
}
}

func (w *WorkspaceFilesClient) Write(ctx context.Context, name string, reader io.Reader, mode ...WriteMode) error {
absPath, err := w.root.Join(name)
if err != nil {
Expand Down
13 changes: 13 additions & 0 deletions libs/filer/workspace_files_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/databricks/cli/libs/testserver"
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/apierr"
"github.com/databricks/databricks-sdk-go/client"
"github.com/databricks/databricks-sdk-go/config"
"github.com/databricks/databricks-sdk-go/service/workspace"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -181,3 +182,15 @@ func TestWorkspaceFilesClientStatNotFound(t *testing.T) {
err := statWithError(t, 404, "RESOURCE_DOES_NOT_EXIST")
assert.ErrorIs(t, err, fs.ErrNotExist)
}

func TestNewWorkspaceFilesClientWithClient(t *testing.T) {
w, err := databricks.NewWorkspaceClient(&databricks.Config{Host: "https://x.test", Token: "x"})
require.NoError(t, err)
apiClient, err := client.New(w.Config)
require.NoError(t, err)

f := NewWorkspaceFilesClientWithClient(w, "/Workspace/Users/me", apiClient)
wfc, ok := f.(*WorkspaceFilesClient)
require.True(t, ok)
assert.Same(t, apiClient, wfc.apiClient)
}
Loading