Changeset 20010

Show
Ignore:
Timestamp:
04/27/07 14:22:07 (1 year ago)
Author:
aronb
Message:

Change clustering to be single threaded. This uses a Reactor to do a select on
all sockets.

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • juggler/branches/2.2/modules/gadgeteer/cluster/ClusterManager.cpp

    r19729 r20010  
    414414      } 
    415415 
    416       std::vector<gadget::Node*>::iterator begin_cluster_node = 
    417          mClusterNetwork->getNodesBegin(); 
    418       std::vector<gadget::Node*>::iterator end_cluster_node = 
    419          mClusterNetwork->getNodesEnd(); 
    420  
    421       cluster::EndBlock* temp_end_block = new EndBlock( temp )
    422  
    423       for ( std::vector<gadget::Node*>::iterator i=begin_cluster_node
    424             i!=end_cluster_node
     416      cluster::EndBlock temp_end_block(temp); 
     417 
     418      // Used to accumulate the number of connected nodes. 
     419      size_t num_nodes(0); 
     420 
     421      typedef std::vector<gadget::Node*>::iterator iter_t
     422 
     423      for ( iter_t i = mClusterNetwork->getNodesBegin()
     424            i != mClusterNetwork->getNodesEnd()
    425425            ++i ) 
    426426      { 
     
    429429            try 
    430430            { 
    431                // Send End Blocks to all connected Nodes 
    432                (*i)->send( temp_end_block ); 
    433  
    434                // Signal Update thread to read Network Packets 
    435                (*i)->signalUpdate(); 
     431               // Send End Block to the node. 
     432               (*i)->send(&temp_end_block); 
     433 
     434               // Indicate that this node is not up to date. It will be updated 
     435               // below. 
     436               (*i)->setUpdated(false); 
     437 
     438               ++num_nodes; 
    436439            } 
    437             catch(cluster::ClusterException& cluster_exception
    438             { 
    439                vprDEBUG( gadgetDBG_RIM, vprDBG_CONFIG_LVL
    440                   << clrOutBOLD(clrRED, "ERROR: "
    441                   << cluster_exception.what() 
     440            catch (cluster::ClusterException& ex
     441            { 
     442               vprDEBUG(gadgetDBG_RIM, vprDBG_CRITICAL_LVL
     443                  << clrOutBOLD("ERROR", clrRED
     444                  << ": Failed to send end block to " << (*i)->getName() 
    442445                  << std::endl << vprDEBUG_FLUSH; 
     446               vprDEBUG_NEXT(gadgetDBG_RIM, vprDBG_CRITICAL_LVL) 
     447                  << ex.what() << std::endl << vprDEBUG_FLUSH; 
     448               vprDEBUG_NEXT(gadgetDBG_RIM, vprDBG_CRITICAL_LVL) 
     449                  << "Shutting down the node." << std::endl << vprDEBUG_FLUSH; 
    443450 
    444451               (*i)->shutdown(); 
     
    446453         } 
    447454      } 
    448       for ( std::vector<gadget::Node*>::iterator i = begin_cluster_node; 
    449             i != end_cluster_node; 
    450             ++i ) 
    451       { 
    452          if ( (*i)->isConnected() ) 
    453          { 
    454             //Block waiting for all packets to be received 
    455             (*i)->sync(); 
    456          } 
    457       } 
    458       delete temp_end_block; 
     455 
     456      gadget::Reactor& reactor = mClusterNetwork->getReactor(); 
     457      size_t completed_nodes(0); 
     458 
     459      while ( completed_nodes != num_nodes ) 
     460      { 
     461         std::vector<gadget::Node*> ready_nodes = 
     462            reactor.getReadyNodes(vpr::Interval::NoWait); 
     463 
     464         for ( iter_t i = ready_nodes.begin(); i != ready_nodes.end(); ++i ) 
     465         { 
     466            // Make sure that we do not update nodes more than once simply 
     467            // because there is data ready to read from them. This is an 
     468            // unfortunate side effect of using a reactor in this clumsy way. 
     469            if ( (*i)->isUpdated() ) 
     470            { 
     471               continue; 
     472            } 
     473 
     474            try 
     475            { 
     476               // Read network packets. 
     477               (*i)->update(); 
     478            } 
     479            catch (cluster::ClusterException& ex) 
     480            { 
     481               vprDEBUG(gadgetDBG_RIM, vprDBG_CRITICAL_LVL) 
     482                  << clrOutBOLD("ERROR", clrRED) 
     483                  << ": Failed to complete state update for " 
     484                  << (*i)->getName() << std::endl << vprDEBUG_FLUSH; 
     485               vprDEBUG_NEXT(gadgetDBG_RIM, vprDBG_CRITICAL_LVL) 
     486                  << ex.what() << std::endl << vprDEBUG_FLUSH; 
     487               vprDEBUG_NEXT(gadgetDBG_RIM, vprDBG_CRITICAL_LVL) 
     488                  << "Shutting down the node." << std::endl << vprDEBUG_FLUSH; 
     489 
     490               (*i)->shutdown(); 
     491            } 
     492 
     493            // Record completed state. This happens regardless of whether 
     494            // the node update completed successfully. Since the node is 
     495            // shut down if update fails, we would never get completed_nodes 
     496            // to equal num_nodes otherwise. 
     497            ++completed_nodes; 
     498         } 
     499      } 
    459500   } 
    460501 
  • juggler/branches/2.2/modules/gadgeteer/cluster/Makefile.in

    r19729 r20010  
    5252                ClusterException.cpp    \ 
    5353                ClusterManager.cpp      \ 
    54                 ClusterNode.cpp         \ 
    5554                ClusterPlugin.cpp 
    5655 
  • juggler/branches/2.2/modules/gadgeteer/gadget/AbstractNetworkManager.cpp

    r19729 r20010  
    2727#include <gadget/gadgetConfig.h> 
    2828 
     29#include <algorithm> 
    2930#include <iomanip> 
    3031 
     
    245246   } 
    246247 
     248   struct HostnamePred 
     249   { 
     250      HostnamePred(const std::string& nodeHostname) 
     251         : mHostname(nodeHostname) 
     252      { 
     253         /* Do nothing. */ ; 
     254      } 
     255 
     256      bool operator()(gadget::Node* n) 
     257      { 
     258         return n->getHostname() == mHostname; 
     259      } 
     260 
     261      const std::string& mHostname; 
     262   }; 
     263 
    247264   void AbstractNetworkManager::removeNode(const std::string& nodeHostname) 
    248265   { 
     
    252269         << std::endl << vprDEBUG_FLUSH; 
    253270 
    254       for (std::vector<gadget::Node*>::iterator itr = mNodes.begin() ; itr != mNodes.end() ; itr++) 
    255       { 
    256          if ((*itr)->getHostname() == nodeHostname) 
    257          { 
    258             mNodes.erase(itr); 
    259             return; 
    260          } 
     271      typedef std::vector<gadget::Node*>::iterator iter_t; 
     272      HostnamePred pred(nodeHostname); 
     273      iter_t n = std::find_if(mNodes.begin(), mNodes.end(), pred); 
     274 
     275      if ( n != mNodes.end() ) 
     276      { 
     277         mReactor.removeNode(*n); 
     278         mNodes.erase(n); 
    261279      } 
    262280   } 
     
    476494         if ((*i)->getStatus() == Node::NEWCONNECTION) 
    477495         { 
    478             vprDEBUG( gadgetDBG_NET_MGR, vprDBG_STATE_LVL
     496            vprDEBUG( gadgetDBG_NET_MGR, 0
    479497               << clrOutBOLD( clrMAGENTA, "[AbstractNetworkManager]" ) 
    480498               << " Node: " << (*i)->getName() 
     
    483501 
    484502            new_connection = true; 
    485             (*i)->start(); 
    486503            (*i)->setStatus( Node::CONNECTED ); 
     504            mReactor.addNode(*i); 
    487505         } 
    488506      } 
  • juggler/branches/2.2/modules/gadgeteer/gadget/AbstractNetworkManager.h

    r19729 r20010  
    3737#include <jccl/RTRC/ConfigElementHandler.h> 
    3838#include <jccl/Config/ConfigElementPtr.h> 
     39 
     40#include <gadget/Connector.h> 
     41#include <gadget/Reactor.h> 
    3942 
    4043 
     
    131134   void debugDumpNodes(int debug_level); 
    132135 
     136   Reactor& getReactor() 
     137   { 
     138      return mReactor; 
     139   } 
     140 
    133141   /** 
    134142    * Get an iterator to the beginning of the Nodes std::vector. 
     
    216224 
    217225   std::map<vpr::GUID, PacketHandler*>  mHandlerMap; 
     226   Reactor mReactor; 
    218227}; 
    219228 
  • juggler/branches/2.2/modules/gadgeteer/gadget/Makefile.in

    r19729 r20010  
    6060                ProxyDepChecker.cpp             \ 
    6161                ProxyFactory.cpp                \ 
     62                Reactor.cpp                     \ 
    6263                RemoteInputManager.cpp          \ 
    6364                VirtualDevice.cpp 
  • juggler/branches/2.2/modules/gadgeteer/gadget/Node.cpp

    r19847 r20010  
    5151{ 
    5252 
    53 Node::Node(const std::string& name, const std::string& host_name, 
    54                          const vpr::Uint16& port, vpr::SocketStream* socket_stream, 
    55                          AbstractNetworkManager* net_mgr) 
    56    : mRunning(false), mStatus(DISCONNECTED), mUpdateTriggerSema(0), 
    57      mNodeDoneSema(0), mControlThread(NULL), mNetworkManager(net_mgr) 
    58 
    59    mThreadActive = false; 
    60    mUpdated = false; 
    61  
    62    mName = name; 
    63    mHostname = host_name; 
    64    mPort = port; 
    65    mSockStream = socket_stream; 
     53Node::Node(const std::string& name, const std::string& host_name,  
     54           const vpr::Uint16 port, vpr::SocketStream* socket_stream, 
     55           AbstractNetworkManager* net_mgr) 
     56   : mName(name) 
     57   , mHostname(host_name) 
     58   , mPort(port) 
     59   , mSockStream(socket_stream) 
     60   , mStatus(DISCONNECTED) 
     61   , mUpdated(false) 
     62   , mNetworkManager(net_mgr) 
     63
    6664   vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL) 
    6765      << clrOutBOLD(clrBLUE,"[Node]") 
     
    7876{ 
    7977   setStatus(DISCONNECTED); 
    80    // This may break the accept code since we might not want to delete the Socket. 
    81    // We may be able to just use a smart pointer to point to the SocketStream. 
    82    mRunning = false; 
    83  
    84    // Make sure that the conrtol loop exits naturally. 
    85    mUpdateTriggerSema.release(); 
    86  
    87    //mNodeDoneSema.acquire(); 
    8878 
    8979   if (NULL != mSockStream) 
     
    9585      } 
    9686      delete mSockStream; 
     87      mSockStream = NULL; 
    9788      */ 
    9889   } 
     
    171162} 
    172163 
    173 void Node::update() 
     164void Node::doUpdate() 
    174165{ 
    175166   // - If connected() && !updated() 
     
    198189} 
    199190 
    200 void Node::controlLoop() 
    201 
    202    // - Block on an update call 
    203    // - Update Local Data 
    204    // - Send 
    205    // - Signal Sync 
    206  
    207    while( mRunning ) 
    208    { 
    209       // Wait for trigger 
    210       if( mRunning ) 
    211       { 
    212          mUpdateTriggerSema.acquire(); 
    213       } 
    214  
    215       mUpdated = false; 
    216       while ( mRunning && !mUpdated ) 
    217       { 
    218          try 
    219          { 
    220             update(); 
    221          } 
    222          catch(cluster::ClusterException cluster_exception) 
    223          { 
    224             vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL) << clrOutBOLD(clrRED, "ERROR: ") 
    225                << cluster_exception.what() 
    226                << std::endl << vprDEBUG_FLUSH; 
    227  
    228             vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL) << 
    229                "Node::update() We have lost our connection to: " << getName() << ":" << getPort() 
    230                << std::endl << vprDEBUG_FLUSH; 
    231  
    232             debugDump(vprDBG_CONFIG_LVL); 
    233  
    234             // Set the Node as disconnected since we have lost the connection 
    235             setStatus(DISCONNECTED); 
    236  
    237             // Shut down manually instead of calling shutdown since 
    238             // we are in the control thread. 
    239             mRunning = false; 
    240             //if (NULL != mSockStream) 
    241             //{ 
    242                //if(mSockStream->isOpen()) 
    243                //{ 
    244                //   mSockStream->close(); 
    245                //} 
    246                //delete mSockStream; 
    247                //mSockStream = NULL; 
    248             //} 
    249          } 
    250       } 
    251  
    252       // Signal done with Update 
    253       mNodeDoneSema.release(); 
    254    } 
    255    vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL) << "Node: " << getName() << " is stopping." 
    256                                              << std::endl << vprDEBUG_FLUSH; 
    257 
    258  
    259 /** Starts the control loop. */ 
    260 void Node::start() 
    261 
    262    // --- Setup Multi-Process stuff --- // 
    263    // Create a new thread to handle the control 
    264  
    265    if (NULL != mControlThread && mControlThread->valid()) 
    266    { 
    267       vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL) 
    268             << "Node " << getName() << " already running..." 
     191void Node::update() 
     192
     193   mUpdated = false; 
     194 
     195   while ( ! mUpdated ) 
     196   { 
     197      try 
     198      { 
     199         doUpdate(); 
     200      } 
     201      catch (cluster::ClusterException& cluster_exception) 
     202      { 
     203         vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL) << clrSetBOLD(clrRED) 
     204            << cluster_exception.what() << clrRESET 
    269205            << std::endl << vprDEBUG_FLUSH; 
    270       return; 
    271    } 
    272  
    273    mRunning = true; 
    274  
    275    mControlThread = new vpr::Thread(boost::bind(&Node::controlLoop, this)); 
    276  
    277    if (mControlThread->valid()) 
    278    { 
    279       mThreadActive = true; 
    280    } 
    281    vprDEBUG(gadgetDBG_RIM, vprDBG_CONFIG_LVL) 
    282       << "Node " << getName() << " started. thread: " 
    283       << mControlThread << std::endl << vprDEBUG_FLUSH; 
    284 
    285  
    286 void Node::signalUpdate() 
    287 
    288    while(!mThreadActive) 
    289    { 
    290       vprDEBUG(gadgetDBG_RIM, vprDBG_HVERB_LVL) << "Waiting for thread to start ClusterNode::start().\n" << vprDEBUG_FLUSH; 
    291       vpr::Thread::yield(); 
    292    } 
    293    //vprDEBUG(gadgetDBG_RIM,/*vprDBG_HVERB_LVL*/1) << getName() << "Signaling ClusterNode\n" << vprDEBUG_FLUSH; 
    294    mUpdateTriggerSema.release(); 
    295 
    296  
    297 /** 
    298  * Blocks until the end of the frame. 
    299  * @post The frame has been drawn. 
    300  */ 
    301 void Node::sync() 
    302 
    303    vprASSERT(mThreadActive == true); 
    304    mNodeDoneSema.acquire(); 
     206          
     207         vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL) 
     208            << "Node::update() We have lost our connection to: " << getName() 
     209            << ":" << getPort() << std::endl << vprDEBUG_FLUSH; 
     210 
     211         debugDump(vprDBG_CONFIG_LVL); 
     212          
     213         // Set the Node as disconnected since we have lost the connection 
     214         setStatus(DISCONNECTED); 
     215      } 
     216   } 
    305217} 
    306218 
  • juggler/branches/2.2/modules/gadgeteer/gadget/Node.h

    r19729 r20010  
    3030#include <gadget/gadgetConfig.h> 
    3131#include <vpr/IO/Socket/SocketStream.h> 
    32 #include <vpr/Sync/Semaphore.h> 
    3332#include <vpr/Thread/Thread.h> 
    3433#include <gadget/Util/Debug.h> 
     
    7069    */ 
    7170   Node(const std::string& name, const std::string& host_name,  
    72                const vpr::Uint16& port, vpr::SocketStream* socket_stream, 
    73                AbstractNetworkManager* net_mgr); 
     71        const vpr::Uint16 port, vpr::SocketStream* socket_stream, 
     72        AbstractNetworkManager* net_mgr); 
    7473 
    7574   /** 
     
    175174public: 
    176175   /** 
    177     * Start the update thread for this node. 
    178     */ 
    179    void start(); 
    180  
    181    /** 
    182     * Control loop for updating this thread. 
    183     */ 
    184    void controlLoop(); 
    185  
    186    /** 
    187     * Signal a semaphore to let the update thread fall into 
    188     * the code to update the UserData structures. 
    189     */ 
    190    void signalUpdate(); 
    191           
    192    /** 
    193     * Signal a semaphore to signal that we are done either 
    194     * updating the UserData or DeviceData. 
    195     */ 
    196    void sync(); 
    197     
     176    */ 
     177   void update(); 
     178 
    198179   /** 
    199180    * Kill the update thread. 
     
    221202protected: 
    222203   /** 
    223     * Update this cluster node. 
    224     */ 
    225    void update(); 
     204    * Do the actual work of updating this cluster node. 
     205    */ 
     206   void doUpdate(); 
    226207 
    227208protected: 
     
    229210   std::string          mHostname;              /**< Host that it is connected to */ 
    230211   vpr::Uint16          mPort;                  /**< Port that it is connected to */ 
    231    bool                 mRunning;               /**< Thread is running the control loop */ 
    232212 
    233213   vpr::SocketStream*   mSockStream;            /**< Socket used for communication to this node */       
     
    237217   vpr::Mutex           mStatusLock;            /**< Lock the isConnected value */ 
    238218   int                  mStatus;                /**< States if this node is connected */ 
    239     
    240    vpr::Mutex           mUpdatedLock;           /**< Lock the isUpdated value */ 
     219 
    241220   bool                 mUpdated;               /**< States if this node is updated */ 
    242     
    243    vpr::Semaphore       mUpdateTriggerSema;     /**< Semaphore trigger for UserData update  */ 
    244    vpr::Semaphore       mNodeDoneSema;          /**< Semaphore trigger for completion */ 
    245     
    246    vpr::Thread*         mControlThread;         /**< Update thread for this node */ 
    247    bool                 mThreadActive;          /**< Has the update thread started? */ 
    248221 
    249222   vpr::Uint64          mDelta;                 /**< Time delta between remote and local clocks. */