Merge lp:~gpr/linuxdcpp/sync into lp:linuxdcpp

Proposed by Gennady Proskurin
Status: Needs review
Proposed branch: lp:~gpr/linuxdcpp/sync
Merge into: lp:linuxdcpp
Diff against target: 489 lines (+171/-91)
12 files modified
dcpp/Atomic.h (+122/-0)
dcpp/BufferedSocket.cpp (+3/-3)
dcpp/BufferedSocket.h (+5/-1)
dcpp/Client.cpp (+6/-6)
dcpp/Client.h (+14/-6)
dcpp/CriticalSection.h (+7/-27)
dcpp/Pointer.h (+4/-4)
dcpp/Semaphore.h (+5/-2)
dcpp/ShareManager.cpp (+3/-3)
dcpp/ShareManager.h (+2/-1)
dcpp/Thread.cpp (+0/-4)
dcpp/Thread.h (+0/-34)
To merge this branch: bzr merge lp:~gpr/linuxdcpp/sync
Reviewer Review Type Date Requested Status
LinuxDC++ Team Pending
Review via email: mp+32714@code.launchpad.net

Description of the change

+ includes all patches submitted in corresponding bug reports
+ Thread::safeDec/Inc/Exchange functions are replaced by more fine-grained/lightweight/portable implementations
+ Thread::safeDec/Inc/Exchange functions removed (as unused)

Bug #617021: Semaphore potentially may underflow and become negative
Bug #617591: Pointer.h/intrusive_ptr_base class is too heavy-weight
Bug #617757: portable FastCriticalSection implementation
Bug #617988: atomic counters implemented

To post a comment you must log in.

Unmerged revisions

391. By Gennady Proskurin

Remove unused include <sched.h>, which was used for sched_yield() earlier.

390. By Gennady Proskurin

Remove unused Thread::safeExchange function and associated mutex.

389. By Gennady Proskurin

Use Atomic<bool> for "refreshing" variable.

388. By Gennady Proskurin

Implement exchange() function for Atomic template (only for memory_ordering_strong for now).
It assigns new value to atomic, returns old value.

387. By Gennady Proskurin

Remove Thread::safeInc/safeDec functions. They are unused now.

386. By Gennady Proskurin

Change BufferedSocket::sockets to Atomic (strong variant).

385. By Gennady Proskurin

Implement "strong memory ordering" variant of Atomic template.
For counters in "struct Counts" "weak" variant is sufficient.

384. By Gennady Proskurin

Remove unused Thread::yield functions.

383. By Gennady Proskurin

include <boost/detail/atomic_count.hpp> -> <boost/smart_ptr/detail/atomic_count.hpp>
The later header is garanteed to have necessary memory barrier for refcounting.
For now, it is no-op (it is the same header).

382. By Gennady Proskurin

Recheck predicate after wakeup of cond_wait/cond_timedwait

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== added file 'dcpp/Atomic.h'
--- dcpp/Atomic.h 1970-01-01 00:00:00 +0000
+++ dcpp/Atomic.h 2010-08-15 19:35:53 +0000
@@ -0,0 +1,122 @@
1#if !defined(DCPP_ATOMIC_H)
2#define DCPP_ATOMIC_H
3
4#include "CriticalSection.h"
5
6#include <boost/interprocess/detail/atomic.hpp>
7#include <boost/cstdint.hpp>
8
9namespace dcpp {
10
11// Ordering arguments:
12
13// memory_ordering_weak
14// Suitable only for thread-safe accounting of some statistics.
15// Value can not be used as "flag" (you cannot do any multi-thread action, based
16// on this value) since it does not garantees necessary memory barriers.
17class memory_ordering_weak {};
18
19// memory_ordering_strong
20// Suitable for any multi-thread purpose
21class memory_ordering_strong {};
22
23template <typename DataType, class Ordering = memory_ordering_strong>
24class Atomic;
25
26
27// uint32_t
28template <>
29class Atomic<boost::uint32_t, memory_ordering_weak> {
30 typedef boost::uint32_t value_type;
31public:
32 Atomic(value_type val) { assign(val); }
33 Atomic(const Atomic& other) { assign(static_cast<value_type>(other)); }
34
35 // operator=
36 // return void to be safe
37 void operator=(value_type val) { assign(val); }
38 void operator=(const Atomic& other) {
39 return operator=(static_cast<value_type>(other));
40 }
41
42 // type cast
43 operator value_type() const {
44 return boost::interprocess::detail::atomic_read32(&m_value);
45 }
46
47 // increment
48 void inc() { boost::interprocess::detail::atomic_inc32(&m_value); }
49
50 // decrement
51 void dec() { boost::interprocess::detail::atomic_dec32(&m_value); }
52
53private:
54 mutable value_type m_value;
55 void assign(value_type val) { boost::interprocess::detail::atomic_write32(&m_value, val); }
56};
57
58// int32_t
59// just forward all operations to underlying Atomic<uint32_t, ...> variable
60template <>
61class Atomic<boost::int32_t, memory_ordering_weak> {
62 typedef boost::int32_t value_type;
63public:
64 Atomic(value_type val) : m_value(val) {}
65 Atomic(const Atomic& other) : m_value(other) {}
66
67 void operator=(value_type val) { m_value=val; }
68 void operator=(const Atomic& other) { m_value=other; }
69 operator value_type() const { return static_cast<value_type>(m_value); }
70
71 void inc() { m_value.inc(); }
72 void dec() { m_value.dec(); }
73private:
74 Atomic<boost::uint32_t,memory_ordering_weak> m_value;
75};
76
77// memory_ordering_strong
78template <typename DataType>
79class Atomic<DataType, memory_ordering_strong> {
80 typedef DataType value_type;
81public:
82 Atomic(value_type new_value) : m_value(new_value) {}
83 Atomic(const Atomic& other) : m_value(static_cast<value_type>(other)) {}
84
85 void operator=(value_type new_value) {
86 FastLock Lock(cs);
87 m_value = new_value;
88 }
89 void operator=(const Atomic& other) {
90 FastLock Lock(cs);
91 m_value = other;
92 }
93 operator value_type() const {
94 FastLock Lock(cs); // shared (read-only) lock would be sufficient here
95 return m_value;
96 }
97
98 void inc() {
99 FastLock Lock(cs);
100 ++m_value;
101 }
102 void dec() {
103 FastLock Lock(cs);
104 --m_value;
105 }
106
107 // assign new value, return old value
108 value_type exchange(value_type new_val) {
109 FastLock Lock(cs);
110 value_type old_val = m_value;
111 m_value = new_val;
112 return old_val;
113 }
114private:
115 value_type m_value;
116 mutable FastCriticalSection cs;
117};
118
119
120} // namespace dcpp
121
122#endif // !defined(DCPP_ATOMIC_H)
0123
=== modified file 'dcpp/BufferedSocket.cpp'
--- dcpp/BufferedSocket.cpp 2009-08-15 04:40:26 +0000
+++ dcpp/BufferedSocket.cpp 2010-08-15 19:35:53 +0000
@@ -40,13 +40,13 @@
40{40{
41 start();41 start();
4242
43 Thread::safeInc(sockets);43 sockets.inc();
44}44}
4545
46volatile long BufferedSocket::sockets = 0;46Atomic<long,memory_ordering_strong> BufferedSocket::sockets(0);
4747
48BufferedSocket::~BufferedSocket() throw() {48BufferedSocket::~BufferedSocket() throw() {
49 Thread::safeDec(sockets);49 sockets.dec();
50}50}
5151
52void BufferedSocket::setMode (Modes aMode, size_t aRollback) {52void BufferedSocket::setMode (Modes aMode, size_t aRollback) {
5353
=== modified file 'dcpp/BufferedSocket.h'
--- dcpp/BufferedSocket.h 2009-08-15 04:40:26 +0000
+++ dcpp/BufferedSocket.h 2010-08-15 19:35:53 +0000
@@ -26,6 +26,7 @@
26#include "Speaker.h"26#include "Speaker.h"
27#include "Util.h"27#include "Util.h"
28#include "Socket.h"28#include "Socket.h"
29#include "Atomic.h"
2930
30namespace dcpp {31namespace dcpp {
3132
@@ -153,7 +154,10 @@
153 void threadSendData() throw(Exception);154 void threadSendData() throw(Exception);
154155
155 void fail(const string& aError);156 void fail(const string& aError);
156 static volatile long sockets;157
158 // For this counter we must use "strong" variant of Atomic
159 // We do some actions after checking this value, while it changes in other threads
160 static Atomic<long,memory_ordering_strong> sockets;
157161
158 bool checkEvents() throw(Exception);162 bool checkEvents() throw(Exception);
159 void checkSocket() throw(Exception);163 void checkSocket() throw(Exception);
160164
=== modified file 'dcpp/Client.cpp'
--- dcpp/Client.cpp 2009-08-15 04:40:26 +0000
+++ dcpp/Client.cpp 2010-08-15 19:35:53 +0000
@@ -158,24 +158,24 @@
158void Client::updateCounts(bool aRemove) {158void Client::updateCounts(bool aRemove) {
159 // We always remove the count and then add the correct one if requested...159 // We always remove the count and then add the correct one if requested...
160 if(countType == COUNT_NORMAL) {160 if(countType == COUNT_NORMAL) {
161 Thread::safeDec(counts.normal);161 counts.normal.dec();
162 } else if(countType == COUNT_REGISTERED) {162 } else if(countType == COUNT_REGISTERED) {
163 Thread::safeDec(counts.registered);163 counts.registered.dec();
164 } else if(countType == COUNT_OP) {164 } else if(countType == COUNT_OP) {
165 Thread::safeDec(counts.op);165 counts.op.dec();
166 }166 }
167167
168 countType = COUNT_UNCOUNTED;168 countType = COUNT_UNCOUNTED;
169169
170 if(!aRemove) {170 if(!aRemove) {
171 if(getMyIdentity().isOp()) {171 if(getMyIdentity().isOp()) {
172 Thread::safeInc(counts.op);172 counts.op.inc();
173 countType = COUNT_OP;173 countType = COUNT_OP;
174 } else if(getMyIdentity().isRegistered()) {174 } else if(getMyIdentity().isRegistered()) {
175 Thread::safeInc(counts.registered);175 counts.registered.inc();
176 countType = COUNT_REGISTERED;176 countType = COUNT_REGISTERED;
177 } else {177 } else {
178 Thread::safeInc(counts.normal);178 counts.normal.inc();
179 countType = COUNT_NORMAL;179 countType = COUNT_NORMAL;
180 }180 }
181 }181 }
182182
=== modified file 'dcpp/Client.h'
--- dcpp/Client.h 2009-08-15 04:40:26 +0000
+++ dcpp/Client.h 2010-08-15 19:35:53 +0000
@@ -26,6 +26,7 @@
26#include "BufferedSocketListener.h"26#include "BufferedSocketListener.h"
27#include "TimerManager.h"27#include "TimerManager.h"
28#include "ClientListener.h"28#include "ClientListener.h"
29#include "Atomic.h"
2930
30namespace dcpp {31namespace dcpp {
3132
@@ -72,7 +73,10 @@
7273
73 static string getCounts() {74 static string getCounts() {
74 char buf[128];75 char buf[128];
75 return string(buf, snprintf(buf, sizeof(buf), "%ld/%ld/%ld", counts.normal, counts.registered, counts.op));76 return string(buf, snprintf(buf, sizeof(buf), "%ld/%ld/%ld",
77 static_cast<long>(counts.normal),
78 static_cast<long>(counts.registered),
79 static_cast<long>(counts.op)));
76 }80 }
7781
78 StringMap& escapeParams(StringMap& sm) {82 StringMap& escapeParams(StringMap& sm) {
@@ -112,12 +116,16 @@
112 friend class ClientManager;116 friend class ClientManager;
113 Client(const string& hubURL, char separator, bool secure_);117 Client(const string& hubURL, char separator, bool secure_);
114 virtual ~Client() throw();118 virtual ~Client() throw();
119
115 struct Counts {120 struct Counts {
116 Counts(long n = 0, long r = 0, long o = 0) : normal(n), registered(r), op(o) { }121 private:
117 volatile long normal;122 typedef Atomic<boost::int32_t,memory_ordering_weak> atomic_counter_t;
118 volatile long registered;123 public:
119 volatile long op;124 typedef boost::int32_t value_type;
120 bool operator !=(const Counts& rhs) { return normal != rhs.normal || registered != rhs.registered || op != rhs.op; }125 Counts(value_type n = 0, value_type r = 0, value_type o = 0) : normal(n), registered(r), op(o) { }
126 atomic_counter_t normal;
127 atomic_counter_t registered;
128 atomic_counter_t op;
121 };129 };
122130
123 enum States {131 enum States {
124132
=== modified file 'dcpp/CriticalSection.h'
--- dcpp/CriticalSection.h 2009-02-23 01:47:25 +0000
+++ dcpp/CriticalSection.h 2010-08-15 19:35:53 +0000
@@ -19,7 +19,8 @@
19#if !defined(CRITICAL_SECTION_H)19#if !defined(CRITICAL_SECTION_H)
20#define CRITICAL_SECTION_H20#define CRITICAL_SECTION_H
2121
22#include "Thread.h"22// header-only implementation of mutex
23#include <boost/signals2/mutex.hpp>
2324
24namespace dcpp {25namespace dcpp {
2526
@@ -78,32 +79,11 @@
78 */79 */
79class FastCriticalSection {80class FastCriticalSection {
80public:81public:
81#ifdef _WIN3282 void enter() { mtx.lock(); }
82 FastCriticalSection() : state(0) { }83 void leave() { mtx.unlock(); }
8384private:
84 void enter() {85 typedef boost::signals2::mutex mutex_t;
85 while(Thread::safeExchange(state, 1) == 1) {86 mutex_t mtx;
86 Thread::yield();
87 }
88 }
89 void leave() {
90 Thread::safeDec(state);
91 }
92private:
93 volatile long state;
94
95#else
96 // We have to use a pthread (nonrecursive) mutex, didn't find any test_and_set on linux...
97 FastCriticalSection() {
98 static pthread_mutex_t fastmtx = PTHREAD_MUTEX_INITIALIZER;
99 mtx = fastmtx;
100 }
101 ~FastCriticalSection() { pthread_mutex_destroy(&mtx); }
102 void enter() { pthread_mutex_lock(&mtx); }
103 void leave() { pthread_mutex_unlock(&mtx); }
104private:
105 pthread_mutex_t mtx;
106#endif
107};87};
10888
109template<class T>89template<class T>
11090
=== modified file 'dcpp/Pointer.h'
--- dcpp/Pointer.h 2009-08-15 04:40:26 +0000
+++ dcpp/Pointer.h 2010-08-15 19:35:53 +0000
@@ -20,7 +20,7 @@
20#define DCPLUSPLUS_DCPP_POINTER_H20#define DCPLUSPLUS_DCPP_POINTER_H
2121
22#include <boost/intrusive_ptr.hpp>22#include <boost/intrusive_ptr.hpp>
23#include "Thread.h"23#include <boost/smart_ptr/detail/atomic_count.hpp>
2424
25namespace dcpp {25namespace dcpp {
2626
@@ -36,10 +36,10 @@
36 intrusive_ptr_base() throw() : ref(0) { }36 intrusive_ptr_base() throw() : ref(0) { }
3737
38private:38private:
39 friend void intrusive_ptr_add_ref(intrusive_ptr_base* p) { Thread::safeInc(p->ref); }39 friend void intrusive_ptr_add_ref(intrusive_ptr_base* p) { ++p->ref; }
40 friend void intrusive_ptr_release(intrusive_ptr_base* p) { if(Thread::safeDec(p->ref) == 0) { delete static_cast<T*>(p); } }40 friend void intrusive_ptr_release(intrusive_ptr_base* p) { if(--p->ref == 0) { delete static_cast<T*>(p); } }
4141
42 volatile long ref;42 boost::detail::atomic_count ref;
43};43};
4444
4545
4646
=== modified file 'dcpp/Semaphore.h'
--- dcpp/Semaphore.h 2009-02-23 01:47:25 +0000
+++ dcpp/Semaphore.h 2010-08-15 19:35:53 +0000
@@ -59,7 +59,7 @@
5959
60 bool wait() throw() {60 bool wait() throw() {
61 Lock l(cs);61 Lock l(cs);
62 if(count == 0) {62 while (count == 0) {
63 pthread_cond_wait(&cond, &cs.getMutex());63 pthread_cond_wait(&cond, &cs.getMutex());
64 }64 }
65 count--;65 count--;
@@ -74,7 +74,10 @@
74 millis+=timev.tv_usec/1000;74 millis+=timev.tv_usec/1000;
75 t.tv_sec = timev.tv_sec + (millis/1000);75 t.tv_sec = timev.tv_sec + (millis/1000);
76 t.tv_nsec = (millis%1000)*1000*1000;76 t.tv_nsec = (millis%1000)*1000*1000;
77 int ret = pthread_cond_timedwait(&cond, &cs.getMutex(), &t);77 int ret;
78 do {
79 ret = pthread_cond_timedwait(&cond, &cs.getMutex(), &t);
80 } while (ret==0 && count==0);
78 if(ret != 0) {81 if(ret != 0) {
79 return false;82 return false;
80 }83 }
8184
=== modified file 'dcpp/ShareManager.cpp'
--- dcpp/ShareManager.cpp 2009-12-27 22:03:53 +0000
+++ dcpp/ShareManager.cpp 2010-08-15 19:35:53 +0000
@@ -52,7 +52,7 @@
52namespace dcpp {52namespace dcpp {
5353
54ShareManager::ShareManager() : hits(0), xmlListLen(0), bzXmlListLen(0),54ShareManager::ShareManager() : hits(0), xmlListLen(0), bzXmlListLen(0),
55 xmlDirty(true), refreshDirs(false), update(false), initial(true), listN(0), refreshing(0),55 xmlDirty(true), refreshDirs(false), update(false), initial(true), listN(0), refreshing(false),
56 lastXmlUpdate(0), lastFullUpdate(GET_TICK()), bloom(1<<20)56 lastXmlUpdate(0), lastFullUpdate(GET_TICK()), bloom(1<<20)
57{57{
58 SettingsManager::getInstance()->addListener(this);58 SettingsManager::getInstance()->addListener(this);
@@ -812,7 +812,7 @@
812}812}
813813
814void ShareManager::refresh(bool dirs /* = false */, bool aUpdate /* = true */, bool block /* = false */) throw() {814void ShareManager::refresh(bool dirs /* = false */, bool aUpdate /* = true */, bool block /* = false */) throw() {
815 if(Thread::safeExchange(refreshing, 1) == 1) {815 if(refreshing.exchange(true) == true) {
816 LogManager::getInstance()->message(_("File list refresh in progress, please wait for it to finish before trying to refresh again"));816 LogManager::getInstance()->message(_("File list refresh in progress, please wait for it to finish before trying to refresh again"));
817 return;817 return;
818 }818 }
@@ -883,7 +883,7 @@
883 if(update) {883 if(update) {
884 ClientManager::getInstance()->infoUpdated();884 ClientManager::getInstance()->infoUpdated();
885 }885 }
886 refreshing = 0;886 refreshing = false;
887 return 0;887 return 0;
888}888}
889889
890890
=== modified file 'dcpp/ShareManager.h'
--- dcpp/ShareManager.h 2009-12-27 22:03:53 +0000
+++ dcpp/ShareManager.h 2010-08-15 19:35:53 +0000
@@ -33,6 +33,7 @@
33#include "FastAlloc.h"33#include "FastAlloc.h"
34#include "MerkleTree.h"34#include "MerkleTree.h"
35#include "Pointer.h"35#include "Pointer.h"
36#include "Atomic.h"
3637
37namespace dcpp {38namespace dcpp {
3839
@@ -249,7 +250,7 @@
249250
250 int listN;251 int listN;
251252
252 volatile long refreshing;253 Atomic<bool,memory_ordering_strong> refreshing;
253254
254 uint64_t lastXmlUpdate;255 uint64_t lastXmlUpdate;
255 uint64_t lastFullUpdate;256 uint64_t lastFullUpdate;
256257
=== modified file 'dcpp/Thread.cpp'
--- dcpp/Thread.cpp 2009-02-23 01:47:25 +0000
+++ dcpp/Thread.cpp 2010-08-15 19:35:53 +0000
@@ -23,10 +23,6 @@
2323
24namespace dcpp {24namespace dcpp {
2525
26#ifndef _WIN32
27pthread_mutex_t Thread::mtx = PTHREAD_MUTEX_INITIALIZER;
28#endif
29
30#ifdef _WIN3226#ifdef _WIN32
31void Thread::start() throw(ThreadException) {27void Thread::start() throw(ThreadException) {
32 join();28 join();
3329
=== modified file 'dcpp/Thread.h'
--- dcpp/Thread.h 2009-03-01 05:27:41 +0000
+++ dcpp/Thread.h 2010-08-15 19:35:53 +0000
@@ -21,7 +21,6 @@
2121
22#ifndef _WIN3222#ifndef _WIN32
23#include <pthread.h>23#include <pthread.h>
24#include <sched.h>
25#include <sys/resource.h>24#include <sys/resource.h>
26#endif25#endif
2726
@@ -62,18 +61,6 @@
62 void setThreadPriority(Priority p) throw() { ::SetThreadPriority(threadHandle, p); }61 void setThreadPriority(Priority p) throw() { ::SetThreadPriority(threadHandle, p); }
6362
64 static void sleep(uint32_t millis) { ::Sleep(millis); }63 static void sleep(uint32_t millis) { ::Sleep(millis); }
65 static void yield() { ::Sleep(1); }
66
67#ifdef __MINGW32__
68 static long safeInc(volatile long& v) { return InterlockedIncrement((long*)&v); }
69 static long safeDec(volatile long& v) { return InterlockedDecrement((long*)&v); }
70 static long safeExchange(volatile long& target, long value) { return InterlockedExchange((long*)&target, value); }
71
72#else
73 static long safeInc(volatile long& v) { return InterlockedIncrement(&v); }
74 static long safeDec(volatile long& v) { return InterlockedDecrement(&v); }
75 static long safeExchange(volatile long& target, long value) { return InterlockedExchange(&target, value); }
76#endif
7764
78#else65#else
7966
@@ -99,26 +86,6 @@
9986
100 void setThreadPriority(Priority p) { setpriority(PRIO_PROCESS, 0, p); }87 void setThreadPriority(Priority p) { setpriority(PRIO_PROCESS, 0, p); }
101 static void sleep(uint32_t millis) { ::usleep(millis*1000); }88 static void sleep(uint32_t millis) { ::usleep(millis*1000); }
102 static void yield() { ::sched_yield(); }
103 static long safeInc(volatile long& v) {
104 pthread_mutex_lock(&mtx);
105 long ret = ++v;
106 pthread_mutex_unlock(&mtx);
107 return ret;
108 }
109 static long safeDec(volatile long& v) {
110 pthread_mutex_lock(&mtx);
111 long ret = --v;
112 pthread_mutex_unlock(&mtx);
113 return ret;
114 }
115 static long safeExchange(volatile long& target, long value) {
116 pthread_mutex_lock(&mtx);
117 long ret = target;
118 target = value;
119 pthread_mutex_unlock(&mtx);
120 return ret;
121 }
122#endif89#endif
12390
124protected:91protected:
@@ -133,7 +100,6 @@
133 return 0;100 return 0;
134 }101 }
135#else102#else
136 static pthread_mutex_t mtx;
137 pthread_t threadHandle;103 pthread_t threadHandle;
138 static void* starter(void* p) {104 static void* starter(void* p) {
139 Thread* t = (Thread*)p;105 Thread* t = (Thread*)p;

Subscribers

People subscribed via source and target branches

to status/vote changes: