]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
rerecord all the storageserver patches in one go
authorRob Kinninmont <robk@allmydata.com>
Fri, 1 Dec 2006 03:14:23 +0000 (20:14 -0700)
committerRob Kinninmont <robk@allmydata.com>
Fri, 1 Dec 2006 03:14:23 +0000 (20:14 -0700)
darcs was dying trying to deal with the conflict resolution patches.

this adds a (very rough) bucketstore and storageserver.
probably needs lots of work both in api and implementation.

allmydata/bucketstore.py [new file with mode: 0644]
allmydata/client.py
allmydata/storageserver.py [new file with mode: 0644]
allmydata/util/idlib.py [new file with mode: 0644]

diff --git a/allmydata/bucketstore.py b/allmydata/bucketstore.py
new file mode 100644 (file)
index 0000000..fcbc19a
--- /dev/null
@@ -0,0 +1,122 @@
+import os
+
+from foolscap import Referenceable
+from twisted.application import service
+from twisted.python.failure import Failure
+from allmydata.util import idlib
+
+from amdlib.util.assertutil import precondition
+
+class NoSuchBucketError(Failure):
+    pass
+
+class BucketStore(service.MultiService, Referenceable):
+    def __init__(self, store_dir):
+        precondition(os.path.isdir(store_dir))
+        service.MultiService.__init__(self)
+        self._store_dir = store_dir
+
+        self._buckets = {} # v_id -> Bucket()
+        self._leases = set() # should do weakref dances.
+
+    def _get_bucket_dir(self, verifierid):
+        avid = idlib.b2a(verifierid)
+        return os.path.join(self._store_dir, avid)
+
+    def has_bucket(self, verifierid):
+        return os.path.exists(self._get_bucket_dir(verifierid))
+
+    def allocate_bucket(self, verifierid, bucket_num, size, leaser_credentials):
+        bucket_dir = self._get_bucket_dir(verifierid)
+        precondition(not os.path.exists(bucket_dir))
+        precondition(isinstance(bucket_num, int))
+        bucket = Bucket(bucket_dir, verifierid, bucket_num, size)
+        self._buckets[verifierid] = bucket
+        bucket.set_leaser(leaser_credentials)
+        lease = Lease(verifierid, leaser_credentials, bucket)
+        self._leases.add(lease)
+        return lease
+
+    def get_bucket(self, verifierid):
+        # for now, only returns those created by this process, in this run
+        bucket = self._buckets.get(verifierid)
+        if bucket:
+            return BucketReader(bucket)
+        else:
+            return NoSuchBucketError()
+
+class Lease(Referenceable):
+    def __init__(self, verifierid, leaser, bucket):
+        self._leaser = leaser
+        self._verifierid = verifierid
+        self._bucket = bucket
+
+    def get_bucket(self):
+        return self._bucket
+
+    def remote_write(self, data):
+        self._bucket.write(data)
+
+    def remote_finalise(self):
+        self._bucket.finalise()
+
+class BucketReader(Referenceable):
+    def __init__(self, bucket):
+        self._bucket = bucket
+
+    def remote_get_bucket_num(self):
+        return self._bucket.get_bucket_num()
+
+    def remote_read(self):
+        return self._bucket.read()
+
+class Bucket:
+    def __init__(self, bucket_dir, verifierid, bucket_num, size):
+        os.mkdir(bucket_dir)
+        self._bucket_dir = bucket_dir
+        self._size = size
+        self._verifierid = verifierid
+
+        self._data = file(os.path.join(self._bucket_dir, 'data'), 'wb')
+        self._bytes_written = 0
+
+        self._write_attr('bucket_num', str(bucket_num))
+
+    def _write_attr(self, name, val):
+        f = file(os.path.join(self._bucket_dir, 'name'), 'wb')
+        f.write(val)
+        f.close()
+
+    def _read_attr(self, name):
+        f = file(os.path.join(self._bucket_dir, 'name'), 'wb')
+        data = f.read()
+        f.close()
+        return data
+
+    def set_leaser(self, leaser):
+        f = file(os.path.join(self._bucket_dir, 'leases'), 'wb')
+        f.write(leaser)
+        f.close()
+
+    def write(self, data):
+        precondition(len(data) + self._bytes_written <= self._size)
+        self._data.write(data)
+        self._data.flush()
+
+    def finalise(self):
+        precondition(self._bytes_written == self._size)
+        self._data.close()
+
+    def is_complete(self):
+        return os.path.getsize(os.path.join(self._bucket_dir, 'data')) == self._size
+
+    def get_bucket_num(self):
+        return int(self._read_attr('bucket_num'))
+
+    def read(self):
+        precondition(self.is_complete())
+        f = file(os.path.join(self._bucket_dir, 'data'), 'rb')
+        data = f.read()
+        f.close()
+        return data
+
index 909c55c2a19328ba63906dd0c68bbb72b26e831e..85426f68a9b9f73c6e9ed79c5e602fec7e4b3f73 100644 (file)
@@ -9,12 +9,12 @@ from twisted.internet import reactor
 from twisted.internet.base import BlockingResolver
 reactor.installResolver(BlockingResolver())
 
-class Storage(service.MultiService, Referenceable):
-    name = "storage"
-    pass
+from allmydata.storageserver import StorageServer
 
 class Client(service.MultiService, Referenceable):
     CERTFILE = "client.pem"
+    AUTHKEYSFILE = "authorized_keys"
+    STOREDIR = 'storage'
 
     def __init__(self, queen_pburl):
         service.MultiService.__init__(self)
@@ -31,7 +31,7 @@ class Client(service.MultiService, Referenceable):
         self.queen = None # self.queen is either None or a RemoteReference
         self.all_peers = set()
         self.connections = {}
-        s = Storage()
+        s = StorageServer(self.STOREDIR)
         s.setServiceParent(self)
 
         AUTHKEYSFILEBASE = "authorized_keys."
diff --git a/allmydata/storageserver.py b/allmydata/storageserver.py
new file mode 100644 (file)
index 0000000..4f1a208
--- /dev/null
@@ -0,0 +1,30 @@
+import os
+
+from foolscap import Referenceable
+from twisted.application import service
+from twisted.python.failure import Failure
+
+from amdlib.util.assertutil import precondition
+
+from allmydata.bucketstore import BucketStore
+
+class BucketAlreadyExistsError(Exception):
+    pass
+
+class StorageServer(service.MultiService, Referenceable):
+    name = 'storageserver'
+
+    def __init__(self, store_dir):
+        precondition(os.path.isdir(store_dir))
+        service.MultiService.__init__(self)
+        self._bucketstore = BucketStore(store_dir)
+        self._bucketstore.setServiceParent(self)
+
+    def remote_allocate_bucket(self, verifierid, bucket_num, size, leaser):
+        if self._bucketstore.has_bucket(verifierid):
+            raise BucketAlreadyExistsError()
+        lease = self._bucketstore.allocate_bucket(verifierid, bucket_num, size, leaser)
+        return lease
+
+    def remote_get_bucket(self, verifierid):
+        return self._bucketstore.get_bucket(verifierid)
diff --git a/allmydata/util/idlib.py b/allmydata/util/idlib.py
new file mode 100644 (file)
index 0000000..b447a08
--- /dev/null
@@ -0,0 +1,7 @@
+from base64 import b32encode, b32decode
+
+def b2a(i):
+    return b32encode(i).lower()
+
+def a2b(i):
+    return b32decode(i.upper())