import copy
import time
now = time.time
-from zope.interface import implements
+from zope.interface import implements, Interface
from twisted.internet import defer
from twisted.internet.interfaces import IConsumer
from allmydata.immutable.downloader.node import DownloadNode
from allmydata.immutable.downloader.status import DownloadStatus
+class IDownloadStatusHandlingConsumer(Interface):
+ def set_download_status_read_event(read_ev):
+ """Record the DownloadStatus 'read event', to be updated with the
+ time it takes to decrypt each chunk of data."""
+
class CiphertextFileNode:
def __init__(self, verifycap, storage_broker, secret_holder,
- terminator, history, download_status=None):
+ terminator, history):
assert isinstance(verifycap, uri.CHKFileVerifierURI)
self._verifycap = verifycap
self._storage_broker = storage_broker
self._secret_holder = secret_holder
- if download_status is None:
- ds = DownloadStatus(verifycap.storage_index, verifycap.size)
- if history:
- history.add_download(ds)
- download_status = ds
self._terminator = terminator
self._history = history
- self._download_status = download_status
+ self._download_status = None
self._node = None # created lazily, on read()
def _maybe_create_download_node(self):
+ if not self._download_status:
+ ds = DownloadStatus(self._verifycap.storage_index,
+ self._verifycap.size)
+ if self._history:
+ self._history.add_download(ds)
+ self._download_status = ds
if self._node is None:
self._node = DownloadNode(self._verifycap, self._storage_broker,
self._secret_holder,
self._terminator,
self._history, self._download_status)
- def read(self, consumer, offset=0, size=None, read_ev=None):
+ def read(self, consumer, offset=0, size=None):
"""I am the main entry point, from which FileNode.read() can get
data. I feed the consumer with the desired range of ciphertext. I
return a Deferred that fires (with the consumer) when the read is
finished."""
self._maybe_create_download_node()
+ actual_size = size
+ if actual_size is None:
+ actual_size = self._verifycap.size - offset
+ read_ev = self._download_status.add_read_event(offset, actual_size,
+ now())
+ if IDownloadStatusHandlingConsumer.providedBy(consumer):
+ consumer.set_download_status_read_event(read_ev)
return self._node.read(consumer, offset, size, read_ev)
def get_segment(self, segnum):
monitor=monitor)
return v.start()
-
class DecryptingConsumer:
"""I sit between a CiphertextDownloader (which acts as a Producer) and
the real Consumer, decrypting everything that passes by. The real
Consumer sees the real Producer, but the Producer sees us instead of the
real consumer."""
- implements(IConsumer)
+ implements(IConsumer, IDownloadStatusHandlingConsumer)
- def __init__(self, consumer, readkey, offset, read_event):
+ def __init__(self, consumer, readkey, offset):
self._consumer = consumer
- self._read_event = read_event
+ self._read_event = None
# TODO: pycryptopp CTR-mode needs random-access operations: I want
# either a=AES(readkey, offset) or better yet both of:
# a=AES(readkey, offset=0)
self._decryptor = AES(readkey, iv=iv)
self._decryptor.process("\x00"*offset_small)
+ def set_download_status_read_event(self, read_ev):
+ self._read_event = read_ev
+
def registerProducer(self, producer, streaming):
# this passes through, so the real consumer can flow-control the real
# producer. Therefore we don't need to provide any IPushProducer
def write(self, ciphertext):
started = now()
plaintext = self._decryptor.process(ciphertext)
- elapsed = now() - started
- self._read_event.update(0, elapsed, 0)
+ if self._read_event:
+ elapsed = now() - started
+ self._read_event.update(0, elapsed, 0)
self._consumer.write(plaintext)
class ImmutableFileNode:
history):
assert isinstance(filecap, uri.CHKFileURI)
verifycap = filecap.get_verify_cap()
- ds = DownloadStatus(verifycap.storage_index, verifycap.size)
- if history:
- history.add_download(ds)
- self._download_status = ds
self._cnode = CiphertextFileNode(verifycap, storage_broker,
- secret_holder, terminator, history, ds)
+ secret_holder, terminator, history)
assert isinstance(filecap, uri.CHKFileURI)
self.u = filecap
self._readkey = filecap.key
return True
def read(self, consumer, offset=0, size=None):
- actual_size = size
- if actual_size == None:
- actual_size = self.u.size
- actual_size = actual_size - offset
- read_ev = self._download_status.add_read_event(offset,actual_size,
- now())
- decryptor = DecryptingConsumer(consumer, self._readkey, offset, read_ev)
- d = self._cnode.read(decryptor, offset, size, read_ev)
+ decryptor = DecryptingConsumer(consumer, self._readkey, offset)
+ d = self._cnode.read(decryptor, offset, size)
d.addCallback(lambda dc: consumer)
return d