4 from zope.interface import implements
5 from twisted.internet import defer, reactor
6 from foolscap.api import eventually
7 from allmydata.interfaces import IMutableFileNode, IMutableFileURI, \
8 ICheckable, ICheckResults, NotEnoughSharesError
9 from allmydata.util import hashutil, log
10 from allmydata.util.assertutil import precondition
11 from allmydata.uri import WriteableSSKFileURI, ReadonlySSKFileURI
12 from allmydata.monitor import Monitor
13 from pycryptopp.cipher.aes import AES
15 from publish import Publish
16 from common import MODE_READ, MODE_WRITE, UnrecoverableFileError, \
17 ResponseCache, UncoordinatedWriteError
18 from servermap import ServerMap, ServermapUpdater
19 from retrieve import Retrieve
20 from checker import MutableChecker, MutableCheckAndRepairer
21 from repairer import Repairer
# NOTE(review): fragment of a backoff-agent class — the class header, the
# __init__ header, and several body lines are missing from this view of the
# file; confirm against the upstream source.
# these parameters are copied from foolscap.reconnector, which gets them
# from twisted.internet.protocol.ReconnectingClientFactory
factor = 2.7182818284590451 # (math.e)
jitter = 0.11962656492 # molar Planck constant times c, Joule meter/mole
# reset the retry delay to its configured starting value (this line sits
# inside __init__, whose 'def' line is not visible here)
self._delay = self.initialDelay
def delay(self, node, f):
    # grow the delay exponentially, then add gaussian jitter so that
    # competing writers do not retry in lockstep
    self._delay = self._delay * self.factor
    self._delay = random.normalvariate(self._delay,
                                       self._delay * self.jitter)
    # NOTE(review): 'd' is created on lines missing from this view —
    # presumably a Deferred that is fired after the computed delay; the
    # 'return d' is also not visible. TODO confirm.
    reactor.callLater(self._delay, d.callback, None)
46 # use nodemaker.create_mutable_file() to make one of these
class MutableFileNode:
    """A node for one mutable (SSK) file.

    Provides IMutableFileNode and ICheckable. Instances are constructed
    elsewhere (see the nodemaker comment above) and then given an identity
    via init_from_uri() or create_with_keys().
    """
    implements(IMutableFileNode, ICheckable)
51 def __init__(self, storage_broker, secret_holder,
52 default_encoding_parameters, history):
53 self._storage_broker = storage_broker
54 self._secret_holder = secret_holder
55 self._default_encoding_parameters = default_encoding_parameters
56 self._history = history
57 self._pubkey = None # filled in upon first read
58 self._privkey = None # filled in if we're mutable
59 # we keep track of the last encoding parameters that we use. These
60 # are updated upon retrieve, and used by publish. If we publish
61 # without ever reading (i.e. overwrite()), then we use these values.
62 self._required_shares = default_encoding_parameters["k"]
63 self._total_shares = default_encoding_parameters["n"]
64 self._sharemap = {} # known shares, shnum-to-[nodeids]
65 self._cache = ResponseCache()
67 # all users of this MutableFileNode go through the serializer. This
68 # takes advantage of the fact that Deferreds discard the callbacks
69 # that they're done with, so we can keep using the same Deferred
70 # forever without consuming more and more memory.
71 self._serializer = defer.succeed(None)
    # NOTE(review): orphaned body of __repr__ — the 'def __repr__(self):'
    # header and the 'else:' line between the two returns are missing from
    # this view of the file.
    if hasattr(self, '_uri'):
        return "<%s %x %s %s>" % (self.__class__.__name__, id(self), self.is_readonly() and 'RO' or 'RW', self._uri.abbrev())
    return "<%s %x %s %s>" % (self.__class__.__name__, id(self), None, None)
    def init_from_uri(self, filecap):
        # we have the URI, but we have not yet retrieved the public
        # verification key, nor things like 'k' or 'N'. If and when someone
        # wants to get our contents, we'll pull from shares and fill those
        # NOTE(review): the continuation of this comment (original line 83)
        # is missing from this view of the file.
        assert isinstance(filecap, str)
        if filecap.startswith("URI:SSK:"):
            self._uri = WriteableSSKFileURI.init_from_string(filecap)
            self._writekey = self._uri.writekey
        # NOTE(review): an 'else:' line is missing here in this view; the
        # assert below clearly belongs to the read-only branch.
        assert filecap.startswith("URI:SSK-RO:")
        self._uri = ReadonlySSKFileURI.init_from_string(filecap)
        self._readkey = self._uri.readkey
        self._storage_index = self._uri.storage_index
        self._fingerprint = self._uri.fingerprint
        # the following values are learned during Retrieval
        # self._required_shares
        # and these are needed for Publish. They are filled in by Retrieval
        # if possible, otherwise by the first peer that Publish talks to.
        self._encprivkey = None
    def create_with_keys(self, (pubkey, privkey), initial_contents):
        """Call this to create a brand-new mutable file. It will create the
        shares, find homes for them, and upload the initial contents. Returns
        a Deferred that fires (with the MutableFileNode instance you should
        use) when it completes.
        """
        self._pubkey, self._privkey = pubkey, privkey
        pubkey_s = self._pubkey.serialize()
        privkey_s = self._privkey.serialize()
        # the writekey is derived from the serialized private key; the
        # private key itself is stored encrypted under that writekey
        self._writekey = hashutil.ssk_writekey_hash(privkey_s)
        self._encprivkey = self._encrypt_privkey(self._writekey, privkey_s)
        self._fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
        self._uri = WriteableSSKFileURI(self._writekey, self._fingerprint)
        self._readkey = self._uri.readkey
        self._storage_index = self._uri.storage_index
        return self._upload(initial_contents, None)

    def _encrypt_privkey(self, writekey, privkey):
        # NOTE(review): the AES-construction line and the trailing
        # 'return crypttext' are missing from this view of the method.
        crypttext = enc.process(privkey)

    def _decrypt_privkey(self, enc_privkey):
        # decrypt the stored private key with our writekey.
        # NOTE(review): the trailing 'return privkey' line is missing from
        # this view of the method.
        enc = AES(self._writekey)
        privkey = enc.process(enc_privkey)
132 def _populate_pubkey(self, pubkey):
133 self._pubkey = pubkey
134 def _populate_required_shares(self, required_shares):
135 self._required_shares = required_shares
136 def _populate_total_shares(self, total_shares):
137 self._total_shares = total_shares
139 def _populate_privkey(self, privkey):
140 self._privkey = privkey
141 def _populate_encprivkey(self, encprivkey):
142 self._encprivkey = encprivkey
145 def get_write_enabler(self, peerid):
146 assert len(peerid) == 20
147 return hashutil.ssk_write_enabler_hash(self._writekey, peerid)
148 def get_renewal_secret(self, peerid):
149 assert len(peerid) == 20
150 crs = self._secret_holder.get_renewal_secret()
151 frs = hashutil.file_renewal_secret_hash(crs, self._storage_index)
152 return hashutil.bucket_renewal_secret_hash(frs, peerid)
153 def get_cancel_secret(self, peerid):
154 assert len(peerid) == 20
155 ccs = self._secret_holder.get_cancel_secret()
156 fcs = hashutil.file_cancel_secret_hash(ccs, self._storage_index)
157 return hashutil.bucket_cancel_secret_hash(fcs, peerid)
    def get_writekey(self):
        """Return the write key."""
        return self._writekey
    def get_readkey(self):
        # NOTE(review): the body of this accessor (presumably
        # 'return self._readkey') is missing from this view of the file.
    def get_storage_index(self):
        """Return the storage index shared by all shares of this file."""
        return self._storage_index
    def get_privkey(self):
        # NOTE(review): body missing from this view of the file.
    def get_encprivkey(self):
        return self._encprivkey
    def get_pubkey(self):
        # NOTE(review): body missing from this view of the file.
    def get_required_shares(self):
        return self._required_shares
    def get_total_shares(self):
        return self._total_shares
    ####################################
    # NOTE(review): the next two lines are orphaned method bodies — the
    # 'def' headers (presumably get_uri and get_size) are missing from this
    # view of the file.
    return self._uri.to_string()
    return "?" # TODO: this is likely to cause problems, not being an int
    def get_readonly(self):
        if self.is_readonly():
            # NOTE(review): the body of this branch (presumably
            # 'return self') is missing from this view.
        ro = MutableFileNode(self._storage_broker, self._secret_holder,
                             self._default_encoding_parameters, self._history)
        ro.init_from_uri(self.get_readonly_uri())
        # NOTE(review): the trailing 'return ro' line is missing from this
        # view of the method.
192 def get_readonly_uri(self):
193 return self._uri.get_readonly().to_string()
195 def is_mutable(self):
196 return self._uri.is_mutable()
197 def is_readonly(self):
198 return self._uri.is_readonly()
    # NOTE(review): orphaned body of __hash__ — the 'def __hash__(self):'
    # header is missing from this view. Hashes on (class, uri), consistent
    # with the class/uri comparison performed by __cmp__.
    return hash((self.__class__, self._uri))
202 def __cmp__(self, them):
203 if cmp(type(self), type(them)):
204 return cmp(type(self), type(them))
205 if cmp(self.__class__, them.__class__):
206 return cmp(self.__class__, them.__class__)
207 return cmp(self._uri, them._uri)
209 def get_verify_cap(self):
210 return IMutableFileURI(self._uri).get_verify_cap()
    def get_repair_cap(self):
        if self._uri.is_readonly():
            # NOTE(review): both branch bodies (presumably 'return None'
            # here and a return of the repair cap below) are missing from
            # this view of the method.

    def _do_serialized(self, cb, *args, **kwargs):
        # note: to avoid deadlock, this callable is *not* allowed to invoke
        # other serialized methods within this (or any other)
        # MutableFileNode. The callable should be a bound method of this same
        # NOTE(review): the rest of this comment and the creation of the
        # Deferred 'd' used below are missing from this view of the method.
        self._serializer.addCallback(lambda ignore: cb(*args, **kwargs))
        # we need to put off d.callback until this Deferred is finished being
        # processed. Otherwise the caller's subsequent activities (like,
        # doing other things with this node) can cause reentrancy problems in
        # the Deferred code itself
        self._serializer.addBoth(lambda res: eventually(d.callback, res))
        # add a log.err just in case something really weird happens, because
        # self._serializer stays around forever, therefore we won't see the
        # usual Unhandled Error in Deferred that would give us a hint.
        self._serializer.addErrback(log.err)
        # NOTE(review): the trailing 'return d' is missing from this view.
235 #################################
238 def check(self, monitor, verify=False, add_lease=False):
239 checker = MutableChecker(self, self._storage_broker,
240 self._history, monitor)
241 return checker.check(verify, add_lease)
243 def check_and_repair(self, monitor, verify=False, add_lease=False):
244 checker = MutableCheckAndRepairer(self, self._storage_broker,
245 self._history, monitor)
246 return checker.check(verify, add_lease)
    #################################

    def repair(self, check_results, force=False):
        # check_results must be adaptable to ICheckResults
        assert ICheckResults(check_results)
        r = Repairer(self, check_results)
        # NOTE(review): the lines that start the repairer and return its
        # result are missing from this view of the method.

    #################################
    # allow the use of IDownloadTarget
    def download(self, target):
        # fake it. TODO: make this cleaner.
        d = self.download_best_version()
        # NOTE(review): the callback header that binds 'data' and the
        # intervening write/close calls are missing from this view; the
        # lines below are the surviving pieces of that callback.
        target.open(len(data))
        return target.finish()
276 def download_best_version(self):
277 return self._do_serialized(self._download_best_version)
    def _download_best_version(self):
        servermap = ServerMap()
        d = self._try_once_to_download_best_version(servermap, MODE_READ)
        # NOTE(review): the errback-function header (presumably
        # 'def _maybe_retry(f):') is missing from this view of the method.
        f.trap(NotEnoughSharesError)
        # the download is worth retrying once. Make sure to use the
        # old servermap, since it is what remembers the bad shares,
        # but use MODE_WRITE to make it look for even more shares.
        # TODO: consider allowing this to retry multiple times.. this
        # approach will let us tolerate about 8 bad shares, I think.
        return self._try_once_to_download_best_version(servermap,
        # NOTE(review): the continuation argument (presumably MODE_WRITE)
        # is missing from this view.
        d.addErrback(_maybe_retry)
        # NOTE(review): the trailing 'return d' is missing from this view.

    def _try_once_to_download_best_version(self, servermap, mode):
        d = self._update_servermap(servermap, mode)
        d.addCallback(self._once_updated_download_best_version, servermap)
        # NOTE(review): the trailing 'return d' is missing from this view.

    def _once_updated_download_best_version(self, ignored, servermap):
        goal = servermap.best_recoverable_version()
        # NOTE(review): the guard line (presumably 'if not goal:') is
        # missing from this view; the raise clearly belongs to it.
        raise UnrecoverableFileError("no recoverable versions")
        return self._try_once_to_download_version(servermap, goal)

    def get_size_of_best_version(self):
        d = self.get_servermap(MODE_READ)
        def _got_servermap(smap):
            ver = smap.best_recoverable_version()
            # NOTE(review): the guard line (presumably 'if not ver:') is
            # missing from this view; the raise clearly belongs to it.
            raise UnrecoverableFileError("no recoverable version")
            return smap.size_of_version(ver)
        d.addCallback(_got_servermap)
        # NOTE(review): the trailing 'return d' is missing from this view.
312 def overwrite(self, new_contents):
313 return self._do_serialized(self._overwrite, new_contents)
    def _overwrite(self, new_contents):
        # build a fresh servermap in MODE_WRITE, then publish over the top
        servermap = ServerMap()
        d = self._update_servermap(servermap, mode=MODE_WRITE)
        d.addCallback(lambda ignored: self._upload(new_contents, servermap))
        # NOTE(review): the trailing 'return d' is missing from this view
        # of the method.
    def modify(self, modifier, backoffer=None):
        """I use a modifier callback to apply a change to the mutable file.
        I implement the following pseudocode::

        obtain_mutable_filenode_lock()
        update_servermap(MODE_WRITE)
        old = retrieve_best_version()
        new = modifier(old, servermap, first_time)
        except UncoordinatedWriteError, e:
        release_mutable_filenode_lock()

        (NOTE(review): several pseudocode lines are missing from this view
        of the file.)

        The idea is that your modifier function can apply a delta of some
        sort, and it will be re-run as necessary until it succeeds. The
        modifier must inspect the old version to see whether its delta has
        already been applied: if so it should return the contents unmodified.

        Note that the modifier is required to run synchronously, and must not
        invoke any methods on this MutableFileNode instance.

        The backoff-er is a callable that is responsible for inserting a
        random delay between subsequent attempts, to help competing updates
        from colliding forever. It is also allowed to give up after a while.
        The backoffer is given two arguments: this MutableFileNode, and the
        Failure object that contains the UncoordinatedWriteError. It should
        return a Deferred that will fire when the next attempt should be
        made, or return the Failure if the loop should give up. If
        backoffer=None, a default one is provided which will perform
        exponential backoff, and give up after 4 tries. Note that the
        backoffer should not invoke any methods on this MutableFileNode
        instance, and it needs to be highly conscious of deadlock issues.
        """
        return self._do_serialized(self._modify, modifier, backoffer)
362 def _modify(self, modifier, backoffer):
363 servermap = ServerMap()
364 if backoffer is None:
365 backoffer = BackoffAgent().delay
366 return self._modify_and_retry(servermap, modifier, backoffer, True)
    def _modify_and_retry(self, servermap, modifier, backoffer, first_time):
        d = self._modify_once(servermap, modifier, first_time)
        # NOTE(review): the errback-function header (presumably
        # 'def _retry(f):') is missing from this view of the method.
        f.trap(UncoordinatedWriteError)
        d2 = defer.maybeDeferred(backoffer, self, f)
        d2.addCallback(lambda ignored:
                       self._modify_and_retry(servermap, modifier,
        # NOTE(review): the continuation arguments of the recursive call
        # and the trailing addErrback/return lines are missing from this
        # view of the method.

    def _modify_once(self, servermap, modifier, first_time):
        d = self._update_servermap(servermap, MODE_WRITE)
        d.addCallback(self._once_updated_download_best_version, servermap)
        def _apply(old_contents):
            new_contents = modifier(old_contents, servermap, first_time)
            if new_contents is None or new_contents == old_contents:
                # no changes need to be made
                # NOTE(review): an early return and part of the comment
                # below are missing from this view of the method.
                # However, since Publish is not automatically doing a
                # recovery when it observes UCWE, we need to do a second
                # publish. See #551 for details. We'll basically loop until
                # we managed an uncontested publish.
                new_contents = old_contents
            precondition(isinstance(new_contents, str),
                         "Modifier function must return a string or None")
            return self._upload(new_contents, servermap)
        d.addCallback(_apply)
        # NOTE(review): the trailing 'return d' is missing from this view.
398 def get_servermap(self, mode):
399 return self._do_serialized(self._get_servermap, mode)
400 def _get_servermap(self, mode):
401 servermap = ServerMap()
402 return self._update_servermap(servermap, mode)
    def _update_servermap(self, servermap, mode):
        u = ServermapUpdater(self, self._storage_broker, Monitor(), servermap,
        # NOTE(review): the continuation arguments of this call and the
        # 'if self._history:' guard for the line below are missing from
        # this view of the method.
        self._history.notify_mapupdate(u.get_status())
        # NOTE(review): the lines that start the update and return its
        # Deferred are missing from this view.
410 def download_version(self, servermap, version, fetch_privkey=False):
411 return self._do_serialized(self._try_once_to_download_version,
412 servermap, version, fetch_privkey)
    def _try_once_to_download_version(self, servermap, version,
                                      fetch_privkey=False):
        r = Retrieve(self, servermap, version, fetch_privkey)
        # NOTE(review): the 'if self._history:' guard for the line below
        # is missing from this view of the method.
        self._history.notify_retrieve(r.get_status())
        # NOTE(review): the line that starts the retrieve and returns its
        # Deferred is missing from this view.
420 def upload(self, new_contents, servermap):
421 return self._do_serialized(self._upload, new_contents, servermap)
    def _upload(self, new_contents, servermap):
        # the pubkey is fetched as a side-effect of the servermap update
        assert self._pubkey, "update_servermap must be called before publish"
        p = Publish(self, self._storage_broker, servermap)
        # NOTE(review): the 'if self._history:' guard for the line below
        # is missing from this view of the method.
        self._history.notify_publish(p.get_status(), len(new_contents))
        return p.publish(new_contents)