2 import os, stat, time, weakref
3 from zope.interface import implements
4 from twisted.internet import defer
5 from foolscap.api import Referenceable, DeadReferenceError, eventually
6 import allmydata # for __full_version__
7 from allmydata import interfaces, uri
8 from allmydata.storage.server import si_b2a
9 from allmydata.immutable import upload
10 from allmydata.immutable.layout import ReadBucketProxy
11 from allmydata.util.assertutil import precondition
12 from allmydata.util import log, observer, fileutil, hashutil, dictutil
# Raised by AskUntilSuccessMixin.call() (below) once every remote reader
# has failed and been dropped, so nobody is left to serve the request.
15 class NotEnoughWritersError(Exception):
# NOTE(review): this is a sampled listing -- rows are elided between the
# embedded line numbers; comments below describe only the visible code.
#
# Checks whether a CHK file is already fully present in the grid and, if
# so, fetches its URI Extension Block so an uploader can skip re-encoding.
19 class CHKCheckerAndUEBFetcher:
20 """I check to see if a file is already present in the grid. I also fetch
21 the URI Extension Block, which is useful for an uploading client who
22 wants to avoid the work of encryption and encoding.
24 I return False if the file is not completely healthy: i.e. if there are
25 less than 'N' shares present.
27 If the file is completely healthy, I return a tuple of (sharemap,
# peer_getter: callable mapping a storage index to candidate servers
#   (see Helper._check_chk, which passes sb.get_servers_for_psi).
# logparent: optional log entry to parent all of our messages under.
31 def __init__(self, peer_getter, storage_index, logparent=None):
32 self._peer_getter = peer_getter
33 self._found_shares = set()
34 self._storage_index = storage_index
35 self._sharemap = dictutil.DictOfSets()
39 self._logparent = logparent
# Log with our facility/parent unless the caller supplies their own.
41 def log(self, *args, **kwargs):
42 if 'facility' not in kwargs:
43 kwargs['facility'] = "tahoe.helper.chk.checkandUEBfetch"
44 if 'parent' not in kwargs:
45 kwargs['parent'] = self._logparent
46 return log.msg(*args, **kwargs)
# Pipeline: locate shareholders, then fetch one UEB, then summarize.
49 d = self._get_all_shareholders(self._storage_index)
50 d.addCallback(self._get_uri_extension)
51 d.addCallback(self._done)
# Ask every candidate server for its buckets; the DeferredList fires once
# all queries have either answered or errored.
54 def _get_all_shareholders(self, storage_index):
56 for s in self._peer_getter(storage_index):
57 d = s.get_rref().callRemote("get_buckets", storage_index)
58 d.addCallbacks(self._got_response, self._got_error,
61 return defer.DeferredList(dl)
63 def _got_response(self, buckets, server):
64 # buckets is a dict: maps shum to an rref of the server who holds it
65 shnums_s = ",".join([str(shnum) for shnum in buckets])
66 self.log("got_response: [%s] has %d shares (%s)" %
67 (server.get_name(), len(buckets), shnums_s),
# Record which share numbers exist, which serverid holds each one, and
# keep (bucket, server) pairs so _get_uri_extension can read from any.
69 self._found_shares.update(buckets.keys())
71 self._sharemap.add(k, server.get_serverid())
72 self._readers.update( [ (bucket, server)
73 for bucket in buckets.values() ] )
75 def _got_error(self, f):
# A DeadReferenceError is expected server churn; other failures are
# logged (the overall DeferredList still completes).
76 if f.check(DeadReferenceError):
78 log.err(f, parent=self._logparent)
81 def _get_uri_extension(self, res):
82 # assume that we can pull the UEB from any share. If we get an error,
83 # declare the whole file unavailable.
85 self.log("no readers, so no UEB", level=log.NOISY)
87 b,server = self._readers.pop()
88 rbp = ReadBucketProxy(b, server, si_b2a(self._storage_index))
89 d = rbp.get_uri_extension()
90 d.addCallback(self._got_uri_extension)
91 d.addErrback(self._ueb_error)
94 def _got_uri_extension(self, ueb):
95 self.log("_got_uri_extension", level=log.NOISY)
# Keep both the UEB hash (for integrity) and the unpacked dictionary.
96 self._ueb_hash = hashutil.uri_extension_hash(ueb)
97 self._ueb_data = uri.unpack_extension(ueb)
99 def _ueb_error(self, f):
100 # an error means the file is unavailable, but the overall check
102 self.log("UEB fetch failed", failure=f, level=log.WEIRD, umid="sJLKVg")
105 def _done(self, res):
# Compare shares actually found against the total the UEB declares;
# only a file with all 'total_shares' present qualifies as "in grid".
107 found = len(self._found_shares)
108 total = self._ueb_data['total_shares']
109 self.log(format="got %(found)d shares of %(total)d",
110 found=found, total=total, level=log.NOISY)
112 # not all shares are present in the grid
113 self.log("not enough to qualify, file not found in grid",
116 # all shares are present
117 self.log("all shares present, file is found in grid",
119 return (self._sharemap, self._ueb_data, self._ueb_hash)
120 # no shares are present
121 self.log("unable to find UEB data, file not found in grid",
# NOTE(review): sampled listing -- rows are elided between the embedded
# line numbers; comments describe only the visible code.
126 class CHKUploadHelper(Referenceable, upload.CHKUploader):
127 """I am the helper-server -side counterpart to AssistedUploader. I handle
128 peer selection, encoding, and share pushing. I read ciphertext from the
129 remote AssistedUploader.
131 implements(interfaces.RICHKUploadHelper)
# Version dictionary advertised to clients via remote_get_version().
132 VERSION = { "http://allmydata.org/tahoe/protocols/helper/chk-upload/v1" :
134 "application-version": str(allmydata.__full_version__),
137 def __init__(self, storage_index,
138 helper, storage_broker, secret_holder,
139 incoming_file, encoding_file,
141 self._storage_index = storage_index
142 self._helper = helper
143 self._incoming_file = incoming_file
144 self._encoding_file = encoding_file
# Short upload id (first 5 chars of the base32 SI) used in log messages.
145 self._upload_id = si_b2a(storage_index)[:5]
146 self._log_number = log_number
147 self._upload_status = upload.UploadStatus()
# NOTE(review): set_helper(False) here -- presumably the status is
# recorded from the helper's own point of view; confirm against
# upload.UploadStatus semantics.
148 self._upload_status.set_helper(False)
149 self._upload_status.set_storage_index(storage_index)
150 self._upload_status.set_status("fetching ciphertext")
151 self._upload_status.set_progress(0, 1.0)
152 self._helper.log("CHKUploadHelper starting for SI %s" % self._upload_id,
155 self._storage_broker = storage_broker
156 self._secret_holder = secret_holder
# The fetcher pulls ciphertext from remote readers into incoming_file;
# once complete it is renamed to encoding_file, which the local reader
# then serves to the encoder.
157 self._fetcher = CHKCiphertextFetcher(self, incoming_file, encoding_file,
159 self._reader = LocalCiphertextReader(self, storage_index, encoding_file)
160 self._finished_observers = observer.OneShotObserverList()
# Chain: wait for all ciphertext, open the local copy, encode+push,
# then report success (_finished) or failure (_failed).
162 self._started = time.time()
163 d = self._fetcher.when_done()
164 d.addCallback(lambda res: self._reader.start())
165 d.addCallback(lambda res: self.start_encrypted(self._reader))
166 d.addCallback(self._finished)
167 d.addErrback(self._failed)
169 def log(self, *args, **kwargs):
170 if 'facility' not in kwargs:
171 kwargs['facility'] = "tahoe.helper.chk"
172 return upload.CHKUploader.log(self, *args, **kwargs)
174 def remote_get_version(self):
177 def remote_upload(self, reader):
178 # reader is an RIEncryptedUploadable. I am specified to return an
179 # UploadResults dictionary.
181 # Log how much ciphertext we need to get.
182 self.log("deciding whether to upload the file or not", level=log.NOISY)
183 if os.path.exists(self._encoding_file):
184 # we have the whole file, and we might be encoding it (or the
185 # encode/upload might have failed, and we need to restart it).
186 self.log("ciphertext already in place", level=log.UNUSUAL)
187 elif os.path.exists(self._incoming_file):
188 # we have some of the file, but not all of it (otherwise we'd be
189 # encoding). The caller might be useful.
190 self.log("partial ciphertext already present", level=log.UNUSUAL)
192 # we don't remember uploading this file
193 self.log("no ciphertext yet", level=log.NOISY)
195 # let our fetcher pull ciphertext from the reader.
196 self._fetcher.add_reader(reader)
198 self._reader.add_reader(reader)
200 # and inform the client when the upload has finished
201 return self._finished_observers.when_fired()
# Translate the encoder's UploadResults into a HelperUploadResults that
# is safe to send back over the wire (serverids instead of server objects).
203 def _finished(self, ur):
204 assert interfaces.IUploadResults.providedBy(ur), ur
205 vcapstr = ur.get_verifycapstr()
206 precondition(isinstance(vcapstr, str), vcapstr)
207 v = uri.from_string(vcapstr)
208 f_times = self._fetcher.get_times()
210 hur = upload.HelperUploadResults()
211 hur.timings = {"cumulative_fetch": f_times["cumulative_fetch"],
212 "total_fetch": f_times["total"],
214 for key,val in ur.get_timings().items():
215 hur.timings[key] = val
216 hur.uri_extension_hash = v.uri_extension_hash
217 hur.ciphertext_fetched = self._fetcher.get_ciphertext_fetched()
218 hur.preexisting_shares = ur.get_preexisting_shares()
219 # hur.sharemap needs to be {shnum: set(serverid)}
221 for shnum, servers in ur.get_sharemap().items():
222 hur.sharemap[shnum] = set([s.get_serverid() for s in servers])
223 # and hur.servermap needs to be {serverid: set(shnum)}
225 for server, shnums in ur.get_servermap().items():
226 hur.servermap[server.get_serverid()] = set(shnums)
227 hur.pushed_shares = ur.get_pushed_shares()
228 hur.file_size = ur.get_file_size()
229 hur.uri_extension_data = ur.get_uri_extension_data()
230 hur.verifycapstr = vcapstr
# The encoding file is no longer needed once the upload succeeded.
233 os.unlink(self._encoding_file)
# Wake up every waiting remote_upload() caller, then let the Helper
# retire this upload from its active table.
234 self._finished_observers.fire(hur)
235 self._helper.upload_finished(self._storage_index, v.size)
238 def _failed(self, f):
239 self.log(format="CHKUploadHelper(%(si)s) failed",
240 si=si_b2a(self._storage_index)[:5],
# Propagate the failure to waiting callers; size=0 signals failure to
# the Helper's upload_finished bookkeeping.
243 self._finished_observers.fire(f)
244 self._helper.upload_finished(self._storage_index, 0)
# Failover mixin: forwards a remote call to the first reader in
# self._readers; on failure the reader is dropped and the call is retried
# with whoever remains. When the list is empty, NotEnoughWritersError is
# raised carrying the last failure seen.
247 class AskUntilSuccessMixin:
248 # create me with a _reader array
251 def add_reader(self, reader):
252 self._readers.append(reader)
254 def call(self, *args, **kwargs):
255 if not self._readers:
256 raise NotEnoughWritersError("ran out of assisted uploaders, last failure was %s" % self._last_failure)
# Always ask the current head of the list.
257 rr = self._readers[0]
258 d = rr.callRemote(*args, **kwargs)
# (error path) remember the failure, remove the dead reader, and
# recurse to try the next one.
260 self._last_failure = f
261 if rr in self._readers:
262 self._readers.remove(rr)
263 self._upload_helper.log("call to assisted uploader %s failed" % rr,
264 failure=f, level=log.UNUSUAL)
265 # we can try again with someone else who's left
266 return self.call(*args, **kwargs)
# NOTE(review): sampled listing -- rows are elided between the embedded
# line numbers; comments describe only the visible code.
270 class CHKCiphertextFetcher(AskUntilSuccessMixin):
271 """I use one or more remote RIEncryptedUploadable instances to gather
272 ciphertext on disk. When I'm done, the file I create can be used by a
273 LocalCiphertextReader to satisfy the ciphertext needs of a CHK upload
276 I begin pulling ciphertext as soon as a reader is added. I remove readers
277 when they have any sort of error. If the last reader is removed, I fire
278 my when_done() Deferred with a failure.
280 I fire my when_done() Deferred (with None) immediately after I have moved
281 the ciphertext to 'encoded_file'.
284 def __init__(self, helper, incoming_file, encoded_file, logparent):
285 self._upload_helper = helper
# incoming_file accumulates partial ciphertext; it is renamed to
# encoded_file once the fetch completes (see _done).
286 self._incoming_file = incoming_file
287 self._encoding_file = encoded_file
288 self._upload_id = helper._upload_id
289 self._log_parent = logparent
290 self._done_observers = observer.OneShotObserverList()
292 self._started = False
# Timing bookkeeping, reported later via get_times().
295 "cumulative_fetch": 0.0,
298 self._ciphertext_fetched = 0
300 def log(self, *args, **kwargs):
301 if "facility" not in kwargs:
302 kwargs["facility"] = "tahoe.helper.chkupload.fetch"
303 if "parent" not in kwargs:
304 kwargs["parent"] = self._log_parent
305 return log.msg(*args, **kwargs)
# New readers also kick off the fetch on the next reactor turn.
307 def add_reader(self, reader):
308 AskUntilSuccessMixin.add_reader(self, reader)
309 eventually(self._start)
314 started = time.time()
# Resume shortcut: if the fully-fetched file already exists, skip the
# download entirely (see comment below about the one-byte read).
317 if os.path.exists(self._encoding_file):
318 self.log("ciphertext already present, bypassing fetch",
320 # we'll still need the plaintext hashes (when
321 # LocalCiphertextReader.get_plaintext_hashtree_leaves() is
322 # called), and currently the easiest way to get them is to ask
323 # the sender for the last byte of ciphertext. That will provoke
324 # them into reading and hashing (but not sending) everything
326 have = os.stat(self._encoding_file)[stat.ST_SIZE]
327 d = self.call("read_encrypted", have-1, 1)
328 d.addCallback(self._done2, started)
331 # first, find out how large the file is going to be
332 d = self.call("get_size")
333 d.addCallback(self._got_size)
334 d.addCallback(self._start_reading)
335 d.addCallback(self._done)
336 d.addCallback(self._done2, started)
337 d.addErrback(self._failed)
339 def _got_size(self, size):
340 self.log("total size is %d bytes" % size, level=log.NOISY)
341 self._upload_helper._upload_status.set_size(size)
342 self._expected_size = size
344 def _start_reading(self, res):
345 # then find out how much crypttext we have on disk
346 if os.path.exists(self._incoming_file):
# Partial file present: resume from its current length and count the
# resume in the helper's stats.
347 self._have = os.stat(self._incoming_file)[stat.ST_SIZE]
348 self._upload_helper._helper.count("chk_upload_helper.resumes")
349 self.log("we already have %d bytes" % self._have, level=log.NOISY)
352 self.log("we do not have any ciphertext yet", level=log.NOISY)
353 self.log("starting ciphertext fetch", level=log.NOISY)
# Append mode so a resumed fetch continues where the file left off.
354 self._f = open(self._incoming_file, "ab")
356 # now loop to pull the data from the readers
359 # this Deferred will be fired once the last byte has been written to
363 # read data in 50kB chunks. We should choose a more considered number
364 # here, possibly letting the client specify it. The goal should be to
365 # keep the RTT*bandwidth to be less than 10% of the chunk size, to reduce
366 # the upload bandwidth lost because this protocol is non-windowing. Too
367 # large, however, means more memory consumption for both ends. Something
368 # that can be transferred in, say, 10 seconds sounds about right. On my
369 # home DSL line (50kBps upstream), that suggests 500kB. Most lines are
370 # slower, maybe 10kBps, which suggests 100kB, and that's a bit more
371 # memory than I want to hang on to, so I'm going to go with 50kB and see
375 def _loop(self, fire_when_done):
376 # this slightly weird structure is needed because Deferreds don't do
377 # tail-recursion, so it is important to let each one retire promptly.
378 # Simply chaining them will cause a stack overflow at the end of a
379 # transfer that involves more than a few hundred chunks.
380 # 'fire_when_done' lives a long time, but the Deferreds returned by
381 # the inner _fetch() call do not.
383 d = defer.maybeDeferred(self._fetch)
385 elapsed = time.time() - start
386 self._times["cumulative_fetch"] += elapsed
# _fetch returns True when all ciphertext has been written; otherwise
# schedule another iteration.
388 self.log("finished reading ciphertext", level=log.NOISY)
389 fire_when_done.callback(None)
391 self._loop(fire_when_done)
393 self.log(format="[%(si)s] ciphertext read failed",
394 si=self._upload_id, failure=f, level=log.UNUSUAL)
395 fire_when_done.errback(f)
396 d.addCallbacks(_done, _err)
# One chunk of work: returns True (done) or False (more to fetch).
400 needed = self._expected_size - self._have
401 fetch_size = min(needed, self.CHUNK_SIZE)
403 self._upload_helper._upload_status.set_progress(1, 1.0)
404 return True # all done
# Guard against division by zero when the expected size is 0/unknown.
406 if self._expected_size:
407 percent = 1.0 * (self._have+fetch_size) / self._expected_size
408 self.log(format="fetching [%(si)s] %(start)d-%(end)d of %(total)d (%(percent)d%%)",
411 end=self._have+fetch_size,
412 total=self._expected_size,
413 percent=int(100.0*percent),
415 d = self.call("read_encrypted", self._have, fetch_size)
# read_encrypted returns a list of strings; write each piece and
# advance our offset/statistics accordingly.
416 def _got_data(ciphertext_v):
417 for data in ciphertext_v:
419 self._have += len(data)
420 self._ciphertext_fetched += len(data)
421 self._upload_helper._helper.count("chk_upload_helper.fetched_bytes", len(data))
422 self._upload_helper._upload_status.set_progress(1, percent)
423 return False # not done
424 d.addCallback(_got_data)
427 def _done(self, res):
430 self.log(format="done fetching ciphertext, size=%(size)d",
431 size=os.stat(self._incoming_file)[stat.ST_SIZE],
# Atomically promote the completed partial file to the encoding file.
433 os.rename(self._incoming_file, self._encoding_file)
435 def _done2(self, _ignored, started):
436 self.log("done2", level=log.NOISY)
437 elapsed = time.time() - started
438 self._times["total"] = elapsed
# Notify everyone waiting on when_done() that the fetch succeeded.
440 self._done_observers.fire(None)
442 def _failed(self, f):
446 self._done_observers.fire(f)
449 return self._done_observers.when_fired()
454 def get_ciphertext_fetched(self):
455 return self._ciphertext_fetched
# Serves IEncryptedUploadable to the encoder from the fully-fetched
# on-disk ciphertext file; anything it cannot answer locally (encoding
# parameters, close) is forwarded to the remote reader via the
# AskUntilSuccessMixin failover call().
458 class LocalCiphertextReader(AskUntilSuccessMixin):
459 implements(interfaces.IEncryptedUploadable)
461 def __init__(self, upload_helper, storage_index, encoding_file):
463 self._upload_helper = upload_helper
464 self._storage_index = storage_index
465 self._encoding_file = encoding_file
# (start) ciphertext is complete at this point, so the upload moves to
# its "pushing" phase; size comes from the file on disk.
469 self._upload_helper._upload_status.set_status("pushing")
470 self._size = os.stat(self._encoding_file)[stat.ST_SIZE]
471 self.f = open(self._encoding_file, "rb")
474 return defer.succeed(self._size)
476 def get_all_encoding_parameters(self):
477 return self.call("get_all_encoding_parameters")
479 def get_storage_index(self):
480 return defer.succeed(self._storage_index)
# Sequential read: each call returns the next `length` bytes wrapped in
# a one-element list. hash_only is not supported locally.
482 def read_encrypted(self, length, hash_only):
483 assert hash_only is False
484 d = defer.maybeDeferred(self.f.read, length)
485 d.addCallback(lambda data: [data])
490 # ??. I'm not sure if it makes sense to forward the close message.
491 return self.call("close")
# NOTE(review): sampled listing -- rows are elided between the embedded
# line numbers, and the final method continues past this excerpt;
# comments describe only the visible code.
495 class Helper(Referenceable):
496 implements(interfaces.RIHelper, interfaces.IStatsProducer)
497 # this is the non-distributed version. When we need to have multiple
498 # helpers, this object will become the HelperCoordinator, and will query
499 # the farm of Helpers to see if anyone has the storage_index of interest,
500 # and send the request off to them. If nobody has it, we'll choose a
504 VERSION = { "http://allmydata.org/tahoe/protocols/helper/v1" :
506 "application-version": str(allmydata.__full_version__),
508 MAX_UPLOAD_STATUSES = 10
510 def __init__(self, basedir, storage_broker, secret_holder,
511 stats_provider, history):
512 self._basedir = basedir
513 self._storage_broker = storage_broker
514 self._secret_holder = secret_holder
# Two working directories under basedir: partial fetches land in
# CHK_incoming, completed ciphertext moves to CHK_encoding.
515 self._chk_incoming = os.path.join(basedir, "CHK_incoming")
516 self._chk_encoding = os.path.join(basedir, "CHK_encoding")
517 fileutil.make_dirs(self._chk_incoming)
518 fileutil.make_dirs(self._chk_encoding)
# storage_index -> CHKUploadHelper for uploads currently in flight.
519 self._active_uploads = {}
520 self._all_uploads = weakref.WeakKeyDictionary() # for debugging
521 self.stats_provider = stats_provider
523 stats_provider.register_producer(self)
# Local mirror of the counters we feed to stats_provider.
524 self._counters = {"chk_upload_helper.upload_requests": 0,
525 "chk_upload_helper.upload_already_present": 0,
526 "chk_upload_helper.upload_need_upload": 0,
527 "chk_upload_helper.resumes": 0,
528 "chk_upload_helper.fetched_bytes": 0,
529 "chk_upload_helper.encoded_bytes": 0,
531 self._history = history
533 def log(self, *args, **kwargs):
534 if 'facility' not in kwargs:
535 kwargs['facility'] = "tahoe.helper"
536 return log.msg(*args, **kwargs)
# Bump a named counter both in the external stats provider (if any) and
# in our local mirror.
538 def count(self, key, value=1):
539 if self.stats_provider:
540 self.stats_provider.count(key, value)
541 self._counters[key] += value
# (stats gathering) files older than OLD are reported separately as
# "_old" sizes, for both working directories.
544 OLD = 86400*2 # 48hours
546 inc_count = inc_size = inc_size_old = 0
547 enc_count = enc_size = enc_size_old = 0
548 inc = os.listdir(self._chk_incoming)
549 enc = os.listdir(self._chk_encoding)
551 s = os.stat(os.path.join(self._chk_incoming, f))
552 size = s[stat.ST_SIZE]
553 mtime = s[stat.ST_MTIME]
556 if now - mtime > OLD:
559 s = os.stat(os.path.join(self._chk_encoding, f))
560 size = s[stat.ST_SIZE]
561 mtime = s[stat.ST_MTIME]
564 if now - mtime > OLD:
566 stats = { 'chk_upload_helper.active_uploads': len(self._active_uploads),
567 'chk_upload_helper.incoming_count': inc_count,
568 'chk_upload_helper.incoming_size': inc_size,
569 'chk_upload_helper.incoming_size_old': inc_size_old,
570 'chk_upload_helper.encoding_count': enc_count,
571 'chk_upload_helper.encoding_size': enc_size,
572 'chk_upload_helper.encoding_size_old': enc_size_old,
574 stats.update(self._counters)
577 def remote_get_version(self):
# Entry point for clients: either reuse an in-flight upload for this SI,
# or first check whether the file already exists in the grid.
580 def remote_upload_chk(self, storage_index):
581 self.count("chk_upload_helper.upload_requests")
582 lp = self.log(format="helper: upload_chk query for SI %(si)s",
583 si=si_b2a(storage_index))
584 if storage_index in self._active_uploads:
585 self.log("upload is currently active", parent=lp)
586 uh = self._active_uploads[storage_index]
589 d = self._check_chk(storage_index, lp)
590 d.addCallback(self._did_chk_check, storage_index, lp)
592 self.log("error while checking for chk-already-in-grid",
593 failure=f, level=log.WEIRD, parent=lp, umid="jDtxZg")
598 def _check_chk(self, storage_index, lp):
599 # see if this file is already in the grid
600 lp2 = self.log("doing a quick check+UEBfetch",
601 parent=lp, level=log.NOISY)
602 sb = self._storage_broker
603 c = CHKCheckerAndUEBFetcher(sb.get_servers_for_psi, storage_index, lp2)
# (on success) package the checker's findings as HelperUploadResults so
# the client can skip the upload entirely.
607 (sharemap, ueb_data, ueb_hash) = res
608 self.log("found file in grid", level=log.NOISY, parent=lp)
609 hur = upload.HelperUploadResults()
610 hur.uri_extension_hash = ueb_hash
611 hur.sharemap = sharemap
612 hur.uri_extension_data = ueb_data
613 hur.preexisting_shares = len(sharemap)
614 hur.pushed_shares = 0
617 d.addCallback(_checked)
620 def _did_chk_check(self, already_present, storage_index, lp):
622 # the necessary results are placed in the UploadResults
623 self.count("chk_upload_helper.upload_already_present")
624 self.log("file already found in grid", parent=lp)
625 return (already_present, None)
627 self.count("chk_upload_helper.upload_need_upload")
628 # the file is not present in the grid, by which we mean there are
629 # less than 'N' shares available.
630 self.log("unable to find file in the grid", parent=lp,
632 # We need an upload helper. Check our active uploads again in
633 # case there was a race.
634 if storage_index in self._active_uploads:
635 self.log("upload is currently active", parent=lp)
636 uh = self._active_uploads[storage_index]
638 self.log("creating new upload helper", parent=lp)
639 uh = self._make_chk_upload_helper(storage_index, lp)
640 self._active_uploads[storage_index] = uh
# Build a CHKUploadHelper whose working files are named after the
# base32 storage index inside the two working directories.
644 def _make_chk_upload_helper(self, storage_index, lp):
645 si_s = si_b2a(storage_index)
646 incoming_file = os.path.join(self._chk_incoming, si_s)
647 encoding_file = os.path.join(self._chk_encoding, si_s)
648 uh = CHKUploadHelper(storage_index, self,
649 self._storage_broker,
651 incoming_file, encoding_file,
# Track the upload for debugging and publish its status to history.
655 def _add_upload(self, uh):
656 self._all_uploads[uh] = None
658 s = uh.get_upload_status()
659 self._history.notify_helper_upload(s)
# Called by CHKUploadHelper when it completes (size=0 on failure):
# record encoded bytes and retire the upload from the active table.
661 def upload_finished(self, storage_index, size):
662 # this is called with size=0 if the upload failed
663 self.count("chk_upload_helper.encoded_bytes", size)
664 uh = self._active_uploads[storage_index]
665 del self._active_uploads[storage_index]
666 s = uh.get_upload_status()