00001
00002
00003 #ifndef MuscleMessageTransceiverThread_h
00004 #define MuscleMessageTransceiverThread_h
00005
00006 #include "system/Thread.h"
00007 #include "reflector/StorageReflectSession.h"
00008 #include "reflector/ReflectServer.h"
00009
00010 BEGIN_NAMESPACE(muscle);
00011
00012 class ThreadSupervisorSession;
00013 class MessageTransceiverThread;
00014
00018 enum {
00019 MTT_EVENT_INCOMING_MESSAGE = 1835233840,
00020 MTT_EVENT_SESSION_ACCEPTED,
00021 MTT_EVENT_SESSION_ATTACHED,
00022 MTT_EVENT_SESSION_CONNECTED,
00023 MTT_EVENT_SESSION_DISCONNECTED,
00024 MTT_EVENT_SESSION_DETACHED,
00025 MTT_EVENT_FACTORY_ATTACHED,
00026 MTT_EVENT_FACTORY_DETACHED,
00027 MTT_EVENT_OUTPUT_QUEUES_DRAINED,
00028 MTT_EVENT_SERVER_EXITED,
00029 MTT_LAST_EVENT
00030 };
00031
00033 enum {
00034 MTT_COMMAND_SEND_USER_MESSAGE = 1835230000,
00035 MTT_COMMAND_ADD_NEW_SESSION,
00036 MTT_COMMAND_PUT_ACCEPT_FACTORY,
00037 MTT_COMMAND_REMOVE_ACCEPT_FACTORY,
00038 MTT_COMMAND_SET_DEFAULT_PATH,
00039 MTT_COMMAND_NOTIFY_ON_OUTPUT_DRAIN,
00040 MTT_COMMAND_SET_INPUT_POLICY,
00041 MTT_COMMAND_SET_OUTPUT_POLICY,
00042 MTT_COMMAND_REMOVE_SESSIONS,
00043 MTT_COMMAND_SET_OUTGOING_ENCODING,
00044 MTT_LAST_COMMAND
00045 };
00046
00048 #define MTT_NAME_PATH "path" // field containing a session path (e.g. "
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00070
00071
00072
00074 }
00075
00077 virtual ~DrainTag();
00078
00079 private:
00080 friend class ThreadSupervisorSession;
00081 friend class ThreadWorkerSession;
00082 friend class MessageTransceiverThread;
00083
00084 void SetNotify(ThreadSupervisorSession * notify) {_notify = notify;}
00085 MessageRef GetReplyMessage() const {return _replyRef;}
00086 void SetReplyMessage(const MessageRef & ref) {_replyRef = ref;}
00087
00088 ThreadSupervisorSession * _notify;
00089 MessageRef _replyRef;
00090 };
00091 typedef Ref<DrainTag> DrainTagRef;
00092
00097 class ThreadWorkerSession : public StorageReflectSession
00098 {
00099 public:
00101 ThreadWorkerSession();
00102
00104 virtual ~ThreadWorkerSession();
00105
00107 virtual status_t AttachedToServer();
00108
00110 virtual void AboutToDetachFromServer();
00111
00113 virtual bool ClientConnectionClosed();
00114
00116 virtual void AsyncConnectCompleted();
00117
00119 virtual void MessageReceivedFromGateway(const MessageRef & msg, void * userData);
00120
00122 virtual void MessageReceivedFromSession(AbstractReflectSession & from, const MessageRef & msg, void * userData);
00123
00125 virtual const char * GetTypeName() const {return "ThreadWorker";}
00126
00128 virtual int32 DoOutput(uint32 maxBytes);
00129
00130 private:
00131 friend class ThreadWorkerSessionFactory;
00132 Queue<DrainTagRef> _drainedNotifiers;
00133 bool _sendAcceptedMessage;
00134 };
00135 typedef Ref<ThreadWorkerSession> ThreadWorkerSessionRef;
00136
00138 class ThreadWorkerSessionFactory : public StorageReflectSessionFactory
00139 {
00140 public:
00142 ThreadWorkerSessionFactory();
00143
00145 virtual status_t AttachedToServer();
00146
00148 virtual void AboutToDetachFromServer();
00149
00157 virtual AbstractReflectSessionRef CreateSession(const String & clientAddress, const IPAddressAndPort & factoryInfo);
00158
00163 virtual ThreadWorkerSessionRef CreateThreadWorkerSession(const String & clientAddress, const IPAddressAndPort & factoryInfo);
00164 };
00165 typedef Ref<ThreadWorkerSessionFactory> ThreadWorkerSessionFactoryRef;
00166
00171 class ThreadSupervisorSession : public StorageReflectSession
00172 {
00173 public:
00175 ThreadSupervisorSession();
00176
00178 virtual ~ThreadSupervisorSession();
00179
00181 virtual void AboutToDetachFromServer();
00182
00184 virtual AbstractMessageIOGatewayRef CreateGateway();
00185
00189 virtual void MessageReceivedFromGateway(const MessageRef & msg, void * userData);
00190
00192 virtual void MessageReceivedFromSession(AbstractReflectSession & from, const MessageRef & msg, void * userData);
00193
00195 virtual void MessageReceivedFromFactory(ReflectSessionFactory & from, const MessageRef & msg, void * userData);
00196
00200 virtual bool ClientConnectionClosed();
00201
00208 void SetDefaultDistributionPath(const String & path) {_defaultDistributionPath = path;}
00209
00211 const String & GetDefaultDistributionPath() const {return _defaultDistributionPath;}
00212
00214 virtual const char * GetTypeName() const {return "ThreadSupervisor";}
00215
00216 protected:
00222 virtual status_t MessageReceivedFromOwner(const MessageRef & msg, uint32 numLeft);
00223
00224 private:
00225 friend class MessageTransceiverThread;
00226 friend class DrainTag;
00227
00228 void DrainTagIsBeingDeleted(DrainTag * tag);
00229 void DistributeMessageToWorkers(const MessageRef & distMsg);
00230 status_t AddNewWorkerConnectSession(const ThreadWorkerSessionRef & sessionRef, const ip_address & hostIP, uint16 port, uint64 autoReconnectDelay);
00231
00232 Hashtable<DrainTag *, bool> _drainTags;
00233 String _defaultDistributionPath;
00234 MessageTransceiverThread * _mtt;
00235 };
00236 typedef Ref<ThreadSupervisorSession> ThreadSupervisorSessionRef;
00237
00246 class MessageTransceiverThread : public Thread
00247 {
00248 public:
00250 MessageTransceiverThread();
00251
00255 virtual ~MessageTransceiverThread();
00256
00260 virtual status_t StartInternalThread();
00261
00270 virtual status_t SendMessageToSessions(const MessageRef & msgRef, const char * optDistPath = NULL);
00271
00287 virtual status_t AddNewSession(const SocketRef & socket, const ThreadWorkerSessionRef & optSessionRef);
00288
00290 status_t AddNewSession(const SocketRef & socket) {return AddNewSession(socket, ThreadWorkerSessionRef());}
00291
00293 status_t AddNewSession() {return AddNewSession(SocketRef(), ThreadWorkerSessionRef());}
00294
00316 virtual status_t AddNewConnectSession(const ip_address & targetIPAddress, uint16 port, const ThreadWorkerSessionRef & optSessionRef, uint64 autoReconnectDelay = MUSCLE_TIME_NEVER);
00317
00319 status_t AddNewConnectSession(const ip_address & targetIPAddress, uint16 port, uint64 autoReconnectDelay = MUSCLE_TIME_NEVER) {return AddNewConnectSession(targetIPAddress, port, ThreadWorkerSessionRef(), autoReconnectDelay);}
00320
00343 virtual status_t AddNewConnectSession(const String & targetHostName, uint16 port, const ThreadWorkerSessionRef & optSessionRef, bool expandLocalhost = false, uint64 autoReconnectDelay = MUSCLE_TIME_NEVER);
00344
00346 status_t AddNewConnectSession(const String & targetHostName, uint16 port, bool expandLocalhost = false, uint64 autoReconnectDelay = MUSCLE_TIME_NEVER) {return AddNewConnectSession(targetHostName, port, ThreadWorkerSessionRef(), expandLocalhost, autoReconnectDelay);}
00347
00363 virtual status_t PutAcceptFactory(uint16 port, const ThreadWorkerSessionFactoryRef & optFactoryRef, const ip_address & optInterfaceIP = invalidIP);
00364
00366 status_t PutAcceptFactory(uint16 port) {return PutAcceptFactory(port, ThreadWorkerSessionFactoryRef());}
00367
00378 virtual status_t RemoveAcceptFactory(uint16 port, const ip_address & optInterfaceIP = invalidIP);
00379
00383 virtual void Reset();
00384
00397 status_t SetDefaultDistributionPath(const String & distPath);
00398
00400 const String & GetDefaultDistributionPath() const {return _defaultDistributionPath;}
00401
00433 int32 GetNextEventFromInternalThread(uint32 & retEventCode, MessageRef * optRetMsgRef = NULL, String * optFromSession = NULL, uint32 * optFromFactoryID = NULL);
00434
00449 status_t RequestOutputQueuesDrainedNotification(const MessageRef & notificationMsg, const char * optDistPath = NULL, DrainTag * optDrainTag = NULL);
00450
00461 status_t SetNewInputPolicy(const PolicyRef & pref, const char * optDistPath = NULL);
00462
00473 status_t SetNewOutputPolicy(const PolicyRef & pref, const char * optDistPath = NULL);
00474
00486 status_t SetOutgoingMessageEncoding(int32 encoding, const char * optDistPath = NULL);
00487
00494 status_t RemoveSessions(const char * optDistPath = NULL);
00495
00496 protected:
00498 virtual void InternalThreadEntry();
00499
00505 virtual ThreadSupervisorSessionRef CreateSupervisorSession();
00506
00511 virtual ThreadWorkerSessionRef CreateDefaultWorkerSession();
00512
00517 virtual ThreadWorkerSessionFactoryRef CreateDefaultSessionFactory();
00518
00520 virtual ReflectServerRef CreateReflectServer();
00521
00522 private:
00523 friend class ThreadSupervisorSession;
00524 status_t EnsureServerAllocated();
00525 status_t SendAddNewSessionMessage(const ThreadWorkerSessionRef & sessionRef, const SocketRef & socket, const char * hostName, const ip_address & hostIP, uint16 port, bool expandLocalhost, uint64 autoReconnectDelay);
00526 status_t SetNewPolicyAux(uint32 what, const PolicyRef & pref, const char * optDistPath);
00527
00528 ReflectServerRef _server;
00529 String _defaultDistributionPath;
00530 };
00531
00532 END_NAMESPACE(muscle);
00533
00534 #endif
00535