send_text.c 12 KB


  1. // (C) 2016 Cybozu
  2. #include "yrmcds_text.h"
  3. #include "yrmcds_portability.h"
  4. #ifdef LIBYRMCDS_USE_LZ4
  5. # include "lz4/lib/lz4.h"
  6. #endif
  7. #include <errno.h>
  8. #include <stdlib.h>
  9. #include <string.h>
  10. #include <sys/socket.h>
  11. #include <sys/types.h>
  12. #include <sys/uio.h>
  13. #include <unistd.h>
  14. #define MAX_KEY_LENGTH 250 // from memcached spec.
  15. #define TEXTBUF_SIZE 1000 // enough for any command & parameters.
  16. #define EXPAND_STR(s) (s), (sizeof(s) - 1)
  17. static const char CRLF[2] = {'\r', '\n'};
  18. #ifdef LIBYRMCDS_USE_LZ4
  19. static inline void
  20. hton32(uint32_t i, char* p) {
  21. uint32_t n = htobe32(i);
  22. memcpy(p, &n, sizeof(n));
  23. }
  24. #endif
  25. static inline yrmcds_error
  26. check_key(const char* key, size_t key_len) {
  27. if( key_len > MAX_KEY_LENGTH )
  28. return YRMCDS_BAD_KEY;
  29. size_t i;
  30. for( i = 0; i < key_len; i++ ) {
  31. char c = key[i];
  32. if( c <= ' ' ) return YRMCDS_BAD_KEY; // SPC and control chars
  33. if( c == 127 ) return YRMCDS_BAD_KEY; // DEL
  34. }
  35. return YRMCDS_OK;
  36. }
  37. typedef struct {
  38. char* pos;
  39. char buffer[TEXTBUF_SIZE];
  40. } textbuf_t;
  41. static inline size_t
  42. textbuf_length(const textbuf_t* buf) {
  43. return (size_t)(buf->pos - buf->buffer);
  44. }
  45. static inline void
  46. textbuf_init(textbuf_t* buf) {
  47. buf->pos = buf->buffer;
  48. }
  49. static inline void
  50. textbuf_append_char(textbuf_t* buf, char c) {
  51. *buf->pos = c;
  52. ++buf->pos;
  53. }
  54. static inline void
  55. textbuf_append_string(textbuf_t* buf, const char* s, size_t len) {
  56. memcpy(buf->pos, s, len);
  57. buf->pos += len;
  58. }
  59. #define textbuf_append_const_string(b, s) \
  60. textbuf_append_string(b, s, sizeof(s) - 1)
  61. static void
  62. textbuf_append_uint64(textbuf_t* buf, uint64_t n) {
  63. // UINT64_MAX = 18446744073709551615 -> char[20]
  64. char nbuf[20];
  65. char* pos = (nbuf) + 20;
  66. do {
  67. pos--;
  68. uint64_t m = n % 10;
  69. n /= 10;
  70. *pos = (char)('0' + m);
  71. } while( n != 0 );
  72. textbuf_append_string(buf, pos, (size_t)(nbuf - pos + 20));
  73. }
  74. static yrmcds_error
  75. send_command(yrmcds* c, textbuf_t* buf, uint32_t* serial) {
  76. memcpy(buf->pos, CRLF, sizeof(CRLF));
  77. buf->pos += sizeof(CRLF);
  78. const char* p = buf->buffer;
  79. size_t len = textbuf_length(buf);
  80. #ifndef LIBYRMCDS_NO_INTERNAL_LOCK
  81. int e = pthread_mutex_lock(&c->lock);
  82. if( e != 0 ) {
  83. errno = e;
  84. return YRMCDS_SYSTEM_ERROR;
  85. }
  86. #endif // ! LIBYRMCDS_NO_INTERNAL_LOCK
  87. c->serial = c->serial + 1;
  88. if( serial != NULL )
  89. *serial = c->serial;
  90. yrmcds_error ret = YRMCDS_OK;
  91. while( len > 0 ) {
  92. ssize_t n = send(c->sock, p, len, 0);
  93. if( n == -1 ) {
  94. if( errno == EINTR ) continue;
  95. ret = YRMCDS_SYSTEM_ERROR;
  96. goto OUT;
  97. }
  98. size_t n2 = (size_t)n;
  99. p += n2;
  100. len -= n2;
  101. }
  102. OUT:
  103. #ifndef LIBYRMCDS_NO_INTERNAL_LOCK
  104. pthread_mutex_unlock(&c->lock);
  105. #endif
  106. return ret;
  107. }
  108. static yrmcds_error
  109. send_data(yrmcds* c, const char* cmd, size_t cmd_len,
  110. const char* key, size_t key_len,
  111. const char* data, size_t data_len,
  112. uint32_t flags, uint32_t expire, uint64_t cas,
  113. int quiet, uint32_t* serial) {
  114. if( key == NULL || key_len == 0 || data == NULL || data_len == 0 || quiet )
  115. return YRMCDS_BAD_ARGUMENT;
  116. yrmcds_error ret;
  117. ret = check_key(key, key_len);
  118. if( ret != YRMCDS_OK ) return ret;
  119. if( cas != 0 ) {
  120. cmd = "cas";
  121. cmd_len = 3;
  122. }
  123. int compressed = 0;
  124. #ifdef LIBYRMCDS_USE_LZ4
  125. if( (c->compress_size > 0) && (data_len > c->compress_size) ) {
  126. if( flags & YRMCDS_FLAG_COMPRESS )
  127. return YRMCDS_BAD_ARGUMENT;
  128. size_t bound = (size_t)LZ4_compressBound((int)data_len);
  129. char* new_data = (char*)malloc(bound + sizeof(uint32_t));
  130. if( new_data == NULL )
  131. return YRMCDS_OUT_OF_MEMORY;
  132. uint32_t new_size =
  133. (uint32_t)LZ4_compress(data, new_data + sizeof(uint32_t),
  134. (int)data_len);
  135. if( new_size == 0 ) {
  136. free(new_data);
  137. return YRMCDS_COMPRESS_FAILED;
  138. }
  139. hton32((uint32_t)data_len, new_data);
  140. flags |= YRMCDS_FLAG_COMPRESS;
  141. data_len = sizeof(uint32_t) + new_size;
  142. data = new_data;
  143. compressed = 1;
  144. }
  145. #endif // LIBYRMCDS_USE_LZ4
  146. textbuf_t buf[1];
  147. textbuf_init(buf);
  148. // "cmd key flags expire bytes (cas)"
  149. textbuf_append_string(buf, cmd, cmd_len);
  150. textbuf_append_char(buf, ' ');
  151. textbuf_append_string(buf, key, key_len);
  152. textbuf_append_char(buf, ' ');
  153. textbuf_append_uint64(buf, flags);
  154. textbuf_append_char(buf, ' ');
  155. textbuf_append_uint64(buf, expire);
  156. textbuf_append_char(buf, ' ');
  157. textbuf_append_uint64(buf, (uint64_t)data_len);
  158. if( cas != 0 ) {
  159. textbuf_append_char(buf, ' ');
  160. textbuf_append_uint64(buf, cas);
  161. }
  162. textbuf_append_string(buf, CRLF, sizeof(CRLF));
  163. struct iovec iov[3];
  164. int iovcnt = 3;
  165. iov[0].iov_base = buf[0].buffer;
  166. iov[0].iov_len = textbuf_length(buf);
  167. iov[1].iov_base = (void*)data;
  168. iov[1].iov_len = data_len;
  169. iov[2].iov_base = (void*)CRLF;
  170. iov[2].iov_len = sizeof(CRLF);
  171. #ifndef LIBYRMCDS_NO_INTERNAL_LOCK
  172. int e = pthread_mutex_lock(&c->lock);
  173. if( e != 0 ) {
  174. errno = e;
  175. return YRMCDS_SYSTEM_ERROR;
  176. }
  177. #endif // ! LIBYRMCDS_NO_INTERNAL_LOCK
  178. c->serial = c->serial + 1;
  179. if( serial != NULL )
  180. *serial = c->serial;
  181. while( iovcnt > 0 ) {
  182. ssize_t n = writev(c->sock, iov, iovcnt);
  183. if( n == -1 ) {
  184. if( errno == EINTR ) continue;
  185. ret = YRMCDS_SYSTEM_ERROR;
  186. goto OUT;
  187. }
  188. size_t n2 = (size_t)n;
  189. while( n2 > 0 ) {
  190. if( n2 < iov[0].iov_len ) {
  191. iov[0].iov_base = (char*)iov[0].iov_base + n2;
  192. iov[0].iov_len -= n2;
  193. break;
  194. }
  195. n2 -= iov[0].iov_len;
  196. iovcnt --;
  197. if( iovcnt == 0 )
  198. break;
  199. int i;
  200. for( i = 0; i < iovcnt; ++i )
  201. iov[i] = iov[i+1];
  202. }
  203. }
  204. OUT:
  205. #ifndef LIBYRMCDS_NO_INTERNAL_LOCK
  206. pthread_mutex_unlock(&c->lock);
  207. #endif
  208. if( compressed )
  209. free((void*)data);
  210. return ret;
  211. }
  212. // public functions.
  213. yrmcds_error yrmcds_text_get(yrmcds* c, const char* key, size_t key_len,
  214. int quiet, uint32_t* serial) {
  215. if( key == NULL || key_len == 0 || quiet )
  216. return YRMCDS_BAD_ARGUMENT;
  217. yrmcds_error ret;
  218. ret = check_key(key, key_len);
  219. if( ret != YRMCDS_OK ) return ret;
  220. textbuf_t buf[1];
  221. textbuf_init(buf);
  222. textbuf_append_const_string(buf, "gets ");
  223. textbuf_append_string(buf, key, key_len);
  224. return send_command(c, buf, serial);
  225. }
  226. yrmcds_error yrmcds_text_touch(yrmcds* c, const char* key, size_t key_len,
  227. uint32_t expire, int quiet, uint32_t* serial) {
  228. if( key == NULL || key_len == 0 || quiet )
  229. return YRMCDS_BAD_ARGUMENT;
  230. yrmcds_error ret;
  231. ret = check_key(key, key_len);
  232. if( ret != YRMCDS_OK ) return ret;
  233. textbuf_t buf[1];
  234. textbuf_init(buf);
  235. textbuf_append_const_string(buf, "touch ");
  236. textbuf_append_string(buf, key, key_len);
  237. textbuf_append_char(buf, ' ');
  238. textbuf_append_uint64(buf, expire);
  239. return send_command(c, buf, serial);
  240. }
  241. yrmcds_error yrmcds_text_set(yrmcds* c, const char* key, size_t key_len,
  242. const char* data, size_t data_len,
  243. uint32_t flags, uint32_t expire, uint64_t cas,
  244. int quiet, uint32_t* serial) {
  245. return send_data(c, EXPAND_STR("set"), key, key_len, data, data_len,
  246. flags, expire, cas, quiet, serial);
  247. }
  248. yrmcds_error yrmcds_text_replace(yrmcds* c, const char* key, size_t key_len,
  249. const char* data, size_t data_len,
  250. uint32_t flags, uint32_t expire, uint64_t cas,
  251. int quiet, uint32_t* serial) {
  252. return send_data(c, EXPAND_STR("replace"), key, key_len, data, data_len,
  253. flags, expire, cas, quiet, serial);
  254. }
  255. yrmcds_error yrmcds_text_add(yrmcds* c, const char* key, size_t key_len,
  256. const char* data, size_t data_len,
  257. uint32_t flags, uint32_t expire, uint64_t cas,
  258. int quiet, uint32_t* serial) {
  259. return send_data(c, EXPAND_STR("add"), key, key_len, data, data_len,
  260. flags, expire, cas, quiet, serial);
  261. }
  262. yrmcds_error yrmcds_text_append(yrmcds* c, const char* key, size_t key_len,
  263. const char* data, size_t data_len,
  264. int quiet, uint32_t* serial) {
  265. return send_data(c, EXPAND_STR("append"), key, key_len, data, data_len,
  266. 0, 0, 0, quiet, serial);
  267. }
  268. yrmcds_error yrmcds_text_prepend(yrmcds* c, const char* key, size_t key_len,
  269. const char* data, size_t data_len,
  270. int quiet, uint32_t* serial) {
  271. return send_data(c, EXPAND_STR("prepend"), key, key_len, data, data_len,
  272. 0, 0, 0, quiet, serial);
  273. }
  274. yrmcds_error yrmcds_text_incr(yrmcds* c, const char* key, size_t key_len,
  275. uint64_t value, int quiet, uint32_t* serial) {
  276. if( key == NULL || key_len == 0 || quiet )
  277. return YRMCDS_BAD_ARGUMENT;
  278. yrmcds_error ret;
  279. ret = check_key(key, key_len);
  280. if( ret != YRMCDS_OK ) return ret;
  281. textbuf_t buf[1];
  282. textbuf_init(buf);
  283. textbuf_append_const_string(buf, "incr ");
  284. textbuf_append_string(buf, key, key_len);
  285. textbuf_append_char(buf, ' ');
  286. textbuf_append_uint64(buf, value);
  287. return send_command(c, buf, serial);
  288. }
  289. yrmcds_error yrmcds_text_decr(yrmcds* c, const char* key, size_t key_len,
  290. uint64_t value, int quiet, uint32_t* serial) {
  291. if( key == NULL || key_len == 0 || quiet )
  292. return YRMCDS_BAD_ARGUMENT;
  293. yrmcds_error ret;
  294. ret = check_key(key, key_len);
  295. if( ret != YRMCDS_OK ) return ret;
  296. textbuf_t buf[1];
  297. textbuf_init(buf);
  298. textbuf_append_const_string(buf, "decr ");
  299. textbuf_append_string(buf, key, key_len);
  300. textbuf_append_char(buf, ' ');
  301. textbuf_append_uint64(buf, value);
  302. return send_command(c, buf, serial);
  303. }
  304. yrmcds_error yrmcds_text_remove(yrmcds* c, const char* key, size_t key_len,
  305. int quiet, uint32_t* serial) {
  306. if( key == NULL || key_len == 0 || quiet )
  307. return YRMCDS_BAD_ARGUMENT;
  308. yrmcds_error ret;
  309. ret = check_key(key, key_len);
  310. if( ret != YRMCDS_OK ) return ret;
  311. textbuf_t buf[1];
  312. textbuf_init(buf);
  313. textbuf_append_const_string(buf, "delete ");
  314. textbuf_append_string(buf, key, key_len);
  315. return send_command(c, buf, serial);
  316. }
  317. yrmcds_error yrmcds_text_flush(yrmcds* c, uint32_t delay,
  318. int quiet, uint32_t* serial) {
  319. if( quiet )
  320. return YRMCDS_BAD_ARGUMENT;
  321. textbuf_t buf[1];
  322. textbuf_init(buf);
  323. textbuf_append_const_string(buf, "flush_all");
  324. if( delay != 0 ) {
  325. textbuf_append_char(buf, ' ');
  326. textbuf_append_uint64(buf, delay);
  327. }
  328. return send_command(c, buf, serial);
  329. }
  330. yrmcds_error yrmcds_text_version(yrmcds* c, uint32_t* serial) {
  331. textbuf_t buf[1];
  332. textbuf_init(buf);
  333. textbuf_append_const_string(buf, "version");
  334. return send_command(c, buf, serial);
  335. }
  336. yrmcds_error yrmcds_text_quit(yrmcds* c, uint32_t* serial) {
  337. textbuf_t buf[1];
  338. textbuf_init(buf);
  339. textbuf_append_const_string(buf, "quit");
  340. return send_command(c, buf, serial);
  341. }