一、概述

PikiwiDB(pika)3.5.X版本发布了分布式集群方案,基于codis+PikiwiDB(pika)-server实现,已经在360内部搜索团队线上使用,稳定性和性能都非常优秀。本文主要介绍分布式集群的架构和部署方案。

二、分布式架构解析

pika分布式集群基于codis架构进行改造设计,架构图如下所示:

pika分布式集群主要 由以下这些组件组成:

Pika Server:PikiwiDB(pika)3.5.X, PikiwiDB(pika)4.0.x版本,与单机版模式及架构保持一致。

Codis Proxy:客户端直接连接 codis-proxy,codis-proxy 手动用户请求后,会通过计算 hash 值将请求转发到指定的 Pika Server 去执行。

对于同一个业务集群而言,可以同时部署多个 codis-proxy 实例;

不同 codis-proxy 之间由 codis-dashboard 保证状态同步。

Codis Dashboard:集群管理工具,支持 codis-proxy、pika-server 的添加、删除,以及据迁移等操作。在集群状态发生改变时,codis-dashboard 维护集群下所有 codis-proxy 的状态的一致性。

对于同一个业务集群而言,同一个时刻 codis-dashboard 只能有 0个或者1个;

所有对集群的修改都必须通过 codis-dashboard 完成。

Codis FE:集群管理界面。

多个集群实例共享可以共享同一个前端展示页面;

通过配置文件管理后端 codis-dashboard 列表,配置文件可自动更新。

Codis Etcd:

codis-etcd主要用于记录元数据信息,为保证高可用,建议etcd部署为3节点。

三、Sentinel主从切换

为了方便运维管理,本次版本支持sentinel自动主从切换,当集群主挂的时候会备升主,提供了主节点故障自愈的能力。

四、部署方式

机器配置可以根据自身情况选择:

搜索部门节点分配如下:

组件

节点个数(可以根据需求调整)

实例规格(可以根据需求调整)

pika server

12主12从

每个实例:20核,32G内存,200G磁盘

Codis FE

1个节点

1个节点 2核4G

Codis Dashboard

1个节点

1个节点 2核4G

Codis Etcd

3个节点

3个节点 2核4G

Codis Proxy

4个节点

4个节点 2核4G

集群创建部署顺序:

  1. 启动 PikiwiDB(pika)
  2. 建立 PikiwiDB(pika) 主从关系
  3. 启动 codis etcd
  4. 启动 codis dashboard
  5. 启动 codis proxy
  6. 启动 codis fe
  7. 绑定 PikiwiDB(pika)+codis

绑定 codis+PikiwiDB(pika) 需要在 dashboard 中进行操作,操作顺序如下:

1.添加 group (注意:PikiwiDB(pika) 1主一从为一个 group )

2.添加 PikiwiDB(pika)server

3.分配 slots :

至此,PikiwiDB(pika) 和 codis 已经绑定完毕,我们可以用 proxy 的 vip vport 进行访问。

五、快速启动脚本

PikiwiDB(pika)-codis 源码(路径:) 中 admin 文件夹提供了一系列脚本以便快速启动、停止各个组件,提高运维效率。

启动codis-dashboard

使用 codis-dashboard-admin.sh 脚本启动 dashboard,并查看 dashboard 日志确认启动是否有异常。

./admin/codis-dashboard-admin.sh start
 tail -100 ./log/codis-dashboard.log.2017-04-08
2017/04/08 15:16:57 fsclient.go:197: [INFO] fsclient - create /codis3/codis-demo/topom OK
2017/04/08 15:16:57 main.go:140: [WARN] [0xc42025f7a0] dashboard is working ...
2017/04/08 15:16:57 topom.go:424: [WARN] admin start service on [::]:18080

快速启动集群元数据存储使用 filesystem,默认数据路径保存在 /tmp/codis,若启动失败,请检查当前用户是否对该路径拥有读写权限。

启动codis-proxy

使用 codis-proxy-admin.sh 脚本启动 codis-proxy,并查看 proxy 日志确认启动是否有异常。

./admin/codis-proxy-admin.sh start
tail -100 ./log/codis-proxy.log.2017-04-08
2017/04/08 15:39:37 proxy.go:293: [WARN] [0xc4200df760] set sentinels = []
2017/04/08 15:39:37 main.go:320: [WARN] rpc online proxy seems OK
2017/04/08 15:39:38 main.go:210: [WARN] [0xc4200df760] proxy is working ...

启动codis-server

使用 codis-server-admin.sh 脚本启动 codis-server,并查看 redis 日志确认启动是否有异常。

./admin/codis-server-admin.sh start
tail -100 /tmp/redis_6379.log
5706:M 08 Apr 16:04:11.748 * DB loaded from disk: 0.000 seconds
5706:M 08 Apr 16:04:11.748 * The server is now ready to accept connections on port 6379

redis.conf 配置中 pidfile、logfile 默认保存在 /tmp 目录,若启动失败,请检查当前用户是否有该目录的读写权限。

启动codis-fe

使用 codis-fe-admin.sh 脚本启动 codis-fe,并查看 fe 日志确认启动是否有异常。

六、Codis部分代码详细解析

1. Pika Server

codis架构中,pika server作为数据节点存储数据,处理codis proxy的读写请求,并根据slot迁移的命令进行数据迁移。

1.1 数据存储

在codis架构中,所有的数据按照分片进行存储。因此每个pika server只持有部分分片的数据。当前的pika实现中,相同类型的所有数据是写在同一个RocksDB实例中,引擎中数据本身并不携带slot信息。

codis在进行数据迁移时,支持key粒度的迁移和slot粒度的迁移。key粒度的迁移比较好理解,slot粒度的迁移,就需要在存储引擎中找到对应slot的存量数据。pika是通过为每个slot创建一个set类型的key,以此来记录每个slot中存储的key来实现的。由于该操作会引入额外的更新操作,对性能会有影响。因此pika中设置了slotmigrate_参数来表示是否要支持slot粒度迁移。如果不支持,就不需要更新slot set。

pika节点在收到proxy节点发来的请求之后,如果开启了slot_migrate,除了将用户数据写入RocksDB以外,还需要计算得出数据的slotID,将key和type追加到以对应slotID为key的set集合中。该步骤在pika_command层进行处理。以mset为例,写db成功之后会调用AddSlotKey将key记录到对应set中。

void MsetCmd::Do(std::shared_ptr<Slot> slot) {
  storage::Status s = slot->db()->MSet(kvs_);
  if (s.ok()) {
    res_.SetRes(CmdRes::kOk);
    std::vector<storage::KeyValue>::const_iterator it;
    for (it = kvs_.begin(); it != kvs_.end(); it++) {
      AddSlotKey("k", it->key, slot);
    }
  } else {
    res_.SetRes(CmdRes::kErrOther, s.ToString());
  }
}
void AddSlotKey(const std::string& type, const std::string& key, const std::shared_ptr<Slot>& slot) {
  if (g_pika_conf->slotmigrate() != true) {
    return;
  }
  int slotID = GetSlotsID(key, &crc, &hastag);
  std::string slot_key = GetSlotKey(slotID);
  std::vector<std::string> members;
  members.emplace_back(type + key);
  s = slot->db()->SAdd(slot_key, members, &res);
  if (!s.ok()) {
    LOG(ERROR) << "sadd key[" << key << "] to slotKey[" << slot_key << "] failed, error: " << s.ToString();
    return;
  }
  // codis hash tag模式
  if (hastag) {
    std::string tag_key = GetSlotsTagKey(crc);
    s = slot->db()->SAdd(tag_key, members, &res);
    if (!s.ok()) {
      LOG(ERROR) << "sadd key[" << key << "] to tagKey[" << tag_key << "] failed, error: " << s.ToString();
      return;
    }
  }
}

1.2 Slot迁移

codis的架构中,slot迁移包括key粒度的迁移(通过命令slotsmgrtone, slotsmgrttagone)每次将一个key迁移到目的端。还有一种是分片粒度迁移。主要涉及的类包括PikaMigrate, PikaMigrateThread和PikaParseSendThread。首先介绍PikaMigrateThread类。

class PikaMigrateThread : public net::Thread {
 public:
  bool ReqMigrateBatch(const std::string &ip, int64_t port, int64_t time_out, int64_t slot_num, int64_t keys_num,
                       const std::shared_ptr<Slot>& slot);
  int ReqMigrateOne(const std::string &key, const std::shared_ptr<Slot>& slot);
 private:
  void NotifyRequestMigrate(void);
  void ReadSlotKeys(const std::string &slotKey, int64_t need_read_num, int64_t &real_read_num, int32_t *finish);
  bool CreateParseSendThreads(int32_t dispatch_num);
  void DestroyParseSendThreads(void);
  void *ThreadMain() override;
  int32_t workers_num_ = 0;
  std::vector<PikaParseSendThread *> workers_;
  std::deque<std::pair<const char, std::string>> mgrtone_queue_;
  std::deque<std::pair<const char, std::string>> mgrtkeys_queue_;
  std::map<std::pair<const char, std::string>, std::string> mgrtkeys_map_;

PikaMigrateThread是Pika的后台迁移线程,负责收集待发送的key,创建迁移线程,管理迁移流程。PikaMigrateThread创建了若干个实际的迁移线程(即workers_),负责在后台进行数据迁移。

待迁移的数据有两个来源,一个接收到是codis发来的异步迁移单个key的命令,将具体的key追加到mgrtone_queue_队列中,再由PikaMigrateThread线程消费mgrtone_queue_将数据追加到mgrtkeys_queue_中。另一个是遍历slotkey对应的set,将遍历到的key追加到mgrtkeys_queue_中。wokers_线程从mgrtkeys_queue_中消费出待迁移的key并进行发送。

PikaParseSendThread是迁移的worker线程,在一个while循环中消费mgrtkeys_queue_,将key发送到对应的接收端。MigrateOneKey会根据key类型的不同调用不同的处理函数构造网络请求包,比如string类型直接从引擎中读取value并发送,hash类型则需要将整个pkey下的所有field遍历出来并发送。当前的migrateOneKey的实现中查询两次RocksDB并发送两次请求包,一次是key-value本身,第二次是ttl。

void *PikaParseSendThread::ThreadMain() {
  while (!should_exit_) {
    //消费mgrtkeys_queue_
    std::deque<std::pair<const char, std::string>> send_keys;
    {
      migrate_thread_->IncWorkingThreadNum();
      for (int32_t i = 0; i < mgrtkeys_num_; ++i) {
        if (migrate_thread_->mgrtkeys_queue_.empty()) {
          break;
        }
        send_keys.emplace_back(migrate_thread_->mgrtkeys_queue_.front());
        migrate_thread_->mgrtkeys_queue_.pop_front();
      }
    }
    int64_t send_num = 0;
    int64_t need_receive_num = 0;
    int32_t migrate_keys_num = 0;
    for (const auto& send_key : send_keys) {
      // 发送单个key
      if (0 > (send_num = MigrateOneKey(cli_, send_key.second, send_key.first, false))) {
        LOG(WARNING) << "PikaParseSendThread::ThreadMain MigrateOneKey: " << send_key.second << " failed !!!";
        migrate_thread_->OnTaskFailed();
        migrate_thread_->DecWorkingThreadNum();
        return nullptr;
      } else {
        need_receive_num += send_num;
        ++migrate_keys_num;
      }
    }
    // 阻塞,等待收到need_receive_num个数据包
    if (!CheckMigrateRecv(need_receive_num)) {
      LOG(INFO) << "PikaMigrateThread::ThreadMain CheckMigrateRecv failed !!!";
      migrate_thread_->OnTaskFailed();
      migrate_thread_->DecWorkingThreadNum();
      return nullptr;
    } else {
      DelKeysAndWriteBinlog(send_keys, slot_);
    }
    migrate_thread_->AddResponseNum(migrate_keys_num);
    migrate_thread_->DecWorkingThreadNum();
  }
  return nullptr;
}
int PikaParseSendThread::MigrateOneKey(net::NetCli *cli, const std::string& key, const char key_type, bool async) {
  int send_num;
  switch (key_type) {
    case 'k':
      if (0 > (send_num = MigrateKv(cli_, key, slot_))) {
        return -1;
      }
      break;
    ......
    default:
      return -1;
      break;
  }
  return send_num;
}

相对于PikaParseSendThread,PikaMigrateThread并不执行实际的数据迁移任务,而是用来进行迁移任务的管理。pika后台线程线程的执行是分批次进行,每一批次执行完成之后会挂起,需要被再次唤醒。PikaMigrateThread后台线程的主要代码执行流程如下:

LOG(INFO) << "PikaMigrateThread::ThreadMain Start";
  // Create parse_send_threads
  auto dispatch_num = static_cast<int32_t>(g_pika_conf->thread_migrate_keys_num());
  if (!CreateParseSendThreads(dispatch_num)) {
    LOG(INFO) << "PikaMigrateThread::ThreadMain CreateParseSendThreads failed !!!";
    DestroyThread(true);
    return nullptr;
  }
  std::string slotKey = GetSlotKey(static_cast<int32_t>(slot_id_));
  int32_t slot_size = 0;
  slot_->db()->SCard(slotKey, &slot_size);
  while (!should_exit_) {
    // Waiting migrate task
    {
      std::unique_lock<std::mutex> lm(request_migrate_mutex_);
      while (!request_migrate_) {
        request_migrate_cond_.wait(lm);
      }
      // 每轮迁移对应一个task,执行完成之后需要被再次唤醒
      request_migrate_ = false;
    }
    // read keys form slot and push to mgrtkeys_queue_
    int64_t round_remained_keys = keys_num_;
    int64_t real_read_num = 0;
    int32_t is_finish = 0;
    send_num_ = 0;
    response_num_ = 0;
    do {
      std::unique_lock lq(mgrtkeys_queue_mutex_);
      std::unique_lock lo(mgrtone_queue_mutex_);
      std::unique_lock lm(mgrtkeys_map_mutex_);
      // 查找待迁移的key,包括单key的迁移和slot迁移
      if (!mgrtone_queue_.empty()) {
        while (!mgrtone_queue_.empty()) {
          mgrtkeys_queue_.push_front(mgrtone_queue_.front());
          mgrtkeys_map_[mgrtone_queue_.front()] = INVALID_STR;
          mgrtone_queue_.pop_front();
          ++send_num_;
        }
      } else {
        int64_t need_read_num = (0 < round_remained_keys - dispatch_num) ? dispatch_num : round_remained_keys;
        ReadSlotKeys(slotKey, need_read_num, real_read_num, &is_finish);
        round_remained_keys -= need_read_num;
        send_num_ += static_cast<int32_t>(real_read_num);
      }
      //唤醒worker线程
      mgrtkeys_cond_.notify_all();
    } while (0 < round_remained_keys && !is_finish);
    LOG(INFO) << "PikaMigrateThread:: wait ParseSenderThread finish";
    //阻塞等待worker线程执行完成
    {
      std::unique_lock lw(workers_mutex_);
      while (!should_exit_ && is_task_success_ && send_num_ != response_num_) {
        workers_cond_.wait(lw);
      }
    }
    LOG(INFO) << "PikaMigrateThread::ThreadMain send_num:" << send_num_ << " response_num:" << response_num_;
    if (should_exit_) {
      LOG(INFO) << "PikaMigrateThread::ThreadMain :" << pthread_self() << " exit2 !!!";
      DestroyThread(false);
      return nullptr;
    }
    // check one round migrate task success
    if (!is_task_success_) {
      LOG(ERROR) << "PikaMigrateThread::ThreadMain one round migrate task failed !!!";
      DestroyThread(true);
      return nullptr;
    } else {
      moved_num_ += response_num_;
      std::unique_lock lm(mgrtkeys_map_mutex_);
      std::map<std::pair<const char, std::string>, std::string>().swap(mgrtkeys_map_);
    }
    // check slot migrate finish
    int32_t slot_remained_keys = 0;
    slot_->db()->SCard(slotKey, &slot_remained_keys);
    if (0 == slot_remained_keys) {
      LOG(INFO) << "PikaMigrateThread::ThreadMain slot_size:" << slot_size << " moved_num:" << moved_num_;
      if (slot_size != moved_num_) {
        LOG(ERROR) << "PikaMigrateThread::ThreadMain moved_num != slot_size !!!";
      }
      DestroyThread(true);
      return nullptr;
    }
  }
  return nullptr;
}

2. codis

2.1 dashboard

dashboard是codis架构中的中心管理节点,负责proxy节点,pika节点的管理,发起运维操作,检测节点状态以及进行failover。dashboard持有集群整体的信息,需要持久化的数据保存在etcd或zookeeper或者本地磁盘文件中。

2.1.1 关键类

Topom是dashboard中的一个关键类,记录了dashboard中所有信息。定义如下:

type Topom struct {
	mu sync.Mutex
	xauth string
	model *models.Topom
        //抽象出来的存储组件,可以是zk/etcd/fs
	store *models.Store
        //缓存结构,减少从store获取次数,当成员变量值有变更时,
        //会调用相关接口清除cache,下次获取数据时会强制从store中load
	cache struct {
		hooks list.List
		slots []*models.SlotMapping
		group map[int]*models.Group
		proxy map[string]*models.Proxy
		sentinel *models.Sentinel
	}
	exit struct {
		C chan struct{}
	}
	config *Config
	online bool
	closed bool
	ladmin net.Listener
        //slot迁移时使用
	action struct {
		redisp *redis.Pool
		interval atomic2.Int64
		disabled atomic2.Bool
		progress struct {
			status atomic.Value
		}
		executor atomic2.Int64
	}
	stats struct {
		redisp *redis.Pool
		servers map[string]*RedisStats
		proxies map[string]*ProxyStats
	}
	ha struct {
		redisp *redis.Pool
		monitor *redis.CodisSentinel
		masters map[int]string
	}
}
context看起来是Topom的一个只读快照。
type context struct {
	slots []*models.SlotMapping
	group map[int]*models.Group
	proxy map[string]*models.Proxy
	sentinel *models.Sentinel
	hosts struct {
		sync.Mutex
		m map[string]net.IP
	}
	method int
}
const (
	ActionNothing   = ""
	ActionPending   = "pending"
	ActionPreparing = "preparing"
	ActionPrepared  = "prepared"
	ActionMigrating = "migrating"
	ActionFinished  = "finished"
	ActionSyncing   = "syncing"
)

Action状态在三个地方会用到,第一个是仅用于实现promot server,即提升group server为master,此时更新的是group.Action.State。

第二个是用于实现slot的迁移,第三个是用来实现group内pika节点的主从sync。

2.1.2 主要函数

dashboard在启动之后,其主要工作通过6个goroutine来实现。分别是:

1. CheckMastersAndSlavesState。
2. CheckPreOffineMastersState。
3. RefreshRedisStats。
4. RefreshProxyStats。
5. ProcessSlotAction。
6. ProcessSyncAction。
2.1.2.1 CheckMastersAndSlavesState

主要工作:周期性检测pika节点状态,并根据状态统计对需要下线节点进行摘除。

第一阶段是检测pika节点的状态。首先是获取所有group的server信息,之后对每个server执行info replication命令,获取每个group的master和slave关系。这一阶段在
CodisSentinel.RefreshMastersAndSlavesClient函数中完成。

第二阶段根据统计结果更新每个group中的pika节点的状态。即遍历每个server的stat统计信息,如果某个pika server的error状态不为nil,而且该节点是对应group的master节点,需要更新该节点的状态。具体地,如果该节点之前的状态是GroupServerStateNormal,先将该节点标记为
GroupServerStateSubjectiveOffline,即主观下线。如果之前不是normal状态,则累加ReCallTimes,如果ReCallTimes大于等于设定的主观下线阈值,将节点状态更新为GroupServerStateOffline,并将该group记录到pending数据中,后续将对pending中记录的group进行failover操作。(其实在CheckMasterAndSlavesState中并不会走到第二个节点,因为filter函数会过滤掉master状态不是normal的节点,所以stat信息中不会包含非normal状态的master节点,这部分逻辑将会在2.1.2.2函数中进行)。接下来对每个节点更新state信息和offset信息。具体代码如下所示:

// It was the master node before, the master node hangs up, and it is currently the master node
if state.Index == 0 && state.Err != nil && g.Servers[0].Addr == state.Addr {
if g.Servers[0].State == models.GroupServerStateNormal {
//主观下线
				g.Servers[0].State = models.GroupServerStateSubjectiveOffline
			} else {
// update retries
				g.Servers[0].ReCallTimes++
// Retry more than config times, start election
if g.Servers[0].ReCallTimes >= s.Config().SentinelMasterDeadCheckTimes {
// Mark enters objective offline state
					g.Servers[0].State = models.GroupServerStateOffline
					g.Servers[0].ReplicaGroup = false
				}
// Start the election master node
if g.Servers[0].State == models.GroupServerStateOffline {
					pending = append(pending, g)
				}
			}
		}
                // Update the offset information of the state and role nodes
if val, ok := serversMap[state.Addr]; ok {
if state.Err != nil {
if val.State == models.GroupServerStateNormal {
val.State = models.GroupServerStateSubjectiveOffline
				}
continue
			}
val.State = models.GroupServerStateNormal
val.ReCallTimes = 0
val.Role = state.Replication.Role
if val.Role == "master" {
val.ReplyOffset = state.Replication.MasterReplOffset
			} else {
val.ReplyOffset = state.Replication.SlaveReplOffset
			}
		}

第三阶段进行failover。第二阶段中记录到pending中的group需要进行主从切换。首先从对应group中选新的master,挑选的原则是状态是normal且并且replyoffset最大。接下来对新master执行slaveof no one,对其他节点重新执行slaveof命令绑定到新master上。更新group统计信息,交换新老master在group.servers的位置,删除对应group的cache信息,标记group为OutOfSync = true,更新store中信息。

2.1.2.2 CheckPreOffineMastersState

整体执行逻辑类似于CheckMasterAndSlaveState,不同的是filter函数。2.1.2.1中检测状态时会忽略掉状态不是normal的master节点,当前函数逻辑互补,即只检测不是正常状态的master节点。猜测区分成两个函数处理的逻辑是为了使用不同的检测频率。

2.1.2.3 RefreshRedisStats

遍历所有的group以及每个group的server,向每个pika节点发info请求,获取统计信息。然后发送命令“config get maxmemory”,统计maxmemory。目前返回的是max-write-buffer-size,后期可以优化下。所有server的统计信息统计在map[string]*RedisStats,赋值给s.stats.severs。

2.1.2.4 RefreshProxyStats

类似于获取redis server节点的统计信息,同理,向所有的proxy发请求,获取统计信息,记录到Topom.stats.proxies中。初次之外,会扫一遍topom中记录的所有proxy,如果proxy的状态不是online,执行OnlineProxy对proxy进行上线操作。

2.1.2.5 ProcessSlotAction

功能:执行slots迁移相关的工作。具体执行流程类似于ProcessSyncAction。

第一步找到所有状态不是Nothing的slot,说明这些slots需要执行迁移动作,或者已经在执行迁移动作过程中。找到足够的slots之后,更新状态到Migrating,根据dashboard中设置的迁移函数,开始进行迁移。每迁移完成一个slots,执行一次resyncmappings操作,将slots新状态同步到proxy节点。在slot迁移完成之前,proxy收到业务的请求,需要先将对应的key迁移到destination,然后在执行读写操作。

2.1.2.6 ProcessSyncAction

功能:遍历所有的group和所有的节点,如果有group的Action成员变量不为空,找到Action.Index最小的,对其执行slaveof。主要函数分为

SyncActionPrepare,
newSyncActionExecutor, 
SyncActionComplete。
SyncActionPrepare

第一步遍历所有的group中的所有server,如果server.Action.State == models.ActionPending,并且server.Action.Index最小,记录对应的server addr。然后根据server addr找到所属的group,更新group.server.Action.Index为0,Action.State为ActionSyncing。

newSyncActionExecutor(addr)

找出对应group的master,返回的lambda处理逻辑是如果master为NO:ONE,执行slaveof no one。如果不是,对addr节点执行slaveof master

SyncActionComplete

扫尾函数,更新cache,下发mappings到proxy节点。

2.1.2.7 resyncSlotMappings

函数签名:

func (s *Topom) resyncSlotMappings(ctx *context, slots ...*models.SlotMapping) error作用是将传入的models.SlotMapping中的slot转换成models.Slot,并下发给所有的proxy节点。

models.Slot定义:

type Slot struct {
	Id     int  `json:"id"`
	Locked bool `json:"locked,omitempty"`
	BackendAddr        string `json:"backend_addr,omitempty"`
	BackendAddrGroupId int    `json:"backend_addr_group_id,omitempty"`
        //处于迁移状态的slot会设置这两个值.
        //在proxy节点,如果客户端请求的key分配到了当前slot且这两个参数不为空,proxy需要先迁移对应的key到target节点(slotmgrtone)
	MigrateFrom        string `json:"migrate_from,omitempty"`
	MigrateFromGroupId int    `json:"migrate_from_group_id,omitempty"`
        //proxy需要同步迁移一个key时使用的方法
	ForwardMethod int `json:"forward_method,omitempty"`
        //复制组,dc就近读
	ReplicaGroups [][]string `json:"replica_groups,omitempty"`
}

主要包括了两个函数,context.toSlot()和FilSlots()

context完成models.Mapping到models.Slot的转换,转换过程中相关key的赋值情况:

Locked: 如果slot的状态是prepared,返回true。如果不是preapred状态,检查group lock状态。

BackendAddr: 
BackendAddrGroupId
MigrateFrom
MigrateFromGroupId
2.1.3 主要流程
slot rebalance

slot rebalance形参中有一个confirm参数,如果为false,表明只是生成一个slot rebalance的迁移计划,并不会真正执行,如果为true,表明生成了迁移计划之后就更新对应slot的状态。

关键的几个变量:

  • assigned : map[int]int //key: groupId, value: 不需要迁移的slots个数
  • pendings : map[int][]int //key: groupId, value: 等待迁出的待分配的slots个数,即pendings中记录的[]slot记录的slot当前属于key对应的groupId,可以迁出,但还没有找到目的端。
  • moveout:map[int]int //key: groupId, value: 对应groupId中,要迁出的slots的数量。如果为正值,说明对应的groupId需要往外迁移,如果为负值,说明需要其他的group向它迁移数据。
  • docking: 需要进行迁移的slots,其中包括了offline slots,还有需要进行迁移的slots

slot_rebalance整体的执行逻辑包括:

1. topom加锁,生成一个新的context。

2. 遍历ctx.slots,如果某个slot的Action.State不为“”,说明该slot属于已经迁移的slot,那么对应的嵌入端的group的assigned值++。

3. 遍历ctx.slots,如果某个slot的Action.State为"",并且groupId不为0, 如果该group的slot个数小于平均值,那么该group的assigned值++。否则,该group就需要迁出一些slots,所以pendings中记录该group和slotid

4. 构造一棵红黑树,排序依据是groupsize,将所有的groupId存到rbtree中。作用是尽量slots分片尽量均匀地分配给所有的slots,所以需要按照节点持有的分片数进行排序。

5. 遍历所有的slots,如果某个slot的groupId为0,即为offline slot(最开始,集群中的slots没有分配给任何节点,就是offline slot),则groupsize最小的group的moveout值--。(通过rbtree找到最小size的group,moveout值负数,表示需要迁入slot)。

6. 从rbtree中找到groupsize最大的group,再找到groupsize最小的group,如果他们size相差大于1,那么最大size的group的moveout++,最小的moveout--。(其实就是在做slots均衡,削峰填谷)

7. 根据第8步中已经计算出的moveout值,遍历moveout,如果值大于0,表明需要迁出,那么从pending中截取指定长度的slots,追加到docking中。

8. 遍历groupids和docking,将docking中记录的slots的目的端记录为需要迁入的group,生成了一个迁移plan,key位slotid,value为targetGroupId。

9. 如果confirm为true,更新slot的Action.State为ActionPending,Action.Index为一个单调递增的counter值,targeId为plans中记录的groupId。执行storeUpdateSlotMapping,更新store,清除cache。processslotaction函数会执行从plan中恢复中所有需要执行迁移的slots。

2.1.4 主要接口

类定义

type Group struct {
	Id      int            json:"id"
	Servers []*GroupServer json:"servers"
	Promoting struct {
		Index int    json:"index,omitempty"
		State string json:"state,omitempty"
	} json:"promoting"
	OutOfSync bool json:"out_of_sync"
}

相关api

创建group

"/api/topom/group/create/{xauth}/{gid}"

topom_group.go

执行完成之后,如果store选择的是文件系统,会在prodct_name目录下创建一个group目录,group目录中新建“group-{gid}”,目录中初始内容:

{
    "id": 1,
    "servers": [],
    "promoting": {},
    "out_of_sync": false
}

添加server

"/api/topom/group/add/{xauth}/{gid}/{addr}/{dc}"
topom_group.go:GroupAddServer
newcontext -> getGroup() -> group g

如果group的Promoting.state不是ActionNothing, 返回error

如果ctx.sentinel.servers不为空,标记sentinel.OutOfSync为true,执行storeUpdateSentinel

将新server追加到group的server中,保存到store.group-{gid}文件内容变为:

{
    "id": 1,
    "servers": [
        {
            "server": "10.224.129.40:9271",
            "datacenter": "",
            "action": {},
            "role": "",
            "reply_offset": 0,
            "state": 0,
            "recall_times": 0,
            "replica_group": false
        }
    ],
    "promoting": {},
    "out_of_sync": false
}

主从同步

"/api/topom/group/action/create/{token}/10.224.129.40:9261"

首先是状态判断,如果group.Promoting.State不是nothing,返回error。如果对应server的Action.State == models.ActionPending,返回error,表明该server已经有action存在。

设置server.Action.Index,设置Action.State ==models.ActionPending

实际执行主从同步的步骤,是通过dashboard的后台goroutine执行ProcessSyncAction完成。

{
    "id": 1,
    "servers": [
        {
            "server": "10.224.129.40:9271",
            "datacenter": "",
            "action": {},
            "role": "master",
            "reply_offset": 0,
            "state": 0,
            "recall_times": 0,
            "replica_group": false
        },
        {
            "server": "10.224.129.40:9261",
            "datacenter": "",
            "action": {
                "state": "synced"
            },
            "role": "master",
            "reply_offset": 0,
            "state": 0,
            "recall_times": 0,
            "replica_group": false
        }
    ],
    "promoting": {},
    "out_of_sync": false
}

rebalance

"/api/topom/slots/rebalance/95b62887719520f17e312eaa76d28f2b/0"

topom_api.go:SlotsRebalance

topom_slots.go:SlotRebalance

同步完成plan的更新,将plan中的每个slot保存到store中,状态为pending。

接下来processSlotAction协程每秒被调度执行一次,首先遍历所有的slots,找到Action.State不是nothing状态的slot,更新状态从ActionPending -> ActionPreparing -> ActionPrepared -> ActionMigrating。每次更新时需要同步更新store,以及使cache失效。之后开始执行迁移操作,根据config中的配置,选择执行SLOTSMGRTTAGSLOT或者是SLOTSMGRTTAGSLOT_ASYNC. slotsmigrtagslot命令会返回对应slotkey中还剩余的key个数。dashboard收到之后就可以判断是否这个codis已经迁移完成,如果已经迁移完成,执行slotActionComplete,更新slotAction的状态以及resyncSlotMapings通知proxy。

如果没有全部迁移完,sleep指定的slot_action_interval之后重视。

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐