DBus Testing Framework Tutorial

Goals

The main aim should be that its easy to write a linear test script

Allowing to interact with other automated frameworks like dogtail is also a plus

Tutorial

This little tutorial will introduce you to the concets and API of the dbus testing framework without
entering too much in the gory details

General concepts

The framework is based on two things, the Test case and the proxy objects.

Test Cases

Each test case you want to make is a method of a class. More precisely it’s the run(self): method
of a TestCase subclass. To create a test case just define a new class, override the run method
and pass it to the run(cases) method of the framework, like that:

class MyTestCase(TestCase):
	def run(self):
		[Execute test script here]

run(MyTestCase)

You can also pass a list of test cases to the run(cases) method. That way, you can select
which test case you want to run from the command line. Let’s say you defined FooTest, BarTest and BazTest
in a file names mytests.py.

class FooTest(TestCase):
	[]
class BarTest(TestCase):
	[]
class BazTest(TestCase):
	[]

run([FooTest, BarTest, BazTest])

You can run in the console:

$ python mytests.py FooTest
-> execute the FooTest test case

Proxy objects

The framework comes with 3 types of pre-made TestCase subclasses each filling a specific testing need.

  • ClientTestCase: When you want to test a servie behavior (the test script emulates a client)
  • ServiceTestCase: When you want to test a client behavior (the test script emulates a dbus service)
  • MixedTestCase: When you want to test both a service and a client (possibly the same) behavior (the test script
    emulates a client and/or a dbus service)

So we can combine the two points seen above, and say that test cases must be subclasses of {Client,Service,Mixed}TestCase
and must implement a run(self) method. These classes should then be passed to the run(cases) function.

Let’s see more in detail the API of each possible working mode…

Client-side scripting

Client-side scripting takes you in the pants of a client and allows you to make calls on given dbus objects, and
listen for signals emitted by these objects (and thus assert they happen at the right moment)

First thing to do is to tell the framework where he can find the actual remote dbus object, by giving the
service name, the remote object path and the interfaces implemented by the object. This is all usual Dbus jargon.

First simple example, creates under nickname “nick” the given remote objet proxy.

self["nick"] = ("org.foo.bar", "/org/foo/bar", "org.foo.IBar")

If the remote object implements more than one interface, you must use a second definition format. This one
creates under nickname “nick” the given remote object proxy which implement two interfaces, IBar and IBaz.
Each interface has its own nickname (“iface1” and “iface2”).

self["nick"] = ("org.foo.bar", "/org/foo/bar",
	{"iface1": "org.foo.IBar", "iface2": "org.foo.IBaz"})

Now that we have a nickname to refer to our remote object we can access its methods and signal with the python dict mapping

self["nick"]["Method"].xxx
self["nick"]["Signal"].xxx

# If there is more than one interface:
self["nick"]["iface1"]["Method"].xxx
self["nick"]["iface1"]["Signal"].xxx

On each method or signal proxy object returned by the dictionnary, we can do several actions. If the proxy is a method:

  • call(self, *args, **kwargs) : result

    Call the method and block until the method returns. Returns the method return value. You can use the timeout=secs keyword
    argument to specify how much time the call should wait before failure. If not specified the default is used.

If the proxy is a signal:

  • listen(self) : void

    Start listening to the signal (resets previous signal values)

  • wait(self, n=1, timeout=DEFAULT_TIMEOUT) : result

    Block until the signal has been received n times. Wait at most timeout seconds. The return value is the signal parameters, or a list
    of signal parameters if n > 1, ordered by reception time.

Service-side scripting

Service-side scripting takes you in the pants of a service and allows you to ensure the right calls sequences
are made against the exported objects, and emit signals from exported objects. You will be able to test
that way that clients are behaving correctly to service events.

First thing to do is to tell the framework what service he needs to export, by giving the
service name, the remote object path and the methods/signals exported by the object. This is all usual Dbus jargon.

To acheive this you first need to define a class, subclassing MockService, in which you declare the exported methods and signals,
as you would do when using the dbus bindings in python, plus some more details

class FooBarService(MockService):
	@intercept
	@dbus.service.method("org.foo.bar.IBaz")
	def Foo(self):
		return "Foo"

	@intercept
	@dbus.service.method("org.foo.bar.IBaz")
	def Bar(self, param):
		return param+3

	@dbus.service.signal("org.foo.bar.IBaz")
	def BazSig(self, s):
		pass

As you can see, each method is declared and a default implementation is provided. Also note that the @intercept annotation
has to be used for methods (not for signals or things break) and after the @method annotation (which means before
in source code)

Now we can create the service proxy object with a nickname as for the client-side scripting:

self["nick"] = (FooBarService, "org.foo.bar", "/org/foo/bar")

We pass as first argument the class of the exported object, the service name and object path

Now that we have a nickname to refer to our service we can access its methods and signal with the python dict mapping

self["nick"]["Method"].xxx
self["nick"]["Signal"].xxx

On each method or signal proxy object returned by the dictionnary, we can do several actions. If the proxy is a signal:

  • emit(self, *args, **kwargs) : void

    Emit the signal with the given arguments.

If the proxy is a method:

  • listen_call(self, return_val=None) : void

    Start listening to a method call on this service (resets previous calls values). If the return_val is given an expression,
    the expression will be called with the parameters of the method call and its return value will be used instead of the
    default method implementation as given in the MockService subclass

    # When Method is called by a remote client, the return value will be the tuple (args, kwargs)
    # returned by the lambda expression instead of the default return value given in the MockService.
    self["nick"]["Method"].listen_call(lambda *args, **kwargs: return args, kwargs)
    
  • wait_call(self, listening=False, return_val=None, timeout=DEFAULT_TIMEOUT): result

    Block until the method call happens. If listening is False, then listen_call(return_val) is implicitely called. Else it is assumed
    that you have already called listen_call() yourself before. Wait at most timeout seconds. The return value are the arguments provided
    to the method when it was called.

Mixed type scripting

Mixed-type scripting allows one to use both service-side and client-side features of the testing framework. Use this
when you need clients and service at the same time in a script, that possibly needs to call each other or interact in some ways.

To create the service or client proxies, the syntax is the same as defined above for each type of proxies. One additional syntax
is available to defined in one step both a service and a client connected to it, if that’s useful:

self["nick"] = (FooBarService, "org.foo.bar", "/org/foo/bar", "org.foo.bar.IBaz")
# or for the multi interface client
self["nick"] = (FooBarService, "org.foo.bar", "/org/foo/bar",
		{"iface1": "org.foo.bar.IBaz", "iface2": "org.foo.bar.IBaz2"})

Once you have the nicknamed proxy, you can use it like defined above, depending on the type of the proxy under the nickname.

self["nick"]["Method"].xxx
self["nick"]["Signal"].xxx

Where xxx depends on the type of the proxy. If the proxy is a mixed one, then you can use both client and service api on it

Examples

The following examples will guide you “visually” through the usage of this testing framework.
The snippets have to be enclosed in a run(self) method of a TestCase subclass to be chosen among the
available ones (ClientTestCase, ServiceTestCase, MixedTestCase)

Simple calls/signals

Call this method, expect this reply

self["foo"] = ("org.freedesktop.Foo","/org/freedesktop/Foo","org.freedesktop.Foo")
ret = self["foo"]["Half"].call(32)
assert ret == 16

Call this method, wait for this signal

self["foo"] = ("org.freedesktop.Foo","/org/freedesktop/Foo","org.freedesktop.Foo")
self["foo"]["HalfComputed"].listen()
self["foo"]["Half"].call(32)
ret = self["foo"]["HalfComputed"].wait()
assert ret == 16

Hello World examples

A simple client test for an hypothetic HelloWorld service

self["hello"] = ("org.freedesktop.HelloWorld",
		"/org/freedesktop/HelloWorldObject",
		"org.freedesktop.HelloWorldIFace")

echo = self["hello"]["Hello"].call()
assert echo == "olleH"

self["hello"].HalloSig.listen()
echo = self["hello"]["Hallo"].call("echomethis")
assert echo == "echomethis"
sigs = self["hello"]["HalloSig"].wait()
assert sigs[0] == "echomethis"

self["hello"]["HalloSig"].listen()
sigs = self["hello"]["HalloSig"].wait()
assert sigs[0] == "test"

The same test, but on the service side for testing with an hypothetic HelloWorld Client. First the Service definition:

class HelloService(MockService):
	@intercept
	@dbus.service.method("org.freedesktop.HelloWorldIFace")
	def Hello(self):
		return "olleH"

	@intercept
	@dbus.service.method("org.freedesktop.HelloWorldIFace")
	def HalloOverriden(self, param):
		return param

	@intercept
	@dbus.service.method("org.freedesktop.HelloWorldIFace")
	def Hallo(self, param):
		self.HalloSig(param)
		return param

	@dbus.service.signal("org.freedesktop.HelloWorldIFace")
	def HalloSig(self, s):
		pass

Then the actual script:

self["hello"] = (HelloService,
		"org.freedesktop.HelloWorld",
		"/org/freedesktop/HelloWorldObject")

self["hello"]["Hello"].wait_call()
self["hello"]["Hallo"].wait_call()
self["hello"]["HalloSig"].emit("test")

You can of course launch the two tests in separate processes and see that they behave correctly

And finally more or less the same test with mixed scripting. Both the service and the client are in the script. This one
can run standalone since the service and object are both provided.

self["hello"] = (HelloService,
	"org.freedesktop.HelloWorld",
	"/org/freedesktop/HelloWorldObject",
	"org.freedesktop.HelloWorldIFace")

self["hello"]["Hello"].listen_call()
echo = self["hello"]["Hello"].call()
self["hello"]["Hello"].wait_call(True)
assert echo == "olleH"

self["hello"]["HalloSig"].listen()
echo = self["hello"]["Hallo"].call("echomethis")
assert echo == "echomethis"

sigs = self["hello"]["HalloSig"].wait()
assert sigs[0] == "echomethis"

self["hello"]["HalloSig"].listen()
self["hello"]["HalloSig"].emit("test")
sigs = self["hello"]["HalloSig"].wait()
assert sigs[0] == "test"

self["hello"]["HalloOverriden"].listen_call(return_val=lambda x: x[1:-1])
echo = self["hello"]["HalloOverriden"].call("TestString")
self["hello"]["HalloOverriden"].wait_call(True)
assert echo == "estStrin"

Libnotify example

Open a notification bubble with no timeout, close it programmatically, and check that the Closed signal is emitted

self["notif"] = ("org.freedesktop.Notifications",
		"/org/freedesktop/Notifications",
		"org.freedesktop.Notifications")

id = self["notif"]["Notify"].call("notifname", 0, "", "title", "content",[],{}, 0)

self["notif"]["NotificationClosed"].listen()
self["notif"]["CloseNotification"].call(id)
results = self["notif"]["NotificationClosed"].wait()

assert results[0] == id

Advanced cases

Log on to two jabber accounts, talk from one to other account, check all
parameters are correct and that the messages are well exchanged, Disconnect.

def start_manager(self, nick):
	self[nick] = ("org.freedesktop.Telepathy.ConnectionManager.gabble",
		"/org/freedesktop/Telepathy/ConnectionManager/gabble",
		"org.freedesktop.Telepathy.ConnectionManager")

def connect(self, nick, proto, params):
	# Connect to the manager
	conn, obj = self[nick]["Connect"].call(proto, params)
	# register connection as self.nick_conn
	self[nick+"_conn"] = (conn, obj, "org.freedesktop.Telepathy.Connection")

	# Retrieve connection status, and wait until connected
	status = self[nick+"_conn"]["GetStatus"].call()
	while status != 0:
		if status == 2:
			raise Exception("Could not connect")
		self[nick+"_conn"]["StatusChanged"].listen()
		status, reason = self[nick+"_conn"]["StatusChanged"].wait()

	return conn

def run(self):
	# Test Setup
	self.start_manager("client1")
	self.start_manager("client2")

	# Connect the two accounts
	conn1 = self.connect("client1", "jabber",
		{"account": "kikidonk@amessage.be", "password": "kikidonk"})
	conn2 = self.connect("client2", "google-talk",
		{ "account": "tryggve.tryggvason@gmail.com","password": "badger"})

	# Start listening to NewChannel signal emitted when client2 starts to talk to client1
	self["client1_conn"]["NewChannel"].listen()

	# Create a text channel
	self["client2_chan"] = (
		# The connection service
		conn2,
		# A new text channel to the client 1
		self["client2_conn"]["RequestChannel"].call(
			"org.freedesktop.Telepathy.Channel.Type.Text", 1,
			self["client2_conn"]["RequestHandle"].call (1, "kikidonk@amessage.be"), False),
		# The interfaces for this text channel
		{"text": "org.freedesktop.Telepathy.Channel.Type.Text",
		 "chan": "org.freedesktop.Telepathy.Channel"})

	# Send a test message to client 1 from client 2
	self["client2_chan"]["text"]["Send"].call (0, "Test")

	# We got a newchannel from client 1
	obj, channel_type, handle_type, handle, suppress_handler = self["client1_conn"]["NewChannel"].wait()

	assert channel_type == "org.freedesktop.Telepathy.Channel.Type.Text"
	assert handle_type == 1
	assert suppress_handler == False

	# Create the channel structure for client 1
	self["client1_chan"] = (
		conn1,
		obj,
		{"text": "org.freedesktop.Telepathy.Channel.Type.Text",
		 "chan": "org.freedesktop.Telepathy.Channel"})

	# Retreive pending messages
	messages = self["client1_chan"]["text"]["ListPendingMessages"].call()

	id, stamp, from_handle, msg_type, string = messages[0]
	assert len(messages) == 1
	assert from_handle == handle
	assert msg_type == 0
	assert string == "Test"

	# Close/Disconnect stuff
	self["client2_chan"]["chan"]["Close"].call()
	self["client1_chan"]["chan"]["Close"].call()
	self["client2_conn"]["Disconnect"].call()
	self["client1_conn"]["Disconnect"].call()

Log on to two jabber accounts, add yourself to your buddy list,
check you got that on the other end, allow it. Remove yourself, check you got that.
Add yourself again, reject it, check you got that. etc

To be written

Like in telepathy, there’s a voip-engine which is both a service and a client of a connection manager
I want to be able to pretend to be a client, and call handle channel,
then pretend to be a service, send out stuff, test that the right things
are returned, test that prerequisite calls get made.

To be written

Comments

One response to “DBus Testing Framework Tutorial”

  1. Bruno de Oliveira Abinader Avatar

    Hi Raphael,
    First of all congratulations for this great tutorial! I was looking for something like that for some time. I’ve tested all test cases but the telepathy one gave me this error:

    TestCase: Failing __main__.TelepathyClientTest—-
    Method “Connect” with signature “sa{ss}” on interface “org.freedesktop.Telepathy.ConnectionManager” doesn’t exist

    Do you know why it says that? I am using Ubuntu Edgy with all python/dbus/telepathy dev packages installed. Thanks in advance and keep up the good work!

Leave a Reply