diff options
-rw-r--r-- | PHPINFO | 2 | ||||
-rw-r--r-- | REFLECTION | 4 | ||||
-rw-r--r-- | channel.cc | 151 | ||||
-rw-r--r-- | channel.h | 107 | ||||
-rw-r--r-- | php-pecl-swoole4.spec | 13 | ||||
-rw-r--r-- | socket.cc | 1441 | ||||
-rw-r--r-- | socket.h | 142 |
7 files changed, 7 insertions, 1853 deletions
@@ -2,7 +2,7 @@ swoole swoole support => enabled -Version => 4.1.0 +Version => 4.1.1 Author => Swoole Group[email: team@swoole.com] coroutine => enabled trace-log => enabled @@ -1,4 +1,4 @@ -Extension [ <persistent> extension #148 swoole version 4.1.0 ] { +Extension [ <persistent> extension #148 swoole version 4.1.1 ] { - INI { Entry [ swoole.enable_coroutine <ALL> ] @@ -69,7 +69,7 @@ Extension [ <persistent> extension #148 swoole version 4.1.0 ] { Constant [ integer SWOOLE_DTLSv1_CLIENT_METHOD ] { 17 } Constant [ integer SWOOLE_EVENT_READ ] { 512 } Constant [ integer SWOOLE_EVENT_WRITE ] { 1024 } - Constant [ string SWOOLE_VERSION ] { 4.1.0 } + Constant [ string SWOOLE_VERSION ] { 4.1.1 } Constant [ integer SWOOLE_ERROR_MALLOC_FAIL ] { 501 } Constant [ integer SWOOLE_ERROR_SYSTEM_CALL_FAIL ] { 502 } Constant [ integer SWOOLE_ERROR_PHP_FATAL_ERROR ] { 503 } diff --git a/channel.cc b/channel.cc deleted file mode 100644 index 88423fc..0000000 --- a/channel.cc +++ /dev/null @@ -1,151 +0,0 @@ -#include "channel.h" - -using namespace swoole; - -static void channel_defer_callback(void *data) -{ - notify_msg_t *msg = (notify_msg_t*) data; - coroutine_t *co = msg->chan->pop_coroutine(msg->type); - coroutine_resume(co); - delete msg; -} - -static void channel_pop_timeout(swTimer *timer, swTimer_node *tnode) -{ - timeout_msg_t *msg = (timeout_msg_t *) tnode->data; - msg->error = true; - msg->timer = nullptr; - msg->chan->remove(msg->co); - coroutine_resume(msg->co); -} - -Channel::Channel(size_t _capacity) -{ - capacity = _capacity; - closed = false; - notify_producer_count = 0; - notify_consumer_count = 0; -} - -void Channel::yield(enum channel_op type) -{ - int _cid = coroutine_get_current_cid(); - if (_cid == -1) - { - swError("Socket::yield() must be called in the coroutine."); - } - coroutine_t *co = coroutine_get_by_id(_cid); - if (type == PRODUCER) - { - producer_queue.push_back(co); - swDebug("producer[%d]", coroutine_get_cid(co)); - } - else - { - consumer_queue.push_back(co); - swDebug("consumer[%d]", coroutine_get_cid(co)); - } - coroutine_yield(co); -} - -void Channel::notify(enum channel_op type) -{ - notify_msg_t *msg = new notify_msg_t; - msg->chan = this; - msg->type = type; - if (type == PRODUCER) - { - notify_producer_count++; - } - else - { - notify_consumer_count++; - } - SwooleG.main_reactor->defer(SwooleG.main_reactor, channel_defer_callback, msg); -} - -void* Channel::pop(double timeout) -{ - timeout_msg_t msg; - msg.error = false; - if (timeout > 0) - { - int msec = (int) (timeout * 1000); - if (SwooleG.timer.fd == 0) - { - swTimer_init (msec); - } - msg.chan = this; - msg.co = coroutine_get_by_id(coroutine_get_current_cid()); - msg.timer = SwooleG.timer.add(&SwooleG.timer, msec, 0, &msg, channel_pop_timeout); - } - else - { - msg.timer = NULL; - } - if (is_empty() || consumer_queue.size() > 0) - { - yield(CONSUMER); - } - if (msg.error) - { - return nullptr; - } - if (msg.timer) - { - swTimer_del(&SwooleG.timer, msg.timer); - } - /** - * pop data - */ - void *data = data_queue.front(); - data_queue.pop(); - /** - * notify producer - */ - if (producer_queue.size() > 0 && notify_producer_count < producer_queue.size()) - { - notify(PRODUCER); - } - return data; -} - -bool Channel::push(void *data) -{ - if (is_full() || producer_queue.size() > 0) - { - yield(PRODUCER); - } - /** - * push data - */ - data_queue.push(data); - swDebug("push data, count=%ld", length()); - /** - * notify consumer - */ - if (consumer_queue.size() > 0 && notify_consumer_count < consumer_queue.size()) - { - notify(CONSUMER); - } - return true; -} - -bool Channel::close() -{ - if (closed) - { - return false; - } - swDebug("closed"); - closed = true; - while (producer_queue.size() > 0 && notify_producer_count < producer_queue.size()) - { - notify(PRODUCER); - } - while (consumer_queue.size() > 0 && notify_consumer_count < producer_queue.size()) - { - notify(CONSUMER); - } - return true; -} diff --git a/channel.h b/channel.h deleted file mode 100644 index ab4fb5f..0000000 --- a/channel.h +++ /dev/null @@ -1,107 +0,0 @@ -#pragma once - -#include "swoole.h" -#include "context.h" -#include "coroutine.h" -#include <string> -#include <iostream> -#include <list> -#include <queue> -#include <sys/stat.h> - -namespace swoole { - -enum channel_op -{ - PRODUCER = 1, - CONSUMER = 2, -}; - -class Channel; - -struct notify_msg_t -{ - Channel *chan; - enum channel_op type; -}; - -struct timeout_msg_t -{ - Channel *chan; - coroutine_t *co; - bool error; - swTimer_node *timer; -}; - -class Channel -{ -private: - std::list<coroutine_t *> producer_queue; - std::list<coroutine_t *> consumer_queue; - std::queue<void *> data_queue; - size_t capacity; - uint32_t notify_producer_count; - uint32_t notify_consumer_count; - -public: - bool closed; - inline bool is_empty() - { - return data_queue.size() == 0; - } - - inline bool is_full() - { - return data_queue.size() == capacity; - } - - inline size_t length() - { - return data_queue.size(); - } - - inline size_t consumer_num() - { - return consumer_queue.size(); - } - - inline size_t producer_num() - { - return producer_queue.size(); - } - - inline void remove(coroutine_t *co) - { - consumer_queue.remove(co); - } - - inline coroutine_t* pop_coroutine(enum channel_op type) - { - coroutine_t* co; - if (type == PRODUCER) - { - co = producer_queue.front(); - producer_queue.pop_front(); - notify_producer_count--; - swDebug("resume producer[%d]", coroutine_get_cid(co)); - } - else - { - co = consumer_queue.front(); - consumer_queue.pop_front(); - notify_consumer_count--; - swDebug("resume consumer[%d]", coroutine_get_cid(co)); - } - return co; - } - - Channel(size_t _capacity); - void yield(enum channel_op type); - void notify(enum channel_op type); - void* pop(double timeout = 0); - bool push(void *data); - bool close(); -}; - -}; - diff --git a/php-pecl-swoole4.spec b/php-pecl-swoole4.spec index 2ceed15..bb14709 100644 --- a/php-pecl-swoole4.spec +++ b/php-pecl-swoole4.spec @@ -36,17 +36,12 @@ Summary: PHP's asynchronous concurrent distributed networking framework Name: %{?sub_prefix}php-pecl-%{pecl_name}4 -Version: 4.1.0 +Version: 4.1.1 Release: 1%{?dist}%{!?scl:%{!?nophptag:%(%{__php} -r 'echo ".".PHP_MAJOR_VERSION.".".PHP_MINOR_VERSION;')}} License: BSD URL: http://pecl.php.net/package/%{pecl_name} Source0: http://pecl.php.net/get/%{pecl_name}-%{version}.tgz -Source1: https://raw.githubusercontent.com/swoole/swoole-src/master/include/socket.h -Source2: https://raw.githubusercontent.com/swoole/swoole-src/master/include/channel.h -Source3: https://raw.githubusercontent.com/swoole/swoole-src/master/src/coroutine/socket.cc -Source4: https://raw.githubusercontent.com/swoole/swoole-src/master/src/coroutine/channel.cc - %if 0%{?rhel} == 6 BuildRequires: devtoolset-6-toolchain %else @@ -170,9 +165,6 @@ sed -e '/examples/s/role="src"/role="doc"/' \ cd NTS -cp %{SOURCE1} %{SOURCE2} include/ -cp %{SOURCE3} %{SOURCE4} src/coroutine/ - # Sanity check, really often broken extver=$(sed -n '/#define PHP_SWOOLE_VERSION/{s/.* "//;s/".*$//;p}' php_swoole.h) if test "x${extver}" != "x%{version}%{?prever:-%{prever}}"; then @@ -344,6 +336,9 @@ cd ../ZTS %changelog +* Fri Aug 31 2018 Remi Collet <remi@remirepo.net> - 4.1.1-1 +- update to 4.1.1 (no change) + * Fri Aug 31 2018 Remi Collet <remi@remirepo.net> - 4.1.0-1 - update to 4.1.0 - add dependency on brotli library (Fedora) diff --git a/socket.cc b/socket.cc deleted file mode 100644 index 7edb58f..0000000 --- a/socket.cc +++ /dev/null @@ -1,1441 +0,0 @@ -#include "socket.h" -#include "context.h" -#include "async.h" -#include "buffer.h" - -#include <string> -#include <iostream> -#include <sys/stat.h> - -using namespace swoole; -using namespace std; - -static int socket_onRead(swReactor *reactor, swEvent *event); -static int socket_onWrite(swReactor *reactor, swEvent *event); -static void socket_onTimeout(swTimer *timer, swTimer_node *tnode); -static void socket_onResolveCompleted(swAio_event *event); - -bool Socket::socks5_handshake() -{ - swSocks5 *ctx = socks5_proxy; - char *buf = ctx->buf; - int n; - - /** - * handshake - */ - swSocks5_pack(buf, socks5_proxy->username == NULL ? 0x00 : 0x02); - socks5_proxy->state = SW_SOCKS5_STATE_HANDSHAKE; - if (send(buf, 3) <= 0) - { - return false; - } - n = recv(buf, sizeof(ctx->buf)); - if (n <= 0) - { - return false; - } - uchar version = buf[0]; - uchar method = buf[1]; - if (version != SW_SOCKS5_VERSION_CODE) - { - swoole_error_log(SW_LOG_NOTICE, SW_ERROR_SOCKS5_UNSUPPORT_VERSION, "SOCKS version is not supported."); - return SW_ERR; - } - if (method != ctx->method) - { - swoole_error_log(SW_LOG_NOTICE, SW_ERROR_SOCKS5_UNSUPPORT_METHOD, "SOCKS authentication method not supported."); - return SW_ERR; - } - //authenticate request - if (method == SW_SOCKS5_METHOD_AUTH) - { - buf[0] = 0x01; - buf[1] = ctx->l_username; - - buf += 2; - memcpy(buf, ctx->username, ctx->l_username); - buf += ctx->l_username; - buf[0] = ctx->l_password; - memcpy(buf + 1, ctx->password, ctx->l_password); - - ctx->state = SW_SOCKS5_STATE_AUTH; - - if (send(ctx->buf, ctx->l_username + ctx->l_password + 3) < 0) - { - return false; - } - - n = recv(buf, sizeof(ctx->buf)); - if (n <= 0) - { - return false; - } - - uchar version = buf[0]; - uchar status = buf[1]; - if (version != 0x01) - { - swoole_error_log(SW_LOG_NOTICE, SW_ERROR_SOCKS5_UNSUPPORT_VERSION, "SOCKS version is not supported."); - return false; - } - if (status != 0) - { - swoole_error_log(SW_LOG_NOTICE, SW_ERROR_SOCKS5_AUTH_FAILED, - "SOCKS username/password authentication failed."); - return false; - } - goto send_connect_request; - } - //send connect request - else - { - send_connect_request: buf[0] = SW_SOCKS5_VERSION_CODE; - buf[1] = 0x01; - buf[2] = 0x00; - - ctx->state = SW_SOCKS5_STATE_CONNECT; - - if (ctx->dns_tunnel) - { - buf[3] = 0x03; - buf[4] = ctx->l_target_host; - buf += 5; - memcpy(buf, ctx->target_host, ctx->l_target_host); - buf += ctx->l_target_host; - *(uint16_t *) buf = htons(ctx->target_port); - - if (send(ctx->buf, ctx->l_target_host + 7) < 0) - { - return false; - } - } - else - { - buf[3] = 0x01; - buf += 4; - *(uint32_t *) buf = htons(ctx->l_target_host); - buf += 4; - *(uint16_t *) buf = htons(ctx->target_port); - - if (send(ctx->buf, ctx->l_target_host + 7) < 0) - { - return false; - } - } - - /** - * response - */ - n = recv(buf, sizeof(ctx->buf)); - if (n <= 0) - { - return false; - } - - uchar version = buf[0]; - if (version != SW_SOCKS5_VERSION_CODE) - { - swoole_error_log(SW_LOG_NOTICE, SW_ERROR_SOCKS5_UNSUPPORT_VERSION, "SOCKS version is not supported."); - return false; - } - uchar result = buf[1]; -#if 0 - uchar reg = buf[2]; - uchar type = buf[3]; - uint32_t ip = *(uint32_t *) (buf + 4); - uint16_t port = *(uint16_t *) (buf + 8); -#endif - if (result == 0) - { - ctx->state = SW_SOCKS5_STATE_READY; - } - else - { - swoole_error_log(SW_LOG_NOTICE, SW_ERROR_SOCKS5_SERVER_ERROR, "Socks5 server error, reason :%s.", - swSocks5_strerror(result)); - } - return result; - } -} - -bool Socket::http_proxy_handshake() -{ -#ifdef SW_USE_OPENSSL - if (socket->ssl) - { - if (ssl_handshake() == false) - { - return false; - } - } - else - { - return true; - } -#else - return true; -#endif - - //CONNECT - int n = snprintf(http_proxy->buf, sizeof(http_proxy->buf), "CONNECT %*s:%d HTTP/1.1\r\n\r\n", - http_proxy->l_target_host, http_proxy->target_host, http_proxy->target_port); - if (send(http_proxy->buf, n) <= 0) - { - return false; - } - - n = recv(http_proxy->buf, sizeof(http_proxy->buf)); - if (n <= 0) - { - return false; - } - char *buf = http_proxy->buf; - int len = n; - int state = 0; - char *p = buf; - for (p = buf; p < buf + len; p++) - { - if (state == 0) - { - if (strncasecmp(p, "HTTP/1.1", 8) == 0 || strncasecmp(p, "HTTP/1.0", 8) == 0) - { - state = 1; - p += 8; - } - else - { - break; - } - } - else if (state == 1) - { - if (isspace(*p)) - { - continue; - } - else - { - if (strncasecmp(p, "200", 3) == 0) - { - state = 2; - p += 3; - } - else - { - break; - } - } - } - else if (state == 2) - { - if (isspace(*p)) - { - continue; - } - else - { - if (strncasecmp(p, "Connection established", sizeof("Connection established") - 1) == 0) - { - return true; - } - else - { - break; - } - } - } - } - return false; -} - -static inline int socket_connect(int fd, struct sockaddr *addr, socklen_t len) -{ - int retval; - while (1) - { - retval = ::connect(fd, addr, len); - if (retval < 0) - { - if (errno == EINTR) - { - continue; - } - } - break; - } - return retval; -} - -Socket::Socket(enum swSocket_type _type) -{ - type = _type; - switch (type) - { - case SW_SOCK_TCP6: - _sock_domain = AF_INET6; - _sock_type = SOCK_STREAM; - break; - case SW_SOCK_UNIX_STREAM: - _sock_domain = AF_UNIX; - _sock_type = SOCK_STREAM; - break; - case SW_SOCK_UDP: - _sock_domain = AF_INET; - _sock_type = SOCK_DGRAM; - break; - case SW_SOCK_UDP6: - _sock_domain = AF_INET6; - _sock_type = SOCK_DGRAM; - break; - case SW_SOCK_UNIX_DGRAM: - _sock_domain = AF_UNIX; - _sock_type = SOCK_DGRAM; - break; - case SW_SOCK_TCP: - default: - _sock_domain = AF_INET; - _sock_type = SOCK_STREAM; - break; - } - -#ifdef SOCK_CLOEXEC - int sockfd = ::socket(_sock_domain, _sock_type | SOCK_CLOEXEC, 0); -#else - int sockfd = ::socket(_sock_domain, _sock_type, 0); -#endif - if (sockfd < 0) - { - swWarn("socket() failed. Error: %s[%d]", strerror(errno), errno); - return; - } - - if (swIsMaster() && SwooleTG.type == SW_THREAD_REACTOR) - { - reactor = SwooleTG.reactor; - } - else - { - reactor = SwooleG.main_reactor; - } - socket = swReactor_get(reactor, sockfd); - - bzero(socket, sizeof(swConnection)); - socket->fd = sockfd; - socket->object = this; - socket->socket_type = type; - - swSetNonBlock(socket->fd); - if (!swReactor_handle_isset(reactor, SW_FD_CORO_SOCKET)) - { - reactor->setHandle(reactor, SW_FD_CORO_SOCKET | SW_EVENT_READ, socket_onRead); - reactor->setHandle(reactor, SW_FD_CORO_SOCKET | SW_EVENT_WRITE, socket_onWrite); - reactor->setHandle(reactor, SW_FD_CORO_SOCKET | SW_EVENT_ERROR, socket_onRead); - } - init(); -} - -Socket::Socket(int _fd, Socket *sock) -{ - reactor = sock->reactor; - - socket = swReactor_get(reactor, _fd); - bzero(socket, sizeof(swConnection)); - socket->fd = _fd; - socket->object = this; - socket->socket_type = sock->type; - - _sock_domain = sock->_sock_domain; - _sock_type = sock->_sock_type; - init(); -} - -bool Socket::connect(string host, int port, int flags) -{ - //enable socks5 proxy - if (socks5_proxy) - { - socks5_proxy->target_host = (char *) host.c_str(); - socks5_proxy->l_target_host = host.size(); - socks5_proxy->target_port = port; - - host = socks5_proxy->host; - port = socks5_proxy->port; - } - - //enable http proxy - if (http_proxy) - { - http_proxy->target_host = (char *) host.c_str(); - http_proxy->l_target_host = host.size(); - http_proxy->target_port = port; - - host = http_proxy->proxy_host; - port = http_proxy->proxy_port; - } - - if (_sock_domain == AF_INET6 || _sock_domain == AF_INET) - { - if (port == -1) - { - swWarn("Socket of type AF_INET/AF_INET6 requires port argument"); - return false; - } - else if (port == 0 || port >= 65536) - { - swWarn("Invalid port argument[%d]", port); - return false; - } - } - - if (unlikely(_cid && _cid != coroutine_get_current_cid())) - { - swWarn( "socket has already been bound to another coroutine."); - return false; - } - - int retval = 0; - _host = host; - _port = port; - - for (int i = 0; i < 2; i++) - { - if (_sock_domain == AF_INET) - { - socket->info.addr.inet_v4.sin_family = AF_INET; - socket->info.addr.inet_v4.sin_port = htons(port); - - if (!inet_pton(AF_INET, _host.c_str(), & socket->info.addr.inet_v4.sin_addr)) - { - _host = resolve(_host); - if (_host.size() == 0) - { - return false; - } - continue; - } - else - { - socket->info.len = sizeof( socket->info.addr.inet_v4); - retval = socket_connect(socket->fd, (struct sockaddr *) &socket->info.addr.inet_v4, socket->info.len); - break; - } - } - else if (_sock_domain == AF_INET6) - { - socket->info.addr.inet_v6.sin6_family = AF_INET6; - socket->info.addr.inet_v6.sin6_port = htons(port); - - if (!inet_pton(AF_INET6, _host.c_str(), &socket->info.addr.inet_v6.sin6_addr)) - { - _host = resolve(_host); - if (_host.size() == 0) - { - return false; - } - continue; - } - else - { - socket->info.len = sizeof(socket->info.addr.inet_v6); - retval = socket_connect(socket->fd, (struct sockaddr *) &socket->info.addr.inet_v6, socket->info.len); - break; - } - } - else if (_sock_domain == AF_UNIX) - { - if (_host.size() >= sizeof(socket->info.addr.un.sun_path)) - { - return false; - } - socket->info.addr.un.sun_family = AF_UNIX; - memcpy(&socket->info.addr.un.sun_path, _host.c_str(), _host.size()); - retval = socket_connect(socket->fd, (struct sockaddr *) &socket->info.addr.un, - (socklen_t) (offsetof(struct sockaddr_un, sun_path) + _host.size())); - break; - } - else - { - return false; - } - } - - if (retval == -1) - { - if (errno != EINPROGRESS) - { - _error: errCode = errno; - return false; - } - if (!wait_events(SW_EVENT_WRITE)) - { - goto _error; - } - yield(); - //Connection has timed out - if (errCode == ETIMEDOUT) - { - errMsg = strerror(errCode); - return false; - } - socklen_t len = sizeof(errCode); - if (getsockopt(socket->fd, SOL_SOCKET, SO_ERROR, &errCode, &len) < 0 || errCode != 0) - { - errMsg = strerror(errCode); - return false; - } - } - socket->active = 1; - //socks5 proxy - if (socks5_proxy && socks5_handshake() == false) - { - return false; - } - //http proxy - if (http_proxy && http_proxy_handshake() == false) - { - return false; - } - return true; -} - -static void socket_onResolveCompleted(swAio_event *event) -{ - Socket *sock = (Socket *) event->object; - if (event->error != 0) - { - sock->errCode = SW_ERROR_DNSLOOKUP_RESOLVE_FAILED; - } - sock->resume(); -} - -static void socket_onTimeout(swTimer *timer, swTimer_node *tnode) -{ - Socket *sock = (Socket *) tnode->data; - sock->timer = NULL; - sock->errCode = ETIMEDOUT; - swDebug("socket[%d] timeout", sock->socket->fd); - sock->reactor->del(sock->reactor, sock->socket->fd); - sock->resume(); -} - -static int socket_onRead(swReactor *reactor, swEvent *event) -{ - Socket *sock = (Socket *) event->socket->object; - reactor->del(reactor, event->fd); - sock->resume(); - return SW_OK; -} - -static int socket_onWrite(swReactor *reactor, swEvent *event) -{ - Socket *sock = (Socket *) event->socket->object; - reactor->del(reactor, event->fd); - sock->resume(); - return SW_OK; -} - -ssize_t Socket::peek(void *__buf, size_t __n) -{ - return swConnection_peek(socket, __buf, __n, 0); -} - -ssize_t Socket::recv(void *__buf, size_t __n) -{ - ssize_t retval = swConnection_recv(socket, __buf, __n, 0); - if (retval >= 0) - { - return retval; - } - - if (swConnection_error(errno) != SW_WAIT) - { - errCode = errno; - return -1; - } - - while (true) - { - int events = SW_EVENT_READ; -#ifdef SW_USE_OPENSSL - if (socket->ssl && socket->ssl_want_write) - { - events = SW_EVENT_WRITE; - } -#endif - if (!wait_events(events)) - { - return -1; - } - yield(); - if (errCode == ETIMEDOUT) - { - return -1; - } - retval = swConnection_recv(socket, __buf, __n, 0); - if (retval < 0) - { - if (swConnection_error(errno) == SW_WAIT) - { - continue; - } - errCode = errno; - } - break; - } - return retval; -} - -ssize_t Socket::recv_all(void *__buf, size_t __n) -{ - ssize_t retval, total_bytes = 0; - while (true) - { - retval = recv((char*) __buf + total_bytes, __n - total_bytes); - if (retval <= 0) - { - if (total_bytes == 0) - { - total_bytes = retval; - } - break; - } - total_bytes += retval; - if ((size_t) total_bytes == __n) - { - break; - } - } - return total_bytes; -} - -ssize_t Socket::send_all(const void *__buf, size_t __n) -{ - ssize_t retval, total_bytes = 0; - while (true) - { - retval = send((char*) __buf + total_bytes, __n - total_bytes); - if (retval <= 0) - { - if (total_bytes == 0) - { - total_bytes = retval; - } - break; - } - total_bytes += retval; - if ((size_t) total_bytes == __n) - { - break; - } - } - return total_bytes; -} - -ssize_t Socket::send(const void *__buf, size_t __n) -{ - ssize_t retval = swConnection_send(socket, (void *) __buf, __n, 0); - if (retval >= 0) - { - return retval; - } - - if (swConnection_error(errno) != SW_WAIT) - { - errCode = errno; - return -1; - } - - while (true) - { - int events = SW_EVENT_WRITE; -#ifdef SW_USE_OPENSSL - if (socket->ssl && socket->ssl_want_read) - { - events = SW_EVENT_READ; - } -#endif - if (!wait_events(events)) - { - return -1; - } - yield(); - if (errCode == ETIMEDOUT) - { - return -1; - } - ssize_t retval = swConnection_send(socket, (void *) __buf, __n, 0); - if (retval < 0) - { - if (swConnection_error(errno) == SW_WAIT) - { - continue; - } - errCode = errno; - } - break; - } - - return retval; -} - -void Socket::yield() -{ - if (suspending) - { - swError("socket has already been bound to another coroutine."); - } - errCode = 0; - if (_timeout > 0) - { - int ms = (int) (_timeout * 1000); - if (SwooleG.timer.fd == 0) - { - swTimer_init(ms); - } - timer = SwooleG.timer.add(&SwooleG.timer, ms, 0, this, socket_onTimeout); - } - _cid = coroutine_get_current_cid(); - if (_cid == -1) - { - swError("Socket::yield() must be called in the coroutine."); - } - //suspend - suspending = true; - coroutine_yield(coroutine_get_by_id(_cid)); - suspending = false; - //clear timer - if (timer) - { - swTimer_del(&SwooleG.timer, timer); - timer = nullptr; - } -} - -void Socket::resume() -{ - coroutine_resume(coroutine_get_by_id(_cid)); -} - -bool Socket::bind(std::string address, int port) -{ - bind_address = address; - bind_port = port; - - struct sockaddr_storage sa_storage = { 0 }; - struct sockaddr *sock_type = (struct sockaddr*) &sa_storage; - - int retval; - switch (_sock_domain) - { - case AF_UNIX: - { - struct sockaddr_un *sa = (struct sockaddr_un *) sock_type; - sa->sun_family = AF_UNIX; - - if (bind_address.size() >= sizeof(sa->sun_path)) - { - return false; - } - memcpy(&sa->sun_path, bind_address.c_str(), bind_address.size()); - - retval = ::bind(socket->fd, (struct sockaddr *) sa, - offsetof(struct sockaddr_un, sun_path) + bind_address.size()); - break; - } - - case AF_INET: - { - struct sockaddr_in *sa = (struct sockaddr_in *) sock_type; - sa->sin_family = AF_INET; - sa->sin_port = htons((unsigned short) bind_port); - if (!inet_aton(bind_address.c_str(), &sa->sin_addr)) - { - return false; - } - retval = ::bind(socket->fd, (struct sockaddr *) sa, sizeof(struct sockaddr_in)); - break; - } - - case AF_INET6: - { - struct sockaddr_in6 *sa = (struct sockaddr_in6 *) sock_type; - sa->sin6_family = AF_INET6; - sa->sin6_port = htons((unsigned short) bind_port); - - if (!inet_pton(AF_INET6, bind_address.c_str(), &sa->sin6_addr)) - { - return false; - } - retval = ::bind(socket->fd, (struct sockaddr *) sa, sizeof(struct sockaddr_in6)); - break; - } - default: - return false; - } - - if (retval != 0) - { - errCode = errno; - return false; - } - - return true; -} - -bool Socket::listen(int backlog) -{ - _backlog = backlog; - if (::listen(socket->fd, backlog) != 0) - { - errCode = errno; - return false; - } - return true; -} - -Socket* Socket::accept() -{ - if (!wait_events(SW_EVENT_READ)) - { - return nullptr; - } - yield(); - if (errCode == ETIMEDOUT) - { - return nullptr; - } - int conn; - swSocketAddress client_addr; - socklen_t client_addrlen = sizeof(client_addr); - -#ifdef HAVE_ACCEPT4 - conn = ::accept4(socket->fd, (struct sockaddr *) &client_addr, &client_addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC); -#else - conn = ::accept(socket->fd, (struct sockaddr *) &client_addr, &client_addrlen); - if (conn >= 0) - { - swoole_fcntl_set_option(conn, 1, 1); - } -#endif - if (conn >= 0) - { - return new Socket(conn, this); - } - else - { - errCode = errno; - return nullptr; - } -} - -string Socket::resolve(string domain_name) -{ - swAio_event ev; - bzero(&ev, sizeof(swAio_event)); - ev.nbytes = SW_IP_MAX_LENGTH; - ev.buf = sw_malloc(ev.nbytes); - if (!ev.buf) - { - errCode = errno; - return ""; - } - - memcpy(ev.buf, domain_name.c_str(), domain_name.size()); - ((char *) ev.buf)[domain_name.size()] = 0; - ev.flags = _sock_domain; - ev.type = SW_AIO_GETHOSTBYNAME; - ev.object = this; - ev.callback = socket_onResolveCompleted; - - if (SwooleAIO.init == 0) - { - swAio_init(); - } - - if (swAio_dispatch(&ev) < 0) - { - errCode = SwooleG.error; - sw_free(ev.buf); - return ""; - } - /** - * cannot timeout - */ - double tmp_timeout = _timeout; - _timeout = -1; - yield(); - _timeout = tmp_timeout; - - if (errCode == SW_ERROR_DNSLOOKUP_RESOLVE_FAILED) - { - errMsg = hstrerror(ev.error); - return ""; - } - else - { - string addr((char *) ev.buf); - sw_free(ev.buf); - return addr; - } -} - -bool Socket::shutdown(int __how) -{ - if (!socket || socket->closed) - { - return false; - } - if (__how == SHUT_RD) - { - if (shutdown_read || shutdow_rw || ::shutdown(socket->fd, SHUT_RD)) - { - return false; - } - else - { - shutdown_read = 1; - return true; - } - } - else if (__how == SHUT_WR) - { - if (shutdown_write || shutdow_rw || ::shutdown(socket->fd, SHUT_RD) < 0) - { - return false; - } - else - { - shutdown_write = 1; - return true; - } - } - else if (__how == SHUT_RDWR) - { - if (shutdow_rw || ::shutdown(socket->fd, SHUT_RDWR) < 0) - { - return false; - } - else - { - shutdown_read = 1; - return true; - } - } - else - { - return false; - } -} - -bool Socket::close() -{ - if (socket == NULL || socket->closed) - { - return false; - } - socket->closed = 1; - - int fd = socket->fd; - - if (_sock_type == SW_SOCK_UNIX_DGRAM) - { - unlink(socket->info.addr.un.sun_path); - } - -#ifdef SW_USE_OPENSSL - if (open_ssl && ssl_context) - { - if (socket->ssl) - { - swSSL_close(socket); - } - swSSL_free_context(ssl_context); - if (ssl_option.cert_file) - { - sw_free(ssl_option.cert_file); - } - if (ssl_option.key_file) - { - sw_free(ssl_option.key_file); - } - if (ssl_option.passphrase) - { - sw_free(ssl_option.passphrase); - } -#ifdef SSL_CTRL_SET_TLSEXT_HOSTNAME - if (ssl_option.tls_host_name) - { - sw_free(ssl_option.tls_host_name); - } -#endif - if (ssl_option.cafile) - { - sw_free(ssl_option.cafile); - } - if (ssl_option.capath) - { - sw_free(ssl_option.capath); - } - } -#endif - if (_sock_type == SW_SOCK_UNIX_DGRAM) - { - unlink(socket->info.addr.un.sun_path); - } - if (timer) - { - swTimer_del(&SwooleG.timer, timer); - timer = NULL; - } - socket->active = 0; - ::close(fd); - return true; -} - -#ifdef SW_USE_OPENSSL -bool Socket::ssl_handshake() -{ - if (socket->ssl) - { - return false; - } - - ssl_context = swSSL_get_context(&ssl_option); - if (ssl_context == NULL) - { - return false; - } - - if (ssl_option.verify_peer) - { - if (swSSL_set_capath(&ssl_option, ssl_context) < 0) - { - return false; - } - } - - socket->ssl_send = 1; -#if defined(SW_USE_HTTP2) && defined(SW_USE_OPENSSL) && OPENSSL_VERSION_NUMBER >= 0x10002000L - if (http2) - { - if (SSL_CTX_set_alpn_protos(ssl_context, (const unsigned char *) "\x02h2", 3) < 0) - { - return false; - } - } -#endif - - if (swSSL_create(socket, ssl_context, SW_SSL_CLIENT) < 0) - { - return false; - } -#ifdef SSL_CTRL_SET_TLSEXT_HOSTNAME - if (ssl_option.tls_host_name) - { - SSL_set_tlsext_host_name(socket->ssl, ssl_option.tls_host_name); - } -#endif - - while (true) - { - int retval = swSSL_connect(socket); - if (retval < 0) - { - errCode = SwooleG.error; - return false; - } - if (socket->ssl_state == SW_SSL_STATE_WAIT_STREAM) - { - int events = socket->ssl_want_write ? SW_EVENT_WRITE : SW_EVENT_READ; - if (!wait_events(events)) - { - return false; - } - yield(); - if (errCode == ETIMEDOUT) - { - return false; - } - } - else if (socket->ssl_state == SW_SSL_STATE_READY) - { - return true; - } - } - - if (socket->ssl_state == SW_SSL_STATE_READY && ssl_option.verify_peer) - { - if (ssl_verify(ssl_option.allow_self_signed) < 0) - { - return false; - } - } - return true; -} - -int Socket::ssl_verify(bool allow_self_signed) -{ - if (swSSL_verify(socket, allow_self_signed) < 0) - { - return SW_ERR; - } - if (ssl_option.tls_host_name && swSSL_check_host(socket, ssl_option.tls_host_name) < 0) - { - return SW_ERR; - } - return SW_OK; -} -#endif - -bool Socket::sendfile(char *filename, off_t offset, size_t length) -{ - int file_fd = open(filename, O_RDONLY); - if (file_fd < 0) - { - swSysError("open(%s) failed.", filename); - return false; - } - - if (length == 0) - { - struct stat file_stat; - if (::fstat(file_fd, &file_stat) < 0) - { - swSysError("fstat(%s) failed.", filename); - ::close(file_fd); - return false; - } - length = file_stat.st_size; - } - else - { - // total length of the file - length = offset + length; - } - - int n, sendn; - while ((size_t) offset < length) - { - sendn = (length - offset > SW_SENDFILE_CHUNK_SIZE) ? SW_SENDFILE_CHUNK_SIZE : length - offset; -#ifdef SW_USE_OPENSSL - if (socket->ssl) - { - n = swSSL_sendfile(socket, file_fd, &offset, sendn); - } - else -#endif - { - n = ::swoole_sendfile(socket->fd, file_fd, &offset, sendn); - } - if (n > 0) - { - continue; - } - else if (n == 0) - { - swWarn("sendfile return zero."); - return false; - } - else if (errno != EAGAIN) - { - swSysError("sendfile(%d, %s) failed.", socket->fd, filename); - _error: errCode = errno; - ::close(file_fd); - return false; - } - if (!wait_events(SW_EVENT_WRITE)) - { - goto _error; - } - yield(); - if (errCode == ETIMEDOUT) - { - goto _error; - } - } - ::close(file_fd); - return true; -} - -int Socket::sendto(char *address, int port, char *data, int len) -{ - if (type == SW_SOCK_UDP) - { - return swSocket_udp_sendto(socket->fd, address, port, data, len); - } - else if (type == SW_SOCK_UDP6) - { - return swSocket_udp_sendto6(socket->fd, address, port, data, len); - } - else - { - swWarn("only supports SWOOLE_SOCK_UDP or SWOOLE_SOCK_UDP6."); - return -1; - } -} - -int Socket::recvfrom(void *__buf, size_t __n, char *address, int *port) -{ - socket->info.len = sizeof(socket->info.addr); - int retval; - - _recv: retval = ::recvfrom(socket->fd, __buf, __n, 0, (struct sockaddr *) &socket->info.addr, &socket->info.len); - if (retval < 0) - { - if (errno == EINTR) - { - goto _recv; - } - else if (swConnection_error(errno) == SW_WAIT) - { - if (!wait_events(SW_EVENT_READ)) - { - return -1; - } - yield(); - if (errCode == ETIMEDOUT) - { - return -1; - } - retval = ::recvfrom(socket->fd, __buf, __n, 0, (struct sockaddr *) &socket->info.addr, &socket->info.len); - if (retval < 0) - { - errCode = errno; - } - else - { - strcpy(address, swConnection_get_ip(socket)); - *port = swConnection_get_port(socket); - } - } - else - { - errCode = errno; - } - } - return retval; -} - -/** - * recv packet with protocol - */ -ssize_t Socket::recv_packet() -{ - get_buffer(); - ssize_t buf_len = SW_BUFFER_SIZE_STD; - ssize_t retval; - - if (open_length_check) - { - uint32_t header_len; - - _get_header_len: header_len = protocol.package_length_offset + protocol.package_length_size; - if (buffer->length > 0) - { - if (buffer->length < header_len) - { - goto _recv_header; - } - else - { - goto _get_length; - } - } - - _recv_header: retval = recv(buffer->str + buffer->length, header_len - buffer->length); - if (retval <= 0) - { - return 0; - } - else if (retval < 0 || retval != header_len) - { - return 0; - } - else - { - buffer->length += retval; - } - - _get_length: buf_len = protocol.get_package_length(&protocol, socket, buffer->str, (uint32_t) buffer->length); - swDebug("packet_len=%ld, length=%ld", buf_len, buffer->length); - //error package - if (buf_len < 0) - { - return 0; - } - else if (buf_len == 0) - { - header_len = protocol.real_header_length; - goto _recv_header; - } - //empty package - else if (buf_len == header_len) - { - buffer->length = 0; - return header_len; - } - else if (buf_len > protocol.package_max_length) - { - swoole_error_log(SW_LOG_WARNING, SW_ERROR_PACKAGE_LENGTH_TOO_LARGE, "packet[length=%d] is too big.", (int )buf_len); - return 0; - } - - if ((size_t) buf_len == buffer->length) - { - buffer->length = 0; - return buf_len; - } - else if ((size_t) buf_len < buffer->length) - { - buffer->length = buffer->length - buf_len; - memmove(buffer->str, buffer->str + buf_len, buffer->length); - goto _get_header_len; - } - - if ((size_t) buf_len >= buffer->size) - { - if (swString_extend(buffer, buf_len) < 0) - { - buffer->length = 0; - return -1; - } - } - - retval = recv_all(buffer->str + buffer->length, buf_len - buffer->length); - if (retval > 0) - { - buffer->length += retval; - if (buffer->length != (size_t) buf_len) - { - retval = 0; - } - else - { - buffer->length = 0; - return buf_len; - } - } - } - else if (open_eof_check) - { - int eof = -1; - char *buf; - - if (buffer->length > 0) - { - goto find_eof; - } - - while (1) - { - buf = buffer->str + buffer->length; - buf_len = buffer->size - buffer->length; - - if (buf_len > SW_BUFFER_SIZE_BIG) - { - buf_len = SW_BUFFER_SIZE_BIG; - } - - retval = recv(buf, buf_len); - if (retval < 0) - { - buffer->length = 0; - return -1; - } - else if (retval == 0) - { - buffer->length = 0; - return 0; - } - - buffer->length += retval; - - if (buffer->length < protocol.package_eof_len) - { - continue; - } - - find_eof: eof = swoole_strnpos(buffer->str, buffer->length, protocol.package_eof, protocol.package_eof_len); - if (eof >= 0) - { - eof += protocol.package_eof_len; - if (buffer->length > (uint32_t) eof) - { - buffer->length -= eof; - memmove(buffer->str, buffer->str + eof, buffer->length); - } - else - { - buffer->length = 0; - } - return eof; - } - else - { - if (buffer->length == protocol.package_max_length) - { - swWarn("no package eof"); - buffer->length = 0; - return -1; - } - else if (buffer->length == buffer->size) - { - if (buffer->size < protocol.package_max_length) - { - size_t new_size = buffer->size * 2; - if (new_size > protocol.package_max_length) - { - new_size = protocol.package_max_length; - } - if (swString_extend(buffer, new_size) < 0) - { - buffer->length = 0; - return -1; - } - } - } - } - } - buffer->length = 0; - } - else - { - return -1; - } - - return retval; -} - -swString* Socket::get_buffer() -{ - if (unlikely(buffer == nullptr)) - { - buffer = swString_new(SW_BUFFER_SIZE_STD); - } - return buffer; -} - -Socket::~Socket() -{ - if (!socket->closed) - { - close(); - } - if (socket->out_buffer) - { - swBuffer_free(socket->out_buffer); - socket->out_buffer = NULL; - } - if (socket->in_buffer) - { - swBuffer_free(socket->in_buffer); - socket->in_buffer = NULL; - } - if (buffer) - { - swString_free(buffer); - } - bzero(socket, sizeof(swConnection)); - socket->removed = 1; -} diff --git a/socket.h b/socket.h deleted file mode 100644 index 7a3d039..0000000 --- a/socket.h +++ /dev/null @@ -1,142 +0,0 @@ -#pragma once - -#include "swoole.h" -#include "connection.h" -#include "socks5.h" -#include <string> - -namespace swoole -{ -class Socket -{ -public: - Socket(enum swSocket_type type); - Socket(int _fd, Socket *sock); - ~Socket(); - bool connect(std::string host, int port, int flags = 0); - bool shutdown(int how); - bool close(); - ssize_t send(const void *__buf, size_t __n); - ssize_t peek(void *__buf, size_t __n); - ssize_t recv(void *__buf, size_t __n); - ssize_t recv_all(void *__buf, size_t __n); - ssize_t send_all(const void *__buf, size_t __n); - ssize_t recv_packet(); - Socket* accept(); - void resume(); - void yield(); - bool bind(std::string address, int port = 0); - std::string resolve(std::string host); - bool listen(int backlog = 0); - bool sendfile(char *filename, off_t offset, size_t length); - int sendto(char *address, int port, char *data, int len); - int recvfrom(void *__buf, size_t __n, char *address, int *port = nullptr); - swString* get_buffer(); - - void setTimeout(double timeout) - { - _timeout = timeout; - } - -#ifdef SW_USE_OPENSSL - bool ssl_handshake(); - int ssl_verify(bool allow_self_signed); -#endif - -protected: - inline void init() - { - _cid = 0; - suspending = false; - _timeout = 0; - _port = 0; - errCode = 0; - errMsg = nullptr; - timer = nullptr; - bind_port = 0; - _backlog = 0; - - http2 = 0; - shutdow_rw = 0; - shutdown_read = 0; - shutdown_write = 0; - open_length_check = 0; - open_eof_check = 0; - - socks5_proxy = nullptr; - http_proxy = nullptr; - - buffer = nullptr; - protocol = {0}; - - protocol.package_length_type = 'N'; - protocol.package_length_size = 4; - protocol.package_body_offset = 0; - protocol.package_max_length = SW_BUFFER_INPUT_SIZE; - -#ifdef SW_USE_OPENSSL - open_ssl = 0; - ssl_wait_handshake = 0; - ssl_context = NULL; - ssl_option = {0}; -#endif - } - - inline bool wait_events(int events) - { - if (reactor->add(reactor, socket->fd, SW_FD_CORO_SOCKET | events) < 0) - { - errCode = errno; - return false; - } - else - { - return true; - } - } - - bool socks5_handshake(); - bool http_proxy_handshake(); - -public: - swTimer_node *timer; - swReactor *reactor; - std::string _host; - std::string bind_address; - int bind_port; - int _port; - int _cid; - bool suspending; - swConnection *socket; - enum swSocket_type type; - int _sock_type; - int _sock_domain; - double _timeout; - int _backlog; - int errCode; - const char *errMsg; - uint32_t http2 :1; - uint32_t shutdow_rw :1; - uint32_t shutdown_read :1; - uint32_t shutdown_write :1; - /** - * one package: length check - */ - uint32_t open_length_check :1; - uint32_t open_eof_check :1; - - swProtocol protocol; - swString *buffer; - - struct _swSocks5 *socks5_proxy; - struct _http_proxy* http_proxy; - -#ifdef SW_USE_OPENSSL - uint8_t open_ssl :1; - uint8_t ssl_wait_handshake :1; - SSL_CTX *ssl_context; - swSSL_option ssl_option; -#endif -}; - -}; |