Skip to content

Commit 92661f3

Browse files
chore: make api method logging session-scoped (#2347)
Updates `log_adapter` to optionally store API method names in a `Session` instance instead of the global list. This improves label accuracy when running tests in parallel.

- Updates `Session` to initialize the `_api_methods` list and lock.
- Updates `log_adapter.add_api_method` and `get_and_reset_api_methods` to handle session-scoped logging.
- Updates `log_adapter.method_logger` and `property_logger` to identify the session from arguments.
- Propagates `session` through `start_query_with_client` and its callers to ensure labels are correctly associated with the session.

---

*PR created automatically by Jules for task [6421369828766099756](https://jules.google.com/task/6421369828766099756) started by @tswast*

---------

Co-authored-by: google-labs-jules[bot] <161369871+google-labs-jules[bot]@users.noreply.github.com>
Co-authored-by: Tim Swena <[email protected]>
1 parent 7395d41 commit 92661f3

File tree

6 files changed

+90
-14
lines changed

6 files changed

+90
-14
lines changed

bigframes/core/log_adapter.py

Lines changed: 62 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,8 @@ def wrapper(*args, **kwargs):
174174
full_method_name = f"{base_name.lower()}-{api_method_name}"
175175
# Track directly called methods
176176
if len(_call_stack) == 0:
177-
add_api_method(full_method_name)
177+
session = _find_session(*args, **kwargs)
178+
add_api_method(full_method_name, session=session)
178179

179180
_call_stack.append(full_method_name)
180181

@@ -220,7 +221,8 @@ def wrapped(*args, **kwargs):
220221
full_property_name = f"{class_name.lower()}-{property_name.lower()}"
221222

222223
if len(_call_stack) == 0:
223-
add_api_method(full_property_name)
224+
session = _find_session(*args, **kwargs)
225+
add_api_method(full_property_name, session=session)
224226

225227
_call_stack.append(full_property_name)
226228
try:
@@ -250,25 +252,41 @@ def wrapper(func):
250252
return wrapper
251253

252254

253-
def add_api_method(api_method_name):
255+
def add_api_method(api_method_name, session=None):
254256
global _lock
255257
global _api_methods
256-
with _lock:
257-
# Push the method to the front of the _api_methods list
258-
_api_methods.insert(0, api_method_name.replace("<", "").replace(">", ""))
259-
# Keep the list length within the maximum limit (adjust MAX_LABELS_COUNT as needed)
260-
_api_methods = _api_methods[:MAX_LABELS_COUNT]
261258

259+
clean_method_name = api_method_name.replace("<", "").replace(">", "")
260+
261+
if session is not None and _is_session_initialized(session):
262+
with session._api_methods_lock:
263+
session._api_methods.insert(0, clean_method_name)
264+
session._api_methods = session._api_methods[:MAX_LABELS_COUNT]
265+
else:
266+
with _lock:
267+
# Push the method to the front of the _api_methods list
268+
_api_methods.insert(0, clean_method_name)
269+
# Keep the list length within the maximum limit (adjust MAX_LABELS_COUNT as needed)
270+
_api_methods = _api_methods[:MAX_LABELS_COUNT]
262271

263-
def get_and_reset_api_methods(dry_run: bool = False):
272+
273+
def get_and_reset_api_methods(dry_run: bool = False, session=None):
264274
global _lock
275+
methods = []
276+
277+
if session is not None and _is_session_initialized(session):
278+
with session._api_methods_lock:
279+
methods.extend(session._api_methods)
280+
if not dry_run:
281+
session._api_methods.clear()
282+
265283
with _lock:
266-
previous_api_methods = list(_api_methods)
284+
methods.extend(_api_methods)
267285

268286
# dry_run might not make a job resource, so only reset the log on real queries.
269287
if not dry_run:
270288
_api_methods.clear()
271-
return previous_api_methods
289+
return methods
272290

273291

274292
def _get_bq_client(*args, **kwargs):
@@ -283,3 +301,36 @@ def _get_bq_client(*args, **kwargs):
283301
return kwargv._block.session.bqclient
284302

285303
return None
304+
305+
306+
def _is_session_initialized(session):
307+
"""Return True if fully initialized.
308+
309+
Because the method logger could get called before Session.__init__ has a
310+
chance to run, we use the globals in that case.
311+
"""
312+
return hasattr(session, "_api_methods_lock") and hasattr(session, "_api_methods")
313+
314+
315+
def _find_session(*args, **kwargs):
316+
# This function cannot import Session at the top level because Session
317+
# imports log_adapter.
318+
from bigframes.session import Session
319+
320+
session = args[0] if args else None
321+
if (
322+
session is not None
323+
and isinstance(session, Session)
324+
and _is_session_initialized(session)
325+
):
326+
return session
327+
328+
session = kwargs.get("session")
329+
if (
330+
session is not None
331+
and isinstance(session, Session)
332+
and _is_session_initialized(session)
333+
):
334+
return session
335+
336+
return None

bigframes/session/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import logging
2424
import os
2525
import secrets
26+
import threading
2627
import typing
2728
from typing import (
2829
Any,
@@ -208,6 +209,9 @@ def __init__(
208209
self._session_id: str = "session" + secrets.token_hex(3)
209210
# store table ids and delete them when the session is closed
210211

212+
self._api_methods: list[str] = []
213+
self._api_methods_lock = threading.Lock()
214+
211215
self._objects: list[
212216
weakref.ReferenceType[
213217
Union[
@@ -2160,6 +2164,7 @@ def _start_query_ml_ddl(
21602164
query_with_job=True,
21612165
job_retry=third_party_gcb_retry.DEFAULT_ML_JOB_RETRY,
21622166
publisher=self._publisher,
2167+
session=self,
21632168
)
21642169
return iterator, query_job
21652170

@@ -2188,6 +2193,7 @@ def _create_object_table(self, path: str, connection: str) -> str:
21882193
timeout=None,
21892194
query_with_job=True,
21902195
publisher=self._publisher,
2196+
session=self,
21912197
)
21922198

21932199
return table

bigframes/session/_io/bigquery/__init__.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ def create_temp_table(
126126
schema: Optional[Iterable[bigquery.SchemaField]] = None,
127127
cluster_columns: Optional[list[str]] = None,
128128
kms_key: Optional[str] = None,
129+
session=None,
129130
) -> str:
130131
"""Create an empty table with an expiration in the desired session.
131132
@@ -153,6 +154,7 @@ def create_temp_view(
153154
*,
154155
expiration: datetime.datetime,
155156
sql: str,
157+
session=None,
156158
) -> str:
157159
"""Create an empty table with an expiration in the desired session.
158160
@@ -228,12 +230,14 @@ def format_option(key: str, value: Union[bool, str]) -> str:
228230
return f"{key}={repr(value)}"
229231

230232

231-
def add_and_trim_labels(job_config):
233+
def add_and_trim_labels(job_config, session=None):
232234
"""
233235
Add additional labels to the job configuration and trim the total number of labels
234236
to ensure they do not exceed MAX_LABELS_COUNT labels per job.
235237
"""
236-
api_methods = log_adapter.get_and_reset_api_methods(dry_run=job_config.dry_run)
238+
api_methods = log_adapter.get_and_reset_api_methods(
239+
dry_run=job_config.dry_run, session=session
240+
)
237241
job_config.labels = create_job_configs_labels(
238242
job_configs_labels=job_config.labels,
239243
api_methods=api_methods,
@@ -270,6 +274,7 @@ def start_query_with_client(
270274
metrics: Optional[bigframes.session.metrics.ExecutionMetrics],
271275
query_with_job: Literal[True],
272276
publisher: bigframes.core.events.Publisher,
277+
session=None,
273278
) -> Tuple[google.cloud.bigquery.table.RowIterator, bigquery.QueryJob]:
274279
...
275280

@@ -286,6 +291,7 @@ def start_query_with_client(
286291
metrics: Optional[bigframes.session.metrics.ExecutionMetrics],
287292
query_with_job: Literal[False],
288293
publisher: bigframes.core.events.Publisher,
294+
session=None,
289295
) -> Tuple[google.cloud.bigquery.table.RowIterator, Optional[bigquery.QueryJob]]:
290296
...
291297

@@ -303,6 +309,7 @@ def start_query_with_client(
303309
query_with_job: Literal[True],
304310
job_retry: google.api_core.retry.Retry,
305311
publisher: bigframes.core.events.Publisher,
312+
session=None,
306313
) -> Tuple[google.cloud.bigquery.table.RowIterator, bigquery.QueryJob]:
307314
...
308315

@@ -320,6 +327,7 @@ def start_query_with_client(
320327
query_with_job: Literal[False],
321328
job_retry: google.api_core.retry.Retry,
322329
publisher: bigframes.core.events.Publisher,
330+
session=None,
323331
) -> Tuple[google.cloud.bigquery.table.RowIterator, Optional[bigquery.QueryJob]]:
324332
...
325333

@@ -340,14 +348,15 @@ def start_query_with_client(
340348
# version 3.36.0 or later.
341349
job_retry: google.api_core.retry.Retry = third_party_gcb_retry.DEFAULT_JOB_RETRY,
342350
publisher: bigframes.core.events.Publisher,
351+
session=None,
343352
) -> Tuple[google.cloud.bigquery.table.RowIterator, Optional[bigquery.QueryJob]]:
344353
"""
345354
Starts query job and waits for results.
346355
"""
347356
# Note: Ensure no additional labels are added to job_config after this
348357
# point, as `add_and_trim_labels` ensures the label count does not
349358
# exceed MAX_LABELS_COUNT.
350-
add_and_trim_labels(job_config)
359+
add_and_trim_labels(job_config, session=session)
351360

352361
try:
353362
if not query_with_job:

bigframes/session/bq_caching_executor.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,7 @@ def _export_gbq(
323323
iterator, job = self._run_execute_query(
324324
sql=sql,
325325
job_config=job_config,
326+
session=array_value.session,
326327
)
327328

328329
has_timedelta_col = any(
@@ -389,6 +390,7 @@ def _run_execute_query(
389390
sql: str,
390391
job_config: Optional[bq_job.QueryJobConfig] = None,
391392
query_with_job: bool = True,
393+
session=None,
392394
) -> Tuple[bq_table.RowIterator, Optional[bigquery.QueryJob]]:
393395
"""
394396
Starts BigQuery query job and waits for results.
@@ -415,6 +417,7 @@ def _run_execute_query(
415417
timeout=None,
416418
query_with_job=True,
417419
publisher=self._publisher,
420+
session=session,
418421
)
419422
else:
420423
return bq_io.start_query_with_client(
@@ -427,6 +430,7 @@ def _run_execute_query(
427430
timeout=None,
428431
query_with_job=False,
429432
publisher=self._publisher,
433+
session=session,
430434
)
431435

432436
except google.api_core.exceptions.BadRequest as e:
@@ -661,6 +665,7 @@ def _execute_plan_gbq(
661665
sql=compiled.sql,
662666
job_config=job_config,
663667
query_with_job=(destination_table is not None),
668+
session=plan.session,
664669
)
665670

666671
# we could actually cache even when caching is not explicitly requested, but being conservative for now

bigframes/session/direct_gbq_execution.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ def execute(
6060

6161
iterator, query_job = self._run_execute_query(
6262
sql=compiled.sql,
63+
session=plan.session,
6364
)
6465

6566
# just immediately download everything for simplicity
@@ -75,6 +76,7 @@ def _run_execute_query(
7576
self,
7677
sql: str,
7778
job_config: Optional[bq_job.QueryJobConfig] = None,
79+
session=None,
7880
) -> Tuple[bq_table.RowIterator, Optional[bigquery.QueryJob]]:
7981
"""
8082
Starts BigQuery query job and waits for results.
@@ -89,4 +91,5 @@ def _run_execute_query(
8991
metrics=None,
9092
query_with_job=False,
9193
publisher=self._publisher,
94+
session=session,
9295
)

bigframes/session/loader.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1324,6 +1324,7 @@ def _start_query_with_job_optional(
13241324
metrics=None,
13251325
query_with_job=False,
13261326
publisher=self._publisher,
1327+
session=self._session,
13271328
)
13281329
return rows
13291330

@@ -1350,6 +1351,7 @@ def _start_query_with_job(
13501351
metrics=None,
13511352
query_with_job=True,
13521353
publisher=self._publisher,
1354+
session=self._session,
13531355
)
13541356
return query_job
13551357

0 commit comments

Comments
 (0)