Merge lp:~epics-core/epics-base/thread-pool into lp:~epics-core/epics-base/3.15
Status: Merged
Merged at revision: 12505
Proposed branch: lp:~epics-core/epics-base/thread-pool
Merge into: lp:~epics-core/epics-base/3.15
Diff against target: 1509 lines (+1446/-0), 9 files modified
  documentation/RELEASE_NOTES.html (+8/-0)
  src/libCom/Makefile (+1/-0)
  src/libCom/pool/Makefile (+16/-0)
  src/libCom/pool/epicsThreadPool.h (+150/-0)
  src/libCom/pool/poolJob.c (+328/-0)
  src/libCom/pool/poolPriv.h (+97/-0)
  src/libCom/pool/threadPool.c (+399/-0)
  src/libCom/test/Makefile (+4/-0)
  src/libCom/test/epicsThreadPoolTest.c (+443/-0)
To merge this branch: bzr merge lp:~epics-core/epics-base/thread-pool
Related bugs: none
Reviewers: Andrew Johnson (Approve), mdavidsaver (Needs Resubmitting)
Review via email: mp+108385@code.launchpad.net
Commit message
Description of the change
General purpose thread pool.
mdavidsaver (mdavidsaver) wrote:
The current implementation for RTEMS uses epicsEventSignal in places where interrupts have been disabled in the belief that this disables all context switching. This is not so.
mdavidsaver (mdavidsaver) wrote:
I've changed the implementation to use only mutex and abandoned plans to use spin locks.
Also, the API to report the number of cores is made private and will be replaced with Ralph's API after this is merged.
The error codes returned are changed to the usual form.
Ralph Lange (ralph-lange) wrote:
Michael,
could you push the documentation to a branch in https:/
Thank you,
~Ralph
mdavidsaver (mdavidsaver) wrote:
> could you push the documentation to a branch in https:/
> /epics-appdev ?
Done
lp:~mdavidsaver/epics-appdev/thread-pool
Also made some final cleanup of the API, including the use of enums instead of integers for the work function's mode and the epicsThreadPool
Andrew Johnson (anj) wrote:
Please delete the _poolNumCores() implementations and use the new epicsThreadGetC
While you're doing that, you could change the M_com value to avoid the clash with the now-defined M_stdlib. However, you may not realize that you are allowed to use status values from errno.h instead of inventing your own if they mean the same thing; errSymLookup() calls strerror() when given a status value with modNum <= 500, so you can use errno values such as ENOMEM, ERANGE, ETIMEDOUT etc. directly as status values. Maybe avoid the more obscure values for portability reasons, but anything returned by the OS or defined in POSIX.1 should be fine.
As a result I'm not keen on adding the errCommon.h header. If you need status values that don't match anything already defined, I'd rather that you add an M_pool and define your own S_pool_xxx status values in epicsThreadPool.h
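The suggestion can be illustrated in plain C: errno values double as status codes, and strerror() supplies the message text (which is what errSymLookup() falls back to for small module numbers). A minimal standalone sketch, not EPICS code; queue_job() is a hypothetical stub:

```c
#include <errno.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical stub: an operation reporting failure with an errno
 * value instead of inventing a module-specific S_xxx code. */
int queue_job(int pool_full)
{
    if (pool_full)
        return EAGAIN;  /* resource temporarily unavailable */
    return 0;           /* success */
}

/* The same message text is then available everywhere via strerror(). */
const char *status_message(int status)
{
    return strerror(status);
}
```

Using well-known POSIX.1 values such as ENOMEM, ERANGE, or ETIMEDOUT keeps the messages meaningful without any extra lookup table.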
I have not looked at the pool code itself, but I am reading the AppDevGuide entry. More comments tomorrow.
mdavidsaver (mdavidsaver) wrote:
> Please delete the _poolNumCores() implementations and use the new
> epicsThreadGetC
Ok.
> ... you'll either have to rebase or merge 3.15 into this branch to get it
Ok. (We'll see how launchpad copes with this)
> ... you may not realize that you are
> allowed to use status values from errno.h instead of inventing your own if
> they mean the same thing;
Indeed I didn't know this, and will try.
> Maybe avoid the more obscure values
But I want to use EILSEQ :)
mdavidsaver (mdavidsaver) wrote:
> > ... you may not realize that you are
> > allowed to use status values from errno.h instead of inventing your own if
> > they mean the same thing;
>
> Indeed I didn't know this, and will try.
To do this should I use SOCK_EINVAL instead of EINVAL?
FYI I think I can get by with only EINVAL and EWOULDBLOCK.
Andrew Johnson (anj) wrote:
On 07/24/2014 10:54 AM, mdavidsaver wrote:
> To do this should I use SOCK_EINVAL instead of EINVAL?
Since errlog uses strerror() to get the error string I think you have to
use EINVAL, which Microsoft supports at least back to MSVS-2003. I don't
know whether strerror() can handle WSAEINVAL, they don't say.
> FYI I think I can get by with only EINVAL and EWOULDBLOCK.
EWOULDBLOCK is listed in the MS documentation as supported by MSVS-2010
so that's fine too.
mdavidsaver (mdavidsaver) wrote:
Ok, I think I just messed up trying to overwrite this branch with a re-based version.
Andrew Johnson (anj) wrote:
I think Launchpad just got itself confused, but by setting the status back to Needs Review and waiting a little it seems to have recovered and I think it's now showing the right diff again.
Andrew Johnson (anj) wrote:
Why are the thread and observer counters in struct epicsThreadPool all declared as size_t instead of unsigned integers? We'll never want even 2**32 threads or observers, so allowing for 2**64 on 64-bit systems seems silly. There is no printf format string for a size_t that works across all our OSs, and while your casting to unsigned long in epicsThreadPool
Shouldn't that EWOULDBLOCK be ETIMEDOUT? It has already blocked for the timeout period, and this status has that meaning in sem_wait, mq_send/receive etc.
mdavidsaver (mdavidsaver) wrote:
> Why are the thread and observer counters in struct epicsThreadPool all
> declared as size_t
It's shorter :)
> Shouldn't that EWOULDBLOCK be ETIMEDOUT? It has already blocked for the
> timeout period, and this status has that meaning in sem_wait, mq_send/receive
> etc.
Good point. I'll change this.
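The printf portability point above is easy to demonstrate: C89 has no conversion specifier for size_t, so the usual workaround is to widen to unsigned long before formatting (C99 adds %zu, but not all supported compilers accept it). A standalone sketch of the idiom:

```c
#include <stdio.h>
#include <stddef.h>

/* Format a thread counter portably by casting to unsigned long;
 * "%zu" would be the C99-only alternative. */
int format_count(char *buf, size_t buflen, size_t nthreads)
{
    return snprintf(buf, buflen, "threads=%lu",
                    (unsigned long)nthreads);
}
```

Declaring the counters as unsigned int, as requested, avoids the cast entirely since %u always applies.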
- 12503. By mdavidsaver
  thread pool: epicsJobQueue return EPERM
  When pool control prevents operation
- 12504. By mdavidsaver
  thread pool: handle failure to create worker
  epicsJobQueue() returns EAGAIN when the first worker
  can't be lazily created. Failure to create workers beyond the first is
  silently ignored.
- 12505. By mdavidsaver
  thread pool: fix return of epicsJobUnqueue()
  Return 0 on success (was queued, now is not),
  1 if not queued initially, and EINVAL if orphaned.
- 12506. By mdavidsaver
  thread pool: epicsThreadPoolWait return ETIMEOUT
- 12507. By mdavidsaver
  thread pool: mark epicsJobCreate() as safe for job functions
  Also, use epicsJobMove() to avoid some redundant code
mdavidsaver (mdavidsaver) wrote:
Rebased and made the requested changes. Removed the S_com_* error codes in favor of E* codes from errno.h.
Also improved handling when worker thread creation fails. This was treated as an error in epicsThreadPool
Failure to create workers after the first is not treated as an error.
This requires that user code needing to ensure that more than one worker is
running must explicitly test with epicsThreadPool
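The policy described above can be sketched in a few lines of plain C (a hypothetical stand-in for the real epicsJobQueue() logic, with names invented for illustration): failing to start the first worker is an error because the job could never run, while failure to add extra workers is best-effort and ignored.

```c
#include <errno.h>

/* Hypothetical sketch of the lazy worker-creation policy:
 * running   - number of workers already started
 * create_ok - whether starting a new worker succeeded */
int queue_with_lazy_worker(unsigned running, int create_ok)
{
    if (!create_ok && running == 0)
        return EAGAIN;  /* no worker exists; the job would never run */
    return 0;           /* at least one worker; extras are best-effort */
}
```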
Andrew Johnson (anj) wrote:
Your counters are still defined as size_t. Both epicsThreadPool
Strictly speaking the name _epicsJobArgSelf is reserved for the C implementation because it begins with an underscore and appears in the global namespace.
http://
_epicsThreadPoo
- 12508. By mdavidsaver
  don't include errCommon.h
  doesn't exist anymore
- 12509. By mdavidsaver
  thread pool: switch thread counts to unsigned int
  size_t is considered overly optimistic
- 12510. By mdavidsaver
  thread pool: don't use reserved names
  Avoid global symbols with leading underscore
mdavidsaver (mdavidsaver) wrote:
Done.
Andrew Johnson (anj) wrote:
Test 148 was failing on VxWorks, both 5.5.2 and 6.9, and should have failed on RTEMS too.
ok 147 - priv->inprogress==0
# count==0
not ok 148 - epicsThreadPool
ok 149 - (pool=epicsThre
epicsThreadPool
I added an epicsThreadPool
Preview Diff
=== modified file 'documentation/RELEASE_NOTES.html'
--- documentation/RELEASE_NOTES.html	2014-06-12 19:47:42 +0000
+++ documentation/RELEASE_NOTES.html	2014-07-29 16:22:24 +0000
@@ -15,6 +15,14 @@
 <h2 align="center">Changes between 3.15.0.1 and 3.15.0.2</h2>
 <!-- Insert new items immediately below here ... -->
 
+<h3>
+General purpose thread pool</h3>
+
+<p>
+A general purpose threaded work queue API epicsThreadPool is added.
+Multiple pools can be created with controlable priority and number
+of worker threads. Lazy worker startup is supported.</p>
+
 <h3>Database field setting updates</h3>
 
 <p>A database (.db) file loaded by an IOC does not have to repeat the record
=== modified file 'src/libCom/Makefile'
--- src/libCom/Makefile	2012-04-27 17:21:29 +0000
+++ src/libCom/Makefile	2014-07-29 16:22:24 +0000
@@ -31,6 +31,7 @@
 include $(LIBCOM)/macLib/Makefile
 include $(LIBCOM)/misc/Makefile
 include $(LIBCOM)/osi/Makefile
+include $(LIBCOM)/pool/Makefile
 include $(LIBCOM)/ring/Makefile
 include $(LIBCOM)/taskwd/Makefile
 include $(LIBCOM)/timer/Makefile
=== added directory 'src/libCom/pool'
=== added file 'src/libCom/pool/Makefile'
--- src/libCom/pool/Makefile	1970-01-01 00:00:00 +0000
+++ src/libCom/pool/Makefile	2014-07-29 16:22:24 +0000
@@ -0,0 +1,16 @@
+#*************************************************************************
+# Copyright (c) 2014 UChicago Argonne LLC, as Operator of Argonne
+# National Laboratory.
+# EPICS BASE is distributed subject to a Software License Agreement found
+# in file LICENSE that is included with this distribution.
+#*************************************************************************
+
+# This is a Makefile fragment, see src/libCom/Makefile.
+
+SRC_DIRS += $(LIBCOM)/pool
+
+INC += epicsThreadPool.h
+
+Com_SRCS += poolJob.c
+Com_SRCS += threadPool.c
+
=== added file 'src/libCom/pool/epicsThreadPool.h'
--- src/libCom/pool/epicsThreadPool.h	1970-01-01 00:00:00 +0000
+++ src/libCom/pool/epicsThreadPool.h	2014-07-29 16:22:24 +0000
@@ -0,0 +1,150 @@
+/*************************************************************************\
+* Copyright (c) 2014 Brookhaven Science Associates, as Operator of
+* Brookhaven National Laboratory.
+* EPICS BASE is distributed subject to a Software License Agreement found
+* in file LICENSE that is included with this distribution.
+\*************************************************************************/
+/* General purpose worker thread pool manager
+ * mdavidsaver@bnl.gov
+ */
+#ifndef EPICSTHREADPOOL_H
+#define EPICSTHREADPOOL_H
+
+#include <stdlib.h>
+#include <stdio.h>
+
+#include "shareLib.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct {
+    unsigned int initialThreads;
+    unsigned int maxThreads;
+    unsigned int workerStack;
+    unsigned int workerPriority;
+} epicsThreadPoolConfig;
+
+typedef struct epicsThreadPool epicsThreadPool;
+
+/* Job function call modes */
+typedef enum {
+    /* Normal run of job */
+    epicsJobModeRun,
+    /* Thread pool is being destroyed.
+     * A chance to cleanup the job immediately with epicsJobDestroy().
+     * If ignored, the job is orphaned (dissociated from the thread pool)
+     * and epicsJobDestroy() must be called later.
+     */
+    epicsJobModeCleanup
+} epicsJobMode;
+
+typedef void (*epicsJobFunction)(void* arg, epicsJobMode mode);
+
+typedef struct epicsJob epicsJob;
+
+/* Pool operations */
+
+/* Initialize a pool config with default values.
+ * This much be done to preserve future compatibility
+ * when new options are added.
+ */
+epicsShareFunc void epicsThreadPoolConfigDefaults(epicsThreadPoolConfig *);
+
+/* fetch or create a thread pool which can be shared with other users.
+ * may return NULL for allocation failures
+ */
+epicsShareFunc epicsThreadPool* epicsThreadPoolGetShared(epicsThreadPoolConfig *opts);
+epicsShareFunc void epicsThreadPoolReleaseShared(epicsThreadPool *pool);
+
+/* If opts is NULL then defaults are used.
+ * The opts pointer is not stored by this call, and may exist on the stack.
+ */
+epicsShareFunc epicsThreadPool* epicsThreadPoolCreate(epicsThreadPoolConfig *opts);
+
+/* Blocks until all worker threads have stopped.
+ * Any jobs still attached to this pool receive a callback with EPICSJOB_CLEANUP
+ * and are then orphaned.
+ */
+epicsShareFunc void epicsThreadPoolDestroy(epicsThreadPool *);
+
+/* pool control options */
+typedef enum {
+    epicsThreadPoolQueueAdd, /* val==0 causes epicsJobQueue to fail, 1 is default */
+    epicsThreadPoolQueueRun /* val==0 prevents workers from running jobs, 1 is default */
+} epicsThreadPoolOption;
+epicsShareFunc void epicsThreadPoolControl(epicsThreadPool* pool,
+                                           epicsThreadPoolOption opt,
+                                           unsigned int val);
+
+/* Block until job queue is emptied and no jobs are running.
+ * Useful after calling epicsThreadPoolControl() with option epicsThreadPoolQueueAdd=0
+ *
+ * timeout<0 waits forever, timeout==0 polls, timeout>0 waits at most one timeout period
+ * Returns 0 for success or non-zero on error (timeout is ETIMEOUT)
+ */
+epicsShareFunc int epicsThreadPoolWait(epicsThreadPool* pool, double timeout);
+
+
+/* Per job operations */
+
+/* special flag for epicsJobCreate().
+ * When passed as the third argument "user"
+ * the argument passed to the job callback
+ * will be the epicsJob*
+ */
+#define EPICSJOB_SELF epicsJobArgSelfMagic
+epicsShareExtern void* epicsJobArgSelfMagic;
+
+/* creates, but does not add, a new job.
+ * If pool in NULL then the job is not associated with any pool and
+ * epicsJobMove() must be called before epicsJobQueue()
+ * Safe to call from a running job function.
+ * returns a new job pointer, or NULL on error
+ */
+epicsShareFunc epicsJob* epicsJobCreate(epicsThreadPool* pool,
+                                        epicsJobFunction cb,
+                                        void* user);
+
+/* Cancel and free a job structure. Does not block.
+ * job may not be immediately free'd.
+ * Safe to call from a running job function.
+ */
+epicsShareFunc void epicsJobDestroy(epicsJob*);
+
+/* Move the job to a different pool.
+ * If pool is NULL then the job will no longer be associated
+ * with any pool.
+ * Not thread safe. Job must not be running or queued.
+ * returns 0 on error, and non-zero on error
+ */
+epicsShareFunc int epicsJobMove(epicsJob* job, epicsThreadPool* pool);
+
+/* Adds the job to the run queue
+ * Safe to call from a running job function.
+ * returns 0 for success, non-zero on error.
+ */
+epicsShareFunc int epicsJobQueue(epicsJob*);
+
+/* Remove a job from the run queue if it is queued.
+ * Safe to call from a running job function.
+ * returns 0 if job was queued and now is not.
+ * 1 if job already ran, is running, or was not queued before,
+ * Other non-zero on error
+ */
+epicsShareFunc int epicsJobUnqueue(epicsJob*);
+
+
+/* Mostly useful for debugging */
+
+epicsShareFunc void epicsThreadPoolReport(epicsThreadPool *pool, FILE *fd);
+
+/* Current number of active workers. May be less than the maximum */
+epicsShareFunc unsigned int epicsThreadPoolNThreads(epicsThreadPool *);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // EPICSTHREADPOOL_H
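The EPICSJOB_SELF flag in the header above relies on a self-pointing sentinel: a pointer initialized to its own address has a value that can never collide with any real user argument. A minimal standalone sketch of the idiom (names are illustrative, not the EPICS ones):

```c
#include <stddef.h>

/* The sentinel points at itself, so its value cannot equal the
 * address of any object a caller could legitimately pass. */
void *job_arg_self_magic = &job_arg_self_magic;

struct job {
    void *user;  /* argument later handed to the job callback */
};

/* If the caller passed the sentinel, substitute the job pointer
 * itself, mirroring the EPICSJOB_SELF behaviour described above. */
void job_init(struct job *j, void *user)
{
    j->user = (user == job_arg_self_magic) ? (void *)j : user;
}
```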
=== added file 'src/libCom/pool/poolJob.c'
--- src/libCom/pool/poolJob.c	1970-01-01 00:00:00 +0000
+++ src/libCom/pool/poolJob.c	2014-07-29 16:22:24 +0000
@@ -0,0 +1,328 @@
+/*************************************************************************\
+* Copyright (c) 2014 Brookhaven Science Associates, as Operator of
+* Brookhaven National Laboratory.
+* EPICS BASE is distributed subject to a Software License Agreement found
+* in file LICENSE that is included with this distribution.
+\*************************************************************************/
+
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+
+#define epicsExportSharedSymbols
+
+#include "dbDefs.h"
+#include "errlog.h"
+#include "ellLib.h"
+#include "epicsThread.h"
+#include "epicsMutex.h"
+#include "epicsEvent.h"
+#include "epicsInterrupt.h"
+
+#include "epicsThreadPool.h"
+#include "poolPriv.h"
+
+void* epicsJobArgSelfMagic = &epicsJobArgSelfMagic;
+
+static
+void workerMain(void* arg)
+{
+    epicsThreadPool *pool=arg;
+
+    /* workers are created with counts
+     * in the running, sleeping, and (possibly) waking counters
+     */
+
+    epicsMutexMustLock(pool->guard);
+    pool->threadsAreAwake++;
+    pool->threadsSleeping--;
+
+    while(1)
+    {
+        ELLNODE *cur;
+        epicsJob *job;
+
+        pool->threadsAreAwake--;
+        pool->threadsSleeping++;
+        epicsMutexUnlock(pool->guard);
+
+        epicsEventMustWait(pool->workerWakeup);
+
+        epicsMutexMustLock(pool->guard);
+        pool->threadsSleeping--;
+        pool->threadsAreAwake++;
+
+        if(pool->threadsWaking==0)
+            continue;
+
+        pool->threadsWaking--;
+
+        CHECKCOUNT(pool);
+
+        if(pool->shutdown)
+            break;
+
+        if(pool->pauserun)
+            continue;
+
+        /* more threads to wakeup */
+        if(pool->threadsWaking)
+        {
+            epicsEventSignal(pool->workerWakeup);
+        }
+
+        while ((cur=ellGet(&pool->jobs)) !=NULL)
+        {
+            job=CONTAINER(cur, epicsJob, jobnode);
+
+            assert(job->queued && !job->running);
+
+            job->queued=0;
+            job->running=1;
+
+            epicsMutexUnlock(pool->guard);
+            (*job->func)(job->arg, epicsJobModeRun);
+            epicsMutexMustLock(pool->guard);
+
+            if(job->freewhendone) {
+                job->dead=1;
+                free(job);
+            } else {
+                job->running=0;
+                /* job may be re-queued from within callback */
+                if(job->queued)
+                    ellAdd(&pool->jobs, &job->jobnode);
+                else
+                    ellAdd(&pool->owned, &job->jobnode);
+            }
+
+        }
+
+        if(pool->observerCount)
+            epicsEventSignal(pool->observerWakeup);
+    }
+
+    pool->threadsAreAwake--;
+    pool->threadsRunning--;
+
+    {
+        size_t nrun = pool->threadsRunning,
+               ocnt = pool->observerCount;
+        epicsMutexUnlock(pool->guard);
+
+        if(ocnt)
+            epicsEventSignal(pool->observerWakeup);
+
+        if(nrun)
+            epicsEventSignal(pool->workerWakeup); /* pass along */
+        else
+            epicsEventSignal(pool->shutdownEvent);
+    }
+
+    return;
+}
+
+int createPoolThread(epicsThreadPool *pool)
+{
+    epicsThreadId tid;
+
+    tid = epicsThreadCreate("PoolWorker",
+                            pool->conf.workerPriority,
+                            pool->conf.workerStack,
+                            &workerMain,
+                            pool);
+    if(!tid)
+        return 1;
+
+    pool->threadsRunning++;
+    pool->threadsSleeping++;
+    return 0;
+}
+
+epicsJob* epicsJobCreate(epicsThreadPool* pool,
+                         epicsJobFunction func,
+                         void* arg)
+{
+    epicsJob *job=calloc(1, sizeof(*job));
+
+    if(!job)
+        return NULL;
+
+    if(arg==&epicsJobArgSelfMagic)
+        arg=job;
+
+    job->pool=NULL;
+    job->func=func;
+    job->arg=arg;
+
+    epicsJobMove(job, pool);
+
+    return job;
+}
+
+void epicsJobDestroy(epicsJob* job)
+{
+    epicsThreadPool *pool;
+    if(!job || !job->pool){
+        free(job);
+        return;
+    }
+    pool=job->pool;
+
+    epicsMutexMustLock(pool->guard);
+
+    assert(!job->dead);
+
+    epicsJobUnqueue(job);
+
+    if(job->running || job->freewhendone) {
+        job->freewhendone=1;
+    } else {
+        ellDelete(&pool->owned, &job->jobnode);
+        job->dead=1;
+        free(job);
+    }
+
+    epicsMutexUnlock(pool->guard);
+}
+
+int epicsJobMove(epicsJob* job, epicsThreadPool* newpool)
+{
+    epicsThreadPool *pool=job->pool;
+
+    /* remove from current pool */
+    if(pool) {
+        epicsMutexMustLock(pool->guard);
+
+        if(job->queued || job->running) {
+            epicsMutexUnlock(pool->guard);
+            return EINVAL;
+        }
+
+        ellDelete(&pool->owned, &job->jobnode);
+
+        epicsMutexUnlock(pool->guard);
+    }
+
+    pool = job->pool = newpool;
+
+    /* add to new pool */
+    if(pool) {
+        epicsMutexMustLock(pool->guard);
+
+        ellAdd(&pool->owned, &job->jobnode);
+
+        epicsMutexUnlock(pool->guard);
+    }
+
+    return 0;
+}
+
+int epicsJobQueue(epicsJob* job)
+{
+    int ret=0;
+    epicsThreadPool *pool=job->pool;
+    if(!pool)
+        return EINVAL;
+
+    epicsMutexMustLock(pool->guard);
+
+    assert(!job->dead);
+
+    if(pool->pauseadd) {
+        ret=EPERM;
+        goto done;
+    } else if(job->freewhendone) {
+        ret=EINVAL;
+        goto done;
+    } else if(job->queued) {
+        goto done;
+    }
+
+    job->queued=1;
+    /* Job may be queued from within a callback */
+    if(!job->running) {
+        ellDelete(&pool->owned, &job->jobnode);
+        ellAdd(&pool->jobs, &job->jobnode);
+    } else {
+        /* some worker will find it again before sleeping */
+        goto done;
+    }
+
+    /* Since we hold the lock, we can be certain that all awake worker are
+     * executing work functions. The current thread may be a worker.
+     * We prefer to wakeup a new worker rather then wait for a busy worker to
+     * finish. However, after we initiate a wakeup there will be a race
+     * between the worker waking up, and a busy worker finishing.
+     * Thus we can't avoid spurious wakeups.
+     */
+
+    if(pool->threadsRunning >= pool->conf.maxThreads) {
+        /* all workers created... */
+        /* ... but some are sleeping, so wake one up */
+        if(pool->threadsWaking < pool->threadsSleeping) {
+            pool->threadsWaking++;
+            epicsEventSignal(pool->workerWakeup);
+        }
+        /*else one of the running workers will find this job before sleeping */
+        CHECKCOUNT(pool);
+
+    } else {
+        /* could create more workers so
+         * will either create a new worker, or wakeup an existing worker
+         */
+
+        if(pool->threadsWaking >= pool->threadsSleeping) {
+            /* all sleeping workers have already been woken.
+             * start a new worker for this job
+             */
+            if(createPoolThread(pool) && pool->threadsRunning==0) {
+                /* oops, we couldn't lazy create our first worker
+                 * so this job would never run!
+                 */
+                ret = EAGAIN;
+                job->queued = 0;
+                /* if threadsRunning==0 then no jobs can be running */
+                assert(!job->running);
+                ellDelete(&pool->jobs, &job->jobnode);
+                ellAdd(&pool->owned, &job->jobnode);
+            }
+        }
+        if(ret==0) {
+            pool->threadsWaking++;
+            epicsEventSignal(pool->workerWakeup);
+        }
+        CHECKCOUNT(pool);
+    }
+
+done:
+    epicsMutexUnlock(pool->guard);
+    return ret;
+}
+
+int epicsJobUnqueue(epicsJob* job)
+{
+    int ret=1;
+    epicsThreadPool *pool=job->pool;
+
+    if(!pool)
+        return EINVAL;
+
+    epicsMutexMustLock(pool->guard);
+
+    assert(!job->dead);
+
+    if(job->queued) {
+        if(!job->running) {
+            ellDelete(&pool->jobs, &job->jobnode);
+            ellAdd(&pool->owned, &job->jobnode);
+        }
+        job->queued=0;
+        ret=0;
+    }
+
+    epicsMutexUnlock(pool->guard);
+
+    return ret;
+}
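workerMain() above recovers each epicsJob from its embedded ELLNODE with CONTAINER(cur, epicsJob, jobnode); this is the classic offsetof-based container-of idiom. A generic standalone sketch (not the Base definition of CONTAINER):

```c
#include <stddef.h>

/* Recover a pointer to the enclosing struct from a pointer to one of
 * its members, by subtracting the member's compile-time offset. */
#define CONTAINER_OF(ptr, type, member) \
    ((type *)((char *)(ptr) - offsetof(type, member)))

struct node {
    struct node *next;  /* stand-in for ELLNODE */
};

struct job {
    int id;
    struct node jobnode;  /* embedded list node, as in epicsJob */
};
```

The intrusive-list design this enables is why epicsJob needs no separate queue-entry allocation: the node lives inside the job itself.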
542 | === added file 'src/libCom/pool/poolPriv.h' | |||
543 | --- src/libCom/pool/poolPriv.h 1970-01-01 00:00:00 +0000 | |||
544 | +++ src/libCom/pool/poolPriv.h 2014-07-29 16:22:24 +0000 | |||
545 | @@ -0,0 +1,97 @@ | |||
546 | 1 | /*************************************************************************\ | ||
* Copyright (c) 2014 Brookhaven Science Associates, as Operator of
* Brookhaven National Laboratory.
* EPICS BASE is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/

#ifndef POOLPRIV_H
#define POOLPRIV_H

#include "epicsThreadPool.h"
#include "ellLib.h"
#include "epicsThread.h"
#include "epicsEvent.h"
#include "epicsMutex.h"

struct epicsThreadPool {
    ELLNODE sharedNode;
    size_t sharedCount;

    ELLLIST jobs;  /* run queue */
    ELLLIST owned; /* unqueued jobs */

    /* Worker state counters.
     * The life cycle of a worker is
     *   Wakeup -> Awake -> Sleeping
     * Newly created workers start in the Wakeup state.
     */

    /* # of running workers which are not waiting for a wakeup event */
    unsigned int threadsAreAwake;
    /* # of sleeping workers which need to be awakened */
    unsigned int threadsWaking;
    /* # of workers waiting on the workerWakeup event */
    unsigned int threadsSleeping;
    /* # of threads started and not stopped */
    unsigned int threadsRunning;

    /* # of observers waiting on pool events */
    unsigned int observerCount;

    epicsEventId workerWakeup;
    epicsEventId shutdownEvent;

    epicsEventId observerWakeup;

    /* Disallow epicsJobQueue() */
    unsigned int pauseadd:1;
    /* Prevent workers from running new jobs */
    unsigned int pauserun:1;
    /* Prevent further changes to pool options */
    unsigned int freezeopt:1;
    /* Tell workers to exit */
    unsigned int shutdown:1;

    epicsMutexId guard;

    /* Copy of the config passed when created */
    epicsThreadPoolConfig conf;
};

/* Called after manipulating counters to check that invariants are preserved */
#define CHECKCOUNT(pPool) do{if(!(pPool)->shutdown) { \
    assert( (pPool)->threadsAreAwake + (pPool)->threadsSleeping == (pPool)->threadsRunning ); \
    assert( (pPool)->threadsWaking <= (pPool)->threadsSleeping ); \
}}while(0)
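The CHECKCOUNT invariants can be exercised outside EPICS. The following standalone C sketch (simplified stand-in struct and hypothetical transition helpers, not the actual pool code) walks one worker through the Wakeup -> Awake -> Sleeping cycle described above and checks the same two assertions after each transition:

```c
#include <assert.h>
#include <stdio.h>

/* Simplified stand-in for the counter fields of struct epicsThreadPool */
typedef struct {
    unsigned threadsAreAwake, threadsWaking, threadsSleeping, threadsRunning;
} counters;

/* Same invariants as CHECKCOUNT(): every running worker is either awake or
 * sleeping, and only sleeping workers may be marked as waking. */
static void checkcount(const counters *c) {
    assert(c->threadsAreAwake + c->threadsSleeping == c->threadsRunning);
    assert(c->threadsWaking <= c->threadsSleeping);
}

/* A new worker starts in the Wakeup state: counted as sleeping, with one
 * wakeup owed to it. */
static void worker_create(counters *c) {
    c->threadsRunning++; c->threadsSleeping++; c->threadsWaking++;
    checkcount(c);
}

/* The worker consumes its owed wakeup and becomes awake. */
static void worker_wake(counters *c) {
    c->threadsWaking--; c->threadsSleeping--; c->threadsAreAwake++;
    checkcount(c);
}

/* With no work left, the worker goes back to sleep. */
static void worker_sleep(counters *c) {
    c->threadsAreAwake--; c->threadsSleeping++;
    checkcount(c);
}
```

Decrementing in a different order (e.g. dropping threadsSleeping before threadsWaking in worker_wake) trips the second assertion, which is the kind of ordering mistake the macro is there to catch.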

/* When created, a job is idle: queued and running are false
 * and jobnode is in the thread pool's owned list.
 *
 * When the job is added, the queued flag is set and jobnode
 * is in the jobs list.
 *
 * When the job starts running, the queued flag is cleared and
 * the running flag is set. jobnode is not in any list
 * (it is held locally by the worker).
 *
 * When the job has finished running, the running flag is cleared.
 * The queued flag may be set if the job re-added itself.
 * Based on the queued flag, jobnode is added back to the
 * appropriate list.
 */
struct epicsJob {
    ELLNODE jobnode;
    epicsJobFunction func;
    void *arg;
    epicsThreadPool *pool;

    unsigned int queued:1;
    unsigned int running:1;
    unsigned int freewhendone:1; /* lazy delete of running job */
    unsigned int dead:1;         /* flag to catch use of freed objects */
};

int createPoolThread(epicsThreadPool *pool);

#endif /* POOLPRIV_H */
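The job life cycle documented in poolPriv.h condenses to a small state model. This standalone C sketch (hypothetical names, no EPICS types; purely illustrative) encodes the queued/running transitions and which list owns the jobnode at each point:

```c
#include <assert.h>

/* Which list (if any) currently holds the jobnode */
typedef enum { LIST_OWNED, LIST_JOBS, LIST_NONE } joblist;

typedef struct {
    int queued, running;
} jobstate;

/* The node's location follows the flags: a running job is held locally by
 * the worker, a queued job is on the run queue, otherwise it is owned. */
static joblist job_location(const jobstate *j) {
    if (j->running) return LIST_NONE;
    return j->queued ? LIST_JOBS : LIST_OWNED;
}

/* Newly created jobs are idle */
static void job_create(jobstate *j) { j->queued = 0; j->running = 0; }

/* epicsJobQueue(): move to the run queue */
static void job_add(jobstate *j) { j->queued = 1; }

/* Worker dequeues the job and starts it */
static void job_start(jobstate *j) {
    assert(j->queued && !j->running);
    j->queued = 0;
    j->running = 1;
}

/* Job function returns; readd != 0 models a job that re-queued itself
 * while running */
static void job_finish(jobstate *j, int readd) {
    assert(j->running);
    j->running = 0;
    j->queued = readd ? 1 : 0;
}
```

The re-add path is the interesting one: a job that queues itself while running goes straight back onto the jobs list when it finishes, without ever touching the owned list.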
=== added file 'src/libCom/pool/threadPool.c'
--- src/libCom/pool/threadPool.c	1970-01-01 00:00:00 +0000
+++ src/libCom/pool/threadPool.c	2014-07-29 16:22:24 +0000
@@ -0,0 +1,399 @@
/*************************************************************************\
* Copyright (c) 2014 Brookhaven Science Associates, as Operator of
* Brookhaven National Laboratory.
* EPICS BASE is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/

#include <stdlib.h>
#include <string.h>
#include <errno.h>

#define epicsExportSharedSymbols

#include "dbDefs.h"
#include "errlog.h"
#include "ellLib.h"
#include "epicsThread.h"
#include "epicsMutex.h"
#include "epicsEvent.h"
#include "epicsInterrupt.h"
#include "cantProceed.h"

#include "epicsThreadPool.h"
#include "poolPriv.h"


void epicsThreadPoolConfigDefaults(epicsThreadPoolConfig *opts)
{
    memset(opts, 0, sizeof(*opts));
    opts->maxThreads=epicsThreadGetCPUs();
    opts->workerStack=epicsThreadGetStackSize(epicsThreadStackSmall);

    if(epicsThreadLowestPriorityLevelAbove(epicsThreadPriorityCAServerHigh, &opts->workerPriority)
       !=epicsThreadBooleanStatusSuccess)
        opts->workerPriority = epicsThreadPriorityMedium;
}

epicsThreadPool* epicsThreadPoolCreate(epicsThreadPoolConfig *opts)
{
    size_t i;
    epicsThreadPool *pool;

    /* caller likely didn't initialize the options structure */
    if(opts && opts->maxThreads==0) {
        errlogMessage("Error: epicsThreadPoolCreate() options provided, but not initialized");
        return NULL;
    }

    pool=calloc(1, sizeof(*pool));
    if(!pool)
        return NULL;

    if(opts)
        memcpy(&pool->conf, opts, sizeof(*opts));
    else
        epicsThreadPoolConfigDefaults(&pool->conf);

    if(pool->conf.initialThreads > pool->conf.maxThreads)
        pool->conf.initialThreads = pool->conf.maxThreads;

    pool->workerWakeup=epicsEventCreate(epicsEventEmpty);
    pool->shutdownEvent=epicsEventCreate(epicsEventEmpty);
    pool->observerWakeup=epicsEventCreate(epicsEventEmpty);
    pool->guard=epicsMutexCreate();

    if(!pool->workerWakeup || !pool->shutdownEvent ||
       !pool->observerWakeup || !pool->guard)
        goto cleanup;

    ellInit(&pool->jobs);
    ellInit(&pool->owned);

    epicsMutexMustLock(pool->guard);

    for(i=0; i<pool->conf.initialThreads; i++) {
        createPoolThread(pool);
    }

    if(pool->threadsRunning==0 && pool->conf.initialThreads!=0) {
        epicsMutexUnlock(pool->guard);
        errlogPrintf("Error: Unable to create any threads for thread pool\n");
        goto cleanup;

    }else if(pool->threadsRunning < pool->conf.initialThreads) {
        errlogPrintf("Warning: Unable to create all threads for thread pool (%lu/%lu)\n",
                     (unsigned long)pool->threadsRunning,
                     (unsigned long)pool->conf.initialThreads);
    }

    epicsMutexUnlock(pool->guard);

    return pool;

cleanup:
    if(pool->workerWakeup) epicsEventDestroy(pool->workerWakeup);
    if(pool->shutdownEvent) epicsEventDestroy(pool->shutdownEvent);
    if(pool->observerWakeup) epicsEventDestroy(pool->observerWakeup);
    if(pool->guard) epicsMutexDestroy(pool->guard);

    free(pool);
    return 0;
}

static
void epicsThreadPoolControlImpl(epicsThreadPool* pool, epicsThreadPoolOption opt, unsigned int val)
{
    if(pool->freezeopt)
        return;

    if(opt==epicsThreadPoolQueueAdd) {
        pool->pauseadd = !val;

    } else if(opt==epicsThreadPoolQueueRun) {
        if(!val && !pool->pauserun)
            pool->pauserun=1;

        else if(val && pool->pauserun) {
            size_t jobs=(size_t)ellCount(&pool->jobs);
            pool->pauserun=0;

            if(jobs) {
                size_t wakeable=pool->threadsSleeping - pool->threadsWaking;
                /* first try to give jobs to sleeping workers */
                if(wakeable) {
                    int wakeup = jobs > wakeable ? wakeable : jobs;
                    assert(wakeup>0);
                    jobs-=wakeup;
                    pool->threadsWaking+=wakeup;
                    epicsEventSignal(pool->workerWakeup);
                    CHECKCOUNT(pool);
                }
            }
            while(jobs-- && pool->threadsRunning < pool->conf.maxThreads) {
                if(createPoolThread(pool)==0) {
                    pool->threadsWaking++;
                    epicsEventSignal(pool->workerWakeup);
                } else
                    break; /* oops, couldn't create worker */
            }
            CHECKCOUNT(pool);
        }
    }
    /* unknown options ignored */

}

void epicsThreadPoolControl(epicsThreadPool* pool, epicsThreadPoolOption opt, unsigned int val)
{
    epicsMutexMustLock(pool->guard);
    epicsThreadPoolControlImpl(pool, opt, val);
    epicsMutexUnlock(pool->guard);
}

int epicsThreadPoolWait(epicsThreadPool* pool, double timeout)
{
    int ret=0;
    epicsMutexMustLock(pool->guard);

    while(ellCount(&pool->jobs)>0 || pool->threadsAreAwake>0) {
        pool->observerCount++;
        epicsMutexUnlock(pool->guard);

        if(timeout<0.0) {
            epicsEventMustWait(pool->observerWakeup);

        } else {
            switch(epicsEventWaitWithTimeout(pool->observerWakeup, timeout)) {
            case epicsEventWaitError:
                cantProceed("epicsThreadPoolWait: failed to wait for Event");
                break;
            case epicsEventWaitTimeout:
                ret=ETIMEDOUT;
                break;
            case epicsEventWaitOK:
                ret=0;
                break;
            }

        }

        epicsMutexMustLock(pool->guard);
        pool->observerCount--;

        if(pool->observerCount)
            epicsEventSignal(pool->observerWakeup);

        if(ret!=0)
            break;
    }

    epicsMutexUnlock(pool->guard);
    return ret;
}

void epicsThreadPoolDestroy(epicsThreadPool *pool)
{
    size_t nThr;
    ELLLIST notify;
    ELLNODE *cur;

    if(!pool)
        return;

    ellInit(&notify);

    epicsMutexMustLock(pool->guard);

    /* run remaining queued jobs */
    epicsThreadPoolControlImpl(pool, epicsThreadPoolQueueAdd, 0);
    epicsThreadPoolControlImpl(pool, epicsThreadPoolQueueRun, 1);
    nThr=pool->threadsRunning;
    pool->freezeopt = 1;

    epicsMutexUnlock(pool->guard);

    epicsThreadPoolWait(pool, -1.0);
    /* At this point all queued jobs have run */

    epicsMutexMustLock(pool->guard);

    pool->shutdown=1;
    /* wakeup all */
    if(pool->threadsWaking < pool->threadsSleeping) {
        pool->threadsWaking = pool->threadsSleeping;
        epicsEventSignal(pool->workerWakeup);
    }

    ellConcat(&notify, &pool->owned);
    ellConcat(&notify, &pool->jobs);

    epicsMutexUnlock(pool->guard);

    if(nThr && epicsEventWait(pool->shutdownEvent)!=epicsEventWaitOK){
        errlogMessage("epicsThreadPoolDestroy: wait error");
        return;
    }

    /* all workers are now shutdown */

    /* notify remaining jobs that pool is being destroyed */
    while( (cur=ellGet(&notify))!=NULL )
    {
        epicsJob *job=CONTAINER(cur, epicsJob, jobnode);
        job->running=1;
        (*job->func)(job->arg, epicsJobModeCleanup);
        job->running=0;
        if(job->freewhendone)
            free(job);
        else
            job->pool=NULL; /* orphan */
    }

    epicsEventDestroy(pool->workerWakeup);
    epicsEventDestroy(pool->shutdownEvent);
    epicsEventDestroy(pool->observerWakeup);
    epicsMutexDestroy(pool->guard);

    free(pool);
}


void epicsThreadPoolReport(epicsThreadPool *pool, FILE *fd)
{
    ELLNODE *cur;
    epicsMutexMustLock(pool->guard);

    fprintf(fd, "Thread Pool with %u/%u threads\n"
                " running %d jobs with %u threads\n",
            pool->threadsRunning,
            pool->conf.maxThreads,
            ellCount(&pool->jobs),
            pool->threadsAreAwake);
    if(pool->pauseadd)
        fprintf(fd, "  Inhibit queueing\n");
    if(pool->pauserun)
        fprintf(fd, "  Pause workers\n");
    if(pool->shutdown)
        fprintf(fd, "  Shutdown in progress\n");

    for(cur=ellFirst(&pool->jobs); cur; cur=ellNext(cur))
    {
        epicsJob *job=CONTAINER(cur, epicsJob, jobnode);
        fprintf(fd, "  job %p func: %p, arg: %p ",
                job, job->func,
                job->arg);
        if(job->queued)
            fprintf(fd, "Queued ");
        if(job->running)
            fprintf(fd, "Running ");
        if(job->freewhendone)
            fprintf(fd, "Free ");
        fprintf(fd, "\n");
    }

    epicsMutexUnlock(pool->guard);
}

unsigned int epicsThreadPoolNThreads(epicsThreadPool *pool)
{
    unsigned int ret;

    epicsMutexMustLock(pool->guard);
    ret=pool->threadsRunning;
    epicsMutexUnlock(pool->guard);

    return ret;
}

static
ELLLIST sharedPools = ELLLIST_INIT;

static
epicsMutexId sharedPoolsGuard;

static
epicsThreadOnceId sharedPoolsOnce = EPICS_THREAD_ONCE_INIT;

static
void sharedPoolsInit(void* unused)
{
    sharedPoolsGuard = epicsMutexMustCreate();
}

epicsShareFunc epicsThreadPool* epicsThreadPoolGetShared(epicsThreadPoolConfig *opts)
{
    ELLNODE *node;
    epicsThreadPool *cur;
    epicsThreadPoolConfig defopts;
    size_t N=epicsThreadGetCPUs();

    if(!opts) {
        epicsThreadPoolConfigDefaults(&defopts);
        opts = &defopts;
    }
    /* shared pools must have a minimum allowed number of workers.
     * Use the number of CPU cores
     */
    if(opts->maxThreads<N)
        opts->maxThreads=N;

    epicsThreadOnce(&sharedPoolsOnce, &sharedPoolsInit, NULL);

    epicsMutexMustLock(sharedPoolsGuard);

    for(node=ellFirst(&sharedPools); node; node=ellNext(node))
    {
        cur=CONTAINER(node, epicsThreadPool, sharedNode);

        /* Must have exactly the requested priority,
         * at least the requested max workers,
         * and at least the requested stack size
         */
        if(cur->conf.workerPriority != opts->workerPriority)
            continue;
        if(cur->conf.maxThreads < opts->maxThreads)
            continue;
        if(cur->conf.workerStack < opts->workerStack)
            continue;

        cur->sharedCount++;
        assert(cur->sharedCount>0);
        epicsMutexUnlock(sharedPoolsGuard);

        epicsMutexMustLock(cur->guard);
        *opts = cur->conf;
        epicsMutexUnlock(cur->guard);
        return cur;
    }

    cur=epicsThreadPoolCreate(opts);
    if(!cur) {
        epicsMutexUnlock(sharedPoolsGuard);
        return NULL;
    }
    cur->sharedCount=1;

    ellAdd(&sharedPools, &cur->sharedNode);
    epicsMutexUnlock(sharedPoolsGuard);
    return cur;
}

epicsShareFunc void epicsThreadPoolReleaseShared(epicsThreadPool *pool)
{
    if(!pool)
        return;

    epicsMutexMustLock(sharedPoolsGuard);

    assert(pool->sharedCount>0);

    pool->sharedCount--;

    if(pool->sharedCount==0) {
        ellDelete(&sharedPools, &pool->sharedNode);
        epicsThreadPoolDestroy(pool);
    }

    epicsMutexUnlock(sharedPoolsGuard);
}
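epicsThreadPoolGetShared/epicsThreadPoolReleaseShared above implement a guarded reference count over a list of matching pools. The pattern in isolation looks like this standalone C sketch (hypothetical names, a single global pool instead of a list, no locking; purely illustrative, not the EPICS API):

```c
#include <assert.h>
#include <stdlib.h>

typedef struct {
    unsigned sharedCount; /* number of outstanding references */
} sharedpool;

static sharedpool *thepool; /* stand-in for the sharedPools list */

/* Acquire a reference: reuse the existing pool if present, else create one
 * with an initial count of 1. */
static sharedpool *pool_get_shared(void) {
    if (thepool) {
        thepool->sharedCount++;
        return thepool;
    }
    thepool = calloc(1, sizeof(*thepool));
    if (!thepool) return NULL;
    thepool->sharedCount = 1;
    return thepool;
}

/* Drop a reference; the last release destroys the pool.
 * Returns 1 if the pool was destroyed, 0 otherwise. */
static int pool_release_shared(sharedpool *pool) {
    if (!pool) return 0;
    assert(pool->sharedCount > 0);
    if (--pool->sharedCount == 0) {
        thepool = NULL;
        free(pool);
        return 1;
    }
    return 0;
}
```

As in the real code, the count is only ever touched while holding the registry guard (omitted here), which is what makes get/release safe against a concurrent last release.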
=== modified file 'src/libCom/test/Makefile'
--- src/libCom/test/Makefile	2013-12-17 18:54:04 +0000
+++ src/libCom/test/Makefile	2014-07-29 16:22:24 +0000
@@ -12,6 +12,10 @@
 
 PROD_LIBS += Com
 
+TESTPROD_HOST += epicsThreadPoolTest
+epicsThreadPoolTest_SRCS += epicsThreadPoolTest.c
+TESTS += epicsThreadPoolTest
+
 TESTPROD_HOST += epicsUnitTestTest
 epicsUnitTestTest_SRCS += epicsUnitTestTest.c
 # Not much point running this on vxWorks or RTEMS...
 
1063 | === added file 'src/libCom/test/epicsThreadPoolTest.c' | |||
1064 | --- src/libCom/test/epicsThreadPoolTest.c 1970-01-01 00:00:00 +0000 | |||
1065 | +++ src/libCom/test/epicsThreadPoolTest.c 2014-07-29 16:22:24 +0000 | |||
1066 | @@ -0,0 +1,443 @@ | |||
1067 | 1 | /*************************************************************************\ | ||
1068 | 2 | * Copyright (c) 2014 Brookhaven Science Associates, as Operator of | ||
1069 | 3 | * Brookhaven National Laboratory. | ||
1070 | 4 | * EPICS BASE is distributed subject to a Software License Agreement found | ||
1071 | 5 | * in file LICENSE that is included with this distribution. | ||
1072 | 6 | \*************************************************************************/ | ||
1073 | 7 | |||
1074 | 8 | #include "epicsThreadPool.h" | ||
1075 | 9 | |||
1076 | 10 | /* included to allow tests to peek */ | ||
1077 | 11 | #include "../../pool/poolPriv.h" | ||
1078 | 12 | |||
1079 | 13 | #include "testMain.h" | ||
1080 | 14 | #include "epicsUnitTest.h" | ||
1081 | 15 | |||
1082 | 16 | #include "cantProceed.h" | ||
1083 | 17 | #include "epicsEvent.h" | ||
1084 | 18 | #include "epicsMutex.h" | ||
1085 | 19 | #include "epicsThread.h" | ||
1086 | 20 | |||
1087 | 21 | /* Do nothing */ | ||
1088 | 22 | static void nullop(void) | ||
1089 | 23 | { | ||
1090 | 24 | epicsThreadPool *pool; | ||
1091 | 25 | testDiag("nullop()"); | ||
1092 | 26 | { | ||
1093 | 27 | epicsThreadPoolConfig conf; | ||
1094 | 28 | epicsThreadPoolConfigDefaults(&conf); | ||
1095 | 29 | testOk1(conf.maxThreads>0); | ||
1096 | 30 | |||
1097 | 31 | testOk1((pool=epicsThreadPoolCreate(&conf))!=NULL); | ||
1098 | 32 | if(!pool) | ||
1099 | 33 | return; | ||
1100 | 34 | } | ||
1101 | 35 | |||
1102 | 36 | epicsThreadPoolDestroy(pool); | ||
1103 | 37 | } | ||
1104 | 38 | |||
1105 | 39 | /* Just create and destroy worker threads */ | ||
1106 | 40 | static void oneop(void) | ||
1107 | 41 | { | ||
1108 | 42 | epicsThreadPool *pool; | ||
1109 | 43 | testDiag("oneop()"); | ||
1110 | 44 | { | ||
1111 | 45 | epicsThreadPoolConfig conf; | ||
1112 | 46 | epicsThreadPoolConfigDefaults(&conf); | ||
1113 | 47 | conf.initialThreads=2; | ||
1114 | 48 | testOk1(conf.maxThreads>0); | ||
1115 | 49 | |||
1116 | 50 | testOk1((pool=epicsThreadPoolCreate(&conf))!=NULL); | ||
1117 | 51 | if(!pool) | ||
1118 | 52 | return; | ||
1119 | 53 | } | ||
1120 | 54 | |||
1121 | 55 | epicsThreadPoolDestroy(pool); | ||
1122 | 56 | } | ||
1123 | 57 | |||
1124 | 58 | /* Test that Bursts of jobs will create enough threads to | ||
1125 | 59 | * run all in parallel | ||
1126 | 60 | */ | ||
1127 | 61 | typedef struct { | ||
1128 | 62 | epicsMutexId guard; | ||
1129 | 63 | unsigned int count; | ||
1130 | 64 | epicsEventId allrunning; | ||
1131 | 65 | epicsEventId done; | ||
1132 | 66 | epicsJob **job; | ||
1133 | 67 | } countPriv; | ||
1134 | 68 | |||
1135 | 69 | static void countjob(void *param, epicsJobMode mode) | ||
1136 | 70 | { | ||
1137 | 71 | countPriv *cnt=param; | ||
1138 | 72 | testOk1(mode==epicsJobModeRun||mode==epicsJobModeCleanup); | ||
1139 | 73 | if(mode==epicsJobModeCleanup) | ||
1140 | 74 | return; | ||
1141 | 75 | |||
1142 | 76 | epicsMutexMustLock(cnt->guard); | ||
1143 | 77 | testDiag("Job %lu", (unsigned long)cnt->count); | ||
1144 | 78 | cnt->count--; | ||
1145 | 79 | if(cnt->count==0) { | ||
1146 | 80 | testDiag("All jobs running"); | ||
1147 | 81 | epicsEventSignal(cnt->allrunning); | ||
1148 | 82 | } | ||
1149 | 83 | epicsMutexUnlock(cnt->guard); | ||
1150 | 84 | |||
1151 | 85 | epicsEventMustWait(cnt->done); | ||
1152 | 86 | epicsEventSignal(cnt->done); /* pass along to next thread */ | ||
1153 | 87 | } | ||
1154 | 88 | |||
1155 | 89 | /* Starts "mcnt" jobs in a pool with initial and max | ||
1156 | 90 | * thread counts "icnt" and "mcnt". | ||
1157 | 91 | * The test ensures that all jobs run in parallel. | ||
1158 | 92 | * "cork" checks the function of pausing the run queue | ||
1159 | 93 | * with epicsThreadPoolQueueRun | ||
1160 | 94 | */ | ||
1161 | 95 | static void postjobs(size_t icnt, size_t mcnt, int cork) | ||
1162 | 96 | { | ||
1163 | 97 | size_t i; | ||
1164 | 98 | epicsThreadPool *pool; | ||
1165 | 99 | countPriv *priv=callocMustSucceed(1, sizeof(*priv), "postjobs priv alloc"); | ||
1166 | 100 | priv->guard=epicsMutexMustCreate(); | ||
1167 | 101 | priv->done=epicsEventMustCreate(epicsEventEmpty); | ||
1168 | 102 | priv->allrunning=epicsEventMustCreate(epicsEventEmpty); | ||
1169 | 103 | priv->count=mcnt; | ||
1170 | 104 | priv->job=callocMustSucceed(mcnt, sizeof(*priv->job), "postjobs job array"); | ||
1171 | 105 | |||
1172 | 106 | testDiag("postjobs(%lu,%lu)", (unsigned long)icnt, (unsigned long)mcnt); | ||
1173 | 107 | |||
1174 | 108 | { | ||
1175 | 109 | epicsThreadPoolConfig conf; | ||
1176 | 110 | epicsThreadPoolConfigDefaults(&conf); | ||
1177 | 111 | conf.initialThreads=icnt; | ||
1178 | 112 | conf.maxThreads=mcnt; | ||
1179 | 113 | |||
1180 | 114 | testOk1((pool=epicsThreadPoolCreate(&conf))!=NULL); | ||
1181 | 115 | if(!pool) | ||
1182 | 116 | return; | ||
1183 | 117 | } | ||
1184 | 118 | |||
1185 | 119 | if(cork) | ||
1186 | 120 | epicsThreadPoolControl(pool, epicsThreadPoolQueueRun, 0); | ||
1187 | 121 | |||
1188 | 122 | for(i=0; i<mcnt; i++) { | ||
1189 | 123 | testDiag("i=%lu", (unsigned long)i); | ||
1190 | 124 | priv->job[i] = epicsJobCreate(pool, &countjob, priv); | ||
1191 | 125 | testOk1(priv->job[i]!=NULL); | ||
1192 | 126 | testOk1(epicsJobQueue(priv->job[i])==0); | ||
1193 | 127 | } | ||
1194 | 128 | |||
1195 | 129 | if(cork) { | ||
1196 | 130 | /* no jobs should have run */ | ||
1197 | 131 | epicsMutexMustLock(priv->guard); | ||
1198 | 132 | testOk1(priv->count==mcnt); | ||
1199 | 133 | epicsMutexUnlock(priv->guard); | ||
1200 | 134 | |||
1201 | 135 | epicsThreadPoolControl(pool, epicsThreadPoolQueueRun, 1); | ||
1202 | 136 | } | ||
1203 | 137 | |||
1204 | 138 | testDiag("Waiting for all jobs to start"); | ||
1205 | 139 | epicsEventMustWait(priv->allrunning); | ||
1206 | 140 | testDiag("Stop all"); | ||
1207 | 141 | epicsEventSignal(priv->done); | ||
1208 | 142 | |||
1209 | 143 | for(i=0; i<mcnt; i++) { | ||
1210 | 144 | testDiag("i=%lu", (unsigned long)i); | ||
1211 | 145 | epicsJobDestroy(priv->job[i]); | ||
1212 | 146 | } | ||
1213 | 147 | |||
1214 | 148 | epicsThreadPoolDestroy(pool); | ||
1215 | 149 | epicsMutexDestroy(priv->guard); | ||
1216 | 150 | epicsEventDestroy(priv->allrunning); | ||
1217 | 151 | epicsEventDestroy(priv->done); | ||
1218 | 152 | free(priv->job); | ||
1219 | 153 | free(priv); | ||
1220 | 154 | } | ||
1221 | 155 | |||
1222 | 156 | static unsigned int flag0 = 0; | ||
1223 | 157 | |||
1224 | 158 | /* Test cancel from job (no-op) | ||
1225 | 159 | * and destroy from job (lazy free) | ||
1226 | 160 | */ | ||
1227 | 161 | static void cleanupjob0(void* arg, epicsJobMode mode) | ||
1228 | 162 | { | ||
1229 | 163 | epicsJob *job=arg; | ||
1230 | 164 | testOk1(mode==epicsJobModeRun||mode==epicsJobModeCleanup); | ||
1231 | 165 | if(mode==epicsJobModeCleanup) | ||
1232 | 166 | return; | ||
1233 | 167 | |||
1234 | 168 | assert(flag0==0); | ||
1235 | 169 | flag0=1; | ||
1236 | 170 | |||
1237 | 171 | testOk1(epicsJobQueue(job)==0); | ||
1238 | 172 | |||
1239 | 173 | epicsJobDestroy(job); /* delete while job is running */ | ||
1240 | 174 | } | ||
1241 | 175 | static void cleanupjob1(void* arg, epicsJobMode mode) | ||
1242 | 176 | { | ||
1243 | 177 | epicsJob *job=arg; | ||
1244 | 178 | testOk1(mode==epicsJobModeRun||mode==epicsJobModeCleanup); | ||
1245 | 179 | if(mode==epicsJobModeCleanup) | ||
1246 | 180 | return; | ||
1247 | 181 | |||
1248 | 182 | testOk1(epicsJobQueue(job)==0); | ||
1249 | 183 | |||
1250 | 184 | testOk1(epicsJobUnqueue(job)==0); | ||
1251 | 185 | /* delete later after job finishes, but before pool is destroyed */ | ||
1252 | 186 | } | ||
1253 | 187 | static void cleanupjob2(void* arg, epicsJobMode mode) | ||
1254 | 188 | { | ||
1255 | 189 | epicsJob *job=arg; | ||
1256 | 190 | testOk1(mode==epicsJobModeRun||mode==epicsJobModeCleanup); | ||
1257 | 191 | if(mode==epicsJobModeCleanup) | ||
1258 | 192 | epicsJobDestroy(job); /* delete when threadpool is destroyed */ | ||
1259 | 193 | else if(mode==epicsJobModeRun) | ||
1260 | 194 | testOk1(epicsJobUnqueue(job)==1); | ||
1261 | 195 | } | ||
1262 | 196 | static epicsJobFunction cleanupjobs[3] = {&cleanupjob0,&cleanupjob1,&cleanupjob2}; | ||
1263 | 197 | |||
1264 | 198 | /* Tests three methods for job cleanup. | ||
1265 | 199 | * 1. destroy which running | ||
1266 | 200 | * 2. deferred cleanup after pool destroyed | ||
1267 | 201 | * 3. immediate cleanup when pool destroyed | ||
1268 | 202 | */ | ||
1269 | 203 | static void testcleanup(void) | ||
1270 | 204 | { | ||
1271 | 205 | int i=0; | ||
1272 | 206 | epicsThreadPool *pool; | ||
1273 | 207 | epicsJob *job[3]; | ||
1274 | 208 | |||
1275 | 209 | testDiag("testcleanup()"); | ||
1276 | 210 | |||
1277 | 211 | testOk1((pool=epicsThreadPoolCreate(NULL))!=NULL); | ||
1278 | 212 | if(!pool) | ||
1279 | 213 | return; | ||
1280 | 214 | |||
1281 | 215 | /* unrolled so that valgrind can show which methods leaks */ | ||
1282 | 216 | testOk1((job[0]=epicsJobCreate(pool, cleanupjobs[0], EPICSJOB_SELF))!=NULL); | ||
1283 | 217 | testOk1((job[1]=epicsJobCreate(pool, cleanupjobs[1], EPICSJOB_SELF))!=NULL); | ||
1284 | 218 | testOk1((job[2]=epicsJobCreate(pool, cleanupjobs[2], EPICSJOB_SELF))!=NULL); | ||
1285 | 219 | for(i=0; i<3; i++) { | ||
1286 | 220 | testOk1(epicsJobQueue(job[i])==0); | ||
1287 | 221 | } | ||
1288 | 222 | |||
1289 | 223 | epicsThreadPoolWait(pool, -1); | ||
1290 | 224 | epicsJobDestroy(job[1]); | ||
1291 | 225 | epicsThreadPoolDestroy(pool); | ||
1292 | 226 | } | ||
1293 | 227 | |||
1294 | 228 | /* Test re-add from inside job */ | ||
1295 | 229 | typedef struct { | ||
1296 | 230 | unsigned int count; | ||
1297 | 231 | epicsEventId done; | ||
1298 | 232 | epicsJob *job; | ||
1299 | 233 | unsigned int inprogress; | ||
1300 | 234 | } readdPriv; | ||
1301 | 235 | |||
1302 | 236 | static void readdjob(void *arg, epicsJobMode mode) | ||
1303 | 237 | { | ||
1304 | 238 | readdPriv *priv=arg; | ||
1305 | 239 | testOk1(mode==epicsJobModeRun||mode==epicsJobModeCleanup); | ||
1306 | 240 | if(mode==epicsJobModeCleanup) | ||
1307 | 241 | return; | ||
1308 | 242 | testOk1(priv->inprogress==0); | ||
1309 | 243 | testDiag("count==%u", priv->count); | ||
1310 | 244 | |||
1311 | 245 | if(priv->count--) { | ||
1312 | 246 | priv->inprogress=1; | ||
1313 | 247 | epicsJobQueue(priv->job); | ||
1314 | 248 | epicsThreadSleep(0.05); | ||
1315 | 249 | priv->inprogress=0; | ||
1316 | 250 | }else{ | ||
1317 | 251 | epicsEventSignal(priv->done); | ||
1318 | 252 | epicsJobDestroy(priv->job); | ||
1319 | 253 | } | ||
1320 | 254 | } | ||
1321 | 255 | |||
1322 | 256 | /* Test re-queueing a job while it is running. | ||
1323 | 257 | * Check that a single job won't run concurrently. | ||
1324 | 258 | */ | ||
1325 | 259 | static void testreadd(void) { | ||
1326 | 260 | epicsThreadPool *pool; | ||
1327 | 261 | readdPriv *priv=callocMustSucceed(1, sizeof(*priv), "testcleanup priv"); | ||
1328 | 262 | readdPriv *priv2=callocMustSucceed(1, sizeof(*priv), "testcleanup priv"); | ||
1329 | 263 | |||
1330 | 264 | testDiag("testreadd"); | ||
1331 | 265 | |||
1332 | 266 | priv->done=epicsEventMustCreate(epicsEventEmpty); | ||
1333 | 267 | priv->count=5; | ||
1334 | 268 | priv2->done=epicsEventMustCreate(epicsEventEmpty); | ||
1335 | 269 | priv2->count=5; | ||
1336 | 270 | |||
1337 | 271 | testOk1((pool=epicsThreadPoolCreate(NULL))!=NULL); | ||
1338 | 272 | if(!pool) | ||
1339 | 273 | return; | ||
1340 | 274 | |||
1341 | 275 | testOk1((priv->job=epicsJobCreate(pool, &readdjob, priv))!=NULL); | ||
1342 | 276 | testOk1((priv2->job=epicsJobCreate(pool, &readdjob, priv2))!=NULL); | ||
1343 | 277 | |||
1344 | 278 | testOk1(epicsJobQueue(priv->job)==0); | ||
1345 | 279 | testOk1(epicsJobQueue(priv2->job)==0); | ||
1346 | 280 | epicsEventMustWait(priv->done); | ||
1347 | 281 | epicsEventMustWait(priv2->done); | ||
1348 | 282 | |||
1349 | 283 | testOk1(epicsThreadPoolNThreads(pool)==2); | ||
1350 | 284 | |||
1351 | 285 | epicsThreadPoolDestroy(pool); | ||
1352 | 286 | epicsEventDestroy(priv->done); | ||
1353 | 287 | epicsEventDestroy(priv2->done); | ||
1354 | 288 | free(priv); | ||
1355 | 289 | free(priv2); | ||
1356 | 290 | |||
1357 | 291 | } | ||
1358 | 292 | |||
static int shouldneverrun = 0;
static int numtoolate = 0;

/* test job canceling */
static
void neverrun(void *arg, epicsJobMode mode)
{
    epicsJob *job=arg;
    testOk1(mode==epicsJobModeCleanup);
    if(mode==epicsJobModeCleanup)
        epicsJobDestroy(job);
    else
        shouldneverrun++;
}
static epicsEventId cancel[2];
static
void toolate(void *arg, epicsJobMode mode)
{
    epicsJob *job=arg;
    if(mode==epicsJobModeCleanup){
        epicsJobDestroy(job);
        return;
    }
    testPass("Job runs");
    numtoolate++;
    epicsEventSignal(cancel[0]);
    epicsEventMustWait(cancel[1]);
}

static
void testcancel(void)
{
    epicsJob *job[2];
    epicsThreadPool *pool;
    testOk1((pool=epicsThreadPoolCreate(NULL))!=NULL);
    if(!pool)
        return;

    cancel[0]=epicsEventCreate(epicsEventEmpty);
    cancel[1]=epicsEventCreate(epicsEventEmpty);

    testOk1((job[0]=epicsJobCreate(pool, &neverrun, EPICSJOB_SELF))!=NULL);
    testOk1((job[1]=epicsJobCreate(pool, &toolate, EPICSJOB_SELF))!=NULL);

    /* freeze */
    epicsThreadPoolControl(pool, epicsThreadPoolQueueRun, 0);

    testOk1(epicsJobUnqueue(job[0])==1); /* not queued yet */

    epicsJobQueue(job[0]);
    testOk1(epicsJobUnqueue(job[0])==0);
    testOk1(epicsJobUnqueue(job[0])==1);

    epicsThreadSleep(0.01);
    epicsJobQueue(job[0]);
    testOk1(epicsJobUnqueue(job[0])==0);
    testOk1(epicsJobUnqueue(job[0])==1);

    epicsThreadPoolControl(pool, epicsThreadPoolQueueRun, 1);

    epicsJobQueue(job[1]); /* actually let it run this time */

    epicsEventMustWait(cancel[0]);
    testOk1(epicsJobUnqueue(job[0])==1);
    epicsEventSignal(cancel[1]);

    epicsThreadPoolDestroy(pool);
    epicsEventDestroy(cancel[0]);
    epicsEventDestroy(cancel[1]);

    testOk1(shouldneverrun==0);
    testOk1(numtoolate==1);
}

static
unsigned int sharedWasDeleted=0;

static
void lastjob(void *arg, epicsJobMode mode)
{
    epicsJob *job=arg;
    if(mode==epicsJobModeCleanup) {
        sharedWasDeleted=1;
        epicsJobDestroy(job);
    }
}

static
void testshared(void)
{
    epicsThreadPool *poolA, *poolB;
    epicsThreadPoolConfig conf;
    epicsJob *job;

    epicsThreadPoolConfigDefaults(&conf);

    testDiag("Check reference counting of shared pools");

    testOk1((poolA=epicsThreadPoolGetShared(&conf))!=NULL);

    testOk1(poolA->sharedCount==1);

    testOk1((poolB=epicsThreadPoolGetShared(&conf))!=NULL);

    testOk1(poolA==poolB);

    testOk1(poolA->sharedCount==2);

    epicsThreadPoolReleaseShared(poolA);

    testOk1(poolB->sharedCount==1);

    testOk1((job=epicsJobCreate(poolB, &lastjob, EPICSJOB_SELF))!=NULL);

    epicsThreadPoolReleaseShared(poolB);

    testOk1(sharedWasDeleted==1);

    testOk1((poolA=epicsThreadPoolGetShared(&conf))!=NULL);

    testOk1(poolA->sharedCount==1);

    epicsThreadPoolReleaseShared(poolA);

}

MAIN(epicsThreadPoolTest)
{
    testPlan(171);

    nullop();
    oneop();
    testDiag("Queue with delayed start");
    postjobs(1,1,1);
    postjobs(0,1,1);
    postjobs(4,4,1);
    postjobs(0,4,1);
    postjobs(2,4,1);
    testDiag("Queue with immediate start");
    postjobs(1,1,0);
    postjobs(0,1,0);
    postjobs(4,4,0);
    postjobs(0,4,0);
    postjobs(2,4,0);
    testcleanup();
    testreadd();
    testcancel();
    testshared();

    return testDone();
}
* Include in RTEMS/vxWorks test harness
Done.
* For RTEMS/vxWorks, replace mutex with interrupt disable
Done.
* Avoid excessive wakeups from worker and observer Events.
Done.
Also fixed several issues and added API calls for shared thread pools.
Remaining task:
Get approval of the name epicsMaxParallelThreads()