Merge lp:~nchohan/appscale/SSLCassyMysql into lp:appscale

Proposed by Navraj Chohan
Status: Merged
Merged at revision: 534
Proposed branch: lp:~nchohan/appscale/SSLCassyMysql
Merge into: lp:appscale
Diff against target: 5757 lines (+2317/-1984)
43 files modified
AppController/djinn.rb (+28/-21)
AppController/haproxy.rb (+5/-0)
AppController/helperfunctions.rb (+8/-4)
AppController/load_balancer.rb (+5/-0)
AppController/nginx.rb (+93/-15)
AppController/pbserver.rb (+106/-0)
AppController/terminate.rb (+1/-0)
AppDB/appscale_server.py (+117/-312)
AppDB/appscale_server_native_trans.py (+1086/-0)
AppDB/appscale_server_no_trans.py (+0/-1117)
AppDB/as_transaction.py (+0/-241)
AppDB/cassandra/py_cassandra.py (+209/-62)
AppDB/cassandra/templates/storage-conf.xml (+1/-1)
AppDB/datastore_tester.py (+20/-8)
AppDB/dbinterface.py (+12/-4)
AppDB/dhash_datastore.py (+3/-3)
AppDB/hbase/py_hbase.py (+6/-8)
AppDB/helper_functions.py (+1/-1)
AppDB/hypertable/py_hypertable.py (+8/-2)
AppDB/mongodb/py_mongodb.py (+6/-2)
AppDB/mysql/drop_all_tables.py (+4/-1)
AppDB/mysql/prime_mysql.py (+21/-9)
AppDB/mysql/py_mysql.py (+178/-58)
AppDB/mysql/test_mysql_trans.py (+115/-0)
AppDB/soap_server.py (+1/-1)
AppDB/voldemort/py_voldemort.py (+8/-1)
AppDB/zkappscale/zktransaction.py (+0/-1)
AppDB/zkappscale/zktransaction_stub.py (+175/-0)
AppLoadBalancer/lib/usertools.rb (+2/-2)
AppServer/google/appengine/api/datastore_file_distributed.py (+1/-1)
AppServer/google/appengine/tools/dev_appserver.py (+3/-5)
AppServer/google/appengine/tools/dev_appserver_login.py (+18/-81)
AppServer/google/appengine/tools/dev_appserver_main.py (+20/-4)
AppServer/google/net/proto/ProtocolBuffer.py (+1/-1)
AppServer_Java/build.xml (+6/-13)
AppServer_Java/src/com/google/appengine/api/users/dev/LocalLoginServlet.java (+1/-1)
AppServer_Java/src/com/google/appengine/tools/resources/ResourceLoader.java (+2/-2)
debian/appscale_install.sh (+3/-0)
debian/appscale_install_functions.sh (+38/-0)
debian/control.core.jaunty (+1/-0)
debian/control.core.karmic (+1/-0)
debian/control.core.lucid (+2/-1)
firewall.conf (+1/-1)
To merge this branch: bzr merge lp:~nchohan/appscale/SSLCassyMysql
Reviewer Review Type Date Requested Status
Chris Bunch Approve
Review via email: mp+28595@code.launchpad.net

Description of the change

Tested with the tasks app on all databases. All of them work except Voldemort; Voldemort works for guestbook.

lp:~nchohan/appscale/SSLCassyMysql updated
480. By root <root@appscale-image>

merging with SSLCassyMySQL

481. By root <root@appscale-image0>

dropping database

482. By root <root@appscale-image0>

merged in changes, fixed conflicts

Revision history for this message
Chris Bunch (cgb-cs) wrote :

All databases other than MySQL appear to work fine (haven't tried apps other than guestbook yet). MySQL has failed twice. Sample output from one run:

Traceback (most recent call last):
  File "/root/appscale/AppDB/mysql/prime_mysql.py", line 64, in <module>
    drop_database()
  File "/root/appscale/AppDB/mysql/prime_mysql.py", line 32, in drop_database
    cursor.execute("DROP DATABASE IF EXISTS appscale;")
  File "/var/lib/python-support/python2.6/MySQLdb/cursors.py", line 166, in execute
    self.errorhandler(self, exc, value)
  File "/var/lib/python-support/python2.6/MySQLdb/connections.py", line 35, in defaulterrorhandler
    raise errorclass, errorvalue
_mysql_exceptions.OperationalError: (1051, "Unknown table 'xUSERS__'")

...

gzip: stdin: unexpected end of file
tar: Child returned status 1
tar: Error exit delayed from previous errors
[Tue Jun 29 17:39:47 +0000 2010] saw a cron request with args [128.111.55.237][][guestbook]
cron: lang was neither python nor java but was []
cron: lang was neither python nor java but was []
/root/appscale/AppDB/mysql/prime_mysql.py:55: Warning: Table 'xAPPS__' already exists

Will try running MySQL again.
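The failure above comes from `drop_database()` in prime_mysql.py surfacing MySQL error 1051 ("Unknown table") even though the goal is simply to ensure the database is gone. A minimal sketch of a more defensive drop, not the merged code: `DropError` stands in for MySQLdb's `_mysql_exceptions.OperationalError`, and `FakeCursor` is a hypothetical stand-in for a live cursor, so the error-handling logic can be shown without a running MySQL server.

```python
# Errors that mean "the thing we wanted to drop is already gone" and
# are therefore safe to ignore during a teardown/re-prime cycle.
# 1008 = ER_DB_DROP_EXISTS, 1051 = ER_BAD_TABLE_ERROR.
HARMLESS_DROP_ERRORS = frozenset([1008, 1051])


class DropError(Exception):
    """Stand-in for MySQLdb's OperationalError: args = (errno, message)."""
    def __init__(self, errno, msg):
        Exception.__init__(self, errno, msg)
        self.args = (errno, msg)


def drop_database(cursor, db_name="appscale"):
    """Drop the database, swallowing 'already gone' errors only."""
    try:
        cursor.execute("DROP DATABASE IF EXISTS %s;" % db_name)
    except DropError as e:
        if e.args[0] not in HARMLESS_DROP_ERRORS:
            raise  # a real failure (auth, connection, ...): surface it


class FakeCursor(object):
    """Hypothetical cursor reproducing the 1051 failure from the review."""
    def execute(self, sql):
        raise DropError(1051, "Unknown table 'xUSERS__'")


# Error 1051 is treated as harmless, so this does not raise.
drop_database(FakeCursor())
```

With this shape, a re-run of prime_mysql.py after a partial teardown no longer aborts on leftover-table noise, while genuine errors still kill the run.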

Revision history for this message
Chris Bunch (cgb-cs) wrote :

MySQL works fine for me on a single node, but fails in the four-node case. Tried both -n 1 and -n 2; both fail.

Revision history for this message
Chris Bunch (cgb-cs) wrote :

Accepted. MySQL on more than one node needs to be fixed, and I would like the load balancer not to use SSL for all communication (that way, if all the user cares about is app redirection, they never see the SSL certificate warning).

review: Approve
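The load-balancer concern above amounts to keeping a plain-HTTP server block for the AppLoadBalancer rather than rewriting every port-80 request to HTTPS (as the merged `create_app_config` change does when `ssl_port` is set). A minimal nginx sketch of that alternative; the upstream name `load_balancer` and the addresses are placeholders (the real values come from `LoadBalancer.name`, `my_ip`, and `proxy_port`):

```
# Sketch only, not the merged configuration: serve the AppLoadBalancer
# over plain HTTP on port 80 so users who only need app redirection
# never hit the self-signed SSL certificate; SSL can stay available on
# 443 for login and other sensitive routes.
upstream load_balancer {
  server 10.0.0.1:8000;   # placeholder for my_ip:proxy_port
}

server {
  listen 80;
  location / {
    proxy_set_header X-Real-IP $remote_addr;
    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    proxy_set_header Host $http_host;
    proxy_redirect off;
    proxy_pass http://load_balancer;
  }
}
```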

Preview Diff

1=== modified file 'AppController/djinn.rb'
2--- AppController/djinn.rb 2010-06-25 22:28:38 +0000
3+++ AppController/djinn.rb 2010-06-29 01:49:23 +0000
4@@ -101,6 +101,7 @@
5 my_data = my_node
6 if has_soap_server?(my_data)
7 stop_soap_server
8+ stop_pbserver
9 end
10
11 jobs_to_run = my_data.jobs
12@@ -507,7 +508,8 @@
13 def self.get_nearest_db_ip(is_mysql=false)
14 db_ips = self.get_db_slave_ips
15 # Unless this is mysql we include the master ip
16- db_ips << self.get_db_master_ip if !is_mysql
17+ # Update, now mysql also has an API node
18+ db_ips << self.get_db_master_ip
19 db_ips.compact!
20
21 local_ip = HelperFunctions.local_ip
22@@ -810,21 +812,23 @@
23 end
24 if retval != 0
25 Djinn.log_debug("Fail to create initial table. Could not startup AppScale.")
26+ exit(1)
27 # TODO: terminate djinn
28 end
29 end
30
31 # start soap server and pb server
32 if has_soap_server?(my_data)
33+ @state = "Starting up SOAP Server and PBServer"
34+ start_pbserver
35 start_soap_server
36+ HelperFunctions.sleep_until_port_is_open(HelperFunctions.local_ip, UA_SERVER_PORT)
37 end
38
39- # no longer need to start appengine here
40+ # appengine is started elsewhere
41 end
42
43 def start_soap_server
44- @state = "Starting up SOAP Server and Appscale Server(PB server)"
45-
46 db_master_ip = nil
47 @nodes.each { |location|
48 db_master_ip = location.private_ip if location.is_db_master?
49@@ -846,28 +850,31 @@
50 "-s #{HelperFunctions.get_secret}"]
51 Djinn.log_debug(cmd.join(" "))
52 Kernel.system cmd.join(" ")
53+ end
54+
55+ def start_pbserver
56+ db_master_ip = nil
57+ my_ip = my_node.public_ip
58+ @nodes.each { |location|
59+ db_master_ip = location.private_ip if location.is_db_master?
60+ }
61+ abort("db master ip was nil") if db_master_ip.nil?
62+ table = @creds['table']
63 zoo_connection = get_zk_connection_string(@nodes)
64- cmd = [ "MASTER_IP=#{db_master_ip} LOCAL_DB_IP='#{db_local_ip}'",
65- "start-stop-daemon --start",
66- "--exec /usr/bin/python2.6",
67- "--name appscale_server",
68- "--make-pidfile",
69- "--pidfile /var/appscale/appscale-appscaleserver.pid",
70- "--background",
71- "--",
72- "#{APPSCALE_HOME}/AppDB/appscale_server.py",
73- "--type #{table}",
74- "-z \"#{zoo_connection}\"",
75- "-s #{HelperFunctions.get_secret}",
76- "-a #{get_uaserver_ip} --key"]
77- Djinn.log_debug(cmd.join(" "))
78- Kernel.system cmd.join(" ")
79- HelperFunctions.sleep_until_port_is_open(HelperFunctions.local_ip, UA_SERVER_PORT)
80+ PbServer.start(db_master_ip, @userappserver_private_ip, my_ip, table, zoo_connection)
81+ HAProxy.create_pbserver_config(my_ip, PbServer.proxy_port)
82+ Nginx.create_pbserver_config(my_ip, PbServer.proxy_port)
83+ Nginx.restart
84+ # TODO check the return value
85+ PbServer.is_running
86 end
87
88 def stop_soap_server
89 Kernel.system "start-stop-daemon --stop --pidfile /var/appscale/appscale-soapserver.pid"
90- Kernel.system "start-stop-daemon --stop --pidfile /var/appscale/appscale-appscaleserver.pid"
91+ end
92+
93+ def stop_pbserver
94+ PbServer.stop
95 end
96
97 def is_cloud?
98
99=== modified file 'AppController/haproxy.rb'
100--- AppController/haproxy.rb 2010-06-25 22:28:38 +0000
101+++ AppController/haproxy.rb 2010-06-29 01:49:23 +0000
102@@ -61,6 +61,11 @@
103 self.create_app_config(my_ip, listen_port, Monitoring.server_ports, Monitoring.name)
104 end
105
106+ # Create the config file for PBServer applications
107+ def self.create_pbserver_config(my_ip, listen_port)
108+ self.create_app_config(my_ip, listen_port, PbServer.server_ports, PbServer.name)
109+ end
110+
111 # A generic function for creating haproxy config files used by appscale services
112 def self.create_app_config(my_ip, listen_port, server_ports, name)
113 servers = []
114
115=== modified file 'AppController/helperfunctions.rb'
116--- AppController/helperfunctions.rb 2010-06-01 19:38:10 +0000
117+++ AppController/helperfunctions.rb 2010-06-29 01:49:23 +0000
118@@ -23,7 +23,8 @@
119 TIME_IN_SECONDS = { "d" => 86400, "h" => 3600, "m" => 60, "s" => 1 }
120
121 APP_START_PORT = 20000
122-
123+DEFAULT_PBSERVER_PORT = 8443
124+DEFAULT_PBSERVER_NOENCRYPT_PORT = 8888
125 module HelperFunctions
126 def self.write_file(location, contents)
127 File.open(location, "w+") { |file| file.write(contents) }
128@@ -204,15 +205,18 @@
129 Djinn.log_debug("saw a python app coming through")
130 cmd = ["MY_IP_ADDRESS=#{public_ip}",
131 "MY_PORT=#{port}",
132- "NGINX_ADDRESS=#{public_ip}",
133- "NGINX_PORT=#{nginx_port}",
134+ #"NGINX_ADDRESS=#{public_ip}",
135+ #"NGINX_PORT=#{nginx_port}",
136 "python2.5",
137 "#{APPSCALE_HOME}/AppServer/dev_appserver.py",
138 "-p #{port}",
139 "--cookie_secret #{secret}",
140 "--login_server #{public_ip}",
141 "--admin_console_server ''",
142- "--datastore_path #{db_location}",
143+ "--nginx_port #{nginx_port}",
144+ "--nginx_host #{public_ip}",
145+ "--enable_sendmail",
146+ "--datastore_path #{db_location}:#{DEFAULT_PBSERVER_NOENCRYPT_PORT}",
147 "--history_path /var/apps/#{app_name}/data/app.datastore.history",
148 # this is not working.
149 # "--address=#{public_ip}",
150
151=== modified file 'AppController/load_balancer.rb'
152--- AppController/load_balancer.rb 2010-04-21 20:53:02 +0000
153+++ AppController/load_balancer.rb 2010-06-29 01:49:23 +0000
154@@ -7,6 +7,7 @@
155 PROXY_PORT = 8060
156 # The port which requests to this app will be served from
157 LISTEN_PORT = 80
158+ LISTEN_SSL_PORT = 443
159
160 def self.start
161 `service appscale-loadbalancer start`
162@@ -33,6 +34,10 @@
163 LISTEN_PORT
164 end
165
166+ def self.listen_ssl_port
167+ LISTEN_SSL_PORT
168+ end
169+
170 def self.server_ports
171 SERVER_PORTS
172 end
173
174=== modified file 'AppController/nginx.rb'
175--- AppController/nginx.rb 2010-06-25 22:28:38 +0000
176+++ AppController/nginx.rb 2010-06-29 01:49:23 +0000
177@@ -5,7 +5,7 @@
178 require 'fileutils'
179 require 'load_balancer'
180 require 'monitoring'
181-
182+require 'pbserver'
183
184 # A class to wrap all the interactions with the nginx web server
185 class Nginx
186@@ -72,9 +72,6 @@
187
188 #{static_locations}
189
190- location /_ah/admin {
191- # 404 - lock out admin routes
192- }
193
194 location / {
195 proxy_set_header X-Real-IP $remote_addr;
196@@ -117,7 +114,7 @@
197
198 # Create the configuration file for the AppLoadBalancer Rails application
199 def self.create_app_load_balancer_config(my_ip, proxy_port)
200- self.create_app_config(my_ip, proxy_port, LoadBalancer.listen_port, LoadBalancer.name, LoadBalancer.public_directory)
201+ self.create_app_config(my_ip, proxy_port, LoadBalancer.listen_port, LoadBalancer.name, LoadBalancer.public_directory, LoadBalancer.listen_ssl_port)
202 end
203
204 # Create the configuration file for the AppMonitoring Rails application
205@@ -125,6 +122,68 @@
206 self.create_app_config(my_ip, proxy_port, Monitoring.listen_port, Monitoring.name, Monitoring.public_directory)
207 end
208
209+ # Create the configuration file for the pbserver
210+ def self.create_pbserver_config(my_ip, proxy_port)
211+ config = <<CONFIG
212+upstream #{PbServer.name} {
213+ server #{my_ip}:#{proxy_port};
214+}
215+
216+server {
217+ listen #{PbServer.listen_port};
218+ server_name #{my_ip};
219+ root /root/appscale/AppDB/;
220+ access_log /var/log/nginx/pbserver.access.log upstream;
221+ error_log /var/log/nginx/pbserver.error.log;
222+ rewrite_log off;
223+ error_page 404 = /404.html;
224+
225+
226+
227+ location / {
228+ proxy_set_header X-Real-IP $remote_addr;
229+ proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
230+ proxy_set_header Host $http_host;
231+ proxy_redirect off;
232+ proxy_pass http://#{PbServer.name};
233+ }
234+
235+}
236+
237+server {
238+ listen #{PbServer.listen_ssl_port};
239+ ssl on;
240+ ssl_certificate /etc/nginx/mycert.pem;
241+ ssl_certificate_key /etc/nginx/mykey.pem;
242+ root /root/appscale/AppDB/public;
243+ access_log /var/log/nginx/pbencrypt.access.log upstream;
244+ error_log /var/log/nginx/pbencrypt.error.log;
245+ rewrite_log off;
246+ error_page 502 /502.html;
247+
248+ location / {
249+ proxy_set_header X-Real-IP $remote_addr;
250+ proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
251+ proxy_set_header Host $http_host;
252+ proxy_redirect off;
253+
254+ client_body_timeout 60;
255+ proxy_read_timeout 60;
256+ #Increase file size so larger applications can be uploaded
257+ client_max_body_size 30M;
258+ # go to proxy
259+ proxy_pass http://#{PbServer.name};
260+ }
261+}
262+CONFIG
263+ config_path = File.join(SITES_ENABLED_PATH, "#{PbServer.name}.#{CONFIG_EXTENSION}")
264+ File.open(config_path, "w+") { |dest_file| dest_file.write(config) }
265+
266+ HAProxy.regenerate_config
267+
268+ end
269+
270+
271 # A generic function for creating nginx config files used by appscale services
272 def self.create_app_config(my_ip, proxy_port, listen_port, name, public_dir, ssl_port=nil)
273
274@@ -132,9 +191,30 @@
275 upstream #{name} {
276 server #{my_ip}:#{proxy_port};
277 }
278-
279-server {
280- listen #{listen_port};
281+CONFIG
282+
283+ if ssl_port
284+ # redirect all request to ssl port.
285+ config += <<CONFIG
286+server {
287+ listen #{listen_port};
288+ rewrite ^(.*) https://#{my_ip}:#{ssl_port}$1 permanent;
289+}
290+
291+server {
292+ listen #{ssl_port};
293+ ssl on;
294+ ssl_certificate #{NGINX_PATH}/mycert.pem;
295+ ssl_certificate_key #{NGINX_PATH}/mykey.pem;
296+CONFIG
297+ else
298+ config += <<CONFIG
299+server {
300+ listen #{listen_port};
301+CONFIG
302+ end
303+
304+ config += <<CONFIG
305 root #{public_dir};
306 access_log /var/log/nginx/load-balancer.access.log upstream;
307 error_log /var/log/nginx/load-balancer.error.log;
308@@ -165,7 +245,6 @@
309 root #{APPSCALE_HOME}/AppLoadBalancer/public;
310 }
311 }
312-
313 CONFIG
314
315 config_path = File.join(SITES_ENABLED_PATH, "#{name}.#{CONFIG_EXTENSION}")
316@@ -209,9 +288,6 @@
317
318 gzip on;
319
320- ssl_certificate #{NGINX_PATH}/cert.pem;
321- ssl_certificate_key #{NGINX_PATH}/cert.key;
322-
323 include #{NGINX_PATH}/sites-enabled/*;
324 }
325 CONFIG
326@@ -223,9 +299,11 @@
327 end
328
329 # copy over certs for ssl
330- `cp #{APPSCALE_HOME}/.appscale/certs/cert.pem #{NGINX_PATH}`
331- `cp #{APPSCALE_HOME}/.appscale/certs/cert.key #{NGINX_PATH}`
332-
333+ # just copy files once to keep certificate as static.
334+ #`test ! -e #{NGINX_PATH}/mycert.pem && cp #{APPSCALE_HOME}/.appscale/certs/mycert.pem #{NGINX_PATH}`
335+ #`test ! -e #{NGINX_PATH}/mykey.pem && cp #{APPSCALE_HOME}/.appscale/certs/mykey.pem #{NGINX_PATH}`
336+ `cp #{APPSCALE_HOME}/.appscale/certs/mykey.pem #{NGINX_PATH}`
337+ `cp #{APPSCALE_HOME}/.appscale/certs/mycert.pem #{NGINX_PATH}`
338 # Write the main configuration file which sets default configuration parameters
339 File.open(MAIN_CONFIG_FILE, "w+") { |dest_file| dest_file.write(config) }
340 end
341
342=== added file 'AppController/pbserver.rb'
343--- AppController/pbserver.rb 1970-01-01 00:00:00 +0000
344+++ AppController/pbserver.rb 2010-06-29 01:49:23 +0000
345@@ -0,0 +1,106 @@
346+#!/usr/bin/ruby -w
347+require 'helperfunctions'
348+
349+# A class to wrap all the interactions with the PbServer
350+class PbServer
351+ APPSCALE_HOME=ENV['APPSCALE_HOME']
352+ # If using just native transactions, then use one server port
353+ SERVER_PORTS = [4000, 4001, 4002]
354+ # The port which nginx will use to send requests to haproxy
355+ PROXY_PORT = 3999
356+ # The port which requests to this app will be served from
357+ LISTEN_PORT = 8888
358+ LISTEN_SSL_PORT = 8443
359+ # The following databases cannot have multiple pbservers
360+ SINGLE_SERVER_TABLE = ["mysql"]
361+ # The following databases have native transaction support
362+ NATIVE_TRANSACTIONS = ["mysql"]
363+ @@pb_master_ip = ""
364+ @@pb_ip = ""
365+ @@pb_table = ""
366+ @@pb_zklocations = ""
367+
368+ def self.start(master_ip, db_local_ip, my_ip, table, zklocations)
369+ @@pb_master_ip = master_ip
370+ @@pb_ip = my_ip
371+ @@pb_table = table
372+ @@pb_zklocations = zklocations
373+ ports = SERVER_PORTS
374+
375+ pbserver = self.pb_script
376+ ports = self.server_ports
377+ ports.each { |pbserver_port|
378+ cmd = [ "MASTER_IP=#{@@pb_master_ip} LOCAL_DB_IP='#{db_local_ip}'",
379+ "start-stop-daemon --start",
380+ "--exec /usr/bin/python2.6",
381+ "--name appscale_server",
382+ "--make-pidfile",
383+ "--pidfile /var/appscale/appscale-appscaleserver-#{pbserver_port}.pid",
384+ "--background",
385+ "--",
386+ "#{pbserver}",
387+ "-p #{pbserver_port}",
388+ "--no_encryption",
389+ "--type #{@@pb_table}",
390+ "-z \"#{@@pb_zklocations}\"",
391+ "-s #{HelperFunctions.get_secret}",
392+ "-a #{@@pb_ip} --key"]
393+ Djinn.log_debug(cmd.join(" "))
394+ Kernel.system cmd.join(" ")
395+ }
396+ end
397+
398+ def self.stop
399+ ports = self.server_ports
400+ ports.each { |pbserver_port|
401+ Kernel.system "start-stop-daemon --stop --pidfile /var/appscale/appscale-appscaleserver-#{pbserver_port}.pid"
402+ }
403+ end
404+
405+ def self.restart
406+ self.stop
407+ # Use cached variables
408+ self.start(@@pb_master_ip, @@pb_ip, @@pb_table, @@pb_zklocations)
409+ end
410+
411+ def self.name
412+ "as_pbserver"
413+ end
414+
415+ def self.public_directory
416+ "/root/appscale/AppDB/public"
417+ end
418+
419+ def self.listen_port
420+ LISTEN_PORT
421+ end
422+
423+ def self.listen_ssl_port
424+ LISTEN_SSL_PORT
425+ end
426+
427+ def self.server_ports
428+ if SINGLE_SERVER_TABLE.include?(@@pb_table)
429+ return SERVER_PORTS.first(1)
430+ else
431+ return SERVER_PORTS
432+ end
433+ end
434+
435+ def self.proxy_port
436+ PROXY_PORT
437+ end
438+
439+ def self.is_running
440+ `curl http://#{@@pb_ip}:#{PROXY_PORT}`
441+ end
442+
443+ def self.pb_script
444+ if NATIVE_TRANSACTIONS.include?(@@pb_table)
445+ return "#{APPSCALE_HOME}/AppDB/appscale_server_native_trans.py"
446+ else
447+ return "#{APPSCALE_HOME}/AppDB/appscale_server.py"
448+ end
449+ end
450+
451+end
452
453=== modified file 'AppController/terminate.rb'
454--- AppController/terminate.rb 2010-06-21 20:39:53 +0000
455+++ AppController/terminate.rb 2010-06-29 01:49:23 +0000
456@@ -28,6 +28,7 @@
457 Collectd.stop
458 LoadBalancer.stop
459 Monitoring.stop
460+PbServer.stop
461 `/etc/init.d/klogd stop`
462
463 `rm -rf /var/apps/`
464
465=== modified file 'AppDB/appscale_server.py'
466--- AppDB/appscale_server.py 2010-05-11 20:49:42 +0000
467+++ AppDB/appscale_server.py 2010-06-29 01:49:23 +0000
468@@ -5,7 +5,9 @@
469 # Soo Hwan Park (suwanny@gmail.com)
470 # Sydney Pang (pang@cs.ucsb.edu)
471 # See LICENSE file
472-
473+import tornado.httpserver
474+import tornado.ioloop
475+import tornado.web
476 import sys
477 import socket
478 import os
479@@ -18,10 +20,7 @@
480 import md5
481 import random
482 import getopt
483-from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
484-from SocketServer import ThreadingMixIn
485 import threading
486-
487 from google.appengine.api import api_base_pb
488 from google.appengine.api import datastore
489 from google.appengine.api import datastore_errors
490@@ -37,8 +36,7 @@
491 from M2Crypto import SSL
492 from drop_privileges import *
493 from zkappscale import zktransaction
494-import as_transaction
495-
496+zk = zktransaction
497 import time
498
499 DEBUG = False
500@@ -48,7 +46,7 @@
501 DEFAULT_APP_LOCATION = ".flatfile_apps"
502 HYPERTABLE_XML_TAG = "Name"
503 DEFAULT_DATASTORE = "files"
504-DEFAULT_SSL_PORT = 443
505+DEFAULT_SSL_PORT = 8443
506 DEFAULT_PORT = 4080
507 DEFAULT_ENCRYPTION = 1
508 CERT_LOCATION = "/etc/appscale/certs/mycert.pem"
509@@ -68,7 +66,7 @@
510 KEYBLOCKSIZE = "50"
511 keyDictionaryLock = None
512 keyDictionary = {}
513-
514+MAX_DICT_SIZE = 1000000
515 optimizedQuery = False
516 ID_KEY_LENGTH = 64
517 tableHashTable = {}
518@@ -78,7 +76,6 @@
519 ssl_cert_file = ""
520 ssl_key_file = ""
521
522-_trans_set = as_transaction.ASTransSet()
523 zoo_keeper = ""
524 zoo_keeper_locations = "localhost:2181"
525
526@@ -93,7 +90,6 @@
527 for the entity table
528 """
529
530-
531 class ThreadLogger:
532 def __init__(self, log):
533 self.logger_ = log
534@@ -109,6 +105,24 @@
535 logger = appscale_logger.getLogger("pb_server")
536
537
538+class putThread(threading.Thread):
539+ def setup(self, db, table, key, fields, values):
540+ self.db = db
541+ self.table = table
542+ self.key = key
543+ self.fields = fields
544+ self.values = values
545+ self.err = None
546+ self.ret = None
547+ self.timeTaken = 0
548+ def run(self):
549+ s = time.time()
550+ self.err, self.ret = self.db.put_entity(self.table, self.key,
551+ self.fields, self.values)
552+ self.timeTaken = time.time() - s
553+
554+
555+
556 def getTableName(app_id, kind, version):
557 return app_id + "___" + kind + "___" + version
558
559@@ -240,7 +254,7 @@
560 else:
561 keyStart, blockSize = zoo_keeper.generateIDBlock(app_id, root)
562 keyStart = long(keyStart)
563- except e:
564+ except Exception, e:
565 print "="*60
566 print "Exception:",str(e)
567 print "="*60
568@@ -250,6 +264,12 @@
569 key = keyStart
570 keyStart = keyStart + 1
571 keyDictionary[index] = keyStart, keyEnd
572+
573+ # To prevent the dictionary from getting to large
574+ # and taking up too much memory
575+ if len(keyDictionary) > MAX_DICT_SIZE:
576+ keyDictionary = {} # reset
577+
578 keyDictionaryLock.release()
579 return key
580
581@@ -258,30 +278,18 @@
582 pass
583
584
585-class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
586- """ Handle requests in a new thread """
587-
588-
589-class AppScaleSecureHandler( BaseHTTPRequestHandler ):
590+
591+class MainHandler(tornado.web.RequestHandler):
592 """
593 Defines what to do when the webserver receives different types of
594 HTTP requests.
595 """
596- def get_http_err_code( self, message ):
597- print message
598- ret = 599 # 599 is the default code for an unknown error #
599- return ret
600-
601- def send_post_http_response( self , http_code , message ):
602- self.send_response( http_code )
603- self.send_header( 'Content-type' , 'text/plain' )
604- self.end_headers()
605- self.wfile.write( message )
606-
607- def send_http_err_response( self, err_msg ):
608- ret_http_code = self.get_http_err_code( err_msg )
609- self.send_post_http_response( ret_http_code, err_msg )
610-
611+ ########################
612+ # GET Request Handling #
613+ ########################
614+ def get(self):
615+ self.write("Hi")
616+
617 # remote api request
618 # sends back a response
619 def remote_request(self, app_id, appscale_version, http_request_data):
620@@ -304,8 +312,6 @@
621 method = apirequest.method()
622 request_data = apirequest.request()
623 http_request_data = request_data.contents()
624-
625- print "REQUEST:",method," AT time",time.time()
626 if method == "Put":
627 response, errcode, errdetail = self.put_request(app_id,
628 appscale_version,
629@@ -393,8 +399,7 @@
630 print "REPLY",method," AT TIME",time.time()
631 print "errcode:",errcode
632 print "errdetail:",errdetail
633- self.send_post_http_response( 200 , apiresponse.Encode() )
634-
635+ self.write( apiresponse.Encode() )
636
637 def run_query(self, app_id, appscale_version, http_request_data):
638 global app_datastore
639@@ -404,14 +409,7 @@
640 if query.has_transaction():
641 txn = query.transaction()
642
643- if not _trans_set.isValid(txn):
644- _trans_set.purge(txn)
645- return (api_base_pb.VoidProto().Encode(),
646- datastore_pb.Error.BAD_REQUEST,
647- 'Transaction timed out.')
648-
649 if not query.has_ancestor():
650- _trans_set.purge(txn)
651 return (api_base_pb.VoidProto().Encode(),
652 datastore_pb.Error.BAD_REQUEST,
653 'Only ancestor queries are allowed inside transactions.')
654@@ -424,22 +422,12 @@
655 datastore_pb.Error.BAD_REQUEST,
656 'No group entity or root key.')
657
658- if _trans_set.needsLock(txn):
659+ try:
660 gotLock= zoo_keeper.acquireLock( app_id, txn.handle(), root_key)
661- if gotLock:
662- _trans_set.setGroup(txn, root_key)
663- else:
664- _trans_set.purge(txn)
665+ except zk.ZKTransactionException, zkex:
666 return (api_base_pb.VoidProto().Encode(),
667 datastore_pb.Error.CONCURRENT_TRANSACTION,
668- 'Another transaction is running.')
669- else:
670- if _trans_set.hasLockExpired(txn):
671- #zoo_keeper.notifyOfExpiredLock(app_id, root_key)
672- _trans_set.purge(txn)
673- return (api_base_pb.VoidProto().Encode(),
674- datastore_pb.Error.TIMEOUT,
675- 'The transaction lease has expired.')
676+ 'Another transaction is running. %s'%zkex.message)
677
678 if not query.has_kind():
679 # Return nothing in case of error #
680@@ -448,20 +436,15 @@
681 "Kindless queries are not implemented.")
682 else:
683 kind = query.kind()
684- #print "Query kind:",kind
685-
686- # Verify validity of the entity name and applicaiton id #
687- # according to the naming sheme for entity tables #
688- #assert kind[-2:] != "__"
689- #assert app_id[-1] != "_"
690-
691 # Fetch query from the datastore #
692 table_name = getTableName(app_id, kind, appscale_version)
693- #print "Query using table name: %s, %s" % (table_name, ENTITY_TABLE_SCHEMA)
694 r = app_datastore.get_table( table_name, ENTITY_TABLE_SCHEMA)
695- #print "result: %s" % str(r)
696+ err = r[0]
697+ if err not in ERROR_CODES:
698+ return (api_base_pb.VoidProto().Encode(),
699+ datastore_pb.Error.INTERNAL_ERROR,
700+ "Error running query--." + err)
701
702- #err = r[0]
703 if len(r) > 1:
704 results = r[1:]
705 else:
706@@ -471,8 +454,6 @@
707 versions = results[1::2]
708 # evens are encoded entities
709 results = results[0::2]
710- #print "RESULTS:",results
711- #print "VERSIONS:",versions
712 if len(versions) != len(results):
713 return(api_base_pb.VoidProto().Encode(),
714 datastore_pb.Error.INTERNAL_ERROR,
715@@ -498,7 +479,6 @@
716 row_key = getRowKeyFromDeletedEncoding(encoded_ent)
717 else:
718 row_key = getRowKeyFromKeyType(app_id, encoded_ent.key())
719- #print "constructed row key:",row_key
720 root_key = self.getRootKeyFromEntity(app_id, encoded_ent)
721
722 #TODO make sure all deleted keys use the same encoding
723@@ -509,8 +489,6 @@
724 journal_result = ["DB_ERROR:",DELETED]
725 elif prev_version != long(ii):
726 # if the versions don't match, a valid version must be fetched
727- print "previous version from zk:",prev_version
728- print "prev version:",str(ii)
729 journal_key = getJournalKey(row_key, prev_version)
730 journal_table = getJournalTable(app_id, appscale_version)
731 journal_result = app_datastore.get_entity( journal_table,
732@@ -638,59 +616,24 @@
733 transaction_pb = datastore_pb.Transaction()
734 # handle = zk.getTransactionID(app_id)
735 handle = zoo_keeper.getTransactionID(app_id)
736- print "Begin Trans Handle:",handle
737 transaction_pb.set_handle(handle)
738- _trans_set.add(transaction_pb)
739 return (transaction_pb.Encode(), 0, "")
740
741 def commit_transaction_request(self, app_id, appscale_version, http_request_data):
742 txn = datastore_pb.Transaction(http_request_data)
743 commitres_pb = datastore_pb.CommitResponse()
744- if not _trans_set.isValid(txn):
745- _trans_set.purge(txn)
746- return (commitres_pb.Encode(),
747- datastore_pb.Error.BAD_REQUEST,
748- 'Transaction timed out.')
749
750- if _trans_set.needsLock(txn):
751- _trans_set.purge(txn)
752- return (commitres_pb.Encode(),
753- datastore_pb.Error.BAD_REQUEST,
754- 'Commiting without owning the group lock.')
755-
756- # if zk.releaseLock(app_id, txn.handle()):
757- #try:
758 if zoo_keeper.releaseLock(app_id, txn.handle()):
759- _trans_set.purge(txn)
760 return (commitres_pb.Encode(), 0, "")
761 else:
762 # ZK client must now deal with lock
763 return (commitres_pb.Encode(),
764 datastore_pb.Error.INTERNAL_ERROR,
765 "Unable to release lock")
766- #except :
767- # print "exception:",str(ii)
768- # return (commitres_pb.Encode(),
769- # datastore_pb.Error.INTERNAL_ERROR,
770- # "Releasing a lock with the wrong group")
771-
772
773 def rollback_transaction_request(self, app_id, appscale_version, http_request_data):
774 txn = datastore_pb.Transaction(http_request_data)
775 zoo_keeper.notifyFailedTransaction(app_id, txn.handle())
776- print "ROLLBACK for transaction:",txn.handle()
777- if not _trans_set.isValid(txn):
778- _trans_set.purge(txn)
779- return (api_base_pb.VoidProto().Encode(),
780- datastore_pb.Error.BAD_REQUEST,
781- 'Transaction timed out.')
782- if _trans_set.needsLock(txn):
783- _trans_set.purge(txn)
784- return (api_base_pb.VoidProto().Encode(),
785- datastore_pb.Error.BAD_REQUEST,
786- 'Rolling back without owning the group lock.')
787-
788- _trans_set.purge(txn)
789 return (api_base_pb.VoidProto().Encode(), 0, "")
790
791
792@@ -766,7 +709,6 @@
793 if not request.has_transaction():
794 rollback_req = datastore_pb.Transaction()
795 rollback_req.set_handle(internal_txn)
796- #print "ROLLING BACK FOR %s" % internal_txn
797 self.rollback_transaction_request(app_id,
798 "version",
799 rollback_req.Encode())
800@@ -806,12 +748,8 @@
801 # Only dealing with root puts
802 if e.key().path().element_size() == 1:
803 root_path = e.key().path().mutable_element(0)
804- #print "has id:",root_path.has_id(), "has name:",root_path.has_name()
805 if root_path.id() == 0 and not root_path.has_name():
806- # zk.generateIDBlock(self, app_id, entity_key = GLOBAL_ID_KEY):
807- #new_key = root_key + "/" + last_path.type()
808 uid = generate_unique_id(app_id, None, None)
809- print "Assigned uid to new root key:",str(uid)
810 if uid <= 0:
811 return (putresp_pb.Encode(),
812 datastore_pb.Error.INTERNAL_ERROR,
813@@ -819,36 +757,22 @@
814 root_path.set_id(uid)
815
816 if putreq_pb.has_transaction():
817- #print "This put has transaction"
818 txn = putreq_pb.transaction()
819
820- if not _trans_set.isValid(txn):
821- #_trans_set.purge(txn)
822- return (putresp_pb.Encode(),
823- datastore_pb.Error.BAD_REQUEST,
824- 'Transaction timed out.')
825
826 root_key, errcode, errdetail = self.getRootKeyFromTransPut(app_id, putreq_pb)
827 if errcode != 0:
828- #_trans_set.purge(txn)
829 return (putresp_pb.Encode(),
830 errcode,
831 errdetail)
832- #print "Root key: ",root_key
833- if _trans_set.needsLock(txn):
834- # zk.acquireLock(app_id, txn.handle())
835+ try:
836 gotLock = zoo_keeper.acquireLock( app_id, txn.handle(), root_key)
837- if gotLock:
838- _trans_set.setGroup(txn, root_key)
839- else:
840- #_trans_set.purge(txn)
841- return (putresp_pb.Encode(),
842- datastore_pb.Error.CONCURRENT_TRANSACTION,
843- 'Another transaction is running.')
844+ except zk.ZKTransactionException, zkex:
845+ return (putresp_pb.Encode(),
846+ datastore_pb.Error.CONCURRENT_TRANSACTION,
847+ 'Error trying to acquire lock. %s'%zkex.message)
848
849 # Gather data from Put Request #
850- #print "Entity list for put:"
851- #print putreq_pb.entity_list()
852 for e in putreq_pb.entity_list():
853
854 for prop in e.property_list() + e.raw_property_list():
855@@ -904,7 +828,6 @@
856 # All writes are transactional and per entity if
857 # not already wrapped in a transaction
858 if not putreq_pb.has_transaction():
859- #print "This is not a composite transaction"
860 txn, err, errcode = self.begin_transaction_request(app_id,
861 appscale_version,
862 http_request_data)
863@@ -912,33 +835,24 @@
864 txn = datastore_pb.Transaction(txn)
865
866 if err != 0:
867- _trans_set.purge(txn)
868 return (putresp_pb.Encode(), err, errcode)
869 root_key = getRootKey(app_id, e.key().path().element_list())
870- # TODO what if this is a new key and does not have a uid
871 if root_key == None:
872 self.maybe_rollback_transaction(app_id, putreq_pb, txn.handle())
873 return (putresp_pb.Encode(),
874 datastore_pb.Error.BAD_REQUEST,
875 'No group entity or root key.')
876-
877- #print "Root key: ",root_key
878- if _trans_set.needsLock(txn):
879- #print "needs lock"
880+ try:
881 gotLock = zoo_keeper.acquireLock( app_id, txn.handle(), root_key)
882- if gotLock:
883- _trans_set.setGroup(txn, root_key)
884- else:
885- #_trans_set.purge(txn)
886- self.maybe_rollback_transaction(app_id, putreq_pb, txn.handle())
887- return (putresp_pb.Encode(),
888- datastore_pb.Error.CONCURRENT_TRANSACTION,
889- 'Another transaction is running.')
890+ except zk.ZKTransactionException, zkex:
891+ self.maybe_rollback_transaction(app_id, putreq_pb, txn.handle())
892+ return (putresp_pb.Encode(),
893+ datastore_pb.Error.CONCURRENT_TRANSACTION,
 894+                         'Another transaction is running. %s'%zkex.message)
895 #######################################
896 # Done with key assignment
897 # Notify Soap Server of any new tables
898 #######################################
899- #print "Putting of type:",kind,"with uid of",str(uid)
900 # insert key
901 table_name = getTableName(app_id, kind, appscale_version)
902 #print "Put Using table name:",table_name
903@@ -991,59 +905,42 @@
904 row_key)
905
906 try:
907- print "Using handle for put:",txn.handle()
908 zoo_keeper.registUpdatedKey(app_id, txn.handle(), prev_version, row_key)
909 except e:
910- print "Exception thrown by register update key",str(e)
911 self.maybe_rollback_transaction(app_id, putreq_pb, txn.handle())
912 return (putresp_pb.Encode(),
913 datastore_pb.Error.INTERNAL_ERROR,
914 "Timeout: Unable to update ZooKeeper on change set for transaction")
915-
916+ journalPut = putThread()
917 journal_key = getJournalKey(row_key, txn.handle())
918-
919- field_name_list = JOURNAL_SCHEMA
920- field_value_list = [e.Encode()]
921 journal_table = getJournalTable(app_id, appscale_version)
922- print "Putting to ",journal_table,"key:",journal_key
923- err, res = app_datastore.put_entity( journal_table,
924- journal_key,
925- field_name_list,
926- field_value_list )
927-
928- if err not in ERROR_CODES:
929- #_trans_set.purge(txn)
930- self.maybe_rollback_transaction(app_id, putreq_pb, txn.handle())
931- return (putresp_pb.Encode(),
932- datastore_pb.Error.INTERNAL_ERROR,
933- err + ", Unable to write to journal")
934+ journalPut.setup(app_datastore,
935+ journal_table,
936+ journal_key,
937+ JOURNAL_SCHEMA,
938+ [e.Encode()])
939+ entPut = putThread()
940+ entPut.setup(app_datastore,
941+ table_name,
942+ row_key,
943+ ENTITY_TABLE_SCHEMA,
944+ [e.Encode(), str(txn.handle())])
945+ journalPut.start()
946+ entPut.start()
947+ journalPut.join()
948+ entPut.join()
949+ if journalPut.err not in ERROR_CODES:
950+ self.maybe_rollback_transaction(app_id, putreq_pb, txn.handle())
951+ return (putresp_pb.Encode(),
952+ datastore_pb.Error.INTERNAL_ERROR,
953+ journalPut.err + ", Unable to write to journal")
954+ if entPut.err not in ERROR_CODES:
955+ self.maybe_rollback_transaction(app_id, putreq_pb, txn.handle())
956+ return (putresp_pb.Encode(),
957+ datastore_pb.Error.INTERNAL_ERROR,
 958+                entPut.err + ", Unable to write entity")
959
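The `putThread` class itself is defined outside this hunk; a minimal sketch consistent with its usage above (one `put_entity` call per thread, with the error code saved for inspection after `join()`), using a hypothetical `FakeDatastore` in place of `app_datastore`:

```python
import threading

class FakeDatastore(object):
    """Hypothetical in-memory stand-in for app_datastore."""
    def __init__(self):
        self.rows = {}

    def put_entity(self, table, key, field_names, field_values):
        self.rows[(table, key)] = dict(zip(field_names, field_values))
        return 0, None  # (error code, result); 0 means success here

class putThread(threading.Thread):
    """Runs one datastore put; .err/.res hold the outcome after join()."""
    def setup(self, datastore, table, key, field_names, field_values):
        self.datastore = datastore
        self.table = table
        self.key = key
        self.field_names = field_names
        self.field_values = field_values
        self.err = None
        self.res = None

    def run(self):
        self.err, self.res = self.datastore.put_entity(
            self.table, self.key, self.field_names, self.field_values)
```

Running the journal put and the entity put in parallel, then joining both, overlaps the two backend round-trips instead of serializing them as the replaced code did.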
960- field_name_list = ENTITY_TABLE_SCHEMA
961- field_value_list = [e.Encode(), str(txn.handle())]
962- #print "put entity field name: %s, field value: %s" % (str(field_name_list), str(field_value_list))
963- err, res = app_datastore.put_entity( table_name,
964- row_key,
965- field_name_list,
966- field_value_list)
967-
968- if err not in ERROR_CODES:
969- #_trans_set.purge(txn)
970- self.maybe_rollback_transaction(app_id, putreq_pb, txn.handle())
971- return (putresp_pb.Encode(),
972- datastore_pb.Error.INTERNAL_ERROR,
973- err)
974-
975- # RANDOM ERRORS
976- """
977- rand = random.random()
978- rand = int(rand * 10000)
979- if rand % 100 == 0:
980- self.maybe_rollback_transaction(app_id, putreq_pb, txn.handle())
981- print "RANDOM ERROR!!!"
982- return(putresp_pb.Encode(),
983- datastore_pb.Error.BAD_REQUEST,
984- 'Random Error.')
985- """
986+
987 if not putreq_pb.has_transaction():
988 com_res, errcode, errdetail = self.commit_transaction_request(app_id,
989 appscale_version,
990@@ -1068,28 +965,18 @@
991 if getreq_pb.has_transaction():
992 txn = getreq_pb.transaction()
993
994- if not _trans_set.isValid(txn):
995- _trans_set.purge(txn)
996- return (getresp_pb.Encode(),
997- datastore_pb.Error.BAD_REQUEST,
998- 'Transaction timed out.')
999-
1000 root_key, errcode, errdetail = self.getRootKeyFromTransGet(app_id,getreq_pb)
1001 if errcode != 0:
1002- _trans_set.purge(txn)
1003 return (getresp_pb.Encode(),
1004 errcode,
1005 errdetail)
1006
1007- if _trans_set.needsLock(txn):
1008+ try:
1009 gotLock= zoo_keeper.acquireLock( app_id, txn.handle(), root_key)
1010- if gotLock:
1011- _trans_set.setGroup(txn, root_key)
1012- else:
1013- _trans_set.purge(txn)
1014- return (getresp_pb.Encode(),
1015- datastore_pb.Error.CONCURRENT_TRANSACTION,
1016- 'Another transaction is running.')
1017+ except zk.ZKTransactionException, zkex:
1018+ return (getresp_pb.Encode(),
1019+ datastore_pb.Error.CONCURRENT_TRANSACTION,
1020+ 'Another transaction is running. %s'%zkex.message)
1021
1022 for key in getreq_pb.key_list():
1023 key.set_app(app_id)
1024@@ -1106,7 +993,6 @@
1025 logger.debug("get: %s___%s___%s %s" % (app_id, kind, appscale_version, str(entity_id)))
1026 table_name = getTableName(app_id, kind, appscale_version)
1027 row_key = getRowKey(app_id,key.path().element_list())
1028- print "Get row key:",row_key
1029 r = app_datastore.get_entity( table_name, row_key, ENTITY_TABLE_SCHEMA )
1030 err = r[0]
1031 if err not in ERROR_CODES or len(r) != 3:
1032@@ -1128,23 +1014,18 @@
1033 prev_version,
1034 row_key)
1035 if valid_txn != prev_version:
1036- print "Using journal version",valid_txn," versus invalid:",prev_version
1037 prev_version = valid_txn
1038 if prev_version == long(NONEXISTANT_TRANSACTION):
1039 entity = None
1040 else:
1041 journal_table = getJournalTable(app_id, appscale_version)
1042 journal_key = getJournalKey(row_key, prev_version)
1043- print "Retriving from ",journal_table,"key:",journal_key
1044 r = app_datastore.get_entity(journal_table, journal_key, ENTITY_TABLE_SCHEMA[:1] )
1045 err = r[0]
1046 if err not in ERROR_CODES or len(r) != 2:
1047- print r
1048- print "UNABLE TO GET JOURNAL VERSION"
1049 r = ["",DELETED]
1050
1051 if len(r) > 1:
1052- print "GOT JOURNAL VERSION"
1053 entity = r[1]
1054
1055 if entity[0:len(DELETED)] == DELETED:
1056@@ -1156,7 +1037,6 @@
1057 group.mutable_entity().CopyFrom(e_pb)
1058
1059 # Send Response #
1060- print getresp_pb
1061 logger.debug("GET_RESPONSE: %s" % getresp_pb)
1062 return (getresp_pb.Encode(), 0, "")
1063
1064@@ -1174,31 +1054,19 @@
1065 logger.debug("DELETE_REQUEST: %s" % delreq_pb)
1066 delresp_pb = api_base_pb.VoidProto()
1067 if delreq_pb.has_transaction():
1068- print "This delete has a transaction"
1069 txn = delreq_pb.transaction()
1070
1071- if not _trans_set.isValid(txn):
1072- #_trans_set.purge(txn)
1073- return (delresp_pb.Encode(),
1074- datastore_pb.Error.BAD_REQUEST,
1075- 'Transaction timed out.')
1076-
1077 root_key, errcode, errdetail = self.getRootKeyFromTransDel(app_id, delreq_pb)
1078 if errcode != 0:
1079- #_trans_set.purge(txn)
1080 return (delresp_pb.Encode(),
1081 errcode,
1082 errdetail)
1083- if _trans_set.needsLock(txn):
1084- # zk.acquireLock(app_id, txn.handle())
1085+ try:
1086 gotLock = zoo_keeper.acquireLock( app_id, txn.handle(), root_key)
1087- if gotLock:
1088- _trans_set.setGroup(txn, root_key)
1089- else:
1090- #_trans_set.purge(txn)
1091- return (delresp_pb.Encode(),
1092- datastore_pb.Error.CONCURRENT_TRANSACTION,
1093- 'Another transaction is running.')
1094+ except zk.ZKTransactionException, zkex:
1095+ return (delresp_pb.Encode(),
1096+ datastore_pb.Error.CONCURRENT_TRANSACTION,
1097+ 'Another transaction is running. %s'%zkex.message)
1098
1099
1100 for key in delreq_pb.key_list():
1101@@ -1212,7 +1080,6 @@
1102 # All deletes are transactional and per entity if
1103 # not already wrapped in a transaction
1104 if not delreq_pb.has_transaction():
1105- print "This is not a composite transaction"
1106 txn, err, errcode = self.begin_transaction_request(app_id,
1107 appscale_version,
1108 http_request_data)
1109@@ -1220,7 +1087,6 @@
1110 txn = datastore_pb.Transaction(txn)
1111
1112 if err != 0:
1113- _trans_set.purge(txn)
1114 return (delresp_pb.Encode(), err, errcode)
1115 root_key = getRootKey(app_id, key.path().element_list())
1116 if root_key == None:
1117@@ -1233,15 +1099,13 @@
1118 # Get a lock for the group
1119 # Do a read before a write to get the old values
1120 ################################################
1121- gotLock = zoo_keeper.acquireLock( app_id, txn.handle(), root_key)
1122- if gotLock:
1123- _trans_set.setGroup(txn, root_key)
1124- else:
1125- #_trans_set.purge(txn)
1126+ try:
1127+ gotLock = zoo_keeper.acquireLock( app_id, txn.handle(), root_key)
1128+ except zk.ZKTransactionException, zkex:
1129 self.maybe_rollback_transaction(app_id, delreq_pb, txn.handle())
1130 return (delresp_pb.Encode(),
1131 datastore_pb.Error.CONCURRENT_TRANSACTION,
1132- 'Another transaction is running.')
1133+ 'Another transaction is running. %s'%zkex.message)
1134
1135 ##########################
1136 # Get the previous version
1137@@ -1292,7 +1156,6 @@
1138 field_value_list )
1139
1140 if err not in ERROR_CODES:
1141- #_trans_set.purge(txn)
1142 self.maybe_rollback_transaction(app_id, delreq_pb, txn.handle())
1143 return (delresp_pb.Encode(),
1144 datastore_pb.Error.INTERNAL_ERROR,
1145@@ -1416,31 +1279,18 @@
1146 print "http done"
1147 self.void_proto(app_id, appscale_version, http_request_data)
1148
1149- ########################
1150- # GET Request Handling #
1151- ########################
1152- def do_GET( self ):
1153- self.send_error( 404 , 'File Not Found: %s' % self.path )
1154
1155 #########################
1156 # POST Request Handling #
1157 #########################
1158- def do_POST( self ):
1159- start_time = time.time()
1160- http_request_data = self.rfile.read(int(self.headers.getheader('content-length')))
1161- inter_time = time.time()
1162- logger.debug("Timing for pre pre env setup:" + " " +
1163- str(start_time) + " " + str(inter_time) +
1164- " total time: " + str(inter_time - start_time) + "\n")
1165- print "intertime - starttime:",(str(inter_time - start_time))
1166- pb_type = self.headers.getheader( 'protocolbuffertype' )
1167- app_data = self.headers.getheader('appdata')
1168+ @tornado.web.asynchronous
1169+ def post( self ):
1170+ request = self.request
1171+ http_request_data = request.body
1172+ pb_type = request.headers['protocolbuffertype']
1173+ app_data = request.headers['appdata']
1174 app_data = app_data.split(':')
1175 logger.debug("POST len: %d" % len(app_data))
1176- inter_time = time.time()
1177- logger.debug("Timing for pre env setup:" + " " +
1178- str(start_time) + " " + str(inter_time) +
1179- " total time: " + str(inter_time - start_time) + "\n")
1180
1181 if len(app_data) == 5:
1182 app_id, user_email, nick_name, auth_domain, appscale_version = app_data
1183@@ -1470,42 +1320,12 @@
1184 # Default HTTP Response Data #
1185 logger.debug("For app id: " + app_id)
1186 logger.debug("For app version: " + appscale_version)
1187- inter_time = time.time()
1188- logger.debug("Timing for env setup:" + pb_type + " " +
1189- app_id + " " + str(start_time) + " " +
1190- str(inter_time) + " total time: " + str(inter_time - start_time) + "\n")
1191
1192 if pb_type == "Request":
1193 self.remote_request(app_id, appscale_version, http_request_data)
1194 else:
1195 self.unknown_request(app_id, appscale_version, http_request_data, pb_type)
1196-
1197- stop_time = time.time()
1198-
1199- #if logOn == True:
1200- #logFilePtr.write(pb_type + " " + app_id + " " +
1201- #str(start_time) + " " + str(stop_time) + " total time: " +
1202- #str(stop_time - start_time) + "\n")
1203-
1204- logger.debug(pb_type + " " + app_id + " " + str(start_time) + " " +
1205- str(stop_time) + " total time: " + str(stop_time - start_time) + "\n")
1206-
1207-class AppScaleUnSecureServerThreaded( ThreadingMixIn, HTTPServer):
1208- pass
1209-
1210-class AppScaleSecureServerThreaded( ThreadingMixIn, HTTPServer ):
1211- def __init__( self):
1212- global local_server_address
1213- global HandlerClass
1214- global ssl_cert_file
1215- global ssl_key_file
1216- BaseServer.__init__( self, local_server_address, HandlerClass )
1217- ctx = SSL.Context()
1218- ctx.load_cert( ssl_cert_file, ssl_key_file )
1219- self.socket = SSL.Connection( ctx )
1220- self.server_bind()
1221- self.server_activate()
1222-
1223+ self.finish()
1224 def usage():
1225 print "AppScale Server"
1226 print
1227@@ -1518,6 +1338,11 @@
1228 print "\t--blocksize=<key-block-size>"
1229 print "\t--optimized_query"
1230 print "\t--no_encryption"
1231+
1232+pb_application = tornado.web.Application([
1233+ (r"/*", MainHandler),
1234+])
1235+
1236 def main(argv):
1237 global app_datastore
1238 global getKeyFromServer
1239@@ -1599,43 +1424,23 @@
1240 exit(1)
1241
1242 tableServer = SOAPpy.SOAPProxy("https://" + soapServer + ":" + str(keyPort))
1243-
1244- # # Bind Port #
1245- #server = AppScaleSecureServer( ('',DEFAULT_SSL_PORT),
1246- # AppScaleSecureHandler, cert_file, key_file )
1247- #help(ThreadedHTTPServer)
1248- global local_server_address
1249- global HandlerClass
1250- global ssl_cert_file
1251- global ssl_key_file
1252 global keyDictionaryLock
1253
1254 zoo_keeper = zktransaction.ZKTransaction(zoo_keeper_locations)
1255 keyDictionaryLock = threading.Lock()
1256 if port == DEFAULT_SSL_PORT and not isEncrypted:
1257 port = DEFAULT_PORT
1258- local_server_address = ('',port)
1259- HandlerClass = AppScaleSecureHandler
1260- ssl_cert_file = cert_file
1261- ssl_key_file = key_file
1262- if isEncrypted:
1263- server = AppScaleSecureServerThreaded()
1264- else:
1265- server = AppScaleUnSecureServerThreaded(local_server_address, HandlerClass)
1266- sa = server.socket.getsockname()
1267- if not db_type == "timesten":
1268- # Stop running as root, security purposes #
1269- drop_privileges()
1270- logger.debug("\n\nStarting AppScale-Secure-Server on %s:%s" % (sa[0], sa[1]))
1271+ server = tornado.httpserver.HTTPServer(pb_application)
1272+ server.listen(port)
1273
1274 while 1:
1275 try:
1276 # Start Server #
1277- server.serve_forever()
1278+ tornado.ioloop.IOLoop.instance().start()
1279 except SSL.SSLError:
1280- logger.debug("\n\nUnexcepted input for AppScale-Secure-Server on %s:%s" % (sa[0], sa[1]))
1281+      logger.debug("\n\nUnexpected input for AppScale-Secure-Server")
1282 except KeyboardInterrupt:
1283- server.socket.close()
1284+ #server.socket.close()
1285 print "Server interrupted by user, terminating..."
1286 exit(1)
1287
1288
1289=== added file 'AppDB/appscale_server_native_trans.py'
1290--- AppDB/appscale_server_native_trans.py 1970-01-01 00:00:00 +0000
1291+++ AppDB/appscale_server_native_trans.py 2010-06-29 01:49:23 +0000
1292@@ -0,0 +1,1086 @@
1293+#!/usr/bin/python
1294+#
1295+# Author:
1296+# Navraj Chohan (nchohan@cs.ucsb.edu)
1297+# Soo Hwan Park (suwanny@gmail.com)
1298+# Sydney Pang (pang@cs.ucsb.edu)
1299+# See LICENSE file
1300+import tornado.httpserver
1301+import tornado.ioloop
1302+import tornado.web
1303+
1304+import sys
1305+import socket
1306+import os
1307+import types
1308+import appscale_datastore
1309+#import helper_functions
1310+import SOAPpy
1311+from dbconstants import *
1312+import appscale_logger
1313+import md5
1314+import random
1315+import getopt
1316+import threading
1317+
1318+from google.appengine.api import api_base_pb
1319+from google.appengine.api import datastore
1320+from google.appengine.api import datastore_errors
1321+from google.appengine.api import datastore_types
1322+from google.appengine.api import users
1323+from google.appengine.datastore import datastore_pb
1324+from google.appengine.datastore import datastore_index
1325+from google.appengine.runtime import apiproxy_errors
1326+from google.net.proto import ProtocolBuffer
1327+from google.appengine.datastore import entity_pb
1328+from google.appengine.ext.remote_api import remote_api_pb
1329+from SocketServer import BaseServer
1330+from M2Crypto import SSL
1331+from drop_privileges import *
1332+from zkappscale import zktransaction
1333+
1334+import time
1335+
1336+DEBUG = False
1337+APP_TABLE = APPS_TABLE
1338+USER_TABLE = USERS_TABLE
1339+DEFAULT_USER_LOCATION = ".flatfile_users"
1340+DEFAULT_APP_LOCATION = ".flatfile_apps"
1341+HYPERTABLE_XML_TAG = "Name"
1342+DEFAULT_DATASTORE = "files"
1343+DEFAULT_SSL_PORT = 8443
1344+DEFAULT_PORT = 4080
1345+DEFAULT_ENCRYPTION = 1
1346+CERT_LOCATION = "/etc/appscale/certs/mycert.pem"
1347+KEY_LOCATION = "/etc/appscale/certs/mykey.pem"
1348+SECRET_LOCATION = "/etc/appscale/secret.key"
1349+VALID_DATASTORES = []
1350+ERROR_CODES = []
1351+app_datastore = []
1352+logOn = False
1353+logFilePtr = ""
1354+zoo_keeper = ""
1355+
1356+getKeyFromServer = False
1357+soapServer = "localhost"
1358+tableServer = ""
1359+keyPort = 4343
1360+keySecret = ""
1361+KEYBLOCKSIZE = "50"
1362+keyDictionaryLock = None
1363+keyDictionary = {}
1364+
1365+optimizedQuery = False
1366+ID_KEY_LENGTH = 64
1367+tableHashTable = {}
1368+
1369+local_server_address = ""
1370+HandlerClass = ""
1371+ssl_cert_file = ""
1372+ssl_key_file = ""
1373+
1374+DELETED = "DELETED___"
1375+"""
1376+Deleted keys are DELETED/<row_key>
1377+"""
1378+
1379+"""
1380+keys for tables take the format
1381+appname/Grandparent:<ID>/Parent:<ID>/Child:<ID>
1382+for the entity table
1383+"""
1384+
1385+
1386+class ThreadLogger:
1387+ def __init__(self, log):
1388+ self.logger_ = log
1389+ self.log_lock = threading.Lock()
1390+
1391+ def debug(self, string):
1392+    return  # logging disabled; the statements below are unreachable
1393+ self.log_lock.acquire()
1394+ print string
1395+ self.logger_.info(string)
1396+ self.log_lock.release()
1397+
1398+logger = appscale_logger.getLogger("pb_server")
1399+
1400+
1401+def getTableName(app_id, kind, version):
1402+ return app_id + "___" + kind + "___" + version
1403+
1404+def getRowKey(app_id, ancestor_list):
1405+ if ancestor_list == None:
1406+ logger.debug("Generate row key received null ancestor list")
1407+ return ""
1408+
1409+ key = app_id
1410+
1411+ # Note: mysql cannot have \ as the first char in the row key
1412+ for a in ancestor_list:
1413+ key += "/"
1414+ if a.has_type():
1415+ key += a.type()
1416+
1417+ if a.has_id():
1418+ zero_padded_id = ("0" * (ID_KEY_LENGTH - len(str(a.id())))) + str(a.id())
1419+ key += ":" + zero_padded_id
1420+ elif a.has_name():
1421+ # append _ if the name is a number, prevents collisions of key names
1422+ if a.name().isdigit():
1423+ key += ":__key__" + a.name()
1424+ else:
1425+ key += ":" + a.name()
1426+ return key
1427+
1428+
1429+def getRootKey(app_id, ancestor_list):
1430+ key = app_id # mysql cannot have \ as the first char in the row key
1431+ a = ancestor_list[0]
1432+ key += "/"
1433+
1434+ # append _ if the name is a number, prevents collisions of key names
1435+ if a.has_type():
1436+ key += a.type()
1437+ else:
1438+ return None
1439+
1440+ if a.has_id():
1441+ zero_padded_id = ("0" * (ID_KEY_LENGTH - len(str(a.id())))) + str(a.id())
1442+ key += ":" + zero_padded_id
1443+ elif a.has_name():
1444+ if a.name().isdigit():
1445+ key += ":__key__" + a.name()
1446+ else:
1447+ key += ":" + a.name()
1448+ else:
1449+ return None
1450+
1451+ return key
1452+
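Both `getRowKey` and `getRootKey` zero-pad numeric ids to `ID_KEY_LENGTH` characters so that lexicographic key order matches numeric order in range scans. A small sketch of that padding rule, with `make_key_part` as a hypothetical helper name:

```python
ID_KEY_LENGTH = 64

def make_key_part(kind, numeric_id):
    """Pad a numeric id to a fixed width, as the key scheme above does,
    so string comparison of keys agrees with numeric comparison of ids."""
    return "%s:%s" % (kind, str(numeric_id).zfill(ID_KEY_LENGTH))
```

Without the padding, `"Kind:10"` would sort before `"Kind:2"`; with it, the sort order is the numeric one.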
1453+
1454+def getRootKeyFromKeyType(app_id, key):
1455+ ancestor_list = key._Key__reference.path().element_list()
1456+ return getRootKey(app_id, ancestor_list)
1457+
1458+
1459+def getRowKeyFromKeyType(app_id, key):
1460+ ancestor_list = key._Key__reference.path().element_list()
1461+ return getRowKey(app_id, ancestor_list)
1462+
1463+def generate_unique_id(app_id, root, isChild):
1464+ global keyDictionary
1465+ global keyDictionaryLock
1466+
1467+ if isChild:
1468+ if not root:
1469+ return -1
1470+
1471+ index = None
1472+ if isChild:
1473+ index = app_id + "/" + str(root)
1474+ else:
1475+ index = app_id
1476+
1477+ keyDictionaryLock.acquire()
1478+ try:
1479+ keyStart, keyEnd = keyDictionary[index]
1480+  except KeyError:
1481+ keyStart = 0
1482+ keyEnd = 0
1483+
1484+ key = 0
1485+ if keyStart != keyEnd:
1486+ key = keyStart
1487+ keyStart = keyStart + 1
1488+ keyDictionary[index]= keyStart, keyEnd
1489+ keyDictionaryLock.release()
1490+ return key
1491+ else:
1492+ try:
1493+ if not isChild:
1494+ keyStart, blockSize = zoo_keeper.generateIDBlock(app_id)
1495+ keyStart = long(keyStart)
1496+ else:
1497+ keyStart, blockSize = zoo_keeper.generateIDBlock(app_id, root)
1498+ keyStart = long(keyStart)
1499+ except:
1500+ print "="*60
1501+ print "Exception: when getting id block"
1502+ print "="*60
1503+ keyDictionaryLock.release()
1504+ return -1
1505+ keyEnd = keyStart + long(blockSize)
1506+ key = keyStart
1507+ keyStart = keyStart + 1
1508+ keyDictionary[index] = keyStart, keyEnd
1509+ keyDictionaryLock.release()
1510+ return key
1511+
1512+
1513+def getRootKeyFromRef(app_id, ref):
1514+ if not ref.has_path():
1515+ return False
1516+ path = ref.path()
1517+ element_list = path.element_list()
1518+ return getRootKey(app_id, element_list)
1519+
1520+
1521+def rollback_function(app_id, trans_id, root_key, change_set):
1522+ pass
1523+
1524+
1525+
1526+
1527+class MainHandler(tornado.web.RequestHandler):
1528+ """
1529+ Defines what to do when the webserver receives different types of
1530+ HTTP requests.
1531+ """
1532+ @tornado.web.asynchronous
1533+ def get(self):
1534+ self.write("Hi")
1535+ self.finish()
1536+ # remote api request
1537+ # sends back a response
1538+ def remote_request(self, app_id, appscale_version, http_request_data):
1539+ apirequest = remote_api_pb.Request(http_request_data)
1540+ apiresponse = remote_api_pb.Response()
1541+ response = None
1542+ errcode = 0
1543+ errdetail = ""
1544+ apperror_pb = None
1545+
1546+ if not apirequest.has_method():
1547+ errcode = datastore_pb.Error.BAD_REQUEST
1548+ errdetail = "Method was not set in request"
1549+ apirequest.set_method("NOT_FOUND")
1550+ if not apirequest.has_request():
1551+ errcode = datastore_pb.Error.BAD_REQUEST
1552+ errdetail = "Request missing in call"
1553+ apirequest.set_method("NOT_FOUND")
1554+ apirequest.clear_request()
1555+ method = apirequest.method()
1556+ request_data = apirequest.request()
1557+ http_request_data = request_data.contents()
1558+
1559+ #print "REQUEST:",method," AT time",time.time()
1560+ if method == "Put":
1561+ response, errcode, errdetail = self.put_request(app_id,
1562+ appscale_version,
1563+ http_request_data)
1564+ elif method == "Get":
1565+ response, errcode, errdetail = self.get_request(app_id,
1566+ appscale_version,
1567+ http_request_data)
1568+ elif method == "Delete":
1569+ response, errcode, errdetail = self.delete_request(app_id,
1570+ appscale_version,
1571+ http_request_data)
1572+ elif method == "RunQuery":
1573+ response, errcode, errdetail = self.run_query(app_id,
1574+ appscale_version,
1575+ http_request_data)
1576+ elif method == "BeginTransaction":
1577+ response, errcode, errdetail = self.begin_transaction_request(app_id,
1578+ appscale_version,
1579+ http_request_data)
1580+ elif method == "Commit":
1581+ response, errcode, errdetail = self.commit_transaction_request(app_id,
1582+ appscale_version,
1583+ http_request_data)
1584+ elif method == "Rollback":
1585+ response, errcode, errdetail = self.rollback_transaction_request(app_id,
1586+ appscale_version,
1587+ http_request_data)
1588+ elif method == "AllocateIds":
1589+ response, errcode, errdetail = self.allocate_ids_request(app_id,
1590+ appscale_version,
1591+ http_request_data)
1592+ elif method == "CreateIndex":
1593+ errcode = datastore_pb.Error.PERMISSION_DENIED
1594+ errdetail = "Create Index is not implemented"
1595+ logger.debug(errdetail)
1596+ """
1597+ response, errcode, errdetail = self.create_index_request(app_id,
1598+ appscale_version,
1599+ http_request_data)
1600+ """
1601+ elif method == "GetIndices":
1602+ errcode = datastore_pb.Error.PERMISSION_DENIED
1603+ errdetail = "GetIndices is not implemented"
1604+ logger.debug(errdetail)
1605+ """
1606+ response, errcode, errdetail = self.get_indices_request(app_id,
1607+ appscale_version,
1608+ http_request_data)
1609+ """
1610+ elif method == "UpdateIndex":
1611+ errcode = datastore_pb.Error.PERMISSION_DENIED
1612+ errdetail = "UpdateIndex is not implemented"
1613+ logger.debug(errdetail)
1614+ """
1615+ response, errcode, errdetail = self.update_index_request(app_id,
1616+ appscale_version,
1617+ http_request_data)
1618+ """
1619+ elif method == "DeleteIndex":
1620+ errcode = datastore_pb.Error.PERMISSION_DENIED
1621+ errdetail = "DeleteIndex is not implemented"
1622+ logger.debug(errdetail)
1623+
1624+ """
1625+ response, errcode, errdetail = self.delete_index_request(app_id,
1626+ appscale_version,
1627+ http_request_data)
1628+ """
1629+ else:
1630+ errcode = datastore_pb.Error.BAD_REQUEST
1631+ errdetail = "Unknown datastore message"
1632+ logger.debug(errdetail)
1633+
1634+
1635+ rawmessage = apiresponse.mutable_response()
1636+ if response:
1637+ rawmessage.set_contents(response)
1638+
1639+ if errcode != 0:
1640+ apperror_pb = apiresponse.mutable_application_error()
1641+ apperror_pb.set_code(errcode)
1642+ apperror_pb.set_detail(errdetail)
1643+ if errcode != 0:
1644+ print "REPLY",method," AT TIME",time.time()
1645+ print "errcode:",errcode
1646+ print "errdetail:",errdetail
1647+ self.write(apiresponse.Encode() )
1648+
1649+
1650+ def run_query(self, app_id, appscale_version, http_request_data):
1651+ global app_datastore
1652+ query = datastore_pb.Query(http_request_data)
1653+ logger.debug("QUERY:%s" % query)
1654+ results = []
1655+
1656+ if not query.has_kind():
1657+ # Return nothing in case of error #
1658+ return (api_base_pb.VoidProto().Encode(),
1659+ datastore_pb.Error.PERMISSION_DENIED,
1660+ "Kindless queries are not implemented.")
1661+ else:
1662+ kind = query.kind()
1663+ #print "Query kind:",kind
1664+
1665+      # Verify validity of the entity name and application id #
1666+      # according to the naming scheme for entity tables       #
1667+ #assert kind[-2:] != "__"
1668+ #assert app_id[-1] != "_"
1669+
1670+ # Fetch query from the datastore #
1671+ table_name = getTableName(app_id, kind, appscale_version)
1672+ #print "Query using table name:",table_name
1673+ if query.has_transaction():
1674+ txn = query.transaction()
1675+ r = app_datastore.get_table( table_name, ENTITY_TABLE_SCHEMA, txn.handle())
1676+ else:
1677+ r = app_datastore.get_table( table_name, ENTITY_TABLE_SCHEMA)
1678+ #logger.debug("result: %s" % r)
1679+
1680+ #err = r[0]
1681+ if len(r) > 1:
1682+ results = r[1:]
1683+ else:
1684+ results = []
1685+
1686+ # odds are versions
1687+ versions = results[1::2]
1688+ # evens are encoded entities
1689+ results = results[0::2]
1690+ #print "RESULTS:",results
1691+ #print "VERSIONS:",versions
1692+ if len(versions) != len(results):
1693+ return(api_base_pb.VoidProto().Encode(),
1694+ datastore_pb.Error.INTERNAL_ERROR,
1695+ 'The query had a bad number of results.')
1696+
1697+ # convert to objects
1698+ # Unless its marked as deleted
1699+ # They are currently strings
1700+ for index, res in enumerate(results):
1701+ results[index] = entity_pb.EntityProto(res)
1702+ results[index] = datastore.Entity._FromPb(results[index])
1703+
1704+ logger.debug("====results pre filter====")
1705+ #logger.debug("%s" % results)
1706+ if query.has_ancestor():
1707+ ancestor_path = query.ancestor().path().element_list()
1708+ def is_descendant(entity):
1709+ path = entity.key()._Key__reference.path().element_list()
1710+ return path[:len(ancestor_path)] == ancestor_path
1711+ results = filter(is_descendant, results)
1712+
1713+ operators = {datastore_pb.Query_Filter.LESS_THAN: '<',
1714+ datastore_pb.Query_Filter.LESS_THAN_OR_EQUAL: '<=',
1715+ datastore_pb.Query_Filter.GREATER_THAN: '>',
1716+ datastore_pb.Query_Filter.GREATER_THAN_OR_EQUAL: '>=',
1717+ datastore_pb.Query_Filter.EQUAL: '==',
1718+ }
1719+
1720+ for filt in query.filter_list():
1721+ assert filt.op() != datastore_pb.Query_Filter.IN
1722+
1723+ prop = filt.property(0).name().decode('utf-8')
1724+ op = operators[filt.op()]
1725+
1726+ def passes(entity):
1727+ """ Returns True if the entity passes the filter, False otherwise. """
1728+ entity_vals = entity.get(prop, [])
1729+ if type(entity_vals) != types.ListType:
1730+ entity_vals = [entity_vals]
1731+
1732+ entity_property_list = [datastore_types.ToPropertyPb(prop, value) for value in entity_vals]
1733+
1734+ for entity_prop in entity_property_list:
1735+ fixed_entity_val = datastore_types.FromPropertyPb(entity_prop)
1736+
1737+ for filter_prop in filt.property_list():
1738+ filter_val = datastore_types.FromPropertyPb(filter_prop)
1739+ comp = u'%r %s %r' % (fixed_entity_val, op, filter_val)
1740+ logger.debug('Evaling filter expression "%s"' % comp )
1741+ if eval(comp):
1742+ return True
1743+ return False
1744+
1745+ results = filter(passes, results)
1746+
1747+ for order in query.order_list():
1748+ prop = order.property().decode('utf-8')
1749+ results = [entity for entity in results if prop in entity]
1750+
1751+ def order_compare(a, b):
1752+ """ Return a negative, zero or positive number depending on whether
1753+ entity a is considered smaller than, equal to, or larger than b,
1754+ according to the query's orderings. """
1755+ for o in query.order_list():
1756+ prop = o.property().decode('utf-8')
1757+
1758+ a_values = a[prop]
1759+ if not isinstance(a_values, types.ListType):
1760+ a_values = [a_values]
1761+
1762+ b_values = b[prop]
1763+ if not isinstance(b_values, types.ListType):
1764+ b_values = [b_values]
1765+
1766+ cmped = cmp(min(a_values), min(b_values))
1767+
1768+ if o.direction() == datastore_pb.Query_Order.DESCENDING:
1769+ cmped = -cmped
1770+
1771+ if cmped != 0:
1772+ return cmped
1773+
1774+ return 0
1775+
1776+ results.sort(order_compare)
1777+
1778+ if query.has_limit():
1779+ results = results[:query.limit()]
1780+ logger.debug("****results after filtering:****")
1781+ logger.debug("%s" % results)
1782+
1783+ results = [ent._ToPb() for ent in results]
1784+ # Pack Results into a clone of QueryResult #
1785+ clone_qr_pb = datastore_pb.QueryResult()
1786+ for res in results:
1787+ clone_qr_pb.add_result()
1788+ clone_qr_pb.result_[-1] = res
1789+
1790+ clone_qr_pb.clear_cursor()
1791+ clone_qr_pb.set_more_results( len(results)>0 )
1792+
1793+ logger.debug("QUERY_RESULT: %s" % clone_qr_pb)
1794+ return (clone_qr_pb.Encode(), 0, "")
1795+
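`run_query` fetches the whole table and then filters, sorts, and truncates in memory. A condensed sketch of that post-fetch pipeline over plain dicts, using a comparison dispatch table in place of the original's `eval` on formatted strings (a safer equivalent; `apply_query` and its argument shapes are illustrative, not the server's API):

```python
OPERATORS = {"<":  lambda a, b: a < b,  "<=": lambda a, b: a <= b,
             ">":  lambda a, b: a > b,  ">=": lambda a, b: a >= b,
             "==": lambda a, b: a == b}

def apply_query(entities, filters, orders, limit=None):
    """Filter on (prop, op, value) triples, sort by (prop, descending)
    pairs, then truncate -- the same three stages run_query performs."""
    for prop, op, value in filters:
        entities = [e for e in entities
                    if prop in e and OPERATORS[op](e[prop], value)]
    # Apply orderings from least to most significant so the primary
    # ordering wins (stable sorts preserve earlier orderings on ties).
    for prop, descending in reversed(orders):
        entities = [e for e in entities if prop in e]
        entities = sorted(entities, key=lambda e: e[prop], reverse=descending)
    return entities[:limit] if limit is not None else entities
```

This fetch-then-filter design is simple but scans every row of the kind's table on each query, which is the cost the `optimizedQuery` flag elsewhere in this branch tries to address.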
1796+
1797+ def begin_transaction_request(self, app_id, appscale_version, http_request_data):
1798+ transaction_pb = datastore_pb.Transaction()
1799+ handle = generate_unique_id(app_id, None, None)
1800+ #print "Begin Trans Handle:",handle
1801+ transaction_pb.set_handle(handle)
1802+ app_datastore.setupTransaction(handle)
1803+ return (transaction_pb.Encode(), 0, "")
1804+
1805+ def commit_transaction_request(self, app_id, appscale_version, http_request_data):
1806+ transaction_pb = datastore_pb.Transaction(http_request_data)
1807+ handle = transaction_pb.handle()
1808+ commitres_pb = datastore_pb.CommitResponse()
1809+ try:
1810+ app_datastore.commit(handle)
1811+ except:
1812+ return (commitres_pb.Encode(), datastore_pb.Error.PERMISSION_DENIED, "Unable to commit for this transaction")
1813+ return (commitres_pb.Encode(), 0, "")
1814+
1815+ def rollback_transaction_request(self, app_id, appscale_version, http_request_data):
1816+ transaction_pb = datastore_pb.Transaction(http_request_data)
1817+ handle = transaction_pb.handle()
1818+ try:
1819+ app_datastore.rollback(handle)
1820+ except:
1821+ return(api_base_pb.VoidProto().Encode(), datastore_pb.Error.PERMISSION_DENIED, "Unable to rollback for this transaction")
1822+ return (api_base_pb.VoidProto().Encode(), 0, "")
1823+
1824+
1825+ def allocate_ids_request(self, app_id, appscale_version, http_request_data):
1826+ return (api_base_pb.VoidProto().Encode(),
1827+ datastore_pb.Error.PERMISSION_DENIED,
1828+ 'Allocation of block ids not implemented.')
1829+
1830+
1831+  # Returns None on error
1832+ def getRootKeyFromEntity(self, app_id, entity):
1833+ key = entity.key()
1834+ if str(key.__class__) == "google.appengine.datastore.entity_pb.Reference":
1835+ return getRootKeyFromRef(app_id, key)
1836+ else:
1837+ return getRootKeyFromKeyType(app_id, key)
1838+
1839+
1840+ # For transactions
1841+ # Verifies that all puts are a part of the same entity group
1842+ def getRootKeyFromTransPut(self, app_id, putreq_pb):
1843+ ent_list = []
1844+ if putreq_pb.entity_size() > 0:
1845+ ent_list = putreq_pb.entity_list()
1846+ first_ent = ent_list[0]
1847+ expected_root = self.getRootKeyFromEntity(app_id, first_ent)
1848+ # It is possible that all roots are None
1849+ # because it is a root that has not gotten a uid
1850+
1851+ for e in ent_list:
1852+ root = self.getRootKeyFromEntity(app_id, e)
1853+ if root != expected_root:
1854+ errcode = datastore_pb.Error.BAD_REQUEST
1855+ errdetail = "All puts must be a part of the same group"
1856+ return (None, errcode, errdetail)
1857+
1858+ return (expected_root, 0, "")
1859+
1860+
1861+ # For transactions
1862+ # Verifies that all keys are a part of the same entity group
1863+ def getRootKeyFromTransReq(self, app_id, req_pb):
1864+ if req_pb.key_size() <= 0:
1865+ errcode = datastore_pb.Error.BAD_REQUEST
1866+ errdetail = "Bad key listing"
1867+ return (None, errcode, errdetail)
1868+
1869+ key_list = req_pb.key_list()
1870+ first_key = key_list[0]
1871+
1872+ expected_root = getRootKeyFromRef(app_id, first_key)
1873+ # It is possible that all roots are None
1874+ # because it is a root that has not gotten a uid
1875+
1876+ for k in key_list:
1877+ root = getRootKeyFromRef(app_id, k)
1878+ if root != expected_root:
1879+ errcode = datastore_pb.Error.BAD_REQUEST
1880+ errdetail = "All transaction gets must be a part of the same group"
1881+ return (None, errcode, errdetail)
1882+
1883+ return (expected_root, 0, "")
1884+
1885+ def getRootKeyFromTransGet(self, app_id, get_pb):
1886+ return self.getRootKeyFromTransReq(app_id, get_pb)
1887+
1888+ def getRootKeyFromTransDel(self, app_id, del_pb):
1889+ return self.getRootKeyFromTransReq(app_id, del_pb)
1890+
1891+ def put_request(self, app_id, appscale_version, http_request_data):
1892+ global app_datastore
1893+ global keySecret
1894+ global tableHashTable
1895+
1896+ field_name_list = []
1897+ field_value_list = []
1898+
1899+ start_time = time.time()
1900+ putreq_pb = datastore_pb.PutRequest(http_request_data)
1901+ logger.debug("RECEIVED PUT_REQUEST %s" % putreq_pb)
1902+ putresp_pb = datastore_pb.PutResponse( )
1903+ txn = None
1904+ root_key = None
1905+ # Must assign an id if a put is being done in a transaction
1906+ # and it does not have an id and it is a root
1907+ for e in putreq_pb.entity_list():
1908+ # Only dealing with root puts
1909+ if e.key().path().element_size() == 1:
1910+ root_path = e.key().path().mutable_element(0)
1911+ #print "has id:",root_path.has_id(), "has name:",root_path.has_name()
1912+ if root_path.id() == 0 and not root_path.has_name():
1913+ #new_key = root_key + "/" + last_path.type()
1914+ uid = generate_unique_id(app_id, None, None)
1915+ #print "Assigned uid to new root key:",str(uid)
1916+ if uid <= 0:
1917+ return (putresp_pb.Encode(),
1918+ datastore_pb.Error.INTERNAL_ERROR,
1919+ 'Unable to assign a unique id')
1920+ root_path.set_id(uid)
1921+
1922+ # Gather data from Put Request #
1923+ #print "Entity list for put:"
1924+ #print putreq_pb.entity_list()
1925+ for e in putreq_pb.entity_list():
1926+
1927+ for prop in e.property_list() + e.raw_property_list():
1928+ if prop.value().has_uservalue():
1929+ obuid = md5.new(prop.value().uservalue().email().lower()).digest()
1930+ obuid = '1' + ''.join(['%02d' % ord(x) for x in obuid])[:20]
1931+ prop.mutable_value().mutable_uservalue().set_obfuscated_gaiaid(
1932+ obuid)
1933+
1934+ #################################
1935+ # Key Assignment for new entities
1936+ #################################
1937+ e.mutable_key().set_app(app_id)
1938+
1939+ root_type = e.key().path().element(0).type()
1940+ if root_type[-2:] == "__":
1941+ return(putresp_pb.Encode(),
1942+ datastore_pb.Error.PERMISSION_DENIED,
1943+ "Illegal type name contains reserved delimiters \"__\"")
1944+
1945+ last_path = e.key().path().element_list()[-1]
1946+ uid = last_path.id()
1947+ kind = last_path.type()
1948+ # this object has no assigned id thus far
1949+ if last_path.id() == 0 and not last_path.has_name():
1950+ if e.key().path().element_size() == 1:
1951+ root_key = None
1952+ if root_key:
1953+ child_key = root_key + "/" + last_path.type()
1954+ else:
1955+ child_key = None
1956+ # if the root is None or the child is None,
1957+ # then the global counter is used;
1958+ # generate_unique_id only needs to know whether a child exists
1959+ uid = generate_unique_id(app_id, root_key, child_key)
1960+ if uid <= 0:
1961+ return(putresp_pb.Encode(),
1962+ datastore_pb.Error.INTERNAL_ERROR,
1963+ "Unable to assign id to entity")
1964+ last_path.set_id(uid)
1965+ # It may be its own parent
1966+ group = e.mutable_entity_group()
1967+ root = e.key().path().element(0)
1968+ group.add_element().CopyFrom(root)
1969+ if last_path.has_name():
1970+ uid = last_path.name()
1971+ # It may be its own parent
1972+ group = e.mutable_entity_group()
1973+ if group.element_size() == 0:
1974+ root = e.key().path().element(0)
1975+ group.add_element().CopyFrom(root)
1976+
1977+ #######################################
1978+ # Done with key assignment
1979+ # Notify Soap Server of any new tables
1980+ #######################################
1981+ #print "Putting of type:",kind,"with uid of",str(uid)
1982+ # insert key
1983+ table_name = getTableName(app_id, kind, appscale_version)
1984+ #print "Put Using table name:",table_name
1985+ # Notify Users/Apps table if a new class is being added
1986+ if table_name not in tableHashTable:
1987+ # This is the first time this pbserver has seen this table
1988+ # Notify the User/Apps server via soap call
1989+ # This function is reentrant
1990+ # If the class was deleted and later re-added, the
1991+ # users/app server is not notified of its re-creation
1992+ if tableServer.add_class(app_id, kind, keySecret) == "true":
1993+ tableHashTable[table_name] = 1
1994+
1995+ # Store One Entity #
1996+ logger.debug("put: %s___%s___%s with id: %s" % (app_id,
1997+ kind,
1998+ appscale_version,
1999+ str(uid)))
2000+
2001+ row_key = getRowKey(app_id, e.key().path().element_list())
2002+ inter_time = time.time()
2003+ logger.debug("Time spent in put before datastore call: " + str(inter_time - start_time))
2004+
2005+
2006+ field_name_list = ENTITY_TABLE_SCHEMA
2007+ field_value_list = [e.Encode(), NONEXISTANT_TRANSACTION]
2008+ if putreq_pb.has_transaction():
2009+ txn = putreq_pb.transaction()
2010+ err, res = app_datastore.put_entity( table_name,
2011+ row_key,
2012+ field_name_list,
2013+ field_value_list,
2014+ txn.handle())
2015+ else:
2016+ err, res = app_datastore.put_entity( table_name,
2017+ row_key,
2018+ field_name_list,
2019+ field_value_list)
2020+
2021+ if err not in ERROR_CODES:
2022+ #_trans_set.purge(txn)
2023+ return (putresp_pb.Encode(),
2024+ datastore_pb.Error.INTERNAL_ERROR,
2025+ err)
2026+
2027+ putresp_pb.key_list().append(e.key())
2028+
2029+ inter_time = time.time()
2030+ logger.debug("Time spent in put after datastore call: " + str(inter_time - start_time))
2031+ logger.debug( "PUT_RESPONSE:%s" % putresp_pb)
2032+ return (putresp_pb.Encode(), 0, "")
2033+
2034+
2035+ def get_request(self, app_id, appscale_version, http_request_data):
2036+ global app_datastore
2037+ getreq_pb = datastore_pb.GetRequest(http_request_data)
2038+ logger.debug("GET_REQUEST: %s" % getreq_pb)
2039+ getresp_pb = datastore_pb.GetResponse()
2040+
2041+
2042+ for key in getreq_pb.key_list():
2043+ key.set_app(app_id)
2044+ last_path = key.path().element_list()[-1]
2045+
2046+ if last_path.has_id():
2047+ entity_id = last_path.id()
2048+
2049+ if last_path.has_name():
2050+ entity_id = last_path.name()
2051+
2052+ if last_path.has_type():
2053+ kind = last_path.type()
2054+ logger.debug("get: %s___%s___%s %s" % (app_id, kind, appscale_version, str(entity_id)))
2055+ table_name = getTableName(app_id, kind, appscale_version)
2056+ row_key = getRowKey(app_id,key.path().element_list())
2057+ #print "Get row key:",row_key
2058+ if getreq_pb.has_transaction():
2059+ txn = getreq_pb.transaction()
2060+ r = app_datastore.get_entity( table_name, row_key, ENTITY_TABLE_SCHEMA, txn.handle())
2061+ else:
2062+ r = app_datastore.get_entity( table_name, row_key, ENTITY_TABLE_SCHEMA )
2063+ err = r[0]
2064+ if err not in ERROR_CODES or len(r) != 3:
2065+ r = ["",None,NONEXISTANT_TRANSACTION]
2066+ print err
2067+ entity = r[1]
2068+ prev_version = long(r[2])
2069+
2070+ group = getresp_pb.add_entity()
2071+ if entity:
2072+ e_pb = entity_pb.EntityProto( entity )
2073+ group.mutable_entity().CopyFrom(e_pb)
2074+
2075+ # Send Response #
2076+ #print getresp_pb
2077+ logger.debug("GET_RESPONSE: %s" % getresp_pb)
2078+ return (getresp_pb.Encode(), 0, "")
2079+
2080+ """ Deletes are just PUTs using a sentinel value of DELETED.
2081+ All deleted keys are stored as DELETED/<entity_group>, so that
2082+ rollback can determine which entity group a possibly failed
2083+ transaction belongs to.
2084+ """
2085+ def delete_request(self, app_id, appscale_version, http_request_data):
2086+ global app_datastore
2087+ root_key = None
2088+ txn = None
2089+ logger.debug("DeleteRequest Received...")
2090+ delreq_pb = datastore_pb.DeleteRequest( http_request_data )
2091+ logger.debug("DELETE_REQUEST: %s" % delreq_pb)
2092+ delresp_pb = api_base_pb.VoidProto()
2093+
2094+ for key in delreq_pb.key_list():
2095+ key.set_app(app_id)
2096+ last_path = key.path().element_list()[-1]
2097+ if last_path.has_type():
2098+ kind = last_path.type()
2099+
2100+ row_key = getRowKey(app_id, key.path().element_list())
2101+
2102+
2103+ table_name = getTableName(app_id, kind, appscale_version)
2104+ if delreq_pb.has_transaction():
2105+ txn = delreq_pb.transaction()
2106+ res = app_datastore.delete_row( table_name, row_key, txn.handle())
2107+ else:
2108+ res = app_datastore.delete_row( table_name,
2109+ row_key)
2110+ err = res[0]
2111+ logger.debug("Response from DB for delete request %s" % err)
2112+ if err not in ERROR_CODES:
2113+ if DEBUG: print err
2114+ return (delresp_pb.Encode(),
2115+ datastore_pb.Error.INTERNAL_ERROR,
2116+ err + ", Unable to delete row")
2117+
2118+ return (delresp_pb.Encode(), 0, "")
2119+
2120+
2121+ def optimized_delete_request(self, app_id, appscale_version, http_request_data):
2122+ pass
2123+ def run_optimized_query(self, app_id, appscale_version, http_request_data):
2124+ return
2125+ def optimized_put_request(self, app_id, appscale_version, http_request_data):
2126+ pass
2127+
2128+ def void_proto(self, app_id, appscale_version, http_request_data):
2129+ resp_pb = api_base_pb.VoidProto()
2130+ print "Got void"
2131+ logger.debug("VOID_RESPONSE: %s to void" % resp_pb)
2132+ return (resp_pb.Encode(), 0, "" )
2133+
2134+ def str_proto(self, app_id, appscale_version, http_request_data):
2135+ str_pb = api_base_pb.StringProto( http_request_data )
2136+ composite_pb = datastore_pb.CompositeIndices()
2137+ print "Got a string proto"
2138+ print str_pb
2139+ logger.debug("String proto received: %s"%str_pb)
2140+ logger.debug("CompositeIndex response to string: %s" % composite_pb)
2141+ return (composite_pb.Encode(), 0, "" )
2142+
2143+ def int64_proto(self, app_id, appscale_version, http_request_data):
2144+ int64_pb = api_base_pb.Integer64Proto( http_request_data )
2145+ resp_pb = api_base_pb.VoidProto()
2146+ print "Got a int 64"
2147+ print int64_pb
2148+ logger.debug("Int64 proto received: %s"%int64_pb)
2149+ logger.debug("VOID_RESPONSE to int64: %s" % resp_pb)
2150+ return (resp_pb.Encode(), 0, "")
2151+
2152+ def compositeindex_proto(self, app_id, appscale_version, http_request_data):
2153+ compindex_pb = entity_pb.CompositeIndex( http_request_data)
2154+ resp_pb = api_base_pb.VoidProto()
2155+ print "Got Composite Index"
2156+ print compindex_pb
2157+ logger.debug("CompositeIndex proto received: %s" % str(compindex_pb))
2158+ logger.debug("VOID_RESPONSE to composite index: %s" % resp_pb)
2159+ return (resp_pb.Encode(), 0, "")
2160+
2161+# Returns 0 on success, 1 on failure
2162+ def create_index_tables(self, app_id):
2163+ global app_datastore
2164+ """table_name = "__" + app_id + "__" + "kind"
2165+ columns = ["reference"]
2166+ print "Building table: " + table_name
2167+ returned = app_datastore.create_table( table_name, columns )
2168+ err,res = returned
2169+ if err not in ERROR_CODES:
2170+ logger.debug("%s" % err)
2171+ return 1
2172+ """
2173+ table_name = "__" + app_id + "__" + "single_prop_asc"
2174+ print "Building table: " + table_name
2175+ columns = ["reference"]
2176+ returned = app_datastore.create_table( table_name, columns )
2177+ err,res = returned
2178+ if err not in ERROR_CODES:
2179+ logger.debug("%s" % err)
2180+ return 1
2181+
2182+ table_name = "__" + app_id + "__" + "single_prop_desc"
2183+ print "Building table: " + table_name
2184+ returned = app_datastore.create_table( table_name, columns )
2185+ err,res = returned
2186+ if err not in ERROR_CODES:
2187+ logger.debug("%s" % err)
2188+ return 1
2189+
2190+ table_name = "__" + app_id + "__" + "composite"
2191+ print "Building table: " + table_name
2192+ returned = app_datastore.create_table( table_name, columns )
2193+ err,res = returned
2194+ if err not in ERROR_CODES:
2195+ logger.debug("%s" % err)
2196+ return 1
2197+
2198+ return 0
2199+
2200+ ##############
2201+ # OTHER TYPE #
2202+ ##############
2203+ def unknown_request(self, app_id, appscale_version, http_request_data, pb_type):
2204+ logger.debug("Received Unknown Protocol Buffer %s" % pb_type )
2205+ print "ERROR: Received Unknown Protocol Buffer <" + pb_type +">.",
2206+ print "Nothing has been implemented to handle this Protocol Buffer type."
2207+ print "http request data:"
2208+ print http_request_data
2209+ print "http done"
2210+ self.void_proto(app_id, appscale_version, http_request_data)
2211+
2212+
2213+ #########################
2214+ # POST Request Handling #
2215+ #########################
2216+ @tornado.web.asynchronous
2217+ def post( self ):
2218+ request = self.request
2219+ http_request_data = request.body
2220+ pb_type = request.headers['protocolbuffertype']
2221+ app_data = request.headers['appdata']
2222+ app_data = app_data.split(':')
2223+
2224+ if len(app_data) == 5:
2225+ app_id, user_email, nick_name, auth_domain, appscale_version = app_data
2226+ os.environ['AUTH_DOMAIN'] = auth_domain
2227+ os.environ['USER_EMAIL'] = user_email
2228+ os.environ['USER_NICKNAME'] = nick_name
2229+ os.environ['APPLICATION_ID'] = app_id
2230+ elif len(app_data) == 4:
2231+ app_id, user_email, nick_name, auth_domain = app_data
2232+ os.environ['AUTH_DOMAIN'] = auth_domain
2233+ os.environ['USER_EMAIL'] = user_email
2234+ os.environ['USER_NICKNAME'] = nick_name
2235+ os.environ['APPLICATION_ID'] = app_id
2236+ appscale_version = "1"
2237+ elif len(app_data) == 2:
2238+ app_id, appscale_version = app_data
2239+ app_id = app_data[0]
2240+ os.environ['APPLICATION_ID'] = app_id
2241+ elif len(app_data) == 1:
2242+ app_id = app_data[0]
2243+ os.environ['APPLICATION_ID'] = app_id
2244+ appscale_version = "1"
2245+ else:
2246+ logger.debug("UNABLE TO EXTRACT APPLICATION DATA")
2247+ return
2248+
2249+ # Default HTTP Response Data #
2250+
2251+ if pb_type == "Request":
2252+ self.remote_request(app_id, appscale_version, http_request_data)
2253+ else:
2254+ self.unknown_request(app_id, appscale_version, http_request_data, pb_type)
2255+ self.finish()
2256+
2257+
2258+def usage():
2259+ print "AppScale Server"
2260+ print
2261+ print "Options:"
2262+ print "\t--certificate=<path-to-ssl-certificate>"
2263+ print "\t--soap=<soap server hostname>"
2264+ print "\t--key for using keys from the soap server"
2265+ print "\t--type=<hypertable, hbase, cassandra, mysql, mongodb>"
2266+ print "\t--secret=<secret to soap server>"
2267+ print "\t--blocksize=<key-block-size>"
2268+ print "\t--optimized_query"
2269+ print "\t--no_encryption"
2270+def main(argv):
2271+ global app_datastore
2272+ global getKeyFromServer
2273+ global tableServer
2274+ global keySecret
2275+ global logOn
2276+ global logFilePtr
2277+ global optimizedQuery
2278+ global soapServer
2279+ global ERROR_CODES
2280+ global VALID_DATASTORES
2281+ global KEYBLOCKSIZE
2282+ global zoo_keeper
2283+ cert_file = CERT_LOCATION
2284+ key_file = KEY_LOCATION
2285+ db_type = "hypertable"
2286+ port = DEFAULT_SSL_PORT
2287+ isEncrypted = True
2288+ try:
2289+ opts, args = getopt.getopt( argv, "c:t:l:s:b:a:k:p:o:n:z:",
2290+ ["certificate=",
2291+ "type=",
2292+ "log=",
2293+ "secret=",
2294+ "blocksize=",
2295+ "soap=",
2296+ "key",
2297+ "port=",
2298+ "optimized_query",
2299+ "no_encryption",
2300+ "zoo_keeper="] )
2301+ except getopt.GetoptError:
2302+ usage()
2303+ sys.exit(1)
2304+ for opt, arg in opts:
2305+ if opt in ("-c", "--certificate"):
2306+ cert_file = arg
2307+ print "Using cert..."
2308+ elif opt in ("-k", "--key" ):
2309+ getKeyFromServer = True
2310+ print "Using key server..."
2311+ elif opt in ("-t", "--type"):
2312+ db_type = arg
2313+ print "Datastore type: ",db_type
2314+ elif opt in ("-s", "--secret"):
2315+ keySecret = arg
2316+ print "Secret set..."
2317+ elif opt in ("-l", "--log"):
2318+ logOn = True
2319+ logFile = arg
2320+ logFilePtr = open(logFile, "w")
2321+ logFilePtr.write("# type, app, start, end\n")
2322+ elif opt in ("-b", "--blocksize"):
2323+ KEYBLOCKSIZE = arg
2324+ print "Block size: ",KEYBLOCKSIZE
2325+ elif opt in ("-a", "--soap"):
2326+ soapServer = arg
2327+ elif opt in ("-o", "--optimized_query"):
2328+ optimizedQuery = True
2329+ elif opt in ("-p", "--port"):
2330+ port = int(arg)
2331+ elif opt in ("-n", "--no_encryption"):
2332+ isEncrypted = False
2333+ elif opt in ("-z", "--zoo_keeper"):
2334+ zoo_keeper_locations = arg
2335+
2336+ app_datastore = appscale_datastore.DatastoreFactory.getDatastore(db_type)
2337+ ERROR_CODES = appscale_datastore.DatastoreFactory.error_codes()
2338+ VALID_DATASTORES = appscale_datastore.DatastoreFactory.valid_datastores()
2339+ if DEBUG: print "ERROR_CODES:"
2340+ if DEBUG: print ERROR_CODES
2341+ if DEBUG: print "VALID_DATASTORE:"
2342+ if DEBUG: print VALID_DATASTORES
2343+ if db_type in VALID_DATASTORES:
2344+ logger.debug("Using datastore %s" % db_type)
2345+ else:
2346+ print "Unknown datastore "+ db_type
2347+ exit(1)
2348+
2349+ tableServer = SOAPpy.SOAPProxy("https://" + soapServer + ":" + str(keyPort))
2350+
2351+ global keyDictionaryLock
2352+ zoo_keeper = zktransaction.ZKTransaction(zoo_keeper_locations)
2353+
2354+ keyDictionaryLock = threading.Lock()
2355+ if port == DEFAULT_SSL_PORT and not isEncrypted:
2356+ port = DEFAULT_PORT
2357+ pb_application = tornado.web.Application([
2358+ (r"/*", MainHandler),
2359+ ])
2360+ server = tornado.httpserver.HTTPServer(pb_application)
2361+ server.listen(port)
2362+ if not db_type == "timesten":
2363+ # Stop running as root, security purposes #
2364+ drop_privileges()
2365+
2366+ while 1:
2367+ try:
2368+ # Start Server #
2369+ tornado.ioloop.IOLoop.instance().start()
2370+ except SSL.SSLError:
2371+ logger.debug("\n\nUnexpected input for AppScale-Secure-Server")
2372+ except KeyboardInterrupt:
2373+ print "Server interrupted by user, terminating..."
2374+ exit(1)
2375+
2376+if __name__ == '__main__':
2377+ #cProfile.run("main(sys.argv[1:])")
2378+ main(sys.argv[1:])
2379
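The put/get/delete handlers above all lean on a fixed naming scheme: entity tables are named `app___kind___version`, and numeric ids in row keys are zero-padded to `ID_KEY_LENGTH` so that lexicographic order matches numeric order. A minimal standalone sketch of that scheme follows; plain `(kind, id_or_name)` tuples stand in for the protocol-buffer `element_list()`, and `table_name`/`row_key` are illustrative names rather than the module's own helpers:

```python
# Sketch of the key scheme used by the pb server in this diff.
# ID_KEY_LENGTH matches the constant defined in the server module.
ID_KEY_LENGTH = 64

def table_name(app_id, kind, version):
    # Mirrors getTableName: app___kind___version
    return "%s___%s___%s" % (app_id, kind, version)

def row_key(app_id, path):
    # path: list of (kind, id_or_name) tuples standing in for the
    # ancestor element_list() walked by getRowKey in the diff.
    key = app_id
    for kind, ident in path:
        key += "/" + kind
        if isinstance(ident, int):
            # zero-pad ids so string comparison orders them numerically
            key += ":" + str(ident).zfill(ID_KEY_LENGTH)
        elif ident.isdigit():
            # numeric-looking names are prefixed to avoid colliding
            # with auto-assigned numeric ids
            key += ":__key__" + ident
        else:
            key += ":" + ident
    return key

print(table_name("guestbook", "Greeting", "1"))
print(row_key("guestbook", [("Parent", 7), ("Greeting", "hello")]))
```

The `__key__` prefix for numeric-looking names keeps a string key name such as `"42"` from colliding with an auto-assigned id `42` of the same kind, as noted in the comments around the original `getRowKey`.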
2380=== removed file 'AppDB/appscale_server_no_trans.py'
2381--- AppDB/appscale_server_no_trans.py 2010-05-11 02:07:26 +0000
2382+++ AppDB/appscale_server_no_trans.py 1970-01-01 00:00:00 +0000
2383@@ -1,1117 +0,0 @@
2384-#!/usr/bin/python
2385-#
2386-# Author:
2387-# Navraj Chohan (nchohan@cs.ucsb.edu)
2388-# Soo Hwan Park (suwanny@gmail.com)
2389-# Sydney Pang (pang@cs.ucsb.edu)
2390-# See LICENSE file
2391-
2392-import sys
2393-import socket
2394-import os
2395-import types
2396-import appscale_datastore
2397-#import helper_functions
2398-import SOAPpy
2399-from dbconstants import *
2400-import appscale_logger
2401-import md5
2402-import random
2403-import getopt
2404-from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
2405-from SocketServer import ThreadingMixIn
2406-import threading
2407-
2408-from google.appengine.api import api_base_pb
2409-from google.appengine.api import datastore
2410-from google.appengine.api import datastore_errors
2411-from google.appengine.api import datastore_types
2412-from google.appengine.api import users
2413-from google.appengine.datastore import datastore_pb
2414-from google.appengine.datastore import datastore_index
2415-from google.appengine.runtime import apiproxy_errors
2416-from google.net.proto import ProtocolBuffer
2417-from google.appengine.datastore import entity_pb
2418-from google.appengine.ext.remote_api import remote_api_pb
2419-from SocketServer import BaseServer
2420-from M2Crypto import SSL
2421-from drop_privileges import *
2422-
2423-import time
2424-
2425-DEBUG = False
2426-APP_TABLE = APPS_TABLE
2427-USER_TABLE = USERS_TABLE
2428-DEFAULT_USER_LOCATION = ".flatfile_users"
2429-DEFAULT_APP_LOCATION = ".flatfile_apps"
2430-HYPERTABLE_XML_TAG = "Name"
2431-DEFAULT_DATASTORE = "files"
2432-DEFAULT_SSL_PORT = 443
2433-DEFAULT_PORT = 4080
2434-DEFAULT_ENCRYPTION = 1
2435-CERT_LOCATION = "/etc/appscale/certs/mycert.pem"
2436-KEY_LOCATION = "/etc/appscale/certs/mykey.pem"
2437-SECRET_LOCATION = "/etc/appscale/secret.key"
2438-VALID_DATASTORES = []
2439-ERROR_CODES = []
2440-app_datastore = []
2441-logOn = False
2442-logFilePtr = ""
2443-
2444-getKeyFromServer = False
2445-soapServer = "localhost"
2446-tableServer = ""
2447-keyPort = 4343
2448-keySecret = ""
2449-KEYBLOCKSIZE = "50"
2450-keyDictionaryLock = None
2451-keyDictionary = {}
2452-keyStart = 0
2453-keyEnd = 0
2454-
2455-optimizedQuery = False
2456-ID_KEY_LENGTH = 64
2457-tableHashTable = {}
2458-
2459-local_server_address = ""
2460-HandlerClass = ""
2461-ssl_cert_file = ""
2462-ssl_key_file = ""
2463-
2464-DELETED = "DELETED___"
2465-"""
2466-Deleted keys are DELETED/<row_key>
2467-"""
2468-
2469-"""
2470-keys for tables take the format
2471-appname/Grandparent:<ID>/Parent:<ID>/Child:<ID>
2472-for the entity table
2473-"""
2474-
2475-
2476-class ThreadLogger:
2477- def __init__(self, log):
2478- self.logger_ = log
2479- self.log_lock = threading.Lock()
2480-
2481- def debug(self, string):
2482- return
2483- self.log_lock.acquire()
2484- print string
2485- self.logger_.info(string)
2486- self.log_lock.release()
2487-
2488-logger = appscale_logger.getLogger("pb_server")
2489-
2490-
2491-def getTableName(app_id, kind, version):
2492- return app_id + "___" + kind + "___" + version
2493-
2494-def getRowKey(app_id, ancestor_list):
2495- if ancestor_list == None:
2496- logger.debug("Generate row key received null ancestor list")
2497- return ""
2498-
2499- key = app_id
2500-
2501- # Note: mysql cannot have \ as the first char in the row key
2502- for a in ancestor_list:
2503- key += "/"
2504- if a.has_type():
2505- key += a.type()
2506-
2507- if a.has_id():
2508- zero_padded_id = ("0" * (ID_KEY_LENGTH - len(str(a.id())))) + str(a.id())
2509- key += ":" + zero_padded_id
2510- elif a.has_name():
2511- # append _ if the name is a number, prevents collisions of key names
2512- if a.name().isdigit():
2513- key += ":__key__" + a.name()
2514- else:
2515- key += ":" + a.name()
2516- return key
2517-
2518-
2519-def getRootKey(app_id, ancestor_list):
2520- key = app_id # mysql cannot have \ as the first char in the row key
2521- a = ancestor_list[0]
2522- key += "/"
2523-
2524- # append _ if the name is a number, prevents collisions of key names
2525- if a.has_type():
2526- key += a.type()
2527- else:
2528- return None
2529-
2530- if a.has_id():
2531- zero_padded_id = ("0" * (ID_KEY_LENGTH - len(str(a.id())))) + str(a.id())
2532- key += ":" + zero_padded_id
2533- elif a.has_name():
2534- if a.name().isdigit():
2535- key += ":__key__" + a.name()
2536- else:
2537- key += ":" + a.name()
2538- else:
2539- return None
2540-
2541- return key
2542-
2543-
2544-def getRootKeyFromKeyType(app_id, key):
2545- ancestor_list = key._Key__reference.path().element_list()
2546- return getRootKey(app_id, ancestor_list)
2547-
2548-
2549-def getRowKeyFromKeyType(app_id, key):
2550- ancestor_list = key._Key__reference.path().element_list()
2551- return getRowKey(app_id, ancestor_list)
2552-
2553-
2554-def getRootKeyFromRef(app_id, ref):
2555- if not ref.has_path():
2556- return False
2557- path = ref.path()
2558- element_list = path.element_list()
2559- return getRootKey(app_id, element_list)
2560-
2561-
2562-# Version must be an int
2563-def getJournalKey(key, version):
2564- key+="/"
2565- zero_padded_version = ("0" * (ID_KEY_LENGTH - len(str(version)))) + str(version)
2566- key += zero_padded_version
2567- return key
2568-
2569-
2570-def getJournalTable(app_id, appscale_version):
2571- return JOURNAL_TABLE + "___" + app_id + "___" + str(appscale_version)
2572-
2573-def generateIDFromServer(app_id):
2574- global keySecret
2575- global keyStart
2576- global keyEnd
2577- global tableServer
2578- key = 0
2579- if keyStart != keyEnd:
2580- if DEBUG: print "fast track key serving"
2581- if DEBUG: print "Key Start: " + str(keyStart)
2582- if DEBUG: print "Key End: " + str(keyEnd)
2583- key = int(keyStart)
2584- if DEBUG: print "key returned: " + str(key)
2585- keyStart = int(keyStart) + 1
2586- return key
2587- else:
2588- # Retrieve a new set of keys
2589- if DEBUG: print "slow track key serving"
2590- keyStart = tableServer.get_key_block(app_id, KEYBLOCKSIZE, keySecret)
2591- try:
2592- keyStart = int(keyStart)
2593- except:
2594- # value is not a number
2595- return -1
2596- keyEnd = keyStart + int(KEYBLOCKSIZE)
2597- if DEBUG: print "Key Start: " + str(keyStart)
2598- if DEBUG: print "Key End: " + str(keyEnd)
2599- key = keyStart
2600- keyStart = keyStart + 1
2601- if DEBUG: print "key returned: " + str(key)
2602- return key
2603-
2604-
2605-def rollback_function(app_id, trans_id, root_key, change_set):
2606- pass
2607-
2608-
2609-class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
2610- """ Handle requests in a new thread """
2611-
2612-
2613-class AppScaleSecureHandler( BaseHTTPRequestHandler ):
2614- """
2615- Defines what to do when the webserver receives different types of
2616- HTTP requests.
2617- """
2618- def get_http_err_code( self, message ):
2619- print message
2620- ret = 599 # 599 is the default code for an unknown error #
2621- return ret
2622-
2623- def send_post_http_response( self , http_code , message ):
2624- self.send_response( http_code )
2625- self.send_header( 'Content-type' , 'text/plain' )
2626- self.end_headers()
2627- self.wfile.write( message )
2628-
2629- def send_http_err_response( self, err_msg ):
2630- ret_http_code = self.get_http_err_code( err_msg )
2631- self.send_post_http_response( ret_http_code, err_msg )
2632-
2633- # remote api request
2634- # sends back a response
2635- def remote_request(self, app_id, appscale_version, http_request_data):
2636- apirequest = remote_api_pb.Request(http_request_data)
2637- apiresponse = remote_api_pb.Response()
2638- response = None
2639- errcode = 0
2640- errdetail = ""
2641- apperror_pb = None
2642-
2643- if not apirequest.has_method():
2644- errcode = datastore_pb.Error.BAD_REQUEST
2645- errdetail = "Method was not set in request"
2646- apirequest.set_method("NOT_FOUND")
2647- if not apirequest.has_request():
2648- errcode = datastore_pb.Error.BAD_REQUEST
2649- errdetail = "Request missing in call"
2650- apirequest.set_method("NOT_FOUND")
2651- apirequest.clear_request()
2652- method = apirequest.method()
2653- request_data = apirequest.request()
2654- http_request_data = request_data.contents()
2655-
2656- print "REQUEST:",method," AT time",time.time()
2657- if method == "Put":
2658- response, errcode, errdetail = self.put_request(app_id,
2659- appscale_version,
2660- http_request_data)
2661- elif method == "Get":
2662- response, errcode, errdetail = self.get_request(app_id,
2663- appscale_version,
2664- http_request_data)
2665- elif method == "Delete":
2666- response, errcode, errdetail = self.delete_request(app_id,
2667- appscale_version,
2668- http_request_data)
2669- elif method == "RunQuery":
2670- response, errcode, errdetail = self.run_query(app_id,
2671- appscale_version,
2672- http_request_data)
2673- elif method == "BeginTransaction":
2674- response, errcode, errdetail = self.begin_transaction_request(app_id,
2675- appscale_version,
2676- http_request_data)
2677- elif method == "Commit":
2678- response, errcode, errdetail = self.commit_transaction_request(app_id,
2679- appscale_version,
2680- http_request_data)
2681- elif method == "Rollback":
2682- response, errcode, errdetail = self.rollback_transaction_request(app_id,
2683- appscale_version,
2684- http_request_data)
2685- elif method == "AllocateIds":
2686- response, errcode, errdetail = self.allocate_ids_request(app_id,
2687- appscale_version,
2688- http_request_data)
2689- elif method == "CreateIndex":
2690- errcode = datastore_pb.Error.PERMISSION_DENIED
2691- errdetail = "Create Index is not implemented"
2692- logger.debug(errdetail)
2693- """
2694- response, errcode, errdetail = self.create_index_request(app_id,
2695- appscale_version,
2696- http_request_data)
2697- """
2698- elif method == "GetIndices":
2699- errcode = datastore_pb.Error.PERMISSION_DENIED
2700- errdetail = "GetIndices is not implemented"
2701- logger.debug(errdetail)
2702- """
2703- response, errcode, errdetail = self.get_indices_request(app_id,
2704- appscale_version,
2705- http_request_data)
2706- """
2707- elif method == "UpdateIndex":
2708- errcode = datastore_pb.Error.PERMISSION_DENIED
2709- errdetail = "UpdateIndex is not implemented"
2710- logger.debug(errdetail)
2711- """
2712- response, errcode, errdetail = self.update_index_request(app_id,
2713- appscale_version,
2714- http_request_data)
2715- """
2716- elif method == "DeleteIndex":
2717- errcode = datastore_pb.Error.PERMISSION_DENIED
2718- errdetail = "DeleteIndex is not implemented"
2719- logger.debug(errdetail)
2720-
2721- """
2722- response, errcode, errdetail = self.delete_index_request(app_id,
2723- appscale_version,
2724- http_request_data)
2725- """
2726- else:
2727- errcode = datastore_pb.Error.BAD_REQUEST
2728- errdetail = "Unknown datastore message"
2729- logger.debug(errdetail)
2730-
2731-
2732- rawmessage = apiresponse.mutable_response()
2733- if response:
2734- rawmessage.set_contents(response)
2735-
2736- if errcode != 0:
2737- apperror_pb = apiresponse.mutable_application_error()
2738- apperror_pb.set_code(errcode)
2739- apperror_pb.set_detail(errdetail)
2740- if errcode != 0:
2741- print "REPLY",method," AT TIME",time.time()
2742- print "errcode:",errcode
2743- print "errdetail:",errdetail
2744- self.send_post_http_response( 200 , apiresponse.Encode() )
2745-
2746-
2747- def run_query(self, app_id, appscale_version, http_request_data):
2748- global app_datastore
2749- query = datastore_pb.Query(http_request_data)
2750- logger.debug("QUERY:%s" % query)
2751- results = []
2752-
2753- if not query.has_kind():
2754- # Return nothing in case of error #
2755- return (api_base_pb.VoidProto().Encode(),
2756- datastore_pb.Error.PERMISSION_DENIED,
2757- "Kindless queries are not implemented.")
2758- else:
2759- kind = query.kind()
2760- #print "Query kind:",kind
2761-
2762- # Verify validity of the entity name and applicaiton id #
2763- # according to the naming sheme for entity tables #
2764- #assert kind[-2:] != "__"
2765- #assert app_id[-1] != "_"
2766-
2767- # Fetch query from the datastore #
2768- table_name = getTableName(app_id, kind, appscale_version)
2769- #print "Query using table name:",table_name
2770- r = app_datastore.get_table( table_name, ENTITY_TABLE_SCHEMA)
2771- #logger.debug("result: %s" % r)
2772-
2773- #err = r[0]
2774- if len(r) > 1:
2775- results = r[1:]
2776- else:
2777- results = []
2778-
2779- # odds are versions
2780- versions = results[1::2]
2781- # evens are encoded entities
2782- results = results[0::2]
2783- #print "RESULTS:",results
2784- #print "VERSIONS:",versions
2785- if len(versions) != len(results):
2786- return(api_base_pb.VoidProto().Encode(),
2787- datastore_pb.Error.INTERNAL_ERROR,
2788- 'The query had a bad number of results.')
2789-
2790- # convert to objects
2791- # unless it's marked as deleted
2792- # They are currently strings
2793- for index, res in enumerate(results):
2794- results[index] = entity_pb.EntityProto(res)
2795- results[index] = datastore.Entity._FromPb(results[index])
2796-
2797- logger.debug("====results pre filter====")
2798- #logger.debug("%s" % results)
2799- if query.has_ancestor():
2800- ancestor_path = query.ancestor().path().element_list()
2801- def is_descendant(entity):
2802- path = entity.key()._Key__reference.path().element_list()
2803- return path[:len(ancestor_path)] == ancestor_path
2804- results = filter(is_descendant, results)
2805-
2806- operators = {datastore_pb.Query_Filter.LESS_THAN: '<',
2807- datastore_pb.Query_Filter.LESS_THAN_OR_EQUAL: '<=',
2808- datastore_pb.Query_Filter.GREATER_THAN: '>',
2809- datastore_pb.Query_Filter.GREATER_THAN_OR_EQUAL: '>=',
2810- datastore_pb.Query_Filter.EQUAL: '==',
2811- }
2812-
2813- for filt in query.filter_list():
2814- assert filt.op() != datastore_pb.Query_Filter.IN
2815-
2816- prop = filt.property(0).name().decode('utf-8')
2817- op = operators[filt.op()]
2818-
2819- def passes(entity):
2820- """ Returns True if the entity passes the filter, False otherwise. """
2821- entity_vals = entity.get(prop, [])
2822- if type(entity_vals) != types.ListType:
2823- entity_vals = [entity_vals]
2824-
2825- entity_property_list = [datastore_types.ToPropertyPb(prop, value) for value in entity_vals]
2826-
2827- for entity_prop in entity_property_list:
2828- fixed_entity_val = datastore_types.FromPropertyPb(entity_prop)
2829-
2830- for filter_prop in filt.property_list():
2831- filter_val = datastore_types.FromPropertyPb(filter_prop)
2832- comp = u'%r %s %r' % (fixed_entity_val, op, filter_val)
2833- logger.debug('Evaling filter expression "%s"' % comp )
2834- if eval(comp):
2835- return True
2836- return False
2837-
2838- results = filter(passes, results)
2839-
2840- for order in query.order_list():
2841- prop = order.property().decode('utf-8')
2842- results = [entity for entity in results if prop in entity]
2843-
2844- def order_compare(a, b):
2845- """ Return a negative, zero or positive number depending on whether
2846- entity a is considered smaller than, equal to, or larger than b,
2847- according to the query's orderings. """
2848- for o in query.order_list():
2849- prop = o.property().decode('utf-8')
2850-
2851- a_values = a[prop]
2852- if not isinstance(a_values, types.ListType):
2853- a_values = [a_values]
2854-
2855- b_values = b[prop]
2856- if not isinstance(b_values, types.ListType):
2857- b_values = [b_values]
2858-
2859- cmped = cmp(min(a_values), min(b_values))
2860-
2861- if o.direction() == datastore_pb.Query_Order.DESCENDING:
2862- cmped = -cmped
2863-
2864- if cmped != 0:
2865- return cmped
2866-
2867- return 0
2868-
2869- results.sort(order_compare)
2870-
2871- if query.has_limit():
2872- results = results[:query.limit()]
2873- logger.debug("****results after filtering:****")
2874- logger.debug("%s" % results)
2875-
2876- results = [ent._ToPb() for ent in results]
2877- # Pack Results into a clone of QueryResult #
2878- clone_qr_pb = datastore_pb.QueryResult()
2879- for res in results:
2880- clone_qr_pb.add_result()
2881- clone_qr_pb.result_[-1] = res
2882-
2883- clone_qr_pb.clear_cursor()
2884- clone_qr_pb.set_more_results( len(results)>0 )
2885-
2886- logger.debug("QUERY_RESULT: %s" % clone_qr_pb)
2887- return (clone_qr_pb.Encode(), 0, "")
2888-
2889-
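The result-splitting step in run_query above relies on the flat list returned by get_table interleaving encoded entities (even offsets) with their version stamps (odd offsets). A minimal sketch of that slicing, with made-up sample values, since the real list holds encoded protocol buffers:

```python
# Sketch of how run_query splits the interleaved result list:
# even offsets are encoded entities, odd offsets are version stamps.
def split_entities_and_versions(rows):
    """Return (entities, versions) from an interleaved result list."""
    entities = rows[0::2]   # evens: encoded entities
    versions = rows[1::2]   # odds: version stamps
    if len(entities) != len(versions):
        # Mirrors the INTERNAL_ERROR check in run_query above.
        raise ValueError("The query had a bad number of results.")
    return entities, versions

entities, versions = split_entities_and_versions(["e1", "v1", "e2", "v2"])
```

The sample strings stand in for encoded entity and version blobs; only the slicing pattern is taken from the code above.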
2890- def begin_transaction_request(self, app_id, appscale_version, http_request_data):
2891- transaction_pb = datastore_pb.Transaction()
2892- handle = random.randint(1,1000000)
2893- print "Begin Trans Handle:",handle
2894- transaction_pb.set_handle(handle)
2895- return (transaction_pb.Encode(), 0, "")
2896-
2897- def commit_transaction_request(self, app_id, appscale_version, http_request_data):
2898- commitres_pb = datastore_pb.CommitResponse()
2899- return (commitres_pb.Encode(), 0, "")
2900-
2901-
2902- def rollback_transaction_request(self, app_id, appscale_version, http_request_data):
2903- return (api_base_pb.VoidProto().Encode(), 0, "")
2904-
2905-
2906- def allocate_ids_request(self, app_id, appscale_version, http_request_data):
2907- return (api_base_pb.VoidProto().Encode(),
2908- datastore_pb.Error.PERMISSION_DENIED,
2909- 'Allocation of block ids not implemented.')
2910-
2911-
2912- # Returns Null on error
2913- def getRootKeyFromEntity(self, app_id, entity):
2914- key = entity.key()
2915- if str(key.__class__) == "google.appengine.datastore.entity_pb.Reference":
2916- return getRootKeyFromRef(app_id, key)
2917- else:
2918- return getRootKeyFromKeyType(app_id, key)
2919-
2920-
2921- # For transactions
2922- # Verifies all puts are part of the same root entity
2923- def getRootKeyFromTransPut(self, app_id, putreq_pb):
2924- ent_list = []
2925- if putreq_pb.entity_size() > 0:
2926- ent_list = putreq_pb.entity_list()
2927- first_ent = ent_list[0]
2928- expected_root = self.getRootKeyFromEntity(app_id, first_ent)
2929- # It is possible that all roots are None
2930- # because the roots have not yet been assigned a uid
2931-
2932- for e in ent_list:
2933- root = self.getRootKeyFromEntity(app_id, e)
2934- if root != expected_root:
2935- errcode = datastore_pb.Error.BAD_REQUEST
2936- errdetail = "All puts must be a part of the same group"
2937- return (None, errcode, errdetail)
2938-
2939- return (expected_root, 0, "")
2940-
2941-
2942- # For transactions
2943- # Verifies all puts are part of the same root entity
2944- def getRootKeyFromTransReq(self, app_id, req_pb):
2945- if req_pb.key_size() <= 0:
2946- errcode = datastore_pb.Error.BAD_REQUEST
2947- errdetail = "Bad key listing"
2948- return (None, errcode, errdetail)
2949-
2950- key_list = req_pb.key_list()
2951- first_key = key_list[0]
2952-
2953- expected_root = getRootKeyFromRef(app_id, first_key)
2954- # It is possible that all roots are None
2955- # because the roots have not yet been assigned a uid
2956-
2957- for k in key_list:
2958- root = getRootKeyFromRef(app_id, k)
2959- if root != expected_root:
2960- errcode = datastore_pb.Error.BAD_REQUEST
2961- errdetail = "All transaction gets must be a part of the same group"
2962- return (None, errcode, errdetail)
2963-
2964- return (expected_root, 0, "")
2965-
2966- def getRootKeyFromTransGet(self, app_id, get_pb):
2967- return self.getRootKeyFromTransReq(app_id, get_pb)
2968-
2969- def getRootKeyFromTransDel(self, app_id, del_pb):
2970- return self.getRootKeyFromTransReq(app_id, del_pb)
2971-
2972- def put_request(self, app_id, appscale_version, http_request_data):
2973- global app_datastore
2974- global keySecret
2975- global tableHashTable
2976-
2977- field_name_list = []
2978- field_value_list = []
2979-
2980- start_time = time.time()
2981- putreq_pb = datastore_pb.PutRequest(http_request_data)
2982- logger.debug("RECEIVED PUT_REQUEST %s" % putreq_pb)
2983- putresp_pb = datastore_pb.PutResponse( )
2984- txn = None
2985- root_key = None
2986- # Must assign an id if a put is being done in a transaction
2987- # and it does not have an id and it is a root
2988- for e in putreq_pb.entity_list():
2989- # Only dealing with root puts
2990- if e.key().path().element_size() == 1:
2991- root_path = e.key().path().mutable_element(0)
2992- #print "has id:",root_path.has_id(), "has name:",root_path.has_name()
2993- if root_path.id() == 0 and not root_path.has_name():
2994- #new_key = root_key + "/" + last_path.type()
2995- uid = generateIDFromServer(app_id)
2996- #print "Assigned uid to new root key:",str(uid)
2997- if uid <= 0:
2998- return (putresp_pb.Encode(),
2999- datastore_pb.Error.INTERNAL_ERROR,
3000- 'Unable to assign a unique id')
3001- root_path.set_id(uid)
3002-
3003- # Gather data from Put Request #
3004- #print "Entity list for put:"
3005- #print putreq_pb.entity_list()
3006- for e in putreq_pb.entity_list():
3007-
3008- for prop in e.property_list() + e.raw_property_list():
3009- if prop.value().has_uservalue():
3010- obuid = md5.new(prop.value().uservalue().email().lower()).digest()
3011- obuid = '1' + ''.join(['%02d' % ord(x) for x in obuid])[:20]
3012- prop.mutable_value().mutable_uservalue().set_obfuscated_gaiaid(
3013- obuid)
3014-
3015- #################################
3016- # Key Assignment for new entities
3017- #################################
3018- e.mutable_key().set_app(app_id)
3019-
3020- root_type = e.key().path().element(0).type()
3021- if root_type[-2:] == "__":
3022- return(putresp_pb.Encode(),
3023- datastore_pb.Error.PERMISSION_DENIED,
3024- "Illegal type name contains reserved delimiters \"__\"")
3025-
3026- last_path = e.key().path().element_list()[-1]
3027- uid = last_path.id()
3028- kind = last_path.type()
3029- # this object has no assigned id thus far
3030- if last_path.id() == 0 and not last_path.has_name():
3031- if e.key().path().element_size() == 1:
3032- root_key = None
3033- if root_key:
3034- child_key = root_key + "/" + last_path.type()
3035- else:
3036- child_key = None
3037- # if the root is None or the child is None,
3038- # then the global counter is used
3039- # gen unique id only wants to know if a child exists
3040- uid = generateIDFromServer(app_id)
3041- if uid <= 0:
3042- return(putresp_pb.Encode(),
3043- datastore_pb.Error.INTERNAL_ERROR,
3044- "Unable to assign id to entity")
3045- last_path.set_id(uid)
3046- # It may be its own parent
3047- group = e.mutable_entity_group()
3048- root = e.key().path().element(0)
3049- group.add_element().CopyFrom(root)
3050- if last_path.has_name():
3051- uid = last_path.name()
3052- # It may be its own parent
3053- group = e.mutable_entity_group()
3054- if group.element_size() == 0:
3055- root = e.key().path().element(0)
3056- group.add_element().CopyFrom(root)
3057-
3058- #######################################
3059- # Done with key assignment
3060- # Notify Soap Server of any new tables
3061- #######################################
3062- #print "Putting of type:",kind,"with uid of",str(uid)
3063- # insert key
3064- table_name = getTableName(app_id, kind, appscale_version)
3065- #print "Put Using table name:",table_name
3066- # Notify Users/Apps table if a new class is being added
3067- if table_name not in tableHashTable:
3068- # This is the first time this pbserver has seen this table
3069- # Notify the User/Apps server via soap call
3070- # This function is reentrant
3071- # If the class was deleted and later re-added, the
3072- # users/app server is not re-notified of its creation
3073- if tableServer.add_class(app_id, kind, keySecret) == "true":
3074- tableHashTable[table_name] = 1
3075-
3076- # Store One Entity #
3077- logger.debug("put: %s___%s___%s with id: %s" % (app_id,
3078- kind,
3079- appscale_version,
3080- str(uid)))
3081-
3082- row_key = getRowKey(app_id, e.key().path().element_list())
3083- inter_time = time.time()
3084- logger.debug("Time spent in put before datastore call: " + str(inter_time - start_time))
3085-
3086-
3087- field_name_list = ENTITY_TABLE_SCHEMA
3088- field_value_list = [e.Encode(), NONEXISTANT_TRANSACTION]
3089- err, res = app_datastore.put_entity( table_name,
3090- row_key,
3091- field_name_list,
3092- field_value_list)
3093-
3094- if err not in ERROR_CODES:
3095- #_trans_set.purge(txn)
3096- return (putresp_pb.Encode(),
3097- datastore_pb.Error.INTERNAL_ERROR,
3098- err)
3099-
3100- putresp_pb.key_list().append(e.key())
3101-
3102- inter_time = time.time()
3103- logger.debug("Time spent in put after datastore call: " + str(inter_time - start_time))
3104- logger.debug( "PUT_RESPONSE:%s" % putresp_pb)
3105- return (putresp_pb.Encode(), 0, "")
3106-
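The debug line in put_request above ("put: %s___%s___%s") suggests that entity tables are keyed by app id, kind, and AppScale version joined with triple underscores. A hedged sketch of that naming scheme; getTableName itself is not shown in this hunk, so the exact delimiter handling is an assumption inferred from the log format:

```python
# Hypothetical reconstruction of the table-name scheme implied by the
# "put: app___kind___version" debug log in put_request above.
def get_table_name(app_id, kind, appscale_version):
    return "%s___%s___%s" % (app_id, kind, appscale_version)

name = get_table_name("guestbook", "Greeting", "1")
```

This also explains the reserved-delimiter check in put_request: a kind ending in "__" would collide with the separator.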
3107-
3108- def get_request(self, app_id, appscale_version, http_request_data):
3109- global app_datastore
3110- getreq_pb = datastore_pb.GetRequest(http_request_data)
3111- logger.debug("GET_REQUEST: %s" % getreq_pb)
3112- getresp_pb = datastore_pb.GetResponse()
3113-
3114-
3115- for key in getreq_pb.key_list():
3116- key.set_app(app_id)
3117- last_path = key.path().element_list()[-1]
3118-
3119- if last_path.has_id():
3120- entity_id = last_path.id()
3121-
3122- if last_path.has_name():
3123- entity_id = last_path.name()
3124-
3125- if last_path.has_type():
3126- kind = last_path.type()
3127- logger.debug("get: %s___%s___%s %s" % (app_id, kind, appscale_version, str(entity_id)))
3128- table_name = getTableName(app_id, kind, appscale_version)
3129- row_key = getRowKey(app_id,key.path().element_list())
3130- print "Get row key:",row_key
3131- r = app_datastore.get_entity( table_name, row_key, ENTITY_TABLE_SCHEMA )
3132- err = r[0]
3133- if err not in ERROR_CODES or len(r) != 3:
3134- r = ["",None,NONEXISTANT_TRANSACTION]
3135- print err
3136- entity = r[1]
3137- prev_version = long(r[2])
3138-
3139- group = getresp_pb.add_entity()
3140- if entity:
3141- e_pb = entity_pb.EntityProto( entity )
3142- group.mutable_entity().CopyFrom(e_pb)
3143-
3144- # Send Response #
3145- print getresp_pb
3146- logger.debug("GET_RESPONSE: %s" % getresp_pb)
3147- return (getresp_pb.Encode(), 0, "")
3148-
3149- """ Deletes are just PUTs using a sentinel value of DELETED.
3150- All deleted keys are DELETED/entity_group. This is for
3151- rollback to know which entity group a possibly failed
3152- transaction belongs to.
3153- """
3154- def delete_request(self, app_id, appscale_version, http_request_data):
3155- global app_datastore
3156- root_key = None
3157- txn = None
3158- logger.debug("DeleteRequest Received...")
3159- delreq_pb = datastore_pb.DeleteRequest( http_request_data )
3160- logger.debug("DELETE_REQUEST: %s" % delreq_pb)
3161- delresp_pb = api_base_pb.VoidProto()
3162-
3163- for key in delreq_pb.key_list():
3164- key.set_app(app_id)
3165- last_path = key.path().element_list()[-1]
3166- if last_path.has_type():
3167- kind = last_path.type()
3168-
3169- row_key = getRowKey(app_id, key.path().element_list())
3170-
3171-
3172- table_name = getTableName(app_id, kind, appscale_version)
3173- res = app_datastore.delete_row( table_name,
3174- row_key)
3175- err = res[0]
3176- logger.debug("Response from DB for delete request %s" % err)
3177- if err not in ERROR_CODES:
3178- if DEBUG: print err
3179- return (delresp_pb.Encode(),
3180- datastore_pb.Error.INTERNAL_ERROR,
3181- err + ", Unable to write to journal")
3182-
3183- return (delresp_pb.Encode(), 0, "")
3184-
3185-
3186- def optimized_delete_request(self, app_id, appscale_version, http_request_data):
3187- pass
3188- def run_optimized_query(self, app_id, appscale_version, http_request_data):
3189- return
3190- def optimized_put_request(self, app_id, appscale_version, http_request_data):
3191- pass
3192-
3193- def void_proto(self, app_id, appscale_version, http_request_data):
3194- resp_pb = api_base_pb.VoidProto()
3195- print "Got void"
3196- logger.debug("VOID_RESPONSE: %s to void" % resp_pb)
3197- return (resp_pb.Encode(), 0, "" )
3198-
3199- def str_proto(self, app_id, appscale_version, http_request_data):
3200- str_pb = api_base_pb.StringProto( http_request_data )
3201- composite_pb = datastore_pb.CompositeIndices()
3202- print "Got a string proto"
3203- print str_pb
3204- logger.debug("String proto received: %s"%str_pb)
3205- logger.debug("CompositeIndex response to string: %s" % composite_pb)
3206- return (composite_pb.Encode(), 0, "" )
3207-
3208- def int64_proto(self, app_id, appscale_version, http_request_data):
3209- int64_pb = api_base_pb.Integer64Proto( http_request_data )
3210- resp_pb = api_base_pb.VoidProto()
3211- print "Got a int 64"
3212- print int64_pb
3213- logger.debug("Int64 proto received: %s"%int64_pb)
3214- logger.debug("VOID_RESPONSE to int64: %s" % resp_pb)
3215- return (resp_pb.Encode(), 0, "")
3216-
3217- def compositeindex_proto(self, app_id, appscale_version, http_request_data):
3218- compindex_pb = entity_pb.CompositeIndex( http_request_data)
3219- resp_pb = api_base_pb.VoidProto()
3220- print "Got Composite Index"
3221- print compindex_pb
3222- logger.debug("CompositeIndex proto received: %s"%str(compindex_pb))
3223- logger.debug("VOID_RESPONSE to composite index: %s" % resp_pb)
3224- return (resp_pb.Encode(), 0, "")
3225-
3226-# Returns 0 on success, 1 on failure
3227- def create_index_tables(self, app_id):
3228- global app_datastore
3229- """table_name = "__" + app_id + "__" + "kind"
3230- columns = ["reference"]
3231- print "Building table: " + table_name
3232- returned = app_datastore.create_table( table_name, columns )
3233- err,res = returned
3234- if err not in ERROR_CODES:
3235- logger.debug("%s" % err)
3236- return 1
3237- """
3238- table_name = "__" + app_id + "__" + "single_prop_asc"
3239- print "Building table: " + table_name
3240- columns = ["reference"]
3241- returned = app_datastore.create_table( table_name, columns )
3242- err,res = returned
3243- if err not in ERROR_CODES:
3244- logger.debug("%s" % err)
3245- return 1
3246-
3247- table_name = "__" + app_id + "__" + "single_prop_desc"
3248- print "Building table: " + table_name
3249- returned = app_datastore.create_table( table_name, columns )
3250- err,res = returned
3251- if err not in ERROR_CODES:
3252- logger.debug("%s" % err)
3253- return 1
3254-
3255- table_name = "__" + app_id + "__" + "composite"
3256- print "Building table: " + table_name
3257- returned = app_datastore.create_table( table_name, columns )
3258- err,res = returned
3259- if err not in ERROR_CODES:
3260- logger.debug("%s" % err)
3261- return 1
3262-
3263- return 0
3264-
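The three near-identical create_table calls in create_index_tables above differ only in the table-name suffix. The naming pattern can be condensed into a loop; the suffix strings are taken from the code, while the helper name is hypothetical:

```python
# Sketch of the index-table naming used by create_index_tables above:
# "__<app_id>__<suffix>" for each of the three index tables.
def index_table_names(app_id):
    suffixes = ["single_prop_asc", "single_prop_desc", "composite"]
    return ["__" + app_id + "__" + s for s in suffixes]

names = index_table_names("guestbook")
```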
3265- ##############
3266- # OTHER TYPE #
3267- ##############
3268- def unknown_request(self, app_id, appscale_version, http_request_data, pb_type):
3269- logger.debug("Received Unknown Protocol Buffer %s" % pb_type )
3270- print "ERROR: Received Unknown Protocol Buffer <" + pb_type +">.",
3271- print "Nothing has been implemented to handle this Protocol Buffer type."
3272- print "http request data:"
3273- print http_request_data
3274- print "http done"
3275- self.void_proto(app_id, appscale_version, http_request_data)
3276-
3277- ########################
3278- # GET Request Handling #
3279- ########################
3280- def do_GET( self ):
3281- self.send_error( 404 , 'File Not Found: %s' % self.path )
3282-
3283- #########################
3284- # POST Request Handling #
3285- #########################
3286- def do_POST( self ):
3287- start_time = time.time()
3288- http_request_data = self.rfile.read(int(self.headers.getheader('content-length')))
3289- inter_time = time.time()
3290- logger.debug("Timing for pre pre env setup:" + " " +
3291- str(start_time) + " " + str(inter_time) +
3292- " total time: " + str(inter_time - start_time) + "\n")
3293- print "intertime - starttime:",(str(inter_time - start_time))
3294- pb_type = self.headers.getheader( 'protocolbuffertype' )
3295- app_data = self.headers.getheader('appdata')
3296- app_data = app_data.split(':')
3297- logger.debug("POST len: %d" % len(app_data))
3298- inter_time = time.time()
3299- logger.debug("Timing for pre env setup:" + " " +
3300- str(start_time) + " " + str(inter_time) +
3301- " total time: " + str(inter_time - start_time) + "\n")
3302-
3303- if len(app_data) == 5:
3304- app_id, user_email, nick_name, auth_domain, appscale_version = app_data
3305- os.environ['AUTH_DOMAIN'] = auth_domain
3306- os.environ['USER_EMAIL'] = user_email
3307- os.environ['USER_NICKNAME'] = nick_name
3308- os.environ['APPLICATION_ID'] = app_id
3309- elif len(app_data) == 4:
3310- app_id, user_email, nick_name, auth_domain = app_data
3311- os.environ['AUTH_DOMAIN'] = auth_domain
3312- os.environ['USER_EMAIL'] = user_email
3313- os.environ['USER_NICKNAME'] = nick_name
3314- os.environ['APPLICATION_ID'] = app_id
3315- appscale_version = "1"
3316- elif len(app_data) == 2:
3317- app_id, appscale_version = app_data
3318- app_id = app_data[0]
3319- os.environ['APPLICATION_ID'] = app_id
3320- elif len(app_data) == 1:
3321- app_id = app_data[0]
3322- os.environ['APPLICATION_ID'] = app_id
3323- appscale_version = "1"
3324- else:
3325- logger.debug("UNABLE TO EXTRACT APPLICATION DATA")
3326- return
3327-
3328- # Default HTTP Response Data #
3329- logger.debug("For app id: " + app_id)
3330- logger.debug("For app version: " + appscale_version)
3331- inter_time = time.time()
3332- logger.debug("Timing for env setup:" + pb_type + " " +
3333- app_id + " " + str(start_time) + " " +
3334- str(inter_time) + " total time: " + str(inter_time - start_time) + "\n")
3335-
3336- if pb_type == "Request":
3337- self.remote_request(app_id, appscale_version, http_request_data)
3338- else:
3339- self.unknown_request(app_id, appscale_version, http_request_data, pb_type)
3340-
3341- stop_time = time.time()
3342-
3343- #if logOn == True:
3344- #logFilePtr.write(pb_type + " " + app_id + " " +
3345- #str(start_time) + " " + str(stop_time) + " total time: " +
3346- #str(stop_time - start_time) + "\n")
3347-
3348- logger.debug(pb_type + " " + app_id + " " + str(start_time) + " " +
3349- str(stop_time) + " total time: " + str(stop_time - start_time) + "\n")
3350-
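The 'appdata' header handling in do_POST above dispatches on how many colon-separated fields the client sent (5, 4, 2, or 1), defaulting the AppScale version to "1" when it is absent. A self-contained sketch of just that parsing, without the os.environ side effects; the function name is an invention for illustration:

```python
# Sketch of the do_POST 'appdata' header parsing: a colon-separated
# tuple whose length determines which fields are present.
def parse_app_data(header):
    parts = header.split(':')
    if len(parts) == 5:
        app_id, user_email, nick_name, auth_domain, version = parts
    elif len(parts) == 4:
        app_id, user_email, nick_name, auth_domain = parts
        version = "1"   # default, as in the handler above
    elif len(parts) == 2:
        app_id, version = parts
    elif len(parts) == 1:
        app_id, version = parts[0], "1"
    else:
        return None     # "UNABLE TO EXTRACT APPLICATION DATA"
    return app_id, version

result = parse_app_data("guestbook:1")
```

Note that a 3-field header is rejected outright, matching the handler's fall-through branch.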
3351-class AppScaleUnSecureServerThreaded( ThreadingMixIn, HTTPServer):
3352- pass
3353-
3354-class AppScaleSecureServerThreaded( ThreadingMixIn, HTTPServer ):
3355- def __init__( self):
3356- global local_server_address
3357- global HandlerClass
3358- global ssl_cert_file
3359- global ssl_key_file
3360- BaseServer.__init__( self, local_server_address, HandlerClass )
3361- ctx = SSL.Context()
3362- ctx.load_cert( ssl_cert_file, ssl_key_file )
3363- self.socket = SSL.Connection( ctx )
3364- self.server_bind()
3365- self.server_activate()
3366-
3367-def usage():
3368- print "AppScale Server"
3369- print
3370- print "Options:"
3371- print "\t--certificate=<path-to-ssl-certificate>"
3372- print "\t--soap=<soap server hostname>"
3373- print "\t--key for using keys from the soap server"
3374- print "\t--type=<hypertable, hbase, cassandra, mysql, mongodb>"
3375- print "\t--secret=<secret for the soap server>"
3376- print "\t--blocksize=<key-block-size>"
3377- print "\t--optimized_query"
3378- print "\t--no_encryption"
3379-def main(argv):
3380- global app_datastore
3381- global getKeyFromServer
3382- global tableServer
3383- global keySecret
3384- global logOn
3385- global logFilePtr
3386- global optimizedQuery
3387- global soapServer
3388- global ERROR_CODES
3389- global VALID_DATASTORES
3390- global KEYBLOCKSIZE
3391- cert_file = CERT_LOCATION
3392- key_file = KEY_LOCATION
3393- db_type = "hypertable"
3394- port = DEFAULT_SSL_PORT
3395- isEncrypted = True
3396- try:
3397- opts, args = getopt.getopt( argv, "c:t:l:s:b:a:k:p:o:n:z",
3398- ["certificate=",
3399- "type=",
3400- "log=",
3401- "secret=",
3402- "blocksize=",
3403- "soap=",
3404- "key",
3405- "port",
3406- "optimized_query",
3407- "no_encryption",
3408- "zoo_keeper"] )
3409- except getopt.GetoptError:
3410- usage()
3411- sys.exit(1)
3412- for opt, arg in opts:
3413- if opt in ("-c", "--certificate"):
3414- cert_file = arg
3415- print "Using cert..."
3416- elif opt in ("-k", "--key" ):
3417- getKeyFromServer = True
3418- print "Using key server..."
3419- elif opt in ("-t", "--type"):
3420- db_type = arg
3421- print "Datastore type: ",db_type
3422- elif opt in ("-s", "--secret"):
3423- keySecret = arg
3424- print "Secret set..."
3425- elif opt in ("-l", "--log"):
3426- logOn = True
3427- logFile = arg
3428- logFilePtr = open(logFile, "w")
3429- logFilePtr.write("# type, app, start, end\n")
3430- elif opt in ("-b", "--blocksize"):
3431- KEYBLOCKSIZE = arg
3432- print "Block size: ",KEYBLOCKSIZE
3433- elif opt in ("-a", "--soap"):
3434- soapServer = arg
3435- elif opt in ("-o", "--optimized_query"):
3436- optimizedQuery = True
3437- elif opt in ("-p", "--port"):
3438- port = int(arg)
3439- elif opt in ("-n", "--no_encryption"):
3440- isEncrypted = False
3441- elif opt in ("-z", "--zoo_keeper"):
3442- print "This version does not use zoo keeper"
3443- exit(1)
3444-
3445- app_datastore = appscale_datastore.DatastoreFactory.getDatastore(db_type)
3446- ERROR_CODES = appscale_datastore.DatastoreFactory.error_codes()
3447- VALID_DATASTORES = appscale_datastore.DatastoreFactory.valid_datastores()
3448- if DEBUG: print "ERROR_CODES:"
3449- if DEBUG: print ERROR_CODES
3450- if DEBUG: print "VALID_DATASTORE:"
3451- if DEBUG: print VALID_DATASTORES
3452- if db_type in VALID_DATASTORES:
3453- logger.debug("Using datastore %s" % db_type)
3454- else:
3455- print "Unknown datastore "+ db_type
3456- exit(1)
3457-
3458- tableServer = SOAPpy.SOAPProxy("https://" + soapServer + ":" + str(keyPort))
3459-
3460- # # Bind Port #
3461- #server = AppScaleSecureServer( ('',DEFAULT_SSL_PORT),
3462- # AppScaleSecureHandler, cert_file, key_file )
3463- #help(ThreadedHTTPServer)
3464- global local_server_address
3465- global HandlerClass
3466- global ssl_cert_file
3467- global ssl_key_file
3468- global keyDictionaryLock
3469-
3470- keyDictionaryLock = threading.Lock()
3471- if port == DEFAULT_SSL_PORT and not isEncrypted:
3472- port = DEFAULT_PORT
3473- local_server_address = ('',port)
3474- HandlerClass = AppScaleSecureHandler
3475- ssl_cert_file = cert_file
3476- ssl_key_file = key_file
3477- if isEncrypted:
3478- server = AppScaleSecureServerThreaded()
3479- else:
3480- server = AppScaleUnSecureServerThreaded(local_server_address, HandlerClass)
3481- sa = server.socket.getsockname()
3482- if not db_type == "timesten":
3483- # Stop running as root, security purposes #
3484- drop_privileges()
3485- logger.debug("\n\nStarting AppScale-Secure-Server on %s:%s" % (sa[0], sa[1]))
3486-
3487- while 1:
3488- try:
3489- # Start Server #
3490- server.serve_forever()
3491- except SSL.SSLError:
3492- logger.debug("\n\nUnexpected input for AppScale-Secure-Server on %s:%s" % (sa[0], sa[1]))
3493- except KeyboardInterrupt:
3494- server.socket.close()
3495- print "Server interrupted by user, terminating..."
3496- exit(1)
3497-
3498-if __name__ == '__main__':
3499- #cProfile.run("main(sys.argv[1:])")
3500- main(sys.argv[1:])
3501
3502=== removed file 'AppDB/as_transaction.py'
3503--- AppDB/as_transaction.py 2010-05-10 21:08:10 +0000
3504+++ AppDB/as_transaction.py 1970-01-01 00:00:00 +0000
3505@@ -1,241 +0,0 @@
3506-import threading
3507-import time
3508-import sys
3509-from google.net.proto import ProtocolBuffer
3510-from google.appengine.datastore import datastore_pb
3511-from google.appengine.runtime import apiproxy_errors
3512-GC_PERIOD = 100
3513-TRANS_TIMEOUT = 60
3514-class ASTransaction:
3515- def __init__(self, handle, entity_group = None, lease = 0):
3516- self.handle = handle
3517- self.group = entity_group
3518- self.lease = lease
3519- if lease == 0:
3520- self.lease = time.time() + TRANS_TIMEOUT
3521- self.change_set = None
3522-
3523-
3524- def addModifiedKey(self, key):
3525- if not self.change_set:
3526- self.change_set = set()
3527- self.change_set.add(key)
3528-
3529-
3530- def getModifiedKeysCopy(self):
3531- if self.change_set == None:
3532- return None
3533- return self.change_set.copy()
3534-
3535-
3536- def __str__(self):
3537- s = "handle: " + str(self.handle) + "\n"
3538- s += "group: " + str(self.group) + "\n"
3539- s += "lease:" + str(self.lease) + "\n"
3540- s += "change set:" + str(self.change_set)
3541- return s
3542-
3543-
3544- def __del__(self):
3545- del self.change_set
3546-
3547-
3548-
3549-#Thread safe set
3550-class ASTransSet:
3551- def __init__(self):
3552- self._txes = {}
3553- self._txes_lock = threading.Lock()
3554- self.startGC()
3555-
3556- def startGC(self):
3557- self.gcthread = threading.Thread(target = self.__gcRunner)
3558- self.gcthread.start()
3559-
3560- def __gcRunner(self):
3561-
3562- while 1:
3563- time.sleep(GC_PERIOD)
3564- del_list = []
3565- cur_time = time.time()
3566- self._txes_lock.acquire()
3567- for ii in self._txes:
3568- if self._txes[ii].lease <= cur_time:
3569- del_list.append(ii)
3570- # iterate backwards to make deletes safe
3571- del_list.reverse()
3572- for ii in del_list:
3573- del self._txes[ii]
3574- self._txes_lock.release()
3575-
3576-
3577- # Takes a transaction_pb
3578- # Checks to see if the transaction exist and if
3579- # it has timed out
3580- def isValid(self, txn):
3581- handle = None
3582- if not txn.has_handle():
3583- return False
3584-
3585- handle = txn.handle()
3586- curtime = time.time()
3587- self._txes_lock.acquire()
3588- if handle not in self._txes:
3589- self._txes_lock.release()
3590- return False
3591- else:
3592- if self._txes[handle].lease <= curtime:
3593- self._txes_lock.release()
3594- return False
3595- self._txes_lock.release()
3596-
3597- return True
3598-
3599-
3600- # Takes a transaction_pb
3601- def needsLock(self, txn):
3602- handle = txn.handle()
3603- self._txes_lock.acquire()
3604- txn_copy = self._txes[handle]
3605- if txn_copy.group == None:
3606- self._txes_lock.release()
3607- return True
3608- else:
3609- self._txes_lock.release()
3610- return False
3611-
3612-
3613- def hasLockExpired(self, txn):
3614- cur_time = time.time()
3615- handle = txn.handle()
3616- self._txes_lock.acquire()
3617- txn_copy = self._txes[handle]
3618- if txn_copy.lease <= cur_time:
3619- print "Cur",str(cur_time),"lease",str(txn_copy.lease)
3620- self._txes_lock.release()
3621- return True
3622- else:
3623- self._txes_lock.release()
3624- return False
3625-
3626-
3627- # Takes a transaction_pb
3628- def purge(self, txn):
3629- if not txn:
3630- return
3631- handle = txn.handle()
3632- self._txes_lock.acquire()
3633- if handle in self._txes:
3634- del self._txes[handle]
3635- self._txes_lock.release()
3636-
3637-
3638- # Takes a transaction_pb
3639- def getGroup(self, txn):
3640- handle = txn.handle()
3641- group = None
3642- self._txes_lock.acquire()
3643- if handle in self._txes:
3644- group = self._txes[handle].group
3645- self._txes_lock.release()
3646- return group
3647-
3648- # Takes a transaction_pb
3649- def add(self, txn):
3650- handle = txn.handle()
3651- tx = ASTransaction(handle)
3652- self._txes_lock.acquire()
3653- if handle in self._txes:
3654- self._txes_lock.release()
3655- return False
3656- self._txes[handle] = tx
3657- self._txes_lock.release()
3658- return True
3659-
3660-
3661- def setLease(self, txn, lease):
3662- handle = txn.handle()
3663- self._txes_lock.acquire()
3664- txn_copy = self._txes[handle]
3665- txn_copy.lease = lease
3666- self._txes[handle] = txn_copy
3667- self._txes_lock.release()
3668-
3669-
3670- def addChangeSet(self, txn, key):
3671- handle = txn.handle()
3672- self._txes_lock.acquire()
3673- txn_copy = self._txes[handle]
3674- txn_copy.addModifiedKey(key)
3675- self._txes[handle] = txn_copy
3676- self._txes_lock.release()
3677-
3678-
3679- def getChangeSet(self, txn):
3680- handle = txn.handle()
3681- self._txes_lock.acquire()
3682- txn_copy = self._txes[handle]
3683- change_set = txn_copy.getModifiedKeysCopy()
3684- self._txes_lock.release()
3685- return change_set
3686-
3687-
3688- def setGroup(self, txn, group):
3689- handle = txn.handle()
3690- self._txes_lock.acquire()
3691- txn_copy = self._txes[handle]
3692- txn_copy.group = group
3693- self._txes[handle] = txn_copy
3694- self._txes_lock.release()
3695-
3696- def printSet(self):
3697- print "============================="
3698- for ii in self._txes:
3699- print self._txes[ii]
3700- print "============================="
3701-
3702-
3703-######################
3704-# Unit testing
3705-######################
3706-def main(argv):
3707- tran_pb = datastore_pb.Transaction()
3708- tran_pb.set_handle(500)
3709- tran_set = ASTransSet()
3710- tran_set.add(tran_pb)
3711- print "Needs the lock without the group set:" + str(tran_set.needsLock(tran_pb))
3712- tran_set.setGroup(tran_pb, "my_group")
3713- print "Does not Need lock with the group set:" + str(tran_set.needsLock(tran_pb))
3714- tran_set.setLease(tran_pb, time.time() + .02)
3715- tran_set.addChangeSet(tran_pb, "first key")
3716- tran_set.addChangeSet(tran_pb, "second key")
3717- tran_set.printSet()
3718- print "Has the lock expired (no)?:" + str(tran_set.hasLockExpired(tran_pb))
3719- time.sleep(1)
3720- print "Has the lock expired (yes)?:" + str(tran_set.hasLockExpired(tran_pb))
3721- print "******************"
3722-
3723- tran_pb = datastore_pb.Transaction()
3724- tran_pb.set_handle(600)
3725- tran_set.add(tran_pb)
3726- print "Needs the lock without the group set:" + str(tran_set.needsLock(tran_pb))
3727- tran_set.setGroup(tran_pb, "my_group")
3728- print "Does not Need lock with the group set:" + str(tran_set.needsLock(tran_pb))
3729- print "Group is: " + str(tran_set.getGroup(tran_pb))
3730- tran_set.printSet()
3731- print "******************"
3732- print "Removing 600"
3733- print "******************"
3734-
3735- tran_set.purge(tran_pb)
3736- tran_set.printSet()
3737- tran_pb.set_handle(500)
3738- print "500 valid (yes) ?: " + str(tran_set.isValid(tran_pb))
3739- change_set = tran_set.getChangeSet(tran_pb)
3740- print "Change set: " + str(change_set)
3741- tran_set.purge(tran_pb)
3742- print "Change set after purge (same): " + str(change_set)
3743-
3744-
3745-if __name__ == '__main__':
3746- main(sys.argv[1:])
3747
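The ASTransSet code removed above wraps every dictionary operation in an explicit `acquire()`/`release()` pair, which can leak the lock on an early return. The same bookkeeping can be sketched with a context-managed lock that always releases. This is a minimal sketch, not AppScale code: `TxSet` and its field names are illustrative, only the add/lease/change-set behavior mirrors the deleted class.

```python
import threading
import time

class _Tx(object):
    def __init__(self):
        self.lease = 0.0
        self.group = None
        self.modified = []

class TxSet(object):
    """Lock-guarded {handle: transaction} registry."""
    def __init__(self):
        self._txes = {}
        self._lock = threading.Lock()

    def add(self, handle):
        with self._lock:  # released automatically, even on early return
            if handle in self._txes:
                return False
            self._txes[handle] = _Tx()
            return True

    def set_lease(self, handle, lease):
        with self._lock:
            self._txes[handle].lease = lease

    def lease_expired(self, handle):
        with self._lock:
            return time.time() > self._txes[handle].lease

    def add_change(self, handle, key):
        with self._lock:
            self._txes[handle].modified.append(key)

    def change_set(self, handle):
        with self._lock:
            # return a copy, like getModifiedKeysCopy in the original
            return list(self._txes[handle].modified)
```

The `with self._lock:` form makes it impossible to hit the double-release path the deleted `add` had to guard against by hand.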
3748=== modified file 'AppDB/cassandra/py_cassandra.py'
3749--- AppDB/cassandra/py_cassandra.py 2010-05-21 07:13:04 +0000
3750+++ AppDB/cassandra/py_cassandra.py 2010-06-29 01:49:23 +0000
3751@@ -1,11 +1,9 @@
3752 #
3753 # Cassandra Interface for AppScale
3754-#
3755+# Rewritten by Navraj Chohan to use range queries
3756 # Modified by Chris Bunch for upgrade to Cassandra 0.50.0
3757 # on 2/17/10
3758-
3759-__author__="Soo Hwan Park (suwanny@gmail.com)"
3760-__date__="$2009.6.4 19:44:00$"
3761+# Original author: suwanny@gmail.com
3762
3763 import os,sys
3764 import time
3765@@ -14,11 +12,9 @@
3766 from thrift_cass.ttypes import *
3767
3768 import string
3769-import sys, os
3770 import base64 # base64 2009.04.16
3771 from dbconstants import *
3772 from dbinterface import *
3773-from dhash_datastore import *
3774 import sqlalchemy.pool as pool
3775 import appscale_logger
3776
3777@@ -26,9 +22,12 @@
3778 from thrift.transport import TSocket
3779 from thrift.transport import TTransport
3780 from thrift.protocol import TBinaryProtocol
3781-
3782 ERROR_DEFAULT = "DB_ERROR:" # ERROR_CASSANDRA
3783-
3784+# Store all schema information in a special table
3785+# If a table does not show up in this table, try a range query
3786+# to discover its schema
3787+SCHEMA_TABLE = "__key__"
3788+SCHEMA_TABLE_SCHEMA = ['schema']
3789 # use 1 Table and 1 ColumnFamily in Cassandra
3790 MAIN_TABLE = "Keyspace1"
3791 COLUMN_FAMILY = "Standard1"
3792@@ -44,71 +43,219 @@
3793 CONSISTENCY_QUORUM = 2
3794 CONSISTENCY_ALL = 5 # don't use this for reads (next version may fix this)
3795
3796-class DatastoreProxy(DHashDatastore):
3797+MAX_ROW_COUNT = 10000000
3798+table_cache = {}
3799+class DatastoreProxy(AppDBInterface):
3800 def __init__(self, logger = appscale_logger.getLogger("datastore-cassandra")):
3801- DHashDatastore.__init__(self, logger)
3802 # TODO: is this correct?
3803 f = open(APPSCALE_HOME + '/.appscale/my_public_ip', 'r')
3804 self.host = f.read()
3805 # self.host = host
3806 self.port = DEFAULT_PORT
3807- self.logger.debug("Cassandra connection init to %s:%d" % (self.host, self.port))
3808 self.pool = pool.QueuePool(self.__create_connection, reset_on_return=False)
3809+ self.logger = logger
3810
3811 def logTiming(self, function, start_time, end_time):
3812 if PROFILING:
3813 self.logger.debug(function + ": " + str(end_time - start_time) + " s")
3814-
3815- ######################################################################
3816- # Cassandra specific methods
3817- ######################################################################
3818-
3819- def get(self, key, column = DEFAULT_VALUE):
3820- value = None
3821- client = None
3822- try:
3823- path = ColumnPath(COLUMN_FAMILY, column=column)
3824- client = self.__setup_connection()
3825- response = client.get(MAIN_TABLE, key, path, CONSISTENCY_QUORUM)
3826-# transport.close()
3827-
3828- value = response.column.value
3829- except NotFoundException: # occurs normally if the item isn't in the db
3830- return value
3831- except Exception, ex:
3832- self.logger.debug("Exception %s" % ex)
3833- self.__close_connection(client)
3834- return value
3835-
3836- def put(self, key, value, column = DEFAULT_VALUE):
3837- client = None
3838- try:
3839- path = ColumnPath(COLUMN_FAMILY, column=column)
3840- client = self.__setup_connection()
3841- client.insert(MAIN_TABLE, key, path, value, self.timestamp(), CONSISTENCY_QUORUM)
3842-# transport.close()
3843- except Exception, e :
3844- self.logger.debug("put key:%s, column: %s value:%s" % (key, column, value))
3845- self.__close_connection(client)
3846-
3847- def remove(self, key, column = ""):
3848- client = None
3849- try:
3850- client = self.__setup_connection()
3851-
3852- # FIXME: cgb: you'd think that you could just remove the if / else
3853- # here, but for some reason it messes up delete if you do
3854- # look into why that is
3855- if column:
3856- path = ColumnPath(COLUMN_FAMILY, column=column)
3857- client.remove(MAIN_TABLE, key, path, self.timestamp(), CONSISTENCY_QUORUM)
3858- else:
3859- path = ColumnPath(COLUMN_FAMILY)
3860- client.remove(MAIN_TABLE, key, path, self.timestamp(), CONSISTENCY_QUORUM)
3861-
3862- except Exception, ex:
3863- self.logger.debug("Exception %s" % ex)
3864- self.__close_connection(client)
3865+
3866+ def get_entity(self, table_name, row_key, column_names):
3867+ error = [ERROR_DEFAULT]
3868+ list = error
3869+ client = None
3870+ row_key = table_name + '/' + row_key
3871+ try:
3872+ slice_predicate = SlicePredicate(column_names=column_names)
3873+ path = ColumnPath(COLUMN_FAMILY)
3874+ client = self.__setup_connection()
3875+ # Result is a column type which has name, value, timestamp
3876+ result = client.get_slice(MAIN_TABLE, row_key, path, slice_predicate,
3877+ CONSISTENCY_QUORUM)
3878+ for column in column_names:
3879+ for r in result:
3880+ c = r.column
3881+ if column == c.name:
3882+ list.append(c.value)
3883+ except NotFoundException: # occurs normally if the item isn't in the db
3884+ list[0] += "Not found"
3885+ self.__close_connection(client)
3886+ return list
3887+ except Exception, ex:
3888+ #self.logger.debug("Exception %s" % ex)
3889+ list[0]+=("Exception: %s"%ex)
3890+ self.__close_connection(client)
3891+ return list
3892+ self.__close_connection(client)
3893+ if len(list) == 1:
3894+ list[0] += "Not found"
3895+ return list
3896+
3897+
3898+ def put_entity(self, table_name, row_key, column_names, cell_values):
3899+ error = [ERROR_DEFAULT]
3900+ list = error
3901+ client = None
3902+
3903+ # The first time a table is seen
3904+ if table_name not in table_cache:
3905+ self.create_table(table_name, column_names)
3906+
3907+ row_key = table_name + '/' + row_key
3908+ try:
3909+ client = self.__setup_connection()
3910+ curtime = self.timestamp()
3911+ # Build one Mutation per column and apply them in a single batch_mutate
3912+ mutations = []
3913+ for index, ii in enumerate(column_names):
3914+ column = Column(name = ii, value=cell_values[index],
3915+ timestamp=curtime)
3916+ c_or_sc = ColumnOrSuperColumn(column=column)
3917+ mutation = Mutation(column_or_supercolumn=c_or_sc)
3918+ mutations.append(mutation)
3919+ mutation_map = {row_key : { COLUMN_FAMILY : mutations } }
3920+ client.batch_mutate(MAIN_TABLE, mutation_map,
3921+ CONSISTENCY_QUORUM)
3922+ except Exception, ex:
3923+ self.logger.debug("Exception %s" % ex)
3924+ list[0]+=("Exception: %s"%ex)
3925+ self.__close_connection(client)
3926+ list.append("0")
3927+ return list
3928+ self.__close_connection(client)
3929+ list.append("0")
3930+ return list
3931+
3932+ def put_entity_dict(self, table_name, row_key, value_dict):
3933+ raise NotImplementedError("put_entity_dict is not implemented in %s." % self.__class__)
3934+
3935+
3936+ def get_table(self, table_name, column_names):
3937+ error = [ERROR_DEFAULT]
3938+ client = None
3939+ result = error
3940+ keyslices = []
3941+ column_parent = ColumnParent(column_family="Standard1")
3942+ predicate = SlicePredicate(column_names=column_names)
3943+ start_key = table_name
3944+ end_key = table_name + '~'
3945+ try:
3946+ client = self.__setup_connection()
3947+ keyslices = client.get_range_slice(MAIN_TABLE,
3948+ column_parent,
3949+ predicate,
3950+ start_key,
3951+ end_key,
3952+ MAX_ROW_COUNT,
3953+ CONSISTENCY_QUORUM)
3954+ except Exception, ex:
3955+ self.logger.debug("Exception %s" % ex)
3956+ result[0] += "Exception: " + str(ex)
3957+ self.__close_connection(client)
3958+ return result
3959+ for keyslice in keyslices:
3960+ ordering_dict = {}
3961+ for c in keyslice.columns:
3962+ column = c.column
3963+ value = column.value
3964+ ordering_dict[column.name] = value
3965+ if len(ordering_dict) == 0:
3966+ continue
3967+ for column in column_names:
3968+ try:
3969+ result.append(ordering_dict[column])
3970+ except:
3971+ result[0] += "Key error, get_table did not return the correct schema"
3972+ self.__close_connection(client)
3973+ return result
3974+
3975+ def delete_row(self, table_name, row_key):
3976+ error = [ERROR_DEFAULT]
3977+ ret = error
3978+ client = None
3979+ row_key = table_name + '/' + row_key
3980+ path = ColumnPath(COLUMN_FAMILY)
3981+ try:
3982+ client = self.__setup_connection()
3983+ curtime = self.timestamp()
3984+ # Remove every column in the row at the composed key
3985+ client.remove(MAIN_TABLE, row_key, path, curtime,
3986+ CONSISTENCY_QUORUM)
3987+ except Exception, ex:
3988+ self.logger.debug("Exception %s" % ex)
3989+ ret[0]+=("Exception: %s"%ex)
3990+ self.__close_connection(client)
3991+ return ret
3992+ self.__close_connection(client)
3993+ ret.append("0")
3994+ return ret
3995+
3996+ def get_schema(self, table_name):
3997+ error = [ERROR_DEFAULT]
3998+ result = error
3999+ ret = self.get_entity(SCHEMA_TABLE,
4000+ table_name,
4001+ SCHEMA_TABLE_SCHEMA)
4002+ if len(ret) > 1:
4003+ schema = ret[1]
4004+ else:
4005+ error[0] = ret[0] + "--unable to get schema"
4006+ return error
4007+ schema = schema.split(':')
4008+ result = result + schema
4009+ return result
4010+
4011+
4012+ def delete_table(self, table_name):
4013+ error = [ERROR_DEFAULT]
4014+ result = error
4015+ keyslices = []
4016+ column_parent = ColumnParent(column_family="Standard1")
4017+ predicate = SlicePredicate(column_names=[])
4018+ curtime = self.timestamp()
4019+ path = ColumnPath(COLUMN_FAMILY)
4020+ start_key = table_name
4021+ end_key = table_name + '~'
4022+ try:
4023+ client = self.__setup_connection()
4024+ keyslices = client.get_range_slice(MAIN_TABLE,
4025+ column_parent,
4026+ predicate,
4027+ start_key,
4028+ end_key,
4029+ MAX_ROW_COUNT,
4030+ CONSISTENCY_QUORUM)
4031+ except Exception, ex:
4032+ self.logger.debug("Exception %s" % ex)
4033+ result[0]+=("Exception: %s"%ex)
4034+ self.__close_connection(client)
4035+ return result
4036+ keys_removed = False
4037+ for keyslice in keyslices:
4038+ row_key = keyslice.key
4039+ client.remove(MAIN_TABLE,
4040+ row_key,
4041+ path,
4042+ curtime,
4043+ CONSISTENCY_QUORUM)
4044+ keys_removed = True
4045+ if table_name not in table_cache and not keys_removed:
4046+ result[0] += "Table does not exist"
4047+ return result
4048+ if table_name in table_cache:
4049+ del table_cache[table_name]
4050+
4051+ self.__close_connection(client)
4052+ return result
4053+
4054+ # Only stores the schema
4055+ def create_table(self, table_name, column_names):
4056+ table_cache[table_name] = 1
4057+ columns = ':'.join(column_names)
4058+ row_key = table_name
4059+ # Check for an existing schema first so it is not overwritten
4060+ ret = self.get_entity(SCHEMA_TABLE, row_key, SCHEMA_TABLE_SCHEMA)
4061+ if ret[0] != ERROR_DEFAULT:
4062+ self.put_entity(SCHEMA_TABLE, row_key, SCHEMA_TABLE_SCHEMA, [columns])
4063
4064 ######################################################################
4065 # private methods
4066
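The rewritten interface emulates many logical tables inside one Cassandra column family: every row key is prefixed with `table_name + '/'`, and `get_table`/`delete_table` scan the key range from `table_name` to `table_name + '~'` (which is why the storage-conf.xml change below switches to an order-preserving partitioner). A minimal sketch of the key scheme, with an in-memory dict standing in for the column family; `put`, `scan_range`, and `get_table` here are illustrative, not the Thrift calls above.

```python
import bisect

store = {}  # ordered row space standing in for the single column family

def put(table, row_key, value):
    store[table + '/' + row_key] = value

def scan_range(start, end):
    """Return (key, value) pairs with start <= key <= end, in key order."""
    keys = sorted(store)
    lo = bisect.bisect_left(keys, start)
    hi = bisect.bisect_right(keys, end)
    return [(k, store[k]) for k in keys[lo:hi]]

def get_table(table):
    # '~' (0x7E) sorts after '/' and all alphanumerics, so this range
    # covers every "<table>/<row>" key. Caveat: it also covers keys of
    # any table whose name merely extends this one ("users" vs "users2"),
    # so the scheme implicitly assumes no table name is a prefix of another.
    return scan_range(table, table + '~')

put('users', 'alice', 'a')
put('users', 'bob', 'b')
put('posts', 'p1', 'x')
```

Under the RandomPartitioner that the template previously configured, keys are distributed by hash, so a lexical range like this would return unrelated rows; the `OrderPreservingPartitioner` change is what makes the prefix scan meaningful.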
4067=== modified file 'AppDB/cassandra/templates/storage-conf.xml'
4068--- AppDB/cassandra/templates/storage-conf.xml 2010-04-21 11:16:18 +0000
4069+++ AppDB/cassandra/templates/storage-conf.xml 2010-06-29 01:49:23 +0000
4070@@ -178,7 +178,7 @@
4071 ~ directories, since the partitioner can modify the sstable on-disk
4072 ~ format.
4073 -->
4074- <Partitioner>org.apache.cassandra.dht.RandomPartitioner</Partitioner>
4075+ <Partitioner>org.apache.cassandra.dht.OrderPreservingPartitioner</Partitioner>
4076
4077 <!--
4078 ~ If you are using an order-preserving partitioner and you know your key
4079
4080=== modified file 'AppDB/datastore_tester.py'
4081--- AppDB/datastore_tester.py 2009-12-02 20:15:14 +0000
4082+++ AppDB/datastore_tester.py 2010-06-29 01:49:23 +0000
4083@@ -41,10 +41,10 @@
4084 #print "columns= " + str(columns)
4085 #print "data= " + str(data)
4086 print "table= " + table_name
4087-app_datastore = appscale_datastore.Datastore(datastore_type)
4088-
4089-ERROR_CODES = app_datastore.error_codes()
4090-if datastore_type not in app_datastore.valid_datastores():
4091+app_datastore = appscale_datastore.DatastoreFactory.getDatastore(datastore_type)
4092+ERROR_CODES = appscale_datastore.DatastoreFactory.error_codes()
4093+VALID_DATASTORES = appscale_datastore.DatastoreFactory.valid_datastores()
4094+if datastore_type not in VALID_DATASTORES:
4095 print "Bad selection for datastore. Valid selections are:"
4096 print app_datastore.valid_datastores()
4097 exit(1)
4098@@ -127,10 +127,12 @@
4099 #################################################
4100 # Get and a delete on a table that does not exist
4101 #################################################
4102+# There is too much overhead in checking to see if the table exists
4103+# for Cassandra
4104 invalid_table = hf.randomString(10)
4105-ret = app_datastore.delete_row(invalid_table, key)
4106-if ret[0] in ERROR_CODES:
4107- err(hf.lineno(), ret)
4108+#ret = app_datastore.delete_row(invalid_table, key)
4109+#if ret[0] in ERROR_CODES:
4110+# err(hf.lineno(), ret)
4111 ret = app_datastore.get_entity(invalid_table, key, columns)
4112 if ret[0] in ERROR_CODES:
4113 err(hf.lineno(), ret)
4114@@ -186,9 +188,19 @@
4115 # Get schema on a table that exist, and one that doesnt
4116 #######################################################
4117 ret = app_datastore.get_schema(table_name)
4118-if ret[0] not in ERROR_CODES or ret[1:] != columns:
4119+if ret[0] not in ERROR_CODES or sorted(ret[1:]) != sorted(columns):
4120+ print "ret[1:]:", sorted(ret[1:])
4121+ print "columns:", sorted(columns)
4122 err(hf.lineno(), ret)
4123 ret = app_datastore.get_schema(invalid_table)
4124 if ret[0] in ERROR_CODES:
4125 err(hf.lineno(), ret)
4126+################################################
4127+# Get data from a table that does not exist
4128+# Should return an empty list
4129+################################################
4130+ret = app_datastore.get_table(invalid_table, columns)
4131+if ret[0] not in ERROR_CODES and len(ret) != 1:
4132+ err(hf.lineno(), ret)
4133+
4134 print "SUCCESS"
4135
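The tester above leans on the AppDB return convention: every call returns a list whose first element begins with a `DB_ERROR:` prefix. On success that element is the bare prefix, so `ret[0] in ERROR_CODES` holds and results follow; on failure the message is appended to the prefix. A minimal sketch of the convention; `succeeded` is an illustrative helper, not part of the tester.

```python
ERROR_DEFAULT = "DB_ERROR:"    # bare prefix, as in py_cassandra.py
ERROR_CODES = [ERROR_DEFAULT]  # one bare prefix per supported datastore

def succeeded(result):
    """True when the first element is an unmodified error prefix."""
    return result[0] in ERROR_CODES

ok = [ERROR_DEFAULT, "column-value"]   # shape of a successful get_entity
bad = [ERROR_DEFAULT + "Not found"]    # the same call on a missing row
```

This explains the tester's inverted-looking checks: for an operation that is expected to fail, `if ret[0] in ERROR_CODES: err(...)` flags the case where the call unexpectedly succeeded.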
4136=== modified file 'AppDB/dbinterface.py'
4137--- AppDB/dbinterface.py 2010-06-25 00:08:24 +0000
4138+++ AppDB/dbinterface.py 2010-06-29 01:49:23 +0000
4139@@ -7,19 +7,19 @@
4140 import os
4141
4142 class AppDBInterface:
4143- def get_entity(self, table_name, row_key, column_names):
4144+ def get_entity(self, table_name, row_key, column_names, txnid = 0):
4145 raise NotImplementedError("get_entity is not implemented in %s." % self.__class__)
4146
4147- def put_entity(self, table_name, row_key, column_names, cell_values):
4148+ def put_entity(self, table_name, row_key, column_names, cell_values, txnid = 0):
4149 raise NotImplementedError("put_entity is not implemented in %s." % self.__class__)
4150
4151 def put_entity_dict(self, table_name, row_key, value_dict):
4152 raise NotImplementedError("put_entity_dict is not implemented in %s." % self.__class__)
4153
4154- def get_table(self, table_name, column_names):
4155+ def get_table(self, table_name, column_names, txnid = 0):
4156 raise NotImplementedError("get_table is not implemented in %s." % self.__class__)
4157
4158- def delete_row(self, table_name, row_id):
4159+ def delete_row(self, table_name, row_id, txnid = 0):
4160 raise NotImplementedError("delete_row is not implemented in %s." % self.__class__)
4161
4162 def get_schema(self, table_name):
4163@@ -28,6 +28,14 @@
4164 def delete_table(self, table_name):
4165 raise NotImplementedError("delete_table is not implemented in %s." % self.__class__)
4166
4167+ def commit(self, txnid):
4168+ raise NotImplementedError("commit is not implemented in %s." % self.__class__)
4169+ def rollback(self, txnid):
4170+ raise NotImplementedError("rollback is not implemented in %s." % self.__class__)
4171+ def setupTransaction(self, txnid):
4172+ raise NotImplementedError("setupTransaction is not implemented in %s." % self.__class__)
4173+
4174+
4175 def get_local_ip(self):
4176 try:
4177 local_ip = self.__local_ip
4178
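The interface changes above thread an optional `txnid` through the data-path methods and add `commit`, `rollback`, and `setupTransaction`. A datastore without native transactions can satisfy the widened interface by accepting `txnid` and ignoring it, so every call commits immediately. A hypothetical in-memory sketch: `MemoryDatastore` and its dict layout are illustrative; only the method signatures and the error-list return shape mirror this diff.

```python
DB_ERROR = "DB_ERROR:"

class MemoryDatastore(object):
    """Implements the widened AppDBInterface signatures; txnid is
    accepted but ignored, so each operation is its own transaction."""
    def __init__(self):
        self._rows = {}  # {(table, row_key): {column: value}}

    def put_entity(self, table_name, row_key, column_names, cell_values, txnid=0):
        row = self._rows.setdefault((table_name, row_key), {})
        row.update(zip(column_names, cell_values))
        return [DB_ERROR, "0"]

    def get_entity(self, table_name, row_key, column_names, txnid=0):
        row = self._rows.get((table_name, row_key))
        if row is None:
            return [DB_ERROR + "Not found"]
        return [DB_ERROR] + [row[c] for c in column_names if c in row]

    def delete_row(self, table_name, row_key, txnid=0):
        self._rows.pop((table_name, row_key), None)
        return [DB_ERROR, "0"]

    def commit(self, txnid):    # nothing buffered, so nothing to flush
        return [DB_ERROR]

    def rollback(self, txnid):  # likewise a no-op
        return [DB_ERROR]

    def setupTransaction(self, txnid):
        pass
```

Defaulting `txnid` to 0 keeps every pre-existing caller working unchanged, which is presumably why the diff widens signatures instead of adding parallel transactional methods.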
4179=== modified file 'AppDB/dhash_datastore.py'
4180--- AppDB/dhash_datastore.py 2010-05-05 23:27:52 +0000
4181+++ AppDB/dhash_datastore.py 2010-06-29 01:49:23 +0000
4182@@ -217,9 +217,9 @@
4183 if self.remove_key(row_id, table_name):
4184 self.remove(internal_key)
4185 return elist
4186- else:
4187- elist[0] += "row doesn't exist"
4188- return elist
4189+ #else:
4190+ elist.append("0")
4191+ return elist
4192
4193 def get_schema(self, table_name):
4194 self.logger.debug("get_schema: %s" % table_name)
4195
4196=== modified file 'AppDB/hbase/py_hbase.py'
4197--- AppDB/hbase/py_hbase.py 2010-05-21 07:13:04 +0000
4198+++ AppDB/hbase/py_hbase.py 2010-06-29 01:49:23 +0000
4199@@ -1,11 +1,9 @@
4200 #Author: Navraj Chohan
4201
4202-import os,sys
4203+import os
4204
4205 import Hbase
4206 import ttypes
4207-import string
4208-import threading
4209 import time
4210 from thrift import Thrift
4211 from thrift.transport import TSocket
4212@@ -85,15 +83,12 @@
4213 column_names = client.getColumnDescriptors(table_name)
4214 keys = column_names.keys()
4215
4216- table = get_table(table_name, keys)
4217+ table = self.get_table(table_name, keys)
4218 value = (len(table) - 1)/(len(keys))
4219 elist.append(value)
4220 except ttypes.IOError, io:
4221 elist[0] = elist[0] + "Row Count. IO Error--" + io.message
4222 value = 0
4223- except ttypes.IOException, io:
4224- elist[0] = elist[0] + "Row Count. IO Exception--" + io.message
4225- value = 0
4226 self.__closeConnection(client)
4227 et = time.time()
4228 self.logTiming("HB ROWCOUNT", st, et)
4229@@ -272,7 +267,10 @@
4230 client.scannerClose(scanner)
4231 except ttypes.IOError, io:
4232 if io.message:
4233- elist[0] += "IO Error--" + str(io.message)
4234+ if io.message == table_name:
4235+ pass # Return an empty table
4236+ else:
4237+ elist[0] += "IO Error--" + str(io.message)
4238 else:
4239 elist[0] += "IOError"
4240 except ttypes.IllegalArgument, e:
4241
4242=== modified file 'AppDB/helper_functions.py'
4243--- AppDB/helper_functions.py 2009-12-01 07:13:55 +0000
4244+++ AppDB/helper_functions.py 2010-06-29 01:49:23 +0000
4245@@ -57,7 +57,7 @@
4246
4247 def randomString(length):
4248 s = hashlib.sha256()
4249- ret = ""
4250+ ret = "a"
4251 while len(ret) < length:
4252 s.update(str(random.random()))
4253 ret += s.hexdigest()
4254
4255=== modified file 'AppDB/hypertable/py_hypertable.py'
4256--- AppDB/hypertable/py_hypertable.py 2010-05-21 07:13:04 +0000
4257+++ AppDB/hypertable/py_hypertable.py 2010-06-29 01:49:23 +0000
4258@@ -142,6 +142,8 @@
4259 if PROFILING:
4260 self.logger.debug("HT GET: %s"%str(endtime - starttime))
4261 self.__closeConnection(client)
4262+ if len(elist) == 1:
4263+ elist[0] += "Not Found"
4264 return elist
4265
4266 def delete_table(self, table_name):
4267@@ -173,7 +175,10 @@
4268 if not self.__table_exist(table_name):
4269 query = "create table " + table_name + "( " + ', '.join(column_names) + ")"
4270 print "create table query=%s" % query
4271- ret = client.hql_query(query)
4272+ try:
4273+ ret = client.hql_query(query)
4274+ except Exception, e:
4275+ print e.message
4276 # the xml schema is not working currectly.
4277 # schema = "<Schema><AccessGroup name=\"default\"><ColumnFamily>"
4278 # for column in column_names:
4279@@ -269,7 +274,8 @@
4280 if res[ii].column_family in column_names:
4281 elist += [res[ii].value]
4282 except:
4283- elist[0] += "Not Found"
4284+ pass
4285+ #elist[0] += "Not Found"
4286 endtime = time.time()
4287 if PROFILING:
4288 self.logger.debug("HT GET_TABLE: %s"%str(endtime - starttime))
4289
4290=== modified file 'AppDB/mongodb/py_mongodb.py'
4291--- AppDB/mongodb/py_mongodb.py 2010-04-08 23:58:23 +0000
4292+++ AppDB/mongodb/py_mongodb.py 2010-06-29 01:49:23 +0000
4293@@ -74,6 +74,8 @@
4294
4295 elist.append("0")
4296 self.__close_connection(db)
4297+ if len(elist) == 1:
4298+ elist[0] += "Not found"
4299 return elist
4300
4301 def create_table(self, table_name, columns):
4302@@ -166,6 +168,8 @@
4303 if column == schema[ii]:
4304 elist.append(data[ii])
4305 self.__close_connection(db)
4306+ if len(elist) == 1:
4307+ elist[0] += "Not found"
4308 return elist
4309
4310 def get_schema(self, table_name):
4311@@ -222,8 +226,8 @@
4312 usedRows.append(myRow)
4313
4314
4315- if(len(elist) == 1):
4316- elist[0] += "table not found"
4317+ #if len(elist) == 1:
4318+ # elist[0] += "table not found"
4319
4320 self.__close_connection(db)
4321 return elist
4322
4323=== modified file 'AppDB/mysql/drop_all_tables.py'
4324--- AppDB/mysql/drop_all_tables.py 2009-07-20 08:49:28 +0000
4325+++ AppDB/mysql/drop_all_tables.py 2010-06-29 01:49:23 +0000
4326@@ -21,7 +21,7 @@
4327 SLAVES_FILE = APPSCALE_HOME + "/.appscale/slaves"
4328 #DB_LOCATION = socket.gethostbyname(socket.gethostname())
4329 DB_LOCATION = "127.0.0.1"
4330-DATABASE = "test"
4331+DATABASE = "appscale"
4332 def set_db_location():
4333 global DB_LOCATION
4334 file = open(SLAVES_FILE, "r")
4335@@ -34,6 +34,9 @@
4336 cursor = client.cursor()
4337 cursor.execute("DROP DATABASE " + DATABASE)
4338 cursor.execute("CREATE DATABASE " + DATABASE)
4339+ cursor.close()
4340+ client.commit()
4341+ client.close()
4342 except MySQLdb.Error, e:
4343 print e.args[1]
4344 return 0
4345
4346=== modified file 'AppDB/mysql/prime_mysql.py'
4347--- AppDB/mysql/prime_mysql.py 2010-05-27 21:59:59 +0000
4348+++ AppDB/mysql/prime_mysql.py 2010-06-29 01:49:23 +0000
4349@@ -3,18 +3,17 @@
4350
4351 import sys, time
4352 import os
4353+import MySQLdb
4354+import _mysql
4355+import socket
4356+from dbconstants import *
4357+
4358
4359 APPSCALE_HOME = os.environ.get("APPSCALE_HOME")
4360 if APPSCALE_HOME:
4361 pass
4362 else:
4363 print "APPSCALE_HOME env var not set"
4364- APPSCALE_HOME = "/root/appscale/"
4365-
4366-import MySQLdb
4367-import _mysql
4368-import socket
4369-from dbconstants import *
4370
4371 ROW_KEY = "mysql__row_key__"
4372 SLAVES_FILE = APPSCALE_HOME + "/.appscale/slaves"
4373@@ -27,13 +26,23 @@
4374 DB_LOCATION = file.readline()
4375 file.close()
4376
4377+def drop_database():
4378+ client = MySQLdb.connect(host=DB_LOCATION, db="mysql")
4379+ cursor = client.cursor()
4380+ cursor.execute("DROP DATABASE IF EXISTS appscale;")
4381+ cursor.close()
4382+ client.commit()
4383+ client.close()
4384+
4385 def create_database():
4386 client = MySQLdb.connect(host=DB_LOCATION, db="mysql")
4387 cursor = client.cursor()
4388- cursor.execute("drop database if exists appscale;")
4389- cursor.execute("create database appscale;")
4390+ cursor.execute("CREATE DATABASE IF NOT EXISTS appscale;")
4391+ cursor.close()
4392+ client.commit()
4393 client.close()
4394
4395+
4396 def create_table(tablename, columns):
4397 try:
4398 client = MySQLdb.connect(host=DB_LOCATION, db="appscale")
4399@@ -43,13 +52,16 @@
4400 columnscopy += ["x" + columns[ii]]
4401 command = "CREATE TABLE IF NOT EXISTS " + tablename + "( " + ROW_KEY + " CHAR(80) primary key, " + ' MEDIUMBLOB, '.join(columnscopy) + " MEDIUMBLOB) ENGINE=NDBCLUSTER"
4402 print command
4403- cursor.execute(command)
4404+ print cursor.execute(command)
4405 except MySQLdb.Error, e:
4406 print e.args[1]
4407 return 0
4408+ cursor.close()
4409+ client.commit()
4410 client.close()
4411 return 1
4412
4413+drop_database()
4414 create_database()
4415
4416 def prime_mysql():
4417
4418=== modified file 'AppDB/mysql/py_mysql.py'
4419--- AppDB/mysql/py_mysql.py 2010-03-15 22:22:12 +0000
4420+++ AppDB/mysql/py_mysql.py 2010-06-29 01:49:23 +0000
4421@@ -1,16 +1,17 @@
4422 # Author: Navraj Chohan
4423-import sys
4424+#import sys
4425 import os
4426-import string
4427 import MySQLdb
4428-import _mysql
4429-import base64
4430-import sqlalchemy.pool as pool
4431+#import _mysql
4432+#import sqlalchemy.pool as pool
4433 from dbinterface import *
4434 import appscale_logger
4435-
4436-MySQLdb = pool.manage(MySQLdb)
4437-
4438+import threading
4439+import time
4440+#MySQLdb = pool.manage(MySQLdb)
4441+TIMEOUT = 30
4442+# Time till next gc of connections
4443+GC_TIME = 120
4444 ROW_KEY = "mysql__row_key__"
4445 ERROR_MY = "DB_ERROR:"
4446 #DB_LOCATION = "appscale-image"
4447@@ -18,22 +19,47 @@
4448 DB_LOCATION = "127.0.0.1"
4449 #DB_PORT = 3306
4450 DEBUG = False
4451-
4452+transDict = {}
4453+transDict_lock = threading.Lock()
4454+last_gc_time = 0
4455 class DatastoreProxy(AppDBInterface):
4456
4457 def __init__(self, log = appscale_logger.getLogger("datastore-mysql")):
4458 self.logger = log
4459 self.client = None
4460+ self.transactionsOn = False
4461+
4462+
4463+ def commit(self, txnid):
4464+ elist = [ERROR_MY]
4465+ try:
4466+ cursor, client = self.__get_connection(txnid)
4467+ cursor.close()
4468+ client.commit()
4469+ self.__close_connection(txnid)
4470+ except MySQLdb.Error, e:
4471+ if DEBUG: self.logger.info(str(e.args[0]) + "--" + e.args[1])
4472+ elist[0] = ERROR_MY + str(e.args[0]) + "--" + e.args[1]
4473+ return elist
4474+
4475+ def rollback(self, txnid):
4476+ elist = [ERROR_MY]
4477+ try:
4478+ cursor, client = self.__get_connection(txnid)
4479+ cursor.close()
4480+ client.rollback()
4481+ self.__close_connection(txnid)
4482+ except MySQLdb.Error, e:
4483+ elist[0] = ERROR_MY + str(e.args[0]) + "--" + e.args[1]
4484+ return elist
4485
4486 def get_schema(self, table_name):
4487 table_name = "x" + table_name
4488 elist = [ERROR_MY]
4489 client = None
4490 try:
4491- client = self.__get_connection()
4492- client.autocommit(1)
4493+ client = MySQLdb.connect(host=DB_LOCATION, db=USE_DATABASE)
4494 cursor = client.cursor()
4495-# cursor.execute(USE_DATABASE)
4496 command = "SHOW fields FROM " + table_name
4497 cursor.execute(command)
4498 while (1):
4499@@ -50,6 +76,7 @@
4500 if DEBUG: print str(e.args[0]) + "--" + e.args[1]
4501 elist[0] = ERROR_MY + "Unable to get schema"
4502 if client:
4503+ client.commit()
4504 client.close()
4505 return elist
4506
4507@@ -58,10 +85,8 @@
4508 elist = [ERROR_MY]
4509 client = None
4510 try:
4511- client = self.__get_connection()
4512- client.autocommit(1)
4513+ client = MySQLdb.connect(host=DB_LOCATION, db=USE_DATABASE)
4514 cursor = client.cursor()
4515-# cursor.execute(USE_DATABASE)
4516 command = "drop table " + table_name
4517 if DEBUG: self.logger.info(command)
4518 cursor.execute(command)
4519@@ -69,23 +94,29 @@
4520 elist[0] += str(e.args[0]) + "--" + e.args[1]
4521 if DEBUG: self.logger.info(elist)
4522 if client:
4523+ client.commit()
4524 client.close()
4525 return elist
4526
4527- def get_entity(self, table_name, row_key, column_names):
4528+ def get_entity(self, table_name, row_key, column_names, txnid = 0):
4529 table_name = "x" + table_name
4530 elist = [ERROR_MY]
4531 client = None
4532+ isTrans = False
4533+ if txnid != 0 and self.transactionsOn:
4534+ isTrans = True
4535 if not row_key:
4536 self.logger.info("Null row key")
4537 elist[0] += "Null row key"
4538 return elist
4539
4540 try:
4541- client = self.__get_connection()
4542- client.autocommit(1)
4543- cursor = client.cursor()
4544-# cursor.execute(USE_DATABASE)
4545+ if not isTrans:
4546+ client = MySQLdb.connect(host=DB_LOCATION, db=USE_DATABASE)
4547+ cursor = client.cursor()
4548+ else:
4549+ cursor, client = self.__get_connection(txnid)
4550+
4551 command = "select "
4552 # Hacking on a x to make sure all columns start with a letter
4553 columncopy = []
4554@@ -99,32 +130,35 @@
4555 cursor.execute(command)
4556 result = cursor.fetchone()
4557 if result == None:
4558- client.close()
4559+ if not isTrans:
4560+ client.close()
4561 if len(elist) == 1:
4562 elist[0] += "Not found"
4563 return elist
4564 for ii in range(0, len(result)):
4565- #elist += [result[ii]]
4566 if result[ii]:
4567- #elist += [base64.b64decode(result[ii])]
4568 elist.append(result[ii])
4569 else:
4570 elist.append('')
4571 except MySQLdb.Error, e:
4572 if e.args[1].find("exists") == -1:
4573- client.close()
4574+ if not isTrans:
4575+ client.close()
4576 if len(elist) == 1:
4577 elist[0] += "Not found"
4578 return elist
4579 elist[0] = ERROR_MY + str(e.args[0]) + "--" + e.args[1]
4580 if DEBUG: self.logger.info(elist)
4581- if client:
4582+
4583+ if client and not isTrans:
4584 client.close()
4585 if len(elist) == 1:
4586 elist[0] += "Not found"
4587 return elist
4588
4589- def put_entity(self, table_name, row_key, column_names, cell_values):
4590+ def put_entity(self, table_name, row_key, column_names, cell_values, txnid = 0):
4591+ # Hacking on a x to make sure all columns start with a letter
4592+ # Mysql limitation
4593 table_name = "x" + table_name
4594 if DEBUG: self.logger.info("PUT ENTITY")
4595 if DEBUG: self.logger.info(str(cell_values))
4596@@ -135,28 +169,43 @@
4597 self.logger.info("Null row key")
4598 elist[0] += "Null row key"
4599 return elist
4600- # Hacking on a x to make sure all columns start with a letter
4601 columncopy = []
4602 for ii in range(0, len(column_names)):
4603 columncopy.append("x" + column_names[ii])
4604- try:
4605- client = self.__get_connection()
4606- client.autocommit(1)
4607- cursor = client.cursor()
4608-# cursor.execute(USE_DATABASE)
4609+
4610+ # TODO This should be reactive, if a put fails due to no table, create it
4611+ try:
4612+ tempclient = MySQLdb.connect(host=DB_LOCATION, db=USE_DATABASE)
4613+ query = "CREATE TABLE IF NOT EXISTS " + table_name + " ( " + ROW_KEY + " CHAR(255) primary key, " + ' BLOB, '.join(columncopy) + " BLOB) ENGINE=NDBCLUSTER"
4614+ tempcursor = tempclient.cursor()
4615+ if DEBUG: self.logger.info(query)
4616+ result = tempcursor.execute(query)
4617+ if DEBUG: self.logger.info("DONE CREATING TABLE...%s",str(result))
4618+ except MySQLdb.Error, e:
4619+ error = ERROR_MY + str(e.args[0]) + "--" + e.args[1]
4620+ print error
4621+ tempcursor.close()
4622+ tempclient.commit()
4623+ tempclient.close()
4624+
4625+ isTrans = False
4626+ if txnid != 0 and self.transactionsOn:
4627+ isTrans = True
4628+ try:
4629+ if not isTrans:
4630+ client = MySQLdb.connect(host=DB_LOCATION, db=USE_DATABASE)
4631+ cursor = client.cursor()
4632+ else:
4633+ cursor, client = self.__get_connection(txnid)
4634
4635 if len(column_names) != len(cell_values):
4636 elist[0] += "Error in put call |column_names| != |cell_values|"
4637- client.close()
4638+ if not isTrans:
4639+ client.close()
4640 return elist
4641- query = "CREATE TABLE IF NOT EXISTS " + table_name + " ( " + ROW_KEY + " CHAR(255) primary key, " + ' BLOB, '.join(columncopy) + " BLOB) ENGINE=NDBCLUSTER"
4642- if DEBUG: self.logger.info(query)
4643- result = cursor.execute(query)
4644- if DEBUG: self.logger.info("DONE CREATING TABLE...%s",str(result))
4645 values = []
4646 for ii in range(0, len(cell_values)):
4647 if cell_values[ii]:
4648- #value = base64.b64encode(cell_values[ii])
4649 value = cell_values[ii]
4650 values.append(MySQLdb.escape_string(value))
4651 else:
4652@@ -178,8 +227,6 @@
4653 command += "%s)"
4654 if DEBUG: self.logger.info(command)
4655 cursor.execute(command, tuple(cell_values + [MySQLdb.escape_string(row_key)]))
4656- cursor.close()
4657- cursor = ""
4658 else:
4659 # do an update
4660 for ii in range(0, len(cell_values)):
4661@@ -196,13 +243,15 @@
4662 elist.append("0")
4663 if DEBUG: self.logger.info(elist)
4664 if DEBUG: self.logger.info("DONE WITH PUT ENTITY")
4665- if client:
4666+ if client and not isTrans:
4667+ cursor.close()
4668+ client.commit()
4669 client.close()
4670 return elist
4671
4672 def __table_exist(self, table_name):
4673 table_name = "x" + table_name
4674- client = self.__get_connection()
4675+ client = MySQLdb.connect(host=DB_LOCATION, db=USE_DATABASE)
4676 cursor = client.cursor()
4677 # cursor.execute(USE_DATABASE)
4678 cursor.execute('show tables')
4679@@ -211,18 +260,30 @@
4680 if row == None:
4681 break;
4682 if table_name == row[0]:
4683+ client.close()
4684 return 1
4685+
4686+ cursor.close()
4687+ client.commit()
4688+ client.close()
4689 return 0
4690
4691- def delete_row(self, table_name, row_key):
4692+ def delete_row(self, table_name, row_key, txnid = 0):
4693 table_name = "x" + table_name
4694 if DEBUG: self.logger.info("DELETE ROW")
4695 client = None
4696+
4697+ isTrans = False
4698+ if txnid != 0 and self.transactionsOn:
4699+ isTrans = True
4700 elist = [ERROR_MY]
4701 try:
4702- client = self.__get_connection()
4703- client.autocommit(1)
4704- cursor = client.cursor()
4705+ if txnid == 0 or not self.transactionsOn:
4706+ client = MySQLdb.connect(host=DB_LOCATION, db=USE_DATABASE)
4707+ #client.autocommit(0)
4708+ cursor = client.cursor()
4709+ else:
4710+ cursor, client = self.__get_connection(txnid)
4711 # cursor.execute(USE_DATABASE)
4712 row_key = MySQLdb.escape_string(row_key)
4713 query = "delete from " + table_name + " WHERE " + ROW_KEY + "= '" + row_key + "'"
4714@@ -232,7 +293,8 @@
4715 elist[0] = ERROR_MY + str(e.args[0]) + "--" + e.args[1]
4716 if DEBUG: self.logger.info(elist)
4717 if DEBUG: self.logger.info("DELETING ROW")
4718- if client:
4719+ if client and not isTrans:
4720+ client.commit()
4721 client.close()
4722 return elist
4723
4724@@ -242,8 +304,8 @@
4725 client = None
4726 elist = [ERROR_MY]
4727 try:
4728- client = self.__get_connection()
4729- client.autocommit(1)
4730+ client = MySQLdb.connect(host=DB_LOCATION, db=USE_DATABASE)
4731+ #client.autocommit(0)
4732 cursor = client.cursor()
4733 # cursor.execute(USE_DATABASE)
4734 query = "SELECT COUNT(*) FROM " + table_name
4735@@ -255,18 +317,22 @@
4736 if DEBUG: self.logger.info(elist)
4737 if DEBUG: self.logger.info("DONE WITH ROW COUNT")
4738 if client:
4739+ cursor.close()
4740 client.close()
4741 return elist
4742
4743- def get_table(self, table_name, column_names):
4744+ def get_table(self, table_name, column_names, txnid = 0):
4745 table_name = "x" + table_name
4746 if DEBUG: self.logger.info("GET TABLE")
4747 client = None
4748 elist = [ERROR_MY]
4749 try:
4750- client = self.__get_connection()
4751- client.autocommit(1)
4752- cursor = client.cursor()
4753+ if txnid == 0 or not self.transactionsOn:
4754+ client = MySQLdb.connect(host=DB_LOCATION, db=USE_DATABASE)
4755+ #client.autocommit(0)
4756+ cursor = client.cursor()
4757+ else:
4758+ cursor, client = self.__get_connection(txnid)
4759 # cursor.execute(USE_DATABASE)
4760 # Hacking on a letter to make sure all columns start with a letter
4761 columncopy = []
4762@@ -294,15 +360,15 @@
4763 if DEBUG: self.logger.info(str(elist))
4764 if DEBUG: self.logger.info("DONE GETTING TABLE")
4765 if client:
4766- client.close()
4767+ if not self.transactionsOn or txnid == 0:
4768+ cursor.close()
4769+ client.close()
4770 return elist
4771
4772 def __query_table(self, table_name):
4773 table_name = "x" + table_name
4774 client = self.__get_connection()
4775- client.autocommit(1)
4776 cursor = client.cursor()
4777-# cursor.execute(USE_DATABASE)
4778 cursor.execute("select * from " + table_name )
4779 elist = []
4780 while (1):
4781@@ -310,9 +376,63 @@
4782 if row == None:
4783 break
4784 elist.append(row)
4785+ if cursor:
4786+ cursor.close()
4787 if client:
4788+ client.commit()
4789 client.close()
4790 return elist
4791
4792- def __get_connection(self):
4793- return MySQLdb.connect(host=DB_LOCATION, db=USE_DATABASE)
4794+ def __get_connection(self, txnid):
4795+ client = None
4796+ cursor = None
4797+ self.__gc()
4798+
4799+ transDict_lock.acquire()
4800+ if txnid in transDict:
4801+ cursor, client, start_time = transDict[txnid]
4802+ transDict_lock.release()
4803+ if not client:
4804+ raise MySQLdb.Error(1, "Connection timed out")
4805+ return cursor, client
4806+
4807+ # clean up expired connections
4808+ def __gc(self):
4809+ global last_gc_time
4810+ curtime = time.time()
4811+ if curtime < last_gc_time + GC_TIME:
4812+ return
4813+ transDict_lock.acquire()
4814+ del_list = []
4815+ for ii in transDict:
4816+ cu, cl, st = transDict[ii]
4817+ if st + TIMEOUT < curtime:
4818+ del_list.append(ii)
4819+ # safe deletes
4820+ del_list.reverse()
4821+ for ii in del_list:
4822+ del transDict[ii]
4823+ transDict_lock.release()
4824+ last_gc_time = time.time()
4825+
4826+ def setupTransaction(self, txnid):
4827+ self.transactionsOn = True
4828+ # New connection
4829+ client = MySQLdb.connect(host=DB_LOCATION, db=USE_DATABASE)
4830+ #client.autocommit(0)
4831+ cursor = client.cursor()
4832+ transDict_lock.acquire()
4833+ transDict[txnid] = cursor, client, time.time()
4834+ transDict_lock.release()
4835+
4836+ def __close_connection(self, txnid):
4837+ transDict_lock.acquire()
4838+ if txnid in transDict:
4839+ cursor, client, start_time = transDict[txnid]
4840+ cursor.close()
4841+ client.close()
4842+ del transDict[txnid]
4843+ transDict_lock.release()
4844+ return
4845+
4846+
4847
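
[Note: the block above caches one dedicated MySQL connection per transaction in `transDict`, guarded by a lock, and `__gc()` evicts entries older than `TIMEOUT` so abandoned transactions do not leak connections. The pattern can be sketched without MySQL at all; names (`trans_dict`, `get_connection`) and the 30-second timeout are illustrative, and a `KeyError` stands in for the `MySQLdb.Error` the diff raises:

```python
import threading
import time

TIMEOUT = 30.0          # seconds a transaction may sit idle (assumed value)
trans_dict = {}         # txnid -> (connection, start_time)
trans_lock = threading.Lock()

def setup_transaction(txnid, connection):
    """Register a dedicated connection for a transaction, as setupTransaction does."""
    with trans_lock:
        trans_dict[txnid] = (connection, time.time())

def get_connection(txnid, now=None):
    """Return the cached connection, first evicting any that have timed out."""
    now = time.time() if now is None else now
    with trans_lock:
        # Inline equivalent of __gc(): drop expired entries under the lock.
        for tid in [t for t, (_, st) in list(trans_dict.items()) if st + TIMEOUT < now]:
            del trans_dict[tid]
        entry = trans_dict.get(txnid)
    if entry is None:
        raise KeyError("Connection timed out")  # mirrors MySQLdb.Error(1, ...) in the diff
    return entry[0]

setup_transaction(42, "fake-connection")
print(get_connection(42))                          # fresh entry is returned
try:
    get_connection(42, now=time.time() + TIMEOUT + 1)
except KeyError:
    print("evicted")                               # expired entry was garbage-collected
```

One behavioral consequence worth noting: because eviction simply drops the entry, a caller holding a stale `txnid` gets an error on its next operation rather than silently reusing a closed connection.]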
4848=== added file 'AppDB/mysql/test_mysql_trans.py'
4849--- AppDB/mysql/test_mysql_trans.py 1970-01-01 00:00:00 +0000
4850+++ AppDB/mysql/test_mysql_trans.py 2010-06-29 01:49:23 +0000
4851@@ -0,0 +1,115 @@
4852+import py_mysql
4853+import random
4854+# overwrite the module name: py_mysql now refers to a DatastoreProxy instance
4855+py_mysql = py_mysql.DatastoreProxy()
4856+columns = ["a","b","c"]
4857+data = ["1","2","3"]
4858+invalid_data = ['y','y','y']
4859+table_name = "hello"
4860+key = "1"
4861+print "key= " + key
4862+print "columns= " + str(columns)
4863+print "data= " + str(data)
4864+print "table= " + table_name
4865+txn = random.randint(0,100000000)
4866+py_mysql.setupTransaction(txn)
4867+print "Test: Transaction number:",txn
4868+py_mysql.put_entity(table_name, key, columns, invalid_data, txn)
4869+ret = py_mysql.get_entity(table_name, key, columns, txn)
4870+print "Test: Invalid:"
4871+print ret
4872+ret = py_mysql.get_entity(table_name, key, columns)
4873+print "Test: Outside transaction:"
4874+print ret
4875+print py_mysql.put_entity(table_name, key, columns, data, txn)
4876+print "Test: GET"
4877+ret = py_mysql.get_entity(table_name, key, columns, txn)
4878+print "Test: Valid:"
4879+print ret
4880+print "Test: Committing:"
4881+#print py_mysql.commit(txn)
4882+print ret
4883+if ret[1:] != data:
4884+ print "ERROR doing a put then get. Data does not match"
4885+ print "returned: " + str(ret)
4886+ print "expected: " + str(data)
4887+ exit(1)
4888+else:
4889+ print "Success"
4890+py_mysql.commit(txn)
4891+print "After committed transaction:"
4892+ret = py_mysql.get_entity(table_name, key, columns)
4893+print ret
4894+txn = random.randint(0,100000000)
4895+py_mysql.setupTransaction(txn)
4896+txn2 = random.randint(0,11000000000)
4897+py_mysql.setupTransaction(txn2)
4898+print "Transaction number:",txn
4899+print "PUT:"
4900+print py_mysql.put_entity(table_name, key, columns, invalid_data, txn)
4901+print "outside transaction:"
4902+ret = py_mysql.get_entity(table_name, key, columns,txn2)
4903+print ret
4904+print "inside transaction:"
4905+ret = py_mysql.get_entity(table_name, key, columns, txn)
4906+print ret
4907+print py_mysql.put_entity(table_name, key, columns, invalid_data, txn)
4908+print "Rollback:"
4909+print py_mysql.rollback(txn)
4910+print "doing a put, rollback, then get"
4911+print "GET"
4912+ret = py_mysql.get_entity(table_name, key, columns)
4913+print "doing a put then get"
4914+print ret
4915+if ret[1:] != data:
4916+ print "*" * 60
4917+ print "FAILURE doing a put then get. Data does not match"
4918+ print "returned: " + str(ret)
4919+ print "expected: " + str(data)
4920+ print "*" * 60
4921+ exit(1)
4922+else:
4923+ print "Success"
4924+
4925+ret = py_mysql.get_schema("hello")
4926+print ret
4927+print "checking schema:"
4928+print ret
4929+if ret[1:] != columns:
4930+ print "ERROR in received schema"
4931+ print "returned: " + str(ret)
4932+ print "expected: " + str(columns)
4933+
4934+#ret = py_mysql.__table_exist(table_name)
4935+#print "Does table we just created exist?"
4936+#print ret
4937+
4938+ret = py_mysql.delete_row(table_name, key)
4939+print "Deleting the key %s"%key
4940+print ret
4941+
4942+ret = py_mysql.get_entity(table_name, key, columns)
4943+print "Trying to get deleted key:"
4944+print ret
4945+print "doing a put with key %s"%key
4946+print py_mysql.put_entity("hello", "1", ["a","b","c"], ["1","2","3"])
4947+print "doing a get table"
4948+print py_mysql.get_table("hello", ["a","b","c"])
4949+py_mysql.put_entity("hello", "2", ["a","b","c"], ["4","5","6"])
4950+print "doing get table:"
4951+print py_mysql.get_table("hello", ["a","b","c"])
4952+py_mysql.put_entity("hello", "3", ["a","b","c"], ["1","2","3"])
4953+py_mysql.get_table("hello", ["a","b","c"])
4954+
4955+print "TRYING TO REPLACE KEY 3"
4956+py_mysql.put_entity("hello", "3", ["a","b","c"], ["1","2","3"])
4957+py_mysql.get_table("hello", ["a","b","c"])
4958+py_mysql.get_row_count("hello")
4959+ret = py_mysql.delete_row("hello", "1")
4960+ret = py_mysql.delete_row("hello", "2")
4961+ret = py_mysql.delete_row("hello", "3")
4962+py_mysql.get_table("hello", ["a","b","c"])
4963+print "Deleting table:"
4964+print py_mysql.delete_table("hello")
4965+print "deleting twice:"
4966+print py_mysql.delete_table("hello")
4967
4968=== modified file 'AppDB/soap_server.py'
4969--- AppDB/soap_server.py 2010-03-16 21:22:28 +0000
4970+++ AppDB/soap_server.py 2010-06-29 01:49:23 +0000
4971@@ -34,7 +34,7 @@
4972 DEFAULT_APP_LOCATION = ".flatfile_apps"
4973 DEFAULT_DATASTORE = "hbase"
4974 DEFAULT_SSL_PORT = 4343
4975-DEFAULT_PORT = 8080
4976+DEFAULT_PORT = 9899
4977 IP_TABLE = "IPS___"
4978 DEFAULT_ENCRYPTION = 1
4979 VALID_DATASTORES = []
4980
4981=== modified file 'AppDB/voldemort/py_voldemort.py'
4982--- AppDB/voldemort/py_voldemort.py 2010-05-03 21:02:23 +0000
4983+++ AppDB/voldemort/py_voldemort.py 2010-06-29 01:49:23 +0000
4984@@ -47,12 +47,14 @@
4985 def get(self, key):
4986 done = False
4987 timeout = RETRY_TIMEOUT
4988- while (done != True):
4989+ while (done != True and timeout > 0):
4990 try:
4991 value = self.real_get(key)
4992 done = True
4993 except:
4994 timeout -= 1
4995+ if timeout <= 0:
4996+ raise
4997 return value
4998
4999 def real_get(self, key):
5000@@ -88,6 +90,8 @@
The diff has been truncated for viewing.
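
[Note: the `py_voldemort.py` hunk turns an unbounded retry loop into a bounded one: the loop now also checks `timeout > 0`, and when the attempt budget is exhausted the last exception is re-raised instead of falling through and returning an undefined `value`. A minimal sketch of the same pattern; `get_with_retry` and the flaky callable are illustrative, with `RETRY_TIMEOUT` matching the attempt-count constant named in the diff:

```python
RETRY_TIMEOUT = 3  # number of attempts, as in the diff's constant

def get_with_retry(fetch, attempts=RETRY_TIMEOUT):
    """Call fetch() up to `attempts` times; re-raise the last error when exhausted."""
    for remaining in range(attempts, 0, -1):
        try:
            return fetch()
        except Exception:
            if remaining == 1:   # budget spent: propagate, as the patched get() does
                raise

calls = {"n": 0}
def flaky():
    # Fails twice, then succeeds, simulating a transient Voldemort error.
    calls["n"] += 1
    if calls["n"] < 3:
        raise IOError("transient")
    return "value"

print(get_with_retry(flaky))  # succeeds on the third attempt: prints value
```

Re-raising inside the `except` block preserves the original traceback, which is why the patch uses a bare `raise` rather than constructing a new exception.]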
