@@ -129,6 +129,7 @@ public enum LogKeys implements LogKey {
CONFIG5,
CONFIG_DEPRECATION_MESSAGE,
CONFIG_KEY_UPDATED,
CONFIG_MAP_NAME,
CONFIG_VERSION,
CONSUMER,
CONTAINER,
@@ -720,6 +721,7 @@ public enum LogKeys implements LogKey {
SESSION_KEY,
SET_CLIENT_INFO_REQUEST,
SHARD_ID,
SHORTER_CONFIG_MAP_NAME,
SHORTER_SERVICE_NAME,
SHORT_USER_NAME,
SHUFFLE_BLOCK_INFO,
@@ -26,6 +26,7 @@ import io.fabric8.kubernetes.api.model._
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesUtils, SparkPod}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit.KubernetesClientUtils
import org.apache.spark.util.ArrayImplicits._

/**
@@ -53,7 +54,8 @@ private[spark] class HadoopConfDriverFeatureStep(conf: KubernetesConf)
}
}

private def newConfigMapName: String = s"${conf.resourceNamePrefix}-hadoop-config"
private lazy val newConfigMapName: String =
KubernetesClientUtils.configMapName(conf.resourceNamePrefix, "-hadoop-config")

private def hasHadoopConf: Boolean = confDir.isDefined || existingConfMap.isDefined

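One detail worth calling out in this hunk: `newConfigMapName` changes from a `def` to a `lazy val`. That only matters on the new fallback path, where the name embeds a freshly generated unique ID; a `def` would be re-evaluated on every access and could hand the ConfigMap resource and the pod volume different names. A minimal, hypothetical sketch of the difference (`makeUniqueId` is a stand-in for an ID generator such as `KubernetesUtils.uniqueID()`):

```scala
// Hypothetical, self-contained illustration of def vs. lazy val for a name
// that embeds a freshly generated ID.
object LazyNameSketch {
  private var counter = 0
  private def makeUniqueId(): String = { counter += 1; s"id-$counter" }

  def main(args: Array[String]): Unit = {
    // A def re-evaluates its body on every access, so two reads can disagree.
    def nameAsDef: String = s"spark-${makeUniqueId()}-hadoop-config"
    assert(nameAsDef != nameAsDef)

    // A lazy val is computed once and then cached, so the ConfigMap resource
    // and the pod volume that mounts it see the same name.
    lazy val nameAsLazyVal: String = s"spark-${makeUniqueId()}-hadoop-config"
    assert(nameAsLazyVal == nameAsLazyVal)
  }
}
```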
@@ -30,6 +30,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.k8s.{KubernetesDriverConf, KubernetesUtils, SparkPod}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit.KubernetesClientUtils
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
@@ -119,7 +120,8 @@ private[spark] class KerberosConfDriverFeatureStep(kubernetesConf: KubernetesDri

private def hasKerberosConf: Boolean = krb5CMap.isDefined | krb5File.isDefined

private def newConfigMapName: String = s"${kubernetesConf.resourceNamePrefix}-krb5-file"
private lazy val newConfigMapName: String =
KubernetesClientUtils.configMapName(kubernetesConf.resourceNamePrefix, "-krb5-file")

override def configurePod(original: SparkPod): SparkPod = {
original.transform { case pod if hasKerberosConf =>
@@ -25,6 +25,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit.KubernetesClientUtils
import org.apache.spark.util.DependencyUtils.downloadFile
import org.apache.spark.util.Utils

@@ -33,7 +34,8 @@ private[spark] class PodTemplateConfigMapStep(conf: KubernetesConf)

private val hasTemplate = conf.contains(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE)

private val configmapName = s"${conf.resourceNamePrefix}-$POD_TEMPLATE_CONFIGMAP"
private val configmapName =
KubernetesClientUtils.configMapName(conf.resourceNamePrefix, s"-$POD_TEMPLATE_CONFIGMAP")

def configurePod(pod: SparkPod): SparkPod = {
if (hasTemplate) {
@@ -34,7 +34,7 @@ import org.apache.spark.deploy.k8s.{Config, Constants, KubernetesUtils}
import org.apache.spark.deploy.k8s.Config.{KUBERNETES_DNS_SUBDOMAIN_NAME_MAX_LENGTH, KUBERNETES_NAMESPACE}
import org.apache.spark.deploy.k8s.Constants.ENV_SPARK_CONF_DIR
import org.apache.spark.internal.Logging
import org.apache.spark.internal.LogKeys.{CONFIG, PATH, PATHS}
import org.apache.spark.internal.LogKeys.{CONFIG, CONFIG_MAP_NAME, MAX_SIZE, PATH, PATHS, SHORTER_CONFIG_MAP_NAME}
import org.apache.spark.util.ArrayImplicits._

/**
@@ -49,9 +49,26 @@ object KubernetesClientUtils extends Logging {

// Config map name can be KUBERNETES_DNS_SUBDOMAIN_NAME_MAX_LENGTH chars at max.
@Since("3.3.0")
def configMapName(prefix: String): String = {
val suffix = "-conf-map"
s"${prefix.take(KUBERNETES_DNS_SUBDOMAIN_NAME_MAX_LENGTH - suffix.length)}$suffix"
def configMapName(prefix: String): String = configMapName(prefix, "-conf-map")

/**
* Builds a ConfigMap name of the form `<prefix><suffix>`. If the resulting name would exceed
* `KUBERNETES_DNS_SUBDOMAIN_NAME_MAX_LENGTH`, falls back to `spark-<uniqueID><suffix>` so the
* name remains both valid and unique across concurrent applications. Mirrors the fallback
* strategy used by [[org.apache.spark.deploy.k8s.KubernetesConf.driverServiceName]].
*/
@Since("5.0.0")
def configMapName(prefix: String, suffix: String): String = {
val preferred = s"$prefix$suffix"
if (preferred.length <= KUBERNETES_DNS_SUBDOMAIN_NAME_MAX_LENGTH) {
preferred
} else {
val fallback = s"spark-${KubernetesUtils.uniqueID()}$suffix"
logWarning(log"ConfigMap name ${MDC(CONFIG_MAP_NAME, preferred)} is too long " +
log"(must be <= ${MDC(MAX_SIZE, KUBERNETES_DNS_SUBDOMAIN_NAME_MAX_LENGTH)} characters); " +
log"falling back to ${MDC(SHORTER_CONFIG_MAP_NAME, fallback)}.")
fallback
}
}

@Since("3.1.0")
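To make the new naming rule easy to read outside the diff, here is a self-contained sketch of what `configMapName(prefix, suffix)` does; `MaxLen` and `uniqueID()` are stand-ins for `KUBERNETES_DNS_SUBDOMAIN_NAME_MAX_LENGTH` (253 in Spark) and `KubernetesUtils.uniqueID()`, so treat it as an illustration rather than the exact Spark code:

```scala
// Illustration only: MaxLen and uniqueID() are stand-ins for
// KUBERNETES_DNS_SUBDOMAIN_NAME_MAX_LENGTH and KubernetesUtils.uniqueID().
object ConfigMapNameSketch {
  private val MaxLen = 253
  private def uniqueID(): String = java.util.UUID.randomUUID().toString.take(8)

  def configMapName(prefix: String, suffix: String): String = {
    val preferred = s"$prefix$suffix"
    if (preferred.length <= MaxLen) {
      preferred                      // short enough: keep <prefix><suffix>
    } else {
      s"spark-${uniqueID()}$suffix"  // too long: drop the prefix, keep the suffix
    }
  }

  def main(args: Array[String]): Unit = {
    println(configMapName("my-app", "-hadoop-config"))          // my-app-hadoop-config
    println(configMapName("x" * 300, "-hadoop-config"))         // spark-<id>-hadoop-config
    println(configMapName("x" * 300, "-hadoop-config").length)  // well under 253
  }
}
```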
@@ -25,6 +25,7 @@ import io.fabric8.kubernetes.api.model.ConfigMap

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.Config.KUBERNETES_DNS_SUBDOMAIN_NAME_MAX_LENGTH
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.util.{SparkConfWithEnv, Utils}

@@ -60,6 +61,27 @@ class HadoopConfDriverFeatureStepSuite extends SparkFunSuite {
assert(hadoopConfMap.getData().keySet().asScala === confFiles)
}

test("hadoop ConfigMap name stays valid and consistent with very long resourceNamePrefix") {
val confDir = Utils.createTempDir()
Files.writeString(new File(confDir, "core-site.xml").toPath, "some data")

val sparkConf = new SparkConfWithEnv(Map(ENV_HADOOP_CONF_DIR -> confDir.getAbsolutePath()))
val longPrefix = "x" * KUBERNETES_DNS_SUBDOMAIN_NAME_MAX_LENGTH
val conf = KubernetesTestConf.createDriverConf(
sparkConf = sparkConf, resourceNamePrefix = Some(longPrefix))
val step = new HadoopConfDriverFeatureStep(conf)

val hadoopConfMap = filter[ConfigMap](step.getAdditionalKubernetesResources()).head
val name = hadoopConfMap.getMetadata().getName()
assert(name.length <= KUBERNETES_DNS_SUBDOMAIN_NAME_MAX_LENGTH)
// The pod's volume must reference the exact same name as the created ConfigMap;
// otherwise the driver/executor would mount a non-existent ConfigMap.
val pod = step.configurePod(SparkPod.initialPod())
val volume = pod.pod.getSpec().getVolumes().asScala.find(_.getName() == HADOOP_CONF_VOLUME)
assert(volume.isDefined)
assert(volume.get.getConfigMap().getName() === name)
}

private def checkPod(pod: SparkPod): Unit = {
assert(podHasVolume(pod.pod, HADOOP_CONF_VOLUME))
assert(containerHasVolume(pod.container, HADOOP_CONF_VOLUME, HADOOP_CONF_DIR_PATH))
@@ -67,6 +67,25 @@ class KerberosConfDriverFeatureStepSuite extends SparkFunSuite {
assert(step.getAdditionalPodSystemProperties().isEmpty)
}

test("krb5.conf ConfigMap name stays valid and consistent with very long resourceNamePrefix") {
val krbConf = File.createTempFile("krb5", ".conf", tmpDir)
Files.writeString(krbConf.toPath, "some data")

val sparkConf = new SparkConf(false)
.set(KUBERNETES_KERBEROS_KRB5_FILE, krbConf.getAbsolutePath())
val longPrefix = "x" * KUBERNETES_DNS_SUBDOMAIN_NAME_MAX_LENGTH
val kconf = KubernetesTestConf.createDriverConf(
sparkConf = sparkConf, resourceNamePrefix = Some(longPrefix))
val step = new KerberosConfDriverFeatureStep(kconf)

val confMap = filter[ConfigMap](step.getAdditionalKubernetesResources()).head
val name = confMap.getMetadata().getName()
assert(name.length <= KUBERNETES_DNS_SUBDOMAIN_NAME_MAX_LENGTH)
// The pod mount must reference the exact same name as the created resource;
// otherwise the executor would mount a non-existent ConfigMap.
checkPodForKrbConf(step.configurePod(SparkPod.initialPod()), name)
}

test("create keytab secret if client keytab file used") {
val keytab = File.createTempFile("keytab", ".bin", tmpDir)
Files.writeString(keytab.toPath, "some data")
@@ -23,6 +23,7 @@ import io.fabric8.kubernetes.api.model.ConfigMap

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.Config.KUBERNETES_DNS_SUBDOMAIN_NAME_MAX_LENGTH
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.util.Utils

@@ -87,4 +88,29 @@ class PodTemplateConfigMapStepSuite extends SparkFunSuite {
(Constants.EXECUTOR_POD_SPEC_TEMPLATE_MOUNTPATH + "/" +
Constants.EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME))
}

test("podspec ConfigMap name stays valid and consistent with very long resourceNamePrefix") {
val templateFile = Files.createTempFile("pod-template", "yml").toFile
templateFile.deleteOnExit()
Utils.tryWithResource(new PrintWriter(templateFile)) { writer =>
writer.write("pod-template-contents")
}

val sparkConf = new SparkConf(false)
.set(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE, templateFile.getAbsolutePath)
val longPrefix = "x" * KUBERNETES_DNS_SUBDOMAIN_NAME_MAX_LENGTH
val kubernetesConf = KubernetesTestConf.createDriverConf(
sparkConf = sparkConf, resourceNamePrefix = Some(longPrefix))
val step = new PodTemplateConfigMapStep(kubernetesConf)
val configuredPod = step.configurePod(SparkPod.initialPod())

val resources = step.getAdditionalKubernetesResources()
assert(resources.size === 1)
val name = resources.head.getMetadata.getName
assert(name.length <= KUBERNETES_DNS_SUBDOMAIN_NAME_MAX_LENGTH)
// The pod's volume must reference the exact same name as the created ConfigMap;
// otherwise the executor would mount a non-existent ConfigMap.
val volume = configuredPod.pod.getSpec.getVolumes.get(0)
assert(volume.getConfigMap.getName === name)
}
}
@@ -30,6 +30,7 @@ import org.scalatest.BeforeAndAfter

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.Config
import org.apache.spark.deploy.k8s.Config.KUBERNETES_DNS_SUBDOMAIN_NAME_MAX_LENGTH
import org.apache.spark.util.Utils

class KubernetesClientUtilsSuite extends SparkFunSuite with BeforeAndAfter {
@@ -105,6 +106,27 @@ class KubernetesClientUtilsSuite extends SparkFunSuite with BeforeAndAfter {
assert(outputConfigMap === expectedConfigMap)
}

test("configMapName returns prefix+suffix when within length limit") {
val name = KubernetesClientUtils.configMapName("my-app-prefix", "-hadoop-config")
assert(name === "my-app-prefix-hadoop-config")
}

test("configMapName falls back to spark-<id><suffix> when prefix+suffix is too long") {
val suffix = "-hadoop-config"
val longPrefix = "x" * (KUBERNETES_DNS_SUBDOMAIN_NAME_MAX_LENGTH - suffix.length + 1)
val name = KubernetesClientUtils.configMapName(longPrefix, suffix)
assert(name.length <= KUBERNETES_DNS_SUBDOMAIN_NAME_MAX_LENGTH)
assert(name.startsWith("spark-"))
assert(name.endsWith(suffix))
// The fallback drops the original (too-long) prefix entirely.
assert(!name.contains(longPrefix))
}

test("configMapName(prefix) preserves the legacy -conf-map suffix") {
val name = KubernetesClientUtils.configMapName("spark-drv-1234")
assert(name === "spark-drv-1234-conf-map")
}

test("SPARK-53832: verify that configmap built as expected va Java-friendly APIs") {
val configMapName = s"configmap-name-${UUID.randomUUID.toString}"
val configMapNameSpace = s"configmap-namespace-${UUID.randomUUID.toString}"
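As a quick sanity check of the boundary the fallback test above exercises (assuming the 253-character `KUBERNETES_DNS_SUBDOMAIN_NAME_MAX_LENGTH`): a prefix of `limit - suffix.length` characters still yields a preferred name at exactly the limit, and one more character tips it over, which is why the test adds `+ 1`:

```scala
// Boundary arithmetic for the fallback test; 253 is the assumed value of
// KUBERNETES_DNS_SUBDOMAIN_NAME_MAX_LENGTH.
object ConfigMapNameBoundary {
  def main(args: Array[String]): Unit = {
    val maxLen = 253
    val suffix = "-hadoop-config"
    val longestFittingPrefix = "x" * (maxLen - suffix.length)
    val oneTooLong = "x" * (maxLen - suffix.length + 1)

    assert((longestFittingPrefix + suffix).length == maxLen)  // kept as <prefix><suffix>
    assert((oneTooLong + suffix).length == maxLen + 1)        // triggers the spark-<id> fallback
  }
}
```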