6.5. Adaptable Protocols

There are situations where an application's message protocol may change while the application is running. First we'll examine the ways in which a message protocol can vary during runtime, then we'll see how our message-passing facility would need to be modified to support these adaptable message protocols.

6.5.1. Variable Number of Arguments

The most common situation encountered in message passing is a message whose argument list varies in length. Some arguments are actually lists of values, whose size is not known by the receiver beforehand. A multiuser server may provide a way to ask for its current list of users, for example. Optional arguments are another cause for variable-length argument lists. Continuing with the multiuser server example, a request for its list of current users may be a message with some optional arguments that control how much data is returned. A "verbose" argument may indicate that the caller wants all available information on the current users. When this argument is not present, a more condensed set of data is returned.

These possibilities imply that the recipient of a message may require information on the number of arguments to expect. In our previous examples, the message objects handling incoming messages were hardwired to expect a given number of arguments, with each argument always having the same meaning. The readArgs() method on MoveMessage in Example 6-7 reads in the expected argument list, then waits for the end-of-message string. If any additional arguments were included with the message, they would be skipped over in the while loop that searches for the end of the message. If an argument were left off the message, then the end-of-message token would be read in (erroneously) as the last message argument, and readArgs() would wait for another end-of-message token. If the remote agent sends another message, its contents would be skipped and the end-of-message token would trigger an exit from the readArgs() method. If the remote agent is waiting for a response before sending another message, then communications would be deadlocked at this point: the receiver would be waiting for an end-of-message token that it had already read, and the sender would be waiting for the receiver to process the message and send a response.
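The failure described above can be reproduced without a network. The following is a minimal sketch, not the code from Example 6-7: the class FixedArgReader and its methods are hypothetical, and it assumes the wire format used throughout this chapter (a series of writeUTF() tokens terminated by an "END" token). An in-memory stream stands in for the socket, so the reader hits end-of-stream where a live connection would block.

```java
import java.io.*;
import java.util.*;

// Hypothetical demo class: a reader hardwired to expect three arguments.
class FixedArgReader {
  static final String END = "END";

  // Encode arguments as UTF tokens followed by the end-of-message token.
  static byte[] encode(String... args) {
    try {
      ByteArrayOutputStream buf = new ByteArrayOutputStream();
      DataOutputStream dout = new DataOutputStream(buf);
      for (String a : args) {
        dout.writeUTF(a);
      }
      dout.writeUTF(END);
      dout.flush();
      return buf.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Read exactly three arguments into seen, then scan for the end token.
  // Returns false if the stream ends before the end token turns up.
  static boolean readFixed(byte[] wire, List<String> seen) {
    DataInputStream din = new DataInputStream(new ByteArrayInputStream(wire));
    try {
      for (int i = 0; i < 3; i++) {
        seen.add(din.readUTF());
      }
      while (!din.readUTF().equals(END)) {
        // skip any extra arguments
      }
      return true;
    } catch (IOException e) {
      // An in-memory stream reports end-of-stream here; a live socket
      // would simply block, which is the deadlock described above.
      return false;
    }
  }

  public static void main(String[] argv) {
    List<String> seen = new ArrayList<>();
    // The sender leaves off the third argument.
    boolean ok = readFixed(encode("e2", "e4"), seen);
    System.out.println("arguments seen: " + seen + ", end token found: " + ok);
  }
}
```

With the third argument missing, the reader records the "END" token as its final argument and never finds a terminator.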

6.5.2. Variable Message Argument Types

Occasionally the data types of a message's arguments can vary as well. A message meant to request information about a bank account, for example, may identify the account using a numeric account number, or may be allowed to specify the account by name (e.g., "JSmith-savings-1"). The receiver would need to know which form of the argument is being used so that an integer value isn't read as a string, or vice versa. In our chess game example, suppose that, after updating our protocol to use ChessMove objects as arguments, we still wanted to support earlier versions of the message protocol that used three separate arguments to represent a chess move. The same message identifier could now be followed by either a single serialized object, or two strings followed by an integer. The MoveMessage class would need to have some indication of which data types to expect in order to decode the data on the input stream.

6.5.3. Adaptable Message Types

There are times when the message types in the protocol may change at runtime. Perhaps the agents need to negotiate a suitable protocol that both can support before engaging in a conversation. These protocol negotiations are fairly common in complex multimedia protocols, or in some network services protocols. A server or a client may not support the entire protocol, or may require different protocol dialects for backwards compatibility.

Another application for adaptable message protocols is in distributed services that need to be highly reliable and cannot be shut down for any significant period of time. If an update to the message protocol for such a service is required, it would ideally be added to the system while it is online.

6.5.4. An Adaptable Message Handler

One way to deal with variable-length argument lists and variable-type arguments is to tag each message argument in a message with information about its type. We could define a set of argument type identifiers, one for each data type that we are going to support in our message protocol, and precede each argument with its type identifier when we send it. We could include basic data types (e.g., integer, floating-point, character, string), as well as more complex data types (e.g., lists of basic data types). This would help to both identify and verify the type of incoming arguments. Having lists of basic data types helps us deal with variable-length argument lists. If we also include a message argument count, sent over the network before the actual arguments, we can deal with optional arguments and many of the other situations leading to variable argument lists.
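As a rough illustration of the tagging scheme just described, the sketch below writes an argument count followed by each argument preceded by a one-byte type identifier. Everything here is an assumption made for the example: the class name TaggedArgs, the tag values, and the decision to support only integers, doubles, strings, and lists of strings. It is not part of the dcj.util.message package.

```java
import java.io.*;
import java.util.*;

// Hypothetical helper: an argument count, then each argument preceded by
// a one-byte type tag. The tag values are arbitrary choices for this sketch.
class TaggedArgs {
  static final byte T_INT = 1;
  static final byte T_DOUBLE = 2;
  static final byte T_STRING = 3;
  static final byte T_STRING_LIST = 4;

  static void write(DataOutputStream out, List<?> args) throws IOException {
    out.writeInt(args.size());              // argument count comes first
    for (Object arg : args) {
      if (arg instanceof Integer) {
        out.writeByte(T_INT);
        out.writeInt((Integer) arg);
      } else if (arg instanceof Double) {
        out.writeByte(T_DOUBLE);
        out.writeDouble((Double) arg);
      } else if (arg instanceof String) {
        out.writeByte(T_STRING);
        out.writeUTF((String) arg);
      } else if (arg instanceof List) {
        // Lists are assumed to hold only strings in this sketch.
        List<?> items = (List<?>) arg;
        out.writeByte(T_STRING_LIST);
        out.writeInt(items.size());
        for (Object item : items) {
          out.writeUTF((String) item);
        }
      } else {
        throw new IOException("unsupported argument: " + arg);
      }
    }
  }

  static List<Object> read(DataInputStream in) throws IOException {
    int count = in.readInt();
    List<Object> args = new ArrayList<>();
    for (int i = 0; i < count; i++) {
      byte tag = in.readByte();             // the tag says how to decode
      switch (tag) {
        case T_INT:    args.add(in.readInt());    break;
        case T_DOUBLE: args.add(in.readDouble()); break;
        case T_STRING: args.add(in.readUTF());    break;
        case T_STRING_LIST: {
          int n = in.readInt();
          List<String> items = new ArrayList<>();
          for (int j = 0; j < n; j++) {
            items.add(in.readUTF());
          }
          args.add(items);
          break;
        }
        default:
          throw new IOException("unknown type tag: " + tag);
      }
    }
    return args;
  }

  // Convenience for trying the format in memory.
  static List<Object> roundTrip(List<?> args) {
    try {
      ByteArrayOutputStream buf = new ByteArrayOutputStream();
      write(new DataOutputStream(buf), args);
      return read(new DataInputStream(
          new ByteArrayInputStream(buf.toByteArray())));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Because the count and tags travel with the data, the reader needs no end-of-message token and can reject an argument whose type it does not recognize instead of misreading it.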

These measures would help us implement more robust message protocols, but they come with a price. Tagging each argument with a type identifier complicates the process of updating a message protocol. If we decide in the future to change an argument to be of a type not previously used in our protocol, then we not only have to update the corresponding message classes on all of our networked agents, we also have to update the table of type identifiers to include the new data type. Also, this type identification information, together with the argument count, adds overhead. The overhead is generally insignificant, but in bandwidth-restricted situations, it may be enough to be a problem.

Dealing with a message protocol with varying message types requires a bit more sophistication. Strange as it may seem, the easiest case to handle is one where the entire message protocol changes during an application's lifetime. In situations like this, we simply need to disable one BasicMsgHandler, and create another using the same input and output streams. Of course, we would need to know precisely when messages belonging to the new protocol would start coming in. If we are running the BasicMsgHandler within its own thread, we just need to stop that thread, wrap the new BasicMsgHandler with a new thread, and start() the thread. The thread's stop() method was the direct way to do this, but it has long been deprecated as unsafe; having the old handler return from its run() method at an agreed point in the message stream achieves the same result.

The same approach can be used if a modified version of the current protocol is expected to take over in midstream. If we have two separate subclasses of BasicMsgHandler implemented for the two protocol variants, then we just have to follow the process sketched out above to switch from one to the other.
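The switch from one handler to another can be sketched as follows. This is an illustration under several assumptions rather than the book's implementation: DialectHandler is a hypothetical stand-in for a BasicMsgHandler subclass, the "SWITCH" token is an invented marker telling the old handler that the new protocol starts with the next message, and the old handler exits by returning from run() rather than being halted with Thread.stop(), which is deprecated.

```java
import java.io.*;
import java.util.*;

// Hypothetical stand-in for a protocol handler: records each token it
// reads and returns when it sees the agreed-upon switch marker.
class DialectHandler implements Runnable {
  static final String SWITCH = "SWITCH";   // assumed marker, not in the book
  private final String name;
  private final DataInputStream in;
  private final List<String> log;

  DialectHandler(String name, DataInputStream in, List<String> log) {
    this.name = name;
    this.in = in;
    this.log = log;
  }

  public void run() {
    try {
      while (true) {
        String token = in.readUTF();
        if (token.equals(SWITCH)) {
          return;                          // leave the stream to the next handler
        }
        log.add(name + ":" + token);
      }
    } catch (IOException e) {
      // End of stream: treat as the end of the exchange.
    }
  }
}

class HandlerSwitchDemo {
  static byte[] encode(String... tokens) {
    try {
      ByteArrayOutputStream buf = new ByteArrayOutputStream();
      DataOutputStream dout = new DataOutputStream(buf);
      for (String t : tokens) {
        dout.writeUTF(t);
      }
      return buf.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Run the old handler until it yields, then start the new handler
  // on the very same input stream.
  static List<String> runBoth(byte[] wire) {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(wire));
    List<String> log = Collections.synchronizedList(new ArrayList<String>());
    try {
      Thread oldThread = new Thread(new DialectHandler("v1", in, log));
      oldThread.start();
      oldThread.join();                    // old handler has returned from run()
      Thread newThread = new Thread(new DialectHandler("v2", in, log));
      newThread.start();
      newThread.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return new ArrayList<String>(log);
  }
}
```

Waiting for the old thread to finish before starting the new one guarantees that only one handler reads from the stream at any time, which is the "know precisely when" requirement stated above.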

A tougher situation occurs when we periodically want to add message types to the current protocol. We may not know beforehand which message types will be needed while the application is running, which would mean that we couldn't create a message-handler subclass to support all possible message types. A particular message protocol may need to be augmented with additional message types after two agents finish negotiating the details of the protocol they will use. Or we may want to provide a more generic message management facility, where multiple sub-systems can add their own message types to the protocol. A manufacturing data management system, for example, may use a message protocol that includes messages for a machine diagnostic system, a machine scheduling algorithm, and an automatic material ordering system. Each subset of the message protocol may need to be refined and updated as the system (and the factory) evolves and grows, and while the system remains continuously operational.

To support situations like these, our message-processing facility needs to be expanded to allow adding individual message types and groups of message types at runtime. Example 6-10 shows the final version of our BasicMsgHandler, renamed MessageHandler, which has been updated to support these requirements. The MessageHandler now has a vector of Message objects that act as prototype messages in the message protocol. New message types can be added to the handler by calling the MessageHandler's addMessageType() method with a prototype Message object of the new type.

Example 6-10. The Message Handler Class

package dcj.util.message;

import java.util.Vector;
import java.io.*;

public class MessageHandler implements Runnable
{
  // A global MessageHandler, for applications where one central 
  // handler is used.
  public static MessageHandler current = null;

  InputStream msgIn;
  OutputStream msgOut;
  // Created up front so addMessageType() never sees a null list
  Vector msgPrototypes = new Vector();

  public MessageHandler() {}
  public MessageHandler(InputStream in, OutputStream out) {
    setStreams(in, out);
  }

  public void setStreams(InputStream in, OutputStream out) {
    msgIn = in;
    msgOut = out;
  }

  public void addMessageType(Message prototype) {
    msgPrototypes.addElement(prototype);
  }

  public Message readMsg() throws IOException {
    Message msg = null;
    DataInputStream din = new DataInputStream(msgIn);

    String msgId = din.readUTF();  
    msg = buildMessage(msgId);
    if (msg != null && msg.readArgs(msgIn)) {
      return msg;
    }
    else {
      return null;
    }
  }

  public void sendMsg(Message msg) throws IOException {
    DataOutputStream dout = new DataOutputStream(msgOut);

    // Send the message identifier, then let the message write
    // its own arguments.
    dout.writeUTF(msg.messageID());
    msg.writeArgs(msgOut);
  }

  public void run() {
    try {
      while (true) {
        Message msg = readMsg();
        if (msg != null) {
          msg.Do();
        }
      }
    }
    // Treat an IOException as a termination of the message
    // exchange, and let this message-processing thread die.
    catch (IOException e) {}
  }

  protected Message buildMessage(String msgId) {
    Message msg = null;
    int numMTypes = msgPrototypes.size();
    for (int i = 0; i < numMTypes; i++) {
      Message m = (Message)msgPrototypes.elementAt(i);
      if (m.handles(msgId)) {
        msg = m.newCopy();
        break;
      }
    }
    return msg;
  }
}

The Message class referenced in the MessageHandler class is a slightly modified version of our BasicMessage class, and is shown in Example 6-11. This new version of the message class includes a handles() method and a newCopy() method. After the MessageHandler reads a message identifier in its readMsg() method, it calls its buildMessage() method to construct the message object corresponding to the identifier. The buildMessage() method sequentially calls the handles() method on each Message object in the prototype list to see whether the Message recognizes the message identifier. If handles() returns true, a copy of the prototype message is made to handle the incoming message. The newCopy() method on Message returns a new Message object of the same type as the Message on which it is called. The new Message object is returned to MessageHandler's readMsg() method, where it is told to read its arguments from the input stream as before.

Example 6-11. The Message Class

package dcj.util.message;

import java.io.*;
import java.util.Vector;

public abstract class Message
{
  protected String id;
  protected Vector argList;
  protected String endToken = "END";

  public Message() {
    argList = new Vector();
  }

  public Message(String mid) {
    id = mid;
    argList = new Vector();
  }

  protected void addArg(Object arg) {
    argList.addElement(arg);
  }

  public String messageID() {
    return id;
  }

  public void setId(String mid) {
    id = mid;
  }

  public Vector argList() {
    Vector listCopy = (Vector)argList.clone();
    return listCopy;
  }

  public boolean readArgs(InputStream ins) {
    boolean success = true;
    DataInputStream din = new DataInputStream(ins);

    // Read tokens until the "end-of-message" token is seen.
    try {
      String token = din.readUTF();
      while (token.compareTo(endToken) != 0) {
        addArg(token);
        token = din.readUTF();
      }
    }
    catch (IOException e) {
      // Failed to read complete argument list.
      success = false;
    }
    return success;
  }

  public boolean writeArgs(OutputStream outs) {
    int len = argList.size();
    boolean success = true;
    DataOutputStream dout = new DataOutputStream(outs);

    // Write each argument in order
    try {
      for (int i = 0; i < len; i++) {
        String arg = (String)argList.elementAt(i);
        dout.writeUTF(arg);
      }

      // Finish with the end-of-message token
      dout.writeUTF(endToken);
    }
    catch (IOException e) {
      success = false;
    }
    return success;
  }

  public abstract boolean Do();
  public abstract boolean handles(String msgId);
  public abstract Message newCopy();
}

With this final version of our message-passing and message-processing facility, we're able to define types of messages by creating subclasses of the Message class that can read the message with its arguments and perform the proper task based on the contents of the message. We can define complete message protocols in several ways using the MessageHandler class. A subclass of MessageHandler can be defined that recognizes the various message identifiers in the protocol directly in its buildMessage() method. A MessageHandler subclass could also be written that automatically adds the needed Message prototypes to its internal list, and then uses the default buildMessage() implementation to ask each Message object if it recognizes an incoming message's type. A third option, made possible by the fact that MessageHandler now has no abstract methods, is for a distributed application to construct a generic MessageHandler object, then add the necessary Message prototypes to the generic MessageHandler using its addMessageType() method. The buildMessage() method would then be able to create Message objects corresponding to the incoming messages.
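The third option, a generic handler populated with prototypes at runtime, might look like the sketch below. To keep it self-contained, stream I/O is left out and simplified stand-ins are used: ProtoMessage, PingMessage, QuitMessage, and PrototypeRegistry are hypothetical names that mirror only the handles(), newCopy(), addMessageType(), and buildMessage() methods of Examples 6-10 and 6-11.

```java
import java.util.*;

// Simplified stand-in for the Message class: only the prototype methods.
abstract class ProtoMessage {
  public abstract boolean handles(String msgId);
  public abstract ProtoMessage newCopy();
}

// Two hypothetical message types for the demonstration.
class PingMessage extends ProtoMessage {
  public boolean handles(String msgId) { return msgId.equals("ping"); }
  public ProtoMessage newCopy() { return new PingMessage(); }
}

class QuitMessage extends ProtoMessage {
  public boolean handles(String msgId) { return msgId.equals("quit"); }
  public ProtoMessage newCopy() { return new QuitMessage(); }
}

// Stand-in for the prototype-handling half of MessageHandler.
class PrototypeRegistry {
  private final List<ProtoMessage> prototypes = new ArrayList<>();

  public void addMessageType(ProtoMessage prototype) {
    prototypes.add(prototype);
  }

  // Ask each prototype in turn whether it recognizes the identifier;
  // the first match supplies a fresh copy. Returns null if none match.
  public ProtoMessage buildMessage(String msgId) {
    for (ProtoMessage p : prototypes) {
      if (p.handles(msgId)) {
        return p.newCopy();
      }
    }
    return null;
  }

  public static void main(String[] argv) {
    PrototypeRegistry registry = new PrototypeRegistry();
    registry.addMessageType(new PingMessage());
    System.out.println(registry.buildMessage("quit") == null);  // not yet known

    // A new message type joins the protocol while the handler is live.
    registry.addMessageType(new QuitMessage());
    System.out.println(registry.buildMessage("quit") instanceof QuitMessage);
  }
}
```

Each lookup returns a copy rather than the prototype itself, so arguments read for one incoming message never leak into the next.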

The techniques we discussed earlier for handling variable argument lists and argument types can also be applied within our new message-handling system. A new implementation of the MoveMessage class from our chess examples demonstrates this in Example 6-12. The class has been modified to allow it to support two versions of the message argument list: one uses the from, to, and checkFlag arguments, and another uses a serialized ChessMove object. The message includes an optional new string argument that indicates whether the chess move is being sent as a ChessMove object. The readArgs() method reads the first argument as a string, then checks the string's value. If the string's value is equal to "MOBJ", then the chess move argument is being sent as a ChessMove object, and an ObjectInputStream is used to read it. If not, then the first string is the from argument of the older format (two strings and an integer), and the DataInputStream is used to read the remaining arguments. The writeArgs() method sends out the new argument, then transmits its chess move as a serialized ChessMove object.

Example 6-12. Backwards Compatible MoveMessage Class

class MoveMessage extends ChessMessage {
  public MoveMessage(ChessPlayer p) {
    super(p);
    setId("move");
  }

  public MoveMessage(String from, String to, int checkFlag) {
    setId("move");
    ChessMove move = new ChessMove(from, to, checkFlag);
    addArg(move);
  }

  public boolean Do() {
    boolean success = true;
    // The chess move is the only argument, stored at index 0
    ChessMove move = (ChessMove)argList.elementAt(0);
    String to = move.to();
    String from = move.from();
    int checkFlag = move.checkFlag();

    try {
      if (!player.acceptMove(from, to, checkFlag)) {
        MessageHandler.current.sendMsg(new RejectMoveMessage());
      }
      else {
        ConfirmMoveMessage ccmsg =
          new ConfirmMoveMessage(move);
        MessageHandler.current.sendMsg(ccmsg);

        // We accepted the opponent's move, now send them
        // our countermove, unless they just mated us...
        if (checkFlag == ChessPlayer.CHECKMATE) {
          ConcedeMessage cmsg = new ConcedeMessage();
          MessageHandler.current.sendMsg(cmsg);
        }
        else {
          player.nextMove(from, to, checkFlag);
          MoveMessage mmsg = new MoveMessage(from, to, checkFlag);
          MessageHandler.current.sendMsg(mmsg);
        }
      }
    }
    catch (IOException e) {
      success = false;
    }

    return success;
  }

  public boolean readArgs(InputStream ins) {
    boolean success = true;

    DataInputStream din = new DataInputStream(ins);

    try {
      String temp = din.readUTF();
      if (temp.compareTo("MOBJ") == 0) {
        ObjectInputStream oin = new ObjectInputStream(ins);
        ChessMove move = (ChessMove)oin.readObject();
        addArg(move);
      }
      else {
        String to = din.readUTF();
        int checkFlag = din.readInt();
        ChessMove move = new ChessMove(temp, to, checkFlag);
        addArg(move);
      }

      // Got all of our arguments, now watch for the
      // end-of-message token
      temp = din.readUTF();
      while (temp.compareTo(endToken) != 0) {
        temp = din.readUTF();
      }
    }
    catch (Exception e) {
      success = false;
    }

    return success;
  }

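  public boolean writeArgs(OutputStream outs) {
    boolean success = true;
    DataOutputStream dout = new DataOutputStream(outs);
    ChessMove move = (ChessMove)argList.elementAt(0);

    try {
      // Send the format marker first. The ObjectOutputStream is created
      // afterward because its constructor writes a stream header.
      dout.writeUTF("MOBJ");
      ObjectOutputStream oout = new ObjectOutputStream(outs);
      oout.writeObject(move);
      oout.flush();

      // Finish with the end-of-message token that readArgs() waits for
      dout.writeUTF(endToken);
    }
    catch (IOException e) {
      success = false;
    }

    return success;
  }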

  public boolean handles(String mid) {
    if (mid.compareTo("move") == 0)
      return true;
    else
      return false;
  }

  public Message newCopy() {
    return(new MoveMessage(player));
  }
}

With this version of our MoveMessage class, we can talk to ChessPlayer agents using either the new or old versions of the class. We could have handled the variable argument list by adding an argument count to the argument list, but then our previous version of the MoveMessage class would also have to be modified to send the new argument. This would defeat the purpose, since our original intent was to provide backwards compatibility with the previous version of our chess protocol.




Copyright © 2001 O'Reilly & Associates. All rights reserved.