summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--PHPINFO2
-rw-r--r--REFLECTION4
-rw-r--r--channel.cc151
-rw-r--r--channel.h107
-rw-r--r--php-pecl-swoole4.spec13
-rw-r--r--socket.cc1441
-rw-r--r--socket.h142
7 files changed, 7 insertions, 1853 deletions
diff --git a/PHPINFO b/PHPINFO
index 177993c..7a65e9d 100644
--- a/PHPINFO
+++ b/PHPINFO
@@ -2,7 +2,7 @@
swoole
swoole support => enabled
-Version => 4.1.0
+Version => 4.1.1
Author => Swoole Group[email: team@swoole.com]
coroutine => enabled
trace-log => enabled
diff --git a/REFLECTION b/REFLECTION
index 8075de9..903ecd2 100644
--- a/REFLECTION
+++ b/REFLECTION
@@ -1,4 +1,4 @@
-Extension [ <persistent> extension #148 swoole version 4.1.0 ] {
+Extension [ <persistent> extension #148 swoole version 4.1.1 ] {
- INI {
Entry [ swoole.enable_coroutine <ALL> ]
@@ -69,7 +69,7 @@ Extension [ <persistent> extension #148 swoole version 4.1.0 ] {
Constant [ integer SWOOLE_DTLSv1_CLIENT_METHOD ] { 17 }
Constant [ integer SWOOLE_EVENT_READ ] { 512 }
Constant [ integer SWOOLE_EVENT_WRITE ] { 1024 }
- Constant [ string SWOOLE_VERSION ] { 4.1.0 }
+ Constant [ string SWOOLE_VERSION ] { 4.1.1 }
Constant [ integer SWOOLE_ERROR_MALLOC_FAIL ] { 501 }
Constant [ integer SWOOLE_ERROR_SYSTEM_CALL_FAIL ] { 502 }
Constant [ integer SWOOLE_ERROR_PHP_FATAL_ERROR ] { 503 }
diff --git a/channel.cc b/channel.cc
deleted file mode 100644
index 88423fc..0000000
--- a/channel.cc
+++ /dev/null
@@ -1,151 +0,0 @@
-#include "channel.h"
-
-using namespace swoole;
-
-static void channel_defer_callback(void *data)
-{
- notify_msg_t *msg = (notify_msg_t*) data;
- coroutine_t *co = msg->chan->pop_coroutine(msg->type);
- coroutine_resume(co);
- delete msg;
-}
-
-static void channel_pop_timeout(swTimer *timer, swTimer_node *tnode)
-{
- timeout_msg_t *msg = (timeout_msg_t *) tnode->data;
- msg->error = true;
- msg->timer = nullptr;
- msg->chan->remove(msg->co);
- coroutine_resume(msg->co);
-}
-
-Channel::Channel(size_t _capacity)
-{
- capacity = _capacity;
- closed = false;
- notify_producer_count = 0;
- notify_consumer_count = 0;
-}
-
-void Channel::yield(enum channel_op type)
-{
- int _cid = coroutine_get_current_cid();
- if (_cid == -1)
- {
- swError("Socket::yield() must be called in the coroutine.");
- }
- coroutine_t *co = coroutine_get_by_id(_cid);
- if (type == PRODUCER)
- {
- producer_queue.push_back(co);
- swDebug("producer[%d]", coroutine_get_cid(co));
- }
- else
- {
- consumer_queue.push_back(co);
- swDebug("consumer[%d]", coroutine_get_cid(co));
- }
- coroutine_yield(co);
-}
-
-void Channel::notify(enum channel_op type)
-{
- notify_msg_t *msg = new notify_msg_t;
- msg->chan = this;
- msg->type = type;
- if (type == PRODUCER)
- {
- notify_producer_count++;
- }
- else
- {
- notify_consumer_count++;
- }
- SwooleG.main_reactor->defer(SwooleG.main_reactor, channel_defer_callback, msg);
-}
-
-void* Channel::pop(double timeout)
-{
- timeout_msg_t msg;
- msg.error = false;
- if (timeout > 0)
- {
- int msec = (int) (timeout * 1000);
- if (SwooleG.timer.fd == 0)
- {
- swTimer_init (msec);
- }
- msg.chan = this;
- msg.co = coroutine_get_by_id(coroutine_get_current_cid());
- msg.timer = SwooleG.timer.add(&SwooleG.timer, msec, 0, &msg, channel_pop_timeout);
- }
- else
- {
- msg.timer = NULL;
- }
- if (is_empty() || consumer_queue.size() > 0)
- {
- yield(CONSUMER);
- }
- if (msg.error)
- {
- return nullptr;
- }
- if (msg.timer)
- {
- swTimer_del(&SwooleG.timer, msg.timer);
- }
- /**
- * pop data
- */
- void *data = data_queue.front();
- data_queue.pop();
- /**
- * notify producer
- */
- if (producer_queue.size() > 0 && notify_producer_count < producer_queue.size())
- {
- notify(PRODUCER);
- }
- return data;
-}
-
-bool Channel::push(void *data)
-{
- if (is_full() || producer_queue.size() > 0)
- {
- yield(PRODUCER);
- }
- /**
- * push data
- */
- data_queue.push(data);
- swDebug("push data, count=%ld", length());
- /**
- * notify consumer
- */
- if (consumer_queue.size() > 0 && notify_consumer_count < consumer_queue.size())
- {
- notify(CONSUMER);
- }
- return true;
-}
-
-bool Channel::close()
-{
- if (closed)
- {
- return false;
- }
- swDebug("closed");
- closed = true;
- while (producer_queue.size() > 0 && notify_producer_count < producer_queue.size())
- {
- notify(PRODUCER);
- }
- while (consumer_queue.size() > 0 && notify_consumer_count < producer_queue.size())
- {
- notify(CONSUMER);
- }
- return true;
-}
diff --git a/channel.h b/channel.h
deleted file mode 100644
index ab4fb5f..0000000
--- a/channel.h
+++ /dev/null
@@ -1,107 +0,0 @@
-#pragma once
-
-#include "swoole.h"
-#include "context.h"
-#include "coroutine.h"
-#include <string>
-#include <iostream>
-#include <list>
-#include <queue>
-#include <sys/stat.h>
-
-namespace swoole {
-
-enum channel_op
-{
- PRODUCER = 1,
- CONSUMER = 2,
-};
-
-class Channel;
-
-struct notify_msg_t
-{
- Channel *chan;
- enum channel_op type;
-};
-
-struct timeout_msg_t
-{
- Channel *chan;
- coroutine_t *co;
- bool error;
- swTimer_node *timer;
-};
-
-class Channel
-{
-private:
- std::list<coroutine_t *> producer_queue;
- std::list<coroutine_t *> consumer_queue;
- std::queue<void *> data_queue;
- size_t capacity;
- uint32_t notify_producer_count;
- uint32_t notify_consumer_count;
-
-public:
- bool closed;
- inline bool is_empty()
- {
- return data_queue.size() == 0;
- }
-
- inline bool is_full()
- {
- return data_queue.size() == capacity;
- }
-
- inline size_t length()
- {
- return data_queue.size();
- }
-
- inline size_t consumer_num()
- {
- return consumer_queue.size();
- }
-
- inline size_t producer_num()
- {
- return producer_queue.size();
- }
-
- inline void remove(coroutine_t *co)
- {
- consumer_queue.remove(co);
- }
-
- inline coroutine_t* pop_coroutine(enum channel_op type)
- {
- coroutine_t* co;
- if (type == PRODUCER)
- {
- co = producer_queue.front();
- producer_queue.pop_front();
- notify_producer_count--;
- swDebug("resume producer[%d]", coroutine_get_cid(co));
- }
- else
- {
- co = consumer_queue.front();
- consumer_queue.pop_front();
- notify_consumer_count--;
- swDebug("resume consumer[%d]", coroutine_get_cid(co));
- }
- return co;
- }
-
- Channel(size_t _capacity);
- void yield(enum channel_op type);
- void notify(enum channel_op type);
- void* pop(double timeout = 0);
- bool push(void *data);
- bool close();
-};
-
-};
-
diff --git a/php-pecl-swoole4.spec b/php-pecl-swoole4.spec
index 2ceed15..bb14709 100644
--- a/php-pecl-swoole4.spec
+++ b/php-pecl-swoole4.spec
@@ -36,17 +36,12 @@
Summary: PHP's asynchronous concurrent distributed networking framework
Name: %{?sub_prefix}php-pecl-%{pecl_name}4
-Version: 4.1.0
+Version: 4.1.1
Release: 1%{?dist}%{!?scl:%{!?nophptag:%(%{__php} -r 'echo ".".PHP_MAJOR_VERSION.".".PHP_MINOR_VERSION;')}}
License: BSD
URL: http://pecl.php.net/package/%{pecl_name}
Source0: http://pecl.php.net/get/%{pecl_name}-%{version}.tgz
-Source1: https://raw.githubusercontent.com/swoole/swoole-src/master/include/socket.h
-Source2: https://raw.githubusercontent.com/swoole/swoole-src/master/include/channel.h
-Source3: https://raw.githubusercontent.com/swoole/swoole-src/master/src/coroutine/socket.cc
-Source4: https://raw.githubusercontent.com/swoole/swoole-src/master/src/coroutine/channel.cc
-
%if 0%{?rhel} == 6
BuildRequires: devtoolset-6-toolchain
%else
@@ -170,9 +165,6 @@ sed -e '/examples/s/role="src"/role="doc"/' \
cd NTS
-cp %{SOURCE1} %{SOURCE2} include/
-cp %{SOURCE3} %{SOURCE4} src/coroutine/
-
# Sanity check, really often broken
extver=$(sed -n '/#define PHP_SWOOLE_VERSION/{s/.* "//;s/".*$//;p}' php_swoole.h)
if test "x${extver}" != "x%{version}%{?prever:-%{prever}}"; then
@@ -344,6 +336,9 @@ cd ../ZTS
%changelog
+* Fri Aug 31 2018 Remi Collet <remi@remirepo.net> - 4.1.1-1
+- update to 4.1.1 (no change)
+
* Fri Aug 31 2018 Remi Collet <remi@remirepo.net> - 4.1.0-1
- update to 4.1.0
- add dependency on brotli library (Fedora)
diff --git a/socket.cc b/socket.cc
deleted file mode 100644
index 7edb58f..0000000
--- a/socket.cc
+++ /dev/null
@@ -1,1441 +0,0 @@
-#include "socket.h"
-#include "context.h"
-#include "async.h"
-#include "buffer.h"
-
-#include <string>
-#include <iostream>
-#include <sys/stat.h>
-
-using namespace swoole;
-using namespace std;
-
-static int socket_onRead(swReactor *reactor, swEvent *event);
-static int socket_onWrite(swReactor *reactor, swEvent *event);
-static void socket_onTimeout(swTimer *timer, swTimer_node *tnode);
-static void socket_onResolveCompleted(swAio_event *event);
-
-bool Socket::socks5_handshake()
-{
- swSocks5 *ctx = socks5_proxy;
- char *buf = ctx->buf;
- int n;
-
- /**
- * handshake
- */
- swSocks5_pack(buf, socks5_proxy->username == NULL ? 0x00 : 0x02);
- socks5_proxy->state = SW_SOCKS5_STATE_HANDSHAKE;
- if (send(buf, 3) <= 0)
- {
- return false;
- }
- n = recv(buf, sizeof(ctx->buf));
- if (n <= 0)
- {
- return false;
- }
- uchar version = buf[0];
- uchar method = buf[1];
- if (version != SW_SOCKS5_VERSION_CODE)
- {
- swoole_error_log(SW_LOG_NOTICE, SW_ERROR_SOCKS5_UNSUPPORT_VERSION, "SOCKS version is not supported.");
- return SW_ERR;
- }
- if (method != ctx->method)
- {
- swoole_error_log(SW_LOG_NOTICE, SW_ERROR_SOCKS5_UNSUPPORT_METHOD, "SOCKS authentication method not supported.");
- return SW_ERR;
- }
- //authenticate request
- if (method == SW_SOCKS5_METHOD_AUTH)
- {
- buf[0] = 0x01;
- buf[1] = ctx->l_username;
-
- buf += 2;
- memcpy(buf, ctx->username, ctx->l_username);
- buf += ctx->l_username;
- buf[0] = ctx->l_password;
- memcpy(buf + 1, ctx->password, ctx->l_password);
-
- ctx->state = SW_SOCKS5_STATE_AUTH;
-
- if (send(ctx->buf, ctx->l_username + ctx->l_password + 3) < 0)
- {
- return false;
- }
-
- n = recv(buf, sizeof(ctx->buf));
- if (n <= 0)
- {
- return false;
- }
-
- uchar version = buf[0];
- uchar status = buf[1];
- if (version != 0x01)
- {
- swoole_error_log(SW_LOG_NOTICE, SW_ERROR_SOCKS5_UNSUPPORT_VERSION, "SOCKS version is not supported.");
- return false;
- }
- if (status != 0)
- {
- swoole_error_log(SW_LOG_NOTICE, SW_ERROR_SOCKS5_AUTH_FAILED,
- "SOCKS username/password authentication failed.");
- return false;
- }
- goto send_connect_request;
- }
- //send connect request
- else
- {
- send_connect_request: buf[0] = SW_SOCKS5_VERSION_CODE;
- buf[1] = 0x01;
- buf[2] = 0x00;
-
- ctx->state = SW_SOCKS5_STATE_CONNECT;
-
- if (ctx->dns_tunnel)
- {
- buf[3] = 0x03;
- buf[4] = ctx->l_target_host;
- buf += 5;
- memcpy(buf, ctx->target_host, ctx->l_target_host);
- buf += ctx->l_target_host;
- *(uint16_t *) buf = htons(ctx->target_port);
-
- if (send(ctx->buf, ctx->l_target_host + 7) < 0)
- {
- return false;
- }
- }
- else
- {
- buf[3] = 0x01;
- buf += 4;
- *(uint32_t *) buf = htons(ctx->l_target_host);
- buf += 4;
- *(uint16_t *) buf = htons(ctx->target_port);
-
- if (send(ctx->buf, ctx->l_target_host + 7) < 0)
- {
- return false;
- }
- }
-
- /**
- * response
- */
- n = recv(buf, sizeof(ctx->buf));
- if (n <= 0)
- {
- return false;
- }
-
- uchar version = buf[0];
- if (version != SW_SOCKS5_VERSION_CODE)
- {
- swoole_error_log(SW_LOG_NOTICE, SW_ERROR_SOCKS5_UNSUPPORT_VERSION, "SOCKS version is not supported.");
- return false;
- }
- uchar result = buf[1];
-#if 0
- uchar reg = buf[2];
- uchar type = buf[3];
- uint32_t ip = *(uint32_t *) (buf + 4);
- uint16_t port = *(uint16_t *) (buf + 8);
-#endif
- if (result == 0)
- {
- ctx->state = SW_SOCKS5_STATE_READY;
- }
- else
- {
- swoole_error_log(SW_LOG_NOTICE, SW_ERROR_SOCKS5_SERVER_ERROR, "Socks5 server error, reason :%s.",
- swSocks5_strerror(result));
- }
- return result;
- }
-}
-
-bool Socket::http_proxy_handshake()
-{
-#ifdef SW_USE_OPENSSL
- if (socket->ssl)
- {
- if (ssl_handshake() == false)
- {
- return false;
- }
- }
- else
- {
- return true;
- }
-#else
- return true;
-#endif
-
- //CONNECT
- int n = snprintf(http_proxy->buf, sizeof(http_proxy->buf), "CONNECT %*s:%d HTTP/1.1\r\n\r\n",
- http_proxy->l_target_host, http_proxy->target_host, http_proxy->target_port);
- if (send(http_proxy->buf, n) <= 0)
- {
- return false;
- }
-
- n = recv(http_proxy->buf, sizeof(http_proxy->buf));
- if (n <= 0)
- {
- return false;
- }
- char *buf = http_proxy->buf;
- int len = n;
- int state = 0;
- char *p = buf;
- for (p = buf; p < buf + len; p++)
- {
- if (state == 0)
- {
- if (strncasecmp(p, "HTTP/1.1", 8) == 0 || strncasecmp(p, "HTTP/1.0", 8) == 0)
- {
- state = 1;
- p += 8;
- }
- else
- {
- break;
- }
- }
- else if (state == 1)
- {
- if (isspace(*p))
- {
- continue;
- }
- else
- {
- if (strncasecmp(p, "200", 3) == 0)
- {
- state = 2;
- p += 3;
- }
- else
- {
- break;
- }
- }
- }
- else if (state == 2)
- {
- if (isspace(*p))
- {
- continue;
- }
- else
- {
- if (strncasecmp(p, "Connection established", sizeof("Connection established") - 1) == 0)
- {
- return true;
- }
- else
- {
- break;
- }
- }
- }
- }
- return false;
-}
-
-static inline int socket_connect(int fd, struct sockaddr *addr, socklen_t len)
-{
- int retval;
- while (1)
- {
- retval = ::connect(fd, addr, len);
- if (retval < 0)
- {
- if (errno == EINTR)
- {
- continue;
- }
- }
- break;
- }
- return retval;
-}
-
-Socket::Socket(enum swSocket_type _type)
-{
- type = _type;
- switch (type)
- {
- case SW_SOCK_TCP6:
- _sock_domain = AF_INET6;
- _sock_type = SOCK_STREAM;
- break;
- case SW_SOCK_UNIX_STREAM:
- _sock_domain = AF_UNIX;
- _sock_type = SOCK_STREAM;
- break;
- case SW_SOCK_UDP:
- _sock_domain = AF_INET;
- _sock_type = SOCK_DGRAM;
- break;
- case SW_SOCK_UDP6:
- _sock_domain = AF_INET6;
- _sock_type = SOCK_DGRAM;
- break;
- case SW_SOCK_UNIX_DGRAM:
- _sock_domain = AF_UNIX;
- _sock_type = SOCK_DGRAM;
- break;
- case SW_SOCK_TCP:
- default:
- _sock_domain = AF_INET;
- _sock_type = SOCK_STREAM;
- break;
- }
-
-#ifdef SOCK_CLOEXEC
- int sockfd = ::socket(_sock_domain, _sock_type | SOCK_CLOEXEC, 0);
-#else
- int sockfd = ::socket(_sock_domain, _sock_type, 0);
-#endif
- if (sockfd < 0)
- {
- swWarn("socket() failed. Error: %s[%d]", strerror(errno), errno);
- return;
- }
-
- if (swIsMaster() && SwooleTG.type == SW_THREAD_REACTOR)
- {
- reactor = SwooleTG.reactor;
- }
- else
- {
- reactor = SwooleG.main_reactor;
- }
- socket = swReactor_get(reactor, sockfd);
-
- bzero(socket, sizeof(swConnection));
- socket->fd = sockfd;
- socket->object = this;
- socket->socket_type = type;
-
- swSetNonBlock(socket->fd);
- if (!swReactor_handle_isset(reactor, SW_FD_CORO_SOCKET))
- {
- reactor->setHandle(reactor, SW_FD_CORO_SOCKET | SW_EVENT_READ, socket_onRead);
- reactor->setHandle(reactor, SW_FD_CORO_SOCKET | SW_EVENT_WRITE, socket_onWrite);
- reactor->setHandle(reactor, SW_FD_CORO_SOCKET | SW_EVENT_ERROR, socket_onRead);
- }
- init();
-}
-
-Socket::Socket(int _fd, Socket *sock)
-{
- reactor = sock->reactor;
-
- socket = swReactor_get(reactor, _fd);
- bzero(socket, sizeof(swConnection));
- socket->fd = _fd;
- socket->object = this;
- socket->socket_type = sock->type;
-
- _sock_domain = sock->_sock_domain;
- _sock_type = sock->_sock_type;
- init();
-}
-
-bool Socket::connect(string host, int port, int flags)
-{
- //enable socks5 proxy
- if (socks5_proxy)
- {
- socks5_proxy->target_host = (char *) host.c_str();
- socks5_proxy->l_target_host = host.size();
- socks5_proxy->target_port = port;
-
- host = socks5_proxy->host;
- port = socks5_proxy->port;
- }
-
- //enable http proxy
- if (http_proxy)
- {
- http_proxy->target_host = (char *) host.c_str();
- http_proxy->l_target_host = host.size();
- http_proxy->target_port = port;
-
- host = http_proxy->proxy_host;
- port = http_proxy->proxy_port;
- }
-
- if (_sock_domain == AF_INET6 || _sock_domain == AF_INET)
- {
- if (port == -1)
- {
- swWarn("Socket of type AF_INET/AF_INET6 requires port argument");
- return false;
- }
- else if (port == 0 || port >= 65536)
- {
- swWarn("Invalid port argument[%d]", port);
- return false;
- }
- }
-
- if (unlikely(_cid && _cid != coroutine_get_current_cid()))
- {
- swWarn( "socket has already been bound to another coroutine.");
- return false;
- }
-
- int retval = 0;
- _host = host;
- _port = port;
-
- for (int i = 0; i < 2; i++)
- {
- if (_sock_domain == AF_INET)
- {
- socket->info.addr.inet_v4.sin_family = AF_INET;
- socket->info.addr.inet_v4.sin_port = htons(port);
-
- if (!inet_pton(AF_INET, _host.c_str(), & socket->info.addr.inet_v4.sin_addr))
- {
- _host = resolve(_host);
- if (_host.size() == 0)
- {
- return false;
- }
- continue;
- }
- else
- {
- socket->info.len = sizeof( socket->info.addr.inet_v4);
- retval = socket_connect(socket->fd, (struct sockaddr *) &socket->info.addr.inet_v4, socket->info.len);
- break;
- }
- }
- else if (_sock_domain == AF_INET6)
- {
- socket->info.addr.inet_v6.sin6_family = AF_INET6;
- socket->info.addr.inet_v6.sin6_port = htons(port);
-
- if (!inet_pton(AF_INET6, _host.c_str(), &socket->info.addr.inet_v6.sin6_addr))
- {
- _host = resolve(_host);
- if (_host.size() == 0)
- {
- return false;
- }
- continue;
- }
- else
- {
- socket->info.len = sizeof(socket->info.addr.inet_v6);
- retval = socket_connect(socket->fd, (struct sockaddr *) &socket->info.addr.inet_v6, socket->info.len);
- break;
- }
- }
- else if (_sock_domain == AF_UNIX)
- {
- if (_host.size() >= sizeof(socket->info.addr.un.sun_path))
- {
- return false;
- }
- socket->info.addr.un.sun_family = AF_UNIX;
- memcpy(&socket->info.addr.un.sun_path, _host.c_str(), _host.size());
- retval = socket_connect(socket->fd, (struct sockaddr *) &socket->info.addr.un,
- (socklen_t) (offsetof(struct sockaddr_un, sun_path) + _host.size()));
- break;
- }
- else
- {
- return false;
- }
- }
-
- if (retval == -1)
- {
- if (errno != EINPROGRESS)
- {
- _error: errCode = errno;
- return false;
- }
- if (!wait_events(SW_EVENT_WRITE))
- {
- goto _error;
- }
- yield();
- //Connection has timed out
- if (errCode == ETIMEDOUT)
- {
- errMsg = strerror(errCode);
- return false;
- }
- socklen_t len = sizeof(errCode);
- if (getsockopt(socket->fd, SOL_SOCKET, SO_ERROR, &errCode, &len) < 0 || errCode != 0)
- {
- errMsg = strerror(errCode);
- return false;
- }
- }
- socket->active = 1;
- //socks5 proxy
- if (socks5_proxy && socks5_handshake() == false)
- {
- return false;
- }
- //http proxy
- if (http_proxy && http_proxy_handshake() == false)
- {
- return false;
- }
- return true;
-}
-
-static void socket_onResolveCompleted(swAio_event *event)
-{
- Socket *sock = (Socket *) event->object;
- if (event->error != 0)
- {
- sock->errCode = SW_ERROR_DNSLOOKUP_RESOLVE_FAILED;
- }
- sock->resume();
-}
-
-static void socket_onTimeout(swTimer *timer, swTimer_node *tnode)
-{
- Socket *sock = (Socket *) tnode->data;
- sock->timer = NULL;
- sock->errCode = ETIMEDOUT;
- swDebug("socket[%d] timeout", sock->socket->fd);
- sock->reactor->del(sock->reactor, sock->socket->fd);
- sock->resume();
-}
-
-static int socket_onRead(swReactor *reactor, swEvent *event)
-{
- Socket *sock = (Socket *) event->socket->object;
- reactor->del(reactor, event->fd);
- sock->resume();
- return SW_OK;
-}
-
-static int socket_onWrite(swReactor *reactor, swEvent *event)
-{
- Socket *sock = (Socket *) event->socket->object;
- reactor->del(reactor, event->fd);
- sock->resume();
- return SW_OK;
-}
-
-ssize_t Socket::peek(void *__buf, size_t __n)
-{
- return swConnection_peek(socket, __buf, __n, 0);
-}
-
-ssize_t Socket::recv(void *__buf, size_t __n)
-{
- ssize_t retval = swConnection_recv(socket, __buf, __n, 0);
- if (retval >= 0)
- {
- return retval;
- }
-
- if (swConnection_error(errno) != SW_WAIT)
- {
- errCode = errno;
- return -1;
- }
-
- while (true)
- {
- int events = SW_EVENT_READ;
-#ifdef SW_USE_OPENSSL
- if (socket->ssl && socket->ssl_want_write)
- {
- events = SW_EVENT_WRITE;
- }
-#endif
- if (!wait_events(events))
- {
- return -1;
- }
- yield();
- if (errCode == ETIMEDOUT)
- {
- return -1;
- }
- retval = swConnection_recv(socket, __buf, __n, 0);
- if (retval < 0)
- {
- if (swConnection_error(errno) == SW_WAIT)
- {
- continue;
- }
- errCode = errno;
- }
- break;
- }
- return retval;
-}
-
-ssize_t Socket::recv_all(void *__buf, size_t __n)
-{
- ssize_t retval, total_bytes = 0;
- while (true)
- {
- retval = recv((char*) __buf + total_bytes, __n - total_bytes);
- if (retval <= 0)
- {
- if (total_bytes == 0)
- {
- total_bytes = retval;
- }
- break;
- }
- total_bytes += retval;
- if ((size_t) total_bytes == __n)
- {
- break;
- }
- }
- return total_bytes;
-}
-
-ssize_t Socket::send_all(const void *__buf, size_t __n)
-{
- ssize_t retval, total_bytes = 0;
- while (true)
- {
- retval = send((char*) __buf + total_bytes, __n - total_bytes);
- if (retval <= 0)
- {
- if (total_bytes == 0)
- {
- total_bytes = retval;
- }
- break;
- }
- total_bytes += retval;
- if ((size_t) total_bytes == __n)
- {
- break;
- }
- }
- return total_bytes;
-}
-
-ssize_t Socket::send(const void *__buf, size_t __n)
-{
- ssize_t retval = swConnection_send(socket, (void *) __buf, __n, 0);
- if (retval >= 0)
- {
- return retval;
- }
-
- if (swConnection_error(errno) != SW_WAIT)
- {
- errCode = errno;
- return -1;
- }
-
- while (true)
- {
- int events = SW_EVENT_WRITE;
-#ifdef SW_USE_OPENSSL
- if (socket->ssl && socket->ssl_want_read)
- {
- events = SW_EVENT_READ;
- }
-#endif
- if (!wait_events(events))
- {
- return -1;
- }
- yield();
- if (errCode == ETIMEDOUT)
- {
- return -1;
- }
- ssize_t retval = swConnection_send(socket, (void *) __buf, __n, 0);
- if (retval < 0)
- {
- if (swConnection_error(errno) == SW_WAIT)
- {
- continue;
- }
- errCode = errno;
- }
- break;
- }
-
- return retval;
-}
-
-void Socket::yield()
-{
- if (suspending)
- {
- swError("socket has already been bound to another coroutine.");
- }
- errCode = 0;
- if (_timeout > 0)
- {
- int ms = (int) (_timeout * 1000);
- if (SwooleG.timer.fd == 0)
- {
- swTimer_init(ms);
- }
- timer = SwooleG.timer.add(&SwooleG.timer, ms, 0, this, socket_onTimeout);
- }
- _cid = coroutine_get_current_cid();
- if (_cid == -1)
- {
- swError("Socket::yield() must be called in the coroutine.");
- }
- //suspend
- suspending = true;
- coroutine_yield(coroutine_get_by_id(_cid));
- suspending = false;
- //clear timer
- if (timer)
- {
- swTimer_del(&SwooleG.timer, timer);
- timer = nullptr;
- }
-}
-
-void Socket::resume()
-{
- coroutine_resume(coroutine_get_by_id(_cid));
-}
-
-bool Socket::bind(std::string address, int port)
-{
- bind_address = address;
- bind_port = port;
-
- struct sockaddr_storage sa_storage = { 0 };
- struct sockaddr *sock_type = (struct sockaddr*) &sa_storage;
-
- int retval;
- switch (_sock_domain)
- {
- case AF_UNIX:
- {
- struct sockaddr_un *sa = (struct sockaddr_un *) sock_type;
- sa->sun_family = AF_UNIX;
-
- if (bind_address.size() >= sizeof(sa->sun_path))
- {
- return false;
- }
- memcpy(&sa->sun_path, bind_address.c_str(), bind_address.size());
-
- retval = ::bind(socket->fd, (struct sockaddr *) sa,
- offsetof(struct sockaddr_un, sun_path) + bind_address.size());
- break;
- }
-
- case AF_INET:
- {
- struct sockaddr_in *sa = (struct sockaddr_in *) sock_type;
- sa->sin_family = AF_INET;
- sa->sin_port = htons((unsigned short) bind_port);
- if (!inet_aton(bind_address.c_str(), &sa->sin_addr))
- {
- return false;
- }
- retval = ::bind(socket->fd, (struct sockaddr *) sa, sizeof(struct sockaddr_in));
- break;
- }
-
- case AF_INET6:
- {
- struct sockaddr_in6 *sa = (struct sockaddr_in6 *) sock_type;
- sa->sin6_family = AF_INET6;
- sa->sin6_port = htons((unsigned short) bind_port);
-
- if (!inet_pton(AF_INET6, bind_address.c_str(), &sa->sin6_addr))
- {
- return false;
- }
- retval = ::bind(socket->fd, (struct sockaddr *) sa, sizeof(struct sockaddr_in6));
- break;
- }
- default:
- return false;
- }
-
- if (retval != 0)
- {
- errCode = errno;
- return false;
- }
-
- return true;
-}
-
-bool Socket::listen(int backlog)
-{
- _backlog = backlog;
- if (::listen(socket->fd, backlog) != 0)
- {
- errCode = errno;
- return false;
- }
- return true;
-}
-
-Socket* Socket::accept()
-{
- if (!wait_events(SW_EVENT_READ))
- {
- return nullptr;
- }
- yield();
- if (errCode == ETIMEDOUT)
- {
- return nullptr;
- }
- int conn;
- swSocketAddress client_addr;
- socklen_t client_addrlen = sizeof(client_addr);
-
-#ifdef HAVE_ACCEPT4
- conn = ::accept4(socket->fd, (struct sockaddr *) &client_addr, &client_addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC);
-#else
- conn = ::accept(socket->fd, (struct sockaddr *) &client_addr, &client_addrlen);
- if (conn >= 0)
- {
- swoole_fcntl_set_option(conn, 1, 1);
- }
-#endif
- if (conn >= 0)
- {
- return new Socket(conn, this);
- }
- else
- {
- errCode = errno;
- return nullptr;
- }
-}
-
-string Socket::resolve(string domain_name)
-{
- swAio_event ev;
- bzero(&ev, sizeof(swAio_event));
- ev.nbytes = SW_IP_MAX_LENGTH;
- ev.buf = sw_malloc(ev.nbytes);
- if (!ev.buf)
- {
- errCode = errno;
- return "";
- }
-
- memcpy(ev.buf, domain_name.c_str(), domain_name.size());
- ((char *) ev.buf)[domain_name.size()] = 0;
- ev.flags = _sock_domain;
- ev.type = SW_AIO_GETHOSTBYNAME;
- ev.object = this;
- ev.callback = socket_onResolveCompleted;
-
- if (SwooleAIO.init == 0)
- {
- swAio_init();
- }
-
- if (swAio_dispatch(&ev) < 0)
- {
- errCode = SwooleG.error;
- sw_free(ev.buf);
- return "";
- }
- /**
- * cannot timeout
- */
- double tmp_timeout = _timeout;
- _timeout = -1;
- yield();
- _timeout = tmp_timeout;
-
- if (errCode == SW_ERROR_DNSLOOKUP_RESOLVE_FAILED)
- {
- errMsg = hstrerror(ev.error);
- return "";
- }
- else
- {
- string addr((char *) ev.buf);
- sw_free(ev.buf);
- return addr;
- }
-}
-
-bool Socket::shutdown(int __how)
-{
- if (!socket || socket->closed)
- {
- return false;
- }
- if (__how == SHUT_RD)
- {
- if (shutdown_read || shutdow_rw || ::shutdown(socket->fd, SHUT_RD))
- {
- return false;
- }
- else
- {
- shutdown_read = 1;
- return true;
- }
- }
- else if (__how == SHUT_WR)
- {
- if (shutdown_write || shutdow_rw || ::shutdown(socket->fd, SHUT_RD) < 0)
- {
- return false;
- }
- else
- {
- shutdown_write = 1;
- return true;
- }
- }
- else if (__how == SHUT_RDWR)
- {
- if (shutdow_rw || ::shutdown(socket->fd, SHUT_RDWR) < 0)
- {
- return false;
- }
- else
- {
- shutdown_read = 1;
- return true;
- }
- }
- else
- {
- return false;
- }
-}
-
-bool Socket::close()
-{
- if (socket == NULL || socket->closed)
- {
- return false;
- }
- socket->closed = 1;
-
- int fd = socket->fd;
-
- if (_sock_type == SW_SOCK_UNIX_DGRAM)
- {
- unlink(socket->info.addr.un.sun_path);
- }
-
-#ifdef SW_USE_OPENSSL
- if (open_ssl && ssl_context)
- {
- if (socket->ssl)
- {
- swSSL_close(socket);
- }
- swSSL_free_context(ssl_context);
- if (ssl_option.cert_file)
- {
- sw_free(ssl_option.cert_file);
- }
- if (ssl_option.key_file)
- {
- sw_free(ssl_option.key_file);
- }
- if (ssl_option.passphrase)
- {
- sw_free(ssl_option.passphrase);
- }
-#ifdef SSL_CTRL_SET_TLSEXT_HOSTNAME
- if (ssl_option.tls_host_name)
- {
- sw_free(ssl_option.tls_host_name);
- }
-#endif
- if (ssl_option.cafile)
- {
- sw_free(ssl_option.cafile);
- }
- if (ssl_option.capath)
- {
- sw_free(ssl_option.capath);
- }
- }
-#endif
- if (_sock_type == SW_SOCK_UNIX_DGRAM)
- {
- unlink(socket->info.addr.un.sun_path);
- }
- if (timer)
- {
- swTimer_del(&SwooleG.timer, timer);
- timer = NULL;
- }
- socket->active = 0;
- ::close(fd);
- return true;
-}
-
-#ifdef SW_USE_OPENSSL
-bool Socket::ssl_handshake()
-{
- if (socket->ssl)
- {
- return false;
- }
-
- ssl_context = swSSL_get_context(&ssl_option);
- if (ssl_context == NULL)
- {
- return false;
- }
-
- if (ssl_option.verify_peer)
- {
- if (swSSL_set_capath(&ssl_option, ssl_context) < 0)
- {
- return false;
- }
- }
-
- socket->ssl_send = 1;
-#if defined(SW_USE_HTTP2) && defined(SW_USE_OPENSSL) && OPENSSL_VERSION_NUMBER >= 0x10002000L
- if (http2)
- {
- if (SSL_CTX_set_alpn_protos(ssl_context, (const unsigned char *) "\x02h2", 3) < 0)
- {
- return false;
- }
- }
-#endif
-
- if (swSSL_create(socket, ssl_context, SW_SSL_CLIENT) < 0)
- {
- return false;
- }
-#ifdef SSL_CTRL_SET_TLSEXT_HOSTNAME
- if (ssl_option.tls_host_name)
- {
- SSL_set_tlsext_host_name(socket->ssl, ssl_option.tls_host_name);
- }
-#endif
-
- while (true)
- {
- int retval = swSSL_connect(socket);
- if (retval < 0)
- {
- errCode = SwooleG.error;
- return false;
- }
- if (socket->ssl_state == SW_SSL_STATE_WAIT_STREAM)
- {
- int events = socket->ssl_want_write ? SW_EVENT_WRITE : SW_EVENT_READ;
- if (!wait_events(events))
- {
- return false;
- }
- yield();
- if (errCode == ETIMEDOUT)
- {
- return false;
- }
- }
- else if (socket->ssl_state == SW_SSL_STATE_READY)
- {
- return true;
- }
- }
-
- if (socket->ssl_state == SW_SSL_STATE_READY && ssl_option.verify_peer)
- {
- if (ssl_verify(ssl_option.allow_self_signed) < 0)
- {
- return false;
- }
- }
- return true;
-}
-
-int Socket::ssl_verify(bool allow_self_signed)
-{
- if (swSSL_verify(socket, allow_self_signed) < 0)
- {
- return SW_ERR;
- }
- if (ssl_option.tls_host_name && swSSL_check_host(socket, ssl_option.tls_host_name) < 0)
- {
- return SW_ERR;
- }
- return SW_OK;
-}
-#endif
-
-bool Socket::sendfile(char *filename, off_t offset, size_t length)
-{
- int file_fd = open(filename, O_RDONLY);
- if (file_fd < 0)
- {
- swSysError("open(%s) failed.", filename);
- return false;
- }
-
- if (length == 0)
- {
- struct stat file_stat;
- if (::fstat(file_fd, &file_stat) < 0)
- {
- swSysError("fstat(%s) failed.", filename);
- ::close(file_fd);
- return false;
- }
- length = file_stat.st_size;
- }
- else
- {
- // total length of the file
- length = offset + length;
- }
-
- int n, sendn;
- while ((size_t) offset < length)
- {
- sendn = (length - offset > SW_SENDFILE_CHUNK_SIZE) ? SW_SENDFILE_CHUNK_SIZE : length - offset;
-#ifdef SW_USE_OPENSSL
- if (socket->ssl)
- {
- n = swSSL_sendfile(socket, file_fd, &offset, sendn);
- }
- else
-#endif
- {
- n = ::swoole_sendfile(socket->fd, file_fd, &offset, sendn);
- }
- if (n > 0)
- {
- continue;
- }
- else if (n == 0)
- {
- swWarn("sendfile return zero.");
- return false;
- }
- else if (errno != EAGAIN)
- {
- swSysError("sendfile(%d, %s) failed.", socket->fd, filename);
- _error: errCode = errno;
- ::close(file_fd);
- return false;
- }
- if (!wait_events(SW_EVENT_WRITE))
- {
- goto _error;
- }
- yield();
- if (errCode == ETIMEDOUT)
- {
- goto _error;
- }
- }
- ::close(file_fd);
- return true;
-}
-
-int Socket::sendto(char *address, int port, char *data, int len)
-{
- if (type == SW_SOCK_UDP)
- {
- return swSocket_udp_sendto(socket->fd, address, port, data, len);
- }
- else if (type == SW_SOCK_UDP6)
- {
- return swSocket_udp_sendto6(socket->fd, address, port, data, len);
- }
- else
- {
- swWarn("only supports SWOOLE_SOCK_UDP or SWOOLE_SOCK_UDP6.");
- return -1;
- }
-}
-
-int Socket::recvfrom(void *__buf, size_t __n, char *address, int *port)
-{
- socket->info.len = sizeof(socket->info.addr);
- int retval;
-
- _recv: retval = ::recvfrom(socket->fd, __buf, __n, 0, (struct sockaddr *) &socket->info.addr, &socket->info.len);
- if (retval < 0)
- {
- if (errno == EINTR)
- {
- goto _recv;
- }
- else if (swConnection_error(errno) == SW_WAIT)
- {
- if (!wait_events(SW_EVENT_READ))
- {
- return -1;
- }
- yield();
- if (errCode == ETIMEDOUT)
- {
- return -1;
- }
- retval = ::recvfrom(socket->fd, __buf, __n, 0, (struct sockaddr *) &socket->info.addr, &socket->info.len);
- if (retval < 0)
- {
- errCode = errno;
- }
- else
- {
- strcpy(address, swConnection_get_ip(socket));
- *port = swConnection_get_port(socket);
- }
- }
- else
- {
- errCode = errno;
- }
- }
- return retval;
-}
-
-/**
- * recv packet with protocol
- */
-ssize_t Socket::recv_packet()
-{
- get_buffer();
- ssize_t buf_len = SW_BUFFER_SIZE_STD;
- ssize_t retval;
-
- if (open_length_check)
- {
- uint32_t header_len;
-
- _get_header_len: header_len = protocol.package_length_offset + protocol.package_length_size;
- if (buffer->length > 0)
- {
- if (buffer->length < header_len)
- {
- goto _recv_header;
- }
- else
- {
- goto _get_length;
- }
- }
-
- _recv_header: retval = recv(buffer->str + buffer->length, header_len - buffer->length);
- if (retval <= 0)
- {
- return 0;
- }
- else if (retval < 0 || retval != header_len)
- {
- return 0;
- }
- else
- {
- buffer->length += retval;
- }
-
- _get_length: buf_len = protocol.get_package_length(&protocol, socket, buffer->str, (uint32_t) buffer->length);
- swDebug("packet_len=%ld, length=%ld", buf_len, buffer->length);
- //error package
- if (buf_len < 0)
- {
- return 0;
- }
- else if (buf_len == 0)
- {
- header_len = protocol.real_header_length;
- goto _recv_header;
- }
- //empty package
- else if (buf_len == header_len)
- {
- buffer->length = 0;
- return header_len;
- }
- else if (buf_len > protocol.package_max_length)
- {
- swoole_error_log(SW_LOG_WARNING, SW_ERROR_PACKAGE_LENGTH_TOO_LARGE, "packet[length=%d] is too big.", (int )buf_len);
- return 0;
- }
-
- if ((size_t) buf_len == buffer->length)
- {
- buffer->length = 0;
- return buf_len;
- }
- else if ((size_t) buf_len < buffer->length)
- {
- buffer->length = buffer->length - buf_len;
- memmove(buffer->str, buffer->str + buf_len, buffer->length);
- goto _get_header_len;
- }
-
- if ((size_t) buf_len >= buffer->size)
- {
- if (swString_extend(buffer, buf_len) < 0)
- {
- buffer->length = 0;
- return -1;
- }
- }
-
- retval = recv_all(buffer->str + buffer->length, buf_len - buffer->length);
- if (retval > 0)
- {
- buffer->length += retval;
- if (buffer->length != (size_t) buf_len)
- {
- retval = 0;
- }
- else
- {
- buffer->length = 0;
- return buf_len;
- }
- }
- }
- else if (open_eof_check)
- {
- int eof = -1;
- char *buf;
-
- if (buffer->length > 0)
- {
- goto find_eof;
- }
-
- while (1)
- {
- buf = buffer->str + buffer->length;
- buf_len = buffer->size - buffer->length;
-
- if (buf_len > SW_BUFFER_SIZE_BIG)
- {
- buf_len = SW_BUFFER_SIZE_BIG;
- }
-
- retval = recv(buf, buf_len);
- if (retval < 0)
- {
- buffer->length = 0;
- return -1;
- }
- else if (retval == 0)
- {
- buffer->length = 0;
- return 0;
- }
-
- buffer->length += retval;
-
- if (buffer->length < protocol.package_eof_len)
- {
- continue;
- }
-
- find_eof: eof = swoole_strnpos(buffer->str, buffer->length, protocol.package_eof, protocol.package_eof_len);
- if (eof >= 0)
- {
- eof += protocol.package_eof_len;
- if (buffer->length > (uint32_t) eof)
- {
- buffer->length -= eof;
- memmove(buffer->str, buffer->str + eof, buffer->length);
- }
- else
- {
- buffer->length = 0;
- }
- return eof;
- }
- else
- {
- if (buffer->length == protocol.package_max_length)
- {
- swWarn("no package eof");
- buffer->length = 0;
- return -1;
- }
- else if (buffer->length == buffer->size)
- {
- if (buffer->size < protocol.package_max_length)
- {
- size_t new_size = buffer->size * 2;
- if (new_size > protocol.package_max_length)
- {
- new_size = protocol.package_max_length;
- }
- if (swString_extend(buffer, new_size) < 0)
- {
- buffer->length = 0;
- return -1;
- }
- }
- }
- }
- }
- buffer->length = 0;
- }
- else
- {
- return -1;
- }
-
- return retval;
-}
-
-swString* Socket::get_buffer()
-{
- if (unlikely(buffer == nullptr))
- {
- buffer = swString_new(SW_BUFFER_SIZE_STD);
- }
- return buffer;
-}
-
-Socket::~Socket()
-{
- if (!socket->closed)
- {
- close();
- }
- if (socket->out_buffer)
- {
- swBuffer_free(socket->out_buffer);
- socket->out_buffer = NULL;
- }
- if (socket->in_buffer)
- {
- swBuffer_free(socket->in_buffer);
- socket->in_buffer = NULL;
- }
- if (buffer)
- {
- swString_free(buffer);
- }
- bzero(socket, sizeof(swConnection));
- socket->removed = 1;
-}
diff --git a/socket.h b/socket.h
deleted file mode 100644
index 7a3d039..0000000
--- a/socket.h
+++ /dev/null
@@ -1,142 +0,0 @@
-#pragma once
-
-#include "swoole.h"
-#include "connection.h"
-#include "socks5.h"
-#include <string>
-
-namespace swoole
-{
-class Socket
-{
-public:
- Socket(enum swSocket_type type);
- Socket(int _fd, Socket *sock);
- ~Socket();
- bool connect(std::string host, int port, int flags = 0);
- bool shutdown(int how);
- bool close();
- ssize_t send(const void *__buf, size_t __n);
- ssize_t peek(void *__buf, size_t __n);
- ssize_t recv(void *__buf, size_t __n);
- ssize_t recv_all(void *__buf, size_t __n);
- ssize_t send_all(const void *__buf, size_t __n);
- ssize_t recv_packet();
- Socket* accept();
- void resume();
- void yield();
- bool bind(std::string address, int port = 0);
- std::string resolve(std::string host);
- bool listen(int backlog = 0);
- bool sendfile(char *filename, off_t offset, size_t length);
- int sendto(char *address, int port, char *data, int len);
- int recvfrom(void *__buf, size_t __n, char *address, int *port = nullptr);
- swString* get_buffer();
-
- void setTimeout(double timeout)
- {
- _timeout = timeout;
- }
-
-#ifdef SW_USE_OPENSSL
- bool ssl_handshake();
- int ssl_verify(bool allow_self_signed);
-#endif
-
-protected:
- inline void init()
- {
- _cid = 0;
- suspending = false;
- _timeout = 0;
- _port = 0;
- errCode = 0;
- errMsg = nullptr;
- timer = nullptr;
- bind_port = 0;
- _backlog = 0;
-
- http2 = 0;
- shutdow_rw = 0;
- shutdown_read = 0;
- shutdown_write = 0;
- open_length_check = 0;
- open_eof_check = 0;
-
- socks5_proxy = nullptr;
- http_proxy = nullptr;
-
- buffer = nullptr;
- protocol = {0};
-
- protocol.package_length_type = 'N';
- protocol.package_length_size = 4;
- protocol.package_body_offset = 0;
- protocol.package_max_length = SW_BUFFER_INPUT_SIZE;
-
-#ifdef SW_USE_OPENSSL
- open_ssl = 0;
- ssl_wait_handshake = 0;
- ssl_context = NULL;
- ssl_option = {0};
-#endif
- }
-
- inline bool wait_events(int events)
- {
- if (reactor->add(reactor, socket->fd, SW_FD_CORO_SOCKET | events) < 0)
- {
- errCode = errno;
- return false;
- }
- else
- {
- return true;
- }
- }
-
- bool socks5_handshake();
- bool http_proxy_handshake();
-
-public:
- swTimer_node *timer;
- swReactor *reactor;
- std::string _host;
- std::string bind_address;
- int bind_port;
- int _port;
- int _cid;
- bool suspending;
- swConnection *socket;
- enum swSocket_type type;
- int _sock_type;
- int _sock_domain;
- double _timeout;
- int _backlog;
- int errCode;
- const char *errMsg;
- uint32_t http2 :1;
- uint32_t shutdow_rw :1;
- uint32_t shutdown_read :1;
- uint32_t shutdown_write :1;
- /**
- * one package: length check
- */
- uint32_t open_length_check :1;
- uint32_t open_eof_check :1;
-
- swProtocol protocol;
- swString *buffer;
-
- struct _swSocks5 *socks5_proxy;
- struct _http_proxy* http_proxy;
-
-#ifdef SW_USE_OPENSSL
- uint8_t open_ssl :1;
- uint8_t ssl_wait_handshake :1;
- SSL_CTX *ssl_context;
- swSSL_option ssl_option;
-#endif
-};
-
-};