123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414 |
- // (C) 2016 Cybozu
- #include "yrmcds_text.h"
- #include "yrmcds_portability.h"
- #ifdef LIBYRMCDS_USE_LZ4
- # include "lz4/lib/lz4.h"
- #endif
- #include <errno.h>
- #include <stdlib.h>
- #include <string.h>
- #include <sys/socket.h>
- #include <sys/types.h>
- #include <sys/uio.h>
- #include <unistd.h>
// Longest key accepted by the memcached text protocol.
#define MAX_KEY_LENGTH 250

// Capacity of a command-line buffer; large enough for any command this
// file builds together with its parameters.
#define TEXTBUF_SIZE 1000

// Expands a string literal into two arguments: the literal and its length
// excluding the terminating NUL.
#define EXPAND_STR(s) (s), (sizeof(s) - 1)

// Line terminator of the text protocol (two bytes, not NUL-terminated).
static const char CRLF[] = { '\r', '\n' };
- #ifdef LIBYRMCDS_USE_LZ4
// Store the 32-bit value `i` at `p` in big-endian byte order.
// `p` must have room for sizeof(uint32_t) bytes; it may be unaligned
// because the bytes are copied with memcpy.
static inline void
hton32(uint32_t i, char* p) {
    const uint32_t big_endian = htobe32(i);
    memcpy(p, &big_endian, sizeof big_endian);
}
- #endif
- static inline yrmcds_error
- check_key(const char* key, size_t key_len) {
- if( key_len > MAX_KEY_LENGTH )
- return YRMCDS_BAD_KEY;
- size_t i;
- for( i = 0; i < key_len; i++ ) {
- char c = key[i];
- if( c <= ' ' ) return YRMCDS_BAD_KEY; // SPC and control chars
- if( c == 127 ) return YRMCDS_BAD_KEY; // DEL
- }
- return YRMCDS_OK;
- }
- typedef struct {
- char* pos;
- char buffer[TEXTBUF_SIZE];
- } textbuf_t;
- static inline size_t
- textbuf_length(const textbuf_t* buf) {
- return (size_t)(buf->pos - buf->buffer);
- }
- static inline void
- textbuf_init(textbuf_t* buf) {
- buf->pos = buf->buffer;
- }
- static inline void
- textbuf_append_char(textbuf_t* buf, char c) {
- *buf->pos = c;
- ++buf->pos;
- }
- static inline void
- textbuf_append_string(textbuf_t* buf, const char* s, size_t len) {
- memcpy(buf->pos, s, len);
- buf->pos += len;
- }
// Append a string literal (or char array) `s` without its terminating NUL.
// `s` must be an array, not a pointer, because its length comes from sizeof.
#define textbuf_append_const_string(b, s) \
    textbuf_append_string((b), (s), sizeof(s) - 1)
- static void
- textbuf_append_uint64(textbuf_t* buf, uint64_t n) {
- // UINT64_MAX = 18446744073709551615 -> char[20]
- char nbuf[20];
- char* pos = (nbuf) + 20;
- do {
- pos--;
- uint64_t m = n % 10;
- n /= 10;
- *pos = (char)('0' + m);
- } while( n != 0 );
- textbuf_append_string(buf, pos, (size_t)(nbuf - pos + 20));
- }
- static yrmcds_error
- send_command(yrmcds* c, textbuf_t* buf, uint32_t* serial) {
- memcpy(buf->pos, CRLF, sizeof(CRLF));
- buf->pos += sizeof(CRLF);
- const char* p = buf->buffer;
- size_t len = textbuf_length(buf);
- #ifndef LIBYRMCDS_NO_INTERNAL_LOCK
- int e = pthread_mutex_lock(&c->lock);
- if( e != 0 ) {
- errno = e;
- return YRMCDS_SYSTEM_ERROR;
- }
- #endif // ! LIBYRMCDS_NO_INTERNAL_LOCK
- c->serial = c->serial + 1;
- if( serial != NULL )
- *serial = c->serial;
- yrmcds_error ret = YRMCDS_OK;
- while( len > 0 ) {
- ssize_t n = send(c->sock, p, len, 0);
- if( n == -1 ) {
- if( errno == EINTR ) continue;
- ret = YRMCDS_SYSTEM_ERROR;
- goto OUT;
- }
- size_t n2 = (size_t)n;
- p += n2;
- len -= n2;
- }
- OUT:
- #ifndef LIBYRMCDS_NO_INTERNAL_LOCK
- pthread_mutex_unlock(&c->lock);
- #endif
- return ret;
- }
- static yrmcds_error
- send_data(yrmcds* c, const char* cmd, size_t cmd_len,
- const char* key, size_t key_len,
- const char* data, size_t data_len,
- uint32_t flags, uint32_t expire, uint64_t cas,
- int quiet, uint32_t* serial) {
- if( key == NULL || key_len == 0 || data == NULL || data_len == 0 || quiet )
- return YRMCDS_BAD_ARGUMENT;
- yrmcds_error ret;
- ret = check_key(key, key_len);
- if( ret != YRMCDS_OK ) return ret;
- if( cas != 0 ) {
- cmd = "cas";
- cmd_len = 3;
- }
- int compressed = 0;
- #ifdef LIBYRMCDS_USE_LZ4
- if( (c->compress_size > 0) && (data_len > c->compress_size) ) {
- if( flags & YRMCDS_FLAG_COMPRESS )
- return YRMCDS_BAD_ARGUMENT;
- size_t bound = (size_t)LZ4_compressBound((int)data_len);
- char* new_data = (char*)malloc(bound + sizeof(uint32_t));
- if( new_data == NULL )
- return YRMCDS_OUT_OF_MEMORY;
- uint32_t new_size =
- (uint32_t)LZ4_compress(data, new_data + sizeof(uint32_t),
- (int)data_len);
- if( new_size == 0 ) {
- free(new_data);
- return YRMCDS_COMPRESS_FAILED;
- }
- hton32((uint32_t)data_len, new_data);
- flags |= YRMCDS_FLAG_COMPRESS;
- data_len = sizeof(uint32_t) + new_size;
- data = new_data;
- compressed = 1;
- }
- #endif // LIBYRMCDS_USE_LZ4
- textbuf_t buf[1];
- textbuf_init(buf);
- // "cmd key flags expire bytes (cas)"
- textbuf_append_string(buf, cmd, cmd_len);
- textbuf_append_char(buf, ' ');
- textbuf_append_string(buf, key, key_len);
- textbuf_append_char(buf, ' ');
- textbuf_append_uint64(buf, flags);
- textbuf_append_char(buf, ' ');
- textbuf_append_uint64(buf, expire);
- textbuf_append_char(buf, ' ');
- textbuf_append_uint64(buf, (uint64_t)data_len);
- if( cas != 0 ) {
- textbuf_append_char(buf, ' ');
- textbuf_append_uint64(buf, cas);
- }
- textbuf_append_string(buf, CRLF, sizeof(CRLF));
- struct iovec iov[3];
- int iovcnt = 3;
- iov[0].iov_base = buf[0].buffer;
- iov[0].iov_len = textbuf_length(buf);
- iov[1].iov_base = (void*)data;
- iov[1].iov_len = data_len;
- iov[2].iov_base = (void*)CRLF;
- iov[2].iov_len = sizeof(CRLF);
- #ifndef LIBYRMCDS_NO_INTERNAL_LOCK
- int e = pthread_mutex_lock(&c->lock);
- if( e != 0 ) {
- errno = e;
- return YRMCDS_SYSTEM_ERROR;
- }
- #endif // ! LIBYRMCDS_NO_INTERNAL_LOCK
- c->serial = c->serial + 1;
- if( serial != NULL )
- *serial = c->serial;
- while( iovcnt > 0 ) {
- ssize_t n = writev(c->sock, iov, iovcnt);
- if( n == -1 ) {
- if( errno == EINTR ) continue;
- ret = YRMCDS_SYSTEM_ERROR;
- goto OUT;
- }
- size_t n2 = (size_t)n;
- while( n2 > 0 ) {
- if( n2 < iov[0].iov_len ) {
- iov[0].iov_base = (char*)iov[0].iov_base + n2;
- iov[0].iov_len -= n2;
- break;
- }
- n2 -= iov[0].iov_len;
- iovcnt --;
- if( iovcnt == 0 )
- break;
- int i;
- for( i = 0; i < iovcnt; ++i )
- iov[i] = iov[i+1];
- }
- }
- OUT:
- #ifndef LIBYRMCDS_NO_INTERNAL_LOCK
- pthread_mutex_unlock(&c->lock);
- #endif
- if( compressed )
- free((void*)data);
- return ret;
- }
- // public functions.
- yrmcds_error yrmcds_text_get(yrmcds* c, const char* key, size_t key_len,
- int quiet, uint32_t* serial) {
- if( key == NULL || key_len == 0 || quiet )
- return YRMCDS_BAD_ARGUMENT;
- yrmcds_error ret;
- ret = check_key(key, key_len);
- if( ret != YRMCDS_OK ) return ret;
- textbuf_t buf[1];
- textbuf_init(buf);
- textbuf_append_const_string(buf, "gets ");
- textbuf_append_string(buf, key, key_len);
- return send_command(c, buf, serial);
- }
- yrmcds_error yrmcds_text_touch(yrmcds* c, const char* key, size_t key_len,
- uint32_t expire, int quiet, uint32_t* serial) {
- if( key == NULL || key_len == 0 || quiet )
- return YRMCDS_BAD_ARGUMENT;
- yrmcds_error ret;
- ret = check_key(key, key_len);
- if( ret != YRMCDS_OK ) return ret;
- textbuf_t buf[1];
- textbuf_init(buf);
- textbuf_append_const_string(buf, "touch ");
- textbuf_append_string(buf, key, key_len);
- textbuf_append_char(buf, ' ');
- textbuf_append_uint64(buf, expire);
- return send_command(c, buf, serial);
- }
- yrmcds_error yrmcds_text_set(yrmcds* c, const char* key, size_t key_len,
- const char* data, size_t data_len,
- uint32_t flags, uint32_t expire, uint64_t cas,
- int quiet, uint32_t* serial) {
- return send_data(c, EXPAND_STR("set"), key, key_len, data, data_len,
- flags, expire, cas, quiet, serial);
- }
- yrmcds_error yrmcds_text_replace(yrmcds* c, const char* key, size_t key_len,
- const char* data, size_t data_len,
- uint32_t flags, uint32_t expire, uint64_t cas,
- int quiet, uint32_t* serial) {
- return send_data(c, EXPAND_STR("replace"), key, key_len, data, data_len,
- flags, expire, cas, quiet, serial);
- }
- yrmcds_error yrmcds_text_add(yrmcds* c, const char* key, size_t key_len,
- const char* data, size_t data_len,
- uint32_t flags, uint32_t expire, uint64_t cas,
- int quiet, uint32_t* serial) {
- return send_data(c, EXPAND_STR("add"), key, key_len, data, data_len,
- flags, expire, cas, quiet, serial);
- }
- yrmcds_error yrmcds_text_append(yrmcds* c, const char* key, size_t key_len,
- const char* data, size_t data_len,
- int quiet, uint32_t* serial) {
- return send_data(c, EXPAND_STR("append"), key, key_len, data, data_len,
- 0, 0, 0, quiet, serial);
- }
- yrmcds_error yrmcds_text_prepend(yrmcds* c, const char* key, size_t key_len,
- const char* data, size_t data_len,
- int quiet, uint32_t* serial) {
- return send_data(c, EXPAND_STR("prepend"), key, key_len, data, data_len,
- 0, 0, 0, quiet, serial);
- }
- yrmcds_error yrmcds_text_incr(yrmcds* c, const char* key, size_t key_len,
- uint64_t value, int quiet, uint32_t* serial) {
- if( key == NULL || key_len == 0 || quiet )
- return YRMCDS_BAD_ARGUMENT;
- yrmcds_error ret;
- ret = check_key(key, key_len);
- if( ret != YRMCDS_OK ) return ret;
- textbuf_t buf[1];
- textbuf_init(buf);
- textbuf_append_const_string(buf, "incr ");
- textbuf_append_string(buf, key, key_len);
- textbuf_append_char(buf, ' ');
- textbuf_append_uint64(buf, value);
- return send_command(c, buf, serial);
- }
- yrmcds_error yrmcds_text_decr(yrmcds* c, const char* key, size_t key_len,
- uint64_t value, int quiet, uint32_t* serial) {
- if( key == NULL || key_len == 0 || quiet )
- return YRMCDS_BAD_ARGUMENT;
- yrmcds_error ret;
- ret = check_key(key, key_len);
- if( ret != YRMCDS_OK ) return ret;
- textbuf_t buf[1];
- textbuf_init(buf);
- textbuf_append_const_string(buf, "decr ");
- textbuf_append_string(buf, key, key_len);
- textbuf_append_char(buf, ' ');
- textbuf_append_uint64(buf, value);
- return send_command(c, buf, serial);
- }
- yrmcds_error yrmcds_text_remove(yrmcds* c, const char* key, size_t key_len,
- int quiet, uint32_t* serial) {
- if( key == NULL || key_len == 0 || quiet )
- return YRMCDS_BAD_ARGUMENT;
- yrmcds_error ret;
- ret = check_key(key, key_len);
- if( ret != YRMCDS_OK ) return ret;
- textbuf_t buf[1];
- textbuf_init(buf);
- textbuf_append_const_string(buf, "delete ");
- textbuf_append_string(buf, key, key_len);
- return send_command(c, buf, serial);
- }
- yrmcds_error yrmcds_text_flush(yrmcds* c, uint32_t delay,
- int quiet, uint32_t* serial) {
- if( quiet )
- return YRMCDS_BAD_ARGUMENT;
- textbuf_t buf[1];
- textbuf_init(buf);
- textbuf_append_const_string(buf, "flush_all");
- if( delay != 0 ) {
- textbuf_append_char(buf, ' ');
- textbuf_append_uint64(buf, delay);
- }
- return send_command(c, buf, serial);
- }
- yrmcds_error yrmcds_text_version(yrmcds* c, uint32_t* serial) {
- textbuf_t buf[1];
- textbuf_init(buf);
- textbuf_append_const_string(buf, "version");
- return send_command(c, buf, serial);
- }
- yrmcds_error yrmcds_text_quit(yrmcds* c, uint32_t* serial) {
- textbuf_t buf[1];
- textbuf_init(buf);
- textbuf_append_const_string(buf, "quit");
- return send_command(c, buf, serial);
- }
|