diff options
| author | Remi Collet <remi@remirepo.net> | 2018-08-31 14:27:54 +0200 | 
|---|---|---|
| committer | Remi Collet <remi@remirepo.net> | 2018-08-31 14:27:54 +0200 | 
| commit | bb8a1252879e4ccdf0c2b803e6d7b0a58b76038f (patch) | |
| tree | a9032cf9054398871b9eda1a6d91ef01d2a35c4a | |
| parent | 10ce346530a757323bd9ea0aeae3cac1de5cd960 (diff) | |
v4.1.1 (no change)
| -rw-r--r-- | PHPINFO | 2 | ||||
| -rw-r--r-- | REFLECTION | 4 | ||||
| -rw-r--r-- | channel.cc | 151 | ||||
| -rw-r--r-- | channel.h | 107 | ||||
| -rw-r--r-- | php-pecl-swoole4.spec | 13 | ||||
| -rw-r--r-- | socket.cc | 1441 | ||||
| -rw-r--r-- | socket.h | 142 | 
7 files changed, 7 insertions, 1853 deletions
| @@ -2,7 +2,7 @@  swoole  swoole support => enabled -Version => 4.1.0 +Version => 4.1.1  Author => Swoole Group[email: team@swoole.com]  coroutine => enabled  trace-log => enabled @@ -1,4 +1,4 @@ -Extension [ <persistent> extension #148 swoole version 4.1.0 ] { +Extension [ <persistent> extension #148 swoole version 4.1.1 ] {    - INI {      Entry [ swoole.enable_coroutine <ALL> ] @@ -69,7 +69,7 @@ Extension [ <persistent> extension #148 swoole version 4.1.0 ] {      Constant [ integer SWOOLE_DTLSv1_CLIENT_METHOD ] { 17 }      Constant [ integer SWOOLE_EVENT_READ ] { 512 }      Constant [ integer SWOOLE_EVENT_WRITE ] { 1024 } -    Constant [ string SWOOLE_VERSION ] { 4.1.0 } +    Constant [ string SWOOLE_VERSION ] { 4.1.1 }      Constant [ integer SWOOLE_ERROR_MALLOC_FAIL ] { 501 }      Constant [ integer SWOOLE_ERROR_SYSTEM_CALL_FAIL ] { 502 }      Constant [ integer SWOOLE_ERROR_PHP_FATAL_ERROR ] { 503 } diff --git a/channel.cc b/channel.cc deleted file mode 100644 index 88423fc..0000000 --- a/channel.cc +++ /dev/null @@ -1,151 +0,0 @@ -#include "channel.h" - -using namespace swoole; - -static void channel_defer_callback(void *data) -{ -    notify_msg_t *msg = (notify_msg_t*) data; -    coroutine_t *co = msg->chan->pop_coroutine(msg->type); -    coroutine_resume(co); -    delete msg; -} - -static void channel_pop_timeout(swTimer *timer, swTimer_node *tnode) -{ -    timeout_msg_t *msg = (timeout_msg_t *) tnode->data; -    msg->error = true; -    msg->timer = nullptr; -    msg->chan->remove(msg->co); -    coroutine_resume(msg->co); -} - -Channel::Channel(size_t _capacity) -{ -    capacity = _capacity; -    closed = false; -    notify_producer_count = 0; -    notify_consumer_count = 0; -} - -void Channel::yield(enum channel_op type) -{ -    int _cid = coroutine_get_current_cid(); -    if (_cid == -1) -    { -        swError("Socket::yield() must be called in the coroutine."); -    } -    coroutine_t *co = coroutine_get_by_id(_cid); -    if (type == PRODUCER) -    { -        producer_queue.push_back(co); -        swDebug("producer[%d]", coroutine_get_cid(co)); -    } -    else -    { -        consumer_queue.push_back(co); -        swDebug("consumer[%d]", coroutine_get_cid(co)); -    } -    coroutine_yield(co); -} - -void Channel::notify(enum channel_op type) -{ -    notify_msg_t *msg = new notify_msg_t; -    msg->chan = this; -    msg->type = type; -    if (type == PRODUCER) -    { -        notify_producer_count++; -    } -    else -    { -        notify_consumer_count++; -    } -    SwooleG.main_reactor->defer(SwooleG.main_reactor, channel_defer_callback, msg); -} - -void* Channel::pop(double timeout) -{ -    timeout_msg_t msg; -    msg.error = false; -    if (timeout > 0) -    { -        int msec = (int) (timeout * 1000); -        if (SwooleG.timer.fd == 0) -        { -            swTimer_init (msec); -        } -        msg.chan = this; -        msg.co = coroutine_get_by_id(coroutine_get_current_cid()); -        msg.timer = SwooleG.timer.add(&SwooleG.timer, msec, 0, &msg, channel_pop_timeout); -    } -    else -    { -        msg.timer = NULL; -    } -    if (is_empty() || consumer_queue.size() > 0) -    { -        yield(CONSUMER); -    } -    if (msg.error) -    { -        return nullptr; -    } -    if (msg.timer) -    { -        swTimer_del(&SwooleG.timer, msg.timer); -    } -    /** -     * pop data -     */ -    void *data = data_queue.front(); -    data_queue.pop(); -    /** -     * notify producer -     */ -    if (producer_queue.size() > 0 && notify_producer_count < producer_queue.size()) -    { -        notify(PRODUCER); -    } -    return data; -} - -bool Channel::push(void *data) -{ -    if (is_full() || producer_queue.size() > 0) -    { -        yield(PRODUCER); -    } -    /** -     * push data -     */ -    data_queue.push(data); -    swDebug("push data, count=%ld", length()); -    /** -     * notify consumer -     */ -    if (consumer_queue.size() > 0 && notify_consumer_count < consumer_queue.size()) -    { -        notify(CONSUMER); -    } -    return true; -} - -bool Channel::close() -{ -    if (closed) -    { -        return false; -    } -    swDebug("closed"); -    closed = true; -    while (producer_queue.size() > 0 && notify_producer_count < producer_queue.size()) -    { -        notify(PRODUCER); -    } -    while (consumer_queue.size() > 0 && notify_consumer_count < producer_queue.size()) -    { -        notify(CONSUMER); -    } -    return true; -} diff --git a/channel.h b/channel.h deleted file mode 100644 index ab4fb5f..0000000 --- a/channel.h +++ /dev/null @@ -1,107 +0,0 @@ -#pragma once - -#include "swoole.h" -#include "context.h" -#include "coroutine.h" -#include <string> -#include <iostream> -#include <list> -#include <queue> -#include <sys/stat.h> - -namespace swoole { - -enum channel_op -{ -    PRODUCER = 1, -    CONSUMER = 2, -}; - -class Channel; - -struct notify_msg_t -{ -    Channel *chan; -    enum channel_op type; -}; - -struct timeout_msg_t -{ -    Channel *chan; -    coroutine_t *co; -    bool error; -    swTimer_node *timer; -}; - -class Channel -{ -private: -    std::list<coroutine_t *> producer_queue; -    std::list<coroutine_t *> consumer_queue; -    std::queue<void *> data_queue; -    size_t capacity; -    uint32_t notify_producer_count; -    uint32_t notify_consumer_count; - -public: -    bool closed; -    inline bool is_empty() -    { -        return data_queue.size() == 0; -    } - -    inline bool is_full() -    { -        return data_queue.size() == capacity; -    } - -    inline size_t length() -    { -        return data_queue.size(); -    } - -    inline size_t consumer_num() -    { -        return consumer_queue.size(); -    } - -    inline size_t producer_num() -    { -        return producer_queue.size(); -    } - -    inline void remove(coroutine_t *co) -    { -        consumer_queue.remove(co); -    } - -    inline coroutine_t* pop_coroutine(enum channel_op type) -    { -        coroutine_t* co; -        if (type == PRODUCER) -        { -            co = producer_queue.front(); -            producer_queue.pop_front(); -            notify_producer_count--; -            swDebug("resume producer[%d]", coroutine_get_cid(co)); -        } -        else -        { -            co = consumer_queue.front(); -            consumer_queue.pop_front(); -            notify_consumer_count--; -            swDebug("resume consumer[%d]", coroutine_get_cid(co)); -        } -        return co; -    } - -    Channel(size_t _capacity); -    void yield(enum channel_op type); -    void notify(enum channel_op type); -    void* pop(double timeout = 0); -    bool push(void *data); -    bool close(); -}; - -}; - diff --git a/php-pecl-swoole4.spec b/php-pecl-swoole4.spec index 2ceed15..bb14709 100644 --- a/php-pecl-swoole4.spec +++ b/php-pecl-swoole4.spec @@ -36,17 +36,12 @@  Summary:        PHP's asynchronous concurrent distributed networking framework  Name:           %{?sub_prefix}php-pecl-%{pecl_name}4 -Version:        4.1.0 +Version:        4.1.1  Release:        1%{?dist}%{!?scl:%{!?nophptag:%(%{__php} -r 'echo ".".PHP_MAJOR_VERSION.".".PHP_MINOR_VERSION;')}}  License:        BSD  URL:            http://pecl.php.net/package/%{pecl_name}  Source0:        http://pecl.php.net/get/%{pecl_name}-%{version}.tgz -Source1:        https://raw.githubusercontent.com/swoole/swoole-src/master/include/socket.h -Source2:        https://raw.githubusercontent.com/swoole/swoole-src/master/include/channel.h -Source3:        https://raw.githubusercontent.com/swoole/swoole-src/master/src/coroutine/socket.cc -Source4:        https://raw.githubusercontent.com/swoole/swoole-src/master/src/coroutine/channel.cc -  %if 0%{?rhel} == 6  BuildRequires:  devtoolset-6-toolchain  %else @@ -170,9 +165,6 @@ sed -e '/examples/s/role="src"/role="doc"/' \  cd NTS -cp %{SOURCE1} %{SOURCE2} include/ -cp %{SOURCE3} %{SOURCE4} src/coroutine/ -  # Sanity check, really often broken  extver=$(sed -n '/#define PHP_SWOOLE_VERSION/{s/.* "//;s/".*$//;p}' php_swoole.h)  if test "x${extver}" != "x%{version}%{?prever:-%{prever}}"; then @@ -344,6 +336,9 @@ cd ../ZTS  %changelog +* Fri Aug 31 2018 Remi Collet <remi@remirepo.net> - 4.1.1-1 +- update to 4.1.1 (no change) +  * Fri Aug 31 2018 Remi Collet <remi@remirepo.net> - 4.1.0-1  - update to 4.1.0  - add dependency on brotli library (Fedora) diff --git a/socket.cc b/socket.cc deleted file mode 100644 index 7edb58f..0000000 --- a/socket.cc +++ /dev/null @@ -1,1441 +0,0 @@ -#include "socket.h" -#include "context.h" -#include "async.h" -#include "buffer.h" - -#include <string> -#include <iostream> -#include <sys/stat.h> - -using namespace swoole; -using namespace std; - -static int socket_onRead(swReactor *reactor, swEvent *event); -static int socket_onWrite(swReactor *reactor, swEvent *event); -static void socket_onTimeout(swTimer *timer, swTimer_node *tnode); -static void socket_onResolveCompleted(swAio_event *event); - -bool Socket::socks5_handshake() -{ -    swSocks5 *ctx = socks5_proxy; -    char *buf = ctx->buf; -    int n; - -    /** -     * handshake -     */ -    swSocks5_pack(buf, socks5_proxy->username == NULL ? 0x00 : 0x02); -    socks5_proxy->state = SW_SOCKS5_STATE_HANDSHAKE; -    if (send(buf, 3) <= 0) -    { -        return false; -    } -    n = recv(buf, sizeof(ctx->buf)); -    if (n <= 0) -    { -        return false; -    } -    uchar version = buf[0]; -    uchar method = buf[1]; -    if (version != SW_SOCKS5_VERSION_CODE) -    { -        swoole_error_log(SW_LOG_NOTICE, SW_ERROR_SOCKS5_UNSUPPORT_VERSION, "SOCKS version is not supported."); -        return SW_ERR; -    } -    if (method != ctx->method) -    { -        swoole_error_log(SW_LOG_NOTICE, SW_ERROR_SOCKS5_UNSUPPORT_METHOD, "SOCKS authentication method not supported."); -        return SW_ERR; -    } -    //authenticate request -    if (method == SW_SOCKS5_METHOD_AUTH) -    { -        buf[0] = 0x01; -        buf[1] = ctx->l_username; - -        buf += 2; -        memcpy(buf, ctx->username, ctx->l_username); -        buf += ctx->l_username; -        buf[0] = ctx->l_password; -        memcpy(buf + 1, ctx->password, ctx->l_password); - -        ctx->state = SW_SOCKS5_STATE_AUTH; - -        if (send(ctx->buf, ctx->l_username + ctx->l_password + 3) < 0) -        { -            return false; -        } - -        n = recv(buf, sizeof(ctx->buf)); -        if (n <= 0) -        { -            return false; -        } - -        uchar version = buf[0]; -        uchar status = buf[1]; -        if (version != 0x01) -        { -            swoole_error_log(SW_LOG_NOTICE, SW_ERROR_SOCKS5_UNSUPPORT_VERSION, "SOCKS version is not supported."); -            return false; -        } -        if (status != 0) -        { -            swoole_error_log(SW_LOG_NOTICE, SW_ERROR_SOCKS5_AUTH_FAILED, -                    "SOCKS username/password authentication failed."); -            return false; -        } -        goto send_connect_request; -    } -    //send connect request -    else -    { -        send_connect_request: buf[0] = SW_SOCKS5_VERSION_CODE; -        buf[1] = 0x01; -        buf[2] = 0x00; - -        ctx->state = SW_SOCKS5_STATE_CONNECT; - -        if (ctx->dns_tunnel) -        { -            buf[3] = 0x03; -            buf[4] = ctx->l_target_host; -            buf += 5; -            memcpy(buf, ctx->target_host, ctx->l_target_host); -            buf += ctx->l_target_host; -            *(uint16_t *) buf = htons(ctx->target_port); - -            if (send(ctx->buf, ctx->l_target_host + 7) < 0) -            { -                return false; -            } -        } -        else -        { -            buf[3] = 0x01; -            buf += 4; -            *(uint32_t *) buf = htons(ctx->l_target_host); -            buf += 4; -            *(uint16_t *) buf = htons(ctx->target_port); - -            if (send(ctx->buf, ctx->l_target_host + 7) < 0) -            { -                return false; -            } -        } - -        /** -         * response -         */ -        n = recv(buf, sizeof(ctx->buf)); -        if (n <= 0) -        { -            return false; -        } - -        uchar version = buf[0]; -        if (version != SW_SOCKS5_VERSION_CODE) -        { -            swoole_error_log(SW_LOG_NOTICE, SW_ERROR_SOCKS5_UNSUPPORT_VERSION, "SOCKS version is not supported."); -            return false; -        } -        uchar result = buf[1]; -#if 0 -        uchar reg = buf[2]; -        uchar type = buf[3]; -        uint32_t ip = *(uint32_t *) (buf + 4); -        uint16_t port = *(uint16_t *) (buf + 8); -#endif -        if (result == 0) -        { -            ctx->state = SW_SOCKS5_STATE_READY; -        } -        else -        { -            swoole_error_log(SW_LOG_NOTICE, SW_ERROR_SOCKS5_SERVER_ERROR, "Socks5 server error, reason :%s.", -                    swSocks5_strerror(result)); -        } -        return result; -    } -} - -bool Socket::http_proxy_handshake() -{ -#ifdef SW_USE_OPENSSL -    if (socket->ssl) -    { -        if (ssl_handshake() == false) -        { -            return false; -        } -    } -    else -    { -        return true; -    } -#else -    return true; -#endif - -    //CONNECT -    int n = snprintf(http_proxy->buf, sizeof(http_proxy->buf), "CONNECT %*s:%d HTTP/1.1\r\n\r\n", -            http_proxy->l_target_host, http_proxy->target_host, http_proxy->target_port); -    if (send(http_proxy->buf, n) <= 0) -    { -        return false; -    } - -    n = recv(http_proxy->buf, sizeof(http_proxy->buf)); -    if (n <= 0) -    { -        return false; -    } -    char *buf = http_proxy->buf; -    int len = n; -    int state = 0; -    char *p = buf; -    for (p = buf; p < buf + len; p++) -    { -        if (state == 0) -        { -            if (strncasecmp(p, "HTTP/1.1", 8) == 0 || strncasecmp(p, "HTTP/1.0", 8) == 0) -            { -                state = 1; -                p += 8; -            } -            else -            { -                break; -            } -        } -        else if (state == 1) -        { -            if (isspace(*p)) -            { -                continue; -            } -            else -            { -                if (strncasecmp(p, "200", 3) == 0) -                { -                    state = 2; -                    p += 3; -                } -                else -                { -                    break; -                } -            } -        } -        else if (state == 2) -        { -            if (isspace(*p)) -            { -                continue; -            } -            else -            { -                if (strncasecmp(p, "Connection established", sizeof("Connection established") - 1) == 0) -                { -                    return true; -                } -                else -                { -                    break; -                } -            } -        } -    } -    return false; -} - -static inline int socket_connect(int fd, struct sockaddr *addr, socklen_t len) -{ -    int retval; -    while (1) -    { -        retval = ::connect(fd, addr, len); -        if (retval < 0) -        { -            if (errno == EINTR) -            { -                continue; -            } -        } -        break; -    } -    return retval; -} - -Socket::Socket(enum swSocket_type _type) -{ -    type = _type; -    switch (type) -    { -    case SW_SOCK_TCP6: -        _sock_domain = AF_INET6; -        _sock_type = SOCK_STREAM; -        break; -    case SW_SOCK_UNIX_STREAM: -        _sock_domain = AF_UNIX; -        _sock_type = SOCK_STREAM; -        break; -    case SW_SOCK_UDP: -        _sock_domain = AF_INET; -        _sock_type = SOCK_DGRAM; -        break; -    case SW_SOCK_UDP6: -        _sock_domain = AF_INET6; -        _sock_type = SOCK_DGRAM; -        break; -    case SW_SOCK_UNIX_DGRAM: -        _sock_domain = AF_UNIX; -        _sock_type = SOCK_DGRAM; -        break; -    case SW_SOCK_TCP: -    default: -        _sock_domain = AF_INET; -        _sock_type = SOCK_STREAM; -        break; -    } - -#ifdef SOCK_CLOEXEC -    int sockfd = ::socket(_sock_domain, _sock_type | SOCK_CLOEXEC, 0); -#else -    int sockfd = ::socket(_sock_domain, _sock_type, 0); -#endif -    if (sockfd < 0) -    { -        swWarn("socket() failed. Error: %s[%d]", strerror(errno), errno); -        return; -    } - -    if (swIsMaster() && SwooleTG.type == SW_THREAD_REACTOR) -    { -        reactor = SwooleTG.reactor; -    } -    else -    { -        reactor = SwooleG.main_reactor; -    } -    socket = swReactor_get(reactor, sockfd); - -    bzero(socket, sizeof(swConnection)); -    socket->fd = sockfd; -    socket->object = this; -    socket->socket_type = type; - -    swSetNonBlock(socket->fd); -    if (!swReactor_handle_isset(reactor, SW_FD_CORO_SOCKET)) -    { -        reactor->setHandle(reactor, SW_FD_CORO_SOCKET | SW_EVENT_READ, socket_onRead); -        reactor->setHandle(reactor, SW_FD_CORO_SOCKET | SW_EVENT_WRITE, socket_onWrite); -        reactor->setHandle(reactor, SW_FD_CORO_SOCKET | SW_EVENT_ERROR, socket_onRead); -    } -    init(); -} - -Socket::Socket(int _fd, Socket *sock) -{ -    reactor = sock->reactor; - -    socket = swReactor_get(reactor, _fd); -    bzero(socket, sizeof(swConnection)); -    socket->fd = _fd; -    socket->object = this; -    socket->socket_type = sock->type; - -    _sock_domain = sock->_sock_domain; -    _sock_type = sock->_sock_type; -    init(); -} - -bool Socket::connect(string host, int port, int flags) -{ -    //enable socks5 proxy -    if (socks5_proxy) -    { -        socks5_proxy->target_host = (char *) host.c_str(); -        socks5_proxy->l_target_host = host.size(); -        socks5_proxy->target_port = port; - -        host = socks5_proxy->host; -        port = socks5_proxy->port; -    } - -    //enable http proxy -    if (http_proxy) -    { -        http_proxy->target_host = (char *) host.c_str(); -        http_proxy->l_target_host = host.size(); -        http_proxy->target_port = port; - -        host = http_proxy->proxy_host; -        port = http_proxy->proxy_port; -    } - -    if (_sock_domain == AF_INET6 || _sock_domain == AF_INET) -    { -        if (port == -1) -        { -            swWarn("Socket of type AF_INET/AF_INET6 requires port argument"); -            return false; -        } -        else if (port == 0 || port >= 65536) -        { -            swWarn("Invalid port argument[%d]", port); -            return false; -        } -    } - -    if (unlikely(_cid && _cid != coroutine_get_current_cid())) -    { -        swWarn( "socket has already been bound to another coroutine."); -        return false; -    } - -    int retval = 0; -    _host = host; -    _port = port; - -    for (int i = 0; i < 2; i++) -    { -        if (_sock_domain == AF_INET) -        { -            socket->info.addr.inet_v4.sin_family = AF_INET; -            socket->info.addr.inet_v4.sin_port = htons(port); - -            if (!inet_pton(AF_INET, _host.c_str(), & socket->info.addr.inet_v4.sin_addr)) -            { -                _host = resolve(_host); -                if (_host.size() == 0) -                { -                    return false; -                } -                continue; -            } -            else -            { -                socket->info.len = sizeof( socket->info.addr.inet_v4); -                retval = socket_connect(socket->fd, (struct sockaddr *) &socket->info.addr.inet_v4, socket->info.len); -                break; -            } -        } -        else if (_sock_domain == AF_INET6) -        { -            socket->info.addr.inet_v6.sin6_family = AF_INET6; -            socket->info.addr.inet_v6.sin6_port = htons(port); - -            if (!inet_pton(AF_INET6, _host.c_str(), &socket->info.addr.inet_v6.sin6_addr)) -            { -                _host = resolve(_host); -                if (_host.size() == 0) -                { -                    return false; -                } -                continue; -            } -            else -            { -                socket->info.len = sizeof(socket->info.addr.inet_v6); -                retval = socket_connect(socket->fd, (struct sockaddr *) &socket->info.addr.inet_v6, socket->info.len); -                break; -            } -        } -        else if (_sock_domain == AF_UNIX) -        { -            if (_host.size() >= sizeof(socket->info.addr.un.sun_path)) -            { -                return false; -            } -            socket->info.addr.un.sun_family = AF_UNIX; -            memcpy(&socket->info.addr.un.sun_path, _host.c_str(), _host.size()); -            retval = socket_connect(socket->fd, (struct sockaddr *) &socket->info.addr.un, -                    (socklen_t) (offsetof(struct sockaddr_un, sun_path) + _host.size())); -            break; -        } -        else -        { -            return false; -        } -    } - -    if (retval == -1) -    { -        if (errno != EINPROGRESS) -        { -            _error: errCode = errno; -            return false; -        } -        if (!wait_events(SW_EVENT_WRITE)) -        { -            goto _error; -        } -        yield(); -        //Connection has timed out -        if (errCode == ETIMEDOUT) -        { -            errMsg = strerror(errCode); -            return false; -        } -        socklen_t len = sizeof(errCode); -        if (getsockopt(socket->fd, SOL_SOCKET, SO_ERROR, &errCode, &len) < 0 || errCode != 0) -        { -            errMsg = strerror(errCode); -            return false; -        } -    } -    socket->active = 1; -    //socks5 proxy -    if (socks5_proxy && socks5_handshake() == false) -    { -        return false; -    } -    //http proxy -    if (http_proxy && http_proxy_handshake() == false) -    { -        return false; -    } -    return true; -} - -static void socket_onResolveCompleted(swAio_event *event) -{ -    Socket *sock = (Socket *) event->object; -    if (event->error != 0) -    { -        sock->errCode = SW_ERROR_DNSLOOKUP_RESOLVE_FAILED; -    } -    sock->resume(); -} - -static void socket_onTimeout(swTimer *timer, swTimer_node *tnode) -{ -    Socket *sock = (Socket *) tnode->data; -    sock->timer = NULL; -    sock->errCode = ETIMEDOUT; -    swDebug("socket[%d] timeout", sock->socket->fd); -    sock->reactor->del(sock->reactor, sock->socket->fd); -    sock->resume(); -} - -static int socket_onRead(swReactor *reactor, swEvent *event) -{ -    Socket *sock = (Socket *) event->socket->object; -    reactor->del(reactor, event->fd); -    sock->resume(); -    return SW_OK; -} - -static int socket_onWrite(swReactor *reactor, swEvent *event) -{ -    Socket *sock = (Socket *) event->socket->object; -    reactor->del(reactor, event->fd); -    sock->resume(); -    return SW_OK; -} - -ssize_t Socket::peek(void *__buf, size_t __n) -{ -    return swConnection_peek(socket, __buf, __n, 0); -} - -ssize_t Socket::recv(void *__buf, size_t __n) -{ -    ssize_t retval = swConnection_recv(socket, __buf, __n, 0); -    if (retval >= 0) -    { -        return retval; -    } - -    if (swConnection_error(errno) != SW_WAIT) -    { -        errCode = errno; -        return -1; -    } - -    while (true) -    { -        int events = SW_EVENT_READ; -#ifdef SW_USE_OPENSSL -        if (socket->ssl && socket->ssl_want_write) -        { -            events = SW_EVENT_WRITE; -        } -#endif -        if (!wait_events(events)) -        { -            return -1; -        } -        yield(); -        if (errCode == ETIMEDOUT) -        { -            return -1; -        } -        retval = swConnection_recv(socket, __buf, __n, 0); -        if (retval < 0) -        { -            if (swConnection_error(errno) == SW_WAIT) -            { -                continue; -            } -            errCode = errno; -        } -        break; -    } -    return retval; -} - -ssize_t Socket::recv_all(void *__buf, size_t __n) -{ -    ssize_t retval, total_bytes = 0; -    while (true) -    { -        retval = recv((char*) __buf + total_bytes, __n - total_bytes); -        if (retval <= 0) -        { -            if (total_bytes == 0) -            { -                total_bytes = retval; -            } -            break; -        } -        total_bytes += retval; -        if ((size_t) total_bytes == __n) -        { -            break; -        } -    } -    return total_bytes; -} - -ssize_t Socket::send_all(const void *__buf, size_t __n) -{ -    ssize_t retval, total_bytes = 0; -    while (true) -    { -        retval = send((char*) __buf + total_bytes, __n - total_bytes); -        if (retval <= 0) -        { -            if (total_bytes == 0) -            { -                total_bytes = retval; -            } -            break; -        } -        total_bytes += retval; -        if ((size_t) total_bytes == __n) -        { -            break; -        } -    } -    return total_bytes; -} - -ssize_t Socket::send(const void *__buf, size_t __n) -{ -    ssize_t retval = swConnection_send(socket, (void *) __buf, __n, 0); -    if (retval >= 0) -    { -        return retval; -    } - -    if (swConnection_error(errno) != SW_WAIT) -    { -        errCode = errno; -        return -1; -    } - -    while (true) -    { -        int events = SW_EVENT_WRITE; -#ifdef SW_USE_OPENSSL -        if (socket->ssl && socket->ssl_want_read) -        { -            events = SW_EVENT_READ; -        } -#endif -        if (!wait_events(events)) -        { -            return -1; -        } -        yield(); -        if (errCode == ETIMEDOUT) -        { -            return -1; -        } -        ssize_t retval = swConnection_send(socket, (void *) __buf, __n, 0); -        if (retval < 0) -        { -            if (swConnection_error(errno) == SW_WAIT) -            { -                continue; -            } -            errCode = errno; -        } -        break; -    } - -    return retval; -} - -void Socket::yield() -{ -    if (suspending) -    { -        swError("socket has already been bound to another coroutine."); -    } -    errCode = 0; -    if (_timeout > 0) -    { -        int ms = (int) (_timeout * 1000); -        if (SwooleG.timer.fd == 0) -        { -           swTimer_init(ms); -        } -        timer = SwooleG.timer.add(&SwooleG.timer, ms, 0, this, socket_onTimeout); -    } -    _cid = coroutine_get_current_cid(); -    if (_cid == -1) -    { -        swError("Socket::yield() must be called in the coroutine."); -    } -    //suspend -    suspending = true; -    coroutine_yield(coroutine_get_by_id(_cid)); -    suspending = false; -    //clear timer -    if (timer) -    { -        swTimer_del(&SwooleG.timer, timer); -        timer = nullptr; -    } -} - -void Socket::resume() -{ -    coroutine_resume(coroutine_get_by_id(_cid)); -} - -bool Socket::bind(std::string address, int port) -{ -    bind_address = address; -    bind_port = port; - -    struct sockaddr_storage sa_storage = { 0 }; -    struct sockaddr *sock_type = (struct sockaddr*) &sa_storage; - -    int retval; -    switch (_sock_domain) -    { -    case AF_UNIX: -    { -        struct sockaddr_un *sa = (struct sockaddr_un *) sock_type; -        sa->sun_family = AF_UNIX; - -        if (bind_address.size() >= sizeof(sa->sun_path)) -        { -            return false; -        } -        memcpy(&sa->sun_path, bind_address.c_str(), bind_address.size()); - -        retval = ::bind(socket->fd, (struct sockaddr *) sa, -        offsetof(struct sockaddr_un, sun_path) + bind_address.size()); -        break; -    } - -    case AF_INET: -    { -        struct sockaddr_in *sa = (struct sockaddr_in *) sock_type; -        sa->sin_family = AF_INET; -        sa->sin_port = htons((unsigned short) bind_port); -        if (!inet_aton(bind_address.c_str(), &sa->sin_addr)) -        { -            return false; -        } -        retval = ::bind(socket->fd, (struct sockaddr *) sa, sizeof(struct sockaddr_in)); -        break; -    } - -    case AF_INET6: -    { -        struct sockaddr_in6 *sa = (struct sockaddr_in6 *) sock_type; -        sa->sin6_family = AF_INET6; -        sa->sin6_port = htons((unsigned short) bind_port); - -        if (!inet_pton(AF_INET6, bind_address.c_str(), &sa->sin6_addr)) -        { -            return false; -        } -        retval = ::bind(socket->fd, (struct sockaddr *) sa, sizeof(struct sockaddr_in6)); -        break; -    } -    default: -        return false; -    } - -    if (retval != 0) -    { -        errCode = errno; -        return false; -    } - -    return true; -} - -bool Socket::listen(int backlog) -{ -    _backlog = backlog; -    if (::listen(socket->fd, backlog) != 0) -    { -        errCode = errno; -        return false; -    } -    return true; -} - -Socket* Socket::accept() -{ -    if (!wait_events(SW_EVENT_READ)) -    { -        return nullptr; -    } -    yield(); -    if (errCode == ETIMEDOUT) -    { -        return nullptr; -    } -    int conn; -    swSocketAddress client_addr; -    socklen_t client_addrlen = sizeof(client_addr); - -#ifdef HAVE_ACCEPT4 -    conn = ::accept4(socket->fd, (struct sockaddr *) &client_addr, &client_addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC); -#else -    conn = ::accept(socket->fd, (struct sockaddr *) &client_addr, &client_addrlen); -    if (conn >= 0) -    { -        swoole_fcntl_set_option(conn, 1, 1); -    } -#endif -    if (conn >= 0) -    { -        return new Socket(conn, this); -    } -    else -    { -        errCode = errno; -        return nullptr; -    } -} - -string Socket::resolve(string domain_name) -{ -    swAio_event ev; -    bzero(&ev, sizeof(swAio_event)); -    ev.nbytes = SW_IP_MAX_LENGTH; -    ev.buf = sw_malloc(ev.nbytes); -    if (!ev.buf) -    { -        errCode = errno; -        return ""; -    } - -    memcpy(ev.buf, domain_name.c_str(), domain_name.size()); -    ((char *) ev.buf)[domain_name.size()] = 0; -    ev.flags = _sock_domain; -    ev.type = SW_AIO_GETHOSTBYNAME; -    ev.object = this; -    ev.callback = socket_onResolveCompleted; - -    if (SwooleAIO.init == 0) -    { -        swAio_init(); -    } - -    if (swAio_dispatch(&ev) < 0) -    { -        errCode = SwooleG.error; -        sw_free(ev.buf); -        return ""; -    } -    /** -     * cannot timeout -     */ -    double tmp_timeout = _timeout; -    _timeout = -1; -    yield(); -    _timeout = tmp_timeout; - -    if (errCode == SW_ERROR_DNSLOOKUP_RESOLVE_FAILED) -    { -        errMsg = hstrerror(ev.error); -        return ""; -    } -    else -    { -        string addr((char *) ev.buf); -        sw_free(ev.buf); -        return addr; -    } -} - -bool Socket::shutdown(int __how) -{ -    if (!socket || socket->closed) -    { -        return false; -    } -    if (__how == SHUT_RD) -    { -        if (shutdown_read || shutdow_rw || ::shutdown(socket->fd, SHUT_RD)) -        { -            return false; -        } -        else -        { -            shutdown_read = 1; -            return true; -        } -    } -    else if (__how == SHUT_WR) -    { -        if (shutdown_write || shutdow_rw || ::shutdown(socket->fd, SHUT_RD) < 0) -        { -            return false; -        } -        else -        { -            shutdown_write = 1; -            return true; -        } -    } -    else if (__how == SHUT_RDWR) -    { -        if (shutdow_rw || ::shutdown(socket->fd, SHUT_RDWR) < 0) -        { -            return false; -        } -        else -        { -            shutdown_read = 1; -            return true; -        } -    } -    else -    { -        return false; -    } -} - -bool Socket::close() -{ -    if (socket == NULL || socket->closed) -    { -        return false; -    } -    socket->closed = 1; - -    int fd = socket->fd; - -    if (_sock_type == SW_SOCK_UNIX_DGRAM) -    { -        unlink(socket->info.addr.un.sun_path); -    } - -#ifdef SW_USE_OPENSSL -    if (open_ssl && ssl_context) -    { -        if (socket->ssl) -        { -            swSSL_close(socket); -        } -        swSSL_free_context(ssl_context); -        if (ssl_option.cert_file) -        { -            sw_free(ssl_option.cert_file); -        } -        if (ssl_option.key_file) -        { -            sw_free(ssl_option.key_file); -        } -        if (ssl_option.passphrase) -        { -            sw_free(ssl_option.passphrase); -        } -#ifdef SSL_CTRL_SET_TLSEXT_HOSTNAME -        if (ssl_option.tls_host_name) -        { -            sw_free(ssl_option.tls_host_name); -        } -#endif -        if (ssl_option.cafile) -        { -            sw_free(ssl_option.cafile); -        } -        if (ssl_option.capath) -        { -            sw_free(ssl_option.capath); -        } -    } -#endif -    if (_sock_type == SW_SOCK_UNIX_DGRAM) -    { -        unlink(socket->info.addr.un.sun_path); -    } -    if (timer) -    { -        swTimer_del(&SwooleG.timer, timer); -        timer = NULL; -    } -    socket->active = 0; -    ::close(fd); -    return true; -} - -#ifdef SW_USE_OPENSSL -bool Socket::ssl_handshake() -{ -    if (socket->ssl) -    { -        return false; -    } - -    ssl_context = swSSL_get_context(&ssl_option); -    if (ssl_context == NULL) -    { -        return false; -    } - -    if (ssl_option.verify_peer) -    { -        if (swSSL_set_capath(&ssl_option, ssl_context) < 0) -        { -            return false; -        } -    } - -    socket->ssl_send = 1; -#if defined(SW_USE_HTTP2) && defined(SW_USE_OPENSSL) && OPENSSL_VERSION_NUMBER >= 0x10002000L -    if (http2) -    { -        if (SSL_CTX_set_alpn_protos(ssl_context, (const unsigned char *) "\x02h2", 3) < 0) -        { -            return false; -        } -    } -#endif - -    if (swSSL_create(socket, ssl_context, SW_SSL_CLIENT) < 0) -    { -        return false; -    } -#ifdef SSL_CTRL_SET_TLSEXT_HOSTNAME -    if (ssl_option.tls_host_name) -    { -        SSL_set_tlsext_host_name(socket->ssl, ssl_option.tls_host_name); -    } -#endif - -    while (true) -    { -        int retval = swSSL_connect(socket); -        if (retval < 0) -        { -            errCode = SwooleG.error; -            return false; -        } -        if (socket->ssl_state == SW_SSL_STATE_WAIT_STREAM) -        { -            int events = socket->ssl_want_write ? SW_EVENT_WRITE : SW_EVENT_READ; -            if (!wait_events(events)) -            { -                return false; -            } -            yield(); -            if (errCode == ETIMEDOUT) -            { -                return false; -            } -        } -        else if (socket->ssl_state == SW_SSL_STATE_READY) -        { -            return true; -        } -    } - -    if (socket->ssl_state == SW_SSL_STATE_READY && ssl_option.verify_peer) -    { -        if (ssl_verify(ssl_option.allow_self_signed) < 0) -        { -            return false; -        } -    } -    return true; -} - -int Socket::ssl_verify(bool allow_self_signed) -{ -    if (swSSL_verify(socket, allow_self_signed) < 0) -    { -        return SW_ERR; -    } -    if (ssl_option.tls_host_name && swSSL_check_host(socket, ssl_option.tls_host_name) < 0) -    { -        return SW_ERR; -    } -    return SW_OK; -} -#endif - -bool Socket::sendfile(char *filename, off_t offset, size_t length) -{ -    int file_fd = open(filename, O_RDONLY); -    if (file_fd < 0) -    { -        swSysError("open(%s) failed.", filename); -        return false; -    } - -    if (length == 0) -    { -        struct stat file_stat; -        if (::fstat(file_fd, &file_stat) < 0) -        { -            swSysError("fstat(%s) failed.", filename); -            ::close(file_fd); -            return false; -        } -        length = file_stat.st_size; -    } -    else -    { -        // total length of the file -        length = offset + length; -    } - -    int n, sendn; -    while ((size_t) offset < length) -    { -        sendn = (length - offset > SW_SENDFILE_CHUNK_SIZE) ? SW_SENDFILE_CHUNK_SIZE : length - offset; -#ifdef SW_USE_OPENSSL -        if (socket->ssl) -        { -            n = swSSL_sendfile(socket, file_fd, &offset, sendn); -        } -        else -#endif -        { -            n = ::swoole_sendfile(socket->fd, file_fd, &offset, sendn); -        } -        if (n > 0) -        { -            continue; -        } -        else if (n == 0) -        { -            swWarn("sendfile return zero."); -            return false; -        } -        else if (errno != EAGAIN) -        { -            swSysError("sendfile(%d, %s) failed.", socket->fd, filename); -            _error: errCode = errno; -            ::close(file_fd); -            return false; -        } -        if (!wait_events(SW_EVENT_WRITE)) -        { -            goto _error; -        } -        yield(); -        if (errCode == ETIMEDOUT) -        { -            goto _error; -        } -    } -    ::close(file_fd); -    return true; -} - -int Socket::sendto(char *address, int port, char *data, int len) -{ -    if (type == SW_SOCK_UDP) -    { -        return swSocket_udp_sendto(socket->fd, address, port, data, len); -    } -    else if (type == SW_SOCK_UDP6) -    { -        return swSocket_udp_sendto6(socket->fd, address, port, data, len); -    } -    else -    { -        swWarn("only supports SWOOLE_SOCK_UDP or SWOOLE_SOCK_UDP6."); -        return -1; -    } -} - -int Socket::recvfrom(void *__buf, size_t __n, char *address, int *port) -{ -    socket->info.len = sizeof(socket->info.addr); -    int retval; - -    _recv: retval = ::recvfrom(socket->fd, __buf, __n, 0, (struct sockaddr *) &socket->info.addr, &socket->info.len); -    if (retval < 0) -    { -        if (errno == EINTR) -        { -            goto _recv; -        } -        else if (swConnection_error(errno) == SW_WAIT) -        { -            if (!wait_events(SW_EVENT_READ)) -            { -                return -1; -            } -            yield(); -            if (errCode == ETIMEDOUT) -            { -                return -1; -            } -            retval = ::recvfrom(socket->fd, __buf, __n, 0, (struct sockaddr *) &socket->info.addr, &socket->info.len); -            if (retval < 0) -            { -                errCode = errno; -            } -            else -            { -                strcpy(address, swConnection_get_ip(socket)); -                *port = swConnection_get_port(socket); -            } -        } -        else -        { -            errCode = errno; -        } -    } -    return retval; -} - -/** - * recv packet with protocol - */ -ssize_t Socket::recv_packet() -{ -    get_buffer(); -    ssize_t buf_len = SW_BUFFER_SIZE_STD; -    ssize_t retval; - -    if (open_length_check) -    { -        uint32_t header_len; - -        _get_header_len: header_len = protocol.package_length_offset + protocol.package_length_size; -        if (buffer->length > 0) -        { -            if (buffer->length < header_len) -            { -                goto _recv_header; -            } -            else -            { -                goto _get_length; -            } -        } - -        _recv_header: retval = recv(buffer->str + buffer->length, header_len - buffer->length); -        if (retval <= 0) -        { -            return 0; -        } -        else if (retval < 0 || retval != header_len) -        { -            return 0; -        } -        else -        { -            buffer->length += retval; -        } - -        _get_length: buf_len = protocol.get_package_length(&protocol, socket, buffer->str, (uint32_t) buffer->length); -        swDebug("packet_len=%ld, length=%ld", buf_len, buffer->length); -        //error package -        if (buf_len < 0) -        { -            return 0; -        } -        else if (buf_len == 0) -        { -            header_len = protocol.real_header_length; -            goto _recv_header; -        } -        //empty package -        else if (buf_len == header_len) -        { -            buffer->length = 0; -            return header_len; -        } -        else if (buf_len > protocol.package_max_length) -        { -            swoole_error_log(SW_LOG_WARNING, SW_ERROR_PACKAGE_LENGTH_TOO_LARGE, "packet[length=%d] is too big.", (int )buf_len); -            return 0; -        } - -        if ((size_t) buf_len == buffer->length) -        { -            buffer->length = 0; -            return buf_len; -        } -        else if ((size_t) buf_len < buffer->length) -        { -            buffer->length = buffer->length - buf_len; -            memmove(buffer->str, buffer->str + buf_len, buffer->length); -            goto _get_header_len; -        } - -        if ((size_t) buf_len >= buffer->size) -        { -            if (swString_extend(buffer, buf_len) < 0) -            { -                buffer->length = 0; -                return -1; -            } -        } - -        retval = recv_all(buffer->str + buffer->length, buf_len - buffer->length); -        if (retval > 0) -        { -            buffer->length += retval; -            if (buffer->length != (size_t) buf_len) -            { -                retval = 0; -            } -            else -            { -                buffer->length = 0; -                return buf_len; -            } -        } -    } -    else if (open_eof_check) -    { -        int eof = -1; -        char *buf; - -        if (buffer->length > 0) -        { -            goto find_eof; -        } - -        while (1) -        { -            buf = buffer->str + buffer->length; -            buf_len = buffer->size - buffer->length; - -            if (buf_len > SW_BUFFER_SIZE_BIG) -            { -                buf_len = SW_BUFFER_SIZE_BIG; -            } - -            retval = recv(buf, buf_len); -            if (retval < 0) -            { -                buffer->length = 0; -                return -1; -            } -            else if (retval == 0) -            { -                buffer->length = 0; -                return 0; -            } - -            buffer->length += retval; - -            if (buffer->length < protocol.package_eof_len) -            { -                continue; -            } - -            find_eof: eof = swoole_strnpos(buffer->str, buffer->length, protocol.package_eof, protocol.package_eof_len); -            if (eof >= 0) -            { -                eof += protocol.package_eof_len; -                if (buffer->length > (uint32_t) eof) -                { -                    buffer->length -= eof; -                    memmove(buffer->str, buffer->str + eof, buffer->length); -                } -                else -                { -                    buffer->length = 0; -                } -                return eof; -            } -            else -            { -                if (buffer->length == protocol.package_max_length) -                { -                    swWarn("no package eof"); -                    buffer->length = 0; -                    return -1; -                } -                else if (buffer->length == buffer->size) -                { -                    if (buffer->size < protocol.package_max_length) -                    { -                        size_t new_size = buffer->size * 2; -                        if (new_size > protocol.package_max_length) -                        { -                            new_size = protocol.package_max_length; -                        } -                        if (swString_extend(buffer, new_size) < 0) -                        { -                            buffer->length = 0; -                            return -1; -                        } -                    } -                } -            } -        } -        buffer->length = 0; -    } -    else -    { -        return -1; -    } - -    return retval; -} - -swString* Socket::get_buffer() -{ -    if (unlikely(buffer == nullptr)) -    { -        buffer = swString_new(SW_BUFFER_SIZE_STD); -    } -    return buffer; -} - -Socket::~Socket() -{ -    if (!socket->closed) -    { -        close(); -    } -    if (socket->out_buffer) -    { -        swBuffer_free(socket->out_buffer); -        socket->out_buffer = NULL; -    } -    if (socket->in_buffer) -    { -        swBuffer_free(socket->in_buffer); -        socket->in_buffer = NULL; -    } -    if (buffer) -    { -        swString_free(buffer); -    } -    bzero(socket, sizeof(swConnection)); -    socket->removed = 1; -} diff --git a/socket.h b/socket.h deleted file mode 100644 index 7a3d039..0000000 --- a/socket.h +++ /dev/null @@ -1,142 +0,0 @@ -#pragma once - -#include "swoole.h" -#include "connection.h" -#include "socks5.h" -#include <string> - -namespace swoole -{ -class Socket -{ -public: -    Socket(enum swSocket_type type); -    Socket(int _fd, Socket *sock); -    ~Socket(); -    bool connect(std::string host, int port, int flags = 0); -    bool shutdown(int how); -    bool close(); -    ssize_t send(const void *__buf, size_t __n); -    ssize_t peek(void *__buf, size_t __n); -    ssize_t recv(void *__buf, size_t __n); -    ssize_t recv_all(void *__buf, size_t __n); -    ssize_t send_all(const void *__buf, size_t __n); -    ssize_t recv_packet(); -    Socket* accept(); -    void resume(); -    void yield(); -    bool bind(std::string address, int port = 0); -    std::string resolve(std::string host); -    bool listen(int backlog = 0); -    bool sendfile(char *filename, off_t offset, size_t length); -    int sendto(char *address, int port, char *data, int len); -    int recvfrom(void *__buf, size_t __n, char *address, int *port = nullptr); -    swString* get_buffer(); - -    void setTimeout(double timeout) -    { -        _timeout = timeout; -    } - -#ifdef SW_USE_OPENSSL -    bool ssl_handshake(); -    int ssl_verify(bool allow_self_signed); -#endif - -protected: -    inline void init() -    { -        _cid = 0; -        suspending = false; -        _timeout = 0; -        _port = 0; -        errCode = 0; -        errMsg = nullptr; -        timer = nullptr; -        bind_port = 0; -        _backlog = 0; - -        http2 = 0; -        shutdow_rw = 0; -        shutdown_read = 0; -        shutdown_write = 0; -        open_length_check = 0; -        open_eof_check = 0; - -        socks5_proxy = nullptr; -        http_proxy = nullptr; - -        buffer = nullptr; -        protocol = {0}; - -        protocol.package_length_type = 'N'; -        protocol.package_length_size = 4; -        protocol.package_body_offset = 0; -        protocol.package_max_length = SW_BUFFER_INPUT_SIZE; - -#ifdef SW_USE_OPENSSL -        open_ssl = 0; -        ssl_wait_handshake = 0; -        ssl_context = NULL; -        ssl_option = {0}; -#endif -    } - -    inline bool wait_events(int events) -    { -        if (reactor->add(reactor, socket->fd, SW_FD_CORO_SOCKET | events) < 0) -        { -            errCode = errno; -            return false; -        } -        else -        { -            return true; -        } -    } - -    bool socks5_handshake(); -    bool http_proxy_handshake(); - -public: -    swTimer_node *timer; -    swReactor *reactor; -    std::string _host; -    std::string bind_address; -    int bind_port; -    int _port; -    int _cid; -    bool suspending; -    swConnection *socket; -    enum swSocket_type type; -    int _sock_type; -    int _sock_domain; -    double _timeout; -    int _backlog; -    int errCode; -    const char *errMsg; -    uint32_t http2 :1; -    uint32_t shutdow_rw :1; -    uint32_t shutdown_read :1; -    uint32_t shutdown_write :1; -    /** -     * one package: length check -     */ -    uint32_t open_length_check :1; -    uint32_t open_eof_check :1; - -    swProtocol protocol; -    swString *buffer; - -    struct _swSocks5 *socks5_proxy; -    struct _http_proxy* http_proxy; - -#ifdef SW_USE_OPENSSL -    uint8_t open_ssl :1; -    uint8_t ssl_wait_handshake :1; -    SSL_CTX *ssl_context; -    swSSL_option ssl_option; -#endif -}; - -}; | 
