/amqp_subscriptions

Primary LanguagePythonApache License 2.0Apache-2.0

Extended Semantics for Topic Subscriptions in AMQP: A Proposal

Description

A subscription is identified by the link name.

The subscription identity can be scoped to the AMQP container - i.e. it is unique with the scope of connections with the same container id specified on open - or it can be global, in which case the identity is independent of the container id.

A global subscription identity is indicated by adding the symbol 'global' to the capabilities for the source.

A subscription may be shared by more than one consuming link. In this case, the messages for the subscription are distributed between all the consuming links. To indicate that the link is shared the 'shared' symbol is added to the capabilities for the source.

AMQP requires that link names are unique within the scope of a container, identified by the container id specified in the open frame. In order to allow multiple links consuming from the same subscription over connections with the same container id, we propose a scheme by which such links can be disambiguated while retaining their ability to identify the same subscription. In this scheme, only the part of the link name up to (but excluding) the '|' character is used for establishing the idenity of the subscription the link refers to. E.g. the link names mysub|foo and mysub|bar would both be interpreted as referencing the subscription named 'mysub' (which could be global or container scoped), but as the two names are different they could be used on the same connection.

Examples

The broker.py script implements a very simplistic pub-sub broker against which the different subscriber examples can be run. There is also a simple publisher example called send.py which takes the messages to publish as command line arguments.

The subscriber examples are as follows:

a.py

A simple non-shared, non-durable subscriber. Multiple instances can be run concurrently and each should get all messages. When not attached, no messages are stored.

b.py

A non-shared, durable subscriber with a container scoped subscription identifier. Only one instance of the subscriber can be run at a time with the default settings. However the container-id or the subscription-id can be changed via command line arguments to cause a different subscription to be used.

c.py

A non-shared, durable subscriber with a global subscription identifier. Only one instance of the subscriber can be run at a time with the default settings. However the subscription-id can be changed via a command line argument to cause a different subscription to be used. The container id can also be explicitly set through a command line argument, but this will not affect the subscription attached to.

d.py

A shared, non-durable subscriber with a container scoped subscription identifier. Multiple instances can be attached to the same subscription at one time and the messages for that subscription should be divided between them. To attach to the subscription the same container id and link name[1] must be used. If there are no active consumers attached to the subscription, it is deleted and messages are not stored for it.

e.py

A shared, non-durable subscriber with a global subscription identifier. Multiple instances can be attached to the same subscription at one time[1] and the messages for that subscription should be divided between them. To attach to the subscription only the link name is considered.

f.py

A shared, durable subscriber with a container scoped subscription identifier. Multiple instances can be attached to the same subscription at one time[1] and the messages for that subscription should be divided between them. The subscription can survive the detachment of all active consumers and will continue to store messages.

g.py

A shared, durable subscriber with a global subscription identifier. Multiple instances can be attached to the same subscription at one time[1] and the messages for that subscription should be divided between them. To attach to the subscription only the link name is considered. Again, the subscription can survive the detachment of all active consumers and will continue to store messages.

[1] A link name may have a special suffix added to ensure uniqueness for a given container-id(/connection) without altering the subscription it identifies. For the purposes of identifying a subscription, anything after and including a '|' character in the link name is ignored.

Code Snippets

simple non-shared, non-durable subscriber:

event.container.create_receiver(self.url)

non-shared, durable subscriber with a container scoped subscription identifier:

event.container.create_receiver(self.url, name=self.subscription, options=[DurableSubscription()])

non-shared, durable subscriber with a global subscription identifier:

event.container.create_receiver(self.url, name=self.subscription, options=[DurableSubscription(), Capabilities('global')])

shared, non-durable subscriber with a container scoped subscription identifier:

event.container.create_receiver(self.url, name=self.subscription, options=[Capabilities('shared')])

shared, non-durable subscriber with a global subscription identifier:

event.container.create_receiver(self.url, name=self.subscription, options=[Capabilities(['global', 'shared'])])

shared, durable subscriber with a container scoped subscription identifier:

event.container.create_receiver(self.url, name=self.subscription, options=[Capabilities('shared'), DurableSubscription()])

shared, durable subscriber with a global subscription identifier:

event.container.create_receiver(self.url, name=self.subscription, options=[Capabilities(['global', 'shared']), DurableSubscription()])

Protocol Traces

simple non-shared, non-durable subscriber:

  -> AMQP
0 -> @open(16) [container-id="b4f238f2-cdfa-4fb7-9c1b-3ab4680debd8", hostname="localhost", channel-max=32767]
0 -> @begin(17) [next-outgoing-id=0, incoming-window=2147483647, outgoing-window=2147483647]
0 -> @attach(18) [name="b4f238f2-cdfa-4fb7-9c1b-3ab4680debd8-examples", handle=0, role=true, snd-settle-mode=2, rcv-settle-mode=0, source=@source(40) [address="examples", durable=0, timeout=0, dynamic=false], target=@target(41) [durable=0, timeout=0, dynamic=false], initial-delivery-count=0]
0 -> @flow(19) [incoming-window=2147483647, next-outgoing-id=0, outgoing-window=2147483647, handle=0, delivery-count=0, link-credit=10, drain=false]
  <- AMQP
0 <- @open(16) [container-id="Router.A", max-frame-size=16384, channel-max=32767, idle-time-out=8000, offered-capabilities=:"ANONYMOUS-RELAY", properties={:product="qpid-dispatch-router", :version="0.7.0"}]
0 <- @begin(17) [remote-channel=0, next-outgoing-id=0, incoming-window=61, outgoing-window=2147483647]
0 <- @attach(18) [name="b4f238f2-cdfa-4fb7-9c1b-3ab4680debd8-examples", handle=0, role=false, snd-settle-mode=2, rcv-settle-mode=0, source=@source(40) [address="examples", durable=0, timeout=0, dynamic=false], target=@target(41) [durable=0, timeout=0, dynamic=false], initial-delivery-count=0]

non-shared, durable subscriber with a container scoped subscription identifier:

  -> AMQP
0 -> @open(16) [container-id="client-b", hostname="localhost", channel-max=32767]
0 -> @begin(17) [next-outgoing-id=0, incoming-window=2147483647, outgoing-window=2147483647]
0 -> @attach(18) [name="subscription-b", handle=0, role=true, snd-settle-mode=2, rcv-settle-mode=0, source=@source(40) [address="examples", durable=2, expiry-policy=:never, timeout=0, dynamic=false], target=@target(41) [durable=0, timeout=0, dynamic=false], initial-delivery-count=0]
0 -> @flow(19) [incoming-window=2147483647, next-outgoing-id=0, outgoing-window=2147483647, handle=0, delivery-count=0, link-credit=10, drain=false]
  <- AMQP
0 <- @open(16) [container-id="Router.A", max-frame-size=16384, channel-max=32767, idle-time-out=8000, offered-capabilities=:"ANONYMOUS-RELAY", properties={:product="qpid-dispatch-router", :version="0.7.0"}]
0 <- @begin(17) [remote-channel=0, next-outgoing-id=0, incoming-window=61, outgoing-window=2147483647]
0 <- @attach(18) [name="subscription-b", handle=0, role=false, snd-settle-mode=2, rcv-settle-mode=0, source=@source(40) [address="examples", durable=2, expiry-policy=:never, timeout=0, dynamic=false], target=@target(41) [durable=0, timeout=0, dynamic=false], initial-delivery-count=0]

non-shared, durable subscriber with a global subscription identifier:

  -> AMQP
0 -> @open(16) [container-id="58b89233-84ac-48bb-a543-840f3222e03a", hostname="localhost", channel-max=32767]
0 -> @begin(17) [next-outgoing-id=0, incoming-window=2147483647, outgoing-window=2147483647]
0 -> @attach(18) [name="subscription-c", handle=0, role=true, snd-settle-mode=2, rcv-settle-mode=0, source=@source(40) [address="examples", durable=2, expiry-policy=:never, timeout=0, dynamic=false, capabilities=:global], target=@target(41) [durable=0, timeout=0, dynamic=false], initial-delivery-count=0]
0 -> @flow(19) [incoming-window=2147483647, next-outgoing-id=0, outgoing-window=2147483647, handle=0, delivery-count=0, link-credit=10, drain=false]
  <- AMQP
0 <- @open(16) [container-id="Router.A", max-frame-size=16384, channel-max=32767, idle-time-out=8000, offered-capabilities=:"ANONYMOUS-RELAY", properties={:product="qpid-dispatch-router", :version="0.7.0"}]
0 <- @begin(17) [remote-channel=0, next-outgoing-id=0, incoming-window=61, outgoing-window=2147483647]
0 <- @attach(18) [name="subscription-c", handle=0, role=false, snd-settle-mode=2, rcv-settle-mode=0, source=@source(40) [address="examples", durable=2, expiry-policy=:never, timeout=0, dynamic=false, capabilities=:global], target=@target(41) [durable=0, timeout=0, dynamic=false], initial-delivery-count=0]

shared, non-durable subscriber with a container scoped subscription identifier:

  -> AMQP
0 -> @open(16) [container-id="client-d", hostname="localhost", channel-max=32767]
0 -> @begin(17) [next-outgoing-id=0, incoming-window=2147483647, outgoing-window=2147483647]
0 -> @attach(18) [name="subscription-d", handle=0, role=true, snd-settle-mode=2, rcv-settle-mode=0, source=@source(40) [address="examples", durable=0, timeout=0, dynamic=false, capabilities=:shared], target=@target(41) [durable=0, timeout=0, dynamic=false], initial-delivery-count=0]
0 -> @flow(19) [incoming-window=2147483647, next-outgoing-id=0, outgoing-window=2147483647, handle=0, delivery-count=0, link-credit=10, drain=false]
  <- AMQP
0 <- @open(16) [container-id="Router.A", max-frame-size=16384, channel-max=32767, idle-time-out=8000, offered-capabilities=:"ANONYMOUS-RELAY", properties={:product="qpid-dispatch-router", :version="0.7.0"}]
0 <- @begin(17) [remote-channel=0, next-outgoing-id=0, incoming-window=61, outgoing-window=2147483647]
0 <- @attach(18) [name="subscription-d", handle=0, role=false, snd-settle-mode=2, rcv-settle-mode=0, source=@source(40) [address="examples", durable=0, timeout=0, dynamic=false, capabilities=:shared], target=@target(41) [durable=0, timeout=0, dynamic=false], initial-delivery-count=0]

shared, non-durable subscriber with a global subscription identifier:

  -> AMQP
0 -> @open(16) [container-id="a0858d9e-a44e-4aff-a1f8-049d53361a7c", hostname="localhost", channel-max=32767]
0 -> @begin(17) [next-outgoing-id=0, incoming-window=2147483647, outgoing-window=2147483647]
0 -> @attach(18) [name="subscription-e", handle=0, role=true, snd-settle-mode=2, rcv-settle-mode=0, source=@source(40) [address="examples", durable=0, timeout=0, dynamic=false, capabilities=[:global, :shared]], target=@target(41) [durable=0, timeout=0, dynamic=false], initial-delivery-count=0]
0 -> @flow(19) [incoming-window=2147483647, next-outgoing-id=0, outgoing-window=2147483647, handle=0, delivery-count=0, link-credit=10, drain=false]
  <- AMQP
0 <- @open(16) [container-id="Router.A", max-frame-size=16384, channel-max=32767, idle-time-out=8000, offered-capabilities=:"ANONYMOUS-RELAY", properties={:product="qpid-dispatch-router", :version="0.7.0"}]
0 <- @begin(17) [remote-channel=0, next-outgoing-id=0, incoming-window=61, outgoing-window=2147483647]
0 <- @attach(18) [name="subscription-e", handle=0, role=false, snd-settle-mode=2, rcv-settle-mode=0, source=@source(40) [address="examples", durable=0, timeout=0, dynamic=false, capabilities=[:global, :shared]], target=@target(41) [durable=0, timeout=0, dynamic=false], initial-delivery-count=0]

shared, durable subscriber with a container scoped subscription identifier:

  -> AMQP
0 -> @open(16) [container-id="client-f", hostname="localhost", channel-max=32767]
0 -> @begin(17) [next-outgoing-id=0, incoming-window=2147483647, outgoing-window=2147483647]
0 -> @attach(18) [name="subscription-f", handle=0, role=true, snd-settle-mode=2, rcv-settle-mode=0, source=@source(40) [address="examples", durable=2, expiry-policy=:never, timeout=0, dynamic=false, capabilities=:shared], target=@target(41) [durable=0, timeout=0, dynamic=false], initial-delivery-count=0]
0 -> @flow(19) [incoming-window=2147483647, next-outgoing-id=0, outgoing-window=2147483647, handle=0, delivery-count=0, link-credit=10, drain=false]
  <- AMQP
0 <- @open(16) [container-id="Router.A", max-frame-size=16384, channel-max=32767, idle-time-out=8000, offered-capabilities=:"ANONYMOUS-RELAY", properties={:product="qpid-dispatch-router", :version="0.7.0"}]
0 <- @begin(17) [remote-channel=0, next-outgoing-id=0, incoming-window=61, outgoing-window=2147483647]
0 <- @attach(18) [name="subscription-f", handle=0, role=false, snd-settle-mode=2, rcv-settle-mode=0, source=@source(40) [address="examples", durable=2, expiry-policy=:never, timeout=0, dynamic=false, capabilities=:shared], target=@target(41) [durable=0, timeout=0, dynamic=false], initial-delivery-count=0]

shared, durable subscriber with a global subscription identifier:

  -> AMQP
0 -> @open(16) [container-id="58e4c34e-1c25-4270-847a-28786682cf3d", hostname="localhost", channel-max=32767]
0 -> @begin(17) [next-outgoing-id=0, incoming-window=2147483647, outgoing-window=2147483647]
0 -> @attach(18) [name="subscription-g", handle=0, role=true, snd-settle-mode=2, rcv-settle-mode=0, source=@source(40) [address="examples", durable=2, expiry-policy=:never, timeout=0, dynamic=false, capabilities=[:global, :shared]], target=@target(41) [durable=0, timeout=0, dynamic=false], initial-delivery-count=0]
0 -> @flow(19) [incoming-window=2147483647, next-outgoing-id=0, outgoing-window=2147483647, handle=0, delivery-count=0, link-credit=10, drain=false]
  <- AMQP
0 <- @open(16) [container-id="Router.A", max-frame-size=16384, channel-max=32767, idle-time-out=8000, offered-capabilities=:"ANONYMOUS-RELAY", properties={:product="qpid-dispatch-router", :version="0.7.0"}]
0 <- @begin(17) [remote-channel=0, next-outgoing-id=0, incoming-window=61, outgoing-window=2147483647]
0 <- @attach(18) [name="subscription-g", handle=0, role=false, snd-settle-mode=2, rcv-settle-mode=0, source=@source(40) [address="examples", durable=2, expiry-policy=:never, timeout=0, dynamic=false, capabilities=[:global, :shared]], target=@target(41) [durable=0, timeout=0, dynamic=false], initial-delivery-count=0]