self._parent = parent
if self._parent:
self._log_number = self._parent.log("creating Encoder %s" % self)
+ self._aborted = False
def __repr__(self):
if hasattr(self, "_storage_index"):
d.addCallbacks(lambda res: self.done(), self.err)
return d
def abort(self):
    """Abandon the in-progress upload.

    Sets a flag that the segment-reading path checks: the next segment
    read (in _gather_data inside _encode_segment) raises
    UploadAborted(), which bypasses the rest of the upload chain. If the
    final segment's shares have already been sent, it is too late to
    abort.
    TODO: allow abort any time up to close_all_shareholders.
    """
    self.log("aborting upload")
    assert self._codec, "don't call abort before start"
    self._aborted = True
+
def _turn_barrier(self, res):
# putting this method in a Deferred chain imposes a guaranteed
# reactor turn between the pre- and post- portions of that chain.
with the combination of any 'previous_chunks' and the new chunks
which were gathered."""
+ if self._aborted:
+ raise UploadAborted()
+
if not num_chunks:
return defer.succeed(previous_chunks)
d = self._uploadable.read_encrypted(input_chunk_size)
def _got(data):
+ if self._aborted:
+ raise UploadAborted()
encrypted_pieces = []
length = 0
while data:
def err(self, f):
    """Failure path for the upload.

    Logs the failure, tells every remaining shareholder to abort (so
    each deletes its partial share, letting someone else upload the
    share again later), then re-fires the original failure once the
    abort messages have been answered.
    """
    self.log("UNUSUAL: %s: upload failed: %s" % (self, f))
    # Aborting makes the shareholders delete their partial shares,
    # freeing the slots for a future upload attempt.
    self.log("aborting shareholders")
    aborts = []
    for shareid, landlord in list(self.landlords.items()):
        ad = landlord.abort()
        ad.addErrback(self._remove_shareholder, shareid, "abort")
        aborts.append(ad)
    d = self._gather_responses(aborts)
    def _unwrap(res):
        self.log("shareholders aborted")
        # unwrap DeferredList-style FirstError so callers see the
        # underlying failure
        if f.check(defer.FirstError):
            return f.value.subFailure
        return f
    d.addCallback(_unwrap)
    return d
"""
return None
def abort():
    """Abandon all the data that has been written.

    The partially-written share is discarded, so the storage slot can
    be claimed again by a later upload.
    """
    return None
+
class RIBucketReader(RemoteInterface):
    def read(offset=int, length=int):
        """Read and return `length` bytes of share data starting at
        `offset`."""
        return ShareData
class BucketWriter(Referenceable):
implements(RIBucketWriter)
- def __init__(self, ss, incominghome, finalhome, size, lease_info):
+ def __init__(self, ss, incominghome, finalhome, size, lease_info, canary):
self.ss = ss
self.incominghome = incominghome
self.finalhome = finalhome
self._size = size
+ self._canary = canary
+ self._disconnect_marker = canary.notifyOnDisconnect(self._disconnected)
self.closed = False
self.throw_out_all_data = False
# touch the file, so later callers will see that we're working on it.
fileutil.rename(self.incominghome, self.finalhome)
self._sharefile = None
self.closed = True
+ self._canary.dontNotifyOnDisconnect(self._disconnect_marker)
filelen = os.stat(self.finalhome)[stat.ST_SIZE]
self.ss.bucket_writer_closed(self, filelen)
if not os.listdir(parentdir):
os.rmdir(parentdir)
+ def _disconnected(self):
+ if not self.closed:
+ self._abort()
+
def remote_abort(self):
    """Remote API: the client is abandoning this bucket. Detach the
    disconnect watcher and delete the partial share (no-op if the
    bucket was already closed)."""
    log.msg("storage: aborting sharefile %s" % self.incominghome,
            facility="tahoe.storage", level=log.UNUSUAL)
    if self.closed:
        return
    self._canary.dontNotifyOnDisconnect(self._disconnect_marker)
    self._abort()
+ def _abort(self):
+ if self.closed:
+ return
+ os.remove(self.incominghome)
+ # if we were the last share to be moved, remove the incoming/
+ # directory that was our parent
+ parentdir = os.path.split(self.incominghome)[0]
+ if not os.listdir(parentdir):
+ os.rmdir(parentdir)
+
+
class BucketReader(Referenceable):
implements(RIBucketReader)
# ok! we need to create the new share file.
fileutil.make_dirs(os.path.join(self.incomingdir, si_s))
bw = BucketWriter(self, incominghome, finalhome,
- space_per_bucket, lease_info)
+ space_per_bucket, lease_info, canary)
if self.no_storage:
bw.throw_out_all_data = True
bucketwriters[shnum] = bw
def close(self):
    """Tell the remote bucket that all writes are finished; returns
    the Deferred from the remote call."""
    return self._rref.callRemote("close")
def abort(self):
    """Ask the remote bucket to discard its partially-written share;
    returns the Deferred from the remote call."""
    return self._rref.callRemote("abort")
class ReadBucketProxy:
implements(IStorageBucketReader)
def __init__(self, rref):
self.closed = True
return defer.maybeDeferred(_try)
def abort(self):
    """Nothing to clean up on the read side; fire immediately."""
    return defer.succeed(None)
def get_block(self, blocknum):
def _try():
assert isinstance(blocknum, (int, long))
d = self.send_and_recover((4,8,10), bucket_modes=modemap)
def _done(res):
self.failUnless(isinstance(res, Failure))
- self.failUnless(res.check(encode.NotEnoughPeersError))
+ self.failUnless(res.check(encode.NotEnoughPeersError), res)
d.addBoth(_done)
return d
from allmydata.interfaces import BadWriteEnablerError
from allmydata.test.common import LoggingServiceParent
class FakeCanary:
    """Test stand-in for the remote 'canary' object: accepts and
    ignores disconnect-notification registrations."""
    def notifyOnDisconnect(self, *args, **kwargs):
        # returns None, which BucketWriter stores as its marker and
        # later hands back to dontNotifyOnDisconnect
        pass
    def dontNotifyOnDisconnect(self, marker):
        pass
+
class Bucket(unittest.TestCase):
def make_workdir(self, name):
basedir = os.path.join("storage", "Bucket", name)
def test_create(self):
incoming, final = self.make_workdir("test_create")
- bw = BucketWriter(self, incoming, final, 200, self.make_lease())
+ bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
+ FakeCanary())
bw.remote_write(0, "a"*25)
bw.remote_write(25, "b"*25)
bw.remote_write(50, "c"*25)
def test_readwrite(self):
incoming, final = self.make_workdir("test_readwrite")
- bw = BucketWriter(self, incoming, final, 200, self.make_lease())
+ bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
+ FakeCanary())
bw.remote_write(0, "a"*25)
bw.remote_write(25, "b"*25)
bw.remote_write(50, "c"*7) # last block may be short
final = os.path.join(basedir, "bucket")
fileutil.make_dirs(basedir)
fileutil.make_dirs(os.path.join(basedir, "tmp"))
- bw = BucketWriter(self, incoming, final, size, self.make_lease())
+ bw = BucketWriter(self, incoming, final, size, self.make_lease(),
+ FakeCanary())
rb = RemoteBucket()
rb.target = bw
return bw, rb, final
cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
return ss.remote_allocate_buckets(storage_index,
renew_secret, cancel_secret,
- sharenums, size, Referenceable())
+ sharenums, size, FakeCanary())
def test_remove_incoming(self):
ss = self.create("test_remove_incoming")
self.failUnlessEqual(ss.remote_get_buckets("vid"), {})
- canary = Referenceable()
+ canary = FakeCanary()
already,writers = self.allocate(ss, "vid", [0,1,2], 75)
self.failUnlessEqual(already, set())
self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
def test_sizelimits(self):
ss = self.create("test_sizelimits", 5000)
- canary = Referenceable()
+ canary = FakeCanary()
# a newly created and filled share incurs this much overhead, beyond
# the size we request.
OVERHEAD = 3*4
def test_leases(self):
ss = self.create("test_leases")
- canary = Referenceable()
+ canary = FakeCanary()
sharenums = range(5)
size = 100
self._client = client
self._options = options
self._log_number = self._client.log("CHKUploader starting")
+ self._encoder = None
def set_params(self, encoding_parameters):
self._encoding_parameters = encoding_parameters
d.addCallback(_uploaded)
return d
def abort(self):
    """Call this if the upload must be abandoned before it completes.
    This will tell the shareholders to delete their partial shares. I
    return a Deferred that fires when these messages have been acked."""
    # (fixed docstring typo: "Call this is" -> "Call this if")
    if not self._encoder:
        # abort() called before start_encrypted() created the encoder:
        # no shares have been placed yet, so there is nothing to clean up
        return defer.succeed(None)
    return self._encoder.abort()
+
def start_encrypted(self, encrypted):
eu = IEncryptedUploadable(encrypted)
- e = encode.Encoder(self._options, self)
+ self._encoder = e = encode.Encoder(self._options, self)
e.set_params(self._encoding_parameters)
d = e.set_encrypted_uploadable(eu)
d.addCallback(self.locate_all_shareholders)
def __init__(self, encrypted_uploadable):
    """Wrap an IEncryptedUploadable so the helper can read it remotely."""
    self._eu = IEncryptedUploadable(encrypted_uploadable)
    self._offset = 0      # next read position; reads must be sequential
    self._bytes_read = 0  # running total of bytes served to the helper
    # _cutoff/_cutoff_cb are test hooks (set via debug options): when a
    # read would cross the _cutoff offset, _cutoff_cb is invoked first
    self._cutoff = None # set by debug options
    self._cutoff_cb = None
def remote_get_size(self):
    """Remote API: report the total size of the encrypted data."""
    return self._eu.get_size()
def remote_read_encrypted(self, offset, length):
    """Remote API: serve the next `length` encrypted bytes.

    Reads must be strictly sequential (seek is not implemented). If a
    debug cutoff is armed and this read would cross it, fire the
    cutoff callback first.
    """
    assert offset == self._offset, "%d != %d" % (offset, self._offset)
    if self._cutoff is not None and offset+length > self._cutoff:
        self._cutoff_cb()
    d = self._eu.read_encrypted(length)
    def _count(strings):
        size = sum(len(s) for s in strings)
        self._bytes_read += size
        self._offset += size
        return strings
    d.addCallback(_count)
    return d
self.log("helper says we need to upload")
# we need to upload the file
reu = RemoteEncryptedUploabable(self._encuploadable)
+ if "debug_stash_RemoteEncryptedUploadable" in self._options:
+ self._options["RemoteEncryptedUploabable"] = reu
+ if "debug_interrupt" in self._options:
+ reu._cutoff = self._options["debug_interrupt"]
+ def _cutoff():
+ # simulate the loss of the connection to the helper
+ self.log("debug_interrupt killing connection to helper",
+ level=log.WEIRD)
+ upload_helper.tracker.broker.transport.loseConnection()
+ return
+ reu._cutoff_cb = _cutoff
d = upload_helper.callRemote("upload", reu)
# this Deferred will fire with the upload results
return d