diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala index 9f606b698d30c..2a1c8591e4e51 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala @@ -219,6 +219,12 @@ private[connect] class ExecuteThreadRunner(executeHolder: ExecuteHolder) extends s"Spark Connect - ${Utils.abbreviate(debugString, 128)}") session.sparkContext.setInterruptOnCancel(true) + val sessionUser = executeHolder.sessionHolder.userId + if (sessionUser != null && sessionUser.nonEmpty) { + session.sparkContext.setLocalProperty( + SparkConnectService.SPARK_CONNECT_SESSION_USER_KEY, sessionUser) + } + // Add debug information to the query execution so that the jobs are traceable. session.sparkContext.setLocalProperty( "callSite.short", diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala index f2dd9c1b1cef6..d79b36dc8266f 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala @@ -314,6 +314,8 @@ class SparkConnectService(debug: Boolean) extends AsyncService with BindableServ */ object SparkConnectService extends Logging { + private[connect] val SPARK_CONNECT_SESSION_USER_KEY: String = "spark.connect.session.user" + private[connect] var server: Server = _ private[connect] var bindingAddress: InetSocketAddress = _