Source code for tests.common_tests

# -*- coding: utf-8 -*-
import warnings

warnings.filterwarnings("ignore", category=RuntimeWarning)
warnings.resetwarnings()


[docs]class Generic(object): """Generic class for unit testing :mod:`pydblite.pydblite` and :mod:`pydblite.sqlite`"""
[docs] def test_create_index(self): indices = self.filter_db.get_indices() self.assertEqual(indices, []) self.filter_db.create_index("name") indices = self.filter_db.get_indices() self.assertEqual(indices, ["name"]) self.filter_db.create_index("active") indices = self.filter_db.get_indices() self.assertTrue("active" in indices) self.assertTrue("name" in indices) self.assertEqual(len(indices), 2)
[docs] def test_delete_index(self): self.filter_db.create_index("name") self.filter_db.create_index("active") self.filter_db.delete_index("name") self.assertEqual(self.filter_db.get_indices(), ["active"]) self.filter_db.delete_index("active") self.assertEqual(self.filter_db.get_indices(), [])
[docs] def test_add_field(self): self.setup_db_for_filter() self.filter_db.add_field('age', column_type='INTEGER') # Check the value of age field for record that existed # before the new field was adde record = self.filter_db(unique_id=4)[0] self.assertEqual(record["age"], None) # Check value of age field for new record with age value status = {"unique_id": 110, 'age': 10} self.filter_db.insert(**status) record = self.filter_db(unique_id=110)[0] self.assertEqual(record["age"], 10) # Check value of age field for new record with no age value status = {"unique_id": 111} self.filter_db.insert(**status) record = self.filter_db(unique_id=111)[0] self.assertEqual(record["age"], None)
[docs] def test_add_field_with_default_value(self): self.setup_db_for_filter() self.filter_db.add_field('age', column_type='INTEGER', default=5) # Check the value of age field for record that existed # before the new field was adde record = self.filter_db(unique_id=4)[0] self.assertEqual(record["age"], 5) # Check value of age field for new record with age value status = {"unique_id": 110, 'age': 10} self.filter_db.insert(**status) record = self.filter_db(unique_id=110)[0] self.assertEqual(record["age"], 10) # Check value of age field for new record with no age value status = {"unique_id": 111} record_id = self.filter_db.insert(**status) record = self.filter_db[record_id] self.assertEqual(record["age"], 5)
[docs] def test_insert(self): status = {"unique_id": 1} res = self.filter_db.insert(**status) # First database record id should be 0 self.assertEqual(res, self.first_record_id) self.assertEqual(len(self.filter_db), 1) status = {"unique_id": 2} res = self.filter_db.insert(**status) # Second database record id should be 1 self.assertEqual(res, self.first_record_id + 1) self.assertEqual(len(self.filter_db), 2)
[docs] def test_insert_values_in_order(self): status = (1,) res = self.filter_db.insert(*status) # First database record id should be 0 self.assertEqual(res, self.first_record_id) self.assertEqual(len(self.filter_db), 1) status = (2, "name_value", False) res = self.filter_db.insert(*status) # Second database record id should be 1 self.assertEqual(res, self.first_record_id + 1) self.assertEqual(len(self.filter_db), 2) rec = self.filter_db[res] # Verify that the values inserted in order are correct self.assertEqual(rec["unique_id"], status[0]) self.assertEqual(rec["name"], status[1]) self.assertEqual(rec["active"], status[2])
[docs] def test_update(self): self.setup_db_for_filter() record = self.filter_db(unique_id=4)[0] self.filter_db.update(record, name=record['name'].upper()) self.assertEqual(self.filter_db[record["__id__"]]["name"], "TEST4")
[docs] def test_select(self): self.setup_db_for_filter() record = self.filter_db(unique_id=1)[0] self.assertTrue(record['active'] == 1 or record['active'] is True) self.assertEqual(record["unique_id"], 1) self.assertEqual(record["name"], 'Test0')
[docs] def test_select_unicode(self): status = {"unique_id": 1, "active": True, "name": u"éçùï"} self.filter_db.insert(**status) self.assertEqual(self.filter_db(name='foo'), []) self.assertEqual(self.filter_db(name=u'éçùï')[0]["unique_id"], 1)
[docs] def test_iter(self): self.setup_db_for_filter() self.assertEqual(len([x for x in self.filter_db]), len(self.filter_db)) records = self.filter_db() for r in records: self.assertEqual([x for x in self.filter_db if x['unique_id'] == r['unique_id']], self.filter_db(unique_id=r['unique_id'])) self.assertEqual([x for x in self.filter_db if x['name'] == r['name']], self.filter_db(name=r['name']))
[docs] def test_fetch(self): torrent_count = 10 for i in range(torrent_count): status = {"unique_id": i} self.filter_db.insert(**status) for i in range(torrent_count): records = self.filter_db(unique_id=i) self.assertEqual(records[0]["unique_id"], i)
[docs] def test_delete(self): status = {"unique_id": 1} self.filter_db.insert(**status) status = {"unique_id": 2} self.filter_db.insert(**status) records = self.filter_db(unique_id=1) deleted_count = self.filter_db.delete(records[0]) self.assertEqual(1, deleted_count) records = self.filter_db(unique_id=1) self.assertEqual(records, []) # Number of entries left should be 1 self.assertEqual(len(self.filter_db()), 1) records = self.filter_db(unique_id=2) deleted_count = self.filter_db.delete(records[0]) self.assertEqual(len(self.filter_db()), 0)
[docs] def test_del(self): status = {"unique_id": 1} rec = self.filter_db.insert(**status) del self.filter_db[rec] self.assertEqual(self.filter_db(name='non-existent'), []) self.assertEqual(len(self.filter_db), 0)
[docs] def reset_status_values_for_filter(self): self.status = [] self.status.append({"unique_id": 1, "active": True, "name": "Test0"}) self.status.append({"unique_id": 2, "active": True, "name": "Test0"}) self.status.append({"unique_id": 3, "active": True, "name": "test0"}) self.status.append({"unique_id": 4, "active": True, "name": "Test4"}) self.status.append({"unique_id": 5, "active": False, "name": "Test4"}) self.status.append({"unique_id": 6, "active": False, "name": "Test6"}) self.status.append({"unique_id": 7, "active": False, "name": "Test7"})
[docs] def setup_db_for_filter(self): self.reset_status_values_for_filter() res = self.filter_db.insert(self.status) self.assertTrue(res is None or res == 7) # Depends on the database driver...
[docs] def test_filter_len(self): self.setup_db_for_filter() f = self.filter_db.filter() self.assertEqual(len(f), len(self.status))
[docs] def test_filter_equals(self): self.setup_db_for_filter() self.assertEqual(len((self.filter_db("active") == True)), 4) # noqa
[docs] def test_filter_not_equals(self): self.setup_db_for_filter() self.assertEqual(len((self.filter_db("active") != True)), 3) # noqa
[docs] def test_filter_in(self): """Test IN ( == with a list""" self.setup_db_for_filter() self.assertEqual(len(self.filter_db("name") == ["Test4", "Test7"]), 3)
[docs] def test_filter_greater(self): self.setup_db_for_filter() self.assertEqual(len((self.filter_db("unique_id") > 4)), 3)
[docs] def test_filter_greater_equals(self): self.setup_db_for_filter() self.assertEqual(len((self.filter_db("unique_id") >= 4)), 4)
[docs] def test_filter_less(self): self.setup_db_for_filter() self.assertEqual(len((self.filter_db("unique_id") < 3)), 2)
[docs] def test_filter_less_equals(self): self.setup_db_for_filter() self.assertEqual(len((self.filter_db("unique_id") <= 3)), 3)
[docs] def test_filter_ilike(self): """Test text case sensitive""" self.setup_db_for_filter() self.assertEqual(len(self.filter_db("name").ilike("Test")), 6) self.assertEqual(len(self.filter_db("name").ilike("Test0")), 2)
[docs] def test_filter_like(self): """Test text case insensitive""" self.setup_db_for_filter() self.assertEqual(len(self.filter_db("name").like("Test")), 7) self.assertEqual(len(self.filter_db("name").like("Test0")), 3)
[docs] def test_filter_and(self): """Test AND""" self.setup_db_for_filter() f = (self.filter_db.filter() & (self.filter_db.filter(key="name") == "Test4")) self.assertEqual(str(f), "name = 'Test4'") f = ((self.filter_db.filter(key="name") == "Test4") & (self.filter_db.filter(key="active") == False)) # noqa self.assertEqual(str(f), "((active = 0) AND (name = 'Test4'))") self.assertEqual(len(f), 1)
[docs] def test_filter_or(self): """Test OR""" self.setup_db_for_filter() f = (self.filter_db.filter() | (self.filter_db.filter(key="name") == "Test4")) self.assertEqual(str(f), "name = 'Test4'") f = ((self.filter_db.filter(key="name") == "Test4") | (self.filter_db.filter(key="active") == False)) # noqa self.assertEqual(str(f), "((active = 0) OR (name = 'Test4'))") self.assertEqual(len(f), 4)
[docs] def test_len_with_filter(self): self.setup_db_for_filter() f = self.filter_db.filter() f |= (self.filter_db("active") == True) # noqa self.assertEquals(self.filter_db._len(f), 4)
[docs] def test_len_with_filter_non_matching(self): self.setup_db_for_filter() f = self.filter_db.filter() f |= (self.filter_db("unique_id") == -1) # Will not match any entries self.assertEqual(self.filter_db._len(f), 0)
[docs] def test_filter_get_unique_ids(self): self.setup_db_for_filter() ids = self.filter_db.get_unique_ids("name") self.assertEqual(ids, set(['Test0', 'Test6', 'Test4', 'Test7', 'test0']))
[docs] def test_filter_get_unique_ids_with_filter(self): self.setup_db_for_filter() f = self.filter_db.filter() f |= (self.filter_db("active") == True) # noqa ids = self.filter_db.get_unique_ids("name", f) self.assertEquals(ids, set(['Test0', 'Test4', 'test0']))
[docs] def test_filter_get_unique_ids_with_filter_non_matching(self): self.setup_db_for_filter() f = self.filter_db.filter() | (self.filter_db("unique_id") == -1) # Will not match any ids = self.filter_db.get_unique_ids("name", f) self.assertEqual(ids, set())
[docs] def make_group_count_result(self, key, test_key=None, test_val=None): counts = {} for e in self.status: if test_key and e[test_key] != test_val: continue counts[e[key]] = counts.get(e[key], 0) + 1 return counts
[docs] def test_get_group_count(self): self.setup_db_for_filter() counts = self.make_group_count_result("name") result = self.filter_db.get_group_count("name") for name, count in result: self.assertEqual(count, counts[name])
[docs] def test_get_group_count_with_filter(self): self.setup_db_for_filter() counts = self.make_group_count_result("name", test_key="active", test_val=True) f = self.filter_db.filter() | (self.filter_db("active") == True) # noqa result = self.filter_db.get_group_count("name", f) for name, count in result: self.assertEqual(count, counts[name])
[docs] def test_get_group_count_with_filter_non_matching(self): self.setup_db_for_filter() counts = self.make_group_count_result("name", test_key="unique_id", test_val=-1) f = self.filter_db.filter() | (self.filter_db("unique_id") == -1) result = self.filter_db.get_group_count("name", f) for name, count in result: self.assertEqual(count, counts[name])