]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blobdiff - src/allmydata/immutable/filenode.py
Add comments and a caveat in webapi.rst indicating that
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / immutable / filenode.py
index bace435704b1be70ed8224a4fd4ae37316cecd8f..6b54b2d03287c73ad08a660e28e851c26dc9d5f7 100644 (file)
-import copy, os.path, stat
-from cStringIO import StringIO
+
+import binascii
+import time
+now = time.time
 from zope.interface import implements
 from twisted.internet import defer
-from twisted.internet.interfaces import IPushProducer, IConsumer
-from twisted.protocols import basic
-from foolscap.api import eventually
-from allmydata.interfaces import IFileNode, IFileURI, ICheckable, \
-     IDownloadTarget, IUploadResults
-from allmydata.util import dictutil, log, base32
-from allmydata.util.assertutil import precondition
-from allmydata import uri as urimodule
-from allmydata.immutable.checker import Checker
+
+from allmydata import uri
+from twisted.internet.interfaces import IConsumer
+from allmydata.interfaces import IImmutableFileNode, IUploadResults
+from allmydata.util import consumer
 from allmydata.check_results import CheckResults, CheckAndRepairResults
+from allmydata.util.dictutil import DictOfSets
+from pycryptopp.cipher.aes import AES
+
+# local imports
+from allmydata.immutable.checker import Checker
 from allmydata.immutable.repairer import Repairer
-from allmydata.immutable import download
+from allmydata.immutable.downloader.node import DownloadNode, \
+     IDownloadStatusHandlingConsumer
+from allmydata.immutable.downloader.status import DownloadStatus
+
class CiphertextFileNode:
    """Download-side access to a single immutable file's ciphertext.

    I hold only the verify cap, so I can locate, check, and repair shares
    but can never decrypt anything. Decryption is layered on top of me by
    ImmutableFileNode, which knows the readkey.
    """
    def __init__(self, verifycap, storage_broker, secret_holder,
                 terminator, history):
        assert isinstance(verifycap, uri.CHKFileVerifierURI)
        self._verifycap = verifycap
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._terminator = terminator
        self._history = history
        # Both the status object and the DownloadNode are built lazily,
        # the first time somebody actually asks for data.
        self._download_status = None
        self._node = None

    def _maybe_create_download_node(self):
        """Build the DownloadStatus and DownloadNode on first use."""
        if not self._download_status:
            status = DownloadStatus(self._verifycap.storage_index,
                                    self._verifycap.size)
            if self._history:
                self._history.add_download(status)
            self._download_status = status
        if self._node is None:
            self._node = DownloadNode(self._verifycap, self._storage_broker,
                                      self._secret_holder,
                                      self._terminator,
                                      self._history, self._download_status)

    def read(self, consumer, offset=0, size=None):
        """Feed 'consumer' the requested span of ciphertext.

        This is the main entry point, from which FileNode.read() can get
        data. Returns a Deferred that fires (with the consumer) when the
        read is finished.
        """
        self._maybe_create_download_node()
        return self._node.read(consumer, offset, size)

    def get_segment(self, segnum):
        """Begin downloading a segment.

        Returns a tuple (d, c): 'd' is a Deferred that fires with
        (offset, data) when the desired segment is available, and 'c' is
        an object on which c.cancel() can be called to disavow interest
        in the segment (after which 'd' will never fire).

        You probably need to know the segment size before calling this,
        unless you want the first few bytes of the file. If you ask for a
        segment number which turns out to be too large, the Deferred will
        errback with BadSegmentNumberError.

        The Deferred fires with the offset of the first byte of the data
        segment, so that you can call get_segment() before knowing the
        segment size, and still know which data you received.
        """
        self._maybe_create_download_node()
        return self._node.get_segment(segnum)

    def get_segment_size(self):
        """Return a Deferred that fires with the file's real segment size."""
        self._maybe_create_download_node()
        return self._node.get_segsize()

    def get_storage_index(self):
        return self._verifycap.storage_index

    def get_verify_cap(self):
        return self._verifycap

    def get_size(self):
        return self._verifycap.size

    def raise_error(self):
        pass

    def is_mutable(self):
        return False

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        """Check share health, then attempt a repair if unhealthy.

        Returns a Deferred that fires with a CheckAndRepairResults.
        """
        checker = Checker(verifycap=self._verifycap,
                          servers=self._storage_broker.get_connected_servers(),
                          verify=verify, add_lease=add_lease,
                          secret_holder=self._secret_holder,
                          monitor=monitor)
        d = checker.start()
        d.addCallback(self._maybe_repair, monitor)
        return d

    def _maybe_repair(self, cr, monitor):
        """Run a Repairer if the check results 'cr' say we are unhealthy."""
        crr = CheckAndRepairResults(self._verifycap.storage_index)
        crr.pre_repair_results = cr
        if cr.is_healthy():
            # healthy: the post-repair state is just the pre-repair state
            crr.post_repair_results = cr
            return defer.succeed(crr)

        crr.repair_attempted = True
        crr.repair_successful = False # until proven successful
        def _repair_error(f):
            # as with mutable repair, I'm not sure if I want to pass
            # through a failure or not. TODO
            crr.repair_successful = False
            crr.repair_failure = f
            return f
        repairer = Repairer(self, storage_broker=self._storage_broker,
                            secret_holder=self._secret_holder,
                            monitor=monitor)
        d = repairer.start()
        d.addCallbacks(self._gather_repair_results, _repair_error,
                       callbackArgs=(cr, crr,))
        return d

    def _gather_repair_results(self, ur, cr, crr):
        """Merge the repairer's upload results 'ur' into the pre-repair
        check results 'cr' to build the post-repair results, recorded on
        'crr'."""
        assert IUploadResults.providedBy(ur), ur
        # clone the cr (check results) to form the basis of the
        # prr (post-repair results)
        verifycap = self._verifycap

        servers_responding = set(cr.get_servers_responding())
        sm = DictOfSets()
        assert isinstance(cr.get_sharemap(), DictOfSets)
        # union of the shares we saw during the check and the shares the
        # repairer just placed
        for shnum, servers in cr.get_sharemap().items():
            for server in servers:
                sm.add(shnum, server)
        for shnum, servers in ur.get_sharemap().items():
            for server in servers:
                sm.add(shnum, server)
                servers_responding.add(server)
        servers_responding = sorted(servers_responding)

        # number of distinct servers that hold at least one good share
        good_hosts = len(set().union(*sm.values())) if sm else 0
        is_healthy = bool(len(sm) >= verifycap.total_shares)
        is_recoverable = bool(len(sm) >= verifycap.needed_shares)

        # TODO: this may be wrong, see ticket #1115 comment:27 and ticket #1784.
        needs_rebalancing = bool(len(sm) >= verifycap.total_shares)

        corrupt = cr.get_corrupt_shares()
        incompatible = cr.get_incompatible_shares()
        prr = CheckResults(cr.get_uri(), cr.get_storage_index(),
                           healthy=is_healthy, recoverable=is_recoverable,
                           needs_rebalancing=needs_rebalancing,
                           count_shares_needed=verifycap.needed_shares,
                           count_shares_expected=verifycap.total_shares,
                           count_shares_good=len(sm),
                           count_good_share_hosts=good_hosts,
                           count_recoverable_versions=int(is_recoverable),
                           count_unrecoverable_versions=int(not is_recoverable),
                           servers_responding=list(servers_responding),
                           sharemap=sm,
                           count_wrong_shares=0, # no such thing as wrong, for immutable
                           list_corrupt_shares=corrupt,
                           count_corrupt_shares=len(corrupt),
                           list_incompatible_shares=incompatible,
                           count_incompatible_shares=len(incompatible),
                           summary="",
                           report=[],
                           share_problems=[],
                           servermap=None)
        crr.repair_successful = is_healthy
        crr.post_repair_results = prr
        return crr

    def check(self, monitor, verify=False, add_lease=False):
        """Check share health without repairing.

        Returns a Deferred that fires with a CheckResults.
        """
        checker = Checker(verifycap=self._verifycap,
                          servers=self._storage_broker.get_connected_servers(),
                          verify=verify, add_lease=add_lease,
                          secret_holder=self._secret_holder,
                          monitor=monitor)
        return checker.start()
+
class DecryptingConsumer:
    """I sit between a CiphertextDownloader (which acts as a Producer) and
    the real Consumer, decrypting everything that passes by. The real
    Consumer sees the real Producer, but the Producer sees us instead of the
    real consumer."""
    implements(IConsumer, IDownloadStatusHandlingConsumer)

    def __init__(self, consumer, readkey, offset):
        self._consumer = consumer
        self._read_ev = None
        self._download_status = None
        # TODO: pycryptopp CTR-mode needs random-access operations: I want
        # either a=AES(readkey, offset) or better yet both of:
        #  a=AES(readkey, offset=0)
        #  a.process(ciphertext, offset=xyz)
        # For now, we fake it with the existing iv= argument.
        # AES-CTR can only be positioned on a 16-byte block boundary via
        # the counter value, so start at the enclosing block...
        whole_blocks, within_block = divmod(offset, 16)
        iv = binascii.unhexlify("%032x" % whole_blocks)
        self._decryptor = AES(readkey, iv=iv)
        # ...then burn through the leading partial block by decrypting
        # (and discarding) that many zero bytes.
        self._decryptor.process("\x00"*within_block)

    def set_download_status_read_event(self, read_ev):
        self._read_ev = read_ev

    def set_download_status(self, ds):
        self._download_status = ds

    def registerProducer(self, producer, streaming):
        # this passes through, so the real consumer can flow-control the real
        # producer. Therefore we don't need to provide any IPushProducer
        # methods. We implement all the IConsumer methods as pass-throughs,
        # and only intercept write() to perform decryption.
        self._consumer.registerProducer(producer, streaming)

    def unregisterProducer(self):
        self._consumer.unregisterProducer()

    def write(self, ciphertext):
        # decrypt in place, timing the AES work for the download status
        started = now()
        plaintext = self._decryptor.process(ciphertext)
        if self._read_ev:
            self._read_ev.update(0, now() - started, 0)
        if self._download_status:
            self._download_status.add_misc_event("AES", started, now())
        self._consumer.write(plaintext)
+
class ImmutableFileNode:
    """A filenode for an immutable CHK file.

    I wrap a CiphertextFileNode with the AES readkey taken from the
    read-cap, so my read() delivers plaintext while the wrapped node
    handles all share retrieval, checking, and repair.
    """
    implements(IImmutableFileNode)

    # I wrap a CiphertextFileNode with a decryption key
    def __init__(self, filecap, storage_broker, secret_holder, terminator,
                 history):
        # filecap must be the full read-cap (key + verify info)
        assert isinstance(filecap, uri.CHKFileURI)
        verifycap = filecap.get_verify_cap()
        self._cnode = CiphertextFileNode(verifycap, storage_broker,
                                         secret_holder, terminator, history)
        self.u = filecap
        self._readkey = filecap.key

    # TODO: I'm not sure about this.. what's the use case for node==node? If
    # we keep it here, we should also put this on CiphertextFileNode
    def __hash__(self):
        return self.u.__hash__()
    def __eq__(self, other):
        if isinstance(other, ImmutableFileNode):
            return self.u.__eq__(other.u)
        else:
            return False
    def __ne__(self, other):
        # BUG FIX: this used to return __eq__'s result directly, so two
        # equal nodes answered True to both == and !=. __ne__ must be the
        # negation of __eq__.
        if isinstance(other, ImmutableFileNode):
            return not self.u.__eq__(other.u)
        else:
            return True

    def read(self, consumer, offset=0, size=None):
        """Feed 'consumer' the requested span of *plaintext*.

        Returns a Deferred that fires with the caller's consumer when the
        read is finished.
        """
        decryptor = DecryptingConsumer(consumer, self._readkey, offset)
        d = self._cnode.read(decryptor, offset, size)
        # the ciphertext-level read fires with the DecryptingConsumer;
        # callers expect their own consumer back
        d.addCallback(lambda dc: consumer)
        return d

    def raise_error(self):
        pass

    def get_write_uri(self):
        # immutable files have no write-cap
        return None

    def get_readonly_uri(self):
        return self.get_uri()

    def get_uri(self):
        return self.u.to_string()
    def get_cap(self):
        return self.u
    def get_readcap(self):
        return self.u.get_readonly()
    def get_verify_cap(self):
        return self.u.get_verify_cap()
    def get_repair_cap(self):
        # CHK files can be repaired with just the verifycap
        return self.u.get_verify_cap()

    def get_storage_index(self):
        return self.u.get_storage_index()

    def get_size(self):
        return self.u.get_size()
    def get_current_size(self):
        return defer.succeed(self.get_size())

    def is_mutable(self):
        return False

    def is_readonly(self):
        return True

    def is_unknown(self):
        return False

    def is_allowed_in_immutable_directory(self):
        return True

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        # checking/repair operates on ciphertext, so delegate entirely
        return self._cnode.check_and_repair(monitor, verify, add_lease)
    def check(self, monitor, verify=False, add_lease=False):
        return self._cnode.check(monitor, verify, add_lease)

    def get_best_readable_version(self):
        """
        Return an IReadable of the best version of this file. Since
        immutable files can have only one version, we just return the
        current filenode.
        """
        return defer.succeed(self)


    def download_best_version(self):
        """
        Download the best version of this file, returning its contents
        as a bytestring. Since there is only one version of an immutable
        file, we download and return the contents of this file.
        """
        d = consumer.download_to_data(self)
        return d

    # for an immutable file, download_to_data (specified in IReadable)
    # is the same as download_best_version (specified in IFileNode). For
    # mutable files, the difference is more meaningful, since they can
    # have multiple versions.
    download_to_data = download_best_version


    # get_size() (IReadable), get_current_size() (IFilesystemNode), and
    # get_size_of_best_version(IFileNode) are all the same for immutable
    # files.
    get_size_of_best_version = get_current_size