Changeset 21053

Show
Ignore:
Timestamp:
02/25/08 09:10:07 (8 months ago)
Author:
dshipton
Message:

Refactor Gadgeteer network code to eliminate costly back-to-back writes.
On a 50 node cluster performance jumps from ~60FPS to ~170FPS with a simple clustered app.

FYI-

Test 0: Trunk(no patches) ~60FPS
Test 1: Just corking the network on plugin calls ~120FPS
Test 2: 1 + eliminate most back-to-back writes ~150FPS
Test 3: Full patch ~170FPS

Major changes include:

  • Utilizing the setNoPush() methods recently added to vpr.
  • Refactoring/Simplifying the Packets/Header code.

Eliminate extraneous back-to-back calls with new Packets/Header code.
Prepend header instead of serializing twice.

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • juggler/trunk/modules/gadgeteer/cluster/ClusterManager.cpp

    r20974 r21053  
    368368      << clrOutBOLD( clrCYAN,"[ClusterManager]" ) 
    369369      << " preDraw" << std::endl << vprDEBUG_FLUSH; 
    370  
     370   getNetwork()->corkNetwork(); 
    371371   for ( plugin_list_t::iterator itr = mPlugins.begin(); itr != mPlugins.end(); itr++ ) 
    372372   { 
     
    394394      << clrOutBOLD( clrCYAN,"[ClusterManager]" ) 
    395395      << " postPostFrame" << std::endl << vprDEBUG_FLUSH; 
    396  
     396   getNetwork()->corkNetwork(); 
    397397   for ( plugin_list_t::iterator itr = mPlugins.begin(); itr != mPlugins.end(); itr++ ) 
    398398   { 
  • juggler/trunk/modules/gadgeteer/cluster/Packets/ConfigPacket.cpp

    r20974 r21053  
    7272   mPacketWriter->setCurPos(0); 
    7373 
    74    // Serialize the header. 
    75    mHeader->serializeHeader(); 
    76     
    7774   mPacketWriter->writeString(mConfig); 
    7875   mPacketWriter->writeUint16(mType); 
     76 
     77   // Serialize the header. 
     78   mHeader->prependSerializedHeader(mPacketWriter); 
    7979} 
    8080 
  • juggler/trunk/modules/gadgeteer/cluster/Packets/DataPacket.cpp

    r20974 r21053  
    7777void DataPacket::serialize(vpr::SerializableObject& object) 
    7878{ 
    79    // Clear the data stream. 
     79   // Clear data stream since header is at beginning 
    8080   mPacketWriter->getData()->clear(); 
    81    mPacketWriter->setCurPos(0); 
     81   mPacketWriter->setCurPos( 0 ); 
    8282 
    83    // Serialize the header. 
    84    mHeader->serializeHeader(); 
    85     
    8683   // Serialize plugin GUID. 
    8784   mPluginId.writeObject(mPacketWriter); 
    88     
     85 
    8986   // Serialize device GUID. 
    9087   mObjectId.writeObject(mPacketWriter); 
    9188 
    9289   object.writeObject(mPacketWriter); 
     90 
     91   // Serialize the header. 
     92   mHeader->prependSerializedHeader(mPacketWriter); 
    9393} 
    9494 
  • juggler/trunk/modules/gadgeteer/cluster/Packets/DeviceAck.cpp

    r20974 r21053  
    8787void DeviceAck::serialize() 
    8888{ 
    89    // Clear the data stream. 
     89   // Clear data stream since header is at beginning 
    9090   mPacketWriter->getData()->clear(); 
    91    mPacketWriter->setCurPos(0); 
     91   mPacketWriter->setCurPos( 0 ); 
    9292 
    93    // Serialize the header. 
    94    mHeader->serializeHeader(); 
    95     
    9693   // Serialize plugin GUID 
    9794   mPluginId.writeObject(mPacketWriter); 
    98     
     95 
    9996   // Serialize Device GUID 
    10097   mId.writeObject(mPacketWriter); 
    101     
     98 
    10299   // Serialize the Device Name 
    103100   mPacketWriter->writeString(mDeviceName); 
    104     
     101 
    105102   // Serialize the Base Type of the acknowledged device 
    106103   mPacketWriter->writeString(mDeviceBaseType); 
     
    111108   // Serialize the Ack boolean 
    112109   mPacketWriter->writeBool(mAck); 
     110 
     111   // Serialize the header. 
     112   mHeader->prependSerializedHeader(mPacketWriter); 
    113113} 
    114114 
  • juggler/trunk/modules/gadgeteer/cluster/Packets/EndBlock.cpp

    r20974 r21053  
    7878void EndBlock::serialize() 
    7979{ 
    80    //mTempVar = Header::RIM_END_BLOCK; 
    81    // Clear the data stream. 
     80   // Clear data stream since header is at beginning 
    8281   mPacketWriter->getData()->clear(); 
    83    mPacketWriter->setCurPos(0); 
     82   mPacketWriter->setCurPos( 0 ); 
     83 
     84   // Serialize the Temp Var 
     85   mPacketWriter->writeUint16(mTempVar); 
    8486 
    8587   // Serialize the header. 
    86    mHeader->serializeHeader();       
    87     
    88    // Serialize the Temp Var 
    89    mPacketWriter->writeUint16(mTempVar); 
     88   mHeader->prependSerializedHeader(mPacketWriter); 
    9089} 
    9190 
  • juggler/trunk/modules/gadgeteer/cluster/Packets/Header.cpp

    r20974 r21053  
    5555} 
    5656 
    57 void Header::readData(vpr::SocketStream* stream
     57void Header::readData(vpr::SocketStream* stream, bool dumpHeader
    5858{ 
    5959   vprASSERT( NULL != stream && "Can not create a Header using a NULL SocketStream" ); 
     
    7373 
    7474   vpr::Uint32 bytes_read;    
    75     
     75   std::vector<vpr::Uint8> header_data;  
    7676   try 
    7777   { 
    78       bytes_read = stream->readn(mData, Header::RIM_PACKET_HEAD_SIZE); 
     78      bytes_read = stream->readn(header_data, Header::RIM_PACKET_HEAD_SIZE); 
    7979   } 
    8080   catch (vpr::IOException& ex) 
     
    8888 
    8989   vprASSERT( RIM_PACKET_HEAD_SIZE == bytes_read && "Header::Header() - Bytes read != RIM_PACKET_HEAD_SIZE" ); 
    90     
    91    parseHeader(); 
     90 
     91   parseHeader(header_data); 
     92 
     93   if(dumpHeader) 
     94   { 
     95      std::cout << "Dumping Header(" << header_data.size() << " bytes): "; 
     96      for ( std::vector<vpr::Uint8>::const_iterator i = header_data.begin(); 
     97            i != header_data.end(); i++ ) 
     98      { 
     99         std::cout << (int)*i << " "; 
     100      } 
     101      std::cout << std::endl; 
     102   } 
    92103} 
    93104 
    94 void Header::serializeHeader() 
    95 {   
    96    vpr::BufferObjectWriter writer(&mData); 
    97    writer.getData()->clear(); 
    98    writer.setCurPos( 0 ); 
     105void Header::prependSerializedHeader(vpr::BufferObjectWriter* packetWriter) 
     106
     107   // Set Packet length 
     108   setPacketLength( packetWriter->getData()->size() + RIM_PACKET_HEAD_SIZE ); 
     109 
     110   std::vector<vpr::Uint8> header_data; 
     111   vpr::BufferObjectWriter writer( &header_data ); 
    99112 
    100113   // Serialize all header data. 
     
    103116   writer.writeUint32( mFrame ); 
    104117   writer.writeUint32( mPacketLength ); 
     118 
     119   // Prepend header onto packets data 
     120   packetWriter->getData()->insert( packetWriter->getData()->begin(), 
     121                                   header_data.begin(), 
     122                                   header_data.end() ); 
     123 
     124   packetWriter->setCurPos( getPacketLength() ); 
     125 
    105126} 
    106127 
    107 void Header::parseHeader(
     128void Header::parseHeader(std::vector<vpr::Uint8>& headerData
    108129{ 
    109    vpr::BufferObjectReader reader( &mData ); 
     130   vpr::BufferObjectReader reader( &headerData ); 
    110131 
    111132   // Parse the incoming data. 
     
    125146      throw cluster::ClusterException( "Header::parseHeader() - Invalid packet header!" ); 
    126147   } 
    127 } 
    128  
    129 void Header::send(vpr::SocketStream* socket) const 
    130 { 
    131    vprASSERT( NULL != socket && "Socket is NULL" ); 
    132  
    133    // Send the header data. 
    134    try 
    135    { 
    136       socket->send(mData, RIM_PACKET_HEAD_SIZE); 
    137    } 
    138    catch (vpr::IOException& ex) 
    139    { 
    140       vprDEBUG( gadgetDBG_RIM, vprDBG_CRITICAL_LVL ) 
    141          << "Header::send() - Failed to send header." 
    142          << ex.what() 
    143          << std::endl << vprDEBUG_FLUSH; 
    144       throw ex; 
    145    } 
    146 } 
    147  
    148 void Header::dump() const 
    149 { 
    150    std::cout << "Dumping Header(" << mData.size() << " bytes): "; 
    151    for ( std::vector<vpr::Uint8>::const_iterator i = mData.begin(); 
    152          i != mData.end(); i++ ) 
    153    { 
    154       std::cout << (int)*i << " "; 
    155    } 
    156    std::cout << std::endl; 
    157148} 
    158149 
  • juggler/trunk/modules/gadgeteer/cluster/Packets/Header.h

    r20974 r21053  
    111111    *        to read from \p stream. 
    112112    */ 
    113    void readData(vpr::SocketStream* stream); 
     113   void readData(vpr::SocketStream* stream, bool dumpHeader=false); 
    114114 
    115    void serializeHeader(); 
    116  
    117    void parseHeader(); 
    118  
    119    /** 
    120     * Writes the packet header data to the given socket. 
    121     * 
    122     * @pre \p socket is not NULL. 
    123     * @post The contents of \c mData (which is \c RIM_PACKET_HEADER_SIZE bytes 
    124     *       long) are written to \p socket. 
    125     * 
    126     * @throw cluster::ClusterException is thrown if the packet header cannot 
    127     *        be written to \p socket. 
    128     */ 
    129    void send(vpr::SocketStream* socket) const; 
    130  
    131    void dump() const; 
     115   void prependSerializedHeader(vpr::BufferObjectWriter* writer); 
    132116 
    133117   vpr::Uint16 getRIMCode() const 
     
    158142   void printData( const int debug_level ) const; 
    159143protected: 
    160    std::vector<vpr::Uint8> mData
     144   void parseHeader(std::vector<vpr::Uint8>& headerData)
    161145 
    162146   vpr::Uint16 mRIMCode; 
  • juggler/trunk/modules/gadgeteer/cluster/Packets/Packet.cpp

    r20974 r21053  
    6262void Packet::dump() const 
    6363{ 
    64    if (NULL != mHeader.get()) 
    65    { 
    66       mHeader->dump(); 
    67    } 
    68    else 
    69    { 
    70       std::cout << "Could not dump Header since it is NULL!" << std::endl; 
    71    } 
    7264   std::cout << "Dumping Packet(" << mData.size() << " bytes): "; 
    7365   for ( std::vector<vpr::Uint8>::const_iterator i = mData.begin(); 
  • juggler/trunk/modules/gadgeteer/gadget/NetworkManager.cpp

    r20974 r21053  
    304304   setAllUpdated(false); 
    305305   size_t num_nodes = sendEndBlocks(temp); 
     306   uncorkNetwork(); 
    306307   updateAllNodes(num_nodes); 
    307308   vpr::prof::stop(); 
     
    401402 
    402403   vpr::prof::start("ClusterManager::updateAllNodes()",10); 
     404   std::vector<gadget::NodePtr> ready_nodes; 
    403405   while ( completed_nodes != numNodes ) 
    404406   { 
    405       std::vector<gadget::NodePtr> ready_nodes; 
    406  
    407407      try 
    408408      { 
     
    520520   // -Create a new Node using the given information 
    521521   // -Add the new node to the NetworkManager 
    522     
     522 
    523523   vprDEBUG( gadgetDBG_NET_MGR, vprDBG_CONFIG_LVL ) 
    524524      << clrOutBOLD(clrBLUE,"[NetworkManager]") 
     
    528528   NodePtr temp_node = Node::create(name, hostName, port, socketStream); 
    529529   mNodes.push_back( temp_node ); 
    530     
     530 
    531531   return true; 
    532532} 
     
    673673} 
    674674 
     675void NetworkManager::corkNetwork() 
     676{ 
     677   for (node_list_t::iterator itr = mNodes.begin(); itr != mNodes.end(); itr++) 
     678   { 
     679      (*itr)->setNoPush(true); 
     680      //(*itr)->setNoDelay(false); 
     681   } 
     682 
     683} 
     684 
     685void NetworkManager::uncorkNetwork() 
     686{ 
     687   vpr::prof::start("ClusterManager::uncorkNetwork()",10); 
     688   for (node_list_t::iterator itr = mNodes.begin(); itr != mNodes.end(); itr++) 
     689   { 
     690      (*itr)->setNoPush(false); 
     691      //(*itr)->setNoDelay(true); 
     692   } 
     693   vpr::prof::stop(); 
     694} 
     695 
     696 
    675697bool NetworkManager::connectTo(NodePtr node) 
    676698{ 
  • juggler/trunk/modules/gadgeteer/gadget/NetworkManager.h

    r20974 r21053  
    8484   void update( const int temp); 
    8585   void barrier( bool master ); 
     86    
     87   /** 
     88    * Optimize network traffic by gathering write calls. 
     89    * uncorkNetwork() must be called once write calls are finished. 
     90    */ 
     91   inline void corkNetwork(); 
     92 
     93   /** 
     94    * Flush pending writes to the network. 
     95    */ 
     96   inline void uncorkNetwork(); 
    8697 
    8798private: 
  • juggler/trunk/modules/gadgeteer/gadget/Node.cpp

    r20974 r21053  
    160160} 
    161161 
     162void Node::setNoPush(bool enable) 
     163{ 
     164  mSockStream->setNoPush(enable); 
     165} 
     166 
     167void Node::setNoDelay(bool enable) 
     168{ 
     169   mSockStream->setNoDelay(enable); 
     170} 
     171 
    162172bool Node::send(cluster::PacketPtr outPacket) 
    163173{ 
     
    166176   vpr::Guard<vpr::Mutex> guard(mSockWriteLock); 
    167177 
    168    cluster::HeaderPtr header = outPacket->getHeader(); 
    169  
    170    vprASSERT(NULL != header.get() && "Node::send() - Can't have a NULL header."); 
    171178   vprASSERT(NULL != mSockStream && "Node::send() - SocketStream can't be NULL"); 
    172179 
     
    175182   try 
    176183   { 
    177       header->send(mSockStream); 
     184      mSockStream->send( outPacket->getData(), 
     185                         outPacket->getHeader()->getPacketLength()); 
    178186   } 
    179187   catch (vpr::IOException&) 
    180188   { 
    181189      // TODO: setCause(ex) 
    182       throw cluster::ClusterException("Packet::recv() - Sending Header Data failed!"); 
    183    } 
    184  
    185    // Early out if we only have a header. 
    186    if(header->getPacketLength() == cluster::Header::RIM_PACKET_HEAD_SIZE) 
    187    { 
    188       return true; 
    189    } 
    190  
    191    try 
    192    { 
    193       mSockStream->send(outPacket->getData(), 
    194          header->getPacketLength() - cluster::Header::RIM_PACKET_HEAD_SIZE); 
    195    } 
    196    catch (vpr::IOException&) 
    197    { 
    198       // TODO: setCause(ex) 
    199       throw cluster::ClusterException("Packet::recv() - Sending data packet failed!!"); 
     190      throw cluster::ClusterException("Packet::send() - Sending Data failed!"); 
    200191   } 
    201192 
  • juggler/trunk/modules/gadgeteer/gadget/Node.h

    r20974 r21053  
    120120      return mHostname; 
    121121   } 
     122     
     123   void setNoDelay(bool enable); 
    122124 
    123125   /** 
     
    152154      return (CONNECTED == getStatus()); 
    153155   } 
     156 
     157    void setNoPush(bool enable); 
    154158 
    155159   /** 
  • juggler/trunk/modules/gadgeteer/gadget/Reactor.cpp

    r20228 r21053  
    7676   { 
    7777      ready_nodes.reserve(num_events); 
    78  
    79       for ( vpr::Uint16 i = 0; i < mSelector.getNumHandles(); ++i ) 
     78      vpr::Uint16 event_mask; 
     79      const vpr::Uint16 num_handles = mSelector.getNumHandles(); 
     80      for ( vpr::Uint16 i = 0; i < num_handles; ++i ) 
    8081      { 
    81          vpr::IOSys::Handle h         = mSelector.getHandle(i); 
    82          const vpr::Uint16 event_mask = mSelector.getOut(h); 
     82         vpr::IOSys::Handle h = mSelector.getHandle(i); 
     83         event_mask = mSelector.getOut(h); 
    8384 
    8485         if ( 0 != event_mask ) 
  • juggler/trunk/modules/gadgeteer/plugins/ApplicationDataManager/ApplicationDataServer.cpp

    r20974 r21053  
    6161   mDataPacket->serialize(*mApplicationData); 
    6262 
    63    // We must update the size of the actual data that we are going to send 
    64    mDataPacket->getHeader()->setPacketLength(Header::RIM_PACKET_HEAD_SIZE  
    65                                     + mDataPacket->getData().size()); 
    66  
    67    // We must serialize the header again so that we can reset the size. 
    68    mDataPacket->getHeader()->serializeHeader(); 
    69  
    7063   cluster::ClusterManager::instance()->getNetwork()->sendToAll(mDataPacket); 
    7164} 
  • juggler/trunk/modules/gadgeteer/plugins/RIMPlugin/DeviceServer.cpp

    r20974 r21053  
    8484 
    8585   mDataPacket->serialize(*mDevice); 
    86  
    87    // We must update the size of the actual data that we are going to send 
    88    mDataPacket->getHeader()->setPacketLength( 
    89       cluster::Header::RIM_PACKET_HEAD_SIZE 
    90          + mDataPacket->getData().size() 
    91    ); 
    92  
    93    // We must serialize the header again so that we can reset the size. 
    94    mDataPacket->getHeader()->serializeHeader(); 
    9586} 
    9687