1 // This file is part of BOINC.
2 // http://boinc.berkeley.edu
3 // Copyright (C) 2008 University of California
5 // BOINC is free software; you can redistribute it and/or modify it
6 // under the terms of the GNU Lesser General Public License
7 // as published by the Free Software Foundation,
8 // either version 3 of the License, or (at your option) any later version.
10 // BOINC is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
13 // See the GNU Lesser General Public License for more details.
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with BOINC. If not, see <http://www.gnu.org/licenses/>.
18 // validator - check and validate results, and grant credit
21 // [-one_pass_N_WU N] // Validate at most N WUs in one pass, then exit
22 // [-one_pass] // make one pass through WU table, then exit
23 // [-mod n i] // process only WUs with (id mod n) == i
24 // [-max_granted_credit X] // limit maximum granted credit to X
25 // [-max_claimed_credit Y] // invalid if claims more than Y
26 // [-grant_claimed_credit] // just grant whatever is claimed
27 // [-update_credited_job] // add a userid/workunit pair to the credited_job table (workunit id taken from the WU's opaque field)
28 // [-credit_from_wu] // get credit from WU XML
30 // This program must be linked with two project-specific functions:
31 // check_set() and check_pair().
32 // See doc/validate.php for a description.
48 #include "error_numbers.h"
50 #include "sched_config.h"
51 #include "sched_util.h"
52 #include "sched_msgs.h"
53 #include "validator.h"
54 #include "validate_util.h"
56 #include "gcl_simulator.h"
// Lock and PID file names. Neither is referenced elsewhere in this file as
// shown; presumably used by daemon start-up code — confirm.
59 #define LOCKFILE "validate.out"
60 #define PIDFILE "validate.pid"
// Default limit passed to validator.enumerate() in do_validate_scan() when
// -one_pass_N_WU is not given (or is 0).
62 #define SELECT_LIMIT 1000
// Default for sleep_interval, in seconds (it is passed to sleep()).
63 #define SLEEP_PERIOD 5
// Seconds the main loop passes to sleep() after a scan; the sleep follows the
// `if (!did_something)` test, presumably inside that branch — confirm.
// Overridable with -sleep_interval.
65 int sleep_interval = SLEEP_PERIOD;
// NOTE(review): only part of the project-supplied check_set() prototype is
// present; its opening `extern int check_set(` and its trailing parameter(s)
// are not in this file as shown. handle_wu() calls it as
// check_set(results, wu, canonicalid, credit, retry): on return `canonical`
// holds the id stored into wu.canonical_resultid, `credit` the credit to
// grant, and a true `retry` makes handle_wu() schedule a DELAYED transition.
// A nonzero return is logged by handle_wu() as "check_set returned %d, exiting".
75 vector<RESULT>&, WORKUNIT& wu, int& canonical, double& credit,
// Project-supplied comparison of a new result against the canonical result.
// Not called anywhere in this file as shown; presumably used for results that
// arrive after a canonical result exists — confirm. The meaning of `retry`
// is assumed to mirror check_set()'s ("try again later") — confirm.
78 extern int check_pair(
79 RESULT & new_result, RESULT & canonical_result, bool& retry
// -mod n i: with wu_id_modulus (declared outside this view), restricts
// processing to WUs with (id mod n) == i; both are passed to enumerate().
85 int wu_id_remainder=0;
// -one_pass: make one pass through the WU table, then exit.
87 bool one_pass = false;
// -max_granted_credit X: cap on credit granted to any result; 0 disables
// the cap (handle_wu() tests `max_granted_credit && ...`).
88 double max_granted_credit = 0;
// -max_claimed_credit Y: per the usage text, results claiming more than Y
// are invalid. Only its parsing in main() is visible here — confirm where
// it is enforced.
89 double max_claimed_credit = 0;
// -grant_claimed_credit: grant each valid result its own claimed_credit
// instead of the credit chosen for the WU.
90 bool grant_claimed_credit = false;
// -update_credited_job: have is_valid() insert a credited_job row after
// granting credit.
91 bool update_credited_job = false;
// -credit_from_wu: after check_set(), overwrite the WU's credit with the
// value returned by get_credit_from_wu().
92 bool credit_from_wu = false;
// True when this WU was issued with a single replica (wu.target_nresults == 1)
// while the application normally replicates (app.target_nresults > 1;
// `app` is a global declared outside this view).
// is_valid() and is_invalid() skip error-rate updates for such WUs.
95 bool is_unreplicated(WORKUNIT& wu) {
96 return (wu.target_nresults == 1 && app.target_nresults > 1);
// Update host.error_rate in memory for one validated result; callers
// (is_valid / is_invalid) write it to the database afterwards.
// Visible steps: decay by a factor of 0.95, add 0.1 (guarded by a condition
// on a line not present here, presumably `!valid` — confirm), then clamp.
99 void update_error_rate(DB_HOST& host, bool valid) {
101 host.error_rate *= 0.95;
103 host.error_rate += 0.1;
// Keep the rate in (0, 1]: cap at 1, and reset a non-positive rate to 0.1
// rather than leaving it at 0.
105 if (host.error_rate > 1) host.error_rate = 1;
106 if (host.error_rate <= 0) host.error_rate = 0.1;
109 // Here when a result has been validated and its granted_credit has been set.
110 // Grant credit to host, user and team, and update host error rate.
//
// For the result's user, host and team: add result.granted_credit to
// total_credit and refresh the recent-average fields (expavg_credit,
// expavg_time, half-life CREDIT_HALF_LIFE). For the host also update
// avg_turnaround, credit_per_cpu_sec and, for replicated WUs only,
// error_rate. With -update_credited_job, insert a credited_job row.
// Each update_field() call receives an SQL SET-style clause, so presumably
// only the named columns are written — confirm.
// Returns: presumably 0 on success, or the failing DB call's error code (the
// return statements are not present here — confirm); handle_wu() logs a
// nonzero value as "is_valid() failed".
// NOTE(review): parameter order (result, wu) is the reverse of
// is_invalid(wu, result).
112 int is_valid(RESULT& result, WORKUNIT& wu) {
116 DB_CREDITED_JOB credited_job;
// The host that returned the result; its userid identifies the user credited.
120 retval = host.lookup_id(result.hostid);
122 log_messages.printf(MSG_CRITICAL,
123 "[RESULT#%d] lookup of host %d failed %d\n",
124 result.id, result.hostid, retval
129 retval = user.lookup_id(host.userid);
131 log_messages.printf(MSG_CRITICAL,
132 "[RESULT#%d] lookup of user %d failed %d\n",
133 result.id, host.userid, retval
// User credit: refresh the recent average in memory (presumably via the same
// update_average() call used for the team below), then write it.
// total_credit is incremented in SQL (total_credit=total_credit+X) rather
// than overwritten, presumably so concurrent increments are not lost.
139 result.sent_time, result.granted_credit, CREDIT_HALF_LIFE,
140 user.expavg_credit, user.expavg_time
143 buf, "total_credit=total_credit+%f, expavg_credit=%f, expavg_time=%f",
144 result.granted_credit, user.expavg_credit, user.expavg_time
146 retval = user.update_field(buf);
148 log_messages.printf(MSG_CRITICAL,
149 "[RESULT#%d] update of user %d failed %d\n",
150 result.id, host.userid, retval
// Host credit: same recent-average refresh for the host.
155 result.sent_time, result.granted_credit, CREDIT_HALF_LIFE,
156 host.expavg_credit, host.expavg_time
// Turnaround = time from sending to receiving the result; folded into
// host.avg_turnaround.
159 double turnaround = result.received_time - result.sent_time;
160 compute_avg_turnaround(host, turnaround);
163 // compute new credit per CPU time
// A nonzero return is reported as the result claiming too much credit for
// too little CPU time; the visible handling only logs it.
165 retval = update_credit_per_cpu_sec(
166 result.granted_credit, result.cpu_time, host.credit_per_cpu_sec
169 log_messages.printf(MSG_CRITICAL,
170 "[RESULT#%d][HOST#%d] claimed too much credit (%f) in too little CPU time (%f)\n",
171 result.id, result.hostid, result.granted_credit, result.cpu_time
// Error rate is adjusted only for replicated WUs; the previous value is kept
// for the debug log below.
175 double old_error_rate = host.error_rate;
176 if (!is_unreplicated(wu)) {
177 update_error_rate(host, true);
// Write every host field changed above in a single update.
181 "total_credit=total_credit+%f, expavg_credit=%f, expavg_time=%f, avg_turnaround=%f, credit_per_cpu_sec=%f, error_rate=%f",
182 result.granted_credit, host.expavg_credit, host.expavg_time, host.avg_turnaround, host.credit_per_cpu_sec, host.error_rate
184 retval = host.update_field(buf);
186 log_messages.printf(MSG_CRITICAL,
187 "[RESULT#%d] update of host %d failed %d\n",
188 result.id, result.hostid, retval
191 log_messages.printf(MSG_DEBUG,
192 "[HOST#%d] error rate %f->%f\n",
193 host.id, old_error_rate, host.error_rate
// Team credit: same total / recent-average update for the user's team.
197 retval = team.lookup_id(user.teamid);
199 log_messages.printf(MSG_CRITICAL,
200 "[RESULT#%d] lookup of team %d failed %d\n",
201 result.id, user.teamid, retval
205 update_average(result.sent_time, result.granted_credit, CREDIT_HALF_LIFE, team.expavg_credit, team.expavg_time);
207 buf, "total_credit=total_credit+%f, expavg_credit=%f, expavg_time=%f",
208 result.granted_credit, team.expavg_credit, team.expavg_time
210 retval = team.update_field(buf);
212 log_messages.printf(MSG_CRITICAL,
213 "[RESULT#%d] update of team %d failed %d\n",
214 result.id, team.id, retval
// Optional (-update_credited_job): record that this user was credited for
// this job. NOTE(review): workunitid comes from long(wu.opaque), i.e. the
// WU's opaque value truncated toward zero, not from wu.id. An insert failure
// is logged as a warning.
219 if (update_credited_job) {
220 credited_job.userid = user.id;
221 credited_job.workunitid = long(wu.opaque);
222 retval = credited_job.insert();
224 log_messages.printf(MSG_CRITICAL,
225 "[RESULT#%d] Warning: credited_job insert failed (userid: %d workunit: %f err: %d)\n",
226 result.id, user.id, wu.opaque, retval
229 log_messages.printf(MSG_DEBUG,
230 "[RESULT#%d %s] added credited_job record [WU#%d OPAQUE#%f USER#%d]\n",
231 result.id, result.name, wu.id, wu.opaque, user.id
// Called by handle_wu() for a result in VALIDATE_STATE_INVALID.
// For replicated WUs, calls update_error_rate(host, false); then writes
// host.error_rate back with update_field(). The visible code grants no
// credit and changes no other host field.
// Returns: presumably 0 or the failing DB call's error code (return
// statements not present here — confirm). handle_wu() ignores the result.
// NOTE(review): parameter order (wu, result) is the reverse of
// is_valid(result, wu).
239 int is_invalid(WORKUNIT& wu, RESULT& result) {
244 retval = host.lookup_id(result.hostid);
246 log_messages.printf(MSG_CRITICAL,
247 "[RESULT#%d] lookup of host %d failed %d\n",
248 result.id, result.hostid, retval
// Previous rate, kept for the debug log below.
252 double old_error_rate = host.error_rate;
253 if (!is_unreplicated(wu)) {
254 update_error_rate(host, false);
256 sprintf(buf, "error_rate=%f", host.error_rate);
257 retval = host.update_field(buf);
259 log_messages.printf(MSG_CRITICAL,
260 "[RESULT#%d] update of host %d failed %d\n",
261 result.id, result.hostid, retval
265 log_messages.printf(MSG_DEBUG,
266 "[HOST#%d] invalid result; error rate %f->%f\n",
267 host.id, old_error_rate, host.error_rate
272 // Return zero iff we resolved the WU
//
// Validate one workunit. `items` holds what validator.enumerate() returned
// for it; the WU is taken from items[0].wu (presumably every item carries the
// same WU — confirm). Results that are OVER with outcome SUCCESS go to
// check_set() once at least wu.min_quorum of them exist. Each result is then
// written back as valid (credit granted), invalid, or inconclusive. If a
// canonical result was found, unsent results are marked DIDNT_NEED. Finally
// the WU's transition_time is adjusted, need_validate is cleared, and the WU
// row is written.
275 DB_VALIDATOR_ITEM_SET& validator, std::vector<VALIDATOR_ITEM>& items
// NOTE(review): canonical_result_index is not used anywhere it is visible.
277 int canonical_result_index = -1;
278 bool update_result, retry;
// How the transitioner should be rescheduled for this WU; applied at the end.
279 TRANSITION_TIME transition_time = NO_CHANGE;
// NOTE(review): `x` is an int but receives time(0) + 6*3600 (a time_t
// value). This narrows and will overflow around 2038; consider time_t or
// the type of wu.transition_time.
280 int retval = 0, canonicalid = 0, x;
284 WORKUNIT& wu = items[0].wu;
286 vector<RESULT> results;
287 int nsuccess_results;
289 // Here if WU doesn't have a canonical result yet.
292 log_messages.printf(MSG_NORMAL,
293 "[WU#%d %s] handle_wu(): No canonical result yet\n",
298 // make a vector of only successful results
300 for (i=0; i<items.size(); i++) {
301 RESULT& result = items[i].res;
303 if ((result.server_state == RESULT_SERVER_STATE_OVER) &&
304 (result.outcome == RESULT_OUTCOME_SUCCESS)
306 results.push_back(result);
311 log_messages.printf(MSG_DEBUG,
312 "[WU#%d %s] Found %d successful results\n",
313 wu.id, wu.name, (int)results.size()
// Only try to validate once a quorum of successful results exists.
315 if (results.size() >= (unsigned int)wu.min_quorum) {
316 log_messages.printf(MSG_DEBUG,
317 "[WU#%d %s] Enough for quorum, checking set.\n",
// Project hook: sets each result's validate_state/outcome, and may pick a
// canonical result (canonicalid) and the credit to grant.
321 retval = check_set(results, wu, canonicalid, credit, retry);
323 log_messages.printf(MSG_CRITICAL,
324 "[WU#%d %s] check_set returned %d, exiting\n",
325 wu.id, wu.name, retval
// check_set() asked to be retried later.
329 if (retry) transition_time = DELAYED;
// -credit_from_wu: replace check_set()'s credit with the WU-specified value.
331 if (credit_from_wu) {
332 retval = get_credit_from_wu(wu, results, credit);
334 log_messages.printf(MSG_CRITICAL,
335 "[WU#%d %s] get_credit_from_wu returned %d\n",
336 wu.id, wu.name, retval
// Apply -max_granted_credit (0 means no cap).
341 if (max_granted_credit && credit>max_granted_credit) {
342 credit = max_granted_credit;
346 // update as needed, and count the # of results
347 // that are still outcome=SUCCESS
348 // (some may have changed to VALIDATE_ERROR)
350 nsuccess_results = 0;
351 for (i=0; i<results.size(); i++) {
352 update_result = false;
353 RESULT& result = results[i];
// A validate error needs the transitioner's attention right away.
354 if (result.outcome == RESULT_OUTCOME_VALIDATE_ERROR) {
355 transition_time = IMMEDIATE;
356 update_result = true;
361 switch (result.validate_state) {
362 case VALIDATE_STATE_VALID:
363 // grant credit for valid results
365 update_result = true;
// Grant only if no credit has been granted yet: either the result's own
// claim (-grant_claimed_credit) or the WU credit, capped by
// -max_granted_credit.
366 if (result.granted_credit == 0) {
367 result.granted_credit = grant_claimed_credit ? result.claimed_credit : credit;
368 if (max_granted_credit && result.granted_credit > max_granted_credit) {
369 result.granted_credit = max_granted_credit;
372 retval = is_valid(result, wu);
374 log_messages.printf(MSG_DEBUG,
375 "[RESULT#%d %s] is_valid() failed: %d\n",
376 result.id, result.name, retval
379 log_messages.printf(MSG_NORMAL,
380 "[RESULT#%d %s] Valid; granted %f credit [HOST#%d]\n",
381 result.id, result.name, result.granted_credit,
385 case VALIDATE_STATE_INVALID:
386 log_messages.printf(MSG_NORMAL,
387 "[RESULT#%d %s] Invalid [HOST#%d]\n",
388 result.id, result.name, result.hostid
// Adjusts the host's error rate; its return value is not checked.
390 is_invalid(wu, result);
391 update_result = true;
// check_set() left the result undecided: record it as inconclusive.
393 case VALIDATE_STATE_INIT:
394 log_messages.printf(MSG_NORMAL,
395 "[RESULT#%d %s] Inconclusive [HOST#%d]\n",
396 result.id, result.name, result.hostid
398 result.validate_state = VALIDATE_STATE_INCONCLUSIVE;
399 update_result = true;
// Write the result back (presumably only when update_result is set — confirm).
404 retval = validator.update_result(result);
406 log_messages.printf(MSG_CRITICAL,
407 "[RESULT#%d %s] result.update() failed: %d\n",
408 result.id, result.name, retval
414 // if we found a canonical result,
415 // trigger the assimilator, but do NOT trigger
416 // the transitioner - doing so creates a race condition
418 transition_time = NEVER;
419 log_messages.printf(MSG_DEBUG,
420 "[WU#%d %s] Found a canonical result: id=%d\n",
421 wu.id, wu.name, canonicalid
423 wu.canonical_resultid = canonicalid;
424 wu.canonical_credit = credit;
425 wu.assimilate_state = ASSIMILATE_READY;
427 // If found a canonical result, don't send any unsent results
429 for (i=0; i<items.size(); i++) {
430 RESULT& result = items[i].res;
// Only UNSENT results are touched; the others are presumably skipped here.
432 if (result.server_state != RESULT_SERVER_STATE_UNSENT) {
436 result.server_state = RESULT_SERVER_STATE_OVER;
437 result.outcome = RESULT_OUTCOME_DIDNT_NEED;
438 retval = validator.update_result(result);
440 log_messages.printf(MSG_CRITICAL,
441 "[RESULT#%d %s] result.update() failed: %d\n",
442 result.id, result.name, retval
447 // here if no consensus.
449 // check if #success results is too large
451 if (nsuccess_results > wu.max_success_results) {
452 wu.error_mask |= WU_ERROR_TOO_MANY_SUCCESS_RESULTS;
453 transition_time = IMMEDIATE;
456 // if #success results >= target_nresults,
457 // we need more results, so bump target_nresults
458 // NOTE: nsuccess_results should never be > target_nresults,
459 // but accommodate that if it should happen
461 if (nsuccess_results >= wu.target_nresults) {
462 wu.target_nresults = nsuccess_results+1;
463 transition_time = IMMEDIATE;
// Apply transition_time. The case labels are not present here; presumably:
// IMMEDIATE -> now; DELAYED -> no later than 6 hours from now (only ever
// moves transition_time earlier); NEVER -> INT_MAX. Confirm.
471 switch (transition_time) {
473 wu.transition_time = time(0);
476 x = time(0) + 6*3600;
477 if (x < wu.transition_time) wu.transition_time = x;
480 wu.transition_time = INT_MAX;
// This WU has been handled; clear the flag that queued it for validation.
486 wu.need_validate = 0;
488 retval = validator.update_workunit(wu);
490 log_messages.printf(MSG_CRITICAL,
491 "[WU#%d %s] update_workunit() failed: %d; exiting\n",
492 wu.id, wu.name, retval
499 // make one pass through the workunits with need_validate set.
500 // return true if there were any
// More precisely: returns true if at least one handle_wu() call returned 0,
// i.e. resolved its WU.
502 bool do_validate_scan() {
503 DB_VALIDATOR_ITEM_SET validator;
504 std::vector<VALIDATOR_ITEM> items;
508 // loop over entries that need to be checked
// Enumerate work for this app, limited to one_pass_N_WU if that option is
// nonzero and to SELECT_LIMIT otherwise, restricted to WUs matching the
// -mod modulus/remainder.
511 retval = validator.enumerate(
512 app.id, one_pass_N_WU?one_pass_N_WU:SELECT_LIMIT,
513 wu_id_modulus, wu_id_remainder,
// ERR_DB_NOT_FOUND presumably means the enumeration is exhausted; any other
// error is treated as a lost DB connection.
// NOTE(review): this fatal condition is logged at MSG_DEBUG, so it may be
// suppressed at the default debug level; MSG_CRITICAL looks more appropriate.
517 if (retval != ERR_DB_NOT_FOUND) {
518 log_messages.printf(MSG_DEBUG,
519 "DB connection lost, exiting\n"
525 retval = handle_wu(validator, items);
526 if (!retval) found = true;
// Body of the validator's main loop (the enclosing function header is not
// present in this view — confirm): open the project database with the
// credentials from config, look up the application named by -app, then scan
// repeatedly.
536 retval = boinc_db.open(config.db_name, config.db_host, config.db_user, config.db_passwd);
538 log_messages.printf(MSG_CRITICAL, "boinc_db.open failed: %d\n", retval);
// NOTE(review): app_name comes straight from the command line and is spliced
// unescaped into the SQL WHERE clause; the sprintf into buf is also
// unbounded. Consider escaping the name and using snprintf.
542 sprintf(buf, "where name='%s'", app_name);
543 retval = app.lookup(buf);
545 log_messages.printf(MSG_CRITICAL, "can't find app %s\n", app_name);
// Each iteration: check for a stop request, scan once, and idle if nothing
// was resolved. The simulator calls below come from gcl_simulator.h;
// presumably they are compiled only in simulator builds — confirm.
550 check_stop_daemons();
551 did_something = do_validate_scan();
552 if (!did_something) {
556 sprintf(nameforsim, "validator%i", app.id);
557 continue_simulation(nameforsim);
558 signal(SIGUSR2, simulator_signal_handler);
561 sleep(sleep_interval);
568 // For use by user routines check_set() and check_pair() that link to
// this code. Set by the -d option in main(), which also passes the same
// value to log_messages.set_debug_level().
570 int boinc_validator_debuglevel=0;
// Validator entry point: parse command-line options, require -app, load the
// project configuration from the parent directory (../config.xml), log the
// start-up settings and install the stop-signal handler.
572 int main(int argc, char** argv) {
// Debugging aid: builds a shell command that starts the ddd debugger on this
// executable and process id; where it is run is not shown — confirm.
// NOTE(review): unbounded sprintf; argv[0] length is not checked.
578 sprintf(debugcmd, "ddd %s %d &", argv[0], mypid);
584 "\nUsage: %s -app <app-name> [OPTIONS]\n"
585 "Start validator for application <app-name>\n\n"
586 "Optional arguments:\n"
587 " -one_pass_N_WU N Validate at most N WUs, then exit\n"
588 " -one_pass Make one pass through WU table, then exit\n"
589 " -mod n i Process only WUs with (id mod n) == i\n"
590 " -max_claimed_credit X If a result claims more credit than this, mark it as invalid\n"
591 " -max_granted_credit X Grant no more than this amount of credit to a result\n"
592 " -grant_claimed_credit Grant the claimed credit, regardless of what other results for this workunit claimed\n"
593 " -update_credited_job Add record to credited_job table after granting credit\n"
594 " -credit_from_wu Credit is specified in WU XML\n"
595 " -sleep_interval n Set sleep-interval to n\n"
596 " -d level Set debug-level\n\n";
// -h / --help as the first argument prints usage.
598 if ((argc > 1) && (!strcmp(argv[1], "-h") || !strcmp(argv[1], "--help"))) {
599 printf (usage, argv[0] );
604 check_stop_daemons();
// NOTE(review): options that take a value read argv[++i] without checking
// that i+1 < argc (-mod reads two values). If such an option is last,
// argv[argc] (a null pointer) reaches atoi()/atof()/strcpy(). The strcpy
// into app_name is also unbounded.
606 for (i=1; i<argc; i++) {
607 if (!strcmp(argv[i], "-one_pass_N_WU")) {
608 one_pass_N_WU = atoi(argv[++i]);
610 } else if (!strcmp(argv[i], "-sleep_interval")) {
611 sleep_interval = atoi(argv[++i]);
612 } else if (!strcmp(argv[i], "-one_pass")) {
614 } else if (!strcmp(argv[i], "-app")) {
615 strcpy(app_name, argv[++i]);
616 } else if (!strcmp(argv[i], "-d")) {
617 boinc_validator_debuglevel=atoi(argv[++i]);
618 log_messages.set_debug_level(boinc_validator_debuglevel);
619 } else if (!strcmp(argv[i], "-mod")) {
620 wu_id_modulus = atoi(argv[++i]);
621 wu_id_remainder = atoi(argv[++i]);
622 } else if (!strcmp(argv[i], "-max_granted_credit")) {
623 max_granted_credit = atof(argv[++i]);
624 } else if (!strcmp(argv[i], "-max_claimed_credit")) {
625 max_claimed_credit = atof(argv[++i]);
626 } else if (!strcmp(argv[i], "-grant_claimed_credit")) {
627 grant_claimed_credit = true;
628 } else if (!strcmp(argv[i], "-update_credited_job")) {
629 update_credited_job = true;
630 } else if (!strcmp(argv[i], "-credit_from_wu")) {
631 credit_from_wu = true;
633 fprintf(stderr, "Invalid option '%s'\nTry `%s --help` for more information\n", argv[i], argv[0]);
634 log_messages.printf(MSG_CRITICAL, "unrecognized arg: %s\n", argv[i]);
// -app is mandatory.
640 if (app_name[0] == 0) {
641 fprintf (stderr, "\nERROR: use '-app' to specify the application to run the validator for.\n");
642 printf (usage, argv[0] );
// Project configuration (DB credentials etc.) is read from "..".
646 retval = config.parse_file("..");
648 log_messages.printf(MSG_CRITICAL,
649 "Can't parse ../config.xml: %s\n", boincerror(retval)
654 log_messages.printf(MSG_NORMAL,
655 "Starting validator, debug level %d\n", log_messages.debug_level
658 log_messages.printf(MSG_NORMAL,
659 "Modulus %d, remainder %d\n", wu_id_modulus, wu_id_remainder
663 install_stop_signal_handler();
// Version-control ($Id$ keyword) identification string for this source file.
668 const char *BOINC_RCSID_634dbda0b9 = "$Id: validator.cpp 16097 2008-09-30 18:21:41Z davea $";