root/juggler/branches/2.2/modules/gadgeteer/gadget/Acceptor.cpp

Revision 20876, 13.0 kB (checked in by patrick, 9 months ago)

Ensure that the socket is open before trying to close it when handling
errors.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1 /*************** <auto-copyright.pl BEGIN do not edit this line> **************
2  *
3  * VR Juggler is (C) Copyright 1998-2007 by Iowa State University
4  *
5  * Original Authors:
6  *   Allen Bierbaum, Christopher Just,
7  *   Patrick Hartling, Kevin Meinert,
8  *   Carolina Cruz-Neira, Albert Baker
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Library General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Library General Public License for more details.
19  *
20  * You should have received a copy of the GNU Library General Public
21  * License along with this library; if not, write to the
22  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23  * Boston, MA 02111-1307, USA.
24  *
25  *************** <auto-copyright.pl END do not edit this line> ***************/
26
27 #include <gadget/gadgetConfig.h>
28
29 #include <boost/bind.hpp>
30
31 #include <gadget/Acceptor.h>
32 #include <gadget/Node.h>
33 #include <gadget/Util/Debug.h>
34
35 #include <cluster/Packets/ConnectionAck.h>
36 #include <cluster/Packets/Packet.h>
37 #include <cluster/ClusterDelta.h>
38
39 #include <vpr/IO/Socket/SocketStream.h>
40
41
42 namespace gadget
43 {
44    Acceptor::Acceptor( AbstractNetworkManager* network)
45       : mNetworkManager( network ), mAcceptThread( NULL )
46    {;}
47
48    Acceptor::~Acceptor()
49    {
50       shutdown();
51    }
52
53    bool Acceptor::startListening( const int& listen_port, bool accept_anonymous )
54    {
55       mAcceptAnonymous = accept_anonymous;
56
57       // If we haven't already started the listening thread
58       if ( mAcceptThread == NULL )
59       {
60          // If the listen port is valid
61          if ( listen_port > 0 )
62          {
63             mListenAddr.setPort( listen_port );
64
65             vprDEBUG( gadgetDBG_NET_MGR, vprDBG_CONFIG_LVL )
66                << clrOutBOLD( clrMAGENTA, "[Acceptor]" )
67                << "Starting the listening thread...\n" << vprDEBUG_FLUSH;
68
69                // Start a thread to monitor port
70             try
71             {
72                mAcceptThread =
73                   new vpr::Thread(boost::bind(&Acceptor::acceptLoop, this));
74                vprASSERT(mAcceptThread->valid());
75
76                return true;
77             }
78             catch (vpr::Exception& ex)
79             {
80                vprDEBUG(gadgetDBG_NET_MGR, vprDBG_CRITICAL_LVL)
81                   << clrOutBOLD(clrRED, "ERROR")
82                   << ": Failed to spawn acceptor thread!\n" << vprDEBUG_FLUSH;
83                vprDEBUG_NEXT(gadgetDBG_NET_MGR, vprDBG_CRITICAL_LVL)
84                   << ex.what() << std::endl << vprDEBUG_FLUSH;
85
86                return false;
87             }
88          }
89          else
90          {
91             vprDEBUG( gadgetDBG_NET_MGR, vprDBG_CONFIG_LVL )
92                << clrOutBOLD( clrMAGENTA, "[Acceptor]" )
93                << "startListening() Can NOT listen on port: "
94                << listen_port << "\n" << vprDEBUG_FLUSH;
95             return false;
96          }
97       }
98       else
99       {
100          vprDEBUG( gadgetDBG_NET_MGR, vprDBG_CONFIG_LVL )
101                << clrOutBOLD( clrMAGENTA,"[Acceptor]" )
102                << "startListening() Listening thread already active."
103                << std::endl << vprDEBUG_FLUSH;
104          return false;
105       }
106    }
107
108    void Acceptor::acceptLoop()
109    {
110      
111       // - Create a socket to listen for incoming connections
112       // - Wait for new connections
113       //   - If successful
114       //     - Lock Node list
115       //       - If hostname does not exist in list
116       //         - Add new Node
117       //     - Unlock Node list
118       //         
119       //     - Lock pending list
120       //       - If hostname exists in pending list
121       //         - Remove Node from pending list
122       //     - Unlock pending list
123       //
124       //     - Lock Node list
125       //       - If Node is not connected
126       //         - Set socket stream
127       //         - Set NEWCONNECTION
128       //     - Unlock Node list
129       //
130       //     - Get Cluster Delta
131       //   - Else close and delete socket
132       
133       // Create a socket to listen for incoming connections
134       vpr::SocketStream sock( mListenAddr, vpr::InetAddr::AnyAddr );
135      
136       // Open in server mode.
137       try
138       {
139          // Open server enabling reuse of bound address.
140          sock.openServer(true);
141
142          vprDEBUG( gadgetDBG_NET_MGR, vprDBG_CONFIG_LVL )
143          << clrOutBOLD( clrMAGENTA, "[Acceptor]" )
144          << " Listening on Port: " << mListenAddr.getPort()
145          << std::endl << vprDEBUG_FLUSH;
146       }
147       catch (vpr::IOException&)
148       {
149          vprDEBUG( gadgetDBG_NET_MGR, vprDBG_CRITICAL_LVL )
150             << clrSetBOLD( clrRED ) << "[Acceptor]"
151             << " Unable to open listening socket on port: "
152             << mListenAddr.getPort() << std::endl
153             << clrRESET << vprDEBUG_FLUSH;
154
155          exit(0);
156       }
157
158       // Create a socketstream for new socket
159       vpr::SocketStream* client_sock = new vpr::SocketStream();
160
161       // Wait for an incoming connection.
162       bool mRunning = true;
163
164       while ( mRunning )
165       {
166          try
167          {
168             // Wait for a connection request.
169             sock.accept( *client_sock, vpr::Interval::NoTimeout );
170
171             vprDEBUG(gadgetDBG_NET_MGR, vprDBG_CONFIG_LVL)
172                << clrOutBOLD(clrMAGENTA, "[Acceptor]")
173                << " Receiving a connection on Port: "
174                << mListenAddr.getPort() << std::endl << vprDEBUG_FLUSH;
175
176             // Optimize new socket for low latency communication
177             client_sock->setNoDelay( true );
178
179             // Get the hostname and port of the requesting host.
180             const std::string remote_host_name(
181                client_sock->getRemoteAddr().getHostname()
182             );
183             vpr::Uint16 port = client_sock->getRemoteAddr().getPort();
184
185             vprDEBUG( gadgetDBG_NET_MGR, vprDBG_CONFIG_LVL )
186                << clrOutBOLD( clrMAGENTA, "[Acceptor]" )
187                << " Received from: " << remote_host_name
188                << ":" << port << std::endl << vprDEBUG_FLUSH;
189
190             Node* remote_node = mNetworkManager->getNodeByHostname( remote_host_name );
191
192             if ( NULL == remote_node )
193             {
194                vprDEBUG( gadgetDBG_NET_MGR,vprDBG_VERB_LVL )
195                   << clrOutBOLD( clrMAGENTA,"[Acceptor]" )
196                   << " Nodes not configured yet."
197                   << std::endl << vprDEBUG_FLUSH;
198                
199                client_sock->close();
200                delete client_sock;
201                client_sock = new vpr::SocketStream;
202             }
203             else
204             {
205                // Get address information about local host.
206                const vpr::InetAddr local = vpr::InetAddr::getLocalHost();
207                const std::string local_hostname = local.getHostname();
208                cluster::ConnectionAck* temp = NULL;
209
210                if( remote_node->getStatus() == Node::CONNECTED ||
211                    remote_node->getStatus() == Node::NEWCONNECTION )
212                {
213                   vprDEBUG( gadgetDBG_NET_MGR,vprDBG_STATE_LVL )
214                      << clrOutBOLD( clrMAGENTA, "[Acceptor]" )
215                      << " Node is already connected, no need to respond to request."
216                      << std::endl << vprDEBUG_FLUSH;
217
218                   temp = new cluster::ConnectionAck( local_hostname, mListenAddr.getPort(), false );
219                }
220                else if ( Node::PENDING == remote_node->getStatus() )
221                {
222                   vprASSERT( NULL != remote_node && "Remote node must nut be equal to NULL" );
223                
224                   vprDEBUG( gadgetDBG_NET_MGR, vprDBG_STATE_LVL )
225                      << clrOutBOLD( clrMAGENTA, "[Acceptor]")
226                      << " Pending Node exists, we must decide which"
227                      << " side should initiate the connection."
228                      << std::endl << vprDEBUG_FLUSH;
229                  
230                   // Get address values to test.
231                   vpr::Uint32 remote_value = client_sock->getRemoteAddr().getAddressValue();
232                   vpr::Uint32 local_value = local.getAddressValue();
233                  
234                   vprDEBUG( gadgetDBG_NET_MGR, vprDBG_STATE_LVL )
235                      << "Remote: " << remote_value
236                      << std::endl << vprDEBUG_FLUSH;
237                  
238                   vprDEBUG( gadgetDBG_NET_MGR, vprDBG_STATE_LVL )
239                      << "Local: " << local_value
240                      << std::endl << vprDEBUG_FLUSH;
241
242                   // If Node has an address value less than mine
243                   if ( remote_value < local_value )
244                   {
245                      vprDEBUG( gadgetDBG_NET_MGR, vprDBG_STATE_LVL )
246                         << "Result: (remote address < local address)"
247                         << " Create NACK" << std::endl << vprDEBUG_FLUSH;
248                      
249                      // Create NACK
250                      temp = new cluster::ConnectionAck( local_hostname, mListenAddr.getPort(), false );
251                   }
252                   else
253                   {
254                      vprDEBUG( gadgetDBG_NET_MGR, vprDBG_STATE_LVL )
255                         << "Result: (remote address >= local address)"
256                         << "Create ACK" << std::endl << vprDEBUG_FLUSH;
257
258                      temp = new cluster::ConnectionAck( local_hostname, mListenAddr.getPort(), true );
259                  }
260                }
261                else
262                {
263                   vprDEBUG( gadgetDBG_NET_MGR, vprDBG_STATE_LVL )
264                      << clrOutBOLD( clrMAGENTA, "[Acceptor]" )
265                      << " Node is not pending, create ACK."
266                      << std::endl << vprDEBUG_FLUSH;
267                      
268                   // Create an ACK since we do not depend on it
269                   temp = new cluster::ConnectionAck( local_hostname, mListenAddr.getPort(), true );
270                }
271                  
272                vprDEBUG( gadgetDBG_NET_MGR, vprDBG_STATE_LVL )
273                   << clrOutBOLD( clrMAGENTA,"[Acceptor]" )
274                   << " Set SockStream and send responce."
275                   << std::endl << vprDEBUG_FLUSH;
276                
277                vpr::SocketStream* old_stream = remote_node->getSockStream();
278                remote_node->setSockStream( client_sock );
279                remote_node->send( temp );
280                remote_node->setSockStream( old_stream );
281
282                if ( temp->getAck() )
283                {
284                   vprDEBUG( gadgetDBG_NET_MGR,vprDBG_STATE_LVL )
285                      << clrOutBOLD( clrMAGENTA,"[Acceptor]" )
286                      << " Set new Node as a NEWCONNECTION."
287                      << std::endl << vprDEBUG_FLUSH;
288
289                   remote_node->setSockStream( client_sock );
290                  
291                   // Since we have just recieved a new connection,
292                   // set the connected status as so. We are not
293                   // in a fully connected state until the begining
294                   // of the next frame in Acceptor::updateNewConnetions()
295                   // this is becuase we only want to start using a
296                   // new connection at the start of a new frame.
297                   remote_node->setStatus( Node::NEWCONNECTION );
298                   // Print the new state information about this node.
299                   remote_node->debugDump( vprDBG_CONFIG_LVL );
300                  
301                   // XXX: We need to fix this in the near future.
302                   //ClusterDelta cluster_delta;
303                   //cluster_delta.clientClusterDelta(requesting_node->getSockStream());
304                }
305                else
306                {
307                   vprDEBUG( gadgetDBG_NET_MGR,vprDBG_STATE_LVL )
308                      << clrOutBOLD( clrMAGENTA,"[Acceptor]" )
309                      << " Delete unused client sock."
310                      << std::endl << vprDEBUG_FLUSH;
311                  
312                   client_sock->close();
313                   remote_node->setSockStream( NULL );
314                   delete client_sock;
315                }
316
317                // We need to create a new SocketStream since the to
318                // hold the value of the next recieved socketstream since
319                // the old one is now being used by the new Node
320                client_sock = new vpr::SocketStream;
321             }
322          }
323          catch (vpr::IOException&)
324          {
325             if ( client_sock->isOpen() )
326             {
327                client_sock->close();
328             }
329
330             delete client_sock;
331             client_sock = new vpr::SocketStream;
332          }
333       }   // end infinite while
334    }
335
336    void Acceptor::shutdown()
337    {
338       // Kill thread used to listen for incoming
339       // connection requests
340
341       // TODO: Make this actually shutdown the Accepting thread, this will require
342       //       non blocking accept calls.
343       if ( mAcceptThread )
344       {
345          mAcceptThread->kill();
346          delete mAcceptThread;
347          mAcceptThread = NULL;
348       }
349    }
350 } // end namespace gadget
351
Note: See TracBrowser for help on using the browser.