diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SQLFunction.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SQLFunction.scala index 07ca0a8712485..0c21a1cde1484 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SQLFunction.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SQLFunction.scala @@ -187,6 +187,9 @@ case class SQLFunction( object SQLFunction { + val SCALAR = "SCALAR" + val TABLE = "TABLE" + /** * Persisted frozen PATH for SQL function bodies when created with [[SQLConf.PATH_ENABLED]]. * Serialized as a JSON array of path entries (same format as @@ -253,6 +256,40 @@ object SQLFunction { } } + /** + * Convert an [[ExpressionInfo]] into a SQL function. + */ + def fromExpressionInfo(info: ExpressionInfo, parser: ParserInterface): SQLFunction = { + try { + val props = mapper.readValue(info.getUsage, classOf[Map[String, String]]) + val isTableFunc = props(IS_TABLE_FUNC).toBoolean + val collation = props.get(COLLATION) + val returnType = parseReturnTypeText(props(RETURN_TYPE), isTableFunc, parser, collation) + SQLFunction( + name = FunctionIdentifier(info.getName, Option(info.getDb)), + inputParam = props.get(INPUT_PARAM).map(parseRoutineParam(_, parser, collation)), + returnType = returnType.get, + exprText = props.get(EXPRESSION), + queryText = props.get(QUERY), + comment = props.get(COMMENT), + collation = collation, + deterministic = props.get(DETERMINISTIC).map(_.toBoolean), + containsSQL = props.get(CONTAINS_SQL).map(_.toBoolean), + isTableFunc = isTableFunc, + properties = props.filterNot(_._1.startsWith(SQL_FUNCTION_PREFIX)), + owner = props.get(OWNER), + createTimeMs = props(CREATE_TIME).toLong) + } catch { + case e: Exception => + throw new AnalysisException( + errorClass = "CORRUPTED_CATALOG_FUNCTION", + messageParameters = Map( + "identifier" -> s"${info.getDb}.${info.getName}", + "className" -> s"${info.getClassName}"), cause = Some(e) + ) + } + } + def parseDefault(text: String, parser: ParserInterface): Expression = { parser.parseExpression(text) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 249700ec0d92d..db1c8511b6aea 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -2146,7 +2146,15 @@ class SessionCatalog( if (registry.functionExists(identToRegister) && !overrideIfExists) { throw QueryCompilationErrors.functionAlreadyExistsError(func) } - val info = makeExprInfoForHiveFunction(funcDefinition) + val info = if (funcDefinition.isUserDefinedFunction) { + // For SQL UDFs we need the structured ExpressionInfo (with the serialized + // function metadata as the `usage` blob) so that DESCRIBE FUNCTION can + // later reconstruct the function from the registry cache. The hive-style + // ExpressionInfo has a null usage which would break DESCRIBE. + UserDefinedFunction.fromCatalogFunction(funcDefinition, parser).toExpressionInfo + } else { + makeExprInfoForHiveFunction(funcDefinition) + } registry.registerFunction(identToRegister, info, functionBuilder) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DescribeFunctionCommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DescribeFunctionCommandUtils.scala deleted file mode 100644 index 24b04a9e3faf8..0000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DescribeFunctionCommandUtils.scala +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.command - -import java.util - -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.FunctionIdentifier -import org.apache.spark.sql.catalyst.catalog.{SQLFunction, SqlPathFormat, UserDefinedFunction} -import org.apache.spark.sql.catalyst.expressions.ExpressionInfo - -/** - * Helpers for [[DescribeFunctionCommand]] to retrieve and format - * the frozen SQL PATH stored in SQL function metadata. - */ -private[command] object DescribeFunctionCommandUtils { - - /** - * Returns the frozen SQL PATH persisted for a SQL function, formatted - * for display. Persistent functions: loads [[CatalogFunction]] metadata - * from the catalog. Temporary SQL UDFs (not in catalog): falls back to - * parsing the usage JSON blob produced by [[SQLFunction.toExpressionInfo]]. - */ - private[command] def storedResolutionPathString( - sparkSession: SparkSession, - identifier: FunctionIdentifier, - info: ExpressionInfo): Option[String] = { - val rawJson = try { - val meta = sparkSession.sessionState.catalog - .getFunctionMetadata(identifier) - if (meta.isUserDefinedFunction) { - val udf = UserDefinedFunction.fromCatalogFunction( - meta, - sparkSession.sessionState.sqlParser) - udf.asInstanceOf[SQLFunction].functionStoredResolutionPath - } else { - None - } - } catch { - case _: org.apache.spark.sql.catalyst.analysis - .NoSuchFunctionException | - _: org.apache.spark.sql.catalyst.analysis - .NoSuchDatabaseException => - extractResolutionPathFromSqlUdfUsage(info.getUsage) - } - rawJson.flatMap(formatStoredPath) - } - - private def formatStoredPath(pathStr: String): Option[String] = { - SqlPathFormat.toDescribeJson(pathStr) - .flatMap(SqlPathFormat.formatForDisplay) - } - - /** - * For temporary SQL UDFs not in the catalog, the resolution path may - * be embedded in the ExpressionInfo usage JSON blob. Returns None if - * the usage string is not JSON or does not contain the path key. - */ - private def extractResolutionPathFromSqlUdfUsage( - usage: String): Option[String] = { - if (usage == null || usage.isEmpty) return None - try { - val map = UserDefinedFunction.mapper.readValue( - usage, classOf[util.HashMap[String, String]]) - Option(map.get(SQLFunction.FUNCTION_RESOLUTION_PATH)) - .filter(_.nonEmpty) - } catch { - case e: com.fasterxml.jackson.core.JsonProcessingException => - throw new org.apache.spark.SparkException( - s"Corrupted SQL UDF metadata: expected JSON usage blob " + - s"but failed to parse: ${e.getMessage}", e) - } - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala index ff54b49eed3ae..94de04b94bbb0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala @@ -17,16 +17,20 @@ package org.apache.spark.sql.execution.command +import scala.collection.mutable.ArrayBuffer + import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.analysis.FunctionRegistry -import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, FunctionResource, SQLFunction} +import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, FunctionResource, SQLFunction, SqlPathFormat} import org.apache.spark.sql.catalyst.expressions.{Attribute, ExpressionInfo} +import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes import org.apache.spark.sql.catalyst.util.StringUtils import org.apache.spark.sql.connector.catalog.CatalogManager import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} -import org.apache.spark.sql.types.{StringType, StructField, StructType} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{NullType, StringType, StructField, StructType} /** @@ -101,38 +105,115 @@ case class DescribeFunctionCommand( toAttributes(schema) } - override def run(sparkSession: SparkSession): Seq[Row] = { - val identifier = if (info.getDb != null) { - sparkSession.sessionState.catalog.qualifyIdentifier( - FunctionIdentifier(info.getName, Some(info.getDb))) + private def append(buffer: ArrayBuffer[(String, String)], key: String, value: String): Unit = { + buffer += (key -> value) + } + + /** + * Pad all input strings into the same length using the max string length among all inputs. + */ + private def tabulate(inputs: Seq[String]): Seq[String] = { + val maxLen = inputs.map(_.length).max + inputs.map { input => input.padTo(maxLen, " ").mkString } + } + + private def formatParameters(params: StructType): Seq[String] = { + val names = tabulate(params.map(_.name)) + val dataTypes = tabulate(params.map(_.dataType.sql)) + // Only show parameter comments in extended mode. + val comments = params.map { p => + if (isExtended) p.getComment().map(c => s" '$c'").getOrElse("") else "" + } + val defaults = params.map { p => + if (isExtended) p.getDefault().map(d => s" DEFAULT $d").getOrElse("") else "" + } + names zip dataTypes zip defaults zip comments map { + case (((name, dataType), default), comment) => s"$name $dataType$default$comment" + } + } + + private def describeSQLFunction(info: ExpressionInfo, parser: ParserInterface): Seq[Row] = { + val buffer = new ArrayBuffer[(String, String)] + val f = SQLFunction.fromExpressionInfo(info, parser) + append(buffer, "Function:", f.name.toString) + append(buffer, "Type:", if (f.isTableFunc) SQLFunction.TABLE else SQLFunction.SCALAR) + // Function input + val input = f.inputParam + if (input.nonEmpty) { + val params = formatParameters(input.get) + assert(params.nonEmpty) + append(buffer, "Input:", params.head) + params.tail.foreach(s => append(buffer, "", s)) } else { - FunctionIdentifier(info.getName) + append(buffer, "Input:", "()") } - val name = identifier.unquotedString - val result = if (info.getClassName != null) { - Row(s"Function: $name") :: - Row(s"Class: ${info.getClassName}") :: - Row(s"Usage: ${info.getUsage}") :: Nil + // Function returns + if (f.isTableFunc) { + val returnParams = formatParameters(f.getTableFuncReturnCols) + assert(returnParams.nonEmpty) + append(buffer, "Returns:", returnParams.head) + returnParams.tail.foreach(s => append(buffer, "", s)) } else { - Row(s"Function: $name") :: Row(s"Usage: ${info.getUsage}") :: Nil + f.getScalarFuncReturnType match { + case _: NullType => + case other => append(buffer, "Returns:", other.sql) + } + } + if (isExtended) { + f.comment.foreach(c => append(buffer, "Comment:", c)) + f.collation.foreach(c => append(buffer, "Collation:", c)) + f.deterministic.foreach(d => append(buffer, "Deterministic:", d.toString)) + f.containsSQL.foreach { c => + val dataAccess = if (c) "CONTAINS SQL" else "READS SQL DATA" + append(buffer, "Data Access:", dataAccess) + } + val configs = f.getSQLConfigs + if (configs.nonEmpty) { + val sorted = configs.toSeq.sortBy(_._1).map { case (key, value) => s"$key=$value" } + append(buffer, "Configs:", sorted.head) + sorted.tail.foreach(s => append(buffer, "", s)) + } + f.owner.foreach(o => append(buffer, "Owner:", o)) + append(buffer, "Create Time:", new java.util.Date(f.createTimeMs).toString) + // Put the function body at the end of the description. + append(buffer, "Body:", f.exprText.orElse(f.queryText).get) + // Show the frozen SQL PATH if one was persisted at function creation time. + if (SQLConf.get.pathEnabled) { + f.functionStoredResolutionPath + .flatMap(SqlPathFormat.toDescribeJson) + .flatMap(SqlPathFormat.formatForDisplay) + .foreach(p => append(buffer, "SQL Path:", p)) + } } + val keys = tabulate(buffer.map(_._1).toSeq) + val values = buffer.map(_._2) + keys.zip(values).map { case (key, value) => Row(s"$key $value") } + } - val sqlPathRows = - if (isExtended && - sparkSession.sessionState.conf.pathEnabled && - SQLFunction.isSQLFunction(info.getClassName)) { - DescribeFunctionCommandUtils - .storedResolutionPathString(sparkSession, identifier, info) - .map(s => Seq(Row(s"SQL Path: $s"))) - .getOrElse(Nil) + override def run(sparkSession: SparkSession): Seq[Row] = { + if (SQLFunction.isSQLFunction(info.getClassName)) { + describeSQLFunction(info, sparkSession.sessionState.sqlParser) + } else { + val identifier = if (info.getDb != null) { + sparkSession.sessionState.catalog.qualifyIdentifier( + FunctionIdentifier(info.getName, Some(info.getDb))) } else { - Nil + FunctionIdentifier(info.getName) + } + val name = identifier.unquotedString + val result = if (info.getClassName != null) { + Row(s"Function: $name") :: + Row(s"Class: ${info.getClassName}") :: + Row(s"Usage: ${info.getUsage}") :: Nil + } else { + Row(s"Function: $name") :: Row(s"Usage: ${info.getUsage}") :: Nil } - if (isExtended) { - (result ++ sqlPathRows) :+ Row(s"Extended Usage:${info.getExtended}") - } else { - result + if (isExtended) { + result :+ Row(s"Extended Usage:${info.getExtended}") + } else { + result + } } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLFunctionSuite.scala index 9a3af9e1b4324..2a1fc2215d627 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLFunctionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLFunctionSuite.scala @@ -113,6 +113,107 @@ class SQLFunctionSuite extends SharedSparkSession { } } + test("describe SQL scalar functions") { + withUserDefinedFunction("foo" -> true, "bar" -> true, "area" -> false) { + // Temporary function + sql( + """ + |CREATE TEMPORARY FUNCTION foo() RETURNS int + |COMMENT 'function foo' RETURN 1 + |""".stripMargin) + checkKeywordsExist(sql("describe function foo"), + "Function:", "foo", + "Type:", "SCALAR", + "Input:", "()", + "Returns:", "INT") + checkKeywordsExist(sql("describe function extended foo"), + "Deterministic: true", + "Data Access:", "CONTAINS SQL", + "Comment:", "function foo", + "Create Time:", + "Body:", "1") + sql( + """ + |CREATE TEMPORARY FUNCTION bar(x int default 8, + |y int default substr('8hello', 1, 1) comment 'var_y') + |RETURNS int COMMENT 'function bar' RETURN x + y + |""".stripMargin) + checkKeywordsExist(sql("describe function bar"), + "Function:", "bar", + "Input:", "x INT", "y INT", + "Returns:", "INT") + checkKeywordsExist(sql("describe function extended bar"), + "Input:", "x INT DEFAULT 8", "y INT DEFAULT substr('8hello', 1, 1) 'var_y'", + "Comment:", "function bar", + "Deterministic: true", + "Data Access:", "CONTAINS SQL", + "Body:", "x + y") + // Permanent function + sql( + """ + |CREATE FUNCTION area(width double comment 'width', height double comment 'height') + |RETURNS double + |COMMENT 'compute area' + |DETERMINISTIC + |RETURN width * height + |""".stripMargin) + checkKeywordsExist(sql("describe function area"), + "Function:", "default.area", + "Type:", "SCALAR", + "Input:", "width DOUBLE", "height DOUBLE", + "Returns:", "DOUBLE") + checkKeywordsExist(sql("describe function extended area"), + "Input:", "width DOUBLE 'width'", "height DOUBLE 'height'", + "Comment:", "compute area", + "Deterministic: true", + "Data Access:", "CONTAINS SQL", + "Create Time:", + "Body:", "width * height") + } + } + + test("describe SQL table functions") { + withUserDefinedFunction("foo" -> false) { + sql( + """ + |CREATE FUNCTION foo(x INT) RETURNS TABLE (a INT, b STRING) + |COMMENT 'table function foo' RETURN SELECT x, x + |""".stripMargin) + checkKeywordsExist(sql("describe function foo"), + "Function:", "foo", + "Type:", "TABLE", + "Input:", "x INT", + "Returns:", "a INT", "b STRING") + checkKeywordsExist(sql("describe function extended foo"), + "Comment:", "table function foo", + "Deterministic: true", + "Data Access:", "CONTAINS SQL", + "Create Time:", + "Body:", "SELECT x, x") + } + } + + test("describe SQL functions with derived routine characteristics") { + withUserDefinedFunction("foo" -> false, "bar" -> false, "baz" -> false) { + withTable("tbl_for_describe") { + sql("CREATE TABLE tbl_for_describe AS SELECT 1 AS x") + sql("CREATE FUNCTION foo() RETURNS TABLE(x INT) RETURN SELECT * FROM tbl_for_describe") + sql("CREATE FUNCTION bar() RETURNS DOUBLE RETURN SELECT SUM(x) + rand() FROM foo()") + sql("CREATE FUNCTION baz() RETURNS INT NOT DETERMINISTIC READS SQL DATA RETURN 1") + checkKeywordsExist(sql("DESCRIBE FUNCTION EXTENDED foo"), + "Deterministic: true", + "Data Access:", "READS SQL DATA") + checkKeywordsExist(sql("DESCRIBE FUNCTION EXTENDED bar"), + "Deterministic: false", + "Data Access:", "READS SQL DATA") + // Do not overwrite user specified routine characteristics. + checkKeywordsExist(sql("DESCRIBE FUNCTION EXTENDED baz"), + "Deterministic: false", + "Data Access:", "READS SQL DATA") + } + } + } + test("SPARK-56639: SQL function uses frozen SQL path") { withSQLConf(SQLConf.PATH_ENABLED.key -> "true") { withDatabase("path_func_db_a", "path_func_db_b") { @@ -135,6 +236,14 @@ class SQLFunctionSuite extends SharedSparkSession { checkAnswer(sql("SELECT MAX(id) FROM frozen_t"), Row(20)) checkAnswer(sql("SELECT default.frozen_fn()"), Row(10)) + // DESCRIBE FUNCTION EXTENDED renders the frozen creator path, + // not the invoker's current PATH. + checkKeywordsExist(sql("DESCRIBE FUNCTION EXTENDED default.frozen_fn"), + "SQL Path:", + "`spark_catalog`.`path_func_db_a`", + "`system`.`builtin`") + checkKeywordsNotExist(sql("DESCRIBE FUNCTION EXTENDED default.frozen_fn"), + "path_func_db_b") } finally { sql("SET PATH = DEFAULT_PATH") }