refactor(util/task): do not propagate errors

This commit is contained in:
Gabriel Ebner 2017-02-24 14:48:22 +01:00
parent 1182d8e7f7
commit 3eba8d3ffc
12 changed files with 100 additions and 81 deletions

View file

@ -615,8 +615,8 @@ static expr elaborate_proof(
auto_reporting_info_manager_scope scope4(file_name, use_info_manager);
try {
bool recover_froerrors = true;
elaborator elab(decl_env, opts, local_pp_name(fn), mctx, lctx, recover_froerrors);
bool recover_from_errors = true;
elaborator elab(decl_env, opts, local_pp_name(fn), mctx, lctx, recover_from_errors);
expr val, type;
// TODO(Leo): create an aux function for retrieving this info
@ -648,7 +648,7 @@ static expr elaborate_proof(
ERROR);
error_msg.set_exception(ex);
error_msg.report();
throw ex;
return mk_sorry(final_type);
}
}
@ -668,8 +668,8 @@ static void check_example(environment const & decl_env, options const & opts,
name decl_name = "_example";
try {
bool recover_froerrors = true;
elaborator elab(decl_env, opts, decl_name, mctx, lctx, recover_froerrors);
bool recover_from_errors = true;
elaborator elab(decl_env, opts, decl_name, mctx, lctx, recover_from_errors);
expr val, type;
std::tie(val, type) = elab.elaborate_with_type(val0, mlocal_type(fn));

View file

@ -5,6 +5,7 @@ Released under Apache 2.0 license as described in the file LICENSE.
Author: Gabriel Ebner
*/
#include "library/library_task_builder.h"
#include "library/message_builder.h"
namespace lean {
@ -30,4 +31,25 @@ std::unique_ptr<gtask_imp> library_scopes::operator()(std::unique_ptr<gtask_imp>
std::forward<std::unique_ptr<gtask_imp>>(base), m_lt));
}
struct exception_reporter_imp : public delegating_task_imp {
exception_reporter_imp(std::unique_ptr<gtask_imp> && base) :
delegating_task_imp(std::forward<std::unique_ptr<gtask_imp>>(base)) {}
void execute(void * result) override {
try {
delegating_task_imp::execute(result);
} catch (std::exception & ex) {
report_message(message(logtree().get_location().m_file_name,
logtree().get_location().m_range.m_begin,
ERROR, ex.what()));
throw;
}
}
};
std::unique_ptr<gtask_imp> exception_reporter::operator()(std::unique_ptr<gtask_imp> && base) {
return std::unique_ptr<gtask_imp>(new exception_reporter_imp(
std::forward<std::unique_ptr<gtask_imp>>(base)));
}
}

View file

@ -20,6 +20,10 @@ struct library_scopes {
std::unique_ptr<gtask_imp> operator()(std::unique_ptr<gtask_imp> &&);
};
struct exception_reporter {
std::unique_ptr<gtask_imp> operator()(std::unique_ptr<gtask_imp> &&);
};
template <class Res>
task<Res> add_library_task(task_builder<Res> && builder, std::string const & description, bool add_producer = true) {
auto lt = logtree().mk_child({}, description, logtree().get_location());

View file

@ -413,16 +413,25 @@ environment add(environment const & env, certified_declaration const & d) {
new_env = mark_noncomputable(new_env, _d.get_name());
new_env = update_module_defs(new_env, _d);
new_env = add(new_env, std::make_shared<decl_modification>(_d, env.trust_lvl()));
pos_info pos { g_curr_line, g_curr_column };
add_library_task(task_builder<unit>([_d, pos] {
if (_d.is_theorem()) {
// report errors from kernel type-checker
add_library_task(task_builder<unit>([_d] { _d.get_value(); return unit(); })
.wrap(exception_reporter())
.depends_on(_d.is_theorem() ? _d.get_value_task() : nullptr));
}
add_library_task(task_builder<unit>([_d] {
if (has_sorry(_d)) {
if (optional<name> n = should_report_sorry(_d.get_name())) {
report_message(message(logtree().get_location().m_file_name, pos, WARNING,
report_message(message(logtree().get_location().m_file_name,
logtree().get_location().m_range.m_begin, WARNING,
(sstream() << "declaration '" << *n << "' uses sorry").str()));
}
}
return unit {};
}).depends_on(_d.is_theorem() ? _d.get_value_task() : nullptr));
return add_decl_pos_info(new_env, _d.get_name());
}

View file

@ -110,57 +110,34 @@ void mt_task_queue::spawn_worker() {
}
void mt_task_queue::handle_finished(gtask const & t) {
lean_assert(get_state(t).load() > task_state::Running);
lean_assert(get_data(t));
lean_assert(get_data(t)->m_sched_info);
m_waiting.erase(t);
get_sched_info(t).m_has_finished.notify_all();
if (get_state(t).load() == task_state::Success) {
for (auto & rdep : get_sched_info(t).m_reverse_deps) {
switch (get_state(rdep).load()) {
case task_state::Waiting:
if (check_deps(rdep)) {
m_waiting.erase(rdep);
if (get_state(rdep).load() < task_state::Running) {
lean_assert(get_data(rdep));
if (get_data(rdep)->m_flags.m_needs_execution) {
enqueue(rdep);
} else {
get_state(rdep) = task_state::Success;
handle_finished(rdep);
}
for (auto & rdep : get_sched_info(t).m_reverse_deps) {
switch (get_state(rdep).load()) {
case task_state::Waiting: case task_state::Queued:
if (check_deps(rdep)) {
m_waiting.erase(rdep);
if (get_state(rdep).load() < task_state::Running) {
lean_assert(get_data(rdep));
if (get_data(rdep)->m_flags.m_needs_execution) {
enqueue(rdep);
} else {
get_state(rdep) = task_state::Success;
handle_finished(rdep);
}
}
break;
case task_state::Failed: case task_state::Queued:
// TODO(gabriel): why???
m_waiting.erase(rdep);
break;
default:
lean_unreachable();
}
}
} else {
propagate_failure(t);
}
clear(t);
}
void mt_task_queue::propagate_failure(gtask const & t) {
lean_assert(get_state(t).load() == task_state::Failed);
m_waiting.erase(t);
if (get_data(t) && get_data(t)->m_sched_info) {
get_sched_info(t).m_has_finished.notify_all();
for (auto & rdep : get_sched_info(t).m_reverse_deps) {
if (get_data(rdep) && !get_data(rdep)->m_flags.m_propagate_errors_from_dependencies)
continue;
switch (get_state(rdep).load()) {
case task_state::Waiting: case task_state::Queued:
fail(rdep, t);
propagate_failure(rdep);
break;
default: break;
}
}
break;
case task_state::Failed:
// TODO(gabriel): removed failed tasks from reverse dependency lists?
m_waiting.erase(rdep);
break;
default: lean_unreachable();
}
}
@ -250,11 +227,6 @@ bool mt_task_queue::check_deps(gtask const & t) {
case task_state::Success:
break;
case task_state::Failed:
if (get_data(t)->m_flags.m_propagate_errors_from_dependencies) {
fail(t, dep);
propagate_failure(t);
return true;
}
break;
default: lean_unreachable();
}
@ -268,7 +240,6 @@ void mt_task_queue::wait_for_finish(gtask const & t) {
submit_core(t, get_default_prio());
while (get_state(t).load() <= task_state::Running) {
if (g_current_task) {
// std::cerr << "waiting: " << g_current_task->description() << " -> " << t.description() << std::endl;
scoped_add<int> inc_required(m_required_workers, +1);
if (m_sleeping_workers == 0) {
spawn_worker();
@ -293,7 +264,7 @@ void mt_task_queue::cancel_core(gtask const & t) {
m_waiting.erase(t);
case task_state::Created: case task_state::Queued:
fail(t, std::make_exception_ptr(cancellation_exception()));
propagate_failure(t);
handle_finished(t);
return;
default: return;
}

View file

@ -41,7 +41,6 @@ class mt_task_queue : public task_queue {
void enqueue(gtask const &);
bool check_deps(gtask const &);
void propagate_failure(gtask const &);
void submit_core(gtask const &, unsigned);
void bump_prio(gtask const &, unsigned);
void cancel_core(gtask const &);

View file

@ -81,7 +81,7 @@ vm_obj vm_task_map(vm_obj const &, vm_obj const &, vm_obj const & fn, vm_obj con
vm_state state(env, opts);
scope_vm_state scope_vm(state);
return ts_vm_obj(state.invoke(ts_fn.to_vm_obj(), arg.to_vm_obj()));
})));
}).wrap(exception_reporter())));
}
vm_obj vm_task_flatten(vm_obj const &, vm_obj const & o) {

View file

@ -502,7 +502,6 @@ void server::handle_complete(cmd_req const & req) {
auto complete_gen_task =
task_builder<json>([=] { return autocomplete(mod_info, skip_completions, pos); })
.set_cancellation_token(m_bg_task_ctok)
// .wrap(library_scopes())
.build();
taskq().submit(task_builder<unit>([this, req, complete_gen_task] {
@ -512,7 +511,7 @@ void server::handle_complete(cmd_req const & req) {
send_msg(cmd_res(req.m_seq_num, std::string(ex.what())));
}
return unit{};
}).depends_on(complete_gen_task).ignore_dependency_errors().build());
}).depends_on(complete_gen_task).build());
}
static void get_info_managers(log_tree::node const & n, std::vector<info_manager> & infoms) {
@ -564,7 +563,6 @@ void server::handle_info(server::cmd_req const & req) {
mod_info->m_result, [] (buffer<gtask> & deps, module_info::parse_result const & res) {
deps.push_back(res.m_loaded_module->m_env);
})).set_cancellation_token(m_bg_task_ctok)
// .wrap(library_scopes())
.build();
taskq().submit(task_builder<unit>([this, req, info_gen_task] {
@ -574,7 +572,7 @@ void server::handle_info(server::cmd_req const & req) {
send_msg(cmd_res(req.m_seq_num, std::string(ex.what())));
}
return unit{};
}).depends_on(info_gen_task).ignore_dependency_errors().build());
}).depends_on(info_gen_task).build());
}
std::tuple<std::string, module_src, time_t> server::load_module(module_id const & id, bool can_use_olean) {

View file

@ -167,6 +167,8 @@ void log_tree::node::print() const {
}
void log_tree::node::print_to(std::ostream & out, unsigned indent) const {
indent += 2;
auto l = lock();
auto begin = m_ptr->m_location.m_range.m_begin, end = m_ptr->m_location.m_range.m_end;
out << m_ptr->m_location.m_file_name
@ -177,11 +179,27 @@ void log_tree::node::print_to(std::ostream & out, unsigned indent) const {
<< std::hex << reinterpret_cast<uintptr_t>(m_ptr->m_producer.get()) << std::dec
<< ")" << std::endl;
if (auto prod = m_ptr->m_producer) {
if (auto ex = prod->peek_exception()) {
for (unsigned i = 0; i < indent; i++) out << ' ';
out << "producer threw exception: ";
try {
std::rethrow_exception(ex);
} catch (std::exception & ex) {
out << ex.what();
} catch (cancellation_exception) {
out << "<cancelled>";
} catch (...) {
out << "<unknown exception>";
}
out << "\n";
}
}
auto children = m_ptr->m_children;
auto used_names = m_ptr->m_used_names;
l.unlock();
indent += 2;
children.for_each([&] (name const & n, log_tree::node const & c) {
for (unsigned i = 0; i < indent; i++) out << ' ';
out << n;

View file

@ -19,6 +19,14 @@ void gtask_cell::cancel(std::shared_ptr<cancellable> const & self) {
}
}
std::exception_ptr gtask_cell::peek_exception() const {
if (m_state.load() == task_state::Failed) {
return m_exception;
} else {
return {};
}
}
void task_queue::wait_for_success(gtask const & t) {
while (true) {
switch (t->m_state.load()) {
@ -42,11 +50,7 @@ void task_queue::execute(gtask const & t) {
{
buffer<gtask> deps;
t->m_data->m_imp->get_dependencies(deps);
if (t->m_data->m_flags.m_propagate_errors_from_dependencies) {
for (auto & dep : deps) if (dep) wait_for_success(dep);
} else {
for (auto & dep : deps) if (dep) wait_for_finish(dep);
}
for (auto & dep : deps) if (dep) wait_for_finish(dep);
}
t->execute();

View file

@ -25,7 +25,6 @@ struct scheduling_info {
struct task_flags {
bool m_do_priority_inversion = true;
bool m_propagate_errors_from_dependencies = true;
bool m_needs_separate_thread = true;
bool m_needs_execution = true;
};
@ -81,6 +80,7 @@ public:
void cancel(std::shared_ptr<cancellable> const & self) override;
bool peek_is_finished() const { return m_state.load() > task_state::Running; }
std::exception_ptr peek_exception() const;
virtual ~gtask_cell() {}
};

View file

@ -74,12 +74,6 @@ public:
return std::move(*this);
}
task_builder<Res> ignore_dependency_errors() {
lean_assert(m_imp);
m_flags.m_propagate_errors_from_dependencies = false;
return std::move(*this);
}
task_builder<Res> does_not_require_own_thread() {
lean_assert(m_imp);
m_flags.m_needs_separate_thread = false;