diff options
| -rw-r--r-- | PHPINFO | 10 | ||||
| -rw-r--r-- | REFLECTION | 272 | ||||
| -rw-r--r-- | channel.cc | 151 | ||||
| -rw-r--r-- | channel.h | 107 | ||||
| -rw-r--r-- | php-pecl-swoole4.spec | 43 | ||||
| -rw-r--r-- | socket.cc | 1441 | ||||
| -rw-r--r-- | socket.h | 142 | 
7 files changed, 2138 insertions, 28 deletions
| @@ -2,29 +2,31 @@  swoole  swoole support => enabled -Version => 4.0.4 +Version => 4.1.0  Author => Swoole Group[email: team@swoole.com]  coroutine => enabled +trace-log => enabled  epoll => enabled  eventfd => enabled  signalfd => enabled  cpu affinity => enabled  spinlock => enabled  rwlock => enabled -async postgresql => enabled -async redis client => enabled -async http/websocket client => enabled  sockets => enabled  openssl => enabled  http2 => enabled  pcre => enabled  zlib => enabled +brotli => enabled  mutex_timedlock => enabled  pthread_barrier => enabled  futex => enabled  mysqlnd => enabled +redis client => enabled +postgresql client => enabled  Directive => Local Value => Master Value +swoole.enable_coroutine => On => On  swoole.aio_thread_num => 2 => 2  swoole.display_errors => On => On  swoole.use_namespace => On => On @@ -1,6 +1,9 @@ -Extension [ <persistent> extension #147 swoole version 4.0.4 ] { +Extension [ <persistent> extension #148 swoole version 4.1.0 ] {    - INI { +    Entry [ swoole.enable_coroutine <ALL> ] +      Current = 'On' +    }      Entry [ swoole.aio_thread_num <ALL> ]        Current = '2'      } @@ -21,7 +24,7 @@ Extension [ <persistent> extension #147 swoole version 4.0.4 ] {      }    } -  - Constants [164] { +  - Constants [192] {      Constant [ integer SWOOLE_BASE ] { 4 }      Constant [ integer SWOOLE_THREAD ] { 2 }      Constant [ integer SWOOLE_PROCESS ] { 3 } @@ -66,7 +69,7 @@ Extension [ <persistent> extension #147 swoole version 4.0.4 ] {      Constant [ integer SWOOLE_DTLSv1_CLIENT_METHOD ] { 17 }      Constant [ integer SWOOLE_EVENT_READ ] { 512 }      Constant [ integer SWOOLE_EVENT_WRITE ] { 1024 } -    Constant [ string SWOOLE_VERSION ] { 4.0.4 } +    Constant [ string SWOOLE_VERSION ] { 4.1.0 }      Constant [ integer SWOOLE_ERROR_MALLOC_FAIL ] { 501 }      Constant [ integer SWOOLE_ERROR_SYSTEM_CALL_FAIL ] { 502 }      Constant [ integer SWOOLE_ERROR_PHP_FATAL_ERROR ] { 503 } @@ -156,6 +159,8 @@ Extension [ <persistent> extension #147 swoole version 4.0.4 ] {      Constant [ integer SW_PGSQL_ASSOC ] { 1 }      Constant [ integer SW_PGSQL_NUM ] { 2 }      Constant [ integer SW_PGSQL_BOTH ] { 3 } +    Constant [ integer SWOOLE_EXIT_IN_COROUTINE ] { 2 } +    Constant [ integer SWOOLE_EXIT_IN_SERVER ] { 4 }      Constant [ integer SWOOLE_AIO_BASE ] { 0 }      Constant [ integer SWOOLE_AIO_LINUX ] { 0 }      Constant [ integer SWOOLE_FILELOCK ] { 2 } @@ -163,13 +168,39 @@ Extension [ <persistent> extension #147 swoole version 4.0.4 ] {      Constant [ integer SWOOLE_SEM ] { 4 }      Constant [ integer SWOOLE_RWLOCK ] { 1 }      Constant [ integer SWOOLE_SPINLOCK ] { 5 } -    Constant [ integer WEBSOCKET_OPCODE_TEXT ] { 1 } -    Constant [ integer WEBSOCKET_OPCODE_BINARY ] { 2 } -    Constant [ integer WEBSOCKET_OPCODE_PING ] { 9 }      Constant [ integer WEBSOCKET_STATUS_CONNECTION ] { 1 }      Constant [ integer WEBSOCKET_STATUS_HANDSHAKE ] { 2 }      Constant [ integer WEBSOCKET_STATUS_FRAME ] { 3 }      Constant [ integer WEBSOCKET_STATUS_ACTIVE ] { 3 } +    Constant [ integer WEBSOCKET_STATUS_CLOSING ] { 4 } +    Constant [ integer WEBSOCKET_OPCODE_CONTINUATION ] { 0 } +    Constant [ integer WEBSOCKET_OPCODE_TEXT ] { 1 } +    Constant [ integer WEBSOCKET_OPCODE_BINARY ] { 2 } +    Constant [ integer WEBSOCKET_OPCODE_CLOSE ] { 8 } +    Constant [ integer WEBSOCKET_OPCODE_PING ] { 9 } +    Constant [ integer WEBSOCKET_OPCODE_PONG ] { 10 } +    Constant [ integer WEBSOCKET_CLOSE_NORMAL ] { 1000 } +    Constant [ integer WEBSOCKET_CLOSE_GOING_AWAY ] { 1001 } +    Constant [ integer WEBSOCKET_CLOSE_PROTOCOL_ERROR ] { 1002 } +    Constant [ integer WEBSOCKET_CLOSE_DATA_ERROR ] { 1003 } +    Constant [ integer WEBSOCKET_CLOSE_STATUS_ERROR ] { 1005 } +    Constant [ integer WEBSOCKET_CLOSE_ABNORMAL ] { 1006 } +    Constant [ integer WEBSOCKET_CLOSE_MESSAGE_ERROR ] { 1007 } +    Constant [ integer WEBSOCKET_CLOSE_POLICY_ERROR ] { 1008 } +    Constant [ integer WEBSOCKET_CLOSE_MESSAGE_TOO_BIG ] { 1009 } +    Constant [ integer WEBSOCKET_CLOSE_EXTENSION_MISSING ] { 1010 } +    Constant [ integer WEBSOCKET_CLOSE_SERVER_ERROR ] { 1011 } +    Constant [ integer WEBSOCKET_CLOSE_TLS ] { 1015 } +    Constant [ integer SWOOLE_HTTP2_TYPE_DATA ] { 0 } +    Constant [ integer SWOOLE_HTTP2_TYPE_HEADERS ] { 1 } +    Constant [ integer SWOOLE_HTTP2_TYPE_PRIORITY ] { 2 } +    Constant [ integer SWOOLE_HTTP2_TYPE_RST_STREAM ] { 3 } +    Constant [ integer SWOOLE_HTTP2_TYPE_SETTINGS ] { 4 } +    Constant [ integer SWOOLE_HTTP2_TYPE_PUSH_PROMISE ] { 5 } +    Constant [ integer SWOOLE_HTTP2_TYPE_PING ] { 6 } +    Constant [ integer SWOOLE_HTTP2_TYPE_GOAWAY ] { 7 } +    Constant [ integer SWOOLE_HTTP2_TYPE_WINDOW_UPDATE ] { 8 } +    Constant [ integer SWOOLE_HTTP2_TYPE_CONTINUATION ] { 9 }      Constant [ integer SWOOLE_HTTP2_ERROR_NO_ERROR ] { 0 }      Constant [ integer SWOOLE_HTTP2_ERROR_PROTOCOL_ERROR ] { 1 }      Constant [ integer SWOOLE_HTTP2_ERROR_INTERNAL_ERROR ] { 2 } @@ -428,7 +459,7 @@ Extension [ <persistent> extension #147 swoole version 4.0.4 ] {      }    } -  - Classes [48] { +  - Classes [51] {      Class [ <internal:swoole> class Swoole\Server ] {        - Constants [0] { @@ -995,7 +1026,7 @@ Extension [ <persistent> extension #147 swoole version 4.0.4 ] {        }      } -    Class [ <internal:swoole> <iterateable> class Swoole\Connection\Iterator implements Iterator, Traversable, Countable, ArrayAccess ] { +    Class [ <internal:swoole> <iterateable> class Swoole\Connection\Iterator implements Iterator, Traversable, ArrayAccess, Countable ] {        - Constants [0] {        } @@ -1591,16 +1622,15 @@ Extension [ <persistent> extension #147 swoole version 4.0.4 ] {        - Static methods [0] {        } -      - Properties [6] { +      - Properties [5] {          Property [ <default> public $errCode ]          Property [ <default> public $sock ]          Property [ <default> public $type ] -        Property [ <default> public $id ]          Property [ <default> public $setting ]          Property [ <default> public $connected ]        } -      - Methods [17] { +      - Methods [18] {          Method [ <internal:swoole, ctor> public method __construct ] {            - Parameters [1] { @@ -1646,9 +1676,8 @@ Extension [ <persistent> extension #147 swoole version 4.0.4 ] {          Method [ <internal:swoole> public method send ] { -          - Parameters [2] { +          - Parameters [1] {              Parameter #0 [ <required> $data ] -            Parameter #1 [ <optional> $flag ]            }          } @@ -1664,12 +1693,21 @@ Extension [ <persistent> extension #147 swoole version 4.0.4 ] {          Method [ <internal:swoole> public method sendto ] {            - Parameters [3] { -            Parameter #0 [ <required> $ip ] +            Parameter #0 [ <required> $address ]              Parameter #1 [ <required> $port ]              Parameter #2 [ <required> $data ]            }          } +        Method [ <internal:swoole> public method recvfrom ] { + +          - Parameters [3] { +            Parameter #0 [ <required> $length ] +            Parameter #1 [ <required> &$address ] +            Parameter #2 [ <optional> &$port ] +          } +        } +          Method [ <internal:swoole> public method enableSSL ] {            - Parameters [0] { @@ -2801,7 +2839,7 @@ Extension [ <persistent> extension #147 swoole version 4.0.4 ] {          Property [ <default> public $body ]        } -      - Methods [19] { +      - Methods [20] {          Method [ <internal:swoole, ctor> public method __construct ] {            - Parameters [3] { @@ -2902,6 +2940,16 @@ Extension [ <persistent> extension #147 swoole version 4.0.4 ] {            }          } +        Method [ <internal:swoole> public method addData ] { + +          - Parameters [4] { +            Parameter #0 [ <required> $path ] +            Parameter #1 [ <required> $name ] +            Parameter #2 [ <optional> $type ] +            Parameter #3 [ <optional> $filename ] +          } +        } +          Method [ <internal:swoole> public method isConnected ] {            - Parameters [0] { @@ -2953,7 +3001,7 @@ Extension [ <persistent> extension #147 swoole version 4.0.4 ] {        - Static properties [0] {        } -      - Static methods [16] { +      - Static methods [18] {          Method [ <internal:swoole> static public method create ] {            - Parameters [1] { @@ -3071,6 +3119,21 @@ Extension [ <persistent> extension #147 swoole version 4.0.4 ] {              Parameter #4 [ <optional> $service ]            }          } + +        Method [ <internal:swoole> static public method getBackTrace ] { + +          - Parameters [3] { +            Parameter #0 [ <required> $cid ] +            Parameter #1 [ <optional> $options ] +            Parameter #2 [ <optional> $limit ] +          } +        } + +        Method [ <internal:swoole> static public method listCoroutines ] { + +          - Parameters [0] { +          } +        }        }        - Properties [0] { @@ -3080,6 +3143,134 @@ Extension [ <persistent> extension #147 swoole version 4.0.4 ] {        }      } +    Class [ <internal:swoole> <iterateable> class Swoole\Coroutine\Iterator implements Iterator, Traversable, Countable ] { + +      - Constants [0] { +      } + +      - Static properties [0] { +      } + +      - Static methods [0] { +      } + +      - Properties [0] { +      } + +      - Methods [7] { +        Method [ <internal:swoole, prototype Iterator> public method rewind ] { + +          - Parameters [0] { +          } +        } + +        Method [ <internal:swoole, prototype Iterator> public method next ] { + +          - Parameters [0] { +          } +        } + +        Method [ <internal:swoole, prototype Iterator> public method current ] { + +          - Parameters [0] { +          } +        } + +        Method [ <internal:swoole, prototype Iterator> public method key ] { + +          - Parameters [0] { +          } +        } + +        Method [ <internal:swoole, prototype Iterator> public method valid ] { + +          - Parameters [0] { +          } +        } + +        Method [ <internal:swoole, prototype Countable> public method count ] { + +          - Parameters [0] { +          } +        } + +        Method [ <internal:swoole, dtor> public method __destruct ] { + +          - Parameters [0] { +          } +        } +      } +    } + +    Class [ <internal:swoole> class Swoole\ExitException extends Exception implements Throwable ] { + +      - Constants [0] { +      } + +      - Static properties [0] { +      } + +      - Static methods [0] { +      } + +      - Properties [4] { +        Property [ <default> protected $message ] +        Property [ <default> protected $code ] +        Property [ <default> protected $file ] +        Property [ <default> protected $line ] +      } + +      - Methods [12] { +        Method [ <internal:swoole> public method getFlags ] { + +          - Parameters [0] { +          } +        } + +        Method [ <internal:swoole> public method getStatus ] { + +          - Parameters [0] { +          } +        } + +        Method [ <internal:Core, inherits Exception, ctor> public method __construct ] { + +          - Parameters [3] { +            Parameter #0 [ <optional> $message ] +            Parameter #1 [ <optional> $code ] +            Parameter #2 [ <optional> $previous ] +          } +        } + +        Method [ <internal:Core, inherits Exception> public method __wakeup ] { +        } + +        Method [ <internal:Core, inherits Exception, prototype Throwable> final public method getMessage ] { +        } + +        Method [ <internal:Core, inherits Exception, prototype Throwable> final public method getCode ] { +        } + +        Method [ <internal:Core, inherits Exception, prototype Throwable> final public method getFile ] { +        } + +        Method [ <internal:Core, inherits Exception, prototype Throwable> final public method getLine ] { +        } + +        Method [ <internal:Core, inherits Exception, prototype Throwable> final public method getTrace ] { +        } + +        Method [ <internal:Core, inherits Exception, prototype Throwable> final public method getPrevious ] { +        } + +        Method [ <internal:Core, inherits Exception, prototype Throwable> final public method getTraceAsString ] { +        } + +        Method [ <internal:Core, inherits Exception, prototype Throwable> public method __toString ] { +        } +      } +    } +      Class [ <internal:swoole> class Swoole\Http\Client ] {        - Constants [0] { @@ -3495,7 +3686,7 @@ Extension [ <persistent> extension #147 swoole version 4.0.4 ] {        }      } -    Class [ <internal:swoole> <iterateable> class Swoole\Table implements ArrayAccess, Iterator, Traversable, Countable ] { +    Class [ <internal:swoole> <iterateable> class Swoole\Table implements Iterator, Traversable, ArrayAccess, Countable ] {        - Constants [3] {          Constant [ public integer TYPE_INT ] { 1 } @@ -3717,6 +3908,36 @@ Extension [ <persistent> extension #147 swoole version 4.0.4 ] {        }      } +    Class [ <internal:swoole> class Swoole\Runtime ] { + +      - Constants [0] { +      } + +      - Static properties [0] { +      } + +      - Static methods [2] { +        Method [ <internal:swoole> static public method enableStrictMode ] { + +          - Parameters [0] { +          } +        } + +        Method [ <internal:swoole> static public method enableCoroutine ] { + +          - Parameters [1] { +            Parameter #0 [ <optional> $enable ] +          } +        } +      } + +      - Properties [0] { +      } + +      - Methods [0] { +      } +    } +      Class [ <internal:swoole> class Swoole\Lock ] {        - Constants [5] { @@ -4344,8 +4565,9 @@ Extension [ <persistent> extension #147 swoole version 4.0.4 ] {          Method [ <internal:swoole> public method status ] { -          - Parameters [1] { +          - Parameters [2] {              Parameter #0 [ <required> $http_code ] +            Parameter #1 [ <optional> $reason ]            }          } @@ -4430,8 +4652,9 @@ Extension [ <persistent> extension #147 swoole version 4.0.4 ] {        - Static methods [0] {        } -      - Properties [9] { +      - Properties [10] {          Property [ <default> public $fd ] +        Property [ <default> public $streamId ]          Property [ <default> public $header ]          Property [ <default> public $server ]          Property [ <default> public $request ] @@ -5668,7 +5891,7 @@ Extension [ <persistent> extension #147 swoole version 4.0.4 ] {          Property [ <default> public $port ]        } -      - Methods [10] { +      - Methods [11] {          Method [ <internal:swoole, ctor> public method __construct ] {            - Parameters [3] { @@ -5700,7 +5923,14 @@ Extension [ <persistent> extension #147 swoole version 4.0.4 ] {          Method [ <internal:swoole> public method stats ] {            - Parameters [1] { -            Parameter #0 [ <required> $key ] +            Parameter #0 [ <optional> $key ] +          } +        } + +        Method [ <internal:swoole> public method isStreamExist ] { + +          - Parameters [1] { +            Parameter #0 [ <required> $stream_id ]            }          } diff --git a/channel.cc b/channel.cc new file mode 100644 index 0000000..88423fc --- /dev/null +++ b/channel.cc @@ -0,0 +1,151 @@ +#include "channel.h" + +using namespace swoole; + +static void channel_defer_callback(void *data) +{ +    notify_msg_t *msg = (notify_msg_t*) data; +    coroutine_t *co = msg->chan->pop_coroutine(msg->type); +    coroutine_resume(co); +    delete msg; +} + +static void channel_pop_timeout(swTimer *timer, swTimer_node *tnode) +{ +    timeout_msg_t *msg = (timeout_msg_t *) tnode->data; +    msg->error = true; +    msg->timer = nullptr; +    msg->chan->remove(msg->co); +    coroutine_resume(msg->co); +} + +Channel::Channel(size_t _capacity) +{ +    capacity = _capacity; +    closed = false; +    notify_producer_count = 0; +    notify_consumer_count = 0; +} + +void Channel::yield(enum channel_op type) +{ +    int _cid = coroutine_get_current_cid(); +    if (_cid == -1) +    { +        swError("Socket::yield() must be called in the coroutine."); +    } +    coroutine_t *co = coroutine_get_by_id(_cid); +    if (type == PRODUCER) +    { +        producer_queue.push_back(co); +        swDebug("producer[%d]", coroutine_get_cid(co)); +    } +    else +    { +        consumer_queue.push_back(co); +        swDebug("consumer[%d]", coroutine_get_cid(co)); +    } +    coroutine_yield(co); +} + +void Channel::notify(enum channel_op type) +{ +    notify_msg_t *msg = new notify_msg_t; +    msg->chan = this; +    msg->type = type; +    if (type == PRODUCER) +    { +        notify_producer_count++; +    } +    else +    { +        notify_consumer_count++; +    } +    SwooleG.main_reactor->defer(SwooleG.main_reactor, channel_defer_callback, msg); +} + +void* Channel::pop(double timeout) +{ +    timeout_msg_t msg; +    msg.error = false; +    if (timeout > 0) +    { +        int msec = (int) (timeout * 1000); +        if (SwooleG.timer.fd == 0) +        { +            swTimer_init (msec); +        } +        msg.chan = this; +        msg.co = coroutine_get_by_id(coroutine_get_current_cid()); +        msg.timer = SwooleG.timer.add(&SwooleG.timer, msec, 0, &msg, channel_pop_timeout); +    } +    else +    { +        msg.timer = NULL; +    } +    if (is_empty() || consumer_queue.size() > 0) +    { +        yield(CONSUMER); +    } +    if (msg.error) +    { +        return nullptr; +    } +    if (msg.timer) +    { +        swTimer_del(&SwooleG.timer, msg.timer); +    } +    /** +     * pop data +     */ +    void *data = data_queue.front(); +    data_queue.pop(); +    /** +     * notify producer +     */ +    if (producer_queue.size() > 0 && notify_producer_count < producer_queue.size()) +    { +        notify(PRODUCER); +    } +    return data; +} + +bool Channel::push(void *data) +{ +    if (is_full() || producer_queue.size() > 0) +    { +        yield(PRODUCER); +    } +    /** +     * push data +     */ +    data_queue.push(data); +    swDebug("push data, count=%ld", length()); +    /** +     * notify consumer +     */ +    if (consumer_queue.size() > 0 && notify_consumer_count < consumer_queue.size()) +    { +        notify(CONSUMER); +    } +    return true; +} + +bool Channel::close() +{ +    if (closed) +    { +        return false; +    } +    swDebug("closed"); +    closed = true; +    while (producer_queue.size() > 0 && notify_producer_count < producer_queue.size()) +    { +        notify(PRODUCER); +    } +    while (consumer_queue.size() > 0 && notify_consumer_count < producer_queue.size()) +    { +        notify(CONSUMER); +    } +    return true; +} diff --git a/channel.h b/channel.h new file mode 100644 index 0000000..ab4fb5f --- /dev/null +++ b/channel.h @@ -0,0 +1,107 @@ +#pragma once + +#include "swoole.h" +#include "context.h" +#include "coroutine.h" +#include <string> +#include <iostream> +#include <list> +#include <queue> +#include <sys/stat.h> + +namespace swoole { + +enum channel_op +{ +    PRODUCER = 1, +    CONSUMER = 2, +}; + +class Channel; + +struct notify_msg_t +{ +    Channel *chan; +    enum channel_op type; +}; + +struct timeout_msg_t +{ +    Channel *chan; +    coroutine_t *co; +    bool error; +    swTimer_node *timer; +}; + +class Channel +{ +private: +    std::list<coroutine_t *> producer_queue; +    std::list<coroutine_t *> consumer_queue; +    std::queue<void *> data_queue; +    size_t capacity; +    uint32_t notify_producer_count; +    uint32_t notify_consumer_count; + +public: +    bool closed; +    inline bool is_empty() +    { +        return data_queue.size() == 0; +    } + +    inline bool is_full() +    { +        return data_queue.size() == capacity; +    } + +    inline size_t length() +    { +        return data_queue.size(); +    } + +    inline size_t consumer_num() +    { +        return consumer_queue.size(); +    } + +    inline size_t producer_num() +    { +        return producer_queue.size(); +    } + +    inline void remove(coroutine_t *co) +    { +        consumer_queue.remove(co); +    } + +    inline coroutine_t* pop_coroutine(enum channel_op type) +    { +        coroutine_t* co; +        if (type == PRODUCER) +        { +            co = producer_queue.front(); +            producer_queue.pop_front(); +            notify_producer_count--; +            swDebug("resume producer[%d]", coroutine_get_cid(co)); +        } +        else +        { +            co = consumer_queue.front(); +            consumer_queue.pop_front(); +            notify_consumer_count--; +            swDebug("resume consumer[%d]", coroutine_get_cid(co)); +        } +        return co; +    } + +    Channel(size_t _capacity); +    void yield(enum channel_op type); +    void notify(enum channel_op type); +    void* pop(double timeout = 0); +    bool push(void *data); +    bool close(); +}; + +}; + diff --git a/php-pecl-swoole4.spec b/php-pecl-swoole4.spec index 0555a2c..2ceed15 100644 --- a/php-pecl-swoole4.spec +++ b/php-pecl-swoole4.spec @@ -12,7 +12,7 @@  %if 0%{?scl:1}  %global sub_prefix %{scl_prefix} -%scl_package       php-pecl-swoole +%scl_package       php-pecl-swoole4  %endif  %global with_zts   0%{!?_without_zts:%{?__ztsphp:1}} @@ -27,17 +27,32 @@  %endif  %global with_nghttpd2 1  %global with_hiredis  1 +%if 0%{?fedora} >= 25 || 0%{?rhel} >= 8 +%global with_brotli   1 +%else +%global with_brotli   0 +%endif +  Summary:        PHP's asynchronous concurrent distributed networking framework  Name:           %{?sub_prefix}php-pecl-%{pecl_name}4 -Version:        4.0.4 -Release:        2%{?dist}%{!?scl:%{!?nophptag:%(%{__php} -r 'echo ".".PHP_MAJOR_VERSION.".".PHP_MINOR_VERSION;')}} +Version:        4.1.0 +Release:        1%{?dist}%{!?scl:%{!?nophptag:%(%{__php} -r 'echo ".".PHP_MAJOR_VERSION.".".PHP_MINOR_VERSION;')}}  License:        BSD  URL:            http://pecl.php.net/package/%{pecl_name}  Source0:        http://pecl.php.net/get/%{pecl_name}-%{version}.tgz +Source1:        https://raw.githubusercontent.com/swoole/swoole-src/master/include/socket.h +Source2:        https://raw.githubusercontent.com/swoole/swoole-src/master/include/channel.h +Source3:        https://raw.githubusercontent.com/swoole/swoole-src/master/src/coroutine/socket.cc +Source4:        https://raw.githubusercontent.com/swoole/swoole-src/master/src/coroutine/channel.cc + +%if 0%{?rhel} == 6 +BuildRequires:  devtoolset-6-toolchain +%else  BuildRequires:  %{?dtsprefix}gcc  BuildRequires:  %{?dtsprefix}gcc-c++ +%endif  BuildRequires:  %{?scl_prefix}php-devel > 7  BuildRequires:  %{?scl_prefix}php-pear  BuildRequires:  %{?scl_prefix}php-sockets @@ -53,6 +68,9 @@ BuildRequires:  postgresql-devel > 9.5  %if %{with_hiredis}  BuildRequires:  hiredis-devel  %endif +%if %{with_brotli} +BuildRequires:  brotli-devel +%endif  Requires:       %{?scl_prefix}php(zend-abi) = %{php_zend_api}  Requires:       %{?scl_prefix}php(api) = %{php_core_api} @@ -152,6 +170,9 @@ sed -e '/examples/s/role="src"/role="doc"/' \  cd NTS +cp %{SOURCE1} %{SOURCE2} include/ +cp %{SOURCE3} %{SOURCE4} src/coroutine/ +  # Sanity check, really often broken  extver=$(sed -n '/#define PHP_SWOOLE_VERSION/{s/.* "//;s/".*$//;p}' php_swoole.h)  if test "x${extver}" != "x%{version}%{?prever:-%{prever}}"; then @@ -171,6 +192,7 @@ cat << 'EOF' | tee %{ini_name}  extension=%{pecl_name}.so  ; Configuration +;swoole.enable_coroutine = On  ;swoole.aio_thread_num = 2  ;swoole.display_errors = On  ;swoole.use_namespace = On @@ -181,7 +203,12 @@ EOF  %build +%if 0%{?rhel} == 6 +source /opt/rh/devtoolset-6/enable +g++ --version +%else  %{?dtsenable} +%endif  peclbuild() {  %configure \ @@ -218,7 +245,12 @@ peclbuild %{_bindir}/zts-php-config  %install +%if 0%{?rhel} == 6 +source /opt/rh/devtoolset-6/enable +g++ --version +%else  %{?dtsenable} +%endif  make -C NTS \       install INSTALL_ROOT=%{buildroot} @@ -312,6 +344,11 @@ cd ../ZTS  %changelog +* Fri Aug 31 2018 Remi Collet <remi@remirepo.net> - 4.1.0-1 +- update to 4.1.0 +- add dependency on brotli library (Fedora) +- open https://github.com/swoole/swoole-src/issues/1931 missing files +  * Thu Aug 16 2018 Remi Collet <remi@remirepo.net> - 4.0.4-2  - rebuild for 7.3.0beta2 new ABI diff --git a/socket.cc b/socket.cc new file mode 100644 index 0000000..7edb58f --- /dev/null +++ b/socket.cc @@ -0,0 +1,1441 @@ +#include "socket.h" +#include "context.h" +#include "async.h" +#include "buffer.h" + +#include <string> +#include <iostream> +#include <sys/stat.h> + +using namespace swoole; +using namespace std; + +static int socket_onRead(swReactor *reactor, swEvent *event); +static int socket_onWrite(swReactor *reactor, swEvent *event); +static void socket_onTimeout(swTimer *timer, swTimer_node *tnode); +static void socket_onResolveCompleted(swAio_event *event); + +bool Socket::socks5_handshake() +{ +    swSocks5 *ctx = socks5_proxy; +    char *buf = ctx->buf; +    int n; + +    /** +     * handshake +     */ +    swSocks5_pack(buf, socks5_proxy->username == NULL ? 0x00 : 0x02); +    socks5_proxy->state = SW_SOCKS5_STATE_HANDSHAKE; +    if (send(buf, 3) <= 0) +    { +        return false; +    } +    n = recv(buf, sizeof(ctx->buf)); +    if (n <= 0) +    { +        return false; +    } +    uchar version = buf[0]; +    uchar method = buf[1]; +    if (version != SW_SOCKS5_VERSION_CODE) +    { +        swoole_error_log(SW_LOG_NOTICE, SW_ERROR_SOCKS5_UNSUPPORT_VERSION, "SOCKS version is not supported."); +        return SW_ERR; +    } +    if (method != ctx->method) +    { +        swoole_error_log(SW_LOG_NOTICE, SW_ERROR_SOCKS5_UNSUPPORT_METHOD, "SOCKS authentication method not supported."); +        return SW_ERR; +    } +    //authenticate request +    if (method == SW_SOCKS5_METHOD_AUTH) +    { +        buf[0] = 0x01; +        buf[1] = ctx->l_username; + +        buf += 2; +        memcpy(buf, ctx->username, ctx->l_username); +        buf += ctx->l_username; +        buf[0] = ctx->l_password; +        memcpy(buf + 1, ctx->password, ctx->l_password); + +        ctx->state = SW_SOCKS5_STATE_AUTH; + +        if (send(ctx->buf, ctx->l_username + ctx->l_password + 3) < 0) +        { +            return false; +        } + +        n = recv(buf, sizeof(ctx->buf)); +        if (n <= 0) +        { +            return false; +        } + +        uchar version = buf[0]; +        uchar status = buf[1]; +        if (version != 0x01) +        { +            swoole_error_log(SW_LOG_NOTICE, SW_ERROR_SOCKS5_UNSUPPORT_VERSION, "SOCKS version is not supported."); +            return false; +        } +        if (status != 0) +        { +            swoole_error_log(SW_LOG_NOTICE, SW_ERROR_SOCKS5_AUTH_FAILED, +                    "SOCKS username/password authentication failed."); +            return false; +        } +        goto send_connect_request; +    } +    //send connect request +    else +    { +        send_connect_request: buf[0] = SW_SOCKS5_VERSION_CODE; +        buf[1] = 0x01; +        buf[2] = 0x00; + +        ctx->state = SW_SOCKS5_STATE_CONNECT; + +        if (ctx->dns_tunnel) +        { +            buf[3] = 0x03; +            buf[4] = ctx->l_target_host; +            buf += 5; +            memcpy(buf, ctx->target_host, ctx->l_target_host); +            buf += ctx->l_target_host; +            *(uint16_t *) buf = htons(ctx->target_port); + +            if (send(ctx->buf, ctx->l_target_host + 7) < 0) +            { +                return false; +            } +        } +        else +        { +            buf[3] = 0x01; +            buf += 4; +            *(uint32_t *) buf = htons(ctx->l_target_host); +            buf += 4; +            *(uint16_t *) buf = htons(ctx->target_port); + +            if (send(ctx->buf, ctx->l_target_host + 7) < 0) +            { +                return false; +            } +        } + +        /** +         * response +         */ +        n = recv(buf, sizeof(ctx->buf)); +        if (n <= 0) +        { +            return false; +        } + +        uchar version = buf[0]; +        if (version != SW_SOCKS5_VERSION_CODE) +        { +            swoole_error_log(SW_LOG_NOTICE, SW_ERROR_SOCKS5_UNSUPPORT_VERSION, "SOCKS version is not supported."); +            return false; +        } +        uchar result = buf[1]; +#if 0 +        uchar reg = buf[2]; +        uchar type = buf[3]; +        uint32_t ip = *(uint32_t *) (buf + 4); +        uint16_t port = *(uint16_t *) (buf + 8); +#endif +        if (result == 0) +        { +            ctx->state = SW_SOCKS5_STATE_READY; +        } +        else +        { +            swoole_error_log(SW_LOG_NOTICE, SW_ERROR_SOCKS5_SERVER_ERROR, "Socks5 server error, reason :%s.", +                    swSocks5_strerror(result)); +        } +        return result; +    } +} + +bool Socket::http_proxy_handshake() +{ +#ifdef SW_USE_OPENSSL +    if (socket->ssl) +    { +        if (ssl_handshake() == false) +        { +            return false; +        } +    } +    else +    { +        return true; +    } +#else +    return true; +#endif + +    //CONNECT +    int n = snprintf(http_proxy->buf, sizeof(http_proxy->buf), "CONNECT %*s:%d HTTP/1.1\r\n\r\n", +            http_proxy->l_target_host, http_proxy->target_host, http_proxy->target_port); +    if (send(http_proxy->buf, n) <= 0) +    { +        return false; +    } + +    n = recv(http_proxy->buf, sizeof(http_proxy->buf)); +    if (n <= 0) +    { +        return false; +    } +    char *buf = http_proxy->buf; +    int len = n; +    int state = 0; +    char *p = buf; +    for (p = buf; p < buf + len; p++) +    { +        if (state == 0) +        { +            if (strncasecmp(p, "HTTP/1.1", 8) == 0 || strncasecmp(p, "HTTP/1.0", 8) == 0) +            { +                state = 1; +                p += 8; +            } +            else +            { +                break; +            } +        } +        else if (state == 1) +        { +            if (isspace(*p)) +            { +                continue; +            } +            else +            { +                if (strncasecmp(p, "200", 3) == 0) +                { +                    state = 2; +                    p += 3; +                } +                else +                { +                    break; +                } +            } +        } +        else if (state == 2) +        { +            if (isspace(*p)) +            { +                continue; +            } +            else +            { +                if (strncasecmp(p, "Connection established", sizeof("Connection established") - 1) == 0) +                { +                    return true; +                } +                else +                { +                    break; +                } +            } +        } +    } +    return false; +} + +static inline int socket_connect(int fd, struct sockaddr *addr, socklen_t len) +{ +    int retval; +    while (1) +    { +        retval = ::connect(fd, addr, len); +        if (retval < 0) +        { +            if (errno == EINTR) +            { +                continue; +            } +        } +        break; +    } +    return retval; +} + +Socket::Socket(enum swSocket_type _type) +{ +    type = _type; +    switch (type) +    { +    case SW_SOCK_TCP6: +        _sock_domain = AF_INET6; +        _sock_type = SOCK_STREAM; +        break; +    case SW_SOCK_UNIX_STREAM: +        _sock_domain = AF_UNIX; +        _sock_type = SOCK_STREAM; +        break; +    case SW_SOCK_UDP: +        _sock_domain = AF_INET; +        _sock_type = SOCK_DGRAM; +        break; +    case SW_SOCK_UDP6: +        _sock_domain = AF_INET6; +        _sock_type = SOCK_DGRAM; +        break; +    case SW_SOCK_UNIX_DGRAM: +        _sock_domain = AF_UNIX; +        _sock_type = SOCK_DGRAM; +        break; +    case SW_SOCK_TCP: +    default: +        _sock_domain = AF_INET; +        _sock_type = SOCK_STREAM; +        break; +    } + +#ifdef SOCK_CLOEXEC +    int sockfd = ::socket(_sock_domain, _sock_type | SOCK_CLOEXEC, 0); +#else +    int sockfd = ::socket(_sock_domain, _sock_type, 0); +#endif +    if (sockfd < 0) +    { +        swWarn("socket() failed. Error: %s[%d]", strerror(errno), errno); +        return; +    } + +    if (swIsMaster() && SwooleTG.type == SW_THREAD_REACTOR) +    { +        reactor = SwooleTG.reactor; +    } +    else +    { +        reactor = SwooleG.main_reactor; +    } +    socket = swReactor_get(reactor, sockfd); + +    bzero(socket, sizeof(swConnection)); +    socket->fd = sockfd; +    socket->object = this; +    socket->socket_type = type; + +    swSetNonBlock(socket->fd); +    if (!swReactor_handle_isset(reactor, SW_FD_CORO_SOCKET)) +    { +        reactor->setHandle(reactor, SW_FD_CORO_SOCKET | SW_EVENT_READ, socket_onRead); +        reactor->setHandle(reactor, SW_FD_CORO_SOCKET | SW_EVENT_WRITE, socket_onWrite); +        reactor->setHandle(reactor, SW_FD_CORO_SOCKET | SW_EVENT_ERROR, socket_onRead); +    } +    init(); +} + +Socket::Socket(int _fd, Socket *sock) +{ +    reactor = sock->reactor; + +    socket = swReactor_get(reactor, _fd); +    bzero(socket, sizeof(swConnection)); +    socket->fd = _fd; +    socket->object = this; +    socket->socket_type = sock->type; + +    _sock_domain = sock->_sock_domain; +    _sock_type = sock->_sock_type; +    init(); +} + +bool Socket::connect(string host, int port, int flags) +{ +    //enable socks5 proxy +    if (socks5_proxy) +    { +        socks5_proxy->target_host = (char *) host.c_str(); +        socks5_proxy->l_target_host = host.size(); +        socks5_proxy->target_port = port; + +        host = socks5_proxy->host; +        port = socks5_proxy->port; +    } + +    //enable http proxy +    if (http_proxy) +    { +        http_proxy->target_host = (char *) host.c_str(); +        http_proxy->l_target_host = host.size(); +        http_proxy->target_port = port; + +        host = http_proxy->proxy_host; +        port = http_proxy->proxy_port; +    } + +    if (_sock_domain == AF_INET6 || _sock_domain == AF_INET) +    { +        if (port == -1) +        { +            swWarn("Socket of type AF_INET/AF_INET6 requires port argument"); +            return false; +        } +        else if (port == 0 || port >= 65536) +        { +            swWarn("Invalid port argument[%d]", port); +            return false; +        } +    } + +    if (unlikely(_cid && _cid != coroutine_get_current_cid())) +    { +        swWarn( "socket has already been bound to another coroutine."); +        return false; +    } + +    int retval = 0; +    _host = host; +    _port = port; + +    for (int i = 0; i < 2; i++) +    { +        if (_sock_domain == AF_INET) +        { +            socket->info.addr.inet_v4.sin_family = AF_INET; +            socket->info.addr.inet_v4.sin_port = htons(port); + +            if (!inet_pton(AF_INET, _host.c_str(), & socket->info.addr.inet_v4.sin_addr)) +            { +                _host = resolve(_host); +                if (_host.size() == 0) +                { +                    return false; +                } +                continue; +            } +            else +            { +                socket->info.len = sizeof( socket->info.addr.inet_v4); +                retval = socket_connect(socket->fd, (struct sockaddr *) &socket->info.addr.inet_v4, socket->info.len); +                break; +            } +        } +        else if (_sock_domain == AF_INET6) +        { +            socket->info.addr.inet_v6.sin6_family = AF_INET6; +            socket->info.addr.inet_v6.sin6_port = htons(port); + +            if (!inet_pton(AF_INET6, _host.c_str(), &socket->info.addr.inet_v6.sin6_addr)) +            { +                _host = resolve(_host); +                if (_host.size() == 0) +                { +                    return false; +                } +                continue; +            } +            else +            { +                socket->info.len = sizeof(socket->info.addr.inet_v6); +                retval = socket_connect(socket->fd, (struct sockaddr *) &socket->info.addr.inet_v6, socket->info.len); +                break; +            } +        } +        else if (_sock_domain == AF_UNIX) +        { +            if (_host.size() >= sizeof(socket->info.addr.un.sun_path)) +            { +                return false; +            } +            socket->info.addr.un.sun_family = AF_UNIX; +            memcpy(&socket->info.addr.un.sun_path, _host.c_str(), _host.size()); +            retval = socket_connect(socket->fd, (struct sockaddr *) &socket->info.addr.un, +                    (socklen_t) (offsetof(struct sockaddr_un, sun_path) + _host.size())); +            break; +        } +        else +        { +            return false; +        } +    } + +    if (retval == -1) +    { +        if (errno != EINPROGRESS) +        { +            _error: errCode = errno; +            return false; +        } +        if (!wait_events(SW_EVENT_WRITE)) +        { +            goto _error; +        } +        yield(); +        //Connection has timed out +        if (errCode == ETIMEDOUT) +        { +            errMsg = strerror(errCode); +            return false; +        } +        socklen_t len = sizeof(errCode); +        if (getsockopt(socket->fd, SOL_SOCKET, SO_ERROR, &errCode, &len) < 0 || errCode != 0) +        { +            errMsg = strerror(errCode); +            return false; +        } +    } +    socket->active = 1; +    //socks5 proxy +    if (socks5_proxy && socks5_handshake() == false) +    { +        return false; +    } +    //http proxy +    if (http_proxy && http_proxy_handshake() == false) +    { +        return false; +    } +    return true; +} + +static void socket_onResolveCompleted(swAio_event *event) +{ +    Socket *sock = (Socket *) event->object; +    if (event->error != 0) +    { +        sock->errCode = SW_ERROR_DNSLOOKUP_RESOLVE_FAILED; +    } +    sock->resume(); +} + +static void socket_onTimeout(swTimer *timer, swTimer_node *tnode) +{ +    Socket *sock = (Socket *) tnode->data; +    sock->timer = NULL; +    sock->errCode = ETIMEDOUT; +    swDebug("socket[%d] timeout", sock->socket->fd); +    sock->reactor->del(sock->reactor, sock->socket->fd); +    sock->resume(); +} + +static int socket_onRead(swReactor *reactor, swEvent *event) +{ +    Socket *sock = (Socket *) event->socket->object; +    reactor->del(reactor, event->fd); +    sock->resume(); +    return SW_OK; +} + +static int socket_onWrite(swReactor *reactor, swEvent *event) +{ +    Socket *sock = (Socket *) event->socket->object; +    reactor->del(reactor, event->fd); +    sock->resume(); +    return SW_OK; +} + +ssize_t Socket::peek(void *__buf, size_t __n) +{ +    return swConnection_peek(socket, __buf, __n, 0); +} + +ssize_t Socket::recv(void *__buf, size_t __n) +{ +    ssize_t retval = swConnection_recv(socket, __buf, __n, 0); +    if (retval >= 0) +    { +        return retval; +    } + +    if (swConnection_error(errno) != SW_WAIT) +    { +        errCode = errno; +        return -1; +    } + +    while (true) +    { +        int events = SW_EVENT_READ; +#ifdef SW_USE_OPENSSL +        if (socket->ssl && socket->ssl_want_write) +        { +            events = SW_EVENT_WRITE; +        } +#endif +        if (!wait_events(events)) +        { +            return -1; +        } +        yield(); +        if (errCode == ETIMEDOUT) +        { +            return -1; +        } +        retval = swConnection_recv(socket, __buf, __n, 0); +        if (retval < 0) +        { +            if (swConnection_error(errno) == SW_WAIT) +            { +                continue; +            } +            errCode = errno; +        } +        break; +    } +    return retval; +} + +ssize_t Socket::recv_all(void *__buf, size_t __n) +{ +    ssize_t retval, total_bytes = 0; +    while (true) +    { +        retval = recv((char*) __buf + total_bytes, __n - total_bytes); +        if (retval <= 0) +        { +            if (total_bytes == 0) +            { +                total_bytes = retval; +            } +            break; +        } +        total_bytes += retval; +        if ((size_t) total_bytes == __n) +        { +            break; +        } +    } +    return total_bytes; +} + +ssize_t Socket::send_all(const void *__buf, size_t __n) +{ +    ssize_t retval, total_bytes = 0; +    while (true) +    { +        retval = send((char*) __buf + total_bytes, __n - total_bytes); +        if (retval <= 0) +        { +            if (total_bytes == 0) +            { +                total_bytes = retval; +            } +            break; +        } +        total_bytes += retval; +        if ((size_t) total_bytes == __n) +        { +            break; +        } +    } +    return total_bytes; +} + +ssize_t Socket::send(const void *__buf, size_t __n) +{ +    ssize_t retval = swConnection_send(socket, (void *) __buf, __n, 0); +    if (retval >= 0) +    { +        return retval; +    } + +    if (swConnection_error(errno) != SW_WAIT) +    { +        errCode = errno; +        return -1; +    } + +    while (true) +    { +        int events = SW_EVENT_WRITE; +#ifdef SW_USE_OPENSSL +        if (socket->ssl && socket->ssl_want_read) +        { +            events = SW_EVENT_READ; +        } +#endif +        if (!wait_events(events)) +        { +            return -1; +        } +        yield(); +        if (errCode == ETIMEDOUT) +        { +            return -1; +        } +        ssize_t retval = swConnection_send(socket, (void *) __buf, __n, 0); +        if (retval < 0) +        { +            if (swConnection_error(errno) == SW_WAIT) +            { +                continue; +            } +            errCode = errno; +        } +        break; +    } + +    return retval; +} + +void Socket::yield() +{ +    if (suspending) +    { +        swError("socket has already been bound to another coroutine."); +    } +    errCode = 0; +    if (_timeout > 0) +    { +        int ms = (int) (_timeout * 1000); +        if (SwooleG.timer.fd == 0) +        { +           swTimer_init(ms); +        } +        timer = SwooleG.timer.add(&SwooleG.timer, ms, 0, this, socket_onTimeout); +    } +    _cid = coroutine_get_current_cid(); +    if (_cid == -1) +    { +        swError("Socket::yield() must be called in the coroutine."); +    } +    //suspend +    suspending = true; +    coroutine_yield(coroutine_get_by_id(_cid)); +    suspending = false; +    //clear timer +    if (timer) +    { +        swTimer_del(&SwooleG.timer, timer); +        timer = nullptr; +    } +} + +void Socket::resume() +{ +    coroutine_resume(coroutine_get_by_id(_cid)); +} + +bool Socket::bind(std::string address, int port) +{ +    bind_address = address; +    bind_port = port; + +    struct sockaddr_storage sa_storage = { 0 }; +    struct sockaddr *sock_type = (struct sockaddr*) &sa_storage; + +    int retval; +    switch (_sock_domain) +    { +    case AF_UNIX: +    { +        struct sockaddr_un *sa = (struct sockaddr_un *) sock_type; +        sa->sun_family = AF_UNIX; + +        if (bind_address.size() >= sizeof(sa->sun_path)) +        { +            return false; +        } +        memcpy(&sa->sun_path, bind_address.c_str(), bind_address.size()); + +        retval = ::bind(socket->fd, (struct sockaddr *) sa, +        offsetof(struct sockaddr_un, sun_path) + bind_address.size()); +        break; +    } + +    case AF_INET: +    { +        struct sockaddr_in *sa = (struct sockaddr_in *) sock_type; +        sa->sin_family = AF_INET; +        sa->sin_port = htons((unsigned short) bind_port); +        if (!inet_aton(bind_address.c_str(), &sa->sin_addr)) +        { +            return false; +        } +        retval = ::bind(socket->fd, (struct sockaddr *) sa, sizeof(struct sockaddr_in)); +        break; +    } + +    case AF_INET6: +    { +        struct sockaddr_in6 *sa = (struct sockaddr_in6 *) sock_type; +        sa->sin6_family = AF_INET6; +        sa->sin6_port = htons((unsigned short) bind_port); + +        if (!inet_pton(AF_INET6, bind_address.c_str(), &sa->sin6_addr)) +        { +            return false; +        } +        retval = ::bind(socket->fd, (struct sockaddr *) sa, sizeof(struct sockaddr_in6)); +        break; +    } +    default: +        return false; +    } + +    if (retval != 0) +    { +        errCode = errno; +        return false; +    } + +    return true; +} + +bool Socket::listen(int backlog) +{ +    _backlog = backlog; +    if (::listen(socket->fd, backlog) != 0) +    { +        errCode = errno; +        return false; +    } +    return true; +} + +Socket* Socket::accept() +{ +    if (!wait_events(SW_EVENT_READ)) +    { +        return nullptr; +    } +    yield(); +    if (errCode == ETIMEDOUT) +    { +        return nullptr; +    } +    int conn; +    swSocketAddress client_addr; +    socklen_t client_addrlen = sizeof(client_addr); + +#ifdef HAVE_ACCEPT4 +    conn = ::accept4(socket->fd, (struct sockaddr *) &client_addr, &client_addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC); +#else +    conn = ::accept(socket->fd, (struct sockaddr *) &client_addr, &client_addrlen); +    if (conn >= 0) +    { +        swoole_fcntl_set_option(conn, 1, 1); +    } +#endif +    if (conn >= 0) +    { +        return new Socket(conn, this); +    } +    else +    { +        errCode = errno; +        return nullptr; +    } +} + +string Socket::resolve(string domain_name) +{ +    swAio_event ev; +    bzero(&ev, sizeof(swAio_event)); +    ev.nbytes = SW_IP_MAX_LENGTH; +    ev.buf = sw_malloc(ev.nbytes); +    if (!ev.buf) +    { +        errCode = errno; +        return ""; +    } + +    memcpy(ev.buf, domain_name.c_str(), domain_name.size()); +    ((char *) ev.buf)[domain_name.size()] = 0; +    ev.flags = _sock_domain; +    ev.type = SW_AIO_GETHOSTBYNAME; +    ev.object = this; +    ev.callback = socket_onResolveCompleted; + +    if (SwooleAIO.init == 0) +    { +        swAio_init(); +    } + +    if (swAio_dispatch(&ev) < 0) +    { +        errCode = SwooleG.error; +        sw_free(ev.buf); +        return ""; +    } +    /** +     * cannot timeout +     */ +    double tmp_timeout = _timeout; +    _timeout = -1; +    yield(); +    _timeout = tmp_timeout; + +    if (errCode == SW_ERROR_DNSLOOKUP_RESOLVE_FAILED) +    { +        errMsg = hstrerror(ev.error); +        return ""; +    } +    else +    { +        string addr((char *) ev.buf); +        sw_free(ev.buf); +        return addr; +    } +} + +bool Socket::shutdown(int __how) +{ +    if (!socket || socket->closed) +    { +        return false; +    } +    if (__how == SHUT_RD) +    { +        if (shutdown_read || shutdow_rw || ::shutdown(socket->fd, SHUT_RD)) +        { +            return false; +        } +        else +        { +            shutdown_read = 1; +            return true; +        } +    } +    else if (__how == SHUT_WR) +    { +        if (shutdown_write || shutdow_rw || ::shutdown(socket->fd, SHUT_RD) < 0) +        { +            return false; +        } +        else +        { +            shutdown_write = 1; +            return true; +        } +    } +    else if (__how == SHUT_RDWR) +    { +        if (shutdow_rw || ::shutdown(socket->fd, SHUT_RDWR) < 0) +        { +            return false; +        } +        else +        { +            shutdown_read = 1; +            return true; +        } +    } +    else +    { +        return false; +    } +} + +bool Socket::close() +{ +    if (socket == NULL || socket->closed) +    { +        return false; +    } +    socket->closed = 1; + +    int fd = socket->fd; + +    if (_sock_type == SW_SOCK_UNIX_DGRAM) +    { +        unlink(socket->info.addr.un.sun_path); +    } + +#ifdef SW_USE_OPENSSL +    if (open_ssl && ssl_context) +    { +        if (socket->ssl) +        { +            swSSL_close(socket); +        } +        swSSL_free_context(ssl_context); +        if (ssl_option.cert_file) +        { +            sw_free(ssl_option.cert_file); +        } +        if (ssl_option.key_file) +        { +            sw_free(ssl_option.key_file); +        } +        if (ssl_option.passphrase) +        { +            sw_free(ssl_option.passphrase); +        } +#ifdef SSL_CTRL_SET_TLSEXT_HOSTNAME +        if (ssl_option.tls_host_name) +        { +            sw_free(ssl_option.tls_host_name); +        } +#endif +        if (ssl_option.cafile) +        { +            sw_free(ssl_option.cafile); +        } +        if (ssl_option.capath) +        { +            sw_free(ssl_option.capath); +        } +    } +#endif +    if (_sock_type == SW_SOCK_UNIX_DGRAM) +    { +        unlink(socket->info.addr.un.sun_path); +    } +    if (timer) +    { +        swTimer_del(&SwooleG.timer, timer); +        timer = NULL; +    } +    socket->active = 0; +    ::close(fd); +    return true; +} + +#ifdef SW_USE_OPENSSL +bool Socket::ssl_handshake() +{ +    if (socket->ssl) +    { +        return false; +    } + +    ssl_context = swSSL_get_context(&ssl_option); +    if (ssl_context == NULL) +    { +        return false; +    } + +    if (ssl_option.verify_peer) +    { +        if (swSSL_set_capath(&ssl_option, ssl_context) < 0) +        { +            return false; +        } +    } + +    socket->ssl_send = 1; +#if defined(SW_USE_HTTP2) && defined(SW_USE_OPENSSL) && OPENSSL_VERSION_NUMBER >= 0x10002000L +    if (http2) +    { +        if (SSL_CTX_set_alpn_protos(ssl_context, (const unsigned char *) "\x02h2", 3) < 0) +        { +            return false; +        } +    } +#endif + +    if (swSSL_create(socket, ssl_context, SW_SSL_CLIENT) < 0) +    { +        return false; +    } +#ifdef SSL_CTRL_SET_TLSEXT_HOSTNAME +    if (ssl_option.tls_host_name) +    { +        SSL_set_tlsext_host_name(socket->ssl, ssl_option.tls_host_name); +    } +#endif + +    while (true) +    { +        int retval = swSSL_connect(socket); +        if (retval < 0) +        { +            errCode = SwooleG.error; +            return false; +        } +        if (socket->ssl_state == SW_SSL_STATE_WAIT_STREAM) +        { +            int events = socket->ssl_want_write ? SW_EVENT_WRITE : SW_EVENT_READ; +            if (!wait_events(events)) +            { +                return false; +            } +            yield(); +            if (errCode == ETIMEDOUT) +            { +                return false; +            } +        } +        else if (socket->ssl_state == SW_SSL_STATE_READY) +        { +            return true; +        } +    } + +    if (socket->ssl_state == SW_SSL_STATE_READY && ssl_option.verify_peer) +    { +        if (ssl_verify(ssl_option.allow_self_signed) < 0) +        { +            return false; +        } +    } +    return true; +} + +int Socket::ssl_verify(bool allow_self_signed) +{ +    if (swSSL_verify(socket, allow_self_signed) < 0) +    { +        return SW_ERR; +    } +    if (ssl_option.tls_host_name && swSSL_check_host(socket, ssl_option.tls_host_name) < 0) +    { +        return SW_ERR; +    } +    return SW_OK; +} +#endif + +bool Socket::sendfile(char *filename, off_t offset, size_t length) +{ +    int file_fd = open(filename, O_RDONLY); +    if (file_fd < 0) +    { +        swSysError("open(%s) failed.", filename); +        return false; +    } + +    if (length == 0) +    { +        struct stat file_stat; +        if (::fstat(file_fd, &file_stat) < 0) +        { +            swSysError("fstat(%s) failed.", filename); +            ::close(file_fd); +            return false; +        } +        length = file_stat.st_size; +    } +    else +    { +        // total length of the file +        length = offset + length; +    } + +    int n, sendn; +    while ((size_t) offset < length) +    { +        sendn = (length - offset > SW_SENDFILE_CHUNK_SIZE) ? SW_SENDFILE_CHUNK_SIZE : length - offset; +#ifdef SW_USE_OPENSSL +        if (socket->ssl) +        { +            n = swSSL_sendfile(socket, file_fd, &offset, sendn); +        } +        else +#endif +        { +            n = ::swoole_sendfile(socket->fd, file_fd, &offset, sendn); +        } +        if (n > 0) +        { +            continue; +        } +        else if (n == 0) +        { +            swWarn("sendfile return zero."); +            return false; +        } +        else if (errno != EAGAIN) +        { +            swSysError("sendfile(%d, %s) failed.", socket->fd, filename); +            _error: errCode = errno; +            ::close(file_fd); +            return false; +        } +        if (!wait_events(SW_EVENT_WRITE)) +        { +            goto _error; +        } +        yield(); +        if (errCode == ETIMEDOUT) +        { +            goto _error; +        } +    } +    ::close(file_fd); +    return true; +} + +int Socket::sendto(char *address, int port, char *data, int len) +{ +    if (type == SW_SOCK_UDP) +    { +        return swSocket_udp_sendto(socket->fd, address, port, data, len); +    } +    else if (type == SW_SOCK_UDP6) +    { +        return swSocket_udp_sendto6(socket->fd, address, port, data, len); +    } +    else +    { +        swWarn("only supports SWOOLE_SOCK_UDP or SWOOLE_SOCK_UDP6."); +        return -1; +    } +} + +int Socket::recvfrom(void *__buf, size_t __n, char *address, int *port) +{ +    socket->info.len = sizeof(socket->info.addr); +    int retval; + +    _recv: retval = ::recvfrom(socket->fd, __buf, __n, 0, (struct sockaddr *) &socket->info.addr, &socket->info.len); +    if (retval < 0) +    { +        if (errno == EINTR) +        { +            goto _recv; +        } +        else if (swConnection_error(errno) == SW_WAIT) +        { +            if (!wait_events(SW_EVENT_READ)) +            { +                return -1; +            } +            yield(); +            if (errCode == ETIMEDOUT) +            { +                return -1; +            } +            retval = ::recvfrom(socket->fd, __buf, __n, 0, (struct sockaddr *) &socket->info.addr, &socket->info.len); +            if (retval < 0) +            { +                errCode = errno; +            } +            else +            { +                strcpy(address, swConnection_get_ip(socket)); +                *port = swConnection_get_port(socket); +            } +        } +        else +        { +            errCode = errno; +        } +    } +    return retval; +} + +/** + * recv packet with protocol + */ +ssize_t Socket::recv_packet() +{ +    get_buffer(); +    ssize_t buf_len = SW_BUFFER_SIZE_STD; +    ssize_t retval; + +    if (open_length_check) +    { +        uint32_t header_len; + +        _get_header_len: header_len = protocol.package_length_offset + protocol.package_length_size; +        if (buffer->length > 0) +        { +            if (buffer->length < header_len) +            { +                goto _recv_header; +            } +            else +            { +                goto _get_length; +            } +        } + +        _recv_header: retval = recv(buffer->str + buffer->length, header_len - buffer->length); +        if (retval <= 0) +        { +            return 0; +        } +        else if (retval < 0 || retval != header_len) +        { +            return 0; +        } +        else +        { +            buffer->length += retval; +        } + +        _get_length: buf_len = protocol.get_package_length(&protocol, socket, buffer->str, (uint32_t) buffer->length); +        swDebug("packet_len=%ld, length=%ld", buf_len, buffer->length); +        //error package +        if (buf_len < 0) +        { +            return 0; +        } +        else if (buf_len == 0) +        { +            header_len = protocol.real_header_length; +            goto _recv_header; +        } +        //empty package +        else if (buf_len == header_len) +        { +            buffer->length = 0; +            return header_len; +        } +        else if (buf_len > protocol.package_max_length) +        { +            swoole_error_log(SW_LOG_WARNING, SW_ERROR_PACKAGE_LENGTH_TOO_LARGE, "packet[length=%d] is too big.", (int )buf_len); +            return 0; +        } + +        if ((size_t) buf_len == buffer->length) +        { +            buffer->length = 0; +            return buf_len; +        } +        else if ((size_t) buf_len < buffer->length) +        { +            buffer->length = buffer->length - buf_len; +            memmove(buffer->str, buffer->str + buf_len, buffer->length); +            goto _get_header_len; +        } + +        if ((size_t) buf_len >= buffer->size) +        { +            if (swString_extend(buffer, buf_len) < 0) +            { +                buffer->length = 0; +                return -1; +            } +        } + +        retval = recv_all(buffer->str + buffer->length, buf_len - buffer->length); +        if (retval > 0) +        { +            buffer->length += retval; +            if (buffer->length != (size_t) buf_len) +            { +                retval = 0; +            } +            else +            { +                buffer->length = 0; +                return buf_len; +            } +        } +    } +    else if (open_eof_check) +    { +        int eof = -1; +        char *buf; + +        if (buffer->length > 0) +        { +            goto find_eof; +        } + +        while (1) +        { +            buf = buffer->str + buffer->length; +            buf_len = buffer->size - buffer->length; + +            if (buf_len > SW_BUFFER_SIZE_BIG) +            { +                buf_len = SW_BUFFER_SIZE_BIG; +            } + +            retval = recv(buf, buf_len); +            if (retval < 0) +            { +                buffer->length = 0; +                return -1; +            } +            else if (retval == 0) +            { +                buffer->length = 0; +                return 0; +            } + +            buffer->length += retval; + +            if (buffer->length < protocol.package_eof_len) +            { +                continue; +            } + +            find_eof: eof = swoole_strnpos(buffer->str, buffer->length, protocol.package_eof, protocol.package_eof_len); +            if (eof >= 0) +            { +                eof += protocol.package_eof_len; +                if (buffer->length > (uint32_t) eof) +                { +                    buffer->length -= eof; +                    memmove(buffer->str, buffer->str + eof, buffer->length); +                } +                else +                { +                    buffer->length = 0; +                } +                return eof; +            } +            else +            { +                if (buffer->length == protocol.package_max_length) +                { +                    swWarn("no package eof"); +                    buffer->length = 0; +                    return -1; +                } +                else if (buffer->length == buffer->size) +                { +                    if (buffer->size < protocol.package_max_length) +                    { +                        size_t new_size = buffer->size * 2; +                        if (new_size > protocol.package_max_length) +                        { +                            new_size = protocol.package_max_length; +                        } +                        if (swString_extend(buffer, new_size) < 0) +                        { +                            buffer->length = 0; +                            return -1; +                        } +                    } +                } +            } +        } +        buffer->length = 0; +    } +    else +    { +        return -1; +    } + +    return retval; +} + +swString* Socket::get_buffer() +{ +    if (unlikely(buffer == nullptr)) +    { +        buffer = swString_new(SW_BUFFER_SIZE_STD); +    } +    return buffer; +} + +Socket::~Socket() +{ +    if (!socket->closed) +    { +        close(); +    } +    if (socket->out_buffer) +    { +        swBuffer_free(socket->out_buffer); +        socket->out_buffer = NULL; +    } +    if (socket->in_buffer) +    { +        swBuffer_free(socket->in_buffer); +        socket->in_buffer = NULL; +    } +    if (buffer) +    { +        swString_free(buffer); +    } +    bzero(socket, sizeof(swConnection)); +    socket->removed = 1; +} diff --git a/socket.h b/socket.h new file mode 100644 index 0000000..7a3d039 --- /dev/null +++ b/socket.h @@ -0,0 +1,142 @@ +#pragma once + +#include "swoole.h" +#include "connection.h" +#include "socks5.h" +#include <string> + +namespace swoole +{ +class Socket +{ +public: +    Socket(enum swSocket_type type); +    Socket(int _fd, Socket *sock); +    ~Socket(); +    bool connect(std::string host, int port, int flags = 0); +    bool shutdown(int how); +    bool close(); +    ssize_t send(const void *__buf, size_t __n); +    ssize_t peek(void *__buf, size_t __n); +    ssize_t recv(void *__buf, size_t __n); +    ssize_t recv_all(void *__buf, size_t __n); +    ssize_t send_all(const void *__buf, size_t __n); +    ssize_t recv_packet(); +    Socket* accept(); +    void resume(); +    void yield(); +    bool bind(std::string address, int port = 0); +    std::string resolve(std::string host); +    bool listen(int backlog = 0); +    bool sendfile(char *filename, off_t offset, size_t length); +    int sendto(char *address, int port, char *data, int len); +    int recvfrom(void *__buf, size_t __n, char *address, int *port = nullptr); +    swString* get_buffer(); + +    void setTimeout(double timeout) +    { +        _timeout = timeout; +    } + +#ifdef SW_USE_OPENSSL +    bool ssl_handshake(); +    int ssl_verify(bool allow_self_signed); +#endif + +protected: +    inline void init() +    { +        _cid = 0; +        suspending = false; +        _timeout = 0; +        _port = 0; +        errCode = 0; +        errMsg = nullptr; +        timer = nullptr; +        bind_port = 0; +        _backlog = 0; + +        http2 = 0; +        shutdow_rw = 0; +        shutdown_read = 0; +        shutdown_write = 0; +        open_length_check = 0; +        open_eof_check = 0; + +        socks5_proxy = nullptr; +        http_proxy = nullptr; + +        buffer = nullptr; +        protocol = {0}; + +        protocol.package_length_type = 'N'; +        protocol.package_length_size = 4; +        protocol.package_body_offset = 0; +        protocol.package_max_length = SW_BUFFER_INPUT_SIZE; + +#ifdef SW_USE_OPENSSL +        open_ssl = 0; +        ssl_wait_handshake = 0; +        ssl_context = NULL; +        ssl_option = {0}; +#endif +    } + +    inline bool wait_events(int events) +    { +        if (reactor->add(reactor, socket->fd, SW_FD_CORO_SOCKET | events) < 0) +        { +            errCode = errno; +            return false; +        } +        else +        { +            return true; +        } +    } + +    bool socks5_handshake(); +    bool http_proxy_handshake(); + +public: +    swTimer_node *timer; +    swReactor *reactor; +    std::string _host; +    std::string bind_address; +    int bind_port; +    int _port; +    int _cid; +    bool suspending; +    swConnection *socket; +    enum swSocket_type type; +    int _sock_type; +    int _sock_domain; +    double _timeout; +    int _backlog; +    int errCode; +    const char *errMsg; +    uint32_t http2 :1; +    uint32_t shutdow_rw :1; +    uint32_t shutdown_read :1; +    uint32_t shutdown_write :1; +    /** +     * one package: length check +     */ +    uint32_t open_length_check :1; +    uint32_t open_eof_check :1; + +    swProtocol protocol; +    swString *buffer; + +    struct _swSocks5 *socks5_proxy; +    struct _http_proxy* http_proxy; + +#ifdef SW_USE_OPENSSL +    uint8_t open_ssl :1; +    uint8_t ssl_wait_handshake :1; +    SSL_CTX *ssl_context; +    swSSL_option ssl_option; +#endif +}; + +}; | 
