00001
00002
00003 #ifndef MuscleAbstractReflectSession_h
00004 #define MuscleAbstractReflectSession_h
00005
00006 #include "iogateway/AbstractMessageIOGateway.h"
00007 #include "reflector/AbstractSessionIOPolicy.h"
00008 #include "reflector/ServerComponent.h"
00009 #include "util/Queue.h"
00010 #include "util/RefCount.h"
00011
00012 BEGIN_NAMESPACE(muscle);
00013
/**
 * Abstract factory interface for sessions: a ServerComponent subclass whose
 * CreateSession() is pure virtual, so concrete factories must implement it.
 * Only the inline members below have their behaviour visible here; the
 * constructor and the Broadcast*() helpers are defined out-of-line.
 *
 * NOTE(review): the leading five-digit numbers on each line look like residue
 * from a generated source listing rather than C++ -- confirm, and strip them
 * before trying to compile this file.
 */
00018 class ReflectSessionFactory : public ServerComponent
00019 {
00020 public:
/** Constructor (defined out-of-line; presumably assigns _id -- confirm in the .cpp). */
00022 ReflectSessionFactory();
00023
/** Virtual destructor with an empty body, so deleting through a base pointer is safe. */
00025 virtual ~ReflectSessionFactory() {}
00026
/**
 * Pure virtual: subclasses return the new session as an AbstractReflectSessionRef.
 * @param clientAddress presumably a text form of the connecting client's address -- confirm.
 * @param factoryInfo   presumably the address/port this factory is associated with -- confirm.
 * @return reference to the created session (whether a NULL ref signals refusal is not
 *         visible here -- confirm against the caller).
 */
00032 virtual AbstractReflectSessionRef CreateSession(const String & clientAddress, const IPAddressAndPort & factoryInfo) = 0;
00033
/** Default implementation always returns true; subclasses may override to report otherwise. */
00042 virtual bool IsReadyToAcceptSessions() const {return true;}
00043
/** Returns the value stored in _id. */
00048 uint32 GetFactoryID() const {return _id;}
00049
00050 protected:
/**
 * Defined out-of-line; by its name, presumably delivers msgRef to every session -- confirm.
 * @param msgRef   the message reference to pass along.
 * @param userData opaque pointer, defaults to NULL; its consumer is not visible here.
 */
00058 void BroadcastToAllSessions(const MessageRef & msgRef, void * userData = NULL);
00059
/**
 * Defined out-of-line; by its name, presumably delivers msgRef to every factory -- confirm.
 * @param msgRef      the message reference to pass along.
 * @param userData    opaque pointer, defaults to NULL; its consumer is not visible here.
 * @param includeSelf defaults to true; presumably controls whether this factory also
 *                    receives the message -- confirm.
 */
00068 void BroadcastToAllFactories(const MessageRef & msgRef, void * userData = NULL, bool includeSelf = true);
00069
00070 private:
/** Value reported by GetFactoryID(); where it is assigned is not visible in this header. */
00071 uint32 _id;
00072 };
00073
/**
 * A ReflectSessionFactory that holds a reference to another ("slave") factory.
 * It does not declare CreateSession() in this listing, so it remains abstract
 * and concrete subclasses must still implement that method.
 */
00079 class ProxySessionFactory : public ReflectSessionFactory
00080 {
00081 public:
/**
 * Stores slaveRef in _slaveRef.
 * NOTE(review): this single-argument constructor is not `explicit`, so a
 * ReflectSessionFactoryRef converts implicitly -- confirm that is intended.
 */
00082 ProxySessionFactory(const ReflectSessionFactoryRef & slaveRef) : _slaveRef(slaveRef) {}
/** Virtual destructor with an empty body. */
00083 virtual ~ProxySessionFactory() {}
00084
/**
 * Defined out-of-line. Presumably overrides the ServerComponent attach/detach hooks
 * and forwards them to the slave factory -- confirm in the .cpp.
 */
00085 virtual status_t AttachedToServer();
00086 virtual void AboutToDetachFromServer();
00087
/** Returns a const reference to the slave factory reference given to the constructor. */
00089 const ReflectSessionFactoryRef & GetSlave() const {return _slaveRef;}
00090
00091 private:
/** The wrapped factory reference; set once in the constructor's initializer list. */
00092 ReflectSessionFactoryRef _slaveRef;
00093 };
00094
/**
 * Abstract session class. Derives from both ServerComponent and
 * AbstractGatewayMessageReceiver, and is abstract at least because
 * GetTypeName() is pure virtual. Only the inline accessors below have their
 * behaviour visible in this header; every other member is defined out-of-line,
 * so notes on those are hedged and should be checked against the .cpp.
 */
00099 class AbstractReflectSession : public ServerComponent, public AbstractGatewayMessageReceiver
00100 {
00101 public:
/** Default constructor (defined out-of-line). */
00103 AbstractReflectSession();
00104
/** Virtual destructor (defined out-of-line). */
00106 virtual ~AbstractReflectSession();
00107
/** Defined out-of-line; presumably returns _hostName -- confirm. */
00111 const String & GetHostName() const;
00112
/** Defined out-of-line; presumably the port half of _ipAddressAndPort -- confirm. */
00117 uint16 GetPort() const;
00118
/** Defined out-of-line; presumably the address half of _ipAddressAndPort -- confirm. */
00123 const ip_address & GetLocalInterfaceAddress() const;
00124
/** Returns the numeric session ID stored in _sessionID. */
00126 uint32 GetSessionID() const {return _sessionID;}
00127
/** Returns _idString (presumably a string rendering of _sessionID -- confirm). */
00132 const String & GetSessionIDString() const {return _idString;}
00133
/** Defined out-of-line; by its name, requests that this session be ended -- confirm details. */
00135 void EndSession();
00136
/** Defined out-of-line; the meaning of the bool result is not visible here. */
00142 bool DisconnectSession();
00143
/**
 * Defined out-of-line; by its name, asks for this session to be swapped for newSession.
 * @return a status_t result code (success/failure semantics defined elsewhere).
 */
00151 status_t ReplaceSession(const AbstractReflectSessionRef & newSession);
00152
/**
 * Overridable hook; presumably invoked when the client connection drops, with the
 * return value telling the caller what to do with the session -- confirm.
 */
00163 virtual bool ClientConnectionClosed();
00164
/** Overridable hook; presumably invoked when an asynchronous connect finishes -- confirm. */
00173 virtual void AsyncConnectCompleted();
00174
/** Defined out-of-line; installs newPolicy as the input policy (see GetInputPolicy()). */
00180 void SetInputPolicy(const PolicyRef & newPolicy);
00181
/** Returns a copy of the input policy reference (_inputPolicyRef). */
00185 PolicyRef GetInputPolicy() const {return _inputPolicyRef;}
00186
/** Defined out-of-line; installs newPolicy as the output policy (see GetOutputPolicy()). */
00192 void SetOutputPolicy(const PolicyRef & newPolicy);
00193
/** Returns a copy of the output policy reference (_outputPolicyRef). */
00197 PolicyRef GetOutputPolicy() const {return _outputPolicyRef;}
00198
/**
 * Stores ref as this session's gateway and refreshes the cached output stall limit:
 * when _gateway() yields a non-null object its GetOutputStallLimit() is used,
 * otherwise the cache is set to MUSCLE_TIME_NEVER.
 */
00204 void SetGateway(const AbstractMessageIOGatewayRef & ref) {_gateway = ref; _outputStallLimit = _gateway()?_gateway()->GetOutputStallLimit():MUSCLE_TIME_NEVER;}
00205
/** Returns a const reference to the gateway reference last stored by SetGateway(). */
00211 const AbstractMessageIOGatewayRef & GetGateway() const {return _gateway;}
00212
/** Overridable; by its name, reports whether output is pending -- confirm in the .cpp. */
00217 virtual bool HasBytesToOutput() const;
00218
/** Overridable; by its name, reports whether the session can accept input now -- confirm. */
00226 virtual bool IsReadyForInput() const;
00227
/**
 * Overridable input step.
 * @param receiver presumably the object that is handed any messages that were read -- confirm.
 * @param maxBytes presumably an upper bound on bytes to read in this call -- confirm.
 * @return int32; presumably a byte count with a negative value on error -- confirm.
 */
00234 virtual int32 DoInput(AbstractGatewayMessageReceiver & receiver, uint32 maxBytes);
00235
/**
 * Overridable output step.
 * @param maxBytes presumably an upper bound on bytes to write in this call -- confirm.
 * @return int32; presumably a byte count with a negative value on error -- confirm.
 */
00241 virtual int32 DoOutput(uint32 maxBytes);
00242
/** Overridable factory hook returning a SocketRef; when it is called is not visible here. */
00250 virtual SocketRef CreateDefaultSocket();
00251
/** Overridable factory hook that returns a DataIORef for the given socket reference. */
00261 virtual DataIORef CreateDataIO(const SocketRef & socket);
00262
/** Overridable factory hook returning an AbstractMessageIOGatewayRef (cf. SetGateway()). */
00272 virtual AbstractMessageIOGatewayRef CreateGateway();
00273
/**
 * Pulse scheduling hooks. Presumably overrides of an inherited pulse interface
 * (SetAutoReconnectDelay() calls InvalidatePulseTime() unqualified, so that
 * name is inherited or otherwise in scope) -- confirm against the base classes.
 */
00275 virtual uint64 GetPulseTime(uint64 now, uint64 sched);
00276
00278 virtual void Pulse(uint64 now, uint64 sched);
00279
/** Pure virtual: every concrete session class must supply a type-name C string. */
00281 virtual const char * GetTypeName() const = 0;
00282
/** Overridable; returns a String by value. How it relates to GetHostName() is not visible here. */
00288 virtual String GetDefaultHostName() const;
00289
/** Defined out-of-line; returns a descriptive String for this session (format not visible here). */
00293 String GetSessionDescriptionString() const;
00294
/** Returns the IP address component of _asyncConnectDest. */
00299 const ip_address & GetAsyncConnectIP() const {return _asyncConnectDest.GetIPAddress();}
00300
/** Returns the port component of _asyncConnectDest. */
00305 uint16 GetAsyncConnectPort() const {return _asyncConnectDest.GetPort();}
00306
/** Returns _sessionRootPath by const reference; virtual, so subclasses may substitute another path. */
00308 virtual const String & GetSessionRootPath() const {return _sessionRootPath;}
00309
/**
 * Stores delay in _autoReconnectDelay and then calls InvalidatePulseTime()
 * (presumably so the pulse schedule is recomputed -- confirm).
 * The time unit of delay is not visible in this header.
 */
00316 void SetAutoReconnectDelay(uint64 delay) {_autoReconnectDelay = delay; InvalidatePulseTime();}
00317
/** Returns the value last stored by SetAutoReconnectDelay(). */
00322 uint64 GetAutoReconnectDelay() const {return _autoReconnectDelay;}
00323
/** Returns the _connectingAsync flag. */
00325 bool IsConnectingAsync() const {return _connectingAsync;}
00326
/** Returns the _isConnected flag. */
00328 bool IsConnected() const {return _isConnected;}
00329
00330 protected:
/** Overridable; by its name, queues ref for sending -- confirm. Returns a status_t result code. */
00337 virtual status_t AddOutgoingMessage(const MessageRef & ref);
00338
/**
 * Defined out-of-line; by its name, presumably delivers msgRef to every session -- confirm.
 * @param userData    opaque pointer, defaults to NULL; its consumer is not visible here.
 * @param includeSelf defaults to true; presumably controls whether this session also
 *                    receives the message -- confirm.
 */
00348 void BroadcastToAllSessions(const MessageRef & msgRef, void * userData = NULL, bool includeSelf = true);
00349
/**
 * Defined out-of-line; by its name, presumably delivers msgRef to every factory -- confirm.
 * Unlike BroadcastToAllSessions() above, this one takes no includeSelf parameter.
 */
00358 void BroadcastToAllFactories(const MessageRef & msgRef, void * userData = NULL);
00359
/** Defined out-of-line; by its name, attempts to re-establish the connection. Returns a status_t result code. */
00373 status_t Reconnect();
00374
/** Defined out-of-line; returns a const reference to a SocketRef (which socket is not visible here). */
00378 const SocketRef & GetSessionSelectSocket() const;
00379
/** Replaces _sessionRootPath with p (the value GetSessionRootPath() returns by default). */
00381 void SetSessionRootPath(const String & p) {_sessionRootPath = p;}
00382
00383 private:
/**
 * Private helpers defined out-of-line. SetPolicyAux presumably backs both
 * SetInputPolicy() and SetOutputPolicy(), selected by isInput -- confirm.
 */
00384 void SetPolicyAux(PolicyRef & setRef, uint32 & setChunk, const PolicyRef & newRef, bool isInput);
00385 void PlanForReconnect();
00386
/** Grants ReflectServer access to this class's private members. */
00387 friend class ReflectServer;
/** Returned by GetSessionID() and GetSessionIDString() respectively. */
00388 uint32 _sessionID;
00389 String _idString;
00390 IPAddressAndPort _ipAddressAndPort;
/** Flags returned by IsConnectingAsync() and IsConnected(). */
00391 bool _connectingAsync;
00392 bool _isConnected;
00393 String _hostName;
/** Source of the values returned by GetAsyncConnectIP() and GetAsyncConnectPort(). */
00394 IPAddressAndPort _asyncConnectDest;
/** Set by SetGateway(), read by GetGateway(). */
00395 AbstractMessageIOGatewayRef _gateway;
00396 uint64 _lastByteOutputAt;
00397 PolicyRef _inputPolicyRef;
00398 PolicyRef _outputPolicyRef;
00399 uint32 _maxInputChunk;
00400 uint32 _maxOutputChunk;
/** Cached in SetGateway(): the gateway's GetOutputStallLimit(), or MUSCLE_TIME_NEVER without a gateway. */
00401 uint64 _outputStallLimit;
00402 bool _scratchReconnected;
00403 String _sessionRootPath;
00404
00405
/** Auto-reconnect state; _autoReconnectDelay is the value managed by Set/GetAutoReconnectDelay(). */
00406 uint64 _autoReconnectDelay;
00407 uint64 _reconnectTime;
00408 bool _wasConnected;
00409 };
00410
00411
// Compiled only when MUSCLE_USING_OLD_MICROSOFT_COMPILER is defined: applies the
// DECLARE_HASHTABLE_KEY_CLASS macro (defined elsewhere) to Ref<AbstractReflectSession>.
// NOTE(review): presumably a workaround that lets this Ref type serve as a
// hashtable key under that compiler -- confirm against the macro's definition.
00412 #ifdef MUSCLE_USING_OLD_MICROSOFT_COMPILER
00413 DECLARE_HASHTABLE_KEY_CLASS(Ref<AbstractReflectSession>);
00414 #endif
00415
00416 END_NAMESPACE(muscle);
00417
00418 #endif