AbstractMessageIOGateway.h

00001 /* This file is Copyright 2000-2008 Meyer Sound Laboratories Inc.  See the included LICENSE.txt file for details. */
00002 
00003 #ifndef MuscleAbstractMessageIOGateway_h
00004 #define MuscleAbstractMessageIOGateway_h
00005 
00006 #include "dataio/DataIO.h"
00007 #include "message/Message.h"
00008 #include "util/Queue.h"
00009 #include "util/RefCount.h"
00010 #include "util/NetworkUtilityFunctions.h"
00011 #include "util/PulseNode.h"
00012 
00013 BEGIN_NAMESPACE(muscle);
00014 
00015 class AbstractMessageIOGateway;
00016 
00018 class AbstractGatewayMessageReceiver 
00019 {
00020 public:
00022    AbstractGatewayMessageReceiver() : _inBatch(false), _doInputCount(0) {/* empty */}
00023 
00025    virtual ~AbstractGatewayMessageReceiver() {/* empty */}
00026 
00034    void CallMessageReceivedFromGateway(const MessageRef & msg, void * userData = NULL)
00035    {
00036       if ((_doInputCount > 0)&&(_inBatch == false))
00037       {
00038          _inBatch = true;
00039          BeginMessageReceivedFromGatewayBatch();
00040       }
00041       MessageReceivedFromGateway(msg, userData);
00042       AfterMessageReceivedFromGateway(msg, userData);
00043    }
00044 
00045 protected:
00050    virtual void MessageReceivedFromGateway(const MessageRef & msg, void * userData) = 0;
00051 
00058    virtual void AfterMessageReceivedFromGateway(const MessageRef & msg, void * userData) {(void) msg; (void) userData;}
00059 
00063    virtual void BeginMessageReceivedFromGatewayBatch() {/* empty */}
00064 
00068    virtual void EndMessageReceivedFromGatewayBatch() {/* empty */}
00069 
00070 private:
00071    friend class AbstractMessageIOGateway;
00072 
00073    void DoInputBegins() {_doInputCount++;}
00074    void DoInputEnds()
00075    {
00076       if ((--_doInputCount == 0)&&(_inBatch))
00077       {
00078          EndMessageReceivedFromGatewayBatch();
00079          _inBatch = false;
00080       }
00081    }
00082 
00083    bool _inBatch;
00084    uint32 _doInputCount;
00085 };
00086 
00092 class QueueGatewayMessageReceiver : public AbstractGatewayMessageReceiver, public Queue<MessageRef>
00093 {
00094 public:
00096    QueueGatewayMessageReceiver() {/* empty */}
00097 
00098 protected:
00099    virtual void MessageReceivedFromGateway(const MessageRef & msg, void * userData) {(void) userData; (void) AddTail(msg);}
00100 };
00101 
00106 class AbstractMessageIOGateway : public RefCountable, public PulseNode
00107 {
00108 public:
00110    AbstractMessageIOGateway();
00111 
00113    virtual ~AbstractMessageIOGateway();
00114 
00120    status_t AddOutgoingMessage(const MessageRef & messageRef) {return _hosed ? B_ERROR : _outgoingMessages.AddTail(messageRef);}
00121 
00131    int32 DoOutput(uint32 maxBytes = MUSCLE_NO_LIMIT) {return DoOutputImplementation(maxBytes);}
00132 
00146    int32 DoInput(AbstractGatewayMessageReceiver & receiver, uint32 maxBytes = MUSCLE_NO_LIMIT) 
00147    {
00148       receiver.DoInputBegins();
00149       int32 ret = DoInputImplementation(receiver, maxBytes); 
00150       receiver.DoInputEnds();
00151       return ret;
00152    }
00153 
00160    virtual bool IsReadyForInput() const;
00161 
00168    virtual bool HasBytesToOutput() const = 0;
00169 
00176    virtual uint64 GetOutputStallLimit() const;
00177 
00181    virtual void Shutdown();
00182 
00189    virtual void Reset();
00190 
00198    void SetFlushOnEmpty(bool flush);
00199 
00201    bool GetFlushOnEmpty() const {return _flushOnEmpty;}
00202 
00204    Queue<MessageRef> & GetOutgoingMessageQueue() {return _outgoingMessages;}
00205 
00207    const Queue<MessageRef> & GetOutgoingMessageQueue() const {return _outgoingMessages;}
00208 
00212    void SetDataIO(const DataIORef & ref) {_ioRef = ref;}
00213 
00215    const DataIORef & GetDataIO() const {return _ioRef;}
00216 
00218    bool IsHosed() const {return _hosed;}
00219 
00245    virtual status_t ExecuteSynchronousMessaging(AbstractGatewayMessageReceiver * optReceiver, uint64 timeoutPeriod = MUSCLE_TIME_NEVER);
00246 
00247 protected:
00256    virtual int32 DoOutputImplementation(uint32 maxBytes = MUSCLE_NO_LIMIT) = 0;
00257 
00270    virtual int32 DoInputImplementation(AbstractGatewayMessageReceiver & receiver, uint32 maxBytes = MUSCLE_NO_LIMIT) = 0;
00271 
00284    status_t EnsureBufferSize(uint8 ** bufPtr, uint32 * bufSize, uint32 desiredSize, uint32 copySize);
00285 
00294    void FreeLargeBuffer(uint8 ** bufPtr, uint32 * bufSize);
00295 
00297    void SetHosed() {_hosed = true;}
00298   
00299 protected:
00301    virtual bool IsStillAwaitingSynchronousMessagingReply() const {return HasBytesToOutput();}
00302 
00304    virtual void SynchronousMessageReceivedFromGateway(const MessageRef & msg, void * userData, AbstractGatewayMessageReceiver & r) {r.MessageReceivedFromGateway(msg, userData);}
00305 
00307    virtual void SynchronousAfterMessageReceivedFromGateway(const MessageRef & msg, void * userData, AbstractGatewayMessageReceiver & r) {r.AfterMessageReceivedFromGateway(msg, userData);}
00308 
00310    virtual void SynchronousBeginMessageReceivedFromGatewayBatch(AbstractGatewayMessageReceiver & r) {r.BeginMessageReceivedFromGatewayBatch();}
00311 
00313    virtual void SynchronousEndMessageReceivedFromGatewayBatch(AbstractGatewayMessageReceiver & r) {r.EndMessageReceivedFromGatewayBatch();}
00314 
00315 private:
00316    friend class ScratchProxyReceiver;
00317    Queue<MessageRef> _outgoingMessages;
00318 
00319    DataIORef _ioRef;
00320 
00321    bool _hosed;  // set true on error
00322    bool _flushOnEmpty;
00323 
00324    friend class ReflectServer;
00325 };
00326 
00327 typedef Ref<AbstractMessageIOGateway> AbstractMessageIOGatewayRef;
00328 
00329 END_NAMESPACE(muscle);
00330 
00331 #endif

Generated on Thu Jun 5 17:47:52 2008 for MUSCLE by  doxygen 1.5.1