]> git.sesse.net Git - freerainbowtables/blob - BOINC software/BOINC server apps/distrrtgen_validator/validator.cpp
merged paths
[freerainbowtables] / BOINC software / BOINC server apps / distrrtgen_validator / validator.cpp
1 // This file is part of BOINC.
2 // http://boinc.berkeley.edu
3 // Copyright (C) 2008 University of California
4 //
5 // BOINC is free software; you can redistribute it and/or modify it
6 // under the terms of the GNU Lesser General Public License
7 // as published by the Free Software Foundation,
8 // either version 3 of the License, or (at your option) any later version.
9 //
10 // BOINC is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
13 // See the GNU Lesser General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with BOINC.  If not, see <http://www.gnu.org/licenses/>.
17
18 // validator - check and validate results, and grant credit
19 //  -app appname
20 //  [-d debug_level]
21 //  [-one_pass_N_WU N]      // Validate only N WU in one pass, then exit
22 //  [-one_pass]             // make one pass through WU table, then exit
23 //  [-mod n i]              // process only WUs with (id mod n) == i
24 //  [-max_granted_credit X] // limit maximum granted credit to X
25 //  [-max_claimed_credit Y] // invalid if claims more than Y
26 //  [-grant_claimed_credit] // just grant whatever is claimed 
27 //  [-update_credited_job]  // add userid/wuid pair to credited_job table
//  [-credit_from_wu]       // get credit from WU XML
//  [-sleep_interval n]     // seconds to sleep when a scan finds no work
29 //
30 // This program must be linked with two project-specific functions:
31 // check_set() and check_pair().
32 // See doc/validate.php for a description.
33
34 using namespace std;
35
36 #include "config.h"
37 #include <unistd.h>
38 #include <climits>
39 #include <cmath>
40 #include <vector>
41 #include <cstdlib>
42 #include <string>
43 #include <signal.h>
44
45 #include "boinc_db.h"
46 #include "util.h"
47 #include "str_util.h"
48 #include "error_numbers.h"
49
50 #include "sched_config.h"
51 #include "sched_util.h"
52 #include "sched_msgs.h"
53 #include "validator.h"
54 #include "validate_util.h"
55 #ifdef GCL_SIMULATOR
56 #include "gcl_simulator.h"
57 #endif
58
// Lock and PID file names (not referenced in the visible part of this file).
#define LOCKFILE "validate.out"
#define PIDFILE  "validate.pid"

// Row limit passed to validator.enumerate() unless -one_pass_N_WU gives
// a nonzero value.
#define SELECT_LIMIT    1000
// Default value of sleep_interval, in seconds.
#define SLEEP_PERIOD    5

// Seconds main_loop() sleeps after a scan that did nothing;
// can be overridden with -sleep_interval.
int sleep_interval = SLEEP_PERIOD;
66
// What handle_wu() should do with a workunit's transition_time
// once it has finished looking at the workunit.
typedef enum {
    NEVER,      // set transition_time to INT_MAX
    DELAYED,    // pull transition_time in to at most 6 hours from now
    IMMEDIATE,  // set transition_time to the current time
    NO_CHANGE   // leave transition_time as it is
} TRANSITION_TIME;
73
// Project-specific validation hooks. They are only declared here and must be
// supplied at link time.
//
// check_set() is called by handle_wu() with the successful results of a
// workunit once there are at least wu.min_quorum of them.
// A nonzero return makes handle_wu() give up on the workunit.
// Afterwards handle_wu() reads `canonical` (nonzero means a canonical result
// was chosen), `credit` and `retry`.
//
extern int check_set(
    vector<RESULT>&, WORKUNIT& wu, int& canonical, double& credit,
    bool& retry
);
// check_pair() is declared for the project code;
// it is not called in the visible part of this file.
//
extern int check_pair(
    RESULT & new_result, RESULT & canonical_result, bool& retry
);
81
// Command-line state; set by main() and read by the rest of the file.
char app_name[256];                 // -app: name used to look up `app`
DB_APP app;                         // app record, loaded in main_loop()
int wu_id_modulus=0;                // -mod n i: passed to validator.enumerate()
int wu_id_remainder=0;
int one_pass_N_WU=0;                // -one_pass_N_WU: enumerate() limit if nonzero
bool one_pass = false;              // leave main_loop() after an idle scan
double max_granted_credit = 0;      // 0 means no cap
double max_claimed_credit = 0;      // parsed here; not read in this file's visible code
bool grant_claimed_credit = false;  // grant result.claimed_credit instead of the set credit
bool update_credited_job = false;   // insert a credited_job row per valid result
bool credit_from_wu = false;        // call get_credit_from_wu() after check_set()
WORKUNIT* g_wup;                    // workunit currently in handle_wu()
94
95 bool is_unreplicated(WORKUNIT& wu) {
96     return (wu.target_nresults == 1 && app.target_nresults > 1);
97 }
98
99 void update_error_rate(DB_HOST& host, bool valid) {
100     if (valid) {
101         host.error_rate *= 0.95;
102     } else {
103         host.error_rate += 0.1;
104     }
105     if (host.error_rate > 1) host.error_rate = 1;
106     if (host.error_rate <= 0) host.error_rate = 0.1;
107 }
108
109 // Here when a result has been validated and its granted_credit has been set.
110 // Grant credit to host, user and team, and update host error rate.
111 //
112 int is_valid(RESULT& result, WORKUNIT& wu) {
113     DB_USER user;
114     DB_HOST host;
115     DB_TEAM team;
116     DB_CREDITED_JOB credited_job;
117     int retval;
118     char buf[256];
119
120     retval = host.lookup_id(result.hostid);
121     if (retval) {
122         log_messages.printf(MSG_CRITICAL,
123             "[RESULT#%d] lookup of host %d failed %d\n",
124             result.id, result.hostid, retval
125         );
126         return retval;
127     }
128
129     retval = user.lookup_id(host.userid);
130     if (retval) {
131         log_messages.printf(MSG_CRITICAL,
132             "[RESULT#%d] lookup of user %d failed %d\n",
133             result.id, host.userid, retval
134         );
135         return retval;
136     }
137
138     update_average(
139         result.sent_time, result.granted_credit, CREDIT_HALF_LIFE,
140         user.expavg_credit, user.expavg_time
141     );
142     sprintf(
143         buf, "total_credit=total_credit+%f, expavg_credit=%f, expavg_time=%f",
144         result.granted_credit,  user.expavg_credit, user.expavg_time
145     ); 
146     retval = user.update_field(buf);
147     if (retval) {
148         log_messages.printf(MSG_CRITICAL,
149             "[RESULT#%d] update of user %d failed %d\n",
150             result.id, host.userid, retval
151         );
152     }
153
154     update_average(
155         result.sent_time, result.granted_credit, CREDIT_HALF_LIFE,
156         host.expavg_credit, host.expavg_time
157     );
158
159     double turnaround = result.received_time - result.sent_time;
160     compute_avg_turnaround(host, turnaround);
161
162         
163     // compute new credit per CPU time
164     //
165     retval = update_credit_per_cpu_sec(
166         result.granted_credit, result.cpu_time, host.credit_per_cpu_sec
167     );
168     if (retval) {
169         log_messages.printf(MSG_CRITICAL,
170             "[RESULT#%d][HOST#%d] claimed too much credit (%f) in too little CPU time (%f)\n",
171             result.id, result.hostid, result.granted_credit, result.cpu_time
172         );
173     }
174
175     double old_error_rate = host.error_rate;
176     if (!is_unreplicated(wu)) {
177         update_error_rate(host, true);
178     }
179     sprintf(
180         buf,
181         "total_credit=total_credit+%f, expavg_credit=%f, expavg_time=%f, avg_turnaround=%f, credit_per_cpu_sec=%f, error_rate=%f",
182         result.granted_credit,  host.expavg_credit, host.expavg_time, host.avg_turnaround, host.credit_per_cpu_sec, host.error_rate
183     );
184     retval = host.update_field(buf);
185     if (retval) {
186         log_messages.printf(MSG_CRITICAL,
187             "[RESULT#%d] update of host %d failed %d\n",
188             result.id, result.hostid, retval
189         );
190     }
191     log_messages.printf(MSG_DEBUG,
192         "[HOST#%d] error rate %f->%f\n",
193         host.id, old_error_rate, host.error_rate
194     );
195
196     if (user.teamid) {
197         retval = team.lookup_id(user.teamid);
198         if (retval) {
199             log_messages.printf(MSG_CRITICAL,
200                 "[RESULT#%d] lookup of team %d failed %d\n",
201                 result.id, user.teamid, retval
202             );
203             return retval;
204         }
205         update_average(result.sent_time, result.granted_credit, CREDIT_HALF_LIFE, team.expavg_credit, team.expavg_time);
206         sprintf(
207             buf, "total_credit=total_credit+%f, expavg_credit=%f, expavg_time=%f",
208             result.granted_credit,  team.expavg_credit, team.expavg_time
209         );
210         retval = team.update_field(buf);
211         if (retval) {
212             log_messages.printf(MSG_CRITICAL,
213                 "[RESULT#%d] update of team %d failed %d\n",
214                 result.id, team.id, retval
215             );
216         }
217     }
218
219     if (update_credited_job) {
220         credited_job.userid = user.id;
221         credited_job.workunitid = long(wu.opaque);
222         retval = credited_job.insert();
223         if (retval) {
224             log_messages.printf(MSG_CRITICAL,
225                 "[RESULT#%d] Warning: credited_job insert failed (userid: %d workunit: %f err: %d)\n",
226                 result.id, user.id, wu.opaque, retval
227             );
228         } else {
229             log_messages.printf(MSG_DEBUG,
230                 "[RESULT#%d %s] added credited_job record [WU#%d OPAQUE#%f USER#%d]\n",
231                 result.id, result.name, wu.id, wu.opaque, user.id
232             );
233         }
234     }
235
236     return 0;
237 }
238
239 int is_invalid(WORKUNIT& wu, RESULT& result) {
240     char buf[256];
241     int retval;
242     DB_HOST host;
243
244     retval = host.lookup_id(result.hostid);
245     if (retval) {
246         log_messages.printf(MSG_CRITICAL,
247             "[RESULT#%d] lookup of host %d failed %d\n",
248             result.id, result.hostid, retval
249         );
250         return retval;
251     }
252     double old_error_rate = host.error_rate;
253     if (!is_unreplicated(wu)) {
254         update_error_rate(host, false);
255     }
256     sprintf(buf, "error_rate=%f", host.error_rate);
257     retval = host.update_field(buf);
258     if (retval) {
259         log_messages.printf(MSG_CRITICAL,
260             "[RESULT#%d] update of host %d failed %d\n",
261             result.id, result.hostid, retval
262         );
263         return retval;
264     }
265     log_messages.printf(MSG_DEBUG,
266         "[HOST#%d] invalid result; error rate %f->%f\n",
267         host.id, old_error_rate, host.error_rate
268     );
269     return 0;
270 }
271
272 // Return zero iff we resolved the WU
273 //
274 int handle_wu(
275     DB_VALIDATOR_ITEM_SET& validator, std::vector<VALIDATOR_ITEM>& items
276 ) { 
277     int canonical_result_index = -1;
278     bool update_result, retry;
279     TRANSITION_TIME transition_time = NO_CHANGE;
280     int retval = 0, canonicalid = 0, x;
281     double credit = 0;
282     unsigned int i;
283
284     WORKUNIT& wu = items[0].wu;
285     g_wup = &wu;
286         vector<RESULT> results;
287         int nsuccess_results;
288
289         // Here if WU doesn't have a canonical result yet.
290         // Try to get one
291 /*
292         log_messages.printf(MSG_NORMAL,
293             "[WU#%d %s] handle_wu(): No canonical result yet\n",
294             wu.id, wu.name
295         );*/
296         ++log_messages;
297
298         // make a vector of only successful results
299         //
300         for (i=0; i<items.size(); i++) {
301             RESULT& result = items[i].res;
302
303             if ((result.server_state == RESULT_SERVER_STATE_OVER) &&
304                 (result.outcome == RESULT_OUTCOME_SUCCESS)
305             ) {
306                 results.push_back(result);
307             }
308
309         }
310
311         log_messages.printf(MSG_DEBUG,
312             "[WU#%d %s] Found %d successful results\n",
313             wu.id, wu.name, (int)results.size()
314         );
315         if (results.size() >= (unsigned int)wu.min_quorum) {
316             log_messages.printf(MSG_DEBUG,
317                 "[WU#%d %s] Enough for quorum, checking set.\n",
318                 wu.id, wu.name
319             );
320            
321             retval = check_set(results, wu, canonicalid, credit, retry);
322             if (retval) {
323                 log_messages.printf(MSG_CRITICAL,
324                     "[WU#%d %s] check_set returned %d, exiting\n",
325                     wu.id, wu.name, retval
326                 );
327                 return retval;
328             }
329             if (retry) transition_time = DELAYED;
330
331             if (credit_from_wu) {
332                 retval = get_credit_from_wu(wu, results, credit);
333                 if (retval) {
334                     log_messages.printf(MSG_CRITICAL,
335                         "[WU#%d %s] get_credit_from_wu returned %d\n",
336                         wu.id, wu.name, retval
337                     );
338                     return retval;
339                 }
340             }
341             if (max_granted_credit && credit>max_granted_credit) {
342                 credit = max_granted_credit;
343             }
344
345             // scan results.
346             // update as needed, and count the # of results
347             // that are still outcome=SUCCESS
348             // (some may have changed to VALIDATE_ERROR)
349             //
350             nsuccess_results = 0;
351             for (i=0; i<results.size(); i++) {
352                 update_result = false;
353                 RESULT& result = results[i];
354                 if (result.outcome == RESULT_OUTCOME_VALIDATE_ERROR) {
355                     transition_time = IMMEDIATE;
356                     update_result = true;
357                 } else {
358                     nsuccess_results++;
359                 }
360
361                 switch (result.validate_state) {
362                 case VALIDATE_STATE_VALID:
363                     // grant credit for valid results
364                     //
365                     update_result = true;
366                     if (result.granted_credit == 0) {
367                         result.granted_credit = grant_claimed_credit ? result.claimed_credit : credit;
368                         if (max_granted_credit && result.granted_credit > max_granted_credit) {
369                             result.granted_credit = max_granted_credit;
370                         }
371                     }
372                     retval = is_valid(result, wu);
373                     if (retval) {
374                         log_messages.printf(MSG_DEBUG,
375                             "[RESULT#%d %s] is_valid() failed: %d\n",
376                             result.id, result.name, retval
377                         );
378                     }
379                     log_messages.printf(MSG_NORMAL,
380                         "[RESULT#%d %s] Valid; granted %f credit [HOST#%d]\n",
381                         result.id, result.name, result.granted_credit,
382                         result.hostid
383                     );
384                     break;
385                 case VALIDATE_STATE_INVALID:
386                     log_messages.printf(MSG_NORMAL,
387                         "[RESULT#%d %s] Invalid [HOST#%d]\n",
388                         result.id, result.name, result.hostid
389                     );
390                     is_invalid(wu, result);
391                     update_result = true;
392                     break;
393                 case VALIDATE_STATE_INIT:
394                     log_messages.printf(MSG_NORMAL,
395                         "[RESULT#%d %s] Inconclusive [HOST#%d]\n",
396                         result.id, result.name, result.hostid
397                     );
398                     result.validate_state = VALIDATE_STATE_INCONCLUSIVE;
399                     update_result = true;
400                     break;
401                 }
402
403                 if (update_result) {
404                     retval = validator.update_result(result);
405                     if (retval) {
406                         log_messages.printf(MSG_CRITICAL,
407                             "[RESULT#%d %s] result.update() failed: %d\n",
408                             result.id, result.name, retval
409                         );
410                     }
411                 }
412
413             if (canonicalid) {
414                 // if we found a canonical result,
415                 // trigger the assimilator, but do NOT trigger
416                 // the transitioner - doing so creates a race condition
417                 //
418                 transition_time = NEVER;
419                 log_messages.printf(MSG_DEBUG,
420                     "[WU#%d %s] Found a canonical result: id=%d\n",
421                     wu.id, wu.name, canonicalid
422                 );
423                 wu.canonical_resultid = canonicalid;
424                 wu.canonical_credit = credit;
425                 wu.assimilate_state = ASSIMILATE_READY;
426
427                 // If found a canonical result, don't send any unsent results
428                 //
429                 for (i=0; i<items.size(); i++) {
430                     RESULT& result = items[i].res;
431
432                     if (result.server_state != RESULT_SERVER_STATE_UNSENT) {
433                         continue;
434                     }
435
436                     result.server_state = RESULT_SERVER_STATE_OVER;
437                     result.outcome = RESULT_OUTCOME_DIDNT_NEED;
438                     retval = validator.update_result(result);
439                     if (retval) {
440                         log_messages.printf(MSG_CRITICAL,
441                             "[RESULT#%d %s] result.update() failed: %d\n",
442                             result.id, result.name, retval
443                         );
444                     }
445                 }
446             } else {
447                 // here if no consensus.
448
449                 // check if #success results is too large
450                 //
451                 if (nsuccess_results > wu.max_success_results) {
452                     wu.error_mask |= WU_ERROR_TOO_MANY_SUCCESS_RESULTS;
453                     transition_time = IMMEDIATE;
454                 }
455
456                 // if #success results == than target_nresults,
457                 // we need more results, so bump target_nresults
458                 // NOTE: nsuccess_results should never be > target_nresults,
459                 // but accommodate that if it should happen
460                 //
461                 if (nsuccess_results >= wu.target_nresults) {
462                     wu.target_nresults = nsuccess_results+1;
463                     transition_time = IMMEDIATE;
464                 }
465             }
466         }
467     }
468
469     --log_messages;
470
471     switch (transition_time) {
472     case IMMEDIATE:
473         wu.transition_time = time(0);
474         break;
475     case DELAYED:
476         x = time(0) + 6*3600;
477         if (x < wu.transition_time) wu.transition_time = x;
478         break;
479     case NEVER:
480         wu.transition_time = INT_MAX;
481         break;
482     case NO_CHANGE:
483         break;
484     }
485
486     wu.need_validate = 0;
487     
488     retval = validator.update_workunit(wu);
489     if (retval) {
490         log_messages.printf(MSG_CRITICAL,
491             "[WU#%d %s] update_workunit() failed: %d; exiting\n",
492             wu.id, wu.name, retval
493         );
494         return retval;
495     }
496     return 0;
497 }
498
499 // make one pass through the workunits with need_validate set.
500 // return true if there were any
501 //
502 bool do_validate_scan() {
503     DB_VALIDATOR_ITEM_SET validator;
504     std::vector<VALIDATOR_ITEM> items;
505     bool found=false;
506     int retval;
507
508     // loop over entries that need to be checked
509     //
510     while (1) {
511         retval = validator.enumerate(
512             app.id, one_pass_N_WU?one_pass_N_WU:SELECT_LIMIT,
513             wu_id_modulus, wu_id_remainder,
514             items
515         );
516         if (retval) {
517             if (retval != ERR_DB_NOT_FOUND) {
518                 log_messages.printf(MSG_DEBUG,
519                     "DB connection lost, exiting\n"
520                 );
521                 exit(0);
522             }
523             break;
524         }
525         retval = handle_wu(validator, items);
526         if (!retval) found = true;
527     }
528     return found;
529 }
530
531 int main_loop() {
532     int retval;
533     bool did_something;
534     char buf[256];
535
536     retval = boinc_db.open(config.db_name, config.db_host, config.db_user, config.db_passwd);
537     if (retval) {
538         log_messages.printf(MSG_CRITICAL, "boinc_db.open failed: %d\n", retval);
539         exit(1);
540     }
541
542     sprintf(buf, "where name='%s'", app_name);
543     retval = app.lookup(buf);
544     if (retval) {
545         log_messages.printf(MSG_CRITICAL, "can't find app %s\n", app_name);
546         exit(1);
547     }
548
549     while (1) {
550         check_stop_daemons();
551         did_something = do_validate_scan();
552         if (!did_something) {
553             if (one_pass) break;
554 #ifdef GCL_SIMULATOR
555              char nameforsim[64];
556              sprintf(nameforsim, "validator%i", app.id);
557              continue_simulation(nameforsim);
558              signal(SIGUSR2, simulator_signal_handler);
559              pause();
560 #else
561             sleep(sleep_interval);
562 #endif
563         }
564     }
565     return 0;
566 }
567
// For use by user routines check_set() and check_pair() that link to
// this code. Holds the level given with -d (0 if the option is absent).
int boinc_validator_debuglevel=0;
571
572 int main(int argc, char** argv) {
573     int i, retval;
574
575 #if 0
576     int mypid=getpid();
577     char debugcmd[512];
578     sprintf(debugcmd, "ddd %s %d &", argv[0], mypid);
579     system(debugcmd);
580     sleep(30);
581 #endif
582
583     const char *usage = 
584       "\nUsage: %s -app <app-name> [OPTIONS]\n"
585       "Start validator for application <app-name>\n\n"
586       "Optional arguments:\n"
587       "  -one_pass_N_WU N       Validate at most N WUs, then exit\n"
588       "  -one_pass              Make one pass through WU table, then exit\n"
589       "  -mod n i               Process only WUs with (id mod n) == i\n"
590       "  -max_claimed_credit X  If a result claims more credit than this, mark it as invalid\n"
591       "  -max_granted_credit X  Grant no more than this amount of credit to a result\n"
592       "  -grant_claimed_credit  Grant the claimed credit, regardless of what other results for this workunit claimed\n"
593       "  -update_credited_job   Add record to credited_job table after granting credit\n"
594       "  -credit_from_wu        Credit is specified in WU XML\n"
595       "  -sleep_interval n      Set sleep-interval to n\n"
596       "  -d level               Set debug-level\n\n";
597
598     if ((argc > 1) && (!strcmp(argv[1], "-h") || !strcmp(argv[1], "--help"))) {
599       printf (usage, argv[0] );
600       exit(1);
601     }
602
603
604     check_stop_daemons();
605
606     for (i=1; i<argc; i++) {
607         if (!strcmp(argv[i], "-one_pass_N_WU")) {
608             one_pass_N_WU = atoi(argv[++i]);
609             one_pass = true;
610         } else if (!strcmp(argv[i], "-sleep_interval")) {
611             sleep_interval = atoi(argv[++i]);
612         } else if (!strcmp(argv[i], "-one_pass")) {
613             one_pass = true;
614         } else if (!strcmp(argv[i], "-app")) {
615             strcpy(app_name, argv[++i]);
616         } else if (!strcmp(argv[i], "-d")) {
617             boinc_validator_debuglevel=atoi(argv[++i]);
618             log_messages.set_debug_level(boinc_validator_debuglevel);
619         } else if (!strcmp(argv[i], "-mod")) {
620             wu_id_modulus = atoi(argv[++i]);
621             wu_id_remainder = atoi(argv[++i]);
622         } else if (!strcmp(argv[i], "-max_granted_credit")) {
623             max_granted_credit = atof(argv[++i]);
624         } else if (!strcmp(argv[i], "-max_claimed_credit")) {
625             max_claimed_credit = atof(argv[++i]);
626         } else if (!strcmp(argv[i], "-grant_claimed_credit")) {
627             grant_claimed_credit = true;
628         } else if (!strcmp(argv[i], "-update_credited_job")) {
629             update_credited_job = true;
630         } else if (!strcmp(argv[i], "-credit_from_wu")) {
631             credit_from_wu = true;
632         } else {
633             fprintf(stderr, "Invalid option '%s'\nTry `%s --help` for more information\n", argv[i], argv[0]);
634             log_messages.printf(MSG_CRITICAL, "unrecognized arg: %s\n", argv[i]);
635             exit(1);
636         }
637     }
638
639     // -app is required
640     if (app_name[0] == 0) {
641       fprintf (stderr, "\nERROR: use '-app' to specify the application to run the validator for.\n");
642       printf (usage, argv[0] );
643       exit(1);      
644     }
645
646     retval = config.parse_file("..");
647     if (retval) {
648         log_messages.printf(MSG_CRITICAL,
649             "Can't parse ../config.xml: %s\n", boincerror(retval)
650         );
651         exit(1);
652     }
653
654     log_messages.printf(MSG_NORMAL,
655         "Starting validator, debug level %d\n", log_messages.debug_level
656     );
657     if (wu_id_modulus) {
658         log_messages.printf(MSG_NORMAL,
659             "Modulus %d, remainder %d\n", wu_id_modulus, wu_id_remainder
660         );
661     }
662
663     install_stop_signal_handler();
664
665     main_loop();
666 }
667
// Version-control "$Id$" keyword string for this file.
const char *BOINC_RCSID_634dbda0b9 = "$Id: validator.cpp 16097 2008-09-30 18:21:41Z davea $";