root/juggler/branches/2.2/modules/gadgeteer/gadget/Node.cpp

Revision 20200, 14.3 kB (checked in by aronb, 1 year ago)

Fix a few memory leaks.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1 /*************** <auto-copyright.pl BEGIN do not edit this line> **************
2  *
3  * VR Juggler is (C) Copyright 1998-2007 by Iowa State University
4  *
5  * Original Authors:
6  *   Allen Bierbaum, Christopher Just,
7  *   Patrick Hartling, Kevin Meinert,
8  *   Carolina Cruz-Neira, Albert Baker
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Library General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Library General Public License for more details.
19  *
20  * You should have received a copy of the GNU Library General Public
21  * License along with this library; if not, write to the
22  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23  * Boston, MA 02111-1307, USA.
24  *
25  *************** <auto-copyright.pl END do not edit this line> ***************/
26
27 #include <gadget/gadgetConfig.h>
28
29 #include <boost/bind.hpp>
30
31 //#include <vpr/vpr.h>
32 #include <vpr/IO/Socket/SocketStream.h>
33 //#include <jccl/Config/ConfigElement.h>
34 //#include <vpr/Util/Error.h>
35
36 #include <gadget/Node.h>
37 #include <gadget/AbstractNetworkManager.h>
38 #include <cluster/Packets/Packet.h>
39 #include <cluster/Packets/ConnectionAck.h>
40 #include <cluster/Packets/DataPacket.h>
41 #include <cluster/Packets/PacketFactory.h>
42 #include <cluster/ClusterManager.h>
43 #include <cluster/ClusterDelta.h>
44
45 #include <gadget/Util/Debug.h>
46 #include <gadget/InputManager.h>
47
48 #include <jccl/RTRC/ConfigManager.h>
49
50 namespace gadget
51 {
52
53 Node::Node(const std::string& name, const std::string& host_name,
54            const vpr::Uint16 port, vpr::SocketStream* socket_stream,
55            AbstractNetworkManager* net_mgr)
56    : mName(name)
57    , mHostname(host_name)
58    , mPort(port)
59    , mSockStream(socket_stream)
60    , mStatus(DISCONNECTED)
61    , mUpdated(false)
62    , mNetworkManager(net_mgr)
63 {
64    vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL)
65       << clrOutBOLD(clrBLUE,"[Node]")
66       << " Created a Node: " << name << " - " << host_name
67       << std::endl << vprDEBUG_FLUSH;
68 }
69
70 Node::~Node()
71 {
72    shutdown();
73 }
74
75 void Node::shutdown()
76 {
77    setStatus(DISCONNECTED);
78
79    if (NULL != mSockStream)
80    {
81       /*
82       if(mSockStream->isOpen())
83       {
84          mSockStream->close();
85       }
86       delete mSockStream;
87       mSockStream = NULL;
88       */
89    }
90 }
91
92 void Node::debugDump(int debug_level)
93 {
94
95    vpr::DebugOutputGuard dbg_output(gadgetDBG_NET_MGR, debug_level,
96                               std::string("-------------- Node --------------\n"),
97                               std::string("-----------------------------------------\n"));
98
99    vprDEBUG(gadgetDBG_NET_MGR, debug_level) << "Node Name: "
100       << mName << std::endl << vprDEBUG_FLUSH;
101    vprDEBUG(gadgetDBG_NET_MGR, debug_level) << "Hostname:  "
102       << mHostname << std::endl << vprDEBUG_FLUSH;
103    vprDEBUG(gadgetDBG_NET_MGR, debug_level) << "Port:      "
104       << mPort << std::endl << vprDEBUG_FLUSH;
105    vprDEBUG(gadgetDBG_NET_MGR, debug_level) << "SockStream "
106       << (NULL == mSockStream ? "is NULL" : "is NOT NULL") << std::endl << vprDEBUG_FLUSH;
107    if (CONNECTED == getStatus())
108    {
109       vprDEBUG(gadgetDBG_NET_MGR, debug_level) << clrOutBOLD(clrGREEN,"CONNECTED") << std::endl << vprDEBUG_FLUSH;
110    }
111    else if (NEWCONNECTION == getStatus())
112    {
113       vprDEBUG(gadgetDBG_NET_MGR, debug_level) << clrOutBOLD(clrRED,"NEW CONNECTION") << std::endl << vprDEBUG_FLUSH;
114    }
115    else
116    {
117       vprDEBUG(gadgetDBG_NET_MGR, debug_level) << clrOutBOLD(clrRED,"DISCONNECTED") << std::endl << vprDEBUG_FLUSH;
118    }
119 }
120
121 void Node::printStats(int debug_level)
122 {
123    vpr::BaseIOStatsStrategy* stats = mSockStream->getIOStatStrategy();
124    vpr::BandwidthIOStatsStrategy* bw_interface = dynamic_cast<vpr::BandwidthIOStatsStrategy*>(stats );
125
126    if(bw_interface != NULL)
127    {
128       // Dump out write stats
129       vprDEBUG(gadgetDBG_RIM,debug_level) << "Socket Write bandwidth stats ---" << std::endl << vprDEBUG_FLUSH;
130       vprDEBUG(gadgetDBG_RIM,debug_level) << "stats type: " << typeid(stats).name() << std::endl << vprDEBUG_FLUSH;
131       vprDEBUG(gadgetDBG_RIM,debug_level) << "      sent bytes: " << bw_interface->writeStats().getTotal() << std::endl << vprDEBUG_FLUSH;
132       vprDEBUG(gadgetDBG_RIM,debug_level) << "         av send: " << bw_interface->writeStats().getMean()/1024.0f << " k/s" << std::endl << vprDEBUG_FLUSH;
133       vprDEBUG(gadgetDBG_RIM,debug_level) << "        STA send: " << bw_interface->writeStats().getSTA()/1024.0f << " k/s" << std::endl << vprDEBUG_FLUSH;
134       vprDEBUG(gadgetDBG_RIM,debug_level) << "       Inst send: " << bw_interface->writeStats().getInstAverage()/1024.0f << " k/s" << std::endl << vprDEBUG_FLUSH;
135       vprDEBUG(gadgetDBG_RIM,debug_level) << "    Max STA send: " << bw_interface->writeStats().getMaxSTA()/1024.0f << " k/s" << std::endl << std::endl << vprDEBUG_FLUSH;
136
137       vprDEBUG(gadgetDBG_RIM,debug_level) << "      read bytes: " << bw_interface->readStats().getTotal() << std::endl << vprDEBUG_FLUSH;
138       vprDEBUG(gadgetDBG_RIM,debug_level) << "         av read: " << bw_interface->readStats().getMean()/1024.0f << " k/s" << std::endl << vprDEBUG_FLUSH;
139       vprDEBUG(gadgetDBG_RIM,debug_level) << "        STA read: " << bw_interface->readStats().getSTA()/1024.0f << " k/s" << std::endl << vprDEBUG_FLUSH;
140       vprDEBUG(gadgetDBG_RIM,debug_level) << "       Inst read: " << bw_interface->readStats().getInstAverage()/1024.0f << " k/s" << std::endl << vprDEBUG_FLUSH;
141       vprDEBUG(gadgetDBG_RIM,debug_level) << "    Max STA read: " << bw_interface->readStats().getMaxSTA()/1024.0f << " k/s" << std::endl << vprDEBUG_FLUSH;
142
143    }
144    else
145    {
146       vprDEBUG(gadgetDBG_RIM,debug_level) << "SocketBWTest: Don't have BW Stats on stats. type is: " << typeid(stats).name() << std::endl << vprDEBUG_FLUSH;
147    }
148
149 }
150
151 void Node::setStatus(int connect)
152 {
153    vpr::Guard<vpr::Mutex> guard(mStatusLock);
154
155    if (mStatus == CONNECTED && connect == DISCONNECTED)
156    {
157       //TODO: ADD This back in SOON
158       //ClusterManager::instance()->recoverFromLostNode(this);
159    }
160
161    mStatus = connect;
162 }
163
164 void Node::doUpdate()
165 {
166    // - If connected() && !updated()
167    //   - try recvPacket()
168    //     - Catch ClusterException
169    //       - set not connected
170    //       - add node to connection pending list
171    //       - set reconfig needed on reconnect
172    //     - If no Exception
173    //       - Print Packet Data
174    //       - Take the action of the packet
175
176    vprASSERT(isConnected() && "Node is not connected, we can not update!\nWe must not be calling update from the correct location.");
177    cluster::Packet* temp_packet = NULL;
178
179    temp_packet = recvPacket();
180
181    // Print Packet Information
182    temp_packet->printData(vprDBG_CONFIG_LVL);
183
184    // Handle the packet correctly
185    mNetworkManager->handlePacket(temp_packet,this);
186
187    // Clean up after ourselves
188    delete temp_packet;
189 }
190
191 void Node::update()
192 {
193    mUpdated = false;
194
195    while ( ! mUpdated )
196    {
197       try
198       {
199          doUpdate();
200       }
201       catch (cluster::ClusterException& cluster_exception)
202       {
203          vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL) << clrSetBOLD(clrRED)
204             << cluster_exception.what() << clrRESET
205             << std::endl << vprDEBUG_FLUSH;
206          
207          vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL)
208             << "Node::update() We have lost our connection to: " << getName()
209             << ":" << getPort() << std::endl << vprDEBUG_FLUSH;
210
211          debugDump(vprDBG_CONFIG_LVL);
212          
213          // Set the Node as disconnected since we have lost the connection
214          setStatus(DISCONNECTED);
215       }
216    }
217 }
218
219 bool Node::send(cluster::Packet* out_packet)
220 {
221    vprASSERT(NULL != out_packet && "Can not send a NULL packet.");
222
223    vpr::Guard<vpr::Mutex> guard(mSockWriteLock);
224
225    // -Send header data
226    // -Send packet data
227
228    if (mSockStream == NULL)
229    {
230       vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL)
231          << clrOutBOLD(clrRED, "ERROR:")
232          << " SocketSteam is NULL" << std::endl << vprDEBUG_FLUSH;
233       throw cluster::ClusterException("Node::send() - SocketStream is NULL!");
234    }
235
236    cluster::Header* mHeader = out_packet->getHeader();
237
238    if (mHeader == NULL)
239    {
240       vprDEBUG(gadgetDBG_RIM,vprDBG_CONFIG_LVL)
241          << clrOutBOLD(clrRED, "ERROR:")
242          << " Packet Header is NULL" << std::endl << vprDEBUG_FLUSH;
243       throw cluster::ClusterException("Node::send() - Packet Header is NULL!");
244    }
245
246    try
247    {
248       mHeader->send(mSockStream);
249    }
250    catch (vpr::IOException&)
251    {
252       // TODO: setCause(ex)
253       throw cluster::ClusterException("Packet::recv() - Sending Header Data failed!");
254    }
255
256    if(mHeader->getPacketLength() == cluster::Header::RIM_PACKET_HEAD_SIZE)
257    {
258       return true;
259    }
260
261    // If we have a data packet we need to also send the raw data
262    if (out_packet->getPacketType() != cluster::Header::RIM_DATA_PACKET)
263    {
264       std::vector<vpr::Uint8>* packet_data = out_packet->getData();
265
266       try
267       {
268          mSockStream->send(*packet_data,
269             mHeader->getPacketLength() - cluster::Header::RIM_PACKET_HEAD_SIZE);
270       }
271       catch (vpr::IOException&)
272       {
273          // TODO: setCause(ex)
274          throw cluster::ClusterException("Packet::recv() - Sending data packet failed!!");
275       }
276    }
277    else
278    {
279       std::vector<vpr::Uint8>* packet_data = out_packet->getData();
280
281       // Since we are sending a DataPacket we are not actually sending all data here. We are only sending 2 GUIDs here
282       int size = 32;
283
284       try
285       {
286          mSockStream->send(*packet_data, size);
287       }
288       catch (vpr::IOException&)
289       {
290          // TODO: setCause(ex)
291          throw cluster::ClusterException("Packet::recv() - Sending packet failed!!");
292       }
293
294
295       cluster::DataPacket* temp_data_packet = dynamic_cast<cluster::DataPacket*>(out_packet);
296       vprASSERT(NULL != temp_data_packet && "Dynamic cast failed!");
297
298       // Testing GUIDs
299       /*vpr::BufferObjectReader* testing = new vpr::BufferObjectReader(packet_data);
300       vpr::GUID test;
301       test.readObject(testing);
302       vpr::GUID test2;
303       test2.readObject(testing);
304
305       std::cout << "1: " << test.toString() << " 2: " << test2.toString() << std::endl;
306
307       delete testing;
308
309       // Testing ID
310       testing = new vpr::BufferObjectReader(temp_data_packet->getDeviceData());
311       std::cout << "ID: " << (int)testing->readUint16() << std::endl;
312
313       delete testing;
314       */
315
316       try
317       {
318          mSockStream->send(*(temp_data_packet->getDeviceData()),
319                            temp_data_packet->getDeviceData()->size());
320       }
321       catch (vpr::IOException&)
322       {
323          // TODO: setCause(ex)
324          throw cluster::ClusterException("Packet::recv() - Sending Packet Data failed!!");
325       }
326    }
327    return true;
328 }
329
330 cluster::Packet* Node::recvPacket()
331 {
332    // - Read in header
333    // - Get Constructor for correct PacketType
334    // - Call constructor
335    // - Read in Packet data
336    // - Parse data into new packet
337    // - Return finished packet
338
339    vpr::Guard<vpr::Mutex> guard(mSockReadLock);
340
341    cluster::Header* packet_head = new cluster::Header();
342
343    try
344    {
345       packet_head->readData(mSockStream);
346    }
347    catch (vpr::IOException& ex)
348    {
349       vprDEBUG( gadgetDBG_RIM, vprDBG_HVERB_LVL )
350          << clrOutNORM(clrRED, "ERROR: ")
351          << "Node::recvPacket() Could not read the header from the socket." << std::endl
352          << ex.what() << std::endl << vprDEBUG_FLUSH;
353       throw ex;
354    }
355
356    vprDEBUG( gadgetDBG_RIM, vprDBG_HVERB_LVL )
357       << "Node::recvPacket() PacketFactory is trying to make a packet type: "
358       << packet_head->getPacketType()
359       << std::endl << vprDEBUG_FLUSH;
360
361    // Get Packet from factory
362    cluster::Packet* new_packet =
363       cluster::PacketFactory::instance()->createObject( packet_head->getPacketType() );
364
365    // Verify that the packet has been made
366    if ( NULL == new_packet )
367    {
368       throw cluster::ClusterException( "Node::recvPacket() - Packet was not found in Factory." );
369    }
370
371    // - Recv the packet data
372    //   - Copy over pointer to header
373    //   - Continue reading packet from socket
374
375    // Set the header for the new packet.
376    new_packet->setHeader( packet_head );
377    // Allocate memory for incoming packet.
378    std::vector<vpr::Uint8> incoming_data;
379
380    // Make sure that we are connected.
381    if ( NULL == mSockStream )
382    {
383       vprDEBUG( gadgetDBG_RIM, vprDBG_CRITICAL_LVL )
384          << clrOutBOLD( clrRED, "ERROR:" )
385          << " mSockSteam is NULL" <<  std::endl << vprDEBUG_FLUSH;
386       throw cluster::ClusterException( "Node::recvPacket::recv() - mSocketStream is NULL!" );
387    }
388    //else if (!mSockStream->isConnected())
389    //{
390    //   vprDEBUG( gadgetDBG_RIM, vprDBG_CRITICAL_LVL )
391    //      << clrOutBOLD( clrRED, "ERROR:" )
392    //      << " mSockSteam is not connected."
393    //      << std::endl << vprDEBUG_FLUSH;
394    //   throw cluster::ClusterException( "Node::recvPacket::recv() - ClusterNode is not connected!" );
395    //}
396    else
397    {
398       try
399       {
400          // Get packet data.
401          mSockStream->recvn(
402             incoming_data,
403             packet_head->getPacketLength() - cluster::Header::RIM_PACKET_HEAD_SIZE
404          );
405       }
406       catch (vpr::IOException&)
407       {
408          vprDEBUG( gadgetDBG_RIM, vprDBG_CONFIG_LVL )
409             << clrOutBOLD( clrRED, "ERROR:" )
410             << " Reading packet data failed. Expecting: "
411             << packet_head->getPacketLength() - cluster::Header::RIM_PACKET_HEAD_SIZE
412             << " bytes" << std::endl << vprDEBUG_FLUSH;
413
414          // TODO: setCause(ex)
415          throw cluster::ClusterException( "Node::recvPacket() - Reading packet data failed!" );
416       }
417    }
418
419    vpr::BufferObjectReader* reader = new vpr::BufferObjectReader( &incoming_data );
420
421    // Parse Packet with new data
422    new_packet->parse( reader );
423
424    delete reader;
425
426    //NOTE: incoming_data goes out of scope here which means that we are left with only the data that we parsed.
427    //TODO: We could save memory by not parsing the raw DataPacket but just passing the location of the memory that we want to use.
428
429    //parse_data_length = DataPacket::ParsedDataLength
430    //recvn(incoming_parse_data, ...)
431    //reader = new reader(incoming_parse_data);
432    //new_packet->parse(reader);
433    //recvn(incoming_raw_data, ...)
434    //new_packet->setRawData(incoming_raw_data);
435
436    return new_packet;
437 }
438
439 // end namespace gadget
440
Note: See TracBrowser for help on using the browser.