Patcharanat P.
This project emphasizes setting up an environment for Data Engineering pipelines, including:
- Initializing Airflow and Kafka in a local environment with Docker Compose.
- Enabling cloud resources in GCP and AWS for testing.
- Implementing a Data Loading Module for data initialization.
What to expect from this project is technical detail on how to send data in both batch and streaming modes with different APIs.
If you're a non-technical person or just want the concepts of the project, please check concept.md instead of this main documentation.
- Programs Installed
- Python
- Docker
- Terraform
- gcloud CLI
- aws CLI
- Sensitive Information pre-created
# .env
# TODO: Get MongoDB Connection URI From Web UI
MONGO_URI="mongodb+srv://....mongodb.net"
GCP__PROJECT="your-project-name"
# ./.docker/.env
# TODO: Set AWS Profile
AIRFLOW_PROJ_DIR="../.airflow"
AIRFLOW_UID="50000"
AWS_PROFILE="<aws-sso-profile>"
AWS_ACCESS_KEY_ID="<your AWS Access Key ID with S3 Permission>"
AWS_SECRET_ACCESS_KEY="<your AWS Secret Key with S3 Permission>"
# .terraform/aws/terraform.tfvars
profile = "<profile-name>"
# .terraform/gcp/terraform.tfvars
service_account_email = "<service-account-name>@<gcp-project-name>.iam.gserviceaccount.com"
project_id = "<gcp-project-name>"
location = "<region>"
make start
# make stop
Explanation
- I modified the Docker Compose template by Conduktor to use Kafka with the Redpanda console (Conduktor removed).
- At first, if we use the original templates from Airflow and Redpanda (or Conduktor) as-is, we will not be able to open the Redpanda console because the same port is exposed twice, so changing the exposed port for Redpanda is the only option.
- Console
    - Airflow Web UI: http://localhost:8080
    - Kafka (Redpanda) console: http://localhost:8085
- Redpanda implicitly uses port 8080. This can be changed by setting a specific environment variable, but that's unnecessary because we can remap the exposed port at a higher level in docker-compose.
Disclaimer
- Using Docker Compose is not appropriate for a production environment.
References
- Airflow Docker Compose Template
- Kafka Docker Compose Template
# AWS
aws configure sso
# SSO session name: <session-name>
# SSO start URL: <retrieved from AWS Identity Center>
# SSO region: <your-sso-region>
# SSO registration scopes: <leave-blank>
aws configure list-profiles
aws sso login --profile <profile-name>
# Login via WebUI
# GCP
gcloud auth application-default login --impersonate-service-account <service-account-name>@<gcp-project-name>.iam.gserviceaccount.com
Explanation
- For me, the AWS SSO method is similar to the ADC method in GCP: we use account A to act as another account B, so we can interact with cloud resources with account B's permissions.
- Both AWS SSO and GCP ADC are recommended only for local development; they reduce the concern over long-lived credentials by keeping the credentials on the local machine short-lived. A minimal sketch of how SDKs pick these credentials up is shown below.
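For illustration, here is a minimal Python sketch (an assumption, not a file from this repo) showing that once aws sso login and gcloud auth application-default login have been run, the AWS and GCP SDKs pick up the short-lived credentials automatically; the profile name is a placeholder and the calls assume the identity has list permissions.

# Sketch: SDKs resolve SSO / ADC credentials automatically after login (names are placeholders).
import boto3
from google.cloud import storage

# AWS: boto3 resolves the SSO profile configured with `aws configure sso`
session = boto3.Session(profile_name="<aws-sso-profile>")
s3 = session.client("s3")
print([bucket["Name"] for bucket in s3.list_buckets()["Buckets"]])

# GCP: the client uses Application Default Credentials (the impersonated service account)
gcs = storage.Client()
print([bucket.name for bucket in gcs.list_buckets()])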
References
- Amazon Authentication
- GCP Authentication Detail from Another Project
# GCP
# .terraform/gcp
# AWS
# .terraform/aws
terraform init
# terraform validate
# terraform fmt
terraform plan
terraform apply
terraform destroy
References
- Terraform AWS
In this project, we will download raw data from Kaggle with a custom script, kaggle_wrapper.sh, convert it from CSV to JSON lines with a Python script, converter_main.py, and then load the converted data into multiple targets to mock up data sources for an ELT process: MongoDB, Firestore, and DynamoDB as NoSQL databases, and Kafka for streaming.
make venv
source pyenv/Scripts/activate
make install
./tools/data_init/scripts/kaggle_wrapper.sh
python tools/data_init/converter_main.py
Then, please refer to input_example.sh for examples of loading the data to the different targets.
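To make the conversion step concrete, here is a minimal sketch of a CSV-to-JSON-lines converter; the repo's converter_main.py may differ, and the CSV input path used here is an assumption (the JSON output path matches the one referenced later by the producer).

# Minimal CSV -> JSON lines sketch; not the repo's converter_main.py.
import csv
import json
from pathlib import Path

def csv_to_jsonl(csv_path: Path, jsonl_path: Path) -> None:
    # Write one JSON object per line for each CSV row
    jsonl_path.parent.mkdir(parents=True, exist_ok=True)
    with csv_path.open(newline="", encoding="utf-8") as src, jsonl_path.open("w", encoding="utf-8") as dst:
        for row in csv.DictReader(src):
            dst.write(json.dumps(row) + "\n")

if __name__ == "__main__":
    # Assumed input location; output path matches data/json/cards_data.json used below
    csv_to_jsonl(Path("data/csv/cards_data.csv"), Path("data/json/cards_data.json"))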
References
- Firestore Python API
- DynamoDB Python API
- Kafka Confluent (Python API Client)
- Python Logging Setting up
- Python Logging Formatters
dlt is a modern tool for ELT/ETL data pipelines. It can extract from data sources and load to various target destinations, either as JSON lines files or in a structured format with a pre-defined schema.
Please review concept.md - Detail on Batch Pipeline - dlthub for more detail.
The related file is finance_mongo_s3.py, which integrates the dlt library with an Airflow DAG.
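As a rough illustration of the dlt API (not the actual finance_mongo_s3.py, which runs inside an Airflow DAG and reads from MongoDB), a pipeline can be declared like this; the resource name, sample records, and dataset name are assumptions.

# Minimal dlt sketch: load records into a filesystem/S3 destination.
import dlt

@dlt.resource(name="cards_data", write_disposition="append")
def cards_data():
    # The real pipeline would yield documents read from MongoDB; these records are placeholders.
    yield {"card_id": 1, "card_brand": "Visa"}
    yield {"card_id": 2, "card_brand": "Mastercard"}

pipeline = dlt.pipeline(
    pipeline_name="finance_mongo_s3",
    destination="filesystem",  # can target S3 when bucket_url/credentials are set in dlt config
    dataset_name="finance",
)

if __name__ == "__main__":
    load_info = pipeline.run(cards_data())
    print(load_info)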
References
- dlt
- Airflow
I recommend reading the Kafka Connect concept for this project first, here. What we're going to do are the following steps:
- Spin up docker compose to initialize these services:
    - Kafka Broker: to store data in a topic
    - Kafka REST Proxy: to allow us to work with Kafka via a REST API
    - Kafka Schema Registry: to validate what we send to the topic against a pre-defined JSON schema
    - Kafka UI (Redpanda): to monitor streaming processes
    - Kafka Connect: to sink data from the topic to the destination (S3)
# workdir: project root directory
make start
# docker compose -f .docker/docker-compose.yml up --build
- Use test_producer.py to send data to a topic, utilizing the schema registry and serializer properly (use the command in test_producer.sh to execute the script; a minimal producer sketch is also shown after these steps).
# workdir: ./kafka
python test_producer.py \
    --bootstrap-servers localhost:9092 \
    --schema-registry http://localhost:8081 \
    --topic cards-data \
    --source-path ../data/json/cards_data.json \
    --schema-path ./schema/json/schema_cards_data.json \
    --rows 50
    - P.S. the Data Loader hasn't implemented the schema registry and serializer properly yet.
    - The data will use a schema file for schema registration in the schema registry.
- Make sure we mounted/exported the needed long-lived credentials for Kafka Connect (because it doesn't support the SSO method).
    - If you check the docker compose file, you will see that in the kafka-connect service I exported environment variables within the container to authenticate with AWS. These come from secret variables in .env, located in the same directory as the docker compose file and hidden from GitHub.
    - This can be checked by getting into the kafka-connect container and inspecting the exported environment variables:
# workdir: anywhere
docker exec -it kafka-connect bash
env | grep AWS_
# printenv | grep AWS_
# AWS ACCESS ID and Secret Key must be shown
- Deploy the Kafka connector with the REST API (can be executed inside or outside of the Kafka Connect container) using the pre-defined connector JSON configuration: s3_sink_cards_data.json.
# kafka connect
# deploy kafka connector
curl -X POST -H "Content-Type: application/json" --data @connectors_config/s3_sink_cards_data.json http://localhost:8083/connectors

# force check connector's status
curl -s http://localhost:8083/connectors/s3-sink-kde/status

# delete deployed kafka connector
curl -X DELETE http://localhost:8083/connectors/s3-sink-kde

# Kafka Schema Registry
# delete registered schema from schema registry if needed
curl -X DELETE http://localhost:8081/subjects/cards-data-value
- Check Connector Status & Check Result in S3
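For reference, below is a minimal sketch of a producer that uses the schema registry and serializers with the confluent-kafka Python client; it is not the repo's test_producer.py (which takes its parameters from the command line shown above), and the sample record fields are assumptions.

# Minimal JSON Schema producer sketch (confluent-kafka); not the repo's test_producer.py.
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.json_schema import JSONSerializer
from confluent_kafka.serialization import MessageField, SerializationContext, StringSerializer

TOPIC = "cards-data"

# Schema registry client and JSON schema used to register/validate the value
schema_registry = SchemaRegistryClient({"url": "http://localhost:8081"})
with open("./schema/json/schema_cards_data.json") as f:
    schema_str = f.read()

key_serializer = StringSerializer("utf_8")
value_serializer = JSONSerializer(schema_str, schema_registry)

producer = Producer({"bootstrap.servers": "localhost:9092"})

# Placeholder record; the real fields come from ../data/json/cards_data.json
record = {"id": 1, "card_brand": "Visa"}

producer.produce(
    topic=TOPIC,
    key=key_serializer(str(record["id"]), SerializationContext(TOPIC, MessageField.KEY)),
    value=value_serializer(record, SerializationContext(TOPIC, MessageField.VALUE)),
)
producer.flush()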
Results:
- Data is sent to a topic named cards-data.
- The Schema Registry is used by the producer, as the script specifies.
- The Kafka connector is deployed and working as expected. You can also see the Kafka Connect-related topics, which are created automatically.
- Data is sunk to S3, successfully sending data from the Kafka topic to the data lake.
- Not all of the options available for the S3 Sink Connector are well documented in the official S3 Connector Configuration Reference; you have to dig further into the examples on the main page and some other pages about converters and serializers if you need them.
- We have to use the same type of converter and serializer for each of the key and the value. For example (see the config sketch after this list):
    - For the key, we use StringSerializer, so in the connector's configuration we have to specify "key.converter": "org.apache.kafka.connect.storage.StringConverter".
    - For the value (the JSON data of each record), we use JsonSerializer, so in the connector's configuration we have to specify "value.converter": "io.confluent.connect.json.JsonSchemaConverter".
    - If you want to use the Schema Registry, you also have to set "value.converter.schema.registry.url": "http://kafka-schema-registry:8081".
    - However, not all of the mentioned options are shown on the reference page. I took some time to figure them out myself in other pages of the documentation about Kafka Connect concepts.
- We can debug or see whether it worked by checking the status of the deployed connector through a REST API command:
# force check connector's status
curl -s http://localhost:8083/connectors/s3-sink-kde/status
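To make the converter pairing concrete, here is a hedged sketch of a connector configuration deployed through the Connect REST API from Python; the real settings live in connectors_config/s3_sink_cards_data.json, and the bucket name, region, and flush size below are placeholder assumptions.

# Sketch: deploy an S3 sink connector via the Kafka Connect REST API.
# The real configuration is in connectors_config/s3_sink_cards_data.json;
# bucket, region, and flush.size are placeholder assumptions.
import requests

connector = {
    "name": "s3-sink-kde",
    "config": {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "topics": "cards-data",
        "s3.bucket.name": "<your-s3-bucket>",
        "s3.region": "<your-region>",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
        "flush.size": "50",
        # Converters must match the producer's serializers (String key, JSON Schema value)
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
        "value.converter.schema.registry.url": "http://kafka-schema-registry:8081",
    },
}

resp = requests.post("http://localhost:8083/connectors", json=connector, timeout=30)
resp.raise_for_status()

# Check the connector's status afterwards
status = requests.get("http://localhost:8083/connectors/s3-sink-kde/status", timeout=30).json()
print(status)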
References
- Kafka Connect Concept - Official Confluent
- How to set connector config & kafka related service (in Thai language) - Medium
- Kafka Amazon S3 Sink Connector
- How to deploy kafka connector with a config file instead of json string in the command - Stackoverflow
- Unknown Magic byte! Troubleshooting: related to Using Different Serializer - GitHub
- How to delete registered schema in schema registry
Please review concept.md for more detail.