Changeset 19808

Show
Ignore:
Timestamp:
02/13/07 15:39:13 (2 years ago)
Author:
patrick
Message:

Replaced the implementation of vpr::CondVarWin?32 that was modeled after
condition_impl from Boost.Threads with much simpler, more understandable
implementation. This new implementation comes from an article written by
Douglas C. Schmidt and Irfan Pyarali showing how to create an interface
for condition variables on Windows that looks like pthreads condition
variables.

Bumped version to 1.1.40.

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • juggler/trunk/modules/vapor/ChangeLog

    r19785 r19808  
    11DATE       AUTHOR   CHANGE 
    22---------- -------- ----------------------------------------------------------- 
     32007-02-13 patrick  Reimplemented vpr::CondVarWin32 using a simpler approach. 
     4                    NEW VERSION: 1.1.40 
    352007-01-30 patrick  Eliminated the need to set VPR_BASE_DIR on Windows to run 
    46                    applications. 
  • juggler/trunk/modules/vapor/VERSION

    r19785 r19808  
     11.1.40-0 @02/13/2007 21:40:00 UTC@ 
    121.1.39-0 @01/30/2007 22:30:00 UTC@ 
    231.1.38-0 @12/13/2006 16:50:00 UTC@ 
  • juggler/trunk/modules/vapor/vpr/md/WIN32/Sync/CondVarWin32.cpp

    r19729 r19808  
    3333 * 
    3434 *************** <auto-copyright.pl END do not edit this line> ***************/ 
     35 
     36// This must be defined to 0x0400 or higher in order to have the function 
     37// SignalObjectAndWait() declared. 
     38#define _WIN32_WINNT 0x0400 
    3539 
    3640#include <vpr/vprConfig.h> 
     
    5256 
    5357CondVarWin32::CondVarWin32(MutexWin32* mutex) 
    54    : mGate(NULL) 
    55    , mQueue(NULL) 
    56    , mMutex(NULL) 
    57    , mGone(0) 
    58    , mBlocked(0) 
    59    , mWaiting(0) 
     58   : mWaitersCount(0) 
     59   , mSema(NULL) 
     60   , mWaitersDone(NULL) 
     61   , mWasBroadcast(false) 
    6062   , mCondMutex(NULL) 
    6163{ 
    62    // If the caller did not specify a mutex variable to use with 
    63    // the condition variable, use mDefaultMutex. 
    64    if ( mutex == NULL
     64   // If the caller did not specify a mutex variable to use with the 
     65   // condition variable, use mDefaultMutex. 
     66   if ( NULL == mutex
    6567   { 
    6668      mutex = &mDefaultMutex; 
    6769   } 
    6870 
    69    mGate  = CreateSemaphore(NULL, 1, 1, NULL); 
    70    mQueue = CreateSemaphore(NULL, 0, (std::numeric_limits<long>::max)(), 
    71                             NULL); 
    72    mMutex = CreateMutex(0, 0, 0); 
    73  
    74    if ( ! mGate || ! mQueue || ! mMutex ) 
     71   // Create an unnamed semaphore with no security initially set to 0 with a 
     72   // maximum count of 0x7fffffff. 
     73   mSema = CreateSemaphore(NULL, 0, 0x7fffffff, NULL); 
     74 
     75   if ( NULL == mSema ) 
    7576   { 
    7677      const std::string err_msg(vpr::Error::getCurrentErrorMsg()); 
    77       int result(0); 
    78  
    79       if ( mGate ) 
    80       { 
    81          result = CloseHandle(mGate); 
    82          assert(result); 
    83       } 
    84  
    85       if ( mQueue ) 
    86       { 
    87          result = CloseHandle(mQueue); 
    88          assert(result); 
    89       } 
    90  
    91       if ( mMutex ) 
    92       { 
    93          result = CloseHandle(mMutex); 
    94          assert(result); 
    95       } 
    9678 
    9779      std::ostringstream msg_stream; 
     
    10082   } 
    10183 
     84   InitializeCriticalSection(&mWaitersCountLock); 
     85 
     86   // Create an unnamed event with no security that resets automatically and 
     87   // is initially non-signaled. 
     88   mWaitersDone = CreateEvent(NULL, FALSE, FALSE, NULL); 
     89 
     90   if ( NULL == mWaitersDone ) 
     91   { 
     92      const std::string err_msg(vpr::Error::getCurrentErrorMsg()); 
     93 
     94      int result = CloseHandle(mSema); 
     95      assert(result); 
     96 
     97      std::ostringstream msg_stream; 
     98      msg_stream << "Condition variable allocation failed: " << err_msg; 
     99      throw vpr::ResourceException(msg_stream.str(), VPR_LOCATION); 
     100   } 
     101 
    102102   mCondMutex = mutex; 
    103103} 
     
    106106{ 
    107107   int result(0); 
    108    result = CloseHandle(mGate); 
     108   result = CloseHandle(mSema); 
    109109   assert(result); 
    110    result = CloseHandle(mQueue); 
     110   result = CloseHandle(mWaitersDone); 
    111111   assert(result); 
    112    result = CloseHandle(mMutex); 
    113    assert(result); 
    114  
    115    mGate  = NULL; 
    116    mQueue = NULL; 
    117    mMutex = NULL; 
    118 
    119  
     112 
     113   mSema        = NULL; 
     114   mWaitersDone = NULL; 
     115
     116 
     117// From the article: 
     118// 
     119//    "This implementation [...] ensures that the external mutex is held until 
     120//    all threads waiting on [this condition variable] have a chance to wait 
     121//    again on the external mutex before returning to their callers. This 
     122//    solution relies on the fact that Windows NT mutex requests are queued in 
     123//    FIFO order, rather than in, e.g., priority order. Because the external 
     124//    mutex queue is serviced in FIFO order, all waiting threads will acquire 
     125//    the external mutex before any of them can reacquire it a second time. 
     126//    This property is essential to ensure fairness." 
    120127bool CondVarWin32::wait(const vpr::Interval& timeToWait) 
    121128{ 
    122    bool status(true); 
    123  
    124129   // If not locked ... 
    125130   if ( ! mCondMutex->test() ) 
     
    130135      throw vpr::Exception(msg_stream.str(), VPR_LOCATION); 
    131136   } 
    132    // The mutex variable must be locked when passed to pthread_cond_wait(). 
    133    else 
    134    { 
    135       int result(0); 
    136       result = WaitForSingleObject(mGate, INFINITE); 
     137 
     138   bool status(true); 
     139 
     140   EnterCriticalSection(&mWaitersCountLock); 
     141   ++mWaitersCount; 
     142   LeaveCriticalSection(&mWaitersCountLock); 
     143 
     144   // Wait indefinitely on the condition variable by default. 
     145   DWORD timeout(INFINITE); 
     146 
     147   // If the caller specified timeout value other than NoTimeout, then this 
     148   // call will not block longer than that timeout. 
     149   if ( vpr::Interval::NoTimeout != timeToWait ) 
     150   { 
     151      timeout = timeToWait.msec(); 
     152   } 
     153 
     154   int result(0); 
     155 
     156   // SignalObjectAndWait unlocks the given mutex and waits on the semaphore 
     157   // until signal() or broadcast() is called by another thread. This direct 
     158   // manipulation of mCondMutex->mLocked is safe because this thread holds 
     159   // the mutex when we make the change. 
     160   mCondMutex->mLocked = false; 
     161   result = SignalObjectAndWait(mCondMutex->mMutex, mSema, timeout, FALSE); 
     162 
     163   // If we did not wait infinitely on SignalObjectAndWait(), then we need to 
     164   // determine if the call timed out or if we got the lock on mSema. 
     165   if ( INFINITE != timeout ) 
     166   { 
     167      assert(result != WAIT_FAILED && result != WAIT_ABANDONED); 
     168      status = (result == WAIT_OBJECT_0); 
     169   } 
     170 
     171   // At this point, if status is true, then we had better have the lock on 
     172   // mSema. 
     173   if ( status ) 
     174   { 
    137175      assert(result == WAIT_OBJECT_0); 
    138       ++mBlocked; 
    139       result = ReleaseSemaphore(mGate, 1, 0); 
    140       assert(result); 
    141  
    142       mCondMutex->release(); 
    143  
    144       // Wait indefinitely on the condition variable. 
    145       if ( vpr::Interval::NoTimeout == timeToWait ) 
     176 
     177      EnterCriticalSection(&mWaitersCountLock); 
     178 
     179      // We are not waiting now. 
     180      --mWaitersCount; 
     181 
     182      // Check to see if w3e are the last waiter after broadcast(). 
     183      const bool last_waiter = mWasBroadcast && mWaitersCount == 0; 
     184 
     185      LeaveCriticalSection(&mWaitersCountLock); 
     186 
     187      // If we are the last waiter thread during this broadcast, then let all 
     188      // the other threads proceed. 
     189      if ( last_waiter ) 
    146190      { 
    147          result = WaitForSingleObject(mQueue, INFINITE); 
    148          assert(result == WAIT_OBJECT_0); 
    149  
    150          unsigned int was_waiting(0); 
    151          unsigned int was_gone(0); 
    152  
    153          result = WaitForSingleObject(mMutex, INFINITE); 
    154          assert(result == WAIT_OBJECT_0); 
    155          was_waiting = mWaiting; 
    156          was_gone    = mGone; 
    157  
    158          if ( was_waiting != 0 ) 
    159          { 
    160             if ( --mWaiting == 0 ) 
    161             { 
    162                if ( mBlocked != 0 ) 
    163                { 
    164                   // Open mGate 
    165                   result = ReleaseSemaphore(mGate, 1, 0); 
    166                   assert(result); 
    167                   was_waiting = 0; 
    168                } 
    169                else if ( mGone != 0 ) 
    170                { 
    171                   mGone = 0; 
    172                } 
    173             } 
    174          } 
    175          else if ( ++mGone == (std::numeric_limits<unsigned int>::max)() / 2 ) 
    176          { 
    177             result = WaitForSingleObject(mGate, INFINITE); 
    178             assert(result == WAIT_OBJECT_0); 
    179             mBlocked -= mGone; 
    180             result = ReleaseSemaphore(mGate, 1, 0); 
    181             assert(result); 
    182             mGone = 0; 
    183          } 
    184  
    185          result = ReleaseMutex(mMutex); 
    186          assert(result); 
    187  
    188          if ( was_waiting == 1 ) 
    189          { 
    190             for ( ; was_gone; --was_gone ) 
    191             { 
    192                result = WaitForSingleObject(mQueue, INFINITE); 
    193                assert(result == WAIT_OBJECT_0); 
    194             } 
    195  
    196             result = ReleaseSemaphore(mGate, 1, 0); 
    197             assert(result); 
    198          } 
    199       } 
    200       // Wait for no longer than the given timeout period to acquire the lock 
    201       // on the condition variable. 
    202       else 
    203       { 
    204          for ( ; ; ) 
    205          { 
    206             const DWORD ms(timeToWait.msec()); 
    207  
    208             result = WaitForSingleObject(mQueue, ms); 
    209             assert(result != WAIT_FAILED && result != WAIT_ABANDONED); 
    210             status = (result == WAIT_OBJECT_0); 
    211  
    212             if ( result == WAIT_TIMEOUT ) 
    213             { 
    214                vpr::Interval diff(vpr::Interval::now()); 
    215                diff = diff - timeToWait; 
    216  
    217                if ( diff.msec() > 0 ) 
    218                { 
    219                   continue; 
    220                } 
    221             } 
    222  
    223             break; 
    224          } 
    225  
    226          unsigned int was_waiting(0); 
    227          unsigned int was_gone(0); 
    228  
    229          result = WaitForSingleObject(mMutex, INFINITE); 
    230          assert(result == WAIT_OBJECT_0); 
    231          was_waiting = mWaiting; 
    232          was_gone    = mGone; 
    233  
    234          if ( was_waiting != 0 ) 
    235          { 
    236             // Timeout. 
    237             if ( ! status ) 
    238             { 
    239                if ( mBlocked != 0 ) 
    240                { 
    241                   --mBlocked; 
    242                } 
    243                else 
    244                { 
    245                   ++mGone; 
    246                } 
    247             } 
    248  
    249             if ( --mWaiting == 0 ) 
    250             { 
    251                if ( mBlocked != 0 ) 
    252                { 
    253                   // Open mGate 
    254                   result = ReleaseSemaphore(mGate, 1, 0); 
    255                   assert(result); 
    256                   was_waiting = 0; 
    257                } 
    258                else if ( mGone != 0 ) 
    259                { 
    260                   mGone = 0; 
    261                } 
    262             } 
    263          } 
    264          else if ( ++mGone == (std::numeric_limits<unsigned int>::max)() / 2 ) 
    265          { 
    266             result = WaitForSingleObject(mGate, INFINITE); 
    267             assert(result == WAIT_OBJECT_0); 
    268             mBlocked -= mGone; 
    269             result = ReleaseSemaphore(mGate, 1, 0); 
    270             assert(result); 
    271             mGone = 0; 
    272          } 
    273  
    274          result = ReleaseMutex(mMutex); 
    275          assert(result); 
    276  
    277          if ( was_waiting == 1 ) 
    278          { 
    279             for ( ; was_gone; --was_gone ) 
    280             { 
    281                result = WaitForSingleObject(mQueue, INFINITE); 
    282                assert(result == WAIT_OBJECT_0); 
    283             } 
    284  
    285             result = ReleaseSemaphore(mGate, 1, 0); 
    286             assert(result); 
    287          } 
    288       } 
    289  
    290       mCondMutex->acquire(); 
    291    } 
    292  
    293    return status; 
    294 
    295  
    296 void CondVarWin32::signal() 
    297 
    298    if ( ! mCondMutex->test() ) 
    299    { 
    300       throw vpr::LockException("Condition variable mutex must be locked", 
    301                                VPR_LOCATION); 
    302    } 
    303  
    304    unsigned int signals(0); 
    305  
    306    int result(0); 
    307    result = WaitForSingleObject(mMutex, INFINITE); 
    308    assert(result == WAIT_OBJECT_0); 
    309  
    310    if ( mWaiting != 0 ) 
    311    { 
    312       if ( mBlocked == 0 ) 
    313       { 
    314          result = ReleaseMutex(mMutex); 
    315          assert(result); 
    316          return; 
    317       } 
    318  
    319       ++mWaiting; 
    320       --mBlocked; 
    321       signals = 1; 
    322    } 
    323    else 
    324    { 
    325       result = WaitForSingleObject(mGate, INFINITE); 
    326       assert(result == WAIT_OBJECT_0); 
    327  
    328       if ( mBlocked > mGone ) 
    329       { 
    330          if ( mGone != 0 ) 
    331          { 
    332             mBlocked -= mGone; 
    333             mGone     = 0; 
    334          } 
    335  
    336          signals = mWaiting = 1; 
    337          --mBlocked; 
     191         // This atomically signals the mWaitersDone event and waits until it 
     192         // can acquire the given mutex. This is required to ensure fairness. 
     193         SignalObjectAndWait(mWaitersDone, mCondMutex->mMutex, INFINITE, 
     194                             FALSE); 
     195         mCondMutex->mLocked = true; 
    338196      } 
    339197      else 
    340198      { 
    341          result = ReleaseSemaphore(mGate, 1, 0); 
    342          assert(result); 
     199         // Always regain the external mutex sing that is the guarantee that 
     200         // we give to our callers. 
     201         mCondMutex->acquire(); 
    343202      } 
    344203   } 
    345  
    346    result = ReleaseMutex(mMutex); 
    347    assert(result); 
    348  
    349    if ( signals ) 
    350    { 
    351       result = ReleaseSemaphore(mQueue, signals, 0); 
    352       assert(result); 
    353    } 
    354 
    355  
     204   else 
     205   { 
     206      // Always regain the external mutex sing that is the guarantee that we 
     207      // give to our callers. 
     208      mCondMutex->acquire(); 
     209   } 
     210 
     211   assert(mCondMutex->test() && "External mutex is supposed to be locked"); 
     212 
     213   return status; 
     214
     215 
     216void CondVarWin32::signal() 
     217
     218   if ( ! mCondMutex->test() ) 
     219   { 
     220      throw vpr::LockException("Condition variable mutex must be locked", 
     221                               VPR_LOCATION); 
     222   } 
     223 
     224   EnterCriticalSection(&mWaitersCountLock); 
     225   const bool have_waiters = mWaitersCount > 0; 
     226   LeaveCriticalSection(&mWaitersCountLock); 
     227 
     228   if ( have_waiters ) 
     229   { 
     230      ReleaseSemaphore(mSema, 1, 0); 
     231   } 
     232
     233 
     234// From the article: 
     235// 
     236//    "[This] function is more complex and requires two steps: 
     237//     
     238//       1. It wakes up all the threads waiting on [mSema], which can be done 
     239//          atomically by passing [mWaitersCount] to ReleaseSemaphore(). 
     240//       2. It then blocks on the auto-reset [mWaitersDone] event until the 
     241//          last thread in the group of waiting threads exits the [wait()] 
     242//          critical section." 
    356243void CondVarWin32::broadcast() 
    357244{ 
     
    363250   } 
    364251 
    365    unsigned int signals(0); 
    366  
    367    int result(0); 
    368    result = WaitForSingleObject(mMutex, INFINITE); 
    369    assert(result == WAIT_OBJECT_0); 
    370  
    371    if ( mWaiting != 0 ) 
    372    { 
    373       if ( mBlocked == 0 ) 
    374       { 
    375          result = ReleaseMutex(mMutex); 
    376          assert(result); 
    377          return; 
    378       } 
    379  
    380       signals   = mBlocked; 
    381       mWaiting += signals; 
    382       mBlocked  = 0; 
     252   // Protect access to mWaitersCount and mWasBroadcast. 
     253   EnterCriticalSection(&mWaitersCountLock); 
     254   bool have_waiters(false); 
     255 
     256   if ( mWaitersCount > 0 ) 
     257   { 
     258      // We are broadcasting even if there was just one waiter. 
     259      mWasBroadcast = true; 
     260      have_waiters = true; 
     261   } 
     262 
     263   if ( have_waiters ) 
     264   { 
     265      // Wake up all the waiters atomically. 
     266      ReleaseSemaphore(mSema, mWaitersCount, 0); 
     267      LeaveCriticalSection(&mWaitersCountLock); 
     268 
     269      // Wait for all the awakened threads to acquire the semaphore. 
     270      WaitForSingleObject(mWaitersDone, INFINITE); 
     271 
     272      // This is safe because no other waiter threads can wake up to access 
     273      // it. 
     274      mWasBroadcast = false; 
    383275   } 
    384276   else 
    385277   { 
    386       result = WaitForSingleObject(mGate, INFINITE); 
    387       assert(result == WAIT_OBJECT_0); 
    388  
    389       if ( mBlocked > mGone ) 
    390       { 
    391          if ( mGone != 0 ) 
    392          { 
    393             mBlocked -= mGone; 
    394             mGone     = 0; 
    395          } 
    396  
    397          signals = mWaiting = mBlocked; 
    398          mBlocked = 0; 
    399       } 
    400       else 
    401       { 
    402          result = ReleaseSemaphore(mGate, 1, 0); 
    403          assert(result); 
    404       } 
    405    } 
    406  
    407    result = ReleaseMutex(mMutex); 
    408    assert(result); 
    409  
    410    if ( signals ) 
    411    { 
    412       result = ReleaseSemaphore(mQueue, signals, 0); 
    413       assert(result); 
     278      LeaveCriticalSection(&mWaitersCountLock); 
    414279   } 
    415280} 
  • juggler/trunk/modules/vapor/vpr/md/WIN32/Sync/CondVarWin32.h

    r19729 r19808  
    6363 * @since 1.1.36 
    6464 * 
    65  * @note This implementation is based on the condition_impl from Boost.Thread 
    66  *       for Windows. 
     65 * @note This implementation is based on the "SignalObjectAndWait Solution" 
     66 *       from the article "Strategies for Implementing POSIX Condition 
     67 *       Variables on Win32" by Douglas C. Scmidt and Irfan Pyarali. The 
     68 *       article text can be found at 
     69 *       http://www.cs.wustl.edu/~schmidt/win32-cv-1.html 
    6770 */ 
    6871class VPR_CLASS_API CondVarWin32 : boost::noncopyable 
     
    114117    *         acquiring the lock. 
    115118    * 
    116     * @throw vpr::IllegalArgumentException is thrown if \p timeToWait is not 
    117     *        vpr::Interval::NoTimeout and an invalid value is passed to 
    118     *        pthread_cond_timedwait(). 
    119119    * @throw vpr::Exception is thrown if something goes wrong while trying to 
    120120    *        wait on the condition variable. 
     
    233233   { 
    234234      std::cerr << "------------- vpr::CondVarWin32::Dump ---------\n" 
    235                 << "   mGone = " << mGone << "\n" 
    236                 << "mBlocked = " << mBlocked << "\n" 
    237                 << "mWaiting = " << mWaiting << std::endl; 
     235                << "mWaitersCount = " << mWaitersCount << std::endl; 
    238236   } 
    239237 
     
    241239   /** @name Condition Variable State */ 
    242240   //@{ 
    243    HANDLE mGate; 
    244    HANDLE mQueue; 
    245    HANDLE mMutex;       /**< Internal data state protection */ 
    246  
    247    /** Number of threads that timed out and never made it to mQueue. */ 
    248    unsigned int mGone; 
    249  
    250    /** Number of threads blocked on the condition. */ 
    251    unsigned long mBlocked; 
    252  
    253    /** 
    254     * Number of threads no longer waiting for the condition but still waiting 
    255     * to be removed from mQueue. 
    256     */ 
    257    unsigned int  mWaiting; 
     241   int mWaitersCount;                   /**< Number of waiting threads. */ 
     242   CRITICAL_SECTION mWaitersCountLock;  /**< Lock for \c mWaitersCount. */ 
     243 
     244   /** 
     245    * Semaphore used to queue up threads waiting for the condition to 
     246    * become signaled. 
     247    */ 
     248   HANDLE mSema; 
     249 
     250   /** 
     251    * An auto-reset event used by the broadcast/signal thread to wait for 
     252    * all the waiting thread(s) to wake up and be released from the 
     253    * semaphore. 
     254    */ 
     255   HANDLE mWaitersDone; 
     256 
     257   /** 
     258    * Keeps track of whether we were broadcasting or signaling. This allows 
     259    * us to optimze the code when just signaling. 
     260    */ 
     261   bool mWasBroadcast; 
    258262   //@} 
    259263 
  • juggler/trunk/modules/vapor/vpr/md/WIN32/Sync/MutexWin32.h

    r19729 r19808  
    242242    */ 
    243243   bool mLocked; 
     244 
     245   friend class CondVarWin32; 
    244246}; 
    245247