counter.c 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408
  1. // (C) 2013-2015 Cybozu et al.
  2. #include "yrmcds.h"
  3. #include "yrmcds_portability.h"
  4. #include <errno.h>
  5. #include <fcntl.h>
  6. #include <limits.h>
  7. #include <netdb.h>
  8. #include <netinet/tcp.h>
  9. #include <poll.h>
  10. #include <stdio.h>
  11. #include <stdlib.h>
  12. #include <string.h>
  13. #include <sys/socket.h>
  14. #include <sys/types.h>
  15. #include <sys/uio.h>
  16. #include <unistd.h>
  // Size in bytes of the fixed header that starts every request built by
  // send_command() and every response parsed by yrmcds_cnt_recv().
  static const size_t HEADER_SIZE = 12;

  // Number of bytes requested from the socket by each recv() call, and the
  // minimum free space kept at the tail of the receive buffer.
  static const size_t RECV_SIZE = 4 * 1024;

  // Capacity given to the statistics record array the first time it grows.
  static const size_t INITIAL_STATS_CAPACITY = 16;
  // Write `i` to the 4 bytes at `p` in network (big-endian) byte order.
  // `p` may be unaligned; the bytes are copied rather than stored directly.
  static inline void hton32(uint32_t i, char* p) {
      const unsigned char be[4] = {
          (unsigned char)(i >> 24),
          (unsigned char)(i >> 16),
          (unsigned char)(i >> 8),
          (unsigned char)i,
      };
      memcpy(p, be, sizeof(be));
  }
  // Write `i` to the 2 bytes at `p` in network (big-endian) byte order.
  // `p` may be unaligned; the bytes are copied rather than stored directly.
  static inline void hton16(uint16_t i, char* p) {
      const unsigned char be[2] = {
          (unsigned char)(i >> 8),
          (unsigned char)i,
      };
      memcpy(p, be, sizeof(be));
  }
  // Read the 4 bytes at `p` as a network (big-endian) 32-bit integer and
  // return it in host byte order.  `p` may be unaligned.
  static inline uint32_t ntoh32(const char* p) {
      unsigned char be[4];
      memcpy(be, p, sizeof(be));
      return ((uint32_t)be[0] << 24) |
             ((uint32_t)be[1] << 16) |
             ((uint32_t)be[2] << 8)  |
              (uint32_t)be[3];
  }
  // Read the 2 bytes at `p` as a network (big-endian) 16-bit integer and
  // return it in host byte order.  `p` may be unaligned.
  static inline uint16_t ntoh16(const char* p) {
      unsigned char be[2];
      memcpy(be, p, sizeof(be));
      return (uint16_t)(((unsigned)be[0] << 8) | (unsigned)be[1]);
  }
  38. yrmcds_error
  39. yrmcds_cnt_set_timeout(yrmcds_cnt* c, int timeout) {
  40. if( c == NULL || timeout < 0 )
  41. return YRMCDS_BAD_ARGUMENT;
  42. struct timeval tv;
  43. tv.tv_sec = timeout;
  44. tv.tv_usec = 0;
  45. if( setsockopt(c->sock, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) == -1 )
  46. return YRMCDS_SYSTEM_ERROR;
  47. if( setsockopt(c->sock, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)) == -1 )
  48. return YRMCDS_SYSTEM_ERROR;
  49. return YRMCDS_OK;
  50. }
  51. yrmcds_error
  52. yrmcds_cnt_close(yrmcds_cnt* c) {
  53. if( c == NULL )
  54. return YRMCDS_BAD_ARGUMENT;
  55. if( c->sock == -1 )
  56. return YRMCDS_OK;
  57. close(c->sock);
  58. c->sock = -1;
  59. #ifndef LIBYRMCDS_NO_INTERNAL_LOCK
  60. pthread_mutex_destroy(&(c->lock));
  61. #endif
  62. free(c->recvbuf);
  63. c->recvbuf = NULL;
  64. free(c->stats.records);
  65. c->stats.records = NULL;
  66. return YRMCDS_OK;
  67. }
  68. yrmcds_error
  69. yrmcds_cnt_shutdown(yrmcds_cnt* c) {
  70. if( c == NULL )
  71. return YRMCDS_BAD_ARGUMENT;
  72. if( shutdown(c->sock, SHUT_RD) == -1 )
  73. return YRMCDS_SYSTEM_ERROR;
  74. return YRMCDS_OK;
  75. }
  76. int
  77. yrmcds_cnt_fileno(yrmcds_cnt* c) {
  78. return c->sock;
  79. }
  80. static yrmcds_error
  81. recv_data(yrmcds_cnt* c) {
  82. if( (c->capacity - c->used) < RECV_SIZE ) {
  83. size_t new_capacity = c->capacity * 2;
  84. char* new_buffer = (char*)realloc(c->recvbuf, new_capacity);
  85. if( new_buffer == NULL )
  86. return YRMCDS_OUT_OF_MEMORY;
  87. c->recvbuf = new_buffer;
  88. c->capacity = new_capacity;
  89. }
  90. ssize_t n;
  91. AGAIN:
  92. n = recv(c->sock, c->recvbuf + c->used, RECV_SIZE, 0);
  93. if( n == -1 ) {
  94. if( errno == EINTR ) goto AGAIN;
  95. return YRMCDS_SYSTEM_ERROR;
  96. }
  97. if( n == 0 )
  98. return YRMCDS_DISCONNECTED;
  99. c->used += (size_t)n;
  100. return YRMCDS_OK;
  101. }
  102. static yrmcds_error
  103. append_stat(yrmcds_cnt_statistics* s,
  104. uint16_t name_len, uint16_t value_len,
  105. const char* name, const char* value) {
  106. if( s->count == s->capacity ) {
  107. size_t new_capacity = s->capacity * 2;
  108. if( new_capacity < INITIAL_STATS_CAPACITY )
  109. new_capacity = INITIAL_STATS_CAPACITY;
  110. yrmcds_cnt_stat* new_records =
  111. realloc(s->records, sizeof(yrmcds_cnt_stat) * new_capacity);
  112. if( new_records == NULL )
  113. return YRMCDS_OUT_OF_MEMORY;
  114. s->capacity = new_capacity;
  115. s->records = new_records;
  116. }
  117. s->records[s->count].name_length = name_len;
  118. s->records[s->count].value_length = value_len;
  119. s->records[s->count].name = name;
  120. s->records[s->count].value = value;
  121. s->count += 1;
  122. return YRMCDS_OK;
  123. }
  124. static yrmcds_error
  125. parse_statistics(yrmcds_cnt* c, const yrmcds_cnt_response* r) {
  126. yrmcds_cnt_statistics* s = &c->stats;
  127. s->count = 0;
  128. const char* p = r->body;
  129. const char* end = r->body + r->body_length;
  130. while( p < end ) {
  131. if( p + 4 > end )
  132. return YRMCDS_PROTOCOL_ERROR;
  133. uint16_t name_len = ntoh16(p);
  134. uint16_t value_len = ntoh16(p + 2);
  135. if( p + 4 + name_len + value_len > end )
  136. return YRMCDS_PROTOCOL_ERROR;
  137. yrmcds_error err =
  138. append_stat(s, name_len, value_len, p + 4, p + 4 + name_len);
  139. if( err != YRMCDS_OK )
  140. return err;
  141. p += 4 + name_len + value_len;
  142. }
  143. return YRMCDS_OK;
  144. }
  145. static yrmcds_error
  146. parse_dump_record(yrmcds_cnt* c, yrmcds_cnt_response* r) {
  147. if( r->body_length == 0 ) {
  148. // End of dump
  149. return YRMCDS_OK;
  150. }
  151. if( r->body_length < 10 ) {
  152. c->invalid = 1;
  153. return YRMCDS_PROTOCOL_ERROR;
  154. }
  155. r->current_consumption = ntoh32(r->body);
  156. r->max_consumption = ntoh32(r->body + 4);
  157. r->name_length = ntoh16(r->body + 8);
  158. if( r->body_length < 10 + r->name_length ) {
  159. c->invalid = 1;
  160. return YRMCDS_PROTOCOL_ERROR;
  161. }
  162. r->name = r->body + 10;
  163. return YRMCDS_OK;
  164. }
  165. yrmcds_error
  166. yrmcds_cnt_recv(yrmcds_cnt* c, yrmcds_cnt_response* r) {
  167. if( c == NULL || r == NULL )
  168. return YRMCDS_BAD_ARGUMENT;
  169. if( c->invalid )
  170. return YRMCDS_PROTOCOL_ERROR;
  171. if( c->last_size > 0 ) {
  172. size_t remain = c->used - c->last_size;
  173. if( remain > 0 )
  174. memmove(c->recvbuf, c->recvbuf + c->last_size, remain);
  175. c->used = remain;
  176. c->last_size = 0;
  177. }
  178. while( c->used < HEADER_SIZE ) {
  179. yrmcds_error e = recv_data(c);
  180. if( e != YRMCDS_OK ) return e;
  181. }
  182. if( (uint8_t)c->recvbuf[0] != 0x91 ) {
  183. c->invalid = 1;
  184. return YRMCDS_PROTOCOL_ERROR;
  185. }
  186. r->command = (yrmcds_cnt_command)c->recvbuf[1];
  187. r->status = (yrmcds_cnt_status)c->recvbuf[2];
  188. r->body_length = ntoh32(c->recvbuf + 4);
  189. memcpy(&r->serial, c->recvbuf + 8, sizeof(r->serial));
  190. r->body = NULL;
  191. r->resources = 0;
  192. r->current_consumption = 0;
  193. r->max_consumption = 0;
  194. r->name_length = 0;
  195. r->stats = NULL;
  196. if( r->body_length > 0 ) {
  197. while( c->used < HEADER_SIZE + r->body_length ) {
  198. yrmcds_error e = recv_data(c);
  199. if( e != YRMCDS_OK ) return e;
  200. }
  201. r->body = c->recvbuf + HEADER_SIZE;
  202. }
  203. c->last_size = HEADER_SIZE + r->body_length;
  204. if( r->status != YRMCDS_STATUS_OK )
  205. return YRMCDS_OK;
  206. yrmcds_error err;
  207. switch( r->command ) {
  208. case YRMCDS_CNT_CMD_GET:
  209. if( r->body_length < 4 ) {
  210. c->invalid = 1;
  211. return YRMCDS_PROTOCOL_ERROR;
  212. }
  213. r->current_consumption = ntoh32(r->body);
  214. break;
  215. case YRMCDS_CNT_CMD_ACQUIRE:
  216. if( r->body_length < 4 ) {
  217. c->invalid = 1;
  218. return YRMCDS_PROTOCOL_ERROR;
  219. }
  220. r->resources = ntoh32(r->body);
  221. break;
  222. case YRMCDS_CNT_CMD_STATS:
  223. err = parse_statistics(c, r);
  224. if( err != YRMCDS_OK ) {
  225. c->invalid = 1;
  226. return err;
  227. }
  228. r->stats = &c->stats;
  229. break;
  230. case YRMCDS_CNT_CMD_DUMP:
  231. err = parse_dump_record(c, r);
  232. if( err != YRMCDS_OK ) {
  233. c->invalid = 1;
  234. return err;
  235. }
  236. break;
  237. default:
  238. // No body
  239. break;
  240. }
  241. return YRMCDS_OK;
  242. }
  243. static yrmcds_error
  244. send_command(yrmcds_cnt* c, yrmcds_cnt_command cmd, uint32_t* serial,
  245. size_t body1_len, const char* body1,
  246. size_t body2_len, const char* body2) {
  247. if( c == NULL ||
  248. body1_len > UINT32_MAX - body2_len ||
  249. (body1_len != 0 && body1 == NULL) ||
  250. (body2_len != 0 && body2 == NULL) )
  251. return YRMCDS_BAD_ARGUMENT;
  252. #ifndef LIBYRMCDS_NO_INTERNAL_LOCK
  253. int e = pthread_mutex_lock(&c->lock);
  254. if( e != 0 ) {
  255. errno = e;
  256. return YRMCDS_SYSTEM_ERROR;
  257. }
  258. #endif // ! LIBYRMCDS_NO_INTERNAL_LOCK
  259. c->serial += 1;
  260. if( serial != NULL )
  261. *serial = c->serial;
  262. char header[HEADER_SIZE];
  263. header[0] = '\x90';
  264. header[1] = (char)cmd;
  265. header[2] = 0;
  266. header[3] = 0;
  267. hton32((uint32_t)(body1_len + body2_len), header + 4);
  268. memcpy(header + 8, &c->serial, 4);
  269. yrmcds_error ret = YRMCDS_OK;
  270. struct iovec iov[3];
  271. size_t iovcnt = 1;
  272. iov[0].iov_base = header;
  273. iov[0].iov_len = HEADER_SIZE;
  274. if( body1_len != 0 ) {
  275. iov[iovcnt].iov_base = (void*)body1;
  276. iov[iovcnt].iov_len = body1_len;
  277. ++iovcnt;
  278. }
  279. if( body2_len != 0 ) {
  280. iov[iovcnt].iov_base = (void*)body2;
  281. iov[iovcnt].iov_len = body2_len;
  282. ++iovcnt;
  283. }
  284. size_t i;
  285. for( i = 0; i < iovcnt; ) {
  286. ssize_t n = writev(c->sock, iov + i, (int)(iovcnt - i));
  287. size_t n2 = (size_t)n;
  288. if( n == -1 ) {
  289. if( errno == EINTR ) continue;
  290. ret = YRMCDS_SYSTEM_ERROR;
  291. break;
  292. }
  293. while( n2 > 0 ) {
  294. if( n2 < iov[i].iov_len ) {
  295. iov[i].iov_base = (char*)iov[i].iov_base + n2;
  296. iov[i].iov_len -= n2;
  297. break;
  298. }
  299. n2 -= iov[i].iov_len;
  300. ++i;
  301. }
  302. }
  303. #ifndef LIBYRMCDS_NO_INTERNAL_LOCK
  304. pthread_mutex_unlock(&c->lock);
  305. #endif
  306. return ret;
  307. }
  308. yrmcds_error
  309. yrmcds_cnt_noop(yrmcds_cnt* c, uint32_t* serial) {
  310. return send_command(c, YRMCDS_CNT_CMD_NOOP, serial,
  311. 0, NULL, 0, NULL);
  312. }
  313. yrmcds_error
  314. yrmcds_cnt_get(yrmcds_cnt* c, const char* name, size_t name_len,
  315. uint32_t* serial) {
  316. if( name == NULL || name_len == 0 || name_len > UINT16_MAX )
  317. return YRMCDS_BAD_ARGUMENT;
  318. char body[2];
  319. hton16((uint16_t)name_len, body);
  320. return send_command(c, YRMCDS_CNT_CMD_GET, serial,
  321. sizeof(body), body, name_len, name);
  322. }
  323. yrmcds_error
  324. yrmcds_cnt_acquire(yrmcds_cnt* c, const char* name, size_t name_len,
  325. uint32_t resources, uint32_t initial, uint32_t* serial) {
  326. if( name == NULL || name_len == 0 || name_len > UINT16_MAX ||
  327. resources == 0 || resources > initial )
  328. return YRMCDS_BAD_ARGUMENT;
  329. char body[10];
  330. hton32(resources, body);
  331. hton32(initial, body + 4);
  332. hton16((uint16_t)name_len, body + 8);
  333. return send_command(c, YRMCDS_CNT_CMD_ACQUIRE, serial,
  334. sizeof(body), body, name_len, name);
  335. }
  336. yrmcds_error
  337. yrmcds_cnt_release(yrmcds_cnt* c, const char* name, size_t name_len,
  338. uint32_t resources, uint32_t* serial) {
  339. if( name == NULL || name_len == 0 || name_len > UINT16_MAX )
  340. return YRMCDS_BAD_ARGUMENT;
  341. char body[6];
  342. hton32(resources, body);
  343. hton16((uint16_t)name_len, body + 4);
  344. return send_command(c, YRMCDS_CNT_CMD_RELEASE, serial,
  345. sizeof(body), body, name_len, name);
  346. }
  347. yrmcds_error
  348. yrmcds_cnt_stats(yrmcds_cnt* c, uint32_t* serial) {
  349. return send_command(c, YRMCDS_CNT_CMD_STATS, serial,
  350. 0, NULL, 0, NULL);
  351. }
  352. yrmcds_error
  353. yrmcds_cnt_dump(yrmcds_cnt* c, uint32_t* serial) {
  354. return send_command(c, YRMCDS_CNT_CMD_DUMP, serial,
  355. 0, NULL, 0, NULL);
  356. }