1
+ /// Server launch file. Starts the services to make redis-proto work.
2
+ use crate :: asyncresp:: RespParser ;
1
3
use crate :: database:: save_state;
2
4
use crate :: misc:: misc_interact;
3
5
use crate :: ops:: { op_interact, Ops } ;
4
- /// Server launch file. Starts the services to make redis-proto work.
5
- use crate :: asyncresp:: RespParser ;
6
6
use crate :: { logger:: LOGGER , types:: StateRef } ;
7
7
use crate :: {
8
8
ops:: translate,
@@ -13,7 +13,7 @@ use futures::StreamExt;
13
13
use futures_util:: sink:: SinkExt ;
14
14
use slog:: { debug, error, info} ;
15
15
use std:: sync:: atomic:: Ordering ;
16
- use std:: { net:: SocketAddr , sync :: Arc } ;
16
+ use std:: net:: SocketAddr ;
17
17
use tokio:: net:: { TcpListener , TcpStream } ;
18
18
use tokio_util:: codec:: Decoder ;
19
19
@@ -45,9 +45,7 @@ pub async fn process_command(
45
45
debug ! ( LOGGER , "running op {:?}" , op. clone( ) ) ;
46
46
// Step 1: Execute the operation the operation (from translate above)
47
47
let res: ReturnValue = match op {
48
- Ops :: Misc ( op) => {
49
- misc_interact ( op, state, state_store. clone ( ) ) ,
50
- }
48
+ Ops :: Misc ( op) => misc_interact ( op, state, state_store. clone ( ) ) . await ,
51
49
_ => op_interact ( op, state. clone ( ) ) . await ,
52
50
} ;
53
51
// Step 2: Update commands_ran_since_save counter, and save if necessary
@@ -66,11 +64,7 @@ pub async fn process_command(
66
64
/// This will synchronously process requests / responses for this
67
65
/// connection only. Other connections will be spread across the
68
66
/// thread pool.
69
- async fn process (
70
- socket : TcpStream ,
71
- state_store : StateStoreRef ,
72
- dump_file : Dumpfile ,
73
- ) {
67
+ async fn process ( socket : TcpStream , state_store : StateStoreRef , dump_file : Dumpfile ) {
74
68
tokio:: spawn ( async move {
75
69
let mut state = state_store. get_default ( ) ;
76
70
let mut transport = RespParser . framed ( socket) ;
@@ -83,7 +77,7 @@ async fn process(
83
77
& mut state,
84
78
state_store. clone ( ) ,
85
79
dump_file. clone ( ) ,
86
- redis_value. unwrap ( ) ,
80
+ redis_value. unwrap ( ) ,
87
81
)
88
82
. await ;
89
83
// let res = match translate(redis_value.unwrap()) {
@@ -96,7 +90,7 @@ async fn process(
96
90
// op,
97
91
// &mut state,
98
92
// state_store.clone(),
99
- //
93
+ //
100
94
// )
101
95
// .await
102
96
// }
@@ -119,11 +113,7 @@ async fn process(
119
113
}
120
114
121
115
/// The listener for redis-proto. Accepts connections and spawns handlers.
122
- pub async fn socket_listener (
123
- state_store : StateStoreRef ,
124
- dump_file : Dumpfile ,
125
- config : Config ,
126
- ) {
116
+ pub async fn socket_listener ( state_store : StateStoreRef , dump_file : Dumpfile , config : Config ) {
127
117
// First, get the address determined and parsed.
128
118
let addr_str = format ! ( "{}:{}" , "127.0.0.1" , config. port) ;
129
119
let addr = match addr_str. parse :: < SocketAddr > ( ) {
@@ -159,13 +149,7 @@ pub async fn socket_listener(
159
149
match listener. accept ( ) . await {
160
150
Ok ( ( socket, _) ) => {
161
151
debug ! ( LOGGER , "Accepted connection!" ) ;
162
- process (
163
- socket,
164
- state_store. clone ( ) ,
165
- dump_file. clone ( ) ,
166
-
167
- )
168
- . await ;
152
+ process ( socket, state_store. clone ( ) , dump_file. clone ( ) ) . await ;
169
153
}
170
154
Err ( e) => error ! ( LOGGER , "Failed to establish connectin: {:?}" , e) ,
171
155
} ;
0 commit comments