
import os, stat, time
from zope.interface import implements
from twisted.application import service
from twisted.internet import defer
from foolscap import Referenceable
from foolscap.eventual import eventually
from allmydata import upload, interfaces, storage, uri
from allmydata.util import idlib, log, observer, fileutil, hashutil


class NotEnoughWritersError(Exception):
    pass


class CHKCheckerAndUEBFetcher:
    """I check to see if a file is already present in the grid. I also fetch
    the URI Extension Block, which is useful for an uploading client who
    wants to avoid the work of encryption and encoding.

    I return False if the file is not completely healthy: i.e. if there are
    fewer than 'N' shares present.

    If the file is completely healthy, I return a tuple of (sharemap,
    UEB_data, UEB_hash).
    """
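
    # Typical use (a sketch; the peer_getter callable is whatever the
    # caller provides, e.g. the Helper below passes
    # self.parent.get_permuted_peers):
    #
    #   c = CHKCheckerAndUEBFetcher(peer_getter, storage_index, logparent)
    #   d = c.check()
    #   # d fires with False, or with (sharemap, UEB_data, UEB_hash)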

    def __init__(self, peer_getter, storage_index, logparent=None):
        self._peer_getter = peer_getter
        self._found_shares = set()
        self._storage_index = storage_index
        self._sharemap = {}
        self._readers = set()
        self._ueb_hash = None
        self._ueb_data = None
        self._logparent = logparent

    def log(self, *args, **kwargs):
        if 'facility' not in kwargs:
            kwargs['facility'] = "tahoe.helper.chk.checkandUEBfetch"
        if 'parent' not in kwargs:
            kwargs['parent'] = self._logparent
        return log.msg(*args, **kwargs)

    def check(self):
        d = self._get_all_shareholders(self._storage_index)
        d.addCallback(self._get_uri_extension)
        d.addCallback(self._done)
        return d

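    # Ask every storage server returned by the peer_getter (the permuted
    # peer list, when driven by the Helper below) for the buckets it holds
    # for this storage index; the DeferredList fires once every query has
    # either succeeded or failed.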
    def _get_all_shareholders(self, storage_index):
        dl = []
        for (peerid, ss) in self._peer_getter("storage", storage_index):
            d = ss.callRemote("get_buckets", storage_index)
            d.addCallbacks(self._got_response, self._got_error,
                           callbackArgs=(peerid,))
            dl.append(d)
        return defer.DeferredList(dl)

    def _got_response(self, buckets, peerid):
        # buckets is a dict: maps shnum to an rref of the server who holds it
        shnums_s = ",".join([str(shnum) for shnum in buckets])
        self.log("got_response: [%s] has %d shares (%s)" %
                 (idlib.shortnodeid_b2a(peerid), len(buckets), shnums_s),
                 level=log.NOISY)
        self._found_shares.update(buckets.keys())
        for k in buckets:
            if k not in self._sharemap:
                self._sharemap[k] = []
            self._sharemap[k].append(peerid)
        self._readers.update( [ (bucket, peerid)
                                for bucket in buckets.values() ] )

    def _got_error(self, f):
        # a KeyError just means that server has no buckets for us; any
        # other failure is worth recording
        if f.check(KeyError):
            return
        log.err(f, parent=self._logparent)

    def _get_uri_extension(self, res):
        # assume that we can pull the UEB from any share. If we get an error,
        # declare the whole file unavailable.
        if not self._readers:
            self.log("no readers, so no UEB", level=log.NOISY)
            return
        b, peerid = self._readers.pop()
        rbp = storage.ReadBucketProxy(b, peerid,
                                      storage.si_b2a(self._storage_index))
        d = rbp.startIfNecessary()
        d.addCallback(lambda res: rbp.get_uri_extension())
        d.addCallback(self._got_uri_extension)
        d.addErrback(self._ueb_error)
        return d

    def _got_uri_extension(self, ueb):
        self.log("_got_uri_extension", level=log.NOISY)
        self._ueb_hash = hashutil.uri_extension_hash(ueb)
        self._ueb_data = uri.unpack_extension(ueb)

    def _ueb_error(self, f):
        # an error means the file is unavailable, but the overall check
        # shouldn't fail.
        self.log("UEB fetch failed", failure=f, level=log.WEIRD)
        return None

    def _done(self, res):
        if self._ueb_data:
            found = len(self._found_shares)
            total = self._ueb_data['total_shares']
            self.log(format="got %(found)d shares of %(total)d",
                     found=found, total=total, level=log.NOISY)
            if found < total:
                # not all shares are present in the grid
                self.log("not enough to qualify, file not found in grid",
                         level=log.NOISY)
                return False
            # all shares are present
            self.log("all shares present, file is found in grid",
                     level=log.NOISY)
            return (self._sharemap, self._ueb_data, self._ueb_hash)
        # no shares are present
        self.log("unable to find UEB data, file not found in grid",
                 level=log.NOISY)
        return False


class CHKUploadHelper(Referenceable, upload.CHKUploader):
    """I am the helper-server-side counterpart to AssistedUploader. I handle
    peer selection, encoding, and share pushing. I read ciphertext from the
    remote AssistedUploader.
    """
    implements(interfaces.RICHKUploadHelper)

    def __init__(self, storage_index, helper,
                 incoming_file, encoding_file,
                 results, log_number):
        self._storage_index = storage_index
        self._helper = helper
        self._incoming_file = incoming_file
        self._encoding_file = encoding_file
        upload_id = storage.si_b2a(storage_index)[:5]
        self._log_number = log_number
        self._results = results
        self._upload_status = upload.UploadStatus()
        self._upload_status.set_helper(False)
        self._helper.log("CHKUploadHelper starting for SI %s" % upload_id,
                         parent=log_number)

        self._client = helper.parent
        self._fetcher = CHKCiphertextFetcher(self, incoming_file, encoding_file,
                                             self._log_number)
        self._reader = LocalCiphertextReader(self, storage_index, encoding_file)
        self._finished_observers = observer.OneShotObserverList()

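        # The pipeline: once the fetcher has all of the ciphertext on disk,
        # read it back locally, encode and push shares, then report results.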
        d = self._fetcher.when_done()
        d.addCallback(lambda res: self._reader.start())
        d.addCallback(lambda res: self.start_encrypted(self._reader))
        d.addCallback(self._finished)
        d.addErrback(self._failed)

    def log(self, *args, **kwargs):
        if 'facility' not in kwargs:
            kwargs['facility'] = "tahoe.helper.chk"
        return upload.CHKUploader.log(self, *args, **kwargs)

    def start(self):
        self._started = time.time()
        # determine if we need to upload the file. If so, return ({},self) .
        # If not, return (UploadResults,None) .
        self.log("deciding whether to upload the file or not", level=log.NOISY)
        if os.path.exists(self._encoding_file):
            # we have the whole file, and we might be encoding it (or the
            # encode/upload might have failed, and we need to restart it).
            self.log("ciphertext already in place", level=log.UNUSUAL)
            return (self._results, self)
        if os.path.exists(self._incoming_file):
            # we have some of the file, but not all of it (otherwise we'd be
            # encoding). The caller will need to supply the rest.
            self.log("partial ciphertext already present", level=log.UNUSUAL)
            return (self._results, self)
        # we don't remember uploading this file
        self.log("no ciphertext yet", level=log.NOISY)
        return (self._results, self)

    def remote_upload(self, reader):
        # reader is an RIEncryptedUploadable. I am specified to return an
        # UploadResults dictionary.

        # let our fetcher pull ciphertext from the reader.
        self._fetcher.add_reader(reader)
        # and also hashes
        self._reader.add_reader(reader)

        # and inform the client when the upload has finished
        return self._finished_observers.when_fired()

    def _finished(self, res):
        (uri_extension_hash, needed_shares, total_shares, size) = res
        r = self._results
        r.uri_extension_hash = uri_extension_hash
        f_times = self._fetcher.get_times()
        r.timings["cumulative_fetch"] = f_times["cumulative_fetch"]
        r.ciphertext_fetched = self._fetcher.get_ciphertext_fetched()
        r.timings["total_fetch"] = f_times["total"]
        self._reader.close()
        os.unlink(self._encoding_file)
        self._finished_observers.fire(r)
        self._helper.upload_finished(self._storage_index, size)
        del self._reader

    def _failed(self, f):
        self._finished_observers.fire(f)
        self._helper.upload_finished(self._storage_index, 0)
        del self._reader

class AskUntilSuccessMixin:
    # create me with a _readers list
    _last_failure = None

    def add_reader(self, reader):
        self._readers.append(reader)

    def call(self, *args, **kwargs):
        if not self._readers:
            raise NotEnoughWritersError("ran out of assisted uploaders, last failure was %s" % self._last_failure)
        rr = self._readers[0]
        d = rr.callRemote(*args, **kwargs)
        def _err(f):
            self._last_failure = f
            if rr in self._readers:
                self._readers.remove(rr)
            self._upload_helper.log("call to assisted uploader %s failed" % rr,
                                    failure=f, level=log.UNUSUAL)
            # we can try again with someone else who's left
            return self.call(*args, **kwargs)
        d.addErrback(_err)
        return d

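# How the mixin is driven in this file (sketch only; both subclasses below
# set up self._readers and self._upload_helper themselves):
#
#   fetcher = CHKCiphertextFetcher(helper, incoming, encoding, lp)
#   fetcher.add_reader(rref)        # rref is an RIEncryptedUploadable
#   d = fetcher.call("get_size")    # fails over to the next reader on error
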
class CHKCiphertextFetcher(AskUntilSuccessMixin):
    """I use one or more remote RIEncryptedUploadable instances to gather
    ciphertext on disk. When I'm done, the file I create can be used by a
    LocalCiphertextReader to satisfy the ciphertext needs of a CHK upload
    process.

    I begin pulling ciphertext as soon as a reader is added. I remove readers
    when they have any sort of error. If the last reader is removed, I fire
    my when_done() Deferred with a failure.

    I fire my when_done() Deferred (with None) immediately after I have moved
    the ciphertext to 'encoded_file'.
    """

    def __init__(self, helper, incoming_file, encoded_file, logparent):
        self._upload_helper = helper
        self._incoming_file = incoming_file
        self._encoding_file = encoded_file
        self._log_parent = logparent
        self._done_observers = observer.OneShotObserverList()
        self._readers = []
        self._started = False
        self._f = None
        self._times = {
            "cumulative_fetch": 0.0,
            "total": 0.0,
            }
        self._ciphertext_fetched = 0

    def log(self, *args, **kwargs):
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.helper.chkupload.fetch"
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_parent
        return log.msg(*args, **kwargs)

    def add_reader(self, reader):
        AskUntilSuccessMixin.add_reader(self, reader)
        eventually(self._start)

    def _start(self):
        if self._started:
            return
        self._started = True
        started = time.time()

        if os.path.exists(self._encoding_file):
            self.log("ciphertext already present, bypassing fetch",
                     level=log.UNUSUAL)
            # we'll still need the plaintext hashes (when
            # LocalCiphertextReader.get_plaintext_hashtree_leaves() is
            # called), and currently the easiest way to get them is to ask
            # the sender for the last byte of ciphertext. That will provoke
            # them into reading and hashing (but not sending) everything
            # else.
            have = os.stat(self._encoding_file)[stat.ST_SIZE]
            d = self.call("read_encrypted", have-1, 1)
            d.addCallback(self._done2, started)
            return

        # first, find out how large the file is going to be
        d = self.call("get_size")
        d.addCallback(self._got_size)
        d.addCallback(self._start_reading)
        d.addCallback(self._done)
        d.addCallback(self._done2, started)
        d.addErrback(self._failed)

    def _got_size(self, size):
        self.log("total size is %d bytes" % size, level=log.NOISY)
        self._expected_size = size

    def _start_reading(self, res):
        # then find out how much crypttext we have on disk
        if os.path.exists(self._incoming_file):
            self._have = os.stat(self._incoming_file)[stat.ST_SIZE]
            self.log("we already have %d bytes" % self._have, level=log.NOISY)
        else:
            self._have = 0
            self.log("we do not have any ciphertext yet", level=log.NOISY)
        self.log("starting ciphertext fetch", level=log.NOISY)
        self._f = open(self._incoming_file, "ab")

        # now loop to pull the data from the readers
        d = defer.Deferred()
        self._loop(d)
        # this Deferred will be fired once the last byte has been written to
        # self._f
        return d

    # Read data in 50kB chunks. We should choose a more considered number
    # here, possibly letting the client specify it. The goal is to keep
    # RTT*bandwidth below 10% of the chunk size, to limit the upload
    # bandwidth lost because this protocol is non-windowing. Too large,
    # however, means more memory consumption for both ends. Something that
    # can be transferred in, say, 10 seconds sounds about right. On my home
    # DSL line (50kBps upstream), that suggests 500kB. Most lines are
    # slower, maybe 10kBps, which suggests 100kB, and that's a bit more
    # memory than I want to hang on to, so I'm going to go with 50kB and
    # see how that works.
    CHUNK_SIZE = 50*1024
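    # (Worked example of the 10% rule above, purely for illustration: a
    # 50kB chunk gives a 5kB budget for RTT*bandwidth, so a 50kBps link
    # stays within budget up to roughly a 100ms round-trip.)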

    def _loop(self, fire_when_done):
        # this slightly weird structure is needed because Deferreds don't do
        # tail-recursion, so it is important to let each one retire promptly.
        # Simply chaining them will cause a stack overflow at the end of a
        # transfer that involves more than a few hundred chunks.
        # 'fire_when_done' lives a long time, but the Deferreds returned by
        # the inner _fetch() call do not.
        start = time.time()
        d = defer.maybeDeferred(self._fetch)
        def _done(finished):
            elapsed = time.time() - start
            self._times["cumulative_fetch"] += elapsed
            if finished:
                self.log("finished reading ciphertext", level=log.NOISY)
                fire_when_done.callback(None)
            else:
                self._loop(fire_when_done)
        def _err(f):
            self.log("ciphertext read failed", failure=f, level=log.UNUSUAL)
            fire_when_done.errback(f)
        d.addCallbacks(_done, _err)
        return None
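
    # The same pattern in miniature (a standalone sketch, not code used
    # here; 'step' stands for a function returning a Deferred that fires
    # later, e.g. from the network):
    #
    #   def loop(done):
    #       d = defer.maybeDeferred(step)
    #       def _cb(finished):
    #           if finished:
    #               done.callback(None)   # the long-lived Deferred fires
    #           else:
    #               loop(done)            # re-arm; the old stack is gone
    #       d.addCallbacks(_cb, done.errback)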

    def _fetch(self):
        needed = self._expected_size - self._have
        fetch_size = min(needed, self.CHUNK_SIZE)
        if fetch_size == 0:
            return True # all done
        self.log(format="fetching %(start)d-%(end)d of %(total)d",
                 start=self._have,
                 end=self._have+fetch_size,
                 total=self._expected_size,
                 level=log.NOISY)
        d = self.call("read_encrypted", self._have, fetch_size)
        def _got_data(ciphertext_v):
            for data in ciphertext_v:
                self._f.write(data)
                self._have += len(data)
                self._ciphertext_fetched += len(data)
                stats_provider = self._upload_helper._helper.stats_provider
                if stats_provider:
                    stats_provider.count("chk_upload_helper.fetched_bytes", len(data))
            return False # not done
        d.addCallback(_got_data)
        return d

    def _done(self, res):
        self._f.close()
        self._f = None
        self.log(format="done fetching ciphertext, size=%(size)d",
                 size=os.stat(self._incoming_file)[stat.ST_SIZE],
                 level=log.NOISY)
        os.rename(self._incoming_file, self._encoding_file)

    def _done2(self, _ignored, started):
        self.log("done2", level=log.NOISY)
        elapsed = time.time() - started
        self._times["total"] = elapsed
        self._readers = []
        self._done_observers.fire(None)

    def _failed(self, f):
        if self._f:
            self._f.close()
        self._readers = []
        self._done_observers.fire(f)

    def when_done(self):
        return self._done_observers.when_fired()

    def get_times(self):
        return self._times

    def get_ciphertext_fetched(self):
        return self._ciphertext_fetched


class LocalCiphertextReader(AskUntilSuccessMixin):
    implements(interfaces.IEncryptedUploadable)
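
    # Note: I satisfy ciphertext reads from the local encoding file, but I
    # forward the plaintext-hash queries (which only the original uploader
    # can answer) back over the wire via AskUntilSuccessMixin.call.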

    def __init__(self, upload_helper, storage_index, encoding_file):
        self._readers = []
        self._upload_helper = upload_helper
        self._storage_index = storage_index
        self._encoding_file = encoding_file
        self._status = None

    def set_upload_status(self, upload_status):
        self._status = interfaces.IUploadStatus(upload_status)

    def start(self):
        self._size = os.stat(self._encoding_file)[stat.ST_SIZE]
        self.f = open(self._encoding_file, "rb")

    def get_size(self):
        return defer.succeed(self._size)

    def get_all_encoding_parameters(self):
        return self.call("get_all_encoding_parameters")

    def get_storage_index(self):
        return defer.succeed(self._storage_index)

    def read_encrypted(self, length, hash_only):
        assert hash_only is False
        d = defer.maybeDeferred(self.f.read, length)
        d.addCallback(lambda data: [data])
        return d

    def get_plaintext_hashtree_leaves(self, first, last, num_segments):
        return self.call("get_plaintext_hashtree_leaves", first, last,
                         num_segments)

    def get_plaintext_hash(self):
        return self.call("get_plaintext_hash")

    def close(self):
        self.f.close()
        # ??. I'm not sure if it makes sense to forward the close message.
        return self.call("close")


class Helper(Referenceable, service.MultiService):
    implements(interfaces.RIHelper, interfaces.IStatsProducer)
    # this is the non-distributed version. When we need to have multiple
    # helpers, this object will become the HelperCoordinator, and will query
    # the farm of Helpers to see if anyone has the storage_index of interest,
    # and send the request off to them. If nobody has it, we'll choose a
    # helper at random.

    name = "helper"
    chk_upload_helper_class = CHKUploadHelper

    def __init__(self, basedir, stats_provider=None):
        self._basedir = basedir
        self._chk_incoming = os.path.join(basedir, "CHK_incoming")
        self._chk_encoding = os.path.join(basedir, "CHK_encoding")
        fileutil.make_dirs(self._chk_incoming)
        fileutil.make_dirs(self._chk_encoding)
        self._active_uploads = {}
        self.stats_provider = stats_provider
        if stats_provider:
            stats_provider.register_producer(self)
        service.MultiService.__init__(self)

    def setServiceParent(self, parent):
        service.MultiService.setServiceParent(self, parent)

    def log(self, *args, **kwargs):
        if 'facility' not in kwargs:
            kwargs['facility'] = "tahoe.helper"
        return self.parent.log(*args, **kwargs)

    def get_stats(self):
        OLD = 86400*2 # 48 hours
        now = time.time()
        inc_count = inc_size = inc_size_old = 0
        enc_count = enc_size = enc_size_old = 0
        inc = os.listdir(self._chk_incoming)
        enc = os.listdir(self._chk_encoding)
        for f in inc:
            s = os.stat(os.path.join(self._chk_incoming, f))
            size = s[stat.ST_SIZE]
            mtime = s[stat.ST_MTIME]
            inc_count += 1
            inc_size += size
            if now - mtime > OLD:
                inc_size_old += size
        for f in enc:
            s = os.stat(os.path.join(self._chk_encoding, f))
            size = s[stat.ST_SIZE]
            mtime = s[stat.ST_MTIME]
            enc_count += 1
            enc_size += size
            if now - mtime > OLD:
                enc_size_old += size
        return { 'chk_upload_helper.active_uploads': len(self._active_uploads),
                 'chk_upload_helper.incoming_count': inc_count,
                 'chk_upload_helper.incoming_size': inc_size,
                 'chk_upload_helper.incoming_size_old': inc_size_old,
                 'chk_upload_helper.encoding_count': enc_count,
                 'chk_upload_helper.encoding_size': enc_size,
                 'chk_upload_helper.encoding_size_old': enc_size_old,
               }

    def remote_upload_chk(self, storage_index):
        if self.stats_provider:
            self.stats_provider.count("chk_upload_helper.upload_requests")
        r = upload.UploadResults()
        started = time.time()
        si_s = storage.si_b2a(storage_index)
        lp = self.log(format="helper: upload_chk query for SI %(si)s", si=si_s)
        incoming_file = os.path.join(self._chk_incoming, si_s)
        encoding_file = os.path.join(self._chk_encoding, si_s)
        if storage_index in self._active_uploads:
            self.log("upload is currently active", parent=lp)
            uh = self._active_uploads[storage_index]
            return uh.start()

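        # No active upload: see whether the grid can already serve the file.
        # If so, _checked returns (UploadResults, None); otherwise it hands
        # back an upload helper for the client to push ciphertext to.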
        d = self._check_for_chk_already_in_grid(storage_index, r, lp)
        def _checked(already_present):
            elapsed = time.time() - started
            r.timings['existence_check'] = elapsed
            if already_present:
                # the necessary results are placed in the UploadResults
                if self.stats_provider:
                    self.stats_provider.count("chk_upload_helper.upload_already_present")
                self.log("file already found in grid", parent=lp)
                return (r, None)

            if self.stats_provider:
                self.stats_provider.count("chk_upload_helper.upload_need_upload")
            # the file is not present in the grid, by which we mean there are
            # fewer than 'N' shares available.
            self.log("unable to find file in the grid", parent=lp,
                     level=log.NOISY)
            # We need an upload helper. Check our active uploads again in
            # case there was a race.
            if storage_index in self._active_uploads:
                self.log("upload is currently active", parent=lp)
                uh = self._active_uploads[storage_index]
            else:
                self.log("creating new upload helper", parent=lp)
                uh = self.chk_upload_helper_class(storage_index, self,
                                                  incoming_file, encoding_file,
                                                  r, lp)
                self._active_uploads[storage_index] = uh
            return uh.start()
        d.addCallback(_checked)
        def _err(f):
            self.log("error while checking for chk-already-in-grid",
                     failure=f, level=log.WEIRD, parent=lp)
            return f
        d.addErrback(_err)
        return d

    def _check_for_chk_already_in_grid(self, storage_index, results, lp):
        # see if this file is already in the grid
        lp2 = self.log("doing a quick check+UEBfetch",
                       parent=lp, level=log.NOISY)
        c = CHKCheckerAndUEBFetcher(self.parent.get_permuted_peers,
                                    storage_index, lp2)
        d = c.check()
        def _checked(res):
            if res:
                (sharemap, ueb_data, ueb_hash) = res
                self.log("found file in grid", level=log.NOISY, parent=lp)
                results.uri_extension_hash = ueb_hash
                results.sharemap = {}
                for shnum, peerids in sharemap.items():
                    peers_s = ",".join(["[%s]" % idlib.shortnodeid_b2a(peerid)
                                        for peerid in peerids])
                    results.sharemap[shnum] = "Found on " + peers_s
                results.uri_extension_data = ueb_data
                results.preexisting_shares = len(sharemap)
                results.pushed_shares = 0
                return True
            return False
        d.addCallback(_checked)
        return d

    def upload_finished(self, storage_index, size):
        if self.stats_provider:
            self.stats_provider.count("chk_upload_helper.encoded_bytes", size)
        del self._active_uploads[storage_index]