@@ -109,8 +109,23 @@ def test_propagation(enable_extended_tracing):
109
109
len (from_inject_spans ) >= 2
110
110
) # "Expecting at least 2 spans from the injected trace exporter"
111
111
gotNames = [span .name for span in from_inject_spans ]
112
+
113
+ # Check if multiplexed sessions are enabled
114
+ import os
115
+
116
+ multiplexed_enabled = (
117
+ os .getenv ("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS" , "" ).lower () == "true"
118
+ )
119
+
120
+ # Determine expected session span name based on multiplexed sessions
121
+ expected_session_span_name = (
122
+ "CloudSpanner.CreateMultiplexedSession"
123
+ if multiplexed_enabled
124
+ else "CloudSpanner.CreateSession"
125
+ )
126
+
112
127
wantNames = [
113
- "CloudSpanner.CreateSession" ,
128
+ expected_session_span_name ,
114
129
"CloudSpanner.Snapshot.execute_sql" ,
115
130
]
116
131
assert gotNames == wantNames
@@ -392,6 +407,7 @@ def tx_update(txn):
392
407
reason = "Tracing requires OpenTelemetry" ,
393
408
)
394
409
def test_database_partitioned_error ():
410
+ import os
395
411
from opentelemetry .trace .status import StatusCode
396
412
397
413
db , trace_exporter = create_db_trace_exporter ()
@@ -402,43 +418,101 @@ def test_database_partitioned_error():
402
418
pass
403
419
404
420
got_statuses , got_events = finished_spans_statuses (trace_exporter )
405
- # Check for the series of events
406
- want_events = [
407
- ("Acquiring session" , {"kind" : "BurstyPool" }),
408
- ("Waiting for a session to become available" , {"kind" : "BurstyPool" }),
409
- ("No sessions available in pool. Creating session" , {"kind" : "BurstyPool" }),
410
- ("Creating Session" , {}),
411
- ("Starting BeginTransaction" , {}),
412
- (
421
+
422
+ # Check if multiplexed sessions are enabled for partitioned operations
423
+ multiplexed_partitioned_enabled = (
424
+ os .getenv ("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS" ) == "true"
425
+ )
426
+
427
+ # Define expected events based on whether multiplexed sessions are enabled
428
+ if multiplexed_partitioned_enabled :
429
+ # When multiplexed sessions are enabled for partitioned operations,
430
+ # the execution path is different - sessions manager creates multiplexed sessions directly
431
+ expected_event_names = [
432
+ "Creating Session" ,
433
+ "Using session" ,
434
+ "Starting BeginTransaction" ,
435
+ "Returning session" ,
413
436
"exception" ,
414
- {
415
- "exception.type" : "google.api_core.exceptions.InvalidArgument" ,
416
- "exception.message" : "400 Table not found: NonExistent [at 1:8]\n UPDATE NonExistent SET name = 'foo' WHERE id > 1\n ^" ,
417
- "exception.stacktrace" : "EPHEMERAL" ,
418
- "exception.escaped" : "False" ,
419
- },
420
- ),
421
- (
422
437
"exception" ,
423
- {
424
- "exception.type" : "google.api_core.exceptions.InvalidArgument" ,
425
- "exception.message" : "400 Table not found: NonExistent [at 1:8]\n UPDATE NonExistent SET name = 'foo' WHERE id > 1\n ^" ,
426
- "exception.stacktrace" : "EPHEMERAL" ,
427
- "exception.escaped" : "False" ,
428
- },
429
- ),
430
- ]
431
- assert got_events == want_events
438
+ ]
439
+ # Check that we have the expected events
440
+ assert len (got_events ) == len (expected_event_names )
441
+ for i , expected_name in enumerate (expected_event_names ):
442
+ assert got_events [i ][0 ] == expected_name
443
+
444
+ # Verify session usage event shows multiplexed session
445
+ assert got_events [1 ][1 ]["multiplexed" ] is True
446
+
447
+ # Verify session return event shows multiplexed session
448
+ assert got_events [3 ][1 ]["multiplexed" ] is True
449
+
450
+ # Verify the exception details
451
+ for i in [4 , 5 ]: # Both exception events
452
+ assert (
453
+ got_events [i ][1 ]["exception.type" ]
454
+ == "google.api_core.exceptions.InvalidArgument"
455
+ )
456
+ assert (
457
+ "Table not found: NonExistent" in got_events [i ][1 ]["exception.message" ]
458
+ )
459
+ else :
460
+ # When multiplexed sessions are disabled, sessions manager still manages sessions
461
+ # but uses regular pool sessions instead of multiplexed sessions
462
+ expected_event_names = [
463
+ "Acquiring session" ,
464
+ "Waiting for a session to become available" ,
465
+ "No sessions available in pool. Creating session" ,
466
+ "Creating Session" ,
467
+ "Using session" ,
468
+ "Starting BeginTransaction" ,
469
+ "Returning session" ,
470
+ "exception" ,
471
+ "exception" ,
472
+ ]
432
473
433
- # Check for the statues.
474
+ # Check that we have the expected events
475
+ assert len (got_events ) == len (expected_event_names )
476
+ for i , expected_name in enumerate (expected_event_names ):
477
+ assert got_events [i ][0 ] == expected_name
478
+
479
+ # Verify pool-related events
480
+ assert got_events [0 ][1 ]["kind" ] == "BurstyPool"
481
+ assert got_events [1 ][1 ]["kind" ] == "BurstyPool"
482
+ assert got_events [2 ][1 ]["kind" ] == "BurstyPool"
483
+
484
+ # Verify session usage event shows non-multiplexed session
485
+ assert got_events [4 ][1 ]["multiplexed" ] is False
486
+
487
+ # Verify session return event shows non-multiplexed session
488
+ assert got_events [6 ][1 ]["multiplexed" ] is False
489
+
490
+ # Verify the exception details
491
+ for i in [7 , 8 ]: # Both exception events
492
+ assert (
493
+ got_events [i ][1 ]["exception.type" ]
494
+ == "google.api_core.exceptions.InvalidArgument"
495
+ )
496
+ assert (
497
+ "Table not found: NonExistent" in got_events [i ][1 ]["exception.message" ]
498
+ )
499
+
500
+ # Check for the statuses.
434
501
codes = StatusCode
502
+
503
+ # Determine expected session creation span name based on multiplexed sessions
504
+ expected_session_span_name = (
505
+ "CloudSpanner.CreateMultiplexedSession"
506
+ if multiplexed_partitioned_enabled
507
+ else "CloudSpanner.CreateSession"
508
+ )
435
509
want_statuses = [
436
510
(
437
511
"CloudSpanner.Database.execute_partitioned_pdml" ,
438
512
codes .ERROR ,
439
513
"InvalidArgument: 400 Table not found: NonExistent [at 1:8]\n UPDATE NonExistent SET name = 'foo' WHERE id > 1\n ^" ,
440
514
),
441
- ("CloudSpanner.CreateSession" , codes .OK , None ),
515
+ (expected_session_span_name , codes .OK , None ),
442
516
(
443
517
"CloudSpanner.ExecuteStreamingSql" ,
444
518
codes .ERROR ,
0 commit comments