diff options
Diffstat (limited to 'channel.cc')
-rw-r--r-- | channel.cc | 151 |
1 files changed, 151 insertions, 0 deletions
diff --git a/channel.cc b/channel.cc new file mode 100644 index 0000000..88423fc --- /dev/null +++ b/channel.cc @@ -0,0 +1,151 @@ +#include "channel.h" + +using namespace swoole; + +static void channel_defer_callback(void *data) +{ + notify_msg_t *msg = (notify_msg_t*) data; + coroutine_t *co = msg->chan->pop_coroutine(msg->type); + coroutine_resume(co); + delete msg; +} + +static void channel_pop_timeout(swTimer *timer, swTimer_node *tnode) +{ + timeout_msg_t *msg = (timeout_msg_t *) tnode->data; + msg->error = true; + msg->timer = nullptr; + msg->chan->remove(msg->co); + coroutine_resume(msg->co); +} + +Channel::Channel(size_t _capacity) +{ + capacity = _capacity; + closed = false; + notify_producer_count = 0; + notify_consumer_count = 0; +} + +void Channel::yield(enum channel_op type) +{ + int _cid = coroutine_get_current_cid(); + if (_cid == -1) + { + swError("Socket::yield() must be called in the coroutine."); + } + coroutine_t *co = coroutine_get_by_id(_cid); + if (type == PRODUCER) + { + producer_queue.push_back(co); + swDebug("producer[%d]", coroutine_get_cid(co)); + } + else + { + consumer_queue.push_back(co); + swDebug("consumer[%d]", coroutine_get_cid(co)); + } + coroutine_yield(co); +} + +void Channel::notify(enum channel_op type) +{ + notify_msg_t *msg = new notify_msg_t; + msg->chan = this; + msg->type = type; + if (type == PRODUCER) + { + notify_producer_count++; + } + else + { + notify_consumer_count++; + } + SwooleG.main_reactor->defer(SwooleG.main_reactor, channel_defer_callback, msg); +} + +void* Channel::pop(double timeout) +{ + timeout_msg_t msg; + msg.error = false; + if (timeout > 0) + { + int msec = (int) (timeout * 1000); + if (SwooleG.timer.fd == 0) + { + swTimer_init (msec); + } + msg.chan = this; + msg.co = coroutine_get_by_id(coroutine_get_current_cid()); + msg.timer = SwooleG.timer.add(&SwooleG.timer, msec, 0, &msg, channel_pop_timeout); + } + else + { + msg.timer = NULL; + } + if (is_empty() || consumer_queue.size() > 0) + { + yield(CONSUMER); + } + if (msg.error) + { + return nullptr; + } + if (msg.timer) + { + swTimer_del(&SwooleG.timer, msg.timer); + } + /** + * pop data + */ + void *data = data_queue.front(); + data_queue.pop(); + /** + * notify producer + */ + if (producer_queue.size() > 0 && notify_producer_count < producer_queue.size()) + { + notify(PRODUCER); + } + return data; +} + +bool Channel::push(void *data) +{ + if (is_full() || producer_queue.size() > 0) + { + yield(PRODUCER); + } + /** + * push data + */ + data_queue.push(data); + swDebug("push data, count=%ld", length()); + /** + * notify consumer + */ + if (consumer_queue.size() > 0 && notify_consumer_count < consumer_queue.size()) + { + notify(CONSUMER); + } + return true; +} + +bool Channel::close() +{ + if (closed) + { + return false; + } + swDebug("closed"); + closed = true; + while (producer_queue.size() > 0 && notify_producer_count < producer_queue.size()) + { + notify(PRODUCER); + } + while (consumer_queue.size() > 0 && notify_consumer_count < producer_queue.size()) + { + notify(CONSUMER); + } + return true; +} |