-
Notifications
You must be signed in to change notification settings - Fork 1
Implemented automatic grpc loop with reconnection #87
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
f016fc1
73280da
8ec4324
5c03d8a
046e1d8
8f1123c
b30b557
e6d2677
10288f1
eecf2a3
0e2c7bf
c023fda
cf96237
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,20 +1,27 @@ | ||
| #include "configuration.h" | ||
|
|
||
| #include <grpcpp/grpcpp.h> | ||
| #include <grpcpp/security/credentials.h> | ||
|
|
||
| #include <algorithm> | ||
| #include <cstdlib> | ||
| #include <fstream> | ||
| #include <map> | ||
| #include <sstream> | ||
| #include <string> | ||
| #include <utility> | ||
| #include <vector> | ||
|
|
||
| #include "absl/log/log.h" | ||
| #include "absl/status/statusor.h" | ||
| #include "absl/strings/str_cat.h" | ||
| #include "nlohmann/json.hpp" | ||
|
|
||
| namespace flagd { | ||
|
|
||
| namespace { | ||
| constexpr double kMsInSecond = 1000.0; | ||
|
|
||
| struct EnvVars { | ||
| static constexpr std::string_view kHost = "FLAGD_HOST"; | ||
| static constexpr std::string_view kPort = "FLAGD_PORT"; | ||
|
|
@@ -23,19 +30,49 @@ struct EnvVars { | |
| static constexpr std::string_view kSocketPath = "FLAGD_SOCKET_PATH"; | ||
| static constexpr std::string_view kServerCertPath = "FLAGD_SERVER_CERT_PATH"; | ||
| static constexpr std::string_view kDeadlineMs = "FLAGD_DEADLINE_MS"; | ||
| static constexpr std::string_view kStreamDeadlineMs = | ||
| "FLAGD_STREAM_DEADLINE_MS"; | ||
| static constexpr std::string_view kRetryBackoffMs = "FLAGD_RETRY_BACKOFF_MS"; | ||
| static constexpr std::string_view kRetryBackoffMaxMs = | ||
| "FLAGD_RETRY_BACKOFF_MAX_MS"; | ||
| static constexpr std::string_view kRetryGracePeriod = | ||
| "FLAGD_RETRY_GRACE_PERIOD"; | ||
| static constexpr std::string_view kKeepAliveTimeMs = | ||
| "FLAGD_KEEP_ALIVE_TIME_MS"; | ||
| static constexpr std::string_view kFatalStatusCodes = | ||
| "FLAGD_FATAL_STATUS_CODES"; | ||
| static constexpr std::string_view kSourceSelector = "FLAGD_SOURCE_SELECTOR"; | ||
| static constexpr std::string_view kProviderId = "FLAGD_PROVIDER_ID"; | ||
| static constexpr std::string_view kOfflineFlagSourcePath = | ||
| "FLAGD_OFFLINE_FLAG_SOURCE_PATH"; | ||
| static constexpr std::string_view kOfflinePollMs = "FLAGD_OFFLINE_POLL_MS"; | ||
| }; | ||
|
|
||
| struct Defaults { | ||
| static constexpr std::string_view kHost = "localhost"; | ||
| static constexpr int kPortInProcess = 8015; | ||
| static constexpr bool kTls = false; | ||
| static constexpr int kDeadlineMs = 500; | ||
| static constexpr int kOfflinePollMs = 5000; | ||
| struct Validation { | ||
| static constexpr int kMinPort = 1; | ||
| static constexpr int kMaxPort = 65535; | ||
| static constexpr int kMinStatusCode = 0; | ||
| static constexpr int kMaxStatusCode = 16; | ||
| }; | ||
|
|
||
| const std::map<std::string, int> kStatusCodeMap = { | ||
| {"OK", 0}, | ||
| {"CANCELLED", 1}, | ||
| {"UNKNOWN", 2}, | ||
| {"INVALID_ARGUMENT", 3}, | ||
| {"DEADLINE_EXCEEDED", 4}, | ||
| {"NOT_FOUND", 5}, | ||
| {"ALREADY_EXISTS", 6}, | ||
| {"PERMISSION_DENIED", 7}, | ||
| {"UNAUTHENTICATED", 16}, | ||
| {"RESOURCE_EXHAUSTED", 8}, | ||
| {"FAILED_PRECONDITION", 9}, | ||
| {"ABORTED", 10}, | ||
| {"OUT_OF_RANGE", 11}, | ||
| {"UNIMPLEMENTED", 12}, | ||
| {"INTERNAL", 13}, | ||
| {"UNAVAILABLE", 14}, | ||
| {"DATA_LOSS", 15}, | ||
| }; | ||
| } // namespace | ||
|
|
||
|
|
@@ -69,31 +106,88 @@ static bool GetEnvBool(const std::string_view name, bool default_value) { | |
| return str == "true" || str == "1"; | ||
| } | ||
|
|
||
| FlagdProviderConfig::FlagdProviderConfig() | ||
| : host_(GetEnvStr(EnvVars::kHost, Defaults::kHost)), | ||
| port_(GetEnvInt(EnvVars::kPort, Defaults::kPortInProcess)), | ||
| tls_(GetEnvBool(EnvVars::kTls, Defaults::kTls)), | ||
| deadline_ms_(GetEnvInt(EnvVars::kDeadlineMs, Defaults::kDeadlineMs)), | ||
| offline_poll_interval_ms_( | ||
| GetEnvInt(EnvVars::kOfflinePollMs, Defaults::kOfflinePollMs)) { | ||
| static bool IsValidPort(int port) { | ||
| return port >= Validation::kMinPort && port <= Validation::kMaxPort; | ||
| } | ||
|
|
||
| static bool IsValidStatusCode(int code) { | ||
| return code >= Validation::kMinStatusCode && | ||
| code <= Validation::kMaxStatusCode; | ||
| } | ||
|
|
||
| static std::vector<int> ParseFatalStatusCodes(const std::string& str) { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. const std::string& -> string_view ? |
||
| std::vector<int> result; | ||
| std::stringstream sstream(str); | ||
| std::string item; | ||
| while (std::getline(sstream, item, ',')) { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I know you want to reduce abseil dependencies, so just sharing as FYI that in Google world it would be for (std::string_view item : absl::StrSplit(str, ',', absl::SkipWhitespace())) { :)) |
||
| item.erase(0, item.find_first_not_of(" \t\n\r")); | ||
| item.erase(item.find_last_not_of(" \t\n\r") + 1); | ||
|
|
||
| if (item.empty()) continue; | ||
|
|
||
| try { | ||
| int code = std::stoi(item); | ||
| if (IsValidStatusCode(code)) { | ||
| result.push_back(code); | ||
| } else { | ||
| LOG(WARNING) << "Invalid gRPC status code: " << code; | ||
| } | ||
| continue; | ||
| } catch (const std::invalid_argument&) { | ||
| // Not an integer, try parsing as a status code string. | ||
| } catch (const std::out_of_range&) { | ||
| // Not a valid integer, try parsing as a status code string. | ||
| } | ||
|
m-olko marked this conversation as resolved.
|
||
|
|
||
| auto iter = kStatusCodeMap.find(item); | ||
| if (iter != kStatusCodeMap.end()) { | ||
| result.push_back(iter->second); | ||
| } else { | ||
| LOG(WARNING) << "Unknown gRPC status code: " << item; | ||
| } | ||
| } | ||
| return result; | ||
| } | ||
|
|
||
| FlagdProviderConfig::FlagdProviderConfig() { | ||
| SetHost(GetEnvStr(EnvVars::kHost, Defaults::kHost)); | ||
| SetPort(GetEnvInt(EnvVars::kPort, Defaults::kPortInProcess)); | ||
| SetTls(GetEnvBool(EnvVars::kTls, Defaults::kTls)); | ||
| SetDeadlineMs(GetEnvInt(EnvVars::kDeadlineMs, Defaults::kDeadlineMs)); | ||
| SetStreamDeadlineMs( | ||
| GetEnvInt(EnvVars::kStreamDeadlineMs, Defaults::kStreamDeadlineMs)); | ||
| SetRetryBackoffMs( | ||
| GetEnvInt(EnvVars::kRetryBackoffMs, Defaults::kRetryBackoffMs)); | ||
| SetRetryBackoffMaxMs( | ||
| GetEnvInt(EnvVars::kRetryBackoffMaxMs, Defaults::kRetryBackoffMaxMs)); | ||
| SetRetryGracePeriod( | ||
| GetEnvInt(EnvVars::kRetryGracePeriod, Defaults::kRetryGracePeriod)); | ||
| SetKeepAliveTimeMs( | ||
| GetEnvInt(EnvVars::kKeepAliveTimeMs, Defaults::kKeepAliveTimeMs)); | ||
| SetOfflinePollIntervalMs( | ||
| GetEnvInt(EnvVars::kOfflinePollMs, Defaults::kOfflinePollMs)); | ||
|
|
||
| if (std::string val = GetEnvStr(EnvVars::kTargetUri); !val.empty()) { | ||
| target_uri_ = val; | ||
| SetTargetUri(val); | ||
| } | ||
| if (std::string val = GetEnvStr(EnvVars::kSocketPath); !val.empty()) { | ||
| socket_path_ = val; | ||
| SetSocketPath(val); | ||
| } | ||
| if (std::string val = GetEnvStr(EnvVars::kServerCertPath); !val.empty()) { | ||
| cert_path_ = val; | ||
| SetCertPath(val); | ||
| } | ||
| if (std::string val = GetEnvStr(EnvVars::kSourceSelector); !val.empty()) { | ||
| selector_ = val; | ||
| SetSelector(val); | ||
| } | ||
| if (std::string val = GetEnvStr(EnvVars::kProviderId); !val.empty()) { | ||
| provider_id_ = val; | ||
| SetProviderId(val); | ||
| } | ||
| if (std::string val = GetEnvStr(EnvVars::kOfflineFlagSourcePath); | ||
| !val.empty()) { | ||
| offline_flag_source_path_ = val; | ||
| SetOfflineFlagSourcePath(val); | ||
| } | ||
| if (std::string val = GetEnvStr(EnvVars::kFatalStatusCodes); !val.empty()) { | ||
| SetFatalStatusCodes(val); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -150,6 +244,50 @@ std::optional<std::string> FlagdProviderConfig::GetCertPath() const { | |
| return cert_path_; | ||
| } | ||
| int FlagdProviderConfig::GetDeadlineMs() const { return deadline_ms_; } | ||
| int FlagdProviderConfig::GetStreamDeadlineMs() const { | ||
| return stream_deadline_ms_; | ||
| } | ||
| int FlagdProviderConfig::GetRetryBackoffMs() const { return retry_backoff_ms_; } | ||
| int FlagdProviderConfig::GetRetryBackoffMaxMs() const { | ||
| return retry_backoff_max_ms_; | ||
| } | ||
| int FlagdProviderConfig::GetRetryGracePeriod() const { | ||
| return retry_grace_period_; | ||
| } | ||
| int FlagdProviderConfig::GetKeepAliveTimeMs() const { | ||
| return keep_alive_time_ms_; | ||
| } | ||
| const std::vector<int>& FlagdProviderConfig::GetFatalStatusCodes() const { | ||
| return fatal_status_codes_; | ||
| } | ||
|
|
||
| std::string FlagdProviderConfig::GetServiceConfigJson() const { | ||
| const auto names = nlohmann::json::array({ | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you can use "static const" to avoid re-initializing this object on every call |
||
| nlohmann::json::object({{"service", "flagd.evaluation.v1.Service"}}), | ||
| nlohmann::json::object({{"service", "flagd.sync.v1.FlagSyncService"}}), | ||
| }); | ||
|
|
||
| const auto retry_policy = nlohmann::json::object({ | ||
| {"maxAttempts", 4}, | ||
|
m-olko marked this conversation as resolved.
|
||
| {"initialBackoff", absl::StrCat(retry_backoff_ms_ / kMsInSecond, "s")}, | ||
| {"maxBackoff", absl::StrCat(retry_backoff_max_ms_ / kMsInSecond, "s")}, | ||
| {"backoffMultiplier", 2}, | ||
|
m-olko marked this conversation as resolved.
|
||
| { | ||
| "retryableStatusCodes", | ||
| nlohmann::json::array({"UNAVAILABLE", "UNKNOWN"}), | ||
| }, | ||
| }); | ||
|
|
||
| const auto method_config = nlohmann::json::object({ | ||
| {"name", names}, | ||
| {"retryPolicy", retry_policy}, | ||
| }); | ||
|
|
||
| return nlohmann::json::object( | ||
| {{"methodConfig", nlohmann::json::array({method_config})}}) | ||
| .dump(); | ||
| } | ||
|
m-olko marked this conversation as resolved.
|
||
|
|
||
| std::optional<std::string> FlagdProviderConfig::GetSelector() const { | ||
| return selector_; | ||
| } | ||
|
|
@@ -170,6 +308,10 @@ FlagdProviderConfig& FlagdProviderConfig::SetHost(std::string_view host) { | |
| return *this; | ||
| } | ||
| FlagdProviderConfig& FlagdProviderConfig::SetPort(int port) { | ||
| if (!IsValidPort(port)) { | ||
| LOG(WARNING) << "Invalid port: " << port << ". Ignoring."; | ||
| return *this; | ||
| } | ||
| port_ = port; | ||
| return *this; | ||
| } | ||
|
|
@@ -195,9 +337,81 @@ FlagdProviderConfig& FlagdProviderConfig::SetCertPath(std::string_view path) { | |
| return *this; | ||
| } | ||
| FlagdProviderConfig& FlagdProviderConfig::SetDeadlineMs(int deadline_ms) { | ||
| if (deadline_ms <= 0) { | ||
| LOG(WARNING) << "Invalid deadline_ms: " << deadline_ms << ". Ignoring."; | ||
| return *this; | ||
| } | ||
| deadline_ms_ = deadline_ms; | ||
| return *this; | ||
| } | ||
| FlagdProviderConfig& FlagdProviderConfig::SetStreamDeadlineMs( | ||
| int stream_deadline_ms) { | ||
| if (stream_deadline_ms <= 0) { | ||
| LOG(WARNING) << "Invalid stream_deadline_ms: " << stream_deadline_ms | ||
| << ". Ignoring."; | ||
| return *this; | ||
| } | ||
| stream_deadline_ms_ = stream_deadline_ms; | ||
| return *this; | ||
| } | ||
| FlagdProviderConfig& FlagdProviderConfig::SetRetryBackoffMs( | ||
| int retry_backoff_ms) { | ||
| if (retry_backoff_ms <= 0) { | ||
| LOG(WARNING) << "Invalid retry_backoff_ms: " << retry_backoff_ms | ||
| << ". Ignoring."; | ||
| return *this; | ||
| } | ||
| retry_backoff_ms_ = retry_backoff_ms; | ||
| return *this; | ||
|
m-olko marked this conversation as resolved.
|
||
| } | ||
| FlagdProviderConfig& FlagdProviderConfig::SetRetryBackoffMaxMs( | ||
| int retry_backoff_max_ms) { | ||
| if (retry_backoff_max_ms <= 0) { | ||
| LOG(WARNING) << "Invalid retry_backoff_max_ms: " << retry_backoff_max_ms | ||
| << ". Ignoring."; | ||
| return *this; | ||
| } | ||
| retry_backoff_max_ms_ = retry_backoff_max_ms; | ||
| return *this; | ||
| } | ||
| FlagdProviderConfig& FlagdProviderConfig::SetRetryGracePeriod( | ||
| int retry_grace_period) { | ||
| if (retry_grace_period < 0) { | ||
| LOG(WARNING) << "Invalid retry_grace_period: " << retry_grace_period | ||
| << ". Ignoring."; | ||
| return *this; | ||
| } | ||
| retry_grace_period_ = retry_grace_period; | ||
| return *this; | ||
| } | ||
| FlagdProviderConfig& FlagdProviderConfig::SetKeepAliveTimeMs( | ||
| int keep_alive_time_ms) { | ||
| if (keep_alive_time_ms < 0) { | ||
| LOG(WARNING) << "Invalid keep_alive_time_ms: " << keep_alive_time_ms | ||
| << ". Ignoring."; | ||
| return *this; | ||
| } | ||
| keep_alive_time_ms_ = keep_alive_time_ms; | ||
| return *this; | ||
| } | ||
| FlagdProviderConfig& FlagdProviderConfig::SetFatalStatusCodes( | ||
| const std::vector<int>& fatal_status_codes) { | ||
| std::vector<int> valid_codes; | ||
| for (int code : fatal_status_codes) { | ||
| if (IsValidStatusCode(code)) { | ||
| valid_codes.push_back(code); | ||
| } else { | ||
| LOG(WARNING) << "Invalid gRPC status code: " << code << ". Ignoring."; | ||
| } | ||
| } | ||
| fatal_status_codes_ = std::move(valid_codes); | ||
| return *this; | ||
| } | ||
|
m-olko marked this conversation as resolved.
|
||
| FlagdProviderConfig& FlagdProviderConfig::SetFatalStatusCodes( | ||
| const std::string& fatal_status_codes_str) { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In case you decide to change ParseFatalStatusCodes, string_view here as well |
||
| fatal_status_codes_ = ParseFatalStatusCodes(fatal_status_codes_str); | ||
| return *this; | ||
| } | ||
| FlagdProviderConfig& FlagdProviderConfig::SetSelector( | ||
| std::string_view selector) { | ||
| selector_ = std::string(selector); | ||
|
|
@@ -215,6 +429,11 @@ FlagdProviderConfig& FlagdProviderConfig::SetOfflineFlagSourcePath( | |
| } | ||
| FlagdProviderConfig& FlagdProviderConfig::SetOfflinePollIntervalMs( | ||
| int interval_ms) { | ||
| if (interval_ms <= 0) { | ||
| LOG(WARNING) << "Invalid offline_poll_interval_ms: " << interval_ms | ||
| << ". Ignoring."; | ||
| return *this; | ||
| } | ||
| offline_poll_interval_ms_ = interval_ms; | ||
| return *this; | ||
|
m-olko marked this conversation as resolved.
|
||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it same as this?
const std::map<std::string, grpc::StatusCode> kStatusCodeMap = {
{"OK", grpc::StatusCode::OK},
{"CANCELLED", grpc::StatusCode::CANCELLED},
{"UNKNOWN", grpc::StatusCode::UNKNOWN},
{"INVALID_ARGUMENT", grpc::StatusCode::INVALID_ARGUMENT},
{"DEADLINE_EXCEEDED", grpc::StatusCode::DEADLINE_EXCEEDED},
...
};