Changeset 20010
- Timestamp:
- 04/27/07 14:22:07 (1 year ago)
- Files:
-
- juggler/branches/2.2/modules/gadgeteer/cluster/ClusterManager.cpp (modified) (3 diffs)
- juggler/branches/2.2/modules/gadgeteer/cluster/ClusterNode.cpp (deleted)
- juggler/branches/2.2/modules/gadgeteer/cluster/ClusterNode.h (deleted)
- juggler/branches/2.2/modules/gadgeteer/cluster/Makefile.in (modified) (1 diff)
- juggler/branches/2.2/modules/gadgeteer/gadget/AbstractNetworkManager.cpp (modified) (5 diffs)
- juggler/branches/2.2/modules/gadgeteer/gadget/AbstractNetworkManager.h (modified) (3 diffs)
- juggler/branches/2.2/modules/gadgeteer/gadget/Makefile.in (modified) (1 diff)
- juggler/branches/2.2/modules/gadgeteer/gadget/Node.cpp (modified) (5 diffs)
- juggler/branches/2.2/modules/gadgeteer/gadget/Node.h (modified) (6 diffs)
- juggler/branches/2.2/modules/gadgeteer/gadget/Reactor.cpp (added)
- juggler/branches/2.2/modules/gadgeteer/gadget/Reactor.h (added)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
juggler/branches/2.2/modules/gadgeteer/cluster/ClusterManager.cpp
r19729 r20010 414 414 } 415 415 416 std::vector<gadget::Node*>::iterator begin_cluster_node =417 mClusterNetwork->getNodesBegin(); 418 std::vector<gadget::Node*>::iterator end_cluster_node =419 mClusterNetwork->getNodesEnd();420 421 cluster::EndBlock* temp_end_block = new EndBlock( temp );422 423 for ( std::vector<gadget::Node*>::iterator i=begin_cluster_node;424 i !=end_cluster_node;416 cluster::EndBlock temp_end_block(temp); 417 418 // Used to accumulate the number of connected nodes. 419 size_t num_nodes(0); 420 421 typedef std::vector<gadget::Node*>::iterator iter_t; 422 423 for ( iter_t i = mClusterNetwork->getNodesBegin(); 424 i != mClusterNetwork->getNodesEnd(); 425 425 ++i ) 426 426 { … … 429 429 try 430 430 { 431 // Send End Blocks to all connected Nodes 432 (*i)->send( temp_end_block ); 433 434 // Signal Update thread to read Network Packets 435 (*i)->signalUpdate(); 431 // Send End Block to the node. 432 (*i)->send(&temp_end_block); 433 434 // Indicate that this node is not up to date. It will be updated 435 // below. 436 (*i)->setUpdated(false); 437 438 ++num_nodes; 436 439 } 437 catch (cluster::ClusterException& cluster_exception)438 { 439 vprDEBUG( gadgetDBG_RIM, vprDBG_CONFIG_LVL)440 << clrOutBOLD( clrRED, "ERROR: ")441 << cluster_exception.what()440 catch (cluster::ClusterException& ex) 441 { 442 vprDEBUG(gadgetDBG_RIM, vprDBG_CRITICAL_LVL) 443 << clrOutBOLD("ERROR", clrRED) 444 << ": Failed to send end block to " << (*i)->getName() 442 445 << std::endl << vprDEBUG_FLUSH; 446 vprDEBUG_NEXT(gadgetDBG_RIM, vprDBG_CRITICAL_LVL) 447 << ex.what() << std::endl << vprDEBUG_FLUSH; 448 vprDEBUG_NEXT(gadgetDBG_RIM, vprDBG_CRITICAL_LVL) 449 << "Shutting down the node." << std::endl << vprDEBUG_FLUSH; 443 450 444 451 (*i)->shutdown(); … … 446 453 } 447 454 } 448 for ( std::vector<gadget::Node*>::iterator i = begin_cluster_node; 449 i != end_cluster_node; 450 ++i ) 451 { 452 if ( (*i)->isConnected() ) 453 { 454 //Block waiting for all packets to be received 455 (*i)->sync(); 456 } 457 } 458 delete temp_end_block; 455 456 gadget::Reactor& reactor = mClusterNetwork->getReactor(); 457 size_t completed_nodes(0); 458 459 while ( completed_nodes != num_nodes ) 460 { 461 std::vector<gadget::Node*> ready_nodes = 462 reactor.getReadyNodes(vpr::Interval::NoWait); 463 464 for ( iter_t i = ready_nodes.begin(); i != ready_nodes.end(); ++i ) 465 { 466 // Make sure that we do not update nodes more than once simply 467 // because there is data ready to read from them. This is an 468 // unfortunate side effect of using a reactor in this clumsy way. 469 if ( (*i)->isUpdated() ) 470 { 471 continue; 472 } 473 474 try 475 { 476 // Read network packets. 477 (*i)->update(); 478 } 479 catch (cluster::ClusterException& ex) 480 { 481 vprDEBUG(gadgetDBG_RIM, vprDBG_CRITICAL_LVL) 482 << clrOutBOLD("ERROR", clrRED) 483 << ": Failed to complete state update for " 484 << (*i)->getName() << std::endl << vprDEBUG_FLUSH; 485 vprDEBUG_NEXT(gadgetDBG_RIM, vprDBG_CRITICAL_LVL) 486 << ex.what() << std::endl << vprDEBUG_FLUSH; 487 vprDEBUG_NEXT(gadgetDBG_RIM, vprDBG_CRITICAL_LVL) 488 << "Shutting down the node." << std::endl << vprDEBUG_FLUSH; 489 490 (*i)->shutdown(); 491 } 492 493 // Record completed state. This happens regardless of whether 494 // the node update completed successfully. Since the node is 495 // shut down if update fails, we would never get completed_nodes 496 // to equal num_nodes otherwise. 497 ++completed_nodes; 498 } 499 } 459 500 } 460 501 juggler/branches/2.2/modules/gadgeteer/cluster/Makefile.in
r19729 r20010 52 52 ClusterException.cpp \ 53 53 ClusterManager.cpp \ 54 ClusterNode.cpp \55 54 ClusterPlugin.cpp 56 55 juggler/branches/2.2/modules/gadgeteer/gadget/AbstractNetworkManager.cpp
r19729 r20010 27 27 #include <gadget/gadgetConfig.h> 28 28 29 #include <algorithm> 29 30 #include <iomanip> 30 31 … … 245 246 } 246 247 248 struct HostnamePred 249 { 250 HostnamePred(const std::string& nodeHostname) 251 : mHostname(nodeHostname) 252 { 253 /* Do nothing. */ ; 254 } 255 256 bool operator()(gadget::Node* n) 257 { 258 return n->getHostname() == mHostname; 259 } 260 261 const std::string& mHostname; 262 }; 263 247 264 void AbstractNetworkManager::removeNode(const std::string& nodeHostname) 248 265 { … … 252 269 << std::endl << vprDEBUG_FLUSH; 253 270 254 for (std::vector<gadget::Node*>::iterator itr = mNodes.begin() ; itr != mNodes.end() ; itr++) 255 { 256 if ((*itr)->getHostname() == nodeHostname) 257 { 258 mNodes.erase(itr); 259 return; 260 } 271 typedef std::vector<gadget::Node*>::iterator iter_t; 272 HostnamePred pred(nodeHostname); 273 iter_t n = std::find_if(mNodes.begin(), mNodes.end(), pred); 274 275 if ( n != mNodes.end() ) 276 { 277 mReactor.removeNode(*n); 278 mNodes.erase(n); 261 279 } 262 280 } … … 476 494 if ((*i)->getStatus() == Node::NEWCONNECTION) 477 495 { 478 vprDEBUG( gadgetDBG_NET_MGR, vprDBG_STATE_LVL)496 vprDEBUG( gadgetDBG_NET_MGR, 0 ) 479 497 << clrOutBOLD( clrMAGENTA, "[AbstractNetworkManager]" ) 480 498 << " Node: " << (*i)->getName() … … 483 501 484 502 new_connection = true; 485 (*i)->start();486 503 (*i)->setStatus( Node::CONNECTED ); 504 mReactor.addNode(*i); 487 505 } 488 506 } juggler/branches/2.2/modules/gadgeteer/gadget/AbstractNetworkManager.h
r19729 r20010 37 37 #include <jccl/RTRC/ConfigElementHandler.h> 38 38 #include <jccl/Config/ConfigElementPtr.h> 39 40 #include <gadget/Connector.h> 41 #include <gadget/Reactor.h> 39 42 40 43 … … 131 134 void debugDumpNodes(int debug_level); 132 135 136 Reactor& getReactor() 137 { 138 return mReactor; 139 } 140 133 141 /** 134 142 * Get an iterator to the beginning of the Nodes std::vector. … … 216 224 217 225 std::map<vpr::GUID, PacketHandler*> mHandlerMap; 226 Reactor mReactor; 218 227 }; 219 228 juggler/branches/2.2/modules/gadgeteer/gadget/Makefile.in
r19729 r20010 60 60 ProxyDepChecker.cpp \ 61 61 ProxyFactory.cpp \ 62 Reactor.cpp \ 62 63 RemoteInputManager.cpp \ 63 64 VirtualDevice.cpp juggler/branches/2.2/modules/gadgeteer/gadget/Node.cpp
r19847 r20010 51 51 { 52 52 53 Node::Node(const std::string& name, const std::string& host_name, 54 const vpr::Uint16& port, vpr::SocketStream* socket_stream, 55 AbstractNetworkManager* net_mgr) 56 : mRunning(false), mStatus(DISCONNECTED), mUpdateTriggerSema(0), 57 mNodeDoneSema(0), mControlThread(NULL), mNetworkManager(net_mgr) 58 { 59 mThreadActive = false; 60 mUpdated = false; 61 62 mName = name; 63 mHostname = host_name; 64 mPort = port; 65 mSockStream = socket_stream; 53 Node::Node(const std::string& name, const std::string& host_name, 54 const vpr::Uint16 port, vpr::SocketStream* socket_stream, 55 AbstractNetworkManager* net_mgr) 56 : mName(name) 57 , mHostname(host_name) 58 , mPort(port) 59 , mSockStream(socket_stream) 60 , mStatus(DISCONNECTED) 61 , mUpdated(false) 62 , mNetworkManager(net_mgr) 63 { 66 64 vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL) 67 65 << clrOutBOLD(clrBLUE,"[Node]") … … 78 76 { 79 77 setStatus(DISCONNECTED); 80 // This may break the accept code since we might not want to delete the Socket.81 // We may be able to just use a smart pointer to point to the SocketStream.82 mRunning = false;83 84 // Make sure that the conrtol loop exits naturally.85 mUpdateTriggerSema.release();86 87 //mNodeDoneSema.acquire();88 78 89 79 if (NULL != mSockStream) … … 95 85 } 96 86 delete mSockStream; 87 mSockStream = NULL; 97 88 */ 98 89 } … … 171 162 } 172 163 173 void Node:: update()164 void Node::doUpdate() 174 165 { 175 166 // - If connected() && !updated() … … 198 189 } 199 190 200 void Node::controlLoop() 201 { 202 // - Block on an update call 203 // - Update Local Data 204 // - Send 205 // - Signal Sync 206 207 while( mRunning ) 208 { 209 // Wait for trigger 210 if( mRunning ) 211 { 212 mUpdateTriggerSema.acquire(); 213 } 214 215 mUpdated = false; 216 while ( mRunning && !mUpdated ) 217 { 218 try 219 { 220 update(); 221 } 222 catch(cluster::ClusterException cluster_exception) 223 { 224 vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL) << clrOutBOLD(clrRED, "ERROR: ") 225 << cluster_exception.what() 226 << std::endl << vprDEBUG_FLUSH; 227 228 vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL) << 229 "Node::update() We have lost our connection to: " << getName() << ":" << getPort() 230 << std::endl << vprDEBUG_FLUSH; 231 232 debugDump(vprDBG_CONFIG_LVL); 233 234 // Set the Node as disconnected since we have lost the connection 235 setStatus(DISCONNECTED); 236 237 // Shut down manually instead of calling shutdown since 238 // we are in the control thread. 239 mRunning = false; 240 //if (NULL != mSockStream) 241 //{ 242 //if(mSockStream->isOpen()) 243 //{ 244 // mSockStream->close(); 245 //} 246 //delete mSockStream; 247 //mSockStream = NULL; 248 //} 249 } 250 } 251 252 // Signal done with Update 253 mNodeDoneSema.release(); 254 } 255 vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL) << "Node: " << getName() << " is stopping." 256 << std::endl << vprDEBUG_FLUSH; 257 } 258 259 /** Starts the control loop. */ 260 void Node::start() 261 { 262 // --- Setup Multi-Process stuff --- // 263 // Create a new thread to handle the control 264 265 if (NULL != mControlThread && mControlThread->valid()) 266 { 267 vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL) 268 << "Node " << getName() << " already running..." 191 void Node::update() 192 { 193 mUpdated = false; 194 195 while ( ! mUpdated ) 196 { 197 try 198 { 199 doUpdate(); 200 } 201 catch (cluster::ClusterException& cluster_exception) 202 { 203 vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL) << clrSetBOLD(clrRED) 204 << cluster_exception.what() << clrRESET 269 205 << std::endl << vprDEBUG_FLUSH; 270 return; 271 } 272 273 mRunning = true; 274 275 mControlThread = new vpr::Thread(boost::bind(&Node::controlLoop, this)); 276 277 if (mControlThread->valid()) 278 { 279 mThreadActive = true; 280 } 281 vprDEBUG(gadgetDBG_RIM, vprDBG_CONFIG_LVL) 282 << "Node " << getName() << " started. thread: " 283 << mControlThread << std::endl << vprDEBUG_FLUSH; 284 } 285 286 void Node::signalUpdate() 287 { 288 while(!mThreadActive) 289 { 290 vprDEBUG(gadgetDBG_RIM, vprDBG_HVERB_LVL) << "Waiting for thread to start ClusterNode::start().\n" << vprDEBUG_FLUSH; 291 vpr::Thread::yield(); 292 } 293 //vprDEBUG(gadgetDBG_RIM,/*vprDBG_HVERB_LVL*/1) << getName() << "Signaling ClusterNode\n" << vprDEBUG_FLUSH; 294 mUpdateTriggerSema.release(); 295 } 296 297 /** 298 * Blocks until the end of the frame. 299 * @post The frame has been drawn. 300 */ 301 void Node::sync() 302 { 303 vprASSERT(mThreadActive == true); 304 mNodeDoneSema.acquire(); 206 207 vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL) 208 << "Node::update() We have lost our connection to: " << getName() 209 << ":" << getPort() << std::endl << vprDEBUG_FLUSH; 210 211 debugDump(vprDBG_CONFIG_LVL); 212 213 // Set the Node as disconnected since we have lost the connection 214 setStatus(DISCONNECTED); 215 } 216 } 305 217 } 306 218 juggler/branches/2.2/modules/gadgeteer/gadget/Node.h
r19729 r20010 30 30 #include <gadget/gadgetConfig.h> 31 31 #include <vpr/IO/Socket/SocketStream.h> 32 #include <vpr/Sync/Semaphore.h>33 32 #include <vpr/Thread/Thread.h> 34 33 #include <gadget/Util/Debug.h> … … 70 69 */ 71 70 Node(const std::string& name, const std::string& host_name, 72 const vpr::Uint16&port, vpr::SocketStream* socket_stream,73 AbstractNetworkManager* net_mgr);71 const vpr::Uint16 port, vpr::SocketStream* socket_stream, 72 AbstractNetworkManager* net_mgr); 74 73 75 74 /** … … 175 174 public: 176 175 /** 177 * Start the update thread for this node. 178 */ 179 void start(); 180 181 /** 182 * Control loop for updating this thread. 183 */ 184 void controlLoop(); 185 186 /** 187 * Signal a semaphore to let the update thread fall into 188 * the code to update the UserData structures. 189 */ 190 void signalUpdate(); 191 192 /** 193 * Signal a semaphore to signal that we are done either 194 * updating the UserData or DeviceData. 195 */ 196 void sync(); 197 176 */ 177 void update(); 178 198 179 /** 199 180 * Kill the update thread. … … 221 202 protected: 222 203 /** 223 * Updatethis cluster node.224 */ 225 void update();204 * Do the actual work of updating this cluster node. 205 */ 206 void doUpdate(); 226 207 227 208 protected: … … 229 210 std::string mHostname; /**< Host that it is connected to */ 230 211 vpr::Uint16 mPort; /**< Port that it is connected to */ 231 bool mRunning; /**< Thread is running the control loop */232 212 233 213 vpr::SocketStream* mSockStream; /**< Socket used for communication to this node */ … … 237 217 vpr::Mutex mStatusLock; /**< Lock the isConnected value */ 238 218 int mStatus; /**< States if this node is connected */ 239 240 vpr::Mutex mUpdatedLock; /**< Lock the isUpdated value */ 219 241 220 bool mUpdated; /**< States if this node is updated */ 242 243 vpr::Semaphore mUpdateTriggerSema; /**< Semaphore trigger for UserData update */244 vpr::Semaphore mNodeDoneSema; /**< Semaphore trigger for completion */245 246 vpr::Thread* mControlThread; /**< Update thread for this node */247 bool mThreadActive; /**< Has the update thread started? */248 221 249 222 vpr::Uint64 mDelta; /**< Time delta between remote and local clocks. */
