Merge lp:~fpstovall/nrtb/unique_ptr_fix into lp:nrtb

Proposed by Rick Stovall
Status: Merged
Approved by: Rick Stovall
Approved revision: 21
Merge reported by: Rick Stovall
Merged at revision: not available
Proposed branch: lp:~fpstovall/nrtb/unique_ptr_fix
Merge into: lp:nrtb
Diff against target: 1067 lines (+405/-334)
7 files modified
common/sockets/Makefile (+3/-3)
common/sockets/base_socket.cpp (+97/-112)
common/sockets/base_socket.h (+2/-2)
common/sockets/socket_test.cpp (+98/-80)
common/transceiver/Makefile (+2/-2)
common/transceiver/transceiver.h (+44/-36)
common/transceiver/transceiver_test.cpp (+159/-99)
To merge this branch: bzr merge lp:~fpstovall/nrtb/unique_ptr_fix
Reviewer Review Type Date Requested Status
Rick Stovall Approve
Review via email: mp+100340@code.launchpad.net

Commit message

First C++11 update; using std::unique_ptr for socket handling.

Description of the change

First of the changes to bring the code up to the C++11 spec, particularly focused on replacing boost::shared_ptr with std::unique_ptr where appropriate.

This merge includes significant updates to the sockets and transceiver libs, improving stability and reporting, and enforcing the logical constraint that a socket can have only one owner and that code which accepts a connection from the tcp_server socket factory must take actual ownership of the new socket. Fixes were also added which allowed the removal of arbitrary time delays in the associated unit test programs.

This will be the first of several merges assuming the move to C++11 is approved.

To post a comment you must log in.
Revision history for this message
Rick Stovall (fpstovall) wrote :

Merge test complete. As there have been no comments or responses from the team, I'll be moving the code to the alpha stream. We need to be using the -std=gnu++0x switch on all compiles and links from this point on.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'common/sockets/Makefile'
--- common/sockets/Makefile 2011-09-17 01:21:36 +0000
+++ common/sockets/Makefile 2012-04-01 15:57:19 +0000
@@ -24,13 +24,13 @@
2424
25socket_test: base_socket.o socket_test.cpp25socket_test: base_socket.o socket_test.cpp
26 @rm -f socket_test26 @rm -f socket_test
27 g++ -c -O3 socket_test.cpp -I ../include27 g++ -c -O3 socket_test.cpp -I ../include -std=gnu++0x
28 g++ -o socket_test socket_test.o base_socket.o ../obj/hires_timer.o ../obj/common.o ../obj/base_thread.o -lpthread 28 g++ -o socket_test socket_test.o base_socket.o ../obj/hires_timer.o ../obj/common.o ../obj/base_thread.o -lpthread -std=gnu++0x
2929
3030
31base_socket.o: base_socket.cpp base_socket.h Makefile31base_socket.o: base_socket.cpp base_socket.h Makefile
32 @rm -f base_socket.o32 @rm -f base_socket.o
33 g++ -c -O3 base_socket.cpp -I ../include33 g++ -c -O3 base_socket.cpp -I ../include -std=gnu++0x
3434
35clean:35clean:
36 @rm -vf *.o ../include/base_socket.h socket_test36 @rm -vf *.o ../include/base_socket.h socket_test
3737
=== modified file 'common/sockets/base_socket.cpp'
--- common/sockets/base_socket.cpp 2011-09-17 01:21:36 +0000
+++ common/sockets/base_socket.cpp 2012-04-01 15:57:19 +0000
@@ -605,32 +605,21 @@
605 // take action only if the listen thread is running.605 // take action only if the listen thread is running.
606 if (listening())606 if (listening())
607 {607 {
608 // stop the listener thread608 // stop the listener thread
609 if (is_running()) stop();609 stop();
610 // wait here until the thread stops.610 join();
611 if (is_running()) join();
612// try
613// {
614// if (listen_sock) close(listen_sock);
615// }
616// catch (...) {};
617 };611 };
618};612};
619613
620bool tcp_server_socket_factory::listening()614bool tcp_server_socket_factory::listening()
621{615{
622 bool running = is_running();616 bool running = is_running();
623/* if (!running)617 return running;
624 {618};
625 // check to be sure the thread did not die due to an error.619
626 if (_last_thread_fault != 0)620int tcp_server_socket_factory::last_fault()
627 {621{
628 // if thread_return was non-zero, it is assumed the thread died an622 return _last_thread_fault;
629 // evil and useless death. Scream in anger!
630 throw listen_terminated_exception();
631 };
632 };
633*/ return running;
634};623};
635624
636unsigned short int tcp_server_socket_factory::backlog()625unsigned short int tcp_server_socket_factory::backlog()
@@ -643,108 +632,104 @@
643{632{
644 std::cerr << "in thread cleanup sock closer" << std::endl;633 std::cerr << "in thread cleanup sock closer" << std::endl;
645 int & socket = *(static_cast<int*>(sock));634 int & socket = *(static_cast<int*>(sock));
646 ::close(socket);635 try { ::close(socket); } catch (...) {};
647 std::cerr << "socker closer done." << std::endl;636 std::cerr << "socker closer done." << std::endl;
648};637};
649638
650void tcp_server_socket_factory::run()639void tcp_server_socket_factory::run()
651{640{
652 /* Put this entire thing in a try block to protect the application. 641 // set up the listening socket.
653 * Without this, an untrapped exception thrown here or in the
654 * user supplied on_accept() method would abort the entire
655 * application instead of just this
656 * thread.
657 */
658 int listen_sock;642 int listen_sock;
659 // make sure the listener is closed when we exit.643 _last_thread_fault = 0;
660 pthread_cleanup_push(closeme, (void*) &listen_sock);644 bool go = false;
661 try645 try
662 {646 {
663 bool go = true;647 listen_sock = socket(AF_INET,SOCK_STREAM,0);
664 // set up our listening socket.648 sockaddr_in myaddr;
665 listen_sock = socket(AF_INET,SOCK_STREAM,0);649 myaddr = tcp_socket::str_to_sockaddr(_address);
666 sockaddr_in myaddr;650 int a = bind(listen_sock,(sockaddr *) &myaddr,sizeof(myaddr));
667 try651 int b = listen(listen_sock,_backlog);
668 {652 if (a || b)
669 myaddr = tcp_socket::str_to_sockaddr(_address);653 {
670 }654 go = false;
671 catch (...)655 if (a) _last_thread_fault += 1;
672 {656 if (b) _last_thread_fault += 2;
673 // probably a tcp_socket::bad_address_exception, 657 }
674 // but any reason will do.658 else go = true;
675 go = false;
676 };
677 if (bind(listen_sock,(sockaddr *) &myaddr,sizeof(myaddr)))
678 {
679 // bind did not work.
680 go = false;
681 };
682 if (listen(listen_sock,_backlog))
683 {
684 // listen failed in some way.. I don't care which.
685 go = false;
686 };
687 // processing loop
688 while (go)
689 {
690 // accept a new connection
691 bool good_connect = true;
692 int new_conn = accept(listen_sock,NULL,NULL);
693 // validate the accept return value.
694 if (new_conn == -1)
695 {
696 // accept returned an error.
697 switch (errno)
698 {
699// case ENETDOWN :
700 case EPROTO :
701// case ENOPROTOOPT :
702 case EHOSTDOWN :
703// case ENONET :
704 case EHOSTUNREACH :
705// case EOPNOTSUPP :
706// case ENETUNREACH :
707 case EAGAIN :
708// case EPERM :
709 case ECONNABORTED :
710 {
711 good_connect = false;
712 break;
713 };
714 default :
715 {
716 // for any other error, we're going to shutdown the
717 // this listener thread.
718 go = false;
719 good_connect = false;
720 _last_thread_fault = errno;
721 break;
722 };
723 }; // switch (errno)
724 }; // error thrown by accept.
725 if (good_connect)
726 {
727 // create connect_sock
728 connect_sock.reset(new tcp_socket(new_conn));
729 // make the thread easily cancelable.
730 set_cancel_anytime();
731 // call on_accept
732 go = on_accept();
733 // set back to cancel_deferred.
734 set_deferred_cancel();
735 // release our claim to the new socket
736 connect_sock.reset();
737 };
738 }; // while go;
739 }659 }
740 catch (...)660 catch (...)
741 {661 {
742 /* an untrapped exception was thrown by someone in this thread.662 _last_thread_fault = 100;
743 * We'll shutdown this thread and put -1 in the thread_return field663 };
744 * to let the world know that we don't know what killed us.664 // if not in a good state, terminate the thread.
745 */665 if (!go)
746 _last_thread_fault = -1;666 {
747 };667 _last_thread_fault =+ 200;
668 exit(0);
669 };
670 // make sure the listener is closed when we exit.
671 pthread_cleanup_push(closeme, (void*) &listen_sock);
672 // processing loop
673 while (go)
674 {
675 // accept a new connection
676 bool good_connect = true;
677 int new_conn = accept(listen_sock,NULL,NULL);
678 // validate the accept return value.
679 if (new_conn == -1)
680 {
681 // accept returned an error.
682 switch (errno)
683 {
684 case EPROTO :
685 case EHOSTDOWN :
686 case EHOSTUNREACH :
687 case EAGAIN :
688 case ECONNABORTED :
689 {
690 // abandon this connection
691 good_connect = false;
692 break;
693 };
694 default :
695 {
696 // for any other error, we're going to shutdown the
697 // this listener thread.
698 go = false;
699 good_connect = false;
700 _last_thread_fault = errno;
701 break;
702 };
703 }; // switch (errno)
704 }; // error thrown by accept.
705 if (good_connect)
706 {
707 connect_sock.reset(new tcp_socket(new_conn));
708 set_cancel_anytime();
709 // call the connection handler.
710 try
711 {
712 go = on_accept();
713 }
714 catch (...)
715 {
716 go = false;
717 _last_thread_fault = 501;
718 };
719 set_deferred_cancel();
720 // safety check.
721 if (connect_sock)
722 {
723 std::cerr << "WARNING: on_accept() did not take ownership of "
724 << "connect_sock.\n"
725 << " This can lead to leaks and should be fixed."
726 << std::endl;
727 connect_sock.reset();
728 _last_thread_fault = 500;
729 go = false;
730 };
731 };
732 }; // while go;
748 pthread_cleanup_pop(0);733 pthread_cleanup_pop(0);
749};734};
750735
751736
=== modified file 'common/sockets/base_socket.h'
--- common/sockets/base_socket.h 2011-09-17 00:33:23 +0000
+++ common/sockets/base_socket.h 2012-04-01 15:57:19 +0000
@@ -20,7 +20,7 @@
20#define base_socket_header20#define base_socket_header
2121
22#include <base_thread.h>22#include <base_thread.h>
23#include <boost/shared_ptr.hpp>23#include <memory>
24#include <sys/socket.h>24#include <sys/socket.h>
25#include <netinet/in.h>25#include <netinet/in.h>
2626
@@ -373,7 +373,7 @@
373};373};
374374
375/// smart pointer for use with tcp_sockets375/// smart pointer for use with tcp_sockets
376typedef boost::shared_ptr<nrtb::tcp_socket> tcp_socket_p;376typedef std::unique_ptr<nrtb::tcp_socket> tcp_socket_p;
377377
378/** Abstract "listener" TCP/IP socket for servers. 378/** Abstract "listener" TCP/IP socket for servers.
379 ** 379 **
380380
=== modified file 'common/sockets/socket_test.cpp'
--- common/sockets/socket_test.cpp 2011-09-17 01:21:36 +0000
+++ common/sockets/socket_test.cpp 2012-04-01 15:57:19 +0000
@@ -21,7 +21,6 @@
21#include <string>21#include <string>
22#include <boost/random.hpp>22#include <boost/random.hpp>
23#include "base_socket.h"23#include "base_socket.h"
24#include <boost/shared_ptr.hpp>
2524
26using namespace nrtb;25using namespace nrtb;
27using namespace std;26using namespace std;
@@ -29,46 +28,48 @@
29class myserver: public tcp_server_socket_factory28class myserver: public tcp_server_socket_factory
30{29{
31public:30public:
32 int hits;31 int hits;
33 int errors;32 int errors;
3433
35 // constructor34 // constructor
36 myserver(const string & a, const unsigned short int & b) 35 myserver(const string & a, const unsigned short int & b)
37 : tcp_server_socket_factory(a,b)36 : tcp_server_socket_factory(a,b)
38 {37 {
39 // Don't need to lock here because we know the 38 // Don't need to lock here because we know the
40 // listener thread is not running.39 // listener thread is not running.
41 hits = 0;40 hits = 0;
42 errors = 0;41 errors = 0;
43 };42 };
4443
45protected:44protected:
46 // on_accept() is called on each connection.45
47 bool on_accept()46 // on_accept() is called on each connection.
48 {47 bool on_accept()
49 try48 {
50 {49 try
51 // just return what we've recieved.50 {
52 string msg = connect_sock->getln();51 tcp_socket_p sock = std::move(connect_sock);
53 connect_sock->put(msg);52 // just return what we've recieved.
54 // Update our hit count. 53 string msg = sock->getln();
55 hits++;54 sock->put(msg);
56 }55 // Update our hit count.
57 catch (base_exception & e)56 hits++;
58 {57 }
59 errors++;58 catch (base_exception & e)
60 cerr << "server Caught " << e.what() << endl;59 {
61 }60 errors++;
62 catch (...)61 cerr << "server Caught " << e.what() << endl;
63 {62 }
64 errors++;63 catch (...)
65 cerr << "Unexpected error in on_accept()" << endl;64 {
66 };65 errors++;
67 if (hits > 99) 66 cerr << "Unexpected error in on_accept()" << endl;
68 return false;67 };
69 else68 if (hits > 99)
70 return true;69 return false;
71 };70 else
71 return true;
72 };
72};73};
7374
74string transceiver(const string address, const string sendme)75string transceiver(const string address, const string sendme)
@@ -78,7 +79,7 @@
78 sender.connect(address);79 sender.connect(address);
79 sender.put(sendme);80 sender.put(sendme);
80 returnme = sender.getln();81 returnme = sender.getln();
81 sender.close();//cerr << "tc>> sock closed" << endl;82 sender.close();
82 return returnme;83 return returnme;
83};84};
8485
@@ -101,86 +102,103 @@
101102
102 try103 try
103 {104 {
104 // start the receiver/server105 // start the receiver/server
105 test_server.start_listen();106 test_server.start_listen();
106 usleep(5e5);107 int countdown = 99;
107 108 while ((!test_server.listening()) and countdown)
108 // Send test messages109 {
109 for (int i = 0; i < 100; i++)110 usleep(1e3);
110 {111 countdown++;
111 stringstream msg;112 };
112 msg << "test message " << i << "\r";113 if (!countdown)
113 string checkme = msg.str();114 {
114 string returned = transceiver(address, checkme);115 cerr << "Could not start listener." << endl;
115 if (returned != checkme)116 exit(1);
116 {117 }
117 er_count++;118 cout << "test_server ready." << endl;
118 };119
119 cout << returned.substr(0,returned.size()-1) << ": " 120 // Send test messages
120 << ((returned == checkme) ? "Passed" : "Failed")121 for (int i = 0; i < 100; i++)
121 << endl;122 {
122 };123 stringstream msg;
124 msg << "test message " << i << "\r";
125 string checkme = msg.str();
126 string returned = transceiver(address, checkme);
127 if (returned != checkme)
128 {
129 er_count++;
130 };
131 cout << returned.substr(0,returned.size()-1) << ": "
132 << ((returned == checkme) ? "Passed" : "Failed")
133 << endl;
134 };
123 }135 }
124 catch (myserver::bind_failure_exception)136 catch (myserver::bind_failure_exception)
125 {137 {
126 cout << "Could not bind port" << endl;138 cout << "Could not bind port" << endl;
127 }139 }
128 catch (myserver::mem_exhasted_exception)140 catch (myserver::mem_exhasted_exception)
129 {141 {
130 cout << "myserver reports out of memory." << endl;142 cout << "myserver reports out of memory." << endl;
131 }143 }
132 catch (myserver::listen_terminated_exception)144 catch (myserver::listen_terminated_exception)
133 {145 {
134 cout << "Listener terminated unexpectedly." << endl;146 cout << "Listener terminated unexpectedly." << endl;
135 }147 }
136 catch (myserver::on_accept_bound_exception)148 catch (myserver::on_accept_bound_exception)
137 {149 {
138 cout << "myserver::on_accept() seems bound." << endl;150 cout << "myserver::on_accept() seems bound." << endl;
139 }151 }
140 catch (tcp_socket::bad_connect_exception & e)152 catch (tcp_socket::bad_connect_exception & e)
141 {153 {
142 cout << "A bad_connect_exception was thrown.\n" 154 cout << "A bad_connect_exception was thrown.\n"
143 << e.comment() << endl;155 << " comment: " << e.comment() << endl;
156 cout << " test_server.last_fault() = "
157 << test_server.last_fault() << endl;
144 }158 }
145 catch (tcp_socket::not_open_exception & e)159 catch (tcp_socket::not_open_exception & e)
146 {160 {
147 cout << "A tcp not open exception was caught.\n" 161 cout << "A tcp not open exception was caught.\n"
148 << e.comment() << endl;162 << e.comment() << endl;
149 }163 }
150 catch (tcp_socket::close_exception & e)164 catch (tcp_socket::close_exception & e)
151 {165 {
152 cout << "A close_exception was caught.\n" 166 cout << "A close_exception was caught.\n"
153 << e.comment() << endl;167 << e.comment() << endl;
154 }168 }
155 catch (tcp_socket::overrun_exception & e)169 catch (tcp_socket::overrun_exception & e)
156 {170 {
157 cout << "An overrun_exception was caught.\n" 171 cout << "An overrun_exception was caught.\n"
158 << e.comment() << endl;172 << e.comment() << endl;
159 }173 }
160 catch (tcp_socket::buffer_full_exception & e)174 catch (tcp_socket::buffer_full_exception & e)
161 {175 {
162 cout << "A buffer_full_exception was caught.\n" 176 cout << "A buffer_full_exception was caught.\n"
163 << e.comment() << endl;177 << e.comment() << endl;
164 }178 }
165 catch (tcp_socket::general_exception & e)179 catch (tcp_socket::general_exception & e)
166 {180 {
167 cout << "A tcp_socket exception was caught.\n" 181 cout << "A tcp_socket exception was caught.\n"
168 << e.comment() << endl;182 << " comment: " << e.comment() << endl;
183 cout << " test_server.last_fault() = "
184 << test_server.last_fault() << endl;
169 }185 }
170 catch (exception & e)186 catch (exception & e)
171 {187 {
172 cout << "A unexpected " << e.what() << " exception was caught." << endl;188 cout << "A unexpected " << e.what()
189 << " exception was caught." << endl;
173 };190 };
174191
175 // final check.192 // final check.
176 if (test_server.hits != 100)193 if (test_server.hits != 100)
177 {194 {
178 er_count++;195 er_count++;
179 cout << "Server does not report the proper number of hits.\n"196 cout << "Server does not report the proper number of hits.\n"
180 << "\tExpected 100, got " << test_server.hits 197 << "\tExpected 100, got " << test_server.hits
181 << endl;198 << endl;
182 };199 };
183 cout << "=========== tcp_socket test complete =============" << endl;200 cout << "=========== tcp_socket test complete ============="
201 << endl;
184 202
185 return er_count;203 return er_count;
186};204};
187205
=== modified file 'common/transceiver/Makefile'
--- common/transceiver/Makefile 2011-08-25 04:22:52 +0000
+++ common/transceiver/Makefile 2012-04-01 15:57:19 +0000
@@ -23,8 +23,8 @@
2323
24transceiver_test: transceiver.h transceiver_test.cpp24transceiver_test: transceiver.h transceiver_test.cpp
25 @rm -f transceiver_test25 @rm -f transceiver_test
26 g++ -c transceiver_test.cpp -I../include26 g++ -c transceiver_test.cpp -I../include -std=gnu++0x
27 g++ -o transceiver_test transceiver_test.o ../obj/common.o ../obj/log_setup.o ../obj/serializer.o ../obj/base_thread.o ../obj/base_socket.o ../obj/confreader.o -lpthread -lprotobuf ../lib/nrtb_gpb.a -lPocoFoundation27 g++ -o transceiver_test transceiver_test.o ../obj/common.o ../obj/log_setup.o ../obj/serializer.o ../obj/base_thread.o ../obj/base_socket.o ../obj/confreader.o -lpthread -lprotobuf ../lib/nrtb_gpb.a -lPocoFoundation -std=gnu++0x
2828
29clean:29clean:
30 @rm -rvf *.o transceiver_test ../include/transceiver.h *.log ../obj/transceiver.o30 @rm -rvf *.o transceiver_test ../include/transceiver.h *.log ../obj/transceiver.o
3131
=== modified file 'common/transceiver/transceiver.h'
--- common/transceiver/transceiver.h 2011-09-15 22:53:06 +0000
+++ common/transceiver/transceiver.h 2012-04-01 15:57:19 +0000
@@ -26,7 +26,6 @@
26#include <serializer.h>26#include <serializer.h>
27#include <confreader.h>27#include <confreader.h>
28#include <Poco/Logger.h>28#include <Poco/Logger.h>
29#include <boost/shared_ptr.hpp>
30#include <boost/circular_buffer.hpp>29#include <boost/circular_buffer.hpp>
3130
32namespace nrtb31namespace nrtb
@@ -51,8 +50,8 @@
51 * specification this class implements.50 * specification this class implements.
52 * ***************************************************************/51 * ***************************************************************/
53 template <class out, class in,52 template <class out, class in,
54 class outp = boost::shared_ptr<out>,53 class outp = std::unique_ptr<out>,
55 class inp = boost::shared_ptr<in> >54 class inp = std::unique_ptr<in> >
56 class transceiver55 class transceiver
57 {56 {
58 public:57 public:
@@ -70,12 +69,17 @@
70 * socket. Once created this class assumes it uniquely owns the 69 * socket. Once created this class assumes it uniquely owns the
71 * socket and will close it upon distruction.70 * socket and will close it upon distruction.
72 * ***********************************************************/71 * ***********************************************************/
73 transceiver(tcp_socket_p socket);72 transceiver(tcp_socket_p & socket);
74 /*************************************************************73 /*************************************************************
75 * Closes the socket and releases all mmemory associated with74 * Closes the socket and releases all mmemory associated with
76 * this class.75 * this class.
77 * ***********************************************************/76 * ***********************************************************/
78 virtual ~transceiver();77 virtual ~transceiver();
78 /*************************************************************
79 * is_connected() returns true if the socket is up and ready
80 * to use, false otherwise.
81 *************************************************************/
82 bool is_connected();
79 /**************************************************************83 /**************************************************************
80 * gets the next message from the socket. If no messages are 84 * gets the next message from the socket. If no messages are
81 * ready, blocks util one arrives. 85 * ready, blocks util one arrives.
@@ -85,7 +89,7 @@
85 * Sends a message over the socket and adds it to the 89 * Sends a message over the socket and adds it to the
86 * sent_messages buffer in case it's needed for error recovery.90 * sent_messages buffer in case it's needed for error recovery.
87 * ***********************************************************/91 * ***********************************************************/
88 void send(outp sendme);92 void send(outp & sendme);
89 /**************************************************************93 /**************************************************************
90 * Called by the data consumer when an inbound message was 94 * Called by the data consumer when an inbound message was
91 * not valid in the current application context. msg_number95 * not valid in the current application context. msg_number
@@ -124,7 +128,7 @@
124 unsigned long long last_inbound;128 unsigned long long last_inbound;
125 /// buffer to hold previously sent messages; required for 129 /// buffer to hold previously sent messages; required for
126 /// error recovery.130 /// error recovery.
127 boost::circular_buffer<outp> sent_messages;131 boost::circular_buffer<out> sent_messages;
128 /// fence post for recovery efforts, zero if none in play132 /// fence post for recovery efforts, zero if none in play
129 unsigned long long nak_fence_post;133 unsigned long long nak_fence_post;
130 /// These methods implment actual nak recovery.134 /// These methods implment actual nak recovery.
@@ -136,7 +140,7 @@
136serializer tscvr_sequence(0);140serializer tscvr_sequence(0);
137141
138template <class out, class in, class outp, class inp>142template <class out, class in, class outp, class inp>
139transceiver<out,in,outp,inp>::transceiver(tcp_socket_p socket)143transceiver<out,in,outp,inp>::transceiver(tcp_socket_p & socket)
140{144{
141 // get the configuration parameters.145 // get the configuration parameters.
142 global_conf_reader & config = global_conf_reader::get_instance();146 global_conf_reader & config = global_conf_reader::get_instance();
@@ -152,7 +156,7 @@
152 logname = s.str();156 logname = s.str();
153 Poco::Logger & log = Poco::Logger::get(logname);157 Poco::Logger & log = Poco::Logger::get(logname);
154 // set up the socket.158 // set up the socket.
155 sock = socket;159 sock = std::move(socket);
156 // annouce ourselves...160 // annouce ourselves...
157 log.information("Instanciated."); 161 log.information("Instanciated.");
158 s.str("");162 s.str("");
@@ -173,10 +177,10 @@
173 // shutdown and release the socket.177 // shutdown and release the socket.
174 try 178 try
175 {179 {
176 if (sock)180 if (sock)
177 {181 {
178 sock.reset(); 182 sock.reset();
179 };183 };
180 } catch (...) {};184 } catch (...) {};
181 // discard the sent messages list.185 // discard the sent messages list.
182 sent_messages.clear();186 sent_messages.clear();
@@ -184,65 +188,69 @@
184};188};
185189
186template <class out, class in, class outp, class inp>190template <class out, class in, class outp, class inp>
191bool transceiver<out,in,outp,inp>::is_connected()
192{
193 bool returnme = false;
194 if (sock and (sock->status() == tcp_socket::sock_connect))
195 {
196 returnme = true;
197 };
198 return returnme;
199};
200
201template <class out, class in, class outp, class inp>
187inp transceiver<out,in,outp,inp>::get()202inp transceiver<out,in,outp,inp>::get()
188{203{
189 // get the message length first.204 // get the message length first.
190 std::string len_field = sock->get(4,10);205 std::string len_field = sock->get(4,10);
191//std::cout << "len_field=" << http_chartohex(len_field) << std::endl;
192 msg_num_t msg_len;206 msg_num_t msg_len;
193 for (int i=0; i<4; i++)207 for (int i=0; i<4; i++)
194 {208 {
195 msg_len.bytes[i] = len_field[i];209 msg_len.bytes[i] = len_field[i];
196 };210 };
197//std::cout << ":len=" << msg_len.number << std::endl;
198 // get the rest of the message.211 // get the rest of the message.
199 inp returnme(new in);212 inp returnme(new in);
200 std::string input = sock->get(msg_len.number);213 std::string input = sock->get(msg_len.number);
201//std::cout << ":received=" << http_chartohex(input) << std::endl;
202 returnme->ParseFromString(input);214 returnme->ParseFromString(input);
203 // for the first messsge any number is215 // for the first messsge any number is
204 // accepted.216 // accepted.
205 if (last_inbound == 0)217 if (last_inbound == 0)
206 {218 {
207 last_inbound = returnme->msg_uid();219 last_inbound = returnme->msg_uid();
208 }220 }
209 else221 else
210 {222 {
211 last_inbound++;223 last_inbound++;
212 int temp = returnme->msg_uid();224 int temp = returnme->msg_uid();
213 if (temp != last_inbound)225 if (temp != last_inbound)
214 { 226 {
215 inbound_seq_error e;227 inbound_seq_error e;
216 std::stringstream message;228 std::stringstream message;
217 message << "Expected " << last_inbound229 message << "Expected " << last_inbound
218 << " received " << temp;230 << " received " << temp;
219 e.store(message.str());231 e.store(message.str());
220 throw e;232 throw e;
221 };233 };
222 };234 };
223 return returnme;235 return returnme;
224};236};
225237
226template <class out, class in, class outp, class inp>238template <class out, class in, class outp, class inp>
227void transceiver<out,in,outp,inp>::send(outp sendme)239void transceiver<out,in,outp,inp>::send(outp & sendme)
228{240{
229 sendme->set_msg_uid(out_msg_num());241 sendme->set_msg_uid(out_msg_num());
230 std::string output;242 std::string output;
231 output = sendme->SerializeAsString();243 output = sendme->SerializeAsString();
232 msg_num_t msg_len;244 msg_num_t msg_len;
233 msg_len.number = output.size();245 msg_len.number = output.size();
234//std::cout << "num:len" << msg_len.number << ":" << output.length() << //std::endl;
235 std::string num_field = " ";246 std::string num_field = " ";
236 for (int i=0; i<4; i++)247 for (int i=0; i<4; i++)
237 {248 {
238 num_field[i] = msg_len.bytes[i];249 num_field[i] = msg_len.bytes[i];
239//std::cout << int(num_field[i]) << "," ;
240 };250 };
241//std::cout << " = " << msg_len.number << std::endl;
242 output = num_field + output;251 output = num_field + output;
243//std::cout << "out msg=" << http_chartohex(output) << std::endl;
244 sock->put(output);252 sock->put(output);
245 sent_messages.push_back(sendme);253 sent_messages.push_back(*sendme);
246};254};
247255
248template <class out, class in, class outp, class inp>256template <class out, class in, class outp, class inp>
@@ -279,4 +287,4 @@
279287
280} // namespace nrtb288} // namespace nrtb
281 289
282#endif //nrtb_transceiver_h//
283\ No newline at end of file290\ No newline at end of file
291#endif //nrtb_transceiver_h//
284292
=== modified file 'common/transceiver/transceiver_test.cpp'
--- common/transceiver/transceiver_test.cpp 2011-09-17 01:08:29 +0000
+++ common/transceiver/transceiver_test.cpp 2012-04-01 15:57:19 +0000
@@ -45,14 +45,14 @@
4545
46 void inc()46 void inc()
47 {47 {
48 scope_lock lock(data_lock);48 scope_lock lock(data_lock);
49 er_count++;49 er_count++;
50 };50 };
5151
52 int operator ()()52 int operator ()()
53 {53 {
54 scope_lock lock(data_lock);54 scope_lock lock(data_lock);
55 return er_count;55 return er_count;
56 };56 };
57};57};
5858
@@ -64,90 +64,124 @@
64 64
65 tcp_socket_p sock;65 tcp_socket_p sock;
66 unsigned long long last_inbound;66 unsigned long long last_inbound;
67
68 server_work_thread()
69 {
70 cout << "Creating server_work_thread." << endl;
71 last_inbound = 0;
72 }
67 73
68 ~server_work_thread()74 ~server_work_thread()
69 {75 {
70 cout << "Destructing server_work_thread" << endl;76 cout << "Destructing server_work_thread" << endl;
71 sock.reset();
72 };77 };
73 78
74 void run()79 void run()
75 {80 {
76 set_cancel_anytime();81 set_cancel_anytime();
77 linkt link(sock);82 linkt link(sock);
78 while (sock->status() == tcp_socket::sock_connect)83 while (link.is_connected())
79 {84 {
80 try 85 try
81 {86 {
82 linkt::out_ptr inbound = link.get();87 linkt::in_ptr inbound = link.get();
83 last_inbound = inbound->msg_uid();88 last_inbound = inbound->msg_uid();
84 cout << "\tReceived #" << last_inbound << endl;89 link.send(inbound);
85 link.send(inbound);90 if (last_inbound == 99)
86 if (last_inbound == 99)91 {
87 {92 cout << "Receiver thread closing." << endl;
88 cout << "Receiver thread closing." << endl;93 exit(0);
89 exit(0);94 };
90 };95 }
91 }96 catch (linkt::general_exception & e)
92 catch (linkt::general_exception & e)97 {
93 {98 cerr << "Server work thread caught " << e.what()
94 cerr << "Server work thread caught " << e.what()99 << "\n\tComment: " << e.comment() << endl;
95 << "\n\tComment: " << e.comment() << endl;100 er_count.inc();;
96 er_count.inc();;101 }
97 }102 catch (tcp_socket::general_exception & e)
98 catch (tcp_socket::general_exception & e)103 {
99 {104 cerr << "Server work thread caught " << e.what()
100 cerr << "Server work thread caught " << e.what()105 << "\n\tComment: " << e.comment() << endl;
101 << "\n\tComment: " << e.comment() << endl;106 er_count.inc();;
102 er_count.inc();;107 }
103 }108 catch (std::exception & e)
104 catch (std::exception & e)109 {
105 {110 cerr << "Server work thread caught " << e.what()
106 cerr << "Server work thread caught " << e.what() 111 << endl;
107 << endl;112 er_count.inc();;
108 er_count.inc();;113 };
109 };114 };
110 };
111 };115 };
112};116};
113117
114class listener: public tcp_server_socket_factory118class listener: public tcp_server_socket_factory
115{119{
116protected:
117 boost::shared_ptr<server_work_thread> task;
118
119public:120public:
121
122 std::unique_ptr<server_work_thread> task;
123
120 listener(const string & add, const int & back)124 listener(const string & add, const int & back)
121 : tcp_server_socket_factory(add, back) {};125 : tcp_server_socket_factory(add, back)
126 {
127 cout << "Listener constructed." << endl;
128 };
129
122 ~listener()130 ~listener()
123 {131 {
124 cout << "Destructing listener" << endl;132 cout << "Destructing listener" << endl;
125 task.reset();133 // check to see if the listener is still up.
134 try
135 {
136 if (listening())
137 {
138 cerr << " Listener is still running...";
139 stop_listen();
140 cerr << " shutdown is complete." << endl;
141 };
142 }
143 catch (...)
144 {
145 cerr << " Presuming listener is down." << endl;
146 };
147 if (!task)
148 {
149 cerr << " Task is not allocated. " << endl;
150 }
151 // check to see if task is still running and display
152 // a warning if it is.
153 if (task and (task->is_running()))
154 {
155 cerr << "WARNING: Worker is still running!!" << endl;
156 task->stop();
157 task->join();
158 cerr << "Worker thread shutdown is complete." << endl;
159 };
126 };160 };
127 161
128 bool on_accept()162 bool on_accept()
129 {163 {
130 if (!task)164 if (!task)
131 {165 {
132 task.reset(new server_work_thread);166 cout << "In listener::on_accept()" << endl;
133 task->last_inbound = 0;167 task.reset(new server_work_thread);
134 task->sock = connect_sock;168 task->sock = std::move(connect_sock);
135 task->start(*(task.get()));169 task->start();
136 cout << "server thread running." << endl;170 cout << "server thread started." << endl;
137 // shutdown the listener thead.. our work is done here.171 // shutdown the listener thead.. our work is done here.
138 return false;172 return false;
139 }173 }
140 else174 else
141 {175 {
142 connect_sock->close();176 connect_sock->close();
143 cerr << "Multiple attempts to connect to server" 177 cerr << "Multiple attempts to connect to server"
144 << endl;178 << endl;
145 };179 };
146 };180 };
147};181};
148182
149string address = "127.0.0.1:";183string address = "127.0.0.1:";
150int port_base = 12334;184int port_base = 14334;
151185
152int main()186int main()
153{187{
@@ -155,51 +189,77 @@
155189
156 try190 try
157 {191 {
158 //set up our port and address192 //set up our port and address
159 boost::mt19937 rng;193 boost::mt19937 rng;
160 rng.seed(time(0));194 rng.seed(time(0));
161 boost::uniform_int<> r(0,1000);195 boost::uniform_int<> r(0,1000);
162 stringstream s;196 stringstream s;
163 s << address << port_base + r(rng);197 s << address << port_base + r(rng);
164 address = s.str();198 address = s.str();
165 cout << "Using " << address << endl;199 cout << "Using " << address << endl;
166200
167 // kick off the listener thread.201 // kick off the listener thread.
168 listener server(address,5);202 listener server(address,5);
169 server.start_listen();203 server.start_listen();
170 usleep(1e4);204 while (!server.listening())
171205 {
172 // set up our sender206 usleep(1e3);
173 tcp_socket_p sock(new tcp_socket);207 };
174 sock->connect(address);208 cout << "Listener thread is ready." << endl;
175 linkt sender(sock);209
176210 // set up our sender
177 // Let's send a few things.211 tcp_socket_p sock(new tcp_socket);
178 for (int i=0; i<100; i++)212 int trycount = 0;
179 {213 while (sock->status() != tcp_socket::sock_connect)
180 linkt::out_ptr msg(new my_msg);214 {
181 sender.send(msg);215 try
182 cout << "Sent " << msg->msg_uid() << endl;216 {
183 msg = sender.get();217 sock->connect(address);
184 cout << "Got back " << msg->msg_uid() << endl;218 }
185 };219 catch (tcp_socket::bad_connect_exception & e)
186 usleep(1e4);220 {
221 trycount++;
222 if (trycount > 99)
223 {
224 cerr << "Too many connect failures for the sender socket."
225 << endl;
226 throw e;
227 };
228 usleep(1e4);
229 };
230 }
231 cout << "sender socket is connected to listener" << endl;
232 linkt sender(sock);
233 cout << "Sender transciever is ready to use." << endl;
234
235 // Let's send a few things.
236 for (int i=0; i<100; i++)
237 {
238 linkt::out_ptr msg(new my_msg);
239 sender.send(msg);
240 cout << "Sent " << msg->msg_uid() << ", ";
241 msg = sender.get();
242 cout << "good return." << endl;
243 };
244 usleep(1e4);
187 }245 }
188 catch (...)246 catch (...)
189 {247 {
190 cout << "exception caught during test." << endl;248 cout << "exception caught during test." << endl;
191 er_count.inc();249 er_count.inc();
192 };250 };
193251
194 int faults = er_count(); 252 int faults = er_count();
195 if (faults)253 if (faults)
196 {254 {
197 cout << "========== ** There were " << faults 255 cout << "========== ** There were " << faults
198 << "errors logged. =========" << endl; 256 << " errors logged. =========" << endl;
199 }257 }
200 else258 else
201 cout << "========= nrtb::transceiver test complete.=========" 259 {
202 << endl;260 cout << "========= nrtb::transceiver test complete.========="
261 << endl;
262 };
203263
204 return faults;264 return faults;
205};
206\ No newline at end of file265\ No newline at end of file
266};

Subscribers

People subscribed via source and target branches

to all changes: