Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions extensions/prometheus/PrometheusMetricsPublisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,14 @@ void PrometheusMetricsPublisher::clearMetricNodes() {
gauge_collection_.reset();
}

void PrometheusMetricsPublisher::loadMetricNodes() {
void PrometheusMetricsPublisher::loadMetricNodes(core::ProcessGroup* root) {
logger_->log_debug("Loading all metric nodes.");
std::lock_guard<std::mutex> lock(registered_metrics_mutex_);
gauge_collection_ = std::make_shared<PublishedMetricGaugeCollection>(getMetricProviders(), agent_identifier_);
gauge_collection_ = std::make_shared<PublishedMetricGaugeCollection>(getMetricProviders(root), agent_identifier_);
exposer_->registerMetric(gauge_collection_);
}

std::vector<gsl::not_null<std::shared_ptr<state::PublishedMetricProvider>>> PrometheusMetricsPublisher::getMetricProviders() const {
std::vector<gsl::not_null<std::shared_ptr<state::PublishedMetricProvider>>> PrometheusMetricsPublisher::getMetricProviders(core::ProcessGroup* root) const {
gsl_Expects(response_node_loader_ && configuration_);
std::vector<gsl::not_null<std::shared_ptr<state::PublishedMetricProvider>>> nodes;
auto metric_classes_str = configuration_->get(minifi::Configuration::nifi_metrics_publisher_prometheus_metrics_publisher_metrics);
Expand All @@ -89,14 +89,16 @@ std::vector<gsl::not_null<std::shared_ptr<state::PublishedMetricProvider>>> Prom
auto metric_classes = utils::string::splitAndTrimRemovingEmpty(*metric_classes_str, ",");
std::unordered_set<std::string> unique_metric_classes{metric_classes.begin(), metric_classes.end()};
for (const std::string& clazz : unique_metric_classes) {
auto response_nodes = response_node_loader_->loadResponseNodes(clazz);
auto response_nodes = response_node_loader_->loadResponseNodes(clazz, root);
if (response_nodes.empty()) {
logger_->log_warn("Metric class '{}' could not be loaded.", clazz);
continue;
}
for (const auto& response_node : response_nodes) {
logger_->log_info("Loading metric node '{}'", response_node->getName());
nodes.push_back(response_node);
if (root) {
logger_->log_warn("Metric class '{}' could not be loaded.", clazz);
}
} else {
for (const auto& response_node : response_nodes) {
logger_->log_info("Loading metric node '{}'", response_node->getName());
nodes.push_back(response_node);
}
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions extensions/prometheus/PrometheusMetricsPublisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ class PrometheusMetricsPublisher : public state::MetricsPublisherImpl {

void initialize(const std::shared_ptr<Configure>& configuration, const std::shared_ptr<state::response::ResponseNodeLoader>& response_node_loader) override;
void clearMetricNodes() override;
void loadMetricNodes() override;
void loadMetricNodes(core::ProcessGroup* root) override;

private:
PrometheusExposerConfig readExposerConfig() const;
std::vector<gsl::not_null<std::shared_ptr<state::PublishedMetricProvider>>> getMetricProviders() const;
std::vector<gsl::not_null<std::shared_ptr<state::PublishedMetricProvider>>> getMetricProviders(core::ProcessGroup* root) const;
void loadAgentIdentifier();

std::mutex registered_metrics_mutex_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ TEST_CASE_METHOD(PrometheusPublisherTestFixtureWithDummyExposer, "Test adding me
}
configuration_->set(Configure::nifi_metrics_publisher_agent_identifier, "AgentId-1");
publisher_->initialize(configuration_, response_node_loader_);
publisher_->loadMetricNodes();
publisher_->loadMetricNodes(nullptr);
auto stored_metrics = exposer_->getMetrics();
std::vector<std::string> valid_metrics_without_flow = {"QueueMetrics", "RepositoryMetrics", "DeviceInfoNode", "FlowInformation", "AgentInformation"};
REQUIRE(stored_metrics);
Expand Down
8 changes: 8 additions & 0 deletions extensions/prometheus/tests/features/prometheus.feature
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ Feature: MiNiFi can publish metrics to Prometheus server
And "DeviceInfoNode" is published to the Prometheus server in less than 60 seconds
And "AgentStatus" is published to the Prometheus server in less than 60 seconds
And all Prometheus metric types are only defined once
And the Minifi logs do not contain errors
And the Minifi logs do not contain warnings

Scenario: Published metrics are scraped by Prometheus server through SSL connection
Given a GetFile processor with the name "GetFile1" and the "Input Directory" property set to "/tmp/input"
Expand All @@ -52,6 +54,8 @@ Feature: MiNiFi can publish metrics to Prometheus server
And "FlowInformation" is published to the Prometheus server in less than 60 seconds
And "DeviceInfoNode" is published to the Prometheus server in less than 60 seconds
And "AgentStatus" is published to the Prometheus server in less than 60 seconds
And the Minifi logs do not contain errors
And the Minifi logs do not contain warnings

Scenario: Multiple GetFile metrics are reported by Prometheus
Given a GetFile processor with the name "GetFile1" and the "Input Directory" property set to "/tmp/input"
Expand All @@ -62,8 +66,12 @@ Feature: MiNiFi can publish metrics to Prometheus server
And a PutFile processor with the "Directory" property set to "/tmp/output"
And the "success" relationship of the GetFile1 processor is connected to the PutFile
And the "success" relationship of the GetFile2 processor is connected to the PutFile
And PutFile's success relationship is auto-terminated
And PutFile's failure relationship is auto-terminated
And Prometheus is enabled in MiNiFi
And a Prometheus server is set up
When all instances start up
Then "GetFileMetrics" processor metric is published to the Prometheus server in less than 60 seconds for "GetFile1" processor
And "GetFileMetrics" processor metric is published to the Prometheus server in less than 60 seconds for "GetFile2" processor
And the Minifi logs do not contain errors
And the Minifi logs do not contain warnings
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ Feature: Core flow functionalities
And the Minifi logs contain the following message: ' "size": "0"' in less than 2 seconds
And the Minifi logs contain the following message: ' },' in less than 2 seconds
And the Minifi logs contain the following message: ' "provenance": {' in less than 2 seconds
And the Minifi logs do not contain errors
And the Minifi logs do not contain warnings

Scenario: MiNiFi uses parameter contexts correctly
Given parameter context name is set to 'my-context'
Expand Down
2 changes: 1 addition & 1 deletion libminifi/include/c2/C2MetricsPublisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class C2MetricsPublisher : public state::response::NodeReporter, public state::M
state::response::NodeReporter::ReportedNode getAgentManifest() override;

void clearMetricNodes() override;
void loadMetricNodes() override;
void loadMetricNodes(core::ProcessGroup* root) override;

private:
void loadC2ResponseConfiguration(const std::string &prefix);
Expand Down
2 changes: 1 addition & 1 deletion libminifi/include/c2/ControllerSocketMetricsPublisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class ControllerSocketMetricsPublisher : public state::MetricsPublisherImpl, pub
MINIFIAPI static constexpr const char* Description = "Provides the response nodes for c2 operations through localized environment through a simple TCP socket.";

void clearMetricNodes() override;
void loadMetricNodes() override;
void loadMetricNodes(core::ProcessGroup* root) override;

std::unordered_map<std::string, QueueSize> getQueueSizes() override;
std::unordered_set<std::string> getFullConnections() override;
Expand Down
2 changes: 1 addition & 1 deletion libminifi/include/core/state/LogMetricsPublisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class LogMetricsPublisher : public MetricsPublisherImpl {

void initialize(const std::shared_ptr<Configure>& configuration, const std::shared_ptr<state::response::ResponseNodeLoader>& response_node_loader) override;
void clearMetricNodes() override;
void loadMetricNodes() override;
void loadMetricNodes(core::ProcessGroup* root) override;
~LogMetricsPublisher() override;

private:
Expand Down
2 changes: 1 addition & 1 deletion libminifi/include/core/state/nodes/ResponseNodeLoader.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class ResponseNodeLoaderImpl : public ResponseNodeLoader {
void clearConfigRoot() override;
void setControllerServiceProvider(core::controller::ControllerServiceProvider* controller) override;
void setStateMonitor(state::StateMonitor* update_sink) override;
std::vector<SharedResponseNode> loadResponseNodes(const std::string& clazz) override;
std::vector<SharedResponseNode> loadResponseNodes(const std::string& clazz, core::ProcessGroup* root) override;
state::response::NodeReporter::ReportedNode getAgentManifest() const override;

private:
Expand Down
8 changes: 4 additions & 4 deletions libminifi/src/c2/C2MetricsPublisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ void C2MetricsPublisher::loadNodeClasses(const std::string& class_definitions, c
auto classes = utils::string::splitAndTrimRemovingEmpty(class_definitions, ",");
std::unordered_set<std::string> unique_classes{classes.begin(), classes.end()};
for (const std::string& clazz : unique_classes) {
auto response_nodes = response_node_loader_->loadResponseNodes(clazz);
auto response_nodes = response_node_loader_->loadResponseNodes(clazz, nullptr);
if (response_nodes.empty()) {
continue;
}
Expand Down Expand Up @@ -153,7 +153,7 @@ std::optional<state::response::NodeReporter::ReportedNode> C2MetricsPublisher::g
};

if (!metrics_class.empty()) {
auto metrics_nodes = response_node_loader_->loadResponseNodes(metrics_class);
auto metrics_nodes = response_node_loader_->loadResponseNodes(metrics_class, nullptr);
if (!metrics_nodes.empty()) {
return createReportedNode(metrics_nodes);
}
Expand Down Expand Up @@ -193,7 +193,7 @@ std::vector<state::response::NodeReporter::ReportedNode> C2MetricsPublisher::get
return reported_nodes;
}

void C2MetricsPublisher::loadMetricNodes() {
void C2MetricsPublisher::loadMetricNodes(core::ProcessGroup*) {
gsl_Expects(response_node_loader_ && configuration_);
if (!root_response_nodes_.empty()) {
return;
Expand All @@ -205,7 +205,7 @@ void C2MetricsPublisher::loadMetricNodes() {
std::unordered_set<std::string> unique_classes{classes.begin(), classes.end()};

for (const std::string& clazz : unique_classes) {
auto response_nodes = response_node_loader_->loadResponseNodes(clazz);
auto response_nodes = response_node_loader_->loadResponseNodes(clazz, nullptr);
if (response_nodes.empty()) {
continue;
}
Expand Down
4 changes: 2 additions & 2 deletions libminifi/src/c2/ControllerSocketMetricsPublisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,10 @@ void ControllerSocketMetricsPublisher::clearMetricNodes() {
queue_metrics_node_.reset();
}

void ControllerSocketMetricsPublisher::loadMetricNodes() {
void ControllerSocketMetricsPublisher::loadMetricNodes(core::ProcessGroup*) {
std::lock_guard<std::mutex> guard(queue_metrics_node_mutex_);
gsl_Expects(response_node_loader_);
auto nodes = response_node_loader_->loadResponseNodes("QueueMetrics");
auto nodes = response_node_loader_->loadResponseNodes("QueueMetrics", nullptr);
if (!nodes.empty()) {
queue_metrics_node_ = nodes[0];
}
Expand Down
14 changes: 8 additions & 6 deletions libminifi/src/core/state/LogMetricsPublisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ void LogMetricsPublisher::clearMetricNodes() {
}
}

void LogMetricsPublisher::loadMetricNodes() {
void LogMetricsPublisher::loadMetricNodes(core::ProcessGroup* root) {
gsl_Expects(response_node_loader_ && configuration_);
auto metric_classes_str = configuration_->get(minifi::Configuration::nifi_metrics_publisher_log_metrics_publisher_metrics);
if (!metric_classes_str || metric_classes_str->empty()) {
Expand All @@ -103,15 +103,17 @@ void LogMetricsPublisher::loadMetricNodes() {
std::unordered_set<std::string> unique_metric_classes{metric_classes.begin(), metric_classes.end()};
std::lock_guard<std::mutex> lock(response_nodes_mutex_);
for (const std::string& clazz : unique_metric_classes) {
auto loaded_response_nodes = response_node_loader_->loadResponseNodes(clazz);
auto loaded_response_nodes = response_node_loader_->loadResponseNodes(clazz, root);
if (loaded_response_nodes.empty()) {
logger_->log_warn("Metric class '{}' could not be loaded.", clazz);
continue;
if (root) {
logger_->log_warn("Metric class '{}' could not be loaded.", clazz);
}
} else {
response_nodes_.insert(response_nodes_.end(), loaded_response_nodes.begin(), loaded_response_nodes.end());
}
response_nodes_.insert(response_nodes_.end(), loaded_response_nodes.begin(), loaded_response_nodes.end());
}
}
if (response_nodes_.empty()) {
if (root && response_nodes_.empty()) {
logger_->log_warn("LogMetricsPublisher is configured without any valid metrics!");
}
if (response_nodes_.empty() && metrics_logger_thread_) {
Expand Down
2 changes: 1 addition & 1 deletion libminifi/src/core/state/MetricsPublisherStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ void MetricsPublisherStore::initialize(core::controller::ControllerServiceProvid
void MetricsPublisherStore::loadMetricNodes(core::ProcessGroup* root) {
response_node_loader_->setNewConfigRoot(root);
for (const auto& [name, publisher]: metrics_publishers_) {
publisher->loadMetricNodes();
publisher->loadMetricNodes(root);
}
}

Expand Down
7 changes: 4 additions & 3 deletions libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ std::vector<SharedResponseNode> ResponseNodeLoaderImpl::getResponseNodes(const s
}
auto response_node = getSystemMetricsNode(clazz);
if (!response_node) {
logger_->log_error("{}", response_node.error());
return {};
}
return {*response_node};
Expand Down Expand Up @@ -245,10 +244,12 @@ void ResponseNodeLoaderImpl::initializeFlowInformation(const SharedResponseNode&
}
}

std::vector<SharedResponseNode> ResponseNodeLoaderImpl::loadResponseNodes(const std::string& clazz) {
std::vector<SharedResponseNode> ResponseNodeLoaderImpl::loadResponseNodes(const std::string& clazz, core::ProcessGroup* root) {
auto response_nodes = getResponseNodes(clazz);
if (response_nodes.empty()) {
logger_->log_error("No metric defined for {}", clazz);
if (root) {
logger_->log_error("No metric defined for {}", clazz);
}
return {};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class ControllerSocketMetricsPublisherTestFixture {
};

TEST_CASE_METHOD(ControllerSocketMetricsPublisherTestFixture, "Load and clear", "[ControllerSocketMetricsPublisher]") {
controller_socket_metrics_publisher_.loadMetricNodes();
controller_socket_metrics_publisher_.loadMetricNodes(nullptr);
auto node = controller_socket_metrics_publisher_.getQueueMetricsNode();
auto queue_metrics = dynamic_cast<state::response::QueueMetrics*>(node.get());
REQUIRE(queue_metrics);
Expand Down
18 changes: 10 additions & 8 deletions libminifi/test/unit/LogMetricsPublisherTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,17 +85,19 @@ TEST_CASE_METHOD(LogPublisherTestFixture, "Verify empty metrics if no valid metr
configuration_->set(Configure::nifi_metrics_publisher_metrics, "InvalidMetric,NotValidMetricNode");
}
publisher_->initialize(configuration_, response_node_loader_);
publisher_->loadMetricNodes();
using org::apache::nifi::minifi::test::utils::verifyLogLinePresenceInPollTime;
REQUIRE(verifyLogLinePresenceInPollTime(5s, "LogMetricsPublisher is configured without any valid metrics!"));
publisher_->loadMetricNodes(nullptr);
CHECK_FALSE(utils::verifyLogLinePresenceInPollTime(0s, "LogMetricsPublisher is configured without any valid metrics!"));
auto root_node = core::ProcessGroup(core::ROOT_PROCESS_GROUP, "root");
publisher_->loadMetricNodes(&root_node);
CHECK(utils::verifyLogLinePresenceInPollTime(5s, "LogMetricsPublisher is configured without any valid metrics!"));
}

TEST_CASE_METHOD(LogPublisherTestFixture, "Verify multiple metric nodes in logs", "[LogMetricsPublisher]") {
LogTestController::getInstance().setTrace<minifi::state::LogMetricsPublisher>();
configuration_->set(minifi::Configuration::nifi_metrics_publisher_log_metrics_logging_interval, "100ms");
configuration_->set(Configure::nifi_metrics_publisher_metrics, "RepositoryMetrics,DeviceInfoNode");
publisher_->initialize(configuration_, response_node_loader_);
publisher_->loadMetricNodes();
publisher_->loadMetricNodes(nullptr);
using org::apache::nifi::minifi::test::utils::verifyLogLinePresenceInPollTime;
std::string expected_log_1 = R"([info] {
"LogMetrics": {)";
Expand Down Expand Up @@ -131,7 +133,7 @@ TEST_CASE_METHOD(LogPublisherTestFixture, "Verify reloading different metrics",
configuration_->set(minifi::Configuration::nifi_metrics_publisher_log_metrics_logging_interval, "100ms");
configuration_->set(Configure::nifi_metrics_publisher_metrics, "RepositoryMetrics");
publisher_->initialize(configuration_, response_node_loader_);
publisher_->loadMetricNodes();
publisher_->loadMetricNodes(nullptr);
using org::apache::nifi::minifi::test::utils::verifyLogLinePresenceInPollTime;
std::string expected_log = R"([info] {
"LogMetrics": {
Expand Down Expand Up @@ -162,7 +164,7 @@ TEST_CASE_METHOD(LogPublisherTestFixture, "Verify reloading different metrics",
LogTestController::getInstance().reset();
LogTestController::getInstance().setTrace<minifi::state::LogMetricsPublisher>();
configuration_->set(Configure::nifi_metrics_publisher_metrics, "DeviceInfoNode");
publisher_->loadMetricNodes();
publisher_->loadMetricNodes(nullptr);
expected_log = R"([info] {
"LogMetrics": {
"deviceInfo": {
Expand All @@ -184,7 +186,7 @@ TEST_CASE_METHOD(LogPublisherTestFixture, "Verify generic and publisher specific
configuration_->set(Configure::nifi_metrics_publisher_metrics, "DeviceInfoNode");
}
publisher_->initialize(configuration_, response_node_loader_);
publisher_->loadMetricNodes();
publisher_->loadMetricNodes(nullptr);
using org::apache::nifi::minifi::test::utils::verifyLogLinePresenceInPollTime;
std::string expected_log = R"([info] {
"LogMetrics": {
Expand Down Expand Up @@ -219,7 +221,7 @@ TEST_CASE_METHOD(LogPublisherTestFixture, "Verify changing log level property fo
configuration_->set(minifi::Configuration::nifi_metrics_publisher_log_metrics_log_level, "dEbUg");
configuration_->set(Configure::nifi_metrics_publisher_metrics, "RepositoryMetrics");
publisher_->initialize(configuration_, response_node_loader_);
publisher_->loadMetricNodes();
publisher_->loadMetricNodes(nullptr);
using org::apache::nifi::minifi::test::utils::verifyLogLinePresenceInPollTime;
std::string expected_log = R"([debug] {
"LogMetrics": {
Expand Down
4 changes: 2 additions & 2 deletions libminifi/test/unit/MetricsPublisherStoreTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class FirstDummyMetricsPublisher : public minifi::state::MetricsPublisherImpl {
static constexpr const char* Description = "FirstDummyMetricsPublisher";

void clearMetricNodes() override {}
void loadMetricNodes() override {}
void loadMetricNodes(core::ProcessGroup*) override {}
};

class SecondDummyMetricsPublisher : public minifi::state::MetricsPublisherImpl {
Expand All @@ -45,7 +45,7 @@ class SecondDummyMetricsPublisher : public minifi::state::MetricsPublisherImpl {
static constexpr const char* Description = "SecondDummyMetricsPublisher";

void clearMetricNodes() override {}
void loadMetricNodes() override {}
void loadMetricNodes(core::ProcessGroup*) override {}
};

REGISTER_RESOURCE(FirstDummyMetricsPublisher, DescriptionOnly);
Expand Down
Loading
Loading