feat: Add support for multiplexed sessions by rahul2393 · Pull Request #1381 · googleapis/python-spanner · GitHub | Latest TMZ Celebrity News & Gossip | Watch TMZ Live
Skip to content

feat: Add support for multiplexed sessions #1381

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
on:
push:
branches:
- main
pull_request:
name: Run Spanner integration tests against emulator with multiplexed sessions
jobs:
system-tests:
runs-on: ubuntu-latest

services:
emulator:
image: gcr.io/cloud-spanner-emulator/emulator:latest
ports:
- 9010:9010
- 9020:9020

steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: 3.8
- name: Install nox
run: python -m pip install nox
- name: Run system tests
run: nox -s system
env:
SPANNER_EMULATOR_HOST: localhost:9010
GOOGLE_CLOUD_PROJECT: emulator-test-project
GOOGLE_CLOUD_TESTS_CREATE_SPANNER_INSTANCE: true
GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS: true
GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS: true
17 changes: 17 additions & 0 deletions .kokoro/presubmit/integration-multiplexed-sessions-enabled.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Format: //devtools/kokoro/config/proto/build.proto

# Only run a subset of all nox sessions
env_vars: {
key: "NOX_SESSION"
value: "unit-3.8 unit-3.12 system-3.8"
}

env_vars: {
key: "GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS"
value: "true"
}

env_vars: {
key: "GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS"
value: "true"
}
61 changes: 49 additions & 12 deletions google/cloud/spanner_v1/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@
from google.cloud.spanner_v1.pool import BurstyPool
from google.cloud.spanner_v1.pool import SessionCheckout
from google.cloud.spanner_v1.session import Session
from google.cloud.spanner_v1.session_options import SessionOptions
from google.cloud.spanner_v1.database_sessions_manager import DatabaseSessionsManager
from google.cloud.spanner_v1.snapshot import _restart_on_unavailable
from google.cloud.spanner_v1.snapshot import Snapshot
from google.cloud.spanner_v1.streamed import StreamedResultSet
Expand Down Expand Up @@ -200,6 +202,9 @@ def __init__(
self._pool = pool
pool.bind(self)

self.session_options = SessionOptions()
self._sessions_manager = DatabaseSessionsManager(self, pool)

@classmethod
def from_pb(cls, database_pb, instance, pool=None):
"""Creates an instance of this class from a protobuf.
Expand Down Expand Up @@ -759,7 +764,12 @@ def execute_pdml():
"CloudSpanner.Database.execute_partitioned_pdml",
observability_options=self.observability_options,
) as span, MetricsCapture():
with SessionCheckout(self._pool) as session:
from google.cloud.spanner_v1.session_options import TransactionType

session = self._sessions_manager.get_session(
TransactionType.PARTITIONED
)
try:
add_span_event(span, "Starting BeginTransaction")
txn = api.begin_transaction(
session=session.name,
Expand Down Expand Up @@ -802,6 +812,8 @@ def execute_pdml():
list(result_set) # consume all partials

return result_set.stats.row_count_lower_bound
finally:
self._sessions_manager.put_session(session)

return _retry_on_aborted(execute_pdml, DEFAULT_RETRY_BACKOFF)()

Expand Down Expand Up @@ -1240,6 +1252,15 @@ def observability_options(self):
opts["db_name"] = self.name
return opts

@property
def sessions_manager(self):
"""Returns the database sessions manager.

:rtype: :class:`~google.cloud.spanner_v1.database_sessions_manager.DatabaseSessionsManager`
:returns: The sessions manager for this database.
"""
return self._sessions_manager


class BatchCheckout(object):
"""Context manager for using a batch from a database.
Expand Down Expand Up @@ -1290,8 +1311,12 @@ def __init__(

def __enter__(self):
"""Begin ``with`` block."""
from google.cloud.spanner_v1.session_options import TransactionType

current_span = get_current_span()
session = self._session = self._database._pool.get()
session = self._session = self._database.sessions_manager.get_session(
TransactionType.READ_WRITE
)
add_span_event(current_span, "Using session", {"id": session.session_id})
batch = self._batch = Batch(session)
if self._request_options.transaction_tag:
Expand All @@ -1316,7 +1341,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
"CommitStats: {}".format(self._batch.commit_stats),
extra={"commit_stats": self._batch.commit_stats},
)
self._database._pool.put(self._session)
self._database.sessions_manager.put_session(self._session)
current_span = get_current_span()
add_span_event(
current_span,
Expand Down Expand Up @@ -1344,7 +1369,11 @@ def __init__(self, database):

def __enter__(self):
"""Begin ``with`` block."""
session = self._session = self._database._pool.get()
from google.cloud.spanner_v1.session_options import TransactionType

session = self._session = self._database.sessions_manager.get_session(
TransactionType.READ_WRITE
)
return MutationGroups(session)

def __exit__(self, exc_type, exc_val, exc_tb):
Expand All @@ -1355,7 +1384,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
if not self._session.exists():
self._session = self._database._pool._new_session()
self._session.create()
self._database._pool.put(self._session)
self._database.sessions_manager.put_session(self._session)


class SnapshotCheckout(object):
Expand Down Expand Up @@ -1383,7 +1412,11 @@ def __init__(self, database, **kw):

def __enter__(self):
"""Begin ``with`` block."""
session = self._session = self._database._pool.get()
from google.cloud.spanner_v1.session_options import TransactionType

session = self._session = self._database.sessions_manager.get_session(
TransactionType.READ_ONLY
)
return Snapshot(session, **self._kw)

def __exit__(self, exc_type, exc_val, exc_tb):
Expand All @@ -1394,7 +1427,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
if not self._session.exists():
self._session = self._database._pool._new_session()
self._session.create()
self._database._pool.put(self._session)
self._database.sessions_manager.put_session(self._session)


class BatchSnapshot(object):
Expand Down Expand Up @@ -1474,10 +1507,13 @@ def _get_session(self):
all partitions have been processed.
"""
if self._session is None:
session = self._session = self._database.session()
if self._session_id is None:
session.create()
else:
from google.cloud.spanner_v1.session_options import TransactionType

# Use sessions manager for partition operations
session = self._session = self._database.sessions_manager.get_session(
TransactionType.PARTITIONED
)
if self._session_id is not None:
session._session_id = self._session_id
return self._session

Expand Down Expand Up @@ -1888,7 +1924,8 @@ def close(self):
from all the partitions.
"""
if self._session is not None:
self._session.delete()
if not self._session.is_multiplexed:
self._session.delete()


def _check_ddl_statements(value):
Expand Down
Loading

TMZ Celebrity News – Breaking Stories, Videos & Gossip

Looking for the latest TMZ celebrity news? You've come to the right place. From shocking Hollywood scandals to exclusive videos, TMZ delivers it all in real time.

Whether it’s a red carpet slip-up, a viral paparazzi moment, or a legal drama involving your favorite stars, TMZ news is always first to break the story. Stay in the loop with daily updates, insider tips, and jaw-dropping photos.

🎥 Watch TMZ Live

TMZ Live brings you daily celebrity news and interviews straight from the TMZ newsroom. Don’t miss a beat—watch now and see what’s trending in Hollywood.