Merge lp:~nchohan/appscale/SSLCassyMysql into lp:appscale

Proposed by Navraj Chohan
Status: Merged
Merged at revision: 534
Proposed branch: lp:~nchohan/appscale/SSLCassyMysql
Merge into: lp:appscale
Diff against target: 5757 lines (+2317/-1984)
43 files modified
AppController/djinn.rb (+28/-21)
AppController/haproxy.rb (+5/-0)
AppController/helperfunctions.rb (+8/-4)
AppController/load_balancer.rb (+5/-0)
AppController/nginx.rb (+93/-15)
AppController/pbserver.rb (+106/-0)
AppController/terminate.rb (+1/-0)
AppDB/appscale_server.py (+117/-312)
AppDB/appscale_server_native_trans.py (+1086/-0)
AppDB/appscale_server_no_trans.py (+0/-1117)
AppDB/as_transaction.py (+0/-241)
AppDB/cassandra/py_cassandra.py (+209/-62)
AppDB/cassandra/templates/storage-conf.xml (+1/-1)
AppDB/datastore_tester.py (+20/-8)
AppDB/dbinterface.py (+12/-4)
AppDB/dhash_datastore.py (+3/-3)
AppDB/hbase/py_hbase.py (+6/-8)
AppDB/helper_functions.py (+1/-1)
AppDB/hypertable/py_hypertable.py (+8/-2)
AppDB/mongodb/py_mongodb.py (+6/-2)
AppDB/mysql/drop_all_tables.py (+4/-1)
AppDB/mysql/prime_mysql.py (+21/-9)
AppDB/mysql/py_mysql.py (+178/-58)
AppDB/mysql/test_mysql_trans.py (+115/-0)
AppDB/soap_server.py (+1/-1)
AppDB/voldemort/py_voldemort.py (+8/-1)
AppDB/zkappscale/zktransaction.py (+0/-1)
AppDB/zkappscale/zktransaction_stub.py (+175/-0)
AppLoadBalancer/lib/usertools.rb (+2/-2)
AppServer/google/appengine/api/datastore_file_distributed.py (+1/-1)
AppServer/google/appengine/tools/dev_appserver.py (+3/-5)
AppServer/google/appengine/tools/dev_appserver_login.py (+18/-81)
AppServer/google/appengine/tools/dev_appserver_main.py (+20/-4)
AppServer/google/net/proto/ProtocolBuffer.py (+1/-1)
AppServer_Java/build.xml (+6/-13)
AppServer_Java/src/com/google/appengine/api/users/dev/LocalLoginServlet.java (+1/-1)
AppServer_Java/src/com/google/appengine/tools/resources/ResourceLoader.java (+2/-2)
debian/appscale_install.sh (+3/-0)
debian/appscale_install_functions.sh (+38/-0)
debian/control.core.jaunty (+1/-0)
debian/control.core.karmic (+1/-0)
debian/control.core.lucid (+2/-1)
firewall.conf (+1/-1)
To merge this branch: bzr merge lp:~nchohan/appscale/SSLCassyMysql
Reviewer Review Type Date Requested Status
Chris Bunch Approve
Review via email: mp+28595@code.launchpad.net

Description of the change

Tested with tasks on all databases. All of them work except for Voldemort, which works only for the guestbook app.

To post a comment you must log in.
lp:~nchohan/appscale/SSLCassyMysql updated
480. By root <root@appscale-image>

merging with SSLCassyMySQL

481. By root <root@appscale-image0>

dropping database

482. By root <root@appscale-image0>

merged in changes, fixed conflicts

Revision history for this message
Chris Bunch (cgb-cs) wrote :

All databases other than MySQL appear to work fine (haven't tried apps other than guestbook yet). MySQL has failed twice. Sample output from one run:

Traceback (most recent call last):
  File "/root/appscale/AppDB/mysql/prime_mysql.py", line 64, in <module>
    drop_database()
  File "/root/appscale/AppDB/mysql/prime_mysql.py", line 32, in drop_database
    cursor.execute("DROP DATABASE IF EXISTS appscale;")
  File "/var/lib/python-support/python2.6/MySQLdb/cursors.py", line 166, in execute
    self.errorhandler(self, exc, value)
  File "/var/lib/python-support/python2.6/MySQLdb/connections.py", line 35, in defaulterrorhandler
    raise errorclass, errorvalue
_mysql_exceptions.OperationalError: (1051, "Unknown table 'xUSERS__'")

...

gzip: stdin: unexpected end of file
tar: Child returned status 1
tar: Error exit delayed from previous errors
[Tue Jun 29 17:39:47 +0000 2010] saw a cron request with args [128.111.55.237][][guestbook]
cron: lang was neither python nor java but was []
cron: lang was neither python nor java but was []
/root/appscale/AppDB/mysql/prime_mysql.py:55: Warning: Table 'xAPPS__' already exists

Will try running MySQL again.

Revision history for this message
Chris Bunch (cgb-cs) wrote :

MySQL works fine for me on a single node, but fails on the four node case. Tried both -n 1 and -n 2, but both fail.

Revision history for this message
Chris Bunch (cgb-cs) wrote :

Accepted. MySQL on >1 nodes needs to be fixed, and I would like the load balancer not to use SSL for all communication (that way, if all the user cares about is app redirection, they won't encounter the SSL certificate warning).

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'AppController/djinn.rb'
--- AppController/djinn.rb 2010-06-25 22:28:38 +0000
+++ AppController/djinn.rb 2010-06-29 01:49:23 +0000
@@ -101,6 +101,7 @@
101 my_data = my_node101 my_data = my_node
102 if has_soap_server?(my_data)102 if has_soap_server?(my_data)
103 stop_soap_server103 stop_soap_server
104 stop_pbserver
104 end105 end
105106
106 jobs_to_run = my_data.jobs107 jobs_to_run = my_data.jobs
@@ -507,7 +508,8 @@
507 def self.get_nearest_db_ip(is_mysql=false)508 def self.get_nearest_db_ip(is_mysql=false)
508 db_ips = self.get_db_slave_ips509 db_ips = self.get_db_slave_ips
509 # Unless this is mysql we include the master ip510 # Unless this is mysql we include the master ip
510 db_ips << self.get_db_master_ip if !is_mysql511 # Update, now mysql also has an API node
512 db_ips << self.get_db_master_ip
511 db_ips.compact!513 db_ips.compact!
512 514
513 local_ip = HelperFunctions.local_ip515 local_ip = HelperFunctions.local_ip
@@ -810,21 +812,23 @@
810 end812 end
811 if retval != 0813 if retval != 0
812 Djinn.log_debug("Fail to create initial table. Could not startup AppScale.")814 Djinn.log_debug("Fail to create initial table. Could not startup AppScale.")
815 exit(1)
813 # TODO: terminate djinn816 # TODO: terminate djinn
814 end817 end
815 end818 end
816819
817 # start soap server and pb server820 # start soap server and pb server
818 if has_soap_server?(my_data)821 if has_soap_server?(my_data)
822 @state = "Starting up SOAP Server and PBServer"
823 start_pbserver
819 start_soap_server824 start_soap_server
825 HelperFunctions.sleep_until_port_is_open(HelperFunctions.local_ip, UA_SERVER_PORT)
820 end826 end
821827
822 # no longer need to start appengine here828 # appengine is started elsewhere
823 end829 end
824830
825 def start_soap_server831 def start_soap_server
826 @state = "Starting up SOAP Server and Appscale Server(PB server)"
827
828 db_master_ip = nil832 db_master_ip = nil
829 @nodes.each { |location|833 @nodes.each { |location|
830 db_master_ip = location.private_ip if location.is_db_master?834 db_master_ip = location.private_ip if location.is_db_master?
@@ -846,28 +850,31 @@
846 "-s #{HelperFunctions.get_secret}"]850 "-s #{HelperFunctions.get_secret}"]
847 Djinn.log_debug(cmd.join(" "))851 Djinn.log_debug(cmd.join(" "))
848 Kernel.system cmd.join(" ")852 Kernel.system cmd.join(" ")
853 end
854
855 def start_pbserver
856 db_master_ip = nil
857 my_ip = my_node.public_ip
858 @nodes.each { |location|
859 db_master_ip = location.private_ip if location.is_db_master?
860 }
861 abort("db master ip was nil") if db_master_ip.nil?
862 table = @creds['table']
849 zoo_connection = get_zk_connection_string(@nodes)863 zoo_connection = get_zk_connection_string(@nodes)
850 cmd = [ "MASTER_IP=#{db_master_ip} LOCAL_DB_IP='#{db_local_ip}'",864 PbServer.start(db_master_ip, @userappserver_private_ip, my_ip, table, zoo_connection)
851 "start-stop-daemon --start",865 HAProxy.create_pbserver_config(my_ip, PbServer.proxy_port)
852 "--exec /usr/bin/python2.6",866 Nginx.create_pbserver_config(my_ip, PbServer.proxy_port)
853 "--name appscale_server",867 Nginx.restart
854 "--make-pidfile",868 # TODO check the return value
855 "--pidfile /var/appscale/appscale-appscaleserver.pid",869 PbServer.is_running
856 "--background",
857 "--",
858 "#{APPSCALE_HOME}/AppDB/appscale_server.py",
859 "--type #{table}",
860 "-z \"#{zoo_connection}\"",
861 "-s #{HelperFunctions.get_secret}",
862 "-a #{get_uaserver_ip} --key"]
863 Djinn.log_debug(cmd.join(" "))
864 Kernel.system cmd.join(" ")
865 HelperFunctions.sleep_until_port_is_open(HelperFunctions.local_ip, UA_SERVER_PORT)
866 end870 end
867871
868 def stop_soap_server872 def stop_soap_server
869 Kernel.system "start-stop-daemon --stop --pidfile /var/appscale/appscale-soapserver.pid"873 Kernel.system "start-stop-daemon --stop --pidfile /var/appscale/appscale-soapserver.pid"
870 Kernel.system "start-stop-daemon --stop --pidfile /var/appscale/appscale-appscaleserver.pid"874 end
875
876 def stop_pbserver
877 PbServer.stop
871 end878 end
872879
873 def is_cloud?880 def is_cloud?
874881
=== modified file 'AppController/haproxy.rb'
--- AppController/haproxy.rb 2010-06-25 22:28:38 +0000
+++ AppController/haproxy.rb 2010-06-29 01:49:23 +0000
@@ -61,6 +61,11 @@
61 self.create_app_config(my_ip, listen_port, Monitoring.server_ports, Monitoring.name)61 self.create_app_config(my_ip, listen_port, Monitoring.server_ports, Monitoring.name)
62 end62 end
6363
64 # Create the config file for PBServer applications
65 def self.create_pbserver_config(my_ip, listen_port)
66 self.create_app_config(my_ip, listen_port, PbServer.server_ports, PbServer.name)
67 end
68
64 # A generic function for creating haproxy config files used by appscale services69 # A generic function for creating haproxy config files used by appscale services
65 def self.create_app_config(my_ip, listen_port, server_ports, name)70 def self.create_app_config(my_ip, listen_port, server_ports, name)
66 servers = []71 servers = []
6772
=== modified file 'AppController/helperfunctions.rb'
--- AppController/helperfunctions.rb 2010-06-01 19:38:10 +0000
+++ AppController/helperfunctions.rb 2010-06-29 01:49:23 +0000
@@ -23,7 +23,8 @@
23TIME_IN_SECONDS = { "d" => 86400, "h" => 3600, "m" => 60, "s" => 1 }23TIME_IN_SECONDS = { "d" => 86400, "h" => 3600, "m" => 60, "s" => 1 }
2424
25APP_START_PORT = 2000025APP_START_PORT = 20000
2626DEFAULT_PBSERVER_PORT = 8443
27DEFAULT_PBSERVER_NOENCRYPT_PORT = 8888
27module HelperFunctions28module HelperFunctions
28 def self.write_file(location, contents)29 def self.write_file(location, contents)
29 File.open(location, "w+") { |file| file.write(contents) }30 File.open(location, "w+") { |file| file.write(contents) }
@@ -204,15 +205,18 @@
204 Djinn.log_debug("saw a python app coming through")205 Djinn.log_debug("saw a python app coming through")
205 cmd = ["MY_IP_ADDRESS=#{public_ip}",206 cmd = ["MY_IP_ADDRESS=#{public_ip}",
206 "MY_PORT=#{port}",207 "MY_PORT=#{port}",
207 "NGINX_ADDRESS=#{public_ip}",208 #"NGINX_ADDRESS=#{public_ip}",
208 "NGINX_PORT=#{nginx_port}",209 #"NGINX_PORT=#{nginx_port}",
209 "python2.5",210 "python2.5",
210 "#{APPSCALE_HOME}/AppServer/dev_appserver.py",211 "#{APPSCALE_HOME}/AppServer/dev_appserver.py",
211 "-p #{port}",212 "-p #{port}",
212 "--cookie_secret #{secret}",213 "--cookie_secret #{secret}",
213 "--login_server #{public_ip}",214 "--login_server #{public_ip}",
214 "--admin_console_server ''",215 "--admin_console_server ''",
215 "--datastore_path #{db_location}",216 "--nginx_port #{nginx_port}",
217 "--nginx_host #{public_ip}",
218 "--enable_sendmail",
219 "--datastore_path #{db_location}:#{DEFAULT_PBSERVER_NOENCRYPT_PORT}",
216 "--history_path /var/apps/#{app_name}/data/app.datastore.history",220 "--history_path /var/apps/#{app_name}/data/app.datastore.history",
217# this is not working.221# this is not working.
218# "--address=#{public_ip}",222# "--address=#{public_ip}",
219223
=== modified file 'AppController/load_balancer.rb'
--- AppController/load_balancer.rb 2010-04-21 20:53:02 +0000
+++ AppController/load_balancer.rb 2010-06-29 01:49:23 +0000
@@ -7,6 +7,7 @@
7 PROXY_PORT = 80607 PROXY_PORT = 8060
8 # The port which requests to this app will be served from8 # The port which requests to this app will be served from
9 LISTEN_PORT = 809 LISTEN_PORT = 80
10 LISTEN_SSL_PORT = 443
1011
11 def self.start12 def self.start
12 `service appscale-loadbalancer start`13 `service appscale-loadbalancer start`
@@ -33,6 +34,10 @@
33 LISTEN_PORT34 LISTEN_PORT
34 end35 end
3536
37 def self.listen_ssl_port
38 LISTEN_SSL_PORT
39 end
40
36 def self.server_ports41 def self.server_ports
37 SERVER_PORTS42 SERVER_PORTS
38 end43 end
3944
=== modified file 'AppController/nginx.rb'
--- AppController/nginx.rb 2010-06-25 22:28:38 +0000
+++ AppController/nginx.rb 2010-06-29 01:49:23 +0000
@@ -5,7 +5,7 @@
5require 'fileutils'5require 'fileutils'
6require 'load_balancer'6require 'load_balancer'
7require 'monitoring'7require 'monitoring'
88require 'pbserver'
99
10# A class to wrap all the interactions with the nginx web server10# A class to wrap all the interactions with the nginx web server
11class Nginx11class Nginx
@@ -72,9 +72,6 @@
7272
73 #{static_locations}73 #{static_locations}
7474
75 location /_ah/admin {
76 # 404 - lock out admin routes
77 }
7875
79 location / {76 location / {
80 proxy_set_header X-Real-IP $remote_addr;77 proxy_set_header X-Real-IP $remote_addr;
@@ -117,7 +114,7 @@
117114
118 # Create the configuration file for the AppLoadBalancer Rails application115 # Create the configuration file for the AppLoadBalancer Rails application
119 def self.create_app_load_balancer_config(my_ip, proxy_port)116 def self.create_app_load_balancer_config(my_ip, proxy_port)
120 self.create_app_config(my_ip, proxy_port, LoadBalancer.listen_port, LoadBalancer.name, LoadBalancer.public_directory)117 self.create_app_config(my_ip, proxy_port, LoadBalancer.listen_port, LoadBalancer.name, LoadBalancer.public_directory, LoadBalancer.listen_ssl_port)
121 end118 end
122119
123 # Create the configuration file for the AppMonitoring Rails application120 # Create the configuration file for the AppMonitoring Rails application
@@ -125,6 +122,68 @@
125 self.create_app_config(my_ip, proxy_port, Monitoring.listen_port, Monitoring.name, Monitoring.public_directory)122 self.create_app_config(my_ip, proxy_port, Monitoring.listen_port, Monitoring.name, Monitoring.public_directory)
126 end123 end
127124
125 # Create the configuration file for the pbserver
126 def self.create_pbserver_config(my_ip, proxy_port)
127 config = <<CONFIG
128upstream #{PbServer.name} {
129 server #{my_ip}:#{proxy_port};
130}
131
132server {
133 listen #{PbServer.listen_port};
134 server_name #{my_ip};
135 root /root/appscale/AppDB/;
136 access_log /var/log/nginx/pbserver.access.log upstream;
137 error_log /var/log/nginx/pbserver.error.log;
138 rewrite_log off;
139 error_page 404 = /404.html;
140
141
142
143 location / {
144 proxy_set_header X-Real-IP $remote_addr;
145 proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
146 proxy_set_header Host $http_host;
147 proxy_redirect off;
148 proxy_pass http://#{PbServer.name};
149 }
150
151}
152
153server {
154 listen #{PbServer.listen_ssl_port};
155 ssl on;
156 ssl_certificate /etc/nginx/mycert.pem;
157 ssl_certificate_key /etc/nginx/mykey.pem;
158 root /root/appscale/AppDB/public;
159 access_log /var/log/nginx/pbencrypt.access.log upstream;
160 error_log /var/log/nginx/pbencrypt.error.log;
161 rewrite_log off;
162 error_page 502 /502.html;
163
164 location / {
165 proxy_set_header X-Real-IP $remote_addr;
166 proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
167 proxy_set_header Host $http_host;
168 proxy_redirect off;
169
170 client_body_timeout 60;
171 proxy_read_timeout 60;
172 #Increase file size so larger applications can be uploaded
173 client_max_body_size 30M;
174 # go to proxy
175 proxy_pass http://#{PbServer.name};
176 }
177}
178CONFIG
179 config_path = File.join(SITES_ENABLED_PATH, "#{PbServer.name}.#{CONFIG_EXTENSION}")
180 File.open(config_path, "w+") { |dest_file| dest_file.write(config) }
181
182 HAProxy.regenerate_config
183
184 end
185
186
128 # A generic function for creating nginx config files used by appscale services187 # A generic function for creating nginx config files used by appscale services
129 def self.create_app_config(my_ip, proxy_port, listen_port, name, public_dir, ssl_port=nil)188 def self.create_app_config(my_ip, proxy_port, listen_port, name, public_dir, ssl_port=nil)
130189
@@ -132,9 +191,30 @@
132upstream #{name} {191upstream #{name} {
133 server #{my_ip}:#{proxy_port};192 server #{my_ip}:#{proxy_port};
134}193}
135194CONFIG
136server {195
137 listen #{listen_port};196 if ssl_port
197 # redirect all request to ssl port.
198 config += <<CONFIG
199server {
200 listen #{listen_port};
201 rewrite ^(.*) https://#{my_ip}:#{ssl_port}$1 permanent;
202}
203
204server {
205 listen #{ssl_port};
206 ssl on;
207 ssl_certificate #{NGINX_PATH}/mycert.pem;
208 ssl_certificate_key #{NGINX_PATH}/mykey.pem;
209CONFIG
210 else
211 config += <<CONFIG
212server {
213 listen #{listen_port};
214CONFIG
215 end
216
217 config += <<CONFIG
138 root #{public_dir};218 root #{public_dir};
139 access_log /var/log/nginx/load-balancer.access.log upstream;219 access_log /var/log/nginx/load-balancer.access.log upstream;
140 error_log /var/log/nginx/load-balancer.error.log;220 error_log /var/log/nginx/load-balancer.error.log;
@@ -165,7 +245,6 @@
165 root #{APPSCALE_HOME}/AppLoadBalancer/public;245 root #{APPSCALE_HOME}/AppLoadBalancer/public;
166 }246 }
167}247}
168
169CONFIG248CONFIG
170249
171 config_path = File.join(SITES_ENABLED_PATH, "#{name}.#{CONFIG_EXTENSION}")250 config_path = File.join(SITES_ENABLED_PATH, "#{name}.#{CONFIG_EXTENSION}")
@@ -209,9 +288,6 @@
209288
210 gzip on;289 gzip on;
211290
212 ssl_certificate #{NGINX_PATH}/cert.pem;
213 ssl_certificate_key #{NGINX_PATH}/cert.key;
214
215 include #{NGINX_PATH}/sites-enabled/*;291 include #{NGINX_PATH}/sites-enabled/*;
216}292}
217CONFIG293CONFIG
@@ -223,9 +299,11 @@
223 end299 end
224300
225 # copy over certs for ssl301 # copy over certs for ssl
226 `cp #{APPSCALE_HOME}/.appscale/certs/cert.pem #{NGINX_PATH}`302 # just copy files once to keep certificate as static.
227 `cp #{APPSCALE_HOME}/.appscale/certs/cert.key #{NGINX_PATH}`303 #`test ! -e #{NGINX_PATH}/mycert.pem && cp #{APPSCALE_HOME}/.appscale/certs/mycert.pem #{NGINX_PATH}`
228 304 #`test ! -e #{NGINX_PATH}/mykey.pem && cp #{APPSCALE_HOME}/.appscale/certs/mykey.pem #{NGINX_PATH}`
305 `cp #{APPSCALE_HOME}/.appscale/certs/mykey.pem #{NGINX_PATH}`
306 `cp #{APPSCALE_HOME}/.appscale/certs/mycert.pem #{NGINX_PATH}`
229 # Write the main configuration file which sets default configuration parameters307 # Write the main configuration file which sets default configuration parameters
230 File.open(MAIN_CONFIG_FILE, "w+") { |dest_file| dest_file.write(config) }308 File.open(MAIN_CONFIG_FILE, "w+") { |dest_file| dest_file.write(config) }
231 end309 end
232310
=== added file 'AppController/pbserver.rb'
--- AppController/pbserver.rb 1970-01-01 00:00:00 +0000
+++ AppController/pbserver.rb 2010-06-29 01:49:23 +0000
@@ -0,0 +1,106 @@
1#!/usr/bin/ruby -w
2require 'helperfunctions'
3
4# A class to wrap all the interactions with the PbServer
5class PbServer
6 APPSCALE_HOME=ENV['APPSCALE_HOME']
7 # If using just native transactions, then use one server port
8 SERVER_PORTS = [4000, 4001, 4002]
9 # The port which nginx will use to send requests to haproxy
10 PROXY_PORT = 3999
11 # The port which requests to this app will be served from
12 LISTEN_PORT = 8888
13 LISTEN_SSL_PORT = 8443
14 # The following databases cannot have multiple pbservers
15 SINGLE_SERVER_TABLE = ["mysql"]
16 # The following databases have native transaction support
17 NATIVE_TRANSACTIONS = ["mysql"]
18 @@pb_master_ip = ""
19 @@pb_ip = ""
20 @@pb_table = ""
21 @@pb_zklocations = ""
22
23 def self.start(master_ip, db_local_ip, my_ip, table, zklocations)
24 @@pb_master_ip = master_ip
25 @@pb_ip = my_ip
26 @@pb_table = table
27 @@pb_zklocations = zklocations
28 ports = SERVER_PORTS
29
30 pbserver = self.pb_script
31 ports = self.server_ports
32 ports.each { |pbserver_port|
33 cmd = [ "MASTER_IP=#{@@pb_master_ip} LOCAL_DB_IP='#{db_local_ip}'",
34 "start-stop-daemon --start",
35 "--exec /usr/bin/python2.6",
36 "--name appscale_server",
37 "--make-pidfile",
38 "--pidfile /var/appscale/appscale-appscaleserver-#{pbserver_port}.pid",
39 "--background",
40 "--",
41 "#{pbserver}",
42 "-p #{pbserver_port}",
43 "--no_encryption",
44 "--type #{@@pb_table}",
45 "-z \"#{@@pb_zklocations}\"",
46 "-s #{HelperFunctions.get_secret}",
47 "-a #{@@pb_ip} --key"]
48 Djinn.log_debug(cmd.join(" "))
49 Kernel.system cmd.join(" ")
50 }
51 end
52
53 def self.stop
54 ports = self.server_ports
55 ports.each { |pbserver_port|
56 Kernel.system "start-stop-daemon --stop --pidfile /var/appscale/appscale-appscaleserver-#{pbserver_port}.pid"
57 }
58 end
59
60 def self.restart
61 self.stop
62 # Use cached variables
63 self.start(@@pb_master_ip, @@pb_ip, @@pb_table, @@pb_zklocations)
64 end
65
66 def self.name
67 "as_pbserver"
68 end
69
70 def self.public_directory
71 "/root/appscale/AppDB/public"
72 end
73
74 def self.listen_port
75 LISTEN_PORT
76 end
77
78 def self.listen_ssl_port
79 LISTEN_SSL_PORT
80 end
81
82 def self.server_ports
83 if SINGLE_SERVER_TABLE.include?(@@pb_table)
84 return SERVER_PORTS.first(1)
85 else
86 return SERVER_PORTS
87 end
88 end
89
90 def self.proxy_port
91 PROXY_PORT
92 end
93
94 def self.is_running
95 `curl http://#{@@pb_ip}:#{PROXY_PORT}`
96 end
97
98 def self.pb_script
99 if NATIVE_TRANSACTIONS.include?(@@pb_table)
100 return "#{APPSCALE_HOME}/AppDB/appscale_server_native_trans.py"
101 else
102 return "#{APPSCALE_HOME}/AppDB/appscale_server.py"
103 end
104 end
105
106end
0107
=== modified file 'AppController/terminate.rb'
--- AppController/terminate.rb 2010-06-21 20:39:53 +0000
+++ AppController/terminate.rb 2010-06-29 01:49:23 +0000
@@ -28,6 +28,7 @@
28Collectd.stop28Collectd.stop
29LoadBalancer.stop29LoadBalancer.stop
30Monitoring.stop30Monitoring.stop
31PbServer.stop
31`/etc/init.d/klogd stop`32`/etc/init.d/klogd stop`
3233
33`rm -rf /var/apps/`34`rm -rf /var/apps/`
3435
=== modified file 'AppDB/appscale_server.py'
--- AppDB/appscale_server.py 2010-05-11 20:49:42 +0000
+++ AppDB/appscale_server.py 2010-06-29 01:49:23 +0000
@@ -5,7 +5,9 @@
5# Soo Hwan Park (suwanny@gmail.com)5# Soo Hwan Park (suwanny@gmail.com)
6# Sydney Pang (pang@cs.ucsb.edu)6# Sydney Pang (pang@cs.ucsb.edu)
7# See LICENSE file7# See LICENSE file
88import tornado.httpserver
9import tornado.ioloop
10import tornado.web
9import sys11import sys
10import socket12import socket
11import os 13import os
@@ -18,10 +20,7 @@
18import md5 20import md5
19import random21import random
20import getopt22import getopt
21from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
22from SocketServer import ThreadingMixIn
23import threading23import threading
24
25from google.appengine.api import api_base_pb24from google.appengine.api import api_base_pb
26from google.appengine.api import datastore25from google.appengine.api import datastore
27from google.appengine.api import datastore_errors26from google.appengine.api import datastore_errors
@@ -37,8 +36,7 @@
37from M2Crypto import SSL36from M2Crypto import SSL
38from drop_privileges import *37from drop_privileges import *
39from zkappscale import zktransaction38from zkappscale import zktransaction
40import as_transaction39zk = zktransaction
41
42import time40import time
4341
44DEBUG = False 42DEBUG = False
@@ -48,7 +46,7 @@
48DEFAULT_APP_LOCATION = ".flatfile_apps"46DEFAULT_APP_LOCATION = ".flatfile_apps"
49HYPERTABLE_XML_TAG = "Name"47HYPERTABLE_XML_TAG = "Name"
50DEFAULT_DATASTORE = "files"48DEFAULT_DATASTORE = "files"
51DEFAULT_SSL_PORT = 44349DEFAULT_SSL_PORT = 8443
52DEFAULT_PORT = 408050DEFAULT_PORT = 4080
53DEFAULT_ENCRYPTION = 151DEFAULT_ENCRYPTION = 1
54CERT_LOCATION = "/etc/appscale/certs/mycert.pem"52CERT_LOCATION = "/etc/appscale/certs/mycert.pem"
@@ -68,7 +66,7 @@
68KEYBLOCKSIZE = "50"66KEYBLOCKSIZE = "50"
69keyDictionaryLock = None67keyDictionaryLock = None
70keyDictionary = {}68keyDictionary = {}
7169MAX_DICT_SIZE = 1000000
72optimizedQuery = False70optimizedQuery = False
73ID_KEY_LENGTH = 6471ID_KEY_LENGTH = 64
74tableHashTable = {}72tableHashTable = {}
@@ -78,7 +76,6 @@
78ssl_cert_file = ""76ssl_cert_file = ""
79ssl_key_file = ""77ssl_key_file = ""
8078
81_trans_set = as_transaction.ASTransSet()
82zoo_keeper = ""79zoo_keeper = ""
83zoo_keeper_locations = "localhost:2181"80zoo_keeper_locations = "localhost:2181"
8481
@@ -93,7 +90,6 @@
93for the entity table90for the entity table
94"""91"""
9592
96
97class ThreadLogger:93class ThreadLogger:
98 def __init__(self, log):94 def __init__(self, log):
99 self.logger_ = log95 self.logger_ = log
@@ -109,6 +105,24 @@
109logger = appscale_logger.getLogger("pb_server")105logger = appscale_logger.getLogger("pb_server")
110106
111107
108class putThread(threading.Thread):
109 def setup(self, db, table, key, fields, values):
110 self.db = db
111 self.table = table
112 self.key = key
113 self.fields = fields
114 self.values = values
115 self.err = None
116 self.ret = None
117 self.timeTaken = 0
118 def run(self):
119 s = time.time()
120 self.err, self.ret = self.db.put_entity(self.table, self.key,
121 self.fields, self.values)
122 self.timeTaken = time.time() - s
123
124
125
112def getTableName(app_id, kind, version):126def getTableName(app_id, kind, version):
113 return app_id + "___" + kind + "___" + version127 return app_id + "___" + kind + "___" + version
114128
@@ -240,7 +254,7 @@
240 else:254 else:
241 keyStart, blockSize = zoo_keeper.generateIDBlock(app_id, root)255 keyStart, blockSize = zoo_keeper.generateIDBlock(app_id, root)
242 keyStart = long(keyStart)256 keyStart = long(keyStart)
243 except e:257 except Exception, e:
244 print "="*60258 print "="*60
245 print "Exception:",str(e)259 print "Exception:",str(e)
246 print "="*60260 print "="*60
@@ -250,6 +264,12 @@
250 key = keyStart264 key = keyStart
251 keyStart = keyStart + 1265 keyStart = keyStart + 1
252 keyDictionary[index] = keyStart, keyEnd266 keyDictionary[index] = keyStart, keyEnd
267
268 # To prevent the dictionary from getting to large
269 # and taking up too much memory
270 if len(keyDictionary) > MAX_DICT_SIZE:
271 keyDictionary = {} # reset
272
253 keyDictionaryLock.release()273 keyDictionaryLock.release()
254 return key274 return key
255275
@@ -258,30 +278,18 @@
258 pass278 pass
259279
260280
261class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):281
262 """ Handle requests in a new thread """282class MainHandler(tornado.web.RequestHandler):
263
264
265class AppScaleSecureHandler( BaseHTTPRequestHandler ):
266 """283 """
267 Defines what to do when the webserver receives different types of 284 Defines what to do when the webserver receives different types of
268 HTTP requests.285 HTTP requests.
269 """286 """
270 def get_http_err_code( self, message ):287 ########################
271 print message288 # GET Request Handling #
272 ret = 599 # 599 is the default code for an unknown error #289 ########################
273 return ret290 def get(self):
274291 self.write("Hi")
275 def send_post_http_response( self , http_code , message ):292
276 self.send_response( http_code )
277 self.send_header( 'Content-type' , 'text/plain' )
278 self.end_headers()
279 self.wfile.write( message )
280
281 def send_http_err_response( self, err_msg ):
282 ret_http_code = self.get_http_err_code( err_msg )
283 self.send_post_http_response( ret_http_code, err_msg )
284
285 # remote api request293 # remote api request
286 # sends back a response 294 # sends back a response
287 def remote_request(self, app_id, appscale_version, http_request_data):295 def remote_request(self, app_id, appscale_version, http_request_data):
@@ -304,8 +312,6 @@
304 method = apirequest.method()312 method = apirequest.method()
305 request_data = apirequest.request()313 request_data = apirequest.request()
306 http_request_data = request_data.contents()314 http_request_data = request_data.contents()
307
308 print "REQUEST:",method," AT time",time.time()
309 if method == "Put":315 if method == "Put":
310 response, errcode, errdetail = self.put_request(app_id, 316 response, errcode, errdetail = self.put_request(app_id,
311 appscale_version, 317 appscale_version,
@@ -393,8 +399,7 @@
393 print "REPLY",method," AT TIME",time.time()399 print "REPLY",method," AT TIME",time.time()
394 print "errcode:",errcode400 print "errcode:",errcode
395 print "errdetail:",errdetail401 print "errdetail:",errdetail
396 self.send_post_http_response( 200 , apiresponse.Encode() ) 402 self.write( apiresponse.Encode() )
397
398403
399 def run_query(self, app_id, appscale_version, http_request_data):404 def run_query(self, app_id, appscale_version, http_request_data):
400 global app_datastore405 global app_datastore
@@ -404,14 +409,7 @@
404 if query.has_transaction():409 if query.has_transaction():
405 txn = query.transaction()410 txn = query.transaction()
406411
407 if not _trans_set.isValid(txn):
408 _trans_set.purge(txn)
409 return (api_base_pb.VoidProto().Encode(),
410 datastore_pb.Error.BAD_REQUEST,
411 'Transaction timed out.')
412
413 if not query.has_ancestor():412 if not query.has_ancestor():
414 _trans_set.purge(txn)
415 return (api_base_pb.VoidProto().Encode(),413 return (api_base_pb.VoidProto().Encode(),
416 datastore_pb.Error.BAD_REQUEST,414 datastore_pb.Error.BAD_REQUEST,
417 'Only ancestor queries are allowed inside transactions.')415 'Only ancestor queries are allowed inside transactions.')
@@ -424,22 +422,12 @@
424 datastore_pb.Error.BAD_REQUEST,422 datastore_pb.Error.BAD_REQUEST,
425 'No group entity or root key.')423 'No group entity or root key.')
426 424
427 if _trans_set.needsLock(txn):425 try:
428 gotLock= zoo_keeper.acquireLock( app_id, txn.handle(), root_key)426 gotLock= zoo_keeper.acquireLock( app_id, txn.handle(), root_key)
429 if gotLock:427 except zk.ZKTransactionException, zkex:
430 _trans_set.setGroup(txn, root_key)
431 else:
432 _trans_set.purge(txn)
433 return (api_base_pb.VoidProto().Encode(),428 return (api_base_pb.VoidProto().Encode(),
434 datastore_pb.Error.CONCURRENT_TRANSACTION,429 datastore_pb.Error.CONCURRENT_TRANSACTION,
435 'Another transaction is running.')430 'Another transaction is running. %s'%zkex.message)
436 else:
437 if _trans_set.hasLockExpired(txn):
438 #zoo_keeper.notifyOfExpiredLock(app_id, root_key)
439 _trans_set.purge(txn)
440 return (api_base_pb.VoidProto().Encode(),
441 datastore_pb.Error.TIMEOUT,
442 'The transaction lease has expired.')
443 431
444 if not query.has_kind():432 if not query.has_kind():
445 # Return nothing in case of error #433 # Return nothing in case of error #
@@ -448,20 +436,15 @@
448 "Kindless queries are not implemented.")436 "Kindless queries are not implemented.")
449 else:437 else:
450 kind = query.kind()438 kind = query.kind()
451 #print "Query kind:",kind
452
453 # Verify validity of the entity name and applicaiton id #
454 # according to the naming sheme for entity tables #
455 #assert kind[-2:] != "__"
456 #assert app_id[-1] != "_"
457
458 # Fetch query from the datastore # 439 # Fetch query from the datastore #
459 table_name = getTableName(app_id, kind, appscale_version)440 table_name = getTableName(app_id, kind, appscale_version)
460 #print "Query using table name: %s, %s" % (table_name, ENTITY_TABLE_SCHEMA)
461 r = app_datastore.get_table( table_name, ENTITY_TABLE_SCHEMA)441 r = app_datastore.get_table( table_name, ENTITY_TABLE_SCHEMA)
462 #print "result: %s" % str(r)442 err = r[0]
443 if err not in ERROR_CODES:
444 return (api_base_pb.VoidProto().Encode(),
445 datastore_pb.Error.INTERNAL_ERROR,
446 "Error running query--." + err)
463447
464 #err = r[0]
465 if len(r) > 1:448 if len(r) > 1:
466 results = r[1:]449 results = r[1:]
467 else:450 else:
@@ -471,8 +454,6 @@
471 versions = results[1::2]454 versions = results[1::2]
472 # evens are encoded entities455 # evens are encoded entities
473 results = results[0::2] 456 results = results[0::2]
474 #print "RESULTS:",results
475 #print "VERSIONS:",versions
476 if len(versions) != len(results):457 if len(versions) != len(results):
477 return(api_base_pb.VoidProto().Encode(),458 return(api_base_pb.VoidProto().Encode(),
478 datastore_pb.Error.INTERNAL_ERROR,459 datastore_pb.Error.INTERNAL_ERROR,
@@ -498,7 +479,6 @@
498 row_key = getRowKeyFromDeletedEncoding(encoded_ent)479 row_key = getRowKeyFromDeletedEncoding(encoded_ent)
499 else:480 else:
500 row_key = getRowKeyFromKeyType(app_id, encoded_ent.key()) 481 row_key = getRowKeyFromKeyType(app_id, encoded_ent.key())
501 #print "constructed row key:",row_key
502 root_key = self.getRootKeyFromEntity(app_id, encoded_ent)482 root_key = self.getRootKeyFromEntity(app_id, encoded_ent)
503483
504 #TODO make sure all deleted keys use the same encoding484 #TODO make sure all deleted keys use the same encoding
@@ -509,8 +489,6 @@
509 journal_result = ["DB_ERROR:",DELETED]489 journal_result = ["DB_ERROR:",DELETED]
510 elif prev_version != long(ii):490 elif prev_version != long(ii):
511 # if the versions don't match, a valid version must be fetched491 # if the versions don't match, a valid version must be fetched
512 print "previous version from zk:",prev_version
513 print "prev version:",str(ii)
514 journal_key = getJournalKey(row_key, prev_version)492 journal_key = getJournalKey(row_key, prev_version)
515 journal_table = getJournalTable(app_id, appscale_version)493 journal_table = getJournalTable(app_id, appscale_version)
516 journal_result = app_datastore.get_entity( journal_table, 494 journal_result = app_datastore.get_entity( journal_table,
@@ -638,59 +616,24 @@
638 transaction_pb = datastore_pb.Transaction()616 transaction_pb = datastore_pb.Transaction()
639 # handle = zk.getTransactionID(app_id)617 # handle = zk.getTransactionID(app_id)
640 handle = zoo_keeper.getTransactionID(app_id)618 handle = zoo_keeper.getTransactionID(app_id)
641 print "Begin Trans Handle:",handle
642 transaction_pb.set_handle(handle)619 transaction_pb.set_handle(handle)
643 _trans_set.add(transaction_pb)
644 return (transaction_pb.Encode(), 0, "")620 return (transaction_pb.Encode(), 0, "")
645621
646 def commit_transaction_request(self, app_id, appscale_version, http_request_data):622 def commit_transaction_request(self, app_id, appscale_version, http_request_data):
647 txn = datastore_pb.Transaction(http_request_data)623 txn = datastore_pb.Transaction(http_request_data)
648 commitres_pb = datastore_pb.CommitResponse()624 commitres_pb = datastore_pb.CommitResponse()
649 if not _trans_set.isValid(txn):
650 _trans_set.purge(txn)
651 return (commitres_pb.Encode(),
652 datastore_pb.Error.BAD_REQUEST,
653 'Transaction timed out.')
654625
655 if _trans_set.needsLock(txn):
656 _trans_set.purge(txn)
657 return (commitres_pb.Encode(),
658 datastore_pb.Error.BAD_REQUEST,
659 'Commiting without owning the group lock.')
660
661 # if zk.releaseLock(app_id, txn.handle()):
662 #try:
663 if zoo_keeper.releaseLock(app_id, txn.handle()):626 if zoo_keeper.releaseLock(app_id, txn.handle()):
664 _trans_set.purge(txn)
665 return (commitres_pb.Encode(), 0, "")627 return (commitres_pb.Encode(), 0, "")
666 else:628 else:
667 # ZK client must now deal with lock629 # ZK client must now deal with lock
668 return (commitres_pb.Encode(), 630 return (commitres_pb.Encode(),
669 datastore_pb.Error.INTERNAL_ERROR,631 datastore_pb.Error.INTERNAL_ERROR,
670 "Unable to release lock")632 "Unable to release lock")
671 #except :
672 # print "exception:",str(ii)
673 # return (commitres_pb.Encode(),
674 # datastore_pb.Error.INTERNAL_ERROR,
675 # "Releasing a lock with the wrong group")
676
677633
678 def rollback_transaction_request(self, app_id, appscale_version, http_request_data):634 def rollback_transaction_request(self, app_id, appscale_version, http_request_data):
679 txn = datastore_pb.Transaction(http_request_data)635 txn = datastore_pb.Transaction(http_request_data)
680 zoo_keeper.notifyFailedTransaction(app_id, txn.handle())636 zoo_keeper.notifyFailedTransaction(app_id, txn.handle())
681 print "ROLLBACK for transaction:",txn.handle()
682 if not _trans_set.isValid(txn):
683 _trans_set.purge(txn)
684 return (api_base_pb.VoidProto().Encode(),
685 datastore_pb.Error.BAD_REQUEST,
686 'Transaction timed out.')
687 if _trans_set.needsLock(txn):
688 _trans_set.purge(txn)
689 return (api_base_pb.VoidProto().Encode(),
690 datastore_pb.Error.BAD_REQUEST,
691 'Rolling back without owning the group lock.')
692
693 _trans_set.purge(txn)
694 return (api_base_pb.VoidProto().Encode(), 0, "")637 return (api_base_pb.VoidProto().Encode(), 0, "")
695638
696639
@@ -766,7 +709,6 @@
766 if not request.has_transaction():709 if not request.has_transaction():
767 rollback_req = datastore_pb.Transaction()710 rollback_req = datastore_pb.Transaction()
768 rollback_req.set_handle(internal_txn)711 rollback_req.set_handle(internal_txn)
769 #print "ROLLING BACK FOR %s" % internal_txn
770 self.rollback_transaction_request(app_id, 712 self.rollback_transaction_request(app_id,
771 "version", 713 "version",
772 rollback_req.Encode())714 rollback_req.Encode())
@@ -806,12 +748,8 @@
806 # Only dealing with root puts748 # Only dealing with root puts
807 if e.key().path().element_size() == 1:749 if e.key().path().element_size() == 1:
808 root_path = e.key().path().mutable_element(0)750 root_path = e.key().path().mutable_element(0)
809 #print "has id:",root_path.has_id(), "has name:",root_path.has_name()
810 if root_path.id() == 0 and not root_path.has_name():751 if root_path.id() == 0 and not root_path.has_name():
811 # zk.generateIDBlock(self, app_id, entity_key = GLOBAL_ID_KEY):
812 #new_key = root_key + "/" + last_path.type()
813 uid = generate_unique_id(app_id, None, None)752 uid = generate_unique_id(app_id, None, None)
814 print "Assigned uid to new root key:",str(uid)
815 if uid <= 0:753 if uid <= 0:
816 return (putresp_pb.Encode(), 754 return (putresp_pb.Encode(),
817 datastore_pb.Error.INTERNAL_ERROR,755 datastore_pb.Error.INTERNAL_ERROR,
@@ -819,36 +757,22 @@
819 root_path.set_id(uid)757 root_path.set_id(uid)
820758
821 if putreq_pb.has_transaction():759 if putreq_pb.has_transaction():
822 #print "This put has transaction"
823 txn = putreq_pb.transaction()760 txn = putreq_pb.transaction()
824761
825 if not _trans_set.isValid(txn):
826 #_trans_set.purge(txn)
827 return (putresp_pb.Encode(),
828 datastore_pb.Error.BAD_REQUEST,
829 'Transaction timed out.')
830 762
831 root_key, errcode, errdetail = self.getRootKeyFromTransPut(app_id, putreq_pb)763 root_key, errcode, errdetail = self.getRootKeyFromTransPut(app_id, putreq_pb)
832 if errcode != 0:764 if errcode != 0:
833 #_trans_set.purge(txn)
834 return (putresp_pb.Encode(), 765 return (putresp_pb.Encode(),
835 errcode,766 errcode,
836 errdetail)767 errdetail)
837 #print "Root key: ",root_key 768 try:
838 if _trans_set.needsLock(txn):
839 # zk.acquireLock(app_id, txn.handle())
840 gotLock = zoo_keeper.acquireLock( app_id, txn.handle(), root_key)769 gotLock = zoo_keeper.acquireLock( app_id, txn.handle(), root_key)
841 if gotLock:770 except zk.ZKTransactionException, zkex:
842 _trans_set.setGroup(txn, root_key) 771 return (putresp_pb.Encode(),
843 else:772 datastore_pb.Error.CONCURRENT_TRANSACTION,
844 #_trans_set.purge(txn)773 'Error trying to acquire lock. %s'%zkex.message)
845 return (putresp_pb.Encode(),
846 datastore_pb.Error.CONCURRENT_TRANSACTION,
847 'Another transaction is running.')
848774
849 # Gather data from Put Request #775 # Gather data from Put Request #
850 #print "Entity list for put:"
851 #print putreq_pb.entity_list()
852 for e in putreq_pb.entity_list():776 for e in putreq_pb.entity_list():
853777
854 for prop in e.property_list() + e.raw_property_list():778 for prop in e.property_list() + e.raw_property_list():
@@ -904,7 +828,6 @@
904 # All writes are transactional and per entity if 828 # All writes are transactional and per entity if
905 # not already wrapped in a transaction829 # not already wrapped in a transaction
906 if not putreq_pb.has_transaction():830 if not putreq_pb.has_transaction():
907 #print "This is not a composite transaction"
908 txn, err, errcode = self.begin_transaction_request(app_id,831 txn, err, errcode = self.begin_transaction_request(app_id,
909 appscale_version,832 appscale_version,
910 http_request_data)833 http_request_data)
@@ -912,33 +835,24 @@
912 txn = datastore_pb.Transaction(txn)835 txn = datastore_pb.Transaction(txn)
913836
914 if err != 0:837 if err != 0:
915 _trans_set.purge(txn)
916 return (putresp_pb.Encode(), err, errcode)838 return (putresp_pb.Encode(), err, errcode)
917 root_key = getRootKey(app_id, e.key().path().element_list())839 root_key = getRootKey(app_id, e.key().path().element_list())
918 # TODO what if this is a new key and does not have a uid
919 if root_key == None:840 if root_key == None:
920 self.maybe_rollback_transaction(app_id, putreq_pb, txn.handle())841 self.maybe_rollback_transaction(app_id, putreq_pb, txn.handle())
921 return (putresp_pb.Encode(),842 return (putresp_pb.Encode(),
922 datastore_pb.Error.BAD_REQUEST,843 datastore_pb.Error.BAD_REQUEST,
923 'No group entity or root key.')844 'No group entity or root key.')
924 845 try:
925 #print "Root key: ",root_key
926 if _trans_set.needsLock(txn):
927 #print "needs lock"
928 gotLock = zoo_keeper.acquireLock( app_id, txn.handle(), root_key)846 gotLock = zoo_keeper.acquireLock( app_id, txn.handle(), root_key)
929 if gotLock:847 except zk.ZKTransactionException, zkex:
930 _trans_set.setGroup(txn, root_key) 848 self.maybe_rollback_transaction(app_id, putreq_pb, txn.handle())
931 else:849 return (putresp_pb.Encode(),
932 #_trans_set.purge(txn)850 datastore_pb.Error.CONCURRENT_TRANSACTION,
933 self.maybe_rollback_transaction(app_id, putreq_pb, txn.handle())851 'Another transaction is running %s.'%zkex.message)
934 return (putresp_pb.Encode(),
935 datastore_pb.Error.CONCURRENT_TRANSACTION,
936 'Another transaction is running.')
937 #######################################852 #######################################
938 # Done with key assignment853 # Done with key assignment
939 # Notify Soap Server of any new tables854 # Notify Soap Server of any new tables
940 #######################################855 #######################################
941 #print "Putting of type:",kind,"with uid of",str(uid)
942 # insert key 856 # insert key
943 table_name = getTableName(app_id, kind, appscale_version)857 table_name = getTableName(app_id, kind, appscale_version)
944 #print "Put Using table name:",table_name858 #print "Put Using table name:",table_name
@@ -991,59 +905,42 @@
991 row_key)905 row_key)
992 906
993 try:907 try:
994 print "Using handle for put:",txn.handle()
995 zoo_keeper.registUpdatedKey(app_id, txn.handle(), prev_version, row_key)908 zoo_keeper.registUpdatedKey(app_id, txn.handle(), prev_version, row_key)
996 except e:909 except e:
997 print "Exception thrown by register update key",str(e)
998 self.maybe_rollback_transaction(app_id, putreq_pb, txn.handle())910 self.maybe_rollback_transaction(app_id, putreq_pb, txn.handle())
999 return (putresp_pb.Encode(),911 return (putresp_pb.Encode(),
1000 datastore_pb.Error.INTERNAL_ERROR,912 datastore_pb.Error.INTERNAL_ERROR,
1001 "Timeout: Unable to update ZooKeeper on change set for transaction")913 "Timeout: Unable to update ZooKeeper on change set for transaction")
1002914 journalPut = putThread()
1003 journal_key = getJournalKey(row_key, txn.handle())915 journal_key = getJournalKey(row_key, txn.handle())
1004
1005 field_name_list = JOURNAL_SCHEMA
1006 field_value_list = [e.Encode()]
1007 journal_table = getJournalTable(app_id, appscale_version)916 journal_table = getJournalTable(app_id, appscale_version)
1008 print "Putting to ",journal_table,"key:",journal_key917 journalPut.setup(app_datastore,
1009 err, res = app_datastore.put_entity( journal_table,918 journal_table,
1010 journal_key,919 journal_key,
1011 field_name_list,920 JOURNAL_SCHEMA,
1012 field_value_list )921 [e.Encode()])
1013922 entPut = putThread()
1014 if err not in ERROR_CODES:923 entPut.setup(app_datastore,
1015 #_trans_set.purge(txn)924 table_name,
1016 self.maybe_rollback_transaction(app_id, putreq_pb, txn.handle())925 row_key,
1017 return (putresp_pb.Encode(),926 ENTITY_TABLE_SCHEMA,
1018 datastore_pb.Error.INTERNAL_ERROR,927 [e.Encode(), str(txn.handle())])
1019 err + ", Unable to write to journal")928 journalPut.start()
929 entPut.start()
930 journalPut.join()
931 entPut.join()
932 if journalPut.err not in ERROR_CODES:
933 self.maybe_rollback_transaction(app_id, putreq_pb, txn.handle())
934 return (putresp_pb.Encode(),
935 datastore_pb.Error.INTERNAL_ERROR,
936 journalPut.err + ", Unable to write to journal")
937 if entPut.err not in ERROR_CODES:
938 self.maybe_rollback_transaction(app_id, putreq_pb, txn.handle())
939 return (putresp_pb.Encode(),
940 datastore_pb.Error.INTERNAL_ERROR,
941 entPut.err + ", Unable to write to journal")
1020 942
1021 field_name_list = ENTITY_TABLE_SCHEMA943
1022 field_value_list = [e.Encode(), str(txn.handle())]
1023 #print "put entity field name: %s, field value: %s" % (str(field_name_list), str(field_value_list))
1024 err, res = app_datastore.put_entity( table_name,
1025 row_key,
1026 field_name_list,
1027 field_value_list)
1028
1029 if err not in ERROR_CODES:
1030 #_trans_set.purge(txn)
1031 self.maybe_rollback_transaction(app_id, putreq_pb, txn.handle())
1032 return (putresp_pb.Encode(),
1033 datastore_pb.Error.INTERNAL_ERROR,
1034 err)
1035
1036 # RANDOM ERRORS
1037 """
1038 rand = random.random()
1039 rand = int(rand * 10000)
1040 if rand % 100 == 0:
1041 self.maybe_rollback_transaction(app_id, putreq_pb, txn.handle())
1042 print "RANDOM ERROR!!!"
1043 return(putresp_pb.Encode(),
1044 datastore_pb.Error.BAD_REQUEST,
1045 'Random Error.')
1046 """
1047 if not putreq_pb.has_transaction():944 if not putreq_pb.has_transaction():
1048 com_res, errcode, errdetail = self.commit_transaction_request(app_id, 945 com_res, errcode, errdetail = self.commit_transaction_request(app_id,
1049 appscale_version, 946 appscale_version,
@@ -1068,28 +965,18 @@
1068 if getreq_pb.has_transaction():965 if getreq_pb.has_transaction():
1069 txn = getreq_pb.transaction()966 txn = getreq_pb.transaction()
1070967
1071 if not _trans_set.isValid(txn):
1072 _trans_set.purge(txn)
1073 return (getresp_pb.Encode(),
1074 datastore_pb.Error.BAD_REQUEST,
1075 'Transaction timed out.')
1076
1077 root_key, errcode, errdetail = self.getRootKeyFromTransGet(app_id,getreq_pb)968 root_key, errcode, errdetail = self.getRootKeyFromTransGet(app_id,getreq_pb)
1078 if errcode != 0:969 if errcode != 0:
1079 _trans_set.purge(txn)
1080 return (getresp_pb.Encode(), 970 return (getresp_pb.Encode(),
1081 errcode,971 errcode,
1082 errdetail)972 errdetail)
1083 973
1084 if _trans_set.needsLock(txn):974 try:
1085 gotLock= zoo_keeper.acquireLock( app_id, txn.handle(), root_key)975 gotLock= zoo_keeper.acquireLock( app_id, txn.handle(), root_key)
1086 if gotLock:976 except zk.ZKTransactionException, zkex:
1087 _trans_set.setGroup(txn, root_key) 977 return (getresp_pb.Encode(),
1088 else:978 datastore_pb.Error.CONCURRENT_TRANSACTION,
1089 _trans_set.purge(txn)979 'Another transaction is running. %s'%zkex.message)
1090 return (getresp_pb.Encode(),
1091 datastore_pb.Error.CONCURRENT_TRANSACTION,
1092 'Another transaction is running.')
1093 980
1094 for key in getreq_pb.key_list():981 for key in getreq_pb.key_list():
1095 key.set_app(app_id)982 key.set_app(app_id)
@@ -1106,7 +993,6 @@
1106 logger.debug("get: %s___%s___%s %s" % (app_id, kind, appscale_version, str(entity_id)))993 logger.debug("get: %s___%s___%s %s" % (app_id, kind, appscale_version, str(entity_id)))
1107 table_name = getTableName(app_id, kind, appscale_version)994 table_name = getTableName(app_id, kind, appscale_version)
1108 row_key = getRowKey(app_id,key.path().element_list())995 row_key = getRowKey(app_id,key.path().element_list())
1109 print "Get row key:",row_key
1110 r = app_datastore.get_entity( table_name, row_key, ENTITY_TABLE_SCHEMA )996 r = app_datastore.get_entity( table_name, row_key, ENTITY_TABLE_SCHEMA )
1111 err = r[0]997 err = r[0]
1112 if err not in ERROR_CODES or len(r) != 3: 998 if err not in ERROR_CODES or len(r) != 3:
@@ -1128,23 +1014,18 @@
1128 prev_version, 1014 prev_version,
1129 row_key)1015 row_key)
1130 if valid_txn != prev_version: 1016 if valid_txn != prev_version:
1131 print "Using journal version",valid_txn," versus invalid:",prev_version
1132 prev_version = valid_txn1017 prev_version = valid_txn
1133 if prev_version == long(NONEXISTANT_TRANSACTION):1018 if prev_version == long(NONEXISTANT_TRANSACTION):
1134 entity = None1019 entity = None
1135 else:1020 else:
1136 journal_table = getJournalTable(app_id, appscale_version)1021 journal_table = getJournalTable(app_id, appscale_version)
1137 journal_key = getJournalKey(row_key, prev_version)1022 journal_key = getJournalKey(row_key, prev_version)
1138 print "Retriving from ",journal_table,"key:",journal_key
1139 r = app_datastore.get_entity(journal_table, journal_key, ENTITY_TABLE_SCHEMA[:1] ) 1023 r = app_datastore.get_entity(journal_table, journal_key, ENTITY_TABLE_SCHEMA[:1] )
1140 err = r[0]1024 err = r[0]
1141 if err not in ERROR_CODES or len(r) != 2:1025 if err not in ERROR_CODES or len(r) != 2:
1142 print r
1143 print "UNABLE TO GET JOURNAL VERSION"
1144 r = ["",DELETED]1026 r = ["",DELETED]
11451027
1146 if len(r) > 1:1028 if len(r) > 1:
1147 print "GOT JOURNAL VERSION"
1148 entity = r[1]1029 entity = r[1]
1149 1030
1150 if entity[0:len(DELETED)] == DELETED:1031 if entity[0:len(DELETED)] == DELETED:
@@ -1156,7 +1037,6 @@
1156 group.mutable_entity().CopyFrom(e_pb)1037 group.mutable_entity().CopyFrom(e_pb)
1157 1038
1158 # Send Response #1039 # Send Response #
1159 print getresp_pb
1160 logger.debug("GET_RESPONSE: %s" % getresp_pb)1040 logger.debug("GET_RESPONSE: %s" % getresp_pb)
1161 return (getresp_pb.Encode(), 0, "")1041 return (getresp_pb.Encode(), 0, "")
11621042
@@ -1174,31 +1054,19 @@
1174 logger.debug("DELETE_REQUEST: %s" % delreq_pb)1054 logger.debug("DELETE_REQUEST: %s" % delreq_pb)
1175 delresp_pb = api_base_pb.VoidProto() 1055 delresp_pb = api_base_pb.VoidProto()
1176 if delreq_pb.has_transaction():1056 if delreq_pb.has_transaction():
1177 print "This delete has a transaction"
1178 txn = delreq_pb.transaction()1057 txn = delreq_pb.transaction()
11791058
1180 if not _trans_set.isValid(txn):
1181 #_trans_set.purge(txn)
1182 return (delresp_pb.Encode(),
1183 datastore_pb.Error.BAD_REQUEST,
1184 'Transaction timed out.')
1185
1186 root_key, errcode, errdetail = self.getRootKeyFromTransDel(app_id, delreq_pb)1059 root_key, errcode, errdetail = self.getRootKeyFromTransDel(app_id, delreq_pb)
1187 if errcode != 0:1060 if errcode != 0:
1188 #_trans_set.purge(txn)
1189 return (delresp_pb.Encode(), 1061 return (delresp_pb.Encode(),
1190 errcode,1062 errcode,
1191 errdetail)1063 errdetail)
1192 if _trans_set.needsLock(txn):1064 try:
1193 # zk.acquireLock(app_id, txn.handle())
1194 gotLock = zoo_keeper.acquireLock( app_id, txn.handle(), root_key)1065 gotLock = zoo_keeper.acquireLock( app_id, txn.handle(), root_key)
1195 if gotLock:1066 except zk.ZKTransactionException, zkex:
1196 _trans_set.setGroup(txn, root_key) 1067 return (delresp_pb.Encode(),
1197 else:1068 datastore_pb.Error.CONCURRENT_TRANSACTION,
1198 #_trans_set.purge(txn)1069 'Another transaction is running. %s'%zkex.message)
1199 return (delresp_pb.Encode(),
1200 datastore_pb.Error.CONCURRENT_TRANSACTION,
1201 'Another transaction is running.')
12021070
12031071
1204 for key in delreq_pb.key_list():1072 for key in delreq_pb.key_list():
@@ -1212,7 +1080,6 @@
1212 # All deletes are transactional and per entity if 1080 # All deletes are transactional and per entity if
1213 # not already wrapped in a transaction1081 # not already wrapped in a transaction
1214 if not delreq_pb.has_transaction():1082 if not delreq_pb.has_transaction():
1215 print "This is not a composite transaction"
1216 txn, err, errcode = self.begin_transaction_request(app_id,1083 txn, err, errcode = self.begin_transaction_request(app_id,
1217 appscale_version,1084 appscale_version,
1218 http_request_data)1085 http_request_data)
@@ -1220,7 +1087,6 @@
1220 txn = datastore_pb.Transaction(txn)1087 txn = datastore_pb.Transaction(txn)
12211088
1222 if err != 0:1089 if err != 0:
1223 _trans_set.purge(txn)
1224 return (delresp_pb.Encode(), err, errcode)1090 return (delresp_pb.Encode(), err, errcode)
1225 root_key = getRootKey(app_id, key.path().element_list())1091 root_key = getRootKey(app_id, key.path().element_list())
1226 if root_key == None:1092 if root_key == None:
@@ -1233,15 +1099,13 @@
1233 # Get a lock for the group1099 # Get a lock for the group
1234 # Do a read before a write to get the old values1100 # Do a read before a write to get the old values
1235 ################################################1101 ################################################
1236 gotLock = zoo_keeper.acquireLock( app_id, txn.handle(), root_key)1102 try:
1237 if gotLock:1103 gotLock = zoo_keeper.acquireLock( app_id, txn.handle(), root_key)
1238 _trans_set.setGroup(txn, root_key) 1104 except zk.ZKTransactionException, zkex:
1239 else:
1240 #_trans_set.purge(txn)
1241 self.maybe_rollback_transaction(app_id, delreq_pb, txn.handle())1105 self.maybe_rollback_transaction(app_id, delreq_pb, txn.handle())
1242 return (delresp_pb.Encode(),1106 return (delresp_pb.Encode(),
1243 datastore_pb.Error.CONCURRENT_TRANSACTION,1107 datastore_pb.Error.CONCURRENT_TRANSACTION,
1244 'Another transaction is running.')1108 'Another transaction is running. %s'%zkex.message)
12451109
1246 ##########################1110 ##########################
1247 # Get the previous version1111 # Get the previous version
@@ -1292,7 +1156,6 @@
1292 field_value_list )1156 field_value_list )
12931157
1294 if err not in ERROR_CODES:1158 if err not in ERROR_CODES:
1295 #_trans_set.purge(txn)
1296 self.maybe_rollback_transaction(app_id, delreq_pb, txn.handle())1159 self.maybe_rollback_transaction(app_id, delreq_pb, txn.handle())
1297 return (delresp_pb.Encode(),1160 return (delresp_pb.Encode(),
1298 datastore_pb.Error.INTERNAL_ERROR,1161 datastore_pb.Error.INTERNAL_ERROR,
@@ -1416,31 +1279,18 @@
1416 print "http done"1279 print "http done"
1417 self.void_proto(app_id, appscale_version, http_request_data)1280 self.void_proto(app_id, appscale_version, http_request_data)
14181281
1419 ########################
1420 # GET Request Handling #
1421 ########################
1422 def do_GET( self ):
1423 self.send_error( 404 , 'File Not Found: %s' % self.path )
1424 1282
1425 #########################1283 #########################
1426 # POST Request Handling #1284 # POST Request Handling #
1427 #########################1285 #########################
1428 def do_POST( self ):1286 @tornado.web.asynchronous
1429 start_time = time.time() 1287 def post( self ):
1430 http_request_data = self.rfile.read(int(self.headers.getheader('content-length')))1288 request = self.request
1431 inter_time = time.time()1289 http_request_data = request.body
1432 logger.debug("Timing for pre pre env setup:" + " " + 1290 pb_type = request.headers['protocolbuffertype']
1433 str(start_time) + " " + str(inter_time) + 1291 app_data = request.headers['appdata']
1434 " total time: " + str(inter_time - start_time) + "\n")
1435 print "intertime - starttime:",(str(inter_time - start_time))
1436 pb_type = self.headers.getheader( 'protocolbuffertype' )
1437 app_data = self.headers.getheader('appdata')
1438 app_data = app_data.split(':')1292 app_data = app_data.split(':')
1439 logger.debug("POST len: %d" % len(app_data))1293 logger.debug("POST len: %d" % len(app_data))
1440 inter_time = time.time()
1441 logger.debug("Timing for pre env setup:" + " " +
1442 str(start_time) + " " + str(inter_time) +
1443 " total time: " + str(inter_time - start_time) + "\n")
14441294
1445 if len(app_data) == 5:1295 if len(app_data) == 5:
1446 app_id, user_email, nick_name, auth_domain, appscale_version = app_data1296 app_id, user_email, nick_name, auth_domain, appscale_version = app_data
@@ -1470,42 +1320,12 @@
1470 # Default HTTP Response Data #1320 # Default HTTP Response Data #
1471 logger.debug("For app id: " + app_id)1321 logger.debug("For app id: " + app_id)
1472 logger.debug("For app version: " + appscale_version)1322 logger.debug("For app version: " + appscale_version)
1473 inter_time = time.time()
1474 logger.debug("Timing for env setup:" + pb_type + " " +
1475 app_id + " " + str(start_time) + " " +
1476 str(inter_time) + " total time: " + str(inter_time - start_time) + "\n")
14771323
1478 if pb_type == "Request":1324 if pb_type == "Request":
1479 self.remote_request(app_id, appscale_version, http_request_data)1325 self.remote_request(app_id, appscale_version, http_request_data)
1480 else:1326 else:
1481 self.unknown_request(app_id, appscale_version, http_request_data, pb_type)1327 self.unknown_request(app_id, appscale_version, http_request_data, pb_type)
1482 1328 self.finish()
1483 stop_time = time.time()
1484
1485 #if logOn == True:
1486 #logFilePtr.write(pb_type + " " + app_id + " " +
1487 #str(start_time) + " " + str(stop_time) + " total time: " +
1488 #str(stop_time - start_time) + "\n")
1489
1490 logger.debug(pb_type + " " + app_id + " " + str(start_time) + " " +
1491 str(stop_time) + " total time: " + str(stop_time - start_time) + "\n")
1492
1493class AppScaleUnSecureServerThreaded( ThreadingMixIn, HTTPServer):
1494 pass
1495
1496class AppScaleSecureServerThreaded( ThreadingMixIn, HTTPServer ):
1497 def __init__( self):
1498 global local_server_address
1499 global HandlerClass
1500 global ssl_cert_file
1501 global ssl_key_file
1502 BaseServer.__init__( self, local_server_address, HandlerClass )
1503 ctx = SSL.Context()
1504 ctx.load_cert( ssl_cert_file, ssl_key_file )
1505 self.socket = SSL.Connection( ctx )
1506 self.server_bind()
1507 self.server_activate()
1508
1509def usage():1329def usage():
1510 print "AppScale Server" 1330 print "AppScale Server"
1511 print1331 print
@@ -1518,6 +1338,11 @@
1518 print "\t--blocksize=<key-block-size>"1338 print "\t--blocksize=<key-block-size>"
1519 print "\t--optimized_query"1339 print "\t--optimized_query"
1520 print "\t--no_encryption"1340 print "\t--no_encryption"
1341
1342pb_application = tornado.web.Application([
1343 (r"/*", MainHandler),
1344])
1345
1521def main(argv):1346def main(argv):
1522 global app_datastore1347 global app_datastore
1523 global getKeyFromServer1348 global getKeyFromServer
@@ -1599,43 +1424,23 @@
1599 exit(1)1424 exit(1)
16001425
1601 tableServer = SOAPpy.SOAPProxy("https://" + soapServer + ":" + str(keyPort))1426 tableServer = SOAPpy.SOAPProxy("https://" + soapServer + ":" + str(keyPort))
1602
1603 # # Bind Port #
1604 #server = AppScaleSecureServer( ('',DEFAULT_SSL_PORT),
1605 # AppScaleSecureHandler, cert_file, key_file )
1606 #help(ThreadedHTTPServer)
1607 global local_server_address
1608 global HandlerClass
1609 global ssl_cert_file
1610 global ssl_key_file
1611 global keyDictionaryLock 1427 global keyDictionaryLock
16121428
1613 zoo_keeper = zktransaction.ZKTransaction(zoo_keeper_locations)1429 zoo_keeper = zktransaction.ZKTransaction(zoo_keeper_locations)
1614 keyDictionaryLock = threading.Lock()1430 keyDictionaryLock = threading.Lock()
1615 if port == DEFAULT_SSL_PORT and not isEncrypted:1431 if port == DEFAULT_SSL_PORT and not isEncrypted:
1616 port = DEFAULT_PORT1432 port = DEFAULT_PORT
1617 local_server_address = ('',port)1433 server = tornado.httpserver.HTTPServer(pb_application)
1618 HandlerClass = AppScaleSecureHandler1434 server.listen(port)
1619 ssl_cert_file = cert_file
1620 ssl_key_file = key_file
1621 if isEncrypted:
1622 server = AppScaleSecureServerThreaded()
1623 else:
1624 server = AppScaleUnSecureServerThreaded(local_server_address, HandlerClass)
1625 sa = server.socket.getsockname()
1626 if not db_type == "timesten":
1627 # Stop running as root, security purposes #
1628 drop_privileges()
1629 logger.debug("\n\nStarting AppScale-Secure-Server on %s:%s" % (sa[0], sa[1]))
16301435
1631 while 1:1436 while 1:
1632 try:1437 try:
1633 # Start Server #1438 # Start Server #
1634 server.serve_forever()1439 tornado.ioloop.IOLoop.instance().start()
1635 except SSL.SSLError:1440 except SSL.SSLError:
1636 logger.debug("\n\nUnexcepted input for AppScale-Secure-Server on %s:%s" % (sa[0], sa[1]))1441 logger.debug("\n\nUnexcepted input for AppScale-Secure-Server")
1637 except KeyboardInterrupt:1442 except KeyboardInterrupt:
1638 server.socket.close() 1443 #server.socket.close()
1639 print "Server interrupted by user, terminating..."1444 print "Server interrupted by user, terminating..."
1640 exit(1)1445 exit(1)
16411446
16421447
=== added file 'AppDB/appscale_server_native_trans.py'
--- AppDB/appscale_server_native_trans.py 1970-01-01 00:00:00 +0000
+++ AppDB/appscale_server_native_trans.py 2010-06-29 01:49:23 +0000
@@ -0,0 +1,1086 @@
1#!/usr/bin/python
2#
3# Author:
4# Navraj Chohan (nchohan@cs.ucsb.edu)
5# Soo Hwan Park (suwanny@gmail.com)
6# Sydney Pang (pang@cs.ucsb.edu)
7# See LICENSE file
8import tornado.httpserver
9import tornado.ioloop
10import tornado.web
11
12import sys
13import socket
14import os
15import types
16import appscale_datastore
17#import helper_functions
18import SOAPpy
19from dbconstants import *
20import appscale_logger
21import md5
22import random
23import getopt
24import threading
25
26from google.appengine.api import api_base_pb
27from google.appengine.api import datastore
28from google.appengine.api import datastore_errors
29from google.appengine.api import datastore_types
30from google.appengine.api import users
31from google.appengine.datastore import datastore_pb
32from google.appengine.datastore import datastore_index
33from google.appengine.runtime import apiproxy_errors
34from google.net.proto import ProtocolBuffer
35from google.appengine.datastore import entity_pb
36from google.appengine.ext.remote_api import remote_api_pb
37from SocketServer import BaseServer
38from M2Crypto import SSL
39from drop_privileges import *
40from zkappscale import zktransaction
41
42import time
43
# Global configuration and shared state for the PB (datastore) server.
DEBUG = False
APP_TABLE = APPS_TABLE
USER_TABLE = USERS_TABLE
DEFAULT_USER_LOCATION = ".flatfile_users"
DEFAULT_APP_LOCATION = ".flatfile_apps"
HYPERTABLE_XML_TAG = "Name"
DEFAULT_DATASTORE = "files"
DEFAULT_SSL_PORT = 8443  # port served when SSL encryption is on
DEFAULT_PORT = 4080      # fallback port when --no_encryption is given
DEFAULT_ENCRYPTION = 1
CERT_LOCATION = "/etc/appscale/certs/mycert.pem"
KEY_LOCATION = "/etc/appscale/certs/mykey.pem"
SECRET_LOCATION = "/etc/appscale/secret.key"
VALID_DATASTORES = []  # populated in main() from DatastoreFactory
ERROR_CODES = []       # populated in main() from DatastoreFactory
app_datastore = []     # datastore adapter instance, assigned in main()
logOn = False
logFilePtr = ""
zoo_keeper = ""        # zktransaction.ZKTransaction instance, set in main()

getKeyFromServer = False
soapServer = "localhost"
tableServer = ""       # SOAP proxy to the Users/Apps server, set in main()
keyPort = 4343
keySecret = ""
KEYBLOCKSIZE = "50"    # number of ids fetched per ZooKeeper id block
keyDictionaryLock = None  # guards keyDictionary; created in main()
keyDictionary = {}     # maps app (or app/root) -> (next_id, end_of_block)

optimizedQuery = False
ID_KEY_LENGTH = 64     # numeric ids are zero-padded to this width in row keys
tableHashTable = {}    # tables already announced to the Users/Apps server

# Legacy SSL-server globals; unused by the Tornado-based server below.
local_server_address = ""
HandlerClass = ""
ssl_cert_file = ""
ssl_key_file = ""

DELETED = "DELETED___"
"""
Deleted keys are DELETED/<row_key>
"""

"""
keys for tables take the format
appname/Grandparent:<ID>/Parent:<ID>/Child:<ID>
for the entity table
"""
92
93
class ThreadLogger:
  """Lock-protected logger wrapper shared between request threads.

  NOTE: debug() is currently a deliberate no-op.  The original body
  printed and logged under self.log_lock but was dead code behind an
  unconditional early return; the unreachable statements have been
  removed and the disabled state is now explicit.
  """

  def __init__(self, log):
    # Underlying logger instance and the lock that used to serialize
    # writes to it.
    self.logger_ = log
    self.log_lock = threading.Lock()

  def debug(self, string):
    """Logging is disabled; accept and drop the message."""
    return
105
106logger = appscale_logger.getLogger("pb_server")
107
108
def getTableName(app_id, kind, version):
  """Return the entity table name for an app's kind at a given version.

  The three components are joined with the "___" delimiter, e.g.
  guestbook___Greeting___1.
  """
  return "___".join([app_id, kind, version])
111
def getRowKey(app_id, ancestor_list):
  """Build the entity-table row key for a full ancestor path.

  The key looks like app_id/Type:<id-or-name>/... with one segment per
  path element.  Numeric ids are zero-padded to ID_KEY_LENGTH; string
  names that happen to be digits are prefixed with "__key__" so they
  cannot collide with padded ids.  Returns "" when ancestor_list is None.
  """
  if ancestor_list is None:
    logger.debug("Generate row key received null ancestor list")
    return ""

  # Note: mysql cannot have \ as the first char in the row key
  segments = [app_id]
  for element in ancestor_list:
    part = ""
    if element.has_type():
      part = element.type()

    if element.has_id():
      # Zero-pad so lexicographic order matches numeric order.
      part += ":" + str(element.id()).zfill(ID_KEY_LENGTH)
    elif element.has_name():
      name = element.name()
      # append _ if the name is a number, prevents collisions of key names
      if name.isdigit():
        part += ":__key__" + name
      else:
        part += ":" + name
    segments.append(part)
  return "/".join(segments)
135
136
def getRootKey(app_id, ancestor_list):
  """Build the row key for the root (first) element of an ancestor path.

  Mirrors getRowKey() for a single element: app_id/Type:<id-or-name>.
  Returns None when the root element lacks a type, or lacks both an id
  and a name.
  """
  root = ancestor_list[0]
  # mysql cannot have \ as the first char in the row key
  if not root.has_type():
    return None
  key = app_id + "/" + root.type()

  if root.has_id():
    # Zero-pad numeric ids to a fixed width.
    return key + ":" + str(root.id()).zfill(ID_KEY_LENGTH)

  if root.has_name():
    name = root.name()
    # append _ if the name is a number, prevents collisions of key names
    if name.isdigit():
      return key + ":__key__" + name
    return key + ":" + name

  return None
160
161
def getRootKeyFromKeyType(app_id, key):
  """Root row key for a datastore_types.Key (via its private Reference)."""
  path_elements = key._Key__reference.path().element_list()
  return getRootKey(app_id, path_elements)
165
166
def getRowKeyFromKeyType(app_id, key):
  """Full row key for a datastore_types.Key (via its private Reference)."""
  path_elements = key._Key__reference.path().element_list()
  return getRowKey(app_id, path_elements)
170
171def generate_unique_id(app_id, root, isChild):
172 global keyDictionary
173 global keyDictionaryLock
174
175 if isChild:
176 if not root:
177 return -1
178
179 index = None
180 if isChild:
181 index = app_id + "/" + str(root)
182 else:
183 index = app_id
184
185 keyDictionaryLock.acquire()
186 try:
187 keyStart, keyEnd = keyDictionary[index]
188 except:
189 keyStart = 0
190 keyEnd = 0
191
192 key = 0
193 if keyStart != keyEnd:
194 key = keyStart
195 keyStart = keyStart + 1
196 keyDictionary[index]= keyStart, keyEnd
197 keyDictionaryLock.release()
198 return key
199 else:
200 try:
201 if not isChild:
202 keyStart, blockSize = zoo_keeper.generateIDBlock(app_id)
203 keyStart = long(keyStart)
204 else:
205 keyStart, blockSize = zoo_keeper.generateIDBlock(app_id, root)
206 keyStart = long(keyStart)
207 except:
208 print "="*60
209 print "Exception: when getting id block"
210 print "="*60
211 keyDictionaryLock.release()
212 return -1
213 keyEnd = keyStart + long(blockSize)
214 key = keyStart
215 keyStart = keyStart + 1
216 keyDictionary[index] = keyStart, keyEnd
217 keyDictionaryLock.release()
218 return key
219
220
def getRootKeyFromRef(app_id, ref):
  """Root row key for an entity_pb Reference; False when it has no path."""
  if not ref.has_path():
    return False
  return getRootKey(app_id, ref.path().element_list())
227
228
def rollback_function(app_id, trans_id, root_key, change_set):
  """No-op rollback hook; the native datastore handles rollback itself."""
  return None
231
232
233
234
235class MainHandler(tornado.web.RequestHandler):
236 """
237 Defines what to do when the webserver receives different types of
238 HTTP requests.
239 """
240 @tornado.web.asynchronous
241 def get(self):
242 self.write("Hi")
243 self.finish()
244 # remote api request
245 # sends back a response
246 def remote_request(self, app_id, appscale_version, http_request_data):
247 apirequest = remote_api_pb.Request(http_request_data)
248 apiresponse = remote_api_pb.Response()
249 response = None
250 errcode = 0
251 errdetail = ""
252 apperror_pb = None
253
254 if not apirequest.has_method():
255 errcode = datastore_pb.Error.BAD_REQUEST
256 errdetail = "Method was not set in request"
257 apirequest.set_method("NOT_FOUND")
258 if not apirequest.has_request():
259 errcode = datastore_pb.Error.BAD_REQUEST
260 errdetail = "Request missing in call"
261 apirequest.set_method("NOT_FOUND")
262 apirequest.clear_request()
263 method = apirequest.method()
264 request_data = apirequest.request()
265 http_request_data = request_data.contents()
266
267 #print "REQUEST:",method," AT time",time.time()
268 if method == "Put":
269 response, errcode, errdetail = self.put_request(app_id,
270 appscale_version,
271 http_request_data)
272 elif method == "Get":
273 response, errcode, errdetail = self.get_request(app_id,
274 appscale_version,
275 http_request_data)
276 elif method == "Delete":
277 response, errcode, errdetail = self.delete_request(app_id,
278 appscale_version,
279 http_request_data)
280 elif method == "RunQuery":
281 response, errcode, errdetail = self.run_query(app_id,
282 appscale_version,
283 http_request_data)
284 elif method == "BeginTransaction":
285 response, errcode, errdetail = self.begin_transaction_request(app_id,
286 appscale_version,
287 http_request_data)
288 elif method == "Commit":
289 response, errcode, errdetail = self.commit_transaction_request(app_id,
290 appscale_version,
291 http_request_data)
292 elif method == "Rollback":
293 response, errcode, errdetail = self.rollback_transaction_request(app_id,
294 appscale_version,
295 http_request_data)
296 elif method == "AllocateIds":
297 response, errcode, errdetail = self.allocate_ids_request(app_id,
298 appscale_version,
299 http_request_data)
300 elif method == "CreateIndex":
301 errcode = datastore_pb.Error.PERMISSION_DENIED
302 errdetail = "Create Index is not implemented"
303 logger.debug(errdetail)
304 """
305 response, errcode, errdetail = self.create_index_request(app_id,
306 appscale_version,
307 http_request_data)
308 """
309 elif method == "GetIndices":
310 errcode = datastore_pb.Error.PERMISSION_DENIED
311 errdetail = "GetIndices is not implemented"
312 logger.debug(errdetail)
313 """
314 response, errcode, errdetail = self.get_indices_request(app_id,
315 appscale_version,
316 http_request_data)
317 """
318 elif method == "UpdateIndex":
319 errcode = datastore_pb.Error.PERMISSION_DENIED
320 errdetail = "UpdateIndex is not implemented"
321 logger.debug(errdetail)
322 """
323 response, errcode, errdetail = self.update_index_request(app_id,
324 appscale_version,
325 http_request_data)
326 """
327 elif method == "DeleteIndex":
328 errcode = datastore_pb.Error.PERMISSION_DENIED
329 errdetail = "DeleteIndex is not implemented"
330 logger.debug(errdetail)
331
332 """
333 response, errcode, errdetail = self.delete_index_request(app_id,
334 appscale_version,
335 http_request_data)
336 """
337 else:
338 errcode = datastore_pb.Error.BAD_REQUEST
339 errdetail = "Unknown datastore message"
340 logger.debug(errdetail)
341
342
343 rawmessage = apiresponse.mutable_response()
344 if response:
345 rawmessage.set_contents(response)
346
347 if errcode != 0:
348 apperror_pb = apiresponse.mutable_application_error()
349 apperror_pb.set_code(errcode)
350 apperror_pb.set_detail(errdetail)
351 if errcode != 0:
352 print "REPLY",method," AT TIME",time.time()
353 print "errcode:",errcode
354 print "errdetail:",errdetail
355 self.write(apiresponse.Encode() )
356
357
358 def run_query(self, app_id, appscale_version, http_request_data):
359 global app_datastore
360 query = datastore_pb.Query(http_request_data)
361 logger.debug("QUERY:%s" % query)
362 results = []
363
364 if not query.has_kind():
365 # Return nothing in case of error #
366 return (api_base_pb.VoidProto().Encode(),
367 datastore_pb.Error.PERMISSION_DENIED,
368 "Kindless queries are not implemented.")
369 else:
370 kind = query.kind()
371 #print "Query kind:",kind
372
373 # Verify validity of the entity name and applicaiton id #
374 # according to the naming sheme for entity tables #
375 #assert kind[-2:] != "__"
376 #assert app_id[-1] != "_"
377
378 # Fetch query from the datastore #
379 table_name = getTableName(app_id, kind, appscale_version)
380 #print "Query using table name:",table_name
381 if query.has_transaction():
382 txn = query.transaction()
383 r = app_datastore.get_table( table_name, ENTITY_TABLE_SCHEMA, txn.handle())
384 else:
385 r = app_datastore.get_table( table_name, ENTITY_TABLE_SCHEMA)
386 #logger.debug("result: %s" % r)
387
388 #err = r[0]
389 if len(r) > 1:
390 results = r[1:]
391 else:
392 results = []
393
394 # odds are versions
395 versions = results[1::2]
396 # evens are encoded entities
397 results = results[0::2]
398 #print "RESULTS:",results
399 #print "VERSIONS:",versions
400 if len(versions) != len(results):
401 return(api_base_pb.VoidProto().Encode(),
402 datastore_pb.Error.INTERNAL_ERROR,
403 'The query had a bad number of results.')
404
405 # convert to objects
406 # Unless its marked as deleted
407 # They are currently strings
408 for index, res in enumerate(results):
409 results[index] = entity_pb.EntityProto(res)
410 results[index] = datastore.Entity._FromPb(results[index])
411
412 logger.debug("====results pre filter====")
413 #logger.debug("%s" % results)
414 if query.has_ancestor():
415 ancestor_path = query.ancestor().path().element_list()
416 def is_descendant(entity):
417 path = entity.key()._Key__reference.path().element_list()
418 return path[:len(ancestor_path)] == ancestor_path
419 results = filter(is_descendant, results)
420
421 operators = {datastore_pb.Query_Filter.LESS_THAN: '<',
422 datastore_pb.Query_Filter.LESS_THAN_OR_EQUAL: '<=',
423 datastore_pb.Query_Filter.GREATER_THAN: '>',
424 datastore_pb.Query_Filter.GREATER_THAN_OR_EQUAL: '>=',
425 datastore_pb.Query_Filter.EQUAL: '==',
426 }
427
428 for filt in query.filter_list():
429 assert filt.op() != datastore_pb.Query_Filter.IN
430
431 prop = filt.property(0).name().decode('utf-8')
432 op = operators[filt.op()]
433
434 def passes(entity):
435 """ Returns True if the entity passes the filter, False otherwise. """
436 entity_vals = entity.get(prop, [])
437 if type(entity_vals) != types.ListType:
438 entity_vals = [entity_vals]
439
440 entity_property_list = [datastore_types.ToPropertyPb(prop, value) for value in entity_vals]
441
442 for entity_prop in entity_property_list:
443 fixed_entity_val = datastore_types.FromPropertyPb(entity_prop)
444
445 for filter_prop in filt.property_list():
446 filter_val = datastore_types.FromPropertyPb(filter_prop)
447 comp = u'%r %s %r' % (fixed_entity_val, op, filter_val)
448 logger.debug('Evaling filter expression "%s"' % comp )
449 if eval(comp):
450 return True
451 return False
452
453 results = filter(passes, results)
454
455 for order in query.order_list():
456 prop = order.property().decode('utf-8')
457 results = [entity for entity in results if prop in entity]
458
459 def order_compare(a, b):
460 """ Return a negative, zero or positive number depending on whether
461 entity a is considered smaller than, equal to, or larger than b,
462 according to the query's orderings. """
463 for o in query.order_list():
464 prop = o.property().decode('utf-8')
465
466 a_values = a[prop]
467 if not isinstance(a_values, types.ListType):
468 a_values = [a_values]
469
470 b_values = b[prop]
471 if not isinstance(b_values, types.ListType):
472 b_values = [b_values]
473
474 cmped = cmp(min(a_values), min(b_values))
475
476 if o.direction() == datastore_pb.Query_Order.DESCENDING:
477 cmped = -cmped
478
479 if cmped != 0:
480 return cmped
481
482 return 0
483
484 results.sort(order_compare)
485
486 if query.has_limit():
487 results = results[:query.limit()]
488 logger.debug("****results after filtering:****")
489 logger.debug("%s" % results)
490
491 results = [ent._ToPb() for ent in results]
492 # Pack Results into a clone of QueryResult #
493 clone_qr_pb = datastore_pb.QueryResult()
494 for res in results:
495 clone_qr_pb.add_result()
496 clone_qr_pb.result_[-1] = res
497
498 clone_qr_pb.clear_cursor()
499 clone_qr_pb.set_more_results( len(results)>0 )
500
501 logger.debug("QUERY_RESULT: %s" % clone_qr_pb)
502 return (clone_qr_pb.Encode(), 0, "")
503
504
505 def begin_transaction_request(self, app_id, appscale_version, http_request_data):
506 transaction_pb = datastore_pb.Transaction()
507 handle = generate_unique_id(app_id, None, None)
508 #print "Begin Trans Handle:",handle
509 transaction_pb.set_handle(handle)
510 app_datastore.setupTransaction(handle)
511 return (transaction_pb.Encode(), 0, "")
512
513 def commit_transaction_request(self, app_id, appscale_version, http_request_data):
514 transaction_pb = datastore_pb.Transaction(http_request_data)
515 handle = transaction_pb.handle()
516 commitres_pb = datastore_pb.CommitResponse()
517 try:
518 app_datastore.commit(handle)
519 except:
520 return (commitres_pb.Encode(), datastore_pb.Error.PERMISSION_DENIED, "Unable to commit for this transaction")
521 return (commitres_pb.Encode(), 0, "")
522
523 def rollback_transaction_request(self, app_id, appscale_version, http_request_data):
524 transaction_pb = datastore_pb.Transaction(http_request_data)
525 handle = transaction_pb.handle()
526 try:
527 app_datastore.rollback(handle)
528 except:
529 return(api_base_pb.VoidProto().Encode(), datastore_pb.Error.PERMISSION_DENIED, "Unable to rollback for this transaction")
530 return (api_base_pb.VoidProto().Encode(), 0, "")
531
532
533 def allocate_ids_request(self, app_id, appscale_version, http_request_data):
534 return (api_base_pb.VoidProto().Encode(),
535 datastore_pb.Error.PERMISSION_DENIED,
536 'Allocation of block ids not implemented.')
537
538
539 # Returns Null on error
540 def getRootKeyFromEntity(self, app_id, entity):
541 key = entity.key()
542 if str(key.__class__) == "google.appengine.datastore.entity_pb.Reference":
543 return getRootKeyFromRef(app_id, key)
544 else:
545 return getRootKeyFromKeyType(app_id, key)
546
547
548 # For transactions
549 # Verifies all puts are apart of the same root entity
550 def getRootKeyFromTransPut(self, app_id, putreq_pb):
551 ent_list = []
552 if putreq_pb.entity_size() > 0:
553 ent_list = putreq_pb.entity_list()
554 first_ent = ent_list[0]
555 expected_root = self.getRootKeyFromEntity(app_id, first_ent)
556 # It is possible that all roots are None
557 # because it is a root that has not gotten a uid
558
559 for e in ent_list:
560 root = self.getRootKeyFromEntity(app_id, e)
561 if root != expected_root:
562 errcode = datastore_pb.Error.BAD_REQUEST
563 errdetail = "All puts must be a part of the same group"
564 return (None, errcode, errdetail)
565
566 return (expected_root, 0, "")
567
568
569 # For transactions
570 # Verifies all puts are apart of the same root entity
571 def getRootKeyFromTransReq(self, app_id, req_pb):
572 if req_pb.key_size() <= 0:
573 errcode = datastore_pb.error.bad_request
574 errdetail = "Bad key listing"
575 return (None, errcode, errdetail)
576
577 key_list = req_pb.key_list()
578 first_key = key_list[0]
579
580 expected_root = getRootKeyFromRef(app_id, first_key)
581 # It is possible that all roots are None
582 # because it is a root that has not gotten a uid
583
584 for k in key_list:
585 root = getRootKeyFromRef(app_id, k)
586 if root != expected_root:
587 errcode = datastore_pb.error.bad_request
588 errdetail = "all transaction gets must be a part of the same group"
589 return (None, errcode, errdetail)
590
591 return (expected_root, 0, "")
592
593 def getRootKeyFromTransGet(self, app_id, get_pb):
594 return self.getRootKeyFromTransReq(app_id, get_pb)
595
596 def getRootKeyFromTransDel(self, app_id, del_pb):
597 return self.getRootKeyFromTransReq(app_id, del_pb)
598
599 def put_request(self, app_id, appscale_version, http_request_data):
600 global app_datastore
601 global keySecret
602 global tableHashTable
603
604 field_name_list = []
605 field_value_list = []
606
607 start_time = time.time()
608 putreq_pb = datastore_pb.PutRequest(http_request_data)
609 logger.debug("RECEIVED PUT_REQUEST %s" % putreq_pb)
610 putresp_pb = datastore_pb.PutResponse( )
611 txn = None
612 root_key = None
613 # Must assign an id if a put is being done in a transaction
614 # and it does not have an id and it is a root
615 for e in putreq_pb.entity_list():
616 # Only dealing with root puts
617 if e.key().path().element_size() == 1:
618 root_path = e.key().path().mutable_element(0)
619 #print "has id:",root_path.has_id(), "has name:",root_path.has_name()
620 if root_path.id() == 0 and not root_path.has_name():
621 #new_key = root_key + "/" + last_path.type()
622 uid = generate_unique_id(app_id, None, None)
623 #print "Assigned uid to new root key:",str(uid)
624 if uid <= 0:
625 return (putresp_pb.Encode(),
626 datastore_pb.Error.INTERNAL_ERROR,
627 'Unable to assign a unique id')
628 root_path.set_id(uid)
629
630 # Gather data from Put Request #
631 #print "Entity list for put:"
632 #print putreq_pb.entity_list()
633 for e in putreq_pb.entity_list():
634
635 for prop in e.property_list() + e.raw_property_list():
636 if prop.value().has_uservalue():
637 obuid = md5.new(prop.value().uservalue().email().lower()).digest()
638 obuid = '1' + ''.join(['%02d' % ord(x) for x in obuid])[:20]
639 prop.mutable_value().mutable_uservalue().set_obfuscated_gaiaid(
640 obuid)
641
642 #################################
643 # Key Assignment for new entities
644 #################################
645 e.mutable_key().set_app(app_id)
646
647 root_type = e.key().path().element(0).type()
648 if root_type[-2:] == "__":
649 return(putresp_pb.Encode(),
650 datastore_pb.Error.PERMISSION_DENIED,
651 "Illegal type name contains reserved delimiters \"__\"")
652
653 last_path = e.key().path().element_list()[-1]
654 uid = last_path.id()
655 kind = last_path.type()
656 # this object has no assigned id thus far
657 if last_path.id() == 0 and not last_path.has_name():
658 if e.key().path().element_size() == 1:
659 root_key = None
660 if root_key:
661 child_key = root_key + "/" + last_path.type()
662 else:
663 child_key = None
664 # if the root is None or the child is None,
665 # then the global counter is used
666 # gen unique id only wants to know if a child exist
667 uid = generate_unique_id(app_id, root_key, child_key)
668 if uid <= 0:
669 return(putresp_pb.Encode(),
670 datastore_pb.Error.INTERNAL_ERROR,
671 "Unable to assign id to entity")
672 last_path.set_id(uid)
673 # It may be its own parent
674 group = e.mutable_entity_group()
675 root = e.key().path().element(0)
676 group.add_element().CopyFrom(root)
677 if last_path.has_name():
678 uid = last_path.name()
679 # It may be its own parent
680 group = e.mutable_entity_group()
681 if group.element_size() == 0:
682 root = e.key().path().element(0)
683 group.add_element().CopyFrom(root)
684
685 #######################################
686 # Done with key assignment
687 # Notify Soap Server of any new tables
688 #######################################
689 #print "Putting of type:",kind,"with uid of",str(uid)
690 # insert key
691 table_name = getTableName(app_id, kind, appscale_version)
692 #print "Put Using table name:",table_name
693 # Notify Users/Apps table if a new class is being added
694 if table_name not in tableHashTable:
695 # This is the first time this pbserver has seen this table
696 # Notify the User/Apps server via soap call
697 # This function is reentrant
698 # If the class was deleted, and added a second time there is no
699 # notifying the users/app server of its creation
700 if tableServer.add_class(app_id, kind, keySecret) == "true":
701 tableHashTable[table_name] = 1
702
703 # Store One Entity #
704 logger.debug("put: %s___%s___%s with id: %s" % (app_id,
705 kind,
706 appscale_version,
707 str(uid)))
708
709 row_key = getRowKey(app_id, e.key().path().element_list())
710 inter_time = time.time()
711 logger.debug("Time spent in put before datastore call: " + str(inter_time - start_time))
712
713
714 field_name_list = ENTITY_TABLE_SCHEMA
715 field_value_list = [e.Encode(), NONEXISTANT_TRANSACTION]
716 if putreq_pb.has_transaction():
717 txn = putreq_pb.transaction()
718 err, res = app_datastore.put_entity( table_name,
719 row_key,
720 field_name_list,
721 field_value_list,
722 txn.handle())
723 else:
724 err, res = app_datastore.put_entity( table_name,
725 row_key,
726 field_name_list,
727 field_value_list)
728
729 if err not in ERROR_CODES:
730 #_trans_set.purge(txn)
731 return (putresp_pb.Encode(),
732 datastore_pb.Error.INTERNAL_ERROR,
733 err)
734
735 putresp_pb.key_list().append(e.key())
736
737 inter_time = time.time()
738 logger.debug("Time spent in put after datastore call: " + str(inter_time - start_time))
739 logger.debug( "PUT_RESPONSE:%s" % putresp_pb)
740 return (putresp_pb.Encode(), 0, "")
741
742
743 def get_request(self, app_id, appscale_version, http_request_data):
744 global app_datastore
745 getreq_pb = datastore_pb.GetRequest(http_request_data)
746 logger.debug("GET_REQUEST: %s" % getreq_pb)
747 getresp_pb = datastore_pb.GetResponse()
748
749
750 for key in getreq_pb.key_list():
751 key.set_app(app_id)
752 last_path = key.path().element_list()[-1]
753
754 if last_path.has_id():
755 entity_id = last_path.id()
756
757 if last_path.has_name():
758 entity_id = last_path.name()
759
760 if last_path.has_type():
761 kind = last_path.type()
762 logger.debug("get: %s___%s___%s %s" % (app_id, kind, appscale_version, str(entity_id)))
763 table_name = getTableName(app_id, kind, appscale_version)
764 row_key = getRowKey(app_id,key.path().element_list())
765 #print "Get row key:",row_key
766 if getreq_pb.has_transaction():
767 txn = getreq_pb.transaction()
768 r = app_datastore.get_entity( table_name, row_key, ENTITY_TABLE_SCHEMA, txn.handle())
769 else:
770 r = app_datastore.get_entity( table_name, row_key, ENTITY_TABLE_SCHEMA )
771 err = r[0]
772 if err not in ERROR_CODES or len(r) != 3:
773 r = ["",None,NONEXISTANT_TRANSACTION]
774 print err
775 entity = r[1]
776 prev_version = long(r[2])
777
778 group = getresp_pb.add_entity()
779 if entity:
780 e_pb = entity_pb.EntityProto( entity )
781 group.mutable_entity().CopyFrom(e_pb)
782
783 # Send Response #
784 #print getresp_pb
785 logger.debug("GET_RESPONSE: %s" % getresp_pb)
786 return (getresp_pb.Encode(), 0, "")
787
788 """ Deletes are just PUTs using a sentinal value of DELETED
789 All deleted keys are DELETED/entity_group. This is for
790 rollback to know which entity group a possible failed
791 transaction belongs to.
792 """
793 def delete_request(self, app_id, appscale_version, http_request_data):
794 global app_datastore
795 root_key = None
796 txn = None
797 logger.debug("DeleteRequest Received...")
798 delreq_pb = datastore_pb.DeleteRequest( http_request_data )
799 logger.debug("DELETE_REQUEST: %s" % delreq_pb)
800 delresp_pb = api_base_pb.VoidProto()
801
802 for key in delreq_pb.key_list():
803 key.set_app(app_id)
804 last_path = key.path().element_list()[-1]
805 if last_path.has_type():
806 kind = last_path.type()
807
808 row_key = getRowKey(app_id, key.path().element_list())
809
810
811 table_name = getTableName(app_id, kind, appscale_version)
812 if delreq_pb.has_transaction():
813 txn = delreq_pb.transaction()
814 res = app_datastore.delete_row( table_name, row_key, txn.handle())
815 else:
816 res = app_datastore.delete_row( table_name,
817 row_key)
818 err = res[0]
819 logger.debug("Response from DB for delete request %s" % err)
820 if err not in ERROR_CODES:
821 if DEBUG: print err
822 return (delresp_pb.Encode(),
823 datastore_pb.Error.INTERNAL_ERROR,
824 err + ", Unable to delete row")
825
826 return (delresp_pb.Encode(), 0, "")
827
828
829 def optimized_delete_request(self, app_id, appscale_version, http_request_data):
830 pass
831 def run_optimized_query(self, app_id, appscale_version, http_request_data):
832 return
833 def optimized_put_request(self, app_id, appscale_version, http_request_data):
834 pass
835
836 def void_proto(self, app_id, appscale_version, http_request_data):
837 resp_pb = api_base_pb.VoidProto()
838 print "Got void"
839 logger.debug("VOID_RESPONSE: %s to void" % resp_pb)
840 return (resp_pb.Encode(), 0, "" )
841
842 def str_proto(self, app_id, appscale_version, http_request_data):
843 str_pb = api_base_pb.StringProto( http_request_data )
844 composite_pb = datastore_pb.CompositeIndices()
845 print "Got a string proto"
846 print str_pb
847 logger.debug("String proto received: %s"%str_pb)
848 logger.debug("CompositeIndex response to string: %s" % composite_pb)
849 return (composite_pb.Encode(), 0, "" )
850
851 def int64_proto(self, app_id, appscale_version, http_request_data):
852 int64_pb = api_base_pb.Integer64Proto( http_request_data )
853 resp_pb = api_base_pb.VoidProto()
854 print "Got a int 64"
855 print int64_pb
856 logger.debug("Int64 proto received: %s"%int64_pb)
857 logger.debug("VOID_RESPONSE to int64: %s" % resp_pb)
858 return (resp_pb.Encode(), 0, "")
859
860 def compositeindex_proto(self, app_id, appscale_version, http_request_data):
861 compindex_pb = entity_pb.CompositeIndex( http_request_data)
862 resp_pb = api_base_pb.VoidProto()
863 print "Got Composite Index"
864 print compindex_pb
865 logger.debug("CompositeIndex proto recieved: %s"%str(compindex_pb))
866 logger.debug("VOID_RESPONSE to composite index: %s" % resp_pb)
867 return (resp_pb.Encode(), 0, "")
868
869# Returns 0 on success, 1 on failure
870 def create_index_tables(self, app_id):
871 global app_datastore
872 """table_name = "__" + app_id + "__" + "kind"
873 columns = ["reference"]
874 print "Building table: " + table_name
875 returned = app_datastore.create_table( table_name, columns )
876 err,res = returned
877 if err not in ERROR_CODES:
878 logger.debug("%s" % err)
879 return 1
880 """
881 table_name = "__" + app_id + "__" + "single_prop_asc"
882 print "Building table: " + table_name
883 columns = ["reference"]
884 returned = app_datastore.create_table( table_name, columns )
885 err,res = returned
886 if err not in ERROR_CODES:
887 logger.debug("%s" % err)
888 return 1
889
890 table_name = "__" + app_id + "__" + "single_prop_desc"
891 print "Building table: " + table_name
892 returned = app_datastore.create_table( table_name, columns )
893 err,res = returned
894 if err not in ERROR_CODES:
895 logger.debug("%s" % err)
896 return 1
897
898 table_name = "__" + app_id + "__" + "composite"
899 print "Building table: " + table_name
900 returned = app_datastore.create_table( table_name, columns )
901 err,res = returned
902 if err not in ERROR_CODES:
903 logger.debug("%s" % err)
904 return 1
905
906 return 0
907
908 ##############
909 # OTHER TYPE #
910 ##############
911 def unknown_request(self, app_id, appscale_version, http_request_data, pb_type):
912 logger.debug("Received Unknown Protocol Buffer %s" % pb_type )
913 print "ERROR: Received Unknown Protocol Buffer <" + pb_type +">.",
914 print "Nothing has been implemented to handle this Protocol Buffer type."
915 print "http request data:"
916 print http_request_data
917 print "http done"
918 self.void_proto(app_id, appscale_version, http_request_data)
919
920
921 #########################
922 # POST Request Handling #
923 #########################
924 @tornado.web.asynchronous
925 def post( self ):
926 request = self.request
927 http_request_data = request.body
928 pb_type = request.headers['protocolbuffertype']
929 app_data = request.headers['appdata']
930 app_data = app_data.split(':')
931
932 if len(app_data) == 5:
933 app_id, user_email, nick_name, auth_domain, appscale_version = app_data
934 os.environ['AUTH_DOMAIN'] = auth_domain
935 os.environ['USER_EMAIL'] = user_email
936 os.environ['USER_NICKNAME'] = nick_name
937 os.environ['APPLICATION_ID'] = app_id
938 elif len(app_data) == 4:
939 app_id, user_email, nick_name, auth_domain = app_data
940 os.environ['AUTH_DOMAIN'] = auth_domain
941 os.environ['USER_EMAIL'] = user_email
942 os.environ['USER_NICKNAME'] = nick_name
943 os.environ['APPLICATION_ID'] = app_id
944 appscale_version = "1"
945 elif len(app_data) == 2:
946 app_id, appscale_version = app_data
947 app_id = app_data[0]
948 os.environ['APPLICATION_ID'] = app_id
949 elif len(app_data) == 1:
950 app_id = app_data[0]
951 os.environ['APPLICATION_ID'] = app_id
952 appscale_version = "1"
953 else:
954 logger.debug("UNABLE TO EXTRACT APPLICATION DATA")
955 return
956
957 # Default HTTP Response Data #
958
959 if pb_type == "Request":
960 self.remote_request(app_id, appscale_version, http_request_data)
961 else:
962 self.unknown_request(app_id, appscale_version, http_request_data, pb_type)
963 self.finish()
964
965
966def usage():
967 print "AppScale Server"
968 print
969 print "Options:"
970 print "\t--certificate=<path-to-ssl-certificate>"
971 print "\t--a=<soap server hostname> "
972 print "\t--key for using keys from the soap server"
973 print "\t--type=<hypertable, hbase, cassandra, mysql, mongodb>"
974 print "\t--secret=<secrete to soap server>"
975 print "\t--blocksize=<key-block-size>"
976 print "\t--optimized_query"
977 print "\t--no_encryption"
978def main(argv):
979 global app_datastore
980 global getKeyFromServer
981 global tableServer
982 global keySecret
983 global logOn
984 global logFilePtr
985 global optimizedQuery
986 global soapServer
987 global ERROR_CODES
988 global VALID_DATASTORES
989 global KEYBLOCKSIZE
990 global zoo_keeper
991 cert_file = CERT_LOCATION
992 key_file = KEY_LOCATION
993 db_type = "hypertable"
994 port = DEFAULT_SSL_PORT
995 isEncrypted = True
996 try:
997 opts, args = getopt.getopt( argv, "c:t:l:s:b:a:k:p:o:n:z:",
998 ["certificate=",
999 "type=",
1000 "log=",
1001 "secret=",
1002 "blocksize=",
1003 "soap=",
1004 "key",
1005 "port",
1006 "optimized_query",
1007 "no_encryption",
1008 "zoo_keeper"] )
1009 except getopt.GetoptError:
1010 usage()
1011 sys.exit(1)
1012 for opt, arg in opts:
1013 if opt in ("-c", "--certificate"):
1014 cert_file = arg
1015 print "Using cert..."
1016 elif opt in ("-k", "--key" ):
1017 getKeyFromServer = True
1018 print "Using key server..."
1019 elif opt in ("-t", "--type"):
1020 db_type = arg
1021 print "Datastore type: ",db_type
1022 elif opt in ("-s", "--secret"):
1023 keySecret = arg
1024 print "Secret set..."
1025 elif opt in ("-l", "--log"):
1026 logOn = True
1027 logFile = arg
1028 logFilePtr = open(logFile, "w")
1029 logFilePtr.write("# type, app, start, end\n")
1030 elif opt in ("-b", "--blocksize"):
1031 KEYBLOCKSIZE = arg
1032 print "Block size: ",KEYBLOCKSIZE
1033 elif opt in ("-a", "--soap"):
1034 soapServer = arg
1035 elif opt in ("-o", "--optimized_query"):
1036 optimizedQuery = True
1037 elif opt in ("-p", "--port"):
1038 port = int(arg)
1039 elif opt in ("-n", "--no_encryption"):
1040 isEncrypted = False
1041 elif opt in ("-z", "--zoo_keeper"):
1042 zoo_keeper_locations = arg
1043
1044 app_datastore = appscale_datastore.DatastoreFactory.getDatastore(db_type)
1045 ERROR_CODES = appscale_datastore.DatastoreFactory.error_codes()
1046 VALID_DATASTORES = appscale_datastore.DatastoreFactory.valid_datastores()
1047 if DEBUG: print "ERROR_CODES:"
1048 if DEBUG: print ERROR_CODES
1049 if DEBUG: print "VALID_DATASTORE:"
1050 if DEBUG: print VALID_DATASTORES
1051 if db_type in VALID_DATASTORES:
1052 logger.debug("Using datastore %s" % db_type)
1053 else:
1054 print "Unknown datastore "+ db_type
1055 exit(1)
1056
1057 tableServer = SOAPpy.SOAPProxy("https://" + soapServer + ":" + str(keyPort))
1058
1059 global keyDictionaryLock
1060 zoo_keeper = zktransaction.ZKTransaction(zoo_keeper_locations)
1061
1062 keyDictionaryLock = threading.Lock()
1063 if port == DEFAULT_SSL_PORT and not isEncrypted:
1064 port = DEFAULT_PORT
1065 pb_application = tornado.web.Application([
1066 (r"/*", MainHandler),
1067 ])
1068 server = tornado.httpserver.HTTPServer(pb_application)
1069 server.listen(port)
1070 if not db_type == "timesten":
1071 # Stop running as root, security purposes #
1072 drop_privileges()
1073
1074 while 1:
1075 try:
1076 # Start Server #
1077 tornado.ioloop.IOLoop.instance().start()
1078 except SSL.SSLError:
1079 logger.debug("\n\nUnexcepted input for AppScale-Secure-Server")
1080 except KeyboardInterrupt:
1081 print "Server interrupted by user, terminating..."
1082 exit(1)
1083
# Script entry point: start the PB server with the command-line args.
if __name__ == '__main__':
  # Swap in the cProfile line below to profile a run.
  #cProfile.run("main(sys.argv[1:])")
  main(sys.argv[1:])
01087
=== removed file 'AppDB/appscale_server_no_trans.py'
--- AppDB/appscale_server_no_trans.py 2010-05-11 02:07:26 +0000
+++ AppDB/appscale_server_no_trans.py 1970-01-01 00:00:00 +0000
@@ -1,1117 +0,0 @@
1#!/usr/bin/python
2#
3# Author:
4# Navraj Chohan (nchohan@cs.ucsb.edu)
5# Soo Hwan Park (suwanny@gmail.com)
6# Sydney Pang (pang@cs.ucsb.edu)
7# See LICENSE file
8
9import sys
10import socket
11import os
12import types
13import appscale_datastore
14#import helper_functions
15import SOAPpy
16from dbconstants import *
17import appscale_logger
18import md5
19import random
20import getopt
21from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
22from SocketServer import ThreadingMixIn
23import threading
24
25from google.appengine.api import api_base_pb
26from google.appengine.api import datastore
27from google.appengine.api import datastore_errors
28from google.appengine.api import datastore_types
29from google.appengine.api import users
30from google.appengine.datastore import datastore_pb
31from google.appengine.datastore import datastore_index
32from google.appengine.runtime import apiproxy_errors
33from google.net.proto import ProtocolBuffer
34from google.appengine.datastore import entity_pb
35from google.appengine.ext.remote_api import remote_api_pb
36from SocketServer import BaseServer
37from M2Crypto import SSL
38from drop_privileges import *
39
40import time
41
# --- Debug and table configuration -----------------------------------------
DEBUG = False                                # verbose console tracing
APP_TABLE = APPS_TABLE                       # aliases from dbconstants
USER_TABLE = USERS_TABLE
DEFAULT_USER_LOCATION = ".flatfile_users"    # flat-file fallback locations
DEFAULT_APP_LOCATION = ".flatfile_apps"
HYPERTABLE_XML_TAG = "Name"
DEFAULT_DATASTORE = "files"
DEFAULT_SSL_PORT = 443                       # listen port with SSL enabled
DEFAULT_PORT = 4080                          # listen port with --no_encryption
DEFAULT_ENCRYPTION = 1
CERT_LOCATION = "/etc/appscale/certs/mycert.pem"
KEY_LOCATION = "/etc/appscale/certs/mykey.pem"
SECRET_LOCATION = "/etc/appscale/secret.key"
VALID_DATASTORES = []                        # populated in main() from the factory
ERROR_CODES = []                             # populated in main() from the factory
app_datastore = []                           # datastore adapter, set in main()
logOn = False                                # --log flag
logFilePtr = ""                              # open log file handle when logOn

# --- SOAP key-server state (used by generateIDFromServer) -------------------
getKeyFromServer = False
soapServer = "localhost"
tableServer = ""                             # SOAP proxy, set in main()
keyPort = 4343
keySecret = ""                               # shared secret for the SOAP server
KEYBLOCKSIZE = "50"                          # ids fetched per block (a string; int()'d at use)
keyDictionaryLock = None
keyDictionary = {}
keyStart = 0                                 # next id to serve from the cached block
keyEnd = 0                                   # end (exclusive) of the cached block

optimizedQuery = False
ID_KEY_LENGTH = 64                           # zero-padding width for ids inside row keys
tableHashTable = {}                          # tables already announced to the soap server

# Placeholders wired up before AppScaleSecureServerThreaded is constructed.
local_server_address = ""
HandlerClass = ""
ssl_cert_file = ""
ssl_key_file = ""

DELETED = "DELETED___"                       # sentinel prefix marking deleted rows
"""
Deleted keys are DELETED/<row_key>
"""

"""
keys for tables take the format
appname/Grandparent:<ID>/Parent:<ID>/Child:<ID>
for the entity table
"""
92
class ThreadLogger:
  """Lock-protected logger wrapper whose debug output is disabled.

  The original debug() body began with a bare ``return``, leaving the
  lock/print/log statements below it permanently unreachable; that dead
  code has been removed.  The lock is kept so synchronized logging can
  be restored by re-adding a body below the docstring.
  """
  def __init__(self, log):
    self.logger_ = log                 # underlying logger instance
    self.log_lock = threading.Lock()   # would serialize writes if logging were re-enabled

  def debug(self, string):
    """Intentionally a no-op: debug logging is disabled for performance."""
    return
104
105logger = appscale_logger.getLogger("pb_server")
106
107
def getTableName(app_id, kind, version):
  """Compose the entity-table name: <app_id>___<kind>___<version>."""
  return "___".join((app_id, kind, version))
110
def getRowKey(app_id, ancestor_list):
  """Build the row key app_id/Type:<id-or-name>/... from an ancestor path.

  Numeric ids are zero-padded to ID_KEY_LENGTH so lexical order matches
  numeric order; digit-only names get a "__key__" prefix to avoid
  colliding with padded ids.  Returns "" for a null ancestor list.
  """
  if ancestor_list is None:
    logger.debug("Generate row key received null ancestor list")
    return ""

  # Note: mysql cannot have \ as the first char in the row key
  segments = [app_id]
  for element in ancestor_list:
    piece = element.type() if element.has_type() else ""
    if element.has_id():
      piece += ":" + str(element.id()).zfill(ID_KEY_LENGTH)
    elif element.has_name():
      # append __key__ if the name is a number, prevents collisions of key names
      if element.name().isdigit():
        piece += ":__key__" + element.name()
      else:
        piece += ":" + element.name()
    segments.append(piece)
  return "/".join(segments)
134
135
def getRootKey(app_id, ancestor_list):
  """Row key of the root (first) element of an ancestor path.

  Returns app_id/Type:<padded-id | name | __key__name> or None when the
  root element lacks a type, or lacks both an id and a name.
  """
  root = ancestor_list[0]
  # mysql cannot have \ as the first char in the row key
  if not root.has_type():
    return None
  prefix = "%s/%s" % (app_id, root.type())

  if root.has_id():
    return prefix + ":" + str(root.id()).zfill(ID_KEY_LENGTH)
  if root.has_name():
    name = root.name()
    # digit-only names are prefixed to avoid clashing with padded ids
    if name.isdigit():
      return prefix + ":__key__" + name
    return prefix + ":" + name
  return None
159
160
def getRootKeyFromKeyType(app_id, key):
  """Root row key for a datastore Key object (unwraps its internal reference)."""
  return getRootKey(app_id, key._Key__reference.path().element_list())
164
165
def getRowKeyFromKeyType(app_id, key):
  """Full row key for a datastore Key object (unwraps its internal reference)."""
  return getRowKey(app_id, key._Key__reference.path().element_list())
169
170
def getRootKeyFromRef(app_id, ref):
  """Root row key for an entity_pb Reference; False when it has no path."""
  if not ref.has_path():
    return False
  return getRootKey(app_id, ref.path().element_list())
177
178
# Version must be an int
def getJournalKey(key, version):
  """Append the zero-padded version to key: <key>/<version padded to ID_KEY_LENGTH>."""
  return "%s/%s" % (key, str(version).zfill(ID_KEY_LENGTH))
185
186
def getJournalTable(app_id, appscale_version):
  """Name of the journal table for one application version."""
  return "___".join((JOURNAL_TABLE, app_id, str(appscale_version)))
189
190def generateIDFromServer(app_id):
191 global keySecret
192 global keyStart
193 global keyEnd
194 global tableServer
195 key = 0
196 if keyStart != keyEnd:
197 if DEBUG: print "fast track key serving"
198 if DEBUG: print "Key Start: " + str(keyStart)
199 if DEBUG: print "Key End: " + str(keyEnd)
200 key = int(keyStart)
201 if DEBUG: print "key returned: " + str(key)
202 keyStart = int(keyStart) + 1
203 return key
204 else:
205 # Retrieve a new set of keys
206 if DEBUG: print "slow track key serving"
207 keyStart = tableServer.get_key_block(app_id, KEYBLOCKSIZE, keySecret)
208 try:
209 keyStart = int(keyStart)
210 except:
211 # value is not a number
212 return -1
213 keyEnd = keyStart + int(KEYBLOCKSIZE)
214 if DEBUG: print "Key Start: " + str(keyStart)
215 if DEBUG: print "Key End: " + str(keyEnd)
216 key = keyStart
217 keyStart = keyStart + 1
218 if DEBUG: print "key returned: " + str(key)
219 return key
220
221
def rollback_function(app_id, trans_id, root_key, change_set):
  """No-op rollback hook; this server keeps no transaction state to undo."""
  return None
224
225
class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
  """HTTPServer that services each request on its own thread (ThreadingMixIn)."""
228
229
230class AppScaleSecureHandler( BaseHTTPRequestHandler ):
231 """
232 Defines what to do when the webserver receives different types of
233 HTTP requests.
234 """
235 def get_http_err_code( self, message ):
236 print message
237 ret = 599 # 599 is the default code for an unknown error #
238 return ret
239
240 def send_post_http_response( self , http_code , message ):
241 self.send_response( http_code )
242 self.send_header( 'Content-type' , 'text/plain' )
243 self.end_headers()
244 self.wfile.write( message )
245
246 def send_http_err_response( self, err_msg ):
247 ret_http_code = self.get_http_err_code( err_msg )
248 self.send_post_http_response( ret_http_code, err_msg )
249
250 # remote api request
251 # sends back a response
252 def remote_request(self, app_id, appscale_version, http_request_data):
253 apirequest = remote_api_pb.Request(http_request_data)
254 apiresponse = remote_api_pb.Response()
255 response = None
256 errcode = 0
257 errdetail = ""
258 apperror_pb = None
259
260 if not apirequest.has_method():
261 errcode = datastore_pb.Error.BAD_REQUEST
262 errdetail = "Method was not set in request"
263 apirequest.set_method("NOT_FOUND")
264 if not apirequest.has_request():
265 errcode = datastore_pb.Error.BAD_REQUEST
266 errdetail = "Request missing in call"
267 apirequest.set_method("NOT_FOUND")
268 apirequest.clear_request()
269 method = apirequest.method()
270 request_data = apirequest.request()
271 http_request_data = request_data.contents()
272
273 print "REQUEST:",method," AT time",time.time()
274 if method == "Put":
275 response, errcode, errdetail = self.put_request(app_id,
276 appscale_version,
277 http_request_data)
278 elif method == "Get":
279 response, errcode, errdetail = self.get_request(app_id,
280 appscale_version,
281 http_request_data)
282 elif method == "Delete":
283 response, errcode, errdetail = self.delete_request(app_id,
284 appscale_version,
285 http_request_data)
286 elif method == "RunQuery":
287 response, errcode, errdetail = self.run_query(app_id,
288 appscale_version,
289 http_request_data)
290 elif method == "BeginTransaction":
291 response, errcode, errdetail = self.begin_transaction_request(app_id,
292 appscale_version,
293 http_request_data)
294 elif method == "Commit":
295 response, errcode, errdetail = self.commit_transaction_request(app_id,
296 appscale_version,
297 http_request_data)
298 elif method == "Rollback":
299 response, errcode, errdetail = self.rollback_transaction_request(app_id,
300 appscale_version,
301 http_request_data)
302 elif method == "AllocateIds":
303 response, errcode, errdetail = self.allocate_ids_request(app_id,
304 appscale_version,
305 http_request_data)
306 elif method == "CreateIndex":
307 errcode = datastore_pb.Error.PERMISSION_DENIED
308 errdetail = "Create Index is not implemented"
309 logger.debug(errdetail)
310 """
311 response, errcode, errdetail = self.create_index_request(app_id,
312 appscale_version,
313 http_request_data)
314 """
315 elif method == "GetIndices":
316 errcode = datastore_pb.Error.PERMISSION_DENIED
317 errdetail = "GetIndices is not implemented"
318 logger.debug(errdetail)
319 """
320 response, errcode, errdetail = self.get_indices_request(app_id,
321 appscale_version,
322 http_request_data)
323 """
324 elif method == "UpdateIndex":
325 errcode = datastore_pb.Error.PERMISSION_DENIED
326 errdetail = "UpdateIndex is not implemented"
327 logger.debug(errdetail)
328 """
329 response, errcode, errdetail = self.update_index_request(app_id,
330 appscale_version,
331 http_request_data)
332 """
333 elif method == "DeleteIndex":
334 errcode = datastore_pb.Error.PERMISSION_DENIED
335 errdetail = "DeleteIndex is not implemented"
336 logger.debug(errdetail)
337
338 """
339 response, errcode, errdetail = self.delete_index_request(app_id,
340 appscale_version,
341 http_request_data)
342 """
343 else:
344 errcode = datastore_pb.Error.BAD_REQUEST
345 errdetail = "Unknown datastore message"
346 logger.debug(errdetail)
347
348
349 rawmessage = apiresponse.mutable_response()
350 if response:
351 rawmessage.set_contents(response)
352
353 if errcode != 0:
354 apperror_pb = apiresponse.mutable_application_error()
355 apperror_pb.set_code(errcode)
356 apperror_pb.set_detail(errdetail)
357 if errcode != 0:
358 print "REPLY",method," AT TIME",time.time()
359 print "errcode:",errcode
360 print "errdetail:",errdetail
361 self.send_post_http_response( 200 , apiresponse.Encode() )
362
363
364 def run_query(self, app_id, appscale_version, http_request_data):
365 global app_datastore
366 query = datastore_pb.Query(http_request_data)
367 logger.debug("QUERY:%s" % query)
368 results = []
369
370 if not query.has_kind():
371 # Return nothing in case of error #
372 return (api_base_pb.VoidProto().Encode(),
373 datastore_pb.Error.PERMISSION_DENIED,
374 "Kindless queries are not implemented.")
375 else:
376 kind = query.kind()
377 #print "Query kind:",kind
378
379 # Verify validity of the entity name and applicaiton id #
380 # according to the naming sheme for entity tables #
381 #assert kind[-2:] != "__"
382 #assert app_id[-1] != "_"
383
384 # Fetch query from the datastore #
385 table_name = getTableName(app_id, kind, appscale_version)
386 #print "Query using table name:",table_name
387 r = app_datastore.get_table( table_name, ENTITY_TABLE_SCHEMA)
388 #logger.debug("result: %s" % r)
389
390 #err = r[0]
391 if len(r) > 1:
392 results = r[1:]
393 else:
394 results = []
395
396 # odds are versions
397 versions = results[1::2]
398 # evens are encoded entities
399 results = results[0::2]
400 #print "RESULTS:",results
401 #print "VERSIONS:",versions
402 if len(versions) != len(results):
403 return(api_base_pb.VoidProto().Encode(),
404 datastore_pb.Error.INTERNAL_ERROR,
405 'The query had a bad number of results.')
406
407 # convert to objects
408 # Unless its marked as deleted
409 # They are currently strings
410 for index, res in enumerate(results):
411 results[index] = entity_pb.EntityProto(res)
412 results[index] = datastore.Entity._FromPb(results[index])
413
414 logger.debug("====results pre filter====")
415 #logger.debug("%s" % results)
416 if query.has_ancestor():
417 ancestor_path = query.ancestor().path().element_list()
418 def is_descendant(entity):
419 path = entity.key()._Key__reference.path().element_list()
420 return path[:len(ancestor_path)] == ancestor_path
421 results = filter(is_descendant, results)
422
423 operators = {datastore_pb.Query_Filter.LESS_THAN: '<',
424 datastore_pb.Query_Filter.LESS_THAN_OR_EQUAL: '<=',
425 datastore_pb.Query_Filter.GREATER_THAN: '>',
426 datastore_pb.Query_Filter.GREATER_THAN_OR_EQUAL: '>=',
427 datastore_pb.Query_Filter.EQUAL: '==',
428 }
429
430 for filt in query.filter_list():
431 assert filt.op() != datastore_pb.Query_Filter.IN
432
433 prop = filt.property(0).name().decode('utf-8')
434 op = operators[filt.op()]
435
436 def passes(entity):
437 """ Returns True if the entity passes the filter, False otherwise. """
438 entity_vals = entity.get(prop, [])
439 if type(entity_vals) != types.ListType:
440 entity_vals = [entity_vals]
441
442 entity_property_list = [datastore_types.ToPropertyPb(prop, value) for value in entity_vals]
443
444 for entity_prop in entity_property_list:
445 fixed_entity_val = datastore_types.FromPropertyPb(entity_prop)
446
447 for filter_prop in filt.property_list():
448 filter_val = datastore_types.FromPropertyPb(filter_prop)
449 comp = u'%r %s %r' % (fixed_entity_val, op, filter_val)
450 logger.debug('Evaling filter expression "%s"' % comp )
451 if eval(comp):
452 return True
453 return False
454
455 results = filter(passes, results)
456
457 for order in query.order_list():
458 prop = order.property().decode('utf-8')
459 results = [entity for entity in results if prop in entity]
460
461 def order_compare(a, b):
462 """ Return a negative, zero or positive number depending on whether
463 entity a is considered smaller than, equal to, or larger than b,
464 according to the query's orderings. """
465 for o in query.order_list():
466 prop = o.property().decode('utf-8')
467
468 a_values = a[prop]
469 if not isinstance(a_values, types.ListType):
470 a_values = [a_values]
471
472 b_values = b[prop]
473 if not isinstance(b_values, types.ListType):
474 b_values = [b_values]
475
476 cmped = cmp(min(a_values), min(b_values))
477
478 if o.direction() == datastore_pb.Query_Order.DESCENDING:
479 cmped = -cmped
480
481 if cmped != 0:
482 return cmped
483
484 return 0
485
486 results.sort(order_compare)
487
488 if query.has_limit():
489 results = results[:query.limit()]
490 logger.debug("****results after filtering:****")
491 logger.debug("%s" % results)
492
493 results = [ent._ToPb() for ent in results]
494 # Pack Results into a clone of QueryResult #
495 clone_qr_pb = datastore_pb.QueryResult()
496 for res in results:
497 clone_qr_pb.add_result()
498 clone_qr_pb.result_[-1] = res
499
500 clone_qr_pb.clear_cursor()
501 clone_qr_pb.set_more_results( len(results)>0 )
502
503 logger.debug("QUERY_RESULT: %s" % clone_qr_pb)
504 return (clone_qr_pb.Encode(), 0, "")
505
506
507 def begin_transaction_request(self, app_id, appscale_version, http_request_data):
508 transaction_pb = datastore_pb.Transaction()
509 handle = random.randint(1,1000000)
510 print "Begin Trans Handle:",handle
511 transaction_pb.set_handle(handle)
512 return (transaction_pb.Encode(), 0, "")
513
514 def commit_transaction_request(self, app_id, appscale_version, http_request_data):
515 commitres_pb = datastore_pb.CommitResponse()
516 return (commitres_pb.Encode(), 0, "")
517
518
519 def rollback_transaction_request(self, app_id, appscale_version, http_request_data):
520 return (api_base_pb.VoidProto().Encode(), 0, "")
521
522
523 def allocate_ids_request(self, app_id, appscale_version, http_request_data):
524 return (api_base_pb.VoidProto().Encode(),
525 datastore_pb.Error.PERMISSION_DENIED,
526 'Allocation of block ids not implemented.')
527
528
529 # Returns Null on error
530 def getRootKeyFromEntity(self, app_id, entity):
531 key = entity.key()
532 if str(key.__class__) == "google.appengine.datastore.entity_pb.Reference":
533 return getRootKeyFromRef(app_id, key)
534 else:
535 return getRootKeyFromKeyType(app_id, key)
536
537
538 # For transactions
539 # Verifies all puts are apart of the same root entity
540 def getRootKeyFromTransPut(self, app_id, putreq_pb):
541 ent_list = []
542 if putreq_pb.entity_size() > 0:
543 ent_list = putreq_pb.entity_list()
544 first_ent = ent_list[0]
545 expected_root = self.getRootKeyFromEntity(app_id, first_ent)
546 # It is possible that all roots are None
547 # because it is a root that has not gotten a uid
548
549 for e in ent_list:
550 root = self.getRootKeyFromEntity(app_id, e)
551 if root != expected_root:
552 errcode = datastore_pb.Error.BAD_REQUEST
553 errdetail = "All puts must be a part of the same group"
554 return (None, errcode, errdetail)
555
556 return (expected_root, 0, "")
557
558
559 # For transactions
560 # Verifies all puts are apart of the same root entity
561 def getRootKeyFromTransReq(self, app_id, req_pb):
562 if req_pb.key_size() <= 0:
563 errcode = datastore_pb.error.bad_request
564 errdetail = "Bad key listing"
565 return (None, errcode, errdetail)
566
567 key_list = req_pb.key_list()
568 first_key = key_list[0]
569
570 expected_root = getRootKeyFromRef(app_id, first_key)
571 # It is possible that all roots are None
572 # because it is a root that has not gotten a uid
573
574 for k in key_list:
575 root = getRootKeyFromRef(app_id, k)
576 if root != expected_root:
577 errcode = datastore_pb.error.bad_request
578 errdetail = "all transaction gets must be a part of the same group"
579 return (None, errcode, errdetail)
580
581 return (expected_root, 0, "")
582
583 def getRootKeyFromTransGet(self, app_id, get_pb):
584 return self.getRootKeyFromTransReq(app_id, get_pb)
585
586 def getRootKeyFromTransDel(self, app_id, del_pb):
587 return self.getRootKeyFromTransReq(app_id, del_pb)
588
589 def put_request(self, app_id, appscale_version, http_request_data):
590 global app_datastore
591 global keySecret
592 global tableHashTable
593
594 field_name_list = []
595 field_value_list = []
596
597 start_time = time.time()
598 putreq_pb = datastore_pb.PutRequest(http_request_data)
599 logger.debug("RECEIVED PUT_REQUEST %s" % putreq_pb)
600 putresp_pb = datastore_pb.PutResponse( )
601 txn = None
602 root_key = None
603 # Must assign an id if a put is being done in a transaction
604 # and it does not have an id and it is a root
605 for e in putreq_pb.entity_list():
606 # Only dealing with root puts
607 if e.key().path().element_size() == 1:
608 root_path = e.key().path().mutable_element(0)
609 #print "has id:",root_path.has_id(), "has name:",root_path.has_name()
610 if root_path.id() == 0 and not root_path.has_name():
611 #new_key = root_key + "/" + last_path.type()
612 uid = generateIDFromServer(app_id)
613 #print "Assigned uid to new root key:",str(uid)
614 if uid <= 0:
615 return (putresp_pb.Encode(),
616 datastore_pb.Error.INTERNAL_ERROR,
617 'Unable to assign a unique id')
618 root_path.set_id(uid)
619
620 # Gather data from Put Request #
621 #print "Entity list for put:"
622 #print putreq_pb.entity_list()
623 for e in putreq_pb.entity_list():
624
625 for prop in e.property_list() + e.raw_property_list():
626 if prop.value().has_uservalue():
627 obuid = md5.new(prop.value().uservalue().email().lower()).digest()
628 obuid = '1' + ''.join(['%02d' % ord(x) for x in obuid])[:20]
629 prop.mutable_value().mutable_uservalue().set_obfuscated_gaiaid(
630 obuid)
631
632 #################################
633 # Key Assignment for new entities
634 #################################
635 e.mutable_key().set_app(app_id)
636
637 root_type = e.key().path().element(0).type()
638 if root_type[-2:] == "__":
639 return(putresp_pb.Encode(),
640 datastore_pb.Error.PERMISSION_DENIED,
641 "Illegal type name contains reserved delimiters \"__\"")
642
643 last_path = e.key().path().element_list()[-1]
644 uid = last_path.id()
645 kind = last_path.type()
646 # this object has no assigned id thus far
647 if last_path.id() == 0 and not last_path.has_name():
648 if e.key().path().element_size() == 1:
649 root_key = None
650 if root_key:
651 child_key = root_key + "/" + last_path.type()
652 else:
653 child_key = None
654 # if the root is None or the child is None,
655 # then the global counter is used
656 # gen unique id only wants to know if a child exist
657 uid = generateIDFromServer(app_id)
658 if uid <= 0:
659 return(putresp_pb.Encode(),
660 datastore_pb.Error.INTERNAL_ERROR,
661 "Unable to assign id to entity")
662 last_path.set_id(uid)
663 # It may be its own parent
664 group = e.mutable_entity_group()
665 root = e.key().path().element(0)
666 group.add_element().CopyFrom(root)
667 if last_path.has_name():
668 uid = last_path.name()
669 # It may be its own parent
670 group = e.mutable_entity_group()
671 if group.element_size() == 0:
672 root = e.key().path().element(0)
673 group.add_element().CopyFrom(root)
674
675 #######################################
676 # Done with key assignment
677 # Notify Soap Server of any new tables
678 #######################################
679 #print "Putting of type:",kind,"with uid of",str(uid)
680 # insert key
681 table_name = getTableName(app_id, kind, appscale_version)
682 #print "Put Using table name:",table_name
683 # Notify Users/Apps table if a new class is being added
684 if table_name not in tableHashTable:
685 # This is the first time this pbserver has seen this table
686 # Notify the User/Apps server via soap call
687 # This function is reentrant
688 # If the class was deleted, and added a second time there is no
689 # notifying the users/app server of its creation
690 if tableServer.add_class(app_id, kind, keySecret) == "true":
691 tableHashTable[table_name] = 1
692
693 # Store One Entity #
694 logger.debug("put: %s___%s___%s with id: %s" % (app_id,
695 kind,
696 appscale_version,
697 str(uid)))
698
699 row_key = getRowKey(app_id, e.key().path().element_list())
700 inter_time = time.time()
701 logger.debug("Time spent in put before datastore call: " + str(inter_time - start_time))
702
703
704 field_name_list = ENTITY_TABLE_SCHEMA
705 field_value_list = [e.Encode(), NONEXISTANT_TRANSACTION]
706 err, res = app_datastore.put_entity( table_name,
707 row_key,
708 field_name_list,
709 field_value_list)
710
711 if err not in ERROR_CODES:
712 #_trans_set.purge(txn)
713 return (putresp_pb.Encode(),
714 datastore_pb.Error.INTERNAL_ERROR,
715 err)
716
717 putresp_pb.key_list().append(e.key())
718
719 inter_time = time.time()
720 logger.debug("Time spent in put after datastore call: " + str(inter_time - start_time))
721 logger.debug( "PUT_RESPONSE:%s" % putresp_pb)
722 return (putresp_pb.Encode(), 0, "")
723
724
725 def get_request(self, app_id, appscale_version, http_request_data):
726 global app_datastore
727 getreq_pb = datastore_pb.GetRequest(http_request_data)
728 logger.debug("GET_REQUEST: %s" % getreq_pb)
729 getresp_pb = datastore_pb.GetResponse()
730
731
732 for key in getreq_pb.key_list():
733 key.set_app(app_id)
734 last_path = key.path().element_list()[-1]
735
736 if last_path.has_id():
737 entity_id = last_path.id()
738
739 if last_path.has_name():
740 entity_id = last_path.name()
741
742 if last_path.has_type():
743 kind = last_path.type()
744 logger.debug("get: %s___%s___%s %s" % (app_id, kind, appscale_version, str(entity_id)))
745 table_name = getTableName(app_id, kind, appscale_version)
746 row_key = getRowKey(app_id,key.path().element_list())
747 print "Get row key:",row_key
748 r = app_datastore.get_entity( table_name, row_key, ENTITY_TABLE_SCHEMA )
749 err = r[0]
750 if err not in ERROR_CODES or len(r) != 3:
751 r = ["",None,NONEXISTANT_TRANSACTION]
752 print err
753 entity = r[1]
754 prev_version = long(r[2])
755
756 group = getresp_pb.add_entity()
757 if entity:
758 e_pb = entity_pb.EntityProto( entity )
759 group.mutable_entity().CopyFrom(e_pb)
760
761 # Send Response #
762 print getresp_pb
763 logger.debug("GET_RESPONSE: %s" % getresp_pb)
764 return (getresp_pb.Encode(), 0, "")
765
766 """ Deletes are just PUTs using a sentinal value of DELETED
767 All deleted keys are DELETED/entity_group. This is for
768 rollback to know which entity group a possible failed
769 transaction belongs to.
770 """
771 def delete_request(self, app_id, appscale_version, http_request_data):
772 global app_datastore
773 root_key = None
774 txn = None
775 logger.debug("DeleteRequest Received...")
776 delreq_pb = datastore_pb.DeleteRequest( http_request_data )
777 logger.debug("DELETE_REQUEST: %s" % delreq_pb)
778 delresp_pb = api_base_pb.VoidProto()
779
780 for key in delreq_pb.key_list():
781 key.set_app(app_id)
782 last_path = key.path().element_list()[-1]
783 if last_path.has_type():
784 kind = last_path.type()
785
786 row_key = getRowKey(app_id, key.path().element_list())
787
788
789 table_name = getTableName(app_id, kind, appscale_version)
790 res = app_datastore.delete_row( table_name,
791 row_key)
792 err = res[0]
793 logger.debug("Response from DB for delete request %s" % err)
794 if err not in ERROR_CODES:
795 if DEBUG: print err
796 return (delresp_pb.Encode(),
797 datastore_pb.Error.INTERNAL_ERROR,
798 err + ", Unable to write to journal")
799
800 return (delresp_pb.Encode(), 0, "")
801
802
803 def optimized_delete_request(self, app_id, appscale_version, http_request_data):
804 pass
805 def run_optimized_query(self, app_id, appscale_version, http_request_data):
806 return
807 def optimized_put_request(self, app_id, appscale_version, http_request_data):
808 pass
809
810 def void_proto(self, app_id, appscale_version, http_request_data):
811 resp_pb = api_base_pb.VoidProto()
812 print "Got void"
813 logger.debug("VOID_RESPONSE: %s to void" % resp_pb)
814 return (resp_pb.Encode(), 0, "" )
815
816 def str_proto(self, app_id, appscale_version, http_request_data):
817 str_pb = api_base_pb.StringProto( http_request_data )
818 composite_pb = datastore_pb.CompositeIndices()
819 print "Got a string proto"
820 print str_pb
821 logger.debug("String proto received: %s"%str_pb)
822 logger.debug("CompositeIndex response to string: %s" % composite_pb)
823 return (composite_pb.Encode(), 0, "" )
824
825 def int64_proto(self, app_id, appscale_version, http_request_data):
826 int64_pb = api_base_pb.Integer64Proto( http_request_data )
827 resp_pb = api_base_pb.VoidProto()
828 print "Got a int 64"
829 print int64_pb
830 logger.debug("Int64 proto received: %s"%int64_pb)
831 logger.debug("VOID_RESPONSE to int64: %s" % resp_pb)
832 return (resp_pb.Encode(), 0, "")
833
834 def compositeindex_proto(self, app_id, appscale_version, http_request_data):
835 compindex_pb = entity_pb.CompositeIndex( http_request_data)
836 resp_pb = api_base_pb.VoidProto()
837 print "Got Composite Index"
838 print compindex_pb
839 logger.debug("CompositeIndex proto recieved: %s"%str(compindex_pb))
840 logger.debug("VOID_RESPONSE to composite index: %s" % resp_pb)
841 return (resp_pb.Encode(), 0, "")
842
843# Returns 0 on success, 1 on failure
844 def create_index_tables(self, app_id):
845 global app_datastore
846 """table_name = "__" + app_id + "__" + "kind"
847 columns = ["reference"]
848 print "Building table: " + table_name
849 returned = app_datastore.create_table( table_name, columns )
850 err,res = returned
851 if err not in ERROR_CODES:
852 logger.debug("%s" % err)
853 return 1
854 """
855 table_name = "__" + app_id + "__" + "single_prop_asc"
856 print "Building table: " + table_name
857 columns = ["reference"]
858 returned = app_datastore.create_table( table_name, columns )
859 err,res = returned
860 if err not in ERROR_CODES:
861 logger.debug("%s" % err)
862 return 1
863
864 table_name = "__" + app_id + "__" + "single_prop_desc"
865 print "Building table: " + table_name
866 returned = app_datastore.create_table( table_name, columns )
867 err,res = returned
868 if err not in ERROR_CODES:
869 logger.debug("%s" % err)
870 return 1
871
872 table_name = "__" + app_id + "__" + "composite"
873 print "Building table: " + table_name
874 returned = app_datastore.create_table( table_name, columns )
875 err,res = returned
876 if err not in ERROR_CODES:
877 logger.debug("%s" % err)
878 return 1
879
880 return 0
881
882 ##############
883 # OTHER TYPE #
884 ##############
885 def unknown_request(self, app_id, appscale_version, http_request_data, pb_type):
886 logger.debug("Received Unknown Protocol Buffer %s" % pb_type )
887 print "ERROR: Received Unknown Protocol Buffer <" + pb_type +">.",
888 print "Nothing has been implemented to handle this Protocol Buffer type."
889 print "http request data:"
890 print http_request_data
891 print "http done"
892 self.void_proto(app_id, appscale_version, http_request_data)
893
894 ########################
895 # GET Request Handling #
896 ########################
897 def do_GET( self ):
898 self.send_error( 404 , 'File Not Found: %s' % self.path )
899
900 #########################
901 # POST Request Handling #
902 #########################
903 def do_POST( self ):
904 start_time = time.time()
905 http_request_data = self.rfile.read(int(self.headers.getheader('content-length')))
906 inter_time = time.time()
907 logger.debug("Timing for pre pre env setup:" + " " +
908 str(start_time) + " " + str(inter_time) +
909 " total time: " + str(inter_time - start_time) + "\n")
910 print "intertime - starttime:",(str(inter_time - start_time))
911 pb_type = self.headers.getheader( 'protocolbuffertype' )
912 app_data = self.headers.getheader('appdata')
913 app_data = app_data.split(':')
914 logger.debug("POST len: %d" % len(app_data))
915 inter_time = time.time()
916 logger.debug("Timing for pre env setup:" + " " +
917 str(start_time) + " " + str(inter_time) +
918 " total time: " + str(inter_time - start_time) + "\n")
919
920 if len(app_data) == 5:
921 app_id, user_email, nick_name, auth_domain, appscale_version = app_data
922 os.environ['AUTH_DOMAIN'] = auth_domain
923 os.environ['USER_EMAIL'] = user_email
924 os.environ['USER_NICKNAME'] = nick_name
925 os.environ['APPLICATION_ID'] = app_id
926 elif len(app_data) == 4:
927 app_id, user_email, nick_name, auth_domain = app_data
928 os.environ['AUTH_DOMAIN'] = auth_domain
929 os.environ['USER_EMAIL'] = user_email
930 os.environ['USER_NICKNAME'] = nick_name
931 os.environ['APPLICATION_ID'] = app_id
932 appscale_version = "1"
933 elif len(app_data) == 2:
934 app_id, appscale_version = app_data
935 app_id = app_data[0]
936 os.environ['APPLICATION_ID'] = app_id
937 elif len(app_data) == 1:
938 app_id = app_data[0]
939 os.environ['APPLICATION_ID'] = app_id
940 appscale_version = "1"
941 else:
942 logger.debug("UNABLE TO EXTRACT APPLICATION DATA")
943 return
944
945 # Default HTTP Response Data #
946 logger.debug("For app id: " + app_id)
947 logger.debug("For app version: " + appscale_version)
948 inter_time = time.time()
949 logger.debug("Timing for env setup:" + pb_type + " " +
950 app_id + " " + str(start_time) + " " +
951 str(inter_time) + " total time: " + str(inter_time - start_time) + "\n")
952
953 if pb_type == "Request":
954 self.remote_request(app_id, appscale_version, http_request_data)
955 else:
956 self.unknown_request(app_id, appscale_version, http_request_data, pb_type)
957
958 stop_time = time.time()
959
960 #if logOn == True:
961 #logFilePtr.write(pb_type + " " + app_id + " " +
962 #str(start_time) + " " + str(stop_time) + " total time: " +
963 #str(stop_time - start_time) + "\n")
964
965 logger.debug(pb_type + " " + app_id + " " + str(start_time) + " " +
966 str(stop_time) + " total time: " + str(stop_time - start_time) + "\n")
967
class AppScaleUnSecureServerThreaded( ThreadingMixIn, HTTPServer):
  """Plain-HTTP threaded server used when encryption is disabled."""
  pass
970
class AppScaleSecureServerThreaded( ThreadingMixIn, HTTPServer ):
  """Threaded HTTPS server whose listening socket is wrapped in SSL.

  Address, handler class and certificate/key paths are taken from
  module globals that main() fills in before construction.
  """
  def __init__( self):
    global local_server_address
    global HandlerClass
    global ssl_cert_file
    global ssl_key_file
    # NOTE: BaseServer.__init__ (not HTTPServer.__init__) is called on
    # purpose so no plain socket is bound first; the socket is replaced
    # with an SSL connection, then bind/activate are done manually.
    BaseServer.__init__( self, local_server_address, HandlerClass )
    ctx = SSL.Context()
    ctx.load_cert( ssl_cert_file, ssl_key_file )
    self.socket = SSL.Connection( ctx )
    self.server_bind()
    self.server_activate()
983
def usage():
  """Print command-line help for the AppScale datastore server.

  Fixes relative to the old text: '--a=' renamed to the real long option
  '--soap=', 'secrete' typo corrected, and the '--log'/'--port' options
  accepted by main() are now listed.  Keep this list in sync with the
  getopt configuration in main().
  """
  print("AppScale Server")
  print("")
  print("Options:")
  print("\t--certificate=<path-to-ssl-certificate>")
  print("\t--soap=<soap server hostname>")
  print("\t--key for using keys from the soap server")
  print("\t--type=<hypertable, hbase, cassandra, mysql, mongodb>")
  print("\t--secret=<secret to soap server>")
  print("\t--blocksize=<key-block-size>")
  print("\t--log=<log file path>")
  print("\t--port=<server port>")
  print("\t--optimized_query")
  print("\t--no_encryption")
def main(argv):
  """Parse options, build the datastore proxy, and run the PB server loop.

  Serves over SSL on DEFAULT_SSL_PORT by default; -n/--no_encryption
  switches to a plain HTTP server.  Mutates module globals
  (app_datastore, tableServer, local_server_address, ...) that the
  request handler classes read.
  """
  global app_datastore
  global getKeyFromServer
  global tableServer
  global keySecret
  global logOn
  global logFilePtr
  global optimizedQuery
  global soapServer
  global ERROR_CODES
  global VALID_DATASTORES
  global KEYBLOCKSIZE
  cert_file = CERT_LOCATION
  key_file = KEY_LOCATION
  db_type = "hypertable"
  port = DEFAULT_SSL_PORT
  isEncrypted = True
  try:
    # NOTE(review): in the long-option list "port" is declared without a
    # trailing '=', so "--port=<n>" will not parse; only "-p <n>" works.
    # Confirm before relying on the long form.
    opts, args = getopt.getopt( argv, "c:t:l:s:b:a:k:p:o:n:z",
                  ["certificate=",
                   "type=",
                   "log=",
                   "secret=",
                   "blocksize=",
                   "soap=",
                   "key",
                   "port",
                   "optimized_query",
                   "no_encryption",
                   "zoo_keeper"] )
  except getopt.GetoptError:
    usage()
    sys.exit(1)
  for opt, arg in opts:
    if opt in ("-c", "--certificate"):
      cert_file = arg
      print "Using cert..."
    elif opt in ("-k", "--key" ):
      getKeyFromServer = True
      print "Using key server..."
    elif opt in ("-t", "--type"):
      db_type = arg
      print "Datastore type: ",db_type
    elif opt in ("-s", "--secret"):
      keySecret = arg
      print "Secret set..."
    elif opt in ("-l", "--log"):
      # Timing log; entries are written by the request handler.
      logOn = True
      logFile = arg
      logFilePtr = open(logFile, "w")
      logFilePtr.write("# type, app, start, end\n")
    elif opt in ("-b", "--blocksize"):
      KEYBLOCKSIZE = arg
      print "Block size: ",KEYBLOCKSIZE
    elif opt in ("-a", "--soap"):
      soapServer = arg
    elif opt in ("-o", "--optimized_query"):
      optimizedQuery = True
    elif opt in ("-p", "--port"):
      port = int(arg)
    elif opt in ("-n", "--no_encryption"):
      isEncrypted = False
    elif opt in ("-z", "--zoo_keeper"):
      # This build has no ZooKeeper support; refuse to start.
      print "This version does not use zoo keeper"
      exit(1)

  # Resolve the datastore implementation and its error/validity tables.
  app_datastore = appscale_datastore.DatastoreFactory.getDatastore(db_type)
  ERROR_CODES = appscale_datastore.DatastoreFactory.error_codes()
  VALID_DATASTORES = appscale_datastore.DatastoreFactory.valid_datastores()
  if DEBUG: print "ERROR_CODES:"
  if DEBUG: print ERROR_CODES
  if DEBUG: print "VALID_DATASTORE:"
  if DEBUG: print VALID_DATASTORES
  if db_type in VALID_DATASTORES:
    logger.debug("Using datastore %s" % db_type)
  else:
    print "Unknown datastore "+ db_type
    exit(1)

  # SOAP proxy to the key server (used when --key is given).
  tableServer = SOAPpy.SOAPProxy("https://" + soapServer + ":" + str(keyPort))

  # # Bind Port #
  #server = AppScaleSecureServer( ('',DEFAULT_SSL_PORT),
  # AppScaleSecureHandler, cert_file, key_file )
  #help(ThreadedHTTPServer)
  # The threaded server classes read their configuration from these globals.
  global local_server_address
  global HandlerClass
  global ssl_cert_file
  global ssl_key_file
  global keyDictionaryLock

  keyDictionaryLock = threading.Lock()
  # No explicit port and encryption disabled: fall back to the plaintext port.
  if port == DEFAULT_SSL_PORT and not isEncrypted:
    port = DEFAULT_PORT
  local_server_address = ('',port)
  HandlerClass = AppScaleSecureHandler
  ssl_cert_file = cert_file
  ssl_key_file = key_file
  if isEncrypted:
    server = AppScaleSecureServerThreaded()
  else:
    server = AppScaleUnSecureServerThreaded(local_server_address, HandlerClass)
  sa = server.socket.getsockname()
  if not db_type == "timesten":
    # Stop running as root, security purposes #
    drop_privileges()
  logger.debug("\n\nStarting AppScale-Secure-Server on %s:%s" % (sa[0], sa[1]))

  # Serve forever; SSL handshake errors are logged and the loop continues,
  # Ctrl-C closes the socket and exits.
  while 1:
    try:
      # Start Server #
      server.serve_forever()
    except SSL.SSLError:
      # NOTE(review): "Unexcepted" is a typo for "Unexpected" in this log
      # message; left byte-identical here since it is a runtime string.
      logger.debug("\n\nUnexcepted input for AppScale-Secure-Server on %s:%s" % (sa[0], sa[1]))
    except KeyboardInterrupt:
      server.socket.close()
      print "Server interrupted by user, terminating..."
      exit(1)
1114
if __name__ == '__main__':
  # Entry point: forward command-line arguments (script name stripped).
  # The commented call below enables profiling when needed.
  #cProfile.run("main(sys.argv[1:])")
  main(sys.argv[1:])
11180
=== removed file 'AppDB/as_transaction.py'
--- AppDB/as_transaction.py 2010-05-10 21:08:10 +0000
+++ AppDB/as_transaction.py 1970-01-01 00:00:00 +0000
@@ -1,241 +0,0 @@
1import threading
2import time
3import sys
4from google.net.proto import ProtocolBuffer
5from google.appengine.datastore import datastore_pb
6from google.appengine.runtime import apiproxy_errors
# Seconds between garbage-collection sweeps of expired transactions
# (see ASTransSet.__gcRunner).
GC_PERIOD = 100
# Default transaction lease length in seconds, added to time.time()
# when ASTransaction is created with lease == 0.
TRANS_TIMEOUT = 60
class ASTransaction:
  """In-memory record of a single datastore transaction.

  Tracks the transaction handle, its entity group, the absolute lease
  expiration time (epoch seconds), and the set of keys modified so far.
  """

  def __init__(self, handle, entity_group = None, lease = 0):
    """Create a transaction record.

    handle: opaque transaction identifier.
    entity_group: group this transaction locks, or None if not yet known.
    lease: absolute expiry time; 0 means "now + TRANS_TIMEOUT".
    """
    self.handle = handle
    self.group = entity_group
    self.lease = lease
    if lease == 0:
      self.lease = time.time() + TRANS_TIMEOUT
    # Lazily created in addModifiedKey; None means "nothing modified yet".
    self.change_set = None

  def addModifiedKey(self, key):
    """Record that this transaction modified the given key."""
    if self.change_set is None:
      self.change_set = set()
    self.change_set.add(key)

  def getModifiedKeysCopy(self):
    """Return a copy of the modified-key set, or None if nothing was modified.

    A copy is returned so callers can keep it after the transaction is purged.
    """
    if self.change_set is None:
      return None
    return self.change_set.copy()

  def __str__(self):
    return ("handle: " + str(self.handle) + "\n" +
            "group: " + str(self.group) + "\n" +
            "lease:" + str(self.lease) + "\n" +
            "change set:" + str(self.change_set))

  # Bug fix: the original defined __def__, a typo for __del__ that was never
  # invoked by the interpreter; its only statement (del self.change_set)
  # duplicates what garbage collection already does, so the method is removed.
43
# Thread-safe transaction set
class ASTransSet:
  """Thread-safe registry of active transactions, keyed by handle.

  A background thread periodically drops transactions whose lease has
  expired.  Public methods take a transaction protocol buffer (an object
  exposing handle()/has_handle()) rather than a raw handle.

  Fixes over the original: (1) the GC thread is now a daemon, so it no
  longer keeps the process alive forever after the caller finishes;
  (2) the lock is held via 'with', so an exception inside a method (e.g.
  KeyError for an unknown handle) can no longer leave _txes_lock
  permanently acquired.
  """

  def __init__(self):
    self._txes = {}
    self._txes_lock = threading.Lock()
    self.startGC()

  def startGC(self):
    """Start the background lease garbage collector."""
    self.gcthread = threading.Thread(target = self.__gcRunner)
    # Daemon thread: must not block interpreter shutdown.
    self.gcthread.daemon = True
    self.gcthread.start()

  def __gcRunner(self):
    # Every GC_PERIOD seconds, remove transactions whose lease expired.
    while 1:
      time.sleep(GC_PERIOD)
      cur_time = time.time()
      with self._txes_lock:
        expired = [handle for handle in self._txes
                   if self._txes[handle].lease <= cur_time]
        for handle in expired:
          del self._txes[handle]

  def isValid(self, txn):
    """Return True if txn is registered here and its lease has not expired."""
    if not txn.has_handle():
      return False
    handle = txn.handle()
    curtime = time.time()
    with self._txes_lock:
      entry = self._txes.get(handle)
      if entry is None or entry.lease <= curtime:
        return False
    return True

  def needsLock(self, txn):
    """Return True if txn has no entity group assigned yet."""
    with self._txes_lock:
      return self._txes[txn.handle()].group is None

  def hasLockExpired(self, txn):
    """Return True (and log the times) if txn's lease has expired."""
    cur_time = time.time()
    with self._txes_lock:
      entry = self._txes[txn.handle()]
      if entry.lease <= cur_time:
        print("Cur " + str(cur_time) + " lease " + str(entry.lease))
        return True
      return False

  def purge(self, txn):
    """Remove txn from the set if present; no-op for a falsy txn."""
    if not txn:
      return
    handle = txn.handle()
    with self._txes_lock:
      if handle in self._txes:
        del self._txes[handle]

  def getGroup(self, txn):
    """Return the entity group for txn, or None if txn is unknown."""
    handle = txn.handle()
    with self._txes_lock:
      if handle in self._txes:
        return self._txes[handle].group
    return None

  def add(self, txn):
    """Register txn; return False if the handle is already present."""
    handle = txn.handle()
    tx = ASTransaction(handle)
    with self._txes_lock:
      if handle in self._txes:
        return False
      self._txes[handle] = tx
    return True

  def setLease(self, txn, lease):
    """Set the absolute lease expiry time for txn."""
    with self._txes_lock:
      self._txes[txn.handle()].lease = lease

  def addChangeSet(self, txn, key):
    """Record a key modified by txn."""
    with self._txes_lock:
      self._txes[txn.handle()].addModifiedKey(key)

  def getChangeSet(self, txn):
    """Return a copy of txn's modified-key set (None if nothing modified)."""
    with self._txes_lock:
      return self._txes[txn.handle()].getModifiedKeysCopy()

  def setGroup(self, txn, group):
    """Assign the entity group for txn."""
    with self._txes_lock:
      self._txes[txn.handle()].group = group

  def printSet(self):
    # Debug dump; intentionally lock-free, as in the original.
    print("=============================")
    for handle in self._txes:
      print(self._txes[handle])
    print("=============================")
197
######################
# Unit testing
######################
def main(argv):
  """Ad-hoc smoke test for ASTransSet/ASTransaction.

  Prints the expected result of each call in the message text; there are
  no assertions, so correctness is checked by eye.
  """
  tran_pb = datastore_pb.Transaction()
  tran_pb.set_handle(500)
  tran_set = ASTransSet()
  tran_set.add(tran_pb)
  print "Needs the lock without the group set:" + str(tran_set.needsLock(tran_pb))
  tran_set.setGroup(tran_pb, "my_group")
  print "Does not Need lock with the group set:" + str(tran_set.needsLock(tran_pb))
  # Very short lease so the expiry check flips from False to True after the
  # one-second sleep below.
  tran_set.setLease(tran_pb, time.time() + .02)
  tran_set.addChangeSet(tran_pb, "first key")
  tran_set.addChangeSet(tran_pb, "second key")
  tran_set.printSet()
  print "Has the lock expired (no)?:" + str(tran_set.hasLockExpired(tran_pb))
  time.sleep(1)
  print "Has the lock expired (yes)?:" + str(tran_set.hasLockExpired(tran_pb))
  print "******************"

  # Second transaction with a fresh handle.
  tran_pb = datastore_pb.Transaction()
  tran_pb.set_handle(600)
  tran_set.add(tran_pb)
  print "Needs the lock without the group set:" + str(tran_set.needsLock(tran_pb))
  tran_set.setGroup(tran_pb, "my_group")
  print "Does not Need lock with the group set:" + str(tran_set.needsLock(tran_pb))
  print "Group is: " + str(tran_set.getGroup(tran_pb))
  tran_set.printSet()
  print "******************"
  print "Removing 600"
  print "******************"

  tran_set.purge(tran_pb)
  tran_set.printSet()
  tran_pb.set_handle(500)
  print "500 valid (yes) ?: " + str(tran_set.isValid(tran_pb))
  change_set = tran_set.getChangeSet(tran_pb)
  print "Change set: " + str(change_set)
  tran_set.purge(tran_pb)
  # getChangeSet returns a copy, so it must survive the purge unchanged.
  print "Change set after purge (same): " + str(change_set)


if __name__ == '__main__':
  main(sys.argv[1:])
2420
=== modified file 'AppDB/cassandra/py_cassandra.py'
--- AppDB/cassandra/py_cassandra.py 2010-05-21 07:13:04 +0000
+++ AppDB/cassandra/py_cassandra.py 2010-06-29 01:49:23 +0000
@@ -1,11 +1,9 @@
1#1#
2# Cassandra Interface for AppScale2# Cassandra Interface for AppScale
3#3# Rewritten by Navraj Chohan for using range queries
4# Modified by Chris Bunch for upgrade to Cassandra 0.50.04# Modified by Chris Bunch for upgrade to Cassandra 0.50.0
5# on 2/17/105# on 2/17/10
66# Original author: suwanny@gmail.com
7__author__="Soo Hwan Park (suwanny@gmail.com)"
8__date__="$2009.6.4 19:44:00$"
97
10import os,sys8import os,sys
11import time9import time
@@ -14,11 +12,9 @@
14from thrift_cass.ttypes import *12from thrift_cass.ttypes import *
1513
16import string14import string
17import sys, os
18import base64 # base64 2009.04.1615import base64 # base64 2009.04.16
19from dbconstants import *16from dbconstants import *
20from dbinterface import *17from dbinterface import *
21from dhash_datastore import *
22import sqlalchemy.pool as pool18import sqlalchemy.pool as pool
23import appscale_logger19import appscale_logger
2420
@@ -26,9 +22,12 @@
26from thrift.transport import TSocket22from thrift.transport import TSocket
27from thrift.transport import TTransport23from thrift.transport import TTransport
28from thrift.protocol import TBinaryProtocol24from thrift.protocol import TBinaryProtocol
29
30ERROR_DEFAULT = "DB_ERROR:" # ERROR_CASSANDRA25ERROR_DEFAULT = "DB_ERROR:" # ERROR_CASSANDRA
3126# Store all schema information in a special table
27# If a table does not show up in this table, try a range query
28# to discover its schema
29SCHEMA_TABLE = "__key__"
30SCHEMA_TABLE_SCHEMA = ['schema']
32# use 1 Table and 1 ColumnFamily in Cassandra31# use 1 Table and 1 ColumnFamily in Cassandra
33MAIN_TABLE = "Keyspace1"32MAIN_TABLE = "Keyspace1"
34COLUMN_FAMILY = "Standard1"33COLUMN_FAMILY = "Standard1"
@@ -44,71 +43,219 @@
44CONSISTENCY_QUORUM = 243CONSISTENCY_QUORUM = 2
45CONSISTENCY_ALL = 5 # don't use this for reads (next version may fix this)44CONSISTENCY_ALL = 5 # don't use this for reads (next version may fix this)
4645
47class DatastoreProxy(DHashDatastore):46MAX_ROW_COUNT = 10000000
47table_cache = {}
48class DatastoreProxy(AppDBInterface):
48 def __init__(self, logger = appscale_logger.getLogger("datastore-cassandra")):49 def __init__(self, logger = appscale_logger.getLogger("datastore-cassandra")):
49 DHashDatastore.__init__(self, logger)
50 # TODO: is this correct?50 # TODO: is this correct?
51 f = open(APPSCALE_HOME + '/.appscale/my_public_ip', 'r')51 f = open(APPSCALE_HOME + '/.appscale/my_public_ip', 'r')
52 self.host = f.read()52 self.host = f.read()
53# self.host = host53# self.host = host
54 self.port = DEFAULT_PORT54 self.port = DEFAULT_PORT
55 self.logger.debug("Cassandra connection init to %s:%d" % (self.host, self.port))
56 self.pool = pool.QueuePool(self.__create_connection, reset_on_return=False)55 self.pool = pool.QueuePool(self.__create_connection, reset_on_return=False)
56 self.logger = logger
5757
58 def logTiming(self, function, start_time, end_time):58 def logTiming(self, function, start_time, end_time):
59 if PROFILING:59 if PROFILING:
60 self.logger.debug(function + ": " + str(end_time - start_time) + " s")60 self.logger.debug(function + ": " + str(end_time - start_time) + " s")
6161
62 ######################################################################62 def get_entity(self, table_name, row_key, column_names):
63 # Cassandra specific methods 63 error = [ERROR_DEFAULT]
64 ######################################################################64 list = error
6565 client = None
66 def get(self, key, column = DEFAULT_VALUE):66 row_key = table_name + '/' + row_key
67 value = None67 try:
68 client = None68 slice_predicate = SlicePredicate(column_names=column_names)
69 try: 69 path = ColumnPath(COLUMN_FAMILY)
70 path = ColumnPath(COLUMN_FAMILY, column=column)70 client = self.__setup_connection()
71 client = self.__setup_connection()71 # Result is a column type which has name, value, timestamp
72 response = client.get(MAIN_TABLE, key, path, CONSISTENCY_QUORUM) 72 result = client.get_slice(MAIN_TABLE, row_key, path, slice_predicate,
73# transport.close()73 CONSISTENCY_QUORUM)
7474 for column in column_names:
75 value = response.column.value75 for r in result:
76 except NotFoundException: # occurs normally if the item isn't in the db76 c = r.column
77 return value77 if column == c.name:
78 except Exception, ex:78 list.append(c.value)
79 self.logger.debug("Exception %s" % ex)79 except NotFoundException: # occurs normally if the item isn't in the db
80 self.__close_connection(client)80 list[0] += "Not found"
81 return value81 self.__close_connection(client)
8282 return list
83 def put(self, key, value, column = DEFAULT_VALUE):83 except Exception, ex:
84 client = None84 #self.logger.debug("Exception %s" % ex)
85 try:85 list[0]+=("Exception: %s"%ex)
86 path = ColumnPath(COLUMN_FAMILY, column=column)86 self.__close_connection(client)
87 client = self.__setup_connection()87 return list
88 client.insert(MAIN_TABLE, key, path, value, self.timestamp(), CONSISTENCY_QUORUM)88 self.__close_connection(client)
89# transport.close()89 if len(list) == 1:
90 except Exception, e :90 list[0] += "Not found"
91 self.logger.debug("put key:%s, column: %s value:%s" % (key, column, value))91 return list
92 self.__close_connection(client)92
9393
94 def remove(self, key, column = ""):94 def put_entity(self, table_name, row_key, column_names, cell_values):
95 client = None95 error = [ERROR_DEFAULT]
96 try: 96 list = error
97 client = self.__setup_connection()97 client = None
9898
99 # FIXME: cgb: you'd think that you could just remove the if / else99 # The first time a table is seen
100 # here, but for some reason it messes up delete if you do100 if table_name not in table_cache:
101 # look into why that is101 self.create_table(table_name, column_names)
102 if column:102
103 path = ColumnPath(COLUMN_FAMILY, column=column)103 row_key = table_name + '/' + row_key
104 client.remove(MAIN_TABLE, key, path, self.timestamp(), CONSISTENCY_QUORUM)104 try:
105 else:105 client = self.__setup_connection()
106 path = ColumnPath(COLUMN_FAMILY)106 curtime = self.timestamp()
107 client.remove(MAIN_TABLE, key, path, self.timestamp(), CONSISTENCY_QUORUM)107 # Result is a column type which has name, value, timestamp
108108 mutations = []
109 except Exception, ex: 109 for index, ii in enumerate(column_names):
110 self.logger.debug("Exception %s" % ex)110 column = Column(name = ii, value=cell_values[index],
111 self.__close_connection(client)111 timestamp=curtime)
112 c_or_sc = ColumnOrSuperColumn(column=column)
113 mutation = Mutation(column_or_supercolumn=c_or_sc)
114 mutations.append(mutation)
115 mutation_map = {row_key : { COLUMN_FAMILY : mutations } }
116 client.batch_mutate(MAIN_TABLE, mutation_map,
117 CONSISTENCY_QUORUM)
118 except Exception, ex:
119 self.logger.debug("Exception %s" % ex)
120 list[0]+=("Exception: %s"%ex)
121 self.__close_connection(client)
122 list.append("0")
123 return list
124 self.__close_connection(client)
125 list.append("0")
126 return list
127
128 def put_entity_dict(self, table_name, row_key, value_dict):
129 raise NotImplementedError("put_entity_dict is not implemented in %s." % self.__class__)
130
131
132 def get_table(self, table_name, column_names):
133 error = [ERROR_DEFAULT]
134 client = None
135 result = error
136 keyslices = []
137 column_parent = ColumnParent(column_family="Standard1")
138 predicate = SlicePredicate(column_names=column_names)
139 start_key = table_name
140 end_key = table_name + '~'
141 try:
142 client = self.__setup_connection()
143 keyslices = client.get_range_slice(MAIN_TABLE,
144 column_parent,
145 predicate,
146 start_key,
147 end_key,
148 MAX_ROW_COUNT,
149 CONSISTENCY_QUORUM)
150 except Exception, ex:
151 self.logger.debug("Exception %s" % ex)
152 result[0] += "Exception: " + str(ex)
153 self.__close_connection(client)
154 return result
155 for keyslice in keyslices:
156 ordering_dict = {}
157 for c in keyslice.columns:
158 column = c.column
159 value = column.value
160 ordering_dict[column.name] = value
161 if len(ordering_dict) == 0:
162 continue
163 for column in column_names:
164 try:
165 result.append(ordering_dict[column])
166 except:
167 result[0] += "Key error, get_table did not return the correct schema"
168 self.__close_connection(client)
169 return result
170
171 def delete_row(self, table_name, row_key):
172 error = [ERROR_DEFAULT]
173 ret = error
174 client = None
175 row_key = table_name + '/' + row_key
176 path = ColumnPath(COLUMN_FAMILY)
177 try:
178 client = self.__setup_connection()
179 curtime = self.timestamp()
180 # Result is a column type which has name, value, timestamp
181 client.remove(MAIN_TABLE, row_key, path, curtime,
182 CONSISTENCY_QUORUM)
183 except Exception, ex:
184 self.logger.debug("Exception %s" % ex)
185 ret[0]+=("Exception: %s"%ex)
186 self.__close_connection(client)
187 return ret
188 self.__close_connection(client)
189 ret.append("0")
190 return ret
191
192 def get_schema(self, table_name):
193 error = [ERROR_DEFAULT]
194 result = error
195 ret = self.get_entity(SCHEMA_TABLE,
196 table_name,
197 SCHEMA_TABLE_SCHEMA)
198 if len(ret) > 1:
199 schema = ret[1]
200 else:
201 error[0] = ret[0] + "--unable to get schema"
202 return error
203 schema = schema.split(':')
204 result = result + schema
205 return result
206
207
208 def delete_table(self, table_name):
209 error = [ERROR_DEFAULT]
210 result = error
211 keyslices = []
212 column_parent = ColumnParent(column_family="Standard1")
213 predicate = SlicePredicate(column_names=[])
214 curtime = self.timestamp()
215 path = ColumnPath(COLUMN_FAMILY)
216 start_key = table_name
217 end_key = table_name + '~'
218 try:
219 client = self.__setup_connection()
220 keyslices = client.get_range_slice(MAIN_TABLE,
221 column_parent,
222 predicate,
223 start_key,
224 end_key,
225 MAX_ROW_COUNT,
226 CONSISTENCY_QUORUM)
227 except Exception, ex:
228 self.logger.debug("Exception %s" % ex)
229 result[0]+=("Exception: %s"%ex)
230 self.__close_connection(client)
231 return result
232 keys_removed = False
233 for keyslice in keyslices:
234 row_key = keyslice.key
235 client.remove(MAIN_TABLE,
236 row_key,
237 path,
238 curtime,
239 CONSISTENCY_QUORUM)
240 keys_removed = True
241 if table_name not in table_cache and keys_removed:
242 result[0] += "Table does not exist"
243 return result
244 if table_name in table_cache:
245 del table_cache[table_name]
246
247 self.__close_connection(client)
248 return result
249
250 # Only stores the schema
251 def create_table(self, table_name, column_names):
252 table_cache[table_name] = 1
253 columns = ':'.join(column_names)
254 row_key = table_name
255 # Get and make sure we are not overwriting previous schemas
256 ret = self.get_entity(SCHEMA_TABLE, row_key, SCHEMA_TABLE_SCHEMA)
257 if ret[0] != ERROR_DEFAULT:
258 self.put_entity(SCHEMA_TABLE, row_key, SCHEMA_TABLE_SCHEMA, [columns])
112259
113 ######################################################################260 ######################################################################
114 # private methods 261 # private methods
115262
=== modified file 'AppDB/cassandra/templates/storage-conf.xml'
--- AppDB/cassandra/templates/storage-conf.xml 2010-04-21 11:16:18 +0000
+++ AppDB/cassandra/templates/storage-conf.xml 2010-06-29 01:49:23 +0000
@@ -178,7 +178,7 @@
178 ~ directories, since the partitioner can modify the sstable on-disk178 ~ directories, since the partitioner can modify the sstable on-disk
179 ~ format.179 ~ format.
180 -->180 -->
181 <Partitioner>org.apache.cassandra.dht.RandomPartitioner</Partitioner>181 <Partitioner>org.apache.cassandra.dht.OrderPreservingPartitioner</Partitioner>
182182
183 <!--183 <!--
184 ~ If you are using an order-preserving partitioner and you know your key184 ~ If you are using an order-preserving partitioner and you know your key
185185
=== modified file 'AppDB/datastore_tester.py'
--- AppDB/datastore_tester.py 2009-12-02 20:15:14 +0000
+++ AppDB/datastore_tester.py 2010-06-29 01:49:23 +0000
@@ -41,10 +41,10 @@
41#print "columns= " + str(columns)41#print "columns= " + str(columns)
42#print "data= " + str(data)42#print "data= " + str(data)
43print "table= " + table_name43print "table= " + table_name
44app_datastore = appscale_datastore.Datastore(datastore_type)44app_datastore = appscale_datastore.DatastoreFactory.getDatastore(datastore_type)
4545ERROR_CODES = appscale_datastore.DatastoreFactory.error_codes()
46ERROR_CODES = app_datastore.error_codes()46VALID_DATASTORES = appscale_datastore.DatastoreFactory.valid_datastores()
47if datastore_type not in app_datastore.valid_datastores():47if datastore_type not in VALID_DATASTORES:
48 print "Bad selection for datastore. Valid selections are:"48 print "Bad selection for datastore. Valid selections are:"
49 print app_datastore.valid_datastores()49 print app_datastore.valid_datastores()
50 exit(1)50 exit(1)
@@ -127,10 +127,12 @@
127#################################################127#################################################
128# Get and a delete on a table that does not exist128# Get and a delete on a table that does not exist
129#################################################129#################################################
130# There is too much overhead in checking to see if the table exists
131# for cassandra
130invalid_table = hf.randomString(10)132invalid_table = hf.randomString(10)
131ret = app_datastore.delete_row(invalid_table, key)133#ret = app_datastore.delete_row(invalid_table, key)
132if ret[0] in ERROR_CODES:134#if ret[0] in ERROR_CODES:
133 err(hf.lineno(), ret)135# err(hf.lineno(), ret)
134ret = app_datastore.get_entity(invalid_table, key, columns)136ret = app_datastore.get_entity(invalid_table, key, columns)
135if ret[0] in ERROR_CODES:137if ret[0] in ERROR_CODES:
136 err(hf.lineno(), ret)138 err(hf.lineno(), ret)
@@ -186,9 +188,19 @@
186# Get schema on a table that exist, and one that doesnt188# Get schema on a table that exist, and one that doesnt
187#######################################################189#######################################################
188ret = app_datastore.get_schema(table_name)190ret = app_datastore.get_schema(table_name)
189if ret[0] not in ERROR_CODES or ret[1:] != columns:191if ret[0] not in ERROR_CODES or (ret[1:]).sort() != columns.sort():
192 print "ret[1:]:",ret[1:].sort
193 print "columns:",columns.sort
190 err(hf.lineno(), ret)194 err(hf.lineno(), ret)
191ret = app_datastore.get_schema(invalid_table)195ret = app_datastore.get_schema(invalid_table)
192if ret[0] in ERROR_CODES:196if ret[0] in ERROR_CODES:
193 err(hf.lineno(), ret)197 err(hf.lineno(), ret)
198################################################
199# Get data from a table that does not exist
200# Should return an empty list
201################################################
202ret = app_datastore.get_table(invalid_table, columns)
203if ret[0] not in ERROR_CODES and len(ret) != 1:
204 err(hf.lineno(), ret)
205
194print "SUCCESS"206print "SUCCESS"
195207
=== modified file 'AppDB/dbinterface.py'
--- AppDB/dbinterface.py 2010-06-25 00:08:24 +0000
+++ AppDB/dbinterface.py 2010-06-29 01:49:23 +0000
@@ -7,19 +7,19 @@
7import os7import os
88
9class AppDBInterface:9class AppDBInterface:
10 def get_entity(self, table_name, row_key, column_names): 10 def get_entity(self, table_name, row_key, column_names, txnid = 0):
11 raise NotImplementedError("get_entity is not implemented in %s." % self.__class__)11 raise NotImplementedError("get_entity is not implemented in %s." % self.__class__)
1212
13 def put_entity(self, table_name, row_key, column_names, cell_values):13 def put_entity(self, table_name, row_key, column_names, cell_values, txnid = 0):
14 raise NotImplementedError("put_entity is not implemented in %s." % self.__class__)14 raise NotImplementedError("put_entity is not implemented in %s." % self.__class__)
1515
16 def put_entity_dict(self, table_name, row_key, value_dict):16 def put_entity_dict(self, table_name, row_key, value_dict):
17 raise NotImplementedError("put_entity_dict is not implemented in %s." % self.__class__)17 raise NotImplementedError("put_entity_dict is not implemented in %s." % self.__class__)
1818
19 def get_table(self, table_name, column_names):19 def get_table(self, table_name, column_names, txnid = 0):
20 raise NotImplementedError("get_table is not implemented in %s." % self.__class__)20 raise NotImplementedError("get_table is not implemented in %s." % self.__class__)
2121
22 def delete_row(self, table_name, row_id):22 def delete_row(self, table_name, row_id, txnid = 0):
23 raise NotImplementedError("delete_row is not implemented in %s." % self.__class__)23 raise NotImplementedError("delete_row is not implemented in %s." % self.__class__)
2424
25 def get_schema(self, table_name):25 def get_schema(self, table_name):
@@ -28,6 +28,14 @@
28 def delete_table(self, table_name):28 def delete_table(self, table_name):
29 raise NotImplementedError("delete_table is not implemented in %s." % self.__class__)29 raise NotImplementedError("delete_table is not implemented in %s." % self.__class__)
3030
31 def commit(self, txnid):
32 raise NotImplementedError("commit is not implemented in %s." % self.__class__)
33 def rollback(self, txnid):
34 raise NotImplementedError("rollback is not implemented in %s." % self.__class__)
35 def setupTransaction(self, txnid):
36 raise NotImplementedError("rollback is not implemented in %s." % self.__class__)
37
38
31 def get_local_ip(self):39 def get_local_ip(self):
32 try:40 try:
33 local_ip = self.__local_ip41 local_ip = self.__local_ip
3442
=== modified file 'AppDB/dhash_datastore.py'
--- AppDB/dhash_datastore.py 2010-05-05 23:27:52 +0000
+++ AppDB/dhash_datastore.py 2010-06-29 01:49:23 +0000
@@ -217,9 +217,9 @@
217 if self.remove_key(row_id, table_name):217 if self.remove_key(row_id, table_name):
218 self.remove(internal_key)218 self.remove(internal_key)
219 return elist219 return elist
220 else:220 #else:
221 elist[0] += "row doesn't exist"221 elist.append("0")
222 return elist222 return elist
223223
224 def get_schema(self, table_name):224 def get_schema(self, table_name):
225 self.logger.debug("get_schema: %s" % table_name) 225 self.logger.debug("get_schema: %s" % table_name)
226226
=== modified file 'AppDB/hbase/py_hbase.py'
--- AppDB/hbase/py_hbase.py 2010-05-21 07:13:04 +0000
+++ AppDB/hbase/py_hbase.py 2010-06-29 01:49:23 +0000
@@ -1,11 +1,9 @@
1#Author: Navraj Chohan1#Author: Navraj Chohan
22
3import os,sys3import os
44
5import Hbase5import Hbase
6import ttypes6import ttypes
7import string
8import threading
9import time7import time
10from thrift import Thrift8from thrift import Thrift
11from thrift.transport import TSocket9from thrift.transport import TSocket
@@ -85,15 +83,12 @@
85 column_names = client.getColumnDescriptors(table_name)83 column_names = client.getColumnDescriptors(table_name)
86 keys = column_names.keys()84 keys = column_names.keys()
8785
88 table = get_table(table_name, keys)86 table = self.get_table(table_name, keys)
89 value = (len(table) - 1)/(len(keys))87 value = (len(table) - 1)/(len(keys))
90 elist.append(value)88 elist.append(value)
91 except ttypes.IOError, io:89 except ttypes.IOError, io:
92 elist[0] = elist[0] + "Row Count. IO Error--" + io.message90 elist[0] = elist[0] + "Row Count. IO Error--" + io.message
93 value = 091 value = 0
94 except ttypes.IOException, io:
95 elist[0] = elist[0] + "Row Count. IO Exception--" + io.message
96 value = 0
97 self.__closeConnection(client)92 self.__closeConnection(client)
98 et = time.time()93 et = time.time()
99 self.logTiming("HB ROWCOUNT", st, et)94 self.logTiming("HB ROWCOUNT", st, et)
@@ -272,7 +267,10 @@
272 client.scannerClose(scanner) 267 client.scannerClose(scanner)
273 except ttypes.IOError, io:268 except ttypes.IOError, io:
274 if io.message:269 if io.message:
275 elist[0] += "IO Error--" + str(io.message)270 if io.message == table_name:
271 pass # Return an empty table
272 else:
273 elist[0] += "IO Error--" + str(io.message)
276 else:274 else:
277 elist[0] += "IOError"275 elist[0] += "IOError"
278 except ttypes.IllegalArgument, e:276 except ttypes.IllegalArgument, e:
279277
=== modified file 'AppDB/helper_functions.py'
--- AppDB/helper_functions.py 2009-12-01 07:13:55 +0000
+++ AppDB/helper_functions.py 2010-06-29 01:49:23 +0000
@@ -57,7 +57,7 @@
5757
58def randomString(length):58def randomString(length):
59 s = hashlib.sha256()59 s = hashlib.sha256()
60 ret = ""60 ret = "a"
61 while len(ret) < length:61 while len(ret) < length:
62 s.update(str(random.random()))62 s.update(str(random.random()))
63 ret += s.hexdigest()63 ret += s.hexdigest()
6464
=== modified file 'AppDB/hypertable/py_hypertable.py'
--- AppDB/hypertable/py_hypertable.py 2010-05-21 07:13:04 +0000
+++ AppDB/hypertable/py_hypertable.py 2010-06-29 01:49:23 +0000
@@ -142,6 +142,8 @@
142 if PROFILING:142 if PROFILING:
143 self.logger.debug("HT GET: %s"%str(endtime - starttime))143 self.logger.debug("HT GET: %s"%str(endtime - starttime))
144 self.__closeConnection(client)144 self.__closeConnection(client)
145 if len(elist) == 1:
146 elist[0] += "Not Found"
145 return elist147 return elist
146148
147 def delete_table(self, table_name):149 def delete_table(self, table_name):
@@ -173,7 +175,10 @@
173 if not self.__table_exist(table_name):175 if not self.__table_exist(table_name):
174 query = "create table " + table_name + "( " + ', '.join(column_names) + ")"176 query = "create table " + table_name + "( " + ', '.join(column_names) + ")"
175 print "create table query=%s" % query177 print "create table query=%s" % query
176 ret = client.hql_query(query)178 try:
179 ret = client.hql_query(query)
180 except Exception, e:
181 print e.message
177# the xml schema is not working correctly.182# the xml schema is not working correctly.
178# schema = "<Schema><AccessGroup name=\"default\"><ColumnFamily>"183# schema = "<Schema><AccessGroup name=\"default\"><ColumnFamily>"
179# for column in column_names:184# for column in column_names:
@@ -269,7 +274,8 @@
269 if res[ii].column_family in column_names:274 if res[ii].column_family in column_names:
270 elist += [res[ii].value]275 elist += [res[ii].value]
271 except:276 except:
272 elist[0] += "Not Found"277 pass
278 #elist[0] += "Not Found"
273 endtime = time.time()279 endtime = time.time()
274 if PROFILING:280 if PROFILING:
275 self.logger.debug("HT GET_TABLE: %s"%str(endtime - starttime))281 self.logger.debug("HT GET_TABLE: %s"%str(endtime - starttime))
276282
=== modified file 'AppDB/mongodb/py_mongodb.py'
--- AppDB/mongodb/py_mongodb.py 2010-04-08 23:58:23 +0000
+++ AppDB/mongodb/py_mongodb.py 2010-06-29 01:49:23 +0000
@@ -74,6 +74,8 @@
7474
75 elist.append("0")75 elist.append("0")
76 self.__close_connection(db)76 self.__close_connection(db)
77 if len(elist) == 1:
78 elist[0] += "Not found"
77 return elist79 return elist
7880
79 def create_table(self, table_name, columns):81 def create_table(self, table_name, columns):
@@ -166,6 +168,8 @@
166 if column == schema[ii]:168 if column == schema[ii]:
167 elist.append(data[ii])169 elist.append(data[ii])
168 self.__close_connection(db)170 self.__close_connection(db)
171 if len(elist) == 1:
172 elist[0] += "Not found"
169 return elist173 return elist
170174
171 def get_schema(self, table_name):175 def get_schema(self, table_name):
@@ -222,8 +226,8 @@
222 usedRows.append(myRow)226 usedRows.append(myRow)
223227
224228
225 if(len(elist) == 1):229 #if len(elist) == 1:
226 elist[0] += "table not found"230 # elist[0] += "table not found"
227231
228 self.__close_connection(db)232 self.__close_connection(db)
229 return elist233 return elist
230234
=== modified file 'AppDB/mysql/drop_all_tables.py'
--- AppDB/mysql/drop_all_tables.py 2009-07-20 08:49:28 +0000
+++ AppDB/mysql/drop_all_tables.py 2010-06-29 01:49:23 +0000
@@ -21,7 +21,7 @@
21SLAVES_FILE = APPSCALE_HOME + "/.appscale/slaves"21SLAVES_FILE = APPSCALE_HOME + "/.appscale/slaves"
22#DB_LOCATION = socket.gethostbyname(socket.gethostname())22#DB_LOCATION = socket.gethostbyname(socket.gethostname())
23DB_LOCATION = "127.0.0.1"23DB_LOCATION = "127.0.0.1"
24DATABASE = "test"24DATABASE = "appscale"
25def set_db_location():25def set_db_location():
26 global DB_LOCATION26 global DB_LOCATION
27 file = open(SLAVES_FILE, "r")27 file = open(SLAVES_FILE, "r")
@@ -34,6 +34,9 @@
34 cursor = client.cursor()34 cursor = client.cursor()
35 cursor.execute("DROP DATABASE " + DATABASE)35 cursor.execute("DROP DATABASE " + DATABASE)
36 cursor.execute("CREATE DATABASE " + DATABASE) 36 cursor.execute("CREATE DATABASE " + DATABASE)
37 cursor.close()
38 client.commit()
39 client.close()
37 except MySQLdb.Error, e:40 except MySQLdb.Error, e:
38 print e.args[1]41 print e.args[1]
39 return 0 42 return 0
4043
=== modified file 'AppDB/mysql/prime_mysql.py'
--- AppDB/mysql/prime_mysql.py 2010-05-27 21:59:59 +0000
+++ AppDB/mysql/prime_mysql.py 2010-06-29 01:49:23 +0000
@@ -3,18 +3,17 @@
33
4import sys, time4import sys, time
5import os 5import os
6import MySQLdb
7import _mysql
8import socket
9from dbconstants import *
10
611
7APPSCALE_HOME = os.environ.get("APPSCALE_HOME")12APPSCALE_HOME = os.environ.get("APPSCALE_HOME")
8if APPSCALE_HOME:13if APPSCALE_HOME:
9 pass14 pass
10else:15else:
11 print "APPSCALE_HOME env var not set"16 print "APPSCALE_HOME env var not set"
12 APPSCALE_HOME = "/root/appscale/"
13
14import MySQLdb
15import _mysql
16import socket
17from dbconstants import *
1817
19ROW_KEY = "mysql__row_key__"18ROW_KEY = "mysql__row_key__"
20SLAVES_FILE = APPSCALE_HOME + "/.appscale/slaves"19SLAVES_FILE = APPSCALE_HOME + "/.appscale/slaves"
@@ -27,13 +26,23 @@
27 DB_LOCATION = file.readline()26 DB_LOCATION = file.readline()
28 file.close()27 file.close()
2928
29def drop_database():
30 client = MySQLdb.connect(host=DB_LOCATION, db="mysql")
31 cursor = client.cursor()
32 cursor.execute("DROP DATABASE IF EXISTS appscale;")
33 cursor.close()
34 client.commit()
35 client.close()
36
30def create_database():37def create_database():
31 client = MySQLdb.connect(host=DB_LOCATION, db="mysql")38 client = MySQLdb.connect(host=DB_LOCATION, db="mysql")
32 cursor = client.cursor()39 cursor = client.cursor()
33 cursor.execute("drop database if exists appscale;")40 cursor.execute("CREATE DATABASE IF NOT EXISTS appscale;")
34 cursor.execute("create database appscale;")41 cursor.close()
42 client.commit()
35 client.close()43 client.close()
3644
45
37def create_table(tablename, columns):46def create_table(tablename, columns):
38 try:47 try:
39 client = MySQLdb.connect(host=DB_LOCATION, db="appscale")48 client = MySQLdb.connect(host=DB_LOCATION, db="appscale")
@@ -43,13 +52,16 @@
43 columnscopy += ["x" + columns[ii]]52 columnscopy += ["x" + columns[ii]]
44 command = "CREATE TABLE IF NOT EXISTS " + tablename + "( " + ROW_KEY + " CHAR(80) primary key, " + ' MEDIUMBLOB, '.join(columnscopy) + " MEDIUMBLOB) ENGINE=NDBCLUSTER"53 command = "CREATE TABLE IF NOT EXISTS " + tablename + "( " + ROW_KEY + " CHAR(80) primary key, " + ' MEDIUMBLOB, '.join(columnscopy) + " MEDIUMBLOB) ENGINE=NDBCLUSTER"
45 print command54 print command
46 cursor.execute(command)55 print cursor.execute(command)
47 except MySQLdb.Error, e:56 except MySQLdb.Error, e:
48 print e.args[1]57 print e.args[1]
49 return 058 return 0
59 cursor.close()
60 client.commit()
50 client.close()61 client.close()
51 return 162 return 1
5263
64drop_database()
53create_database()65create_database()
5466
55def prime_mysql():67def prime_mysql():
5668
=== modified file 'AppDB/mysql/py_mysql.py'
--- AppDB/mysql/py_mysql.py 2010-03-15 22:22:12 +0000
+++ AppDB/mysql/py_mysql.py 2010-06-29 01:49:23 +0000
@@ -1,16 +1,17 @@
1# Author: Navraj Chohan1# Author: Navraj Chohan
2import sys2#import sys
3import os3import os
4import string
5import MySQLdb4import MySQLdb
6import _mysql5#import _mysql
7import base646#import sqlalchemy.pool as pool
8import sqlalchemy.pool as pool
9from dbinterface import *7from dbinterface import *
10import appscale_logger8import appscale_logger
119import threading
12MySQLdb = pool.manage(MySQLdb)10import time
1311#MySQLdb = pool.manage(MySQLdb)
12TIMEOUT = 30
13# Time till next gc of connections
14GC_TIME = 120
14ROW_KEY = "mysql__row_key__"15ROW_KEY = "mysql__row_key__"
15ERROR_MY = "DB_ERROR:"16ERROR_MY = "DB_ERROR:"
16#DB_LOCATION = "appscale-image"17#DB_LOCATION = "appscale-image"
@@ -18,22 +19,47 @@
18DB_LOCATION = "127.0.0.1"19DB_LOCATION = "127.0.0.1"
19#DB_PORT = 330620#DB_PORT = 3306
20DEBUG = False21DEBUG = False
2122transDict = {}
23transDict_lock = threading.Lock()
24last_gc_time = 0
22class DatastoreProxy(AppDBInterface):25class DatastoreProxy(AppDBInterface):
2326
24 def __init__(self, log = appscale_logger.getLogger("datastore-mysql")):27 def __init__(self, log = appscale_logger.getLogger("datastore-mysql")):
25 self.logger = log28 self.logger = log
26 self.client = None29 self.client = None
30 self.transactionsOn = False
31
32
33 def commit(self, txnid):
34 elist = [ERROR_MY]
35 try:
36 cursor, client = self.__get_connection(txnid)
37 cursor.close()
38 client.commit()
39 self.__close_connection(txnid)
40 except MySQLdb.Error, e:
41 if DEBUG: self.logger.info(str(e.args[0]) + "--" + e.args[1])
42 elist[0] = ERROR_MY + str(e.args[0]) + "--" + e.args[1]
43 return elist
44
45 def rollback(self, txnid):
46 elist = [ERROR_MY]
47 try:
48 cursor, client = self.__get_connection(txnid)
49 cursor.close()
50 client.rollback()
51 self.__close_connection(txnid)
52 except MySQLdb.Error, e:
53 elist[0] = ERROR_MY + str(e.args[0]) + "--" + e.args[1]
54 return elist
2755
28 def get_schema(self, table_name):56 def get_schema(self, table_name):
29 table_name = "x" + table_name57 table_name = "x" + table_name
30 elist = [ERROR_MY]58 elist = [ERROR_MY]
31 client = None59 client = None
32 try: 60 try:
33 client = self.__get_connection()61 client = MySQLdb.connect(host=DB_LOCATION, db=USE_DATABASE)
34 client.autocommit(1)
35 cursor = client.cursor()62 cursor = client.cursor()
36# cursor.execute(USE_DATABASE)
37 command = "SHOW fields FROM " + table_name63 command = "SHOW fields FROM " + table_name
38 cursor.execute(command)64 cursor.execute(command)
39 while (1):65 while (1):
@@ -50,6 +76,7 @@
50 if DEBUG: print str(e.args[0]) + "--" + e.args[1] 76 if DEBUG: print str(e.args[0]) + "--" + e.args[1]
51 elist[0] = ERROR_MY + "Unable to get schema"77 elist[0] = ERROR_MY + "Unable to get schema"
52 if client:78 if client:
79 client.commit()
53 client.close()80 client.close()
54 return elist81 return elist
5582
@@ -58,10 +85,8 @@
58 elist = [ERROR_MY]85 elist = [ERROR_MY]
59 client = None86 client = None
60 try:87 try:
61 client = self.__get_connection()88 client = MySQLdb.connect(host=DB_LOCATION, db=USE_DATABASE)
62 client.autocommit(1)
63 cursor = client.cursor()89 cursor = client.cursor()
64# cursor.execute(USE_DATABASE)
65 command = "drop table " + table_name90 command = "drop table " + table_name
66 if DEBUG: self.logger.info(command)91 if DEBUG: self.logger.info(command)
67 cursor.execute(command)92 cursor.execute(command)
@@ -69,23 +94,29 @@
69 elist[0] += str(e.args[0]) + "--" + e.args[1]94 elist[0] += str(e.args[0]) + "--" + e.args[1]
70 if DEBUG: self.logger.info(elist)95 if DEBUG: self.logger.info(elist)
71 if client:96 if client:
97 client.commit()
72 client.close()98 client.close()
73 return elist99 return elist
74100
75 def get_entity(self, table_name, row_key, column_names):101 def get_entity(self, table_name, row_key, column_names, txnid = 0):
76 table_name = "x" + table_name102 table_name = "x" + table_name
77 elist = [ERROR_MY]103 elist = [ERROR_MY]
78 client = None104 client = None
105 isTrans = False
106 if txnid != 0 and self.transactionsOn:
107 isTrans = True
79 if not row_key:108 if not row_key:
80 self.logger.info("Null row key")109 self.logger.info("Null row key")
81 elist[0] += "Null row key"110 elist[0] += "Null row key"
82 return elist111 return elist
83112
84 try:113 try:
85 client = self.__get_connection()114 if not isTrans:
86 client.autocommit(1)115 client = MySQLdb.connect(host=DB_LOCATION, db=USE_DATABASE)
87 cursor = client.cursor()116 cursor = client.cursor()
88# cursor.execute(USE_DATABASE)117 else:
118 cursor, client = self.__get_connection(txnid)
119
89 command = "select "120 command = "select "
90 # Hacking on a x to make sure all columns start with a letter121 # Hacking on a x to make sure all columns start with a letter
91 columncopy = []122 columncopy = []
@@ -99,32 +130,35 @@
99 cursor.execute(command)130 cursor.execute(command)
100 result = cursor.fetchone()131 result = cursor.fetchone()
101 if result == None:132 if result == None:
102 client.close()133 if not isTrans:
134 client.close()
103 if len(elist) == 1:135 if len(elist) == 1:
104 elist[0] += "Not found"136 elist[0] += "Not found"
105 return elist137 return elist
106 for ii in range(0, len(result)):138 for ii in range(0, len(result)):
107 #elist += [result[ii]]
108 if result[ii]:139 if result[ii]:
109 #elist += [base64.b64decode(result[ii])]
110 elist.append(result[ii])140 elist.append(result[ii])
111 else:141 else:
112 elist.append('')142 elist.append('')
113 except MySQLdb.Error, e:143 except MySQLdb.Error, e:
114 if e.args[1].find("exists") == -1:144 if e.args[1].find("exists") == -1:
115 client.close()145 if not isTrans:
146 client.close()
116 if len(elist) == 1:147 if len(elist) == 1:
117 elist[0] += "Not found"148 elist[0] += "Not found"
118 return elist149 return elist
119 elist[0] = ERROR_MY + str(e.args[0]) + "--" + e.args[1] 150 elist[0] = ERROR_MY + str(e.args[0]) + "--" + e.args[1]
120 if DEBUG: self.logger.info(elist)151 if DEBUG: self.logger.info(elist)
121 if client:152
153 if client and not isTrans:
122 client.close()154 client.close()
123 if len(elist) == 1:155 if len(elist) == 1:
124 elist[0] += "Not found"156 elist[0] += "Not found"
125 return elist157 return elist
126158
127 def put_entity(self, table_name, row_key, column_names, cell_values):159 def put_entity(self, table_name, row_key, column_names, cell_values, txnid = 0):
160 # Hacking on a x to make sure all columns start with a letter
161 # Mysql limitation
128 table_name = "x" + table_name162 table_name = "x" + table_name
129 if DEBUG: self.logger.info("PUT ENTITY")163 if DEBUG: self.logger.info("PUT ENTITY")
130 if DEBUG: self.logger.info(str(cell_values))164 if DEBUG: self.logger.info(str(cell_values))
@@ -135,28 +169,43 @@
135 self.logger.info("Null row key")169 self.logger.info("Null row key")
136 elist[0] += "Null row key"170 elist[0] += "Null row key"
137 return elist171 return elist
138 # Hacking on a x to make sure all columns start with a letter
139 columncopy = []172 columncopy = []
140 for ii in range(0, len(column_names)):173 for ii in range(0, len(column_names)):
141 columncopy.append("x" + column_names[ii])174 columncopy.append("x" + column_names[ii])
142 try:175
143 client = self.__get_connection()176 # TODO This should be reactive, if a put fails due to no table, create it
144 client.autocommit(1)177 try:
145 cursor = client.cursor()178 tempclient = MySQLdb.connect(host=DB_LOCATION, db=USE_DATABASE)
146# cursor.execute(USE_DATABASE)179 query = "CREATE TABLE IF NOT EXISTS " + table_name + " ( " + ROW_KEY + " CHAR(255) primary key, " + ' BLOB, '.join(columncopy) + " BLOB) ENGINE=NDBCLUSTER"
180 tempcursor = tempclient.cursor()
181 if DEBUG: self.logger.info(query)
182 result = tempcursor.execute(query)
183 if DEBUG: self.logger.info("DONE CREATING TABLE...%s",str(result))
184 except MySQLdb.Error, e:
185 error = ERROR_MY + str(e.args[0]) + "--" + e.args[1]
186 print error
187 tempcursor.close()
188 tempclient.commit()
189 tempclient.close()
190
191 isTrans = False
192 if txnid != 0 and self.transactionsOn:
193 isTrans = True
194 try:
195 if not isTrans:
196 client = MySQLdb.connect(host=DB_LOCATION, db=USE_DATABASE)
197 cursor = client.cursor()
198 else:
199 cursor, client = self.__get_connection(txnid)
147200
148 if len(column_names) != len(cell_values):201 if len(column_names) != len(cell_values):
149 elist[0] += "Error in put call |column_names| != |cell_values|"202 elist[0] += "Error in put call |column_names| != |cell_values|"
150 client.close()203 if not isTrans:
204 client.close()
151 return elist 205 return elist
152 query = "CREATE TABLE IF NOT EXISTS " + table_name + " ( " + ROW_KEY + " CHAR(255) primary key, " + ' BLOB, '.join(columncopy) + " BLOB) ENGINE=NDBCLUSTER"
153 if DEBUG: self.logger.info(query)
154 result = cursor.execute(query)
155 if DEBUG: self.logger.info("DONE CREATING TABLE...%s",str(result))
156 values = []206 values = []
157 for ii in range(0, len(cell_values)):207 for ii in range(0, len(cell_values)):
158 if cell_values[ii]:208 if cell_values[ii]:
159 #value = base64.b64encode(cell_values[ii])
160 value = cell_values[ii]209 value = cell_values[ii]
161 values.append(MySQLdb.escape_string(value))210 values.append(MySQLdb.escape_string(value))
162 else:211 else:
@@ -178,8 +227,6 @@
178 command += "%s)"227 command += "%s)"
179 if DEBUG: self.logger.info(command)228 if DEBUG: self.logger.info(command)
180 cursor.execute(command, tuple(cell_values + [MySQLdb.escape_string(row_key)])) 229 cursor.execute(command, tuple(cell_values + [MySQLdb.escape_string(row_key)]))
181 cursor.close()
182 cursor = ""
183 else:230 else:
184 # do an update 231 # do an update
185 for ii in range(0, len(cell_values)):232 for ii in range(0, len(cell_values)):
@@ -196,13 +243,15 @@
196 elist.append("0")243 elist.append("0")
197 if DEBUG: self.logger.info(elist)244 if DEBUG: self.logger.info(elist)
198 if DEBUG: self.logger.info("DONE WITH PUT ENTITY")245 if DEBUG: self.logger.info("DONE WITH PUT ENTITY")
199 if client:246 if client and not isTrans:
247 cursor.close()
248 client.commit()
200 client.close()249 client.close()
201 return elist 250 return elist
202251
203 def __table_exist(self, table_name):252 def __table_exist(self, table_name):
204 table_name = "x" + table_name253 table_name = "x" + table_name
205 client = self.__get_connection()254 client = MySQLdb.connect(host=DB_LOCATION, db=USE_DATABASE)
206 cursor = client.cursor()255 cursor = client.cursor()
207# cursor.execute(USE_DATABASE)256# cursor.execute(USE_DATABASE)
208 cursor.execute('show tables')257 cursor.execute('show tables')
@@ -211,18 +260,30 @@
211 if row == None:260 if row == None:
212 break;261 break;
213 if table_name == row[0]:262 if table_name == row[0]:
263 client.close()
214 return 1264 return 1
265
266 cursor.close()
267 client.commit()
268 client.close()
215 return 0269 return 0
216270
217 def delete_row(self, table_name, row_key):271 def delete_row(self, table_name, row_key, txnid = 0):
218 table_name = "x" + table_name272 table_name = "x" + table_name
219 if DEBUG: self.logger.info("DELETE ROW")273 if DEBUG: self.logger.info("DELETE ROW")
220 client = None274 client = None
275
276 isTrans = False
277 if txnid != 0 and self.transactionsOn:
278 isTrans = True
221 elist = [ERROR_MY]279 elist = [ERROR_MY]
222 try:280 try:
223 client = self.__get_connection()281 if txnid == 0 or not self.transactionsOn:
224 client.autocommit(1)282 client = MySQLdb.connect(host=DB_LOCATION, db=USE_DATABASE)
225 cursor = client.cursor()283 #client.autocommit(0)
284 cursor = client.cursor()
285 else:
286 cursor, client = self.__get_connection(txnid)
226# cursor.execute(USE_DATABASE)287# cursor.execute(USE_DATABASE)
227 row_key = MySQLdb.escape_string(row_key)288 row_key = MySQLdb.escape_string(row_key)
228 query = "delete from " + table_name + " WHERE " + ROW_KEY + "= '" + row_key + "'" 289 query = "delete from " + table_name + " WHERE " + ROW_KEY + "= '" + row_key + "'"
@@ -232,7 +293,8 @@
232 elist[0] = ERROR_MY + str(e.args[0]) + "--" + e.args[1] 293 elist[0] = ERROR_MY + str(e.args[0]) + "--" + e.args[1]
233 if DEBUG: self.logger.info(elist)294 if DEBUG: self.logger.info(elist)
234 if DEBUG: self.logger.info("DELETING ROW")295 if DEBUG: self.logger.info("DELETING ROW")
235 if client:296 if client and not isTrans:
297 client.commit()
236 client.close()298 client.close()
237 return elist299 return elist
238300
@@ -242,8 +304,8 @@
242 client = None304 client = None
243 elist = [ERROR_MY]305 elist = [ERROR_MY]
244 try:306 try:
245 client = self.__get_connection()307 client = MySQLdb.connect(host=DB_LOCATION, db=USE_DATABASE)
246 client.autocommit(1)308 #client.autocommit(0)
247 cursor = client.cursor()309 cursor = client.cursor()
248# cursor.execute(USE_DATABASE)310# cursor.execute(USE_DATABASE)
249 query = "SELECT COUNT(*) FROM " + table_name 311 query = "SELECT COUNT(*) FROM " + table_name
@@ -255,18 +317,22 @@
255 if DEBUG: self.logger.info(elist)317 if DEBUG: self.logger.info(elist)
256 if DEBUG: self.logger.info("DONE WITH ROW COUNT")318 if DEBUG: self.logger.info("DONE WITH ROW COUNT")
257 if client:319 if client:
320 cursor.close()
258 client.close()321 client.close()
259 return elist322 return elist
260323
261 def get_table(self, table_name, column_names):324 def get_table(self, table_name, column_names, txnid = 0):
262 table_name = "x" + table_name325 table_name = "x" + table_name
263 if DEBUG: self.logger.info("GET TABLE")326 if DEBUG: self.logger.info("GET TABLE")
264 client = None327 client = None
265 elist = [ERROR_MY]328 elist = [ERROR_MY]
266 try:329 try:
267 client = self.__get_connection()330 if txnid == 0 or not self.transactionsOn:
268 client.autocommit(1)331 client = MySQLdb.connect(host=DB_LOCATION, db=USE_DATABASE)
269 cursor = client.cursor()332 #client.autocommit(0)
333 cursor = client.cursor()
334 else:
335 cursor, client = self.__get_connection(txnid)
270# cursor.execute(USE_DATABASE)336# cursor.execute(USE_DATABASE)
271 # Hacking on a letter to make sure all columns start with a letter337 # Hacking on a letter to make sure all columns start with a letter
272 columncopy = []338 columncopy = []
@@ -294,15 +360,15 @@
294 if DEBUG: self.logger.info(str(elist))360 if DEBUG: self.logger.info(str(elist))
295 if DEBUG: self.logger.info("DONE GETTING TABLE")361 if DEBUG: self.logger.info("DONE GETTING TABLE")
296 if client:362 if client:
297 client.close()363 if not self.transactionsOn or txnid == 0:
364 cursor.close()
365 client.close()
298 return elist366 return elist
299367
300 def __query_table(self, table_name):368 def __query_table(self, table_name):
301 table_name = "x" + table_name369 table_name = "x" + table_name
302 client = self.__get_connection()370 client = self.__get_connection()
303 client.autocommit(1)
304 cursor = client.cursor()371 cursor = client.cursor()
305# cursor.execute(USE_DATABASE)
306 cursor.execute("select * from " + table_name )372 cursor.execute("select * from " + table_name )
307 elist = []373 elist = []
308 while (1):374 while (1):
@@ -310,9 +376,63 @@
310 if row == None:376 if row == None:
311 break377 break
312 elist.append(row)378 elist.append(row)
379 if cursor:
380 cursor.close()
313 if client:381 if client:
382 client.commit()
314 client.close()383 client.close()
315 return elist384 return elist
316385
317 def __get_connection(self):386 def __get_connection(self, txnid):
318 return MySQLdb.connect(host=DB_LOCATION, db=USE_DATABASE)387 client = None
388 cursor = None
389 self.__gc()
390
391 transDict_lock.acquire()
392 if txnid in transDict:
393 cursor, client, start_time = transDict[txnid]
394 transDict_lock.release()
395 if not client:
396 raise MySQLdb.Error(1, "Connection timed out")
397 return cursor, client
398
399 # clean up expired connections
400 def __gc(self):
401 global last_gc_time
402 curtime = time.time()
403 if curtime < last_gc_time + GC_TIME:
404 return
405 transDict_lock.acquire()
406 del_list = []
407 for ii in transDict:
408 cu, cl, st = transDict[ii]
409 if st + TIMEOUT < curtime:
410 del_list.append(ii)
411 # safe deletes
412 del_list.reverse()
413 for ii in del_list:
414 del transDict[ii]
415 transDict_lock.release()
416 last_gc_time = time.time()
417
418 def setupTransaction(self, txnid):
419 self.transactionsOn = True
420 # New connection
421 client = MySQLdb.connect(host=DB_LOCATION, db=USE_DATABASE)
422 #client.autocommit(0)
423 cursor = client.cursor()
424 transDict_lock.acquire()
425 transDict[txnid] = cursor, client, time.time()
426 transDict_lock.release()
427
428 def __close_connection(self, txnid):
429 transDict_lock.acquire()
430 if txnid in transDict:
431 cursor, client, start_time = transDict[txnid]
432 cursor.close()
433 client.close()
434 del transDict[txnid]
435 transDict_lock.release()
436 return
437
438
319439
=== added file 'AppDB/mysql/test_mysql_trans.py'
--- AppDB/mysql/test_mysql_trans.py 1970-01-01 00:00:00 +0000
+++ AppDB/mysql/test_mysql_trans.py 2010-06-29 01:49:23 +0000
@@ -0,0 +1,115 @@
1import py_mysql
2import random
3#over write the import
4py_mysql = py_mysql.DatastoreProxy()
5columns = ["a","b","c"]
6data = ["1","2","3"]
7invalid_data = ['y','y','y']
8table_name = "hello"
9key = "1"
10print "key= " + key
11print "columns= " + str(columns)
12print "data= " + str(data)
13print "table= " + table_name
14txn = random.randint(0,100000000)
15py_mysql.setupTransaction(txn)
16print "Test: Transaction number:",txn
17py_mysql.put_entity(table_name, key, columns, invalid_data, txn)
18ret = py_mysql.get_entity(table_name, key, columns, txn)
19print "Test: Invalid:"
20print ret
21ret = py_mysql.get_entity(table_name, key, columns)
22print "Test: Outside transaction:"
23print ret
24print py_mysql.put_entity(table_name, key, columns, data, txn)
25print "Test: GET"
26ret = py_mysql.get_entity(table_name, key, columns, txn)
27print "Test: Valid:"
28print ret
29print "Test: Committing:"
30#print py_mysql.commit(txn)
31print ret
32if ret[1:] != data:
33 print "ERROR doing a put then get. Data does not match"
34 print "returned: " + str(ret)
35 print "expected: " + str(data)
36 exit(1)
37else:
38 print "Success"
39py_mysql.commit(txn)
40print "After committed transaction:"
41ret = py_mysql.get_entity(table_name, key, columns)
42print ret
43txn = random.randint(0,100000000)
44py_mysql.setupTransaction(txn)
45txn2 = random.randint(0,11000000000)
46py_mysql.setupTransaction(txn2)
47print "Transaction number:",txn
48print "PUT:"
49print py_mysql.put_entity(table_name, key, columns, invalid_data, txn)
50print "outside transaction:"
51ret = py_mysql.get_entity(table_name, key, columns,txn2)
52print ret
53print "inside transaction:"
54ret = py_mysql.get_entity(table_name, key, columns, txn)
55print ret
56print py_mysql.put_entity(table_name, key, columns, invalid_data, txn)
57print "Rollback:"
58print py_mysql.rollback(txn)
59print "doing a put, rollback, then get"
60print "GET"
61ret = py_mysql.get_entity(table_name, key, columns)
62print "doing a put then get"
63print ret
64if ret[1:] != data:
65 print "*" * 60
66 print "FAILURE doing a put then get. Data does not match"
67 print "returned: " + str(ret)
68 print "expected: " + str(data)
69 print "*" * 60
70 exit(1)
71else:
72 print "Success"
73
74ret = py_mysql.get_schema("hello")
75print ret
76print "checking schema:"
77print ret
78if ret[1:] != columns:
79 print "ERROR in received schema"
80 print "returned: " + str(ret)
81 print "expected: " + str(columns)
82
83#ret = py_mysql.__table_exist(table_name)
84#print "Does table we just created exist?"
85#print ret
86
87ret = py_mysql.delete_row(table_name, key)
88print "Deleting the key %s"%key
89print ret
90
91ret = py_mysql.get_entity(table_name, key, columns)
92print "Trying to get deleted key:"
93print ret
94print "doing a put with key %s"%key
95print py_mysql.put_entity("hello", "1", ["a","b","c"], ["1","2","3"])
96print "doing a get table"
97print py_mysql.get_table("hello", ["a","b","c"])
98py_mysql.put_entity("hello", "2", ["a","b","c"], ["4","5","6"])
99print "doing get table:"
100print py_mysql.get_table("hello", ["a","b","c"])
101py_mysql.put_entity("hello", "3", ["a","b","c"], ["1","2","3"])
102py_mysql.get_table("hello", ["a","b","c"])
103
104print "TRYING TO REPLACE KEY 3"
105py_mysql.put_entity("hello", "3", ["a","b","c"], ["1","2","3"])
106py_mysql.get_table("hello", ["a","b","c"])
107py_mysql.get_row_count("hello")
108ret = py_mysql.delete_row("hello", "1")
109ret = py_mysql.delete_row("hello", "2")
110ret = py_mysql.delete_row("hello", "3")
111py_mysql.get_table("hello", ["a","b","c"])
112print "Deleting table:"
113print py_mysql.delete_table("hello")
114print "deleting twice:"
115print py_mysql.delete_table("hello")
0116
=== modified file 'AppDB/soap_server.py'
--- AppDB/soap_server.py 2010-03-16 21:22:28 +0000
+++ AppDB/soap_server.py 2010-06-29 01:49:23 +0000
@@ -34,7 +34,7 @@
34DEFAULT_APP_LOCATION = ".flatfile_apps"34DEFAULT_APP_LOCATION = ".flatfile_apps"
35DEFAULT_DATASTORE = "hbase"35DEFAULT_DATASTORE = "hbase"
36DEFAULT_SSL_PORT = 434336DEFAULT_SSL_PORT = 4343
37DEFAULT_PORT = 808037DEFAULT_PORT = 9899
38IP_TABLE = "IPS___"38IP_TABLE = "IPS___"
39DEFAULT_ENCRYPTION = 139DEFAULT_ENCRYPTION = 1
40VALID_DATASTORES = [] 40VALID_DATASTORES = []
4141
=== modified file 'AppDB/voldemort/py_voldemort.py'
--- AppDB/voldemort/py_voldemort.py 2010-05-03 21:02:23 +0000
+++ AppDB/voldemort/py_voldemort.py 2010-06-29 01:49:23 +0000
@@ -47,12 +47,14 @@
47 def get(self, key):47 def get(self, key):
48 done = False48 done = False
49 timeout = RETRY_TIMEOUT49 timeout = RETRY_TIMEOUT
50 while (done != True):50 while (done != True and timeout > 0):
51 try:51 try:
52 value = self.real_get(key)52 value = self.real_get(key)
53 done = True53 done = True
54 except:54 except:
55 timeout -= 155 timeout -= 1
56 if timeout <= 0:
57 raise
56 return value58 return value
5759
58 def real_get(self, key):60 def real_get(self, key):
@@ -88,6 +90,8 @@
The diff has been truncated for viewing.

Subscribers

People subscribed via source and target branches