Merge lp:~epics-core/epics-base/cahuge into lp:~epics-core/epics-base/3.16

Proposed by mdavidsaver
Status: Rejected
Rejected by: mdavidsaver
Proposed branch: lp:~epics-core/epics-base/cahuge
Merge into: lp:~epics-core/epics-base/3.16
Diff against target: 418 lines (+156/-68)
10 files modified
src/ca/client/tcpiiu.cpp (+48/-16)
src/ca/legacy/pcas/generic/casStrmClient.cc (+1/-1)
src/ca/legacy/pcas/generic/clientBufMemoryManager.cpp (+0/-5)
src/ca/legacy/pcas/generic/clientBufMemoryManager.h (+1/-1)
src/ca/legacy/pcas/generic/inBuf.cc (+13/-5)
src/ca/legacy/pcas/generic/inBuf.h (+2/-2)
src/ca/legacy/pcas/generic/outBuf.cc (+11/-5)
src/ca/legacy/pcas/generic/outBuf.h (+1/-1)
src/ioc/rsrv/caservertask.c (+78/-31)
src/ioc/rsrv/server.h (+1/-1)
To merge this branch: bzr merge lp:~epics-core/epics-base/cahuge
Reviewer Review Type Date Requested Status
mdavidsaver superseded Disapprove
Review via email: mp+319759@code.launchpad.net

Description of the change

Relaxation of EPICS_CA_MAX_ARRAY_BYTES. Instead of failing, attempt to handle large messages with a single allocation; if realloc() fails, then fail as before. These large allocations aren't pooled, and are free()'d when a client disconnects.

What I've seen (eg. @ nsls2 and frib) is that users always set EPICS_CA_MAX_ARRAY_BYTES to be larger than the largest possible array in a facility (typically a camera image). This leads to inefficient use of buffers as only a small number of PVs over 16KB are actually this large. (as CAJ does this as well, I've wondered how much this behavior contributes to cs-studio's reputation as a memory hog)

This change would allow EPICS_CA_MAX_ARRAY_BYTES to be used as an optimization based on the most common array size.

I've left a hook (the NO_HUGE macro) to disable this in libca and RSRV based on some condition.

To post a comment you must log in.
Revision history for this message
Andrew Johnson (anj) wrote :

F2F meeting 3/14/2017: Change to use a run-time switch instead of NO_HUGE, default setting configurable by target, new behaviour: Don’t use free-list, always allocate large buffers on demand and release when client disconnects.

Revision history for this message
mdavidsaver (mdavidsaver) wrote :
review: Disapprove (superseded)

Unmerged revisions

12748. By mdavidsaver

rsrv: support larger than max. array bytes

12747. By mdavidsaver

pcas: support larger than max array bytes

clientBufMemoryManager already supports allocations
larger than max array bytes, adjust callers inBuf/outBuf
to actually request larger allocations.

12746. By mdavidsaver

ca: support alloc larger than max array bytes

automatically try to allocate a custom buffer
when a message larger than ca max array bytes
is encountered.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'src/ca/client/tcpiiu.cpp'
2--- src/ca/client/tcpiiu.cpp 2016-10-20 20:32:06 +0000
3+++ src/ca/client/tcpiiu.cpp 2017-03-13 23:51:22 +0000
4@@ -26,6 +26,9 @@
5
6 #include <stdexcept>
7 #include <string>
8+
9+#include <stdlib.h>
10+
11 #include "errlog.h"
12
13 #define epicsExportSharedSymbols
14@@ -1023,12 +1026,15 @@
15
16 // free message body cache
17 if ( this->pCurData ) {
18- if ( this->curDataMax == MAX_TCP ) {
19+ if ( this->curDataMax <= MAX_TCP ) {
20 this->cacRef.releaseSmallBufferTCP ( this->pCurData );
21 }
22- else {
23+ else if ( this->curDataMax <= cacRef.largeBufferSizeTCP()) {
24 this->cacRef.releaseLargeBufferTCP ( this->pCurData );
25 }
26+ else {
27+ free ( this->pCurData );
28+ }
29 }
30 }
31
32@@ -1197,18 +1203,44 @@
33 // make sure we have a large enough message body cache
34 //
35 if ( this->curMsg.m_postsize > this->curDataMax ) {
36- if ( this->curDataMax == MAX_TCP &&
37- this->cacRef.largeBufferSizeTCP() >= this->curMsg.m_postsize ) {
38- char * pBuf = this->cacRef.allocateLargeBufferTCP ();
39- if ( pBuf ) {
40+ assert (this->curMsg.m_postsize > MAX_TCP);
41+
42+ char * newbuf = NULL;
43+ arrayElementCount newsize;
44+
45+ if ( this->curMsg.m_postsize <= this->cacRef.largeBufferSizeTCP() ) {
46+ newbuf = this->cacRef.allocateLargeBufferTCP ();
47+ newsize = this->cacRef.largeBufferSizeTCP();
48+ }
49+#ifndef NO_HUGE
50+ else {
51+ // round size up to multiple of 4K
52+ newsize = ((this->curMsg.m_postsize-1)|0xfff)+1;
53+
54+ if (this->curDataMax > this->cacRef.largeBufferSizeTCP()) {
55+ // expand existing huge buffer
56+ newbuf = (char*)realloc(this->pCurData, newsize);
57+ } else {
58+ // trade up from small or large
59+ newbuf = (char*)malloc(newsize);
60+ }
61+ }
62+#endif
63+
64+ if ( newbuf) {
65+ if (this->curDataMax <= MAX_TCP) {
66 this->cacRef.releaseSmallBufferTCP ( this->pCurData );
67- this->pCurData = pBuf;
68- this->curDataMax = this->cacRef.largeBufferSizeTCP ();
69- }
70- else {
71- this->printFormated ( mgr.cbGuard,
72- "CAC: not enough memory for message body cache (ignoring response message)\n");
73- }
74+ } else if (this->curDataMax <= this->cacRef.largeBufferSizeTCP()) {
75+ this->cacRef.releaseLargeBufferTCP ( this->pCurData );
76+ } else {
77+ // called realloc()
78+ }
79+ this->pCurData = newbuf;
80+ this->curDataMax = newsize;
81+
82+ } else {
83+ this->printFormated ( mgr.cbGuard,
84+ "CAC: not enough memory for message body cache (ignoring response message)\n");
85 }
86 }
87
88@@ -1426,7 +1458,7 @@
89 }
90 arrayElementCount maxBytes;
91 if ( CA_V49 ( this->minorProtocolVersion ) ) {
92- maxBytes = this->cacRef.largeBufferSizeTCP ();
93+ maxBytes = 0xfffffff0;
94 }
95 else {
96 maxBytes = MAX_TCP;
97@@ -1537,7 +1569,7 @@
98 guard, CA_V413(this->minorProtocolVersion) );
99 arrayElementCount maxBytes;
100 if ( CA_V49 ( this->minorProtocolVersion ) ) {
101- maxBytes = this->cacRef.largeBufferSizeTCP ();
102+ maxBytes = 0xfffffff0;
103 }
104 else {
105 maxBytes = MAX_TCP;
106@@ -1584,7 +1616,7 @@
107 guard, CA_V413(this->minorProtocolVersion) );
108 arrayElementCount maxBytes;
109 if ( CA_V49 ( this->minorProtocolVersion ) ) {
110- maxBytes = this->cacRef.largeBufferSizeTCP ();
111+ maxBytes = 0xfffffff0;
112 }
113 else {
114 maxBytes = MAX_TCP;
115
116=== modified file 'src/ca/legacy/pcas/generic/casStrmClient.cc'
117--- src/ca/legacy/pcas/generic/casStrmClient.cc 2016-12-07 22:36:58 +0000
118+++ src/ca/legacy/pcas/generic/casStrmClient.cc 2017-03-13 23:51:22 +0000
119@@ -205,7 +205,7 @@
120 if ( bytesLeft < msgSize ) {
121 status = S_cas_success;
122 if ( msgSize > this->in.bufferSize() ) {
123- this->in.expandBuffer ();
124+ this->in.expandBuffer (msgSize);
125 // msg to large - set up message drain
126 if ( msgSize > this->in.bufferSize() ) {
127 caServerI::dumpMsg ( this->pHostName, this->pUserName, & msgTmp, 0,
128
129=== modified file 'src/ca/legacy/pcas/generic/clientBufMemoryManager.cpp'
130--- src/ca/legacy/pcas/generic/clientBufMemoryManager.cpp 2016-05-22 03:43:09 +0000
131+++ src/ca/legacy/pcas/generic/clientBufMemoryManager.cpp 2017-03-13 23:51:22 +0000
132@@ -16,11 +16,6 @@
133 #define epicsExportSharedSymbols
134 #include "clientBufMemoryManager.h"
135
136-bufSizeT clientBufMemoryManager::maxSize () const
137-{
138- return bufferFactory.largeBufferSize ();
139-}
140-
141 casBufferParm clientBufMemoryManager::allocate ( bufSizeT newMinSize )
142 {
143 casBufferParm parm;
144
145=== modified file 'src/ca/legacy/pcas/generic/clientBufMemoryManager.h'
146--- src/ca/legacy/pcas/generic/clientBufMemoryManager.h 2003-02-12 19:06:15 +0000
147+++ src/ca/legacy/pcas/generic/clientBufMemoryManager.h 2017-03-13 23:51:22 +0000
148@@ -39,9 +39,9 @@
149
150 class clientBufMemoryManager {
151 public:
152+ //! @throws std::bad_alloc on failure
153 casBufferParm allocate ( bufSizeT newMinSize );
154 void release ( char * pBuf, bufSizeT bufSize );
155- bufSizeT maxSize () const;
156 private:
157 casBufferFactory bufferFactory;
158 };
159
160=== modified file 'src/ca/legacy/pcas/generic/inBuf.cc'
161--- src/ca/legacy/pcas/generic/inBuf.cc 2016-05-22 12:38:18 +0000
162+++ src/ca/legacy/pcas/generic/inBuf.cc 2017-03-13 23:51:22 +0000
163@@ -13,6 +13,8 @@
164 * 505 665 1831
165 */
166
167+#include <stdexcept>
168+
169 #include <stdio.h>
170 #include <string.h>
171
172@@ -155,11 +157,17 @@
173 }
174 }
175
176-void inBuf::expandBuffer ()
177+void inBuf::expandBuffer (bufSizeT needed)
178 {
179- bufSizeT max = this->memMgr.maxSize();
180- if ( this->bufSize < max ) {
181- casBufferParm bufParm = this->memMgr.allocate ( max );
182+ if (needed > bufSize) {
183+ casBufferParm bufParm;
184+ try {
185+ bufParm = this->memMgr.allocate ( needed );
186+ } catch (std::bad_alloc& e) {
187+ // caller must check that buffer size has expended
188+ return;
189+ }
190+
191 bufSizeT unprocessedBytes = this->bytesPresent ();
192 memcpy ( bufParm.pBuf, &this->pBuf[this->nextReadIndex], unprocessedBytes );
193 this->bytesInBuffer = unprocessedBytes;
194@@ -170,7 +178,7 @@
195 }
196 }
197
198-unsigned inBuf::bufferSize () const
199+bufSizeT inBuf::bufferSize() const
200 {
201 return this->bufSize;
202 }
203
204=== modified file 'src/ca/legacy/pcas/generic/inBuf.h'
205--- src/ca/legacy/pcas/generic/inBuf.h 2012-04-12 16:13:50 +0000
206+++ src/ca/legacy/pcas/generic/inBuf.h 2017-03-13 23:51:22 +0000
207@@ -82,8 +82,8 @@
208 //
209 const inBufCtx pushCtx ( bufSizeT headerSize, bufSizeT bodySize );
210 bufSizeT popCtx ( const inBufCtx & ); // returns actual size
211- unsigned bufferSize () const;
212- void expandBuffer ();
213+ bufSizeT bufferSize () const;
214+ void expandBuffer (bufSizeT needed);
215 private:
216 class inBufClient & client;
217 class clientBufMemoryManager & memMgr;
218
219=== modified file 'src/ca/legacy/pcas/generic/outBuf.cc'
220--- src/ca/legacy/pcas/generic/outBuf.cc 2016-05-22 12:38:18 +0000
221+++ src/ca/legacy/pcas/generic/outBuf.cc 2017-03-13 23:51:22 +0000
222@@ -59,7 +59,7 @@
223 msgsize = CA_MESSAGE_ALIGN ( msgsize );
224
225 if ( msgsize > this->bufSize ) {
226- this->expandBuffer ();
227+ this->expandBuffer (msgsize);
228 if ( msgsize > this->bufSize ) {
229 return S_cas_hugeRequest;
230 }
231@@ -316,11 +316,17 @@
232 }
233 }
234
235-void outBuf::expandBuffer ()
236+void outBuf::expandBuffer (bufSizeT needed)
237 {
238- bufSizeT max = this->memMgr.maxSize();
239- if ( this->bufSize < max ) {
240- casBufferParm bufParm = this->memMgr.allocate ( max );
241+ if (needed > bufSize) {
242+ casBufferParm bufParm;
243+ try {
244+ bufParm = this->memMgr.allocate ( needed );
245+ } catch (std::bad_alloc& e) {
246+ // caller must check that buffer size has expended
247+ return;
248+ }
249+
250 memcpy ( bufParm.pBuf, this->pBuf, this->stack );
251 this->memMgr.release ( this->pBuf, this->bufSize );
252 this->pBuf = bufParm.pBuf;
253
254=== modified file 'src/ca/legacy/pcas/generic/outBuf.h'
255--- src/ca/legacy/pcas/generic/outBuf.h 2012-04-12 16:13:50 +0000
256+++ src/ca/legacy/pcas/generic/outBuf.h 2017-03-13 23:51:22 +0000
257@@ -122,7 +122,7 @@
258 bufSizeT stack;
259 unsigned ctxRecursCount;
260
261- void expandBuffer ();
262+ void expandBuffer (bufSizeT needed);
263
264 outBuf ( const outBuf & );
265 outBuf & operator = ( const outBuf & );
266
267=== modified file 'src/ioc/rsrv/caservertask.c'
268--- src/ioc/rsrv/caservertask.c 2017-01-23 23:20:51 +0000
269+++ src/ioc/rsrv/caservertask.c 2017-03-13 23:51:22 +0000
270@@ -1060,6 +1060,9 @@
271 else if ( client->send.type == mbtLargeTCP ) {
272 freeListFree ( rsrvLargeBufFreeListTCP, client->send.buf );
273 }
274+ else if (client->send.type == mbtHugeTCP ) {
275+ free ( client->send.buf );
276+ }
277 else {
278 errlogPrintf ( "CAS: Corrupt send buffer free list type code=%u during client cleanup?\n",
279 client->send.type );
280@@ -1072,6 +1075,9 @@
281 else if ( client->recv.type == mbtLargeTCP ) {
282 freeListFree ( rsrvLargeBufFreeListTCP, client->recv.buf );
283 }
284+ else if (client->recv.type == mbtHugeTCP ) {
285+ free ( client->recv.buf );
286+ }
287 else {
288 errlogPrintf ( "CAS: Corrupt recv buffer free list type code=%u during client cleanup?\n",
289 client->send.type );
290@@ -1301,43 +1307,84 @@
291 taskwdInsert ( pClient->tid, NULL, NULL );
292 }
293
294+static
295+void casExpandBuffer ( struct message_buffer *buf, ca_uint32_t size, int sendbuf )
296+{
297+ char *newbuf = NULL;
298+ unsigned newsize;
299+ enum messageBufferType newtype;
300+
301+ assert (size > MAX_TCP);
302+
303+ if ( size <= buf->maxstk || buf->type == mbtUDP ) return;
304+
305+ /* try to alloc new buffer */
306+ if (size <= MAX_TCP) {
307+ return; /* shouldn't happen */
308+ } else if (size <= rsrvSizeofLargeBufTCP) {
309+ newbuf = freeListCalloc ( rsrvLargeBufFreeListTCP );
310+ newsize = rsrvSizeofLargeBufTCP;
311+ newtype = mbtLargeTCP;
312+ }
313+#ifndef NO_HUGE
314+ else {
315+ size = ((size-1)|0xfff)+1;
316+
317+ if (buf->type==mbtHugeTCP)
318+ newbuf = realloc (buf->buf, size);
319+ else
320+ newbuf = malloc (size);
321+ newtype = mbtHugeTCP;
322+ newsize = size;
323+ }
324+#endif
325+
326+ if (newbuf) {
327+ /* copy existing buffer */
328+ if (sendbuf) {
329+ /* send buffer uses [0, stk) */
330+ if (buf->type==mbtHugeTCP) {
331+ /* realloc already copied */
332+ } else {
333+ memcpy ( newbuf, buf->buf, buf->stk );
334+ }
335+ } else {
336+ /* recv buffer uses [stk, cnt) */
337+ unsigned used;
338+ assert ( buf->cnt >= buf->stk );
339+ used = buf->cnt - buf->stk;
340+
341+ /* buf->buf may be the same as newbuf if realloc() used */
342+ memmove ( newbuf, &buf->buf[buf->stk], used );
343+
344+ buf->cnt = used;
345+ buf->stk = 0;
346+
347+ }
348+
349+ /* free existing buffer */
350+ if(buf->type==mbtSmallTCP) {
351+ freeListFree ( rsrvSmallBufFreeListTCP, buf->buf );
352+ } else if(buf->type==mbtLargeTCP) {
353+ freeListFree ( rsrvLargeBufFreeListTCP, buf->buf );
354+ } else {
355+ /* realloc() already free()'d if necessary */
356+ }
357+
358+ buf->buf = newbuf;
359+ buf->type = newtype;
360+ buf->maxstk = newsize;
361+ }
362+}
363+
364 void casExpandSendBuffer ( struct client *pClient, ca_uint32_t size )
365 {
366- if ( pClient->send.type == mbtSmallTCP && rsrvSizeofLargeBufTCP > MAX_TCP
367- && size <= rsrvSizeofLargeBufTCP ) {
368- int spaceAvailOnFreeList = freeListItemsAvail ( rsrvLargeBufFreeListTCP ) > 0;
369- if ( osiSufficentSpaceInPool(rsrvSizeofLargeBufTCP) || spaceAvailOnFreeList ) {
370- char *pNewBuf = ( char * ) freeListCalloc ( rsrvLargeBufFreeListTCP );
371- if ( pNewBuf ) {
372- memcpy ( pNewBuf, pClient->send.buf, pClient->send.stk );
373- freeListFree ( rsrvSmallBufFreeListTCP, pClient->send.buf );
374- pClient->send.buf = pNewBuf;
375- pClient->send.maxstk = rsrvSizeofLargeBufTCP;
376- pClient->send.type = mbtLargeTCP;
377- }
378- }
379- }
380+ casExpandBuffer (&pClient->send, size, 1);
381 }
382
383 void casExpandRecvBuffer ( struct client *pClient, ca_uint32_t size )
384 {
385- if ( pClient->recv.type == mbtSmallTCP && rsrvSizeofLargeBufTCP > MAX_TCP
386- && size <= rsrvSizeofLargeBufTCP) {
387- int spaceAvailOnFreeList = freeListItemsAvail ( rsrvLargeBufFreeListTCP ) > 0;
388- if ( osiSufficentSpaceInPool(rsrvSizeofLargeBufTCP) || spaceAvailOnFreeList ) {
389- char *pNewBuf = ( char * ) freeListCalloc ( rsrvLargeBufFreeListTCP );
390- if ( pNewBuf ) {
391- assert ( pClient->recv.cnt >= pClient->recv.stk );
392- memcpy ( pNewBuf, &pClient->recv.buf[pClient->recv.stk], pClient->recv.cnt - pClient->recv.stk );
393- freeListFree ( rsrvSmallBufFreeListTCP, pClient->recv.buf );
394- pClient->recv.buf = pNewBuf;
395- pClient->recv.cnt = pClient->recv.cnt - pClient->recv.stk;
396- pClient->recv.stk = 0u;
397- pClient->recv.maxstk = rsrvSizeofLargeBufTCP;
398- pClient->recv.type = mbtLargeTCP;
399- }
400- }
401- }
402+ casExpandBuffer (&pClient->recv, size, 0);
403 }
404
405 /*
406
407=== modified file 'src/ioc/rsrv/server.h'
408--- src/ioc/rsrv/server.h 2016-05-13 18:12:08 +0000
409+++ src/ioc/rsrv/server.h 2017-03-13 23:51:22 +0000
410@@ -60,7 +60,7 @@
411 * Eight-byte alignment is required by the Sparc 5 and other RISC
412 * processors.
413 */
414-enum messageBufferType { mbtUDP, mbtSmallTCP, mbtLargeTCP };
415+enum messageBufferType { mbtUDP, mbtSmallTCP, mbtLargeTCP, mbtHugeTCP };
416 struct message_buffer {
417 char *buf;
418 unsigned stk;

Subscribers

People subscribed via source and target branches