Merge ~info-martin-konrad/epics-base:callbackQueueStatus into ~epics-core/epics-base/+git/epics-base:3.16

Proposed by Martin Konrad
Status: Merged
Approved by: Andrew Johnson
Approved revision: 6f919c3991bc264e91a313415a402250bab463ac
Merged at revision: ec036cb26d6664fa463a076d20cd3b41a09821bd
Proposed branch: ~info-martin-konrad/epics-base:callbackQueueStatus
Merge into: ~epics-core/epics-base/+git/epics-base:3.16
Diff against target: 631 lines (+232/-20)
11 files modified
src/ioc/db/callback.c (+47/-0)
src/ioc/db/callback.h (+9/-0)
src/ioc/db/dbIocRegister.c (+24/-0)
src/ioc/db/dbScan.c (+37/-0)
src/ioc/db/dbScan.h (+9/-0)
src/libCom/ring/epicsRingBytes.c (+28/-3)
src/libCom/ring/epicsRingBytes.h (+3/-0)
src/libCom/ring/epicsRingPointer.cpp (+12/-0)
src/libCom/ring/epicsRingPointer.h (+34/-3)
src/libCom/test/ringBytesTest.c (+21/-13)
src/libCom/test/ringPointerTest.c (+8/-1)
Reviewer Review Type Date Requested Status
Andrew Johnson Approve
mdavidsaver Needs Fixing
Ralph Lange Pending
Review via email: mp+352918@code.launchpad.net

Description of the change

Allow scanOnce and callback queue utilization to be monitored from the IOC shell and iocStats. Fixes lp:1786540

To post a comment you must log in.
Revision history for this message
Ralph Lange (ralph-lange) wrote :

I like the idea of adding queue health status.

Number of overruns is good and useful data.

Given the common usage pattern of those queues (a large number of entries is put on the queue by a driver or at IOC init for PINI processing, then a number of threads are working on taking items off) I think that an iocStats or manual check of the current usage does not tell much as it most probably misses the moment of high usage.
A simple high water mark mechanism might generate data that is a lot more useful.

I would also consider resetting the stats at every read of their values for easy one-record reporting by iocStats. (Cumulative values are not nice in the archiver.)

Revision history for this message
Andrew Johnson (anj) wrote :

Core Group review at ESS: Please implement Ralph's high water mark (which needs implementing inside the queueing code) and use epicsAtomic types to protect access to the usage data.

review: Needs Fixing
Revision history for this message
Martin Konrad (info-martin-konrad) wrote :

I have implemented the high-water mark Ralph suggested. This is in fact very useful on small IOCs which are booting fast. The one I'm mainly testing with is seeing very heavy load after IOC start and I can watch the queue usage drop slowly in the first minutes after IOC boot (so in some cases the PVs showing the current queue usage also make sense).

I'm now also leveraging epicsAtomic to make my code thread safe.

I understand Ralph's concerns about cumulative values. However, I'm a little hesitant to reset the stats at every read since we offer read out from both the IOC shell and via iocStats. If a read by iocStats resets the number of queue overruns once a second chances are I'll never see anything on the IOC shell...

Revision history for this message
mdavidsaver (mdavidsaver) wrote :

Looking at this again, I find myself wondering about adding so many new public functions. 4 query functions and 1 printer, repeated 3 times. This in addition to the ring buffer feature addition. I'd rather see the 4x query functions combined into one which accepts a struct. Something like:

typedef struct QueueStats {
    size_t size;
    size_t numUsed[NUM_CALLBACK_PRIORITIES];
    size_t maxUsed[NUM_CALLBACK_PRIORITIES];
    size_t numOverflow[NUM_CALLBACK_PRIORITIES];
} QueueStats;

void queryQueue(int reset, QueueStates* result);

I think this would be a better match for the (only) prospective outside user, devIocStats.

The printer functions are accessible through iocsh, so it doesn't seem like they need to be public.

Also, I like the idea of combining fetch with (conditional) reset to zero. I've seen and used this idea in the past, and found it a good fit for how I ended up access these stats.

I'm not going to push for this too much if there is a desire to see this in 3.16.2.

At minimum though, the idea of const-ness should apply to the C API as well as C++. eg. epicsRingPointerGetHighWaterMark() and epicsRingBytesHighWaterMark() should take a pointer to const.

review: Needs Fixing
Revision history for this message
Martin Konrad (info-martin-konrad) wrote :

Thanks for the feedback. I reworked this to include Michael's comments. I have also updated my IOCStats PR. You might want to look at my code again - this ended up to be quite a few changes.

Revision history for this message
Andrew Johnson (anj) wrote :

Hi Martin, if you committed any changes I think you forgot to push them to launchpad, this merge request still only shows your original code. There may still be a chance to get it into the final 3.16.2 release.

Revision history for this message
Martin Konrad (info-martin-konrad) wrote :

Andrew, I double checked, this is my latest code. It has the const-ness change, the reset feature and it has been modified to pull all values at the same time. Note that I have squashed my commits together to avoid cluttering the history.

Revision history for this message
Andrew Johnson (anj) wrote :

@mdavidsaver Please re-review this merge, should it go into 3.16.2?

Revision history for this message
Andrew Johnson (anj) wrote :

Some older C compilers (MSVC, older GCC) will fail while building this code, there are several places where it declares variables after the first statement in a block. It's fine to do that in C++ code, but still not in our C code yet.

I don't understand the use of epicsAtomic functions for accessing callbackQueueSize. The queue size must be set before callbackIsInit and can't be changed after that (unless we go through callbackCleanup() but that clears callbackIsInit anyway and if callbackQueueStatus() is called then it returns an error). Did I miss something?

review: Needs Fixing
Revision history for this message
Martin Konrad (info-martin-konrad) wrote :

I tweaked the code to make pre-C99 compilers happy.

I have also removed the unneeded epicsAtomic from the callbackQueueSize variable.

Let me know if you want me to squash the commits.

Revision history for this message
Andrew Johnson (anj) wrote :

Thanks for those changes. Once you have created a Merge Request I would ask you to *never* squash commits, as doing that prevents us from seeing what additional changes you have made since we last looked at the proposal (hence my confusion on 2018-10-30). If you really want to hide the evidence of your having taken a U-turn from the final history, resubmitting it as a new Merge Request at the end of the review (or after a major rewrite) is probably the best way to handle that.

However I'm still unsure about accepting this; see my code-specific comments below. Further review from the other core developers is still welcome.

I also think these changes deserve some test code; we have test programs in src/libCom/test for both the bytes and pointer versions of the ring-buffer that could be extended, although the bytes version is single-threaded and just tests basic functionality — no inter-thread issues or tests for a spin-locked ring buffer.

Revision history for this message
Martin Konrad (info-martin-konrad) wrote :

I'm glad you asked me to write tests - that helped to I catch an issue in the code...

Do we need multi-threaded tests for this feature?

Revision history for this message
Andrew Johnson (anj) :
Revision history for this message
Martin Konrad (info-martin-konrad) wrote :

After our discussion I switched from atomics to using the existing spin lock. This should make the code a little bit easier to understand.

Revision history for this message
Andrew Johnson (anj) wrote :

Merging this, but one change I've made is to rename your QueuePrintStatus() routines and the QueueStatus iocsh commands (and their associated routines and data structures) to QueueShow for consistency with other subsystems. We normally use a Show suffix for routines that print stuff to stdout, and Status for querying whether a subsystem is happy, so your QueueStatus() API routines are fine. This is also necessary to maintain command-line compatibility with the VxWorks shell.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
diff --git a/src/ioc/db/callback.c b/src/ioc/db/callback.c
index ae07414..1b6c249 100644
--- a/src/ioc/db/callback.c
+++ b/src/ioc/db/callback.c
@@ -54,6 +54,7 @@ typedef struct cbQueueSet {
54 epicsEventId semWakeUp;54 epicsEventId semWakeUp;
55 epicsRingPointerId queue;55 epicsRingPointerId queue;
56 int queueOverflow;56 int queueOverflow;
57 int queueOverflows;
57 int shutdown;58 int shutdown;
58 int threadsConfigured;59 int threadsConfigured;
59 int threadsRunning;60 int threadsRunning;
@@ -103,6 +104,51 @@ int callbackSetQueueSize(int size)
103 return 0;104 return 0;
104}105}
105106
107int callbackQueueStatus(const int reset, callbackQueueStats *result)
108{
109 int ret;
110 if (!callbackIsInit) return -1;
111 if (result) {
112 int prio;
113 result->size = callbackQueueSize;
114 for(prio = 0; prio < NUM_CALLBACK_PRIORITIES; prio++) {
115 epicsRingPointerId qId = callbackQueue[prio].queue;
116 result->numUsed[prio] = epicsRingPointerGetUsed(qId);
117 result->maxUsed[prio] = epicsRingPointerGetHighWaterMark(qId);
118 result->numOverflow[prio] = epicsAtomicGetIntT(&callbackQueue[prio].queueOverflows);
119 }
120 ret = 0;
121 } else {
122 ret = -2;
123 }
124 if (reset) {
125 int prio;
126 for(prio = 0; prio < NUM_CALLBACK_PRIORITIES; prio++) {
127 epicsRingPointerResetHighWaterMark(callbackQueue[prio].queue);
128 }
129 }
130 return ret;
131}
132
133void callbackQueuePrintStatus(const int reset)
134{
135 callbackQueueStats stats;
136 if (callbackQueueStatus(reset, &stats) == -1) {
137 fprintf(stderr, "Callback system not initialized, yet. Please run "
138 "iocInit before using this command.\n");
139 } else {
140 int prio;
141 printf("PRIORITY HIGH-WATER MARK ITEMS IN Q Q SIZE %% USED Q OVERFLOWS\n");
142 for (prio = 0; prio < NUM_CALLBACK_PRIORITIES; prio++) {
143 double qusage = 100.0 * stats.numUsed[prio] / stats.size;
144 printf("%8s %15d %10d %6d %6.1f %11d\n",
145 threadNamePrefix[prio], stats.maxUsed[prio],
146 stats.numUsed[prio], stats.size, qusage,
147 stats.numOverflow[prio]);
148 }
149 }
150}
151
106int callbackParallelThreads(int count, const char *prio)152int callbackParallelThreads(int count, const char *prio)
107{153{
108 if (callbackIsInit) {154 if (callbackIsInit) {
@@ -290,6 +336,7 @@ int callbackRequest(CALLBACK *pcallback)
290 if (!pushOK) {336 if (!pushOK) {
291 epicsInterruptContextMessage(fullMessage[priority]);337 epicsInterruptContextMessage(fullMessage[priority]);
292 mySet->queueOverflow = TRUE;338 mySet->queueOverflow = TRUE;
339 epicsAtomicIncrIntT(&mySet->queueOverflows);
293 return S_db_bufFull;340 return S_db_bufFull;
294 }341 }
295 epicsEventSignal(mySet->semWakeUp);342 epicsEventSignal(mySet->semWakeUp);
diff --git a/src/ioc/db/callback.h b/src/ioc/db/callback.h
index fa626d1..dd13cdb 100644
--- a/src/ioc/db/callback.h
+++ b/src/ioc/db/callback.h
@@ -48,6 +48,13 @@ typedef epicsCallback CALLBACK;
4848
49typedef void (*CALLBACKFUNC)(struct callbackPvt*);49typedef void (*CALLBACKFUNC)(struct callbackPvt*);
5050
51typedef struct callbackQueueStats {
52 int size;
53 int numUsed[NUM_CALLBACK_PRIORITIES];
54 int maxUsed[NUM_CALLBACK_PRIORITIES];
55 int numOverflow[NUM_CALLBACK_PRIORITIES];
56} callbackQueueStats;
57
51#define callbackSetCallback(PFUN, PCALLBACK) \58#define callbackSetCallback(PFUN, PCALLBACK) \
52 ( (PCALLBACK)->callback = (PFUN) )59 ( (PCALLBACK)->callback = (PFUN) )
53#define callbackSetPriority(PRIORITY, PCALLBACK) \60#define callbackSetPriority(PRIORITY, PCALLBACK) \
@@ -73,6 +80,8 @@ epicsShareFunc void callbackCancelDelayed(CALLBACK *pcallback);
73epicsShareFunc void callbackRequestProcessCallbackDelayed(80epicsShareFunc void callbackRequestProcessCallbackDelayed(
74 CALLBACK *pCallback, int Priority, void *pRec, double seconds);81 CALLBACK *pCallback, int Priority, void *pRec, double seconds);
75epicsShareFunc int callbackSetQueueSize(int size);82epicsShareFunc int callbackSetQueueSize(int size);
83epicsShareFunc int callbackQueueStatus(const int reset, callbackQueueStats *result);
84void callbackQueuePrintStatus(const int reset);
76epicsShareFunc int callbackParallelThreads(int count, const char *prio);85epicsShareFunc int callbackParallelThreads(int count, const char *prio);
7786
78#ifdef __cplusplus87#ifdef __cplusplus
diff --git a/src/ioc/db/dbIocRegister.c b/src/ioc/db/dbIocRegister.c
index 4d0b88c..cecf756 100644
--- a/src/ioc/db/dbIocRegister.c
+++ b/src/ioc/db/dbIocRegister.c
@@ -296,6 +296,17 @@ static void scanOnceSetQueueSizeCallFunc(const iocshArgBuf *args)
296 scanOnceSetQueueSize(args[0].ival);296 scanOnceSetQueueSize(args[0].ival);
297}297}
298298
299/* scanOnceQueueStatus */
300static const iocshArg scanOnceQueueStatusArg0 = { "reset",iocshArgInt};
301static const iocshArg * const scanOnceQueueStatusArgs[1] =
302 {&scanOnceQueueStatusArg0};
303static const iocshFuncDef scanOnceQueueStatusFuncDef =
304 {"scanOnceQueueStatus",1,scanOnceQueueStatusArgs};
305static void scanOnceQueueStatusCallFunc(const iocshArgBuf *args)
306{
307 scanOnceQueuePrintStatus(args[0].ival);
308}
309
299/* scanppl */310/* scanppl */
300static const iocshArg scanpplArg0 = { "rate",iocshArgDouble};311static const iocshArg scanpplArg0 = { "rate",iocshArgDouble};
301static const iocshArg * const scanpplArgs[1] = {&scanpplArg0};312static const iocshArg * const scanpplArgs[1] = {&scanpplArg0};
@@ -335,6 +346,17 @@ static void callbackSetQueueSizeCallFunc(const iocshArgBuf *args)
335 callbackSetQueueSize(args[0].ival);346 callbackSetQueueSize(args[0].ival);
336}347}
337348
349/* callbackQueueStatus */
350static const iocshArg callbackQueueStatusArg0 = { "reset", iocshArgInt};
351static const iocshArg * const callbackQueueStatusArgs[1] =
352 {&callbackQueueStatusArg0};
353static const iocshFuncDef callbackQueueStatusFuncDef =
354 {"callbackQueueStatus",1,callbackQueueStatusArgs};
355static void callbackQueueStatusCallFunc(const iocshArgBuf *args)
356{
357 callbackQueuePrintStatus(args[0].ival);
358}
359
338/* callbackParallelThreads */360/* callbackParallelThreads */
339static const iocshArg callbackParallelThreadsArg0 = { "no of threads", iocshArgInt};361static const iocshArg callbackParallelThreadsArg0 = { "no of threads", iocshArgInt};
340static const iocshArg callbackParallelThreadsArg1 = { "priority", iocshArgString};362static const iocshArg callbackParallelThreadsArg1 = { "priority", iocshArgString};
@@ -441,12 +463,14 @@ void dbIocRegister(void)
441 iocshRegister(&dbLockShowLockedFuncDef,dbLockShowLockedCallFunc);463 iocshRegister(&dbLockShowLockedFuncDef,dbLockShowLockedCallFunc);
442464
443 iocshRegister(&scanOnceSetQueueSizeFuncDef,scanOnceSetQueueSizeCallFunc);465 iocshRegister(&scanOnceSetQueueSizeFuncDef,scanOnceSetQueueSizeCallFunc);
466 iocshRegister(&scanOnceQueueStatusFuncDef,scanOnceQueueStatusCallFunc);
444 iocshRegister(&scanpplFuncDef,scanpplCallFunc);467 iocshRegister(&scanpplFuncDef,scanpplCallFunc);
445 iocshRegister(&scanpelFuncDef,scanpelCallFunc);468 iocshRegister(&scanpelFuncDef,scanpelCallFunc);
446 iocshRegister(&postEventFuncDef,postEventCallFunc);469 iocshRegister(&postEventFuncDef,postEventCallFunc);
447 iocshRegister(&scanpiolFuncDef,scanpiolCallFunc);470 iocshRegister(&scanpiolFuncDef,scanpiolCallFunc);
448471
449 iocshRegister(&callbackSetQueueSizeFuncDef,callbackSetQueueSizeCallFunc);472 iocshRegister(&callbackSetQueueSizeFuncDef,callbackSetQueueSizeCallFunc);
473 iocshRegister(&callbackQueueStatusFuncDef,callbackQueueStatusCallFunc);
450 iocshRegister(&callbackParallelThreadsFuncDef,callbackParallelThreadsCallFunc);474 iocshRegister(&callbackParallelThreadsFuncDef,callbackParallelThreadsCallFunc);
451475
452 /* Needed before callback system is initialized */476 /* Needed before callback system is initialized */
diff --git a/src/ioc/db/dbScan.c b/src/ioc/db/dbScan.c
index e5c78fe..e0e14e0 100644
--- a/src/ioc/db/dbScan.c
+++ b/src/ioc/db/dbScan.c
@@ -24,6 +24,7 @@
24#include "cantProceed.h"24#include "cantProceed.h"
25#include "dbDefs.h"25#include "dbDefs.h"
26#include "ellLib.h"26#include "ellLib.h"
27#include "epicsAtomic.h"
27#include "epicsEvent.h"28#include "epicsEvent.h"
28#include "epicsMutex.h"29#include "epicsMutex.h"
29#include "epicsPrint.h"30#include "epicsPrint.h"
@@ -63,6 +64,7 @@ static volatile enum ctl scanCtl;
63static int onceQueueSize = 1000;64static int onceQueueSize = 1000;
64static epicsEventId onceSem;65static epicsEventId onceSem;
65static epicsRingBytesId onceQ;66static epicsRingBytesId onceQ;
67static int onceQOverruns = 0;
66static epicsThreadId onceTaskId;68static epicsThreadId onceTaskId;
67static void *exitOnce;69static void *exitOnce;
6870
@@ -676,6 +678,7 @@ int scanOnceCallback(struct dbCommon *precord, once_complete cb, void *usr)
676 if (!pushOK) {678 if (!pushOK) {
677 if (newOverflow) errlogPrintf("scanOnce: Ring buffer overflow\n");679 if (newOverflow) errlogPrintf("scanOnce: Ring buffer overflow\n");
678 newOverflow = FALSE;680 newOverflow = FALSE;
681 epicsAtomicIncrIntT(&onceQOverruns);
679 } else {682 } else {
680 newOverflow = TRUE;683 newOverflow = TRUE;
681 }684 }
@@ -722,6 +725,40 @@ int scanOnceSetQueueSize(int size)
722 return 0;725 return 0;
723}726}
724727
728int scanOnceQueueStatus(const int reset, scanOnceQueueStats *result)
729{
730 int ret;
731 if (!onceQ) return -1;
732 if (result) {
733 result->size = epicsRingBytesSize(onceQ) / sizeof(onceEntry);
734 result->numUsed = epicsRingBytesUsedBytes(onceQ) / sizeof(onceEntry);
735 result->maxUsed = epicsRingBytesHighWaterMark(onceQ) / sizeof(onceEntry);
736 result->numOverflow = epicsAtomicGetIntT(&onceQOverruns);
737 ret = 0;
738 } else {
739 ret = -2;
740 }
741 if (reset) {
742 epicsRingBytesResetHighWaterMark(onceQ);
743 }
744 return ret;
745}
746
747void scanOnceQueuePrintStatus(const int reset)
748{
749 scanOnceQueueStats stats;
750 if (scanOnceQueueStatus(reset, &stats) == -1) {
751 fprintf(stderr, "scanOnce system not initialized, yet. Please run "
752 "iocInit before using this command.\n");
753 } else {
754 double qusage = 100.0 * stats.numUsed / stats.size;
755 printf("PRIORITY HIGH-WATER MARK ITEMS IN Q Q SIZE %% USED Q OVERFLOWS\n");
756 printf("%8s %15d %10d %6d %6.1f %11d\n", "scanOnce", stats.maxUsed,
757 stats.numUsed, stats.size, qusage,
758 epicsAtomicGetIntT(&onceQOverruns));
759 }
760}
761
725static void initOnce(void)762static void initOnce(void)
726{763{
727 if ((onceQ = epicsRingBytesLockedCreate(sizeof(onceEntry)*onceQueueSize)) == NULL) {764 if ((onceQ = epicsRingBytesLockedCreate(sizeof(onceEntry)*onceQueueSize)) == NULL) {
diff --git a/src/ioc/db/dbScan.h b/src/ioc/db/dbScan.h
index d483a0c..830d3a8 100644
--- a/src/ioc/db/dbScan.h
+++ b/src/ioc/db/dbScan.h
@@ -42,6 +42,13 @@ struct dbCommon;
42typedef void (*io_scan_complete)(void *usr, IOSCANPVT, int prio);42typedef void (*io_scan_complete)(void *usr, IOSCANPVT, int prio);
43typedef void (*once_complete)(void *usr, struct dbCommon*);43typedef void (*once_complete)(void *usr, struct dbCommon*);
4444
45typedef struct scanOnceQueueStats {
46 int size;
47 int numUsed;
48 int maxUsed;
49 int numOverflow;
50} scanOnceQueueStats;
51
45epicsShareFunc long scanInit(void);52epicsShareFunc long scanInit(void);
46epicsShareFunc void scanRun(void);53epicsShareFunc void scanRun(void);
47epicsShareFunc void scanPause(void);54epicsShareFunc void scanPause(void);
@@ -57,6 +64,8 @@ epicsShareFunc double scanPeriod(int scan);
57epicsShareFunc int scanOnce(struct dbCommon *);64epicsShareFunc int scanOnce(struct dbCommon *);
58epicsShareFunc int scanOnceCallback(struct dbCommon *, once_complete cb, void *usr);65epicsShareFunc int scanOnceCallback(struct dbCommon *, once_complete cb, void *usr);
59epicsShareFunc int scanOnceSetQueueSize(int size);66epicsShareFunc int scanOnceSetQueueSize(int size);
67epicsShareFunc int scanOnceQueueStatus(const int reset, scanOnceQueueStats *result);
68void scanOnceQueuePrintStatus(const int reset);
6069
61/*print periodic lists*/70/*print periodic lists*/
62epicsShareFunc int scanppl(double rate);71epicsShareFunc int scanppl(double rate);
diff --git a/src/libCom/ring/epicsRingBytes.c b/src/libCom/ring/epicsRingBytes.c
index cb7e52e..ab048e4 100644
--- a/src/libCom/ring/epicsRingBytes.c
+++ b/src/libCom/ring/epicsRingBytes.c
@@ -38,6 +38,7 @@ typedef struct ringPvt {
38 volatile int nextPut;38 volatile int nextPut;
39 volatile int nextGet;39 volatile int nextGet;
40 int size;40 int size;
41 int highWaterMark;
41 volatile char buffer[1]; /* actually larger */42 volatile char buffer[1]; /* actually larger */
42}ringPvt;43}ringPvt;
4344
@@ -47,6 +48,7 @@ epicsShareFunc epicsRingBytesId epicsShareAPI epicsRingBytesCreate(int size)
47 if(!pring)48 if(!pring)
48 return NULL;49 return NULL;
49 pring->size = size + SLOP;50 pring->size = size + SLOP;
51 pring->highWaterMark = 0;
50 pring->nextGet = 0;52 pring->nextGet = 0;
51 pring->nextPut = 0;53 pring->nextPut = 0;
52 pring->lock = 0;54 pring->lock = 0;
@@ -118,7 +120,7 @@ epicsShareFunc int epicsShareAPI epicsRingBytesPut(
118{120{
119 ringPvt *pring = (ringPvt *)id;121 ringPvt *pring = (ringPvt *)id;
120 int nextGet, nextPut, size;122 int nextGet, nextPut, size;
121 int freeCount, copyCount, topCount;123 int freeCount, copyCount, topCount, used;
122124
123 if (pring->lock) epicsSpinLock(pring->lock);125 if (pring->lock) epicsSpinLock(pring->lock);
124 nextGet = pring->nextGet;126 nextGet = pring->nextGet;
@@ -131,8 +133,9 @@ epicsShareFunc int epicsShareAPI epicsRingBytesPut(
131 if (pring->lock) epicsSpinUnlock(pring->lock);133 if (pring->lock) epicsSpinUnlock(pring->lock);
132 return 0;134 return 0;
133 }135 }
134 if (nbytes)136 if (nbytes) {
135 memcpy ((void *)&pring->buffer[nextPut], value, nbytes);137 memcpy ((void *)&pring->buffer[nextPut], value, nbytes);
138 }
136 nextPut += nbytes;139 nextPut += nbytes;
137 }140 }
138 else {141 else {
@@ -143,8 +146,9 @@ epicsShareFunc int epicsShareAPI epicsRingBytesPut(
143 }146 }
144 topCount = size - nextPut;147 topCount = size - nextPut;
145 copyCount = (nbytes > topCount) ? topCount : nbytes;148 copyCount = (nbytes > topCount) ? topCount : nbytes;
146 if (copyCount)149 if (copyCount) {
147 memcpy ((void *)&pring->buffer[nextPut], value, copyCount);150 memcpy ((void *)&pring->buffer[nextPut], value, copyCount);
151 }
148 nextPut += copyCount;152 nextPut += copyCount;
149 if (nextPut == size) {153 if (nextPut == size) {
150 int nLeft = nbytes - copyCount;154 int nLeft = nbytes - copyCount;
@@ -155,6 +159,10 @@ epicsShareFunc int epicsShareAPI epicsRingBytesPut(
155 }159 }
156 pring->nextPut = nextPut;160 pring->nextPut = nextPut;
157161
162 used = nextPut - nextGet;
163 if (used < 0) used += pring->size;
164 if (used > pring->highWaterMark) pring->highWaterMark = used;
165
158 if (pring->lock) epicsSpinUnlock(pring->lock);166 if (pring->lock) epicsSpinUnlock(pring->lock);
159 return nbytes;167 return nbytes;
160}168}
@@ -224,3 +232,20 @@ epicsShareFunc int epicsShareAPI epicsRingBytesIsFull(epicsRingBytesId id)
224{232{
225 return (epicsRingBytesFreeBytes(id) <= 0);233 return (epicsRingBytesFreeBytes(id) <= 0);
226}234}
235
236epicsShareFunc int epicsShareAPI epicsRingBytesHighWaterMark(epicsRingBytesIdConst id)
237{
238 ringPvt *pring = (ringPvt *)id;
239 return pring->highWaterMark;
240}
241
242epicsShareFunc void epicsShareAPI epicsRingBytesResetHighWaterMark(epicsRingBytesId id)
243{
244 ringPvt *pring = (ringPvt *)id;
245 int used;
246 if (pring->lock) epicsSpinLock(pring->lock);
247 used = pring->nextGet - pring->nextPut;
248 if (used < 0) used += pring->size;
249 pring->highWaterMark = used;
250 if (pring->lock) epicsSpinUnlock(pring->lock);
251}
diff --git a/src/libCom/ring/epicsRingBytes.h b/src/libCom/ring/epicsRingBytes.h
index 011829b..3dc0081 100644
--- a/src/libCom/ring/epicsRingBytes.h
+++ b/src/libCom/ring/epicsRingBytes.h
@@ -24,6 +24,7 @@ extern "C" {
24#include "shareLib.h"24#include "shareLib.h"
2525
26typedef void *epicsRingBytesId;26typedef void *epicsRingBytesId;
27typedef void const *epicsRingBytesIdConst;
2728
28epicsShareFunc epicsRingBytesId epicsShareAPI epicsRingBytesCreate(int nbytes);29epicsShareFunc epicsRingBytesId epicsShareAPI epicsRingBytesCreate(int nbytes);
29/* Same, but secured by a spinlock */30/* Same, but secured by a spinlock */
@@ -39,6 +40,8 @@ epicsShareFunc int epicsShareAPI epicsRingBytesUsedBytes(epicsRingBytesId id);
39epicsShareFunc int epicsShareAPI epicsRingBytesSize(epicsRingBytesId id);40epicsShareFunc int epicsShareAPI epicsRingBytesSize(epicsRingBytesId id);
40epicsShareFunc int epicsShareAPI epicsRingBytesIsEmpty(epicsRingBytesId id);41epicsShareFunc int epicsShareAPI epicsRingBytesIsEmpty(epicsRingBytesId id);
41epicsShareFunc int epicsShareAPI epicsRingBytesIsFull(epicsRingBytesId id);42epicsShareFunc int epicsShareAPI epicsRingBytesIsFull(epicsRingBytesId id);
43epicsShareFunc int epicsShareAPI epicsRingBytesHighWaterMark(epicsRingBytesIdConst id);
44epicsShareFunc void epicsShareAPI epicsRingBytesResetHighWaterMark(epicsRingBytesId id);
4245
43#ifdef __cplusplus46#ifdef __cplusplus
44}47}
diff --git a/src/libCom/ring/epicsRingPointer.cpp b/src/libCom/ring/epicsRingPointer.cpp
index 9c144ce..709ab65 100644
--- a/src/libCom/ring/epicsRingPointer.cpp
+++ b/src/libCom/ring/epicsRingPointer.cpp
@@ -90,3 +90,15 @@ epicsShareFunc int epicsShareAPI epicsRingPointerIsFull(epicsRingPointerId id)
90 voidPointer *pvoidPointer = reinterpret_cast<voidPointer*>(id);90 voidPointer *pvoidPointer = reinterpret_cast<voidPointer*>(id);
91 return((pvoidPointer->isFull()) ? 1 : 0);91 return((pvoidPointer->isFull()) ? 1 : 0);
92}92}
93
94epicsShareFunc int epicsShareAPI epicsRingPointerGetHighWaterMark(epicsRingPointerIdConst id)
95{
96 voidPointer const *pvoidPointer = reinterpret_cast<voidPointer const*>(id);
97 return(pvoidPointer->getHighWaterMark());
98}
99
100epicsShareFunc void epicsShareAPI epicsRingPointerResetHighWaterMark(epicsRingPointerId id)
101{
102 voidPointer *pvoidPointer = reinterpret_cast<voidPointer*>(id);
103 pvoidPointer->resetHighWaterMark();
104}
diff --git a/src/libCom/ring/epicsRingPointer.h b/src/libCom/ring/epicsRingPointer.h
index 48d6203..68bf8f5 100644
--- a/src/libCom/ring/epicsRingPointer.h
+++ b/src/libCom/ring/epicsRingPointer.h
@@ -40,18 +40,22 @@ public: /* Functions */
40 int getSize() const;40 int getSize() const;
41 bool isEmpty() const;41 bool isEmpty() const;
42 bool isFull() const;42 bool isFull() const;
43 int getHighWaterMark() const;
44 void resetHighWaterMark();
4345
44private: /* Prevent compiler-generated member functions */46private: /* Prevent compiler-generated member functions */
45 /* default constructor, copy constructor, assignment operator */47 /* default constructor, copy constructor, assignment operator */
46 epicsRingPointer();48 epicsRingPointer();
47 epicsRingPointer(const epicsRingPointer &);49 epicsRingPointer(const epicsRingPointer &);
48 epicsRingPointer& operator=(const epicsRingPointer &);50 epicsRingPointer& operator=(const epicsRingPointer &);
51 int getUsedNoLock() const;
4952
50private: /* Data */53private: /* Data */
51 epicsSpinId lock;54 epicsSpinId lock;
52 volatile int nextPush;55 volatile int nextPush;
53 volatile int nextPop;56 volatile int nextPop;
54 int size;57 int size;
58 int highWaterMark;
55 T * volatile * buffer;59 T * volatile * buffer;
56};60};
5761
@@ -59,6 +63,7 @@ extern "C" {
59#endif /*__cplusplus */63#endif /*__cplusplus */
6064
61typedef void *epicsRingPointerId;65typedef void *epicsRingPointerId;
66typedef void const *epicsRingPointerIdConst;
6267
63epicsShareFunc epicsRingPointerId epicsShareAPI epicsRingPointerCreate(int size);68epicsShareFunc epicsRingPointerId epicsShareAPI epicsRingPointerCreate(int size);
64/* Same, but secured by a spinlock */69/* Same, but secured by a spinlock */
@@ -74,6 +79,8 @@ epicsShareFunc int epicsShareAPI epicsRingPointerGetUsed(epicsRingPointerId id)
74epicsShareFunc int epicsShareAPI epicsRingPointerGetSize(epicsRingPointerId id);79epicsShareFunc int epicsShareAPI epicsRingPointerGetSize(epicsRingPointerId id);
75epicsShareFunc int epicsShareAPI epicsRingPointerIsEmpty(epicsRingPointerId id);80epicsShareFunc int epicsShareAPI epicsRingPointerIsEmpty(epicsRingPointerId id);
76epicsShareFunc int epicsShareAPI epicsRingPointerIsFull(epicsRingPointerId id);81epicsShareFunc int epicsShareAPI epicsRingPointerIsFull(epicsRingPointerId id);
82epicsShareFunc int epicsShareAPI epicsRingPointerGetHighWaterMark(epicsRingPointerIdConst id);
83epicsShareFunc void epicsShareAPI epicsRingPointerResetHighWaterMark(epicsRingPointerId id);
7784
78/* This routine was incorrectly named in previous releases */85/* This routine was incorrectly named in previous releases */
79#define epicsRingPointerSize epicsRingPointerGetSize86#define epicsRingPointerSize epicsRingPointerGetSize
@@ -95,7 +102,8 @@ epicsShareFunc int epicsShareAPI epicsRingPointerIsFull(epicsRingPointerId id);
95102
96template <class T>103template <class T>
97inline epicsRingPointer<T>::epicsRingPointer(int sz, bool locked) :104inline epicsRingPointer<T>::epicsRingPointer(int sz, bool locked) :
98 lock(0), nextPush(0), nextPop(0), size(sz+1), buffer(new T* [sz+1])105 lock(0), nextPush(0), nextPop(0), size(sz+1), highWaterMark(0),
106 buffer(new T* [sz+1])
99{107{
100 if (locked)108 if (locked)
101 lock = epicsSpinCreate();109 lock = epicsSpinCreate();
@@ -121,6 +129,8 @@ inline bool epicsRingPointer<T>::push(T *p)
121 }129 }
122 buffer[next] = p;130 buffer[next] = p;
123 nextPush = newNext;131 nextPush = newNext;
132 int used = getUsedNoLock();
133 if (used > highWaterMark) highWaterMark = used;
124 if (lock) epicsSpinUnlock(lock);134 if (lock) epicsSpinUnlock(lock);
125 return(true);135 return(true);
126}136}
@@ -162,11 +172,18 @@ inline int epicsRingPointer<T>::getFree() const
162}172}
163173
164template <class T>174template <class T>
165inline int epicsRingPointer<T>::getUsed() const175inline int epicsRingPointer<T>::getUsedNoLock() const
166{176{
167 if (lock) epicsSpinLock(lock);
168 int n = nextPush - nextPop;177 int n = nextPush - nextPop;
169 if (n < 0) n += size;178 if (n < 0) n += size;
179 return n;
180}
181
182template <class T>
183inline int epicsRingPointer<T>::getUsed() const
184{
185 if (lock) epicsSpinLock(lock);
186 int n = getUsedNoLock();
170 if (lock) epicsSpinUnlock(lock);187 if (lock) epicsSpinUnlock(lock);
171 return n;188 return n;
172}189}
@@ -196,6 +213,20 @@ inline bool epicsRingPointer<T>::isFull() const
196 return((count == 0) || (count == size));213 return((count == 0) || (count == size));
197}214}
198215
216template <class T>
217inline int epicsRingPointer<T>::getHighWaterMark() const
218{
219 return highWaterMark;
220}
221
222template <class T>
223inline void epicsRingPointer<T>::resetHighWaterMark()
224{
225 if (lock) epicsSpinLock(lock);
226 highWaterMark = getUsedNoLock();
227 if (lock) epicsSpinUnlock(lock);
228}
229
199#endif /* __cplusplus */230#endif /* __cplusplus */
200231
201#endif /* INCepicsRingPointerh */232#endif /* INCepicsRingPointerh */
diff --git a/src/libCom/test/ringBytesTest.c b/src/libCom/test/ringBytesTest.c
index 6cef933..bb91d02 100644
--- a/src/libCom/test/ringBytesTest.c
+++ b/src/libCom/test/ringBytesTest.c
@@ -30,7 +30,8 @@ typedef struct info {
30 epicsRingBytesId ring;30 epicsRingBytesId ring;
31}info;31}info;
3232
33static void check(epicsRingBytesId ring, int expectedFree)33static void check(epicsRingBytesId ring, int expectedFree,
34 int expectedHighWaterMark)
34{35{
35 int expectedUsed = RINGSIZE - expectedFree;36 int expectedUsed = RINGSIZE - expectedFree;
36 int expectedEmpty = (expectedUsed == 0);37 int expectedEmpty = (expectedUsed == 0);
@@ -39,11 +40,14 @@ static void check(epicsRingBytesId ring, int expectedFree)
39 int nUsed = epicsRingBytesUsedBytes(ring);40 int nUsed = epicsRingBytesUsedBytes(ring);
40 int isEmpty = epicsRingBytesIsEmpty(ring);41 int isEmpty = epicsRingBytesIsEmpty(ring);
41 int isFull = epicsRingBytesIsFull(ring);42 int isFull = epicsRingBytesIsFull(ring);
43 int highWaterMark = epicsRingBytesHighWaterMark(ring);
42 44
43 testOk(nFree == expectedFree, "Free: %d == %d", nFree, expectedFree);45 testOk(nFree == expectedFree, "Free: %d == %d", nFree, expectedFree);
44 testOk(nUsed == expectedUsed, "Used: %d == %d", nUsed, expectedUsed);46 testOk(nUsed == expectedUsed, "Used: %d == %d", nUsed, expectedUsed);
45 testOk(isEmpty == expectedEmpty, "Empty: %d == %d", isEmpty, expectedEmpty);47 testOk(isEmpty == expectedEmpty, "Empty: %d == %d", isEmpty, expectedEmpty);
46 testOk(isFull == expectedFull, "Full: %d == %d", isFull, expectedFull);48 testOk(isFull == expectedFull, "Full: %d == %d", isFull, expectedFull);
49 testOk(highWaterMark == expectedHighWaterMark, "HighWaterMark: %d == %d",
50 highWaterMark, expectedHighWaterMark);
47}51}
48 52
49MAIN(ringBytesTest)53MAIN(ringBytesTest)
@@ -55,7 +59,7 @@ MAIN(ringBytesTest)
55 char get[RINGSIZE+1];59 char get[RINGSIZE+1];
56 epicsRingBytesId ring;60 epicsRingBytesId ring;
5761
58 testPlan(245);62 testPlan(292);
5963
60 pinfo = calloc(1,sizeof(info));64 pinfo = calloc(1,sizeof(info));
61 if (!pinfo) {65 if (!pinfo) {
@@ -70,50 +74,54 @@ MAIN(ringBytesTest)
70 if (!ring) {74 if (!ring) {
71 testAbort("epicsRingBytesCreate failed");75 testAbort("epicsRingBytesCreate failed");
72 }76 }
73 check(ring, RINGSIZE);77 check(ring, RINGSIZE, 0);
7478
75 for (i = 0 ; i < sizeof(put) ; i++)79 for (i = 0 ; i < sizeof(put) ; i++)
76 put[i] = i;80 put[i] = i;
77 for(i = 0 ; i < RINGSIZE ; i++) {81 for(i = 0 ; i < RINGSIZE ; i++) {
78 n = epicsRingBytesPut(ring, put, i);82 n = epicsRingBytesPut(ring, put, i);
79 testOk(n==i, "ring put %d", i);83 testOk(n==i, "ring put %d", i);
80 check(ring, RINGSIZE-i);84 check(ring, RINGSIZE-i, i);
81 n = epicsRingBytesGet(ring, get, i);85 n = epicsRingBytesGet(ring, get, i);
82 testOk(n==i, "ring get %d", i);86 testOk(n==i, "ring get %d", i);
83 check(ring, RINGSIZE);87 check(ring, RINGSIZE, i);
84 testOk(memcmp(put,get,i)==0, "get matches write");88 testOk(memcmp(put,get,i)==0, "get matches write");
85 }89 }
8690
91 epicsRingBytesResetHighWaterMark(ring);
92
87 for(i = 0 ; i < RINGSIZE ; i++) {93 for(i = 0 ; i < RINGSIZE ; i++) {
88 n = epicsRingBytesPut(ring, put+i, 1);94 n = epicsRingBytesPut(ring, put+i, 1);
89 testOk(n==1, "ring put 1, %d", i);95 testOk(n==1, "ring put 1, %d", i);
90 check(ring, RINGSIZE-1-i);96 check(ring, RINGSIZE-1-i, i + 1);
91 }97 }
92 n = epicsRingBytesPut(ring, put+RINGSIZE, 1);98 n = epicsRingBytesPut(ring, put+RINGSIZE, 1);
93 testOk(n==0, "put to full ring");99 testOk(n==0, "put to full ring");
94 check(ring, 0);100 check(ring, 0, RINGSIZE);
95 for(i = 0 ; i < RINGSIZE ; i++) {101 for(i = 0 ; i < RINGSIZE ; i++) {
96 n = epicsRingBytesGet(ring, get+i, 1);102 n = epicsRingBytesGet(ring, get+i, 1);
97 testOk(n==1, "ring get 1, %d", i);103 testOk(n==1, "ring get 1, %d", i);
98 check(ring, 1+i);104 check(ring, 1+i, RINGSIZE);
99 }105 }
100 testOk(memcmp(put,get,RINGSIZE)==0, "get matches write");106 testOk(memcmp(put,get,RINGSIZE)==0, "get matches write");
101 n = epicsRingBytesGet(ring, get+RINGSIZE, 1);107 n = epicsRingBytesGet(ring, get+RINGSIZE, 1);
102 testOk(n==0, "get from empty ring");108 testOk(n==0, "get from empty ring");
103 check(ring, RINGSIZE);109 check(ring, RINGSIZE, RINGSIZE);
110
111 epicsRingBytesResetHighWaterMark(ring);
104112
105 n = epicsRingBytesPut(ring, put, RINGSIZE+1);113 n = epicsRingBytesPut(ring, put, RINGSIZE+1);
106 testOk(n==0, "ring put beyond ring capacity (%d, expected 0)",n);114 testOk(n==0, "ring put beyond ring capacity (%d, expected 0)",n);
107 check(ring, RINGSIZE);115 check(ring, RINGSIZE, 0);
108 n = epicsRingBytesPut(ring, put, 1);116 n = epicsRingBytesPut(ring, put, 1);
109 testOk(n==1, "ring put %d", 1);117 testOk(n==1, "ring put %d", 1);
110 check(ring, RINGSIZE-1);118 check(ring, RINGSIZE-1, 1);
111 n = epicsRingBytesPut(ring, put, RINGSIZE);119 n = epicsRingBytesPut(ring, put, RINGSIZE);
112 testOk(n==0, "ring put beyond ring capacity (%d, expected 0)",n);120 testOk(n==0, "ring put beyond ring capacity (%d, expected 0)",n);
113 check(ring, RINGSIZE-1);121 check(ring, RINGSIZE-1, 1);
114 n = epicsRingBytesGet(ring, get, 1);122 n = epicsRingBytesGet(ring, get, 1);
115 testOk(n==1, "ring get %d", 1);123 testOk(n==1, "ring get %d", 1);
116 check(ring, RINGSIZE);124 check(ring, RINGSIZE, 1);
117125
118 epicsRingBytesDelete(ring);126 epicsRingBytesDelete(ring);
119 epicsEventDestroy(consumerEvent);127 epicsEventDestroy(consumerEvent);
diff --git a/src/libCom/test/ringPointerTest.c b/src/libCom/test/ringPointerTest.c
index 65a3494..d351708 100644
--- a/src/libCom/test/ringPointerTest.c
+++ b/src/libCom/test/ringPointerTest.c
@@ -64,6 +64,7 @@ static void testSingle(void)
64 testOk1(epicsRingPointerGetFree(ring)==rsize);64 testOk1(epicsRingPointerGetFree(ring)==rsize);
65 testOk1(epicsRingPointerGetSize(ring)==rsize);65 testOk1(epicsRingPointerGetSize(ring)==rsize);
66 testOk1(epicsRingPointerGetUsed(ring)==0);66 testOk1(epicsRingPointerGetUsed(ring)==0);
67 testOk1(epicsRingPointerGetHighWaterMark(ring)==0);
6768
68 testOk1(epicsRingPointerPop(ring)==NULL);69 testOk1(epicsRingPointerPop(ring)==NULL);
6970
@@ -75,6 +76,10 @@ static void testSingle(void)
75 testOk1(epicsRingPointerGetFree(ring)==rsize-1);76 testOk1(epicsRingPointerGetFree(ring)==rsize-1);
76 testOk1(epicsRingPointerGetSize(ring)==rsize);77 testOk1(epicsRingPointerGetSize(ring)==rsize);
77 testOk1(epicsRingPointerGetUsed(ring)==1);78 testOk1(epicsRingPointerGetUsed(ring)==1);
79 testOk1(epicsRingPointerGetHighWaterMark(ring)==1);
80
81 epicsRingPointerResetHighWaterMark(ring);
82 testOk1(epicsRingPointerGetHighWaterMark(ring)==1);
7883
79 testDiag("Fill it up");84 testDiag("Fill it up");
80 for(i=2; i<2*rsize; i++) {85 for(i=2; i<2*rsize; i++) {
@@ -92,6 +97,7 @@ static void testSingle(void)
92 testOk1(epicsRingPointerGetFree(ring)==0);97 testOk1(epicsRingPointerGetFree(ring)==0);
93 testOk1(epicsRingPointerGetSize(ring)==rsize);98 testOk1(epicsRingPointerGetSize(ring)==rsize);
94 testOk1(epicsRingPointerGetUsed(ring)==rsize);99 testOk1(epicsRingPointerGetUsed(ring)==rsize);
100 testOk1(epicsRingPointerGetHighWaterMark(ring)==rsize);
95101
96 testDiag("Drain it out");102 testDiag("Drain it out");
97 for(i=1; i<2*rsize; i++) {103 for(i=1; i<2*rsize; i++) {
@@ -108,6 +114,7 @@ static void testSingle(void)
108 testOk1(epicsRingPointerGetFree(ring)==rsize);114 testOk1(epicsRingPointerGetFree(ring)==rsize);
109 testOk1(epicsRingPointerGetSize(ring)==rsize);115 testOk1(epicsRingPointerGetSize(ring)==rsize);
110 testOk1(epicsRingPointerGetUsed(ring)==0);116 testOk1(epicsRingPointerGetUsed(ring)==0);
117 testOk1(epicsRingPointerGetHighWaterMark(ring)==rsize);
111118
112 testDiag("Fill it up again");119 testDiag("Fill it up again");
113 for(i=2; i<2*rsize; i++) {120 for(i=2; i<2*rsize; i++) {
@@ -236,7 +243,7 @@ MAIN(ringPointerTest)
236{243{
237 int prio = epicsThreadGetPrioritySelf();244 int prio = epicsThreadGetPrioritySelf();
238245
239 testPlan(37);246 testPlan(42);
240 testSingle();247 testSingle();
241 if (prio)248 if (prio)
242 epicsThreadSetPriority(epicsThreadGetIdSelf(), epicsThreadPriorityScanLow);249 epicsThreadSetPriority(epicsThreadGetIdSelf(), epicsThreadPriorityScanLow);

Subscribers

People subscribed via source and target branches