Reposting a draft that may come in handy for a future clustering setup.
April 24th, 2009 by skysbird
Source: http://app.arat.us/blog/?p=112
Writing an AMQP connector with Python, Twisted, Trial and txAMQP
Published on April 24th, 2009 by Dan Reverri in apparatus Tools:
In this article we’ll be writing an AMQP connector that we can re-use in future AMQP-based projects. We’ll be using the Twisted testing component Trial to test our code as we develop it. To start, let’s create our project directory, set up a test case, and run our first test.
mkdir connector
cd connector
mkdir test
vi test/test.py
The resulting directory structure will be:
connector/
connector/test/
connector/test/test.py
In test.py:
from twisted.trial import unittest

class AmqpConnectorTest(unittest.TestCase):
    def setUp(self):
        pass

    def tearDown(self):
        pass

    def test_connector_init(self):
        connector = AmqpConnector()
Now we can run our first test and enjoy the sweet, sweet failure. Note that tests are run from within the connector directory.
trial test/test.py
test
  AmqpConnectorTest
    test_connector_init ... [ERROR]
===============================================================================
[ERROR]: test.AmqpConnectorTest.test_connector_init
Traceback (most recent call last):
  File "test/test.py", line 11, in test_connector_init
    connector = AmqpConnector()
exceptions.NameError: global name 'AmqpConnector' is not defined
-------------------------------------------------------------------------------
Ran 1 tests in 0.018s
FAILED (errors=1)
Ok, let’s fix this by implementing connector.py in the connector/ directory.
class AmqpConnector():
    pass
Our directory structure is now:
connector/
connector/connector.py
connector/test/
connector/test/test.py
We update our test to import the new connector and run trial.
from connector import AmqpConnector
from twisted.trial import unittest

class AmqpConnectorTest(unittest.TestCase):
    def setUp(self):
        pass

    def tearDown(self):
        pass

    def test_connector_init(self):
        connector = AmqpConnector()
trial test/*.py
test
  AmqpConnectorTest
    test_connector_init ... [OK]
-------------------------------------------------------------------------------
Ran 1 tests in 0.003s
PASSED (successes=1)
Thinking about what our connector must do and referencing the examples provided with txAMQP we’ll implement tests for the following:
* connect
* authenticate
* channel_open
And here are the tests:
from twisted.trial import unittest
from twisted.internet.defer import inlineCallbacks
from connector import AmqpConnector

class AmqpConnectorTest(unittest.TestCase):
    def setUp(self):
        self.connector = AmqpConnector()

    def tearDown(self):
        pass

    @inlineCallbacks
    def test_connect(self):
        connector = self.connector
        yield connector.connect()

    @inlineCallbacks
    def test_authenticate(self):
        connector = self.connector
        yield connector.connect()
        yield connector.authenticate()

    @inlineCallbacks
    def test_channel_open(self):
        connector = self.connector
        yield connector.connect()
        yield connector.authenticate()
        yield connector.channel_open()
You’ll notice that I’ve added an additional import to provide the inlineCallbacks decorator. This allows us to write asynchronous code as if it were synchronous code: each yield waits for a Deferred to fire before the method continues. I’ve also moved the connector initialization to setUp() and removed the init test. The setUp() method is run before each test and the tearDown() method is run after each test. This means that each test method gets a fresh connector object to work with (self.connector).
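The setUp()/tearDown() ordering described above can be seen with a small sketch. It uses the standard library's unittest rather than Trial so that it runs without a reactor, and assumes Trial follows the same per-test contract. Resource, LifecycleTest and run_lifecycle_demo are hypothetical names made up for this illustration.

```python
import unittest


class Resource(object):
    """Hypothetical stand-in for AmqpConnector; numbers each instance."""
    created = 0

    def __init__(self):
        Resource.created += 1
        self.id = Resource.created


class LifecycleTest(unittest.TestCase):
    events = []  # shared log of the order in which hooks and tests run

    def setUp(self):
        # Runs before every test method: build a brand new resource.
        self.resource = Resource()
        self.events.append("setUp %d" % self.resource.id)

    def tearDown(self):
        # Runs after every test method, whether it passed or failed.
        self.events.append("tearDown %d" % self.resource.id)

    def test_a(self):
        self.events.append("test_a %d" % self.resource.id)

    def test_b(self):
        self.events.append("test_b %d" % self.resource.id)


def run_lifecycle_demo():
    """Run both tests and return (event log, TestResult)."""
    del LifecycleTest.events[:]
    Resource.created = 0
    suite = unittest.TestLoader().loadTestsFromTestCase(LifecycleTest)
    result = unittest.TestResult()
    suite.run(result)
    return list(LifecycleTest.events), result
```

Calling run_lifecycle_demo() returns a log in which every test is wrapped by its own setUp and tearDown, and the resource id differs between test_a and test_b.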
To get an idea of how to start writing our connector, let’s review the test code that comes with txAMQP. In a previous article I went over fetching txAMQP from Launchpad. If you have not done that yet, you should do it now. Review the file “src/txamqp/testlib.py”. We’re going to “borrow” the __init__() and connect() methods but rearrange them a bit to move authentication into a separate method.
I’ve also changed the reference to the spec file. We’ll need to copy “amqp0-8.xml” into the connector directory.
from twisted.internet.defer import inlineCallbacks, Deferred
from txamqp.protocol import AMQClient, TwistedDelegate
from twisted.internet import protocol, reactor
import txamqp.spec

class AmqpConnector():
    def __init__(self):
        # Defaults assume a local RabbitMQ broker with the guest account.
        self.host = "localhost"
        self.port = 5672
        self.spec = "amqp0-8.xml"
        self.user = "guest"
        self.password = "guest"
        self.vhost = "/"

    @inlineCallbacks
    def connect(self, host=None, port=None, spec=None, vhost=None):
        # Fall back to the instance defaults for any argument not supplied.
        host = host or self.host
        port = port or self.port
        spec = spec or self.spec
        vhost = vhost or self.vhost
        delegate = TwistedDelegate()
        # onConn fires with the AMQClient instance once the connection is made.
        onConn = Deferred()
        f = protocol._InstanceFactory(reactor, AMQClient(delegate, vhost, txamqp.spec.load(spec)), onConn)
        # Keep the connector so the TCP connection can be closed later.
        self.cnx = reactor.connectTCP(host, port, f)
        self.connection = yield onConn

    @inlineCallbacks
    def authenticate(self, user=None, password=None):
        user = user or self.user
        password = password or self.password
        yield self.connection.start({"LOGIN": user, "PASSWORD": password})

    @inlineCallbacks
    def channel_open(self):
        self.channel = yield self.connection.channel(1)
        yield self.channel.channel_open()
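One detail of the `value or self.value` idiom used in connect() and authenticate(): it treats every falsy argument (0, an empty string) as "not supplied". That is harmless for the values used here, but a stricter variant that only falls back on None could be sketched as follows. The pick helper and the Settings class are hypothetical names for this illustration and are not part of txAMQP or the connector.

```python
def pick(value, default):
    """Return value unless it is None, in which case return default."""
    return default if value is None else value


class Settings(object):
    """Hypothetical holder mirroring a few of the connector's defaults."""

    def __init__(self):
        self.host = "localhost"
        self.port = 5672
        self.password = "guest"

    def resolve(self, host=None, port=None, password=None):
        # Only None means "use the default"; "" and 0 are kept as given.
        return (pick(host, self.host),
                pick(port, self.port),
                pick(password, self.password))
```

With this variant, Settings().resolve(password="") keeps the empty password, whereas `"" or "guest"` would silently replace it with "guest".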
When I first ran these tests they all failed:
exceptions.IOError: [Errno 2] No such file or directory: 'amqp0-8.xml'
This stumped me for a while until I realized local files were being searched for in the “_trial_temp” directory that Trial uses. A simple fix to setUp() resolved the issue:
def setUp(self):
    self.connector = AmqpConnector()
    self.connector.spec = "../amqp0-8.xml"
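The relative path above depends on Trial's working directory. An alternative, sketched here as an assumption rather than something the original setup does, is to build the path from the location of the test module itself. The project_file helper is a hypothetical name; it assumes the layout shown earlier, with the spec file one directory above test/.

```python
import os


def project_file(test_file, name):
    """Return the path of `name` in the directory above test_file's directory."""
    test_dir = os.path.dirname(os.path.abspath(test_file))
    return os.path.join(os.path.dirname(test_dir), name)


# Hypothetical usage at the top of test/test.py, evaluated once at import time:
#     SPEC = project_file(__file__, "amqp0-8.xml")
# and then in setUp():
#     self.connector.spec = SPEC
```

Computing the path once at module level, rather than inside setUp(), is the safer choice if `__file__` happens to be a relative path, since the working directory may have changed by the time the tests run.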
Please remember that these tests assume you have a RabbitMQ instance running locally on port 5672 that accepts the user “guest” with the password “guest”. I mention this only because my next test run failed when RabbitMQ was not running.
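To tell "the broker is down" apart from a real test failure, a quick TCP probe can be run before the tests. The sketch below uses only the standard library. broker_reachable is a hypothetical helper, and it only checks that something accepts connections on the port, not that it speaks AMQP or accepts the guest login.

```python
import socket


def broker_reachable(host="localhost", port=5672, timeout=1.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        conn = socket.create_connection((host, port), timeout)
    except (socket.error, socket.timeout):
        # Refused, unreachable or timed out: treat as "not running".
        return False
    conn.close()
    return True
```

Because the call blocks, it belongs at import time rather than inside a test method. One possible use is to set Trial's `skip` attribute on the test class when the probe fails, so the run reports skipped tests instead of errors.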
With RabbitMQ running, the next test run was slightly more successful:
test
  AmqpConnectorTest
    test_authenticate ... [OK]
                          [ERROR]
    test_channel_open ... [OK]
                          [ERROR]
    test_connect ... [OK]
                     [ERROR]
===============================================================================
[ERROR]: test.AmqpConnectorTest.test_authenticate
Traceback (most recent call last):
Failure: twisted.trial.util.DirtyReactorAggregateError: Reactor was unclean.
Selectables:
<
===============================================================================
[ERROR]: test.AmqpConnectorTest.test_channel_open
Traceback (most recent call last):
Failure: twisted.trial.util.DirtyReactorAggregateError: Reactor was unclean.
Selectables:
<
===============================================================================
[ERROR]: test.AmqpConnectorTest.test_connect
Traceback (most recent call last):
Failure: twisted.trial.util.DirtyReactorAggregateError: Reactor was unclean.
Selectables:
<
-------------------------------------------------------------------------------
Ran 3 tests in 1.199s
FAILED (errors=3, successes=3)
The message “Reactor was unclean.” indicates our tests are leaving an open connection. We can fix this by closing the connection in tearDown():
@inlineCallbacks
def tearDown(self):
    yield self.connector.cnx.disconnect()
One final test run should give us all green lights:
test
  AmqpConnectorTest
    test_authenticate ... [OK]
    test_channel_open ... [OK]
    test_connect ... [OK]
-------------------------------------------------------------------------------
Ran 3 tests in 1.195s
PASSED (successes=3)