4 from zope.interface import implements
5 from twisted.internet import defer, reactor
6 from foolscap.api import eventually
7 from allmydata.interfaces import IMutableFileNode, IMutableFileURI, \
8 ICheckable, ICheckResults, NotEnoughSharesError
9 from allmydata.util import hashutil, log
10 from allmydata.util.assertutil import precondition
11 from allmydata.uri import WriteableSSKFileURI, ReadonlySSKFileURI
12 from allmydata.monitor import Monitor
13 from pycryptopp.cipher.aes import AES
15 from publish import Publish
16 from common import MODE_READ, MODE_WRITE, UnrecoverableFileError, \
17 ResponseCache, UncoordinatedWriteError
18 from servermap import ServerMap, ServermapUpdater
19 from retrieve import Retrieve
20 from checker import MutableChecker, MutableCheckAndRepairer
21 from repairer import Repairer
25 # these parameters are copied from foolscap.reconnector, which gets them
26 # from twisted.internet.protocol.ReconnectingClientFactory
28 factor = 2.7182818284590451 # (math.e)
29 jitter = 0.11962656492 # molar Planck constant times c, Joule meter/mole
33 self._delay = self.initialDelay
    def delay(self, node, f):
        # Exponential-backoff helper used between contested-write retries.
        # NOTE(review): several lines appear to be missing from this copy —
        # the retry counter / give-up check, and the creation ('d') and
        # return of the Deferred that callLater fires; 'd' is unbound as
        # written. Verify against the original before relying on this.
        self._delay = self._delay * self.factor
        self._delay = random.normalvariate(self._delay,
                                           self._delay * self.jitter)
        reactor.callLater(self._delay, d.callback, None)
# use nodemaker.create_mutable_file() to make one of these

class MutableFileNode:
    """A mutable (SSK-based) file node. Holds a writeable or read-only SSK
    cap and provides serialized read/modify/write operations built on the
    servermap / retrieve / publish machinery."""
    implements(IMutableFileNode, ICheckable)
51 def __init__(self, storage_broker, secret_holder,
52 default_encoding_parameters, history):
53 self._storage_broker = storage_broker
54 self._secret_holder = secret_holder
55 self._default_encoding_parameters = default_encoding_parameters
56 self._history = history
57 self._pubkey = None # filled in upon first read
58 self._privkey = None # filled in if we're mutable
59 # we keep track of the last encoding parameters that we use. These
60 # are updated upon retrieve, and used by publish. If we publish
61 # without ever reading (i.e. overwrite()), then we use these values.
62 self._required_shares = default_encoding_parameters["k"]
63 self._total_shares = default_encoding_parameters["n"]
64 self._sharemap = {} # known shares, shnum-to-[nodeids]
65 self._cache = ResponseCache()
67 # all users of this MutableFileNode go through the serializer. This
68 # takes advantage of the fact that Deferreds discard the callbacks
69 # that they're done with, so we can keep using the same Deferred
70 # forever without consuming more and more memory.
71 self._serializer = defer.succeed(None)
74 if hasattr(self, '_uri'):
75 return "<%s %x %s %s>" % (self.__class__.__name__, id(self), self.is_readonly() and 'RO' or 'RW', self._uri.abbrev())
77 return "<%s %x %s %s>" % (self.__class__.__name__, id(self), None, None)
    def init_from_uri(self, filecap):
        """Populate this node from an SSK filecap string (write-cap
        "URI:SSK:..." or read-cap "URI:SSK-RO:...")."""
        # we have the URI, but we have not yet retrieved the public
        # verification key, nor things like 'k' or 'N'. If and when someone
        # wants to get our contents, we'll pull from shares and fill those
        assert isinstance(filecap, str)
        if filecap.startswith("URI:SSK:"):
            self._uri = WriteableSSKFileURI.init_from_string(filecap)
            self._writekey = self._uri.writekey
        # NOTE(review): an 'else:' branch (read-only caps, presumably with
        # self._writekey = None) appears to be missing from this copy; as
        # written the next two lines run unconditionally and the assert
        # would fail for write-caps. Verify against the original.
        assert filecap.startswith("URI:SSK-RO:")
        self._uri = ReadonlySSKFileURI.init_from_string(filecap)
        self._readkey = self._uri.readkey
        self._storage_index = self._uri.storage_index
        self._fingerprint = self._uri.fingerprint
        # the following values are learned during Retrieval
        # self._required_shares
        # and these are needed for Publish. They are filled in by Retrieval
        # if possible, otherwise by the first peer that Publish talks to.
        self._encprivkey = None
105 def create_with_keys(self, (pubkey, privkey), contents):
106 """Call this to create a brand-new mutable file. It will create the
107 shares, find homes for them, and upload the initial contents (created
108 with the same rules as IClient.create_mutable_file() ). Returns a
109 Deferred that fires (with the MutableFileNode instance you should
110 use) when it completes.
112 self._pubkey, self._privkey = pubkey, privkey
113 pubkey_s = self._pubkey.serialize()
114 privkey_s = self._privkey.serialize()
115 self._writekey = hashutil.ssk_writekey_hash(privkey_s)
116 self._encprivkey = self._encrypt_privkey(self._writekey, privkey_s)
117 self._fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
118 self._uri = WriteableSSKFileURI(self._writekey, self._fingerprint)
119 self._readkey = self._uri.readkey
120 self._storage_index = self._uri.storage_index
121 initial_contents = self._get_initial_contents(contents)
122 return self._upload(initial_contents, None)
    def _get_initial_contents(self, contents):
        # Normalize the 'contents' argument for create_with_keys: a string
        # is used as-is, a callable is invoked with this node.
        # NOTE(review): the body of the isinstance branch (returning the
        # string directly) and the contents-is-None handling appear to be
        # missing from this copy; the dangling 'if' below is a syntax error
        # as written.
        if isinstance(contents, str):
        assert callable(contents), "%s should be callable, not %s" % \
               (contents, type(contents))
        return contents(self)
    def _encrypt_privkey(self, writekey, privkey):
        # AES-encrypt the serialized private key under the writekey.
        # NOTE(review): the construction of 'enc' (presumably
        # 'enc = AES(writekey)') and the 'return crypttext' line appear to
        # be missing from this copy; 'enc' is unbound as written.
        crypttext = enc.process(privkey)
    def _decrypt_privkey(self, enc_privkey):
        # Inverse of _encrypt_privkey: the same AES stream decrypts.
        # NOTE(review): the final 'return privkey' appears to be missing
        # from this copy.
        enc = AES(self._writekey)
        privkey = enc.process(enc_privkey)
    # single-purpose setters, used as Deferred callbacks by the
    # servermap/retrieve machinery to record values as they are learned
    def _populate_pubkey(self, pubkey):
        self._pubkey = pubkey
    def _populate_required_shares(self, required_shares):
        self._required_shares = required_shares
    def _populate_total_shares(self, total_shares):
        self._total_shares = total_shares

    def _populate_privkey(self, privkey):
        self._privkey = privkey
    def _populate_encprivkey(self, encprivkey):
        self._encprivkey = encprivkey
156 def get_write_enabler(self, peerid):
157 assert len(peerid) == 20
158 return hashutil.ssk_write_enabler_hash(self._writekey, peerid)
159 def get_renewal_secret(self, peerid):
160 assert len(peerid) == 20
161 crs = self._secret_holder.get_renewal_secret()
162 frs = hashutil.file_renewal_secret_hash(crs, self._storage_index)
163 return hashutil.bucket_renewal_secret_hash(frs, peerid)
164 def get_cancel_secret(self, peerid):
165 assert len(peerid) == 20
166 ccs = self._secret_holder.get_cancel_secret()
167 fcs = hashutil.file_cancel_secret_hash(ccs, self._storage_index)
168 return hashutil.bucket_cancel_secret_hash(fcs, peerid)
170 def get_writekey(self):
171 return self._writekey
172 def get_readkey(self):
174 def get_storage_index(self):
175 return self._storage_index
176 def get_privkey(self):
178 def get_encprivkey(self):
179 return self._encprivkey
180 def get_pubkey(self):
183 def get_required_shares(self):
184 return self._required_shares
185 def get_total_shares(self):
186 return self._total_shares
188 ####################################
192 return self._uri.to_string()
194 return "?" # TODO: this is likely to cause problems, not being an int
    def get_readonly(self):
        # Return a read-only node for the same file.
        # NOTE(review): the early 'return self' (body of the 'if') and the
        # trailing 'return ro' appear to be missing from this copy; the
        # dangling 'if' below is a syntax error as written.
        if self.is_readonly():
        ro = MutableFileNode(self._storage_broker, self._secret_holder,
                             self._default_encoding_parameters, self._history)
        ro.init_from_uri(self.get_readonly_uri())
203 def get_readonly_uri(self):
204 return self._uri.get_readonly().to_string()
    def is_mutable(self):
        # the URI type determines mutability
        return self._uri.is_mutable()
    def is_readonly(self):
        # True when we hold only a read-cap for this file
        return self._uri.is_readonly()
212 return hash((self.__class__, self._uri))
213 def __cmp__(self, them):
214 if cmp(type(self), type(them)):
215 return cmp(type(self), type(them))
216 if cmp(self.__class__, them.__class__):
217 return cmp(self.__class__, them.__class__)
218 return cmp(self._uri, them._uri)
    def get_verify_cap(self):
        # adapt our URI to IMutableFileURI and ask it for the verify cap
        return IMutableFileURI(self._uri).get_verify_cap()
    def get_repair_cap(self):
        # NOTE(review): the branch bodies appear to be missing from this
        # copy — presumably read-only nodes return None (repair needs a
        # write-cap) and writeable nodes return self._uri. The dangling
        # 'if' below is a syntax error as written; verify against the
        # original.
        if self._uri.is_readonly():
    def _do_serialized(self, cb, *args, **kwargs):
        # Run cb(*args, **kwargs) on this node's serializer chain so that
        # operations on the file never overlap.
        # note: to avoid deadlock, this callable is *not* allowed to invoke
        # other serialized methods within this (or any other)
        # MutableFileNode. The callable should be a bound method of this same
        # NOTE(review): the creation of the result Deferred ('d') and the
        # final 'return d' appear to be missing from this copy; 'd' is
        # unbound as written.
        self._serializer.addCallback(lambda ignore: cb(*args, **kwargs))
        # we need to put off d.callback until this Deferred is finished being
        # processed. Otherwise the caller's subsequent activities (like,
        # doing other things with this node) can cause reentrancy problems in
        # the Deferred code itself
        self._serializer.addBoth(lambda res: eventually(d.callback, res))
        # add a log.err just in case something really weird happens, because
        # self._serializer stays around forever, therefore we won't see the
        # usual Unhandled Error in Deferred that would give us a hint.
        self._serializer.addErrback(log.err)
246 #################################
249 def check(self, monitor, verify=False, add_lease=False):
250 checker = MutableChecker(self, self._storage_broker,
251 self._history, monitor)
252 return checker.check(verify, add_lease)
254 def check_and_repair(self, monitor, verify=False, add_lease=False):
255 checker = MutableCheckAndRepairer(self, self._storage_broker,
256 self._history, monitor)
257 return checker.check(verify, add_lease)
259 #################################
    def repair(self, check_results, force=False):
        # Attempt a repair based on prior check results.
        # NOTE(review): the tail of this method (starting the Repairer and
        # returning its Deferred) appears to be missing from this copy.
        assert ICheckResults(check_results)
        r = Repairer(self, check_results)
269 #################################
272 # allow the use of IDownloadTarget
    def download(self, target):
        # fake it. TODO: make this cleaner.
        # NOTE(review): the callback that opens/writes/closes the target
        # with the downloaded bytes appears to be partially missing from
        # this copy; 'data' is unbound as written.
        d = self.download_best_version()
        target.open(len(data))
        return target.finish()
    def download_best_version(self):
        # Return a Deferred firing with the contents of the best recoverable
        # version, serialized with other operations on this node.
        return self._do_serialized(self._download_best_version)
    def _download_best_version(self):
        # Fetch the best recoverable version, retrying once (in MODE_WRITE)
        # on NotEnoughSharesError.
        # NOTE(review): the 'def _maybe_retry(f):' header, the MODE_WRITE
        # continuation argument, and the trailing 'return d' appear to be
        # missing from this copy; the text below is broken as written.
        servermap = ServerMap()
        d = self._try_once_to_download_best_version(servermap, MODE_READ)
        f.trap(NotEnoughSharesError)
        # the download is worth retrying once. Make sure to use the
        # old servermap, since it is what remembers the bad shares,
        # but use MODE_WRITE to make it look for even more shares.
        # TODO: consider allowing this to retry multiple times.. this
        # approach will let us tolerate about 8 bad shares, I think.
        return self._try_once_to_download_best_version(servermap,
        d.addErrback(_maybe_retry)
    def _try_once_to_download_best_version(self, servermap, mode):
        # Update the servermap, then download whichever version it reports
        # as best. NOTE(review): the final 'return d' appears to be missing
        # from this copy.
        d = self._update_servermap(servermap, mode)
        d.addCallback(self._once_updated_download_best_version, servermap)
    def _once_updated_download_best_version(self, ignored, servermap):
        # NOTE(review): an 'if not goal:' guard appears to be missing from
        # this copy — the raise should fire only when the servermap reports
        # no recoverable version; as written it always raises and the
        # download line is unreachable.
        goal = servermap.best_recoverable_version()
        raise UnrecoverableFileError("no recoverable versions")
        return self._try_once_to_download_version(servermap, goal)
    def get_size_of_best_version(self):
        # Return a Deferred firing with the size of the best recoverable
        # version.
        # NOTE(review): an 'if not ver:' guard inside _got_servermap and the
        # trailing 'return d' appear to be missing from this copy; as
        # written _got_servermap always raises.
        d = self.get_servermap(MODE_READ)
        def _got_servermap(smap):
            ver = smap.best_recoverable_version()
            raise UnrecoverableFileError("no recoverable version")
            return smap.size_of_version(ver)
        d.addCallback(_got_servermap)
    def overwrite(self, new_contents):
        # Replace the file's contents with new_contents, serialized with
        # other operations on this node.
        return self._do_serialized(self._overwrite, new_contents)
    def _overwrite(self, new_contents):
        # Map the grid in MODE_WRITE, then publish new_contents.
        # NOTE(review): the trailing 'return d' appears to be missing from
        # this copy.
        servermap = ServerMap()
        d = self._update_servermap(servermap, mode=MODE_WRITE)
        d.addCallback(lambda ignored: self._upload(new_contents, servermap))
    def modify(self, modifier, backoffer=None):
        """I use a modifier callback to apply a change to the mutable file.
        I implement the following pseudocode::

         obtain_mutable_filenode_lock()
         while True:
             update_servermap(MODE_WRITE)
             old = retrieve_best_version()
             new = modifier(old, servermap, first_time)
             try:
                 publish(new)
                 break
             except UncoordinatedWriteError, e:
                 backoff(e)
         release_mutable_filenode_lock()

        (NOTE(review): the middle of this pseudocode was lost in this copy
        and has been reconstructed from the surviving lines and from the
        _modify_and_retry implementation — confirm against the original.)

        The idea is that your modifier function can apply a delta of some
        sort, and it will be re-run as necessary until it succeeds. The
        modifier must inspect the old version to see whether its delta has
        already been applied: if so it should return the contents unmodified.

        Note that the modifier is required to run synchronously, and must not
        invoke any methods on this MutableFileNode instance.

        The backoff-er is a callable that is responsible for inserting a
        random delay between subsequent attempts, to help competing updates
        from colliding forever. It is also allowed to give up after a while.
        The backoffer is given two arguments: this MutableFileNode, and the
        Failure object that contains the UncoordinatedWriteError. It should
        return a Deferred that will fire when the next attempt should be
        made, or return the Failure if the loop should give up. If
        backoffer=None, a default one is provided which will perform
        exponential backoff, and give up after 4 tries. Note that the
        backoffer should not invoke any methods on this MutableFileNode
        instance, and it needs to be highly conscious of deadlock issues.
        """
        return self._do_serialized(self._modify, modifier, backoffer)
373 def _modify(self, modifier, backoffer):
374 servermap = ServerMap()
375 if backoffer is None:
376 backoffer = BackoffAgent().delay
377 return self._modify_and_retry(servermap, modifier, backoffer, True)
    def _modify_and_retry(self, servermap, modifier, backoffer, first_time):
        # One attempt at the modify loop; on UncoordinatedWriteError, wait
        # via the backoffer and recurse with first_time=False.
        # NOTE(review): the 'def _retry(f):' header, the recursion's trailing
        # arguments, the errback hookup, and the final 'return d' appear to
        # be missing from this copy; the text below is broken as written.
        d = self._modify_once(servermap, modifier, first_time)
        f.trap(UncoordinatedWriteError)
        d2 = defer.maybeDeferred(backoffer, self, f)
        d2.addCallback(lambda ignored:
                       self._modify_and_retry(servermap, modifier,
    def _modify_once(self, servermap, modifier, first_time):
        # One pass of the modify loop: update the servermap, download the
        # best version, apply the modifier, publish the result.
        # NOTE(review): inside _apply, an early 'if first_time: return'
        # appears to be missing from this copy, as does the method's
        # trailing 'return d'.
        d = self._update_servermap(servermap, MODE_WRITE)
        d.addCallback(self._once_updated_download_best_version, servermap)
        def _apply(old_contents):
            new_contents = modifier(old_contents, servermap, first_time)
            if new_contents is None or new_contents == old_contents:
                # no changes need to be made
                # However, since Publish is not automatically doing a
                # recovery when it observes UCWE, we need to do a second
                # publish. See #551 for details. We'll basically loop until
                # we managed an uncontested publish.
                new_contents = old_contents
            precondition(isinstance(new_contents, str),
                         "Modifier function must return a string or None")
            return self._upload(new_contents, servermap)
        d.addCallback(_apply)
    def get_servermap(self, mode):
        # Return a Deferred firing with a fresh ServerMap updated in the
        # given mode, serialized with other operations on this node.
        return self._do_serialized(self._get_servermap, mode)
411 def _get_servermap(self, mode):
412 servermap = ServerMap()
413 return self._update_servermap(servermap, mode)
    def _update_servermap(self, servermap, mode):
        # NOTE(review): the remainder of the ServermapUpdater constructor
        # arguments, an 'if self._history:' guard around the notification,
        # and the final 'return u.update()' appear to be missing from this
        # copy; the open call below is a syntax error as written.
        u = ServermapUpdater(self, self._storage_broker, Monitor(), servermap,
        self._history.notify_mapupdate(u.get_status())
    def download_version(self, servermap, version, fetch_privkey=False):
        # Download a specific version of this file, serialized with other
        # operations on this node.
        return self._do_serialized(self._try_once_to_download_version,
                                   servermap, version, fetch_privkey)
    def _try_once_to_download_version(self, servermap, version,
                                      fetch_privkey=False):
        # NOTE(review): an 'if self._history:' guard around the notification
        # and the final 'return r.download()' appear to be missing from this
        # copy.
        r = Retrieve(self, servermap, version, fetch_privkey)
        self._history.notify_retrieve(r.get_status())
    def upload(self, new_contents, servermap):
        # Publish new_contents using an already-updated servermap,
        # serialized with other operations on this node.
        return self._do_serialized(self._upload, new_contents, servermap)
    def _upload(self, new_contents, servermap):
        # Publish new_contents to the grid.
        # NOTE(review): an 'if self._history:' guard above the
        # notify_publish call appears to be missing from this copy — as
        # written this would crash when history is None.
        assert self._pubkey, "update_servermap must be called before publish"
        p = Publish(self, self._storage_broker, servermap)
        self._history.notify_publish(p.get_status(), len(new_contents))
        return p.publish(new_contents)