Logo Search packages:      
Sourcecode: telepathy-gabble version File versions  Download package


Infrastructure code for testing connection managers.

from twisted.internet import glib2reactor
from twisted.internet.protocol import Protocol, Factory, ClientFactory
import sys

import pprint
import unittest

import dbus.glib

from twisted.internet import reactor

import constants as cs

tp_name_prefix = 'org.freedesktop.Telepathy'
tp_path_prefix = '/org/freedesktop/Telepathy'

00023 class Event:
    def __init__(self, type, **kw):
        self.type = type

def format_event(event):
    ret = ['- type %s' % event.type]

    for key in dir(event):
        if key != 'type' and not key.startswith('_'):
            ret.append('- %s: %s' % (
                key, pprint.pformat(getattr(event, key))))

            if key == 'error':
                ret.append('%s' % getattr(event, key))

    return ret

00041 class EventPattern:
    def __init__(self, type, **properties):
        self.type = type
        self.predicate = None
        if 'predicate' in properties:
            self.predicate = properties['predicate']
            del properties['predicate']
        self.properties = properties

    def __repr__(self):
        properties = dict(self.properties)

        if self.predicate is not None:
            properties['predicate'] = self.predicate

        return '%s(%r, **%r)' % (
            self.__class__.__name__, self.type, properties)

    def match(self, event):
        if event.type != self.type:
            return False

        for key, value in self.properties.iteritems():
                if getattr(event, key) != value:
                    return False
            except AttributeError:
                return False

        if self.predicate is None or self.predicate(event):
            return True

        return False

00076 class TimeoutError(Exception):

00079 class BaseEventQueue:
    """Abstract event queue base class.

    Implement the wait() method to have something that works.

    def __init__(self, timeout=None):
        self.verbose = False
        self.forbidden_events = set()

        if timeout is None:
            self.timeout = 5
            self.timeout = timeout

    def log(self, s):
        if self.verbose:
            print s

    def log_event(self, event):
        if self.verbose:
            self.log('got event:')

            if self.verbose:
                map(self.log, format_event(event))

00105     def forbid_events(self, patterns):
        Add patterns (an iterable of EventPattern) to the set of forbidden
        events. If a forbidden event occurs during an expect or expect_many,
        the test will fail.

00113     def unforbid_events(self, patterns):
        Remove 'patterns' (an iterable of EventPattern) from the set of
        forbidden events. These must be the same EventPattern pointers that
        were passed to forbid_events.

    def _check_forbidden(self, event):
        for e in self.forbidden_events:
            if e.match(event):
                print "forbidden event occurred:"
                for x in format_event(event):
                    print x
                assert False

00129     def expect(self, type, **kw):
        Waits for an event matching the supplied pattern to occur, and returns
        it. For example, to await a D-Bus signal with particular arguments:

            e = q.expect('dbus-signal', signal='Badgers', args=["foo", 42])
        pattern = EventPattern(type, **kw)

        while True:
            event = self.wait()

            if pattern.match(event):
                return event

            self.log('not handled')

00151     def expect_many(self, *patterns):
        Waits for events matching all of the supplied EventPattern instances to
        return, and returns a list of events in the same order as the patterns
        they matched. After a pattern is successfully matched, it is not
        considered for future events; if more than one unsatisfied pattern
        matches an event, the first "wins".

        Note that the expected events may occur in any order. If you're
        expecting a series of events in a particular order, use repeated calls
        to expect() instead.

        This method is useful when you're awaiting a number of events which may
        happen in any order. For instance, in telepathy-gabble, calling a D-Bus
        method often causes a value to be returned immediately, as well as a
        query to be sent to the server. Since these events may reach the test
        in either order, the following is incorrect and will fail if the IQ
        happens to reach the test first:

            ret = q.expect('dbus-return', method='Foo')
            query = q.expect('stream-iq', query_ns=ns.FOO)

        The following would be correct:

            ret, query = q.expect_many(
                EventPattern('dbus-return', method='Foo'),
                EventPattern('stream-iq', query_ns=ns.FOO),
        ret = [None] * len(patterns)

        while None in ret:
                event = self.wait()
            except TimeoutError:
                self.log('still expecting:')
                for i, pattern in enumerate(patterns):
                    if ret[i] is None:
                        self.log(' - %r' % pattern)

            for i, pattern in enumerate(patterns):
                if ret[i] is None and pattern.match(event):
                    ret[i] = event
                self.log('not handled')

        return ret

    def demand(self, type, **kw):
        pattern = EventPattern(type, **kw)

        event = self.wait()

        if pattern.match(event):
            return event

        self.log('not handled')
        raise RuntimeError('expected %r, got %r' % (pattern, event))

00221 class IteratingEventQueue(BaseEventQueue):
    """Event queue that works by iterating the Twisted reactor."""

    def __init__(self, timeout=None):
        BaseEventQueue.__init__(self, timeout)
        self.events = []

    def wait(self):
        stop = [False]

        def later():
            stop[0] = True

        delayed_call = reactor.callLater(self.timeout, later)

        while (not self.events) and (not stop[0]):

        if self.events:
            return self.events.pop(0)
            raise TimeoutError

    def append(self, event):

    # compatibility
    handle_event = append

00251 class TestEventQueue(BaseEventQueue):
    def __init__(self, events):
        self.events = events

    def wait(self):
        if self.events:
            return self.events.pop(0)
            raise TimeoutError

00262 class EventQueueTest(unittest.TestCase):
    def test_expect(self):
        queue = TestEventQueue([Event('foo'), Event('bar')])
        assert queue.expect('foo').type == 'foo'
        assert queue.expect('bar').type == 'bar'

    def test_expect_many(self):
        queue = TestEventQueue([Event('foo'), Event('bar')])
        bar, foo = queue.expect_many(
        assert bar.type == 'bar'
        assert foo.type == 'foo'

    def test_expect_many2(self):
        # Test that events are only matched against patterns that haven't yet
        # been matched. This tests a regression.
        queue = TestEventQueue([Event('foo', x=1), Event('foo', x=2)])
        foo1, foo2 = queue.expect_many(
        assert foo1.type == 'foo' and foo1.x == 1
        assert foo2.type == 'foo' and foo2.x == 2

    def test_timeout(self):
        queue = TestEventQueue([])
        self.assertRaises(TimeoutError, queue.expect, 'foo')

    def test_demand(self):
        queue = TestEventQueue([Event('foo'), Event('bar')])
        foo = queue.demand('foo')
        assert foo.type == 'foo'

    def test_demand_fail(self):
        queue = TestEventQueue([Event('foo'), Event('bar')])
        self.assertRaises(RuntimeError, queue.demand, 'bar')

00299 def unwrap(x):
    """Hack to unwrap D-Bus values, so that they're easier to read when

    if isinstance(x, list):
        return map(unwrap, x)

    if isinstance(x, tuple):
        return tuple(map(unwrap, x))

    if isinstance(x, dict):
        return dict([(unwrap(k), unwrap(v)) for k, v in x.iteritems()])

    if isinstance(x, dbus.Boolean):
        return bool(x)

    for t in [unicode, str, long, int, float]:
        if isinstance(x, t):
            return t(x)

    return x

00321 def call_async(test, proxy, method, *args, **kw):
    """Call a D-Bus method asynchronously and generate an event for the
    resulting method return/error."""

    def reply_func(*ret):
        test.handle_event(Event('dbus-return', method=method,

    def error_func(err):
        test.handle_event(Event('dbus-error', method=method, error=err,
            name=err.get_dbus_name(), message=str(err)))

    method_proxy = getattr(proxy, method)
    kw.update({'reply_handler': reply_func, 'error_handler': error_func})
    method_proxy(*args, **kw)

def sync_dbus(bus, q, conn):
    # Dummy D-Bus method call
    # This won't do the right thing unless the proxy has a unique name.
    assert conn.object.bus_name.startswith(':')
    root_object = bus.get_object(conn.object.bus_name, '/')
        q, dbus.Interface(root_object, 'org.freedesktop.DBus.Peer'), 'Ping')
    q.expect('dbus-return', method='Ping')

00346 class ProxyWrapper:
    def __init__(self, object, default, others):
        self.object = object
        self.default_interface = dbus.Interface(object, default)
        self.Properties = dbus.Interface(object, dbus.PROPERTIES_IFACE)
        self.TpProperties = \
            dbus.Interface(object, tp_name_prefix + '.Properties')
        self.interfaces = dict([
            (name, dbus.Interface(object, iface))
            for name, iface in others.iteritems()])

    def __getattr__(self, name):
        if name in self.interfaces:
            return self.interfaces[name]

        if name in self.object.__dict__:
            return getattr(self.object, name)

        return getattr(self.default_interface, name)

def wrap_connection(conn):
    return ProxyWrapper(conn, tp_name_prefix + '.Connection',
            (name, tp_name_prefix + '.Connection.Interface.' + name)
            for name in ['Aliasing', 'Avatars', 'Capabilities', 'Contacts',
              'Presence', 'SimplePresence', 'Requests']] +
        [('Peer', 'org.freedesktop.DBus.Peer'),
         ('ContactCapabilities', cs.CONN_IFACE_CONTACT_CAPS),
         ('ContactInfo', cs.CONN_IFACE_CONTACT_INFO),
         ('Location', cs.CONN_IFACE_LOCATION),
         ('Future', tp_name_prefix + '.Connection.FUTURE'),
         ('MailNotification', cs.CONN_IFACE_MAIL_NOTIFICATION),

def wrap_channel(chan, type_, extra=None):
    interfaces = {
        type_: tp_name_prefix + '.Channel.Type.' + type_,
        'Group': tp_name_prefix + '.Channel.Interface.Group',

    if extra:
            (name, tp_name_prefix + '.Channel.Interface.' + name)
            for name in extra]))

    return ProxyWrapper(chan, tp_name_prefix + '.Channel', interfaces)

def make_connection(bus, event_func, name, proto, params):
    cm = bus.get_object(
        tp_name_prefix + '.ConnectionManager.%s' % name,
        tp_path_prefix + '/ConnectionManager/%s' % name)
    cm_iface = dbus.Interface(cm, tp_name_prefix + '.ConnectionManager')

    connection_name, connection_path = cm_iface.RequestConnection(
        proto, params)
    conn = wrap_connection(bus.get_object(connection_name, connection_path))

    return conn

def make_channel_proxy(conn, path, iface):
    bus = dbus.SessionBus()
    chan = bus.get_object(conn.object.bus_name, path)
    chan = dbus.Interface(chan, tp_name_prefix + '.' + iface)
    return chan

# block_reading can be used if the test want to choose when we start to read
# data from the socket.
00413 class EventProtocol(Protocol):
    def __init__(self, queue=None, block_reading=False):
        self.queue = queue
        self.block_reading = block_reading

    def dataReceived(self, data):
        if self.queue is not None:
            self.queue.handle_event(Event('socket-data', protocol=self,

    def sendData(self, data):

    def connectionMade(self):
        if self.block_reading:

    def connectionLost(self, reason=None):
        if self.queue is not None:
            self.queue.handle_event(Event('socket-disconnected', protocol=self))

00434 class EventProtocolFactory(Factory):
    def __init__(self, queue, block_reading=False):
        self.queue = queue
        self.block_reading = block_reading

    def _create_protocol(self):
        return EventProtocol(self.queue, self.block_reading)

    def buildProtocol(self, addr):
        proto = self._create_protocol()
        self.queue.handle_event(Event('socket-connected', protocol=proto))
        return proto

00447 class EventProtocolClientFactory(EventProtocolFactory, ClientFactory):

def watch_tube_signals(q, tube):
    def got_signal_cb(*args, **kwargs):
            args=map(unwrap, args),

        path_keyword='path', member_keyword='member',

def pretty(x):
    return pprint.pformat(unwrap(x))

def assertEquals(expected, value):
    if expected != value:
        raise AssertionError(
            "expected:\n%s\ngot:\n%s" % (pretty(expected), pretty(value)))

def assertSameSets(expected, value):
    exp_set = set(expected)
    val_set = set(value)

    if exp_set != val_set:
        raise AssertionError(
            "expected contents:\n%s\ngot:\n%s" % (
                pretty(exp_set), pretty(val_set)))

def assertNotEquals(expected, value):
    if expected == value:
        raise AssertionError(
            "expected something other than:\n%s" % pretty(value))

def assertContains(element, value):
    if element not in value:
        raise AssertionError(
            "expected:\n%s\nin:\n%s" % (pretty(element), pretty(value)))

def assertDoesNotContain(element, value):
    if element in value:
        raise AssertionError(
            "expected:\n%s\nnot in:\n%s" % (pretty(element), pretty(value)))

def assertLength(length, value):
    if len(value) != length:
        raise AssertionError("expected: length %d, got length %d:\n%s" % (
            length, len(value), pretty(value)))

def assertFlagsSet(flags, value):
    masked = value & flags
    if masked != flags:
        raise AssertionError(
            "expected flags %u, of which only %u are set in %u" % (
            flags, masked, value))

def assertFlagsUnset(flags, value):
    masked = value & flags
    if masked != 0:
        raise AssertionError(
            "expected none of flags %u, but %u are set in %u" % (
            flags, masked, value))

def install_colourer():
    def red(s):
        return '\x1b[31m%s\x1b[0m' % s

    def green(s):
        return '\x1b[32m%s\x1b[0m' % s

    patterns = {
        'handled': green,
        'not handled': red,

    class Colourer:
        def __init__(self, fh, patterns):
            self.fh = fh
            self.patterns = patterns

        def write(self, s):
            f = self.patterns.get(s, lambda x: x)

    sys.stdout = Colourer(sys.stdout, patterns)
    return sys.stdout

if __name__ == '__main__':

Generated by  Doxygen 1.6.0   Back to index