关键词搜索

源码搜索 ×
×

漫话Redis源码之八十二

发布2022-02-13浏览546次

详情内容

看最后的kqueue, 这是为了兼容不同平台,其实你可以理解为它和select/poll/epoll差不多就行:

  1. #include <sys/types.h>
  2. #include <sys/event.h>
  3. #include <sys/time.h>
  4. typedef struct aeApiState {
  5. int kqfd;
  6. struct kevent *events;
  7. /* Events mask for merge read and write event.
  8. * To reduce memory consumption, we use 2 bits to store the mask
  9. * of an event, so that 1 byte will store the mask of 4 events. */
  10. char *eventsMask;
  11. } aeApiState;
  12. #define EVENT_MASK_MALLOC_SIZE(sz) (((sz) + 3) / 4)
  13. #define EVENT_MASK_OFFSET(fd) ((fd) % 4 * 2)
  14. #define EVENT_MASK_ENCODE(fd, mask) (((mask) & 0x3) << EVENT_MASK_OFFSET(fd))
  15. static inline int getEventMask(const char *eventsMask, int fd) {
  16. return (eventsMask[fd/4] >> EVENT_MASK_OFFSET(fd)) & 0x3;
  17. }
  18. static inline void addEventMask(char *eventsMask, int fd, int mask) {
  19. eventsMask[fd/4] |= EVENT_MASK_ENCODE(fd, mask);
  20. }
  21. static inline void resetEventMask(char *eventsMask, int fd) {
  22. eventsMask[fd/4] &= ~EVENT_MASK_ENCODE(fd, 0x3);
  23. }
  24. static int aeApiCreate(aeEventLoop *eventLoop) {
  25. aeApiState *state = zmalloc(sizeof(aeApiState));
  26. if (!state) return -1;
  27. state->events = zmalloc(sizeof(struct kevent)*eventLoop->setsize);
  28. if (!state->events) {
  29. zfree(state);
  30. return -1;
  31. }
  32. state->kqfd = kqueue();
  33. if (state->kqfd == -1) {
  34. zfree(state->events);
  35. zfree(state);
  36. return -1;
  37. }
  38. anetCloexec(state->kqfd);
  39. state->eventsMask = zmalloc(EVENT_MASK_MALLOC_SIZE(eventLoop->setsize));
  40. memset(state->eventsMask, 0, EVENT_MASK_MALLOC_SIZE(eventLoop->setsize));
  41. eventLoop->apidata = state;
  42. return 0;
  43. }
  44. static int aeApiResize(aeEventLoop *eventLoop, int setsize) {
  45. aeApiState *state = eventLoop->apidata;
  46. state->events = zrealloc(state->events, sizeof(struct kevent)*setsize);
  47. state->eventsMask = zrealloc(state->eventsMask, EVENT_MASK_MALLOC_SIZE(setsize));
  48. memset(state->eventsMask, 0, EVENT_MASK_MALLOC_SIZE(setsize));
  49. return 0;
  50. }
  51. static void aeApiFree(aeEventLoop *eventLoop) {
  52. aeApiState *state = eventLoop->apidata;
  53. close(state->kqfd);
  54. zfree(state->events);
  55. zfree(state->eventsMask);
  56. zfree(state);
  57. }
  58. static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
  59. aeApiState *state = eventLoop->apidata;
  60. struct kevent ke;
  61. if (mask & AE_READABLE) {
  62. EV_SET(&ke, fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
  63. if (kevent(state->kqfd, &ke, 1, NULL, 0, NULL) == -1) return -1;
  64. }
  65. if (mask & AE_WRITABLE) {
  66. EV_SET(&ke, fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
  67. if (kevent(state->kqfd, &ke, 1, NULL, 0, NULL) == -1) return -1;
  68. }
  69. return 0;
  70. }
  71. static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask) {
  72. aeApiState *state = eventLoop->apidata;
  73. struct kevent ke;
  74. if (mask & AE_READABLE) {
  75. EV_SET(&ke, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
  76. kevent(state->kqfd, &ke, 1, NULL, 0, NULL);
  77. }
  78. if (mask & AE_WRITABLE) {
  79. EV_SET(&ke, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
  80. kevent(state->kqfd, &ke, 1, NULL, 0, NULL);
  81. }
  82. }
  83. static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
  84. aeApiState *state = eventLoop->apidata;
  85. int retval, numevents = 0;
  86. if (tvp != NULL) {
  87. struct timespec timeout;
  88. timeout.tv_sec = tvp->tv_sec;
  89. timeout.tv_nsec = tvp->tv_usec * 1000;
  90. retval = kevent(state->kqfd, NULL, 0, state->events, eventLoop->setsize,
  91. &timeout);
  92. } else {
  93. retval = kevent(state->kqfd, NULL, 0, state->events, eventLoop->setsize,
  94. NULL);
  95. }
  96. if (retval > 0) {
  97. int j;
  98. /* Normally we execute the read event first and then the write event.
  99. * When the barrier is set, we will do it reverse.
  100. *
  101. * However, under kqueue, read and write events would be separate
  102. * events, which would make it impossible to control the order of
  103. * reads and writes. So we store the event's mask we've got and merge
  104. * the same fd events later. */
  105. for (j = 0; j < retval; j++) {
  106. struct kevent *e = state->events+j;
  107. int fd = e->ident;
  108. int mask = 0;
  109. if (e->filter == EVFILT_READ) mask = AE_READABLE;
  110. else if (e->filter == EVFILT_WRITE) mask = AE_WRITABLE;
  111. addEventMask(state->eventsMask, fd, mask);
  112. }
  113. /* Re-traversal to merge read and write events, and set the fd's mask to
  114. * 0 so that events are not added again when the fd is encountered again. */
  115. numevents = 0;
  116. for (j = 0; j < retval; j++) {
  117. struct kevent *e = state->events+j;
  118. int fd = e->ident;
  119. int mask = getEventMask(state->eventsMask, fd);
  120. if (mask) {
  121. eventLoop->fired[numevents].fd = fd;
  122. eventLoop->fired[numevents].mask = mask;
  123. resetEventMask(state->eventsMask, fd);
  124. numevents++;
  125. }
  126. }
  127. }
  128. return numevents;
  129. }
  130. static char *aeApiName(void) {
  131. return "kqueue";
  132. }

相关技术文章

点击QQ咨询
开通会员
返回顶部
×
微信扫码支付
微信扫码支付
确定支付下载
请使用微信描二维码支付
×

提示信息

×

选择支付方式

  • 微信支付
  • 支付宝付款
确定支付下载