Consume Event Mesh Message
We will attempt to consume event messages from an Event Mesh instance that has been created in a different space than the one where the CAP application is deployed.
How SAP Event Mesh Works (Consumption Flow)
SAP Event Mesh is a message broker based on the pub/sub (publish–subscribe) pattern.
- The Producer (Publisher) sends an event/message to a Topic.
- The Event Mesh Broker stores the message and routes it to the subscribed queues.
- The Queue (Subscriber endpoint) holds messages until they are consumed.
- The Consumer (your app) reads messages from the queue. The consumer does not consume directly from the topic; it consumes from a queue bound to that topic.
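The flow above can be sketched with a small in-memory model. This only illustrates the idea of routing from a topic into bound queues; it is not the Event Mesh API, and the MiniBroker class with all of its method names is hypothetical.

```javascript
// Hypothetical in-memory model of the publish-subscribe flow; not the Event Mesh API.
class MiniBroker {
  constructor() {
    this.queues = new Map();        // queue name -> array of pending messages
    this.subscriptions = new Map(); // topic -> set of queue names bound to it
  }

  createQueue(queueName) {
    if (!this.queues.has(queueName)) this.queues.set(queueName, []);
  }

  // Bind a queue to a topic so that published messages are routed into it.
  subscribe(queueName, topic) {
    this.createQueue(queueName);
    if (!this.subscriptions.has(topic)) this.subscriptions.set(topic, new Set());
    this.subscriptions.get(topic).add(queueName);
  }

  // The producer publishes to a topic; the broker copies the message to every bound queue.
  publish(topic, data) {
    const bound = this.subscriptions.get(topic) || new Set();
    for (const queueName of bound) this.queues.get(queueName).push({ topic, data });
    return bound.size; // number of queues that received the message
  }

  // The consumer reads from the queue, never from the topic directly.
  consume(queueName) {
    const queue = this.queues.get(queueName) || [];
    return queue.shift(); // undefined when the queue is empty
  }
}

const broker = new MiniBroker();
broker.subscribe('pocqueue', 'cap/hello/created');
broker.publish('cap/hello/created', { id: 1 });
console.log(broker.consume('pocqueue')); // the message published above
```

Note that the message stays in the queue until the consumer picks it up, which is why the consumer does not have to be online at the moment the producer publishes.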
Different ways to Consume Event Messages -
1. Consume Event Mesh with service binding
- The event-driven approach works on a push mechanism, where your app subscribes to a queue or topic.
- When an event occurs, Event Mesh pushes the message to your app, so the app does not need to ask for messages repeatedly.
- It follows asynchronous, real-time or near real-time communication, which avoids unnecessary calls and suits highly scalable projects.
- It uses AMQP (Advanced Message Queuing Protocol), a standard messaging protocol for sending messages between applications in a reliable and structured way.
- In this approach, we directly create a service binding with the Event Mesh service instance.
2. Consume Event using User-Provided Service
- It is similar to the above, but we use a User-Provided Service instance of Event Mesh and bind the CAP application to it to consume the messages.
3. Polling
- Polling refers to the method where a consumer application actively and repeatedly requests or pulls messages from a queue, rather than waiting for the message to be pushed to it.
- When you poll, Event Mesh checks whether the queue contains a message. If one exists, it sends the message and marks it as "in-delivery". If an acknowledgement is received after delivery, it removes the message from the queue. If no acknowledgement is received, the message is marked as unacknowledged and either stays in the queue or is redelivered.
- It repeatedly calls the API even when there is no data. It also does not provide real-time data, as it depends on the polling interval.
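The poll, acknowledge, and redeliver cycle described for polling can be sketched as a tiny in-memory queue. This is a simplified model for illustration only, not the behaviour of the real broker in every detail; the PollableQueue class and its methods are hypothetical.

```javascript
// Hypothetical in-memory model of poll, acknowledge, and redelivery; not the Event Mesh API.
class PollableQueue {
  constructor() {
    this.pending = [];           // messages waiting to be delivered
    this.inDelivery = new Map(); // message id -> message delivered but not yet acknowledged
    this.nextId = 1;
  }

  enqueue(data) {
    this.pending.push({ id: String(this.nextId++), data });
  }

  // One poll: returns the next message and marks it as in delivery, or null if the queue is empty.
  poll() {
    const message = this.pending.shift();
    if (!message) return null;
    this.inDelivery.set(message.id, message);
    return message;
  }

  // An acknowledgement removes the message for good; returns false for unknown ids.
  acknowledge(id) {
    return this.inDelivery.delete(id);
  }

  // Without an acknowledgement, the message goes back to the front of the queue for redelivery.
  releaseUnacknowledged() {
    this.pending.unshift(...this.inDelivery.values());
    this.inDelivery.clear();
  }
}

const queue = new PollableQueue();
queue.enqueue({ text: 'hello' });
const first = queue.poll();    // delivered, now in delivery
queue.releaseUnacknowledged(); // no acknowledgement arrived
const again = queue.poll();    // the same message is delivered again
queue.acknowledge(again.id);   // removed from the queue
console.log(first.id === again.id, queue.poll()); // true null
```

The last poll returns null, which is the "no data" case: the consumer still paid for the call, and this is the overhead mentioned above.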
Method 1 - Consume Event Mesh with service binding
This approach works well when both the Event Mesh instance and the CAP application are deployed within the same Cloud Foundry space.
We will follow the steps below -
- Create Event Mesh Instance
- Create Event Mesh Service Binding with CAP App
- Define configuration on package.json
- Check xs-security.json configuration
- Define Service logic to consume Event Message
Step 1 - Create Event Mesh Instance
Create the Event Mesh service instance in a space of your SAP BTP subaccount.
In our case, we are creating an Event Mesh service instance with the details below -
- Event Mesh Instance Name - pocem
- Service Key - key
- emname - pocem
- namespace - companyName/subaccountName/pocem
You can define similar values based on your requirements.
I used the JSON parameters below while creating the Event Mesh service instance in the BTP cockpit.
{
  "emname": "pocem",
  "namespace": "companyName/subaccountName/pocem",
  "version": "1.1.0",
  "resources": {
    "units": "10"
  },
  "options": {
    "management": true,
    "messagingrest": true,
    "messaging": true
  },
  "rules": {
    "topicRules": {
      "publishFilter": [
        "${namespace}/*"
      ],
      "subscribeFilter": [
        "${namespace}/*"
      ]
    },
    "queueRules": {
      "publishFilter": [
        "${namespace}/*"
      ],
      "subscribeFilter": [
        "${namespace}/*"
      ]
    }
  },
  "xs-security": {
    "oauth2-configuration": {
      "credential-types": []
    }
  },
  "authorities": [
    "$ACCEPT_GRANTED_AUTHORITIES"
  ]
}
Once the Event Mesh service instance has been created, create a queue on the Event Mesh instance and subscribe it to the topic.
In our case, we have created the queue below and subscribed it to the topic -
- Queue - companyName/subaccountName/pocem/pocqueue
- Subscribed topic - companyName/subaccountName/pocem/cap/hello/created
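As the names above show, both the queue and the topic are the instance namespace followed by a local name. A minimal sketch of that convention is given below; the qualify and isInNamespace helpers are hypothetical and only illustrate how the fully qualified names are put together.

```javascript
// Hypothetical helper: joins the instance namespace and a local name into a fully qualified name.
function qualify(namespace, localName) {
  const trimmedNamespace = namespace.replace(/\/+$/, ''); // drop trailing slashes
  const trimmedName = localName.replace(/^\/+/, '');      // drop leading slashes
  return `${trimmedNamespace}/${trimmedName}`;
}

// Hypothetical check that a queue or topic name lies inside the given namespace.
function isInNamespace(namespace, fullName) {
  return fullName.startsWith(qualify(namespace, ''));
}

const namespace = 'companyName/subaccountName/pocem';
console.log(qualify(namespace, 'pocqueue'));          // companyName/subaccountName/pocem/pocqueue
console.log(qualify(namespace, 'cap/hello/created')); // companyName/subaccountName/pocem/cap/hello/created
```

A name outside the namespace, for example one built from another instance's namespace, would not pass isInNamespace, which is a quick way to spot copy-paste mistakes in queue or topic names.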
Step 2 - Create Event Mesh Service Binding with CAP App
In SAP CAP, consuming a service typically requires a service binding. Therefore, we create a binding between the Event Mesh service instance and the CAP application.
To do this, we define the Event Mesh instance in the mta.yaml file as shown below -
Under the requires section of the module, mention the Event Mesh instance name. In our case the instance name is pocem, so we add -
- name: pocem
We will then define the same service under resources like below -
- name: pocem
  type: org.cloudfoundry.managed-service
  parameters:
    service: enterprise-messaging
    service-plan: default
    service-name: pocem
Also define the processed-after property on the auth resource in mta.yaml like -
processed-after:
  - pocem
The mta.yaml file will look something like below -
_schema-version: 3.3.0
ID: poc13app
version: 1.0.0
description: "A simple CAP project."
parameters:
  enable-parallel-deployments: true
build-parameters:
  before-all:
    - builder: custom
      commands:
        - npm ci
        - npx cds build --production
modules:
  - name: poc13app-srv
    type: nodejs
    path: gen/srv
    parameters:
      instances: 1
      buildpack: nodejs_buildpack
    build-parameters:
      builder: npm-ci
    provides:
      - name: srv-api # required by consumers of CAP services (e.g. approuter)
        properties:
          srv-url: ${default-url}
    requires:
      - name: poc13app-auth
      - name: poc13app-db
      - name: pocem
  - name: poc13app-db-deployer
    type: hdb
    path: gen/db
    parameters:
      buildpack: nodejs_buildpack
    requires:
      - name: poc13app-db
  - name: poc13app
    type: approuter.nodejs
    path: app/router
    parameters:
      keep-existing-routes: true
      disk-quota: 256M
      memory: 256M
    requires:
      - name: srv-api
        group: destinations
        properties:
          name: srv-api
          url: ~{srv-url}
          forwardAuthToken: true
      - name: poc13app-auth
    provides:
      - name: app-api
        properties:
          app-protocol: ${protocol}
          app-uri: ${default-uri}
resources:
  - name: poc13app-auth
    type: org.cloudfoundry.managed-service
    parameters:
      service: xsuaa
      service-plan: application
      path: ./xs-security.json
      config:
        xsappname: poc13app-${org}-${space}
        tenant-mode: dedicated
        oauth2-configuration:
          redirect-uris:
            - https://*~{app-api/app-uri}/**
    requires:
      - name: app-api
    processed-after:
      - pocem
  - name: poc13app-db
    type: com.sap.xs.hdi-container
    parameters:
      service: hana
      service-plan: hdi-shared
  - name: pocem
    type: org.cloudfoundry.managed-service
    parameters:
      service: enterprise-messaging
      service-plan: default
      service-name: pocem
Step 3 - Define configuration on package.json
Define the Event Mesh configuration under the cds section in the package.json file as shown below:
In our case, the queue name is - pocqueue
"cds": {
  "requires": {
    "messaging": {
      "kind": "enterprise-messaging",
      "queue": {
        "name": "$namespace/pocqueue"
      }
    }
  }
}
Step 4 - Check xs-security.json configuration
{
  "scopes": [
    {
      "name": "$XSAPPNAME.emcallback",
      "description": "Enterprise-Messaging Callback Access",
      "grant-as-authority-to-apps": [
        "$XSSERVICENAME(pocem)"
      ]
    },
    {
      "name": "$XSAPPNAME.emmanagement",
      "description": "Enterprise-Messaging Management Access"
    }
  ],
  "attributes": [],
  "role-templates": [],
  "authorities": [
    "$XSAPPNAME.emmanagement",
    "$XSAPPNAME.mtcallback"
  ]
}
Step 5 - Define Service logic to consume Event Message
Now, consume the messages in your custom service.js file as shown below -
import cds from '@sap/cds';

const CustomService = async (srv) => {
  // Connect to the messaging service configured under cds.requires.messaging
  const messaging = await cds.connect.to('messaging');
  // Queue pocqueue and topic cap/hello/created were created in Step 1
  messaging.on('cap/hello/created', (msg) => {
    console.log('Event Received: ', msg.data);
    // your custom logic comes here
  });
};

export default CustomService;
!!! Done !!!
Method 2 - Consume Event using User-Provided Service
This approach works well when the Event Mesh instance and the CAP application are deployed either in the same or in different Cloud Foundry spaces.
We will follow the steps below -
- Create Event Mesh Instance
- Define Service Binding in mta.yaml file
- Define messaging service configuration on package.json
- Maintain Event Mesh Service Key
- Define Service logic to consume Event Message
Step 1 - Create Event Mesh Instance
Create the Event Mesh service instance and a service key in your SAP BTP subaccount.
In our case, we are creating an Event Mesh service instance with the details below -
- Event Mesh Instance Name - poc7accEM
- Service Key - key
- emname - poc7acc-em
- namespace - companyName/subaccountName/poc7acc-em
You can define similar values based on your requirements.
I used the JSON parameters below while creating the Event Mesh service instance in the BTP cockpit.
{
  "emname": "poc7acc-em",
  "namespace": "companyName/subaccountName/poc7acc-em",
  "version": "1.1.0",
  "resources": {
    "units": "10"
  },
  "options": {
    "management": true,
    "messagingrest": true,
    "messaging": true
  },
  "rules": {
    "topicRules": {
      "publishFilter": [
        "${namespace}/*"
      ],
      "subscribeFilter": [
        "${namespace}/*"
      ]
    },
    "queueRules": {
      "publishFilter": [
        "${namespace}/*"
      ],
      "subscribeFilter": [
        "${namespace}/*"
      ]
    }
  },
  "xs-security": {
    "oauth2-configuration": {
      "credential-types": []
    }
  },
  "authorities": [
    "$ACCEPT_GRANTED_AUTHORITIES"
  ]
}
Once the Event Mesh service instance has been created, create a queue on the Event Mesh instance and subscribe it to the topic.
In our case, we have created the queue below and subscribed it to the topic -
- Queue - companyName/subaccountName/poc7acc-em/testing
- Subscribed topic - companyName/subaccountName/poc7acc-em/testing/Worked
Step 2 - Define Service Binding in mta.yaml file
If the mta.yaml file is not present, use the command below to create it.
cds add mta --for production
Define the User-Provided Service binding under the requires section of the module in the mta.yaml file like below -
- name: pocConsumerEM
In our case -
- service binding name - pocConsumerEM
Define the same User-Provided Service binding details under the resources section in the mta.yaml file like below -
- name: pocConsumerEM
  type: org.cloudfoundry.user-provided-service
  parameters:
    service: User-Provided
    service-name: emis-consume-messaging-ups
    path: ./em-configuration.json
where -
- service binding name - pocConsumerEM
- service name - User-Provided
- service instance name - emis-consume-messaging-ups (the same name is used in the vcap section of package.json)
- path - a local JSON file created at the project root level.
Step 3 - Define messaging service configuration on package.json
Define the messaging configuration under the cds section in the package.json file like below -
Here, the VCAP name refers to the User-Provided Service instance name, which we defined in mta.yaml file.
"cds": {
  "requires": {
    "messaging": {
      "kind": "enterprise-messaging-shared",
      "publishPrefix": "$namespace/",
      "subscribePrefix": "$namespace/",
      "format": "cloudevents",
      "vcap": {
        "label": "user-provided",
        "name": "emis-consume-messaging-ups"
      },
      "queue": {
        "name": "$namespace/testing"
      }
    }
  }
}
Step 4 - Maintain Event Mesh Service Key
Create an em-configuration.json file at the project root level, then copy the Event Mesh service key details (from Step 1) and paste them into this file.
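After deployment, Cloud Foundry exposes the credentials of a user-provided service to the app through the VCAP_SERVICES environment variable, grouped under the label user-provided. The sketch below shows how a binding could be looked up by the label and name given in the vcap section of package.json. CAP does this lookup itself, so the code is not needed in the project; the findBinding helper and the sample credentials are hypothetical and only there to make the mechanism visible.

```javascript
// Hypothetical helper mirroring the "vcap" section of package.json; CAP resolves the binding itself.
function findBinding(vcapServicesJson, { label, name }) {
  const services = JSON.parse(vcapServicesJson || '{}');
  const instances = services[label] || []; // VCAP_SERVICES groups instances by label
  return instances.find((instance) => instance.name === name) || null;
}

// Sample value with made-up credentials, shaped like VCAP_SERVICES for a user-provided service.
const sampleVcap = JSON.stringify({
  'user-provided': [
    {
      name: 'emis-consume-messaging-ups',
      label: 'user-provided',
      credentials: { namespace: 'companyName/subaccountName/poc7acc-em' },
    },
  ],
});

const binding = findBinding(process.env.VCAP_SERVICES || sampleVcap, {
  label: 'user-provided',
  name: 'emis-consume-messaging-ups',
});
console.log(binding ? binding.credentials.namespace : 'binding not found');
```

If the lookup returns nothing on a deployed app, the usual suspects are a mismatch between service-name in mta.yaml and name in the vcap section, or a missing entry under requires for the module.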
Step 5 - Define Service logic to consume Event Message
Now, consume the messages in your custom service.js file as shown below -
import cds from '@sap/cds';

const CustomService = async (srv) => {
  // Connect to the messaging service configured under cds.requires.messaging
  const messaging = await cds.connect.to('messaging');
  // Queue testing and topic testing/Worked were created in Step 1
  messaging.on('testing/Worked', (msg) => {
    console.log('Event Received: ', msg.data);
    // your custom logic comes here
  });
};

export default CustomService;
!!! Done !!!
Method 3 - Consume Event using Polling
This approach is useful when messages are consumed manually through action calls.
However, it is not recommended, as it continuously invokes the API even when no data is available. Additionally, it does not provide real-time data, since it relies on a polling interval.
In this approach, we will use the Event Mesh API through a destination to consume event messages.
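As a rough sketch of what one polling round involves: to my understanding, the Event Mesh REST messaging API consumes a message with a POST to .../queues/{queue}/messages/consumption and acknowledges it with a POST to .../queues/{queue}/messages/{message-id}/acknowledgement, with the queue name URL-encoded. Treat these paths, the x-qos and x-message-id headers, and the 204 status for an empty queue as assumptions to verify against the API reference. The helper names are hypothetical, and httpPost stands in for whatever performs the authenticated call through the destination.

```javascript
// Assumed REST paths; verify them against the Event Mesh REST messaging API reference.
function consumptionUrl(queuesBaseUrl, queueName) {
  // Queue names contain slashes, so they are URL-encoded into a single path segment.
  return `${queuesBaseUrl}/${encodeURIComponent(queueName)}/messages/consumption`;
}

function acknowledgementUrl(queuesBaseUrl, queueName, messageId) {
  const queue = encodeURIComponent(queueName);
  return `${queuesBaseUrl}/${queue}/messages/${encodeURIComponent(messageId)}/acknowledgement`;
}

// One polling round. httpPost(url, headers) is a hypothetical function that performs the
// authenticated POST (for example via the destination) and resolves to { status, headers, body }.
async function pollOnce(httpPost, queuesBaseUrl, queueName, handle) {
  const response = await httpPost(consumptionUrl(queuesBaseUrl, queueName), { 'x-qos': '1' });
  if (response.status === 204) return false; // assumed: 204 means the queue is empty
  await handle(response.body);
  // Acknowledge only after the handler succeeded, so that a failure leads to redelivery.
  const messageId = response.headers['x-message-id'];
  await httpPost(acknowledgementUrl(queuesBaseUrl, queueName, messageId), {});
  return true;
}

// Example: the URL for the queue used in this post (host name is a placeholder).
console.log(
  consumptionUrl('https://example.invalid/messagingrest/v1/queues', 'companyName/subaccountName/poc7acc-em/testing')
);
```

Acknowledging after the handler has finished is a design choice: if the custom logic throws, no acknowledgement is sent and the message remains available for another attempt.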
We will be following below steps -
- Create Event Mesh Instance
- Create Event Mesh Destination on BTP Subaccount
- Define Destination Service Binding in mta.yaml file
- Define Message Consumer and Acknowledgement functionality
- Define Service logic to consume Event Message
Step 1 - Create Event Mesh Instance
Create the Event Mesh service instance and a service key in your SAP BTP subaccount.
In our case, we are creating an Event Mesh service instance with the details below -
- Event Mesh Instance Name - poc7accEM
- Service Key - key
- emname - poc7acc-em
- namespace - companyName/subaccountName/poc7acc-em
You can define similar values based on your requirements.
I used the JSON parameters below while creating the Event Mesh service instance in the BTP cockpit.
{
  "emname": "poc7acc-em",
  "namespace": "companyName/subaccountName/poc7acc-em",
  "version": "1.1.0",
  "resources": {
    "units": "10"
  },
  "options": {
    "management": true,
    "messagingrest": true,
    "messaging": true
  },
  "rules": {
    "topicRules": {
      "publishFilter": [
        "${namespace}/*"
      ],
      "subscribeFilter": [
        "${namespace}/*"
      ]
    },
    "queueRules": {
      "publishFilter": [
        "${namespace}/*"
      ],
      "subscribeFilter": [
        "${namespace}/*"
      ]
    }
  },
  "xs-security": {
    "oauth2-configuration": {
      "credential-types": []
    }
  },
  "authorities": [
    "$ACCEPT_GRANTED_AUTHORITIES"
  ]
}
Step 2 - Create Event Mesh Destination on BTP Subaccount
Use the service key details generated from the Event Mesh service instance (created on Step 1) to create the destination.
| Parameter | Values |
|---|---|
| Name | Any name (in our case, poc7accEMDestination) |
| Type | HTTP |
| URL | uri from oa2 section of event mesh service binding (append /messagingrest/v1/queues) |