Merge lp:~nchohan/appscale/SSLCassyMysql into lp:appscale
- SSLCassyMysql
- Merge into appscale-main
Proposed by
Navraj Chohan
Status: | Merged | ||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Merged at revision: | 534 | ||||||||||||||||||||||||||||
Proposed branch: | lp:~nchohan/appscale/SSLCassyMysql | ||||||||||||||||||||||||||||
Merge into: | lp:appscale | ||||||||||||||||||||||||||||
Diff against target: |
5757 lines (+2317/-1984) 43 files modified
AppController/djinn.rb (+28/-21) AppController/haproxy.rb (+5/-0) AppController/helperfunctions.rb (+8/-4) AppController/load_balancer.rb (+5/-0) AppController/nginx.rb (+93/-15) AppController/pbserver.rb (+106/-0) AppController/terminate.rb (+1/-0) AppDB/appscale_server.py (+117/-312) AppDB/appscale_server_native_trans.py (+1086/-0) AppDB/appscale_server_no_trans.py (+0/-1117) AppDB/as_transaction.py (+0/-241) AppDB/cassandra/py_cassandra.py (+209/-62) AppDB/cassandra/templates/storage-conf.xml (+1/-1) AppDB/datastore_tester.py (+20/-8) AppDB/dbinterface.py (+12/-4) AppDB/dhash_datastore.py (+3/-3) AppDB/hbase/py_hbase.py (+6/-8) AppDB/helper_functions.py (+1/-1) AppDB/hypertable/py_hypertable.py (+8/-2) AppDB/mongodb/py_mongodb.py (+6/-2) AppDB/mysql/drop_all_tables.py (+4/-1) AppDB/mysql/prime_mysql.py (+21/-9) AppDB/mysql/py_mysql.py (+178/-58) AppDB/mysql/test_mysql_trans.py (+115/-0) AppDB/soap_server.py (+1/-1) AppDB/voldemort/py_voldemort.py (+8/-1) AppDB/zkappscale/zktransaction.py (+0/-1) AppDB/zkappscale/zktransaction_stub.py (+175/-0) AppLoadBalancer/lib/usertools.rb (+2/-2) AppServer/google/appengine/api/datastore_file_distributed.py (+1/-1) AppServer/google/appengine/tools/dev_appserver.py (+3/-5) AppServer/google/appengine/tools/dev_appserver_login.py (+18/-81) AppServer/google/appengine/tools/dev_appserver_main.py (+20/-4) AppServer/google/net/proto/ProtocolBuffer.py (+1/-1) AppServer_Java/build.xml (+6/-13) AppServer_Java/src/com/google/appengine/api/users/dev/LocalLoginServlet.java (+1/-1) AppServer_Java/src/com/google/appengine/tools/resources/ResourceLoader.java (+2/-2) debian/appscale_install.sh (+3/-0) debian/appscale_install_functions.sh (+38/-0) debian/control.core.jaunty (+1/-0) debian/control.core.karmic (+1/-0) debian/control.core.lucid (+2/-1) firewall.conf (+1/-1) |
||||||||||||||||||||||||||||
To merge this branch: | bzr merge lp:~nchohan/appscale/SSLCassyMysql | ||||||||||||||||||||||||||||
Related bugs: |
|
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Chris Bunch | Approve | ||
Review via email: mp+28595@code.launchpad.net |
Commit message
Description of the change
Tested with tasks on all databases. All of them work except for Voldemort. Voldemort works for guestbook.
To post a comment you must log in.
Revision history for this message
Chris Bunch (cgb-cs) wrote : | # |
Revision history for this message
Chris Bunch (cgb-cs) wrote : | # |
MySQL works fine for me on a single node, but fails on the four node case. Tried both -n 1 and -n 2, but both fail.
Revision history for this message
Chris Bunch (cgb-cs) wrote : | # |
Accepted. MySQL on >1 nodes needs to be fixed, and would like to not have the load balancer use SSL for all communication (this way if all the user cares about is app redirection then they don't see the SSL cert problem).
review:
Approve
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'AppController/djinn.rb' |
2 | --- AppController/djinn.rb 2010-06-25 22:28:38 +0000 |
3 | +++ AppController/djinn.rb 2010-06-29 01:49:23 +0000 |
4 | @@ -101,6 +101,7 @@ |
5 | my_data = my_node |
6 | if has_soap_server?(my_data) |
7 | stop_soap_server |
8 | + stop_pbserver |
9 | end |
10 | |
11 | jobs_to_run = my_data.jobs |
12 | @@ -507,7 +508,8 @@ |
13 | def self.get_nearest_db_ip(is_mysql=false) |
14 | db_ips = self.get_db_slave_ips |
15 | # Unless this is mysql we include the master ip |
16 | - db_ips << self.get_db_master_ip if !is_mysql |
17 | + # Update, now mysql also has an API node |
18 | + db_ips << self.get_db_master_ip |
19 | db_ips.compact! |
20 | |
21 | local_ip = HelperFunctions.local_ip |
22 | @@ -810,21 +812,23 @@ |
23 | end |
24 | if retval != 0 |
25 | Djinn.log_debug("Fail to create initial table. Could not startup AppScale.") |
26 | + exit(1) |
27 | # TODO: terminate djinn |
28 | end |
29 | end |
30 | |
31 | # start soap server and pb server |
32 | if has_soap_server?(my_data) |
33 | + @state = "Starting up SOAP Server and PBServer" |
34 | + start_pbserver |
35 | start_soap_server |
36 | + HelperFunctions.sleep_until_port_is_open(HelperFunctions.local_ip, UA_SERVER_PORT) |
37 | end |
38 | |
39 | - # no longer need to start appengine here |
40 | + # appengine is started elsewhere |
41 | end |
42 | |
43 | def start_soap_server |
44 | - @state = "Starting up SOAP Server and Appscale Server(PB server)" |
45 | - |
46 | db_master_ip = nil |
47 | @nodes.each { |location| |
48 | db_master_ip = location.private_ip if location.is_db_master? |
49 | @@ -846,28 +850,31 @@ |
50 | "-s #{HelperFunctions.get_secret}"] |
51 | Djinn.log_debug(cmd.join(" ")) |
52 | Kernel.system cmd.join(" ") |
53 | + end |
54 | + |
55 | + def start_pbserver |
56 | + db_master_ip = nil |
57 | + my_ip = my_node.public_ip |
58 | + @nodes.each { |location| |
59 | + db_master_ip = location.private_ip if location.is_db_master? |
60 | + } |
61 | + abort("db master ip was nil") if db_master_ip.nil? |
62 | + table = @creds['table'] |
63 | zoo_connection = get_zk_connection_string(@nodes) |
64 | - cmd = [ "MASTER_IP=#{db_master_ip} LOCAL_DB_IP='#{db_local_ip}'", |
65 | - "start-stop-daemon --start", |
66 | - "--exec /usr/bin/python2.6", |
67 | - "--name appscale_server", |
68 | - "--make-pidfile", |
69 | - "--pidfile /var/appscale/appscale-appscaleserver.pid", |
70 | - "--background", |
71 | - "--", |
72 | - "#{APPSCALE_HOME}/AppDB/appscale_server.py", |
73 | - "--type #{table}", |
74 | - "-z \"#{zoo_connection}\"", |
75 | - "-s #{HelperFunctions.get_secret}", |
76 | - "-a #{get_uaserver_ip} --key"] |
77 | - Djinn.log_debug(cmd.join(" ")) |
78 | - Kernel.system cmd.join(" ") |
79 | - HelperFunctions.sleep_until_port_is_open(HelperFunctions.local_ip, UA_SERVER_PORT) |
80 | + PbServer.start(db_master_ip, @userappserver_private_ip, my_ip, table, zoo_connection) |
81 | + HAProxy.create_pbserver_config(my_ip, PbServer.proxy_port) |
82 | + Nginx.create_pbserver_config(my_ip, PbServer.proxy_port) |
83 | + Nginx.restart |
84 | + # TODO check the return value |
85 | + PbServer.is_running |
86 | end |
87 | |
88 | def stop_soap_server |
89 | Kernel.system "start-stop-daemon --stop --pidfile /var/appscale/appscale-soapserver.pid" |
90 | - Kernel.system "start-stop-daemon --stop --pidfile /var/appscale/appscale-appscaleserver.pid" |
91 | + end |
92 | + |
93 | + def stop_pbserver |
94 | + PbServer.stop |
95 | end |
96 | |
97 | def is_cloud? |
98 | |
99 | === modified file 'AppController/haproxy.rb' |
100 | --- AppController/haproxy.rb 2010-06-25 22:28:38 +0000 |
101 | +++ AppController/haproxy.rb 2010-06-29 01:49:23 +0000 |
102 | @@ -61,6 +61,11 @@ |
103 | self.create_app_config(my_ip, listen_port, Monitoring.server_ports, Monitoring.name) |
104 | end |
105 | |
106 | + # Create the config file for PBServer applications |
107 | + def self.create_pbserver_config(my_ip, listen_port) |
108 | + self.create_app_config(my_ip, listen_port, PbServer.server_ports, PbServer.name) |
109 | + end |
110 | + |
111 | # A generic function for creating haproxy config files used by appscale services |
112 | def self.create_app_config(my_ip, listen_port, server_ports, name) |
113 | servers = [] |
114 | |
115 | === modified file 'AppController/helperfunctions.rb' |
116 | --- AppController/helperfunctions.rb 2010-06-01 19:38:10 +0000 |
117 | +++ AppController/helperfunctions.rb 2010-06-29 01:49:23 +0000 |
118 | @@ -23,7 +23,8 @@ |
119 | TIME_IN_SECONDS = { "d" => 86400, "h" => 3600, "m" => 60, "s" => 1 } |
120 | |
121 | APP_START_PORT = 20000 |
122 | - |
123 | +DEFAULT_PBSERVER_PORT = 8443 |
124 | +DEFAULT_PBSERVER_NOENCRYPT_PORT = 8888 |
125 | module HelperFunctions |
126 | def self.write_file(location, contents) |
127 | File.open(location, "w+") { |file| file.write(contents) } |
128 | @@ -204,15 +205,18 @@ |
129 | Djinn.log_debug("saw a python app coming through") |
130 | cmd = ["MY_IP_ADDRESS=#{public_ip}", |
131 | "MY_PORT=#{port}", |
132 | - "NGINX_ADDRESS=#{public_ip}", |
133 | - "NGINX_PORT=#{nginx_port}", |
134 | + #"NGINX_ADDRESS=#{public_ip}", |
135 | + #"NGINX_PORT=#{nginx_port}", |
136 | "python2.5", |
137 | "#{APPSCALE_HOME}/AppServer/dev_appserver.py", |
138 | "-p #{port}", |
139 | "--cookie_secret #{secret}", |
140 | "--login_server #{public_ip}", |
141 | "--admin_console_server ''", |
142 | - "--datastore_path #{db_location}", |
143 | + "--nginx_port #{nginx_port}", |
144 | + "--nginx_host #{public_ip}", |
145 | + "--enable_sendmail", |
146 | + "--datastore_path #{db_location}:#{DEFAULT_PBSERVER_NOENCRYPT_PORT}", |
147 | "--history_path /var/apps/#{app_name}/data/app.datastore.history", |
148 | # this is not working. |
149 | # "--address=#{public_ip}", |
150 | |
151 | === modified file 'AppController/load_balancer.rb' |
152 | --- AppController/load_balancer.rb 2010-04-21 20:53:02 +0000 |
153 | +++ AppController/load_balancer.rb 2010-06-29 01:49:23 +0000 |
154 | @@ -7,6 +7,7 @@ |
155 | PROXY_PORT = 8060 |
156 | # The port which requests to this app will be served from |
157 | LISTEN_PORT = 80 |
158 | + LISTEN_SSL_PORT = 443 |
159 | |
160 | def self.start |
161 | `service appscale-loadbalancer start` |
162 | @@ -33,6 +34,10 @@ |
163 | LISTEN_PORT |
164 | end |
165 | |
166 | + def self.listen_ssl_port |
167 | + LISTEN_SSL_PORT |
168 | + end |
169 | + |
170 | def self.server_ports |
171 | SERVER_PORTS |
172 | end |
173 | |
174 | === modified file 'AppController/nginx.rb' |
175 | --- AppController/nginx.rb 2010-06-25 22:28:38 +0000 |
176 | +++ AppController/nginx.rb 2010-06-29 01:49:23 +0000 |
177 | @@ -5,7 +5,7 @@ |
178 | require 'fileutils' |
179 | require 'load_balancer' |
180 | require 'monitoring' |
181 | - |
182 | +require 'pbserver' |
183 | |
184 | # A class to wrap all the interactions with the nginx web server |
185 | class Nginx |
186 | @@ -72,9 +72,6 @@ |
187 | |
188 | #{static_locations} |
189 | |
190 | - location /_ah/admin { |
191 | - # 404 - lock out admin routes |
192 | - } |
193 | |
194 | location / { |
195 | proxy_set_header X-Real-IP $remote_addr; |
196 | @@ -117,7 +114,7 @@ |
197 | |
198 | # Create the configuration file for the AppLoadBalancer Rails application |
199 | def self.create_app_load_balancer_config(my_ip, proxy_port) |
200 | - self.create_app_config(my_ip, proxy_port, LoadBalancer.listen_port, LoadBalancer.name, LoadBalancer.public_directory) |
201 | + self.create_app_config(my_ip, proxy_port, LoadBalancer.listen_port, LoadBalancer.name, LoadBalancer.public_directory, LoadBalancer.listen_ssl_port) |
202 | end |
203 | |
204 | # Create the configuration file for the AppMonitoring Rails application |
205 | @@ -125,6 +122,68 @@ |
206 | self.create_app_config(my_ip, proxy_port, Monitoring.listen_port, Monitoring.name, Monitoring.public_directory) |
207 | end |
208 | |
209 | + # Create the configuration file for the pbserver |
210 | + def self.create_pbserver_config(my_ip, proxy_port) |
211 | + config = <<CONFIG |
212 | +upstream #{PbServer.name} { |
213 | + server #{my_ip}:#{proxy_port}; |
214 | +} |
215 | + |
216 | +server { |
217 | + listen #{PbServer.listen_port}; |
218 | + server_name #{my_ip}; |
219 | + root /root/appscale/AppDB/; |
220 | + access_log /var/log/nginx/pbserver.access.log upstream; |
221 | + error_log /var/log/nginx/pbserver.error.log; |
222 | + rewrite_log off; |
223 | + error_page 404 = /404.html; |
224 | + |
225 | + |
226 | + |
227 | + location / { |
228 | + proxy_set_header X-Real-IP $remote_addr; |
229 | + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; |
230 | + proxy_set_header Host $http_host; |
231 | + proxy_redirect off; |
232 | + proxy_pass http://#{PbServer.name}; |
233 | + } |
234 | + |
235 | +} |
236 | + |
237 | +server { |
238 | + listen #{PbServer.listen_ssl_port}; |
239 | + ssl on; |
240 | + ssl_certificate /etc/nginx/mycert.pem; |
241 | + ssl_certificate_key /etc/nginx/mykey.pem; |
242 | + root /root/appscale/AppDB/public; |
243 | + access_log /var/log/nginx/pbencrypt.access.log upstream; |
244 | + error_log /var/log/nginx/pbencrypt.error.log; |
245 | + rewrite_log off; |
246 | + error_page 502 /502.html; |
247 | + |
248 | + location / { |
249 | + proxy_set_header X-Real-IP $remote_addr; |
250 | + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; |
251 | + proxy_set_header Host $http_host; |
252 | + proxy_redirect off; |
253 | + |
254 | + client_body_timeout 60; |
255 | + proxy_read_timeout 60; |
256 | + #Increase file size so larger applications can be uploaded |
257 | + client_max_body_size 30M; |
258 | + # go to proxy |
259 | + proxy_pass http://#{PbServer.name}; |
260 | + } |
261 | +} |
262 | +CONFIG |
263 | + config_path = File.join(SITES_ENABLED_PATH, "#{PbServer.name}.#{CONFIG_EXTENSION}") |
264 | + File.open(config_path, "w+") { |dest_file| dest_file.write(config) } |
265 | + |
266 | + HAProxy.regenerate_config |
267 | + |
268 | + end |
269 | + |
270 | + |
271 | # A generic function for creating nginx config files used by appscale services |
272 | def self.create_app_config(my_ip, proxy_port, listen_port, name, public_dir, ssl_port=nil) |
273 | |
274 | @@ -132,9 +191,30 @@ |
275 | upstream #{name} { |
276 | server #{my_ip}:#{proxy_port}; |
277 | } |
278 | - |
279 | -server { |
280 | - listen #{listen_port}; |
281 | +CONFIG |
282 | + |
283 | + if ssl_port |
284 | + # redirect all request to ssl port. |
285 | + config += <<CONFIG |
286 | +server { |
287 | + listen #{listen_port}; |
288 | + rewrite ^(.*) https://#{my_ip}:#{ssl_port}$1 permanent; |
289 | +} |
290 | + |
291 | +server { |
292 | + listen #{ssl_port}; |
293 | + ssl on; |
294 | + ssl_certificate #{NGINX_PATH}/mycert.pem; |
295 | + ssl_certificate_key #{NGINX_PATH}/mykey.pem; |
296 | +CONFIG |
297 | + else |
298 | + config += <<CONFIG |
299 | +server { |
300 | + listen #{listen_port}; |
301 | +CONFIG |
302 | + end |
303 | + |
304 | + config += <<CONFIG |
305 | root #{public_dir}; |
306 | access_log /var/log/nginx/load-balancer.access.log upstream; |
307 | error_log /var/log/nginx/load-balancer.error.log; |
308 | @@ -165,7 +245,6 @@ |
309 | root #{APPSCALE_HOME}/AppLoadBalancer/public; |
310 | } |
311 | } |
312 | - |
313 | CONFIG |
314 | |
315 | config_path = File.join(SITES_ENABLED_PATH, "#{name}.#{CONFIG_EXTENSION}") |
316 | @@ -209,9 +288,6 @@ |
317 | |
318 | gzip on; |
319 | |
320 | - ssl_certificate #{NGINX_PATH}/cert.pem; |
321 | - ssl_certificate_key #{NGINX_PATH}/cert.key; |
322 | - |
323 | include #{NGINX_PATH}/sites-enabled/*; |
324 | } |
325 | CONFIG |
326 | @@ -223,9 +299,11 @@ |
327 | end |
328 | |
329 | # copy over certs for ssl |
330 | - `cp #{APPSCALE_HOME}/.appscale/certs/cert.pem #{NGINX_PATH}` |
331 | - `cp #{APPSCALE_HOME}/.appscale/certs/cert.key #{NGINX_PATH}` |
332 | - |
333 | + # just copy files once to keep certificate as static. |
334 | + #`test ! -e #{NGINX_PATH}/mycert.pem && cp #{APPSCALE_HOME}/.appscale/certs/mycert.pem #{NGINX_PATH}` |
335 | + #`test ! -e #{NGINX_PATH}/mykey.pem && cp #{APPSCALE_HOME}/.appscale/certs/mykey.pem #{NGINX_PATH}` |
336 | + `cp #{APPSCALE_HOME}/.appscale/certs/mykey.pem #{NGINX_PATH}` |
337 | + `cp #{APPSCALE_HOME}/.appscale/certs/mycert.pem #{NGINX_PATH}` |
338 | # Write the main configuration file which sets default configuration parameters |
339 | File.open(MAIN_CONFIG_FILE, "w+") { |dest_file| dest_file.write(config) } |
340 | end |
341 | |
342 | === added file 'AppController/pbserver.rb' |
343 | --- AppController/pbserver.rb 1970-01-01 00:00:00 +0000 |
344 | +++ AppController/pbserver.rb 2010-06-29 01:49:23 +0000 |
345 | @@ -0,0 +1,106 @@ |
346 | +#!/usr/bin/ruby -w |
347 | +require 'helperfunctions' |
348 | + |
349 | +# A class to wrap all the interactions with the PbServer |
350 | +class PbServer |
351 | + APPSCALE_HOME=ENV['APPSCALE_HOME'] |
352 | + # If using just native transactions, then use one server port |
353 | + SERVER_PORTS = [4000, 4001, 4002] |
354 | + # The port which nginx will use to send requests to haproxy |
355 | + PROXY_PORT = 3999 |
356 | + # The port which requests to this app will be served from |
357 | + LISTEN_PORT = 8888 |
358 | + LISTEN_SSL_PORT = 8443 |
359 | + # The following databases cannot have multiple pbservers |
360 | + SINGLE_SERVER_TABLE = ["mysql"] |
361 | + # The following databases have native transaction support |
362 | + NATIVE_TRANSACTIONS = ["mysql"] |
363 | + @@pb_master_ip = "" |
364 | + @@pb_ip = "" |
365 | + @@pb_table = "" |
366 | + @@pb_zklocations = "" |
367 | + |
368 | + def self.start(master_ip, db_local_ip, my_ip, table, zklocations) |
369 | + @@pb_master_ip = master_ip |
370 | + @@pb_ip = my_ip |
371 | + @@pb_table = table |
372 | + @@pb_zklocations = zklocations |
373 | + ports = SERVER_PORTS |
374 | + |
375 | + pbserver = self.pb_script |
376 | + ports = self.server_ports |
377 | + ports.each { |pbserver_port| |
378 | + cmd = [ "MASTER_IP=#{@@pb_master_ip} LOCAL_DB_IP='#{db_local_ip}'", |
379 | + "start-stop-daemon --start", |
380 | + "--exec /usr/bin/python2.6", |
381 | + "--name appscale_server", |
382 | + "--make-pidfile", |
383 | + "--pidfile /var/appscale/appscale-appscaleserver-#{pbserver_port}.pid", |
384 | + "--background", |
385 | + "--", |
386 | + "#{pbserver}", |
387 | + "-p #{pbserver_port}", |
388 | + "--no_encryption", |
389 | + "--type #{@@pb_table}", |
390 | + "-z \"#{@@pb_zklocations}\"", |
391 | + "-s #{HelperFunctions.get_secret}", |
392 | + "-a #{@@pb_ip} --key"] |
393 | + Djinn.log_debug(cmd.join(" ")) |
394 | + Kernel.system cmd.join(" ") |
395 | + } |
396 | + end |
397 | + |
398 | + def self.stop |
399 | + ports = self.server_ports |
400 | + ports.each { |pbserver_port| |
401 | + Kernel.system "start-stop-daemon --stop --pidfile /var/appscale/appscale-appscaleserver-#{pbserver_port}.pid" |
402 | + } |
403 | + end |
404 | + |
405 | + def self.restart |
406 | + self.stop |
407 | + # Use cached variables |
408 | + self.start(@@pb_master_ip, @@pb_ip, @@pb_table, @@pb_zklocations) |
409 | + end |
410 | + |
411 | + def self.name |
412 | + "as_pbserver" |
413 | + end |
414 | + |
415 | + def self.public_directory |
416 | + "/root/appscale/AppDB/public" |
417 | + end |
418 | + |
419 | + def self.listen_port |
420 | + LISTEN_PORT |
421 | + end |
422 | + |
423 | + def self.listen_ssl_port |
424 | + LISTEN_SSL_PORT |
425 | + end |
426 | + |
427 | + def self.server_ports |
428 | + if SINGLE_SERVER_TABLE.include?(@@pb_table) |
429 | + return SERVER_PORTS.first(1) |
430 | + else |
431 | + return SERVER_PORTS |
432 | + end |
433 | + end |
434 | + |
435 | + def self.proxy_port |
436 | + PROXY_PORT |
437 | + end |
438 | + |
439 | + def self.is_running |
440 | + `curl http://#{@@pb_ip}:#{PROXY_PORT}` |
441 | + end |
442 | + |
443 | + def self.pb_script |
444 | + if NATIVE_TRANSACTIONS.include?(@@pb_table) |
445 | + return "#{APPSCALE_HOME}/AppDB/appscale_server_native_trans.py" |
446 | + else |
447 | + return "#{APPSCALE_HOME}/AppDB/appscale_server.py" |
448 | + end |
449 | + end |
450 | + |
451 | +end |
452 | |
453 | === modified file 'AppController/terminate.rb' |
454 | --- AppController/terminate.rb 2010-06-21 20:39:53 +0000 |
455 | +++ AppController/terminate.rb 2010-06-29 01:49:23 +0000 |
456 | @@ -28,6 +28,7 @@ |
457 | Collectd.stop |
458 | LoadBalancer.stop |
459 | Monitoring.stop |
460 | +PbServer.stop |
461 | `/etc/init.d/klogd stop` |
462 | |
463 | `rm -rf /var/apps/` |
464 | |
465 | === modified file 'AppDB/appscale_server.py' |
466 | --- AppDB/appscale_server.py 2010-05-11 20:49:42 +0000 |
467 | +++ AppDB/appscale_server.py 2010-06-29 01:49:23 +0000 |
468 | @@ -5,7 +5,9 @@ |
469 | # Soo Hwan Park (suwanny@gmail.com) |
470 | # Sydney Pang (pang@cs.ucsb.edu) |
471 | # See LICENSE file |
472 | - |
473 | +import tornado.httpserver |
474 | +import tornado.ioloop |
475 | +import tornado.web |
476 | import sys |
477 | import socket |
478 | import os |
479 | @@ -18,10 +20,7 @@ |
480 | import md5 |
481 | import random |
482 | import getopt |
483 | -from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer |
484 | -from SocketServer import ThreadingMixIn |
485 | import threading |
486 | - |
487 | from google.appengine.api import api_base_pb |
488 | from google.appengine.api import datastore |
489 | from google.appengine.api import datastore_errors |
490 | @@ -37,8 +36,7 @@ |
491 | from M2Crypto import SSL |
492 | from drop_privileges import * |
493 | from zkappscale import zktransaction |
494 | -import as_transaction |
495 | - |
496 | +zk = zktransaction |
497 | import time |
498 | |
499 | DEBUG = False |
500 | @@ -48,7 +46,7 @@ |
501 | DEFAULT_APP_LOCATION = ".flatfile_apps" |
502 | HYPERTABLE_XML_TAG = "Name" |
503 | DEFAULT_DATASTORE = "files" |
504 | -DEFAULT_SSL_PORT = 443 |
505 | +DEFAULT_SSL_PORT = 8443 |
506 | DEFAULT_PORT = 4080 |
507 | DEFAULT_ENCRYPTION = 1 |
508 | CERT_LOCATION = "/etc/appscale/certs/mycert.pem" |
509 | @@ -68,7 +66,7 @@ |
510 | KEYBLOCKSIZE = "50" |
511 | keyDictionaryLock = None |
512 | keyDictionary = {} |
513 | - |
514 | +MAX_DICT_SIZE = 1000000 |
515 | optimizedQuery = False |
516 | ID_KEY_LENGTH = 64 |
517 | tableHashTable = {} |
518 | @@ -78,7 +76,6 @@ |
519 | ssl_cert_file = "" |
520 | ssl_key_file = "" |
521 | |
522 | -_trans_set = as_transaction.ASTransSet() |
523 | zoo_keeper = "" |
524 | zoo_keeper_locations = "localhost:2181" |
525 | |
526 | @@ -93,7 +90,6 @@ |
527 | for the entity table |
528 | """ |
529 | |
530 | - |
531 | class ThreadLogger: |
532 | def __init__(self, log): |
533 | self.logger_ = log |
534 | @@ -109,6 +105,24 @@ |
535 | logger = appscale_logger.getLogger("pb_server") |
536 | |
537 | |
538 | +class putThread(threading.Thread): |
539 | + def setup(self, db, table, key, fields, values): |
540 | + self.db = db |
541 | + self.table = table |
542 | + self.key = key |
543 | + self.fields = fields |
544 | + self.values = values |
545 | + self.err = None |
546 | + self.ret = None |
547 | + self.timeTaken = 0 |
548 | + def run(self): |
549 | + s = time.time() |
550 | + self.err, self.ret = self.db.put_entity(self.table, self.key, |
551 | + self.fields, self.values) |
552 | + self.timeTaken = time.time() - s |
553 | + |
554 | + |
555 | + |
556 | def getTableName(app_id, kind, version): |
557 | return app_id + "___" + kind + "___" + version |
558 | |
559 | @@ -240,7 +254,7 @@ |
560 | else: |
561 | keyStart, blockSize = zoo_keeper.generateIDBlock(app_id, root) |
562 | keyStart = long(keyStart) |
563 | - except e: |
564 | + except Exception, e: |
565 | print "="*60 |
566 | print "Exception:",str(e) |
567 | print "="*60 |
568 | @@ -250,6 +264,12 @@ |
569 | key = keyStart |
570 | keyStart = keyStart + 1 |
571 | keyDictionary[index] = keyStart, keyEnd |
572 | + |
573 | + # To prevent the dictionary from getting to large |
574 | + # and taking up too much memory |
575 | + if len(keyDictionary) > MAX_DICT_SIZE: |
576 | + keyDictionary = {} # reset |
577 | + |
578 | keyDictionaryLock.release() |
579 | return key |
580 | |
581 | @@ -258,30 +278,18 @@ |
582 | pass |
583 | |
584 | |
585 | -class ThreadedHTTPServer(ThreadingMixIn, HTTPServer): |
586 | - """ Handle requests in a new thread """ |
587 | - |
588 | - |
589 | -class AppScaleSecureHandler( BaseHTTPRequestHandler ): |
590 | + |
591 | +class MainHandler(tornado.web.RequestHandler): |
592 | """ |
593 | Defines what to do when the webserver receives different types of |
594 | HTTP requests. |
595 | """ |
596 | - def get_http_err_code( self, message ): |
597 | - print message |
598 | - ret = 599 # 599 is the default code for an unknown error # |
599 | - return ret |
600 | - |
601 | - def send_post_http_response( self , http_code , message ): |
602 | - self.send_response( http_code ) |
603 | - self.send_header( 'Content-type' , 'text/plain' ) |
604 | - self.end_headers() |
605 | - self.wfile.write( message ) |
606 | - |
607 | - def send_http_err_response( self, err_msg ): |
608 | - ret_http_code = self.get_http_err_code( err_msg ) |
609 | - self.send_post_http_response( ret_http_code, err_msg ) |
610 | - |
611 | + ######################## |
612 | + # GET Request Handling # |
613 | + ######################## |
614 | + def get(self): |
615 | + self.write("Hi") |
616 | + |
617 | # remote api request |
618 | # sends back a response |
619 | def remote_request(self, app_id, appscale_version, http_request_data): |
620 | @@ -304,8 +312,6 @@ |
621 | method = apirequest.method() |
622 | request_data = apirequest.request() |
623 | http_request_data = request_data.contents() |
624 | - |
625 | - print "REQUEST:",method," AT time",time.time() |
626 | if method == "Put": |
627 | response, errcode, errdetail = self.put_request(app_id, |
628 | appscale_version, |
629 | @@ -393,8 +399,7 @@ |
630 | print "REPLY",method," AT TIME",time.time() |
631 | print "errcode:",errcode |
632 | print "errdetail:",errdetail |
633 | - self.send_post_http_response( 200 , apiresponse.Encode() ) |
634 | - |
635 | + self.write( apiresponse.Encode() ) |
636 | |
637 | def run_query(self, app_id, appscale_version, http_request_data): |
638 | global app_datastore |
639 | @@ -404,14 +409,7 @@ |
640 | if query.has_transaction(): |
641 | txn = query.transaction() |
642 | |
643 | - if not _trans_set.isValid(txn): |
644 | - _trans_set.purge(txn) |
645 | - return (api_base_pb.VoidProto().Encode(), |
646 | - datastore_pb.Error.BAD_REQUEST, |
647 | - 'Transaction timed out.') |
648 | - |
649 | if not query.has_ancestor(): |
650 | - _trans_set.purge(txn) |
651 | return (api_base_pb.VoidProto().Encode(), |
652 | datastore_pb.Error.BAD_REQUEST, |
653 | 'Only ancestor queries are allowed inside transactions.') |
654 | @@ -424,22 +422,12 @@ |
655 | datastore_pb.Error.BAD_REQUEST, |
656 | 'No group entity or root key.') |
657 | |
658 | - if _trans_set.needsLock(txn): |
659 | + try: |
660 | gotLock= zoo_keeper.acquireLock( app_id, txn.handle(), root_key) |
661 | - if gotLock: |
662 | - _trans_set.setGroup(txn, root_key) |
663 | - else: |
664 | - _trans_set.purge(txn) |
665 | + except zk.ZKTransactionException, zkex: |
666 | return (api_base_pb.VoidProto().Encode(), |
667 | datastore_pb.Error.CONCURRENT_TRANSACTION, |
668 | - 'Another transaction is running.') |
669 | - else: |
670 | - if _trans_set.hasLockExpired(txn): |
671 | - #zoo_keeper.notifyOfExpiredLock(app_id, root_key) |
672 | - _trans_set.purge(txn) |
673 | - return (api_base_pb.VoidProto().Encode(), |
674 | - datastore_pb.Error.TIMEOUT, |
675 | - 'The transaction lease has expired.') |
676 | + 'Another transaction is running. %s'%zkex.message) |
677 | |
678 | if not query.has_kind(): |
679 | # Return nothing in case of error # |
680 | @@ -448,20 +436,15 @@ |
681 | "Kindless queries are not implemented.") |
682 | else: |
683 | kind = query.kind() |
684 | - #print "Query kind:",kind |
685 | - |
686 | - # Verify validity of the entity name and applicaiton id # |
687 | - # according to the naming sheme for entity tables # |
688 | - #assert kind[-2:] != "__" |
689 | - #assert app_id[-1] != "_" |
690 | - |
691 | # Fetch query from the datastore # |
692 | table_name = getTableName(app_id, kind, appscale_version) |
693 | - #print "Query using table name: %s, %s" % (table_name, ENTITY_TABLE_SCHEMA) |
694 | r = app_datastore.get_table( table_name, ENTITY_TABLE_SCHEMA) |
695 | - #print "result: %s" % str(r) |
696 | + err = r[0] |
697 | + if err not in ERROR_CODES: |
698 | + return (api_base_pb.VoidProto().Encode(), |
699 | + datastore_pb.Error.INTERNAL_ERROR, |
700 | + "Error running query--." + err) |
701 | |
702 | - #err = r[0] |
703 | if len(r) > 1: |
704 | results = r[1:] |
705 | else: |
706 | @@ -471,8 +454,6 @@ |
707 | versions = results[1::2] |
708 | # evens are encoded entities |
709 | results = results[0::2] |
710 | - #print "RESULTS:",results |
711 | - #print "VERSIONS:",versions |
712 | if len(versions) != len(results): |
713 | return(api_base_pb.VoidProto().Encode(), |
714 | datastore_pb.Error.INTERNAL_ERROR, |
715 | @@ -498,7 +479,6 @@ |
716 | row_key = getRowKeyFromDeletedEncoding(encoded_ent) |
717 | else: |
718 | row_key = getRowKeyFromKeyType(app_id, encoded_ent.key()) |
719 | - #print "constructed row key:",row_key |
720 | root_key = self.getRootKeyFromEntity(app_id, encoded_ent) |
721 | |
722 | #TODO make sure all deleted keys use the same encoding |
723 | @@ -509,8 +489,6 @@ |
724 | journal_result = ["DB_ERROR:",DELETED] |
725 | elif prev_version != long(ii): |
726 | # if the versions don't match, a valid version must be fetched |
727 | - print "previous version from zk:",prev_version |
728 | - print "prev version:",str(ii) |
729 | journal_key = getJournalKey(row_key, prev_version) |
730 | journal_table = getJournalTable(app_id, appscale_version) |
731 | journal_result = app_datastore.get_entity( journal_table, |
732 | @@ -638,59 +616,24 @@ |
733 | transaction_pb = datastore_pb.Transaction() |
734 | # handle = zk.getTransactionID(app_id) |
735 | handle = zoo_keeper.getTransactionID(app_id) |
736 | - print "Begin Trans Handle:",handle |
737 | transaction_pb.set_handle(handle) |
738 | - _trans_set.add(transaction_pb) |
739 | return (transaction_pb.Encode(), 0, "") |
740 | |
741 | def commit_transaction_request(self, app_id, appscale_version, http_request_data): |
742 | txn = datastore_pb.Transaction(http_request_data) |
743 | commitres_pb = datastore_pb.CommitResponse() |
744 | - if not _trans_set.isValid(txn): |
745 | - _trans_set.purge(txn) |
746 | - return (commitres_pb.Encode(), |
747 | - datastore_pb.Error.BAD_REQUEST, |
748 | - 'Transaction timed out.') |
749 | |
750 | - if _trans_set.needsLock(txn): |
751 | - _trans_set.purge(txn) |
752 | - return (commitres_pb.Encode(), |
753 | - datastore_pb.Error.BAD_REQUEST, |
754 | - 'Commiting without owning the group lock.') |
755 | - |
756 | - # if zk.releaseLock(app_id, txn.handle()): |
757 | - #try: |
758 | if zoo_keeper.releaseLock(app_id, txn.handle()): |
759 | - _trans_set.purge(txn) |
760 | return (commitres_pb.Encode(), 0, "") |
761 | else: |
762 | # ZK client must now deal with lock |
763 | return (commitres_pb.Encode(), |
764 | datastore_pb.Error.INTERNAL_ERROR, |
765 | "Unable to release lock") |
766 | - #except : |
767 | - # print "exception:",str(ii) |
768 | - # return (commitres_pb.Encode(), |
769 | - # datastore_pb.Error.INTERNAL_ERROR, |
770 | - # "Releasing a lock with the wrong group") |
771 | - |
772 | |
773 | def rollback_transaction_request(self, app_id, appscale_version, http_request_data): |
774 | txn = datastore_pb.Transaction(http_request_data) |
775 | zoo_keeper.notifyFailedTransaction(app_id, txn.handle()) |
776 | - print "ROLLBACK for transaction:",txn.handle() |
777 | - if not _trans_set.isValid(txn): |
778 | - _trans_set.purge(txn) |
779 | - return (api_base_pb.VoidProto().Encode(), |
780 | - datastore_pb.Error.BAD_REQUEST, |
781 | - 'Transaction timed out.') |
782 | - if _trans_set.needsLock(txn): |
783 | - _trans_set.purge(txn) |
784 | - return (api_base_pb.VoidProto().Encode(), |
785 | - datastore_pb.Error.BAD_REQUEST, |
786 | - 'Rolling back without owning the group lock.') |
787 | - |
788 | - _trans_set.purge(txn) |
789 | return (api_base_pb.VoidProto().Encode(), 0, "") |
790 | |
791 | |
792 | @@ -766,7 +709,6 @@ |
793 | if not request.has_transaction(): |
794 | rollback_req = datastore_pb.Transaction() |
795 | rollback_req.set_handle(internal_txn) |
796 | - #print "ROLLING BACK FOR %s" % internal_txn |
797 | self.rollback_transaction_request(app_id, |
798 | "version", |
799 | rollback_req.Encode()) |
800 | @@ -806,12 +748,8 @@ |
801 | # Only dealing with root puts |
802 | if e.key().path().element_size() == 1: |
803 | root_path = e.key().path().mutable_element(0) |
804 | - #print "has id:",root_path.has_id(), "has name:",root_path.has_name() |
805 | if root_path.id() == 0 and not root_path.has_name(): |
806 | - # zk.generateIDBlock(self, app_id, entity_key = GLOBAL_ID_KEY): |
807 | - #new_key = root_key + "/" + last_path.type() |
808 | uid = generate_unique_id(app_id, None, None) |
809 | - print "Assigned uid to new root key:",str(uid) |
810 | if uid <= 0: |
811 | return (putresp_pb.Encode(), |
812 | datastore_pb.Error.INTERNAL_ERROR, |
813 | @@ -819,36 +757,22 @@ |
814 | root_path.set_id(uid) |
815 | |
816 | if putreq_pb.has_transaction(): |
817 | - #print "This put has transaction" |
818 | txn = putreq_pb.transaction() |
819 | |
820 | - if not _trans_set.isValid(txn): |
821 | - #_trans_set.purge(txn) |
822 | - return (putresp_pb.Encode(), |
823 | - datastore_pb.Error.BAD_REQUEST, |
824 | - 'Transaction timed out.') |
825 | |
826 | root_key, errcode, errdetail = self.getRootKeyFromTransPut(app_id, putreq_pb) |
827 | if errcode != 0: |
828 | - #_trans_set.purge(txn) |
829 | return (putresp_pb.Encode(), |
830 | errcode, |
831 | errdetail) |
832 | - #print "Root key: ",root_key |
833 | - if _trans_set.needsLock(txn): |
834 | - # zk.acquireLock(app_id, txn.handle()) |
835 | + try: |
836 | gotLock = zoo_keeper.acquireLock( app_id, txn.handle(), root_key) |
837 | - if gotLock: |
838 | - _trans_set.setGroup(txn, root_key) |
839 | - else: |
840 | - #_trans_set.purge(txn) |
841 | - return (putresp_pb.Encode(), |
842 | - datastore_pb.Error.CONCURRENT_TRANSACTION, |
843 | - 'Another transaction is running.') |
844 | + except zk.ZKTransactionException, zkex: |
845 | + return (putresp_pb.Encode(), |
846 | + datastore_pb.Error.CONCURRENT_TRANSACTION, |
847 | + 'Error trying to acquire lock. %s'%zkex.message) |
848 | |
849 | # Gather data from Put Request # |
850 | - #print "Entity list for put:" |
851 | - #print putreq_pb.entity_list() |
852 | for e in putreq_pb.entity_list(): |
853 | |
854 | for prop in e.property_list() + e.raw_property_list(): |
855 | @@ -904,7 +828,6 @@ |
856 | # All writes are transactional and per entity if |
857 | # not already wrapped in a transaction |
858 | if not putreq_pb.has_transaction(): |
859 | - #print "This is not a composite transaction" |
860 | txn, err, errcode = self.begin_transaction_request(app_id, |
861 | appscale_version, |
862 | http_request_data) |
863 | @@ -912,33 +835,24 @@ |
864 | txn = datastore_pb.Transaction(txn) |
865 | |
866 | if err != 0: |
867 | - _trans_set.purge(txn) |
868 | return (putresp_pb.Encode(), err, errcode) |
869 | root_key = getRootKey(app_id, e.key().path().element_list()) |
870 | - # TODO what if this is a new key and does not have a uid |
871 | if root_key == None: |
872 | self.maybe_rollback_transaction(app_id, putreq_pb, txn.handle()) |
873 | return (putresp_pb.Encode(), |
874 | datastore_pb.Error.BAD_REQUEST, |
875 | 'No group entity or root key.') |
876 | - |
877 | - #print "Root key: ",root_key |
878 | - if _trans_set.needsLock(txn): |
879 | - #print "needs lock" |
880 | + try: |
881 | gotLock = zoo_keeper.acquireLock( app_id, txn.handle(), root_key) |
882 | - if gotLock: |
883 | - _trans_set.setGroup(txn, root_key) |
884 | - else: |
885 | - #_trans_set.purge(txn) |
886 | - self.maybe_rollback_transaction(app_id, putreq_pb, txn.handle()) |
887 | - return (putresp_pb.Encode(), |
888 | - datastore_pb.Error.CONCURRENT_TRANSACTION, |
889 | - 'Another transaction is running.') |
890 | + except zk.ZKTransactionException, zkex: |
891 | + self.maybe_rollback_transaction(app_id, putreq_pb, txn.handle()) |
892 | + return (putresp_pb.Encode(), |
893 | + datastore_pb.Error.CONCURRENT_TRANSACTION, |
894 | + 'Another transaction is running %s.'%zkex.message) |
895 | ####################################### |
896 | # Done with key assignment |
897 | # Notify Soap Server of any new tables |
898 | ####################################### |
899 | - #print "Putting of type:",kind,"with uid of",str(uid) |
900 | # insert key |
901 | table_name = getTableName(app_id, kind, appscale_version) |
902 | #print "Put Using table name:",table_name |
903 | @@ -991,59 +905,42 @@ |
904 | row_key) |
905 | |
906 | try: |
907 | - print "Using handle for put:",txn.handle() |
908 | zoo_keeper.registUpdatedKey(app_id, txn.handle(), prev_version, row_key) |
909 | except e: |
910 | - print "Exception thrown by register update key",str(e) |
911 | self.maybe_rollback_transaction(app_id, putreq_pb, txn.handle()) |
912 | return (putresp_pb.Encode(), |
913 | datastore_pb.Error.INTERNAL_ERROR, |
914 | "Timeout: Unable to update ZooKeeper on change set for transaction") |
915 | - |
916 | + journalPut = putThread() |
917 | journal_key = getJournalKey(row_key, txn.handle()) |
918 | - |
919 | - field_name_list = JOURNAL_SCHEMA |
920 | - field_value_list = [e.Encode()] |
921 | journal_table = getJournalTable(app_id, appscale_version) |
922 | - print "Putting to ",journal_table,"key:",journal_key |
923 | - err, res = app_datastore.put_entity( journal_table, |
924 | - journal_key, |
925 | - field_name_list, |
926 | - field_value_list ) |
927 | - |
928 | - if err not in ERROR_CODES: |
929 | - #_trans_set.purge(txn) |
930 | - self.maybe_rollback_transaction(app_id, putreq_pb, txn.handle()) |
931 | - return (putresp_pb.Encode(), |
932 | - datastore_pb.Error.INTERNAL_ERROR, |
933 | - err + ", Unable to write to journal") |
934 | + journalPut.setup(app_datastore, |
935 | + journal_table, |
936 | + journal_key, |
937 | + JOURNAL_SCHEMA, |
938 | + [e.Encode()]) |
939 | + entPut = putThread() |
940 | + entPut.setup(app_datastore, |
941 | + table_name, |
942 | + row_key, |
943 | + ENTITY_TABLE_SCHEMA, |
944 | + [e.Encode(), str(txn.handle())]) |
945 | + journalPut.start() |
946 | + entPut.start() |
947 | + journalPut.join() |
948 | + entPut.join() |
949 | + if journalPut.err not in ERROR_CODES: |
950 | + self.maybe_rollback_transaction(app_id, putreq_pb, txn.handle()) |
951 | + return (putresp_pb.Encode(), |
952 | + datastore_pb.Error.INTERNAL_ERROR, |
953 | + journalPut.err + ", Unable to write to journal") |
954 | + if entPut.err not in ERROR_CODES: |
955 | + self.maybe_rollback_transaction(app_id, putreq_pb, txn.handle()) |
956 | + return (putresp_pb.Encode(), |
957 | + datastore_pb.Error.INTERNAL_ERROR, |
958 | + entPut.err + ", Unable to write to journal") |
959 | |
960 | - field_name_list = ENTITY_TABLE_SCHEMA |
961 | - field_value_list = [e.Encode(), str(txn.handle())] |
962 | - #print "put entity field name: %s, field value: %s" % (str(field_name_list), str(field_value_list)) |
963 | - err, res = app_datastore.put_entity( table_name, |
964 | - row_key, |
965 | - field_name_list, |
966 | - field_value_list) |
967 | - |
968 | - if err not in ERROR_CODES: |
969 | - #_trans_set.purge(txn) |
970 | - self.maybe_rollback_transaction(app_id, putreq_pb, txn.handle()) |
971 | - return (putresp_pb.Encode(), |
972 | - datastore_pb.Error.INTERNAL_ERROR, |
973 | - err) |
974 | - |
975 | - # RANDOM ERRORS |
976 | - """ |
977 | - rand = random.random() |
978 | - rand = int(rand * 10000) |
979 | - if rand % 100 == 0: |
980 | - self.maybe_rollback_transaction(app_id, putreq_pb, txn.handle()) |
981 | - print "RANDOM ERROR!!!" |
982 | - return(putresp_pb.Encode(), |
983 | - datastore_pb.Error.BAD_REQUEST, |
984 | - 'Random Error.') |
985 | - """ |
986 | + |
987 | if not putreq_pb.has_transaction(): |
988 | com_res, errcode, errdetail = self.commit_transaction_request(app_id, |
989 | appscale_version, |
990 | @@ -1068,28 +965,18 @@ |
991 | if getreq_pb.has_transaction(): |
992 | txn = getreq_pb.transaction() |
993 | |
994 | - if not _trans_set.isValid(txn): |
995 | - _trans_set.purge(txn) |
996 | - return (getresp_pb.Encode(), |
997 | - datastore_pb.Error.BAD_REQUEST, |
998 | - 'Transaction timed out.') |
999 | - |
1000 | root_key, errcode, errdetail = self.getRootKeyFromTransGet(app_id,getreq_pb) |
1001 | if errcode != 0: |
1002 | - _trans_set.purge(txn) |
1003 | return (getresp_pb.Encode(), |
1004 | errcode, |
1005 | errdetail) |
1006 | |
1007 | - if _trans_set.needsLock(txn): |
1008 | + try: |
1009 | gotLock= zoo_keeper.acquireLock( app_id, txn.handle(), root_key) |
1010 | - if gotLock: |
1011 | - _trans_set.setGroup(txn, root_key) |
1012 | - else: |
1013 | - _trans_set.purge(txn) |
1014 | - return (getresp_pb.Encode(), |
1015 | - datastore_pb.Error.CONCURRENT_TRANSACTION, |
1016 | - 'Another transaction is running.') |
1017 | + except zk.ZKTransactionException, zkex: |
1018 | + return (getresp_pb.Encode(), |
1019 | + datastore_pb.Error.CONCURRENT_TRANSACTION, |
1020 | + 'Another transaction is running. %s'%zkex.message) |
1021 | |
1022 | for key in getreq_pb.key_list(): |
1023 | key.set_app(app_id) |
1024 | @@ -1106,7 +993,6 @@ |
1025 | logger.debug("get: %s___%s___%s %s" % (app_id, kind, appscale_version, str(entity_id))) |
1026 | table_name = getTableName(app_id, kind, appscale_version) |
1027 | row_key = getRowKey(app_id,key.path().element_list()) |
1028 | - print "Get row key:",row_key |
1029 | r = app_datastore.get_entity( table_name, row_key, ENTITY_TABLE_SCHEMA ) |
1030 | err = r[0] |
1031 | if err not in ERROR_CODES or len(r) != 3: |
1032 | @@ -1128,23 +1014,18 @@ |
1033 | prev_version, |
1034 | row_key) |
1035 | if valid_txn != prev_version: |
1036 | - print "Using journal version",valid_txn," versus invalid:",prev_version |
1037 | prev_version = valid_txn |
1038 | if prev_version == long(NONEXISTANT_TRANSACTION): |
1039 | entity = None |
1040 | else: |
1041 | journal_table = getJournalTable(app_id, appscale_version) |
1042 | journal_key = getJournalKey(row_key, prev_version) |
1043 | - print "Retriving from ",journal_table,"key:",journal_key |
1044 | r = app_datastore.get_entity(journal_table, journal_key, ENTITY_TABLE_SCHEMA[:1] ) |
1045 | err = r[0] |
1046 | if err not in ERROR_CODES or len(r) != 2: |
1047 | - print r |
1048 | - print "UNABLE TO GET JOURNAL VERSION" |
1049 | r = ["",DELETED] |
1050 | |
1051 | if len(r) > 1: |
1052 | - print "GOT JOURNAL VERSION" |
1053 | entity = r[1] |
1054 | |
1055 | if entity[0:len(DELETED)] == DELETED: |
1056 | @@ -1156,7 +1037,6 @@ |
1057 | group.mutable_entity().CopyFrom(e_pb) |
1058 | |
1059 | # Send Response # |
1060 | - print getresp_pb |
1061 | logger.debug("GET_RESPONSE: %s" % getresp_pb) |
1062 | return (getresp_pb.Encode(), 0, "") |
1063 | |
1064 | @@ -1174,31 +1054,19 @@ |
1065 | logger.debug("DELETE_REQUEST: %s" % delreq_pb) |
1066 | delresp_pb = api_base_pb.VoidProto() |
1067 | if delreq_pb.has_transaction(): |
1068 | - print "This delete has a transaction" |
1069 | txn = delreq_pb.transaction() |
1070 | |
1071 | - if not _trans_set.isValid(txn): |
1072 | - #_trans_set.purge(txn) |
1073 | - return (delresp_pb.Encode(), |
1074 | - datastore_pb.Error.BAD_REQUEST, |
1075 | - 'Transaction timed out.') |
1076 | - |
1077 | root_key, errcode, errdetail = self.getRootKeyFromTransDel(app_id, delreq_pb) |
1078 | if errcode != 0: |
1079 | - #_trans_set.purge(txn) |
1080 | return (delresp_pb.Encode(), |
1081 | errcode, |
1082 | errdetail) |
1083 | - if _trans_set.needsLock(txn): |
1084 | - # zk.acquireLock(app_id, txn.handle()) |
1085 | + try: |
1086 | gotLock = zoo_keeper.acquireLock( app_id, txn.handle(), root_key) |
1087 | - if gotLock: |
1088 | - _trans_set.setGroup(txn, root_key) |
1089 | - else: |
1090 | - #_trans_set.purge(txn) |
1091 | - return (delresp_pb.Encode(), |
1092 | - datastore_pb.Error.CONCURRENT_TRANSACTION, |
1093 | - 'Another transaction is running.') |
1094 | + except zk.ZKTransactionException, zkex: |
1095 | + return (delresp_pb.Encode(), |
1096 | + datastore_pb.Error.CONCURRENT_TRANSACTION, |
1097 | + 'Another transaction is running. %s'%zkex.message) |
1098 | |
1099 | |
1100 | for key in delreq_pb.key_list(): |
1101 | @@ -1212,7 +1080,6 @@ |
1102 | # All deletes are transactional and per entity if |
1103 | # not already wrapped in a transaction |
1104 | if not delreq_pb.has_transaction(): |
1105 | - print "This is not a composite transaction" |
1106 | txn, err, errcode = self.begin_transaction_request(app_id, |
1107 | appscale_version, |
1108 | http_request_data) |
1109 | @@ -1220,7 +1087,6 @@ |
1110 | txn = datastore_pb.Transaction(txn) |
1111 | |
1112 | if err != 0: |
1113 | - _trans_set.purge(txn) |
1114 | return (delresp_pb.Encode(), err, errcode) |
1115 | root_key = getRootKey(app_id, key.path().element_list()) |
1116 | if root_key == None: |
1117 | @@ -1233,15 +1099,13 @@ |
1118 | # Get a lock for the group |
1119 | # Do a read before a write to get the old values |
1120 | ################################################ |
1121 | - gotLock = zoo_keeper.acquireLock( app_id, txn.handle(), root_key) |
1122 | - if gotLock: |
1123 | - _trans_set.setGroup(txn, root_key) |
1124 | - else: |
1125 | - #_trans_set.purge(txn) |
1126 | + try: |
1127 | + gotLock = zoo_keeper.acquireLock( app_id, txn.handle(), root_key) |
1128 | + except zk.ZKTransactionException, zkex: |
1129 | self.maybe_rollback_transaction(app_id, delreq_pb, txn.handle()) |
1130 | return (delresp_pb.Encode(), |
1131 | datastore_pb.Error.CONCURRENT_TRANSACTION, |
1132 | - 'Another transaction is running.') |
1133 | + 'Another transaction is running. %s'%zkex.message) |
1134 | |
1135 | ########################## |
1136 | # Get the previous version |
1137 | @@ -1292,7 +1156,6 @@ |
1138 | field_value_list ) |
1139 | |
1140 | if err not in ERROR_CODES: |
1141 | - #_trans_set.purge(txn) |
1142 | self.maybe_rollback_transaction(app_id, delreq_pb, txn.handle()) |
1143 | return (delresp_pb.Encode(), |
1144 | datastore_pb.Error.INTERNAL_ERROR, |
1145 | @@ -1416,31 +1279,18 @@ |
1146 | print "http done" |
1147 | self.void_proto(app_id, appscale_version, http_request_data) |
1148 | |
1149 | - ######################## |
1150 | - # GET Request Handling # |
1151 | - ######################## |
1152 | - def do_GET( self ): |
1153 | - self.send_error( 404 , 'File Not Found: %s' % self.path ) |
1154 | |
1155 | ######################### |
1156 | # POST Request Handling # |
1157 | ######################### |
1158 | - def do_POST( self ): |
1159 | - start_time = time.time() |
1160 | - http_request_data = self.rfile.read(int(self.headers.getheader('content-length'))) |
1161 | - inter_time = time.time() |
1162 | - logger.debug("Timing for pre pre env setup:" + " " + |
1163 | - str(start_time) + " " + str(inter_time) + |
1164 | - " total time: " + str(inter_time - start_time) + "\n") |
1165 | - print "intertime - starttime:",(str(inter_time - start_time)) |
1166 | - pb_type = self.headers.getheader( 'protocolbuffertype' ) |
1167 | - app_data = self.headers.getheader('appdata') |
1168 | + @tornado.web.asynchronous |
1169 | + def post( self ): |
1170 | + request = self.request |
1171 | + http_request_data = request.body |
1172 | + pb_type = request.headers['protocolbuffertype'] |
1173 | + app_data = request.headers['appdata'] |
1174 | app_data = app_data.split(':') |
1175 | logger.debug("POST len: %d" % len(app_data)) |
1176 | - inter_time = time.time() |
1177 | - logger.debug("Timing for pre env setup:" + " " + |
1178 | - str(start_time) + " " + str(inter_time) + |
1179 | - " total time: " + str(inter_time - start_time) + "\n") |
1180 | |
1181 | if len(app_data) == 5: |
1182 | app_id, user_email, nick_name, auth_domain, appscale_version = app_data |
1183 | @@ -1470,42 +1320,12 @@ |
1184 | # Default HTTP Response Data # |
1185 | logger.debug("For app id: " + app_id) |
1186 | logger.debug("For app version: " + appscale_version) |
1187 | - inter_time = time.time() |
1188 | - logger.debug("Timing for env setup:" + pb_type + " " + |
1189 | - app_id + " " + str(start_time) + " " + |
1190 | - str(inter_time) + " total time: " + str(inter_time - start_time) + "\n") |
1191 | |
1192 | if pb_type == "Request": |
1193 | self.remote_request(app_id, appscale_version, http_request_data) |
1194 | else: |
1195 | self.unknown_request(app_id, appscale_version, http_request_data, pb_type) |
1196 | - |
1197 | - stop_time = time.time() |
1198 | - |
1199 | - #if logOn == True: |
1200 | - #logFilePtr.write(pb_type + " " + app_id + " " + |
1201 | - #str(start_time) + " " + str(stop_time) + " total time: " + |
1202 | - #str(stop_time - start_time) + "\n") |
1203 | - |
1204 | - logger.debug(pb_type + " " + app_id + " " + str(start_time) + " " + |
1205 | - str(stop_time) + " total time: " + str(stop_time - start_time) + "\n") |
1206 | - |
1207 | -class AppScaleUnSecureServerThreaded( ThreadingMixIn, HTTPServer): |
1208 | - pass |
1209 | - |
1210 | -class AppScaleSecureServerThreaded( ThreadingMixIn, HTTPServer ): |
1211 | - def __init__( self): |
1212 | - global local_server_address |
1213 | - global HandlerClass |
1214 | - global ssl_cert_file |
1215 | - global ssl_key_file |
1216 | - BaseServer.__init__( self, local_server_address, HandlerClass ) |
1217 | - ctx = SSL.Context() |
1218 | - ctx.load_cert( ssl_cert_file, ssl_key_file ) |
1219 | - self.socket = SSL.Connection( ctx ) |
1220 | - self.server_bind() |
1221 | - self.server_activate() |
1222 | - |
1223 | + self.finish() |
1224 | def usage(): |
1225 | print "AppScale Server" |
1226 | |
1227 | @@ -1518,6 +1338,11 @@ |
1228 | print "\t--blocksize=<key-block-size>" |
1229 | print "\t--optimized_query" |
1230 | print "\t--no_encryption" |
1231 | + |
1232 | +pb_application = tornado.web.Application([ |
1233 | + (r"/*", MainHandler), |
1234 | +]) |
1235 | + |
1236 | def main(argv): |
1237 | global app_datastore |
1238 | global getKeyFromServer |
1239 | @@ -1599,43 +1424,23 @@ |
1240 | exit(1) |
1241 | |
1242 | tableServer = SOAPpy.SOAPProxy("https://" + soapServer + ":" + str(keyPort)) |
1243 | - |
1244 | - # # Bind Port # |
1245 | - #server = AppScaleSecureServer( ('',DEFAULT_SSL_PORT), |
1246 | - # AppScaleSecureHandler, cert_file, key_file ) |
1247 | - #help(ThreadedHTTPServer) |
1248 | - global local_server_address |
1249 | - global HandlerClass |
1250 | - global ssl_cert_file |
1251 | - global ssl_key_file |
1252 | global keyDictionaryLock |
1253 | |
1254 | zoo_keeper = zktransaction.ZKTransaction(zoo_keeper_locations) |
1255 | keyDictionaryLock = threading.Lock() |
1256 | if port == DEFAULT_SSL_PORT and not isEncrypted: |
1257 | port = DEFAULT_PORT |
1258 | - local_server_address = ('',port) |
1259 | - HandlerClass = AppScaleSecureHandler |
1260 | - ssl_cert_file = cert_file |
1261 | - ssl_key_file = key_file |
1262 | - if isEncrypted: |
1263 | - server = AppScaleSecureServerThreaded() |
1264 | - else: |
1265 | - server = AppScaleUnSecureServerThreaded(local_server_address, HandlerClass) |
1266 | - sa = server.socket.getsockname() |
1267 | - if not db_type == "timesten": |
1268 | - # Stop running as root, security purposes # |
1269 | - drop_privileges() |
1270 | - logger.debug("\n\nStarting AppScale-Secure-Server on %s:%s" % (sa[0], sa[1])) |
1271 | + server = tornado.httpserver.HTTPServer(pb_application) |
1272 | + server.listen(port) |
1273 | |
1274 | while 1: |
1275 | try: |
1276 | # Start Server # |
1277 | - server.serve_forever() |
1278 | + tornado.ioloop.IOLoop.instance().start() |
1279 | except SSL.SSLError: |
1280 | - logger.debug("\n\nUnexcepted input for AppScale-Secure-Server on %s:%s" % (sa[0], sa[1])) |
1281 | + logger.debug("\n\nUnexcepted input for AppScale-Secure-Server") |
1282 | except KeyboardInterrupt: |
1283 | - server.socket.close() |
1284 | + #server.socket.close() |
1285 | print "Server interrupted by user, terminating..." |
1286 | exit(1) |
1287 | |
1288 | |
1289 | === added file 'AppDB/appscale_server_native_trans.py' |
1290 | --- AppDB/appscale_server_native_trans.py 1970-01-01 00:00:00 +0000 |
1291 | +++ AppDB/appscale_server_native_trans.py 2010-06-29 01:49:23 +0000 |
1292 | @@ -0,0 +1,1086 @@ |
1293 | +#!/usr/bin/python |
1294 | +# |
1295 | +# Author: |
1296 | +# Navraj Chohan (nchohan@cs.ucsb.edu) |
1297 | +# Soo Hwan Park (suwanny@gmail.com) |
1298 | +# Sydney Pang (pang@cs.ucsb.edu) |
1299 | +# See LICENSE file |
1300 | +import tornado.httpserver |
1301 | +import tornado.ioloop |
1302 | +import tornado.web |
1303 | + |
1304 | +import sys |
1305 | +import socket |
1306 | +import os |
1307 | +import types |
1308 | +import appscale_datastore |
1309 | +#import helper_functions |
1310 | +import SOAPpy |
1311 | +from dbconstants import * |
1312 | +import appscale_logger |
1313 | +import md5 |
1314 | +import random |
1315 | +import getopt |
1316 | +import threading |
1317 | + |
1318 | +from google.appengine.api import api_base_pb |
1319 | +from google.appengine.api import datastore |
1320 | +from google.appengine.api import datastore_errors |
1321 | +from google.appengine.api import datastore_types |
1322 | +from google.appengine.api import users |
1323 | +from google.appengine.datastore import datastore_pb |
1324 | +from google.appengine.datastore import datastore_index |
1325 | +from google.appengine.runtime import apiproxy_errors |
1326 | +from google.net.proto import ProtocolBuffer |
1327 | +from google.appengine.datastore import entity_pb |
1328 | +from google.appengine.ext.remote_api import remote_api_pb |
1329 | +from SocketServer import BaseServer |
1330 | +from M2Crypto import SSL |
1331 | +from drop_privileges import * |
1332 | +from zkappscale import zktransaction |
1333 | + |
1334 | +import time |
1335 | + |
1336 | +DEBUG = False |
1337 | +APP_TABLE = APPS_TABLE |
1338 | +USER_TABLE = USERS_TABLE |
1339 | +DEFAULT_USER_LOCATION = ".flatfile_users" |
1340 | +DEFAULT_APP_LOCATION = ".flatfile_apps" |
1341 | +HYPERTABLE_XML_TAG = "Name" |
1342 | +DEFAULT_DATASTORE = "files" |
1343 | +DEFAULT_SSL_PORT = 8443 |
1344 | +DEFAULT_PORT = 4080 |
1345 | +DEFAULT_ENCRYPTION = 1 |
1346 | +CERT_LOCATION = "/etc/appscale/certs/mycert.pem" |
1347 | +KEY_LOCATION = "/etc/appscale/certs/mykey.pem" |
1348 | +SECRET_LOCATION = "/etc/appscale/secret.key" |
1349 | +VALID_DATASTORES = [] |
1350 | +ERROR_CODES = [] |
1351 | +app_datastore = [] |
1352 | +logOn = False |
1353 | +logFilePtr = "" |
1354 | +zoo_keeper = "" |
1355 | + |
1356 | +getKeyFromServer = False |
1357 | +soapServer = "localhost" |
1358 | +tableServer = "" |
1359 | +keyPort = 4343 |
1360 | +keySecret = "" |
1361 | +KEYBLOCKSIZE = "50" |
1362 | +keyDictionaryLock = None |
1363 | +keyDictionary = {} |
1364 | + |
1365 | +optimizedQuery = False |
1366 | +ID_KEY_LENGTH = 64 |
1367 | +tableHashTable = {} |
1368 | + |
1369 | +local_server_address = "" |
1370 | +HandlerClass = "" |
1371 | +ssl_cert_file = "" |
1372 | +ssl_key_file = "" |
1373 | + |
1374 | +DELETED = "DELETED___" |
1375 | +""" |
1376 | +Deleted keys are DELETED/<row_key> |
1377 | +""" |
1378 | + |
1379 | +""" |
1380 | +keys for tables take the format |
1381 | +appname/Grandparent:<ID>/Parent:<ID>/Child:<ID> |
1382 | +for the entity table |
1383 | +""" |
1384 | + |
1385 | + |
1386 | +class ThreadLogger: |
1387 | + def __init__(self, log): |
1388 | + self.logger_ = log |
1389 | + self.log_lock = threading.Lock() |
1390 | + |
1391 | + def debug(self, string): |
1392 | + return |
1393 | + self.log_lock.acquire() |
1394 | + print string |
1395 | + self.logger_.info(string) |
1396 | + self.log_lock.release() |
1397 | + |
1398 | +logger = appscale_logger.getLogger("pb_server") |
1399 | + |
1400 | + |
1401 | +def getTableName(app_id, kind, version): |
1402 | + return app_id + "___" + kind + "___" + version |
1403 | + |
1404 | +def getRowKey(app_id, ancestor_list): |
1405 | + if ancestor_list == None: |
1406 | + logger.debug("Generate row key received null ancestor list") |
1407 | + return "" |
1408 | + |
1409 | + key = app_id |
1410 | + |
1411 | + # Note: mysql cannot have \ as the first char in the row key |
1412 | + for a in ancestor_list: |
1413 | + key += "/" |
1414 | + if a.has_type(): |
1415 | + key += a.type() |
1416 | + |
1417 | + if a.has_id(): |
1418 | + zero_padded_id = ("0" * (ID_KEY_LENGTH - len(str(a.id())))) + str(a.id()) |
1419 | + key += ":" + zero_padded_id |
1420 | + elif a.has_name(): |
1421 | + # append _ if the name is a number, prevents collisions of key names |
1422 | + if a.name().isdigit(): |
1423 | + key += ":__key__" + a.name() |
1424 | + else: |
1425 | + key += ":" + a.name() |
1426 | + return key |
1427 | + |
1428 | + |
1429 | +def getRootKey(app_id, ancestor_list): |
1430 | + key = app_id # mysql cannot have \ as the first char in the row key |
1431 | + a = ancestor_list[0] |
1432 | + key += "/" |
1433 | + |
1434 | + # append _ if the name is a number, prevents collisions of key names |
1435 | + if a.has_type(): |
1436 | + key += a.type() |
1437 | + else: |
1438 | + return None |
1439 | + |
1440 | + if a.has_id(): |
1441 | + zero_padded_id = ("0" * (ID_KEY_LENGTH - len(str(a.id())))) + str(a.id()) |
1442 | + key += ":" + zero_padded_id |
1443 | + elif a.has_name(): |
1444 | + if a.name().isdigit(): |
1445 | + key += ":__key__" + a.name() |
1446 | + else: |
1447 | + key += ":" + a.name() |
1448 | + else: |
1449 | + return None |
1450 | + |
1451 | + return key |
1452 | + |
1453 | + |
1454 | +def getRootKeyFromKeyType(app_id, key): |
1455 | + ancestor_list = key._Key__reference.path().element_list() |
1456 | + return getRootKey(app_id, ancestor_list) |
1457 | + |
1458 | + |
1459 | +def getRowKeyFromKeyType(app_id, key): |
1460 | + ancestor_list = key._Key__reference.path().element_list() |
1461 | + return getRowKey(app_id, ancestor_list) |
1462 | + |
1463 | +def generate_unique_id(app_id, root, isChild): |
1464 | + global keyDictionary |
1465 | + global keyDictionaryLock |
1466 | + |
1467 | + if isChild: |
1468 | + if not root: |
1469 | + return -1 |
1470 | + |
1471 | + index = None |
1472 | + if isChild: |
1473 | + index = app_id + "/" + str(root) |
1474 | + else: |
1475 | + index = app_id |
1476 | + |
1477 | + keyDictionaryLock.acquire() |
1478 | + try: |
1479 | + keyStart, keyEnd = keyDictionary[index] |
1480 | + except: |
1481 | + keyStart = 0 |
1482 | + keyEnd = 0 |
1483 | + |
1484 | + key = 0 |
1485 | + if keyStart != keyEnd: |
1486 | + key = keyStart |
1487 | + keyStart = keyStart + 1 |
1488 | + keyDictionary[index]= keyStart, keyEnd |
1489 | + keyDictionaryLock.release() |
1490 | + return key |
1491 | + else: |
1492 | + try: |
1493 | + if not isChild: |
1494 | + keyStart, blockSize = zoo_keeper.generateIDBlock(app_id) |
1495 | + keyStart = long(keyStart) |
1496 | + else: |
1497 | + keyStart, blockSize = zoo_keeper.generateIDBlock(app_id, root) |
1498 | + keyStart = long(keyStart) |
1499 | + except: |
1500 | + print "="*60 |
1501 | + print "Exception: when getting id block" |
1502 | + print "="*60 |
1503 | + keyDictionaryLock.release() |
1504 | + return -1 |
1505 | + keyEnd = keyStart + long(blockSize) |
1506 | + key = keyStart |
1507 | + keyStart = keyStart + 1 |
1508 | + keyDictionary[index] = keyStart, keyEnd |
1509 | + keyDictionaryLock.release() |
1510 | + return key |
1511 | + |
1512 | + |
1513 | +def getRootKeyFromRef(app_id, ref): |
1514 | + if not ref.has_path(): |
1515 | + return False |
1516 | + path = ref.path() |
1517 | + element_list = path.element_list() |
1518 | + return getRootKey(app_id, element_list) |
1519 | + |
1520 | + |
1521 | +def rollback_function(app_id, trans_id, root_key, change_set): |
1522 | + pass |
1523 | + |
1524 | + |
1525 | + |
1526 | + |
1527 | +class MainHandler(tornado.web.RequestHandler): |
1528 | + """ |
1529 | + Defines what to do when the webserver receives different types of |
1530 | + HTTP requests. |
1531 | + """ |
1532 | + @tornado.web.asynchronous |
1533 | + def get(self): |
1534 | + self.write("Hi") |
1535 | + self.finish() |
1536 | + # remote api request |
1537 | + # sends back a response |
1538 | + def remote_request(self, app_id, appscale_version, http_request_data): |
1539 | + apirequest = remote_api_pb.Request(http_request_data) |
1540 | + apiresponse = remote_api_pb.Response() |
1541 | + response = None |
1542 | + errcode = 0 |
1543 | + errdetail = "" |
1544 | + apperror_pb = None |
1545 | + |
1546 | + if not apirequest.has_method(): |
1547 | + errcode = datastore_pb.Error.BAD_REQUEST |
1548 | + errdetail = "Method was not set in request" |
1549 | + apirequest.set_method("NOT_FOUND") |
1550 | + if not apirequest.has_request(): |
1551 | + errcode = datastore_pb.Error.BAD_REQUEST |
1552 | + errdetail = "Request missing in call" |
1553 | + apirequest.set_method("NOT_FOUND") |
1554 | + apirequest.clear_request() |
1555 | + method = apirequest.method() |
1556 | + request_data = apirequest.request() |
1557 | + http_request_data = request_data.contents() |
1558 | + |
1559 | + #print "REQUEST:",method," AT time",time.time() |
1560 | + if method == "Put": |
1561 | + response, errcode, errdetail = self.put_request(app_id, |
1562 | + appscale_version, |
1563 | + http_request_data) |
1564 | + elif method == "Get": |
1565 | + response, errcode, errdetail = self.get_request(app_id, |
1566 | + appscale_version, |
1567 | + http_request_data) |
1568 | + elif method == "Delete": |
1569 | + response, errcode, errdetail = self.delete_request(app_id, |
1570 | + appscale_version, |
1571 | + http_request_data) |
1572 | + elif method == "RunQuery": |
1573 | + response, errcode, errdetail = self.run_query(app_id, |
1574 | + appscale_version, |
1575 | + http_request_data) |
1576 | + elif method == "BeginTransaction": |
1577 | + response, errcode, errdetail = self.begin_transaction_request(app_id, |
1578 | + appscale_version, |
1579 | + http_request_data) |
1580 | + elif method == "Commit": |
1581 | + response, errcode, errdetail = self.commit_transaction_request(app_id, |
1582 | + appscale_version, |
1583 | + http_request_data) |
1584 | + elif method == "Rollback": |
1585 | + response, errcode, errdetail = self.rollback_transaction_request(app_id, |
1586 | + appscale_version, |
1587 | + http_request_data) |
1588 | + elif method == "AllocateIds": |
1589 | + response, errcode, errdetail = self.allocate_ids_request(app_id, |
1590 | + appscale_version, |
1591 | + http_request_data) |
1592 | + elif method == "CreateIndex": |
1593 | + errcode = datastore_pb.Error.PERMISSION_DENIED |
1594 | + errdetail = "Create Index is not implemented" |
1595 | + logger.debug(errdetail) |
1596 | + """ |
1597 | + response, errcode, errdetail = self.create_index_request(app_id, |
1598 | + appscale_version, |
1599 | + http_request_data) |
1600 | + """ |
1601 | + elif method == "GetIndices": |
1602 | + errcode = datastore_pb.Error.PERMISSION_DENIED |
1603 | + errdetail = "GetIndices is not implemented" |
1604 | + logger.debug(errdetail) |
1605 | + """ |
1606 | + response, errcode, errdetail = self.get_indices_request(app_id, |
1607 | + appscale_version, |
1608 | + http_request_data) |
1609 | + """ |
1610 | + elif method == "UpdateIndex": |
1611 | + errcode = datastore_pb.Error.PERMISSION_DENIED |
1612 | + errdetail = "UpdateIndex is not implemented" |
1613 | + logger.debug(errdetail) |
1614 | + """ |
1615 | + response, errcode, errdetail = self.update_index_request(app_id, |
1616 | + appscale_version, |
1617 | + http_request_data) |
1618 | + """ |
1619 | + elif method == "DeleteIndex": |
1620 | + errcode = datastore_pb.Error.PERMISSION_DENIED |
1621 | + errdetail = "DeleteIndex is not implemented" |
1622 | + logger.debug(errdetail) |
1623 | + |
1624 | + """ |
1625 | + response, errcode, errdetail = self.delete_index_request(app_id, |
1626 | + appscale_version, |
1627 | + http_request_data) |
1628 | + """ |
1629 | + else: |
1630 | + errcode = datastore_pb.Error.BAD_REQUEST |
1631 | + errdetail = "Unknown datastore message" |
1632 | + logger.debug(errdetail) |
1633 | + |
1634 | + |
1635 | + rawmessage = apiresponse.mutable_response() |
1636 | + if response: |
1637 | + rawmessage.set_contents(response) |
1638 | + |
1639 | + if errcode != 0: |
1640 | + apperror_pb = apiresponse.mutable_application_error() |
1641 | + apperror_pb.set_code(errcode) |
1642 | + apperror_pb.set_detail(errdetail) |
1643 | + if errcode != 0: |
1644 | + print "REPLY",method," AT TIME",time.time() |
1645 | + print "errcode:",errcode |
1646 | + print "errdetail:",errdetail |
1647 | + self.write(apiresponse.Encode() ) |
1648 | + |
1649 | + |
1650 | + def run_query(self, app_id, appscale_version, http_request_data): |
1651 | + global app_datastore |
1652 | + query = datastore_pb.Query(http_request_data) |
1653 | + logger.debug("QUERY:%s" % query) |
1654 | + results = [] |
1655 | + |
1656 | + if not query.has_kind(): |
1657 | + # Return nothing in case of error # |
1658 | + return (api_base_pb.VoidProto().Encode(), |
1659 | + datastore_pb.Error.PERMISSION_DENIED, |
1660 | + "Kindless queries are not implemented.") |
1661 | + else: |
1662 | + kind = query.kind() |
1663 | + #print "Query kind:",kind |
1664 | + |
1665 | + # Verify validity of the entity name and applicaiton id # |
1666 | + # according to the naming sheme for entity tables # |
1667 | + #assert kind[-2:] != "__" |
1668 | + #assert app_id[-1] != "_" |
1669 | + |
1670 | + # Fetch query from the datastore # |
1671 | + table_name = getTableName(app_id, kind, appscale_version) |
1672 | + #print "Query using table name:",table_name |
1673 | + if query.has_transaction(): |
1674 | + txn = query.transaction() |
1675 | + r = app_datastore.get_table( table_name, ENTITY_TABLE_SCHEMA, txn.handle()) |
1676 | + else: |
1677 | + r = app_datastore.get_table( table_name, ENTITY_TABLE_SCHEMA) |
1678 | + #logger.debug("result: %s" % r) |
1679 | + |
1680 | + #err = r[0] |
1681 | + if len(r) > 1: |
1682 | + results = r[1:] |
1683 | + else: |
1684 | + results = [] |
1685 | + |
1686 | + # odds are versions |
1687 | + versions = results[1::2] |
1688 | + # evens are encoded entities |
1689 | + results = results[0::2] |
1690 | + #print "RESULTS:",results |
1691 | + #print "VERSIONS:",versions |
1692 | + if len(versions) != len(results): |
1693 | + return(api_base_pb.VoidProto().Encode(), |
1694 | + datastore_pb.Error.INTERNAL_ERROR, |
1695 | + 'The query had a bad number of results.') |
1696 | + |
1697 | + # convert to objects |
1698 | + # Unless its marked as deleted |
1699 | + # They are currently strings |
1700 | + for index, res in enumerate(results): |
1701 | + results[index] = entity_pb.EntityProto(res) |
1702 | + results[index] = datastore.Entity._FromPb(results[index]) |
1703 | + |
1704 | + logger.debug("====results pre filter====") |
1705 | + #logger.debug("%s" % results) |
1706 | + if query.has_ancestor(): |
1707 | + ancestor_path = query.ancestor().path().element_list() |
1708 | + def is_descendant(entity): |
1709 | + path = entity.key()._Key__reference.path().element_list() |
1710 | + return path[:len(ancestor_path)] == ancestor_path |
1711 | + results = filter(is_descendant, results) |
1712 | + |
1713 | + operators = {datastore_pb.Query_Filter.LESS_THAN: '<', |
1714 | + datastore_pb.Query_Filter.LESS_THAN_OR_EQUAL: '<=', |
1715 | + datastore_pb.Query_Filter.GREATER_THAN: '>', |
1716 | + datastore_pb.Query_Filter.GREATER_THAN_OR_EQUAL: '>=', |
1717 | + datastore_pb.Query_Filter.EQUAL: '==', |
1718 | + } |
1719 | + |
1720 | + for filt in query.filter_list(): |
1721 | + assert filt.op() != datastore_pb.Query_Filter.IN |
1722 | + |
1723 | + prop = filt.property(0).name().decode('utf-8') |
1724 | + op = operators[filt.op()] |
1725 | + |
1726 | + def passes(entity): |
1727 | + """ Returns True if the entity passes the filter, False otherwise. """ |
1728 | + entity_vals = entity.get(prop, []) |
1729 | + if type(entity_vals) != types.ListType: |
1730 | + entity_vals = [entity_vals] |
1731 | + |
1732 | + entity_property_list = [datastore_types.ToPropertyPb(prop, value) for value in entity_vals] |
1733 | + |
1734 | + for entity_prop in entity_property_list: |
1735 | + fixed_entity_val = datastore_types.FromPropertyPb(entity_prop) |
1736 | + |
1737 | + for filter_prop in filt.property_list(): |
1738 | + filter_val = datastore_types.FromPropertyPb(filter_prop) |
1739 | + comp = u'%r %s %r' % (fixed_entity_val, op, filter_val) |
1740 | + logger.debug('Evaling filter expression "%s"' % comp ) |
1741 | + if eval(comp): |
1742 | + return True |
1743 | + return False |
1744 | + |
1745 | + results = filter(passes, results) |
1746 | + |
1747 | + for order in query.order_list(): |
1748 | + prop = order.property().decode('utf-8') |
1749 | + results = [entity for entity in results if prop in entity] |
1750 | + |
1751 | + def order_compare(a, b): |
1752 | + """ Return a negative, zero or positive number depending on whether |
1753 | + entity a is considered smaller than, equal to, or larger than b, |
1754 | + according to the query's orderings. """ |
1755 | + for o in query.order_list(): |
1756 | + prop = o.property().decode('utf-8') |
1757 | + |
1758 | + a_values = a[prop] |
1759 | + if not isinstance(a_values, types.ListType): |
1760 | + a_values = [a_values] |
1761 | + |
1762 | + b_values = b[prop] |
1763 | + if not isinstance(b_values, types.ListType): |
1764 | + b_values = [b_values] |
1765 | + |
1766 | + cmped = cmp(min(a_values), min(b_values)) |
1767 | + |
1768 | + if o.direction() == datastore_pb.Query_Order.DESCENDING: |
1769 | + cmped = -cmped |
1770 | + |
1771 | + if cmped != 0: |
1772 | + return cmped |
1773 | + |
1774 | + return 0 |
1775 | + |
1776 | + results.sort(order_compare) |
1777 | + |
1778 | + if query.has_limit(): |
1779 | + results = results[:query.limit()] |
1780 | + logger.debug("****results after filtering:****") |
1781 | + logger.debug("%s" % results) |
1782 | + |
1783 | + results = [ent._ToPb() for ent in results] |
1784 | + # Pack Results into a clone of QueryResult # |
1785 | + clone_qr_pb = datastore_pb.QueryResult() |
1786 | + for res in results: |
1787 | + clone_qr_pb.add_result() |
1788 | + clone_qr_pb.result_[-1] = res |
1789 | + |
1790 | + clone_qr_pb.clear_cursor() |
1791 | + clone_qr_pb.set_more_results( len(results)>0 ) |
1792 | + |
1793 | + logger.debug("QUERY_RESULT: %s" % clone_qr_pb) |
1794 | + return (clone_qr_pb.Encode(), 0, "") |
1795 | + |
1796 | + |
1797 | + def begin_transaction_request(self, app_id, appscale_version, http_request_data): |
1798 | + transaction_pb = datastore_pb.Transaction() |
1799 | + handle = generate_unique_id(app_id, None, None) |
1800 | + #print "Begin Trans Handle:",handle |
1801 | + transaction_pb.set_handle(handle) |
1802 | + app_datastore.setupTransaction(handle) |
1803 | + return (transaction_pb.Encode(), 0, "") |
1804 | + |
1805 | + def commit_transaction_request(self, app_id, appscale_version, http_request_data): |
1806 | + transaction_pb = datastore_pb.Transaction(http_request_data) |
1807 | + handle = transaction_pb.handle() |
1808 | + commitres_pb = datastore_pb.CommitResponse() |
1809 | + try: |
1810 | + app_datastore.commit(handle) |
1811 | + except: |
1812 | + return (commitres_pb.Encode(), datastore_pb.Error.PERMISSION_DENIED, "Unable to commit for this transaction") |
1813 | + return (commitres_pb.Encode(), 0, "") |
1814 | + |
1815 | + def rollback_transaction_request(self, app_id, appscale_version, http_request_data): |
1816 | + transaction_pb = datastore_pb.Transaction(http_request_data) |
1817 | + handle = transaction_pb.handle() |
1818 | + try: |
1819 | + app_datastore.rollback(handle) |
1820 | + except: |
1821 | + return(api_base_pb.VoidProto().Encode(), datastore_pb.Error.PERMISSION_DENIED, "Unable to rollback for this transaction") |
1822 | + return (api_base_pb.VoidProto().Encode(), 0, "") |
1823 | + |
1824 | + |
1825 | + def allocate_ids_request(self, app_id, appscale_version, http_request_data): |
1826 | + return (api_base_pb.VoidProto().Encode(), |
1827 | + datastore_pb.Error.PERMISSION_DENIED, |
1828 | + 'Allocation of block ids not implemented.') |
1829 | + |
1830 | + |
1831 | + # Returns Null on error |
1832 | + def getRootKeyFromEntity(self, app_id, entity): |
1833 | + key = entity.key() |
1834 | + if str(key.__class__) == "google.appengine.datastore.entity_pb.Reference": |
1835 | + return getRootKeyFromRef(app_id, key) |
1836 | + else: |
1837 | + return getRootKeyFromKeyType(app_id, key) |
1838 | + |
1839 | + |
1840 | + # For transactions |
1841 | + # Verifies all puts are apart of the same root entity |
1842 | + def getRootKeyFromTransPut(self, app_id, putreq_pb): |
1843 | + ent_list = [] |
1844 | + if putreq_pb.entity_size() > 0: |
1845 | + ent_list = putreq_pb.entity_list() |
1846 | + first_ent = ent_list[0] |
1847 | + expected_root = self.getRootKeyFromEntity(app_id, first_ent) |
1848 | + # It is possible that all roots are None |
1849 | + # because it is a root that has not gotten a uid |
1850 | + |
1851 | + for e in ent_list: |
1852 | + root = self.getRootKeyFromEntity(app_id, e) |
1853 | + if root != expected_root: |
1854 | + errcode = datastore_pb.Error.BAD_REQUEST |
1855 | + errdetail = "All puts must be a part of the same group" |
1856 | + return (None, errcode, errdetail) |
1857 | + |
1858 | + return (expected_root, 0, "") |
1859 | + |
1860 | + |
1861 | + # For transactions |
1862 | + # Verifies all puts are apart of the same root entity |
1863 | + def getRootKeyFromTransReq(self, app_id, req_pb): |
1864 | + if req_pb.key_size() <= 0: |
1865 | + errcode = datastore_pb.error.bad_request |
1866 | + errdetail = "Bad key listing" |
1867 | + return (None, errcode, errdetail) |
1868 | + |
1869 | + key_list = req_pb.key_list() |
1870 | + first_key = key_list[0] |
1871 | + |
1872 | + expected_root = getRootKeyFromRef(app_id, first_key) |
1873 | + # It is possible that all roots are None |
1874 | + # because it is a root that has not gotten a uid |
1875 | + |
1876 | + for k in key_list: |
1877 | + root = getRootKeyFromRef(app_id, k) |
1878 | + if root != expected_root: |
1879 | + errcode = datastore_pb.error.bad_request |
1880 | + errdetail = "all transaction gets must be a part of the same group" |
1881 | + return (None, errcode, errdetail) |
1882 | + |
1883 | + return (expected_root, 0, "") |
1884 | + |
1885 | + def getRootKeyFromTransGet(self, app_id, get_pb): |
1886 | + return self.getRootKeyFromTransReq(app_id, get_pb) |
1887 | + |
1888 | + def getRootKeyFromTransDel(self, app_id, del_pb): |
1889 | + return self.getRootKeyFromTransReq(app_id, del_pb) |
1890 | + |
1891 | + def put_request(self, app_id, appscale_version, http_request_data): |
1892 | + global app_datastore |
1893 | + global keySecret |
1894 | + global tableHashTable |
1895 | + |
1896 | + field_name_list = [] |
1897 | + field_value_list = [] |
1898 | + |
1899 | + start_time = time.time() |
1900 | + putreq_pb = datastore_pb.PutRequest(http_request_data) |
1901 | + logger.debug("RECEIVED PUT_REQUEST %s" % putreq_pb) |
1902 | + putresp_pb = datastore_pb.PutResponse( ) |
1903 | + txn = None |
1904 | + root_key = None |
1905 | + # Must assign an id if a put is being done in a transaction |
1906 | + # and it does not have an id and it is a root |
1907 | + for e in putreq_pb.entity_list(): |
1908 | + # Only dealing with root puts |
1909 | + if e.key().path().element_size() == 1: |
1910 | + root_path = e.key().path().mutable_element(0) |
1911 | + #print "has id:",root_path.has_id(), "has name:",root_path.has_name() |
1912 | + if root_path.id() == 0 and not root_path.has_name(): |
1913 | + #new_key = root_key + "/" + last_path.type() |
1914 | + uid = generate_unique_id(app_id, None, None) |
1915 | + #print "Assigned uid to new root key:",str(uid) |
1916 | + if uid <= 0: |
1917 | + return (putresp_pb.Encode(), |
1918 | + datastore_pb.Error.INTERNAL_ERROR, |
1919 | + 'Unable to assign a unique id') |
1920 | + root_path.set_id(uid) |
1921 | + |
1922 | + # Gather data from Put Request # |
1923 | + #print "Entity list for put:" |
1924 | + #print putreq_pb.entity_list() |
1925 | + for e in putreq_pb.entity_list(): |
1926 | + |
1927 | + for prop in e.property_list() + e.raw_property_list(): |
1928 | + if prop.value().has_uservalue(): |
1929 | + obuid = md5.new(prop.value().uservalue().email().lower()).digest() |
1930 | + obuid = '1' + ''.join(['%02d' % ord(x) for x in obuid])[:20] |
1931 | + prop.mutable_value().mutable_uservalue().set_obfuscated_gaiaid( |
1932 | + obuid) |
1933 | + |
1934 | + ################################# |
1935 | + # Key Assignment for new entities |
1936 | + ################################# |
1937 | + e.mutable_key().set_app(app_id) |
1938 | + |
1939 | + root_type = e.key().path().element(0).type() |
1940 | + if root_type[-2:] == "__": |
1941 | + return(putresp_pb.Encode(), |
1942 | + datastore_pb.Error.PERMISSION_DENIED, |
1943 | + "Illegal type name contains reserved delimiters \"__\"") |
1944 | + |
1945 | + last_path = e.key().path().element_list()[-1] |
1946 | + uid = last_path.id() |
1947 | + kind = last_path.type() |
1948 | + # this object has no assigned id thus far |
1949 | + if last_path.id() == 0 and not last_path.has_name(): |
1950 | + if e.key().path().element_size() == 1: |
1951 | + root_key = None |
1952 | + if root_key: |
1953 | + child_key = root_key + "/" + last_path.type() |
1954 | + else: |
1955 | + child_key = None |
1956 | + # if the root is None or the child is None, |
1957 | + # then the global counter is used |
1958 | + # gen unique id only wants to know if a child exist |
1959 | + uid = generate_unique_id(app_id, root_key, child_key) |
1960 | + if uid <= 0: |
1961 | + return(putresp_pb.Encode(), |
1962 | + datastore_pb.Error.INTERNAL_ERROR, |
1963 | + "Unable to assign id to entity") |
1964 | + last_path.set_id(uid) |
1965 | + # It may be its own parent |
1966 | + group = e.mutable_entity_group() |
1967 | + root = e.key().path().element(0) |
1968 | + group.add_element().CopyFrom(root) |
1969 | + if last_path.has_name(): |
1970 | + uid = last_path.name() |
1971 | + # It may be its own parent |
1972 | + group = e.mutable_entity_group() |
1973 | + if group.element_size() == 0: |
1974 | + root = e.key().path().element(0) |
1975 | + group.add_element().CopyFrom(root) |
1976 | + |
1977 | + ####################################### |
1978 | + # Done with key assignment |
1979 | + # Notify Soap Server of any new tables |
1980 | + ####################################### |
1981 | + #print "Putting of type:",kind,"with uid of",str(uid) |
1982 | + # insert key |
1983 | + table_name = getTableName(app_id, kind, appscale_version) |
1984 | + #print "Put Using table name:",table_name |
1985 | + # Notify Users/Apps table if a new class is being added |
1986 | + if table_name not in tableHashTable: |
1987 | + # This is the first time this pbserver has seen this table |
1988 | + # Notify the User/Apps server via soap call |
1989 | + # This function is reentrant |
1990 | + # If the class was deleted, and added a second time there is no |
1991 | + # notifying the users/app server of its creation |
1992 | + if tableServer.add_class(app_id, kind, keySecret) == "true": |
1993 | + tableHashTable[table_name] = 1 |
1994 | + |
1995 | + # Store One Entity # |
1996 | + logger.debug("put: %s___%s___%s with id: %s" % (app_id, |
1997 | + kind, |
1998 | + appscale_version, |
1999 | + str(uid))) |
2000 | + |
2001 | + row_key = getRowKey(app_id, e.key().path().element_list()) |
2002 | + inter_time = time.time() |
2003 | + logger.debug("Time spent in put before datastore call: " + str(inter_time - start_time)) |
2004 | + |
2005 | + |
2006 | + field_name_list = ENTITY_TABLE_SCHEMA |
2007 | + field_value_list = [e.Encode(), NONEXISTANT_TRANSACTION] |
2008 | + if putreq_pb.has_transaction(): |
2009 | + txn = putreq_pb.transaction() |
2010 | + err, res = app_datastore.put_entity( table_name, |
2011 | + row_key, |
2012 | + field_name_list, |
2013 | + field_value_list, |
2014 | + txn.handle()) |
2015 | + else: |
2016 | + err, res = app_datastore.put_entity( table_name, |
2017 | + row_key, |
2018 | + field_name_list, |
2019 | + field_value_list) |
2020 | + |
2021 | + if err not in ERROR_CODES: |
2022 | + #_trans_set.purge(txn) |
2023 | + return (putresp_pb.Encode(), |
2024 | + datastore_pb.Error.INTERNAL_ERROR, |
2025 | + err) |
2026 | + |
2027 | + putresp_pb.key_list().append(e.key()) |
2028 | + |
2029 | + inter_time = time.time() |
2030 | + logger.debug("Time spent in put after datastore call: " + str(inter_time - start_time)) |
2031 | + logger.debug( "PUT_RESPONSE:%s" % putresp_pb) |
2032 | + return (putresp_pb.Encode(), 0, "") |
2033 | + |
2034 | + |
2035 | + def get_request(self, app_id, appscale_version, http_request_data): |
2036 | + global app_datastore |
2037 | + getreq_pb = datastore_pb.GetRequest(http_request_data) |
2038 | + logger.debug("GET_REQUEST: %s" % getreq_pb) |
2039 | + getresp_pb = datastore_pb.GetResponse() |
2040 | + |
2041 | + |
2042 | + for key in getreq_pb.key_list(): |
2043 | + key.set_app(app_id) |
2044 | + last_path = key.path().element_list()[-1] |
2045 | + |
2046 | + if last_path.has_id(): |
2047 | + entity_id = last_path.id() |
2048 | + |
2049 | + if last_path.has_name(): |
2050 | + entity_id = last_path.name() |
2051 | + |
2052 | + if last_path.has_type(): |
2053 | + kind = last_path.type() |
2054 | + logger.debug("get: %s___%s___%s %s" % (app_id, kind, appscale_version, str(entity_id))) |
2055 | + table_name = getTableName(app_id, kind, appscale_version) |
2056 | + row_key = getRowKey(app_id,key.path().element_list()) |
2057 | + #print "Get row key:",row_key |
2058 | + if getreq_pb.has_transaction(): |
2059 | + txn = getreq_pb.transaction() |
2060 | + r = app_datastore.get_entity( table_name, row_key, ENTITY_TABLE_SCHEMA, txn.handle()) |
2061 | + else: |
2062 | + r = app_datastore.get_entity( table_name, row_key, ENTITY_TABLE_SCHEMA ) |
2063 | + err = r[0] |
2064 | + if err not in ERROR_CODES or len(r) != 3: |
2065 | + r = ["",None,NONEXISTANT_TRANSACTION] |
2066 | + print err |
2067 | + entity = r[1] |
2068 | + prev_version = long(r[2]) |
2069 | + |
2070 | + group = getresp_pb.add_entity() |
2071 | + if entity: |
2072 | + e_pb = entity_pb.EntityProto( entity ) |
2073 | + group.mutable_entity().CopyFrom(e_pb) |
2074 | + |
2075 | + # Send Response # |
2076 | + #print getresp_pb |
2077 | + logger.debug("GET_RESPONSE: %s" % getresp_pb) |
2078 | + return (getresp_pb.Encode(), 0, "") |
2079 | + |
2080 | + """ Deletes are just PUTs using a sentinal value of DELETED |
2081 | + All deleted keys are DELETED/entity_group. This is for |
2082 | + rollback to know which entity group a possible failed |
2083 | + transaction belongs to. |
2084 | + """ |
2085 | + def delete_request(self, app_id, appscale_version, http_request_data): |
2086 | + global app_datastore |
2087 | + root_key = None |
2088 | + txn = None |
2089 | + logger.debug("DeleteRequest Received...") |
2090 | + delreq_pb = datastore_pb.DeleteRequest( http_request_data ) |
2091 | + logger.debug("DELETE_REQUEST: %s" % delreq_pb) |
2092 | + delresp_pb = api_base_pb.VoidProto() |
2093 | + |
2094 | + for key in delreq_pb.key_list(): |
2095 | + key.set_app(app_id) |
2096 | + last_path = key.path().element_list()[-1] |
2097 | + if last_path.has_type(): |
2098 | + kind = last_path.type() |
2099 | + |
2100 | + row_key = getRowKey(app_id, key.path().element_list()) |
2101 | + |
2102 | + |
2103 | + table_name = getTableName(app_id, kind, appscale_version) |
2104 | + if delreq_pb.has_transaction(): |
2105 | + txn = delreq_pb.transaction() |
2106 | + res = app_datastore.delete_row( table_name, row_key, txn.handle()) |
2107 | + else: |
2108 | + res = app_datastore.delete_row( table_name, |
2109 | + row_key) |
2110 | + err = res[0] |
2111 | + logger.debug("Response from DB for delete request %s" % err) |
2112 | + if err not in ERROR_CODES: |
2113 | + if DEBUG: print err |
2114 | + return (delresp_pb.Encode(), |
2115 | + datastore_pb.Error.INTERNAL_ERROR, |
2116 | + err + ", Unable to delete row") |
2117 | + |
2118 | + return (delresp_pb.Encode(), 0, "") |
2119 | + |
2120 | + |
2121 | + def optimized_delete_request(self, app_id, appscale_version, http_request_data): |
2122 | + pass |
2123 | + def run_optimized_query(self, app_id, appscale_version, http_request_data): |
2124 | + return |
2125 | + def optimized_put_request(self, app_id, appscale_version, http_request_data): |
2126 | + pass |
2127 | + |
2128 | + def void_proto(self, app_id, appscale_version, http_request_data): |
2129 | + resp_pb = api_base_pb.VoidProto() |
2130 | + print "Got void" |
2131 | + logger.debug("VOID_RESPONSE: %s to void" % resp_pb) |
2132 | + return (resp_pb.Encode(), 0, "" ) |
2133 | + |
2134 | + def str_proto(self, app_id, appscale_version, http_request_data): |
2135 | + str_pb = api_base_pb.StringProto( http_request_data ) |
2136 | + composite_pb = datastore_pb.CompositeIndices() |
2137 | + print "Got a string proto" |
2138 | + print str_pb |
2139 | + logger.debug("String proto received: %s"%str_pb) |
2140 | + logger.debug("CompositeIndex response to string: %s" % composite_pb) |
2141 | + return (composite_pb.Encode(), 0, "" ) |
2142 | + |
2143 | + def int64_proto(self, app_id, appscale_version, http_request_data): |
2144 | + int64_pb = api_base_pb.Integer64Proto( http_request_data ) |
2145 | + resp_pb = api_base_pb.VoidProto() |
2146 | + print "Got a int 64" |
2147 | + print int64_pb |
2148 | + logger.debug("Int64 proto received: %s"%int64_pb) |
2149 | + logger.debug("VOID_RESPONSE to int64: %s" % resp_pb) |
2150 | + return (resp_pb.Encode(), 0, "") |
2151 | + |
2152 | + def compositeindex_proto(self, app_id, appscale_version, http_request_data): |
2153 | + compindex_pb = entity_pb.CompositeIndex( http_request_data) |
2154 | + resp_pb = api_base_pb.VoidProto() |
2155 | + print "Got Composite Index" |
2156 | + print compindex_pb |
2157 | + logger.debug("CompositeIndex proto recieved: %s"%str(compindex_pb)) |
2158 | + logger.debug("VOID_RESPONSE to composite index: %s" % resp_pb) |
2159 | + return (resp_pb.Encode(), 0, "") |
2160 | + |
2161 | +# Returns 0 on success, 1 on failure |
2162 | + def create_index_tables(self, app_id): |
2163 | + global app_datastore |
2164 | + """table_name = "__" + app_id + "__" + "kind" |
2165 | + columns = ["reference"] |
2166 | + print "Building table: " + table_name |
2167 | + returned = app_datastore.create_table( table_name, columns ) |
2168 | + err,res = returned |
2169 | + if err not in ERROR_CODES: |
2170 | + logger.debug("%s" % err) |
2171 | + return 1 |
2172 | + """ |
2173 | + table_name = "__" + app_id + "__" + "single_prop_asc" |
2174 | + print "Building table: " + table_name |
2175 | + columns = ["reference"] |
2176 | + returned = app_datastore.create_table( table_name, columns ) |
2177 | + err,res = returned |
2178 | + if err not in ERROR_CODES: |
2179 | + logger.debug("%s" % err) |
2180 | + return 1 |
2181 | + |
2182 | + table_name = "__" + app_id + "__" + "single_prop_desc" |
2183 | + print "Building table: " + table_name |
2184 | + returned = app_datastore.create_table( table_name, columns ) |
2185 | + err,res = returned |
2186 | + if err not in ERROR_CODES: |
2187 | + logger.debug("%s" % err) |
2188 | + return 1 |
2189 | + |
2190 | + table_name = "__" + app_id + "__" + "composite" |
2191 | + print "Building table: " + table_name |
2192 | + returned = app_datastore.create_table( table_name, columns ) |
2193 | + err,res = returned |
2194 | + if err not in ERROR_CODES: |
2195 | + logger.debug("%s" % err) |
2196 | + return 1 |
2197 | + |
2198 | + return 0 |
2199 | + |
2200 | + ############## |
2201 | + # OTHER TYPE # |
2202 | + ############## |
2203 | + def unknown_request(self, app_id, appscale_version, http_request_data, pb_type): |
2204 | + logger.debug("Received Unknown Protocol Buffer %s" % pb_type ) |
2205 | + print "ERROR: Received Unknown Protocol Buffer <" + pb_type +">.", |
2206 | + print "Nothing has been implemented to handle this Protocol Buffer type." |
2207 | + print "http request data:" |
2208 | + print http_request_data |
2209 | + print "http done" |
2210 | + self.void_proto(app_id, appscale_version, http_request_data) |
2211 | + |
2212 | + |
2213 | + ######################### |
2214 | + # POST Request Handling # |
2215 | + ######################### |
2216 | + @tornado.web.asynchronous |
2217 | + def post( self ): |
2218 | + request = self.request |
2219 | + http_request_data = request.body |
2220 | + pb_type = request.headers['protocolbuffertype'] |
2221 | + app_data = request.headers['appdata'] |
2222 | + app_data = app_data.split(':') |
2223 | + |
2224 | + if len(app_data) == 5: |
2225 | + app_id, user_email, nick_name, auth_domain, appscale_version = app_data |
2226 | + os.environ['AUTH_DOMAIN'] = auth_domain |
2227 | + os.environ['USER_EMAIL'] = user_email |
2228 | + os.environ['USER_NICKNAME'] = nick_name |
2229 | + os.environ['APPLICATION_ID'] = app_id |
2230 | + elif len(app_data) == 4: |
2231 | + app_id, user_email, nick_name, auth_domain = app_data |
2232 | + os.environ['AUTH_DOMAIN'] = auth_domain |
2233 | + os.environ['USER_EMAIL'] = user_email |
2234 | + os.environ['USER_NICKNAME'] = nick_name |
2235 | + os.environ['APPLICATION_ID'] = app_id |
2236 | + appscale_version = "1" |
2237 | + elif len(app_data) == 2: |
2238 | + app_id, appscale_version = app_data |
2239 | + app_id = app_data[0] |
2240 | + os.environ['APPLICATION_ID'] = app_id |
2241 | + elif len(app_data) == 1: |
2242 | + app_id = app_data[0] |
2243 | + os.environ['APPLICATION_ID'] = app_id |
2244 | + appscale_version = "1" |
2245 | + else: |
2246 | + logger.debug("UNABLE TO EXTRACT APPLICATION DATA") |
2247 | + return |
2248 | + |
2249 | + # Default HTTP Response Data # |
2250 | + |
2251 | + if pb_type == "Request": |
2252 | + self.remote_request(app_id, appscale_version, http_request_data) |
2253 | + else: |
2254 | + self.unknown_request(app_id, appscale_version, http_request_data, pb_type) |
2255 | + self.finish() |
2256 | + |
2257 | + |
2258 | +def usage(): |
2259 | + print "AppScale Server" |
2260 | |
2261 | + print "Options:" |
2262 | + print "\t--certificate=<path-to-ssl-certificate>" |
2263 | + print "\t--a=<soap server hostname> " |
2264 | + print "\t--key for using keys from the soap server" |
2265 | + print "\t--type=<hypertable, hbase, cassandra, mysql, mongodb>" |
2266 | + print "\t--secret=<secrete to soap server>" |
2267 | + print "\t--blocksize=<key-block-size>" |
2268 | + print "\t--optimized_query" |
2269 | + print "\t--no_encryption" |
2270 | +def main(argv): |
2271 | + global app_datastore |
2272 | + global getKeyFromServer |
2273 | + global tableServer |
2274 | + global keySecret |
2275 | + global logOn |
2276 | + global logFilePtr |
2277 | + global optimizedQuery |
2278 | + global soapServer |
2279 | + global ERROR_CODES |
2280 | + global VALID_DATASTORES |
2281 | + global KEYBLOCKSIZE |
2282 | + global zoo_keeper |
2283 | + cert_file = CERT_LOCATION |
2284 | + key_file = KEY_LOCATION |
2285 | + db_type = "hypertable" |
2286 | + port = DEFAULT_SSL_PORT |
2287 | + isEncrypted = True |
2288 | + try: |
2289 | + opts, args = getopt.getopt( argv, "c:t:l:s:b:a:k:p:o:n:z:", |
2290 | + ["certificate=", |
2291 | + "type=", |
2292 | + "log=", |
2293 | + "secret=", |
2294 | + "blocksize=", |
2295 | + "soap=", |
2296 | + "key", |
2297 | + "port", |
2298 | + "optimized_query", |
2299 | + "no_encryption", |
2300 | + "zoo_keeper"] ) |
2301 | + except getopt.GetoptError: |
2302 | + usage() |
2303 | + sys.exit(1) |
2304 | + for opt, arg in opts: |
2305 | + if opt in ("-c", "--certificate"): |
2306 | + cert_file = arg |
2307 | + print "Using cert..." |
2308 | + elif opt in ("-k", "--key" ): |
2309 | + getKeyFromServer = True |
2310 | + print "Using key server..." |
2311 | + elif opt in ("-t", "--type"): |
2312 | + db_type = arg |
2313 | + print "Datastore type: ",db_type |
2314 | + elif opt in ("-s", "--secret"): |
2315 | + keySecret = arg |
2316 | + print "Secret set..." |
2317 | + elif opt in ("-l", "--log"): |
2318 | + logOn = True |
2319 | + logFile = arg |
2320 | + logFilePtr = open(logFile, "w") |
2321 | + logFilePtr.write("# type, app, start, end\n") |
2322 | + elif opt in ("-b", "--blocksize"): |
2323 | + KEYBLOCKSIZE = arg |
2324 | + print "Block size: ",KEYBLOCKSIZE |
2325 | + elif opt in ("-a", "--soap"): |
2326 | + soapServer = arg |
2327 | + elif opt in ("-o", "--optimized_query"): |
2328 | + optimizedQuery = True |
2329 | + elif opt in ("-p", "--port"): |
2330 | + port = int(arg) |
2331 | + elif opt in ("-n", "--no_encryption"): |
2332 | + isEncrypted = False |
2333 | + elif opt in ("-z", "--zoo_keeper"): |
2334 | + zoo_keeper_locations = arg |
2335 | + |
2336 | + app_datastore = appscale_datastore.DatastoreFactory.getDatastore(db_type) |
2337 | + ERROR_CODES = appscale_datastore.DatastoreFactory.error_codes() |
2338 | + VALID_DATASTORES = appscale_datastore.DatastoreFactory.valid_datastores() |
2339 | + if DEBUG: print "ERROR_CODES:" |
2340 | + if DEBUG: print ERROR_CODES |
2341 | + if DEBUG: print "VALID_DATASTORE:" |
2342 | + if DEBUG: print VALID_DATASTORES |
2343 | + if db_type in VALID_DATASTORES: |
2344 | + logger.debug("Using datastore %s" % db_type) |
2345 | + else: |
2346 | + print "Unknown datastore "+ db_type |
2347 | + exit(1) |
2348 | + |
2349 | + tableServer = SOAPpy.SOAPProxy("https://" + soapServer + ":" + str(keyPort)) |
2350 | + |
2351 | + global keyDictionaryLock |
2352 | + zoo_keeper = zktransaction.ZKTransaction(zoo_keeper_locations) |
2353 | + |
2354 | + keyDictionaryLock = threading.Lock() |
2355 | + if port == DEFAULT_SSL_PORT and not isEncrypted: |
2356 | + port = DEFAULT_PORT |
2357 | + pb_application = tornado.web.Application([ |
2358 | + (r"/*", MainHandler), |
2359 | + ]) |
2360 | + server = tornado.httpserver.HTTPServer(pb_application) |
2361 | + server.listen(port) |
2362 | + if not db_type == "timesten": |
2363 | + # Stop running as root, security purposes # |
2364 | + drop_privileges() |
2365 | + |
2366 | + while 1: |
2367 | + try: |
2368 | + # Start Server # |
2369 | + tornado.ioloop.IOLoop.instance().start() |
2370 | + except SSL.SSLError: |
2371 | + logger.debug("\n\nUnexcepted input for AppScale-Secure-Server") |
2372 | + except KeyboardInterrupt: |
2373 | + print "Server interrupted by user, terminating..." |
2374 | + exit(1) |
2375 | + |
2376 | +if __name__ == '__main__': |
2377 | + #cProfile.run("main(sys.argv[1:])") |
2378 | + main(sys.argv[1:]) |
2379 | |
2380 | === removed file 'AppDB/appscale_server_no_trans.py' |
2381 | --- AppDB/appscale_server_no_trans.py 2010-05-11 02:07:26 +0000 |
2382 | +++ AppDB/appscale_server_no_trans.py 1970-01-01 00:00:00 +0000 |
2383 | @@ -1,1117 +0,0 @@ |
2384 | -#!/usr/bin/python |
2385 | -# |
2386 | -# Author: |
2387 | -# Navraj Chohan (nchohan@cs.ucsb.edu) |
2388 | -# Soo Hwan Park (suwanny@gmail.com) |
2389 | -# Sydney Pang (pang@cs.ucsb.edu) |
2390 | -# See LICENSE file |
2391 | - |
2392 | -import sys |
2393 | -import socket |
2394 | -import os |
2395 | -import types |
2396 | -import appscale_datastore |
2397 | -#import helper_functions |
2398 | -import SOAPpy |
2399 | -from dbconstants import * |
2400 | -import appscale_logger |
2401 | -import md5 |
2402 | -import random |
2403 | -import getopt |
2404 | -from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer |
2405 | -from SocketServer import ThreadingMixIn |
2406 | -import threading |
2407 | - |
2408 | -from google.appengine.api import api_base_pb |
2409 | -from google.appengine.api import datastore |
2410 | -from google.appengine.api import datastore_errors |
2411 | -from google.appengine.api import datastore_types |
2412 | -from google.appengine.api import users |
2413 | -from google.appengine.datastore import datastore_pb |
2414 | -from google.appengine.datastore import datastore_index |
2415 | -from google.appengine.runtime import apiproxy_errors |
2416 | -from google.net.proto import ProtocolBuffer |
2417 | -from google.appengine.datastore import entity_pb |
2418 | -from google.appengine.ext.remote_api import remote_api_pb |
2419 | -from SocketServer import BaseServer |
2420 | -from M2Crypto import SSL |
2421 | -from drop_privileges import * |
2422 | - |
2423 | -import time |
2424 | - |
2425 | -DEBUG = False |
2426 | -APP_TABLE = APPS_TABLE |
2427 | -USER_TABLE = USERS_TABLE |
2428 | -DEFAULT_USER_LOCATION = ".flatfile_users" |
2429 | -DEFAULT_APP_LOCATION = ".flatfile_apps" |
2430 | -HYPERTABLE_XML_TAG = "Name" |
2431 | -DEFAULT_DATASTORE = "files" |
2432 | -DEFAULT_SSL_PORT = 443 |
2433 | -DEFAULT_PORT = 4080 |
2434 | -DEFAULT_ENCRYPTION = 1 |
2435 | -CERT_LOCATION = "/etc/appscale/certs/mycert.pem" |
2436 | -KEY_LOCATION = "/etc/appscale/certs/mykey.pem" |
2437 | -SECRET_LOCATION = "/etc/appscale/secret.key" |
2438 | -VALID_DATASTORES = [] |
2439 | -ERROR_CODES = [] |
2440 | -app_datastore = [] |
2441 | -logOn = False |
2442 | -logFilePtr = "" |
2443 | - |
2444 | -getKeyFromServer = False |
2445 | -soapServer = "localhost" |
2446 | -tableServer = "" |
2447 | -keyPort = 4343 |
2448 | -keySecret = "" |
2449 | -KEYBLOCKSIZE = "50" |
2450 | -keyDictionaryLock = None |
2451 | -keyDictionary = {} |
2452 | -keyStart = 0 |
2453 | -keyEnd = 0 |
2454 | - |
2455 | -optimizedQuery = False |
2456 | -ID_KEY_LENGTH = 64 |
2457 | -tableHashTable = {} |
2458 | - |
2459 | -local_server_address = "" |
2460 | -HandlerClass = "" |
2461 | -ssl_cert_file = "" |
2462 | -ssl_key_file = "" |
2463 | - |
2464 | -DELETED = "DELETED___" |
2465 | -""" |
2466 | -Deleted keys are DELETED/<row_key> |
2467 | -""" |
2468 | - |
2469 | -""" |
2470 | -keys for tables take the format |
2471 | -appname/Grandparent:<ID>/Parent:<ID>/Child:<ID> |
2472 | -for the entity table |
2473 | -""" |
2474 | - |
2475 | - |
2476 | -class ThreadLogger: |
2477 | - def __init__(self, log): |
2478 | - self.logger_ = log |
2479 | - self.log_lock = threading.Lock() |
2480 | - |
2481 | - def debug(self, string): |
2482 | - return |
2483 | - self.log_lock.acquire() |
2484 | - print string |
2485 | - self.logger_.info(string) |
2486 | - self.log_lock.release() |
2487 | - |
2488 | -logger = appscale_logger.getLogger("pb_server") |
2489 | - |
2490 | - |
2491 | -def getTableName(app_id, kind, version): |
2492 | - return app_id + "___" + kind + "___" + version |
2493 | - |
2494 | -def getRowKey(app_id, ancestor_list): |
2495 | - if ancestor_list == None: |
2496 | - logger.debug("Generate row key received null ancestor list") |
2497 | - return "" |
2498 | - |
2499 | - key = app_id |
2500 | - |
2501 | - # Note: mysql cannot have \ as the first char in the row key |
2502 | - for a in ancestor_list: |
2503 | - key += "/" |
2504 | - if a.has_type(): |
2505 | - key += a.type() |
2506 | - |
2507 | - if a.has_id(): |
2508 | - zero_padded_id = ("0" * (ID_KEY_LENGTH - len(str(a.id())))) + str(a.id()) |
2509 | - key += ":" + zero_padded_id |
2510 | - elif a.has_name(): |
2511 | - # append _ if the name is a number, prevents collisions of key names |
2512 | - if a.name().isdigit(): |
2513 | - key += ":__key__" + a.name() |
2514 | - else: |
2515 | - key += ":" + a.name() |
2516 | - return key |
2517 | - |
2518 | - |
2519 | -def getRootKey(app_id, ancestor_list): |
2520 | - key = app_id # mysql cannot have \ as the first char in the row key |
2521 | - a = ancestor_list[0] |
2522 | - key += "/" |
2523 | - |
2524 | - # append _ if the name is a number, prevents collisions of key names |
2525 | - if a.has_type(): |
2526 | - key += a.type() |
2527 | - else: |
2528 | - return None |
2529 | - |
2530 | - if a.has_id(): |
2531 | - zero_padded_id = ("0" * (ID_KEY_LENGTH - len(str(a.id())))) + str(a.id()) |
2532 | - key += ":" + zero_padded_id |
2533 | - elif a.has_name(): |
2534 | - if a.name().isdigit(): |
2535 | - key += ":__key__" + a.name() |
2536 | - else: |
2537 | - key += ":" + a.name() |
2538 | - else: |
2539 | - return None |
2540 | - |
2541 | - return key |
2542 | - |
2543 | - |
2544 | -def getRootKeyFromKeyType(app_id, key): |
2545 | - ancestor_list = key._Key__reference.path().element_list() |
2546 | - return getRootKey(app_id, ancestor_list) |
2547 | - |
2548 | - |
2549 | -def getRowKeyFromKeyType(app_id, key): |
2550 | - ancestor_list = key._Key__reference.path().element_list() |
2551 | - return getRowKey(app_id, ancestor_list) |
2552 | - |
2553 | - |
2554 | -def getRootKeyFromRef(app_id, ref): |
2555 | - if not ref.has_path(): |
2556 | - return False |
2557 | - path = ref.path() |
2558 | - element_list = path.element_list() |
2559 | - return getRootKey(app_id, element_list) |
2560 | - |
2561 | - |
2562 | -# Version must be an int |
2563 | -def getJournalKey(key, version): |
2564 | - key+="/" |
2565 | - zero_padded_version = ("0" * (ID_KEY_LENGTH - len(str(version)))) + str(version) |
2566 | - key += zero_padded_version |
2567 | - return key |
2568 | - |
2569 | - |
2570 | -def getJournalTable(app_id, appscale_version): |
2571 | - return JOURNAL_TABLE + "___" + app_id + "___" + str(appscale_version) |
2572 | - |
2573 | -def generateIDFromServer(app_id): |
2574 | - global keySecret |
2575 | - global keyStart |
2576 | - global keyEnd |
2577 | - global tableServer |
2578 | - key = 0 |
2579 | - if keyStart != keyEnd: |
2580 | - if DEBUG: print "fast track key serving" |
2581 | - if DEBUG: print "Key Start: " + str(keyStart) |
2582 | - if DEBUG: print "Key End: " + str(keyEnd) |
2583 | - key = int(keyStart) |
2584 | - if DEBUG: print "key returned: " + str(key) |
2585 | - keyStart = int(keyStart) + 1 |
2586 | - return key |
2587 | - else: |
2588 | - # Retrieve a new set of keys |
2589 | - if DEBUG: print "slow track key serving" |
2590 | - keyStart = tableServer.get_key_block(app_id, KEYBLOCKSIZE, keySecret) |
2591 | - try: |
2592 | - keyStart = int(keyStart) |
2593 | - except: |
2594 | - # value is not a number |
2595 | - return -1 |
2596 | - keyEnd = keyStart + int(KEYBLOCKSIZE) |
2597 | - if DEBUG: print "Key Start: " + str(keyStart) |
2598 | - if DEBUG: print "Key End: " + str(keyEnd) |
2599 | - key = keyStart |
2600 | - keyStart = keyStart + 1 |
2601 | - if DEBUG: print "key returned: " + str(key) |
2602 | - return key |
2603 | - |
2604 | - |
2605 | -def rollback_function(app_id, trans_id, root_key, change_set): |
2606 | - pass |
2607 | - |
2608 | - |
2609 | -class ThreadedHTTPServer(ThreadingMixIn, HTTPServer): |
2610 | - """ Handle requests in a new thread """ |
2611 | - |
2612 | - |
2613 | -class AppScaleSecureHandler( BaseHTTPRequestHandler ): |
2614 | - """ |
2615 | - Defines what to do when the webserver receives different types of |
2616 | - HTTP requests. |
2617 | - """ |
2618 | - def get_http_err_code( self, message ): |
2619 | - print message |
2620 | - ret = 599 # 599 is the default code for an unknown error # |
2621 | - return ret |
2622 | - |
2623 | - def send_post_http_response( self , http_code , message ): |
2624 | - self.send_response( http_code ) |
2625 | - self.send_header( 'Content-type' , 'text/plain' ) |
2626 | - self.end_headers() |
2627 | - self.wfile.write( message ) |
2628 | - |
2629 | - def send_http_err_response( self, err_msg ): |
2630 | - ret_http_code = self.get_http_err_code( err_msg ) |
2631 | - self.send_post_http_response( ret_http_code, err_msg ) |
2632 | - |
2633 | - # remote api request |
2634 | - # sends back a response |
2635 | - def remote_request(self, app_id, appscale_version, http_request_data): |
2636 | - apirequest = remote_api_pb.Request(http_request_data) |
2637 | - apiresponse = remote_api_pb.Response() |
2638 | - response = None |
2639 | - errcode = 0 |
2640 | - errdetail = "" |
2641 | - apperror_pb = None |
2642 | - |
2643 | - if not apirequest.has_method(): |
2644 | - errcode = datastore_pb.Error.BAD_REQUEST |
2645 | - errdetail = "Method was not set in request" |
2646 | - apirequest.set_method("NOT_FOUND") |
2647 | - if not apirequest.has_request(): |
2648 | - errcode = datastore_pb.Error.BAD_REQUEST |
2649 | - errdetail = "Request missing in call" |
2650 | - apirequest.set_method("NOT_FOUND") |
2651 | - apirequest.clear_request() |
2652 | - method = apirequest.method() |
2653 | - request_data = apirequest.request() |
2654 | - http_request_data = request_data.contents() |
2655 | - |
2656 | - print "REQUEST:",method," AT time",time.time() |
2657 | - if method == "Put": |
2658 | - response, errcode, errdetail = self.put_request(app_id, |
2659 | - appscale_version, |
2660 | - http_request_data) |
2661 | - elif method == "Get": |
2662 | - response, errcode, errdetail = self.get_request(app_id, |
2663 | - appscale_version, |
2664 | - http_request_data) |
2665 | - elif method == "Delete": |
2666 | - response, errcode, errdetail = self.delete_request(app_id, |
2667 | - appscale_version, |
2668 | - http_request_data) |
2669 | - elif method == "RunQuery": |
2670 | - response, errcode, errdetail = self.run_query(app_id, |
2671 | - appscale_version, |
2672 | - http_request_data) |
2673 | - elif method == "BeginTransaction": |
2674 | - response, errcode, errdetail = self.begin_transaction_request(app_id, |
2675 | - appscale_version, |
2676 | - http_request_data) |
2677 | - elif method == "Commit": |
2678 | - response, errcode, errdetail = self.commit_transaction_request(app_id, |
2679 | - appscale_version, |
2680 | - http_request_data) |
2681 | - elif method == "Rollback": |
2682 | - response, errcode, errdetail = self.rollback_transaction_request(app_id, |
2683 | - appscale_version, |
2684 | - http_request_data) |
2685 | - elif method == "AllocateIds": |
2686 | - response, errcode, errdetail = self.allocate_ids_request(app_id, |
2687 | - appscale_version, |
2688 | - http_request_data) |
2689 | - elif method == "CreateIndex": |
2690 | - errcode = datastore_pb.Error.PERMISSION_DENIED |
2691 | - errdetail = "Create Index is not implemented" |
2692 | - logger.debug(errdetail) |
2693 | - """ |
2694 | - response, errcode, errdetail = self.create_index_request(app_id, |
2695 | - appscale_version, |
2696 | - http_request_data) |
2697 | - """ |
2698 | - elif method == "GetIndices": |
2699 | - errcode = datastore_pb.Error.PERMISSION_DENIED |
2700 | - errdetail = "GetIndices is not implemented" |
2701 | - logger.debug(errdetail) |
2702 | - """ |
2703 | - response, errcode, errdetail = self.get_indices_request(app_id, |
2704 | - appscale_version, |
2705 | - http_request_data) |
2706 | - """ |
2707 | - elif method == "UpdateIndex": |
2708 | - errcode = datastore_pb.Error.PERMISSION_DENIED |
2709 | - errdetail = "UpdateIndex is not implemented" |
2710 | - logger.debug(errdetail) |
2711 | - """ |
2712 | - response, errcode, errdetail = self.update_index_request(app_id, |
2713 | - appscale_version, |
2714 | - http_request_data) |
2715 | - """ |
2716 | - elif method == "DeleteIndex": |
2717 | - errcode = datastore_pb.Error.PERMISSION_DENIED |
2718 | - errdetail = "DeleteIndex is not implemented" |
2719 | - logger.debug(errdetail) |
2720 | - |
2721 | - """ |
2722 | - response, errcode, errdetail = self.delete_index_request(app_id, |
2723 | - appscale_version, |
2724 | - http_request_data) |
2725 | - """ |
2726 | - else: |
2727 | - errcode = datastore_pb.Error.BAD_REQUEST |
2728 | - errdetail = "Unknown datastore message" |
2729 | - logger.debug(errdetail) |
2730 | - |
2731 | - |
2732 | - rawmessage = apiresponse.mutable_response() |
2733 | - if response: |
2734 | - rawmessage.set_contents(response) |
2735 | - |
2736 | - if errcode != 0: |
2737 | - apperror_pb = apiresponse.mutable_application_error() |
2738 | - apperror_pb.set_code(errcode) |
2739 | - apperror_pb.set_detail(errdetail) |
2740 | - if errcode != 0: |
2741 | - print "REPLY",method," AT TIME",time.time() |
2742 | - print "errcode:",errcode |
2743 | - print "errdetail:",errdetail |
2744 | - self.send_post_http_response( 200 , apiresponse.Encode() ) |
2745 | - |
2746 | - |
2747 | - def run_query(self, app_id, appscale_version, http_request_data): |
2748 | - global app_datastore |
2749 | - query = datastore_pb.Query(http_request_data) |
2750 | - logger.debug("QUERY:%s" % query) |
2751 | - results = [] |
2752 | - |
2753 | - if not query.has_kind(): |
2754 | - # Return nothing in case of error # |
2755 | - return (api_base_pb.VoidProto().Encode(), |
2756 | - datastore_pb.Error.PERMISSION_DENIED, |
2757 | - "Kindless queries are not implemented.") |
2758 | - else: |
2759 | - kind = query.kind() |
2760 | - #print "Query kind:",kind |
2761 | - |
2762 | - # Verify validity of the entity name and applicaiton id # |
2763 | - # according to the naming sheme for entity tables # |
2764 | - #assert kind[-2:] != "__" |
2765 | - #assert app_id[-1] != "_" |
2766 | - |
2767 | - # Fetch query from the datastore # |
2768 | - table_name = getTableName(app_id, kind, appscale_version) |
2769 | - #print "Query using table name:",table_name |
2770 | - r = app_datastore.get_table( table_name, ENTITY_TABLE_SCHEMA) |
2771 | - #logger.debug("result: %s" % r) |
2772 | - |
2773 | - #err = r[0] |
2774 | - if len(r) > 1: |
2775 | - results = r[1:] |
2776 | - else: |
2777 | - results = [] |
2778 | - |
2779 | - # odds are versions |
2780 | - versions = results[1::2] |
2781 | - # evens are encoded entities |
2782 | - results = results[0::2] |
2783 | - #print "RESULTS:",results |
2784 | - #print "VERSIONS:",versions |
2785 | - if len(versions) != len(results): |
2786 | - return(api_base_pb.VoidProto().Encode(), |
2787 | - datastore_pb.Error.INTERNAL_ERROR, |
2788 | - 'The query had a bad number of results.') |
2789 | - |
2790 | - # convert to objects |
2791 | - # Unless its marked as deleted |
2792 | - # They are currently strings |
2793 | - for index, res in enumerate(results): |
2794 | - results[index] = entity_pb.EntityProto(res) |
2795 | - results[index] = datastore.Entity._FromPb(results[index]) |
2796 | - |
2797 | - logger.debug("====results pre filter====") |
2798 | - #logger.debug("%s" % results) |
2799 | - if query.has_ancestor(): |
2800 | - ancestor_path = query.ancestor().path().element_list() |
2801 | - def is_descendant(entity): |
2802 | - path = entity.key()._Key__reference.path().element_list() |
2803 | - return path[:len(ancestor_path)] == ancestor_path |
2804 | - results = filter(is_descendant, results) |
2805 | - |
2806 | - operators = {datastore_pb.Query_Filter.LESS_THAN: '<', |
2807 | - datastore_pb.Query_Filter.LESS_THAN_OR_EQUAL: '<=', |
2808 | - datastore_pb.Query_Filter.GREATER_THAN: '>', |
2809 | - datastore_pb.Query_Filter.GREATER_THAN_OR_EQUAL: '>=', |
2810 | - datastore_pb.Query_Filter.EQUAL: '==', |
2811 | - } |
2812 | - |
2813 | - for filt in query.filter_list(): |
2814 | - assert filt.op() != datastore_pb.Query_Filter.IN |
2815 | - |
2816 | - prop = filt.property(0).name().decode('utf-8') |
2817 | - op = operators[filt.op()] |
2818 | - |
2819 | - def passes(entity): |
2820 | - """ Returns True if the entity passes the filter, False otherwise. """ |
2821 | - entity_vals = entity.get(prop, []) |
2822 | - if type(entity_vals) != types.ListType: |
2823 | - entity_vals = [entity_vals] |
2824 | - |
2825 | - entity_property_list = [datastore_types.ToPropertyPb(prop, value) for value in entity_vals] |
2826 | - |
2827 | - for entity_prop in entity_property_list: |
2828 | - fixed_entity_val = datastore_types.FromPropertyPb(entity_prop) |
2829 | - |
2830 | - for filter_prop in filt.property_list(): |
2831 | - filter_val = datastore_types.FromPropertyPb(filter_prop) |
2832 | - comp = u'%r %s %r' % (fixed_entity_val, op, filter_val) |
2833 | - logger.debug('Evaling filter expression "%s"' % comp ) |
2834 | - if eval(comp): |
2835 | - return True |
2836 | - return False |
2837 | - |
2838 | - results = filter(passes, results) |
2839 | - |
2840 | - for order in query.order_list(): |
2841 | - prop = order.property().decode('utf-8') |
2842 | - results = [entity for entity in results if prop in entity] |
2843 | - |
2844 | - def order_compare(a, b): |
2845 | - """ Return a negative, zero or positive number depending on whether |
2846 | - entity a is considered smaller than, equal to, or larger than b, |
2847 | - according to the query's orderings. """ |
2848 | - for o in query.order_list(): |
2849 | - prop = o.property().decode('utf-8') |
2850 | - |
2851 | - a_values = a[prop] |
2852 | - if not isinstance(a_values, types.ListType): |
2853 | - a_values = [a_values] |
2854 | - |
2855 | - b_values = b[prop] |
2856 | - if not isinstance(b_values, types.ListType): |
2857 | - b_values = [b_values] |
2858 | - |
2859 | - cmped = cmp(min(a_values), min(b_values)) |
2860 | - |
2861 | - if o.direction() == datastore_pb.Query_Order.DESCENDING: |
2862 | - cmped = -cmped |
2863 | - |
2864 | - if cmped != 0: |
2865 | - return cmped |
2866 | - |
2867 | - return 0 |
2868 | - |
2869 | - results.sort(order_compare) |
2870 | - |
2871 | - if query.has_limit(): |
2872 | - results = results[:query.limit()] |
2873 | - logger.debug("****results after filtering:****") |
2874 | - logger.debug("%s" % results) |
2875 | - |
2876 | - results = [ent._ToPb() for ent in results] |
2877 | - # Pack Results into a clone of QueryResult # |
2878 | - clone_qr_pb = datastore_pb.QueryResult() |
2879 | - for res in results: |
2880 | - clone_qr_pb.add_result() |
2881 | - clone_qr_pb.result_[-1] = res |
2882 | - |
2883 | - clone_qr_pb.clear_cursor() |
2884 | - clone_qr_pb.set_more_results( len(results)>0 ) |
2885 | - |
2886 | - logger.debug("QUERY_RESULT: %s" % clone_qr_pb) |
2887 | - return (clone_qr_pb.Encode(), 0, "") |
2888 | - |
2889 | - |
2890 | - def begin_transaction_request(self, app_id, appscale_version, http_request_data): |
2891 | - transaction_pb = datastore_pb.Transaction() |
2892 | - handle = random.randint(1,1000000) |
2893 | - print "Begin Trans Handle:",handle |
2894 | - transaction_pb.set_handle(handle) |
2895 | - return (transaction_pb.Encode(), 0, "") |
2896 | - |
2897 | - def commit_transaction_request(self, app_id, appscale_version, http_request_data): |
2898 | - commitres_pb = datastore_pb.CommitResponse() |
2899 | - return (commitres_pb.Encode(), 0, "") |
2900 | - |
2901 | - |
2902 | - def rollback_transaction_request(self, app_id, appscale_version, http_request_data): |
2903 | - return (api_base_pb.VoidProto().Encode(), 0, "") |
2904 | - |
2905 | - |
2906 | - def allocate_ids_request(self, app_id, appscale_version, http_request_data): |
2907 | - return (api_base_pb.VoidProto().Encode(), |
2908 | - datastore_pb.Error.PERMISSION_DENIED, |
2909 | - 'Allocation of block ids not implemented.') |
2910 | - |
2911 | - |
2912 | - # Returns Null on error |
2913 | - def getRootKeyFromEntity(self, app_id, entity): |
2914 | - key = entity.key() |
2915 | - if str(key.__class__) == "google.appengine.datastore.entity_pb.Reference": |
2916 | - return getRootKeyFromRef(app_id, key) |
2917 | - else: |
2918 | - return getRootKeyFromKeyType(app_id, key) |
2919 | - |
2920 | - |
2921 | - # For transactions |
2922 | - # Verifies all puts are apart of the same root entity |
2923 | - def getRootKeyFromTransPut(self, app_id, putreq_pb): |
2924 | - ent_list = [] |
2925 | - if putreq_pb.entity_size() > 0: |
2926 | - ent_list = putreq_pb.entity_list() |
2927 | - first_ent = ent_list[0] |
2928 | - expected_root = self.getRootKeyFromEntity(app_id, first_ent) |
2929 | - # It is possible that all roots are None |
2930 | - # because it is a root that has not gotten a uid |
2931 | - |
2932 | - for e in ent_list: |
2933 | - root = self.getRootKeyFromEntity(app_id, e) |
2934 | - if root != expected_root: |
2935 | - errcode = datastore_pb.Error.BAD_REQUEST |
2936 | - errdetail = "All puts must be a part of the same group" |
2937 | - return (None, errcode, errdetail) |
2938 | - |
2939 | - return (expected_root, 0, "") |
2940 | - |
2941 | - |
2942 | - # For transactions |
2943 | - # Verifies all puts are apart of the same root entity |
2944 | - def getRootKeyFromTransReq(self, app_id, req_pb): |
2945 | - if req_pb.key_size() <= 0: |
2946 | - errcode = datastore_pb.error.bad_request |
2947 | - errdetail = "Bad key listing" |
2948 | - return (None, errcode, errdetail) |
2949 | - |
2950 | - key_list = req_pb.key_list() |
2951 | - first_key = key_list[0] |
2952 | - |
2953 | - expected_root = getRootKeyFromRef(app_id, first_key) |
2954 | - # It is possible that all roots are None |
2955 | - # because it is a root that has not gotten a uid |
2956 | - |
2957 | - for k in key_list: |
2958 | - root = getRootKeyFromRef(app_id, k) |
2959 | - if root != expected_root: |
2960 | - errcode = datastore_pb.error.bad_request |
2961 | - errdetail = "all transaction gets must be a part of the same group" |
2962 | - return (None, errcode, errdetail) |
2963 | - |
2964 | - return (expected_root, 0, "") |
2965 | - |
2966 | - def getRootKeyFromTransGet(self, app_id, get_pb): |
2967 | - return self.getRootKeyFromTransReq(app_id, get_pb) |
2968 | - |
2969 | - def getRootKeyFromTransDel(self, app_id, del_pb): |
2970 | - return self.getRootKeyFromTransReq(app_id, del_pb) |
2971 | - |
2972 | - def put_request(self, app_id, appscale_version, http_request_data): |
2973 | - global app_datastore |
2974 | - global keySecret |
2975 | - global tableHashTable |
2976 | - |
2977 | - field_name_list = [] |
2978 | - field_value_list = [] |
2979 | - |
2980 | - start_time = time.time() |
2981 | - putreq_pb = datastore_pb.PutRequest(http_request_data) |
2982 | - logger.debug("RECEIVED PUT_REQUEST %s" % putreq_pb) |
2983 | - putresp_pb = datastore_pb.PutResponse( ) |
2984 | - txn = None |
2985 | - root_key = None |
2986 | - # Must assign an id if a put is being done in a transaction |
2987 | - # and it does not have an id and it is a root |
2988 | - for e in putreq_pb.entity_list(): |
2989 | - # Only dealing with root puts |
2990 | - if e.key().path().element_size() == 1: |
2991 | - root_path = e.key().path().mutable_element(0) |
2992 | - #print "has id:",root_path.has_id(), "has name:",root_path.has_name() |
2993 | - if root_path.id() == 0 and not root_path.has_name(): |
2994 | - #new_key = root_key + "/" + last_path.type() |
2995 | - uid = generateIDFromServer(app_id) |
2996 | - #print "Assigned uid to new root key:",str(uid) |
2997 | - if uid <= 0: |
2998 | - return (putresp_pb.Encode(), |
2999 | - datastore_pb.Error.INTERNAL_ERROR, |
3000 | - 'Unable to assign a unique id') |
3001 | - root_path.set_id(uid) |
3002 | - |
3003 | - # Gather data from Put Request # |
3004 | - #print "Entity list for put:" |
3005 | - #print putreq_pb.entity_list() |
3006 | - for e in putreq_pb.entity_list(): |
3007 | - |
3008 | - for prop in e.property_list() + e.raw_property_list(): |
3009 | - if prop.value().has_uservalue(): |
3010 | - obuid = md5.new(prop.value().uservalue().email().lower()).digest() |
3011 | - obuid = '1' + ''.join(['%02d' % ord(x) for x in obuid])[:20] |
3012 | - prop.mutable_value().mutable_uservalue().set_obfuscated_gaiaid( |
3013 | - obuid) |
3014 | - |
3015 | - ################################# |
3016 | - # Key Assignment for new entities |
3017 | - ################################# |
3018 | - e.mutable_key().set_app(app_id) |
3019 | - |
3020 | - root_type = e.key().path().element(0).type() |
3021 | - if root_type[-2:] == "__": |
3022 | - return(putresp_pb.Encode(), |
3023 | - datastore_pb.Error.PERMISSION_DENIED, |
3024 | - "Illegal type name contains reserved delimiters \"__\"") |
3025 | - |
3026 | - last_path = e.key().path().element_list()[-1] |
3027 | - uid = last_path.id() |
3028 | - kind = last_path.type() |
3029 | - # this object has no assigned id thus far |
3030 | - if last_path.id() == 0 and not last_path.has_name(): |
3031 | - if e.key().path().element_size() == 1: |
3032 | - root_key = None |
3033 | - if root_key: |
3034 | - child_key = root_key + "/" + last_path.type() |
3035 | - else: |
3036 | - child_key = None |
3037 | - # if the root is None or the child is None, |
3038 | - # then the global counter is used |
3039 | - # gen unique id only wants to know if a child exist |
3040 | - uid = generateIDFromServer(app_id) |
3041 | - if uid <= 0: |
3042 | - return(putresp_pb.Encode(), |
3043 | - datastore_pb.Error.INTERNAL_ERROR, |
3044 | - "Unable to assign id to entity") |
3045 | - last_path.set_id(uid) |
3046 | - # It may be its own parent |
3047 | - group = e.mutable_entity_group() |
3048 | - root = e.key().path().element(0) |
3049 | - group.add_element().CopyFrom(root) |
3050 | - if last_path.has_name(): |
3051 | - uid = last_path.name() |
3052 | - # It may be its own parent |
3053 | - group = e.mutable_entity_group() |
3054 | - if group.element_size() == 0: |
3055 | - root = e.key().path().element(0) |
3056 | - group.add_element().CopyFrom(root) |
3057 | - |
3058 | - ####################################### |
3059 | - # Done with key assignment |
3060 | - # Notify Soap Server of any new tables |
3061 | - ####################################### |
3062 | - #print "Putting of type:",kind,"with uid of",str(uid) |
3063 | - # insert key |
3064 | - table_name = getTableName(app_id, kind, appscale_version) |
3065 | - #print "Put Using table name:",table_name |
3066 | - # Notify Users/Apps table if a new class is being added |
3067 | - if table_name not in tableHashTable: |
3068 | - # This is the first time this pbserver has seen this table |
3069 | - # Notify the User/Apps server via soap call |
3070 | - # This function is reentrant |
3071 | - # If the class was deleted, and added a second time there is no |
3072 | - # notifying the users/app server of its creation |
3073 | - if tableServer.add_class(app_id, kind, keySecret) == "true": |
3074 | - tableHashTable[table_name] = 1 |
3075 | - |
3076 | - # Store One Entity # |
3077 | - logger.debug("put: %s___%s___%s with id: %s" % (app_id, |
3078 | - kind, |
3079 | - appscale_version, |
3080 | - str(uid))) |
3081 | - |
3082 | - row_key = getRowKey(app_id, e.key().path().element_list()) |
3083 | - inter_time = time.time() |
3084 | - logger.debug("Time spent in put before datastore call: " + str(inter_time - start_time)) |
3085 | - |
3086 | - |
3087 | - field_name_list = ENTITY_TABLE_SCHEMA |
3088 | - field_value_list = [e.Encode(), NONEXISTANT_TRANSACTION] |
3089 | - err, res = app_datastore.put_entity( table_name, |
3090 | - row_key, |
3091 | - field_name_list, |
3092 | - field_value_list) |
3093 | - |
3094 | - if err not in ERROR_CODES: |
3095 | - #_trans_set.purge(txn) |
3096 | - return (putresp_pb.Encode(), |
3097 | - datastore_pb.Error.INTERNAL_ERROR, |
3098 | - err) |
3099 | - |
3100 | - putresp_pb.key_list().append(e.key()) |
3101 | - |
3102 | - inter_time = time.time() |
3103 | - logger.debug("Time spent in put after datastore call: " + str(inter_time - start_time)) |
3104 | - logger.debug( "PUT_RESPONSE:%s" % putresp_pb) |
3105 | - return (putresp_pb.Encode(), 0, "") |
3106 | - |
3107 | - |
3108 | - def get_request(self, app_id, appscale_version, http_request_data): |
3109 | - global app_datastore |
3110 | - getreq_pb = datastore_pb.GetRequest(http_request_data) |
3111 | - logger.debug("GET_REQUEST: %s" % getreq_pb) |
3112 | - getresp_pb = datastore_pb.GetResponse() |
3113 | - |
3114 | - |
3115 | - for key in getreq_pb.key_list(): |
3116 | - key.set_app(app_id) |
3117 | - last_path = key.path().element_list()[-1] |
3118 | - |
3119 | - if last_path.has_id(): |
3120 | - entity_id = last_path.id() |
3121 | - |
3122 | - if last_path.has_name(): |
3123 | - entity_id = last_path.name() |
3124 | - |
3125 | - if last_path.has_type(): |
3126 | - kind = last_path.type() |
3127 | - logger.debug("get: %s___%s___%s %s" % (app_id, kind, appscale_version, str(entity_id))) |
3128 | - table_name = getTableName(app_id, kind, appscale_version) |
3129 | - row_key = getRowKey(app_id,key.path().element_list()) |
3130 | - print "Get row key:",row_key |
3131 | - r = app_datastore.get_entity( table_name, row_key, ENTITY_TABLE_SCHEMA ) |
3132 | - err = r[0] |
3133 | - if err not in ERROR_CODES or len(r) != 3: |
3134 | - r = ["",None,NONEXISTANT_TRANSACTION] |
3135 | - print err |
3136 | - entity = r[1] |
3137 | - prev_version = long(r[2]) |
3138 | - |
3139 | - group = getresp_pb.add_entity() |
3140 | - if entity: |
3141 | - e_pb = entity_pb.EntityProto( entity ) |
3142 | - group.mutable_entity().CopyFrom(e_pb) |
3143 | - |
3144 | - # Send Response # |
3145 | - print getresp_pb |
3146 | - logger.debug("GET_RESPONSE: %s" % getresp_pb) |
3147 | - return (getresp_pb.Encode(), 0, "") |
3148 | - |
3149 | - """ Deletes are just PUTs using a sentinal value of DELETED |
3150 | - All deleted keys are DELETED/entity_group. This is for |
3151 | - rollback to know which entity group a possible failed |
3152 | - transaction belongs to. |
3153 | - """ |
3154 | - def delete_request(self, app_id, appscale_version, http_request_data): |
3155 | - global app_datastore |
3156 | - root_key = None |
3157 | - txn = None |
3158 | - logger.debug("DeleteRequest Received...") |
3159 | - delreq_pb = datastore_pb.DeleteRequest( http_request_data ) |
3160 | - logger.debug("DELETE_REQUEST: %s" % delreq_pb) |
3161 | - delresp_pb = api_base_pb.VoidProto() |
3162 | - |
3163 | - for key in delreq_pb.key_list(): |
3164 | - key.set_app(app_id) |
3165 | - last_path = key.path().element_list()[-1] |
3166 | - if last_path.has_type(): |
3167 | - kind = last_path.type() |
3168 | - |
3169 | - row_key = getRowKey(app_id, key.path().element_list()) |
3170 | - |
3171 | - |
3172 | - table_name = getTableName(app_id, kind, appscale_version) |
3173 | - res = app_datastore.delete_row( table_name, |
3174 | - row_key) |
3175 | - err = res[0] |
3176 | - logger.debug("Response from DB for delete request %s" % err) |
3177 | - if err not in ERROR_CODES: |
3178 | - if DEBUG: print err |
3179 | - return (delresp_pb.Encode(), |
3180 | - datastore_pb.Error.INTERNAL_ERROR, |
3181 | - err + ", Unable to write to journal") |
3182 | - |
3183 | - return (delresp_pb.Encode(), 0, "") |
3184 | - |
3185 | - |
3186 | - def optimized_delete_request(self, app_id, appscale_version, http_request_data): |
3187 | - pass |
3188 | - def run_optimized_query(self, app_id, appscale_version, http_request_data): |
3189 | - return |
3190 | - def optimized_put_request(self, app_id, appscale_version, http_request_data): |
3191 | - pass |
3192 | - |
3193 | - def void_proto(self, app_id, appscale_version, http_request_data): |
3194 | - resp_pb = api_base_pb.VoidProto() |
3195 | - print "Got void" |
3196 | - logger.debug("VOID_RESPONSE: %s to void" % resp_pb) |
3197 | - return (resp_pb.Encode(), 0, "" ) |
3198 | - |
3199 | - def str_proto(self, app_id, appscale_version, http_request_data): |
3200 | - str_pb = api_base_pb.StringProto( http_request_data ) |
3201 | - composite_pb = datastore_pb.CompositeIndices() |
3202 | - print "Got a string proto" |
3203 | - print str_pb |
3204 | - logger.debug("String proto received: %s"%str_pb) |
3205 | - logger.debug("CompositeIndex response to string: %s" % composite_pb) |
3206 | - return (composite_pb.Encode(), 0, "" ) |
3207 | - |
3208 | - def int64_proto(self, app_id, appscale_version, http_request_data): |
3209 | - int64_pb = api_base_pb.Integer64Proto( http_request_data ) |
3210 | - resp_pb = api_base_pb.VoidProto() |
3211 | - print "Got a int 64" |
3212 | - print int64_pb |
3213 | - logger.debug("Int64 proto received: %s"%int64_pb) |
3214 | - logger.debug("VOID_RESPONSE to int64: %s" % resp_pb) |
3215 | - return (resp_pb.Encode(), 0, "") |
3216 | - |
3217 | - def compositeindex_proto(self, app_id, appscale_version, http_request_data): |
3218 | - compindex_pb = entity_pb.CompositeIndex( http_request_data) |
3219 | - resp_pb = api_base_pb.VoidProto() |
3220 | - print "Got Composite Index" |
3221 | - print compindex_pb |
3222 | - logger.debug("CompositeIndex proto recieved: %s"%str(compindex_pb)) |
3223 | - logger.debug("VOID_RESPONSE to composite index: %s" % resp_pb) |
3224 | - return (resp_pb.Encode(), 0, "") |
3225 | - |
3226 | -# Returns 0 on success, 1 on failure |
3227 | - def create_index_tables(self, app_id): |
3228 | - global app_datastore |
3229 | - """table_name = "__" + app_id + "__" + "kind" |
3230 | - columns = ["reference"] |
3231 | - print "Building table: " + table_name |
3232 | - returned = app_datastore.create_table( table_name, columns ) |
3233 | - err,res = returned |
3234 | - if err not in ERROR_CODES: |
3235 | - logger.debug("%s" % err) |
3236 | - return 1 |
3237 | - """ |
3238 | - table_name = "__" + app_id + "__" + "single_prop_asc" |
3239 | - print "Building table: " + table_name |
3240 | - columns = ["reference"] |
3241 | - returned = app_datastore.create_table( table_name, columns ) |
3242 | - err,res = returned |
3243 | - if err not in ERROR_CODES: |
3244 | - logger.debug("%s" % err) |
3245 | - return 1 |
3246 | - |
3247 | - table_name = "__" + app_id + "__" + "single_prop_desc" |
3248 | - print "Building table: " + table_name |
3249 | - returned = app_datastore.create_table( table_name, columns ) |
3250 | - err,res = returned |
3251 | - if err not in ERROR_CODES: |
3252 | - logger.debug("%s" % err) |
3253 | - return 1 |
3254 | - |
3255 | - table_name = "__" + app_id + "__" + "composite" |
3256 | - print "Building table: " + table_name |
3257 | - returned = app_datastore.create_table( table_name, columns ) |
3258 | - err,res = returned |
3259 | - if err not in ERROR_CODES: |
3260 | - logger.debug("%s" % err) |
3261 | - return 1 |
3262 | - |
3263 | - return 0 |
3264 | - |
3265 | - ############## |
3266 | - # OTHER TYPE # |
3267 | - ############## |
3268 | - def unknown_request(self, app_id, appscale_version, http_request_data, pb_type): |
3269 | - logger.debug("Received Unknown Protocol Buffer %s" % pb_type ) |
3270 | - print "ERROR: Received Unknown Protocol Buffer <" + pb_type +">.", |
3271 | - print "Nothing has been implemented to handle this Protocol Buffer type." |
3272 | - print "http request data:" |
3273 | - print http_request_data |
3274 | - print "http done" |
3275 | - self.void_proto(app_id, appscale_version, http_request_data) |
3276 | - |
3277 | - ######################## |
3278 | - # GET Request Handling # |
3279 | - ######################## |
3280 | - def do_GET( self ): |
3281 | - self.send_error( 404 , 'File Not Found: %s' % self.path ) |
3282 | - |
3283 | - ######################### |
3284 | - # POST Request Handling # |
3285 | - ######################### |
3286 | - def do_POST( self ): |
3287 | - start_time = time.time() |
3288 | - http_request_data = self.rfile.read(int(self.headers.getheader('content-length'))) |
3289 | - inter_time = time.time() |
3290 | - logger.debug("Timing for pre pre env setup:" + " " + |
3291 | - str(start_time) + " " + str(inter_time) + |
3292 | - " total time: " + str(inter_time - start_time) + "\n") |
3293 | - print "intertime - starttime:",(str(inter_time - start_time)) |
3294 | - pb_type = self.headers.getheader( 'protocolbuffertype' ) |
3295 | - app_data = self.headers.getheader('appdata') |
3296 | - app_data = app_data.split(':') |
3297 | - logger.debug("POST len: %d" % len(app_data)) |
3298 | - inter_time = time.time() |
3299 | - logger.debug("Timing for pre env setup:" + " " + |
3300 | - str(start_time) + " " + str(inter_time) + |
3301 | - " total time: " + str(inter_time - start_time) + "\n") |
3302 | - |
3303 | - if len(app_data) == 5: |
3304 | - app_id, user_email, nick_name, auth_domain, appscale_version = app_data |
3305 | - os.environ['AUTH_DOMAIN'] = auth_domain |
3306 | - os.environ['USER_EMAIL'] = user_email |
3307 | - os.environ['USER_NICKNAME'] = nick_name |
3308 | - os.environ['APPLICATION_ID'] = app_id |
3309 | - elif len(app_data) == 4: |
3310 | - app_id, user_email, nick_name, auth_domain = app_data |
3311 | - os.environ['AUTH_DOMAIN'] = auth_domain |
3312 | - os.environ['USER_EMAIL'] = user_email |
3313 | - os.environ['USER_NICKNAME'] = nick_name |
3314 | - os.environ['APPLICATION_ID'] = app_id |
3315 | - appscale_version = "1" |
3316 | - elif len(app_data) == 2: |
3317 | - app_id, appscale_version = app_data |
3318 | - app_id = app_data[0] |
3319 | - os.environ['APPLICATION_ID'] = app_id |
3320 | - elif len(app_data) == 1: |
3321 | - app_id = app_data[0] |
3322 | - os.environ['APPLICATION_ID'] = app_id |
3323 | - appscale_version = "1" |
3324 | - else: |
3325 | - logger.debug("UNABLE TO EXTRACT APPLICATION DATA") |
3326 | - return |
3327 | - |
3328 | - # Default HTTP Response Data # |
3329 | - logger.debug("For app id: " + app_id) |
3330 | - logger.debug("For app version: " + appscale_version) |
3331 | - inter_time = time.time() |
3332 | - logger.debug("Timing for env setup:" + pb_type + " " + |
3333 | - app_id + " " + str(start_time) + " " + |
3334 | - str(inter_time) + " total time: " + str(inter_time - start_time) + "\n") |
3335 | - |
3336 | - if pb_type == "Request": |
3337 | - self.remote_request(app_id, appscale_version, http_request_data) |
3338 | - else: |
3339 | - self.unknown_request(app_id, appscale_version, http_request_data, pb_type) |
3340 | - |
3341 | - stop_time = time.time() |
3342 | - |
3343 | - #if logOn == True: |
3344 | - #logFilePtr.write(pb_type + " " + app_id + " " + |
3345 | - #str(start_time) + " " + str(stop_time) + " total time: " + |
3346 | - #str(stop_time - start_time) + "\n") |
3347 | - |
3348 | - logger.debug(pb_type + " " + app_id + " " + str(start_time) + " " + |
3349 | - str(stop_time) + " total time: " + str(stop_time - start_time) + "\n") |
3350 | - |
3351 | -class AppScaleUnSecureServerThreaded( ThreadingMixIn, HTTPServer): |
3352 | - pass |
3353 | - |
3354 | -class AppScaleSecureServerThreaded( ThreadingMixIn, HTTPServer ): |
3355 | - def __init__( self): |
3356 | - global local_server_address |
3357 | - global HandlerClass |
3358 | - global ssl_cert_file |
3359 | - global ssl_key_file |
3360 | - BaseServer.__init__( self, local_server_address, HandlerClass ) |
3361 | - ctx = SSL.Context() |
3362 | - ctx.load_cert( ssl_cert_file, ssl_key_file ) |
3363 | - self.socket = SSL.Connection( ctx ) |
3364 | - self.server_bind() |
3365 | - self.server_activate() |
3366 | - |
3367 | -def usage(): |
3368 | - print "AppScale Server" |
3369 | |
3370 | - print "Options:" |
3371 | - print "\t--certificate=<path-to-ssl-certificate>" |
3372 | - print "\t--a=<soap server hostname> " |
3373 | - print "\t--key for using keys from the soap server" |
3374 | - print "\t--type=<hypertable, hbase, cassandra, mysql, mongodb>" |
3375 | - print "\t--secret=<secrete to soap server>" |
3376 | - print "\t--blocksize=<key-block-size>" |
3377 | - print "\t--optimized_query" |
3378 | - print "\t--no_encryption" |
3379 | -def main(argv): |
3380 | - global app_datastore |
3381 | - global getKeyFromServer |
3382 | - global tableServer |
3383 | - global keySecret |
3384 | - global logOn |
3385 | - global logFilePtr |
3386 | - global optimizedQuery |
3387 | - global soapServer |
3388 | - global ERROR_CODES |
3389 | - global VALID_DATASTORES |
3390 | - global KEYBLOCKSIZE |
3391 | - cert_file = CERT_LOCATION |
3392 | - key_file = KEY_LOCATION |
3393 | - db_type = "hypertable" |
3394 | - port = DEFAULT_SSL_PORT |
3395 | - isEncrypted = True |
3396 | - try: |
3397 | - opts, args = getopt.getopt( argv, "c:t:l:s:b:a:k:p:o:n:z", |
3398 | - ["certificate=", |
3399 | - "type=", |
3400 | - "log=", |
3401 | - "secret=", |
3402 | - "blocksize=", |
3403 | - "soap=", |
3404 | - "key", |
3405 | - "port", |
3406 | - "optimized_query", |
3407 | - "no_encryption", |
3408 | - "zoo_keeper"] ) |
3409 | - except getopt.GetoptError: |
3410 | - usage() |
3411 | - sys.exit(1) |
3412 | - for opt, arg in opts: |
3413 | - if opt in ("-c", "--certificate"): |
3414 | - cert_file = arg |
3415 | - print "Using cert..." |
3416 | - elif opt in ("-k", "--key" ): |
3417 | - getKeyFromServer = True |
3418 | - print "Using key server..." |
3419 | - elif opt in ("-t", "--type"): |
3420 | - db_type = arg |
3421 | - print "Datastore type: ",db_type |
3422 | - elif opt in ("-s", "--secret"): |
3423 | - keySecret = arg |
3424 | - print "Secret set..." |
3425 | - elif opt in ("-l", "--log"): |
3426 | - logOn = True |
3427 | - logFile = arg |
3428 | - logFilePtr = open(logFile, "w") |
3429 | - logFilePtr.write("# type, app, start, end\n") |
3430 | - elif opt in ("-b", "--blocksize"): |
3431 | - KEYBLOCKSIZE = arg |
3432 | - print "Block size: ",KEYBLOCKSIZE |
3433 | - elif opt in ("-a", "--soap"): |
3434 | - soapServer = arg |
3435 | - elif opt in ("-o", "--optimized_query"): |
3436 | - optimizedQuery = True |
3437 | - elif opt in ("-p", "--port"): |
3438 | - port = int(arg) |
3439 | - elif opt in ("-n", "--no_encryption"): |
3440 | - isEncrypted = False |
3441 | - elif opt in ("-z", "--zoo_keeper"): |
3442 | - print "This version does not use zoo keeper" |
3443 | - exit(1) |
3444 | - |
3445 | - app_datastore = appscale_datastore.DatastoreFactory.getDatastore(db_type) |
3446 | - ERROR_CODES = appscale_datastore.DatastoreFactory.error_codes() |
3447 | - VALID_DATASTORES = appscale_datastore.DatastoreFactory.valid_datastores() |
3448 | - if DEBUG: print "ERROR_CODES:" |
3449 | - if DEBUG: print ERROR_CODES |
3450 | - if DEBUG: print "VALID_DATASTORE:" |
3451 | - if DEBUG: print VALID_DATASTORES |
3452 | - if db_type in VALID_DATASTORES: |
3453 | - logger.debug("Using datastore %s" % db_type) |
3454 | - else: |
3455 | - print "Unknown datastore "+ db_type |
3456 | - exit(1) |
3457 | - |
3458 | - tableServer = SOAPpy.SOAPProxy("https://" + soapServer + ":" + str(keyPort)) |
3459 | - |
3460 | - # # Bind Port # |
3461 | - #server = AppScaleSecureServer( ('',DEFAULT_SSL_PORT), |
3462 | - # AppScaleSecureHandler, cert_file, key_file ) |
3463 | - #help(ThreadedHTTPServer) |
3464 | - global local_server_address |
3465 | - global HandlerClass |
3466 | - global ssl_cert_file |
3467 | - global ssl_key_file |
3468 | - global keyDictionaryLock |
3469 | - |
3470 | - keyDictionaryLock = threading.Lock() |
3471 | - if port == DEFAULT_SSL_PORT and not isEncrypted: |
3472 | - port = DEFAULT_PORT |
3473 | - local_server_address = ('',port) |
3474 | - HandlerClass = AppScaleSecureHandler |
3475 | - ssl_cert_file = cert_file |
3476 | - ssl_key_file = key_file |
3477 | - if isEncrypted: |
3478 | - server = AppScaleSecureServerThreaded() |
3479 | - else: |
3480 | - server = AppScaleUnSecureServerThreaded(local_server_address, HandlerClass) |
3481 | - sa = server.socket.getsockname() |
3482 | - if not db_type == "timesten": |
3483 | - # Stop running as root, security purposes # |
3484 | - drop_privileges() |
3485 | - logger.debug("\n\nStarting AppScale-Secure-Server on %s:%s" % (sa[0], sa[1])) |
3486 | - |
3487 | - while 1: |
3488 | - try: |
3489 | - # Start Server # |
3490 | - server.serve_forever() |
3491 | - except SSL.SSLError: |
3492 | - logger.debug("\n\nUnexcepted input for AppScale-Secure-Server on %s:%s" % (sa[0], sa[1])) |
3493 | - except KeyboardInterrupt: |
3494 | - server.socket.close() |
3495 | - print "Server interrupted by user, terminating..." |
3496 | - exit(1) |
3497 | - |
3498 | -if __name__ == '__main__': |
3499 | - #cProfile.run("main(sys.argv[1:])") |
3500 | - main(sys.argv[1:]) |
3501 | |
3502 | === removed file 'AppDB/as_transaction.py' |
3503 | --- AppDB/as_transaction.py 2010-05-10 21:08:10 +0000 |
3504 | +++ AppDB/as_transaction.py 1970-01-01 00:00:00 +0000 |
3505 | @@ -1,241 +0,0 @@ |
3506 | -import threading |
3507 | -import time |
3508 | -import sys |
3509 | -from google.net.proto import ProtocolBuffer |
3510 | -from google.appengine.datastore import datastore_pb |
3511 | -from google.appengine.runtime import apiproxy_errors |
3512 | -GC_PERIOD = 100 |
3513 | -TRANS_TIMEOUT = 60 |
3514 | -class ASTransaction: |
3515 | - def __init__(self, handle, entity_group = None, lease = 0): |
3516 | - self.handle = handle |
3517 | - self.group = entity_group |
3518 | - self.lease = lease |
3519 | - if lease == 0: |
3520 | - self.lease = time.time() + TRANS_TIMEOUT |
3521 | - self.change_set = None |
3522 | - |
3523 | - |
3524 | - def addModifiedKey(self, key): |
3525 | - if not self.change_set: |
3526 | - self.change_set = set() |
3527 | - self.change_set.add(key) |
3528 | - |
3529 | - |
3530 | - def getModifiedKeysCopy(self): |
3531 | - if self.change_set == None: |
3532 | - return None |
3533 | - return self.change_set.copy() |
3534 | - |
3535 | - |
3536 | - def __str__(self): |
3537 | - s = "handle: " + str(self.handle) + "\n" |
3538 | - s += "group: " + str(self.group) + "\n" |
3539 | - s += "lease:" + str(self.lease) + "\n" |
3540 | - s += "change set:" + str(self.change_set) |
3541 | - return s |
3542 | - |
3543 | - |
3544 | - def __def__(self): |
3545 | - del self.change_set |
3546 | - |
3547 | - |
3548 | - |
3549 | -#Thread safe set |
3550 | -class ASTransSet: |
3551 | - def __init__(self): |
3552 | - self._txes = {} |
3553 | - self._txes_lock = threading.Lock() |
3554 | - self.startGC() |
3555 | - |
3556 | - def startGC(self): |
3557 | - self.gcthread = threading.Thread(target = self.__gcRunner) |
3558 | - self.gcthread.start() |
3559 | - |
3560 | - def __gcRunner(self): |
3561 | - |
3562 | - while 1: |
3563 | - time.sleep(GC_PERIOD) |
3564 | - del_list = [] |
3565 | - cur_time = time.time() |
3566 | - self._txes_lock.acquire() |
3567 | - for ii in self._txes: |
3568 | - if self._txes[ii].lease <= cur_time: |
3569 | - del_list.append(ii) |
3570 | - # iterate backwards to make deletes safe |
3571 | - del_list.reverse() |
3572 | - for ii in del_list: |
3573 | - del self._txes[ii] |
3574 | - self._txes_lock.release() |
3575 | - |
3576 | - |
3577 | - # Takes a transaction_pb |
3578 | - # Checks to see if the transaction exist and if |
3579 | - # it has timed out |
3580 | - def isValid(self, txn): |
3581 | - handle = None |
3582 | - if not txn.has_handle(): |
3583 | - return False |
3584 | - |
3585 | - handle = txn.handle() |
3586 | - curtime = time.time() |
3587 | - self._txes_lock.acquire() |
3588 | - if handle not in self._txes: |
3589 | - self._txes_lock.release() |
3590 | - return False |
3591 | - else: |
3592 | - if self._txes[handle].lease <= curtime: |
3593 | - self._txes_lock.release() |
3594 | - return False |
3595 | - self._txes_lock.release() |
3596 | - |
3597 | - return True |
3598 | - |
3599 | - |
3600 | - # Takes a transaction_pb |
3601 | - def needsLock(self, txn): |
3602 | - handle = txn.handle() |
3603 | - self._txes_lock.acquire() |
3604 | - txn_copy = self._txes[handle] |
3605 | - if txn_copy.group == None: |
3606 | - self._txes_lock.release() |
3607 | - return True |
3608 | - else: |
3609 | - self._txes_lock.release() |
3610 | - return False |
3611 | - |
3612 | - |
3613 | - def hasLockExpired(self, txn): |
3614 | - cur_time = time.time() |
3615 | - handle = txn.handle() |
3616 | - self._txes_lock.acquire() |
3617 | - txn_copy = self._txes[handle] |
3618 | - if txn_copy.lease <= cur_time: |
3619 | - print "Cur",str(cur_time),"lease",str(txn_copy.lease) |
3620 | - self._txes_lock.release() |
3621 | - return True |
3622 | - else: |
3623 | - self._txes_lock.release() |
3624 | - return False |
3625 | - |
3626 | - |
3627 | - # Takes a transaction_pb |
3628 | - def purge(self, txn): |
3629 | - if not txn: |
3630 | - return |
3631 | - handle = txn.handle() |
3632 | - self._txes_lock.acquire() |
3633 | - if handle in self._txes: |
3634 | - del self._txes[handle] |
3635 | - self._txes_lock.release() |
3636 | - |
3637 | - |
3638 | - # Takes a transaction_pb |
3639 | - def getGroup(self, txn): |
3640 | - handle = txn.handle() |
3641 | - group = None |
3642 | - self._txes_lock.acquire() |
3643 | - if handle in self._txes: |
3644 | - group = self._txes[handle].group |
3645 | - self._txes_lock.release() |
3646 | - return group |
3647 | - |
3648 | - # Takes a transaction_pb |
3649 | - def add(self, txn): |
3650 | - handle = txn.handle() |
3651 | - tx = ASTransaction(handle) |
3652 | - self._txes_lock.acquire() |
3653 | - if handle in self._txes: |
3654 | - self._txes_lock.release() |
3655 | - return False |
3656 | - self._txes[handle] = tx |
3657 | - self._txes_lock.release() |
3658 | - return True |
3659 | - |
3660 | - |
3661 | - def setLease(self, txn, lease): |
3662 | - handle = txn.handle() |
3663 | - self._txes_lock.acquire() |
3664 | - txn_copy = self._txes[handle] |
3665 | - txn_copy.lease = lease |
3666 | - self._txes[handle] = txn_copy |
3667 | - self._txes_lock.release() |
3668 | - |
3669 | - |
3670 | - def addChangeSet(self, txn, key): |
3671 | - handle = txn.handle() |
3672 | - self._txes_lock.acquire() |
3673 | - txn_copy = self._txes[handle] |
3674 | - txn_copy.addModifiedKey(key) |
3675 | - self._txes[handle] = txn_copy |
3676 | - self._txes_lock.release() |
3677 | - |
3678 | - |
3679 | - def getChangeSet(self, txn): |
3680 | - handle = txn.handle() |
3681 | - self._txes_lock.acquire() |
3682 | - txn_copy = self._txes[handle] |
3683 | - change_set = txn_copy.getModifiedKeysCopy() |
3684 | - self._txes_lock.release() |
3685 | - return change_set |
3686 | - |
3687 | - |
3688 | - def setGroup(self, txn, group): |
3689 | - handle = txn.handle() |
3690 | - self._txes_lock.acquire() |
3691 | - txn_copy = self._txes[handle] |
3692 | - txn_copy.group = group |
3693 | - self._txes[handle] = txn_copy |
3694 | - self._txes_lock.release() |
3695 | - |
3696 | - def printSet(self): |
3697 | - print "=============================" |
3698 | - for ii in self._txes: |
3699 | - print self._txes[ii] |
3700 | - print "=============================" |
3701 | - |
3702 | - |
3703 | -###################### |
3704 | -# Unit testing |
3705 | -###################### |
3706 | -def main(argv): |
3707 | - tran_pb = datastore_pb.Transaction() |
3708 | - tran_pb.set_handle(500) |
3709 | - tran_set = ASTransSet() |
3710 | - tran_set.add(tran_pb) |
3711 | - print "Needs the lock without the group set:" + str(tran_set.needsLock(tran_pb)) |
3712 | - tran_set.setGroup(tran_pb, "my_group") |
3713 | - print "Does not Need lock with the group set:" + str(tran_set.needsLock(tran_pb)) |
3714 | - tran_set.setLease(tran_pb, time.time() + .02) |
3715 | - tran_set.addChangeSet(tran_pb, "first key") |
3716 | - tran_set.addChangeSet(tran_pb, "second key") |
3717 | - tran_set.printSet() |
3718 | - print "Has the lock expired (no)?:" + str(tran_set.hasLockExpired(tran_pb)) |
3719 | - time.sleep(1) |
3720 | - print "Has the lock expired (yes)?:" + str(tran_set.hasLockExpired(tran_pb)) |
3721 | - print "******************" |
3722 | - |
3723 | - tran_pb = datastore_pb.Transaction() |
3724 | - tran_pb.set_handle(600) |
3725 | - tran_set.add(tran_pb) |
3726 | - print "Needs the lock without the group set:" + str(tran_set.needsLock(tran_pb)) |
3727 | - tran_set.setGroup(tran_pb, "my_group") |
3728 | - print "Does not Need lock with the group set:" + str(tran_set.needsLock(tran_pb)) |
3729 | - print "Group is: " + str(tran_set.getGroup(tran_pb)) |
3730 | - tran_set.printSet() |
3731 | - print "******************" |
3732 | - print "Removing 600" |
3733 | - print "******************" |
3734 | - |
3735 | - tran_set.purge(tran_pb) |
3736 | - tran_set.printSet() |
3737 | - tran_pb.set_handle(500) |
3738 | - print "500 valid (yes) ?: " + str(tran_set.isValid(tran_pb)) |
3739 | - change_set = tran_set.getChangeSet(tran_pb) |
3740 | - print "Change set: " + str(change_set) |
3741 | - tran_set.purge(tran_pb) |
3742 | - print "Change set after purge (same): " + str(change_set) |
3743 | - |
3744 | - |
3745 | -if __name__ == '__main__': |
3746 | - main(sys.argv[1:]) |
3747 | |
3748 | === modified file 'AppDB/cassandra/py_cassandra.py' |
3749 | --- AppDB/cassandra/py_cassandra.py 2010-05-21 07:13:04 +0000 |
3750 | +++ AppDB/cassandra/py_cassandra.py 2010-06-29 01:49:23 +0000 |
3751 | @@ -1,11 +1,9 @@ |
3752 | # |
3753 | # Cassandra Interface for AppScale |
3754 | -# |
3755 | +# Rewritten by Navraj Chohan for using range queries |
3756 | # Modified by Chris Bunch for upgrade to Cassandra 0.50.0 |
3757 | # on 2/17/10 |
3758 | - |
3759 | -__author__="Soo Hwan Park (suwanny@gmail.com)" |
3760 | -__date__="$2009.6.4 19:44:00$" |
3761 | +# Original author: suwanny@gmail.com |
3762 | |
3763 | import os,sys |
3764 | import time |
3765 | @@ -14,11 +12,9 @@ |
3766 | from thrift_cass.ttypes import * |
3767 | |
3768 | import string |
3769 | -import sys, os |
3770 | import base64 # base64 2009.04.16 |
3771 | from dbconstants import * |
3772 | from dbinterface import * |
3773 | -from dhash_datastore import * |
3774 | import sqlalchemy.pool as pool |
3775 | import appscale_logger |
3776 | |
3777 | @@ -26,9 +22,12 @@ |
3778 | from thrift.transport import TSocket |
3779 | from thrift.transport import TTransport |
3780 | from thrift.protocol import TBinaryProtocol |
3781 | - |
3782 | ERROR_DEFAULT = "DB_ERROR:" # ERROR_CASSANDRA |
3783 | - |
3784 | +# Store all schema information in a special table |
3785 | +# If a table does not show up in this table, try a range query |
3786 | +# to discover it's schema |
3787 | +SCHEMA_TABLE = "__key__" |
3788 | +SCHEMA_TABLE_SCHEMA = ['schema'] |
3789 | # use 1 Table and 1 ColumnFamily in Cassandra |
3790 | MAIN_TABLE = "Keyspace1" |
3791 | COLUMN_FAMILY = "Standard1" |
3792 | @@ -44,71 +43,219 @@ |
3793 | CONSISTENCY_QUORUM = 2 |
3794 | CONSISTENCY_ALL = 5 # don't use this for reads (next version may fix this) |
3795 | |
3796 | -class DatastoreProxy(DHashDatastore): |
3797 | +MAX_ROW_COUNT = 10000000 |
3798 | +table_cache = {} |
3799 | +class DatastoreProxy(AppDBInterface): |
3800 | def __init__(self, logger = appscale_logger.getLogger("datastore-cassandra")): |
3801 | - DHashDatastore.__init__(self, logger) |
3802 | # TODO: is this correct? |
3803 | f = open(APPSCALE_HOME + '/.appscale/my_public_ip', 'r') |
3804 | self.host = f.read() |
3805 | # self.host = host |
3806 | self.port = DEFAULT_PORT |
3807 | - self.logger.debug("Cassandra connection init to %s:%d" % (self.host, self.port)) |
3808 | self.pool = pool.QueuePool(self.__create_connection, reset_on_return=False) |
3809 | + self.logger = logger |
3810 | |
3811 | def logTiming(self, function, start_time, end_time): |
3812 | if PROFILING: |
3813 | self.logger.debug(function + ": " + str(end_time - start_time) + " s") |
3814 | - |
3815 | - ###################################################################### |
3816 | - # Cassandra specific methods |
3817 | - ###################################################################### |
3818 | - |
3819 | - def get(self, key, column = DEFAULT_VALUE): |
3820 | - value = None |
3821 | - client = None |
3822 | - try: |
3823 | - path = ColumnPath(COLUMN_FAMILY, column=column) |
3824 | - client = self.__setup_connection() |
3825 | - response = client.get(MAIN_TABLE, key, path, CONSISTENCY_QUORUM) |
3826 | -# transport.close() |
3827 | - |
3828 | - value = response.column.value |
3829 | - except NotFoundException: # occurs normally if the item isn't in the db |
3830 | - return value |
3831 | - except Exception, ex: |
3832 | - self.logger.debug("Exception %s" % ex) |
3833 | - self.__close_connection(client) |
3834 | - return value |
3835 | - |
3836 | - def put(self, key, value, column = DEFAULT_VALUE): |
3837 | - client = None |
3838 | - try: |
3839 | - path = ColumnPath(COLUMN_FAMILY, column=column) |
3840 | - client = self.__setup_connection() |
3841 | - client.insert(MAIN_TABLE, key, path, value, self.timestamp(), CONSISTENCY_QUORUM) |
3842 | -# transport.close() |
3843 | - except Exception, e : |
3844 | - self.logger.debug("put key:%s, column: %s value:%s" % (key, column, value)) |
3845 | - self.__close_connection(client) |
3846 | - |
3847 | - def remove(self, key, column = ""): |
3848 | - client = None |
3849 | - try: |
3850 | - client = self.__setup_connection() |
3851 | - |
3852 | - # FIXME: cgb: you'd think that you could just remove the if / else |
3853 | - # here, but for some reason it messes up delete if you do |
3854 | - # look into why that is |
3855 | - if column: |
3856 | - path = ColumnPath(COLUMN_FAMILY, column=column) |
3857 | - client.remove(MAIN_TABLE, key, path, self.timestamp(), CONSISTENCY_QUORUM) |
3858 | - else: |
3859 | - path = ColumnPath(COLUMN_FAMILY) |
3860 | - client.remove(MAIN_TABLE, key, path, self.timestamp(), CONSISTENCY_QUORUM) |
3861 | - |
3862 | - except Exception, ex: |
3863 | - self.logger.debug("Exception %s" % ex) |
3864 | - self.__close_connection(client) |
3865 | + |
3866 | + def get_entity(self, table_name, row_key, column_names): |
3867 | + error = [ERROR_DEFAULT] |
3868 | + list = error |
3869 | + client = None |
3870 | + row_key = table_name + '/' + row_key |
3871 | + try: |
3872 | + slice_predicate = SlicePredicate(column_names=column_names) |
3873 | + path = ColumnPath(COLUMN_FAMILY) |
3874 | + client = self.__setup_connection() |
3875 | + # Result is a column type which has name, value, timestamp |
3876 | + result = client.get_slice(MAIN_TABLE, row_key, path, slice_predicate, |
3877 | + CONSISTENCY_QUORUM) |
3878 | + for column in column_names: |
3879 | + for r in result: |
3880 | + c = r.column |
3881 | + if column == c.name: |
3882 | + list.append(c.value) |
3883 | + except NotFoundException: # occurs normally if the item isn't in the db |
3884 | + list[0] += "Not found" |
3885 | + self.__close_connection(client) |
3886 | + return list |
3887 | + except Exception, ex: |
3888 | + #self.logger.debug("Exception %s" % ex) |
3889 | + list[0]+=("Exception: %s"%ex) |
3890 | + self.__close_connection(client) |
3891 | + return list |
3892 | + self.__close_connection(client) |
3893 | + if len(list) == 1: |
3894 | + list[0] += "Not found" |
3895 | + return list |
3896 | + |
3897 | + |
3898 | + def put_entity(self, table_name, row_key, column_names, cell_values): |
3899 | + error = [ERROR_DEFAULT] |
3900 | + list = error |
3901 | + client = None |
3902 | + |
3903 | + # The first time a table is seen |
3904 | + if table_name not in table_cache: |
3905 | + self.create_table(table_name, column_names) |
3906 | + |
3907 | + row_key = table_name + '/' + row_key |
3908 | + try: |
3909 | + client = self.__setup_connection() |
3910 | + curtime = self.timestamp() |
3911 | + # Result is a column type which has name, value, timestamp |
3912 | + mutations = [] |
3913 | + for index, ii in enumerate(column_names): |
3914 | + column = Column(name = ii, value=cell_values[index], |
3915 | + timestamp=curtime) |
3916 | + c_or_sc = ColumnOrSuperColumn(column=column) |
3917 | + mutation = Mutation(column_or_supercolumn=c_or_sc) |
3918 | + mutations.append(mutation) |
3919 | + mutation_map = {row_key : { COLUMN_FAMILY : mutations } } |
3920 | + client.batch_mutate(MAIN_TABLE, mutation_map, |
3921 | + CONSISTENCY_QUORUM) |
3922 | + except Exception, ex: |
3923 | + self.logger.debug("Exception %s" % ex) |
3924 | + list[0]+=("Exception: %s"%ex) |
3925 | + self.__close_connection(client) |
3926 | + list.append("0") |
3927 | + return list |
3928 | + self.__close_connection(client) |
3929 | + list.append("0") |
3930 | + return list |
3931 | + |
3932 | + def put_entity_dict(self, table_name, row_key, value_dict): |
3933 | + raise NotImplementedError("put_entity_dict is not implemented in %s." % self.__class__) |
3934 | + |
3935 | + |
3936 | + def get_table(self, table_name, column_names): |
3937 | + error = [ERROR_DEFAULT] |
3938 | + client = None |
3939 | + result = error |
3940 | + keyslices = [] |
3941 | + column_parent = ColumnParent(column_family="Standard1") |
3942 | + predicate = SlicePredicate(column_names=column_names) |
3943 | + start_key = table_name |
3944 | + end_key = table_name + '~' |
3945 | + try: |
3946 | + client = self.__setup_connection() |
3947 | + keyslices = client.get_range_slice(MAIN_TABLE, |
3948 | + column_parent, |
3949 | + predicate, |
3950 | + start_key, |
3951 | + end_key, |
3952 | + MAX_ROW_COUNT, |
3953 | + CONSISTENCY_QUORUM) |
3954 | + except Exception, ex: |
3955 | + self.logger.debug("Exception %s" % ex) |
3956 | + result[0] += "Exception: " + str(ex) |
3957 | + self.__close_connection(client) |
3958 | + return result |
3959 | + for keyslice in keyslices: |
3960 | + ordering_dict = {} |
3961 | + for c in keyslice.columns: |
3962 | + column = c.column |
3963 | + value = column.value |
3964 | + ordering_dict[column.name] = value |
3965 | + if len(ordering_dict) == 0: |
3966 | + continue |
3967 | + for column in column_names: |
3968 | + try: |
3969 | + result.append(ordering_dict[column]) |
3970 | + except: |
3971 | + result[0] += "Key error, get_table did not return the correct schema" |
3972 | + self.__close_connection(client) |
3973 | + return result |
3974 | + |
3975 | + def delete_row(self, table_name, row_key): |
3976 | + error = [ERROR_DEFAULT] |
3977 | + ret = error |
3978 | + client = None |
3979 | + row_key = table_name + '/' + row_key |
3980 | + path = ColumnPath(COLUMN_FAMILY) |
3981 | + try: |
3982 | + client = self.__setup_connection() |
3983 | + curtime = self.timestamp() |
3984 | + # Result is a column type which has name, value, timestamp |
3985 | + client.remove(MAIN_TABLE, row_key, path, curtime, |
3986 | + CONSISTENCY_QUORUM) |
3987 | + except Exception, ex: |
3988 | + self.logger.debug("Exception %s" % ex) |
3989 | + ret[0]+=("Exception: %s"%ex) |
3990 | + self.__close_connection(client) |
3991 | + return ret |
3992 | + self.__close_connection(client) |
3993 | + ret.append("0") |
3994 | + return ret |
3995 | + |
3996 | + def get_schema(self, table_name): |
3997 | + error = [ERROR_DEFAULT] |
3998 | + result = error |
3999 | + ret = self.get_entity(SCHEMA_TABLE, |
4000 | + table_name, |
4001 | + SCHEMA_TABLE_SCHEMA) |
4002 | + if len(ret) > 1: |
4003 | + schema = ret[1] |
4004 | + else: |
4005 | + error[0] = ret[0] + "--unable to get schema" |
4006 | + return error |
4007 | + schema = schema.split(':') |
4008 | + result = result + schema |
4009 | + return result |
4010 | + |
4011 | + |
4012 | + def delete_table(self, table_name): |
4013 | + error = [ERROR_DEFAULT] |
4014 | + result = error |
4015 | + keyslices = [] |
4016 | + column_parent = ColumnParent(column_family="Standard1") |
4017 | + predicate = SlicePredicate(column_names=[]) |
4018 | + curtime = self.timestamp() |
4019 | + path = ColumnPath(COLUMN_FAMILY) |
4020 | + start_key = table_name |
4021 | + end_key = table_name + '~' |
4022 | + try: |
4023 | + client = self.__setup_connection() |
4024 | + keyslices = client.get_range_slice(MAIN_TABLE, |
4025 | + column_parent, |
4026 | + predicate, |
4027 | + start_key, |
4028 | + end_key, |
4029 | + MAX_ROW_COUNT, |
4030 | + CONSISTENCY_QUORUM) |
4031 | + except Exception, ex: |
4032 | + self.logger.debug("Exception %s" % ex) |
4033 | + result[0]+=("Exception: %s"%ex) |
4034 | + self.__close_connection(client) |
4035 | + return result |
4036 | + keys_removed = False |
4037 | + for keyslice in keyslices: |
4038 | + row_key = keyslice.key |
4039 | + client.remove(MAIN_TABLE, |
4040 | + row_key, |
4041 | + path, |
4042 | + curtime, |
4043 | + CONSISTENCY_QUORUM) |
4044 | + keys_removed = True |
4045 | + if table_name not in table_cache and keys_removed: |
4046 | + result[0] += "Table does not exist" |
4047 | + return result |
4048 | + if table_name in table_cache: |
4049 | + del table_cache[table_name] |
4050 | + |
4051 | + self.__close_connection(client) |
4052 | + return result |
4053 | + |
4054 | + # Only stores the schema |
4055 | + def create_table(self, table_name, column_names): |
4056 | + table_cache[table_name] = 1 |
4057 | + columns = ':'.join(column_names) |
4058 | + row_key = table_name |
4059 | + # Get and make sure we are not overwriting previous schemas |
4060 | + ret = self.get_entity(SCHEMA_TABLE, row_key, SCHEMA_TABLE_SCHEMA) |
4061 | + if ret[0] != ERROR_DEFAULT: |
4062 | + self.put_entity(SCHEMA_TABLE, row_key, SCHEMA_TABLE_SCHEMA, [columns]) |
4063 | |
4064 | ###################################################################### |
4065 | # private methods |
4066 | |
4067 | === modified file 'AppDB/cassandra/templates/storage-conf.xml' |
4068 | --- AppDB/cassandra/templates/storage-conf.xml 2010-04-21 11:16:18 +0000 |
4069 | +++ AppDB/cassandra/templates/storage-conf.xml 2010-06-29 01:49:23 +0000 |
4070 | @@ -178,7 +178,7 @@ |
4071 | ~ directories, since the partitioner can modify the sstable on-disk |
4072 | ~ format. |
4073 | --> |
4074 | - <Partitioner>org.apache.cassandra.dht.RandomPartitioner</Partitioner> |
4075 | + <Partitioner>org.apache.cassandra.dht.OrderPreservingPartitioner</Partitioner> |
4076 | |
4077 | <!-- |
4078 | ~ If you are using an order-preserving partitioner and you know your key |
4079 | |
4080 | === modified file 'AppDB/datastore_tester.py' |
4081 | --- AppDB/datastore_tester.py 2009-12-02 20:15:14 +0000 |
4082 | +++ AppDB/datastore_tester.py 2010-06-29 01:49:23 +0000 |
4083 | @@ -41,10 +41,10 @@ |
4084 | #print "columns= " + str(columns) |
4085 | #print "data= " + str(data) |
4086 | print "table= " + table_name |
4087 | -app_datastore = appscale_datastore.Datastore(datastore_type) |
4088 | - |
4089 | -ERROR_CODES = app_datastore.error_codes() |
4090 | -if datastore_type not in app_datastore.valid_datastores(): |
4091 | +app_datastore = appscale_datastore.DatastoreFactory.getDatastore(datastore_type) |
4092 | +ERROR_CODES = appscale_datastore.DatastoreFactory.error_codes() |
4093 | +VALID_DATASTORES = appscale_datastore.DatastoreFactory.valid_datastores() |
4094 | +if datastore_type not in VALID_DATASTORES: |
4095 | print "Bad selection for datastore. Valid selections are:" |
4096 | print app_datastore.valid_datastores() |
4097 | exit(1) |
4098 | @@ -127,10 +127,12 @@ |
4099 | ################################################# |
4100 | # Get and a delete on a table that does not exist |
4101 | ################################################# |
4102 | +# There is too much overhead in checking to see if the table exists |
4103 | +# for cassandra |
4104 | invalid_table = hf.randomString(10) |
4105 | -ret = app_datastore.delete_row(invalid_table, key) |
4106 | -if ret[0] in ERROR_CODES: |
4107 | - err(hf.lineno(), ret) |
4108 | +#ret = app_datastore.delete_row(invalid_table, key) |
4109 | +#if ret[0] in ERROR_CODES: |
4110 | +# err(hf.lineno(), ret) |
4111 | ret = app_datastore.get_entity(invalid_table, key, columns) |
4112 | if ret[0] in ERROR_CODES: |
4113 | err(hf.lineno(), ret) |
4114 | @@ -186,9 +188,19 @@ |
4115 | # Get schema on a table that exist, and one that doesnt |
4116 | ####################################################### |
4117 | ret = app_datastore.get_schema(table_name) |
4118 | -if ret[0] not in ERROR_CODES or ret[1:] != columns: |
4119 | +if ret[0] not in ERROR_CODES or (ret[1:]).sort() != columns.sort(): |
4120 | + print "ret[1:]:",ret[1:].sort |
4121 | + print "columns:",columns.sort |
4122 | err(hf.lineno(), ret) |
4123 | ret = app_datastore.get_schema(invalid_table) |
4124 | if ret[0] in ERROR_CODES: |
4125 | err(hf.lineno(), ret) |
4126 | +################################################ |
4127 | +# Get data from a table that does not exist |
4128 | +# Should return an empty list |
4129 | +################################################ |
4130 | +ret = app_datastore.get_table(invalid_table, columns) |
4131 | +if ret[0] not in ERROR_CODES and len(ret) != 1: |
4132 | + err(hf.lineno(), ret) |
4133 | + |
4134 | print "SUCCESS" |
4135 | |
4136 | === modified file 'AppDB/dbinterface.py' |
4137 | --- AppDB/dbinterface.py 2010-06-25 00:08:24 +0000 |
4138 | +++ AppDB/dbinterface.py 2010-06-29 01:49:23 +0000 |
4139 | @@ -7,19 +7,19 @@ |
4140 | import os |
4141 | |
4142 | class AppDBInterface: |
4143 | - def get_entity(self, table_name, row_key, column_names): |
4144 | + def get_entity(self, table_name, row_key, column_names, txnid = 0): |
4145 | raise NotImplementedError("get_entity is not implemented in %s." % self.__class__) |
4146 | |
4147 | - def put_entity(self, table_name, row_key, column_names, cell_values): |
4148 | + def put_entity(self, table_name, row_key, column_names, cell_values, txnid = 0): |
4149 | raise NotImplementedError("put_entity is not implemented in %s." % self.__class__) |
4150 | |
4151 | def put_entity_dict(self, table_name, row_key, value_dict): |
4152 | raise NotImplementedError("put_entity_dict is not implemented in %s." % self.__class__) |
4153 | |
4154 | - def get_table(self, table_name, column_names): |
4155 | + def get_table(self, table_name, column_names, txnid = 0): |
4156 | raise NotImplementedError("get_table is not implemented in %s." % self.__class__) |
4157 | |
4158 | - def delete_row(self, table_name, row_id): |
4159 | + def delete_row(self, table_name, row_id, txnid = 0): |
4160 | raise NotImplementedError("delete_row is not implemented in %s." % self.__class__) |
4161 | |
4162 | def get_schema(self, table_name): |
4163 | @@ -28,6 +28,14 @@ |
4164 | def delete_table(self, table_name): |
4165 | raise NotImplementedError("delete_table is not implemented in %s." % self.__class__) |
4166 | |
4167 | + def commit(self, txnid): |
4168 | + raise NotImplementedError("commit is not implemented in %s." % self.__class__) |
4169 | + def rollback(self, txnid): |
4170 | + raise NotImplementedError("rollback is not implemented in %s." % self.__class__) |
4171 | + def setupTransaction(self, txnid): |
4172 | + raise NotImplementedError("rollback is not implemented in %s." % self.__class__) |
4173 | + |
4174 | + |
4175 | def get_local_ip(self): |
4176 | try: |
4177 | local_ip = self.__local_ip |
4178 | |
4179 | === modified file 'AppDB/dhash_datastore.py' |
4180 | --- AppDB/dhash_datastore.py 2010-05-05 23:27:52 +0000 |
4181 | +++ AppDB/dhash_datastore.py 2010-06-29 01:49:23 +0000 |
4182 | @@ -217,9 +217,9 @@ |
4183 | if self.remove_key(row_id, table_name): |
4184 | self.remove(internal_key) |
4185 | return elist |
4186 | - else: |
4187 | - elist[0] += "row doesn't exist" |
4188 | - return elist |
4189 | + #else: |
4190 | + elist.append("0") |
4191 | + return elist |
4192 | |
4193 | def get_schema(self, table_name): |
4194 | self.logger.debug("get_schema: %s" % table_name) |
4195 | |
4196 | === modified file 'AppDB/hbase/py_hbase.py' |
4197 | --- AppDB/hbase/py_hbase.py 2010-05-21 07:13:04 +0000 |
4198 | +++ AppDB/hbase/py_hbase.py 2010-06-29 01:49:23 +0000 |
4199 | @@ -1,11 +1,9 @@ |
4200 | #Author: Navraj Chohan |
4201 | |
4202 | -import os,sys |
4203 | +import os |
4204 | |
4205 | import Hbase |
4206 | import ttypes |
4207 | -import string |
4208 | -import threading |
4209 | import time |
4210 | from thrift import Thrift |
4211 | from thrift.transport import TSocket |
4212 | @@ -85,15 +83,12 @@ |
4213 | column_names = client.getColumnDescriptors(table_name) |
4214 | keys = column_names.keys() |
4215 | |
4216 | - table = get_table(table_name, keys) |
4217 | + table = self.get_table(table_name, keys) |
4218 | value = (len(table) - 1)/(len(keys)) |
4219 | elist.append(value) |
4220 | except ttypes.IOError, io: |
4221 | elist[0] = elist[0] + "Row Count. IO Error--" + io.message |
4222 | value = 0 |
4223 | - except ttypes.IOException, io: |
4224 | - elist[0] = elist[0] + "Row Count. IO Exception--" + io.message |
4225 | - value = 0 |
4226 | self.__closeConnection(client) |
4227 | et = time.time() |
4228 | self.logTiming("HB ROWCOUNT", st, et) |
4229 | @@ -272,7 +267,10 @@ |
4230 | client.scannerClose(scanner) |
4231 | except ttypes.IOError, io: |
4232 | if io.message: |
4233 | - elist[0] += "IO Error--" + str(io.message) |
4234 | + if io.message == table_name: |
4235 | + pass # Return an empty table |
4236 | + else: |
4237 | + elist[0] += "IO Error--" + str(io.message) |
4238 | else: |
4239 | elist[0] += "IOError" |
4240 | except ttypes.IllegalArgument, e: |
4241 | |
4242 | === modified file 'AppDB/helper_functions.py' |
4243 | --- AppDB/helper_functions.py 2009-12-01 07:13:55 +0000 |
4244 | +++ AppDB/helper_functions.py 2010-06-29 01:49:23 +0000 |
4245 | @@ -57,7 +57,7 @@ |
4246 | |
4247 | def randomString(length): |
4248 | s = hashlib.sha256() |
4249 | - ret = "" |
4250 | + ret = "a" |
4251 | while len(ret) < length: |
4252 | s.update(str(random.random())) |
4253 | ret += s.hexdigest() |
4254 | |
4255 | === modified file 'AppDB/hypertable/py_hypertable.py' |
4256 | --- AppDB/hypertable/py_hypertable.py 2010-05-21 07:13:04 +0000 |
4257 | +++ AppDB/hypertable/py_hypertable.py 2010-06-29 01:49:23 +0000 |
4258 | @@ -142,6 +142,8 @@ |
4259 | if PROFILING: |
4260 | self.logger.debug("HT GET: %s"%str(endtime - starttime)) |
4261 | self.__closeConnection(client) |
4262 | + if len(elist) == 1: |
4263 | + elist[0] += "Not Found" |
4264 | return elist |
4265 | |
4266 | def delete_table(self, table_name): |
4267 | @@ -173,7 +175,10 @@ |
4268 | if not self.__table_exist(table_name): |
4269 | query = "create table " + table_name + "( " + ', '.join(column_names) + ")" |
4270 | print "create table query=%s" % query |
4271 | - ret = client.hql_query(query) |
4272 | + try: |
4273 | + ret = client.hql_query(query) |
4274 | + except Exception, e: |
4275 | + print e.message |
4276 | # the xml schema is not working currectly. |
4277 | # schema = "<Schema><AccessGroup name=\"default\"><ColumnFamily>" |
4278 | # for column in column_names: |
4279 | @@ -269,7 +274,8 @@ |
4280 | if res[ii].column_family in column_names: |
4281 | elist += [res[ii].value] |
4282 | except: |
4283 | - elist[0] += "Not Found" |
4284 | + pass |
4285 | + #elist[0] += "Not Found" |
4286 | endtime = time.time() |
4287 | if PROFILING: |
4288 | self.logger.debug("HT GET_TABLE: %s"%str(endtime - starttime)) |
4289 | |
4290 | === modified file 'AppDB/mongodb/py_mongodb.py' |
4291 | --- AppDB/mongodb/py_mongodb.py 2010-04-08 23:58:23 +0000 |
4292 | +++ AppDB/mongodb/py_mongodb.py 2010-06-29 01:49:23 +0000 |
4293 | @@ -74,6 +74,8 @@ |
4294 | |
4295 | elist.append("0") |
4296 | self.__close_connection(db) |
4297 | + if len(elist) == 1: |
4298 | + elist[0] += "Not found" |
4299 | return elist |
4300 | |
4301 | def create_table(self, table_name, columns): |
4302 | @@ -166,6 +168,8 @@ |
4303 | if column == schema[ii]: |
4304 | elist.append(data[ii]) |
4305 | self.__close_connection(db) |
4306 | + if len(elist) == 1: |
4307 | + elist[0] += "Not found" |
4308 | return elist |
4309 | |
4310 | def get_schema(self, table_name): |
4311 | @@ -222,8 +226,8 @@ |
4312 | usedRows.append(myRow) |
4313 | |
4314 | |
4315 | - if(len(elist) == 1): |
4316 | - elist[0] += "table not found" |
4317 | + #if len(elist) == 1: |
4318 | + # elist[0] += "table not found" |
4319 | |
4320 | self.__close_connection(db) |
4321 | return elist |
4322 | |
4323 | === modified file 'AppDB/mysql/drop_all_tables.py' |
4324 | --- AppDB/mysql/drop_all_tables.py 2009-07-20 08:49:28 +0000 |
4325 | +++ AppDB/mysql/drop_all_tables.py 2010-06-29 01:49:23 +0000 |
4326 | @@ -21,7 +21,7 @@ |
4327 | SLAVES_FILE = APPSCALE_HOME + "/.appscale/slaves" |
4328 | #DB_LOCATION = socket.gethostbyname(socket.gethostname()) |
4329 | DB_LOCATION = "127.0.0.1" |
4330 | -DATABASE = "test" |
4331 | +DATABASE = "appscale" |
4332 | def set_db_location(): |
4333 | global DB_LOCATION |
4334 | file = open(SLAVES_FILE, "r") |
4335 | @@ -34,6 +34,9 @@ |
4336 | cursor = client.cursor() |
4337 | cursor.execute("DROP DATABASE " + DATABASE) |
4338 | cursor.execute("CREATE DATABASE " + DATABASE) |
4339 | + cursor.close() |
4340 | + client.commit() |
4341 | + client.close() |
4342 | except MySQLdb.Error, e: |
4343 | print e.args[1] |
4344 | return 0 |
4345 | |
4346 | === modified file 'AppDB/mysql/prime_mysql.py' |
4347 | --- AppDB/mysql/prime_mysql.py 2010-05-27 21:59:59 +0000 |
4348 | +++ AppDB/mysql/prime_mysql.py 2010-06-29 01:49:23 +0000 |
4349 | @@ -3,18 +3,17 @@ |
4350 | |
4351 | import sys, time |
4352 | import os |
4353 | +import MySQLdb |
4354 | +import _mysql |
4355 | +import socket |
4356 | +from dbconstants import * |
4357 | + |
4358 | |
4359 | APPSCALE_HOME = os.environ.get("APPSCALE_HOME") |
4360 | if APPSCALE_HOME: |
4361 | pass |
4362 | else: |
4363 | print "APPSCALE_HOME env var not set" |
4364 | - APPSCALE_HOME = "/root/appscale/" |
4365 | - |
4366 | -import MySQLdb |
4367 | -import _mysql |
4368 | -import socket |
4369 | -from dbconstants import * |
4370 | |
4371 | ROW_KEY = "mysql__row_key__" |
4372 | SLAVES_FILE = APPSCALE_HOME + "/.appscale/slaves" |
4373 | @@ -27,13 +26,23 @@ |
4374 | DB_LOCATION = file.readline() |
4375 | file.close() |
4376 | |
4377 | +def drop_database(): |
4378 | + client = MySQLdb.connect(host=DB_LOCATION, db="mysql") |
4379 | + cursor = client.cursor() |
4380 | + cursor.execute("DROP DATABASE IF EXISTS appscale;") |
4381 | + cursor.close() |
4382 | + client.commit() |
4383 | + client.close() |
4384 | + |
4385 | def create_database(): |
4386 | client = MySQLdb.connect(host=DB_LOCATION, db="mysql") |
4387 | cursor = client.cursor() |
4388 | - cursor.execute("drop database if exists appscale;") |
4389 | - cursor.execute("create database appscale;") |
4390 | + cursor.execute("CREATE DATABASE IF NOT EXISTS appscale;") |
4391 | + cursor.close() |
4392 | + client.commit() |
4393 | client.close() |
4394 | |
4395 | + |
4396 | def create_table(tablename, columns): |
4397 | try: |
4398 | client = MySQLdb.connect(host=DB_LOCATION, db="appscale") |
4399 | @@ -43,13 +52,16 @@ |
4400 | columnscopy += ["x" + columns[ii]] |
4401 | command = "CREATE TABLE IF NOT EXISTS " + tablename + "( " + ROW_KEY + " CHAR(80) primary key, " + ' MEDIUMBLOB, '.join(columnscopy) + " MEDIUMBLOB) ENGINE=NDBCLUSTER" |
4402 | print command |
4403 | - cursor.execute(command) |
4404 | + print cursor.execute(command) |
4405 | except MySQLdb.Error, e: |
4406 | print e.args[1] |
4407 | return 0 |
4408 | + cursor.close() |
4409 | + client.commit() |
4410 | client.close() |
4411 | return 1 |
4412 | |
4413 | +drop_database() |
4414 | create_database() |
4415 | |
4416 | def prime_mysql(): |
4417 | |
4418 | === modified file 'AppDB/mysql/py_mysql.py' |
4419 | --- AppDB/mysql/py_mysql.py 2010-03-15 22:22:12 +0000 |
4420 | +++ AppDB/mysql/py_mysql.py 2010-06-29 01:49:23 +0000 |
4421 | @@ -1,16 +1,17 @@ |
4422 | # Author: Navraj Chohan |
4423 | -import sys |
4424 | +#import sys |
4425 | import os |
4426 | -import string |
4427 | import MySQLdb |
4428 | -import _mysql |
4429 | -import base64 |
4430 | -import sqlalchemy.pool as pool |
4431 | +#import _mysql |
4432 | +#import sqlalchemy.pool as pool |
4433 | from dbinterface import * |
4434 | import appscale_logger |
4435 | - |
4436 | -MySQLdb = pool.manage(MySQLdb) |
4437 | - |
4438 | +import threading |
4439 | +import time |
4440 | +#MySQLdb = pool.manage(MySQLdb) |
4441 | +TIMEOUT = 30 |
4442 | +# Time till next gc of connections |
4443 | +GC_TIME = 120 |
4444 | ROW_KEY = "mysql__row_key__" |
4445 | ERROR_MY = "DB_ERROR:" |
4446 | #DB_LOCATION = "appscale-image" |
4447 | @@ -18,22 +19,47 @@ |
4448 | DB_LOCATION = "127.0.0.1" |
4449 | #DB_PORT = 3306 |
4450 | DEBUG = False |
4451 | - |
4452 | +transDict = {} |
4453 | +transDict_lock = threading.Lock() |
4454 | +last_gc_time = 0 |
4455 | class DatastoreProxy(AppDBInterface): |
4456 | |
4457 | def __init__(self, log = appscale_logger.getLogger("datastore-mysql")): |
4458 | self.logger = log |
4459 | self.client = None |
4460 | + self.transactionsOn = False |
4461 | + |
4462 | + |
4463 | + def commit(self, txnid): |
4464 | + elist = [ERROR_MY] |
4465 | + try: |
4466 | + cursor, client = self.__get_connection(txnid) |
4467 | + cursor.close() |
4468 | + client.commit() |
4469 | + self.__close_connection(txnid) |
4470 | + except MySQLdb.Error, e: |
4471 | + if DEBUG: self.logger.info(str(e.args[0]) + "--" + e.args[1]) |
4472 | + elist[0] = ERROR_MY + str(e.args[0]) + "--" + e.args[1] |
4473 | + return elist |
4474 | + |
4475 | + def rollback(self, txnid): |
4476 | + elist = [ERROR_MY] |
4477 | + try: |
4478 | + cursor, client = self.__get_connection(txnid) |
4479 | + cursor.close() |
4480 | + client.rollback() |
4481 | + self.__close_connection(txnid) |
4482 | + except MySQLdb.Error, e: |
4483 | + elist[0] = ERROR_MY + str(e.args[0]) + "--" + e.args[1] |
4484 | + return elist |
4485 | |
4486 | def get_schema(self, table_name): |
4487 | table_name = "x" + table_name |
4488 | elist = [ERROR_MY] |
4489 | client = None |
4490 | try: |
4491 | - client = self.__get_connection() |
4492 | - client.autocommit(1) |
4493 | + client = MySQLdb.connect(host=DB_LOCATION, db=USE_DATABASE) |
4494 | cursor = client.cursor() |
4495 | -# cursor.execute(USE_DATABASE) |
4496 | command = "SHOW fields FROM " + table_name |
4497 | cursor.execute(command) |
4498 | while (1): |
4499 | @@ -50,6 +76,7 @@ |
4500 | if DEBUG: print str(e.args[0]) + "--" + e.args[1] |
4501 | elist[0] = ERROR_MY + "Unable to get schema" |
4502 | if client: |
4503 | + client.commit() |
4504 | client.close() |
4505 | return elist |
4506 | |
4507 | @@ -58,10 +85,8 @@ |
4508 | elist = [ERROR_MY] |
4509 | client = None |
4510 | try: |
4511 | - client = self.__get_connection() |
4512 | - client.autocommit(1) |
4513 | + client = MySQLdb.connect(host=DB_LOCATION, db=USE_DATABASE) |
4514 | cursor = client.cursor() |
4515 | -# cursor.execute(USE_DATABASE) |
4516 | command = "drop table " + table_name |
4517 | if DEBUG: self.logger.info(command) |
4518 | cursor.execute(command) |
4519 | @@ -69,23 +94,29 @@ |
4520 | elist[0] += str(e.args[0]) + "--" + e.args[1] |
4521 | if DEBUG: self.logger.info(elist) |
4522 | if client: |
4523 | + client.commit() |
4524 | client.close() |
4525 | return elist |
4526 | |
4527 | - def get_entity(self, table_name, row_key, column_names): |
4528 | + def get_entity(self, table_name, row_key, column_names, txnid = 0): |
4529 | table_name = "x" + table_name |
4530 | elist = [ERROR_MY] |
4531 | client = None |
4532 | + isTrans = False |
4533 | + if txnid != 0 and self.transactionsOn: |
4534 | + isTrans = True |
4535 | if not row_key: |
4536 | self.logger.info("Null row key") |
4537 | elist[0] += "Null row key" |
4538 | return elist |
4539 | |
4540 | try: |
4541 | - client = self.__get_connection() |
4542 | - client.autocommit(1) |
4543 | - cursor = client.cursor() |
4544 | -# cursor.execute(USE_DATABASE) |
4545 | + if not isTrans: |
4546 | + client = MySQLdb.connect(host=DB_LOCATION, db=USE_DATABASE) |
4547 | + cursor = client.cursor() |
4548 | + else: |
4549 | + cursor, client = self.__get_connection(txnid) |
4550 | + |
4551 | command = "select " |
4552 | # Hacking on a x to make sure all columns start with a letter |
4553 | columncopy = [] |
4554 | @@ -99,32 +130,35 @@ |
4555 | cursor.execute(command) |
4556 | result = cursor.fetchone() |
4557 | if result == None: |
4558 | - client.close() |
4559 | + if not isTrans: |
4560 | + client.close() |
4561 | if len(elist) == 1: |
4562 | elist[0] += "Not found" |
4563 | return elist |
4564 | for ii in range(0, len(result)): |
4565 | - #elist += [result[ii]] |
4566 | if result[ii]: |
4567 | - #elist += [base64.b64decode(result[ii])] |
4568 | elist.append(result[ii]) |
4569 | else: |
4570 | elist.append('') |
4571 | except MySQLdb.Error, e: |
4572 | if e.args[1].find("exists") == -1: |
4573 | - client.close() |
4574 | + if not isTrans: |
4575 | + client.close() |
4576 | if len(elist) == 1: |
4577 | elist[0] += "Not found" |
4578 | return elist |
4579 | elist[0] = ERROR_MY + str(e.args[0]) + "--" + e.args[1] |
4580 | if DEBUG: self.logger.info(elist) |
4581 | - if client: |
4582 | + |
4583 | + if client and not isTrans: |
4584 | client.close() |
4585 | if len(elist) == 1: |
4586 | elist[0] += "Not found" |
4587 | return elist |
4588 | |
4589 | - def put_entity(self, table_name, row_key, column_names, cell_values): |
4590 | + def put_entity(self, table_name, row_key, column_names, cell_values, txnid = 0): |
4591 | + # Hacking on a x to make sure all columns start with a letter |
4592 | + # Mysql limitation |
4593 | table_name = "x" + table_name |
4594 | if DEBUG: self.logger.info("PUT ENTITY") |
4595 | if DEBUG: self.logger.info(str(cell_values)) |
4596 | @@ -135,28 +169,43 @@ |
4597 | self.logger.info("Null row key") |
4598 | elist[0] += "Null row key" |
4599 | return elist |
4600 | - # Hacking on a x to make sure all columns start with a letter |
4601 | columncopy = [] |
4602 | for ii in range(0, len(column_names)): |
4603 | columncopy.append("x" + column_names[ii]) |
4604 | - try: |
4605 | - client = self.__get_connection() |
4606 | - client.autocommit(1) |
4607 | - cursor = client.cursor() |
4608 | -# cursor.execute(USE_DATABASE) |
4609 | + |
4610 | + # TODO This should be reactive, if a put fails due to no table, create it |
4611 | + try: |
4612 | + tempclient = MySQLdb.connect(host=DB_LOCATION, db=USE_DATABASE) |
4613 | + query = "CREATE TABLE IF NOT EXISTS " + table_name + " ( " + ROW_KEY + " CHAR(255) primary key, " + ' BLOB, '.join(columncopy) + " BLOB) ENGINE=NDBCLUSTER" |
4614 | + tempcursor = tempclient.cursor() |
4615 | + if DEBUG: self.logger.info(query) |
4616 | + result = tempcursor.execute(query) |
4617 | + if DEBUG: self.logger.info("DONE CREATING TABLE...%s",str(result)) |
4618 | + except MySQLdb.Error, e: |
4619 | + error = ERROR_MY + str(e.args[0]) + "--" + e.args[1] |
4620 | + print error |
4621 | + tempcursor.close() |
4622 | + tempclient.commit() |
4623 | + tempclient.close() |
4624 | + |
4625 | + isTrans = False |
4626 | + if txnid != 0 and self.transactionsOn: |
4627 | + isTrans = True |
4628 | + try: |
4629 | + if not isTrans: |
4630 | + client = MySQLdb.connect(host=DB_LOCATION, db=USE_DATABASE) |
4631 | + cursor = client.cursor() |
4632 | + else: |
4633 | + cursor, client = self.__get_connection(txnid) |
4634 | |
4635 | if len(column_names) != len(cell_values): |
4636 | elist[0] += "Error in put call |column_names| != |cell_values|" |
4637 | - client.close() |
4638 | + if not isTrans: |
4639 | + client.close() |
4640 | return elist |
4641 | - query = "CREATE TABLE IF NOT EXISTS " + table_name + " ( " + ROW_KEY + " CHAR(255) primary key, " + ' BLOB, '.join(columncopy) + " BLOB) ENGINE=NDBCLUSTER" |
4642 | - if DEBUG: self.logger.info(query) |
4643 | - result = cursor.execute(query) |
4644 | - if DEBUG: self.logger.info("DONE CREATING TABLE...%s",str(result)) |
4645 | values = [] |
4646 | for ii in range(0, len(cell_values)): |
4647 | if cell_values[ii]: |
4648 | - #value = base64.b64encode(cell_values[ii]) |
4649 | value = cell_values[ii] |
4650 | values.append(MySQLdb.escape_string(value)) |
4651 | else: |
4652 | @@ -178,8 +227,6 @@ |
4653 | command += "%s)" |
4654 | if DEBUG: self.logger.info(command) |
4655 | cursor.execute(command, tuple(cell_values + [MySQLdb.escape_string(row_key)])) |
4656 | - cursor.close() |
4657 | - cursor = "" |
4658 | else: |
4659 | # do an update |
4660 | for ii in range(0, len(cell_values)): |
4661 | @@ -196,13 +243,15 @@ |
4662 | elist.append("0") |
4663 | if DEBUG: self.logger.info(elist) |
4664 | if DEBUG: self.logger.info("DONE WITH PUT ENTITY") |
4665 | - if client: |
4666 | + if client and not isTrans: |
4667 | + cursor.close() |
4668 | + client.commit() |
4669 | client.close() |
4670 | return elist |
4671 | |
4672 | def __table_exist(self, table_name): |
4673 | table_name = "x" + table_name |
4674 | - client = self.__get_connection() |
4675 | + client = MySQLdb.connect(host=DB_LOCATION, db=USE_DATABASE) |
4676 | cursor = client.cursor() |
4677 | # cursor.execute(USE_DATABASE) |
4678 | cursor.execute('show tables') |
4679 | @@ -211,18 +260,30 @@ |
4680 | if row == None: |
4681 | break; |
4682 | if table_name == row[0]: |
4683 | + client.close() |
4684 | return 1 |
4685 | + |
4686 | + cursor.close() |
4687 | + client.commit() |
4688 | + client.close() |
4689 | return 0 |
4690 | |
4691 | - def delete_row(self, table_name, row_key): |
4692 | + def delete_row(self, table_name, row_key, txnid = 0): |
4693 | table_name = "x" + table_name |
4694 | if DEBUG: self.logger.info("DELETE ROW") |
4695 | client = None |
4696 | + |
4697 | + isTrans = False |
4698 | + if txnid != 0 and self.transactionsOn: |
4699 | + isTrans = True |
4700 | elist = [ERROR_MY] |
4701 | try: |
4702 | - client = self.__get_connection() |
4703 | - client.autocommit(1) |
4704 | - cursor = client.cursor() |
4705 | + if txnid == 0 or not self.transactionsOn: |
4706 | + client = MySQLdb.connect(host=DB_LOCATION, db=USE_DATABASE) |
4707 | + #client.autocommit(0) |
4708 | + cursor = client.cursor() |
4709 | + else: |
4710 | + cursor, client = self.__get_connection(txnid) |
4711 | # cursor.execute(USE_DATABASE) |
4712 | row_key = MySQLdb.escape_string(row_key) |
4713 | query = "delete from " + table_name + " WHERE " + ROW_KEY + "= '" + row_key + "'" |
4714 | @@ -232,7 +293,8 @@ |
4715 | elist[0] = ERROR_MY + str(e.args[0]) + "--" + e.args[1] |
4716 | if DEBUG: self.logger.info(elist) |
4717 | if DEBUG: self.logger.info("DELETING ROW") |
4718 | - if client: |
4719 | + if client and not isTrans: |
4720 | + client.commit() |
4721 | client.close() |
4722 | return elist |
4723 | |
4724 | @@ -242,8 +304,8 @@ |
4725 | client = None |
4726 | elist = [ERROR_MY] |
4727 | try: |
4728 | - client = self.__get_connection() |
4729 | - client.autocommit(1) |
4730 | + client = MySQLdb.connect(host=DB_LOCATION, db=USE_DATABASE) |
4731 | + #client.autocommit(0) |
4732 | cursor = client.cursor() |
4733 | # cursor.execute(USE_DATABASE) |
4734 | query = "SELECT COUNT(*) FROM " + table_name |
4735 | @@ -255,18 +317,22 @@ |
4736 | if DEBUG: self.logger.info(elist) |
4737 | if DEBUG: self.logger.info("DONE WITH ROW COUNT") |
4738 | if client: |
4739 | + cursor.close() |
4740 | client.close() |
4741 | return elist |
4742 | |
4743 | - def get_table(self, table_name, column_names): |
4744 | + def get_table(self, table_name, column_names, txnid = 0): |
4745 | table_name = "x" + table_name |
4746 | if DEBUG: self.logger.info("GET TABLE") |
4747 | client = None |
4748 | elist = [ERROR_MY] |
4749 | try: |
4750 | - client = self.__get_connection() |
4751 | - client.autocommit(1) |
4752 | - cursor = client.cursor() |
4753 | + if txnid == 0 or not self.transactionsOn: |
4754 | + client = MySQLdb.connect(host=DB_LOCATION, db=USE_DATABASE) |
4755 | + #client.autocommit(0) |
4756 | + cursor = client.cursor() |
4757 | + else: |
4758 | + cursor, client = self.__get_connection(txnid) |
4759 | # cursor.execute(USE_DATABASE) |
4760 | # Hacking on a letter to make sure all columns start with a letter |
4761 | columncopy = [] |
4762 | @@ -294,15 +360,15 @@ |
4763 | if DEBUG: self.logger.info(str(elist)) |
4764 | if DEBUG: self.logger.info("DONE GETTING TABLE") |
4765 | if client: |
4766 | - client.close() |
4767 | + if not self.transactionsOn or txnid == 0: |
4768 | + cursor.close() |
4769 | + client.close() |
4770 | return elist |
4771 | |
4772 | def __query_table(self, table_name): |
4773 | table_name = "x" + table_name |
4774 | client = self.__get_connection() |
4775 | - client.autocommit(1) |
4776 | cursor = client.cursor() |
4777 | -# cursor.execute(USE_DATABASE) |
4778 | cursor.execute("select * from " + table_name ) |
4779 | elist = [] |
4780 | while (1): |
4781 | @@ -310,9 +376,63 @@ |
4782 | if row == None: |
4783 | break |
4784 | elist.append(row) |
4785 | + if cursor: |
4786 | + cursor.close() |
4787 | if client: |
4788 | + client.commit() |
4789 | client.close() |
4790 | return elist |
4791 | |
4792 | - def __get_connection(self): |
4793 | - return MySQLdb.connect(host=DB_LOCATION, db=USE_DATABASE) |
4794 | + def __get_connection(self, txnid): |
4795 | + client = None |
4796 | + cursor = None |
4797 | + self.__gc() |
4798 | + |
4799 | + transDict_lock.acquire() |
4800 | + if txnid in transDict: |
4801 | + cursor, client, start_time = transDict[txnid] |
4802 | + transDict_lock.release() |
4803 | + if not client: |
4804 | + raise MySQLdb.Error(1, "Connection timed out") |
4805 | + return cursor, client |
4806 | + |
4807 | + # clean up expired connections |
4808 | + def __gc(self): |
4809 | + global last_gc_time |
4810 | + curtime = time.time() |
4811 | + if curtime < last_gc_time + GC_TIME: |
4812 | + return |
4813 | + transDict_lock.acquire() |
4814 | + del_list = [] |
4815 | + for ii in transDict: |
4816 | + cu, cl, st = transDict[ii] |
4817 | + if st + TIMEOUT < curtime: |
4818 | + del_list.append(ii) |
4819 | + # safe deletes |
4820 | + del_list.reverse() |
4821 | + for ii in del_list: |
4822 | + del transDict[ii] |
4823 | + transDict_lock.release() |
4824 | + last_gc_time = time.time() |
4825 | + |
4826 | + def setupTransaction(self, txnid): |
4827 | + self.transactionsOn = True |
4828 | + # New connection |
4829 | + client = MySQLdb.connect(host=DB_LOCATION, db=USE_DATABASE) |
4830 | + #client.autocommit(0) |
4831 | + cursor = client.cursor() |
4832 | + transDict_lock.acquire() |
4833 | + transDict[txnid] = cursor, client, time.time() |
4834 | + transDict_lock.release() |
4835 | + |
4836 | + def __close_connection(self, txnid): |
4837 | + transDict_lock.acquire() |
4838 | + if txnid in transDict: |
4839 | + cursor, client, start_time = transDict[txnid] |
4840 | + cursor.close() |
4841 | + client.close() |
4842 | + del transDict[txnid] |
4843 | + transDict_lock.release() |
4844 | + return |
4845 | + |
4846 | + |
4847 | |
4848 | === added file 'AppDB/mysql/test_mysql_trans.py' |
4849 | --- AppDB/mysql/test_mysql_trans.py 1970-01-01 00:00:00 +0000 |
4850 | +++ AppDB/mysql/test_mysql_trans.py 2010-06-29 01:49:23 +0000 |
4851 | @@ -0,0 +1,115 @@ |
4852 | +import py_mysql |
4853 | +import random |
4854 | +#over write the import |
4855 | +py_mysql = py_mysql.DatastoreProxy() |
4856 | +columns = ["a","b","c"] |
4857 | +data = ["1","2","3"] |
4858 | +invalid_data = ['y','y','y'] |
4859 | +table_name = "hello" |
4860 | +key = "1" |
4861 | +print "key= " + key |
4862 | +print "columns= " + str(columns) |
4863 | +print "data= " + str(data) |
4864 | +print "table= " + table_name |
4865 | +txn = random.randint(0,100000000) |
4866 | +py_mysql.setupTransaction(txn) |
4867 | +print "Test: Transaction number:",txn |
4868 | +py_mysql.put_entity(table_name, key, columns, invalid_data, txn) |
4869 | +ret = py_mysql.get_entity(table_name, key, columns, txn) |
4870 | +print "Test: Invalid:" |
4871 | +print ret |
4872 | +ret = py_mysql.get_entity(table_name, key, columns) |
4873 | +print "Test: Outside transaction:" |
4874 | +print ret |
4875 | +print py_mysql.put_entity(table_name, key, columns, data, txn) |
4876 | +print "Test: GET" |
4877 | +ret = py_mysql.get_entity(table_name, key, columns, txn) |
4878 | +print "Test: Valid:" |
4879 | +print ret |
4880 | +print "Test: Committing:" |
4881 | +#print py_mysql.commit(txn) |
4882 | +print ret |
4883 | +if ret[1:] != data: |
4884 | + print "ERROR doing a put then get. Data does not match" |
4885 | + print "returned: " + str(ret) |
4886 | + print "expected: " + str(data) |
4887 | + exit(1) |
4888 | +else: |
4889 | + print "Success" |
4890 | +py_mysql.commit(txn) |
4891 | +print "After committed transaction:" |
4892 | +ret = py_mysql.get_entity(table_name, key, columns) |
4893 | +print ret |
4894 | +txn = random.randint(0,100000000) |
4895 | +py_mysql.setupTransaction(txn) |
4896 | +txn2 = random.randint(0,11000000000) |
4897 | +py_mysql.setupTransaction(txn2) |
4898 | +print "Transaction number:",txn |
4899 | +print "PUT:" |
4900 | +print py_mysql.put_entity(table_name, key, columns, invalid_data, txn) |
4901 | +print "outside transaction:" |
4902 | +ret = py_mysql.get_entity(table_name, key, columns,txn2) |
4903 | +print ret |
4904 | +print "inside transaction:" |
4905 | +ret = py_mysql.get_entity(table_name, key, columns, txn) |
4906 | +print ret |
4907 | +print py_mysql.put_entity(table_name, key, columns, invalid_data, txn) |
4908 | +print "Rollback:" |
4909 | +print py_mysql.rollback(txn) |
4910 | +print "doing a put, rollback, then get" |
4911 | +print "GET" |
4912 | +ret = py_mysql.get_entity(table_name, key, columns) |
4913 | +print "doing a put then get" |
4914 | +print ret |
4915 | +if ret[1:] != data: |
4916 | + print "*" * 60 |
4917 | + print "FAILURE doing a put then get. Data does not match" |
4918 | + print "returned: " + str(ret) |
4919 | + print "expected: " + str(data) |
4920 | + print "*" * 60 |
4921 | + exit(1) |
4922 | +else: |
4923 | + print "Success" |
4924 | + |
4925 | +ret = py_mysql.get_schema("hello") |
4926 | +print ret |
4927 | +print "checking schema:" |
4928 | +print ret |
4929 | +if ret[1:] != columns: |
4930 | + print "ERROR in recieved schema" |
4931 | + print "returned: " + str(ret) |
4932 | + print "expected: " + str(columns) |
4933 | + |
4934 | +#ret = py_mysql.__table_exist(table_name) |
4935 | +#print "Does table we just created exist?" |
4936 | +#print ret |
4937 | + |
4938 | +ret = py_mysql.delete_row(table_name, key) |
4939 | +print "Deleting the key %s"%key |
4940 | +print ret |
4941 | + |
4942 | +ret = py_mysql.get_entity(table_name, key, columns) |
4943 | +print "Trying to get deleted key:" |
4944 | +print ret |
4945 | +print "doing a put with key %s"%key |
4946 | +print py_mysql.put_entity("hello", "1", ["a","b","c"], ["1","2","3"]) |
4947 | +print "doing a get table" |
4948 | +print py_mysql.get_table("hello", ["a","b","c"]) |
4949 | +py_mysql.put_entity("hello", "2", ["a","b","c"], ["4","5","6"]) |
4950 | +print "doing get table:" |
4951 | +print py_mysql.get_table("hello", ["a","b","c"]) |
4952 | +py_mysql.put_entity("hello", "3", ["a","b","c"], ["1","2","3"]) |
4953 | +py_mysql.get_table("hello", ["a","b","c"]) |
4954 | + |
4955 | +print "TRYING TO REPLACE KEY 3" |
4956 | +py_mysql.put_entity("hello", "3", ["a","b","c"], ["1","2","3"]) |
4957 | +py_mysql.get_table("hello", ["a","b","c"]) |
4958 | +py_mysql.get_row_count("hello") |
4959 | +ret = py_mysql.delete_row("hello", "1") |
4960 | +ret = py_mysql.delete_row("hello", "2") |
4961 | +ret = py_mysql.delete_row("hello", "3") |
4962 | +py_mysql.get_table("hello", ["a","b","c"]) |
4963 | +print "Deleting table:" |
4964 | +print py_mysql.delete_table("hello") |
4965 | +print "deleting twice:" |
4966 | +print py_mysql.delete_table("hello") |
4967 | |
4968 | === modified file 'AppDB/soap_server.py' |
4969 | --- AppDB/soap_server.py 2010-03-16 21:22:28 +0000 |
4970 | +++ AppDB/soap_server.py 2010-06-29 01:49:23 +0000 |
4971 | @@ -34,7 +34,7 @@ |
4972 | DEFAULT_APP_LOCATION = ".flatfile_apps" |
4973 | DEFAULT_DATASTORE = "hbase" |
4974 | DEFAULT_SSL_PORT = 4343 |
4975 | -DEFAULT_PORT = 8080 |
4976 | +DEFAULT_PORT = 9899 |
4977 | IP_TABLE = "IPS___" |
4978 | DEFAULT_ENCRYPTION = 1 |
4979 | VALID_DATASTORES = [] |
4980 | |
4981 | === modified file 'AppDB/voldemort/py_voldemort.py' |
4982 | --- AppDB/voldemort/py_voldemort.py 2010-05-03 21:02:23 +0000 |
4983 | +++ AppDB/voldemort/py_voldemort.py 2010-06-29 01:49:23 +0000 |
4984 | @@ -47,12 +47,14 @@ |
4985 | def get(self, key): |
4986 | done = False |
4987 | timeout = RETRY_TIMEOUT |
4988 | - while (done != True): |
4989 | + while (done != True and timeout > 0): |
4990 | try: |
4991 | value = self.real_get(key) |
4992 | done = True |
4993 | except: |
4994 | timeout -= 1 |
4995 | + if timeout <= 0: |
4996 | + raise |
4997 | return value |
4998 | |
4999 | def real_get(self, key): |
5000 | @@ -88,6 +90,8 @@ |
The diff has been truncated for viewing.
All databases other than MySQL appear to work fine (haven't tried apps other than guestbook yet). MySQL has failed twice. Sample output from one run:
Traceback (most recent call last): appscale/ AppDB/mysql/ prime_mysql. py", line 64, in <module> appscale/ AppDB/mysql/ prime_mysql. py", line 32, in drop_database execute( "DROP DATABASE IF EXISTS appscale;") python- support/ python2. 6/MySQLdb/ cursors. py", line 166, in execute errorhandler( self, exc, value) python- support/ python2. 6/MySQLdb/ connections. py", line 35, in defaulterrorhandler exceptions. OperationalErro r: (1051, "Unknown table 'xUSERS__'")
File "/root/
drop_database()
File "/root/
cursor.
File "/var/lib/
self.
File "/var/lib/
raise errorclass, errorvalue
_mysql_
...
gzip: stdin: unexpected end of file 55.237] [][guestbook] AppDB/mysql/ prime_mysql. py:55: Warning: Table 'xAPPS__' already exists
tar: Child returned status 1
tar: Error exit delayed from previous errors
[Tue Jun 29 17:39:47 +0000 2010] saw a cron request with args [128.111.
cron: lang was neither python nor java but was []
cron: lang was neither python nor java but was []
/root/appscale/
Will try running MySQL again.