Merge lp:~gpr/linuxdcpp/sync into lp:linuxdcpp

Proposed by Gennady Proskurin on 2010-08-15
Status: Needs review
Proposed branch: lp:~gpr/linuxdcpp/sync
Merge into: lp:linuxdcpp
Diff against target: 489 lines (+171/-91)
12 files modified
dcpp/Atomic.h (+122/-0)
dcpp/BufferedSocket.cpp (+3/-3)
dcpp/BufferedSocket.h (+5/-1)
dcpp/Client.cpp (+6/-6)
dcpp/Client.h (+14/-6)
dcpp/CriticalSection.h (+7/-27)
dcpp/Pointer.h (+4/-4)
dcpp/Semaphore.h (+5/-2)
dcpp/ShareManager.cpp (+3/-3)
dcpp/ShareManager.h (+2/-1)
dcpp/Thread.cpp (+0/-4)
dcpp/Thread.h (+0/-34)
To merge this branch: bzr merge lp:~gpr/linuxdcpp/sync
Reviewer Review Type Date Requested Status
LinuxDC++ Team 2010-08-15 Pending
Review via email: mp+32714@code.launchpad.net

Description of the change

+ includes all patches submitted in corresponding bug reports
+ Thread::safeDec/Inc/Exchange functions are replaced by more fine-grained/lightweight/portable implementations
+ Thread::safeDec/Inc/Exchange functions removed (as unused)

Bug #617021: Semaphore potentially may underflow and become negative
Bug #617591: Pointer.h/intrusive_ptr_base class is too heavy-weight
Bug #617757: portable FastCriticalSection implementation
Bug #617988: atomic counters implemented

To post a comment you must log in.

Unmerged revisions

391. By Gennady Proskurin on 2010-08-15

Remove unused include <sched.h>, which was used for sched_yield() earlier.

390. By Gennady Proskurin on 2010-08-15

Remove unused Thread::safeExchange function and associated mutex.

389. By Gennady Proskurin on 2010-08-15

Use Atomic<bool> for "refreshing" variable.

388. By Gennady Proskurin on 2010-08-15

Implement exchange() function for Atomic template (only for memory_ordering_strong for now).
It assigns new value to atomic, returns old value.

387. By Gennady Proskurin on 2010-08-15

Remove Thread::safeInc/safeDec functions. They are unused now.

386. By Gennady Proskurin on 2010-08-15

Change BufferedSocket::sockets to Atomic (strong variant).

385. By Gennady Proskurin on 2010-08-15

Implement "strong memory ordering" variant of Atomic template.
For counters in "struct Counts" "weak" variant is sufficient.

384. By Gennady Proskurin on 2010-08-15

Remove unused Thread::yield functions.

383. By Gennady Proskurin on 2010-08-15

include <boost/detail/atomic_count.hpp> -> <boost/smart_ptr/detail/atomic_count.hpp>
The later header is garanteed to have necessary memory barrier for refcounting.
For now, it is no-op (it is the same header).

382. By Gennady Proskurin on 2010-08-15

Recheck predicate after wakeup of cond_wait/cond_timedwait

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== added file 'dcpp/Atomic.h'
2--- dcpp/Atomic.h 1970-01-01 00:00:00 +0000
3+++ dcpp/Atomic.h 2010-08-15 19:35:53 +0000
4@@ -0,0 +1,122 @@
5+#if !defined(DCPP_ATOMIC_H)
6+#define DCPP_ATOMIC_H
7+
8+#include "CriticalSection.h"
9+
10+#include <boost/interprocess/detail/atomic.hpp>
11+#include <boost/cstdint.hpp>
12+
13+namespace dcpp {
14+
15+// Ordering arguments:
16+
17+// memory_ordering_weak
18+// Suitable only for thread-safe accounting of some statistics.
19+// Value can not be used as "flag" (you cannot do any multi-thread action, based
20+// on this value) since it does not garantees necessary memory barriers.
21+class memory_ordering_weak {};
22+
23+// memory_ordering_strong
24+// Suitable for any multi-thread purpose
25+class memory_ordering_strong {};
26+
27+template <typename DataType, class Ordering = memory_ordering_strong>
28+class Atomic;
29+
30+
31+// uint32_t
32+template <>
33+class Atomic<boost::uint32_t, memory_ordering_weak> {
34+ typedef boost::uint32_t value_type;
35+public:
36+ Atomic(value_type val) { assign(val); }
37+ Atomic(const Atomic& other) { assign(static_cast<value_type>(other)); }
38+
39+ // operator=
40+ // return void to be safe
41+ void operator=(value_type val) { assign(val); }
42+ void operator=(const Atomic& other) {
43+ return operator=(static_cast<value_type>(other));
44+ }
45+
46+ // type cast
47+ operator value_type() const {
48+ return boost::interprocess::detail::atomic_read32(&m_value);
49+ }
50+
51+ // increment
52+ void inc() { boost::interprocess::detail::atomic_inc32(&m_value); }
53+
54+ // decrement
55+ void dec() { boost::interprocess::detail::atomic_dec32(&m_value); }
56+
57+private:
58+ mutable value_type m_value;
59+ void assign(value_type val) { boost::interprocess::detail::atomic_write32(&m_value, val); }
60+};
61+
62+// int32_t
63+// just forward all operations to underlying Atomic<uint32_t, ...> variable
64+template <>
65+class Atomic<boost::int32_t, memory_ordering_weak> {
66+ typedef boost::int32_t value_type;
67+public:
68+ Atomic(value_type val) : m_value(val) {}
69+ Atomic(const Atomic& other) : m_value(other) {}
70+
71+ void operator=(value_type val) { m_value=val; }
72+ void operator=(const Atomic& other) { m_value=other; }
73+ operator value_type() const { return static_cast<value_type>(m_value); }
74+
75+ void inc() { m_value.inc(); }
76+ void dec() { m_value.dec(); }
77+private:
78+ Atomic<boost::uint32_t,memory_ordering_weak> m_value;
79+};
80+
81+// memory_ordering_strong
82+template <typename DataType>
83+class Atomic<DataType, memory_ordering_strong> {
84+ typedef DataType value_type;
85+public:
86+ Atomic(value_type new_value) : m_value(new_value) {}
87+ Atomic(const Atomic& other) : m_value(static_cast<value_type>(other)) {}
88+
89+ void operator=(value_type new_value) {
90+ FastLock Lock(cs);
91+ m_value = new_value;
92+ }
93+ void operator=(const Atomic& other) {
94+ FastLock Lock(cs);
95+ m_value = other;
96+ }
97+ operator value_type() const {
98+ FastLock Lock(cs); // shared (read-only) lock would be sufficient here
99+ return m_value;
100+ }
101+
102+ void inc() {
103+ FastLock Lock(cs);
104+ ++m_value;
105+ }
106+ void dec() {
107+ FastLock Lock(cs);
108+ --m_value;
109+ }
110+
111+ // assign new value, return old value
112+ value_type exchange(value_type new_val) {
113+ FastLock Lock(cs);
114+ value_type old_val = m_value;
115+ m_value = new_val;
116+ return old_val;
117+ }
118+private:
119+ value_type m_value;
120+ mutable FastCriticalSection cs;
121+};
122+
123+
124+} // namespace dcpp
125+
126+#endif // !defined(DCPP_ATOMIC_H)
127
128=== modified file 'dcpp/BufferedSocket.cpp'
129--- dcpp/BufferedSocket.cpp 2009-08-15 04:40:26 +0000
130+++ dcpp/BufferedSocket.cpp 2010-08-15 19:35:53 +0000
131@@ -40,13 +40,13 @@
132 {
133 start();
134
135- Thread::safeInc(sockets);
136+ sockets.inc();
137 }
138
139-volatile long BufferedSocket::sockets = 0;
140+Atomic<long,memory_ordering_strong> BufferedSocket::sockets(0);
141
142 BufferedSocket::~BufferedSocket() throw() {
143- Thread::safeDec(sockets);
144+ sockets.dec();
145 }
146
147 void BufferedSocket::setMode (Modes aMode, size_t aRollback) {
148
149=== modified file 'dcpp/BufferedSocket.h'
150--- dcpp/BufferedSocket.h 2009-08-15 04:40:26 +0000
151+++ dcpp/BufferedSocket.h 2010-08-15 19:35:53 +0000
152@@ -26,6 +26,7 @@
153 #include "Speaker.h"
154 #include "Util.h"
155 #include "Socket.h"
156+#include "Atomic.h"
157
158 namespace dcpp {
159
160@@ -153,7 +154,10 @@
161 void threadSendData() throw(Exception);
162
163 void fail(const string& aError);
164- static volatile long sockets;
165+
166+ // For this counter we must use "strong" variant of Atomic
167+ // We do some actions after checking this value, while it changes in other threads
168+ static Atomic<long,memory_ordering_strong> sockets;
169
170 bool checkEvents() throw(Exception);
171 void checkSocket() throw(Exception);
172
173=== modified file 'dcpp/Client.cpp'
174--- dcpp/Client.cpp 2009-08-15 04:40:26 +0000
175+++ dcpp/Client.cpp 2010-08-15 19:35:53 +0000
176@@ -158,24 +158,24 @@
177 void Client::updateCounts(bool aRemove) {
178 // We always remove the count and then add the correct one if requested...
179 if(countType == COUNT_NORMAL) {
180- Thread::safeDec(counts.normal);
181+ counts.normal.dec();
182 } else if(countType == COUNT_REGISTERED) {
183- Thread::safeDec(counts.registered);
184+ counts.registered.dec();
185 } else if(countType == COUNT_OP) {
186- Thread::safeDec(counts.op);
187+ counts.op.dec();
188 }
189
190 countType = COUNT_UNCOUNTED;
191
192 if(!aRemove) {
193 if(getMyIdentity().isOp()) {
194- Thread::safeInc(counts.op);
195+ counts.op.inc();
196 countType = COUNT_OP;
197 } else if(getMyIdentity().isRegistered()) {
198- Thread::safeInc(counts.registered);
199+ counts.registered.inc();
200 countType = COUNT_REGISTERED;
201 } else {
202- Thread::safeInc(counts.normal);
203+ counts.normal.inc();
204 countType = COUNT_NORMAL;
205 }
206 }
207
208=== modified file 'dcpp/Client.h'
209--- dcpp/Client.h 2009-08-15 04:40:26 +0000
210+++ dcpp/Client.h 2010-08-15 19:35:53 +0000
211@@ -26,6 +26,7 @@
212 #include "BufferedSocketListener.h"
213 #include "TimerManager.h"
214 #include "ClientListener.h"
215+#include "Atomic.h"
216
217 namespace dcpp {
218
219@@ -72,7 +73,10 @@
220
221 static string getCounts() {
222 char buf[128];
223- return string(buf, snprintf(buf, sizeof(buf), "%ld/%ld/%ld", counts.normal, counts.registered, counts.op));
224+ return string(buf, snprintf(buf, sizeof(buf), "%ld/%ld/%ld",
225+ static_cast<long>(counts.normal),
226+ static_cast<long>(counts.registered),
227+ static_cast<long>(counts.op)));
228 }
229
230 StringMap& escapeParams(StringMap& sm) {
231@@ -112,12 +116,16 @@
232 friend class ClientManager;
233 Client(const string& hubURL, char separator, bool secure_);
234 virtual ~Client() throw();
235+
236 struct Counts {
237- Counts(long n = 0, long r = 0, long o = 0) : normal(n), registered(r), op(o) { }
238- volatile long normal;
239- volatile long registered;
240- volatile long op;
241- bool operator !=(const Counts& rhs) { return normal != rhs.normal || registered != rhs.registered || op != rhs.op; }
242+ private:
243+ typedef Atomic<boost::int32_t,memory_ordering_weak> atomic_counter_t;
244+ public:
245+ typedef boost::int32_t value_type;
246+ Counts(value_type n = 0, value_type r = 0, value_type o = 0) : normal(n), registered(r), op(o) { }
247+ atomic_counter_t normal;
248+ atomic_counter_t registered;
249+ atomic_counter_t op;
250 };
251
252 enum States {
253
254=== modified file 'dcpp/CriticalSection.h'
255--- dcpp/CriticalSection.h 2009-02-23 01:47:25 +0000
256+++ dcpp/CriticalSection.h 2010-08-15 19:35:53 +0000
257@@ -19,7 +19,8 @@
258 #if !defined(CRITICAL_SECTION_H)
259 #define CRITICAL_SECTION_H
260
261-#include "Thread.h"
262+// header-only implementation of mutex
263+#include <boost/signals2/mutex.hpp>
264
265 namespace dcpp {
266
267@@ -78,32 +79,11 @@
268 */
269 class FastCriticalSection {
270 public:
271-#ifdef _WIN32
272- FastCriticalSection() : state(0) { }
273-
274- void enter() {
275- while(Thread::safeExchange(state, 1) == 1) {
276- Thread::yield();
277- }
278- }
279- void leave() {
280- Thread::safeDec(state);
281- }
282-private:
283- volatile long state;
284-
285-#else
286- // We have to use a pthread (nonrecursive) mutex, didn't find any test_and_set on linux...
287- FastCriticalSection() {
288- static pthread_mutex_t fastmtx = PTHREAD_MUTEX_INITIALIZER;
289- mtx = fastmtx;
290- }
291- ~FastCriticalSection() { pthread_mutex_destroy(&mtx); }
292- void enter() { pthread_mutex_lock(&mtx); }
293- void leave() { pthread_mutex_unlock(&mtx); }
294-private:
295- pthread_mutex_t mtx;
296-#endif
297+ void enter() { mtx.lock(); }
298+ void leave() { mtx.unlock(); }
299+private:
300+ typedef boost::signals2::mutex mutex_t;
301+ mutex_t mtx;
302 };
303
304 template<class T>
305
306=== modified file 'dcpp/Pointer.h'
307--- dcpp/Pointer.h 2009-08-15 04:40:26 +0000
308+++ dcpp/Pointer.h 2010-08-15 19:35:53 +0000
309@@ -20,7 +20,7 @@
310 #define DCPLUSPLUS_DCPP_POINTER_H
311
312 #include <boost/intrusive_ptr.hpp>
313-#include "Thread.h"
314+#include <boost/smart_ptr/detail/atomic_count.hpp>
315
316 namespace dcpp {
317
318@@ -36,10 +36,10 @@
319 intrusive_ptr_base() throw() : ref(0) { }
320
321 private:
322- friend void intrusive_ptr_add_ref(intrusive_ptr_base* p) { Thread::safeInc(p->ref); }
323- friend void intrusive_ptr_release(intrusive_ptr_base* p) { if(Thread::safeDec(p->ref) == 0) { delete static_cast<T*>(p); } }
324+ friend void intrusive_ptr_add_ref(intrusive_ptr_base* p) { ++p->ref; }
325+ friend void intrusive_ptr_release(intrusive_ptr_base* p) { if(--p->ref == 0) { delete static_cast<T*>(p); } }
326
327- volatile long ref;
328+ boost::detail::atomic_count ref;
329 };
330
331
332
333=== modified file 'dcpp/Semaphore.h'
334--- dcpp/Semaphore.h 2009-02-23 01:47:25 +0000
335+++ dcpp/Semaphore.h 2010-08-15 19:35:53 +0000
336@@ -59,7 +59,7 @@
337
338 bool wait() throw() {
339 Lock l(cs);
340- if(count == 0) {
341+ while (count == 0) {
342 pthread_cond_wait(&cond, &cs.getMutex());
343 }
344 count--;
345@@ -74,7 +74,10 @@
346 millis+=timev.tv_usec/1000;
347 t.tv_sec = timev.tv_sec + (millis/1000);
348 t.tv_nsec = (millis%1000)*1000*1000;
349- int ret = pthread_cond_timedwait(&cond, &cs.getMutex(), &t);
350+ int ret;
351+ do {
352+ ret = pthread_cond_timedwait(&cond, &cs.getMutex(), &t);
353+ } while (ret==0 && count==0);
354 if(ret != 0) {
355 return false;
356 }
357
358=== modified file 'dcpp/ShareManager.cpp'
359--- dcpp/ShareManager.cpp 2009-12-27 22:03:53 +0000
360+++ dcpp/ShareManager.cpp 2010-08-15 19:35:53 +0000
361@@ -52,7 +52,7 @@
362 namespace dcpp {
363
364 ShareManager::ShareManager() : hits(0), xmlListLen(0), bzXmlListLen(0),
365- xmlDirty(true), refreshDirs(false), update(false), initial(true), listN(0), refreshing(0),
366+ xmlDirty(true), refreshDirs(false), update(false), initial(true), listN(0), refreshing(false),
367 lastXmlUpdate(0), lastFullUpdate(GET_TICK()), bloom(1<<20)
368 {
369 SettingsManager::getInstance()->addListener(this);
370@@ -812,7 +812,7 @@
371 }
372
373 void ShareManager::refresh(bool dirs /* = false */, bool aUpdate /* = true */, bool block /* = false */) throw() {
374- if(Thread::safeExchange(refreshing, 1) == 1) {
375+ if(refreshing.exchange(true) == true) {
376 LogManager::getInstance()->message(_("File list refresh in progress, please wait for it to finish before trying to refresh again"));
377 return;
378 }
379@@ -883,7 +883,7 @@
380 if(update) {
381 ClientManager::getInstance()->infoUpdated();
382 }
383- refreshing = 0;
384+ refreshing = false;
385 return 0;
386 }
387
388
389=== modified file 'dcpp/ShareManager.h'
390--- dcpp/ShareManager.h 2009-12-27 22:03:53 +0000
391+++ dcpp/ShareManager.h 2010-08-15 19:35:53 +0000
392@@ -33,6 +33,7 @@
393 #include "FastAlloc.h"
394 #include "MerkleTree.h"
395 #include "Pointer.h"
396+#include "Atomic.h"
397
398 namespace dcpp {
399
400@@ -249,7 +250,7 @@
401
402 int listN;
403
404- volatile long refreshing;
405+ Atomic<bool,memory_ordering_strong> refreshing;
406
407 uint64_t lastXmlUpdate;
408 uint64_t lastFullUpdate;
409
410=== modified file 'dcpp/Thread.cpp'
411--- dcpp/Thread.cpp 2009-02-23 01:47:25 +0000
412+++ dcpp/Thread.cpp 2010-08-15 19:35:53 +0000
413@@ -23,10 +23,6 @@
414
415 namespace dcpp {
416
417-#ifndef _WIN32
418-pthread_mutex_t Thread::mtx = PTHREAD_MUTEX_INITIALIZER;
419-#endif
420-
421 #ifdef _WIN32
422 void Thread::start() throw(ThreadException) {
423 join();
424
425=== modified file 'dcpp/Thread.h'
426--- dcpp/Thread.h 2009-03-01 05:27:41 +0000
427+++ dcpp/Thread.h 2010-08-15 19:35:53 +0000
428@@ -21,7 +21,6 @@
429
430 #ifndef _WIN32
431 #include <pthread.h>
432-#include <sched.h>
433 #include <sys/resource.h>
434 #endif
435
436@@ -62,18 +61,6 @@
437 void setThreadPriority(Priority p) throw() { ::SetThreadPriority(threadHandle, p); }
438
439 static void sleep(uint32_t millis) { ::Sleep(millis); }
440- static void yield() { ::Sleep(1); }
441-
442-#ifdef __MINGW32__
443- static long safeInc(volatile long& v) { return InterlockedIncrement((long*)&v); }
444- static long safeDec(volatile long& v) { return InterlockedDecrement((long*)&v); }
445- static long safeExchange(volatile long& target, long value) { return InterlockedExchange((long*)&target, value); }
446-
447-#else
448- static long safeInc(volatile long& v) { return InterlockedIncrement(&v); }
449- static long safeDec(volatile long& v) { return InterlockedDecrement(&v); }
450- static long safeExchange(volatile long& target, long value) { return InterlockedExchange(&target, value); }
451-#endif
452
453 #else
454
455@@ -99,26 +86,6 @@
456
457 void setThreadPriority(Priority p) { setpriority(PRIO_PROCESS, 0, p); }
458 static void sleep(uint32_t millis) { ::usleep(millis*1000); }
459- static void yield() { ::sched_yield(); }
460- static long safeInc(volatile long& v) {
461- pthread_mutex_lock(&mtx);
462- long ret = ++v;
463- pthread_mutex_unlock(&mtx);
464- return ret;
465- }
466- static long safeDec(volatile long& v) {
467- pthread_mutex_lock(&mtx);
468- long ret = --v;
469- pthread_mutex_unlock(&mtx);
470- return ret;
471- }
472- static long safeExchange(volatile long& target, long value) {
473- pthread_mutex_lock(&mtx);
474- long ret = target;
475- target = value;
476- pthread_mutex_unlock(&mtx);
477- return ret;
478- }
479 #endif
480
481 protected:
482@@ -133,7 +100,6 @@
483 return 0;
484 }
485 #else
486- static pthread_mutex_t mtx;
487 pthread_t threadHandle;
488 static void* starter(void* p) {
489 Thread* t = (Thread*)p;

Subscribers

People subscribed via source and target branches

to status/vote changes: