@@ -297,22 +297,26 @@ async def test_event_allowlist(
297297 await plugin .close () # Wait for write
298298 mock_write_client .append_rows .assert_called_once ()
299299 mock_write_client .append_rows .reset_mock ()
300-
301- # Re-init plugin logic since close() shuts it down, but for this test we want to test denial
302- # However, close() cleans up clients. We should probably create a new plugin or just check that the task was not created.
303- # But on_user_message_callback will try to log.
304- # To keep it simple, let's just use a fresh plugin for the second part or assume close() resets state enough to re-run _ensure_init if needed,
305- # but _ensure_init is called inside _perform_write.
306- # Actually, close() sets _is_shutting_down to True, so further logs are ignored.
307- # So we need a new plugin instance or reset _is_shutting_down.
300+ # Re-init plugin logic since close() shuts it down
308301 plugin ._is_shutting_down = False
309302
303+ # REFACTOR: Use a fresh plugin instance for the denied case
304+ plugin_denied = (
305+ bigquery_agent_analytics_plugin .BigQueryAgentAnalyticsPlugin (
306+ PROJECT_ID , DATASET_ID , TABLE_ID , config
307+ )
308+ )
309+ await plugin_denied ._ensure_init ()
310+ # Inject the same mock_write_client
311+ plugin_denied ._write_client = mock_write_client
312+ plugin_denied ._arrow_schema = plugin ._arrow_schema
313+
310314 user_message = types .Content (parts = [types .Part (text = "What is up?" )])
311- await plugin .on_user_message_callback (
315+ await plugin_denied .on_user_message_callback (
312316 invocation_context = invocation_context , user_message = user_message
313317 )
314318 # Since it's denied, no task is created. close() would wait if there was one.
315- await plugin .close ()
319+ await plugin_denied .close ()
316320 mock_write_client .append_rows .assert_not_called ()
317321
318322 @pytest .mark .asyncio
@@ -330,6 +334,8 @@ async def test_event_denylist(
330334 plugin = bigquery_agent_analytics_plugin .BigQueryAgentAnalyticsPlugin (
331335 PROJECT_ID , DATASET_ID , TABLE_ID , config
332336 )
337+ # Reset for next call
338+ plugin ._is_shutting_down = False
333339 await plugin ._ensure_init ()
334340 mock_write_client .append_rows .reset_mock ()
335341
@@ -340,11 +346,21 @@ async def test_event_denylist(
340346 await plugin .close ()
341347 mock_write_client .append_rows .assert_not_called ()
342348
343- # Reset for next call
344- plugin ._is_shutting_down = False
349+ # REFACTOR: Use a fresh plugin instance for the allowed case
350+ plugin_allowed = (
351+ bigquery_agent_analytics_plugin .BigQueryAgentAnalyticsPlugin (
352+ PROJECT_ID , DATASET_ID , TABLE_ID , config
353+ )
354+ )
355+ await plugin_allowed ._ensure_init ()
356+ # Inject the same mock_write_client
357+ plugin_allowed ._write_client = mock_write_client
358+ plugin_allowed ._arrow_schema = plugin ._arrow_schema
345359
346- await plugin .before_run_callback (invocation_context = invocation_context )
347- await plugin .close ()
360+ await plugin_allowed .before_run_callback (
361+ invocation_context = invocation_context
362+ )
363+ await plugin_allowed .close ()
348364 mock_write_client .append_rows .assert_called_once ()
349365
350366 @pytest .mark .asyncio
@@ -399,6 +415,44 @@ def mutate_payload(data):
399415 assert content ["model" ] == "GEMINI-PRO"
400416 assert content ["prompt" ][0 ]["role" ] == "user"
401417
418+ @pytest .mark .asyncio
419+ async def test_content_formatter_error_fallback (
420+ self ,
421+ mock_write_client ,
422+ invocation_context ,
423+ mock_auth_default ,
424+ mock_bq_client ,
425+ mock_to_arrow_schema ,
426+ dummy_arrow_schema ,
427+ mock_asyncio_to_thread ,
428+ ):
429+ """Tests that if content_formatter fails, the original payload is used."""
430+
431+ def error_formatter (data ):
432+ raise ValueError ("Formatter failed" )
433+
434+ config = BigQueryLoggerConfig (content_formatter = error_formatter )
435+ plugin = bigquery_agent_analytics_plugin .BigQueryAgentAnalyticsPlugin (
436+ PROJECT_ID , DATASET_ID , TABLE_ID , config
437+ )
438+ await plugin ._ensure_init ()
439+ mock_write_client .append_rows .reset_mock ()
440+
441+ user_message = types .Content (parts = [types .Part (text = "Original message" )])
442+
443+ # This triggers the log. Internal logic catches exception and proceeds.
444+ await plugin .on_user_message_callback (
445+ invocation_context = invocation_context , user_message = user_message
446+ )
447+ await plugin .close ()
448+
449+ mock_write_client .append_rows .assert_called_once ()
450+ log_entry = _get_captured_event_dict (mock_write_client , dummy_arrow_schema )
451+
452+ # Verify that despite the error, we still got the original data
453+ content = json .loads (log_entry ["content" ])
454+ assert content ["text" ] == "Original message"
455+
402456 @pytest .mark .asyncio
403457 async def test_max_content_length_smart_truncation (
404458 self ,
@@ -725,7 +779,9 @@ async def test_before_tool_callback_logs_correctly(
725779 type(mock_tool ).name = mock .PropertyMock (return_value = "MyTool" )
726780 type(mock_tool ).description = mock .PropertyMock (return_value = "Description" )
727781 await bq_plugin_inst .before_tool_callback (
728- tool = mock_tool , tool_args = {"param" : "value" }, tool_context = tool_context
782+ tool = mock_tool ,
783+ tool_args = {"param" : "value" },
784+ tool_context = tool_context ,
729785 )
730786 await bq_plugin_inst .close ()
731787 log_entry = _get_captured_event_dict (mock_write_client , dummy_arrow_schema )
0 commit comments