Merge lp:~epics-core/epics-base/thread-pool into lp:~epics-core/epics-base/3.15

Proposed by mdavidsaver
Status: Merged
Merged at revision: 12505
Proposed branch: lp:~epics-core/epics-base/thread-pool
Merge into: lp:~epics-core/epics-base/3.15
Diff against target: 1509 lines (+1446/-0)
9 files modified
documentation/RELEASE_NOTES.html (+8/-0)
src/libCom/Makefile (+1/-0)
src/libCom/pool/Makefile (+16/-0)
src/libCom/pool/epicsThreadPool.h (+150/-0)
src/libCom/pool/poolJob.c (+328/-0)
src/libCom/pool/poolPriv.h (+97/-0)
src/libCom/pool/threadPool.c (+399/-0)
src/libCom/test/Makefile (+4/-0)
src/libCom/test/epicsThreadPoolTest.c (+443/-0)
To merge this branch: bzr merge lp:~epics-core/epics-base/thread-pool
Reviewer         Status
Andrew Johnson   Approve
mdavidsaver      Needs Resubmitting
Review via email: mp+108385@code.launchpad.net

Description of the change

General purpose thread pool.

Revision history for this message
mdavidsaver (mdavidsaver) wrote :

* Include in RTEMS/vxworks test harness

Done.

* For RTEMS/vxworks, replace mutex with interrupt disable

Done.

* Avoid excessive wakeups from worker and observer Events.

Done.

Also fixed several issues and added calls for obtaining shared thread pools.

Remaining task:

Get approval of the name epicsMaxParallelThreads()
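
For reference, a rough sketch of how the shared-pool calls are meant to be used (error handling trimmed):

    epicsThreadPoolConfig conf;
    epicsThreadPool *pool;

    epicsThreadPoolConfigDefaults(&conf);      /* always initialize the config first */
    pool = epicsThreadPoolGetShared(&conf);    /* fetch an existing shared pool, or create one */
    if (pool) {
        /* create jobs with epicsJobCreate() and queue them with epicsJobQueue() */
        epicsThreadPoolReleaseShared(pool);    /* drop the reference when finished */
    }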

Revision history for this message
mdavidsaver (mdavidsaver) wrote :

The current implementation for RTEMS uses epicsEventSignal in places where interrupts have been disabled in the belief that this disables all context switching. This is not so.

review: Needs Fixing
Revision history for this message
mdavidsaver (mdavidsaver) wrote :

I've changed the implementation to use only mutexes and abandoned plans to use spin locks.

Also, the API to report the number of cores is made private and will be replaced with Ralph's API after this is merged.

The error codes returned are changed to the usual form.

review: Approve
Revision history for this message
Ralph Lange (ralph-lange) wrote :

Michael,

could you push the documentation to a branch in https://code.launchpad.net/epics-appdev ?

Thank you,
~Ralph

Revision history for this message
mdavidsaver (mdavidsaver) wrote :

> could you push the documentation to a branch in https://code.launchpad.net
> /epics-appdev ?

Done

lp:~mdavidsaver/epics-appdev/thread-pool

Also made some final cleanups of the API, including the use of enums instead of integers for the work function's mode argument and the epicsThreadPoolControl function's option argument. The epicsJobSet function is removed in favor of a magic argument EPICSJOB_SELF, which sets the user pointer to the epicsJob*. Also tweaked the default thread priority to run between the highest CA server priority and the Medium scan.
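
For example, a job that needs its own epicsJob* (e.g. to re-queue or destroy itself) can be written roughly like this (sketch; selfjob and pool are illustrative names):

    static void selfjob(void *arg, epicsJobMode mode)
    {
        epicsJob *self = arg;              /* EPICSJOB_SELF makes arg the epicsJob* itself */

        if (mode == epicsJobModeCleanup) {
            epicsJobDestroy(self);         /* pool is being destroyed */
            return;
        }
        /* ... do the actual work ... */
    }

    /* ... */
    epicsJob *job = epicsJobCreate(pool, &selfjob, EPICSJOB_SELF);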

Revision history for this message
Andrew Johnson (anj) wrote :

Please delete the _poolNumCores() implementations and use the new epicsThreadGetCPUs() API from epicsThread.h instead. Unfortunately that was added to the 3.15 branch in commit 12480 after this branch was created, so you'll either have to rebase or merge 3.15 into this branch to get it (rebase is better, I have had problems with hiding the history the other way).

While you're doing that, you could change the M_com value to avoid the clash with the now-defined M_stdlib. However, you may not realize that you are allowed to use status values from errno.h instead of inventing your own if they mean the same thing; errSymLookup() calls strerror() when given a status value with modNum <= 500, so you can use errno values such as ENOMEM, ERANGE, ETIMEDOUT etc. directly as status values. Maybe avoid the more obscure values for portability reasons, but anything returned by the OS or defined in POSIX.1 should be fine.
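
For example, a caller can hand such a status straight to strerror() (sketch only; the errlogPrintf() report is just for illustration):

    #include <errno.h>
    #include <string.h>

    int status = epicsThreadPoolWait(pool, 5.0);
    if (status != 0)    /* non-zero errno-style status, e.g. ETIMEDOUT on timeout */
        errlogPrintf("pool wait failed: %s\n", strerror(status));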

As a result I'm not keen on adding the errCommon.h header. If you need status values that don't match anything already defined, I'd rather that you add an M_pool and define your own S_pool_xxx status values in epicsThreadPool.h

I have not looked at the pool code itself, but I am reading the AppDevGuide entry. More comments tomorrow.

Revision history for this message
mdavidsaver (mdavidsaver) wrote :

> Please delete the _poolNumCores() implementations and use the new
> epicsThreadGetCPUs() API from epicsThread.h instead.

Ok.

> ... you'll either have to rebase or merge 3.15 into this branch to get it

Ok. (We'll see how launchpad copes with this)

> ... you may not realize that you are
> allowed to use status values from errno.h instead of inventing your own if
> they mean the same thing;

Indeed I didn't know this, and will try.

> Maybe avoid the more obscure values

But I want to use EILSEQ :)

Revision history for this message
mdavidsaver (mdavidsaver) wrote :

> > ... you may not realize that you are
> > allowed to use status values from errno.h instead of inventing your own if
> > they mean the same thing;
>
> Indeed I didn't know this, and will try.

To do this should I use SOCK_EINVAL instead of EINVAL?

FYI I think I can get by with only EINVAL and EWOULDBLOCK.

Revision history for this message
Andrew Johnson (anj) wrote :

On 07/24/2014 10:54 AM, mdavidsaver wrote:
> To do this should I use SOCK_EINVAL instead of EINVAL?

Since errlog uses strerror() to get the error string I think you have to
use EINVAL, which Microsoft supports at least back to MSVS-2003. I don't
know whether strerror() can handle WSAEINVAL, they don't say.

> FYI I think I can get by with only EINVAL and EWOULDBLOCK.

EWOULDBLOCK is listed in the MS documentation as supported by MSVS-2010
so that's fine too.

Revision history for this message
mdavidsaver (mdavidsaver) wrote :

Ok, I think I just messed up trying to overwrite this branch with a re-based version.

review: Needs Fixing
Revision history for this message
Andrew Johnson (anj) wrote :

I think Launchpad just got itself confused, but by setting the status back to Needs Review and waiting a little it seems to have recovered and I think it's now showing the right diff again.

Revision history for this message
Andrew Johnson (anj) wrote :

Why are the thread and observer counters in struct epicsThreadPool all declared as size_t instead of unsigned integers? We'll never want even 2**32 threads or observers, so allowing for 2**64 on 64-bit systems seems silly. There is no printf format string for a size_t that works across all our OSs, and while your cast to unsigned long in epicsThreadPoolReport() works, a cast should be unnecessary, and it still has to drop bits on 64-bit Windows where sizeof(size_t) == sizeof(unsigned long long).
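
To illustrate the formatting problem (a minimal sketch):

    #include <stdio.h>

    size_t nThreads = 42;
    printf("%lu\n", (unsigned long)nThreads);  /* cast needed; truncates on LLP64 (64-bit Windows) */

    unsigned int nThreadsU = 42;
    printf("%u\n", nThreadsU);                 /* plain unsigned needs no cast at all */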

Shouldn't that EWOULDBLOCK be ETIMEDOUT? It has already blocked for the timeout period, and this status has that meaning in sem_wait, mq_send/receive etc.

Revision history for this message
mdavidsaver (mdavidsaver) wrote :

> Why are the thread and observer counters in struct epicsThreadPool all
> declared as size_t

It's shorter :)

> Shouldn't that EWOULDBLOCK be ETIMEDOUT? It has already blocked for the
> timeout period, and this status has that meaning in sem_wait, mq_send/receive
> etc.

Good point. I'll change this.

12503. By mdavidsaver

thread pool: epicsJobQueue return EPERM

When pool control prevents operation

12504. By mdavidsaver

thread pool: handle failure to create worker

epicsJobQueue() returns EAGAIN when the first worker
can't be lazily created.

Failure to create workers beyond the first is
silently ignored.

12505. By mdavidsaver

thread pool: fix return of epicsJobUnqueue()

Return 0 on success (was queued, now is not),
1 if not queued initially, and EINVAL
if orphaned.
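
In caller terms (sketch):

    int status = epicsJobUnqueue(job);
    if (status == 0) {
        /* job was queued and has now been removed from the run queue */
    } else if (status == 1) {
        /* job already ran, is running now, or was never queued */
    } else {
        /* error, e.g. EINVAL for an orphaned job */
    }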

12506. By mdavidsaver

thread pool: epicsThreadPoolWait return ETIMEOUT

12507. By mdavidsaver

thread pool: mark epicsJobCreate() as safe for job functions

Also, use epicsJobMove() to avoid some redundant code

Revision history for this message
mdavidsaver (mdavidsaver) wrote :

Rebased and made the requested changes. Removed the S_com_* error codes in favor of E* codes from errno.h.

Also improved handling when worker thread creation fails. This was already treated as an error in epicsThreadPoolCreate(); it is now also an error in epicsJobQueue() when lazy creation of the first worker fails.

Failure to create workers after the first is not treated as an error, so user code
that needs to ensure more than one worker is running must test explicitly with
epicsThreadPoolNThreads().
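
That is, roughly (sketch):

    /* code that must have at least two live workers has to verify this itself,
     * since failures to create workers beyond the first are silent */
    if (epicsThreadPoolNThreads(pool) < 2) {
        /* fall back to serial operation, or report the problem */
    }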

review: Needs Resubmitting
Revision history for this message
Andrew Johnson (anj) wrote :

Your counters are still defined as size_t. Both epicsThreadPoolConfig and struct epicsThreadPool are guilty of this.

Strictly speaking the name _epicsJobArgSelf is reserved for the C implementation because it begins with an underscore and appears in the global namespace.
http://stackoverflow.com/questions/228783/what-are-the-rules-about-using-an-underscore-in-a-c-identifier

_epicsThreadPoolControl is OK because it's static, but I would prefer to see it named something like threadPoolControl so it doesn't get made visible and break the rule in the future.

12508. By mdavidsaver

don't include errCommon.h

doesn't exist anymore

12509. By mdavidsaver

thread pool: switch thread counts to unsigned int

size_t is considered overly optimistic

12510. By mdavidsaver

thread pool: don't use reserved names

Avoid global symbols with leading underscore

Revision history for this message
mdavidsaver (mdavidsaver) wrote :

Done.

review: Needs Resubmitting
Revision history for this message
Andrew Johnson (anj) wrote :

Test 148 was failing on VxWorks, both 5.5.2 and 6.9, and should have failed on RTEMS too.

ok 147 - priv->inprogress==0
# count==0
not ok 148 - epicsThreadPoolNThreads(pool)==2
ok 149 - (pool=epicsThreadPoolCreate(NULL))!=NULL

epicsThreadPoolNThreads(pool) was returning 1 because this is a uniprocessor (UP) machine, so the default value of maxThreads is 1.

I added an epicsThreadPoolConfig variable and set maxThreads to 2 to fix this.
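
Roughly (sketch of the change; it mirrors the pattern already used in postjobs()):

    epicsThreadPoolConfig conf;
    epicsThreadPoolConfigDefaults(&conf);
    conf.maxThreads = 2;                        /* don't rely on the detected CPU count */
    testOk1((pool = epicsThreadPoolCreate(&conf)) != NULL);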

review: Approve

Preview Diff

1=== modified file 'documentation/RELEASE_NOTES.html'
2--- documentation/RELEASE_NOTES.html 2014-06-12 19:47:42 +0000
3+++ documentation/RELEASE_NOTES.html 2014-07-29 16:22:24 +0000
4@@ -15,6 +15,14 @@
5 <h2 align="center">Changes between 3.15.0.1 and 3.15.0.2</h2>
6 <!-- Insert new items immediately below here ... -->
7
8+<h3>
9+General purpose thread pool</h3>
10+
11+<p>
12+A general purpose threaded work queue API epicsThreadPool is added.
13+Multiple pools can be created with controlable priority and number
14+of worker threads. Lazy worker startup is supported.</p>
15+
16 <h3>Database field setting updates</h3>
17
18 <p>A database (.db) file loaded by an IOC does not have to repeat the record
19
20=== modified file 'src/libCom/Makefile'
21--- src/libCom/Makefile 2012-04-27 17:21:29 +0000
22+++ src/libCom/Makefile 2014-07-29 16:22:24 +0000
23@@ -31,6 +31,7 @@
24 include $(LIBCOM)/macLib/Makefile
25 include $(LIBCOM)/misc/Makefile
26 include $(LIBCOM)/osi/Makefile
27+include $(LIBCOM)/pool/Makefile
28 include $(LIBCOM)/ring/Makefile
29 include $(LIBCOM)/taskwd/Makefile
30 include $(LIBCOM)/timer/Makefile
31
32=== added directory 'src/libCom/pool'
33=== added file 'src/libCom/pool/Makefile'
34--- src/libCom/pool/Makefile 1970-01-01 00:00:00 +0000
35+++ src/libCom/pool/Makefile 2014-07-29 16:22:24 +0000
36@@ -0,0 +1,16 @@
37+#*************************************************************************
38+# Copyright (c) 2014 UChicago Argonne LLC, as Operator of Argonne
39+# National Laboratory.
40+# EPICS BASE is distributed subject to a Software License Agreement found
41+# in file LICENSE that is included with this distribution.
42+#*************************************************************************
43+
44+# This is a Makefile fragment, see src/libCom/Makefile.
45+
46+SRC_DIRS += $(LIBCOM)/pool
47+
48+INC += epicsThreadPool.h
49+
50+Com_SRCS += poolJob.c
51+Com_SRCS += threadPool.c
52+
53
54=== added file 'src/libCom/pool/epicsThreadPool.h'
55--- src/libCom/pool/epicsThreadPool.h 1970-01-01 00:00:00 +0000
56+++ src/libCom/pool/epicsThreadPool.h 2014-07-29 16:22:24 +0000
57@@ -0,0 +1,150 @@
58+/*************************************************************************\
59+* Copyright (c) 2014 Brookhaven Science Associates, as Operator of
60+* Brookhaven National Laboratory.
61+* EPICS BASE is distributed subject to a Software License Agreement found
62+* in file LICENSE that is included with this distribution.
63+\*************************************************************************/
64+/* General purpose worker thread pool manager
65+ * mdavidsaver@bnl.gov
66+ */
67+#ifndef EPICSTHREADPOOL_H
68+#define EPICSTHREADPOOL_H
69+
70+#include <stdlib.h>
71+#include <stdio.h>
72+
73+#include "shareLib.h"
74+
75+#ifdef __cplusplus
76+extern "C" {
77+#endif
78+
79+typedef struct {
80+ unsigned int initialThreads;
81+ unsigned int maxThreads;
82+ unsigned int workerStack;
83+ unsigned int workerPriority;
84+} epicsThreadPoolConfig;
85+
86+typedef struct epicsThreadPool epicsThreadPool;
87+
88+/* Job function call modes */
89+typedef enum {
90+ /* Normal run of job */
91+ epicsJobModeRun,
92+ /* Thread pool is being destroyed.
93+ * A chance to cleanup the job immediately with epicsJobDestroy().
94+ * If ignored, the job is orphaned (dissociated from the thread pool)
95+ * and epicsJobDestroy() must be called later.
96+ */
97+ epicsJobModeCleanup
98+} epicsJobMode;
99+
100+typedef void (*epicsJobFunction)(void* arg, epicsJobMode mode);
101+
102+typedef struct epicsJob epicsJob;
103+
104+/* Pool operations */
105+
106+/* Initialize a pool config with default values.
107+ * This much be done to preserve future compatibility
108+ * when new options are added.
109+ */
110+epicsShareFunc void epicsThreadPoolConfigDefaults(epicsThreadPoolConfig *);
111+
112+/* fetch or create a thread pool which can be shared with other users.
113+ * may return NULL for allocation failures
114+ */
115+epicsShareFunc epicsThreadPool* epicsThreadPoolGetShared(epicsThreadPoolConfig *opts);
116+epicsShareFunc void epicsThreadPoolReleaseShared(epicsThreadPool *pool);
117+
118+/* If opts is NULL then defaults are used.
119+ * The opts pointer is not stored by this call, and may exist on the stack.
120+ */
121+epicsShareFunc epicsThreadPool* epicsThreadPoolCreate(epicsThreadPoolConfig *opts);
122+
123+/* Blocks until all worker threads have stopped.
124+ * Any jobs still attached to this pool receive a callback with EPICSJOB_CLEANUP
125+ * and are then orphaned.
126+ */
127+epicsShareFunc void epicsThreadPoolDestroy(epicsThreadPool *);
128+
129+/* pool control options */
130+typedef enum {
131+ epicsThreadPoolQueueAdd, /* val==0 causes epicsJobQueue to fail, 1 is default */
132+ epicsThreadPoolQueueRun /* val==0 prevents workers from running jobs, 1 is default */
133+} epicsThreadPoolOption;
134+epicsShareFunc void epicsThreadPoolControl(epicsThreadPool* pool,
135+ epicsThreadPoolOption opt,
136+ unsigned int val);
137+
138+/* Block until job queue is emptied and no jobs are running.
139+ * Useful after calling epicsThreadPoolControl() with option epicsThreadPoolQueueAdd=0
140+ *
141+ * timeout<0 waits forever, timeout==0 polls, timeout>0 waits at most one timeout period
142+ * Returns 0 for success or non-zero on error (timeout is ETIMEOUT)
143+ */
144+epicsShareFunc int epicsThreadPoolWait(epicsThreadPool* pool, double timeout);
145+
146+
147+/* Per job operations */
148+
149+/* special flag for epicsJobCreate().
150+ * When passed as the third argument "user"
151+ * the argument passed to the job callback
152+ * will be the epicsJob*
153+ */
154+#define EPICSJOB_SELF epicsJobArgSelfMagic
155+epicsShareExtern void* epicsJobArgSelfMagic;
156+
157+/* creates, but does not add, a new job.
158+ * If pool in NULL then the job is not associated with any pool and
159+ * epicsJobMove() must be called before epicsJobQueue()
160+ * Safe to call from a running job function.
161+ * returns a new job pointer, or NULL on error
162+ */
163+epicsShareFunc epicsJob* epicsJobCreate(epicsThreadPool* pool,
164+ epicsJobFunction cb,
165+ void* user);
166+
167+/* Cancel and free a job structure. Does not block.
168+ * job may not be immediately free'd.
169+ * Safe to call from a running job function.
170+ */
171+epicsShareFunc void epicsJobDestroy(epicsJob*);
172+
173+/* Move the job to a different pool.
174+ * If pool is NULL then the job will no longer be associated
175+ * with any pool.
176+ * Not thread safe. Job must not be running or queued.
177+ * returns 0 on error, and non-zero on error
178+ */
179+epicsShareFunc int epicsJobMove(epicsJob* job, epicsThreadPool* pool);
180+
181+/* Adds the job to the run queue
182+ * Safe to call from a running job function.
183+ * returns 0 for success, non-zero on error.
184+ */
185+epicsShareFunc int epicsJobQueue(epicsJob*);
186+
187+/* Remove a job from the run queue if it is queued.
188+ * Safe to call from a running job function.
189+ * returns 0 if job was queued and now is not.
190+ * 1 if job already ran, is running, or was not queued before,
191+ * Other non-zero on error
192+ */
193+epicsShareFunc int epicsJobUnqueue(epicsJob*);
194+
195+
196+/* Mostly useful for debugging */
197+
198+epicsShareFunc void epicsThreadPoolReport(epicsThreadPool *pool, FILE *fd);
199+
200+/* Current number of active workers. May be less than the maximum */
201+epicsShareFunc unsigned int epicsThreadPoolNThreads(epicsThreadPool *);
202+
203+#ifdef __cplusplus
204+}
205+#endif
206+
207+#endif // EPICSTHREADPOOL_H
208
209=== added file 'src/libCom/pool/poolJob.c'
210--- src/libCom/pool/poolJob.c 1970-01-01 00:00:00 +0000
211+++ src/libCom/pool/poolJob.c 2014-07-29 16:22:24 +0000
212@@ -0,0 +1,328 @@
213+/*************************************************************************\
214+* Copyright (c) 2014 Brookhaven Science Associates, as Operator of
215+* Brookhaven National Laboratory.
216+* EPICS BASE is distributed subject to a Software License Agreement found
217+* in file LICENSE that is included with this distribution.
218+\*************************************************************************/
219+
220+#include <stdlib.h>
221+#include <string.h>
222+#include <errno.h>
223+
224+#define epicsExportSharedSymbols
225+
226+#include "dbDefs.h"
227+#include "errlog.h"
228+#include "ellLib.h"
229+#include "epicsThread.h"
230+#include "epicsMutex.h"
231+#include "epicsEvent.h"
232+#include "epicsInterrupt.h"
233+
234+#include "epicsThreadPool.h"
235+#include "poolPriv.h"
236+
237+void* epicsJobArgSelfMagic = &epicsJobArgSelfMagic;
238+
239+static
240+void workerMain(void* arg)
241+{
242+ epicsThreadPool *pool=arg;
243+
244+ /* workers are created with counts
245+ * in the running, sleeping, and (possibly) waking counters
246+ */
247+
248+ epicsMutexMustLock(pool->guard);
249+ pool->threadsAreAwake++;
250+ pool->threadsSleeping--;
251+
252+ while(1)
253+ {
254+ ELLNODE *cur;
255+ epicsJob *job;
256+
257+ pool->threadsAreAwake--;
258+ pool->threadsSleeping++;
259+ epicsMutexUnlock(pool->guard);
260+
261+ epicsEventMustWait(pool->workerWakeup);
262+
263+ epicsMutexMustLock(pool->guard);
264+ pool->threadsSleeping--;
265+ pool->threadsAreAwake++;
266+
267+ if(pool->threadsWaking==0)
268+ continue;
269+
270+ pool->threadsWaking--;
271+
272+ CHECKCOUNT(pool);
273+
274+ if(pool->shutdown)
275+ break;
276+
277+ if(pool->pauserun)
278+ continue;
279+
280+ /* more threads to wakeup */
281+ if(pool->threadsWaking)
282+ {
283+ epicsEventSignal(pool->workerWakeup);
284+ }
285+
286+ while ((cur=ellGet(&pool->jobs)) !=NULL)
287+ {
288+ job=CONTAINER(cur, epicsJob, jobnode);
289+
290+ assert(job->queued && !job->running);
291+
292+ job->queued=0;
293+ job->running=1;
294+
295+ epicsMutexUnlock(pool->guard);
296+ (*job->func)(job->arg, epicsJobModeRun);
297+ epicsMutexMustLock(pool->guard);
298+
299+ if(job->freewhendone) {
300+ job->dead=1;
301+ free(job);
302+ } else {
303+ job->running=0;
304+ /* job may be re-queued from within callback */
305+ if(job->queued)
306+ ellAdd(&pool->jobs, &job->jobnode);
307+ else
308+ ellAdd(&pool->owned, &job->jobnode);
309+ }
310+
311+ }
312+
313+ if(pool->observerCount)
314+ epicsEventSignal(pool->observerWakeup);
315+ }
316+
317+ pool->threadsAreAwake--;
318+ pool->threadsRunning--;
319+
320+ {
321+ size_t nrun = pool->threadsRunning,
322+ ocnt = pool->observerCount;
323+ epicsMutexUnlock(pool->guard);
324+
325+ if(ocnt)
326+ epicsEventSignal(pool->observerWakeup);
327+
328+ if(nrun)
329+ epicsEventSignal(pool->workerWakeup); /* pass along */
330+ else
331+ epicsEventSignal(pool->shutdownEvent);
332+ }
333+
334+ return;
335+}
336+
337+int createPoolThread(epicsThreadPool *pool)
338+{
339+ epicsThreadId tid;
340+
341+ tid = epicsThreadCreate("PoolWorker",
342+ pool->conf.workerPriority,
343+ pool->conf.workerStack,
344+ &workerMain,
345+ pool);
346+ if(!tid)
347+ return 1;
348+
349+ pool->threadsRunning++;
350+ pool->threadsSleeping++;
351+ return 0;
352+}
353+
354+epicsJob* epicsJobCreate(epicsThreadPool* pool,
355+ epicsJobFunction func,
356+ void* arg)
357+{
358+ epicsJob *job=calloc(1, sizeof(*job));
359+
360+ if(!job)
361+ return NULL;
362+
363+ if(arg==&epicsJobArgSelfMagic)
364+ arg=job;
365+
366+ job->pool=NULL;
367+ job->func=func;
368+ job->arg=arg;
369+
370+ epicsJobMove(job, pool);
371+
372+ return job;
373+}
374+
375+void epicsJobDestroy(epicsJob* job)
376+{
377+ epicsThreadPool *pool;
378+ if(!job || !job->pool){
379+ free(job);
380+ return;
381+ }
382+ pool=job->pool;
383+
384+ epicsMutexMustLock(pool->guard);
385+
386+ assert(!job->dead);
387+
388+ epicsJobUnqueue(job);
389+
390+ if(job->running || job->freewhendone) {
391+ job->freewhendone=1;
392+ } else {
393+ ellDelete(&pool->owned, &job->jobnode);
394+ job->dead=1;
395+ free(job);
396+ }
397+
398+ epicsMutexUnlock(pool->guard);
399+}
400+
401+int epicsJobMove(epicsJob* job, epicsThreadPool* newpool)
402+{
403+ epicsThreadPool *pool=job->pool;
404+
405+ /* remove from current pool */
406+ if(pool) {
407+ epicsMutexMustLock(pool->guard);
408+
409+ if(job->queued || job->running) {
410+ epicsMutexUnlock(pool->guard);
411+ return EINVAL;
412+ }
413+
414+ ellDelete(&pool->owned, &job->jobnode);
415+
416+ epicsMutexUnlock(pool->guard);
417+ }
418+
419+ pool = job->pool = newpool;
420+
421+ /* add to new pool */
422+ if(pool) {
423+ epicsMutexMustLock(pool->guard);
424+
425+ ellAdd(&pool->owned, &job->jobnode);
426+
427+ epicsMutexUnlock(pool->guard);
428+ }
429+
430+ return 0;
431+}
432+
433+int epicsJobQueue(epicsJob* job)
434+{
435+ int ret=0;
436+ epicsThreadPool *pool=job->pool;
437+ if(!pool)
438+ return EINVAL;
439+
440+ epicsMutexMustLock(pool->guard);
441+
442+ assert(!job->dead);
443+
444+ if(pool->pauseadd) {
445+ ret=EPERM;
446+ goto done;
447+ } else if(job->freewhendone) {
448+ ret=EINVAL;
449+ goto done;
450+ } else if(job->queued) {
451+ goto done;
452+ }
453+
454+ job->queued=1;
455+ /* Job may be queued from within a callback */
456+ if(!job->running) {
457+ ellDelete(&pool->owned, &job->jobnode);
458+ ellAdd(&pool->jobs, &job->jobnode);
459+ } else {
460+ /* some worker will find it again before sleeping */
461+ goto done;
462+ }
463+
464+ /* Since we hold the lock, we can be certain that all awake worker are
465+ * executing work functions. The current thread may be a worker.
466+ * We prefer to wakeup a new worker rather then wait for a busy worker to
467+ * finish. However, after we initiate a wakeup there will be a race
468+ * between the worker waking up, and a busy worker finishing.
469+ * Thus we can't avoid spurious wakeups.
470+ */
471+
472+ if(pool->threadsRunning >= pool->conf.maxThreads) {
473+ /* all workers created... */
474+ /* ... but some are sleeping, so wake one up */
475+ if(pool->threadsWaking < pool->threadsSleeping) {
476+ pool->threadsWaking++;
477+ epicsEventSignal(pool->workerWakeup);
478+ }
479+ /*else one of the running workers will find this job before sleeping */
480+ CHECKCOUNT(pool);
481+
482+ } else {
483+ /* could create more workers so
484+ * will either create a new worker, or wakeup an existing worker
485+ */
486+
487+ if(pool->threadsWaking >= pool->threadsSleeping) {
488+ /* all sleeping workers have already been woken.
489+ * start a new worker for this job
490+ */
491+ if(createPoolThread(pool) && pool->threadsRunning==0) {
492+ /* oops, we couldn't lazy create our first worker
493+ * so this job would never run!
494+ */
495+ ret = EAGAIN;
496+ job->queued = 0;
497+ /* if threadsRunning==0 then no jobs can be running */
498+ assert(!job->running);
499+ ellDelete(&pool->jobs, &job->jobnode);
500+ ellAdd(&pool->owned, &job->jobnode);
501+ }
502+ }
503+ if(ret==0) {
504+ pool->threadsWaking++;
505+ epicsEventSignal(pool->workerWakeup);
506+ }
507+ CHECKCOUNT(pool);
508+ }
509+
510+done:
511+ epicsMutexUnlock(pool->guard);
512+ return ret;
513+}
514+
515+int epicsJobUnqueue(epicsJob* job)
516+{
517+ int ret=1;
518+ epicsThreadPool *pool=job->pool;
519+
520+ if(!pool)
521+ return EINVAL;
522+
523+ epicsMutexMustLock(pool->guard);
524+
525+ assert(!job->dead);
526+
527+ if(job->queued) {
528+ if(!job->running) {
529+ ellDelete(&pool->jobs, &job->jobnode);
530+ ellAdd(&pool->owned, &job->jobnode);
531+ }
532+ job->queued=0;
533+ ret=0;
534+ }
535+
536+ epicsMutexUnlock(pool->guard);
537+
538+ return ret;
539+}
540+
541
542=== added file 'src/libCom/pool/poolPriv.h'
543--- src/libCom/pool/poolPriv.h 1970-01-01 00:00:00 +0000
544+++ src/libCom/pool/poolPriv.h 2014-07-29 16:22:24 +0000
545@@ -0,0 +1,97 @@
546+/*************************************************************************\
547+* Copyright (c) 2014 Brookhaven Science Associates, as Operator of
548+* Brookhaven National Laboratory.
549+* EPICS BASE is distributed subject to a Software License Agreement found
550+* in file LICENSE that is included with this distribution.
551+\*************************************************************************/
552+
553+#ifndef POOLPRIV_H
554+#define POOLPRIV_H
555+
556+#include "epicsThreadPool.h"
557+#include "ellLib.h"
558+#include "epicsThread.h"
559+#include "epicsEvent.h"
560+#include "epicsMutex.h"
561+
562+struct epicsThreadPool {
563+ ELLNODE sharedNode;
564+ size_t sharedCount;
565+
566+ ELLLIST jobs; /* run queue */
567+ ELLLIST owned; /* unqueued jobs. */
568+
569+ /* Worker state counters.
570+ * The life cycle of a worker is
571+ * Wakeup -> Awake -> Sleeping
572+ * Newly created workers go into the wakeup state
573+ */
574+
575+ /* # of running workers which are not waiting for a wakeup event */
576+ unsigned int threadsAreAwake;
577+ /* # of sleeping workers which need to be awakened */
578+ unsigned int threadsWaking;
579+ /* # of workers waiting on the workerWakeup event */
580+ unsigned int threadsSleeping;
581+ /* # of threads started and not stopped */
582+ unsigned int threadsRunning;
583+
584+ /* # of observers waiting on pool events */
585+ unsigned int observerCount;
586+
587+ epicsEventId workerWakeup;
588+ epicsEventId shutdownEvent;
589+
590+ epicsEventId observerWakeup;
591+
592+ /* Disallow epicsJobQueue */
593+ unsigned int pauseadd:1;
594+ /* Prevent workers from running new jobs */
595+ unsigned int pauserun:1;
596+ /* Prevent further changes to pool options */
597+ unsigned int freezeopt:1;
598+ /* tell workers to exit */
599+ unsigned int shutdown:1;
600+
601+ epicsMutexId guard;
602+
603+ /* copy of config passed when created */
604+ epicsThreadPoolConfig conf;
605+};
606+
607+/* Called after manipulating counters to check that invariants are preserved */
608+#define CHECKCOUNT(pPool) do{if(!(pPool)->shutdown) { \
609+ assert( (pPool)->threadsAreAwake + (pPool)->threadsSleeping == (pPool)->threadsRunning ); \
610+ assert( (pPool)->threadsWaking <= (pPool)->threadsSleeping ); \
611+}}while(0)
612+
613+/* When created a job is idle. queued and running are false
614+ * and jobnode is in the thread pool's owned list.
615+ *
616+ * When the job is added, the queued flag is set and jobnode
617+ * is in the jobs list.
618+ *
619+ * When the job starts running the queued flag is cleared and
620+ * the running flag is set. jobnode is not in any list
621+ * (held locally by worker).
622+ *
623+ * When the job has finished running, the running flag is cleared.
624+ * The queued flag may be set if the job re-added itself.
625+ * Based on the queued flag jobnode is added to the appropriate
626+ * list.
627+ */
628+struct epicsJob {
629+ ELLNODE jobnode;
630+ epicsJobFunction func;
631+ void *arg;
632+ epicsThreadPool *pool;
633+
634+ unsigned int queued:1;
635+ unsigned int running:1;
636+ unsigned int freewhendone:1; /* lazy delete of running job */
637+ unsigned int dead:1; /* flag to catch use of freed objects */
638+};
639+
640+int createPoolThread(epicsThreadPool *pool);
641+
642+#endif // POOLPRIV_H
643
644=== added file 'src/libCom/pool/threadPool.c'
645--- src/libCom/pool/threadPool.c 1970-01-01 00:00:00 +0000
646+++ src/libCom/pool/threadPool.c 2014-07-29 16:22:24 +0000
647@@ -0,0 +1,399 @@
648+/*************************************************************************\
649+* Copyright (c) 2014 Brookhaven Science Associates, as Operator of
650+* Brookhaven National Laboratory.
651+* EPICS BASE is distributed subject to a Software License Agreement found
652+* in file LICENSE that is included with this distribution.
653+\*************************************************************************/
654+
655+#include <stdlib.h>
656+#include <string.h>
657+#include <errno.h>
658+
659+#define epicsExportSharedSymbols
660+
661+#include "dbDefs.h"
662+#include "errlog.h"
663+#include "ellLib.h"
664+#include "epicsThread.h"
665+#include "epicsMutex.h"
666+#include "epicsEvent.h"
667+#include "epicsInterrupt.h"
668+#include "cantProceed.h"
669+
670+#include "epicsThreadPool.h"
671+#include "poolPriv.h"
672+
673+
674+void epicsThreadPoolConfigDefaults(epicsThreadPoolConfig *opts)
675+{
676+ memset(opts, 0, sizeof(*opts));
677+ opts->maxThreads=epicsThreadGetCPUs();
678+ opts->workerStack=epicsThreadGetStackSize(epicsThreadStackSmall);
679+
680+ if(epicsThreadLowestPriorityLevelAbove(epicsThreadPriorityCAServerHigh, &opts->workerPriority)
681+ !=epicsThreadBooleanStatusSuccess)
682+ opts->workerPriority = epicsThreadPriorityMedium;
683+}
684+
685+epicsThreadPool* epicsThreadPoolCreate(epicsThreadPoolConfig *opts)
686+{
687+ size_t i;
688+ epicsThreadPool *pool;
689+
690+ /* caller likely didn't initialize the options structure */
691+ if(opts && opts->maxThreads==0) {
692+ errlogMessage("Error: epicsThreadPoolCreate() options provided, but not initialized");
693+ return NULL;
694+ }
695+
696+ pool=calloc(1, sizeof(*pool));
697+ if(!pool)
698+ return NULL;
699+
700+ if(opts)
701+ memcpy(&pool->conf, opts, sizeof(*opts));
702+ else
703+ epicsThreadPoolConfigDefaults(&pool->conf);
704+
705+ if(pool->conf.initialThreads > pool->conf.maxThreads)
706+ pool->conf.initialThreads = pool->conf.maxThreads;
707+
708+ pool->workerWakeup=epicsEventCreate(epicsEventEmpty);
709+ pool->shutdownEvent=epicsEventCreate(epicsEventEmpty);
710+ pool->observerWakeup=epicsEventCreate(epicsEventEmpty);
711+ pool->guard=epicsMutexCreate();
712+
713+ if(!pool->workerWakeup || !pool->shutdownEvent ||
714+ !pool->observerWakeup || !pool->guard)
715+ goto cleanup;
716+
717+ ellInit(&pool->jobs);
718+ ellInit(&pool->owned);
719+
720+ epicsMutexMustLock(pool->guard);
721+
722+ for(i=0; i<pool->conf.initialThreads; i++) {
723+ createPoolThread(pool);
724+ }
725+
726+ if(pool->threadsRunning==0 && pool->conf.initialThreads!=0) {
727+ epicsMutexUnlock(pool->guard);
728+ errlogPrintf("Error: Unable to create any threads for thread pool\n");
729+ goto cleanup;
730+
731+ }else if(pool->threadsRunning < pool->conf.initialThreads) {
732+ errlogPrintf("Warning: Unable to create all threads for thread pool (%lu/%lu)\n",
733+ (unsigned long)pool->threadsRunning,
734+ (unsigned long)pool->conf.initialThreads);
735+ }
736+
737+ epicsMutexUnlock(pool->guard);
738+
739+ return pool;
740+
741+cleanup:
742+ if(pool->workerWakeup) epicsEventDestroy(pool->workerWakeup);
743+ if(pool->shutdownEvent) epicsEventDestroy(pool->shutdownEvent);
744+ if(pool->observerWakeup) epicsEventDestroy(pool->observerWakeup);
745+ if(pool->guard) epicsMutexDestroy(pool->guard);
746+
747+ free(pool);
748+ return 0;
749+}
750+
751+static
752+void epicsThreadPoolControlImpl(epicsThreadPool* pool, epicsThreadPoolOption opt, unsigned int val)
753+{
754+ if(pool->freezeopt)
755+ return;
756+
757+ if(opt==epicsThreadPoolQueueAdd) {
758+ pool->pauseadd = !val;
759+
760+ } else if(opt==epicsThreadPoolQueueRun) {
761+ if(!val && !pool->pauserun)
762+ pool->pauserun=1;
763+
764+ else if(val && pool->pauserun) {
765+ size_t jobs=(size_t)ellCount(&pool->jobs);
766+ pool->pauserun=0;
767+
768+ if(jobs) {
769+ size_t wakeable=pool->threadsSleeping - pool->threadsWaking;
770+ /* first try to give jobs to sleeping workers */
771+ if(wakeable) {
772+ int wakeup = jobs > wakeable ? wakeable : jobs;
773+ assert(wakeup>0);
774+ jobs-=wakeup;
775+ pool->threadsWaking+=wakeup;
776+ epicsEventSignal(pool->workerWakeup);
777+ CHECKCOUNT(pool);
778+ }
779+ }
780+ while(jobs-- && pool->threadsRunning < pool->conf.maxThreads) {
781+ if(createPoolThread(pool)==0) {
782+ pool->threadsWaking++;
783+ epicsEventSignal(pool->workerWakeup);
784+ } else
785+ break; /* oops, couldn't create worker */
786+ }
787+ CHECKCOUNT(pool);
788+ }
789+ }
790+ /* unknown options ignored */
791+
792+}
793+
794+void epicsThreadPoolControl(epicsThreadPool* pool, epicsThreadPoolOption opt, unsigned int val)
795+{
796+ epicsMutexMustLock(pool->guard);
797+ epicsThreadPoolControlImpl(pool, opt, val);
798+ epicsMutexUnlock(pool->guard);
799+}
800+
801+int epicsThreadPoolWait(epicsThreadPool* pool, double timeout)
802+{
803+ int ret=0;
804+ epicsMutexMustLock(pool->guard);
805+
806+ while(ellCount(&pool->jobs)>0 || pool->threadsAreAwake>0) {
807+ pool->observerCount++;
808+ epicsMutexUnlock(pool->guard);
809+
810+ if(timeout<0.0) {
811+ epicsEventMustWait(pool->observerWakeup);
812+
813+ } else {
814+ switch(epicsEventWaitWithTimeout(pool->observerWakeup, timeout)) {
815+ case epicsEventWaitError:
816+ cantProceed("epicsThreadPoolWait: failed to wait for Event");
817+ break;
818+ case epicsEventWaitTimeout:
819+ ret=ETIMEDOUT;
820+ break;
821+ case epicsEventWaitOK:
822+ ret=0;
823+ break;
824+ }
825+
826+ }
827+
828+ epicsMutexMustLock(pool->guard);
829+ pool->observerCount--;
830+
831+ if(pool->observerCount)
832+ epicsEventSignal(pool->observerWakeup);
833+
834+ if(ret!=0)
835+ break;
836+ }
837+
838+ epicsMutexUnlock(pool->guard);
839+ return ret;
840+}
841+
842+void epicsThreadPoolDestroy(epicsThreadPool *pool)
843+{
844+ size_t nThr;
845+ ELLLIST notify;
846+ ELLNODE *cur;
847+
848+ if(!pool)
849+ return;
850+
851+ ellInit(&notify);
852+
853+ epicsMutexMustLock(pool->guard);
854+
855+ /* run remaining queued jobs */
856+ epicsThreadPoolControlImpl(pool, epicsThreadPoolQueueAdd, 0);
857+ epicsThreadPoolControlImpl(pool, epicsThreadPoolQueueRun, 1);
858+ nThr=pool->threadsRunning;
859+ pool->freezeopt = 1;
860+
861+ epicsMutexUnlock(pool->guard);
862+
863+ epicsThreadPoolWait(pool, -1.0);
864+ /* At this point all queued jobs have run */
865+
866+ epicsMutexMustLock(pool->guard);
867+
868+ pool->shutdown=1;
869+ /* wakeup all */
870+ if(pool->threadsWaking < pool->threadsSleeping) {
871+ pool->threadsWaking = pool->threadsSleeping;
872+ epicsEventSignal(pool->workerWakeup);
873+ }
874+
875+ ellConcat(&notify, &pool->owned);
876+ ellConcat(&notify, &pool->jobs);
877+
878+ epicsMutexUnlock(pool->guard);
879+
880+ if(nThr && epicsEventWait(pool->shutdownEvent)!=epicsEventWaitOK){
881+ errlogMessage("epicsThreadPoolDestroy: wait error");
882+ return;
883+ }
884+
885+ /* all workers are now shutdown */
886+
887+ /* notify remaining jobs that pool is being destroyed */
888+ while( (cur=ellGet(&notify))!=NULL )
889+ {
890+ epicsJob *job=CONTAINER(cur, epicsJob, jobnode);
891+ job->running=1;
892+ (*job->func)(job->arg, epicsJobModeCleanup);
893+ job->running=0;
894+ if(job->freewhendone)
895+ free(job);
896+ else
897+ job->pool=NULL; /* orphan */
898+ }
899+
900+ epicsEventDestroy(pool->workerWakeup);
901+ epicsEventDestroy(pool->shutdownEvent);
902+ epicsEventDestroy(pool->observerWakeup);
903+ epicsMutexDestroy(pool->guard);
904+
905+ free(pool);
906+}
907+
908+
909+void epicsThreadPoolReport(epicsThreadPool *pool, FILE *fd)
910+{
911+ ELLNODE *cur;
912+ epicsMutexMustLock(pool->guard);
913+
914+ fprintf(fd, "Thread Pool with %u/%u threads\n"
915+ " running %d jobs with %u threads\n",
916+ pool->threadsRunning,
917+ pool->conf.maxThreads,
918+ ellCount(&pool->jobs),
919+ pool->threadsAreAwake);
920+ if(pool->pauseadd)
921+ fprintf(fd, " Inhibit queueing\n");
922+ if(pool->pauserun)
923+ fprintf(fd, " Pause workers\n");
924+ if(pool->shutdown)
925+ fprintf(fd, " Shutdown in progress\n");
926+
927+ for(cur=ellFirst(&pool->jobs); cur; cur=ellNext(cur))
928+ {
929+ epicsJob *job=CONTAINER(cur, epicsJob, jobnode);
930+ fprintf(fd, " job %p func: %p, arg: %p ",
931+ job, job->func,
932+ job->arg);
933+ if(job->queued)
934+ fprintf(fd, "Queued ");
935+ if(job->running)
936+ fprintf(fd, "Running ");
937+ if(job->freewhendone)
938+ fprintf(fd, "Free ");
939+ fprintf(fd, "\n");
940+ }
941+
942+ epicsMutexUnlock(pool->guard);
943+}
944+
945+unsigned int epicsThreadPoolNThreads(epicsThreadPool *pool)
946+{
947+ unsigned int ret;
948+
949+ epicsMutexMustLock(pool->guard);
950+ ret=pool->threadsRunning;
951+ epicsMutexUnlock(pool->guard);
952+
953+ return ret;
954+}
955+
956+static
957+ELLLIST sharedPools = ELLLIST_INIT;
958+
959+static
960+epicsMutexId sharedPoolsGuard;
961+
962+static
963+epicsThreadOnceId sharedPoolsOnce = EPICS_THREAD_ONCE_INIT;
964+
965+static
966+void sharedPoolsInit(void* unused)
967+{
968+ sharedPoolsGuard = epicsMutexMustCreate();
969+}
970+
971+epicsShareFunc epicsThreadPool* epicsThreadPoolGetShared(epicsThreadPoolConfig *opts)
972+{
973+ ELLNODE *node;
974+ epicsThreadPool *cur;
975+ epicsThreadPoolConfig defopts;
976+ size_t N=epicsThreadGetCPUs();
977+
978+ if(!opts) {
979+ epicsThreadPoolConfigDefaults(&defopts);
980+ opts = &defopts;
981+ }
982+ /* shared pools must have a minimum allowed number of workers.
983+ * Use the number of CPU cores
984+ */
985+ if(opts->maxThreads<N)
986+ opts->maxThreads=N;
987+
988+ epicsThreadOnce(&sharedPoolsOnce, &sharedPoolsInit, NULL);
989+
990+ epicsMutexMustLock(sharedPoolsGuard);
991+
992+ for(node=ellFirst(&sharedPools); node; node=ellNext(node))
993+ {
994+ cur=CONTAINER(node, epicsThreadPool, sharedNode);
995+
996+ /* Must have exactly the requested priority
997+ * At least the requested max workers
998+ * and at least the requested stack size
999+ */
1000+ if(cur->conf.workerPriority != opts->workerPriority)
1001+ continue;
1002+ if(cur->conf.maxThreads < opts->maxThreads)
1003+ continue;
1004+ if(cur->conf.workerStack < opts->workerStack)
1005+ continue;
1006+
1007+ cur->sharedCount++;
1008+ assert(cur->sharedCount>0);
1009+ epicsMutexUnlock(sharedPoolsGuard);
1010+
1011+ epicsMutexMustLock(cur->guard);
1012+ *opts = cur->conf;
1013+ epicsMutexUnlock(cur->guard);
1014+ return cur;
1015+ }
1016+
1017+ cur=epicsThreadPoolCreate(opts);
1018+ if(!cur) {
1019+ epicsMutexUnlock(sharedPoolsGuard);
1020+ return NULL;
1021+ }
1022+ cur->sharedCount=1;
1023+
1024+ ellAdd(&sharedPools, &cur->sharedNode);
1025+ epicsMutexUnlock(sharedPoolsGuard);
1026+ return cur;
1027+}
1028+
1029+epicsShareFunc void epicsThreadPoolReleaseShared(epicsThreadPool *pool)
1030+{
1031+ if(!pool)
1032+ return;
1033+
1034+ epicsMutexMustLock(sharedPoolsGuard);
1035+
1036+ assert(pool->sharedCount>0);
1037+
1038+ pool->sharedCount--;
1039+
1040+ if(pool->sharedCount==0) {
1041+ ellDelete(&sharedPools, &pool->sharedNode);
1042+ epicsThreadPoolDestroy(pool);
1043+ }
1044+
1045+ epicsMutexUnlock(sharedPoolsGuard);
1046+}
1047
1048=== modified file 'src/libCom/test/Makefile'
1049--- src/libCom/test/Makefile 2013-12-17 18:54:04 +0000
1050+++ src/libCom/test/Makefile 2014-07-29 16:22:24 +0000
1051@@ -12,6 +12,10 @@
1052
1053 PROD_LIBS += Com
1054
1055+TESTPROD_HOST += epicsThreadPoolTest
1056+epicsThreadPoolTest_SRCS += epicsThreadPoolTest.c
1057+TESTS += epicsThreadPoolTest
1058+
1059 TESTPROD_HOST += epicsUnitTestTest
1060 epicsUnitTestTest_SRCS += epicsUnitTestTest.c
1061 # Not much point running this on vxWorks or RTEMS...
1062
1063=== added file 'src/libCom/test/epicsThreadPoolTest.c'
1064--- src/libCom/test/epicsThreadPoolTest.c 1970-01-01 00:00:00 +0000
1065+++ src/libCom/test/epicsThreadPoolTest.c 2014-07-29 16:22:24 +0000
1066@@ -0,0 +1,443 @@
1067+/*************************************************************************\
1068+* Copyright (c) 2014 Brookhaven Science Associates, as Operator of
1069+* Brookhaven National Laboratory.
1070+* EPICS BASE is distributed subject to a Software License Agreement found
1071+* in file LICENSE that is included with this distribution.
1072+\*************************************************************************/
1073+
1074+#include "epicsThreadPool.h"
1075+
1076+/* included to allow tests to peek */
1077+#include "../../pool/poolPriv.h"
1078+
1079+#include "testMain.h"
1080+#include "epicsUnitTest.h"
1081+
1082+#include "cantProceed.h"
1083+#include "epicsEvent.h"
1084+#include "epicsMutex.h"
1085+#include "epicsThread.h"
1086+
1087+/* Do nothing */
1088+static void nullop(void)
1089+{
1090+ epicsThreadPool *pool;
1091+ testDiag("nullop()");
1092+ {
1093+ epicsThreadPoolConfig conf;
1094+ epicsThreadPoolConfigDefaults(&conf);
1095+ testOk1(conf.maxThreads>0);
1096+
1097+ testOk1((pool=epicsThreadPoolCreate(&conf))!=NULL);
1098+ if(!pool)
1099+ return;
1100+ }
1101+
1102+ epicsThreadPoolDestroy(pool);
1103+}
1104+
1105+/* Just create and destroy worker threads */
1106+static void oneop(void)
1107+{
1108+ epicsThreadPool *pool;
1109+ testDiag("oneop()");
1110+ {
1111+ epicsThreadPoolConfig conf;
1112+ epicsThreadPoolConfigDefaults(&conf);
1113+ conf.initialThreads=2;
1114+ testOk1(conf.maxThreads>0);
1115+
1116+ testOk1((pool=epicsThreadPoolCreate(&conf))!=NULL);
1117+ if(!pool)
1118+ return;
1119+ }
1120+
1121+ epicsThreadPoolDestroy(pool);
1122+}
1123+
1124+/* Test that Bursts of jobs will create enough threads to
1125+ * run all in parallel
1126+ */
1127+typedef struct {
1128+ epicsMutexId guard;
1129+ unsigned int count;
1130+ epicsEventId allrunning;
1131+ epicsEventId done;
1132+ epicsJob **job;
1133+} countPriv;
1134+
1135+static void countjob(void *param, epicsJobMode mode)
1136+{
1137+ countPriv *cnt=param;
1138+ testOk1(mode==epicsJobModeRun||mode==epicsJobModeCleanup);
1139+ if(mode==epicsJobModeCleanup)
1140+ return;
1141+
1142+ epicsMutexMustLock(cnt->guard);
1143+ testDiag("Job %lu", (unsigned long)cnt->count);
1144+ cnt->count--;
1145+ if(cnt->count==0) {
1146+ testDiag("All jobs running");
1147+ epicsEventSignal(cnt->allrunning);
1148+ }
1149+ epicsMutexUnlock(cnt->guard);
1150+
1151+ epicsEventMustWait(cnt->done);
1152+ epicsEventSignal(cnt->done); /* pass along to next thread */
1153+}
1154+
1155+/* Starts "mcnt" jobs in a pool with initial and max
1156+ * thread counts "icnt" and "mcnt".
1157+ * The test ensures that all jobs run in parallel.
1158+ * "cork" checks the function of pausing the run queue
1159+ * with epicsThreadPoolQueueRun
1160+ */
1161+static void postjobs(size_t icnt, size_t mcnt, int cork)
1162+{
1163+ size_t i;
1164+ epicsThreadPool *pool;
1165+ countPriv *priv=callocMustSucceed(1, sizeof(*priv), "postjobs priv alloc");
1166+ priv->guard=epicsMutexMustCreate();
1167+ priv->done=epicsEventMustCreate(epicsEventEmpty);
1168+ priv->allrunning=epicsEventMustCreate(epicsEventEmpty);
1169+ priv->count=mcnt;
1170+ priv->job=callocMustSucceed(mcnt, sizeof(*priv->job), "postjobs job array");
1171+
1172+ testDiag("postjobs(%lu,%lu)", (unsigned long)icnt, (unsigned long)mcnt);
1173+
1174+ {
1175+ epicsThreadPoolConfig conf;
1176+ epicsThreadPoolConfigDefaults(&conf);
1177+ conf.initialThreads=icnt;
1178+ conf.maxThreads=mcnt;
1179+
1180+ testOk1((pool=epicsThreadPoolCreate(&conf))!=NULL);
1181+ if(!pool)
1182+ return;
1183+ }
1184+
1185+ if(cork)
1186+ epicsThreadPoolControl(pool, epicsThreadPoolQueueRun, 0);
1187+
1188+ for(i=0; i<mcnt; i++) {
1189+ testDiag("i=%lu", (unsigned long)i);
1190+ priv->job[i] = epicsJobCreate(pool, &countjob, priv);
1191+ testOk1(priv->job[i]!=NULL);
1192+ testOk1(epicsJobQueue(priv->job[i])==0);
1193+ }
1194+
1195+ if(cork) {
1196+ /* no jobs should have run */
1197+ epicsMutexMustLock(priv->guard);
1198+ testOk1(priv->count==mcnt);
1199+ epicsMutexUnlock(priv->guard);
1200+
1201+ epicsThreadPoolControl(pool, epicsThreadPoolQueueRun, 1);
1202+ }
1203+
1204+ testDiag("Waiting for all jobs to start");
1205+ epicsEventMustWait(priv->allrunning);
1206+ testDiag("Stop all");
1207+ epicsEventSignal(priv->done);
1208+
1209+ for(i=0; i<mcnt; i++) {
1210+ testDiag("i=%lu", (unsigned long)i);
1211+ epicsJobDestroy(priv->job[i]);
1212+ }
1213+
1214+ epicsThreadPoolDestroy(pool);
1215+ epicsMutexDestroy(priv->guard);
1216+ epicsEventDestroy(priv->allrunning);
1217+ epicsEventDestroy(priv->done);
1218+ free(priv->job);
1219+ free(priv);
1220+}
1221+
1222+static unsigned int flag0 = 0;
1223+
1224+/* Test cancel from job (no-op)
1225+ * and destroy from job (lazy free)
1226+ */
1227+static void cleanupjob0(void* arg, epicsJobMode mode)
1228+{
1229+ epicsJob *job=arg;
1230+ testOk1(mode==epicsJobModeRun||mode==epicsJobModeCleanup);
1231+ if(mode==epicsJobModeCleanup)
1232+ return;
1233+
1234+ assert(flag0==0);
1235+ flag0=1;
1236+
1237+ testOk1(epicsJobQueue(job)==0);
1238+
1239+ epicsJobDestroy(job); /* delete while job is running */
1240+}
1241+static void cleanupjob1(void* arg, epicsJobMode mode)
1242+{
1243+ epicsJob *job=arg;
1244+ testOk1(mode==epicsJobModeRun||mode==epicsJobModeCleanup);
1245+ if(mode==epicsJobModeCleanup)
1246+ return;
1247+
1248+ testOk1(epicsJobQueue(job)==0);
1249+
1250+ testOk1(epicsJobUnqueue(job)==0);
1251+ /* delete later after job finishes, but before pool is destroyed */
1252+}
1253+static void cleanupjob2(void* arg, epicsJobMode mode)
1254+{
1255+ epicsJob *job=arg;
1256+ testOk1(mode==epicsJobModeRun||mode==epicsJobModeCleanup);
1257+ if(mode==epicsJobModeCleanup)
1258+ epicsJobDestroy(job); /* delete when threadpool is destroyed */
1259+ else if(mode==epicsJobModeRun)
1260+ testOk1(epicsJobUnqueue(job)==1);
1261+}
1262+static epicsJobFunction cleanupjobs[3] = {&cleanupjob0,&cleanupjob1,&cleanupjob2};
1263+
1264+/* Tests three methods for job cleanup.
1265+ * 1. destroy which running
1266+ * 2. deferred cleanup after pool destroyed
1267+ * 3. immediate cleanup when pool destroyed
1268+ */
1269+static void testcleanup(void)
1270+{
1271+ int i=0;
1272+ epicsThreadPool *pool;
1273+ epicsJob *job[3];
1274+
1275+ testDiag("testcleanup()");
1276+
1277+ testOk1((pool=epicsThreadPoolCreate(NULL))!=NULL);
1278+ if(!pool)
1279+ return;
1280+
1281+ /* unrolled so that valgrind can show which methods leaks */
1282+ testOk1((job[0]=epicsJobCreate(pool, cleanupjobs[0], EPICSJOB_SELF))!=NULL);
1283+ testOk1((job[1]=epicsJobCreate(pool, cleanupjobs[1], EPICSJOB_SELF))!=NULL);
1284+ testOk1((job[2]=epicsJobCreate(pool, cleanupjobs[2], EPICSJOB_SELF))!=NULL);
1285+ for(i=0; i<3; i++) {
1286+ testOk1(epicsJobQueue(job[i])==0);
1287+ }
1288+
1289+ epicsThreadPoolWait(pool, -1);
1290+ epicsJobDestroy(job[1]);
1291+ epicsThreadPoolDestroy(pool);
1292+}
1293+
1294+/* Test re-add from inside job */
1295+typedef struct {
1296+ unsigned int count;
1297+ epicsEventId done;
1298+ epicsJob *job;
1299+ unsigned int inprogress;
1300+} readdPriv;
1301+
1302+static void readdjob(void *arg, epicsJobMode mode)
1303+{
1304+ readdPriv *priv=arg;
1305+ testOk1(mode==epicsJobModeRun||mode==epicsJobModeCleanup);
1306+ if(mode==epicsJobModeCleanup)
1307+ return;
1308+ testOk1(priv->inprogress==0);
1309+ testDiag("count==%u", priv->count);
1310+
1311+ if(priv->count--) {
1312+ priv->inprogress=1;
1313+ epicsJobQueue(priv->job);
1314+ epicsThreadSleep(0.05);
1315+ priv->inprogress=0;
1316+ }else{
1317+ epicsEventSignal(priv->done);
1318+ epicsJobDestroy(priv->job);
1319+ }
1320+}
1321+
1322+/* Test re-queueing a job while it is running.
1323+ * Check that a single job won't run concurrently.
1324+ */
1325+static void testreadd(void) {
1326+ epicsThreadPool *pool;
1327+ readdPriv *priv=callocMustSucceed(1, sizeof(*priv), "testcleanup priv");
1328+ readdPriv *priv2=callocMustSucceed(1, sizeof(*priv), "testcleanup priv");
1329+
1330+ testDiag("testreadd");
1331+
1332+ priv->done=epicsEventMustCreate(epicsEventEmpty);
1333+ priv->count=5;
1334+ priv2->done=epicsEventMustCreate(epicsEventEmpty);
1335+ priv2->count=5;
1336+
1337+ testOk1((pool=epicsThreadPoolCreate(NULL))!=NULL);
1338+ if(!pool)
1339+ return;
1340+
1341+ testOk1((priv->job=epicsJobCreate(pool, &readdjob, priv))!=NULL);
1342+ testOk1((priv2->job=epicsJobCreate(pool, &readdjob, priv2))!=NULL);
1343+
1344+ testOk1(epicsJobQueue(priv->job)==0);
1345+ testOk1(epicsJobQueue(priv2->job)==0);
1346+ epicsEventMustWait(priv->done);
1347+ epicsEventMustWait(priv2->done);
1348+
1349+ testOk1(epicsThreadPoolNThreads(pool)==2);
1350+
1351+ epicsThreadPoolDestroy(pool);
1352+ epicsEventDestroy(priv->done);
1353+ epicsEventDestroy(priv2->done);
1354+ free(priv);
1355+ free(priv2);
1356+
1357+}
1358+
1359+static int shouldneverrun = 0;
1360+static int numtoolate = 0;
1361+
1362+/* test job canceling */
1363+static
1364+void neverrun(void *arg, epicsJobMode mode)
1365+{
1366+ epicsJob *job=arg;
1367+ testOk1(mode==epicsJobModeCleanup);
1368+ if(mode==epicsJobModeCleanup)
1369+ epicsJobDestroy(job);
1370+ else
1371+ shouldneverrun++;
1372+}
1373+static epicsEventId cancel[2];
1374+static
1375+void toolate(void *arg, epicsJobMode mode)
1376+{
1377+ epicsJob *job=arg;
1378+ if(mode==epicsJobModeCleanup){
1379+ epicsJobDestroy(job);
1380+ return;
1381+ }
1382+ testPass("Job runs");
1383+ numtoolate++;
1384+ epicsEventSignal(cancel[0]);
1385+ epicsEventMustWait(cancel[1]);
1386+}
1387+
1388+static
1389+void testcancel(void)
1390+{
1391+ epicsJob *job[2];
1392+ epicsThreadPool *pool;
1393+ testOk1((pool=epicsThreadPoolCreate(NULL))!=NULL);
1394+ if(!pool)
1395+ return;
1396+
1397+ cancel[0]=epicsEventCreate(epicsEventEmpty);
1398+ cancel[1]=epicsEventCreate(epicsEventEmpty);
1399+
1400+ testOk1((job[0]=epicsJobCreate(pool, &neverrun, EPICSJOB_SELF))!=NULL);
1401+ testOk1((job[1]=epicsJobCreate(pool, &toolate, EPICSJOB_SELF))!=NULL);
1402+
1403+ /* freeze */
1404+ epicsThreadPoolControl(pool, epicsThreadPoolQueueRun, 0);
1405+
1406+ testOk1(epicsJobUnqueue(job[0])==1); /* not queued yet */
1407+
1408+ epicsJobQueue(job[0]);
1409+ testOk1(epicsJobUnqueue(job[0])==0);
1410+ testOk1(epicsJobUnqueue(job[0])==1);
1411+
1412+ epicsThreadSleep(0.01);
1413+ epicsJobQueue(job[0]);
1414+ testOk1(epicsJobUnqueue(job[0])==0);
1415+ testOk1(epicsJobUnqueue(job[0])==1);
1416+
1417+ epicsThreadPoolControl(pool, epicsThreadPoolQueueRun, 1);
1418+
1419+ epicsJobQueue(job[1]); /* actually let it run this time */
1420+
1421+ epicsEventMustWait(cancel[0]);
1422+ testOk1(epicsJobUnqueue(job[0])==1);
1423+ epicsEventSignal(cancel[1]);
1424+
1425+ epicsThreadPoolDestroy(pool);
1426+ epicsEventDestroy(cancel[0]);
1427+ epicsEventDestroy(cancel[1]);
1428+
1429+ testOk1(shouldneverrun==0);
1430+ testOk1(numtoolate==1);
1431+}
1432+
1433+static
1434+unsigned int sharedWasDeleted=0;
1435+
1436+static
1437+void lastjob(void *arg, epicsJobMode mode)
1438+{
1439+ epicsJob *job=arg;
1440+ if(mode==epicsJobModeCleanup) {
1441+ sharedWasDeleted=1;
1442+ epicsJobDestroy(job);
1443+ }
1444+}
1445+
1446+static
1447+void testshared(void)
1448+{
1449+ epicsThreadPool *poolA, *poolB;
1450+ epicsThreadPoolConfig conf;
1451+ epicsJob *job;
1452+
1453+ epicsThreadPoolConfigDefaults(&conf);
1454+
1455+ testDiag("Check reference counting of shared pools");
1456+
1457+ testOk1((poolA=epicsThreadPoolGetShared(&conf))!=NULL);
1458+
1459+ testOk1(poolA->sharedCount==1);
1460+
1461+ testOk1((poolB=epicsThreadPoolGetShared(&conf))!=NULL);
1462+
1463+ testOk1(poolA==poolB);
1464+
1465+ testOk1(poolA->sharedCount==2);
1466+
1467+ epicsThreadPoolReleaseShared(poolA);
1468+
1469+ testOk1(poolB->sharedCount==1);
1470+
1471+ testOk1((job=epicsJobCreate(poolB, &lastjob, EPICSJOB_SELF))!=NULL);
1472+
1473+ epicsThreadPoolReleaseShared(poolB);
1474+
1475+ testOk1(sharedWasDeleted==1);
1476+
1477+ testOk1((poolA=epicsThreadPoolGetShared(&conf))!=NULL);
1478+
1479+ testOk1(poolA->sharedCount==1);
1480+
1481+ epicsThreadPoolReleaseShared(poolA);
1482+
1483+}
1484+
1485+MAIN(epicsThreadPoolTest)
1486+{
1487+ testPlan(171);
1488+
1489+ nullop();
1490+ oneop();
1491+ testDiag("Queue with delayed start");
1492+ postjobs(1,1,1);
1493+ postjobs(0,1,1);
1494+ postjobs(4,4,1);
1495+ postjobs(0,4,1);
1496+ postjobs(2,4,1);
1497+ testDiag("Queue with immediate start");
1498+ postjobs(1,1,0);
1499+ postjobs(0,1,0);
1500+ postjobs(4,4,0);
1501+ postjobs(0,4,0);
1502+ postjobs(2,4,0);
1503+ testcleanup();
1504+ testreadd();
1505+ testcancel();
1506+ testshared();
1507+
1508+ return testDone();
1509+}
