Merge lp:~fpstovall/nrtb/unique_ptr_fix into lp:nrtb

Proposed by Rick Stovall
Status: Merged
Approved by: Rick Stovall
Approved revision: 21
Merge reported by: Rick Stovall
Merged at revision: not available
Proposed branch: lp:~fpstovall/nrtb/unique_ptr_fix
Merge into: lp:nrtb
Diff against target: 1067 lines (+405/-334)
7 files modified
common/sockets/Makefile (+3/-3)
common/sockets/base_socket.cpp (+97/-112)
common/sockets/base_socket.h (+2/-2)
common/sockets/socket_test.cpp (+98/-80)
common/transceiver/Makefile (+2/-2)
common/transceiver/transceiver.h (+44/-36)
common/transceiver/transceiver_test.cpp (+159/-99)
To merge this branch: bzr merge lp:~fpstovall/nrtb/unique_ptr_fix
Reviewer Review Type Date Requested Status
Rick Stovall Approve
Review via email: mp+100340@code.launchpad.net

Commit message

First C++11 update; using std::unique_ptr for socket handling.

Description of the change

First of the changes to bring the code up to the C++11 spec, particularly focused on replacing boost::shared_ptr with std::unique_ptr where appropriate.

This merge includes significant updates to the sockets and transceiver libs, improving stability and reporting, and enforcing the logical constraint that a socket can have only one owner and that code which accepts a connection from the tcp_server socket factory must take actual ownership of the new socket. Fixes were also added which allowed the removal of arbitrary time delays in the associated unit test programs.

This will be the first of several merges assuming the move to C++11 is approved.

To post a comment you must log in.
Revision history for this message
Rick Stovall (fpstovall) wrote :

Merge test complete. As there have been no comments or responses from the team, I'll be moving the code to the alpha stream. We need to be using the -std=gnu++0x switch on all compiles and links from this point on.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'common/sockets/Makefile'
2--- common/sockets/Makefile 2011-09-17 01:21:36 +0000
3+++ common/sockets/Makefile 2012-04-01 15:57:19 +0000
4@@ -24,13 +24,13 @@
5
6 socket_test: base_socket.o socket_test.cpp
7 @rm -f socket_test
8- g++ -c -O3 socket_test.cpp -I ../include
9- g++ -o socket_test socket_test.o base_socket.o ../obj/hires_timer.o ../obj/common.o ../obj/base_thread.o -lpthread
10+ g++ -c -O3 socket_test.cpp -I ../include -std=gnu++0x
11+ g++ -o socket_test socket_test.o base_socket.o ../obj/hires_timer.o ../obj/common.o ../obj/base_thread.o -lpthread -std=gnu++0x
12
13
14 base_socket.o: base_socket.cpp base_socket.h Makefile
15 @rm -f base_socket.o
16- g++ -c -O3 base_socket.cpp -I ../include
17+ g++ -c -O3 base_socket.cpp -I ../include -std=gnu++0x
18
19 clean:
20 @rm -vf *.o ../include/base_socket.h socket_test
21
22=== modified file 'common/sockets/base_socket.cpp'
23--- common/sockets/base_socket.cpp 2011-09-17 01:21:36 +0000
24+++ common/sockets/base_socket.cpp 2012-04-01 15:57:19 +0000
25@@ -605,32 +605,21 @@
26 // take action only if the listen thread is running.
27 if (listening())
28 {
29- // stop the listener thread
30- if (is_running()) stop();
31- // wait here until the thread stops.
32- if (is_running()) join();
33-// try
34-// {
35-// if (listen_sock) close(listen_sock);
36-// }
37-// catch (...) {};
38+ // stop the listener thread
39+ stop();
40+ join();
41 };
42 };
43
44 bool tcp_server_socket_factory::listening()
45 {
46- bool running = is_running();
47-/* if (!running)
48- {
49- // check to be sure the thread did not die due to an error.
50- if (_last_thread_fault != 0)
51- {
52- // if thread_return was non-zero, it is assumed the thread died an
53- // evil and useless death. Scream in anger!
54- throw listen_terminated_exception();
55- };
56- };
57-*/ return running;
58+ bool running = is_running();
59+ return running;
60+};
61+
62+int tcp_server_socket_factory::last_fault()
63+{
64+ return _last_thread_fault;
65 };
66
67 unsigned short int tcp_server_socket_factory::backlog()
68@@ -643,108 +632,104 @@
69 {
70 std::cerr << "in thread cleanup sock closer" << std::endl;
71 int & socket = *(static_cast<int*>(sock));
72- ::close(socket);
73+ try { ::close(socket); } catch (...) {};
74 std::cerr << "socker closer done." << std::endl;
75 };
76
77 void tcp_server_socket_factory::run()
78 {
79- /* Put this entire thing in a try block to protect the application.
80- * Without this, an untrapped exception thrown here or in the
81- * user supplied on_accept() method would abort the entire
82- * application instead of just this
83- * thread.
84- */
85+ // set up the listening socket.
86 int listen_sock;
87- // make sure the listener is closed when we exit.
88- pthread_cleanup_push(closeme, (void*) &listen_sock);
89+ _last_thread_fault = 0;
90+ bool go = false;
91 try
92 {
93- bool go = true;
94- // set up our listening socket.
95- listen_sock = socket(AF_INET,SOCK_STREAM,0);
96- sockaddr_in myaddr;
97- try
98- {
99- myaddr = tcp_socket::str_to_sockaddr(_address);
100- }
101- catch (...)
102- {
103- // probably a tcp_socket::bad_address_exception,
104- // but any reason will do.
105- go = false;
106- };
107- if (bind(listen_sock,(sockaddr *) &myaddr,sizeof(myaddr)))
108- {
109- // bind did not work.
110- go = false;
111- };
112- if (listen(listen_sock,_backlog))
113- {
114- // listen failed in some way.. I don't care which.
115- go = false;
116- };
117- // processing loop
118- while (go)
119- {
120- // accept a new connection
121- bool good_connect = true;
122- int new_conn = accept(listen_sock,NULL,NULL);
123- // validate the accept return value.
124- if (new_conn == -1)
125- {
126- // accept returned an error.
127- switch (errno)
128- {
129-// case ENETDOWN :
130- case EPROTO :
131-// case ENOPROTOOPT :
132- case EHOSTDOWN :
133-// case ENONET :
134- case EHOSTUNREACH :
135-// case EOPNOTSUPP :
136-// case ENETUNREACH :
137- case EAGAIN :
138-// case EPERM :
139- case ECONNABORTED :
140- {
141- good_connect = false;
142- break;
143- };
144- default :
145- {
146- // for any other error, we're going to shutdown the
147- // this listener thread.
148- go = false;
149- good_connect = false;
150- _last_thread_fault = errno;
151- break;
152- };
153- }; // switch (errno)
154- }; // error thrown by accept.
155- if (good_connect)
156- {
157- // create connect_sock
158- connect_sock.reset(new tcp_socket(new_conn));
159- // make the thread easily cancelable.
160- set_cancel_anytime();
161- // call on_accept
162- go = on_accept();
163- // set back to cancel_deferred.
164- set_deferred_cancel();
165- // release our claim to the new socket
166- connect_sock.reset();
167- };
168- }; // while go;
169+ listen_sock = socket(AF_INET,SOCK_STREAM,0);
170+ sockaddr_in myaddr;
171+ myaddr = tcp_socket::str_to_sockaddr(_address);
172+ int a = bind(listen_sock,(sockaddr *) &myaddr,sizeof(myaddr));
173+ int b = listen(listen_sock,_backlog);
174+ if (a || b)
175+ {
176+ go = false;
177+ if (a) _last_thread_fault += 1;
178+ if (b) _last_thread_fault += 2;
179+ }
180+ else go = true;
181 }
182 catch (...)
183 {
184- /* an untrapped exception was thrown by someone in this thread.
185- * We'll shutdown this thread and put -1 in the thread_return field
186- * to let the world know that we don't know what killed us.
187- */
188- _last_thread_fault = -1;
189- };
190+ _last_thread_fault = 100;
191+ };
192+ // if not in a good state, terminate the thread.
193+ if (!go)
194+ {
195+ _last_thread_fault =+ 200;
196+ exit(0);
197+ };
198+ // make sure the listener is closed when we exit.
199+ pthread_cleanup_push(closeme, (void*) &listen_sock);
200+ // processing loop
201+ while (go)
202+ {
203+ // accept a new connection
204+ bool good_connect = true;
205+ int new_conn = accept(listen_sock,NULL,NULL);
206+ // validate the accept return value.
207+ if (new_conn == -1)
208+ {
209+ // accept returned an error.
210+ switch (errno)
211+ {
212+ case EPROTO :
213+ case EHOSTDOWN :
214+ case EHOSTUNREACH :
215+ case EAGAIN :
216+ case ECONNABORTED :
217+ {
218+ // abandon this connection
219+ good_connect = false;
220+ break;
221+ };
222+ default :
223+ {
224+ // for any other error, we're going to shutdown the
225+ // this listener thread.
226+ go = false;
227+ good_connect = false;
228+ _last_thread_fault = errno;
229+ break;
230+ };
231+ }; // switch (errno)
232+ }; // error thrown by accept.
233+ if (good_connect)
234+ {
235+ connect_sock.reset(new tcp_socket(new_conn));
236+ set_cancel_anytime();
237+ // call the connection handler.
238+ try
239+ {
240+ go = on_accept();
241+ }
242+ catch (...)
243+ {
244+ go = false;
245+ _last_thread_fault = 501;
246+ };
247+ set_deferred_cancel();
248+ // safety check.
249+ if (connect_sock)
250+ {
251+ std::cerr << "WARNING: on_accept() did not take ownership of "
252+ << "connect_sock.\n"
253+ << " This can lead to leaks and should be fixed."
254+ << std::endl;
255+ connect_sock.reset();
256+ _last_thread_fault = 500;
257+ go = false;
258+ };
259+ };
260+ }; // while go;
261 pthread_cleanup_pop(0);
262 };
263
264
265=== modified file 'common/sockets/base_socket.h'
266--- common/sockets/base_socket.h 2011-09-17 00:33:23 +0000
267+++ common/sockets/base_socket.h 2012-04-01 15:57:19 +0000
268@@ -20,7 +20,7 @@
269 #define base_socket_header
270
271 #include <base_thread.h>
272-#include <boost/shared_ptr.hpp>
273+#include <memory>
274 #include <sys/socket.h>
275 #include <netinet/in.h>
276
277@@ -373,7 +373,7 @@
278 };
279
280 /// smart pointer for use with tcp_sockets
281-typedef boost::shared_ptr<nrtb::tcp_socket> tcp_socket_p;
282+typedef std::unique_ptr<nrtb::tcp_socket> tcp_socket_p;
283
284 /** Abstract "listener" TCP/IP socket for servers.
285 **
286
287=== modified file 'common/sockets/socket_test.cpp'
288--- common/sockets/socket_test.cpp 2011-09-17 01:21:36 +0000
289+++ common/sockets/socket_test.cpp 2012-04-01 15:57:19 +0000
290@@ -21,7 +21,6 @@
291 #include <string>
292 #include <boost/random.hpp>
293 #include "base_socket.h"
294-#include <boost/shared_ptr.hpp>
295
296 using namespace nrtb;
297 using namespace std;
298@@ -29,46 +28,48 @@
299 class myserver: public tcp_server_socket_factory
300 {
301 public:
302- int hits;
303- int errors;
304+ int hits;
305+ int errors;
306
307- // constructor
308- myserver(const string & a, const unsigned short int & b)
309- : tcp_server_socket_factory(a,b)
310- {
311- // Don't need to lock here because we know the
312- // listener thread is not running.
313- hits = 0;
314- errors = 0;
315- };
316+ // constructor
317+ myserver(const string & a, const unsigned short int & b)
318+ : tcp_server_socket_factory(a,b)
319+ {
320+ // Don't need to lock here because we know the
321+ // listener thread is not running.
322+ hits = 0;
323+ errors = 0;
324+ };
325
326 protected:
327- // on_accept() is called on each connection.
328- bool on_accept()
329- {
330- try
331- {
332- // just return what we've recieved.
333- string msg = connect_sock->getln();
334- connect_sock->put(msg);
335- // Update our hit count.
336- hits++;
337- }
338- catch (base_exception & e)
339- {
340- errors++;
341- cerr << "server Caught " << e.what() << endl;
342- }
343- catch (...)
344- {
345- errors++;
346- cerr << "Unexpected error in on_accept()" << endl;
347- };
348- if (hits > 99)
349- return false;
350- else
351- return true;
352- };
353+
354+ // on_accept() is called on each connection.
355+ bool on_accept()
356+ {
357+ try
358+ {
359+ tcp_socket_p sock = std::move(connect_sock);
360+ // just return what we've recieved.
361+ string msg = sock->getln();
362+ sock->put(msg);
363+ // Update our hit count.
364+ hits++;
365+ }
366+ catch (base_exception & e)
367+ {
368+ errors++;
369+ cerr << "server Caught " << e.what() << endl;
370+ }
371+ catch (...)
372+ {
373+ errors++;
374+ cerr << "Unexpected error in on_accept()" << endl;
375+ };
376+ if (hits > 99)
377+ return false;
378+ else
379+ return true;
380+ };
381 };
382
383 string transceiver(const string address, const string sendme)
384@@ -78,7 +79,7 @@
385 sender.connect(address);
386 sender.put(sendme);
387 returnme = sender.getln();
388- sender.close();//cerr << "tc>> sock closed" << endl;
389+ sender.close();
390 return returnme;
391 };
392
393@@ -101,86 +102,103 @@
394
395 try
396 {
397- // start the receiver/server
398- test_server.start_listen();
399- usleep(5e5);
400-
401- // Send test messages
402- for (int i = 0; i < 100; i++)
403- {
404- stringstream msg;
405- msg << "test message " << i << "\r";
406- string checkme = msg.str();
407- string returned = transceiver(address, checkme);
408- if (returned != checkme)
409- {
410- er_count++;
411- };
412- cout << returned.substr(0,returned.size()-1) << ": "
413- << ((returned == checkme) ? "Passed" : "Failed")
414- << endl;
415- };
416+ // start the receiver/server
417+ test_server.start_listen();
418+ int countdown = 99;
419+ while ((!test_server.listening()) and countdown)
420+ {
421+ usleep(1e3);
422+ countdown++;
423+ };
424+ if (!countdown)
425+ {
426+ cerr << "Could not start listener." << endl;
427+ exit(1);
428+ }
429+ cout << "test_server ready." << endl;
430+
431+ // Send test messages
432+ for (int i = 0; i < 100; i++)
433+ {
434+ stringstream msg;
435+ msg << "test message " << i << "\r";
436+ string checkme = msg.str();
437+ string returned = transceiver(address, checkme);
438+ if (returned != checkme)
439+ {
440+ er_count++;
441+ };
442+ cout << returned.substr(0,returned.size()-1) << ": "
443+ << ((returned == checkme) ? "Passed" : "Failed")
444+ << endl;
445+ };
446 }
447 catch (myserver::bind_failure_exception)
448 {
449- cout << "Could not bind port" << endl;
450+ cout << "Could not bind port" << endl;
451 }
452 catch (myserver::mem_exhasted_exception)
453 {
454- cout << "myserver reports out of memory." << endl;
455+ cout << "myserver reports out of memory." << endl;
456 }
457 catch (myserver::listen_terminated_exception)
458 {
459- cout << "Listener terminated unexpectedly." << endl;
460+ cout << "Listener terminated unexpectedly." << endl;
461 }
462 catch (myserver::on_accept_bound_exception)
463 {
464- cout << "myserver::on_accept() seems bound." << endl;
465+ cout << "myserver::on_accept() seems bound." << endl;
466 }
467 catch (tcp_socket::bad_connect_exception & e)
468 {
469- cout << "A bad_connect_exception was thrown.\n"
470- << e.comment() << endl;
471+ cout << "A bad_connect_exception was thrown.\n"
472+ << " comment: " << e.comment() << endl;
473+ cout << " test_server.last_fault() = "
474+ << test_server.last_fault() << endl;
475 }
476 catch (tcp_socket::not_open_exception & e)
477 {
478- cout << "A tcp not open exception was caught.\n"
479- << e.comment() << endl;
480+ cout << "A tcp not open exception was caught.\n"
481+ << e.comment() << endl;
482 }
483 catch (tcp_socket::close_exception & e)
484 {
485- cout << "A close_exception was caught.\n"
486- << e.comment() << endl;
487+ cout << "A close_exception was caught.\n"
488+ << e.comment() << endl;
489 }
490 catch (tcp_socket::overrun_exception & e)
491 {
492- cout << "An overrun_exception was caught.\n"
493- << e.comment() << endl;
494+ cout << "An overrun_exception was caught.\n"
495+ << e.comment() << endl;
496 }
497 catch (tcp_socket::buffer_full_exception & e)
498 {
499- cout << "A buffer_full_exception was caught.\n"
500- << e.comment() << endl;
501+ cout << "A buffer_full_exception was caught.\n"
502+ << e.comment() << endl;
503 }
504 catch (tcp_socket::general_exception & e)
505 {
506- cout << "A tcp_socket exception was caught.\n"
507- << e.comment() << endl;
508+ cout << "A tcp_socket exception was caught.\n"
509+ << " comment: " << e.comment() << endl;
510+ cout << " test_server.last_fault() = "
511+ << test_server.last_fault() << endl;
512 }
513 catch (exception & e)
514 {
515- cout << "A unexpected " << e.what() << " exception was caught." << endl;
516+ cout << "A unexpected " << e.what()
517+ << " exception was caught." << endl;
518 };
519
520 // final check.
521 if (test_server.hits != 100)
522 {
523- er_count++;
524- cout << "Server does not report the proper number of hits.\n"
525- << "\tExpected 100, got " << test_server.hits
526- << endl;
527+ er_count++;
528+ cout << "Server does not report the proper number of hits.\n"
529+ << "\tExpected 100, got " << test_server.hits
530+ << endl;
531 };
532- cout << "=========== tcp_socket test complete =============" << endl;
533+ cout << "=========== tcp_socket test complete ============="
534+ << endl;
535
536 return er_count;
537 };
538
539=== modified file 'common/transceiver/Makefile'
540--- common/transceiver/Makefile 2011-08-25 04:22:52 +0000
541+++ common/transceiver/Makefile 2012-04-01 15:57:19 +0000
542@@ -23,8 +23,8 @@
543
544 transceiver_test: transceiver.h transceiver_test.cpp
545 @rm -f transceiver_test
546- g++ -c transceiver_test.cpp -I../include
547- g++ -o transceiver_test transceiver_test.o ../obj/common.o ../obj/log_setup.o ../obj/serializer.o ../obj/base_thread.o ../obj/base_socket.o ../obj/confreader.o -lpthread -lprotobuf ../lib/nrtb_gpb.a -lPocoFoundation
548+ g++ -c transceiver_test.cpp -I../include -std=gnu++0x
549+ g++ -o transceiver_test transceiver_test.o ../obj/common.o ../obj/log_setup.o ../obj/serializer.o ../obj/base_thread.o ../obj/base_socket.o ../obj/confreader.o -lpthread -lprotobuf ../lib/nrtb_gpb.a -lPocoFoundation -std=gnu++0x
550
551 clean:
552 @rm -rvf *.o transceiver_test ../include/transceiver.h *.log ../obj/transceiver.o
553
554=== modified file 'common/transceiver/transceiver.h'
555--- common/transceiver/transceiver.h 2011-09-15 22:53:06 +0000
556+++ common/transceiver/transceiver.h 2012-04-01 15:57:19 +0000
557@@ -26,7 +26,6 @@
558 #include <serializer.h>
559 #include <confreader.h>
560 #include <Poco/Logger.h>
561-#include <boost/shared_ptr.hpp>
562 #include <boost/circular_buffer.hpp>
563
564 namespace nrtb
565@@ -51,8 +50,8 @@
566 * specification this class implements.
567 * ***************************************************************/
568 template <class out, class in,
569- class outp = boost::shared_ptr<out>,
570- class inp = boost::shared_ptr<in> >
571+ class outp = std::unique_ptr<out>,
572+ class inp = std::unique_ptr<in> >
573 class transceiver
574 {
575 public:
576@@ -70,12 +69,17 @@
577 * socket. Once created this class assumes it uniquely owns the
578 * socket and will close it upon distruction.
579 * ***********************************************************/
580- transceiver(tcp_socket_p socket);
581+ transceiver(tcp_socket_p & socket);
582 /*************************************************************
583 * Closes the socket and releases all mmemory associated with
584 * this class.
585 * ***********************************************************/
586 virtual ~transceiver();
587+ /*************************************************************
588+ * is_connected() returns true if the socket is up and ready
589+ * to use, false otherwise.
590+ *************************************************************/
591+ bool is_connected();
592 /**************************************************************
593 * gets the next message from the socket. If no messages are
594 * ready, blocks util one arrives.
595@@ -85,7 +89,7 @@
596 * Sends a message over the socket and adds it to the
597 * sent_messages buffer in case it's needed for error recovery.
598 * ***********************************************************/
599- void send(outp sendme);
600+ void send(outp & sendme);
601 /**************************************************************
602 * Called by the data consumer when an inbound message was
603 * not valid in the current application context. msg_number
604@@ -124,7 +128,7 @@
605 unsigned long long last_inbound;
606 /// buffer to hold previously sent messages; required for
607 /// error recovery.
608- boost::circular_buffer<outp> sent_messages;
609+ boost::circular_buffer<out> sent_messages;
610 /// fence post for recovery efforts, zero if none in play
611 unsigned long long nak_fence_post;
612 /// These methods implment actual nak recovery.
613@@ -136,7 +140,7 @@
614 serializer tscvr_sequence(0);
615
616 template <class out, class in, class outp, class inp>
617-transceiver<out,in,outp,inp>::transceiver(tcp_socket_p socket)
618+transceiver<out,in,outp,inp>::transceiver(tcp_socket_p & socket)
619 {
620 // get the configuration parameters.
621 global_conf_reader & config = global_conf_reader::get_instance();
622@@ -152,7 +156,7 @@
623 logname = s.str();
624 Poco::Logger & log = Poco::Logger::get(logname);
625 // set up the socket.
626- sock = socket;
627+ sock = std::move(socket);
628 // annouce ourselves...
629 log.information("Instanciated.");
630 s.str("");
631@@ -173,10 +177,10 @@
632 // shutdown and release the socket.
633 try
634 {
635- if (sock)
636- {
637- sock.reset();
638- };
639+ if (sock)
640+ {
641+ sock.reset();
642+ };
643 } catch (...) {};
644 // discard the sent messages list.
645 sent_messages.clear();
646@@ -184,65 +188,69 @@
647 };
648
649 template <class out, class in, class outp, class inp>
650+bool transceiver<out,in,outp,inp>::is_connected()
651+{
652+ bool returnme = false;
653+ if (sock and (sock->status() == tcp_socket::sock_connect))
654+ {
655+ returnme = true;
656+ };
657+ return returnme;
658+};
659+
660+template <class out, class in, class outp, class inp>
661 inp transceiver<out,in,outp,inp>::get()
662 {
663 // get the message length first.
664 std::string len_field = sock->get(4,10);
665-//std::cout << "len_field=" << http_chartohex(len_field) << std::endl;
666 msg_num_t msg_len;
667 for (int i=0; i<4; i++)
668 {
669- msg_len.bytes[i] = len_field[i];
670+ msg_len.bytes[i] = len_field[i];
671 };
672-//std::cout << ":len=" << msg_len.number << std::endl;
673 // get the rest of the message.
674 inp returnme(new in);
675 std::string input = sock->get(msg_len.number);
676-//std::cout << ":received=" << http_chartohex(input) << std::endl;
677 returnme->ParseFromString(input);
678 // for the first messsge any number is
679 // accepted.
680 if (last_inbound == 0)
681 {
682- last_inbound = returnme->msg_uid();
683+ last_inbound = returnme->msg_uid();
684 }
685 else
686 {
687- last_inbound++;
688- int temp = returnme->msg_uid();
689- if (temp != last_inbound)
690- {
691- inbound_seq_error e;
692- std::stringstream message;
693- message << "Expected " << last_inbound
694- << " received " << temp;
695- e.store(message.str());
696- throw e;
697- };
698+ last_inbound++;
699+ int temp = returnme->msg_uid();
700+ if (temp != last_inbound)
701+ {
702+ inbound_seq_error e;
703+ std::stringstream message;
704+ message << "Expected " << last_inbound
705+ << " received " << temp;
706+ e.store(message.str());
707+ throw e;
708+ };
709 };
710 return returnme;
711 };
712
713 template <class out, class in, class outp, class inp>
714-void transceiver<out,in,outp,inp>::send(outp sendme)
715+void transceiver<out,in,outp,inp>::send(outp & sendme)
716 {
717 sendme->set_msg_uid(out_msg_num());
718 std::string output;
719 output = sendme->SerializeAsString();
720 msg_num_t msg_len;
721 msg_len.number = output.size();
722-//std::cout << "num:len" << msg_len.number << ":" << output.length() << //std::endl;
723 std::string num_field = " ";
724 for (int i=0; i<4; i++)
725 {
726- num_field[i] = msg_len.bytes[i];
727-//std::cout << int(num_field[i]) << "," ;
728+ num_field[i] = msg_len.bytes[i];
729 };
730-//std::cout << " = " << msg_len.number << std::endl;
731 output = num_field + output;
732-//std::cout << "out msg=" << http_chartohex(output) << std::endl;
733 sock->put(output);
734- sent_messages.push_back(sendme);
735+ sent_messages.push_back(*sendme);
736 };
737
738 template <class out, class in, class outp, class inp>
739@@ -279,4 +287,4 @@
740
741 } // namespace nrtb
742
743-#endif //nrtb_transceiver_h//
744\ No newline at end of file
745+#endif //nrtb_transceiver_h//
746
747=== modified file 'common/transceiver/transceiver_test.cpp'
748--- common/transceiver/transceiver_test.cpp 2011-09-17 01:08:29 +0000
749+++ common/transceiver/transceiver_test.cpp 2012-04-01 15:57:19 +0000
750@@ -45,14 +45,14 @@
751
752 void inc()
753 {
754- scope_lock lock(data_lock);
755- er_count++;
756+ scope_lock lock(data_lock);
757+ er_count++;
758 };
759
760 int operator ()()
761 {
762- scope_lock lock(data_lock);
763- return er_count;
764+ scope_lock lock(data_lock);
765+ return er_count;
766 };
767 };
768
769@@ -64,90 +64,124 @@
770
771 tcp_socket_p sock;
772 unsigned long long last_inbound;
773+
774+ server_work_thread()
775+ {
776+ cout << "Creating server_work_thread." << endl;
777+ last_inbound = 0;
778+ }
779
780 ~server_work_thread()
781 {
782- cout << "Destructing server_work_thread" << endl;
783- sock.reset();
784+ cout << "Destructing server_work_thread" << endl;
785 };
786
787 void run()
788 {
789- set_cancel_anytime();
790- linkt link(sock);
791- while (sock->status() == tcp_socket::sock_connect)
792- {
793- try
794- {
795- linkt::out_ptr inbound = link.get();
796- last_inbound = inbound->msg_uid();
797- cout << "\tReceived #" << last_inbound << endl;
798- link.send(inbound);
799- if (last_inbound == 99)
800- {
801- cout << "Receiver thread closing." << endl;
802- exit(0);
803- };
804- }
805- catch (linkt::general_exception & e)
806- {
807- cerr << "Server work thread caught " << e.what()
808- << "\n\tComment: " << e.comment() << endl;
809- er_count.inc();;
810- }
811- catch (tcp_socket::general_exception & e)
812- {
813- cerr << "Server work thread caught " << e.what()
814- << "\n\tComment: " << e.comment() << endl;
815- er_count.inc();;
816- }
817- catch (std::exception & e)
818- {
819- cerr << "Server work thread caught " << e.what()
820- << endl;
821- er_count.inc();;
822+ set_cancel_anytime();
823+ linkt link(sock);
824+ while (link.is_connected())
825+ {
826+ try
827+ {
828+ linkt::in_ptr inbound = link.get();
829+ last_inbound = inbound->msg_uid();
830+ link.send(inbound);
831+ if (last_inbound == 99)
832+ {
833+ cout << "Receiver thread closing." << endl;
834+ exit(0);
835+ };
836+ }
837+ catch (linkt::general_exception & e)
838+ {
839+ cerr << "Server work thread caught " << e.what()
840+ << "\n\tComment: " << e.comment() << endl;
841+ er_count.inc();;
842+ }
843+ catch (tcp_socket::general_exception & e)
844+ {
845+ cerr << "Server work thread caught " << e.what()
846+ << "\n\tComment: " << e.comment() << endl;
847+ er_count.inc();;
848+ }
849+ catch (std::exception & e)
850+ {
851+ cerr << "Server work thread caught " << e.what()
852+ << endl;
853+ er_count.inc();;
854+ };
855 };
856- };
857 };
858 };
859
860 class listener: public tcp_server_socket_factory
861 {
862-protected:
863- boost::shared_ptr<server_work_thread> task;
864-
865 public:
866+
867+ std::unique_ptr<server_work_thread> task;
868+
869 listener(const string & add, const int & back)
870- : tcp_server_socket_factory(add, back) {};
871+ : tcp_server_socket_factory(add, back)
872+ {
873+ cout << "Listener constructed." << endl;
874+ };
875+
876 ~listener()
877 {
878- cout << "Destructing listener" << endl;
879- task.reset();
880+ cout << "Destructing listener" << endl;
881+ // check to see if the listener is still up.
882+ try
883+ {
884+ if (listening())
885+ {
886+ cerr << " Listener is still running...";
887+ stop_listen();
888+ cerr << " shutdown is complete." << endl;
889+ };
890+ }
891+ catch (...)
892+ {
893+ cerr << " Presuming listener is down." << endl;
894+ };
895+ if (!task)
896+ {
897+ cerr << " Task is not allocated. " << endl;
898+ }
899+ // check to see if task is still running and display
900+ // a warning if it is.
901+ if (task and (task->is_running()))
902+ {
903+ cerr << "WARNING: Worker is still running!!" << endl;
904+ task->stop();
905+ task->join();
906+ cerr << "Worker thread shutdown is complete." << endl;
907+ };
908 };
909
910 bool on_accept()
911 {
912- if (!task)
913- {
914- task.reset(new server_work_thread);
915- task->last_inbound = 0;
916- task->sock = connect_sock;
917- task->start(*(task.get()));
918- cout << "server thread running." << endl;
919- // shutdown the listener thead.. our work is done here.
920- return false;
921- }
922- else
923- {
924- connect_sock->close();
925- cerr << "Multiple attempts to connect to server"
926- << endl;
927- };
928+ if (!task)
929+ {
930+ cout << "In listener::on_accept()" << endl;
931+ task.reset(new server_work_thread);
932+ task->sock = std::move(connect_sock);
933+ task->start();
934+ cout << "server thread started." << endl;
935+ // shutdown the listener thead.. our work is done here.
936+ return false;
937+ }
938+ else
939+ {
940+ connect_sock->close();
941+ cerr << "Multiple attempts to connect to server"
942+ << endl;
943+ };
944 };
945 };
946
947 string address = "127.0.0.1:";
948-int port_base = 12334;
949+int port_base = 14334;
950
951 int main()
952 {
953@@ -155,51 +189,77 @@
954
955 try
956 {
957- //set up our port and address
958- boost::mt19937 rng;
959- rng.seed(time(0));
960- boost::uniform_int<> r(0,1000);
961- stringstream s;
962- s << address << port_base + r(rng);
963- address = s.str();
964- cout << "Using " << address << endl;
965-
966- // kick off the listener thread.
967- listener server(address,5);
968- server.start_listen();
969- usleep(1e4);
970-
971- // set up our sender
972- tcp_socket_p sock(new tcp_socket);
973- sock->connect(address);
974- linkt sender(sock);
975-
976- // Let's send a few things.
977- for (int i=0; i<100; i++)
978- {
979- linkt::out_ptr msg(new my_msg);
980- sender.send(msg);
981- cout << "Sent " << msg->msg_uid() << endl;
982- msg = sender.get();
983- cout << "Got back " << msg->msg_uid() << endl;
984- };
985- usleep(1e4);
986+ //set up our port and address
987+ boost::mt19937 rng;
988+ rng.seed(time(0));
989+ boost::uniform_int<> r(0,1000);
990+ stringstream s;
991+ s << address << port_base + r(rng);
992+ address = s.str();
993+ cout << "Using " << address << endl;
994+
995+ // kick off the listener thread.
996+ listener server(address,5);
997+ server.start_listen();
998+ while (!server.listening())
999+ {
1000+ usleep(1e3);
1001+ };
1002+ cout << "Listener thread is ready." << endl;
1003+
1004+ // set up our sender
1005+ tcp_socket_p sock(new tcp_socket);
1006+ int trycount = 0;
1007+ while (sock->status() != tcp_socket::sock_connect)
1008+ {
1009+ try
1010+ {
1011+ sock->connect(address);
1012+ }
1013+ catch (tcp_socket::bad_connect_exception & e)
1014+ {
1015+ trycount++;
1016+ if (trycount > 99)
1017+ {
1018+ cerr << "Too many connect failures for the sender socket."
1019+ << endl;
1020+ throw e;
1021+ };
1022+ usleep(1e4);
1023+ };
1024+ }
1025+ cout << "sender socket is connected to listener" << endl;
1026+ linkt sender(sock);
1027+ cout << "Sender transciever is ready to use." << endl;
1028+
1029+ // Let's send a few things.
1030+ for (int i=0; i<100; i++)
1031+ {
1032+ linkt::out_ptr msg(new my_msg);
1033+ sender.send(msg);
1034+ cout << "Sent " << msg->msg_uid() << ", ";
1035+ msg = sender.get();
1036+ cout << "good return." << endl;
1037+ };
1038+ usleep(1e4);
1039 }
1040 catch (...)
1041 {
1042- cout << "exception caught during test." << endl;
1043- er_count.inc();
1044+ cout << "exception caught during test." << endl;
1045+ er_count.inc();
1046 };
1047
1048 int faults = er_count();
1049 if (faults)
1050 {
1051- cout << "========== ** There were " << faults
1052- << "errors logged. =========" << endl;
1053+ cout << "========== ** There were " << faults
1054+ << " errors logged. =========" << endl;
1055 }
1056 else
1057- cout << "========= nrtb::transceiver test complete.========="
1058- << endl;
1059+ {
1060+ cout << "========= nrtb::transceiver test complete.========="
1061+ << endl;
1062+ };
1063
1064 return faults;
1065-};
1066\ No newline at end of file
1067+};

Subscribers

People subscribed via source and target branches

to all changes: