]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blobdiff - src/allmydata/immutable/filenode.py
do not cache and re-use imm filenodes in nodemaker
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / immutable / filenode.py
index ed3785b7152e75938ca5bbdded51fe76dbfba5e3..79e282d90e232a2645b56e7b14cf1a765fd463e3 100644 (file)
@@ -1,14 +1,14 @@
 
 import binascii
-import copy
 import time
 now = time.time
-from zope.interface import implements, Interface
+from zope.interface import implements
 from twisted.internet import defer
-from twisted.internet.interfaces import IConsumer
 
-from allmydata.interfaces import IImmutableFileNode, IUploadResults
 from allmydata import uri
+from twisted.internet.interfaces import IConsumer
+from allmydata.interfaces import IImmutableFileNode, IUploadResults
+from allmydata.util import consumer
 from allmydata.check_results import CheckResults, CheckAndRepairResults
 from allmydata.util.dictutil import DictOfSets
 from pycryptopp.cipher.aes import AES
@@ -16,14 +16,10 @@ from pycryptopp.cipher.aes import AES
 # local imports
 from allmydata.immutable.checker import Checker
 from allmydata.immutable.repairer import Repairer
-from allmydata.immutable.downloader.node import DownloadNode
+from allmydata.immutable.downloader.node import DownloadNode, \
+     IDownloadStatusHandlingConsumer
 from allmydata.immutable.downloader.status import DownloadStatus
 
-class IDownloadStatusHandlingConsumer(Interface):
-    def set_download_status_read_event(read_ev):
-        """Record the DownloadStatus 'read event', to be updated with the
-        time it takes to decrypt each chunk of data."""
-
 class CiphertextFileNode:
     def __init__(self, verifycap, storage_broker, secret_holder,
                  terminator, history):
@@ -55,14 +51,7 @@ class CiphertextFileNode:
         return a Deferred that fires (with the consumer) when the read is
         finished."""
         self._maybe_create_download_node()
-        actual_size = size
-        if actual_size is None:
-            actual_size = self._verifycap.size - offset
-        read_ev = self._download_status.add_read_event(offset, actual_size,
-                                                       now())
-        if IDownloadStatusHandlingConsumer.providedBy(consumer):
-            consumer.set_download_status_read_event(read_ev)
-        return self._node.read(consumer, offset, size, read_ev)
+        return self._node.read(consumer, offset, size)
 
     def get_segment(self, segnum):
         """Begin downloading a segment. I return a tuple (d, c): 'd' is a
@@ -97,70 +86,92 @@ class CiphertextFileNode:
     def raise_error(self):
         pass
 
+    def is_mutable(self):
+        return False
 
     def check_and_repair(self, monitor, verify=False, add_lease=False):
-        verifycap = self._verifycap
-        storage_index = verifycap.storage_index
-        sb = self._storage_broker
-        servers = sb.get_all_servers()
-        sh = self._secret_holder
-
-        c = Checker(verifycap=verifycap, servers=servers,
-                    verify=verify, add_lease=add_lease, secret_holder=sh,
+        c = Checker(verifycap=self._verifycap,
+                    servers=self._storage_broker.get_connected_servers(),
+                    verify=verify, add_lease=add_lease,
+                    secret_holder=self._secret_holder,
                     monitor=monitor)
         d = c.start()
-        def _maybe_repair(cr):
-            crr = CheckAndRepairResults(storage_index)
-            crr.pre_repair_results = cr
-            if cr.is_healthy():
-                crr.post_repair_results = cr
-                return defer.succeed(crr)
-            else:
-                crr.repair_attempted = True
-                crr.repair_successful = False # until proven successful
-                def _gather_repair_results(ur):
-                    assert IUploadResults.providedBy(ur), ur
-                    # clone the cr (check results) to form the basis of the
-                    # prr (post-repair results)
-                    prr = CheckResults(cr.uri, cr.storage_index)
-                    prr.data = copy.deepcopy(cr.data)
-
-                    sm = prr.data['sharemap']
-                    assert isinstance(sm, DictOfSets), sm
-                    sm.update(ur.sharemap)
-                    servers_responding = set(prr.data['servers-responding'])
-                    servers_responding.union(ur.sharemap.iterkeys())
-                    prr.data['servers-responding'] = list(servers_responding)
-                    prr.data['count-shares-good'] = len(sm)
-                    prr.data['count-good-share-hosts'] = len(sm)
-                    is_healthy = bool(len(sm) >= verifycap.total_shares)
-                    is_recoverable = bool(len(sm) >= verifycap.needed_shares)
-                    prr.set_healthy(is_healthy)
-                    prr.set_recoverable(is_recoverable)
-                    crr.repair_successful = is_healthy
-                    prr.set_needs_rebalancing(len(sm) >= verifycap.total_shares)
-
-                    crr.post_repair_results = prr
-                    return crr
-                def _repair_error(f):
-                    # as with mutable repair, I'm not sure if I want to pass
-                    # through a failure or not. TODO
-                    crr.repair_successful = False
-                    crr.repair_failure = f
-                    return f
-                r = Repairer(self, storage_broker=sb, secret_holder=sh,
-                             monitor=monitor)
-                d = r.start()
-                d.addCallbacks(_gather_repair_results, _repair_error)
-                return d
-
-        d.addCallback(_maybe_repair)
+        d.addCallback(self._maybe_repair, monitor)
         return d
 
+    def _maybe_repair(self, cr, monitor):
+        crr = CheckAndRepairResults(self._verifycap.storage_index)
+        crr.pre_repair_results = cr
+        if cr.is_healthy():
+            crr.post_repair_results = cr
+            return defer.succeed(crr)
+
+        crr.repair_attempted = True
+        crr.repair_successful = False # until proven successful
+        def _repair_error(f):
+            # as with mutable repair, I'm not sure if I want to pass
+            # through a failure or not. TODO
+            crr.repair_successful = False
+            crr.repair_failure = f
+            return f
+        r = Repairer(self, storage_broker=self._storage_broker,
+                     secret_holder=self._secret_holder,
+                     monitor=monitor)
+        d = r.start()
+        d.addCallbacks(self._gather_repair_results, _repair_error,
+                       callbackArgs=(cr, crr,))
+        return d
+
+    def _gather_repair_results(self, ur, cr, crr):
+        assert IUploadResults.providedBy(ur), ur
+        # clone the cr (check results) to form the basis of the
+        # prr (post-repair results)
+
+        verifycap = self._verifycap
+        servers_responding = set(cr.get_servers_responding())
+        sm = DictOfSets()
+        assert isinstance(cr.get_sharemap(), DictOfSets)
+        for shnum, servers in cr.get_sharemap().items():
+            for server in servers:
+                sm.add(shnum, server)
+        for shnum, servers in ur.get_sharemap().items():
+            for server in servers:
+                sm.add(shnum, server)
+                servers_responding.add(server)
+        servers_responding = sorted(servers_responding)
+
+        good_hosts = len(reduce(set.union, sm.values(), set()))
+        is_healthy = bool(len(sm) >= verifycap.total_shares)
+        is_recoverable = bool(len(sm) >= verifycap.needed_shares)
+        needs_rebalancing = bool(len(sm) >= verifycap.total_shares)
+        prr = CheckResults(cr.get_uri(), cr.get_storage_index(),
+                           healthy=is_healthy, recoverable=is_recoverable,
+                           needs_rebalancing=needs_rebalancing,
+                           count_shares_needed=verifycap.needed_shares,
+                           count_shares_expected=verifycap.total_shares,
+                           count_shares_good=len(sm),
+                           count_good_share_hosts=good_hosts,
+                           count_recoverable_versions=int(is_recoverable),
+                           count_unrecoverable_versions=int(not is_recoverable),
+                           servers_responding=list(servers_responding),
+                           sharemap=sm,
+                           count_wrong_shares=0, # no such thing as wrong, for immutable
+                           list_corrupt_shares=cr.get_corrupt_shares(),
+                           count_corrupt_shares=len(cr.get_corrupt_shares()),
+                           list_incompatible_shares=cr.get_incompatible_shares(),
+                           count_incompatible_shares=len(cr.get_incompatible_shares()),
+                           summary="",
+                           report=[],
+                           share_problems=[],
+                           servermap=None)
+        crr.repair_successful = is_healthy
+        crr.post_repair_results = prr
+        return crr
+
     def check(self, monitor, verify=False, add_lease=False):
         verifycap = self._verifycap
         sb = self._storage_broker
-        servers = sb.get_all_servers()
+        servers = sb.get_connected_servers()
         sh = self._secret_holder
 
         v = Checker(verifycap=verifycap, servers=servers,
@@ -177,7 +188,8 @@ class DecryptingConsumer:
 
     def __init__(self, consumer, readkey, offset):
         self._consumer = consumer
-        self._read_event = None
+        self._read_ev = None
+        self._download_status = None
         # TODO: pycryptopp CTR-mode needs random-access operations: I want
         # either a=AES(readkey, offset) or better yet both of:
         #  a=AES(readkey, offset=0)
@@ -190,7 +202,9 @@ class DecryptingConsumer:
         self._decryptor.process("\x00"*offset_small)
 
     def set_download_status_read_event(self, read_ev):
-        self._read_event = read_ev
+        self._read_ev = read_ev
+    def set_download_status(self, ds):
+        self._download_status = ds
 
     def registerProducer(self, producer, streaming):
         # this passes through, so the real consumer can flow-control the real
@@ -203,9 +217,11 @@ class DecryptingConsumer:
     def write(self, ciphertext):
         started = now()
         plaintext = self._decryptor.process(ciphertext)
-        if self._read_event:
+        if self._read_ev:
             elapsed = now() - started
-            self._read_event.update(0, elapsed, 0)
+            self._read_ev.update(0, elapsed, 0)
+        if self._download_status:
+            self._download_status.add_misc_event("AES", started, now())
         self._consumer.write(plaintext)
 
 class ImmutableFileNode:
@@ -288,3 +304,33 @@ class ImmutableFileNode:
         return self._cnode.check_and_repair(monitor, verify, add_lease)
     def check(self, monitor, verify=False, add_lease=False):
         return self._cnode.check(monitor, verify, add_lease)
+
+    def get_best_readable_version(self):
+        """
+        Return an IReadable of the best version of this file. Since
+        immutable files can have only one version, we just return the
+        current filenode.
+        """
+        return defer.succeed(self)
+
+
+    def download_best_version(self):
+        """
+        Download the best version of this file, returning its contents
+        as a bytestring. Since there is only one version of an immutable
+        file, we download and return the contents of this file.
+        """
+        d = consumer.download_to_data(self)
+        return d
+
+    # for an immutable file, download_to_data (specified in IReadable)
+    # is the same as download_best_version (specified in IFileNode). For
+    # mutable files, the difference is more meaningful, since they can
+    # have multiple versions.
+    download_to_data = download_best_version
+
+
+    # get_size() (IReadable), get_current_size() (IFilesystemNode), and
+    # get_size_of_best_version(IFileNode) are all the same for immutable
+    # files.
+    get_size_of_best_version = get_current_size