| 1 | #include <sstream> |
|---|---|
| 2 | |
| 3 | #include <Common/CurrentThread.h> |
| 4 | #include <Common/Exception.h> |
| 5 | #include <Common/ThreadProfileEvents.h> |
| 6 | #include <Common/TaskStatsInfoGetter.h> |
| 7 | #include <Common/QueryProfiler.h> |
| 8 | #include <Common/ThreadStatus.h> |
| 9 | |
| 10 | #include <Poco/Logger.h> |
| 11 | #include <common/getThreadNumber.h> |
| 12 | |
| 13 | |
| 14 | namespace DB |
| 15 | { |
| 16 | |
| 17 | |
| 18 | namespace ErrorCodes |
| 19 | { |
| 20 | extern const int LOGICAL_ERROR; |
| 21 | extern const int PTHREAD_ERROR; |
| 22 | } |
| 23 | |
| 24 | |
| 25 | thread_local ThreadStatus * current_thread = nullptr; |
| 26 | |
| 27 | |
| 28 | TasksStatsCounters TasksStatsCounters::current() |
| 29 | { |
| 30 | TasksStatsCounters res; |
| 31 | CurrentThread::get().taskstats_getter->getStat(res.stat, CurrentThread::get().os_thread_id); |
| 32 | return res; |
| 33 | } |
| 34 | |
| 35 | ThreadStatus::ThreadStatus() |
| 36 | { |
| 37 | thread_number = getThreadNumber(); |
| 38 | os_thread_id = TaskStatsInfoGetter::getCurrentTID(); |
| 39 | |
| 40 | last_rusage = std::make_unique<RUsageCounters>(); |
| 41 | last_taskstats = std::make_unique<TasksStatsCounters>(); |
| 42 | |
| 43 | memory_tracker.setDescription("(for thread)"); |
| 44 | log = &Poco::Logger::get("ThreadStatus"); |
| 45 | |
| 46 | current_thread = this; |
| 47 | |
| 48 | /// NOTE: It is important not to do any non-trivial actions (like updating ProfileEvents or logging) before ThreadStatus is created |
| 49 | /// Otherwise it could lead to SIGSEGV due to current_thread dereferencing |
| 50 | } |
| 51 | |
| 52 | ThreadStatus::~ThreadStatus() |
| 53 | { |
| 54 | try |
| 55 | { |
| 56 | if (untracked_memory > 0) |
| 57 | memory_tracker.alloc(untracked_memory); |
| 58 | else |
| 59 | memory_tracker.free(-untracked_memory); |
| 60 | } |
| 61 | catch (const DB::Exception &) |
| 62 | { |
| 63 | /// It's a minor tracked memory leak here (not the memory itself but it's counter). |
| 64 | /// We've already allocated a little bit more then the limit and cannot track it in the thread memory tracker or its parent. |
| 65 | } |
| 66 | |
| 67 | if (deleter) |
| 68 | deleter(); |
| 69 | current_thread = nullptr; |
| 70 | } |
| 71 | |
| 72 | void ThreadStatus::initPerformanceCounters() |
| 73 | { |
| 74 | performance_counters_finalized = false; |
| 75 | |
| 76 | /// Clear stats from previous query if a new query is started |
| 77 | /// TODO: make separate query_thread_performance_counters and thread_performance_counters |
| 78 | performance_counters.resetCounters(); |
| 79 | memory_tracker.resetCounters(); |
| 80 | memory_tracker.setDescription("(for thread)"); |
| 81 | |
| 82 | query_start_time_nanoseconds = getCurrentTimeNanoseconds(); |
| 83 | query_start_time = time(nullptr); |
| 84 | ++queries_started; |
| 85 | |
| 86 | *last_rusage = RUsageCounters::current(query_start_time_nanoseconds); |
| 87 | |
| 88 | try |
| 89 | { |
| 90 | if (TaskStatsInfoGetter::checkPermissions()) |
| 91 | { |
| 92 | if (!taskstats_getter) |
| 93 | taskstats_getter = std::make_unique<TaskStatsInfoGetter>(); |
| 94 | |
| 95 | *last_taskstats = TasksStatsCounters::current(); |
| 96 | } |
| 97 | } |
| 98 | catch (...) |
| 99 | { |
| 100 | taskstats_getter.reset(); |
| 101 | tryLogCurrentException(__PRETTY_FUNCTION__); |
| 102 | } |
| 103 | } |
| 104 | |
| 105 | void ThreadStatus::updatePerformanceCounters() |
| 106 | { |
| 107 | try |
| 108 | { |
| 109 | RUsageCounters::updateProfileEvents(*last_rusage, performance_counters); |
| 110 | if (taskstats_getter) |
| 111 | TasksStatsCounters::updateProfileEvents(*last_taskstats, performance_counters); |
| 112 | } |
| 113 | catch (...) |
| 114 | { |
| 115 | tryLogCurrentException(log); |
| 116 | } |
| 117 | } |
| 118 | |
| 119 | void ThreadStatus::assertState(const std::initializer_list<int> & permitted_states, const char * description) |
| 120 | { |
| 121 | for (auto permitted_state : permitted_states) |
| 122 | { |
| 123 | if (getCurrentState() == permitted_state) |
| 124 | return; |
| 125 | } |
| 126 | |
| 127 | std::stringstream ss; |
| 128 | ss << "Unexpected thread state "<< getCurrentState(); |
| 129 | if (description) |
| 130 | ss << ": "<< description; |
| 131 | throw Exception(ss.str(), ErrorCodes::LOGICAL_ERROR); |
| 132 | } |
| 133 | |
| 134 | void ThreadStatus::attachInternalTextLogsQueue(const InternalTextLogsQueuePtr & logs_queue, |
| 135 | LogsLevel client_logs_level) |
| 136 | { |
| 137 | logs_queue_ptr = logs_queue; |
| 138 | |
| 139 | if (!thread_group) |
| 140 | return; |
| 141 | |
| 142 | std::lock_guard lock(thread_group->mutex); |
| 143 | thread_group->logs_queue_ptr = logs_queue; |
| 144 | thread_group->client_logs_level = client_logs_level; |
| 145 | } |
| 146 | |
| 147 | } |
| 148 |