def get_sharesets_for_prefix(prefix):
"""
- Return an iterable containing IShareSet objects for all storage
- indices matching the given base-32 prefix, for which this backend
- holds shares.
- XXX This will probably need to return a Deferred, but for now it
- is synchronous.
+ Return a Deferred that fires with an iterable of IShareSet objects
+ for all storage indices matching the given base-32 prefix, for
+ which this backend holds shares.
+
+ A caller will typically perform operations that take locks on some
+ of the sharesets returned by this method. Nothing prevents sharesets
+ matching the prefix from being deleted or added between listing the
+ sharesets and taking any such locks; callers must be able to tolerate
+ this.
"""
def get_shareset(storageindex):
+from weakref import WeakValueDictionary
+
from twisted.application import service
from twisted.internet import defer
class Backend(service.MultiService):
def __init__(self):
service.MultiService.__init__(self)
+ self._lock_table = WeakValueDictionary()
+
+ def _get_lock(self, storage_index):
+ # Getting a shareset ensures that a lock exists for that storage_index.
+ # The _lock_table won't let go of an entry while the ShareSet (or any
+ # other object that references the lock) is live, or while it is locked.
+
+ lock = self._lock_table.get(storage_index, None)
+ if lock is None:
+ lock = defer.DeferredLock()
+ self._lock_table[storage_index] = lock
+ return lock
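# Illustration only (not part of this patch): what sharing one DeferredLock
# per storage index gives us. The backend instance and the index values
# passed to this demo function are placeholders.
def _demo_lock_sharing(backend, storage_index, other_index):
    lock = backend._get_lock(storage_index)
    # Repeated lookups for the same index return the same lock object, so all
    # operations on that shareset serialize through it; a different index gets
    # an independent lock and is never blocked by this one.
    assert lock is backend._get_lock(storage_index)
    assert lock is not backend._get_lock(other_index)

    order = []
    d1 = lock.run(order.append, "first")   # acquires the lock, runs, releases
    d2 = lock.run(order.append, "second")  # queued until "first" has released
    return defer.gatherResults([d1, d2]).addCallback(lambda ign: order)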
def must_use_tubid_as_permutation_seed(self):
# New backends cannot have been around before #466, and so have no backward
This class implements shareset logic that could work for all backends, but
which backends may wish to override for efficiency.
"""
- # TODO: queue operations on a shareset to ensure atomicity for each fully
- # successful operation (#1869).
- def __init__(self, storage_index):
+ def __init__(self, storage_index, lock):
self.storage_index = storage_index
+ self.lock = lock
def get_storage_index(self):
return self.storage_index
def make_bucket_reader(self, account, share):
return BucketReader(account, share)
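+ # The public entry points below acquire this shareset's DeferredLock and
+ # delegate to a _locked_* implementation, which therefore always runs with
+ # the lock held.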
+ def get_shares(self):
+ return self.lock.run(self._locked_get_shares)
+
+ def get_share(self, shnum):
+ return self.lock.run(self._locked_get_share, shnum)
+
+ def delete_share(self, shnum, *args, **kwargs):
+ # Pass through any backend-specific arguments (e.g. NullShareSet's
+ # include_incoming) to the locked implementation.
+ return self.lock.run(self._locked_delete_share, shnum, *args, **kwargs)
+
def testv_and_readv_and_writev(self, write_enabler,
test_and_write_vectors, read_vector,
expiration_time, account):
+ return self.lock.run(self._locked_testv_and_readv_and_writev, write_enabler,
+ test_and_write_vectors, read_vector,
+ expiration_time, account)
+
+ def _locked_testv_and_readv_and_writev(self, write_enabler,
+ test_and_write_vectors, read_vector,
+ expiration_time, account):
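+ # Holding this shareset's lock for the whole test-and-set makes the
+ # read/compare/write sequence atomic with respect to other operations on
+ # the same shareset (the per-shareset queueing that ticket #1869 asks for).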
# The implementation here depends on the following helper methods,
# which must be provided by subclasses:
#
# """create a mutable share with the given shnum and write_enabler"""
sharemap = {}
- d = self.get_shares()
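+ # Call the already-locked helper directly: the public get_shares() would
+ # try to re-acquire this shareset's DeferredLock, which is not reentrant.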
+ d = self._locked_get_shares()
def _got_shares( (shares, corrupted) ):
d2 = defer.succeed(None)
for share in shares:
return d
def readv(self, wanted_shnums, read_vector):
+ return self.lock.run(self._locked_readv, wanted_shnums, read_vector)
+
+ def _locked_readv(self, wanted_shnums, read_vector):
"""
Read a vector from the numbered shares in this shareset. An empty
wanted_shnums list means to return data from all known shares.
"""
shnums = []
dreads = []
- d = self.get_shares()
+ d = self._locked_get_shares()
def _got_shares( (shares, corrupted) ):
# We ignore corrupted shares.
for share in shares:
# XXX we want this to be deterministic, so we return the sharesets sorted
# by their si_strings, but we shouldn't need to explicitly re-sort them
# because list_objects returns a sorted list.
- return [CloudShareSet(si_a2b(s), self._container, self._incomingset) for s in sorted(si_strings)]
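+ # Going through get_shareset() (rather than constructing CloudShareSet
+ # directly) makes every shareset for a given storage index share the
+ # backend's per-index lock.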
+ return [self.get_shareset(si_a2b(s)) for s in sorted(si_strings)]
d.addCallback(_get_sharesets)
return d
def get_shareset(self, storage_index):
- return CloudShareSet(storage_index, self._container, self._incomingset)
+ return CloudShareSet(storage_index, self._get_lock(storage_index),
+ self._container, self._incomingset)
def fill_in_space_stats(self, stats):
# TODO: query space usage of container if supported.
class CloudShareSet(ShareSet):
implements(IShareSet)
- def __init__(self, storage_index, container, incomingset):
- ShareSet.__init__(self, storage_index)
+ def __init__(self, storage_index, lock, container, incomingset):
+ ShareSet.__init__(self, storage_index, lock)
self._container = container
self._incomingset = incomingset
self._key = get_share_key(storage_index)
def get_overhead(self):
return 0
- def get_shares(self):
+ def _locked_get_shares(self):
d = self._container.list_objects(prefix=self._key)
def _get_shares(res):
si = self.get_storage_index()
d.addCallback(lambda shares: (shares, set()) )
return d
- def get_share(self, shnum):
+ def _locked_get_share(self, shnum):
key = "%s%d" % (self._key, shnum)
d = self._container.list_objects(prefix=key)
def _get_share(res):
d.addCallback(_get_share)
return d
- def delete_share(self, shnum):
+ def _locked_delete_share(self, shnum):
key = "%s%d" % (self._key, shnum)
return delete_chunks(self._container, key)
def get_shareset(self, storage_index):
sharehomedir = si_si2dir(self._sharedir, storage_index)
incominghomedir = si_si2dir(self._incomingdir, storage_index)
- return DiskShareSet(storage_index, sharehomedir, incominghomedir)
+ return DiskShareSet(storage_index, self._get_lock(storage_index), sharehomedir, incominghomedir)
def fill_in_space_stats(self, stats):
stats['storage_server.reserved_space'] = self._reserved_space
class DiskShareSet(ShareSet):
implements(IShareSet)
- def __init__(self, storage_index, sharehomedir, incominghomedir=None):
- ShareSet.__init__(self, storage_index)
+ def __init__(self, storage_index, lock, sharehomedir, incominghomedir=None):
+ ShareSet.__init__(self, storage_index, lock)
self._sharehomedir = sharehomedir
self._incominghomedir = incominghomedir
return (fileutil.get_used_space(self._sharehomedir) +
fileutil.get_used_space(self._incominghomedir))
- def get_shares(self):
+ def _locked_get_shares(self):
si = self.get_storage_index()
shares = {}
corrupted = set()
valid = [shares[shnum] for shnum in sorted(shares.keys())]
return defer.succeed( (valid, corrupted) )
- def get_share(self, shnum):
+ def _locked_get_share(self, shnum):
return get_disk_share(os.path.join(self._sharehomedir, str(shnum)),
self.get_storage_index(), shnum)
- def delete_share(self, shnum):
+ def _locked_delete_share(self, shnum):
fileutil.remove(os.path.join(self._sharehomedir, str(shnum)))
return defer.succeed(None)
def get_shareset(self, storage_index):
shareset = self._sharesets.get(storage_index, None)
if shareset is None:
- shareset = NullShareSet(storage_index)
+ shareset = NullShareSet(storage_index, self._get_lock(storage_index))
self._sharesets[storage_index] = shareset
return shareset
class NullShareSet(ShareSet):
implements(IShareSet)
- def __init__(self, storage_index):
- self.storage_index = storage_index
+ def __init__(self, storage_index, lock):
+ ShareSet.__init__(self, storage_index, lock)
self._incoming_shnums = set()
self._immutable_shnums = set()
self._mutable_shnums = set()
def get_overhead(self):
return 0
- def get_shares(self):
+ def _locked_get_shares(self):
shares = {}
for shnum in self._immutable_shnums:
shares[shnum] = ImmutableNullShare(self, shnum)
# This backend never has any corrupt shares.
return defer.succeed( ([shares[shnum] for shnum in sorted(shares.keys())], set()) )
- def get_share(self, shnum):
+ def _locked_get_share(self, shnum):
if shnum in self._immutable_shnums:
return defer.succeed(ImmutableNullShare(self, shnum))
elif shnum in self._mutable_shnums:
def _not_found(): raise IndexError("no such share %d" % (shnum,))
return defer.execute(_not_found)
- def delete_share(self, shnum, include_incoming=False):
+ def _locked_delete_share(self, shnum, include_incoming=False):
if include_incoming and (shnum in self._incoming_shnums):
self._incoming_shnums.remove(shnum)
if shnum in self._immutable_shnums: