service.MultiService.__init__(self)
self._store_dir = store_dir
- self._buckets = {} # v_id -> Bucket()
self._leases = set() # should do weakref dances.
def _get_bucket_dir(self, verifierid):
bucket_dir = self._get_bucket_dir(verifierid)
precondition(not os.path.exists(bucket_dir))
precondition(isinstance(bucket_num, int))
- bucket = Bucket(bucket_dir, verifierid, bucket_num, size)
- self._buckets[verifierid] = bucket
+ bucket = WriteBucket(bucket_dir, verifierid, bucket_num, size)
bucket.set_leaser(leaser_credentials)
lease = Lease(verifierid, leaser_credentials, bucket)
self._leases.add(lease)
def get_bucket(self, verifierid):
# for now, only returns those created by this process, in this run
- bucket = self._buckets.get(verifierid)
- if bucket:
- precondition(bucket.is_complete())
- return BucketReader(bucket)
- elif os.path.exists(self._get_bucket_dir(verifierid)):
- bucket_dir = self._get_bucket_dir(verifierid)
- bucket = Bucket(bucket_dir, verifierid, None, None)
- return BucketReader(bucket)
+ bucket_dir = self._get_bucket_dir(verifierid)
+ if os.path.exists(bucket_dir):
+ return BucketReader(ReadBucket(bucket_dir, verifierid))
else:
return NoSuchBucketError()
return self._bucket.read()
class Bucket:
- def __init__(self, bucket_dir, verifierid, bucket_num, size):
- if not os.path.isdir(bucket_dir):
- os.mkdir(bucket_dir)
+ def __init__(self, bucket_dir, verifierid):
self._bucket_dir = bucket_dir
self._verifierid = verifierid
- if size is not None:
- self._size = size
- self._data = file(os.path.join(self._bucket_dir, 'data'), 'wb')
- self._bytes_written = 0
- else:
- precondition(os.path.exists(os.path.join(self._bucket_dir, 'closed')))
- self._size = os.path.getsize(os.path.join(self._bucket_dir, 'data'))
- self._bytes_written = self._size
-
- if bucket_num is not None:
- self._write_attr('bucket_num', str(bucket_num))
- #else:
- #bucket_num = int(self._read_attr('bucket_num'))
-
def _write_attr(self, name, val):
f = file(os.path.join(self._bucket_dir, name), 'wb')
f.write(val)
f.close()
return data
+ def is_complete(self):
+ return os.path.exists(os.path.join(self._bucket_dir, 'closed'))
+
+class WriteBucket(Bucket):
+ def __init__(self, bucket_dir, verifierid, bucket_num, size):
+ Bucket.__init__(self, bucket_dir, verifierid)
+ precondition(not os.path.exists(bucket_dir))
+ os.mkdir(bucket_dir)
+
+ self._size = size
+ self._data = file(os.path.join(self._bucket_dir, 'data'), 'wb')
+ self._bytes_written = 0
+
+ self._write_attr('bucket_num', str(bucket_num))
+
def set_leaser(self, leaser):
- f = file(os.path.join(self._bucket_dir, 'leases'), 'wb')
- f.write(leaser)
- f.close()
+ self._write_attr('leases', leaser)
def write(self, data):
precondition(len(data) + self._bytes_written <= self._size)
self._write_attr('closed', '')
def is_complete(self):
- return os.path.getsize(os.path.join(self._bucket_dir, 'data')) == self._size
+ complete = Bucket.is_complete(self)
+ if complete:
+ _assert(os.path.getsize(os.path.join(self._bucket_dir, 'data')) == self._size)
+ return complete
+
+class ReadBucket(Bucket):
+ def __init__(self, bucket_dir, verifierid):
+ Bucket.__init__(self, bucket_dir, verifierid)
+ precondition(self.is_complete()) # implicitly asserts bucket_dir exists
def get_bucket_num(self):
return int(self._read_attr('bucket_num'))
def read(self):
- precondition(self.is_complete())
- f = file(os.path.join(self._bucket_dir, 'data'), 'rb')
- data = f.read()
- f.close()
- return data
+ return self._read_attr('data')