-import os.path, stat
-from cStringIO import StringIO
+import binascii
+import time
+now = time.time
from zope.interface import implements
from twisted.internet import defer
-from twisted.internet.interfaces import IPushProducer, IConsumer
-from twisted.protocols import basic
-from foolscap.eventual import eventually
-from allmydata.interfaces import IFileNode, IFileURI, ICheckable, \
- IDownloadTarget
-from allmydata.util import log, base32
-from allmydata.immutable.checker import SimpleCHKFileChecker, \
- SimpleCHKFileVerifier
-from allmydata.immutable import download
-
-class _ImmutableFileNodeBase(object):
- implements(IFileNode, ICheckable)
-
- def __init__(self, uri, client):
- self.u = IFileURI(uri)
- self._client = client
- def get_readonly_uri(self):
- return self.get_uri()
+from allmydata import uri
+from twisted.internet.interfaces import IConsumer
+from allmydata.interfaces import IImmutableFileNode, IUploadResults
+from allmydata.util import consumer
+from allmydata.check_results import CheckResults, CheckAndRepairResults
+from allmydata.util.dictutil import DictOfSets
+from pycryptopp.cipher.aes import AES
+
+# local imports
+from allmydata.immutable.checker import Checker
+from allmydata.immutable.repairer import Repairer
+from allmydata.immutable.downloader.node import DownloadNode, \
+ IDownloadStatusHandlingConsumer
+from allmydata.immutable.downloader.status import DownloadStatus
+
+class CiphertextFileNode:
+ def __init__(self, verifycap, storage_broker, secret_holder,
+ terminator, history):
+ assert isinstance(verifycap, uri.CHKFileVerifierURI)
+ self._verifycap = verifycap
+ self._storage_broker = storage_broker
+ self._secret_holder = secret_holder
+ self._terminator = terminator
+ self._history = history
+ self._download_status = None
+ self._node = None # created lazily, on read()
+
+ def _maybe_create_download_node(self):
+ if not self._download_status:
+ ds = DownloadStatus(self._verifycap.storage_index,
+ self._verifycap.size)
+ if self._history:
+ self._history.add_download(ds)
+ self._download_status = ds
+ if self._node is None:
+ self._node = DownloadNode(self._verifycap, self._storage_broker,
+ self._secret_holder,
+ self._terminator,
+ self._history, self._download_status)
+
+ def read(self, consumer, offset=0, size=None):
+ """I am the main entry point, from which FileNode.read() can get
+ data. I feed the consumer with the desired range of ciphertext. I
+ return a Deferred that fires (with the consumer) when the read is
+ finished."""
+ self._maybe_create_download_node()
+ return self._node.read(consumer, offset, size)
+
+ def get_segment(self, segnum):
+ """Begin downloading a segment. I return a tuple (d, c): 'd' is a
+ Deferred that fires with (offset,data) when the desired segment is
+ available, and c is an object on which c.cancel() can be called to
+ disavow interest in the segment (after which 'd' will never fire).
+
+ You probably need to know the segment size before calling this,
+ unless you want the first few bytes of the file. If you ask for a
+ segment number which turns out to be too large, the Deferred will
+ errback with BadSegmentNumberError.
+
+ The Deferred fires with the offset of the first byte of the data
+ segment, so that you can call get_segment() before knowing the
+ segment size, and still know which data you received.
+ """
+ self._maybe_create_download_node()
+ return self._node.get_segment(segnum)
+
+ def get_segment_size(self):
+ # return a Deferred that fires with the file's real segment size
+ self._maybe_create_download_node()
+ return self._node.get_segsize()
+
+ def get_storage_index(self):
+ return self._verifycap.storage_index
+ def get_verify_cap(self):
+ return self._verifycap
+ def get_size(self):
+ return self._verifycap.size
+
+ def raise_error(self):
+ pass
def is_mutable(self):
return False
- def is_readonly(self):
- return True
+ def check_and_repair(self, monitor, verify=False, add_lease=False):
+ c = Checker(verifycap=self._verifycap,
+ servers=self._storage_broker.get_connected_servers(),
+ verify=verify, add_lease=add_lease,
+ secret_holder=self._secret_holder,
+ monitor=monitor)
+ d = c.start()
+ d.addCallback(self._maybe_repair, monitor)
+ return d
+
+ def _maybe_repair(self, cr, monitor):
+ crr = CheckAndRepairResults(self._verifycap.storage_index)
+ crr.pre_repair_results = cr
+ if cr.is_healthy():
+ crr.post_repair_results = cr
+ return defer.succeed(crr)
+
+ crr.repair_attempted = True
+ crr.repair_successful = False # until proven successful
+ def _repair_error(f):
+ # as with mutable repair, I'm not sure if I want to pass
+ # through a failure or not. TODO
+ crr.repair_successful = False
+ crr.repair_failure = f
+ return f
+ r = Repairer(self, storage_broker=self._storage_broker,
+ secret_holder=self._secret_holder,
+ monitor=monitor)
+ d = r.start()
+ d.addCallbacks(self._gather_repair_results, _repair_error,
+ callbackArgs=(cr, crr,))
+ return d
+
+ def _gather_repair_results(self, ur, cr, crr):
+ assert IUploadResults.providedBy(ur), ur
+ # clone the cr (check results) to form the basis of the
+ # prr (post-repair results)
+
+ verifycap = self._verifycap
+ servers_responding = set(cr.get_servers_responding())
+ sm = DictOfSets()
+ assert isinstance(cr.get_sharemap(), DictOfSets)
+ for shnum, servers in cr.get_sharemap().items():
+ for server in servers:
+ sm.add(shnum, server)
+ for shnum, servers in ur.get_sharemap().items():
+ for server in servers:
+ sm.add(shnum, server)
+ servers_responding.add(server)
+ servers_responding = sorted(servers_responding)
+
+ good_hosts = len(reduce(set.union, sm.values(), set()))
+ is_healthy = bool(len(sm) >= verifycap.total_shares)
+ is_recoverable = bool(len(sm) >= verifycap.needed_shares)
+
+ # TODO: this may be wrong, see ticket #1115 comment:27 and ticket #1784.
+ needs_rebalancing = bool(len(sm) >= verifycap.total_shares)
+
+ prr = CheckResults(cr.get_uri(), cr.get_storage_index(),
+ healthy=is_healthy, recoverable=is_recoverable,
+ needs_rebalancing=needs_rebalancing,
+ count_shares_needed=verifycap.needed_shares,
+ count_shares_expected=verifycap.total_shares,
+ count_shares_good=len(sm),
+ count_good_share_hosts=good_hosts,
+ count_recoverable_versions=int(is_recoverable),
+ count_unrecoverable_versions=int(not is_recoverable),
+ servers_responding=list(servers_responding),
+ sharemap=sm,
+ count_wrong_shares=0, # no such thing as wrong, for immutable
+ list_corrupt_shares=cr.get_corrupt_shares(),
+ count_corrupt_shares=len(cr.get_corrupt_shares()),
+ list_incompatible_shares=cr.get_incompatible_shares(),
+ count_incompatible_shares=len(cr.get_incompatible_shares()),
+ summary="",
+ report=[],
+ share_problems=[],
+ servermap=None)
+ crr.repair_successful = is_healthy
+ crr.post_repair_results = prr
+ return crr
+
+ def check(self, monitor, verify=False, add_lease=False):
+ verifycap = self._verifycap
+ sb = self._storage_broker
+ servers = sb.get_connected_servers()
+ sh = self._secret_holder
+
+ v = Checker(verifycap=verifycap, servers=servers,
+ verify=verify, add_lease=add_lease, secret_holder=sh,
+ monitor=monitor)
+ return v.start()
+class DecryptingConsumer:
+ """I sit between a CiphertextDownloader (which acts as a Producer) and
+ the real Consumer, decrypting everything that passes by. The real
+ Consumer sees the real Producer, but the Producer sees us instead of the
+ real consumer."""
+ implements(IConsumer, IDownloadStatusHandlingConsumer)
+
+ def __init__(self, consumer, readkey, offset):
+ self._consumer = consumer
+ self._read_ev = None
+ self._download_status = None
+ # TODO: pycryptopp CTR-mode needs random-access operations: I want
+ # either a=AES(readkey, offset) or better yet both of:
+ # a=AES(readkey, offset=0)
+ # a.process(ciphertext, offset=xyz)
+ # For now, we fake it with the existing iv= argument.
+ offset_big = offset // 16
+ offset_small = offset % 16
+ iv = binascii.unhexlify("%032x" % offset_big)
+ self._decryptor = AES(readkey, iv=iv)
+ self._decryptor.process("\x00"*offset_small)
+
+ def set_download_status_read_event(self, read_ev):
+ self._read_ev = read_ev
+ def set_download_status(self, ds):
+ self._download_status = ds
+
+ def registerProducer(self, producer, streaming):
+ # this passes through, so the real consumer can flow-control the real
+ # producer. Therefore we don't need to provide any IPushProducer
+ # methods. We implement all the IConsumer methods as pass-throughs,
+ # and only intercept write() to perform decryption.
+ self._consumer.registerProducer(producer, streaming)
+ def unregisterProducer(self):
+ self._consumer.unregisterProducer()
+ def write(self, ciphertext):
+ started = now()
+ plaintext = self._decryptor.process(ciphertext)
+ if self._read_ev:
+ elapsed = now() - started
+ self._read_ev.update(0, elapsed, 0)
+ if self._download_status:
+ self._download_status.add_misc_event("AES", started, now())
+ self._consumer.write(plaintext)
+
+class ImmutableFileNode:
+ implements(IImmutableFileNode)
+
+ # I wrap a CiphertextFileNode with a decryption key
+ def __init__(self, filecap, storage_broker, secret_holder, terminator,
+ history):
+ assert isinstance(filecap, uri.CHKFileURI)
+ verifycap = filecap.get_verify_cap()
+ self._cnode = CiphertextFileNode(verifycap, storage_broker,
+ secret_holder, terminator, history)
+ assert isinstance(filecap, uri.CHKFileURI)
+ self.u = filecap
+ self._readkey = filecap.key
+
+ # TODO: I'm not sure about this.. what's the use case for node==node? If
+ # we keep it here, we should also put this on CiphertextFileNode
def __hash__(self):
return self.u.__hash__()
def __eq__(self, other):
- if IFileNode.providedBy(other):
+ if isinstance(other, ImmutableFileNode):
return self.u.__eq__(other.u)
else:
return False
def __ne__(self, other):
- if IFileNode.providedBy(other):
+ if isinstance(other, ImmutableFileNode):
return self.u.__eq__(other.u)
else:
return True
-class PortionOfFile:
- # like a list slice (things[2:14]), but for a file on disk
- def __init__(self, fn, offset=0, size=None):
- self.f = open(fn, "rb")
- self.f.seek(offset)
- self.bytes_left = size
-
- def read(self, size=None):
- # bytes_to_read = min(size, self.bytes_left), but None>anything
- if size is None:
- bytes_to_read = self.bytes_left
- elif self.bytes_left is None:
- bytes_to_read = size
- else:
- bytes_to_read = min(size, self.bytes_left)
- data = self.f.read(bytes_to_read)
- if self.bytes_left is not None:
- self.bytes_left -= len(data)
- return data
-
-class DownloadCache:
- implements(IDownloadTarget)
-
- def __init__(self, node, cachefile):
- self._downloader = node._client.getServiceNamed("downloader")
- self._uri = node.get_uri()
- self._storage_index = node.get_storage_index()
- self.milestones = set() # of (offset,size,Deferred)
- self.cachefile = cachefile
- self.download_in_progress = False
- # five states:
- # new FileNode, no downloads ever performed
- # new FileNode, leftover file (partial)
- # new FileNode, leftover file (whole)
- # download in progress, not yet complete
- # download complete
-
- def when_range_available(self, offset, size):
- assert isinstance(offset, (int,long))
- assert isinstance(size, (int,long))
-
- d = defer.Deferred()
- self.milestones.add( (offset,size,d) )
- self._check_milestones()
- if self.milestones and not self.download_in_progress:
- self.download_in_progress = True
- log.msg(format=("immutable filenode read [%(si)s]: " +
- "starting download"),
- si=base32.b2a(self._storage_index),
- umid="h26Heg", level=log.OPERATIONAL)
- d2 = self._downloader.download(self._uri, self)
- d2.addBoth(self._download_done)
- d2.addErrback(self._download_failed)
- d2.addErrback(log.err, umid="cQaM9g")
- return d
-
- def read(self, consumer, offset, size):
- assert offset+size <= self.get_filesize()
- f = PortionOfFile(self.cachefile.get_filename(), offset, size)
- d = basic.FileSender().beginFileTransfer(f, consumer)
- d.addCallback(lambda lastSent: consumer)
+ def read(self, consumer, offset=0, size=None):
+ decryptor = DecryptingConsumer(consumer, self._readkey, offset)
+ d = self._cnode.read(decryptor, offset, size)
+ d.addCallback(lambda dc: consumer)
return d
- def _download_done(self, res):
- # clear download_in_progress, so failed downloads can be re-tried
- self.download_in_progress = False
- return res
-
- def _download_failed(self, f):
- # tell anyone who's waiting that we failed
- for m in self.milestones:
- (offset,size,d) = m
- eventually(d.errback, f)
- self.milestones.clear()
-
- def _check_milestones(self):
- current_size = self.get_filesize()
- for m in list(self.milestones):
- (offset,size,d) = m
- if offset+size <= current_size:
- log.msg(format=("immutable filenode read [%(si)s] " +
- "%(offset)d+%(size)d vs %(filesize)d: " +
- "done"),
- si=base32.b2a(self._storage_index),
- offset=offset, size=size, filesize=current_size,
- umid="nuedUg", level=log.NOISY)
- self.milestones.discard(m)
- eventually(d.callback, None)
- else:
- log.msg(format=("immutable filenode read [%(si)s] " +
- "%(offset)d+%(size)d vs %(filesize)d: " +
- "still waiting"),
- si=base32.b2a(self._storage_index),
- offset=offset, size=size, filesize=current_size,
- umid="8PKOhg", level=log.NOISY)
-
- def get_filesize(self):
- try:
- filesize = os.stat(self.cachefile.get_filename())[stat.ST_SIZE]
- except OSError:
- filesize = 0
- return filesize
-
-
- def open(self, size):
- self.f = open(self.cachefile.get_filename(), "wb")
-
- def write(self, data):
- self.f.write(data)
- self._check_milestones()
-
- def close(self):
- self.f.close()
- self._check_milestones()
-
- def fail(self, why):
+ def raise_error(self):
pass
- def register_canceller(self, cb):
- pass
- def finish(self):
- return None
-
-
-class FileNode(_ImmutableFileNodeBase):
- checker_class = SimpleCHKFileChecker
- verifier_class = SimpleCHKFileVerifier
+ def get_write_uri(self):
+ return None
- def __init__(self, uri, client, cachefile):
- _ImmutableFileNodeBase.__init__(self, uri, client)
- self.download_cache = DownloadCache(self, cachefile)
+ def get_readonly_uri(self):
+ return self.get_uri()
def get_uri(self):
return self.u.to_string()
-
- def get_size(self):
- return self.u.get_size()
-
- def get_verifier(self):
- return self.u.get_verifier()
+ def get_cap(self):
+ return self.u
+ def get_readcap(self):
+ return self.u.get_readonly()
+ def get_verify_cap(self):
+ return self.u.get_verify_cap()
+ def get_repair_cap(self):
+ # CHK files can be repaired with just the verifycap
+ return self.u.get_verify_cap()
def get_storage_index(self):
- return self.u.storage_index
-
- def check(self, monitor, verify=False):
- # TODO: pass the Monitor to SimpleCHKFileChecker or
- # SimpleCHKFileVerifier, have it call monitor.raise_if_cancelled()
- # before sending each request.
- storage_index = self.u.storage_index
- k = self.u.needed_shares
- N = self.u.total_shares
- size = self.u.size
- ueb_hash = self.u.uri_extension_hash
- if verify:
- v = self.verifier_class(self._client,
- self.get_uri(), storage_index,
- k, N, size, ueb_hash)
- else:
- v = self.checker_class(self._client,
- self.get_uri(), storage_index,
- k, N)
- return v.start()
-
- def check_and_repair(self, monitor, verify=False):
- # this is a stub, to allow the deep-check tests to pass.
- #raise NotImplementedError("not implemented yet")
- from allmydata.checker_results import CheckAndRepairResults
- cr = CheckAndRepairResults(self.u.storage_index)
- d = self.check(verify)
- def _done(r):
- cr.pre_repair_results = cr.post_repair_results = r
- cr.repair_attempted = False
- return cr
- d.addCallback(_done)
- return d
-
- def read(self, consumer, offset=0, size=None):
- if size is None:
- size = self.get_size() - offset
- size = min(size, self.get_size() - offset)
-
- if offset == 0 and size == self.get_size():
- # don't use the cache, just do a normal streaming download
- log.msg(format=("immutable filenode read [%(si)s]: " +
- "doing normal full download"),
- si=base32.b2a(self.u.storage_index),
- umid="VRSBwg", level=log.OPERATIONAL)
- return self.download(download.ConsumerAdapter(consumer))
-
- d = self.download_cache.when_range_available(offset, size)
- d.addCallback(lambda res:
- self.download_cache.read(consumer, offset, size))
- return d
-
- def download(self, target):
- downloader = self._client.getServiceNamed("downloader")
- return downloader.download(self.get_uri(), target)
-
- def download_to_data(self):
- downloader = self._client.getServiceNamed("downloader")
- return downloader.download_to_data(self.get_uri())
-
-class LiteralProducer:
- implements(IPushProducer)
- def resumeProducing(self):
- pass
- def stopProducing(self):
- pass
-
-
-class LiteralFileNode(_ImmutableFileNodeBase):
-
- def __init__(self, uri, client):
- _ImmutableFileNodeBase.__init__(self, uri, client)
-
- def get_uri(self):
- return self.u.to_string()
+ return self.u.get_storage_index()
def get_size(self):
- return len(self.u.data)
+ return self.u.get_size()
+ def get_current_size(self):
+ return defer.succeed(self.get_size())
- def get_verifier(self):
- return None
+ def is_mutable(self):
+ return False
- def get_storage_index(self):
- return None
+ def is_readonly(self):
+ return True
- def check(self, monitor, verify=False):
- return defer.succeed(None)
+ def is_unknown(self):
+ return False
- def check_and_repair(self, monitor, verify=False):
- return defer.succeed(None)
+ def is_allowed_in_immutable_directory(self):
+ return True
- def read(self, consumer, offset=0, size=None):
- if size is None:
- data = self.u.data[offset:]
- else:
- data = self.u.data[offset:offset+size]
-
- # We use twisted.protocols.basic.FileSender, which only does
- # non-streaming, i.e. PullProducer, where the receiver/consumer must
- # ask explicitly for each chunk of data. There are only two places in
- # the Twisted codebase that can't handle streaming=False, both of
- # which are in the upload path for an FTP/SFTP server
- # (protocols.ftp.FileConsumer and
- # vfs.adapters.ftp._FileToConsumerAdapter), neither of which is
- # likely to be used as the target for a Tahoe download.
-
- d = basic.FileSender().beginFileTransfer(StringIO(data), consumer)
- d.addCallback(lambda lastSent: consumer)
+ def check_and_repair(self, monitor, verify=False, add_lease=False):
+ return self._cnode.check_and_repair(monitor, verify, add_lease)
+ def check(self, monitor, verify=False, add_lease=False):
+ return self._cnode.check(monitor, verify, add_lease)
+
+ def get_best_readable_version(self):
+ """
+ Return an IReadable of the best version of this file. Since
+ immutable files can have only one version, we just return the
+ current filenode.
+ """
+ return defer.succeed(self)
+
+
+ def download_best_version(self):
+ """
+ Download the best version of this file, returning its contents
+ as a bytestring. Since there is only one version of an immutable
+ file, we download and return the contents of this file.
+ """
+ d = consumer.download_to_data(self)
return d
- def download(self, target):
- # note that this does not update the stats_provider
- data = self.u.data
- if IConsumer.providedBy(target):
- target.registerProducer(LiteralProducer(), True)
- target.open(len(data))
- target.write(data)
- if IConsumer.providedBy(target):
- target.unregisterProducer()
- target.close()
- return defer.maybeDeferred(target.finish)
-
- def download_to_data(self):
- data = self.u.data
- return defer.succeed(data)
+ # for an immutable file, download_to_data (specified in IReadable)
+ # is the same as download_best_version (specified in IFileNode). For
+ # mutable files, the difference is more meaningful, since they can
+ # have multiple versions.
+ download_to_data = download_best_version
+
+
+ # get_size() (IReadable), get_current_size() (IFilesystemNode), and
+ # get_size_of_best_version(IFileNode) are all the same for immutable
+ # files.
+ get_size_of_best_version = get_current_size