« | August 2025 | » | 日 | 一 | 二 | 三 | 四 | 五 | 六 | | | | | | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 | 12 | 13 | 14 | 15 | 16 | 17 | 18 | 19 | 20 | 21 | 22 | 23 | 24 | 25 | 26 | 27 | 28 | 29 | 30 | 31 | | | | | | | |
| 公告 |
戒除浮躁,读好书,交益友 |
Blog信息 |
blog名称:邢红瑞的blog 日志总数:523 评论数量:1142 留言数量:0 访问次数:9691313 建立时间:2004年12月20日 |

| |
[java语言]组播技术 和 JGroups(转 有修改) 文章收藏, 网上资源, 软件技术, 电脑与网络, 校园生活
邢红瑞 发表于 2006/7/21 14:40:15 |
IP数据包传输类型IPv4定义了3种IP数据包的传输:单播(unicast)广播(broadcast)组播(multicast).
比较一下unicast和multicast两种数据的传输方式可以发现,当一台主机向多个用户发送信息时,单播对于每一个用户都要发送一份数据的拷贝,而组播总共只需发送一份数据的拷贝。这样,组播的使用就大大的节省了带宽,减轻了网络的负载,从而更加有效的利用了网络的带宽资源.
IP组播和单播的目的地址不同,IP组播的目的地址是组地址.是从224.0.0.0到239.255.255.255之间的D类IP地址,其中224.0.0.0到224.0.0.255是被保留的地址,224.0.0.1表示子网中所有的组播组, 224.0.0.2表示子网中的所有路由器,224.0.0.5表示OSPF(Open Shortest Path First)路由器,224.0.0.6表示OSPF指定路由器,224.0.0.12表示DHCP服务器.
在D类地址的分配中,IETF建议遵循以下的原则: 全球范围:224.0.1.0~238.255.255.255; 有限范围:239.0.0.0~239.255.255.255; 本地站点范围:239.253.0.0~239.253.0.16本地机构范围:239.192.0.0~239.192.0.14.
Java 组播程序的例子
import java.io.IOException;import java.net.DatagramPacket;import java.net.InetAddress;import java.net.MulticastSocket;import java.net.NetworkInterface;
public class Learn2 { public static void main(String[] args) throws IOException { new MulticastServer().start(); new MulticastClient().start(); }
}
class MulticastServer extends Thread{ public void run(){ try{ MulticastSocket mcastSocket = new MulticastSocket(7777); InetAddress inetAddress = InetAddress.getByName("230.0.0.1"); mcastSocket.setNetworkInterface(NetworkInterface.getByInetAddress(InetAddress.getLocalHost())); mcastSocket.joinGroup(inetAddress); while (true) { byte [] arb = new byte [100]; DatagramPacket datagramPacket = new DatagramPacket(arb, arb.length); mcastSocket.receive(datagramPacket); System.out.println("Server:"+new String(arb,0,datagramPacket.getLength())); }
}catch(Exception dx){ dx.printStackTrace(); } }}
class MulticastClient extends Thread{ public void run(){ try{ while(true){ byte [] arb = new byte []{'h', 'e', 'l', 'l', 'o'}; InetAddress inetAddress = InetAddress.getByName("230.0.0.1"); DatagramPacket datagramPacket = new DatagramPacket(arb, arb.length, inetAddress, 7777); MulticastSocket multicastSocket = new MulticastSocket(); multicastSocket.send(datagramPacket); Thread.sleep(1000); } }catch(Exception dx){ dx.printStackTrace(); } } }使用jgroups的例子
import java.io.BufferedReader;import java.io.InputStreamReader;
import org.jgroups.Address;import org.jgroups.ChannelException;import org.jgroups.JChannel;import org.jgroups.Message;import org.jgroups.ReceiverAdapter;import org.jgroups.View;
public class Learn1 { private JChannel channel = null; public Learn1() throws ChannelException{ channel = new JChannel("c:/udp.xml"); } public static void main(String[] args) throws ChannelException { Learn1 learn = new Learn1(); learn.startup(); new SendMessage(learn.channel).start(); } private void startup() throws ChannelException{ channel.setReceiver(new ReceiverMessage()); //channel.setOpt(option, value); channel.connect("FL"); }}
class ReceiverMessage extends ReceiverAdapter{
public void receive(Message msg) { System.out.println(msg.getSrc() + ": " + msg.getObject()+" at "+System.currentTimeMillis()); } public void viewAccepted(View new_view) { System.out.println("viewAccepted: " + new_view); } public void suspect(Address suspected_mbr) { System.out.println("suspect: " + suspected_mbr); } }
class SendMessage extends Thread{ private JChannel channel; public SendMessage(JChannel channel){ this.channel = channel; } public void run(){ BufferedReader in=new BufferedReader(new InputStreamReader(System.in)); while(true) { try{ System.out.print("> "); System.out.flush(); String line=in.readLine().toLowerCase(); if(line.startsWith("quit") || line.startsWith("exit")) { System.exit(0); } Message msg=new Message(null, null, line); channel.send(msg); }catch(Exception dx){ dx.printStackTrace(); } } } }
使用的 udp.xml 及相关 参数说明
<!-- Default stack using IP multicasting. It is similar to the "udp" stack in stacks.xml, but doesn't use streaming state transfer and flushing author: Bela Ban version: $Id: udp.xml,v 1.24.2.1 2007/11/20 08:53:40 belaban Exp $-->
<config>
<!-- UDP 协议设置 ip_mcast:指定是否使用ip 多播 , default true 也可以通过 TCP 连接发送多个单播消息到成员,TCP也可以发送多播消息,但是这不是标准。 mcast_addr: 多播地址 缺省:228.8.8.8; mcast_port: 多播端口 缺省:7600 ucast_recv_buf_size="20000000" //单播接收缓冲区大小 缺省:64000 ucast_send_buf_size="640000" //单播发送缓冲区大小 缺省:32000 mcast_recv_buf_size="25000000" //组播接收缓冲区大小 缺省:64000 mcast_send_buf_size="640000" //组播发送缓冲区大小 缺省:32000 loopback :如果为true, 单播直接给自己。组播则同样直接给自己处理,并且之后组播。 当 enable_bundling 为 true 并将max_bundle_timeout设置一个足够大的值会体现出差别 缺省:false discard_incompatible_packets: 丟弃不同版本的数据包 缺省:false enable_bundling:指定是否能够捆绑消息。如果为真,则累积到max_bundle_size字节,或者max_bundle_time超时。再将其发出。缺省:false max_bundle_size:字节数,当消息累积字节数。当累积值超过该值则消息会被发送。缺省:65535 max_bundle_timeout:消息发送的超时毫秒数。队列中等待发送的消息超过该时间则被发送. 缺省:20 use_incoming_packet_handler:当接收消息过载时是否增加新的线程进行处理。 缺省:true ip_ttl:TTL 为0,则报文只能 本机上使用。 为1 则只能在本地网络上使用。路由器会将其抛弃,标准TTL经过一个路由器,减1,所以不会传递到其它网络。文档缺省:32,代码是64 enable_diagnostics:是否打开内部检测。(没有找到相关说明,从代码上推测出来的) 缺省:true thread_naming_pattern: 线程命名方式。 有效值可以是:pcl. 缺省:cl p:之前的名称 c:clusterName 聚簇名称 l:localaddress地址 use_concurrent_stack:是否使用并发栈。 thread_pool: oob_thread_pool: use_packet_handler:
--> <UDP ip_mcast="true" mcast_addr="${jgroups.udp.mcast_addr:228.10.10.10}" mcast_port="${jgroups.udp.mcast_port:45588}" ucast_recv_buf_size="20000000" ucast_send_buf_size="640000" mcast_recv_buf_size="25000000" mcast_send_buf_size="640000" loopback="false" discard_incompatible_packets="true" enable_bundling="true" max_bundle_size="64000" max_bundle_timeout="3000" use_incoming_packet_handler="true" ip_ttl="${jgroups.udp.ip_ttl:2}" enable_diagnostics="false" thread_naming_pattern="cl"
use_concurrent_stack="true"
thread_pool.enabled="true" thread_pool.min_threads="2" thread_pool.max_threads="8" thread_pool.keep_alive_time="5000" thread_pool.queue_enabled="true" thread_pool.queue_max_size="1000" thread_pool.rejection_policy="Run"
oob_thread_pool.enabled="true" oob_thread_pool.min_threads="1" oob_thread_pool.max_threads="8" oob_thread_pool.keep_alive_time="5000" oob_thread_pool.queue_enabled="false" oob_thread_pool.queue_max_size="100" oob_thread_pool.rejection_policy="Run" />
<!-- * PING 协议 * timeout : 等待初始化成员超时时间(ms) 。缺省:3000 * property: 为FIND_INITAL_MBRS 找到的最小初始化成员数。缺省:2 * property: gossip_host - if you are using GOSSIP then this defines the host of the GossipRouter, default is null * property: gossip_port - if you are using GOSSIP then this defines the port of the GossipRouter, default is null --> <PING timeout="2000" num_initial_members="3"/> <!-- 当网络发生故障。可能将group 分隔开。 该协议用于将 被分隔开的 group 进行重新组合 --> <MERGE2 max_interval="30000" min_interval="10000"/> <!-- FD_SOCK:基于 TCP 套接字的故障检测。基于环 的 ping 被在邻居成员之间发送。 --> <FD_SOCK/> <!-- FD(Failure Detection 故障检测) heartbeat 方式 的作用就是探测组内的成员是否还活着。 当组内的成员被怀疑可能死掉了, 那么SUSPECT消息就会传播到集群的每一个节点上 timeout:等影响消息的 ms 数 max_tries:最大重试次数 shun:例如:一个组内有a,b,c,d四个成员, 当d由于负荷过高没有在timeout的时间内作出响应, 导致被踢出组的时候,a,b,c的组员view中只有abc三个member, 而d的view却还有abcd四个成员,此时如果D再发消息给组内的成员, 组内成员将会拒绝接受。那么如果设置shun为true的时候, D就重新加入组内在下面两种情况: 1.ABC接受到了D发过来的are-you-alive的检测消息。 2.D自己收到了一个view消息,view内不包含D。(类似于重连机制)。 这点很重要特别是在分布式的环境中,当某些服务器的压力可能较高, 配置的超时时间又不确定是否可以满足高负荷响应。 记得要在GMS里面的shun也配置一样的情况。 --> <FD timeout="10000" max_tries="5" shun="true"/> <!-- VERIFY_SUSPECT:发送消息以确保以前怀疑的成员已真正崩溃(crashed) --> <VERIFY_SUSPECT timeout="1500" /> <BARRIER /> <!-- pbcast.NAKACK:保证消息的可靠性和顺序性 因为 基于底层的 UDP 协议,数据报不被可靠的传输。 其使用一个 NAK 的应答保证消息的可靠接收。 使用一个序号保证消息的顺序性 retransmit_timeout:类似 UNICAST 的 timeout use_mcast_xmit:是否多播到cluser中 --> <pbcast.NAKACK use_stats_for_retransmission="false" exponential_backoff="150" use_mcast_xmit="true" gc_lag="0" retransmit_timeout="50,300,600,1200" discard_delivered_msgs="true"/> <!-- UNICAST:实现可靠的单播传输。请求丢失消息的重新传输,并确保发出消息的正确排序 其通过 ACK 来进行确认。第一次超时时间为 300 ms . 第二次为 600 ms .... --> <UNICAST timeout="300,600,1200"/> <!-- pbcast.STABLE:删除被全部member查看到的消息 (作用就是如果在分布式的情况下,其中某些member没有看到, 那么这条消息就不会被删除,会重发,保证消息广播的全局性) pbcast.STABLE:实现分布式的垃圾收集协议 (也就是说,删除所有已被所有组成员接收到的消息) --> <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000" max_bytes="1000000"/> <VIEW_SYNC avg_send_interval="60000" /> <pbcast.GMS print_local_addr="true" join_timeout="3000" shun="true" view_bundling="true"/> <!-- FlowControl流控制服务 用于调用 发送端的发送速率和接收端的接收速率达到平衡 --> <FC max_credits="500000" min_threshold="0.20"/> <!-- 分块协议:FRAG和FRAG2用来将大消息分块发送 --> <FRAG2 frag_size="60000" /> <!--pbcast.STREAMING_STATE_TRANSFER /--> <pbcast.STATE_TRANSFER /> <!-- pbcast.FLUSH /--></config>
数据传输及网络层协议
这个协议是协议栈中相当重要的部分,它来完成数据的发送和接收。
JMS
使用Java Message Service (JMS)实现的传输协议,此协议依赖JMS服务器来分发消息,JMS服务器将发消息发送到特定的Topic下,所有订阅此Topic的将收到消息。关于JMS请访问:http://java.sun.com/products/jms/
需要注意的是当使用JMS作为数据传输协议时应避免使用那些打开服务器连接的协议,比如FD_SOCKET。这JMS里FD比FD_SOCKET更合适。(关于FD和FD_SOCKET请看下面的介绍)
参数表如下:
500)this.width=500'>
TP
这是一个传输协议的抽象类,TCP和UDP都是从这里继承而来,主要将将它的一些配置,这些配置都可以用再UDP和TCP协议上。
参数表如下:
500)this.width=500'>
TCP,TCP_NIO
使用TCP实现的传输协议,创建一个ServerSocket来监听消息。对于每个连接(accept())都会创建一个线程来监听其消息,所有的外发的消息对于每个目标地址都使用独立的线程来发送,这些线程是复用的。使用ConectionTable来实现以上机制。NIO使用java.nio实现,关于NIO请访问:http://java.sun.com/j2se/1.4.2/docs/guide/nio/参数表如下:
500)this.width=500'>
对于TCP_NIO有如下附加参数:500)this.width=500'>
UDP,UDP_NIO
使用UDP实现的组播协议,分别使用multicast socket和unicast socket来实现点到多点以及点到点。
参数表如下:
500)this.width=500'>
TUNNEL
UDP的替代品,使用一个连接到Router的TCP连接来代替UDP,所有外发的消息都会先发送到Router,然后再有Router来分发所有连接到此组的TUNNEL上。
此协议可以用来穿透防火墙,防火墙外的组成员使用Router连接到其的TCP连接向防火墙内发送消息。
参数表如下:500)this.width=500'>
==========================================================================
绑定网卡的问题
如果没有指定bind_addr和use_local_host为false,使用本机的第一块网卡。
UDP.java createSockets()
if(bind_addr == null && !use_local_host) { bind_addr=Util.getFirstNonLoopbackAddress(); } if(bind_addr == null) bind_addr=InetAddress.getLocalHost();
if(bind_addr != null) if(log.isDebugEnabled()) log.debug("sockets will use interface " + bind_addr.getHostAddress());
|
|
|