Skip to content

Commit a61b013

Browse files
authored
Merge pull request #6 from openstates/set-prefix-bucket-directory
Use DAG run start time
2 parents 2953884 + 0b04550 commit a61b013

File tree

7 files changed

+127
-19
lines changed

7 files changed

+127
-19
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,6 @@ logs
22
_data
33
.cache
44
merged_*
5-
db.db
5+
db.*
66
__pycache__
77
*json

audits/event.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
-- all events have sponsors?
1+
-- all events are classified?
22
AUDIT (
33
name assert_events_are_classified,
44
blocking false

main.py

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
import argparse
22

33
from sqlmesh_tasks import sqlmesh_plan
4+
from openstates_metadata import lookup
5+
from utils import send_slack_message
46

5-
if __name__ == "__main__":
7+
8+
def main() -> None:
69
default_parser = argparse.ArgumentParser(add_help=False)
710

811
parser = argparse.ArgumentParser(
@@ -25,10 +28,26 @@
2528

2629
args = parser.parse_args()
2730
entity = args.entity
28-
jurisdiction = args.jurisdiction
31+
jur_obj = lookup(abbr=args.jurisdiction) if args.jurisdiction else None
2932

3033
if entity:
3134
entities = [entity]
3235
else:
3336
entities = ["bill", "event", "vote_event"]
34-
report = sqlmesh_plan(entities, jurisdiction)
37+
38+
# Use Jurisdiction if it is provided
39+
if jur_obj:
40+
reports = sqlmesh_plan(entities, jur_obj.jurisdiction_id)
41+
else:
42+
reports = sqlmesh_plan(entities)
43+
44+
# Send report
45+
if reports:
46+
reports = "\n".join(reports)
47+
jur_name = jur_obj.name if jur_obj else ""
48+
msg = f"Scrape Output Audit for {jur_name}: \n{reports}"
49+
send_slack_message("data-reports", msg)
50+
51+
52+
if __name__ == "__main__":
53+
main()

poetry.lock

Lines changed: 62 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ python = "^3.9"
99
sqlmesh = "^0.176.0"
1010
duckdb = "^1.2.2"
1111
google-cloud-storage = "^3.1.0"
12+
slack-sdk = "^3.35.0"
13+
openstates-metadata = "^2024.10.3"
1214

1315
[tool.poetry.group.dev.dependencies]
1416
black = "^22"

sqlmesh_tasks.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@ def extract_audit_error(stdout: str) -> typing.Union[str, None]:
2020
return None
2121

2222

23-
def sqlmesh_plan(entities: list[str], jurisdiction: str) -> list:
23+
def sqlmesh_plan(entities: list[str], jurisdiction: str = None) -> list:
2424
"""Run SQLMesh plan on initialized DuckDB data"""
2525

26-
initialize_entities = init_duckdb(jurisdiction, entities)
26+
initialize_entities = init_duckdb(entities, jurisdiction)
2727
initialize_entities = [f"staged.{entity}" for entity in initialize_entities]
2828
reports = []
2929

@@ -60,7 +60,7 @@ def sqlmesh_plan(entities: list[str], jurisdiction: str) -> list:
6060
raise
6161

6262
if report:
63-
logger.info(f"Entity: {entity} audit failed:\n", report)
63+
logger.info(f"Entity: {entity} audit failed {report}")
6464
reports.append(report)
6565
else:
6666
logger.info(f"Entity: {entity} audit passed.")

utils.py

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,36 @@
44

55
import duckdb
66
from google.cloud import storage
7+
from slack_sdk.web import WebClient
78

89
logging.basicConfig(level=logging.INFO)
910
logger = logging.getLogger("openstates")
1011

1112
GCP_PROJECT = os.environ.get("GCP_PROJECT", None)
1213
BUCKET_NAME = os.environ.get("BUCKET_NAME", None)
1314
SCRAPE_LAKE_PREFIX = os.environ.get("BUCKET_PREFIX", "legislation")
15+
DAG_RUN_START = os.environ.get("DAG_RUN_START", None)
16+
SLACK_BEARER_TOKEN = os.environ.get("SLACK_BEARER_TOKEN", None)
17+
18+
19+
def send_slack_message(channel, msg=None, attachments=None) -> None:
20+
if not SLACK_BEARER_TOKEN:
21+
logger.warning("No SLACK_BEARER_TOKEN, cannot send slack notification.")
22+
return
23+
sendobj = {"channel": channel}
24+
if msg:
25+
sendobj["text"] = msg
26+
if attachments:
27+
if len(attachments) > 50:
28+
attachments.insert(0, {"title": "Too many attachments", "color": "FF3333"})
29+
attachments = attachments[:50]
30+
sendobj["attachments"] = attachments
31+
if sendobj.get("text", "") or sendobj.get("attachments", ""):
32+
try:
33+
client = WebClient(token=SLACK_BEARER_TOKEN)
34+
client.chat_postMessage(**sendobj)
35+
except Exception as e:
36+
logger.error(f"Couldn't send slack message: {e}")
1437

1538

1639
def check_for_json_files(file_path: str) -> bool:
@@ -50,35 +73,40 @@ def download_files_from_gcs(file_path: str) -> None:
5073

5174

5275
def init_duckdb(
53-
jurisdiction: str,
5476
entities: list[str],
55-
last_scrape_end_time: str = None,
77+
jurisdiction: str = None,
5678
) -> list[str]:
5779
"""Initialize Duckdb and load data, return list of tables created for usage downstream."""
5880

5981
db_path = "db.db"
6082
if os.path.exists(db_path):
6183
os.remove(db_path)
6284

63-
sub_directory = "*"
64-
if jurisdiction and last_scrape_end_time:
65-
sub_directory = jurisdiction.replace("ocd-jurisdiction/", "")
66-
sub_directory = f"{sub_directory}/{last_scrape_end_time}"
85+
# Determine subdirectory pattern for file search
86+
if jurisdiction and DAG_RUN_START:
87+
# Strip OCD prefix and build a dynamic path using DAG_RUN_START
88+
relative_path = jurisdiction.replace("ocd-jurisdiction/", "")
89+
sub_directory = f"**/{relative_path}/{DAG_RUN_START}"
90+
else:
91+
sub_directory = "**"
92+
6793
# Create DuckDB and load
6894
logger.info("Creating DuckDB schema and loading data...")
6995
con = duckdb.connect(db_path)
7096
con.execute("CREATE SCHEMA IF NOT EXISTS scraper")
7197
table_created = []
7298

73-
file_path_prefix = f"./*/{sub_directory}"
99+
file_path_prefix = f"./{sub_directory}"
74100
all_files_path = f"{file_path_prefix}/*.json"
75101

76102
# Grab scrape output from data lake if none is provided
77103
if not check_for_json_files(all_files_path):
78104
logger.info(
79105
"No file found in local directory, attempting to download from GCS, requires credentials in ENV."
80106
)
81-
download_files_from_gcs(sub_directory)
107+
# Remove "**/" from path prefix before passing to GCS downloader
108+
gcs_path = sub_directory[3:] if sub_directory.startswith("**/") else sub_directory
109+
download_files_from_gcs(gcs_path)
82110

83111
# Load data into duckdb table
84112
for entity in entities:

0 commit comments

Comments
 (0)