MessageTransceiverThread.h

00001 /* This file is Copyright 2000-2008 Meyer Sound Laboratories Inc.  See the included LICENSE.txt file for details. */
00002 
00003 #ifndef MuscleMessageTransceiverThread_h
00004 #define MuscleMessageTransceiverThread_h
00005 
00006 #include "system/Thread.h"
00007 #include "reflector/StorageReflectSession.h"
00008 #include "reflector/ReflectServer.h"
00009 
00010 BEGIN_NAMESPACE(muscle);
00011 
00012 class ThreadSupervisorSession;   // forward declaration: DrainTag (below) names this type before its full definition appears
00013 class MessageTransceiverThread;  // forward declaration: named as a friend and as a pointer member before its full definition appears
00014 
// MTT_EVENT_* codes.  Only the first enumerator is given an explicit value
// (1835233840 == 0x6D637230, i.e. the ASCII bytes 'm','c','r','0'); every later
// enumerator is the previous value plus one, so inserting or reordering entries
// renumbers everything that follows.
// NOTE(review): presumably these are the values reported through the retEventCode
// argument of MessageTransceiverThread::GetNextEventFromInternalThread() -- confirm
// against the implementation.
00018 enum {
00019    MTT_EVENT_INCOMING_MESSAGE = 1835233840, // A new message from a remote computer is ready to process
00020    MTT_EVENT_SESSION_ACCEPTED,              // A new session has been created by one of our factory objects
00021    MTT_EVENT_SESSION_ATTACHED,              // A new session has been attached to the local server
00022    MTT_EVENT_SESSION_CONNECTED,             // A session on the local server has completed its connection to the remote one
00023    MTT_EVENT_SESSION_DISCONNECTED,          // A session on the local server got disconnected from its remote peer
00024    MTT_EVENT_SESSION_DETACHED,              // A session on the local server has detached (and been destroyed)
00025    MTT_EVENT_FACTORY_ATTACHED,              // A ReflectSessionFactory object has been attached to the server
00026    MTT_EVENT_FACTORY_DETACHED,              // A ReflectSessionFactory object has been detached (and been destroyed)
00027    MTT_EVENT_OUTPUT_QUEUES_DRAINED,         // Output queues of sessions previously specified in RequestOutputQueuesDrainedNotification() have drained
00028    MTT_EVENT_SERVER_EXITED,                 // The ReflectServer event loop has terminated
00029    MTT_LAST_EVENT                           // Sentinel: equals MTT_EVENT_SERVER_EXITED + 1; keep it as the final enumerator
00030 };
00031    
// MTT_COMMAND_* codes.  Only the first enumerator is given an explicit value
// (1835230000 == 0x6D636330, i.e. the ASCII bytes 'm','c','c','0'); every later
// enumerator is the previous value plus one, so inserting or reordering entries
// renumbers everything that follows.  The range does not overlap the MTT_EVENT_*
// range, which starts 3840 higher.
// NOTE(review): presumably these are the 'what' codes of the Messages that the public
// MessageTransceiverThread methods send to the internal thread -- confirm in the .cpp.
00033 enum {
00034    MTT_COMMAND_SEND_USER_MESSAGE = 1835230000, // contains a user message to be sent out
00035    MTT_COMMAND_ADD_NEW_SESSION,                // contains info on a new session to be created
00036    MTT_COMMAND_PUT_ACCEPT_FACTORY,             // request to start accepting session(s) on a given port
00037    MTT_COMMAND_REMOVE_ACCEPT_FACTORY,          // remove the acceptor factory on a given port(s)
00038    MTT_COMMAND_SET_DEFAULT_PATH,               // change the default distribution path
00039    MTT_COMMAND_NOTIFY_ON_OUTPUT_DRAIN,         // request a notification when all currently pending output has been sent
00040    MTT_COMMAND_SET_INPUT_POLICY,               // set a new input IO Policy for worker sessions
00041    MTT_COMMAND_SET_OUTPUT_POLICY,              // set a new output IO Policy for worker sessions
00042    MTT_COMMAND_REMOVE_SESSIONS,                // remove the matching worker sessions
00043    MTT_COMMAND_SET_OUTGOING_ENCODING,          // set the MUSCLE_MESSAGE_ENCODING_* setting on worker sessions
00044    MTT_LAST_COMMAND                            // Sentinel: equals MTT_COMMAND_SET_OUTGOING_ENCODING + 1; keep it as the final enumerator
00045 };
00046 
// MTT_NAME_* field-name strings.  Each macro expands to a distinct four-character
// string literal; the trailing comment on each line states what the field holds.
// NOTE(review): presumably these are the field names used inside the MTT_COMMAND_* and
// MTT_EVENT_* Messages -- confirm against the implementation.
00048 #define MTT_NAME_PATH        "path"  // field containing a session path (e.g. "/*/*")
00049 #define MTT_NAME_MESSAGE     "mssg"  // field containing a message object
00050 #define MTT_NAME_SOCKET      "sock"  // field containing a Socket reference
00051 #define MTT_NAME_IP_ADDRESS  "addr"  // field containing an int32 IP address
00052 #define MTT_NAME_HOSTNAME    "host"  // field containing an ASCII hostname or IP address
00053 #define MTT_NAME_PORT        "port"  // field containing an int16 port number (NOTE(review): the methods in this header take ports as uint16 -- confirm the stored type)
00054 #define MTT_NAME_FACTORY_ID  "fcid"  // field containing a uint32 factory ID number (new for v3.40)
00055 #define MTT_NAME_SESSION     "sess"  // field containing an AbstractReflectSession tag
00056 #define MTT_NAME_FROMSESSION "sfrm"  // field containing the root path of the session this message is from (e.g. "192.168.1.103/17")
00057 #define MTT_NAME_FACTORY     "fact"  // field containing a ReflectSessionFactory tag
00058 #define MTT_NAME_DRAIN_TAG   "dtag"  // field containing a DrainTag reference
00059 #define MTT_NAME_POLICY_TAG  "ptag"  // field containing an IOPolicy reference
00060 #define MTT_NAME_ENCODING    "enco"  // field containing the MUSCLE_MESSAGE_ENCODING_* value
00061 #define MTT_NAME_EXPANDLOCALHOST "expl" // boolean field indicating whether localhost IP should be expanded to primary IP
00062 #define MTT_NAME_AUTORECONNECTDELAY "arcd" // int64 indicating how long after disconnect before an auto-reconnect should occur
00063 
// DrainTag: a RefCountable tag object that carries (a) a raw pointer to a
// ThreadSupervisorSession and (b) a reply Message reference.  Apart from construction
// and destruction everything is private, so only the three friend classes declared
// below can read or modify that state.  Elsewhere in this header DrainTag appears in
// MTT_NAME_DRAIN_TAG and in MessageTransceiverThread::RequestOutputQueuesDrainedNotification().
00070 class DrainTag : public RefCountable
00071 {
00072 public:
         // Creates a tag with no supervisor registered (_notify == NULL); _replyRef is default-constructed.
00074    DrainTag() : _notify(NULL) {/* empty */}
00075 
         // Defined out of line.
         // NOTE(review): presumably reports to _notify via ThreadSupervisorSession::DrainTagIsBeingDeleted()
         // when _notify is non-NULL -- confirm in the .cpp.
00077    virtual ~DrainTag();
00078 
00079 private:
00080    friend class ThreadSupervisorSession;
00081    friend class ThreadWorkerSession;
00082    friend class MessageTransceiverThread;
00083 
         // Records the supervisor pointer exactly as given (NULL is accepted); nothing else happens here.
00084    void SetNotify(ThreadSupervisorSession * notify) {_notify = notify;}
         // Returns a copy of the stored reply reference.
00085    MessageRef GetReplyMessage() const {return _replyRef;}
         // Replaces the stored reply reference with (ref).
00086    void SetReplyMessage(const MessageRef & ref) {_replyRef = ref;}
00087 
00088    ThreadSupervisorSession * _notify;  // raw pointer, initialized to NULL by the constructor
00089    MessageRef _replyRef;               // NOTE(review): presumably the notification Message delivered once output has drained -- confirm
00090 };
00091 typedef Ref<DrainTag> DrainTagRef;  // Ref<> handle type for DrainTag objects
00092 
// ThreadWorkerSession: a StorageReflectSession subclass whose GetTypeName() is "ThreadWorker".
// Only declarations are visible in this header; apart from GetTypeName() every method body
// is defined elsewhere.
// NOTE(review): the virtual methods below are presumably overrides of callbacks declared in
// StorageReflectSession or its bases (not visible from this file) -- confirm.
00097 class ThreadWorkerSession : public StorageReflectSession
00098 {
00099 public:
         // Default constructor (defined out of line).
00101    ThreadWorkerSession();
00102 
         // Destructor (defined out of line).
00104    virtual ~ThreadWorkerSession();
00105 
         // Returns a status_t; NOTE(review): presumably where MTT_EVENT_SESSION_ATTACHED originates -- confirm.
00107    virtual status_t AttachedToServer();
00108 
         // NOTE(review): presumably where MTT_EVENT_SESSION_DETACHED originates -- confirm.
00110    virtual void AboutToDetachFromServer();
00111 
         // Returns a bool; NOTE(review): presumably where MTT_EVENT_SESSION_DISCONNECTED originates -- confirm.
00113    virtual bool ClientConnectionClosed();
00114 
         // NOTE(review): presumably where MTT_EVENT_SESSION_CONNECTED originates -- confirm.
00116    virtual void AsyncConnectCompleted();
00117 
         // Receives (msg) together with an opaque userData pointer; behavior is defined out of line.
00119    virtual void MessageReceivedFromGateway(const MessageRef & msg, void * userData);
00120 
         // Receives (msg) sent by the session (from) together with an opaque userData pointer; behavior is defined out of line.
00122    virtual void MessageReceivedFromSession(AbstractReflectSession & from, const MessageRef & msg, void * userData);
00123 
         // Always returns the string literal "ThreadWorker".
00125    virtual const char * GetTypeName() const {return "ThreadWorker";}
00126 
         // Takes a byte limit and returns an int32; exact semantics are defined by the implementation and base class.
00128    virtual int32 DoOutput(uint32 maxBytes);
00129 
00130 private:
00131    friend class ThreadWorkerSessionFactory;  // lets the factory touch the private members below
00132    Queue<DrainTagRef> _drainedNotifiers;     // NOTE(review): presumably DrainTags waiting for this session's output to drain -- confirm
00133    bool _sendAcceptedMessage;                // not initialized in this header; NOTE(review): presumably controls emission of MTT_EVENT_SESSION_ACCEPTED -- confirm
00134 };
00135 typedef Ref<ThreadWorkerSession> ThreadWorkerSessionRef;  // Ref<> handle type for ThreadWorkerSession objects
00136 
// ThreadWorkerSessionFactory: a StorageReflectSessionFactory subclass that can produce
// ThreadWorkerSession objects.  All method bodies are defined outside this header.
00138 class ThreadWorkerSessionFactory : public StorageReflectSessionFactory
00139 {
00140 public:
         // Default constructor (defined out of line).
00142    ThreadWorkerSessionFactory();
00143 
         // Returns a status_t; NOTE(review): presumably where MTT_EVENT_FACTORY_ATTACHED originates -- confirm.
00145    virtual status_t AttachedToServer();
00146 
         // NOTE(review): presumably where MTT_EVENT_FACTORY_DETACHED originates -- confirm.
00148    virtual void AboutToDetachFromServer();
00149 
         // Returns a generic AbstractReflectSessionRef for the given client address and factory info.
         // NOTE(review): presumably delegates to CreateThreadWorkerSession() below -- confirm in the .cpp.
00157    virtual AbstractReflectSessionRef CreateSession(const String & clientAddress, const IPAddressAndPort & factoryInfo);
00158 
         // Same arguments as CreateSession(), but the result is typed as a ThreadWorkerSessionRef.
         // Being virtual, it can be overridden by subclasses that want a different worker-session type.
00163    virtual ThreadWorkerSessionRef CreateThreadWorkerSession(const String & clientAddress, const IPAddressAndPort & factoryInfo);
00164 };
00165 typedef Ref<ThreadWorkerSessionFactory> ThreadWorkerSessionFactoryRef;  // Ref<> handle type for ThreadWorkerSessionFactory objects
00166 
// ThreadSupervisorSession: a StorageReflectSession subclass whose GetTypeName() is
// "ThreadSupervisor".  It holds a raw back-pointer to a MessageTransceiverThread, a default
// distribution path string, and a table keyed by raw DrainTag pointers.
// MessageTransceiverThread and DrainTag are friends and may use the private members.
// NOTE(review): presumably this is the session that relays between the owning thread and
// the ThreadWorkerSessions -- confirm against the implementation.
00171 class ThreadSupervisorSession : public StorageReflectSession
00172 {
00173 public:
         // Default constructor (defined out of line).
00175    ThreadSupervisorSession();
00176 
         // Destructor (defined out of line).
00178    virtual ~ThreadSupervisorSession();
00179 
         // Defined out of line.
00181    virtual void AboutToDetachFromServer();
00182 
         // Returns the gateway object for this session; the gateway type chosen is decided in the .cpp.
00184    virtual AbstractMessageIOGatewayRef CreateGateway();         
00185 
         // Receives (msg) together with an opaque userData pointer; behavior is defined out of line.
00189    virtual void MessageReceivedFromGateway(const MessageRef & msg, void * userData);
00190 
         // Receives (msg) sent by the session (from); behavior is defined out of line.
00192    virtual void MessageReceivedFromSession(AbstractReflectSession & from, const MessageRef & msg, void * userData);
00193 
         // Receives (msg) sent by the factory (from); behavior is defined out of line.
00195    virtual void MessageReceivedFromFactory(ReflectSessionFactory & from, const MessageRef & msg, void * userData);
00196 
         // Returns a bool; behavior is defined out of line.
00200    virtual bool ClientConnectionClosed();
00201 
         // Overwrites this session's stored default distribution path with (path).  Not virtual.
00208    void SetDefaultDistributionPath(const String & path) {_defaultDistributionPath = path;}
00209 
         // Returns a reference to this session's stored default distribution path (no copy is made).
00211    const String & GetDefaultDistributionPath() const {return _defaultDistributionPath;}
00212 
         // Always returns the string literal "ThreadSupervisor".
00214    virtual const char * GetTypeName() const {return "ThreadSupervisor";}
00215 
00216 protected:
         // Overridable hook taking a Message and a count; returns a status_t.
         // NOTE(review): presumably handles the MTT_COMMAND_* Messages sent by the owning thread, with
         // numLeft being the number of Messages still queued -- confirm in the .cpp.
00222    virtual status_t MessageReceivedFromOwner(const MessageRef & msg, uint32 numLeft);
00223 
00224 private:
00225    friend class MessageTransceiverThread;
00226    friend class DrainTag;
00227 
         // Private helpers, all defined out of line.  DrainTagIsBeingDeleted() is reachable from DrainTag
         // only because of the friend declaration above.
00228    void DrainTagIsBeingDeleted(DrainTag * tag);
00229    void DistributeMessageToWorkers(const MessageRef & distMsg);
00230    status_t AddNewWorkerConnectSession(const ThreadWorkerSessionRef & sessionRef, const ip_address & hostIP, uint16 port, uint64 autoReconnectDelay);
00231 
00232    Hashtable<DrainTag *, bool> _drainTags;   // keyed by raw (non-Ref) DrainTag pointers; NOTE(review): the bool looks like a placeholder value (table used as a set) -- confirm
00233    String _defaultDistributionPath;          // read/written by the inline accessors above
00234    MessageTransceiverThread * _mtt;          // raw back-pointer; not initialized in this header
00235 };
00236 typedef Ref<ThreadSupervisorSession> ThreadSupervisorSessionRef;  // Ref<> handle type for ThreadSupervisorSession objects
00237 
// MessageTransceiverThread: a Thread subclass that owns a ReflectServerRef (_server) and
// keeps its own copy of a default distribution path.  Except for the one-line inline
// forwarders and the path getter, every method body is defined outside this header.
//
// Reviewer notes that follow directly from the declarations below:
//  - Several virtual methods carry default arguments (optDistPath, autoReconnectDelay,
//    expandLocalhost, optInterfaceIP).  C++ selects default arguments from the static type
//    of the call expression, so an override should repeat exactly the same defaults.
//  - AddNewSession, AddNewConnectSession and PutAcceptFactory are overload sets mixing one
//    or more virtual methods with non-virtual inline conveniences.  A subclass that
//    overrides one of the virtuals hides the other overloads of that name unless it adds a
//    using-declaration (e.g. "using MessageTransceiverThread::AddNewSession;").
// NOTE(review): the public methods presumably map onto the MTT_COMMAND_* codes declared
// earlier in this file -- confirm in the .cpp.
00246 class MessageTransceiverThread : public Thread
00247 {
00248 public:
         // Default constructor (defined out of line).
00250    MessageTransceiverThread();
00251 
         // Destructor (defined out of line).
00255    virtual ~MessageTransceiverThread();
00256 
         // Returns a status_t; NOTE(review): presumably overrides Thread::StartInternalThread() -- base class not visible here.
00260    virtual status_t StartInternalThread();
00261 
         // Takes the Message to send and an optional distribution path (NULL by default).
         // NOTE(review): presumably a NULL path means "use the default distribution path" -- confirm.
00270    virtual status_t SendMessageToSessions(const MessageRef & msgRef, const char * optDistPath = NULL);
00271 
         // Primary AddNewSession overload: takes a socket reference and a worker-session reference.
00287    virtual status_t AddNewSession(const SocketRef & socket, const ThreadWorkerSessionRef & optSessionRef);
00288 
         // Convenience: forwards to the two-argument overload with a default-constructed ThreadWorkerSessionRef.
00290    status_t AddNewSession(const SocketRef & socket) {return AddNewSession(socket, ThreadWorkerSessionRef());}
00291 
         // Convenience: forwards to the two-argument overload with a default-constructed SocketRef and ThreadWorkerSessionRef.
00293    status_t AddNewSession() {return AddNewSession(SocketRef(), ThreadWorkerSessionRef());}
00294 
         // Primary connect-by-IP-address overload.  autoReconnectDelay defaults to MUSCLE_TIME_NEVER.
00316    virtual status_t AddNewConnectSession(const ip_address & targetIPAddress, uint16 port, const ThreadWorkerSessionRef & optSessionRef, uint64 autoReconnectDelay = MUSCLE_TIME_NEVER);
00317 
         // Convenience: forwards to the overload above with a default-constructed ThreadWorkerSessionRef.
00319    status_t AddNewConnectSession(const ip_address & targetIPAddress, uint16 port, uint64 autoReconnectDelay = MUSCLE_TIME_NEVER) {return AddNewConnectSession(targetIPAddress, port, ThreadWorkerSessionRef(), autoReconnectDelay);}
00320 
         // Primary connect-by-hostname overload.  expandLocalhost defaults to false and
         // autoReconnectDelay defaults to MUSCLE_TIME_NEVER.
00343    virtual status_t AddNewConnectSession(const String & targetHostName, uint16 port, const ThreadWorkerSessionRef & optSessionRef, bool expandLocalhost = false, uint64 autoReconnectDelay = MUSCLE_TIME_NEVER);
00344 
         // Convenience: forwards to the overload above with a default-constructed ThreadWorkerSessionRef.
00346    status_t AddNewConnectSession(const String & targetHostName, uint16 port, bool expandLocalhost = false, uint64 autoReconnectDelay = MUSCLE_TIME_NEVER) {return AddNewConnectSession(targetHostName, port, ThreadWorkerSessionRef(), expandLocalhost, autoReconnectDelay);}
00347 
         // Primary PutAcceptFactory overload: takes a port, a factory reference and an interface address
         // that defaults to invalidIP.
00363    virtual status_t PutAcceptFactory(uint16 port, const ThreadWorkerSessionFactoryRef & optFactoryRef, const ip_address & optInterfaceIP = invalidIP);
00364 
         // Convenience: forwards to the overload above with a default-constructed factory reference
         // (and therefore with optInterfaceIP left at its default, invalidIP).
00366    status_t PutAcceptFactory(uint16 port) {return PutAcceptFactory(port, ThreadWorkerSessionFactoryRef());}
00367 
         // Takes a port and an interface address that defaults to invalidIP; returns a status_t.
00378    virtual status_t RemoveAcceptFactory(uint16 port, const ip_address & optInterfaceIP = invalidIP);
00379 
         // Defined out of line; takes no arguments and returns nothing.
00383    virtual void Reset();
00384 
         // Non-virtual; returns a status_t (unlike the void setter of the same name on ThreadSupervisorSession).
00397    status_t SetDefaultDistributionPath(const String & distPath);
00398 
         // Returns a reference to this object's own stored copy of the default distribution path.
00400    const String & GetDefaultDistributionPath() const {return _defaultDistributionPath;}
00401 
         // Writes an event code into retEventCode; the three pointer arguments are optional outputs
         // that default to NULL.  NOTE(review): the meaning of the int32 return value is not visible
         // in this header -- confirm in the .cpp before relying on it.
00433    int32 GetNextEventFromInternalThread(uint32 & retEventCode, MessageRef * optRetMsgRef = NULL, String * optFromSession = NULL, uint32 * optFromFactoryID = NULL);
00434 
         // Takes a notification Message, an optional distribution path and an optional raw DrainTag
         // pointer (both NULL by default).  NOTE(review): ownership of optDrainTag after the call is not
         // visible in this header -- confirm in the .cpp.
00449    status_t RequestOutputQueuesDrainedNotification(const MessageRef & notificationMsg, const char * optDistPath = NULL, DrainTag * optDrainTag = NULL);
00450 
         // Takes a policy reference and an optional distribution path (NULL by default).
00461    status_t SetNewInputPolicy(const PolicyRef & pref, const char * optDistPath = NULL);
00462 
         // Takes a policy reference and an optional distribution path (NULL by default).
00473    status_t SetNewOutputPolicy(const PolicyRef & pref, const char * optDistPath = NULL);
00474 
         // Takes an int32 encoding value and an optional distribution path (NULL by default).
00486    status_t SetOutgoingMessageEncoding(int32 encoding, const char * optDistPath = NULL);
00487 
         // Takes an optional distribution path (NULL by default).
00494    status_t RemoveSessions(const char * optDistPath = NULL);
00495 
00496 protected:
         // NOTE(review): presumably the body of the internal thread (overriding Thread) -- base class not visible here.
00498    virtual void InternalThreadEntry();
00499 
         // Overridable factory methods: each returns a Ref to the object type named in its return
         // type, so subclasses can substitute their own session/factory/server classes.
00505    virtual ThreadSupervisorSessionRef CreateSupervisorSession();
00506 
00511    virtual ThreadWorkerSessionRef CreateDefaultWorkerSession();
00512 
00517    virtual ThreadWorkerSessionFactoryRef CreateDefaultSessionFactory();
00518 
00520    virtual ReflectServerRef CreateReflectServer();
00521 
00522 private:
00523    friend class ThreadSupervisorSession;
         // Private helpers, all defined out of line.
00524    status_t EnsureServerAllocated();
00525    status_t SendAddNewSessionMessage(const ThreadWorkerSessionRef & sessionRef, const SocketRef & socket, const char * hostName, const ip_address & hostIP, uint16 port, bool expandLocalhost, uint64 autoReconnectDelay);
00526    status_t SetNewPolicyAux(uint32 what, const PolicyRef & pref, const char * optDistPath);
00527 
00528    ReflectServerRef _server;          // NOTE(review): presumably filled in by EnsureServerAllocated() via CreateReflectServer() -- confirm
00529    String _defaultDistributionPath;   // returned by GetDefaultDistributionPath() above
00530 };
00531 
00532 END_NAMESPACE(muscle);
00533 
00534 #endif
00535 

Generated on Thu Jun 5 17:47:53 2008 for MUSCLE by  doxygen 1.5.1