关键词搜索

源码搜索 ×
×

漫话Redis源码之七十三

发布2022-02-13浏览566次

详情内容

加载配置文件中的信息(非重点):

  1. /* Load the cluster config from 'filename'.
  2. *
  3. * If the file does not exist or is zero-length (this may happen because
  4. * when we lock the nodes.conf file, we create a zero-length one for the
  5. * sake of locking if it does not already exist), C_ERR is returned.
  6. * If the configuration was loaded from the file, C_OK is returned. */
  7. int clusterLoadConfig(char *filename) {
  8. FILE *fp = fopen(filename,"r");
  9. struct stat sb;
  10. char *line;
  11. int maxline, j;
  12. if (fp == NULL) {
  13. if (errno == ENOENT) {
  14. return C_ERR;
  15. } else {
  16. serverLog(LL_WARNING,
  17. "Loading the cluster node config from %s: %s",
  18. filename, strerror(errno));
  19. exit(1);
  20. }
  21. }
  22. /* Check if the file is zero-length: if so return C_ERR to signal
  23. * we have to write the config. */
  24. if (fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
  25. fclose(fp);
  26. return C_ERR;
  27. }
  28. /* Parse the file. Note that single lines of the cluster config file can
  29. * be really long as they include all the hash slots of the node.
  30. * This means in the worst possible case, half of the Redis slots will be
  31. * present in a single line, possibly in importing or migrating state, so
  32. * together with the node ID of the sender/receiver.
  33. *
  34. * To simplify we allocate 1024+CLUSTER_SLOTS*128 bytes per line. */
  35. maxline = 1024+CLUSTER_SLOTS*128;
  36. line = zmalloc(maxline);
  37. while(fgets(line,maxline,fp) != NULL) {
  38. int argc;
  39. sds *argv;
  40. clusterNode *n, *master;
  41. char *p, *s;
  42. /* Skip blank lines, they can be created either by users manually
  43. * editing nodes.conf or by the config writing process if stopped
  44. * before the truncate() call. */
  45. if (line[0] == '\n' || line[0] == '\0') continue;
  46. /* Split the line into arguments for processing. */
  47. argv = sdssplitargs(line,&argc);
  48. if (argv == NULL) goto fmterr;
  49. /* Handle the special "vars" line. Don't pretend it is the last
  50. * line even if it actually is when generated by Redis. */
  51. if (strcasecmp(argv[0],"vars") == 0) {
  52. if (!(argc % 2)) goto fmterr;
  53. for (j = 1; j < argc; j += 2) {
  54. if (strcasecmp(argv[j],"currentEpoch") == 0) {
  55. server.cluster->currentEpoch =
  56. strtoull(argv[j+1],NULL,10);
  57. } else if (strcasecmp(argv[j],"lastVoteEpoch") == 0) {
  58. server.cluster->lastVoteEpoch =
  59. strtoull(argv[j+1],NULL,10);
  60. } else {
  61. serverLog(LL_WARNING,
  62. "Skipping unknown cluster config variable '%s'",
  63. argv[j]);
  64. }
  65. }
  66. sdsfreesplitres(argv,argc);
  67. continue;
  68. }
  69. /* Regular config lines have at least eight fields */
  70. if (argc < 8) {
  71. sdsfreesplitres(argv,argc);
  72. goto fmterr;
  73. }
  74. /* Create this node if it does not exist */
  75. n = clusterLookupNode(argv[0]);
  76. if (!n) {
  77. n = createClusterNode(argv[0],0);
  78. clusterAddNode(n);
  79. }
  80. /* Address and port */
  81. if ((p = strrchr(argv[1],':')) == NULL) {
  82. sdsfreesplitres(argv,argc);
  83. goto fmterr;
  84. }
  85. *p = '\0';
  86. memcpy(n->ip,argv[1],strlen(argv[1])+1);
  87. char *port = p+1;
  88. char *busp = strchr(port,'@');
  89. if (busp) {
  90. *busp = '\0';
  91. busp++;
  92. }
  93. n->port = atoi(port);
  94. /* In older versions of nodes.conf the "@busport" part is missing.
  95. * In this case we set it to the default offset of 10000 from the
  96. * base port. */
  97. n->cport = busp ? atoi(busp) : n->port + CLUSTER_PORT_INCR;
  98. /* The plaintext port for client in a TLS cluster (n->pport) is not
  99. * stored in nodes.conf. It is received later over the bus protocol. */
  100. /* Parse flags */
  101. p = s = argv[2];
  102. while(p) {
  103. p = strchr(s,',');
  104. if (p) *p = '\0';
  105. if (!strcasecmp(s,"myself")) {
  106. serverAssert(server.cluster->myself == NULL);
  107. myself = server.cluster->myself = n;
  108. n->flags |= CLUSTER_NODE_MYSELF;
  109. } else if (!strcasecmp(s,"master")) {
  110. n->flags |= CLUSTER_NODE_MASTER;
  111. } else if (!strcasecmp(s,"slave")) {
  112. n->flags |= CLUSTER_NODE_SLAVE;
  113. } else if (!strcasecmp(s,"fail?")) {
  114. n->flags |= CLUSTER_NODE_PFAIL;
  115. } else if (!strcasecmp(s,"fail")) {
  116. n->flags |= CLUSTER_NODE_FAIL;
  117. n->fail_time = mstime();
  118. } else if (!strcasecmp(s,"handshake")) {
  119. n->flags |= CLUSTER_NODE_HANDSHAKE;
  120. } else if (!strcasecmp(s,"noaddr")) {
  121. n->flags |= CLUSTER_NODE_NOADDR;
  122. } else if (!strcasecmp(s,"nofailover")) {
  123. n->flags |= CLUSTER_NODE_NOFAILOVER;
  124. } else if (!strcasecmp(s,"noflags")) {
  125. /* nothing to do */
  126. } else {
  127. serverPanic("Unknown flag in redis cluster config file");
  128. }
  129. if (p) s = p+1;
  130. }
  131. /* Get master if any. Set the master and populate master's
  132. * slave list. */
  133. if (argv[3][0] != '-') {
  134. master = clusterLookupNode(argv[3]);
  135. if (!master) {
  136. master = createClusterNode(argv[3],0);
  137. clusterAddNode(master);
  138. }
  139. n->slaveof = master;
  140. clusterNodeAddSlave(master,n);
  141. }
  142. /* Set ping sent / pong received timestamps */
  143. if (atoi(argv[4])) n->ping_sent = mstime();
  144. if (atoi(argv[5])) n->pong_received = mstime();
  145. /* Set configEpoch for this node. */
  146. n->configEpoch = strtoull(argv[6],NULL,10);
  147. /* Populate hash slots served by this instance. */
  148. for (j = 8; j < argc; j++) {
  149. int start, stop;
  150. if (argv[j][0] == '[') {
  151. /* Here we handle migrating / importing slots */
  152. int slot;
  153. char direction;
  154. clusterNode *cn;
  155. p = strchr(argv[j],'-');
  156. serverAssert(p != NULL);
  157. *p = '\0';
  158. direction = p[1]; /* Either '>' or '<' */
  159. slot = atoi(argv[j]+1);
  160. if (slot < 0 || slot >= CLUSTER_SLOTS) {
  161. sdsfreesplitres(argv,argc);
  162. goto fmterr;
  163. }
  164. p += 3;
  165. cn = clusterLookupNode(p);
  166. if (!cn) {
  167. cn = createClusterNode(p,0);
  168. clusterAddNode(cn);
  169. }
  170. if (direction == '>') {
  171. server.cluster->migrating_slots_to[slot] = cn;
  172. } else {
  173. server.cluster->importing_slots_from[slot] = cn;
  174. }
  175. continue;
  176. } else if ((p = strchr(argv[j],'-')) != NULL) {
  177. *p = '\0';
  178. start = atoi(argv[j]);
  179. stop = atoi(p+1);
  180. } else {
  181. start = stop = atoi(argv[j]);
  182. }
  183. if (start < 0 || start >= CLUSTER_SLOTS ||
  184. stop < 0 || stop >= CLUSTER_SLOTS)
  185. {
  186. sdsfreesplitres(argv,argc);
  187. goto fmterr;
  188. }
  189. while(start <= stop) clusterAddSlot(n, start++);
  190. }
  191. sdsfreesplitres(argv,argc);
  192. }
  193. /* Config sanity check */
  194. if (server.cluster->myself == NULL) goto fmterr;
  195. zfree(line);
  196. fclose(fp);
  197. serverLog(LL_NOTICE,"Node configuration loaded, I'm %.40s", myself->name);
  198. /* Something that should never happen: currentEpoch smaller than
  199. * the max epoch found in the nodes configuration. However we handle this
  200. * as some form of protection against manual editing of critical files. */
  201. if (clusterGetMaxEpoch() > server.cluster->currentEpoch) {
  202. server.cluster->currentEpoch = clusterGetMaxEpoch();
  203. }
  204. return C_OK;
  205. fmterr:
  206. serverLog(LL_WARNING,
  207. "Unrecoverable error: corrupted cluster config file.");
  208. zfree(line);
  209. if (fp) fclose(fp);
  210. exit(1);
  211. }
  212. /* Cluster node configuration is exactly the same as CLUSTER NODES output.
  213. *
  214. * This function writes the node config and returns 0, on error -1
  215. * is returned.
  216. *
  217. * Note: we need to write the file in an atomic way from the point of view
  218. * of the POSIX filesystem semantics, so that if the server is stopped
  219. * or crashes during the write, we'll end with either the old file or the
  220. * new one. Since we have the full payload to write available we can use
  221. * a single write to write the whole file. If the pre-existing file was
  222. * bigger we pad our payload with newlines that are anyway ignored and truncate
  223. * the file afterward. */
  224. int clusterSaveConfig(int do_fsync) {
  225. sds ci;
  226. size_t content_size;
  227. struct stat sb;
  228. int fd;
  229. server.cluster->todo_before_sleep &= ~CLUSTER_TODO_SAVE_CONFIG;
  230. /* Get the nodes description and concatenate our "vars" directive to
  231. * save currentEpoch and lastVoteEpoch. */
  232. ci = clusterGenNodesDescription(CLUSTER_NODE_HANDSHAKE, 0);
  233. ci = sdscatprintf(ci,"vars currentEpoch %llu lastVoteEpoch %llu\n",
  234. (unsigned long long) server.cluster->currentEpoch,
  235. (unsigned long long) server.cluster->lastVoteEpoch);
  236. content_size = sdslen(ci);
  237. if ((fd = open(server.cluster_configfile,O_WRONLY|O_CREAT,0644))
  238. == -1) goto err;
  239. /* Pad the new payload if the existing file length is greater. */
  240. if (fstat(fd,&sb) != -1) {
  241. if (sb.st_size > (off_t)content_size) {
  242. ci = sdsgrowzero(ci,sb.st_size);
  243. memset(ci+content_size,'\n',sb.st_size-content_size);
  244. }
  245. }
  246. if (write(fd,ci,sdslen(ci)) != (ssize_t)sdslen(ci)) goto err;
  247. if (do_fsync) {
  248. server.cluster->todo_before_sleep &= ~CLUSTER_TODO_FSYNC_CONFIG;
  249. if (fsync(fd) == -1) goto err;
  250. }
  251. /* Truncate the file if needed to remove the final \n padding that
  252. * is just garbage. */
  253. if (content_size != sdslen(ci) && ftruncate(fd,content_size) == -1) {
  254. /* ftruncate() failing is not a critical error. */
  255. }
  256. close(fd);
  257. sdsfree(ci);
  258. return 0;
  259. err:
  260. if (fd != -1) close(fd);
  261. sdsfree(ci);
  262. return -1;
  263. }
  264. void clusterSaveConfigOrDie(int do_fsync) {
  265. if (clusterSaveConfig(do_fsync) == -1) {
  266. serverLog(LL_WARNING,"Fatal: can't update cluster config file.");
  267. exit(1);
  268. }
  269. }
  270. /* Lock the cluster config using flock(), and leaks the file descriptor used to
  271. * acquire the lock so that the file will be locked forever.
  272. *
  273. * This works because we always update nodes.conf with a new version
  274. * in-place, reopening the file, and writing to it in place (later adjusting
  275. * the length with ftruncate()).
  276. *
  277. * On success C_OK is returned, otherwise an error is logged and
  278. * the function returns C_ERR to signal a lock was not acquired. */
  279. int clusterLockConfig(char *filename) {
  280. /* flock() does not exist on Solaris
  281. * and a fcntl-based solution won't help, as we constantly re-open that file,
  282. * which will release _all_ locks anyway
  283. */
  284. #if !defined(__sun)
  285. /* To lock it, we need to open the file in a way it is created if
  286. * it does not exist, otherwise there is a race condition with other
  287. * processes. */
  288. int fd = open(filename,O_WRONLY|O_CREAT|O_CLOEXEC,0644);
  289. if (fd == -1) {
  290. serverLog(LL_WARNING,
  291. "Can't open %s in order to acquire a lock: %s",
  292. filename, strerror(errno));
  293. return C_ERR;
  294. }
  295. if (flock(fd,LOCK_EX|LOCK_NB) == -1) {
  296. if (errno == EWOULDBLOCK) {
  297. serverLog(LL_WARNING,
  298. "Sorry, the cluster configuration file %s is already used "
  299. "by a different Redis Cluster node. Please make sure that "
  300. "different nodes use different cluster configuration "
  301. "files.", filename);
  302. } else {
  303. serverLog(LL_WARNING,
  304. "Impossible to lock %s: %s", filename, strerror(errno));
  305. }
  306. close(fd);
  307. return C_ERR;
  308. }
  309. /* Lock acquired: leak the 'fd' by not closing it, so that we'll retain the
  310. * lock to the file as long as the process exists.
  311. *
  312. * After fork, the child process will get the fd opened by the parent process,
  313. * we need save `fd` to `cluster_config_file_lock_fd`, so that in redisFork(),
  314. * it will be closed in the child process.
  315. * If it is not closed, when the main process is killed -9, but the child process
  316. * (redis-aof-rewrite) is still alive, the fd(lock) will still be held by the
  317. * child process, and the main process will fail to get lock, means fail to start. */
  318. server.cluster_config_file_lock_fd = fd;
  319. #else
  320. UNUSED(filename);
  321. #endif /* __sun */
  322. return C_OK;
  323. }
  324. /* Derives our ports to be announced in the cluster bus. */
  325. void deriveAnnouncedPorts(int *announced_port, int *announced_pport,
  326. int *announced_cport) {
  327. int port = server.tls_cluster ? server.tls_port : server.port;
  328. /* Default announced ports. */
  329. *announced_port = port;
  330. *announced_pport = server.tls_cluster ? server.port : 0;
  331. *announced_cport = port + CLUSTER_PORT_INCR;
  332. /* Config overriding announced ports. */
  333. if (server.tls_cluster && server.cluster_announce_tls_port) {
  334. *announced_port = server.cluster_announce_tls_port;
  335. *announced_pport = server.cluster_announce_port;
  336. } else if (server.cluster_announce_port) {
  337. *announced_port = server.cluster_announce_port;
  338. }
  339. if (server.cluster_announce_bus_port) {
  340. *announced_cport = server.cluster_announce_bus_port;
  341. }
  342. }
  343. /* Some flags (currently just the NOFAILOVER flag) may need to be updated
  344. * in the "myself" node based on the current configuration of the node,
  345. * that may change at runtime via CONFIG SET. This function changes the
  346. * set of flags in myself->flags accordingly. */
  347. void clusterUpdateMyselfFlags(void) {
  348. int oldflags = myself->flags;
  349. int nofailover = server.cluster_slave_no_failover ?
  350. CLUSTER_NODE_NOFAILOVER : 0;
  351. myself->flags &= ~CLUSTER_NODE_NOFAILOVER;
  352. myself->flags |= nofailover;
  353. if (myself->flags != oldflags) {
  354. clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
  355. CLUSTER_TODO_UPDATE_STATE);
  356. }
  357. }
  358. void clusterInit(void) {
  359. int saveconf = 0;
  360. server.cluster = zmalloc(sizeof(clusterState));
  361. server.cluster->myself = NULL;
  362. server.cluster->currentEpoch = 0;
  363. server.cluster->state = CLUSTER_FAIL;
  364. server.cluster->size = 1;
  365. server.cluster->todo_before_sleep = 0;
  366. server.cluster->nodes = dictCreate(&clusterNodesDictType,NULL);
  367. server.cluster->nodes_black_list =
  368. dictCreate(&clusterNodesBlackListDictType,NULL);
  369. server.cluster->failover_auth_time = 0;
  370. server.cluster->failover_auth_count = 0;
  371. server.cluster->failover_auth_rank = 0;
  372. server.cluster->failover_auth_epoch = 0;
  373. server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
  374. server.cluster->lastVoteEpoch = 0;
  375. for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
  376. server.cluster->stats_bus_messages_sent[i] = 0;
  377. server.cluster->stats_bus_messages_received[i] = 0;
  378. }
  379. server.cluster->stats_pfail_nodes = 0;
  380. memset(server.cluster->slots,0, sizeof(server.cluster->slots));
  381. clusterCloseAllSlots();
  382. /* Lock the cluster config file to make sure every node uses
  383. * its own nodes.conf. */
  384. server.cluster_config_file_lock_fd = -1;
  385. if (clusterLockConfig(server.cluster_configfile) == C_ERR)
  386. exit(1);
  387. /* Load or create a new nodes configuration. */
  388. if (clusterLoadConfig(server.cluster_configfile) == C_ERR) {
  389. /* No configuration found. We will just use the random name provided
  390. * by the createClusterNode() function. */
  391. myself = server.cluster->myself =
  392. createClusterNode(NULL,CLUSTER_NODE_MYSELF|CLUSTER_NODE_MASTER);
  393. serverLog(LL_NOTICE,"No cluster configuration found, I'm %.40s",
  394. myself->name);
  395. clusterAddNode(myself);
  396. saveconf = 1;
  397. }
  398. if (saveconf) clusterSaveConfigOrDie(1);
  399. /* We need a listening TCP port for our cluster messaging needs. */
  400. server.cfd.count = 0;
  401. /* Port sanity check II
  402. * The other handshake port check is triggered too late to stop
  403. * us from trying to use a too-high cluster port number. */
  404. int port = server.tls_cluster ? server.tls_port : server.port;
  405. if (port > (65535-CLUSTER_PORT_INCR)) {
  406. serverLog(LL_WARNING, "Redis port number too high. "
  407. "Cluster communication port is 10,000 port "
  408. "numbers higher than your Redis port. "
  409. "Your Redis port number must be 55535 or less.");
  410. exit(1);
  411. }
  412. if (listenToPort(port+CLUSTER_PORT_INCR, &server.cfd) == C_ERR) {
  413. exit(1);
  414. }
  415. if (createSocketAcceptHandler(&server.cfd, clusterAcceptHandler) != C_OK) {
  416. serverPanic("Unrecoverable error creating Redis Cluster socket accept handler.");
  417. }
  418. /* The slots -> keys map is a radix tree. Initialize it here. */
  419. server.cluster->slots_to_keys = raxNew();
  420. memset(server.cluster->slots_keys_count,0,
  421. sizeof(server.cluster->slots_keys_count));
  422. /* Set myself->port/cport/pport to my listening ports, we'll just need to
  423. * discover the IP address via MEET messages. */
  424. deriveAnnouncedPorts(&myself->port, &myself->pport, &myself->cport);
  425. server.cluster->mf_end = 0;
  426. resetManualFailover();
  427. clusterUpdateMyselfFlags();
  428. }

相关技术文章

点击QQ咨询
开通会员
返回顶部
×
微信扫码支付
微信扫码支付
确定支付下载
请使用微信扫描二维码支付
×

提示信息

×

选择支付方式

  • 微信支付
  • 支付宝付款
确定支付下载