from allmydata.offloaded import Helper
from allmydata.control import ControlServer
from allmydata.introducer.client import IntroducerClient
-from allmydata.util import hashutil, base32, pollmixin, fileutil
+from allmydata.util import hashutil, base32, pollmixin, cachedir
from allmydata.uri import LiteralFileURI
from allmydata.dirnode import NewDirectoryNode
from allmydata.mutable.node import MutableFileNode, MutableWatcher
self.convergence = base32.a2b(convergence_s)
self._node_cache = weakref.WeakValueDictionary() # uri -> node
self.add_service(Uploader(helper_furl, self.stats_provider))
- self.download_cachedir = os.path.join(self.basedir,
- "private", "cache", "download")
- fileutil.make_dirs(self.download_cachedir)
+ download_cachedir = os.path.join(self.basedir,
+ "private", "cache", "download")
+ self.download_cache = cachedir.CacheDirectoryManager(download_cachedir)
+ self.download_cache.setServiceParent(self)
self.add_service(Downloader(self.stats_provider))
self.add_service(MutableWatcher(self.stats_provider))
def _publish(res):
if isinstance(u, LiteralFileURI):
node = LiteralFileNode(u, self) # LIT
else:
- cachefile = os.path.join(self.download_cachedir,
- base32.b2a(u.storage_index))
- # TODO: cachefile manager, weakref, expire policy
+ key = base32.b2a(u.storage_index)
+ cachefile = self.download_cache.get_file(key)
node = FileNode(u, self, cachefile) # CHK
else:
assert IMutableFileURI.providedBy(u), u
self.bytes_left -= len(data)
return data
-class CacheFile:
+class DownloadCache:
implements(IDownloadTarget)
- def __init__(self, node, filename):
- self.node = node
+ def __init__(self, node, cachefile):
+ self._downloader = node._client.getServiceNamed("downloader")
+ self._uri = node.get_uri()
+ self._storage_index = node.get_storage_index()
self.milestones = set() # of (offset,size,Deferred)
- self.cachefilename = filename
+ self.cachefile = cachefile
self.download_in_progress = False
# five states:
# new FileNode, no downloads ever performed
self.download_in_progress = True
log.msg(format=("immutable filenode read [%(si)s]: " +
"starting download"),
- si=base32.b2a(self.node.u.storage_index),
+ si=base32.b2a(self._storage_index),
umid="h26Heg", level=log.OPERATIONAL)
- downloader = self.node._client.getServiceNamed("downloader")
- d2 = downloader.download(self.node.get_uri(), self)
+ d2 = self._downloader.download(self._uri, self)
d2.addBoth(self._download_done)
d2.addErrback(self._download_failed)
d2.addErrback(log.err, umid="cQaM9g")
def read(self, consumer, offset, size):
assert offset+size <= self.get_filesize()
- f = PortionOfFile(self.cachefilename, offset, size)
+ f = PortionOfFile(self.cachefile.get_filename(), offset, size)
d = basic.FileSender().beginFileTransfer(f, consumer)
d.addCallback(lambda lastSent: consumer)
return d
log.msg(format=("immutable filenode read [%(si)s] " +
"%(offset)d+%(size)d vs %(filesize)d: " +
"done"),
- si=base32.b2a(self.node.u.storage_index),
+ si=base32.b2a(self._storage_index),
offset=offset, size=size, filesize=current_size,
umid="nuedUg", level=log.NOISY)
self.milestones.discard(m)
log.msg(format=("immutable filenode read [%(si)s] " +
"%(offset)d+%(size)d vs %(filesize)d: " +
"still waiting"),
- si=base32.b2a(self.node.u.storage_index),
+ si=base32.b2a(self._storage_index),
offset=offset, size=size, filesize=current_size,
umid="8PKOhg", level=log.NOISY)
def get_filesize(self):
try:
- filesize = os.stat(self.cachefilename)[stat.ST_SIZE]
+ filesize = os.stat(self.cachefile.get_filename())[stat.ST_SIZE]
except OSError:
filesize = 0
return filesize
def open(self, size):
- self.f = open(self.cachefilename, "wb")
+ self.f = open(self.cachefile.get_filename(), "wb")
def write(self, data):
self.f.write(data)
def __init__(self, uri, client, cachefile):
_ImmutableFileNodeBase.__init__(self, uri, client)
- self.cachefile = CacheFile(self, cachefile)
+ self.download_cache = DownloadCache(self, cachefile)
def get_uri(self):
return self.u.to_string()
umid="VRSBwg", level=log.OPERATIONAL)
return self.download(download.ConsumerAdapter(consumer))
- d = self.cachefile.when_range_available(offset, size)
- d.addCallback(lambda res: self.cachefile.read(consumer, offset, size))
+ d = self.download_cache.when_range_available(offset, size)
+ d.addCallback(lambda res:
+ self.download_cache.read(consumer, offset, size))
return d
def download(self, target):
from allmydata.monitor import Monitor
from allmydata.immutable import filenode, download
from allmydata.mutable.node import MutableFileNode
-from allmydata.util import hashutil
+from allmydata.util import hashutil, cachedir
from allmydata.test.common import download_to_data
class NotANode:
pass
+class FakeClient:
+ # just enough to let the node acquire a downloader (which it won't use)
+ def getServiceNamed(self, name):
+ return None
+
class Node(unittest.TestCase):
def test_chk_filenode(self):
u = uri.CHKFileURI(key="\x00"*16,
needed_shares=3,
total_shares=10,
size=1000)
- c = None
- fn1 = filenode.FileNode(u, c, "cachefile")
- fn2 = filenode.FileNode(u.to_string(), c, "cachefile")
+ c = FakeClient()
+ cf = cachedir.CacheFile("none")
+ fn1 = filenode.FileNode(u, c, cf)
+ fn2 = filenode.FileNode(u.to_string(), c, cf)
self.failUnlessEqual(fn1, fn2)
self.failIfEqual(fn1, "I am not a filenode")
self.failIfEqual(fn1, NotANode())