Merge lp:~gpr/linuxdcpp/sync into lp:linuxdcpp

Proposed by Gennady Proskurin
Status: Needs review
Proposed branch: lp:~gpr/linuxdcpp/sync
Merge into: lp:linuxdcpp
Diff against target: 489 lines (+171/-91)
12 files modified
dcpp/Atomic.h (+122/-0)
dcpp/BufferedSocket.cpp (+3/-3)
dcpp/BufferedSocket.h (+5/-1)
dcpp/Client.cpp (+6/-6)
dcpp/Client.h (+14/-6)
dcpp/CriticalSection.h (+7/-27)
dcpp/Pointer.h (+4/-4)
dcpp/Semaphore.h (+5/-2)
dcpp/ShareManager.cpp (+3/-3)
dcpp/ShareManager.h (+2/-1)
dcpp/Thread.cpp (+0/-4)
dcpp/Thread.h (+0/-34)
To merge this branch: bzr merge lp:~gpr/linuxdcpp/sync
Reviewer Review Type Date Requested Status
LinuxDC++ Team Pending
Review via email: mp+32714@code.launchpad.net

Description of the change

+ includes all patches submitted in corresponding bug reports
+ Thread::safeDec/Inc/Exchange functions are replaced by more fine-grained/lightweight/portable implementations
+ Thread::safeDec/Inc/Exchange functions removed (as unused)

Bug #617021: Semaphore potentially may underflow and become negative
Bug #617591: Pointer.h/intrusive_ptr_base class is too heavy-weight
Bug #617757: portable FastCriticalSection implementation
Bug #617988: atomic counters implemented

To post a comment you must log in.

Unmerged revisions

391. By Gennady Proskurin

Remove unused include <sched.h>, which was used for sched_yield() earlier.

390. By Gennady Proskurin

Remove unused Thread::safeExchange function and associated mutex.

389. By Gennady Proskurin

Use Atomic<bool> for "refreshing" variable.

388. By Gennady Proskurin

Implement exchange() function for Atomic template (only for memory_ordering_strong for now).
It assigns new value to atomic, returns old value.

387. By Gennady Proskurin

Remove Thread::safeInc/safeDec functions. They are unused now.

386. By Gennady Proskurin

Change BufferedSocket::sockets to Atomic (strong variant).

385. By Gennady Proskurin

Implement "strong memory ordering" variant of Atomic template.
For counters in "struct Counts" "weak" variant is sufficient.

384. By Gennady Proskurin

Remove unused Thread::yield functions.

383. By Gennady Proskurin

include <boost/detail/atomic_count.hpp> -> <boost/smart_ptr/detail/atomic_count.hpp>
The later header is garanteed to have necessary memory barrier for refcounting.
For now, it is no-op (it is the same header).

382. By Gennady Proskurin

Recheck predicate after wakeup of cond_wait/cond_timedwait

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== added file 'dcpp/Atomic.h'
2--- dcpp/Atomic.h 1970-01-01 00:00:00 +0000
3+++ dcpp/Atomic.h 2010-08-15 19:35:53 +0000
4@@ -0,0 +1,122 @@
5+#if !defined(DCPP_ATOMIC_H)
6+#define DCPP_ATOMIC_H
7+
8+#include "CriticalSection.h"
9+
10+#include <boost/interprocess/detail/atomic.hpp>
11+#include <boost/cstdint.hpp>
12+
13+namespace dcpp {
14+
15+// Ordering arguments:
16+
17+// memory_ordering_weak
18+// Suitable only for thread-safe accounting of some statistics.
19+// Value can not be used as "flag" (you cannot do any multi-thread action, based
20+// on this value) since it does not garantees necessary memory barriers.
21+class memory_ordering_weak {};
22+
23+// memory_ordering_strong
24+// Suitable for any multi-thread purpose
25+class memory_ordering_strong {};
26+
27+template <typename DataType, class Ordering = memory_ordering_strong>
28+class Atomic;
29+
30+
31+// uint32_t
32+template <>
33+class Atomic<boost::uint32_t, memory_ordering_weak> {
34+ typedef boost::uint32_t value_type;
35+public:
36+ Atomic(value_type val) { assign(val); }
37+ Atomic(const Atomic& other) { assign(static_cast<value_type>(other)); }
38+
39+ // operator=
40+ // return void to be safe
41+ void operator=(value_type val) { assign(val); }
42+ void operator=(const Atomic& other) {
43+ return operator=(static_cast<value_type>(other));
44+ }
45+
46+ // type cast
47+ operator value_type() const {
48+ return boost::interprocess::detail::atomic_read32(&m_value);
49+ }
50+
51+ // increment
52+ void inc() { boost::interprocess::detail::atomic_inc32(&m_value); }
53+
54+ // decrement
55+ void dec() { boost::interprocess::detail::atomic_dec32(&m_value); }
56+
57+private:
58+ mutable value_type m_value;
59+ void assign(value_type val) { boost::interprocess::detail::atomic_write32(&m_value, val); }
60+};
61+
62+// int32_t
63+// just forward all operations to underlying Atomic<uint32_t, ...> variable
64+template <>
65+class Atomic<boost::int32_t, memory_ordering_weak> {
66+ typedef boost::int32_t value_type;
67+public:
68+ Atomic(value_type val) : m_value(val) {}
69+ Atomic(const Atomic& other) : m_value(other) {}
70+
71+ void operator=(value_type val) { m_value=val; }
72+ void operator=(const Atomic& other) { m_value=other; }
73+ operator value_type() const { return static_cast<value_type>(m_value); }
74+
75+ void inc() { m_value.inc(); }
76+ void dec() { m_value.dec(); }
77+private:
78+ Atomic<boost::uint32_t,memory_ordering_weak> m_value;
79+};
80+
81+// memory_ordering_strong
82+template <typename DataType>
83+class Atomic<DataType, memory_ordering_strong> {
84+ typedef DataType value_type;
85+public:
86+ Atomic(value_type new_value) : m_value(new_value) {}
87+ Atomic(const Atomic& other) : m_value(static_cast<value_type>(other)) {}
88+
89+ void operator=(value_type new_value) {
90+ FastLock Lock(cs);
91+ m_value = new_value;
92+ }
93+ void operator=(const Atomic& other) {
94+ FastLock Lock(cs);
95+ m_value = other;
96+ }
97+ operator value_type() const {
98+ FastLock Lock(cs); // shared (read-only) lock would be sufficient here
99+ return m_value;
100+ }
101+
102+ void inc() {
103+ FastLock Lock(cs);
104+ ++m_value;
105+ }
106+ void dec() {
107+ FastLock Lock(cs);
108+ --m_value;
109+ }
110+
111+ // assign new value, return old value
112+ value_type exchange(value_type new_val) {
113+ FastLock Lock(cs);
114+ value_type old_val = m_value;
115+ m_value = new_val;
116+ return old_val;
117+ }
118+private:
119+ value_type m_value;
120+ mutable FastCriticalSection cs;
121+};
122+
123+
124+} // namespace dcpp
125+
126+#endif // !defined(DCPP_ATOMIC_H)
127
128=== modified file 'dcpp/BufferedSocket.cpp'
129--- dcpp/BufferedSocket.cpp 2009-08-15 04:40:26 +0000
130+++ dcpp/BufferedSocket.cpp 2010-08-15 19:35:53 +0000
131@@ -40,13 +40,13 @@
132 {
133 start();
134
135- Thread::safeInc(sockets);
136+ sockets.inc();
137 }
138
139-volatile long BufferedSocket::sockets = 0;
140+Atomic<long,memory_ordering_strong> BufferedSocket::sockets(0);
141
142 BufferedSocket::~BufferedSocket() throw() {
143- Thread::safeDec(sockets);
144+ sockets.dec();
145 }
146
147 void BufferedSocket::setMode (Modes aMode, size_t aRollback) {
148
149=== modified file 'dcpp/BufferedSocket.h'
150--- dcpp/BufferedSocket.h 2009-08-15 04:40:26 +0000
151+++ dcpp/BufferedSocket.h 2010-08-15 19:35:53 +0000
152@@ -26,6 +26,7 @@
153 #include "Speaker.h"
154 #include "Util.h"
155 #include "Socket.h"
156+#include "Atomic.h"
157
158 namespace dcpp {
159
160@@ -153,7 +154,10 @@
161 void threadSendData() throw(Exception);
162
163 void fail(const string& aError);
164- static volatile long sockets;
165+
166+ // For this counter we must use "strong" variant of Atomic
167+ // We do some actions after checking this value, while it changes in other threads
168+ static Atomic<long,memory_ordering_strong> sockets;
169
170 bool checkEvents() throw(Exception);
171 void checkSocket() throw(Exception);
172
173=== modified file 'dcpp/Client.cpp'
174--- dcpp/Client.cpp 2009-08-15 04:40:26 +0000
175+++ dcpp/Client.cpp 2010-08-15 19:35:53 +0000
176@@ -158,24 +158,24 @@
177 void Client::updateCounts(bool aRemove) {
178 // We always remove the count and then add the correct one if requested...
179 if(countType == COUNT_NORMAL) {
180- Thread::safeDec(counts.normal);
181+ counts.normal.dec();
182 } else if(countType == COUNT_REGISTERED) {
183- Thread::safeDec(counts.registered);
184+ counts.registered.dec();
185 } else if(countType == COUNT_OP) {
186- Thread::safeDec(counts.op);
187+ counts.op.dec();
188 }
189
190 countType = COUNT_UNCOUNTED;
191
192 if(!aRemove) {
193 if(getMyIdentity().isOp()) {
194- Thread::safeInc(counts.op);
195+ counts.op.inc();
196 countType = COUNT_OP;
197 } else if(getMyIdentity().isRegistered()) {
198- Thread::safeInc(counts.registered);
199+ counts.registered.inc();
200 countType = COUNT_REGISTERED;
201 } else {
202- Thread::safeInc(counts.normal);
203+ counts.normal.inc();
204 countType = COUNT_NORMAL;
205 }
206 }
207
208=== modified file 'dcpp/Client.h'
209--- dcpp/Client.h 2009-08-15 04:40:26 +0000
210+++ dcpp/Client.h 2010-08-15 19:35:53 +0000
211@@ -26,6 +26,7 @@
212 #include "BufferedSocketListener.h"
213 #include "TimerManager.h"
214 #include "ClientListener.h"
215+#include "Atomic.h"
216
217 namespace dcpp {
218
219@@ -72,7 +73,10 @@
220
221 static string getCounts() {
222 char buf[128];
223- return string(buf, snprintf(buf, sizeof(buf), "%ld/%ld/%ld", counts.normal, counts.registered, counts.op));
224+ return string(buf, snprintf(buf, sizeof(buf), "%ld/%ld/%ld",
225+ static_cast<long>(counts.normal),
226+ static_cast<long>(counts.registered),
227+ static_cast<long>(counts.op)));
228 }
229
230 StringMap& escapeParams(StringMap& sm) {
231@@ -112,12 +116,16 @@
232 friend class ClientManager;
233 Client(const string& hubURL, char separator, bool secure_);
234 virtual ~Client() throw();
235+
236 struct Counts {
237- Counts(long n = 0, long r = 0, long o = 0) : normal(n), registered(r), op(o) { }
238- volatile long normal;
239- volatile long registered;
240- volatile long op;
241- bool operator !=(const Counts& rhs) { return normal != rhs.normal || registered != rhs.registered || op != rhs.op; }
242+ private:
243+ typedef Atomic<boost::int32_t,memory_ordering_weak> atomic_counter_t;
244+ public:
245+ typedef boost::int32_t value_type;
246+ Counts(value_type n = 0, value_type r = 0, value_type o = 0) : normal(n), registered(r), op(o) { }
247+ atomic_counter_t normal;
248+ atomic_counter_t registered;
249+ atomic_counter_t op;
250 };
251
252 enum States {
253
254=== modified file 'dcpp/CriticalSection.h'
255--- dcpp/CriticalSection.h 2009-02-23 01:47:25 +0000
256+++ dcpp/CriticalSection.h 2010-08-15 19:35:53 +0000
257@@ -19,7 +19,8 @@
258 #if !defined(CRITICAL_SECTION_H)
259 #define CRITICAL_SECTION_H
260
261-#include "Thread.h"
262+// header-only implementation of mutex
263+#include <boost/signals2/mutex.hpp>
264
265 namespace dcpp {
266
267@@ -78,32 +79,11 @@
268 */
269 class FastCriticalSection {
270 public:
271-#ifdef _WIN32
272- FastCriticalSection() : state(0) { }
273-
274- void enter() {
275- while(Thread::safeExchange(state, 1) == 1) {
276- Thread::yield();
277- }
278- }
279- void leave() {
280- Thread::safeDec(state);
281- }
282-private:
283- volatile long state;
284-
285-#else
286- // We have to use a pthread (nonrecursive) mutex, didn't find any test_and_set on linux...
287- FastCriticalSection() {
288- static pthread_mutex_t fastmtx = PTHREAD_MUTEX_INITIALIZER;
289- mtx = fastmtx;
290- }
291- ~FastCriticalSection() { pthread_mutex_destroy(&mtx); }
292- void enter() { pthread_mutex_lock(&mtx); }
293- void leave() { pthread_mutex_unlock(&mtx); }
294-private:
295- pthread_mutex_t mtx;
296-#endif
297+ void enter() { mtx.lock(); }
298+ void leave() { mtx.unlock(); }
299+private:
300+ typedef boost::signals2::mutex mutex_t;
301+ mutex_t mtx;
302 };
303
304 template<class T>
305
306=== modified file 'dcpp/Pointer.h'
307--- dcpp/Pointer.h 2009-08-15 04:40:26 +0000
308+++ dcpp/Pointer.h 2010-08-15 19:35:53 +0000
309@@ -20,7 +20,7 @@
310 #define DCPLUSPLUS_DCPP_POINTER_H
311
312 #include <boost/intrusive_ptr.hpp>
313-#include "Thread.h"
314+#include <boost/smart_ptr/detail/atomic_count.hpp>
315
316 namespace dcpp {
317
318@@ -36,10 +36,10 @@
319 intrusive_ptr_base() throw() : ref(0) { }
320
321 private:
322- friend void intrusive_ptr_add_ref(intrusive_ptr_base* p) { Thread::safeInc(p->ref); }
323- friend void intrusive_ptr_release(intrusive_ptr_base* p) { if(Thread::safeDec(p->ref) == 0) { delete static_cast<T*>(p); } }
324+ friend void intrusive_ptr_add_ref(intrusive_ptr_base* p) { ++p->ref; }
325+ friend void intrusive_ptr_release(intrusive_ptr_base* p) { if(--p->ref == 0) { delete static_cast<T*>(p); } }
326
327- volatile long ref;
328+ boost::detail::atomic_count ref;
329 };
330
331
332
333=== modified file 'dcpp/Semaphore.h'
334--- dcpp/Semaphore.h 2009-02-23 01:47:25 +0000
335+++ dcpp/Semaphore.h 2010-08-15 19:35:53 +0000
336@@ -59,7 +59,7 @@
337
338 bool wait() throw() {
339 Lock l(cs);
340- if(count == 0) {
341+ while (count == 0) {
342 pthread_cond_wait(&cond, &cs.getMutex());
343 }
344 count--;
345@@ -74,7 +74,10 @@
346 millis+=timev.tv_usec/1000;
347 t.tv_sec = timev.tv_sec + (millis/1000);
348 t.tv_nsec = (millis%1000)*1000*1000;
349- int ret = pthread_cond_timedwait(&cond, &cs.getMutex(), &t);
350+ int ret;
351+ do {
352+ ret = pthread_cond_timedwait(&cond, &cs.getMutex(), &t);
353+ } while (ret==0 && count==0);
354 if(ret != 0) {
355 return false;
356 }
357
358=== modified file 'dcpp/ShareManager.cpp'
359--- dcpp/ShareManager.cpp 2009-12-27 22:03:53 +0000
360+++ dcpp/ShareManager.cpp 2010-08-15 19:35:53 +0000
361@@ -52,7 +52,7 @@
362 namespace dcpp {
363
364 ShareManager::ShareManager() : hits(0), xmlListLen(0), bzXmlListLen(0),
365- xmlDirty(true), refreshDirs(false), update(false), initial(true), listN(0), refreshing(0),
366+ xmlDirty(true), refreshDirs(false), update(false), initial(true), listN(0), refreshing(false),
367 lastXmlUpdate(0), lastFullUpdate(GET_TICK()), bloom(1<<20)
368 {
369 SettingsManager::getInstance()->addListener(this);
370@@ -812,7 +812,7 @@
371 }
372
373 void ShareManager::refresh(bool dirs /* = false */, bool aUpdate /* = true */, bool block /* = false */) throw() {
374- if(Thread::safeExchange(refreshing, 1) == 1) {
375+ if(refreshing.exchange(true) == true) {
376 LogManager::getInstance()->message(_("File list refresh in progress, please wait for it to finish before trying to refresh again"));
377 return;
378 }
379@@ -883,7 +883,7 @@
380 if(update) {
381 ClientManager::getInstance()->infoUpdated();
382 }
383- refreshing = 0;
384+ refreshing = false;
385 return 0;
386 }
387
388
389=== modified file 'dcpp/ShareManager.h'
390--- dcpp/ShareManager.h 2009-12-27 22:03:53 +0000
391+++ dcpp/ShareManager.h 2010-08-15 19:35:53 +0000
392@@ -33,6 +33,7 @@
393 #include "FastAlloc.h"
394 #include "MerkleTree.h"
395 #include "Pointer.h"
396+#include "Atomic.h"
397
398 namespace dcpp {
399
400@@ -249,7 +250,7 @@
401
402 int listN;
403
404- volatile long refreshing;
405+ Atomic<bool,memory_ordering_strong> refreshing;
406
407 uint64_t lastXmlUpdate;
408 uint64_t lastFullUpdate;
409
410=== modified file 'dcpp/Thread.cpp'
411--- dcpp/Thread.cpp 2009-02-23 01:47:25 +0000
412+++ dcpp/Thread.cpp 2010-08-15 19:35:53 +0000
413@@ -23,10 +23,6 @@
414
415 namespace dcpp {
416
417-#ifndef _WIN32
418-pthread_mutex_t Thread::mtx = PTHREAD_MUTEX_INITIALIZER;
419-#endif
420-
421 #ifdef _WIN32
422 void Thread::start() throw(ThreadException) {
423 join();
424
425=== modified file 'dcpp/Thread.h'
426--- dcpp/Thread.h 2009-03-01 05:27:41 +0000
427+++ dcpp/Thread.h 2010-08-15 19:35:53 +0000
428@@ -21,7 +21,6 @@
429
430 #ifndef _WIN32
431 #include <pthread.h>
432-#include <sched.h>
433 #include <sys/resource.h>
434 #endif
435
436@@ -62,18 +61,6 @@
437 void setThreadPriority(Priority p) throw() { ::SetThreadPriority(threadHandle, p); }
438
439 static void sleep(uint32_t millis) { ::Sleep(millis); }
440- static void yield() { ::Sleep(1); }
441-
442-#ifdef __MINGW32__
443- static long safeInc(volatile long& v) { return InterlockedIncrement((long*)&v); }
444- static long safeDec(volatile long& v) { return InterlockedDecrement((long*)&v); }
445- static long safeExchange(volatile long& target, long value) { return InterlockedExchange((long*)&target, value); }
446-
447-#else
448- static long safeInc(volatile long& v) { return InterlockedIncrement(&v); }
449- static long safeDec(volatile long& v) { return InterlockedDecrement(&v); }
450- static long safeExchange(volatile long& target, long value) { return InterlockedExchange(&target, value); }
451-#endif
452
453 #else
454
455@@ -99,26 +86,6 @@
456
457 void setThreadPriority(Priority p) { setpriority(PRIO_PROCESS, 0, p); }
458 static void sleep(uint32_t millis) { ::usleep(millis*1000); }
459- static void yield() { ::sched_yield(); }
460- static long safeInc(volatile long& v) {
461- pthread_mutex_lock(&mtx);
462- long ret = ++v;
463- pthread_mutex_unlock(&mtx);
464- return ret;
465- }
466- static long safeDec(volatile long& v) {
467- pthread_mutex_lock(&mtx);
468- long ret = --v;
469- pthread_mutex_unlock(&mtx);
470- return ret;
471- }
472- static long safeExchange(volatile long& target, long value) {
473- pthread_mutex_lock(&mtx);
474- long ret = target;
475- target = value;
476- pthread_mutex_unlock(&mtx);
477- return ret;
478- }
479 #endif
480
481 protected:
482@@ -133,7 +100,6 @@
483 return 0;
484 }
485 #else
486- static pthread_mutex_t mtx;
487 pthread_t threadHandle;
488 static void* starter(void* p) {
489 Thread* t = (Thread*)p;

Subscribers

People subscribed via source and target branches

to status/vote changes: