Changeset 20997

Show
Ignore:
Timestamp:
01/12/08 09:22:32 (8 months ago)
Author:
patrick
Message:

Extended vpr::SocketStream? to support "corked" or no-push writes. On
platforms that define the socket option TCP_CORK or TCP_NOPUSH, this is done
at the OS kernel level. On all others, it is done in userland code through
the new class vpr::NoPushWriter?. That class uses an internal buffer that
grows to accommodate write operations while the socket is corked. The manner
by which said buffer grows is determined using a simple callable that
implements the Strategy Pattern. I have written two very basic strategies,
and I hope to add a third based on average allocation sizes once I wrap my
head around the specific behavior that is required.

A vpr::SocketStream? object can be corked and uncorked by calling
vpr::SocketStream::setNoPush() with the parameter true or false respectively.

Rolled the version to 2.1.9.

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • juggler/trunk/modules/vapor/ChangeLog

    r20887 r20997  
    11DATE       AUTHOR   CHANGE 
    22---------- -------- ----------------------------------------------------------- 
     32008-01-12 patrick  Extended vpr::SocketStream to support "corked" or no-push 
     4                    writing. 
     5                    NEW VERSION: 2.1.9 
    362007-10-30 patrick  Improved const correctness in the statistics collection 
    47                    classes vpr::BandwidthIOStatsStrategy and 
  • juggler/trunk/modules/vapor/VERSION

    r20887 r20997  
     12.1.9-0 @01/12/2008 15:20:00 UTC@ 
    122.1.8-0 @10/30/2007 20:15:00 UTC@ 
    232.1.7-0 @09/16/2007 01:55:00 UTC@ 
  • juggler/trunk/modules/vapor/configure.ac

    r20974 r20997  
    476476                        [ac_cv_have_sin_len='yes'], 
    477477                        [ac_cv_have_sin_len='no'])]) 
     478 
     479    AC_CACHE_CHECK([for TCP_CORK], [ac_cv_has_tcp_cork], 
     480        [AC_TRY_COMPILE([ 
     481#include <sys/types.h> 
     482#include <netinet/tcp.h> 
     483], 
     484                        [int cork = TCP_CORK; ], 
     485                        [ac_cv_has_tcp_cork='yes'], 
     486                        [ac_cv_has_tcp_cork='no'])]) 
     487 
     488    AH_TEMPLATE([HAVE_CORKABLE_TCP], 
     489                [Define if TCP_CORK or TCP_NOPUSH is defined in netint/tcp.h]) 
     490 
     491    # If TCP_CORK is not defined, check for TCP_NOPUSH. 
     492    if test "x$ac_cv_has_tcp_cork" = "xno" ; then 
     493       AC_CACHE_CHECK([for TCP_NOPUSH], [ac_cv_has_tcp_nopush], 
     494           [AC_TRY_COMPILE([ 
     495#include <sys/types.h> 
     496#include <netinet/tcp.h> 
     497], 
     498                           [int np = TCP_NOPUSH; ], 
     499                           [ac_cv_has_tcp_nopush='yes'], 
     500                           [ac_cv_has_tcp_nopush='no'])]) 
     501 
     502       # TCP_NOPUSH is defined. 
     503       if test "x$ac_cv_has_tcp_nopush" = "xyes" ; then 
     504           AC_DEFINE(HAVE_CORKABLE_TCP) 
     505       fi 
     506    # TCP_CORK is defined. 
     507    else 
     508        AC_DEFINE(HAVE_CORKABLE_TCP) 
     509    fi 
     510 
    478511 
    479512    AH_TEMPLATE([HAVE_SIN_LEN], [Define if struct sockaddr_in has sin_len]) 
  • juggler/trunk/modules/vapor/test/TestSuite/TestCases/IO/Socket/Makefile.in

    r20974 r20997  
    4141top_srcdir=     @top_srcdir@ 
    4242 
    43 SRCS=           InetAddrTest.cpp 
     43SRCS=           InetAddrTest.cpp                        \ 
     44                NoPushWriterTest.cpp                    \ 
     45                SimpleAllocationStrategiesTest.cpp 
    4446 
    4547include $(MKPATH)/dpp.obj.mk 
  • juggler/trunk/modules/vapor/vpr/IO/Socket/Makefile.in

    r20974 r20997  
    5454                ConnectionRefusedException.cpp  \ 
    5555                ConnectionResetException.cpp    \ 
     56                NoPushWriter.cpp                \ 
    5657                NoRouteToHostException.cpp      \ 
    5758                SocketAcceptor.cpp              \ 
  • juggler/trunk/modules/vapor/vpr/IO/Socket/SocketOptions.h

    r20974 r20997  
    7272      McastLoopback,    /**< Multicast loopback option */ 
    7373      NoDelay,          /**< TCP "no delay" option (Nagle Algorithm) */ 
     74      NoPush,           /**< TCP "corking" option. @since 2.1.9 */ 
    7475      MaxSegment,       /**< TCP maximum segment size option */ 
    7576            /** Implementation specific */ 
     
    109110      bool          keep_alive; 
    110111      bool          no_delay; 
     112      bool          no_push;            /**< @since 2.1.9 */ 
    111113      bool          broadcast; 
    112114      size_t        max_segment; 
  • juggler/trunk/modules/vapor/vpr/IO/Socket/SocketStreamOpt.cpp

    r20974 r20997  
    4343 
    4444SocketStreamOpt::SocketStreamOpt() 
     45   : mCorked(false) 
    4546{ 
    4647   /* Do nothing. */ ; 
     
    5253} 
    5354 
     55bool SocketStreamOpt::getNoPush() const 
     56{ 
     57#if defined(HAVE_CORKABLE_TCP) 
     58   vpr::SocketOptions::Data option; 
     59 
     60   getOption(vpr::SocketOptions::NoPush, option); 
     61   return option.no_push; 
     62#else 
     63   return mCorked; 
     64#endif 
    5465} 
     66 
     67void SocketStreamOpt::setNoPush(const bool enableVal) 
     68{ 
     69#if defined(HAVE_CORKABLE_TCP) 
     70   vpr::SocketOptions::Data option; 
     71   option.no_push = enableVal; 
     72   setOption(vpr::SocketOptions::NoPush, option); 
     73#else 
     74   if ( enableVal != mCorked ) 
     75   { 
     76      // Changing from the uncorked state to the corked state. 
     77      if ( enableVal ) 
     78      { 
     79         cork(); 
     80      } 
     81      // Changing from the corked state to the uncorked state. 
     82      else 
     83      { 
     84         uncork(); 
     85      } 
     86   } 
     87#endif 
     88 
     89   mCorked = enableVal; 
     90} 
     91 
     92} 
  • juggler/trunk/modules/vapor/vpr/IO/Socket/SocketStreamOpt.h

    r20974 r20997  
    8787   } 
    8888 
     89   /** @name Nagle Algorithm Control */ 
     90   //@{ 
    8991   /** 
    9092    * Gets the current no-delay status for this socket.  If no-delay is true, 
    91     * then the Nagel algorithm has been disabled. 
     93    * then the Nagle algorithm has been disabled. 
    9294    * 
    9395    * @return \c true is returned if the Nabel algorithm is \em not being used 
    94     *         to delay the transmission of TCP segments. Otherwise, the Nagel 
     96    *         to delay the transmission of TCP segments. Otherwise, the Nagle 
    9597    *         algorithm is delaying the transmission. 
    9698    */ 
     
    105107   /** 
    106108    * Sets the current no-delay status for this socket.  If no-delay is true, 
    107     * then the Nagel algorithm will be disabled. 
     109    * then the Nagle algorithm will be disabled. 
    108110    * 
    109111    * @param enableVal The Boolean enable/disable state for no-delay on this 
     
    116118      setOption(vpr::SocketOptions::NoDelay, option); 
    117119   } 
     120   //@} 
     121 
     122   /** @name TCP Corking Interface */ 
     123   //@{ 
     124   /** 
     125    * Gets the state of the "no push" or "corking" for this socket. While a 
     126    * TCP socket is in the corked state, write operations are queued for 
     127    * later transmission when the socket is uncorked (that is, when the 
     128    * no-push state is disabled). 
     129    * 
     130    * @return true is returned if this socket is currently in the no-push 
     131    *         (corked) state. false is returned otherwise. 
     132    * 
     133    * @since 2.1.9 
     134    */ 
     135   bool getNoPush() const; 
     136 
     137   /** 
     138    * Enables or disables the no-push (or "corked") state. 
     139    * 
     140    * @post If this socket was previously in the corked state and the value of 
     141    *       \p enableVal is false, then all queued buffer writes will be 
     142    *       performed. 
     143    * 
     144    * @param enableVal A boolean value indicating whether this socket should 
     145    *                  be in the no-push (corked) state. 
     146    * 
     147    * @since 2.1.9 
     148    */ 
     149   void setNoPush(const bool enableVal); 
     150   //@} 
     151 
     152protected: 
     153   /** @name Socket Corking Interface */ 
     154   //@{ 
     155   /** 
     156    * Template method used to inform the platform-specific implementation when 
     157    * this socket is put into the no-push (corked) state. This method is only 
     158    * used for platforms that lack TCP corking as a socket option. 
     159    * 
     160    * @pre setNoPush() was invoked with the enable state value set to \c true. 
     161    * 
     162    * @see setNoPush() 
     163    * 
     164    * @since 2.1.9 
     165    */ 
     166   virtual void cork() = 0; 
     167 
     168   /** 
     169    * Template method used to inform the platform-specific implementation when 
     170    * this socket is changed from the no-push (corked) state to the regular, 
     171    * "uncorked" state. This method is only used for platforms that lack TCP 
     172    * corking as a socket option. 
     173    * 
     174    * @pre setNoPush() was invoked with the enable state value set to \c false. 
     175    * 
     176    * @see setNoPush() 
     177    * 
     178    * @since 2.1.9 
     179    */ 
     180   virtual void uncork() = 0; 
     181   //@} 
     182 
     183private: 
     184   bool mCorked; 
    118185}; 
    119186 
  • juggler/trunk/modules/vapor/vpr/IO/Socket/SocketStream_t.h

    r20994 r20997  
    245245   } 
    246246 
     247   /** 
     248    * @name TCP Corking Interface Implementation 
     249    * 
     250    * More about TCP corking can be found here: http://www.baus.net/on-tcp_cork 
     251    */ 
     252   //@{ 
     253   /** 
     254    * Changes the strategy used for determining hwo much memory to allocate 
     255    * for the corking buffer. 
     256    * 
     257    * @param strategy A callable object used to determine how much memory to 
     258    *                 allocate when the corking buffer needs to grow. 
     259    * 
     260    * @note This only has bearing on platforms (such as Windows) that do not 
     261    *       implement the no-push ("corking") option on TCP sockets. 
     262    * 
     263    * @since 2.1.9 
     264    */ 
     265   void setCorkAllocStrategy(const NoPushAllocStrategy& s) 
     266   { 
     267      mSocketStreamImpl->setCorkAllocStrategy(s); 
     268   } 
     269 
     270   /** 
     271    * Calls through to the cork() method of the internal stream socket 
     272    * implementation. 
     273    * 
     274    * @since 2.1.9 
     275    * 
     276    * @see vpr::SocketStreamImplBSD::cork() 
     277    * @see vpr::SocketStreamImplNSPR::cork() 
     278    */ 
     279   virtual void cork() 
     280   { 
     281      mSocketStreamImpl->cork(); 
     282   } 
     283 
     284   /** 
     285    * Calls through to the uncork() method of the internal stream socket 
     286    * implementation. 
     287    * 
     288    * @since 2.1.9 
     289    * 
     290    * @see vpr::SocketStreamImplBSD::uncork() 
     291    * @see vpr::SocketStreamImplNSPR::uncork() 
     292    */ 
     293   virtual void uncork() 
     294   { 
     295      mSocketStreamImpl->uncork(); 
     296   } 
     297   //@} 
     298 
    247299// Put in back door for simulator 
    248300#if VPR_IO_DOMAIN_INCLUDE == VPR_DOMAIN_SIMULATOR 
  • juggler/trunk/modules/vapor/vpr/md/NSPR/IO/Socket/SocketImplNSPR.cpp

    r20974 r20997  
    688688         opt_data.option = PR_SockOpt_MaxSegment; 
    689689         break; 
     690      case vpr::SocketOptions::NoPush: 
     691         vprASSERT(false && "Should not have gotten here with NSPR sockets"); 
     692         break; 
    690693      default: 
    691694         throw SocketException("Unsupported option passed to getOption().", 
     
    769772               data.max_segment = opt_data.value.max_segment; 
    770773               break; 
     774            case vpr::SocketOptions::NoPush: 
     775               vprASSERT(false && 
     776                         "Should not have gotten here with NSPR sockets"); 
     777               break; 
    771778            case vpr::SocketOptions::AddMember: 
    772779            case vpr::SocketOptions::DropMember: 
     
    885892         opt_data.option            = PR_SockOpt_MaxSegment; 
    886893         opt_data.value.max_segment = data.max_segment; 
     894         break; 
     895      case vpr::SocketOptions::NoPush: 
     896         vprASSERT(false && "Should not have gotten here with NSPR sockets"); 
    887897         break; 
    888898      default: 
  • juggler/trunk/modules/vapor/vpr/md/NSPR/IO/Socket/SocketImplNSPR.h

    r20989 r20997  
    399399    *           Thrown if the socket write operation failed. 
    400400    */ 
    401    vpr::Uint32 write_i(const void* buffer, const vpr::Uint32 length, 
    402                        const vpr::Interval& timeout); 
     401   virtual vpr::Uint32 write_i(const void* buffer, const vpr::Uint32 length, 
     402                               const vpr::Interval& timeout); 
    403403 
    404404   /** 
     
    444444    *       this socket. 
    445445    */ 
    446    ~SocketImplNSPR(); 
     446   virtual ~SocketImplNSPR(); 
    447447 
    448448protected: 
  • juggler/trunk/modules/vapor/vpr/md/NSPR/IO/Socket/SocketStreamImplNSPR.cpp

    r20974 r20997  
    4343#include <vpr/IO/WouldBlockException.h> 
    4444#include <vpr/IO/TimeoutException.h> 
     45#include <vpr/IO/Socket/SimpleAllocationStrategy.h> 
    4546#include <vpr/Util/Error.h> 
     47#include <vpr/Util/Debug.h> 
    4648#include <vpr/md/NSPR/NSPRHelpers.h> 
    4749#include <vpr/md/NSPR/IO/Socket/SocketStreamImplNSPR.h> 
     
    6062SocketStreamImplNSPR::SocketStreamImplNSPR() 
    6163   : SocketImplNSPR(vpr::SocketTypes::STREAM) 
     64   , mCorked(false) 
     65   , mCorkedWriter(doublingAllocationStrategy) 
    6266{ 
    6367   /* Do nothing. */ ; 
     
    7074                                           const vpr::InetAddr& remoteAddr) 
    7175   : SocketImplNSPR(localAddr, remoteAddr, vpr::SocketTypes::STREAM) 
     76   , mCorked(false) 
     77   , mCorkedWriter(doublingAllocationStrategy) 
    7278{ 
    7379   ; 
     
    7783SocketStreamImplNSPR::SocketStreamImplNSPR(const SocketStreamImplNSPR& sock) 
    7884   : SocketImplNSPR(sock) 
     85   , mCorked(sock.mCorked) 
     86   , mCorkedWriter(sock.mCorkedWriter) 
    7987{ 
    8088   /* Just call base class */ ; 
     89} 
     90 
     91SocketStreamImplNSPR::~SocketStreamImplNSPR() 
     92{ 
     93   /* Do nothing. */ ; 
    8194} 
    8295 
     
    182195} 
    183196 
     197vpr::Uint32 SocketStreamImplNSPR::write_i(const void* buffer, 
     198                                          const vpr::Uint32 length, 
     199                                          const vpr::Interval& timeout) 
     200{ 
     201   vpr::Uint32 written(0); 
     202 
     203   if ( mCorked ) 
     204   { 
     205      written = mCorkedWriter.write(buffer, length); 
     206   } 
     207   else 
     208   { 
     209      written = SocketImplNSPR::write_i(buffer, length, timeout); 
     210   } 
     211 
     212   return written; 
     213} 
     214 
     215void SocketStreamImplNSPR::cork() 
     216{ 
     217   mCorked = true; 
     218} 
     219 
     220void SocketStreamImplNSPR::uncork() 
     221{ 
     222   if ( mCorked ) 
     223   { 
     224      mCorked = false; 
     225 
     226      if ( mCorkedWriter.getBufferUse() > 0 ) 
     227      { 
     228         SocketImplNSPR::write_i(mCorkedWriter.getBuffer(), 
     229                                 mCorkedWriter.getBufferUse(), 
     230                                 vpr::Interval::NoTimeout); 
     231      } 
     232 
     233      mCorkedWriter.flush(); 
     234   } 
     235} 
     236 
    184237} // End of vpr namespace 
  • juggler/trunk/modules/vapor/vpr/md/NSPR/IO/Socket/SocketStreamImplNSPR.h

    r20974 r20997  
    4141#include <prio.h> 
    4242 
     43#include <vpr/IO/Socket/NoPushWriter.h> 
    4344#include <vpr/md/NSPR/IO/Socket/SocketImplNSPR.h> 
    44 #include <vpr/Util/Debug.h> 
     45 
    4546 
    4647namespace vpr 
     
    5253 * in conjunction with vpr::SocketConfiguration to create the typedef 
    5354 * vpr::SocketStream. 
     55 * 
     56 * @see vpr::NoPushWriter 
    5457 */ 
    5558class VPR_CLASS_API SocketStreamImplNSPR : public SocketImplNSPR 
     
    8790    */ 
    8891   SocketStreamImplNSPR(const SocketStreamImplNSPR& sock); 
     92 
     93   virtual ~SocketStreamImplNSPR(); 
    8994 
    9095   /** 
     
    130135   void accept(SocketStreamImplNSPR& sock, 
    131136               const vpr::Interval timeout = vpr::Interval::NoTimeout); 
     137 
     138   /** 
     139    * Implementation of the write() template method. This writes the buffer 
     140    * to the socket. If currently in the no-push (corked) state, then the 
     141    * write is delayed until the state is changed back to the regular 
     142    * uncorked state. 
     143    * 
     144    * @pre The socket is open for writing. 
     145    * @post If not corked, the given buffer is written to the I/O socket. If 
     146    *       corked, the buffer will be queued for later transmission. The 
     147    *       number of bytes written successfully is returned to the caller. 
     148    * 
     149    * @param buffer  A pointer to the buffer to be written. 
     150    * @param length  The length of the buffer. 
     151    * @param timeout The maximum amount of time to wait for data to be 
     152    *                available for writing. 
     153    * 
     154    * @return The number of bytes written to the socket is returned. 
     155    * 
     156    * @throw vpr::ConnectionResetException 
     157    *           Thrown if connection is reset. 
     158    * @throw vpr::NoRouteToHostException 
     159    *           Thrown if a route to host does not exist. 
     160    * @throw vpr::UnknownHostException 
     161    *           Thrown if host does not exist. 
     162    * @throw vpr::IOException 
     163    *           Thrown if the network is down. 
     164    * @throw vpr::WouldBlockException 
     165    *           Thrown if the handle is in non-blocking mode, and the write 
     166    *           operation could not be completed. 
     167    * @throw vpr::TimeoutException 
     168    *           Thrown if the write could not begin within the timeout 
     169    *           interval. 
     170    * @throw vpr::SocketException 
     171    *           Thrown if the write operation failed. 
     172    * @throw vpr::IOException 
     173    *           Thrown if the file handle write operation failed. 
     174    * @throw vpr::Exception 
     175    *           Thrown if corking is enabled and, in the event that resizing 
     176    *           the corking buffer is necessary, the new buffer allocation 
     177    *           size returned by the allocation strategy is insufficiently 
     178    *           large. 
     179    * @throw std::bad_alloc 
     180    * 
     181    * @see cork() 
     182    * @see vpr::NoPushWriter::write() 
     183    * 
     184    * @since 2.1.9 
     185    */ 
     186   vpr::Uint32 write_i(const void* buffer, const vpr::Uint32 length, 
     187                       const vpr::Interval& timeout); 
     188 
     189   /** 
     190    * @name TCP Corking Interface 
     191    * 
     192    * @note These methods are not meant to be called by user-level code and 
     193    *       may at some point be changed to have private visiblity. Since this 
     194    *       class itself is not supposed to be used directly be user-level 
     195    *       code, this is not seen as a high priority change, however. 
     196    */ 
     197   //@{ 
     198   /** 
     199    * Corks this socket for platforms that lack support for TCP corking as a 
     200    * socket option. Until this socket is uncorked, all write/send operations 
     201    * will queue up the data instead of putting it on the wire. On all other 
     202    * platforms, this method does nothing. 
     203    * 
     204    * @post \c mCorked is set to true. 
     205    * 
     206    * @since 2.1.9 
     207    */ 
     208   void cork(); 
     209 
     210   /** 
     211    * Uncorks this socket and writes all pending data to the wire. 
     212    * 
     213    * @post \c mCorked is set to false and \c mCorkedWriter is flushed. 
     214    * 
     215    * @since 2.1.9 
     216    */ 
     217   void uncork(); 
     218 
     219   /** 
     220    * Changes the strategy used for determining hwo much memory to allocate 
     221    * for the corking buffer. 
     222    * 
     223    * @param strategy A callable object used to determine how much memory to 
     224    *                 allocate when the corking buffer needs to grow. 
     225    * 
     226    * @since 2.1.9 
     227    */ 
     228   void setCorkAllocStrategy(const NoPushAllocStrategy& strategy) 
     229   { 
     230      mCorkedWriter.setAllocStrategy(strategy); 
     231   } 
     232   //@} 
     233 
     234private: 
     235   bool mCorked;        /**< "Corked" state for the internal TCP socket */ 
     236 
     237   NoPushWriter mCorkedWriter; 
    132238}; 
    133239 
  • juggler/trunk/modules/vapor/vpr/md/POSIX/IO/Socket/SocketImplBSD.cpp

    r20974 r20997  
    4848#include <errno.h> 
    4949#include <boost/concept_check.hpp> 
     50#include <boost/static_assert.hpp> 
    5051 
    5152#include <vpr/IO/Socket/ConnectionResetException.h> 
     
    6364namespace 
    6465{ 
     66 
     67#if defined(HAVE_CORKABLE_TCP) 
     68#  if defined(TCP_CORK) 
     69const int VPR_TCP_CORK(TCP_CORK); 
     70#  elif defined(TCP_NOPUSH) 
     71const int VPR_TCP_CORK(TCP_NOPUSH); 
     72#  else 
     73// If HAVE_CORKABLE_TCP is defined, either TCP_CORK or TCP_NOPUSH must be 
     74// defined. 
     75BOOST_STATIC_ASSERT(false); 
     76#  endif 
     77#endif 
    6578 
    6679// Given an error number (or errno) build up an exception with the 
     
    666679         opt_size  = sizeof(opt_data.size); 
    667680         break; 
     681      case vpr::SocketOptions::NoPush: 
     682#if defined(HAVE_CORKABLE_TCP) 
     683         opt_level = IPPROTO_TCP; 
     684         opt_name  = VPR_TCP_CORK; 
     685         opt_size  = sizeof(int); 
     686#else 
     687         // Maybe this should be a compile-time assertion. 
     688         vprASSERT(false && "Should not have gotten here without TCP corking"); 
     689#endif 
     690         break; 
    668691 
    669692      // BSD specific 
     
    736759         case vpr::SocketOptions::NoDelay: 
    737760            data.no_delay = (opt_data.enabled != 0 ? true : false); 
     761            break; 
     762         case vpr::SocketOptions::NoPush: 
     763            data.no_push = (opt_data.enabled != 0 ? true : false); 
    738764            break; 
    739765         case vpr::SocketOptions::Broadcast: 
     
    905931         opt_size      = sizeof(size_t); 
    906932         break; 
     933      case vpr::SocketOptions::NoPush: 
     934#if defined(HAVE_CORKABLE_TCP) 
     935         opt_level        = IPPROTO_TCP; 
     936         opt_name         = VPR_TCP_CORK; 
     937         opt_data.enabled = (data.no_push ? 1 : 0); 
     938         opt_size         = sizeof(int); 
     939#else 
     940         // Maybe this should be a compile-time assertion. 
     941         vprASSERT(false && "Should not have gotten here without TCP corking"); 
     942#endif 
     943         break; 
    907944 
    908945      // Unsetable 
  • juggler/trunk/modules/vapor/vpr/md/POSIX/IO/Socket/SocketImplBSD.h

    r20989 r20997  
    379379    *           Thrown if the file handle write operation failed. 
    380380    */ 
    381    vpr::Uint32 write_i(const void* buffer, const vpr::Uint32 length, 
    382                        const vpr::Interval& timeout); 
     381   virtual vpr::Uint32 write_i(const void* buffer, const vpr::Uint32 length, 
     382                               const vpr::Interval& timeout); 
    383383 
    384384   vpr::Uint32 availableBytes() const 
     
    421421    * @post The memory for mHandle is deleted. 
    422422    */ 
    423    ~SocketImplBSD(); 
     423   virtual ~SocketImplBSD(); 
    424424 
    425425protected: 
  • juggler/trunk/modules/vapor/vpr/md/POSIX/IO/Socket/SocketStreamImplBSD.cpp

    r20974 r20997  
    4444 
    4545#include <vpr/Util/Debug.h> 
    46 #include <vpr/md/POSIX/IO/Socket/SocketStreamImplBSD.h> 
    4746#include <vpr/IO/TimeoutException.h> 
    4847#include <vpr/IO/WouldBlockException.h> 
     48#include <vpr/IO/Socket/SimpleAllocationStrategies.h> 
     49#include <vpr/md/POSIX/IO/Socket/SocketStreamImplBSD.h> 
    4950 
    5051 
     
    5859SocketStreamImplBSD::SocketStreamImplBSD() 
    5960   : SocketImplBSD(vpr::SocketTypes::STREAM) 
     61   , mCorked(false) 
     62   , mCorkedWriter(doublingAllocationStrategy) 
    6063{ 
    6164   /* Do nothing. */ ; 
     
    6568                                         const InetAddr& remoteAddr) 
    6669   : SocketImplBSD(localAddr, remoteAddr, SocketTypes::STREAM) 
     70   , mCorked(false) 
     71   , mCorkedWriter(doublingAllocationStrategy) 
    6772{ 
    6873   /* Do nothing. */ ; 
     
    7176SocketStreamImplBSD::SocketStreamImplBSD(const SocketStreamImplBSD& sock) 
    7277   : SocketImplBSD(sock.mLocalAddr, sock.mRemoteAddr, SocketTypes::STREAM) 
     78   , mCorked(sock.mCorked) 
     79   , mCorkedWriter(sock.mCorkedWriter) 
    7380{ 
    7481   // mHandle is created in the base class constructor. Since we are creating 
     
    8289   mHandle         = new FileHandleImplUNIX(sock.mHandle->getName()); 
    8390   mHandle->mFdesc = sock.mHandle->mFdesc; 
     91} 
     92 
     93SocketStreamImplBSD::~SocketStreamImplBSD() 
     94{ 
     95   /* Do nothing. */ ; 
    8496} 
    8597 
     
    162174} 
    163175 
     176vpr::Uint32 SocketStreamImplBSD::write_i(const void* buffer, 
     177                                         const vpr::Uint32 length, 
     178                                         const vpr::Interval& timeout) 
     179{ 
     180#if ! defined(HAVE_CORKABLE_TCP) 
     181   vpr::Uint32 written(0); 
     182 
     183   if ( mCorked ) 
     184   { 
     185      written = mCorkedWriter.write(buffer, length); 
     186   } 
     187   else 
     188   { 
     189      written = SocketImplBSD::write_i(buffer, length, timeout); 
     190   } 
     191 
     192   return written; 
     193#else 
     194   return SocketImplBSD::write_i(buffer, length, timeout); 
     195#endif 
     196} 
     197 
     198void SocketStreamImplBSD::cork() 
     199{ 
     200#if ! defined(HAVE_CORKABLE_TCP) 
     201   mCorked = true; 
     202#endif 
     203} 
     204 
     205void SocketStreamImplBSD::uncork() 
     206{ 
     207#if ! defined(HAVE_CORKABLE_TCP) 
     208   if ( mCorked ) 
     209   { 
     210      mCorked = false; 
     211 
     212      if ( mCorkedWriter.getBufferUse() > 0 ) 
     213      { 
     214         SocketImplBSD::write_i(mCorkedWriter.getBuffer(), 
     215                                mCorkedWriter.getBufferUse(), 
     216                                vpr::Interval::NoTimeout); 
     217      } 
     218 
     219      mCorkedWriter.flush(); 
     220   } 
     221#endif 
     222} 
     223 
    164224} // End of vpr namespace 
  • juggler/trunk/modules/vapor/vpr/md/POSIX/IO/Socket/SocketStreamImplBSD.h

    r20974 r20997  
    4141#include <string> 
    4242 
     43#include <vpr/IO/Socket/NoPushWriter.h> 
    4344#include <vpr/md/POSIX/IO/Socket/SocketImplBSD.h> 
    4445 
     
    5253 * used in conjunction with vpr::SocketConfiguration to create the typedef 
    5354 * vpr::SocketStream. 
     55 * 
     56 * @see vpr::NoPushWriter 
    5457 */ 
    5558class VPR_CLASS_API SocketStreamImplBSD : public SocketImplBSD 
     
    9295   SocketStreamImplBSD(const SocketStreamImplBSD& sock); 
    9396 
     97   virtual ~SocketStreamImplBSD(); 
     98 
    9499   /** 
    95100    * Puts this socket into the listening state where it listens for 
     
    132137   void accept(SocketStreamImplBSD& sock, 
    133138               vpr::Interval timeout = vpr::Interval::NoTimeout); 
     139 
     140   /** 
     141    * Implementation of the write() template method. This writes the buffer 
     142    * to the socket. If currently in the no-push (corked) state, then the 
     143    * write is delayed until the state is changed back to the regular 
     144    * uncorked state. 
     145    * 
     146    * @pre The socket is open for writing. 
     147    * @post If not corked, the given buffer is written to the I/O socket. If 
     148    *       corked, the buffer will be queued for later transmission. The 
     149    *       number of bytes written successfully is returned to the caller. 
     150    * 
     151    * @param buffer  A pointer to the buffer to be written. 
     152    * @param length  The length of the buffer. 
     153    * @param timeout The maximum amount of time to wait for data to be 
     154    *                available for writing. 
     155    * 
     156    * @return The number of bytes written to the socket is returned. 
     157    * 
     158    * @throw vpr::ConnectionResetException 
     159    *           Thrown if connection is reset. 
     160    * @throw vpr::NoRouteToHostException 
     161    *           Thrown if a route to host does not exist. 
     162    * @throw vpr::UnknownHostException 
     163    *           Thrown if host does not exist. 
     164    * @throw vpr::IOException 
     165    *           Thrown if the network is down. 
     166    * @throw vpr::WouldBlockException 
     167    *           Thrown if the handle is in non-blocking mode, and the write 
     168    *           operation could not be completed. 
     169    * @throw vpr::TimeoutException 
     170    *           Thrown if the write could not begin within the timeout 
     171    *           interval. 
     172    * @throw vpr::SocketException 
     173    *           Thrown if the write operation failed. 
     174    * @throw vpr::IOException 
     175    *           Thrown if the file handle write operation failed. 
     176    * @throw vpr::Exception 
     177    *           Thrown if corking is enabled and, in the event that resizing 
     178    *           the corking buffer is necessary, the new buffer allocation 
     179    *           size returned by the allocation strategy is insufficiently 
     180    *           large. 
     181    * @throw std::bad_alloc 
     182    * 
     183    * @see cork() 
     184    * @see vpr::NoPushWriter::write() 
     185    * 
     186    * @since 2.1.9 
     187    */ 
     188   vpr::Uint32 write_i(const void* buffer, const vpr::Uint32 length, 
     189                       const vpr::Interval& timeout); 
     190 
     191   /** 
     192    * @name TCP Corking Interface 
     193    * 
     194    * @note These methods are not meant to be called by user-level code and 
     195    *       may at some point be changed to have private visiblity. Since this 
     196    *       class itself is not supposed to be used directly be user-level 
     197    *       code, this is not seen as a high priority change, however. 
     198    */ 
     199   //@{ 
     200   /** 
     201    * Corks this socket for platforms that lack support for TCP corking as a 
     202    * socket option. Until this socket is uncorked, all write/send operations 
     203    * will queue up the data instead of putting it on the wire. On all other 
     204    * platforms, this method does nothing. 
     205    * 
     206    * @post For platforms that lack support for TCP corking as a socket 
     207    *       option, \c mCorked is set to true. 
     208    * 
     209    * @since 2.1.9 
     210    */ 
     211   void cork(); 
     212 
     213