import binascii
-import copy
import time
now = time.time
from zope.interface import implements
from twisted.internet import defer
-from twisted.internet.interfaces import IConsumer
-from allmydata.interfaces import IImmutableFileNode, IUploadResults
from allmydata import uri
+from twisted.internet.interfaces import IConsumer
+from allmydata.interfaces import IImmutableFileNode, IUploadResults
+from allmydata.util import consumer
from allmydata.check_results import CheckResults, CheckAndRepairResults
from allmydata.util.dictutil import DictOfSets
from pycryptopp.cipher.aes import AES
# local imports
from allmydata.immutable.checker import Checker
from allmydata.immutable.repairer import Repairer
-from allmydata.immutable.downloader.node import DownloadNode
+from allmydata.immutable.downloader.node import DownloadNode, \
+ IDownloadStatusHandlingConsumer
from allmydata.immutable.downloader.status import DownloadStatus
class CiphertextFileNode:
def __init__(self, verifycap, storage_broker, secret_holder,
- terminator, history, download_status=None):
+ terminator, history):
assert isinstance(verifycap, uri.CHKFileVerifierURI)
self._verifycap = verifycap
self._storage_broker = storage_broker
self._secret_holder = secret_holder
- if download_status is None:
- ds = DownloadStatus(verifycap.storage_index, verifycap.size)
- if history:
- history.add_download(ds)
- download_status = ds
self._terminator = terminator
self._history = history
- self._download_status = download_status
+ self._download_status = None
self._node = None # created lazily, on read()
def _maybe_create_download_node(self):
+ if not self._download_status:
+ ds = DownloadStatus(self._verifycap.storage_index,
+ self._verifycap.size)
+ if self._history:
+ self._history.add_download(ds)
+ self._download_status = ds
if self._node is None:
self._node = DownloadNode(self._verifycap, self._storage_broker,
self._secret_holder,
self._terminator,
self._history, self._download_status)
- def read(self, consumer, offset=0, size=None, read_ev=None):
+ def read(self, consumer, offset=0, size=None):
"""I am the main entry point, from which FileNode.read() can get
data. I feed the consumer with the desired range of ciphertext. I
return a Deferred that fires (with the consumer) when the read is
finished."""
self._maybe_create_download_node()
- return self._node.read(consumer, offset, size, read_ev)
+ return self._node.read(consumer, offset, size)
def get_segment(self, segnum):
"""Begin downloading a segment. I return a tuple (d, c): 'd' is a
def raise_error(self):
pass
+ def is_mutable(self):
+ return False
def check_and_repair(self, monitor, verify=False, add_lease=False):
- verifycap = self._verifycap
- storage_index = verifycap.storage_index
- sb = self._storage_broker
- servers = sb.get_all_servers()
- sh = self._secret_holder
-
- c = Checker(verifycap=verifycap, servers=servers,
- verify=verify, add_lease=add_lease, secret_holder=sh,
+ c = Checker(verifycap=self._verifycap,
+ servers=self._storage_broker.get_connected_servers(),
+ verify=verify, add_lease=add_lease,
+ secret_holder=self._secret_holder,
monitor=monitor)
d = c.start()
- def _maybe_repair(cr):
- crr = CheckAndRepairResults(storage_index)
- crr.pre_repair_results = cr
- if cr.is_healthy():
- crr.post_repair_results = cr
- return defer.succeed(crr)
- else:
- crr.repair_attempted = True
- crr.repair_successful = False # until proven successful
- def _gather_repair_results(ur):
- assert IUploadResults.providedBy(ur), ur
- # clone the cr (check results) to form the basis of the
- # prr (post-repair results)
- prr = CheckResults(cr.uri, cr.storage_index)
- prr.data = copy.deepcopy(cr.data)
-
- sm = prr.data['sharemap']
- assert isinstance(sm, DictOfSets), sm
- sm.update(ur.sharemap)
- servers_responding = set(prr.data['servers-responding'])
- servers_responding.union(ur.sharemap.iterkeys())
- prr.data['servers-responding'] = list(servers_responding)
- prr.data['count-shares-good'] = len(sm)
- prr.data['count-good-share-hosts'] = len(sm)
- is_healthy = bool(len(sm) >= verifycap.total_shares)
- is_recoverable = bool(len(sm) >= verifycap.needed_shares)
- prr.set_healthy(is_healthy)
- prr.set_recoverable(is_recoverable)
- crr.repair_successful = is_healthy
- prr.set_needs_rebalancing(len(sm) >= verifycap.total_shares)
-
- crr.post_repair_results = prr
- return crr
- def _repair_error(f):
- # as with mutable repair, I'm not sure if I want to pass
- # through a failure or not. TODO
- crr.repair_successful = False
- crr.repair_failure = f
- return f
- r = Repairer(self, storage_broker=sb, secret_holder=sh,
- monitor=monitor)
- d = r.start()
- d.addCallbacks(_gather_repair_results, _repair_error)
- return d
-
- d.addCallback(_maybe_repair)
+ d.addCallback(self._maybe_repair, monitor)
return d
+ def _maybe_repair(self, cr, monitor):
+ crr = CheckAndRepairResults(self._verifycap.storage_index)
+ crr.pre_repair_results = cr
+ if cr.is_healthy():
+ crr.post_repair_results = cr
+ return defer.succeed(crr)
+
+ crr.repair_attempted = True
+ crr.repair_successful = False # until proven successful
+ def _repair_error(f):
+ # as with mutable repair, I'm not sure if I want to pass
+ # through a failure or not. TODO
+ crr.repair_successful = False
+ crr.repair_failure = f
+ return f
+ r = Repairer(self, storage_broker=self._storage_broker,
+ secret_holder=self._secret_holder,
+ monitor=monitor)
+ d = r.start()
+ d.addCallbacks(self._gather_repair_results, _repair_error,
+ callbackArgs=(cr, crr,))
+ return d
+
+ def _gather_repair_results(self, ur, cr, crr):
+ assert IUploadResults.providedBy(ur), ur
+ # clone the cr (check results) to form the basis of the
+ # prr (post-repair results)
+
+ verifycap = self._verifycap
+ servers_responding = set(cr.get_servers_responding())
+ sm = DictOfSets()
+ assert isinstance(cr.get_sharemap(), DictOfSets)
+ for shnum, servers in cr.get_sharemap().items():
+ for server in servers:
+ sm.add(shnum, server)
+ for shnum, servers in ur.get_sharemap().items():
+ for server in servers:
+ sm.add(shnum, server)
+ servers_responding.add(server)
+ servers_responding = sorted(servers_responding)
+
+ good_hosts = len(reduce(set.union, sm.values(), set()))
+ is_healthy = bool(len(sm) >= verifycap.total_shares)
+ is_recoverable = bool(len(sm) >= verifycap.needed_shares)
+
+ # TODO: this may be wrong, see ticket #1115 comment:27 and ticket #1784.
+ needs_rebalancing = bool(len(sm) >= verifycap.total_shares)
+
+ prr = CheckResults(cr.get_uri(), cr.get_storage_index(),
+ healthy=is_healthy, recoverable=is_recoverable,
+ needs_rebalancing=needs_rebalancing,
+ count_shares_needed=verifycap.needed_shares,
+ count_shares_expected=verifycap.total_shares,
+ count_shares_good=len(sm),
+ count_good_share_hosts=good_hosts,
+ count_recoverable_versions=int(is_recoverable),
+ count_unrecoverable_versions=int(not is_recoverable),
+ servers_responding=list(servers_responding),
+ sharemap=sm,
+ count_wrong_shares=0, # no such thing as wrong, for immutable
+ list_corrupt_shares=cr.get_corrupt_shares(),
+ count_corrupt_shares=len(cr.get_corrupt_shares()),
+ list_incompatible_shares=cr.get_incompatible_shares(),
+ count_incompatible_shares=len(cr.get_incompatible_shares()),
+ summary="",
+ report=[],
+ share_problems=[],
+ servermap=None)
+ crr.repair_successful = is_healthy
+ crr.post_repair_results = prr
+ return crr
+
def check(self, monitor, verify=False, add_lease=False):
verifycap = self._verifycap
sb = self._storage_broker
- servers = sb.get_all_servers()
+ servers = sb.get_connected_servers()
sh = self._secret_holder
v = Checker(verifycap=verifycap, servers=servers,
monitor=monitor)
return v.start()
-
class DecryptingConsumer:
"""I sit between a CiphertextDownloader (which acts as a Producer) and
the real Consumer, decrypting everything that passes by. The real
Consumer sees the real Producer, but the Producer sees us instead of the
real consumer."""
- implements(IConsumer)
+ implements(IConsumer, IDownloadStatusHandlingConsumer)
- def __init__(self, consumer, readkey, offset, read_event):
+ def __init__(self, consumer, readkey, offset):
self._consumer = consumer
- self._read_event = read_event
+ self._read_ev = None
+ self._download_status = None
# TODO: pycryptopp CTR-mode needs random-access operations: I want
# either a=AES(readkey, offset) or better yet both of:
# a=AES(readkey, offset=0)
self._decryptor = AES(readkey, iv=iv)
self._decryptor.process("\x00"*offset_small)
+ def set_download_status_read_event(self, read_ev):
+ self._read_ev = read_ev
+ def set_download_status(self, ds):
+ self._download_status = ds
+
def registerProducer(self, producer, streaming):
# this passes through, so the real consumer can flow-control the real
# producer. Therefore we don't need to provide any IPushProducer
def write(self, ciphertext):
started = now()
plaintext = self._decryptor.process(ciphertext)
- elapsed = now() - started
- self._read_event.update(0, elapsed, 0)
+ if self._read_ev:
+ elapsed = now() - started
+ self._read_ev.update(0, elapsed, 0)
+ if self._download_status:
+ self._download_status.add_misc_event("AES", started, now())
self._consumer.write(plaintext)
class ImmutableFileNode:
history):
assert isinstance(filecap, uri.CHKFileURI)
verifycap = filecap.get_verify_cap()
- ds = DownloadStatus(verifycap.storage_index, verifycap.size)
- if history:
- history.add_download(ds)
- self._download_status = ds
self._cnode = CiphertextFileNode(verifycap, storage_broker,
- secret_holder, terminator, history, ds)
+ secret_holder, terminator, history)
assert isinstance(filecap, uri.CHKFileURI)
self.u = filecap
self._readkey = filecap.key
return True
def read(self, consumer, offset=0, size=None):
- actual_size = size
- if actual_size == None:
- actual_size = self.u.size
- actual_size = actual_size - offset
- read_ev = self._download_status.add_read_event(offset,actual_size,
- now())
- decryptor = DecryptingConsumer(consumer, self._readkey, offset, read_ev)
- d = self._cnode.read(decryptor, offset, size, read_ev)
+ decryptor = DecryptingConsumer(consumer, self._readkey, offset)
+ d = self._cnode.read(decryptor, offset, size)
d.addCallback(lambda dc: consumer)
return d
return self._cnode.check_and_repair(monitor, verify, add_lease)
def check(self, monitor, verify=False, add_lease=False):
return self._cnode.check(monitor, verify, add_lease)
+
+ def get_best_readable_version(self):
+ """
+ Return an IReadable of the best version of this file. Since
+ immutable files can have only one version, we just return the
+ current filenode.
+ """
+ return defer.succeed(self)
+
+
+ def download_best_version(self):
+ """
+ Download the best version of this file, returning its contents
+ as a bytestring. Since there is only one version of an immutable
+ file, we download and return the contents of this file.
+ """
+ d = consumer.download_to_data(self)
+ return d
+
+ # for an immutable file, download_to_data (specified in IReadable)
+ # is the same as download_best_version (specified in IFileNode). For
+ # mutable files, the difference is more meaningful, since they can
+ # have multiple versions.
+ download_to_data = download_best_version
+
+
+ # get_size() (IReadable), get_current_size() (IFilesystemNode), and
+ # get_size_of_best_version(IFileNode) are all the same for immutable
+ # files.
+ get_size_of_best_version = get_current_size