Changeset 19808
- Timestamp:
- 02/13/07 15:39:13 (2 years ago)
- Files:
-
- juggler/trunk/modules/vapor/ChangeLog (modified) (1 diff)
- juggler/trunk/modules/vapor/VERSION (modified) (1 diff)
- juggler/trunk/modules/vapor/vpr/md/WIN32/Sync/CondVarWin32.cpp (modified) (6 diffs)
- juggler/trunk/modules/vapor/vpr/md/WIN32/Sync/CondVarWin32.h (modified) (4 diffs)
- juggler/trunk/modules/vapor/vpr/md/WIN32/Sync/MutexWin32.h (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
juggler/trunk/modules/vapor/ChangeLog
r19785 r19808 1 1 DATE AUTHOR CHANGE 2 2 ---------- -------- ----------------------------------------------------------- 3 2007-02-13 patrick Reimplemented vpr::CondVarWin32 using a simpler approach. 4 NEW VERSION: 1.1.40 3 5 2007-01-30 patrick Eliminated the need to set VPR_BASE_DIR on Windows to run 4 6 applications. juggler/trunk/modules/vapor/VERSION
r19785 r19808 1 1.1.40-0 @02/13/2007 21:40:00 UTC@ 1 2 1.1.39-0 @01/30/2007 22:30:00 UTC@ 2 3 1.1.38-0 @12/13/2006 16:50:00 UTC@ juggler/trunk/modules/vapor/vpr/md/WIN32/Sync/CondVarWin32.cpp
r19729 r19808 33 33 * 34 34 *************** <auto-copyright.pl END do not edit this line> ***************/ 35 36 // This must be defined to 0x0400 or higher in order to have the function 37 // SignalObjectAndWait() declared. 38 #define _WIN32_WINNT 0x0400 35 39 36 40 #include <vpr/vprConfig.h> … … 52 56 53 57 CondVarWin32::CondVarWin32(MutexWin32* mutex) 54 : mGate(NULL) 55 , mQueue(NULL) 56 , mMutex(NULL) 57 , mGone(0) 58 , mBlocked(0) 59 , mWaiting(0) 58 : mWaitersCount(0) 59 , mSema(NULL) 60 , mWaitersDone(NULL) 61 , mWasBroadcast(false) 60 62 , mCondMutex(NULL) 61 63 { 62 // If the caller did not specify a mutex variable to use with 63 // thecondition variable, use mDefaultMutex.64 if ( mutex == NULL)64 // If the caller did not specify a mutex variable to use with the 65 // condition variable, use mDefaultMutex. 66 if ( NULL == mutex ) 65 67 { 66 68 mutex = &mDefaultMutex; 67 69 } 68 70 69 mGate = CreateSemaphore(NULL, 1, 1, NULL); 70 mQueue = CreateSemaphore(NULL, 0, (std::numeric_limits<long>::max)(), 71 NULL); 72 mMutex = CreateMutex(0, 0, 0); 73 74 if ( ! mGate || ! mQueue || ! mMutex ) 71 // Create an unnamed semaphore with no security initially set to 0 with a 72 // maximum count of 0x7fffffff. 73 mSema = CreateSemaphore(NULL, 0, 0x7fffffff, NULL); 74 75 if ( NULL == mSema ) 75 76 { 76 77 const std::string err_msg(vpr::Error::getCurrentErrorMsg()); 77 int result(0);78 79 if ( mGate )80 {81 result = CloseHandle(mGate);82 assert(result);83 }84 85 if ( mQueue )86 {87 result = CloseHandle(mQueue);88 assert(result);89 }90 91 if ( mMutex )92 {93 result = CloseHandle(mMutex);94 assert(result);95 }96 78 97 79 std::ostringstream msg_stream; … … 100 82 } 101 83 84 InitializeCriticalSection(&mWaitersCountLock); 85 86 // Create an unnamed event with no security that resets automatically and 87 // is initially non-signaled. 88 mWaitersDone = CreateEvent(NULL, FALSE, FALSE, NULL); 89 90 if ( NULL == mWaitersDone ) 91 { 92 const std::string err_msg(vpr::Error::getCurrentErrorMsg()); 93 94 int result = CloseHandle(mSema); 95 assert(result); 96 97 std::ostringstream msg_stream; 98 msg_stream << "Condition variable allocation failed: " << err_msg; 99 throw vpr::ResourceException(msg_stream.str(), VPR_LOCATION); 100 } 101 102 102 mCondMutex = mutex; 103 103 } … … 106 106 { 107 107 int result(0); 108 result = CloseHandle(m Gate);108 result = CloseHandle(mSema); 109 109 assert(result); 110 result = CloseHandle(m Queue);110 result = CloseHandle(mWaitersDone); 111 111 assert(result); 112 result = CloseHandle(mMutex); 113 assert(result); 114 115 mGate = NULL; 116 mQueue = NULL; 117 mMutex = NULL; 118 } 119 112 113 mSema = NULL; 114 mWaitersDone = NULL; 115 } 116 117 // From the article: 118 // 119 // "This implementation [...] ensures that the external mutex is held until 120 // all threads waiting on [this condition variable] have a chance to wait 121 // again on the external mutex before returning to their callers. This 122 // solution relies on the fact that Windows NT mutex requests are queued in 123 // FIFO order, rather than in, e.g., priority order. Because the external 124 // mutex queue is serviced in FIFO order, all waiting threads will acquire 125 // the external mutex before any of them can reacquire it a second time. 126 // This property is essential to ensure fairness." 120 127 bool CondVarWin32::wait(const vpr::Interval& timeToWait) 121 128 { 122 bool status(true);123 124 129 // If not locked ... 125 130 if ( ! mCondMutex->test() ) … … 130 135 throw vpr::Exception(msg_stream.str(), VPR_LOCATION); 131 136 } 132 // The mutex variable must be locked when passed to pthread_cond_wait(). 133 else 134 { 135 int result(0); 136 result = WaitForSingleObject(mGate, INFINITE); 137 138 bool status(true); 139 140 EnterCriticalSection(&mWaitersCountLock); 141 ++mWaitersCount; 142 LeaveCriticalSection(&mWaitersCountLock); 143 144 // Wait indefinitely on the condition variable by default. 145 DWORD timeout(INFINITE); 146 147 // If the caller specified timeout value other than NoTimeout, then this 148 // call will not block longer than that timeout. 149 if ( vpr::Interval::NoTimeout != timeToWait ) 150 { 151 timeout = timeToWait.msec(); 152 } 153 154 int result(0); 155 156 // SignalObjectAndWait unlocks the given mutex and waits on the semaphore 157 // until signal() or broadcast() is called by another thread. This direct 158 // manipulation of mCondMutex->mLocked is safe because this thread holds 159 // the mutex when we make the change. 160 mCondMutex->mLocked = false; 161 result = SignalObjectAndWait(mCondMutex->mMutex, mSema, timeout, FALSE); 162 163 // If we did not wait infinitely on SignalObjectAndWait(), then we need to 164 // determine if the call timed out or if we got the lock on mSema. 165 if ( INFINITE != timeout ) 166 { 167 assert(result != WAIT_FAILED && result != WAIT_ABANDONED); 168 status = (result == WAIT_OBJECT_0); 169 } 170 171 // At this point, if status is true, then we had better have the lock on 172 // mSema. 173 if ( status ) 174 { 137 175 assert(result == WAIT_OBJECT_0); 138 ++mBlocked; 139 result = ReleaseSemaphore(mGate, 1, 0); 140 assert(result); 141 142 mCondMutex->release(); 143 144 // Wait indefinitely on the condition variable. 145 if ( vpr::Interval::NoTimeout == timeToWait ) 176 177 EnterCriticalSection(&mWaitersCountLock); 178 179 // We are not waiting now. 180 --mWaitersCount; 181 182 // Check to see if w3e are the last waiter after broadcast(). 183 const bool last_waiter = mWasBroadcast && mWaitersCount == 0; 184 185 LeaveCriticalSection(&mWaitersCountLock); 186 187 // If we are the last waiter thread during this broadcast, then let all 188 // the other threads proceed. 189 if ( last_waiter ) 146 190 { 147 result = WaitForSingleObject(mQueue, INFINITE); 148 assert(result == WAIT_OBJECT_0); 149 150 unsigned int was_waiting(0); 151 unsigned int was_gone(0); 152 153 result = WaitForSingleObject(mMutex, INFINITE); 154 assert(result == WAIT_OBJECT_0); 155 was_waiting = mWaiting; 156 was_gone = mGone; 157 158 if ( was_waiting != 0 ) 159 { 160 if ( --mWaiting == 0 ) 161 { 162 if ( mBlocked != 0 ) 163 { 164 // Open mGate 165 result = ReleaseSemaphore(mGate, 1, 0); 166 assert(result); 167 was_waiting = 0; 168 } 169 else if ( mGone != 0 ) 170 { 171 mGone = 0; 172 } 173 } 174 } 175 else if ( ++mGone == (std::numeric_limits<unsigned int>::max)() / 2 ) 176 { 177 result = WaitForSingleObject(mGate, INFINITE); 178 assert(result == WAIT_OBJECT_0); 179 mBlocked -= mGone; 180 result = ReleaseSemaphore(mGate, 1, 0); 181 assert(result); 182 mGone = 0; 183 } 184 185 result = ReleaseMutex(mMutex); 186 assert(result); 187 188 if ( was_waiting == 1 ) 189 { 190 for ( ; was_gone; --was_gone ) 191 { 192 result = WaitForSingleObject(mQueue, INFINITE); 193 assert(result == WAIT_OBJECT_0); 194 } 195 196 result = ReleaseSemaphore(mGate, 1, 0); 197 assert(result); 198 } 199 } 200 // Wait for no longer than the given timeout period to acquire the lock 201 // on the condition variable. 202 else 203 { 204 for ( ; ; ) 205 { 206 const DWORD ms(timeToWait.msec()); 207 208 result = WaitForSingleObject(mQueue, ms); 209 assert(result != WAIT_FAILED && result != WAIT_ABANDONED); 210 status = (result == WAIT_OBJECT_0); 211 212 if ( result == WAIT_TIMEOUT ) 213 { 214 vpr::Interval diff(vpr::Interval::now()); 215 diff = diff - timeToWait; 216 217 if ( diff.msec() > 0 ) 218 { 219 continue; 220 } 221 } 222 223 break; 224 } 225 226 unsigned int was_waiting(0); 227 unsigned int was_gone(0); 228 229 result = WaitForSingleObject(mMutex, INFINITE); 230 assert(result == WAIT_OBJECT_0); 231 was_waiting = mWaiting; 232 was_gone = mGone; 233 234 if ( was_waiting != 0 ) 235 { 236 // Timeout. 237 if ( ! status ) 238 { 239 if ( mBlocked != 0 ) 240 { 241 --mBlocked; 242 } 243 else 244 { 245 ++mGone; 246 } 247 } 248 249 if ( --mWaiting == 0 ) 250 { 251 if ( mBlocked != 0 ) 252 { 253 // Open mGate 254 result = ReleaseSemaphore(mGate, 1, 0); 255 assert(result); 256 was_waiting = 0; 257 } 258 else if ( mGone != 0 ) 259 { 260 mGone = 0; 261 } 262 } 263 } 264 else if ( ++mGone == (std::numeric_limits<unsigned int>::max)() / 2 ) 265 { 266 result = WaitForSingleObject(mGate, INFINITE); 267 assert(result == WAIT_OBJECT_0); 268 mBlocked -= mGone; 269 result = ReleaseSemaphore(mGate, 1, 0); 270 assert(result); 271 mGone = 0; 272 } 273 274 result = ReleaseMutex(mMutex); 275 assert(result); 276 277 if ( was_waiting == 1 ) 278 { 279 for ( ; was_gone; --was_gone ) 280 { 281 result = WaitForSingleObject(mQueue, INFINITE); 282 assert(result == WAIT_OBJECT_0); 283 } 284 285 result = ReleaseSemaphore(mGate, 1, 0); 286 assert(result); 287 } 288 } 289 290 mCondMutex->acquire(); 291 } 292 293 return status; 294 } 295 296 void CondVarWin32::signal() 297 { 298 if ( ! mCondMutex->test() ) 299 { 300 throw vpr::LockException("Condition variable mutex must be locked", 301 VPR_LOCATION); 302 } 303 304 unsigned int signals(0); 305 306 int result(0); 307 result = WaitForSingleObject(mMutex, INFINITE); 308 assert(result == WAIT_OBJECT_0); 309 310 if ( mWaiting != 0 ) 311 { 312 if ( mBlocked == 0 ) 313 { 314 result = ReleaseMutex(mMutex); 315 assert(result); 316 return; 317 } 318 319 ++mWaiting; 320 --mBlocked; 321 signals = 1; 322 } 323 else 324 { 325 result = WaitForSingleObject(mGate, INFINITE); 326 assert(result == WAIT_OBJECT_0); 327 328 if ( mBlocked > mGone ) 329 { 330 if ( mGone != 0 ) 331 { 332 mBlocked -= mGone; 333 mGone = 0; 334 } 335 336 signals = mWaiting = 1; 337 --mBlocked; 191 // This atomically signals the mWaitersDone event and waits until it 192 // can acquire the given mutex. This is required to ensure fairness. 193 SignalObjectAndWait(mWaitersDone, mCondMutex->mMutex, INFINITE, 194 FALSE); 195 mCondMutex->mLocked = true; 338 196 } 339 197 else 340 198 { 341 result = ReleaseSemaphore(mGate, 1, 0); 342 assert(result); 199 // Always regain the external mutex sing that is the guarantee that 200 // we give to our callers. 201 mCondMutex->acquire(); 343 202 } 344 203 } 345 346 result = ReleaseMutex(mMutex); 347 assert(result); 348 349 if ( signals ) 350 { 351 result = ReleaseSemaphore(mQueue, signals, 0); 352 assert(result); 353 } 354 } 355 204 else 205 { 206 // Always regain the external mutex sing that is the guarantee that we 207 // give to our callers. 208 mCondMutex->acquire(); 209 } 210 211 assert(mCondMutex->test() && "External mutex is supposed to be locked"); 212 213 return status; 214 } 215 216 void CondVarWin32::signal() 217 { 218 if ( ! mCondMutex->test() ) 219 { 220 throw vpr::LockException("Condition variable mutex must be locked", 221 VPR_LOCATION); 222 } 223 224 EnterCriticalSection(&mWaitersCountLock); 225 const bool have_waiters = mWaitersCount > 0; 226 LeaveCriticalSection(&mWaitersCountLock); 227 228 if ( have_waiters ) 229 { 230 ReleaseSemaphore(mSema, 1, 0); 231 } 232 } 233 234 // From the article: 235 // 236 // "[This] function is more complex and requires two steps: 237 // 238 // 1. It wakes up all the threads waiting on [mSema], which can be done 239 // atomically by passing [mWaitersCount] to ReleaseSemaphore(). 240 // 2. It then blocks on the auto-reset [mWaitersDone] event until the 241 // last thread in the group of waiting threads exits the [wait()] 242 // critical section." 356 243 void CondVarWin32::broadcast() 357 244 { … … 363 250 } 364 251 365 unsigned int signals(0); 366 367 int result(0); 368 result = WaitForSingleObject(mMutex, INFINITE); 369 assert(result == WAIT_OBJECT_0); 370 371 if ( mWaiting != 0 ) 372 { 373 if ( mBlocked == 0 ) 374 { 375 result = ReleaseMutex(mMutex); 376 assert(result); 377 return; 378 } 379 380 signals = mBlocked; 381 mWaiting += signals; 382 mBlocked = 0; 252 // Protect access to mWaitersCount and mWasBroadcast. 253 EnterCriticalSection(&mWaitersCountLock); 254 bool have_waiters(false); 255 256 if ( mWaitersCount > 0 ) 257 { 258 // We are broadcasting even if there was just one waiter. 259 mWasBroadcast = true; 260 have_waiters = true; 261 } 262 263 if ( have_waiters ) 264 { 265 // Wake up all the waiters atomically. 266 ReleaseSemaphore(mSema, mWaitersCount, 0); 267 LeaveCriticalSection(&mWaitersCountLock); 268 269 // Wait for all the awakened threads to acquire the semaphore. 270 WaitForSingleObject(mWaitersDone, INFINITE); 271 272 // This is safe because no other waiter threads can wake up to access 273 // it. 274 mWasBroadcast = false; 383 275 } 384 276 else 385 277 { 386 result = WaitForSingleObject(mGate, INFINITE); 387 assert(result == WAIT_OBJECT_0); 388 389 if ( mBlocked > mGone ) 390 { 391 if ( mGone != 0 ) 392 { 393 mBlocked -= mGone; 394 mGone = 0; 395 } 396 397 signals = mWaiting = mBlocked; 398 mBlocked = 0; 399 } 400 else 401 { 402 result = ReleaseSemaphore(mGate, 1, 0); 403 assert(result); 404 } 405 } 406 407 result = ReleaseMutex(mMutex); 408 assert(result); 409 410 if ( signals ) 411 { 412 result = ReleaseSemaphore(mQueue, signals, 0); 413 assert(result); 278 LeaveCriticalSection(&mWaitersCountLock); 414 279 } 415 280 } juggler/trunk/modules/vapor/vpr/md/WIN32/Sync/CondVarWin32.h
r19729 r19808 63 63 * @since 1.1.36 64 64 * 65 * @note This implementation is based on the condition_impl from Boost.Thread 66 * for Windows. 65 * @note This implementation is based on the "SignalObjectAndWait Solution" 66 * from the article "Strategies for Implementing POSIX Condition 67 * Variables on Win32" by Douglas C. Scmidt and Irfan Pyarali. The 68 * article text can be found at 69 * http://www.cs.wustl.edu/~schmidt/win32-cv-1.html 67 70 */ 68 71 class VPR_CLASS_API CondVarWin32 : boost::noncopyable … … 114 117 * acquiring the lock. 115 118 * 116 * @throw vpr::IllegalArgumentException is thrown if \p timeToWait is not117 * vpr::Interval::NoTimeout and an invalid value is passed to118 * pthread_cond_timedwait().119 119 * @throw vpr::Exception is thrown if something goes wrong while trying to 120 120 * wait on the condition variable. … … 233 233 { 234 234 std::cerr << "------------- vpr::CondVarWin32::Dump ---------\n" 235 << " mGone = " << mGone << "\n" 236 << "mBlocked = " << mBlocked << "\n" 237 << "mWaiting = " << mWaiting << std::endl; 235 << "mWaitersCount = " << mWaitersCount << std::endl; 238 236 } 239 237 … … 241 239 /** @name Condition Variable State */ 242 240 //@{ 243 HANDLE mGate; 244 HANDLE mQueue; 245 HANDLE mMutex; /**< Internal data state protection */ 246 247 /** Number of threads that timed out and never made it to mQueue. */ 248 unsigned int mGone; 249 250 /** Number of threads blocked on the condition. */ 251 unsigned long mBlocked; 252 253 /** 254 * Number of threads no longer waiting for the condition but still waiting 255 * to be removed from mQueue. 256 */ 257 unsigned int mWaiting; 241 int mWaitersCount; /**< Number of waiting threads. */ 242 CRITICAL_SECTION mWaitersCountLock; /**< Lock for \c mWaitersCount. */ 243 244 /** 245 * Semaphore used to queue up threads waiting for the condition to 246 * become signaled. 247 */ 248 HANDLE mSema; 249 250 /** 251 * An auto-reset event used by the broadcast/signal thread to wait for 252 * all the waiting thread(s) to wake up and be released from the 253 * semaphore. 254 */ 255 HANDLE mWaitersDone; 256 257 /** 258 * Keeps track of whether we were broadcasting or signaling. This allows 259 * us to optimze the code when just signaling. 260 */ 261 bool mWasBroadcast; 258 262 //@} 259 263 juggler/trunk/modules/vapor/vpr/md/WIN32/Sync/MutexWin32.h
r19729 r19808 242 242 */ 243 243 bool mLocked; 244 245 friend class CondVarWin32; 244 246 }; 245 247
