Divide Framework 0.1
A free and open-source 3D Framework under heavy development
Loading...
Searching...
No Matches
ThreadingTests.cpp
Go to the documentation of this file.
2
5#include <atomic>
6#include <iostream>
7
8namespace Divide
9{
10
11namespace
12{
/// Writes `line` to standard output, terminates it with a newline and flushes the stream.
/// NOTE(review): the listing this block was taken from skips a numbered line right before
/// the write — confirm no statement (e.g. a guard) is missing from the body.
void PrintLine( const std::string_view line )
{
    std::cout << line << '\n' << std::flush;
}
19
20 void StartAndWait( Task& task, TaskPool& pool, const TaskPriority priority = TaskPriority::DONT_CARE, const DELEGATE<void>& onCompletionFunction = {})
21 {
22 Start( task, pool, priority, onCompletionFunction );
23 Wait( task, pool );
24 }
25
26 void SleepThread(const D64 milliseconds )
27 {
28 const D64 start = Time::App::ElapsedMilliseconds();
29
30 while ( true )
31 {
32 if ( Time::App::ElapsedMilliseconds() - start >= Time::Milliseconds( milliseconds ) )
33 {
34 break;
35 }
36 std::this_thread::yield();
37 }
38
39 }
40};
41
42TEST_CASE( "Task Pool Construction Test", "[threading_tests]" )
43{
45
47
48 TaskPool test("CONSTRUCTION_TEST");
49
50 // Not enough workers
51 bool init = test.init( 0 );
52 CHECK_FALSE( init );
53
54 // Valid
55 init = test.init( 1 );
56 CHECK_TRUE( init );
57
58 // Double init
59 init = test.init( std::thread::hardware_concurrency() );
60 CHECK_TRUE( init );
61
62 test.shutdown();
63}
64
65TEST_CASE( "Parallel For Test", "[threading_tests]" )
66{
68
70
71 TaskPool test( "PARALLEL_FOR_TEST" );
72
73 const bool init = test.init( std::thread::hardware_concurrency() );
74 CHECK_TRUE( init );
75
76 constexpr U32 partitionSize = 4;
77 constexpr U32 loopCount = partitionSize * 4 + 2;
78
79 std::atomic_uint loopCounter = 0;
80 std::atomic_uint totalCounter = 0;
81
82 ParallelForDescriptor descriptor = {};
83 descriptor._iterCount = loopCount;
84 descriptor._partitionSize = partitionSize;
85 Parallel_For( test, descriptor, [&totalCounter, &loopCounter]( [[maybe_unused]] const Task* parentTask, const U32 start, const U32 end ) noexcept
86 {
87 ++loopCounter;
88 for ( U32 i = start; i < end; ++i )
89 {
90 ++totalCounter;
91 }
92 });
93
94 CHECK_EQUAL( loopCounter, 5u );
95 CHECK_EQUAL( totalCounter, 18u );
96
97 test.shutdown();
98}
99
100
101TEST_CASE( "Task Callback Test", "[threading_tests]" )
102{
104
105 TaskPool test( "CALLBACK_TEST" );
106 const bool init = test.init( std::thread::hardware_concurrency() );
107 CHECK_TRUE( init );
108
109 bool testValue = false;
110
111 Task* job = CreateTask( []( [[maybe_unused]] const Task& parentTask )
112 {
113 Time::ProfileTimer timer;
114 timer.start();
115 PrintLine( "TaskCallbackTest: Thread sleeping for 500ms" );
116 SleepThread(Time::Milliseconds(500) );
117
118 timer.stop();
119 const F32 durationMS = Time::MicrosecondsToMilliseconds<F32>( timer.get() - Time::ProfileTimer::overhead() );
120 PrintLine( "TaskCallbackTest: Thread waking up (" + std::to_string( durationMS ) + "ms )" );
121 } );
122
123 Start( *job, test, TaskPriority::DONT_CARE, [&testValue]()
124 {
125 PrintLine( "TaskCallbackTest: Callback called!" );
126 testValue = true;
127 PrintLine( "TaskCallbackTest: Value changed to: [ " + std::string( testValue ? "true" : "false" ) + " ]!" );
128 } );
129
130 CHECK_FALSE( testValue );
131 PrintLine( "TaskCallbackTest: waiting for task!" );
132 Wait( *job, test );
133
134 CHECK_TRUE( Finished( *job ) );
135 CHECK_FALSE( testValue );
136
137 PrintLine( "TaskCallbackTest: flushing queue!" );
138 const size_t callbackCount = test.flushCallbackQueue();
139 CHECK_EQUAL( callbackCount, 1u );
140 PrintLine( "TaskCallbackTest: flushing test! Value: " + std::string( testValue ? "true" : "false" ) );
141 CHECK_TRUE( testValue );
142
143 test.shutdown();
144}
145
146
147namespace
148{
150 {
// Fixture body used by TEST_CASE_METHOD( ThreadedTest, ... ) further down in this file.
// NOTE(review): the declaration line that precedes this brace is not visible in this listing.

// Stores `state` into the shared flag using release ordering.
151 void setTestValue( const bool state ) noexcept
152 {
153 _testValue.store( state, std::memory_order_release );
154 }
155
// Reads the shared flag using acquire ordering (pairs with the release store in setTestValue).
156 [[nodiscard]] bool getTestValue() const noexcept
157 {
158 return _testValue.load( std::memory_order_acquire );
159 }
160
// Task body: times a SleepThread() call, prints the measured duration, then sets the flag to true.
// `parentTask` is not used.
161 void threadedFunction( [[maybe_unused]] const Task& parentTask )
162 {
163 Time::ProfileTimer timer;
164 timer.start();
165 SleepThread( Time::Milliseconds( 300 ) );
166 timer.stop();
// Subtract the timer's own overhead before converting the reading to milliseconds.
167 const F32 durationMS = Time::MicrosecondsToMilliseconds<F32>( timer.get() - Time::ProfileTimer::overhead() );
168 PrintLine( "threadedFunction completed in: " + std::to_string( durationMS ) + " ms." );
169
170 setTestValue( true );
171 }
172
173 private:
// Flag written by threadedFunction()/setTestValue() and read by getTestValue(); starts as false.
174 std::atomic_bool _testValue{ false };
175 };
176}
177
178TEST_CASE_METHOD( ThreadedTest, "Task Class Member Callback Test", "[threading_tests]" )
179{
181
182 TaskPool test("MEMBER_CALLBACK_TEST");
183
184 const bool init = test.init( to_U8( std::thread::hardware_concurrency() ));
185 CHECK_TRUE( init );
186
187 Task* job = CreateTask( [&]( const Task& parentTask )
188 {
189 threadedFunction( parentTask );
190 } );
191
192 CHECK_FALSE( getTestValue() );
193
194 Start( *job, test, TaskPriority::DONT_CARE, [&]() noexcept
195 {
196 setTestValue( false );
197 } );
198
199 CHECK_FALSE( getTestValue() );
200
201 Wait( *job, test );
202
203 CHECK_TRUE( getTestValue() );
204
205 const size_t callbackCount = test.flushCallbackQueue();
206 CHECK_EQUAL( callbackCount, 1u );
207
208 const bool finalValue = getTestValue();
209
210 CHECK_FALSE( finalValue );
211
212 test.shutdown();
213}
214
215TEST_CASE( "Task Speed Test", "[threading_tests]" )
216{
218
219 constexpr size_t loopCountA = 60u * 1000u;
220 constexpr U32 partitionSize = 256u;
221 constexpr U32 loopCountB = partitionSize * 8192u + 2u;
222
223 const U64 timerOverhead = Time::ProfileTimer::overhead();
224 {
225 TaskPool test("SPEED_TEST_LOOP");
226 const bool init = test.init( to_U8( std::thread::hardware_concurrency() ) );
227 CHECK_TRUE( init );
228
229 Time::ProfileTimer timer;
230
231 timer.start();
232 Task* job = CreateTask( TASK_NOP );
233
234 for ( size_t i = 0u; i < loopCountA; ++i )
235 {
236 Start( *CreateTask( job, TASK_NOP ), test );
237 }
238
239 StartAndWait( *job, test );
240
241 timer.stop();
242 const F32 durationMS = Time::MicrosecondsToMilliseconds<F32>( timer.get() - timerOverhead );
243 PrintLine( "Threading speed test: " + std::to_string( loopCountA ) + " tasks completed in: " + std::to_string( durationMS ) + " ms." );
244
245 test.shutdown();
246 }
247 {
248 TaskPool test("SPEED_TEST_PARALLEL_FOR");
249 const bool init = test.init( to_U8( std::thread::hardware_concurrency() ) );
250 CHECK_TRUE( init );
251
252 Time::ProfileTimer timer;
253 timer.start();
254
255 ParallelForDescriptor descriptor = {};
256 descriptor._iterCount = loopCountB;
257 descriptor._partitionSize = partitionSize;
258 descriptor._useCurrentThread = false;
259 Parallel_For( test, descriptor, []( [[maybe_unused]] const Task* parentTask, [[maybe_unused]] const U32 start, [[maybe_unused]] const U32 end )
260 {
261 NOP();
262 });
263 timer.stop();
264 const F32 durationMS = Time::MicrosecondsToMilliseconds<F32>( timer.get() - timerOverhead );
265 PrintLine( "Threading speed test (Parallel_For): " + std::to_string( loopCountB / partitionSize ) + " partitions tasks completed in: " + std::to_string( durationMS ) + " ms." );
266
267 test.shutdown();
268 }
269 {
270 TaskPool test("SPEED_TEST_PARALLEL_FOR_CURRENT_THREAD");
271 const bool init = test.init( to_U8( std::thread::hardware_concurrency() ) );
272 CHECK_TRUE( init );
273
274 Time::ProfileTimer timer;
275 timer.start();
276
277 ParallelForDescriptor descriptor = {};
278 descriptor._iterCount = loopCountB;
279 descriptor._partitionSize = partitionSize;
280 descriptor._useCurrentThread = true;
281 Parallel_For( test, descriptor, []( [[maybe_unused]] const Task* parentTask, [[maybe_unused]] const U32 start, [[maybe_unused]] const U32 end )
282 {
283 NOP();
284 });
285
286 timer.stop();
287 const F32 durationMS = Time::MicrosecondsToMilliseconds<F32>( timer.get() - timerOverhead );
288 PrintLine( "Threading speed test (Parallel_For - use current thread): " + std::to_string( loopCountB / partitionSize ) + " partitions tasks completed in: " + std::to_string( durationMS ) + " ms." );
289
290 test.shutdown();
291 }
292}
293
294TEST_CASE( "Task Priority Test", "[threading_tests]" )
295{
297
298 TaskPool test("PRIORTIY_TEST");
299 const bool init = test.init( std::thread::hardware_concurrency() );
300 CHECK_TRUE( init );
301
302 U32 callbackValue = 0u;
303
304 Task* job = CreateTask( [&callbackValue]( [[maybe_unused]] const Task& parentTask )
305 {
306 ++callbackValue;
307 } );
308
309 StartAndWait( *job, test, TaskPriority::DONT_CARE, [&callbackValue]()
310 {
311 ++callbackValue;
312 } );
313
314 CHECK_EQUAL( callbackValue, 1u );
315
316 size_t callbackCount = test.flushCallbackQueue();
317 CHECK_EQUAL( callbackCount, 1u );
318 CHECK_EQUAL( callbackValue, 2u );
319
320 job = CreateTask( [&callbackValue]( [[maybe_unused]] const Task& parentTask )
321 {
322 ++callbackValue;
323 } );
324
325 StartAndWait( *job, test );
326 CHECK_EQUAL( callbackValue, 3u );
327
328 callbackCount = test.flushCallbackQueue();
329 CHECK_EQUAL( callbackCount, 0u );
330 CHECK_EQUAL( callbackValue, 3u );
331
332 job = CreateTask( [&callbackValue]( [[maybe_unused]] const Task& parentTask )
333 {
334 ++callbackValue;
335 } );
336 StartAndWait( *job, test, TaskPriority::REALTIME, [&callbackValue]()
337 {
338 ++callbackValue;
339 } );
340
341 CHECK_EQUAL( callbackValue, 5u );
342
343 test.shutdown();
344}
345
346} //namespace Divide
#define NOP()
size_t flushCallbackQueue()
Returns the number of callbacks processed.
Definition: TaskPool.cpp:201
bool init(size_t threadCount, const DELEGATE< void, const std::thread::id & > &onThreadCreate={})
Definition: TaskPool.cpp:30
D64 ElapsedMilliseconds() noexcept
constexpr T Milliseconds(T a)
Definition: MathHelper.inl:653
void PrintLine(const std::string_view line)
void StartAndWait(Task &task, TaskPool &pool, const TaskPriority priority=TaskPriority::DONT_CARE, const DELEGATE< void > &onCompletionFunction={})
Handle console commands that start with a forward slash.
Definition: AIProcessor.cpp:7
DELEGATE_STD< Ret, Args... > DELEGATE
std::lock_guard< mutex > LockGuard
Definition: SharedMutex.h:55
std::mutex Mutex
Definition: SharedMutex.h:40
void Wait(const Task &task, TaskPool &pool)
Definition: Task.cpp:20
Task * CreateTask(Predicate &&threadedFunction, bool allowedInIdle=true)
Definition: TaskPool.inl:45
bool Finished(const Task &task) noexcept
Definition: Task.inl:38
constexpr auto TASK_NOP
Definition: Task.h:57
TaskPriority
Definition: Task.h:41
@ REALTIME
not threaded
constexpr U8 to_U8(const T value)
TEST_CASE_METHOD(ThreadedTest, "Task Class Member Callback Test", "[threading_tests]")
double D64
void Start(Task &task, TaskPool &pool, TaskPriority priority=TaskPriority::DONT_CARE, const DELEGATE< void > &onCompletionFunction={})
Definition: Task.cpp:9
void Parallel_For(TaskPool &pool, const ParallelForDescriptor &descriptor, const DELEGATE< void, const Task *, U32, U32 > &cbk)
Definition: TaskPool.cpp:428
TEST_CASE("ByteBuffer RW Bool", "[byte_buffer]")
uint32_t U32
uint64_t U64
static void ToggleFlag(const Flags flag, const bool state)
Definition: Console.h:135
U32 _partitionSize
How many elements should we process per async task.
Definition: TaskPool.h:45
bool _useCurrentThread
If true, we'll process a for partition on the calling thread.
Definition: TaskPool.h:51
U32 _iterCount
For loop iteration count.
Definition: TaskPool.h:43
#define CHECK_EQUAL(LHS, RHS)
#define CHECK_TRUE(...)