Getting started with rabbitmq: python – knoldus blogs

1 Описание ключевого слова:

Брокер: объект сервера очереди сообщений.
Exchange: обмен сообщениями, который определяет правила, по которым сообщения направляются в какую очередь.
Очередь: оператор очереди сообщений, каждое сообщение будет помещено в одну или несколько очередей.
Binding: Binding, его роль заключается в связывании обмена и очереди в соответствии с правилами маршрутизации.
Ключ маршрутизации: ключевое слово маршрутизации, обмен доставляет сообщения на основе этого ключевого слова.
vhost: виртуальный хост, несколько брокеров могут быть настроены в брокере, который используется для разделения разрешений разных пользователей.
производитель: производитель сообщений – это программа, которая доставляет сообщения.
Потребитель . Потребитель сообщений – это программа, принимающая сообщения.
channel: канал сообщений. В каждом соединении клиента может быть установлено несколько каналов, и каждый канал представляет задачу сеанса.

4 Подтверждение сообщения

После того, как клиент удаляет сообщение из очереди, может потребоваться некоторое время для завершения процесса.Если клиент делает ошибку и завершает работу ненормально во время этого процесса, и данные не были обработаны, то, к сожалению, эти данные теряются Потому что rabbitmq пометит это сообщение как завершенное по умолчанию, а затем удалит его из очереди,Подтверждение сообщения состоит в том, что клиент принимает сообщение от rabbitmq, и после завершения обработки он отправляет подтверждение, чтобы сообщить rabbitmq, что обработка сообщения завершена, когда rabbitmq получает запрос клиента на получение сообщения, или помечает Для обработки, когда подтверждение получено снова, оно будет помечено как завершенное, а затем удалено из очереди.

Когда rabbitmq обнаруживает, что клиент отключился от самого себя и не получил подтверждение, он помещает сообщение обратно в очередь сообщений и передает его следующему клиенту, чтобы убедиться, что сообщение не потеряно. То есть RabbitMQ передал клиенту Достаточно времени для обработки данных.

Похожее:  НОУ ИНТУИТ | Лекция | Рекомендации семейства X.500

Используйте no_ack на клиенте, чтобы отметить, нужно ли отправлять ack, по умолчанию False, открытое состояние

4 сообщения постоянство

Сохранение сообщения Механизм подтверждения сообщения делает сообщение сервера не потерянным при сбое клиента, но что, если сбой rabbitmq? Как я могу гарантировать, что сообщения в очереди не будут потеряны? Для этого необходимо, чтобы когда продукт отправлял сообщение в очередь, скажите rabbitmq, что сообщение в этой очереди необходимо сохранить.

channel.basic_publish(exchange='',  
                      routing_key="test",  
                      body=message,  
                      properties=pika.BasicProperties(  
                         delivery_mode = 2, 
                      ))  

Конкретный код:

Callback queue

In general doing RPC over RabbitMQ is easy. A client sends a request
message and a server replies with a response message. In order to
receive a response the client needs to send a ‘callback’ queue address with the
request. Let’s try it:

result = channel.queue_declare(queue='', exclusive=True)
callback_queue = result.method.queue

channel.basic_publish(exchange='',
                      routing_key='rpc_queue',
                      properties=pika.BasicProperties(
                            reply_to = callback_queue,
                            ),
                      body=request)

# ... and some code to read a response message from the callback_queue ...

Message properties

The AMQP 0-9-1 protocol predefines a set of 14 properties that go with
a message. Most of the properties are rarely used, with the exception of
the following:

One more thing…

In my opinion, the greatest benefit of TDD is thinking about the problem before coding. So, I think the problem statement that we’re trying to solve in the first application is:

Return an object that is connected with a specific RabbitMQ server and can listen to N queues (running a function when it receives a new message) and also send a message to any given queue.

So, the main features of this application are:

It looks good. I think we covered all the main features of our application. Since we’re developing a module it is important to have a good folder organization. For this one, we’re going to use the following:

RabbitMQ-Python-Adapter-1.0.0/

├── rabbitmq-adapter/
│ ├── __init__.py
│ └── # Any other file of our module

├── config/
│ ├── config.py
│ └── # Any config file

├── tests/
│ ├── __init__.py
│ └── # All integration and unit tests

├── .gitignore
├── LICENSE
├── README.md
└── requirements.txt
* This folder structure is base on the one by Jean-Paul Calderone

Just some footnotes, it is a good practice to use the name of your module with the version of it (I use SemVer as versioning pattern) and create a folder the just the name of your module inside it. In this structure, it is easier to organize your files and apply domain-driven design in your code.

Rabbitmq tutorial на python. часть 1. (перевод)

Наша вторая программа receive.py будет получать сообщения из очереди и печатать их на экране. 

Вы можете спросить почему мы создаем очередь снова – мы же уже создавали ее в нашем предыдущем коде. Мы бы могли избежать этого, если бы были уверены, что очередь уже существует. Например, если программа

send.py

была запущена до этого. Но мы не можем быть уверены в том, какая из программ будет запущена первой. В таких случаях рекомендуется повторять создание очереди в обеих программах.

Список очередей

Вы можете посмотреть какие очереди существуют в RabbitMQ и сколько сообщений они содержат. Вы можете сделать это (как привилегированный пользователь) используя rabbitmqctl tool:

sudo rabbitmqctl list_queues

В Windows:

rabbitmqctl.bat list_queues

Получение сообщений из очереди является более сложным делом. Обработка осуществляется с применением callback функции к очереди. Всякий раз, когда мы получаем сообщение, callback функция вызывается из библиотеки Pika. В нашем случае эта функция напечатает на экране содержимое сообщения.

defcallback(ch, method, properties, body):
    print(" [x] Received %r" % body)

Далее, нам необходимо сказать RabbitMQ что частный случай этой callback функции должен получать сообщения из нашей очереди “hello”:

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)

Чтобы эта команда была успешной, мы должны быть уверены, что очередь, из которой мы хотим получать сообщения, существует. К счастью, мы уверены в этом – мы создали очередь выше, используя

queue_declare

.

Параметр no_ack будет описан позже.

И, наконец, мы вводим бесконечный цикл, который ожидает данные и запускает callback-и, когда это необходимо.

print(' [*] Waiting for messages. To exit press CTRL C')
channel.start_consuming()

Собираем все вместе

Полный исходный код send.py:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

Ссылка на исходный код send.py

Полный исходный код receive.py:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

defcallback(ch, method, properties, body):
    print(" [x] Received %r" % body)

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL C')
channel.start_consuming()

Ссылка на исходный код receive.py

Теперь мы можем опробовать наши программы в терминале. Сначала запустим потребителя(consumer), который будет непрерывно ожидать сообщений:

python receive.py

Сейчас запустим поставщика(producer). Программа-поставщик будет останавливаться после каждого запуска:

python send.py

Ура! Мы смогли отправить первое сообщение через RabbitMQ. Как вы могли заметить, программа

receive.py

не останавливается, она готова к приему дальнейших сообщений и может быть прервана через комбинацию клавиш Ctrl C.

Попробуйте запустить send.py в новом терминале.

Мы научились отправлять и принимать сообщения из именованной очереди. Пришло время перейти к части 2 и построить простую очередь обработки.

Ссылка на оригинал статьи.

§

Требования.

В первом уроке мы написали программу для отправки и приема сообщений из именованной очереди. В этом уроке мы создадим обработчик очереди, которая будет использоваться для распределения трудоемких задач среди нескольких сотрудников.

Основная идея обработчика очередей (обработчика задач) заключается в том, чтобы не выполнять ресурсоемкую задачу(task) немедленно и не ожидать завершения ее выполнения. Вместо этого мы запланируем выполнить эту задачу позже. Мы инкапсулируем задачу в сообщение и отправим его в очередь. Процесс обработчика, выполняющийся в фоновом режиме, выполнит задачи (task) и в конечном итоге выполнит всё задание(job). Когда вы запустите несколько обработчиков, задачи будут распределены между ними.

Эта концепция особенно полезна в Web-приложениях, где невозможно обработать сложную задачу за короткое время окна HTTP-запроса.

Подготовка.

В первой части нашего руководства мы отправили сообщение, содержащее “Hello World!”. Теперь мы будем отправлять строки, на которых основываются сложные задачи(complex task). У нас нет реальных задач, таких как изменение размера(resize) изображений или рендеринга pdf-файлов, поэтому сымитируем нашу занятость с помощью функции time.sleep(). Мы будем считать количество точек в строке, как нашу сложность; каждая точка будет занимать одну секунду “на обработку”. Например, сымитированная задача, состоящая из строки “Hello…” займет 3 секунды.

Мы немного изменим код send.py из предыдущего примера, чтобы разрешить отправку произвольных сообщений из командной строки. Эта программа будет отправлять задачи в наш обработчик очереди, назовем ее new_task.py:

import sys

message = ' '.join(sys.argv[1:]) or"Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, 
                      ))
print(" [x] Sent %r" % message)

Полный исходный код модуля

здесь

.

Наш старый скрипт receive.py также необходимо немного изменить: ему нужно имитировать секунду обработки для каждой точки в теле сообщения. Он будет брать сообщение из очереди и выполнять задачу, поэтому назовем его worker.py:

import time

defcallback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")

Полный исходный код модуля

здесь

. (Внимание! Полная версия, со всеми изменениями внесенными в этой части урока).

Round-robin диспетчерезация.

Одним из преимуществ очередей задач является простое распараллеливание процесса обработки. Если нам необходимо сократить время обработки, мы можем просто добавить больше обработчиков и тем самым сделать масштабирование проще.

Во-первых, давайте попробуем запустить два скрипта worker.py в одно и то же время. Они оба получат сообщения из очереди, но как именно? Давайте посмотрим.

Вам нужно открыть три терминала. В двух будет запущен скрипт worker.py. Эти терминалы будут нашими двумя потребителями – C1 и C2.

python worker.py
python worker.py

В третьем мы будем запускать новые задачи. После того, как вы запустили потребителей, вы можете опубликовать несколько сообщений:

python new_task.py First message.
python new_task.py Second message..
python new_task.py Third message...
python new_task.py Fourth message....
python new_task.py Fifth message.....

Давайте посмотрим, что доставлено нашим обработчикам:

python worker.py
python worker.py

По умолчанию, RabbitMQ будет отправлять сообщение каждому из последующих потребителей последовательно. В среднем каждый потребитель получит одинаковое количество сообщений. Этот способ распространения сообщений называется round-robin. Попробуйте запустить 3 или более обработчика и повторить опыт.

Подтверждение получения сообщения.
Выполнение задачи может занять несколько секунд. У вас может возникнуть вопрос, что произойдет, если потребитель начнет выполнять долгую задачу и “умрет”, лишь частично ее выполнив. В том коде, который мы написали, RabbitMQ доставляет сообщение потребителю один раз, после чего немедленно удаляет его из памяти. В этом случае, если вы убьете обработчик, мы потеряем сообщение которое он обрабатывал. Мы также потеряем все сообщения, которые были отправлены этому обработчику, но еще не были обработаны.

Но мы не хотим терять никаких задач. Если обработчик умер, мы хотим чтобы задание было отправлено другому обработчику.

Чтобы удостовериться, что сообщения никогда не потеряются, RabbitMQ поддерживает подтверждение принятия сообщений. Подтверждение отправляется обратно от потребителя, чтобы сообщить RabbitMQ, что определенное сообщение было принято, обработано и RabbitMQ имеет право удалить его.

Если умрет потребитель (например упадет канал, соединение с RabbitMQ или TCP-соединение) без отправки подтверждения, RabbitMQ поймет, что сообщение не было полностью обработано и поставит его в очередь повторно. Если какой-то из других потребителей будет online в это время, RabbitMQ быстро перенаправит сообщения этому потребителю. Таким образом вы можете быть уверены, что сообщения не теряются, даже если обработчики время от времени умирают.

Нет никаких таймаутов сообщений; RabbitMQ повторно отправит сообщение когда потребитель умрет. Это прекрасно, даже если обработка сообщения занимает очень много времени.

Подтверждение о получении сообщение включено по умолчанию. В предыдущих примерах мы явно выключили их с помощью флага no_ack=True. Пришло время удалить этот флаг и отправить подтверждение от обработчика, после того как мы выполним задачу.

defcallback(ch, method, properties, body):print" [x] Received %r" % (body,)
    time.sleep( body.count('.') )
    print" [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,
                      queue='hello')

Исходный код.

Используя этот исходный код мы можем быть уверены, что даже если вы убьете обработчик используя Ctrl C пока сообщение обрабатывается, ничего не будет потеряно. Вскоре после того как обработчик умрет, все непринятые сообщения будут повторно отправлены.

Забытое подтверждение.

Распространенная ошибка заключается в том, что пользователи забывают про basic_ack. Это простая ошибка, но она приводит к серьезным последствиям. Сообщение будет повторно отправлено, когда ваш клиент выйдет(что может показаться случайной повторной отправкой), но RabbitMQ будет потреблять все больше и больше памяти, так как не сможет отправить другие неподтвержденные сообщения.

Для отладки вы можете использовать rabbitmqctl и вывести поле messages_unacknowledges:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

На Windows просто удалите sudo:

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

Долговременные сообщения.

Мы узнали, как убедиться, что задача не потеряется, даже если потребитель умрет. Но наши задачи могут потеряться, если остановится сервер RabbitMQ.

Когда сервер RabbitMQ отключается или ломается, все наши очереди и сообщения теряются, если вы не объявили иное. Чтобы убедиться, что сообщения не потеряны, нам необходимо выполнить две вещи: нужно пометить очередь и сообщения как долговременные.

Сначала мы должны сделать так, чтобы RabbitMQ никогда не потеряла нашу очередь. Чтобы сделать это, нам необходимо объявить ее долговременной:

channel.queue_declare(queue='hello', durable=True)

Хотя эта команда верна сама по себе, но с нашими настройками она не сработает. Это потому, что мы уже объявили очередь с именем

hello

 как недолговременную. RabbitMQ не позволяет переопределять существующую очередь с другими параметрами и возвращает ошибку любой программе, которая пытается это сделать. Но есть быстрый обходной путь, объявить очередь с другим именем, например task_queue_d:

channel.queue_declare(queue='task_queue_d', durable=True)

Пояснение: в оригинальной статье очередь переименовали в task_queue, но очередь с таким именем уже была создана и она не durable, вследствие чего возникает ошибка исполнение, поэтому я поправил код на очередь с другим именем.

Эти изменения нужно внести в код поставщика и потребителя.

Исходник worker_d.py
Исходник new_task_d.py

Теперь мы уверены, что очередь task_queue_d не будет потеряна, даже если перезапустится сервер RabbitMQ.  Теперь нам нужно пометить наши сообщения как постоянные, объявив свойство delivery_mode со значением 2:

channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, 
                      ))

Примечание о постоянных сообщениях.

Маркировка сообщений как постоянных не дает полной гарантии того, что сообщение не будет потеряно. Несмотря на то, что это указывает RabbitMQ на то, что сообщение необходимо сохранить на диск, все еще остается короткое окно, когда RabbitMQ уже принял сообщение, но все еще не сохранил его. Также, RabbitMQ не делает

fsync(2)

для каждого сообщения – оно может быть просто сохранено в кэше и на самом деле не записано на диск. Гарантированная устойчивость не высока, но этого более чем достаточно для нашей простой очереди задач. Если вам нужны более надежные гарантии, вы можете использовать

подтверждение поставщика

.

Справедливая рассылка.

Как вы могли заметить, рассылка по-прежнему работает не так, как нам нужно. Например, в ситуации с двумя обработчиками, когда все нечетные сообщения тяжелые, а все четные – легкие, один из обработчиков будет постоянно занят, а другой будет простаивать. Ну, RabbitMQ ничего не знает об этом и будет равномерно распределять сообщения.

Это происходит потому что RabbitMQ просто отсылает сообщение, когда оно попадает в очередь. Он не смотрит на количество неподтвержденных сообщений отправленных потребителю. Он слепо посылает каждое n-ое сообщение n-ому потребителю:

Чтобы исправить это, мы можем использовать метод

basic_qos

с параметром

prefetch_count=1

. Это сообщает RabbitMQ, чтобы он не давал обработчику более одного сообщения в единицу времени. Или, другими словами, не отправлять новое сообщение пока не завершится процесс обработки и не будет получено подтверждение на предыдущее сообщение. Вместо этого он отправит его следующему обработчику, который не занят.

channel.basic_qos(prefetch_count=1)

Примечание о размере очереди.

Если все обработчики заняты, ваша очередь может заполниться. Вам необходимо следить за этим и, возможно, добавить несколько обработчиков или использовать TTL сообщений.

Сложим все вместе.

Окончательный код для нашего сценария new_task.py:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or"Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, 
                      ))
print(" [x] Sent %r" % message)
connection.close()

new_task.py

И нашего обработчика:

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL C')

defcallback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()

worker.py

Используя подтверждение сообщений и prefetch_count вы можете настроить обработку очереди. Настройка долговечности позволяет задачам выживать даже при перезапуске RabbitMQ.

Теперь мы можем перейти к третьему уроку и научиться доставлять одно и то же сообщение нескольким потребителям.

Оригинал статьи на vhod-v-lichnyj-kabinet.ru

Summary

Getting Started with RabbitMQ: Python - Knoldus Blogs

digraph {
bgcolor=transparent;
truecolor=true;
rankdir=LR;
node [style=”filled”];
//
subgraph cluster_C {
label=”Client”;
color=transparent;
C [label=”C”, fillcolor=”#00ffff”];
};
subgraph cluster_XXXa {
color=transparent;
subgraph cluster_Note {
color=transparent;
N [label=”Requestnreply_to=amqp.gen-Xa2…ncorrelation_id=abc”,
fontsize=12,
shape=note];
};
subgraph cluster_Reply {
color=transparent;
R [label=”Replyncorrelation_id=abc”,
fontsize=12,
shape=note];
};
};
subgraph cluster_XXXb {
color=transparent;
subgraph cluster_RPC {
label=”rpc_queue”;
color=transparent;
RPC [label=”{<s>||||<e>}”, fillcolor=”red”, shape=”record”];
};
subgraph cluster_REPLY {
label=”reply_to=amq.gen-Xa2…”;
color=transparent;
REPLY [label=”{<s>||||<e>}”, fillcolor=”red”, shape=”record”];
};
};
subgraph cluster_W {
label=”Server”;
color=transparent;
W [label=”S”, fillcolor=”#00ffff”];
};
//
C -> N;
N -> RPC:s;
RPC:e -> W;
W -> REPLY:e;
REPLY:s -> R;
R -> C;
}

Our RPC will work like this:

Wait… what is tdd? 🤔

In a nutshell, TDD (test-driven development) is a development process oriented to testing. Instead of coding your application from scratch, in TDD you start by coding the tests of your application. You can use a lot of packages to help you coding those tests, but with automated testing, it is way easier to code and maintain your applications.

Testing is one of the most valuable best-practices. It helps your code to be more scalable and also stable. Now, let’s understand a little bit about how to write tests. There are 3 main types of tests:

A common TDD cycle
  1. Unit Tests: they examine the logic of your code. You test each function separately, mocking any integrations, and assert that every time you input some data you get the expected output. It is a very low-level test and usually, you have a single file for each part of your code.
  2. Integration Tests: they test some larger integrated areas of your code. You can mock some other parts of your application, but the idea is to test a “piece” of your application integrated with every dependency of that feature. For example, in a booking website an integration test would assert that your entire checkout feature is working as expected.
  3. End-to-end (Acceptance) Tests: this is the largest and more expensive automated test (although is way cheaper than manual testing). In an end-to-end test, you write a code to test your entire application.

Ok, this is TDD 101. There still is a LOT to cover if you want to learn more about it, but I will write something about it in the future.

You probably noticed that I’ve just written about functions. But, what about classes and OOP? Well, I am going to write this program with functional programming.

What this tutorial focuses on

In the previous tutorial we created a work
queue. The assumption behind a work queue is that each task is
delivered to exactly one worker. In this part we’ll do something
completely different — we’ll deliver a message to multiple
consumers. This pattern is known as “publish/subscribe”.

To illustrate the pattern, we’re going to build a simple logging
system. It will consist of two programs — the first will emit log
messages and the second will receive and print them.

In our logging system every running copy of the receiver program will
get the messages. That way we’ll be able to run one receiver and
direct the logs to disk; and at the same time we’ll be able to run
another receiver and see the logs on the screen.

Essentially, published log messages are going to be broadcast to all
the receivers.

Finally! let’s code something 🙏

Every time I start a new project I like to write down some main integration test cases. It helps me to understand what I need to do and organize my thoughts. The first thing that I do is just write down the test assertions. I use Pytestas my testing framework.

So, my main integration test code would start like:

RabbitMQ-Python-Adapter-1.0.0/tests/test_integration.py

As you can see, I’ve not written any test yet. This is usually how I start a TDD project. I just write the test functions, then I start developing them. It is a good way to organize your coding process.

Now, to run this test you need to install the Pytest framework. It is pretty easy just run pip install -U pytest and it will run the installation. If you have any problems you can check their docs.

After the installation is completed, go to your terminal, in the root project folder and just run pytest. It will start running your tests. Every file that starts with test_* will be matched and run.

Well, since everything is asserting True, you will get a success output like this one:

====================== test session starts ======================
platform linux — Python 3.4.5, pytest-3.0.5, py-1.4.32, pluggy-0.4.0
rootdir: /home/odelucca/Servers/blog/RabbitMQ-Python-Adapter-1.0.0, inifile:
collected 4 items
tests/integration/test_integration.py ….==================== 4 passed in 0.01 seconds ====================

Now we need to code some real tests. The first thing we’re going to do is write the factory scripts. To do so, I’m going to write the test_channel. This test needs to create a new RabbitMQ channel.

RabbitMQ-Python-Adapter-1.0.0/tests/test_channel.py

In the first 6 lines, we are just importing every dependency to run this test. You’ve probably noticed some unknown names here:unittest.mockand tests.__mocks__.

The first one is a mocking library for Python. It is already built in on your Python installation. Mocking is a testing technique where you bypass the usage of a function returning any data that you choose. For example, in this test, we’re going to mock the Pika (an external library) usage.

Every time the tested function calls the mocked function, they will not be called and instead will return something that we have chosen. The second unknown imported lib is the tests.__mock__. This is where I put all my mocks in order to reuse them. It is good practice to do so.

The mock that we’re going to use is the pika mock (imported on line 6). Pika is a pure Python implementation of the AMQP protocol. It helps us to connect with our RabbitMQ server.

RabbitMQ-Python-Adapter-1.0.0/tests/__mocks__/pika.py

Line 8 and 9 is where I import my configs. Usually, I put all configuration in a YAML file on the config folder. To do so, I set the absolute path to my config folder in the CONFIG environment variable.

Also, I need to create a config.py script in order to import the right environment config. For this module, we’re going to need just the testing config, because almost every configuration will come from which function is calling our module. Now, you need to create a config.py file inside your config folder:

RabbitMQ-Python-Adapter-1.0.0/config/config.py

This is a pretty simple script. It just gets all the yaml files on the config folder and loads them. Also, I usually create a parameter inside every config for the following environments: test, development, staging and production.

Now, you need to create the rabbitmq.yaml file:

RabbitMQ-Python-Adapter-1.0.0/config/rabbitmq.yaml

It is a very simple config file. In a larger project, you would have a different object for every environment.

Now, we need to add the pyyaml and dotmap as dependencies to our projects (don’t forget to run pip install -r requirements.txt to install them). Our requirements.txt might look like:

pytest
pyyaml
dotmap

Back to our test_channel.py file, the lines 11, 21 and 32 are Pytest markers. They are metadata that you can use to interact with Pytest itself.

The idea here is to define if the following test is a unit or integration test. Integration tests usually take way longer to run than unit tests. So, you could just want to run only unit tests in a CD structure. In order to do so, instead of running pytest you could run pytest -m unit and then it will test only the functions that are marked as unit.

But, to work with custom markers, you need to change your pytest.ini. This is a file that you put in the root of your project, with the following contents:

RabbitMQ-Python-Adapter-1.0.0/pytest.ini

To make our life easier, I’ve also added some environment variables that we’re using: ENV and CONFIG. But, Pytest doesn’t have this env ini config by default. You need to install the pytest-env lib. Now, our requirements.txt will be looking like this:

pytest
pytest-env
pyyaml
dotmap

Back to our test, in line 12 we start writing the first unit test. The idea behind the channel script is to create a Pika Channel that can communicate with our RabbitMQ server. To do so, we need to:

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *