Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
add unit test and integration test for large data
  • Loading branch information
Julia Zakharova committed Jan 16, 2023
commit 057a0c23ffe1211adca6e6fa5f7f97af79dbce28
50 changes: 34 additions & 16 deletions tests/integration/long/test_large_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
from cassandra.query import SimpleStatement
from tests.integration import use_singledc, PROTOCOL_VERSION, TestCluster
from tests.integration.long.utils import create_schema

import unittest

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -105,13 +104,13 @@ def test_wide_rows(self):
session = self.make_session_and_keyspace()
session.execute('CREATE TABLE %s (k INT, i INT, PRIMARY KEY(k, i))' % table)

prepared = session.prepare('INSERT INTO %s (k, i) VALUES (0, ?)' % (table, ))
prepared = session.prepare('INSERT INTO %s (k, i) VALUES (0, ?)' % (table,))

# Write via async futures
self.batch_futures(session, (prepared.bind((i, )) for i in range(100000)))
self.batch_futures(session, (prepared.bind((i,)) for i in range(100000)))

# Read
results = session.execute('SELECT i FROM %s WHERE k=0' % (table, ))
results = session.execute('SELECT i FROM %s WHERE k=0' % (table,))

# Verify
for i, row in enumerate(results):
Expand Down Expand Up @@ -146,11 +145,11 @@ def test_wide_batch_rows(self):

# Execute insert with larger timeout, since it's a wide row
try:
session.execute(statement,timeout=30.0)
session.execute(statement, timeout=30.0)

except OperationTimedOut:
#If we timeout on insertion that's bad but it could be just slow underlying c*
#Attempt to validate anyway, we will fail if we don't get the right data back.
# If we timeout on insertion that's bad but it could be just slow underlying c*
# Attempt to validate anyway, we will fail if we don't get the right data back.
ex_type, ex, tb = sys.exc_info()
log.warning("Batch wide row insertion timed out, this may require additional investigation")
log.warning("{0}: {1} Backtrace: {2}".format(ex_type.__name__, ex, traceback.extract_tb(tb)))
Expand All @@ -160,12 +159,13 @@ def test_wide_batch_rows(self):
results = session.execute('SELECT i FROM %s WHERE k=%s' % (table, 0))
lastvalue = 0
for j, row in enumerate(results):
lastValue=row['i']
lastValue = row['i']
self.assertEqual(lastValue, j)

#check the last value make sure it's what we expect
index_value = to_insert-1
self.assertEqual(lastValue,index_value,"Verification failed only found {0} inserted we were expecting {1}".format(j,index_value))
# check the last value make sure it's what we expect
index_value = to_insert - 1
self.assertEqual(lastValue, index_value,
"Verification failed only found {0} inserted we were expecting {1}".format(j, index_value))

session.cluster.shutdown()

Expand All @@ -188,21 +188,23 @@ def test_wide_byte_rows(self):

# Prepare statement and run insertions
to_insert = 100000
prepared = session.prepare('INSERT INTO %s (k, i, v) VALUES (0, ?, 0xCAFE)' % (table, ))
timeouts = self.batch_futures(session, (prepared.bind((i, )) for i in range(to_insert)))
prepared = session.prepare('INSERT INTO %s (k, i, v) VALUES (0, ?, 0xCAFE)' % (table,))
timeouts = self.batch_futures(session, (prepared.bind((i,)) for i in range(to_insert)))

# Read
results = session.execute('SELECT i, v FROM %s WHERE k=0' % (table, ))
results = session.execute('SELECT i, v FROM %s WHERE k=0' % (table,))

# number of expected results
expected_results = to_insert-timeouts-1
expected_results = to_insert - timeouts - 1

# Verify
bb = pack('>H', 0xCAFE)
for i, row in enumerate(results):
self.assertEqual(row['v'], bb)

self.assertGreaterEqual(i, expected_results, "Verification failed only found {0} inserted we were expecting {1}".format(i,expected_results))
self.assertGreaterEqual(i, expected_results,
"Verification failed only found {0} inserted we were expecting {1}".format(i,
expected_results))

session.cluster.shutdown()

Expand Down Expand Up @@ -269,3 +271,19 @@ def test_wide_table(self):
self.assertEqual(row[create_column_name(i)], i)

session.cluster.shutdown()

def test_error(self):
"""
Test for inserting large number of rows

@test_category queries
"""
table = 'large_table'
session = self.make_session_and_keyspace()
session.execute('CREATE TABLE %s (k int PRIMARY KEY, date_u timestamp)' % table)
date_last = '2021-01-28 11:24:00'
date_next = '2020-01-01 11:24:00'
for i in range(100):
session.execute("INSERT INTO %s (k, date_u) VALUES (%s, '%s')" % (table, i, date_last),timeout=10000)
results = session.execute("SELECT k FROM %s WHERE date_u <'%s' allow filtering" % (table, date_next), timeout=10000)
session.cluster.shutdown()
38 changes: 29 additions & 9 deletions tests/unit/test_resultset.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@ def test_iter_non_paged(self):
def test_iter_paged(self):
expected = list(range(10))
response_future = Mock(has_more_pages=True, _continuous_paging_session=None)
response_future.result.side_effect = (ResultSet(Mock(), expected[-5:]), ) # ResultSet is iterable, so it must be protected in order to be returned whole by the Mock
response_future.result.side_effect = (ResultSet(Mock(), expected[
-5:]),) # ResultSet is iterable, so it must be protected in order to be returned whole by the Mock
rs = ResultSet(response_future, expected[:5])
itr = iter(rs)
# this is brittle, depends on internal impl details. Would like to find a better way
type(response_future).has_more_pages = PropertyMock(side_effect=(True, True, False)) # after init to avoid side effects being consumed by init
type(response_future).has_more_pages = PropertyMock(
side_effect=(True, True, False)) # after init to avoid side effects being consumed by init
self.assertListEqual(list(itr), expected)

def test_iter_paged_with_empty_pages(self):
Expand Down Expand Up @@ -63,18 +65,21 @@ def test_list_paged(self):
# list access on RS for backwards-compatibility
expected = list(range(10))
response_future = Mock(has_more_pages=True, _continuous_paging_session=None)
response_future.result.side_effect = (ResultSet(Mock(), expected[-5:]), ) # ResultSet is iterable, so it must be protected in order to be returned whole by the Mock
response_future.result.side_effect = (ResultSet(Mock(), expected[
-5:]),) # ResultSet is iterable, so it must be protected in order to be returned whole by the Mock
rs = ResultSet(response_future, expected[:5])
# this is brittle, depends on internal impl details. Would like to find a better way
type(response_future).has_more_pages = PropertyMock(side_effect=(True, True, True, False)) # First two True are consumed on check entering list mode
type(response_future).has_more_pages = PropertyMock(
side_effect=(True, True, True, False)) # First two True are consumed on check entering list mode
self.assertEqual(rs[9], expected[9])
self.assertEqual(list(rs), expected)

def test_has_more_pages(self):
response_future = Mock()
response_future.has_more_pages.side_effect = PropertyMock(side_effect=(True, False))
rs = ResultSet(response_future, [])
type(response_future).has_more_pages = PropertyMock(side_effect=(True, False)) # after init to avoid side effects being consumed by init
type(response_future).has_more_pages = PropertyMock(
side_effect=(True, False)) # after init to avoid side effects being consumed by init
self.assertTrue(rs.has_more_pages)
self.assertFalse(rs.has_more_pages)

Expand All @@ -96,7 +101,8 @@ def test_iterate_then_index(self):

# RuntimeError if indexing during or after pages
response_future = Mock(has_more_pages=True, _continuous_paging_session=None)
response_future.result.side_effect = (ResultSet(Mock(), expected[-5:]), ) # ResultSet is iterable, so it must be protected in order to be returned whole by the Mock
response_future.result.side_effect = (ResultSet(Mock(), expected[
-5:]),) # ResultSet is iterable, so it must be protected in order to be returned whole by the Mock
rs = ResultSet(response_future, expected[:5])
type(response_future).has_more_pages = PropertyMock(side_effect=(True, False))
itr = iter(rs)
Expand Down Expand Up @@ -129,10 +135,12 @@ def test_index_list_mode(self):

# pages
response_future = Mock(has_more_pages=True, _continuous_paging_session=None)
response_future.result.side_effect = (ResultSet(Mock(), expected[-5:]), ) # ResultSet is iterable, so it must be protected in order to be returned whole by the Mock
response_future.result.side_effect = (ResultSet(Mock(), expected[
-5:]),) # ResultSet is iterable, so it must be protected in order to be returned whole by the Mock
rs = ResultSet(response_future, expected[:5])
# this is brittle, depends on internal impl details. Would like to find a better way
type(response_future).has_more_pages = PropertyMock(side_effect=(True, True, True, False)) # First two True are consumed on check entering list mode
type(response_future).has_more_pages = PropertyMock(
side_effect=(True, True, True, False)) # First two True are consumed on check entering list mode
# index access before iteration causes list to be materialized
self.assertEqual(rs[0], expected[0])
self.assertEqual(rs[9], expected[9])
Expand All @@ -157,7 +165,8 @@ def test_eq(self):

# pages
response_future = Mock(has_more_pages=True, _continuous_paging_session=None)
response_future.result.side_effect = (ResultSet(Mock(), expected[-5:]), ) # ResultSet is iterable, so it must be protected in order to be returned whole by the Mock
response_future.result.side_effect = (ResultSet(Mock(), expected[
-5:]),) # ResultSet is iterable, so it must be protected in order to be returned whole by the Mock
rs = ResultSet(response_future, expected[:5])
type(response_future).has_more_pages = PropertyMock(side_effect=(True, True, True, False))
# eq before iteration causes list to be materialized
Expand Down Expand Up @@ -224,3 +233,14 @@ def test_indexing_deprecation(self, mocked_warn):
self.assertIn('indexing support will be removed in 4.0',
str(index_warning_args[0]))
self.assertIs(index_warning_args[1], DeprecationWarning)

def test_iter_paged_with_more_empty_pages(self):
"""A test that check the result, with a large number of empty pages"""
expected = [[] for i in range(0, 2000)]
response_future = Mock(has_more_pages=True, _continuous_paging_session=None)
response_future.result.side_effect = [
ResultSet(Mock(), []) for i in range(0, 2000)
]
rs = ResultSet(response_future, [])
itr = iter(rs)
self.assertListEqual(list(itr), expected)