feat(dataproc): Add Spark Job to Cluster sample (#13459) by glasnt · Pull Request #13469 · GoogleCloudPlatform/python-docs-samples · GitHub

feat(dataproc): Add Spark Job to Cluster sample (#13459) #13469


Merged
merged 1 commit into from
Jun 29, 2025
Conversation

glasnt
Contributor

@glasnt commented Jun 29, 2025

Description

Fixes #13459 #13423

This PR merges the approved changes into main.

  • Please merge this PR for me once it is approved

* Create submit_spark_job_to_driver_node_group_cluster.py

* add a system test, copied from instantiate_line_workflow, create cluster tests

* use create cluster sample to create cluster

* black, isort

* ensure job check is specified in output

* create node cluster manually

---------

Co-authored-by: Katie McLaughlin <katie@glasnt.com>

google-cla bot commented Jun 29, 2025

Thanks for your pull request! It looks like this may be your first contribution to a Google open source project. Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA).

View this failed invocation of the CLA check for more information.

For the most up to date status, view the checks section at the bottom of the pull request.

@product-auto-label bot added the api: dataproc (Issues related to the Dataproc API) and samples (Issues that are directly related to samples) labels on Jun 29, 2025
@glasnt marked this pull request as ready for review June 29, 2025 21:23
@glasnt requested review from a team as code owners June 29, 2025 21:23

snippet-bot bot commented Jun 29, 2025

Here is the summary of changes.

You are about to add 1 region tag.

This comment is generated by snippet-bot.
If you find problems with this result, please file an issue at:
https://github.com/googleapis/repo-automation-bots/issues.
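For context on the "1 region tag" mentioned above: a region tag is the pair of START/END comment markers that the samples pipeline uses to extract a snippet from the sample file for the documentation. A minimal illustration of the convention follows; the tag name shown is illustrative, not confirmed by this PR.

# [START dataproc_submit_spark_job_to_driver_node_group_cluster]
def submit_job(project_id: str, region: str, cluster_name: str) -> None:
    ...
# [END dataproc_submit_spark_job_to_driver_node_group_cluster]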

Contributor

@gemini-code-assist bot left a comment


Summary of Changes

Hello @glasnt, I'm Gemini Code Assist¹! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a new Python sample and its accompanying integration test for Google Cloud Dataproc. The primary purpose is to demonstrate how to submit a Spark job to a Dataproc cluster that utilizes a driver node group, including the configuration of driver resources and the retrieval of job output.

Highlights

  • New Sample for Dataproc Spark Jobs: Adds dataproc/snippets/submit_spark_job_to_driver_node_group_cluster.py, a new Python sample demonstrating how to submit a Spark job to a Dataproc cluster configured with a driver node group.
  • Driver Node Group Configuration: The new sample showcases how to specify driver_scheduling_config (memory and vcores) when submitting a Spark job to a Dataproc cluster with a dedicated driver node group (see the sketch after this list).
  • Job Output Retrieval: The sample includes logic to retrieve and print the Spark job's output from the Cloud Storage bucket allocated to the job, using the driver_output_resource_uri.
  • Integration Test: A corresponding integration test, dataproc/snippets/submit_spark_job_to_driver_node_group_cluster_test.py, is added to verify the sample's functionality, including the creation of a driver node group enabled cluster via gcloud command.
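To make the highlights above concrete, here is a minimal sketch of submitting a Spark job with a driver scheduling config using the google-cloud-dataproc client. This is not the sample's exact code; the main class, jar path, and resource values are placeholders.

from google.cloud import dataproc_v1 as dataproc

def submit_job(project_id: str, region: str, cluster_name: str) -> None:
    # Dataproc clients must use the regional endpoint.
    job_client = dataproc.JobControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )

    job = {
        "placement": {"cluster_name": cluster_name},
        "spark_job": {
            "main_class": "org.apache.spark.examples.SparkPi",
            "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
            "args": ["1000"],
        },
        # Resources reserved on the driver node group for this job's driver.
        "driver_scheduling_config": {
            "memory_mb": 2048,
            "vcores": 2,
        },
    }

    operation = job_client.submit_job_as_operation(
        request={"project_id": project_id, "region": region, "job": job}
    )
    response = operation.result()
    print(f"Job finished with state {response.status.state.name}")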
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page; here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point in your pull request by creating an issue comment (i.e. a comment on the pull request page) using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands.

  • Code Review (/gemini review): Performs a code review for the current pull request in its current state.
  • Pull Request Summary (/gemini summary): Provides a summary of the current pull request in its current state.
  • Comment (@gemini-code-assist): Responds in comments when explicitly tagged, both in issue comments and review comments.
  • Help (/gemini help): Displays a list of available commands.

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by adding files to a .gemini/ folder at the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist is currently in preview and may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments to provide feedback.

You can also get AI-powered code generation, chat, and code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Contributor

@gemini-code-assist bot left a comment


Code Review

The pull request introduces a new sample for submitting a Spark job to a Dataproc cluster with a driver node group, along with a corresponding test. The main script has a brittle method for retrieving job output from GCS, and the test file has a critical issue where the cluster teardown logic is never executed, which would lead to resource leaks. There's also a security best practice violation regarding the use of shell=True in a subprocess call.



@backoff.on_exception(backoff.expo, (Exception), max_tries=5)
def teardown():
Contributor


critical

The teardown function is defined but is never called by pytest. This will lead to leaked Dataproc clusters and unnecessary cloud costs. The comment on line 88 indicates that teardown is expected, making this a critical bug. Use a pytest fixture to manage the cluster's lifecycle. The fixture can create the cluster, yield its name to the test, and then ensure the cluster is deleted after the test completes.

import pytest

@pytest.fixture(scope="module")
def cluster_resource():
    # Setup: create the cluster
    # ...
    yield CLUSTER_NAME
    # Teardown: delete the cluster (code from your teardown function)
    # ...

def test_workflows(capsys, cluster_resource):
    # The test now receives the cluster name from the fixture
    submit_spark_job_to_driver_node_group_cluster.submit_job(
        PROJECT_ID, REGION, cluster_resource
    )
    # ... assertions ...

Comment on lines +72 to +83
matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)
if not matches:
    print(f"Error: Could not parse driver output URI: {response.driver_output_resource_uri}")
    raise ValueError

output = (
    storage.Client()
    .get_bucket(matches.group(1))
    .blob(f"{matches.group(2)}.000000000")
    .download_as_bytes()
    .decode("utf-8")
)
Contributor


high

The method of retrieving job output is brittle. It relies on a regular expression to parse the GCS URI and then assumes a hardcoded .000000000 suffix for the output file. This can easily break if Dataproc changes its output naming conventions or if a job produces multiple output files. A more robust approach is to treat the GCS path as a prefix and list the blobs within it to discover the actual output file(s). This ensures the code will work correctly even if the naming scheme changes.

Suggested change
Before:

matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)
if not matches:
    print(f"Error: Could not parse driver output URI: {response.driver_output_resource_uri}")
    raise ValueError
output = (
    storage.Client()
    .get_bucket(matches.group(1))
    .blob(f"{matches.group(2)}.000000000")
    .download_as_bytes()
    .decode("utf-8")
)

After:

matches = re.match("gs://([^/]+)/(.+)", response.driver_output_resource_uri)
if not matches:
    print(f"Error: Could not parse driver output URI: {response.driver_output_resource_uri}")
    raise ValueError(f"Could not parse driver output URI: {response.driver_output_resource_uri}")

bucket_name, blob_prefix = matches.groups()
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)

# List blobs to find the output file, which is more robust than assuming a filename.
output_blob = next(
    (b for b in bucket.list_blobs(prefix=blob_prefix) if not b.name.endswith("/")),
    None,
)
if not output_blob:
    raise ValueError(f"No output found at {response.driver_output_resource_uri}")

output = output_blob.download_as_bytes().decode("utf-8")

Comment on lines +72 to +76
output = subprocess.run(
    command,
    capture_output=True,
    shell=True,
    check=True,
Contributor


medium

Using shell=True with subprocess.run is a security risk as it can lead to shell injection vulnerabilities if any part of the command is crafted from external input. While the inputs here are controlled, it is a security best practice to avoid shell=True. Achieve the same result more safely by passing the command as a list of arguments.

    output = subprocess.run(
        command.split(),
        capture_output=True,
        check=True,
    )
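One caveat with the suggestion above: a plain str.split() breaks arguments that contain quoted spaces. If the gcloud command ever grows such arguments, shlex.split from the standard library handles shell-style quoting. A minimal sketch, assuming the same command string variable:

import shlex
import subprocess

# shlex.split keeps quoted arguments such as --labels="env=test" intact.
output = subprocess.run(
    shlex.split(command),
    capture_output=True,
    check=True,
)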

@glasnt
Contributor Author

glasnt commented Jun 29, 2025

This PR was already approved in #13459

merging to unblock

@glasnt merged commit 66bd8c4 into main Jun 29, 2025
14 checks passed
@glasnt deleted the aman branch June 29, 2025 22:48
Labels
api: dataproc Issues related to the Dataproc API. samples Issues that are directly related to samples.
2 participants
