summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRemi Collet <remi@remirepo.net>2018-08-31 14:02:25 +0200
committerRemi Collet <remi@remirepo.net>2018-08-31 14:02:25 +0200
commit10ce346530a757323bd9ea0aeae3cac1de5cd960 (patch)
treedb41d6e0904a1bd91689e28bf07f59b5427c1b98
parent0f701e29462ce5c4e2aedc9bde0f1aa8fae39bd2 (diff)
v4.1.0
-rw-r--r--PHPINFO10
-rw-r--r--REFLECTION272
-rw-r--r--channel.cc151
-rw-r--r--channel.h107
-rw-r--r--php-pecl-swoole4.spec43
-rw-r--r--socket.cc1441
-rw-r--r--socket.h142
7 files changed, 2138 insertions, 28 deletions
diff --git a/PHPINFO b/PHPINFO
index 884fd1b..177993c 100644
--- a/PHPINFO
+++ b/PHPINFO
@@ -2,29 +2,31 @@
swoole
swoole support => enabled
-Version => 4.0.4
+Version => 4.1.0
Author => Swoole Group[email: team@swoole.com]
coroutine => enabled
+trace-log => enabled
epoll => enabled
eventfd => enabled
signalfd => enabled
cpu affinity => enabled
spinlock => enabled
rwlock => enabled
-async postgresql => enabled
-async redis client => enabled
-async http/websocket client => enabled
sockets => enabled
openssl => enabled
http2 => enabled
pcre => enabled
zlib => enabled
+brotli => enabled
mutex_timedlock => enabled
pthread_barrier => enabled
futex => enabled
mysqlnd => enabled
+redis client => enabled
+postgresql client => enabled
Directive => Local Value => Master Value
+swoole.enable_coroutine => On => On
swoole.aio_thread_num => 2 => 2
swoole.display_errors => On => On
swoole.use_namespace => On => On
diff --git a/REFLECTION b/REFLECTION
index 4997035..8075de9 100644
--- a/REFLECTION
+++ b/REFLECTION
@@ -1,6 +1,9 @@
-Extension [ <persistent> extension #147 swoole version 4.0.4 ] {
+Extension [ <persistent> extension #148 swoole version 4.1.0 ] {
- INI {
+ Entry [ swoole.enable_coroutine <ALL> ]
+ Current = 'On'
+ }
Entry [ swoole.aio_thread_num <ALL> ]
Current = '2'
}
@@ -21,7 +24,7 @@ Extension [ <persistent> extension #147 swoole version 4.0.4 ] {
}
}
- - Constants [164] {
+ - Constants [192] {
Constant [ integer SWOOLE_BASE ] { 4 }
Constant [ integer SWOOLE_THREAD ] { 2 }
Constant [ integer SWOOLE_PROCESS ] { 3 }
@@ -66,7 +69,7 @@ Extension [ <persistent> extension #147 swoole version 4.0.4 ] {
Constant [ integer SWOOLE_DTLSv1_CLIENT_METHOD ] { 17 }
Constant [ integer SWOOLE_EVENT_READ ] { 512 }
Constant [ integer SWOOLE_EVENT_WRITE ] { 1024 }
- Constant [ string SWOOLE_VERSION ] { 4.0.4 }
+ Constant [ string SWOOLE_VERSION ] { 4.1.0 }
Constant [ integer SWOOLE_ERROR_MALLOC_FAIL ] { 501 }
Constant [ integer SWOOLE_ERROR_SYSTEM_CALL_FAIL ] { 502 }
Constant [ integer SWOOLE_ERROR_PHP_FATAL_ERROR ] { 503 }
@@ -156,6 +159,8 @@ Extension [ <persistent> extension #147 swoole version 4.0.4 ] {
Constant [ integer SW_PGSQL_ASSOC ] { 1 }
Constant [ integer SW_PGSQL_NUM ] { 2 }
Constant [ integer SW_PGSQL_BOTH ] { 3 }
+ Constant [ integer SWOOLE_EXIT_IN_COROUTINE ] { 2 }
+ Constant [ integer SWOOLE_EXIT_IN_SERVER ] { 4 }
Constant [ integer SWOOLE_AIO_BASE ] { 0 }
Constant [ integer SWOOLE_AIO_LINUX ] { 0 }
Constant [ integer SWOOLE_FILELOCK ] { 2 }
@@ -163,13 +168,39 @@ Extension [ <persistent> extension #147 swoole version 4.0.4 ] {
Constant [ integer SWOOLE_SEM ] { 4 }
Constant [ integer SWOOLE_RWLOCK ] { 1 }
Constant [ integer SWOOLE_SPINLOCK ] { 5 }
- Constant [ integer WEBSOCKET_OPCODE_TEXT ] { 1 }
- Constant [ integer WEBSOCKET_OPCODE_BINARY ] { 2 }
- Constant [ integer WEBSOCKET_OPCODE_PING ] { 9 }
Constant [ integer WEBSOCKET_STATUS_CONNECTION ] { 1 }
Constant [ integer WEBSOCKET_STATUS_HANDSHAKE ] { 2 }
Constant [ integer WEBSOCKET_STATUS_FRAME ] { 3 }
Constant [ integer WEBSOCKET_STATUS_ACTIVE ] { 3 }
+ Constant [ integer WEBSOCKET_STATUS_CLOSING ] { 4 }
+ Constant [ integer WEBSOCKET_OPCODE_CONTINUATION ] { 0 }
+ Constant [ integer WEBSOCKET_OPCODE_TEXT ] { 1 }
+ Constant [ integer WEBSOCKET_OPCODE_BINARY ] { 2 }
+ Constant [ integer WEBSOCKET_OPCODE_CLOSE ] { 8 }
+ Constant [ integer WEBSOCKET_OPCODE_PING ] { 9 }
+ Constant [ integer WEBSOCKET_OPCODE_PONG ] { 10 }
+ Constant [ integer WEBSOCKET_CLOSE_NORMAL ] { 1000 }
+ Constant [ integer WEBSOCKET_CLOSE_GOING_AWAY ] { 1001 }
+ Constant [ integer WEBSOCKET_CLOSE_PROTOCOL_ERROR ] { 1002 }
+ Constant [ integer WEBSOCKET_CLOSE_DATA_ERROR ] { 1003 }
+ Constant [ integer WEBSOCKET_CLOSE_STATUS_ERROR ] { 1005 }
+ Constant [ integer WEBSOCKET_CLOSE_ABNORMAL ] { 1006 }
+ Constant [ integer WEBSOCKET_CLOSE_MESSAGE_ERROR ] { 1007 }
+ Constant [ integer WEBSOCKET_CLOSE_POLICY_ERROR ] { 1008 }
+ Constant [ integer WEBSOCKET_CLOSE_MESSAGE_TOO_BIG ] { 1009 }
+ Constant [ integer WEBSOCKET_CLOSE_EXTENSION_MISSING ] { 1010 }
+ Constant [ integer WEBSOCKET_CLOSE_SERVER_ERROR ] { 1011 }
+ Constant [ integer WEBSOCKET_CLOSE_TLS ] { 1015 }
+ Constant [ integer SWOOLE_HTTP2_TYPE_DATA ] { 0 }
+ Constant [ integer SWOOLE_HTTP2_TYPE_HEADERS ] { 1 }
+ Constant [ integer SWOOLE_HTTP2_TYPE_PRIORITY ] { 2 }
+ Constant [ integer SWOOLE_HTTP2_TYPE_RST_STREAM ] { 3 }
+ Constant [ integer SWOOLE_HTTP2_TYPE_SETTINGS ] { 4 }
+ Constant [ integer SWOOLE_HTTP2_TYPE_PUSH_PROMISE ] { 5 }
+ Constant [ integer SWOOLE_HTTP2_TYPE_PING ] { 6 }
+ Constant [ integer SWOOLE_HTTP2_TYPE_GOAWAY ] { 7 }
+ Constant [ integer SWOOLE_HTTP2_TYPE_WINDOW_UPDATE ] { 8 }
+ Constant [ integer SWOOLE_HTTP2_TYPE_CONTINUATION ] { 9 }
Constant [ integer SWOOLE_HTTP2_ERROR_NO_ERROR ] { 0 }
Constant [ integer SWOOLE_HTTP2_ERROR_PROTOCOL_ERROR ] { 1 }
Constant [ integer SWOOLE_HTTP2_ERROR_INTERNAL_ERROR ] { 2 }
@@ -428,7 +459,7 @@ Extension [ <persistent> extension #147 swoole version 4.0.4 ] {
}
}
- - Classes [48] {
+ - Classes [51] {
Class [ <internal:swoole> class Swoole\Server ] {
- Constants [0] {
@@ -995,7 +1026,7 @@ Extension [ <persistent> extension #147 swoole version 4.0.4 ] {
}
}
- Class [ <internal:swoole> <iterateable> class Swoole\Connection\Iterator implements Iterator, Traversable, Countable, ArrayAccess ] {
+ Class [ <internal:swoole> <iterateable> class Swoole\Connection\Iterator implements Iterator, Traversable, ArrayAccess, Countable ] {
- Constants [0] {
}
@@ -1591,16 +1622,15 @@ Extension [ <persistent> extension #147 swoole version 4.0.4 ] {
- Static methods [0] {
}
- - Properties [6] {
+ - Properties [5] {
Property [ <default> public $errCode ]
Property [ <default> public $sock ]
Property [ <default> public $type ]
- Property [ <default> public $id ]
Property [ <default> public $setting ]
Property [ <default> public $connected ]
}
- - Methods [17] {
+ - Methods [18] {
Method [ <internal:swoole, ctor> public method __construct ] {
- Parameters [1] {
@@ -1646,9 +1676,8 @@ Extension [ <persistent> extension #147 swoole version 4.0.4 ] {
Method [ <internal:swoole> public method send ] {
- - Parameters [2] {
+ - Parameters [1] {
Parameter #0 [ <required> $data ]
- Parameter #1 [ <optional> $flag ]
}
}
@@ -1664,12 +1693,21 @@ Extension [ <persistent> extension #147 swoole version 4.0.4 ] {
Method [ <internal:swoole> public method sendto ] {
- Parameters [3] {
- Parameter #0 [ <required> $ip ]
+ Parameter #0 [ <required> $address ]
Parameter #1 [ <required> $port ]
Parameter #2 [ <required> $data ]
}
}
+ Method [ <internal:swoole> public method recvfrom ] {
+
+ - Parameters [3] {
+ Parameter #0 [ <required> $length ]
+ Parameter #1 [ <required> &$address ]
+ Parameter #2 [ <optional> &$port ]
+ }
+ }
+
Method [ <internal:swoole> public method enableSSL ] {
- Parameters [0] {
@@ -2801,7 +2839,7 @@ Extension [ <persistent> extension #147 swoole version 4.0.4 ] {
Property [ <default> public $body ]
}
- - Methods [19] {
+ - Methods [20] {
Method [ <internal:swoole, ctor> public method __construct ] {
- Parameters [3] {
@@ -2902,6 +2940,16 @@ Extension [ <persistent> extension #147 swoole version 4.0.4 ] {
}
}
+ Method [ <internal:swoole> public method addData ] {
+
+ - Parameters [4] {
+ Parameter #0 [ <required> $path ]
+ Parameter #1 [ <required> $name ]
+ Parameter #2 [ <optional> $type ]
+ Parameter #3 [ <optional> $filename ]
+ }
+ }
+
Method [ <internal:swoole> public method isConnected ] {
- Parameters [0] {
@@ -2953,7 +3001,7 @@ Extension [ <persistent> extension #147 swoole version 4.0.4 ] {
- Static properties [0] {
}
- - Static methods [16] {
+ - Static methods [18] {
Method [ <internal:swoole> static public method create ] {
- Parameters [1] {
@@ -3071,6 +3119,21 @@ Extension [ <persistent> extension #147 swoole version 4.0.4 ] {
Parameter #4 [ <optional> $service ]
}
}
+
+ Method [ <internal:swoole> static public method getBackTrace ] {
+
+ - Parameters [3] {
+ Parameter #0 [ <required> $cid ]
+ Parameter #1 [ <optional> $options ]
+ Parameter #2 [ <optional> $limit ]
+ }
+ }
+
+ Method [ <internal:swoole> static public method listCoroutines ] {
+
+ - Parameters [0] {
+ }
+ }
}
- Properties [0] {
@@ -3080,6 +3143,134 @@ Extension [ <persistent> extension #147 swoole version 4.0.4 ] {
}
}
+ Class [ <internal:swoole> <iterateable> class Swoole\Coroutine\Iterator implements Iterator, Traversable, Countable ] {
+
+ - Constants [0] {
+ }
+
+ - Static properties [0] {
+ }
+
+ - Static methods [0] {
+ }
+
+ - Properties [0] {
+ }
+
+ - Methods [7] {
+ Method [ <internal:swoole, prototype Iterator> public method rewind ] {
+
+ - Parameters [0] {
+ }
+ }
+
+ Method [ <internal:swoole, prototype Iterator> public method next ] {
+
+ - Parameters [0] {
+ }
+ }
+
+ Method [ <internal:swoole, prototype Iterator> public method current ] {
+
+ - Parameters [0] {
+ }
+ }
+
+ Method [ <internal:swoole, prototype Iterator> public method key ] {
+
+ - Parameters [0] {
+ }
+ }
+
+ Method [ <internal:swoole, prototype Iterator> public method valid ] {
+
+ - Parameters [0] {
+ }
+ }
+
+ Method [ <internal:swoole, prototype Countable> public method count ] {
+
+ - Parameters [0] {
+ }
+ }
+
+ Method [ <internal:swoole, dtor> public method __destruct ] {
+
+ - Parameters [0] {
+ }
+ }
+ }
+ }
+
+ Class [ <internal:swoole> class Swoole\ExitException extends Exception implements Throwable ] {
+
+ - Constants [0] {
+ }
+
+ - Static properties [0] {
+ }
+
+ - Static methods [0] {
+ }
+
+ - Properties [4] {
+ Property [ <default> protected $message ]
+ Property [ <default> protected $code ]
+ Property [ <default> protected $file ]
+ Property [ <default> protected $line ]
+ }
+
+ - Methods [12] {
+ Method [ <internal:swoole> public method getFlags ] {
+
+ - Parameters [0] {
+ }
+ }
+
+ Method [ <internal:swoole> public method getStatus ] {
+
+ - Parameters [0] {
+ }
+ }
+
+ Method [ <internal:Core, inherits Exception, ctor> public method __construct ] {
+
+ - Parameters [3] {
+ Parameter #0 [ <optional> $message ]
+ Parameter #1 [ <optional> $code ]
+ Parameter #2 [ <optional> $previous ]
+ }
+ }
+
+ Method [ <internal:Core, inherits Exception> public method __wakeup ] {
+ }
+
+ Method [ <internal:Core, inherits Exception, prototype Throwable> final public method getMessage ] {
+ }
+
+ Method [ <internal:Core, inherits Exception, prototype Throwable> final public method getCode ] {
+ }
+
+ Method [ <internal:Core, inherits Exception, prototype Throwable> final public method getFile ] {
+ }
+
+ Method [ <internal:Core, inherits Exception, prototype Throwable> final public method getLine ] {
+ }
+
+ Method [ <internal:Core, inherits Exception, prototype Throwable> final public method getTrace ] {
+ }
+
+ Method [ <internal:Core, inherits Exception, prototype Throwable> final public method getPrevious ] {
+ }
+
+ Method [ <internal:Core, inherits Exception, prototype Throwable> final public method getTraceAsString ] {
+ }
+
+ Method [ <internal:Core, inherits Exception, prototype Throwable> public method __toString ] {
+ }
+ }
+ }
+
Class [ <internal:swoole> class Swoole\Http\Client ] {
- Constants [0] {
@@ -3495,7 +3686,7 @@ Extension [ <persistent> extension #147 swoole version 4.0.4 ] {
}
}
- Class [ <internal:swoole> <iterateable> class Swoole\Table implements ArrayAccess, Iterator, Traversable, Countable ] {
+ Class [ <internal:swoole> <iterateable> class Swoole\Table implements Iterator, Traversable, ArrayAccess, Countable ] {
- Constants [3] {
Constant [ public integer TYPE_INT ] { 1 }
@@ -3717,6 +3908,36 @@ Extension [ <persistent> extension #147 swoole version 4.0.4 ] {
}
}
+ Class [ <internal:swoole> class Swoole\Runtime ] {
+
+ - Constants [0] {
+ }
+
+ - Static properties [0] {
+ }
+
+ - Static methods [2] {
+ Method [ <internal:swoole> static public method enableStrictMode ] {
+
+ - Parameters [0] {
+ }
+ }
+
+ Method [ <internal:swoole> static public method enableCoroutine ] {
+
+ - Parameters [1] {
+ Parameter #0 [ <optional> $enable ]
+ }
+ }
+ }
+
+ - Properties [0] {
+ }
+
+ - Methods [0] {
+ }
+ }
+
Class [ <internal:swoole> class Swoole\Lock ] {
- Constants [5] {
@@ -4344,8 +4565,9 @@ Extension [ <persistent> extension #147 swoole version 4.0.4 ] {
Method [ <internal:swoole> public method status ] {
- - Parameters [1] {
+ - Parameters [2] {
Parameter #0 [ <required> $http_code ]
+ Parameter #1 [ <optional> $reason ]
}
}
@@ -4430,8 +4652,9 @@ Extension [ <persistent> extension #147 swoole version 4.0.4 ] {
- Static methods [0] {
}
- - Properties [9] {
+ - Properties [10] {
Property [ <default> public $fd ]
+ Property [ <default> public $streamId ]
Property [ <default> public $header ]
Property [ <default> public $server ]
Property [ <default> public $request ]
@@ -5668,7 +5891,7 @@ Extension [ <persistent> extension #147 swoole version 4.0.4 ] {
Property [ <default> public $port ]
}
- - Methods [10] {
+ - Methods [11] {
Method [ <internal:swoole, ctor> public method __construct ] {
- Parameters [3] {
@@ -5700,7 +5923,14 @@ Extension [ <persistent> extension #147 swoole version 4.0.4 ] {
Method [ <internal:swoole> public method stats ] {
- Parameters [1] {
- Parameter #0 [ <required> $key ]
+ Parameter #0 [ <optional> $key ]
+ }
+ }
+
+ Method [ <internal:swoole> public method isStreamExist ] {
+
+ - Parameters [1] {
+ Parameter #0 [ <required> $stream_id ]
}
}
diff --git a/channel.cc b/channel.cc
new file mode 100644
index 0000000..88423fc
--- /dev/null
+++ b/channel.cc
@@ -0,0 +1,151 @@
+#include "channel.h"
+
+using namespace swoole;
+
+static void channel_defer_callback(void *data)
+{
+ notify_msg_t *msg = (notify_msg_t*) data;
+ coroutine_t *co = msg->chan->pop_coroutine(msg->type);
+ coroutine_resume(co);
+ delete msg;
+}
+
+static void channel_pop_timeout(swTimer *timer, swTimer_node *tnode)
+{
+ timeout_msg_t *msg = (timeout_msg_t *) tnode->data;
+ msg->error = true;
+ msg->timer = nullptr;
+ msg->chan->remove(msg->co);
+ coroutine_resume(msg->co);
+}
+
+Channel::Channel(size_t _capacity)
+{
+ capacity = _capacity;
+ closed = false;
+ notify_producer_count = 0;
+ notify_consumer_count = 0;
+}
+
+void Channel::yield(enum channel_op type)
+{
+ int _cid = coroutine_get_current_cid();
+ if (_cid == -1)
+ {
+ swError("Socket::yield() must be called in the coroutine.");
+ }
+ coroutine_t *co = coroutine_get_by_id(_cid);
+ if (type == PRODUCER)
+ {
+ producer_queue.push_back(co);
+ swDebug("producer[%d]", coroutine_get_cid(co));
+ }
+ else
+ {
+ consumer_queue.push_back(co);
+ swDebug("consumer[%d]", coroutine_get_cid(co));
+ }
+ coroutine_yield(co);
+}
+
+void Channel::notify(enum channel_op type)
+{
+ notify_msg_t *msg = new notify_msg_t;
+ msg->chan = this;
+ msg->type = type;
+ if (type == PRODUCER)
+ {
+ notify_producer_count++;
+ }
+ else
+ {
+ notify_consumer_count++;
+ }
+ SwooleG.main_reactor->defer(SwooleG.main_reactor, channel_defer_callback, msg);
+}
+
+void* Channel::pop(double timeout)
+{
+ timeout_msg_t msg;
+ msg.error = false;
+ if (timeout > 0)
+ {
+ int msec = (int) (timeout * 1000);
+ if (SwooleG.timer.fd == 0)
+ {
+ swTimer_init (msec);
+ }
+ msg.chan = this;
+ msg.co = coroutine_get_by_id(coroutine_get_current_cid());
+ msg.timer = SwooleG.timer.add(&SwooleG.timer, msec, 0, &msg, channel_pop_timeout);
+ }
+ else
+ {
+ msg.timer = NULL;
+ }
+ if (is_empty() || consumer_queue.size() > 0)
+ {
+ yield(CONSUMER);
+ }
+ if (msg.error)
+ {
+ return nullptr;
+ }
+ if (msg.timer)
+ {
+ swTimer_del(&SwooleG.timer, msg.timer);
+ }
+ /**
+ * pop data
+ */
+ void *data = data_queue.front();
+ data_queue.pop();
+ /**
+ * notify producer
+ */
+ if (producer_queue.size() > 0 && notify_producer_count < producer_queue.size())
+ {
+ notify(PRODUCER);
+ }
+ return data;
+}
+
+bool Channel::push(void *data)
+{
+ if (is_full() || producer_queue.size() > 0)
+ {
+ yield(PRODUCER);
+ }
+ /**
+ * push data
+ */
+ data_queue.push(data);
+ swDebug("push data, count=%ld", length());
+ /**
+ * notify consumer
+ */
+ if (consumer_queue.size() > 0 && notify_consumer_count < consumer_queue.size())
+ {
+ notify(CONSUMER);
+ }
+ return true;
+}
+
+bool Channel::close()
+{
+ if (closed)
+ {
+ return false;
+ }
+ swDebug("closed");
+ closed = true;
+ while (producer_queue.size() > 0 && notify_producer_count < producer_queue.size())
+ {
+ notify(PRODUCER);
+ }
+ while (consumer_queue.size() > 0 && notify_consumer_count < producer_queue.size())
+ {
+ notify(CONSUMER);
+ }
+ return true;
+}
diff --git a/channel.h b/channel.h
new file mode 100644
index 0000000..ab4fb5f
--- /dev/null
+++ b/channel.h
@@ -0,0 +1,107 @@
+#pragma once
+
+#include "swoole.h"
+#include "context.h"
+#include "coroutine.h"
+#include <string>
+#include <iostream>
+#include <list>
+#include <queue>
+#include <sys/stat.h>
+
+namespace swoole {
+
+enum channel_op
+{
+ PRODUCER = 1,
+ CONSUMER = 2,
+};
+
+class Channel;
+
+struct notify_msg_t
+{
+ Channel *chan;
+ enum channel_op type;
+};
+
+struct timeout_msg_t
+{
+ Channel *chan;
+ coroutine_t *co;
+ bool error;
+ swTimer_node *timer;
+};
+
+class Channel
+{
+private:
+ std::list<coroutine_t *> producer_queue;
+ std::list<coroutine_t *> consumer_queue;
+ std::queue<void *> data_queue;
+ size_t capacity;
+ uint32_t notify_producer_count;
+ uint32_t notify_consumer_count;
+
+public:
+ bool closed;
+ inline bool is_empty()
+ {
+ return data_queue.size() == 0;
+ }
+
+ inline bool is_full()
+ {
+ return data_queue.size() == capacity;
+ }
+
+ inline size_t length()
+ {
+ return data_queue.size();
+ }
+
+ inline size_t consumer_num()
+ {
+ return consumer_queue.size();
+ }
+
+ inline size_t producer_num()
+ {
+ return producer_queue.size();
+ }
+
+ inline void remove(coroutine_t *co)
+ {
+ consumer_queue.remove(co);
+ }
+
+ inline coroutine_t* pop_coroutine(enum channel_op type)
+ {
+ coroutine_t* co;
+ if (type == PRODUCER)
+ {
+ co = producer_queue.front();
+ producer_queue.pop_front();
+ notify_producer_count--;
+ swDebug("resume producer[%d]", coroutine_get_cid(co));
+ }
+ else
+ {
+ co = consumer_queue.front();
+ consumer_queue.pop_front();
+ notify_consumer_count--;
+ swDebug("resume consumer[%d]", coroutine_get_cid(co));
+ }
+ return co;
+ }
+
+ Channel(size_t _capacity);
+ void yield(enum channel_op type);
+ void notify(enum channel_op type);
+ void* pop(double timeout = 0);
+ bool push(void *data);
+ bool close();
+};
+
+};
+
diff --git a/php-pecl-swoole4.spec b/php-pecl-swoole4.spec
index 0555a2c..2ceed15 100644
--- a/php-pecl-swoole4.spec
+++ b/php-pecl-swoole4.spec
@@ -12,7 +12,7 @@
%if 0%{?scl:1}
%global sub_prefix %{scl_prefix}
-%scl_package php-pecl-swoole
+%scl_package php-pecl-swoole4
%endif
%global with_zts 0%{!?_without_zts:%{?__ztsphp:1}}
@@ -27,17 +27,32 @@
%endif
%global with_nghttpd2 1
%global with_hiredis 1
+%if 0%{?fedora} >= 25 || 0%{?rhel} >= 8
+%global with_brotli 1
+%else
+%global with_brotli 0
+%endif
+
Summary: PHP's asynchronous concurrent distributed networking framework
Name: %{?sub_prefix}php-pecl-%{pecl_name}4
-Version: 4.0.4
-Release: 2%{?dist}%{!?scl:%{!?nophptag:%(%{__php} -r 'echo ".".PHP_MAJOR_VERSION.".".PHP_MINOR_VERSION;')}}
+Version: 4.1.0
+Release: 1%{?dist}%{!?scl:%{!?nophptag:%(%{__php} -r 'echo ".".PHP_MAJOR_VERSION.".".PHP_MINOR_VERSION;')}}
License: BSD
URL: http://pecl.php.net/package/%{pecl_name}
Source0: http://pecl.php.net/get/%{pecl_name}-%{version}.tgz
+Source1: https://raw.githubusercontent.com/swoole/swoole-src/master/include/socket.h
+Source2: https://raw.githubusercontent.com/swoole/swoole-src/master/include/channel.h
+Source3: https://raw.githubusercontent.com/swoole/swoole-src/master/src/coroutine/socket.cc
+Source4: https://raw.githubusercontent.com/swoole/swoole-src/master/src/coroutine/channel.cc
+
+%if 0%{?rhel} == 6
+BuildRequires: devtoolset-6-toolchain
+%else
BuildRequires: %{?dtsprefix}gcc
BuildRequires: %{?dtsprefix}gcc-c++
+%endif
BuildRequires: %{?scl_prefix}php-devel > 7
BuildRequires: %{?scl_prefix}php-pear
BuildRequires: %{?scl_prefix}php-sockets
@@ -53,6 +68,9 @@ BuildRequires: postgresql-devel > 9.5
%if %{with_hiredis}
BuildRequires: hiredis-devel
%endif
+%if %{with_brotli}
+BuildRequires: brotli-devel
+%endif
Requires: %{?scl_prefix}php(zend-abi) = %{php_zend_api}
Requires: %{?scl_prefix}php(api) = %{php_core_api}
@@ -152,6 +170,9 @@ sed -e '/examples/s/role="src"/role="doc"/' \
cd NTS
+cp %{SOURCE1} %{SOURCE2} include/
+cp %{SOURCE3} %{SOURCE4} src/coroutine/
+
# Sanity check, really often broken
extver=$(sed -n '/#define PHP_SWOOLE_VERSION/{s/.* "//;s/".*$//;p}' php_swoole.h)
if test "x${extver}" != "x%{version}%{?prever:-%{prever}}"; then
@@ -171,6 +192,7 @@ cat << 'EOF' | tee %{ini_name}
extension=%{pecl_name}.so
; Configuration
+;swoole.enable_coroutine = On
;swoole.aio_thread_num = 2
;swoole.display_errors = On
;swoole.use_namespace = On
@@ -181,7 +203,12 @@ EOF
%build
+%if 0%{?rhel} == 6
+source /opt/rh/devtoolset-6/enable
+g++ --version
+%else
%{?dtsenable}
+%endif
peclbuild() {
%configure \
@@ -218,7 +245,12 @@ peclbuild %{_bindir}/zts-php-config
%install
+%if 0%{?rhel} == 6
+source /opt/rh/devtoolset-6/enable
+g++ --version
+%else
%{?dtsenable}
+%endif
make -C NTS \
install INSTALL_ROOT=%{buildroot}
@@ -312,6 +344,11 @@ cd ../ZTS
%changelog
+* Fri Aug 31 2018 Remi Collet <remi@remirepo.net> - 4.1.0-1
+- update to 4.1.0
+- add dependency on brotli library (Fedora)
+- open https://github.com/swoole/swoole-src/issues/1931 missing files
+
* Thu Aug 16 2018 Remi Collet <remi@remirepo.net> - 4.0.4-2
- rebuild for 7.3.0beta2 new ABI
diff --git a/socket.cc b/socket.cc
new file mode 100644
index 0000000..7edb58f
--- /dev/null
+++ b/socket.cc
@@ -0,0 +1,1441 @@
+#include "socket.h"
+#include "context.h"
+#include "async.h"
+#include "buffer.h"
+
+#include <string>
+#include <iostream>
+#include <sys/stat.h>
+
+using namespace swoole;
+using namespace std;
+
+static int socket_onRead(swReactor *reactor, swEvent *event);
+static int socket_onWrite(swReactor *reactor, swEvent *event);
+static void socket_onTimeout(swTimer *timer, swTimer_node *tnode);
+static void socket_onResolveCompleted(swAio_event *event);
+
+bool Socket::socks5_handshake()
+{
+ swSocks5 *ctx = socks5_proxy;
+ char *buf = ctx->buf;
+ int n;
+
+ /**
+ * handshake
+ */
+ swSocks5_pack(buf, socks5_proxy->username == NULL ? 0x00 : 0x02);
+ socks5_proxy->state = SW_SOCKS5_STATE_HANDSHAKE;
+ if (send(buf, 3) <= 0)
+ {
+ return false;
+ }
+ n = recv(buf, sizeof(ctx->buf));
+ if (n <= 0)
+ {
+ return false;
+ }
+ uchar version = buf[0];
+ uchar method = buf[1];
+ if (version != SW_SOCKS5_VERSION_CODE)
+ {
+ swoole_error_log(SW_LOG_NOTICE, SW_ERROR_SOCKS5_UNSUPPORT_VERSION, "SOCKS version is not supported.");
+ return SW_ERR;
+ }
+ if (method != ctx->method)
+ {
+ swoole_error_log(SW_LOG_NOTICE, SW_ERROR_SOCKS5_UNSUPPORT_METHOD, "SOCKS authentication method not supported.");
+ return SW_ERR;
+ }
+ //authenticate request
+ if (method == SW_SOCKS5_METHOD_AUTH)
+ {
+ buf[0] = 0x01;
+ buf[1] = ctx->l_username;
+
+ buf += 2;
+ memcpy(buf, ctx->username, ctx->l_username);
+ buf += ctx->l_username;
+ buf[0] = ctx->l_password;
+ memcpy(buf + 1, ctx->password, ctx->l_password);
+
+ ctx->state = SW_SOCKS5_STATE_AUTH;
+
+ if (send(ctx->buf, ctx->l_username + ctx->l_password + 3) < 0)
+ {
+ return false;
+ }
+
+ n = recv(buf, sizeof(ctx->buf));
+ if (n <= 0)
+ {
+ return false;
+ }
+
+ uchar version = buf[0];
+ uchar status = buf[1];
+ if (version != 0x01)
+ {
+ swoole_error_log(SW_LOG_NOTICE, SW_ERROR_SOCKS5_UNSUPPORT_VERSION, "SOCKS version is not supported.");
+ return false;
+ }
+ if (status != 0)
+ {
+ swoole_error_log(SW_LOG_NOTICE, SW_ERROR_SOCKS5_AUTH_FAILED,
+ "SOCKS username/password authentication failed.");
+ return false;
+ }
+ goto send_connect_request;
+ }
+ //send connect request
+ else
+ {
+ send_connect_request: buf[0] = SW_SOCKS5_VERSION_CODE;
+ buf[1] = 0x01;
+ buf[2] = 0x00;
+
+ ctx->state = SW_SOCKS5_STATE_CONNECT;
+
+ if (ctx->dns_tunnel)
+ {
+ buf[3] = 0x03;
+ buf[4] = ctx->l_target_host;
+ buf += 5;
+ memcpy(buf, ctx->target_host, ctx->l_target_host);
+ buf += ctx->l_target_host;
+ *(uint16_t *) buf = htons(ctx->target_port);
+
+ if (send(ctx->buf, ctx->l_target_host + 7) < 0)
+ {
+ return false;
+ }
+ }
+ else
+ {
+ buf[3] = 0x01;
+ buf += 4;
+ *(uint32_t *) buf = htons(ctx->l_target_host);
+ buf += 4;
+ *(uint16_t *) buf = htons(ctx->target_port);
+
+ if (send(ctx->buf, ctx->l_target_host + 7) < 0)
+ {
+ return false;
+ }
+ }
+
+ /**
+ * response
+ */
+ n = recv(buf, sizeof(ctx->buf));
+ if (n <= 0)
+ {
+ return false;
+ }
+
+ uchar version = buf[0];
+ if (version != SW_SOCKS5_VERSION_CODE)
+ {
+ swoole_error_log(SW_LOG_NOTICE, SW_ERROR_SOCKS5_UNSUPPORT_VERSION, "SOCKS version is not supported.");
+ return false;
+ }
+ uchar result = buf[1];
+#if 0
+ uchar reg = buf[2];
+ uchar type = buf[3];
+ uint32_t ip = *(uint32_t *) (buf + 4);
+ uint16_t port = *(uint16_t *) (buf + 8);
+#endif
+ if (result == 0)
+ {
+ ctx->state = SW_SOCKS5_STATE_READY;
+ }
+ else
+ {
+ swoole_error_log(SW_LOG_NOTICE, SW_ERROR_SOCKS5_SERVER_ERROR, "Socks5 server error, reason :%s.",
+ swSocks5_strerror(result));
+ }
+ return result;
+ }
+}
+
+bool Socket::http_proxy_handshake()
+{
+#ifdef SW_USE_OPENSSL
+ if (socket->ssl)
+ {
+ if (ssl_handshake() == false)
+ {
+ return false;
+ }
+ }
+ else
+ {
+ return true;
+ }
+#else
+ return true;
+#endif
+
+ //CONNECT
+ int n = snprintf(http_proxy->buf, sizeof(http_proxy->buf), "CONNECT %*s:%d HTTP/1.1\r\n\r\n",
+ http_proxy->l_target_host, http_proxy->target_host, http_proxy->target_port);
+ if (send(http_proxy->buf, n) <= 0)
+ {
+ return false;
+ }
+
+ n = recv(http_proxy->buf, sizeof(http_proxy->buf));
+ if (n <= 0)
+ {
+ return false;
+ }
+ char *buf = http_proxy->buf;
+ int len = n;
+ int state = 0;
+ char *p = buf;
+ for (p = buf; p < buf + len; p++)
+ {
+ if (state == 0)
+ {
+ if (strncasecmp(p, "HTTP/1.1", 8) == 0 || strncasecmp(p, "HTTP/1.0", 8) == 0)
+ {
+ state = 1;
+ p += 8;
+ }
+ else
+ {
+ break;
+ }
+ }
+ else if (state == 1)
+ {
+ if (isspace(*p))
+ {
+ continue;
+ }
+ else
+ {
+ if (strncasecmp(p, "200", 3) == 0)
+ {
+ state = 2;
+ p += 3;
+ }
+ else
+ {
+ break;
+ }
+ }
+ }
+ else if (state == 2)
+ {
+ if (isspace(*p))
+ {
+ continue;
+ }
+ else
+ {
+ if (strncasecmp(p, "Connection established", sizeof("Connection established") - 1) == 0)
+ {
+ return true;
+ }
+ else
+ {
+ break;
+ }
+ }
+ }
+ }
+ return false;
+}
+
+static inline int socket_connect(int fd, struct sockaddr *addr, socklen_t len)
+{
+ int retval;
+ while (1)
+ {
+ retval = ::connect(fd, addr, len);
+ if (retval < 0)
+ {
+ if (errno == EINTR)
+ {
+ continue;
+ }
+ }
+ break;
+ }
+ return retval;
+}
+
+Socket::Socket(enum swSocket_type _type)
+{
+ type = _type;
+ switch (type)
+ {
+ case SW_SOCK_TCP6:
+ _sock_domain = AF_INET6;
+ _sock_type = SOCK_STREAM;
+ break;
+ case SW_SOCK_UNIX_STREAM:
+ _sock_domain = AF_UNIX;
+ _sock_type = SOCK_STREAM;
+ break;
+ case SW_SOCK_UDP:
+ _sock_domain = AF_INET;
+ _sock_type = SOCK_DGRAM;
+ break;
+ case SW_SOCK_UDP6:
+ _sock_domain = AF_INET6;
+ _sock_type = SOCK_DGRAM;
+ break;
+ case SW_SOCK_UNIX_DGRAM:
+ _sock_domain = AF_UNIX;
+ _sock_type = SOCK_DGRAM;
+ break;
+ case SW_SOCK_TCP:
+ default:
+ _sock_domain = AF_INET;
+ _sock_type = SOCK_STREAM;
+ break;
+ }
+
+#ifdef SOCK_CLOEXEC
+ int sockfd = ::socket(_sock_domain, _sock_type | SOCK_CLOEXEC, 0);
+#else
+ int sockfd = ::socket(_sock_domain, _sock_type, 0);
+#endif
+ if (sockfd < 0)
+ {
+ swWarn("socket() failed. Error: %s[%d]", strerror(errno), errno);
+ return;
+ }
+
+ if (swIsMaster() && SwooleTG.type == SW_THREAD_REACTOR)
+ {
+ reactor = SwooleTG.reactor;
+ }
+ else
+ {
+ reactor = SwooleG.main_reactor;
+ }
+ socket = swReactor_get(reactor, sockfd);
+
+ bzero(socket, sizeof(swConnection));
+ socket->fd = sockfd;
+ socket->object = this;
+ socket->socket_type = type;
+
+ swSetNonBlock(socket->fd);
+ if (!swReactor_handle_isset(reactor, SW_FD_CORO_SOCKET))
+ {
+ reactor->setHandle(reactor, SW_FD_CORO_SOCKET | SW_EVENT_READ, socket_onRead);
+ reactor->setHandle(reactor, SW_FD_CORO_SOCKET | SW_EVENT_WRITE, socket_onWrite);
+ reactor->setHandle(reactor, SW_FD_CORO_SOCKET | SW_EVENT_ERROR, socket_onRead);
+ }
+ init();
+}
+
+Socket::Socket(int _fd, Socket *sock)
+{
+ reactor = sock->reactor;
+
+ socket = swReactor_get(reactor, _fd);
+ bzero(socket, sizeof(swConnection));
+ socket->fd = _fd;
+ socket->object = this;
+ socket->socket_type = sock->type;
+
+ _sock_domain = sock->_sock_domain;
+ _sock_type = sock->_sock_type;
+ init();
+}
+
+bool Socket::connect(string host, int port, int flags)
+{
+ //enable socks5 proxy
+ if (socks5_proxy)
+ {
+ socks5_proxy->target_host = (char *) host.c_str();
+ socks5_proxy->l_target_host = host.size();
+ socks5_proxy->target_port = port;
+
+ host = socks5_proxy->host;
+ port = socks5_proxy->port;
+ }
+
+ //enable http proxy
+ if (http_proxy)
+ {
+ http_proxy->target_host = (char *) host.c_str();
+ http_proxy->l_target_host = host.size();
+ http_proxy->target_port = port;
+
+ host = http_proxy->proxy_host;
+ port = http_proxy->proxy_port;
+ }
+
+ if (_sock_domain == AF_INET6 || _sock_domain == AF_INET)
+ {
+ if (port == -1)
+ {
+ swWarn("Socket of type AF_INET/AF_INET6 requires port argument");
+ return false;
+ }
+ else if (port == 0 || port >= 65536)
+ {
+ swWarn("Invalid port argument[%d]", port);
+ return false;
+ }
+ }
+
+ if (unlikely(_cid && _cid != coroutine_get_current_cid()))
+ {
+ swWarn( "socket has already been bound to another coroutine.");
+ return false;
+ }
+
+ int retval = 0;
+ _host = host;
+ _port = port;
+
+ for (int i = 0; i < 2; i++)
+ {
+ if (_sock_domain == AF_INET)
+ {
+ socket->info.addr.inet_v4.sin_family = AF_INET;
+ socket->info.addr.inet_v4.sin_port = htons(port);
+
+ if (!inet_pton(AF_INET, _host.c_str(), & socket->info.addr.inet_v4.sin_addr))
+ {
+ _host = resolve(_host);
+ if (_host.size() == 0)
+ {
+ return false;
+ }
+ continue;
+ }
+ else
+ {
+ socket->info.len = sizeof( socket->info.addr.inet_v4);
+ retval = socket_connect(socket->fd, (struct sockaddr *) &socket->info.addr.inet_v4, socket->info.len);
+ break;
+ }
+ }
+ else if (_sock_domain == AF_INET6)
+ {
+ socket->info.addr.inet_v6.sin6_family = AF_INET6;
+ socket->info.addr.inet_v6.sin6_port = htons(port);
+
+ if (!inet_pton(AF_INET6, _host.c_str(), &socket->info.addr.inet_v6.sin6_addr))
+ {
+ _host = resolve(_host);
+ if (_host.size() == 0)
+ {
+ return false;
+ }
+ continue;
+ }
+ else
+ {
+ socket->info.len = sizeof(socket->info.addr.inet_v6);
+ retval = socket_connect(socket->fd, (struct sockaddr *) &socket->info.addr.inet_v6, socket->info.len);
+ break;
+ }
+ }
+ else if (_sock_domain == AF_UNIX)
+ {
+ if (_host.size() >= sizeof(socket->info.addr.un.sun_path))
+ {
+ return false;
+ }
+ socket->info.addr.un.sun_family = AF_UNIX;
+ memcpy(&socket->info.addr.un.sun_path, _host.c_str(), _host.size());
+ retval = socket_connect(socket->fd, (struct sockaddr *) &socket->info.addr.un,
+ (socklen_t) (offsetof(struct sockaddr_un, sun_path) + _host.size()));
+ break;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ if (retval == -1)
+ {
+ if (errno != EINPROGRESS)
+ {
+ _error: errCode = errno;
+ return false;
+ }
+ if (!wait_events(SW_EVENT_WRITE))
+ {
+ goto _error;
+ }
+ yield();
+ //Connection has timed out
+ if (errCode == ETIMEDOUT)
+ {
+ errMsg = strerror(errCode);
+ return false;
+ }
+ socklen_t len = sizeof(errCode);
+ if (getsockopt(socket->fd, SOL_SOCKET, SO_ERROR, &errCode, &len) < 0 || errCode != 0)
+ {
+ errMsg = strerror(errCode);
+ return false;
+ }
+ }
+ socket->active = 1;
+ //socks5 proxy
+ if (socks5_proxy && socks5_handshake() == false)
+ {
+ return false;
+ }
+ //http proxy
+ if (http_proxy && http_proxy_handshake() == false)
+ {
+ return false;
+ }
+ return true;
+}
+
+static void socket_onResolveCompleted(swAio_event *event)
+{
+ Socket *sock = (Socket *) event->object;
+ if (event->error != 0)
+ {
+ sock->errCode = SW_ERROR_DNSLOOKUP_RESOLVE_FAILED;
+ }
+ sock->resume();
+}
+
+static void socket_onTimeout(swTimer *timer, swTimer_node *tnode)
+{
+ Socket *sock = (Socket *) tnode->data;
+ sock->timer = NULL;
+ sock->errCode = ETIMEDOUT;
+ swDebug("socket[%d] timeout", sock->socket->fd);
+ sock->reactor->del(sock->reactor, sock->socket->fd);
+ sock->resume();
+}
+
+static int socket_onRead(swReactor *reactor, swEvent *event)
+{
+ Socket *sock = (Socket *) event->socket->object;
+ reactor->del(reactor, event->fd);
+ sock->resume();
+ return SW_OK;
+}
+
+static int socket_onWrite(swReactor *reactor, swEvent *event)
+{
+ Socket *sock = (Socket *) event->socket->object;
+ reactor->del(reactor, event->fd);
+ sock->resume();
+ return SW_OK;
+}
+
+ssize_t Socket::peek(void *__buf, size_t __n)
+{
+ return swConnection_peek(socket, __buf, __n, 0);
+}
+
+ssize_t Socket::recv(void *__buf, size_t __n)
+{
+ ssize_t retval = swConnection_recv(socket, __buf, __n, 0);
+ if (retval >= 0)
+ {
+ return retval;
+ }
+
+ if (swConnection_error(errno) != SW_WAIT)
+ {
+ errCode = errno;
+ return -1;
+ }
+
+ while (true)
+ {
+ int events = SW_EVENT_READ;
+#ifdef SW_USE_OPENSSL
+ if (socket->ssl && socket->ssl_want_write)
+ {
+ events = SW_EVENT_WRITE;
+ }
+#endif
+ if (!wait_events(events))
+ {
+ return -1;
+ }
+ yield();
+ if (errCode == ETIMEDOUT)
+ {
+ return -1;
+ }
+ retval = swConnection_recv(socket, __buf, __n, 0);
+ if (retval < 0)
+ {
+ if (swConnection_error(errno) == SW_WAIT)
+ {
+ continue;
+ }
+ errCode = errno;
+ }
+ break;
+ }
+ return retval;
+}
+
+ssize_t Socket::recv_all(void *__buf, size_t __n)
+{
+ ssize_t retval, total_bytes = 0;
+ while (true)
+ {
+ retval = recv((char*) __buf + total_bytes, __n - total_bytes);
+ if (retval <= 0)
+ {
+ if (total_bytes == 0)
+ {
+ total_bytes = retval;
+ }
+ break;
+ }
+ total_bytes += retval;
+ if ((size_t) total_bytes == __n)
+ {
+ break;
+ }
+ }
+ return total_bytes;
+}
+
+ssize_t Socket::send_all(const void *__buf, size_t __n)
+{
+ ssize_t retval, total_bytes = 0;
+ while (true)
+ {
+ retval = send((char*) __buf + total_bytes, __n - total_bytes);
+ if (retval <= 0)
+ {
+ if (total_bytes == 0)
+ {
+ total_bytes = retval;
+ }
+ break;
+ }
+ total_bytes += retval;
+ if ((size_t) total_bytes == __n)
+ {
+ break;
+ }
+ }
+ return total_bytes;
+}
+
+ssize_t Socket::send(const void *__buf, size_t __n)
+{
+ ssize_t retval = swConnection_send(socket, (void *) __buf, __n, 0);
+ if (retval >= 0)
+ {
+ return retval;
+ }
+
+ if (swConnection_error(errno) != SW_WAIT)
+ {
+ errCode = errno;
+ return -1;
+ }
+
+ while (true)
+ {
+ int events = SW_EVENT_WRITE;
+#ifdef SW_USE_OPENSSL
+ if (socket->ssl && socket->ssl_want_read)
+ {
+ events = SW_EVENT_READ;
+ }
+#endif
+ if (!wait_events(events))
+ {
+ return -1;
+ }
+ yield();
+ if (errCode == ETIMEDOUT)
+ {
+ return -1;
+ }
+ ssize_t retval = swConnection_send(socket, (void *) __buf, __n, 0);
+ if (retval < 0)
+ {
+ if (swConnection_error(errno) == SW_WAIT)
+ {
+ continue;
+ }
+ errCode = errno;
+ }
+ break;
+ }
+
+ return retval;
+}
+
+void Socket::yield()
+{
+ if (suspending)
+ {
+ swError("socket has already been bound to another coroutine.");
+ }
+ errCode = 0;
+ if (_timeout > 0)
+ {
+ int ms = (int) (_timeout * 1000);
+ if (SwooleG.timer.fd == 0)
+ {
+ swTimer_init(ms);
+ }
+ timer = SwooleG.timer.add(&SwooleG.timer, ms, 0, this, socket_onTimeout);
+ }
+ _cid = coroutine_get_current_cid();
+ if (_cid == -1)
+ {
+ swError("Socket::yield() must be called in the coroutine.");
+ }
+ //suspend
+ suspending = true;
+ coroutine_yield(coroutine_get_by_id(_cid));
+ suspending = false;
+ //clear timer
+ if (timer)
+ {
+ swTimer_del(&SwooleG.timer, timer);
+ timer = nullptr;
+ }
+}
+
+void Socket::resume()
+{
+ coroutine_resume(coroutine_get_by_id(_cid));
+}
+
+bool Socket::bind(std::string address, int port)
+{
+ bind_address = address;
+ bind_port = port;
+
+ struct sockaddr_storage sa_storage = { 0 };
+ struct sockaddr *sock_type = (struct sockaddr*) &sa_storage;
+
+ int retval;
+ switch (_sock_domain)
+ {
+ case AF_UNIX:
+ {
+ struct sockaddr_un *sa = (struct sockaddr_un *) sock_type;
+ sa->sun_family = AF_UNIX;
+
+ if (bind_address.size() >= sizeof(sa->sun_path))
+ {
+ return false;
+ }
+ memcpy(&sa->sun_path, bind_address.c_str(), bind_address.size());
+
+ retval = ::bind(socket->fd, (struct sockaddr *) sa,
+ offsetof(struct sockaddr_un, sun_path) + bind_address.size());
+ break;
+ }
+
+ case AF_INET:
+ {
+ struct sockaddr_in *sa = (struct sockaddr_in *) sock_type;
+ sa->sin_family = AF_INET;
+ sa->sin_port = htons((unsigned short) bind_port);
+ if (!inet_aton(bind_address.c_str(), &sa->sin_addr))
+ {
+ return false;
+ }
+ retval = ::bind(socket->fd, (struct sockaddr *) sa, sizeof(struct sockaddr_in));
+ break;
+ }
+
+ case AF_INET6:
+ {
+ struct sockaddr_in6 *sa = (struct sockaddr_in6 *) sock_type;
+ sa->sin6_family = AF_INET6;
+ sa->sin6_port = htons((unsigned short) bind_port);
+
+ if (!inet_pton(AF_INET6, bind_address.c_str(), &sa->sin6_addr))
+ {
+ return false;
+ }
+ retval = ::bind(socket->fd, (struct sockaddr *) sa, sizeof(struct sockaddr_in6));
+ break;
+ }
+ default:
+ return false;
+ }
+
+ if (retval != 0)
+ {
+ errCode = errno;
+ return false;
+ }
+
+ return true;
+}
+
+bool Socket::listen(int backlog)
+{
+ _backlog = backlog;
+ if (::listen(socket->fd, backlog) != 0)
+ {
+ errCode = errno;
+ return false;
+ }
+ return true;
+}
+
+Socket* Socket::accept()
+{
+ if (!wait_events(SW_EVENT_READ))
+ {
+ return nullptr;
+ }
+ yield();
+ if (errCode == ETIMEDOUT)
+ {
+ return nullptr;
+ }
+ int conn;
+ swSocketAddress client_addr;
+ socklen_t client_addrlen = sizeof(client_addr);
+
+#ifdef HAVE_ACCEPT4
+ conn = ::accept4(socket->fd, (struct sockaddr *) &client_addr, &client_addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC);
+#else
+ conn = ::accept(socket->fd, (struct sockaddr *) &client_addr, &client_addrlen);
+ if (conn >= 0)
+ {
+ swoole_fcntl_set_option(conn, 1, 1);
+ }
+#endif
+ if (conn >= 0)
+ {
+ return new Socket(conn, this);
+ }
+ else
+ {
+ errCode = errno;
+ return nullptr;
+ }
+}
+
+string Socket::resolve(string domain_name)
+{
+ swAio_event ev;
+ bzero(&ev, sizeof(swAio_event));
+ ev.nbytes = SW_IP_MAX_LENGTH;
+ ev.buf = sw_malloc(ev.nbytes);
+ if (!ev.buf)
+ {
+ errCode = errno;
+ return "";
+ }
+
+ memcpy(ev.buf, domain_name.c_str(), domain_name.size());
+ ((char *) ev.buf)[domain_name.size()] = 0;
+ ev.flags = _sock_domain;
+ ev.type = SW_AIO_GETHOSTBYNAME;
+ ev.object = this;
+ ev.callback = socket_onResolveCompleted;
+
+ if (SwooleAIO.init == 0)
+ {
+ swAio_init();
+ }
+
+ if (swAio_dispatch(&ev) < 0)
+ {
+ errCode = SwooleG.error;
+ sw_free(ev.buf);
+ return "";
+ }
+ /**
+ * cannot timeout
+ */
+ double tmp_timeout = _timeout;
+ _timeout = -1;
+ yield();
+ _timeout = tmp_timeout;
+
+ if (errCode == SW_ERROR_DNSLOOKUP_RESOLVE_FAILED)
+ {
+ errMsg = hstrerror(ev.error);
+ return "";
+ }
+ else
+ {
+ string addr((char *) ev.buf);
+ sw_free(ev.buf);
+ return addr;
+ }
+}
+
+bool Socket::shutdown(int __how)
+{
+ if (!socket || socket->closed)
+ {
+ return false;
+ }
+ if (__how == SHUT_RD)
+ {
+ if (shutdown_read || shutdow_rw || ::shutdown(socket->fd, SHUT_RD))
+ {
+ return false;
+ }
+ else
+ {
+ shutdown_read = 1;
+ return true;
+ }
+ }
+ else if (__how == SHUT_WR)
+ {
+ if (shutdown_write || shutdow_rw || ::shutdown(socket->fd, SHUT_RD) < 0)
+ {
+ return false;
+ }
+ else
+ {
+ shutdown_write = 1;
+ return true;
+ }
+ }
+ else if (__how == SHUT_RDWR)
+ {
+ if (shutdow_rw || ::shutdown(socket->fd, SHUT_RDWR) < 0)
+ {
+ return false;
+ }
+ else
+ {
+ shutdown_read = 1;
+ return true;
+ }
+ }
+ else
+ {
+ return false;
+ }
+}
+
+bool Socket::close()
+{
+ if (socket == NULL || socket->closed)
+ {
+ return false;
+ }
+ socket->closed = 1;
+
+ int fd = socket->fd;
+
+ if (_sock_type == SW_SOCK_UNIX_DGRAM)
+ {
+ unlink(socket->info.addr.un.sun_path);
+ }
+
+#ifdef SW_USE_OPENSSL
+ if (open_ssl && ssl_context)
+ {
+ if (socket->ssl)
+ {
+ swSSL_close(socket);
+ }
+ swSSL_free_context(ssl_context);
+ if (ssl_option.cert_file)
+ {
+ sw_free(ssl_option.cert_file);
+ }
+ if (ssl_option.key_file)
+ {
+ sw_free(ssl_option.key_file);
+ }
+ if (ssl_option.passphrase)
+ {
+ sw_free(ssl_option.passphrase);
+ }
+#ifdef SSL_CTRL_SET_TLSEXT_HOSTNAME
+ if (ssl_option.tls_host_name)
+ {
+ sw_free(ssl_option.tls_host_name);
+ }
+#endif
+ if (ssl_option.cafile)
+ {
+ sw_free(ssl_option.cafile);
+ }
+ if (ssl_option.capath)
+ {
+ sw_free(ssl_option.capath);
+ }
+ }
+#endif
+ if (_sock_type == SW_SOCK_UNIX_DGRAM)
+ {
+ unlink(socket->info.addr.un.sun_path);
+ }
+ if (timer)
+ {
+ swTimer_del(&SwooleG.timer, timer);
+ timer = NULL;
+ }
+ socket->active = 0;
+ ::close(fd);
+ return true;
+}
+
+#ifdef SW_USE_OPENSSL
+bool Socket::ssl_handshake()
+{
+ if (socket->ssl)
+ {
+ return false;
+ }
+
+ ssl_context = swSSL_get_context(&ssl_option);
+ if (ssl_context == NULL)
+ {
+ return false;
+ }
+
+ if (ssl_option.verify_peer)
+ {
+ if (swSSL_set_capath(&ssl_option, ssl_context) < 0)
+ {
+ return false;
+ }
+ }
+
+ socket->ssl_send = 1;
+#if defined(SW_USE_HTTP2) && defined(SW_USE_OPENSSL) && OPENSSL_VERSION_NUMBER >= 0x10002000L
+ if (http2)
+ {
+ if (SSL_CTX_set_alpn_protos(ssl_context, (const unsigned char *) "\x02h2", 3) < 0)
+ {
+ return false;
+ }
+ }
+#endif
+
+ if (swSSL_create(socket, ssl_context, SW_SSL_CLIENT) < 0)
+ {
+ return false;
+ }
+#ifdef SSL_CTRL_SET_TLSEXT_HOSTNAME
+ if (ssl_option.tls_host_name)
+ {
+ SSL_set_tlsext_host_name(socket->ssl, ssl_option.tls_host_name);
+ }
+#endif
+
+ while (true)
+ {
+ int retval = swSSL_connect(socket);
+ if (retval < 0)
+ {
+ errCode = SwooleG.error;
+ return false;
+ }
+ if (socket->ssl_state == SW_SSL_STATE_WAIT_STREAM)
+ {
+ int events = socket->ssl_want_write ? SW_EVENT_WRITE : SW_EVENT_READ;
+ if (!wait_events(events))
+ {
+ return false;
+ }
+ yield();
+ if (errCode == ETIMEDOUT)
+ {
+ return false;
+ }
+ }
+ else if (socket->ssl_state == SW_SSL_STATE_READY)
+ {
+ return true;
+ }
+ }
+
+ if (socket->ssl_state == SW_SSL_STATE_READY && ssl_option.verify_peer)
+ {
+ if (ssl_verify(ssl_option.allow_self_signed) < 0)
+ {
+ return false;
+ }
+ }
+ return true;
+}
+
+int Socket::ssl_verify(bool allow_self_signed)
+{
+ if (swSSL_verify(socket, allow_self_signed) < 0)
+ {
+ return SW_ERR;
+ }
+ if (ssl_option.tls_host_name && swSSL_check_host(socket, ssl_option.tls_host_name) < 0)
+ {
+ return SW_ERR;
+ }
+ return SW_OK;
+}
+#endif
+
+bool Socket::sendfile(char *filename, off_t offset, size_t length)
+{
+ int file_fd = open(filename, O_RDONLY);
+ if (file_fd < 0)
+ {
+ swSysError("open(%s) failed.", filename);
+ return false;
+ }
+
+ if (length == 0)
+ {
+ struct stat file_stat;
+ if (::fstat(file_fd, &file_stat) < 0)
+ {
+ swSysError("fstat(%s) failed.", filename);
+ ::close(file_fd);
+ return false;
+ }
+ length = file_stat.st_size;
+ }
+ else
+ {
+ // total length of the file
+ length = offset + length;
+ }
+
+ int n, sendn;
+ while ((size_t) offset < length)
+ {
+ sendn = (length - offset > SW_SENDFILE_CHUNK_SIZE) ? SW_SENDFILE_CHUNK_SIZE : length - offset;
+#ifdef SW_USE_OPENSSL
+ if (socket->ssl)
+ {
+ n = swSSL_sendfile(socket, file_fd, &offset, sendn);
+ }
+ else
+#endif
+ {
+ n = ::swoole_sendfile(socket->fd, file_fd, &offset, sendn);
+ }
+ if (n > 0)
+ {
+ continue;
+ }
+ else if (n == 0)
+ {
+ swWarn("sendfile return zero.");
+ return false;
+ }
+ else if (errno != EAGAIN)
+ {
+ swSysError("sendfile(%d, %s) failed.", socket->fd, filename);
+ _error: errCode = errno;
+ ::close(file_fd);
+ return false;
+ }
+ if (!wait_events(SW_EVENT_WRITE))
+ {
+ goto _error;
+ }
+ yield();
+ if (errCode == ETIMEDOUT)
+ {
+ goto _error;
+ }
+ }
+ ::close(file_fd);
+ return true;
+}
+
+int Socket::sendto(char *address, int port, char *data, int len)
+{
+ if (type == SW_SOCK_UDP)
+ {
+ return swSocket_udp_sendto(socket->fd, address, port, data, len);
+ }
+ else if (type == SW_SOCK_UDP6)
+ {
+ return swSocket_udp_sendto6(socket->fd, address, port, data, len);
+ }
+ else
+ {
+ swWarn("only supports SWOOLE_SOCK_UDP or SWOOLE_SOCK_UDP6.");
+ return -1;
+ }
+}
+
+int Socket::recvfrom(void *__buf, size_t __n, char *address, int *port)
+{
+ socket->info.len = sizeof(socket->info.addr);
+ int retval;
+
+ _recv: retval = ::recvfrom(socket->fd, __buf, __n, 0, (struct sockaddr *) &socket->info.addr, &socket->info.len);
+ if (retval < 0)
+ {
+ if (errno == EINTR)
+ {
+ goto _recv;
+ }
+ else if (swConnection_error(errno) == SW_WAIT)
+ {
+ if (!wait_events(SW_EVENT_READ))
+ {
+ return -1;
+ }
+ yield();
+ if (errCode == ETIMEDOUT)
+ {
+ return -1;
+ }
+ retval = ::recvfrom(socket->fd, __buf, __n, 0, (struct sockaddr *) &socket->info.addr, &socket->info.len);
+ if (retval < 0)
+ {
+ errCode = errno;
+ }
+ else
+ {
+ strcpy(address, swConnection_get_ip(socket));
+ *port = swConnection_get_port(socket);
+ }
+ }
+ else
+ {
+ errCode = errno;
+ }
+ }
+ return retval;
+}
+
+/**
+ * recv packet with protocol
+ */
+ssize_t Socket::recv_packet()
+{
+ get_buffer();
+ ssize_t buf_len = SW_BUFFER_SIZE_STD;
+ ssize_t retval;
+
+ if (open_length_check)
+ {
+ uint32_t header_len;
+
+ _get_header_len: header_len = protocol.package_length_offset + protocol.package_length_size;
+ if (buffer->length > 0)
+ {
+ if (buffer->length < header_len)
+ {
+ goto _recv_header;
+ }
+ else
+ {
+ goto _get_length;
+ }
+ }
+
+ _recv_header: retval = recv(buffer->str + buffer->length, header_len - buffer->length);
+ if (retval <= 0)
+ {
+ return 0;
+ }
+ else if (retval < 0 || retval != header_len)
+ {
+ return 0;
+ }
+ else
+ {
+ buffer->length += retval;
+ }
+
+ _get_length: buf_len = protocol.get_package_length(&protocol, socket, buffer->str, (uint32_t) buffer->length);
+ swDebug("packet_len=%ld, length=%ld", buf_len, buffer->length);
+ //error package
+ if (buf_len < 0)
+ {
+ return 0;
+ }
+ else if (buf_len == 0)
+ {
+ header_len = protocol.real_header_length;
+ goto _recv_header;
+ }
+ //empty package
+ else if (buf_len == header_len)
+ {
+ buffer->length = 0;
+ return header_len;
+ }
+ else if (buf_len > protocol.package_max_length)
+ {
+ swoole_error_log(SW_LOG_WARNING, SW_ERROR_PACKAGE_LENGTH_TOO_LARGE, "packet[length=%d] is too big.", (int )buf_len);
+ return 0;
+ }
+
+ if ((size_t) buf_len == buffer->length)
+ {
+ buffer->length = 0;
+ return buf_len;
+ }
+ else if ((size_t) buf_len < buffer->length)
+ {
+ buffer->length = buffer->length - buf_len;
+ memmove(buffer->str, buffer->str + buf_len, buffer->length);
+ goto _get_header_len;
+ }
+
+ if ((size_t) buf_len >= buffer->size)
+ {
+ if (swString_extend(buffer, buf_len) < 0)
+ {
+ buffer->length = 0;
+ return -1;
+ }
+ }
+
+ retval = recv_all(buffer->str + buffer->length, buf_len - buffer->length);
+ if (retval > 0)
+ {
+ buffer->length += retval;
+ if (buffer->length != (size_t) buf_len)
+ {
+ retval = 0;
+ }
+ else
+ {
+ buffer->length = 0;
+ return buf_len;
+ }
+ }
+ }
+ else if (open_eof_check)
+ {
+ int eof = -1;
+ char *buf;
+
+ if (buffer->length > 0)
+ {
+ goto find_eof;
+ }
+
+ while (1)
+ {
+ buf = buffer->str + buffer->length;
+ buf_len = buffer->size - buffer->length;
+
+ if (buf_len > SW_BUFFER_SIZE_BIG)
+ {
+ buf_len = SW_BUFFER_SIZE_BIG;
+ }
+
+ retval = recv(buf, buf_len);
+ if (retval < 0)
+ {
+ buffer->length = 0;
+ return -1;
+ }
+ else if (retval == 0)
+ {
+ buffer->length = 0;
+ return 0;
+ }
+
+ buffer->length += retval;
+
+ if (buffer->length < protocol.package_eof_len)
+ {
+ continue;
+ }
+
+ find_eof: eof = swoole_strnpos(buffer->str, buffer->length, protocol.package_eof, protocol.package_eof_len);
+ if (eof >= 0)
+ {
+ eof += protocol.package_eof_len;
+ if (buffer->length > (uint32_t) eof)
+ {
+ buffer->length -= eof;
+ memmove(buffer->str, buffer->str + eof, buffer->length);
+ }
+ else
+ {
+ buffer->length = 0;
+ }
+ return eof;
+ }
+ else
+ {
+ if (buffer->length == protocol.package_max_length)
+ {
+ swWarn("no package eof");
+ buffer->length = 0;
+ return -1;
+ }
+ else if (buffer->length == buffer->size)
+ {
+ if (buffer->size < protocol.package_max_length)
+ {
+ size_t new_size = buffer->size * 2;
+ if (new_size > protocol.package_max_length)
+ {
+ new_size = protocol.package_max_length;
+ }
+ if (swString_extend(buffer, new_size) < 0)
+ {
+ buffer->length = 0;
+ return -1;
+ }
+ }
+ }
+ }
+ }
+ buffer->length = 0;
+ }
+ else
+ {
+ return -1;
+ }
+
+ return retval;
+}
+
+swString* Socket::get_buffer()
+{
+ if (unlikely(buffer == nullptr))
+ {
+ buffer = swString_new(SW_BUFFER_SIZE_STD);
+ }
+ return buffer;
+}
+
+Socket::~Socket()
+{
+ if (!socket->closed)
+ {
+ close();
+ }
+ if (socket->out_buffer)
+ {
+ swBuffer_free(socket->out_buffer);
+ socket->out_buffer = NULL;
+ }
+ if (socket->in_buffer)
+ {
+ swBuffer_free(socket->in_buffer);
+ socket->in_buffer = NULL;
+ }
+ if (buffer)
+ {
+ swString_free(buffer);
+ }
+ bzero(socket, sizeof(swConnection));
+ socket->removed = 1;
+}
diff --git a/socket.h b/socket.h
new file mode 100644
index 0000000..7a3d039
--- /dev/null
+++ b/socket.h
@@ -0,0 +1,142 @@
+#pragma once
+
+#include "swoole.h"
+#include "connection.h"
+#include "socks5.h"
+#include <string>
+
+namespace swoole
+{
+class Socket
+{
+public:
+ Socket(enum swSocket_type type);
+ Socket(int _fd, Socket *sock);
+ ~Socket();
+ bool connect(std::string host, int port, int flags = 0);
+ bool shutdown(int how);
+ bool close();
+ ssize_t send(const void *__buf, size_t __n);
+ ssize_t peek(void *__buf, size_t __n);
+ ssize_t recv(void *__buf, size_t __n);
+ ssize_t recv_all(void *__buf, size_t __n);
+ ssize_t send_all(const void *__buf, size_t __n);
+ ssize_t recv_packet();
+ Socket* accept();
+ void resume();
+ void yield();
+ bool bind(std::string address, int port = 0);
+ std::string resolve(std::string host);
+ bool listen(int backlog = 0);
+ bool sendfile(char *filename, off_t offset, size_t length);
+ int sendto(char *address, int port, char *data, int len);
+ int recvfrom(void *__buf, size_t __n, char *address, int *port = nullptr);
+ swString* get_buffer();
+
+ void setTimeout(double timeout)
+ {
+ _timeout = timeout;
+ }
+
+#ifdef SW_USE_OPENSSL
+ bool ssl_handshake();
+ int ssl_verify(bool allow_self_signed);
+#endif
+
+protected:
+ inline void init()
+ {
+ _cid = 0;
+ suspending = false;
+ _timeout = 0;
+ _port = 0;
+ errCode = 0;
+ errMsg = nullptr;
+ timer = nullptr;
+ bind_port = 0;
+ _backlog = 0;
+
+ http2 = 0;
+ shutdow_rw = 0;
+ shutdown_read = 0;
+ shutdown_write = 0;
+ open_length_check = 0;
+ open_eof_check = 0;
+
+ socks5_proxy = nullptr;
+ http_proxy = nullptr;
+
+ buffer = nullptr;
+ protocol = {0};
+
+ protocol.package_length_type = 'N';
+ protocol.package_length_size = 4;
+ protocol.package_body_offset = 0;
+ protocol.package_max_length = SW_BUFFER_INPUT_SIZE;
+
+#ifdef SW_USE_OPENSSL
+ open_ssl = 0;
+ ssl_wait_handshake = 0;
+ ssl_context = NULL;
+ ssl_option = {0};
+#endif
+ }
+
+ inline bool wait_events(int events)
+ {
+ if (reactor->add(reactor, socket->fd, SW_FD_CORO_SOCKET | events) < 0)
+ {
+ errCode = errno;
+ return false;
+ }
+ else
+ {
+ return true;
+ }
+ }
+
+ bool socks5_handshake();
+ bool http_proxy_handshake();
+
+public:
+ swTimer_node *timer;
+ swReactor *reactor;
+ std::string _host;
+ std::string bind_address;
+ int bind_port;
+ int _port;
+ int _cid;
+ bool suspending;
+ swConnection *socket;
+ enum swSocket_type type;
+ int _sock_type;
+ int _sock_domain;
+ double _timeout;
+ int _backlog;
+ int errCode;
+ const char *errMsg;
+ uint32_t http2 :1;
+ uint32_t shutdow_rw :1;
+ uint32_t shutdown_read :1;
+ uint32_t shutdown_write :1;
+ /**
+ * one package: length check
+ */
+ uint32_t open_length_check :1;
+ uint32_t open_eof_check :1;
+
+ swProtocol protocol;
+ swString *buffer;
+
+ struct _swSocks5 *socks5_proxy;
+ struct _http_proxy* http_proxy;
+
+#ifdef SW_USE_OPENSSL
+ uint8_t open_ssl :1;
+ uint8_t ssl_wait_handshake :1;
+ SSL_CTX *ssl_context;
+ swSSL_option ssl_option;
+#endif
+};
+
+};