关键词搜索

源码搜索 ×
×

漫话Redis源码之二十五

发布2021-12-05浏览559次

详情内容

这里的函数主要是RIO的读写实现:

  1. static const rio rioFileIO = {
  2. rioFileRead,
  3. rioFileWrite,
  4. rioFileTell,
  5. rioFileFlush,
  6. NULL, /* update_checksum */
  7. 0, /* current checksum */
  8. 0, /* flags */
  9. 0, /* bytes read or written */
  10. 0, /* read/write chunk size */
  11. { { NULL, 0 } } /* union for io-specific vars */
  12. };
  13. void rioInitWithFile(rio *r, FILE *fp) {
  14. *r = rioFileIO;
  15. r->io.file.fp = fp;
  16. r->io.file.buffered = 0;
  17. r->io.file.autosync = 0;
  18. }
  19. /* ------------------- Connection implementation -------------------
  20. * We use this RIO implementation when reading an RDB file directly from
  21. * the connection to the memory via rdbLoadRio(), thus this implementation
  22. * only implements reading from a connection that is, normally,
  23. * just a socket. */
  24. static size_t rioConnWrite(rio *r, const void *buf, size_t len) {
  25. UNUSED(r);
  26. UNUSED(buf);
  27. UNUSED(len);
  28. return 0; /* Error, this target does not yet support writing. */
  29. }
  30. /* Returns 1 or 0 for success/failure. */
  31. static size_t rioConnRead(rio *r, void *buf, size_t len) {
  32. size_t avail = sdslen(r->io.conn.buf)-r->io.conn.pos;
  33. /* If the buffer is too small for the entire request: realloc. */
  34. if (sdslen(r->io.conn.buf) + sdsavail(r->io.conn.buf) < len)
  35. r->io.conn.buf = sdsMakeRoomFor(r->io.conn.buf, len - sdslen(r->io.conn.buf));
  36. /* If the remaining unused buffer is not large enough: memmove so that we
  37. * can read the rest. */
  38. if (len > avail && sdsavail(r->io.conn.buf) < len - avail) {
  39. sdsrange(r->io.conn.buf, r->io.conn.pos, -1);
  40. r->io.conn.pos = 0;
  41. }
  42. /* If we don't already have all the data in the sds, read more */
  43. while (len > sdslen(r->io.conn.buf) - r->io.conn.pos) {
  44. size_t buffered = sdslen(r->io.conn.buf) - r->io.conn.pos;
  45. size_t needs = len - buffered;
  46. /* Read either what's missing, or PROTO_IOBUF_LEN, the bigger of
  47. * the two. */
  48. size_t toread = needs < PROTO_IOBUF_LEN ? PROTO_IOBUF_LEN: needs;
  49. if (toread > sdsavail(r->io.conn.buf)) toread = sdsavail(r->io.conn.buf);
  50. if (r->io.conn.read_limit != 0 &&
  51. r->io.conn.read_so_far + buffered + toread > r->io.conn.read_limit)
  52. {
  53. /* Make sure the caller didn't request to read past the limit.
  54. * If they didn't we'll buffer till the limit, if they did, we'll
  55. * return an error. */
  56. if (r->io.conn.read_limit >= r->io.conn.read_so_far + len)
  57. toread = r->io.conn.read_limit - r->io.conn.read_so_far - buffered;
  58. else {
  59. errno = EOVERFLOW;
  60. return 0;
  61. }
  62. }
  63. int retval = connRead(r->io.conn.conn,
  64. (char*)r->io.conn.buf + sdslen(r->io.conn.buf),
  65. toread);
  66. if (retval <= 0) {
  67. if (errno == EWOULDBLOCK) errno = ETIMEDOUT;
  68. return 0;
  69. }
  70. sdsIncrLen(r->io.conn.buf, retval);
  71. }
  72. memcpy(buf, (char*)r->io.conn.buf + r->io.conn.pos, len);
  73. r->io.conn.read_so_far += len;
  74. r->io.conn.pos += len;
  75. return len;
  76. }
  77. /* Returns read/write position in file. */
  78. static off_t rioConnTell(rio *r) {
  79. return r->io.conn.read_so_far;
  80. }
  81. /* Flushes any buffer to target device if applicable. Returns 1 on success
  82. * and 0 on failures. */
  83. static int rioConnFlush(rio *r) {
  84. /* Our flush is implemented by the write method, that recognizes a
  85. * buffer set to NULL with a count of zero as a flush request. */
  86. return rioConnWrite(r,NULL,0);
  87. }
  88. static const rio rioConnIO = {
  89. rioConnRead,
  90. rioConnWrite,
  91. rioConnTell,
  92. rioConnFlush,
  93. NULL, /* update_checksum */
  94. 0, /* current checksum */
  95. 0, /* flags */
  96. 0, /* bytes read or written */
  97. 0, /* read/write chunk size */
  98. { { NULL, 0 } } /* union for io-specific vars */
  99. };
  100. /* Create an RIO that implements a buffered read from an fd
  101. * read_limit argument stops buffering when the reaching the limit. */
  102. void rioInitWithConn(rio *r, connection *conn, size_t read_limit) {
  103. *r = rioConnIO;
  104. r->io.conn.conn = conn;
  105. r->io.conn.pos = 0;
  106. r->io.conn.read_limit = read_limit;
  107. r->io.conn.read_so_far = 0;
  108. r->io.conn.buf = sdsnewlen(NULL, PROTO_IOBUF_LEN);
  109. sdsclear(r->io.conn.buf);
  110. }
  111. /* Release the RIO stream. Optionally returns the unread buffered data
  112. * when the SDS pointer 'remaining' is passed. */
  113. void rioFreeConn(rio *r, sds *remaining) {
  114. if (remaining && (size_t)r->io.conn.pos < sdslen(r->io.conn.buf)) {
  115. if (r->io.conn.pos > 0) sdsrange(r->io.conn.buf, r->io.conn.pos, -1);
  116. *remaining = r->io.conn.buf;
  117. } else {
  118. sdsfree(r->io.conn.buf);
  119. if (remaining) *remaining = NULL;
  120. }
  121. r->io.conn.buf = NULL;
  122. }
  123. /* ------------------- File descriptor implementation ------------------
  124. * This target is used to write the RDB file to pipe, when the master just
  125. * streams the data to the replicas without creating an RDB on-disk image
  126. * (diskless replication option).
  127. * It only implements writes. */
  128. /* Returns 1 or 0 for success/failure.
  129. *
  130. * When buf is NULL and len is 0, the function performs a flush operation
  131. * if there is some pending buffer, so this function is also used in order
  132. * to implement rioFdFlush(). */
  133. static size_t rioFdWrite(rio *r, const void *buf, size_t len) {
  134. ssize_t retval;
  135. unsigned char *p = (unsigned char*) buf;
  136. int doflush = (buf == NULL && len == 0);
  137. /* For small writes, we rather keep the data in user-space buffer, and flush
  138. * it only when it grows. however for larger writes, we prefer to flush
  139. * any pre-existing buffer, and write the new one directly without reallocs
  140. * and memory copying. */
  141. if (len > PROTO_IOBUF_LEN) {
  142. /* First, flush any pre-existing buffered data. */
  143. if (sdslen(r->io.fd.buf)) {
  144. if (rioFdWrite(r, NULL, 0) == 0)
  145. return 0;
  146. }
  147. /* Write the new data, keeping 'p' and 'len' from the input. */
  148. } else {
  149. if (len) {
  150. r->io.fd.buf = sdscatlen(r->io.fd.buf,buf,len);
  151. if (sdslen(r->io.fd.buf) > PROTO_IOBUF_LEN)
  152. doflush = 1;
  153. if (!doflush)
  154. return 1;
  155. }
  156. /* Flusing the buffered data. set 'p' and 'len' accordintly. */
  157. p = (unsigned char*) r->io.fd.buf;
  158. len = sdslen(r->io.fd.buf);
  159. }
  160. size_t nwritten = 0;
  161. while(nwritten != len) {
  162. retval = write(r->io.fd.fd,p+nwritten,len-nwritten);
  163. if (retval <= 0) {
  164. /* With blocking io, which is the sole user of this
  165. * rio target, EWOULDBLOCK is returned only because of
  166. * the SO_SNDTIMEO socket option, so we translate the error
  167. * into one more recognizable by the user. */
  168. if (retval == -1 && errno == EWOULDBLOCK) errno = ETIMEDOUT;
  169. return 0; /* error. */
  170. }
  171. nwritten += retval;
  172. }
  173. r->io.fd.pos += len;
  174. sdsclear(r->io.fd.buf);
  175. return 1;
  176. }

相关技术文章

点击QQ咨询
开通会员
返回顶部
×
微信扫码支付
微信扫码支付
确定支付下载
请使用微信扫描二维码支付
×

提示信息

×

选择支付方式

  • 微信支付
  • 支付宝付款
确定支付下载