diff options
Diffstat (limited to 'channel.h')
-rw-r--r-- | channel.h | 107 |
1 files changed, 107 insertions, 0 deletions
diff --git a/channel.h b/channel.h new file mode 100644 index 0000000..ab4fb5f --- /dev/null +++ b/channel.h @@ -0,0 +1,107 @@ +#pragma once + +#include "swoole.h" +#include "context.h" +#include "coroutine.h" +#include <string> +#include <iostream> +#include <list> +#include <queue> +#include <sys/stat.h> + +namespace swoole { + +enum channel_op +{ + PRODUCER = 1, + CONSUMER = 2, +}; + +class Channel; + +struct notify_msg_t +{ + Channel *chan; + enum channel_op type; +}; + +struct timeout_msg_t +{ + Channel *chan; + coroutine_t *co; + bool error; + swTimer_node *timer; +}; + +class Channel +{ +private: + std::list<coroutine_t *> producer_queue; + std::list<coroutine_t *> consumer_queue; + std::queue<void *> data_queue; + size_t capacity; + uint32_t notify_producer_count; + uint32_t notify_consumer_count; + +public: + bool closed; + inline bool is_empty() + { + return data_queue.size() == 0; + } + + inline bool is_full() + { + return data_queue.size() == capacity; + } + + inline size_t length() + { + return data_queue.size(); + } + + inline size_t consumer_num() + { + return consumer_queue.size(); + } + + inline size_t producer_num() + { + return producer_queue.size(); + } + + inline void remove(coroutine_t *co) + { + consumer_queue.remove(co); + } + + inline coroutine_t* pop_coroutine(enum channel_op type) + { + coroutine_t* co; + if (type == PRODUCER) + { + co = producer_queue.front(); + producer_queue.pop_front(); + notify_producer_count--; + swDebug("resume producer[%d]", coroutine_get_cid(co)); + } + else + { + co = consumer_queue.front(); + consumer_queue.pop_front(); + notify_consumer_count--; + swDebug("resume consumer[%d]", coroutine_get_cid(co)); + } + return co; + } + + Channel(size_t _capacity); + void yield(enum channel_op type); + void notify(enum channel_op type); + void* pop(double timeout = 0); + bool push(void *data); + bool close(); +}; + +}; + |