-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathAsyncDBPool.php
202 lines (182 loc) · 5.54 KB
/
AsyncDBPool.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
<?php
/**
* AsyncDBPool - A wrapper for mysqli async connections
*
* Configure your mysql connection in the newConnection function
*
* Start a new pool with $pool = AsyncDBPool::get();
* You can have multiple different pools at the same time with $secondPool = AsyncDBPool::get("someIdentifier");
*
* Use $pool->query('SELECT "mydemo"') to send a query to the pool
* If you need to work with the result use
* $pool->query('SELECT "mydemo"', function($result) {var_dump($result->fetch_all());})
*
* The function query takes any callback and will always pass the resultset as the first parameter
* The callback will !!NOT!! be called immediately after the query finishes, since PHP runs this code in a single thread.
* Call evaluate() to process the queries that have finished so far, or evaluateAll() to let all queries finish and apply their callbacks.
*
* PHP version 5
*
* LICENSE: This source file is licensed under GPL3
*
* @author Florian Bieringer <florian.bieringer@uni-passau.de>
* @license https://www.gnu.org/licenses/gpl-3.0.de.html GPL3
*/
class AsyncDBPool
{
    /* Maximum number of queries (and therefore open connections) per pool. */
    const MAX_QUERIES = 10;

    /* Idle connections that are ready to be reused. */
    private $connections = array();

    /* Connections with a query in flight, keyed by spl_object_hash() of the connection. */
    private $queries = array();

    /* Callbacks of the queries in flight, keyed exactly like $queries. */
    private $callbacks = array();

    /* Queries waiting for a free connection, oldest first. */
    private $queue = array();

    /* Error messages collected so far. */
    private $error = array();

    /* Pools by identifier. */
    private static $pools = array();

    /**
     * #### SET YOUR CONNECTION SETTINGS THERE! ####
     *
     * @return mysqli|false New connection, or false if connecting failed
     */
    private static function newConnection()
    {
        return mysqli_connect("localhost", "user", "password", "database");
    }

    /**
     * Return the pool registered under the given identifier, creating it on first use.
     *
     * @param string $pool Identifier of pool
     *
     * @return self
     */
    public static function get($pool = "default")
    {
        // isset() avoids reading an undefined offset the first time a pool is requested.
        if (!isset(self::$pools[$pool])) {
            self::$pools[$pool] = new self;
        }
        return self::$pools[$pool];
    }

    /**
     * Send a mysql query to the pool.
     *
     * The query is started immediately if a connection is available. Otherwise it is
     * queued and started by a later evaluate() / evaluateAll() call.
     *
     * @param string        $query    The mysql query to be evaluated
     * @param callable|null $callback Callback for the query. Will receive the mysqli result set as first param.
     */
    public function query($query, callable $callback = null)
    {
        $conn = $this->getConnection();
        if ($conn) {
            $this->dispatch($conn, $query, $callback);
        } else {
            // No connection available right now: keep the query for later.
            $this->queue($query, $callback);
        }
    }

    /**
     * Evaluates the already finished queries in this pool and applies their callbacks.
     *
     * Every connection reported back by mysqli_poll() is returned to the idle pool,
     * then queued queries are started on the connections that became free.
     *
     * @param int $timeout Timeout in seconds to wait for connections to finish.
     */
    public function evaluate($timeout = 0)
    {
        // mysqli_poll() must not be called without connections, so skip it on an idle pool.
        if ($this->queries) {
            // mysqli_poll() rewrites all three arrays in place, so each gets its own copy.
            $read = $error = $reject = array_values($this->queries);
            mysqli_poll($read, $error, $reject, $timeout);

            // For all queries that finished: fetch the result and hand it to the callback.
            foreach ($read as $conn) {
                if (!$conn) {
                    continue;
                }
                $set = $conn->reap_async_query();
                if (!$set) {
                    continue;
                }
                $hash = spl_object_hash($conn);
                if (isset($this->callbacks[$hash])) {
                    // call_user_func() accepts every callable form on all supported PHP versions.
                    call_user_func($this->callbacks[$hash], $set);
                }
            }

            // Collect each reported connection once, even if it shows up in several lists,
            // so that it is not put into the idle pool twice.
            $finished = array();
            foreach (array($error, $reject, $read) as $group) {
                foreach ($group as $conn) {
                    if ($conn) {
                        $finished[spl_object_hash($conn)] = $conn;
                    }
                }
            }
            foreach ($finished as $conn) {
                $this->freeConnection($conn);
            }
        }

        // Continue with the queue
        $this->continueQueue();
    }

    /**
     * Will evaluate all queries (running and queued) and apply their callbacks.
     *
     * Stops early if queued queries remain but no connection could be obtained for
     * them; the reason is then available through getErrors().
     */
    public function evaluateAll()
    {
        while ($this->queries || $this->queue) {
            // Wait up to one second per round instead of spinning with a zero timeout.
            $this->evaluate(1);

            if (!$this->queries && $this->queue) {
                // Nothing is in flight and the queue could not be started, so no
                // connection can be opened. Stop instead of looping forever.
                break;
            }
        }
    }

    /**
     * Get array of occurred mysql errors.
     *
     * @return array List of error messages, empty if no error occurred
     */
    public function getErrors()
    {
        return $this->error;
    }

    /**
     * Will return a prepared connection to execute a query.
     *
     * Idle connections are reused first. A new connection is only opened while fewer
     * than MAX_QUERIES queries are in flight.
     *
     * @return mysqli|null Connection, or null if the pool is exhausted or connecting failed
     */
    private function getConnection()
    {
        if (!empty($this->connections)) {
            return array_pop($this->connections);
        }
        if (count($this->queries) >= self::MAX_QUERIES) {
            return null;
        }

        $connection = self::newConnection();
        if (!$connection) {
            // Record the failure so that callers can find it via getErrors().
            $message = mysqli_connect_error();
            $this->error[] = $message ? $message : "Could not establish a database connection";
            return null;
        }
        return $connection;
    }

    /**
     * Start a query on the given connection and register it as in flight.
     *
     * @param mysqli        $connection Connection taken from getConnection()
     * @param string        $query      The query statement
     * @param callable|null $callback   Callback to apply to the result, if any
     */
    private function dispatch($connection, $query, $callback)
    {
        $hash = spl_object_hash($connection);
        $this->queries[$hash] = $connection;
        if ($callback) {
            $this->callbacks[$hash] = $callback;
        } else {
            // Make sure a callback from an earlier query on this connection is not reused.
            unset($this->callbacks[$hash]);
        }
        $connection->query($query, MYSQLI_ASYNC);
    }

    /**
     * Will free up a connection to reuse.
     *
     * Records the connection's last error message, if there is one.
     *
     * @param mysqli $connection
     */
    private function freeConnection($connection)
    {
        $error = mysqli_error($connection);
        if ($error) {
            $this->error[] = $error;
        }
        $hash = spl_object_hash($connection);
        unset($this->queries[$hash], $this->callbacks[$hash]);
        $this->connections[] = $connection;
    }

    /**
     * Continue the queued mysql queries, oldest first, while connections are available.
     */
    private function continueQueue()
    {
        while ($this->queue && ($conn = $this->getConnection())) {
            // array_shift() keeps the submission order (first in, first out).
            $next = array_shift($this->queue);
            $this->dispatch($conn, $next['query'], $next['callback']);
        }
    }

    /**
     * Queue up a query
     *
     * @param string        $query    The query statement
     * @param callable|null $callback The callback
     */
    private function queue($query, $callback)
    {
        $this->queue[] = array("query" => $query, "callback" => $callback);
    }
}