This article runs through a small Python library I have written for Asynchronous Packet Testing.
Note: This is still a bit of a work-in-progress. While writing this, I realised that there is quite an obvious data race a user could write themselves into (this is further explained in a note later). Generally, the concept stands, but I might need to look at re-writing how the test conditions are handled.
I’ve been playing around with P4, which is a C-like programming language designed for defining “forwarding pipelines” for network processing devices. The two targets for such I now have experience in are the P4 Behavioural Model, and Intel’s “Tofino” programmable ASIC.
Since these pipelines can modify and redirect packets, it is useful to be able to run tests against packets that are sent and received from them.
There is a packet test framework available from
the P4 Github repositories, however I found that it was verbose to use, and
there were no simple or straightforward examples. I like to play, and thought I
could have a go at writing something that is expressive, integrated with pytest
and requires no dependencies other than that and scapy
.
Now, I’m not a technical writer, so what follows is a bit of a brain dump of me trying to explain what’s going on. I’m not really sure what level to pitch this at, but here goes anyway…
Usage
The concept is that, we have an expectation that at some point in the future, we
may receive a packet on some network interface that matches some condition or
property (called a predicate). An example of a simple pytest
test you can
write follows:
from async_packet_test.context import make_pytest_context
from async_packet_test.predicates import saw_dst_mac
context = make_pytest_context()
def test_saw_mac_broadcast(context):
result = context.expect("eth0", saw_dst_mac("ff:ff:ff:ff:ff:ff"))
result.assert_true()
That is to say that we expect that eth0
should receive a packet containing a
MAC broadcast address. If it doesn’t, an AssertionError
is thrown and the
test will fail.
The test context starts and manages all of the underlying sniffer sessions.
The expect
function accepts an interface name, the predicate, a timeout and
a limit on the number of packets to observe.
The timeout and count options are optional. The default arguments are to timeout after 5 seconds, and to observe an infinite number of packets.
Finally, the predicate is the condition or property which is asserted against a given packet. The term predicate is just a fancy word for some property or condition we can assert against a given packet. Some examples of these included with the library are:
saw_src_mac
, andsaw_dst_mac
saw_vlan_tag
packet_count_was
The predicate can be supplied by name or by instance. If the type’s constructor takes no arguments, it can be instantiated for you.
Note: This is not checked.
Under the hood, when expect is called, an AsyncSniffer
is started with a bunch
of closures wrapping the functions supplied by the predicate. This returns a
“future” which can be used in the following ways:
-
You only deal in absolutes, there is no room in the world for ambiguity, you want some degree of certainty in this uncertain world:
future = context.expect("eth0", saw_dst_mac("ff:ff:ff:ff:ff:ff")) future.assert_true() future.assert_false()
-
The world is in shades of grey, things change. You can’t possibly predict the the future, but you can adapt to it:
# Soliloquising aside, perhaps things between the start of the test # and the end change, for example, you were only able to send a certain # number of packets, a number which you couldn't know at the start. future = context.expect("eth0", packet_count) future.assert_value(42)
-
You don’t care about the “right way”, you like bending the world to your will, or perhaps you just want to see the world burn.
future = context.expect("eth0", im_only_using_this_to_get_what_i_want) result = future.result()
-
Or maybe, just maybe, you don’t believe in magic.
future = context.expect("eth0", saw_vlan_tag(102)) assert(future.result() == True)
Under the hood, this is accomplished by wrapping all of the predicate’s
functions in closures, and submits a function (which joins on the sniffer) to a
ThreadExecutorPool
. Therefore, any of the result grabbing or assertion
functions on our future will be blocking calls.
The test context can also be used without pytest
:
from async_packet_test.context import TestContext
from async_packet_test.predicates import saw_dst_mac
context = TestContext()
future = context.expect("eth0", saw_dst_mac("ff:ff:ff:ff:ff:ff"))
future.assert_true()
Note: The test context also has a
monitor
function for just … monitoring packets. I’m not sure it works at the moment.
Permissions
Because we are sniffing network interfaces, there are the usual caveats regarding having the correct permissions to do so; you will need to run pytest or the examples with the ability to capture packets. If using a Python virtual environment, the quick and dirty way would be to run it as root:
sudo .venv/bin/python -m pytest ./examples
However, it would probably be more appropriate to create the virtual environment
with copies of the Python binaries and use setcap
to allow the sniffing of
packets:
python -m venv --copies .venv
sudo setcap CAP_NET_RAW+eip .venv/bin/python
sudo setcap CAP_NET_RAW+eip .venv/bin/python3
sudo setcap CAP_NET_RAW+eip .venv/bin/pytest
There may be better ways, but this is all I know of.
Writing Predicates
Despite being classes (and despite PEP
), definitions of these are written in
the underscore style as they are used in a context where they could be perceived
as being functions, makes it more imperative I guess.
The base class is simple:
class Predicate:
def on_packet(self, pkt):
pass
def stop_condition(self, pkt) -> bool:
pass
def on_finish(self, timed_out) -> bool:
pass
These member functions are called by scapy
’s AsyncSniffer
.
This section looks like a failing language exam; so many highlights. Apologies in advance for being hard to read.
-
stop_condition
is called for every single packet, ifTrue
is returned it signals that the test can be finished (success or otherwise), andFalse
indicates it still hasn’t found what it’s looking for … guitar delay intensifies 🤘 1
This is supplied to anAsyncSniffer
through thestop_filter
constructor argument. -
on_packet
is called for almost every single packet, and is for updating any state that might be used. This is supplied to anAsyncSniffer
through theprn
constructor argument. The reason it is almost every packet is because anAsyncSniffer
’sstop_filter
is called before this; ifstop_condition
returnsTrue
this function will not be called, if I recall correctly. This is really a convenience function, anything you can do here you could do instop_condition
, however it does clearly delineate what is for handling state. -
on_finish
is called when the test is ending and packet capture has stopped, returningTrue
indicates that the test was successful, andFalse
otherwise. The tests run with a timeout, and the timeout argument supplied indicates to the test writer that this has expired, to do with what you will.
Note: If you are writing a predicate which acquires state over time, but you definitely need to write a stop condition, you have to be careful: since
stop_condition
is called beforeon_packet
, if it returnsTrue
,on_packet
will not be run, and any state dependent on having that will be stale. I need to look at this with more wrappers, probably.
Here is the implementation of the saw_dst_mac
predicate I mentioned earlier:
class saw_dst_mac(Predicate):
def __init__(self, mac):
self._mac = mac
def stop_condition(self, pkt):
return pkt.haslayer(Ether) and pkt[Ether].dst == self._mac
def on_finish(self, timed_out):
return not timed_out
This predicate stores a desired MAC address, comparing it against every packet
it receives. The first time it sees the MAC address, it will signal that the
test should end. Because we have signalled the end of the test, it should not
have timed out, so we merely need to return it’s boolean counterpart to signal
the test’s success. This holds true in the contrary case – if the test timed
out, we would want to return False
, indicating that the test had failed.
And that’s it. I’ll probably update this article as things change, but it’s bye for now.
Footnotes
-
It’s a reference to the song “I Still Haven’t Found What I’m Looking For”, by U2. I’m not sure I even like them ↩