@@ -125,8 +125,7 @@ def mock_bq_client():
125125def mock_write_client ():
126126 # Updated patch path to match the new import structure in src
127127 with mock .patch (
128- "google.cloud.bigquery_storage_v1.services.big_query_write.async_client.BigQueryWriteAsyncClient" ,
129- autospec = True ,
128+ "google.cloud.bigquery_storage_v1.BigQueryWriteAsyncClient" , autospec = True
130129 ) as mock_cls :
131130 mock_client = mock_cls .return_value
132131 mock_client .transport = mock .AsyncMock ()
@@ -298,13 +297,7 @@ async def test_event_allowlist(
298297 mock_write_client .append_rows .assert_called_once ()
299298 mock_write_client .append_rows .reset_mock ()
300299
301- # Re-init plugin logic since close() shuts it down, but for this test we want to test denial
302- # However, close() cleans up clients. We should probably create a new plugin or just check that the task was not created.
303- # But on_user_message_callback will try to log.
304- # To keep it simple, let's just use a fresh plugin for the second part or assume close() resets state enough to re-run _ensure_init if needed,
305- # but _ensure_init is called inside _perform_write.
306- # Actually, close() sets _is_shutting_down to True, so further logs are ignored.
307- # So we need a new plugin instance or reset _is_shutting_down.
300+ # Re-init plugin logic since close() shuts it down
308301 plugin ._is_shutting_down = False
309302
310303 user_message = types .Content (parts = [types .Part (text = "What is up?" )])
@@ -399,6 +392,44 @@ def mutate_payload(data):
399392 assert content ["model" ] == "GEMINI-PRO"
400393 assert content ["prompt" ][0 ]["role" ] == "user"
401394
395+ @pytest .mark .asyncio
396+ async def test_content_formatter_error_fallback (
397+ self ,
398+ mock_write_client ,
399+ invocation_context ,
400+ mock_auth_default ,
401+ mock_bq_client ,
402+ mock_to_arrow_schema ,
403+ dummy_arrow_schema ,
404+ mock_asyncio_to_thread ,
405+ ):
406+ """Tests that if content_formatter fails, the original payload is used."""
407+
408+ def error_formatter (data ):
409+ raise ValueError ("Formatter failed" )
410+
411+ config = BigQueryLoggerConfig (content_formatter = error_formatter )
412+ plugin = bigquery_agent_analytics_plugin .BigQueryAgentAnalyticsPlugin (
413+ PROJECT_ID , DATASET_ID , TABLE_ID , config
414+ )
415+ await plugin ._ensure_init ()
416+ mock_write_client .append_rows .reset_mock ()
417+
418+ user_message = types .Content (parts = [types .Part (text = "Original message" )])
419+
420+ # This triggers the log. Internal logic catches exception and proceeds.
421+ await plugin .on_user_message_callback (
422+ invocation_context = invocation_context , user_message = user_message
423+ )
424+ await plugin .close ()
425+
426+ mock_write_client .append_rows .assert_called_once ()
427+ log_entry = _get_captured_event_dict (mock_write_client , dummy_arrow_schema )
428+
429+ # Verify that despite the error, we still got the original data
430+ content = json .loads (log_entry ["content" ])
431+ assert content ["text" ] == "Original message"
432+
402433 @pytest .mark .asyncio
403434 async def test_max_content_length_smart_truncation (
404435 self ,
@@ -695,7 +726,9 @@ async def test_after_model_callback_tool_call(
695726 callback_context ,
696727 dummy_arrow_schema ,
697728 ):
698- tool_fc = types .FunctionCall (name = "get_weather" , args = {"location" : "Paris" })
729+ tool_fc = types .FunctionCall (
730+ name = "get_weather" , args = {"location" : "Paris" }
731+ )
699732 llm_response = llm_response_lib .LlmResponse (
700733 content = types .Content (parts = [types .Part (function_call = tool_fc )]),
701734 usage_metadata = types .UsageMetadata (
@@ -725,7 +758,9 @@ async def test_before_tool_callback_logs_correctly(
725758 type(mock_tool ).name = mock .PropertyMock (return_value = "MyTool" )
726759 type(mock_tool ).description = mock .PropertyMock (return_value = "Description" )
727760 await bq_plugin_inst .before_tool_callback (
728- tool = mock_tool , tool_args = {"param" : "value" }, tool_context = tool_context
761+ tool = mock_tool ,
762+ tool_args = {"param" : "value" },
763+ tool_context = tool_context ,
729764 )
730765 await bq_plugin_inst .close ()
731766 log_entry = _get_captured_event_dict (mock_write_client , dummy_arrow_schema )
@@ -894,7 +929,9 @@ async def fake_append_rows_with_schema_error(requests, **kwargs):
894929 )
895930
896931 @pytest .mark .asyncio
897- async def test_close (self , bq_plugin_inst , mock_bq_client , mock_write_client ):
932+ async def test_close (
933+ self , bq_plugin_inst , mock_bq_client , mock_write_client
934+ ):
898935 await bq_plugin_inst .close ()
899936 mock_write_client .transport .close .assert_called_once ()
900937 # in the new implementation we verify attributes are reset
@@ -997,4 +1034,4 @@ async def test_table_creation_options(
9971034
9981035 # Verify schema type for content is JSON
9991036 content_field = next (f for f in table_arg .schema if f .name == "content" )
1000- assert content_field .field_type == "JSON"
1037+ assert content_field .field_type == "JSON"
0 commit comments