From: Brian Warner Date: Fri, 1 Dec 2006 11:06:11 +0000 (-0700) Subject: upload: add WriterProxy X-Git-Tag: tahoe_v0.1.0-0-UNSTABLE~502 X-Git-Url: https://git.rkrishnan.org/...?a=commitdiff_plain;h=f87008cf364a3a25c2ba97dcaaa8b9327a8e737d;p=tahoe-lafs%2Ftahoe-lafs.git upload: add WriterProxy --- diff --git a/allmydata/test/test_upload.py b/allmydata/test/test_upload.py index e0f522af..efcbd74e 100644 --- a/allmydata/test/test_upload.py +++ b/allmydata/test/test_upload.py @@ -52,6 +52,13 @@ class NextPeer(unittest.TestCase): "good", # 4 ] + def compare_landlords(self, u, c, expected): + exp = [(peerid, bucketnum, c.peers[peerid]) + for peerid, bucketnum in expected] + landlords = [(peerid, bucketnum, proxy.remote_bucket) + for peerid, bucketnum, proxy in u.landlords] + self.failUnlessEqual(landlords, exp) + def test_0(self): c = FakeClient([]) u = NextPeerUploader(c) @@ -74,10 +81,9 @@ class NextPeer(unittest.TestCase): d = u.start() def _check(res): self.failUnlessEqual(u.goodness_points, 2) - self.failUnlessEqual(u.landlords, - [(0, 0, c.peers[0]), - (4, 1, c.peers[4]), - ]) + self.compare_landlords(u, c, [(0, 0), + (4, 1), + ]) d.addCallback(_check) return d @@ -90,11 +96,10 @@ class NextPeer(unittest.TestCase): d = u.start() def _check(res): self.failUnlessEqual(u.goodness_points, 3) - self.failUnlessEqual(u.landlords, - [(0, 0, c.peers[0]), - (4, 1, c.peers[4]), - (0, 2, c.peers[0]), - ]) + self.compare_landlords(u, c, [(0, 0), + (4, 1), + (0, 2), + ]) d.addCallback(_check) return d @@ -114,11 +119,10 @@ class NextPeer(unittest.TestCase): d = u.start() def _check(res): self.failUnlessEqual(u.goodness_points, 3) - self.failUnlessEqual(u.landlords, - [(0, 0, c.peers[0]), - (3, 1, c.peers[3]), - (0, 2, c.peers[0]), - ]) + self.compare_landlords(u, c, [(0, 0), + (3, 1), + (0, 2), + ]) d.addCallback(_check) return d diff --git a/allmydata/upload.py b/allmydata/upload.py index 6b69ee03..f9f91567 100644 --- a/allmydata/upload.py +++ b/allmydata/upload.py @@ -13,6 +13,30 @@ class HaveAllPeersError(Exception): class TooFullError(Exception): pass +class WriterProxy: + # make this look like a writable file + def __init__(self, remote_bucket): + self.remote_bucket = remote_bucket + self.good = True + + def _broken(self, why): + self.good = False + # do something else here + return why + + def write(self, data): + d = self.remote_bucket.callRemote("write", data=data) + d.addErrback(self._broken) + + def seek(self, offset, whence=0): + d = self.remote_bucket.callRemote("seek", offset=offset, whence=whence) + d.addErrback(self._broken) + + def close(self): + if self._broken: + raise SomethingBrokeError() + d = self.remote_bucket.callRemote("close") + d.addErrback(self._broken) class Uploader: debug = False @@ -20,6 +44,9 @@ class Uploader: def __init__(self, peer): self._peer = peer + def set_encoder(self, encoder): + self._encoder = encoder + def set_verifierid(self, vid): assert isinstance(vid, str) self._verifierid = vid @@ -33,7 +60,7 @@ class Uploader: def start(self): - # who should we upload to? + # first step: who should we upload to? # maybe limit max_peers to 2*len(self.shares), to reduce memory # footprint @@ -71,7 +98,8 @@ class Uploader: def _allocate_response(bucket): if self.debug: print " peerid %s will grant us a lease" % peerid - self.landlords.append( (peerid, bucket_num, bucket) ) + writer = WriterProxy(bucket) + self.landlords.append( (peerid, bucket_num, writer) ) self.goodness_points += 1 if self.goodness_points >= self.target_goodness: if self.debug: print " we're done!"