Modern Data Pipeline with Clouds

Patcharanat P.

Overview

This project emphasizes setting up an environment for Data Engineering pipelines, including:

  1. Initializing Airflow and Kafka in a local environment with docker compose.
  2. Enabling cloud resources in GCP and AWS for testing.
  3. Implementing a Data Loading Module for data initialization.

What to expect from this project is the technical detail of how to send data in both batch and streaming modes with different APIs.

If you're a non-technical person or just want the concepts of the project, please check concept.md instead of this main documentation.

modern_pipeline_with_cloud_overview

Table of Contents

  1. Pre-requisites
  2. Setting up Environment
  3. Data Pipelines

1. Pre-requisites

  • Programs Installed
    • Python
    • Docker
    • Terraform
    • gcloud CLI
    • aws CLI
  • Sensitive Information pre-created
    # .env
    # TODO: Get MongoDB Connection URI From Web UI
    MONGO_URI="mongodb+srv://....mongodb.net"
    GCP__PROJECT="your-project-name"
    # ./.docker/.env
    # TODO: Set AWS Profile
    AIRFLOW_PROJ_DIR="../.airflow"
    AIRFLOW_UID="50000"
    
    AWS_PROFILE="<aws-sso-profile>"
    AWS_ACCESS_KEY_ID="<your AWS Access Key ID with S3 Permission>"
    AWS_SECRET_ACCESS_KEY="<your AWS Secret Key with S3 Permission>"
    # .terraform/aws/terraform.tfvars
    profile = "<profile-name>"
    # .terraform/gcp/terraform.tfvars
    service_account_email = "<service-account-name>@<gcp-project-name>.iam.gserviceaccount.com"
    project_id            = "<gcp-project-name>"
    location              = "<region>"

2. Setting up Environment

2.1 Initiating Docker Containers

make start

# make stop

Explanation

  • I modified the docker compose template by Conduktor to use Kafka with the Redpanda console (Conduktor removed).
  • If we use the original templates from Airflow and Redpanda (or Conduktor) as-is, we will not be able to open the Redpanda console because both stacks expose the same port, so changing the port for Redpanda is the only option.
  • Redpanda implicitly exposes port 8080. This can be changed by setting a specific environment variable, but that's unnecessary because we can remap the exposed port at a higher level in docker-compose.

Disclaimer

  • Using Docker Compose is not appropriate for a production environment.

References

2.2 Cloud Authentication

# AWS
aws configure sso
# SSO session name: <session-name>
# SSO start URL: <retrieved from AWS Identity Center>
# SSO region: <your-sso-region>
# SSO registration scopes: <leave-blank>

aws configure list-profiles

aws sso login --profile <profile-name>
# Login via WebUI

# GCP
gcloud auth application-default login --impersonate-service-account <service-account-name>@<gcp-project-name>.iam.gserviceaccount.com

Explanation

  • For me, the AWS SSO method is like the ADC method in GCP: account A acts as another account B, so we can interact with cloud resources using account B's permissions.
  • Both AWS SSO and GCP ADC are recommended only for local development. They reduce the concern over long-lived credentials by keeping only short-lived credentials on the local machine.
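To sanity-check that both credential chains resolve after logging in, here is a minimal Python sketch (assuming boto3 and google-auth are installed; the profile name is a placeholder, not part of the project):

# quick check that AWS SSO and GCP ADC both resolve locally (a sketch, not project code)
import boto3
import google.auth

# AWS: resolve the SSO profile and print the caller identity
session = boto3.Session(profile_name="<aws-sso-profile>")
print(session.client("sts").get_caller_identity()["Arn"])

# GCP: resolve Application Default Credentials (the impersonated service account)
credentials, project_id = google.auth.default()
print(project_id)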

References

2.3 Initiating Cloud Resources

# GCP
# .terraform/gcp

# AWS
# .terraform/aws

terraform init
# terraform validate
# terraform fmt
terraform plan
terraform apply
terraform destroy

References

2.4 Initiating Data

In this project, we will download raw data from Kaggle with a custom script, kaggle_wrapper.sh, then convert it from CSV to JSON Lines with a Python script, converter_main.py. We will then load the converted data into multiple targets to mock up data sources for an ELT process: MongoDB, Firestore, and DynamoDB as NoSQL databases, and Kafka for streaming. Illustrative sketches of the conversion and loading steps follow the commands below.

make venv

source pyenv/Scripts/activate

make install

./tools/data_init/scripts/kaggle_wrapper.sh

python tools/data_init/converter_main.py
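For illustration, the conversion step boils down to something like the sketch below; the file names are examples and the real logic lives in converter_main.py.

# csv -> json lines, a minimal sketch of the conversion step (not the actual converter_main.py)
import csv
import json

# paths are illustrative; the real script handles multiple files
with open("data/csv/cards_data.csv", newline="") as src, \
        open("data/json/cards_data.json", "w") as dst:
    for row in csv.DictReader(src):
        dst.write(json.dumps(row) + "\n")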

Then, please refer to input_example.sh for loading the data into the different targets.
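As an example of one of those targets, loading the JSON Lines file into MongoDB could look roughly like the sketch below (assuming pymongo is installed; the database and collection names are made up, and the project's actual entry points are in input_example.sh).

# load json lines into MongoDB, a hedged sketch (not the project's loading module)
import json
import os

from pymongo import MongoClient

client = MongoClient(os.environ["MONGO_URI"])
collection = client["finance"]["cards_data"]  # illustrative database/collection names

with open("data/json/cards_data.json") as f:
    docs = [json.loads(line) for line in f]

collection.insert_many(docs)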

References

3. Data Pipelines

3.1 Batch - Data Load Tool (dlt/dlthub)

dlt is a modern tool for ELT/ETL data pipelines. It can extract from data sources and load to various target destinations either as JSON Lines files or in a structured format with a pre-defined schema.

Please review concept.md - Detail on Batch Pipeline - dlthub for more detail.

The related file is finance_mongo_s3.py, which integrates the dlt library with an Airflow DAG.
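As a rough idea of what such a task does with dlt, here is a sketch only; the resource, dataset, and bucket names are illustrative, and the real integration lives in finance_mongo_s3.py.

# minimal dlt sketch: read documents from MongoDB and load them to S3 via the filesystem destination
import os

import dlt
from pymongo import MongoClient

@dlt.resource(name="cards_data", write_disposition="replace")
def cards_data():
    client = MongoClient(os.environ["MONGO_URI"])
    yield from client["finance"]["cards_data"].find({}, {"_id": 0})

# the bucket URL (and AWS credentials) can be supplied via dlt config files or environment variables
os.environ["DESTINATION__FILESYSTEM__BUCKET_URL"] = "s3://<your-bucket>/dlt"

pipeline = dlt.pipeline(
    pipeline_name="finance_mongo_s3",
    destination="filesystem",
    dataset_name="finance",
)
print(pipeline.run(cards_data()))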

References

3.2 Streaming - Kafka

modern_pipeline_kafka_architecture

I recommend reading the Kafka Connect concept from this project first, here. What we're going to do are the following steps:

  1. Spin up docker compose to initialize these services:
    • Kafka Broker: to store data in a topic
    • Kafka REST Proxy: to allow us to work with Kafka via a REST API
    • Kafka Schema Registry: to validate what we send to the topic against a pre-defined JSON schema
    • Kafka UI (Redpanda): to monitor streaming processes
    • Kafka Connect: to sink data from the topic to the destination (S3)
    # workdir: project root directory
    
    make start
    # docker compose -f .docker/docker-compose.yml up --build
  2. Use test_producer.py to send data to a topic, utilizing the schema registry and serializer properly (use the command in test_producer.sh to execute the Python script). A trimmed sketch of the producer is shown after this list.
    # workdir: ./kafka
    
    python test_producer.py \
        --bootstrap-servers localhost:9092 \
        --schema-registry  http://localhost:8081 \
        --topic cards-data \
        --source-path ../data/json/cards_data.json \
        --schema-path ./schema/json/schema_cards_data.json \
        --rows 50 
    • P.S. The Data Loader hasn't implemented the schema registry and serializer properly yet.
    • The producer uses a schema file to register the schema in the schema registry.
  3. Make sure we mounted/exported the needed long-lived credentials for Kafka Connect (because it doesn't support the SSO method)
    • If you check the docker compose file, you will see that in the kafka-connect service I exported environment variables inside the container to authenticate with AWS. They take their values from the secret variables in .env, located in the same directory as the docker compose file and hidden from GitHub.
    • This can be checked by getting into the kafka-connect container and inspecting the exported environment variables:
    # workdir: anywhere
    docker exec -it kafka-connect bash
    
    env | grep AWS_
    # printenv | grep AWS_
    # AWS Access Key ID and Secret Access Key must be shown
  4. Deploy the Kafka connector via the REST API (can be executed inside or outside of the kafka-connect container) using the pre-defined connector JSON configuration: s3_sink_cards_data.json.
    # kafka connect
    
    # deploy kafka connector
    curl -X POST -H "Content-Type: application/json" --data @connectors_config/s3_sink_cards_data.json http://localhost:8083/connectors
    
    # force check connector's status
    curl -s http://localhost:8083/connectors/s3-sink-kde/status
    
    # delete deployed kafka connector
    curl -X DELETE http://localhost:8083/connectors/s3-sink-kde
    
    # Kafka Schema Registry
    # delete registered schema from schema registry if needed
    curl -X DELETE http://localhost:8081/subjects/cards-data-value
  5. Check the connector status and the result in S3
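For reference, the core of step 2 (producing JSON records with a schema registered in the schema registry) boils down to something like the sketch below. This is a trimmed illustration rather than the full test_producer.py; the connection settings and paths mirror the command in step 2, and the message key field is an assumption.

# trimmed producer sketch using confluent-kafka (see test_producer.py for the full, argument-driven version)
import json

from confluent_kafka import SerializingProducer
from confluent_kafka.serialization import StringSerializer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.json_schema import JSONSerializer

schema_registry = SchemaRegistryClient({"url": "http://localhost:8081"})
with open("./schema/json/schema_cards_data.json") as f:
    value_serializer = JSONSerializer(f.read(), schema_registry)

producer = SerializingProducer({
    "bootstrap.servers": "localhost:9092",
    "key.serializer": StringSerializer("utf_8"),
    "value.serializer": value_serializer,
})

with open("../data/json/cards_data.json") as f:
    for line in f:
        record = json.loads(line)
        # using "id" as the message key is an assumption for illustration
        producer.produce(topic="cards-data", key=str(record.get("id")), value=record)

producer.flush()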

Results:

  • Data is sent to a topic named cards-data. kafka_data
  • The schema registry is used by the producer, as the script specifies. kafka_schema_registry
  • The Kafka connector is deployed and working as expected. You can see the Kafka Connect related topics, which are created automatically. kafka_connect_working
  • Data is sunk to S3: data is successfully sent from the Kafka topic to the data lake. kafka_sink_output

Troubleshooting

  • Not all the options available for the S3 Sink Connector are well-documented in the official S3 Connector Configuration Reference; you have to dig further into the examples on the main page and some other pages about converters and serializers if you need them.
  • We have to use matching converter and serializer types for the key and for the value
    • For example:
      • For the key, we use StringSerializer, so in the connector's configuration we have to specify "key.converter": "org.apache.kafka.connect.storage.StringConverter"
      • For the value (JSON data for each record), we use JsonSerializer, so in the connector's configuration we have to specify "value.converter": "io.confluent.connect.json.JsonSchemaConverter"
      • and if you want to use the schema registry, you have to set "value.converter.schema.registry.url": "http://kafka-schema-registry:8081".
      • However, not all of the mentioned options are shown in the reference page; I took some time to figure them out myself from other pages of the documentation about Kafka Connect concepts. A sketch of the resulting configuration follows this list.
  • We can debug, or see whether it worked, by checking the status of the deployed connector through the REST API:
    # force check connector's status
    curl -s http://localhost:8083/connectors/s3-sink-kde/status
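To make the converter pairing concrete, the relevant part of a connector configuration might look like the snippet below, here POSTed with Python instead of curl. The S3 settings and flush size are illustrative; the real configuration is in s3_sink_cards_data.json.

# deploy the S3 sink connector via the Connect REST API (mirrors the curl command above)
import requests

connector = {
    "name": "s3-sink-kde",
    "config": {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "topics": "cards-data",
        # converter pairing described above
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
        "value.converter.schema.registry.url": "http://kafka-schema-registry:8081",
        # S3 settings below are illustrative; see s3_sink_cards_data.json for the real values
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
        "s3.bucket.name": "<your-bucket>",
        "s3.region": "<your-region>",
        "flush.size": "1000",
    },
}

resp = requests.post("http://localhost:8083/connectors", json=connector)
print(resp.status_code, resp.json())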

References

Please review concept.md for more detail

