Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,9 @@ case class SQLFunction(

object SQLFunction {

val SCALAR = "SCALAR"
val TABLE = "TABLE"

/**
* Persisted frozen PATH for SQL function bodies when created with [[SQLConf.PATH_ENABLED]].
* Serialized as a JSON array of path entries (same format as
Expand Down Expand Up @@ -253,6 +256,40 @@ object SQLFunction {
}
}

/**
 * Reconstruct a SQL function from the metadata serialized into an [[ExpressionInfo]].
 *
 * The function registry stores SQL UDF metadata as a JSON map of properties in the
 * `usage` field of the [[ExpressionInfo]]; this method reverses that encoding.
 *
 * @param info   registry entry whose `usage` carries the serialized function properties
 * @param parser parser used to re-parse the input parameters and return type text
 * @throws AnalysisException with error class CORRUPTED_CATALOG_FUNCTION when the
 *                           serialized metadata cannot be decoded
 */
def fromExpressionInfo(info: ExpressionInfo, parser: ParserInterface): SQLFunction = {
  try {
    val properties = mapper.readValue(info.getUsage, classOf[Map[String, String]])
    val tableFunc = properties(IS_TABLE_FUNC).toBoolean
    val collationOpt = properties.get(COLLATION)
    val parsedReturnType =
      parseReturnTypeText(properties(RETURN_TYPE), tableFunc, parser, collationOpt)
    SQLFunction(
      name = FunctionIdentifier(info.getName, Option(info.getDb)),
      inputParam = properties.get(INPUT_PARAM).map(parseRoutineParam(_, parser, collationOpt)),
      returnType = parsedReturnType.get,
      exprText = properties.get(EXPRESSION),
      queryText = properties.get(QUERY),
      comment = properties.get(COMMENT),
      collation = collationOpt,
      deterministic = properties.get(DETERMINISTIC).map(_.toBoolean),
      containsSQL = properties.get(CONTAINS_SQL).map(_.toBoolean),
      isTableFunc = tableFunc,
      // Internal bookkeeping keys are stripped; everything else survives round-trip.
      properties = properties.filterNot { case (key, _) => key.startsWith(SQL_FUNCTION_PREFIX) },
      owner = properties.get(OWNER),
      createTimeMs = properties(CREATE_TIME).toLong)
  } catch {
    // Any decoding failure means the registry/catalog entry is corrupted; surface it
    // as a single analysis error with the original cause attached.
    case e: Exception =>
      throw new AnalysisException(
        errorClass = "CORRUPTED_CATALOG_FUNCTION",
        messageParameters = Map(
          "identifier" -> s"${info.getDb}.${info.getName}",
          "className" -> s"${info.getClassName}"),
        cause = Some(e))
  }
}

/** Parse a DEFAULT expression text into a catalyst expression. */
def parseDefault(text: String, parser: ParserInterface): Expression =
  parser.parseExpression(text)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2146,7 +2146,15 @@ class SessionCatalog(
if (registry.functionExists(identToRegister) && !overrideIfExists) {
throw QueryCompilationErrors.functionAlreadyExistsError(func)
}
val info = makeExprInfoForHiveFunction(funcDefinition)
val info = if (funcDefinition.isUserDefinedFunction) {
// For SQL UDFs we need the structured ExpressionInfo (with the serialized
// function metadata as the `usage` blob) so that DESCRIBE FUNCTION can
// later reconstruct the function from the registry cache. The hive-style
// ExpressionInfo has a null usage which would break DESCRIBE.
UserDefinedFunction.fromCatalogFunction(funcDefinition, parser).toExpressionInfo
} else {
makeExprInfoForHiveFunction(funcDefinition)
}
registry.registerFunction(identToRegister, info, functionBuilder)
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,20 @@

package org.apache.spark.sql.execution.command

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, FunctionResource, SQLFunction}
import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, FunctionResource, SQLFunction, SqlPathFormat}
import org.apache.spark.sql.catalyst.expressions.{Attribute, ExpressionInfo}
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
import org.apache.spark.sql.catalyst.util.StringUtils
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{NullType, StringType, StructField, StructType}


/**
Expand Down Expand Up @@ -101,38 +105,115 @@ case class DescribeFunctionCommand(
toAttributes(schema)
}

override def run(sparkSession: SparkSession): Seq[Row] = {
val identifier = if (info.getDb != null) {
sparkSession.sessionState.catalog.qualifyIdentifier(
FunctionIdentifier(info.getName, Some(info.getDb)))
/** Append one (label, value) row to the DESCRIBE output buffer. */
private def append(buffer: ArrayBuffer[(String, String)], key: String, value: String): Unit = {
  buffer.append(key -> value)
}

/**
 * Right-pad every input string with spaces to the length of the longest input so that
 * the values line up in columns. Returns an empty Seq for empty input (the previous
 * implementation called `max` on an empty sequence, which throws).
 */
private def tabulate(inputs: Seq[String]): Seq[String] = {
  if (inputs.isEmpty) {
    Seq.empty
  } else {
    val width = inputs.map(_.length).max
    // Pad with the Char ' ' directly: padding with the String " " widened each value
    // to a Seq[Any] that had to be re-joined with mkString for the same result.
    inputs.map(_.padTo(width, ' '))
  }
}

/**
 * Render each routine parameter as `name type[ DEFAULT ...][ 'comment']`, padding the
 * name and type columns so they align vertically. DEFAULT clauses and parameter
 * comments are only included in extended mode.
 */
private def formatParameters(params: StructType): Seq[String] = {
  val paddedNames = tabulate(params.map(_.name))
  val paddedTypes = tabulate(params.map(_.dataType.sql))
  params.indices.map { i =>
    val field = params(i)
    val default =
      if (isExtended) field.getDefault().map(d => s" DEFAULT $d").getOrElse("") else ""
    val comment =
      if (isExtended) field.getComment().map(c => s" '$c'").getOrElse("") else ""
    s"${paddedNames(i)} ${paddedTypes(i)}$default$comment"
  }
}

/**
 * Build the DESCRIBE FUNCTION output for a SQL UDF. The function definition is
 * reconstructed from the serialized metadata in the [[ExpressionInfo]] and rendered
 * as aligned `key value` rows; extended mode adds comment, collation, determinism,
 * data access, configs, owner, create time, body, and (when enabled) the frozen
 * SQL path persisted at creation time.
 *
 * NOTE(review): the previous text contained leftover lines of the replaced
 * implementation interleaved by the diff view (undefined `identifier`/`name`
 * references, duplicate Row construction), which did not compile; this is the
 * reconstructed method.
 */
private def describeSQLFunction(info: ExpressionInfo, parser: ParserInterface): Seq[Row] = {
  val buffer = new ArrayBuffer[(String, String)]
  val f = SQLFunction.fromExpressionInfo(info, parser)
  append(buffer, "Function:", f.name.toString)
  append(buffer, "Type:", if (f.isTableFunc) SQLFunction.TABLE else SQLFunction.SCALAR)
  // Function input
  val input = f.inputParam
  if (input.nonEmpty) {
    val params = formatParameters(input.get)
    assert(params.nonEmpty)
    append(buffer, "Input:", params.head)
    // Continuation rows carry an empty key so the values stay aligned under "Input:".
    params.tail.foreach(s => append(buffer, "", s))
  } else {
    append(buffer, "Input:", "()")
  }
  // Function returns
  if (f.isTableFunc) {
    val returnParams = formatParameters(f.getTableFuncReturnCols)
    assert(returnParams.nonEmpty)
    append(buffer, "Returns:", returnParams.head)
    returnParams.tail.foreach(s => append(buffer, "", s))
  } else {
    f.getScalarFuncReturnType match {
      // A NullType return is omitted from the description.
      case _: NullType =>
      case other => append(buffer, "Returns:", other.sql)
    }
  }
  if (isExtended) {
    f.comment.foreach(c => append(buffer, "Comment:", c))
    f.collation.foreach(c => append(buffer, "Collation:", c))
    f.deterministic.foreach(d => append(buffer, "Deterministic:", d.toString))
    f.containsSQL.foreach { c =>
      val dataAccess = if (c) "CONTAINS SQL" else "READS SQL DATA"
      append(buffer, "Data Access:", dataAccess)
    }
    val configs = f.getSQLConfigs
    if (configs.nonEmpty) {
      val sorted = configs.toSeq.sortBy(_._1).map { case (key, value) => s"$key=$value" }
      append(buffer, "Configs:", sorted.head)
      sorted.tail.foreach(s => append(buffer, "", s))
    }
    f.owner.foreach(o => append(buffer, "Owner:", o))
    append(buffer, "Create Time:", new java.util.Date(f.createTimeMs).toString)
    // Put the function body at the end of the description.
    append(buffer, "Body:", f.exprText.orElse(f.queryText).get)
    // Show the frozen SQL PATH if one was persisted at function creation time.
    if (SQLConf.get.pathEnabled) {
      f.functionStoredResolutionPath
        .flatMap(SqlPathFormat.toDescribeJson)
        .flatMap(SqlPathFormat.formatForDisplay)
        .foreach(p => append(buffer, "SQL Path:", p))
    }
  }
  // Pad the keys so every value starts at the same column.
  val keys = tabulate(buffer.map(_._1).toSeq)
  val values = buffer.map(_._2)
  keys.zip(values).map { case (key, value) => Row(s"$key $value") }
}

/**
 * Execute DESCRIBE FUNCTION. SQL UDFs are rendered from their serialized metadata via
 * [[describeSQLFunction]]; built-in and hive-style functions keep the legacy
 * Function/Class/Usage row format.
 *
 * NOTE(review): the previous text interleaved deleted lines of the old implementation
 * (the `sqlPathRows` block, a stray `Nil`, and the old extended-usage branch) with the
 * new one, which did not compile; this is the reconstructed method.
 */
override def run(sparkSession: SparkSession): Seq[Row] = {
  if (SQLFunction.isSQLFunction(info.getClassName)) {
    describeSQLFunction(info, sparkSession.sessionState.sqlParser)
  } else {
    // Qualify with the database when known so the printed name is unambiguous.
    val identifier = if (info.getDb != null) {
      sparkSession.sessionState.catalog.qualifyIdentifier(
        FunctionIdentifier(info.getName, Some(info.getDb)))
    } else {
      FunctionIdentifier(info.getName)
    }
    val name = identifier.unquotedString
    val result = if (info.getClassName != null) {
      Row(s"Function: $name") ::
        Row(s"Class: ${info.getClassName}") ::
        Row(s"Usage: ${info.getUsage}") :: Nil
    } else {
      Row(s"Function: $name") :: Row(s"Usage: ${info.getUsage}") :: Nil
    }
    if (isExtended) {
      result :+ Row(s"Extended Usage:${info.getExtended}")
    } else {
      result
    }
  }
}
}
Expand Down
Loading