The Confluent Cloud Clients Python Library provides a set of clients for interacting with Confluent Cloud REST APIs. The library includes clients for:
- Flink
- Kafka
- Schema Registry
- Tableflow
Note: This library is in active development and is subject to change. It covers only the methods I have needed so far. If you need a method that is not covered, please feel free to open an issue or submit a pull request.
The Flink Client provides the following methods:
- `delete_statement`
- `delete_statements_by_phase`
- `drop_table`

  Note: The `drop_table` method will drop the table and all associated statements, including the backing Kafka Topic and Schemas.

- `get_compute_pool`
- `get_compute_pool_list`
- `get_statement_list`
- `stop_statement`

  Note: Confluent Cloud for Apache Flink enforces a 30-day retention for statements in terminal states.

- `submit_statement`
- `update_statement`
- `update_all_sink_statements`
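These methods wrap the Flink SQL REST API for Confluent Cloud. As a rough sketch (not the library's actual code), the statements endpoint such a client targets follows the URL pattern documented for the Flink SQL REST API, built from the same organization, environment, cloud provider, and region values the tests require:

```python
def flink_statements_url(cloud_provider: str, cloud_region: str,
                         organization_id: str, environment_id: str) -> str:
    """Build the Flink SQL statements endpoint for Confluent Cloud.

    Pattern (per Confluent's Flink SQL REST API docs):
    https://flink.{region}.{provider}.confluent.cloud
        /sql/v1/organizations/{org_id}/environments/{env_id}/statements
    """
    return (
        f"https://flink.{cloud_region}.{cloud_provider}.confluent.cloud"
        f"/sql/v1/organizations/{organization_id}"
        f"/environments/{environment_id}/statements"
    )

# Example:
# flink_statements_url("aws", "us-east-1", "org-123", "env-abc")
# -> "https://flink.us-east-1.aws.confluent.cloud/sql/v1/organizations/org-123/environments/env-abc/statements"
```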
The Kafka Client provides the following methods:
- `delete_kafka_topic`
- `kafka_topic_exist`
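Both methods correspond to the Confluent Cloud Kafka REST API (v3) topics resource. As an illustrative sketch only (the `rest_endpoint` parameter is your cluster's REST endpoint, which is an assumption of this example rather than something the library necessarily exposes), the per-topic URL they operate against looks like:

```python
def kafka_topic_url(rest_endpoint: str, cluster_id: str, topic_name: str) -> str:
    """Kafka REST v3 path for a single topic:

        GET {rest_endpoint}/kafka/v3/clusters/{cluster_id}/topics/{topic_name}

    A 200 response means the topic exists; a 404 means it does not.
    A DELETE request to the same path deletes the topic.
    """
    return f"{rest_endpoint}/kafka/v3/clusters/{cluster_id}/topics/{topic_name}"
```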
The Schema Registry Client provides the following methods:
- `convert_avro_schema_into_string`
- `delete_kafka_topic_key_schema_subject`
- `delete_kafka_topic_value_schema_subject`
- `get_global_topic_subject_compatibility_level`
- `get_topic_subject_compatibility_level`
- `get_topic_subject_latest_schema`
- `register_topic_subject_schema`
- `set_topic_subject_compatibility_level`
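The key/value subject methods rely on Schema Registry's default subject naming strategy (`TopicNameStrategy`), which derives subject names from the topic name. A minimal sketch of that mapping, for illustration (not the library's code):

```python
def topic_subjects(topic_name: str) -> tuple[str, str]:
    """Return the (key, value) schema subjects for a topic under the
    default TopicNameStrategy: '<topic>-key' and '<topic>-value'."""
    return f"{topic_name}-key", f"{topic_name}-value"

# Example: the topic "orders" maps to subjects "orders-key" and "orders-value".
```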
The Tableflow Client provides the following methods:
- `get_tableflow_topic`
- `get_tableflow_topic_table_path`
The library includes unit tests for each client, located in the `tests` directory. To run them, first clone the repo locally:

`git clone https://github.com/j3-signalroom/cc-clients-python_lib.git`
Since this project was built using `uv`, please install it, then run the following command to install all the project dependencies:

`uv sync`
Then, within the `tests` directory, create a `.env` file and add the following environment variables, filling them in with your Confluent Cloud credentials and other required values:
SCHEMA_REGISTRY_URL=
SCHEMA_REGISTRY_API_KEY=
SCHEMA_REGISTRY_API_SECRET=
KAFKA_TOPIC_NAME=
FLINK_API_KEY=
FLINK_API_SECRET=
ORGANIZATION_ID=
ENVIRONMENT_ID=
CLOUD_PROVIDER=
CLOUD_REGION=
COMPUTE_POOL_ID=
PRINCIPAL_ID=
FLINK_STATEMENT_NAME=
FLINK_CATALOG_NAME=
FLINK_DATABASE_NAME=
FLINK_TABLE_NAME=
BOOTSTRAP_SERVER_ID=
BOOTSTRAP_SERVER_CLOUD_PROVIDER=
BOOTSTRAP_SERVER_CLOUD_REGION=
KAFKA_CLUSTER_ID=
KAFKA_API_KEY=
KAFKA_API_SECRET=
CONFLUENT_CLOUD_API_KEY=
CONFLUENT_CLOUD_API_SECRET=
TABLEFLOW_API_KEY=
TABLEFLOW_API_SECRET=
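Before running the tests, it can help to confirm that every variable above is actually set. The snippet below is a quick sanity check of my own (not part of the library); it parses only simple `KEY=value` lines and reports any required keys that are missing:

```python
from pathlib import Path

# All variables the tests' .env file is expected to define.
REQUIRED_KEYS = {
    "SCHEMA_REGISTRY_URL", "SCHEMA_REGISTRY_API_KEY", "SCHEMA_REGISTRY_API_SECRET",
    "KAFKA_TOPIC_NAME", "FLINK_API_KEY", "FLINK_API_SECRET",
    "ORGANIZATION_ID", "ENVIRONMENT_ID", "CLOUD_PROVIDER", "CLOUD_REGION",
    "COMPUTE_POOL_ID", "PRINCIPAL_ID", "FLINK_STATEMENT_NAME",
    "FLINK_CATALOG_NAME", "FLINK_DATABASE_NAME", "FLINK_TABLE_NAME",
    "BOOTSTRAP_SERVER_ID", "BOOTSTRAP_SERVER_CLOUD_PROVIDER",
    "BOOTSTRAP_SERVER_CLOUD_REGION", "KAFKA_CLUSTER_ID",
    "KAFKA_API_KEY", "KAFKA_API_SECRET",
    "CONFLUENT_CLOUD_API_KEY", "CONFLUENT_CLOUD_API_SECRET",
    "TABLEFLOW_API_KEY", "TABLEFLOW_API_SECRET",
}

def missing_env_keys(env_path: str) -> set[str]:
    """Return the required keys absent from a simple KEY=value .env file."""
    found = set()
    for raw in Path(env_path).read_text().splitlines():
        line = raw.strip()
        if line and not line.startswith("#") and "=" in line:
            found.add(line.split("=", 1)[0].strip())
    return REQUIRED_KEYS - found
```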
To run a specific test, use one of the following commands:
| Unit Test | Command |
|---|---|
| Delete a Flink Statement | `pytest -s tests/test_flink_client.py::test_delete_statement` |
| Delete all Flink Statements by Phase | `pytest -s tests/test_flink_client.py::test_delete_statements_by_phase` |
| Get a list of all the Statements | `pytest -s tests/test_flink_client.py::test_get_statement_list` |
| Submit a Flink Statement | `pytest -s tests/test_flink_client.py::test_submit_statement` |
| Get Compute Pool List | `pytest -s tests/test_flink_client.py::test_get_compute_pool_list` |
| Get Compute Pool | `pytest -s tests/test_flink_client.py::test_get_compute_pool` |
| Stop a Flink Statement | `pytest -s tests/test_flink_client.py::test_stop_statement` |
| Update a Flink Statement | `pytest -s tests/test_flink_client.py::test_update_statement` |
| Update all the Sink Statements | `pytest -s tests/test_flink_client.py::test_update_all_sink_statements` |
| Drop a Flink Table along with any associated statements, including the backing Kafka Topic and Schemas | `pytest -s tests/test_flink_client.py::test_drop_table` |
Otherwise, to run all the tests, use the following command:
`pytest -s tests/test_flink_client.py`
Note: The tests are designed to run in a specific order and against a live Confluent Cloud environment; running them out of order, or against a local environment, may produce errors.
To run a specific test, use one of the following commands:
| Unit Test | Command |
|---|---|
| Delete a Kafka Topic | `pytest -s tests/test_kafka_client.py::test_delete_kafka_topic` |
| Check if a Kafka Topic Exists | `pytest -s tests/test_kafka_client.py::test_kafka_topic_exist` |
Otherwise, to run all the tests, use the following command:
`pytest -s tests/test_kafka_client.py`
Note: The tests are designed to run in a specific order and against a live Confluent Cloud environment; running them out of order, or against a local environment, may produce errors.
To run a specific test, use one of the following commands:
| Unit Test | Command |
|---|---|
| Get the Subject Compatibility Level | `pytest -s tests/test_schema_registry_client.py::test_get_subject_compatibility_level` |
| Delete the Kafka Topic Key Schema Subject | `pytest -s tests/test_schema_registry_client.py::test_delete_kafka_topic_key_schema_subject` |
| Delete the Kafka Topic Value Schema Subject | `pytest -s tests/test_schema_registry_client.py::test_delete_kafka_topic_value_schema_subject` |
Otherwise, to run all the tests, use the following command:
`pytest -s tests/test_schema_registry_client.py`
Note: The tests are designed to run in a specific order and against a live Confluent Cloud environment; running them out of order, or against a local environment, may produce errors.
To run a specific test, use one of the following commands:
| Unit Test | Command |
|---|---|
| Get the Tableflow Topic | `pytest -s tests/test_tableflow_client.py::test_get_tableflow_topic` |
| Get the Tableflow Topic Table Path | `pytest -s tests/test_tableflow_client.py::test_get_tableflow_topic_table_path` |
Otherwise, to run all the tests, use the following command:
`pytest -s tests/test_tableflow_client.py`
Note: The tests are designed to run in a specific order and against a live Confluent Cloud environment; running them out of order, or against a local environment, may produce errors.
Install the Confluent Cloud Clients Python Library using `pip`:

`pip install cc-clients-python-lib`

Or, using `uv`:

`uv add cc-clients-python-lib`
- Flink SQL REST API for Confluent Cloud for Apache Flink
- Kafka REST APIs for Confluent Cloud
- Confluent Cloud APIs - Topic (v3)
- Confluent Cloud Schema Registry REST API Usage