root/juggler/tags/1.0.5/Environment/vjConnect.cpp

Revision 7539, 14.8 kB (checked in by anonymous, 7 years ago)

This commit was manufactured by cvs2svn to create tag
'RELENG_1_0_5_RELEASE'.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1 /*************** <auto-copyright.pl BEGIN do not edit this line> **************
2  *
3  * VR Juggler is (C) Copyright 1998, 1999, 2000 by Iowa State University
4  *
5  * Original Authors:
6  *   Allen Bierbaum, Christopher Just,
7  *   Patrick Hartling, Kevin Meinert,
8  *   Carolina Cruz-Neira, Albert Baker
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Library General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Library General Public License for more details.
19  *
20  * You should have received a copy of the GNU Library General Public
21  * License along with this library; if not, write to the
22  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23  * Boston, MA 02111-1307, USA.
24  *
25  * -----------------------------------------------------------------
26  * File:          $RCSfile$
27  * Date modified: $Date$
28  * Version:       $Revision$
29  * -----------------------------------------------------------------
30  *
31  *************** <auto-copyright.pl END do not edit this line> ***************/
32
33
34 // File output for Environment Manager
35 //
36 // author: Christopher Just
37
38
39 #include <vjConfig.h>
40
41 #include <Environment/vjConnect.h>
42 #include <Config/vjChunkDescDB.h>
43 #include <Config/vjConfigChunkDB.h>
44 #include <Config/vjChunkFactory.h>
45 #include <Environment/vjEnvironmentManager.h>
46 #include <Environment/vjTimedUpdate.h>
47 //#include <Kernel/vjKernel.h>
48 #include <Kernel/vjConfigManager.h>
49 #include <Config/vjConfigTokens.h>
50
51
52 vjConnect::vjConnect(vjSocket* s, const std::string& _name,
53            vjConnectMode _mode): name(""), filename(""), commands_mutex() {
54     vjDEBUG(vjDBG_ENV_MGR,4) << "EM: Creating vjConnect to file or socket\n"
55           << vjDEBUG_FLUSH;
56     sock = s;
57     mode = _mode;
58
59     write_alive = false;
60     filename = "Socket/Pipe";
61     name = _name;
62     read_connect_thread = NULL;
63     write_connect_thread = NULL;
64     read_die = write_die = false;
65
66     outstream = sock->getOutputStream();
67     instream = sock->getInputStream();
68
69     // we need to add a chunk describing ourself
70     vjConfigChunk* ch = vjChunkFactory::instance()->createChunk ("FileConnect");
71     if (ch) {
72         ch->setProperty ("Name", name);
73         ch->setProperty ("Mode", VJC_INTERACTIVE);
74         ch->setProperty ("filename", filename);
75         ch->setProperty ("Enabled", true);
76         vjConfigManager::instance()->addActive(ch);              // Add to active config
77     }
78 }
79
80
81
82 vjConnect::vjConnect(vjConfigChunk* c): commands_mutex() {
83
84     sock = NULL;
85     filename = (std::string)c->getProperty ("FileName");
86     name = (std::string)c->getProperty ("Name");
87     mode = (vjConnectMode)(int)c->getProperty ("Mode");
88
89     read_die = write_die = false;
90     read_connect_thread = NULL;
91     write_connect_thread = NULL;
92
93     switch (mode) {
94     case VJC_OUTPUT:
95         outstream = new std::ofstream (filename.c_str(), std::ios::out);
96         if (!outstream)
97             vjDEBUG(vjDBG_ALL,0) << clrOutNORM(clrRED, "ERROR:") << " file open failed for \"" << filename.c_str()
98                                  << "\"\n" << vjDEBUG_FLUSH;
99         break;
100     case VJC_INPUT:
101         instream = new std::ifstream (filename.c_str(), std::ios::in);
102         if (!instream)
103             vjDEBUG(vjDBG_ALL,0) << clrOutNORM(clrRED, "ERROR:") << " file open failed for \"" << filename.c_str()
104                                  << "\"\n" << vjDEBUG_FLUSH;
105         break;
106     case VJC_INTERACTIVE:
107         instream = new std::fstream (filename.c_str(),
108                                      std::ios::in | std::ios::out);
109         if (!instream)
110             vjDEBUG(vjDBG_ALL,0) << clrOutNORM(clrRED, "ERROR:") << " file open failed for \"" << filename.c_str()
111                                  << "\"\n" << vjDEBUG_FLUSH;
112         outstream = (std::fstream*)instream;
113         break;
114     }
115
116     // logging information to output file...
117     if (mode == VJC_OUTPUT)
118         *outstream << "VR Juggler FileConnect output " << name.c_str()
119                    << std::endl;
120 }
121
122
123
124 vjConnect::~vjConnect() {
125     stopProcess();
126     //close (fd);
127 }
128
129
130
131 bool vjConnect::startProcess() {
132    if (write_connect_thread)
133       return true;
134
135    bool success = true;
136
137    read_die = write_die = false;
138
139    if (mode == VJC_OUTPUT || mode == VJC_INTERACTIVE)
140    {
141       vjThreadMemberFunctor<vjConnect> *writeMemberFunctor =
142       new vjThreadMemberFunctor<vjConnect>(this,
143                                            &vjConnect::writeControlLoop,
144                                            NULL);
145       write_connect_thread = new vjThread (writeMemberFunctor, 0);
146       success = success && write_connect_thread;
147    }
148    if (mode == VJC_INPUT || mode == VJC_INTERACTIVE)
149    {
150       vjThreadMemberFunctor<vjConnect> *readMemberFunctor =
151       new vjThreadMemberFunctor<vjConnect>(this,
152                                            &vjConnect::readControlLoop,
153                                            NULL);
154       read_connect_thread = new vjThread (readMemberFunctor, 0);
155       success = success && read_connect_thread;
156    }
157
158
159    return success;
160 }
161
162
163
164 bool vjConnect::stopProcess() {
165 //      sendDisconnect();
166     read_die = write_die = true;
167     if (read_connect_thread) {
168         read_connect_thread->kill();
169         delete read_connect_thread;
170         read_connect_thread = NULL;
171     }
172     if (write_connect_thread) {
173         while (write_alive)
174             std::cout << "waiting for write to die" << std::endl;
175         //write_connect_thread->kill();
176         delete write_connect_thread;
177         write_connect_thread = NULL;
178     }
179     if (sock) {
180         delete (sock);
181     }
182     else {
183         delete outstream;
184         if (outstream != (void*)instream)
185             delete instream;
186     }
187     instream = 0;
188     outstream = 0;
189     sock = 0;
190     return true;
191 }
192
193
194 void vjConnect::sendDescDB (vjChunkDescDB* db) {
195     if (mode != VJC_INPUT)
196         commands.push (new vjCommandSendDescDB (db));
197 }
198
199
200 void vjConnect::sendDisconnect () {
201     std::cerr << "vjConnect::sendDisconnect not implemented!!!" << std::endl;
202     //    if (mode != VJC_INPUT)
203     //   commands.push (new vjCommandDisconnect());
204 }
205
206
207 void vjConnect::sendChunkDB (vjConfigChunkDB* db, bool all) {
208     if (mode != VJC_INPUT)
209         commands.push (new vjCommandSendChunkDB (db, all));
210 }
211
212
213 void vjConnect::sendRefresh () {
214     if (mode == VJC_INTERACTIVE)
215         commands.push (new vjCommandRefresh);
216 }
217
218
219
220 //! ARGS: _tu - a vjTimedUpdate*
221 //! ARGS: _refresh_time - time between refreshes, in milliseconds
222 void vjConnect::addTimedUpdate (vjTimedUpdate* _tu, float _refresh_time) {
223     if (mode != VJC_INPUT) {
224         commands_mutex.acquire();
225         timed_commands.push (new vjCommandTimedUpdate (_tu, _refresh_time));
226         commands_mutex.release();
227     }
228 }
229
230
231
232 void vjConnect::removeTimedUpdate (vjTimedUpdate* _tu) {
233     // this better not be called often - it's gotta be nlogn or something.
234     // still, there'll probably never be more than a couple dozen
235     // items in the timed_commands queue anyway.
236     std::priority_queue<vjCommand*, std::vector<vjCommand*>, vjCommandPtrCmp> newq;
237     vjCommandTimedUpdate* ctu2;
238     vjCommand* ctu1;
239     commands_mutex.acquire();
240     while (!timed_commands.empty()) {
241         ctu1 = timed_commands.top();
242         ctu2 = dynamic_cast<vjCommandTimedUpdate*>(ctu1);
243         timed_commands.pop();
244         if (ctu2 && (ctu2->timed_update == _tu))
245             continue;
246         newq.push (ctu1);
247     }
248     timed_commands = newq;
249     commands_mutex.release();
250 }
251
252
253 //----------------- PRIVATE utility functions ---------------------------
254
255
256 void vjConnect::readControlLoop(void* nullParam) {
257    vjDEBUG(vjDBG_ENV_MGR,5) << "vjConnect " << name.c_str()
258              << " started read control loop.\n"
259              << vjDEBUG_FLUSH;
260    while (!read_die) {
261        if (!(*instream) || instream->eof())
262       break;
263       if (!readCommand (*instream))
264           break;
265    }
266    vjDEBUG(vjDBG_ENV_MGR,5) << "vjConnect " << name.c_str()
267              <<" ending read control loop.\n" << vjDEBUG_FLUSH;
268
269    read_connect_thread = NULL;
270    read_die = write_die = true;
271 //**//   vjKernel::instance()->getEnvironmentManager()->connectHasDied(this);
272 }
273
274
275
276 void vjConnect::writeControlLoop(void* nullParam) {
277     /* this probably needs considerable revision */
278     vjCommand*  cmd;
279     write_alive = true;
280
281 //              *outstream << "another test : ( \n" << flush;
282
283     vjDEBUG(vjDBG_ENV_MGR,5) << "vjConnect " << name.c_str()
284                              << " started write control loop.\n"
285                              << vjDEBUG_FLUSH;
286
287     while (!write_die) {
288 //          cout << "writing in write loop " << flush;
289 //          *outstream << "yet another test : ( \n" << flush;
290 //          cout << " -done\n" << flush;
291
292         usleep (300000); // half a sec - find a better way to do this...
293         if (!outstream)
294             break;
295
296         commands_mutex.acquire();
297
298         while (!commands.empty()) {
299             cmd = commands.front();
300             commands.pop();
301             vjDEBUG (vjDBG_ENV_MGR, 5) << "calling EM command "
302                                        << cmd->getName().c_str()
303                                        <<vjDEBUG_FLUSH;
304             cmd->call (*outstream);
305             vjDEBUG (vjDBG_ENV_MGR, 5) << " -- done.\n" << vjDEBUG_FLUSH;
306             delete cmd;
307         }
308
309         current_time.set();
310
311         while (!timed_commands.empty()) {
312             cmd = timed_commands.top();
313             if (current_time.usecs() < (cmd->next_fire_time * 1000))
314                 break;
315             timed_commands.pop();
316             cmd->call (*outstream);
317             cmd->resetFireTime (current_time);
318             timed_commands.push (cmd);
319         }
320
321         commands_mutex.release();
322
323     } // end main loop
324     vjDEBUG (vjDBG_ENV_MGR,5) << "vjConnect " << name.c_str() << " ending write loop.\n" << vjDEBUG_FLUSH;
325     //write_connect_thread = NULL;
326     write_alive = false;
327 }
328
329
330
331 bool vjConnect::readCommand(std::istream& fin) {
332     // reads one command.  called from controlloop
333     const int   buflen = 512;
334     char        rbuf[buflen];    // HACK! can't handle lines longer than buflen
335     //char        c;
336     char*       s;
337
338     if (!fin.getline(rbuf,buflen,'\n'))
339         return false;
340
341     vjDEBUG(vjDBG_ENV_MGR,4) << "vjConnect:: read: '" << rbuf
342                              << "'.\n" << vjDEBUG_FLUSH;
343
344     s = strtok (rbuf, " \t\n");
345     if (!s) {
346         vjDEBUG(vjDBG_ERROR,1) << "couldn't get a token.  something's really wrong in vjConnect\n"
347                                << vjDEBUG_FLUSH;
348     }
349
350     if (!strcasecmp (s, get_TOKEN)) {
351         s = strtok (NULL, " \t\n");
352         if (!strcasecmp (s, descriptions_TOKEN)) {
353             vjChunkDescDB* db = vjChunkFactory::instance()->getChunkDescDB();
354             vjDEBUG(vjDBG_ENV_MGR,4) << "vjConnect: Sending (requested) chunkdesc.\n" << vjDEBUG_FLUSH;
355             vjDEBUG(vjDBG_ENV_MGR,5) << *db << std::endl << vjDEBUG_FLUSH;
356             sendDescDB (db);
357         }
358         else if (!strcasecmp (s,chunks_TOKEN)) {
359             vjConfigManager::instance()->lockActive();
360             vjConfigChunkDB* db = new vjConfigChunkDB((*(vjConfigManager::instance()->getActiveConfig())));   // Make a copy
361             vjConfigManager::instance()->unlockActive();
362
363             vjDEBUG(vjDBG_ENV_MGR,4) << "vjConnect: Sending (requested) chunkdb.\n" << vjDEBUG_FLUSH;
364             vjDEBUG(vjDBG_ENV_MGR,5) << *db << std::endl << vjDEBUG_FLUSH;
365             sendChunkDB (db, true);
366         }
367         else {
368             vjDEBUG(vjDBG_ERROR,1)
369                << "Error: vjConnect:: Received unknown GET: " << s
370                << std::endl << vjDEBUG_FLUSH;
371         }
372     }
373
374     else if (!strcasecmp (s, descriptions_TOKEN)) {
375         /* message contains one or more descriptions, to
376          * be read in just like a ChunkDescDB.  If the
377          * descriptions line itself contains the word
378          * "all", then we should clear the db first.
379          */
380         // XXX: Hack!!! We need to change this. We should not
381         // change the dbs outside of kernel
382         //s = strtok (NULL, " \t\n");
383         vjDEBUG(vjDBG_ERROR,0) << "EM Receive descriptions disabled!!!\n" << vjDEBUG_FLUSH;
384         //if (!strcasecmp (s, "all") && (cachedChunkdb->isEmpty()))
385         //    cachedDescdb->removeAll();
386         //fin >> *cachedDescdb;
387     }
388
389     else if (!strcasecmp (s, chunks_TOKEN)) {
390         /* message contains one or more chunks.  If the
391          * descriptions line contains "all", we should
392          * clear the db first
393          */
394         //s = strtok (NULL, " \t\n");
395         // chunks 'all' option disabled for now...
396         //if (!strcasecmp (s, "all"))
397         //   chunkdb->removeAll()
398         vjDEBUG(vjDBG_ENV_MGR,1) << "vjConnect:: Read: chunks: Started\n" << vjDEBUG_FLUSH;
399
400         vjConfigChunkDB* newchunkdb = new vjConfigChunkDB;
401         fin >> *newchunkdb;
402         vjDEBUG(vjDBG_ENV_MGR,5) << *newchunkdb << std::endl << vjDEBUG_FLUSH;
403         vjDEBUG(vjDBG_ENV_MGR,3) << "vjConnect:: Read: chunks: Completed\n" << vjDEBUG_FLUSH;
404         // ALLEN: PUT A FUNCTION HERE FOR THE KERNEL TO LOOK AT NEWCHUNKDB
405         vjConfigManager::instance()->addChunkDB(newchunkdb);    // Adds chunks to the pending list
406         vjDEBUG(vjDBG_ENV_MGR,3) << "vjConnect: Added chunks to vjConfigManager pending list to add\n" << vjDEBUG_FLUSH;
407     }
408
409     else if (!strcasecmp (s, remove_TOKEN)) {
410         s = strtok (NULL, " \t\n");
411         if (!strcasecmp (s, descriptions_TOKEN)) {
412             while ( (s = strtok (NULL, " \t\n")) ) {
413                 // BUG! - what if chunks exist in db using the desc we're removing?
414                 //cachedDescdb->remove(s);
415                 vjDEBUG(vjDBG_ENV_MGR,3) << "EM Remove Descriptions disabled!\n" << vjDEBUG_FLUSH;
416             }
417         }
418         else if (!strcasecmp (s, chunks_TOKEN)) {
419             vjConfigChunkDB* remove_chunk_db = new vjConfigChunkDB();
420
421             vjDEBUG(vjDBG_ENV_MGR,5) << "vjConnect: Remove: chunks: Starting...\n"  << vjDEBUG_FLUSH;
422
423             fin >> *remove_chunk_db;       // Read in the chunks to remove
424
425             vjDEBUG(vjDBG_ENV_MGR,5) << *remove_chunk_db << std::endl
426                                      << vjDEBUG_FLUSH;
427
428             // Tell config manager to remove the chunks
429             vjConfigManager::instance()->removeChunkDB(remove_chunk_db);     // Add chunks to pending list as removes
430             vjDEBUG(vjDBG_ENV_MGR,3) << "vjConnect: Remove chunks added to vjConfigManager pending list\n" << vjDEBUG_FLUSH;
431         }
432         else
433             vjDEBUG(vjDBG_ERROR,3) << "Error: vjConnect: Unknown remove type: "
434                                    << s << std::endl << vjDEBUG_FLUSH;
435     }
436     else {
437         vjDEBUG(vjDBG_ERROR,0) << "Error: vjConnect:: Unknown command '"
438                                << s << "'\n" << vjDEBUG_FLUSH;
439     }
440     return true;
441 }
Note: See TracBrowser for help on using the browser.