Merge lp:~jderose/dmedia/peer-sync into lp:dmedia

Proposed by Jason Gerard DeRose
Status: Merged
Approved by: James Raymond
Approved revision: 373
Merged at revision: 367
Proposed branch: lp:~jderose/dmedia/peer-sync
Merge into: lp:dmedia
Diff against target: 265 lines (+115/-38)
3 files modified
dmedia-service (+22/-1)
dmedia/service/replicator.py (+68/-37)
init-library (+25/-0)
To merge this branch: bzr merge lp:~jderose/dmedia/peer-sync
Reviewer: James Raymond — Review type: (unspecified) — Status: Approve
Review via email: mp+104007@code.launchpad.net

Description of the change

When a ~/.local/share/dmedia/library.json file is present, UserCouch is configured to listen on 0.0.0.0 and use OAuth, and we advertise this over Avahi.

As noted in the bug report, this feature provides NO PRIVACY WHATSOEVER.

To post a comment you must log in.
lp:~jderose/dmedia/peer-sync updated
373. By Jason Gerard DeRose

Removed a debugging print

Revision history for this message
James Raymond (jamesmr) :
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'dmedia-service'
2--- dmedia-service 2012-04-23 18:30:42 +0000
3+++ dmedia-service 2012-04-28 20:46:19 +0000
4@@ -31,6 +31,7 @@
5 import optparse
6 import logging
7 import json
8+from os import path
9
10 import dbus
11 import dbus.service
12@@ -43,6 +44,7 @@
13 from dmedia.core import Core, start_file_server
14 from dmedia.service.udisks import UDisks
15 from dmedia.service.avahi import Avahi
16+from dmedia.service.replicator import Replicator
17
18
19 BUS = dmedia.BUS
20@@ -56,12 +58,20 @@
21
22 def dumps(obj):
23 return json.dumps(obj, sort_keys=True, separators=(',', ': '), indent=4)
24+
25+
26+def get_config(f):
27+ try:
28+ return json.load(open(f, 'r'))
29+ except Exception:
30+ pass
31
32
33 class Service(dbus.service.Object):
34 usercouch = None
35 httpd = None
36 avahi = None
37+ replicator = None
38
39 def __init__(self, bus, env_s):
40 self.bus = bus
41@@ -74,8 +84,14 @@
42 def start_core(self):
43 if self.env_s is None:
44 self.usercouch = usercouch.UserCouch(dmedia.get_dmedia_dir())
45- env = self.usercouch.bootstrap()
46+ f = path.join(self.usercouch.basedir, 'library.json')
47+ self.config = get_config(f)
48+ if self.config is None:
49+ env = self.usercouch.bootstrap()
50+ else:
51+ env = self.usercouch.bootstrap2(self.config['tokens'])
52 else:
53+ self.config = None
54 env = json.loads(self.env_s)
55 self.udisks = UDisks()
56 self.core = Core(env, self.udisks.get_parentdir_info)
57@@ -91,6 +107,9 @@
58 (self.httpd, self.port) = start_file_server(self.core.env)
59 self.avahi = Avahi(self.core.env, self.port)
60 self.avahi.run()
61+ if self.config is not None:
62+ self.replicator = Replicator(self.core.env, self.config)
63+ self.replicator.run()
64
65 def run(self):
66 self.start_core()
67@@ -119,6 +138,8 @@
68 self.usercouch.kill()
69 if self.avahi is not None:
70 self.avahi.free()
71+ if self.replicator is not None:
72+ self.replicator.free()
73 if self.httpd is not None:
74 self.httpd.terminate()
75 self.httpd.join()
76
77=== modified file 'dmedia/service/replicator.py'
78--- dmedia/service/replicator.py 2012-04-24 11:22:28 +0000
79+++ dmedia/service/replicator.py 2012-04-28 20:46:19 +0000
80@@ -25,6 +25,7 @@
81
82 import logging
83 import json
84+from collections import namedtuple
85
86 from microfiber import Server, Database, PreconditionFailed
87 import dbus
88@@ -32,16 +33,19 @@
89
90 log = logging.getLogger()
91 system = dbus.SystemBus()
92-
93+Peer = namedtuple('Peer', 'url names')
94 SERVICE = '_usercouch._tcp'
95
96
97-def get_body(source, target):
98- return {
99+def get_body(source, target, cancel=False):
100+ body = {
101 'source': source,
102 'target': target,
103 'continuous': True,
104 }
105+ if cancel:
106+ body['cancel'] = True
107+ return body
108
109
110 def get_peer(url, dbname, tokens):
111@@ -56,11 +60,10 @@
112 class Replicator:
113 def __init__(self, env, config):
114 self.group = None
115+ self.env = env
116 self.server = Server(env)
117 self.library_id = config['library_id']
118 self.base_id = self.library_id + '-'
119- self.machine_id = env['machine_id']
120- self.id = self.base_id + self.machine_id
121 self.port = env['port']
122 self.tokens = config['tokens']
123 self.peers = {}
124@@ -69,6 +72,8 @@
125 self.free()
126
127 def run(self):
128+ self.machine_id = self.env['machine_id']
129+ self.id = self.base_id + self.machine_id
130 self.avahi = system.get_object('org.freedesktop.Avahi', '/')
131 self.group = system.get_object(
132 'org.freedesktop.Avahi',
133@@ -106,44 +111,70 @@
134 if self.group is not None:
135 self.group.Reset(dbus_interface='org.freedesktop.Avahi.EntryGroup')
136
137- def on_ItemNew(self, interface, protocol, name, _type, domain, flags):
138- if name == self.id: # Ignore what we publish ourselves
139+ def on_ItemNew(self, interface, protocol, key, _type, domain, flags):
140+ if key == self.id: # Ignore what we publish ourselves
141 return
142- if not name.startswith(self.base_id): # Ignore other libraries
143+ if not key.startswith(self.base_id): # Ignore other libraries
144 return
145 (ip, port) = self.avahi.ResolveService(
146- interface, protocol, name, _type, domain, -1, 0,
147+ interface, protocol, key, _type, domain, -1, 0,
148 dbus_interface='org.freedesktop.Avahi.Server'
149 )[7:9]
150 url = 'http://{}:{}/'.format(ip, port)
151- log.info('Replicator: new peer %s at %s', name, url)
152- self.peers[name] = url
153- self.replicate(url, 'foo')
154-
155- def on_ItemRemove(self, interface, protocol, name, _type, domain, flags):
156- log.info('Replicator: removing peer %s', name)
157- try:
158- del self.peers[name]
159- except KeyError:
160- pass
161-
162- def replicate(self, url, dbname):
163- # Create local DB if needed
164- try:
165- self.server.put(None, dbname)
166- except PreconditionFailed:
167- pass
168-
169- # Create remote DB if needed
170- env = {'url': url, 'oauth': self.tokens}
171- db = Database(dbname, env)
172- db.ensure()
173-
174- peer = get_peer(url, dbname, self.tokens)
175- local_to_remote = get_body(dbname, peer)
176- remote_to_local = get_body(peer, dbname)
177- for obj in (local_to_remote, remote_to_local):
178- self.server.post(obj, '_replicate')
179+ log.info('Replicator: new peer %s at %s', key, url)
180+ self.cancel_all(key)
181+ self.peers[key] = Peer(url, [])
182+ self.replicate_all(key)
183+
184+ def on_ItemRemove(self, interface, protocol, key, _type, domain, flags):
185+ log.info('Replicator: peer removed %s', key)
186+ self.cancel_all(str(key))
187+
188+ def cancel_all(self, key):
189+ p = self.peers.pop(key, None)
190+ if p is None:
191+ return
192+ log.info('Canceling replications for %r', key)
193+ for name in p.names:
194+ self.replicate(p.url, name, cancel=True)
195+
196+ def replicate_all(self, key):
197+ p = self.peers[key]
198+ env = {'url': p.url, 'oauth': self.tokens}
199+ remote = Server(env)
200+ for name in self.server.get('_all_dbs'):
201+ if name.startswith('_'):
202+ continue
203+ if not (name.startswith('dmedia-0') or name.startswith('novacut-0')):
204+ continue
205+ # Create remote DB if needed
206+ try:
207+ remote.put(None, name)
208+ except PreconditionFailed:
209+ pass
210+
211+ # Start replication
212+ p.names.append(name)
213+ self.replicate(p.url, name)
214+
215+ def replicate(self, url, name, cancel=False):
216+ """
217+ Start or cancel push replication of database *name* to peer at *url*.
218+
219+ Security note: we only do push replication because pull replication
220+ would allow unauthorized peers to write to our databases via their
221+ changes feed. For both push and pull, there is currently no privacy
222+ whatsoever... everything is in cleartext and uses oauth 1.0a. But push
223+ replication is the only way to at least prevent malicious data
224+ corruption.
225+ """
226+ if cancel:
227+ log.info('Canceling push of %r to %r', name, url)
228+ else:
229+ log.info('Starting push of %r to %r', name, url)
230+ peer = get_peer(url, name, self.tokens)
231+ push = get_body(name, peer, cancel)
232+ self.server.post(push, '_replicate')
233
234
235
236
237=== added file 'init-library'
238--- init-library 1970-01-01 00:00:00 +0000
239+++ init-library 2012-04-28 20:46:19 +0000
240@@ -0,0 +1,25 @@
241+#!/usr/bin/python3
242+
243+import json
244+from os import path
245+
246+from microfiber import random_id
247+from usercouch import random_oauth
248+
249+import dmedia
250+
251+
252+def dumps(obj):
253+ return json.dumps(obj, sort_keys=True, separators=(',', ': '), indent=4)
254+
255+
256+obj = {
257+ 'library_id': random_id(),
258+ 'tokens': random_oauth(),
259+}
260+obj_s = dumps(obj)
261+
262+f = path.join(dmedia.get_dmedia_dir(), 'library.json')
263+open(f, 'w').write(obj_s)
264+print(obj_s)
265+

Subscribers

People subscribed via source and target branches