Run Logic App workflows as publisher/subscriber with the PubSub
Asynchronous messaging decouples publishers from consumers, because the publisher is not blocked while publishing. This matters especially in Azure Logic Apps workflows, where we can't assume that the interaction between publisher and consumer is always synchronous.
Invictus provides a solution called the PubSub, which lets Azure Service Bus act as a message broker that is used in a publish/subscribe approach via HTTP endpoints. Azure Blob Storage acts as a claim-check provider when a published message is too large.
🔗 See also the Publisher-Subscriber integration pattern.
Available endpoints
- /api/Publish: by sending an HTTP request with custom content, it places a message on an Azure Service Bus topic.
- /api/Subscribe: by sending an HTTP request with a specified Azure Service Bus topic subscription name, it responds with the available messages.
- /api/Acknowledge: by sending an HTTP request with a specified message sequence number, it settles the Azure Service Bus message.

➡️ Publish single message
The /api/Publish endpoint allows users to send a single message to the configured Azure Service Bus topic (default: pubsubv2router) that subscribers are listening to.
| JSON property | Required (default) | Translates to ServiceBusMessage | Description |
|---|---|---|---|
| Content | yes | Body | Raw binary content for the message. If it exceeds a certain size (default 20 000 bytes), the component applies the claim-check pattern: the message gets sent with an empty body, and the content gets saved in Azure Blob Storage. The Subscribe action automatically loads the content, based on specific application properties, from either the message itself or from Blob Storage. |
| Context | yes | ApplicationProperties | User-provided properties, appended with the HTTP request headers x-ms-client-tracking-id and x-ms-workflow-run-id if present. |
| MessageId | no (new GUID) | MessageId | Optional message ID for duplicate detection purposes. |
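The claim-check behaviour described for Content can be illustrated with a small in-memory sketch. This is not the component's implementation: the dictionary stands in for Azure Blob Storage, the property name `claim-check-blob` and both function names are hypothetical, and the threshold is passed in explicitly rather than assumed.

```python
import uuid


def apply_claim_check(content: bytes, threshold_bytes: int, blob_store: dict) -> dict:
    """Return a message dict; oversized content is moved to the blob store."""
    if len(content) <= threshold_bytes:
        # Small enough: the content travels inside the message body.
        return {"body": content, "properties": {}}
    blob_name = str(uuid.uuid4())
    blob_store[blob_name] = content
    # Hypothetical property name that points the subscriber at the stored content.
    return {"body": b"", "properties": {"claim-check-blob": blob_name}}


def resolve_claim_check(message: dict, blob_store: dict) -> bytes:
    """Load the content from the message itself or from the blob store."""
    blob_name = message["properties"].get("claim-check-blob")
    return blob_store[blob_name] if blob_name else message["body"]


store = {}
small = apply_claim_check(b"abc", 10, store)
large = apply_claim_check(b"x" * 11, 10, store)
```

The publisher and subscriber never need to know which path was taken: the subscriber always gets the original bytes back.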
Full request example
// POST /api/Publish
{
"Content": "ew0KICAiQ291bnRyeUNvZGUiOiAiQkUiLA0KICAiTW9uZXkiOiAgeyAiQW1vdW50IjogIDUwLCAiQ3VycmVuY3kiOiAgIkdCUCIgIH0NCn0NCg==",
"MessageId": "b0f11049-7f4d-4bae-90b2-91d93e69367d",
"Context": {
"x-applicationName": "InvoiceApp",
"x-batchId": "975f7ea4-6247-431b-afb6-6d27fb47516f",
"x-conversationId": "29500405-d7cf-4877-a72b-a3288cff9dc0",
"x-correlationId": "fc13d345-ebd7-44f2-89a9-4371258c0a08"
}
}
The endpoint responds with 202 Accepted if the message was published successfully.
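As a minimal sketch of how a client might assemble the request body shown above, the following Python builds the JSON payload. It assumes, based on the request example, that Content is sent as a Base64 string. The helper name `build_publish_body` is hypothetical and not part of the PubSub component.

```python
import base64
import json
import uuid
from typing import Optional


def build_publish_body(content: bytes, context: dict, message_id: Optional[str] = None) -> str:
    """Serialize a /api/Publish request body (illustrative helper)."""
    body = {
        # Assumption: raw bytes are Base64-encoded, as in the request example.
        "Content": base64.b64encode(content).decode("ascii"),
        "Context": context,
    }
    # MessageId is optional; the table states a new GUID is used when omitted.
    if message_id is not None:
        body["MessageId"] = message_id
    return json.dumps(body)


body = build_publish_body(
    b'{"CountryCode": "BE"}',
    {"x-applicationName": "InvoiceApp"},
    message_id=str(uuid.uuid4()),
)
```

The resulting string can be sent as the body of a POST request to /api/Publish with any HTTP client.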
⬅️ Subscribe for messages
The /api/Subscribe endpoint allows users to periodically ask for any available published messages on the configured Azure Service Bus topic (default: pubsubv2router).
| JSON property | Required (default) | Translates to Service Bus | Description |
|---|---|---|---|
| Subscription | yes | SubscriptionName | Name of the Azure Service Bus topic subscription; it gets created if it doesn't exist. (The name is also used as the name of the Rule.) |
| Filter | no (subscribe on all messages) | SqlExpression | Optional SQL expression that acts as a filter rule for which messages to subscribe on. |
| BatchSize | no (10) | BatchSize | Maximum number of messages to receive during this single call. |
| TimeoutMilliseconds | no (1min) | MaxWaitTime | Maximum time to wait for a message before responding with an empty set of messages. |
| ShouldDeleteOnReceive | no (false) | ReceiveAndDelete | false (default) means PeekLock, true means receiving messages with ReceiveAndDelete. ⚠️ In some rare cases, the use of ShouldDeleteOnReceive=true could cause the receiver to lose messages. For example, when an error occurs during receiving and the sequence number is lost, or when a cancellation or scale-down happens at the exact moment the client receives the message (which then no longer exists on the topic subscription). |
| SkipSubscriptionUpsert | no (false) | create/update subscription and rule | true means there should already be a topic subscription available, false (default) means that a subscription will be created with the provided Filter. |
One can also use the HTTP request query parameters instead of the request body to POST to the /api/Subscribe endpoint: /api/Subscribe?Subscription=orderProcessor.
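The query-parameter form can be sketched as follows. The host name is a placeholder and `build_subscribe_url` is a hypothetical helper. Only Subscription is shown in the source as a query parameter, so passing other properties such as BatchSize this way is an assumption.

```python
from urllib.parse import urlencode


def build_subscribe_url(base_url: str, subscription: str, **options) -> str:
    """Build a /api/Subscribe URL that carries its input as query parameters."""
    # Assumption: optional properties use the same names as in the request body.
    params = {"Subscription": subscription, **options}
    return f"{base_url.rstrip('/')}/api/Subscribe?{urlencode(params)}"


# Placeholder host; replace with the address of the deployed PubSub component.
url = build_subscribe_url("https://pubsub.example.invalid", "orderProcessor")
url_with_batch = build_subscribe_url("https://pubsub.example.invalid/", "orderProcessor", BatchSize=5)
```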
Full request example
// POST -> /api/Subscribe
{
"Subscription": "orderProcessor",
"Filter": "sys.label = 'OrderCreated'",
"BatchSize": 11,
"TimeoutMilliseconds": 30000,
"ShouldDeleteOnReceive": false,
"SkipSubscriptionUpsert": false
}
Full response example
// 200 OK <- /api/Subscribe
[
{
"subscription": "orderProcessor",
"content": "ew0KICAiQ291bnRyeUNvZGUiOiAiQkUiLA0KICAiTW9uZXkiOiAgeyAiQW1vdW50IjogIDUwLCAiQ3VycmVuY3kiOiAgIkdCUCIgIH0NCn0NCg==",
"context": {
"x-applicationName": "InvoiceApp",
"x-batchId": "975f7ea4-6247-431b-afb6-6d27fb47516f",
"x-conversationId": "29500405-d7cf-4877-a72b-a3288cff9dc0",
"x-correlationId": "fc13d345-ebd7-44f2-89a9-4371258c0a08",
"x-ms-client-tracking-id": "test",
"Diagnostic-Id": "00-0cc7ed09eeaa51b0e835d90890aefb60-b0a02deac9f6fe6d-00"
},
"sequenceNumber": 99
},
...
]
Because messages can be acknowledged separately from the call that received them on a subscription, each received message is deferred internally. This is due to a restriction in the Azure SDK, which requires that the same receiver both receives and settles a message. Deferring a message doesn't have this limitation.
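A client typically needs the sequence number and the decoded content of each received message. The sketch below parses a response shaped like the example above. It assumes that content is Base64-encoded, as the example suggests, and `decode_messages` is a hypothetical helper name.

```python
import base64
import json


def decode_messages(response_text: str) -> list:
    """Turn a /api/Subscribe response into (sequence number, content, context) tuples."""
    messages = json.loads(response_text)
    return [
        (
            message["sequenceNumber"],
            # Assumption: content is Base64-encoded, as in the response example.
            base64.b64decode(message["content"]),
            message.get("context", {}),
        )
        for message in messages
    ]


# A made-up response with a single message, mirroring the documented shape.
sample_response = json.dumps([
    {
        "subscription": "orderProcessor",
        "content": base64.b64encode(b"hello").decode("ascii"),
        "context": {"x-batchId": "1"},
        "sequenceNumber": 99,
    }
])
decoded = decode_messages(sample_response)
```

The sequence number is worth keeping alongside the content, since it is what the /api/Acknowledge endpoint needs to settle the message.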
✔️ Acknowledge message
The /api/Acknowledge endpoint allows users to 'settle' a message that was received via the /api/Subscribe endpoint. This requires the sequence number of the message.
| JSON property | Required (default) | Translates to Service Bus | Description |
|---|---|---|---|
| Subscription | yes | CreateReceiver | Name of the Azure Service Bus topic subscription to receive the deferred message on (see the internal workaround on /api/Subscribe). |
| SequenceNumber | yes | ReceiveDeferredMessage | Unique number assigned by Service Bus, returned in the /api/Subscribe response. |
| AcknowledgementType | no (Complete) | Message settlement | Type of acknowledge action to take on the message. |
| IgnoreNotFoundException | no (false) | MessageNotFound | true means that MessageNotFound Service Bus failures during lookup of the message by its SequenceNumber will result in 202 Accepted; false means the endpoint responds with 400 BadRequest. |
Full request example
// POST /api/Acknowledge
{
"Subscription": "subscriptionName",
"AcknowledgementType": "Complete",
"SequenceNumber": 99,
"IgnoreNotFoundException": false
}
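Putting the subscribe and acknowledge steps together, a client could settle every message it has processed. The sketch below keeps the HTTP call abstract: `post` is a caller-supplied function (hypothetical, not part of PubSub) that sends a body to a path and returns the status code. Treating any 2xx status as success is an assumption.

```python
import json
from typing import Callable, Iterable, List


def acknowledge_all(
    messages: Iterable[dict],
    post: Callable[[str, str], int],
    acknowledgement_type: str = "Complete",
) -> List[int]:
    """Send one /api/Acknowledge request per message; return the settled sequence numbers."""
    settled = []
    for message in messages:
        body = json.dumps({
            "Subscription": message["subscription"],
            "SequenceNumber": message["sequenceNumber"],
            "AcknowledgementType": acknowledgement_type,
        })
        status = post("/api/Acknowledge", body)
        # Assumption: any 2xx status code means the message was settled.
        if 200 <= status < 300:
            settled.append(message["sequenceNumber"])
    return settled


# Stand-in transport that records requests instead of calling a real endpoint.
sent = []


def fake_post(path: str, body: str) -> int:
    sent.append((path, json.loads(body)))
    return 202


settled = acknowledge_all([{"subscription": "orderProcessor", "sequenceNumber": 99}], fake_post)
```

Passing the transport in as a function keeps the example testable without a deployed PubSub component; a real client would replace `fake_post` with an actual HTTP call.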
Related Bicep template parameters
| Name | Description | Tags |
|---|---|---|
| approvedMessageSizeInBytes (default: 200000) | The maximum byte threshold where the PubSub component applies the claim-check functionality. | comp:pubsub, messaging |
| autoResubmitDeferredMessages (default: false) | Indicates whether the PubSub component should automatically resubmit/recover an Azure Service Bus message older than the deferral time limit. | comp:pubsub, messaging |
| deferralMessageThresholdInMinutes (default: 30, new since v6.2) | The PubSub component will try to recover Azure Service Bus messages older than this time limit that were stuck in deferral. | comp:pubsub, messaging |
| invictusPubSubV2FunctionLocalContainerImage | The URL that navigates to the Azure Container App image of the PubSub component. | comp:pubsub, container-apps |
| pubSubSubscriptionLockTimeoutInMinutes (default: 1) | The amount of time in minutes the PubSub component locks an Azure Service Bus message received on a topic subscription. | comp:pubsub, messaging |
| pubsubV2FunctionName (default: inv-${resourcePrefix}-pubsub-v2) | The name of the Azure Container App deployed for the PubSub component. | comp:pubsub, container-apps |
| pubSubV2Scaling | The Azure Container App scaling options of the PubSub component. | comp:pubsub, scaling, container-apps |
| pubSubV2TopicName (default: pubsubv2router) | The name of the Azure Service Bus topic, used by the PubSub component to send/receive messages from. | comp:pubsub, messaging |
| serviceBusMessageTimeToLiveMinutes (default: 43200) | The time-to-live of the Azure Service Bus messages sent by the PubSub component, see Microsoft's messages expiration for more details. | comp:pubsub, messaging |
| serviceBusNamespaceName (default: invictus-${resourcePrefix}-sbs) | The name of the Azure Service Bus namespace resource where the PubSub component controls its messages. | comp:pubsub, messaging |
| serviceBusSkuName (default: enableVnetSupport ? Premium : Standard) | The pricing tier of the Azure Service Bus, used by the PubSub component. | comp:pubsub, messaging |
| storageAccountName (default: invictus${resourcePrefix}store) | The name of the shared Azure Storage Account, used by all Framework components. | comp:pubsub, comp:transco, comp:regex-translator, comp:xsd-validator, comp:xml-json-converter, comp:time-sequencer, comp:sequence-controller, storage |