run-client2:
cd client-basedir2 && PYTHONPATH=.. twistd -noy ../client.tac
+run-client3:
+ cd client-basedir3 && PYTHONPATH=.. twistd -noy ../client.tac
test:
trial allmydata
def has_bucket(self, verifierid):
return os.path.exists(self._get_bucket_dir(verifierid))
- def allocate_bucket(self, verifierid, bucket_num, size, leaser_credentials):
+ def allocate_bucket(self, verifierid, bucket_num, size,
+ leaser_credentials, canary):
bucket_dir = self._get_bucket_dir(verifierid)
precondition(not os.path.exists(bucket_dir))
precondition(isinstance(bucket_num, int))
bucket = WriteBucket(bucket_dir, verifierid, bucket_num, size)
bucket.set_leaser(leaser_credentials)
- lease = Lease(verifierid, leaser_credentials, bucket)
+ lease = Lease(verifierid, leaser_credentials, bucket, canary)
self._leases.add(lease)
return lease
bucket_dir = self._get_bucket_dir(verifierid)
if os.path.exists(bucket_dir):
b = ReadBucket(bucket_dir, verifierid)
- br = BucketReader(b)
- return [(b.get_bucket_num(), br)]
+ return [(b.get_bucket_num(), b)]
else:
return []
class Lease(Referenceable):
implements(RIBucketWriter)
- def __init__(self, verifierid, leaser, bucket):
+ def __init__(self, verifierid, leaser, bucket, canary):
self._leaser = leaser
self._verifierid = verifierid
self._bucket = bucket
+ canary.notifyOnDisconnect(self._lost_canary)
def get_bucket(self):
return self._bucket
def remote_close(self):
self._bucket.close()
-class BucketReader(Referenceable):
- implements(RIBucketReader)
-
- def __init__(self, bucket):
- self._bucket = bucket
-
- def remote_get_bucket_num(self):
- return self._bucket.get_bucket_num()
-
- def remote_read(self):
- return self._bucket.read()
+ def _lost_canary(self):
+ pass
class Bucket:
def __init__(self, bucket_dir, verifierid):
_assert(os.path.getsize(os.path.join(self._bucket_dir, 'data')) == self._size)
return complete
-class ReadBucket(Bucket):
+class ReadBucket(Bucket, Referenceable):
+ implements(RIBucketReader)
+
def __init__(self, bucket_dir, verifierid):
Bucket.__init__(self, bucket_dir, verifierid)
precondition(self.is_complete()) # implicitly asserts bucket_dir exists
def get_bucket_num(self):
return int(self._read_attr('bucket_num'))
+ remote_get_bucket_num = get_bucket_num
def read(self):
return self._read_attr('data')
-
+ remote_read = read
from twisted.internet import defer
+from allmydata.util import idlib
from allmydata.storageserver import StorageServer
from allmydata.upload import Uploader
from allmydata.download import Downloader
-from allmydata.util import idlib
+from allmydata.vdrive import VDrive
class Client(node.Node, Referenceable):
implements(RIClient)
self.add_service(StorageServer(os.path.join(basedir, self.STOREDIR)))
self.add_service(Uploader())
self.add_service(Downloader())
+ self.add_service(VDrive())
self.queen_pburl = None
self.queen_connector = None
self.log("connected to queen")
self.queen = queen
queen.notifyOnDisconnect(self._lost_queen)
- queen.callRemote("hello",
- nodeid=self.nodeid, node=self, pburl=self.my_pburl)
+ d = queen.callRemote("hello",
+ nodeid=self.nodeid,
+ node=self,
+ pburl=self.my_pburl)
+ d.addCallback(self._got_vdrive_root)
+
+ def _got_vdrive_root(self, root):
+ self.getServiceNamed("vdrive").set_root(root)
def _lost_queen(self):
self.log("lost connection to queen")
self.queen = None
def remote_get_service(self, name):
+ # TODO: 'vdrive' should not be public in the medium term
return self.getServiceNamed(name)
def remote_add_peers(self, new_peers):
--- /dev/null
+
+import os, shutil
+from zope.interface import implements
+from foolscap import Referenceable
+from allmydata.interfaces import RIMutableDirectoryNode
+from twisted.application import service
+from twisted.python import log
+
+class DeadDirectoryNodeError(Exception):
+ """The directory referenced by this node has been deleted."""
+
+class BadDirectoryError(Exception):
+ """There was a problem with the directory being referenced."""
+class BadFileError(Exception):
+ """The file being referenced does not exist."""
+
+class MutableDirectoryNode(Referenceable):
+ implements(RIMutableDirectoryNode)
+
+ def __init__(self, basedir):
+ self._basedir = basedir
+
+ def validate_name(self, name):
+ if name == "." or name == ".." or "/" in name:
+ raise DeadDirectoryNodeError("bad filename component")
+
+ # this is private
+ def get_child(self, name):
+ self.validate_name(name)
+ absname = os.path.join(self._basedir, name)
+ if os.path.isdir(absname):
+ return MutableDirectoryNode(absname)
+ raise DeadDirectoryNodeError("no such directory")
+
+ # these are the public methods, available to anyone who holds a reference
+
+ def list(self):
+ log.msg("Dir(%s).list" % self._basedir)
+ results = []
+ if not os.path.isdir(self._basedir):
+ raise DeadDirectoryNodeError("This directory has been deleted")
+ for name in os.listdir(self._basedir):
+ absname = os.path.join(self._basedir, name)
+ if os.path.isdir(absname):
+ results.append( (name, MutableDirectoryNode(absname)) )
+ elif os.path.isfile(absname):
+ f = open(absname, "rb")
+ data = f.read()
+ f.close()
+ results.append( (name, data) )
+ # anything else is ignored
+ return results
+ remote_list = list
+
+ def add_directory(self, name):
+ self.validate_name(name)
+ absname = os.path.join(self._basedir, name)
+ if os.path.isdir(absname):
+ raise BadDirectoryError("the directory '%s' already exists" % name)
+ if os.path.exists(absname):
+ raise BadDirectoryError("the directory '%s' already exists "
+ "(but isn't a directory)" % name)
+ os.mkdir(absname)
+ return MutableDirectoryNode(absname)
+ remote_add_directory = add_directory
+
+ def add_file(self, name, data):
+ self.validate_name(name)
+ f = open(os.path.join(self._basedir, name), "wb")
+ f.write(data)
+ f.close()
+ remote_add_file = add_file
+
+ def remove(self, name):
+ self.validate_name(name)
+ absname = os.path.join(self._basedir, name)
+ if os.path.isdir(absname):
+ shutil.rmtree(absname)
+ elif os.path.isfile(absname):
+ os.unlink(absname)
+ else:
+ raise BadFileError("Cannot delete non-existent file '%s'" % name)
+
+
+class GlobalVirtualDrive(service.MultiService):
+ name = "filetable"
+ VDRIVEDIR = "vdrive"
+
+ def __init__(self, basedir="."):
+ service.MultiService.__init__(self)
+ vdrive_dir = os.path.join(basedir, self.VDRIVEDIR)
+ if not os.path.exists(vdrive_dir):
+ os.mkdir(vdrive_dir)
+ self._root = MutableDirectoryNode(vdrive_dir)
+
+ def get_root(self):
+ return self._root
+
Referenceable_ = Any()
RIBucketWriter_ = Any()
RIBucketReader_ = Any()
+RIMutableDirectoryNode_ = Any()
+RIMutableFileNode_ = Any()
class RIQueenRoster(RemoteInterface):
def hello(nodeid=Nodeid, node=RIClient_, pburl=PBURL):
- return Nothing()
+ return RIMutableDirectoryNode_ # the virtual drive root
class RIClient(RemoteInterface):
def get_service(name=str):
class RIStorageServer(RemoteInterface):
def allocate_bucket(verifierid=Verifierid, bucket_num=int, size=int,
- leaser=Nodeid):
+ leaser=Nodeid, canary=Referenceable_):
+ # if the canary is lost before close(), the bucket is deleted
return RIBucketWriter_
def get_buckets(verifierid=Verifierid):
return ListOf(TupleOf(int, RIBucketReader_))
return ShareData
+class RIMutableDirectoryNode(RemoteInterface):
+ def list():
+ return ListOf( TupleOf(str, # name, relative to directory
+ (RIMutableDirectoryNode_, Verifierid)),
+ maxLength=100,
+ )
+
+ def add_directory(name=str):
+ return RIMutableDirectoryNode_
+
+ def add_file(name=str, data=Verifierid):
+ return Nothing()
+
+ def remove(name=str):
+ return Nothing()
+
+ # need more to move directories
from zope.interface import implements
from allmydata.interfaces import RIQueenRoster
from allmydata import node
+from allmydata.filetable import GlobalVirtualDrive
class Roster(service.MultiService, Referenceable):
implements(RIQueenRoster)
service.MultiService.__init__(self)
self.phonebook = {}
self.connections = {}
+ self.gvd_root = None
+
+ def set_gvd_root(self, root):
+ self.gvd_root = root
def remote_hello(self, nodeid, node, pburl):
log.msg("roster: contact from %s" % idlib.b2a(nodeid))
self.phonebook[nodeid] = pburl
self.connections[nodeid] = node
node.notifyOnDisconnect(self._lost_node, nodeid)
+ return self.gvd_root
def _educate_the_new_peer(self, nodeid, node, new_peers):
log.msg("roster: educating %s (%d)" % (idlib.b2a(nodeid)[:4], len(new_peers)))
def __init__(self, basedir="."):
node.Node.__init__(self, basedir)
+ self.gvd = self.add_service(GlobalVirtualDrive(basedir))
self.urls = {}
def tub_ready(self):
f = open(os.path.join(self.basedir, "roster_pburl"), "w")
f.write(self.urls["roster"] + "\n")
f.close()
-
+ r.set_gvd_root(self.gvd.get_root())
self._bucketstore = BucketStore(store_dir)
self._bucketstore.setServiceParent(self)
- def remote_allocate_bucket(self, verifierid, bucket_num, size, leaser):
+ def remote_allocate_bucket(self, verifierid, bucket_num, size, leaser,
+ canary):
if self._bucketstore.has_bucket(verifierid):
raise BucketAlreadyExistsError()
lease = self._bucketstore.allocate_bucket(verifierid, bucket_num, size,
- idlib.b2a(leaser))
+ idlib.b2a(leaser), canary)
return lease
def remote_get_buckets(self, verifierid):
--- /dev/null
+
+import os
+from twisted.trial import unittest
+from allmydata.filetable import MutableDirectoryNode, \
+ DeadDirectoryNodeError, BadDirectoryError, BadFileError
+
+
+class FileTable(unittest.TestCase):
+ def test_files(self):
+ os.mkdir("filetable")
+ root = MutableDirectoryNode(os.path.abspath("filetable"))
+ self.failUnlessEqual(root.list(), [])
+ root.add_file("one", "vid-one")
+ root.add_file("two", "vid-two")
+ self.failUnlessEqual(root.list(), [("one", "vid-one"),
+ ("two", "vid-two")])
+ root.remove("two")
+ self.failUnlessEqual(root.list(), [("one", "vid-one")])
+ self.failUnlessRaises(BadFileError, root.remove, "two")
+ self.failUnlessRaises(BadFileError, root.remove, "three")
+
+ # now play with directories
+ subdir1 = root.add_directory("subdir1")
+ self.failUnless(isinstance(subdir1, MutableDirectoryNode))
+ entries = root.list()
+ self.failUnlessEqual(len(entries), 2)
+ one_index = entries.index( ("one", "vid-one") )
+ subdir_index = 1 - one_index
+ self.failUnlessEqual(entries[subdir_index][0], "subdir1")
+ subdir2 = entries[subdir_index][1]
+ self.failUnless(isinstance(subdir2, MutableDirectoryNode))
+
+ self.failUnlessEqual(subdir1.list(), [])
+ self.failUnlessEqual(subdir2.list(), [])
+
+ subdir1.add_file("subone", "vid-subone")
+ self.failUnlessEqual(subdir1.list(), [("subone", "vid-subone")])
+ self.failUnlessEqual(subdir2.list(), [("subone", "vid-subone")])
+
+ self.failUnlessEqual(len(root.list()), 2)
+
+ self.failUnlessRaises(BadDirectoryError, root.add_directory, "subdir1")
+ self.failUnlessRaises(BadDirectoryError, root.add_directory, "one")
+
+ root.remove("subdir1")
+ self.failUnlessEqual(root.list(), [("one", "vid-one")])
+
+ # should our (orphaned) subdir1/subdir2 node still be able to do
+ # anything?
+ self.failUnlessRaises(DeadDirectoryNodeError, subdir1.list)
+
from twisted.trial import unittest
from twisted.application import service
from twisted.internet import defer
-from foolscap import Tub
+from foolscap import Tub, Referenceable
from foolscap.eventual import flushEventualQueue
from allmydata import client
+class Canary(Referenceable):
+ pass
+
class StorageTest(unittest.TestCase):
def setUp(self):
bucket_num=bnum,
size=len(data),
leaser=self.node.nodeid,
+ canary=Canary(),
)
rssd.addCallback(create_bucket)
bucket_num=bnum,
size=len(data),
leaser=self.node.nodeid,
+ canary=Canary(),
)
rssd.addCallback(create_bucket)
+from zope.interface import Interface, implements
from twisted.python import failure, log
from twisted.internet import defer
from twisted.application import service
+from foolscap import Referenceable
from allmydata.util import idlib
from allmydata import encode
filehandle.seek(0)
def make_encoder(self):
+ self._needed_shares = 4
self._shares = 4
self._encoder = encode.Encoder(self._filehandle, self._shares)
self._share_size = self._size
max_peers = None
self.permuted = self._peer.permute_peerids(self._verifierid, max_peers)
+ self._total_peers = len(self.permuted)
for p in self.permuted:
assert isinstance(p, str)
# we will shrink self.permuted as we give up on peers
def _check_next_peer(self):
if len(self.permuted) == 0:
# there are no more to check
- raise NotEnoughPeersError
+ raise NotEnoughPeersError("%s goodness, want %s, have %d "
+ "landlords, %d total peers" %
+ (self.goodness_points,
+ self.target_goodness,
+ len(self.landlords),
+ self._total_peers))
if self.peer_index >= len(self.permuted):
self.peer_index = 0
verifierid=self._verifierid,
bucket_num=bucket_num,
size=self._share_size,
- leaser=self._peer.nodeid)
+ leaser=self._peer.nodeid,
+ canary=Referenceable())
def _allocate_response(bucket):
if self.debug:
print " peerid %s will grant us a lease" % idlib.b2a(peerid)
def netstring(s):
return "%d:%s," % (len(s), s)
+class IUploadable(Interface):
+ def get_filehandle():
+ pass
+ def close_filehandle(f):
+ pass
+
+class FileName:
+ implements(IUploadable)
+ def __init__(self, filename):
+ self._filename = filename
+ def get_filehandle(self):
+ return open(self._filename, "rb")
+ def close_filehandle(self, f):
+ f.close()
+
+class Data:
+ implements(IUploadable)
+ def __init__(self, data):
+ self._data = data
+ def get_filehandle(self):
+ return StringIO(self._data)
+ def close_filehandle(self, f):
+ pass
+
+class FileHandle:
+ implements(IUploadable)
+ def __init__(self, filehandle):
+ self._filehandle = filehandle
+ def get_filehandle(self):
+ return self._filehandle
+ def close_filehandle(self, f):
+ # the originator of the filehandle reserves the right to close it
+ pass
+
class Uploader(service.MultiService):
"""I am a service that allows file uploading.
"""
# note: this is only of the plaintext data, no encryption yet
return hasher.digest()
- def upload_filename(self, filename):
- f = open(filename, "rb")
- def _done(res):
- f.close()
- return res
- d = self.upload_filehandle(f)
- d.addBoth(_done)
- return d
-
- def upload_data(self, data):
- f = StringIO(data)
- return self.upload_filehandle(f)
-
- def upload_filehandle(self, f):
+ def upload(self, f):
assert self.parent
assert self.running
+ f = IUploadable(f)
+ fh = f.get_filehandle()
u = FileUploader(self.parent)
- u.set_filehandle(f)
- u.set_verifierid(self._compute_verifierid(f))
+ u.set_filehandle(fh)
+ u.set_verifierid(self._compute_verifierid(fh))
u.make_encoder()
d = u.start()
+ def _done(res):
+ f.close_filehandle(fh)
+ return res
+ d.addBoth(_done)
return d
+ # utility functions
+ def upload_data(self, data):
+ return self.upload(Data(data))
+ def upload_filename(self, filename):
+ return self.upload(FileName(filename))
+ def upload_filehandle(self, filehandle):
+ return self.upload(FileHandle(filehandle))
--- /dev/null
+
+"""This is the client-side facility to manipulate virtual drives."""
+
+from twisted.application import service
+from twisted.internet import defer
+from allmydata.upload import Data, FileHandle, FileName
+
+class VDrive(service.MultiService):
+ name = "vdrive"
+
+ def set_root(self, root):
+ self.gvd_root = root
+
+ def dirpath(self, dir_or_path):
+ if isinstance(dir_or_path, str):
+ return self.get_dir(dir_or_path)
+ return defer.succeed(dir_or_path)
+
+ def get_dir(self, path):
+ """Return a Deferred that fires with a RemoteReference to a
+ MutableDirectoryNode at the given /-delimited path."""
+ d = defer.succeed(self.gvd_root)
+ if path.startswith("/"):
+ path = path[1:]
+ if path == "":
+ return d
+ for piece in path.split("/"):
+ d.addCallback(lambda parent: parent.callRemote("list"))
+ def _find(table, subdir):
+ for name,target in table:
+ if name == subdir:
+ return subdir
+ else:
+ raise KeyError("no such directory '%s' in '%s'" %
+ (subdir, [t[0] for t in table]))
+ d.addCallback(_find, piece)
+ def _check(subdir):
+ assert not isinstance(subdir, str)
+ return subdir
+ d.addCallback(_check)
+ return d
+
+ def get_root(self):
+ return self.gvd_root
+
+ def listdir(self, dir_or_path):
+ d = self.dirpath(dir_or_path)
+ d.addCallback(lambda parent: parent.callRemote("list"))
+ def _list(table):
+ return [t[0] for t in table]
+ d.addCallback(_list)
+ return d
+
+ def put_file(self, dir_or_path, name, uploadable):
+ """Upload an IUploadable and add it to the virtual drive (as an entry
+ called 'name', in 'dir_or_path') 'dir_or_path' must either be a
+ string like 'root/subdir1/subdir2', or a directory node (either the
+ root directory node returned by get_root(), or a subdirectory
+ returned by list() ).
+
+ The uploadable can be an instance of allmydata.upload.Data,
+ FileHandle, or FileName.
+
+ I return a deferred that will fire when the operation is complete.
+ """
+
+ u = self.parent.getServiceNamed("uploader")
+ d = self.dirpath(dir_or_path)
+ def _got_dir(dirnode):
+ d1 = u.upload(uploadable)
+ d1.addCallback(lambda vid:
+ dirnode.callRemote("add_file", name, vid))
+ return d1
+ d.addCallback(_got_dir)
+ return d
+
+ def put_file_by_filename(self, dir_or_path, name, filename):
+ return self.put_file(dir_or_path, name, FileName(filename))
+ def put_file_by_data(self, dir_or_path, name, data):
+ return self.put_file(dir_or_path, name, Data(data))
+ def put_file_by_filehandle(self, dir_or_path, name, filehandle):
+ return self.put_file(dir_or_path, name, FileHandle(filehandle))
+
+ def make_directory(self, dir_or_path, name):
+ d = self.dirpath(dir_or_path)
+ d.addCallback(lambda parent: parent.callRemote("add_directory", name))
+ return d
+