1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
|
From 0b97ec3739d99f0778ff827cb58c011b92d27a74 Mon Sep 17 00:00:00 2001
From: michael-grunder <michael.grunder@gmail.com>
Date: Thu, 18 Oct 2018 09:47:10 -0700
Subject: [PATCH] Update STREAM API to handle STATUS -> BULK reply change
Right before Redis 5.0 was released, the api was changed to send
message ids as BULK instead of STATUS replies.
---
library.c | 35 ++++++++++++++++++++++-------------
redis.c | 2 +-
redis_cluster.c | 2 +-
3 files changed, 24 insertions(+), 15 deletions(-)
diff --git a/library.c b/library.c
index 8fecaef1..d5752f0d 100644
--- a/library.c
+++ b/library.c
@@ -1269,17 +1269,18 @@ redis_read_stream_messages(RedisSock *redis_sock, int count, zval *z_ret
{
zval zv, *z_message = &zv;
int i, mhdr, fields;
- char id[1024];
- size_t idlen;
+ char *id = NULL;
+ int idlen;
/* Iterate over each message */
for (i = 0; i < count; i++) {
/* Consume inner multi-bulk header, message ID itself and finaly
* the multi-bulk header for field and values */
if ((read_mbulk_header(redis_sock, &mhdr TSRMLS_CC) < 0 || mhdr != 2) ||
- redis_sock_read_single_line(redis_sock, id, sizeof(id), &idlen, 0 TSRMLS_CC) < 0 ||
+ ((id = redis_sock_read(redis_sock, &idlen TSRMLS_CC)) == NULL) ||
(read_mbulk_header(redis_sock, &fields TSRMLS_CC) < 0 || fields % 2 != 0))
{
+ if (id) efree(id);
return -1;
}
@@ -1289,6 +1290,7 @@ redis_read_stream_messages(RedisSock *redis_sock, int count, zval *z_ret
redis_mbulk_reply_loop(redis_sock, z_message, fields, UNSERIALIZE_VALS TSRMLS_CC);
array_zip_values_and_scores(redis_sock, z_message, SCORE_DECODE_NONE TSRMLS_CC);
add_assoc_zval_ex(z_ret, id, idlen, z_message);
+ efree(id);
}
return 0;
@@ -1404,24 +1406,30 @@ PHP_REDIS_API int
redis_read_xclaim_response(RedisSock *redis_sock, int count, zval *rv TSRMLS_DC) {
zval zv, *z_msg = &zv;
REDIS_REPLY_TYPE type;
- char id[1024];
- int i, fields;
+ char *id;
+ int i, fields, idlen;
long li;
- size_t idlen;
for (i = 0; i < count; i++) {
/* Consume inner reply type */
if (redis_read_reply_type(redis_sock, &type, &li TSRMLS_CC) < 0 ||
- (type != TYPE_LINE && type != TYPE_MULTIBULK)) return -1;
+ (type != TYPE_BULK && type != TYPE_MULTIBULK) ||
+ (type == TYPE_BULK && li <= 0)) return -1;
- if (type == TYPE_LINE) {
- /* JUSTID variant */
- if (redis_sock_gets(redis_sock, id, sizeof(id), &idlen TSRMLS_CC) < 0)
+ /* TYPE_BULK is the JUSTID variant, otherwise it's standard xclaim response */
+ if (type == TYPE_BULK) {
+ if ((id = redis_sock_read_bulk_reply(redis_sock, (size_t)li TSRMLS_CC)) == NULL)
return -1;
- add_next_index_stringl(rv, id, idlen);
+
+ add_next_index_stringl(rv, id, li);
+ efree(id);
} else {
- if (li != 2 || redis_sock_read_single_line(redis_sock, id, sizeof(id), &idlen, 0 TSRMLS_CC) < 0 ||
- (read_mbulk_header(redis_sock, &fields TSRMLS_CC) < 0 || fields % 2 != 0)) return -1;
+ if ((li != 2 || (id = redis_sock_read(redis_sock, &idlen TSRMLS_CC)) == NULL) ||
+ (read_mbulk_header(redis_sock, &fields TSRMLS_CC) < 0 || fields % 2 != 0))
+ {
+ if (id) efree(id);
+ return -1;
+ }
REDIS_MAKE_STD_ZVAL(z_msg);
array_init(z_msg);
@@ -1429,6 +1437,7 @@ redis_read_xclaim_response(RedisSock *redis_sock, int count, zval *rv TSRMLS_DC)
redis_mbulk_reply_loop(redis_sock, z_msg, fields, UNSERIALIZE_VALS TSRMLS_CC);
array_zip_values_and_scores(redis_sock, z_msg, SCORE_DECODE_NONE TSRMLS_CC);
add_assoc_zval_ex(rv, id, idlen, z_msg);
+ efree(id);
}
}
diff --git a/redis.c b/redis.c
index b8244718..2718997c 100644
--- a/redis.c
+++ b/redis.c
@@ -3592,7 +3592,7 @@ PHP_METHOD(Redis, xack) {
}
PHP_METHOD(Redis, xadd) {
- REDIS_PROCESS_CMD(xadd, redis_single_line_reply);
+ REDIS_PROCESS_CMD(xadd, redis_read_variant_reply);
}
PHP_METHOD(Redis, xclaim) {
diff --git a/redis_cluster.c b/redis_cluster.c
index 130b961a..8f10bcb0 100644
--- a/redis_cluster.c
+++ b/redis_cluster.c
@@ -2983,7 +2983,7 @@ PHP_METHOD(RedisCluster, xack) {
/* {{{ proto string RedisCluster::xadd(string key, string id, array field_values) }}} */
PHP_METHOD(RedisCluster, xadd) {
- CLUSTER_PROCESS_CMD(xadd, cluster_single_line_resp, 0);
+ CLUSTER_PROCESS_CMD(xadd, cluster_bulk_raw_resp, 0);
}
/* {{{ proto array RedisCluster::xclaim(string key, string group, string consumer,
|