diff --git a/src/BUILD b/src/BUILD index 41f7ac8116..151d1efe2f 100644 --- a/src/BUILD +++ b/src/BUILD @@ -348,6 +348,7 @@ ovms_cc_library( "libovms_cliparser", "libovms_systeminfo", "ovms_exit_codes", + "//src/utils:env_guard", ], visibility = ["//visibility:public",], additional_copts = COPTS_DROGON, @@ -2076,6 +2077,7 @@ ovms_cc_library( "libovmsshape", "libovmsprofiler", "libovms_tensorinfo", + "libovms_systeminfo", ], visibility = ["//visibility:public"], ) @@ -3109,7 +3111,8 @@ ovms_cc_library( "@mediapipe//mediapipe/framework:calculator_framework", "//third_party:openvino", "@com_github_tencent_rapidjson//:rapidjson", - "//third_party:genai",], + "//third_party:genai", + "//src:libovms_ov_utils",], visibility = ["//visibility:public"], alwayslink = 1, ) diff --git a/src/config.cpp b/src/config.cpp index 3222e775b1..a00efb2246 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -14,7 +14,8 @@ // limitations under the License. //***************************************************************************** #include "config.hpp" - +#include +#include #include #include #include @@ -36,6 +37,7 @@ #include "modelconfig.hpp" #include "stringutils.hpp" #include "systeminfo.hpp" +#include "utils/env_guard.hpp" namespace ovms { @@ -43,12 +45,29 @@ const uint32_t AVAILABLE_CORES = getCoreCount(); const uint32_t WIN_MAX_GRPC_WORKERS = 1; const uint32_t MAX_PORT_NUMBER = std::numeric_limits::max(); -// For drogon, we need to minimize the number of default workers since this value is set for both: unary and streaming (making it always double) -const uint64_t DEFAULT_REST_WORKERS = AVAILABLE_CORES; const uint32_t DEFAULT_GRPC_MAX_THREADS = AVAILABLE_CORES * 8.0; const size_t DEFAULT_GRPC_MEMORY_QUOTA = (size_t)2 * 1024 * 1024 * 1024; // 2GB const uint64_t MAX_REST_WORKERS = 10'000; +// We need to minimize the number of default drogon workers since this value is set for both: unary and streaming (making it always double) +// on linux, restrict also based on the max allowed number of open files +#ifdef __linux__ + +const uint64_t RESERVED_OPEN_FILES = 15; // we need to reserve some file descriptors for other operations, so we don't want to use all of them for drogon workers +const uint64_t OPEN_FILES_PER_REST_WORKER = 7; // 5x rest_workers to initialize ovms and 2x rest_workers for new connections +uint64_t getDefaultRestWorkers() { + const uint64_t maxOpenFiles = getMaxOpenFilesLimit(); + if (maxOpenFiles <= RESERVED_OPEN_FILES) { + return static_cast(0); + } + return std::min(static_cast(AVAILABLE_CORES), (maxOpenFiles - RESERVED_OPEN_FILES) / OPEN_FILES_PER_REST_WORKER); +} +#else +uint64_t getDefaultRestWorkers() { + return AVAILABLE_CORES; +} +#endif + Config& Config::parse(int argc, char** argv) { ovms::CLIParser parser; ovms::ServerSettingsImpl serverSettings; @@ -73,6 +92,14 @@ Config& Config::parse(int argc, char** argv) { bool Config::parse(ServerSettingsImpl* serverSettings, ModelsSettingsImpl* modelsSettings) { this->serverSettings = *serverSettings; this->modelsSettings = *modelsSettings; + + static EnvGuard envGuard; +#if defined(__linux__) || defined(_WIN32) + if (this->serverSettings.logLevel == "DEBUG") { + envGuard.set("OPENVINO_LOG_LEVEL", "4"); + } +#endif + return validate(); } @@ -297,7 +324,8 @@ bool Config::validate() { } // check rest_workers value - if (((restWorkers() > MAX_REST_WORKERS) || (restWorkers() < 2))) { + const uint32_t restWorkersValue = restWorkers(); // Cache to avoid multiple calls + if (((restWorkersValue > MAX_REST_WORKERS) || (restWorkersValue < 2))) { std::cerr << "rest_workers count should be from 2 to " << MAX_REST_WORKERS << std::endl; return false; } @@ -306,6 +334,12 @@ bool Config::validate() { std::cerr << "rest_workers is set but rest_port is not set. rest_port is required to start rest servers" << std::endl; return false; } +#ifdef __linux__ + if (restWorkersValue > (getMaxOpenFilesLimit() - RESERVED_OPEN_FILES) / 6) { + std::cerr << "rest_workers count cannot be larger than " << (getMaxOpenFilesLimit() - RESERVED_OPEN_FILES) / 6 << " due to open files limit. Current open files limit: " << getMaxOpenFilesLimit() << std::endl; + return false; + } +#endif #ifdef _WIN32 if (grpcWorkers() > WIN_MAX_GRPC_WORKERS) { @@ -368,7 +402,7 @@ const std::string Config::restBindAddress() const { return this->serverSettings. uint32_t Config::grpcWorkers() const { return this->serverSettings.grpcWorkers; } uint32_t Config::grpcMaxThreads() const { return this->serverSettings.grpcMaxThreads.value_or(DEFAULT_GRPC_MAX_THREADS); } size_t Config::grpcMemoryQuota() const { return this->serverSettings.grpcMemoryQuota.value_or(DEFAULT_GRPC_MEMORY_QUOTA); } -uint32_t Config::restWorkers() const { return this->serverSettings.restWorkers.value_or(DEFAULT_REST_WORKERS); } +uint32_t Config::restWorkers() const { return static_cast(std::max(static_cast(2), static_cast(this->serverSettings.restWorkers.value_or(getDefaultRestWorkers())))); } const std::string& Config::modelName() const { return this->modelsSettings.modelName; } const std::string& Config::modelPath() const { return this->modelsSettings.modelPath; } const std::string& Config::batchSize() const { diff --git a/src/llm/BUILD b/src/llm/BUILD index 8fe6059d71..0939d3376c 100644 --- a/src/llm/BUILD +++ b/src/llm/BUILD @@ -283,6 +283,9 @@ ovms_cc_library( "//src:httppayload", "//src:libhttpclientconnection", "//src:sse_utils", + "//src:libovms_systeminfo", + "//src:libovms_config", + "//src:libovms_ov_utils", "//third_party:genai",] + select({ "//:disable_python": [], "//:not_disable_python" : [":py_jinja_template_processor"], diff --git a/src/llm/language_model/continuous_batching/servable_initializer.cpp b/src/llm/language_model/continuous_batching/servable_initializer.cpp index 27f4f51aee..d5cc5f4cff 100644 --- a/src/llm/language_model/continuous_batching/servable_initializer.cpp +++ b/src/llm/language_model/continuous_batching/servable_initializer.cpp @@ -13,6 +13,7 @@ // See the License for the specific language governing permissions and // limitations under the License. //***************************************************************************** +#include #include #include #include @@ -32,10 +33,13 @@ #pragma GCC diagnostic pop #pragma warning(pop) +#include "../../../config.hpp" #include "../../../json_parser.hpp" #include "../../../logging.hpp" #include "../../../mediapipe_internal/mediapipe_utils.hpp" +#include "../../../ov_utils.hpp" #include "../../../status.hpp" +#include "../../../systeminfo.hpp" #include "llm_executor.hpp" #include "servable.hpp" #include "servable_initializer.hpp" @@ -204,7 +208,26 @@ Status ContinuousBatchingServableInitializer::initialize(std::shared_ptrtokenizerPluginConfig = {{"PERFORMANCE_HINT", "THROUGHPUT"}}; + if (properties->device == "CPU") { + status = applyDefaultCpuProperties(properties->pluginConfig); + if (!status.ok()) { + SPDLOG_ERROR("Failed to apply default CPU properties for LLM model: {}", status.string()); + return status; + } + } + + ov::AnyMap tokenProperties; + const uint32_t tokenizerNumStreams = std::min(static_cast(Config::instance().restWorkers()), static_cast(getCoreCount())); + tokenProperties[ov::num_streams.name()] = static_cast(tokenizerNumStreams); + tokenProperties[ov::hint::performance_mode.name()] = ov::hint::PerformanceMode::THROUGHPUT; + SPDLOG_DEBUG("Setting tokenizer/detokenizer NUM_STREAMS to: {}", tokenizerNumStreams); + status = applyDefaultCpuProperties(tokenProperties); + if (!status.ok()) { + SPDLOG_ERROR("Failed to apply default CPU properties for tokenizer: {}", status.string()); + return status; + } + properties->tokenizerPluginConfig = tokenProperties; + try { properties->pipeline = std::make_shared(parsedModelsPath, properties->schedulerConfig, properties->device, diff --git a/src/modelmanager.cpp b/src/modelmanager.cpp index 67a0a38399..1b65eb4e59 100644 --- a/src/modelmanager.cpp +++ b/src/modelmanager.cpp @@ -69,6 +69,7 @@ #include "schema.hpp" #include "servable_definition.hpp" #include "stringutils.hpp" +#include "systeminfo.hpp" namespace ovms { @@ -79,7 +80,6 @@ const std::string DEFAULT_MODEL_CACHE_DIRECTORY = "c:\\Intel\\openvino_cache"; const std::string DEFAULT_MODEL_CACHE_DIRECTORY = "/opt/cache"; #endif ModelManager::ModelManager(const std::string& modelCacheDirectory, MetricRegistry* registry, PythonBackend* pythonBackend) : - ieCore(std::make_unique()), pipelineFactory(std::make_unique()), #if (MEDIAPIPE_DISABLE == 0) mediapipeFactory(std::make_unique(pythonBackend)), @@ -89,6 +89,20 @@ ModelManager::ModelManager(const std::string& modelCacheDirectory, MetricRegistr modelCacheDirectory(modelCacheDirectory), metricRegistry(registry), pythonBackend(pythonBackend) { + try { + this->ieCore = std::make_unique(); + ov::AnyMap cpuProperties; + Status status = applyDefaultCpuProperties(cpuProperties); + if (!status.ok()) { + SPDLOG_CRITICAL("Failed to apply default CPU properties. Reason: {}", status.string()); + throw std::runtime_error("Failed to apply default CPU properties"); + } + this->ieCore->set_property("CPU", cpuProperties); + } catch (const std::exception& ex) { + SPDLOG_CRITICAL("Failed to initialize OpenVINO Core with CPU properties. Reason: {}", ex.what()); + throw; + } + OV_LOGGER("ov::Core(): {}", reinterpret_cast(this->ieCore.get())); // Take --cache_dir from CLI if (this->modelCacheDirectory.empty()) { @@ -151,6 +165,12 @@ ModelManager::ModelManager(const std::string& modelCacheDirectory, MetricRegistr throw; } this->logPluginConfiguration(); +#ifdef __linux__ + if (isRunningInDocker()) { + SPDLOG_INFO("Running inside Docker container"); + SPDLOG_INFO("cpu quota: {}, cpu affinity: {}, max_open_files: {}", getDockerCpuQuota(), getCpuAffinityCount(), getMaxOpenFilesLimit()); + } +#endif } void ModelManager::logPluginConfiguration() { diff --git a/src/ov_utils.cpp b/src/ov_utils.cpp index 4b976d4916..96d156b041 100644 --- a/src/ov_utils.cpp +++ b/src/ov_utils.cpp @@ -26,6 +26,7 @@ #include "logging.hpp" #include "profiler.hpp" #include "status.hpp" +#include "systeminfo.hpp" #include "tensorinfo.hpp" namespace ovms { @@ -148,4 +149,50 @@ Status validatePluginConfiguration(const plugin_config_t& pluginConfig, const st return StatusCode::OK; } + +Status applyDefaultCpuProperties(ov::AnyMap& properties) { + try { + const uint16_t coreCount = getCoreCount(); + + if (properties.find(ov::inference_num_threads.name()) == properties.end()) { + properties[ov::inference_num_threads.name()] = static_cast(coreCount); + SPDLOG_DEBUG("applyDefaultCpuProperties: setting inference_num_threads to {}", coreCount); + } + +#ifdef __linux__ + if (properties.find(ov::hint::enable_cpu_pinning.name()) == properties.end()) { + if (isRunningInDocker()) { + const bool cpuPinning = getDockerCpuQuota() <= 0; + properties[ov::hint::enable_cpu_pinning.name()] = cpuPinning; + SPDLOG_DEBUG("applyDefaultCpuProperties: setting enable_cpu_pinning to {}", cpuPinning); + } + } +#endif + + const auto perfIt = properties.find(ov::hint::performance_mode.name()); + if (perfIt != properties.end()) { + bool isThroughput = false; + try { + isThroughput = (perfIt->second.as() == ov::hint::PerformanceMode::THROUGHPUT); + } catch (...) { + try { + isThroughput = (perfIt->second.as() == "THROUGHPUT"); + } catch (...) { + } + } + if (isThroughput && properties.find(ov::num_streams.name()) == properties.end()) { + properties[ov::num_streams.name()] = static_cast(coreCount); + SPDLOG_DEBUG("applyDefaultCpuProperties: setting num_streams to {} (THROUGHPUT hint active)", coreCount); + } + } + } catch (const std::exception& ex) { + SPDLOG_ERROR("Exception while applying default CPU properties: {}", ex.what()); + return StatusCode::INTERNAL_ERROR; + } catch (...) { + SPDLOG_ERROR("Unknown exception while applying default CPU properties"); + return StatusCode::INTERNAL_ERROR; + } + return StatusCode::OK; +} + } // namespace ovms diff --git a/src/ov_utils.hpp b/src/ov_utils.hpp index 011f342a0d..c1ca92ecb4 100644 --- a/src/ov_utils.hpp +++ b/src/ov_utils.hpp @@ -52,6 +52,13 @@ std::optional getLayoutFromRTMap(const ov::RTMap& rtMap); Status validatePluginConfiguration(const plugin_config_t& pluginConfig, const std::string& targetDevice, const ov::Core& ieCore); +// Applies resource-aware CPU defaults to an OpenVINO property map. +// Sets inference_num_threads and (on Linux) enable_cpu_pinning only when not +// already present in the map. When PERFORMANCE_HINT=THROUGHPUT is set, +// num_streams is also capped to the detected core count if not already set. +// Returns StatusCode::INTERNAL_ERROR on any OpenVINO exception. +Status applyDefaultCpuProperties(ov::AnyMap& properties); + // Logging // #1 model/global plugin CompiledMode:DUMMY / Global OpenVINO plugin:CPU // #2 version/_ @@ -96,4 +103,5 @@ static void logOVPluginConfig(PropertyExtractor&& propertyExtractor, const std:: std::string pluginConfigNameValuesString = joins(pluginConfigNameValues, ", "); SPDLOG_LOGGER_DEBUG(modelmanager_logger, "{}; {}plugin configuration: {{ {} }}", loggingAuthor, loggingDetails, pluginConfigNameValuesString); } + } // namespace ovms diff --git a/src/sidepacket_servable.cpp b/src/sidepacket_servable.cpp index c8978f2f2c..18778f77d1 100644 --- a/src/sidepacket_servable.cpp +++ b/src/sidepacket_servable.cpp @@ -20,6 +20,7 @@ #include "openvino/runtime/core.hpp" #include "sidepacket_servable.hpp" #include "logging.hpp" +#include "ov_utils.hpp" #include #include #include @@ -134,6 +135,13 @@ void SidepacketServable::initialize(const std::string& modelDir, const std::stri ov::Core core; std::shared_ptr m_model = core.read_model(parsedModelsPath / std::filesystem::path("openvino_model.xml"), {}, properties); m_model = this->applyPrePostProcessing(core, m_model, properties); + if (targetDevice == "CPU") { + auto cpuPropertiesStatus = applyDefaultCpuProperties(properties); + if (!cpuPropertiesStatus.ok()) { + SPDLOG_ERROR("Failed to apply default CPU properties for embeddings model: {}", cpuPropertiesStatus.string()); + return; + } + } compiledModel = core.compile_model(m_model, targetDevice, properties); SPDLOG_DEBUG("Model compiled {} for {}", parsedModelsPath.string(), targetDevice); diff --git a/src/systeminfo.cpp b/src/systeminfo.cpp index 16531df9b6..b1501c428f 100644 --- a/src/systeminfo.cpp +++ b/src/systeminfo.cpp @@ -15,16 +15,143 @@ //***************************************************************************** #include "systeminfo.hpp" +#include #include +#include #include #include #include +#ifdef __linux__ +#include +#include +#endif + #include "logging.hpp" #include "status.hpp" namespace ovms { uint16_t getCoreCount() { - return std::thread::hardware_concurrency(); + uint16_t detectedCoreCount = static_cast(std::thread::hardware_concurrency()); +#ifdef __linux__ + if (isRunningInDocker()) { + const uint16_t affinityCount = getCpuAffinityCount(); + const uint16_t quotaCount = getDockerCpuQuota(); + if (quotaCount > 0) { + detectedCoreCount = std::min(affinityCount, quotaCount); + } else { + detectedCoreCount = affinityCount; + } + } +#endif + return detectedCoreCount; +} + +uint64_t getMaxOpenFilesLimit() { +#ifdef __linux__ + struct rlimit limit; + if (getrlimit(RLIMIT_NOFILE, &limit) == 0) { + return limit.rlim_cur; + } +#endif + return std::numeric_limits::max(); +} + +#ifdef __linux__ + +bool isRunningInDocker() { + // Check for /.dockerenv file + std::ifstream dockerenv("/.dockerenv"); + if (dockerenv.good()) { + return true; + } + // Check for /run/.containerenv file + std::ifstream containerenv("/run/.containerenv"); + if (containerenv.good()) { + return true; + } + + // Check /proc/self/cgroup for docker references + std::ifstream cgroup("/proc/self/cgroup"); + if (cgroup.is_open()) { + std::string line; + while (std::getline(cgroup, line)) { + if (line.find("docker") != std::string::npos) { + return true; + } + if (line.find("kubepods") != std::string::npos) { + return true; + } + } + } + + return false; } + +uint16_t getCpuAffinityCount() { + cpu_set_t mask; + CPU_ZERO(&mask); + + if (sched_getaffinity(0, sizeof(mask), &mask) == -1) { + return std::thread::hardware_concurrency(); + } + + int cpu_count = CPU_COUNT(&mask); + return static_cast(cpu_count); +} + +uint16_t getDockerCpuQuota() { + // Try cgroup v2 cpu.max (format: "quota period") + std::ifstream cpu_max_v2("/sys/fs/cgroup/cpu.max"); + if (cpu_max_v2.is_open()) { + std::string line; + if (std::getline(cpu_max_v2, line)) { + std::istringstream iss(line); + std::string quota_str, period_str; + if (iss >> quota_str >> period_str) { + if (quota_str == "max") { + return 0; // No quota set + } + try { + uint64_t quota = std::stoull(quota_str); + uint64_t period = std::stoull(period_str); + if (quota > 0 && period > 0) { + uint16_t cpu_count = static_cast((quota + period - 1) / period); + return cpu_count; + } + } catch (const std::exception&) { + // Parsing failed, continue + } + } + } + } + + // Try cgroup v1 cpu.cfs_quota_us and cpu.cfs_period_us + std::ifstream quota_file("/sys/fs/cgroup/cpu/cpu.cfs_quota_us"); + std::ifstream period_file("/sys/fs/cgroup/cpu/cpu.cfs_period_us"); + + if (quota_file.is_open() && period_file.is_open()) { + std::string quota_str, period_str; + if (std::getline(quota_file, quota_str) && std::getline(period_file, period_str)) { + // Trim whitespace + quota_str.erase(quota_str.find_last_not_of(" \n\r\t") + 1); + period_str.erase(period_str.find_last_not_of(" \n\r\t") + 1); + try { + uint64_t quota = std::stoull(quota_str); + uint64_t period = std::stoull(period_str); + if (quota > 0 && period > 0) { + uint16_t cpu_count = static_cast((quota + period - 1) / period); + return cpu_count; + } + } catch (const std::exception&) { + // Parsing failed, continue + } + } + } + + return 0; // No quota set +} + +#endif // __linux__ + } // namespace ovms diff --git a/src/systeminfo.hpp b/src/systeminfo.hpp index 2dc66fffe9..b1e0ae201b 100644 --- a/src/systeminfo.hpp +++ b/src/systeminfo.hpp @@ -22,4 +22,18 @@ namespace ovms { * @return uint16_t Available number of cores in the system */ uint16_t getCoreCount(); +uint64_t getMaxOpenFilesLimit(); +#ifdef __linux__ +bool isRunningInDocker(); +/** + * @brief Get number of CPUs available via CPU affinity mask + * @return uint16_t Number of CPUs in the affinity mask, or total hardware concurrency if affinity is not set + */ +uint16_t getCpuAffinityCount(); +/** + * @brief Get CPU limit from cgroup (docker run --cpus constraint) + * @return uint16_t Number of CPUs allowed by quota, or 0 if no quota is set + */ +uint16_t getDockerCpuQuota(); +#endif } // namespace ovms diff --git a/src/test/ovmsconfig_test.cpp b/src/test/ovmsconfig_test.cpp index 6b76d0e2fa..d308b3db76 100644 --- a/src/test/ovmsconfig_test.cpp +++ b/src/test/ovmsconfig_test.cpp @@ -31,6 +31,24 @@ #include "../systeminfo.hpp" #include "test_utils.hpp" +#ifdef __linux__ +#include + +namespace { +class ScopedNoFileRlimitRestore { +public: + explicit ScopedNoFileRlimitRestore(const struct rlimit& originalLimit) : + originalLimit(originalLimit) {} + ~ScopedNoFileRlimitRestore() { + setrlimit(RLIMIT_NOFILE, &originalLimit); + } + +private: + struct rlimit originalLimit; +}; +} // namespace +#endif + using testing::_; using testing::ContainerEq; using testing::Return; @@ -202,6 +220,37 @@ TEST_F(OvmsConfigDeathTest, restWorkersTooLarge) { EXPECT_EXIT(ovms::Config::instance().parse(arg_count, n_argv), ::testing::ExitedWithCode(OVMS_EX_USAGE), "rest_workers count should be from 2 to "); } +#ifdef __linux__ +TEST_F(OvmsConfigDeathTest, restWorkersDefaultReducedForOpenFilesLimit) { + // limit allowed number of open files to value that enforce default rest_workers to be determined based on open files limit instead of number of cpu cores alone. This is to test that default rest_workers count is reduced when open files limit is low. + int cpu_cores = ovms::getCoreCount(); + struct rlimit limit; + ASSERT_EQ(getrlimit(RLIMIT_NOFILE, &limit), 0); + ScopedNoFileRlimitRestore restoreOriginalLimit(limit); + struct rlimit newLimit = {std::min(static_cast(cpu_cores * 5), limit.rlim_max), limit.rlim_max}; + std::cout << "Setting open files limit to " << newLimit.rlim_cur << " to test that default rest_workers count is reduced based on open files limit" << std::endl; + ASSERT_EQ(setrlimit(RLIMIT_NOFILE, &newLimit), 0); + + char* n_argv[] = {"ovms", "--config_path", "/path1", "--rest_port", "8080", "--port", "8081"}; + int arg_count = 7; + ovms::Config::instance().parse(arg_count, n_argv); + EXPECT_TRUE(ovms::Config::instance().validate()); +} + +TEST_F(OvmsConfigDeathTest, restWorkersTooLargeForOpenFilesLimit) { + // limit allowed number of open files to 1024 to make sure that rest_workers count is too large. + struct rlimit limit; + ASSERT_EQ(getrlimit(RLIMIT_NOFILE, &limit), 0); + ScopedNoFileRlimitRestore restoreOriginalLimit(limit); + struct rlimit newLimit = {std::min(static_cast(1024), limit.rlim_max), limit.rlim_max}; + std::cout << "Setting open files limit to " << newLimit.rlim_cur << " to test that rest_workers count is too large for the limit based on number of cpu cores alone" << std::endl; + ASSERT_EQ(setrlimit(RLIMIT_NOFILE, &newLimit), 0); + char* n_argv[] = {"ovms", "--config_path", "/path1", "--rest_port", "8080", "--port", "8081", "--rest_workers", "1000"}; + int arg_count = 9; + EXPECT_EXIT(ovms::Config::instance().parse(arg_count, n_argv), ::testing::ExitedWithCode(OVMS_EX_USAGE), "rest_workers count cannot be larger than .* due to open files limit. Current open files limit: .*1024"); +} +#endif + TEST_F(OvmsConfigDeathTest, restWorkersDefinedRestPortUndefined) { char* n_argv[] = {"ovms", "--config_path", "/path1", "--port", "8080", "--rest_workers", "60"}; int arg_count = 7;