root/juggler/branches/2.2/modules/gadgeteer/gadget/AbstractNetworkManager.cpp

Revision 20317, 19.1 kB (checked in by aronb, 1 year ago)

MF20 r20316: Hack to ensure that we don't start listening for connections until we have fully configured all other nodes.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1 /*************** <auto-copyright.pl BEGIN do not edit this line> **************
2  *
3  * VR Juggler is (C) Copyright 1998-2007 by Iowa State University
4  *
5  * Original Authors:
6  *   Allen Bierbaum, Christopher Just,
7  *   Patrick Hartling, Kevin Meinert,
8  *   Carolina Cruz-Neira, Albert Baker
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Library General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Library General Public License for more details.
19  *
20  * You should have received a copy of the GNU Library General Public
21  * License along with this library; if not, write to the
22  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23  * Boston, MA 02111-1307, USA.
24  *
25  *************** <auto-copyright.pl END do not edit this line> ***************/
26
27 #include <gadget/gadgetConfig.h>
28
29 #include <algorithm>
30 #include <iomanip>
31
32 #include <vpr/IO/Socket/InetAddr.h>
33
34 #include <cluster/Packets/Header.h>
35 #include <cluster/Packets/Packet.h>
36 #include <cluster/Packets/PacketFactory.h>
37
38 #include <gadget/Node.h>
39 #include <gadget/PacketHandler.h>
40 #include <gadget/Util/Debug.h>
41
42 #include <jccl/Config/ConfigElement.h>
43 #include <jccl/RTRC/ConfigManager.h>
44
45 #include <gadget/AbstractNetworkManager.h>
46
47
48 namespace gadget
49 {
50    AbstractNetworkManager::AbstractNetworkManager()
51       : mNodes(0), mHandlerMap()
52    {;}
53
54    AbstractNetworkManager::~AbstractNetworkManager()
55    {
56       shutdown();
57    }
58
59    /**
60     * Determine if the given hostname matches the local machine's hostname.
61     */
62    bool AbstractNetworkManager::isLocalHost(const std::string& testHostName)
63    {
64       // Resolve the address of the hostname given to test.
65       vpr::InetAddr remote_address;
66       remote_address.setAddress(testHostName, 0);
67
68       vprDEBUG(gadgetDBG_NET_MGR,vprDBG_HVERB_LVL)
69          << "+======= Resolved addresses for given hostname ======+"
70          << std::endl << vprDEBUG_FLUSH;
71
72       std::string remote_hostname;
73       try
74       {
75          remote_hostname = remote_address.getHostname();
76       }
77       catch(std::exception&)
78       {
79          remote_hostname = "Error resolving remote hostname";
80       }
81      
82       vprDEBUG( gadgetDBG_NET_MGR, vprDBG_HVERB_LVL )
83             << "| Name: " << std::setw(25) << remote_hostname << " | "
84             << std::setw(16) << remote_address.getAddressString() << " | "
85             << std::endl << vprDEBUG_FLUSH;
86
87       // Try to get all local interfaces.
88       //   - Fall back on local host
89       std::vector<vpr::InetAddr> local_interfaces;
90       try
91       {
92          local_interfaces = vpr::InetAddr::getAllLocalAddrs();
93       }
94       catch (vpr::Exception& ex)
95       {
96          vprDEBUG(gadgetDBG_NET_MGR, vprDBG_WARNING_LVL)
97             << clrOutBOLD(clrYELLOW, "WARNING: ")
98             << "Failed to get list of local interfaces: "
99             << ex.getExtendedDescription()
100             << std::endl << vprDEBUG_FLUSH;
101          try
102          {
103             vpr::InetAddr local_host = vpr::InetAddr::getLocalHost();
104             local_interfaces.push_back(local_host);
105          }
106          catch (vpr::Exception& ex)
107          {
108             vprDEBUG(gadgetDBG_NET_MGR, vprDBG_WARNING_LVL)
109                << clrOutBOLD(clrYELLOW, "WARNING: ")
110                << "Failed to get local host: "
111                << ex.getExtendedDescription()
112                << std::endl << vprDEBUG_FLUSH;
113          }
114       }
115
116       // Print debug information about all local interfaces.
117       vprDEBUG( gadgetDBG_NET_MGR, vprDBG_HVERB_LVL )
118          << "+================= Local interfaces =================+"
119          << std::endl << vprDEBUG_FLUSH;
120       for (std::vector<vpr::InetAddr>::iterator itr = local_interfaces.begin() ; itr != local_interfaces.end() ; ++itr)
121       {
122          std::string temp_hostname;
123          try
124          {
125             temp_hostname = (*itr).getHostname();
126          }
127          catch(std::exception&)
128          {
129             temp_hostname = "Error getting hostname";
130          }
131          vprDEBUG( gadgetDBG_NET_MGR, vprDBG_HVERB_LVL )
132             << "| Name: " << std::setw(25) << temp_hostname << " | "
133             << std::setw(16) << (*itr).getAddressString() << " | "
134             << std::endl << vprDEBUG_FLUSH;
135       }
136
137       vprDEBUG( gadgetDBG_NET_MGR, vprDBG_HVERB_LVL )
138          << "+====================================================+"
139          << std::endl << vprDEBUG_FLUSH;
140
141       bool result(false);
142       for (std::vector<vpr::InetAddr>::iterator itr = local_interfaces.begin() ; itr != local_interfaces.end() ; ++itr)
143       {
144          if ((*itr).getAddressValue() == remote_address.getAddressValue())
145          {
146             vprDEBUG( gadgetDBG_NET_MGR, vprDBG_HVERB_LVL )
147                << "| We have a match.                  "
148                << std::setw(16) << (*itr).getAddressString() << " |"
149                << std::endl << vprDEBUG_FLUSH;
150             result = true;
151          }
152          else
153          {
154             vprDEBUG( gadgetDBG_NET_MGR, vprDBG_HVERB_LVL )
155                << "| NO match.                         "
156                << std::setw(16) << (*itr).getAddressString() << " |"
157                << std::endl << vprDEBUG_FLUSH;
158          }
159       }
160       vprDEBUG( gadgetDBG_NET_MGR, vprDBG_HVERB_LVL )
161          << "+====================================================+"
162          << std::endl << vprDEBUG_FLUSH;
163
164       return result;
165    }
166
167
168    void AbstractNetworkManager::handlePacket(cluster::Packet* packet, Node* node)
169    {
170       // If the ClusterManager should handle this packet, then do so.
171       if (packet->getPacketType() == cluster::Header::RIM_END_BLOCK)
172       {
173          // -Set New State
174          if (node == NULL)
175          {
176             return;
177          }
178
179          node->setUpdated( true );
180          return;
181       }
182       else if (packet->getPacketType() == cluster::Header::RIM_CONNECTION_REQ ||
183                packet->getPacketType() == cluster::Header::RIM_CONNECTION_ACK)
184       {
185          //handleLocalPacket(packet, node);
186         
187          vprDEBUG( gadgetDBG_NET_MGR, vprDBG_CRITICAL_LVL )
188             << clrOutBOLD( clrRED, "[ERROR] " )
189             << "RIM_CONNECTION_REQ & RIM_CONNECTION_ACK data packet types are depreciated."
190             << std::endl << vprDEBUG_FLUSH;
191          return;
192       }
193
194       vpr::GUID handler_guid = packet->getPluginId();
195
196       PacketHandler* temp_handler = getHandlerByGUID( handler_guid );
197
198       if (NULL != temp_handler)
199       {
200          vprDEBUG( gadgetDBG_NET_MGR, vprDBG_HVERB_LVL )
201             << clrOutBOLD(clrMAGENTA,"[AbstractNetworkManager]")
202             << " Handler \"" << temp_handler->getHandlerName() << "\" will handle this packet."
203             << std::endl << vprDEBUG_FLUSH;
204
205
206          temp_handler->handlePacket( packet, node );
207       }
208       else
209       {
210          vprDEBUG( gadgetDBG_NET_MGR, vprDBG_CONFIG_LVL )
211             << clrOutBOLD( clrMAGENTA, "[AbstractNetworkManager]" )
212             << " Handler " << handler_guid.toString() << " does not exist to handle this packet."
213             << std::endl << vprDEBUG_FLUSH;
214       }
215    }
216
217    bool AbstractNetworkManager::addNode(const std::string& name,
218                                         const std::string& host_name,
219                                         const vpr::Uint16& port,
220                                         vpr::SocketStream* socketStream)
221    {
222       // -Create a new Node using the given information
223       // -Add the new node to the AbstractNetworkManager
224       
225       vprDEBUG( gadgetDBG_NET_MGR, vprDBG_CONFIG_LVL )
226          << clrOutBOLD(clrMAGENTA,"[AbstractNetworkManager]")
227          << " Adding node: " << name
228          << std::endl << vprDEBUG_FLUSH;
229
230       Node* temp_node = new Node(name, host_name, port, socketStream, this);
231       mNodes.push_back( temp_node );
232      
233       return true;
234    }
235
236    void AbstractNetworkManager::addNode(Node* node)
237    {
238       // -Add the given node to the AbstractNetworkManager
239       
240       vprDEBUG( gadgetDBG_NET_MGR, vprDBG_CONFIG_LVL )
241          << clrOutBOLD( clrMAGENTA, "[AbstractNetworkManager]" )
242          << " Adding node: " << node->getName()
243          << std::endl << vprDEBUG_FLUSH;
244
245       mNodes.push_back( node );
246    }
247
248    struct HostnamePred
249    {
250       HostnamePred(const std::string& nodeHostname)
251          : mHostname(nodeHostname)
252       {
253          /* Do nothing. */ ;
254       }
255
256       bool operator()(gadget::Node* n)
257       {
258          return n->getHostname() == mHostname;
259       }
260
261       const std::string& mHostname;
262    };
263
264    void AbstractNetworkManager::removeNode(const std::string& nodeHostname)
265    {
266       vprDEBUG( gadgetDBG_NET_MGR, vprDBG_CONFIG_LVL )
267          << clrOutBOLD( clrMAGENTA, "[AbstractNetworkManager]" )
268          << " Removing node: " << nodeHostname
269          << std::endl << vprDEBUG_FLUSH;
270
271       typedef std::vector<gadget::Node*>::iterator iter_t;
272       HostnamePred pred(nodeHostname);
273       iter_t n = std::find_if(mNodes.begin(), mNodes.end(), pred);
274
275       if ( n != mNodes.end() )
276       {
277          mReactor.removeNode(*n);
278          mNodes.erase(n);
279       }
280    }
281    
282    Node* AbstractNetworkManager::getNodeByHostname(const std::string& host_name)
283    {
284       vpr::DebugOutputGuard dbg_output( gadgetDBG_NET_MGR, vprDBG_VERB_LVL,
285          std::string("-------- getNodeByHostname() --------\n"),
286          std::string("--------------------------------------------\n"));
287
288       vpr::InetAddr searching_for_node;
289       searching_for_node.setAddress( host_name, 0 );
290
291       vprDEBUG( gadgetDBG_NET_MGR, vprDBG_VERB_LVL )
292          << clrOutBOLD( clrMAGENTA, "[AbstractNetworkManager]" )
293          << " Looking for Node with hostname: " << host_name
294          << std::endl << vprDEBUG_FLUSH;
295  
296          
297       // -Find Node with given hostname and return a pointer to it.
298       // -If we do not find one, return NULL
299       for (std::vector<Node*>::iterator i = mNodes.begin();
300            i != mNodes.end() ; i++)
301       {
302          vpr::InetAddr testing_node;
303          testing_node.setAddress( (*i)->getHostname(), 0 );
304          
305          vprDEBUG( gadgetDBG_NET_MGR, vprDBG_VERB_LVL )
306             << clrOutBOLD( clrMAGENTA, "[AbstractNetworkManager]" )
307             << " Searcing for: " << searching_for_node.getAddressString()
308             << " Testing: " << testing_node.getAddressString()
309             << std::endl << vprDEBUG_FLUSH;
310
311          if (searching_for_node.getAddressString() == testing_node.getAddressString())
312          {
313             return(*i);
314          }
315       }
316
317       return(NULL);
318    }
319
320    Node* AbstractNetworkManager::getNodeByName(const std::string& node_name)
321    {
322       // -Find Node with given name and return a pointer to it.
323       // -If we do not find one, return NULL
324       for (std::vector<Node*>::iterator i = mNodes.begin();
325            i != mNodes.end() ; i++)
326       {
327          if ((*i)->getName() == node_name)
328          {
329             return *i;
330          }
331       }
332
333       return NULL;
334    }
335
336    vpr::Uint16 AbstractNetworkManager::getNumPendingNodes()
337    {
338       int num_pending = 0;
339      
340       for (std::vector<Node*>::iterator i = mNodes.begin();
341            i != mNodes.end() ; i++)
342       {
343          if (Node::PENDING == (*i)->getStatus())
344          {
345             ++num_pending;
346          }
347       }
348
349       return num_pending;
350    }
351
352    bool AbstractNetworkManager::attemptPendingNodes()
353    {
354       bool ret_val = false;
355      
356       for (std::vector<Node*>::iterator i = mNodes.begin();
357            i != mNodes.end() ; i++)
358       {
359          if ( Node::PENDING == (*i)->getStatus() )
360          {
361             if ( attemptConnect(*i) )
362             {
363                // If any of the nodes were successful connecting
364                // then we should return true
365                ret_val = true;
366             }
367          }
368       }
369
370       return ret_val;
371    }
372
373    void AbstractNetworkManager::shutdown()
374    {
375       for (std::vector<Node*>::iterator j = mNodes.begin(); j != mNodes.end(); j++)
376       {
377           (*j)->shutdown();
378       }
379    }
380
381
382    void AbstractNetworkManager::debugDumpNodes(int debug_level)
383    {
384       vpr::DebugOutputGuard dbg_output(gadgetDBG_NET_MGR,debug_level,
385          std::string("-------------- Cluster Network --------------\n"),
386          std::string("---------------------------------------------\n"));
387       for(std::vector<Node*>::iterator j = mNodes.begin(); j != mNodes.end(); j++)
388       {
389          (*j)->debugDump( debug_level );
390       }
391    }
392    
393    bool AbstractNetworkManager::recognizeClusterMachineConfig(jccl::ConfigElementPtr element)
394    {
395       return (element->getID() == getClusterNodeElementType());
396    }
397
398    bool AbstractNetworkManager::configCanHandle(jccl::ConfigElementPtr element)
399    {
400        return recognizeClusterMachineConfig(element);
401    }
402
403    struct ElementPred
404    {
405       ElementPred(const std::string& type)
406          : mType(type)
407       {
408          /* Do nothing. */ ;
409       }
410
411       bool operator()(const jccl::ConfigManager::PendingElement& p)
412       {
413          return p.mElement->getID() == mType;
414       }
415
416       const std::string mType;
417    };
418
419    bool AbstractNetworkManager::configAdd(jccl::ConfigElementPtr element)
420    {
421       if (recognizeClusterMachineConfig(element))
422       {
423          // -If local machine element
424          //   -Add machine specific ConfigElements to the pending list.
425          //   -Start Listening thread
426          // -Else
427          //   -Add Node to AbstractNetworkManager
428
429          if (isLocalHost( element->getProperty<std::string>( "host_name" ) ))
430          {
431             // XXX: Hack to ensure that we don't start listening for connections until
432             //      we have fully configured all other nodes.
433             ElementPred pred(getClusterNodeElementType());
434
435             jccl::ConfigManager* cfg_mgr = jccl::ConfigManager::instance();
436             unsigned int num_pending_nodes =
437                std::count_if(cfg_mgr->getPendingBegin(), cfg_mgr->getPendingEnd(), pred);
438
439             if (num_pending_nodes > 1)
440             {
441                vprDEBUG( gadgetDBG_NET_MGR, vprDBG_CONFIG_LVL ) << clrSetBOLD(clrRED)
442                   << clrOutBOLD( clrMAGENTA,"[AbstractNetworkManager]" )
443                   << " Some nodes not configured yet: " << num_pending_nodes
444                   << clrRESET << std::endl << vprDEBUG_FLUSH;
445                return false;
446             }
447
448             // NOTE: Add all machine dependent ConfigElementPtr's here
449             vprASSERT( element->getNum("display_system") == 1
450                && "A Cluster System element must have exactly 1 display_system element" );
451
452             std::vector<jccl::ConfigElementPtr> cluster_node_elements =
453                element->getChildElements();
454
455             for (std::vector<jccl::ConfigElementPtr>::iterator i = cluster_node_elements.begin();
456                  i != cluster_node_elements.end();
457                  ++i)
458             {
459                jccl::ConfigManager::instance()->addConfigElement(*i, jccl::ConfigManager::PendingElement::ADD);
460
461                vprDEBUG( gadgetDBG_NET_MGR, vprDBG_CONFIG_LVL ) << clrSetBOLD(clrCYAN)
462                   << clrOutBOLD( clrMAGENTA,"[AbstractNetworkManager]" )
463                   << " Adding Machine specific ConfigElement: "
464                   << (*i)->getName() << clrRESET << std::endl << vprDEBUG_FLUSH;
465             }
466
467             const int listen_port = element->getProperty<int>( "listen_port" );
468             startListening( listen_port, false );
469          }
470          else
471          {
472             vprDEBUG( gadgetDBG_NET_MGR, vprDBG_CONFIG_LVL )
473                << clrOutBOLD( clrMAGENTA, "[AbstractNetworkManager]" )
474                << " Adding Node: " << element->getName()
475                << " to the Cluster Network\n" << vprDEBUG_FLUSH;
476
477             std::string    name        = element->getName();
478             std::string    host_name   = element->getProperty<std::string>( "host_name" );
479             vpr::Uint16    listen_port = element->getProperty<int>( "listen_port" );
480
481             addNode(name, host_name, listen_port);
482          }
483          return true;
484       }
485       return false;
486    }
487
488    bool AbstractNetworkManager::configRemove(jccl::ConfigElementPtr element)
489    {
490       if (recognizeClusterMachineConfig( element ))
491       {
492          vprDEBUG( gadgetDBG_NET_MGR, vprDBG_CONFIG_LVL )
493             << clrOutBOLD( clrMAGENTA, "[AbstractNetworkManager]" )
494             << " Removing the Node: " << element->getName()
495             << " from the Cluster Network\n" << vprDEBUG_FLUSH;
496          return true;
497       }
498       else
499       {
500          vprDEBUG( gadgetDBG_NET_MGR, vprDBG_CONFIG_LVL )
501             << clrOutBOLD( clrMAGENTA, "[AbstractNetworkManager]" )
502             << " ERROR, Something is seriously wrong, we should never get here\n"
503             << vprDEBUG_FLUSH;
504          return false;
505       }
506    }
507
508    std::string AbstractNetworkManager::getClusterNodeElementType()
509    {
510       return "cluster_node";
511    }
512
513    void AbstractNetworkManager::updateNewConnections()
514    {
515       // This fuction is needed in order to gurantee that all cluster
516       // nodes become active at the same time, the beginning of the frame
517       //
518       // -Loop over all Nodes checking for new connections
519       // -If Node is a new connection
520       //   - Set it to connected
521
522       bool new_connection = false;
523      
524       for (std::vector<Node*>::iterator i = mNodes.begin() ;
525             i != mNodes.end() ; i++)
526       {
527          if ((*i)->getStatus() == Node::NEWCONNECTION)
528          {
529             vprDEBUG( gadgetDBG_NET_MGR, 0 )
530                << clrOutBOLD( clrMAGENTA, "[AbstractNetworkManager]" )
531                << " Node: " << (*i)->getName()
532                << " is now CONNECTED."
533                << std::endl << vprDEBUG_FLUSH;
534
535             new_connection = true;
536             (*i)->setStatus( Node::CONNECTED );
537             mReactor.addNode(*i);
538          }
539       }
540
541       // If we have pending nodes, attempt to connect to them.
542       if (getNumPendingNodes() > 0)
543       {
544          attemptPendingNodes();
545       }
546      
547       if (new_connection)
548       {
549          // - If the pending list is stale
550          //   - Refresh the list
551          if (jccl::ConfigManager::instance()->isPendingStale())
552          {
553             vprDEBUG( gadgetDBG_NET_MGR, vprDBG_STATE_LVL )
554                << clrOutBOLD( clrMAGENTA, "[AbstractNetworkManager]" )
555                << " New connections were made, so refresh the pending list."
556                << std::endl << vprDEBUG_FLUSH;
557             jccl::ConfigManager::instance()->refreshPendingList();
558          }
559
560          debugDumpNodes(0);
561       }
562    }
563    
564    PacketHandler* AbstractNetworkManager::getHandlerByGUID(const vpr::GUID& handler_guid)
565    {
566       std::map<vpr::GUID, PacketHandler*>::const_iterator i = mHandlerMap.find( handler_guid );
567       if (i != mHandlerMap.end())
568       {
569          return ((*i).second);
570       }
571       return NULL;
572    }
573    
574    /**
575     * Adds a new plugin to the ClusterManager.
576     */
577    void AbstractNetworkManager::addHandler(PacketHandler* new_handler)
578    {
579          std::pair<vpr::GUID, PacketHandler*> p
580             = std::make_pair( new_handler->getHandlerGUID(), new_handler );
581          mHandlerMap.insert( p );
582
583          vprDEBUG( gadgetDBG_NET_MGR, vprDBG_CONFIG_LVL )
584             << clrOutBOLD( clrMAGENTA, "[Reactor] " )
585             << "Adding Handler: " << new_handler->getHandlerName() << std::endl << vprDEBUG_FLUSH;
586    }
587 } // end namespace gadget
588
Note: See TracBrowser for help on using the browser.