From: Brian Warner Date: Thu, 30 Oct 2008 20:39:09 +0000 (-0700) Subject: #527: expire the cached files that are used to support Range: headers, every hour... X-Git-Url: https://git.rkrishnan.org/pf/content/en/seg/biz?a=commitdiff_plain;h=ba019bfd3ad6fa74d674d843584925834dcc2f3e;p=tahoe-lafs%2Ftahoe-lafs.git #527: expire the cached files that are used to support Range: headers, every hour, when the file is unused and older than an hour --- diff --git a/src/allmydata/client.py b/src/allmydata/client.py index 22930ab6..886a046b 100644 --- a/src/allmydata/client.py +++ b/src/allmydata/client.py @@ -18,7 +18,7 @@ from allmydata.immutable.filenode import FileNode, LiteralFileNode from allmydata.offloaded import Helper from allmydata.control import ControlServer from allmydata.introducer.client import IntroducerClient -from allmydata.util import hashutil, base32, pollmixin, fileutil +from allmydata.util import hashutil, base32, pollmixin, cachedir from allmydata.uri import LiteralFileURI from allmydata.dirnode import NewDirectoryNode from allmydata.mutable.node import MutableFileNode, MutableWatcher @@ -188,9 +188,10 @@ class Client(node.Node, pollmixin.PollMixin): self.convergence = base32.a2b(convergence_s) self._node_cache = weakref.WeakValueDictionary() # uri -> node self.add_service(Uploader(helper_furl, self.stats_provider)) - self.download_cachedir = os.path.join(self.basedir, - "private", "cache", "download") - fileutil.make_dirs(self.download_cachedir) + download_cachedir = os.path.join(self.basedir, + "private", "cache", "download") + self.download_cache = cachedir.CacheDirectoryManager(download_cachedir) + self.download_cache.setServiceParent(self) self.add_service(Downloader(self.stats_provider)) self.add_service(MutableWatcher(self.stats_provider)) def _publish(res): @@ -339,9 +340,8 @@ class Client(node.Node, pollmixin.PollMixin): if isinstance(u, LiteralFileURI): node = LiteralFileNode(u, self) # LIT else: - cachefile = os.path.join(self.download_cachedir, - base32.b2a(u.storage_index)) - # TODO: cachefile manager, weakref, expire policy + key = base32.b2a(u.storage_index) + cachefile = self.download_cache.get_file(key) node = FileNode(u, self, cachefile) # CHK else: assert IMutableFileURI.providedBy(u), u diff --git a/src/allmydata/immutable/filenode.py b/src/allmydata/immutable/filenode.py index 083eb845..33395ef9 100644 --- a/src/allmydata/immutable/filenode.py +++ b/src/allmydata/immutable/filenode.py @@ -62,13 +62,15 @@ class PortionOfFile: self.bytes_left -= len(data) return data -class CacheFile: +class DownloadCache: implements(IDownloadTarget) - def __init__(self, node, filename): - self.node = node + def __init__(self, node, cachefile): + self._downloader = node._client.getServiceNamed("downloader") + self._uri = node.get_uri() + self._storage_index = node.get_storage_index() self.milestones = set() # of (offset,size,Deferred) - self.cachefilename = filename + self.cachefile = cachefile self.download_in_progress = False # five states: # new FileNode, no downloads ever performed @@ -88,10 +90,9 @@ class CacheFile: self.download_in_progress = True log.msg(format=("immutable filenode read [%(si)s]: " + "starting download"), - si=base32.b2a(self.node.u.storage_index), + si=base32.b2a(self._storage_index), umid="h26Heg", level=log.OPERATIONAL) - downloader = self.node._client.getServiceNamed("downloader") - d2 = downloader.download(self.node.get_uri(), self) + d2 = self._downloader.download(self._uri, self) d2.addBoth(self._download_done) d2.addErrback(self._download_failed) d2.addErrback(log.err, umid="cQaM9g") @@ -99,7 +100,7 @@ class CacheFile: def read(self, consumer, offset, size): assert offset+size <= self.get_filesize() - f = PortionOfFile(self.cachefilename, offset, size) + f = PortionOfFile(self.cachefile.get_filename(), offset, size) d = basic.FileSender().beginFileTransfer(f, consumer) d.addCallback(lambda lastSent: consumer) return d @@ -124,7 +125,7 @@ class CacheFile: log.msg(format=("immutable filenode read [%(si)s] " + "%(offset)d+%(size)d vs %(filesize)d: " + "done"), - si=base32.b2a(self.node.u.storage_index), + si=base32.b2a(self._storage_index), offset=offset, size=size, filesize=current_size, umid="nuedUg", level=log.NOISY) self.milestones.discard(m) @@ -133,20 +134,20 @@ class CacheFile: log.msg(format=("immutable filenode read [%(si)s] " + "%(offset)d+%(size)d vs %(filesize)d: " + "still waiting"), - si=base32.b2a(self.node.u.storage_index), + si=base32.b2a(self._storage_index), offset=offset, size=size, filesize=current_size, umid="8PKOhg", level=log.NOISY) def get_filesize(self): try: - filesize = os.stat(self.cachefilename)[stat.ST_SIZE] + filesize = os.stat(self.cachefile.get_filename())[stat.ST_SIZE] except OSError: filesize = 0 return filesize def open(self, size): - self.f = open(self.cachefilename, "wb") + self.f = open(self.cachefile.get_filename(), "wb") def write(self, data): self.f.write(data) @@ -171,7 +172,7 @@ class FileNode(_ImmutableFileNodeBase): def __init__(self, uri, client, cachefile): _ImmutableFileNodeBase.__init__(self, uri, client) - self.cachefile = CacheFile(self, cachefile) + self.download_cache = DownloadCache(self, cachefile) def get_uri(self): return self.u.to_string() @@ -230,8 +231,9 @@ class FileNode(_ImmutableFileNodeBase): umid="VRSBwg", level=log.OPERATIONAL) return self.download(download.ConsumerAdapter(consumer)) - d = self.cachefile.when_range_available(offset, size) - d.addCallback(lambda res: self.cachefile.read(consumer, offset, size)) + d = self.download_cache.when_range_available(offset, size) + d.addCallback(lambda res: + self.download_cache.read(consumer, offset, size)) return d def download(self, target): diff --git a/src/allmydata/test/test_filenode.py b/src/allmydata/test/test_filenode.py index 87a21e89..ecf74089 100644 --- a/src/allmydata/test/test_filenode.py +++ b/src/allmydata/test/test_filenode.py @@ -4,12 +4,17 @@ from allmydata import uri from allmydata.monitor import Monitor from allmydata.immutable import filenode, download from allmydata.mutable.node import MutableFileNode -from allmydata.util import hashutil +from allmydata.util import hashutil, cachedir from allmydata.test.common import download_to_data class NotANode: pass +class FakeClient: + # just enough to let the node acquire a downloader (which it won't use) + def getServiceNamed(self, name): + return None + class Node(unittest.TestCase): def test_chk_filenode(self): u = uri.CHKFileURI(key="\x00"*16, @@ -17,9 +22,10 @@ class Node(unittest.TestCase): needed_shares=3, total_shares=10, size=1000) - c = None - fn1 = filenode.FileNode(u, c, "cachefile") - fn2 = filenode.FileNode(u.to_string(), c, "cachefile") + c = FakeClient() + cf = cachedir.CacheFile("none") + fn1 = filenode.FileNode(u, c, cf) + fn2 = filenode.FileNode(u.to_string(), c, cf) self.failUnlessEqual(fn1, fn2) self.failIfEqual(fn1, "I am not a filenode") self.failIfEqual(fn1, NotANode())