import random

from zope.interface import implements
from twisted.internet import defer, reactor
from foolscap.api import eventually
from allmydata.interfaces import IMutableFileNode, \
     ICheckable, ICheckResults, NotEnoughSharesError
from allmydata.util import hashutil, log
from allmydata.util.assertutil import precondition
from allmydata.uri import WriteableSSKFileURI, ReadonlySSKFileURI
from allmydata.monitor import Monitor
from pycryptopp.cipher.aes import AES

from publish import Publish
from common import MODE_READ, MODE_WRITE, UnrecoverableFileError, \
     ResponseCache, UncoordinatedWriteError
from servermap import ServerMap, ServermapUpdater
from retrieve import Retrieve
from checker import MutableChecker, MutableCheckAndRepairer
from repairer import Repairer
class BackoffAgent:
    # these parameters are copied from foolscap.reconnector, which gets them
    # from twisted.internet.protocol.ReconnectingClientFactory
    initialDelay = 1.0
    factor = 2.7182818284590451 # (math.e)
    jitter = 0.11962656492 # molar Planck constant times c, Joule meter/mole
    maxRetries = 4

    def __init__(self):
        self._delay = self.initialDelay
        self._retries = 0

    def delay(self, node, f):
        self._retries += 1
        if self._retries > self.maxRetries:
            return f # give up and let the caller see the failure
        self._delay = self._delay * self.factor
        self._delay = random.normalvariate(self._delay,
                                           self._delay * self.jitter)
        d = defer.Deferred()
        reactor.callLater(self._delay, d.callback, None)
        return d
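
# Sketch of how a caller might plug a backoff agent into modify(); 'node' and
# 'apply_delta' are hypothetical placeholders, not names defined in this file:
#
#   agent = BackoffAgent()
#   d = node.modify(apply_delta, backoffer=agent.delay)
#
# Each UncoordinatedWriteError then waits roughly e times longer (plus jitter)
# than the previous attempt before the modification is retried, and the loop
# gives up after maxRetries attempts.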

# use nodemaker.create_mutable_file() to make one of these

class MutableFileNode:
    implements(IMutableFileNode, ICheckable)

    def __init__(self, storage_broker, secret_holder,
                 default_encoding_parameters, history):
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._default_encoding_parameters = default_encoding_parameters
        self._history = history
        self._pubkey = None # filled in upon first read
        self._privkey = None # filled in if we're mutable
        # we keep track of the last encoding parameters that we use. These
        # are updated upon retrieve, and used by publish. If we publish
        # without ever reading (i.e. overwrite()), then we use these values.
        self._required_shares = default_encoding_parameters["k"]
        self._total_shares = default_encoding_parameters["n"]
        self._sharemap = {} # known shares, shnum-to-[nodeids]
        self._cache = ResponseCache()
        self._most_recent_size = None

        # all users of this MutableFileNode go through the serializer. This
        # takes advantage of the fact that Deferreds discard the callbacks
        # that they're done with, so we can keep using the same Deferred
        # forever without consuming more and more memory.
        self._serializer = defer.succeed(None)
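        # For example, each public entry point is funneled through the
        # serializer, along the lines of:
        #   return self._do_serialized(self._download_best_version)
        # so overlapping operations on the same node run one at a time.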

    def __repr__(self):
        if hasattr(self, '_uri'):
            return "<%s %x %s %s>" % (self.__class__.__name__, id(self), self.is_readonly() and 'RO' or 'RW', self._uri.abbrev())
        else:
            return "<%s %x %s %s>" % (self.__class__.__name__, id(self), None, None)

    def init_from_cap(self, filecap):
        # we have the URI, but we have not yet retrieved the public
        # verification key, nor things like 'k' or 'N'. If and when someone
        # wants to get our contents, we'll pull from shares and fill those
        # in.
        assert isinstance(filecap, (ReadonlySSKFileURI, WriteableSSKFileURI))
        self._uri = filecap
        self._writekey = None
        if isinstance(filecap, WriteableSSKFileURI):
            self._writekey = self._uri.writekey
        self._readkey = self._uri.readkey
        self._storage_index = self._uri.storage_index
        self._fingerprint = self._uri.fingerprint
        # the following values are learned during Retrieval
        #  self._pubkey
        #  self._required_shares
        #  self._total_shares
        # and these are needed for Publish. They are filled in by Retrieval
        # if possible, otherwise by the first peer that Publish talks to.
        self._privkey = None
        self._encprivkey = None
        return self

    def create_with_keys(self, (pubkey, privkey), contents):
        """Call this to create a brand-new mutable file. It will create the
        shares, find homes for them, and upload the initial contents (created
        with the same rules as IClient.create_mutable_file()). Returns a
        Deferred that fires (with the MutableFileNode instance you should
        use) when it completes.
        """
        self._pubkey, self._privkey = pubkey, privkey
        pubkey_s = self._pubkey.serialize()
        privkey_s = self._privkey.serialize()
        self._writekey = hashutil.ssk_writekey_hash(privkey_s)
        self._encprivkey = self._encrypt_privkey(self._writekey, privkey_s)
        self._fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
        self._uri = WriteableSSKFileURI(self._writekey, self._fingerprint)
        self._readkey = self._uri.readkey
        self._storage_index = self._uri.storage_index
        initial_contents = self._get_initial_contents(contents)
        return self._upload(initial_contents, None)

    def _get_initial_contents(self, contents):
        if isinstance(contents, str):
            return contents
        if contents is None:
            return ""
        assert callable(contents), "%s should be callable, not %s" % \
               (contents, type(contents))
        return contents(self)

    def _encrypt_privkey(self, writekey, privkey):
        enc = AES(writekey)
        crypttext = enc.process(privkey)
        return crypttext

    def _decrypt_privkey(self, enc_privkey):
        enc = AES(self._writekey)
        privkey = enc.process(enc_privkey)
        return privkey

    def _populate_pubkey(self, pubkey):
        self._pubkey = pubkey
    def _populate_required_shares(self, required_shares):
        self._required_shares = required_shares
    def _populate_total_shares(self, total_shares):
        self._total_shares = total_shares

    def _populate_privkey(self, privkey):
        self._privkey = privkey
    def _populate_encprivkey(self, encprivkey):
        self._encprivkey = encprivkey
    def _add_to_cache(self, verinfo, shnum, offset, data, timestamp):
        self._cache.add(verinfo, shnum, offset, data, timestamp)
    def _read_from_cache(self, verinfo, shnum, offset, length):
        return self._cache.read(verinfo, shnum, offset, length)

    def get_write_enabler(self, peerid):
        assert len(peerid) == 20
        return hashutil.ssk_write_enabler_hash(self._writekey, peerid)
    def get_renewal_secret(self, peerid):
        assert len(peerid) == 20
        crs = self._secret_holder.get_renewal_secret()
        frs = hashutil.file_renewal_secret_hash(crs, self._storage_index)
        return hashutil.bucket_renewal_secret_hash(frs, peerid)
    def get_cancel_secret(self, peerid):
        assert len(peerid) == 20
        ccs = self._secret_holder.get_cancel_secret()
        fcs = hashutil.file_cancel_secret_hash(ccs, self._storage_index)
        return hashutil.bucket_cancel_secret_hash(fcs, peerid)

    def get_writekey(self):
        return self._writekey
    def get_readkey(self):
        return self._readkey
    def get_storage_index(self):
        return self._storage_index
    def get_fingerprint(self):
        return self._fingerprint
    def get_privkey(self):
        return self._privkey
    def get_encprivkey(self):
        return self._encprivkey
    def get_pubkey(self):
        return self._pubkey

    def get_required_shares(self):
        return self._required_shares
    def get_total_shares(self):
        return self._total_shares

    ####################################

    def get_size(self):
        return self._most_recent_size
    def get_current_size(self):
        d = self.get_size_of_best_version()
        d.addCallback(self._stash_size)
        return d
    def _stash_size(self, size):
        self._most_recent_size = size
        return size

    def get_readcap(self):
        return self._uri.get_readonly()
    def get_verify_cap(self):
        return self._uri.get_verify_cap()
    def get_repair_cap(self):
        if self._uri.is_readonly():
            return None
        return self._uri

    def get_uri(self):
        return self._uri.to_string()
    def get_readonly_uri(self):
        return self._uri.get_readonly().to_string()

    def get_readonly(self):
        if self.is_readonly():
            return self
        ro = MutableFileNode(self._storage_broker, self._secret_holder,
                             self._default_encoding_parameters, self._history)
        ro.init_from_cap(self._uri.get_readonly())
        return ro

    def is_mutable(self):
        return self._uri.is_mutable()
    def is_readonly(self):
        return self._uri.is_readonly()

    def __hash__(self):
        return hash((self.__class__, self._uri))
    def __cmp__(self, them):
        if cmp(type(self), type(them)):
            return cmp(type(self), type(them))
        if cmp(self.__class__, them.__class__):
            return cmp(self.__class__, them.__class__)
        return cmp(self._uri, them._uri)

    def _do_serialized(self, cb, *args, **kwargs):
        # note: to avoid deadlock, this callable is *not* allowed to invoke
        # other serialized methods within this (or any other)
        # MutableFileNode. The callable should be a bound method of this same
        # MutableFileNode instance.
        d = defer.Deferred()
        self._serializer.addCallback(lambda ignore: cb(*args, **kwargs))
        # we need to put off d.callback until this Deferred is finished being
        # processed. Otherwise the caller's subsequent activities (like,
        # doing other things with this node) can cause reentrancy problems in
        # the Deferred code itself
        self._serializer.addBoth(lambda res: eventually(d.callback, res))
        # add a log.err just in case something really weird happens, because
        # self._serializer stays around forever, therefore we won't see the
        # usual Unhandled Error in Deferred that would give us a hint.
        self._serializer.addErrback(log.err)
        return d

    #################################

    def check(self, monitor, verify=False, add_lease=False):
        checker = MutableChecker(self, self._storage_broker,
                                 self._history, monitor)
        return checker.check(verify, add_lease)

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        checker = MutableCheckAndRepairer(self, self._storage_broker,
                                          self._history, monitor)
        return checker.check(verify, add_lease)

    #################################

    def repair(self, check_results, force=False):
        assert ICheckResults(check_results)
        r = Repairer(self, check_results)
        d = r.start(force)
        return d

    #################################

    def download_best_version(self):
        return self._do_serialized(self._download_best_version)
    def _download_best_version(self):
        servermap = ServerMap()
        d = self._try_once_to_download_best_version(servermap, MODE_READ)
        def _maybe_retry(f):
            f.trap(NotEnoughSharesError)
            # the download is worth retrying once. Make sure to use the
            # old servermap, since it is what remembers the bad shares,
            # but use MODE_WRITE to make it look for even more shares.
            # TODO: consider allowing this to retry multiple times.. this
            # approach will let us tolerate about 8 bad shares, I think.
            return self._try_once_to_download_best_version(servermap,
                                                           MODE_WRITE)
        d.addErrback(_maybe_retry)
        return d
    def _try_once_to_download_best_version(self, servermap, mode):
        d = self._update_servermap(servermap, mode)
        d.addCallback(self._once_updated_download_best_version, servermap)
        return d
    def _once_updated_download_best_version(self, ignored, servermap):
        goal = servermap.best_recoverable_version()
        if not goal:
            raise UnrecoverableFileError("no recoverable versions")
        return self._try_once_to_download_version(servermap, goal)

    def get_size_of_best_version(self):
        d = self.get_servermap(MODE_READ)
        def _got_servermap(smap):
            ver = smap.best_recoverable_version()
            if not ver:
                raise UnrecoverableFileError("no recoverable version")
            return smap.size_of_version(ver)
        d.addCallback(_got_servermap)
        return d

    def overwrite(self, new_contents):
        return self._do_serialized(self._overwrite, new_contents)
    def _overwrite(self, new_contents):
        servermap = ServerMap()
        d = self._update_servermap(servermap, mode=MODE_WRITE)
        d.addCallback(lambda ignored: self._upload(new_contents, servermap))
        return d

    def modify(self, modifier, backoffer=None):
        """I use a modifier callback to apply a change to the mutable file.
        I implement the following pseudocode::

         obtain_mutable_filenode_lock()
         first_time = True
         while True:
             update_servermap(MODE_WRITE)
             old = retrieve_best_version()
             new = modifier(old, servermap, first_time)
             first_time = False
             if new == old: break
             try:
                 publish(new)
             except UncoordinatedWriteError, e:
                 backoffer(e)
                 continue
             break
         release_mutable_filenode_lock()

        The idea is that your modifier function can apply a delta of some
        sort, and it will be re-run as necessary until it succeeds. The
        modifier must inspect the old version to see whether its delta has
        already been applied: if so it should return the contents unmodified.

        Note that the modifier is required to run synchronously, and must not
        invoke any methods on this MutableFileNode instance.

        The backoffer is a callable that is responsible for inserting a
        random delay between subsequent attempts, to keep competing updates
        from colliding forever. It is also allowed to give up after a while.
        The backoffer is given two arguments: this MutableFileNode, and the
        Failure object that contains the UncoordinatedWriteError. It should
        return a Deferred that will fire when the next attempt should be
        made, or return the Failure if the loop should give up. If
        backoffer=None, a default one is provided which will perform
        exponential backoff, and give up after 4 tries. Note that the
        backoffer should not invoke any methods on this MutableFileNode
        instance, and it needs to be highly conscious of deadlock issues.
        """
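        # As an illustrative sketch (not code from this module), a modifier
        # that appends a line exactly once might look like the following,
        # where 'node' and the appended text are hypothetical:
        #
        #   def add_line(old_contents, servermap, first_time):
        #       if old_contents.endswith("new line\n"):
        #           return None   # delta already applied, publish nothing
        #       return old_contents + "new line\n"
        #
        #   d = node.modify(add_line)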
        return self._do_serialized(self._modify, modifier, backoffer)
    def _modify(self, modifier, backoffer):
        servermap = ServerMap()
        if backoffer is None:
            backoffer = BackoffAgent().delay
        return self._modify_and_retry(servermap, modifier, backoffer, True)
    def _modify_and_retry(self, servermap, modifier, backoffer, first_time):
        d = self._modify_once(servermap, modifier, first_time)
        def _retry(f):
            f.trap(UncoordinatedWriteError)
            d2 = defer.maybeDeferred(backoffer, self, f)
            d2.addCallback(lambda ignored:
                           self._modify_and_retry(servermap, modifier,
                                                  backoffer, False))
            return d2
        d.addErrback(_retry)
        return d
    def _modify_once(self, servermap, modifier, first_time):
        d = self._update_servermap(servermap, MODE_WRITE)
        d.addCallback(self._once_updated_download_best_version, servermap)
        def _apply(old_contents):
            new_contents = modifier(old_contents, servermap, first_time)
            if new_contents is None or new_contents == old_contents:
                # no changes need to be made
                if first_time:
                    return
                # However, since Publish is not automatically doing a
                # recovery when it observes UCWE, we need to do a second
                # publish. See #551 for details. We'll basically loop until
                # we manage an uncontested publish.
                new_contents = old_contents
            precondition(isinstance(new_contents, str),
                         "Modifier function must return a string or None")
            return self._upload(new_contents, servermap)
        d.addCallback(_apply)
        return d

    def get_servermap(self, mode):
        return self._do_serialized(self._get_servermap, mode)
    def _get_servermap(self, mode):
        servermap = ServerMap()
        return self._update_servermap(servermap, mode)
    def _update_servermap(self, servermap, mode):
        u = ServermapUpdater(self, self._storage_broker, Monitor(), servermap,
                             mode)
        if self._history:
            self._history.notify_mapupdate(u.get_status())
        return u.update()

    def download_version(self, servermap, version, fetch_privkey=False):
        return self._do_serialized(self._try_once_to_download_version,
                                   servermap, version, fetch_privkey)
    def _try_once_to_download_version(self, servermap, version,
                                      fetch_privkey=False):
        r = Retrieve(self, servermap, version, fetch_privkey)
        if self._history:
            self._history.notify_retrieve(r.get_status())
        d = r.download()
        d.addCallback(self._downloaded_version)
        return d
    def _downloaded_version(self, data):
        self._most_recent_size = len(data)
        return data

    def upload(self, new_contents, servermap):
        return self._do_serialized(self._upload, new_contents, servermap)
    def _upload(self, new_contents, servermap):
        assert self._pubkey, "update_servermap must be called before publish"
        p = Publish(self, self._storage_broker, servermap)
        if self._history:
            self._history.notify_publish(p.get_status(), len(new_contents))
        d = p.publish(new_contents)
        d.addCallback(self._did_upload, len(new_contents))
        return d
    def _did_upload(self, res, size):
        self._most_recent_size = size
        return res