Changeset 21053
- Timestamp:
- 02/25/08 09:10:07 (8 months ago)
- Files:
-
- juggler/trunk/modules/gadgeteer/cluster/ClusterManager.cpp (modified) (2 diffs)
- juggler/trunk/modules/gadgeteer/cluster/Packets/ConfigPacket.cpp (modified) (1 diff)
- juggler/trunk/modules/gadgeteer/cluster/Packets/DataPacket.cpp (modified) (1 diff)
- juggler/trunk/modules/gadgeteer/cluster/Packets/DeviceAck.cpp (modified) (2 diffs)
- juggler/trunk/modules/gadgeteer/cluster/Packets/EndBlock.cpp (modified) (1 diff)
- juggler/trunk/modules/gadgeteer/cluster/Packets/Header.cpp (modified) (5 diffs)
- juggler/trunk/modules/gadgeteer/cluster/Packets/Header.h (modified) (2 diffs)
- juggler/trunk/modules/gadgeteer/cluster/Packets/Packet.cpp (modified) (1 diff)
- juggler/trunk/modules/gadgeteer/gadget/NetworkManager.cpp (modified) (5 diffs)
- juggler/trunk/modules/gadgeteer/gadget/NetworkManager.h (modified) (1 diff)
- juggler/trunk/modules/gadgeteer/gadget/Node.cpp (modified) (3 diffs)
- juggler/trunk/modules/gadgeteer/gadget/Node.h (modified) (2 diffs)
- juggler/trunk/modules/gadgeteer/gadget/Reactor.cpp (modified) (1 diff)
- juggler/trunk/modules/gadgeteer/plugins/ApplicationDataManager/ApplicationDataServer.cpp (modified) (1 diff)
- juggler/trunk/modules/gadgeteer/plugins/RIMPlugin/DeviceServer.cpp (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
juggler/trunk/modules/gadgeteer/cluster/ClusterManager.cpp
r20974 r21053 368 368 << clrOutBOLD( clrCYAN,"[ClusterManager]" ) 369 369 << " preDraw" << std::endl << vprDEBUG_FLUSH; 370 370 getNetwork()->corkNetwork(); 371 371 for ( plugin_list_t::iterator itr = mPlugins.begin(); itr != mPlugins.end(); itr++ ) 372 372 { … … 394 394 << clrOutBOLD( clrCYAN,"[ClusterManager]" ) 395 395 << " postPostFrame" << std::endl << vprDEBUG_FLUSH; 396 396 getNetwork()->corkNetwork(); 397 397 for ( plugin_list_t::iterator itr = mPlugins.begin(); itr != mPlugins.end(); itr++ ) 398 398 { juggler/trunk/modules/gadgeteer/cluster/Packets/ConfigPacket.cpp
r20974 r21053 72 72 mPacketWriter->setCurPos(0); 73 73 74 // Serialize the header.75 mHeader->serializeHeader();76 77 74 mPacketWriter->writeString(mConfig); 78 75 mPacketWriter->writeUint16(mType); 76 77 // Serialize the header. 78 mHeader->prependSerializedHeader(mPacketWriter); 79 79 } 80 80 juggler/trunk/modules/gadgeteer/cluster/Packets/DataPacket.cpp
r20974 r21053 77 77 void DataPacket::serialize(vpr::SerializableObject& object) 78 78 { 79 // Clear the data stream.79 // Clear data stream since header is at beginning 80 80 mPacketWriter->getData()->clear(); 81 mPacketWriter->setCurPos( 0);81 mPacketWriter->setCurPos( 0 ); 82 82 83 // Serialize the header.84 mHeader->serializeHeader();85 86 83 // Serialize plugin GUID. 87 84 mPluginId.writeObject(mPacketWriter); 88 85 89 86 // Serialize device GUID. 90 87 mObjectId.writeObject(mPacketWriter); 91 88 92 89 object.writeObject(mPacketWriter); 90 91 // Serialize the header. 92 mHeader->prependSerializedHeader(mPacketWriter); 93 93 } 94 94 juggler/trunk/modules/gadgeteer/cluster/Packets/DeviceAck.cpp
r20974 r21053 87 87 void DeviceAck::serialize() 88 88 { 89 // Clear the data stream.89 // Clear data stream since header is at beginning 90 90 mPacketWriter->getData()->clear(); 91 mPacketWriter->setCurPos( 0);91 mPacketWriter->setCurPos( 0 ); 92 92 93 // Serialize the header.94 mHeader->serializeHeader();95 96 93 // Serialize plugin GUID 97 94 mPluginId.writeObject(mPacketWriter); 98 95 99 96 // Serialize Device GUID 100 97 mId.writeObject(mPacketWriter); 101 98 102 99 // Serialize the Device Name 103 100 mPacketWriter->writeString(mDeviceName); 104 101 105 102 // Serialize the Base Type of the acknowledged device 106 103 mPacketWriter->writeString(mDeviceBaseType); … … 111 108 // Serialize the Ack boolean 112 109 mPacketWriter->writeBool(mAck); 110 111 // Serialize the header. 112 mHeader->prependSerializedHeader(mPacketWriter); 113 113 } 114 114 juggler/trunk/modules/gadgeteer/cluster/Packets/EndBlock.cpp
r20974 r21053 78 78 void EndBlock::serialize() 79 79 { 80 //mTempVar = Header::RIM_END_BLOCK; 81 // Clear the data stream. 80 // Clear data stream since header is at beginning 82 81 mPacketWriter->getData()->clear(); 83 mPacketWriter->setCurPos(0); 82 mPacketWriter->setCurPos( 0 ); 83 84 // Serialize the Temp Var 85 mPacketWriter->writeUint16(mTempVar); 84 86 85 87 // Serialize the header. 86 mHeader->serializeHeader(); 87 88 // Serialize the Temp Var 89 mPacketWriter->writeUint16(mTempVar); 88 mHeader->prependSerializedHeader(mPacketWriter); 90 89 } 91 90 juggler/trunk/modules/gadgeteer/cluster/Packets/Header.cpp
r20974 r21053 55 55 } 56 56 57 void Header::readData(vpr::SocketStream* stream )57 void Header::readData(vpr::SocketStream* stream, bool dumpHeader) 58 58 { 59 59 vprASSERT( NULL != stream && "Can not create a Header using a NULL SocketStream" ); … … 73 73 74 74 vpr::Uint32 bytes_read; 75 75 std::vector<vpr::Uint8> header_data; 76 76 try 77 77 { 78 bytes_read = stream->readn( mData, Header::RIM_PACKET_HEAD_SIZE);78 bytes_read = stream->readn(header_data, Header::RIM_PACKET_HEAD_SIZE); 79 79 } 80 80 catch (vpr::IOException& ex) … … 88 88 89 89 vprASSERT( RIM_PACKET_HEAD_SIZE == bytes_read && "Header::Header() - Bytes read != RIM_PACKET_HEAD_SIZE" ); 90 91 parseHeader(); 90 91 parseHeader(header_data); 92 93 if(dumpHeader) 94 { 95 std::cout << "Dumping Header(" << header_data.size() << " bytes): "; 96 for ( std::vector<vpr::Uint8>::const_iterator i = header_data.begin(); 97 i != header_data.end(); i++ ) 98 { 99 std::cout << (int)*i << " "; 100 } 101 std::cout << std::endl; 102 } 92 103 } 93 104 94 void Header::serializeHeader() 95 { 96 vpr::BufferObjectWriter writer(&mData); 97 writer.getData()->clear(); 98 writer.setCurPos( 0 ); 105 void Header::prependSerializedHeader(vpr::BufferObjectWriter* packetWriter) 106 { 107 // Set Packet length 108 setPacketLength( packetWriter->getData()->size() + RIM_PACKET_HEAD_SIZE ); 109 110 std::vector<vpr::Uint8> header_data; 111 vpr::BufferObjectWriter writer( &header_data ); 99 112 100 113 // Serialize all header data. … … 103 116 writer.writeUint32( mFrame ); 104 117 writer.writeUint32( mPacketLength ); 118 119 // Prepend header onto packets data 120 packetWriter->getData()->insert( packetWriter->getData()->begin(), 121 header_data.begin(), 122 header_data.end() ); 123 124 packetWriter->setCurPos( getPacketLength() ); 125 105 126 } 106 127 107 void Header::parseHeader( )128 void Header::parseHeader(std::vector<vpr::Uint8>& headerData) 108 129 { 109 vpr::BufferObjectReader reader( & mData );130 vpr::BufferObjectReader reader( &headerData ); 110 131 111 132 // Parse the incoming data. … … 125 146 throw cluster::ClusterException( "Header::parseHeader() - Invalid packet header!" ); 126 147 } 127 }128 129 void Header::send(vpr::SocketStream* socket) const130 {131 vprASSERT( NULL != socket && "Socket is NULL" );132 133 // Send the header data.134 try135 {136 socket->send(mData, RIM_PACKET_HEAD_SIZE);137 }138 catch (vpr::IOException& ex)139 {140 vprDEBUG( gadgetDBG_RIM, vprDBG_CRITICAL_LVL )141 << "Header::send() - Failed to send header."142 << ex.what()143 << std::endl << vprDEBUG_FLUSH;144 throw ex;145 }146 }147 148 void Header::dump() const149 {150 std::cout << "Dumping Header(" << mData.size() << " bytes): ";151 for ( std::vector<vpr::Uint8>::const_iterator i = mData.begin();152 i != mData.end(); i++ )153 {154 std::cout << (int)*i << " ";155 }156 std::cout << std::endl;157 148 } 158 149 juggler/trunk/modules/gadgeteer/cluster/Packets/Header.h
r20974 r21053 111 111 * to read from \p stream. 112 112 */ 113 void readData(vpr::SocketStream* stream );113 void readData(vpr::SocketStream* stream, bool dumpHeader=false); 114 114 115 void serializeHeader(); 116 117 void parseHeader(); 118 119 /** 120 * Writes the packet header data to the given socket. 121 * 122 * @pre \p socket is not NULL. 123 * @post The contents of \c mData (which is \c RIM_PACKET_HEADER_SIZE bytes 124 * long) are written to \p socket. 125 * 126 * @throw cluster::ClusterException is thrown if the packet header cannot 127 * be written to \p socket. 128 */ 129 void send(vpr::SocketStream* socket) const; 130 131 void dump() const; 115 void prependSerializedHeader(vpr::BufferObjectWriter* writer); 132 116 133 117 vpr::Uint16 getRIMCode() const … … 158 142 void printData( const int debug_level ) const; 159 143 protected: 160 std::vector<vpr::Uint8> mData;144 void parseHeader(std::vector<vpr::Uint8>& headerData); 161 145 162 146 vpr::Uint16 mRIMCode; juggler/trunk/modules/gadgeteer/cluster/Packets/Packet.cpp
r20974 r21053 62 62 void Packet::dump() const 63 63 { 64 if (NULL != mHeader.get())65 {66 mHeader->dump();67 }68 else69 {70 std::cout << "Could not dump Header since it is NULL!" << std::endl;71 }72 64 std::cout << "Dumping Packet(" << mData.size() << " bytes): "; 73 65 for ( std::vector<vpr::Uint8>::const_iterator i = mData.begin(); juggler/trunk/modules/gadgeteer/gadget/NetworkManager.cpp
r20974 r21053 304 304 setAllUpdated(false); 305 305 size_t num_nodes = sendEndBlocks(temp); 306 uncorkNetwork(); 306 307 updateAllNodes(num_nodes); 307 308 vpr::prof::stop(); … … 401 402 402 403 vpr::prof::start("ClusterManager::updateAllNodes()",10); 404 std::vector<gadget::NodePtr> ready_nodes; 403 405 while ( completed_nodes != numNodes ) 404 406 { 405 std::vector<gadget::NodePtr> ready_nodes;406 407 407 try 408 408 { … … 520 520 // -Create a new Node using the given information 521 521 // -Add the new node to the NetworkManager 522 522 523 523 vprDEBUG( gadgetDBG_NET_MGR, vprDBG_CONFIG_LVL ) 524 524 << clrOutBOLD(clrBLUE,"[NetworkManager]") … … 528 528 NodePtr temp_node = Node::create(name, hostName, port, socketStream); 529 529 mNodes.push_back( temp_node ); 530 530 531 531 return true; 532 532 } … … 673 673 } 674 674 675 void NetworkManager::corkNetwork() 676 { 677 for (node_list_t::iterator itr = mNodes.begin(); itr != mNodes.end(); itr++) 678 { 679 (*itr)->setNoPush(true); 680 //(*itr)->setNoDelay(false); 681 } 682 683 } 684 685 void NetworkManager::uncorkNetwork() 686 { 687 vpr::prof::start("ClusterManager::uncorkNetwork()",10); 688 for (node_list_t::iterator itr = mNodes.begin(); itr != mNodes.end(); itr++) 689 { 690 (*itr)->setNoPush(false); 691 //(*itr)->setNoDelay(true); 692 } 693 vpr::prof::stop(); 694 } 695 696 675 697 bool NetworkManager::connectTo(NodePtr node) 676 698 { juggler/trunk/modules/gadgeteer/gadget/NetworkManager.h
r20974 r21053 84 84 void update( const int temp); 85 85 void barrier( bool master ); 86 87 /** 88 * Optimize network traffic by gathering write calls. 89 * uncorkNetwork() must be called once write calls are finished. 90 */ 91 inline void corkNetwork(); 92 93 /** 94 * Flush pending writes to the network. 95 */ 96 inline void uncorkNetwork(); 86 97 87 98 private: juggler/trunk/modules/gadgeteer/gadget/Node.cpp
r20974 r21053 160 160 } 161 161 162 void Node::setNoPush(bool enable) 163 { 164 mSockStream->setNoPush(enable); 165 } 166 167 void Node::setNoDelay(bool enable) 168 { 169 mSockStream->setNoDelay(enable); 170 } 171 162 172 bool Node::send(cluster::PacketPtr outPacket) 163 173 { … … 166 176 vpr::Guard<vpr::Mutex> guard(mSockWriteLock); 167 177 168 cluster::HeaderPtr header = outPacket->getHeader();169 170 vprASSERT(NULL != header.get() && "Node::send() - Can't have a NULL header.");171 178 vprASSERT(NULL != mSockStream && "Node::send() - SocketStream can't be NULL"); 172 179 … … 175 182 try 176 183 { 177 header->send(mSockStream); 184 mSockStream->send( outPacket->getData(), 185 outPacket->getHeader()->getPacketLength()); 178 186 } 179 187 catch (vpr::IOException&) 180 188 { 181 189 // TODO: setCause(ex) 182 throw cluster::ClusterException("Packet::recv() - Sending Header Data failed!"); 183 } 184 185 // Early out if we only have a header. 186 if(header->getPacketLength() == cluster::Header::RIM_PACKET_HEAD_SIZE) 187 { 188 return true; 189 } 190 191 try 192 { 193 mSockStream->send(outPacket->getData(), 194 header->getPacketLength() - cluster::Header::RIM_PACKET_HEAD_SIZE); 195 } 196 catch (vpr::IOException&) 197 { 198 // TODO: setCause(ex) 199 throw cluster::ClusterException("Packet::recv() - Sending data packet failed!!"); 190 throw cluster::ClusterException("Packet::send() - Sending Data failed!"); 200 191 } 201 192 juggler/trunk/modules/gadgeteer/gadget/Node.h
r20974 r21053 120 120 return mHostname; 121 121 } 122 123 void setNoDelay(bool enable); 122 124 123 125 /** … … 152 154 return (CONNECTED == getStatus()); 153 155 } 156 157 void setNoPush(bool enable); 154 158 155 159 /** juggler/trunk/modules/gadgeteer/gadget/Reactor.cpp
r20228 r21053 76 76 { 77 77 ready_nodes.reserve(num_events); 78 79 for ( vpr::Uint16 i = 0; i < mSelector.getNumHandles(); ++i ) 78 vpr::Uint16 event_mask; 79 const vpr::Uint16 num_handles = mSelector.getNumHandles(); 80 for ( vpr::Uint16 i = 0; i < num_handles; ++i ) 80 81 { 81 vpr::IOSys::Handle h = mSelector.getHandle(i);82 const vpr::Uint16event_mask = mSelector.getOut(h);82 vpr::IOSys::Handle h = mSelector.getHandle(i); 83 event_mask = mSelector.getOut(h); 83 84 84 85 if ( 0 != event_mask ) juggler/trunk/modules/gadgeteer/plugins/ApplicationDataManager/ApplicationDataServer.cpp
r20974 r21053 61 61 mDataPacket->serialize(*mApplicationData); 62 62 63 // We must update the size of the actual data that we are going to send64 mDataPacket->getHeader()->setPacketLength(Header::RIM_PACKET_HEAD_SIZE65 + mDataPacket->getData().size());66 67 // We must serialize the header again so that we can reset the size.68 mDataPacket->getHeader()->serializeHeader();69 70 63 cluster::ClusterManager::instance()->getNetwork()->sendToAll(mDataPacket); 71 64 } juggler/trunk/modules/gadgeteer/plugins/RIMPlugin/DeviceServer.cpp
r20974 r21053 84 84 85 85 mDataPacket->serialize(*mDevice); 86 87 // We must update the size of the actual data that we are going to send88 mDataPacket->getHeader()->setPacketLength(89 cluster::Header::RIM_PACKET_HEAD_SIZE90 + mDataPacket->getData().size()91 );92 93 // We must serialize the header again so that we can reset the size.94 mDataPacket->getHeader()->serializeHeader();95 86 } 96 87
