本站首页    管理页面    写新日志    退出


«August 2025»
12
3456789
10111213141516
17181920212223
24252627282930
31


公告

戒除浮躁,读好书,交益友


我的分类(专题)

日志更新

最新评论

留言板

链接

Blog信息
blog名称:邢红瑞的blog
日志总数:523
评论数量:1142
留言数量:0
访问次数:9691313
建立时间:2004年12月20日




[java语言]组播技术 和 JGroups(转 有修改)
文章收藏,  网上资源,  软件技术,  电脑与网络,  校园生活

邢红瑞 发表于 2006/7/21 14:40:15

IP数据包传输类型IPv4定义了3种IP数据包的传输:单播(unicast)广播(broadcast)组播(multicast). 比较一下unicast和multicast两种数据的传输方式可以发现,当一台主机向多个用户发送信息时,单播对于每一个用户都要发送一份数据的拷贝,而组播总共只需发送一份数据的拷贝。这样,组播的使用就大大的节省了带宽,减轻了网络的负载,从而更加有效的利用了网络的带宽资源. IP组播和单播的目的地址不同,IP组播的目的地址是组地址.是从224.0.0.0到239.255.255.255之间的D类IP地址,其中224.0.0.0到224.0.0.255是被保留的地址,224.0.0.1表示子网中所有的组播组, 224.0.0.2表示子网中的所有路由器,224.0.0.5表示OSPF(Open Shortest Path First)路由器,224.0.0.6表示OSPF指定路由器,224.0.0.12表示DHCP服务器. 在D类地址的分配中,IETF建议遵循以下的原则:    全球范围:224.0.1.0~238.255.255.255;    有限范围:239.0.0.0~239.255.255.255;    本地站点范围:239.253.0.0~239.253.0.16本地机构范围:239.192.0.0~239.192.0.14.   Java 组播程序的例子 import java.io.IOException;import java.net.DatagramPacket;import java.net.InetAddress;import java.net.MulticastSocket;import java.net.NetworkInterface;   public class Learn2 {   public static void main(String[] args) throws IOException {    new MulticastServer().start();    new MulticastClient().start();   } }   class MulticastServer extends Thread{  public void run(){    try{   MulticastSocket mcastSocket = new MulticastSocket(7777);   InetAddress inetAddress = InetAddress.getByName("230.0.0.1");   mcastSocket.setNetworkInterface(NetworkInterface.getByInetAddress(InetAddress.getLocalHost()));   mcastSocket.joinGroup(inetAddress);         while (true) {    byte [] arb = new byte [100];    DatagramPacket datagramPacket = new DatagramPacket(arb, arb.length);    mcastSocket.receive(datagramPacket);    System.out.println("Server:"+new String(arb,0,datagramPacket.getLength()));        }   }catch(Exception dx){   dx.printStackTrace();  } }}   class MulticastClient extends Thread{  public void run(){    try{   while(true){    byte [] arb = new byte []{'h', 'e', 'l', 'l', 'o'};    InetAddress inetAddress = InetAddress.getByName("230.0.0.1");    DatagramPacket datagramPacket = new DatagramPacket(arb, arb.length, inetAddress, 7777);    MulticastSocket multicastSocket = new MulticastSocket();    multicastSocket.send(datagramPacket);        Thread.sleep(1000);   }  }catch(Exception dx){   dx.printStackTrace();  } }  }使用jgroups的例子  import java.io.BufferedReader;import java.io.InputStreamReader; import org.jgroups.Address;import org.jgroups.ChannelException;import org.jgroups.JChannel;import org.jgroups.Message;import org.jgroups.ReceiverAdapter;import org.jgroups.View;   public class Learn1 {   private JChannel channel = null;  public Learn1() throws ChannelException{  channel = new JChannel("c:/udp.xml"); }  public static void main(String[] args) throws ChannelException {  Learn1 learn = new Learn1();    learn.startup();    new SendMessage(learn.channel).start(); }   private void startup() throws ChannelException{    channel.setReceiver(new ReceiverMessage());        //channel.setOpt(option, value);      channel.connect("FL");     }} class ReceiverMessage extends ReceiverAdapter{   public void receive(Message msg) {  System.out.println(msg.getSrc() + ": " + msg.getObject()+" at "+System.currentTimeMillis());   }   public void viewAccepted(View new_view) {  System.out.println("viewAccepted: " + new_view);   }   public void suspect(Address suspected_mbr) {  System.out.println("suspect: " + suspected_mbr);    } } class SendMessage extends Thread{   private JChannel channel;  public SendMessage(JChannel channel){  this.channel = channel; }   public void run(){   BufferedReader in=new BufferedReader(new InputStreamReader(System.in));  while(true) {   try{    System.out.print("> "); System.out.flush();    String line=in.readLine().toLowerCase();    if(line.startsWith("quit") || line.startsWith("exit")) {     System.exit(0);    }    Message msg=new Message(null, null, line);    channel.send(msg);   }catch(Exception dx){    dx.printStackTrace();   }  }   } }   使用的 udp.xml 及相关 参数说明 <!--  Default stack using IP multicasting. It is similar to the "udp"  stack in stacks.xml, but doesn't use streaming state transfer and flushing  author: Bela Ban  version: $Id: udp.xml,v 1.24.2.1 2007/11/20 08:53:40 belaban Exp $--> <config>  <!--  UDP 协议设置    ip_mcast:指定是否使用ip 多播 , default true    也可以通过 TCP 连接发送多个单播消息到成员,TCP也可以发送多播消息,但是这不是标准。    mcast_addr: 多播地址 缺省:228.8.8.8;        mcast_port: 多播端口 缺省:7600               ucast_recv_buf_size="20000000" //单播接收缓冲区大小 缺省:64000        ucast_send_buf_size="640000" //单播发送缓冲区大小 缺省:32000        mcast_recv_buf_size="25000000" //组播接收缓冲区大小 缺省:64000        mcast_send_buf_size="640000" //组播发送缓冲区大小 缺省:32000      loopback :如果为true, 单播直接给自己。组播则同样直接给自己处理,并且之后组播。      当 enable_bundling 为 true 并将max_bundle_timeout设置一个足够大的值会体现出差别      缺省:false                  discard_incompatible_packets: 丟弃不同版本的数据包 缺省:false    enable_bundling:指定是否能够捆绑消息。如果为真,则累积到max_bundle_size字节,或者max_bundle_time超时。再将其发出。缺省:false        max_bundle_size:字节数,当消息累积字节数。当累积值超过该值则消息会被发送。缺省:65535        max_bundle_timeout:消息发送的超时毫秒数。队列中等待发送的消息超过该时间则被发送. 缺省:20               use_incoming_packet_handler:当接收消息过载时是否增加新的线程进行处理。 缺省:true               ip_ttl:TTL 为0,则报文只能 本机上使用。 为1 则只能在本地网络上使用。路由器会将其抛弃,标准TTL经过一个路由器,减1,所以不会传递到其它网络。文档缺省:32,代码是64               enable_diagnostics:是否打开内部检测。(没有找到相关说明,从代码上推测出来的) 缺省:true               thread_naming_pattern: 线程命名方式。 有效值可以是:pcl. 缺省:cl              p:之前的名称 c:clusterName 聚簇名称 l:localaddress地址                      use_concurrent_stack:是否使用并发栈。        thread_pool:        oob_thread_pool:             use_packet_handler:  -->    <UDP      ip_mcast="true"         mcast_addr="${jgroups.udp.mcast_addr:228.10.10.10}"         mcast_port="${jgroups.udp.mcast_port:45588}"                 ucast_recv_buf_size="20000000"         ucast_send_buf_size="640000"         mcast_recv_buf_size="25000000"         mcast_send_buf_size="640000"                 loopback="false"                 discard_incompatible_packets="true"     enable_bundling="true"         max_bundle_size="64000"         max_bundle_timeout="3000"                 use_incoming_packet_handler="true"                 ip_ttl="${jgroups.udp.ip_ttl:2}"                 enable_diagnostics="false"                 thread_naming_pattern="cl"          use_concurrent_stack="true"          thread_pool.enabled="true"         thread_pool.min_threads="2"         thread_pool.max_threads="8"         thread_pool.keep_alive_time="5000"         thread_pool.queue_enabled="true"         thread_pool.queue_max_size="1000"         thread_pool.rejection_policy="Run"          oob_thread_pool.enabled="true"         oob_thread_pool.min_threads="1"         oob_thread_pool.max_threads="8"         oob_thread_pool.keep_alive_time="5000"         oob_thread_pool.queue_enabled="false"         oob_thread_pool.queue_max_size="100"         oob_thread_pool.rejection_policy="Run"    />  <!--  * PING 协议     * timeout : 等待初始化成员超时时间(ms) 。缺省:3000     * property: 为FIND_INITAL_MBRS 找到的最小初始化成员数。缺省:2     * property: gossip_host - if you are using GOSSIP then this defines the host of the GossipRouter, default is null     * property: gossip_port - if you are using GOSSIP then this defines the port of the GossipRouter, default is null -->    <PING timeout="2000" num_initial_members="3"/>          <!--     当网络发生故障。可能将group 分隔开。     该协议用于将 被分隔开的 group 进行重新组合    -->    <MERGE2 max_interval="30000" min_interval="10000"/>          <!--     FD_SOCK:基于 TCP 套接字的故障检测。基于环 的 ping 被在邻居成员之间发送。    -->    <FD_SOCK/>       <!--     FD(Failure Detection 故障检测) heartbeat 方式     的作用就是探测组内的成员是否还活着。     当组内的成员被怀疑可能死掉了,     那么SUSPECT消息就会传播到集群的每一个节点上          timeout:等影响消息的 ms 数     max_tries:最大重试次数     shun:例如:一个组内有a,b,c,d四个成员,       当d由于负荷过高没有在timeout的时间内作出响应,       导致被踢出组的时候,a,b,c的组员view中只有abc三个member,       而d的view却还有abcd四个成员,此时如果D再发消息给组内的成员,       组内成员将会拒绝接受。那么如果设置shun为true的时候,       D就重新加入组内在下面两种情况:        1.ABC接受到了D发过来的are-you-alive的检测消息。        2.D自己收到了一个view消息,view内不包含D。(类似于重连机制)。               这点很重要特别是在分布式的环境中,当某些服务器的压力可能较高,       配置的超时时间又不确定是否可以满足高负荷响应。       记得要在GMS里面的shun也配置一样的情况。    -->    <FD timeout="10000" max_tries="5"   shun="true"/>       <!--     VERIFY_SUSPECT:发送消息以确保以前怀疑的成员已真正崩溃(crashed)     -->    <VERIFY_SUSPECT timeout="1500"  />          <BARRIER />       <!--     pbcast.NAKACK:保证消息的可靠性和顺序性     因为 基于底层的 UDP 协议,数据报不被可靠的传输。          其使用一个 NAK 的应答保证消息的可靠接收。     使用一个序号保证消息的顺序性          retransmit_timeout:类似 UNICAST 的 timeout     use_mcast_xmit:是否多播到cluser中    -->    <pbcast.NAKACK use_stats_for_retransmission="false"                   exponential_backoff="150"                   use_mcast_xmit="true" gc_lag="0"                   retransmit_timeout="50,300,600,1200"                   discard_delivered_msgs="true"/>                    <!--  UNICAST:实现可靠的单播传输。请求丢失消息的重新传输,并确保发出消息的正确排序    其通过 ACK 来进行确认。第一次超时时间为 300 ms . 第二次为 600 ms .... -->                      <UNICAST timeout="300,600,1200"/>       <!--     pbcast.STABLE:删除被全部member查看到的消息     (作用就是如果在分布式的情况下,其中某些member没有看到,     那么这条消息就不会被删除,会重发,保证消息广播的全局性)     pbcast.STABLE:实现分布式的垃圾收集协议 (也就是说,删除所有已被所有组成员接收到的消息)    -->    <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000" max_bytes="1000000"/>    <VIEW_SYNC avg_send_interval="60000"   />    <pbcast.GMS print_local_addr="true" join_timeout="3000" shun="true" view_bundling="true"/>       <!--    FlowControl流控制服务    用于调用 发送端的发送速率和接收端的接收速率达到平衡    -->    <FC max_credits="500000" min_threshold="0.20"/>          <!--     分块协议:FRAG和FRAG2用来将大消息分块发送    -->    <FRAG2 frag_size="60000"  />          <!--pbcast.STREAMING_STATE_TRANSFER /-->       <pbcast.STATE_TRANSFER  />    <!-- pbcast.FLUSH  /--></config>  数据传输及网络层协议   这个协议是协议栈中相当重要的部分,它来完成数据的发送和接收。   JMS   使用Java Message Service (JMS)实现的传输协议,此协议依赖JMS服务器来分发消息,JMS服务器将发消息发送到特定的Topic下,所有订阅此Topic的将收到消息。关于JMS请访问:http://java.sun.com/products/jms/   需要注意的是当使用JMS作为数据传输协议时应避免使用那些打开服务器连接的协议,比如FD_SOCKET。这JMS里FD比FD_SOCKET更合适。(关于FD和FD_SOCKET请看下面的介绍)   参数表如下: 500)this.width=500'>   TP   这是一个传输协议的抽象类,TCP和UDP都是从这里继承而来,主要将将它的一些配置,这些配置都可以用再UDP和TCP协议上。   参数表如下: 500)this.width=500'>   TCP,TCP_NIO   使用TCP实现的传输协议,创建一个ServerSocket来监听消息。对于每个连接(accept())都会创建一个线程来监听其消息,所有的外发的消息对于每个目标地址都使用独立的线程来发送,这些线程是复用的。使用ConectionTable来实现以上机制。NIO使用java.nio实现,关于NIO请访问:http://java.sun.com/j2se/1.4.2/docs/guide/nio/参数表如下: 500)this.width=500'> 对于TCP_NIO有如下附加参数:500)this.width=500'>   UDP,UDP_NIO   使用UDP实现的组播协议,分别使用multicast socket和unicast socket来实现点到多点以及点到点。   参数表如下: 500)this.width=500'>   TUNNEL   UDP的替代品,使用一个连接到Router的TCP连接来代替UDP,所有外发的消息都会先发送到Router,然后再有Router来分发所有连接到此组的TUNNEL上。   此协议可以用来穿透防火墙,防火墙外的组成员使用Router连接到其的TCP连接向防火墙内发送消息。   参数表如下:500)this.width=500'> ========================================================================== 绑定网卡的问题 如果没有指定bind_addr和use_local_host为false,使用本机的第一块网卡。 UDP.java  createSockets()   if(bind_addr == null && !use_local_host) {            bind_addr=Util.getFirstNonLoopbackAddress();        }        if(bind_addr == null)            bind_addr=InetAddress.getLocalHost();         if(bind_addr != null)            if(log.isDebugEnabled()) log.debug("sockets will use interface " + bind_addr.getHostAddress());  


阅读全文(5230) | 回复(0) | 编辑 | 精华
 



发表评论:
昵称:
密码:
主页:
标题:
验证码:  (不区分大小写,请仔细填写,输错需重写评论内容!)



站点首页 | 联系我们 | 博客注册 | 博客登陆

Sponsored By W3CHINA
W3CHINA Blog 0.8 Processed in 0.063 second(s), page refreshed 144758643 times.
《全国人大常委会关于维护互联网安全的决定》  《计算机信息网络国际联网安全保护管理办法》
苏ICP备05006046号