Merge lp:~cmiller/desktopcouch/replicate-to-u1 into lp:desktopcouch

Proposed by Chad Miller
Status: Merged
Approved by: Chad Miller
Approved revision: 60
Merged at revision: not available
Proposed branch: lp:~cmiller/desktopcouch/replicate-to-u1
Merge into: lp:desktopcouch
Diff against target: None lines
To merge this branch: bzr merge lp:~cmiller/desktopcouch/replicate-to-u1
Reviewer                      Review Type  Date Requested  Status
Stuart Langridge (community)                               Approve
Chad Miller (community)                                    Abstain
dobey (community)                                          Needs Resubmitting
Joshua Blount (community)                                  Approve
Review via email: mp+11209@code.launchpad.net

Commit message

Add the ability to un-pair machines. For non-cloud machines, un-pairing adds a new attribute to the record, which the replicator reconciles with the other end the next time it sees that machine.

Make the replication manager part of the desktopcouch-service program.

Make services a module with a standard interface so that anyone can add more later. This may need more work: a single instance is not enough to make a good API.

Joshua Blount (jblount) wrote :

Looks handsome

review: Approve
dobey (dobey) wrote :

Josh says it's handsome, so it must be OK.

review: Approve
Chad Miller (cmiller) wrote :

I'm restructuring at Rodney's request.

review: Needs Fixing
dobey (dobey) :
review: Needs Resubmitting
Chad Miller (cmiller) wrote :

Now incorporates aquarius' approved branch.

Chad Miller (cmiller) :
review: Abstain
Stuart Langridge (sil) wrote :

Looks good, I think.

review: Approve
Chad Miller (cmiller) wrote :

desktopcouch.contacts.tests.test_contactspicker
  TestContactsPicker
    test_can_contruct_contactspicker ... Apache CouchDB has started, time to relax.
Desktop CouchDB is not running; starting it. ...waiting for couchdb to start...
...waiting for couchdb to start...
Browse your desktop CouchDB at file:///tmp/tmpVMvmKA/xdg_data/desktop-couch/couchdb.html
                                  [OK]
desktopcouch.contacts.tests.test_create
  TestCreate
    test_create_many_contacts ... [OK]
    test_head_or_tails ... [OK]
    test_random_bools ... [OK]
desktopcouch.contacts.tests.test_record
  TestContactRecord
    test_contact_record ... [OK]
                                            [ERROR]
desktopcouch.pair.tests.test_network_io
  TestNetworkIO
    test_successful_lifespan ... [OK]
desktopcouch.records.tests.test_couchgrid
  TestCouchGrid
    test_all_from_database ... [OK]
    test_constructor_guarded ... [OK]
    test_new_rows_with_headings ... [OK]
    test_no_headings_or_stored_records ... [OK]
    test_optional_args_no_stored_records ... [OK]
    test_optional_record_type_arg ... [OK]
    test_programatically_add_row ... [OK]
    test_single_col_from_database ... [OK]
desktopcouch.records.tests.test_field_registry
  TestFieldMapping
    test_mergeable_list_field_mapping ... [OK]
    test_mergeable_list_field_mapping_empty_field ... [OK]
    test_simple_field_mapping ... [OK]
  TestTransformer
    test_from_app ... [OK]
    test_to_app ... [OK]
desktopcouch.records.tests.test_record
  TestRecordFactory
    test_build ... [OK]
  TestRecords
    test_application_annotations ... [OK]
    test_dictionary_access_to_mergeable_list ... [OK]
    test_get ... [OK]
    test_get_item ... [OK]
    test_keys ... [OK]
    test_list ... [OK]
    test_loads_dict_multi_subdict ... [OK]
    test_loads_dict_subdict ... [OK]
    test_mergeable_list_append ... [OK]
    test_mergeable_list_append_record_dict ... [OK]
    test_mergeable_list_del ... ...

Preview Diff

=== modified file 'bin/desktopcouch-pair'
--- bin/desktopcouch-pair 2009-08-26 18:01:29 +0000
+++ bin/desktopcouch-pair 2009-09-03 21:26:51 +0000
@@ -467,7 +467,7 @@
             "consumer_secret": "",
             })
         listening_hosts.append(None, ["Ubuntu One", "The Ubuntu One cloud service",
-                "ubuntuone.com", 5984, True])
+                "couchdb.one.ubuntu.com", 5984, True])
         CLOUD_SERVICES["Ubuntu One"] = oauth_data

         self.inviting = None # pylint: disable-msg=W0201

=== modified file 'bin/desktopcouch-paired-replication-manager'
--- bin/desktopcouch-paired-replication-manager 2009-08-26 19:04:39 +0000
+++ bin/desktopcouch-paired-replication-manager 2009-09-03 21:26:51 +0000
@@ -37,6 +37,21 @@

 already_replicating = False

+
+def db_prefix_for_statically_addressed_replicators(addr, port):
+    addrport = addr + "_" + str(port)
+    module_name = addrport.replace(".", "_")
+    try:
+        logging.debug("Looking up prefix %r in mod %s", addrport, module_name)
+        mod = __import__("desktopcouch.replication_hosts", fromlist=[module_name])
+        return getattr(mod, module_name).db_name_prefix
+    except ImportError, e:
+        logging.info("Not changing remote db name. %s", e)
+        return ""
+    except Exception, e:
+        logging.exception("Not changing remote db name.")
+        return ""
+
 class ReplicatorThread(threading.Thread):
     def __init__(self, local_port):
         log.debug("starting up replication thread")
@@ -48,15 +63,34 @@
         already_replicating = True # just trying to be polite.
         try:
             for uuid, addr, port in dbus_io.get_seen_paired_hosts():
-                log.debug("host %s is seen; want to replicate to it", uuid)
+                log.debug("want to replipush to discovered host %r @ %s",
+                        uuid, addr)
                 for db_name in couchdb_io.get_database_names_replicatable():
                     couchdb_io.replicate(db_name, db_name,
                             target_host=addr, target_port=port,
                             source_port=self.local_port)
-
-
-            # TODO: get static addressed paired hosts and replicate to
-            # those too.
+
+            for uuid, addr, port, to_pull, to_push in \
+                    couchdb_io.get_static_paired_hosts():
+
+                if to_pull:
+                    remote_db_prefix = db_prefix_for_statically_addressed_replicators(addr, port)
+                    for db_name in couchdb_io.get_database_names_replicatable():
+                        remote_db_name = str(remote_db_prefix)+db_name
+                        log.debug("want to replipush %r to static host %r @ %s",
+                                remote_db_name, uuid, addr)
+                        couchdb_io.replicate(db_name, remote_db_name,
+                                target_host=addr, target_port=port,
+                                source_port=self.local_port, target_ssl=True)
+                if to_push:
+                    for db_name in couchdb_io.get_database_names_replicatable(addr,
+                            port):
+                        remote_db_name = str(remote_db_prefix)+db_name
+                        log.debug("want to replipull %r from static host %r @ %s",
+                                remote_db_name, uuid, addr)
+                        couchdb_io.replicate(remote_db_name, db_name,
+                                source_host=addr, source_port=port,
+                                target_port=self.local_port, source_ssl=True)

         finally:
             already_replicating = False
@@ -70,6 +104,7 @@

     r = ReplicatorThread(local_port)
     r.start()
+    return r

 def main(args):
     log_directory = os.path.join(xdg.BaseDirectory.xdg_cache_home,
@@ -87,6 +122,8 @@
     logging.getLogger('').addHandler(rotating_log)
     logging.getLogger('').setLevel(logging.DEBUG)

+    print "Logging to", os.path.join(log_directory, "desktop-couch-replication.log")
+
     try:
         log.info("Starting.")

@@ -117,6 +154,10 @@
             b.unpublish()

     finally:
+        try:
+            t.stop()
+        except:
+            pass
         log.info("Quitting.")

 if __name__ == "__main__":

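The prefix lookup in the hunk above derives a module name from the host and port, imports that module, and reads its db_name_prefix attribute, falling back to an empty prefix on failure. A minimal Python 3 sketch of the same idea follows. It assumes a placeholder package name, hypothetical_replication_hosts, which is not part of desktopcouch, and it uses importlib.import_module instead of the __import__ call shown in the diff.

```python
import importlib
import logging


def module_name_for(addr, port):
    """Mirror the diff's naming rule: join host and port, then replace dots."""
    return (addr + "_" + str(port)).replace(".", "_")


def db_prefix_for(addr, port, package="hypothetical_replication_hosts"):
    """Return the host-specific database prefix, or "" when none is defined.

    `package` is an illustrative placeholder, not a real desktopcouch name.
    """
    name = module_name_for(addr, port)
    try:
        mod = importlib.import_module(package + "." + name)
        return str(mod.db_name_prefix)
    except ImportError as e:
        # No per-host module: keep the remote database name unchanged.
        logging.info("Not changing remote db name. %s", e)
    except Exception:
        # Module exists but has no usable prefix: also fall back to "".
        logging.exception("Not changing remote db name.")
    return ""


if __name__ == "__main__":
    print(module_name_for("couchdb.one.ubuntu.com", 5984))
    print(repr(db_prefix_for("couchdb.one.ubuntu.com", 5984)))
```

Under this naming rule the pair ("couchdb.one.ubuntu.com", 5984) maps to the name couchdb_one_ubuntu_com_5984, since the port is appended before the dots are replaced.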
=== modified file 'desktopcouch/pair/couchdb_pairing/couchdb_io.py'
--- desktopcouch/pair/couchdb_pairing/couchdb_io.py 2009-08-26 19:04:39 +0000
+++ desktopcouch/pair/couchdb_pairing/couchdb_io.py 2009-09-03 21:26:51 +0000
@@ -21,6 +21,7 @@

 from desktopcouch import find_port as desktopcouch_find_port
 from desktopcouch.records import server
+import socket

 RECTYPE_BASE = "http://www.freedesktop.org/wiki/Specifications/desktopcouch/"
 PAIRED_SERVER_RECORD_TYPE = RECTYPE_BASE + "paired_server"
@@ -30,13 +31,40 @@
     port = desktopcouch_find_port() # make sure d-c is running.
     return server.CouchDatabase(name, create=create)

-def get_database_names_replicatable():
+def get_static_paired_hosts():
+    db = _get_db("management")
+    results = db.get_records(create_view=True)
+    found = dict()
+    for row in results[PAIRED_SERVER_RECORD_TYPE]:
+        try:
+            if row.value["server"] != "":
+                uuid = row.value["pairing_identifier"]
+                to_push = row.value.get("push_to_server", True)
+                to_pull = row.value.get("pull_from_server", False)
+                addr, port = row.value["server"].split(":") # What about IPv6?
+                found[(addr, int(port))] = uuid, to_pull, to_push
+        except KeyError:
+            pass
+    unique_hosts = [(v1, h1, h2, v2, v3) for
+            (h1, h2), (v1, v2, v3) in found.items()]
+    logging.debug("static pairings are %s", unique_hosts)
+    return unique_hosts
+
+def get_database_names_replicatable(host=None, port=None):
     """Find a list of local databases, minus dbs that we do not want to
     replicate (explicitly or implicitly)."""

-    port = int(desktopcouch_find_port())
-    couchdb_server = server.Server("http://localhost:%(port)d/" % locals())
-    all = set([db_name for db_name in couchdb_server])
+    if host is None:
+        host = "localhost"
+    if port is None:
+        port = int(desktopcouch_find_port())
+
+    try:
+        couchdb_server = server.Server("http://%(host)s:%(port)d/" % locals())
+        all = set([db_name for db_name in couchdb_server])
+    except socket.error, e:
+        logging.error("Can't get list of databases from %s", couchdb_server)
+        return set()

     excluded = set()
     excluded.add("management")
@@ -82,51 +110,52 @@
     logging.debug("found %d %s records", len(values), key)
     return values

-def create_remote_database(dst_host, dst_port, dst_name):
+def create_database(dst_host, dst_port, dst_name):
     dst_url = u"http://%(dst_host)s:%(dst_port)d/" % locals()
     return server.CouchDatabase(dst_name, dst_url, create=True)

 def replicate(source_database, target_database, target_host=None,
-        target_port=None, source_host=None, source_port=None):
+        target_port=None, source_host=None, source_port=None,
+        source_ssl=False, target_ssl=False):
     """This replication is instant and blocking, and does not persist. """

-    data = {}
-
+    source_protocol = "https" if source_ssl else "http"
+    target_protocol = "https" if target_ssl else "http"
     if source_host:
         if source_port is None:
-            source = "http://%(source_host)s/%(source_database)s" % locals()
+            source = "%(source_protocol)s://%(source_host)s/%(source_database)s" % locals()
         else:
-            source = "http://%(source_host)s:%(source_port)d/%(source_database)s" % locals()
+            source = "%(source_protocol)s://%(source_host)s:%(source_port)d/%(source_database)s" % locals()
     else:
         source = source_database

     if target_host:
         if target_port is None:
-            target = "http://%(target_host)s/%(target_database)s" % locals()
+            target = "%(target_protocol)s://%(target_host)s/%(target_database)s" % locals()
         else:
-            target = "http://%(target_host)s:%(target_port)d/%(target_database)s" % locals()
+            target = "%(target_protocol)s://%(target_host)s:%(target_port)d/%(target_database)s" % locals()
     else:
         target = target_database

     record = dict(source=source, target=target)
     try:
-        if target_host:
-            # Remote databases must exist before replicating to them.
-            create_remote_database(target_host, target_port, target_database)
-
+        url = None # so logging works in exception handler
+        port = int(desktopcouch_find_port())
         # TODO: Get admin username and password from keyring. Populate URL.
-
-        url = None # so logging works in exception handler
-        port = int(desktopcouch_find_port())
         url = "http://localhost:%d/" % (port,)

+        if target_host:
+            # Target databases must exist before replicating to them.
+            logging.debug("creating %r %s:%d", target_database, target_host, target_port)
+            create_database(target_host, target_port, target_database)
+
+
         ### All until python-couchdb gets a Server.replicate() function
         import couchdb
+        logging.debug("asking %r to send %s to %s", url, source, target)
         server = couchdb.client.Server(url)
         resp, data = server.resource.post(path='/_replicate', content=record)
         logging.debug("replicate result: %r %r", resp, data)
         ###
-        logging.debug("#############################")
     except:
         logging.exception("can't talk to couchdb. %r <== %r", url, record)
-        raise

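The replicate() change above picks http or https per endpoint and leaves a bare database name when no host is given, so that side refers to the local server handling the request. A minimal Python 3 sketch of that endpoint construction follows. The helper names endpoint and replication_record, and the example.com host used in the demo, are illustrative assumptions and do not appear in desktopcouch.

```python
def endpoint(database, host=None, port=None, ssl=False):
    """Build one side of a replication request.

    Without a host, the bare database name is returned, which the diff
    treats as local to the server that receives the request.
    """
    if not host:
        return database
    scheme = "https" if ssl else "http"
    if port is None:
        return "%s://%s/%s" % (scheme, host, database)
    return "%s://%s:%d/%s" % (scheme, host, port, database)


def replication_record(source_database, target_database,
                       source_host=None, source_port=None, source_ssl=False,
                       target_host=None, target_port=None, target_ssl=False):
    """Assemble the source/target pair that the diff posts to /_replicate."""
    return {
        "source": endpoint(source_database, source_host, source_port, source_ssl),
        "target": endpoint(target_database, target_host, target_port, target_ssl),
    }


if __name__ == "__main__":
    # Hypothetical push of a local "contacts" database to a remote host.
    print(replication_record("contacts", "contacts",
                             target_host="example.com", target_port=5984,
                             target_ssl=True))
```

As in the diff, a port passed without a host has no effect on that side: the database name is used unchanged.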
=== added directory 'desktopcouch/replication_hosts'
=== added file 'desktopcouch/replication_hosts/__init__.py'
=== added file 'desktopcouch/replication_hosts/couchdb_one_ubuntu_com.py'
--- desktopcouch/replication_hosts/couchdb_one_ubuntu_com.py 1970-01-01 00:00:00 +0000
+++ desktopcouch/replication_hosts/couchdb_one_ubuntu_com.py 2009-09-03 21:26:51 +0000
@@ -0,0 +1,86 @@
+import hashlib
+from oauth import oauth
+import logging
+import httplib2
+import simplejson
+import gnomekeyring
+
+
+def whoami_user_id():
+    def get_oauth_token(consumer):
+        """Get the token from the keyring"""
+        import gobject
+        gobject.set_application_name("desktopcouch replication to Ubuntu One")
+        items = gnomekeyring.find_items_sync(
+            gnomekeyring.ITEM_GENERIC_SECRET,
+            {'ubuntuone-realm': "https://one.ubuntu.com",
+             'oauth-consumer-key': consumer.key})
+        if len(items):
+            return oauth.OAuthToken.from_string(items[0].secret)
+
+    def get_oauth_request_header(consumer, access_token, http_url):
+        """Get an oauth request header given the token and the url"""
+        signature_method = oauth.OAuthSignatureMethod_HMAC_SHA1()
+        oauth_request = oauth.OAuthRequest.from_consumer_and_token(
+            http_url=http_url,
+            http_method="GET",
+            oauth_consumer=consumer,
+            token=access_token)
+        oauth_request.sign_request(signature_method, consumer, access_token)
+        return oauth_request.to_header()
+
+    url = "https://one.ubuntu.com/api/account/"
+    consumer = oauth.OAuthConsumer("ubuntuone", "hammertime")
+    try:
+        access_token = get_oauth_token(consumer)
+    except gnomekeyring.NoKeyringDaemonError:
+        logging.info("No keyring daemon is running for this session.")
+        return None
+    if not access_token:
+        logging.info("Could not get access token from keyring")
+        return None
+    oauth_header = get_oauth_request_header(consumer, access_token, url)
+    client = httplib2.Http()
+    resp, content = client.request(url, "GET", headers=oauth_header)
+    if resp['status'] == "200":
+        try:
+            userinfo = simplejson.loads(content)
+            return userinfo["id"]
+        except:
+            logging.error("Couldn't find id in response %r", content)
+            return None
+    else:
+        logging.error("Couldn't talk to %r. Got HTTP %s", url, resp['status'])
+        return None
+
+
+class PrefixGetter():
+    def __init__(self):
+        self.str = None
+
+    def __str__(self):
+        if self.str is not None:
+            return self.str
+
+        user_id = whoami_user_id()
+        if user_id is None:
+            raise ValueError
+
+        hasher = hashlib.md5()
+        hasher.update(str(user_id))
+        hashed_id_as_hex = hasher.hexdigest()
+
+        prefix = "u/%s/%s/%d/" % (
+                hashed_id_as_hex[0:3],
+                hashed_id_as_hex[3:6],
+                user_id)
+        self.str = prefix
+        return prefix
+
+
+# Access to this as a string fires off functions.
+db_name_prefix = PrefixGetter()
+
+if __name__ == "__main__":
+    logging.basicConfig(level=logging.DEBUG, format="%(message)s")
+    print str(db_name_prefix)
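
PrefixGetter in the added file computes the prefix lazily: the first str() call fetches the user id, hashes it with MD5, and caches "u/<first three hex digits>/<next three>/<id>/". A minimal Python 3 sketch of that pattern follows, with the network lookup replaced by an injected callable. The names db_name_prefix_for, LazyPrefix, and fetch_user_id are illustrative assumptions, and the user id in the demo is made up.

```python
import hashlib


def db_name_prefix_for(user_id):
    """Build the sharded prefix from an integer user id, as the diff does."""
    digest = hashlib.md5(str(user_id).encode("ascii")).hexdigest()
    return "u/%s/%s/%d/" % (digest[0:3], digest[3:6], user_id)


class LazyPrefix:
    """Compute the prefix on first str() and reuse it afterwards."""

    def __init__(self, fetch_user_id):
        # fetch_user_id stands in for whoami_user_id(); it is an assumption
        # made here so the sketch runs without a keyring or network access.
        self._fetch_user_id = fetch_user_id
        self._cached = None

    def __str__(self):
        if self._cached is None:
            user_id = self._fetch_user_id()
            if user_id is None:
                raise ValueError("no user id available")
            self._cached = db_name_prefix_for(user_id)
        return self._cached


if __name__ == "__main__":
    prefix = LazyPrefix(lambda: 12345)  # made-up id for illustration
    print(str(prefix) + "contacts")
```

Because the lookup only happens inside __str__, importing a module that holds such an object stays cheap; the cost is paid the first time the prefix is actually needed.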
