| 1 | /* |
| 2 | Copyright (c) 2005-2019 Intel Corporation |
| 3 | |
| 4 | Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | you may not use this file except in compliance with the License. |
| 6 | You may obtain a copy of the License at |
| 7 | |
| 8 | http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | |
| 10 | Unless required by applicable law or agreed to in writing, software |
| 11 | distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | See the License for the specific language governing permissions and |
| 14 | limitations under the License. |
| 15 | */ |
| 16 | |
| 17 | #include "tbb/tbb_stddef.h" |
| 18 | #include "tbb/pipeline.h" |
| 19 | #include "tbb/spin_mutex.h" |
| 20 | #include "tbb/atomic.h" |
| 21 | #include <cstdlib> |
| 22 | #include <cstdio> |
| 23 | #include "harness.h" |
| 24 | |
| 25 | // In the test, variables related to token counting are declared |
| 26 | // as unsigned long to match definition of tbb::internal::Token. |
| 27 | |
//! Payload that circulates through the pipeline and tracks its own lifecycle.
struct Buffer {
    //! Sentinel meaning "field not currently assigned".
    static const unsigned long unused = ~0ul;
    //! Index of the stream item this buffer carries, or `unused`.
    unsigned long id;
    //! True while the buffer is travelling through the pipeline.
    bool is_busy;
    //! Sequence number stamped by the first ordered filter, or `unused`.
    unsigned long sequence_number;
    //! Start out idle with both bookkeeping fields unassigned.
    Buffer() {
        id = unused;
        is_busy = false;
        sequence_number = unused;
    }
};
| 37 | |
//! Periodically requests an "emulated wait for input" during serial input.
class waiting_probe {
    //! Number of times required() has been invoked.
    size_t call_count;
public:
    waiting_probe() : call_count(0) {}
    //! Returns true roughly once per 0x8000 calls; false otherwise.
    bool required( ) {
        ++call_count;
        const size_t mask = size_t(0x7FFF);
        return ((call_count+1) & mask) == 0;
    }
    void probe( ); // defined below
};
| 48 | |
//! Absolute upper bound on the stream length for a single run.
static const unsigned MaxStreamSize = 8000;
//! Per-thread cap on stream items, so total work scales with thread count.
static const unsigned MaxStreamItemsPerThread = 1000;
//! Maximum number of filters allowed
static const unsigned MaxFilters = 5;
//! Stream length for the current run; set inside TestTrivialPipeline.
static unsigned StreamSize;
//! Number of Buffer slots in InputFilter (also the max tokens in flight).
static const unsigned MaxBuffer = 8;
//! Done[f][i] records that filter f has processed stream item i.
static bool Done[MaxFilters][MaxStreamSize];
//! Shared probe that occasionally emulates waiting for input.
static waiting_probe WaitTest;
//! Times a serial_out_of_order filter actually received tokens out of order.
static unsigned out_of_order_count;
| 58 | |
| 59 | #include "harness_concurrency_tracker.h" |
| 60 | |
//! Test filter that verifies serial-entry, ordering, and single-processing invariants.
class BaseFilter: public tbb::filter {
    //! Row of the global Done matrix for this filter, indexed by Buffer::id.
    bool* const my_done;
    //! True for the pipeline's last filter, which must recycle the buffer.
    const bool my_is_last;
    //! Set while operator() runs; detects illegal concurrent entry to a serial stage.
    bool my_is_running;
public:
    //! Count of items that have entered this filter so far.
    tbb::atomic<tbb::internal::Token> current_token;
    BaseFilter( tbb::filter::mode type, bool done[], bool is_last ) :
        filter(type),
        my_done(done),
        my_is_last(is_last),
        my_is_running(false),
        current_token()
    {}
    //! Count the incoming item and view it as a Buffer.
    /** InputFilter overrides this to allocate a fresh buffer instead. */
    virtual Buffer* get_buffer( void* item ) {
        current_token++;
        return static_cast<Buffer*>(item);
    }
    void* operator()( void* item ) __TBB_override {
        Harness::ConcurrencyTracker ct;
        if( is_serial() )
            ASSERT( !my_is_running, "premature entry to serial stage" );
        my_is_running = true;
        Buffer* b = get_buffer(item);
        if( b ) {
            if( is_ordered() ) {
                // The first ordered filter stamps the sequence number;
                // subsequent ordered filters verify items arrive in that order.
                if( b->sequence_number == Buffer::unused )
                    b->sequence_number = current_token-1;
                else
                    ASSERT( b->sequence_number==current_token-1, "item arrived out of order" );
            } else if( is_serial() ) {
                // serial_out_of_order may legitimately reorder; just tally it so
                // TestMain can warn if reordering never happened.
                if( b->sequence_number != current_token-1 && b->sequence_number != Buffer::unused )
                    out_of_order_count++;
            }
            ASSERT( b->id < StreamSize, NULL );
            ASSERT( !my_done[b->id], "duplicate processing of token?" );
            ASSERT( b->is_busy, NULL );
            my_done[b->id] = true;
            if( my_is_last ) {
                // Recycle the buffer. The release store must be the final write so
                // InputFilter's acquire load sees the resets before reusing the slot.
                b->id = Buffer::unused;
                b->sequence_number = Buffer::unused;
                __TBB_store_with_release(b->is_busy, false);
            }
        }
        my_is_running = false;
        return b;
    }
};
| 108 | |
//! First filter of the pipeline: hands out buffers for up to StreamSize items.
class InputFilter: public BaseFilter {
    //! Protects current_token advancement and the buffer-claiming scan.
    tbb::spin_mutex input_lock;
    //! Pool of recyclable buffers; slots are claimed here and released by the last filter.
    Buffer buffer[MaxBuffer];
    //! Token quota the pipeline runs with; no more than this many buffers may be busy.
    const tbb::internal::Token my_number_of_tokens;
public:
    InputFilter( tbb::filter::mode type, tbb::internal::Token ntokens, bool done[], bool is_last ) :
        BaseFilter(type, done, is_last),
        my_number_of_tokens(ntokens)
    {}
    //! Claim a free buffer and label it with the next input index; NULL ends the stream.
    Buffer* get_buffer( void* ) __TBB_override {
        unsigned long next_input;
        unsigned free_buffer = 0;
        { // lock protected scope
            tbb::spin_mutex::scoped_lock lock(input_lock);
            if( current_token>=StreamSize )
                return NULL;
            next_input = current_token++;
            // once in a while, emulate waiting for input; this only makes sense for serial input
            if( is_serial() && WaitTest.required() )
                WaitTest.probe( );
            // Scan for the first non-busy slot; acquire pairs with the last
            // filter's release store when it recycles a buffer.
            while( free_buffer<MaxBuffer )
                if( __TBB_load_with_acquire(buffer[free_buffer].is_busy) )
                    ++free_buffer;
                else {
                    buffer[free_buffer].is_busy = true;
                    break;
                }
        }
        // With at most my_number_of_tokens items in flight, a free slot must exist
        // within the first my_number_of_tokens entries.
        ASSERT( free_buffer<my_number_of_tokens, "premature reuse of buffer" );
        Buffer* b = &buffer[free_buffer];
        ASSERT( &buffer[0] <= b, NULL );
        ASSERT( b <= &buffer[MaxBuffer-1], NULL );
        ASSERT( b->id == Buffer::unused, NULL);
        b->id = next_input;
        ASSERT( b->sequence_number == Buffer::unused, NULL);
        return b;
    }
};
| 147 | |
//! The struct below repeats layout of tbb::pipeline.
/** Used only to poke the private token_counter for wrap-around testing.
    Member order and types must exactly mirror the real class; the sizeof
    check in TestTrivialPipeline guards against layout drift. */
struct hacked_pipeline {
    tbb::filter* filter_list;
    tbb::filter* filter_end;
    tbb::empty_task* end_counter;
    tbb::atomic<tbb::internal::Token> input_tokens;
    tbb::atomic<tbb::internal::Token> token_counter;
    bool end_of_input;
    bool has_thread_bound_filters;

    // Virtual dtor keeps a vtable pointer so the layout matches the real class.
    virtual ~hacked_pipeline();
};
| 160 | |
//! The struct below repeats layout of tbb::internal::input_buffer.
/** Only low_token/high_token are poked by the test; everything else exists
    solely to keep offsets in sync with the real class. */
struct hacked_input_buffer {
    void* array; // This should be changed to task_info* if ever used
    void* my_sem; // This should be changed to semaphore* if ever used
    tbb::internal::Token array_size;
    tbb::internal::Token low_token;
    tbb::spin_mutex array_mutex;
    tbb::internal::Token high_token;
    bool is_ordered;
    bool is_bound;
};
| 172 | |
//! The struct below repeats layout of tbb::filter.
/** Lets the test reach my_input_buffer of a serial filter; the sizeof check
    in TestTrivialPipeline verifies the mirror is still accurate. */
struct hacked_filter {
    tbb::filter* next_filter_in_pipeline;
    hacked_input_buffer* my_input_buffer;
    unsigned char my_filter_mode;
    tbb::filter* prev_filter_in_pipeline;
    tbb::pipeline* my_pipeline;
    tbb::filter* next_segment;

    // Virtual dtor keeps a vtable pointer so the layout matches the real class.
    virtual ~hacked_filter();
};
| 184 | |
//! Whether layout-dependent (hacked_*) tests may run; disabled by TestMain
//! when the runtime is newer than the headers this test was built against.
bool do_hacking_tests = true;
//! Token counters are pre-set to ~tokens_before_wraparound so counters wrap
//! around after only this many tokens, exercising wrap-around immunity.
const tbb::internal::Token tokens_before_wraparound = 0xF;
| 187 | |
| 188 | void TestTrivialPipeline( unsigned nthread, unsigned number_of_filters ) { |
| 189 | // There are 3 filter types: parallel, serial_in_order and serial_out_of_order |
| 190 | static const tbb::filter::mode filter_table[] = { tbb::filter::parallel, tbb::filter::serial_in_order, tbb::filter::serial_out_of_order}; |
| 191 | const unsigned number_of_filter_types = sizeof(filter_table)/sizeof(filter_table[0]); |
| 192 | REMARK( "testing with %lu threads and %lu filters\n" , nthread, number_of_filters ); |
| 193 | ASSERT( number_of_filters<=MaxFilters, "too many filters" ); |
| 194 | ASSERT( sizeof(hacked_pipeline) == sizeof(tbb::pipeline), "layout changed for tbb::pipeline?" ); |
| 195 | ASSERT( sizeof(hacked_filter) == sizeof(tbb::filter), "layout changed for tbb::filter?" ); |
| 196 | tbb::internal::Token ntokens = nthread<MaxBuffer ? nthread : MaxBuffer; |
| 197 | // Count maximum iterations number |
| 198 | unsigned limit = 1; |
| 199 | for( unsigned i=0; i<number_of_filters; ++i) |
| 200 | limit *= number_of_filter_types; |
| 201 | // Iterate over possible filter sequences |
| 202 | for( unsigned numeral=0; numeral<limit; ++numeral ) { |
| 203 | // Build pipeline |
| 204 | tbb::pipeline pipeline; |
| 205 | if( do_hacking_tests ) { |
| 206 | // A private member of pipeline is hacked there for sake of testing wrap-around immunity. |
| 207 | tbb::internal::punned_cast<hacked_pipeline*>(&pipeline)->token_counter = ~tokens_before_wraparound; |
| 208 | } |
| 209 | tbb::filter* filter[MaxFilters]; |
| 210 | unsigned temp = numeral; |
| 211 | // parallelism_limit is the upper bound on the possible parallelism |
| 212 | unsigned parallelism_limit = 0; |
| 213 | for( unsigned i=0; i<number_of_filters; ++i, temp/=number_of_filter_types ) { |
| 214 | tbb::filter::mode filter_type = filter_table[temp%number_of_filter_types]; |
| 215 | const bool is_last = i==number_of_filters-1; |
| 216 | if( i==0 ) |
| 217 | filter[i] = new InputFilter(filter_type,ntokens,Done[i],is_last); |
| 218 | else |
| 219 | filter[i] = new BaseFilter(filter_type,Done[i],is_last); |
| 220 | pipeline.add_filter(*filter[i]); |
| 221 | // The ordered buffer of serial filters is hacked as well. |
| 222 | if ( filter[i]->is_serial() ) { |
| 223 | if( do_hacking_tests ) { |
| 224 | ((hacked_filter*)(void*)filter[i])->my_input_buffer->low_token = ~tokens_before_wraparound; |
| 225 | ((hacked_filter*)(void*)filter[i])->my_input_buffer->high_token = ~tokens_before_wraparound; |
| 226 | } |
| 227 | parallelism_limit += 1; |
| 228 | } else { |
| 229 | parallelism_limit = nthread; |
| 230 | } |
| 231 | } |
| 232 | // Account for clipping of parallelism. |
| 233 | if( parallelism_limit>nthread ) |
| 234 | parallelism_limit = nthread; |
| 235 | if( parallelism_limit>ntokens ) |
| 236 | parallelism_limit = (unsigned)ntokens; |
| 237 | Harness::ConcurrencyTracker::Reset(); |
| 238 | unsigned streamSizeLimit = min( MaxStreamSize, nthread * MaxStreamItemsPerThread ); |
| 239 | for( StreamSize=0; StreamSize<=streamSizeLimit; ) { |
| 240 | memset( Done, 0, sizeof(Done) ); |
| 241 | for( unsigned i=0; i<number_of_filters; ++i ) { |
| 242 | static_cast<BaseFilter*>(filter[i])->current_token=0; |
| 243 | } |
| 244 | pipeline.run( ntokens ); |
| 245 | ASSERT( !Harness::ConcurrencyTracker::InstantParallelism(), "filter still running?" ); |
| 246 | for( unsigned i=0; i<number_of_filters; ++i ) |
| 247 | ASSERT( static_cast<BaseFilter*>(filter[i])->current_token==StreamSize, NULL ); |
| 248 | for( unsigned i=0; i<MaxFilters; ++i ) |
| 249 | for( unsigned j=0; j<StreamSize; ++j ) { |
| 250 | ASSERT( Done[i][j]==(i<number_of_filters), NULL ); |
| 251 | } |
| 252 | if( StreamSize < min(nthread*8, 32u) ) { |
| 253 | ++StreamSize; |
| 254 | } else { |
| 255 | StreamSize = StreamSize*8/3; |
| 256 | } |
| 257 | } |
| 258 | if( Harness::ConcurrencyTracker::PeakParallelism() < parallelism_limit ) |
| 259 | REMARK( "nthread=%lu ntokens=%lu MaxParallelism=%lu parallelism_limit=%lu\n" , |
| 260 | nthread, ntokens, Harness::ConcurrencyTracker::PeakParallelism(), parallelism_limit ); |
| 261 | for( unsigned i=0; i < number_of_filters; ++i ) { |
| 262 | delete filter[i]; |
| 263 | filter[i] = NULL; |
| 264 | } |
| 265 | pipeline.clear(); |
| 266 | } |
| 267 | } |
| 268 | |
| 269 | #include "harness_cpu.h" |
| 270 | |
//! Current thread count; global because waiting_probe::probe and
//! TestCPUUserTime need it outside TestMain's loop.
static int nthread; // knowing number of threads is necessary to call TestCPUUserTime
| 272 | |
| 273 | void waiting_probe::probe( ) { |
| 274 | if( nthread==1 ) return; |
| 275 | REMARK("emulating wait for input\n" ); |
| 276 | // Test that threads sleep while no work. |
| 277 | // The master doesn't sleep so there could be 2 active threads if a worker is waiting for input |
| 278 | TestCPUUserTime(nthread, 2); |
| 279 | } |
| 280 | |
| 281 | #include "tbb/task_scheduler_init.h" |
| 282 | |
| 283 | int TestMain () { |
| 284 | out_of_order_count = 0; |
| 285 | if( MinThread<1 ) { |
| 286 | REPORT("must have at least one thread" ); |
| 287 | exit(1); |
| 288 | } |
| 289 | if( tbb::TBB_runtime_interface_version()>TBB_INTERFACE_VERSION) { |
| 290 | REMARK("Warning: implementation dependent tests disabled\n" ); |
| 291 | do_hacking_tests = false; |
| 292 | } |
| 293 | |
| 294 | // Test with varying number of threads. |
| 295 | for( nthread=MinThread; nthread<=MaxThread; ++nthread ) { |
| 296 | // Initialize TBB task scheduler |
| 297 | tbb::task_scheduler_init init(nthread); |
| 298 | |
| 299 | // Test pipelines with n filters |
| 300 | for( unsigned n=0; n<=MaxFilters; ++n ) |
| 301 | TestTrivialPipeline(nthread,n); |
| 302 | |
| 303 | // Test that all workers sleep when no work |
| 304 | TestCPUUserTime(nthread); |
| 305 | } |
| 306 | if( !out_of_order_count ) |
| 307 | REPORT("Warning: out of order serial filter received tokens in order\n" ); |
| 308 | return Harness::Done; |
| 309 | } |
| 310 | |