refactor(tests): Refactor pytest (#449)

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
This commit is contained in:
Vladislav 2022-10-31 17:39:20 +03:00 committed by GitHub
parent fabad45d42
commit 072cb2e8d9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 229 additions and 156 deletions

View File

@ -4,7 +4,7 @@
## Pytest
The tests assume you have the "dragonfly" binary in `<root>/build-dbg` directory.
You can override the location of the binary using `DRAGONFLY_HOME` environment var.
You can override the location of the binary using `DRAGONFLY_PATH` environment var.
### Before you start
Please make sure that you have python 3 installed on your local host.
If you have both python 2 and python 3 installed on your host, you can run the tests with the following command:
@ -39,9 +39,8 @@ Pytest allows for parameters with a specific name to be automatically resolved t
| ----- | ---- | ----- | ----------- |
| tmp_dir | [pathlib.Path](https://docs.python.org/3/library/pathlib.html) | Session | The temporary directory the Dragonfly binary will be running in. The environment variable `DRAGONFLY_TMP` is also set to this value |
| test_env | `dict` | Session | The environment variables used when running Dragonfly as a dictionary |
| client | [redis.Redis](https://redis-py.readthedocs.io/en/stable/connections.html#generic-client) | Class | The redis client to interact with the Dragonfly instance |
To avoid the overhead of spawning a Dragonfly process for every test the `client` provided fixture has a `Class` scope which means that all test functions in the same class will interact with the same Dragonfly instance.
| client | [redis.Redis](https://redis-py.readthedocs.io/en/stable/connections.html#generic-client) | Class | The redis client to interact with the Dragonfly instance |
| async_client | [aioredis.Redis](https://aioredis.readthedocs.io/en/latest/api/high-level/#aioredis.client.Redis) | Class | The async redis client to interact with the Dragonfly instance |
### Passing CLI commands to Dragonfly
To pass custom flags to the Dragonfly executable two class decorators have been created. `@dfly_args` allows you to pass a list of parameters to the Dragonfly executable, similarly `@dfly_multi_test_args` allows you to specify multiple parameter configurations to test with a given test class.
@ -51,10 +50,9 @@ In the case of `@dfly_multi_test_args` each parameter configuration will create
Parameters can use environmental variables with a formatted string where `"{<VAR>}"` will be replaced with the value of the `<VAR>` environment variable. Due to [current pytest limitations](https://github.com/pytest-dev/pytest/issues/349) fixtures cannot be passed to either of these decorators, this is currently the provided way to pass the temporary directory path in a CLI parameter.
### Test Examples
- **[blpop_test](./dragonfly/blpop_test.py)**: Simple test case interacting with Dragonfly
- **[snapshot_test](./dragonfly/snapshot_test.py)**: Example test using `@dfly_args`, environment variables and pre-test setup
- **[key_limit_test](./dragonfly/key_limit_test.py)**: Example test using `@dfly_multi_test_args`
- **[connection_test](./dragonfly/connection_test.py)**: Example for testing running with asynchronous multiple connections.
- **[generic_test](./dragonfly/generic_test.py)**: Example test using `@dfly_multi_test_args`
- **[connection_test](./dragonfly/connection_test.py)**: Example for testing running with multiple asynchronous connections.
### Writing your own fixtures
The fixture functions are located in [conftest.py](./dragonfly/conftest.py).
@ -70,6 +68,8 @@ pip3 freeze > requirements.txt
from [dragonfly](./dragonfly/) directory.
# Integration tests
Integration tests are located in the `integration` folder.
To simplify running integration tests, each package should have its own Dockerfile. The Dockerfile should contain everything needed in order to test the package against Dragonfly. Docker can assume Dragonfly is running on localhost:6379.
To run the test:
```

View File

@ -1,11 +1,82 @@
import pytest
import time
import subprocess
class DflyInstance:
    """
    Represents a runnable and stoppable Dragonfly instance
    with fixed arguments.
    """

    def __init__(self, path, args, cwd):
        # path: path to the dragonfly binary.
        # args: dict of CLI flag name -> value ("" marks a value-less flag).
        # cwd: working directory the process is launched from.
        self.path = path
        self.args = args
        self.cwd = cwd
        # Populated by start(); None until the process has been launched.
        self.proc = None

    def start(self):
        """Launch the Dragonfly process; raise RuntimeError if it exits immediately."""
        arglist = DflyInstance.format_args(self.args)

        print(f"Starting instance on {self.port} with arguments {arglist}")
        self.proc = subprocess.Popen([self.path, *arglist], cwd=self.cwd)

        # Give Dragonfly time to start and detect possible failure causes.
        time.sleep(0.3)

        return_code = self.proc.poll()
        if return_code is not None:
            raise RuntimeError(
                f"Failed to start instance, return code {return_code}")

    def stop(self):
        """Terminate the process gracefully, killing it on timeout."""
        if self.proc is None:
            # start() was never called (or failed before Popen); nothing to stop.
            return
        print(f"Stopping instance on {self.port}")
        try:
            self.proc.terminate()
            outs, errs = self.proc.communicate(timeout=15)
        except subprocess.TimeoutExpired:
            print("Unable to terminate DragonflyDB gracefully, it was killed")
            # Actually kill the process before waiting again, otherwise the
            # second communicate() can block forever on a hung process.
            self.proc.kill()
            outs, errs = self.proc.communicate()
        print(outs, errs)

    def __getitem__(self, k):
        # Dict-style access to the launch arguments; returns None when absent.
        return self.args.get(k)

    @property
    def port(self) -> int:
        # Default Dragonfly/Redis port when no explicit --port flag was given.
        return int(self.args.get("port", "6379"))

    @staticmethod
    def format_args(args):
        """Convert an args dict to a flat CLI list, skipping empty values."""
        out = []
        for (k, v) in args.items():
            out.extend((str(s) for s in ("--" + k, v) if s != ""))
        return out
class DflyInstanceFactory:
    """
    A factory for creating dragonfly instances with pre-supplied arguments.
    """

    def __init__(self, env, cwd, path, args):
        self.env = env
        self.cwd = cwd
        self.path = path
        self.args = args

    def create(self, **kwargs) -> DflyInstance:
        """Build a DflyInstance; keyword arguments override the factory defaults."""
        merged = {**self.args, **kwargs}
        # Substitute "{VAR}" placeholders in string values using the env map.
        resolved = {
            key: (val.format(**self.env) if isinstance(val, str) else val)
            for key, val in merged.items()
        }
        return DflyInstance(self.path, resolved, self.cwd)
def dfly_args(*args):
""" used to define a singular set of arguments for dragonfly test """
return pytest.mark.parametrize("df_server", [args], indirect=True)
""" Used to define a singular set of arguments for dragonfly test """
return pytest.mark.parametrize("df_factory", args, indirect=True)
def dfly_multi_test_args(*args):
""" used to define multiple sets of arguments to test multiple dragonfly configurations """
return pytest.mark.parametrize("df_server", args, indirect=True)
""" Used to define multiple sets of arguments to test multiple dragonfly configurations """
return pytest.mark.parametrize("df_factory", args, indirect=True)

View File

@ -2,25 +2,20 @@
Pytest fixtures to be provided for all tests without import
"""
import os
import sys
import pytest
import pytest_asyncio
import redis
import aioredis
from pathlib import Path
from tempfile import TemporaryDirectory
import os
import subprocess
import time
import pytest
import redis
import aioredis
import asyncio
from . import DflyInstance, DflyInstanceFactory
DATABASE_INDEX = 1
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
DRAGONFLY_PATH = os.environ.get("DRAGONFLY_HOME", os.path.join(
SCRIPT_DIR, '../../build-dbg/dragonfly'))
@pytest.fixture(scope="session")
def tmp_dir():
@ -45,62 +40,75 @@ def test_env(tmp_dir: Path):
return env
@pytest.fixture(scope="class", params=[[]])
def df_server(request, tmp_dir: Path, test_env):
""" Starts a single DragonflyDB process, runs once per test class. """
print(f"Starting DragonflyDB [{DRAGONFLY_PATH}]")
arguments = [arg.format(**test_env) for arg in request.param]
dfly_proc = subprocess.Popen([DRAGONFLY_PATH, *arguments],
env=test_env, cwd=str(tmp_dir))
time.sleep(0.3)
return_code = dfly_proc.poll()
if return_code is not None:
dfly_proc.terminate()
pytest.exit(f"Failed to start DragonflyDB [{DRAGONFLY_PATH}]")
@pytest.fixture(scope="session", params=[{}])
def df_factory(request, tmp_dir, test_env) -> DflyInstanceFactory:
"""
Create an instance factory with supplied params.
"""
scripts_dir = os.path.dirname(os.path.abspath(__file__))
path = os.environ.get("DRAGONFLY_PATH", os.path.join(
scripts_dir, '../../build-dbg/dragonfly'))
yield
args = request.param if request.param else {}
return DflyInstanceFactory(test_env, tmp_dir, path=path, args=args)
print(f"Terminating DragonflyDB process [{dfly_proc.pid}]")
@pytest.fixture(scope="session")
def df_server(df_factory: DflyInstanceFactory) -> DflyInstance:
"""
Start the default Dragonfly server that will be used for the default pools
and clients.
"""
instance = df_factory.create()
instance.start()
yield instance
clients_left = None
try:
dfly_proc.terminate()
outs, errs = dfly_proc.communicate(timeout=15)
except subprocess.TimeoutExpired:
print("Unable to terminate DragonflyDB gracefully, it was killed")
outs, errs = dfly_proc.communicate()
print(outs)
print(errs)
client = redis.Redis(port=instance.port)
clients_left = client.execute_command("INFO")['connected_clients']
except Exception as e:
print(e, file=sys.stderr)
@pytest.fixture(scope="function")
def connection(df_server):
return redis.Connection()
instance.stop()
assert clients_left == 1
@pytest.fixture(scope="class")
def raw_client(df_server):
""" Creates the Redis client to interact with the Dragonfly instance """
pool = redis.ConnectionPool(decode_responses=True)
client = redis.Redis(connection_pool=pool)
def connection(df_server: DflyInstance):
return redis.Connection(port=df_server.port)
@pytest.fixture(scope="class")
def sync_pool(df_server: DflyInstance):
pool = redis.ConnectionPool(decode_responses=True, port=df_server.port)
yield pool
pool.disconnect()
@pytest.fixture(scope="class")
def client(sync_pool):
"""
Return a client to the default instance with all entries flushed.
"""
client = redis.Redis(connection_pool=sync_pool)
client.flushall()
return client
@pytest.fixture
def client(raw_client):
""" Flushes all the records, runs before each test. """
raw_client.flushall()
return raw_client
@pytest.fixture(scope="function")
def async_pool(df_server):
pool = aioredis.ConnectionPool(host="localhost", port=6379,
@pytest_asyncio.fixture(scope="function")
async def async_pool(df_server: DflyInstance):
pool = aioredis.ConnectionPool(host="localhost", port=df_server.port,
db=DATABASE_INDEX, decode_responses=True, max_connections=16)
return pool
yield pool
await pool.disconnect()
@pytest.fixture(scope="function")
def event_loop():
policy = asyncio.get_event_loop_policy()
loop = policy.new_event_loop()
yield loop
loop.close()
@pytest_asyncio.fixture(scope="function")
async def async_client(async_pool):
"""
Return an async client to the default instance with all entries flushed.
"""
client = aioredis.Redis(connection_pool=async_pool)
await client.flushall()
return client

View File

@ -1,7 +1,24 @@
import pytest
import asyncio
import aioredis
import async_timeout
#from conftest import DATABASE_INDEX
'''
Test the monitor command.
Open connection which is used for monitoring
Then send on other connection commands to dragonfly instance
Make sure that we are getting the commands in the monitor context
'''
@pytest.mark.asyncio
async def test_monitor_command(async_pool):
    """Run MONITOR on one connection while issuing SET commands on another."""
    expected = {f"key{idx}": f"value={idx}" for idx in range(5)}
    assert await run_monitor(expected, async_pool)
def verify_response(monitor_response: dict, key: str, value: str) -> bool:
@ -54,48 +71,6 @@ async def run_monitor(messages: dict, pool: aioredis.ConnectionPool):
return False, f"monitor result: {status}: {message}, set command success {success}"
async def run_monitor_command(connection, messages):
res = await run_monitor(messages, connection)
print(f"finish test monitoring returning: {res}")
return res
'''
Test the monitor command.
Open connection which is used for monitoring
Then send on other connection commands to dragonfly instance
Make sure that we are getting the commands in the monitor context
'''
def test_monitor_command(async_pool, event_loop):
def generate(max):
for i in range(max):
yield f"key{i}", f"value={i}"
messages = {a: b for a, b in generate(5)}
success, message = event_loop.run_until_complete(
run_monitor_command(messages=messages, connection=async_pool))
assert success == True, message
async def run_pipeline_mode(pool, messages):
conn = aioredis.Redis(connection_pool=pool)
pipe = conn.pipeline()
for key, val in messages.items():
pipe.set(key, val)
result = await pipe.execute()
print(f"got result from the pipeline of {result} with len = {len(result)}")
if len(result) != len(messages):
return False, f"number of results from pipe {len(result)} != expected {len(messages)}"
elif False in result:
return False, "expecting to successfully get all result good, but some failed"
else:
return True, "all command processed successfully"
'''
Run test in pipeline mode.
This is mostly how this is done with python - its more like a transaction that
@ -103,20 +78,17 @@ the connections is running all commands in its context
'''
def test_pipeline_support(async_pool, event_loop):
@pytest.mark.asyncio
async def test_pipeline_support(async_client):
def generate(max):
for i in range(max):
yield f"key{i}", f"value={i}"
messages = {a: b for a, b in generate(5)}
success, message = event_loop.run_until_complete(
run_pipeline_mode(async_pool, messages))
assert success, message
assert await run_pipeline_mode(async_client, messages)
async def reader(channel: aioredis.client.PubSub, messages):
success = True
message_count = len(messages)
while message_count > 0:
try:
@ -132,28 +104,22 @@ async def reader(channel: aioredis.client.PubSub, messages):
return True, "success"
async def run_pubsub(pool, messages, channel_name):
conn = aioredis.Redis(connection_pool=pool)
pubsub = conn.pubsub()
await pubsub.subscribe(channel_name)
async def run_pipeline_mode(async_client, messages):
pipe = async_client.pipeline()
for key, val in messages.items():
pipe.set(key, val)
result = await pipe.execute()
future = asyncio.create_task(reader(pubsub, messages))
success = True
for message in messages:
res = await conn.publish(channel_name, message)
if not res:
success = False
break
await future
status, message = future.result()
if status and success:
return True, "successfully completed all"
print(f"got result from the pipeline of {result} with len = {len(result)}")
if len(result) != len(messages):
return False, f"number of results from pipe {len(result)} != expected {len(messages)}"
elif False in result:
return False, "expecting to successfully get all result good, but some failed"
else:
return False, f"subscriber result: {status}: {message}, publisher publish: success {success}"
return True, "all command processed successfully"
'''
'''
Test the pipeline command
Open connection to the subscriber and publish on the other end messages
Make sure that we are able to send all of them and that we are getting the
@ -161,12 +127,34 @@ expected results on the subscriber side
'''
def test_pubsub_command(async_pool, event_loop):
@pytest.mark.asyncio
async def test_pubsub_command(async_client):
def generate(max):
for i in range(max):
yield f"message number {i}"
messages = [a for a in generate(5)]
success, message = event_loop.run_until_complete(
run_pubsub(async_pool, messages, "channel-1"))
assert success, message
assert await run_pubsub(async_client, messages, "channel-1")
async def run_pubsub(async_client, messages, channel_name):
    """Publish every message on channel_name and verify the subscriber saw them all."""
    pubsub = async_client.pubsub()
    await pubsub.subscribe(channel_name)

    # Consume messages concurrently while we publish below.
    reader_task = asyncio.create_task(reader(pubsub, messages))

    published_ok = True
    for msg in messages:
        if not await async_client.publish(channel_name, msg):
            published_ok = False
            break

    await reader_task
    status, detail = reader_task.result()
    await pubsub.close()

    if status and published_ok:
        return True, "successfully completed all"
    return False, f"subscriber result: {status}: {detail}, publisher publish: success {published_ok}"

View File

@ -0,0 +1,12 @@
from dragonfly import dfly_multi_test_args
@dfly_multi_test_args({'keys_output_limit': 512}, {'keys_output_limit': 1024})
class TestKeys:
    """Verify that KEYS output respects the keys_output_limit flag."""

    def test_max_keys(self, client, df_server):
        limit = df_server['keys_output_limit']
        # Insert three times more keys than the limit allows to be returned.
        for i in range(limit * 3):
            client.set(str(i), str(i))
        returned = client.keys()
        assert len(returned) in range(limit, limit + 512)

View File

@ -1,10 +0,0 @@
from dragonfly import dfly_multi_test_args
@dfly_multi_test_args(["--keys_output_limit", "512"], ["--keys_output_limit", "1024"])
class TestKeys:
def test_max_keys(self, client):
for x in range(8192):
client.set(str(x), str(x))
keys = client.keys()
assert len(keys) in [513, 1025]

View File

@ -11,4 +11,5 @@ redis==4.3.4
tomli==2.0.1
wrapt==1.14.1
aioredis==2.0.1
pytest-asyncio==0.20.1

View File

@ -1,6 +1,7 @@
import pytest
import redis
def test_quit(connection):
connection.send_command("QUIT")
assert connection.read_response() == b'OK'
@ -8,6 +9,7 @@ def test_quit(connection):
with pytest.raises(redis.exceptions.ConnectionError) as e:
connection.read_response()
def test_quit_after_sub(connection):
connection = redis.Connection()
connection.send_command("SUBSCRIBE", "foo")

View File

@ -9,7 +9,7 @@ import glob
from pathlib import Path
from dragonfly import dfly_args
BASIC_ARGS = ["--alsologtostderr", "--dir", "{DRAGONFLY_TMP}/"]
BASIC_ARGS = {"dir": "{DRAGONFLY_TMP}/"}
class SnapshotTestBase:
@ -37,7 +37,7 @@ class SnapshotTestBase:
return next(f for f in sorted(files) if is_main(f))
@dfly_args(*BASIC_ARGS, "--dbfilename", "test")
@dfly_args({**BASIC_ARGS, "dbfilename": "test"})
class TestRdbSnapshot(SnapshotTestBase):
"""Test single file rdb snapshot"""
@pytest.fixture(autouse=True)
@ -54,7 +54,8 @@ class TestRdbSnapshot(SnapshotTestBase):
super().check(client)
@dfly_args(*BASIC_ARGS, "--dbfilename", "test")
@dfly_args({**BASIC_ARGS, "dbfilename": "test"})
class TestDflySnapshot(SnapshotTestBase):
"""Test multi file snapshot"""
@pytest.fixture(autouse=True)
@ -75,7 +76,7 @@ class TestDflySnapshot(SnapshotTestBase):
super().check(client)
@dfly_args(*BASIC_ARGS, "--dbfilename", "test.rdb", "--save_schedule", "*:*")
@dfly_args({**BASIC_ARGS, "dbfilename": "test.rdb", "save_schedule": "*:*"})
class TestPeriodicSnapshot(SnapshotTestBase):
"""Test periodic snapshotting"""
@pytest.fixture(autouse=True)