chore(x-goog-spanner-request-id): more updates for batch_write + mock… · googleapis/python-spanner@b433281 · GitHub | Latest TMZ Celebrity News & Gossip | Watch TMZ Live
Skip to content

Commit b433281

Browse files
odeke-emolavloite
andauthored
chore(x-goog-spanner-request-id): more updates for batch_write + mockserver tests (#1375)
* chore(x-goog-spanner-request-id): more updates for batch_write + mockserver tests This change plumbs in some x-goog-spanner-request-id updates for batch_write and some tests too. Updates #1261 * Use correct nth_request in pool.py nox -s blacken to format * Add add_select1_result to mockserver.test_snapshot_read_concurrent * Make _check_unavailable always pass for INTERNAL errors * Fix mismatched properties for checking grpc exceptions * test: fix concurrent queries test * test: unary RPCs should be retried on UNAVAILABLE * Blacken * Revert manual batch_create_session retry + TODO on mockserver tests * Remove unused internal_status --------- Co-authored-by: Knut Olav Løite <koloite@gmail.com>
1 parent d532d57 commit b433281

File tree

5 files changed

+401
-46
lines changed

5 files changed

+401
-46
lines changed

google/cloud/spanner_v1/testing/interceptors.py

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,6 @@ def reset(self):
7272

7373

7474
class XGoogRequestIDHeaderInterceptor(ClientInterceptor):
75-
# TODO:(@odeke-em): delete this guard when PR #1367 is merged.
76-
X_GOOG_REQUEST_ID_FUNCTIONALITY_MERGED = True
77-
7875
def __init__(self):
7976
self._unary_req_segments = []
8077
self._stream_req_segments = []
@@ -88,24 +85,23 @@ def intercept(self, method, request_or_iterator, call_details):
8885
x_goog_request_id = value
8986
break
9087

91-
if self.X_GOOG_REQUEST_ID_FUNCTIONALITY_MERGED and not x_goog_request_id:
88+
if not x_goog_request_id:
9289
raise Exception(
9390
f"Missing {X_GOOG_REQUEST_ID} header in {call_details.method}"
9491
)
9592

9693
response_or_iterator = method(request_or_iterator, call_details)
9794
streaming = getattr(response_or_iterator, "__iter__", None) is not None
9895

99-
if self.X_GOOG_REQUEST_ID_FUNCTIONALITY_MERGED:
100-
with self.__lock:
101-
if streaming:
102-
self._stream_req_segments.append(
103-
(call_details.method, parse_request_id(x_goog_request_id))
104-
)
105-
else:
106-
self._unary_req_segments.append(
107-
(call_details.method, parse_request_id(x_goog_request_id))
108-
)
96+
with self.__lock:
97+
if streaming:
98+
self._stream_req_segments.append(
99+
(call_details.method, parse_request_id(x_goog_request_id))
100+
)
101+
else:
102+
self._unary_req_segments.append(
103+
(call_details.method, parse_request_id(x_goog_request_id))
104+
)
109105

110106
return response_or_iterator
111107

Lines changed: 325 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,325 @@
1+
# Copyright 2025 Google LLC All rights reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import random
16+
import threading
17+
18+
from google.cloud.spanner_v1 import (
19+
BatchCreateSessionsRequest,
20+
BeginTransactionRequest,
21+
ExecuteSqlRequest,
22+
)
23+
from google.cloud.spanner_v1.request_id_header import REQ_RAND_PROCESS_ID
24+
from google.cloud.spanner_v1.testing.mock_spanner import SpannerServicer
25+
from tests.mockserver_tests.mock_server_test_base import (
26+
MockServerTestBase,
27+
add_select1_result,
28+
aborted_status,
29+
add_error,
30+
unavailable_status,
31+
)
32+
33+
34+
class TestRequestIDHeader(MockServerTestBase):
35+
def tearDown(self):
36+
self.database._x_goog_request_id_interceptor.reset()
37+
38+
def test_snapshot_execute_sql(self):
39+
add_select1_result()
40+
if not getattr(self.database, "_interceptors", None):
41+
self.database._interceptors = MockServerTestBase._interceptors
42+
with self.database.snapshot() as snapshot:
43+
results = snapshot.execute_sql("select 1")
44+
result_list = []
45+
for row in results:
46+
result_list.append(row)
47+
self.assertEqual(1, row[0])
48+
self.assertEqual(1, len(result_list))
49+
50+
requests = self.spanner_service.requests
51+
self.assertEqual(2, len(requests), msg=requests)
52+
self.assertTrue(isinstance(requests[0], BatchCreateSessionsRequest))
53+
self.assertTrue(isinstance(requests[1], ExecuteSqlRequest))
54+
55+
NTH_CLIENT = self.database._nth_client_id
56+
CHANNEL_ID = self.database._channel_id
57+
# Now ensure monotonicity of the received request-id segments.
58+
got_stream_segments, got_unary_segments = self.canonicalize_request_id_headers()
59+
want_unary_segments = [
60+
(
61+
"/google.spanner.v1.Spanner/BatchCreateSessions",
62+
(1, REQ_RAND_PROCESS_ID, NTH_CLIENT, CHANNEL_ID, 1, 1),
63+
)
64+
]
65+
want_stream_segments = [
66+
(
67+
"/google.spanner.v1.Spanner/ExecuteStreamingSql",
68+
(1, REQ_RAND_PROCESS_ID, NTH_CLIENT, CHANNEL_ID, 2, 1),
69+
)
70+
]
71+
72+
assert got_unary_segments == want_unary_segments
73+
assert got_stream_segments == want_stream_segments
74+
75+
def test_snapshot_read_concurrent(self):
76+
add_select1_result()
77+
db = self.database
78+
# Trigger BatchCreateSessions first.
79+
with db.snapshot() as snapshot:
80+
rows = snapshot.execute_sql("select 1")
81+
for row in rows:
82+
_ = row
83+
84+
# The other requests can then proceed.
85+
def select1():
86+
with db.snapshot() as snapshot:
87+
rows = snapshot.execute_sql("select 1")
88+
res_list = []
89+
for row in rows:
90+
self.assertEqual(1, row[0])
91+
res_list.append(row)
92+
self.assertEqual(1, len(res_list))
93+
94+
n = 10
95+
threads = []
96+
for i in range(n):
97+
th = threading.Thread(target=select1, name=f"snapshot-select1-{i}")
98+
threads.append(th)
99+
th.start()
100+
101+
random.shuffle(threads)
102+
for thread in threads:
103+
thread.join()
104+
105+
requests = self.spanner_service.requests
106+
# We expect 2 + n requests, because:
107+
# 1. The initial query triggers one BatchCreateSessions call + one ExecuteStreamingSql call.
108+
# 2. Each following query triggers one ExecuteStreamingSql call.
109+
self.assertEqual(2 + n, len(requests), msg=requests)
110+
111+
client_id = db._nth_client_id
112+
channel_id = db._channel_id
113+
got_stream_segments, got_unary_segments = self.canonicalize_request_id_headers()
114+
115+
want_unary_segments = [
116+
(
117+
"/google.spanner.v1.Spanner/BatchCreateSessions",
118+
(1, REQ_RAND_PROCESS_ID, client_id, channel_id, 1, 1),
119+
),
120+
]
121+
assert got_unary_segments == want_unary_segments
122+
123+
want_stream_segments = [
124+
(
125+
"/google.spanner.v1.Spanner/ExecuteStreamingSql",
126+
(1, REQ_RAND_PROCESS_ID, client_id, channel_id, 2, 1),
127+
),
128+
(
129+
"/google.spanner.v1.Spanner/ExecuteStreamingSql",
130+
(1, REQ_RAND_PROCESS_ID, client_id, channel_id, 3, 1),
131+
),
132+
(
133+
"/google.spanner.v1.Spanner/ExecuteStreamingSql",
134+
(1, REQ_RAND_PROCESS_ID, client_id, channel_id, 4, 1),
135+
),
136+
(
137+
"/google.spanner.v1.Spanner/ExecuteStreamingSql",
138+
(1, REQ_RAND_PROCESS_ID, client_id, channel_id, 5, 1),
139+
),
140+
(
141+
"/google.spanner.v1.Spanner/ExecuteStreamingSql",
142+
(1, REQ_RAND_PROCESS_ID, client_id, channel_id, 6, 1),
143+
),
144+
(
145+
"/google.spanner.v1.Spanner/ExecuteStreamingSql",
146+
(1, REQ_RAND_PROCESS_ID, client_id, channel_id, 7, 1),
147+
),
148+
(
149+
"/google.spanner.v1.Spanner/ExecuteStreamingSql",
150+
(1, REQ_RAND_PROCESS_ID, client_id, channel_id, 8, 1),
151+
),
152+
(
153+
"/google.spanner.v1.Spanner/ExecuteStreamingSql",
154+
(1, REQ_RAND_PROCESS_ID, client_id, channel_id, 9, 1),
155+
),
156+
(
157+
"/google.spanner.v1.Spanner/ExecuteStreamingSql",
158+
(1, REQ_RAND_PROCESS_ID, client_id, channel_id, 10, 1),
159+
),
160+
(
161+
"/google.spanner.v1.Spanner/ExecuteStreamingSql",
162+
(1, REQ_RAND_PROCESS_ID, client_id, channel_id, 11, 1),
163+
),
164+
(
165+
"/google.spanner.v1.Spanner/ExecuteStreamingSql",
166+
(1, REQ_RAND_PROCESS_ID, client_id, channel_id, 12, 1),
167+
),
168+
]
169+
assert got_stream_segments == want_stream_segments
170+
171+
def test_database_run_in_transaction_retries_on_abort(self):
172+
counters = dict(aborted=0)
173+
want_failed_attempts = 2
174+
175+
def select_in_txn(txn):
176+
results = txn.execute_sql("select 1")
177+
for row in results:
178+
_ = row
179+
180+
if counters["aborted"] < want_failed_attempts:
181+
counters["aborted"] += 1
182+
add_error(SpannerServicer.Commit.__name__, aborted_status())
183+
184+
add_select1_result()
185+
if not getattr(self.database, "_interceptors", None):
186+
self.database._interceptors = MockServerTestBase._interceptors
187+
188+
self.database.run_in_transaction(select_in_txn)
189+
190+
def test_database_execute_partitioned_dml_request_id(self):
191+
add_select1_result()
192+
if not getattr(self.database, "_interceptors", None):
193+
self.database._interceptors = MockServerTestBase._interceptors
194+
_ = self.database.execute_partitioned_dml("select 1")
195+
196+
requests = self.spanner_service.requests
197+
self.assertEqual(3, len(requests), msg=requests)
198+
self.assertTrue(isinstance(requests[0], BatchCreateSessionsRequest))
199+
self.assertTrue(isinstance(requests[1], BeginTransactionRequest))
200+
self.assertTrue(isinstance(requests[2], ExecuteSqlRequest))
201+
202+
# Now ensure monotonicity of the received request-id segments.
203+
got_stream_segments, got_unary_segments = self.canonicalize_request_id_headers()
204+
NTH_CLIENT = self.database._nth_client_id
205+
CHANNEL_ID = self.database._channel_id
206+
want_unary_segments = [
207+
(
208+
"/google.spanner.v1.Spanner/BatchCreateSessions",
209+
(1, REQ_RAND_PROCESS_ID, NTH_CLIENT, CHANNEL_ID, 1, 1),
210+
),
211+
(
212+
"/google.spanner.v1.Spanner/BeginTransaction",
213+
(1, REQ_RAND_PROCESS_ID, NTH_CLIENT, CHANNEL_ID, 2, 1),
214+
),
215+
]
216+
want_stream_segments = [
217+
(
218+
"/google.spanner.v1.Spanner/ExecuteStreamingSql",
219+
(1, REQ_RAND_PROCESS_ID, NTH_CLIENT, CHANNEL_ID, 3, 1),
220+
)
221+
]
222+
223+
assert got_unary_segments == want_unary_segments
224+
assert got_stream_segments == want_stream_segments
225+
226+
def test_unary_retryable_error(self):
227+
add_select1_result()
228+
add_error(SpannerServicer.BatchCreateSessions.__name__, unavailable_status())
229+
230+
if not getattr(self.database, "_interceptors", None):
231+
self.database._interceptors = MockServerTestBase._interceptors
232+
with self.database.snapshot() as snapshot:
233+
results = snapshot.execute_sql("select 1")
234+
result_list = []
235+
for row in results:
236+
result_list.append(row)
237+
self.assertEqual(1, row[0])
238+
self.assertEqual(1, len(result_list))
239+
240+
requests = self.spanner_service.requests
241+
self.assertEqual(3, len(requests), msg=requests)
242+
self.assertTrue(isinstance(requests[0], BatchCreateSessionsRequest))
243+
self.assertTrue(isinstance(requests[1], BatchCreateSessionsRequest))
244+
self.assertTrue(isinstance(requests[2], ExecuteSqlRequest))
245+
246+
NTH_CLIENT = self.database._nth_client_id
247+
CHANNEL_ID = self.database._channel_id
248+
# Now ensure monotonicity of the received request-id segments.
249+
got_stream_segments, got_unary_segments = self.canonicalize_request_id_headers()
250+
251+
want_stream_segments = [
252+
(
253+
"/google.spanner.v1.Spanner/ExecuteStreamingSql",
254+
(1, REQ_RAND_PROCESS_ID, NTH_CLIENT, CHANNEL_ID, 2, 1),
255+
)
256+
]
257+
assert got_stream_segments == want_stream_segments
258+
259+
want_unary_segments = [
260+
(
261+
"/google.spanner.v1.Spanner/BatchCreateSessions",
262+
(1, REQ_RAND_PROCESS_ID, NTH_CLIENT, CHANNEL_ID, 1, 1),
263+
),
264+
(
265+
"/google.spanner.v1.Spanner/BatchCreateSessions",
266+
(1, REQ_RAND_PROCESS_ID, NTH_CLIENT, CHANNEL_ID, 1, 2),
267+
),
268+
]
269+
# TODO(@odeke-em): enable this test in the next iteration
270+
# when we've figured out unary retries with UNAVAILABLE.
271+
# See https://github.com/googleapis/python-spanner/issues/1379.
272+
if True:
273+
print(
274+
"TODO(@odeke-em): enable request_id checking when we figure out propagation for unary requests"
275+
)
276+
else:
277+
assert got_unary_segments == want_unary_segments
278+
279+
def test_streaming_retryable_error(self):
280+
add_select1_result()
281+
add_error(SpannerServicer.ExecuteStreamingSql.__name__, unavailable_status())
282+
283+
if not getattr(self.database, "_interceptors", None):
284+
self.database._interceptors = MockServerTestBase._interceptors
285+
with self.database.snapshot() as snapshot:
286+
results = snapshot.execute_sql("select 1")
287+
result_list = []
288+
for row in results:
289+
result_list.append(row)
290+
self.assertEqual(1, row[0])
291+
self.assertEqual(1, len(result_list))
292+
293+
requests = self.spanner_service.requests
294+
self.assertEqual(3, len(requests), msg=requests)
295+
self.assertTrue(isinstance(requests[0], BatchCreateSessionsRequest))
296+
self.assertTrue(isinstance(requests[1], ExecuteSqlRequest))
297+
self.assertTrue(isinstance(requests[2], ExecuteSqlRequest))
298+
299+
NTH_CLIENT = self.database._nth_client_id
300+
CHANNEL_ID = self.database._channel_id
301+
# Now ensure monotonicity of the received request-id segments.
302+
got_stream_segments, got_unary_segments = self.canonicalize_request_id_headers()
303+
want_unary_segments = [
304+
(
305+
"/google.spanner.v1.Spanner/BatchCreateSessions",
306+
(1, REQ_RAND_PROCESS_ID, NTH_CLIENT, CHANNEL_ID, 1, 1),
307+
),
308+
]
309+
want_stream_segments = [
310+
(
311+
"/google.spanner.v1.Spanner/ExecuteStreamingSql",
312+
(1, REQ_RAND_PROCESS_ID, NTH_CLIENT, CHANNEL_ID, 2, 1),
313+
),
314+
(
315+
"/google.spanner.v1.Spanner/ExecuteStreamingSql",
316+
(1, REQ_RAND_PROCESS_ID, NTH_CLIENT, CHANNEL_ID, 2, 2),
317+
),
318+
]
319+
320+
assert got_unary_segments == want_unary_segments
321+
assert got_stream_segments == want_stream_segments
322+
323+
def canonicalize_request_id_headers(self):
324+
src = self.database._x_goog_request_id_interceptor
325+
return src._stream_req_segments, src._unary_req_segments

0 commit comments

Comments
 (0)

TMZ Celebrity News – Breaking Stories, Videos & Gossip

Looking for the latest TMZ celebrity news? You've come to the right place. From shocking Hollywood scandals to exclusive videos, TMZ delivers it all in real time.

Whether it’s a red carpet slip-up, a viral paparazzi moment, or a legal drama involving your favorite stars, TMZ news is always first to break the story. Stay in the loop with daily updates, insider tips, and jaw-dropping photos.

🎥 Watch TMZ Live

TMZ Live brings you daily celebrity news and interviews straight from the TMZ newsroom. Don’t miss a beat—watch now and see what’s trending in Hollywood.