From efe9416d20f3ac2e281300693a373bc2a733846a Mon Sep 17 00:00:00 2001 From: madplay Date: Fri, 13 Dec 2024 22:28:44 +0900 Subject: [PATCH 1/3] Fix typo in OpensearchClient --- .../io/aiven/kafka/connect/opensearch/OpensearchClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/aiven/kafka/connect/opensearch/OpensearchClient.java b/src/main/java/io/aiven/kafka/connect/opensearch/OpensearchClient.java index 5f63420f..c2c2e892 100644 --- a/src/main/java/io/aiven/kafka/connect/opensearch/OpensearchClient.java +++ b/src/main/java/io/aiven/kafka/connect/opensearch/OpensearchClient.java @@ -248,7 +248,7 @@ public HttpAsyncClientBuilder customizeHttpClient(final HttpAsyncClientBuilder h .forOpensearch(config); configurators.forEach(configurator -> { if (configurator.apply(config, httpClientBuilder)) { - LOGGER.debug("Successfuly applied " + configurator.getClass().getName() + LOGGER.debug("successfully applied " + configurator.getClass().getName() + " configurator to OpensearchClient"); } }); From 31bb93dba0157773bffc639ee929f80c288bd717 Mon Sep 17 00:00:00 2001 From: madplay Date: Fri, 13 Dec 2024 22:29:28 +0900 Subject: [PATCH 2/3] Simplify modifiers, map creation, and lambda expressions --- .../io/aiven/kafka/connect/opensearch/DocumentIDStrategy.java | 4 ++-- .../kafka/connect/opensearch/OpensearchSinkConnector.java | 3 +-- .../java/io/aiven/kafka/connect/opensearch/RetryUtil.java | 2 +- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/main/java/io/aiven/kafka/connect/opensearch/DocumentIDStrategy.java b/src/main/java/io/aiven/kafka/connect/opensearch/DocumentIDStrategy.java index 9affebe6..25406362 100644 --- a/src/main/java/io/aiven/kafka/connect/opensearch/DocumentIDStrategy.java +++ b/src/main/java/io/aiven/kafka/connect/opensearch/DocumentIDStrategy.java @@ -42,7 +42,7 @@ record -> String.format("%s+%s+%s", record.topic(), record.kafkaPartition(), rec private final Function docIdGenerator; - private DocumentIDStrategy(final String name, final String description, + DocumentIDStrategy(final String name, final String description, final Function docIdGenerator) { this.name = name.toLowerCase(Locale.ROOT); this.description = description; @@ -74,7 +74,7 @@ public static String describe() { } public static final ConfigDef.Validator VALIDATOR = new ConfigDef.Validator() { - private final String[] names = Arrays.stream(values()).map(v -> v.toString()).toArray(String[]::new); + private final String[] names = Arrays.stream(values()).map(Object::toString).toArray(String[]::new); private final ConfigDef.ValidString validator = ConfigDef.ValidString.in(names); @Override diff --git a/src/main/java/io/aiven/kafka/connect/opensearch/OpensearchSinkConnector.java b/src/main/java/io/aiven/kafka/connect/opensearch/OpensearchSinkConnector.java index 5c5b78db..a0e2265e 100644 --- a/src/main/java/io/aiven/kafka/connect/opensearch/OpensearchSinkConnector.java +++ b/src/main/java/io/aiven/kafka/connect/opensearch/OpensearchSinkConnector.java @@ -54,8 +54,7 @@ public Class taskClass() { @Override public List> taskConfigs(final int maxTasks) { final List> taskConfigs = new ArrayList<>(); - final Map taskProps = new HashMap<>(); - taskProps.putAll(configProperties); + final Map taskProps = new HashMap<>(configProperties); for (int i = 0; i < maxTasks; i++) { taskConfigs.add(taskProps); } diff --git a/src/main/java/io/aiven/kafka/connect/opensearch/RetryUtil.java b/src/main/java/io/aiven/kafka/connect/opensearch/RetryUtil.java index 5411a378..a21171a1 100644 --- a/src/main/java/io/aiven/kafka/connect/opensearch/RetryUtil.java +++ b/src/main/java/io/aiven/kafka/connect/opensearch/RetryUtil.java @@ -115,7 +115,7 @@ public static T callWithRetry(final String callName, fi final long sleepTimeMs = computeRandomRetryWaitTimeInMillis(retryAttempts, retryBackoffMs); final var msg = String.format("Failed to %s with attempt %s/%s, will attempt retry after %s ms. ", callName, attempts, maxAttempts, sleepTimeMs); - LOGGER.warn(msg + "Failure reason: {}", e); + LOGGER.warn(msg + "Failure reason: ", e); time.sleep(sleepTimeMs); } else { final var msg = String.format("Failed to %s after total of %s attempt(s)", callName, attempts); From 670f35abc9492727962af80438c72bd22bf5465a Mon Sep 17 00:00:00 2001 From: madplay Date: Fri, 13 Dec 2024 22:43:37 +0900 Subject: [PATCH 3/3] Remove unused parameter and simplify iteration --- .../kafka/connect/opensearch/OpensearchClient.java | 2 +- .../opensearch/spi/ClientsConfiguratorProvider.java | 10 ++-------- 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/src/main/java/io/aiven/kafka/connect/opensearch/OpensearchClient.java b/src/main/java/io/aiven/kafka/connect/opensearch/OpensearchClient.java index c2c2e892..b7b5d809 100644 --- a/src/main/java/io/aiven/kafka/connect/opensearch/OpensearchClient.java +++ b/src/main/java/io/aiven/kafka/connect/opensearch/OpensearchClient.java @@ -245,7 +245,7 @@ public HttpAsyncClientBuilder customizeHttpClient(final HttpAsyncClientBuilder h .build(); final Collection configurators = ClientsConfiguratorProvider - .forOpensearch(config); + .forOpensearch(); configurators.forEach(configurator -> { if (configurator.apply(config, httpClientBuilder)) { LOGGER.debug("successfully applied " + configurator.getClass().getName() diff --git a/src/main/java/io/aiven/kafka/connect/opensearch/spi/ClientsConfiguratorProvider.java b/src/main/java/io/aiven/kafka/connect/opensearch/spi/ClientsConfiguratorProvider.java index ec25b9c6..71deb483 100644 --- a/src/main/java/io/aiven/kafka/connect/opensearch/spi/ClientsConfiguratorProvider.java +++ b/src/main/java/io/aiven/kafka/connect/opensearch/spi/ClientsConfiguratorProvider.java @@ -17,10 +17,8 @@ import java.util.ArrayList; import java.util.Collection; -import java.util.Iterator; import java.util.ServiceLoader; -import io.aiven.kafka.connect.opensearch.OpensearchSinkConnectorConfig; public final class ClientsConfiguratorProvider { private ClientsConfiguratorProvider() { @@ -30,19 +28,15 @@ private ClientsConfiguratorProvider() { * Use {@link ServiceLoader} mechanism to discover available configurators for Opensearch (and possibly others) * clients which are applicable to the provided configuration. * - * @param config - * provided configuration * @return the list of discovered {@link OpensearchClientConfigurator} configurators which are applicable to the * provided configuration. */ - public static Collection forOpensearch(final OpensearchSinkConnectorConfig config) { + public static Collection forOpensearch() { final Collection configurators = new ArrayList<>(); final ServiceLoader loaders = ServiceLoader .load(OpensearchClientConfigurator.class, ClientsConfiguratorProvider.class.getClassLoader()); - final Iterator iterator = loaders.iterator(); - while (iterator.hasNext()) { - final OpensearchClientConfigurator configurator = iterator.next(); + for (OpensearchClientConfigurator configurator : loaders) { configurators.add(configurator); }