这里的函数主要是 RIO 针对不同后端(文件、连接、文件描述符)的读写实现:
- static const rio rioFileIO = { /* Template for the file target; copied into a rio by rioInitWithFile(). Fields are set positionally. */
- rioFileRead, /* read callback (defined outside this excerpt) */
- rioFileWrite, /* write callback (defined outside this excerpt) */
- rioFileTell, /* tell callback (defined outside this excerpt) */
- rioFileFlush, /* flush callback (defined outside this excerpt) */
- NULL, /* update_checksum: no callback installed in the template */
- 0, /* current checksum, starts at 0 */
- 0, /* flags, none set */
- 0, /* bytes read or written so far */
- 0, /* read/write chunk size */
- { { NULL, 0 } } /* union for io-specific vars; rioInitWithFile() fills in the file fields */
- };
-
- /* Initialize 'r' as a file-backed rio bound to the stdio stream 'fp'.
-  * The rioFileIO template is copied over '*r' first, then the file-specific
-  * fields are set: 'fp' is stored and both 'buffered' and 'autosync' are
-  * reset to 0. */
- void rioInitWithFile(rio *r, FILE *fp) {
-     *r = rioFileIO;
-     r->io.file.autosync = 0;
-     r->io.file.buffered = 0;
-     r->io.file.fp = fp;
- }
-
- /* ------------------- Connection implementation -------------------
- * We use this RIO implementation when reading an RDB file directly from
- * the connection to the memory via rdbLoadRio(), thus this implementation
- * only implements reading from a connection that is, normally,
- * just a socket. */
-
- /* Write callback of the connection target. Writing is not implemented for
-  * this target: all arguments are ignored and 0 (failure) is returned. */
- static size_t rioConnWrite(rio *r, const void *buf, size_t len) {
-     UNUSED(len);
-     UNUSED(buf);
-     UNUSED(r);
-     /* Error, this target does not yet support writing. */
-     return 0;
- }
-
- /* Read exactly 'len' bytes from the connection into 'buf', going through the
-  * intermediate sds buffer r->io.conn.buf.
-  *
-  * Returns 'len' on success and 0 on failure (so a request with len == 0
-  * is indistinguishable from a failure; callers are expected to ask for at
-  * least one byte). On failure errno is:
-  *   - EOVERFLOW if the request would read past a non-zero read_limit;
-  *   - ETIMEDOUT if connRead() failed with EWOULDBLOCK;
-  *   - otherwise whatever connRead() left in errno. */
- static size_t rioConnRead(rio *r, void *buf, size_t len) {
-     /* Bytes already buffered but not yet handed to the caller. */
-     size_t avail = sdslen(r->io.conn.buf)-r->io.conn.pos;
-
-     /* If the buffer is too small for the entire request: realloc. */
-     if (sdslen(r->io.conn.buf) + sdsavail(r->io.conn.buf) < len)
-         r->io.conn.buf = sdsMakeRoomFor(r->io.conn.buf, len - sdslen(r->io.conn.buf));
-
-     /* If the remaining unused buffer is not large enough: drop the already
-      * consumed prefix (memmove) so that we can read the rest. */
-     if (len > avail && sdsavail(r->io.conn.buf) < len - avail) {
-         sdsrange(r->io.conn.buf, r->io.conn.pos, -1);
-         r->io.conn.pos = 0;
-     }
-
-     /* If we don't already have all the data in the sds, read more. */
-     while (len > sdslen(r->io.conn.buf) - r->io.conn.pos) {
-         size_t buffered = sdslen(r->io.conn.buf) - r->io.conn.pos;
-         size_t needs = len - buffered;
-         /* Read either what's missing, or PROTO_IOBUF_LEN, the bigger of
-          * the two, capped to the free space left in the buffer. */
-         size_t toread = needs < PROTO_IOBUF_LEN ? PROTO_IOBUF_LEN: needs;
-         if (toread > sdsavail(r->io.conn.buf)) toread = sdsavail(r->io.conn.buf);
-         if (r->io.conn.read_limit != 0 &&
-             r->io.conn.read_so_far + buffered + toread > r->io.conn.read_limit)
-         {
-             /* Make sure the caller didn't request to read past the limit.
-              * If they didn't we'll buffer till the limit, if they did, we'll
-              * return an error. */
-             if (r->io.conn.read_limit >= r->io.conn.read_so_far + len)
-                 toread = r->io.conn.read_limit - r->io.conn.read_so_far - buffered;
-             else {
-                 errno = EOVERFLOW;
-                 return 0;
-             }
-         }
-         int retval = connRead(r->io.conn.conn,
-                               (char*)r->io.conn.buf + sdslen(r->io.conn.buf),
-                               toread);
-         if (retval <= 0) {
-             /* Translate errno only for a real error. A return of 0 is
-              * assumed to mean EOF (read(2)-like semantics), in which case
-              * errno was not set by this call and may hold a stale value. */
-             if (retval < 0 && errno == EWOULDBLOCK) errno = ETIMEDOUT;
-             return 0;
-         }
-         sdsIncrLen(r->io.conn.buf, retval);
-     }
-
-     /* The whole request is buffered now: hand it out and advance. */
-     memcpy(buf, (char*)r->io.conn.buf + r->io.conn.pos, len);
-     r->io.conn.read_so_far += len;
-     r->io.conn.pos += len;
-     return len;
- }
-
- /* Tell callback of the connection target: reports, as an off_t, the number
-  * of bytes handed out to callers so far (the read_so_far counter). */
- static off_t rioConnTell(rio *r) {
-     off_t consumed = r->io.conn.read_so_far;
-     return consumed;
- }
-
- /* Flush callback of the connection target. It forwards to the write
-  * callback with a NULL buffer and a zero length and returns its result.
-  * Since rioConnWrite() always returns 0, this currently always reports
-  * failure (0). */
- static int rioConnFlush(rio *r) {
-     size_t written = rioConnWrite(r, NULL, 0);
-     return written;
- }
-
- static const rio rioConnIO = { /* Template for the connection target; copied into a rio by rioInitWithConn(). Fields are set positionally. */
- rioConnRead, /* read callback: buffered read from the connection */
- rioConnWrite, /* write callback: unsupported, always returns 0 */
- rioConnTell, /* tell callback: returns read_so_far */
- rioConnFlush, /* flush callback: forwards to rioConnWrite() */
- NULL, /* update_checksum: no callback installed in the template */
- 0, /* current checksum, starts at 0 */
- 0, /* flags, none set */
- 0, /* bytes read or written so far */
- 0, /* read/write chunk size */
- { { NULL, 0 } } /* union for io-specific vars; rioInitWithConn() fills in the conn fields */
- };
-
- /* Initialize 'r' as a rio that performs buffered reads from 'conn'.
-  *
-  * 'read_limit' caps how many bytes may be pulled from the connection in
-  * total; rioConnRead() treats 0 as "no limit". The read position and the
-  * read_so_far counter both start at 0. */
- void rioInitWithConn(rio *r, connection *conn, size_t read_limit) {
-     *r = rioConnIO;
-     /* Create the buffer with PROTO_IOBUF_LEN bytes, then reset its length
-      * so that it starts out empty. */
-     r->io.conn.buf = sdsnewlen(NULL, PROTO_IOBUF_LEN);
-     sdsclear(r->io.conn.buf);
-     r->io.conn.conn = conn;
-     r->io.conn.read_limit = read_limit;
-     r->io.conn.read_so_far = 0;
-     r->io.conn.pos = 0;
- }
-
- /* Tear down a connection-backed rio.
-  *
-  * When 'remaining' is non-NULL and the buffer still holds bytes that were
-  * not consumed, the buffer is trimmed to just those bytes and handed over
-  * through '*remaining' (the caller then owns it). In every other case the
-  * buffer is freed, and '*remaining' (if given) is set to NULL. The rio's own
-  * buffer pointer is always cleared. */
- void rioFreeConn(rio *r, sds *remaining) {
-     sds leftover = NULL;
-
-     if (remaining != NULL &&
-         (size_t)r->io.conn.pos < sdslen(r->io.conn.buf))
-     {
-         /* Drop the already-consumed prefix, if any. */
-         if (r->io.conn.pos > 0)
-             sdsrange(r->io.conn.buf, r->io.conn.pos, -1);
-         leftover = r->io.conn.buf;
-     } else {
-         sdsfree(r->io.conn.buf);
-     }
-
-     if (remaining != NULL) *remaining = leftover;
-     r->io.conn.buf = NULL;
- }
-
- /* ------------------- File descriptor implementation ------------------
- * This target is used to write the RDB file to pipe, when the master just
- * streams the data to the replicas without creating an RDB on-disk image
- * (diskless replication option).
- * It only implements writes. */
-
- /* Write callback of the file descriptor target.
-  * Returns 1 or 0 for success/failure.
-  *
-  * Writes of up to PROTO_IOBUF_LEN bytes are appended to the user-space
-  * buffer r->io.fd.buf and only sent once that buffer grows past
-  * PROTO_IOBUF_LEN. Larger writes first flush the pending buffer and are then
-  * written straight from 'buf'.
-  *
-  * When buf is NULL and len is 0, the function performs a flush operation
-  * if there is some pending buffer, so this function is also used in order
-  * to implement rioFdFlush().
-  *
-  * On failure errno is ETIMEDOUT if write(2) failed with EWOULDBLOCK,
-  * otherwise whatever write(2) left there. A write interrupted by a signal
-  * (EINTR) is retried rather than reported as a failure. */
- static size_t rioFdWrite(rio *r, const void *buf, size_t len) {
-     ssize_t retval;
-     unsigned char *p = (unsigned char*) buf;
-     int doflush = (buf == NULL && len == 0);
-
-     /* For small writes, we rather keep the data in user-space buffer, and flush
-      * it only when it grows. However for larger writes, we prefer to flush
-      * any pre-existing buffer, and write the new one directly without reallocs
-      * and memory copying. */
-     if (len > PROTO_IOBUF_LEN) {
-         /* First, flush any pre-existing buffered data. */
-         if (sdslen(r->io.fd.buf)) {
-             if (rioFdWrite(r, NULL, 0) == 0)
-                 return 0;
-         }
-         /* Write the new data, keeping 'p' and 'len' from the input. */
-     } else {
-         if (len) {
-             r->io.fd.buf = sdscatlen(r->io.fd.buf,buf,len);
-             if (sdslen(r->io.fd.buf) > PROTO_IOBUF_LEN)
-                 doflush = 1;
-             if (!doflush)
-                 return 1;
-         }
-         /* Flushing the buffered data. Set 'p' and 'len' accordingly. */
-         p = (unsigned char*) r->io.fd.buf;
-         len = sdslen(r->io.fd.buf);
-     }
-
-     /* write(2) may accept fewer bytes than requested: loop until done. */
-     size_t nwritten = 0;
-     while(nwritten != len) {
-         retval = write(r->io.fd.fd,p+nwritten,len-nwritten);
-         if (retval <= 0) {
-             /* Interrupted by a signal before anything was written:
-              * not an error, just try again. */
-             if (retval == -1 && errno == EINTR) continue;
-             /* With blocking io, which is the sole user of this
-              * rio target, EWOULDBLOCK is returned only because of
-              * the SO_SNDTIMEO socket option, so we translate the error
-              * into one more recognizable by the user. */
-             if (retval == -1 && errno == EWOULDBLOCK) errno = ETIMEDOUT;
-             return 0; /* error. */
-         }
-         nwritten += retval;
-     }
-
-     r->io.fd.pos += len;
-     sdsclear(r->io.fd.buf);
-     return 1;
- }