+class Server(unittest.TestCase):
+ def test_duplicate(self):
+ i = IntroducerService()
+ ic1 = IntroducerClient(None,
+ "introducer.furl", u"my_nickname",
+ "ver23", "oldest_version", {})
+ furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnp"
+
+ privkey_s, _ = keyutil.make_keypair()
+ privkey, _ = keyutil.parse_privkey(privkey_s)
+
+ ann1 = make_ann_t(ic1, furl1, privkey, seqnum=10)
+ ann1_old = make_ann_t(ic1, furl1, privkey, seqnum=9)
+ ann1_new = make_ann_t(ic1, furl1, privkey, seqnum=11)
+ ann1_noseqnum = make_ann_t(ic1, furl1, privkey, seqnum=None)
+
+ i.remote_publish_v2(ann1, None)
+ all = i.get_announcements()
+ self.failUnlessEqual(len(all), 1)
+ self.failUnlessEqual(all[0].announcement["seqnum"], 10)
+ self.failUnlessEqual(i._debug_counts["inbound_message"], 1)
+ self.failUnlessEqual(i._debug_counts["inbound_duplicate"], 0)
+ self.failUnlessEqual(i._debug_counts["inbound_no_seqnum"], 0)
+ self.failUnlessEqual(i._debug_counts["inbound_old_replay"], 0)
+ self.failUnlessEqual(i._debug_counts["inbound_update"], 0)
+
+ i.remote_publish_v2(ann1, None)
+ all = i.get_announcements()
+ self.failUnlessEqual(len(all), 1)
+ self.failUnlessEqual(all[0].announcement["seqnum"], 10)
+ self.failUnlessEqual(i._debug_counts["inbound_message"], 2)
+ self.failUnlessEqual(i._debug_counts["inbound_duplicate"], 1)
+ self.failUnlessEqual(i._debug_counts["inbound_no_seqnum"], 0)
+ self.failUnlessEqual(i._debug_counts["inbound_old_replay"], 0)
+ self.failUnlessEqual(i._debug_counts["inbound_update"], 0)
+
+ i.remote_publish_v2(ann1_old, None)
+ all = i.get_announcements()
+ self.failUnlessEqual(len(all), 1)
+ self.failUnlessEqual(all[0].announcement["seqnum"], 10)
+ self.failUnlessEqual(i._debug_counts["inbound_message"], 3)
+ self.failUnlessEqual(i._debug_counts["inbound_duplicate"], 1)
+ self.failUnlessEqual(i._debug_counts["inbound_no_seqnum"], 0)
+ self.failUnlessEqual(i._debug_counts["inbound_old_replay"], 1)
+ self.failUnlessEqual(i._debug_counts["inbound_update"], 0)
+
+ i.remote_publish_v2(ann1_new, None)
+ all = i.get_announcements()
+ self.failUnlessEqual(len(all), 1)
+ self.failUnlessEqual(all[0].announcement["seqnum"], 11)
+ self.failUnlessEqual(i._debug_counts["inbound_message"], 4)
+ self.failUnlessEqual(i._debug_counts["inbound_duplicate"], 1)
+ self.failUnlessEqual(i._debug_counts["inbound_no_seqnum"], 0)
+ self.failUnlessEqual(i._debug_counts["inbound_old_replay"], 1)
+ self.failUnlessEqual(i._debug_counts["inbound_update"], 1)
+
+ i.remote_publish_v2(ann1_noseqnum, None)
+ all = i.get_announcements()
+ self.failUnlessEqual(len(all), 1)
+ self.failUnlessEqual(all[0].announcement["seqnum"], 11)
+ self.failUnlessEqual(i._debug_counts["inbound_message"], 5)
+ self.failUnlessEqual(i._debug_counts["inbound_duplicate"], 1)
+ self.failUnlessEqual(i._debug_counts["inbound_no_seqnum"], 1)
+ self.failUnlessEqual(i._debug_counts["inbound_old_replay"], 1)
+ self.failUnlessEqual(i._debug_counts["inbound_update"], 1)
+
+