关键词搜索

源码搜索 ×
×

漫话Redis源码之七十五

发布2022-02-13浏览463次

详情内容

这里是管道相关的控制代码,简要看下就行:

  1. #include "server.h"
  2. #include <unistd.h>
  3. typedef struct {
  4. size_t keys;
  5. size_t cow;
  6. monotime cow_updated;
  7. double progress;
  8. childInfoType information_type; /* Type of information */
  9. } child_info_data;
  10. /* Open a child-parent channel used in order to move information about the
  11. * RDB / AOF saving process from the child to the parent (for instance
  12. * the amount of copy on write memory used) */
  13. void openChildInfoPipe(void) {
  14. if (pipe(server.child_info_pipe) == -1) {
  15. /* On error our two file descriptors should be still set to -1,
  16. * but we call anyway closeChildInfoPipe() since can't hurt. */
  17. closeChildInfoPipe();
  18. } else if (anetNonBlock(NULL,server.child_info_pipe[0]) != ANET_OK) {
  19. closeChildInfoPipe();
  20. } else {
  21. server.child_info_nread = 0;
  22. }
  23. }
  24. /* Close the pipes opened with openChildInfoPipe(). */
  25. void closeChildInfoPipe(void) {
  26. if (server.child_info_pipe[0] != -1 ||
  27. server.child_info_pipe[1] != -1)
  28. {
  29. close(server.child_info_pipe[0]);
  30. close(server.child_info_pipe[1]);
  31. server.child_info_pipe[0] = -1;
  32. server.child_info_pipe[1] = -1;
  33. server.child_info_nread = 0;
  34. }
  35. }
  36. /* Send save data to parent. */
  37. void sendChildInfoGeneric(childInfoType info_type, size_t keys, double progress, char *pname) {
  38. if (server.child_info_pipe[1] == -1) return;
  39. static monotime cow_updated = 0;
  40. static uint64_t cow_update_cost = 0;
  41. static size_t cow = 0;
  42. child_info_data data = {0}; /* zero everything, including padding to satisfy valgrind */
  43. /* When called to report current info, we need to throttle down CoW updates as they
  44. * can be very expensive. To do that, we measure the time it takes to get a reading
  45. * and schedule the next reading to happen not before time*CHILD_COW_COST_FACTOR
  46. * passes. */
  47. monotime now = getMonotonicUs();
  48. if (info_type != CHILD_INFO_TYPE_CURRENT_INFO ||
  49. !cow_updated ||
  50. now - cow_updated > cow_update_cost * CHILD_COW_DUTY_CYCLE)
  51. {
  52. cow = zmalloc_get_private_dirty(-1);
  53. cow_updated = getMonotonicUs();
  54. cow_update_cost = cow_updated - now;
  55. if (cow) {
  56. serverLog((info_type == CHILD_INFO_TYPE_CURRENT_INFO) ? LL_VERBOSE : LL_NOTICE,
  57. "%s: %zu MB of memory used by copy-on-write",
  58. pname, cow / (1024 * 1024));
  59. }
  60. }
  61. data.information_type = info_type;
  62. data.keys = keys;
  63. data.cow = cow;
  64. data.cow_updated = cow_updated;
  65. data.progress = progress;
  66. ssize_t wlen = sizeof(data);
  67. if (write(server.child_info_pipe[1], &data, wlen) != wlen) {
  68. /* Nothing to do on error, this will be detected by the other side. */
  69. }
  70. }
  71. /* Update Child info. */
  72. void updateChildInfo(childInfoType information_type, size_t cow, monotime cow_updated, size_t keys, double progress) {
  73. if (information_type == CHILD_INFO_TYPE_CURRENT_INFO) {
  74. server.stat_current_cow_bytes = cow;
  75. server.stat_current_cow_updated = cow_updated;
  76. server.stat_current_save_keys_processed = keys;
  77. if (progress != -1) server.stat_module_progress = progress;
  78. } else if (information_type == CHILD_INFO_TYPE_AOF_COW_SIZE) {
  79. server.stat_aof_cow_bytes = cow;
  80. } else if (information_type == CHILD_INFO_TYPE_RDB_COW_SIZE) {
  81. server.stat_rdb_cow_bytes = cow;
  82. } else if (information_type == CHILD_INFO_TYPE_MODULE_COW_SIZE) {
  83. server.stat_module_cow_bytes = cow;
  84. }
  85. }
  86. /* Read child info data from the pipe.
  87. * if complete data read into the buffer,
  88. * data is stored into *buffer, and returns 1.
  89. * otherwise, the partial data is left in the buffer, waiting for the next read, and returns 0. */
  90. int readChildInfo(childInfoType *information_type, size_t *cow, monotime *cow_updated, size_t *keys, double* progress) {
  91. /* We are using here a static buffer in combination with the server.child_info_nread to handle short reads */
  92. static child_info_data buffer;
  93. ssize_t wlen = sizeof(buffer);
  94. /* Do not overlap */
  95. if (server.child_info_nread == wlen) server.child_info_nread = 0;
  96. int nread = read(server.child_info_pipe[0], (char *)&buffer + server.child_info_nread, wlen - server.child_info_nread);
  97. if (nread > 0) {
  98. server.child_info_nread += nread;
  99. }
  100. /* We have complete child info */
  101. if (server.child_info_nread == wlen) {
  102. *information_type = buffer.information_type;
  103. *cow = buffer.cow;
  104. *cow_updated = buffer.cow_updated;
  105. *keys = buffer.keys;
  106. *progress = buffer.progress;
  107. return 1;
  108. } else {
  109. return 0;
  110. }
  111. }
  112. /* Receive info data from child. */
  113. void receiveChildInfo(void) {
  114. if (server.child_info_pipe[0] == -1) return;
  115. size_t cow;
  116. monotime cow_updated;
  117. size_t keys;
  118. double progress;
  119. childInfoType information_type;
  120. /* Drain the pipe and update child info so that we get the final message. */
  121. while (readChildInfo(&information_type, &cow, &cow_updated, &keys, &progress)) {
  122. updateChildInfo(information_type, cow, cow_updated, keys, progress);
  123. }
  124. }

相关技术文章

点击QQ咨询
开通会员
返回顶部
×
微信扫码支付
微信扫码支付
确定支付下载
请使用微信描二维码支付
×

提示信息

×

选择支付方式

  • 微信支付
  • 支付宝付款
确定支付下载