self.dispatched_steps.append((steptype, lines))
return defer.succeed(None)
+class Reuse(unittest.TestCase):
+ def wq(self, testname):
+ return FakeWorkQueue("test_workqueue/Reuse/%s/workqueue" % testname)
+
+ def testOne(self):
+ wq = self.wq("testOne")
+ # steps must be retained from one session to the next
+ wq.add_upload_chk("source_filename", "box1")
+ wq.add_unlink_uri("someuri")
+ # files in the tmpdir are not: these are either in the process of
+ # being added or in the process of being removed.
+ tmpfile = os.path.join(wq.tmpdir, "foo")
+ f = open(tmpfile, "w")
+ f.write("foo")
+ f.close()
+ # files created with create_tempfile *are* retained, however
+ f, filename = wq.create_tempfile()
+ filename = os.path.join(wq.filesdir, filename)
+ f.write("bar")
+ f.close()
+
+ del wq
+ wq2 = self.wq("testOne")
+ steps = wq2.get_all_steps()
+ self.failUnlessEqual(steps[0], ("upload_chk",
+ ["source_filename", "box1"]))
+ self.failUnlessEqual(steps[1], ("unlink_uri", ["someuri"]))
+ self.failIf(os.path.exists(tmpfile))
+ self.failUnless(os.path.exists(filename))
+
+
class Items(unittest.TestCase):
def wq(self, testname):
return FakeWorkQueue("test_workqueue/Items/%s/workqueue" % testname)
f2.close()
self.failUnlessEqual(data, data2)
+ def testBox(self):
+ wq = self.wq("testBox")
+ boxname = wq.create_boxname()
+ wq.write_to_box(boxname, "contents of box")
+ self.failUnlessEqual(wq.read_from_box(boxname), "contents of box")
+
def testCHK(self):
wq = self.wq("testCHK")
wq.add_upload_chk("source_filename", "box1")