diff --git a/demos/common/export_models/export_model.py b/demos/common/export_models/export_model.py index 5aa81b0c81..1cd3917739 100644 --- a/demos/common/export_models/export_model.py +++ b/demos/common/export_models/export_model.py @@ -32,6 +32,8 @@ def add_common_arguments(parser): parser.add_argument('--target_device', default="CPU", help='CPU, GPU, NPU or HETERO, default is CPU', dest='target_device') parser.add_argument('--ov_cache_dir', default=None, help='Folder path for compilation cache to speedup initialization time', dest='ov_cache_dir') parser.add_argument('--extra_quantization_params', required=False, help='Add advanced quantization parameters. Check optimum-intel documentation. Example: "--sym --group-size -1 --ratio 1.0 --awq --scale-estimation --dataset wikitext2"', dest='extra_quantization_params') + parser.add_argument('--graph_initial_queue_size', required=False, default=None, help='Initial graph pool size. Use AUTO for hardware_concurrency or a positive integer. If not set, defaults per task are used.', dest='graph_initial_queue_size') + parser.add_argument('--graph_queue_max_size', required=False, default=None, help='Maximum graph pool size (expansion ceiling). Use AUTO for hardware_concurrency or a positive integer. If not set, defaults per task are used.', dest='graph_queue_max_size') parser = argparse.ArgumentParser(description='Export Hugging face models to OVMS models repository including all configuration for deployments') @@ -148,6 +150,8 @@ def add_common_arguments(parser): """ embedding_graph_ov_template = """ +# OVMS_GRAPH_INITIAL_QUEUE_SIZE: {{graph_initial_queue_size|default("1", true)}} +# OVMS_GRAPH_QUEUE_MAX_SIZE: {{graph_queue_max_size|default("AUTO", true)}} input_stream: "REQUEST_PAYLOAD:input" output_stream: "RESPONSE_PAYLOAD:output" node { @@ -172,6 +176,8 @@ def add_common_arguments(parser): """ rerank_graph_ov_template = """ +# OVMS_GRAPH_INITIAL_QUEUE_SIZE: {{graph_initial_queue_size|default("1", true)}} +# OVMS_GRAPH_QUEUE_MAX_SIZE: {{graph_queue_max_size|default("AUTO", true)}} input_stream: "REQUEST_PAYLOAD:input" output_stream: "RESPONSE_PAYLOAD:output" node { @@ -190,7 +196,9 @@ def add_common_arguments(parser): } """ -rerank_graph_template = """input_stream: "REQUEST_PAYLOAD:input" +rerank_graph_template = """# OVMS_GRAPH_INITIAL_QUEUE_SIZE: {{graph_initial_queue_size|default("1", true)}} +# OVMS_GRAPH_QUEUE_MAX_SIZE: {{graph_queue_max_size|default("AUTO", true)}} +input_stream: "REQUEST_PAYLOAD:input" output_stream: "RESPONSE_PAYLOAD:output" node { calculator: "OpenVINOModelServerSessionCalculator" @@ -219,7 +227,9 @@ def add_common_arguments(parser): } """ -text_generation_graph_template = """input_stream: "HTTP_REQUEST_PAYLOAD:input" +text_generation_graph_template = """# OVMS_GRAPH_INITIAL_QUEUE_SIZE: {{graph_initial_queue_size|default("1", true)}} +# OVMS_GRAPH_QUEUE_MAX_SIZE: {{graph_queue_max_size|default("AUTO", true)}} +input_stream: "HTTP_REQUEST_PAYLOAD:input" output_stream: "HTTP_RESPONSE_PAYLOAD:output" node: { @@ -292,7 +302,9 @@ def add_common_arguments(parser): ] }""" -image_generation_graph_template = """input_stream: "HTTP_REQUEST_PAYLOAD:input" +image_generation_graph_template = """# OVMS_GRAPH_INITIAL_QUEUE_SIZE: {{graph_initial_queue_size|default("1", true)}} +# OVMS_GRAPH_QUEUE_MAX_SIZE: {{graph_queue_max_size|default("1", true)}} +input_stream: "HTTP_REQUEST_PAYLOAD:input" output_stream: "HTTP_RESPONSE_PAYLOAD:output" node: { diff --git a/src/BUILD b/src/BUILD index f32bd88e12..7eabdce150 100644 --- a/src/BUILD +++ b/src/BUILD @@ -2221,6 +2221,7 @@ cc_test( + select({ "//:not_disable_mediapipe": [ "test/embeddingsnode_test.cpp", + "test/graphqueue_test.cpp", "test/listmodelsendpoint_test.cpp", "test/mediapipeflow_test.cpp", "test/mediapipe/inputsidepacketusertestcalc.cc", diff --git a/src/capi_frontend/server_settings.hpp b/src/capi_frontend/server_settings.hpp index 77645dda7a..4d033e9756 100644 --- a/src/capi_frontend/server_settings.hpp +++ b/src/capi_frontend/server_settings.hpp @@ -162,6 +162,8 @@ struct ExportSettings { std::optional extraQuantizationParams; std::optional vocoder; std::string precision = "int8"; + std::optional graphInitialQueueSize; + std::optional graphQueueMaxSize; PluginConfigSettingsImpl pluginConfig; }; diff --git a/src/graph_export/BUILD b/src/graph_export/BUILD index 0630805c53..688ba933ea 100644 --- a/src/graph_export/BUILD +++ b/src/graph_export/BUILD @@ -49,7 +49,10 @@ ovms_cc_library( ovms_cc_library( name = "graph_cli_parser", srcs = ["graph_cli_parser.cpp"], - hdrs = ["graph_cli_parser.hpp"], + hdrs = [ + "graph_cli_parser.hpp", + "graph_queue_cli_options.hpp", + ], deps = [ "@ovms//src:cpp_headers", "@ovms//src:libovms_server_settings", diff --git a/src/graph_export/embeddings_graph_cli_parser.cpp b/src/graph_export/embeddings_graph_cli_parser.cpp index 192dd6c748..1556e2d3f1 100644 --- a/src/graph_export/embeddings_graph_cli_parser.cpp +++ b/src/graph_export/embeddings_graph_cli_parser.cpp @@ -26,6 +26,7 @@ #include "../capi_frontend/server_settings.hpp" #include "../ovms_exit_codes.hpp" #include "../status.hpp" +#include "graph_queue_cli_options.hpp" namespace ovms { @@ -56,6 +57,7 @@ void EmbeddingsGraphCLIParser::createOptions() { "Pooling option. One of: CLS, LAST, MEAN.", cxxopts::value()->default_value("CLS"), "POOLING"); + addGraphQueueOptions(*options, "embeddings"); } void EmbeddingsGraphCLIParser::printHelp() { @@ -97,6 +99,7 @@ void EmbeddingsGraphCLIParser::prepare(OvmsServerMode serverMode, HFSettingsImpl embeddingsGraphSettings.normalize = result->operator[]("normalize").as(); embeddingsGraphSettings.truncate = result->operator[]("truncate").as(); embeddingsGraphSettings.pooling = result->operator[]("pooling").as(); + extractGraphQueueOptions(*result, hfSettings); } if (!(embeddingsGraphSettings.pooling == "CLS" || embeddingsGraphSettings.pooling == "LAST" || embeddingsGraphSettings.pooling == "MEAN")){ throw std::invalid_argument("Only CLS and LAST pooling modes are supported"); diff --git a/src/graph_export/graph_cli_parser.cpp b/src/graph_export/graph_cli_parser.cpp index 649234dc79..4867402376 100644 --- a/src/graph_export/graph_cli_parser.cpp +++ b/src/graph_export/graph_cli_parser.cpp @@ -26,6 +26,7 @@ #include "../capi_frontend/server_settings.hpp" #include "../ovms_exit_codes.hpp" #include "../status.hpp" +#include "graph_queue_cli_options.hpp" namespace ovms { @@ -80,7 +81,7 @@ void GraphCLIParser::createOptions() { "Enables enforcing tool schema during generation. Requires setting tool parser. Default: false.", cxxopts::value()->default_value("false"), "ENABLE_TOOL_GUIDED_GENERATION"); - + addGraphQueueOptions(*options, "plugin config"); options->add_options("plugin config") ("max_prompt_len", "Sets NPU specific property for maximum number of tokens in the prompt.", @@ -168,6 +169,7 @@ void GraphCLIParser::prepare(OvmsServerMode serverMode, HFSettingsImpl& hfSettin if (result->count("kv_cache_precision")) { hfSettings.exportSettings.pluginConfig.kvCachePrecision = result->operator[]("kv_cache_precision").as(); } + extractGraphQueueOptions(*result, hfSettings); } hfSettings.graphSettings = std::move(graphSettings); diff --git a/src/graph_export/graph_export.cpp b/src/graph_export/graph_export.cpp index a61d1ea3b8..f214013923 100644 --- a/src/graph_export/graph_export.cpp +++ b/src/graph_export/graph_export.cpp @@ -53,20 +53,51 @@ namespace ovms { static const std::string OVMS_VERSION_GRAPH_LINE = std::string("# File created with: ") + PROJECT_NAME + std::string(" ") + PROJECT_VERSION + std::string("\n"); -static const std::string OVMS_GRAPH_QUEUE_SIZE_LINE_PREFIX = "# OVMS_GRAPH_QUEUE_SIZE: "; +static const std::string OVMS_GRAPH_INITIAL_QUEUE_SIZE_LINE_PREFIX = "# OVMS_GRAPH_INITIAL_QUEUE_SIZE: "; +static const std::string OVMS_GRAPH_QUEUE_MAX_SIZE_LINE_PREFIX = "# OVMS_GRAPH_QUEUE_MAX_SIZE: "; static const std::string OVMS_GRAPH_QUEUE_SIZE_AUTO = "AUTO"; -static std::string getDefaultGraphQueueSizeDirective(const HFSettingsImpl& hfSettings) { +static std::optional getDefaultInitialGraphQueueSizeDirective(const HFSettingsImpl& hfSettings) { + if (hfSettings.exportSettings.graphInitialQueueSize.has_value()) { + return hfSettings.exportSettings.graphInitialQueueSize.value(); + } + if (hfSettings.task == IMAGE_GENERATION_GRAPH) { + return "1"; + } + if (hfSettings.task == TEXT_GENERATION_GRAPH || + hfSettings.task == EMBEDDINGS_GRAPH || + hfSettings.task == RERANK_GRAPH) { + return "1"; + } + return std::nullopt; +} + +static std::optional getDefaultGraphQueueMaxSizeDirective(const HFSettingsImpl& hfSettings) { + if (hfSettings.exportSettings.graphQueueMaxSize.has_value()) { + return hfSettings.exportSettings.graphQueueMaxSize.value(); + } if (hfSettings.task == IMAGE_GENERATION_GRAPH) { return "1"; } - return OVMS_GRAPH_QUEUE_SIZE_AUTO; + if (hfSettings.task == TEXT_GENERATION_GRAPH || + hfSettings.task == EMBEDDINGS_GRAPH || + hfSettings.task == RERANK_GRAPH) { + return OVMS_GRAPH_QUEUE_SIZE_AUTO; + } + return std::nullopt; } static std::string buildGraphHeader(const HFSettingsImpl& hfSettings) { std::ostringstream oss; oss << OVMS_VERSION_GRAPH_LINE; - oss << OVMS_GRAPH_QUEUE_SIZE_LINE_PREFIX << getDefaultGraphQueueSizeDirective(hfSettings) << "\n"; + auto queueDirective = getDefaultInitialGraphQueueSizeDirective(hfSettings); + if (queueDirective.has_value()) { + oss << OVMS_GRAPH_INITIAL_QUEUE_SIZE_LINE_PREFIX << queueDirective.value() << "\n"; + } + auto maxDirective = getDefaultGraphQueueMaxSizeDirective(hfSettings); + if (maxDirective.has_value()) { + oss << OVMS_GRAPH_QUEUE_MAX_SIZE_LINE_PREFIX << maxDirective.value() << "\n"; + } return oss.str(); } diff --git a/src/graph_export/graph_queue_cli_options.hpp b/src/graph_export/graph_queue_cli_options.hpp new file mode 100644 index 0000000000..b5d41ca5b2 --- /dev/null +++ b/src/graph_export/graph_queue_cli_options.hpp @@ -0,0 +1,46 @@ +//***************************************************************************** +// Copyright 2025 Intel Corporation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//***************************************************************************** +#pragma once + +#include + +#include "../capi_frontend/server_settings.hpp" + +namespace ovms { + +// Common graph queue CLI options shared across all mediapipe graph task parsers. +// Call addGraphQueueOptions() in createOptions() and extractGraphQueueOptions() in prepare(). + +inline void addGraphQueueOptions(cxxopts::Options& options, const std::string& group = "graph pool") { + options.add_options(group)("graph_initial_queue_size", + "Initial graph pool size at startup. Positive integer or AUTO. Default: 1.", + cxxopts::value(), + "GRAPH_INITIAL_QUEUE_SIZE")("graph_queue_max_size", + "Maximum graph pool size (expansion ceiling). Positive integer or AUTO. Default: same as initial (no expansion).", + cxxopts::value(), + "GRAPH_QUEUE_MAX_SIZE"); +} + +inline void extractGraphQueueOptions(const cxxopts::ParseResult& result, HFSettingsImpl& hfSettings) { + if (result.count("graph_initial_queue_size")) { + hfSettings.exportSettings.graphInitialQueueSize = result["graph_initial_queue_size"].as(); + } + if (result.count("graph_queue_max_size")) { + hfSettings.exportSettings.graphQueueMaxSize = result["graph_queue_max_size"].as(); + } +} + +} // namespace ovms diff --git a/src/graph_export/image_generation_graph_cli_parser.cpp b/src/graph_export/image_generation_graph_cli_parser.cpp index ed0d1b91ef..fd836dfd49 100644 --- a/src/graph_export/image_generation_graph_cli_parser.cpp +++ b/src/graph_export/image_generation_graph_cli_parser.cpp @@ -27,6 +27,7 @@ #include "../capi_frontend/server_settings.hpp" #include "../ovms_exit_codes.hpp" #include "../status.hpp" +#include "graph_queue_cli_options.hpp" namespace ovms { @@ -82,6 +83,7 @@ void ImageGenerationGraphCLIParser::createOptions() { "The number of parallel execution streams to use for the image generation models. Use at least 2 on 2 socket CPU systems.", cxxopts::value(), "NUM_STREAMS"); + addGraphQueueOptions(*options, "image_generation"); } void ImageGenerationGraphCLIParser::printHelp() { @@ -162,6 +164,7 @@ void ImageGenerationGraphCLIParser::prepare(ServerSettingsImpl& serverSettings, hfSettings.exportSettings.pluginConfig.cacheDir = serverSettings.cacheDir; } } + extractGraphQueueOptions(*result, hfSettings); } hfSettings.graphSettings = std::move(imageGenerationGraphSettings); diff --git a/src/graph_export/rerank_graph_cli_parser.cpp b/src/graph_export/rerank_graph_cli_parser.cpp index 80f1561a4a..90699c4b37 100644 --- a/src/graph_export/rerank_graph_cli_parser.cpp +++ b/src/graph_export/rerank_graph_cli_parser.cpp @@ -26,6 +26,7 @@ #include "../capi_frontend/server_settings.hpp" #include "../ovms_exit_codes.hpp" #include "../status.hpp" +#include "graph_queue_cli_options.hpp" namespace ovms { @@ -48,6 +49,7 @@ void RerankGraphCLIParser::createOptions() { "Maximum allowed chunks.", cxxopts::value()->default_value("10000"), "MAX_ALLOWED_CHUNKS"); + addGraphQueueOptions(*options, "rerank"); } void RerankGraphCLIParser::printHelp() { @@ -88,6 +90,7 @@ void RerankGraphCLIParser::prepare(OvmsServerMode serverMode, HFSettingsImpl& hf } else { hfSettings.exportSettings.pluginConfig.numStreams = result->operator[]("num_streams").as(); rerankGraphSettings.maxAllowedChunks = result->operator[]("max_allowed_chunks").as(); + extractGraphQueueOptions(*result, hfSettings); } hfSettings.graphSettings = std::move(rerankGraphSettings); diff --git a/src/graph_export/s2t_graph_cli_parser.cpp b/src/graph_export/s2t_graph_cli_parser.cpp index c516581468..dd90e7a969 100644 --- a/src/graph_export/s2t_graph_cli_parser.cpp +++ b/src/graph_export/s2t_graph_cli_parser.cpp @@ -26,6 +26,7 @@ #include "../capi_frontend/server_settings.hpp" #include "../ovms_exit_codes.hpp" #include "../status.hpp" +#include "graph_queue_cli_options.hpp" namespace ovms { @@ -44,6 +45,7 @@ void SpeechToTextGraphCLIParser::createOptions() { "The number of parallel execution streams to use for the model. Use at least 2 on 2 socket CPU systems.", cxxopts::value()->default_value("1"), "NUM_STREAMS"); + addGraphQueueOptions(*options, "SpeechToText"); } void SpeechToTextGraphCLIParser::printHelp() { @@ -82,6 +84,7 @@ void SpeechToTextGraphCLIParser::prepare(OvmsServerMode serverMode, HFSettingsIm } } else { hfSettings.exportSettings.pluginConfig.numStreams = result->operator[]("num_streams").as(); + extractGraphQueueOptions(*result, hfSettings); } hfSettings.graphSettings = std::move(speechToTextGraphSettings); } diff --git a/src/graph_export/t2s_graph_cli_parser.cpp b/src/graph_export/t2s_graph_cli_parser.cpp index 69f4479a24..06fca6f97d 100644 --- a/src/graph_export/t2s_graph_cli_parser.cpp +++ b/src/graph_export/t2s_graph_cli_parser.cpp @@ -26,6 +26,7 @@ #include "../capi_frontend/server_settings.hpp" #include "../ovms_exit_codes.hpp" #include "../status.hpp" +#include "graph_queue_cli_options.hpp" namespace ovms { @@ -44,6 +45,7 @@ void TextToSpeechGraphCLIParser::createOptions() { "The number of parallel execution streams to use for the model. Use at least 2 on 2 socket CPU systems.", cxxopts::value()->default_value("1"), "NUM_STREAMS"); + addGraphQueueOptions(*options, "TextToSpeech"); } void TextToSpeechGraphCLIParser::printHelp() { @@ -82,6 +84,7 @@ void TextToSpeechGraphCLIParser::prepare(OvmsServerMode serverMode, HFSettingsIm } } else { hfSettings.exportSettings.pluginConfig.numStreams = result->operator[]("num_streams").as(); + extractGraphQueueOptions(*result, hfSettings); } hfSettings.graphSettings = std::move(textToSpeechGraphSettings); } diff --git a/src/mediapipe_internal/BUILD b/src/mediapipe_internal/BUILD index 05169e5563..64e6272b4a 100644 --- a/src/mediapipe_internal/BUILD +++ b/src/mediapipe_internal/BUILD @@ -83,7 +83,6 @@ ovms_cc_library( ":mediapipe_utils", ":outputstreamobserver", ":side_packet_builder", - "//src:libovms_queue", "//src:libovmslogging", "//src:libovms_execution_context", "//src:libovmstimer", diff --git a/src/mediapipe_internal/graphqueue.cpp b/src/mediapipe_internal/graphqueue.cpp index 628ce58a49..68e7eb0b24 100644 --- a/src/mediapipe_internal/graphqueue.cpp +++ b/src/mediapipe_internal/graphqueue.cpp @@ -16,18 +16,14 @@ #include "graphqueue.hpp" #include -#include #include #include #include #include -#include #include -#include #include #include -#include "../queue.hpp" #include "src/python/pythonnoderesources.hpp" #include "src/llm/servable.hpp" @@ -41,54 +37,65 @@ #include "outputstreamobserver.hpp" #include "side_packet_builder.hpp" namespace ovms { -GraphQueue::GraphQueue(const ::mediapipe::CalculatorGraphConfig& config, std::shared_ptr sidePacketMaps, int streamsLength) : - Queue(streamsLength), - sidePacketMaps(sidePacketMaps) { - inferRequests.reserve(streamsLength); - for (auto i = 0; i < streamsLength; ++i) { - // Build observer map locally before constructing GraphHelper (const map) - std::unordered_map> observers; - for (auto& name : config.output_stream()) { - std::string streamName = getStreamName(name); - auto holder = std::make_shared(); - holder->current = std::make_shared(); - observers[streamName] = holder; - } - auto gh = std::make_shared(std::move(observers)); - gh->graph = std::make_unique<::mediapipe::CalculatorGraph>(); - gh->currentTimestamp = ::mediapipe::Timestamp(0); +std::shared_ptr GraphQueue::createOneGraph() { + std::unordered_map> observers; + for (auto& name : config_.output_stream()) { + std::string streamName = getStreamName(name); + auto holder = std::make_shared(); + holder->current = std::make_shared(); + observers.emplace(std::move(streamName), std::move(holder)); + } - auto absStatus = gh->graph->Initialize(config); - if (!absStatus.ok()) { - SPDLOG_ERROR("Graph queue initialization failed: {}", absStatus.ToString()); - throw std::runtime_error(absStatus.ToString()); - } - for (const auto& [streamName, holder] : gh->outStreamObservers) { - // Lambda captures holder (shared_ptr) by value — safe regardless of map layout - absStatus = gh->graph->ObserveOutputStream(streamName, [holder](const ::mediapipe::Packet& packet) -> absl::Status { return holder->current->handlePacket(packet); }); - if (!absStatus.ok()) { - SPDLOG_ERROR("Graph queue ObserveOutputStream failed: {}", absStatus.ToString()); - throw std::runtime_error(absStatus.ToString()); - } - } - for (const auto& [nodeName, _] : sidePacketMaps->genAiServableMap) { - gh->genAiExecutionContextMap[nodeName] = std::make_shared(); - } - std::map inputSidePackets; - buildInputSidePackets(inputSidePackets, *sidePacketMaps); - // Override execution context with per-graph instance - inputSidePackets[LLM_EXECUTION_CONTEXT_SESSION_SIDE_PACKET_TAG] = mediapipe::MakePacket(gh->genAiExecutionContextMap).At(::mediapipe::Timestamp(STARTING_TIMESTAMP_VALUE)); - absStatus = gh->graph->StartRun(inputSidePackets); + auto gh = std::make_shared(std::move(observers)); + gh->graph = std::make_unique<::mediapipe::CalculatorGraph>(); + gh->currentTimestamp = ::mediapipe::Timestamp(0); + + auto absStatus = gh->graph->Initialize(config_); + if (!absStatus.ok()) { + SPDLOG_ERROR("Graph queue initialization failed: {}", absStatus.ToString()); + throw std::runtime_error(absStatus.ToString()); + } + for (const auto& [streamName, holder] : gh->outStreamObservers) { + absStatus = gh->graph->ObserveOutputStream(streamName, [holder](const ::mediapipe::Packet& packet) -> absl::Status { return holder->current->handlePacket(packet); }); if (!absStatus.ok()) { - SPDLOG_ERROR("Graph queue StartRun failed: {}", absStatus.ToString()); + SPDLOG_ERROR("Graph queue ObserveOutputStream failed: {}", absStatus.ToString()); throw std::runtime_error(absStatus.ToString()); } - inferRequests.emplace_back(std::move(gh)); } + for (const auto& [nodeName, _] : sidePacketMaps_->genAiServableMap) { + gh->genAiExecutionContextMap[nodeName] = std::make_shared(); + } + std::map inputSidePackets; + buildInputSidePackets(inputSidePackets, *sidePacketMaps_); + inputSidePackets[LLM_EXECUTION_CONTEXT_SESSION_SIDE_PACKET_TAG] = mediapipe::MakePacket(gh->genAiExecutionContextMap).At(::mediapipe::Timestamp(STARTING_TIMESTAMP_VALUE)); + absStatus = gh->graph->StartRun(inputSidePackets); + if (!absStatus.ok()) { + SPDLOG_ERROR("Graph queue StartRun failed: {}", absStatus.ToString()); + throw std::runtime_error(absStatus.ToString()); + } + return gh; } + +GraphQueue::GraphQueue(const ::mediapipe::CalculatorGraphConfig& config, std::shared_ptr sidePacketMaps, int initialSize, int maxSize) : + config_(config), + sidePacketMaps_(sidePacketMaps), + maxSize_(maxSize) { + inferRequests_.resize(maxSize_); + for (int i = 0; i < initialSize; ++i) { + inferRequests_[i] = createOneGraph(); + idleIds_.push(i); + } + currentSize_.store(initialSize, std::memory_order_relaxed); + SPDLOG_DEBUG("Graph queue created with initial size {} and max size {}", initialSize, maxSize_); +} + GraphQueue::~GraphQueue() { - for (auto& graphHelper : inferRequests) { + int size = currentSize_.load(std::memory_order_relaxed); + for (int i = 0; i < size; ++i) { + auto& graphHelper = inferRequests_[i]; + if (!graphHelper) + continue; auto absStatus = graphHelper->graph->WaitUntilIdle(); if (!absStatus.ok()) { SPDLOG_DEBUG("Graph queue WaitUntilIdle error: {}", absStatus.ToString()); @@ -105,4 +112,59 @@ GraphQueue::~GraphQueue() { graphHelper->graph.reset(); } } + +std::future GraphQueue::getIdleStream() { + std::promise promise; + std::future future = promise.get_future(); + + std::unique_lock lk(mutex_); + if (!idleIds_.empty()) { + int id = idleIds_.front(); + idleIds_.pop(); + lk.unlock(); + promise.set_value(id); + return future; + } + + // No idle graph available — try to expand + int currentSize = currentSize_.load(std::memory_order_relaxed); + if (currentSize < maxSize_) { + int newId = currentSize; + currentSize_.store(currentSize + 1, std::memory_order_relaxed); + lk.unlock(); + // Create graph outside the lock (expensive but only blocks this request) + try { + inferRequests_[newId] = createOneGraph(); + SPDLOG_DEBUG("Graph queue expanded to size {}/{}", newId + 1, maxSize_); + } catch (const std::exception& e) { + // Rollback size on failure + currentSize_.fetch_sub(1, std::memory_order_relaxed); + promise.set_exception(std::make_exception_ptr(e)); + return future; + } + promise.set_value(newId); + return future; + } + + // At max capacity — must wait for a graph to be returned + waiters_.push(std::move(promise)); + return future; +} + +void GraphQueue::returnStream(int id) { + std::unique_lock lk(mutex_); + if (!waiters_.empty()) { + std::promise waiter = std::move(waiters_.front()); + waiters_.pop(); + lk.unlock(); + waiter.set_value(id); + return; + } + idleIds_.push(id); +} + +std::shared_ptr& GraphQueue::getInferRequest(int id) { + return inferRequests_[id]; +} + } // namespace ovms diff --git a/src/mediapipe_internal/graphqueue.hpp b/src/mediapipe_internal/graphqueue.hpp index 9e85dc3385..c837ceec18 100644 --- a/src/mediapipe_internal/graphqueue.hpp +++ b/src/mediapipe_internal/graphqueue.hpp @@ -28,8 +28,6 @@ #include #include -#include "src/queue.hpp" - #pragma warning(push) #pragma warning(disable : 4324 6001 6385 6386 6326 6011 4309 4005 4456 6246) #pragma GCC diagnostic push @@ -66,14 +64,36 @@ struct GraphHelper { currentTimestamp(gh.currentTimestamp) {} GraphHelper& operator=(GraphHelper&&) = delete; }; -// we need to keep Graph alive during MP reload hence shared_ptr -class GraphQueue : public Queue> { -public: // XXX TODO make private? we need to access in mediapipegraphdefinition to set side packets though - std::shared_ptr sidePacketMaps; +// Dynamic graph pool that starts at initialSize and expands on demand up to maxSize. +// Does NOT inherit from Queue — isolated from the OV model inference path. +class GraphQueue { public: - GraphQueue(const ::mediapipe::CalculatorGraphConfig& config, std::shared_ptr sidePacketMaps, int streamsLength); + GraphQueue(const ::mediapipe::CalculatorGraphConfig& config, std::shared_ptr sidePacketMaps, int initialSize, int maxSize); ~GraphQueue(); + + std::future getIdleStream(); + void returnStream(int id); + std::shared_ptr& getInferRequest(int id); + + int getCurrentSize() const { return currentSize_.load(std::memory_order_relaxed); } + int getMaxSize() const { return maxSize_; } + +private: + std::shared_ptr createOneGraph(); + + const ::mediapipe::CalculatorGraphConfig config_; + std::shared_ptr sidePacketMaps_; + const int maxSize_; + std::atomic currentSize_{0}; + + // Pre-allocated to maxSize_; slots filled as pool expands. + std::vector> inferRequests_; + + // Idle stream management + std::mutex mutex_; + std::queue idleIds_; + std::queue> waiters_; }; struct GraphIdGuard { diff --git a/src/mediapipe_internal/mediapipegraphconfig.hpp b/src/mediapipe_internal/mediapipegraphconfig.hpp index 193576b416..ab27d0eb6e 100644 --- a/src/mediapipe_internal/mediapipegraphconfig.hpp +++ b/src/mediapipe_internal/mediapipegraphconfig.hpp @@ -91,7 +91,7 @@ class MediapipeGraphConfig { std::string currentGraphPbTxtMD5; /** - * @brief Graph queue size configuration. + * @brief Graph queue initial size configuration. * * - std::nullopt => user did not set this field * - int => user explicitly set a numeric size @@ -99,6 +99,15 @@ class MediapipeGraphConfig { */ GraphQueueSizeValue graphQueueSize; + /** + * @brief Graph queue max size configuration (pool expansion ceiling). + * + * - std::nullopt => defaults to graphQueueSize (no expansion) + * - int => user explicitly set a numeric max + * - GraphQueueAutoTag => user explicitly set "AUTO" + */ + GraphQueueSizeValue graphQueueMaxSize; + public: /** * @brief Construct a new Mediapie Graph configuration object @@ -245,6 +254,13 @@ class MediapipeGraphConfig { return this->graphQueueSize; } + /** + * @brief Get the graph queue max size setting. + */ + const GraphQueueSizeValue& getGraphQueueMaxSize() const { + return this->graphQueueMaxSize; + } + /** * @brief Set the graph queue size to an explicit numeric value. */ @@ -260,29 +276,64 @@ class MediapipeGraphConfig { } /** - * @brief Resolve the graph queue size setting to a concrete integer. + * @brief Set the graph queue max size to an explicit numeric value. + */ + void setGraphQueueMaxSize(int size) { + this->graphQueueMaxSize = size; + } + + /** + * @brief Set the graph queue max size to AUTO. + */ + void setGraphQueueMaxSizeAuto() { + this->graphQueueMaxSize = GraphQueueAutoTag{}; + } + + /** + * @brief Resolve a GraphQueueSizeValue to a concrete integer. * * Returns: - * -1 => queue creation disabled (user set -1 or not set) + * -1 => not set / disabled * >0 => explicit size or resolved AUTO - * - * Value 0 is rejected at parse time (resolveGraphQueueSize). - * When not set (nullopt): returns -1 (queue disabled). - * When AUTO: returns hardware_concurrency() or 16 as fallback. */ - int getInitialQueueSize() const { - if (!this->graphQueueSize.has_value()) { - return -1; // not set - queue disabled by default + static int resolveQueueSizeValue(const GraphQueueSizeValue& val) { + if (!val.has_value()) { + return -1; } - if (std::holds_alternative(*this->graphQueueSize)) { + if (std::holds_alternative(*val)) { unsigned int hwThreads = std::thread::hardware_concurrency(); if (hwThreads == 0) { - SPDLOG_WARN("std::thread::hardware_concurrency() returned 0 (unknown). Falling back to graph queue size 16."); + SPDLOG_WARN("std::thread::hardware_concurrency() returned 0 (unknown). Falling back to 16."); return 16; } return static_cast(hwThreads); } - return std::get(*this->graphQueueSize); + return std::get(*val); + } + + /** + * @brief Get the resolved initial queue size. + * + * Returns: + * -1 => queue creation disabled (not set) + * >0 => explicit size or resolved AUTO + */ + int getResolvedQueueSize() const { + return resolveQueueSizeValue(this->graphQueueSize); + } + + /** + * @brief Get the resolved max queue size (pool expansion ceiling). + * + * If not explicitly set, defaults to getResolvedQueueSize() (no expansion). + */ + int getResolvedMaxQueueSize() const { + int maxVal = resolveQueueSizeValue(this->graphQueueMaxSize); + if (maxVal < 0) { + // Not set — default to same as queue size (no expansion) + return getResolvedQueueSize(); + } + return maxVal; } bool isReloadRequired(const MediapipeGraphConfig& rhs) const; diff --git a/src/mediapipe_internal/mediapipegraphdefinition.cpp b/src/mediapipe_internal/mediapipegraphdefinition.cpp index 40da82454a..9812ded2ea 100644 --- a/src/mediapipe_internal/mediapipegraphdefinition.cpp +++ b/src/mediapipe_internal/mediapipegraphdefinition.cpp @@ -80,44 +80,72 @@ Status MediapipeGraphDefinition::validateForConfigFileExistence() { } Status MediapipeGraphDefinition::resolveGraphQueueSize() { - // 1. Explicit pbtxt directive: # OVMS_GRAPH_QUEUE_SIZE: - // Always honored regardless of env var or calculator checks. - // Value -1 disables the queue, AUTO or positive integer enables it. - // Value 0 is rejected as invalid. - static const std::regex directiveRegex( - R"((?:^|\n)\s*#\s*OVMS_GRAPH_QUEUE_SIZE\s*:\s*(\S+)\s*(?:\r?\n|$))"); + // 1. Parse OVMS_GRAPH_INITIAL_QUEUE_SIZE directive (initial pool size) + static const std::regex sizeRegex( + R"((?:^|\n)\s*#\s*OVMS_GRAPH_INITIAL_QUEUE_SIZE\s*:\s*(\S+)\s*(?:\r?\n|$))"); + // 2. Parse OVMS_GRAPH_QUEUE_MAX_SIZE directive (pool expansion ceiling) + static const std::regex maxSizeRegex( + R"((?:^|\n)\s*#\s*OVMS_GRAPH_QUEUE_MAX_SIZE\s*:\s*(\S+)\s*(?:\r?\n|$))"); + std::smatch match; - if (std::regex_search(this->chosenConfig, match, directiveRegex)) { + + if (std::regex_search(this->chosenConfig, match, sizeRegex)) { std::string value = match[1].str(); if (value == "AUTO") { this->mgconfig.setGraphQueueSizeAuto(); - return StatusCode::OK; - } - auto parsed = stoi32(value); - if (!parsed.has_value()) { - SPDLOG_ERROR("Invalid OVMS_GRAPH_QUEUE_SIZE value: '{}'. Expected integer or 'AUTO'.", value); - return StatusCode::MEDIAPIPE_GRAPH_CONFIG_FILE_INVALID; - } - int queueSize = parsed.value(); - if (queueSize < -1 || queueSize == 0) { - SPDLOG_ERROR("Invalid OVMS_GRAPH_QUEUE_SIZE value: {}. Must be -1 (disabled) or a positive integer.", queueSize); - return StatusCode::MEDIAPIPE_GRAPH_CONFIG_FILE_INVALID; - } - if (queueSize == -1) { - SPDLOG_DEBUG("Graph queue explicitly disabled (OVMS_GRAPH_QUEUE_SIZE=-1) for mediapipe: {}", getName()); - return StatusCode::OK; - } - unsigned int maxThreads = std::thread::hardware_concurrency(); - if (maxThreads > 0 && queueSize > static_cast(maxThreads)) { - SPDLOG_WARN("OVMS_GRAPH_QUEUE_SIZE value: {} exceeds available hardware threads: {}. Clamping to {}.", queueSize, maxThreads, maxThreads); - queueSize = static_cast(maxThreads); + } else { + auto parsed = stoi32(value); + if (!parsed.has_value()) { + SPDLOG_ERROR("Invalid OVMS_GRAPH_INITIAL_QUEUE_SIZE value: '{}'. Expected integer or 'AUTO'.", value); + return StatusCode::MEDIAPIPE_GRAPH_CONFIG_FILE_INVALID; + } + int queueSize = parsed.value(); + if (queueSize < -1 || queueSize == 0) { + SPDLOG_ERROR("Invalid OVMS_GRAPH_INITIAL_QUEUE_SIZE value: {}. Must be -1 (disabled) or a positive integer.", queueSize); + return StatusCode::MEDIAPIPE_GRAPH_CONFIG_FILE_INVALID; + } + if (queueSize == -1) { + SPDLOG_DEBUG("Graph queue explicitly disabled (OVMS_GRAPH_INITIAL_QUEUE_SIZE=-1) for mediapipe: {}", getName()); + return StatusCode::OK; + } + unsigned int maxThreads = std::thread::hardware_concurrency(); + if (maxThreads > 0 && queueSize > static_cast(maxThreads)) { + SPDLOG_WARN("OVMS_GRAPH_INITIAL_QUEUE_SIZE value: {} exceeds available hardware threads: {}. Clamping to {}.", queueSize, maxThreads, maxThreads); + queueSize = static_cast(maxThreads); + } + this->mgconfig.setGraphQueueSize(queueSize); } - this->mgconfig.setGraphQueueSize(queueSize); + } else { + // No directive — queue disabled by default. + SPDLOG_DEBUG("Graph queue disabled by default for mediapipe: {}. Add '# OVMS_GRAPH_INITIAL_QUEUE_SIZE: ' directive in graph.pbtxt to enable.", getName()); return StatusCode::OK; } - // 2. Default: queue disabled unless graph explicitly provides directive. - SPDLOG_DEBUG("Graph queue disabled by default for mediapipe: {}. Add '# OVMS_GRAPH_QUEUE_SIZE: ' directive in graph.pbtxt to enable.", getName()); + // Parse max size (optional — defaults to same as initial size if absent) + if (std::regex_search(this->chosenConfig, match, maxSizeRegex)) { + std::string value = match[1].str(); + if (value == "AUTO") { + this->mgconfig.setGraphQueueMaxSizeAuto(); + } else { + auto parsed = stoi32(value); + if (!parsed.has_value()) { + SPDLOG_ERROR("Invalid OVMS_GRAPH_QUEUE_MAX_SIZE value: '{}'. Expected integer or 'AUTO'.", value); + return StatusCode::MEDIAPIPE_GRAPH_CONFIG_FILE_INVALID; + } + int maxSize = parsed.value(); + if (maxSize < 1) { + SPDLOG_ERROR("Invalid OVMS_GRAPH_QUEUE_MAX_SIZE value: {}. Must be a positive integer.", maxSize); + return StatusCode::MEDIAPIPE_GRAPH_CONFIG_FILE_INVALID; + } + unsigned int maxThreads = std::thread::hardware_concurrency(); + if (maxThreads > 0 && maxSize > static_cast(maxThreads)) { + SPDLOG_WARN("OVMS_GRAPH_QUEUE_MAX_SIZE value: {} exceeds available hardware threads: {}. Clamping to {}.", maxSize, maxThreads, maxThreads); + maxSize = static_cast(maxThreads); + } + this->mgconfig.setGraphQueueMaxSize(maxSize); + } + } + return StatusCode::OK; } @@ -222,13 +250,18 @@ Status MediapipeGraphDefinition::validate(const ServableNameChecker& checker) { } Status MediapipeGraphDefinition::initializeQueueIfRequired() { - int initialQueueSize = this->mgconfig.getInitialQueueSize(); - if (initialQueueSize < 0) { - SPDLOG_DEBUG("Graph queue creation disabled for mediapipe: {} (graph_queue_size={})", getName(), initialQueueSize); + int initialSize = this->mgconfig.getResolvedQueueSize(); + if (initialSize < 0) { + SPDLOG_DEBUG("Graph queue creation disabled for mediapipe: {} (graph_queue_size={})", getName(), initialSize); return StatusCode::OK; } + int maxQueueSize = this->mgconfig.getResolvedMaxQueueSize(); + if (maxQueueSize < initialSize) { + SPDLOG_WARN("OVMS_GRAPH_QUEUE_MAX_SIZE ({}) < OVMS_GRAPH_QUEUE_SIZE ({}). Clamping max to initial size.", maxQueueSize, initialSize); + maxQueueSize = initialSize; + } try { - this->queue = std::make_shared(this->config, this->sidePacketMaps, initialQueueSize); + this->queue = std::make_shared(this->config, this->sidePacketMaps, initialSize, maxQueueSize); } catch (const std::exception& e) { SPDLOG_LOGGER_ERROR(modelmanager_logger, "Failed to create graph queue for mediapipe: {} error: {}", getName(), e.what()); return StatusCode::INTERNAL_ERROR; @@ -236,7 +269,7 @@ Status MediapipeGraphDefinition::initializeQueueIfRequired() { SPDLOG_LOGGER_ERROR(modelmanager_logger, "Failed to create graph queue for mediapipe: {} unknown error", getName()); return StatusCode::INTERNAL_ERROR; } - SPDLOG_DEBUG("Created graph queue with size {} for mediapipe: {}", initialQueueSize, getName()); + SPDLOG_DEBUG("Created graph queue with initial size {} max size {} for mediapipe: {}", initialSize, maxQueueSize, getName()); return StatusCode::OK; } diff --git a/src/test/graph_export_test.cpp b/src/test/graph_export_test.cpp index 32495c43f7..31f81e8543 100644 --- a/src/test/graph_export_test.cpp +++ b/src/test/graph_export_test.cpp @@ -14,6 +14,7 @@ // limitations under the License. //***************************************************************************** #include +#include #include #include @@ -505,11 +506,34 @@ class GraphCreationTest : public TestWithTempDir { TestWithTempDir::TearDown(); } - std::string getExpectedGraphQueueSizeDirective(const ovms::HFSettingsImpl& hfSettings) const { + std::optional getExpectedGraphQueueSizeDirective(const ovms::HFSettingsImpl& hfSettings) const { + if (hfSettings.exportSettings.graphInitialQueueSize.has_value()) { + return hfSettings.exportSettings.graphInitialQueueSize.value(); + } + if (hfSettings.task == ovms::IMAGE_GENERATION_GRAPH) { + return "1"; + } + if (hfSettings.task == ovms::TEXT_GENERATION_GRAPH || + hfSettings.task == ovms::EMBEDDINGS_GRAPH || + hfSettings.task == ovms::RERANK_GRAPH) { + return "1"; + } + return std::nullopt; + } + + std::optional getExpectedGraphQueueMaxSizeDirective(const ovms::HFSettingsImpl& hfSettings) const { + if (hfSettings.exportSettings.graphQueueMaxSize.has_value()) { + return hfSettings.exportSettings.graphQueueMaxSize.value(); + } if (hfSettings.task == ovms::IMAGE_GENERATION_GRAPH) { return "1"; } - return "AUTO"; + if (hfSettings.task == ovms::TEXT_GENERATION_GRAPH || + hfSettings.task == ovms::EMBEDDINGS_GRAPH || + hfSettings.task == ovms::RERANK_GRAPH) { + return "AUTO"; + } + return std::nullopt; } std::string createGraphAndReadContents(const ovms::HFSettingsImpl& hfSettings) { @@ -524,17 +548,37 @@ class GraphCreationTest : public TestWithTempDir { } void assertGraphQueueHeader(const std::string& graphContents, const ovms::HFSettingsImpl& hfSettings) { - const std::string queueLinePrefix = "# OVMS_GRAPH_QUEUE_SIZE: "; + const std::string queueLinePrefix = "# OVMS_GRAPH_INITIAL_QUEUE_SIZE: "; + const std::string maxLinePrefix = "# OVMS_GRAPH_QUEUE_MAX_SIZE: "; + auto expectedDirective = getExpectedGraphQueueSizeDirective(hfSettings); + auto expectedMaxDirective = getExpectedGraphQueueMaxSizeDirective(hfSettings); auto firstLineEnd = graphContents.find("\n"); ASSERT_NE(firstLineEnd, std::string::npos) << graphContents; auto queueLineStart = firstLineEnd + 1; auto queueLineEnd = graphContents.find("\n", queueLineStart); ASSERT_NE(queueLineEnd, std::string::npos) << graphContents; - std::string actualQueueLine = graphContents.substr(queueLineStart, queueLineEnd - queueLineStart); - ASSERT_EQ(0, actualQueueLine.rfind(queueLinePrefix, 0)) << graphContents; - std::string expectedQueueLine = queueLinePrefix + getExpectedGraphQueueSizeDirective(hfSettings); - ASSERT_EQ(expectedQueueLine, actualQueueLine) << graphContents; + std::string secondLine = graphContents.substr(queueLineStart, queueLineEnd - queueLineStart); + if (!expectedDirective.has_value()) { + // Directive should NOT be present + ASSERT_NE(0, secondLine.rfind(queueLinePrefix, 0)) + << "Queue directive should not be present for this graph type. Got: " << secondLine; + return; + } + ASSERT_EQ(0, secondLine.rfind(queueLinePrefix, 0)) << graphContents; + std::string expectedQueueLine = queueLinePrefix + expectedDirective.value(); + ASSERT_EQ(expectedQueueLine, secondLine) << graphContents; + + // Check max size line + if (expectedMaxDirective.has_value()) { + auto maxLineStart = queueLineEnd + 1; + auto maxLineEnd = graphContents.find("\n", maxLineStart); + ASSERT_NE(maxLineEnd, std::string::npos) << graphContents; + std::string thirdLine = graphContents.substr(maxLineStart, maxLineEnd - maxLineStart); + ASSERT_EQ(0, thirdLine.rfind(maxLinePrefix, 0)) << graphContents; + std::string expectedMaxLine = maxLinePrefix + expectedMaxDirective.value(); + ASSERT_EQ(expectedMaxLine, thirdLine) << graphContents; + } } void assertCreatedGraphEquals(const ovms::HFSettingsImpl& hfSettings, const std::string& expectedGraphContents, bool assertVersion = false) { @@ -546,7 +590,7 @@ class GraphCreationTest : public TestWithTempDir { ASSERT_EQ(expectedGraphContents, removeGeneratedGraphHeaders(graphContents)) << graphContents; } - // Removes generated graph header lines (version and optional queue size directive) + // Removes generated graph header lines (version and optional queue size directives) // which differ across build/runtime setup. std::string removeGeneratedGraphHeaders(std::string input) { auto firstLineEnd = input.find("\n"); @@ -555,13 +599,22 @@ class GraphCreationTest : public TestWithTempDir { } input.erase(0, firstLineEnd + 1); - const std::string queueLinePrefix = "# OVMS_GRAPH_QUEUE_SIZE:"; + const std::string queueLinePrefix = "# OVMS_GRAPH_INITIAL_QUEUE_SIZE:"; if (input.rfind(queueLinePrefix, 0) == 0) { - auto secondLineEnd = input.find("\n"); - if (secondLineEnd == std::string::npos) { + auto lineEnd = input.find("\n"); + if (lineEnd == std::string::npos) { + return ""; + } + input.erase(0, lineEnd + 1); + } + + const std::string maxLinePrefix = "# OVMS_GRAPH_QUEUE_MAX_SIZE:"; + if (input.rfind(maxLinePrefix, 0) == 0) { + auto lineEnd = input.find("\n"); + if (lineEnd == std::string::npos) { return ""; } - input.erase(0, secondLineEnd + 1); + input.erase(0, lineEnd + 1); } return input; } diff --git a/src/test/graphqueue_test.cpp b/src/test/graphqueue_test.cpp new file mode 100644 index 0000000000..28c6cf9bd1 --- /dev/null +++ b/src/test/graphqueue_test.cpp @@ -0,0 +1,319 @@ +//***************************************************************************** +// Copyright 2025 Intel Corporation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//***************************************************************************** +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "../mediapipe_internal/graphqueue.hpp" +#include "../mediapipe_internal/graph_side_packets.hpp" + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wdeprecated-declarations" +#include "mediapipe/framework/calculator_graph.h" +#include "mediapipe/framework/port/parse_text_proto.h" +#pragma GCC diagnostic pop + +using namespace ovms; +using namespace std::chrono_literals; + +// Minimal passthrough graph config for testing +static const char* kPassthroughGraphConfig = R"pb( + input_stream: "PASSTHROUGH:in" + output_stream: "PASSTHROUGH:out" + node { + calculator: "PassThroughCalculator" + input_stream: "PASSTHROUGH:in" + output_stream: "PASSTHROUGH:out" + } +)pb"; + +class GraphQueueExpansionTest : public ::testing::Test { +protected: + ::mediapipe::CalculatorGraphConfig config; + std::shared_ptr sidePackets; + + void SetUp() override { + config = mediapipe::ParseTextProtoOrDie<::mediapipe::CalculatorGraphConfig>(kPassthroughGraphConfig); + sidePackets = std::make_shared(); + } +}; + +// Basic: pool starts at initial size +TEST_F(GraphQueueExpansionTest, StartsAtInitialSize) { + auto queue = std::make_shared(config, sidePackets, 1, 4); + EXPECT_EQ(1, queue->getCurrentSize()); + EXPECT_EQ(4, queue->getMaxSize()); +} + +// Single request works on initial pool +TEST_F(GraphQueueExpansionTest, SingleRequestNoExpansion) { + auto queue = std::make_shared(config, sidePackets, 1, 4); + { + GraphIdGuard guard(queue); + EXPECT_EQ(0, guard.id); + EXPECT_NE(nullptr, guard.gh); + EXPECT_NE(nullptr, &guard.graph); + } + // After return, size should still be 1 + EXPECT_EQ(1, queue->getCurrentSize()); +} + +// Two concurrent requests expand pool from 1 to 2 +TEST_F(GraphQueueExpansionTest, ConcurrentRequestsExpandPool) { + auto queue = std::make_shared(config, sidePackets, 1, 4); + + // Acquire first graph — pool size stays at 1 + auto future0 = queue->getIdleStream(); + int id0 = future0.get(); + EXPECT_EQ(0, id0); + EXPECT_EQ(1, queue->getCurrentSize()); + + // Second request should trigger expansion + auto future1 = queue->getIdleStream(); + int id1 = future1.get(); + EXPECT_EQ(1, id1); + EXPECT_EQ(2, queue->getCurrentSize()); + + // Return both + queue->returnStream(id0); + queue->returnStream(id1); +} + +// Expansion stops at max size, additional requests block +TEST_F(GraphQueueExpansionTest, ExpansionStopsAtMax) { + auto queue = std::make_shared(config, sidePackets, 1, 2); + + auto f0 = queue->getIdleStream(); + int id0 = f0.get(); + auto f1 = queue->getIdleStream(); + int id1 = f1.get(); + EXPECT_EQ(2, queue->getCurrentSize()); + + // Third request should block since max=2 and both are in use + std::atomic thirdCompleted{false}; + std::thread t([&]() { + auto f2 = queue->getIdleStream(); + f2.get(); + thirdCompleted.store(true); + }); + + // Give the thread time to block + std::this_thread::sleep_for(50ms); + EXPECT_FALSE(thirdCompleted.load()); + + // Return one — should unblock the waiting request + queue->returnStream(id0); + t.join(); + EXPECT_TRUE(thirdCompleted.load()); + EXPECT_EQ(2, queue->getCurrentSize()); // no expansion beyond max + + queue->returnStream(id1); +} + +// Multiple threads expanding concurrently — all get unique IDs, no overflow +TEST_F(GraphQueueExpansionTest, MultiThreadExpansionThreadSafe) { + constexpr int maxSize = 8; + auto queue = std::make_shared(config, sidePackets, 1, maxSize); + + std::vector> futures; + std::vector threads; + + std::atomic readyCount{0}; + std::atomic go{false}; + + // Launch maxSize threads that all try to acquire simultaneously + for (int i = 0; i < maxSize; ++i) { + threads.emplace_back([&, i]() { + readyCount.fetch_add(1); + while (!go.load()) { + std::this_thread::yield(); + } + auto f = queue->getIdleStream(); + int id = f.get(); + // Hold the graph briefly + std::this_thread::sleep_for(10ms); + queue->returnStream(id); + }); + } + + // Wait for all threads to be ready, then release + while (readyCount.load() < maxSize) { + std::this_thread::yield(); + } + go.store(true); + + for (auto& t : threads) { + t.join(); + } + + // Pool should have expanded to serve all concurrent requests + EXPECT_LE(queue->getCurrentSize(), maxSize); + EXPECT_GE(queue->getCurrentSize(), 1); +} + +// GraphIdGuard RAII correctly returns graph on destruction +TEST_F(GraphQueueExpansionTest, GraphIdGuardReturnsOnDestruction) { + auto queue = std::make_shared(config, sidePackets, 1, 1); + { + GraphIdGuard guard(queue); + EXPECT_EQ(0, guard.id); + } + // Graph should be available again + auto f = queue->getIdleStream(); + int id = f.get(); + EXPECT_EQ(0, id); + queue->returnStream(id); +} + +// GraphIdGuard survives queue destruction (weak_ptr pattern) +TEST_F(GraphQueueExpansionTest, GraphIdGuardSurvivesQueueDestruction) { + auto queue = std::make_shared(config, sidePackets, 1, 1); + auto guard = std::make_unique(queue); + EXPECT_EQ(0, guard->id); + + // Destroy the queue while guard is alive + queue.reset(); + + // Guard should still hold the graph helper alive (shared_ptr prevents deallocation) + EXPECT_NE(nullptr, guard->gh); + // Note: the underlying graph is shut down by ~GraphQueue, so graph ptr is null. + // This is expected — in-flight requests complete before queue destruction. + + // Destruction of guard should not crash (weak_ptr expired, returnStream is no-op) + guard.reset(); +} + +// Stress test: many threads doing acquire→use→return in a loop during expansion +// Verifies no ID duplication, no data corruption, no deadlocks +TEST_F(GraphQueueExpansionTest, StressInterleavedExpandAndReturn) { + constexpr int maxSize = 8; + constexpr int numThreads = 16; // more threads than max pool size + constexpr int iterationsPerThread = 50; + auto queue = std::make_shared(config, sidePackets, 1, maxSize); + + std::atomic readyCount{0}; + std::atomic go{false}; + std::atomic totalAcquired{0}; + + // Track which IDs are currently held — detect duplicates + std::vector> idInUse(maxSize); + for (auto& a : idInUse) + a.store(false); + + std::vector threads; + for (int t = 0; t < numThreads; ++t) { + threads.emplace_back([&]() { + readyCount.fetch_add(1); + while (!go.load()) { + std::this_thread::yield(); + } + for (int i = 0; i < iterationsPerThread; ++i) { + auto f = queue->getIdleStream(); + int id = f.get(); + + // Verify ID is in valid range + ASSERT_GE(id, 0); + ASSERT_LT(id, maxSize); + + // Verify no other thread holds this ID + bool wasInUse = idInUse[id].exchange(true); + ASSERT_FALSE(wasInUse) << "ID " << id << " was already in use — duplicate assignment!"; + + // Verify the GraphHelper at this slot is valid + auto& gh = queue->getInferRequest(id); + ASSERT_NE(nullptr, gh); + ASSERT_NE(nullptr, gh->graph); + + totalAcquired.fetch_add(1); + + // Simulate some work + std::this_thread::sleep_for(std::chrono::microseconds(100)); + + // Release + idInUse[id].store(false); + queue->returnStream(id); + } + }); + } + + while (readyCount.load() < numThreads) { + std::this_thread::yield(); + } + go.store(true); + + for (auto& t : threads) { + t.join(); + } + + EXPECT_EQ(numThreads * iterationsPerThread, totalAcquired.load()); + EXPECT_LE(queue->getCurrentSize(), maxSize); + // All IDs should be returned (idle) + for (int i = 0; i < queue->getCurrentSize(); ++i) { + EXPECT_FALSE(idInUse[i].load()); + } +} + +// Verify returned IDs are properly recycled (no unbounded growth) +TEST_F(GraphQueueExpansionTest, IdsAreRecycledNotGrowing) { + auto queue = std::make_shared(config, sidePackets, 2, 2); + + // Acquire and return many times — should always get IDs 0 or 1 + std::set seenIds; + for (int i = 0; i < 100; ++i) { + auto f = queue->getIdleStream(); + int id = f.get(); + seenIds.insert(id); + queue->returnStream(id); + } + // Only IDs 0 and 1 should ever appear + EXPECT_EQ(2u, seenIds.size()); + EXPECT_TRUE(seenIds.count(0)); + EXPECT_TRUE(seenIds.count(1)); +} + +// Verify expansion under load: each new slot gets a distinct GraphHelper +TEST_F(GraphQueueExpansionTest, EachSlotHasDistinctGraphHelper) { + constexpr int maxSize = 4; + auto queue = std::make_shared(config, sidePackets, 1, maxSize); + + // Hold all graphs simultaneously to force full expansion + std::vector heldIds; + std::set helperPtrs; + for (int i = 0; i < maxSize; ++i) { + auto f = queue->getIdleStream(); + int id = f.get(); + heldIds.push_back(id); + auto& gh = queue->getInferRequest(id); + ASSERT_NE(nullptr, gh); + helperPtrs.insert(gh.get()); + } + + // All pointers must be distinct + EXPECT_EQ(static_cast(maxSize), helperPtrs.size()) + << "Expected " << maxSize << " distinct GraphHelper instances, got " << helperPtrs.size(); + + // Return all + for (int id : heldIds) { + queue->returnStream(id); + } + EXPECT_EQ(maxSize, queue->getCurrentSize()); +} diff --git a/src/test/llm/lm_cb_regular_queue.pbtxt b/src/test/llm/lm_cb_regular_queue.pbtxt index 60ef13f6b7..ef3b4b7c0e 100644 --- a/src/test/llm/lm_cb_regular_queue.pbtxt +++ b/src/test/llm/lm_cb_regular_queue.pbtxt @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # -# OVMS_GRAPH_QUEUE_SIZE: 1 +# OVMS_GRAPH_INITIAL_QUEUE_SIZE: 1 input_stream: "HTTP_REQUEST_PAYLOAD:input" output_stream: "HTTP_RESPONSE_PAYLOAD:output" node { diff --git a/src/test/mediapipe/graph_gpt_with_queue.pbtxt b/src/test/mediapipe/graph_gpt_with_queue.pbtxt index 43c2ef68c1..347298599b 100644 --- a/src/test/mediapipe/graph_gpt_with_queue.pbtxt +++ b/src/test/mediapipe/graph_gpt_with_queue.pbtxt @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # -# OVMS_GRAPH_QUEUE_SIZE: 1 +# OVMS_GRAPH_INITIAL_QUEUE_SIZE: 1 input_stream: "HTTP_REQUEST_PAYLOAD:input" output_stream: "HTTP_RESPONSE_PAYLOAD:output" diff --git a/src/test/mediapipe/graph_queue_dummyadapterfull_dummyinputnames.pbtxt b/src/test/mediapipe/graph_queue_dummyadapterfull_dummyinputnames.pbtxt index 2a5016a7fb..5dd9fc4d1b 100644 --- a/src/test/mediapipe/graph_queue_dummyadapterfull_dummyinputnames.pbtxt +++ b/src/test/mediapipe/graph_queue_dummyadapterfull_dummyinputnames.pbtxt @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # -# OVMS_GRAPH_QUEUE_SIZE: 16 +# OVMS_GRAPH_INITIAL_QUEUE_SIZE: 16 input_stream: "custom_dummy_input" output_stream: "custom_dummy_output" node { diff --git a/src/test/mediapipe/graph_queue_dummyadapterfull_dummyinputnames_newpath.pbtxt b/src/test/mediapipe/graph_queue_dummyadapterfull_dummyinputnames_newpath.pbtxt index 2a5016a7fb..5dd9fc4d1b 100644 --- a/src/test/mediapipe/graph_queue_dummyadapterfull_dummyinputnames_newpath.pbtxt +++ b/src/test/mediapipe/graph_queue_dummyadapterfull_dummyinputnames_newpath.pbtxt @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # -# OVMS_GRAPH_QUEUE_SIZE: 16 +# OVMS_GRAPH_INITIAL_QUEUE_SIZE: 16 input_stream: "custom_dummy_input" output_stream: "custom_dummy_output" node { diff --git a/src/test/mediapipeflow_test.cpp b/src/test/mediapipeflow_test.cpp index 12784eaf8b..ac172bc48d 100644 --- a/src/test/mediapipeflow_test.cpp +++ b/src/test/mediapipeflow_test.cpp @@ -2728,7 +2728,7 @@ class MediapipeSerialization : public ::testing::Test { const ::mediapipe::CalculatorGraphConfig config; this->reporter = std::make_unique(nullptr, nullptr, ""); // disabled reporter sidePackets = std::make_shared(); - queue = std::make_shared(config, sidePackets, 1); + queue = std::make_shared(config, sidePackets, 1, 1); GraphIdGuard guard(queue); executor = std::make_unique("", "", config, mapping, mapping, inputNames, outputNames, *sidePackets, this->reporter.get(), std::move(guard)); } @@ -4065,7 +4065,7 @@ TEST(WhitelistRegistered, MediapipeSubgraphList) { ASSERT_THAT(mediapipe::SubgraphRegistry::GetRegisteredNames(), UnorderedElementsAreArray(expected)) << readableSetError(mediapipe::SubgraphRegistry::GetRegisteredNames(), expected); } -// --- OVMS_GRAPH_QUEUE_SIZE pbtxt directive tests --- +// --- OVMS_GRAPH_INITIAL_QUEUE_SIZE pbtxt directive tests --- // Minimal valid pbtxt that MediaPipe can parse (uses a registered test calculator) static const char* MINIMAL_PBTXT_TEMPLATE = R"( @@ -4105,42 +4105,42 @@ TEST(MediapipeGraphQueueSizeDirective, NoDirectiveMeansDisabled) { auto status = def.validate(manager); ASSERT_EQ(status, ovms::StatusCode::OK); EXPECT_FALSE(mgc.getGraphQueueSize().has_value()); - // getInitialQueueSize on default mgc returns -1 - EXPECT_EQ(def.getMediapipeGraphConfig().getInitialQueueSize(), -1); + // getResolvedQueueSize on default mgc returns -1 + EXPECT_EQ(def.getMediapipeGraphConfig().getResolvedQueueSize(), -1); } TEST(MediapipeGraphQueueSizeDirective, ExplicitPositiveValue) { - std::string pbtxt = makePbtxtWithDirective("# OVMS_GRAPH_QUEUE_SIZE: 4"); + std::string pbtxt = makePbtxtWithDirective("# OVMS_GRAPH_INITIAL_QUEUE_SIZE: 4"); ovms::MediapipeGraphConfig mgc; DummyMediapipeGraphDefinition def("test", mgc, pbtxt); ovms::ModelManager manager; auto status = def.validate(manager); ASSERT_EQ(status, ovms::StatusCode::OK); - EXPECT_EQ(def.getMediapipeGraphConfig().getInitialQueueSize(), 4); + EXPECT_EQ(def.getMediapipeGraphConfig().getResolvedQueueSize(), 4); } TEST(MediapipeGraphQueueSizeDirective, DisabledExplicitly) { - std::string pbtxt = makePbtxtWithDirective("# OVMS_GRAPH_QUEUE_SIZE: -1"); + std::string pbtxt = makePbtxtWithDirective("# OVMS_GRAPH_INITIAL_QUEUE_SIZE: -1"); ovms::MediapipeGraphConfig mgc; DummyMediapipeGraphDefinition def("test", mgc, pbtxt); ovms::ModelManager manager; auto status = def.validate(manager); ASSERT_EQ(status, ovms::StatusCode::OK); - EXPECT_EQ(def.getMediapipeGraphConfig().getInitialQueueSize(), -1); + EXPECT_EQ(def.getMediapipeGraphConfig().getResolvedQueueSize(), -1); } TEST(MediapipeGraphQueueSizeDirective, AutoValue) { - std::string pbtxt = makePbtxtWithDirective("# OVMS_GRAPH_QUEUE_SIZE: AUTO"); + std::string pbtxt = makePbtxtWithDirective("# OVMS_GRAPH_INITIAL_QUEUE_SIZE: AUTO"); ovms::MediapipeGraphConfig mgc; DummyMediapipeGraphDefinition def("test", mgc, pbtxt); ovms::ModelManager manager; auto status = def.validate(manager); ASSERT_EQ(status, ovms::StatusCode::OK); - EXPECT_GT(def.getMediapipeGraphConfig().getInitialQueueSize(), 0); + EXPECT_GT(def.getMediapipeGraphConfig().getResolvedQueueSize(), 0); } TEST(MediapipeGraphQueueSizeDirective, ZeroRejected) { - std::string pbtxt = makePbtxtWithDirective("# OVMS_GRAPH_QUEUE_SIZE: 0"); + std::string pbtxt = makePbtxtWithDirective("# OVMS_GRAPH_INITIAL_QUEUE_SIZE: 0"); ovms::MediapipeGraphConfig mgc; DummyMediapipeGraphDefinition def("test", mgc, pbtxt); ovms::ModelManager manager; @@ -4149,7 +4149,7 @@ TEST(MediapipeGraphQueueSizeDirective, ZeroRejected) { } TEST(MediapipeGraphQueueSizeDirective, NegativeBelowMinusOneRejected) { - std::string pbtxt = makePbtxtWithDirective("# OVMS_GRAPH_QUEUE_SIZE: -2"); + std::string pbtxt = makePbtxtWithDirective("# OVMS_GRAPH_INITIAL_QUEUE_SIZE: -2"); ovms::MediapipeGraphConfig mgc; DummyMediapipeGraphDefinition def("test", mgc, pbtxt); ovms::ModelManager manager; @@ -4163,7 +4163,7 @@ TEST(MediapipeGraphQueueSizeDirective, ExceedsHardwareThreads) { GTEST_SKIP() << "hardware_concurrency() returned 0, cannot test thread limit"; } int oversized = static_cast(maxThreads) + 1; - std::string pbtxt = makePbtxtWithDirective("# OVMS_GRAPH_QUEUE_SIZE: " + std::to_string(oversized)); + std::string pbtxt = makePbtxtWithDirective("# OVMS_GRAPH_INITIAL_QUEUE_SIZE: " + std::to_string(oversized)); ovms::MediapipeGraphConfig mgc; DummyMediapipeGraphDefinition def("test", mgc, pbtxt); ovms::ModelManager manager; @@ -4173,7 +4173,59 @@ TEST(MediapipeGraphQueueSizeDirective, ExceedsHardwareThreads) { } TEST(MediapipeGraphQueueSizeDirective, InvalidStringRejected) { - std::string pbtxt = makePbtxtWithDirective("# OVMS_GRAPH_QUEUE_SIZE: INVALID"); + std::string pbtxt = makePbtxtWithDirective("# OVMS_GRAPH_INITIAL_QUEUE_SIZE: INVALID"); + ovms::MediapipeGraphConfig mgc; + DummyMediapipeGraphDefinition def("test", mgc, pbtxt); + ovms::ModelManager manager; + auto status = def.validate(manager); + EXPECT_EQ(status, ovms::StatusCode::MEDIAPIPE_GRAPH_CONFIG_FILE_INVALID); +} + +TEST(MediapipeGraphQueueSizeDirective, MaxSizeDirective) { + std::string pbtxt = makePbtxtWithDirective("# OVMS_GRAPH_INITIAL_QUEUE_SIZE: 2\n# OVMS_GRAPH_QUEUE_MAX_SIZE: 8"); + ovms::MediapipeGraphConfig mgc; + DummyMediapipeGraphDefinition def("test", mgc, pbtxt); + ovms::ModelManager manager; + auto status = def.validate(manager); + ASSERT_EQ(status, ovms::StatusCode::OK); + EXPECT_EQ(def.getMediapipeGraphConfig().getResolvedQueueSize(), 2); + EXPECT_EQ(def.getMediapipeGraphConfig().getResolvedMaxQueueSize(), 8); +} + +TEST(MediapipeGraphQueueSizeDirective, MaxSizeAutoDirective) { + std::string pbtxt = makePbtxtWithDirective("# OVMS_GRAPH_INITIAL_QUEUE_SIZE: 1\n# OVMS_GRAPH_QUEUE_MAX_SIZE: AUTO"); + ovms::MediapipeGraphConfig mgc; + DummyMediapipeGraphDefinition def("test", mgc, pbtxt); + ovms::ModelManager manager; + auto status = def.validate(manager); + ASSERT_EQ(status, ovms::StatusCode::OK); + EXPECT_EQ(def.getMediapipeGraphConfig().getResolvedQueueSize(), 1); + EXPECT_GT(def.getMediapipeGraphConfig().getResolvedMaxQueueSize(), 0); +} + +TEST(MediapipeGraphQueueSizeDirective, MaxSizeDefaultsToSizeWhenAbsent) { + std::string pbtxt = makePbtxtWithDirective("# OVMS_GRAPH_INITIAL_QUEUE_SIZE: 4"); + ovms::MediapipeGraphConfig mgc; + DummyMediapipeGraphDefinition def("test", mgc, pbtxt); + ovms::ModelManager manager; + auto status = def.validate(manager); + ASSERT_EQ(status, ovms::StatusCode::OK); + EXPECT_EQ(def.getMediapipeGraphConfig().getResolvedQueueSize(), 4); + // When max is not set, it defaults to the same as initial size + EXPECT_EQ(def.getMediapipeGraphConfig().getResolvedMaxQueueSize(), 4); +} + +TEST(MediapipeGraphQueueSizeDirective, MaxSizeInvalidRejected) { + std::string pbtxt = makePbtxtWithDirective("# OVMS_GRAPH_INITIAL_QUEUE_SIZE: 2\n# OVMS_GRAPH_QUEUE_MAX_SIZE: INVALID"); + ovms::MediapipeGraphConfig mgc; + DummyMediapipeGraphDefinition def("test", mgc, pbtxt); + ovms::ModelManager manager; + auto status = def.validate(manager); + EXPECT_EQ(status, ovms::StatusCode::MEDIAPIPE_GRAPH_CONFIG_FILE_INVALID); +} + +TEST(MediapipeGraphQueueSizeDirective, MaxSizeZeroRejected) { + std::string pbtxt = makePbtxtWithDirective("# OVMS_GRAPH_INITIAL_QUEUE_SIZE: 2\n# OVMS_GRAPH_QUEUE_MAX_SIZE: 0"); ovms::MediapipeGraphConfig mgc; DummyMediapipeGraphDefinition def("test", mgc, pbtxt); ovms::ModelManager manager; diff --git a/src/test/pull_hf_model_test.cpp b/src/test/pull_hf_model_test.cpp index b8d323801c..fdf9d29c45 100644 --- a/src/test/pull_hf_model_test.cpp +++ b/src/test/pull_hf_model_test.cpp @@ -73,7 +73,7 @@ class HfDownloaderPullHfModel : public TestWithTempDir { TestWithTempDir::TearDown(); } - // Removes generated graph header lines (version and optional queue size directive) + // Removes generated graph header lines (version and optional queue size directives) // which differ across build/runtime setup. std::string removeGeneratedGraphHeaders(std::string input) { auto firstLineEnd = input.find("\n"); @@ -82,13 +82,22 @@ class HfDownloaderPullHfModel : public TestWithTempDir { } input.erase(0, firstLineEnd + 1); - const std::string queueLinePrefix = "# OVMS_GRAPH_QUEUE_SIZE:"; + const std::string queueLinePrefix = "# OVMS_GRAPH_INITIAL_QUEUE_SIZE:"; if (input.rfind(queueLinePrefix, 0) == 0) { - auto secondLineEnd = input.find("\n"); - if (secondLineEnd == std::string::npos) { + auto lineEnd = input.find("\n"); + if (lineEnd == std::string::npos) { return ""; } - input.erase(0, secondLineEnd + 1); + input.erase(0, lineEnd + 1); + } + + const std::string maxLinePrefix = "# OVMS_GRAPH_QUEUE_MAX_SIZE:"; + if (input.rfind(maxLinePrefix, 0) == 0) { + auto lineEnd = input.find("\n"); + if (lineEnd == std::string::npos) { + return ""; + } + input.erase(0, lineEnd + 1); } return input; } diff --git a/src/test/pythonnode_test.cpp b/src/test/pythonnode_test.cpp index 3f7495650c..170fa49306 100644 --- a/src/test/pythonnode_test.cpp +++ b/src/test/pythonnode_test.cpp @@ -1017,7 +1017,7 @@ TEST_F(PythonFlowTest, SerializePyObjectWrapperToKServeResponse) { const std::vector outputNames; const ::mediapipe::CalculatorGraphConfig config; auto sidePackets = std::make_shared(); - std::shared_ptr queue = std::make_shared(config, sidePackets, 1); + std::shared_ptr queue = std::make_shared(config, sidePackets, 1, 1); GraphIdGuard guard(queue); auto executor = MockedMediapipeGraphExecutorPy("", "", config, mapping, mapping, inputNames, outputNames, *sidePackets, getPythonBackend(), this->reporter.get(), std::move(guard)); diff --git a/src/test/streaming_test.cpp b/src/test/streaming_test.cpp index edadeb15de..0f8bf87d2f 100644 --- a/src/test/streaming_test.cpp +++ b/src/test/streaming_test.cpp @@ -83,7 +83,7 @@ class StreamingQueueTest : public StreamingTest { std::vector outputNames, int queueSize = 1) { auto sidePackets = std::make_shared(); - queue = std::make_shared(config, sidePackets, queueSize); + queue = std::make_shared(config, sidePackets, queueSize, queueSize); GraphIdGuard graphIdGuard(queue); return MediapipeGraphExecutor{ this->name,