--- /dev/null
+import os
+
+from foolscap import Referenceable
+from twisted.application import service
+from twisted.python.failure import Failure
+from allmydata.util import idlib
+
+from amdlib.util.assertutil import precondition
+
+class NoSuchBucketError(Failure):
+ pass
+
+class BucketStore(service.MultiService, Referenceable):
+ def __init__(self, store_dir):
+ precondition(os.path.isdir(store_dir))
+ service.MultiService.__init__(self)
+ self._store_dir = store_dir
+
+ self._buckets = {} # v_id -> Bucket()
+ self._leases = set() # should do weakref dances.
+
+ def _get_bucket_dir(self, verifierid):
+ avid = idlib.b2a(verifierid)
+ return os.path.join(self._store_dir, avid)
+
+ def has_bucket(self, verifierid):
+ return os.path.exists(self._get_bucket_dir(verifierid))
+
+ def allocate_bucket(self, verifierid, bucket_num, size, leaser_credentials):
+ bucket_dir = self._get_bucket_dir(verifierid)
+ precondition(not os.path.exists(bucket_dir))
+ precondition(isinstance(bucket_num, int))
+ bucket = Bucket(bucket_dir, verifierid, bucket_num, size)
+ self._buckets[verifierid] = bucket
+ bucket.set_leaser(leaser_credentials)
+ lease = Lease(verifierid, leaser_credentials, bucket)
+ self._leases.add(lease)
+ return lease
+
+ def get_bucket(self, verifierid):
+ # for now, only returns those created by this process, in this run
+ bucket = self._buckets.get(verifierid)
+ if bucket:
+ return BucketReader(bucket)
+ else:
+ return NoSuchBucketError()
+
+class Lease(Referenceable):
+ def __init__(self, verifierid, leaser, bucket):
+ self._leaser = leaser
+ self._verifierid = verifierid
+ self._bucket = bucket
+
+ def get_bucket(self):
+ return self._bucket
+
+ def remote_write(self, data):
+ self._bucket.write(data)
+
+ def remote_finalise(self):
+ self._bucket.finalise()
+
+class BucketReader(Referenceable):
+ def __init__(self, bucket):
+ self._bucket = bucket
+
+ def remote_get_bucket_num(self):
+ return self._bucket.get_bucket_num()
+
+ def remote_read(self):
+ return self._bucket.read()
+
+class Bucket:
+ def __init__(self, bucket_dir, verifierid, bucket_num, size):
+ os.mkdir(bucket_dir)
+ self._bucket_dir = bucket_dir
+ self._size = size
+ self._verifierid = verifierid
+
+ self._data = file(os.path.join(self._bucket_dir, 'data'), 'wb')
+ self._bytes_written = 0
+
+ self._write_attr('bucket_num', str(bucket_num))
+
+ def _write_attr(self, name, val):
+ f = file(os.path.join(self._bucket_dir, 'name'), 'wb')
+ f.write(val)
+ f.close()
+
+ def _read_attr(self, name):
+ f = file(os.path.join(self._bucket_dir, 'name'), 'wb')
+ data = f.read()
+ f.close()
+ return data
+
+ def set_leaser(self, leaser):
+ f = file(os.path.join(self._bucket_dir, 'leases'), 'wb')
+ f.write(leaser)
+ f.close()
+
+ def write(self, data):
+ precondition(len(data) + self._bytes_written <= self._size)
+ self._data.write(data)
+ self._data.flush()
+
+ def finalise(self):
+ precondition(self._bytes_written == self._size)
+ self._data.close()
+
+ def is_complete(self):
+ return os.path.getsize(os.path.join(self._bucket_dir, 'data')) == self._size
+
+ def get_bucket_num(self):
+ return int(self._read_attr('bucket_num'))
+
+ def read(self):
+ precondition(self.is_complete())
+ f = file(os.path.join(self._bucket_dir, 'data'), 'rb')
+ data = f.read()
+ f.close()
+ return data
+
--- /dev/null
+import os
+
+from foolscap import Referenceable
+from twisted.application import service
+from twisted.python.failure import Failure
+
+from amdlib.util.assertutil import precondition
+
+from allmydata.bucketstore import BucketStore
+
+class BucketAlreadyExistsError(Exception):
+ pass
+
+class StorageServer(service.MultiService, Referenceable):
+ name = 'storageserver'
+
+ def __init__(self, store_dir):
+ precondition(os.path.isdir(store_dir))
+ service.MultiService.__init__(self)
+ self._bucketstore = BucketStore(store_dir)
+ self._bucketstore.setServiceParent(self)
+
+ def remote_allocate_bucket(self, verifierid, bucket_num, size, leaser):
+ if self._bucketstore.has_bucket(verifierid):
+ raise BucketAlreadyExistsError()
+ lease = self._bucketstore.allocate_bucket(verifierid, bucket_num, size, leaser)
+ return lease
+
+ def remote_get_bucket(self, verifierid):
+ return self._bucketstore.get_bucket(verifierid)