Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,39 @@ To disable Cloud Fetch (e.g., when handling smaller datasets or to avoid additio
token:[your token]@[Workspace hostname]:[Port number][Endpoint HTTP Path]?useCloudFetch=false
```

### Telemetry Configuration (Optional)

The driver includes optional telemetry to help improve performance and reliability. Telemetry is **disabled by default** and requires explicit opt-in.

**Opt in to telemetry** (respects server-side feature flags):
```
token:[your token]@[Workspace hostname]:[Port number][Endpoint HTTP Path]?enableTelemetry=true
```

**Opt out of telemetry** (explicitly disable):
```
token:[your token]@[Workspace hostname]:[Port number][Endpoint HTTP Path]?enableTelemetry=false
```

**Advanced configuration** (for testing/debugging):
```
token:[your token]@[Workspace hostname]:[Port number][Endpoint HTTP Path]?forceEnableTelemetry=true
```

**What data is collected:**
- ✅ Query latency and performance metrics
- ✅ Error codes (not error messages)
- ✅ Feature usage (CloudFetch, LZ4, etc.)
- ✅ Driver version and environment info

**What is NOT collected:**
- ❌ SQL query text
- ❌ Query results or data values
- ❌ Table/column names
- ❌ User identities or credentials

Telemetry adds less than 1% performance overhead and is guarded by a circuit breaker to ensure it never impacts your queries. For more details, see `telemetry/DESIGN.md` and `telemetry/TROUBLESHOOTING.md`.

### Connecting with a new Connector

You can also connect with a new connector object. For example:
Expand Down
80 changes: 59 additions & 21 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
dbsqlerr "github.com/databricks/databricks-sql-go/errors"
"github.com/databricks/databricks-sql-go/internal/cli_service"
"github.com/databricks/databricks-sql-go/internal/client"
context2 "github.com/databricks/databricks-sql-go/internal/compat/context"
"github.com/databricks/databricks-sql-go/internal/config"
dbsqlerrint "github.com/databricks/databricks-sql-go/internal/errors"
"github.com/databricks/databricks-sql-go/internal/rows"
Expand Down Expand Up @@ -53,18 +52,21 @@ func (c *conn) Close() error {
ctx := driverctx.NewContextWithConnId(context.Background(), c.id)

// Close telemetry and release resources
closeStart := time.Now()
_, err := c.client.CloseSession(ctx, &cli_service.TCloseSessionReq{
SessionHandle: c.session.SessionHandle,
})
closeLatencyMs := time.Since(closeStart).Milliseconds()

if c.telemetry != nil {
c.telemetry.RecordOperation(ctx, c.id, telemetry.OperationTypeDeleteSession, closeLatencyMs)
_ = c.telemetry.Close(ctx)
telemetry.ReleaseForConnection(c.cfg.Host)
}

_, err := c.client.CloseSession(ctx, &cli_service.TCloseSessionReq{
SessionHandle: c.session.SessionHandle,
})

if err != nil {
log.Err(err).Msg("databricks: failed to close connection")
return dbsqlerrint.NewBadConnectionError(err)
return dbsqlerrint.NewRequestError(ctx, dbsqlerr.ErrCloseConnection, err)
}
return nil
}
Expand Down Expand Up @@ -123,15 +125,16 @@ func (c *conn) ExecContext(ctx context.Context, query string, args []driver.Name

corrId := driverctx.CorrelationIdFromContext(ctx)

exStmtResp, opStatusResp, err := c.runQuery(ctx, query, args)
var pollCount int
exStmtResp, opStatusResp, err := c.runQuery(ctx, query, args, &pollCount)
log, ctx = client.LoggerAndContext(ctx, exStmtResp)
stagingErr := c.execStagingOperation(exStmtResp, ctx)

// Telemetry: track statement execution
var statementID string
if c.telemetry != nil && exStmtResp != nil && exStmtResp.OperationHandle != nil && exStmtResp.OperationHandle.OperationId != nil {
statementID = client.SprintGuid(exStmtResp.OperationHandle.OperationId.GUID)
ctx = c.telemetry.BeforeExecute(ctx, statementID)
ctx = c.telemetry.BeforeExecute(ctx, c.id, statementID)
defer func() {
finalErr := err
if stagingErr != nil {
Expand All @@ -140,6 +143,7 @@ func (c *conn) ExecContext(ctx context.Context, query string, args []driver.Name
c.telemetry.AfterExecute(ctx, finalErr)
c.telemetry.CompleteStatement(ctx, statementID, finalErr != nil)
}()
c.telemetry.AddTag(ctx, "poll_count", pollCount)
}

if exStmtResp != nil && exStmtResp.OperationHandle != nil {
Expand Down Expand Up @@ -181,34 +185,61 @@ func (c *conn) QueryContext(ctx context.Context, query string, args []driver.Nam
log, _ := client.LoggerAndContext(ctx, nil)
msg, start := log.Track("QueryContext")

// first we try to get the results synchronously.
// at any point in time that the context is done we must cancel and return
exStmtResp, opStatusResp, err := c.runQuery(ctx, query, args)
// Capture execution start time for telemetry before running the query
executeStart := time.Now()
var pollCount int
exStmtResp, opStatusResp, pollCount, err := c.runQueryWithTelemetry(ctx, query, args, &pollCount)
log, ctx = client.LoggerAndContext(ctx, exStmtResp)
defer log.Duration(msg, start)

// Telemetry: track statement execution
var statementID string
if c.telemetry != nil && exStmtResp != nil && exStmtResp.OperationHandle != nil && exStmtResp.OperationHandle.OperationId != nil {
statementID = client.SprintGuid(exStmtResp.OperationHandle.OperationId.GUID)
ctx = c.telemetry.BeforeExecute(ctx, statementID)
// Use BeforeExecuteWithTime to set the correct start time (before execution)
ctx = c.telemetry.BeforeExecuteWithTime(ctx, c.id, statementID, executeStart)
defer func() {
c.telemetry.AfterExecute(ctx, err)
c.telemetry.CompleteStatement(ctx, statementID, err != nil)
}()

c.telemetry.AddTag(ctx, "poll_count", pollCount)
c.telemetry.AddTag(ctx, "operation_type", telemetry.OperationTypeExecuteStatement)

if exStmtResp.DirectResults != nil && exStmtResp.DirectResults.ResultSetMetadata != nil {
resultFormat := exStmtResp.DirectResults.ResultSetMetadata.GetResultFormat()
c.telemetry.AddTag(ctx, "result.format", resultFormat.String())
}
}

if err != nil {
log.Err(err).Msg("databricks: failed to run query") // To log query we need to redact credentials
return nil, dbsqlerrint.NewExecutionError(ctx, dbsqlerr.ErrQueryExecution, err, opStatusResp)
}

rows, err := rows.NewRows(ctx, exStmtResp.OperationHandle, c.client, c.cfg, exStmtResp.DirectResults)
var telemetryUpdate func(int, int64)
if c.telemetry != nil {
telemetryUpdate = func(chunkCount int, bytesDownloaded int64) {
c.telemetry.AddTag(ctx, "chunk_count", chunkCount)
c.telemetry.AddTag(ctx, "bytes_downloaded", bytesDownloaded)
}
}

rows, err := rows.NewRows(ctx, exStmtResp.OperationHandle, c.client, c.cfg, exStmtResp.DirectResults, ctx, telemetryUpdate)

return rows, err

}

func (c *conn) runQuery(ctx context.Context, query string, args []driver.NamedValue) (*cli_service.TExecuteStatementResp, *cli_service.TGetOperationStatusResp, error) {
// runQueryWithTelemetry delegates to runQuery and, in addition to runQuery's
// results, returns the integer that pollCount points to once runQuery has
// returned. When pollCount is nil the returned count is 0.
func (c *conn) runQueryWithTelemetry(ctx context.Context, query string, args []driver.NamedValue, pollCount *int) (*cli_service.TExecuteStatementResp, *cli_service.TGetOperationStatusResp, int, error) {
	execResp, statusResp, runErr := c.runQuery(ctx, query, args, pollCount)

	// A nil counter means the caller did not ask for poll tracking.
	if pollCount == nil {
		return execResp, statusResp, 0, runErr
	}
	return execResp, statusResp, *pollCount, runErr
}

func (c *conn) runQuery(ctx context.Context, query string, args []driver.NamedValue, pollCount *int) (*cli_service.TExecuteStatementResp, *cli_service.TGetOperationStatusResp, error) {
// first we try to get the results synchronously.
// at any point in time that the context is done we must cancel and return
exStmtResp, err := c.executeStatement(ctx, query, args)
Expand Down Expand Up @@ -240,7 +271,7 @@ func (c *conn) runQuery(ctx context.Context, query string, args []driver.NamedVa
case cli_service.TOperationState_INITIALIZED_STATE,
cli_service.TOperationState_PENDING_STATE,
cli_service.TOperationState_RUNNING_STATE:
statusResp, err := c.pollOperation(ctx, opHandle)
statusResp, err := c.pollOperationWithCount(ctx, opHandle, pollCount)
if err != nil {
return exStmtResp, statusResp, err
}
Expand Down Expand Up @@ -268,7 +299,7 @@ func (c *conn) runQuery(ctx context.Context, query string, args []driver.NamedVa
}

} else {
statusResp, err := c.pollOperation(ctx, opHandle)
statusResp, err := c.pollOperationWithCount(ctx, opHandle, pollCount)
if err != nil {
return exStmtResp, statusResp, err
}
Expand Down Expand Up @@ -372,7 +403,6 @@ func (c *conn) executeStatement(ctx context.Context, query string, args []driver

select {
default:
// Non-blocking check: continue if context not done
case <-ctx.Done():
newCtx := driverctx.NewContextFromBackground(ctx)
// in case context is done, we need to cancel the operation if necessary
Expand All @@ -396,12 +426,12 @@ func (c *conn) executeStatement(ctx context.Context, query string, args []driver
return resp, err
}

func (c *conn) pollOperation(ctx context.Context, opHandle *cli_service.TOperationHandle) (*cli_service.TGetOperationStatusResp, error) {
func (c *conn) pollOperationWithCount(ctx context.Context, opHandle *cli_service.TOperationHandle, pollCount *int) (*cli_service.TGetOperationStatusResp, error) {
corrId := driverctx.CorrelationIdFromContext(ctx)
log := logger.WithContext(c.id, corrId, client.SprintGuid(opHandle.OperationId.GUID))
var statusResp *cli_service.TGetOperationStatusResp
ctx = driverctx.NewContextWithConnId(ctx, c.id)
newCtx := context2.WithoutCancel(ctx)
newCtx := driverctx.NewContextWithCorrelationId(driverctx.NewContextWithConnId(context.Background(), c.id), corrId)
pollSentinel := sentinel.Sentinel{
OnDoneFn: func(statusResp any) (any, error) {
return statusResp, nil
Expand All @@ -413,6 +443,10 @@ func (c *conn) pollOperation(ctx context.Context, opHandle *cli_service.TOperati
OperationHandle: opHandle,
})

if pollCount != nil {
*pollCount++
}

if statusResp != nil && statusResp.OperationState != nil {
log.Debug().Msgf("databricks: status %s", statusResp.GetOperationState().String())
}
Expand Down Expand Up @@ -455,6 +489,10 @@ func (c *conn) pollOperation(ctx context.Context, opHandle *cli_service.TOperati
return statusResp, nil
}

// pollOperation is the counter-less form of pollOperationWithCount: it
// forwards ctx and opHandle unchanged and passes a nil poll counter.
func (c *conn) pollOperation(ctx context.Context, opHandle *cli_service.TOperationHandle) (*cli_service.TGetOperationStatusResp, error) {
	var noCounter *int // nil: no poll counter supplied
	return c.pollOperationWithCount(ctx, opHandle, noCounter)
}

func (c *conn) CheckNamedValue(nv *driver.NamedValue) error {
var err error
if parameter, ok := nv.Value.(Parameter); ok {
Expand Down Expand Up @@ -622,7 +660,7 @@ func (c *conn) execStagingOperation(
}

if len(driverctx.StagingPathsFromContext(ctx)) != 0 {
row, err = rows.NewRows(ctx, exStmtResp.OperationHandle, c.client, c.cfg, exStmtResp.DirectResults)
row, err = rows.NewRows(ctx, exStmtResp.OperationHandle, c.client, c.cfg, exStmtResp.DirectResults, nil, nil)
if err != nil {
return dbsqlerrint.NewDriverError(ctx, "error reading row.", err)
}
Expand Down
16 changes: 8 additions & 8 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -833,7 +833,7 @@ func TestConn_runQuery(t *testing.T) {
client: testClient,
cfg: config.WithDefaults(),
}
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{})
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{}, nil)
assert.Error(t, err)
assert.Nil(t, exStmtResp)
assert.Nil(t, opStatusResp)
Expand Down Expand Up @@ -875,7 +875,7 @@ func TestConn_runQuery(t *testing.T) {
client: testClient,
cfg: config.WithDefaults(),
}
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{})
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{}, nil)

assert.Error(t, err)
assert.Equal(t, 1, executeStatementCount)
Expand Down Expand Up @@ -921,7 +921,7 @@ func TestConn_runQuery(t *testing.T) {
client: testClient,
cfg: config.WithDefaults(),
}
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{})
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{}, nil)

assert.NoError(t, err)
assert.Equal(t, 1, executeStatementCount)
Expand Down Expand Up @@ -968,7 +968,7 @@ func TestConn_runQuery(t *testing.T) {
client: testClient,
cfg: config.WithDefaults(),
}
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{})
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{}, nil)

assert.Error(t, err)
assert.Equal(t, 1, executeStatementCount)
Expand Down Expand Up @@ -1021,7 +1021,7 @@ func TestConn_runQuery(t *testing.T) {
client: testClient,
cfg: config.WithDefaults(),
}
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{})
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{}, nil)

assert.NoError(t, err)
assert.Equal(t, 1, executeStatementCount)
Expand Down Expand Up @@ -1073,7 +1073,7 @@ func TestConn_runQuery(t *testing.T) {
client: testClient,
cfg: config.WithDefaults(),
}
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{})
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{}, nil)

assert.Error(t, err)
assert.Equal(t, 1, executeStatementCount)
Expand Down Expand Up @@ -1126,7 +1126,7 @@ func TestConn_runQuery(t *testing.T) {
client: testClient,
cfg: config.WithDefaults(),
}
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{})
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{}, nil)

assert.NoError(t, err)
assert.Equal(t, 1, executeStatementCount)
Expand Down Expand Up @@ -1179,7 +1179,7 @@ func TestConn_runQuery(t *testing.T) {
client: testClient,
cfg: config.WithDefaults(),
}
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{})
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{}, nil)

assert.Error(t, err)
assert.Equal(t, 1, executeStatementCount)
Expand Down
30 changes: 16 additions & 14 deletions connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ func (c *connector) Connect(ctx context.Context) (driver.Conn, error) {
}

protocolVersion := int64(c.cfg.ThriftProtocolVersion)

sessionStart := time.Now()
session, err := tclient.OpenSession(ctx, &cli_service.TOpenSessionReq{
ClientProtocolI64: &protocolVersion,
Configuration: sessionParams,
Expand All @@ -64,6 +66,8 @@ func (c *connector) Connect(ctx context.Context) (driver.Conn, error) {
},
CanUseMultipleCatalogs: &c.cfg.CanUseMultipleCatalogs,
})
sessionLatencyMs := time.Since(sessionStart).Milliseconds()

if err != nil {
return nil, dbsqlerrint.NewRequestError(ctx, fmt.Sprintf("error connecting: host=%s port=%d, httpPath=%s", c.cfg.Host, c.cfg.Port, c.cfg.HTTPPath), err)
}
Expand All @@ -76,21 +80,19 @@ func (c *connector) Connect(ctx context.Context) (driver.Conn, error) {
}
log := logger.WithContext(conn.id, driverctx.CorrelationIdFromContext(ctx), "")

// Initialize telemetry: pass user opt-in flag; if unset, feature flags decide
var enableTelemetry *bool
// Initialize telemetry if configured
if c.cfg.EnableTelemetry {
trueVal := true
enableTelemetry = &trueVal
}

conn.telemetry = telemetry.InitializeForConnection(
ctx,
c.cfg.Host,
c.client,
enableTelemetry,
)
if conn.telemetry != nil {
log.Debug().Msg("telemetry initialized for connection")
conn.telemetry = telemetry.InitializeForConnection(
ctx,
c.cfg.Host,
c.cfg.DriverVersion,
c.client,
c.cfg.EnableTelemetry,
)
if conn.telemetry != nil {
log.Debug().Msg("telemetry initialized for connection")
conn.telemetry.RecordOperation(ctx, conn.id, telemetry.OperationTypeCreateSession, sessionLatencyMs)
}
}

log.Info().Msgf("connect: host=%s port=%d httpPath=%s serverProtocolVersion=0x%X", c.cfg.Host, c.cfg.Port, c.cfg.HTTPPath, session.ServerProtocolVersion)
Expand Down
3 changes: 3 additions & 0 deletions connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func TestNewConnector(t *testing.T) {
RetryWaitMax: 60 * time.Second,
Transport: roundTripper,
CloudFetchConfig: expectedCloudFetchConfig,
EnableTelemetry: true,
}
expectedCfg := config.WithDefaults()
expectedCfg.DriverVersion = DriverVersion
Expand Down Expand Up @@ -110,6 +111,7 @@ func TestNewConnector(t *testing.T) {
RetryWaitMin: 1 * time.Second,
RetryWaitMax: 30 * time.Second,
CloudFetchConfig: expectedCloudFetchConfig,
EnableTelemetry: true,
}
expectedCfg := config.WithDefaults()
expectedCfg.UserConfig = expectedUserConfig
Expand Down Expand Up @@ -152,6 +154,7 @@ func TestNewConnector(t *testing.T) {
RetryWaitMin: 0,
RetryWaitMax: 0,
CloudFetchConfig: expectedCloudFetchConfig,
EnableTelemetry: true,
}
expectedCfg := config.WithDefaults()
expectedCfg.DriverVersion = DriverVersion
Expand Down
Loading
Loading