首页 文章详情

Redis(二):命令集构建及关键属性源码解析

JAVA烂猪皮 | 289 2021-01-13 06:09 0 0 0
UniSMS (合一短信)

走过路过不要错过

点击蓝字关注我们


上一篇文章中,我们从框架层面介绍了 redis 的启动过程,以及主要的命令处理流程。这些内容更多是各类服务端程序共通的道理,而要深入了解 redis,则需要关注更多细节。

今天我们从稍微靠内部的角度,来看看命令执行过程中的几个重要方法,深入理解 redis 的魅力所在。

首先,我们通过上一章知道,processCommand 是其业务主要入口,我们再来回顾下:

// server.c
/* If this function gets called we already read a whole
 * command, arguments are in the client argv/argc fields.
 * processCommand() execute the command or prepare the
 * server for a bulk read from the client.
 *
 * If C_OK is returned the client is still alive and valid and
 * other operations can be performed by the caller. Otherwise
 * if C_ERR is returned the client was destroyed (i.e. after QUIT). */
int processCommand(client *c) {
    /* The QUIT command is handled separately. Normal command procs will
     * go through checking for replication and QUIT will cause trouble
     * when FORCE_REPLICATION is enabled and would be implemented in
     * a regular command proc. */
    // QUIT: reply OK, flag the client with CLIENT_CLOSE_AFTER_REPLY so the
    // connection is closed once the reply has been sent, and return C_ERR.
    if (!strcasecmp(c->argv[0]->ptr,"quit")) {
        addReply(c,shared.ok);
        c->flags |= CLIENT_CLOSE_AFTER_REPLY;
        return C_ERR;
    }

    /* Now lookup the command and check ASAP about trivial error conditions
     * such as wrong arity, bad command name and so forth. */
    // Look the command up by its name (argv[0]); lookupCommand() is the
    // function examined in detail later in this article.
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    // Unknown command, or argument count not matching the command's arity:
    // reply with an error and return immediately.
    if (!c->cmd) {
        flagTransaction(c);
        addReplyErrorFormat(c,"unknown command '%s'",
            (char*)c->argv[0]->ptr);
        return C_OK;
    } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
               (c->argc < -c->cmd->arity)) {
        flagTransaction(c);
        addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
            c->cmd->name);
        return C_OK;
    }

    /* Check if the user is authenticated */
    // When a password is configured, only AUTH may run before the client
    // has authenticated.
    if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)
    {
        flagTransaction(c);
        addReply(c,shared.noautherr);
        return C_OK;
    }

    /* If cluster is enabled perform the cluster redirection here.
     * However we don't perform the redirection if:
     * 1) The sender of this command is our master.
     * 2) The command has no key arguments. */
    // Cluster mode: redirect the client when the node owning the keys is not
    // this node. Note how many distinct properties are packed into the
    // flags bit masks.
    if (server.cluster_enabled &&
        !(c->flags & CLIENT_MASTER) &&
        !(c->flags & CLIENT_LUA &&
          server.lua_caller->flags & CLIENT_MASTER) &&
        !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0))
    {
        int hashslot;

        if (server.cluster->state != CLUSTER_OK) {
            flagTransaction(c);
            clusterRedirectClient(c,NULL,0,CLUSTER_REDIR_DOWN_STATE);
            return C_OK;
        } else {
            int error_code;
            clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code);
            if (n == NULL || n != server.cluster->myself) {
                flagTransaction(c);
                clusterRedirectClient(c,n,hashslot,error_code);
                return C_OK;
            }
        }
    }

    /* Handle the maxmemory directive.
     *
     * First we try to free some memory if possible (if there are volatile
     * keys in the dataset). If there are not the only thing we can do
     * is returning an error. */
    // maxmemory check: try to free memory before running the command.
    if (server.maxmemory) {
        int retval = freeMemoryIfNeeded();
        /* freeMemoryIfNeeded may flush slave output buffers. This may result
         * into a slave, that may be the active client, to be freed. */
        if (server.current_client == NULL) return C_ERR;

        /* It was impossible to free enough memory, and the command the client
         * is trying to execute is denied during OOM conditions? Error. */
        if ((c->cmd->flags & CMD_DENYOOM) && retval == C_ERR) {
            flagTransaction(c);
            addReply(c, shared.oomerr);
            return C_OK;
        }
    }

    /* Don't accept write commands if there are problems persisting on disk
     * and if this is a master instance. */
    // Persistence failure on a master: write commands and PING are rejected.
    if (((server.stop_writes_on_bgsave_err &&
          server.saveparamslen > 0 &&
          server.lastbgsave_status == C_ERR) ||
          server.aof_last_write_status == C_ERR) &&
        server.masterhost == NULL &&
        (c->cmd->flags & CMD_WRITE ||
         c->cmd->proc == pingCommand))
    {
        flagTransaction(c);
        if (server.aof_last_write_status == C_OK)
            addReply(c, shared.bgsaveerr);
        else
            addReplySds(c,
                sdscatprintf(sdsempty(),
                "-MISCONF Errors writing to the AOF file: %s\r\n",
                strerror(server.aof_last_write_errno)));
        return C_OK;
    }

    /* Don't accept write commands if there are not enough good slaves and
     * user configured the min-slaves-to-write option. */
    // Fewer good slaves than repl_min_slaves_to_write: write commands are
    // rejected.
    if (server.masterhost == NULL &&
        server.repl_min_slaves_to_write &&
        server.repl_min_slaves_max_lag &&
        c->cmd->flags & CMD_WRITE &&
        server.repl_good_slaves_count < server.repl_min_slaves_to_write)
    {
        flagTransaction(c);
        addReply(c, shared.noreplicaserr);
        return C_OK;
    }

    /* Don't accept write commands if this is a read only slave. But
     * accept write commands if this is our master. */
    // A read-only slave rejects write commands unless they come from its
    // master.
    if (server.masterhost && server.repl_slave_ro &&
        !(c->flags & CLIENT_MASTER) &&
        c->cmd->flags & CMD_WRITE)
    {
        addReply(c, shared.roslaveerr);
        return C_OK;
    }

    /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
    // A client in Pub/Sub mode may only issue PING and the (P)SUBSCRIBE /
    // (P)UNSUBSCRIBE family.
    if (c->flags & CLIENT_PUBSUB &&
        c->cmd->proc != pingCommand &&
        c->cmd->proc != subscribeCommand &&
        c->cmd->proc != unsubscribeCommand &&
        c->cmd->proc != psubscribeCommand &&
        c->cmd->proc != punsubscribeCommand) {
        addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context");
        return C_OK;
    }

    /* Only allow INFO and SLAVEOF when slave-serve-stale-data is no and
     * we are a slave with a broken link with master. */
    // Slave whose replication link is not connected: when
    // repl_serve_stale_data is 0, only commands flagged CMD_STALE are served.
    if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&
        server.repl_serve_stale_data == 0 &&
        !(c->cmd->flags & CMD_STALE))
    {
        flagTransaction(c);
        addReply(c, shared.masterdownerr);
        return C_OK;
    }

    /* Loading DB? Return an error if the command has not the
     * CMD_LOADING flag. */
    // While the DB is loading, only commands flagged CMD_LOADING are
    // accepted; everything else gets the loading error.
    if (server.loading && !(c->cmd->flags & CMD_LOADING)) {
        addReply(c, shared.loadingerr);
        return C_OK;
    }

    /* Lua script too slow? Only allow a limited number of commands. */
    // Lua script timed out: only AUTH, REPLCONF, a two-argument SHUTDOWN
    // whose argument starts with 'n', and a two-argument SCRIPT whose
    // argument starts with 'k' get through.
    if (server.lua_timedout &&
          c->cmd->proc != authCommand &&
          c->cmd->proc != replconfCommand &&
        !(c->cmd->proc == shutdownCommand &&
          c->argc == 2 &&
          tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&
        !(c->cmd->proc == scriptCommand &&
          c->argc == 2 &&
          tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))
    {
        flagTransaction(c);
        addReply(c, shared.slowscripterr);
        return C_OK;
    }

    /* Exec the command */
    if (c->flags & CLIENT_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {
        // Inside a MULTI transaction: queue the command instead of running it.
        queueMultiCommand(c);
        addReply(c,shared.queued);
    } else {
        // Not queued: execute the command right away.
        call(c,CMD_CALL_FULL);
        c->woff = server.master_repl_offset;
        // If any keys became ready, serve the clients blocked on lists.
        if (listLength(server.ready_keys))
            handleClientsBlockedOnLists();
    }
    return C_OK;
}

零、redis中的几个关键数据结构

1. redisServer

redisServer 是 redis 中最大的全局变量,负责保存客户端、配置信息、db 等各种重要信息。各函数之间的通信,也是隐式地通过 redisServer 来完成的。它的定义在 server.h 中,而实例化则在 server.c 中。

/* Global server state. A single instance named `server` is defined right
 * after the struct; the fields are grouped by subsystem (general, networking,
 * persistence, replication, limits, cluster, scripting, ...). */
struct redisServer {
    /* General */
    pid_t pid;                  /* Main process pid. */
    char *configfile;           /* Absolute config file path, or NULL */
    char *executable;           /* Absolute executable file path. */
    char **exec_argv;           /* Executable argv vector (copy). */
    int hz;                     /* serverCron() calls frequency in hertz */
    redisDb *db;
    dict *commands;             /* Command table */
    dict *orig_commands;        /* Command table before command renaming. */
    aeEventLoop *el;
    unsigned lruclock:LRU_BITS; /* Clock for LRU eviction */
    int shutdown_asap;          /* SHUTDOWN needed ASAP */
    int activerehashing;        /* Incremental rehash in serverCron() */
    char *requirepass;          /* Pass for AUTH command, or NULL */
    char *pidfile;              /* PID file path */
    int arch_bits;              /* 32 or 64 depending on sizeof(long) */
    int cronloops;              /* Number of times the cron function run */
    char runid[CONFIG_RUN_ID_SIZE+1];  /* ID always different at every exec. */
    int sentinel_mode;          /* True if this instance is a Sentinel. */
    /* Networking */
    int port;                   /* TCP listening port */
    int tcp_backlog;            /* TCP listen() backlog */
    char *bindaddr[CONFIG_BINDADDR_MAX]; /* Addresses we should bind to */
    int bindaddr_count;         /* Number of addresses in server.bindaddr[] */
    char *unixsocket;           /* UNIX socket path */
    mode_t unixsocketperm;      /* UNIX socket permission */
    int ipfd[CONFIG_BINDADDR_MAX]; /* TCP socket file descriptors */
    int ipfd_count;             /* Used slots in ipfd[] */
    int sofd;                   /* Unix socket file descriptor */
    int cfd[CONFIG_BINDADDR_MAX];/* Cluster bus listening socket */
    int cfd_count;              /* Used slots in cfd[] */
    list *clients;              /* List of active clients */
    list *clients_to_close;     /* Clients to close asynchronously */
    list *clients_pending_write; /* There is to write or install handler. */
    list *slaves, *monitors;    /* List of slaves and MONITORs */
    client *current_client; /* Current client, only used on crash report */
    int clients_paused;         /* True if clients are currently paused */
    mstime_t clients_pause_end_time; /* Time when we undo clients_paused */
    char neterr[ANET_ERR_LEN];   /* Error buffer for anet.c */
    dict *migrate_cached_sockets;/* MIGRATE cached sockets */
    uint64_t next_client_id;    /* Next client unique ID. Incremental. */
    int protected_mode;         /* Don't accept external connections. */
    /* RDB / AOF loading information */
    int loading;                /* We are loading data from disk if true */
    off_t loading_total_bytes;
    off_t loading_loaded_bytes;
    time_t loading_start_time;
    off_t loading_process_events_interval_bytes;
    /* Fast pointers to often looked up command */
    struct redisCommand *delCommand, *multiCommand, *lpushCommand, *lpopCommand,
                        *rpopCommand, *sremCommand, *execCommand;
    /* Fields used only for stats */
    time_t stat_starttime;          /* Server start time */
    long long stat_numcommands;     /* Number of processed commands */
    long long stat_numconnections;  /* Number of connections received */
    long long stat_expiredkeys;     /* Number of expired keys */
    long long stat_evictedkeys;     /* Number of evicted keys (maxmemory) */
    long long stat_keyspace_hits;   /* Number of successful lookups of keys */
    long long stat_keyspace_misses; /* Number of failed lookups of keys */
    size_t stat_peak_memory;        /* Max used memory record */
    long long stat_fork_time;       /* Time needed to perform latest fork() */
    double stat_fork_rate;          /* Fork rate in GB/sec. */
    long long stat_rejected_conn;   /* Clients rejected because of maxclients */
    long long stat_sync_full;       /* Number of full resyncs with slaves. */
    long long stat_sync_partial_ok; /* Number of accepted PSYNC requests. */
    long long stat_sync_partial_err;/* Number of unaccepted PSYNC requests. */
    list *slowlog;                  /* SLOWLOG list of commands */
    long long slowlog_entry_id;     /* SLOWLOG current entry ID */
    long long slowlog_log_slower_than; /* SLOWLOG time limit (to get logged) */
    unsigned long slowlog_max_len;     /* SLOWLOG max number of items logged */
    size_t resident_set_size;       /* RSS sampled in serverCron(). */
    long long stat_net_input_bytes; /* Bytes read from network. */
    long long stat_net_output_bytes; /* Bytes written to network. */
    /* The following two are used to track instantaneous metrics, like
     * number of operations per second, network traffic. */
    struct {
        long long last_sample_time; /* Timestamp of last sample in ms */
        long long last_sample_count;/* Count in last sample */
        long long samples[STATS_METRIC_SAMPLES];
        int idx;
    } inst_metric[STATS_METRIC_COUNT];
    /* Configuration */
    int verbosity;                  /* Loglevel in redis.conf */
    int maxidletime;                /* Client timeout in seconds */
    int tcpkeepalive;               /* Set SO_KEEPALIVE if non-zero. */
    int active_expire_enabled;      /* Can be disabled for testing purposes. */
    size_t client_max_querybuf_len; /* Limit for client query buffer length */
    int dbnum;                      /* Total number of configured DBs */
    int supervised;                 /* 1 if supervised, 0 otherwise. */
    int supervised_mode;            /* See SUPERVISED_* */
    int daemonize;                  /* True if running as a daemon */
    clientBufferLimitsConfig client_obuf_limits[CLIENT_TYPE_OBUF_COUNT];
    /* AOF persistence */
    int aof_state;                  /* AOF_(ON|OFF|WAIT_REWRITE) */
    int aof_fsync;                  /* Kind of fsync() policy */
    char *aof_filename;             /* Name of the AOF file */
    int aof_no_fsync_on_rewrite;    /* Don't fsync if a rewrite is in prog. */
    int aof_rewrite_perc;           /* Rewrite AOF if % growth is > M and... */
    off_t aof_rewrite_min_size;     /* the AOF file is at least N bytes. */
    off_t aof_rewrite_base_size;    /* AOF size on latest startup or rewrite. */
    off_t aof_current_size;         /* AOF current size. */
    int aof_rewrite_scheduled;      /* Rewrite once BGSAVE terminates. */
    pid_t aof_child_pid;            /* PID if rewriting process */
    list *aof_rewrite_buf_blocks;   /* Hold changes during an AOF rewrite. */
    sds aof_buf;      /* AOF buffer, written before entering the event loop */
    int aof_fd;       /* File descriptor of currently selected AOF file */
    int aof_selected_db; /* Currently selected DB in AOF */
    time_t aof_flush_postponed_start; /* UNIX time of postponed AOF flush */
    time_t aof_last_fsync;            /* UNIX time of last fsync() */
    time_t aof_rewrite_time_last;   /* Time used by last AOF rewrite run. */
    time_t aof_rewrite_time_start;  /* Current AOF rewrite start time. */
    int aof_lastbgrewrite_status;   /* C_OK or C_ERR */
    unsigned long aof_delayed_fsync;  /* delayed AOF fsync() counter */
    int aof_rewrite_incremental_fsync;/* fsync incrementally while rewriting? */
    int aof_last_write_status;      /* C_OK or C_ERR */
    int aof_last_write_errno;       /* Valid if aof_last_write_status is ERR */
    int aof_load_truncated;         /* Don't stop on unexpected AOF EOF. */
    /* AOF pipes used to communicate between parent and child during rewrite. */
    int aof_pipe_write_data_to_child;
    int aof_pipe_read_data_from_parent;
    int aof_pipe_write_ack_to_parent;
    int aof_pipe_read_ack_from_child;
    int aof_pipe_write_ack_to_child;
    int aof_pipe_read_ack_from_parent;
    int aof_stop_sending_diff;     /* If true stop sending accumulated diffs
                                      to child process. */
    sds aof_child_diff;             /* AOF diff accumulator child side. */
    /* RDB persistence */
    long long dirty;                /* Changes to DB from the last save */
    long long dirty_before_bgsave;  /* Used to restore dirty on failed BGSAVE */
    pid_t rdb_child_pid;            /* PID of RDB saving child */
    struct saveparam *saveparams;   /* Save points array for RDB */
    int saveparamslen;              /* Number of saving points */
    char *rdb_filename;             /* Name of RDB file */
    int rdb_compression;            /* Use compression in RDB? */
    int rdb_checksum;               /* Use RDB checksum? */
    time_t lastsave;                /* Unix time of last successful save */
    time_t lastbgsave_try;          /* Unix time of last attempted bgsave */
    time_t rdb_save_time_last;      /* Time used by last RDB save run. */
    time_t rdb_save_time_start;     /* Current RDB save start time. */
    int rdb_child_type;             /* Type of save by active child. */
    int lastbgsave_status;          /* C_OK or C_ERR */
    int stop_writes_on_bgsave_err;  /* Don't allow writes if can't BGSAVE */
    int rdb_pipe_write_result_to_parent; /* RDB pipes used to return the state */
    int rdb_pipe_read_result_from_child; /* of each slave in diskless SYNC. */
    /* Propagation of commands in AOF / replication */
    redisOpArray also_propagate;    /* Additional command to propagate. */
    /* Logging */
    char *logfile;                  /* Path of log file */
    int syslog_enabled;             /* Is syslog enabled? */
    char *syslog_ident;             /* Syslog ident */
    int syslog_facility;            /* Syslog facility */
    /* Replication (master) */
    int slaveseldb;                 /* Last SELECTed DB in replication output */
    long long master_repl_offset;   /* Global replication offset */
    int repl_ping_slave_period;     /* Master pings the slave every N seconds */
    char *repl_backlog;             /* Replication backlog for partial syncs */
    long long repl_backlog_size;    /* Backlog circular buffer size */
    long long repl_backlog_histlen; /* Backlog actual data length */
    long long repl_backlog_idx;     /* Backlog circular buffer current offset */
    long long repl_backlog_off;     /* Replication offset of first byte in the
                                       backlog buffer. */
    time_t repl_backlog_time_limit; /* Time without slaves after the backlog
                                       gets released. */
    time_t repl_no_slaves_since;    /* We have no slaves since that time.
                                       Only valid if server.slaves len is 0. */
    int repl_min_slaves_to_write;   /* Min number of slaves to write. */
    int repl_min_slaves_max_lag;    /* Max lag of <count> slaves to write. */
    int repl_good_slaves_count;     /* Number of slaves with lag <= max_lag. */
    int repl_diskless_sync;         /* Send RDB to slaves sockets directly. */
    int repl_diskless_sync_delay;   /* Delay to start a diskless repl BGSAVE. */
    /* Replication (slave) */
    char *masterauth;               /* AUTH with this password with master */
    char *masterhost;               /* Hostname of master */
    int masterport;                 /* Port of master */
    int repl_timeout;               /* Timeout after N seconds of master idle */
    client *master;     /* Client that is master for this slave */
    client *cached_master; /* Cached master to be reused for PSYNC. */
    int repl_syncio_timeout; /* Timeout for synchronous I/O calls */
    int repl_state;          /* Replication status if the instance is a slave */
    off_t repl_transfer_size; /* Size of RDB to read from master during sync. */
    off_t repl_transfer_read; /* Amount of RDB read from master during sync. */
    off_t repl_transfer_last_fsync_off; /* Offset when we fsync-ed last time. */
    int repl_transfer_s;     /* Slave -> Master SYNC socket */
    int repl_transfer_fd;    /* Slave -> Master SYNC temp file descriptor */
    char *repl_transfer_tmpfile; /* Slave-> master SYNC temp file name */
    time_t repl_transfer_lastio; /* Unix time of the latest read, for timeout */
    int repl_serve_stale_data; /* Serve stale data when link is down? */
    int repl_slave_ro;          /* Slave is read only? */
    time_t repl_down_since; /* Unix time at which link with master went down */
    int repl_disable_tcp_nodelay;   /* Disable TCP_NODELAY after SYNC? */
    int slave_priority;             /* Reported in INFO and used by Sentinel. */
    char repl_master_runid[CONFIG_RUN_ID_SIZE+1];  /* Master run id for PSYNC. */
    long long repl_master_initial_offset;         /* Master PSYNC offset. */
    int repl_slave_lazy_flush;          /* Lazy FLUSHALL before loading DB? */
    /* Replication script cache. */
    dict *repl_scriptcache_dict;        /* SHA1 all slaves are aware of. */
    list *repl_scriptcache_fifo;        /* First in, first out LRU eviction. */
    unsigned int repl_scriptcache_size; /* Max number of elements. */
    /* Synchronous replication. */
    list *clients_waiting_acks;         /* Clients waiting in WAIT command. */
    int get_ack_from_slaves;            /* If true we send REPLCONF GETACK. */
    /* Limits */
    unsigned int maxclients;            /* Max number of simultaneous clients */
    unsigned long long maxmemory;   /* Max number of memory bytes to use */
    int maxmemory_policy;           /* Policy for key eviction */
    int maxmemory_samples;          /* Precision of random sampling */
    /* Blocked clients */
    unsigned int bpop_blocked_clients; /* Number of clients blocked by lists */
    list *unblocked_clients; /* list of clients to unblock before next loop */
    list *ready_keys;        /* List of readyList structures for BLPOP & co */
    /* Sort parameters - qsort_r() is only available under BSD so we
     * have to take this state global, in order to pass it to sortCompare() */
    int sort_desc;
    int sort_alpha;
    int sort_bypattern;
    int sort_store;
    /* Zip structure config, see redis.conf for more information  */
    size_t hash_max_ziplist_entries;
    size_t hash_max_ziplist_value;
    size_t set_max_intset_entries;
    size_t zset_max_ziplist_entries;
    size_t zset_max_ziplist_value;
    size_t hll_sparse_max_bytes;
    /* List parameters */
    int list_max_ziplist_size;
    int list_compress_depth;
    /* time cache */
    time_t unixtime;    /* Unix time sampled every cron cycle. */
    long long mstime;   /* Like 'unixtime' but with milliseconds resolution. */
    /* Pubsub */
    dict *pubsub_channels;  /* Map channels to list of subscribed clients */
    list *pubsub_patterns;  /* A list of pubsub_patterns */
    int notify_keyspace_events; /* Events to propagate via Pub/Sub. This is an
                                   xor of NOTIFY_... flags. */
    /* Cluster */
    int cluster_enabled;      /* Is cluster enabled? */
    mstime_t cluster_node_timeout; /* Cluster node timeout. */
    char *cluster_configfile; /* Cluster auto-generated config file name. */
    struct clusterState *cluster;  /* State of the cluster */
    int cluster_migration_barrier; /* Cluster replicas migration barrier. */
    int cluster_slave_validity_factor; /* Slave max data age for failover. */
    int cluster_require_full_coverage; /* If true, put the cluster down if
                                          there is at least an uncovered slot.*/
    char *cluster_announce_ip;  /* IP address to announce on cluster bus. */
    int cluster_announce_port;     /* base port to announce on cluster bus. */
    int cluster_announce_bus_port; /* bus port to announce on cluster bus. */
    /* Scripting */
    lua_State *lua; /* The Lua interpreter. We use just one for all clients */
    client *lua_client;   /* The "fake client" to query Redis from Lua */
    client *lua_caller;   /* The client running EVAL right now, or NULL */
    dict *lua_scripts;         /* A dictionary of SHA1 -> Lua scripts */
    mstime_t lua_time_limit;  /* Script timeout in milliseconds */
    mstime_t lua_time_start;  /* Start time of script, milliseconds time */
    int lua_write_dirty;  /* True if a write command was called during the
                             execution of the current script. */
    int lua_random_dirty; /* True if a random command was called during the
                             execution of the current script. */
    int lua_replicate_commands; /* True if we are doing single commands repl. */
    int lua_multi_emitted;/* True if we already propagated MULTI. */
    int lua_repl;         /* Script replication flags for redis.set_repl(). */
    int lua_timedout;     /* True if we reached the time limit for script
                             execution. */
    int lua_kill;         /* Kill the script if true. */
    int lua_always_replicate_commands; /* Default replication type. */
    /* Lazy free */
    int lazyfree_lazy_eviction;
    int lazyfree_lazy_expire;
    int lazyfree_lazy_server_del;
    /* Latency monitor */
    long long latency_monitor_threshold;
    dict *latency_events;
    /* Assert & bug reporting */
    char *assert_failed;
    char *assert_file;
    int assert_line;
    int bug_report_start; /* True if bug report header was already logged. */
    int watchdog_period;  /* Software watchdog period in ms. 0 = off */
    /* System hardware info */
    size_t system_memory_size;  /* Total memory in system as reported by OS */
};

// server.c: the single global instance of the server state
struct redisServer server; /* server global state */


2. redisObject

Java 有“万事万物皆对象”的说法,而 redis 中也可以用“一切皆 redisObject”来描述,它可以说是 redis 中最通用的数据结构。

/* Generic object wrapper: a small header (type, encoding, LRU clock,
 * reference count) in front of an untyped payload pointer. */
typedef struct redisObject {
    // Object type: a 4-bit bit-field (bits, not bytes)
    unsigned type:4;
    // Encoding: a 4-bit bit-field (bits, not bytes)
    unsigned encoding:4;
    // LRU time: a bit-field LRU_BITS bits wide
    unsigned lru:LRU_BITS; /* lru time (relative to server.lruclock) */
    // Reference count; presumably the object can be released once it
    // drops to 0 (the release logic is not shown here)
    int refcount;
    // Payload pointer; may point at data of any kind
    void *ptr;
} robj;

3. redisDb

redisDb 是redis作为数据库的主要存储模型,承载了所有的业务数据存储。单从这点来说,要实现一个数据库貌似很简单,但实际却是很难。

/* One logical database: the keyspace plus the bookkeeping dictionaries
 * attached to it. */
typedef struct redisDb {
    // A database is a key/value dictionary; the value's data type
    // (string, hash, list, set, zset) is only known after the key is found
    dict *dict;                 /* The keyspace for this DB */
    // Expiry information, stored as a dict (not a queue)
    dict *expires;              /* Timeout of keys with a timeout set */
    dict *blocking_keys;        /* Keys with clients waiting for data (BLPOP) */
    dict *ready_keys;           /* Blocked keys that received a PUSH */
    dict *watched_keys;         /* WATCHED keys for MULTI/EXEC CAS */
    struct evictionPoolEntry *eviction_pool;    /* Eviction pool of keys */
    // ID of this database.
    // NOTE(review): the original note mentioned "default 16"; that is
    // presumably the number of configured databases (cf. server.dbnum),
    // not the value of this field -- confirm.
    int id;                     /* Database ID */
    long long avg_ttl;          /* Average TTL, just for stats */
} redisDb;

4. client

每一个客户端连接,就是一个 client 实例,其中包含许多全局引用信息。比如解析完客户端请求之后,会把参数,数据库指针都放到 client 中。

/* Per-connection state: the socket, the selected DB, the request being
 * parsed, the pending reply, and replication / MULTI / blocking / Pub/Sub
 * bookkeeping. */
typedef struct client {
    uint64_t id;            /* Client incremental unique ID. */
    // Socket file descriptor
    int fd;                 /* Client socket. */
    // The DB this client currently has selected
    redisDb *db;            /* Pointer to currently SELECTed DB. */
    int dictid;             /* ID of the currently SELECTed DB. */
    robj *name;             /* As set by CLIENT SETNAME. */
    // Request buffer and the parsed arguments of the current command
    sds querybuf;           /* Buffer we use to accumulate client queries. */
    size_t querybuf_peak;   /* Recent (100ms or more) peak of querybuf size. */
    int argc;               /* Num of arguments of current command. */
    robj **argv;            /* Arguments of current command. */
    // Pointers to the current command and the last command
    struct redisCommand *cmd, *lastcmd;  /* Last command executed. */
    int reqtype;            /* Request protocol type: PROTO_REQ_* */
    int multibulklen;       /* Number of multi bulk arguments left to read. */
    long bulklen;           /* Length of bulk argument in multi bulk request. */
    list *reply;            /* List of reply objects to send to the client. */
    unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */
    size_t sentlen;         /* Amount of bytes already sent in the current
                               buffer or object being sent. */
    time_t ctime;           /* Client creation time. */
    time_t lastinteraction; /* Time of the last interaction, used for timeout */
    time_t obuf_soft_limit_reached_time;
    int flags;              /* Client flags: CLIENT_* macros. */
    // Whether the client has authenticated
    int authenticated;      /* When requirepass is non-NULL. */
    // Replication-related state
    int replstate;          /* Replication state if this is a slave. */
    int repl_put_online_on_ack; /* Install slave write handler on ACK. */
    int repldbfd;           /* Replication DB file descriptor. */
    off_t repldboff;        /* Replication DB file offset. */
    off_t repldbsize;       /* Replication DB file size. */
    sds replpreamble;       /* Replication DB preamble. */
    long long reploff;      /* Replication offset if this is our master. */
    long long repl_ack_off; /* Replication ack offset, if this is a slave. */
    long long repl_ack_time;/* Replication ack time, if this is a slave. */
    long long psync_initial_offset; /* FULLRESYNC reply offset other slaves
                                       copying this slave output buffer
                                       should use. */
    char replrunid[CONFIG_RUN_ID_SIZE+1]; /* Master run id if is a master. */
    int slave_listening_port; /* As configured with: SLAVECONF listening-port */
    int slave_capa;         /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */
    multiState mstate;      /* MULTI/EXEC state */
    int btype;              /* Type of blocking op if CLIENT_BLOCKED. */
    blockingState bpop;     /* blocking state */
    long long woff;         /* Last write global replication offset. */
    list *watched_keys;     /* Keys WATCHED for MULTI/EXEC CAS */
    dict *pubsub_channels;  /* channels a client is interested in (SUBSCRIBE) */
    list *pubsub_patterns;  /* patterns a client is interested in (SUBSCRIBE) */
    sds peerid;             /* Cached peer ID. */

    /* Response buffer */
    int bufpos;
    char buf[PROTO_REPLY_CHUNK_BYTES];
} client;

一、命令查找 dict

当一个请求发到redis服务器后,我们将其数据解析出来,自然先要明白命令是哪个,然后才知道如何处理它。我们看一下,redis是如何查找具体的处理命令的?

/* server.c: looking up a command is nothing more than a dictionary lookup
 * keyed by the command name. server.commands is the dictionary that
 * populateCommandTable() fills. Returns NULL when the name is unknown. */
struct redisCommand *lookupCommand(sds name) {
    return dictFetchValue(server.commands, name);
}

/* dict.c: return the value stored under 'key', or NULL when the key is not
 * present. */
void *dictFetchValue(dict *d, const void *key) {
    dictEntry *he;

    he = dictFind(d,key);
    return he ? dictGetVal(he) : NULL;
}

/* dict.c: find the entry holding 'key' (chained hash table lookup).
 * Returns the entry, or NULL when the key is absent or the dict has no
 * table yet. */
dictEntry *dictFind(dict *d, const void *key)
{
    dictEntry *he;
    unsigned int h, idx, table;

    if (d->ht[0].size == 0) return NULL; /* We don't have a table at all */
    /* While a resize is in progress every lookup also performs one step of
     * incremental rehashing, so the migration cost is spread over many
     * operations. */
    if (dictIsRehashing(d)) _dictRehashStep(d);
    h = dictHashKey(d, key);
    /* Probe ht[0] first; ht[1] is probed as well only while rehashing. */
    for (table = 0; table <= 1; table++) {
        idx = h & d->ht[table].sizemask;
        he = d->ht[table].table[idx];
        /* Walk the collision chain of this bucket. */
        while(he) {
            if (dictCompareKeys(d, key, he->key))
                return he;
            he = he->next;
        }
        /* Not rehashing: the second table is not in use, so a miss in
         * ht[0] is final. */
        if (!dictIsRehashing(d)) return NULL;
    }
    return NULL;
}

/* dict.h: a rehash is in progress exactly when rehashidx != -1
 * (rehashidx == -1 means "not rehashing"). While rehashing, rehashidx is
 * the index of the next ht[0] bucket that dictRehash() will examine. */
#define dictIsRehashing(d) ((d)->rehashidx != -1)

/* This function performs just a step of rehashing, and only if there are
 * no safe iterators bound to our hash table. When we have iterators in the
 * middle of a rehashing we can't mess with the two hash tables otherwise
 * some element can be missed or duplicated.
 *
 * This function is called by common lookup or update operations in the
 * dictionary so that the hash table automatically migrates from H1 to H2
 * while it is actively used. */
static void _dictRehashStep(dict *d) {
    /* A single step (n = 1) keeps the extra latency per operation small. */
    if (d->iterators == 0) dictRehash(d,1);
}

/* Performs N steps of incremental rehashing. Returns 1 if there are still
 * keys to move from the old to the new hash table, otherwise 0 is returned.
 *
 * Note that a rehashing step consists in moving a bucket (that may have more
 * than one key as we use chaining) from the old to the new hash table, however
 * since part of the hash table may be composed of empty spaces, it is not
 * guaranteed that this function will rehash even a single bucket, since it
 * will visit at max N*10 empty buckets in total, otherwise the amount of
 * work it does would be unbound and the function may block for a long time. */
int dictRehash(dict *d, int n) {
    int empty_visits = n*10; /* Max number of empty buckets to visit. */
    if (!dictIsRehashing(d)) return 0;

    /* Migrate up to n non-empty buckets; stop early once ht[0] is empty. */
    while(n-- && d->ht[0].used != 0) {
        dictEntry *de, *nextde;

        /* Note that rehashidx can't overflow as we are sure there are more
         * elements because ht[0].used != 0 */
        assert(d->ht[0].size > (unsigned long)d->rehashidx);
        /* Resume where the previous step stopped and skip empty buckets;
         * give up for this call once the empty-bucket budget is spent. */
        while(d->ht[0].table[d->rehashidx] == NULL) {
            d->rehashidx++;
            if (--empty_visits == 0) return 1;
        }
        de = d->ht[0].table[d->rehashidx];
        /* Move all the keys in this bucket from the old to the new hash HT */
        while(de) {
            unsigned int h;

            nextde = de->next;
            /* Get the index in the new hash table */
            h = dictHashKey(d, de->key) & d->ht[1].sizemask;
            /* Push the entry at the head of the destination chain. */
            de->next = d->ht[1].table[h];
            d->ht[1].table[h] = de;
            d->ht[0].used--;
            d->ht[1].used++;
            de = nextde;
        }
        d->ht[0].table[d->rehashidx] = NULL;
        /* This bucket is done; the next step starts from the following one. */
        d->rehashidx++;
    }

    /* Check if we already rehashed the whole table... */
    if (d->ht[0].used == 0) {
        /* The old bucket array is freed, ht[1] becomes the new ht[0], and
         * ht[1] is reset (this is a replace, not a swap). */
        zfree(d->ht[0].table);
        d->ht[0] = d->ht[1];
        _dictReset(&d->ht[1]);
        d->rehashidx = -1;
        return 0;
    }

    /* More to rehash... */
    return 1;
}


综上,可以看出,一个命令的查找过程,其实就是一次 hash 字典的查找过程。比较特别的优化是:如果查找时字典正处于 rehash 中,只顺带做一步增量 rehash,把停顿时间尽可能减小。字典内部使用两个 hash 表(ht[0] 和 ht[1]):rehash 期间两个表都会被查找,rehash 完成后用新表替换旧表,以此保证数据的完整性。

hash表是个很有用的数据结构,上面是对于命令查找的使用,但是对于后面的 kv 的查找,同样可以使用 dict 这种结构,所以后续对其他命令的解析时,只需注意其特有的处理方式即可。

二、溯源: 命令的添加

要想实现如上的查找,我们有必要了解下其是在何时添加的。下面,我们来看看,这些命令是如何写入到 server.commands 中的。

// server.c, 在进行配置初始化 initServerConfig() 时,添加命令集到 server.commands/* Populates the Redis Command Table starting from the hard coded list * we have on top of redis.c file. */void populateCommandTable(void) {    int j;    int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);
for (j = 0; j < numcommands; j++) { // 通过在文件开头定义的一个数组,将命令集加入 struct redisCommand *c = redisCommandTable+j; char *f = c->sflags; int retval1, retval2; // 转换 flags 到 command 中,用一个位表示一个 flag 标识 while(*f != '\0') { switch(*f) { case 'w': c->flags |= CMD_WRITE; break; case 'r': c->flags |= CMD_READONLY; break; case 'm': c->flags |= CMD_DENYOOM; break; case 'a': c->flags |= CMD_ADMIN; break; case 'p': c->flags |= CMD_PUBSUB; break; case 's': c->flags |= CMD_NOSCRIPT; break; case 'R': c->flags |= CMD_RANDOM; break; case 'S': c->flags |= CMD_SORT_FOR_SCRIPT; break; case 'l': c->flags |= CMD_LOADING; break; case 't': c->flags |= CMD_STALE; break; case 'M': c->flags |= CMD_SKIP_MONITOR; break; case 'k': c->flags |= CMD_ASKING; break; case 'F': c->flags |= CMD_FAST; break; default: serverPanic("Unsupported command flag"); break; } f++; } // 添加 command 到 server.commands 中,不外乎就是 hash, rehash... retval1 = dictAdd(server.commands, sdsnew(c->name), c); /* Populate an additional dictionary that will be unaffected * by rename-command statements in redis.conf. */ retval2 = dictAdd(server.orig_commands, sdsnew(c->name), c); serverAssert(retval1 == DICT_OK && retval2 == DICT_OK); }}// dict.c, 字典数据的添加,其实已经超过前面理解的范围/* Add an element to the target hash table */int dictAdd(dict *d, void *key, void *val){ dictEntry *entry = dictAddRaw(d,key);
if (!entry) return DICT_ERR; // 设置value到 entry 上 dictSetVal(d, entry, val); return DICT_OK;}// 不过既然都到这里了,我们索性把dict的添加过程也给了解了吧,省得后面再花时间// dict.c, 将 key 添加到 dict 中,并返回 dictEntry 添加的实例/* Low level add. This function adds the entry but instead of setting * a value returns the dictEntry structure to the user, that will make * sure to fill the value field as he wishes. * * This function is also directly exposed to the user API to be called * mainly in order to store non-pointers inside the hash value, example: * * entry = dictAddRaw(dict,mykey); * if (entry != NULL) dictSetSignedIntegerVal(entry,1000); * * Return values: * * If key already exists NULL is returned. * If key was added, the hash entry is returned to be manipulated by the caller. */dictEntry *dictAddRaw(dict *d, void *key){ int index; dictEntry *entry; dictht *ht; // 和 dictFetchValue 一样,先检查 rehash 情况 if (dictIsRehashing(d)) _dictRehashStep(d);
/* Get the index of the new element, or -1 if * the element already exists. */ // 如果元素已经存在,则返回 -1,否则走后续添加流程 // 其含义是 元素只允许新增,不允许修改 if ((index = _dictKeyIndex(d, key)) == -1) return NULL;
/* Allocate the memory and store the new entry. * Insert the element in top, with the assumption that in a database * system it is more likely that recently added entries are accessed * more frequently. */ ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0]; // 以链表形式保存数据 entry = zmalloc(sizeof(*entry)); entry->next = ht->table[index]; ht->table[index] = entry; ht->used++;
/* Set the hash entry fields. */ // 将新组织的 entry 设置值到key上, 小技巧 do {} while(0); 的应用 dictSetKey(d, entry, key); return entry;}// dict.c, 获取元素所在的数组下标/* Returns the index of a free slot that can be populated with * a hash entry for the given 'key'. * If the key already exists, -1 is returned. * * Note that if we are in the process of rehashing the hash table, the * index is always returned in the context of the second (new) hash table. */static int _dictKeyIndex(dict *d, const void *key){ unsigned int h, idx, table; dictEntry *he;
/* Expand the hash table if needed */ // 做扩容操作 if (_dictExpandIfNeeded(d) == DICT_ERR) return -1; /* Compute the key hash value */ h = dictHashKey(d, key); for (table = 0; table <= 1; table++) { idx = h & d->ht[table].sizemask; /* Search if this slot does not already contain the given key */ he = d->ht[table].table[idx]; while(he) { if (dictCompareKeys(d, key, he->key)) return -1; he = he->next; } if (!dictIsRehashing(d)) break; } return idx;}
/* dict.c: growth policy and table (re)allocation. */

/* Expand the hash table if needed */
static int _dictExpandIfNeeded(dict *d)
{
    /* Incremental rehashing already in progress. Return. */
    if (dictIsRehashing(d)) return DICT_OK;

    /* If the hash table is empty expand it to the initial size.
     * NOTE(review): the article states DICT_HT_INITIAL_SIZE is 4; the
     * definition is not visible here, confirm in dict.h. */
    if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);

    /* If we reached the 1:1 ratio, and we are allowed to resize the hash
     * table (global setting) or we should avoid it but the ratio between
     * elements/buckets is over the "safe" threshold, we resize doubling
     * the number of buckets. */
    if (d->ht[0].used >= d->ht[0].size &&
        (dict_can_resize ||
         d->ht[0].used/d->ht[0].size > dict_force_resize_ratio))
    {
        /* The requested size is twice the number of stored entries. */
        return dictExpand(d, d->ht[0].used*2);
    }
    return DICT_OK;
}

/* Expand or create the hash table.
 * Allocates an empty table for 'size' entries. On the very first call it
 * simply becomes ht[0]; otherwise it is installed as ht[1] and rehashidx is
 * set to 0 so that later operations migrate entries step by step.
 * Returns DICT_OK on success, DICT_ERR when a rehash is already running,
 * when 'size' is smaller than the number of stored entries, or when the
 * resulting table size would not change. */
int dictExpand(dict *d, unsigned long size)
{
    dictht n; /* the new hash table */
    /* NOTE(review): _dictNextPower() is not shown; presumably it rounds
     * 'size' up to a power of two (2^n), which the 'realsize-1' mask
     * below relies on. */
    unsigned long realsize = _dictNextPower(size);

    /* the size is invalid if it is smaller than the number of
     * elements already inside the hash table */
    if (dictIsRehashing(d) || d->ht[0].used > size)
        return DICT_ERR;

    /* Rehashing to the same table size is not useful. */
    if (realsize == d->ht[0].size) return DICT_ERR;

    /* Allocate the new hash table and initialize all pointers to NULL */
    n.size = realsize;
    n.sizemask = realsize-1;
    n.table = zcalloc(realsize*sizeof(dictEntry*));
    n.used = 0;

    /* Is this the first initialization? If so it's not really a rehashing
     * we just set the first hash table so that it can accept keys. */
    if (d->ht[0].table == NULL) {
        d->ht[0] = n;
        return DICT_OK;
    }

    /* Prepare a second hash table for incremental rehashing: rehashidx = 0
     * makes dictIsRehashing() true, so subsequent operations start moving
     * buckets from ht[0] to ht[1]. */
    d->ht[1] = n;
    d->rehashidx = 0;
    return DICT_OK;
}

/* dict.h: store the key / value into an entry, duplicating it through the
 * dict type's keyDup / valDup callback when one is set. The do { } while(0)
 * wrapper lets the multi-statement macro be used like a single statement. */
#define dictSetKey(d, entry, _key_) do { \
    if ((d)->type->keyDup) \
        entry->key = (d)->type->keyDup((d)->privdata, _key_); \
    else \
        entry->key = (_key_); \
} while(0)

#define dictSetVal(d, entry, _val_) do { \
    if ((d)->type->valDup) \
        entry->v.val = (d)->type->valDup((d)->privdata, _val_); \
    else \
        entry->v.val = (_val_); \
} while(0)

以上,自然就是一个 hash 表插入数据过程,然后 server.commands 就有数据了,请求进来自然就可以处理了。

三、命令集的定义

在server.c的头部,就有一个数组,专门用于定义各个命令的处理方法,并最终被初始化到 server.commands 中。

/* Our command table.
 *
 * Every entry is composed of the following fields:
 *
 * name: a string representing the command name.
 * function: pointer to the C function implementing the command.
 * arity: number of arguments, it is possible to use -N to say >= N
 * sflags: command flags as string. See below for a table of flags.
 * flags: flags as bitmask. Computed by Redis using the 'sflags' field.
 * get_keys_proc: an optional function to get key arguments from a command.
 *                This is only used when the following three fields are not
 *                enough to specify what arguments are keys.
 * first_key_index: first argument that is a key
 * last_key_index: last argument that is a key
 * key_step: step to get all the keys from first to last argument. For instance
 *           in MSET the step is two since arguments are key,val,key,val,...
 * microseconds: microseconds of total execution time for this command.
 * calls: total number of calls of this command.
 *
 * The flags, microseconds and calls fields are computed by Redis and should
 * always be set to zero.
 *
 * Each initializer below therefore reads, in order:
 *   {name, function, arity, sflags, flags(0), get_keys_proc,
 *    first_key_index, last_key_index, key_step, microseconds(0), calls(0)}
 *
 * Command flags are expressed using strings where every character represents
 * a flag. Later the populateCommandTable() function will take care of
 * populating the real 'flags' field using this characters.
 *
 * This is the meaning of the flags:
 *
 * w: write command (may modify the key space).
 * r: read command  (will never modify the key space).
 * m: may increase memory usage once called. Don't allow if out of memory.
 * a: admin command, like SAVE or SHUTDOWN.
 * p: Pub/Sub related command.
 * f: force replication of this command, regardless of server.dirty.
 *    NOTE(review): the populateCommandTable() listing has no case for 'f'
 *    and no entry below uses it, so this flag looks obsolete; confirm
 *    against the Redis version in use.
 * s: command not allowed in scripts.
 * R: random command. Command is not deterministic, that is, the same command
 *    with the same arguments, with the same key space, may have different
 *    results. For instance SPOP and RANDOMKEY are two random commands.
 * S: Sort command output array if called from script, so that the output
 *    is deterministic.
 * l: Allow command while loading the database.
 * t: Allow command while a slave has stale data but is not allowed to
 *    serve this data. Normally no command is accepted in this condition
 *    but just a few.
 * M: Do not automatically propagate the command on MONITOR.
 * k: Perform an implicit ASKING for this command, so the command will be
 *    accepted in cluster mode if the slot is marked as 'importing'.
 * F: Fast command: O(1) or O(log(N)) command that should never delay
 *    its execution as long as the kernel scheduler is giving us time.
 *    Note that commands that may trigger a DEL as a side effect (like SET)
 *    are not fast commands.
 */
struct redisCommand redisCommandTable[] = {
    {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
    {"setnx",setnxCommand,3,"wmF",0,NULL,1,1,1,0,0},
    {"setex",setexCommand,4,"wm",0,NULL,1,1,1,0,0},
    {"psetex",psetexCommand,4,"wm",0,NULL,1,1,1,0,0},
    {"append",appendCommand,3,"wm",0,NULL,1,1,1,0,0},
    {"strlen",strlenCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"del",delCommand,-2,"w",0,NULL,1,-1,1,0,0},
    {"unlink",unlinkCommand,-2,"wF",0,NULL,1,-1,1,0,0},
    {"exists",existsCommand,-2,"rF",0,NULL,1,-1,1,0,0},
    {"setbit",setbitCommand,4,"wm",0,NULL,1,1,1,0,0},
    {"getbit",getbitCommand,3,"rF",0,NULL,1,1,1,0,0},
    {"setrange",setrangeCommand,4,"wm",0,NULL,1,1,1,0,0},
    {"getrange",getrangeCommand,4,"r",0,NULL,1,1,1,0,0},
    {"substr",getrangeCommand,4,"r",0,NULL,1,1,1,0,0},
    {"incr",incrCommand,2,"wmF",0,NULL,1,1,1,0,0},
    {"decr",decrCommand,2,"wmF",0,NULL,1,1,1,0,0},
    {"mget",mgetCommand,-2,"r",0,NULL,1,-1,1,0,0},
    {"rpush",rpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
    {"lpush",lpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
    {"rpushx",rpushxCommand,3,"wmF",0,NULL,1,1,1,0,0},
    {"lpushx",lpushxCommand,3,"wmF",0,NULL,1,1,1,0,0},
    {"linsert",linsertCommand,5,"wm",0,NULL,1,1,1,0,0},
    {"rpop",rpopCommand,2,"wF",0,NULL,1,1,1,0,0},
    {"lpop",lpopCommand,2,"wF",0,NULL,1,1,1,0,0},
    {"brpop",brpopCommand,-3,"ws",0,NULL,1,1,1,0,0},
    {"brpoplpush",brpoplpushCommand,4,"wms",0,NULL,1,2,1,0,0},
    {"blpop",blpopCommand,-3,"ws",0,NULL,1,-2,1,0,0},
    {"llen",llenCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"lindex",lindexCommand,3,"r",0,NULL,1,1,1,0,0},
    {"lset",lsetCommand,4,"wm",0,NULL,1,1,1,0,0},
    {"lrange",lrangeCommand,4,"r",0,NULL,1,1,1,0,0},
    {"ltrim",ltrimCommand,4,"w",0,NULL,1,1,1,0,0},
    {"lrem",lremCommand,4,"w",0,NULL,1,1,1,0,0},
    {"rpoplpush",rpoplpushCommand,3,"wm",0,NULL,1,2,1,0,0},
    {"sadd",saddCommand,-3,"wmF",0,NULL,1,1,1,0,0},
    {"srem",sremCommand,-3,"wF",0,NULL,1,1,1,0,0},
    {"smove",smoveCommand,4,"wF",0,NULL,1,2,1,0,0},
    {"sismember",sismemberCommand,3,"rF",0,NULL,1,1,1,0,0},
    {"scard",scardCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"spop",spopCommand,-2,"wRsF",0,NULL,1,1,1,0,0},
    {"srandmember",srandmemberCommand,-2,"rR",0,NULL,1,1,1,0,0},
    {"sinter",sinterCommand,-2,"rS",0,NULL,1,-1,1,0,0},
    {"sinterstore",sinterstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
    {"sunion",sunionCommand,-2,"rS",0,NULL,1,-1,1,0,0},
    {"sunionstore",sunionstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
    {"sdiff",sdiffCommand,-2,"rS",0,NULL,1,-1,1,0,0},
    {"sdiffstore",sdiffstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
    {"smembers",sinterCommand,2,"rS",0,NULL,1,1,1,0,0},
    {"sscan",sscanCommand,-3,"rR",0,NULL,1,1,1,0,0},
    {"zadd",zaddCommand,-4,"wmF",0,NULL,1,1,1,0,0},
    {"zincrby",zincrbyCommand,4,"wmF",0,NULL,1,1,1,0,0},
    {"zrem",zremCommand,-3,"wF",0,NULL,1,1,1,0,0},
    {"zremrangebyscore",zremrangebyscoreCommand,4,"w",0,NULL,1,1,1,0,0},
    {"zremrangebyrank",zremrangebyrankCommand,4,"w",0,NULL,1,1,1,0,0},
    {"zremrangebylex",zremrangebylexCommand,4,"w",0,NULL,1,1,1,0,0},
    {"zunionstore",zunionstoreCommand,-4,"wm",0,zunionInterGetKeys,0,0,0,0,0},
    {"zinterstore",zinterstoreCommand,-4,"wm",0,zunionInterGetKeys,0,0,0,0,0},
    {"zrange",zrangeCommand,-4,"r",0,NULL,1,1,1,0,0},
    {"zrangebyscore",zrangebyscoreCommand,-4,"r",0,NULL,1,1,1,0,0},
    {"zrevrangebyscore",zrevrangebyscoreCommand,-4,"r",0,NULL,1,1,1,0,0},
    {"zrangebylex",zrangebylexCommand,-4,"r",0,NULL,1,1,1,0,0},
    {"zrevrangebylex",zrevrangebylexCommand,-4,"r",0,NULL,1,1,1,0,0},
    {"zcount",zcountCommand,4,"rF",0,NULL,1,1,1,0,0},
    {"zlexcount",zlexcountCommand,4,"rF",0,NULL,1,1,1,0,0},
    {"zrevrange",zrevrangeCommand,-4,"r",0,NULL,1,1,1,0,0},
    {"zcard",zcardCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"zscore",zscoreCommand,3,"rF",0,NULL,1,1,1,0,0},
    {"zrank",zrankCommand,3,"rF",0,NULL,1,1,1,0,0},
    {"zrevrank",zrevrankCommand,3,"rF",0,NULL,1,1,1,0,0},
    {"zscan",zscanCommand,-3,"rR",0,NULL,1,1,1,0,0},
    {"hset",hsetCommand,4,"wmF",0,NULL,1,1,1,0,0},
    {"hsetnx",hsetnxCommand,4,"wmF",0,NULL,1,1,1,0,0},
    {"hget",hgetCommand,3,"rF",0,NULL,1,1,1,0,0},
    {"hmset",hmsetCommand,-4,"wm",0,NULL,1,1,1,0,0},
    {"hmget",hmgetCommand,-3,"r",0,NULL,1,1,1,0,0},
    {"hincrby",hincrbyCommand,4,"wmF",0,NULL,1,1,1,0,0},
    {"hincrbyfloat",hincrbyfloatCommand,4,"wmF",0,NULL,1,1,1,0,0},
    {"hdel",hdelCommand,-3,"wF",0,NULL,1,1,1,0,0},
    {"hlen",hlenCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"hstrlen",hstrlenCommand,3,"rF",0,NULL,1,1,1,0,0},
    {"hkeys",hkeysCommand,2,"rS",0,NULL,1,1,1,0,0},
    {"hvals",hvalsCommand,2,"rS",0,NULL,1,1,1,0,0},
    {"hgetall",hgetallCommand,2,"r",0,NULL,1,1,1,0,0},
    {"hexists",hexistsCommand,3,"rF",0,NULL,1,1,1,0,0},
    {"hscan",hscanCommand,-3,"rR",0,NULL,1,1,1,0,0},
    {"incrby",incrbyCommand,3,"wmF",0,NULL,1,1,1,0,0},
    {"decrby",decrbyCommand,3,"wmF",0,NULL,1,1,1,0,0},
    {"incrbyfloat",incrbyfloatCommand,3,"wmF",0,NULL,1,1,1,0,0},
    {"getset",getsetCommand,3,"wm",0,NULL,1,1,1,0,0},
    {"mset",msetCommand,-3,"wm",0,NULL,1,-1,2,0,0},
    {"msetnx",msetnxCommand,-3,"wm",0,NULL,1,-1,2,0,0},
    {"randomkey",randomkeyCommand,1,"rR",0,NULL,0,0,0,0,0},
    {"select",selectCommand,2,"rlF",0,NULL,0,0,0,0,0},
    {"move",moveCommand,3,"wF",0,NULL,1,1,1,0,0},
    {"rename",renameCommand,3,"w",0,NULL,1,2,1,0,0},
    {"renamenx",renamenxCommand,3,"wF",0,NULL,1,2,1,0,0},
    {"expire",expireCommand,3,"wF",0,NULL,1,1,1,0,0},
    {"expireat",expireatCommand,3,"wF",0,NULL,1,1,1,0,0},
    {"pexpire",pexpireCommand,3,"wF",0,NULL,1,1,1,0,0},
    {"pexpireat",pexpireatCommand,3,"wF",0,NULL,1,1,1,0,0},
    {"keys",keysCommand,2,"rS",0,NULL,0,0,0,0,0},
    {"scan",scanCommand,-2,"rR",0,NULL,0,0,0,0,0},
    {"dbsize",dbsizeCommand,1,"rF",0,NULL,0,0,0,0,0},
    {"auth",authCommand,2,"rsltF",0,NULL,0,0,0,0,0},
    {"ping",pingCommand,-1,"rtF",0,NULL,0,0,0,0,0},
    {"echo",echoCommand,2,"rF",0,NULL,0,0,0,0,0},
    {"save",saveCommand,1,"ars",0,NULL,0,0,0,0,0},
    {"bgsave",bgsaveCommand,1,"ar",0,NULL,0,0,0,0,0},
    {"bgrewriteaof",bgrewriteaofCommand,1,"ar",0,NULL,0,0,0,0,0},
    {"shutdown",shutdownCommand,-1,"arlt",0,NULL,0,0,0,0,0},
    {"lastsave",lastsaveCommand,1,"rRF",0,NULL,0,0,0,0,0},
    {"type",typeCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"multi",multiCommand,1,"rsF",0,NULL,0,0,0,0,0},
    {"exec",execCommand,1,"sM",0,NULL,0,0,0,0,0},
    {"discard",discardCommand,1,"rsF",0,NULL,0,0,0,0,0},
    {"sync",syncCommand,1,"ars",0,NULL,0,0,0,0,0},
    {"psync",syncCommand,3,"ars",0,NULL,0,0,0,0,0},
    {"replconf",replconfCommand,-1,"arslt",0,NULL,0,0,0,0,0},
    {"flushdb",flushdbCommand,-1,"w",0,NULL,0,0,0,0,0},
    {"flushall",flushallCommand,-1,"w",0,NULL,0,0,0,0,0},
    {"sort",sortCommand,-2,"wm",0,sortGetKeys,1,1,1,0,0},
    {"info",infoCommand,-1,"rlt",0,NULL,0,0,0,0,0},
    {"monitor",monitorCommand,1,"ars",0,NULL,0,0,0,0,0},
    {"ttl",ttlCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"pttl",pttlCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"persist",persistCommand,2,"wF",0,NULL,1,1,1,0,0},
    {"slaveof",slaveofCommand,3,"ast",0,NULL,0,0,0,0,0},
    {"role",roleCommand,1,"lst",0,NULL,0,0,0,0,0},
    {"debug",debugCommand,-2,"as",0,NULL,0,0,0,0,0},
    {"config",configCommand,-2,"art",0,NULL,0,0,0,0,0},
    {"subscribe",subscribeCommand,-2,"rpslt",0,NULL,0,0,0,0,0},
    {"unsubscribe",unsubscribeCommand,-1,"rpslt",0,NULL,0,0,0,0,0},
    {"psubscribe",psubscribeCommand,-2,"rpslt",0,NULL,0,0,0,0,0},
    {"punsubscribe",punsubscribeCommand,-1,"rpslt",0,NULL,0,0,0,0,0},
    {"publish",publishCommand,3,"pltrF",0,NULL,0,0,0,0,0},
    {"pubsub",pubsubCommand,-2,"pltrR",0,NULL,0,0,0,0,0},
    {"watch",watchCommand,-2,"rsF",0,NULL,1,-1,1,0,0},
    {"unwatch",unwatchCommand,1,"rsF",0,NULL,0,0,0,0,0},
    {"cluster",clusterCommand,-2,"ar",0,NULL,0,0,0,0,0},
    {"restore",restoreCommand,-4,"wm",0,NULL,1,1,1,0,0},
    {"restore-asking",restoreCommand,-4,"wmk",0,NULL,1,1,1,0,0},
    {"migrate",migrateCommand,-6,"w",0,migrateGetKeys,0,0,0,0,0},
    {"asking",askingCommand,1,"r",0,NULL,0,0,0,0,0},
    {"readonly",readonlyCommand,1,"rF",0,NULL,0,0,0,0,0},
    {"readwrite",readwriteCommand,1,"rF",0,NULL,0,0,0,0,0},
    {"dump",dumpCommand,2,"r",0,NULL,1,1,1,0,0},
    {"object",objectCommand,3,"r",0,NULL,2,2,2,0,0},
    {"client",clientCommand,-2,"rs",0,NULL,0,0,0,0,0},
    {"eval",evalCommand,-3,"s",0,evalGetKeys,0,0,0,0,0},
    {"evalsha",evalShaCommand,-3,"s",0,evalGetKeys,0,0,0,0,0},
    {"slowlog",slowlogCommand,-2,"r",0,NULL,0,0,0,0,0},
    {"script",scriptCommand,-2,"rs",0,NULL,0,0,0,0,0},
    {"time",timeCommand,1,"rRF",0,NULL,0,0,0,0,0},
    {"bitop",bitopCommand,-4,"wm",0,NULL,2,-1,1,0,0},
    {"bitcount",bitcountCommand,-2,"r",0,NULL,1,1,1,0,0},
    {"bitpos",bitposCommand,-3,"r",0,NULL,1,1,1,0,0},
    {"wait",waitCommand,3,"rs",0,NULL,0,0,0,0,0},
    {"command",commandCommand,0,"rlt",0,NULL,0,0,0,0,0},
    {"geoadd",geoaddCommand,-5,"wm",0,NULL,1,1,1,0,0},
    {"georadius",georadiusCommand,-6,"r",0,NULL,1,1,1,0,0},
    {"georadiusbymember",georadiusByMemberCommand,-5,"r",0,NULL,1,1,1,0,0},
    {"geohash",geohashCommand,-2,"r",0,NULL,1,1,1,0,0},
    {"geopos",geoposCommand,-2,"r",0,NULL,1,1,1,0,0},
    {"geodist",geodistCommand,-4,"r",0,NULL,1,1,1,0,0},
    {"pfselftest",pfselftestCommand,1,"r",0,NULL,0,0,0,0,0},
    {"pfadd",pfaddCommand,-2,"wmF",0,NULL,1,1,1,0,0},
    {"pfcount",pfcountCommand,-2,"r",0,NULL,1,-1,1,0,0},
    {"pfmerge",pfmergeCommand,-2,"wm",0,NULL,1,-1,1,0,0},
    {"pfdebug",pfdebugCommand,-3,"w",0,NULL,0,0,0,0,0},
    {"latency",latencyCommand,-2,"arslt",0,NULL,0,0,0,0,0}
};

可以说,核心功能都是在这里定义的哟(小手册),如果想自己添加功能,也是从这里开始,然后去实现它。其中,lua的调用则直接使用 eval进行即可。

四、执行命令的模板方法

在调用真正的命令处理函数之前和之后,还会做各种判断和附加处理(如 MONITOR 转发、慢查询日志、统计、命令传播等)。这也是数据库与简单函数调用的差别之一。

/* server.c: call() is the template every command execution goes through.
 *
 * Call() is the core of Redis execution of a command.
 *
 * The following flags can be passed:
 * CMD_CALL_NONE        No flags.
 * CMD_CALL_SLOWLOG     Check command speed and log in the slow log if needed.
 * CMD_CALL_STATS       Populate command stats.
 * CMD_CALL_PROPAGATE_AOF   Append command to AOF if it modified the dataset
 *                          or if the client flags are forcing propagation.
 * CMD_CALL_PROPAGATE_REPL  Send command to slaves if it modified the dataset
 *                          or if the client flags are forcing propagation.
 * CMD_CALL_PROPAGATE   Alias for PROPAGATE_AOF|PROPAGATE_REPL.
 * CMD_CALL_FULL        Alias for SLOWLOG|STATS|PROPAGATE.
 *
 * The exact propagation behavior depends on the client flags.
 * Specifically:
 *
 * 1. If the client flags CLIENT_FORCE_AOF or CLIENT_FORCE_REPL are set
 *    and assuming the corresponding CMD_CALL_PROPAGATE_AOF/REPL is set
 *    in the call flags, then the command is propagated even if the
 *    dataset was not affected by the command.
 * 2. If the client flags CLIENT_PREVENT_REPL_PROP or CLIENT_PREVENT_AOF_PROP
 *    are set, the propagation into AOF or to slaves is not performed even
 *    if the command modified the dataset.
 *
 * Note that regardless of the client flags, if CMD_CALL_PROPAGATE_AOF
 * or CMD_CALL_PROPAGATE_REPL are not set, then respectively AOF or
 * slaves propagation will never occur.
 *
 * Client flags are modified by the implementation of a given command
 * using the following API:
 *
 * forceCommandPropagation(client *c, int flags);
 * preventCommandPropagation(client *c);
 * preventCommandAOF(client *c);
 * preventCommandReplication(client *c);
 *
 */
void call(client *c, int flags) {
    long long dirty, start, duration;
    int client_old_flags = c->flags;

    /* Sent the command to clients in MONITOR mode, only if the commands are
     * not generated from reading an AOF. Commands flagged CMD_SKIP_MONITOR
     * or CMD_ADMIN are not forwarded. */
    if (listLength(server.monitors) &&
        !server.loading &&
        !(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN)))
    {
        replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
    }

    /* Initialization: clear the flags that must be set by the command on
     * demand, and initialize the array for additional commands propagation. */
    c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
    redisOpArrayInit(&server.also_propagate);

    /* Call the command. call() itself only measures the elapsed time and
     * how much server.dirty changed around the invocation. */
    dirty = server.dirty;
    start = ustime();
    c->cmd->proc(c);
    duration = ustime()-start;
    dirty = server.dirty-dirty;
    if (dirty < 0) dirty = 0;

    /* When EVAL is called loading the AOF we don't want commands called
     * from Lua to go into the slowlog or to populate statistics. */
    if (server.loading && c->flags & CLIENT_LUA)
        flags &= ~(CMD_CALL_SLOWLOG | CMD_CALL_STATS);

    /* If the caller is Lua, we want to force the EVAL caller to propagate
     * the script if the command flag or client flag are forcing the
     * propagation. */
    if (c->flags & CLIENT_LUA && server.lua_caller) {
        if (c->flags & CLIENT_FORCE_REPL)
            server.lua_caller->flags |= CLIENT_FORCE_REPL;
        if (c->flags & CLIENT_FORCE_AOF)
            server.lua_caller->flags |= CLIENT_FORCE_AOF;
    }

    /* Log the command into the Slow log if needed, and populate the
     * per-command statistics that we show in INFO commandstats. */
    if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) {
        char *latency_event = (c->cmd->flags & CMD_FAST) ?
                              "fast-command" : "command";
        latencyAddSampleIfNeeded(latency_event,duration/1000);
        slowlogPushEntryIfNeeded(c->argv,c->argc,duration);
    }
    if (flags & CMD_CALL_STATS) {
        c->cmd->microseconds += duration;
        c->cmd->calls++;
    }

    /* Propagate the command into the AOF and replication link */
    if (flags & CMD_CALL_PROPAGATE &&
        (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
    {
        int propagate_flags = PROPAGATE_NONE;

        /* Check if the command operated changes in the data set. If so
         * set for replication / AOF propagation. */
        if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);

        /* If the client forced AOF / replication of the command, set
         * the flags regardless of the command effects on the data set. */
        if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
        if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;

        /* However prevent AOF / replication propagation if the command
         * implementation called preventCommandPropagation() or similar,
         * or if we don't have the call() flags to do so. */
        if (c->flags & CLIENT_PREVENT_REPL_PROP ||
            !(flags & CMD_CALL_PROPAGATE_REPL))
                propagate_flags &= ~PROPAGATE_REPL;
        if (c->flags & CLIENT_PREVENT_AOF_PROP ||
            !(flags & CMD_CALL_PROPAGATE_AOF))
                propagate_flags &= ~PROPAGATE_AOF;

        /* Call propagate() only if at least one of AOF / replication
         * propagation is needed. */
        if (propagate_flags != PROPAGATE_NONE)
            propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
    }

    /* Restore the old replication flags, since call() can be executed
     * recursively. */
    c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
    c->flags |= client_old_flags &
        (CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);

    /* Handle the alsoPropagate() API to handle commands that want to propagate
     * multiple separated commands. Note that alsoPropagate() is not affected
     * by CLIENT_PREVENT_PROP flag. */
    if (server.also_propagate.numops) {
        int j;
        redisOp *rop;

        /* Propagate every operation queued in server.also_propagate. */
        if (flags & CMD_CALL_PROPAGATE) {
            for (j = 0; j < server.also_propagate.numops; j++) {
                rop = &server.also_propagate.ops[j];
                int target = rop->target;
                /* Whatever the command wish is, we honor the call() flags. */
                if (!(flags&CMD_CALL_PROPAGATE_AOF)) target &= ~PROPAGATE_AOF;
                if (!(flags&CMD_CALL_PROPAGATE_REPL)) target &= ~PROPAGATE_REPL;
                if (target)
                    propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target);
            }
        }
        redisOpArrayFree(&server.also_propagate);
    }
    server.stat_numcommands++;
}


五、响应客户端

响应客户端可以使用addReply(), 当然还有其他简化版本, 道理一致。

/* networking.c: queue the reply object 'obj' for client 'c'.
 *
 * -----------------------------------------------------------------------------
 * Higher level functions to queue data on the client output buffer.
 * The following functions are the ones that commands implementations will call.
 * -------------------------------------------------------------------------- */

void addReply(client *c, robj *obj) {
    /* Nothing is queued when the client cannot be written to. */
    if (prepareClientToWrite(c) != C_OK) return;

    /* This is an important place where we can avoid copy-on-write
     * when there is a saving child running, avoiding touching the
     * refcount field of the object if it's not needed.
     *
     * If the encoding is RAW and there is room in the static buffer
     * we'll be able to send the object to the client without
     * messing with its page. */
    if (sdsEncodedObject(obj)) {
        /* String payload: try the static buffer first, otherwise append
         * the object to the reply list. */
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
            _addReplyObjectToList(c,obj);
        return;
    }

    if (obj->encoding != OBJ_ENCODING_INT) {
        serverPanic("Wrong obj->encoding in addReply()");
        return;
    }

    /* Optimization: if there is room in the static buffer for 32 bytes
     * (more than the max chars a 64 bit integer can take as string) we
     * avoid decoding the object and go for the lower level approach. */
    if (listLength(c->reply) == 0 && (sizeof(c->buf) - c->bufpos) >= 32) {
        char digits[32];
        int ndigits = ll2string(digits,sizeof(digits),(long)obj->ptr);

        if (_addReplyToBuffer(c,digits,ndigits) == C_OK) return;
        /* else... continue with the normal code path, but should never
         * happen actually since we verified there is room. */
    }

    /* General path: decode the integer into a string object, queue it the
     * same way as a string payload, then drop the reference we obtained. */
    robj *decoded = getDecodedObject(obj);
    if (_addReplyToBuffer(c,decoded->ptr,sdslen(decoded->ptr)) != C_OK)
        _addReplyObjectToList(c,decoded);
    decrRefCount(decoded);
}

/* networking.c: try to append 'len' bytes from 's' to the client's static
 * reply buffer. Returns C_OK when the bytes were copied, or when the client
 * is flagged CLIENT_CLOSE_AFTER_REPLY (in which case nothing is copied).
 * Returns C_ERR when the reply list is already in use or the buffer lacks
 * room, so that the caller can fall back to the reply list. */
int _addReplyToBuffer(client *c, const char *s, size_t len) {
    size_t room = sizeof(c->buf) - c->bufpos;

    if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return C_OK;

    /* If there already are entries in the reply list, we cannot add
     * anything more to the static buffer; likewise when the string does
     * not fit in the space that is left. */
    if (listLength(c->reply) > 0 || len > room) return C_ERR;

    memcpy(c->buf + c->bufpos, s, len);
    c->bufpos += len;
    return C_OK;
}

将数据写入到 c->buf 后,又是谁向客户端写出了结果呢?

要么是有一个后台线程一直写,要么是在下一次循环的时候再主动写,你觉得呢?( 请参考: server.clients_pending_write )




往期精彩推荐



腾讯、阿里、滴滴后台面试题汇总总结 — (含答案)

面试:史上最全多线程面试题 !

最新阿里内推Java后端面试题

JVM难学?那是因为你没认真看完这篇文章


END


关注作者微信公众号 —《JAVA烂猪皮》


了解更多java后端架构知识以及最新面试宝典


你点的每个好看,我都认真当成了


看完本文记得给作者点赞+在看哦~~~大家的支持,是作者源源不断出文的动力

作者:等你归去来

出处:https://www.cnblogs.com/yougewe/p/12219098.html

good-icon 0
favorite-icon 0
收藏
回复数量: 0
    暂无评论~~
    Ctrl+Enter