00001
00002
00003 #ifndef MuscleAbstractMessageIOGateway_h
00004 #define MuscleAbstractMessageIOGateway_h
00005
00006 #include "dataio/DataIO.h"
00007 #include "message/Message.h"
00008 #include "util/Queue.h"
00009 #include "util/RefCount.h"
00010 #include "util/NetworkUtilityFunctions.h"
00011 #include "util/PulseNode.h"
00012
00013 BEGIN_NAMESPACE(muscle);
00014
// Forward declaration; the full class definition appears later in this header.
// AbstractGatewayMessageReceiver (below) names this class as a friend.
00015 class AbstractMessageIOGateway;
00016
00018 class AbstractGatewayMessageReceiver
00019 {
00020 public:
00022 AbstractGatewayMessageReceiver() : _inBatch(false), _doInputCount(0) {}
00023
00025 virtual ~AbstractGatewayMessageReceiver() {}
00026
00034 void CallMessageReceivedFromGateway(const MessageRef & msg, void * userData = NULL)
00035 {
00036 if ((_doInputCount > 0)&&(_inBatch == false))
00037 {
00038 _inBatch = true;
00039 BeginMessageReceivedFromGatewayBatch();
00040 }
00041 MessageReceivedFromGateway(msg, userData);
00042 AfterMessageReceivedFromGateway(msg, userData);
00043 }
00044
00045 protected:
00050 virtual void MessageReceivedFromGateway(const MessageRef & msg, void * userData) = 0;
00051
00058 virtual void AfterMessageReceivedFromGateway(const MessageRef & msg, void * userData) {(void) msg; (void) userData;}
00059
00063 virtual void BeginMessageReceivedFromGatewayBatch() {}
00064
00068 virtual void EndMessageReceivedFromGatewayBatch() {}
00069
00070 private:
00071 friend class AbstractMessageIOGateway;
00072
00073 void DoInputBegins() {_doInputCount++;}
00074 void DoInputEnds()
00075 {
00076 if ((--_doInputCount == 0)&&(_inBatch))
00077 {
00078 EndMessageReceivedFromGatewayBatch();
00079 _inBatch = false;
00080 }
00081 }
00082
00083 bool _inBatch;
00084 uint32 _doInputCount;
00085 };
00086
00092 class QueueGatewayMessageReceiver : public AbstractGatewayMessageReceiver, public Queue<MessageRef>
00093 {
00094 public:
00096 QueueGatewayMessageReceiver() {}
00097
00098 protected:
00099 virtual void MessageReceivedFromGateway(const MessageRef & msg, void * userData) {(void) userData; (void) AddTail(msg);}
00100 };
00101
00106 class AbstractMessageIOGateway : public RefCountable, public PulseNode
00107 {
00108 public:
00110 AbstractMessageIOGateway();
00111
00113 virtual ~AbstractMessageIOGateway();
00114
00120 status_t AddOutgoingMessage(const MessageRef & messageRef) {return _hosed ? B_ERROR : _outgoingMessages.AddTail(messageRef);}
00121
00131 int32 DoOutput(uint32 maxBytes = MUSCLE_NO_LIMIT) {return DoOutputImplementation(maxBytes);}
00132
00146 int32 DoInput(AbstractGatewayMessageReceiver & receiver, uint32 maxBytes = MUSCLE_NO_LIMIT)
00147 {
00148 receiver.DoInputBegins();
00149 int32 ret = DoInputImplementation(receiver, maxBytes);
00150 receiver.DoInputEnds();
00151 return ret;
00152 }
00153
00160 virtual bool IsReadyForInput() const;
00161
00168 virtual bool HasBytesToOutput() const = 0;
00169
00176 virtual uint64 GetOutputStallLimit() const;
00177
00181 virtual void Shutdown();
00182
00189 virtual void Reset();
00190
00198 void SetFlushOnEmpty(bool flush);
00199
00201 bool GetFlushOnEmpty() const {return _flushOnEmpty;}
00202
00204 Queue<MessageRef> & GetOutgoingMessageQueue() {return _outgoingMessages;}
00205
00207 const Queue<MessageRef> & GetOutgoingMessageQueue() const {return _outgoingMessages;}
00208
00212 void SetDataIO(const DataIORef & ref) {_ioRef = ref;}
00213
00215 const DataIORef & GetDataIO() const {return _ioRef;}
00216
00218 bool IsHosed() const {return _hosed;}
00219
00245 virtual status_t ExecuteSynchronousMessaging(AbstractGatewayMessageReceiver * optReceiver, uint64 timeoutPeriod = MUSCLE_TIME_NEVER);
00246
00247 protected:
00256 virtual int32 DoOutputImplementation(uint32 maxBytes = MUSCLE_NO_LIMIT) = 0;
00257
00270 virtual int32 DoInputImplementation(AbstractGatewayMessageReceiver & receiver, uint32 maxBytes = MUSCLE_NO_LIMIT) = 0;
00271
00284 status_t EnsureBufferSize(uint8 ** bufPtr, uint32 * bufSize, uint32 desiredSize, uint32 copySize);
00285
00294 void FreeLargeBuffer(uint8 ** bufPtr, uint32 * bufSize);
00295
00297 void SetHosed() {_hosed = true;}
00298
00299 protected:
00301 virtual bool IsStillAwaitingSynchronousMessagingReply() const {return HasBytesToOutput();}
00302
00304 virtual void SynchronousMessageReceivedFromGateway(const MessageRef & msg, void * userData, AbstractGatewayMessageReceiver & r) {r.MessageReceivedFromGateway(msg, userData);}
00305
00307 virtual void SynchronousAfterMessageReceivedFromGateway(const MessageRef & msg, void * userData, AbstractGatewayMessageReceiver & r) {r.AfterMessageReceivedFromGateway(msg, userData);}
00308
00310 virtual void SynchronousBeginMessageReceivedFromGatewayBatch(AbstractGatewayMessageReceiver & r) {r.BeginMessageReceivedFromGatewayBatch();}
00311
00313 virtual void SynchronousEndMessageReceivedFromGatewayBatch(AbstractGatewayMessageReceiver & r) {r.EndMessageReceivedFromGatewayBatch();}
00314
00315 private:
00316 friend class ScratchProxyReceiver;
00317 Queue<MessageRef> _outgoingMessages;
00318
00319 DataIORef _ioRef;
00320
00321 bool _hosed;
00322 bool _flushOnEmpty;
00323
00324 friend class ReflectServer;
00325 };
00326
// Convenience name for a Ref<> handle to an AbstractMessageIOGateway
// (AbstractMessageIOGateway derives from RefCountable).
00327 typedef Ref<AbstractMessageIOGateway> AbstractMessageIOGatewayRef;
00328
00329 END_NAMESPACE(muscle);
00330
00331 #endif