Merge lp:~epics-core/epics-base/thread-pool into lp:~epics-core/epics-base/3.15

Proposed by mdavidsaver
Status: Merged
Merged at revision: 12505
Proposed branch: lp:~epics-core/epics-base/thread-pool
Merge into: lp:~epics-core/epics-base/3.15
Diff against target: 1509 lines (+1446/-0)
9 files modified
documentation/RELEASE_NOTES.html (+8/-0)
src/libCom/Makefile (+1/-0)
src/libCom/pool/Makefile (+16/-0)
src/libCom/pool/epicsThreadPool.h (+150/-0)
src/libCom/pool/poolJob.c (+328/-0)
src/libCom/pool/poolPriv.h (+97/-0)
src/libCom/pool/threadPool.c (+399/-0)
src/libCom/test/Makefile (+4/-0)
src/libCom/test/epicsThreadPoolTest.c (+443/-0)
To merge this branch: bzr merge lp:~epics-core/epics-base/thread-pool
Reviewer                Status
Andrew Johnson          Approve
mdavidsaver             Needs Resubmitting
Review via email: mp+108385@code.launchpad.net

Description of the change

General purpose thread pool.
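
A minimal usage sketch (API as declared in epicsThreadPool.h in the diff
below; error checking omitted):

    #include "epicsThreadPool.h"

    /* Job callback: runs on a pool worker thread */
    static void myJob(void *arg, epicsJobMode mode)
    {
        if (mode == epicsJobModeCleanup)
            return;              /* pool is being destroyed */
        /* ... do the work on arg ... */
    }

    void example(void)
    {
        epicsThreadPool *pool = epicsThreadPoolCreate(NULL); /* defaults */
        epicsJob *job = epicsJobCreate(pool, &myJob, NULL);

        epicsJobQueue(job);              /* schedule the job */
        epicsThreadPoolWait(pool, -1.0); /* block until the queue drains */

        epicsJobDestroy(job);
        epicsThreadPoolDestroy(pool);
    }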

Revision history for this message
mdavidsaver (mdavidsaver) wrote :

* Include in the RTEMS/vxWorks test harness

Done.

* For RTEMS/vxWorks, replace mutex with interrupt disable

Done.

* Avoid excessive wakeups from worker and observer Events.

Done.

Also fixed several issues and added calls for shared thread pools.
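
The shared-pool calls work roughly like this sketch (API as in
epicsThreadPool.h below; error handling trimmed):

    epicsThreadPoolConfig conf;
    epicsThreadPool *pool;

    epicsThreadPoolConfigDefaults(&conf);
    pool = epicsThreadPoolGetShared(&conf);  /* fetch or create */
    if (pool) {
        /* ... create and queue jobs against the shared pool ... */
        epicsThreadPoolReleaseShared(pool);  /* pool is destroyed when
                                              * the last user releases it */
    }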

Remaining task:

Get approval of the name epicsMaxParallelThreads()

Revision history for this message
mdavidsaver (mdavidsaver) wrote :

The current implementation for RTEMS uses epicsEventSignal in places where interrupts have been disabled, in the belief that disabling interrupts suppresses all context switching. This is not so.

review: Needs Fixing
Revision history for this message
mdavidsaver (mdavidsaver) wrote :

I've changed the implementation to use only a mutex and abandoned plans to use spin locks.

Also, the API to report the number of cores is made private and will be replaced with Ralph's API after this is merged.

The error codes returned are changed to the usual form.

review: Approve
Revision history for this message
Ralph Lange (ralph-lange) wrote :

Michael,

could you push the documentation to a branch in https://code.launchpad.net/epics-appdev ?

Thank you,
~Ralph

Revision history for this message
mdavidsaver (mdavidsaver) wrote :

> could you push the documentation to a branch in https://code.launchpad.net
> /epics-appdev ?

Done

lp:~mdavidsaver/epics-appdev/thread-pool

Also made some final cleanup of the API, including the use of enums instead of integers for the work function's mode argument and for the epicsThreadPoolControl() option argument. The epicsJobSet function is removed in favor of a magic argument EPICSJOB_SELF, which sets the user pointer to the epicsJob*. Also tweaked the default thread priority so workers run between the highest CA server priority and the medium scan priority.
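
For illustration, the EPICSJOB_SELF behaviour looks like this sketch
(matching the header and tests in this diff): the callback receives its own
epicsJob* as its argument, so a one-shot job can destroy itself:

    /* created with: epicsJobCreate(pool, &oneShot, EPICSJOB_SELF) */
    static void oneShot(void *arg, epicsJobMode mode)
    {
        epicsJob *job = arg;  /* EPICSJOB_SELF substitutes the job pointer */
        if (mode == epicsJobModeRun) {
            /* ... do the work ... */
        }
        epicsJobDestroy(job); /* safe even from inside the job function */
    }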

Revision history for this message
Andrew Johnson (anj) wrote :

Please delete the _poolNumCores() implementations and use the new epicsThreadGetCPUs() API from epicsThread.h instead. Unfortunately that was added to the 3.15 branch in commit 12480 after this branch was created, so you'll either have to rebase or merge 3.15 into this branch to get it (rebase is better; I have had problems with the history being hidden the other way).

While you're doing that, you could change the M_com value to avoid the clash with the now-defined M_stdlib. However, you may not realize that you are allowed to use status values from errno.h instead of inventing your own when they mean the same thing: errSymLookup() calls strerror() when given a status value with modNum <= 500, so you can use errno values such as ENOMEM, ERANGE, ETIMEDOUT etc. directly as status values. Maybe avoid the more obscure values for portability reasons, but anything returned by the OS or defined in POSIX.1 should be fine.
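
For example, something like this sketch (the helper name is just for
illustration):

    #include <errno.h>
    #include "epicsThreadPool.h"

    /* Hypothetical helper: return 0 on success or a plain errno value as
     * the status; errSymLookup() renders such values via strerror(). */
    static int queueChecked(epicsJob *job)
    {
        if (!job)
            return EINVAL;         /* prints as "Invalid argument" */
        return epicsJobQueue(job); /* itself returns 0 or an errno value */
    }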

As a result I'm not keen on adding the errCommon.h header. If you need status values that don't match anything already defined, I'd rather that you add an M_pool and define your own S_pool_xxx status values in epicsThreadPool.h

I have not looked at the pool code itself, but I am reading the AppDevGuide entry. More comments tomorrow.

Revision history for this message
mdavidsaver (mdavidsaver) wrote :

> Please delete the _poolNumCores() implementations and use the new
> epicsThreadGetCPUs() API from epicsThread.h instead.

Ok.

> ... you'll either have to rebase or merge 3.15 into this branch to get it

Ok. (We'll see how Launchpad copes with this.)

> ... you may not realize that you are
> allowed to use status values from errno.h instead of inventing your own if
> they mean the same thing;

Indeed I didn't know this, and will try.

> Maybe avoid the more obscure values

But I want to use EILSEQ :)

Revision history for this message
mdavidsaver (mdavidsaver) wrote :

> > ... you may not realize that you are
> > allowed to use status values from errno.h instead of inventing your own if
> > they mean the same thing;
>
> Indeed I didn't know this, and will try.

To do this, should I use SOCK_EINVAL instead of EINVAL?

FYI I think I can get by with only EINVAL and EWOULDBLOCK.

Revision history for this message
Andrew Johnson (anj) wrote :

On 07/24/2014 10:54 AM, mdavidsaver wrote:
> To do this should I use SOCK_EINVAL instead of EINVAL?

Since errlog uses strerror() to get the error string, I think you have to
use EINVAL, which Microsoft supports at least back to MSVS-2003. I don't
know whether strerror() can handle WSAEINVAL; they don't say.

> FYI I think I can get by with only EINVAL and EWOULDBLOCK.

EWOULDBLOCK is listed in the MS documentation as supported by MSVS-2010,
so that's fine too.

Revision history for this message
mdavidsaver (mdavidsaver) wrote :

Ok, I think I just messed up trying to overwrite this branch with a rebased version.

review: Needs Fixing
Revision history for this message
Andrew Johnson (anj) wrote :

I think Launchpad just got itself confused, but after setting the status back to Needs Review and waiting a little it seems to have recovered, and I think it's now showing the right diff again.

Revision history for this message
Andrew Johnson (anj) wrote :

Why are the thread and observer counters in struct epicsThreadPool all declared as size_t instead of unsigned integers? We'll never want even 2**32 threads or observers, so allowing for 2**64 on 64-bit systems seems silly. There is no printf format string for a size_t that works across all our OSs, and while your cast to unsigned long in epicsThreadPoolReport() works, a cast should be unnecessary, and it still has to drop bits on 64-bit Windows where sizeof(size_t) == sizeof(unsigned long long).
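
To illustrate the format-string point (sketch only):

    #include <stdio.h>
    #include <stddef.h>

    void show(size_t nBig, unsigned int nSmall)
    {
        /* C89 has no portable conversion for size_t ("%zu" is C99), so a
         * cast is required; on 64-bit Windows sizeof(size_t) is 8 while
         * sizeof(unsigned long) is 4, so even this cast can drop bits. */
        printf("%lu\n", (unsigned long)nBig);

        /* A plain unsigned counter needs no cast at all: */
        printf("%u\n", nSmall);
    }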

Shouldn't that EWOULDBLOCK be ETIMEDOUT? It has already blocked for the timeout period, and this status has that meaning in sem_wait, mq_send/receive etc.

Revision history for this message
mdavidsaver (mdavidsaver) wrote :

> Why are the thread and observer counters in struct epicsThreadPool all
> declared as size_t

It's shorter :)

> Shouldn't that EWOULDBLOCK be ETIMEDOUT? It has already blocked for the
> timeout period, and this status has that meaning in sem_wait, mq_send/receive
> etc.

Good point. I'll change this.

12503. By mdavidsaver

thread pool: epicsJobQueue returns EPERM

when pool control prevents the operation

12504. By mdavidsaver

thread pool: handle failure to create worker

epicsJobQueue() returns EAGAIN when the first worker
can't be lazily created.

Failure to create workers beyond the first is
silently ignored.

12505. By mdavidsaver

thread pool: fix return of epicsJobUnqueue()

Return 0 on success (was queued, now is not),
1 if not queued initially, and EINVAL
if orphaned.
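
In code, the revised return values read like this sketch (the helper name
is illustrative):

    #include "epicsThreadPool.h"

    static void tryUnqueue(epicsJob *job)
    {
        int sts = epicsJobUnqueue(job);

        if (sts == 0) {
            /* job was queued, and has now been removed */
        } else if (sts == 1) {
            /* job already ran, is running, or was never queued */
        } else {
            /* EINVAL: the job is orphaned (no associated pool) */
        }
    }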

12506. By mdavidsaver

thread pool: epicsThreadPoolWait returns ETIMEDOUT

12507. By mdavidsaver

thread pool: mark epicsJobCreate() as safe to call from job functions

Also, use epicsJobMove() to avoid some redundant code

Revision history for this message
mdavidsaver (mdavidsaver) wrote :

Rebased and made the requested changes. Removed the S_com_* error codes in favor of E* codes from errno.h.

Also improved handling of worker thread creation failure. Previously this was treated as an error only in epicsThreadPoolCreate(); now it is also an error in epicsJobQueue() when lazy creation of the first worker fails.

Failure to create workers after the first is not treated as an error, so
user code that needs to ensure more than one worker is running must test
explicitly with epicsThreadPoolNThreads().
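
Under that rule, code that depends on parallelism would do something like
this sketch (the helper name is hypothetical):

    #include "epicsThreadPool.h"

    /* Create a pool and verify that two workers actually started
     * before relying on concurrent execution. */
    static epicsThreadPool* makeParallelPool(void)
    {
        epicsThreadPoolConfig conf;
        epicsThreadPool *pool;

        epicsThreadPoolConfigDefaults(&conf);
        conf.initialThreads = 2;
        conf.maxThreads = 2;

        pool = epicsThreadPoolCreate(&conf);
        if (pool && epicsThreadPoolNThreads(pool) < 2) {
            /* only one worker exists; the caller must not assume
             * two jobs will ever run concurrently */
        }
        return pool;
    }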

review: Needs Resubmitting
Revision history for this message
Andrew Johnson (anj) wrote :

Your counters are still defined as size_t. Both epicsThreadPoolConfig and struct epicsThreadPool are guilty of this.

Strictly speaking the name _epicsJobArgSelf is reserved for the C implementation because it begins with an underscore and appears in the global namespace.
http://stackoverflow.com/questions/228783/what-are-the-rules-about-using-an-underscore-in-a-c-identifier

_epicsThreadPoolControl is OK because it's static, but I would prefer to see it named something like threadPoolControl so it doesn't get made visible and break the rule in the future.

12508. By mdavidsaver

don't include errCommon.h

doesn't exist anymore

12509. By mdavidsaver

thread pool: switch thread counts to unsigned int

size_t is considered overly optimistic

12510. By mdavidsaver

thread pool: don't use reserved names

Avoid global symbols with leading underscore

Revision history for this message
mdavidsaver (mdavidsaver) wrote :

Done.

review: Needs Resubmitting
Revision history for this message
Andrew Johnson (anj) wrote :

Test 148 was failing on VxWorks, both 5.5.2 and 6.9, and should have failed on RTEMS too.

ok 147 - priv->inprogress==0
# count==0
not ok 148 - epicsThreadPoolNThreads(pool)==2
ok 149 - (pool=epicsThreadPoolCreate(NULL))!=NULL

epicsThreadPoolNThreads(pool) was returning 1 because this is a uniprocessor (UP) machine, so the default value of maxThreads is 1.

I added an epicsThreadPoolConfig variable and set maxThreads to 2 to fix this.
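
i.e. roughly this change in testreadd() (a sketch; the committed fix may
differ in detail):

    epicsThreadPoolConfig conf;
    epicsThreadPoolConfigDefaults(&conf);
    conf.maxThreads = 2;  /* don't depend on the host's CPU count */

    testOk1((pool = epicsThreadPoolCreate(&conf)) != NULL);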

review: Approve

Preview Diff

=== modified file 'documentation/RELEASE_NOTES.html'
--- documentation/RELEASE_NOTES.html 2014-06-12 19:47:42 +0000
+++ documentation/RELEASE_NOTES.html 2014-07-29 16:22:24 +0000
@@ -15,6 +15,14 @@
 <h2 align="center">Changes between 3.15.0.1 and 3.15.0.2</h2>
 <!-- Insert new items immediately below here ... -->

+<h3>
+General purpose thread pool</h3>
+
+<p>
+A general purpose threaded work queue API epicsThreadPool is added.
+Multiple pools can be created with controllable priority and number
+of worker threads. Lazy worker startup is supported.</p>
+
 <h3>Database field setting updates</h3>

 <p>A database (.db) file loaded by an IOC does not have to repeat the record
=== modified file 'src/libCom/Makefile'
--- src/libCom/Makefile 2012-04-27 17:21:29 +0000
+++ src/libCom/Makefile 2014-07-29 16:22:24 +0000
@@ -31,6 +31,7 @@
 include $(LIBCOM)/macLib/Makefile
 include $(LIBCOM)/misc/Makefile
 include $(LIBCOM)/osi/Makefile
+include $(LIBCOM)/pool/Makefile
 include $(LIBCOM)/ring/Makefile
 include $(LIBCOM)/taskwd/Makefile
 include $(LIBCOM)/timer/Makefile
=== added directory 'src/libCom/pool'
=== added file 'src/libCom/pool/Makefile'
--- src/libCom/pool/Makefile 1970-01-01 00:00:00 +0000
+++ src/libCom/pool/Makefile 2014-07-29 16:22:24 +0000
@@ -0,0 +1,16 @@
1#*************************************************************************
2# Copyright (c) 2014 UChicago Argonne LLC, as Operator of Argonne
3# National Laboratory.
4# EPICS BASE is distributed subject to a Software License Agreement found
5# in file LICENSE that is included with this distribution.
6#*************************************************************************
7
8# This is a Makefile fragment, see src/libCom/Makefile.
9
10SRC_DIRS += $(LIBCOM)/pool
11
12INC += epicsThreadPool.h
13
14Com_SRCS += poolJob.c
15Com_SRCS += threadPool.c
16
=== added file 'src/libCom/pool/epicsThreadPool.h'
--- src/libCom/pool/epicsThreadPool.h 1970-01-01 00:00:00 +0000
+++ src/libCom/pool/epicsThreadPool.h 2014-07-29 16:22:24 +0000
@@ -0,0 +1,150 @@
1/*************************************************************************\
2* Copyright (c) 2014 Brookhaven Science Associates, as Operator of
3* Brookhaven National Laboratory.
4* EPICS BASE is distributed subject to a Software License Agreement found
5* in file LICENSE that is included with this distribution.
6\*************************************************************************/
7/* General purpose worker thread pool manager
8 * mdavidsaver@bnl.gov
9 */
10#ifndef EPICSTHREADPOOL_H
11#define EPICSTHREADPOOL_H
12
13#include <stdlib.h>
14#include <stdio.h>
15
16#include "shareLib.h"
17
18#ifdef __cplusplus
19extern "C" {
20#endif
21
22typedef struct {
23 unsigned int initialThreads;
24 unsigned int maxThreads;
25 unsigned int workerStack;
26 unsigned int workerPriority;
27} epicsThreadPoolConfig;
28
29typedef struct epicsThreadPool epicsThreadPool;
30
31/* Job function call modes */
32typedef enum {
33 /* Normal run of job */
34 epicsJobModeRun,
35 /* Thread pool is being destroyed.
36 * A chance to cleanup the job immediately with epicsJobDestroy().
37 * If ignored, the job is orphaned (dissociated from the thread pool)
38 * and epicsJobDestroy() must be called later.
39 */
40 epicsJobModeCleanup
41} epicsJobMode;
42
43typedef void (*epicsJobFunction)(void* arg, epicsJobMode mode);
44
45typedef struct epicsJob epicsJob;
46
47/* Pool operations */
48
49/* Initialize a pool config with default values.
50 * This must be done to preserve future compatibility
51 * when new options are added.
52 */
53epicsShareFunc void epicsThreadPoolConfigDefaults(epicsThreadPoolConfig *);
54
55/* fetch or create a thread pool which can be shared with other users.
56 * may return NULL for allocation failures
57 */
58epicsShareFunc epicsThreadPool* epicsThreadPoolGetShared(epicsThreadPoolConfig *opts);
59epicsShareFunc void epicsThreadPoolReleaseShared(epicsThreadPool *pool);
60
61/* If opts is NULL then defaults are used.
62 * The opts pointer is not stored by this call, and may exist on the stack.
63 */
64epicsShareFunc epicsThreadPool* epicsThreadPoolCreate(epicsThreadPoolConfig *opts);
65
66/* Blocks until all worker threads have stopped.
67 * Any jobs still attached to this pool receive a callback with epicsJobModeCleanup
68 * and are then orphaned.
69 */
70epicsShareFunc void epicsThreadPoolDestroy(epicsThreadPool *);
71
72/* pool control options */
73typedef enum {
74 epicsThreadPoolQueueAdd, /* val==0 causes epicsJobQueue to fail, 1 is default */
75 epicsThreadPoolQueueRun /* val==0 prevents workers from running jobs, 1 is default */
76} epicsThreadPoolOption;
77epicsShareFunc void epicsThreadPoolControl(epicsThreadPool* pool,
78 epicsThreadPoolOption opt,
79 unsigned int val);
80
81/* Block until job queue is emptied and no jobs are running.
82 * Useful after calling epicsThreadPoolControl() with option epicsThreadPoolQueueAdd=0
83 *
84 * timeout<0 waits forever, timeout==0 polls, timeout>0 waits at most one timeout period
85 * Returns 0 for success or non-zero on error (timeout gives ETIMEDOUT)
86 */
87epicsShareFunc int epicsThreadPoolWait(epicsThreadPool* pool, double timeout);
88
89
90/* Per job operations */
91
92/* special flag for epicsJobCreate().
93 * When passed as the third argument "user"
94 * the argument passed to the job callback
95 * will be the epicsJob*
96 */
97#define EPICSJOB_SELF epicsJobArgSelfMagic
98epicsShareExtern void* epicsJobArgSelfMagic;
99
100/* creates, but does not add, a new job.
101 * If pool is NULL then the job is not associated with any pool and
102 * epicsJobMove() must be called before epicsJobQueue()
103 * Safe to call from a running job function.
104 * returns a new job pointer, or NULL on error
105 */
106epicsShareFunc epicsJob* epicsJobCreate(epicsThreadPool* pool,
107 epicsJobFunction cb,
108 void* user);
109
110/* Cancel and free a job structure. Does not block.
111 * job may not be immediately free'd.
112 * Safe to call from a running job function.
113 */
114epicsShareFunc void epicsJobDestroy(epicsJob*);
115
116/* Move the job to a different pool.
117 * If pool is NULL then the job will no longer be associated
118 * with any pool.
119 * Not thread safe. Job must not be running or queued.
120 * returns 0 on success, and non-zero on error
121 */
122epicsShareFunc int epicsJobMove(epicsJob* job, epicsThreadPool* pool);
123
124/* Adds the job to the run queue
125 * Safe to call from a running job function.
126 * returns 0 for success, non-zero on error.
127 */
128epicsShareFunc int epicsJobQueue(epicsJob*);
129
130/* Remove a job from the run queue if it is queued.
131 * Safe to call from a running job function.
132 * returns 0 if the job was queued and now is not,
133 * 1 if the job already ran, is running, or was not queued before,
134 * other non-zero on error
135 */
136epicsShareFunc int epicsJobUnqueue(epicsJob*);
137
138
139/* Mostly useful for debugging */
140
141epicsShareFunc void epicsThreadPoolReport(epicsThreadPool *pool, FILE *fd);
142
143/* Current number of active workers. May be less than the maximum */
144epicsShareFunc unsigned int epicsThreadPoolNThreads(epicsThreadPool *);
145
146#ifdef __cplusplus
147}
148#endif
149
150#endif /* EPICSTHREADPOOL_H */
=== added file 'src/libCom/pool/poolJob.c'
--- src/libCom/pool/poolJob.c 1970-01-01 00:00:00 +0000
+++ src/libCom/pool/poolJob.c 2014-07-29 16:22:24 +0000
@@ -0,0 +1,328 @@
1/*************************************************************************\
2* Copyright (c) 2014 Brookhaven Science Associates, as Operator of
3* Brookhaven National Laboratory.
4* EPICS BASE is distributed subject to a Software License Agreement found
5* in file LICENSE that is included with this distribution.
6\*************************************************************************/
7
8#include <stdlib.h>
9#include <string.h>
10#include <errno.h>
11
12#define epicsExportSharedSymbols
13
14#include "dbDefs.h"
15#include "errlog.h"
16#include "ellLib.h"
17#include "epicsThread.h"
18#include "epicsMutex.h"
19#include "epicsEvent.h"
20#include "epicsInterrupt.h"
21
22#include "epicsThreadPool.h"
23#include "poolPriv.h"
24
25void* epicsJobArgSelfMagic = &epicsJobArgSelfMagic;
26
27static
28void workerMain(void* arg)
29{
30 epicsThreadPool *pool=arg;
31
32 /* workers are created with counts
33 * in the running, sleeping, and (possibly) waking counters
34 */
35
36 epicsMutexMustLock(pool->guard);
37 pool->threadsAreAwake++;
38 pool->threadsSleeping--;
39
40 while(1)
41 {
42 ELLNODE *cur;
43 epicsJob *job;
44
45 pool->threadsAreAwake--;
46 pool->threadsSleeping++;
47 epicsMutexUnlock(pool->guard);
48
49 epicsEventMustWait(pool->workerWakeup);
50
51 epicsMutexMustLock(pool->guard);
52 pool->threadsSleeping--;
53 pool->threadsAreAwake++;
54
55 if(pool->threadsWaking==0)
56 continue;
57
58 pool->threadsWaking--;
59
60 CHECKCOUNT(pool);
61
62 if(pool->shutdown)
63 break;
64
65 if(pool->pauserun)
66 continue;
67
68 /* more threads to wakeup */
69 if(pool->threadsWaking)
70 {
71 epicsEventSignal(pool->workerWakeup);
72 }
73
74 while ((cur=ellGet(&pool->jobs)) !=NULL)
75 {
76 job=CONTAINER(cur, epicsJob, jobnode);
77
78 assert(job->queued && !job->running);
79
80 job->queued=0;
81 job->running=1;
82
83 epicsMutexUnlock(pool->guard);
84 (*job->func)(job->arg, epicsJobModeRun);
85 epicsMutexMustLock(pool->guard);
86
87 if(job->freewhendone) {
88 job->dead=1;
89 free(job);
90 } else {
91 job->running=0;
92 /* job may be re-queued from within callback */
93 if(job->queued)
94 ellAdd(&pool->jobs, &job->jobnode);
95 else
96 ellAdd(&pool->owned, &job->jobnode);
97 }
98
99 }
100
101 if(pool->observerCount)
102 epicsEventSignal(pool->observerWakeup);
103 }
104
105 pool->threadsAreAwake--;
106 pool->threadsRunning--;
107
108 {
109 size_t nrun = pool->threadsRunning,
110 ocnt = pool->observerCount;
111 epicsMutexUnlock(pool->guard);
112
113 if(ocnt)
114 epicsEventSignal(pool->observerWakeup);
115
116 if(nrun)
117 epicsEventSignal(pool->workerWakeup); /* pass along */
118 else
119 epicsEventSignal(pool->shutdownEvent);
120 }
121
122 return;
123}
124
125int createPoolThread(epicsThreadPool *pool)
126{
127 epicsThreadId tid;
128
129 tid = epicsThreadCreate("PoolWorker",
130 pool->conf.workerPriority,
131 pool->conf.workerStack,
132 &workerMain,
133 pool);
134 if(!tid)
135 return 1;
136
137 pool->threadsRunning++;
138 pool->threadsSleeping++;
139 return 0;
140}
141
142epicsJob* epicsJobCreate(epicsThreadPool* pool,
143 epicsJobFunction func,
144 void* arg)
145{
146 epicsJob *job=calloc(1, sizeof(*job));
147
148 if(!job)
149 return NULL;
150
151 if(arg==&epicsJobArgSelfMagic)
152 arg=job;
153
154 job->pool=NULL;
155 job->func=func;
156 job->arg=arg;
157
158 epicsJobMove(job, pool);
159
160 return job;
161}
162
163void epicsJobDestroy(epicsJob* job)
164{
165 epicsThreadPool *pool;
166 if(!job || !job->pool){
167 free(job);
168 return;
169 }
170 pool=job->pool;
171
172 epicsMutexMustLock(pool->guard);
173
174 assert(!job->dead);
175
176 epicsJobUnqueue(job);
177
178 if(job->running || job->freewhendone) {
179 job->freewhendone=1;
180 } else {
181 ellDelete(&pool->owned, &job->jobnode);
182 job->dead=1;
183 free(job);
184 }
185
186 epicsMutexUnlock(pool->guard);
187}
188
189int epicsJobMove(epicsJob* job, epicsThreadPool* newpool)
190{
191 epicsThreadPool *pool=job->pool;
192
193 /* remove from current pool */
194 if(pool) {
195 epicsMutexMustLock(pool->guard);
196
197 if(job->queued || job->running) {
198 epicsMutexUnlock(pool->guard);
199 return EINVAL;
200 }
201
202 ellDelete(&pool->owned, &job->jobnode);
203
204 epicsMutexUnlock(pool->guard);
205 }
206
207 pool = job->pool = newpool;
208
209 /* add to new pool */
210 if(pool) {
211 epicsMutexMustLock(pool->guard);
212
213 ellAdd(&pool->owned, &job->jobnode);
214
215 epicsMutexUnlock(pool->guard);
216 }
217
218 return 0;
219}
220
221int epicsJobQueue(epicsJob* job)
222{
223 int ret=0;
224 epicsThreadPool *pool=job->pool;
225 if(!pool)
226 return EINVAL;
227
228 epicsMutexMustLock(pool->guard);
229
230 assert(!job->dead);
231
232 if(pool->pauseadd) {
233 ret=EPERM;
234 goto done;
235 } else if(job->freewhendone) {
236 ret=EINVAL;
237 goto done;
238 } else if(job->queued) {
239 goto done;
240 }
241
242 job->queued=1;
243 /* Job may be queued from within a callback */
244 if(!job->running) {
245 ellDelete(&pool->owned, &job->jobnode);
246 ellAdd(&pool->jobs, &job->jobnode);
247 } else {
248 /* some worker will find it again before sleeping */
249 goto done;
250 }
251
252 /* Since we hold the lock, we can be certain that all awake workers are
253 * executing work functions. The current thread may be a worker.
254 * We prefer to wake up a new worker rather than wait for a busy worker to
255 * finish. However, after we initiate a wakeup there will be a race
256 * between that worker waking up and a busy worker finishing.
257 * Thus we can't avoid spurious wakeups.
258 */
259
260 if(pool->threadsRunning >= pool->conf.maxThreads) {
261 /* all workers created... */
262 /* ... but some are sleeping, so wake one up */
263 if(pool->threadsWaking < pool->threadsSleeping) {
264 pool->threadsWaking++;
265 epicsEventSignal(pool->workerWakeup);
266 }
267 /*else one of the running workers will find this job before sleeping */
268 CHECKCOUNT(pool);
269
270 } else {
271 /* could create more workers so
272 * will either create a new worker, or wakeup an existing worker
273 */
274
275 if(pool->threadsWaking >= pool->threadsSleeping) {
276 /* all sleeping workers have already been woken.
277 * start a new worker for this job
278 */
279 if(createPoolThread(pool) && pool->threadsRunning==0) {
280 /* oops, we couldn't lazy create our first worker
281 * so this job would never run!
282 */
283 ret = EAGAIN;
284 job->queued = 0;
285 /* if threadsRunning==0 then no jobs can be running */
286 assert(!job->running);
287 ellDelete(&pool->jobs, &job->jobnode);
288 ellAdd(&pool->owned, &job->jobnode);
289 }
290 }
291 if(ret==0) {
292 pool->threadsWaking++;
293 epicsEventSignal(pool->workerWakeup);
294 }
295 CHECKCOUNT(pool);
296 }
297
298done:
299 epicsMutexUnlock(pool->guard);
300 return ret;
301}
302
303int epicsJobUnqueue(epicsJob* job)
304{
305 int ret=1;
306 epicsThreadPool *pool=job->pool;
307
308 if(!pool)
309 return EINVAL;
310
311 epicsMutexMustLock(pool->guard);
312
313 assert(!job->dead);
314
315 if(job->queued) {
316 if(!job->running) {
317 ellDelete(&pool->jobs, &job->jobnode);
318 ellAdd(&pool->owned, &job->jobnode);
319 }
320 job->queued=0;
321 ret=0;
322 }
323
324 epicsMutexUnlock(pool->guard);
325
326 return ret;
327}
328
=== added file 'src/libCom/pool/poolPriv.h'
--- src/libCom/pool/poolPriv.h 1970-01-01 00:00:00 +0000
+++ src/libCom/pool/poolPriv.h 2014-07-29 16:22:24 +0000
@@ -0,0 +1,97 @@
1/*************************************************************************\
2* Copyright (c) 2014 Brookhaven Science Associates, as Operator of
3* Brookhaven National Laboratory.
4* EPICS BASE is distributed subject to a Software License Agreement found
5* in file LICENSE that is included with this distribution.
6\*************************************************************************/
7
8#ifndef POOLPRIV_H
9#define POOLPRIV_H
10
11#include "epicsThreadPool.h"
12#include "ellLib.h"
13#include "epicsThread.h"
14#include "epicsEvent.h"
15#include "epicsMutex.h"
16
17struct epicsThreadPool {
18 ELLNODE sharedNode;
19 size_t sharedCount;
20
21 ELLLIST jobs; /* run queue */
22 ELLLIST owned; /* unqueued jobs. */
23
24 /* Worker state counters.
25 * The life cycle of a worker is
26 * Wakeup -> Awake -> Sleeping
27 * Newly created workers go into the wakeup state
28 */
29
30 /* # of running workers which are not waiting for a wakeup event */
31 unsigned int threadsAreAwake;
32 /* # of sleeping workers which need to be awakened */
33 unsigned int threadsWaking;
34 /* # of workers waiting on the workerWakeup event */
35 unsigned int threadsSleeping;
36 /* # of threads started and not stopped */
37 unsigned int threadsRunning;
38
39 /* # of observers waiting on pool events */
40 unsigned int observerCount;
41
42 epicsEventId workerWakeup;
43 epicsEventId shutdownEvent;
44
45 epicsEventId observerWakeup;
46
47 /* Disallow epicsJobQueue */
48 unsigned int pauseadd:1;
49 /* Prevent workers from running new jobs */
50 unsigned int pauserun:1;
51 /* Prevent further changes to pool options */
52 unsigned int freezeopt:1;
53 /* tell workers to exit */
54 unsigned int shutdown:1;
55
56 epicsMutexId guard;
57
58 /* copy of config passed when created */
59 epicsThreadPoolConfig conf;
60};
61
62/* Called after manipulating counters to check that invariants are preserved */
63#define CHECKCOUNT(pPool) do{if(!(pPool)->shutdown) { \
64 assert( (pPool)->threadsAreAwake + (pPool)->threadsSleeping == (pPool)->threadsRunning ); \
65 assert( (pPool)->threadsWaking <= (pPool)->threadsSleeping ); \
66}}while(0)
67
68/* When created a job is idle. queued and running are false
69 * and jobnode is in the thread pool's owned list.
70 *
71 * When the job is added, the queued flag is set and jobnode
72 * is in the jobs list.
73 *
74 * When the job starts running the queued flag is cleared and
75 * the running flag is set. jobnode is not in any list
76 * (held locally by worker).
77 *
78 * When the job has finished running, the running flag is cleared.
79 * The queued flag may be set if the job re-added itself.
80 * Based on the queued flag jobnode is added to the appropriate
81 * list.
82 */
83struct epicsJob {
84 ELLNODE jobnode;
85 epicsJobFunction func;
86 void *arg;
87 epicsThreadPool *pool;
88
89 unsigned int queued:1;
90 unsigned int running:1;
91 unsigned int freewhendone:1; /* lazy delete of running job */
92 unsigned int dead:1; /* flag to catch use of freed objects */
93};
94
95int createPoolThread(epicsThreadPool *pool);
96
97#endif /* POOLPRIV_H */
=== added file 'src/libCom/pool/threadPool.c'
--- src/libCom/pool/threadPool.c 1970-01-01 00:00:00 +0000
+++ src/libCom/pool/threadPool.c 2014-07-29 16:22:24 +0000
@@ -0,0 +1,399 @@
1/*************************************************************************\
2* Copyright (c) 2014 Brookhaven Science Associates, as Operator of
3* Brookhaven National Laboratory.
4* EPICS BASE is distributed subject to a Software License Agreement found
5* in file LICENSE that is included with this distribution.
6\*************************************************************************/
7
8#include <stdlib.h>
9#include <string.h>
10#include <errno.h>
11
12#define epicsExportSharedSymbols
13
14#include "dbDefs.h"
15#include "errlog.h"
16#include "ellLib.h"
17#include "epicsThread.h"
18#include "epicsMutex.h"
19#include "epicsEvent.h"
20#include "epicsInterrupt.h"
21#include "cantProceed.h"
22
23#include "epicsThreadPool.h"
24#include "poolPriv.h"
25
26
27void epicsThreadPoolConfigDefaults(epicsThreadPoolConfig *opts)
28{
29 memset(opts, 0, sizeof(*opts));
30 opts->maxThreads=epicsThreadGetCPUs();
31 opts->workerStack=epicsThreadGetStackSize(epicsThreadStackSmall);
32
33 if(epicsThreadLowestPriorityLevelAbove(epicsThreadPriorityCAServerHigh, &opts->workerPriority)
34 !=epicsThreadBooleanStatusSuccess)
35 opts->workerPriority = epicsThreadPriorityMedium;
36}
37
38epicsThreadPool* epicsThreadPoolCreate(epicsThreadPoolConfig *opts)
39{
40 size_t i;
41 epicsThreadPool *pool;
42
43 /* caller likely didn't initialize the options structure */
44 if(opts && opts->maxThreads==0) {
45 errlogMessage("Error: epicsThreadPoolCreate() options provided, but not initialized");
46 return NULL;
47 }
48
49 pool=calloc(1, sizeof(*pool));
50 if(!pool)
51 return NULL;
52
53 if(opts)
54 memcpy(&pool->conf, opts, sizeof(*opts));
55 else
56 epicsThreadPoolConfigDefaults(&pool->conf);
57
58 if(pool->conf.initialThreads > pool->conf.maxThreads)
59 pool->conf.initialThreads = pool->conf.maxThreads;
60
61 pool->workerWakeup=epicsEventCreate(epicsEventEmpty);
62 pool->shutdownEvent=epicsEventCreate(epicsEventEmpty);
63 pool->observerWakeup=epicsEventCreate(epicsEventEmpty);
64 pool->guard=epicsMutexCreate();
65
66 if(!pool->workerWakeup || !pool->shutdownEvent ||
67 !pool->observerWakeup || !pool->guard)
68 goto cleanup;
69
70 ellInit(&pool->jobs);
71 ellInit(&pool->owned);
72
73 epicsMutexMustLock(pool->guard);
74
75 for(i=0; i<pool->conf.initialThreads; i++) {
76 createPoolThread(pool);
77 }
78
79 if(pool->threadsRunning==0 && pool->conf.initialThreads!=0) {
80 epicsMutexUnlock(pool->guard);
81 errlogPrintf("Error: Unable to create any threads for thread pool\n");
82 goto cleanup;
83
84 }else if(pool->threadsRunning < pool->conf.initialThreads) {
85 errlogPrintf("Warning: Unable to create all threads for thread pool (%lu/%lu)\n",
86 (unsigned long)pool->threadsRunning,
87 (unsigned long)pool->conf.initialThreads);
88 }
89
90 epicsMutexUnlock(pool->guard);
91
92 return pool;
93
94cleanup:
95 if(pool->workerWakeup) epicsEventDestroy(pool->workerWakeup);
96 if(pool->shutdownEvent) epicsEventDestroy(pool->shutdownEvent);
97 if(pool->observerWakeup) epicsEventDestroy(pool->observerWakeup);
98 if(pool->guard) epicsMutexDestroy(pool->guard);
99
100 free(pool);
101 return 0;
102}
103
104static
105void epicsThreadPoolControlImpl(epicsThreadPool* pool, epicsThreadPoolOption opt, unsigned int val)
106{
107 if(pool->freezeopt)
108 return;
109
110 if(opt==epicsThreadPoolQueueAdd) {
111 pool->pauseadd = !val;
112
113 } else if(opt==epicsThreadPoolQueueRun) {
114 if(!val && !pool->pauserun)
115 pool->pauserun=1;
116
117 else if(val && pool->pauserun) {
118 size_t jobs=(size_t)ellCount(&pool->jobs);
119 pool->pauserun=0;
120
121 if(jobs) {
122 size_t wakeable=pool->threadsSleeping - pool->threadsWaking;
123 /* first try to give jobs to sleeping workers */
124 if(wakeable) {
125 int wakeup = jobs > wakeable ? wakeable : jobs;
126 assert(wakeup>0);
127 jobs-=wakeup;
128 pool->threadsWaking+=wakeup;
129 epicsEventSignal(pool->workerWakeup);
130 CHECKCOUNT(pool);
131 }
132 }
133 while(jobs-- && pool->threadsRunning < pool->conf.maxThreads) {
134 if(createPoolThread(pool)==0) {
135 pool->threadsWaking++;
136 epicsEventSignal(pool->workerWakeup);
137 } else
138 break; /* oops, couldn't create worker */
139 }
140 CHECKCOUNT(pool);
141 }
142 }
143 /* unknown options ignored */
144
145}
146
147void epicsThreadPoolControl(epicsThreadPool* pool, epicsThreadPoolOption opt, unsigned int val)
148{
149 epicsMutexMustLock(pool->guard);
150 epicsThreadPoolControlImpl(pool, opt, val);
151 epicsMutexUnlock(pool->guard);
152}
153
154int epicsThreadPoolWait(epicsThreadPool* pool, double timeout)
155{
156 int ret=0;
157 epicsMutexMustLock(pool->guard);
158
159 while(ellCount(&pool->jobs)>0 || pool->threadsAreAwake>0) {
160 pool->observerCount++;
161 epicsMutexUnlock(pool->guard);
162
163 if(timeout<0.0) {
164 epicsEventMustWait(pool->observerWakeup);
165
166 } else {
167 switch(epicsEventWaitWithTimeout(pool->observerWakeup, timeout)) {
168 case epicsEventWaitError:
169 cantProceed("epicsThreadPoolWait: failed to wait for Event");
170 break;
171 case epicsEventWaitTimeout:
172 ret=ETIMEDOUT;
173 break;
174 case epicsEventWaitOK:
175 ret=0;
176 break;
177 }
178
179 }
180
181 epicsMutexMustLock(pool->guard);
182 pool->observerCount--;
183
184 if(pool->observerCount)
185 epicsEventSignal(pool->observerWakeup);
186
187 if(ret!=0)
188 break;
189 }
190
191 epicsMutexUnlock(pool->guard);
192 return ret;
193}
194
195void epicsThreadPoolDestroy(epicsThreadPool *pool)
196{
197 size_t nThr;
198 ELLLIST notify;
199 ELLNODE *cur;
200
201 if(!pool)
202 return;
203
204 ellInit(&notify);
205
206 epicsMutexMustLock(pool->guard);
207
208 /* run remaining queued jobs */
209 epicsThreadPoolControlImpl(pool, epicsThreadPoolQueueAdd, 0);
210 epicsThreadPoolControlImpl(pool, epicsThreadPoolQueueRun, 1);
211 nThr=pool->threadsRunning;
212 pool->freezeopt = 1;
213
214 epicsMutexUnlock(pool->guard);
215
216 epicsThreadPoolWait(pool, -1.0);
217 /* At this point all queued jobs have run */
218
219 epicsMutexMustLock(pool->guard);
220
221 pool->shutdown=1;
222 /* wakeup all */
223 if(pool->threadsWaking < pool->threadsSleeping) {
224 pool->threadsWaking = pool->threadsSleeping;
225 epicsEventSignal(pool->workerWakeup);
226 }
227
228 ellConcat(&notify, &pool->owned);
229 ellConcat(&notify, &pool->jobs);
230
231 epicsMutexUnlock(pool->guard);
232
233 if(nThr && epicsEventWait(pool->shutdownEvent)!=epicsEventWaitOK){
234 errlogMessage("epicsThreadPoolDestroy: wait error");
235 return;
236 }
237
238 /* all workers are now shutdown */
239
240 /* notify remaining jobs that pool is being destroyed */
241 while( (cur=ellGet(&notify))!=NULL )
242 {
243 epicsJob *job=CONTAINER(cur, epicsJob, jobnode);
244 job->running=1;
245 (*job->func)(job->arg, epicsJobModeCleanup);
246 job->running=0;
247 if(job->freewhendone)
248 free(job);
249 else
250 job->pool=NULL; /* orphan */
251 }
252
253 epicsEventDestroy(pool->workerWakeup);
254 epicsEventDestroy(pool->shutdownEvent);
255 epicsEventDestroy(pool->observerWakeup);
256 epicsMutexDestroy(pool->guard);
257
258 free(pool);
259}
260
261
262void epicsThreadPoolReport(epicsThreadPool *pool, FILE *fd)
263{
264 ELLNODE *cur;
265 epicsMutexMustLock(pool->guard);
266
267 fprintf(fd, "Thread Pool with %u/%u threads\n"
268 " running %d jobs with %u threads\n",
269 pool->threadsRunning,
270 pool->conf.maxThreads,
271 ellCount(&pool->jobs),
272 pool->threadsAreAwake);
273 if(pool->pauseadd)
274 fprintf(fd, " Inhibit queueing\n");
275 if(pool->pauserun)
276 fprintf(fd, " Pause workers\n");
277 if(pool->shutdown)
278 fprintf(fd, " Shutdown in progress\n");
279
280 for(cur=ellFirst(&pool->jobs); cur; cur=ellNext(cur))
281 {
282 epicsJob *job=CONTAINER(cur, epicsJob, jobnode);
283 fprintf(fd, " job %p func: %p, arg: %p ",
284 job, job->func,
285 job->arg);
286 if(job->queued)
287 fprintf(fd, "Queued ");
288 if(job->running)
289 fprintf(fd, "Running ");
290 if(job->freewhendone)
291 fprintf(fd, "Free ");
292 fprintf(fd, "\n");
293 }
294
295 epicsMutexUnlock(pool->guard);
296}
297
298unsigned int epicsThreadPoolNThreads(epicsThreadPool *pool)
299{
300 unsigned int ret;
301
302 epicsMutexMustLock(pool->guard);
303 ret=pool->threadsRunning;
304 epicsMutexUnlock(pool->guard);
305
306 return ret;
307}
308
309static
310ELLLIST sharedPools = ELLLIST_INIT;
311
312static
313epicsMutexId sharedPoolsGuard;
314
315static
316epicsThreadOnceId sharedPoolsOnce = EPICS_THREAD_ONCE_INIT;
317
318static
319void sharedPoolsInit(void* unused)
320{
321 sharedPoolsGuard = epicsMutexMustCreate();
322}
323
324epicsShareFunc epicsThreadPool* epicsThreadPoolGetShared(epicsThreadPoolConfig *opts)
325{
326 ELLNODE *node;
327 epicsThreadPool *cur;
328 epicsThreadPoolConfig defopts;
329 size_t N=epicsThreadGetCPUs();
330
331 if(!opts) {
332 epicsThreadPoolConfigDefaults(&defopts);
333 opts = &defopts;
334 }
335 /* shared pools must have a minimum allowed number of workers.
336 * Use the number of CPU cores
337 */
338 if(opts->maxThreads<N)
339 opts->maxThreads=N;
340
341 epicsThreadOnce(&sharedPoolsOnce, &sharedPoolsInit, NULL);
342
343 epicsMutexMustLock(sharedPoolsGuard);
344
345 for(node=ellFirst(&sharedPools); node; node=ellNext(node))
346 {
347 cur=CONTAINER(node, epicsThreadPool, sharedNode);
348
349 /* Must have exactly the requested priority
350 * At least the requested max workers
351 * and at least the requested stack size
352 */
353 if(cur->conf.workerPriority != opts->workerPriority)
354 continue;
355 if(cur->conf.maxThreads < opts->maxThreads)
356 continue;
357 if(cur->conf.workerStack < opts->workerStack)
358 continue;
359
360 cur->sharedCount++;
361 assert(cur->sharedCount>0);
362 epicsMutexUnlock(sharedPoolsGuard);
363
364 epicsMutexMustLock(cur->guard);
365 *opts = cur->conf;
366 epicsMutexUnlock(cur->guard);
367 return cur;
368 }
369
370 cur=epicsThreadPoolCreate(opts);
371 if(!cur) {
372 epicsMutexUnlock(sharedPoolsGuard);
373 return NULL;
374 }
375 cur->sharedCount=1;
376
377 ellAdd(&sharedPools, &cur->sharedNode);
378 epicsMutexUnlock(sharedPoolsGuard);
379 return cur;
380}
381
382epicsShareFunc void epicsThreadPoolReleaseShared(epicsThreadPool *pool)
383{
384 if(!pool)
385 return;
386
387 epicsMutexMustLock(sharedPoolsGuard);
388
389 assert(pool->sharedCount>0);
390
391 pool->sharedCount--;
392
393 if(pool->sharedCount==0) {
394 ellDelete(&sharedPools, &pool->sharedNode);
395 epicsThreadPoolDestroy(pool);
396 }
397
398 epicsMutexUnlock(sharedPoolsGuard);
399}
=== modified file 'src/libCom/test/Makefile'
--- src/libCom/test/Makefile 2013-12-17 18:54:04 +0000
+++ src/libCom/test/Makefile 2014-07-29 16:22:24 +0000
@@ -12,6 +12,10 @@

 PROD_LIBS += Com

+TESTPROD_HOST += epicsThreadPoolTest
+epicsThreadPoolTest_SRCS += epicsThreadPoolTest.c
+TESTS += epicsThreadPoolTest
+
 TESTPROD_HOST += epicsUnitTestTest
 epicsUnitTestTest_SRCS += epicsUnitTestTest.c
 # Not much point running this on vxWorks or RTEMS...

=== added file 'src/libCom/test/epicsThreadPoolTest.c'
--- src/libCom/test/epicsThreadPoolTest.c 1970-01-01 00:00:00 +0000
+++ src/libCom/test/epicsThreadPoolTest.c 2014-07-29 16:22:24 +0000
@@ -0,0 +1,443 @@
1/*************************************************************************\
2* Copyright (c) 2014 Brookhaven Science Associates, as Operator of
3* Brookhaven National Laboratory.
4* EPICS BASE is distributed subject to a Software License Agreement found
5* in file LICENSE that is included with this distribution.
6\*************************************************************************/
7
8#include "epicsThreadPool.h"
9
10/* included to allow tests to peek */
11#include "../../pool/poolPriv.h"
12
13#include "testMain.h"
14#include "epicsUnitTest.h"
15
16#include "cantProceed.h"
17#include "epicsEvent.h"
18#include "epicsMutex.h"
19#include "epicsThread.h"
20
21/* Do nothing */
22static void nullop(void)
23{
24 epicsThreadPool *pool;
25 testDiag("nullop()");
26 {
27 epicsThreadPoolConfig conf;
28 epicsThreadPoolConfigDefaults(&conf);
29 testOk1(conf.maxThreads>0);
30
31 testOk1((pool=epicsThreadPoolCreate(&conf))!=NULL);
32 if(!pool)
33 return;
34 }
35
36 epicsThreadPoolDestroy(pool);
37}
38
39/* Just create and destroy worker threads */
40static void oneop(void)
41{
42 epicsThreadPool *pool;
43 testDiag("oneop()");
44 {
45 epicsThreadPoolConfig conf;
46 epicsThreadPoolConfigDefaults(&conf);
47 conf.initialThreads=2;
48 testOk1(conf.maxThreads>0);
49
50 testOk1((pool=epicsThreadPoolCreate(&conf))!=NULL);
51 if(!pool)
52 return;
53 }
54
55 epicsThreadPoolDestroy(pool);
56}
57
58/* Test that bursts of jobs will create enough threads to
59 * run all in parallel
60 */
61typedef struct {
62 epicsMutexId guard;
63 unsigned int count;
64 epicsEventId allrunning;
65 epicsEventId done;
66 epicsJob **job;
67} countPriv;
68
69static void countjob(void *param, epicsJobMode mode)
70{
71 countPriv *cnt=param;
72 testOk1(mode==epicsJobModeRun||mode==epicsJobModeCleanup);
73 if(mode==epicsJobModeCleanup)
74 return;
75
76 epicsMutexMustLock(cnt->guard);
77 testDiag("Job %lu", (unsigned long)cnt->count);
78 cnt->count--;
79 if(cnt->count==0) {
80 testDiag("All jobs running");
81 epicsEventSignal(cnt->allrunning);
82 }
83 epicsMutexUnlock(cnt->guard);
84
85 epicsEventMustWait(cnt->done);
86 epicsEventSignal(cnt->done); /* pass along to next thread */
87}
88
89/* Starts "mcnt" jobs in a pool with initial and max
90 * thread counts "icnt" and "mcnt".
91 * The test ensures that all jobs run in parallel.
92 * "cork" checks the function of pausing the run queue
93 * with epicsThreadPoolQueueRun
94 */
95static void postjobs(size_t icnt, size_t mcnt, int cork)
96{
97 size_t i;
98 epicsThreadPool *pool;
99 countPriv *priv=callocMustSucceed(1, sizeof(*priv), "postjobs priv alloc");
100 priv->guard=epicsMutexMustCreate();
101 priv->done=epicsEventMustCreate(epicsEventEmpty);
102 priv->allrunning=epicsEventMustCreate(epicsEventEmpty);
103 priv->count=mcnt;
104 priv->job=callocMustSucceed(mcnt, sizeof(*priv->job), "postjobs job array");
105
106 testDiag("postjobs(%lu,%lu)", (unsigned long)icnt, (unsigned long)mcnt);
107
108 {
109 epicsThreadPoolConfig conf;
110 epicsThreadPoolConfigDefaults(&conf);
111 conf.initialThreads=icnt;
112 conf.maxThreads=mcnt;
113
114 testOk1((pool=epicsThreadPoolCreate(&conf))!=NULL);
115 if(!pool)
116 return;
117 }
118
119 if(cork)
120 epicsThreadPoolControl(pool, epicsThreadPoolQueueRun, 0);
121
122 for(i=0; i<mcnt; i++) {
123 testDiag("i=%lu", (unsigned long)i);
124 priv->job[i] = epicsJobCreate(pool, &countjob, priv);
125 testOk1(priv->job[i]!=NULL);
126 testOk1(epicsJobQueue(priv->job[i])==0);
127 }
128
129 if(cork) {
130 /* no jobs should have run */
131 epicsMutexMustLock(priv->guard);
132 testOk1(priv->count==mcnt);
133 epicsMutexUnlock(priv->guard);
134
135 epicsThreadPoolControl(pool, epicsThreadPoolQueueRun, 1);
136 }
137
138 testDiag("Waiting for all jobs to start");
139 epicsEventMustWait(priv->allrunning);
140 testDiag("Stop all");
141 epicsEventSignal(priv->done);
142
143 for(i=0; i<mcnt; i++) {
144 testDiag("i=%lu", (unsigned long)i);
145 epicsJobDestroy(priv->job[i]);
146 }
147
148 epicsThreadPoolDestroy(pool);
149 epicsMutexDestroy(priv->guard);
150 epicsEventDestroy(priv->allrunning);
151 epicsEventDestroy(priv->done);
152 free(priv->job);
153 free(priv);
154}
155
156static unsigned int flag0 = 0;
157
158/* Test cancel from job (no-op)
159 * and destroy from job (lazy free)
160 */
161static void cleanupjob0(void* arg, epicsJobMode mode)
162{
163 epicsJob *job=arg;
164 testOk1(mode==epicsJobModeRun||mode==epicsJobModeCleanup);
165 if(mode==epicsJobModeCleanup)
166 return;
167
168 assert(flag0==0);
169 flag0=1;
170
171 testOk1(epicsJobQueue(job)==0);
172
173 epicsJobDestroy(job); /* delete while job is running */
174}
175static void cleanupjob1(void* arg, epicsJobMode mode)
176{
177 epicsJob *job=arg;
178 testOk1(mode==epicsJobModeRun||mode==epicsJobModeCleanup);
179 if(mode==epicsJobModeCleanup)
180 return;
181
182 testOk1(epicsJobQueue(job)==0);
183
184 testOk1(epicsJobUnqueue(job)==0);
185 /* delete later after job finishes, but before pool is destroyed */
186}
187static void cleanupjob2(void* arg, epicsJobMode mode)
188{
189 epicsJob *job=arg;
190 testOk1(mode==epicsJobModeRun||mode==epicsJobModeCleanup);
191 if(mode==epicsJobModeCleanup)
192 epicsJobDestroy(job); /* delete when threadpool is destroyed */
193 else if(mode==epicsJobModeRun)
194 testOk1(epicsJobUnqueue(job)==1);
195}
196static epicsJobFunction cleanupjobs[3] = {&cleanupjob0,&cleanupjob1,&cleanupjob2};
197
198/* Tests three methods for job cleanup.
199 * 1. destroy while running
200 * 2. deferred cleanup after pool destroyed
201 * 3. immediate cleanup when pool destroyed
202 */
203static void testcleanup(void)
204{
205 int i=0;
206 epicsThreadPool *pool;
207 epicsJob *job[3];
208
209 testDiag("testcleanup()");
210
211 testOk1((pool=epicsThreadPoolCreate(NULL))!=NULL);
212 if(!pool)
213 return;
214
215 /* unrolled so that valgrind can show which method leaks */
216 testOk1((job[0]=epicsJobCreate(pool, cleanupjobs[0], EPICSJOB_SELF))!=NULL);
217 testOk1((job[1]=epicsJobCreate(pool, cleanupjobs[1], EPICSJOB_SELF))!=NULL);
218 testOk1((job[2]=epicsJobCreate(pool, cleanupjobs[2], EPICSJOB_SELF))!=NULL);
219 for(i=0; i<3; i++) {
220 testOk1(epicsJobQueue(job[i])==0);
221 }
222
223 epicsThreadPoolWait(pool, -1);
224 epicsJobDestroy(job[1]);
225 epicsThreadPoolDestroy(pool);
226}
227
228/* Test re-add from inside job */
229typedef struct {
230 unsigned int count;
231 epicsEventId done;
232 epicsJob *job;
233 unsigned int inprogress;
234} readdPriv;
235
236static void readdjob(void *arg, epicsJobMode mode)
237{
238 readdPriv *priv=arg;
239 testOk1(mode==epicsJobModeRun||mode==epicsJobModeCleanup);
240 if(mode==epicsJobModeCleanup)
241 return;
242 testOk1(priv->inprogress==0);
243 testDiag("count==%u", priv->count);
244
245 if(priv->count--) {
246 priv->inprogress=1;
247 epicsJobQueue(priv->job);
248 epicsThreadSleep(0.05);
249 priv->inprogress=0;
250 }else{
251 epicsEventSignal(priv->done);
252 epicsJobDestroy(priv->job);
253 }
254}
255
256/* Test re-queueing a job while it is running.
257 * Check that a single job won't run concurrently.
258 */
259static void testreadd(void) {
260 epicsThreadPool *pool;
261 readdPriv *priv=callocMustSucceed(1, sizeof(*priv), "testreadd priv");
262 readdPriv *priv2=callocMustSucceed(1, sizeof(*priv2), "testreadd priv2");
263
264 testDiag("testreadd");
265
266 priv->done=epicsEventMustCreate(epicsEventEmpty);
267 priv->count=5;
268 priv2->done=epicsEventMustCreate(epicsEventEmpty);
269 priv2->count=5;
270
271 testOk1((pool=epicsThreadPoolCreate(NULL))!=NULL);
272 if(!pool)
273 return;
274
275 testOk1((priv->job=epicsJobCreate(pool, &readdjob, priv))!=NULL);
276 testOk1((priv2->job=epicsJobCreate(pool, &readdjob, priv2))!=NULL);
277
278 testOk1(epicsJobQueue(priv->job)==0);
279 testOk1(epicsJobQueue(priv2->job)==0);
280 epicsEventMustWait(priv->done);
281 epicsEventMustWait(priv2->done);
282
283 testOk1(epicsThreadPoolNThreads(pool)==2);
284
285 epicsThreadPoolDestroy(pool);
286 epicsEventDestroy(priv->done);
287 epicsEventDestroy(priv2->done);
288 free(priv);
289 free(priv2);
290
291}
292
293static int shouldneverrun = 0;
294static int numtoolate = 0;
295
296/* test job canceling */
297static
298void neverrun(void *arg, epicsJobMode mode)
299{
300 epicsJob *job=arg;
301 testOk1(mode==epicsJobModeCleanup);
302 if(mode==epicsJobModeCleanup)
303 epicsJobDestroy(job);
304 else
305 shouldneverrun++;
306}
307static epicsEventId cancel[2];
308static
309void toolate(void *arg, epicsJobMode mode)
310{
311 epicsJob *job=arg;
312 if(mode==epicsJobModeCleanup){
313 epicsJobDestroy(job);
314 return;
315 }
316 testPass("Job runs");
317 numtoolate++;
318 epicsEventSignal(cancel[0]);
319 epicsEventMustWait(cancel[1]);
320}
321
322static
323void testcancel(void)
324{
325 epicsJob *job[2];
326 epicsThreadPool *pool;
327 testOk1((pool=epicsThreadPoolCreate(NULL))!=NULL);
328 if(!pool)
329 return;
330
331 cancel[0]=epicsEventCreate(epicsEventEmpty);
332 cancel[1]=epicsEventCreate(epicsEventEmpty);
333
334 testOk1((job[0]=epicsJobCreate(pool, &neverrun, EPICSJOB_SELF))!=NULL);
335 testOk1((job[1]=epicsJobCreate(pool, &toolate, EPICSJOB_SELF))!=NULL);
336
337 /* freeze */
338 epicsThreadPoolControl(pool, epicsThreadPoolQueueRun, 0);
339
340 testOk1(epicsJobUnqueue(job[0])==1); /* not queued yet */
341
342 epicsJobQueue(job[0]);
343 testOk1(epicsJobUnqueue(job[0])==0);
344 testOk1(epicsJobUnqueue(job[0])==1);
345
346 epicsThreadSleep(0.01);
347 epicsJobQueue(job[0]);
348 testOk1(epicsJobUnqueue(job[0])==0);
349 testOk1(epicsJobUnqueue(job[0])==1);
350
351 epicsThreadPoolControl(pool, epicsThreadPoolQueueRun, 1);
352
353 epicsJobQueue(job[1]); /* actually let it run this time */
354
355 epicsEventMustWait(cancel[0]);
356 testOk1(epicsJobUnqueue(job[0])==1);
357 epicsEventSignal(cancel[1]);
358
359 epicsThreadPoolDestroy(pool);
360 epicsEventDestroy(cancel[0]);
361 epicsEventDestroy(cancel[1]);
362
363 testOk1(shouldneverrun==0);
364 testOk1(numtoolate==1);
365}
366
367static
368unsigned int sharedWasDeleted=0;
369
370static
371void lastjob(void *arg, epicsJobMode mode)
372{
373 epicsJob *job=arg;
374 if(mode==epicsJobModeCleanup) {
375 sharedWasDeleted=1;
376 epicsJobDestroy(job);
377 }
378}
379
380static
381void testshared(void)
382{
383 epicsThreadPool *poolA, *poolB;
384 epicsThreadPoolConfig conf;
385 epicsJob *job;
386
387 epicsThreadPoolConfigDefaults(&conf);
388
389 testDiag("Check reference counting of shared pools");
390
391 testOk1((poolA=epicsThreadPoolGetShared(&conf))!=NULL);
392
393 testOk1(poolA->sharedCount==1);
394
395 testOk1((poolB=epicsThreadPoolGetShared(&conf))!=NULL);
396
397 testOk1(poolA==poolB);
398
399 testOk1(poolA->sharedCount==2);
400
401 epicsThreadPoolReleaseShared(poolA);
402
403 testOk1(poolB->sharedCount==1);
404
405 testOk1((job=epicsJobCreate(poolB, &lastjob, EPICSJOB_SELF))!=NULL);
406
407 epicsThreadPoolReleaseShared(poolB);
408
409 testOk1(sharedWasDeleted==1);
410
411 testOk1((poolA=epicsThreadPoolGetShared(&conf))!=NULL);
412
413 testOk1(poolA->sharedCount==1);
414
415 epicsThreadPoolReleaseShared(poolA);
416
417}
418
419MAIN(epicsThreadPoolTest)
420{
421 testPlan(171);
422
423 nullop();
424 oneop();
425 testDiag("Queue with delayed start");
426 postjobs(1,1,1);
427 postjobs(0,1,1);
428 postjobs(4,4,1);
429 postjobs(0,4,1);
430 postjobs(2,4,1);
431 testDiag("Queue with immediate start");
432 postjobs(1,1,0);
433 postjobs(0,1,0);
434 postjobs(4,4,0);
435 postjobs(0,4,0);
436 postjobs(2,4,0);
437 testcleanup();
438 testreadd();
439 testcancel();
440 testshared();
441
442 return testDone();
443}
