新书推介:《语义网技术体系》
作者:瞿裕忠,胡伟,程龚
   XML论坛     W3CHINA.ORG讨论区     计算机科学论坛     SOAChina论坛     Blog     开放翻译计划     新浪微博  
 
  • 首页
  • 登录
  • 注册
  • 软件下载
  • 资料下载
  • 核心成员
  • 帮助
  •   Add to Google

    >> 操作系统研究。UEFI
    [返回] 中文XML论坛 - 专业的XML技术讨论区计算机理论与工程『 操作系统原理 』 → Heartbeat的可靠消息通信实现分析[转帖] 查看新帖用户列表

      发表一个新主题  发表一个新投票  回复主题  (订阅本版) 您是本帖的第 6560 个阅读者浏览上一篇主题  刷新本主题   树形显示贴子 浏览下一篇主题
     * 贴子主题: Heartbeat的可靠消息通信实现分析[转帖] 举报  打印  推荐  IE收藏夹 
       本主题类别:     
     hi2008 帅哥哟,离线,有人找我吗?
      
      
      等级:大一新生
      文章:2
      积分:71
      门派:XML.ORG.CN
      注册:2007/4/25

    姓名:(无权查看)
    城市:(无权查看)
    院校:(无权查看)
    给hi2008发送一个短消息 把hi2008加入好友 查看hi2008的个人资料 搜索hi2008在『 操作系统原理 』的所有贴子 引用回复这个贴子 回复这个贴子 查看hi2008的博客楼主
    发贴心情 Heartbeat的可靠消息通信实现分析[转帖]

    原贴:
    [url]http://www.gd-linux.org/bbs/showthread.php?p=11048#post11048 [/url]
    Heartbeat的可靠消息通信实现分析

    广东省Linux公共服务技术支持中心
    颜定成

    摘要

    Heartbeat项目是Linux-HA工程的一个组成部分,它实现了一个高可用集群系统。心跳服务和集群通信是高可用集群的两个关键组件,在Heartbeat项目里,由heartbeat模块实现了这两个功能。这篇文章描述了heartbeat模块的可靠消息通信机制,并对其实现原理做了一些介绍。

      关键词:集群通信 高可用集群 Linux Heartbeat

    引言

      Heartbeat是Linux-HA工程的一个组件,自1999年开始到现在,发布了众多版本,是目前开源Linux-HA项目最成功的一个例子,在行业内得到了广泛的应用,这里分析的是2007年1月18日发布的版本2.0.8,可以从Linux-HA的官方网站www.linux-ha.org下载到最新版本。
      随着Linux在关键行业应用的逐渐增多,它必将提供一些原来由IBM和SUN这样的大型商业公司所提供的服务,这些商业公司所提供的服务都有一个关键特性,就是高可用集群。
      高可用集群是指一组通过硬件和软件连接起来的独立计算机,它们在用户面前表现为一个单一系统,在这样的一组计算机系统内部的一个或者多个节点停止工作,服务会从故障节点切换到正常工作的节点上运行,不会引起服务中断。从这个定义可以看出,集群必须检测节点和服务何时失效,何时恢复为可用。这个任务通常由一组被称为“心跳”的代码完成。在Linux-HA里这个功能由一个叫做heartbeat的程序完成。

    Heartbeat消息通信模型

    Heartbeat包括以下几个组件:
    heartbeat – 节点间通信校验模块
    CRM - 集群资源管理模块
    CCM - 维护集群成员的一致性
    LRM - 本地资源管理模块
    Stonith Daemon - 提供节点重启服务
    logd - 非阻塞的日志记录
    apphbd - 提供应用程序级的看门狗计时器
    Recovery Manager - 应用故障恢复
    底层结构–包括插件接口、进程间通信等
    CTS – 集群测试系统,集群压力测试
    这里主要分析的是Heartbeat的集群通信机制,所以这里主要关注的是heartbeat模块。heartbeat模块由以下几个进程构成:
    master进程(master process)
    FIFO子进程(fifo child)
    read子进程(read child)
    write子进程(write child)
    在heartbeat里每一条通信通道对应于一个write子进程和一个read子进程,假设n是通信通道数,p为heartbeat模块的进程数,则p、n有以下关系:

    p = 2 * n + 2

    在heartbeat里,master进程把自己的数据或者是客户端发送来的数据,通过IPC发送到write子进程,write子进程把数据发送到网络;同时read子进程从网络读取数据,通过IPC发送到master进程,由master进程处理或者由master进程转发给其客户端处理。这几个进程间的数据流如图一:
    按此在新窗口浏览图片
    图一:heartbeat模块进程间的数据流
    Heartbeat启动的时候,由master进程来启动FIFO子进程、write子进程和read子进程,最后再启动client进程。

    可靠消息通信

      Heartbeat通过插件技术实现了集群间的串口、多播、广播和组播通信,在配置的时候可以根据通信媒介选择采用的通信协议,heartbeat启动的时候检查这些媒介是否存在,如果存在则加载相应的通信模块。这样开发人员可以很方便地添加新的通信模块,比如添加红外线通信模块。
    对于高可用集群系统,如果集群间的通信不可靠,那么很明显集群本身也不可靠。Heartbeat采用UDP协议和串口进行通信,它们本身是不可靠的,可靠性必须由上层应用来提供。那么怎样保证消息传递的可靠性呢?
    Heartbeat通过冗余通信通道和消息重传机制来保证通信的可靠性。Heartbeat检测主通信链路工作状态的同时也检测备用通信链路状态,并把这一状态报告给系统管理员,这样可以大大减少因为多重失效引起的集群故障不能恢复。例如,某个工作人员不小心拨下了一个备份通信链路,一两个月以后主通信链路也失效了,系统就不能再进行通信了。通过报告备份通信链路的工作状态和主通信链路的状态,可心完全避免这种情况。因为这样在主通信链路失效以前,就可以检测到备份工作链路失效,从而在主通信链路失效前修复备份通信链路。
    Heartbeat通过实现不同的通信子系统,从而避免了某一通信子系统失效而引起的通信失效。最典型的就是采用以太网和串口相结合的通信方式。这被认为是当前的最好实践,有几个理由可以使我们选择采用串口通信:

    (1)IP通信子系统的失效不太可能影响到串口子系统。
    (2)串口不需要复杂的外部设备和电源。
    (3)串口设备简单,在实践中非常可靠。
    (4)串口可以非常容易地专用于集群通信。
    (5)串口的直连线因为偶然性掉线事件很少。

    不管是采用串口还是以太网IP协议进行通信,heartbeat都实现了一套消息重传协议,保证消息包的可靠传递。实现消息包重传有两种协议,一种是发送者发起,另一种是接收者发起。
    对于发送者发起协议,一般情况下接收者会发送一个消息包的确认。发送者维护一个计时器,并在计时器到时的时候重传那些还没有收到确认的消息包。这种方法容易引起发送者溢出,因为每一台机器的每一个消息包都需要确认,使得要发送的消息包成倍增长。这种现像被称为发送者(或者ACK)内爆(implosion)。
    对于接收者发起协议,采用这种协议通信双方的接收者通过序列号负责进行错误检测。当检测到消息包丢失时,接收者请求发送者重传消息包。采用这种方法,如果消息包没有被送达任何一个接收者,那么发送者容易因NACK溢出,因为每个接收者都会向发送者发送一个重传请求,这会引起发送者的负载过高。这种现像被称为NACK内爆(implosion)。
    Heartbeat实现的是接收者发起协议的一个变种,它采用计时器来限制过多的重传,在计时器时间内限制接收者请求重传消息包的次数,这样发送者重传消息包的次数也被相应的限制了,从而严格的限制了NACK内爆。

    可靠消息通信的实现

    一般集群通信有两类消息包,一类是心跳消息包,这类消息包通告集群内节点的存活情况;另一类是控制消息包,这类消息包负责集群的节点和资源管理。heartbeat把心跳消息包看成是控制消息包的一个特例,采用相同的通信通道进行发送,这使得协议的实现简单化,而且很有效,并把相应的代码限制在几百行之内。
    在heartbeat里,一切流向网络的数据都由master进程发送到write子进程进行发送。master进程调用send_cluster_msg()函数把消息发送到所有的write子进程。下面通过一些代码片段看看heartbeat是怎么发送消息的。在介绍代码之前先介绍相关的重要数据结构

    Heartbeat的消息包数据结构
    struct ha_msg {
    int nfields; /*消息包数据域的个数*/
    int nalloc; /*己分配的内存块个数*/
    char **names; /*消息包数据域的名称*/
    size_t *nlens; /*各个数据域称的长度*/
    void **values; /*与数据域名称对应的数据值*/
    size_t *vlens; /*各个数据域对应的数据值的长度*/
    int *types; /*消息包的类型*/
    };

    Heartbeat的历史消息队列
    struct msg_xmit_hist {
    struct ha_msg *msgq[MAXMSGHIST]; /*历史消息队列*/
    seqno_t seqnos[MAXMSGHIST]; /*历史消息序列号*/
    longclock_t lastrexmit[MAXMSGHIST]; /*上一次重传的时间*/
    int lastmsg; /*上一次重传到的消息序列号*/
    seqno_t hiseq; /*最大消息序列号*/
    seqno_t lowseq; /*最小消息序列号*/
    seqno_t ackseq; /*确认了的消息序列号*/
    struct node_info *lowest_acknode; /*确认的节点*/
    };

    代码所属文件heartbeat/heartbeat.c

    int send_cluster_msg(struct ha_msg *msg)
    {
    ...
    pid_t ourpid = getpid();
    ...

    if (ourpid == processes[0]) { /*来自master进程的消息*/
    /*添加控制信息,包括源节点名,源节点全局标识符,序列号,代数,时间等*/
    if ((msg = add_control_msg_fields(msg)) != NULL) {
    /*可靠的多播消息包传递*/
    rc = process_outbound_packet(&msghist, msg);
    }
    } else { /*来自client进程的消息*/
    int ffd = -1;
    char *smsg = NULL;

    ...

    /*发送到FIFO进程*/

    if ((smsg = msg2wirefmt_noac(msg, &len)) == NULL) {
    ...
    } else if ((ffd = open(FIFONAME, O_WRONLY | O_APPEND)) < 0) {
    ...
    } else if ((writerc = write(ffd, smsg, len – 1)) != (ssize_t)(len-1)) {
    ...
    }
    }
    }

    Heartbeat在process_outbound_packet()函数里实现了一个可靠的多播协议,它利用一个循环队列来存放历史消息,对于带有序列号的心跳消息,先存放到历史消息队列里然后发送,接收者可以请求发送传重传该消息,对于不带序列号的控制消息,不会进行重传。下面是这个函数的实现代码。

    static int process_outbound_packet(struct msg_xmit_hist *hist,
    struct ha_msg *msg)
    {
    ...
    const char *cseq;
    seqno_t seqno = -1;
    const char *to;
    int IsToUs;
    size_t len;
    ...

    if ((type = ha_msg_value(msg, F_TYPE)) == NULL) {
    ...
    return HA_FAIL;
    }

    if ((cseq = ha_msg_value(msg, F_SEQ)) != NULL) {
    if (sscanf(cseq, “%lx”, &seqno) != 1 || seqno <= 0) {
    ...
    return HA_FAIL;
    }
    }

    to = ha_msg_value(msg, F_TO);
    IsToUs = (to != NULL) && (strcmp(to, curnode->nodename) == 0);

    /*把消息转换成字符串*/
    smsg = msg2wirefmt(msg, &len);

    ...

    if (cseq != NULL) {
    /*存放到历史消息队列里,通过序列号记录,如果需要,则进行重传*/
    add2_xmit_hist(hist, msg, seqno);
    }

    ...

    /*通过write子进程发送到所有的网络接口上*/
    send_to_all_media(smsg, len);

    ...

    return HA_OK;
    }
    add2_xmit_hist()函数把发送的消息发到一个历史消息队列里去,队列的最大长度为200。如果接收者请求重传消息,发送者通过序列号在该队列里查找要重传的消息,如果找到则进行重传。下面是相关代码。

    static void add2_xmit_hist (struct msg_xmit_hist *hist, struct ha_msg *msg,
    seqno_t seq)
    {
    int slot;
    struct ha_msg *slotmsg;

    ...

    /*查找队列里消息存放的位置*/
    slot = hist->lastmsg + 1;
    if (slot >= MAXMSGHIST) {
    /*到达队尾,从头开始。在这里实现循环队列*/
    slot = 0;
    }

    hist->hiseq = seq;
    slotmsg = hist->msgq[slot];

    /*删除队列中找到的位置上的旧消息*/
    if (slotmsg != NULL) {
    hist->lowseq = hist->seqnos[slot];
    hist->msgq[slot] = NULL;
    if (!ha_is_allocated(slotmsg)) {
    ...
    } else {
    ha_msg_del(slotmsg);
    }
    }

    hist->msgq[slot] = msg;
    hist->seqnos[slot] = seq;
    hist->lastrexmit[slot] = 0L;
    hist->lastmsg = slot;

    if (enable_flow_control && live_node_count > 1
    && (hist->hiseq – hist->lowseq) > ((MAXMSGHIST*3)/4)) {
    /*消息队列长度大于告警长度,记录日志*/
    ...
    }
    if (enable_flow_control
    && hist->hiseq – hist->ackseq > FLOWCONTROL_LIMIT) {
    /*消息队列的长度大于流控限制长度*/
    if (live_node_count < 2) {
    /*集群里只有本机节点为存活节点,更新历史消息队列,删除旧消息,
    以防止历史消息队列满*/
    update_ackseq(hist->hiseq – (FLOWCONTROL_LIMIT – 1));
    all_clients_resume();
    } else {
    /*client进程发送消息过快,暂停所有的client进程*/
    all_clients_pause();
    hist_display(hist);
    }
    }

    }

    当发送者收到接收者的重传请求后,通过回调函数HBDoMsg_T_REXMIT()函数调用process_rexmit()函数进行消息重传。

    #define MAX_REXMIT_BATCH 50 /*每次最多重传的消息包数*/

    static void process_rexmit(struct msg_xmit_hist *hist, struct ha_msg *msg)
    {
    const char *cfseq;
    const char *clseq;
    seqno_t fseq = 0;
    seqno_t lseq = 0;
    seqno_t thisseq;
    int firstslot = hist->lastmsg – 1;
    int rexmit_pkt_count = 0;
    const char *fromnodename = ha_msg_value(msg, F_ORIG);
    struct node_info *fromnode = NULL;

    ...

    /*取得要重传的消息包的起始序列号*/
    if ((cfseq = ha_msg_value(msg, F_FIRSTSEQ)) == NULL
    || (clseq = ha_msg_value(msg, F_LASTSEQ)) == NULL
    || (fseq = atoi(cfseq)) <= 0 || (lseq = atoi(clseq)) <= 0 || fseq > lseq) {
    /*无效序列号,记录日志信息*/
    ...
    }

    ...

    /*重传丢失的消息包*/
    for (thisseq = fseq; thisseq <= lseq; ++thisseq) {
    int msgslot;
    int foundit = 0;

    if (thisseq <= fromnode->track.ackseq) {
    /*该消息包已经被确认过,可以忽略掉*/
    continue;
    }
    if (thisseq <= hist->lowseq) {
    /*序列号小于消息队列里的最小序列号,该消息己不存在于历史消息队列中*/
    /*告知对方,不重传该消息*/
    nak_rexmit(hist, thisseq, fromnodename, “seqno too low”);
    continue;
    }
    if (thisseq > hist->hiseq) {
    /*序列号大于消息队列中最大序列号*/
    ...
    continue;
    }

    for (msgslot = firstslot; !foundit && msgslot != (firstslot+1);
    --msgslot) {
    char *smsg;
    longclock_t now = time_longclock();
    longclock_t last_rexmit;
    size_t len;

    ...

    /*重传上一次重传剩下的消息包*/
    last_rexmit = hist->lastrexmit[msgslot];

    if (cmp_longclock(last_rexmit, zero_longclock) != 0
    && longclockto_ms(sub_longclock(now, last_rexmit))
    < (ACCEPT_REXMIT_REQ_MS)) {
    goto NextReXmit;
    }

    /*一次不能发送太多数据包,如果数据包太多的话,可能会引起串口溢出*/
    ++rexmit_pkt_count;
    if (rexmit_pkt_count > MAX_REXMIT_BATCH) {
    return;
    }

    /*找到要重传的消息包,进行重传*/
    firstslot = msgslot – 1;
    foundit = 1;

    ...

    /*消息包转换为字符串*/
    smsg = msg2wirefmt(hist->msgq[msgslot], &len);

    ...

    if (smsg != NULL) {
    hist->lastrexmit[msgslot] = now;
    send_to_all_media(smsg, len);
    ha_free(smsg);
    }
    }
    if (!foundit) {
    /*告知对方不重传该消息*/
    nak_rexmit(hist, thisseq, fromnodename, “seqno not found”);
    }
    NextReXmit:
    }
    }

    接收者在接收到消息以后,调用check_rexmit_reqs()函数检查是否需要请求发送者重传丢失的消息包,如果需要则调用request_msg_rexmit()函数请求发送者重传丢失的消息包,request_msg_rexmit()函数调用schedule_rexmit_request()来对消息包重传进行调度,限制每秒重传的消息包数。

    代码所属文件heartbeat/hb_rexmit.c

    static gboolean send_rexmit_request(gpointer data)
    {
    ...
    构建重传请求消息包hmsg
    ...

    if (send_cluster_msg(hmsg) != HA_OK) {
    ...
    return FALSE;
    }

    ...

    schedule_rexmit_request(node, seq, max_rexmit_delay);
    }

    static void schedule_rexmit_request(struct node_info *node, seqno_t seq,
    int delay)
    {
    unsigned long sourceid;
    struct rexmit_info * ri;

    ...

    ri->seq = seq;
    ri->node = node;

    sourceid = Gmain_timeout_add_full(G_PRIORITY_HIGH – 1, delay,
    send_rexmit_request, ri, NULL);

    ...

    }

    小结

    通过对Heartbeat消息通信机制的分析,发现它把心跳消息看成是控制消息的一个特例,采用接收者发起的模式进行重传,来确保消息的可靠传递,同时通过计时器来限制每秒请求重传消息包的次数,从而防止发送者因为请求重传的次数过多而发生内爆,导致发送者负载过高。

    参考

    1、Alan Robertson: “Linux-HA Heartbeat System Design”, Proceeding of the 4th Annual Linux Showcase & Conference Atlanta, Atlanta, Georgia, USA October 10-14, 2000
    2、Alex Vrenios: 《Linux集群体系结构》,马朝晖等译,机械工业出版社,2003。
    3、Evan Marcus, Hal Stern: 《高可用性系统设计》,汪青青、卢祖英译,清华大学出版社,2005。

    原贴:
    [url]http://www.gd-linux.org/bbs/showthread.php?p=11048#post11048 [/url]


       收藏   分享  
    顶(0)
      




    点击查看用户来源及管理<br>发贴IP:*.*.*.* 2007/4/29 11:51:00
     
     piaoying 帅哥哟,离线,有人找我吗?
      
      
      等级:大一新生
      文章:0
      积分:54
      门派:XML.ORG.CN
      注册:2007/5/18

    姓名:(无权查看)
    城市:(无权查看)
    院校:(无权查看)
    给piaoying发送一个短消息 把piaoying加入好友 查看piaoying的个人资料 搜索piaoying在『 操作系统原理 』的所有贴子 引用回复这个贴子 回复这个贴子 查看piaoying的博客2
    发贴心情 
    看不明白
    点击查看用户来源及管理<br>发贴IP:*.*.*.* 2007/5/18 9:44:00
     
     GoogleAdSense
      
      
      等级:大一新生
      文章:1
      积分:50
      门派:无门无派
      院校:未填写
      注册:2007-01-01
    给Google AdSense发送一个短消息 把Google AdSense加入好友 查看Google AdSense的个人资料 搜索Google AdSense在『 操作系统原理 』的所有贴子 访问Google AdSense的主页 引用回复这个贴子 回复这个贴子 查看Google AdSense的博客广告
    2024/6/26 10:56:01

    本主题贴数2,分页: [1]

    管理选项修改tag | 锁定 | 解锁 | 提升 | 删除 | 移动 | 固顶 | 总固顶 | 奖励 | 惩罚 | 发布公告
    W3C Contributing Supporter! W 3 C h i n a ( since 2003 ) 旗 下 站 点
    苏ICP备05006046号《全国人大常委会关于维护互联网安全的决定》《计算机信息网络国际联网安全保护管理办法》
    109.375ms