Merge lp:~vadim-tk/sysbench/inj-rate into lp:sysbench

Proposed by Vadim Tkachenko on 2012-02-25
Status: Needs review
Proposed branch: lp:~vadim-tk/sysbench/inj-rate
Merge into: lp:sysbench
Diff against target: 364 lines (+140/-47)
5 files modified
sysbench/db_driver.c (+6/-3)
sysbench/sb_timer.c (+2/-1)
sysbench/sb_timer.h (+1/-0)
sysbench/sysbench.c (+130/-43)
sysbench/sysbench.h (+1/-0)
To merge this branch: bzr merge lp:~vadim-tk/sysbench/inj-rate
Reviewer Review Type Date Requested Status
Alexey Kopytov 2012-02-25 Pending
Review via email: mp+94662@code.launchpad.net

Description of the change

This is a proof of concept rather than a final proposal.
I am submitting this merge proposal to seek feedback on what changes need to be made.

To post a comment you must log in.
lp:~vadim-tk/sysbench/inj-rate updated on 2012-02-27
110. By Vadim Tkachenko on 2012-02-27

start eventgen thread only if tx_rate specified

Unmerged revisions

110. By Vadim Tkachenko on 2012-02-27

start eventgen thread only if tx_rate specified

109. By Vadim Tkachenko on 2012-02-25

cleanup unused variables

108. By Vadim Tkachenko on 2012-02-25

added current concurrency counter

107. By Vadim Tkachenko on 2012-02-25

polishing print queue size

106. By Vadim Tkachenko on 2012-02-25

cleanup tx_rate mode and jitter

105. By Vadim Tkachenko on 2012-02-13

Syntax fixes

104. By Vadim Tkachenko on 2012-02-13

Count queue time in response times

103. By Vadim Tkachenko on 2012-02-12

Implement injection rate, queries produced from dedicated thread

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'sysbench/db_driver.c'
2--- sysbench/db_driver.c 2011-09-26 13:54:56 +0000
3+++ sysbench/db_driver.c 2012-02-27 21:16:17 +0000
4@@ -77,6 +77,9 @@
5 static void db_update_thread_stats(int, db_query_type_t);
6 static void db_reset_stats(void);
7
8+extern int queue_counter;
9+extern int current_concurrency;
10+
11 /* DB layer arguments */
12
13 static sb_arg_t db_args[] =
14@@ -830,15 +833,15 @@
15 seconds = NS2SEC(sb_timer_split(&sb_globals.exec_timer));
16
17 log_timestamp(LOG_NOTICE, &sb_globals.exec_timer,
18- "threads: %d, tps: %4.2f, reads/s: %4.2f, writes/s: %4.2f "
19- "response time: %4.2fms (%u%%)",
20+ "threads: %d, tps: %4.2f, reads/s: %4.2f, writes/s: %4.2f, "
21+ "response time: %4.2fms (%u%%), queue size: %d, concurrency: %d",
22 sb_globals.num_threads,
23 (transactions - last_transactions) / seconds,
24 (read_ops - last_read_ops) / seconds,
25 (write_ops - last_write_ops) / seconds,
26 NS2MS(sb_percentile_calculate(&local_percentile,
27 sb_globals.percentile_rank)),
28- sb_globals.percentile_rank);
29+ sb_globals.percentile_rank, queue_counter, current_concurrency);
30
31 SB_THREAD_MUTEX_LOCK();
32 last_transactions = transactions;
33
34=== modified file 'sysbench/sb_timer.c'
35--- sysbench/sb_timer.c 2011-07-21 16:45:22 +0000
36+++ sysbench/sb_timer.c 2012-02-27 21:16:17 +0000
37@@ -35,7 +35,7 @@
38 static inline void sb_timer_update(sb_timer_t *t)
39 {
40 SB_GETTIME(&t->time_end);
41- t->elapsed = TIMESPEC_DIFF(t->time_end, t->time_start);
42+ t->elapsed = TIMESPEC_DIFF(t->time_end, t->time_start) + t->queue_time;
43 }
44
45 /* initialize timer */
46@@ -59,6 +59,7 @@
47 t->sum_time = 0;
48 t->events = 0;
49 t->elapsed = 0;
50+ t->queue_time = 0;
51 }
52
53
54
55=== modified file 'sysbench/sb_timer.h'
56--- sysbench/sb_timer.h 2011-07-21 16:45:22 +0000
57+++ sysbench/sb_timer.h 2012-02-27 21:16:17 +0000
58@@ -77,6 +77,7 @@
59 unsigned long long max_time;
60 unsigned long long sum_time;
61 unsigned long long events;
62+ unsigned long long queue_time;
63 timer_state_t state;
64 } sb_timer_t;
65
66
67=== modified file 'sysbench/sysbench.c'
68--- sysbench/sysbench.c 2011-09-26 13:54:56 +0000
69+++ sysbench/sysbench.c 2012-02-27 21:16:17 +0000
70@@ -108,8 +108,6 @@
71 SB_ARG_TYPE_STRING, "off"},
72 {"thread-stack-size", "size of stack per thread", SB_ARG_TYPE_SIZE, "64K"},
73 {"tx-rate", "target transaction rate (tps)", SB_ARG_TYPE_INT, "0"},
74- {"tx-jitter", "target transaction variation, in microseconds",
75- SB_ARG_TYPE_INT, "0"},
76 {"report-interval", "periodically report intermediate statistics "
77 "with a specified interval in seconds. 0 disables intermediate reports",
78 SB_ARG_TYPE_INT, "0"},
79@@ -151,6 +149,15 @@
80 static pthread_mutex_t thread_start_mutex;
81 static pthread_attr_t thread_attr;
82
83+/* structures to handle queue of events, needed for tx_rate mode */
84+static pthread_mutex_t queue_mutex;
85+static pthread_cond_t queue_cv;
86+int queue_counter = 0;
87+int current_concurrency = 0;
88+static int queue_is_full = 0;
89+#define MAX_QUEUE_LEN 100000
90+static unsigned long long queue_data[MAX_QUEUE_LEN];
91+
92 static void print_header(void);
93 static void print_usage(void);
94 static void print_run_mode(sb_test_t *);
95@@ -199,6 +206,8 @@
96 static int execute_request(sb_test_t *test, sb_request_t *r,int thread_id)
97 {
98 unsigned int rc;
99+
100+
101
102 if (test->ops.execute_request != NULL)
103 rc = test->ops.execute_request(r, thread_id);
104@@ -361,8 +370,7 @@
105 if (sb_globals.tx_rate > 0)
106 {
107 log_text(LOG_NOTICE,
108- "Target transaction rate: %d/sec, with jitter %d usec",
109- sb_globals.tx_rate, sb_globals.tx_jitter);
110+ "Target transaction rate: %d/sec", sb_globals.tx_rate);
111 }
112
113 if (sb_globals.report_interval)
114@@ -434,15 +442,13 @@
115 sb_thread_ctxt_t *ctxt;
116 sb_test_t *test;
117 unsigned int thread_id;
118- long long period_ns = 0;
119- long long jitter_ns = 0;
120- long long pause_ns;
121- struct timespec target_tv, now_tv;
122-
123+ int queue_loop;
124+ unsigned long long queue_start_time;
125+
126 ctxt = (sb_thread_ctxt_t *)arg;
127 test = ctxt->test;
128 thread_id = ctxt->id;
129-
130+
131 log_text(LOG_DEBUG, "Runner thread started (%d)!", thread_id);
132 if (test->ops.thread_init != NULL && test->ops.thread_init(thread_id) != 0)
133 {
134@@ -450,17 +456,6 @@
135 return NULL; /* thread initialization failed */
136 }
137
138- if (sb_globals.tx_rate > 0)
139- {
140- /* initialize tx_rate variables */
141- period_ns = floor(1e9 / sb_globals.tx_rate * sb_globals.num_threads + 0.5);
142- if (sb_globals.tx_jitter > 0)
143- jitter_ns = sb_globals.tx_jitter * 1000;
144- else
145- /* Default jitter is 1/10th of the period */
146- jitter_ns = period_ns / 10;
147- }
148-
149 /*
150 We do this to make sure all threads get to this barrier
151 about the same time
152@@ -469,25 +464,54 @@
153 sb_globals.num_running++;
154 pthread_mutex_unlock(&thread_start_mutex);
155
156- if (sb_globals.tx_rate > 0)
157- {
158- /* we are time-rating transactions */
159- SB_GETTIME(&target_tv);
160- /* For the first transaction - ramp up */
161- pause_ns = period_ns / sb_globals.num_threads * thread_id;
162- add_ns_to_timespec(&target_tv, period_ns);
163- usleep(pause_ns / 1000);
164- }
165-
166 do
167 {
168+
169+ /* If we are in tx_rate mode, we take events from queue */
170+ if (sb_globals.tx_rate > 0)
171+ {
172+ if (queue_is_full)
173+ {
174+ log_errno(LOG_FATAL, "Event queue is full.");
175+ break;
176+ }
177+ pthread_mutex_lock (&queue_mutex);
178+ while(!queue_counter)
179+ pthread_cond_wait (&queue_cv, &queue_mutex);
180+
181+ queue_start_time = queue_data[0];
182+ /* This is probably a quite uneffective way to handle queue,
183+ may need to use copy() function */
184+
185+ for (queue_loop=0; queue_loop < queue_counter; queue_loop++)
186+ queue_data[queue_loop] = queue_data[queue_loop+1];
187+
188+ queue_counter--;
189+
190+ pthread_mutex_unlock(&queue_mutex);
191+
192+ (&timers[thread_id])->queue_time = sb_timer_value(&sb_globals.exec_timer) - queue_start_time;
193+
194+ /* we do it without mutex protection, that's fine to have racing */
195+ current_concurrency++;
196+ }
197+
198+
199 request = get_request(test, thread_id);
200+
201 /* check if we shall execute it */
202 if (request.type != SB_REQ_TYPE_NULL)
203 {
204 if (execute_request(test, &request, thread_id))
205 break; /* break if error returned (terminates only one thread) */
206 }
207+
208+ if (sb_globals.tx_rate > 0)
209+ {
210+ /* we do it without mutex protection, that's fine to have racing */
211+ current_concurrency--;
212+ }
213+
214 /* Check if we have a time limit */
215 if (sb_globals.max_time != 0 &&
216 sb_timer_value(&sb_globals.exec_timer) >= SEC2NS(sb_globals.max_time))
217@@ -496,17 +520,6 @@
218 break;
219 }
220
221- /* check if we are time-rating transactions and need to pause */
222- if (sb_globals.tx_rate > 0)
223- {
224- add_ns_to_timespec(&target_tv, period_ns);
225- SB_GETTIME(&now_tv);
226- pause_ns = TIMESPEC_DIFF(target_tv, now_tv) - (jitter_ns / 2) +
227- (sb_rnd() % jitter_ns);
228- if (pause_ns > 5000)
229- usleep(pause_ns / 1000);
230- }
231-
232 } while ((request.type != SB_REQ_TYPE_NULL) && (!sb_globals.error) );
233
234 if (test->ops.thread_done != NULL)
235@@ -519,6 +532,60 @@
236 return NULL;
237 }
238
239+static void *eventgen_thread_proc(void *arg)
240+{
241+ unsigned long long pause_ns;
242+ unsigned long long prev_ns;
243+ unsigned long long next_ns;
244+ unsigned long long curr_ns;
245+ unsigned long long intr_ns;
246+
247+ (void)arg; /* unused */
248+
249+ log_text(LOG_DEBUG, "Event generating thread started");
250+
251+ pthread_mutex_lock(&thread_start_mutex);
252+ pthread_mutex_unlock(&thread_start_mutex);
253+
254+ curr_ns = sb_timer_value(&sb_globals.exec_timer);
255+ /* emulate exponential distribution with Lambda = tx_rate */
256+ intr_ns = (long)(log(1-(double)sb_rnd() / (double)SB_MAX_RND)/(-(double)sb_globals.tx_rate)*1000000);
257+ next_ns = curr_ns + intr_ns*1000;
258+
259+ for (;;)
260+ {
261+ prev_ns = curr_ns;
262+
263+ curr_ns = sb_timer_value(&sb_globals.exec_timer);
264+
265+ /* emulate exponential distribution with Lambda = tx_rate */
266+ intr_ns = (long)(log(1-(double)sb_rnd() / (double)SB_MAX_RND)/(-(double)sb_globals.tx_rate)*1000000);
267+
268+ next_ns = next_ns + intr_ns*1000;
269+ if (next_ns > curr_ns)
270+ pause_ns = next_ns - curr_ns;
271+ else
272+ pause_ns = 1000;
273+
274+ usleep(pause_ns/1000);
275+
276+ pthread_mutex_lock(&queue_mutex);
277+ queue_data[queue_counter]=sb_timer_value(&sb_globals.exec_timer);
278+ queue_counter++;
279+ if (queue_counter >= MAX_QUEUE_LEN)
280+ queue_is_full = 1;
281+ pthread_cond_signal(&queue_cv);
282+ pthread_mutex_unlock(&queue_mutex);
283+
284+ if (queue_is_full)
285+ {
286+ log_errno(LOG_FATAL, "Event queue is full.");
287+ return NULL;
288+ }
289+ }
290+
291+ return NULL;
292+}
293
294 /* Intermediate reports thread */
295
296@@ -627,6 +694,7 @@
297 int err;
298 pthread_t report_thread;
299 pthread_t checkpoints_thread;
300+ pthread_t eventgen_thread;
301 int report_thread_created = 0;
302 int checkpoints_thread_created = 0;
303
304@@ -652,6 +720,12 @@
305 return 1;
306
307 pthread_mutex_init(&sb_globals.exec_mutex, NULL);
308+
309+
310+ pthread_mutex_init(&queue_mutex, NULL);
311+ pthread_cond_init(&queue_cv, NULL);
312+ queue_counter = 0;
313+ queue_is_full = 0;
314
315 /* start mutex used for barrier */
316 pthread_mutex_init(&thread_start_mutex,NULL);
317@@ -686,6 +760,16 @@
318 report_thread_created = 1;
319 }
320
321+ if (sb_globals.tx_rate > 0)
322+ {
323+ if ((err = pthread_create(&eventgen_thread, &thread_attr, &eventgen_thread_proc,
324+ NULL)) != 0)
325+ {
326+ log_errno(LOG_FATAL, "pthread_create() for the reporting thread failed.");
327+ return 1;
328+ }
329+ }
330+
331 if (sb_globals.n_checkpoints > 0)
332 {
333 /* Create a thread for checkpoint statistic reports */
334@@ -768,6 +852,10 @@
335 if (pthread_cancel(report_thread) || pthread_join(report_thread, NULL))
336 log_errno(LOG_FATAL, "Terminating the reporting thread failed.");
337 }
338+
339+ if (pthread_cancel(eventgen_thread) || pthread_join(eventgen_thread, NULL))
340+ log_errno(LOG_FATAL, "Terminating the event generator thread failed.");
341+
342 if (checkpoints_thread_created)
343 {
344 if (pthread_cancel(checkpoints_thread) ||
345@@ -914,7 +1002,6 @@
346 rand_res = sb_get_value_int("rand-spec-res");
347
348 sb_globals.tx_rate = sb_get_value_int("tx-rate");
349- sb_globals.tx_jitter = sb_get_value_int("tx-jitter");
350 sb_globals.report_interval = sb_get_value_int("report-interval");
351
352 sb_globals.n_checkpoints = 0;
353
354=== modified file 'sysbench/sysbench.h'
355--- sysbench/sysbench.h 2011-07-21 17:02:14 +0000
356+++ sysbench/sysbench.h 2012-02-27 21:16:17 +0000
357@@ -97,6 +97,7 @@
358 {
359 int type;
360 struct sb_test_t *test;
361+ unsigned long long start_time_queue; /* bad hack, need to look how to fix */
362
363 /* type-specific data */
364 union

Subscribers

People subscribed via source and target branches

to status/vote changes: