Asynchronous Packet Testing
Python library for writing simple and human readable tests for future expected packets.
Overview
Async Packet Test is a Python library based on scapy that can test the contents of yet-to-be-received packets, and provides pytest integration for doing so.
The concept came from writing packet processing pipelines in the P4 programming language, where I wanted to write end-to-end tests for the changes made to packets and for their receipt, treating the pipeline itself as a closed and mysterious box. I also found the Packet Test Framework to be highly coupled and far too detailed/verbose for writing simple tests.
The idea is that, at some point in the future, we expect to receive (or explicitly not receive!) a packet on some network interface that matches some condition or property.
from async_packet_test.context import make_pytest_context
from async_packet_test.predicates import saw_dst_mac
context = make_pytest_context()
def test_saw_mac_broadcast(context):
    result = context.expect("eth0", saw_dst_mac("ff:ff:ff:ff:ff:ff"))
    result.assert_true()
Installation
This is not currently on PyPI, and has to be installed directly from the repository.
git clone https://github.com/CommitThis/async-packet-test
cd async-packet-test && pip install .
Permissions
If you wish to run the tests without using sudo (and the inevitable environment preservation things you might have to do), you can do so, but you should consider the security implications beforehand.
A non-admin user can only open raw network sockets if the executable doing so has been granted the appropriate permissions (called capabilities). In this case the capability is CAP_NET_RAW.
It would likely be considered unwise to apply such capabilities directly to the system installed Python or pytest. An alternative approach would be to install it into a virtual Python environment, which has taken copies of the Python executables:
python -m venv --copies .venv
sudo setcap CAP_NET_RAW+eip .venv/bin/python
sudo setcap CAP_NET_RAW+eip .venv/bin/python3
sudo setcap CAP_NET_RAW+eip .venv/bin/pytest
setcap sets capabilities. +eip defines the “capability sets”: CAP_NET_RAW is added to the executable’s effective (e) and permitted (p) sets, and any further process forks and threads will inherit (i) the capability set of the original executable. That’s my understanding of it anyway.
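One way to check whether a capability actually reached a running process is to look at the CapEff mask that Linux exposes in /proc/<pid>/status. The sketch below is not part of the library; the helper names are made up for illustration, and it assumes a Linux system where CAP_NET_RAW is capability number 13.

```python
from pathlib import Path

CAP_NET_RAW_BIT = 13  # value of CAP_NET_RAW in <linux/capability.h>


def has_capability(cap_mask_hex: str, bit: int = CAP_NET_RAW_BIT) -> bool:
    """Return True if `bit` is set in a hexadecimal capability mask."""
    return bool(int(cap_mask_hex, 16) & (1 << bit))


def effective_mask(status_text: str) -> str:
    """Extract the CapEff mask from the text of /proc/<pid>/status."""
    for line in status_text.splitlines():
        if line.startswith("CapEff:"):
            return line.split()[1]
    raise ValueError("no CapEff line found")


def current_process_has_net_raw() -> bool:
    """Check the effective set of the calling process (Linux only)."""
    status = Path("/proc/self/status").read_text()
    return has_capability(effective_mask(status))
```

Calling current_process_has_net_raw() from the interpreter inside the virtual environment should then report whether the capability is in effect for that interpreter.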
Usage
There are three main components of the library you will work with:
- The TestContext. This is an object that is responsible for starting “expectations” on a specific network interface through the expect method, as well as managing their lifetime;
- Test predicates, which are passed to an expect call and are the things that perform the tests on packets;
- Finally, the test result.
Test Context
The TestContext is used to start and manage running tests/expectations. It takes no constructor arguments, and its interface is simple:
class TestContext:
    def expect(self,
               iface: str,            # Network interface to bind to
               predicate: Predicate,  # ... uh.. the test predicate
               timeout: float,        # Number of seconds before test is cancelled
               count: int):           # Number of packets before test is cancelled
        pass

    def stop(self):
        pass
That’s all there is to it.
There is a monitor method, but it is not well tested and its functionality can be achieved using an ordinary scapy.AsyncSniffer, so it may be removed in the future. Its use is therefore not recommended.
Test Predicates
Predicates have been named to read naturally in English (even if it isn’t my strong suit) and read well in the context of the test. Even though they are technically classes, in spite of PEP, they are written in the underscore style. This is because they are used in a context where they could be perceived as functions, and because lower case anything is easier to read; above all, in my opinion (outside testing an outcome), unit tests should be comprehensible. [[ citation needed ]]
Because they are testing something that may happen in the future, they cannot be evaluated immediately, and therefore their use may not be as obvious as other testing frameworks.
As an example, we might want to test if a specific port saw a packet with a particular MAC address.
def test_saw_mac_address(context):
    dst_mac = 'ab:ab:ab:ab:ab:ab'
    iface = 'veth0'
    future = context.expect(iface, saw_dst_mac(dst_mac))
    assert future.result() == True
It may be obvious, but what this is saying is that we want to test that, at some point in the future, veth0 will see the supplied MAC address. The future returned from the expect call represents the outcome of the test. Consequently, retrieving the result will block until the test completes.
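The blocking behaviour can be illustrated with the standard library alone. This is only a sketch: expect_later is a hypothetical stand-in for expect, and it uses concurrent.futures.Future on the assumption that the library's result object behaves similarly when its result is retrieved. The real object may well be a different type.

```python
import threading
from concurrent.futures import Future


def expect_later(delay: float, outcome: bool) -> Future:
    """Hypothetical stand-in for `expect`: resolves after `delay` seconds."""
    future: Future = Future()
    # A timer thread plays the part of the packet sniffer finishing its work.
    timer = threading.Timer(delay, future.set_result, args=(outcome,))
    timer.start()
    return future


# The future is handed back straight away, before the outcome is known.
future = expect_later(0.1, True)
# result() blocks the calling thread until the outcome has been set.
value = future.result(timeout=5)
```

The practical consequence is that the expectation has to be started before the traffic is generated, and the result retrieved afterwards.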
Writing Tests
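from async_packet_test.context import TestContext
# Predicate name as given under Built-in Predicates
from async_packet_test.predicates import saw_packet_equaling
from scapy.all import Ether, ICMP, IP, sendp

context = TestContext()
pkt = Ether()/IP(src='10.0.0.1', dst='10.0.0.2')/ICMP()
# Start the expectation before sending, so the packet cannot be missed
test = context.expect('eth0', saw_packet_equaling(pkt))
# Send packet
sendp(pkt, iface='eth0')
assert(test.result())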
Predicates can be passed to an expect call as either a class or a constructed object. This is mainly for terseness; if a predicate doesn’t accept any constructor arguments, you can pass its class and an object of that type will be constructed for you:
context.expect('eth0', received_packet)
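How a context might accept both forms can be sketched in a couple of lines. The helper as_predicate and the toy class received_anything are hypothetical names used only for illustration; the library's own handling may differ.

```python
def as_predicate(predicate):
    """Hypothetical helper: construct `predicate` if it was passed as a class."""
    return predicate() if isinstance(predicate, type) else predicate


class received_anything:
    """Toy predicate that takes no constructor arguments."""


from_class = as_predicate(received_anything)       # class: constructed for us
from_object = as_predicate(received_anything())    # object: returned unchanged
```

A predicate that needs constructor arguments, such as a MAC address, still has to be constructed by the caller.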
There are a couple of different ways assertions can be defined:
assert(test.result() == True)
assert(test.result() == 42)
test.assert_true()
test.assert_false()
test.assert_value(42)
If the result value is explicitly a boolean, then an assertion can be triggered ordinarily:
assert result
However, if the result value is not a boolean, NotNakedAssertable will be raised.
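A naked assert calls bool() on its operand, so one plausible way to get this behaviour is to put the check in __bool__. The following is a sketch under that assumption; SketchResult is a hypothetical class, and the exception is redefined locally so that the example is self-contained.

```python
class NotNakedAssertable(Exception):
    """Local stand-in named after the exception mentioned above."""


class SketchResult:
    """Hypothetical result wrapper; not the library's class."""

    def __init__(self, value):
        self._value = value

    def __bool__(self) -> bool:
        # `assert obj` evaluates bool(obj), so the type check can live here.
        if not isinstance(self._value, bool):
            raise NotNakedAssertable(f"{self._value!r} is not a boolean")
        return self._value
```

With this shape, assert SketchResult(True) passes, while assert SketchResult(42) raises NotNakedAssertable rather than silently treating 42 as truthy.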
For further examples, look at the unit tests.
Pytest Integration
Pytest integration is achieved by returning a test fixture that wraps a test context.
from async_packet_test.context import make_pytest_context
from async_packet_test.predicates import saw_dst_mac
context = make_pytest_context()
def test_saw_mac_broadcast(context):
    result = context.expect("eth0", saw_dst_mac("ff:ff:ff:ff:ff:ff"))
    result.assert_true()
Assuming that permissions have been set up correctly and the library has been installed, you should then be able to run
pytest
in your project.
Built-in Predicates
received_packet
timed_out
saw_src_mac
did_not_see_src_mac
saw_dst_mac
did_not_see_dst_mac
saw_vlan_tag
did_not_see_vlan_tag
did_not_see_vlan
saw_packet_equaling
did_not_see_packet_equaling
packet_count_was
packet_count_was_less_than
packet_count
received_count_of
Writing Custom Predicates
This is relatively easy. The predicates are objects that test incoming packets, and are able to carry state from one evaluation to the next. The base test is as follows:
class async_packet_test:
    def on_packet(self, pkt):
        pass

    def stop_condition(self, pkt) -> bool:
        pass

    def on_finish(self, timed_out) -> bool:
        pass
- on_packet receives every packet. It is there for updating state.
- stop_condition is there to notify the test manager that the test has completed.
- on_finish reports the result back to the manager, and ultimately to the future given back to the user.
Carrying on with our previous example, saw_dst_mac, the predicate is constructed as follows:
class saw_dst_mac(async_packet_test):
    def __init__(self, mac):
        self._mac = mac

    def stop_condition(self, pkt):
        # Stop as soon as a packet with the wanted destination MAC is seen
        return pkt.haslayer(Ether) and pkt[Ether].dst == self._mac

    def on_finish(self, timed_out):
        return not timed_out
This is straightforward as no state is needed between packets; we only need to test each individual packet for the supplied MAC address. As soon as that MAC is seen, the test will terminate and on_finish will be called. Ultimately, the only thing that needs to be returned is whether the test timed out: if it did time out, the stop condition will never have returned True.
on_finish could have been written by default to test whether or not it timed out; however, I wasn’t sure whether that was reasonable behaviour or not.
Another simple example is testing the count of packets received.
This may be difficult to guarantee, as ports may receive packets for things like SSDP, multicast DNS, or any number of packets that may be sent to a port by the OS as a part of its normal operation.
class packet_count_was(async_packet_test):
    def __init__(self, count):
        self._expected_count = count
        self._received_count = 0

    def on_packet(self, pkt):
        self._received_count += 1

    def on_finish(self, timed_out):
        return self._received_count == self._expected_count
As each packet is received, a counter will be incremented. At the end of the time out period, the count will be compared with the expected result.
The stop condition cannot be used for this purpose as it is called before the more general on_packet function.
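To make the call order concrete, here is a toy driver that feeds a list of stand-in packets to a predicate. It is a sketch, not the library's test manager: drive is a hypothetical function, running out of packets stands in for the timeout, and whether the real manager calls on_packet for the packet that satisfies the stop condition is an assumption. The predicate is redefined without its base class so that the example runs on its own.

```python
class packet_count_was:
    """Same logic as the predicate above, without the library base class."""

    def __init__(self, count):
        self._expected_count = count
        self._received_count = 0

    def on_packet(self, pkt):
        self._received_count += 1

    def stop_condition(self, pkt) -> bool:
        return False  # never stop early; run until the timeout

    def on_finish(self, timed_out) -> bool:
        return self._received_count == self._expected_count


def drive(predicate, packets) -> bool:
    """Toy driver: feed `packets` to `predicate`, then report its result."""
    for pkt in packets:
        # stop_condition runs first, as described above.
        if predicate.stop_condition(pkt):
            return predicate.on_finish(False)
        predicate.on_packet(pkt)
    # Running out of packets stands in for the timeout expiring.
    return predicate.on_finish(True)
```

Driving packet_count_was(3) with three packets yields True, and with two or four packets yields False, because the comparison only happens once the timeout has been reached.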
It is important to note the difference in behaviour between the two approaches. This test expects a specific number of packets; if the total count is off, it will return False. However, it could equally be written so that it terminates as soon as the expected number of packets has been counted; that is to say, we care about the minimum number of packets received, and not the total. This could be written as follows:
class min_packet_count_was(async_packet_test):
    def __init__(self, count):
        self._expected_count = count
        self._received_count = 0

    def stop_condition(self, pkt):
        self._received_count += 1
        if self._received_count == self._expected_count:
            return True
        return False