Skip to content

Instantly share code, notes, and snippets.

@mandarjog
Created February 24, 2026 06:47
Show Gist options
  • Select an option

  • Save mandarjog/1775aa579308f251a343e9cee6b82a3b to your computer and use it in GitHub Desktop.

Select an option

Save mandarjog/1775aa579308f251a343e9cee6b82a3b to your computer and use it in GitHub Desktop.
diff --git a/envoy/router/router.h b/envoy/router/router.h
index 13e2842c9d778..8f8a6bf9f091e 100644
--- a/envoy/router/router.h
+++ b/envoy/router/router.h
@@ -501,6 +501,36 @@ class RetryState {
* return how many times host selection should be reattempted during host selection.
*/
virtual uint32_t hostSelectionMaxAttempts() const PURE;
+
+  /**
+   * Signature of a retry-time cluster refresher. It receives the downstream request headers
+   * and the stream info, and yields a route pointing at a different cluster, or nullptr when
+   * the cluster used by the previous attempt should be kept. stream_info is non-const so an
+   * implementation may update it (for example, its filter state).
+   */
+  using ClusterRefreshFunction = std::function<RouteConstSharedPtr(
+      const Http::RequestHeaderMap& headers, StreamInfo::StreamInfo& stream_info)>;
+
+  /**
+   * Installs the refresher that refreshClusterOnRetry() consults. Weighted cluster routes use
+   * this to steer retries toward clusters that have not been tried yet.
+   * The default implementation discards the callback.
+   * @param callback the refresher to install.
+   */
+  virtual void setClusterRefreshCallback(ClusterRefreshFunction /* callback */) {}
+
+  /**
+   * Consulted by the router while it prepares a retry. When a refresher is installed and it
+   * produces a non-null route, the router retries against that route instead of the original.
+   * The default implementation never refreshes.
+   * @param headers the downstream request headers.
+   * @param stream_info the stream info for the request.
+   * @return RouteConstSharedPtr the route to retry against, or nullptr to keep the original.
+   */
+  virtual RouteConstSharedPtr refreshClusterOnRetry(const Http::RequestHeaderMap& /* headers */,
+                                                    StreamInfo::StreamInfo& /* stream_info */) {
+    return nullptr;
+  }
};
using RetryStatePtr = std::unique_ptr<RetryState>;
@@ -1170,6 +1200,15 @@ class RouteEntry : public ResponseEntry {
*/
virtual void refreshRouteCluster(const Http::RequestHeaderMap& headers,
const StreamInfo::StreamInfo& stream_info) const PURE;
+
+  /**
+   * Supplies a callback the router can install on RetryState so a retry may target a different
+   * cluster. Weighted cluster routes override this; the default provides no callback.
+   * @return a ClusterRefreshFunction, or nullptr when refresh-on-retry is not supported.
+   */
+  virtual RetryState::ClusterRefreshFunction clusterRefreshCallback() const {
+    return nullptr;
+  }
};
/**
diff --git a/source/common/router/BUILD b/source/common/router/BUILD
index 1a1e30f078a67..c254b9f36f857 100644
--- a/source/common/router/BUILD
+++ b/source/common/router/BUILD
@@ -128,9 +128,13 @@ envoy_cc_library(
":metadatamatchcriteria_lib",
":per_filter_config_lib",
"//envoy/router:cluster_specifier_plugin_interface",
+ "//envoy/router:router_interface",
"//envoy/server:factory_context_interface",
+ "//envoy/stream_info:filter_state_interface",
"//source/common/config:well_known_names",
"//source/common/http:hash_policy_lib",
+ "@com_google_absl//absl/container:flat_hash_set",
+ "@com_google_absl//absl/strings",
],
)
diff --git a/source/common/router/delegating_route_impl.cc b/source/common/router/delegating_route_impl.cc
index f2dd426a768a2..40cc31cbefb6f 100644
--- a/source/common/router/delegating_route_impl.cc
+++ b/source/common/router/delegating_route_impl.cc
@@ -179,5 +179,9 @@ void DelegatingRouteEntry::refreshRouteCluster(const Http::RequestHeaderMap& hea
base_route_entry_->refreshRouteCluster(headers, stream_info);
}
+RetryState::ClusterRefreshFunction DelegatingRouteEntry::clusterRefreshCallback() const {
+ return base_route_entry_->clusterRefreshCallback(); // Forward to the wrapped route entry.
+}
+
} // namespace Router
} // namespace Envoy
diff --git a/source/common/router/delegating_route_impl.h b/source/common/router/delegating_route_impl.h
index 89f616cb0f541..ce6e0de4979b5 100644
--- a/source/common/router/delegating_route_impl.h
+++ b/source/common/router/delegating_route_impl.h
@@ -132,6 +132,7 @@ class DelegatingRouteEntry : public DelegatingRouteBase<RouteEntryAndRoute> {
const RouteStatsContextOptRef routeStatsContext() const override;
void refreshRouteCluster(const Http::RequestHeaderMap& headers,
const StreamInfo::StreamInfo& stream_info) const override;
+ RetryState::ClusterRefreshFunction clusterRefreshCallback() const override; // Forwards to base.
private:
const RouteEntry* base_route_entry_{};
diff --git a/source/common/router/retry_state_impl.h b/source/common/router/retry_state_impl.h
index c0b7f98ab5859..c348ed17e9324 100644
--- a/source/common/router/retry_state_impl.h
+++ b/source/common/router/retry_state_impl.h
@@ -96,6 +96,18 @@ class RetryStateImpl : public RetryState {
uint32_t hostSelectionMaxAttempts() const override { return host_selection_max_attempts_; }
+  // Installs the refresher consulted by refreshClusterOnRetry(), replacing any earlier one.
+  void setClusterRefreshCallback(ClusterRefreshFunction callback) override {
+    cluster_refresh_callback_ = std::move(callback);
+  }
+
+  // Runs the installed refresher, if any. Without one there is nothing to refresh, so the
+  // caller keeps its original route (nullptr is returned).
+  RouteConstSharedPtr refreshClusterOnRetry(const Http::RequestHeaderMap& headers,
+                                            StreamInfo::StreamInfo& stream_info) override {
+    return cluster_refresh_callback_ ? cluster_refresh_callback_(headers, stream_info) : nullptr;
+  }
+
bool isAutomaticallyConfiguredForHttp3() const { return auto_configured_for_http3_; }
private:
@@ -136,6 +148,8 @@ class RetryStateImpl : public RetryState {
std::vector<ResetHeaderParserSharedPtr> reset_headers_{};
std::chrono::milliseconds reset_max_interval_{};
+ ClusterRefreshFunction cluster_refresh_callback_; // Empty until setClusterRefreshCallback().
+
// Keep small members (bools, enums and int32s) at the end of class, to reduce alignment overhead.
uint32_t retry_on_{};
uint32_t retries_remaining_{};
diff --git a/source/common/router/router.cc b/source/common/router/router.cc
index 91f2f3e96b214..823ebcaaa059e 100644
--- a/source/common/router/router.cc
+++ b/source/common/router/router.cc
@@ -2190,6 +2190,37 @@ void Filter::doRetry(bool can_send_early_data, bool can_use_http3, TimeoutRetry
host_selection_cancelable_.reset();
}
+ // Allow the retry state to refresh the cluster selection. For weighted cluster routes,
+ // this selects a different cluster than the one that just failed.
+ // On the first retry, we lazily wire the callback from the route entry into the retry
+ // state (no work on the happy path). On subsequent retries, the callback is already set
+ // (updated after each successful cluster switch below).
+ if (retry_state_ != nullptr && downstream_headers_ != nullptr) {
+ if (!cluster_refresh_cb_set_) {
+ auto cb = route_entry_->clusterRefreshCallback();
+ if (cb != nullptr) {
+ retry_state_->setClusterRefreshCallback(std::move(cb));
+ cluster_refresh_cb_set_ = true;
+ }
+ }
+
+ auto retry_route =
+ retry_state_->refreshClusterOnRetry(*downstream_headers_, callbacks_->streamInfo());
+ if (retry_route != nullptr) {
+ route_ = std::move(retry_route);
+ route_entry_ = route_->routeEntry();
+ ENVOY_STREAM_LOG(debug, "retry-aware lb: switched to cluster '{}'", *callbacks_,
+ route_entry_->clusterName());
+ // Update the callback so the NEXT retry records THIS cluster as the one that failed,
+ // not the original. This ensures all attempted clusters accumulate in FilterState
+ // across multiple retries (e.g. A fails → B fails → C is tried).
+ auto new_cb = route_entry_->clusterRefreshCallback();
+ if (new_cb != nullptr) {
+ retry_state_->setClusterRefreshCallback(std::move(new_cb));
+ }
+ }
+ }
+
// Clusters can technically get removed by CDS during a retry. Make sure it still exists.
const auto cluster = config_->cm_.getThreadLocalCluster(route_entry_->clusterName());
std::unique_ptr<GenericConnPool> generic_conn_pool;
diff --git a/source/common/router/router.h b/source/common/router/router.h
index a2a3f93e09fa5..3b005694cbad5 100644
--- a/source/common/router/router.h
+++ b/source/common/router/router.h
@@ -303,7 +303,7 @@ class Filter : Logger::Loggable<Logger::Id::router>,
Filter(const FilterConfigSharedPtr& config, FilterStats& stats)
: config_(config), stats_(stats), grpc_request_(false), exclude_http_code_stats_(false),
downstream_response_started_(false), downstream_end_stream_(false), is_retry_(false),
- request_buffer_overflowed_(false),
+ cluster_refresh_cb_set_(false), request_buffer_overflowed_(false),
allow_multiplexed_upstream_half_close_(Runtime::runtimeFeatureEnabled(
"envoy.reloadable_features.allow_multiplexed_upstream_half_close")),
upstream_request_started_(false), orca_load_report_received_(false) {}
@@ -677,6 +677,7 @@ class Filter : Logger::Loggable<Logger::Id::router>,
bool downstream_response_started_ : 1;
bool downstream_end_stream_ : 1;
bool is_retry_ : 1;
+ bool cluster_refresh_cb_set_ : 1; // Set once doRetry() installs the route's refresh callback.
bool include_attempt_count_in_request_ : 1;
bool include_timeout_retry_header_in_request_ : 1;
bool request_buffer_overflowed_ : 1;
diff --git a/source/common/router/weighted_cluster_specifier.cc b/source/common/router/weighted_cluster_specifier.cc
index 4cdf5df10c073..e7ac60a4c6a9a 100644
--- a/source/common/router/weighted_cluster_specifier.cc
+++ b/source/common/router/weighted_cluster_specifier.cc
@@ -3,6 +3,8 @@
#include "source/common/config/well_known_names.h"
#include "source/common/router/config_utility.h"
+#include "absl/strings/str_join.h"
+
namespace Envoy {
namespace Router {
@@ -18,6 +20,53 @@ absl::Status validateWeightedClusterSpecifier(const ClusterWeightProto& cluster)
return absl::InvalidArgumentError(error);
}
+uint64_t WeightedClusterSpecifierPlugin::healthawareClusterWeight(const std::string& cluster_name,
+                                                                  uint64_t config_weight) const {
+  // Returns config_weight unchanged unless health-aware LB is on and the cluster is known to
+  // have no healthy host at any priority, in which case the cluster is weighted 0.
+  if (!health_aware_lb_ || config_weight == 0) {
+    return config_weight;
+  }
+
+  auto* tl_cluster = cluster_manager_.getThreadLocalCluster(cluster_name);
+  if (tl_cluster == nullptr) {
+    // Unknown to this worker: health cannot be judged, so trust the configuration.
+    return config_weight;
+  }
+
+  // A single healthy host at any priority level keeps the configured weight.
+  for (const auto& host_set : tl_cluster->prioritySet().hostSetsPerPriority()) {
+    if (!host_set->healthyHosts().empty()) {
+      return config_weight;
+    }
+  }
+
+  // No healthy hosts anywhere: exclude the cluster from selection.
+  const auto& endpoint_stats = tl_cluster->info()->endpointStats();
+  const uint64_t healthy = endpoint_stats.membership_healthy_.value();
+  const uint64_t total = endpoint_stats.membership_total_.value();
+  ENVOY_LOG(debug, "unhealthy cluster {} has {} healthy hosts out of {} total hosts",
+            cluster_name, healthy, total);
+  return 0;
+}
+
+uint64_t WeightedClusterSpecifierPlugin::retryAwareClusterWeight(
+    const std::string& cluster_name, uint64_t config_weight,
+    const AttemptedClustersFilterState* attempted_clusters) const {
+  // Only a positive weight for a cluster already recorded as attempted is suppressed; every
+  // other combination (feature off, zero weight, no filter state) keeps the configured weight.
+  const bool suppress = retry_aware_lb_ && config_weight != 0 && attempted_clusters != nullptr &&
+                        attempted_clusters->hasAttempted(cluster_name);
+  if (!suppress) {
+    return config_weight;
+  }
+
+  ENVOY_LOG(debug,
+            "retry-aware lb: zeroing weight for previously attempted single-endpoint cluster {}",
+            cluster_name);
+  return 0;
+}
+
absl::StatusOr<std::shared_ptr<WeightedClustersConfigEntry>> WeightedClustersConfigEntry::create(
const ClusterWeightProto& cluster, const MetadataMatchCriteria* parent_metadata_match,
std::string&& runtime_key, Server::Configuration::ServerFactoryContext& context) {
@@ -70,7 +119,8 @@ WeightedClusterSpecifierPlugin::WeightedClusterSpecifierPlugin(
const WeightedClusterProto& weighted_clusters,
const MetadataMatchCriteria* parent_metadata_match, absl::string_view route_name,
Server::Configuration::ServerFactoryContext& context, absl::Status& creation_status)
- : loader_(context.runtime()), random_value_header_(weighted_clusters.header_name()),
+ : loader_(context.runtime()), cluster_manager_(context.clusterManager()),
+ random_value_header_(weighted_clusters.header_name()),
runtime_key_prefix_(weighted_clusters.runtime_key_prefix()),
use_hash_policy_(weighted_clusters.random_value_specifier_case() ==
WeightedClusterProto::kUseHashPolicy
@@ -114,8 +164,10 @@ WeightedClusterSpecifierPlugin::WeightedClusterSpecifierPlugin(
class WeightedClusterEntry : public DynamicRouteEntry {
public:
WeightedClusterEntry(RouteConstSharedPtr route, std::string&& cluster_name,
- WeightedClustersConfigEntryConstSharedPtr config)
- : DynamicRouteEntry(route, std::move(cluster_name)), config_(std::move(config)) {
+ WeightedClustersConfigEntryConstSharedPtr config,
+ const WeightedClusterSpecifierPlugin* plugin, uint64_t random_value)
+ : DynamicRouteEntry(route, std::move(cluster_name)), config_(std::move(config)),
+ plugin_(plugin), random_value_(random_value) {
ASSERT(config_ != nullptr);
}
@@ -186,6 +238,29 @@ class WeightedClusterEntry : public DynamicRouteEntry {
return result;
}
+  /**
+   * Produces the ClusterRefreshFunction that RetryState runs when this request is retried.
+   * The closure carries copies of this entry's cluster name, its parent route, the owning
+   * plugin pointer and the random value of the original pick, and hands them to
+   * WeightedClusterSpecifierPlugin::retryRoute().
+   * @return ClusterRefreshFunction, or nullptr when retry-aware LB does not apply.
+   */
+  RetryState::ClusterRefreshFunction clusterRefreshCallback() const override {
+    const bool applicable = plugin_ != nullptr && plugin_->hasRetryAwareAlternatives();
+    if (!applicable) {
+      return nullptr;
+    }
+    // Nothing in the closure refers back to this entry, which may be destroyed before the
+    // retry state invokes the callback. The plugin is captured as a raw pointer.
+    RouteConstSharedPtr parent = base_route_;
+    return [cluster_name = clusterName(), parent = std::move(parent), plugin = plugin_,
+            random_value = random_value_](const Http::RequestHeaderMap& headers,
+                                          StreamInfo::StreamInfo& stream_info)
+               -> RouteConstSharedPtr {
+      return plugin->retryRoute(cluster_name, parent, headers, stream_info, random_value);
+    };
+  }
+
private:
const HeaderParser& requestHeaderParser() const {
if (config_->request_headers_parser_ != nullptr) {
@@ -201,6 +276,8 @@ class WeightedClusterEntry : public DynamicRouteEntry {
}
WeightedClustersConfigEntryConstSharedPtr config_;
+ const WeightedClusterSpecifierPlugin* plugin_; // May be null; clusterRefreshCallback() checks it.
+ const uint64_t random_value_; // Random value of the original pick; reused by retryRoute().
};
// Selects a cluster depending on weight parameters from configuration or from headers.
@@ -252,6 +329,7 @@ RouteConstSharedPtr WeightedClusterSpecifierPlugin::pickWeightedCluster(
const bool runtime_key_prefix_configured = !runtime_key_prefix_.empty();
uint32_t total_cluster_weight = total_cluster_weight_;
absl::InlinedVector<uint32_t, 4> cluster_weights;
+ bool use_weight_override = false;
// if runtime config is used, we need to recompute total_weight.
if (runtime_key_prefix_configured) {
@@ -273,12 +351,76 @@ RouteConstSharedPtr WeightedClusterSpecifierPlugin::pickWeightedCluster(
}
total_cluster_weight += cluster_weight;
}
+ use_weight_override = true;
}
+ // if total_weight is zero, it means the config is invalid.
if (total_cluster_weight == 0) {
IS_ENVOY_BUG("Sum of weight cannot be zero");
return nullptr;
}
+
+ // recompute total_weight based on cluster health, overrides config values.
+ // This step is not needed for a single cluster.
+ if (health_aware_lb_ && weighted_clusters_.size() > 1) {
+ absl::InlinedVector<uint32_t, 4> healthy_cluster_weights;
+ healthy_cluster_weights.reserve(weighted_clusters_.size());
+ uint32_t total_healthy_weight = 0;
+ for (const auto& cluster : weighted_clusters_) {
+ auto cluster_weight = cluster->clusterWeight(loader_);
+ cluster_weight = healthawareClusterWeight(cluster->cluster_name_, cluster_weight);
+
+ healthy_cluster_weights.push_back(cluster_weight);
+ total_healthy_weight += cluster_weight;
+ }
+ if (total_healthy_weight > 0) {
+ total_cluster_weight = total_healthy_weight;
+ cluster_weights = std::move(healthy_cluster_weights);
+ use_weight_override = true;
+ } else {
+ ENVOY_LOG(debug, "All clusters are unhealthy, weighted panic mode");
+ }
+ }
+
+ // Retry-aware weighted cluster selection: zero out the weight of any cluster
+ // that has already been attempted (and failed) on this request.
+ const AttemptedClustersFilterState* attempted_clusters = nullptr;
+ if (retry_aware_lb_ && weighted_clusters_.size() > 1) {
+ attempted_clusters = stream_info.filterState().getDataReadOnly<AttemptedClustersFilterState>(
+ kWeightedClusterAttemptedClustersKey);
+ if (attempted_clusters != nullptr && attempted_clusters->size() > 0) {
+ absl::InlinedVector<uint32_t, 4> retry_cluster_weights;
+ retry_cluster_weights.reserve(weighted_clusters_.size());
+ uint32_t total_retry_weight = 0;
+
+ auto current_weight = cluster_weights.begin();
+ for (const auto& cluster : weighted_clusters_) {
+ uint32_t cw;
+ if (use_weight_override && current_weight != cluster_weights.end()) {
+ cw = *current_weight++;
+ } else {
+ cw = cluster->clusterWeight(loader_);
+ }
+ cw = retryAwareClusterWeight(cluster->cluster_name_, cw, attempted_clusters);
+ retry_cluster_weights.push_back(cw);
+ total_retry_weight += cw;
+ }
+
+ if (total_retry_weight > 0) {
+ total_cluster_weight = total_retry_weight;
+ cluster_weights = std::move(retry_cluster_weights);
+ use_weight_override = true;
+ ENVOY_LOG(debug,
+ "retry-aware lb: adjusted total weight to {} after excluding {} "
+ "previously attempted cluster(s)",
+ total_cluster_weight, attempted_clusters->size());
+ } else {
+ ENVOY_LOG(debug, "retry-aware lb: all clusters previously attempted, "
+ "falling back to original weights (panic mode)");
+ }
+ }
+ }
+
const uint64_t selected_value =
(random_value_from_header.has_value() ? random_value_from_header.value() : selection_value) %
total_cluster_weight;
@@ -291,7 +433,7 @@ RouteConstSharedPtr WeightedClusterSpecifierPlugin::pickWeightedCluster(
// [0, cluster1_weight), [cluster1_weight, cluster1_weight+cluster2_weight),..
for (const auto& cluster : weighted_clusters_) {
- if (runtime_key_prefix_configured) {
+ if (use_weight_override) {
end = begin + *cluster_weight++;
} else {
end = begin + cluster->clusterWeight(loader_);
@@ -299,7 +441,8 @@ RouteConstSharedPtr WeightedClusterSpecifierPlugin::pickWeightedCluster(
if (selected_value >= begin && selected_value < end) {
if (!cluster->cluster_name_.empty()) {
- return std::make_shared<WeightedClusterEntry>(std::move(parent), "", cluster);
+ return std::make_shared<WeightedClusterEntry>(std::move(parent), "", cluster, this,
+ random_value);
}
ASSERT(!cluster->cluster_header_name_.get().empty());
@@ -307,7 +450,7 @@ RouteConstSharedPtr WeightedClusterSpecifierPlugin::pickWeightedCluster(
absl::string_view cluster_name =
entries.empty() ? absl::string_view{} : entries[0]->value().getStringView();
return std::make_shared<WeightedClusterEntry>(std::move(parent), std::string(cluster_name),
- cluster);
+ cluster, this, random_value);
}
begin = end;
}
@@ -335,5 +478,58 @@ WeightedClusterSpecifierPlugin::validateClusters(const Upstream::ClusterManager&
return absl::OkStatus();
}
+RouteConstSharedPtr WeightedClusterSpecifierPlugin::retryRoute(
+    const std::string& failed_cluster_name, RouteConstSharedPtr parent_route,
+    const Http::RequestHeaderMap& headers, StreamInfo::StreamInfo& stream_info,
+    uint64_t random_value) const {
+  // Without retry-aware LB, or with a single weighted cluster, there is nothing to switch to.
+  const bool has_alternatives = retry_aware_lb_ && weighted_clusters_.size() > 1;
+  if (!has_alternatives) {
+    return nullptr;
+  }
+
+  // Cluster-level rerouting is reserved for single-endpoint clusters (e.g. egress VIPs). For a
+  // cluster with several endpoints, host-level retry predicates such as
+  // PreviousHostsRetryPredicate pick another host inside the same cluster instead.
+  if (auto* failed_cluster = cluster_manager_.getThreadLocalCluster(failed_cluster_name);
+      failed_cluster != nullptr) {
+    uint64_t endpoint_count = 0;
+    for (const auto& host_set : failed_cluster->prioritySet().hostSetsPerPriority()) {
+      endpoint_count += host_set->hosts().size();
+    }
+    if (endpoint_count > 1) {
+      ENVOY_LOG(debug,
+                "retry-aware lb: skipping for cluster '{}' with {} endpoints "
+                "(host-level retry predicate will handle)",
+                failed_cluster_name, endpoint_count);
+      return nullptr;
+    }
+  }
+
+  // Remember the failed cluster in filter state so pickWeightedCluster() skips it.
+  const auto& filter_state = stream_info.filterState();
+  auto* attempted = filter_state->getDataMutable<AttemptedClustersFilterState>(
+      kWeightedClusterAttemptedClustersKey);
+  if (attempted != nullptr) {
+    attempted->addAttemptedCluster(failed_cluster_name);
+    ENVOY_LOG(debug,
+              "retry-aware lb: recorded attempted cluster '{}' in filter state "
+              "(total attempted: {})",
+              failed_cluster_name, attempted->size());
+  } else {
+    auto created = std::make_shared<AttemptedClustersFilterState>();
+    created->addAttemptedCluster(failed_cluster_name);
+    filter_state->setData(kWeightedClusterAttemptedClustersKey, std::move(created),
+                          StreamInfo::FilterState::StateType::Mutable,
+                          StreamInfo::FilterState::LifeSpan::Request);
+    ENVOY_LOG(debug, "retry-aware lb: recorded first attempted cluster '{}' in filter state",
+              failed_cluster_name);
+  }
+
+  // Re-run the weighted pick with the original random value; recorded clusters now weigh 0.
+  return pickWeightedCluster(std::static_pointer_cast<const RouteEntryAndRoute>(parent_route),
+                             headers, stream_info, random_value);
+}
+
} // namespace Router
} // namespace Envoy
diff --git a/source/common/router/weighted_cluster_specifier.h b/source/common/router/weighted_cluster_specifier.h
index c8f3fb1de37af..274c6cd7c33d1 100644
--- a/source/common/router/weighted_cluster_specifier.h
+++ b/source/common/router/weighted_cluster_specifier.h
@@ -1,12 +1,16 @@
#pragma once
#include "envoy/router/cluster_specifier_plugin.h"
+#include "envoy/stream_info/filter_state.h"
#include "source/common/router/delegating_route_impl.h"
#include "source/common/router/header_parser.h"
#include "source/common/router/metadatamatchcriteria_impl.h"
#include "source/common/router/per_filter_config.h"
+#include "absl/container/flat_hash_set.h"
+#include "absl/strings/str_join.h"
+
namespace Envoy {
namespace Router {
@@ -16,6 +20,41 @@ using ClusterWeightProto = envoy::config::route::v3::WeightedCluster::ClusterWei
class WeightedClusterEntry;
class WeightedClusterSpecifierPlugin;
+/**
+ * Filter state key for the AttemptedClustersFilterState of a request. The weighted cluster
+ * picker reads it so that clusters already tried (and failed) are not chosen again.
+ */
+inline constexpr absl::string_view kWeightedClusterAttemptedClustersKey =
+    "envoy.weighted_cluster.attempted_clusters";
+
+/**
+ * Filter state object holding the names of the weighted clusters attempted for a request
+ * (retries included). Once a cluster is recorded here, weighted cluster selection treats its
+ * weight as zero so that the next retry lands on a different cluster.
+ * NOTE(review): serializeAsString() joins names in flat_hash_set iteration order, which is
+ * unspecified; do not rely on the ordering of the serialized value.
+ */
+class AttemptedClustersFilterState : public StreamInfo::FilterState::Object {
+public:
+  void addAttemptedCluster(const std::string& cluster_name) {
+    attempted_clusters_.emplace(cluster_name);
+  }
+
+  // True if cluster_name was previously passed to addAttemptedCluster().
+  bool hasAttempted(const std::string& cluster_name) const {
+    return attempted_clusters_.find(cluster_name) != attempted_clusters_.end();
+  }
+
+  size_t size() const { return attempted_clusters_.size(); }
+
+  absl::optional<std::string> serializeAsString() const override {
+    return absl::StrJoin(attempted_clusters_, ",");
+  }
+
+private:
+  absl::flat_hash_set<std::string> attempted_clusters_;
+};
+
struct WeightedClustersConfigEntry {
public:
static absl::StatusOr<std::shared_ptr<WeightedClustersConfigEntry>>
@@ -64,16 +103,45 @@ class WeightedClusterSpecifierPlugin : public ClusterSpecifierPlugin,
absl::Status validateClusters(const Upstream::ClusterManager& cm) const override;
+  /**
+   * Chooses another weighted cluster for a retry. Unless the failed cluster has more than one
+   * endpoint (then nullptr is returned and host-level retry predicates pick another host), the
+   * failed cluster is recorded in filter state and the weighted pick is repeated.
+   * @param failed_cluster_name name of the cluster whose attempt just failed.
+   * @param parent_route the parent route handed to pickWeightedCluster for the new entry.
+   * @param headers the downstream request headers.
+   * @param stream_info the stream info; its filter state may be modified.
+   * @param random_value the random value used by the original cluster selection.
+   * @return the re-picked route, or nullptr. If every cluster was already attempted, the pick
+   *         ignores the exclusion and may return a previously tried cluster.
+   */
+  RouteConstSharedPtr retryRoute(const std::string& failed_cluster_name,
+                                 RouteConstSharedPtr parent_route,
+                                 const Http::RequestHeaderMap& headers,
+                                 StreamInfo::StreamInfo& stream_info,
+                                 uint64_t random_value) const;
+
+  /** @return whether retry-aware LB is enabled and more than one weighted cluster exists. */
+  bool hasRetryAwareAlternatives() const {
+    return weighted_clusters_.size() > 1 && retry_aware_lb_;
+  }
+
private:
RouteConstSharedPtr pickWeightedCluster(RouteEntryAndRouteConstSharedPtr parent,
const Http::RequestHeaderMap& headers,
const StreamInfo::StreamInfo& stream_info,
uint64_t random_value) const;
+ uint64_t healthawareClusterWeight(const std::string& cluster_name, uint64_t config_weight) const;
+ uint64_t retryAwareClusterWeight(const std::string& cluster_name, uint64_t config_weight,
+ const AttemptedClustersFilterState* attempted_clusters) const;
Runtime::Loader& loader_;
+ Upstream::ClusterManager& cluster_manager_; // Used for cluster health and endpoint-count lookups.
const Http::LowerCaseString random_value_header_;
const std::string runtime_key_prefix_;
const bool use_hash_policy_{};
+ const bool health_aware_lb_{true}; // NOTE(review): always on; no config/runtime knob visible.
+ const bool retry_aware_lb_{true}; // NOTE(review): always on; no config/runtime knob visible.
std::vector<WeightedClustersConfigEntryConstSharedPtr> weighted_clusters_;
uint64_t total_cluster_weight_{0};
};
diff --git a/test/common/router/config_impl_test.cc b/test/common/router/config_impl_test.cc
index 614d3d24ae3da..f2bf9f36a11e1 100644
--- a/test/common/router/config_impl_test.cc
+++ b/test/common/router/config_impl_test.cc
@@ -20,6 +20,7 @@
#include "source/common/network/address_impl.h"
#include "source/common/router/config_impl.h"
#include "source/common/router/string_accessor_impl.h"
+#include "source/common/router/weighted_cluster_specifier.h"
#include "source/common/stream_info/filter_state_impl.h"
#include "source/common/stream_info/upstream_address.h"
@@ -11889,6 +11890,246 @@ TEST_F(RouteConfigurationV2, RequestBodyBufferLimitPrecedenceRouteOverridesVirtu
EXPECT_EQ(4194304U, route->requestBodyBufferLimit());
}
+// =============================================================================
+// Retry-aware weighted cluster selection tests
+// =============================================================================
+
+// Verify that the retry-aware weighted cluster logic zeroes out the weight of
+// previously attempted clusters (stored in filter state) and selects a different one.
+TEST_F(RouteMatcherTest, WeightedClusterRetryAwareSelectionSkipsAttemptedCluster) {
+  const std::string yaml = R"EOF(
+virtual_hosts:
+  - name: www
+    domains: ["www.lyft.com"]
+    routes:
+      - match: { prefix: "/" }
+        route:
+          weighted_clusters:
+            clusters:
+              - name: cluster1
+                weight: 50
+              - name: cluster2
+                weight: 50
+  )EOF";
+
+  factory_context_.cluster_manager_.initializeClusters({"cluster1", "cluster2"}, {});
+  TestConfigImpl config(parseRouteConfigurationFromYaml(yaml), factory_context_, true,
+                        creation_status_);
+
+  NiceMock<Envoy::StreamInfo::MockStreamInfo> stream_info;
+
+  // First request: random_value=10 selects cluster1 (10 % 100 = 10, in [0, 50))
+  Http::TestRequestHeaderMapImpl headers = genHeaders("www.lyft.com", "/foo", "GET");
+  auto route = config.route(headers, stream_info, 10).route;
+  ASSERT_NE(nullptr, route);
+  EXPECT_EQ("cluster1", route->routeEntry()->clusterName());
+
+  // Now simulate a retry: record cluster1 as attempted in filter state.
+  auto attempted = std::make_shared<Router::AttemptedClustersFilterState>();
+  attempted->addAttemptedCluster("cluster1");
+  stream_info.filterState()->setData(Router::kWeightedClusterAttemptedClustersKey, attempted,
+                                     StreamInfo::FilterState::StateType::Mutable,
+                                     StreamInfo::FilterState::LifeSpan::Request);
+
+  // Same random_value=10, but now cluster1 has weight 0 -> total_weight=50,
+  // selected_value = 10 % 50 = 10, falls into cluster2's range [0, 50).
+  auto retry_route = config.route(headers, stream_info, 10).route;
+  ASSERT_NE(nullptr, retry_route);
+  EXPECT_EQ("cluster2", retry_route->routeEntry()->clusterName());
+}
+
+// Verify that the clusterRefreshCallback() on a WeightedClusterEntry returns a valid
+// callback that performs retry-aware cluster selection.
+TEST_F(RouteMatcherTest, WeightedClusterClusterRefreshCallback) {
+  const std::string yaml = R"EOF(
+virtual_hosts:
+  - name: www
+    domains: ["www.lyft.com"]
+    routes:
+      - match: { prefix: "/" }
+        route:
+          weighted_clusters:
+            clusters:
+              - name: cluster1
+                weight: 50
+              - name: cluster2
+                weight: 50
+  )EOF";
+
+  factory_context_.cluster_manager_.initializeClusters({"cluster1", "cluster2"}, {});
+  TestConfigImpl config(parseRouteConfigurationFromYaml(yaml), factory_context_, true,
+                        creation_status_);
+
+  NiceMock<Envoy::StreamInfo::MockStreamInfo> stream_info;
+
+  // Get the initial route (cluster1).
+  Http::TestRequestHeaderMapImpl headers = genHeaders("www.lyft.com", "/foo", "GET");
+  auto route = config.route(headers, stream_info, 10).route;
+  ASSERT_NE(nullptr, route);
+  EXPECT_EQ("cluster1", route->routeEntry()->clusterName());
+
+  // Get the cluster refresh callback from the route entry.
+  auto callback = route->routeEntry()->clusterRefreshCallback();
+  ASSERT_NE(nullptr, callback);
+
+  // Call the callback -- it should record cluster1 as attempted and select cluster2.
+  auto retry_route = callback(headers, stream_info);
+  ASSERT_NE(nullptr, retry_route);
+  EXPECT_EQ("cluster2", retry_route->routeEntry()->clusterName());
+}
+
+// Verify that two high-weight "bad" clusters are systematically eliminated across
+// chained retries, eventually forcing selection to a low-weight "good" cluster.
+// This simulates the real router behavior: each retry updates the callback,
+// and the FilterState accumulates all previously attempted clusters.
+//
+// Weights: bad1(45), bad2(45), good(10); random_value=20
+//   Initial: 20 % 100 = 20 -> bad1 [0,45) (selected, fails)
+//   Retry 1: bad1 zeroed -> total=55, 20 % 55 = 20 -> bad2 [0,45) (selected, fails)
+//   Retry 2: bad1+bad2 zeroed -> total=10, 20 % 10 = 0 -> good [0,10) (selected)
+TEST_F(RouteMatcherTest, WeightedClusterChainedRetriesAccumulateAttemptedClusters) {
+  const std::string yaml = R"EOF(
+virtual_hosts:
+  - name: www
+    domains: ["www.lyft.com"]
+    routes:
+      - match: { prefix: "/" }
+        route:
+          weighted_clusters:
+            clusters:
+              - name: bad1
+                weight: 45
+              - name: bad2
+                weight: 45
+              - name: good
+                weight: 10
+  )EOF";
+
+  factory_context_.cluster_manager_.initializeClusters({"bad1", "bad2", "good"}, {});
+  TestConfigImpl config(parseRouteConfigurationFromYaml(yaml), factory_context_, true,
+                        creation_status_);
+
+  NiceMock<Envoy::StreamInfo::MockStreamInfo> stream_info;
+  Http::TestRequestHeaderMapImpl headers = genHeaders("www.lyft.com", "/foo", "GET");
+
+  // --- Initial request ---
+  // random_value=20: 20 % 100 = 20, falls in bad1's range [0, 45)
+  auto route = config.route(headers, stream_info, 20).route;
+  ASSERT_NE(nullptr, route);
+  EXPECT_EQ("bad1", route->routeEntry()->clusterName());
+
+  // Get the callback that captures "bad1" as the cluster to record on failure.
+  auto callback = route->routeEntry()->clusterRefreshCallback();
+  ASSERT_NE(nullptr, callback);
+
+  // --- Retry 1: bad1 just failed ---
+  // Callback records "bad1" in FilterState, re-picks.
+  // Weights: bad1=0, bad2=45, good=10 -> total=55
+  // 20 % 55 = 20, falls in bad2's range [0, 45)
+  auto retry1_route = callback(headers, stream_info);
+  ASSERT_NE(nullptr, retry1_route);
+  EXPECT_EQ("bad2", retry1_route->routeEntry()->clusterName());
+
+  // Verify FilterState accumulated "bad1".
+  const auto* attempted =
+      stream_info.filterState()->getDataReadOnly<Router::AttemptedClustersFilterState>(
+          Router::kWeightedClusterAttemptedClustersKey);
+  ASSERT_NE(nullptr, attempted);
+  EXPECT_EQ(1U, attempted->size());
+  EXPECT_TRUE(attempted->hasAttempted("bad1"));
+
+  // Simulate what the router does: update the callback to the new route entry.
+  // This is the crucial step -- without it, retry 2 would record "bad1" again
+  // instead of "bad2", and bad2 could be re-selected.
+  callback = retry1_route->routeEntry()->clusterRefreshCallback();
+  ASSERT_NE(nullptr, callback);
+
+  // --- Retry 2: bad2 also failed ---
+  // Callback records "bad2" in FilterState, re-picks.
+  // Weights: bad1=0, bad2=0, good=10 -> total=10
+  // 20 % 10 = 0, falls in good's range [0, 10)
+  auto retry2_route = callback(headers, stream_info);
+  ASSERT_NE(nullptr, retry2_route);
+  EXPECT_EQ("good", retry2_route->routeEntry()->clusterName());
+
+  // Verify FilterState accumulated both "bad1" and "bad2".
+  attempted = stream_info.filterState()->getDataReadOnly<Router::AttemptedClustersFilterState>(
+      Router::kWeightedClusterAttemptedClustersKey);
+  ASSERT_NE(nullptr, attempted);
+  EXPECT_EQ(2U, attempted->size());
+  EXPECT_TRUE(attempted->hasAttempted("bad1"));
+  EXPECT_TRUE(attempted->hasAttempted("bad2"));
+}
+
+// Verify that when every cluster has already been attempted, retry-aware selection
+// falls back to the original weights ("panic mode") and still returns a valid route.
+TEST_F(RouteMatcherTest, WeightedClusterRetryAwarePanicMode) {
+ const std::string yaml = R"EOF(
+virtual_hosts:
+ - name: www
+ domains: ["www.lyft.com"]
+ routes:
+ - match: { prefix: "/" }
+ route:
+ weighted_clusters:
+ clusters:
+ - name: cluster1
+ weight: 50
+ - name: cluster2
+ weight: 50
+ )EOF";
+
+ factory_context_.cluster_manager_.initializeClusters({"cluster1", "cluster2"}, {});
+ TestConfigImpl config(parseRouteConfigurationFromYaml(yaml), factory_context_, true,
+ creation_status_);
+
+ NiceMock<Envoy::StreamInfo::MockStreamInfo> stream_info;
+
+ // Pre-populate FilterState as if both clusters had already been attempted.
+ auto attempted = std::make_shared<Router::AttemptedClustersFilterState>();
+ attempted->addAttemptedCluster("cluster1");
+ attempted->addAttemptedCluster("cluster2");
+ stream_info.filterState()->setData(
+ std::string(Router::kWeightedClusterAttemptedClustersKey), attempted,
+ StreamInfo::FilterState::StateType::Mutable, StreamInfo::FilterState::LifeSpan::Request);
+
+ // Selection must still succeed (panic mode: the original weights are used).
+ Http::TestRequestHeaderMapImpl headers = genHeaders("www.lyft.com", "/foo", "GET");
+ auto route = config.route(headers, stream_info, 10).route;
+ ASSERT_NE(nullptr, route);
+ // Original weights: 10 % 100 = 10 falls in cluster1's range [0, 50).
+ EXPECT_EQ("cluster1", route->routeEntry()->clusterName());
+}
+
+// Verify that a single-cluster weighted route exposes no refresh callback (no failover target).
+TEST_F(RouteMatcherTest, WeightedClusterSingleClusterNoCallback) {
+ const std::string yaml = R"EOF(
+virtual_hosts:
+ - name: www
+ domains: ["www.lyft.com"]
+ routes:
+ - match: { prefix: "/" }
+ route:
+ weighted_clusters:
+ clusters:
+ - name: only_cluster
+ weight: 100
+ )EOF";
+
+ factory_context_.cluster_manager_.initializeClusters({"only_cluster"}, {});
+ TestConfigImpl config(parseRouteConfigurationFromYaml(yaml), factory_context_, true,
+ creation_status_);
+
+ Http::TestRequestHeaderMapImpl headers = genHeaders("www.lyft.com", "/foo", "GET");
+ auto route = config.route(headers, 10).route;
+ ASSERT_NE(nullptr, route);
+ EXPECT_EQ("only_cluster", route->routeEntry()->clusterName());
+
+ // Single cluster → no callback (hasRetryAwareAlternatives() is false).
+ auto callback = route->routeEntry()->clusterRefreshCallback();
+ EXPECT_EQ(nullptr, callback);
+}
+
} // namespace
} // namespace Router
} // namespace Envoy
diff --git a/test/common/router/router_test.cc b/test/common/router/router_test.cc
index 69a77f3549858..8c9053b69bf0d 100644
--- a/test/common/router/router_test.cc
+++ b/test/common/router/router_test.cc
@@ -7617,5 +7617,88 @@ TEST_F(RouterTest, OrcaLoadReportInvalidHeaderValue) {
response_decoder->decodeHeaders(std::move(response_headers), true);
}
+// =============================================================================
+// Retry-aware weighted cluster tests
+// =============================================================================
+
+// Verify that doRetry calls refreshClusterOnRetry() on the retry state and uses
+// the returned route to switch clusters. The router doesn't know about weighted
+// clusters — it just calls the generic refreshClusterOnRetry() method.
+TEST_F(RouterTest, DoRetryCallsRefreshClusterOnRetryAndSwitchesCluster) {
+ // Create a mock route that refreshClusterOnRetry() will return.
+ auto retry_route = std::make_shared<NiceMock<MockRoute>>();
+ retry_route->route_entry_.cluster_name_ = "retry_cluster";
+ cm_.initializeThreadLocalClusters({"retry_cluster"});
+
+ NiceMock<Http::MockRequestEncoder> encoder1;
+ Http::ResponseDecoder* response_decoder = nullptr;
+ expectNewStreamWithImmediateEncoder(encoder1, &response_decoder, Http::Protocol::Http10);
+ expectResponseTimerCreate();
+
+ Http::TestRequestHeaderMapImpl headers{
+ {"x-envoy-retry-on", "5xx"}, {"x-envoy-internal", "true"}};
+ HttpTestUtility::addDefaultHeaders(headers);
+ router_->decodeHeaders(headers, true);
+ EXPECT_EQ(1U,
+ callbacks_.route_->virtual_host_->virtual_cluster_.stats().upstream_rq_total_.value());
+
+ // retry_state_ is created by the test filter inside decodeHeaders(); stub it only now.
+ ON_CALL(*router_->retry_state_, refreshClusterOnRetry(_, _))
+ .WillByDefault(Return(retry_route));
+ // Trigger a retry via reset.
+ router_->retry_state_->expectResetRetry();
+ encoder1.stream_.resetStream(Http::StreamResetReason::RemoteReset);
+
+ // Execute the retry callback — this should call refreshClusterOnRetry() and switch cluster.
+ NiceMock<Http::MockRequestEncoder> encoder2;
+ expectNewStreamWithImmediateEncoder(encoder2, &response_decoder, Http::Protocol::Http10);
+ router_->retry_state_->callback_();
+
+ // Verify that the route was switched to the retry route's cluster.
+ EXPECT_EQ("retry_cluster", router_->route()->routeEntry()->clusterName());
+
+ // Complete the retry with a successful response.
+ Http::ResponseHeaderMapPtr response_headers(
+ new Http::TestResponseHeaderMapImpl({{":status", "200"}}));
+ EXPECT_CALL(callbacks_, encodeHeaders_(_, _));
+ response_decoder->decodeHeaders(std::move(response_headers), true);
+}
+
+// Verify that when refreshClusterOnRetry() returns nullptr (the NiceMock default),
+// the retry proceeds normally against the original cluster.
+TEST_F(RouterTest, DoRetryNormalRetryWhenRefreshClusterReturnsNull) {
+ // No stub needed: the NiceMock returns a null RouteConstSharedPtr by default.
+
+ NiceMock<Http::MockRequestEncoder> encoder1;
+ Http::ResponseDecoder* response_decoder = nullptr;
+ expectNewStreamWithImmediateEncoder(encoder1, &response_decoder, Http::Protocol::Http10);
+ expectResponseTimerCreate();
+
+ Http::TestRequestHeaderMapImpl headers{
+ {"x-envoy-retry-on", "5xx"}, {"x-envoy-internal", "true"}};
+ HttpTestUtility::addDefaultHeaders(headers);
+ router_->decodeHeaders(headers, true);
+
+ // Trigger a retry via reset (retry_state_ exists only after decodeHeaders()).
+ router_->retry_state_->expectResetRetry();
+ encoder1.stream_.resetStream(Http::StreamResetReason::RemoteReset);
+
+ // refreshClusterOnRetry() returns nullptr, so clearRouteCache() must not be called.
+ EXPECT_CALL(callbacks_.downstream_callbacks_, clearRouteCache()).Times(0);
+
+ NiceMock<Http::MockRequestEncoder> encoder2;
+ expectNewStreamWithImmediateEncoder(encoder2, &response_decoder, Http::Protocol::Http10);
+ router_->retry_state_->callback_();
+
+ // The retry must stay on the original (default mock) cluster.
+ EXPECT_EQ("fake_cluster", router_->route()->routeEntry()->clusterName());
+
+ // Complete the retry with a successful response.
+ Http::ResponseHeaderMapPtr response_headers(
+ new Http::TestResponseHeaderMapImpl({{":status", "200"}}));
+ EXPECT_CALL(callbacks_, encodeHeaders_(_, _));
+ response_decoder->decodeHeaders(std::move(response_headers), true);
+}
+
} // namespace Router
} // namespace Envoy
diff --git a/test/mocks/router/mocks.h b/test/mocks/router/mocks.h
index be1b169e9bb60..e058c2fe275c2 100644
--- a/test/mocks/router/mocks.h
+++ b/test/mocks/router/mocks.h
@@ -234,6 +234,9 @@ class MockRetryState : public RetryState {
const Upstream::RetryPriority::PriorityMappingFunc&));
MOCK_METHOD(uint32_t, hostSelectionMaxAttempts, (), (const));
MOCK_METHOD(bool, wouldRetryFromRetriableStatusCode, (Http::Code code), (const));
+ MOCK_METHOD(void, setClusterRefreshCallback, (ClusterRefreshFunction callback));
+ MOCK_METHOD(RouteConstSharedPtr, refreshClusterOnRetry,
+ (const Http::RequestHeaderMap& headers, StreamInfo::StreamInfo& stream_info));
DoRetryCallback callback_;
};
@@ -466,6 +469,7 @@ class MockRouteEntry : public RouteEntry {
MOCK_METHOD(const RouteStatsContextOptRef, routeStatsContext, (), (const));
MOCK_METHOD(void, refreshRouteCluster,
(const Http::RequestHeaderMap&, const StreamInfo::StreamInfo&), (const));
+ MOCK_METHOD(RetryState::ClusterRefreshFunction, clusterRefreshCallback, (), (const));
std::string cluster_name_{"fake_cluster"};
std::multimap<std::string, std::string> opaque_config_;
@@ -589,6 +593,7 @@ class MockRoute : public RouteEntryAndRoute {
MOCK_METHOD(const RouteStatsContextOptRef, routeStatsContext, (), (const));
MOCK_METHOD(void, refreshRouteCluster,
(const Http::RequestHeaderMap&, const StreamInfo::StreamInfo&), (const));
+ MOCK_METHOD(RetryState::ClusterRefreshFunction, clusterRefreshCallback, (), (const));
testing::NiceMock<MockRouteEntry> route_entry_;
testing::NiceMock<MockDecorator> decorator_;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment