处理非阻塞的client, 整个实现还是比较简单的:
- /* This function is called in the beforeSleep() function of the event loop
-  * in order to process the pending input buffer of clients that were
-  * unblocked after a blocking operation. */
- void processUnblockedClients(void) {
-     listNode *ln;
-     client *c;
-  
-     while (listLength(server.unblocked_clients)) {
-         ln = listFirst(server.unblocked_clients);
-         serverAssert(ln != NULL);
-         c = ln->value;
-         listDelNode(server.unblocked_clients,ln);
-         c->flags &= ~CLIENT_UNBLOCKED;
-  
-         /* Process remaining data in the input buffer, unless the client
-          * is blocked again. Actually processInputBuffer() checks that the
-          * client is not blocked before to proceed, but things may change and
-          * the code is conceptually more correct this way. */
-         if (!(c->flags & CLIENT_BLOCKED)) {
-             /* If we have a queued command, execute it now. */
-             if (processPendingCommandsAndResetClient(c) == C_ERR) {
-                 continue;
-             }
-             /* Then process client if it has more data in it's buffer. */
-             if (c->querybuf && sdslen(c->querybuf) > 0) {
-                 processInputBuffer(c);
-             }
-         }
-     }
- }
-  
- /* This function will schedule the client for reprocessing at a safe time.
-  *
-  * This is useful when a client was blocked for some reason (blocking operation,
-  * CLIENT PAUSE, or whatever), because it may end with some accumulated query
-  * buffer that needs to be processed ASAP:
-  *
-  * 1. When a client is blocked, its readable handler is still active.
-  * 2. However in this case it only gets data into the query buffer, but the
-  *    query is not parsed or executed once there is enough to proceed as
-  *    usually (because the client is blocked... so we can't execute commands).
-  * 3. When the client is unblocked, without this function, the client would
-  *    have to write some query in order for the readable handler to finally
-  *    call processQueryBuffer*() on it.
-  * 4. With this function instead we can put the client in a queue that will
-  *    process it for queries ready to be executed at a safe time.
-  */
- void queueClientForReprocessing(client *c) {
-     /* The client may already be into the unblocked list because of a previous
-      * blocking operation, don't add back it into the list multiple times. */
-     if (!(c->flags & CLIENT_UNBLOCKED)) {
-         c->flags |= CLIENT_UNBLOCKED;
-         listAddNodeTail(server.unblocked_clients,c);
-     }
- }
-  
- /* Unblock a client calling the right function depending on the kind
-  * of operation the client is blocking for. */
- void unblockClient(client *c) {
-     if (c->btype == BLOCKED_LIST ||
-         c->btype == BLOCKED_ZSET ||
-         c->btype == BLOCKED_STREAM) {
-         unblockClientWaitingData(c);
-     } else if (c->btype == BLOCKED_WAIT) {
-         unblockClientWaitingReplicas(c);
-     } else if (c->btype == BLOCKED_MODULE) {
-         if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c);
-         unblockClientFromModule(c);
-     } else if (c->btype == BLOCKED_PAUSE) {
-         listDelNode(server.paused_clients,c->paused_list_node);
-         c->paused_list_node = NULL;
-     } else {
-         serverPanic("Unknown btype in unblockClient().");
-     }
-  
-     /* Reset the client for a new query since, for blocking commands
-      * we do not do it immediately after the command returns (when the
-      * client got blocked) in order to be still able to access the argument
-      * vector from module callbacks and updateStatsOnUnblock. */
-     if (c->btype != BLOCKED_PAUSE) {
-         freeClientOriginalArgv(c);
-         resetClient(c);
-     }
-  
-     /* Clear the flags, and put the client in the unblocked list so that
-      * we'll process new commands in its query buffer ASAP. */
-     server.blocked_clients--;
-     server.blocked_clients_by_type[c->btype]--;
-     c->flags &= ~CLIENT_BLOCKED;
-     c->btype = BLOCKED_NONE;
-     removeClientFromTimeoutTable(c);
-     queueClientForReprocessing(c);
- }
-  
- /* This function gets called when a blocked client timed out in order to
-  * send it a reply of some kind. After this function is called,
-  * unblockClient() will be called with the same client as argument. */
- void replyToBlockedClientTimedOut(client *c) {
-     if (c->btype == BLOCKED_LIST ||
-         c->btype == BLOCKED_ZSET ||
-         c->btype == BLOCKED_STREAM) {
-         addReplyNullArray(c);
-     } else if (c->btype == BLOCKED_WAIT) {
-         addReplyLongLong(c,replicationCountAcksByOffset(c->bpop.reploffset));
-     } else if (c->btype == BLOCKED_MODULE) {
-         moduleBlockedClientTimedOut(c);
-     } else {
-         serverPanic("Unknown btype in replyToBlockedClientTimedOut().");
-     }
- }
-  
- /* Mass-unblock clients because something changed in the instance that makes
-  * blocking no longer safe. For example clients blocked in list operations
-  * in an instance which turns from master to slave is unsafe, so this function
-  * is called when a master turns into a slave.
-  *
-  * The semantics is to send an -UNBLOCKED error to the client, disconnecting
-  * it at the same time. */
- void disconnectAllBlockedClients(void) {
-     listNode *ln;
-     listIter li;
-  
-     listRewind(server.clients,&li);
-     while((ln = listNext(&li))) {
-         client *c = listNodeValue(ln);
-  
-         if (c->flags & CLIENT_BLOCKED) {
-             /* PAUSED clients are an exception, when they'll be unblocked, the
-              * command processing will start from scratch, and the command will
-              * be either executed or rejected. (unlike LIST blocked clients for
-              * which the command is already in progress in a way. */
-             if (c->btype == BLOCKED_PAUSE)
-                 continue;
-  
-             addReplyError(c,
-                 "-UNBLOCKED force unblock from blocking operation, "
-                 "instance state changed (master -> replica?)");
-             unblockClient(c);
-             c->flags |= CLIENT_CLOSE_AFTER_REPLY;
-         }
-     }
- }
-  
- /* Helper function for handleClientsBlockedOnKeys(). This function is called
-  * when there may be clients blocked on a list key, and there may be new
-  * data to fetch (the key is ready). */
- void serveClientsBlockedOnListKey(robj *o, readyList *rl) {
-     /* We serve clients in the same order they blocked for
-      * this key, from the first blocked to the last. */
-     dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
-     if (de) {
-         list *clients = dictGetVal(de);
-         int numclients = listLength(clients);
-  
-         while(numclients--) {
-             listNode *clientnode = listFirst(clients);
-             client *receiver = clientnode->value;
-  
-             if (receiver->btype != BLOCKED_LIST) {
-                 /* Put at the tail, so that at the next call
-                  * we'll not run into it again. */
-                 listRotateHeadToTail(clients);
-                 continue;
-             }
-  
-             robj *dstkey = receiver->bpop.target;
-             int wherefrom = receiver->bpop.listpos.wherefrom;
-             int whereto = receiver->bpop.listpos.whereto;
-             robj *value = listTypePop(o, wherefrom);
-  
-             if (value) {
-                 /* Protect receiver->bpop.target, that will be
-                  * freed by the next unblockClient()
-                  * call. */
-                 if (dstkey) incrRefCount(dstkey);
-  
-                 monotime replyTimer;
-                 elapsedStart(&replyTimer);
-                 if (serveClientBlockedOnList(receiver,
-                     rl->key,dstkey,rl->db,value,
-                     wherefrom, whereto) == C_ERR)
-                 {
-                     /* If we failed serving the client we need
-                      * to also undo the POP operation. */
-                     listTypePush(o,value,wherefrom);
-                 }
-                 updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));
-                 unblockClient(receiver);
-  
-                 if (dstkey) decrRefCount(dstkey);
-                 decrRefCount(value);
-             } else {
-                 break;
-             }
-         }
-     }
-  
-     if (listTypeLength(o) == 0) {
-         dbDelete(rl->db,rl->key);
-         notifyKeyspaceEvent(NOTIFY_GENERIC,"del",rl->key,rl->db->id);
-     }
-     /* We don't call signalModifiedKey() as it was already called
-      * when an element was pushed on the list. */
- }
-  
- /* Helper function for handleClientsBlockedOnKeys(). This function is called
-  * when there may be clients blocked on a sorted set key, and there may be new
-  * data to fetch (the key is ready). */
- void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) {
-     /* We serve clients in the same order they blocked for
-      * this key, from the first blocked to the last. */
-     dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
-     if (de) {
-         list *clients = dictGetVal(de);
-         int numclients = listLength(clients);
-         unsigned long zcard = zsetLength(o);
-  
-         while(numclients-- && zcard) {
-             listNode *clientnode = listFirst(clients);
-             client *receiver = clientnode->value;
-  
-             if (receiver->btype != BLOCKED_ZSET) {
-                 /* Put at the tail, so that at the next call
-                  * we'll not run into it again. */
-                 listRotateHeadToTail(clients);
-                 continue;
-             }
-  
-             int where = (receiver->lastcmd &&
-                          receiver->lastcmd->proc == bzpopminCommand)
-                          ? ZSET_MIN : ZSET_MAX;
-             monotime replyTimer;
-             elapsedStart(&replyTimer);
-             genericZpopCommand(receiver,&rl->key,1,where,1,NULL);
-             updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));
-             unblockClient(receiver);
-             zcard--;
-  
-             /* Replicate the command. */
-             robj *argv[2];
-             struct redisCommand *cmd = where == ZSET_MIN ?
-                                        server.zpopminCommand :
-                                        server.zpopmaxCommand;
-             argv[0] = createStringObject(cmd->name,strlen(cmd->name));
-             argv[1] = rl->key;
-             incrRefCount(rl->key);
-             propagate(cmd,receiver->db->id,
-                       argv,2,PROPAGATE_AOF|PROPAGATE_REPL);
-             decrRefCount(argv[0]);
-             decrRefCount(argv[1]);
-         }
-     }
- }
-  
- /* Helper function for handleClientsBlockedOnKeys(). This function is called
-  * when there may be clients blocked on a stream key, and there may be new
-  * data to fetch (the key is ready). */
- void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
-     dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
-     stream *s = o->ptr;
-  
-     /* We need to provide the new data arrived on the stream
-      * to all the clients that are waiting for an offset smaller
-      * than the current top item. */
-     if (de) {
-         list *clients = dictGetVal(de);
-         listNode *ln;
-         listIter li;
-         listRewind(clients,&li);
-  
-         while((ln = listNext(&li))) {
-             client *receiver = listNodeValue(ln);
-             if (receiver->btype != BLOCKED_STREAM) continue;
-             bkinfo *bki = dictFetchValue(receiver->bpop.keys,rl->key);
-             streamID *gt = &bki->stream_id;
-  
-             /* If we blocked in the context of a consumer
-              * group, we need to resolve the group and update the
-              * last ID the client is blocked for: this is needed
-              * because serving other clients in the same consumer
-              * group will alter the "last ID" of the consumer
-              * group, and clients blocked in a consumer group are
-              * always blocked for the ">" ID: we need to deliver
-              * only new messages and avoid unblocking the client
-              * otherwise. */
-             streamCG *group = NULL;
-             if (receiver->bpop.xread_group) {
-                 group = streamLookupCG(s,
-                         receiver->bpop.xread_group->ptr);
-                 /* If the group was not found, send an error
-                  * to the consumer. */
-                 if (!group) {
-                     addReplyError(receiver,
-                         "-NOGROUP the consumer group this client "
-                         "was blocked on no longer exists");
-                     unblockClient(receiver);
-                     continue;
-                 } else {
-                     *gt = group->last_id;
-                 }
-             }
-  
-             if (streamCompareID(&s->last_id, gt) > 0) {
-                 streamID start = *gt;
-                 streamIncrID(&start);
-  
-                 /* Lookup the consumer for the group, if any. */
-                 streamConsumer *consumer = NULL;
-                 int noack = 0;
-  
-                 if (group) {
-                     int created = 0;
-                     consumer =
-                         streamLookupConsumer(group,
-                                              receiver->bpop.xread_consumer->ptr,
-                                              SLC_NONE,
-                                              &created);
-                     noack = receiver->bpop.xread_group_noack;
-                     if (created && noack) {
-                         streamPropagateConsumerCreation(receiver,rl->key,
-                                                         receiver->bpop.xread_group,
-                                                         consumer->name);
-                     }
-                 }
-  
-                 monotime replyTimer;
-                 elapsedStart(&replyTimer);
-                 /* Emit the two elements sub-array consisting of
-                  * the name of the stream and the data we
-                  * extracted from it. Wrapped in a single-item
-                  * array, since we have just one key. */
-                 if (receiver->resp == 2) {
-                     addReplyArrayLen(receiver,1);
-                     addReplyArrayLen(receiver,2);
-                 } else {
-                     addReplyMapLen(receiver,1);
-                 }
-                 addReplyBulk(receiver,rl->key);
-  
-                 streamPropInfo pi = {
-                     rl->key,
-                     receiver->bpop.xread_group
-                 };
-                 streamReplyWithRange(receiver,s,&start,NULL,
-                                      receiver->bpop.xread_count,
-                                      0, group, consumer, noack, &pi);
-                 updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));
-  
-                 /* Note that after we unblock the client, 'gt'
-                  * and other receiver->bpop stuff are no longer
-                  * valid, so we must do the setup above before
-                  * this call. */
-                 unblockClient(receiver);
-             }
-         }
-     }
- }
-  
- /* Helper function for handleClientsBlockedOnKeys(). This function is called
-  * in order to check if we can serve clients blocked by modules using
-  * RM_BlockClientOnKeys(), when the corresponding key was signaled as ready:
-  * our goal here is to call the RedisModuleBlockedClient reply() callback to
-  * see if the key is really able to serve the client, and in that case,
-  * unblock it. */
- void serveClientsBlockedOnKeyByModule(readyList *rl) {
-     dictEntry *de;
-  
-     /* Optimization: If no clients are in type BLOCKED_MODULE,
-      * we can skip this loop. */
-     if (!server.blocked_clients_by_type[BLOCKED_MODULE]) return;
-  
-     /* We serve clients in the same order they blocked for
-      * this key, from the first blocked to the last. */
-     de = dictFind(rl->db->blocking_keys,rl->key);
-     if (de) {
-         list *clients = dictGetVal(de);
-         int numclients = listLength(clients);
-  
-         while(numclients--) {
-             listNode *clientnode = listFirst(clients);
-             client *receiver = clientnode->value;
-  
-             /* Put at the tail, so that at the next call
-              * we'll not run into it again: clients here may not be
-              * ready to be served, so they'll remain in the list
-              * sometimes. We want also be able to skip clients that are
-              * not blocked for the MODULE type safely. */
-             listRotateHeadToTail(clients);
-  
-             if (receiver->btype != BLOCKED_MODULE) continue;
-  
-             /* Note that if *this* client cannot be served by this key,
-              * it does not mean that another client that is next into the
-              * list cannot be served as well: they may be blocked by
-              * different modules with different triggers to consider if a key
-              * is ready or not. This means we can't exit the loop but need
-              * to continue after the first failure. */
-             monotime replyTimer;
-             elapsedStart(&replyTimer);
-             if (!moduleTryServeClientBlockedOnKey(receiver, rl->key)) continue;
-             updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));
-  
-             moduleUnblockClient(receiver);
-         }
-     }
- }
-  
- /* This function should be called by Redis every time a single command,
-  * a MULTI/EXEC block, or a Lua script, terminated its execution after
-  * being called by a client. It handles serving clients blocked in
-  * lists, streams, and sorted sets, via a blocking commands.
-  *
-  * All the keys with at least one client blocked that received at least
-  * one new element via some write operation are accumulated into
-  * the server.ready_keys list. This function will run the list and will
-  * serve clients accordingly. Note that the function will iterate again and
-  * again as a result of serving BLMOVE we can have new blocking clients
-  * to serve because of the PUSH side of BLMOVE.
-  *
-  * This function is normally "fair", that is, it will server clients
-  * using a FIFO behavior. However this fairness is violated in certain
-  * edge cases, that is, when we have clients blocked at the same time
-  * in a sorted set and in a list, for the same key (a very odd thing to
-  * do client side, indeed!). Because mismatching clients (blocking for
-  * a different type compared to the current key type) are moved in the
-  * other side of the linked list. However as long as the key starts to
-  * be used only for a single type, like virtually any Redis application will
-  * do, the function is already fair. */
- void handleClientsBlockedOnKeys(void) {
-     while(listLength(server.ready_keys) != 0) {
-         list *l;
-  
-         /* Point server.ready_keys to a fresh list and save the current one
-          * locally. This way as we run the old list we are free to call
-          * signalKeyAsReady() that may push new elements in server.ready_keys
-          * when handling clients blocked into BLMOVE. */
-         l = server.ready_keys;
-         server.ready_keys = listCreate();
-  
-         while(listLength(l) != 0) {
-             listNode *ln = listFirst(l);
-             readyList *rl = ln->value;
-  
-             /* First of all remove this key from db->ready_keys so that
-              * we can safely call signalKeyAsReady() against this key. */
-             dictDelete(rl->db->ready_keys,rl->key);
-  
-             /* Even if we are not inside call(), increment the call depth
-              * in order to make sure that keys are expired against a fixed
-              * reference time, and not against the wallclock time. This
-              * way we can lookup an object multiple times (BLMOVE does
-              * that) without the risk of it being freed in the second
-              * lookup, invalidating the first one.
-              * See https://github.com/redis/redis/pull/6554. */
-             server.fixed_time_expire++;
-             updateCachedTime(0);
-  
-             /* Serve clients blocked on the key. */
-             robj *o = lookupKeyWrite(rl->db,rl->key);
-  
-             if (o != NULL) {
-                 if (o->type == OBJ_LIST)
-                     serveClientsBlockedOnListKey(o,rl);
-                 else if (o->type == OBJ_ZSET)
-                     serveClientsBlockedOnSortedSetKey(o,rl);
-                 else if (o->type == OBJ_STREAM)
-                     serveClientsBlockedOnStreamKey(o,rl);
-                 /* We want to serve clients blocked on module keys
-                  * regardless of the object type: we don't know what the
-                  * module is trying to accomplish right now. */
-                 serveClientsBlockedOnKeyByModule(rl);
-             }
-             server.fixed_time_expire--;
-  
-             /* Free this item. */
-             decrRefCount(rl->key);
-             zfree(rl);
-             listDelNode(l,ln);
-         }
-         listRelease(l); /* We have the new list on place at this point. */
-     }
- }
-  
- /* This is how the current blocking lists/sorted sets/streams work, we use
-  * BLPOP as example, but the concept is the same for other list ops, sorted
-  * sets and XREAD.
-  * - If the user calls BLPOP and the key exists and contains a non empty list
-  *   then LPOP is called instead. So BLPOP is semantically the same as LPOP
-  *   if blocking is not required.
-  * - If instead BLPOP is called and the key does not exists or the list is
-  *   empty we need to block. In order to do so we remove the notification for
-  *   new data to read in the client socket (so that we'll not serve new
-  *   requests if the blocking request is not served). Also we put the client
-  *   in a dictionary (db->blocking_keys) mapping keys to a list of clients
-  *   blocking for this keys.
-  * - If a PUSH operation against a key with blocked clients waiting is
-  *   performed, we mark this key as "ready", and after the current command,
-  *   MULTI/EXEC block, or script, is executed, we serve all the clients waiting
-  *   for this list, from the one that blocked first, to the last, accordingly
-  *   to the number of elements we have in the ready list.
-  */
-  
- /* Set a client in blocking mode for the specified key (list, zset or stream),
-  * with the specified timeout. The 'type' argument is BLOCKED_LIST,
-  * BLOCKED_ZSET or BLOCKED_STREAM depending on the kind of operation we are
-  * waiting for an empty key in order to awake the client. The client is blocked
-  * for all the 'numkeys' keys as in the 'keys' argument. When we block for
-  * stream keys, we also provide an array of streamID structures: clients will
-  * be unblocked only when items with an ID greater or equal to the specified
-  * one is appended to the stream. */
- void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, struct listPos *listpos, streamID *ids) {
-     dictEntry *de;
-     list *l;
-     int j;
-  
-     c->bpop.timeout = timeout;
-     c->bpop.target = target;
-  
-     if (listpos != NULL) c->bpop.listpos = *listpos;
-  
-     if (target != NULL) incrRefCount(target);
-  
-     for (j = 0; j < numkeys; j++) {
-         /* Allocate our bkinfo structure, associated to each key the client
-          * is blocked for. */
-         bkinfo *bki = zmalloc(sizeof(*bki));
-         if (btype == BLOCKED_STREAM)
-             bki->stream_id = ids[j];
-  
-         /* If the key already exists in the dictionary ignore it. */
-         if (dictAdd(c->bpop.keys,keys[j],bki) != DICT_OK) {
-             zfree(bki);
-             continue;
-         }
-         incrRefCount(keys[j]);
-  
-         /* And in the other "side", to map keys -> clients */
-         de = dictFind(c->db->blocking_keys,keys[j]);
-         if (de == NULL) {
-             int retval;
-  
-             /* For every key we take a list of clients blocked for it */
-             l = listCreate();
-             retval = dictAdd(c->db->blocking_keys,keys[j],l);
-             incrRefCount(keys[j]);
-             serverAssertWithInfo(c,keys[j],retval == DICT_OK);
-         } else {
-             l = dictGetVal(de);
-         }
-         listAddNodeTail(l,c);
-         bki->listnode = listLast(l);
-     }
-     blockClient(c,btype);
- }
-  
- /* Unblock a client that's waiting in a blocking operation such as BLPOP.
-  * You should never call this function directly, but unblockClient() instead. */
- void unblockClientWaitingData(client *c) {
-     dictEntry *de;
-     dictIterator *di;
-     list *l;
-  
-     serverAssertWithInfo(c,NULL,dictSize(c->bpop.keys) != 0);
-     di = dictGetIterator(c->bpop.keys);
-     /* The client may wait for multiple keys, so unblock it for every key. */
-     while((de = dictNext(di)) != NULL) {
-         robj *key = dictGetKey(de);
-         bkinfo *bki = dictGetVal(de);
-  
-         /* Remove this client from the list of clients waiting for this key. */
-         l = dictFetchValue(c->db->blocking_keys,key);
-         serverAssertWithInfo(c,key,l != NULL);
-         listDelNode(l,bki->listnode);
-         /* If the list is empty we need to remove it to avoid wasting memory */
-         if (listLength(l) == 0)
-             dictDelete(c->db->blocking_keys,key);
-     }
-     dictReleaseIterator(di);
-  
-     /* Cleanup the client structure */
-     dictEmpty(c->bpop.keys,NULL);
-     if (c->bpop.target) {
-         decrRefCount(c->bpop.target);
-         c->bpop.target = NULL;
-     }
-     if (c->bpop.xread_group) {
-         decrRefCount(c->bpop.xread_group);
-         decrRefCount(c->bpop.xread_consumer);
-         c->bpop.xread_group = NULL;
-         c->bpop.xread_consumer = NULL;
-     }
- }

 
                
![战神引擎传奇手游【黯晶灭世[白猪3.1]】最新整理Win系特色服务端+安卓苹果双端+GM授权后台+详细搭建教程](https://cdn.jxasp.com:9143/image/20251028/0F2E0E55BA6157D5F76B8125D0A511AC.jpg)
















