docs(sample): Add samples for topic and subscription SMTs by kamalaboulhosn · Pull Request #1386 · googleapis/python-pubsub · GitHub | Latest TMZ Celebrity News & Gossip | Watch TMZ Live
Skip to content

docs(sample): Add samples for topic and subscription SMTs #1386

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 22 commits into from
Apr 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
b13d989
samples: schema evolution
kamalaboulhosn Jan 24, 2023
a8dcf8f
Add command-line commands
kamalaboulhosn Jan 24, 2023
19890f4
Merge remote-tracking branch 'upstream/main'
kamalaboulhosn Feb 24, 2023
2063ec2
Fix tag for rollback
kamalaboulhosn Feb 24, 2023
b9c7db5
Make formatting fixes
kamalaboulhosn Mar 10, 2023
2cf5cca
Formatting fixes
kamalaboulhosn Mar 10, 2023
57d6c83
Merge branch 'main' into master
kamalaboulhosn Mar 11, 2023
e67ab61
Fix exceptions
kamalaboulhosn Mar 11, 2023
7a2bf09
Merge branch 'master' of https://github.com/kamalaboulhosn/python-pubsub
kamalaboulhosn Mar 11, 2023
aa4b1ce
fix: Set x-goog-request-params for streaming pull request
kamalaboulhosn Mar 14, 2023
ff9391b
Merge remote-tracking branch 'upstream/main'
kamalaboulhosn Mar 14, 2023
74972aa
Set blunderbuss config to auto-assign issues and PRs
kamalaboulhosn Oct 20, 2023
f280caa
Merge branch 'googleapis:main' into master
kamalaboulhosn Oct 20, 2023
ab6eb34
Merge branch 'main' into master
kamalaboulhosn Oct 23, 2023
1c2aed2
Merge branch 'googleapis:main' into master
kamalaboulhosn Dec 14, 2023
465051a
fix: Swap writer and reader schema to correct places in sample
kamalaboulhosn Dec 14, 2023
b65ea0b
Merge branch 'main' into master
kamalaboulhosn Dec 15, 2023
f7931b4
Merge branch 'googleapis:main' into master
kamalaboulhosn Apr 2, 2024
7ab0dea
Merge branch 'googleapis:main' into master
kamalaboulhosn Apr 17, 2024
6858957
Merge branch 'googleapis:main' into master
kamalaboulhosn Mar 25, 2025
e02dc71
docs(sample): Add samples for topic and subscription SMTs
kamalaboulhosn Apr 22, 2025
30b17d8
Merge branch 'main' into master
mukund-ananthu Apr 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions samples/snippets/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,39 @@ def create_topic_with_confluent_cloud_ingestion(
# [END pubsub_create_topic_with_confluent_cloud_ingestion]


def create_topic_with_smt(
project_id: str,
topic_id: str,
) -> None:
"""Create a new Pub/Sub topic with a UDF SMT."""
# [START pubsub_create_topic_with_smt]
from google.cloud import pubsub_v1
from google.pubsub_v1.types import JavaScriptUDF, MessageTransform, Topic

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"

code = """function redactSSN(message, metadata) {
const data = JSON.parse(message.data);
delete data['ssn'];
message.data = JSON.stringify(data);
return message;
}"""
udf = JavaScriptUDF(code=code, function_name="redactSSN")
transforms = [MessageTransform(javascript_udf=udf)]

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

request = Topic(name=topic_path, message_transforms=transforms)

topic = publisher.create_topic(request=request)

print(f"Created topic: {topic.name} with SMT")
# [END pubsub_create_topic_with_smt]


def update_topic_type(
project_id: str,
topic_id: str,
Expand Down Expand Up @@ -888,6 +921,11 @@ def detach_subscription(project_id: str, subscription_id: str) -> None:
"gcp_service_account"
)

create_parser = subparsers.add_parser(
"create_smt", help=create_topic_with_smt.__doc__
)
create_parser.add_argument("topic_id")

update_topic_type_parser = subparsers.add_parser(
"update_kinesis_ingestion", help=update_topic_type.__doc__
)
Expand Down Expand Up @@ -1007,6 +1045,11 @@ def detach_subscription(project_id: str, subscription_id: str) -> None:
args.identity_pool_id,
args.gcp_service_account,
)
elif args.command == "create_smt":
create_topic_with_smt(
args.project_id,
args.topic_id,
)
elif args.command == "update_kinesis_ingestion":
update_topic_type(
args.project_id,
Expand Down
20 changes: 20 additions & 0 deletions samples/snippets/publisher_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,26 @@ def test_create_topic_with_confluent_cloud_ingestion(
publisher_client.delete_topic(request={"topic": topic_path})


def test_create_with_smt(
publisher_client: pubsub_v1.PublisherClient, capsys: CaptureFixture[str]
) -> None:
# The scope of `topic_path` is limited to this function.
topic_path = publisher_client.topic_path(PROJECT_ID, TOPIC_ID)

try:
publisher_client.delete_topic(request={"topic": topic_path})
except NotFound:
pass

publisher.create_topic_with_smt(PROJECT_ID, TOPIC_ID)

out, _ = capsys.readouterr()
assert f"Created topic: {topic_path} with SMT" in out

# Clean up resource created for the test.
publisher_client.delete_topic(request={"topic": topic_path})


def test_update_topic_type(
publisher_client: pubsub_v1.PublisherClient, capsys: CaptureFixture[str]
) -> None:
Expand Down
49 changes: 49 additions & 0 deletions samples/snippets/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,45 @@ def create_cloudstorage_subscription(
# [END pubsub_create_cloud_storage_subscription]


def create_subscription_with_smt(
project_id: str, topic_id: str, subscription_id: str
) -> None:
"""Create a subscription with a UDF SMT."""
# [START pubsub_create_subscription_with_smt]
from google.cloud import pubsub_v1
from google.pubsub_v1.types import JavaScriptUDF, MessageTransform

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

code = """function redactSSN(message, metadata) {
const data = JSON.parse(message.data);
delete data['ssn'];
message.data = JSON.stringify(data);
return message;
}"""
udf = JavaScriptUDF(code=code, function_name="redactSSN")
transforms = [MessageTransform(javascript_udf=udf)]

with subscriber:
subscription = subscriber.create_subscription(
request={
"name": subscription_path,
"topic": topic_path,
"message_transforms": transforms,
}
)
print(f"Created subscription with SMT: {subscription}")
# [END pubsub_create_subscription_with_smt]


def delete_subscription(project_id: str, subscription_id: str) -> None:
"""Deletes an existing Pub/Sub topic."""
# [START pubsub_delete_subscription]
Expand Down Expand Up @@ -1310,6 +1349,12 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None:
create_cloudstorage_subscription_parser.add_argument("subscription_id")
create_cloudstorage_subscription_parser.add_argument("bucket")

create_subscription_with_smt_parser = subparsers.add_parser(
"create-with-smt", help=create_subscription_with_smt.__doc__
)
create_subscription_with_smt_parser.add_argument("topic_id")
create_subscription_with_smt_parser.add_argument("subscription_id")

delete_parser = subparsers.add_parser("delete", help=delete_subscription.__doc__)
delete_parser.add_argument("subscription_id")

Expand Down Expand Up @@ -1471,6 +1516,10 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None:
create_cloudstorage_subscription(
args.project_id, args.topic_id, args.subscription_id, args.bucket
)
elif args.command == "create-with-smt":
create_subscription_with_smt(
args.project_id, args.topic_id, args.subscription_id
)

elif args.command == "delete":
delete_subscription(args.project_id, args.subscription_id)
Expand Down
31 changes: 31 additions & 0 deletions samples/snippets/subscriber_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,37 @@ def test_create_push_subscription(
subscriber_client.delete_subscription(request={"subscription": subscription_path})


def test_create_subscription_with_smt(
subscriber_client: pubsub_v1.SubscriberClient,
topic: str,
capsys: CaptureFixture[str],
) -> None:
subscription_for_create_name = (
f"subscription-test-subscription-for-create-with-smt-{PY_VERSION}-{UUID}"
)

subscription_path = subscriber_client.subscription_path(
PROJECT_ID, subscription_for_create_name
)

try:
subscriber_client.delete_subscription(
request={"subscription": subscription_path}
)
except NotFound:
pass

subscriber.create_subscription_with_smt(
PROJECT_ID, TOPIC, subscription_for_create_name
)

out, _ = capsys.readouterr()
assert f"{subscription_for_create_name}" in out

# Clean up.
subscriber_client.delete_subscription(request={"subscription": subscription_path})


def test_update_push_subscription(
subscriber_client: pubsub_v1.SubscriberClient,
topic: str,
Expand Down

TMZ Celebrity News – Breaking Stories, Videos & Gossip

Looking for the latest TMZ celebrity news? You've come to the right place. From shocking Hollywood scandals to exclusive videos, TMZ delivers it all in real time.

Whether it’s a red carpet slip-up, a viral paparazzi moment, or a legal drama involving your favorite stars, TMZ news is always first to break the story. Stay in the loop with daily updates, insider tips, and jaw-dropping photos.

🎥 Watch TMZ Live

TMZ Live brings you daily celebrity news and interviews straight from the TMZ newsroom. Don’t miss a beat—watch now and see what’s trending in Hollywood.