diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/catalog/PulsarCatalog.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/catalog/PulsarCatalog.java index 46306b84..17fd4553 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/catalog/PulsarCatalog.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/catalog/PulsarCatalog.java @@ -70,6 +70,8 @@ public class PulsarCatalog extends GenericInMemoryCatalog { private final String authParams; + private final String tlsTrustCertsFilePath; + private final String tenant; private PulsarCatalogSupport catalogSupport; @@ -85,13 +87,15 @@ public PulsarCatalog( String database, String tenant, @Nullable String authPlugin, - @Nullable String authParams) { + @Nullable String authParams, + @Nullable String tlsTrustCertsFilePath) { super(catalogName, database); this.adminUrl = adminUrl; this.serviceUrl = serviceUrl; this.authPlugin = authPlugin; this.authParams = authParams; this.tenant = tenant; + this.tlsTrustCertsFilePath = tlsTrustCertsFilePath; log.info("Created Pulsar Catalog {}", catalogName); } @@ -108,6 +112,7 @@ public void open() throws CatalogException { final ClientConfigurationData clientConf = new ClientConfigurationData(); clientConf.setAuthPluginClassName(this.authPlugin); clientConf.setAuthParams(this.authParams); + clientConf.setTlsTrustCertsFilePath(this.tlsTrustCertsFilePath); clientConf.setServiceUrl(serviceUrl); catalogSupport = new PulsarCatalogSupport( diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/catalog/PulsarCatalogFactory.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/catalog/PulsarCatalogFactory.java index 5caeb7db..fd3ae3f5 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/catalog/PulsarCatalogFactory.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/catalog/PulsarCatalogFactory.java @@ -36,6 +36,7 @@ import static org.apache.flink.streaming.connectors.pulsar.catalog.PulsarCatalogFactoryOptions.IDENTIFIER; import static org.apache.flink.streaming.connectors.pulsar.catalog.PulsarCatalogFactoryOptions.PULSAR_VERSION; import static org.apache.flink.streaming.connectors.pulsar.catalog.PulsarCatalogFactoryOptions.SERVICE_URL; +import static org.apache.flink.streaming.connectors.pulsar.catalog.PulsarCatalogFactoryOptions.TLS_TRUSTCERTS_FILE_PATH; /** Pulsar {@CatalogFactory}. */ public class PulsarCatalogFactory implements CatalogFactory { @@ -57,7 +58,8 @@ public Catalog createCatalog(Context context) { helper.getOptions().get(DEFAULT_DATABASE), helper.getOptions().get(CATALOG_TENANT), helper.getOptions().get(AUTH_PLUGIN), - helper.getOptions().get(AUTH_PARAMS)); + helper.getOptions().get(AUTH_PARAMS), + helper.getOptions().get(TLS_TRUSTCERTS_FILE_PATH)); } @Override @@ -77,7 +79,7 @@ public Set> optionalOptions() { options.add(AUTH_PARAMS); options.add(DEFAULT_PARTITIONS); options.add(PULSAR_VERSION); - + options.add(TLS_TRUSTCERTS_FILE_PATH); // TODO: investigate if need to provide default table options return options; diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/catalog/PulsarCatalogFactoryOptions.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/catalog/PulsarCatalogFactoryOptions.java index 1c96d9aa..0926aa48 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/catalog/PulsarCatalogFactoryOptions.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/catalog/PulsarCatalogFactoryOptions.java @@ -73,6 +73,11 @@ public final class PulsarCatalogFactoryOptions { ConfigOptions.key("pulsar-version") .stringType() .defaultValue(PulsarVersion.getVersion()); + public static final ConfigOption TLS_TRUSTCERTS_FILE_PATH = + ConfigOptions.key("catalog-tls-trustcerts-filepath") + .stringType() + .noDefaultValue() + .withDescription("tls trust certs file path config"); private PulsarCatalogFactoryOptions() {} } diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/catalog/util/PulsarCatalogSupport.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/catalog/util/PulsarCatalogSupport.java index b5828468..be955e8b 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/catalog/util/PulsarCatalogSupport.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/catalog/util/PulsarCatalogSupport.java @@ -256,6 +256,14 @@ private Map enrichTableOptions(final Map tableOp authParams); } + String tlsTrustCertFilePath = + pulsarMetadataReader.getClientConf().getTlsTrustCertsFilePath(); + if (tlsTrustCertFilePath != null && !tlsTrustCertFilePath.isEmpty()) { + enrichedTableOptions.put( + PulsarTableOptions.PROPERTIES_PREFIX + PulsarOptions.TLS_TRUSTCERTS_FILEPATH, + tlsTrustCertFilePath); + } + if (tableOptions != null) { // table options could overwrite the default options provided above enrichedTableOptions.putAll(tableOptions); diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarClientUtils.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarClientUtils.java index e5ef180f..6b89b5c4 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarClientUtils.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarClientUtils.java @@ -62,6 +62,7 @@ public static ClientConfigurationData newClientConf(String serviceUrl, Propertie clientConf.setAuthParams(properties.getProperty(PulsarOptions.AUTH_PARAMS_KEY)); clientConf.setAuthPluginClassName( properties.getProperty(PulsarOptions.AUTH_PLUGIN_CLASSNAME_KEY)); + clientConf.setTlsTrustCertsFilePath(PulsarOptions.TLS_TRUSTCERTS_FILEPATH); } return clientConf; } diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarOptions.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarOptions.java index 76f4f677..3b575e36 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarOptions.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarOptions.java @@ -98,4 +98,5 @@ public class PulsarOptions { public static final String AUTH_PARAMS_KEY = "auth-params"; public static final String AUTH_PLUGIN_CLASSNAME_KEY = "auth-plugin-classname"; + public static final String TLS_TRUSTCERTS_FILEPATH = "tls-trustcerts-filepath"; }