root/juggler/branches/1.0/Environment/vjConnect.cpp

Revision 8789, 14.8 kB (checked in by patrickh, 7 years ago)

Copyright update.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1 /*************** <auto-copyright.pl BEGIN do not edit this line> **************
2  *
3  * VR Juggler is (C) Copyright 1998, 1999, 2000, 2001, 2002
4  *   by Iowa State University
5  *
6  * Original Authors:
7  *   Allen Bierbaum, Christopher Just,
8  *   Patrick Hartling, Kevin Meinert,
9  *   Carolina Cruz-Neira, Albert Baker
10  *
11  * This library is free software; you can redistribute it and/or
12  * modify it under the terms of the GNU Library General Public
13  * License as published by the Free Software Foundation; either
14  * version 2 of the License, or (at your option) any later version.
15  *
16  * This library is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19  * Library General Public License for more details.
20  *
21  * You should have received a copy of the GNU Library General Public
22  * License along with this library; if not, write to the
23  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
24  * Boston, MA 02111-1307, USA.
25  *
26  * -----------------------------------------------------------------
27  * File:          $RCSfile$
28  * Date modified: $Date$
29  * Version:       $Revision$
30  * -----------------------------------------------------------------
31  *
32  *************** <auto-copyright.pl END do not edit this line> ***************/
33
34
35 // File output for Environment Manager
36 //
37 // author: Christopher Just
38
39
40 #include <vjConfig.h>
41
42 #include <Environment/vjConnect.h>
43 #include <Config/vjChunkDescDB.h>
44 #include <Config/vjConfigChunkDB.h>
45 #include <Config/vjChunkFactory.h>
46 #include <Environment/vjEnvironmentManager.h>
47 #include <Environment/vjTimedUpdate.h>
48 //#include <Kernel/vjKernel.h>
49 #include <Kernel/vjConfigManager.h>
50 #include <Config/vjConfigTokens.h>
51
52
53 vjConnect::vjConnect(vjSocket* s, const std::string& _name,
54            vjConnectMode _mode): name(""), filename(""), commands_mutex() {
55     vjDEBUG(vjDBG_ENV_MGR,4) << "EM: Creating vjConnect to file or socket\n"
56           << vjDEBUG_FLUSH;
57     sock = s;
58     mode = _mode;
59
60     write_alive = false;
61     filename = "Socket/Pipe";
62     name = _name;
63     read_connect_thread = NULL;
64     write_connect_thread = NULL;
65     read_die = write_die = false;
66
67     outstream = sock->getOutputStream();
68     instream = sock->getInputStream();
69
70     // we need to add a chunk describing ourself
71     vjConfigChunk* ch = vjChunkFactory::instance()->createChunk ("FileConnect");
72     if (ch) {
73         ch->setProperty ("Name", name);
74         ch->setProperty ("Mode", VJC_INTERACTIVE);
75         ch->setProperty ("filename", filename);
76         ch->setProperty ("Enabled", true);
77         vjConfigManager::instance()->addActive(ch);              // Add to active config
78     }
79 }
80
81
82
83 vjConnect::vjConnect(vjConfigChunk* c): commands_mutex() {
84
85     sock = NULL;
86     filename = (std::string)c->getProperty ("FileName");
87     name = (std::string)c->getProperty ("Name");
88     mode = (vjConnectMode)(int)c->getProperty ("Mode");
89
90     read_die = write_die = false;
91     read_connect_thread = NULL;
92     write_connect_thread = NULL;
93
94     switch (mode) {
95     case VJC_OUTPUT:
96         outstream = new std::ofstream (filename.c_str(), std::ios::out);
97         if (!outstream)
98             vjDEBUG(vjDBG_ALL,0) << clrOutNORM(clrRED, "ERROR:") << " file open failed for \"" << filename.c_str()
99                                  << "\"\n" << vjDEBUG_FLUSH;
100         break;
101     case VJC_INPUT:
102         instream = new std::ifstream (filename.c_str(), std::ios::in);
103         if (!instream)
104             vjDEBUG(vjDBG_ALL,0) << clrOutNORM(clrRED, "ERROR:") << " file open failed for \"" << filename.c_str()
105                                  << "\"\n" << vjDEBUG_FLUSH;
106         break;
107     case VJC_INTERACTIVE:
108         instream = new std::fstream (filename.c_str(),
109                                      std::ios::in | std::ios::out);
110         if (!instream)
111             vjDEBUG(vjDBG_ALL,0) << clrOutNORM(clrRED, "ERROR:") << " file open failed for \"" << filename.c_str()
112                                  << "\"\n" << vjDEBUG_FLUSH;
113         outstream = (std::fstream*)instream;
114         break;
115     }
116
117     // logging information to output file...
118     if (mode == VJC_OUTPUT)
119         *outstream << "VR Juggler FileConnect output " << name.c_str()
120                    << std::endl;
121 }
122
123
124
125 vjConnect::~vjConnect() {
126     stopProcess();
127     //close (fd);
128 }
129
130
131
132 bool vjConnect::startProcess() {
133    if (write_connect_thread)
134       return true;
135
136    bool success = true;
137
138    read_die = write_die = false;
139
140    if (mode == VJC_OUTPUT || mode == VJC_INTERACTIVE)
141    {
142       vjThreadMemberFunctor<vjConnect> *writeMemberFunctor =
143       new vjThreadMemberFunctor<vjConnect>(this,
144                                            &vjConnect::writeControlLoop,
145                                            NULL);
146       write_connect_thread = new vjThread (writeMemberFunctor, 0);
147       success = success && write_connect_thread;
148    }
149    if (mode == VJC_INPUT || mode == VJC_INTERACTIVE)
150    {
151       vjThreadMemberFunctor<vjConnect> *readMemberFunctor =
152       new vjThreadMemberFunctor<vjConnect>(this,
153                                            &vjConnect::readControlLoop,
154                                            NULL);
155       read_connect_thread = new vjThread (readMemberFunctor, 0);
156       success = success && read_connect_thread;
157    }
158
159
160    return success;
161 }
162
163
164
165 bool vjConnect::stopProcess() {
166 //      sendDisconnect();
167     read_die = write_die = true;
168     if (read_connect_thread) {
169         read_connect_thread->kill();
170         delete read_connect_thread;
171         read_connect_thread = NULL;
172     }
173     if (write_connect_thread) {
174         while (write_alive)
175             std::cout << "waiting for write to die" << std::endl;
176         //write_connect_thread->kill();
177         delete write_connect_thread;
178         write_connect_thread = NULL;
179     }
180     if (sock) {
181         delete (sock);
182     }
183     else {
184         delete outstream;
185         if (outstream != (void*)instream)
186             delete instream;
187     }
188     instream = 0;
189     outstream = 0;
190     sock = 0;
191     return true;
192 }
193
194
195 void vjConnect::sendDescDB (vjChunkDescDB* db) {
196     if (mode != VJC_INPUT)
197         commands.push (new vjCommandSendDescDB (db));
198 }
199
200
201 void vjConnect::sendDisconnect () {
202     std::cerr << "vjConnect::sendDisconnect not implemented!!!" << std::endl;
203     //    if (mode != VJC_INPUT)
204     //   commands.push (new vjCommandDisconnect());
205 }
206
207
208 void vjConnect::sendChunkDB (vjConfigChunkDB* db, bool all) {
209     if (mode != VJC_INPUT)
210         commands.push (new vjCommandSendChunkDB (db, all));
211 }
212
213
214 void vjConnect::sendRefresh () {
215     if (mode == VJC_INTERACTIVE)
216         commands.push (new vjCommandRefresh);
217 }
218
219
220
221 //! ARGS: _tu - a vjTimedUpdate*
222 //! ARGS: _refresh_time - time between refreshes, in milliseconds
223 void vjConnect::addTimedUpdate (vjTimedUpdate* _tu, float _refresh_time) {
224     if (mode != VJC_INPUT) {
225         commands_mutex.acquire();
226         timed_commands.push (new vjCommandTimedUpdate (_tu, _refresh_time));
227         commands_mutex.release();
228     }
229 }
230
231
232
233 void vjConnect::removeTimedUpdate (vjTimedUpdate* _tu) {
234     // this better not be called often - it's gotta be nlogn or something.
235     // still, there'll probably never be more than a couple dozen
236     // items in the timed_commands queue anyway.
237     std::priority_queue<vjCommand*, std::vector<vjCommand*>, vjCommandPtrCmp> newq;
238     vjCommandTimedUpdate* ctu2;
239     vjCommand* ctu1;
240     commands_mutex.acquire();
241     while (!timed_commands.empty()) {
242         ctu1 = timed_commands.top();
243         ctu2 = dynamic_cast<vjCommandTimedUpdate*>(ctu1);
244         timed_commands.pop();
245         if (ctu2 && (ctu2->timed_update == _tu))
246             continue;
247         newq.push (ctu1);
248     }
249     timed_commands = newq;
250     commands_mutex.release();
251 }
252
253
254 //----------------- PRIVATE utility functions ---------------------------
255
256
257 void vjConnect::readControlLoop(void* nullParam) {
258    vjDEBUG(vjDBG_ENV_MGR,5) << "vjConnect " << name.c_str()
259              << " started read control loop.\n"
260              << vjDEBUG_FLUSH;
261    while (!read_die) {
262        if (!(*instream) || instream->eof())
263       break;
264       if (!readCommand (*instream))
265           break;
266    }
267    vjDEBUG(vjDBG_ENV_MGR,5) << "vjConnect " << name.c_str()
268              <<" ending read control loop.\n" << vjDEBUG_FLUSH;
269
270    read_connect_thread = NULL;
271    read_die = write_die = true;
272 //**//   vjKernel::instance()->getEnvironmentManager()->connectHasDied(this);
273 }
274
275
276
277 void vjConnect::writeControlLoop(void* nullParam) {
278     /* this probably needs considerable revision */
279     vjCommand*  cmd;
280     write_alive = true;
281
282 //              *outstream << "another test : ( \n" << flush;
283
284     vjDEBUG(vjDBG_ENV_MGR,5) << "vjConnect " << name.c_str()
285                              << " started write control loop.\n"
286                              << vjDEBUG_FLUSH;
287
288     while (!write_die) {
289 //          cout << "writing in write loop " << flush;
290 //          *outstream << "yet another test : ( \n" << flush;
291 //          cout << " -done\n" << flush;
292
293         usleep (300000); // half a sec - find a better way to do this...
294         if (!outstream)
295             break;
296
297         commands_mutex.acquire();
298
299         while (!commands.empty()) {
300             cmd = commands.front();
301             commands.pop();
302             vjDEBUG (vjDBG_ENV_MGR, 5) << "calling EM command "
303                                        << cmd->getName().c_str()
304                                        <<vjDEBUG_FLUSH;
305             cmd->call (*outstream);
306             vjDEBUG (vjDBG_ENV_MGR, 5) << " -- done.\n" << vjDEBUG_FLUSH;
307             delete cmd;
308         }
309
310         current_time.set();
311
312         while (!timed_commands.empty()) {
313             cmd = timed_commands.top();
314             if (current_time.usecs() < (cmd->next_fire_time * 1000))
315                 break;
316             timed_commands.pop();
317             cmd->call (*outstream);
318             cmd->resetFireTime (current_time);
319             timed_commands.push (cmd);
320         }
321
322         commands_mutex.release();
323
324     } // end main loop
325     vjDEBUG (vjDBG_ENV_MGR,5) << "vjConnect " << name.c_str() << " ending write loop.\n" << vjDEBUG_FLUSH;
326     //write_connect_thread = NULL;
327     write_alive = false;
328 }
329
330
331
332 bool vjConnect::readCommand(std::istream& fin) {
333     // reads one command.  called from controlloop
334     const int   buflen = 512;
335     char        rbuf[buflen];    // HACK! can't handle lines longer than buflen
336     //char        c;
337     char*       s;
338
339     if (!fin.getline(rbuf,buflen,'\n'))
340         return false;
341
342     vjDEBUG(vjDBG_ENV_MGR,4) << "vjConnect:: read: '" << rbuf
343                              << "'.\n" << vjDEBUG_FLUSH;
344
345     s = strtok (rbuf, " \t\n");
346     if (!s) {
347         vjDEBUG(vjDBG_ERROR,1) << "couldn't get a token.  something's really wrong in vjConnect\n"
348                                << vjDEBUG_FLUSH;
349     }
350
351     if (!strcasecmp (s, get_TOKEN)) {
352         s = strtok (NULL, " \t\n");
353         if (!strcasecmp (s, descriptions_TOKEN)) {
354             vjChunkDescDB* db = vjChunkFactory::instance()->getChunkDescDB();
355             vjDEBUG(vjDBG_ENV_MGR,4) << "vjConnect: Sending (requested) chunkdesc.\n" << vjDEBUG_FLUSH;
356             vjDEBUG(vjDBG_ENV_MGR,5) << *db << std::endl << vjDEBUG_FLUSH;
357             sendDescDB (db);
358         }
359         else if (!strcasecmp (s,chunks_TOKEN)) {
360             vjConfigManager::instance()->lockActive();
361             vjConfigChunkDB* db = new vjConfigChunkDB((*(vjConfigManager::instance()->getActiveConfig())));   // Make a copy
362             vjConfigManager::instance()->unlockActive();
363
364             vjDEBUG(vjDBG_ENV_MGR,4) << "vjConnect: Sending (requested) chunkdb.\n" << vjDEBUG_FLUSH;
365             vjDEBUG(vjDBG_ENV_MGR,5) << *db << std::endl << vjDEBUG_FLUSH;
366             sendChunkDB (db, true);
367         }
368         else {
369             vjDEBUG(vjDBG_ERROR,1)
370                << "Error: vjConnect:: Received unknown GET: " << s
371                << std::endl << vjDEBUG_FLUSH;
372         }
373     }
374
375     else if (!strcasecmp (s, descriptions_TOKEN)) {
376         /* message contains one or more descriptions, to
377          * be read in just like a ChunkDescDB.  If the
378          * descriptions line itself contains the word
379          * "all", then we should clear the db first.
380          */
381         // XXX: Hack!!! We need to change this. We should not
382         // change the dbs outside of kernel
383         //s = strtok (NULL, " \t\n");
384         vjDEBUG(vjDBG_ERROR,0) << "EM Receive descriptions disabled!!!\n" << vjDEBUG_FLUSH;
385         //if (!strcasecmp (s, "all") && (cachedChunkdb->isEmpty()))
386         //    cachedDescdb->removeAll();
387         //fin >> *cachedDescdb;
388     }
389
390     else if (!strcasecmp (s, chunks_TOKEN)) {
391         /* message contains one or more chunks.  If the
392          * descriptions line contains "all", we should
393          * clear the db first
394          */
395         //s = strtok (NULL, " \t\n");
396         // chunks 'all' option disabled for now...
397         //if (!strcasecmp (s, "all"))
398         //   chunkdb->removeAll()
399         vjDEBUG(vjDBG_ENV_MGR,1) << "vjConnect:: Read: chunks: Started\n" << vjDEBUG_FLUSH;
400
401         vjConfigChunkDB* newchunkdb = new vjConfigChunkDB;
402         fin >> *newchunkdb;
403         vjDEBUG(vjDBG_ENV_MGR,5) << *newchunkdb << std::endl << vjDEBUG_FLUSH;
404         vjDEBUG(vjDBG_ENV_MGR,3) << "vjConnect:: Read: chunks: Completed\n" << vjDEBUG_FLUSH;
405         // ALLEN: PUT A FUNCTION HERE FOR THE KERNEL TO LOOK AT NEWCHUNKDB
406         vjConfigManager::instance()->addChunkDB(newchunkdb);    // Adds chunks to the pending list
407         vjDEBUG(vjDBG_ENV_MGR,3) << "vjConnect: Added chunks to vjConfigManager pending list to add\n" << vjDEBUG_FLUSH;
408     }
409
410     else if (!strcasecmp (s, remove_TOKEN)) {
411         s = strtok (NULL, " \t\n");
412         if (!strcasecmp (s, descriptions_TOKEN)) {
413             while ( (s = strtok (NULL, " \t\n")) ) {
414                 // BUG! - what if chunks exist in db using the desc we're removing?
415                 //cachedDescdb->remove(s);
416                 vjDEBUG(vjDBG_ENV_MGR,3) << "EM Remove Descriptions disabled!\n" << vjDEBUG_FLUSH;
417             }
418         }
419         else if (!strcasecmp (s, chunks_TOKEN)) {
420             vjConfigChunkDB* remove_chunk_db = new vjConfigChunkDB();
421
422             vjDEBUG(vjDBG_ENV_MGR,5) << "vjConnect: Remove: chunks: Starting...\n"  << vjDEBUG_FLUSH;
423
424             fin >> *remove_chunk_db;       // Read in the chunks to remove
425
426             vjDEBUG(vjDBG_ENV_MGR,5) << *remove_chunk_db << std::endl
427                                      << vjDEBUG_FLUSH;
428
429             // Tell config manager to remove the chunks
430             vjConfigManager::instance()->removeChunkDB(remove_chunk_db);     // Add chunks to pending list as removes
431             vjDEBUG(vjDBG_ENV_MGR,3) << "vjConnect: Remove chunks added to vjConfigManager pending list\n" << vjDEBUG_FLUSH;
432         }
433         else
434             vjDEBUG(vjDBG_ERROR,3) << "Error: vjConnect: Unknown remove type: "
435                                    << s << std::endl << vjDEBUG_FLUSH;
436     }
437     else {
438         vjDEBUG(vjDBG_ERROR,0) << "Error: vjConnect:: Unknown command '"
439                                << s << "'\n" << vjDEBUG_FLUSH;
440     }
441     return true;
442 }
Note: See TracBrowser for help on using the browser.