1 /**
2 * Copyright 2003-2006 Greg Luck
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package net.sf.ehcache.distribution;
18
19 import net.sf.ehcache.CacheManager;
20 import org.apache.commons.logging.Log;
21 import org.apache.commons.logging.LogFactory;
22
23 import java.io.IOException;
24 import java.net.DatagramPacket;
25 import java.net.InetAddress;
26 import java.net.MulticastSocket;
27 import java.rmi.RemoteException;
28 import java.util.StringTokenizer;
29
30 /**
31 * Receives heartbeats from any {@link MulticastKeepaliveHeartbeatSender}s out there.
32 * <p/>
33 * Our own multicast heartbeats are ignored.
34 *
35 * @author Greg Luck
36 * @version $Id: MulticastKeepaliveHeartbeatReceiver.java 52 2006-04-24 14:50:03Z gregluck $
37 */
38 public final class MulticastKeepaliveHeartbeatReceiver {
39
40 private static final Log LOG = LogFactory.getLog(MulticastKeepaliveHeartbeatReceiver.class.getName());
41
42 private final InetAddress groupMulticastAddress;
43 private final Integer groupMulticastPort;
44 private MulticastReceiverThread receiverThread;
45 private MulticastSocket socket;
46 private boolean stopped;
47 private final MulticastRMICacheManagerPeerProvider peerProvider;
48
49 /**
50 * Constructor.
51 *
52 * @param peerProvider
53 * @param multicastAddress
54 * @param multicastPort
55 */
56 public MulticastKeepaliveHeartbeatReceiver(
57 MulticastRMICacheManagerPeerProvider peerProvider, InetAddress multicastAddress, Integer multicastPort) {
58 this.peerProvider = peerProvider;
59 this.groupMulticastAddress = multicastAddress;
60 this.groupMulticastPort = multicastPort;
61 }
62
63 /**
64 * Start.
65 * @throws IOException
66 */
67 final void init() throws IOException {
68
69 socket = new MulticastSocket(groupMulticastPort.intValue());
70 socket.joinGroup(groupMulticastAddress);
71 receiverThread = new MulticastReceiverThread();
72 receiverThread.start();
73 }
74
75 /**
76 * Shutdown the heartbeat.
77 */
78 public final void dispose() {
79 stopped = true;
80 receiverThread.interrupt();
81 }
82
83
84 /**
85 * A multicast receiver which continously receives heartbeats.
86 */
87 private final class MulticastReceiverThread extends Thread {
88
89
90
91 public final void run() {
92 byte[] buf = new byte[PayloadUtil.MTU];
93 while (!stopped) {
94 DatagramPacket packet = new DatagramPacket(buf, buf.length);
95 try {
96 socket.receive(packet);
97 byte[] payload = packet.getData();
98 processPayload(payload);
99
100
101 } catch (IOException e) {
102 if (!stopped) {
103 LOG.error("Error receiving heartbeat. " + e.getMessage() + ". Initial cause was " + e.getMessage(), e);
104 }
105 }
106 }
107 }
108
109 private void processPayload(byte[] compressedPayload) {
110 byte[] payload = PayloadUtil.ungzip(compressedPayload);
111 String rmiUrls = new String(payload);
112 if (self(rmiUrls)) {
113 return;
114 }
115 rmiUrls = rmiUrls.trim();
116 if (LOG.isTraceEnabled()) {
117 LOG.trace("rmiUrls received " + rmiUrls);
118 }
119 for (StringTokenizer stringTokenizer = new StringTokenizer(rmiUrls,
120 PayloadUtil.URL_DELIMITER); stringTokenizer.hasMoreTokens();) {
121 String rmiUrl = stringTokenizer.nextToken();
122 registerNotification(rmiUrl);
123 }
124 }
125
126
127 /**
128 * @param rmiUrls
129 * @return true if our own hostname and listener port are found in the list. This then means we have
130 * caught our onw multicast, and should be ignored.
131 */
132 private boolean self(String rmiUrls) {
133 CacheManager cacheManager = peerProvider.getCacheManager();
134 CachePeer peer = (CachePeer) cacheManager.getCachePeerListener().getBoundCachePeers().get(0);
135 String cacheManagerUrlBase = null;
136 try {
137 cacheManagerUrlBase = peer.getUrlBase();
138 } catch (RemoteException e) {
139 LOG.error("Error geting url base");
140 }
141 int baseUrlMatch = rmiUrls.indexOf(cacheManagerUrlBase);
142 return baseUrlMatch != -1;
143 }
144
145 private void registerNotification(String rmiUrl) {
146 peerProvider.registerPeer(rmiUrl);
147 }
148
149
150 /**
151 * {@inheritDoc}
152 */
153 public final void interrupt() {
154 try {
155 socket.leaveGroup(groupMulticastAddress);
156 } catch (IOException e) {
157 LOG.error("Error leaving group");
158 }
159 socket.close();
160 super.interrupt();
161 }
162 }
163
164
165 }