import os, stat, time, weakref
from zope.interface import implements
from twisted.application import service
from twisted.internet import defer
from foolscap.api import Referenceable, DeadReferenceError, eventually
import allmydata # for __full_version__
from allmydata import interfaces, uri
from allmydata.storage.server import si_b2a
from allmydata.immutable import upload
from allmydata.immutable.layout import ReadBucketProxy
from allmydata.util.assertutil import precondition
from allmydata.util import idlib, log, observer, fileutil, hashutil, dictutil


class NotEnoughWritersError(Exception):
    pass


class CHKCheckerAndUEBFetcher:
    """I check to see if a file is already present in the grid. I also fetch
    the URI Extension Block, which is useful for an uploading client who
    wants to avoid the work of encryption and encoding.

    I return False if the file is not completely healthy: i.e. if there are
    fewer than 'N' shares present.

    If the file is completely healthy, I return a tuple of (sharemap,
    UEB_data, UEB_hash).
    """

    def __init__(self, peer_getter, storage_index, logparent=None):
        self._peer_getter = peer_getter
        self._found_shares = set()
        self._storage_index = storage_index
        self._sharemap = dictutil.DictOfSets()
        self._readers = set()
        self._ueb_hash = None
        self._ueb_data = None
        self._logparent = logparent

    def log(self, *args, **kwargs):
        if 'facility' not in kwargs:
            kwargs['facility'] = "tahoe.helper.chk.checkandUEBfetch"
        if 'parent' not in kwargs:
            kwargs['parent'] = self._logparent
        return log.msg(*args, **kwargs)

    def check(self):
        d = self._get_all_shareholders(self._storage_index)
        d.addCallback(self._get_uri_extension)
        d.addCallback(self._done)
        return d

    def _get_all_shareholders(self, storage_index):
        dl = []
        for (peerid, ss) in self._peer_getter(storage_index):
            d = ss.callRemote("get_buckets", storage_index)
            d.addCallbacks(self._got_response, self._got_error,
                           callbackArgs=(peerid,))
            dl.append(d)
        return defer.DeferredList(dl)

    def _got_response(self, buckets, peerid):
        # buckets is a dict: maps shnum to an rref of the server who holds it
        shnums_s = ",".join([str(shnum) for shnum in buckets])
        self.log("got_response: [%s] has %d shares (%s)" %
                 (idlib.shortnodeid_b2a(peerid), len(buckets), shnums_s),
                 level=log.NOISY)
        self._found_shares.update(buckets.keys())
        for k in buckets:
            self._sharemap.add(k, peerid)
        self._readers.update( [ (bucket, peerid)
                                for bucket in buckets.values() ] )

    def _got_error(self, f):
        if f.check(DeadReferenceError):
            return
        log.err(f, parent=self._logparent)

    def _get_uri_extension(self, res):
        # assume that we can pull the UEB from any share. If we get an error,
        # declare the whole file unavailable.
        if not self._readers:
            self.log("no readers, so no UEB", level=log.NOISY)
            return
        b, peerid = self._readers.pop()
        rbp = ReadBucketProxy(b, peerid, si_b2a(self._storage_index))
        d = rbp.get_uri_extension()
        d.addCallback(self._got_uri_extension)
        d.addErrback(self._ueb_error)
        return d

    def _got_uri_extension(self, ueb):
        self.log("_got_uri_extension", level=log.NOISY)
        self._ueb_hash = hashutil.uri_extension_hash(ueb)
        self._ueb_data = uri.unpack_extension(ueb)

    def _ueb_error(self, f):
        # an error means the file is unavailable, but the overall check
        # should not fail.
        self.log("UEB fetch failed", failure=f, level=log.WEIRD, umid="sJLKVg")
        return None

    def _done(self, res):
        if self._ueb_data:
            found = len(self._found_shares)
            total = self._ueb_data['total_shares']
            self.log(format="got %(found)d shares of %(total)d",
                     found=found, total=total, level=log.NOISY)
            if found < total:
                # not all shares are present in the grid
                self.log("not enough to qualify, file not found in grid",
                         level=log.NOISY)
                return False
            # all shares are present
            self.log("all shares present, file is found in grid",
                     level=log.NOISY)
            return (self._sharemap, self._ueb_data, self._ueb_hash)
        # no shares are present
        self.log("unable to find UEB data, file not found in grid",
                 level=log.NOISY)
        return False
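

# Illustrative usage sketch (not part of the original module): a caller
# drives the checker much as _check_for_chk_already_in_grid() does below,
# assuming it already holds a storage broker `sb` and a binary storage
# index `si` (both names here are hypothetical).
#
#   c = CHKCheckerAndUEBFetcher(sb.get_servers_for_index, si)
#   d = c.check()
#   def _checked(res):
#       if res: # healthy: res is a (sharemap, UEB_data, UEB_hash) tuple
#           (sharemap, ueb_data, ueb_hash) = res
#       else:   # res is False: fewer than N shares were found
#           pass
#   d.addCallback(_checked)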


class CHKUploadHelper(Referenceable, upload.CHKUploader):
    """I am the helper-server-side counterpart to AssistedUploader. I handle
    peer selection, encoding, and share pushing. I read ciphertext from the
    remote AssistedUploader.
    """
    implements(interfaces.RICHKUploadHelper)
    VERSION = { "http://allmydata.org/tahoe/protocols/helper/chk-upload/v1" :
                 { },
                "application-version": str(allmydata.__full_version__),
                }

    def __init__(self, storage_index, helper,
                 incoming_file, encoding_file,
                 results, log_number):
        self._storage_index = storage_index
        self._helper = helper
        self._incoming_file = incoming_file
        self._encoding_file = encoding_file
        self._upload_id = si_b2a(storage_index)[:5]
        self._log_number = log_number
        self._results = results
        self._upload_status = upload.UploadStatus()
        self._upload_status.set_helper(False)
        self._upload_status.set_storage_index(storage_index)
        self._upload_status.set_status("fetching ciphertext")
        self._upload_status.set_progress(0, 1.0)
        self._helper.log("CHKUploadHelper starting for SI %s" % self._upload_id,
                         parent=log_number)

        self._client = helper.parent
        self._fetcher = CHKCiphertextFetcher(self, incoming_file, encoding_file,
                                             self._log_number)
        self._reader = LocalCiphertextReader(self, storage_index, encoding_file)
        self._finished_observers = observer.OneShotObserverList()

        d = self._fetcher.when_done()
        d.addCallback(lambda res: self._reader.start())
        d.addCallback(lambda res: self.start_encrypted(self._reader))
        d.addCallback(self._finished)
        d.addErrback(self._failed)

    def log(self, *args, **kwargs):
        if 'facility' not in kwargs:
            kwargs['facility'] = "tahoe.helper.chk"
        return upload.CHKUploader.log(self, *args, **kwargs)

    def start(self):
        self._started = time.time()
        # determine if we need to upload the file. If so, return ({}, self).
        # If not, return (UploadResults, None).
        self.log("deciding whether to upload the file or not", level=log.NOISY)
        if os.path.exists(self._encoding_file):
            # we have the whole file, and we might be encoding it (or the
            # encode/upload might have failed, and we need to restart it).
            self.log("ciphertext already in place", level=log.UNUSUAL)
            return (self._results, self)
        if os.path.exists(self._incoming_file):
            # we have some of the file, but not all of it (otherwise we'd be
            # encoding). The caller might be useful.
            self.log("partial ciphertext already present", level=log.UNUSUAL)
            return (self._results, self)
        # we don't remember uploading this file
        self.log("no ciphertext yet", level=log.NOISY)
        return (self._results, self)

    def remote_get_version(self):
        return self.VERSION

    def remote_upload(self, reader):
        # reader is an RIEncryptedUploadable. I am specified to return an
        # UploadResults dictionary.

        # let our fetcher pull ciphertext from the reader.
        self._fetcher.add_reader(reader)
        # and also hashes
        self._reader.add_reader(reader)

        # and inform the client when the upload has finished
        return self._finished_observers.when_fired()

    def _finished(self, uploadresults):
        precondition(isinstance(uploadresults.verifycapstr, str), uploadresults.verifycapstr)
        assert interfaces.IUploadResults.providedBy(uploadresults), uploadresults
        r = uploadresults
        v = uri.from_string(r.verifycapstr)
        r.uri_extension_hash = v.uri_extension_hash
        f_times = self._fetcher.get_times()
        r.timings["cumulative_fetch"] = f_times["cumulative_fetch"]
        r.ciphertext_fetched = self._fetcher.get_ciphertext_fetched()
        r.timings["total_fetch"] = f_times["total"]
        self._reader.close()
        os.unlink(self._encoding_file)
        self._finished_observers.fire(r)
        self._helper.upload_finished(self._storage_index, v.size)
        del self._reader

    def _failed(self, f):
        self.log(format="CHKUploadHelper(%(si)s) failed",
                 si=si_b2a(self._storage_index)[:5],
                 failure=f,
                 level=log.UNUSUAL)
        self._finished_observers.fire(f)
        self._helper.upload_finished(self._storage_index, 0)
        del self._reader
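

# Illustrative sketch (not part of the original module): seen from the
# client, the helper protocol is two foolscap calls. `helper_rref` (an
# RIHelper reference) and `eu` (an RIEncryptedUploadable provider) are
# hypothetical names.
#
#   d = helper_rref.callRemote("upload_chk", storage_index)
#   def _got(res):
#       (upload_results, upload_helper) = res
#       if upload_helper is None:
#           return upload_results # the file was already in the grid
#       return upload_helper.callRemote("upload", eu)
#   d.addCallback(_got)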


class AskUntilSuccessMixin:
    # create me with a _readers array
    _last_failure = None

    def add_reader(self, reader):
        self._readers.append(reader)

    def call(self, *args, **kwargs):
        if not self._readers:
            raise NotEnoughWritersError("ran out of assisted uploaders, last failure was %s" % self._last_failure)
        rr = self._readers[0]
        d = rr.callRemote(*args, **kwargs)
        def _err(f):
            self._last_failure = f
            if rr in self._readers:
                self._readers.remove(rr)
            self._upload_helper.log("call to assisted uploader %s failed" % rr,
                                    failure=f, level=log.UNUSUAL)
            # we can try again with someone else who's left
            return self.call(*args, **kwargs)
        d.addErrback(_err)
        return d
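

# Illustrative sketch (not part of the original module): a subclass supplies
# the _readers list and an _upload_helper to log against, then routes its
# remote calls through call(). A reader that fails is dropped and the call
# is retried against whoever remains. All names below are hypothetical.
#
#   class ExampleFetcher(AskUntilSuccessMixin):
#       def __init__(self, upload_helper):
#           self._readers = []
#           self._upload_helper = upload_helper
#       def fetch_size(self):
#           return self.call("get_size") # retries across readers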


class CHKCiphertextFetcher(AskUntilSuccessMixin):
    """I use one or more remote RIEncryptedUploadable instances to gather
    ciphertext on disk. When I'm done, the file I create can be used by a
    LocalCiphertextReader to satisfy the ciphertext needs of a CHK upload
    process.

    I begin pulling ciphertext as soon as a reader is added. I remove readers
    when they have any sort of error. If the last reader is removed, I fire
    my when_done() Deferred with a failure.

    I fire my when_done() Deferred (with None) immediately after I have moved
    the ciphertext to 'encoded_file'.
    """

    def __init__(self, helper, incoming_file, encoded_file, logparent):
        self._upload_helper = helper
        self._incoming_file = incoming_file
        self._encoding_file = encoded_file
        self._upload_id = helper._upload_id
        self._log_parent = logparent
        self._done_observers = observer.OneShotObserverList()
        self._readers = []
        self._started = False
        self._f = None
        self._times = {
            "cumulative_fetch": 0.0,
            "total": 0.0,
            }
        self._ciphertext_fetched = 0

    def log(self, *args, **kwargs):
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.helper.chkupload.fetch"
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_parent
        return log.msg(*args, **kwargs)

    def add_reader(self, reader):
        AskUntilSuccessMixin.add_reader(self, reader)
        eventually(self._start)

    def _start(self):
        if self._started:
            return
        self._started = True
        started = time.time()

        if os.path.exists(self._encoding_file):
            self.log("ciphertext already present, bypassing fetch",
                     level=log.UNUSUAL)
            # we'll still need the plaintext hashes (when
            # LocalCiphertextReader.get_plaintext_hashtree_leaves() is
            # called), and currently the easiest way to get them is to ask
            # the sender for the last byte of ciphertext. That will provoke
            # them into reading and hashing (but not sending) everything
            # else.
            have = os.stat(self._encoding_file)[stat.ST_SIZE]
            d = self.call("read_encrypted", have-1, 1)
            d.addCallback(self._done2, started)
            return

        # first, find out how large the file is going to be
        d = self.call("get_size")
        d.addCallback(self._got_size)
        d.addCallback(self._start_reading)
        d.addCallback(self._done)
        d.addCallback(self._done2, started)
        d.addErrback(self._failed)

    def _got_size(self, size):
        self.log("total size is %d bytes" % size, level=log.NOISY)
        self._upload_helper._upload_status.set_size(size)
        self._expected_size = size

    def _start_reading(self, res):
        # then find out how much crypttext we have on disk
        if os.path.exists(self._incoming_file):
            self._have = os.stat(self._incoming_file)[stat.ST_SIZE]
            self._upload_helper._helper.count("chk_upload_helper.resumes")
            self.log("we already have %d bytes" % self._have, level=log.NOISY)
        else:
            self._have = 0
            self.log("we do not have any ciphertext yet", level=log.NOISY)
        self.log("starting ciphertext fetch", level=log.NOISY)
        self._f = open(self._incoming_file, "ab")

        # now loop to pull the data from the readers
        d = defer.Deferred()
        self._loop(d)
        # this Deferred will be fired once the last byte has been written to
        # self._f
        return d

    # read data in 50kB chunks. We should choose a more considered number
    # here, possibly letting the client specify it. The goal should be to
    # keep the RTT*bandwidth to be less than 10% of the chunk size, to reduce
    # the upload bandwidth lost because this protocol is non-windowing. Too
    # large, however, means more memory consumption for both ends. Something
    # that can be transferred in, say, 10 seconds sounds about right. On my
    # home DSL line (50kBps upstream), that suggests 500kB. Most lines are
    # slower, maybe 10kBps, which suggests 100kB, and that's a bit more
    # memory than I want to hang on to, so I'm going to go with 50kB and see
    # how it works out.
    CHUNK_SIZE = 50*1024
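
    # A worked example of the 10% goal above (illustrative numbers, not
    # measurements): on a 10kBps upstream with a 200ms RTT, one round trip
    # idles 10,000 B/s * 0.2 s = 2kB of link capacity. With 50kB chunks,
    # that is 2/50 = 4% of each chunk, so this non-windowing protocol
    # wastes at most ~4% of the available upstream bandwidth.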

    def _loop(self, fire_when_done):
        # this slightly weird structure is needed because Deferreds don't do
        # tail-recursion, so it is important to let each one retire promptly.
        # Simply chaining them will cause a stack overflow at the end of a
        # transfer that involves more than a few hundred chunks.
        # 'fire_when_done' lives a long time, but the Deferreds returned by
        # the inner _fetch() call do not.
        start = time.time()
        d = defer.maybeDeferred(self._fetch)
        def _done(finished):
            elapsed = time.time() - start
            self._times["cumulative_fetch"] += elapsed
            if finished:
                self.log("finished reading ciphertext", level=log.NOISY)
                fire_when_done.callback(None)
            else:
                self._loop(fire_when_done)
        def _err(f):
            self.log(format="[%(si)s] ciphertext read failed",
                     si=self._upload_id, failure=f, level=log.UNUSUAL)
            fire_when_done.errback(f)
        d.addCallbacks(_done, _err)
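
    # Illustrative sketch (not part of the original module) of the pattern
    # described above: each iteration schedules its successor from a
    # callback and then lets its own short-lived Deferred retire, so the
    # recursion depth stays constant however many chunks are transferred.
    # `do_one_chunk` is a hypothetical worker returning a Deferred that
    # fires with True when there is nothing left to do.
    #
    #   def loop(fire_when_done):
    #       d = defer.maybeDeferred(do_one_chunk)
    #       def _next(finished):
    #           if finished:
    #               fire_when_done.callback(None)
    #           else:
    #               loop(fire_when_done) # new Deferred, old one retires
    #       d.addCallbacks(_next, fire_when_done.errback)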

    def _fetch(self):
        needed = self._expected_size - self._have
        fetch_size = min(needed, self.CHUNK_SIZE)
        if fetch_size == 0:
            self._upload_helper._upload_status.set_progress(1, 1.0)
            return True # all done
        percent = 0.0
        if self._expected_size:
            percent = 1.0 * (self._have+fetch_size) / self._expected_size
        self.log(format="fetching [%(si)s] %(start)d-%(end)d of %(total)d (%(percent)d%%)",
                 si=self._upload_id,
                 start=self._have,
                 end=self._have+fetch_size,
                 total=self._expected_size,
                 percent=int(100.0*percent),
                 level=log.NOISY)
        d = self.call("read_encrypted", self._have, fetch_size)
        def _got_data(ciphertext_v):
            for data in ciphertext_v:
                self._f.write(data)
                self._have += len(data)
                self._ciphertext_fetched += len(data)
                self._upload_helper._helper.count("chk_upload_helper.fetched_bytes", len(data))
                self._upload_helper._upload_status.set_progress(1, percent)
            return False # not done
        d.addCallback(_got_data)
        return d

    def _done(self, res):
        self._f.close()
        self._f = None
        self.log(format="done fetching ciphertext, size=%(size)d",
                 size=os.stat(self._incoming_file)[stat.ST_SIZE],
                 level=log.NOISY)
        os.rename(self._incoming_file, self._encoding_file)

    def _done2(self, _ignored, started):
        self.log("done2", level=log.NOISY)
        elapsed = time.time() - started
        self._times["total"] = elapsed
        self._readers = []
        self._done_observers.fire(None)

    def _failed(self, f):
        if self._f:
            self._f.close()
        self._readers = []
        self._done_observers.fire(f)

    def when_done(self):
        return self._done_observers.when_fired()

    def get_times(self):
        return self._times

    def get_ciphertext_fetched(self):
        return self._ciphertext_fetched


class LocalCiphertextReader(AskUntilSuccessMixin):
    implements(interfaces.IEncryptedUploadable)

    def __init__(self, upload_helper, storage_index, encoding_file):
        self._readers = []
        self._upload_helper = upload_helper
        self._storage_index = storage_index
        self._encoding_file = encoding_file
        self._status = None

    def start(self):
        self._upload_helper._upload_status.set_status("pushing")
        self._size = os.stat(self._encoding_file)[stat.ST_SIZE]
        self.f = open(self._encoding_file, "rb")

    def get_size(self):
        return defer.succeed(self._size)

    def get_all_encoding_parameters(self):
        return self.call("get_all_encoding_parameters")

    def get_storage_index(self):
        return defer.succeed(self._storage_index)

    def read_encrypted(self, length, hash_only):
        # hash_only is not supported: this reader always returns real data
        assert hash_only is False
        d = defer.maybeDeferred(self.f.read, length)
        d.addCallback(lambda data: [data])
        return d

    def close(self):
        self.f.close()
        # ??. I'm not sure if it makes sense to forward the close message.
        return self.call("close")


class Helper(Referenceable, service.MultiService):
    implements(interfaces.RIHelper, interfaces.IStatsProducer)
    # this is the non-distributed version. When we need to have multiple
    # helpers, this object will become the HelperCoordinator, and will query
    # the farm of Helpers to see if anyone has the storage_index of interest,
    # and send the request off to them. If nobody has it, we'll choose a
    # helper at random.

    name = "helper"
    VERSION = { "http://allmydata.org/tahoe/protocols/helper/v1" :
                 { },
                "application-version": str(allmydata.__full_version__),
                }
    chk_upload_helper_class = CHKUploadHelper
    MAX_UPLOAD_STATUSES = 10

    def __init__(self, basedir, stats_provider=None):
        self._basedir = basedir
        self._chk_incoming = os.path.join(basedir, "CHK_incoming")
        self._chk_encoding = os.path.join(basedir, "CHK_encoding")
        fileutil.make_dirs(self._chk_incoming)
        fileutil.make_dirs(self._chk_encoding)
        self._active_uploads = {}
        self._all_uploads = weakref.WeakKeyDictionary() # for debugging
        self._all_upload_statuses = weakref.WeakKeyDictionary()
        self._recent_upload_statuses = []
        self.stats_provider = stats_provider
        if stats_provider:
            stats_provider.register_producer(self)
        self._counters = {"chk_upload_helper.upload_requests": 0,
                          "chk_upload_helper.upload_already_present": 0,
                          "chk_upload_helper.upload_need_upload": 0,
                          "chk_upload_helper.resumes": 0,
                          "chk_upload_helper.fetched_bytes": 0,
                          "chk_upload_helper.encoded_bytes": 0,
                          }
        service.MultiService.__init__(self)

    def setServiceParent(self, parent):
        service.MultiService.setServiceParent(self, parent)

    def log(self, *args, **kwargs):
        if 'facility' not in kwargs:
            kwargs['facility'] = "tahoe.helper"
        return self.parent.log(*args, **kwargs)

    def count(self, key, value=1):
        if self.stats_provider:
            self.stats_provider.count(key, value)
        self._counters[key] += value

    def get_stats(self):
        OLD = 86400*2 # 48 hours
        now = time.time()
        inc_count = inc_size = inc_size_old = 0
        enc_count = enc_size = enc_size_old = 0
        inc = os.listdir(self._chk_incoming)
        enc = os.listdir(self._chk_encoding)
        for f in inc:
            s = os.stat(os.path.join(self._chk_incoming, f))
            size = s[stat.ST_SIZE]
            mtime = s[stat.ST_MTIME]
            inc_count += 1
            inc_size += size
            if now - mtime > OLD:
                inc_size_old += size
        for f in enc:
            s = os.stat(os.path.join(self._chk_encoding, f))
            size = s[stat.ST_SIZE]
            mtime = s[stat.ST_MTIME]
            enc_count += 1
            enc_size += size
            if now - mtime > OLD:
                enc_size_old += size
        stats = { 'chk_upload_helper.active_uploads': len(self._active_uploads),
                  'chk_upload_helper.incoming_count': inc_count,
                  'chk_upload_helper.incoming_size': inc_size,
                  'chk_upload_helper.incoming_size_old': inc_size_old,
                  'chk_upload_helper.encoding_count': enc_count,
                  'chk_upload_helper.encoding_size': enc_size,
                  'chk_upload_helper.encoding_size_old': enc_size_old,
                  }
        stats.update(self._counters)
        return stats

    def remote_get_version(self):
        return self.VERSION

    def remote_upload_chk(self, storage_index):
        self.count("chk_upload_helper.upload_requests")
        r = upload.UploadResults()
        started = time.time()
        si_s = si_b2a(storage_index)
        lp = self.log(format="helper: upload_chk query for SI %(si)s", si=si_s)
        incoming_file = os.path.join(self._chk_incoming, si_s)
        encoding_file = os.path.join(self._chk_encoding, si_s)
        if storage_index in self._active_uploads:
            self.log("upload is currently active", parent=lp)
            uh = self._active_uploads[storage_index]
            return uh.start()

        d = self._check_for_chk_already_in_grid(storage_index, r, lp)
        def _checked(already_present):
            elapsed = time.time() - started
            r.timings['existence_check'] = elapsed
            if already_present:
                # the necessary results are placed in the UploadResults
                self.count("chk_upload_helper.upload_already_present")
                self.log("file already found in grid", parent=lp)
                return (r, None)

            self.count("chk_upload_helper.upload_need_upload")
            # the file is not present in the grid, by which we mean there are
            # fewer than 'N' shares available.
            self.log("unable to find file in the grid", parent=lp,
                     level=log.NOISY)
            # We need an upload helper. Check our active uploads again in
            # case there was a race.
            if storage_index in self._active_uploads:
                self.log("upload is currently active", parent=lp)
                uh = self._active_uploads[storage_index]
            else:
                self.log("creating new upload helper", parent=lp)
                uh = self.chk_upload_helper_class(storage_index, self,
                                                  incoming_file, encoding_file,
                                                  r, lp)
                self._active_uploads[storage_index] = uh
                self._add_upload(uh)
            return uh.start()
        d.addCallback(_checked)
        def _err(f):
            self.log("error while checking for chk-already-in-grid",
                     failure=f, level=log.WEIRD, parent=lp, umid="jDtxZg")
            return f
        d.addErrback(_err)
        return d

    def _check_for_chk_already_in_grid(self, storage_index, results, lp):
        # see if this file is already in the grid
        lp2 = self.log("doing a quick check+UEBfetch",
                       parent=lp, level=log.NOISY)
        sb = self.parent.get_storage_broker()
        c = CHKCheckerAndUEBFetcher(sb.get_servers_for_index, storage_index, lp2)
        d = c.check()
        def _checked(res):
            if res:
                (sharemap, ueb_data, ueb_hash) = res
                self.log("found file in grid", level=log.NOISY, parent=lp)
                results.uri_extension_hash = ueb_hash
                results.sharemap = sharemap
                results.uri_extension_data = ueb_data
                results.preexisting_shares = len(sharemap)
                results.pushed_shares = 0
                return True
            return False
        d.addCallback(_checked)
        return d

    def _add_upload(self, uh):
        self._all_uploads[uh] = None
        s = uh.get_upload_status()
        self._all_upload_statuses[s] = None
        self._recent_upload_statuses.append(s)
        while len(self._recent_upload_statuses) > self.MAX_UPLOAD_STATUSES:
            self._recent_upload_statuses.pop(0)

    def upload_finished(self, storage_index, size):
        # this is called with size=0 if the upload failed
        self.count("chk_upload_helper.encoded_bytes", size)
        uh = self._active_uploads[storage_index]
        del self._active_uploads[storage_index]
        s = uh.get_upload_status()
        s.set_active(False)

    def get_all_upload_statuses(self):
        return self._all_upload_statuses
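

# Illustrative sketch (not part of the original module): a node offering the
# helper service would create a Helper, attach it to its service tree, and
# register it with its foolscap Tub so clients can reach it by FURL. The
# `client` and `tub` names and the file layout are hypothetical.
#
#   h = Helper(os.path.join(basedir, "helper"), stats_provider)
#   h.setServiceParent(client)
#   furlfile = os.path.join(basedir, "private", "helper.furl")
#   helper_furl = tub.registerReference(h, furlFile=furlfile)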