Merge lp:~jderose/dmedia/peer-sync into lp:dmedia

Proposed by Jason Gerard DeRose
Status: Merged
Approved by: James Raymond
Approved revision: 373
Merged at revision: 367
Proposed branch: lp:~jderose/dmedia/peer-sync
Merge into: lp:dmedia
Diff against target: 265 lines (+115/-38)
3 files modified
dmedia-service (+22/-1)
dmedia/service/replicator.py (+68/-37)
init-library (+25/-0)
To merge this branch: bzr merge lp:~jderose/dmedia/peer-sync
Reviewer: James Raymond (status: Approve)
Review via email: mp+104007@code.launchpad.net

Description of the change

When a ~/.local/share/dmedia/library.json file is present, UserCouch is configured to listen on 0.0.0.0 and use OAuth, and we advertise this over Avahi.

As noted in the bug report, this feature provides NO PRIVACY WHATSOEVER.
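For context, library.json is written by the new init-library script added in this branch: it holds a library_id shared by every machine in the library, plus the OAuth 1.0a tokens that UserCouch is bootstrapped with. Roughly, the file looks like this (the token field names follow CouchDB's OAuth config and are an assumption here, since random_oauth() itself is not part of this diff; all values are placeholders):

    {
        "library_id": "...",
        "tokens": {
            "consumer_key": "...",
            "consumer_secret": "...",
            "token": "...",
            "token_secret": "..."
        }
    }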

lp:~jderose/dmedia/peer-sync updated
373. By Jason Gerard DeRose

Removed a debugging print

Revision history for this message
James Raymond (jamesmr):
review: Approve

Preview Diff

=== modified file 'dmedia-service'
--- dmedia-service 2012-04-23 18:30:42 +0000
+++ dmedia-service 2012-04-28 20:46:19 +0000
@@ -31,6 +31,7 @@
 import optparse
 import logging
 import json
+from os import path

 import dbus
 import dbus.service
@@ -43,6 +44,7 @@
 from dmedia.core import Core, start_file_server
 from dmedia.service.udisks import UDisks
 from dmedia.service.avahi import Avahi
+from dmedia.service.replicator import Replicator


 BUS = dmedia.BUS
@@ -56,12 +58,20 @@

 def dumps(obj):
     return json.dumps(obj, sort_keys=True, separators=(',', ': '), indent=4)
+
+
+def get_config(f):
+    try:
+        return json.load(open(f, 'r'))
+    except Exception:
+        pass


 class Service(dbus.service.Object):
     usercouch = None
     httpd = None
     avahi = None
+    replicator = None

     def __init__(self, bus, env_s):
         self.bus = bus
@@ -74,8 +84,14 @@
     def start_core(self):
         if self.env_s is None:
             self.usercouch = usercouch.UserCouch(dmedia.get_dmedia_dir())
-            env = self.usercouch.bootstrap()
+            f = path.join(self.usercouch.basedir, 'library.json')
+            self.config = get_config(f)
+            if self.config is None:
+                env = self.usercouch.bootstrap()
+            else:
+                env = self.usercouch.bootstrap2(self.config['tokens'])
         else:
+            self.config = None
             env = json.loads(self.env_s)
         self.udisks = UDisks()
         self.core = Core(env, self.udisks.get_parentdir_info)
@@ -91,6 +107,9 @@
         (self.httpd, self.port) = start_file_server(self.core.env)
         self.avahi = Avahi(self.core.env, self.port)
         self.avahi.run()
+        if self.config is not None:
+            self.replicator = Replicator(self.core.env, self.config)
+            self.replicator.run()

     def run(self):
         self.start_core()
@@ -119,6 +138,8 @@
             self.usercouch.kill()
         if self.avahi is not None:
             self.avahi.free()
+        if self.replicator is not None:
+            self.replicator.free()
         if self.httpd is not None:
             self.httpd.terminate()
             self.httpd.join()
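For reference, a minimal standalone sketch of the decision start_core() now makes, using only the UserCouch calls that appear in the hunk above (bootstrap() for the default setup, bootstrap2(tokens) for the shared-tokens setup described in this proposal); the basedir and comments are illustrative, not the actual service code:

    # Sketch only: mirrors the start_core() hunk above.
    import json
    from os import path

    import usercouch

    uc = usercouch.UserCouch(path.expanduser('~/.local/share/dmedia'))
    f = path.join(uc.basedir, 'library.json')
    try:
        config = json.load(open(f, 'r'))
    except Exception:
        config = None

    if config is None:
        env = uc.bootstrap()  # default per-machine CouchDB
    else:
        # shared tokens: listen on 0.0.0.0 and require OAuth (see description)
        env = uc.bootstrap2(config['tokens'])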
=== modified file 'dmedia/service/replicator.py'
--- dmedia/service/replicator.py 2012-04-24 11:22:28 +0000
+++ dmedia/service/replicator.py 2012-04-28 20:46:19 +0000
@@ -25,6 +25,7 @@

 import logging
 import json
+from collections import namedtuple

 from microfiber import Server, Database, PreconditionFailed
 import dbus
@@ -32,16 +33,19 @@

 log = logging.getLogger()
 system = dbus.SystemBus()
-
+Peer = namedtuple('Peer', 'url names')
 SERVICE = '_usercouch._tcp'


-def get_body(source, target):
-    return {
+def get_body(source, target, cancel=False):
+    body = {
         'source': source,
         'target': target,
         'continuous': True,
     }
+    if cancel:
+        body['cancel'] = True
+    return body


 def get_peer(url, dbname, tokens):
@@ -56,11 +60,10 @@
 class Replicator:
     def __init__(self, env, config):
         self.group = None
+        self.env = env
         self.server = Server(env)
         self.library_id = config['library_id']
         self.base_id = self.library_id + '-'
-        self.machine_id = env['machine_id']
-        self.id = self.base_id + self.machine_id
         self.port = env['port']
         self.tokens = config['tokens']
         self.peers = {}
@@ -69,6 +72,8 @@
         self.free()

     def run(self):
+        self.machine_id = self.env['machine_id']
+        self.id = self.base_id + self.machine_id
         self.avahi = system.get_object('org.freedesktop.Avahi', '/')
         self.group = system.get_object(
             'org.freedesktop.Avahi',
@@ -106,44 +111,70 @@
         if self.group is not None:
             self.group.Reset(dbus_interface='org.freedesktop.Avahi.EntryGroup')

-    def on_ItemNew(self, interface, protocol, name, _type, domain, flags):
-        if name == self.id: # Ignore what we publish ourselves
+    def on_ItemNew(self, interface, protocol, key, _type, domain, flags):
+        if key == self.id: # Ignore what we publish ourselves
             return
-        if not name.startswith(self.base_id): # Ignore other libraries
+        if not key.startswith(self.base_id): # Ignore other libraries
             return
         (ip, port) = self.avahi.ResolveService(
-            interface, protocol, name, _type, domain, -1, 0,
+            interface, protocol, key, _type, domain, -1, 0,
             dbus_interface='org.freedesktop.Avahi.Server'
         )[7:9]
         url = 'http://{}:{}/'.format(ip, port)
-        log.info('Replicator: new peer %s at %s', name, url)
-        self.peers[name] = url
-        self.replicate(url, 'foo')
-
-    def on_ItemRemove(self, interface, protocol, name, _type, domain, flags):
-        log.info('Replicator: removing peer %s', name)
-        try:
-            del self.peers[name]
-        except KeyError:
-            pass
-
-    def replicate(self, url, dbname):
-        # Create local DB if needed
-        try:
-            self.server.put(None, dbname)
-        except PreconditionFailed:
-            pass
-
-        # Create remote DB if needed
-        env = {'url': url, 'oauth': self.tokens}
-        db = Database(dbname, env)
-        db.ensure()
-
-        peer = get_peer(url, dbname, self.tokens)
-        local_to_remote = get_body(dbname, peer)
-        remote_to_local = get_body(peer, dbname)
-        for obj in (local_to_remote, remote_to_local):
-            self.server.post(obj, '_replicate')
+        log.info('Replicator: new peer %s at %s', key, url)
+        self.cancel_all(key)
+        self.peers[key] = Peer(url, [])
+        self.replicate_all(key)
+
+    def on_ItemRemove(self, interface, protocol, key, _type, domain, flags):
+        log.info('Replicator: peer removed %s', key)
+        self.cancel_all(str(key))
+
+    def cancel_all(self, key):
+        p = self.peers.pop(key, None)
+        if p is None:
+            return
+        log.info('Canceling replications for %r', key)
+        for name in p.names:
+            self.replicate(p.url, name, cancel=True)
+
+    def replicate_all(self, key):
+        p = self.peers[key]
+        env = {'url': p.url, 'oauth': self.tokens}
+        remote = Server(env)
+        for name in self.server.get('_all_dbs'):
+            if name.startswith('_'):
+                continue
+            if not (name.startswith('dmedia-0') or name.startswith('novacut-0')):
+                continue
+            # Create remote DB if needed
+            try:
+                remote.put(None, name)
+            except PreconditionFailed:
+                pass
+
+            # Start replication
+            p.names.append(name)
+            self.replicate(p.url, name)
+
+    def replicate(self, url, name, cancel=False):
+        """
+        Start or cancel push replication of database *name* to peer at *url*.
+
+        Security note: we only do push replication because pull replication
+        would allow unauthorized peers to write to our databases via their
+        changes feed. For both push and pull, there is currently no privacy
+        whatsoever... everything is in cleartext and uses oauth 1.0a. But push
+        replication is the only way to at least prevent malicious data
+        corruption.
+        """
+        if cancel:
+            log.info('Canceling push of %r to %r', name, url)
+        else:
+            log.info('Starting push of %r to %r', name, url)
+        peer = get_peer(url, name, self.tokens)
+        push = get_body(name, peer, cancel)
+        self.server.post(push, '_replicate')
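To make the push-only design in the replicate() docstring concrete, here is roughly the document that ends up being POSTed to CouchDB's _replicate. Only the fields built by get_body() come from this diff; get_peer() is unchanged by this branch and not shown, so the 'target' layout below is an assumption based on CouchDB 1.x replication conventions, and the URL, port, and database name are illustrative:

    # Illustrative push-replication body; the 'target' shape is assumed.
    push = {
        'source': 'dmedia-0',  # local database, pushed out to the peer
        'target': {
            'url': 'http://10.0.0.2:46291/dmedia-0',
            'auth': {'oauth': {
                'consumer_key': '...',
                'consumer_secret': '...',
                'token': '...',
                'token_secret': '...',
            }},
        },
        'continuous': True,
        # get_body(..., cancel=True) adds 'cancel': True when a peer goes away
    }
    # Replicator.replicate() then does: self.server.post(push, '_replicate')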
=== added file 'init-library'
--- init-library 1970-01-01 00:00:00 +0000
+++ init-library 2012-04-28 20:46:19 +0000
@@ -0,0 +1,25 @@
+#!/usr/bin/python3
+
+import json
+from os import path
+
+from microfiber import random_id
+from usercouch import random_oauth
+
+import dmedia
+
+
+def dumps(obj):
+    return json.dumps(obj, sort_keys=True, separators=(',', ': '), indent=4)
+
+
+obj = {
+    'library_id': random_id(),
+    'tokens': random_oauth(),
+}
+obj_s = dumps(obj)
+
+f = path.join(dmedia.get_dmedia_dir(), 'library.json')
+open(f, 'w').write(obj_s)
+print(obj_s)
+
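Presumably the intended workflow, implied by the description above rather than spelled out in this branch, is to run init-library on one machine and copy the resulting ~/.local/share/dmedia/library.json (same library_id and tokens) to every peer that should join the library, then restart dmedia-service on each. On startup the service finds the file, bootstraps UserCouch with the shared tokens, and the Replicator begins advertising over Avahi and pushing to any peer announcing the same library_id.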
