Asynchronous Packet Testing

Meeting packet expectations


This article runs through a small Python library I have written for Asynchronous Packet Testing.

Note: This is still a bit of a work in progress. While writing this, I realised that there is quite an obvious data race a user could write themselves into (this is explained further in a note later). Generally, the concept stands, but I might need to look at rewriting how the test conditions are handled.

I’ve been playing around with P4, a C-like programming language designed for defining “forwarding pipelines” for network processing devices. The two targets I now have experience with are the P4 Behavioural Model and Intel’s “Tofino” programmable ASIC.

Since these pipelines can modify and redirect packets, it is useful to be able to run tests against packets that are sent and received from them.

There is a packet test framework available from the P4 GitHub repositories; however, I found it verbose to use, and there were no simple or straightforward examples. I like to play, and thought I could have a go at writing something that is expressive, integrates with pytest, and requires no dependencies other than pytest and scapy.

Now, I’m not a technical writer, so what follows is a bit of a brain dump of me trying to explain what’s going on. I’m not really sure what level to pitch this at, but here goes anyway…

Usage

The concept is that we expect that, at some point in the future, we may receive a packet on some network interface that matches some condition or property (called a predicate). A simple pytest test looks like this:

from async_packet_test.context import make_pytest_context
from async_packet_test.predicates import saw_dst_mac

context = make_pytest_context()

def test_saw_mac_broadcast(context):
    result = context.expect("eth0", saw_dst_mac("ff:ff:ff:ff:ff:ff"))
    result.assert_true()

That is to say, we expect eth0 to receive a packet addressed to the MAC broadcast address. If it doesn’t, an AssertionError is raised and the test will fail.

The test context starts and manages all of the underlying sniffer sessions. The expect function accepts an interface name, the predicate, a timeout and a limit on the number of packets to observe.

The timeout and count options are optional. By default, the sniffer times out after 5 seconds and observes an unlimited number of packets.

Finally, the predicate is just a fancy word for the condition or property which is asserted against a given packet. The library includes a number of these, such as the saw_dst_mac predicate used above.

The predicate can be supplied by name or by instance. If the type’s constructor takes no arguments, it can be instantiated for you.

Note: This is not checked.
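As a rough illustration of what "by name or by instance" could mean in practice, here is a minimal sketch. The resolve_predicate helper and the saw_any_packet predicate are hypothetical names made up for this example, and the classes are simplified stand-ins, so this is not the library's actual code. It also shows what "not checked" implies: passing a type whose constructor needs arguments only fails at the point of instantiation.

```python
import inspect


class Predicate:
    """Stand-in with the same shape as the library's base class."""
    def on_packet(self, pkt): pass
    def stop_condition(self, pkt): pass
    def on_finish(self, timed_out): pass


class saw_any_packet(Predicate):  # hypothetical predicate, no constructor arguments
    def stop_condition(self, pkt):
        return True

    def on_finish(self, timed_out):
        return not timed_out


class saw_dst_mac(Predicate):  # simplified: only the constructor matters here
    def __init__(self, mac):
        self._mac = mac


def resolve_predicate(predicate):
    """Hypothetical helper: accept either a predicate type or an instance."""
    if inspect.isclass(predicate):
        # Only works if the constructor takes no arguments; nothing checks this.
        return predicate()
    return predicate


by_type = resolve_predicate(saw_any_packet)
by_instance = resolve_predicate(saw_dst_mac("ff:ff:ff:ff:ff:ff"))

# Passing a type that needs constructor arguments fails when it is instantiated.
try:
    resolve_predicate(saw_dst_mac)
    failed_on_instantiation = False
except TypeError:
    failed_on_instantiation = True
```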

Under the hood, when expect is called, an AsyncSniffer is started with a bunch of closures wrapping the functions supplied by the predicate. This returns a “future”, whose result can be fetched or asserted on (for example with assert_true, as in the test above).

The future comes from submitting a function (which joins on the sniffer) to a ThreadPoolExecutor. Therefore, any of the result-grabbing or assertion functions on our future will be blocking calls.
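To make the blocking behaviour concrete, here is a minimal sketch of the "submit a function that joins on the sniffer" idea, using only the standard library. FakeSniffer and wait_for are hypothetical stand-ins invented for this example (a thread that sleeps takes the place of scapy's AsyncSniffer), so treat it as an illustration of the pattern rather than the library's implementation.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class FakeSniffer:
    """Hypothetical stand-in for a sniffer: a background thread that runs briefly."""

    def __init__(self, duration):
        self._thread = threading.Thread(target=time.sleep, args=(duration,))

    def start(self):
        self._thread.start()

    def join(self):
        self._thread.join()


def wait_for(sniffer, on_finish):
    """Block until the sniffer has stopped, then ask the predicate for a verdict."""
    sniffer.join()
    return on_finish()


executor = ThreadPoolExecutor(max_workers=1)

sniffer = FakeSniffer(duration=0.05)
sniffer.start()

# submit() returns immediately with a concurrent.futures.Future.
future = executor.submit(wait_for, sniffer, lambda: True)

# result() blocks the caller until wait_for has returned.
outcome = future.result()
executor.shutdown()
```

The test body only pays the waiting cost when it asks for the result, which is why several expectations can be set up first and then checked afterwards.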

The test context can also be used without pytest:

from async_packet_test.context import TestContext
from async_packet_test.predicates import saw_dst_mac

context = TestContext()

future = context.expect("eth0", saw_dst_mac("ff:ff:ff:ff:ff:ff"))
future.assert_true()

Note: The test context also has a monitor function for just … monitoring packets. I’m not sure it works at the moment.

Permissions

Because we are sniffing network interfaces, there are the usual caveats regarding having the correct permissions to do so; you will need to run pytest or the examples with the ability to capture packets. If using a Python virtual environment, the quick and dirty way would be to run it as root:

sudo .venv/bin/python -m pytest ./examples 

However, it would probably be more appropriate to create the virtual environment with copies of the Python binaries and use setcap to allow the sniffing of packets:

python -m venv --copies .venv 
sudo setcap CAP_NET_RAW+eip .venv/bin/python
sudo setcap CAP_NET_RAW+eip .venv/bin/python3
sudo setcap CAP_NET_RAW+eip .venv/bin/pytest

There may be better ways, but this is all I know of.

Writing Predicates

Despite being classes (and despite PEP 8), predicates are named in the lowercase underscore style, because they are used in a context where they could be perceived as functions. It makes the tests read more imperatively, I guess.

The base class is simple:

class Predicate:
    def on_packet(self, pkt):
        pass
    def stop_condition(self, pkt) -> bool:
        pass
    def on_finish(self, timed_out) -> bool:
        pass

These member functions are called by scapy’s AsyncSniffer.

This section looks like a failing language exam; so many highlights. Apologies in advance for being hard to read.

Note: Be careful if you are writing a predicate which accumulates state over time and also needs a stop condition. Since stop_condition is called before on_packet, if it returns True, on_packet will not be run for that packet, and any state that depends on it will be stale. I need to look at this with more wrappers, probably.
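The ordering problem in this note can be reproduced without any sniffer at all. The sketch below uses a hypothetical drive function that mimics the call order described above (stop_condition first, then on_packet), with plain strings standing in for packets. The two predicate classes are invented for the example: one keeps its state in on_packet and misses the final packet, the other does its bookkeeping inside stop_condition.

```python
class Predicate:
    """Same shape as the base class shown above."""
    def on_packet(self, pkt): pass
    def stop_condition(self, pkt): pass
    def on_finish(self, timed_out): pass


def drive(predicate, packets):
    """Hypothetical driver mimicking the described order of calls."""
    for pkt in packets:
        if predicate.stop_condition(pkt):
            # on_packet is skipped for the packet that ends the test.
            return predicate.on_finish(timed_out=False)
        predicate.on_packet(pkt)
    return predicate.on_finish(timed_out=True)


class naive_counter(Predicate):
    """Counts in on_packet, so the stopping packet is never counted."""
    def __init__(self, marker):
        self.marker = marker
        self.seen = 0

    def on_packet(self, pkt):
        self.seen += 1

    def stop_condition(self, pkt):
        return pkt == self.marker

    def on_finish(self, timed_out):
        return not timed_out


class careful_counter(naive_counter):
    """Does its bookkeeping in stop_condition, which runs for every packet."""
    def on_packet(self, pkt):
        pass  # state is updated in stop_condition instead

    def stop_condition(self, pkt):
        self.seen += 1
        return pkt == self.marker


packets = ["a", "b", "stop"]
naive = naive_counter("stop")
careful = careful_counter("stop")
naive_passed = drive(naive, packets)
careful_passed = drive(careful, packets)
```

Both tests pass, but naive.seen ends up one short of the three packets that were actually delivered, which is exactly the stale state described in the note.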

Here is the implementation of the saw_dst_mac predicate I mentioned earlier:

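from scapy.layers.l2 import Ether

# Predicate is the base class shown above.
class saw_dst_mac(Predicate):
    def __init__(self, mac):
        self._mac = mac

    def stop_condition(self, pkt):
        # Stop sniffing as soon as a frame with the wanted destination MAC arrives.
        return pkt.haslayer(Ether) and pkt[Ether].dst == self._mac

    def on_finish(self, timed_out):
        # Stopping early means we saw the MAC; timing out means we did not.
        return not timed_out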

This predicate stores a desired MAC address and compares it against the destination of every packet it receives. The first time it sees the MAC address, it signals that the test should end. Because the test was ended by the stop condition, it cannot have timed out, so we merely need to return the negation of timed_out to signal the test’s success. The same holds in the contrary case: if the test timed out, we return False, indicating that the test has failed.
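The same reasoning can be flipped around for a test where silence is the success case. The did_not_see_dst_mac predicate below is hypothetical (it is not claimed to be part of the library), and plain dicts stand in for scapy packets so the sketch runs without a network interface. Here the stop condition means failure, and a timeout means the unwanted MAC never appeared.

```python
class Predicate:
    """Same shape as the base class shown earlier."""
    def on_packet(self, pkt): pass
    def stop_condition(self, pkt): pass
    def on_finish(self, timed_out): pass


class did_not_see_dst_mac(Predicate):  # hypothetical inverse of saw_dst_mac
    def __init__(self, mac):
        self._mac = mac

    def stop_condition(self, pkt):
        # Dicts stand in for packets here; stop as soon as the unwanted MAC shows up.
        return pkt.get("dst") == self._mac

    def on_finish(self, timed_out):
        # Timing out means the MAC was never seen, which is what we wanted.
        return timed_out


BROADCAST = "ff:ff:ff:ff:ff:ff"
predicate = did_not_see_dst_mac(BROADCAST)

keeps_going = not predicate.stop_condition({"dst": "00:11:22:33:44:55"})
stops_on_broadcast = predicate.stop_condition({"dst": BROADCAST})

passes_on_timeout = predicate.on_finish(timed_out=True)
fails_when_stopped = not predicate.on_finish(timed_out=False)
```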

And that’s it. I’ll probably update this article as things change, but it’s bye for now.

Footnotes

  1. It’s a reference to the song “I Still Haven’t Found What I’m Looking For”, by U2. I’m not sure I even like them.