加载配置文件中的信息,非重点:
- /* Load the cluster config from 'filename'.
- *
- * If the file does not exist or is zero-length (this may happen because
- * when we lock the nodes.conf file, we create a zero-length one for the
- * sake of locking if it does not already exist), C_ERR is returned.
- * If the configuration was loaded from the file, C_OK is returned. */
- int clusterLoadConfig(char *filename) {
- FILE *fp = fopen(filename,"r");
- struct stat sb;
- char *line;
- int maxline, j;
-
- if (fp == NULL) {
- if (errno == ENOENT) {
- return C_ERR;
- } else {
- serverLog(LL_WARNING,
- "Loading the cluster node config from %s: %s",
- filename, strerror(errno));
- exit(1);
- }
- }
-
- /* Check if the file is zero-length: if so return C_ERR to signal
- * we have to write the config. */
- if (fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
- fclose(fp);
- return C_ERR;
- }
-
- /* Parse the file. Note that single lines of the cluster config file can
- * be really long as they include all the hash slots of the node.
- * This means in the worst possible case, half of the Redis slots will be
- * present in a single line, possibly in importing or migrating state, so
- * together with the node ID of the sender/receiver.
- *
- * To simplify we allocate 1024+CLUSTER_SLOTS*128 bytes per line. */
- maxline = 1024+CLUSTER_SLOTS*128;
- line = zmalloc(maxline);
- while(fgets(line,maxline,fp) != NULL) {
- int argc;
- sds *argv;
- clusterNode *n, *master;
- char *p, *s;
-
- /* Skip blank lines, they can be created either by users manually
- * editing nodes.conf or by the config writing process if stopped
- * before the truncate() call. */
- if (line[0] == '\n' || line[0] == '\0') continue;
-
- /* Split the line into arguments for processing. */
- argv = sdssplitargs(line,&argc);
- if (argv == NULL) goto fmterr;
-
- /* Handle the special "vars" line. Don't pretend it is the last
- * line even if it actually is when generated by Redis. */
- if (strcasecmp(argv[0],"vars") == 0) {
- if (!(argc % 2)) goto fmterr;
- for (j = 1; j < argc; j += 2) {
- if (strcasecmp(argv[j],"currentEpoch") == 0) {
- server.cluster->currentEpoch =
- strtoull(argv[j+1],NULL,10);
- } else if (strcasecmp(argv[j],"lastVoteEpoch") == 0) {
- server.cluster->lastVoteEpoch =
- strtoull(argv[j+1],NULL,10);
- } else {
- serverLog(LL_WARNING,
- "Skipping unknown cluster config variable '%s'",
- argv[j]);
- }
- }
- sdsfreesplitres(argv,argc);
- continue;
- }
-
- /* Regular config lines have at least eight fields */
- if (argc < 8) {
- sdsfreesplitres(argv,argc);
- goto fmterr;
- }
-
- /* Create this node if it does not exist */
- n = clusterLookupNode(argv[0]);
- if (!n) {
- n = createClusterNode(argv[0],0);
- clusterAddNode(n);
- }
- /* Address and port */
- if ((p = strrchr(argv[1],':')) == NULL) {
- sdsfreesplitres(argv,argc);
- goto fmterr;
- }
- *p = '\0';
- memcpy(n->ip,argv[1],strlen(argv[1])+1);
- char *port = p+1;
- char *busp = strchr(port,'@');
- if (busp) {
- *busp = '\0';
- busp++;
- }
- n->port = atoi(port);
- /* In older versions of nodes.conf the "@busport" part is missing.
- * In this case we set it to the default offset of 10000 from the
- * base port. */
- n->cport = busp ? atoi(busp) : n->port + CLUSTER_PORT_INCR;
-
- /* The plaintext port for client in a TLS cluster (n->pport) is not
- * stored in nodes.conf. It is received later over the bus protocol. */
-
- /* Parse flags */
- p = s = argv[2];
- while(p) {
- p = strchr(s,',');
- if (p) *p = '\0';
- if (!strcasecmp(s,"myself")) {
- serverAssert(server.cluster->myself == NULL);
- myself = server.cluster->myself = n;
- n->flags |= CLUSTER_NODE_MYSELF;
- } else if (!strcasecmp(s,"master")) {
- n->flags |= CLUSTER_NODE_MASTER;
- } else if (!strcasecmp(s,"slave")) {
- n->flags |= CLUSTER_NODE_SLAVE;
- } else if (!strcasecmp(s,"fail?")) {
- n->flags |= CLUSTER_NODE_PFAIL;
- } else if (!strcasecmp(s,"fail")) {
- n->flags |= CLUSTER_NODE_FAIL;
- n->fail_time = mstime();
- } else if (!strcasecmp(s,"handshake")) {
- n->flags |= CLUSTER_NODE_HANDSHAKE;
- } else if (!strcasecmp(s,"noaddr")) {
- n->flags |= CLUSTER_NODE_NOADDR;
- } else if (!strcasecmp(s,"nofailover")) {
- n->flags |= CLUSTER_NODE_NOFAILOVER;
- } else if (!strcasecmp(s,"noflags")) {
- /* nothing to do */
- } else {
- serverPanic("Unknown flag in redis cluster config file");
- }
- if (p) s = p+1;
- }
-
- /* Get master if any. Set the master and populate master's
- * slave list. */
- if (argv[3][0] != '-') {
- master = clusterLookupNode(argv[3]);
- if (!master) {
- master = createClusterNode(argv[3],0);
- clusterAddNode(master);
- }
- n->slaveof = master;
- clusterNodeAddSlave(master,n);
- }
-
- /* Set ping sent / pong received timestamps */
- if (atoi(argv[4])) n->ping_sent = mstime();
- if (atoi(argv[5])) n->pong_received = mstime();
-
- /* Set configEpoch for this node. */
- n->configEpoch = strtoull(argv[6],NULL,10);
-
- /* Populate hash slots served by this instance. */
- for (j = 8; j < argc; j++) {
- int start, stop;
-
- if (argv[j][0] == '[') {
- /* Here we handle migrating / importing slots */
- int slot;
- char direction;
- clusterNode *cn;
-
- p = strchr(argv[j],'-');
- serverAssert(p != NULL);
- *p = '\0';
- direction = p[1]; /* Either '>' or '<' */
- slot = atoi(argv[j]+1);
- if (slot < 0 || slot >= CLUSTER_SLOTS) {
- sdsfreesplitres(argv,argc);
- goto fmterr;
- }
- p += 3;
- cn = clusterLookupNode(p);
- if (!cn) {
- cn = createClusterNode(p,0);
- clusterAddNode(cn);
- }
- if (direction == '>') {
- server.cluster->migrating_slots_to[slot] = cn;
- } else {
- server.cluster->importing_slots_from[slot] = cn;
- }
- continue;
- } else if ((p = strchr(argv[j],'-')) != NULL) {
- *p = '\0';
- start = atoi(argv[j]);
- stop = atoi(p+1);
- } else {
- start = stop = atoi(argv[j]);
- }
- if (start < 0 || start >= CLUSTER_SLOTS ||
- stop < 0 || stop >= CLUSTER_SLOTS)
- {
- sdsfreesplitres(argv,argc);
- goto fmterr;
- }
- while(start <= stop) clusterAddSlot(n, start++);
- }
-
- sdsfreesplitres(argv,argc);
- }
- /* Config sanity check */
- if (server.cluster->myself == NULL) goto fmterr;
-
- zfree(line);
- fclose(fp);
-
- serverLog(LL_NOTICE,"Node configuration loaded, I'm %.40s", myself->name);
-
- /* Something that should never happen: currentEpoch smaller than
- * the max epoch found in the nodes configuration. However we handle this
- * as some form of protection against manual editing of critical files. */
- if (clusterGetMaxEpoch() > server.cluster->currentEpoch) {
- server.cluster->currentEpoch = clusterGetMaxEpoch();
- }
- return C_OK;
-
- fmterr:
- serverLog(LL_WARNING,
- "Unrecoverable error: corrupted cluster config file.");
- zfree(line);
- if (fp) fclose(fp);
- exit(1);
- }
-
- /* Cluster node configuration is exactly the same as CLUSTER NODES output.
- *
- * This function writes the node config and returns 0, on error -1
- * is returned.
- *
- * Note: we need to write the file in an atomic way from the point of view
- * of the POSIX filesystem semantics, so that if the server is stopped
- * or crashes during the write, we'll end with either the old file or the
- * new one. Since we have the full payload to write available we can use
- * a single write to write the whole file. If the pre-existing file was
- * bigger we pad our payload with newlines that are anyway ignored and truncate
- * the file afterward. */
- int clusterSaveConfig(int do_fsync) {
- sds ci;
- size_t content_size;
- struct stat sb;
- int fd;
-
- server.cluster->todo_before_sleep &= ~CLUSTER_TODO_SAVE_CONFIG;
-
- /* Get the nodes description and concatenate our "vars" directive to
- * save currentEpoch and lastVoteEpoch. */
- ci = clusterGenNodesDescription(CLUSTER_NODE_HANDSHAKE, 0);
- ci = sdscatprintf(ci,"vars currentEpoch %llu lastVoteEpoch %llu\n",
- (unsigned long long) server.cluster->currentEpoch,
- (unsigned long long) server.cluster->lastVoteEpoch);
- content_size = sdslen(ci);
-
- if ((fd = open(server.cluster_configfile,O_WRONLY|O_CREAT,0644))
- == -1) goto err;
-
- /* Pad the new payload if the existing file length is greater. */
- if (fstat(fd,&sb) != -1) {
- if (sb.st_size > (off_t)content_size) {
- ci = sdsgrowzero(ci,sb.st_size);
- memset(ci+content_size,'\n',sb.st_size-content_size);
- }
- }
- if (write(fd,ci,sdslen(ci)) != (ssize_t)sdslen(ci)) goto err;
- if (do_fsync) {
- server.cluster->todo_before_sleep &= ~CLUSTER_TODO_FSYNC_CONFIG;
- if (fsync(fd) == -1) goto err;
- }
-
- /* Truncate the file if needed to remove the final \n padding that
- * is just garbage. */
- if (content_size != sdslen(ci) && ftruncate(fd,content_size) == -1) {
- /* ftruncate() failing is not a critical error. */
- }
- close(fd);
- sdsfree(ci);
- return 0;
-
- err:
- if (fd != -1) close(fd);
- sdsfree(ci);
- return -1;
- }
-
- void clusterSaveConfigOrDie(int do_fsync) {
- if (clusterSaveConfig(do_fsync) == -1) {
- serverLog(LL_WARNING,"Fatal: can't update cluster config file.");
- exit(1);
- }
- }
-
- /* Lock the cluster config using flock(), and leaks the file descriptor used to
- * acquire the lock so that the file will be locked forever.
- *
- * This works because we always update nodes.conf with a new version
- * in-place, reopening the file, and writing to it in place (later adjusting
- * the length with ftruncate()).
- *
- * On success C_OK is returned, otherwise an error is logged and
- * the function returns C_ERR to signal a lock was not acquired. */
- int clusterLockConfig(char *filename) {
- /* flock() does not exist on Solaris
- * and a fcntl-based solution won't help, as we constantly re-open that file,
- * which will release _all_ locks anyway
- */
- #if !defined(__sun)
- /* To lock it, we need to open the file in a way it is created if
- * it does not exist, otherwise there is a race condition with other
- * processes. */
- int fd = open(filename,O_WRONLY|O_CREAT|O_CLOEXEC,0644);
- if (fd == -1) {
- serverLog(LL_WARNING,
- "Can't open %s in order to acquire a lock: %s",
- filename, strerror(errno));
- return C_ERR;
- }
-
- if (flock(fd,LOCK_EX|LOCK_NB) == -1) {
- if (errno == EWOULDBLOCK) {
- serverLog(LL_WARNING,
- "Sorry, the cluster configuration file %s is already used "
- "by a different Redis Cluster node. Please make sure that "
- "different nodes use different cluster configuration "
- "files.", filename);
- } else {
- serverLog(LL_WARNING,
- "Impossible to lock %s: %s", filename, strerror(errno));
- }
- close(fd);
- return C_ERR;
- }
- /* Lock acquired: leak the 'fd' by not closing it, so that we'll retain the
- * lock to the file as long as the process exists.
- *
- * After fork, the child process will get the fd opened by the parent process,
- * we need save `fd` to `cluster_config_file_lock_fd`, so that in redisFork(),
- * it will be closed in the child process.
- * If it is not closed, when the main process is killed -9, but the child process
- * (redis-aof-rewrite) is still alive, the fd(lock) will still be held by the
- * child process, and the main process will fail to get lock, means fail to start. */
- server.cluster_config_file_lock_fd = fd;
- #else
- UNUSED(filename);
- #endif /* __sun */
-
- return C_OK;
- }
-
- /* Derives our ports to be announced in the cluster bus. */
- void deriveAnnouncedPorts(int *announced_port, int *announced_pport,
- int *announced_cport) {
- int port = server.tls_cluster ? server.tls_port : server.port;
- /* Default announced ports. */
- *announced_port = port;
- *announced_pport = server.tls_cluster ? server.port : 0;
- *announced_cport = port + CLUSTER_PORT_INCR;
- /* Config overriding announced ports. */
- if (server.tls_cluster && server.cluster_announce_tls_port) {
- *announced_port = server.cluster_announce_tls_port;
- *announced_pport = server.cluster_announce_port;
- } else if (server.cluster_announce_port) {
- *announced_port = server.cluster_announce_port;
- }
- if (server.cluster_announce_bus_port) {
- *announced_cport = server.cluster_announce_bus_port;
- }
- }
-
- /* Some flags (currently just the NOFAILOVER flag) may need to be updated
- * in the "myself" node based on the current configuration of the node,
- * that may change at runtime via CONFIG SET. This function changes the
- * set of flags in myself->flags accordingly. */
- void clusterUpdateMyselfFlags(void) {
- int oldflags = myself->flags;
- int nofailover = server.cluster_slave_no_failover ?
- CLUSTER_NODE_NOFAILOVER : 0;
- myself->flags &= ~CLUSTER_NODE_NOFAILOVER;
- myself->flags |= nofailover;
- if (myself->flags != oldflags) {
- clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
- CLUSTER_TODO_UPDATE_STATE);
- }
- }
-
- void clusterInit(void) {
- int saveconf = 0;
-
- server.cluster = zmalloc(sizeof(clusterState));
- server.cluster->myself = NULL;
- server.cluster->currentEpoch = 0;
- server.cluster->state = CLUSTER_FAIL;
- server.cluster->size = 1;
- server.cluster->todo_before_sleep = 0;
- server.cluster->nodes = dictCreate(&clusterNodesDictType,NULL);
- server.cluster->nodes_black_list =
- dictCreate(&clusterNodesBlackListDictType,NULL);
- server.cluster->failover_auth_time = 0;
- server.cluster->failover_auth_count = 0;
- server.cluster->failover_auth_rank = 0;
- server.cluster->failover_auth_epoch = 0;
- server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
- server.cluster->lastVoteEpoch = 0;
- for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
- server.cluster->stats_bus_messages_sent[i] = 0;
- server.cluster->stats_bus_messages_received[i] = 0;
- }
- server.cluster->stats_pfail_nodes = 0;
- memset(server.cluster->slots,0, sizeof(server.cluster->slots));
- clusterCloseAllSlots();
-
- /* Lock the cluster config file to make sure every node uses
- * its own nodes.conf. */
- server.cluster_config_file_lock_fd = -1;
- if (clusterLockConfig(server.cluster_configfile) == C_ERR)
- exit(1);
-
- /* Load or create a new nodes configuration. */
- if (clusterLoadConfig(server.cluster_configfile) == C_ERR) {
- /* No configuration found. We will just use the random name provided
- * by the createClusterNode() function. */
- myself = server.cluster->myself =
- createClusterNode(NULL,CLUSTER_NODE_MYSELF|CLUSTER_NODE_MASTER);
- serverLog(LL_NOTICE,"No cluster configuration found, I'm %.40s",
- myself->name);
- clusterAddNode(myself);
- saveconf = 1;
- }
- if (saveconf) clusterSaveConfigOrDie(1);
-
- /* We need a listening TCP port for our cluster messaging needs. */
- server.cfd.count = 0;
-
- /* Port sanity check II
- * The other handshake port check is triggered too late to stop
- * us from trying to use a too-high cluster port number. */
- int port = server.tls_cluster ? server.tls_port : server.port;
- if (port > (65535-CLUSTER_PORT_INCR)) {
- serverLog(LL_WARNING, "Redis port number too high. "
- "Cluster communication port is 10,000 port "
- "numbers higher than your Redis port. "
- "Your Redis port number must be 55535 or less.");
- exit(1);
- }
- if (listenToPort(port+CLUSTER_PORT_INCR, &server.cfd) == C_ERR) {
- exit(1);
- }
- if (createSocketAcceptHandler(&server.cfd, clusterAcceptHandler) != C_OK) {
- serverPanic("Unrecoverable error creating Redis Cluster socket accept handler.");
- }
-
- /* The slots -> keys map is a radix tree. Initialize it here. */
- server.cluster->slots_to_keys = raxNew();
- memset(server.cluster->slots_keys_count,0,
- sizeof(server.cluster->slots_keys_count));
-
- /* Set myself->port/cport/pport to my listening ports, we'll just need to
- * discover the IP address via MEET messages. */
- deriveAnnouncedPorts(&myself->port, &myself->pport, &myself->cport);
-
- server.cluster->mf_end = 0;
- resetManualFailover();
- clusterUpdateMyselfFlags();
- }