Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.util.concurrent.Executors;
import java.util.function.Consumer;

import io.javaoperatorsdk.operator.processing.event.source.informer.pool.DefaultInformerPool;
import io.javaoperatorsdk.operator.processing.event.source.informer.pool.InformerPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -476,4 +478,8 @@ default boolean useSSAToPatchPrimaryResource() {
default boolean cloneSecondaryResourcesWhenGettingFromCache() {
return false;
}

default InformerPool informerPool() {
return new DefaultInformerPool(getKubernetesClient(),this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import io.javaoperatorsdk.operator.processing.event.source.informer.pool.InformerPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -55,6 +56,7 @@ class InformerManager<R extends HasMetadata, C extends Informable<R>>
private final ResourceEventHandler<R> eventHandler;
private final Map<String, Function<R, List<String>>> indexers = new HashMap<>();
private ControllerConfiguration<R> controllerConfiguration;
private InformerPool informerPool;

InformerManager(
MixedOperation<R, KubernetesResourceList<R>, Resource<R>> client,
Expand All @@ -67,6 +69,7 @@ class InformerManager<R extends HasMetadata, C extends Informable<R>>

void setControllerConfiguration(ControllerConfiguration<R> controllerConfiguration) {
this.controllerConfiguration = controllerConfiguration;
this.controllerConfiguration.getConfigurationService().informerPool();
}

@Override
Expand Down Expand Up @@ -149,7 +152,6 @@ private InformerWrapper<R> createEventSource(
ResourceEventHandler<R> eventHandler,
String namespaceIdentifier) {
final var informerConfig = configuration.getInformerConfig();

if (informerConfig.getFieldSelector() != null
&& !informerConfig.getFieldSelector().getFields().isEmpty()) {
for (var f : informerConfig.getFieldSelector().getFields()) {
Expand All @@ -167,9 +169,7 @@ private InformerWrapper<R> createEventSource(
.orElse(filteredBySelectorClient)
.runnableInformer(0);
Optional.ofNullable(informerConfig.getItemStore()).ifPresent(informer::itemStore);
var source =
new InformerWrapper<>(
informer, controllerConfiguration.getConfigurationService(), namespaceIdentifier);
var source = new InformerWrapper<>(informer, namespaceIdentifier);
source.addEventHandler(eventHandler);
sources.put(namespaceIdentifier, source);
return source;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,18 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.fabric8.kubernetes.api.model.GenericKubernetesResource;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.informers.ExceptionHandler;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.cache.Cache;
import io.javaoperatorsdk.operator.OperatorException;
import io.javaoperatorsdk.operator.ReconcilerUtilsInternal;
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
import io.javaoperatorsdk.operator.health.InformerHealthIndicator;
import io.javaoperatorsdk.operator.health.Status;
import io.javaoperatorsdk.operator.processing.LifecycleAware;
Expand All @@ -51,99 +44,16 @@ class InformerWrapper<T extends HasMetadata>
private final SharedIndexInformer<T> informer;
private final Cache<T> cache;
private final String namespaceIdentifier;
private final ConfigurationService configurationService;

public InformerWrapper(
SharedIndexInformer<T> informer,
ConfigurationService configurationService,
String namespaceIdentifier) {
public InformerWrapper(SharedIndexInformer<T> informer, String namespaceIdentifier) {
this.informer = informer;
this.namespaceIdentifier = namespaceIdentifier;
this.cache = (Cache<T>) informer.getStore();
this.configurationService = configurationService;
}

@Override
public void start() throws OperatorException {
try {

// register stopped handler if we have one defined
configurationService
.getInformerStoppedHandler()
.ifPresent(
ish -> {
final var stopped = informer.stopped();
if (stopped != null) {
stopped.handle(
(res, ex) -> {
ish.onStop(informer, ex);
return null;
});
} else {
final var apiTypeClass = informer.getApiTypeClass();
final var fullResourceName = HasMetadata.getFullResourceName(apiTypeClass);
final var version = HasMetadata.getVersion(apiTypeClass);
throw new IllegalStateException(
"Cannot retrieve 'stopped' callback to listen to informer stopping for"
+ " informer for "
+ fullResourceName
+ "/"
+ version);
}
});
if (!configurationService.stopOnInformerErrorDuringStartup()) {
informer.exceptionHandler((b, t) -> !ExceptionHandler.isDeserializationException(t));
}
// change thread name for easier debugging
final var thread = Thread.currentThread();
final var name = thread.getName();
try {
thread.setName(informerInfo() + " " + thread.getId());
final var resourceName = informer.getApiTypeClass().getSimpleName();
log.debug(
"Starting informer for namespace: {} resource: {}", namespaceIdentifier, resourceName);
var start = informer.start();
// note that in case we don't put here timeout and stopOnInformerErrorDuringStartup is
// false, and there is a rbac issue the get never returns; therefore operator never really
// starts
log.trace(
"Waiting informer to start namespace: {} resource: {}",
namespaceIdentifier,
resourceName);
start
.toCompletableFuture()
.get(configurationService.cacheSyncTimeout().toMillis(), TimeUnit.MILLISECONDS);
log.debug(
"Started informer for namespace: {} resource: {}", namespaceIdentifier, resourceName);
} catch (TimeoutException | ExecutionException e) {
if (configurationService.stopOnInformerErrorDuringStartup()) {
log.error("Informer startup error. Operator will be stopped. Informer: {}", informer, e);
throw new OperatorException(e);
} else {
log.warn("Informer startup error. Will periodically retry. Informer: {}", informer, e);
}
} catch (InterruptedException e) {
thread.interrupt();
throw new IllegalStateException(e);
} finally {
// restore original name
thread.setName(name);
}

} catch (Exception e) {
ReconcilerUtilsInternal.handleKubernetesClientException(
e, HasMetadata.getFullResourceName(informer.getApiTypeClass()));
throw new OperatorException(
"Couldn't start informer for " + versionedFullResourceName() + " resources", e);
}
}

private String versionedFullResourceName() {
final var apiTypeClass = informer.getApiTypeClass();
if (apiTypeClass.isAssignableFrom(GenericKubernetesResource.class)) {
return GenericKubernetesResource.class.getSimpleName();
}
return ReconcilerUtilsInternal.getResourceTypeNameWithVersion(apiTypeClass);
public void start() {
// no-op: informer initialization is handled by InformerPool
}

@Override
Expand Down Expand Up @@ -201,7 +111,7 @@ public String toString() {
}

private String informerInfo() {
return "InformerWrapper [" + versionedFullResourceName() + "]";
return "InformerWrapper [" + informer.getApiTypeClass().getSimpleName() + "]";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
/*
* Copyright Java Operator SDK Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.javaoperatorsdk.operator.processing.event.source.informer.pool;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
import io.fabric8.kubernetes.client.informers.ExceptionHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.javaoperatorsdk.operator.OperatorException;
import io.javaoperatorsdk.operator.ReconcilerUtilsInternal;
import io.javaoperatorsdk.operator.api.config.ConfigurationService;

import static io.javaoperatorsdk.operator.api.reconciler.Constants.WATCH_ALL_NAMESPACES;

public class DefaultInformerPool implements InformerPool {

private static final Logger log = LoggerFactory.getLogger(DefaultInformerPool.class);

private final KubernetesClient client;
private final ConfigurationService configurationService;

private final Map<InformerClassifier, SharedIndexInformer<?>> informers = new HashMap<>();
private final Map<SharedIndexInformer<?>, AtomicInteger> counters = new HashMap<>();

public DefaultInformerPool(KubernetesClient client, ConfigurationService configurationService) {
this.client = client;
this.configurationService = configurationService;
}

public synchronized SharedIndexInformer<?> getResource(InformerClassifier classifier) {
var informer = informers.get(classifier);
if (informer == null) {
informer = createInformer(client, classifier);
initInformer(informer, classifier.namespaceIdentifier());
informers.put(classifier, informer);
counters.put(informer, new AtomicInteger(1));
} else {
counters.get(informer).incrementAndGet();
}
return informer;
}

public synchronized void releaseResource(SharedIndexInformer<?> informer) {
var counter = counters.get(informer);
if (counter != null && counter.decrementAndGet() <= 0) {
informer.stop();
counters.remove(informer);
informers.values().remove(informer);
} else {
log.warn("No informer found in the pool.");
}
}

private void initInformer(SharedIndexInformer<?> informer, String namespaceIdentifier) {
try {
configurationService
.getInformerStoppedHandler()
.ifPresent(
ish -> {
final var stopped = informer.stopped();
if (stopped != null) {
stopped.handle(
(res, ex) -> {
ish.onStop(informer, ex);
return null;
});
} else {
final var apiTypeClass = informer.getApiTypeClass();
final var fullResourceName = HasMetadata.getFullResourceName(apiTypeClass);
final var version = HasMetadata.getVersion(apiTypeClass);
throw new IllegalStateException(
"Cannot retrieve 'stopped' callback to listen to informer stopping for"
+ " informer for "
+ fullResourceName
+ "/"
+ version);
}
});
if (!configurationService.stopOnInformerErrorDuringStartup()) {
informer.exceptionHandler((b, t) -> !ExceptionHandler.isDeserializationException(t));
}
// change thread name for easier debugging
final var thread = Thread.currentThread();
final var name = thread.getName();
try {
thread.setName(
"InformerPool [" + versionedFullResourceName(informer) + "] " + thread.getId());
final var resourceName = informer.getApiTypeClass().getSimpleName();
log.debug(
"Starting informer for namespace: {} resource: {}", namespaceIdentifier, resourceName);
var start = informer.start();
// note that in case we don't put here timeout and stopOnInformerErrorDuringStartup is
// false, and there is a rbac issue the get never returns; therefore operator never really
// starts
log.trace(
"Waiting informer to start namespace: {} resource: {}",
namespaceIdentifier,
resourceName);
start
.toCompletableFuture()
.get(configurationService.cacheSyncTimeout().toMillis(), TimeUnit.MILLISECONDS);
log.debug(
"Started informer for namespace: {} resource: {}", namespaceIdentifier, resourceName);
} catch (TimeoutException | ExecutionException e) {
if (configurationService.stopOnInformerErrorDuringStartup()) {
log.error("Informer startup error. Operator will be stopped. Informer: {}", informer, e);
throw new OperatorException(e);
} else {
log.warn("Informer startup error. Will periodically retry. Informer: {}", informer, e);
}
} catch (InterruptedException e) {
thread.interrupt();
throw new IllegalStateException(e);
} finally {
// restore original name
thread.setName(name);
}
} catch (Exception e) {
ReconcilerUtilsInternal.handleKubernetesClientException(
e, HasMetadata.getFullResourceName(informer.getApiTypeClass()));
throw new OperatorException(
"Couldn't start informer for " + versionedFullResourceName(informer) + " resources", e);
}
}

@SuppressWarnings("unchecked")
private static String versionedFullResourceName(SharedIndexInformer<?> informer) {
return ReconcilerUtilsInternal.getResourceTypeNameWithVersion(
(Class<? extends HasMetadata>) informer.getApiTypeClass());
}

@SuppressWarnings("rawtypes")
static SharedIndexInformer<?> createInformer(
KubernetesClient client, InformerClassifier classifier) {
FilterWatchListDeletable filteredClient;
if (WATCH_ALL_NAMESPACES.equals(classifier.namespaceIdentifier())) {
filteredClient =
client
.resources(classifier.resourceClass())
.inAnyNamespace()
.withLabelSelector(classifier.labelSelector());
} else {
filteredClient =
client
.resources(classifier.resourceClass())
.inNamespace(classifier.namespaceIdentifier())
.withLabelSelector(classifier.labelSelector());
}

if (classifier.fieldSelector() != null && !classifier.fieldSelector().getFields().isEmpty()) {
for (var f : classifier.fieldSelector().getFields()) {
if (f.negated()) {
filteredClient =
(FilterWatchListDeletable) filteredClient.withoutField(f.path(), f.value());
} else {
filteredClient = (FilterWatchListDeletable) filteredClient.withField(f.path(), f.value());
}
}
}
return filteredClient.runnableInformer(0);
}
}
Loading
Loading