From: Brian Warner Date: Mon, 4 Dec 2006 02:07:41 +0000 (-0700) Subject: initial file-table support, VirtualDrive service, rearrange Storage somewhat X-Git-Tag: tahoe_v0.1.0-0-UNSTABLE~478 X-Git-Url: https://git.rkrishnan.org/components/com_hotproperty/simplejson/running.html?a=commitdiff_plain;h=dd212c09e1709c1b64ddf6223db644847a40287d;p=tahoe-lafs%2Ftahoe-lafs.git initial file-table support, VirtualDrive service, rearrange Storage somewhat --- diff --git a/Makefile b/Makefile index 1ac53e41..36774af8 100644 --- a/Makefile +++ b/Makefile @@ -9,6 +9,8 @@ run-client: run-client2: cd client-basedir2 && PYTHONPATH=.. twistd -noy ../client.tac +run-client3: + cd client-basedir3 && PYTHONPATH=.. twistd -noy ../client.tac test: trial allmydata diff --git a/allmydata/bucketstore.py b/allmydata/bucketstore.py index 01b3bdd6..d3963c46 100644 --- a/allmydata/bucketstore.py +++ b/allmydata/bucketstore.py @@ -27,13 +27,14 @@ class BucketStore(service.MultiService, Referenceable): def has_bucket(self, verifierid): return os.path.exists(self._get_bucket_dir(verifierid)) - def allocate_bucket(self, verifierid, bucket_num, size, leaser_credentials): + def allocate_bucket(self, verifierid, bucket_num, size, + leaser_credentials, canary): bucket_dir = self._get_bucket_dir(verifierid) precondition(not os.path.exists(bucket_dir)) precondition(isinstance(bucket_num, int)) bucket = WriteBucket(bucket_dir, verifierid, bucket_num, size) bucket.set_leaser(leaser_credentials) - lease = Lease(verifierid, leaser_credentials, bucket) + lease = Lease(verifierid, leaser_credentials, bucket, canary) self._leases.add(lease) return lease @@ -42,18 +43,18 @@ class BucketStore(service.MultiService, Referenceable): bucket_dir = self._get_bucket_dir(verifierid) if os.path.exists(bucket_dir): b = ReadBucket(bucket_dir, verifierid) - br = BucketReader(b) - return [(b.get_bucket_num(), br)] + return [(b.get_bucket_num(), b)] else: return [] class Lease(Referenceable): implements(RIBucketWriter) - def __init__(self, verifierid, leaser, bucket): + def __init__(self, verifierid, leaser, bucket, canary): self._leaser = leaser self._verifierid = verifierid self._bucket = bucket + canary.notifyOnDisconnect(self._lost_canary) def get_bucket(self): return self._bucket @@ -64,17 +65,8 @@ class Lease(Referenceable): def remote_close(self): self._bucket.close() -class BucketReader(Referenceable): - implements(RIBucketReader) - - def __init__(self, bucket): - self._bucket = bucket - - def remote_get_bucket_num(self): - return self._bucket.get_bucket_num() - - def remote_read(self): - return self._bucket.read() + def _lost_canary(self): + pass class Bucket: def __init__(self, bucket_dir, verifierid): @@ -127,14 +119,17 @@ class WriteBucket(Bucket): _assert(os.path.getsize(os.path.join(self._bucket_dir, 'data')) == self._size) return complete -class ReadBucket(Bucket): +class ReadBucket(Bucket, Referenceable): + implements(RIBucketReader) + def __init__(self, bucket_dir, verifierid): Bucket.__init__(self, bucket_dir, verifierid) precondition(self.is_complete()) # implicitly asserts bucket_dir exists def get_bucket_num(self): return int(self._read_attr('bucket_num')) + remote_get_bucket_num = get_bucket_num def read(self): return self._read_attr('data') - + remote_read = read diff --git a/allmydata/client.py b/allmydata/client.py index 81473afb..8afc3ac2 100644 --- a/allmydata/client.py +++ b/allmydata/client.py @@ -9,10 +9,11 @@ from allmydata import node from twisted.internet import defer +from allmydata.util import idlib from allmydata.storageserver import StorageServer from allmydata.upload import Uploader from allmydata.download import Downloader -from allmydata.util import idlib +from allmydata.vdrive import VDrive class Client(node.Node, Referenceable): implements(RIClient) @@ -29,6 +30,7 @@ class Client(node.Node, Referenceable): self.add_service(StorageServer(os.path.join(basedir, self.STOREDIR))) self.add_service(Uploader()) self.add_service(Downloader()) + self.add_service(VDrive()) self.queen_pburl = None self.queen_connector = None @@ -63,14 +65,21 @@ class Client(node.Node, Referenceable): self.log("connected to queen") self.queen = queen queen.notifyOnDisconnect(self._lost_queen) - queen.callRemote("hello", - nodeid=self.nodeid, node=self, pburl=self.my_pburl) + d = queen.callRemote("hello", + nodeid=self.nodeid, + node=self, + pburl=self.my_pburl) + d.addCallback(self._got_vdrive_root) + + def _got_vdrive_root(self, root): + self.getServiceNamed("vdrive").set_root(root) def _lost_queen(self): self.log("lost connection to queen") self.queen = None def remote_get_service(self, name): + # TODO: 'vdrive' should not be public in the medium term return self.getServiceNamed(name) def remote_add_peers(self, new_peers): diff --git a/allmydata/filetable.py b/allmydata/filetable.py new file mode 100644 index 00000000..0f099209 --- /dev/null +++ b/allmydata/filetable.py @@ -0,0 +1,98 @@ + +import os, shutil +from zope.interface import implements +from foolscap import Referenceable +from allmydata.interfaces import RIMutableDirectoryNode +from twisted.application import service +from twisted.python import log + +class DeadDirectoryNodeError(Exception): + """The directory referenced by this node has been deleted.""" + +class BadDirectoryError(Exception): + """There was a problem with the directory being referenced.""" +class BadFileError(Exception): + """The file being referenced does not exist.""" + +class MutableDirectoryNode(Referenceable): + implements(RIMutableDirectoryNode) + + def __init__(self, basedir): + self._basedir = basedir + + def validate_name(self, name): + if name == "." or name == ".." or "/" in name: + raise DeadDirectoryNodeError("bad filename component") + + # this is private + def get_child(self, name): + self.validate_name(name) + absname = os.path.join(self._basedir, name) + if os.path.isdir(absname): + return MutableDirectoryNode(absname) + raise DeadDirectoryNodeError("no such directory") + + # these are the public methods, available to anyone who holds a reference + + def list(self): + log.msg("Dir(%s).list" % self._basedir) + results = [] + if not os.path.isdir(self._basedir): + raise DeadDirectoryNodeError("This directory has been deleted") + for name in os.listdir(self._basedir): + absname = os.path.join(self._basedir, name) + if os.path.isdir(absname): + results.append( (name, MutableDirectoryNode(absname)) ) + elif os.path.isfile(absname): + f = open(absname, "rb") + data = f.read() + f.close() + results.append( (name, data) ) + # anything else is ignored + return results + remote_list = list + + def add_directory(self, name): + self.validate_name(name) + absname = os.path.join(self._basedir, name) + if os.path.isdir(absname): + raise BadDirectoryError("the directory '%s' already exists" % name) + if os.path.exists(absname): + raise BadDirectoryError("the directory '%s' already exists " + "(but isn't a directory)" % name) + os.mkdir(absname) + return MutableDirectoryNode(absname) + remote_add_directory = add_directory + + def add_file(self, name, data): + self.validate_name(name) + f = open(os.path.join(self._basedir, name), "wb") + f.write(data) + f.close() + remote_add_file = add_file + + def remove(self, name): + self.validate_name(name) + absname = os.path.join(self._basedir, name) + if os.path.isdir(absname): + shutil.rmtree(absname) + elif os.path.isfile(absname): + os.unlink(absname) + else: + raise BadFileError("Cannot delete non-existent file '%s'" % name) + + +class GlobalVirtualDrive(service.MultiService): + name = "filetable" + VDRIVEDIR = "vdrive" + + def __init__(self, basedir="."): + service.MultiService.__init__(self) + vdrive_dir = os.path.join(basedir, self.VDRIVEDIR) + if not os.path.exists(vdrive_dir): + os.mkdir(vdrive_dir) + self._root = MutableDirectoryNode(vdrive_dir) + + def get_root(self): + return self._root + diff --git a/allmydata/interfaces.py b/allmydata/interfaces.py index 97866b9d..07f36e28 100644 --- a/allmydata/interfaces.py +++ b/allmydata/interfaces.py @@ -12,10 +12,12 @@ RIClient_ = Any() Referenceable_ = Any() RIBucketWriter_ = Any() RIBucketReader_ = Any() +RIMutableDirectoryNode_ = Any() +RIMutableFileNode_ = Any() class RIQueenRoster(RemoteInterface): def hello(nodeid=Nodeid, node=RIClient_, pburl=PBURL): - return Nothing() + return RIMutableDirectoryNode_ # the virtual drive root class RIClient(RemoteInterface): def get_service(name=str): @@ -27,7 +29,8 @@ class RIClient(RemoteInterface): class RIStorageServer(RemoteInterface): def allocate_bucket(verifierid=Verifierid, bucket_num=int, size=int, - leaser=Nodeid): + leaser=Nodeid, canary=Referenceable_): + # if the canary is lost before close(), the bucket is deleted return RIBucketWriter_ def get_buckets(verifierid=Verifierid): return ListOf(TupleOf(int, RIBucketReader_)) @@ -51,3 +54,20 @@ class RIBucketReader(RemoteInterface): return ShareData +class RIMutableDirectoryNode(RemoteInterface): + def list(): + return ListOf( TupleOf(str, # name, relative to directory + (RIMutableDirectoryNode_, Verifierid)), + maxLength=100, + ) + + def add_directory(name=str): + return RIMutableDirectoryNode_ + + def add_file(name=str, data=Verifierid): + return Nothing() + + def remove(name=str): + return Nothing() + + # need more to move directories diff --git a/allmydata/queen.py b/allmydata/queen.py index f8720412..011d57b5 100644 --- a/allmydata/queen.py +++ b/allmydata/queen.py @@ -8,6 +8,7 @@ from allmydata.util import idlib from zope.interface import implements from allmydata.interfaces import RIQueenRoster from allmydata import node +from allmydata.filetable import GlobalVirtualDrive class Roster(service.MultiService, Referenceable): implements(RIQueenRoster) @@ -16,6 +17,10 @@ class Roster(service.MultiService, Referenceable): service.MultiService.__init__(self) self.phonebook = {} self.connections = {} + self.gvd_root = None + + def set_gvd_root(self, root): + self.gvd_root = root def remote_hello(self, nodeid, node, pburl): log.msg("roster: contact from %s" % idlib.b2a(nodeid)) @@ -26,6 +31,7 @@ class Roster(service.MultiService, Referenceable): self.phonebook[nodeid] = pburl self.connections[nodeid] = node node.notifyOnDisconnect(self._lost_node, nodeid) + return self.gvd_root def _educate_the_new_peer(self, nodeid, node, new_peers): log.msg("roster: educating %s (%d)" % (idlib.b2a(nodeid)[:4], len(new_peers))) @@ -56,6 +62,7 @@ class Queen(node.Node): def __init__(self, basedir="."): node.Node.__init__(self, basedir) + self.gvd = self.add_service(GlobalVirtualDrive(basedir)) self.urls = {} def tub_ready(self): @@ -65,5 +72,5 @@ class Queen(node.Node): f = open(os.path.join(self.basedir, "roster_pburl"), "w") f.write(self.urls["roster"] + "\n") f.close() - + r.set_gvd_root(self.gvd.get_root()) diff --git a/allmydata/storageserver.py b/allmydata/storageserver.py index 2d2ef79e..868dc99a 100644 --- a/allmydata/storageserver.py +++ b/allmydata/storageserver.py @@ -22,11 +22,12 @@ class StorageServer(service.MultiService, Referenceable): self._bucketstore = BucketStore(store_dir) self._bucketstore.setServiceParent(self) - def remote_allocate_bucket(self, verifierid, bucket_num, size, leaser): + def remote_allocate_bucket(self, verifierid, bucket_num, size, leaser, + canary): if self._bucketstore.has_bucket(verifierid): raise BucketAlreadyExistsError() lease = self._bucketstore.allocate_bucket(verifierid, bucket_num, size, - idlib.b2a(leaser)) + idlib.b2a(leaser), canary) return lease def remote_get_buckets(self, verifierid): diff --git a/allmydata/test/test_filetable.py b/allmydata/test/test_filetable.py new file mode 100644 index 00000000..060d0cc7 --- /dev/null +++ b/allmydata/test/test_filetable.py @@ -0,0 +1,51 @@ + +import os +from twisted.trial import unittest +from allmydata.filetable import MutableDirectoryNode, \ + DeadDirectoryNodeError, BadDirectoryError, BadFileError + + +class FileTable(unittest.TestCase): + def test_files(self): + os.mkdir("filetable") + root = MutableDirectoryNode(os.path.abspath("filetable")) + self.failUnlessEqual(root.list(), []) + root.add_file("one", "vid-one") + root.add_file("two", "vid-two") + self.failUnlessEqual(root.list(), [("one", "vid-one"), + ("two", "vid-two")]) + root.remove("two") + self.failUnlessEqual(root.list(), [("one", "vid-one")]) + self.failUnlessRaises(BadFileError, root.remove, "two") + self.failUnlessRaises(BadFileError, root.remove, "three") + + # now play with directories + subdir1 = root.add_directory("subdir1") + self.failUnless(isinstance(subdir1, MutableDirectoryNode)) + entries = root.list() + self.failUnlessEqual(len(entries), 2) + one_index = entries.index( ("one", "vid-one") ) + subdir_index = 1 - one_index + self.failUnlessEqual(entries[subdir_index][0], "subdir1") + subdir2 = entries[subdir_index][1] + self.failUnless(isinstance(subdir2, MutableDirectoryNode)) + + self.failUnlessEqual(subdir1.list(), []) + self.failUnlessEqual(subdir2.list(), []) + + subdir1.add_file("subone", "vid-subone") + self.failUnlessEqual(subdir1.list(), [("subone", "vid-subone")]) + self.failUnlessEqual(subdir2.list(), [("subone", "vid-subone")]) + + self.failUnlessEqual(len(root.list()), 2) + + self.failUnlessRaises(BadDirectoryError, root.add_directory, "subdir1") + self.failUnlessRaises(BadDirectoryError, root.add_directory, "one") + + root.remove("subdir1") + self.failUnlessEqual(root.list(), [("one", "vid-one")]) + + # should our (orphaned) subdir1/subdir2 node still be able to do + # anything? + self.failUnlessRaises(DeadDirectoryNodeError, subdir1.list) + diff --git a/allmydata/test/test_storage.py b/allmydata/test/test_storage.py index 69485080..d1444f98 100644 --- a/allmydata/test/test_storage.py +++ b/allmydata/test/test_storage.py @@ -5,11 +5,14 @@ import random from twisted.trial import unittest from twisted.application import service from twisted.internet import defer -from foolscap import Tub +from foolscap import Tub, Referenceable from foolscap.eventual import flushEventualQueue from allmydata import client +class Canary(Referenceable): + pass + class StorageTest(unittest.TestCase): def setUp(self): @@ -39,6 +42,7 @@ class StorageTest(unittest.TestCase): bucket_num=bnum, size=len(data), leaser=self.node.nodeid, + canary=Canary(), ) rssd.addCallback(create_bucket) @@ -99,6 +103,7 @@ class StorageTest(unittest.TestCase): bucket_num=bnum, size=len(data), leaser=self.node.nodeid, + canary=Canary(), ) rssd.addCallback(create_bucket) diff --git a/allmydata/upload.py b/allmydata/upload.py index 6002cff2..5b8588ab 100644 --- a/allmydata/upload.py +++ b/allmydata/upload.py @@ -1,7 +1,9 @@ +from zope.interface import Interface, implements from twisted.python import failure, log from twisted.internet import defer from twisted.application import service +from foolscap import Referenceable from allmydata.util import idlib from allmydata import encode @@ -33,6 +35,7 @@ class FileUploader: filehandle.seek(0) def make_encoder(self): + self._needed_shares = 4 self._shares = 4 self._encoder = encode.Encoder(self._filehandle, self._shares) self._share_size = self._size @@ -53,6 +56,7 @@ class FileUploader: max_peers = None self.permuted = self._peer.permute_peerids(self._verifierid, max_peers) + self._total_peers = len(self.permuted) for p in self.permuted: assert isinstance(p, str) # we will shrink self.permuted as we give up on peers @@ -68,7 +72,12 @@ class FileUploader: def _check_next_peer(self): if len(self.permuted) == 0: # there are no more to check - raise NotEnoughPeersError + raise NotEnoughPeersError("%s goodness, want %s, have %d " + "landlords, %d total peers" % + (self.goodness_points, + self.target_goodness, + len(self.landlords), + self._total_peers)) if self.peer_index >= len(self.permuted): self.peer_index = 0 @@ -82,7 +91,8 @@ class FileUploader: verifierid=self._verifierid, bucket_num=bucket_num, size=self._share_size, - leaser=self._peer.nodeid) + leaser=self._peer.nodeid, + canary=Referenceable()) def _allocate_response(bucket): if self.debug: print " peerid %s will grant us a lease" % idlib.b2a(peerid) @@ -127,6 +137,40 @@ class FileUploader: def netstring(s): return "%d:%s," % (len(s), s) +class IUploadable(Interface): + def get_filehandle(): + pass + def close_filehandle(f): + pass + +class FileName: + implements(IUploadable) + def __init__(self, filename): + self._filename = filename + def get_filehandle(self): + return open(self._filename, "rb") + def close_filehandle(self, f): + f.close() + +class Data: + implements(IUploadable) + def __init__(self, data): + self._data = data + def get_filehandle(self): + return StringIO(self._data) + def close_filehandle(self, f): + pass + +class FileHandle: + implements(IUploadable) + def __init__(self, filehandle): + self._filehandle = filehandle + def get_filehandle(self): + return self._filehandle + def close_filehandle(self, f): + # the originator of the filehandle reserves the right to close it + pass + class Uploader(service.MultiService): """I am a service that allows file uploading. """ @@ -140,26 +184,26 @@ class Uploader(service.MultiService): # note: this is only of the plaintext data, no encryption yet return hasher.digest() - def upload_filename(self, filename): - f = open(filename, "rb") - def _done(res): - f.close() - return res - d = self.upload_filehandle(f) - d.addBoth(_done) - return d - - def upload_data(self, data): - f = StringIO(data) - return self.upload_filehandle(f) - - def upload_filehandle(self, f): + def upload(self, f): assert self.parent assert self.running + f = IUploadable(f) + fh = f.get_filehandle() u = FileUploader(self.parent) - u.set_filehandle(f) - u.set_verifierid(self._compute_verifierid(f)) + u.set_filehandle(fh) + u.set_verifierid(self._compute_verifierid(fh)) u.make_encoder() d = u.start() + def _done(res): + f.close_filehandle(fh) + return res + d.addBoth(_done) return d + # utility functions + def upload_data(self, data): + return self.upload(Data(data)) + def upload_filename(self, filename): + return self.upload(FileName(filename)) + def upload_filehandle(self, filehandle): + return self.upload(FileHandle(filehandle)) diff --git a/allmydata/vdrive.py b/allmydata/vdrive.py new file mode 100644 index 00000000..0d1df3f0 --- /dev/null +++ b/allmydata/vdrive.py @@ -0,0 +1,88 @@ + +"""This is the client-side facility to manipulate virtual drives.""" + +from twisted.application import service +from twisted.internet import defer +from allmydata.upload import Data, FileHandle, FileName + +class VDrive(service.MultiService): + name = "vdrive" + + def set_root(self, root): + self.gvd_root = root + + def dirpath(self, dir_or_path): + if isinstance(dir_or_path, str): + return self.get_dir(dir_or_path) + return defer.succeed(dir_or_path) + + def get_dir(self, path): + """Return a Deferred that fires with a RemoteReference to a + MutableDirectoryNode at the given /-delimited path.""" + d = defer.succeed(self.gvd_root) + if path.startswith("/"): + path = path[1:] + if path == "": + return d + for piece in path.split("/"): + d.addCallback(lambda parent: parent.callRemote("list")) + def _find(table, subdir): + for name,target in table: + if name == subdir: + return subdir + else: + raise KeyError("no such directory '%s' in '%s'" % + (subdir, [t[0] for t in table])) + d.addCallback(_find, piece) + def _check(subdir): + assert not isinstance(subdir, str) + return subdir + d.addCallback(_check) + return d + + def get_root(self): + return self.gvd_root + + def listdir(self, dir_or_path): + d = self.dirpath(dir_or_path) + d.addCallback(lambda parent: parent.callRemote("list")) + def _list(table): + return [t[0] for t in table] + d.addCallback(_list) + return d + + def put_file(self, dir_or_path, name, uploadable): + """Upload an IUploadable and add it to the virtual drive (as an entry + called 'name', in 'dir_or_path') 'dir_or_path' must either be a + string like 'root/subdir1/subdir2', or a directory node (either the + root directory node returned by get_root(), or a subdirectory + returned by list() ). + + The uploadable can be an instance of allmydata.upload.Data, + FileHandle, or FileName. + + I return a deferred that will fire when the operation is complete. + """ + + u = self.parent.getServiceNamed("uploader") + d = self.dirpath(dir_or_path) + def _got_dir(dirnode): + d1 = u.upload(uploadable) + d1.addCallback(lambda vid: + dirnode.callRemote("add_file", name, vid)) + return d1 + d.addCallback(_got_dir) + return d + + def put_file_by_filename(self, dir_or_path, name, filename): + return self.put_file(dir_or_path, name, FileName(filename)) + def put_file_by_data(self, dir_or_path, name, data): + return self.put_file(dir_or_path, name, Data(data)) + def put_file_by_filehandle(self, dir_or_path, name, filehandle): + return self.put_file(dir_or_path, name, FileHandle(filehandle)) + + def make_directory(self, dir_or_path, name): + d = self.dirpath(dir_or_path) + d.addCallback(lambda parent: parent.callRemote("add_directory", name)) + return d +