Skip to content

Commit afff79b

Browse files
committed
Deep support ddl
1 parent 36aa07b commit afff79b

22 files changed

+1000
-349
lines changed

.github/workflows/pypi.yml

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,22 @@
11
name: pypi
22
on:
3-
push:
4-
paths:
5-
- 'mysql2ch/version.py'
6-
- '.github/workflows/pypi.yml'
3+
release:
4+
types:
5+
- created
76
jobs:
87
build:
98
runs-on: ubuntu-latest
109
steps:
1110
- uses: actions/checkout@v2
12-
- uses: actions/setup-python@v1
11+
- uses: actions/setup-python@v2
1312
with:
1413
python-version: '3.x'
14+
- uses: actions/cache@v1
15+
with:
16+
path: ~/.cache/pip
17+
key: ${{ runner.os }}-pip-${{ hashFiles('**/requirements.txt') }}
18+
restore-keys: |
19+
${{ runner.os }}-pip-
1520
- name: Build dists
1621
run: |
1722
python3 setup.py sdist

.github/workflows/pytest.yml

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
name: pytest
2+
on: [push, pull_request]
3+
jobs:
4+
build:
5+
runs-on: ubuntu-latest
6+
steps:
7+
- uses: actions/checkout@v2
8+
- uses: actions/setup-python@v2
9+
with:
10+
python-version: '3.x'
11+
- uses: actions/cache@v1
12+
with:
13+
path: ~/.cache/pip
14+
key: ${{ runner.os }}-pip-${{ hashFiles('**/requirements-dev.txt') }}
15+
restore-keys: |
16+
${{ runner.os }}-pip-
17+
- name: Install dependencies
18+
run: |
19+
python -m pip install --upgrade pip
20+
pip install -r requirements-dev.txt
21+
- name: Test with pytest
22+
run: |
23+
pytest

CHANGELOG.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
# ChangeLog
22

3-
## 0.4.0
3+
## 0.4.3
4+
- Deeper DDL support.
45

6+
## 0.4.0
57
- Most of the rewrite.
68
- Stop reading config from environment variables; use config.json instead.
79
- Remove ui module.

Makefile

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
checkfiles = mysql2ch/ tests/
2+
black_opts = -l 100 -t py38
3+
py_warn = PYTHONDEVMODE=1
4+
5+
help:
6+
@echo "mysql2ch development makefile"
7+
@echo
8+
@echo "usage: make <target>"
9+
@echo "Targets:"
10+
@echo " up Updates dev/test dependencies"
11+
@echo " deps Ensure dev/test dependencies are installed"
12+
@echo " check Checks that build is sane"
13+
@echo " lint Reports all linter violations"
14+
@echo " test Runs all tests"
15+
@echo " style Auto-formats the code"
16+
17+
deps:
18+
@which pip-sync > /dev/null || pip install -q pip-tools
19+
@pip install -r requirements-dev.txt
20+
21+
up:
22+
CUSTOM_COMPILE_COMMAND="make up" pip-compile -o requirements-dev.txt -U
23+
sed -i "s/^-e .*/-e ./" requirements.txt
24+
25+
style: deps
26+
isort -rc $(checkfiles)
27+
black $(black_opts) $(checkfiles)
28+
29+
check: deps
30+
ifneq ($(shell which black),)
31+
black --check $(black_opts) $(checkfiles) || (echo "Please run 'make style' to auto-fix style issues" && false)
32+
endif
33+
flake8 $(checkfiles)
34+
mypy $(checkfiles)
35+
pylint -d C,W,R $(checkfiles)
36+
bandit -r $(checkfiles)
37+
python setup.py check -mrs
38+
39+
lint: deps
40+
ifneq ($(shell which black),)
41+
black --check $(black_opts) $(checkfiles) || (echo "Please run 'make style' to auto-fix style issues" && false)
42+
endif
43+
flake8 $(checkfiles)
44+
mypy $(checkfiles)
45+
pylint $(checkfiles)
46+
bandit -r $(checkfiles)
47+
python setup.py check -mrs
48+
49+
test: deps
50+
$(py_warn) py.test
51+
52+
publish: deps
53+
rm -fR dist/
54+
python setup.py sdist
55+
twine upload dist/*

mysql2ch/__init__.py

Lines changed: 1 addition & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -1,75 +1 @@
1-
import logging
2-
import sys
3-
from typing import List, Dict, Union, Optional
4-
from pydantic import BaseSettings, HttpUrl, validator
5-
6-
7-
def partitioner(key_bytes, all_partitions, available_partitions):
8-
"""
9-
custom partitioner depend on settings
10-
:param key_bytes:
11-
:param all_partitions:
12-
:param available_partitions:
13-
:return:
14-
"""
15-
key = key_bytes.decode()
16-
partition = Global.settings.schema_table.get(key).get('kafka_partition')
17-
return all_partitions[partition]
18-
19-
20-
def init_logging(debug):
21-
logger = logging.getLogger('mysql2ch')
22-
if debug:
23-
logger.setLevel(logging.DEBUG)
24-
else:
25-
logger.setLevel(logging.INFO)
26-
sh = logging.StreamHandler(sys.stdout)
27-
sh.setLevel(logging.DEBUG)
28-
sh.setFormatter(logging.Formatter(
29-
fmt='%(asctime)s - %(name)s:%(lineno)d - %(levelname)s - %(message)s',
30-
datefmt='%Y-%m-%d %H:%M:%S'
31-
))
32-
logger.addHandler(sh)
33-
34-
35-
class Settings(BaseSettings):
36-
debug: bool = True
37-
environment: str = 'development'
38-
mysql_host: str = '127.0.0.1'
39-
mysql_port: int = 3306
40-
mysql_user: str = 'root'
41-
mysql_password: str = '123456'
42-
mysql_server_id: int = 1
43-
redis_host: str = '120.0.0.1'
44-
redis_port: int = 6379
45-
redis_password: str = None
46-
redis_db: int = 0
47-
clickhouse_host: str = '127.0.0.1'
48-
clickhouse_port: int = 9000
49-
clickhouse_user: str = 'default'
50-
clickhouse_password: str = None
51-
kafka_server: str = '127.0.0.1:9092'
52-
kafka_topic: str = 'test'
53-
sentry_dsn: Optional[HttpUrl]
54-
schema_table: Dict[str, Dict[str, Union[List[str], int]]]
55-
init_binlog_file: str
56-
init_binlog_pos: int
57-
log_pos_prefix: str = 'mysql2ch'
58-
skip_delete_tables: List[str]
59-
skip_update_tables: List[str]
60-
skip_dmls: List[str]
61-
insert_num: int = 20000
62-
insert_interval: int = 60
63-
64-
@validator('schema_table')
65-
def check_kafka_partition(cls, v):
66-
partitions = list(map(lambda x: v.get(x).get('kafka_partition'), v))
67-
if len(partitions) != len(set(partitions)):
68-
raise ValueError('kafka_partition must be unique for schema!')
69-
return v
70-
71-
72-
class Global:
73-
settings: Optional['Settings'] = None
74-
reader = None
75-
writer = None
1+
__version__ = "0.4.3"

mysql2ch/cli.py

Lines changed: 37 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -4,71 +4,79 @@
44
import sentry_sdk
55
from sentry_sdk.integrations.redis import RedisIntegration
66

7-
import mysql2ch
8-
from mysql2ch import Settings
7+
from mysql2ch.common import Global, Settings, init_logging
98
from mysql2ch.consumer import consume
109
from mysql2ch.producer import produce
1110
from mysql2ch.reader import MysqlReader
1211
from mysql2ch.replication import make_etl
1312
from mysql2ch.writer import ClickHouseWriter
1413

15-
logger = logging.getLogger('mysql2ch.manage')
14+
logger = logging.getLogger("mysql2ch.manage")
1615

1716

1817
def run(args):
1918
config = args.config
20-
settings = mysql2ch.Global.settings = Settings.parse_file(config)
19+
settings = Global.settings = Settings.parse_file(config)
2120

2221
sentry_sdk.init(
23-
settings.sentry_dsn,
24-
environment=settings.environment,
25-
integrations=[RedisIntegration()]
22+
settings.sentry_dsn, environment=settings.environment, integrations=[RedisIntegration()]
2623
)
2724

28-
mysql2ch.init_logging(settings.debug)
29-
mysql2ch.Global.writer = ClickHouseWriter(
25+
init_logging(settings.debug)
26+
Global.writer = ClickHouseWriter(
3027
host=settings.clickhouse_host,
3128
port=settings.clickhouse_port,
3229
password=settings.clickhouse_password,
33-
user=settings.clickhouse_user
30+
user=settings.clickhouse_user,
3431
)
35-
mysql2ch.Global.reader = MysqlReader(
32+
Global.reader = MysqlReader(
3633
host=settings.mysql_host,
3734
port=settings.mysql_port,
3835
password=settings.mysql_password,
39-
user=settings.mysql_user
36+
user=settings.mysql_user,
4037
)
4138

4239
args.func(args)
4340

4441

4542
def cli():
46-
parser = argparse.ArgumentParser(
47-
description='Sync data from MySQL to ClickHouse.',
43+
parser = argparse.ArgumentParser(description="Sync data from MySQL to ClickHouse.",)
44+
parser.add_argument("-c", "--config", required=True, help="Config json file.")
45+
subparsers = parser.add_subparsers(title="subcommands")
46+
parser_etl = subparsers.add_parser("etl")
47+
parser_etl.add_argument("--schema", required=True, help="Schema to full etl.")
48+
parser_etl.add_argument(
49+
"--tables",
50+
required=False,
51+
help="Tables to full etl,multiple tables split with comma,default read from environment.",
52+
)
53+
parser_etl.add_argument(
54+
"--renew",
55+
default=False,
56+
action="store_true",
57+
help="Etl after try to drop the target tables.",
4858
)
49-
parser.add_argument('-c', '--config', required=True, help='Config json file.')
50-
subparsers = parser.add_subparsers(title='subcommands')
51-
parser_etl = subparsers.add_parser('etl')
52-
parser_etl.add_argument('--schema', required=True, help='Schema to full etl.')
53-
parser_etl.add_argument('--tables', required=False,
54-
help='Tables to full etl,multiple tables split with comma,default read from environment.')
55-
parser_etl.add_argument('--renew', default=False, action='store_true',
56-
help='Etl after try to drop the target tables.')
5759
parser_etl.set_defaults(run=run, func=make_etl)
5860

59-
parser_producer = subparsers.add_parser('produce')
61+
parser_producer = subparsers.add_parser("produce")
6062
parser_producer.set_defaults(run=run, func=produce)
6163

62-
parser_consumer = subparsers.add_parser('consume')
63-
parser_consumer.add_argument('--schema', required=True, help='Schema to consume.')
64-
parser_consumer.add_argument('--skip-error', action='store_true', default=False, help='Skip error rows.')
65-
parser_consumer.add_argument('--auto-offset-reset', required=False, default='earliest',
66-
help='Kafka auto offset reset,default earliest.')
64+
parser_consumer = subparsers.add_parser("consume")
65+
parser_consumer.add_argument("--schema", required=True, help="Schema to consume.")
66+
parser_consumer.add_argument(
67+
"--skip-error", action="store_true", default=False, help="Skip error rows."
68+
)
69+
parser_consumer.add_argument(
70+
"--auto-offset-reset",
71+
required=False,
72+
default="earliest",
73+
help="Kafka auto offset reset,default earliest.",
74+
)
6775
parser_consumer.set_defaults(run=run, func=consume)
6876

6977
parse_args = parser.parse_args()
7078
parse_args.run(parse_args)
7179

7280

73-
if __name__ == '__main__':
81+
if __name__ == "__main__":
7482
cli()

0 commit comments

Comments
 (0)