forked from bashkarev/clickhouse
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Socket.php
131 lines (117 loc) · 2.83 KB
/
Socket.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
<?php
/**
* @copyright Copyright (c) 2017 Dmitry Bashkarev
* @license https://github.com/bashkarev/clickhouse/blob/master/LICENSE
* @link https://github.com/bashkarev/clickhouse#readme
*/
namespace bashkarev\clickhouse;
use Yii;
/**
 * Thin wrapper around a non-blocking stream socket connected to the address
 * supplied by a {@see Configuration}.
 *
 * The instance also carries a simple boolean "lock" flag that callers can use
 * to mark the socket as busy; the flag is purely advisory and is not enforced
 * by any method of this class.
 *
 * @author Dmitry Bashkarev <dmitry@bashkarev.com>
 */
class Socket
{
    /**
     * @var resource underlying stream socket, set by {@see open()}
     */
    protected $socket;
    /**
     * @var Configuration provides the address to connect to
     */
    protected $config;
    /**
     * @var bool advisory busy flag, see {@see lock()} / {@see unlock()}
     */
    private $_lock = false;

    /**
     * Stores the configuration and immediately opens the connection.
     *
     * @param Configuration $config
     * @throws SocketException if the connection cannot be established
     */
    public function __construct(Configuration $config)
    {
        $this->config = $config;
        $this->open();
    }

    /**
     * Re-establishes the connection after unserialization, because a stream
     * resource does not survive serialization.
     *
     * @throws SocketException if the connection cannot be established
     */
    public function __wakeup()
    {
        $this->open();
    }

    /**
     * Open socket connection
     *
     * Connects to the configured address and switches the stream to
     * non-blocking mode.
     *
     * @throws SocketException if connecting fails or the stream cannot be made non-blocking
     */
    public function open()
    {
        // Warnings are suppressed on purpose: failures are reported through $code/$message.
        $this->socket = @stream_socket_client($this->config->getAddress(), $code, $message);
        if ($this->socket === false) {
            throw new SocketException($message, [], $code);
        }
        if (stream_set_blocking($this->socket, false) === false) {
            throw new SocketException('Failed set non blocking socket');
        }
        if (YII_DEBUG) {
            Yii::trace("Opening clickhouse DB connection: " . $this->config->getAddress() . " ($this->socket)", __METHOD__);
        }
    }

    /**
     * Close socket connection
     *
     * Shuts down both directions of the stream. Calling this method when the
     * socket was never opened, or has already been closed, is a no-op.
     */
    public function close()
    {
        // Guard against a missing/closed stream: stream_socket_shutdown() would
        // otherwise raise a warning (or a TypeError on PHP 8).
        if (!is_resource($this->socket)) {
            return;
        }
        if (YII_DEBUG) {
            Yii::trace("Closing clickhouse DB connection: " . $this->config->getAddress() . " ($this->socket)", __METHOD__);
        }
        stream_socket_shutdown($this->socket, STREAM_SHUT_RDWR);
    }

    /**
     * Writes data to the socket, retrying until everything has been sent.
     *
     * @param string $string data to send
     * @param null|int $length maximum number of bytes of $string to send;
     * null sends the whole string. Values larger than the string are clamped
     * to its length and values below zero are treated as zero.
     * @throws SocketException if the underlying write fails
     */
    public function write($string, $length = null)
    {
        $string = (string)$string;
        $remaining = strlen($string);
        if ($length !== null && $length < $remaining) {
            // Honour the requested limit instead of always sending the full string.
            $remaining = max(0, (int)$length);
            $string = (string)substr($string, 0, $remaining);
        }

        while ($remaining > 0) {
            $bytes = @fwrite($this->socket, $string);
            if ($bytes === false) {
                $message = "Failed to write to socket";
                if ($error = error_get_last()) {
                    $message .= sprintf(" Errno: %d; %s", $error["type"], $error["message"]);
                }
                throw new SocketException($message);
            }
            if ($bytes === 0) {
                // The socket is non-blocking, so a full send buffer yields 0 bytes.
                // Wait until it is writable again rather than spinning on fwrite().
                // The result is deliberately ignored: on failure we simply retry.
                $read = null;
                $write = [$this->socket];
                $except = null;
                @stream_select($read, $write, $except, null);
                continue;
            }
            $remaining -= $bytes;
            if ($remaining > 0) {
                $string = (string)substr($string, $bytes);
            }
        }
    }

    /**
     * Lock
     *
     * Marks the socket as busy.
     */
    public function lock()
    {
        $this->_lock = true;
    }

    /**
     * Unlock
     *
     * Clears the busy mark set by {@see lock()}.
     */
    public function unlock()
    {
        $this->_lock = false;
    }

    /**
     * @return bool whether the socket is currently marked as busy
     */
    public function isLocked()
    {
        return $this->_lock;
    }

    /**
     * @return resource the underlying stream socket
     */
    public function getNative()
    {
        return $this->socket;
    }
}