
Run Logic App workflows as publisher/subscriber with the PubSub

motivation

Asynchronous messaging decouples publishers from consumers, because the publisher is not blocked while a message is being published. This matters especially in Azure Logic Apps workflows, where we can't assume that the interaction between publisher and consumer is always synchronous.

Invictus provides a solution called the PubSub, which lets Azure Service Bus act as a message broker that is accessed in a publish/subscribe fashion via HTTP endpoints. Azure Blob Storage acts as a claim-check provider when a published message is too large.

🔗 See also the Publisher-Subscriber integration pattern.

Available endpoints

  • /api/Publish: sending an HTTP request with custom content places a message on an Azure Service Bus topic.
  • /api/Subscribe: sending an HTTP request with an Azure Service Bus topic subscription name returns the available messages.
  • /api/Acknowledge: sending an HTTP request with a message sequence number settles the Azure Service Bus message.

PubSub pseudo Logic App diagram

➡️ Publish single message

The /api/Publish endpoint allows users to send a single message to the configured Azure Service Bus topic (default: pubsubv2router) that subscribers are listening to.

JSON property | Required (default) | Translates to ServiceBusMessage | Description
Content | yes | Body | Raw binary content for the message. If it exceeds a certain size (default 20 000 bytes), the component applies the claim-check pattern: the message gets sent with an empty body, and the content gets saved in Azure Blob Storage. The Subscribe action automatically loads the content, based on specific application properties, from either the message itself or from Blob Storage.
Context | yes | ApplicationProperties | User-provided properties, appended with the HTTP request headers x-ms-client-tracking-id and x-ms-workflow-run-id if present.
MessageId | no (new GUID) | MessageId | Optional message ID for duplicate detection purposes.
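The claim-check behaviour described for Content can be illustrated with a small in-memory sketch. This is not the PubSub component's implementation: the dictionary standing in for Azure Blob Storage, the property name `x-claim-check-blob`, and both function names are hypothetical, and the threshold is passed in explicitly rather than assumed.

```python
import uuid

# Hypothetical application property name; the real property names are not documented here.
CLAIM_CHECK_PROPERTY = "x-claim-check-blob"


def to_message(content: bytes, threshold: int, blob_store: dict) -> dict:
    """Build a message; content larger than `threshold` bytes is offloaded to `blob_store`."""
    if len(content) <= threshold:
        return {"body": content, "application_properties": {}}
    blob_name = str(uuid.uuid4())
    blob_store[blob_name] = content  # stand-in for saving the content in Blob Storage
    # The message itself travels with an empty body plus a pointer to the stored content.
    return {"body": b"", "application_properties": {CLAIM_CHECK_PROPERTY: blob_name}}


def load_content(message: dict, blob_store: dict) -> bytes:
    """Return the content from the message itself or, if it was offloaded, from `blob_store`."""
    blob_name = message["application_properties"].get(CLAIM_CHECK_PROPERTY)
    if blob_name is None:
        return message["body"]
    return blob_store[blob_name]
```

The point of the pattern is that the receiving side never has to know whether the content was offloaded: it always asks `load_content` and gets the original bytes back.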
Full request example
// POST /api/Publish
{
  "Content": "ew0KICAiQ291bnRyeUNvZGUiOiAiQkUiLA0KICAiTW9uZXkiOiAgeyAiQW1vdW50IjogIDUwLCAiQ3VycmVuY3kiOiAgIkdCUCIgIH0NCn0NCg==",
  "MessageId": "b0f11049-7f4d-4bae-90b2-91d93e69367d",
  "Context": {
    "x-applicationName": "InvoiceApp",
    "x-batchId": "975f7ea4-6247-431b-afb6-6d27fb47516f",
    "x-conversationId": "29500405-d7cf-4877-a72b-a3288cff9dc0",
    "x-correlationId": "fc13d345-ebd7-44f2-89a9-4371258c0a08"
  }
}

The endpoint responds with 202 Accepted if the message was published successfully.
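As a minimal sketch of calling this endpoint from Python's standard library: the request example suggests that Content is a Base64 string, which is an assumption here, as are the function names and the `base_url` placeholder. Authentication is not covered and may be required in a real deployment.

```python
import base64
import json
import urllib.request
from typing import Optional


def build_publish_body(content: bytes, context: dict, message_id: Optional[str] = None) -> dict:
    """Build the JSON body for /api/Publish (Content assumed to be Base64-encoded)."""
    body = {
        "Content": base64.b64encode(content).decode("ascii"),
        "Context": dict(context),
    }
    # MessageId is optional; per the table, a new GUID is used when it is left out.
    if message_id is not None:
        body["MessageId"] = message_id
    return body


def publish(base_url: str, body: dict) -> bool:
    """POST the body to /api/Publish; True when the endpoint answers 202 Accepted."""
    request = urllib.request.Request(
        url=f"{base_url.rstrip('/')}/api/Publish",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    # urlopen raises urllib.error.HTTPError for 4xx/5xx responses.
    with urllib.request.urlopen(request) as response:
        return response.status == 202
```

A call could then look like `publish(base_url, build_publish_body(b'{"CountryCode": "BE"}', {"x-applicationName": "InvoiceApp"}))`, where `base_url` is the address of your own PubSub deployment.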

⬅️ Subscribe for messages

The /api/Subscribe endpoint allows users to periodically ask for any available published messages on the configured Azure Service Bus topic (default: pubsubv2router).

JSON property | Required (default) | Translates to Service Bus | Description
Subscription | yes | SubscriptionName | Name of the Azure Service Bus topic subscription; it gets created if it does not exist. (The name is also used as the name of the Rule.)
Filter | no (subscribe on all messages) | SqlExpression | Optional SQL expression that acts as a filter rule for which messages to subscribe to.
BatchSize | no (10) | BatchSize | Maximum number of messages to receive during this single call.
TimeoutMilliseconds | no (1min) | MaxWaitTime | Maximum time to wait for a message before responding with an empty set of messages.
ShouldDeleteOnReceive | no (false) | ReceiveAndDelete | false (default) means PeekLock, true means receiving messages with ReceiveAndDelete.
SkipSubscriptionUpsert | no (false) | create/update subscription and rule | true means there should already be a topic subscription available; false (default) means that a subscription will be created with the provided Filter.

⚠️ In some rare cases, using ShouldDeleteOnReceive=true could cause the receiver to lose messages. For example, when an error occurs during receiving and the sequence number is lost, or when a cancellation or scale-down happens at the exact moment the client receives the message (which then no longer exists on the topic subscription).
tip

One can also use the HTTP request query parameters instead of the request body to POST to the /api/Subscribe endpoint: /api/Subscribe?Subscription=orderProcessor.
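A small sketch of building such a URL with Python's standard library. The function name and the host used in the usage note are placeholders; passing further options such as BatchSize as query parameters follows from the tip above, but how the endpoint parses boolean values in a query string is not documented here, so the sketch sticks to strings and numbers.

```python
from urllib.parse import urlencode


def subscribe_url(base_url: str, subscription: str, **options) -> str:
    """Build an /api/Subscribe URL that carries the request as query parameters."""
    params = {"Subscription": subscription, **options}
    # urlencode percent-encodes values, e.g. the spaces and quotes in a Filter expression.
    return f"{base_url.rstrip('/')}/api/Subscribe?{urlencode(params)}"
```

For example, `subscribe_url("https://example.invalid", "orderProcessor", BatchSize=5)` produces a URL ending in `/api/Subscribe?Subscription=orderProcessor&BatchSize=5`.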

Full request example
// POST -> /api/Subscribe
{
  "Subscription": "orderProcessor",
  "Filter": "sys.label = 'OrderCreated'",
  "BatchSize": 11,
  "TimeoutMilliseconds": 30000,
  "ShouldDeleteOnReceive": false,
  "SkipSubscriptionUpsert": false
}
Full response example
// 200 OK <- /api/Subscribe
[
  {
    "subscription": "orderProcessor",
    "content": "ew0KICAiQ291bnRyeUNvZGUiOiAiQkUiLA0KICAiTW9uZXkiOiAgeyAiQW1vdW50IjogIDUwLCAiQ3VycmVuY3kiOiAgIkdCUCIgIH0NCn0NCg==",
    "context": {
      "x-applicationName": "InvoiceApp",
      "x-batchId": "975f7ea4-6247-431b-afb6-6d27fb47516f",
      "x-conversationId": "29500405-d7cf-4877-a72b-a3288cff9dc0",
      "x-correlationId": "fc13d345-ebd7-44f2-89a9-4371258c0a08",
      "x-ms-client-tracking-id": "test",
      "Diagnostic-Id": "00-0cc7ed09eeaa51b0e835d90890aefb60-b0a02deac9f6fe6d-00"
    },
    "sequenceNumber": 99
  },
  ...
]
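A response of this shape could be unpacked as sketched below. The function name and the returned field names are illustrative, and treating content as Base64 is an assumption based on the example values.

```python
import base64
import json


def parse_subscribe_response(response_text: str) -> list:
    """Turn the JSON array returned by /api/Subscribe into a list of decoded messages."""
    messages = json.loads(response_text)
    return [
        {
            # Keep the sequence number: it is needed later to acknowledge the message.
            "sequence_number": message["sequenceNumber"],
            "subscription": message["subscription"],
            "context": message.get("context", {}),
            "content": base64.b64decode(message["content"]),
        }
        for message in messages
    ]
```

An empty array, which the endpoint returns when the timeout elapses without messages, simply yields an empty list.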
internal workaround

Because messages can be acknowledged separately from the place where they were received, by subscription name, each received message is internally deferred. This is due to a restriction in the Azure SDK, which requires that the same receiver both receives and settles a message. A deferred message doesn't have this limitation.
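The idea behind the workaround can be shown with a toy in-memory model. This is not the Azure SDK and not the component's code: the class and method names are made up, and only the Complete settlement is modelled.

```python
class ToySubscription:
    """In-memory stand-in for a topic subscription that defers messages on receive."""

    def __init__(self):
        self._active = []    # (sequence_number, body) pairs waiting to be received
        self._deferred = {}  # sequence_number -> body, set aside until settled
        self._next_sequence_number = 1

    def publish(self, body):
        self._active.append((self._next_sequence_number, body))
        self._next_sequence_number += 1

    def subscribe(self, batch_size=10):
        """Hand out up to `batch_size` messages and defer each of them."""
        batch, self._active = self._active[:batch_size], self._active[batch_size:]
        for sequence_number, body in batch:
            self._deferred[sequence_number] = body
        return [{"sequenceNumber": n, "content": body} for n, body in batch]

    def acknowledge(self, sequence_number):
        """Complete a deferred message; any caller can do this with the sequence number."""
        if sequence_number not in self._deferred:
            raise KeyError(f"no deferred message with sequence number {sequence_number}")
        del self._deferred[sequence_number]
```

Because the deferred message is looked up by its sequence number, the caller that acknowledges does not need to hold on to the receiver that originally delivered the message.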

✔️ Acknowledge message

The /api/Acknowledge endpoint allows users to 'settle' a message that was received via the /api/Subscribe endpoint. This requires the sequence number of the message.

JSON property | Required (default) | Translates to Service Bus | Description
Subscription | yes | CreateReceiver | Name of the Azure Service Bus topic subscription to receive the deferred message on (see internal workaround on /api/Subscribe).
SequenceNumber | yes | ReceiveDeferredMessage | Unique number assigned by Service Bus, received in the /api/Subscribe response.
AcknowledgementType | no (Complete) | Message settlement | Type of acknowledge action to take on the message: Complete, Abandon, Defer, or DeadLetter.
IgnoreNotFoundException | no (false) | MessageNotFound | true means that MessageNotFound Service Bus failures during lookup of the message by its SequenceNumber result in 202 Accepted; false means the endpoint responds with 400 BadRequest.
Full request example
// POST /api/Acknowledge
{
  "Subscription": "subscriptionName",
  "AcknowledgementType": "Complete",
  "SequenceNumber": 99,
  "IgnoreNotFoundException": false
}
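The sketch below builds acknowledge request bodies for a batch of received messages. The function names are illustrative, and the policy of sending Complete when a handler succeeds and Abandon when it raises is an assumption made for the example, not something the PubSub prescribes.

```python
ACKNOWLEDGEMENT_TYPES = ("Complete", "Abandon", "Defer", "DeadLetter")


def build_acknowledge_body(subscription, sequence_number,
                           acknowledgement_type="Complete", ignore_not_found=False):
    """Build the JSON body for /api/Acknowledge, rejecting unknown settlement types."""
    if acknowledgement_type not in ACKNOWLEDGEMENT_TYPES:
        raise ValueError(f"unknown AcknowledgementType: {acknowledgement_type!r}")
    return {
        "Subscription": subscription,
        "SequenceNumber": sequence_number,
        "AcknowledgementType": acknowledgement_type,
        "IgnoreNotFoundException": ignore_not_found,
    }


def acknowledgements_for(messages, handler):
    """Run `handler` on each /api/Subscribe message and pick a settlement per outcome."""
    bodies = []
    for message in messages:
        try:
            handler(message)
            outcome = "Complete"
        except Exception:
            # Assumed policy: hand the message back when processing fails.
            outcome = "Abandon"
        bodies.append(
            build_acknowledge_body(message["subscription"], message["sequenceNumber"], outcome)
        )
    return bodies
```

Each returned body can be serialized with `json.dumps` and posted to /api/Acknowledge; whether a failed message should be abandoned, deferred, or dead-lettered depends on the workflow.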
Name | Description | Tags
approvedMessageSizeInBytes | Default: 200000. The maximum byte threshold where the PubSub component applies the claim-check functionality. | comp:pubsub, messaging
autoResubmitDeferredMessages | Default: false. Indicates whether the PubSub component should automatically resubmit/recover an Azure Service Bus message older than the deferral time limit. | comp:pubsub, messaging
deferralMessageThresholdInMinutes | Default: 30 (new since v6.2). The PubSub component will try to recover Azure Service Bus messages older than this time limit that were stuck in deferral. | comp:pubsub, messaging
invictusPubSubV2FunctionLocalContainerImage | The URL that points to the Azure Container App image of the PubSub component. | comp:pubsub, container-apps
pubSubSubscriptionLockTimeoutInMinutes | Default: 1. The amount of time in minutes the PubSub component locks an Azure Service Bus message received on a topic subscription. | comp:pubsub, messaging
pubsubV2FunctionName | Default: inv-${resourcePrefix}-pubsub-v2. The name of the Azure Container App deployed for the PubSub component. | comp:pubsub, container-apps
pubSubV2TopicName | Default: pubsubv2router. The name of the Azure Service Bus topic that the PubSub component sends messages to and receives messages from. | comp:pubsub, messaging
serviceBusMessageTimeToLiveMinutes | Default: 43200. The time-to-live, in minutes, of the Azure Service Bus messages sent by the PubSub component; see Microsoft's messages expiration for more details. | comp:pubsub, messaging
serviceBusNamespaceName | Default: invictus-${resourcePrefix}-sbs. The name of the Azure Service Bus namespace resource where the PubSub component controls its messages. | comp:pubsub, messaging
serviceBusSkuName | Default: enableVnetSupport ? Premium : Standard. The pricing tier of the Azure Service Bus, used by the PubSub component. | comp:pubsub, messaging
storageAccountName | Default: invictus${resourcePrefix}store. The name of the shared Azure Storage Account, used by all Framework components. | comp:pubsub, comp:transco, comp:regex-translator, comp:xsd-validator, comp:xml-json-converter, comp:time-sequencer, comp:sequence-controller, storage