
9.3. A Basic Collaborative Infrastructure

Before we explore some collaborative applications, let's take a look at a basic collaborative infrastructure implemented using some of the concepts presented earlier in this book. We'll build on this infrastructure to illustrate the development of various types of collaborative systems. The framework that we'll build involves a single mediator (the server) handling interactions among multiple collaborators (clients). Each collaborator has a unique identity, issued by the mediator, and each collaborator can either broadcast messages to all of the collaborators registered with the mediator, or it can send a message to a single collaborator.

One of the first steps in developing a collaborative system is deciding what kind of communications scheme is right for you. We've discussed several ways to connect remote agents, including basic socket communications, message passing, RMI remote objects, and CORBA remote objects. To illustrate our basic collaborative system, we'll show a version based on RMI and remote objects, and another version based on basic message passing. We'll start with the message-passing version, since this will let us look in detail at some of the dynamics of a distributed system involving many agents working together simultaneously. Then we'll look at ways to implement the same collaborative infrastructure using remote objects, where the object distribution system assumes responsibility for some of the issues we'll see in our message-passing version. If you're only interested in the remote-object version, you can skip ahead to Section 9.3.2, "Collaborating with RMI".

9.3.1. Building the Infrastructure with Message Passing

To build a collaborative message-passing system, we'll start by expanding our message-passing classes from Chapter 6, "Message-Passing Systems," to handle multiple agents passing messages through a single MessageHandler. In that chapter, the final version of our message-passing framework (see Example 6-10 and Example 6-11) used MessageHandlers passing Messages to each other. Each Message object has an identifier and a set of arguments. Each MessageHandler runs in a loop, reading messages from the network, constructing Message objects from the data received, and calling the Message's Do() method to handle the message locally. The MessageHandler reconstructs the Message from the incoming network data using a list of prototype Message objects. The identifier for the message is used to pick the right Message prototype, a copy of this Message is made, and the new copy is told to read its arguments from the input data stream. The set of Message prototypes serves to define the message protocol that the MessageHandler understands, and can be updated on the fly if needed.
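The prototype lookup described above can be sketched in a few lines. This is a minimal, self-contained illustration rather than the Chapter 6 code: the Msg and ChatMsg classes are hypothetical stand-ins for Message and its subclasses, and the wire format (a UTF string identifier, an argument count, then UTF string arguments) is an assumption made for the example.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

class PrototypeSketch {
  // Hypothetical stand-in for the book's Message class.
  static class Msg {
    String id;
    final List<String> args = new ArrayList<>();
    boolean handles(String msgId) { return false; }
    Msg newCopy() { return new Msg(); }
    // Assumed wire format: an argument count followed by UTF strings.
    void readArgs(DataInputStream in) throws IOException {
      int n = in.readInt();
      for (int i = 0; i < n; i++) args.add(in.readUTF());
    }
  }

  // A prototype that claims messages whose identifier is "chat".
  static class ChatMsg extends Msg {
    @Override boolean handles(String msgId) { return "chat".equals(msgId); }
    @Override Msg newCopy() { return new ChatMsg(); }
  }

  final List<Msg> prototypes = new ArrayList<>();

  // Read the identifier, find a matching prototype, copy it,
  // and let the copy read its own arguments.
  Msg read(DataInputStream in) throws IOException {
    String msgId = in.readUTF();
    for (Msg proto : prototypes) {
      if (proto.handles(msgId)) {
        Msg copy = proto.newCopy();
        copy.id = msgId;
        copy.readArgs(in);
        return copy;
      }
    }
    return null;  // No prototype understands this identifier.
  }

  // Helper that encodes a message in the assumed wire format.
  static byte[] encode(String msgId, String... args) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    out.writeUTF(msgId);
    out.writeInt(args.length);
    for (String a : args) out.writeUTF(a);
    out.flush();
    return bytes.toByteArray();
  }
}
```

Note that when no prototype matches, this sketch leaves the message's arguments unread on the stream. A real protocol would need some way to skip them, such as a length prefix on each message.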

If we look back at Figure 9-1 and assume that the general collaborative system depicted there is implemented using message passing, we'll see that a server or an agent in a collaborative environment may have to send messages to, or receive messages from, multiple remote agents. Our MessageHandler class from Example 6-10 only supports point-to-point message passing, so if we wanted to use it for a collaborative system we would either need to create a MessageHandler object for each agent we want to talk to, or we could upgrade the MessageHandler to manage multiple network connections to agents. In some applications, we may have tens, hundreds, or even thousands of agents collaborating with each other, so creating an entire MessageHandler object for each (in addition to a socket connection and its input and output streams) may be too inefficient. Also, in many collaborative applications we need to use the same message protocol with every agent in the system, so forcing ourselves to replicate the same "flavor" of MessageHandler many times over doesn't seem to make sense.

Example 9-1 shows our updated MessageHandler class, with support for multiple agent connections. Two utility classes have been added to help manage agent connections. The AgentConnection class simply holds a pair of input and output streams connected to a remote agent. The AgentHandler class takes care of listening to a particular agent for asynchronous messages. We'll see exactly how the AgentHandler is used as we look at the rest of the updated MessageHandler class.

Example 9-1. Multi-Agent Message Handler Class

package dcj.util.Collaborative;

import java.util.Vector;
import java.util.Hashtable;
import java.util.Enumeration;
import java.net.SocketException;
import java.io.*;

class AgentConnection {
  public AgentConnection(InputStream i, OutputStream o) {
    in = i;
    out = o;
  }
  public InputStream in;
  public OutputStream out;
}

class AgentHandler implements Runnable {
  int agentId;
  MessageHandler handler;
  public AgentHandler(int id, MessageHandler h) {
    agentId = id;
    handler = h;
  }

  public void run() {
    System.out.println("ah: Starting agent handler for agent " + agentId);
    // Loop until removeAgent() interrupts this thread.
    while (!Thread.currentThread().isInterrupted()) {
      try {
        Message m = handler.readMsg(agentId);
        // readMsg() returns null if no message could be read.
        if (m != null) {
          System.out.println("ah: Got a message from agent " + agentId);
          m.Do();
        }
      }
      catch (IOException e) {}
    }
  }
}

public class MessageHandler implements Runnable
{
  // A global MessageHandler, for applications where one central
  // handler is used.
  public static MessageHandler current = null;

  Hashtable connections = new Hashtable();
  Hashtable handlers = new Hashtable();
  Vector msgPrototypes = new Vector();

  public MessageHandler() {}
  public MessageHandler(InputStream in, OutputStream out) {
    addAgent(0, in, out);
  }

  synchronized public int nextAgentId() {
    return connections.size();
  }

  synchronized public Vector getAgentIds() {
    Vector ids = new Vector();
    Enumeration e = connections.keys();
    while (e.hasMoreElements()) {
      ids.addElement((Integer)e.nextElement());
    }
    return ids;
  }

  synchronized public int addAgent(InputStream i, OutputStream o) {
    int nextId = nextAgentId();
    addAgent(nextId, i, o);
    return nextId;
  }

  synchronized public void addAgent(int id, InputStream i, OutputStream o) {
    connections.put(Integer.valueOf(id), new AgentConnection(i, o));
    AgentHandler ah = new AgentHandler(id, this);
    Thread ahThread = new Thread(ah);
    ahThread.start();
    handlers.put(Integer.valueOf(id), ahThread);
  }

  synchronized public boolean removeAgent(int id) {
    boolean success = false;
    Thread hthread = (Thread)handlers.remove(Integer.valueOf(id));
    if (hthread != null && connections.remove(Integer.valueOf(id)) != null) {
      // Thread.stop() is deprecated and unsafe, so ask the handler
      // thread to leave its loop instead.
      hthread.interrupt();
      success = true;
    }

    return success;
  }

  synchronized protected AgentConnection getAgent(int id) {
    return (AgentConnection)connections.get(Integer.valueOf(id));
  }

  public void addMessageType(Message prototype) {
    msgPrototypes.addElement(prototype);
  }

  // Read the next message from a specific agent. Returns null if the
  // agent is unknown or the message could not be read.
  public Message readMsg(int id) throws IOException {
    Message msg = null;

    AgentConnection conn = getAgent(id);
    if (conn != null) {
      try {
        synchronized (conn.in) {
          DataInputStream din = new DataInputStream(conn.in);
          String msgId = din.readUTF();
          System.out.println("mh: Got message id " + msgId);
          msg = buildMessage(msgId);
          if (msg != null) {
            msg.readArgs(conn.in);
          }
          System.out.println("mh: Received complete message " + msg + ".");
        }
      }
      catch (SocketException | EOFException s) {
        // The connection failed or the stream ended: drop the agent.
        System.out.println("mh: Lost connection to agent " + id);
        removeAgent(id);
        msg = null;
      }
      catch (Exception e) {
        msg = null;
      }
    }

    return msg;
  }

  // Send a message to a specific agent.
  public boolean sendMsg(Message msg, int id) throws IOException {
    boolean success = false;
    AgentConnection conn = getAgent(id);
    if (conn != null) {
      System.out.println("mh: Trying to lock on agent " + id);
      try {
        synchronized (conn.out) {
          System.out.println("mh: Got lock on agent " + id);
          DataOutputStream dout = new DataOutputStream(conn.out);
          System.out.println("mh: Printing message id...");
          dout.writeUTF(msg.messageID());
          System.out.println("mh: Printing message args...");
          msg.writeArgs(conn.out);
          success = true;
        }
      }
      catch (SocketException s) {
        System.out.println("mh: Lost connection to agent " + id);
        removeAgent(id);
        success = false;
      }
      catch (Exception e) {
        success = false;
      }
    }
    return success;
  }

  // Broadcast a message to all connected agents.
  public boolean sendMsg(Message msg) throws IOException {
    // Iterate over a snapshot of the IDs, since a failed send
    // removes the agent from the connection table.
    Enumeration ids = getAgentIds().elements();
    boolean success = true;
    while (ids.hasMoreElements()) {
      Integer id = (Integer)ids.nextElement();
      System.out.println("mh: Attempting send to agent " + id.intValue());
      if (!sendMsg(msg, id.intValue()))
        success = false;
      else
        System.out.println("mh: Sent message to agent " + id.intValue());
    }
    return success;
  }

  // Default run() method does nothing...
  public void run() {}

  // Find the prototype that handles this message ID and return a copy.
  protected Message buildMessage(String msgId) {
    Message msg = null;
    int numMTypes = msgPrototypes.size();
    for (int i = 0; i < numMTypes; i++) {
      Message m = null;
      synchronized (msgPrototypes) {
        m = (Message)msgPrototypes.elementAt(i);
      }
      if (m.handles(msgId)) {
        msg = m.newCopy();
        msg.setId(msgId);
        break;
      }
    }
    return msg;
  }
}

The updated MessageHandler maintains a table of agent connections, associating each connection with an ID number. A set of methods for adding, removing, and getting agent connections has been added to the MessageHandler interface. Two addAgent() methods are provided: the first takes the InputStream and OutputStream connections to the agent as arguments, and assigns the next available ID to the new agent connection; the second additionally accepts an ID number that the caller wants assigned to the agent. The removeAgent() method removes the agent with a given ID number. The getAgent() method is protected, and is used internally by the MessageHandler class to get the AgentConnection associated with a particular agent.
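One detail worth watching in Example 9-1 is that the next available ID is taken from the current size of the connection table. If an agent in the middle of the range is removed, the size can equal an ID that is still in use. The sketch below is not part of the book's code; the IdAllocator class and its method names are hypothetical, and it simply contrasts the size-based scheme with a counter that never reuses an ID.

```java
import java.util.Hashtable;
import java.util.concurrent.atomic.AtomicInteger;

class IdAllocator {
  private final Hashtable<Integer, String> connections = new Hashtable<>();
  private final AtomicInteger counter = new AtomicInteger(0);

  // Size-based scheme, as in Example 9-1: the next ID is the table size.
  synchronized int nextIdBySize() { return connections.size(); }

  // Counter-based alternative: IDs are handed out once and never reused.
  int nextIdByCounter() { return counter.getAndIncrement(); }

  // Refuse to register an ID that is already in the table.
  synchronized boolean register(int id, String label) {
    if (connections.containsKey(id)) return false;
    connections.put(id, label);
    return true;
  }

  synchronized boolean remove(int id) { return connections.remove(id) != null; }
}
```

With agents 0, 1, and 2 registered by size, removing agent 1 leaves two entries, so the size-based scheme proposes ID 2 again even though agent 2 is still connected. The counter-based scheme would hand out 3.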

We've also updated the MessageHandler class by changing its readMsg() and sendMsg() methods so that we can specify which agent to talk to. The readMsg() method now accepts the ID number of the agent from which to read a message. There are now two versions of the sendMsg() method. One accepts an ID number, and sends the given message to that agent. The other version just takes a Message as an argument, and broadcasts the message to all agents the MessageHandler is connected to.

When a new agent is added to the MessageHandler using one of the addAgent() methods, an AgentConnection is made to hold the InputStream and OutputStream connected to the agent, and the connection is stored in a Hashtable using the agent's ID number as the key. Next, an AgentHandler is created and given the ID number of the new agent, along with a reference to the MessageHandler. Then a new thread is created for the AgentHandler, and the new thread is started. The AgentHandler implements the Runnable interface, and its run() method is a loop that continuously attempts to read messages from its agent, using the readMsg() method on the MessageHandler:

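  public void run() {
    System.out.println("ah: Starting agent handler for agent " + agentId);
    while (!Thread.currentThread().isInterrupted()) {
      try {
        Message m = handler.readMsg(agentId);
        // readMsg() returns null if no message could be read.
        if (m != null) {
          m.Do();
        }
      }
      catch (IOException e) {}
    }
  }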

So this new and improved MessageHandler manages multiple connections by creating a thread for each agent that can asynchronously read messages and act on them. New agents can be added to the handler at any time. To support these asynchronous operations, the MessageHandler implementation has been synchronized in a number of places. The readMsg() and sendMsg() methods synchronize on the input and output streams of each agent, for example. All of the methods for adding and removing agents from the MessageHandler are also synchronized to allow asynchronous agent handling.
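To see why the per-stream locking matters, consider the following minimal sketch. It is not the MessageHandler itself: the FrameRouter class is hypothetical, and each message is reduced to two UTF strings (an identifier and a body) as an assumed wire format. The point is that the lock is held for the whole message, so two threads writing to the same agent can never interleave their output.

```java
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class FrameRouter {
  private final Map<Integer, OutputStream> outputs = new ConcurrentHashMap<>();

  void add(int id, OutputStream out) { outputs.put(id, out); }

  // Send one message (identifier plus body) to a single agent.
  boolean sendTo(int id, String msgId, String body) {
    OutputStream out = outputs.get(id);
    if (out == null) return false;
    // Lock the stream so that messages written by different threads
    // to the same agent are never interleaved.
    synchronized (out) {
      try {
        DataOutputStream dout = new DataOutputStream(out);
        dout.writeUTF(msgId);
        dout.writeUTF(body);
        dout.flush();
        return true;
      } catch (IOException e) {
        outputs.remove(id);  // Treat a failed write as a lost connection.
        return false;
      }
    }
  }

  // Broadcast by sending to each agent in turn; report overall success.
  boolean broadcast(String msgId, String body) {
    boolean success = true;
    for (Integer id : outputs.keySet()) {
      if (!sendTo(id, msgId, body)) success = false;
    }
    return success;
  }
}
```

Locking each stream separately, rather than the router as a whole, means that a slow write to one agent does not block writes to the others from different threads.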

Now that we have a MessageHandler that can support message passing in a collaborative environment, let's build our collaborative infrastructure on top of it. We can think of collaborative systems as being composed of collaborators and mediators. Collaborators are the agents that work together towards the common goal of the system, and mediators serve to facilitate the communications among the collaborators. Referring back to Figure 9-1, collaborators are the agents in the system, and mediators are the servers.

Before we delve into the details, note that one of the primary needs of a collaborative system is an identity for each agent in the system, so that transactions can be targeted at, and traced to, individual agents. To support this, we have the Identity class shown in Example 9-2. This class consists of a property table (a Hashtable), with methods for getting and setting a name property, which is a String, and for getting an ID property, which is an integer set when the Identity is constructed. These two properties will be used to identify each collaborator in the system. The name property is a descriptive name that can be used to specify each collaborator in a user interface, for example. The integer ID is an internal identifier used to tag each collaborator uniquely.

Example 9-2. An Identity Class

package dcj.util.Collaborative;

import java.util.Hashtable;
import java.io.Serializable;

public class Identity implements Serializable {
  Hashtable props = new Hashtable();

  public Identity(int id) { props.put("idnum", new Integer(id)); }
  
  public boolean equals(Object o) {
    boolean same = false;
    if (o != null && o.getClass() == this.getClass()) {
      Identity oi = (Identity)o;
      if (oi == this || 
           (oi.getId() == this.getId() &&
             ((oi.getName() == null && this.getName() == null) ||
              (oi.getName() != null && this.getName() != null &&
               oi.getName().compareTo(this.getName()) == 0)))) {
        same = true;
      }
    }
    return same;
  }

  // Keep hashCode() consistent with equals(): equal Identities
  // always share the same ID number.
  public int hashCode() {
    return getId();
  }

  public int    getId() {
    Integer idNum = (Integer)props.get("idnum");
    return idNum.intValue();
  }

  public String getName() { return (String)props.get("name"); }
  public void   setName(String n) { props.put("name", n); }

  public Object getProperty(Object key) {
    return props.get(key);
  }
  public void   setProperty(Object key, Object val) {
    props.put(key, val);
  }
}

Additional properties can be added to the property lists of the collaborators to further define the identity of an agent in a system, or to hold state information related to the collaborative application. We've made the Identity class implement the Serializable interface, so that Identity objects can be passed back and forth between agents on the network to tag destination and source agents for transactions. This will come in handy whether your system uses message passing with ObjectInputStreams and ObjectOutputStreams, or RMI with Identity object arguments to remote methods. Since this class is meant to be serializable, however, any object added to the Identity as a property value or key, using the setProperty() method, also needs to be serializable; all objects that the Identity object references at the time that it is serialized and sent over the network will also be serialized and transmitted.
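The serialization requirement is easy to check for yourself. The sketch below uses a trimmed copy of the Identity class, nested inside a hypothetical IdentityRoundTrip class so that the example is self-contained, and a byte array as a stand-in for the network. It writes an Identity with an ObjectOutputStream and reads it back with an ObjectInputStream.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Hashtable;

class IdentityRoundTrip {
  // Trimmed copy of the Identity class from Example 9-2.
  static class Identity implements Serializable {
    private static final long serialVersionUID = 1L;
    private final Hashtable<Object, Object> props = new Hashtable<>();
    Identity(int id) { props.put("idnum", Integer.valueOf(id)); }
    int getId() { return (Integer) props.get("idnum"); }
    String getName() { return (String) props.get("name"); }
    void setName(String n) { props.put("name", n); }
    Object getProperty(Object key) { return props.get(key); }
    void setProperty(Object key, Object val) { props.put(key, val); }
  }

  // Serialize to a byte array and read the copy back.
  static Identity roundTrip(Identity original)
      throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(original);
    }
    try (ObjectInputStream in = new ObjectInputStream(
             new ByteArrayInputStream(bytes.toByteArray()))) {
      return (Identity) in.readObject();
    }
  }
}
```

An Identity holding only its ID and a name survives the round trip intact. If a property value that is not serializable is added with setProperty(), such as a plain java.lang.Object, the call to writeObject() fails with a java.io.NotSerializableException.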

A collaborator has some pretty basic functional requirements. It needs to have a unique identifier in the system, so that messages can be routed to it. It needs to be able to connect to mediators, or to other collaborators, to engage in communication with them. Finally, it needs to be able to send messages and to be notified of incoming messages. Example 9-3 shows a Collaborator interface that includes these abilities.

Example 9-3. A Collaborator Interface

package dcj.util.Collaborative;

import java.util.Properties;
import java.io.IOException;
import java.rmi.RemoteException;

public interface Collaborator {
  public Identity getIdentity();

  // Connect to a mediator - subclasses dictate properties needed
  public boolean connect(Properties p);

  // Outgoing messages/data
  public boolean send(String tag, String msg, Identity dst)
                 throws IOException;
  public boolean send(String tag, Object data, Identity dst)
                 throws IOException;
  public boolean broadcast(String tag, String msg)
                 throws IOException;
  public boolean broadcast(String tag, Object data)
                 throws IOException;

  // Incoming messages/data
  public boolean notify(String tag, String msg, Identity src)
                 throws IOException;
  public boolean notify(String tag, Object data, Identity src)
                 throws IOException;
}

The getIdentity() method returns an Identity for the Collaborator. This Identity may be given to the Collaborator when it connects to a mediator. The connect() method opens a connection to a remote collaborator or mediator. The values in the Properties argument to connect() specify how to locate the agent on the network. Some collaborators may only require a hostname and port number, others may need more information, like a registered name for the agent on an RMI server.

The Collaborator interface supports sending messages with its send() and broadcast() methods. These methods accept a message in the form of a tag or label string, which says what kind of message it is for the receiver, and the message itself, which is either a String or a generic Object. The send() methods also accept an Identity object, which specifies whom to send the message to. So a Collaborator sends a message to individual agents on the network using the send() methods, and it broadcasts messages to all of the agents using the broadcast() methods.

A Collaborator receives messages through its notify() methods. There are versions of notify() that accept a String or an Object as the body of the message. The notify() methods also have an Identity argument that specifies who sent the message. If the sender is unknown, the Identity argument will be null. When it is notified of the message, the Collaborator can react as needed, by adding the data in the message to a database, updating its display, or responding to the sender of the message.

A mediator has an equally simple set of tasks. It needs to be able to register new collaborators by providing them with unique identifiers, send messages to individual collaborators, and broadcast messages to all collaborators that it has registered. Example 9-4 shows a Mediator interface that supports these things. The newMember() method generates a unique identifier for a new collaborator. The removeMember() method removes the given collaborator from the Mediator's registry. The send() and broadcast() methods are analogous to the same methods on the Collaborator interface. Messages can be sent to individual agents using the send() methods, and they can be broadcast to all agents using the broadcast() methods.

Example 9-4. A Mediator Interface

package dcj.util.Collaborative;

import java.util.Vector;
import java.io.IOException;

public interface Mediator {
  public Identity newMember();
  public boolean  removeMember(Identity i);
  public Vector   getMembers();

  public boolean send(Identity to, Identity from, String mtag, String msg)
                 throws IOException;
  public boolean broadcast(Identity from, String mtag, String msg)
                 throws IOException;
  public boolean send(Identity to, Identity from, String mtag, Object data)
                 throws IOException;
  public boolean broadcast(Identity from, String mtag, Object data)
                 throws IOException;
}

We've been careful in designing the Collaborator and Mediator interfaces to allow for implementing these interfaces using whatever communications scheme the application developer chooses. Although the methods on the interfaces seem to suggest a message-passing scheme, the data in these "messages" could be passed using remote methods on RMI objects, or on CORBA implementations of the Collaborator and Mediator classes.
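To make the point about independence from the transport concrete, here is a minimal in-process sketch in which the mediator delivers messages by direct method call, with no network at all. The names used here (LocalCollaboration, Member, LocalMediator, Recorder) are hypothetical, and the interfaces are simplified: integer IDs stand in for Identity objects, and only the Object-bodied methods are shown.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class LocalCollaboration {
  // Simplified stand-in for Collaborator: receives tag, body, sender ID.
  interface Member {
    void notify(String tag, Object data, int fromId);
  }

  // In-process mediator: routes by direct method call instead of sockets.
  static class LocalMediator {
    private final Map<Integer, Member> members = new LinkedHashMap<>();
    private int nextId = 0;

    // Register a member and hand back its unique ID.
    synchronized int newMember(Member m) {
      int id = nextId++;
      members.put(id, m);
      return id;
    }

    synchronized boolean removeMember(int id) {
      return members.remove(id) != null;
    }

    // Deliver to one member; fail if the destination is unknown.
    synchronized boolean send(int to, int from, String tag, Object data) {
      Member m = members.get(to);
      if (m == null) return false;
      m.notify(tag, data, from);
      return true;
    }

    // Deliver to every registered member, including the sender.
    synchronized boolean broadcast(int from, String tag, Object data) {
      for (Member m : members.values()) m.notify(tag, data, from);
      return !members.isEmpty();
    }
  }

  // A member that records what it is told, for demonstration.
  static class Recorder implements Member {
    final List<String> log = new ArrayList<>();
    @Override public void notify(String tag, Object data, int fromId) {
      log.add(fromId + ":" + tag + ":" + data);
    }
  }
}
```

Code written against the send() and broadcast() calls doesn't need to know whether delivery happens in-process, as here, or over sockets, RMI, or CORBA.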

With our updated, multi-agent MessageHandler class in hand, implementing message-passing versions of the Collaborator and Mediator interfaces is a pretty simple matter. The MessageMediator class shown in Example 9-5 is an implementation of the Mediator interface that uses a MessageHandler to route messages back and forth between remote agents. The MessageMediator has a MessageHandler to route messages, a ServerSocket to accept socket connections from remote agents, and a port number that it listens to for connections. It also implements the Runnable interface so that it can sit in its own thread, listening for asynchronous connections from agents. This is the primary function of its run() method, where it creates the ServerSocket listening to its designated port, then loops continuously trying to accept connections over the socket. When a new connection is made, a new agent is added to the handler by calling its addAgent() method with the input and output streams from the Socket that is created to the agent. The Mediator creates a unique Identity for the agent by calling the newMember() method, which creates a new Identity and sets its ID number to the next available integer. Then a message is sent to the agent containing its Identity, so that it can identify itself in future messages.

Example 9-5. A Mediator Based on Message Passing

package dcj.util.Collaborative;

import java.lang.Runnable;
import java.util.Vector;
import java.util.Enumeration;
import java.net.ServerSocket;
import java.net.Socket;
import java.io.IOException;

public class MessageMediator implements Mediator, Runnable {
  MessageHandler mhandler = new MessageHandler();
  ServerSocket socket = null;
  int port = 5009;

  public MessageMediator(int p) {
    initHandler();
    port = p;
  }

  public MessageMediator() {
    initHandler();
  }

  protected void initHandler() {
    // Add the mediator message "prototype" to the handler
    Message m = new MediatorMessage(this);
    mhandler.addMessageType(m);
  }

  public void run() {
    // Make the server socket
    try {
      socket = new ServerSocket(port);
    }
    catch (IOException e) {
      System.out.println("Failed to bind to port " + port);
      return;
    }

    System.out.println("Mediator running on port " + port);
    
    // Listen for new clients...
    while (true) {
      try {
        Socket clientConn = socket.accept();
        Identity i = newMember();
        mhandler.addAgent(i.getId(), clientConn.getInputStream(),
                          clientConn.getOutputStream());
        System.out.println("Got new connection...");
        Message imsg = new Message("identity");
        imsg.addArg(i);
        mhandler.sendMsg(imsg, i.getId());
      }
      catch (Exception e) {}
    }
  }

  public Identity newMember() {
    int id = mhandler.nextAgentId();
    Identity i = new Identity(id);
    return i;
  }

  public boolean removeMember(Identity i) {
    int id = i.getId();
    boolean success = mhandler.removeAgent(id);
    return success;
  }

  public Vector getMembers() {
    Vector members = new Vector();
    Vector ids = mhandler.getAgentIds();
    Enumeration e = ids.elements();
    while (e.hasMoreElements()) {
      Integer id = (Integer)e.nextElement();
      Identity i = new Identity(id.intValue());
      members.addElement(i);
    }
    return members;
  }

  public boolean send(Identity to, Identity from, String mtag, String s)
                 throws IOException {
    boolean success = false;
    Message msg = new Message(mtag);
    msg.addArg(from);
    msg.addArg(s);
    return mhandler.sendMsg(msg, to.getId());
  }

  public boolean broadcast(Identity from, String mtag, String s)
                 throws IOException {
    System.out.println("mm: Broadcasting message \"" + mtag + s + "\"");
    Message msg = new Message(mtag);
    msg.addArg(from);
    msg.addArg(s);
    return mhandler.sendMsg(msg);
  }

  public boolean send(Identity to, Identity from, String mtag, Object o)
                 throws IOException {
    Message msg = new Message(mtag);
    msg.addArg(from);
    msg.addArg(o);
    return mhandler.sendMsg(msg, to.getId());
  }

  public boolean broadcast(Identity from, String mtag, Object o)
                 throws IOException {
    Message msg = new Message(mtag);
    msg.addArg(from);
    msg.addArg(o);
    return mhandler.sendMsg(msg);
  }
}

The MessageMediator initializes its MessageHandler in each of its constructors by calling the protected initHandler() method. This method adds a MediatorMessage to the handler's list of "prototype" messages. The MediatorMessage, shown in Example 9-6, keeps a reference to a Mediator, and its Do() method handles messages by checking the type of message and calling the appropriate method on its Mediator. This is the only message prototype added to the MessageHandler, and its handles() method always returns true, so all messages received by the MessageHandler will be handled by this message. If the message has a type of "send," then the next four arguments are assumed to be: an Identity object specifying the source of the message, another for the destination of the message, a String message tag, and a String or Object message body. These four arguments are passed into a call to the Mediator's send() method. If a message with a type of "broadcast" is received, then only three arguments are expected: the Identity of the sender, a String message tag, and a String or Object message body. These three arguments are passed into the Mediator's broadcast() method.

Example 9-6. A Mediator Message

package dcj.util.Collaborative;

import java.io.*;
import java.util.Vector;

public class MediatorMessage extends Message
{
  protected Mediator mediator = null;

  public MediatorMessage(Mediator m) {
    mediator = m;
  }

  public MediatorMessage(String mid) {
    super(mid);
  }

  public boolean Do()
  {
    boolean success = false;
    
    try {
      String mtype = messageID();
      if (mtype.compareTo("send") == 0) {
        Identity from = (Identity)getArg(0);
        Identity to = (Identity)getArg(1);
        String tag = (String)getArg(2);
        try {
          String s = (String)getArg(3);
          mediator.send(to, from, tag, s);
          success = true;
        }
        catch (ClassCastException cce) {
          // Argument wasn't a String, so send it as an Object
          Object oarg = getArg(3);
          mediator.send(to, from, tag, oarg);
          success = true;
        }
      }
      else if (mtype.compareTo("broadcast") == 0) {
        System.out.println("mm: Got broadcast message.");
        Identity from = (Identity)getArg(0);
        String tag = (String)getArg(1);
        System.out.println("mm: tag = \"" + tag + "\"");
        try {
          String s = (String)getArg(2);
          mediator.broadcast(from, tag, s);
          success = true;
        }
        catch (ClassCastException cce) {
          // Argument wasn't a String, so broadcast it as an Object
          Object oarg = getArg(2);
          mediator.broadcast(from, tag, oarg);
          success = true;
        }
      }
    }
    catch (Exception e) {
      success = false;
      System.out.println("mm: Error parsing message.");
      e.printStackTrace();
    }
    return success;
  }

  // We want to handle all messages.
  public boolean handles(String msgId) { return true; }

  public Message newCopy() {
    MediatorMessage copy;
    if (mediator != null) {
      // Make a new MediatorMessage with the same Mediator
      copy = new MediatorMessage(mediator);
      copy.setId(messageID());
    }
    else {
      copy = new MediatorMessage(messageID());
    }
    return copy;
  }
}
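Example 9-6 tells String bodies apart from Object bodies by attempting a cast and catching the ClassCastException. An explicit instanceof test is one alternative that avoids using an exception for ordinary control flow. The sketch below shows the idea in isolation; the BodyDispatch class and its Router interface are hypothetical, with Router standing in for the two send() overloads on Mediator and integer IDs standing in for Identity objects.

```java
import java.util.ArrayList;
import java.util.List;

class BodyDispatch {
  // Hypothetical stand-in for the two send() overloads on Mediator.
  interface Router {
    boolean send(int to, int from, String tag, String msg);
    boolean send(int to, int from, String tag, Object data);
  }

  // Choose the overload with an explicit type test rather than
  // by catching ClassCastException.
  static boolean dispatch(Router r, int to, int from, String tag, Object body) {
    if (body instanceof String) {
      return r.send(to, from, tag, (String) body);
    }
    return r.send(to, from, tag, body);
  }

  // Records which overload was chosen, for demonstration.
  static class Recording implements Router {
    final List<String> calls = new ArrayList<>();
    @Override public boolean send(int to, int from, String tag, String msg) {
      calls.add("string:" + msg);
      return true;
    }
    @Override public boolean send(int to, int from, String tag, Object data) {
      calls.add("object:" + data);
      return true;
    }
  }
}
```

Either approach routes a String body to the String version of send() and anything else to the Object version.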

The remainder of the MessageMediator implementation consists of the send() and broadcast() methods. These methods simply take the arguments passed to them, bundle them into Messages, and instruct the MessageHandler to send them to the appropriate recipients. The send() methods call the MessageHandler.sendMsg() method with the ID number of the destination agent, while the broadcast() methods call the MessageHandler.sendMsg() method with no ID number; this causes the MessageHandler to send the message to all of the agents in its list.

The last piece of our message-passing collaborative system is a Collaborator implemented using message passing. The MessageCollaborator shown in Example 9-7 implements the Collaborator interface using a MessageHandler. Two constructors are provided: one with just a name for the collaborator, and the other with a name along with the host and port number of a mediator to which to connect. Both constructors initialize the MessageHandler, and the second version goes on to put the host and port number into a Properties list and call the connect() method to connect to the mediator at that network address. The initHandler() method simply adds a CollaboratorMessage to the message prototype list on the MessageHandler. This Message subclass, shown in Example 9-8, is even simpler than the MediatorMessage in Example 9-6. The Do() method takes the message ID, assumes that the first argument is the Identity of the sender and that the second argument is the body of the message, and calls the collaborator's notify() method with these arguments. The CollaboratorMessage also handles all messages by returning a default of true from its handles() method.

Example 9-7. A Message-Passing Collaborator

package dcj.util.Collaborative;

import java.io.IOException;
import java.net.Socket;
import java.util.Properties;

public class MessageCollaborator implements Collaborator
{
  MessageHandler handler = new MessageHandler();
  Identity id = null;
  String name;

  public MessageCollaborator(String n) {
    name = n;
    initHandler();
  }

  public MessageCollaborator(String host, int port, String n) {
    initHandler();
    name = n;
    Properties p = new Properties();
    p.put("host", host);
    p.put("port", String.valueOf(port));
    connect(p);
  }

  protected void initHandler() {
    handler.addMessageType(new CollaboratorMessage(this));
  }

  public Identity getIdentity() { return id; }

  public boolean connect(Properties p) {
    boolean success = false;

    String host = p.getProperty("host");
    String itmp = p.getProperty("port");
    if (host != null && itmp != null) {
      try {
        int port = Integer.parseInt(itmp);
        // Make a socket connection to the mediator.
        Socket mConn = new Socket(host, port);
        int pid = handler.addAgent(mConn.getInputStream(),
                                  mConn.getOutputStream());
        System.out.println("Got socket to Mediator, agent id = " + pid + "...");
        // The mediator should send us an identity in a message...
        Message imsg = handler.readMsg(pid);
        System.out.println("Got message with id = " + imsg.messageID());
        if (imsg.messageID().compareTo("identity") == 0) {
          id = (Identity)imsg.getArg(0);
          id.setName(name);
          System.out.println("Got identity from mediator, id = "
                             + id.getId() + "...");
          success = true;
        }
        else {
          handler.removeAgent(pid);
          success = false;
        }
      }
      catch (Exception e) {
        success = false;
      }
    }
    else {
      success = false;
    }
    return success;
  }

  public boolean send(String tag, String msg, Identity dst)
                 throws IOException {
    boolean success = false;
    Message m = new Message("send");
    m.addArg(getIdentity());
    m.addArg(dst);
    m.addArg(tag);
    m.addArg(msg);
    success = handler.sendMsg(m);
    return success;
  }

  public boolean send(String tag, Object data, Identity dst)
                 throws IOException {
    boolean success = false;
    Message m = new Message("send");
    m.addArg(getIdentity());
    m.addArg(dst);
    m.addArg(tag);
    m.addArg("#OBJ");
    m.addArg(data);
    success = handler.sendMsg(m);
    return success;
  }

  public boolean broadcast(String tag, String msg)
                 throws IOException {
    boolean success = false;
    Message m = new Message("broadcast");
    m.addArg(getIdentity());
    m.addArg(tag);
    m.addArg(msg);
    System.out.println("mc: Sending broadcast message \"" + tag + "\"");
    success = handler.sendMsg(m);
    System.out.println("mc: success = " + success);
    return success;
  }

  public boolean broadcast(String tag, Object data)
                 throws IOException {
    boolean success = true;
    Message m = new Message("broadcast");
    m.addArg(getIdentity());
    m.addArg(tag);
    m.addArg("#OBJ");
    m.addArg(data);
    success = handler.sendMsg(m);
    return success;
  }

  public boolean notify(String tag, String msg, Identity src)
                 throws IOException {
    System.out.println("Received \"" + tag + "\" message \""
                       + msg + "\" from " + src.getName());
    return true;
  }

  public boolean notify(String tag, Object data, Identity src)
                 throws IOException {
    System.out.println("Received \"" + tag + "\" object \""
                       + data + "\" from " + src.getName());
    return true;
  }
}

The connect() method on the MessageCollaborator assumes that a host and port number will be in the Properties list passed to it. These are used to make a socket connection to that address. Once the connection is made, the input and output streams from the socket are passed to the addAgent() method on the MessageHandler, which adds the mediator at the other end of the socket to the handler's list of agents. Since the first thing the MessageMediator does is send the collaborator a message containing its new Identity, the next step in the connect() method is to read that message from the mediator and extract our Identity. If we fail to get an Identity from the mediator, we remove the mediator from the MessageHandler's agent list by calling its removeAgent() method.

The rest of the MessageCollaborator is the implementation of the send(), broadcast(), and notify() methods. These are implemented much the same as on the MessageMediator. The send() methods bundle the source (the local Identity), destination, message type, and message body into a Message with an ID of "send," and send it to the mediator using the sendMsg() method on the MessageHandler. The broadcast() methods bundle the source Identity, the message type, and the message body into a Message with an ID of "broadcast," and send it with the MessageHandler.sendMsg() method. The notify() methods implemented here simply print out an indication that a message has been received. Subclasses would override these methods to check the message type or body, and react accordingly.
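As a rough illustration of the last point, the sketch below shows a subclass overriding notify() to react to one message type and deferring everything else to the default behavior. To keep it self-contained it does not use the dcj.util classes: NotifySketch, PrintingCollaborator, and ChatCollaborator are hypothetical stand-ins, and the source is reduced to a plain name instead of an Identity.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the notify() half of the Collaborator interface,
// reduced to the String overload so that this sketch compiles on its own.
interface NotifySketch {
    boolean notify(String tag, String msg, String srcName);
}

// Default behavior, in the spirit of MessageCollaborator: report the message.
class PrintingCollaborator implements NotifySketch {
    public boolean notify(String tag, String msg, String srcName) {
        System.out.println("Received \"" + tag + "\" message \""
                           + msg + "\" from " + srcName);
        return true;
    }
}

// A subclass that checks the message type and reacts only to "chat" messages.
class ChatCollaborator extends PrintingCollaborator {
    final List<String> transcript = new ArrayList<>();

    @Override
    public boolean notify(String tag, String msg, String srcName) {
        if ("chat".equals(tag)) {
            // Handle the message type this subclass cares about
            transcript.add(srcName + ": " + msg);
            return true;
        }
        // Anything else falls through to the default handling
        return super.notify(tag, msg, srcName);
    }
}
```

The same pattern would apply to the Object overload of notify(), with the subclass inspecting the type of the data before deciding how to react.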

Example 9-8. A Collaborator Message

package dcj.util.Collaborative;

import java.io.*;
import java.util.Vector;

public class CollaboratorMessage extends Message
{
  protected Collaborator collaborator = null;

  public CollaboratorMessage(Collaborator c) {
    collaborator = c;
  }

  public CollaboratorMessage(String mid) {
    super(mid);
  }

  public boolean Do()
  {
    boolean success = false;

    try {
      String mtype = messageID();
      Identity from = (Identity)getArg(0);
      try {
        String s = (String)getArg(1);
        collaborator.notify(mtype, s, from);
        success = true;
      }
      catch (ClassCastException cce) {
        // Argument isn't a string, so send it as an object
        Object oarg = getArg(1);
        collaborator.notify(mtype, oarg, from);
        success = true;
      }
    }
    catch (Exception e) {
      success = false;
    }
    return success;
  }

  // We want to handle all messages to the collaborator
  public boolean handles(String msgId) { return true; }

  public Message newCopy() {
    CollaboratorMessage copy;
    if (collaborator != null) {
      // Make a new CollaboratorMessage with the same Collaborator
      copy = new CollaboratorMessage(collaborator);
      copy.setId(messageID());
    }
    else {
      copy = new CollaboratorMessage(messageID());
    }
    return copy;
  }
}

Our complete message-passing infrastructure allows us to create a MessageMediator on a given port number on a host. Then any client can connect to the mediator by creating a MessageCollaborator using the mediator's host and port number, and engage in a collaborative exercise with any other agent connected to it using the collaborator's send() and broadcast() methods. Each connection the MessageMediator accepts is serviced in a separate thread by an AgentHandler, which listens for messages from that agent and tells the MessageMediator to route them to the right Collaborators.

If we wanted to support the complete collaborative environment depicted in Figure 9-1, in addition to each Mediator serving multiple Collaborators, we would also want each Collaborator to be able to connect to more than one Mediator. We may want to have clusters of Mediators serving different portions of the overall community. This is a simple extension to the Collaborator interface and the MessageCollaborator class. First, the MessageCollaborator would need to maintain a table of the mediators that it is connected to, along with their identities. The identity of a mediator could be as simple as an ID number that the MessageCollaborator generates on its own as a unique local identifier for the Mediator. A Hashtable could be used to store the table of Mediators and their Identities. The send() and broadcast() methods would need to include a new Identity argument, to specify which Mediator to route the message through. We may also want to add methods to broadcast a message through all available Mediators (e.g., a broadcastAll() method).
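A minimal sketch of that extension might look like the following. It is not built on the dcj.util classes: MediatorLink is a hypothetical handle for one mediator connection, MultiMediatorSketch is a hypothetical name, and a locally generated int stands in for the mediator's Identity.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical handle for one mediator connection. In the real class this
// would wrap the agent registered with the MessageHandler for that mediator.
interface MediatorLink {
    boolean broadcast(String tag, String msg);
}

class MultiMediatorSketch {
    // Locally generated mediator ID -> connection to that mediator
    private final Map<Integer, MediatorLink> mediators = new LinkedHashMap<>();
    private int nextId = 0;

    // Record a new mediator connection and return its local ID.
    public synchronized int addMediator(MediatorLink link) {
        int id = nextId++;
        mediators.put(id, link);
        return id;
    }

    // Broadcast through one specific mediator, named by its local ID.
    public synchronized boolean broadcast(int mediatorId, String tag, String msg) {
        MediatorLink link = mediators.get(mediatorId);
        return link != null && link.broadcast(tag, msg);
    }

    // Broadcast through every mediator; succeeds only if all of them succeed.
    public synchronized boolean broadcastAll(String tag, String msg) {
        boolean success = true;
        for (MediatorLink link : mediators.values()) {
            if (!link.broadcast(tag, msg)) {
                success = false;
            }
        }
        return success;
    }
}
```

A send() method taking a mediator ID would follow the same shape as broadcast() here, with the destination Identity as an extra argument.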

9.3.2. Collaborating with RMI

We implemented our message-passing version of a collaborative infrastructure to demonstrate the communication issues that the system needs to handle; now let's see what a version implemented in RMI would look like, and what pieces of the puzzle RMI handles for us. We'll construct our RMI collaboration system so that a Mediator is registered as an RMI server object to which remote Collaborators connect. The Collaborators can then register themselves with the Mediator by passing stub references to themselves through a remote method call to the Mediator. Once the Mediator has a stub for the Collaborator objects, and each Collaborator has a stub for the Mediator, the Collaborators can exchange messages by calling the appropriate method on the Mediator stub, which in turn passes the message to the appropriate Collaborator by calling the notify() method on its stub.

First, remember that a remote RMI object must have a remote interface that extends the java.rmi.Remote interface, and each method on that interface must declare that it throws java.rmi.RemoteException. Also, the methods we want to call remotely have to be declared on an interface that extends Remote; a method that the object inherits only from a non-Remote interface is not available through its stub. For these reasons, we need new versions of our Collaborator and Mediator interfaces for the RMI version of our system. The RMICollaborator interface in Example 9-9 is essentially the same as the Collaborator interface from Example 9-3, except that it extends java.rmi.Remote, and all of its methods throw java.rmi.RemoteException in addition to any Exceptions that the original Collaborator interface throws.

Example 9-9. An RMI Collaborator Interface

package dcj.util.Collaborative;

import java.rmi.RemoteException;
import java.io.IOException;
import java.rmi.Remote;
import java.util.Properties;

public interface RMICollaborator extends Remote
{
  public Identity getIdentity() throws RemoteException;

  // Connect to a mediator - subclasses dictate properties needed
  public boolean connect(Properties p) throws RemoteException;

  // Outgoing messages/data
  public boolean send(String tag, String msg, Identity dst)
                 throws IOException, RemoteException;
  public boolean send(String tag, Object data, Identity dst)
                 throws IOException, RemoteException;
  public boolean broadcast(String tag, String msg)
                 throws IOException, RemoteException;
  public boolean broadcast(String tag, Object data)
                 throws IOException, RemoteException;

  // Incoming messages/data
  public boolean notify(String tag, String msg, Identity src)
                 throws IOException, RemoteException;
  public boolean notify(String tag, Object data, Identity src)
                 throws IOException, RemoteException;
}

The RMIMediator interface in Example 9-10 is adapted in the same way from the Mediator interface in Example 9-4, except that a new register() method has been added to allow each RMICollaborator to register itself with the RMIMediator once it has a stub.

Example 9-10. An RMI Mediator Interface

package dcj.util.Collaborative;

import java.rmi.RemoteException;
import java.io.IOException;
import java.rmi.Remote;
import java.util.Vector;

public interface RMIMediator extends Remote
{
  public boolean register(Identity i, RMICollaborator c)
                 throws RemoteException;
  public Identity newMember() throws RemoteException;
  public boolean  remove(Identity i) throws RemoteException;
  public Vector   getMembers() throws RemoteException;

  public boolean send(Identity to, Identity from, String mtag, String msg)
                 throws IOException, RemoteException;
  public boolean broadcast(Identity from, String mtag, String msg)
                 throws IOException, RemoteException;
  public boolean send(Identity to, Identity from, String mtag, Object data)
                 throws IOException, RemoteException;
  public boolean broadcast(Identity from, String mtag, Object data)
                 throws IOException, RemoteException;
}

The implementations of our RMI-based collaborator and mediator are surprisingly similar to our message-passing versions. The RMICollaboratorImpl in Example 9-11 has two constructors: one taking just a name for the collaborator, the other taking a name, a host name, and the name of the remote RMIMediator object to look up. The first constructor saves the name within an Identity object for the collaborator. The second does the same, then adds the host name and remote object name to a property list and calls the connect() method. The connect() method expects a host name saved as the host property in the Properties argument, and the name of a remote RMIMediator object saved as the mediatorName property. Once it has these, the connect() method attempts to retrieve a stub to the remote mediator using the Naming.lookup() method with a URL constructed from the host name and the object name. Once the stub is received, the collaborator asks the RMIMediator for a new unique Identity by calling its newMember() method, then registers itself with the mediator by calling its register() method with the new Identity and a reference to itself.

Example 9-11. Implementation of an RMI Collaborator

package dcj.util.Collaborative;

import java.io.IOException;
import java.util.Properties;
import java.rmi.Naming;
import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;
import java.rmi.RMISecurityManager;

public class RMICollaboratorImpl extends UnicastRemoteObject 
                                 implements RMICollaborator
{
  protected Identity id = null;
  protected RMIMediator mediator = null;

  public RMICollaboratorImpl(String name, String host, String mname)
         throws RemoteException {
    id = new Identity(0);
    id.setName(name);
    Properties p = new Properties();
    p.put("host", host);
    p.put("mediatorName", mname);
    connect(p);
  }

  public RMICollaboratorImpl(String name) throws RemoteException {
    id = new Identity(0);
    id.setName(name);
  }
  
  public Identity getIdentity() throws RemoteException { return id; }

  public boolean connect(Properties p) throws RemoteException {
    boolean success = false;
    String host = p.getProperty("host");
    String mName = p.getProperty("mediatorName");
    if (host != null && mName != null) {
      try {
        String url = "rmi://" + host + "/" + mName;
        System.out.println("looking up " + url);
        mediator = (RMIMediator)Naming.lookup(url);
        System.out.println("Got mediator " + mediator);
        Identity newId = mediator.newMember();
        mediator.register(newId, this);
        newId.setName(id.getName());
        id = newId;
        success = true;
      }
      catch (Exception e) {
        e.printStackTrace();
        success = false;
      }
    }

    return success;
  }

  public boolean send(String tag, String msg, Identity dst)
                 throws IOException, RemoteException {
    boolean success = false;
    if (mediator != null) {
      success = mediator.send(dst, getIdentity(), tag, msg);
    }
    return success;
  }

  public boolean send(String tag, Object data, Identity dst)
                 throws IOException, RemoteException {
    boolean success = false;
    if (mediator != null) {
      success = mediator.send(dst, getIdentity(), tag, data);
    }
    return success;
  }

  public boolean broadcast(String tag, String msg)
                 throws IOException, RemoteException {
    boolean success = false;
    if (mediator != null) {
      success = mediator.broadcast(getIdentity(), tag, msg);
    }
    return success;
  }

  public boolean broadcast(String tag, Object data)
                 throws IOException, RemoteException {
    boolean success = false;
    if (mediator != null) {
      success = mediator.broadcast(getIdentity(), tag, data);
    }
    return success;
  }

  public boolean notify(String tag, String msg, Identity src)
                 throws IOException, RemoteException {
    System.out.println("Got message: \"" + tag + " " + msg + "\""
                       + " from " + src.getName());
    return true;
  }

  public boolean notify(String tag, Object data, Identity src)
                 throws IOException, RemoteException {
    System.out.println("Got message: \"" + tag + " " + data + "\""
                       + " from " + src.getName());
    return true;
  }
  
  public static void main(String argv[]) {
    // Install a security manager
    System.setSecurityManager(new RMISecurityManager());
    try {
      String name = argv[0];
      String host = argv[1];
      String mname = argv[2];
      Properties props = new Properties();
      props.put("host", host);
      props.put("mediatorName", mname);
      RMICollaboratorImpl c = new RMICollaboratorImpl(name);
      if (c.connect(props)) {
        System.out.println("Got mediator...");
        c.broadcast("msg", "hello world");
      }
    }
    catch (Exception e) {
      System.out.println("Caught exception:");
      e.printStackTrace();
    }
  }
}

Once the stub to the RMIMediator has been received, the RMICollaboratorImpl simply calls methods on the remote object to implement its send() and broadcast() methods. The send() methods call the mediator's send() methods with the appropriate arguments; the same goes for the broadcast() methods. Since the calls to send() and broadcast() are remote method calls, any Objects passed as the body of a message to the RMIMediator must implement the Serializable interface, or an exception will result when the remote methods are called. Again, the implementation of the notify() methods simply prints out some text indicating that a message has been received.
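RMI marshals method arguments using Java object serialization, so one way to check a message body before sending it is to push it through the standard object streams locally. The sketch below does that; Position, Opaque, and SerializableCheck are hypothetical names used only for illustration, and a local round trip only approximates what happens during a real remote call.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical message body that can be passed through a remote call.
class Position implements Serializable {
    private static final long serialVersionUID = 1L;
    final int x, y;
    Position(int x, int y) { this.x = x; this.y = y; }
}

// Hypothetical body that cannot: it does not implement Serializable.
class Opaque {
    final int value = 42;
}

class SerializableCheck {
    // Serialize and deserialize an object locally with standard object streams.
    static Object roundTrip(Object o) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        try (ObjectInputStream in = new ObjectInputStream(
                 new ByteArrayInputStream(bytes.toByteArray()))) {
            return in.readObject();
        }
    }

    // True if the object survives serialization, false if it is rejected
    // (writeObject throws NotSerializableException, an IOException subclass).
    static boolean canTravel(Object o) {
        try {
            roundTrip(o);
            return true;
        } catch (IOException | ClassNotFoundException e) {
            return false;
        }
    }
}
```

A collaborator could run a check like canTravel() on the data argument of send() or broadcast() and fail early, instead of waiting for the remote call to throw.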

The RMICollaboratorImpl class also includes a main() method that demonstrates the use of the class with a mediator. The method takes command-line arguments that specify the name of the collaborator, the host for the mediator, and the name under which the mediator is registered. It creates an RMICollaboratorImpl object with the given name, then tells it to connect to the mediator registered as an RMI object under the given mediator name on the remote host. If it connects successfully, then we broadcast a friendly message to the other collaborators connected to the mediator.

The RMIMediatorImpl in Example 9-12 implements our RMIMediator interface. Its newMember() method generates a unique Identity for a collaborator, while its register() method adds its Identity and RMICollaborator arguments to a table of collaborators currently connected to the mediator. The remove() method removes the identified collaborator from the table of connected clients. The send() methods on RMIMediatorImpl retrieve the referenced RMICollaborator from its internal table, and call the notify() method on the collaborator with the appropriate arguments. The broadcast() methods iterate through all of the RMICollaborators in the table, calling each one's notify() method with the message from the remote RMICollaborator.

Example 9-12. Implementation of an RMI Mediator

package dcj.util.Collaborative;

import java.util.Vector;
import java.util.Hashtable;
import java.util.Enumeration;
import java.io.IOException;
import java.rmi.Naming;
import java.rmi.Remote;
import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;
import java.rmi.RMISecurityManager;

public class RMIMediatorImpl extends UnicastRemoteObject 
                             implements RMIMediator
{
  Hashtable clients = new Hashtable();
  Vector idList = new Vector();

  public RMIMediatorImpl() throws RemoteException {
    super();
  }

  public boolean register(Identity i, RMICollaborator c)
                 throws RemoteException {
    System.out.println("Registering member " + i.getId()
                       + " as " + c.getIdentity().getName());
    clients.put(i, c);
    return true;
  }

  public Identity newMember() throws RemoteException {
    Identity newId;
    // Hold the lock across both the scan and the insert, so that two
    // concurrent callers can never be issued the same ID.
    synchronized (idList) {
      int max = -1;
      Enumeration e = idList.elements();
      while (e.hasMoreElements()) {
        // idList holds Identity objects; getId() is assumed to return an int
        Identity i = (Identity)e.nextElement();
        if (i.getId() > max) {
          max = i.getId();
        }
      }
      newId = new Identity(max + 1);
      idList.addElement(newId);
    }
    return newId;
  }

  public boolean remove(Identity i) throws RemoteException {
    boolean success;
    // A synchronized block locks a single object, so nest one per table
    synchronized (idList) {
      synchronized (clients) {
        if (idList.removeElement(i) && clients.remove(i) != null) {
          success = true;
        }
        else {
          success = false;
        }
      }
    }
    return success;
  }

  public Vector getMembers() throws RemoteException {
    synchronized (idList) {
      return (Vector)idList.clone();
    }
  }

  public boolean send(Identity to, Identity from, String mtag, String msg)
                 throws IOException, RemoteException {
    boolean success = false;
    RMICollaborator c = getMember(to);
    // Check for null before locking; synchronizing on null would throw
    if (c != null) {
      synchronized (c) {
        success = c.notify(mtag, msg, from);
      }
    }

    return success;
  }

  public boolean send(Identity to, Identity from, String mtag, Object data)
                 throws IOException, RemoteException {
    boolean success = false;
    RMICollaborator c = getMember(to);
    // Check for null before locking; synchronizing on null would throw
    if (c != null) {
      synchronized (c) {
        success = c.notify(mtag, data, from);
      }
    }
    return success;
  }

  public boolean broadcast(Identity from, String mtag, String msg)
                 throws IOException, RemoteException {
    System.out.println("Broadcasting...");
    boolean success = true;
    Enumeration ids;
    synchronized (clients) {
      ids = clients.keys();
    }
    RMICollaborator target = null;
    while (ids.hasMoreElements()) {
      Identity i = (Identity)ids.nextElement();
      synchronized (clients) {
        target = (RMICollaborator)clients.get(i);
      }
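      // Check for null before locking; synchronizing on null would throw
      if (target == null) {
        success = false;
      }
      else {
        synchronized (target) {
          if (!target.notify(mtag, msg, from)) {
            success = false;
          }
        }
      }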
    }
    return success;
  }

  public boolean broadcast(Identity from, String mtag, Object data)
                 throws IOException, RemoteException {
    boolean success = true;
    Enumeration ids;
    synchronized (clients) {
      ids = clients.keys();
    }
    RMICollaborator target = null;
    while (ids.hasMoreElements()) {
      Identity i = (Identity)ids.nextElement();
      synchronized (clients) {
        target = (RMICollaborator)clients.get(i);
      }
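      // Check for null before locking; synchronizing on null would throw
      if (target == null) {
        success = false;
      }
      else {
        synchronized (target) {
          if (!target.notify(mtag, data, from)) {
            success = false;
          }
        }
      }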
    }
    return success;
  }

  protected RMICollaborator getMember(Identity i) {
    Enumeration ids;
    synchronized (clients) {
      ids = clients.keys();
    }
    RMICollaborator c = null;
    Identity tmp;
    while (c == null && ids.hasMoreElements()) {
       tmp = (Identity)ids.nextElement();
       if (tmp.equals(i)) {
         synchronized (clients) {
           c = (RMICollaborator)clients.get(tmp);
         }
       }
    }
    return c;
  }
  
  public static void main(String argv[]) {
    // Install a security manager
    System.setSecurityManager(new RMISecurityManager());

    try {
      String name = "TheMediator";
      System.out.println("Registering RMIMediatorImpl as \""
                         + name + "\"");
      RMIMediatorImpl mediator = new RMIMediatorImpl();
      System.out.println("Created mediator, binding...");
      Naming.rebind(name, mediator);
      System.out.println("Remote mediator ready...");
    }
    catch (Exception e) {
      System.out.println("Caught exception while registering: " + e);
    }
  }
}

At the end of the RMIMediatorImpl class is a main() method that can be used to register a mediator with a local RMI registry. It just creates an RMIMediatorImpl object and binds it with the RMI Naming service under the name TheMediator.

If you compare our RMI implementation of a collaborative system to our message-passing one, you'll notice that they are fairly similar in structure, with the exception that there is no equivalent to the MessageHandler in our RMI-based system. We don't need one; RMI handles the functionality provided by the MessageHandler internally when it marshals, transmits, and then unmarshals a remote method call's arguments between the RMICollaborator and the RMIMediator, and vice versa. It's also important to notice that, while the RMI connection between the two objects allows for asynchronous remote method calls between them, we need to ensure that the RMIMediatorImpl implementation is multithread-safe, so that multiple connected RMICollaborators can asynchronously route messages by remotely calling its send() and broadcast() methods. We do this by synchronizing any code segments that directly access the RMIMediatorImpl's data members, including the RMICollaborator stub references. By doing this, and by including the collaborators among the objects on which we synchronize, we ensure that asynchronous method calls by remote agents do not interfere with each other, and we indirectly protect the remote RMICollaboratorImpl from asynchronous method calls by synchronizing locally on our stub reference. If we wanted the collaborator to have access to multiple mediators, then this measure wouldn't help, since each mediator could call methods on the collaborator asynchronously with respect to the other mediators. If this were the case, we would have to ensure that the RMICollaboratorImpl methods were also multithread-safe.
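The locking discipline described above can be sketched in isolation as follows. This is an illustration of the idea, not the RMIMediatorImpl code: Listener is a hypothetical stand-in for an RMICollaborator stub, SafeRegistry is a hypothetical name, and a plain int replaces the Identity. The table is only touched while holding the registry's lock, and each listener is locked individually while it is being notified.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for an RMICollaborator stub.
interface Listener {
    boolean notify(String tag, String msg);
}

class SafeRegistry {
    private final Map<Integer, Listener> clients = new HashMap<>();
    private int nextId = 0;

    // Allocate an ID and store the listener in one atomic step.
    public synchronized int register(Listener l) {
        int id = nextId++;
        clients.put(id, l);
        return id;
    }

    // Remove a listener; returns false if the ID was not registered.
    public synchronized boolean remove(int id) {
        return clients.remove(id) != null;
    }

    public boolean broadcast(String tag, String msg) {
        // Copy the listeners under the lock, then release it so that slow
        // notify() calls do not block register() and remove().
        List<Listener> snapshot;
        synchronized (this) {
            snapshot = new ArrayList<>(clients.values());
        }
        boolean success = true;
        for (Listener l : snapshot) {
            // Lock each listener so that only one call from this registry
            // is delivered to it at a time.
            synchronized (l) {
                if (!l.notify(tag, msg)) {
                    success = false;
                }
            }
        }
        return success;
    }
}
```

Iterating over a snapshot is one design choice among several; it trades a small copy on each broadcast for not holding the table lock during potentially slow remote calls.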

9.3.3. Summary

The basic collaborative utility that we've built, in both message-passing and RMI flavors, can deal with asynchronous handling of multiple remote agents by a single mediator. The mediator is capable of issuing a unique identity to each collaborator that registers with it. And while we've provided a simple interface for sending point-to-point or broadcast messages across the system, we could implement specialized mediator and collaborator subclasses that use a custom interface to communicate.




Copyright © 2001 O'Reilly & Associates. All rights reserved.