From: Rob Kinninmont Date: Fri, 1 Dec 2006 03:14:23 +0000 (-0700) Subject: rerecord all the storageserver patches in one go X-Git-Tag: tahoe_v0.1.0-0-UNSTABLE~517 X-Git-Url: https://git.rkrishnan.org/%5B//%5D%20/uri/URI:DIR2-RO:%5B%5E?a=commitdiff_plain;h=94e051c1f06bf8b3d95516bae8d8e045132961e3;p=tahoe-lafs%2Ftahoe-lafs.git rerecord all the storageserver patches in one go darcs was dying trying to deal with the conflict resolution patches. this adds a (very rough) bucketstore and storageserver. probably needs lots of work both in api and implementation. --- diff --git a/allmydata/bucketstore.py b/allmydata/bucketstore.py new file mode 100644 index 00000000..fcbc19a8 --- /dev/null +++ b/allmydata/bucketstore.py @@ -0,0 +1,122 @@ +import os + +from foolscap import Referenceable +from twisted.application import service +from twisted.python.failure import Failure +from allmydata.util import idlib + +from amdlib.util.assertutil import precondition + +class NoSuchBucketError(Failure): + pass + +class BucketStore(service.MultiService, Referenceable): + def __init__(self, store_dir): + precondition(os.path.isdir(store_dir)) + service.MultiService.__init__(self) + self._store_dir = store_dir + + self._buckets = {} # v_id -> Bucket() + self._leases = set() # should do weakref dances. + + def _get_bucket_dir(self, verifierid): + avid = idlib.b2a(verifierid) + return os.path.join(self._store_dir, avid) + + def has_bucket(self, verifierid): + return os.path.exists(self._get_bucket_dir(verifierid)) + + def allocate_bucket(self, verifierid, bucket_num, size, leaser_credentials): + bucket_dir = self._get_bucket_dir(verifierid) + precondition(not os.path.exists(bucket_dir)) + precondition(isinstance(bucket_num, int)) + bucket = Bucket(bucket_dir, verifierid, bucket_num, size) + self._buckets[verifierid] = bucket + bucket.set_leaser(leaser_credentials) + lease = Lease(verifierid, leaser_credentials, bucket) + self._leases.add(lease) + return lease + + def get_bucket(self, verifierid): + # for now, only returns those created by this process, in this run + bucket = self._buckets.get(verifierid) + if bucket: + return BucketReader(bucket) + else: + return NoSuchBucketError() + +class Lease(Referenceable): + def __init__(self, verifierid, leaser, bucket): + self._leaser = leaser + self._verifierid = verifierid + self._bucket = bucket + + def get_bucket(self): + return self._bucket + + def remote_write(self, data): + self._bucket.write(data) + + def remote_finalise(self): + self._bucket.finalise() + +class BucketReader(Referenceable): + def __init__(self, bucket): + self._bucket = bucket + + def remote_get_bucket_num(self): + return self._bucket.get_bucket_num() + + def remote_read(self): + return self._bucket.read() + +class Bucket: + def __init__(self, bucket_dir, verifierid, bucket_num, size): + os.mkdir(bucket_dir) + self._bucket_dir = bucket_dir + self._size = size + self._verifierid = verifierid + + self._data = file(os.path.join(self._bucket_dir, 'data'), 'wb') + self._bytes_written = 0 + + self._write_attr('bucket_num', str(bucket_num)) + + def _write_attr(self, name, val): + f = file(os.path.join(self._bucket_dir, 'name'), 'wb') + f.write(val) + f.close() + + def _read_attr(self, name): + f = file(os.path.join(self._bucket_dir, 'name'), 'wb') + data = f.read() + f.close() + return data + + def set_leaser(self, leaser): + f = file(os.path.join(self._bucket_dir, 'leases'), 'wb') + f.write(leaser) + f.close() + + def write(self, data): + precondition(len(data) + self._bytes_written <= self._size) + self._data.write(data) + self._data.flush() + + def finalise(self): + precondition(self._bytes_written == self._size) + self._data.close() + + def is_complete(self): + return os.path.getsize(os.path.join(self._bucket_dir, 'data')) == self._size + + def get_bucket_num(self): + return int(self._read_attr('bucket_num')) + + def read(self): + precondition(self.is_complete()) + f = file(os.path.join(self._bucket_dir, 'data'), 'rb') + data = f.read() + f.close() + return data + diff --git a/allmydata/client.py b/allmydata/client.py index 909c55c2..85426f68 100644 --- a/allmydata/client.py +++ b/allmydata/client.py @@ -9,12 +9,12 @@ from twisted.internet import reactor from twisted.internet.base import BlockingResolver reactor.installResolver(BlockingResolver()) -class Storage(service.MultiService, Referenceable): - name = "storage" - pass +from allmydata.storageserver import StorageServer class Client(service.MultiService, Referenceable): CERTFILE = "client.pem" + AUTHKEYSFILE = "authorized_keys" + STOREDIR = 'storage' def __init__(self, queen_pburl): service.MultiService.__init__(self) @@ -31,7 +31,7 @@ class Client(service.MultiService, Referenceable): self.queen = None # self.queen is either None or a RemoteReference self.all_peers = set() self.connections = {} - s = Storage() + s = StorageServer(self.STOREDIR) s.setServiceParent(self) AUTHKEYSFILEBASE = "authorized_keys." diff --git a/allmydata/storageserver.py b/allmydata/storageserver.py new file mode 100644 index 00000000..4f1a208b --- /dev/null +++ b/allmydata/storageserver.py @@ -0,0 +1,30 @@ +import os + +from foolscap import Referenceable +from twisted.application import service +from twisted.python.failure import Failure + +from amdlib.util.assertutil import precondition + +from allmydata.bucketstore import BucketStore + +class BucketAlreadyExistsError(Exception): + pass + +class StorageServer(service.MultiService, Referenceable): + name = 'storageserver' + + def __init__(self, store_dir): + precondition(os.path.isdir(store_dir)) + service.MultiService.__init__(self) + self._bucketstore = BucketStore(store_dir) + self._bucketstore.setServiceParent(self) + + def remote_allocate_bucket(self, verifierid, bucket_num, size, leaser): + if self._bucketstore.has_bucket(verifierid): + raise BucketAlreadyExistsError() + lease = self._bucketstore.allocate_bucket(verifierid, bucket_num, size, leaser) + return lease + + def remote_get_bucket(self, verifierid): + return self._bucketstore.get_bucket(verifierid) diff --git a/allmydata/util/idlib.py b/allmydata/util/idlib.py new file mode 100644 index 00000000..b447a08d --- /dev/null +++ b/allmydata/util/idlib.py @@ -0,0 +1,7 @@ +from base64 import b32encode, b32decode + +def b2a(i): + return b32encode(i).lower() + +def a2b(i): + return b32decode(i.upper())