2 import os.path, stat, time
3 from zope.interface import implements
4 from twisted.application import service
5 from twisted.internet import defer
6 from foolscap import Referenceable
7 from foolscap.eventual import eventually
8 from allmydata import upload, interfaces, storage, uri
9 from allmydata.util import idlib, log, observer, fileutil, hashutil
12 class NotEnoughWritersError(Exception):
# Raised by AskUntilSuccessMixin.call() (below) once every remote assisted
# uploader has failed and been removed, so there is nobody left to ask.
# NOTE(review): the class body (original lines 13-15, presumably just a
# docstring or "pass") is missing from this elided listing.
16 class CHKCheckerAndUEBFetcher:
17 """I check to see if a file is already present in the grid. I also fetch
18 the URI Extension Block, which is useful for an uploading client who
19 wants to avoid the work of encryption and encoding.
21 I return False if the file is not completely healthy: i.e. if there are
22 less than 'N' shares present.
24 If the file is completely healthy, I return a tuple of (sharemap,
# NOTE(review): the class docstring is cut off here (original lines 25-27
# are elided); judging from _done() below it presumably finishes describing
# the (sharemap, ueb_data, ueb_hash) tuple -- confirm against the full file.
28 def __init__(self, peer_getter, storage_index, logparent=None):
# peer_getter: callable yielding (peerid, storage-server rref) pairs; it is
# invoked as peer_getter("storage", storage_index) in _get_all_shareholders.
29 self._peer_getter = peer_getter
# set of share numbers seen in any get_buckets response
30 self._found_shares = set()
31 self._storage_index = storage_index
# NOTE(review): original lines 32-35 are elided; the rest of the class
# reads self._sharemap, self._readers and self._ueb_data, so those are
# presumably initialized in the gap -- verify against the full file.
36 self._logparent = logparent
38 def log(self, *args, **kwargs):
# Log via log.msg, defaulting 'facility' and 'parent' so callers need not
# pass them on every call.
39 if 'facility' not in kwargs:
40 kwargs['facility'] = "tahoe.helper.chk.checkandUEBfetch"
41 if 'parent' not in kwargs:
42 kwargs['parent'] = self._logparent
43 return log.msg(*args, **kwargs)
# NOTE(review): the 'def' header for this method (original lines 44-45) is
# elided; it is presumably the public check() entry point. Pipeline: gather
# buckets from all peers, fetch one UEB, then summarize in _done().
46 d = self._get_all_shareholders(self._storage_index)
47 d.addCallback(self._get_uri_extension)
48 d.addCallback(self._done)
51 def _get_all_shareholders(self, storage_index):
# Ask every storage server for its buckets for this storage index.
# NOTE(review): 'dl' is returned below but never built in the visible
# lines (original lines 52 and 57 are elided) -- presumably a list that
# each per-peer Deferred is appended to.
53 for (peerid, ss) in self._peer_getter("storage", storage_index):
54 d = ss.callRemote("get_buckets", storage_index)
55 d.addCallbacks(self._got_response, self._got_error,
56 callbackArgs=(peerid,))
58 return defer.DeferredList(dl)
60 def _got_response(self, buckets, peerid):
61 # buckets is a dict: maps shnum to an rref of the server who holds it
62 shnums_s = ",".join([str(shnum) for shnum in buckets])
63 self.log("got_response: [%s] has %d shares (%s)" %
64 (idlib.shortnodeid_b2a(peerid), len(buckets), shnums_s),
# (closing arguments of this log call, original line 65, are elided)
66 self._found_shares.update(buckets.keys())
# Record which peers hold each share number.
# NOTE(review): the loop header binding 'k' (original line 67, presumably
# 'for k in buckets:') is elided from this listing.
68 if k not in self._sharemap:
69 self._sharemap[k] = []
70 self._sharemap[k].append(peerid)
# remember (bucket-rref, peerid) pairs so _get_uri_extension can pick one
71 self._readers.update( [ (bucket, peerid)
72 for bucket in buckets.values() ] )
74 def _got_error(self, f):
# A failed get_buckets just means that peer contributes no shares: log
# the failure and carry on.
77 log.err(f, parent=self._logparent)
80 def _get_uri_extension(self, res):
81 # assume that we can pull the UEB from any share. If we get an error,
82 # declare the whole file unavailable.
# NOTE(review): the guard before this log line (original line 83,
# presumably 'if not self._readers:') is elided.
84 self.log("no readers, so no UEB", level=log.NOISY)
86 b,peerid = self._readers.pop()
87 rbp = storage.ReadBucketProxy(b, peerid,
88 storage.si_b2a(self._storage_index))
89 d = rbp.startIfNecessary()
90 d.addCallback(lambda res: rbp.get_uri_extension())
91 d.addCallback(self._got_uri_extension)
92 d.addErrback(self._ueb_error)
95 def _got_uri_extension(self, ueb):
# Hash the raw UEB string before unpacking, so we keep both the
# verifiable hash and the parsed dict.
96 self.log("_got_uri_extension", level=log.NOISY)
97 self._ueb_hash = hashutil.uri_extension_hash(ueb)
98 self._ueb_data = uri.unpack_extension(ueb)
100 def _ueb_error(self, f):
101 # an error means the file is unavailable, but the overall check
# (this comment continues on elided original line 102 -- presumably
# "...should not fail", since the error is only logged here)
103 self.log("UEB fetch failed", failure=f, level=log.WEIRD)
106 def _done(self, res):
# Decide whether the file counts as "present in the grid": all of the
# UEB's total_shares must have been observed.
# NOTE(review): an 'if self._ueb_data:' guard (original line 107) appears
# to be elided; without it the subscript below would raise when the UEB
# fetch failed. Also elided: the 'if found < total:' test (line 112) and
# the level= tail of several log calls.
108 found = len(self._found_shares)
109 total = self._ueb_data['total_shares']
110 self.log(format="got %(found)d shares of %(total)d",
111 found=found, total=total, level=log.NOISY)
113 # not all shares are present in the grid
114 self.log("not enough to qualify, file not found in grid",
117 # all shares are present
118 self.log("all shares present, file is found in grid",
120 return (self._sharemap, self._ueb_data, self._ueb_hash)
121 # no shares are present
122 self.log("unable to find UEB data, file not found in grid",
127 class CHKUploadHelper(Referenceable, upload.CHKUploader):
128 """I am the helper-server -side counterpart to AssistedUploader. I handle
129 peer selection, encoding, and share pushing. I read ciphertext from the
130 remote AssistedUploader.
# NOTE(review): the docstring's closing quotes (original line 131) are
# elided from this listing.
132 implements(interfaces.RICHKUploadHelper)
134 def __init__(self, storage_index, helper,
135 incoming_file, encoding_file,
136 results, log_number):
# incoming_file: partial-ciphertext path; encoding_file: the path the
# ciphertext is renamed to once complete (see CHKCiphertextFetcher).
137 self._storage_index = storage_index
138 self._helper = helper
139 self._incoming_file = incoming_file
140 self._encoding_file = encoding_file
# short human-readable id for log messages only
141 upload_id = storage.si_b2a(storage_index)[:5]
142 self._log_number = log_number
143 self._results = results
144 self._upload_status = upload.UploadStatus()
# False: this upload (the helper's own) is not itself helped
145 self._upload_status.set_helper(False)
146 self._helper.log("CHKUploadHelper starting for SI %s" % upload_id,
# (remaining args of this log call, original lines 147-148, are elided)
149 self._client = helper.parent
150 self._fetcher = CHKCiphertextFetcher(self, incoming_file, encoding_file,
# (trailing logparent argument, original line 151, is elided)
152 self._reader = LocalCiphertextReader(self, storage_index, encoding_file)
153 self._finished_observers = observer.OneShotObserverList()
# Chain: fetch all ciphertext to disk, then read it locally, encode and
# push shares, then report success or failure to the waiting client.
155 d = self._fetcher.when_done()
156 d.addCallback(lambda res: self._reader.start())
157 d.addCallback(lambda res: self.start_encrypted(self._reader))
158 d.addCallback(self._finished)
159 d.addErrback(self._failed)
161 def log(self, *args, **kwargs):
# default the log facility, then delegate to the base-class logger
162 if 'facility' not in kwargs:
163 kwargs['facility'] = "tahoe.helper.chk"
164 return upload.CHKUploader.log(self, *args, **kwargs)
# NOTE(review): the 'def' header for this method (original line 166,
# presumably 'def start(self):') is elided from this listing.
167 self._started = time.time()
168 # determine if we need to upload the file. If so, return ({},self) .
169 # If not, return (UploadResults,None) .
170 self.log("deciding whether to upload the file or not", level=log.NOISY)
171 if os.path.exists(self._encoding_file):
172 # we have the whole file, and we might be encoding it (or the
173 # encode/upload might have failed, and we need to restart it).
174 self.log("ciphertext already in place", level=log.UNUSUAL)
175 return (self._results, self)
176 if os.path.exists(self._incoming_file):
177 # we have some of the file, but not all of it (otherwise we'd be
178 # encoding). The caller might be useful.
179 self.log("partial ciphertext already present", level=log.UNUSUAL)
180 return (self._results, self)
181 # we don't remember uploading this file
182 self.log("no ciphertext yet", level=log.NOISY)
183 return (self._results, self)
185 def remote_upload(self, reader):
186 # reader is an RIEncryptedUploadable. I am specified to return an
187 # UploadResults dictionary.
189 # let our fetcher pull ciphertext from the reader.
190 self._fetcher.add_reader(reader)
# also give the local reader a remote fallback for hash-tree queries
192 self._reader.add_reader(reader)
194 # and inform the client when the upload has finished
195 return self._finished_observers.when_fired()
197 def _finished(self, res):
# res comes from start_encrypted(); fill in the UploadResults, clean up
# the on-disk ciphertext, and wake everyone waiting on when_fired().
198 (uri_extension_hash, needed_shares, total_shares, size) = res
# NOTE(review): 'r' is used below but its binding (original line 199,
# presumably 'r = self._results') is elided from this listing.
200 r.uri_extension_hash = uri_extension_hash
201 f_times = self._fetcher.get_times()
202 r.timings["cumulative_fetch"] = f_times["cumulative_fetch"]
203 r.ciphertext_fetched = self._fetcher.get_ciphertext_fetched()
204 r.timings["total_fetch"] = f_times["total"]
# the encoded upload succeeded, so the staged ciphertext is no longer needed
206 os.unlink(self._encoding_file)
207 self._finished_observers.fire(r)
208 self._helper.upload_finished(self._storage_index)
211 def _failed(self, f):
# propagate the Failure to waiting clients and deregister this upload
212 self._finished_observers.fire(f)
213 self._helper.upload_finished(self._storage_index)
216 class AskUntilSuccessMixin:
217 # create me with a _reader array
# Mixin: issues a remote call against the first available reader; on
# failure, drops that reader and retries with the next one, until either
# a call succeeds or no readers remain (NotEnoughWritersError).
# NOTE(review): the initialization of self._readers / self._last_failure
# (original lines 218-219) is elided from this listing.
220 def add_reader(self, reader):
221 self._readers.append(reader)
223 def call(self, *args, **kwargs):
224 if not self._readers:
225 raise NotEnoughWritersError("ran out of assisted uploaders, last failure was %s" % self._last_failure)
226 rr = self._readers[0]
227 d = rr.callRemote(*args, **kwargs)
# NOTE(review): the errback closure's 'def' header (original line 228,
# presumably 'def _err(f):') is elided; the indented block below is its
# body, and the trailing 'd.addErrback(_err)' / 'return d' (original
# lines ~236-237) are elided as well.
229 self._last_failure = f
230 if rr in self._readers:
231 self._readers.remove(rr)
232 self._upload_helper.log("call to assisted uploader %s failed" % rr,
233 failure=f, level=log.UNUSUAL)
234 # we can try again with someone else who's left
235 return self.call(*args, **kwargs)
239 class CHKCiphertextFetcher(AskUntilSuccessMixin):
240 """I use one or more remote RIEncryptedUploadable instances to gather
241 ciphertext on disk. When I'm done, the file I create can be used by a
242 LocalCiphertextReader to satisfy the ciphertext needs of a CHK upload
# (docstring line continuing "...operation.", original lines 243-244, elided)
245 I begin pulling ciphertext as soon as a reader is added. I remove readers
246 when they have any sort of error. If the last reader is removed, I fire
247 my when_done() Deferred with a failure.
249 I fire my when_done() Deferred (with None) immediately after I have moved
250 the ciphertext to 'encoded_file'.
# NOTE(review): the docstring's closing quotes (original lines 251-252)
# are elided from this listing.
253 def __init__(self, helper, incoming_file, encoded_file, logparent):
254 self._upload_helper = helper
# ciphertext is appended to incoming_file, then renamed to encoding_file
# once complete (see _done below)
255 self._incoming_file = incoming_file
256 self._encoding_file = encoded_file
257 self._log_parent = logparent
258 self._done_observers = observer.OneShotObserverList()
260 self._started = False
# NOTE(review): the dict literal holding this timing entry (original
# lines ~261-265, presumably 'self._times = {...}' including a "total"
# key, read by get_times/_loop/_done2) is partially elided here.
263 "cumulative_fetch": 0.0,
266 self._ciphertext_fetched = 0
268 def log(self, *args, **kwargs):
# default facility and parent before delegating to log.msg
269 if "facility" not in kwargs:
270 kwargs["facility"] = "tahoe.helper.chkupload.fetch"
271 if "parent" not in kwargs:
272 kwargs["parent"] = self._log_parent
273 return log.msg(*args, **kwargs)
275 def add_reader(self, reader):
276 AskUntilSuccessMixin.add_reader(self, reader)
# kick off the fetch on the next reactor turn, not synchronously
277 eventually(self._start)
# NOTE(review): the 'def' header for this method (original lines ~279-282,
# presumably 'def _start(self):' plus a started-already guard) is elided.
283 started = time.time()
285 if os.path.exists(self._encoding_file):
286 self.log("ciphertext already present, bypassing fetch",
# (level= tail of this log call, original line 287, is elided)
288 # we'll still need the plaintext hashes (when
289 # LocalCiphertextReader.get_plaintext_hashtree_leaves() is
290 # called), and currently the easiest way to get them is to ask
291 # the sender for the last byte of ciphertext. That will provoke
292 # them into reading and hashing (but not sending) everything
# (comment tail, original line 293, is elided)
294 have = os.stat(self._encoding_file)[stat.ST_SIZE]
295 d = self.call("read_encrypted", have-1, 1)
296 d.addCallback(self._done2, started)
# (presumably 'return' plus blank line, original lines 297-298, elided)
299 # first, find out how large the file is going to be
300 d = self.call("get_size")
301 d.addCallback(self._got_size)
302 d.addCallback(self._start_reading)
303 d.addCallback(self._done)
304 d.addCallback(self._done2, started)
305 d.addErrback(self._failed)
307 def _got_size(self, size):
# remember the remote file size; _fetch uses it to know when to stop
308 self.log("total size is %d bytes" % size, level=log.NOISY)
309 self._expected_size = size
311 def _start_reading(self, res):
312 # then find out how much crypttext we have on disk
313 if os.path.exists(self._incoming_file):
314 self._have = os.stat(self._incoming_file)[stat.ST_SIZE]
315 self.log("we already have %d bytes" % self._have, level=log.NOISY)
# NOTE(review): the 'else:' branch header and 'self._have = 0'
# (original lines 316-317) are elided from this listing.
318 self.log("we do not have any ciphertext yet", level=log.NOISY)
319 self.log("starting ciphertext fetch", level=log.NOISY)
# append mode, so a partially-fetched file is resumed, not clobbered
320 self._f = open(self._incoming_file, "ab")
322 # now loop to pull the data from the readers
325 # this Deferred will be fired once the last byte has been written to
# NOTE(review): the code creating that Deferred and starting _loop()
# (original lines ~323-328) is elided here.
329 # read data in 50kB chunks. We should choose a more considered number
330 # here, possibly letting the client specify it. The goal should be to
331 # keep the RTT*bandwidth to be less than 10% of the chunk size, to reduce
332 # the upload bandwidth lost because this protocol is non-windowing. Too
333 # large, however, means more memory consumption for both ends. Something
334 # that can be transferred in, say, 10 seconds sounds about right. On my
335 # home DSL line (50kBps upstream), that suggests 500kB. Most lines are
336 # slower, maybe 10kBps, which suggests 100kB, and that's a bit more
337 # memory than I want to hang on to, so I'm going to go with 50kB and see
# NOTE(review): the CHUNK_SIZE class attribute this comment describes
# (original lines ~338-339) is elided; _fetch reads self.CHUNK_SIZE below.
341 def _loop(self, fire_when_done):
342 # this slightly weird structure is needed because Deferreds don't do
343 # tail-recursion, so it is important to let each one retire promptly.
344 # Simply chaining them will cause a stack overflow at the end of a
345 # transfer that involves more than a few hundred chunks.
346 # 'fire_when_done' lives a long time, but the Deferreds returned by
347 # the inner _fetch() call do not.
# NOTE(review): 'start = time.time()' (original line 348, read by the
# elapsed computation below) is elided.
349 d = defer.maybeDeferred(self._fetch)
# NOTE(review): the 'def _done(finished):' closure header (original line
# 350) is elided; the block below is its body, keyed on _fetch()'s
# True-means-done return value.
351 elapsed = time.time() - start
352 self._times["cumulative_fetch"] += elapsed
354 self.log("finished reading ciphertext", level=log.NOISY)
355 fire_when_done.callback(None)
# (else-branch header, original line 356, elided)
357 self._loop(fire_when_done)
# NOTE(review): the 'def _err(f):' closure header (original line 358) is
# elided; the two lines below are its body.
359 self.log("ciphertext read failed", failure=f, level=log.UNUSUAL)
360 fire_when_done.errback(f)
361 d.addCallbacks(_done, _err)
# NOTE(review): 'def _fetch(self):' (original lines ~363-364) is elided;
# the block below is that method: returns True when all expected bytes
# are on disk, otherwise fetches one chunk and returns False.
365 needed = self._expected_size - self._have
366 fetch_size = min(needed, self.CHUNK_SIZE)
# (the 'if fetch_size == 0:' guard, original line 367, is elided)
368 return True # all done
369 self.log(format="fetching %(start)d-%(end)d of %(total)d",
# (the start=self._have kwarg line, original line 370, is elided)
371 end=self._have+fetch_size,
372 total=self._expected_size,
# (the level= tail of this log call, original line 373, is elided)
374 d = self.call("read_encrypted", self._have, fetch_size)
375 def _got_data(ciphertext_v):
# read_encrypted returns a list of strings; write each and advance
376 for data in ciphertext_v:
# (the self._f.write(data) line, original line 377, is elided)
378 self._have += len(data)
379 self._ciphertext_fetched += len(data)
380 return False # not done
381 d.addCallback(_got_data)
384 def _done(self, res):
# all ciphertext is on disk: close the file and atomically promote it
# from the incoming area to the encoding area.
# NOTE(review): 'self._f.close()' (original lines ~385-386) appears to be
# elided here, ahead of the stat/rename below.
387 self.log(format="done fetching ciphertext, size=%(size)d",
388 size=os.stat(self._incoming_file)[stat.ST_SIZE],
# (level= tail of this log call, original line 389, is elided)
390 os.rename(self._incoming_file, self._encoding_file)
392 def _done2(self, _ignored, started):
# record total elapsed fetch time and wake when_done() observers
393 self.log("done2", level=log.NOISY)
394 elapsed = time.time() - started
395 self._times["total"] = elapsed
397 self._done_observers.fire(None)
399 def _failed(self, f):
# propagate the Failure to when_done() observers
403 self._done_observers.fire(f)
# NOTE(review): the 'def when_done(self):' header (original lines
# ~405) is elided; the return below is its body.
406 return self._done_observers.when_fired()
# NOTE(review): 'def get_times(self): return self._times' (original lines
# ~408-409) appears to be elided between here and get_ciphertext_fetched.
411 def get_ciphertext_fetched(self):
412 return self._ciphertext_fetched
415 class LocalCiphertextReader(AskUntilSuccessMixin):
# Serves ciphertext for the encoder from the local staged file, while
# forwarding hash-tree and encoding-parameter queries to the remote
# uploader(s) via AskUntilSuccessMixin.call().
416 implements(interfaces.IEncryptedUploadable)
418 def __init__(self, upload_helper, storage_index, encoding_file):
# NOTE(review): original line 419 (presumably the mixin's __init__ or a
# status attribute) is elided from this listing.
420 self._upload_helper = upload_helper
421 self._storage_index = storage_index
422 self._encoding_file = encoding_file
425 def set_upload_status(self, upload_status):
426 self._status = interfaces.IUploadStatus(upload_status)
# NOTE(review): the 'def start(self):' header (original lines ~428) is
# elided; the two lines below are its body: measure and open the staged
# ciphertext file.
429 self._size = os.stat(self._encoding_file)[stat.ST_SIZE]
430 self.f = open(self._encoding_file, "rb")
# NOTE(review): the 'def get_size(self):' header (original line ~432) is
# elided; the return below is its body.
433 return defer.succeed(self._size)
435 def get_all_encoding_parameters(self):
# not known locally; ask the remote uploader
436 return self.call("get_all_encoding_parameters")
438 def get_storage_index(self):
439 return defer.succeed(self._storage_index)
441 def read_encrypted(self, length, hash_only):
# served from the local file; hash_only mode is never used here
442 assert hash_only is False
443 d = defer.maybeDeferred(self.f.read, length)
# IEncryptedUploadable returns a list of strings, not a bare string
444 d.addCallback(lambda data: [data])
# (the 'return d' line, original line 445, is elided)
446 def get_plaintext_hashtree_leaves(self, first, last, num_segments):
447 return self.call("get_plaintext_hashtree_leaves", first, last,
# (the num_segments argument line, original line 448, is elided)
449 def get_plaintext_hash(self):
450 return self.call("get_plaintext_hash")
# NOTE(review): the 'def close(self):' header and local file close
# (original lines ~451-452) are elided; the lines below are its body.
453 # ??. I'm not sure if it makes sense to forward the close message.
454 return self.call("close")
458 class Helper(Referenceable, service.MultiService):
459 implements(interfaces.RIHelper)
460 # this is the non-distributed version. When we need to have multiple
461 # helpers, this object will become the HelperCoordinator, and will query
462 # the farm of Helpers to see if anyone has the storage_index of interest,
463 # and send the request off to them. If nobody has it, we'll choose a
# (comment tail, original lines 464-466, is elided from this listing)
# overridable so tests can substitute an instrumented upload helper
467 chk_upload_helper_class = CHKUploadHelper
469 def __init__(self, basedir):
470 self._basedir = basedir
# two staging areas: partially-fetched ciphertext lives in CHK_incoming,
# complete ciphertext awaiting encode in CHK_encoding
471 self._chk_incoming = os.path.join(basedir, "CHK_incoming")
472 self._chk_encoding = os.path.join(basedir, "CHK_encoding")
473 fileutil.make_dirs(self._chk_incoming)
474 fileutil.make_dirs(self._chk_encoding)
# storage_index -> CHKUploadHelper, for deduplicating concurrent uploads
475 self._active_uploads = {}
476 service.MultiService.__init__(self)
478 def log(self, *args, **kwargs):
# default the facility, then delegate to the parent node's logger
479 if 'facility' not in kwargs:
480 kwargs['facility'] = "tahoe.helper"
481 return self.parent.log(*args, **kwargs)
483 def remote_upload_chk(self, storage_index):
# Entry point for assisted CHK uploads: reuse an active helper if one
# exists, otherwise check whether the file is already in the grid, and
# only create a new upload helper when it is not.
484 r = upload.UploadResults()
485 started = time.time()
486 si_s = storage.si_b2a(storage_index)
487 lp = self.log(format="helper: upload_chk query for SI %(si)s", si=si_s)
488 incoming_file = os.path.join(self._chk_incoming, si_s)
489 encoding_file = os.path.join(self._chk_encoding, si_s)
490 if storage_index in self._active_uploads:
491 self.log("upload is currently active", parent=lp)
492 uh = self._active_uploads[storage_index]
# NOTE(review): the 'return uh.start()' for this branch (original lines
# 493-494) is elided from this listing.
495 d = self._check_for_chk_already_in_grid(storage_index, r, lp)
496 def _checked(already_present):
497 elapsed = time.time() - started
498 r.timings['existence_check'] = elapsed
# (the 'if already_present:' guard, original line 499, is elided)
500 # the necessary results are placed in the UploadResults
501 self.log("file already found in grid", parent=lp)
# (the '(r, None)' return for this branch, original lines 502-503, elided)
504 # the file is not present in the grid, by which we mean there are
505 # less than 'N' shares available.
506 self.log("unable to find file in the grid", parent=lp,
# (level= tail of this log call, original line 507, is elided)
508 # We need an upload helper. Check our active uploads again in
509 # case there was a race.
510 if storage_index in self._active_uploads:
511 self.log("upload is currently active", parent=lp)
512 uh = self._active_uploads[storage_index]
# (the 'else:' branch header, original line 513, is elided)
514 self.log("creating new upload helper", parent=lp)
515 uh = self.chk_upload_helper_class(storage_index, self,
516 incoming_file, encoding_file,
# (the 'r, lp' tail of this constructor call, original line 517, elided)
518 self._active_uploads[storage_index] = uh
# (the 'return uh.start()' line, original line 519, is elided)
520 d.addCallback(_checked)
# NOTE(review): the 'def _err(f):' errback header (original line 521) is
# elided; the lines below are its body, plus presumably an addErrback
# and 'return d' on the elided lines ~524-526.
522 self.log("error while checking for chk-already-in-grid",
523 failure=f, level=log.WEIRD, parent=lp)
528 def _check_for_chk_already_in_grid(self, storage_index, results, lp):
529 # see if this file is already in the grid
530 lp2 = self.log("doing a quick check+UEBfetch",
531 parent=lp, level=log.NOISY)
532 c = CHKCheckerAndUEBFetcher(self.parent.get_permuted_peers,
# NOTE(review): the rest of this constructor call, the 'd = c.check()'
# line, and the '_checked' closure header (original lines 533-536) are
# elided; the block below is that closure's body. It fires with False
# (not found) or the (sharemap, ueb_data, ueb_hash) tuple.
537 (sharemap, ueb_data, ueb_hash) = res
538 self.log("found file in grid", level=log.NOISY, parent=lp)
539 results.uri_extension_hash = ueb_hash
# human-readable per-share location strings for the upload report
540 results.sharemap = {}
541 for shnum, peerids in sharemap.items():
542 peers_s = ",".join(["[%s]" % idlib.shortnodeid_b2a(peerid)
543 for peerid in peerids])
544 results.sharemap[shnum] = "Found on " + peers_s
545 results.uri_extension_data = ueb_data
546 results.preexisting_shares = len(sharemap)
# nothing was uploaded: the grid already had every share
547 results.pushed_shares = 0
# (the closure's return and the False branch, original lines 548-549, elided)
550 d.addCallback(_checked)
# (the 'return d' line, original line ~551, is elided)
553 def upload_finished(self, storage_index):
# called by CHKUploadHelper on success or failure; drops the
# deduplication entry so a later request starts fresh
554 del self._active_uploads[storage_index]