2128 lines
73 KiB
Python
2128 lines
73 KiB
Python
"""AMQP Channels."""
|
|
# Copyright (C) 2007-2008 Barry Pederson <bp@barryp.org>
|
|
|
|
import logging
|
|
import socket
|
|
from collections import defaultdict
|
|
from queue import Queue
|
|
|
|
from vine import ensure_promise
|
|
|
|
from . import spec
|
|
from .abstract_channel import AbstractChannel
|
|
from .exceptions import (ChannelError, ConsumerCancelled, MessageNacked,
|
|
RecoverableChannelError, RecoverableConnectionError,
|
|
error_for_code)
|
|
from .protocol import queue_declare_ok_t
|
|
|
|
__all__ = ('Channel',)
|
|
|
|
AMQP_LOGGER = logging.getLogger('amqp')
|
|
|
|
REJECTED_MESSAGE_WITHOUT_CALLBACK = """\
|
|
Rejecting message with delivery tag %r for reason of having no callbacks.
|
|
consumer_tag=%r exchange=%r routing_key=%r.\
|
|
"""
|
|
|
|
|
|
class VDeprecationWarning(DeprecationWarning):
|
|
pass
|
|
|
|
|
|
class Channel(AbstractChannel):
|
|
"""AMQP Channel.
|
|
|
|
The channel class provides methods for a client to establish a
|
|
virtual connection - a channel - to a server and for both peers to
|
|
operate the virtual connection thereafter.
|
|
|
|
GRAMMAR::
|
|
|
|
channel = open-channel *use-channel close-channel
|
|
open-channel = C:OPEN S:OPEN-OK
|
|
use-channel = C:FLOW S:FLOW-OK
|
|
/ S:FLOW C:FLOW-OK
|
|
/ functional-class
|
|
close-channel = C:CLOSE S:CLOSE-OK
|
|
/ S:CLOSE C:CLOSE-OK
|
|
|
|
Create a channel bound to a connection and using the specified
|
|
numeric channel_id, and open on the server.
|
|
|
|
The 'auto_decode' parameter (defaults to True), indicates
|
|
whether the library should attempt to decode the body
|
|
of Messages to a Unicode string if there's a 'content_encoding'
|
|
property for the message. If there's no 'content_encoding'
|
|
property, or the decode raises an Exception, the message body
|
|
is left as plain bytes.
|
|
"""
|
|
|
|
_METHODS = {
|
|
spec.method(spec.Channel.Close, 'BsBB'),
|
|
spec.method(spec.Channel.CloseOk),
|
|
spec.method(spec.Channel.Flow, 'b'),
|
|
spec.method(spec.Channel.FlowOk, 'b'),
|
|
spec.method(spec.Channel.OpenOk),
|
|
spec.method(spec.Exchange.DeclareOk),
|
|
spec.method(spec.Exchange.DeleteOk),
|
|
spec.method(spec.Exchange.BindOk),
|
|
spec.method(spec.Exchange.UnbindOk),
|
|
spec.method(spec.Queue.BindOk),
|
|
spec.method(spec.Queue.UnbindOk),
|
|
spec.method(spec.Queue.DeclareOk, 'sll'),
|
|
spec.method(spec.Queue.DeleteOk, 'l'),
|
|
spec.method(spec.Queue.PurgeOk, 'l'),
|
|
spec.method(spec.Basic.Cancel, 's'),
|
|
spec.method(spec.Basic.CancelOk, 's'),
|
|
spec.method(spec.Basic.ConsumeOk, 's'),
|
|
spec.method(spec.Basic.Deliver, 'sLbss', content=True),
|
|
spec.method(spec.Basic.GetEmpty, 's'),
|
|
spec.method(spec.Basic.GetOk, 'Lbssl', content=True),
|
|
spec.method(spec.Basic.QosOk),
|
|
spec.method(spec.Basic.RecoverOk),
|
|
spec.method(spec.Basic.Return, 'Bsss', content=True),
|
|
spec.method(spec.Tx.CommitOk),
|
|
spec.method(spec.Tx.RollbackOk),
|
|
spec.method(spec.Tx.SelectOk),
|
|
spec.method(spec.Confirm.SelectOk),
|
|
spec.method(spec.Basic.Ack, 'Lb'),
|
|
spec.method(spec.Basic.Nack, 'Lb'),
|
|
}
|
|
_METHODS = {m.method_sig: m for m in _METHODS}
|
|
|
|
_ALLOWED_METHODS_WHEN_CLOSING = (
|
|
spec.Channel.Close, spec.Channel.CloseOk
|
|
)
|
|
|
|
def __init__(self, connection,
|
|
channel_id=None, auto_decode=True, on_open=None):
|
|
if channel_id:
|
|
connection._claim_channel_id(channel_id)
|
|
else:
|
|
channel_id = connection._get_free_channel_id()
|
|
|
|
AMQP_LOGGER.debug('using channel_id: %s', channel_id)
|
|
|
|
super().__init__(connection, channel_id)
|
|
|
|
self.is_open = False
|
|
self.active = True # Flow control
|
|
self.returned_messages = Queue()
|
|
self.callbacks = {}
|
|
self.cancel_callbacks = {}
|
|
self.auto_decode = auto_decode
|
|
self.events = defaultdict(set)
|
|
self.no_ack_consumers = set()
|
|
|
|
self.on_open = ensure_promise(on_open)
|
|
|
|
# set first time basic_publish_confirm is called
|
|
# and publisher confirms are enabled for this channel.
|
|
self._confirm_selected = False
|
|
if self.connection.confirm_publish:
|
|
self.basic_publish = self.basic_publish_confirm
|
|
|
|
__slots__ = (
|
|
"is_open",
|
|
"active",
|
|
"returned_messages",
|
|
"callbacks",
|
|
"cancel_callbacks",
|
|
"events",
|
|
"no_ack_consumers",
|
|
"on_open",
|
|
"_confirm_selected",
|
|
)
|
|
|
|
def then(self, on_success, on_error=None):
|
|
return self.on_open.then(on_success, on_error)
|
|
|
|
def _setup_listeners(self):
|
|
self._callbacks.update({
|
|
spec.Channel.Close: self._on_close,
|
|
spec.Channel.CloseOk: self._on_close_ok,
|
|
spec.Channel.Flow: self._on_flow,
|
|
spec.Channel.OpenOk: self._on_open_ok,
|
|
spec.Basic.Cancel: self._on_basic_cancel,
|
|
spec.Basic.CancelOk: self._on_basic_cancel_ok,
|
|
spec.Basic.Deliver: self._on_basic_deliver,
|
|
spec.Basic.Return: self._on_basic_return,
|
|
spec.Basic.Ack: self._on_basic_ack,
|
|
spec.Basic.Nack: self._on_basic_nack,
|
|
})
|
|
|
|
def collect(self):
|
|
"""Tear down this object.
|
|
|
|
Best called after we've agreed to close with the server.
|
|
"""
|
|
AMQP_LOGGER.debug('Closed channel #%s', self.channel_id)
|
|
self.is_open = False
|
|
channel_id, self.channel_id = self.channel_id, None
|
|
connection, self.connection = self.connection, None
|
|
if connection:
|
|
connection.channels.pop(channel_id, None)
|
|
try:
|
|
connection._used_channel_ids.remove(channel_id)
|
|
except ValueError:
|
|
# channel id already removed
|
|
pass
|
|
self.callbacks.clear()
|
|
self.cancel_callbacks.clear()
|
|
self.events.clear()
|
|
self.no_ack_consumers.clear()
|
|
|
|
def _do_revive(self):
|
|
self.is_open = False
|
|
self.open()
|
|
|
|
def close(self, reply_code=0, reply_text='', method_sig=(0, 0),
|
|
argsig='BsBB'):
|
|
"""Request a channel close.
|
|
|
|
This method indicates that the sender wants to close the
|
|
channel. This may be due to internal conditions (e.g. a forced
|
|
shut-down) or due to an error handling a specific method, i.e.
|
|
an exception. When a close is due to an exception, the sender
|
|
provides the class and method id of the method which caused
|
|
the exception.
|
|
|
|
RULE:
|
|
|
|
After sending this method any received method except
|
|
Channel.Close-OK MUST be discarded.
|
|
|
|
RULE:
|
|
|
|
The peer sending this method MAY use a counter or timeout
|
|
to detect failure of the other peer to respond correctly
|
|
with Channel.Close-OK..
|
|
|
|
PARAMETERS:
|
|
reply_code: short
|
|
|
|
The reply code. The AMQ reply codes are defined in AMQ
|
|
RFC 011.
|
|
|
|
reply_text: shortstr
|
|
|
|
The localised reply text. This text can be logged as an
|
|
aid to resolving issues.
|
|
|
|
class_id: short
|
|
|
|
failing method class
|
|
|
|
When the close is provoked by a method exception, this
|
|
is the class of the method.
|
|
|
|
method_id: short
|
|
|
|
failing method ID
|
|
|
|
When the close is provoked by a method exception, this
|
|
is the ID of the method.
|
|
"""
|
|
try:
|
|
if self.connection is None:
|
|
return
|
|
if self.connection.channels is None:
|
|
return
|
|
if not self.is_open:
|
|
return
|
|
|
|
self.is_closing = True
|
|
return self.send_method(
|
|
spec.Channel.Close, argsig,
|
|
(reply_code, reply_text, method_sig[0], method_sig[1]),
|
|
wait=spec.Channel.CloseOk,
|
|
)
|
|
finally:
|
|
self.is_closing = False
|
|
self.connection = None
|
|
|
|
def _on_close(self, reply_code, reply_text, class_id, method_id):
|
|
"""Request a channel close.
|
|
|
|
This method indicates that the sender wants to close the
|
|
channel. This may be due to internal conditions (e.g. a forced
|
|
shut-down) or due to an error handling a specific method, i.e.
|
|
an exception. When a close is due to an exception, the sender
|
|
provides the class and method id of the method which caused
|
|
the exception.
|
|
|
|
RULE:
|
|
|
|
After sending this method any received method except
|
|
Channel.Close-OK MUST be discarded.
|
|
|
|
RULE:
|
|
|
|
The peer sending this method MAY use a counter or timeout
|
|
to detect failure of the other peer to respond correctly
|
|
with Channel.Close-OK..
|
|
|
|
PARAMETERS:
|
|
reply_code: short
|
|
|
|
The reply code. The AMQ reply codes are defined in AMQ
|
|
RFC 011.
|
|
|
|
reply_text: shortstr
|
|
|
|
The localised reply text. This text can be logged as an
|
|
aid to resolving issues.
|
|
|
|
class_id: short
|
|
|
|
failing method class
|
|
|
|
When the close is provoked by a method exception, this
|
|
is the class of the method.
|
|
|
|
method_id: short
|
|
|
|
failing method ID
|
|
|
|
When the close is provoked by a method exception, this
|
|
is the ID of the method.
|
|
"""
|
|
self.send_method(spec.Channel.CloseOk)
|
|
if not self.connection.is_closing:
|
|
self._do_revive()
|
|
raise error_for_code(
|
|
reply_code, reply_text, (class_id, method_id), ChannelError,
|
|
)
|
|
|
|
def _on_close_ok(self):
|
|
"""Confirm a channel close.
|
|
|
|
This method confirms a Channel.Close method and tells the
|
|
recipient that it is safe to release resources for the channel
|
|
and close the socket.
|
|
|
|
RULE:
|
|
|
|
A peer that detects a socket closure without having
|
|
received a Channel.Close-Ok handshake method SHOULD log
|
|
the error.
|
|
"""
|
|
self.collect()
|
|
|
|
def flow(self, active):
|
|
"""Enable/disable flow from peer.
|
|
|
|
This method asks the peer to pause or restart the flow of
|
|
content data. This is a simple flow-control mechanism that a
|
|
peer can use to avoid overflowing its queues or otherwise
|
|
finding itself receiving more messages than it can process.
|
|
Note that this method is not intended for window control. The
|
|
peer that receives a request to stop sending content should
|
|
finish sending the current content, if any, and then wait
|
|
until it receives a Flow restart method.
|
|
|
|
RULE:
|
|
|
|
When a new channel is opened, it is active. Some
|
|
applications assume that channels are inactive until
|
|
started. To emulate this behaviour a client MAY open the
|
|
channel, then pause it.
|
|
|
|
RULE:
|
|
|
|
When sending content data in multiple frames, a peer
|
|
SHOULD monitor the channel for incoming methods and
|
|
respond to a Channel.Flow as rapidly as possible.
|
|
|
|
RULE:
|
|
|
|
A peer MAY use the Channel.Flow method to throttle
|
|
incoming content data for internal reasons, for example,
|
|
when exchanging data over a slower connection.
|
|
|
|
RULE:
|
|
|
|
The peer that requests a Channel.Flow method MAY
|
|
disconnect and/or ban a peer that does not respect the
|
|
request.
|
|
|
|
PARAMETERS:
|
|
active: boolean
|
|
|
|
start/stop content frames
|
|
|
|
If True, the peer starts sending content frames. If
|
|
False, the peer stops sending content frames.
|
|
"""
|
|
return self.send_method(
|
|
spec.Channel.Flow, 'b', (active,), wait=spec.Channel.FlowOk,
|
|
)
|
|
|
|
def _on_flow(self, active):
|
|
"""Enable/disable flow from peer.
|
|
|
|
This method asks the peer to pause or restart the flow of
|
|
content data. This is a simple flow-control mechanism that a
|
|
peer can use to avoid overflowing its queues or otherwise
|
|
finding itself receiving more messages than it can process.
|
|
Note that this method is not intended for window control. The
|
|
peer that receives a request to stop sending content should
|
|
finish sending the current content, if any, and then wait
|
|
until it receives a Flow restart method.
|
|
|
|
RULE:
|
|
|
|
When a new channel is opened, it is active. Some
|
|
applications assume that channels are inactive until
|
|
started. To emulate this behaviour a client MAY open the
|
|
channel, then pause it.
|
|
|
|
RULE:
|
|
|
|
When sending content data in multiple frames, a peer
|
|
SHOULD monitor the channel for incoming methods and
|
|
respond to a Channel.Flow as rapidly as possible.
|
|
|
|
RULE:
|
|
|
|
A peer MAY use the Channel.Flow method to throttle
|
|
incoming content data for internal reasons, for example,
|
|
when exchanging data over a slower connection.
|
|
|
|
RULE:
|
|
|
|
The peer that requests a Channel.Flow method MAY
|
|
disconnect and/or ban a peer that does not respect the
|
|
request.
|
|
|
|
PARAMETERS:
|
|
active: boolean
|
|
|
|
start/stop content frames
|
|
|
|
If True, the peer starts sending content frames. If
|
|
False, the peer stops sending content frames.
|
|
"""
|
|
self.active = active
|
|
self._x_flow_ok(self.active)
|
|
|
|
def _x_flow_ok(self, active):
|
|
"""Confirm a flow method.
|
|
|
|
Confirms to the peer that a flow command was received and
|
|
processed.
|
|
|
|
PARAMETERS:
|
|
active: boolean
|
|
|
|
current flow setting
|
|
|
|
Confirms the setting of the processed flow method:
|
|
True means the peer will start sending or continue
|
|
to send content frames; False means it will not.
|
|
"""
|
|
return self.send_method(spec.Channel.FlowOk, 'b', (active,))
|
|
|
|
def open(self):
|
|
"""Open a channel for use.
|
|
|
|
This method opens a virtual connection (a channel).
|
|
|
|
RULE:
|
|
|
|
This method MUST NOT be called when the channel is already
|
|
open.
|
|
|
|
PARAMETERS:
|
|
out_of_band: shortstr (DEPRECATED)
|
|
|
|
out-of-band settings
|
|
|
|
Configures out-of-band transfers on this channel. The
|
|
syntax and meaning of this field will be formally
|
|
defined at a later date.
|
|
"""
|
|
if self.is_open:
|
|
return
|
|
|
|
return self.send_method(
|
|
spec.Channel.Open, 's', ('',), wait=spec.Channel.OpenOk,
|
|
)
|
|
|
|
def _on_open_ok(self):
|
|
"""Signal that the channel is ready.
|
|
|
|
This method signals to the client that the channel is ready
|
|
for use.
|
|
"""
|
|
self.is_open = True
|
|
self.on_open(self)
|
|
AMQP_LOGGER.debug('Channel open')
|
|
|
|
#############
|
|
#
|
|
# Exchange
|
|
#
|
|
#
|
|
# work with exchanges
|
|
#
|
|
# Exchanges match and distribute messages across queues.
|
|
# Exchanges can be configured in the server or created at runtime.
|
|
#
|
|
# GRAMMAR::
|
|
#
|
|
# exchange = C:DECLARE S:DECLARE-OK
|
|
# / C:DELETE S:DELETE-OK
|
|
#
|
|
# RULE:
|
|
#
|
|
# The server MUST implement the direct and fanout exchange
|
|
# types, and predeclare the corresponding exchanges named
|
|
# amq.direct and amq.fanout in each virtual host. The server
|
|
# MUST also predeclare a direct exchange to act as the default
|
|
# exchange for content Publish methods and for default queue
|
|
# bindings.
|
|
#
|
|
# RULE:
|
|
#
|
|
# The server SHOULD implement the topic exchange type, and
|
|
# predeclare the corresponding exchange named amq.topic in
|
|
# each virtual host.
|
|
#
|
|
# RULE:
|
|
#
|
|
# The server MAY implement the system exchange type, and
|
|
# predeclare the corresponding exchanges named amq.system in
|
|
# each virtual host. If the client attempts to bind a queue to
|
|
# the system exchange, the server MUST raise a connection
|
|
# exception with reply code 507 (not allowed).
|
|
#
|
|
|
|
def exchange_declare(self, exchange, type, passive=False, durable=False,
|
|
auto_delete=True, nowait=False, arguments=None,
|
|
argsig='BssbbbbbF'):
|
|
"""Declare exchange, create if needed.
|
|
|
|
This method creates an exchange if it does not already exist,
|
|
and if the exchange exists, verifies that it is of the correct
|
|
and expected class.
|
|
|
|
RULE:
|
|
|
|
The server SHOULD support a minimum of 16 exchanges per
|
|
virtual host and ideally, impose no limit except as
|
|
defined by available resources.
|
|
|
|
PARAMETERS:
|
|
exchange: shortstr
|
|
|
|
RULE:
|
|
|
|
Exchange names starting with "amq." are reserved
|
|
for predeclared and standardised exchanges. If
|
|
the client attempts to create an exchange starting
|
|
with "amq.", the server MUST raise a channel
|
|
exception with reply code 403 (access refused).
|
|
|
|
type: shortstr
|
|
|
|
exchange type
|
|
|
|
Each exchange belongs to one of a set of exchange
|
|
types implemented by the server. The exchange types
|
|
define the functionality of the exchange - i.e. how
|
|
messages are routed through it. It is not valid or
|
|
meaningful to attempt to change the type of an
|
|
existing exchange.
|
|
|
|
RULE:
|
|
|
|
If the exchange already exists with a different
|
|
type, the server MUST raise a connection exception
|
|
with a reply code 507 (not allowed).
|
|
|
|
RULE:
|
|
|
|
If the server does not support the requested
|
|
exchange type it MUST raise a connection exception
|
|
with a reply code 503 (command invalid).
|
|
|
|
passive: boolean
|
|
|
|
do not create exchange
|
|
|
|
If set, the server will not create the exchange. The
|
|
client can use this to check whether an exchange
|
|
exists without modifying the server state.
|
|
|
|
RULE:
|
|
|
|
If set, and the exchange does not already exist,
|
|
the server MUST raise a channel exception with
|
|
reply code 404 (not found).
|
|
|
|
durable: boolean
|
|
|
|
request a durable exchange
|
|
|
|
If set when creating a new exchange, the exchange will
|
|
be marked as durable. Durable exchanges remain active
|
|
when a server restarts. Non-durable exchanges
|
|
(transient exchanges) are purged if/when a server
|
|
restarts.
|
|
|
|
RULE:
|
|
|
|
The server MUST support both durable and transient
|
|
exchanges.
|
|
|
|
RULE:
|
|
|
|
The server MUST ignore the durable field if the
|
|
exchange already exists.
|
|
|
|
auto_delete: boolean
|
|
|
|
auto-delete when unused
|
|
|
|
If set, the exchange is deleted when all queues have
|
|
finished using it.
|
|
|
|
RULE:
|
|
|
|
The server SHOULD allow for a reasonable delay
|
|
between the point when it determines that an
|
|
exchange is not being used (or no longer used),
|
|
and the point when it deletes the exchange. At
|
|
the least it must allow a client to create an
|
|
exchange and then bind a queue to it, with a small
|
|
but non-zero delay between these two actions.
|
|
|
|
RULE:
|
|
|
|
The server MUST ignore the auto-delete field if
|
|
the exchange already exists.
|
|
|
|
nowait: boolean
|
|
|
|
do not send a reply method
|
|
|
|
If set, the server will not respond to the method. The
|
|
client should not wait for a reply method. If the
|
|
server could not complete the method it will raise a
|
|
channel or connection exception.
|
|
|
|
arguments: table
|
|
|
|
arguments for declaration
|
|
|
|
A set of arguments for the declaration. The syntax and
|
|
semantics of these arguments depends on the server
|
|
implementation. This field is ignored if passive is
|
|
True.
|
|
"""
|
|
self.send_method(
|
|
spec.Exchange.Declare, argsig,
|
|
(0, exchange, type, passive, durable, auto_delete,
|
|
False, nowait, arguments),
|
|
wait=None if nowait else spec.Exchange.DeclareOk,
|
|
)
|
|
|
|
def exchange_delete(self, exchange, if_unused=False, nowait=False,
|
|
argsig='Bsbb'):
|
|
"""Delete an exchange.
|
|
|
|
This method deletes an exchange. When an exchange is deleted
|
|
all queue bindings on the exchange are cancelled.
|
|
|
|
PARAMETERS:
|
|
exchange: shortstr
|
|
|
|
RULE:
|
|
|
|
The exchange MUST exist. Attempting to delete a
|
|
non-existing exchange causes a channel exception.
|
|
|
|
if_unused: boolean
|
|
|
|
delete only if unused
|
|
|
|
If set, the server will only delete the exchange if it
|
|
has no queue bindings. If the exchange has queue
|
|
bindings the server does not delete it but raises a
|
|
channel exception instead.
|
|
|
|
RULE:
|
|
|
|
If set, the server SHOULD delete the exchange but
|
|
only if it has no queue bindings.
|
|
|
|
RULE:
|
|
|
|
If set, the server SHOULD raise a channel
|
|
exception if the exchange is in use.
|
|
|
|
nowait: boolean
|
|
|
|
do not send a reply method
|
|
|
|
If set, the server will not respond to the method. The
|
|
client should not wait for a reply method. If the
|
|
server could not complete the method it will raise a
|
|
channel or connection exception.
|
|
"""
|
|
return self.send_method(
|
|
spec.Exchange.Delete, argsig, (0, exchange, if_unused, nowait),
|
|
wait=None if nowait else spec.Exchange.DeleteOk,
|
|
)
|
|
|
|
def exchange_bind(self, destination, source='', routing_key='',
|
|
nowait=False, arguments=None, argsig='BsssbF'):
|
|
"""Bind an exchange to an exchange.
|
|
|
|
RULE:
|
|
|
|
A server MUST allow and ignore duplicate bindings - that
|
|
is, two or more bind methods for a specific exchanges,
|
|
with identical arguments - without treating these as an
|
|
error.
|
|
|
|
RULE:
|
|
|
|
A server MUST allow cycles of exchange bindings to be
|
|
created including allowing an exchange to be bound to
|
|
itself.
|
|
|
|
RULE:
|
|
|
|
A server MUST not deliver the same message more than once
|
|
to a destination exchange, even if the topology of
|
|
exchanges and bindings results in multiple (even infinite)
|
|
routes to that exchange.
|
|
|
|
PARAMETERS:
|
|
reserved-1: short
|
|
|
|
destination: shortstr
|
|
|
|
Specifies the name of the destination exchange to
|
|
bind.
|
|
|
|
RULE:
|
|
|
|
A client MUST NOT be allowed to bind a non-
|
|
existent destination exchange.
|
|
|
|
RULE:
|
|
|
|
The server MUST accept a blank exchange name to
|
|
mean the default exchange.
|
|
|
|
source: shortstr
|
|
|
|
Specifies the name of the source exchange to bind.
|
|
|
|
RULE:
|
|
|
|
A client MUST NOT be allowed to bind a non-
|
|
existent source exchange.
|
|
|
|
RULE:
|
|
|
|
The server MUST accept a blank exchange name to
|
|
mean the default exchange.
|
|
|
|
routing-key: shortstr
|
|
|
|
Specifies the routing key for the binding. The routing
|
|
key is used for routing messages depending on the
|
|
exchange configuration. Not all exchanges use a
|
|
routing key - refer to the specific exchange
|
|
documentation.
|
|
|
|
no-wait: bit
|
|
|
|
arguments: table
|
|
|
|
A set of arguments for the binding. The syntax and
|
|
semantics of these arguments depends on the exchange
|
|
class.
|
|
"""
|
|
return self.send_method(
|
|
spec.Exchange.Bind, argsig,
|
|
(0, destination, source, routing_key, nowait, arguments),
|
|
wait=None if nowait else spec.Exchange.BindOk,
|
|
)
|
|
|
|
def exchange_unbind(self, destination, source='', routing_key='',
|
|
nowait=False, arguments=None, argsig='BsssbF'):
|
|
"""Unbind an exchange from an exchange.
|
|
|
|
RULE:
|
|
|
|
If a unbind fails, the server MUST raise a connection
|
|
exception.
|
|
|
|
PARAMETERS:
|
|
reserved-1: short
|
|
|
|
destination: shortstr
|
|
|
|
Specifies the name of the destination exchange to
|
|
unbind.
|
|
|
|
RULE:
|
|
|
|
The client MUST NOT attempt to unbind an exchange
|
|
that does not exist from an exchange.
|
|
|
|
RULE:
|
|
|
|
The server MUST accept a blank exchange name to
|
|
mean the default exchange.
|
|
|
|
source: shortstr
|
|
|
|
Specifies the name of the source exchange to unbind.
|
|
|
|
RULE:
|
|
|
|
The client MUST NOT attempt to unbind an exchange
|
|
from an exchange that does not exist.
|
|
|
|
RULE:
|
|
|
|
The server MUST accept a blank exchange name to
|
|
mean the default exchange.
|
|
|
|
routing-key: shortstr
|
|
|
|
Specifies the routing key of the binding to unbind.
|
|
|
|
no-wait: bit
|
|
|
|
arguments: table
|
|
|
|
Specifies the arguments of the binding to unbind.
|
|
"""
|
|
return self.send_method(
|
|
spec.Exchange.Unbind, argsig,
|
|
(0, destination, source, routing_key, nowait, arguments),
|
|
wait=None if nowait else spec.Exchange.UnbindOk,
|
|
)
|
|
|
|
#############
|
|
#
|
|
# Queue
|
|
#
|
|
#
|
|
# work with queues
|
|
#
|
|
# Queues store and forward messages. Queues can be configured in
|
|
# the server or created at runtime. Queues must be attached to at
|
|
# least one exchange in order to receive messages from publishers.
|
|
#
|
|
# GRAMMAR::
|
|
#
|
|
# queue = C:DECLARE S:DECLARE-OK
|
|
# / C:BIND S:BIND-OK
|
|
# / C:PURGE S:PURGE-OK
|
|
# / C:DELETE S:DELETE-OK
|
|
#
|
|
# RULE:
|
|
#
|
|
# A server MUST allow any content class to be sent to any
|
|
# queue, in any mix, and queue and delivery these content
|
|
# classes independently. Note that all methods that fetch
|
|
# content off queues are specific to a given content class.
|
|
#
|
|
|
|
def queue_bind(self, queue, exchange='', routing_key='',
|
|
nowait=False, arguments=None, argsig='BsssbF'):
|
|
"""Bind queue to an exchange.
|
|
|
|
This method binds a queue to an exchange. Until a queue is
|
|
bound it will not receive any messages. In a classic
|
|
messaging model, store-and-forward queues are bound to a dest
|
|
exchange and subscription queues are bound to a dest_wild
|
|
exchange.
|
|
|
|
RULE:
|
|
|
|
A server MUST allow ignore duplicate bindings - that is,
|
|
two or more bind methods for a specific queue, with
|
|
identical arguments - without treating these as an error.
|
|
|
|
RULE:
|
|
|
|
If a bind fails, the server MUST raise a connection
|
|
exception.
|
|
|
|
RULE:
|
|
|
|
The server MUST NOT allow a durable queue to bind to a
|
|
transient exchange. If the client attempts this the server
|
|
MUST raise a channel exception.
|
|
|
|
RULE:
|
|
|
|
Bindings for durable queues are automatically durable and
|
|
the server SHOULD restore such bindings after a server
|
|
restart.
|
|
|
|
RULE:
|
|
|
|
The server SHOULD support at least 4 bindings per queue,
|
|
and ideally, impose no limit except as defined by
|
|
available resources.
|
|
|
|
PARAMETERS:
|
|
queue: shortstr
|
|
|
|
Specifies the name of the queue to bind. If the queue
|
|
name is empty, refers to the current queue for the
|
|
channel, which is the last declared queue.
|
|
|
|
RULE:
|
|
|
|
If the client did not previously declare a queue,
|
|
and the queue name in this method is empty, the
|
|
server MUST raise a connection exception with
|
|
reply code 530 (not allowed).
|
|
|
|
RULE:
|
|
|
|
If the queue does not exist the server MUST raise
|
|
a channel exception with reply code 404 (not
|
|
found).
|
|
|
|
exchange: shortstr
|
|
|
|
The name of the exchange to bind to.
|
|
|
|
RULE:
|
|
|
|
If the exchange does not exist the server MUST
|
|
raise a channel exception with reply code 404 (not
|
|
found).
|
|
|
|
routing_key: shortstr
|
|
|
|
message routing key
|
|
|
|
Specifies the routing key for the binding. The
|
|
routing key is used for routing messages depending on
|
|
the exchange configuration. Not all exchanges use a
|
|
routing key - refer to the specific exchange
|
|
documentation. If the routing key is empty and the
|
|
queue name is empty, the routing key will be the
|
|
current queue for the channel, which is the last
|
|
declared queue.
|
|
|
|
nowait: boolean
|
|
|
|
do not send a reply method
|
|
|
|
If set, the server will not respond to the method. The
|
|
client should not wait for a reply method. If the
|
|
server could not complete the method it will raise a
|
|
channel or connection exception.
|
|
|
|
arguments: table
|
|
|
|
arguments for binding
|
|
|
|
A set of arguments for the binding. The syntax and
|
|
semantics of these arguments depends on the exchange
|
|
class.
|
|
"""
|
|
return self.send_method(
|
|
spec.Queue.Bind, argsig,
|
|
(0, queue, exchange, routing_key, nowait, arguments),
|
|
wait=None if nowait else spec.Queue.BindOk,
|
|
)
|
|
|
|
def queue_unbind(self, queue, exchange, routing_key='',
|
|
nowait=False, arguments=None, argsig='BsssF'):
|
|
"""Unbind a queue from an exchange.
|
|
|
|
This method unbinds a queue from an exchange.
|
|
|
|
RULE:
|
|
|
|
If a unbind fails, the server MUST raise a connection exception.
|
|
|
|
PARAMETERS:
|
|
queue: shortstr
|
|
|
|
Specifies the name of the queue to unbind.
|
|
|
|
RULE:
|
|
|
|
The client MUST either specify a queue name or have
|
|
previously declared a queue on the same channel
|
|
|
|
RULE:
|
|
|
|
The client MUST NOT attempt to unbind a queue that
|
|
does not exist.
|
|
|
|
exchange: shortstr
|
|
|
|
The name of the exchange to unbind from.
|
|
|
|
RULE:
|
|
|
|
The client MUST NOT attempt to unbind a queue from an
|
|
exchange that does not exist.
|
|
|
|
RULE:
|
|
|
|
The server MUST accept a blank exchange name to mean
|
|
the default exchange.
|
|
|
|
routing_key: shortstr
|
|
|
|
routing key of binding
|
|
|
|
Specifies the routing key of the binding to unbind.
|
|
|
|
arguments: table
|
|
|
|
arguments of binding
|
|
|
|
Specifies the arguments of the binding to unbind.
|
|
"""
|
|
return self.send_method(
|
|
spec.Queue.Unbind, argsig,
|
|
(0, queue, exchange, routing_key, arguments),
|
|
wait=None if nowait else spec.Queue.UnbindOk,
|
|
)
|
|
|
|
def queue_declare(self, queue='', passive=False, durable=False,
|
|
exclusive=False, auto_delete=True, nowait=False,
|
|
arguments=None, argsig='BsbbbbbF'):
|
|
"""Declare queue, create if needed.
|
|
|
|
This method creates or checks a queue. When creating a new
|
|
queue the client can specify various properties that control
|
|
the durability of the queue and its contents, and the level of
|
|
sharing for the queue.
|
|
|
|
RULE:
|
|
|
|
The server MUST create a default binding for a newly-
|
|
created queue to the default exchange, which is an
|
|
exchange of type 'direct'.
|
|
|
|
RULE:
|
|
|
|
The server SHOULD support a minimum of 256 queues per
|
|
virtual host and ideally, impose no limit except as
|
|
defined by available resources.
|
|
|
|
PARAMETERS:
|
|
queue: shortstr
|
|
|
|
RULE:
|
|
|
|
The queue name MAY be empty, in which case the
|
|
server MUST create a new queue with a unique
|
|
generated name and return this to the client in
|
|
the Declare-Ok method.
|
|
|
|
RULE:
|
|
|
|
Queue names starting with "amq." are reserved for
|
|
predeclared and standardised server queues. If
|
|
the queue name starts with "amq." and the passive
|
|
option is False, the server MUST raise a connection
|
|
exception with reply code 403 (access refused).
|
|
|
|
passive: boolean
|
|
|
|
do not create queue
|
|
|
|
If set, the server will not create the queue. The
|
|
client can use this to check whether a queue exists
|
|
without modifying the server state.
|
|
|
|
RULE:
|
|
|
|
If set, and the queue does not already exist, the
|
|
server MUST respond with a reply code 404 (not
|
|
found) and raise a channel exception.
|
|
|
|
durable: boolean
|
|
|
|
request a durable queue
|
|
|
|
If set when creating a new queue, the queue will be
|
|
marked as durable. Durable queues remain active when
|
|
a server restarts. Non-durable queues (transient
|
|
queues) are purged if/when a server restarts. Note
|
|
that durable queues do not necessarily hold persistent
|
|
messages, although it does not make sense to send
|
|
persistent messages to a transient queue.
|
|
|
|
RULE:
|
|
|
|
The server MUST recreate the durable queue after a
|
|
restart.
|
|
|
|
RULE:
|
|
|
|
The server MUST support both durable and transient
|
|
queues.
|
|
|
|
RULE:
|
|
|
|
The server MUST ignore the durable field if the
|
|
queue already exists.
|
|
|
|
exclusive: boolean
|
|
|
|
request an exclusive queue
|
|
|
|
Exclusive queues may only be consumed from by the
|
|
current connection. Setting the 'exclusive' flag
|
|
always implies 'auto-delete'.
|
|
|
|
RULE:
|
|
|
|
The server MUST support both exclusive (private)
|
|
and non-exclusive (shared) queues.
|
|
|
|
RULE:
|
|
|
|
The server MUST raise a channel exception if
|
|
'exclusive' is specified and the queue already
|
|
exists and is owned by a different connection.
|
|
|
|
auto_delete: boolean
|
|
|
|
auto-delete queue when unused
|
|
|
|
If set, the queue is deleted when all consumers have
|
|
finished using it. Last consumer can be cancelled
|
|
either explicitly or because its channel is closed. If
|
|
there was no consumer ever on the queue, it won't be
|
|
deleted.
|
|
|
|
RULE:
|
|
|
|
The server SHOULD allow for a reasonable delay
|
|
between the point when it determines that a queue
|
|
is not being used (or no longer used), and the
|
|
point when it deletes the queue. At the least it
|
|
must allow a client to create a queue and then
|
|
create a consumer to read from it, with a small
|
|
but non-zero delay between these two actions. The
|
|
server should equally allow for clients that may
|
|
be disconnected prematurely, and wish to re-
|
|
consume from the same queue without losing
|
|
messages. We would recommend a configurable
|
|
timeout, with a suitable default value being one
|
|
minute.
|
|
|
|
RULE:
|
|
|
|
The server MUST ignore the auto-delete field if
|
|
the queue already exists.
|
|
|
|
nowait: boolean
|
|
|
|
do not send a reply method
|
|
|
|
If set, the server will not respond to the method. The
|
|
client should not wait for a reply method. If the
|
|
server could not complete the method it will raise a
|
|
channel or connection exception.
|
|
|
|
arguments: table
|
|
|
|
arguments for declaration
|
|
|
|
A set of arguments for the declaration. The syntax and
|
|
semantics of these arguments depends on the server
|
|
implementation. This field is ignored if passive is
|
|
True.
|
|
|
|
Returns a tuple containing 3 items:
|
|
the name of the queue (essential for automatically-named queues),
|
|
message count and
|
|
consumer count
|
|
"""
|
|
self.send_method(
|
|
spec.Queue.Declare, argsig,
|
|
(0, queue, passive, durable, exclusive, auto_delete,
|
|
nowait, arguments),
|
|
)
|
|
if not nowait:
|
|
return queue_declare_ok_t(*self.wait(
|
|
spec.Queue.DeclareOk, returns_tuple=True,
|
|
))
|
|
|
|
def queue_delete(self, queue='',
|
|
if_unused=False, if_empty=False, nowait=False,
|
|
argsig='Bsbbb'):
|
|
"""Delete a queue.
|
|
|
|
This method deletes a queue. When a queue is deleted any
|
|
pending messages are sent to a dead-letter queue if this is
|
|
defined in the server configuration, and all consumers on the
|
|
queue are cancelled.
|
|
|
|
RULE:
|
|
|
|
The server SHOULD use a dead-letter queue to hold messages
|
|
that were pending on a deleted queue, and MAY provide
|
|
facilities for a system administrator to move these
|
|
messages back to an active queue.
|
|
|
|
PARAMETERS:
|
|
queue: shortstr
|
|
|
|
Specifies the name of the queue to delete. If the
|
|
queue name is empty, refers to the current queue for
|
|
the channel, which is the last declared queue.
|
|
|
|
RULE:
|
|
|
|
If the client did not previously declare a queue,
|
|
and the queue name in this method is empty, the
|
|
server MUST raise a connection exception with
|
|
reply code 530 (not allowed).
|
|
|
|
RULE:
|
|
|
|
The queue must exist. Attempting to delete a non-
|
|
existing queue causes a channel exception.
|
|
|
|
if_unused: boolean
|
|
|
|
delete only if unused
|
|
|
|
If set, the server will only delete the queue if it
|
|
has no consumers. If the queue has consumers the
|
|
server does does not delete it but raises a channel
|
|
exception instead.
|
|
|
|
RULE:
|
|
|
|
The server MUST respect the if-unused flag when
|
|
deleting a queue.
|
|
|
|
if_empty: boolean
|
|
|
|
delete only if empty
|
|
|
|
If set, the server will only delete the queue if it
|
|
has no messages. If the queue is not empty the server
|
|
raises a channel exception.
|
|
|
|
nowait: boolean
|
|
|
|
do not send a reply method
|
|
|
|
If set, the server will not respond to the method. The
|
|
client should not wait for a reply method. If the
|
|
server could not complete the method it will raise a
|
|
channel or connection exception.
|
|
|
|
If nowait is False, returns the number of deleted messages.
|
|
"""
|
|
return self.send_method(
|
|
spec.Queue.Delete, argsig,
|
|
(0, queue, if_unused, if_empty, nowait),
|
|
wait=None if nowait else spec.Queue.DeleteOk,
|
|
)
|
|
|
|
def queue_purge(self, queue='', nowait=False, argsig='Bsb'):
|
|
"""Purge a queue.
|
|
|
|
This method removes all messages from a queue. It does not
|
|
cancel consumers. Purged messages are deleted without any
|
|
formal "undo" mechanism.
|
|
|
|
RULE:
|
|
|
|
A call to purge MUST result in an empty queue.
|
|
|
|
RULE:
|
|
|
|
On transacted channels the server MUST not purge messages
|
|
that have already been sent to a client but not yet
|
|
acknowledged.
|
|
|
|
RULE:
|
|
|
|
The server MAY implement a purge queue or log that allows
|
|
system administrators to recover accidentally-purged
|
|
messages. The server SHOULD NOT keep purged messages in
|
|
the same storage spaces as the live messages since the
|
|
volumes of purged messages may get very large.
|
|
|
|
PARAMETERS:
|
|
queue: shortstr
|
|
|
|
Specifies the name of the queue to purge. If the
|
|
queue name is empty, refers to the current queue for
|
|
the channel, which is the last declared queue.
|
|
|
|
RULE:
|
|
|
|
If the client did not previously declare a queue,
|
|
and the queue name in this method is empty, the
|
|
server MUST raise a connection exception with
|
|
reply code 530 (not allowed).
|
|
|
|
RULE:
|
|
|
|
The queue must exist. Attempting to purge a non-
|
|
existing queue causes a channel exception.
|
|
|
|
nowait: boolean
|
|
|
|
do not send a reply method
|
|
|
|
If set, the server will not respond to the method. The
|
|
client should not wait for a reply method. If the
|
|
server could not complete the method it will raise a
|
|
channel or connection exception.
|
|
|
|
If nowait is False, returns a number of purged messages.
|
|
"""
|
|
return self.send_method(
|
|
spec.Queue.Purge, argsig, (0, queue, nowait),
|
|
wait=None if nowait else spec.Queue.PurgeOk,
|
|
)
|
|
|
|
#############
|
|
#
|
|
# Basic
|
|
#
|
|
#
|
|
# work with basic content
|
|
#
|
|
# The Basic class provides methods that support an industry-
|
|
# standard messaging model.
|
|
#
|
|
# GRAMMAR::
|
|
#
|
|
# basic = C:QOS S:QOS-OK
|
|
# / C:CONSUME S:CONSUME-OK
|
|
# / C:CANCEL S:CANCEL-OK
|
|
# / C:PUBLISH content
|
|
# / S:RETURN content
|
|
# / S:DELIVER content
|
|
# / C:GET ( S:GET-OK content / S:GET-EMPTY )
|
|
# / C:ACK
|
|
# / C:REJECT
|
|
#
|
|
# RULE:
|
|
#
|
|
# The server SHOULD respect the persistent property of basic
|
|
# messages and SHOULD make a best-effort to hold persistent
|
|
# basic messages on a reliable storage mechanism.
|
|
#
|
|
# RULE:
|
|
#
|
|
# The server MUST NOT discard a persistent basic message in
|
|
# case of a queue overflow. The server MAY use the
|
|
# Channel.Flow method to slow or stop a basic message
|
|
# publisher when necessary.
|
|
#
|
|
# RULE:
|
|
#
|
|
# The server MAY overflow non-persistent basic messages to
|
|
# persistent storage and MAY discard or dead-letter non-
|
|
# persistent basic messages on a priority basis if the queue
|
|
# size exceeds some configured limit.
|
|
#
|
|
# RULE:
|
|
#
|
|
# The server MUST implement at least 2 priority levels for
|
|
# basic messages, where priorities 0-4 and 5-9 are treated as
|
|
# two distinct levels. The server MAY implement up to 10
|
|
# priority levels.
|
|
#
|
|
# RULE:
|
|
#
|
|
# The server MUST deliver messages of the same priority in
|
|
# order irrespective of their individual persistence.
|
|
#
|
|
# RULE:
|
|
#
|
|
# The server MUST support both automatic and explicit
|
|
# acknowledgments on Basic content.
|
|
#
|
|
|
|
def basic_ack(self, delivery_tag, multiple=False, argsig='Lb'):
|
|
"""Acknowledge one or more messages.
|
|
|
|
This method acknowledges one or more messages delivered via
|
|
the Deliver or Get-Ok methods. The client can ask to confirm
|
|
a single message or a set of messages up to and including a
|
|
specific message.
|
|
|
|
PARAMETERS:
|
|
delivery_tag: longlong
|
|
|
|
server-assigned delivery tag
|
|
|
|
The server-assigned and channel-specific delivery tag
|
|
|
|
RULE:
|
|
|
|
The delivery tag is valid only within the channel
|
|
from which the message was received. I.e. a client
|
|
MUST NOT receive a message on one channel and then
|
|
acknowledge it on another.
|
|
|
|
RULE:
|
|
|
|
The server MUST NOT use a zero value for delivery
|
|
tags. Zero is reserved for client use, meaning "all
|
|
messages so far received".
|
|
|
|
multiple: boolean
|
|
|
|
acknowledge multiple messages
|
|
|
|
If set to True, the delivery tag is treated as "up to
|
|
and including", so that the client can acknowledge
|
|
multiple messages with a single method. If set to
|
|
False, the delivery tag refers to a single message.
|
|
If the multiple field is True, and the delivery tag
|
|
is zero, tells the server to acknowledge all
|
|
outstanding messages.
|
|
|
|
RULE:
|
|
|
|
The server MUST validate that a non-zero delivery-
|
|
tag refers to an delivered message, and raise a
|
|
channel exception if this is not the case.
|
|
"""
|
|
return self.send_method(
|
|
spec.Basic.Ack, argsig, (delivery_tag, multiple),
|
|
)
|
|
|
|
def basic_cancel(self, consumer_tag, nowait=False, argsig='sb'):
|
|
"""End a queue consumer.
|
|
|
|
This method cancels a consumer. This does not affect already
|
|
delivered messages, but it does mean the server will not send
|
|
any more messages for that consumer. The client may receive
|
|
an arbitrary number of messages in between sending the cancel
|
|
method and receiving the cancel-ok reply.
|
|
|
|
RULE:
|
|
|
|
If the queue no longer exists when the client sends a
|
|
cancel command, or the consumer has been cancelled for
|
|
other reasons, this command has no effect.
|
|
|
|
PARAMETERS:
|
|
consumer_tag: shortstr
|
|
|
|
consumer tag
|
|
|
|
Identifier for the consumer, valid within the current
|
|
connection.
|
|
|
|
RULE:
|
|
|
|
The consumer tag is valid only within the channel
|
|
from which the consumer was created. I.e. a client
|
|
MUST NOT create a consumer in one channel and then
|
|
use it in another.
|
|
|
|
nowait: boolean
|
|
|
|
do not send a reply method
|
|
|
|
If set, the server will not respond to the method. The
|
|
client should not wait for a reply method. If the
|
|
server could not complete the method it will raise a
|
|
channel or connection exception.
|
|
"""
|
|
if self.connection is not None:
|
|
self.no_ack_consumers.discard(consumer_tag)
|
|
return self.send_method(
|
|
spec.Basic.Cancel, argsig, (consumer_tag, nowait),
|
|
wait=None if nowait else spec.Basic.CancelOk,
|
|
)
|
|
|
|
def _on_basic_cancel(self, consumer_tag):
|
|
"""Consumer cancelled by server.
|
|
|
|
Most likely the queue was deleted.
|
|
|
|
"""
|
|
callback = self._remove_tag(consumer_tag)
|
|
if callback:
|
|
callback(consumer_tag)
|
|
else:
|
|
raise ConsumerCancelled(consumer_tag, spec.Basic.Cancel)
|
|
|
|
def _on_basic_cancel_ok(self, consumer_tag):
|
|
self._remove_tag(consumer_tag)
|
|
|
|
def _remove_tag(self, consumer_tag):
|
|
self.callbacks.pop(consumer_tag, None)
|
|
return self.cancel_callbacks.pop(consumer_tag, None)
|
|
|
|
def basic_consume(self, queue='', consumer_tag='', no_local=False,
|
|
no_ack=False, exclusive=False, nowait=False,
|
|
callback=None, arguments=None, on_cancel=None,
|
|
argsig='BssbbbbF'):
|
|
"""Start a queue consumer.
|
|
|
|
This method asks the server to start a "consumer", which is a
|
|
transient request for messages from a specific queue.
|
|
Consumers last as long as the channel they were created on, or
|
|
until the client cancels them.
|
|
|
|
RULE:
|
|
|
|
The server SHOULD support at least 16 consumers per queue,
|
|
unless the queue was declared as private, and ideally,
|
|
impose no limit except as defined by available resources.
|
|
|
|
PARAMETERS:
|
|
queue: shortstr
|
|
|
|
Specifies the name of the queue to consume from. If
|
|
the queue name is null, refers to the current queue
|
|
for the channel, which is the last declared queue.
|
|
|
|
RULE:
|
|
|
|
If the client did not previously declare a queue,
|
|
and the queue name in this method is empty, the
|
|
server MUST raise a connection exception with
|
|
reply code 530 (not allowed).
|
|
|
|
consumer_tag: shortstr
|
|
|
|
Specifies the identifier for the consumer. The
|
|
consumer tag is local to a connection, so two clients
|
|
can use the same consumer tags. If this field is empty
|
|
the server will generate a unique tag.
|
|
|
|
RULE:
|
|
|
|
The tag MUST NOT refer to an existing consumer. If
|
|
the client attempts to create two consumers with
|
|
the same non-empty tag the server MUST raise a
|
|
connection exception with reply code 530 (not
|
|
allowed).
|
|
|
|
no_local: boolean
|
|
|
|
do not deliver own messages
|
|
|
|
If the no-local field is set the server will not send
|
|
messages to the client that published them.
|
|
|
|
no_ack: boolean
|
|
|
|
no acknowledgment needed
|
|
|
|
If this field is set the server does not expect
|
|
acknowledgments for messages. That is, when a message
|
|
is delivered to the client the server automatically and
|
|
silently acknowledges it on behalf of the client. This
|
|
functionality increases performance but at the cost of
|
|
reliability. Messages can get lost if a client dies
|
|
before it can deliver them to the application.
|
|
|
|
exclusive: boolean
|
|
|
|
request exclusive access
|
|
|
|
Request exclusive consumer access, meaning only this
|
|
consumer can access the queue.
|
|
|
|
RULE:
|
|
|
|
If the server cannot grant exclusive access to the
|
|
queue when asked, - because there are other
|
|
consumers active - it MUST raise a channel
|
|
exception with return code 403 (access refused).
|
|
|
|
nowait: boolean
|
|
|
|
do not send a reply method
|
|
|
|
If set, the server will not respond to the method. The
|
|
client should not wait for a reply method. If the
|
|
server could not complete the method it will raise a
|
|
channel or connection exception.
|
|
|
|
callback: Python callable
|
|
|
|
function/method called with each delivered message
|
|
|
|
For each message delivered by the broker, the
|
|
callable will be called with a Message object
|
|
as the single argument. If no callable is specified,
|
|
messages are quietly discarded, no_ack should probably
|
|
be set to True in that case.
|
|
"""
|
|
p = self.send_method(
|
|
spec.Basic.Consume, argsig,
|
|
(
|
|
0, queue, consumer_tag, no_local, no_ack, exclusive,
|
|
nowait, arguments
|
|
),
|
|
wait=None if nowait else spec.Basic.ConsumeOk,
|
|
returns_tuple=True
|
|
)
|
|
|
|
if not nowait:
|
|
# send_method() returns (consumer_tag,) tuple.
|
|
# consumer_tag is returned by broker using following rules:
|
|
# * consumer_tag is not specified by client, random one
|
|
# is generated by Broker
|
|
# * consumer_tag is provided by client, the same one
|
|
# is returned by broker
|
|
consumer_tag = p[0]
|
|
elif nowait and not consumer_tag:
|
|
raise ValueError(
|
|
'Consumer tag must be specified when nowait is True'
|
|
)
|
|
|
|
self.callbacks[consumer_tag] = callback
|
|
|
|
if on_cancel:
|
|
self.cancel_callbacks[consumer_tag] = on_cancel
|
|
if no_ack:
|
|
self.no_ack_consumers.add(consumer_tag)
|
|
|
|
if not nowait:
|
|
return consumer_tag
|
|
else:
|
|
return p
|
|
|
|
def _on_basic_deliver(self, consumer_tag, delivery_tag, redelivered,
|
|
exchange, routing_key, msg):
|
|
msg.channel = self
|
|
msg.delivery_info = {
|
|
'consumer_tag': consumer_tag,
|
|
'delivery_tag': delivery_tag,
|
|
'redelivered': redelivered,
|
|
'exchange': exchange,
|
|
'routing_key': routing_key,
|
|
}
|
|
|
|
try:
|
|
fun = self.callbacks[consumer_tag]
|
|
except KeyError:
|
|
AMQP_LOGGER.warning(
|
|
REJECTED_MESSAGE_WITHOUT_CALLBACK,
|
|
delivery_tag, consumer_tag, exchange, routing_key,
|
|
)
|
|
self.basic_reject(delivery_tag, requeue=True)
|
|
else:
|
|
fun(msg)
|
|
|
|
def basic_get(self, queue='', no_ack=False, argsig='Bsb'):
|
|
"""Direct access to a queue.
|
|
|
|
This method provides a direct access to the messages in a
|
|
queue using a synchronous dialogue that is designed for
|
|
specific types of application where synchronous functionality
|
|
is more important than performance.
|
|
|
|
PARAMETERS:
|
|
queue: shortstr
|
|
|
|
Specifies the name of the queue to consume from. If
|
|
the queue name is null, refers to the current queue
|
|
for the channel, which is the last declared queue.
|
|
|
|
RULE:
|
|
|
|
If the client did not previously declare a queue,
|
|
and the queue name in this method is empty, the
|
|
server MUST raise a connection exception with
|
|
reply code 530 (not allowed).
|
|
|
|
no_ack: boolean
|
|
|
|
no acknowledgment needed
|
|
|
|
If this field is set the server does not expect
|
|
acknowledgments for messages. That is, when a message
|
|
is delivered to the client the server automatically and
|
|
silently acknowledges it on behalf of the client. This
|
|
functionality increases performance but at the cost of
|
|
reliability. Messages can get lost if a client dies
|
|
before it can deliver them to the application.
|
|
|
|
Non-blocking, returns a amqp.basic_message.Message object,
|
|
or None if queue is empty.
|
|
"""
|
|
ret = self.send_method(
|
|
spec.Basic.Get, argsig, (0, queue, no_ack),
|
|
wait=[spec.Basic.GetOk, spec.Basic.GetEmpty], returns_tuple=True,
|
|
)
|
|
if not ret or len(ret) < 2:
|
|
return self._on_get_empty(*ret)
|
|
return self._on_get_ok(*ret)
|
|
|
|
def _on_get_empty(self, cluster_id=None):
|
|
pass
|
|
|
|
def _on_get_ok(self, delivery_tag, redelivered, exchange, routing_key,
|
|
message_count, msg):
|
|
msg.channel = self
|
|
msg.delivery_info = {
|
|
'delivery_tag': delivery_tag,
|
|
'redelivered': redelivered,
|
|
'exchange': exchange,
|
|
'routing_key': routing_key,
|
|
'message_count': message_count
|
|
}
|
|
return msg
|
|
|
|
def _basic_publish(self, msg, exchange='', routing_key='',
|
|
mandatory=False, immediate=False, timeout=None,
|
|
confirm_timeout=None,
|
|
argsig='Bssbb'):
|
|
"""Publish a message.
|
|
|
|
This method publishes a message to a specific exchange. The
|
|
message will be routed to queues as defined by the exchange
|
|
configuration and distributed to any active consumers when the
|
|
transaction, if any, is committed.
|
|
|
|
When channel is in confirm mode (when Connection parameter
|
|
confirm_publish is set to True), each message is confirmed.
|
|
When broker rejects published message (e.g. due internal broker
|
|
constrains), MessageNacked exception is raised and
|
|
set confirm_timeout to wait maximum confirm_timeout second
|
|
for message to confirm.
|
|
|
|
PARAMETERS:
|
|
exchange: shortstr
|
|
|
|
Specifies the name of the exchange to publish to. The
|
|
exchange name can be empty, meaning the default
|
|
exchange. If the exchange name is specified, and that
|
|
exchange does not exist, the server will raise a
|
|
channel exception.
|
|
|
|
RULE:
|
|
|
|
The server MUST accept a blank exchange name to
|
|
mean the default exchange.
|
|
|
|
RULE:
|
|
|
|
The exchange MAY refuse basic content in which
|
|
case it MUST raise a channel exception with reply
|
|
code 540 (not implemented).
|
|
|
|
routing_key: shortstr
|
|
|
|
Message routing key
|
|
|
|
Specifies the routing key for the message. The
|
|
routing key is used for routing messages depending on
|
|
the exchange configuration.
|
|
|
|
mandatory: boolean
|
|
|
|
indicate mandatory routing
|
|
|
|
This flag tells the server how to react if the message
|
|
cannot be routed to a queue. If this flag is True, the
|
|
server will return an unroutable message with a Return
|
|
method. If this flag is False, the server silently
|
|
drops the message.
|
|
|
|
RULE:
|
|
|
|
The server SHOULD implement the mandatory flag.
|
|
|
|
immediate: boolean
|
|
|
|
request immediate delivery
|
|
|
|
This flag tells the server how to react if the message
|
|
cannot be routed to a queue consumer immediately. If
|
|
this flag is set, the server will return an
|
|
undeliverable message with a Return method. If this
|
|
flag is zero, the server will queue the message, but
|
|
with no guarantee that it will ever be consumed.
|
|
|
|
RULE:
|
|
|
|
The server SHOULD implement the immediate flag.
|
|
|
|
timeout: short
|
|
|
|
timeout for publish
|
|
|
|
Set timeout to wait maximum timeout second
|
|
for message to publish.
|
|
|
|
confirm_timeout: short
|
|
|
|
confirm_timeout for publish in confirm mode
|
|
|
|
When the channel is in confirm mode set
|
|
confirm_timeout to wait maximum confirm_timeout
|
|
second for message to confirm.
|
|
|
|
"""
|
|
if not self.connection:
|
|
raise RecoverableConnectionError(
|
|
'basic_publish: connection closed')
|
|
|
|
capabilities = self.connection. \
|
|
client_properties.get('capabilities', {})
|
|
if capabilities.get('connection.blocked', False):
|
|
try:
|
|
# Check if an event was sent, such as the out of memory message
|
|
self.connection.drain_events(timeout=0)
|
|
except socket.timeout:
|
|
pass
|
|
|
|
try:
|
|
with self.connection.transport.having_timeout(timeout):
|
|
return self.send_method(
|
|
spec.Basic.Publish, argsig,
|
|
(0, exchange, routing_key, mandatory, immediate), msg
|
|
)
|
|
except socket.timeout:
|
|
raise RecoverableChannelError('basic_publish: timed out')
|
|
|
|
basic_publish = _basic_publish
|
|
|
|
def basic_publish_confirm(self, *args, **kwargs):
|
|
confirm_timeout = kwargs.pop('confirm_timeout', None)
|
|
|
|
def confirm_handler(method, *args):
|
|
# When RMQ nacks message we are raising MessageNacked exception
|
|
if method == spec.Basic.Nack:
|
|
raise MessageNacked()
|
|
|
|
if not self._confirm_selected:
|
|
self._confirm_selected = True
|
|
self.confirm_select()
|
|
ret = self._basic_publish(*args, **kwargs)
|
|
# Waiting for confirmation of message.
|
|
timeout = confirm_timeout or kwargs.get('timeout', None)
|
|
self.wait([spec.Basic.Ack, spec.Basic.Nack],
|
|
callback=confirm_handler,
|
|
timeout=timeout)
|
|
return ret
|
|
|
|
def basic_qos(self, prefetch_size, prefetch_count, a_global,
|
|
argsig='lBb'):
|
|
"""Specify quality of service.
|
|
|
|
This method requests a specific quality of service. The QoS
|
|
can be specified for the current channel or for all channels
|
|
on the connection. The particular properties and semantics of
|
|
a qos method always depend on the content class semantics.
|
|
Though the qos method could in principle apply to both peers,
|
|
it is currently meaningful only for the server.
|
|
|
|
PARAMETERS:
|
|
prefetch_size: long
|
|
|
|
prefetch window in octets
|
|
|
|
The client can request that messages be sent in
|
|
advance so that when the client finishes processing a
|
|
message, the following message is already held
|
|
locally, rather than needing to be sent down the
|
|
channel. Prefetching gives a performance improvement.
|
|
This field specifies the prefetch window size in
|
|
octets. The server will send a message in advance if
|
|
it is equal to or smaller in size than the available
|
|
prefetch size (and also falls into other prefetch
|
|
limits). May be set to zero, meaning "no specific
|
|
limit", although other prefetch limits may still
|
|
apply. The prefetch-size is ignored if the no-ack
|
|
option is set.
|
|
|
|
RULE:
|
|
|
|
The server MUST ignore this setting when the
|
|
client is not processing any messages - i.e. the
|
|
prefetch size does not limit the transfer of
|
|
single messages to a client, only the sending in
|
|
advance of more messages while the client still
|
|
has one or more unacknowledged messages.
|
|
|
|
prefetch_count: short
|
|
|
|
prefetch window in messages
|
|
|
|
Specifies a prefetch window in terms of whole
|
|
messages. This field may be used in combination with
|
|
the prefetch-size field; a message will only be sent
|
|
in advance if both prefetch windows (and those at the
|
|
channel and connection level) allow it. The prefetch-
|
|
count is ignored if the no-ack option is set.
|
|
|
|
RULE:
|
|
|
|
The server MAY send less data in advance than
|
|
allowed by the client's specified prefetch windows
|
|
but it MUST NOT send more.
|
|
|
|
a_global: boolean
|
|
|
|
Defines a scope of QoS. Semantics of this parameter differs
|
|
between AMQP 0-9-1 standard and RabbitMQ broker:
|
|
|
|
MEANING IN AMQP 0-9-1:
|
|
False: shared across all consumers on the channel
|
|
True: shared across all consumers on the connection
|
|
MEANING IN RABBITMQ:
|
|
False: applied separately to each new consumer
|
|
on the channel
|
|
True: shared across all consumers on the channel
|
|
"""
|
|
return self.send_method(
|
|
spec.Basic.Qos, argsig, (prefetch_size, prefetch_count, a_global),
|
|
wait=spec.Basic.QosOk,
|
|
)
|
|
|
|
def basic_recover(self, requeue=False):
|
|
"""Redeliver unacknowledged messages.
|
|
|
|
This method asks the broker to redeliver all unacknowledged
|
|
messages on a specified channel. Zero or more messages may be
|
|
redelivered. This method is only allowed on non-transacted
|
|
channels.
|
|
|
|
RULE:
|
|
|
|
The server MUST set the redelivered flag on all messages
|
|
that are resent.
|
|
|
|
RULE:
|
|
|
|
The server MUST raise a channel exception if this is
|
|
called on a transacted channel.
|
|
|
|
PARAMETERS:
|
|
requeue: boolean
|
|
|
|
requeue the message
|
|
|
|
If this field is False, the message will be redelivered
|
|
to the original recipient. If this field is True, the
|
|
server will attempt to requeue the message,
|
|
potentially then delivering it to an alternative
|
|
subscriber.
|
|
"""
|
|
return self.send_method(spec.Basic.Recover, 'b', (requeue,))
|
|
|
|
def basic_recover_async(self, requeue=False):
|
|
return self.send_method(spec.Basic.RecoverAsync, 'b', (requeue,))
|
|
|
|
def basic_reject(self, delivery_tag, requeue, argsig='Lb'):
|
|
"""Reject an incoming message.
|
|
|
|
This method allows a client to reject a message. It can be
|
|
used to interrupt and cancel large incoming messages, or
|
|
return untreatable messages to their original queue.
|
|
|
|
RULE:
|
|
|
|
The server SHOULD be capable of accepting and process the
|
|
Reject method while sending message content with a Deliver
|
|
or Get-Ok method. I.e. the server should read and process
|
|
incoming methods while sending output frames. To cancel a
|
|
partially-send content, the server sends a content body
|
|
frame of size 1 (i.e. with no data except the frame-end
|
|
octet).
|
|
|
|
RULE:
|
|
|
|
The server SHOULD interpret this method as meaning that
|
|
the client is unable to process the message at this time.
|
|
|
|
RULE:
|
|
|
|
A client MUST NOT use this method as a means of selecting
|
|
messages to process. A rejected message MAY be discarded
|
|
or dead-lettered, not necessarily passed to another
|
|
client.
|
|
|
|
PARAMETERS:
|
|
delivery_tag: longlong
|
|
|
|
server-assigned delivery tag
|
|
|
|
The server-assigned and channel-specific delivery tag
|
|
|
|
RULE:
|
|
|
|
The delivery tag is valid only within the channel
|
|
from which the message was received. I.e. a client
|
|
MUST NOT receive a message on one channel and then
|
|
acknowledge it on another.
|
|
|
|
RULE:
|
|
|
|
The server MUST NOT use a zero value for delivery
|
|
tags. Zero is reserved for client use, meaning "all
|
|
messages so far received".
|
|
|
|
requeue: boolean
|
|
|
|
requeue the message
|
|
|
|
If this field is False, the message will be discarded.
|
|
If this field is True, the server will attempt to
|
|
requeue the message.
|
|
|
|
RULE:
|
|
|
|
The server MUST NOT deliver the message to the
|
|
same client within the context of the current
|
|
channel. The recommended strategy is to attempt
|
|
to deliver the message to an alternative consumer,
|
|
and if that is not possible, to move the message
|
|
to a dead-letter queue. The server MAY use more
|
|
sophisticated tracking to hold the message on the
|
|
queue and redeliver it to the same client at a
|
|
later stage.
|
|
"""
|
|
return self.send_method(
|
|
spec.Basic.Reject, argsig, (delivery_tag, requeue),
|
|
)
|
|
|
|
def _on_basic_return(self, reply_code, reply_text,
|
|
exchange, routing_key, message):
|
|
"""Return a failed message.
|
|
|
|
This method returns an undeliverable message that was
|
|
published with the "immediate" flag set, or an unroutable
|
|
message published with the "mandatory" flag set. The reply
|
|
code and text provide information about the reason that the
|
|
message was undeliverable.
|
|
|
|
PARAMETERS:
|
|
reply_code: short
|
|
|
|
The reply code. The AMQ reply codes are defined in AMQ
|
|
RFC 011.
|
|
|
|
reply_text: shortstr
|
|
|
|
The localised reply text. This text can be logged as an
|
|
aid to resolving issues.
|
|
|
|
exchange: shortstr
|
|
|
|
Specifies the name of the exchange that the message
|
|
was originally published to.
|
|
|
|
routing_key: shortstr
|
|
|
|
Message routing key
|
|
|
|
Specifies the routing key name specified when the
|
|
message was published.
|
|
"""
|
|
exc = error_for_code(
|
|
reply_code, reply_text, spec.Basic.Return, ChannelError,
|
|
)
|
|
handlers = self.events.get('basic_return')
|
|
if not handlers:
|
|
raise exc
|
|
for callback in handlers:
|
|
callback(exc, exchange, routing_key, message)
|
|
|
|
#############
|
|
#
|
|
# Tx
|
|
#
|
|
#
|
|
# work with standard transactions
|
|
#
|
|
# Standard transactions provide so-called "1.5 phase commit". We
|
|
# can ensure that work is never lost, but there is a chance of
|
|
# confirmations being lost, so that messages may be resent.
|
|
# Applications that use standard transactions must be able to
|
|
# detect and ignore duplicate messages.
|
|
#
|
|
# GRAMMAR::
|
|
#
|
|
# tx = C:SELECT S:SELECT-OK
|
|
# / C:COMMIT S:COMMIT-OK
|
|
# / C:ROLLBACK S:ROLLBACK-OK
|
|
#
|
|
# RULE:
|
|
#
|
|
# An client using standard transactions SHOULD be able to
|
|
# track all messages received within a reasonable period, and
|
|
# thus detect and reject duplicates of the same message. It
|
|
# SHOULD NOT pass these to the application layer.
|
|
#
|
|
#
|
|
|
|
def tx_commit(self):
|
|
"""Commit the current transaction.
|
|
|
|
This method commits all messages published and acknowledged in
|
|
the current transaction. A new transaction starts immediately
|
|
after a commit.
|
|
"""
|
|
return self.send_method(spec.Tx.Commit, wait=spec.Tx.CommitOk)
|
|
|
|
def tx_rollback(self):
|
|
"""Abandon the current transaction.
|
|
|
|
This method abandons all messages published and acknowledged
|
|
in the current transaction. A new transaction starts
|
|
immediately after a rollback.
|
|
"""
|
|
return self.send_method(spec.Tx.Rollback, wait=spec.Tx.RollbackOk)
|
|
|
|
def tx_select(self):
|
|
"""Select standard transaction mode.
|
|
|
|
This method sets the channel to use standard transactions.
|
|
The client must use this method at least once on a channel
|
|
before using the Commit or Rollback methods.
|
|
"""
|
|
return self.send_method(spec.Tx.Select, wait=spec.Tx.SelectOk)
|
|
|
|
def confirm_select(self, nowait=False):
|
|
"""Enable publisher confirms for this channel.
|
|
|
|
Note: This is an RabbitMQ extension.
|
|
|
|
Can now be used if the channel is in transactional mode.
|
|
|
|
:param nowait:
|
|
If set, the server will not respond to the method.
|
|
The client should not wait for a reply method. If the
|
|
server could not complete the method it will raise a channel
|
|
or connection exception.
|
|
"""
|
|
return self.send_method(
|
|
spec.Confirm.Select, 'b', (nowait,),
|
|
wait=None if nowait else spec.Confirm.SelectOk,
|
|
)
|
|
|
|
def _on_basic_ack(self, delivery_tag, multiple):
|
|
for callback in self.events['basic_ack']:
|
|
callback(delivery_tag, multiple)
|
|
|
|
def _on_basic_nack(self, delivery_tag, multiple):
|
|
for callback in self.events['basic_nack']:
|
|
callback(delivery_tag, multiple)
|