Skip to content
This repository was archived by the owner on Dec 14, 2022. It is now read-only.

Connector support pulsar cluster with both JWT and TLS auth #543

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ public class PulsarCatalog extends GenericInMemoryCatalog {

private final String authParams;

private final String tlsTrustCertsFilePath;

private final String tenant;

private PulsarCatalogSupport catalogSupport;
Expand All @@ -85,13 +87,15 @@ public PulsarCatalog(
String database,
String tenant,
@Nullable String authPlugin,
@Nullable String authParams) {
@Nullable String authParams,
@Nullable String tlsTrustCertsFilePath) {
super(catalogName, database);
this.adminUrl = adminUrl;
this.serviceUrl = serviceUrl;
this.authPlugin = authPlugin;
this.authParams = authParams;
this.tenant = tenant;
this.tlsTrustCertsFilePath = tlsTrustCertsFilePath;

log.info("Created Pulsar Catalog {}", catalogName);
}
Expand All @@ -108,6 +112,7 @@ public void open() throws CatalogException {
final ClientConfigurationData clientConf = new ClientConfigurationData();
clientConf.setAuthPluginClassName(this.authPlugin);
clientConf.setAuthParams(this.authParams);
clientConf.setTlsTrustCertsFilePath(this.tlsTrustCertsFilePath);
clientConf.setServiceUrl(serviceUrl);
catalogSupport =
new PulsarCatalogSupport(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import static org.apache.flink.streaming.connectors.pulsar.catalog.PulsarCatalogFactoryOptions.IDENTIFIER;
import static org.apache.flink.streaming.connectors.pulsar.catalog.PulsarCatalogFactoryOptions.PULSAR_VERSION;
import static org.apache.flink.streaming.connectors.pulsar.catalog.PulsarCatalogFactoryOptions.SERVICE_URL;
import static org.apache.flink.streaming.connectors.pulsar.catalog.PulsarCatalogFactoryOptions.TLS_TRUSTCERTS_FILE_PATH;

/** Pulsar {@CatalogFactory}. */
public class PulsarCatalogFactory implements CatalogFactory {
Expand All @@ -57,7 +58,8 @@ public Catalog createCatalog(Context context) {
helper.getOptions().get(DEFAULT_DATABASE),
helper.getOptions().get(CATALOG_TENANT),
helper.getOptions().get(AUTH_PLUGIN),
helper.getOptions().get(AUTH_PARAMS));
helper.getOptions().get(AUTH_PARAMS),
helper.getOptions().get(TLS_TRUSTCERTS_FILE_PATH));
}

@Override
Expand All @@ -77,7 +79,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(AUTH_PARAMS);
options.add(DEFAULT_PARTITIONS);
options.add(PULSAR_VERSION);

options.add(TLS_TRUSTCERTS_FILE_PATH);
// TODO: investigate if need to provide default table options

return options;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ public final class PulsarCatalogFactoryOptions {
ConfigOptions.key("pulsar-version")
.stringType()
.defaultValue(PulsarVersion.getVersion());
public static final ConfigOption<String> TLS_TRUSTCERTS_FILE_PATH =
ConfigOptions.key("catalog-tls-trustcerts-filepath")
.stringType()
.noDefaultValue()
.withDescription("tls trust certs file path config");

private PulsarCatalogFactoryOptions() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,14 @@ private Map<String, String> enrichTableOptions(final Map<String, String> tableOp
authParams);
}

String tlsTrustCertFilePath =
pulsarMetadataReader.getClientConf().getTlsTrustCertsFilePath();
if (tlsTrustCertFilePath != null && !tlsTrustCertFilePath.isEmpty()) {
enrichedTableOptions.put(
PulsarTableOptions.PROPERTIES_PREFIX + PulsarOptions.TLS_TRUSTCERTS_FILEPATH,
tlsTrustCertFilePath);
}

if (tableOptions != null) {
// table options could overwrite the default options provided above
enrichedTableOptions.putAll(tableOptions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public static ClientConfigurationData newClientConf(String serviceUrl, Propertie
clientConf.setAuthParams(properties.getProperty(PulsarOptions.AUTH_PARAMS_KEY));
clientConf.setAuthPluginClassName(
properties.getProperty(PulsarOptions.AUTH_PLUGIN_CLASSNAME_KEY));
clientConf.setTlsTrustCertsFilePath(PulsarOptions.TLS_TRUSTCERTS_FILEPATH);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be a mistake? Have you tested on your local machine?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be a mistake? Have you tested on your local machine?

Sorry,I didn't catch your meaning. I had tested on our pulsar cluster. PS: our cluster is run with both token authentication and tls for transport encryption. If only config one auth-plugin, connector would get error.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Pulsar client cannot support JWT authentication with TLS transport, which only provides the use of "AuthenticationTls" to set up TLS transport/authentication, this is a conflict with JWT authentication.

BTW, the trust certificate is a CA certificate, this is useless. Usually, we use mTLS, which requires a certificate and private key.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously, I submitted apache/pulsar#15634 to improve this, welcome to review if you are interesting.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nodece Yes, Our cluster scene is JWT for authentication and TLS for transport encryption.
And, tls requires a ca.cert.pem file.

Copy link

@nodece nodece Jun 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, if you do not use mTLS, this PR is right.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a test for these changes?

}
return clientConf;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,5 @@ public class PulsarOptions {

public static final String AUTH_PARAMS_KEY = "auth-params";
public static final String AUTH_PLUGIN_CLASSNAME_KEY = "auth-plugin-classname";
public static final String TLS_TRUSTCERTS_FILEPATH = "tls-trustcerts-filepath";
}