Redis 的 AOF 操作真的很常见，而且在各类笔试、面试中也经常遇到：
- /* Return the current size of the AOF rewrite buffer. */
- unsigned long aofRewriteBufferSize(void) {
- listNode *ln;
- listIter li;
- unsigned long size = 0;
-
- listRewind(server.aof_rewrite_buf_blocks,&li);
- while((ln = listNext(&li))) {
- aofrwblock *block = listNodeValue(ln);
- size += block->used;
- }
- return size;
- }
-
- /* Event handler used to send data to the child process doing the AOF
- * rewrite. We send pieces of our AOF differences buffer so that the final
- * write when the child finishes the rewrite will be small. */
- void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) {
- listNode *ln;
- aofrwblock *block;
- ssize_t nwritten;
- UNUSED(el);
- UNUSED(fd);
- UNUSED(privdata);
- UNUSED(mask);
-
- while(1) {
- ln = listFirst(server.aof_rewrite_buf_blocks);
- block = ln ? ln->value : NULL;
- if (server.aof_stop_sending_diff || !block) {
- aeDeleteFileEvent(server.el,server.aof_pipe_write_data_to_child,
- AE_WRITABLE);
- return;
- }
- if (block->used > 0) {
- nwritten = write(server.aof_pipe_write_data_to_child,
- block->buf,block->used);
- if (nwritten <= 0) return;
- memmove(block->buf,block->buf+nwritten,block->used-nwritten);
- block->used -= nwritten;
- block->free += nwritten;
- }
- if (block->used == 0) listDelNode(server.aof_rewrite_buf_blocks,ln);
- }
- }
-
- /* Append data to the AOF rewrite buffer, allocating new blocks if needed. */
- void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
- listNode *ln = listLast(server.aof_rewrite_buf_blocks);
- aofrwblock *block = ln ? ln->value : NULL;
-
- while(len) {
- /* If we already got at least an allocated block, try appending
- * at least some piece into it. */
- if (block) {
- unsigned long thislen = (block->free < len) ? block->free : len;
- if (thislen) { /* The current block is not already full. */
- memcpy(block->buf+block->used, s, thislen);
- block->used += thislen;
- block->free -= thislen;
- s += thislen;
- len -= thislen;
- }
- }
-
- if (len) { /* First block to allocate, or need another block. */
- int numblocks;
-
- block = zmalloc(sizeof(*block));
- block->free = AOF_RW_BUF_BLOCK_SIZE;
- block->used = 0;
- listAddNodeTail(server.aof_rewrite_buf_blocks,block);
-
- /* Log every time we cross more 10 or 100 blocks, respectively
- * as a notice or warning. */
- numblocks = listLength(server.aof_rewrite_buf_blocks);
- if (((numblocks+1) % 10) == 0) {
- int level = ((numblocks+1) % 100) == 0 ? LL_WARNING :
- LL_NOTICE;
- serverLog(level,"Background AOF buffer size: %lu MB",
- aofRewriteBufferSize()/(1024*1024));
- }
- }
- }
-
- /* Install a file event to send data to the rewrite child if there is
- * not one already. */
- if (!server.aof_stop_sending_diff &&
- aeGetFileEvents(server.el,server.aof_pipe_write_data_to_child) == 0)
- {
- aeCreateFileEvent(server.el, server.aof_pipe_write_data_to_child,
- AE_WRITABLE, aofChildWriteDiffData, NULL);
- }
- }
-
- /* Write the buffer (possibly composed of multiple blocks) into the specified
- * fd. If a short write or any other error happens -1 is returned,
- * otherwise the number of bytes written is returned. */
- ssize_t aofRewriteBufferWrite(int fd) {
- listNode *ln;
- listIter li;
- ssize_t count = 0;
-
- listRewind(server.aof_rewrite_buf_blocks,&li);
- while((ln = listNext(&li))) {
- aofrwblock *block = listNodeValue(ln);
- ssize_t nwritten;
-
- if (block->used) {
- nwritten = write(fd,block->buf,block->used);
- if (nwritten != (ssize_t)block->used) {
- if (nwritten == 0) errno = EIO;
- return -1;
- }
- count += nwritten;
- }
- }
- return count;
- }
-
- /* ----------------------------------------------------------------------------
- * AOF file implementation
- * ------------------------------------------------------------------------- */
-
- /* Return true if an AOf fsync is currently already in progress in a
- * BIO thread. */
- int aofFsyncInProgress(void) {
- return bioPendingJobsOfType(BIO_AOF_FSYNC) != 0;
- }
-
/* Queue an fsync job for file descriptor 'fd' (the AOF file descriptor at
 * the call sites visible here) by handing it to bioCreateFsyncJob(), so the
 * sync is not performed inline by the caller. */
void aof_background_fsync(int fd)
{
    bioCreateFsyncJob(fd);
}
-
- /* Kills an AOFRW child process if exists */
- void killAppendOnlyChild(void) {
- int statloc;
- /* No AOFRW child? return. */
- if (server.child_type != CHILD_TYPE_AOF) return;
- /* Kill AOFRW child, wait for child exit. */
- serverLog(LL_NOTICE,"Killing running AOF rewrite child: %ld",
- (long) server.child_pid);
- if (kill(server.child_pid,SIGUSR1) != -1) {
- while(waitpid(-1, &statloc, 0) != server.child_pid);
- }
- /* Reset the buffer accumulating changes while the child saves. */
- aofRewriteBufferReset();
- aofRemoveTempFile(server.child_pid);
- resetChildState();
- server.aof_rewrite_time_start = -1;
- /* Close pipes used for IPC between the two processes. */
- aofClosePipes();
- }
-
- /* Called when the user switches from "appendonly yes" to "appendonly no"
- * at runtime using the CONFIG command. */
- void stopAppendOnly(void) {
- serverAssert(server.aof_state != AOF_OFF);
- flushAppendOnlyFile(1);
- if (redis_fsync(server.aof_fd) == -1) {
- serverLog(LL_WARNING,"Fail to fsync the AOF file: %s",strerror(errno));
- } else {
- server.aof_fsync_offset = server.aof_current_size;
- server.aof_last_fsync = server.unixtime;
- }
- close(server.aof_fd);
-
- server.aof_fd = -1;
- server.aof_selected_db = -1;
- server.aof_state = AOF_OFF;
- server.aof_rewrite_scheduled = 0;
- killAppendOnlyChild();
- sdsfree(server.aof_buf);
- server.aof_buf = sdsempty();
- }
-
- /* Called when the user switches from "appendonly no" to "appendonly yes"
- * at runtime using the CONFIG command. */
- int startAppendOnly(void) {
- char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
- int newfd;
-
- newfd = open(server.aof_filename,O_WRONLY|O_APPEND|O_CREAT,0644);
- serverAssert(server.aof_state == AOF_OFF);
- if (newfd == -1) {
- char *cwdp = getcwd(cwd,MAXPATHLEN);
-
- serverLog(LL_WARNING,
- "Redis needs to enable the AOF but can't open the "
- "append only file %s (in server root dir %s): %s",
- server.aof_filename,
- cwdp ? cwdp : "unknown",
- strerror(errno));
- return C_ERR;
- }
- if (hasActiveChildProcess() && server.child_type != CHILD_TYPE_AOF) {
- server.aof_rewrite_scheduled = 1;
- serverLog(LL_WARNING,"AOF was enabled but there is already another background operation. An AOF background was scheduled to start when possible.");
- } else {
- /* If there is a pending AOF rewrite, we need to switch it off and
- * start a new one: the old one cannot be reused because it is not
- * accumulating the AOF buffer. */
- if (server.child_type == CHILD_TYPE_AOF) {
- serverLog(LL_WARNING,"AOF was enabled but there is already an AOF rewriting in background. Stopping background AOF and starting a rewrite now.");
- killAppendOnlyChild();
- }
- if (rewriteAppendOnlyFileBackground() == C_ERR) {
- close(newfd);
- serverLog(LL_WARNING,"Redis needs to enable the AOF but can't trigger a background AOF rewrite operation. Check the above logs for more info about the error.");
- return C_ERR;
- }
- }
- /* We correctly switched on AOF, now wait for the rewrite to be complete
- * in order to append data on disk. */
- server.aof_state = AOF_WAIT_REWRITE;
- server.aof_last_fsync = server.unixtime;
- server.aof_fd = newfd;
-
- /* If AOF fsync error in bio job, we just ignore it and log the event. */
- int aof_bio_fsync_status;
- atomicGet(server.aof_bio_fsync_status, aof_bio_fsync_status);
- if (aof_bio_fsync_status == C_ERR) {
- serverLog(LL_WARNING,
- "AOF reopen, just ignore the AOF fsync error in bio job");
- atomicSet(server.aof_bio_fsync_status,C_OK);
- }
-
- /* If AOF was in error state, we just ignore it and log the event. */
- if (server.aof_last_write_status == C_ERR) {
- serverLog(LL_WARNING,"AOF reopen, just ignore the last error.");
- server.aof_last_write_status = C_OK;
- }
- return C_OK;
- }
-
/* write(2) wrapper that keeps going after short writes and after
 * interruptions by a signal (EINTR), until all 'len' bytes of 'buf' have
 * been written to 'fd' or a real error occurs.
 *
 * Retrying a short write may look odd for a block device, where a short
 * first write usually means the space is exhausted and the next call will
 * fail too. Retrying is nevertheless more resilient: if there is an actual
 * error condition, the following attempt reports it.
 *
 * Returns the number of bytes written. When an error other than EINTR
 * happens, the bytes written so far are returned if there are any,
 * otherwise -1. */
ssize_t aofWrite(int fd, const char *buf, size_t len) {
    const char *cursor = buf;
    size_t remaining = len;
    ssize_t done = 0;

    while (remaining > 0) {
        ssize_t n = write(fd, cursor, remaining);

        if (n >= 0) {
            /* Full or partial progress: advance and try the rest. */
            cursor += n;
            remaining -= n;
            done += n;
            continue;
        }

        if (errno == EINTR) continue; /* Interrupted: just retry. */
        return (done != 0) ? done : -1;
    }

    return done;
}
-
- /* Write the append only file buffer on disk.
- *
- * Since we are required to write the AOF before replying to the client,
- * and the only way the client socket can get a write is entering when the
- * the event loop, we accumulate all the AOF writes in a memory
- * buffer and write it on disk using this function just before entering
- * the event loop again.
- *
- * About the 'force' argument:
- *
- * When the fsync policy is set to 'everysec' we may delay the flush if there
- * is still an fsync() going on in the background thread, since for instance
- * on Linux write(2) will be blocked by the background fsync anyway.
- * When this happens we remember that there is some aof buffer to be
- * flushed ASAP, and will try to do that in the serverCron() function.
- *
- * However if force is set to 1 we'll write regardless of the background
- * fsync. */
- #define AOF_WRITE_LOG_ERROR_RATE 30 /* Seconds between errors logging. */
- void flushAppendOnlyFile(int force) {
- ssize_t nwritten;
- int sync_in_progress = 0;
- mstime_t latency;
-
- if (sdslen(server.aof_buf) == 0) {
- /* Check if we need to do fsync even the aof buffer is empty,
- * because previously in AOF_FSYNC_EVERYSEC mode, fsync is
- * called only when aof buffer is not empty, so if users
- * stop write commands before fsync called in one second,
- * the data in page cache cannot be flushed in time. */
- if (server.aof_fsync == AOF_FSYNC_EVERYSEC &&
- server.aof_fsync_offset != server.aof_current_size &&
- server.unixtime > server.aof_last_fsync &&
- !(sync_in_progress = aofFsyncInProgress())) {
- goto try_fsync;
- } else {
- return;
- }
- }
-
- if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
- sync_in_progress = aofFsyncInProgress();
-
- if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
- /* With this append fsync policy we do background fsyncing.
- * If the fsync is still in progress we can try to delay
- * the write for a couple of seconds. */
- if (sync_in_progress) {
- if (server.aof_flush_postponed_start == 0) {
- /* No previous write postponing, remember that we are
- * postponing the flush and return. */
- server.aof_flush_postponed_start = server.unixtime;
- return;
- } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
- /* We were already waiting for fsync to finish, but for less
- * than two seconds this is still ok. Postpone again. */
- return;
- }
- /* Otherwise fall trough, and go write since we can't wait
- * over two seconds. */
- server.aof_delayed_fsync++;
- serverLog(LL_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
- }
- }
- /* We want to perform a single write. This should be guaranteed atomic
- * at least if the filesystem we are writing is a real physical one.
- * While this will save us against the server being killed I don't think
- * there is much to do about the whole server stopping for power problems
- * or alike */
-
- if (server.aof_flush_sleep && sdslen(server.aof_buf)) {
- usleep(server.aof_flush_sleep);
- }
-
- latencyStartMonitor(latency);
- nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
- latencyEndMonitor(latency);
- /* We want to capture different events for delayed writes:
- * when the delay happens with a pending fsync, or with a saving child
- * active, and when the above two conditions are missing.
- * We also use an additional event name to save all samples which is
- * useful for graphing / monitoring purposes. */
- if (sync_in_progress) {
- latencyAddSampleIfNeeded("aof-write-pending-fsync",latency);
- } else if (hasActiveChildProcess()) {
- latencyAddSampleIfNeeded("aof-write-active-child",latency);
- } else {
- latencyAddSampleIfNeeded("aof-write-alone",latency);
- }
- latencyAddSampleIfNeeded("aof-write",latency);
-
- /* We performed the write so reset the postponed flush sentinel to zero. */
- server.aof_flush_postponed_start = 0;
-
- if (nwritten != (ssize_t)sdslen(server.aof_buf)) {
- static time_t last_write_error_log = 0;
- int can_log = 0;
-
- /* Limit logging rate to 1 line per AOF_WRITE_LOG_ERROR_RATE seconds. */
- if ((server.unixtime - last_write_error_log) > AOF_WRITE_LOG_ERROR_RATE) {
- can_log = 1;
- last_write_error_log = server.unixtime;
- }
-
- /* Log the AOF write error and record the error code. */
- if (nwritten == -1) {
- if (can_log) {
- serverLog(LL_WARNING,"Error writing to the AOF file: %s",
- strerror(errno));
- server.aof_last_write_errno = errno;
- }
- } else {
- if (can_log) {
- serverLog(LL_WARNING,"Short write while writing to "
- "the AOF file: (nwritten=%lld, "
- "expected=%lld)",
- (long long)nwritten,
- (long long)sdslen(server.aof_buf));
- }
-
- if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {
- if (can_log) {
- serverLog(LL_WARNING, "Could not remove short write "
- "from the append-only file. Redis may refuse "
- "to load the AOF the next time it starts. "
- "ftruncate: %s", strerror(errno));
- }
- } else {
- /* If the ftruncate() succeeded we can set nwritten to
- * -1 since there is no longer partial data into the AOF. */
- nwritten = -1;
- }
- server.aof_last_write_errno = ENOSPC;
- }
-
- /* Handle the AOF write error. */
- if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
- /* We can't recover when the fsync policy is ALWAYS since the reply
- * for the client is already in the output buffers (both writes and
- * reads), and the changes to the db can't be rolled back. Since we
- * have a contract with the user that on acknowledged or observed
- * writes are is synced on disk, we must exit. */
- serverLog(LL_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");
- exit(1);
- } else {
- /* Recover from failed write leaving data into the buffer. However
- * set an error to stop accepting writes as long as the error
- * condition is not cleared. */
- server.aof_last_write_status = C_ERR;
-
- /* Trim the sds buffer if there was a partial write, and there
- * was no way to undo it with ftruncate(2). */
- if (nwritten > 0) {
- server.aof_current_size += nwritten;
- sdsrange(server.aof_buf,nwritten,-1);
- }
- return; /* We'll try again on the next call... */
- }
- } else {
- /* Successful write(2). If AOF was in error state, restore the
- * OK state and log the event. */
- if (server.aof_last_write_status == C_ERR) {
- serverLog(LL_WARNING,
- "AOF write error looks solved, Redis can write again.");
- server.aof_last_write_status = C_OK;
- }
- }
- server.aof_current_size += nwritten;
-
- /* Re-use AOF buffer when it is small enough. The maximum comes from the
- * arena size of 4k minus some overhead (but is otherwise arbitrary). */
- if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
- sdsclear(server.aof_buf);
- } else {
- sdsfree(server.aof_buf);
- server.aof_buf = sdsempty();
- }
-
- try_fsync:
- /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
- * children doing I/O in the background. */
- if (server.aof_no_fsync_on_rewrite && hasActiveChildProcess())
- return;
-
- /* Perform the fsync if needed. */
- if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
- /* redis_fsync is defined as fdatasync() for Linux in order to avoid
- * flushing metadata. */
- latencyStartMonitor(latency);
- /* Let's try to get this data on the disk. To guarantee data safe when
- * the AOF fsync policy is 'always', we should exit if failed to fsync
- * AOF (see comment next to the exit(1) after write error above). */
- if (redis_fsync(server.aof_fd) == -1) {
- serverLog(LL_WARNING,"Can't persist AOF for fsync error when the "
- "AOF fsync policy is 'always': %s. Exiting...", strerror(errno));
- exit(1);
- }
- latencyEndMonitor(latency);
- latencyAddSampleIfNeeded("aof-fsync-always",latency);
- server.aof_fsync_offset = server.aof_current_size;
- server.aof_last_fsync = server.unixtime;
- } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
- server.unixtime > server.aof_last_fsync)) {
- if (!sync_in_progress) {
- aof_background_fsync(server.aof_fd);
- server.aof_fsync_offset = server.aof_current_size;
- }
- server.aof_last_fsync = server.unixtime;
- }
- }
-