root/juggler/branches/2.2/modules/gadgeteer/cluster/ClusterDelta.cpp

Revision 19847, 7.6 kB (checked in by patrick, 2 years ago)

Updated for VPR 1.1.42.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1 /*************** <auto-copyright.pl BEGIN do not edit this line> **************
2  *
3  * VR Juggler is (C) Copyright 1998-2007 by Iowa State University
4  *
5  * Original Authors:
6  *   Allen Bierbaum, Christopher Just,
7  *   Patrick Hartling, Kevin Meinert,
8  *   Carolina Cruz-Neira, Albert Baker
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Library General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Library General Public License for more details.
19  *
20  * You should have received a copy of the GNU Library General Public
21  * License along with this library; if not, write to the
22  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23  * Boston, MA 02111-1307, USA.
24  *
25  *************** <auto-copyright.pl END do not edit this line> ***************/
26
27 #include <gadget/gadgetConfig.h>
28 #include <gadget/Util/Debug.h>
29
30 #include <vector>
31
32 #include <boost/static_assert.hpp>
33 #include <vpr/IO/Socket/SocketStream.h>
34 #include <vpr/IO/BufferObjectWriter.h>
35 #include <vpr/IO/BufferObjectReader.h>
36 #include <cluster/ClusterDelta.h>
37 #include <gmtl/Math.h>
38
39
40 namespace cluster
41 {
42
43    ClusterDelta::ClusterDelta() : syncPacket(12)
44    {
45       mTol = 2;
46       mAccept = false;
47       mReader = new vpr::BufferObjectReader(&syncPacket);
48    }
49
50    void ClusterDelta::clientClusterDelta(vpr::SocketStream* socket_stream)
51    {
52       mSocketStream = socket_stream;
53       while ( mAccept == false )
54       {
55          clearIntervals();
56          receiveHandshake();
57          createHandshake();
58          receiveExpectedTime();
59          createResponce();
60       }
61    }
62    vpr::Interval ClusterDelta::getClusterDelta(vpr::SocketStream* socket_stream)
63    {
64       mSocketStream = socket_stream;
65       while ( mAccept == false )
66       {
67          clearIntervals();
68          createHandshake();
69          receiveHandshake();
70          createExpectedTime();
71          receiveResponce();
72       }
73       return(mDelta);
74    }
75
76    // Sent on initital handshake
77    void ClusterDelta::createHandshake()
78    {
79       mLocalSendTime.setNow();
80          // If this is not the first handshake, then actually send the last receive time.
81       //if ( mLocalReceiveTime.getBaseVal() != 0 )
82       //{
83       //   mWriter.writeUint64(mLocalReceiveTime.getBaseVal());
84       //               vprDEBUG(gadgetDBG_RIM,vprDBG_VERB_LVL) << "[SYNC] Send Handshake: " << mLocalReceiveTime.getBaseVal() << "\n" << vprDEBUG_FLUSH;
85       //}
86       //else
87       //{
88          mWriter.writeUint64(mLocalSendTime.getBaseVal());
89                      vprDEBUG(gadgetDBG_RIM,vprDBG_VERB_LVL) << "[SYNC] Send Handshake: " << mLocalSendTime.getBaseVal() << "\n" << vprDEBUG_FLUSH;
90       //}
91       sendAndClear();
92    }
93    void ClusterDelta::receiveHandshake()
94    {
95       getPacket(8);
96       mLocalReceiveTime.setNow();
97       vpr::Uint64 temp =  mReader->readUint64();
98                      vprDEBUG(gadgetDBG_RIM,vprDBG_VERB_LVL) << "[SYNC] Received Time: " << temp << "\n" << vprDEBUG_FLUSH;
99       mRemoteSendTime.set(temp, vpr::Interval::Base);
100                      vprDEBUG(gadgetDBG_RIM,vprDBG_VERB_LVL) << "[SYNC] Received Time: " << mRemoteSendTime.getBaseVal() << "\n" << vprDEBUG_FLUSH;
101    }
102    void ClusterDelta::createExpectedTime()
103    {
104                      vprDEBUG(gadgetDBG_RIM,vprDBG_VERB_LVL) << "[SYNC] Calculate Expected Time: " << mRemoteSendTime.getBaseVal() << "\n" << vprDEBUG_FLUSH;
105       mLatencyTime.set( (mLocalReceiveTime.getBaseVal()-mLocalSendTime.getBaseVal())/2, vpr::Interval::Base);
106                      vprDEBUG(gadgetDBG_RIM,vprDBG_VERB_LVL) << "[SYNC]     Latency: " << mLatencyTime.getBaseVal() << "\n" << vprDEBUG_FLUSH;
107
108       mDelta=(mRemoteSendTime-mLocalSendTime-mLatencyTime);
109
110       std::cout << "   " << mRemoteSendTime.getBaseVal() << std::endl;
111       std::cout << " - " << mRemoteSendTime.getBaseVal() << std::endl;
112       std::cout << " - " << mRemoteSendTime.getBaseVal() << std::endl;
113       std::cout << " = " << mRemoteSendTime.getBaseVal() - mRemoteSendTime.getBaseVal() - mRemoteSendTime.getBaseVal() << std::endl;
114
115       vpr::Int64 mNewDelta = mRemoteSendTime.getBaseVal() - mRemoteSendTime.getBaseVal() - mRemoteSendTime.getBaseVal();
116
117       std::cout << "New Delta: " << mNewDelta << std::endl;
118
119       std::cout << "mDelta" << mDelta.getBaseVal() << std::endl;
120
121                      vprDEBUG(gadgetDBG_RIM,vprDBG_VERB_LVL) << "[SYNC]     Delta: " << mDelta.getBaseVal() << "\n" << vprDEBUG_FLUSH;
122       mLocalSendTime.setNow();
123       mExpectedRemoteTime = mLocalSendTime + mDelta + mLatencyTime;
124                      vprDEBUG(gadgetDBG_RIM,vprDBG_VERB_LVL) << "[SYNC]     Sent Expected Time: " << mExpectedRemoteTime.getBaseVal() << "\n" << vprDEBUG_FLUSH;
125       mWriter.writeUint64(mExpectedRemoteTime.getBaseVal());
126       sendAndClear();
127    }
128    void ClusterDelta::receiveExpectedTime()
129    {
130
131       getPacket(8);
132       mLocalReceiveTime.setNow();
133                      vprDEBUG(gadgetDBG_RIM,vprDBG_VERB_LVL) << "[SYNC]     Got Expected Time at: " << mLocalReceiveTime.getBaseVal() << "\n" << vprDEBUG_FLUSH;
134       mExpectedRemoteTime.set(mReader->readUint64(), vpr::Interval::Base);
135                      vprDEBUG(gadgetDBG_RIM,vprDBG_VERB_LVL) << "[SYNC]     Got Expected Time: " << mExpectedRemoteTime.getBaseVal() << "\n" << vprDEBUG_FLUSH;
136       if (mExpectedRemoteTime > mLocalReceiveTime)
137       {
138          mErrorTime = mExpectedRemoteTime - mLocalReceiveTime;
139       }
140       else
141       {
142          mErrorTime = mLocalReceiveTime - mExpectedRemoteTime;
143       }
144
145                      vprDEBUG(gadgetDBG_RIM,vprDBG_VERB_LVL) << "[SYNC]     Error of: " << mErrorTime.getBaseVal() << "\n" << vprDEBUG_FLUSH;
146
147       if ( mErrorTime.getBaseVal() < mTol )
148       {
149          mAccept = true;
150       }
151       else
152       {
153          mAccept = false;
154          mTol = gmtl::Math::pow(mTol,(float)1.5);
155       }
156    }
157    void ClusterDelta::createResponce()
158    {
159                      vprDEBUG(gadgetDBG_RIM,vprDBG_VERB_LVL) << "[SYNC] Createing Responce" << "\n" << vprDEBUG_FLUSH;
160                      vprDEBUG(gadgetDBG_RIM,vprDBG_VERB_LVL) << "[SYNC]     mTol: " << mTol << " Accept?: " << mAccept << "\n" << vprDEBUG_FLUSH;
161       mWriter.writeBool(mAccept);
162       sendAndClear();
163    }
164
165    void ClusterDelta::receiveResponce()
166    {
167       getPacket(1);
168       mAccept = mReader->readBool();
169       if ( mAccept == false )
170       {
171          vprDEBUG(gadgetDBG_RIM,vprDBG_VERB_LVL) << clrOutNORM(clrRED,"[SYNC]FAILED SYNC") << "\n" << vprDEBUG_FLUSH;
172       }
173    }
174
175    void ClusterDelta::sendAndClear()
176    {
177       if ( !mWriter.getData()->empty() )
178       {
179          mSocketStream->send(*(mWriter.getData()), mWriter.getData()->size());
180       }
181       mWriter.mData->clear();
182       mWriter.mCurHeadPos = 0;
183    }
184    void ClusterDelta::getPacket(unsigned num)
185    {                                      // Need to delete the old Object Readers
186       syncPacket.clear();
187       mReader->setCurPos(0);
188       //mSocketStream->readn(syncPacket,SYNC_PACKET_LENGTH,bytes_read);
189       mSocketStream->readn(syncPacket,num);
190    }
191    void ClusterDelta::clearIntervals()
192    {
193       mLocalSendTime.set(0,vpr::Interval::Base);
194       mLocalReceiveTime.set(0,vpr::Interval::Base);
195       mRemoteSendTime.set(0,vpr::Interval::Base);
196       mRemoteReceiveTime.set(0,vpr::Interval::Base);
197       mExpectedRemoteTime.set(0,vpr::Interval::Base);
198       mLatencyTime.set(0,vpr::Interval::Base);
199       mDelta.set(0,vpr::Interval::Base);
200       mErrorTime.set(0,vpr::Interval::Base);
201    }
202
203 } // End of cluster namespace
204
Note: See TracBrowser for help on using the browser.