Merge lp:~gpr/dcplusplus/sync into lp:dcplusplus

Proposed by Gennady Proskurin
Status: Needs review
Proposed branch: lp:~gpr/dcplusplus/sync
Merge into: lp:dcplusplus
Diff against target: 408 lines (+171/-53)
10 files modified
dcpp/Atomic.h (+122/-0)
dcpp/BufferedSocket.cpp (+3/-3)
dcpp/BufferedSocket.h (+5/-1)
dcpp/Client.cpp (+6/-6)
dcpp/Client.h (+14/-6)
dcpp/CriticalSection.h (+7/-27)
dcpp/Pointer.h (+4/-4)
dcpp/Semaphore.h (+5/-2)
dcpp/ShareManager.cpp (+3/-3)
dcpp/ShareManager.h (+2/-1)
To merge this branch: bzr merge lp:~gpr/dcplusplus/sync
Reviewer: Jacek Sieka
Review status: Needs Information
Review via email: mp+32912@code.launchpad.net

Description of the change

+ includes all patches submitted in corresponding bug reports
+ Thread::safeDec/Inc/Exchange functions are replaced by more fine-grained/lightweight/portable implementations

Jacek Sieka (arnetheduck) wrote :

I'm not quite sure I understand why one of the counters uses interprocess::detail and the other shared_ptr::detail - if we're relying on internals, I'd minimise their use.

Also, I do not understand the comment on weak vs strong ordering, could you elaborate please.

Regarding fastcriticalsection, it's a spin lock which is quite appropriate if the sections locked are (very) short (small risk of contention) as OS mutexes add a lot of overhead...

review: Needs Information
Gennady Proskurin (gpr) wrote :

There is plenty of information about weak vs. strong memory ordering on the internet, for example:
http://en.wikipedia.org/wiki/Memory_ordering
http://en.wikipedia.org/wiki/Memory_barrier
http://msdn.microsoft.com/en-us/library/ms686355(VS.85).aspx

In short, "strong" ordering guarantees that all threads on all processors see an identical, synchronized view of memory, because the synchronization primitive (the mutex inside FastCriticalSection, in the case of Atomic<strong>) issues all the necessary memory barriers (it synchronizes memory between all processors).
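A minimal sketch of the "strong" case, assuming C++11 std::mutex and std::thread as stand-ins for FastCriticalSection and the dcpp Thread class. StrongFlag and publish_and_read are hypothetical names, not part of the patch. Because every access to the flag takes the lock, a reader that sees the flag set also sees the ordinary write made before it:

```cpp
#include <cassert>
#include <mutex>
#include <thread>

// Hypothetical stand-in for Atomic<bool, memory_ordering_strong>:
// every read and write of the value happens under a mutex.
class StrongFlag {
public:
    explicit StrongFlag(bool v) : value_(v) {}
    void set(bool v) { std::lock_guard<std::mutex> lock(mtx_); value_ = v; }
    bool get() const { std::lock_guard<std::mutex> lock(mtx_); return value_; }
private:
    bool value_;
    mutable std::mutex mtx_;
};

// The writer fills `payload` and then raises the flag; the reader waits
// for the flag and only then reads `payload`.
int publish_and_read() {
    int payload = 0;
    StrongFlag ready(false);
    std::thread writer([&] {
        payload = 42;     // plain, non-atomic write
        ready.set(true);  // the unlock inside set() publishes the write above
    });
    while (!ready.get()) {
        std::this_thread::yield();
    }
    int seen = payload;   // safe: the lock in get() ordered this after the write
    writer.join();
    assert(seen == 42);
    return seen;
}
```

This is the "if (flag) do_something()" pattern that the strong variant is meant to allow.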

In "weak" ordering (which my code uses for thread-safe counters), you cannot take any action based on the counter's value. For example, you cannot write code like "if (counter==0) do_some_multithread_action()", because only the counter value itself is synchronized (during atomic access to it), and other memory contents may be stale. This is suitable for accounting of statistics, for example.
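A minimal sketch of a "weak" counter, assuming C++11 std::atomic with relaxed ordering as a substitute for the boost::interprocess::detail functions used in the patch. WeakCounter and count_events are hypothetical names. Increments are never lost, but the counter orders no other memory, so it is only good for statistics:

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <thread>
#include <vector>

// Hypothetical stand-in for Atomic<int32_t, memory_ordering_weak>.
// Relaxed operations are atomic but imply no ordering of other memory.
class WeakCounter {
public:
    explicit WeakCounter(std::int32_t v = 0) : value_(v) {}
    void inc() { value_.fetch_add(1, std::memory_order_relaxed); }
    void dec() { value_.fetch_sub(1, std::memory_order_relaxed); }
    std::int32_t get() const { return value_.load(std::memory_order_relaxed); }
private:
    std::atomic<std::int32_t> value_;
};

// Several threads bump one statistics counter concurrently.
std::int32_t count_events(int threads, int per_thread) {
    assert(threads >= 0 && per_thread >= 0);
    WeakCounter hits;
    std::vector<std::thread> pool;
    for (int t = 0; t < threads; ++t) {
        pool.emplace_back([&hits, per_thread] {
            for (int i = 0; i < per_thread; ++i) hits.inc();
        });
    }
    for (std::thread& th : pool) th.join();
    // The total is exact here because join() synchronizes with each thread;
    // a value read while the threads were still running would be a snapshot.
    return hits.get();
}
```

Reading such a counter for display is fine; branching on it to decide that some other shared data is ready is not.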

For smart-pointer reference counts, memory needs to be synchronized only when the counter reaches zero and the destructor is called. This is exactly what the shared_ptr counters do. So I used the shared_ptr atomic counter (boost::detail::atomic_count) for the intrusive_ptr refcount; it was designed for exactly this purpose: thread-safe reference counting in smart pointers.
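A minimal sketch of that refcount pattern, assuming C++11 std::atomic instead of boost::detail::atomic_count. RefCounted and Tracked are hypothetical names. Increments are relaxed; the decrement uses acquire-release ordering so that the thread which drops the last reference sees all earlier writes to the object before deleting it (using acq_rel on every decrement is a simplification of "synchronize only at zero"):

```cpp
#include <atomic>
#include <cassert>

// Hypothetical intrusive reference-count base class.
class RefCounted {
public:
    RefCounted() : refs_(0) {}
    virtual ~RefCounted() {}

    // Taking a reference needs atomicity only, not ordering.
    void add_ref() { refs_.fetch_add(1, std::memory_order_relaxed); }

    // Returns true if this call destroyed the object.
    bool release() {
        long previous = refs_.fetch_sub(1, std::memory_order_acq_rel);
        assert(previous >= 1);  // releasing more often than acquiring is a bug
        if (previous == 1) {
            delete this;        // last owner: earlier writes are visible here
            return true;
        }
        return false;
    }
private:
    std::atomic<long> refs_;
};

// Small helper type that records its own destruction.
struct Tracked : RefCounted {
    explicit Tracked(bool* flag) : destroyed(flag) {}
    ~Tracked() override { *destroyed = true; }
    bool* destroyed;
};
```

The object must be heap-allocated, since release() calls delete on it when the count reaches zero.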

But the shared_ptr atomic counter has no assignment semantics, only construction. So for Atomic<weak> I used another atomic from boost, interprocess::detail. My Atomic template is designed to be a general-purpose thread-safe counter, so it should support assignment.

Gennady Proskurin (gpr) wrote :

My motivation for writing all these patches (besides the bug in the use of pthread_cond) was the bad (coarse) locking on unix. It uses one static mutex in the Thread class for almost all synchronization in the whole code.

While I agree that spinning is suitable for FastCriticalSection, it is not easy to implement it properly and efficiently. At first glance, the current implementation may be subject to priority inversion and, therefore, deadlocks.
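For reference, a minimal sketch of a spin lock with the memory ordering made explicit, assuming C++11 std::atomic_flag. SpinLock and guarded_sum are hypothetical names, and this is not the implementation in the tree. It yields instead of sleeping while it waits, and it does nothing about priority inversion:

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Hypothetical test-and-set spin lock.
class SpinLock {
public:
    void lock() {
        // test_and_set returns the previous state: true means already held.
        while (flag_.test_and_set(std::memory_order_acquire)) {
            std::this_thread::yield();  // let the current holder make progress
        }
    }
    void unlock() { flag_.clear(std::memory_order_release); }
private:
    std::atomic_flag flag_ = ATOMIC_FLAG_INIT;
};

// Several threads increment a plain variable under the lock.
long guarded_sum(int threads, int per_thread) {
    assert(threads >= 0 && per_thread >= 0);
    SpinLock lock;
    long total = 0;
    std::vector<std::thread> pool;
    for (int t = 0; t < threads; ++t) {
        pool.emplace_back([&lock, &total, per_thread] {
            for (int i = 0; i < per_thread; ++i) {
                lock.lock();
                ++total;  // very short critical section, as spin locks require
                lock.unlock();
            }
        });
    }
    for (std::thread& th : pool) th.join();
    return total;
}
```

Whether yielding is enough on a given scheduler is exactly the kind of detail that makes a hand-written spin lock hard to get right.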

I'm not sure which solution would be acceptable for both unix and windows. For intrusive_ptr, I'd just use the boost shared_ptr atomic counter (as in my patch). For critical sections, I think correctness is the priority, and optimization comes second. Portability is also good to have.

Unmerged revisions

2216. By Gennady Proskurin

Change BufferedSocket::sockets to Atomic (strong variant).

2215. By Gennady Proskurin

Use Atomic<bool> for "refreshing" variable.

2214. By Gennady Proskurin

Implement Atomic template for atomic counters.

2213. By Gennady Proskurin

Portable FastCriticalSection implementation.

Preview Diff

1=== added file 'dcpp/Atomic.h'
2--- dcpp/Atomic.h 1970-01-01 00:00:00 +0000
3+++ dcpp/Atomic.h 2010-08-17 18:57:44 +0000
4@@ -0,0 +1,122 @@
5+#if !defined(DCPP_ATOMIC_H)
6+#define DCPP_ATOMIC_H
7+
8+#include "CriticalSection.h"
9+
10+#include <boost/interprocess/detail/atomic.hpp>
11+#include <boost/cstdint.hpp>
12+
13+namespace dcpp {
14+
15+// Ordering arguments:
16+
17+// memory_ordering_weak
18+// Suitable only for thread-safe accounting of some statistics.
19+// Value can not be used as "flag" (you cannot do any multi-thread action, based
20+// on this value) since it does not garantees necessary memory barriers.
21+class memory_ordering_weak {};
22+
23+// memory_ordering_strong
24+// Suitable for any multi-thread purpose
25+class memory_ordering_strong {};
26+
27+template <typename DataType, class Ordering = memory_ordering_strong>
28+class Atomic;
29+
30+
31+// uint32_t
32+template <>
33+class Atomic<boost::uint32_t, memory_ordering_weak> {
34+ typedef boost::uint32_t value_type;
35+public:
36+ Atomic(value_type val) { assign(val); }
37+ Atomic(const Atomic& other) { assign(static_cast<value_type>(other)); }
38+
39+ // operator=
40+ // return void to be safe
41+ void operator=(value_type val) { assign(val); }
42+ void operator=(const Atomic& other) {
43+ return operator=(static_cast<value_type>(other));
44+ }
45+
46+ // type cast
47+ operator value_type() const {
48+ return boost::interprocess::detail::atomic_read32(&m_value);
49+ }
50+
51+ // increment
52+ void inc() { boost::interprocess::detail::atomic_inc32(&m_value); }
53+
54+ // decrement
55+ void dec() { boost::interprocess::detail::atomic_dec32(&m_value); }
56+
57+private:
58+ mutable value_type m_value;
59+ void assign(value_type val) { boost::interprocess::detail::atomic_write32(&m_value, val); }
60+};
61+
62+// int32_t
63+// just forward all operations to underlying Atomic<uint32_t, ...> variable
64+template <>
65+class Atomic<boost::int32_t, memory_ordering_weak> {
66+ typedef boost::int32_t value_type;
67+public:
68+ Atomic(value_type val) : m_value(val) {}
69+ Atomic(const Atomic& other) : m_value(other) {}
70+
71+ void operator=(value_type val) { m_value=val; }
72+ void operator=(const Atomic& other) { m_value=other; }
73+ operator value_type() const { return static_cast<value_type>(m_value); }
74+
75+ void inc() { m_value.inc(); }
76+ void dec() { m_value.dec(); }
77+private:
78+ Atomic<boost::uint32_t,memory_ordering_weak> m_value;
79+};
80+
81+// memory_ordering_strong
82+template <typename DataType>
83+class Atomic<DataType, memory_ordering_strong> {
84+ typedef DataType value_type;
85+public:
86+ Atomic(value_type new_value) : m_value(new_value) {}
87+ Atomic(const Atomic& other) : m_value(static_cast<value_type>(other)) {}
88+
89+ void operator=(value_type new_value) {
90+ FastLock Lock(cs);
91+ m_value = new_value;
92+ }
93+ void operator=(const Atomic& other) {
94+ FastLock Lock(cs);
95+ m_value = other;
96+ }
97+ operator value_type() const {
98+ FastLock Lock(cs); // shared (read-only) lock would be sufficient here
99+ return m_value;
100+ }
101+
102+ void inc() {
103+ FastLock Lock(cs);
104+ ++m_value;
105+ }
106+ void dec() {
107+ FastLock Lock(cs);
108+ --m_value;
109+ }
110+
111+ // assign new value, return old value
112+ value_type exchange(value_type new_val) {
113+ FastLock Lock(cs);
114+ value_type old_val = m_value;
115+ m_value = new_val;
116+ return old_val;
117+ }
118+private:
119+ value_type m_value;
120+ mutable FastCriticalSection cs;
121+};
122+
123+
124+} // namespace dcpp
125+
126+#endif // !defined(DCPP_ATOMIC_H)
127
128=== modified file 'dcpp/BufferedSocket.cpp'
129--- dcpp/BufferedSocket.cpp 2010-07-10 14:36:48 +0000
130+++ dcpp/BufferedSocket.cpp 2010-08-17 18:57:44 +0000
131@@ -42,13 +42,13 @@
132 {
133 start();
134
135- Thread::safeInc(sockets);
136+ sockets.inc();
137 }
138
139-volatile long BufferedSocket::sockets = 0;
140+Atomic<long,memory_ordering_strong> BufferedSocket::sockets(0);
141
142 BufferedSocket::~BufferedSocket() throw() {
143- Thread::safeDec(sockets);
144+ sockets.dec();
145 }
146
147 void BufferedSocket::setMode (Modes aMode, size_t aRollback) {
148
149=== modified file 'dcpp/BufferedSocket.h'
150--- dcpp/BufferedSocket.h 2010-07-10 14:36:48 +0000
151+++ dcpp/BufferedSocket.h 2010-08-17 18:57:44 +0000
152@@ -26,6 +26,7 @@
153 #include "Speaker.h"
154 #include "Util.h"
155 #include "Socket.h"
156+#include "Atomic.h"
157
158 namespace dcpp {
159
160@@ -163,7 +164,10 @@
161 void threadSendData() throw(Exception);
162
163 void fail(const string& aError);
164- static volatile long sockets;
165+
166+ // For this counter we must use "strong" variant of Atomic
167+ // We do some actions after checking this value, while it changes in other threads
168+ static Atomic<long,memory_ordering_strong> sockets;
169
170 bool checkEvents() throw(Exception);
171 void checkSocket() throw(Exception);
172
173=== modified file 'dcpp/Client.cpp'
174--- dcpp/Client.cpp 2010-02-11 21:44:13 +0000
175+++ dcpp/Client.cpp 2010-08-17 18:57:44 +0000
176@@ -158,24 +158,24 @@
177 void Client::updateCounts(bool aRemove) {
178 // We always remove the count and then add the correct one if requested...
179 if(countType == COUNT_NORMAL) {
180- Thread::safeDec(counts.normal);
181+ counts.normal.dec();
182 } else if(countType == COUNT_REGISTERED) {
183- Thread::safeDec(counts.registered);
184+ counts.registered.dec();
185 } else if(countType == COUNT_OP) {
186- Thread::safeDec(counts.op);
187+ counts.op.dec();
188 }
189
190 countType = COUNT_UNCOUNTED;
191
192 if(!aRemove) {
193 if(getMyIdentity().isOp()) {
194- Thread::safeInc(counts.op);
195+ counts.op.inc();
196 countType = COUNT_OP;
197 } else if(getMyIdentity().isRegistered()) {
198- Thread::safeInc(counts.registered);
199+ counts.registered.inc();
200 countType = COUNT_REGISTERED;
201 } else {
202- Thread::safeInc(counts.normal);
203+ counts.normal.inc();
204 countType = COUNT_NORMAL;
205 }
206 }
207
208=== modified file 'dcpp/Client.h'
209--- dcpp/Client.h 2010-06-30 14:40:00 +0000
210+++ dcpp/Client.h 2010-08-17 18:57:44 +0000
211@@ -26,6 +26,7 @@
212 #include "BufferedSocketListener.h"
213 #include "TimerManager.h"
214 #include "ClientListener.h"
215+#include "Atomic.h"
216
217 namespace dcpp {
218
219@@ -72,7 +73,10 @@
220
221 static string getCounts() {
222 char buf[128];
223- return string(buf, snprintf(buf, sizeof(buf), "%ld/%ld/%ld", counts.normal, counts.registered, counts.op));
224+ return string(buf, snprintf(buf, sizeof(buf), "%ld/%ld/%ld",
225+ static_cast<long>(counts.normal),
226+ static_cast<long>(counts.registered),
227+ static_cast<long>(counts.op)));
228 }
229
230 StringMap& escapeParams(StringMap& sm) {
231@@ -112,12 +116,16 @@
232 friend class ClientManager;
233 Client(const string& hubURL, char separator, bool secure_);
234 virtual ~Client() throw();
235+
236 struct Counts {
237- Counts(long n = 0, long r = 0, long o = 0) : normal(n), registered(r), op(o) { }
238- volatile long normal;
239- volatile long registered;
240- volatile long op;
241- bool operator !=(const Counts& rhs) { return normal != rhs.normal || registered != rhs.registered || op != rhs.op; }
242+ private:
243+ typedef Atomic<boost::int32_t,memory_ordering_weak> atomic_counter_t;
244+ public:
245+ typedef boost::int32_t value_type;
246+ Counts(value_type n = 0, value_type r = 0, value_type o = 0) : normal(n), registered(r), op(o) { }
247+ atomic_counter_t normal;
248+ atomic_counter_t registered;
249+ atomic_counter_t op;
250 };
251
252 enum States {
253
254=== modified file 'dcpp/CriticalSection.h'
255--- dcpp/CriticalSection.h 2010-04-23 18:32:11 +0000
256+++ dcpp/CriticalSection.h 2010-08-17 18:57:44 +0000
257@@ -19,7 +19,8 @@
258 #if !defined(CRITICAL_SECTION_H)
259 #define CRITICAL_SECTION_H
260
261-#include "Thread.h"
262+// header-only implementation of mutex
263+#include <boost/signals2/mutex.hpp>
264
265 namespace dcpp {
266
267@@ -78,32 +79,11 @@
268 */
269 class FastCriticalSection {
270 public:
271-#ifdef _WIN32
272- FastCriticalSection() : state(0) { }
273-
274- void enter() {
275- while(Thread::safeExchange(state, 1) == 1) {
276- Thread::sleep(1);
277- }
278- }
279- void leave() {
280- Thread::safeDec(state);
281- }
282-private:
283- volatile long state;
284-
285-#else
286- // We have to use a pthread (nonrecursive) mutex, didn't find any test_and_set on linux...
287- FastCriticalSection() {
288- static pthread_mutex_t fastmtx = PTHREAD_MUTEX_INITIALIZER;
289- mtx = fastmtx;
290- }
291- ~FastCriticalSection() { pthread_mutex_destroy(&mtx); }
292- void enter() { pthread_mutex_lock(&mtx); }
293- void leave() { pthread_mutex_unlock(&mtx); }
294-private:
295- pthread_mutex_t mtx;
296-#endif
297+ void enter() { mtx.lock(); }
298+ void leave() { mtx.unlock(); }
299+private:
300+ typedef boost::signals2::mutex mutex_t;
301+ mutex_t mtx;
302 };
303
304 template<class T>
305
306=== modified file 'dcpp/Pointer.h'
307--- dcpp/Pointer.h 2010-02-11 21:44:13 +0000
308+++ dcpp/Pointer.h 2010-08-17 18:57:44 +0000
309@@ -20,7 +20,7 @@
310 #define DCPLUSPLUS_DCPP_POINTER_H
311
312 #include <boost/intrusive_ptr.hpp>
313-#include "Thread.h"
314+#include <boost/smart_ptr/detail/atomic_count.hpp>
315
316 namespace dcpp {
317
318@@ -36,10 +36,10 @@
319 intrusive_ptr_base() throw() : ref(0) { }
320
321 private:
322- friend void intrusive_ptr_add_ref(intrusive_ptr_base* p) { Thread::safeInc(p->ref); }
323- friend void intrusive_ptr_release(intrusive_ptr_base* p) { if(Thread::safeDec(p->ref) == 0) { delete static_cast<T*>(p); } }
324+ friend void intrusive_ptr_add_ref(intrusive_ptr_base* p) { ++p->ref; }
325+ friend void intrusive_ptr_release(intrusive_ptr_base* p) { if(--p->ref == 0) { delete static_cast<T*>(p); } }
326
327- volatile long ref;
328+ boost::detail::atomic_count ref;
329 };
330
331
332
333=== modified file 'dcpp/Semaphore.h'
334--- dcpp/Semaphore.h 2010-02-11 21:44:13 +0000
335+++ dcpp/Semaphore.h 2010-08-17 18:57:44 +0000
336@@ -59,7 +59,7 @@
337
338 bool wait() throw() {
339 Lock l(cs);
340- if(count == 0) {
341+ while (count == 0) {
342 pthread_cond_wait(&cond, &cs.getMutex());
343 }
344 count--;
345@@ -74,7 +74,10 @@
346 millis+=timev.tv_usec/1000;
347 t.tv_sec = timev.tv_sec + (millis/1000);
348 t.tv_nsec = (millis%1000)*1000*1000;
349- int ret = pthread_cond_timedwait(&cond, &cs.getMutex(), &t);
350+ int ret;
351+ do {
352+ ret = pthread_cond_timedwait(&cond, &cs.getMutex(), &t);
353+ } while (ret==0 && count==0);
354 if(ret != 0) {
355 return false;
356 }
357
358=== modified file 'dcpp/ShareManager.cpp'
359--- dcpp/ShareManager.cpp 2010-07-10 14:36:48 +0000
360+++ dcpp/ShareManager.cpp 2010-08-17 18:57:44 +0000
361@@ -52,7 +52,7 @@
362 namespace dcpp {
363
364 ShareManager::ShareManager() : hits(0), xmlListLen(0), bzXmlListLen(0),
365- xmlDirty(true), forceXmlRefresh(false), refreshDirs(false), update(false), initial(true), listN(0), refreshing(0),
366+ xmlDirty(true), forceXmlRefresh(false), refreshDirs(false), update(false), initial(true), listN(0), refreshing(false),
367 lastXmlUpdate(0), lastFullUpdate(GET_TICK()), bloom(1<<20)
368 {
369 SettingsManager::getInstance()->addListener(this);
370@@ -723,7 +723,7 @@
371 }
372
373 void ShareManager::refresh(bool dirs /* = false */, bool aUpdate /* = true */, bool block /* = false */) throw() {
374- if(Thread::safeExchange(refreshing, 1) == 1) {
375+ if(refreshing.exchange(true) == true) {
376 LogManager::getInstance()->message(_("File list refresh in progress, please wait for it to finish before trying to refresh again"));
377 return;
378 }
379@@ -798,7 +798,7 @@
380 if(update) {
381 ClientManager::getInstance()->infoUpdated();
382 }
383- refreshing = 0;
384+ refreshing = false;
385 return 0;
386 }
387
388
389=== modified file 'dcpp/ShareManager.h'
390--- dcpp/ShareManager.h 2010-07-10 14:36:48 +0000
391+++ dcpp/ShareManager.h 2010-08-17 18:57:44 +0000
392@@ -33,6 +33,7 @@
393 #include "FastAlloc.h"
394 #include "MerkleTree.h"
395 #include "Pointer.h"
396+#include "Atomic.h"
397
398 namespace dcpp {
399
400@@ -251,7 +252,7 @@
401
402 int listN;
403
404- volatile long refreshing;
405+ Atomic<bool,memory_ordering_strong> refreshing;
406
407 uint64_t lastXmlUpdate;
408 uint64_t lastFullUpdate;
