Merge lp:~vadim-tk/sysbench/inj-rate into lp:sysbench

Proposed by Vadim Tkachenko
Status: Needs review
Proposed branch: lp:~vadim-tk/sysbench/inj-rate
Merge into: lp:sysbench
Diff against target: 364 lines (+140/-47)
5 files modified
sysbench/db_driver.c (+6/-3)
sysbench/sb_timer.c (+2/-1)
sysbench/sb_timer.h (+1/-0)
sysbench/sysbench.c (+130/-43)
sysbench/sysbench.h (+1/-0)
To merge this branch: bzr merge lp:~vadim-tk/sysbench/inj-rate
Reviewer Review Type Date Requested Status
Alexey Kopytov Pending
Review via email: mp+94662@code.launchpad.net

Description of the change

This is a proof of concept rather than a final proposal.
I am submitting this merge proposal to seek feedback on what changes need to be made.

To post a comment you must log in.
lp:~vadim-tk/sysbench/inj-rate updated
110. By Vadim Tkachenko

start eventgen thread only if tx_rate specified

Unmerged revisions

110. By Vadim Tkachenko

start eventgen thread only if tx_rate specified

109. By Vadim Tkachenko

cleanup unused variables

108. By Vadim Tkachenko

added current concurrency counter

107. By Vadim Tkachenko

polishing print queue size

106. By Vadim Tkachenko

cleanup tx_rate mode and jitter

105. By Vadim Tkachenko

Syntax fixes

104. By Vadim Tkachenko

Count queue time in response times

103. By Vadim Tkachenko

Implement injection rate, queries produced from dedicated thread

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'sysbench/db_driver.c'
2--- sysbench/db_driver.c 2011-09-26 13:54:56 +0000
3+++ sysbench/db_driver.c 2012-02-27 21:16:17 +0000
4@@ -77,6 +77,9 @@
5 static void db_update_thread_stats(int, db_query_type_t);
6 static void db_reset_stats(void);
7
8+extern int queue_counter;
9+extern int current_concurrency;
10+
11 /* DB layer arguments */
12
13 static sb_arg_t db_args[] =
14@@ -830,15 +833,15 @@
15 seconds = NS2SEC(sb_timer_split(&sb_globals.exec_timer));
16
17 log_timestamp(LOG_NOTICE, &sb_globals.exec_timer,
18- "threads: %d, tps: %4.2f, reads/s: %4.2f, writes/s: %4.2f "
19- "response time: %4.2fms (%u%%)",
20+ "threads: %d, tps: %4.2f, reads/s: %4.2f, writes/s: %4.2f, "
21+ "response time: %4.2fms (%u%%), queue size: %d, concurrency: %d",
22 sb_globals.num_threads,
23 (transactions - last_transactions) / seconds,
24 (read_ops - last_read_ops) / seconds,
25 (write_ops - last_write_ops) / seconds,
26 NS2MS(sb_percentile_calculate(&local_percentile,
27 sb_globals.percentile_rank)),
28- sb_globals.percentile_rank);
29+ sb_globals.percentile_rank, queue_counter, current_concurrency);
30
31 SB_THREAD_MUTEX_LOCK();
32 last_transactions = transactions;
33
34=== modified file 'sysbench/sb_timer.c'
35--- sysbench/sb_timer.c 2011-07-21 16:45:22 +0000
36+++ sysbench/sb_timer.c 2012-02-27 21:16:17 +0000
37@@ -35,7 +35,7 @@
38 static inline void sb_timer_update(sb_timer_t *t)
39 {
40 SB_GETTIME(&t->time_end);
41- t->elapsed = TIMESPEC_DIFF(t->time_end, t->time_start);
42+ t->elapsed = TIMESPEC_DIFF(t->time_end, t->time_start) + t->queue_time;
43 }
44
45 /* initialize timer */
46@@ -59,6 +59,7 @@
47 t->sum_time = 0;
48 t->events = 0;
49 t->elapsed = 0;
50+ t->queue_time = 0;
51 }
52
53
54
55=== modified file 'sysbench/sb_timer.h'
56--- sysbench/sb_timer.h 2011-07-21 16:45:22 +0000
57+++ sysbench/sb_timer.h 2012-02-27 21:16:17 +0000
58@@ -77,6 +77,7 @@
59 unsigned long long max_time;
60 unsigned long long sum_time;
61 unsigned long long events;
62+ unsigned long long queue_time;
63 timer_state_t state;
64 } sb_timer_t;
65
66
67=== modified file 'sysbench/sysbench.c'
68--- sysbench/sysbench.c 2011-09-26 13:54:56 +0000
69+++ sysbench/sysbench.c 2012-02-27 21:16:17 +0000
70@@ -108,8 +108,6 @@
71 SB_ARG_TYPE_STRING, "off"},
72 {"thread-stack-size", "size of stack per thread", SB_ARG_TYPE_SIZE, "64K"},
73 {"tx-rate", "target transaction rate (tps)", SB_ARG_TYPE_INT, "0"},
74- {"tx-jitter", "target transaction variation, in microseconds",
75- SB_ARG_TYPE_INT, "0"},
76 {"report-interval", "periodically report intermediate statistics "
77 "with a specified interval in seconds. 0 disables intermediate reports",
78 SB_ARG_TYPE_INT, "0"},
79@@ -151,6 +149,15 @@
80 static pthread_mutex_t thread_start_mutex;
81 static pthread_attr_t thread_attr;
82
83+/* structures to handle queue of events, needed for tx_rate mode */
84+static pthread_mutex_t queue_mutex;
85+static pthread_cond_t queue_cv;
86+int queue_counter = 0;
87+int current_concurrency = 0;
88+static int queue_is_full = 0;
89+#define MAX_QUEUE_LEN 100000
90+static unsigned long long queue_data[MAX_QUEUE_LEN];
91+
92 static void print_header(void);
93 static void print_usage(void);
94 static void print_run_mode(sb_test_t *);
95@@ -199,6 +206,8 @@
96 static int execute_request(sb_test_t *test, sb_request_t *r,int thread_id)
97 {
98 unsigned int rc;
99+
100+
101
102 if (test->ops.execute_request != NULL)
103 rc = test->ops.execute_request(r, thread_id);
104@@ -361,8 +370,7 @@
105 if (sb_globals.tx_rate > 0)
106 {
107 log_text(LOG_NOTICE,
108- "Target transaction rate: %d/sec, with jitter %d usec",
109- sb_globals.tx_rate, sb_globals.tx_jitter);
110+ "Target transaction rate: %d/sec", sb_globals.tx_rate);
111 }
112
113 if (sb_globals.report_interval)
114@@ -434,15 +442,13 @@
115 sb_thread_ctxt_t *ctxt;
116 sb_test_t *test;
117 unsigned int thread_id;
118- long long period_ns = 0;
119- long long jitter_ns = 0;
120- long long pause_ns;
121- struct timespec target_tv, now_tv;
122-
123+ int queue_loop;
124+ unsigned long long queue_start_time;
125+
126 ctxt = (sb_thread_ctxt_t *)arg;
127 test = ctxt->test;
128 thread_id = ctxt->id;
129-
130+
131 log_text(LOG_DEBUG, "Runner thread started (%d)!", thread_id);
132 if (test->ops.thread_init != NULL && test->ops.thread_init(thread_id) != 0)
133 {
134@@ -450,17 +456,6 @@
135 return NULL; /* thread initialization failed */
136 }
137
138- if (sb_globals.tx_rate > 0)
139- {
140- /* initialize tx_rate variables */
141- period_ns = floor(1e9 / sb_globals.tx_rate * sb_globals.num_threads + 0.5);
142- if (sb_globals.tx_jitter > 0)
143- jitter_ns = sb_globals.tx_jitter * 1000;
144- else
145- /* Default jitter is 1/10th of the period */
146- jitter_ns = period_ns / 10;
147- }
148-
149 /*
150 We do this to make sure all threads get to this barrier
151 about the same time
152@@ -469,25 +464,54 @@
153 sb_globals.num_running++;
154 pthread_mutex_unlock(&thread_start_mutex);
155
156- if (sb_globals.tx_rate > 0)
157- {
158- /* we are time-rating transactions */
159- SB_GETTIME(&target_tv);
160- /* For the first transaction - ramp up */
161- pause_ns = period_ns / sb_globals.num_threads * thread_id;
162- add_ns_to_timespec(&target_tv, period_ns);
163- usleep(pause_ns / 1000);
164- }
165-
166 do
167 {
168+
169+ /* If we are in tx_rate mode, we take events from queue */
170+ if (sb_globals.tx_rate > 0)
171+ {
172+ if (queue_is_full)
173+ {
174+ log_errno(LOG_FATAL, "Event queue is full.");
175+ break;
176+ }
177+ pthread_mutex_lock (&queue_mutex);
178+ while(!queue_counter)
179+ pthread_cond_wait (&queue_cv, &queue_mutex);
180+
181+ queue_start_time = queue_data[0];
182+ /* This is probably a quite uneffective way to handle queue,
183+ may need to use copy() function */
184+
185+ for (queue_loop=0; queue_loop < queue_counter; queue_loop++)
186+ queue_data[queue_loop] = queue_data[queue_loop+1];
187+
188+ queue_counter--;
189+
190+ pthread_mutex_unlock(&queue_mutex);
191+
192+ (&timers[thread_id])->queue_time = sb_timer_value(&sb_globals.exec_timer) - queue_start_time;
193+
194+ /* we do it without mutex protection, that's fine to have racing */
195+ current_concurrency++;
196+ }
197+
198+
199 request = get_request(test, thread_id);
200+
201 /* check if we shall execute it */
202 if (request.type != SB_REQ_TYPE_NULL)
203 {
204 if (execute_request(test, &request, thread_id))
205 break; /* break if error returned (terminates only one thread) */
206 }
207+
208+ if (sb_globals.tx_rate > 0)
209+ {
210+ /* we do it without mutex protection, that's fine to have racing */
211+ current_concurrency--;
212+ }
213+
214 /* Check if we have a time limit */
215 if (sb_globals.max_time != 0 &&
216 sb_timer_value(&sb_globals.exec_timer) >= SEC2NS(sb_globals.max_time))
217@@ -496,17 +520,6 @@
218 break;
219 }
220
221- /* check if we are time-rating transactions and need to pause */
222- if (sb_globals.tx_rate > 0)
223- {
224- add_ns_to_timespec(&target_tv, period_ns);
225- SB_GETTIME(&now_tv);
226- pause_ns = TIMESPEC_DIFF(target_tv, now_tv) - (jitter_ns / 2) +
227- (sb_rnd() % jitter_ns);
228- if (pause_ns > 5000)
229- usleep(pause_ns / 1000);
230- }
231-
232 } while ((request.type != SB_REQ_TYPE_NULL) && (!sb_globals.error) );
233
234 if (test->ops.thread_done != NULL)
235@@ -519,6 +532,60 @@
236 return NULL;
237 }
238
239+static void *eventgen_thread_proc(void *arg)
240+{
241+ unsigned long long pause_ns;
242+ unsigned long long prev_ns;
243+ unsigned long long next_ns;
244+ unsigned long long curr_ns;
245+ unsigned long long intr_ns;
246+
247+ (void)arg; /* unused */
248+
249+ log_text(LOG_DEBUG, "Event generating thread started");
250+
251+ pthread_mutex_lock(&thread_start_mutex);
252+ pthread_mutex_unlock(&thread_start_mutex);
253+
254+ curr_ns = sb_timer_value(&sb_globals.exec_timer);
255+ /* emulate exponential distribution with Lambda = tx_rate */
256+ intr_ns = (long)(log(1-(double)sb_rnd() / (double)SB_MAX_RND)/(-(double)sb_globals.tx_rate)*1000000);
257+ next_ns = curr_ns + intr_ns*1000;
258+
259+ for (;;)
260+ {
261+ prev_ns = curr_ns;
262+
263+ curr_ns = sb_timer_value(&sb_globals.exec_timer);
264+
265+ /* emulate exponential distribution with Lambda = tx_rate */
266+ intr_ns = (long)(log(1-(double)sb_rnd() / (double)SB_MAX_RND)/(-(double)sb_globals.tx_rate)*1000000);
267+
268+ next_ns = next_ns + intr_ns*1000;
269+ if (next_ns > curr_ns)
270+ pause_ns = next_ns - curr_ns;
271+ else
272+ pause_ns = 1000;
273+
274+ usleep(pause_ns/1000);
275+
276+ pthread_mutex_lock(&queue_mutex);
277+ queue_data[queue_counter]=sb_timer_value(&sb_globals.exec_timer);
278+ queue_counter++;
279+ if (queue_counter >= MAX_QUEUE_LEN)
280+ queue_is_full = 1;
281+ pthread_cond_signal(&queue_cv);
282+ pthread_mutex_unlock(&queue_mutex);
283+
284+ if (queue_is_full)
285+ {
286+ log_errno(LOG_FATAL, "Event queue is full.");
287+ return NULL;
288+ }
289+ }
290+
291+ return NULL;
292+}
293
294 /* Intermediate reports thread */
295
296@@ -627,6 +694,7 @@
297 int err;
298 pthread_t report_thread;
299 pthread_t checkpoints_thread;
300+ pthread_t eventgen_thread;
301 int report_thread_created = 0;
302 int checkpoints_thread_created = 0;
303
304@@ -652,6 +720,12 @@
305 return 1;
306
307 pthread_mutex_init(&sb_globals.exec_mutex, NULL);
308+
309+
310+ pthread_mutex_init(&queue_mutex, NULL);
311+ pthread_cond_init(&queue_cv, NULL);
312+ queue_counter = 0;
313+ queue_is_full = 0;
314
315 /* start mutex used for barrier */
316 pthread_mutex_init(&thread_start_mutex,NULL);
317@@ -686,6 +760,16 @@
318 report_thread_created = 1;
319 }
320
321+ if (sb_globals.tx_rate > 0)
322+ {
323+ if ((err = pthread_create(&eventgen_thread, &thread_attr, &eventgen_thread_proc,
324+ NULL)) != 0)
325+ {
326+ log_errno(LOG_FATAL, "pthread_create() for the reporting thread failed.");
327+ return 1;
328+ }
329+ }
330+
331 if (sb_globals.n_checkpoints > 0)
332 {
333 /* Create a thread for checkpoint statistic reports */
334@@ -768,6 +852,10 @@
335 if (pthread_cancel(report_thread) || pthread_join(report_thread, NULL))
336 log_errno(LOG_FATAL, "Terminating the reporting thread failed.");
337 }
338+
339+ if (pthread_cancel(eventgen_thread) || pthread_join(eventgen_thread, NULL))
340+ log_errno(LOG_FATAL, "Terminating the event generator thread failed.");
341+
342 if (checkpoints_thread_created)
343 {
344 if (pthread_cancel(checkpoints_thread) ||
345@@ -914,7 +1002,6 @@
346 rand_res = sb_get_value_int("rand-spec-res");
347
348 sb_globals.tx_rate = sb_get_value_int("tx-rate");
349- sb_globals.tx_jitter = sb_get_value_int("tx-jitter");
350 sb_globals.report_interval = sb_get_value_int("report-interval");
351
352 sb_globals.n_checkpoints = 0;
353
354=== modified file 'sysbench/sysbench.h'
355--- sysbench/sysbench.h 2011-07-21 17:02:14 +0000
356+++ sysbench/sysbench.h 2012-02-27 21:16:17 +0000
357@@ -97,6 +97,7 @@
358 {
359 int type;
360 struct sb_test_t *test;
361+ unsigned long long start_time_queue; /* bad hack, need to look how to fix */
362
363 /* type-specific data */
364 union

Subscribers

People subscribed via source and target branches

to status/vote changes: