@@ -6,7 +6,7 @@ use crate::{
6
6
use either:: Either ;
7
7
use futures_core:: future:: BoxFuture ;
8
8
use futures_core:: stream:: BoxStream ;
9
- use futures_util:: { StreamExt , TryFutureExt , TryStreamExt } ;
9
+ use futures_util:: { stream , StreamExt , TryFutureExt , TryStreamExt } ;
10
10
use sqlx_core:: any:: {
11
11
Any , AnyArguments , AnyColumn , AnyConnectOptions , AnyConnectionBackend , AnyQueryResult , AnyRow ,
12
12
AnyStatement , AnyTypeInfo , AnyTypeInfoKind ,
@@ -16,6 +16,7 @@ use sqlx_core::database::Database;
16
16
use sqlx_core:: describe:: Describe ;
17
17
use sqlx_core:: executor:: Executor ;
18
18
use sqlx_core:: transaction:: TransactionManager ;
19
+ use std:: future;
19
20
20
21
sqlx_core:: declare_driver_with_optional_migrate!( DRIVER = MySql ) ;
21
22
@@ -77,14 +78,15 @@ impl AnyConnectionBackend for MySqlConnection {
77
78
arguments : Option < AnyArguments < ' q > > ,
78
79
) -> BoxStream < ' q , sqlx_core:: Result < Either < AnyQueryResult , AnyRow > > > {
79
80
let persistent = arguments. is_some ( ) ;
80
- let args = arguments
81
- . as_ref ( )
82
- . map ( AnyArguments :: convert_to)
83
- . transpose ( )
84
- . expect ( "Failed to encode arguments" ) ;
81
+ let arguments = match arguments. as_ref ( ) . map ( AnyArguments :: convert_to) . transpose ( ) {
82
+ Ok ( arguments) => arguments,
83
+ Err ( error) => {
84
+ return stream:: once ( future:: ready ( Err ( sqlx_core:: Error :: Encode ( error) ) ) ) . boxed ( )
85
+ }
86
+ } ;
85
87
86
88
Box :: pin (
87
- self . run ( query, args , persistent)
89
+ self . run ( query, arguments , persistent)
88
90
. try_flatten_stream ( )
89
91
. map ( |res| {
90
92
Ok ( match res? {
@@ -101,14 +103,15 @@ impl AnyConnectionBackend for MySqlConnection {
101
103
arguments : Option < AnyArguments < ' q > > ,
102
104
) -> BoxFuture < ' q , sqlx_core:: Result < Option < AnyRow > > > {
103
105
let persistent = arguments. is_some ( ) ;
104
- let args = arguments
106
+ let arguments = arguments
105
107
. as_ref ( )
106
108
. map ( AnyArguments :: convert_to)
107
109
. transpose ( )
108
- . expect ( "Failed to encode arguments" ) ;
110
+ . map_err ( sqlx_core :: Error :: Encode ) ;
109
111
110
112
Box :: pin ( async move {
111
- let stream = self . run ( query, args, persistent) . await ?;
113
+ let arguments = arguments?;
114
+ let stream = self . run ( query, arguments, persistent) . await ?;
112
115
futures_util:: pin_mut!( stream) ;
113
116
114
117
if let Some ( Either :: Right ( row) ) = stream. try_next ( ) . await ? {
0 commit comments