feat(dataproc): Add Spark Job to Cluster sample (#13459) #13469
Conversation
* Create submit_spark_job_to_driver_node_group_cluster.py
* add a system test, copied from instantiate_line_workflow, create cluster tests
* use create cluster sample to create cluster
* black, isort
* ensure job check is specified in output
* create node cluster manually

Co-authored-by: Katie McLaughlin <katie@glasnt.com>
Thanks for your pull request! It looks like this may be your first contribution to a Google open source project. Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA). View this failed invocation of the CLA check for more information. For the most up-to-date status, view the checks section at the bottom of the pull request.
Here is the summary of changes. You are about to add 1 region tag.
This comment is generated by snippet-bot.
Summary of Changes
Hello @glasnt, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request introduces a new Python sample and its accompanying integration test for Google Cloud Dataproc. The primary purpose is to demonstrate how to submit a Spark job to a Dataproc cluster that utilizes a driver node group, including the configuration of driver resources and the retrieval of job output.
Highlights
- New Sample for Dataproc Spark Jobs: Adds `dataproc/snippets/submit_spark_job_to_driver_node_group_cluster.py`, a new Python sample demonstrating how to submit a Spark job to a Dataproc cluster configured with a driver node group.
- Driver Node Group Configuration: The new sample showcases how to specify `driver_scheduling_config` (memory and vcores) when submitting a Spark job to a Dataproc cluster with a dedicated driver node group (a sketch follows this list).
- Job Output Retrieval: The sample includes logic to retrieve and print the Spark job's output from the Cloud Storage bucket allocated to the job, using the `driver_output_resource_uri`.
- Integration Test: A corresponding integration test, `dataproc/snippets/submit_spark_job_to_driver_node_group_cluster_test.py`, is added to verify the sample's functionality, including the creation of a driver node group enabled cluster via a `gcloud` command.
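For orientation, here is a minimal sketch of the submission pattern described above, assuming the `google-cloud-dataproc` Python client; the example class, jar URI, and resource values are illustrative placeholders rather than the sample's actual values:

```python
from google.cloud import dataproc_v1


def submit_spark_job(project_id: str, region: str, cluster_name: str) -> None:
    """Submit a Spark job to a cluster that has a driver node group."""
    job_client = dataproc_v1.JobControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )

    job = {
        "placement": {"cluster_name": cluster_name},
        "spark_job": {
            # Illustrative example job that ships with Dataproc images.
            "main_class": "org.apache.spark.examples.SparkPi",
            "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
            "args": ["1000"],
        },
        # Resources reserved on the driver node group for this job's driver.
        "driver_scheduling_config": {"memory_mb": 2048, "vcores": 2},
    }

    operation = job_client.submit_job_as_operation(
        request={"project_id": project_id, "region": region, "job": job}
    )
    response = operation.result()

    # The finished job reports where its driver output landed in Cloud Storage.
    print(f"Driver output: {response.driver_output_resource_uri}")
```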
Using Gemini Code Assist
The full guide for Gemini Code Assist can be found on our documentation page; here are some quick tips.
Invoking Gemini
You can request assistance from Gemini at any point in your pull request by creating an issue comment (i.e. a comment on the pull request page) using either `/gemini <command>` or `@gemini-code-assist <command>`. Below is a summary of the supported commands.
| Feature | Command | Description |
|---|---|---|
| Code Review | `/gemini review` | Performs a code review for the current pull request in its current state. |
| Pull Request Summary | `/gemini summary` | Provides a summary of the current pull request in its current state. |
| Comment | `@gemini-code-assist` | Responds in comments when explicitly tagged, both in issue comments and review comments. |
| Help | `/gemini help` | Displays a list of available commands. |
Customization
To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by adding files to a `.gemini/` folder in the base of the repository. Detailed instructions can be found here.
Limitations & Feedback
Gemini Code Assist is currently in preview and may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments to provide feedback.
You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.
Footnotes

1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double-check it and use code with caution.
Code Review
The pull request introduces a new sample for submitting a Spark job to a Dataproc cluster with a driver node group, along with a corresponding test. The main script has a brittle method for retrieving job output from GCS, and the test file has a critical issue where the cluster teardown logic is never executed, which would lead to resource leaks. There is also a security best practice violation regarding the use of `shell=True` in a subprocess call.
```python
@backoff.on_exception(backoff.expo, (Exception), max_tries=5)
def teardown():
```
The `teardown` function is defined but is never called by `pytest`. This will lead to leaked Dataproc clusters and unnecessary cloud costs. The comment on line 88 indicates that teardown is expected, making this a critical bug. Use a `pytest` fixture to manage the cluster's lifecycle. The fixture can create the cluster, `yield` its name to the test, and then ensure the cluster is deleted after the test completes.
```python
import pytest


@pytest.fixture(scope="module")
def cluster_resource():
    # Setup: create the cluster
    # ...
    yield CLUSTER_NAME
    # Teardown: delete the cluster (code from your teardown function)
    # ...


def test_workflows(capsys, cluster_resource):
    # The test now receives the cluster name from the fixture
    submit_spark_job_to_driver_node_group_cluster.submit_job(
        PROJECT_ID, REGION, cluster_resource
    )
    # ... assertions ...
```
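As a rough sketch of how the existing backoff-decorated teardown could slot into such a fixture, assuming the test creates and deletes the cluster with `gcloud` (the command lists, `REGION`, and `CLUSTER_NAME` below are placeholders standing in for the test's real constants, and the driver node group flags are omitted):

```python
import subprocess

import backoff
import pytest

# Placeholder constants; the real test builds these from its own configuration.
REGION = "us-central1"
CLUSTER_NAME = "test-driver-node-group-cluster"
CREATE_CMD = ["gcloud", "dataproc", "clusters", "create", CLUSTER_NAME, "--region", REGION]
DELETE_CMD = ["gcloud", "dataproc", "clusters", "delete", CLUSTER_NAME, "--region", REGION, "--quiet"]


@pytest.fixture(scope="module")
def cluster_resource():
    # Setup: create the driver node group cluster once for the test module.
    subprocess.run(CREATE_CMD, capture_output=True, check=True)

    yield CLUSTER_NAME

    # Teardown: retry deletion so a transient API error does not leak the cluster.
    @backoff.on_exception(backoff.expo, Exception, max_tries=5)
    def delete_cluster():
        subprocess.run(DELETE_CMD, capture_output=True, check=True)

    delete_cluster()
```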
```python
matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)
if not matches:
    print(f"Error: Could not parse driver output URI: {response.driver_output_resource_uri}")
    raise ValueError

output = (
    storage.Client()
    .get_bucket(matches.group(1))
    .blob(f"{matches.group(2)}.000000000")
    .download_as_bytes()
    .decode("utf-8")
)
```
The method of retrieving job output is brittle. It relies on a regular expression to parse the GCS URI and then assumes a hardcoded `.000000000` suffix for the output file. This can easily break if Dataproc changes its output naming conventions or if a job produces multiple output files. A more robust approach is to treat the GCS path as a prefix and list the blobs within it to discover the actual output file(s). This ensures the code will work correctly even if the naming scheme changes.
Suggested change, replacing the block above:

```python
matches = re.match("gs://([^/]+)/(.+)", response.driver_output_resource_uri)
if not matches:
    print(f"Error: Could not parse driver output URI: {response.driver_output_resource_uri}")
    raise ValueError(f"Could not parse driver output URI: {response.driver_output_resource_uri}")

bucket_name, blob_prefix = matches.groups()
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)

# List blobs to find the output file, which is more robust than assuming a filename.
output_blob = next(
    (b for b in bucket.list_blobs(prefix=blob_prefix) if not b.name.endswith("/")),
    None,
)
if not output_blob:
    raise ValueError(f"No output found at {response.driver_output_resource_uri}")

output = output_blob.download_as_bytes().decode("utf-8")
```
```python
output = subprocess.run(
    command,
    capture_output=True,
    shell=True,
    check=True,
```
Using `shell=True` with `subprocess.run` is a security risk, as it can lead to shell injection vulnerabilities if any part of the command is crafted from external input. While the inputs here are controlled, it is a security best practice to avoid `shell=True`. Achieve the same result more safely by passing the command as a list of arguments.
```python
output = subprocess.run(
    command.split(),
    capture_output=True,
    check=True,
)
```
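As a side note on the suggestion: a plain `command.split()` breaks any argument that contains quoted spaces. If the command string could ever carry such arguments, `shlex.split` is a safer tokenizer; a minimal sketch, assuming `command` remains a single string as in the suggestion above:

```python
import shlex
import subprocess

# shlex.split honors shell-style quoting, so an argument such as
# --labels="team=data eng" survives tokenization as one list element.
output = subprocess.run(
    shlex.split(command),
    capture_output=True,
    check=True,
)
```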
This PR was already approved in #13459; merging to unblock.
Description
Fixes #13459 #13423
This PR merges the approved changes into main.