from twisted.internet import defer
from twisted.internet.interfaces import IPushProducer, IConsumer
from twisted.protocols import basic
-from allmydata.interfaces import IFileNode, IFileURI, ICheckable
-from allmydata.util import observer, log, base32
+from foolscap.eventual import eventually
+from allmydata.interfaces import IFileNode, IFileURI, ICheckable, \
+ IDownloadTarget
+from allmydata.util import log, base32
from allmydata.immutable.checker import SimpleCHKFileChecker, \
SimpleCHKFileVerifier
from allmydata.immutable import download
self.bytes_left -= len(data)
return data
-class FileNode(_ImmutableFileNodeBase):
- checker_class = SimpleCHKFileChecker
- verifier_class = SimpleCHKFileVerifier
+class CacheFile:
+ implements(IDownloadTarget)
- def __init__(self, uri, client, cachefile):
- _ImmutableFileNodeBase.__init__(self, uri, client)
- self.cachefile = cachefile
+ def __init__(self, node, filename):
+ self.node = node
+ self.milestones = set() # of (offset,size,Deferred)
+ self.cachefilename = filename
+ self.download_in_progress = False
# five states:
# new FileNode, no downloads ever performed
# new FileNode, leftover file (partial)
# new FileNode, leftover file (whole)
# download in progress, not yet complete
# download complete
+
+ def when_range_available(self, offset, size):
+ assert isinstance(offset, (int,long))
+ assert isinstance(size, (int,long))
+
+ d = defer.Deferred()
+ self.milestones.add( (offset,size,d) )
+ self._check_milestones()
+ if self.milestones and not self.download_in_progress:
+ self.download_in_progress = True
+ log.msg(format=("immutable filenode read [%(si)s]: " +
+ "starting download"),
+ si=base32.b2a(self.node.u.storage_index),
+ umid="h26Heg", level=log.OPERATIONAL)
+ downloader = self.node._client.getServiceNamed("downloader")
+ d2 = downloader.download(self.node.get_uri(), self)
+ d2.addBoth(self._download_done)
+ d2.addErrback(self._download_failed)
+ d2.addErrback(log.err, umid="cQaM9g")
+ return d
+
+ def read(self, consumer, offset, size):
+ assert offset+size <= self.get_filesize()
+ f = PortionOfFile(self.cachefilename, offset, size)
+ d = basic.FileSender().beginFileTransfer(f, consumer)
+ d.addCallback(lambda lastSent: consumer)
+ return d
+
+ def _download_done(self, res):
+ # clear download_in_progress, so failed downloads can be re-tried
self.download_in_progress = False
- self.fully_cached_observer = observer.OneShotObserverList()
+ return res
+
+ def _download_failed(self, f):
+ # tell anyone who's waiting that we failed
+ for m in self.milestones:
+ (offset,size,d) = m
+ eventually(d.errback, f)
+ self.milestones.clear()
+
+ def _check_milestones(self):
+ current_size = self.get_filesize()
+ for m in list(self.milestones):
+ (offset,size,d) = m
+ if offset+size <= current_size:
+ log.msg(format=("immutable filenode read [%(si)s] " +
+ "%(offset)d+%(size)d vs %(filesize)d: " +
+ "done"),
+ si=base32.b2a(self.node.u.storage_index),
+ offset=offset, size=size, filesize=current_size,
+ umid="nuedUg", level=log.NOISY)
+ self.milestones.discard(m)
+ eventually(d.callback, None)
+ else:
+ log.msg(format=("immutable filenode read [%(si)s] " +
+ "%(offset)d+%(size)d vs %(filesize)d: " +
+ "still waiting"),
+ si=base32.b2a(self.node.u.storage_index),
+ offset=offset, size=size, filesize=current_size,
+ umid="8PKOhg", level=log.NOISY)
+
+ def get_filesize(self):
+ try:
+ filesize = os.stat(self.cachefilename)[stat.ST_SIZE]
+ except OSError:
+ filesize = 0
+ return filesize
+
+
+ def open(self, size):
+ self.f = open(self.cachefilename, "wb")
+
+ def write(self, data):
+ self.f.write(data)
+ self._check_milestones()
+
+ def close(self):
+ self.f.close()
+ self._check_milestones()
+
+ def fail(self, why):
+ pass
+ def register_canceller(self, cb):
+ pass
+ def finish(self):
+ return None
+
+
+
+class FileNode(_ImmutableFileNodeBase):
+ checker_class = SimpleCHKFileChecker
+ verifier_class = SimpleCHKFileVerifier
+
+ def __init__(self, uri, client, cachefile):
+ _ImmutableFileNodeBase.__init__(self, uri, client)
+ self.cachefile = CacheFile(self, cachefile)
def get_uri(self):
return self.u.to_string()
if size is None:
size = self.get_size() - offset
- assert self.cachefile
-
- try:
- filesize = os.stat(self.cachefile)[stat.ST_SIZE]
- except OSError:
- filesize = 0
- if filesize >= offset+size:
- log.msg(format=("immutable filenode read [%(si)s]: " +
- "satisfied from cache " +
- "(read %(start)d+%(size)d, filesize %(filesize)d)"),
- si=base32.b2a(self.u.storage_index),
- start=offset, size=size, filesize=filesize,
- umid="5p5ECA", level=log.OPERATIONAL)
- f = PortionOfFile(self.cachefile, offset, size)
- d = basic.FileSender().beginFileTransfer(f, consumer)
- d.addCallback(lambda lastSent: consumer)
- return d
if offset == 0 and size == self.get_size():
# don't use the cache, just do a normal streaming download
umid="VRSBwg", level=log.OPERATIONAL)
return self.download(download.ConsumerAdapter(consumer))
- if not self.download_in_progress:
- log.msg(format=("immutable filenode read [%(si)s]: " +
- "starting download"),
- si=base32.b2a(self.u.storage_index),
- umid="h26Heg", level=log.OPERATIONAL)
- self.start_download_to_cache()
-
- # The file is being downloaded, but the portion we want isn't yet
- # available, so we have to wait. First cut: wait for the whole thing
- # to download. The second cut will be to wait for a specific range
- # milestone, with a download target that counts bytes and compares
- # them against a milestone list.
- log.msg(format=("immutable filenode read [%(si)s]: " +
- "waiting for download"),
- si=base32.b2a(self.u.storage_index),
- umid="l48V7Q", level=log.OPERATIONAL)
- d = self.when_fully_cached()
- d.addCallback(lambda ignored: self.read(consumer, offset, size))
+ d = self.cachefile.when_range_available(offset, size)
+ d.addCallback(lambda res: self.cachefile.read(consumer, offset, size))
return d
- def start_download_to_cache(self):
- assert not self.download_in_progress
- self.download_in_progress = True
- downloader = self._client.getServiceNamed("downloader")
- d = downloader.download_to_filename(self.get_uri(), self.cachefile)
- d.addBoth(self.fully_cached_observer.fire)
-
- def when_fully_cached(self):
- return self.fully_cached_observer.when_fired()
-
-
def download(self, target):
downloader = self._client.getServiceNamed("downloader")
return downloader.download(self.get_uri(), target)