recv.c 11 KB


  1. // (C) 2013 Cybozu et al.
  2. #include "yrmcds.h"
  3. #include "yrmcds_portability.h"
  4. #ifdef LIBYRMCDS_USE_LZ4
  5. # include "lz4/lib/lz4.h"
  6. #endif
  7. #include <errno.h>
  8. #include <limits.h>
  9. #include <stdio.h>
  10. #include <stdlib.h>
  11. #include <string.h>
  12. #include <sys/socket.h>
  13. #include <sys/types.h>
  // Size in bytes of the fixed header that starts every binary-protocol packet.
  static const size_t BINARY_HEADER_SIZE = 24;
  // Number of bytes requested from the socket by each recv() call (256 KiB).
  static const size_t RECV_SIZE = 256 * 1024;
  // Largest response body length accepted from the server (50 MiB).
  static const size_t MAX_CAPACITY = 50 * 1024 * 1024;
  17. static inline yrmcds_error recv_data(yrmcds* c) {
  18. if( (c->capacity - c->used) < RECV_SIZE ) {
  19. size_t new_capacity = c->capacity * 2;
  20. char* new_buffer = (char*)realloc(c->recvbuf, new_capacity);
  21. if( new_buffer == NULL )
  22. return YRMCDS_OUT_OF_MEMORY;
  23. c->recvbuf = new_buffer;
  24. c->capacity = new_capacity;
  25. }
  26. ssize_t n;
  27. AGAIN:
  28. n = recv(c->sock, c->recvbuf + c->used, RECV_SIZE, 0);
  29. if( n == -1 ) {
  30. if( errno == EINTR ) goto AGAIN;
  31. return YRMCDS_SYSTEM_ERROR;
  32. }
  33. if( n == 0 )
  34. return YRMCDS_DISCONNECTED;
  35. c->used += (size_t)n;
  36. return YRMCDS_OK;
  37. }
  // Decode the 8 bytes at p as a big-endian (network byte order) unsigned
  // 64-bit integer.  p does not need to be aligned.
  static inline uint64_t ntoh64(const char* p) {
      const unsigned char* bytes = (const unsigned char*)p;
      uint64_t value = 0;
      for( size_t i = 0; i < sizeof(value); i++ ) {
          value = (value << 8) | bytes[i];
      }
      return value;
  }
  // Decode the 4 bytes at p as a big-endian (network byte order) unsigned
  // 32-bit integer.  p does not need to be aligned.
  static inline uint32_t ntoh32(const char* p) {
      const unsigned char* bytes = (const unsigned char*)p;
      return ((uint32_t)bytes[0] << 24) |
             ((uint32_t)bytes[1] << 16) |
             ((uint32_t)bytes[2] << 8)  |
              (uint32_t)bytes[3];
  }
  // Decode the 2 bytes at p as a big-endian (network byte order) unsigned
  // 16-bit integer.  p does not need to be aligned.
  static inline uint16_t ntoh16(const char* p) {
      const unsigned char* bytes = (const unsigned char*)p;
      unsigned int high = bytes[0];
      unsigned int low = bytes[1];
      return (uint16_t)((high << 8) | low);
  }
  53. static yrmcds_error text_recv(yrmcds* c, yrmcds_response* r);
  54. yrmcds_error yrmcds_recv(yrmcds* c, yrmcds_response* r) {
  55. if( c == NULL || r == NULL )
  56. return YRMCDS_BAD_ARGUMENT;
  57. if( c->invalid )
  58. return YRMCDS_PROTOCOL_ERROR;
  59. if( c->last_size > 0 ) {
  60. size_t remain = c->used - c->last_size;
  61. if( remain > 0 )
  62. memmove(c->recvbuf, c->recvbuf + c->last_size, remain);
  63. c->used = remain;
  64. c->last_size = 0;
  65. free(c->decompressed);
  66. c->decompressed = NULL;
  67. }
  68. if( c->text_mode ) {
  69. return text_recv(c, r);
  70. }
  71. while( c->used < BINARY_HEADER_SIZE ) {
  72. yrmcds_error e = recv_data(c);
  73. if( e != 0 ) return e;
  74. }
  75. if( *c->recvbuf != '\x81' ) {
  76. c->invalid = 1;
  77. return YRMCDS_PROTOCOL_ERROR;
  78. }
  79. uint32_t total_len = ntoh32(c->recvbuf + 8);
  80. if( total_len > MAX_CAPACITY ) {
  81. c->invalid = 1;
  82. return YRMCDS_PROTOCOL_ERROR;
  83. }
  84. while( c->used < (BINARY_HEADER_SIZE + total_len) ) {
  85. yrmcds_error e = recv_data(c);
  86. if( e != 0 ) return e;
  87. }
  88. uint16_t key_len = ntoh16(c->recvbuf + 2);
  89. uint8_t extras_len = *(unsigned char*)(c->recvbuf + 4);
  90. if( total_len < (key_len + extras_len) ) {
  91. c->invalid = 1;
  92. return YRMCDS_PROTOCOL_ERROR;
  93. }
  94. const char* pkey = c->recvbuf + (BINARY_HEADER_SIZE + extras_len);
  95. r->length = BINARY_HEADER_SIZE + total_len;
  96. r->command = *(unsigned char*)(c->recvbuf + 1);
  97. r->key = key_len ? pkey : NULL;
  98. r->key_len = key_len;
  99. r->status = ntoh16(c->recvbuf + 6);
  100. memcpy(&(r->serial), c->recvbuf + 12, 4);
  101. r->cas_unique = ntoh64(c->recvbuf + 16);
  102. r->flags = 0;
  103. if( extras_len > 0 ) {
  104. if( extras_len != 4 ) {
  105. c->invalid = 1;
  106. return YRMCDS_PROTOCOL_ERROR;
  107. }
  108. r->flags = ntoh32(c->recvbuf + BINARY_HEADER_SIZE);
  109. }
  110. size_t data_len = total_len - key_len - extras_len;
  111. const char* pdata = pkey + key_len;
  112. if( (r->command == YRMCDS_CMD_INCREMENT ||
  113. r->command == YRMCDS_CMD_DECREMENT) &&
  114. (r->status == YRMCDS_STATUS_OK) ) {
  115. r->data = NULL;
  116. r->data_len = 0;
  117. if( data_len != 8 ) {
  118. c->invalid = 1;
  119. return YRMCDS_PROTOCOL_ERROR;
  120. }
  121. r->value = ntoh64(pdata);
  122. c->last_size = r->length;
  123. return YRMCDS_OK;
  124. }
  125. r->value = 0;
  126. r->data = data_len ? pdata : NULL;
  127. r->data_len = data_len;
  128. #ifdef LIBYRMCDS_USE_LZ4
  129. if( c->compress_size && (r->flags & YRMCDS_FLAG_COMPRESS) ) {
  130. if( data_len == 0 ) {
  131. c->invalid = 1;
  132. return YRMCDS_PROTOCOL_ERROR;
  133. }
  134. r->flags &= ~(uint32_t)YRMCDS_FLAG_COMPRESS;
  135. uint32_t decompress_size = ntoh32(pdata);
  136. if( UINT32_MAX > INT_MAX ) {
  137. if( decompress_size > INT_MAX ) {
  138. c->invalid = 1;
  139. return YRMCDS_PROTOCOL_ERROR;
  140. }
  141. }
  142. c->decompressed = (char*)malloc(decompress_size);
  143. if( c->decompressed == NULL )
  144. return YRMCDS_OUT_OF_MEMORY;
  145. int d = LZ4_decompress_safe(pdata + sizeof(uint32_t),
  146. c->decompressed,
  147. (int)(data_len - sizeof(uint32_t)),
  148. (int)decompress_size);
  149. if( d != decompress_size ) {
  150. c->invalid = 1;
  151. return YRMCDS_PROTOCOL_ERROR;
  152. }
  153. r->data = c->decompressed;
  154. r->data_len = decompress_size;
  155. }
  156. #endif // LIBYRMCDS_USE_LZ4
  157. c->last_size = r->length;
  158. return YRMCDS_OK;
  159. }
  160. // text protocol
  /*
   * PARSE_UINT(name): declare `uint64_t name` and parse an unsigned decimal
   * number into it from the cursor `p`, which must be a `const char*` in
   * the enclosing scope.  Leading spaces are skipped first.  On exit p
   * points at the first character that is not a decimal digit; if no digit
   * was found, name is 0.  Overflow is not detected (the value wraps).
   */
  #define PARSE_UINT(name)                              \
      uint64_t name = 0;                                \
      for( ; *p == ' '; p++ ) {                         \
      }                                                 \
      for( ; *p >= '0' && *p <= '9'; p++ ) {            \
          name = name * 10 + (uint64_t)(*p - '0');      \
      }
  169. static yrmcds_error text_recv(yrmcds* c, yrmcds_response* r) {
  170. char* pos;
  171. while( c->used == 0 ||
  172. (pos = (char*)memchr(c->recvbuf, '\n', c->used)) == NULL ) {
  173. yrmcds_error e = recv_data(c);
  174. if( e != 0 ) return e;
  175. }
  176. // make sure the buffer contains CRLF.
  177. if( (pos - c->recvbuf) < 2 || *(pos-1) != '\r' ) {
  178. c->invalid = 1;
  179. return YRMCDS_PROTOCOL_ERROR;
  180. }
  181. pos--;
  182. size_t resp_len = (size_t)(pos - c->recvbuf);
  183. memset(r, 0, sizeof(yrmcds_response));
  184. r->serial = ++c->rserial;
  185. r->length = resp_len + 2;
  186. r->status = YRMCDS_STATUS_OK;
  187. r->command = YRMCDS_CMD_BOTTOM; // dummy for emulating binary protocol
  188. if( resp_len == 2 && memcmp(c->recvbuf, "OK", 2) == 0 ) {
  189. // successful response for flush_all
  190. goto FINISH;
  191. }
  192. if( resp_len == 3 && memcmp(c->recvbuf, "END", 3) == 0 ) {
  193. // get failed for non-existing object.
  194. r->status = YRMCDS_STATUS_NOTFOUND;
  195. goto FINISH;
  196. }
  197. if( resp_len == 5 && memcmp(c->recvbuf, "ERROR", 5) == 0 ) {
  198. r->status = YRMCDS_STATUS_UNKNOWNCOMMAND;
  199. goto FINISH;
  200. }
  201. if( resp_len == 6 ) {
  202. if( memcmp(c->recvbuf, "STORED", 6) == 0 ) {
  203. // successful response for storage commands.
  204. goto FINISH;
  205. }
  206. if( memcmp(c->recvbuf, "EXISTS", 6) == 0 ) {
  207. // failure response for cas.
  208. r->status = YRMCDS_STATUS_EXISTS;
  209. goto FINISH;
  210. }
  211. }
  212. if( resp_len == 7 ) {
  213. if( memcmp(c->recvbuf, "DELETED", 7) == 0 )
  214. // successful response for delete.
  215. goto FINISH;
  216. if( memcmp(c->recvbuf, "TOUCHED", 7) == 0 )
  217. // successful response for touch.
  218. goto FINISH;
  219. }
  220. if( resp_len == 9 && memcmp(c->recvbuf, "NOT_FOUND", 9) == 0 ) {
  221. // failure response for cas, delete, incr, decr, or touch.
  222. r->status = YRMCDS_STATUS_NOTFOUND;
  223. goto FINISH;
  224. }
  225. if( resp_len == 10 && memcmp(c->recvbuf, "NOT_STORED", 10) == 0 ) {
  226. // failure response for add, replace, append, or prepend.
  227. r->status = YRMCDS_STATUS_NOTSTORED;
  228. goto FINISH;
  229. }
  230. if( resp_len > 0 && '0' <= c->recvbuf[0] && c->recvbuf[0] <= '9' ) {
  231. // successful response for incr or decr.
  232. const char* p = c->recvbuf;
  233. PARSE_UINT(value);
  234. r->value = value;
  235. goto FINISH;
  236. }
  237. if( resp_len > 8 && memcmp(c->recvbuf, "VERSION ", 8) == 0 ) {
  238. // successful response for version.
  239. r->data_len = resp_len - 8;
  240. r->data = c->recvbuf + 8;
  241. goto FINISH;
  242. }
  243. if( resp_len > 6 && memcmp(c->recvbuf, "VALUE ", 6) == 0 ) {
  244. // successful response for gets.
  245. const char* p = c->recvbuf + 6;
  246. while( *p == ' ' ) p++;
  247. if( p == pos ) goto UNKNOWN;
  248. const char* key_end = memchr(p, ' ', (size_t)(pos - p));
  249. if( key_end == NULL ) goto UNKNOWN;
  250. r->key = p;
  251. r->key_len = (size_t)(key_end - p);
  252. p = key_end;
  253. PARSE_UINT(flags);
  254. if( *p != ' ' ) goto UNKNOWN;
  255. r->flags = (uint32_t)flags;
  256. PARSE_UINT(bytes);
  257. if( bytes > MAX_CAPACITY ) {
  258. c->invalid = 1;
  259. return YRMCDS_PROTOCOL_ERROR;
  260. }
  261. size_t data_len = (size_t)bytes;
  262. while( *p == ' ' ) p++;
  263. if( *p < '0' || '9' < *p ) goto UNKNOWN;
  264. PARSE_UINT(cas);
  265. size_t required = resp_len + 2 + data_len + 7; // CRLF "END" CRLF
  266. while( c->used < required ) {
  267. yrmcds_error e = recv_data(c);
  268. if( e != 0 ) return e;
  269. }
  270. const char* data = c->recvbuf + (resp_len + 2);
  271. if( memcmp(data + data_len, "\r\nEND\r\n", 7) != 0 ) {
  272. c->invalid = 1;
  273. return YRMCDS_PROTOCOL_ERROR;
  274. }
  275. r->length = required;
  276. r->flags = (uint32_t)flags;
  277. #ifdef LIBYRMCDS_USE_LZ4
  278. if( c->compress_size && (r->flags & YRMCDS_FLAG_COMPRESS) ) {
  279. if( data_len == 0 ) {
  280. c->invalid = 1;
  281. return YRMCDS_PROTOCOL_ERROR;
  282. }
  283. r->flags &= ~(uint32_t)YRMCDS_FLAG_COMPRESS;
  284. uint32_t decompress_size = ntoh32(data);
  285. if( UINT32_MAX > INT_MAX ) {
  286. if( decompress_size > INT_MAX ) {
  287. c->invalid = 1;
  288. return YRMCDS_PROTOCOL_ERROR;
  289. }
  290. }
  291. c->decompressed = (char*)malloc(decompress_size);
  292. if( c->decompressed == NULL )
  293. return YRMCDS_OUT_OF_MEMORY;
  294. int d = LZ4_decompress_safe(data + sizeof(uint32_t),
  295. c->decompressed,
  296. (int)(data_len - sizeof(uint32_t)),
  297. (int)decompress_size);
  298. if( d != decompress_size ) {
  299. c->invalid = 1;
  300. return YRMCDS_PROTOCOL_ERROR;
  301. }
  302. data = c->decompressed;
  303. data_len = (size_t)decompress_size;
  304. }
  305. #endif // LIBYRMCDS_USE_LZ4
  306. r->data = data;
  307. r->data_len = data_len;
  308. r->cas_unique = cas;
  309. goto FINISH;
  310. }
  311. UNKNOWN:
  312. r->status = YRMCDS_STATUS_OTHER;
  313. fprintf(stderr, "[libyrmcds] unknown response: %.*s\n",
  314. (int)resp_len, c->recvbuf);
  315. FINISH:
  316. c->last_size = r->length;
  317. return YRMCDS_OK;
  318. }