关键词搜索

源码搜索 ×
×

漫话Redis源码之七十九

发布于 2022-02-13，浏览 440 次

详情内容

Redis的AOF操作真的很常见,而且在各类笔试面试中经常遇到:

  1. /* Return the current size of the AOF rewrite buffer. */
  2. unsigned long aofRewriteBufferSize(void) {
  3. listNode *ln;
  4. listIter li;
  5. unsigned long size = 0;
  6. listRewind(server.aof_rewrite_buf_blocks,&li);
  7. while((ln = listNext(&li))) {
  8. aofrwblock *block = listNodeValue(ln);
  9. size += block->used;
  10. }
  11. return size;
  12. }
  13. /* Event handler used to send data to the child process doing the AOF
  14. * rewrite. We send pieces of our AOF differences buffer so that the final
  15. * write when the child finishes the rewrite will be small. */
  16. void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) {
  17. listNode *ln;
  18. aofrwblock *block;
  19. ssize_t nwritten;
  20. UNUSED(el);
  21. UNUSED(fd);
  22. UNUSED(privdata);
  23. UNUSED(mask);
  24. while(1) {
  25. ln = listFirst(server.aof_rewrite_buf_blocks);
  26. block = ln ? ln->value : NULL;
  27. if (server.aof_stop_sending_diff || !block) {
  28. aeDeleteFileEvent(server.el,server.aof_pipe_write_data_to_child,
  29. AE_WRITABLE);
  30. return;
  31. }
  32. if (block->used > 0) {
  33. nwritten = write(server.aof_pipe_write_data_to_child,
  34. block->buf,block->used);
  35. if (nwritten <= 0) return;
  36. memmove(block->buf,block->buf+nwritten,block->used-nwritten);
  37. block->used -= nwritten;
  38. block->free += nwritten;
  39. }
  40. if (block->used == 0) listDelNode(server.aof_rewrite_buf_blocks,ln);
  41. }
  42. }
  43. /* Append data to the AOF rewrite buffer, allocating new blocks if needed. */
  44. void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
  45. listNode *ln = listLast(server.aof_rewrite_buf_blocks);
  46. aofrwblock *block = ln ? ln->value : NULL;
  47. while(len) {
  48. /* If we already got at least an allocated block, try appending
  49. * at least some piece into it. */
  50. if (block) {
  51. unsigned long thislen = (block->free < len) ? block->free : len;
  52. if (thislen) { /* The current block is not already full. */
  53. memcpy(block->buf+block->used, s, thislen);
  54. block->used += thislen;
  55. block->free -= thislen;
  56. s += thislen;
  57. len -= thislen;
  58. }
  59. }
  60. if (len) { /* First block to allocate, or need another block. */
  61. int numblocks;
  62. block = zmalloc(sizeof(*block));
  63. block->free = AOF_RW_BUF_BLOCK_SIZE;
  64. block->used = 0;
  65. listAddNodeTail(server.aof_rewrite_buf_blocks,block);
  66. /* Log every time we cross more 10 or 100 blocks, respectively
  67. * as a notice or warning. */
  68. numblocks = listLength(server.aof_rewrite_buf_blocks);
  69. if (((numblocks+1) % 10) == 0) {
  70. int level = ((numblocks+1) % 100) == 0 ? LL_WARNING :
  71. LL_NOTICE;
  72. serverLog(level,"Background AOF buffer size: %lu MB",
  73. aofRewriteBufferSize()/(1024*1024));
  74. }
  75. }
  76. }
  77. /* Install a file event to send data to the rewrite child if there is
  78. * not one already. */
  79. if (!server.aof_stop_sending_diff &&
  80. aeGetFileEvents(server.el,server.aof_pipe_write_data_to_child) == 0)
  81. {
  82. aeCreateFileEvent(server.el, server.aof_pipe_write_data_to_child,
  83. AE_WRITABLE, aofChildWriteDiffData, NULL);
  84. }
  85. }
  86. /* Write the buffer (possibly composed of multiple blocks) into the specified
  87. * fd. If a short write or any other error happens -1 is returned,
  88. * otherwise the number of bytes written is returned. */
  89. ssize_t aofRewriteBufferWrite(int fd) {
  90. listNode *ln;
  91. listIter li;
  92. ssize_t count = 0;
  93. listRewind(server.aof_rewrite_buf_blocks,&li);
  94. while((ln = listNext(&li))) {
  95. aofrwblock *block = listNodeValue(ln);
  96. ssize_t nwritten;
  97. if (block->used) {
  98. nwritten = write(fd,block->buf,block->used);
  99. if (nwritten != (ssize_t)block->used) {
  100. if (nwritten == 0) errno = EIO;
  101. return -1;
  102. }
  103. count += nwritten;
  104. }
  105. }
  106. return count;
  107. }
  108. /* ----------------------------------------------------------------------------
  109. * AOF file implementation
  110. * ------------------------------------------------------------------------- */
  111. /* Return true if an AOf fsync is currently already in progress in a
  112. * BIO thread. */
  113. int aofFsyncInProgress(void) {
  114. return bioPendingJobsOfType(BIO_AOF_FSYNC) != 0;
  115. }
  /* Queue a BIO job that calls fsync() on fd (the AOF file descriptor) in
   * another thread, so the caller does not block on the disk flush. */
  void aof_background_fsync(int fd)
  {
      bioCreateFsyncJob(fd);
  }
  121. /* Kills an AOFRW child process if exists */
  122. void killAppendOnlyChild(void) {
  123. int statloc;
  124. /* No AOFRW child? return. */
  125. if (server.child_type != CHILD_TYPE_AOF) return;
  126. /* Kill AOFRW child, wait for child exit. */
  127. serverLog(LL_NOTICE,"Killing running AOF rewrite child: %ld",
  128. (long) server.child_pid);
  129. if (kill(server.child_pid,SIGUSR1) != -1) {
  130. while(waitpid(-1, &statloc, 0) != server.child_pid);
  131. }
  132. /* Reset the buffer accumulating changes while the child saves. */
  133. aofRewriteBufferReset();
  134. aofRemoveTempFile(server.child_pid);
  135. resetChildState();
  136. server.aof_rewrite_time_start = -1;
  137. /* Close pipes used for IPC between the two processes. */
  138. aofClosePipes();
  139. }
  140. /* Called when the user switches from "appendonly yes" to "appendonly no"
  141. * at runtime using the CONFIG command. */
  142. void stopAppendOnly(void) {
  143. serverAssert(server.aof_state != AOF_OFF);
  144. flushAppendOnlyFile(1);
  145. if (redis_fsync(server.aof_fd) == -1) {
  146. serverLog(LL_WARNING,"Fail to fsync the AOF file: %s",strerror(errno));
  147. } else {
  148. server.aof_fsync_offset = server.aof_current_size;
  149. server.aof_last_fsync = server.unixtime;
  150. }
  151. close(server.aof_fd);
  152. server.aof_fd = -1;
  153. server.aof_selected_db = -1;
  154. server.aof_state = AOF_OFF;
  155. server.aof_rewrite_scheduled = 0;
  156. killAppendOnlyChild();
  157. sdsfree(server.aof_buf);
  158. server.aof_buf = sdsempty();
  159. }
  160. /* Called when the user switches from "appendonly no" to "appendonly yes"
  161. * at runtime using the CONFIG command. */
  162. int startAppendOnly(void) {
  163. char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
  164. int newfd;
  165. newfd = open(server.aof_filename,O_WRONLY|O_APPEND|O_CREAT,0644);
  166. serverAssert(server.aof_state == AOF_OFF);
  167. if (newfd == -1) {
  168. char *cwdp = getcwd(cwd,MAXPATHLEN);
  169. serverLog(LL_WARNING,
  170. "Redis needs to enable the AOF but can't open the "
  171. "append only file %s (in server root dir %s): %s",
  172. server.aof_filename,
  173. cwdp ? cwdp : "unknown",
  174. strerror(errno));
  175. return C_ERR;
  176. }
  177. if (hasActiveChildProcess() && server.child_type != CHILD_TYPE_AOF) {
  178. server.aof_rewrite_scheduled = 1;
  179. serverLog(LL_WARNING,"AOF was enabled but there is already another background operation. An AOF background was scheduled to start when possible.");
  180. } else {
  181. /* If there is a pending AOF rewrite, we need to switch it off and
  182. * start a new one: the old one cannot be reused because it is not
  183. * accumulating the AOF buffer. */
  184. if (server.child_type == CHILD_TYPE_AOF) {
  185. serverLog(LL_WARNING,"AOF was enabled but there is already an AOF rewriting in background. Stopping background AOF and starting a rewrite now.");
  186. killAppendOnlyChild();
  187. }
  188. if (rewriteAppendOnlyFileBackground() == C_ERR) {
  189. close(newfd);
  190. serverLog(LL_WARNING,"Redis needs to enable the AOF but can't trigger a background AOF rewrite operation. Check the above logs for more info about the error.");
  191. return C_ERR;
  192. }
  193. }
  194. /* We correctly switched on AOF, now wait for the rewrite to be complete
  195. * in order to append data on disk. */
  196. server.aof_state = AOF_WAIT_REWRITE;
  197. server.aof_last_fsync = server.unixtime;
  198. server.aof_fd = newfd;
  199. /* If AOF fsync error in bio job, we just ignore it and log the event. */
  200. int aof_bio_fsync_status;
  201. atomicGet(server.aof_bio_fsync_status, aof_bio_fsync_status);
  202. if (aof_bio_fsync_status == C_ERR) {
  203. serverLog(LL_WARNING,
  204. "AOF reopen, just ignore the AOF fsync error in bio job");
  205. atomicSet(server.aof_bio_fsync_status,C_OK);
  206. }
  207. /* If AOF was in error state, we just ignore it and log the event. */
  208. if (server.aof_last_write_status == C_ERR) {
  209. serverLog(LL_WARNING,"AOF reopen, just ignore the last error.");
  210. server.aof_last_write_status = C_OK;
  211. }
  212. return C_OK;
  213. }
  /* Wrapper around write(2) that retries on short writes and when the call
   * is interrupted by a signal (EINTR).
   *
   * Retrying short writes may look strange for a block device, where a short
   * write normally signals an end-of-space condition. In practice this is no
   * longer reliably true on modern systems, and retrying is more resilient:
   * a real error condition shows up on the next attempt.
   *
   * Returns the number of bytes written (len on full success). If an error
   * occurs after some bytes were written, that partial count is returned.
   * If nothing was written, -1 is returned with errno set by write(2).
   * If write(2) returns 0 for a non-empty request, the bytes written so far
   * are returned (a short write) instead of retrying forever. */
  ssize_t aofWrite(int fd, const char *buf, size_t len) {
      ssize_t nwritten = 0, totwritten = 0;

      while(len) {
          nwritten = write(fd, buf, len);
          if (nwritten < 0) {
              if (errno == EINTR) continue;
              return totwritten ? totwritten : -1;
          }
          /* No progress at all: give up rather than spin; the caller sees
           * a short write. */
          if (nwritten == 0) break;
          len -= nwritten;
          buf += nwritten;
          totwritten += nwritten;
      }
      return totwritten;
  }
  235. /* Write the append only file buffer on disk.
  236. *
  237. * Since we are required to write the AOF before replying to the client,
  238. * and the only way the client socket can get a write is entering when the
  239. * the event loop, we accumulate all the AOF writes in a memory
  240. * buffer and write it on disk using this function just before entering
  241. * the event loop again.
  242. *
  243. * About the 'force' argument:
  244. *
  245. * When the fsync policy is set to 'everysec' we may delay the flush if there
  246. * is still an fsync() going on in the background thread, since for instance
  247. * on Linux write(2) will be blocked by the background fsync anyway.
  248. * When this happens we remember that there is some aof buffer to be
  249. * flushed ASAP, and will try to do that in the serverCron() function.
  250. *
  251. * However if force is set to 1 we'll write regardless of the background
  252. * fsync. */
  253. #define AOF_WRITE_LOG_ERROR_RATE 30 /* Seconds between errors logging. */
  254. void flushAppendOnlyFile(int force) {
  255. ssize_t nwritten;
  256. int sync_in_progress = 0;
  257. mstime_t latency;
  258. if (sdslen(server.aof_buf) == 0) {
  259. /* Check if we need to do fsync even the aof buffer is empty,
  260. * because previously in AOF_FSYNC_EVERYSEC mode, fsync is
  261. * called only when aof buffer is not empty, so if users
  262. * stop write commands before fsync called in one second,
  263. * the data in page cache cannot be flushed in time. */
  264. if (server.aof_fsync == AOF_FSYNC_EVERYSEC &&
  265. server.aof_fsync_offset != server.aof_current_size &&
  266. server.unixtime > server.aof_last_fsync &&
  267. !(sync_in_progress = aofFsyncInProgress())) {
  268. goto try_fsync;
  269. } else {
  270. return;
  271. }
  272. }
  273. if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
  274. sync_in_progress = aofFsyncInProgress();
  275. if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
  276. /* With this append fsync policy we do background fsyncing.
  277. * If the fsync is still in progress we can try to delay
  278. * the write for a couple of seconds. */
  279. if (sync_in_progress) {
  280. if (server.aof_flush_postponed_start == 0) {
  281. /* No previous write postponing, remember that we are
  282. * postponing the flush and return. */
  283. server.aof_flush_postponed_start = server.unixtime;
  284. return;
  285. } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
  286. /* We were already waiting for fsync to finish, but for less
  287. * than two seconds this is still ok. Postpone again. */
  288. return;
  289. }
  290. /* Otherwise fall trough, and go write since we can't wait
  291. * over two seconds. */
  292. server.aof_delayed_fsync++;
  293. serverLog(LL_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
  294. }
  295. }
  296. /* We want to perform a single write. This should be guaranteed atomic
  297. * at least if the filesystem we are writing is a real physical one.
  298. * While this will save us against the server being killed I don't think
  299. * there is much to do about the whole server stopping for power problems
  300. * or alike */
  301. if (server.aof_flush_sleep && sdslen(server.aof_buf)) {
  302. usleep(server.aof_flush_sleep);
  303. }
  304. latencyStartMonitor(latency);
  305. nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
  306. latencyEndMonitor(latency);
  307. /* We want to capture different events for delayed writes:
  308. * when the delay happens with a pending fsync, or with a saving child
  309. * active, and when the above two conditions are missing.
  310. * We also use an additional event name to save all samples which is
  311. * useful for graphing / monitoring purposes. */
  312. if (sync_in_progress) {
  313. latencyAddSampleIfNeeded("aof-write-pending-fsync",latency);
  314. } else if (hasActiveChildProcess()) {
  315. latencyAddSampleIfNeeded("aof-write-active-child",latency);
  316. } else {
  317. latencyAddSampleIfNeeded("aof-write-alone",latency);
  318. }
  319. latencyAddSampleIfNeeded("aof-write",latency);
  320. /* We performed the write so reset the postponed flush sentinel to zero. */
  321. server.aof_flush_postponed_start = 0;
  322. if (nwritten != (ssize_t)sdslen(server.aof_buf)) {
  323. static time_t last_write_error_log = 0;
  324. int can_log = 0;
  325. /* Limit logging rate to 1 line per AOF_WRITE_LOG_ERROR_RATE seconds. */
  326. if ((server.unixtime - last_write_error_log) > AOF_WRITE_LOG_ERROR_RATE) {
  327. can_log = 1;
  328. last_write_error_log = server.unixtime;
  329. }
  330. /* Log the AOF write error and record the error code. */
  331. if (nwritten == -1) {
  332. if (can_log) {
  333. serverLog(LL_WARNING,"Error writing to the AOF file: %s",
  334. strerror(errno));
  335. server.aof_last_write_errno = errno;
  336. }
  337. } else {
  338. if (can_log) {
  339. serverLog(LL_WARNING,"Short write while writing to "
  340. "the AOF file: (nwritten=%lld, "
  341. "expected=%lld)",
  342. (long long)nwritten,
  343. (long long)sdslen(server.aof_buf));
  344. }
  345. if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {
  346. if (can_log) {
  347. serverLog(LL_WARNING, "Could not remove short write "
  348. "from the append-only file. Redis may refuse "
  349. "to load the AOF the next time it starts. "
  350. "ftruncate: %s", strerror(errno));
  351. }
  352. } else {
  353. /* If the ftruncate() succeeded we can set nwritten to
  354. * -1 since there is no longer partial data into the AOF. */
  355. nwritten = -1;
  356. }
  357. server.aof_last_write_errno = ENOSPC;
  358. }
  359. /* Handle the AOF write error. */
  360. if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
  361. /* We can't recover when the fsync policy is ALWAYS since the reply
  362. * for the client is already in the output buffers (both writes and
  363. * reads), and the changes to the db can't be rolled back. Since we
  364. * have a contract with the user that on acknowledged or observed
  365. * writes are is synced on disk, we must exit. */
  366. serverLog(LL_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");
  367. exit(1);
  368. } else {
  369. /* Recover from failed write leaving data into the buffer. However
  370. * set an error to stop accepting writes as long as the error
  371. * condition is not cleared. */
  372. server.aof_last_write_status = C_ERR;
  373. /* Trim the sds buffer if there was a partial write, and there
  374. * was no way to undo it with ftruncate(2). */
  375. if (nwritten > 0) {
  376. server.aof_current_size += nwritten;
  377. sdsrange(server.aof_buf,nwritten,-1);
  378. }
  379. return; /* We'll try again on the next call... */
  380. }
  381. } else {
  382. /* Successful write(2). If AOF was in error state, restore the
  383. * OK state and log the event. */
  384. if (server.aof_last_write_status == C_ERR) {
  385. serverLog(LL_WARNING,
  386. "AOF write error looks solved, Redis can write again.");
  387. server.aof_last_write_status = C_OK;
  388. }
  389. }
  390. server.aof_current_size += nwritten;
  391. /* Re-use AOF buffer when it is small enough. The maximum comes from the
  392. * arena size of 4k minus some overhead (but is otherwise arbitrary). */
  393. if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
  394. sdsclear(server.aof_buf);
  395. } else {
  396. sdsfree(server.aof_buf);
  397. server.aof_buf = sdsempty();
  398. }
  399. try_fsync:
  400. /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
  401. * children doing I/O in the background. */
  402. if (server.aof_no_fsync_on_rewrite && hasActiveChildProcess())
  403. return;
  404. /* Perform the fsync if needed. */
  405. if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
  406. /* redis_fsync is defined as fdatasync() for Linux in order to avoid
  407. * flushing metadata. */
  408. latencyStartMonitor(latency);
  409. /* Let's try to get this data on the disk. To guarantee data safe when
  410. * the AOF fsync policy is 'always', we should exit if failed to fsync
  411. * AOF (see comment next to the exit(1) after write error above). */
  412. if (redis_fsync(server.aof_fd) == -1) {
  413. serverLog(LL_WARNING,"Can't persist AOF for fsync error when the "
  414. "AOF fsync policy is 'always': %s. Exiting...", strerror(errno));
  415. exit(1);
  416. }
  417. latencyEndMonitor(latency);
  418. latencyAddSampleIfNeeded("aof-fsync-always",latency);
  419. server.aof_fsync_offset = server.aof_current_size;
  420. server.aof_last_fsync = server.unixtime;
  421. } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
  422. server.unixtime > server.aof_last_fsync)) {
  423. if (!sync_in_progress) {
  424. aof_background_fsync(server.aof_fd);
  425. server.aof_fsync_offset = server.aof_current_size;
  426. }
  427. server.aof_last_fsync = server.unixtime;
  428. }
  429. }

相关技术文章

点击QQ咨询
开通会员
返回顶部
×
微信扫码支付
微信扫码支付
确定支付下载
请使用微信扫描二维码支付
×

提示信息

×

选择支付方式

  • 微信支付
  • 支付宝付款
确定支付下载