test_draft.py 1.44 KB
Newer Older
xuebingbing's avatar
xuebingbing committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
# -*- coding: utf8 -*-
# Copyright (C) PyZMQ Developers
# Distributed under the terms of the Modified BSD License.

import os
import platform
import time

import pytest
import zmq
from zmq.tests import (
    BaseZMQTestCase, skip_pypy
)


class TestDraftSockets(BaseZMQTestCase):
    def setUp(self):
        if not zmq.DRAFT_API:
            raise pytest.skip("draft api unavailable")
        super(TestDraftSockets, self).setUp()
    

    def test_client_server(self):
        client, server = self.create_bound_pair(zmq.CLIENT, zmq.SERVER)
        client.send(b'request')
        msg = self.recv(server, copy=False)
        assert msg.routing_id is not None
        server.send(b'reply', routing_id=msg.routing_id)
        reply = self.recv(client)
        assert reply == b'reply'

    def test_radio_dish(self):
        dish, radio = self.create_bound_pair(zmq.DISH, zmq.RADIO)
        dish.rcvtimeo = 250
        group = 'mygroup'
        dish.join(group)
        received_count = 0
        received = set()
        sent = set()
        for i in range(10):
            msg = str(i).encode('ascii')
            sent.add(msg)
            radio.send(msg, group=group)
            try:
                recvd = dish.recv()
            except zmq.Again:
                time.sleep(0.1)
            else:
                received.add(recvd)
                received_count += 1
        # assert that we got *something*
        assert len(received.intersection(sent)) >= 5