# src/allmydata/offloaded.py
import os, stat, time
from zope.interface import implements
from twisted.application import service
from twisted.internet import defer
from foolscap import Referenceable
from foolscap.eventual import eventually
from allmydata import upload, interfaces, storage, uri
from allmydata.util import idlib, log, observer, fileutil, hashutil


class NotEnoughWritersError(Exception):
    pass


class CHKCheckerAndUEBFetcher:
    """I check to see if a file is already present in the grid. I also fetch
    the URI Extension Block, which is useful for an uploading client who
    wants to avoid the work of encryption and encoding.

    I return False if the file is not completely healthy: i.e. if there are
    fewer than 'N' shares present.

    If the file is completely healthy, I return a tuple of (sharemap,
    UEB_data, UEB_hash).
    """
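
    # Usage sketch (illustrative names; see Helper below for the real
    # caller, which passes the client's get_permuted_peers method as
    # 'peer_getter'):
    #
    #   c = CHKCheckerAndUEBFetcher(peer_getter, storage_index, logparent)
    #   d = c.check()
    #   # d fires with False, or with (sharemap, UEB_data, UEB_hash)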

    def __init__(self, peer_getter, storage_index, logparent=None):
        self._peer_getter = peer_getter
        self._found_shares = set()
        self._storage_index = storage_index
        self._sharemap = {}
        self._readers = set()
        self._ueb_hash = None
        self._ueb_data = None
        self._logparent = logparent

    def log(self, *args, **kwargs):
        if 'facility' not in kwargs:
            kwargs['facility'] = "tahoe.helper.chk.checkandUEBfetch"
        if 'parent' not in kwargs:
            kwargs['parent'] = self._logparent
        return log.msg(*args, **kwargs)

    def check(self):
        d = self._get_all_shareholders(self._storage_index)
        d.addCallback(self._get_uri_extension)
        d.addCallback(self._done)
        return d

    def _get_all_shareholders(self, storage_index):
        dl = []
        for (peerid, ss) in self._peer_getter("storage", storage_index):
            d = ss.callRemote("get_buckets", storage_index)
            d.addCallbacks(self._got_response, self._got_error,
                           callbackArgs=(peerid,))
            dl.append(d)
        return defer.DeferredList(dl)

    def _got_response(self, buckets, peerid):
        # buckets is a dict: it maps shnum to an rref of the server that
        # holds that share
        shnums_s = ",".join([str(shnum) for shnum in buckets])
        self.log("got_response: [%s] has %d shares (%s)" %
                 (idlib.shortnodeid_b2a(peerid), len(buckets), shnums_s),
                 level=log.NOISY)
        self._found_shares.update(buckets.keys())
        for k in buckets:
            if k not in self._sharemap:
                self._sharemap[k] = []
            self._sharemap[k].append(peerid)
        self._readers.update([(bucket, peerid)
                              for bucket in buckets.values()])

    def _got_error(self, f):
        # a KeyError just means that server has no shares for us; anything
        # else is worth recording
        if f.check(KeyError):
            return
        log.err(f, parent=self._logparent)

    def _get_uri_extension(self, res):
        # assume that we can pull the UEB from any share. If we get an
        # error, declare the whole file unavailable.
        if not self._readers:
            self.log("no readers, so no UEB", level=log.NOISY)
            return
        b, peerid = self._readers.pop()
        rbp = storage.ReadBucketProxy(b, peerid,
                                      storage.si_b2a(self._storage_index))
        d = rbp.startIfNecessary()
        d.addCallback(lambda res: rbp.get_uri_extension())
        d.addCallback(self._got_uri_extension)
        d.addErrback(self._ueb_error)
        return d

    def _got_uri_extension(self, ueb):
        self.log("_got_uri_extension", level=log.NOISY)
        self._ueb_hash = hashutil.uri_extension_hash(ueb)
        self._ueb_data = uri.unpack_extension(ueb)

    def _ueb_error(self, f):
        # an error means the file is unavailable, but the overall check
        # shouldn't fail.
        self.log("UEB fetch failed", failure=f, level=log.WEIRD)
        return None

    def _done(self, res):
        if self._ueb_data:
            found = len(self._found_shares)
            total = self._ueb_data['total_shares']
            self.log(format="got %(found)d shares of %(total)d",
                     found=found, total=total, level=log.NOISY)
            if found < total:
                # not all shares are present in the grid
                self.log("not enough to qualify, file not found in grid",
                         level=log.NOISY)
                return False
            # all shares are present
            self.log("all shares present, file is found in grid",
                     level=log.NOISY)
            return (self._sharemap, self._ueb_data, self._ueb_hash)
        # no shares are present
        self.log("unable to find UEB data, file not found in grid",
                 level=log.NOISY)
        return False


class CHKUploadHelper(Referenceable, upload.CHKUploader):
    """I am the helper-server-side counterpart to AssistedUploader. I
    handle peer selection, encoding, and share pushing. I read ciphertext
    from the remote AssistedUploader.
    """
    implements(interfaces.RICHKUploadHelper)
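
    # Pipeline sketch: a CHKCiphertextFetcher pulls ciphertext from the
    # remote reader into 'incoming_file', renames it to 'encoding_file'
    # when complete, and a LocalCiphertextReader then feeds that file to
    # the encode/push machinery inherited from upload.CHKUploader.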

    def __init__(self, storage_index, helper,
                 incoming_file, encoding_file,
                 results, log_number):
        self._storage_index = storage_index
        self._helper = helper
        self._incoming_file = incoming_file
        self._encoding_file = encoding_file
        upload_id = storage.si_b2a(storage_index)[:5]
        self._log_number = log_number
        self._results = results
        self._upload_status = upload.UploadStatus()
        self._upload_status.set_helper(False)
        self._upload_status.set_storage_index(storage_index)
        self._upload_status.set_status("fetching ciphertext")
        self._upload_status.set_progress(0, 1.0)
        self._helper.log("CHKUploadHelper starting for SI %s" % upload_id,
                         parent=log_number)

        self._client = helper.parent
        self._fetcher = CHKCiphertextFetcher(self, incoming_file, encoding_file,
                                             self._log_number)
        self._reader = LocalCiphertextReader(self, storage_index, encoding_file)
        self._finished_observers = observer.OneShotObserverList()

        d = self._fetcher.when_done()
        d.addCallback(lambda res: self._reader.start())
        d.addCallback(lambda res: self.start_encrypted(self._reader))
        d.addCallback(self._finished)
        d.addErrback(self._failed)

    def log(self, *args, **kwargs):
        if 'facility' not in kwargs:
            kwargs['facility'] = "tahoe.helper.chk"
        return upload.CHKUploader.log(self, *args, **kwargs)

    def start(self):
        self._started = time.time()
        # by the time we get here, the caller has already decided that an
        # upload (or re-upload) is needed, so every branch returns
        # (UploadResults, self). We just log how much of a previous
        # attempt, if any, is still on disk.
        self.log("deciding whether to upload the file or not", level=log.NOISY)
        if os.path.exists(self._encoding_file):
            # we have the whole file, and we might be encoding it (or the
            # encode/upload might have failed, and we need to restart it).
            self.log("ciphertext already in place", level=log.UNUSUAL)
            return (self._results, self)
        if os.path.exists(self._incoming_file):
            # we have some of the file, but not all of it (otherwise we'd
            # be encoding). The caller might be useful.
            self.log("partial ciphertext already present", level=log.UNUSUAL)
            return (self._results, self)
        # we don't remember uploading this file
        self.log("no ciphertext yet", level=log.NOISY)
        return (self._results, self)


    def remote_upload(self, reader):
        # reader is an RIEncryptedUploadable. I am specified to return an
        # UploadResults dictionary.

        # let our fetcher pull ciphertext from the reader.
        self._fetcher.add_reader(reader)
        # and also hashes
        self._reader.add_reader(reader)

        # and inform the client when the upload has finished
        return self._finished_observers.when_fired()

    def _finished(self, res):
        (uri_extension_hash, needed_shares, total_shares, size) = res
        r = self._results
        r.uri_extension_hash = uri_extension_hash
        f_times = self._fetcher.get_times()
        r.timings["cumulative_fetch"] = f_times["cumulative_fetch"]
        r.ciphertext_fetched = self._fetcher.get_ciphertext_fetched()
        r.timings["total_fetch"] = f_times["total"]
        self._reader.close()
        os.unlink(self._encoding_file)
        self._finished_observers.fire(r)
        self._helper.upload_finished(self._storage_index, size)
        del self._reader

    def _failed(self, f):
        self.log(format="CHKUploadHelper(%(si)s) failed",
                 si=storage.si_b2a(self._storage_index)[:5],
                 failure=f,
                 level=log.UNUSUAL)
        self._finished_observers.fire(f)
        self._helper.upload_finished(self._storage_index, 0)
        del self._reader


class AskUntilSuccessMixin:
    # create me with a _readers list
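    #
    # Contract sketch (hypothetical subclass; the real users are the two
    # classes below, which also provide the _upload_helper that call()
    # uses for logging):
    #
    #   class Fetcher(AskUntilSuccessMixin):
    #       def __init__(self, upload_helper):
    #           self._readers = []
    #           self._upload_helper = upload_helper
    #
    # self.call("get_size") invokes callRemote on self._readers[0]; on
    # failure that reader is dropped and the call is retried with the next
    # one, until none remain.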
    _last_failure = None

    def add_reader(self, reader):
        self._readers.append(reader)

    def call(self, *args, **kwargs):
        if not self._readers:
            raise NotEnoughWritersError("ran out of assisted uploaders, "
                                        "last failure was %s"
                                        % self._last_failure)
        rr = self._readers[0]
        d = rr.callRemote(*args, **kwargs)
        def _err(f):
            self._last_failure = f
            if rr in self._readers:
                self._readers.remove(rr)
            self._upload_helper.log("call to assisted uploader %s failed" % rr,
                                    failure=f, level=log.UNUSUAL)
            # we can try again with someone else who's left
            return self.call(*args, **kwargs)
        d.addErrback(_err)
        return d


class CHKCiphertextFetcher(AskUntilSuccessMixin):
    """I use one or more remote RIEncryptedUploadable instances to gather
    ciphertext on disk. When I'm done, the file I create can be used by a
    LocalCiphertextReader to satisfy the ciphertext needs of a CHK upload
    process.

    I begin pulling ciphertext as soon as a reader is added. I remove
    readers when they have any sort of error. If the last reader is
    removed, I fire my when_done() Deferred with a failure.

    I fire my when_done() Deferred (with None) immediately after I have
    moved the ciphertext to 'encoded_file'.
    """
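
    # Lifecycle sketch ('rref' stands for a remote reference to an
    # RIEncryptedUploadable; names are illustrative):
    #
    #   f = CHKCiphertextFetcher(helper, incoming_file, encoded_file, lp)
    #   f.add_reader(rref)      # the pull starts on the next reactor turn
    #   d = f.when_done()       # fires with None once encoded_file exists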

    def __init__(self, helper, incoming_file, encoded_file, logparent):
        self._upload_helper = helper
        self._incoming_file = incoming_file
        self._encoding_file = encoded_file
        self._log_parent = logparent
        self._done_observers = observer.OneShotObserverList()
        self._readers = []
        self._started = False
        self._f = None
        self._times = {
            "cumulative_fetch": 0.0,
            "total": 0.0,
            }
        self._ciphertext_fetched = 0

    def log(self, *args, **kwargs):
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.helper.chkupload.fetch"
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_parent
        return log.msg(*args, **kwargs)

    def add_reader(self, reader):
        AskUntilSuccessMixin.add_reader(self, reader)
        eventually(self._start)

    def _start(self):
        if self._started:
            return
        self._started = True
        started = time.time()

        if os.path.exists(self._encoding_file):
            self.log("ciphertext already present, bypassing fetch",
                     level=log.UNUSUAL)
            # we'll still need the plaintext hashes (when
            # LocalCiphertextReader.get_plaintext_hashtree_leaves() is
            # called), and currently the easiest way to get them is to ask
            # the sender for the last byte of ciphertext. That will provoke
            # them into reading and hashing (but not sending) everything
            # else.
            have = os.stat(self._encoding_file)[stat.ST_SIZE]
            d = self.call("read_encrypted", have-1, 1)
            d.addCallback(self._done2, started)
            return

        # first, find out how large the file is going to be
        d = self.call("get_size")
        d.addCallback(self._got_size)
        d.addCallback(self._start_reading)
        d.addCallback(self._done)
        d.addCallback(self._done2, started)
        d.addErrback(self._failed)

    def _got_size(self, size):
        self.log("total size is %d bytes" % size, level=log.NOISY)
        self._upload_helper._upload_status.set_size(size)
        self._expected_size = size

    def _start_reading(self, res):
        # then find out how much crypttext we have on disk
        if os.path.exists(self._incoming_file):
            self._have = os.stat(self._incoming_file)[stat.ST_SIZE]
            self.log("we already have %d bytes" % self._have, level=log.NOISY)
        else:
            self._have = 0
            self.log("we do not have any ciphertext yet", level=log.NOISY)
        self.log("starting ciphertext fetch", level=log.NOISY)
        self._f = open(self._incoming_file, "ab")

        # now loop to pull the data from the readers
        d = defer.Deferred()
        self._loop(d)
        # this Deferred will be fired once the last byte has been written to
        # self._f
        return d

    # Read data in 50kB chunks. We should choose a more considered number
    # here, possibly letting the client specify it. The goal is to keep
    # RTT*bandwidth below 10% of the chunk size, to limit the upload
    # bandwidth lost to the fact that this protocol is non-windowing. Too
    # large a chunk, however, means more memory consumption on both ends.
    # Something that can be transferred in, say, 10 seconds sounds about
    # right. On my home DSL line (50kBps upstream), that suggests 500kB.
    # Most lines are slower, maybe 10kBps, which suggests 100kB, and that's
    # a bit more memory than I want to hang on to, so I'm going to go with
    # 50kB and see how that works.
    CHUNK_SIZE = 50*1024
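
    # Worked example (illustrative numbers, not from the source): on a
    # link with 100ms RTT and 50kBps upstream, RTT*bandwidth is
    # 0.1s * 50kB/s = 5kB, exactly 10% of a 50kB chunk, so each
    # request/response turnaround costs at most about 10% of the
    # achievable throughput.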

    def _loop(self, fire_when_done):
        # this slightly weird structure is needed because Deferreds don't do
        # tail-recursion, so it is important to let each one retire promptly.
        # Simply chaining them will cause a stack overflow at the end of a
        # transfer that involves more than a few hundred chunks.
        # 'fire_when_done' lives a long time, but the Deferreds returned by
        # the inner _fetch() call do not.
        start = time.time()
        d = defer.maybeDeferred(self._fetch)
        def _done(finished):
            elapsed = time.time() - start
            self._times["cumulative_fetch"] += elapsed
            if finished:
                self.log("finished reading ciphertext", level=log.NOISY)
                fire_when_done.callback(None)
            else:
                self._loop(fire_when_done)
        def _err(f):
            self.log("ciphertext read failed", failure=f, level=log.UNUSUAL)
            fire_when_done.errback(f)
        d.addCallbacks(_done, _err)
        return None

    def _fetch(self):
        needed = self._expected_size - self._have
        fetch_size = min(needed, self.CHUNK_SIZE)
        if fetch_size == 0:
            self._upload_helper._upload_status.set_progress(1, 1.0)
            return True # all done
        percent = 0.0
        if self._expected_size:
            percent = 1.0 * (self._have+fetch_size) / self._expected_size
        self.log(format="fetching %(start)d-%(end)d of %(total)d (%(percent)d%%)",
                 start=self._have,
                 end=self._have+fetch_size,
                 total=self._expected_size,
                 percent=int(100.0*percent),
                 level=log.NOISY)
        d = self.call("read_encrypted", self._have, fetch_size)
        def _got_data(ciphertext_v):
            for data in ciphertext_v:
                self._f.write(data)
                self._have += len(data)
                self._ciphertext_fetched += len(data)
                self._upload_helper._helper.count("chk_upload_helper.fetched_bytes",
                                                  len(data))
                self._upload_helper._upload_status.set_progress(1, percent)
            return False # not done
        d.addCallback(_got_data)
        return d

    def _done(self, res):
        self._f.close()
        self._f = None
        self.log(format="done fetching ciphertext, size=%(size)d",
                 size=os.stat(self._incoming_file)[stat.ST_SIZE],
                 level=log.NOISY)
        os.rename(self._incoming_file, self._encoding_file)

    def _done2(self, _ignored, started):
        self.log("done2", level=log.NOISY)
        elapsed = time.time() - started
        self._times["total"] = elapsed
        self._readers = []
        self._done_observers.fire(None)

    def _failed(self, f):
        if self._f:
            self._f.close()
        self._readers = []
        self._done_observers.fire(f)

    def when_done(self):
        return self._done_observers.when_fired()

    def get_times(self):
        return self._times

    def get_ciphertext_fetched(self):
        return self._ciphertext_fetched


class LocalCiphertextReader(AskUntilSuccessMixin):
    implements(interfaces.IEncryptedUploadable)
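
    # I satisfy size and ciphertext reads from the local file created by
    # CHKCiphertextFetcher, and forward everything else (encoding
    # parameters, plaintext hashes) to the remote uploader via
    # AskUntilSuccessMixin.call, since only the original client holds the
    # plaintext.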

    def __init__(self, upload_helper, storage_index, encoding_file):
        self._readers = []
        self._upload_helper = upload_helper
        self._storage_index = storage_index
        self._encoding_file = encoding_file
        self._status = None

    def set_upload_status(self, upload_status):
        self._status = interfaces.IUploadStatus(upload_status)

    def start(self):
        self._upload_helper._upload_status.set_status("pushing")
        self._size = os.stat(self._encoding_file)[stat.ST_SIZE]
        self.f = open(self._encoding_file, "rb")

    def get_size(self):
        return defer.succeed(self._size)

    def get_all_encoding_parameters(self):
        return self.call("get_all_encoding_parameters")

    def get_storage_index(self):
        return defer.succeed(self._storage_index)

    def read_encrypted(self, length, hash_only):
        assert hash_only is False
        d = defer.maybeDeferred(self.f.read, length)
        d.addCallback(lambda data: [data])
        return d

    def get_plaintext_hashtree_leaves(self, first, last, num_segments):
        return self.call("get_plaintext_hashtree_leaves", first, last,
                         num_segments)

    def get_plaintext_hash(self):
        return self.call("get_plaintext_hash")

    def close(self):
        self.f.close()
        # ??. I'm not sure if it makes sense to forward the close message.
        return self.call("close")


class Helper(Referenceable, service.MultiService):
    implements(interfaces.RIHelper, interfaces.IStatsProducer)
    # this is the non-distributed version. When we need to have multiple
    # helpers, this object will become the HelperCoordinator, and will query
    # the farm of Helpers to see if anyone has the storage_index of interest,
    # and send the request off to them. If nobody has it, we'll choose a
    # helper at random.

    name = "helper"
    chk_upload_helper_class = CHKUploadHelper
    MAX_UPLOAD_STATUSES = 10

    def __init__(self, basedir, stats_provider=None):
        self._basedir = basedir
        self._chk_incoming = os.path.join(basedir, "CHK_incoming")
        self._chk_encoding = os.path.join(basedir, "CHK_encoding")
        fileutil.make_dirs(self._chk_incoming)
        fileutil.make_dirs(self._chk_encoding)
        self._active_uploads = {}
        self._recent_upload_statuses = []
        self.stats_provider = stats_provider
        if stats_provider:
            stats_provider.register_producer(self)
        self._counters = {"chk_upload_helper.upload_requests": 0,
                          "chk_upload_helper.upload_already_present": 0,
                          "chk_upload_helper.upload_need_upload": 0,
                          "chk_upload_helper.fetched_bytes": 0,
                          "chk_upload_helper.encoded_bytes": 0,
                          }
        service.MultiService.__init__(self)

    def setServiceParent(self, parent):
        service.MultiService.setServiceParent(self, parent)

    def log(self, *args, **kwargs):
        if 'facility' not in kwargs:
            kwargs['facility'] = "tahoe.helper"
        return self.parent.log(*args, **kwargs)

    def count(self, key, value=1):
        if self.stats_provider:
            self.stats_provider.count(key, value)
        self._counters[key] += value

    def get_stats(self):
        OLD = 86400*2 # 48 hours
        now = time.time()
        inc_count = inc_size = inc_size_old = 0
        enc_count = enc_size = enc_size_old = 0
        inc = os.listdir(self._chk_incoming)
        enc = os.listdir(self._chk_encoding)
        for f in inc:
            s = os.stat(os.path.join(self._chk_incoming, f))
            size = s[stat.ST_SIZE]
            mtime = s[stat.ST_MTIME]
            inc_count += 1
            inc_size += size
            if now - mtime > OLD:
                inc_size_old += size
        for f in enc:
            s = os.stat(os.path.join(self._chk_encoding, f))
            size = s[stat.ST_SIZE]
            mtime = s[stat.ST_MTIME]
            enc_count += 1
            enc_size += size
            if now - mtime > OLD:
                enc_size_old += size
        stats = { 'chk_upload_helper.active_uploads': len(self._active_uploads),
                  'chk_upload_helper.incoming_count': inc_count,
                  'chk_upload_helper.incoming_size': inc_size,
                  'chk_upload_helper.incoming_size_old': inc_size_old,
                  'chk_upload_helper.encoding_count': enc_count,
                  'chk_upload_helper.encoding_size': enc_size,
                  'chk_upload_helper.encoding_size_old': enc_size_old,
                  }
        stats.update(self._counters)
        return stats

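    # remote_upload_chk() returns a Deferred that fires with a two-tuple
    # (UploadResults, rref-or-None): rref is None when the file is already
    # present in the grid, otherwise it is the RICHKUploadHelper the client
    # should call remote_upload() on. (This restates the contract implied
    # by _checked() below and by CHKUploadHelper.start() above.)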
    def remote_upload_chk(self, storage_index):
        self.count("chk_upload_helper.upload_requests")
        r = upload.UploadResults()
        started = time.time()
        si_s = storage.si_b2a(storage_index)
        lp = self.log(format="helper: upload_chk query for SI %(si)s", si=si_s)
        incoming_file = os.path.join(self._chk_incoming, si_s)
        encoding_file = os.path.join(self._chk_encoding, si_s)
        if storage_index in self._active_uploads:
            self.log("upload is currently active", parent=lp)
            uh = self._active_uploads[storage_index]
            return uh.start()

        d = self._check_for_chk_already_in_grid(storage_index, r, lp)
        def _checked(already_present):
            elapsed = time.time() - started
            r.timings['existence_check'] = elapsed
            if already_present:
                # the necessary results are placed in the UploadResults
                self.count("chk_upload_helper.upload_already_present")
                self.log("file already found in grid", parent=lp)
                return (r, None)

            self.count("chk_upload_helper.upload_need_upload")
            # the file is not present in the grid: fewer than 'N' shares
            # are available.
            self.log("unable to find file in the grid", parent=lp,
                     level=log.NOISY)
            # We need an upload helper. Check our active uploads again in
            # case there was a race.
            if storage_index in self._active_uploads:
                self.log("upload is currently active", parent=lp)
                uh = self._active_uploads[storage_index]
            else:
                self.log("creating new upload helper", parent=lp)
                uh = self.chk_upload_helper_class(storage_index, self,
                                                  incoming_file, encoding_file,
                                                  r, lp)
                self._active_uploads[storage_index] = uh
            return uh.start()
        d.addCallback(_checked)
        def _err(f):
            self.log("error while checking for chk-already-in-grid",
                     failure=f, level=log.WEIRD, parent=lp)
            return f
        d.addErrback(_err)
        return d

    def _check_for_chk_already_in_grid(self, storage_index, results, lp):
        # see if this file is already in the grid
        lp2 = self.log("doing a quick check+UEBfetch",
                       parent=lp, level=log.NOISY)
        c = CHKCheckerAndUEBFetcher(self.parent.get_permuted_peers,
                                    storage_index, lp2)
        d = c.check()
        def _checked(res):
            if res:
                (sharemap, ueb_data, ueb_hash) = res
                self.log("found file in grid", level=log.NOISY, parent=lp)
                results.uri_extension_hash = ueb_hash
                results.sharemap = {}
                for shnum, peerids in sharemap.items():
                    peers_s = ",".join(["[%s]" % idlib.shortnodeid_b2a(peerid)
                                        for peerid in peerids])
                    results.sharemap[shnum] = "Found on " + peers_s
                results.uri_extension_data = ueb_data
                results.preexisting_shares = len(sharemap)
                results.pushed_shares = 0
                return True
            return False
        d.addCallback(_checked)
        return d

    def upload_finished(self, storage_index, size):
        self.count("chk_upload_helper.encoded_bytes", size)
        uh = self._active_uploads[storage_index]
        del self._active_uploads[storage_index]
        s = uh.get_upload_status()
        s.set_active(False)
        self._recent_upload_statuses.append(s)
        while len(self._recent_upload_statuses) > self.MAX_UPLOAD_STATUSES:
            self._recent_upload_statuses.pop(0)

    def get_active_upload_statuses(self):
        return [u.get_upload_status() for u in self._active_uploads.values()]

    def get_recent_upload_statuses(self):
        return self._recent_upload_statuses