Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 3bb6261

Browse files
Feature Implementation: AWS Glue Job Execution Support (#308)
Co-authored-by: Alexander Streed <[email protected]> Co-authored-by: Alexander Streed <[email protected]>
1 parent 1e21cd5 commit 3bb6261

File tree

6 files changed

+356
-2
lines changed

6 files changed

+356
-2
lines changed

docs/glue_job.md

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
description: Tasks for interacting with AWS Glue Job
3+
notes: This documentation page is generated from source file docstrings.
4+
---
5+
6+
::: prefect_aws.glue_job

mkdocs.yml

+1
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ nav:
9090
- Lambda: lambda_function.md
9191
- Deployments:
9292
- Steps: deployments/steps.md
93+
- Glue Job: glue_job.md
9394
- S3: s3.md
9495
- Secrets Manager: secrets_manager.md
9596

prefect_aws/glue_job.py

+188
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
"""
2+
Integrations with the AWS Glue Job.
3+
4+
"""
5+
import time
6+
from typing import Any, Optional
7+
8+
from prefect.blocks.abstract import JobBlock, JobRun
9+
from pydantic import VERSION as PYDANTIC_VERSION
10+
11+
if PYDANTIC_VERSION.startswith("2."):
12+
from pydantic.v1 import BaseModel, Field
13+
else:
14+
from pydantic import BaseModel, Field
15+
16+
from prefect_aws import AwsCredentials
17+
18+
# Type alias used to annotate Glue client objects in this module; it is plain
# `Any`, so no static checking is performed on client calls.
_GlueJobClient = Any
19+
20+
21+
class GlueJobRun(JobRun, BaseModel):
    """Handle to a single run of an AWS Glue job.

    Stores the job name and run id and queries the Glue `get_job_run` API
    either once (`fetch_result`) or repeatedly until the run reaches a
    terminal state (`wait_for_completion`).

    Attributes:
        job_name: The name of the job definition to use.
        job_id: The ID of the job run.
        job_watch_poll_interval: Seconds to sleep between `get_job_run`
            calls while waiting for completion.
        aws_credentials: Credentials used to build a Glue client when
            `client` was not supplied.
        client: Glue client used for API calls. When left as `None` it is
            created from `aws_credentials` on first use.
    """

    job_name: str = Field(
        ...,
        title="AWS Glue Job Name",
        description="The name of the job definition to use.",
    )

    job_id: str = Field(
        ...,
        title="AWS Glue Job ID",
        description="The ID of the job run.",
    )

    job_watch_poll_interval: float = Field(
        default=60.0,
        description=(
            "The amount of time to wait between AWS API calls while monitoring the "
            "state of an Glue Job."
        ),
    )

    # Run states that make `wait_for_completion` raise.
    _error_states = ["FAILED", "STOPPED", "ERROR", "TIMEOUT"]

    aws_credentials: AwsCredentials = Field(
        title="AWS Credentials",
        default_factory=AwsCredentials,
        description="The AWS credentials to use to connect to Glue.",
    )

    client: _GlueJobClient = Field(default=None, description="")

    async def fetch_result(self) -> str:
        """Return the current `JobRunState` of the run.

        Performs a single `get_job_run` call; it does not wait for the run
        to finish.
        """
        job = self._get_job_run()
        return job["JobRun"]["JobRunState"]

    def wait_for_completion(self) -> None:
        """Block until the run succeeds or ends in an error state.

        Polls `get_job_run`, sleeping `job_watch_poll_interval` seconds
        between calls.

        Raises:
            RuntimeError: If the run reaches one of `_error_states`. The
                message is the run's `ErrorMessage` when Glue provides one,
                otherwise a description of the terminal state.
        """
        self.logger.info(f"watching job {self.job_name} with run id {self.job_id}")
        while True:
            job = self._get_job_run()
            job_state = job["JobRun"]["JobRunState"]
            if job_state in self._error_states:
                # `ErrorMessage` is not guaranteed to be present in the
                # response, so fall back to a message built from the state
                # instead of failing with a KeyError.
                error_message = job["JobRun"].get("ErrorMessage") or (
                    f"job run {self.job_id} ended in state {job_state}"
                )
                self.logger.error(f"job failed: {error_message}")
                raise RuntimeError(error_message)
            if job_state == "SUCCEEDED":
                self.logger.info(f"job succeeded: {self.job_id}")
                break

            time.sleep(self.job_watch_poll_interval)

    def _get_job_run(self):
        """Call Glue `get_job_run` for this run and return the raw response."""
        if self.client is None:
            # No client was handed in: build one from the stored credentials
            # and keep it so later polls reuse the same client.
            self.client = self.aws_credentials.get_boto3_session().client("glue")
        return self.client.get_job_run(JobName=self.job_name, RunId=self.job_id)
80+
81+
82+
class GlueJobBlock(JobBlock):
    """Execute a job to the AWS Glue Job service.

    Attributes:
        job_name: The name of the job definition to use.
        arguments: The job arguments associated with this run.
            For this job run, they replace the default arguments set in the job
            definition itself.
            You can specify arguments here that your own job-execution script consumes,
            as well as arguments that Glue itself consumes.
            Job arguments may be logged. Do not pass plaintext secrets as arguments.
            Retrieve secrets from a Glue Connection, Secrets Manager or other secret
            management mechanism if you intend to keep them within the Job.
            [doc](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html)
        job_watch_poll_interval: The amount of time to wait between AWS API
            calls while monitoring the state of a Glue Job.
            default is 60s because of jobs that use AWS Glue versions 2.0 and later
            have a 1-minute minimum.
            [AWS Glue Pricing](https://aws.amazon.com/glue/pricing/?nc1=h_ls)
        aws_credentials: The AWS credentials to use to connect to Glue.

    Example:
        Start a job to AWS Glue Job.
        ```python
        import asyncio

        from prefect import flow
        from prefect_aws import AwsCredentials
        from prefect_aws.glue_job import GlueJobBlock


        @flow
        async def example_run_glue_job():
            aws_credentials = AwsCredentials(
                aws_access_key_id="your_access_key_id",
                aws_secret_access_key="your_secret_access_key"
            )
            glue_job_run = await GlueJobBlock(
                job_name="your_glue_job_name",
                arguments={"--YOUR_EXTRA_ARGUMENT": "YOUR_EXTRA_ARGUMENT_VALUE"},
                aws_credentials=aws_credentials,
            ).trigger()

            return glue_job_run.wait_for_completion()


        asyncio.run(example_run_glue_job())
        ```
    """

    job_name: str = Field(
        ...,
        title="AWS Glue Job Name",
        description="The name of the job definition to use.",
    )

    arguments: Optional[dict] = Field(
        default=None,
        title="AWS Glue Job Arguments",
        description="The job arguments associated with this run.",
    )
    job_watch_poll_interval: float = Field(
        default=60.0,
        description=(
            "The amount of time to wait between AWS API calls while monitoring the "
            "state of an Glue Job."
        ),
    )

    aws_credentials: AwsCredentials = Field(
        title="AWS Credentials",
        default_factory=AwsCredentials,
        description="The AWS credentials to use to connect to Glue.",
    )

    async def trigger(self) -> GlueJobRun:
        """Start the Glue job and return a handle to the new run.

        Returns:
            A `GlueJobRun` for the started run. It carries the client and
            credentials used to start the job so it can poll the run's state.

        Raises:
            RuntimeError: If the job could not be started.
        """
        client = self._get_client()
        job_run_id = self._start_job(client)
        return GlueJobRun(
            job_name=self.job_name,
            job_id=job_run_id,
            job_watch_poll_interval=self.job_watch_poll_interval,
            # Without these the run would be created with `client=None` and
            # default credentials, and could not query the run it represents.
            aws_credentials=self.aws_credentials,
            client=client,
        )

    def _start_job(self, client: _GlueJobClient) -> str:
        """
        Start the AWS Glue Job
        [doc](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue/client/start_job_run.html)

        Args:
            client: Glue client used to issue `start_job_run`.

        Returns:
            The `JobRunId` of the started run, as a string.

        Raises:
            RuntimeError: If `start_job_run` fails or its response has no
                `JobRunId`; the original exception is attached as the cause.
        """
        self.logger.info(
            f"starting job {self.job_name} with arguments {self.arguments}"
        )
        request = {"JobName": self.job_name}
        if self.arguments is not None:
            # boto3 validates `Arguments` as a dict, so the key is omitted
            # entirely when no arguments were configured.
            request["Arguments"] = self.arguments
        try:
            response = client.start_job_run(**request)
            job_run_id = str(response["JobRunId"])
        except Exception as e:
            self.logger.error(f"failed to start job: {e}")
            raise RuntimeError(f"failed to start job: {e}") from e
        self.logger.info(f"job started with job run id: {job_run_id}")
        return job_run_id

    def _get_client(self) -> _GlueJobClient:
        """
        Retrieve a Glue Job Client
        """
        boto_session = self.aws_credentials.get_boto3_session()
        return boto_session.client("glue")

requirements.txt

+2-1
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,5 @@ botocore>=1.27.53
33
mypy_boto3_s3>=1.24.94
44
mypy_boto3_secretsmanager>=1.26.49
55
prefect>=2.16.4
6-
tenacity>=8.0.0
6+
pyparsing>=3.1.1
7+
tenacity>=8.0.0

tests/mock_aws_credentials

+5-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
[TEST_PROFILE_1]
22
aws_access_key_id = mock
33
aws_secret_access_key = mock
4+
aws_region = us-east-1
5+
aws_default_region = us-east-1
46

57
[TEST_PROFILE_2]
68
aws_access_key_id = mock
7-
aws_secret_access_key = mock
9+
aws_secret_access_key = mock
10+
aws_region = us-east-1
11+
aws_default_region = us-east-1

tests/test_glue_job.py

+154
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
from unittest.mock import MagicMock
2+
3+
import pytest
4+
from moto import mock_glue
5+
6+
from prefect_aws.glue_job import GlueJobBlock, GlueJobRun
7+
8+
9+
@pytest.fixture(scope="function")
def glue_job_client(aws_credentials):
    """Yield a Glue client in us-east-1 that lives inside a `mock_glue` context."""
    with mock_glue():
        session = aws_credentials.get_boto3_session()
        client = session.client("glue", region_name="us-east-1")
        yield client
14+
15+
16+
async def test_fetch_result(aws_credentials, glue_job_client):
    """`fetch_result` returns the state string of a started run."""
    job_name = "test_job_name"
    glue_job_client.create_job(
        Name=job_name, Role="test-role", Command={}, DefaultArguments={}
    )
    started = glue_job_client.start_job_run(JobName=job_name, Arguments={})

    run = GlueJobRun(
        job_name=job_name,
        job_id=started["JobRunId"],
        client=glue_job_client,
    )

    state = await run.fetch_result()
    assert state == "SUCCEEDED"
29+
30+
31+
def test_wait_for_completion(aws_credentials, glue_job_client):
    """`wait_for_completion` keeps polling past RUNNING and returns on SUCCEEDED."""
    with mock_glue():
        glue_job_client.create_job(
            Name="test_job_name", Role="test-role", Command={}, DefaultArguments={}
        )
        job_run_id = glue_job_client.start_job_run(
            JobName="test_job_name",
            Arguments={},
        )["JobRunId"]

        glue_job_run = GlueJobRun(
            job_name="test_job_name",
            job_id=job_run_id,
            job_watch_poll_interval=0.1,
            client=glue_job_client,
        )

        # First poll reports RUNNING, second reports SUCCEEDED.
        glue_job_client.get_job_run = MagicMock(
            side_effect=[
                {
                    "JobRun": {
                        "JobName": "test_job_name",
                        "JobRunState": "RUNNING",
                    }
                },
                {
                    "JobRun": {
                        "JobName": "test_job_name",
                        "JobRunState": "SUCCEEDED",
                    }
                },
            ]
        )
        glue_job_run.wait_for_completion()

        # Both canned responses must have been consumed: the loop did not stop
        # at RUNNING and did not poll again after SUCCEEDED.
        assert glue_job_client.get_job_run.call_count == 2
65+
66+
67+
def test_wait_for_completion_fail(aws_credentials, glue_job_client):
    """A FAILED run state makes `wait_for_completion` raise RuntimeError."""
    with mock_glue():
        glue_job_client.create_job(
            Name="test_job_name", Role="test-role", Command={}, DefaultArguments={}
        )
        start_response = glue_job_client.start_job_run(
            JobName="test_job_name",
            Arguments={},
        )
        job_run_id = start_response["JobRunId"]

        # The only canned response is a failed run carrying an error message.
        failed_run = {
            "JobRun": {
                "JobName": "test_job_name",
                "JobRunState": "FAILED",
                "ErrorMessage": "err",
            }
        }
        glue_job_client.get_job_run = MagicMock(side_effect=[failed_run])

        glue_job_run = GlueJobRun(
            job_name="test_job_name", job_id=job_run_id, client=glue_job_client
        )
        with pytest.raises(RuntimeError):
            glue_job_run.wait_for_completion()
93+
94+
95+
def test__get_job_run(aws_credentials, glue_job_client):
    """`_get_job_run` returns the raw `get_job_run` response for the run."""
    with mock_glue():
        glue_job_client.create_job(
            Name="test_job_name", Role="test-role", Command={}, DefaultArguments={}
        )
        start_response = glue_job_client.start_job_run(
            JobName="test_job_name",
            Arguments={},
        )

        glue_job_run = GlueJobRun(
            job_name="test_job_name",
            job_id=start_response["JobRunId"],
            client=glue_job_client,
        )
        job = glue_job_run._get_job_run()
        assert job["JobRun"]["JobRunState"] == "SUCCEEDED"
110+
111+
112+
async def test_trigger(aws_credentials, glue_job_client):
    """`trigger` returns a `GlueJobRun` describing the run that was started."""
    glue_job_client.create_job(
        Name="test_job_name", Role="test-role", Command={}, DefaultArguments={}
    )
    glue_job = GlueJobBlock(
        job_name="test_job_name",
        arguments={"arg1": "value1"},
        # The field is `aws_credentials`; the previous `aws_credential=`
        # spelling did not set it.
        aws_credentials=aws_credentials,
    )
    glue_job._get_client = MagicMock(side_effect=[glue_job_client])
    glue_job._start_job = MagicMock(side_effect=["test_job_id"])
    glue_job_run = await glue_job.trigger()
    assert isinstance(glue_job_run, GlueJobRun)
    # The run must identify the job that was started, not just have the right type.
    assert glue_job_run.job_name == "test_job_name"
    assert glue_job_run.job_id == "test_job_id"
    glue_job._start_job.assert_called_once_with(glue_job_client)
125+
126+
127+
def test_start_job(aws_credentials, glue_job_client):
    """`_start_job` returns the `JobRunId` from the `start_job_run` response."""
    with mock_glue():
        glue_job_client.create_job(
            Name="test_job_name", Role="test-role", Command={}, DefaultArguments={}
        )
        block = GlueJobBlock(job_name="test_job_name", arguments={"arg1": "value1"})

        # Replace the API call with a single canned response.
        canned_response = {"JobRunId": "test_job_run_id"}
        glue_job_client.start_job_run = MagicMock(side_effect=[canned_response])

        started_id = block._start_job(glue_job_client)
        assert started_id == "test_job_run_id"
139+
140+
141+
def test_start_job_fail_because_not_exist_job(aws_credentials, glue_job_client):
    """Starting a job that was never created raises RuntimeError."""
    with mock_glue():
        # No `create_job` call: the named job does not exist in the mock backend.
        block = GlueJobBlock(job_name="test_job_name", arguments={"arg1": "value1"})
        with pytest.raises(RuntimeError):
            block._start_job(glue_job_client)
146+
147+
148+
def test_get_client(aws_credentials):
    """`_get_client` returns an object exposing `get_job_run`."""
    with mock_glue():
        block = GlueJobBlock(
            job_name="test_job_name",
            aws_credentials=aws_credentials,
        )
        glue_client = block._get_client()
        assert hasattr(glue_client, "get_job_run")

0 commit comments

Comments
 (0)