Diana Software
QDb.cc
Go to the documentation of this file.
1 /*
2 * APOLLO: A complete DAQ and Online Data Analysis Framework for Diana
3 * $Id: QDb.cc 631 2006-12-19 11:54:27Z giacher $
4 * M.P. created 21/9/4
5 *
6 * Class QDb: base class for DB interfaces (Apollo & Diana)
7 *
8 */
9 
10 #include <stdexcept>
11 #include <iostream>
12 #include <sstream>
13 #include <cstdio>
14 #include <unistd.h>
15 
16 #include "QDb.hh"
17 #include "QError.hh"
18 
20 #include "QWatchdogTimer.hh"
21 
22 #include <boost/thread/mutex.hpp>
23 #include <boost/thread/locks.hpp>
24 
25 
26 //_____________________________________________________________________________
27 QDb::QDb(const std::string& dbHost,
28  const std::string& dbPort,
29  const std::string& dbUsr,
30  const std::string& dbPwd,
31  const std::string& dbName):
32  fConnection(NULL),
33  fDbHost(dbHost),
34  fDbPort(dbPort),
35  fDbUser(dbUsr),
36  fDbPwd(dbPwd),
37  fDbName(dbName),
38  fConnectAttemptTimeoutSec(0),
39  fMinConnectDelaySec(-1),
40  fMaxConnectDelaySec(-1),
41  fDisconnectTimeoutSec(-1),
42  fThreadHandler(NULL),
43  fWatchdogTimer(NULL),
44  fMutex(NULL)
45 {
46  fMutex = new boost::mutex();
47 }
48 
49 
50 //_____________________________________________________________________________
52 {
53  if (IsConnected())
54  Disconnect();
55 
56  if(fThreadHandler != NULL)
57  delete fThreadHandler;
58  if(fWatchdogTimer != NULL)
59  delete fWatchdogTimer;
60 
61  delete fMutex;
62 }
63 
64 
65 //_____________________________________________________________________________
66 bool QDb::IsConnected() const
67 {
68  return (CONNECTION_OK == PQstatus(fConnection));
69 }
70 
71 
72 //_____________________________________________________________________________
74 {
76 
77  // variables for handling multiple connect attempts
78  int maxNumAttempts = 0;
79  double sleepTimeSec = 1;
81  maxNumAttempts = fConnectAttemptTimeoutSec / sleepTimeSec;
82 
84  if(err != QERR_SUCCESS)
85  return err;
86 
87  int attempt = 0;
88  while(!IsConnected())
89  {
90  if(0 == attempt)
92 
93  err = ConnectBase();
94  if(err == QERR_SUCCESS)
95  break;
96 
97  if(attempt++ < maxNumAttempts)
98  sleep(sleepTimeSec);
99  else
100  break;
101  }
102 
103  if(err == QERR_SUCCESS) // i.e. connection was successful
104  {
106  if(err != QERR_SUCCESS)
107  return err;
108  }
109 
110  return err;
111 }
112 
113 
114 //_____________________________________________________________________________
116 {
117  if (IsConnected())
118  return QERR_SUCCESS;
119 
120  const char* pgoptions = NULL;
121  const char* pgtty = NULL;
122 
123  fConnection = PQsetdbLogin(fDbHost.c_str(),
124  fDbPort.c_str(),
125  pgoptions,
126  pgtty,
127  fDbName.c_str(),
128  fDbUser.c_str(),
129  fDbPwd.c_str());
130 
131  if( !IsConnected() )
132  {
133  std::string msg = "Cannot connect to DB. Srv="
134  + fDbHost
135  + " Port="
136  + fDbPort
137  + " DataBase="
138  + fDbName
139  + " User="
140  + fDbUser
141  + ": "
142  + PQerrorMessage(fConnection);
143  //std::cerr << msg << std::endl;
144  return QError(QERR_DB_CONN_FAILED, __FILE__, __LINE__, msg);
145  }
146  return QERR_SUCCESS;
147 }
148 
149 
150 //_____________________________________________________________________________
152 {
153  boost::lock_guard<boost::mutex> guard(*fMutex);
154  return UnlockedDisconnect();
155 }
156 
157 //_____________________________________________________________________________
159 {
160  if(NULL == fConnection)
161  return true;
162  PQfinish(fConnection);
163  fConnection = NULL;
164  return true;
165 }
166 
167 
168 //_____________________________________________________________________________
169 QDb::QDbTable QDb::DoQuery(const std::string& query)
170 {
171  QDbTable table;
172  QError err = DoQuery(query, table);
173  if(err != QERR_SUCCESS)
174  throw err;
175  return table;
176 
177 }
178 
179 
180 //_____________________________________________________________________________
181 double QDb::DoQueryDouble( const std::string& query)
182 {
183  QDbTable table;
184  QError err = DoQuery(query, table);
185  if(err != QERR_SUCCESS)
186  throw err;
187 
188  err = CheckSingleResult(table, query, __FUNCTION__, __FILE__, __LINE__);
189  if(err != QERR_SUCCESS)
190  throw err;
191 
192  return table.begin()->second.front().GetDouble();
193 }
194 
195 
196 //_____________________________________________________________________________
197 int QDb::DoQueryInt( const std::string& query)
198 {
199  QDbTable table;
200  QError err = DoQuery(query, table);
201  if(err != QERR_SUCCESS)
202  throw err;
203 
204  err = CheckSingleResult(table, query, __FUNCTION__, __FILE__, __LINE__);
205  if(err != QERR_SUCCESS)
206  throw err;
207 
208  return table.begin()->second.front().GetInt();
209 }
210 
211 
212 //_____________________________________________________________________________
213 bool QDb::DoQueryBool( const std::string& query)
214 {
215  QDbTable table;
216  QError err = DoQuery(query, table);
217  if(err != QERR_SUCCESS)
218  throw err;
219 
220  err = CheckSingleResult(table, query, __FUNCTION__, __FILE__, __LINE__);
221  if(err != QERR_SUCCESS)
222  throw err;
223 
224  return table.begin()->second.front().GetBool();
225 }
226 
227 
228 //_____________________________________________________________________________
229 std::string QDb::DoQueryString( const std::string& query)
230 {
231  QDbTable table;
232  QError err = DoQuery(query, table);
233  if(err != QERR_SUCCESS)
234  throw err;
235 
236  err = CheckSingleResult(table, query, __FUNCTION__, __FILE__, __LINE__);
237  if(err != QERR_SUCCESS)
238  throw err;
239 
240  return table.begin()->second.front().GetString();
241 }
242 
243 
244 //_____________________________________________________________________________
245 std::vector<int> QDb::DoQueryVectorInt( const std::string& query)
246 {
247  QDbTable table;
248  QError err = DoQuery(query, table);
249  if(err != QERR_SUCCESS)
250  throw err;
251 
252  err = CheckSingleResult(table, query, __FUNCTION__, __FILE__, __LINE__);
253  if(err != QERR_SUCCESS)
254  throw err;
255 
256  return table.begin()->second[0].GetVectorInt();
257 }
258 
259 //_____________________________________________________________________________
260 std::vector<double> QDb::DoQueryVectorDouble( const std::string& query)
261 {
262  QDbTable table;
263  QError err = DoQuery(query, table);
264  if(err != QERR_SUCCESS)
265  throw err;
266 
267  err = CheckSingleResult(table, query, __FUNCTION__, __FILE__, __LINE__);
268  if(err != QERR_SUCCESS)
269  throw err;
270 
271  return table.begin()->second[0].GetVectorDouble();
272 }
273 
274 
275 //_____________________________________________________________________________
276 std::vector<std::string> QDb::DoQueryVectorString( const std::string& query)
277 {
278  QDbTable table;
279  QError err = DoQuery(query, table);
280  if(err != QERR_SUCCESS)
281  throw err;
282 
283  err = CheckSingleResult(table, query, __FUNCTION__, __FILE__, __LINE__);
284  if(err != QERR_SUCCESS)
285  throw err;
286 
287  return table.begin()->second[0].GetVectorString();
288 }
289 
290 
291 //_____________________________________________________________________________
292 QError QDb::DoSQLNoReturn(const std::string& query)
293 {
294  PGresult* result = NULL;
295  QError err = Query(query, &result);
296  if(err != QERR_SUCCESS)
297  return err;
298 
299  PQclear(result);
300  return QERR_SUCCESS;
301 }
302 
303 
304 //_____________________________________________________________________________
305 int QDb::DoExec(const std::string& query)
306 {
307  PGresult* result = NULL;
308  QError err = Query(query, &result);
309  if(err != QERR_SUCCESS)
310  throw err;
311 
312  int affected = -1;
313  sscanf(PQcmdTuples(result), "%d", &affected);
314  PQclear(result);
315  return affected;
316 }
317 
318 
319 //_____________________________________________________________________________
320 int QDb::Insert(const std::string& tableName, const column& fields, const column & values)
321 {
322  if(fields.size() != values.size()) {
324  std::string errMsg = "QDb::Insert: size of fields differs from size of values";
325  err.SetDescription(__FILE__,__LINE__,errMsg);
326  throw err;
327  }
328 
329  std::string fieldlist;
330  std::string valuelist;
331  for(size_t i = 0; i < fields.size(); i++) {
332  std::string valuestr = values[i].GetString();
333  if(valuestr == "NULL" || valuestr == "Null" || valuestr == "null") { continue; }
334  if(i > 0) {
335  fieldlist+=",";
336  valuelist+=",";
337  }
338  valuelist += valuestr;
339  fieldlist += fields[i].GetString();
340  }
341 
342  std::stringstream query;
343  query<<"INSERT into "<<tableName<<" ("<<fieldlist<<")"
344  <<" SELECT "<<valuelist<<" WHERE NOT EXISTS ("
345  <<" SELECT * from "<<tableName<<" WHERE "
346  <<"("<<fieldlist<<") IN (("<<valuelist<<")))";
347  return DoExec(query.str());
348 }
349 
350 
351 //_____________________________________________________________________________
352 QError QDb::Query(const std::string& query, PGresult** result)
353 {
354  boost::lock_guard<boost::mutex> guard(*fMutex);
355  QError err = Connect();
356  if(err != QERR_SUCCESS)
357  return err;
358 
359  err = BasicQuery(query, result);
360  if(err != QERR_SUCCESS) // try again once
361  {
363 
364  err = Connect();
365  if(err != QERR_SUCCESS)
366  return err;
367 
368  err = BasicQuery(query, result);
369  return err;
370  }
371  return QERR_SUCCESS;
372 }
373 
374 
375 //_____________________________________________________________________________
376 QError QDb::BasicQuery(const std::string& query, PGresult** result) const
377 {
378  *result = PQexec(fConnection, query.c_str());
379  ExecStatusType status = PQresultStatus(*result);
380  switch(status)
381  {
382  case PGRES_TUPLES_OK:
383  case PGRES_COMMAND_OK:
384  return QERR_SUCCESS;
385 
386  case PGRES_BAD_RESPONSE:
387  case PGRES_NONFATAL_ERROR:
388  case PGRES_FATAL_ERROR:
389  case PGRES_EMPTY_QUERY:
390  case PGRES_COPY_OUT:
391  case PGRES_COPY_IN:
392  case PGRES_COPY_BOTH:
393  default:
394  {
395  std::string msg = "DoQuery: Error while executing query \""
396  + query
397  + "\" - "
398  + PQresStatus(status)
399  + " - "
400  + PQresultErrorMessage(*result);
401  PQclear(*result);
402  return QError(QERR_DB_QUERY_FAILED, __FILE__, __LINE__, msg);
403  }
404  }
405 }
406 
407 
408 //_____________________________________________________________________________
409 QError QDb::DoQuery(const std::string& query, QDbTable& table)
410 {
411  table.clear();
412 
413  PGresult *result = NULL;
414  QError err = Query(query, &result);
415  if(err != QERR_SUCCESS)
416  return err;
417 
418  if (PQresultStatus (result) != PGRES_TUPLES_OK)
419  {
420  std::string errMsg = "DoQuery: Error while executing query \"";
421  errMsg += query;
422  errMsg += "\" - ";
423  errMsg += PQresultErrorMessage(result);
424  err.Set(QERR_DB_QUERY_FAILED, __FILE__, __LINE__, errMsg);
425  PQclear(result);
426  return err;
427  }
428 
429  int nRows = PQntuples(result);
430 
431  if (0 == nRows)
432  {
433  PQclear(result);
434  return QERR_SUCCESS;
435  }
436 
437 
438  int numFields = PQnfields(result);
439  for (int field = 0; field < numFields; field++)
440  {
441  const char *fieldName = PQfname(result, field);
442  table[fieldName].reserve(nRows);
443  for (int row = 0; row < nRows; row++)
444  {
445  const char *value = PQgetvalue(result, row, field);
446  table[fieldName].push_back(QVdt(value, fieldName));
447  }
448  }
449 
450  PQclear(result);
451  return QERR_SUCCESS;
452 }
453 
454 
455 //_____________________________________________________________________________
457  const std::string& query,
458  const std::string& funcName,
459  const std::string& file,
460  int line) const
461 {
462  // a single result is a single field and a single row
463 
464  // check single field
465  if(table.empty())
466  return QError(QERR_DB_NULL_RESULT, __FILE__, __LINE__,
467  "No fields returned in "
468  + funcName
469  + ". Query was: \""
470  + query + "\"");
471  if(table.size() > 1)
472  return QError(QERR_UNDEFINED_STATUS, __FILE__, __LINE__,
473  "Multiple fields returned in "
474  + funcName
475  + ". Query was: \""
476  + query + "\"");
477 
478  // check single row
479  const column& col = table.begin()->second;
480  if(col.empty())
481  return QError(QERR_DB_NULL_RESULT, __FILE__, __LINE__,
482  "No rows returned in "
483  + funcName
484  + ". Query was: \""
485  + query + "\"");
486  if(col.size() > 1)
487  return QError(QERR_DB_MAXLINES_EXCEEDED, file, line,
488  "Multiple rows returned in "
489  + funcName
490  + ". Query was: \""
491  + query + "\"");
492 
493  return QERR_SUCCESS;
494 }
495 
496 
497 //_____________________________________________________________________________
499  if(fMinConnectDelaySec < 0
500  || fMaxConnectDelaySec < 0
502  return;
503 
504  // the if check allows fMaxConnectDelaySec == fMinConnectDelaySec,
505  // however the algorithm below handles this limit case properly, as
506  // randSleep is zero and the sleep time becomes fMinConnectDelaySec.
507 
508  double intervalSec = fMaxConnectDelaySec - fMinConnectDelaySec;
509  int randGranularity = 5000;
510  double randSleep
511  = intervalSec * (rand() % randGranularity) / randGranularity;
512  double sleepTimeSec = fMinConnectDelaySec + randSleep;
513 
514  int numSecs = (int)sleepTimeSec;
515  int numMicroSecs = 1e6*(sleepTimeSec - numSecs);
516  sleep(numSecs);
517  usleep(numMicroSecs);
518  return;
519 }
520 
521 
522 //_____________________________________________________________________________
524 {
525  if(fDisconnectTimeoutSec <= 0)
526  return QERR_SUCCESS;
527 
528  if(NULL == fThreadHandler)
529  {
532 
537  if(err != QERR_SUCCESS)
538  {
539  delete fThreadHandler;
540  delete fWatchdogTimer;
541  fThreadHandler = NULL;
542  fWatchdogTimer = NULL;
543  return err;
544  }
545  }
546  return QERR_SUCCESS;
547 }
548 
549 
550 //_____________________________________________________________________________
552 {
553  if(NULL == fThreadHandler)
554  return QERR_SUCCESS;
555 
557  err = fWatchdogTimer->Start();
558  if(err != QERR_SUCCESS)
559  return err;
560 
561  bool isRunning = false;
562  err = fThreadHandler->Check(isRunning, 0);
563  if(err != QERR_SUCCESS)
564  return err;
565 
566  if( ! isRunning )
567  return QError(QERR_UNDEFINED_STATUS, __FILE__, __LINE__,
568  "Watchdog timer thread died unexpectedly");
569 
570  return QERR_SUCCESS;
571 }
err
Definition: CheckOF.C:114
@ QERR_UNDEFINED_STATUS
Definition: QError.hh:51
@ QERR_DB_CONN_FAILED
Definition: QError.hh:102
@ QERR_DB_NULL_RESULT
Definition: QError.hh:104
@ QERR_DB_QUERY_FAILED
Definition: QError.hh:103
@ QERR_SIZE_NOT_MATCH
Definition: QError.hh:31
@ QERR_DB_MAXLINES_EXCEEDED
Definition: QError.hh:105
@ QERR_SUCCESS
Definition: QError.hh:27
QError Check(bool &isRunning)
void SetWatchdogTimer(QWatchdogTimer *timer)
std::vector< std::string > DoQueryVectorString(const std::string &query)
Definition: QDb.cc:276
PGconn * fConnection
Definition: QDb.hh:132
double fMinConnectDelaySec
Definition: QDb.hh:149
QWatchdogTimer * fWatchdogTimer
Definition: QDb.hh:157
std::vector< QVdt > column
Definition: QDb.hh:33
std::vector< double > DoQueryVectorDouble(const std::string &query)
Definition: QDb.cc:260
double fMaxConnectDelaySec
Definition: QDb.hh:150
double fDisconnectTimeoutSec
Definition: QDb.hh:154
QDbWatchdogThreadHandler * fThreadHandler
Definition: QDb.hh:156
QError UpdateWatchdogTimer()
Definition: QDb.cc:551
std::vector< int > DoQueryVectorInt(const std::string &query)
Definition: QDb.cc:245
const std::string fDbPort
Definition: QDb.hh:135
QError Connect()
Definition: QDb.cc:73
bool DoQueryBool(const std::string &query)
Definition: QDb.cc:213
QError DoQuery(const std::string &query, QDbTable &table)
Definition: QDb.cc:409
QError CheckSingleResult(const QDbTable &table, const std::string &query, const std::string &funcName, const std::string &file, int line) const
Definition: QDb.cc:456
boost::mutex * fMutex
Definition: QDb.hh:158
const std::string fDbHost
Definition: QDb.hh:134
QError ConnectBase()
Definition: QDb.cc:115
std::map< std::string, column > QDbTable
Definition: QDb.hh:34
int DoQueryInt(const std::string &query)
Definition: QDb.cc:197
const std::string fDbPwd
Definition: QDb.hh:137
bool Disconnect()
Definition: QDb.cc:151
double DoQueryDouble(const std::string &query)
Definition: QDb.cc:181
int DoExec(const std::string &Query)
Execute an INSERT, UPDATE, DELETE, FETCH, or MOVE statement.
Definition: QDb.cc:305
std::string DoQueryString(const std::string &query)
Definition: QDb.cc:229
QDb(const std::string &dbHost, const std::string &dbPort, const std::string &dbUsr, const std::string &dbPwd, const std::string &dbName)
Definition: QDb.cc:27
void RandomWaitBeforeConnect() const
Definition: QDb.cc:498
QError BasicQuery(const std::string &query, PGresult **result) const
Definition: QDb.cc:376
QError DoSQLNoReturn(const std::string &query)
Definition: QDb.cc:292
int fConnectAttemptTimeoutSec
Definition: QDb.hh:141
const std::string fDbUser
Definition: QDb.hh:136
QError Query(const std::string &query, PGresult **result)
Definition: QDb.cc:352
virtual ~QDb()
Definition: QDb.cc:51
int Insert(const std::string &tableName, const column &fields, const column &values)
Definition: QDb.cc:320
bool UnlockedDisconnect()
Definition: QDb.cc:158
QError InitWatchdogThread()
Definition: QDb.cc:523
bool IsConnected() const
Definition: QDb.cc:66
const std::string fDbName
Definition: QDb.hh:138
error class with error type and description
Definition: QError.hh:115
Variable Data Type.
Definition: QVdt.hh:26
thread-safe timer class
QError SetTimeoutSec(double timeout)