Consume Event Mesh Message

We will attempt to consume event messages from an Event Mesh instance that has been created in a different space than the one where the CAP application is deployed.

How SAP Event Mesh Works (Consumption Flow)

SAP Event Mesh is a message broker based on the publish–subscribe (pub/sub) pattern.

  • Producer (Publisher) sends an event/message to a topic.
  • Event Mesh Broker stores the message and routes it to the subscribed queues.
  • Queue (Subscriber endpoint) holds messages until they are consumed.
  • Consumer (Your App) reads messages from the queue. The consumer does not consume directly from the topic; it consumes from a queue bound to that topic.
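
The flow above can be pictured with a minimal in-memory sketch. This is not the Event Mesh API: MiniBroker, bindQueue, publish and consume are hypothetical names, used only to show that a consumer reads from a queue bound to a topic and never from the topic itself.

```javascript
// Illustrative in-memory model of the flow; NOT the Event Mesh API.
class MiniBroker {
  constructor() {
    this.queues = new Map();        // queue name -> array of pending messages
    this.subscriptions = new Map(); // topic -> set of queue names bound to it
  }

  // Create the queue if needed and subscribe it to a topic.
  bindQueue(queue, topic) {
    if (!this.queues.has(queue)) this.queues.set(queue, []);
    if (!this.subscriptions.has(topic)) this.subscriptions.set(topic, new Set());
    this.subscriptions.get(topic).add(queue);
  }

  // A producer publishes to a topic; the broker copies the message to every bound queue.
  publish(topic, data) {
    const bound = this.subscriptions.get(topic) ?? new Set();
    for (const queue of bound) this.queues.get(queue).push({ topic, data });
    return bound.size; // number of queues that received the message
  }

  // A consumer reads from a queue; returns undefined when the queue is empty.
  consume(queue) {
    const pending = this.queues.get(queue);
    return pending ? pending.shift() : undefined;
  }
}

const broker = new MiniBroker();
broker.bindQueue('pocqueue', 'cap/hello/created');
broker.publish('cap/hello/created', { id: 1 });
broker.publish('some/other/topic', { id: 2 }); // no queue bound, nothing is stored
const firstMessage = broker.consume('pocqueue');
const secondMessage = broker.consume('pocqueue');
console.log(firstMessage, secondMessage);
```

The message published to the unbound topic never reaches the consumer, which is why the queue subscription in Event Mesh has to exist before the producer sends anything you want to keep.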

Different ways to Consume Event Messages -

1. Consume Event Mesh with service binding

  • This event-driven approach works on a push mechanism: your app subscribes to a queue or topic.
  • When an event occurs, Event Mesh pushes the message to your app, so the app does not need to ask for messages repeatedly.
  • Communication is asynchronous and real-time or near real-time, which avoids unnecessary calls and suits highly scalable projects.
  • It uses AMQP (Advanced Message Queuing Protocol), a standard messaging protocol for sending messages between applications in a reliable and structured way.
  • In this approach, we create a service binding directly with the Event Mesh service instance.

2. Consume Event using User-Provided Service

  • It is similar to the approach above, but we use a User-Provided Service instance for Event Mesh and bind the CAP application to it to consume the messages.

3. Polling

  • Polling is a method where the consumer application actively and repeatedly requests (pulls) messages from a queue, rather than waiting for messages to be pushed to it.
  • When you poll, Event Mesh checks whether the queue contains a message. If it does, Event Mesh sends the message and marks it as "in-delivery". If an acknowledgement is received after delivery, the message is removed from the queue. If no acknowledgement is received, the message is marked as unacknowledged and either stays in the queue or is redelivered.
  • Polling repeatedly calls the API even when there is no data. It also does not provide real-time data, because it depends on the polling interval.
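
The poll and acknowledge cycle described above can be sketched with a small in-memory model. PollableQueue and its methods are hypothetical names for illustration; the real service does this bookkeeping on the broker side.

```javascript
// Illustrative in-memory model of poll/acknowledge; NOT the Event Mesh API.
class PollableQueue {
  constructor(messages) {
    this.ready = [...messages];  // messages waiting for delivery
    this.inDelivery = new Map(); // delivery id -> message awaiting acknowledgement
    this.nextId = 1;
  }

  // Poll once: returns null if nothing is ready, else marks one message as in delivery.
  poll() {
    if (this.ready.length === 0) return null;
    const id = String(this.nextId++);
    const body = this.ready.shift();
    this.inDelivery.set(id, body);
    return { id, body };
  }

  // Acknowledge: the message is removed for good.
  ack(id) {
    return this.inDelivery.delete(id);
  }

  // No acknowledgement arrived: put the messages back so they are redelivered.
  requeueUnacknowledged() {
    for (const body of this.inDelivery.values()) this.ready.push(body);
    this.inDelivery.clear();
  }
}

const pollQueue = new PollableQueue(['a', 'b']);
const m1 = pollQueue.poll();        // 'a' is now in delivery
pollQueue.ack(m1.id);               // 'a' is removed
const m2 = pollQueue.poll();        // 'b' is in delivery but never acknowledged
pollQueue.requeueUnacknowledged();
const m3 = pollQueue.poll();        // 'b' is delivered again
const nothingLeft = pollQueue.poll(); // null: the only remaining message is in delivery
console.log(m1.body, m2.body, m3.body, nothingLeft);
```

The last poll returning null is the wasted call the third bullet refers to: the consumer cannot know the queue is empty without asking.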



Method 1 - Consume Event Mesh with service binding

This approach works well when both the Event Mesh instance and the CAP application are deployed within the same Cloud Foundry space.

We will follow the steps below -

  • Create Event Mesh Instance
  • Create Event Mesh Service Binding with CAP App
  • Define configuration in package.json
  • Check xs-security.json configuration
  • Define Service logic to consume Event Message

Step 1 - Create Event Mesh Instance

Create the Event Mesh service instance in a space of your SAP BTP subaccount.

In our case, we are creating an Event Mesh service instance with the details below -

  • Event Mesh Instance Name - pocem
  • Service Key - key
  • emname - pocem
  • namespace - companyName/subaccountName/pocem

You can choose similar values based on your requirements.

I used the JSON parameters below when creating the Event Mesh service instance in the BTP cockpit.

Parameters
{
    "emname": "pocem",
    "namespace": "companyName/subaccountName/pocem",
    "version": "1.1.0",
    "resources": {
        "units": "10"
    },
    "options": {
        "management": true,
        "messagingrest": true,
        "messaging": true
    },
    "rules": {
        "topicRules": {
            "publishFilter": [
                "${namespace}/*"
            ],
            "subscribeFilter": [
                "${namespace}/*"
            ]
        },
        "queueRules": {
            "publishFilter": [
                "${namespace}/*"
            ],
            "subscribeFilter": [
                "${namespace}/*"
            ]
        }
    },
    "xs-security": {
        "oauth2-configuration": {
            "credential-types": []
        }
    },
    "authorities" : [
        "$ACCEPT_GRANTED_AUTHORITIES"
    ]
}

Once the Event Mesh service instance has been created, create a queue in Event Mesh and subscribe it to the topic.

In our case, we created the queue below and subscribed it to the topic -

  • Queue - companyName/subaccountName/pocem/pocqueue
  • Subscribed topic - companyName/subaccountName/pocem/cap/hello/created

Step 2 - Create Event Mesh Service Binding with CAP App

In SAP CAP, consuming a service typically requires a service binding. Therefore, we create a binding between the Event Mesh service instance and the CAP application.

To do so, we define the Event Mesh instance in the mta.yaml file as below -

Under the requires section of the srv module, reference the Event Mesh instance by name. In our case the instance name is pocem, so we add -

mta.yaml
- name: pocem

We will then define the same service under resources like below -

mta.yaml
- name: pocem
  type: org.cloudfoundry.managed-service
  parameters:
    service: enterprise-messaging
    service-plan: default
    service-name: pocem

Also add the processed-after property to the auth resource in mta.yaml, so that it is processed after pocem -

mta.yaml
processed-after:
  - pocem

The complete mta.yaml file will look something like below -

mta.yaml
_schema-version: 3.3.0
ID: poc13app
version: 1.0.0
description: "A simple CAP project."
parameters:
  enable-parallel-deployments: true
build-parameters:
  before-all:
    - builder: custom
      commands:
        - npm ci
        - npx cds build --production
modules:
  - name: poc13app-srv
    type: nodejs
    path: gen/srv
    parameters:
      instances: 1
      buildpack: nodejs_buildpack
    build-parameters:
      builder: npm-ci
    provides:
      - name: srv-api # required by consumers of CAP services (e.g. approuter)
        properties:
          srv-url: ${default-url}
    requires:
      - name: poc13app-auth
      - name: poc13app-db
      - name: pocem
  - name: poc13app-db-deployer
    type: hdb
    path: gen/db
    parameters:
      buildpack: nodejs_buildpack
    requires:
      - name: poc13app-db
  - name: poc13app
    type: approuter.nodejs
    path: app/router
    parameters:
      keep-existing-routes: true
      disk-quota: 256M
      memory: 256M
    requires:
      - name: srv-api
        group: destinations
        properties:
          name: srv-api 
          url: ~{srv-url}
          forwardAuthToken: true
      - name: poc13app-auth
    provides:
      - name: app-api
        properties:
          app-protocol: ${protocol}
          app-uri: ${default-uri}
resources:
  - name: poc13app-auth
    type: org.cloudfoundry.managed-service
    parameters:
      service: xsuaa
      service-plan: application
      path: ./xs-security.json
      config:
        xsappname: poc13app-${org}-${space}
        tenant-mode: dedicated
        oauth2-configuration:
          redirect-uris:
            - https://*~{app-api/app-uri}/**
    requires:
      - name: app-api
    processed-after:
      - pocem
  - name: poc13app-db
    type: com.sap.xs.hdi-container
    parameters:
      service: hana
      service-plan: hdi-shared
  - name: pocem
    type: org.cloudfoundry.managed-service
    parameters:
      service: enterprise-messaging
      service-plan: default
      service-name: pocem

Step 3 - Define configuration in package.json

Define the Event Mesh configuration under the cds section in the package.json file as shown below.

In our case, the queue name is pocqueue.

package.json
"cds": {
  "requires": {
    "messaging": {
      "kind": "enterprise-messaging",
      "queue": {
        "name" : "$namespace/pocqueue"
      }
    }
  }
}
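
As far as I understand it, the $namespace placeholder in the queue name is expanded to the namespace of the bound Event Mesh instance. The substitution can be pictured as follows; resolveNamespace is a hypothetical helper written for illustration and is not a CAP function.

```javascript
// Hypothetical helper: mimics how a "$namespace" placeholder could be expanded.
function resolveNamespace(template, namespace) {
  // Replace every occurrence of the literal "$namespace" placeholder.
  return template.split('$namespace').join(namespace);
}

const instanceNamespace = 'companyName/subaccountName/pocem';
const resolvedQueueName = resolveNamespace('$namespace/pocqueue', instanceNamespace);
console.log(resolvedQueueName); // companyName/subaccountName/pocem/pocqueue
```

The result matches the fully qualified queue name created in Step 1, which is what the configuration has to point at.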

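Step 4 - Check xs-security.json configuration

Make sure the xs-security.json file contains the Event Mesh scopes and authorities shown below -

xs-security.json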
{
  "scopes": [
    {
      "name": "$XSAPPNAME.emcallback",
      "description": "Enterprise-Messaging Callback Access",
      "grant-as-authority-to-apps": [
        "$XSSERVICENAME(pocem)"
      ]
    },
    {
      "name": "$XSAPPNAME.emmanagement",
      "description": "Enterprise-Messaging Management Access"
    }
  ],
  "attributes": [],
  "role-templates": [],
  "authorities": [
    "$XSAPPNAME.emmanagement",
    "$XSAPPNAME.mtcallback"
  ]
}

Step 5 - Define Service logic to consume Event Message

Now, consume the messages in your custom service.js file as shown below -

service.js
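import cds from '@sap/cds';

// Service implementation: registers a handler for incoming Event Mesh messages.
const CustomService = async (srv) => {

    // Connect to the messaging service configured under cds.requires.messaging
    const messaging = await cds.connect.to('messaging');

    // Queue pocqueue is subscribed to topic cap/hello/created (see Step 1).
    // If no subscribePrefix is configured in package.json, the fully qualified topic name may be required here.
    messaging.on('cap/hello/created', (msg) => {
        console.log('Event Received: ', msg.data);
        // your custom logic comes here
    });

};

export default CustomService;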
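
To try out handler logic without a deployed Event Mesh instance, one option is to register the handler against a small stand-in object. The sketch below is only an illustration: FakeMessaging is a hypothetical class that imitates the on method of the messaging service, and the topic is the one subscribed in Step 1.

```javascript
// Stand-in for the object returned by cds.connect.to('messaging'); illustration only.
class FakeMessaging {
  constructor() {
    this.handlers = new Map(); // topic -> handler function
  }

  // Same call shape as in service.js: register a handler for a topic.
  on(topic, handler) {
    this.handlers.set(topic, handler);
    return this;
  }

  // Deliver a message to the handler registered for the topic, if any.
  emit(topic, data) {
    const handler = this.handlers.get(topic);
    return handler ? handler({ event: topic, data }) : undefined;
  }
}

const received = [];

// Keeping the registration in its own function makes it callable with a real or a fake service.
function registerHandlers(messaging) {
  messaging.on('cap/hello/created', (msg) => {
    received.push(msg.data); // your custom logic comes here
  });
}

const fakeMessaging = new FakeMessaging();
registerHandlers(fakeMessaging);
fakeMessaging.emit('cap/hello/created', { greeting: 'hello' });
fakeMessaging.emit('unrelated/topic', { ignored: true }); // no handler, nothing happens
console.log(received);
```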

!!! Done !!!




Method 2 - Consume Event using User-Provided Service

This approach works whether the Event Mesh instance and the CAP application are deployed in the same Cloud Foundry space or in different spaces.

We will follow the steps below -

  • Create Event Mesh Instance
  • Define Service Binding in mta.yaml file
  • Define messaging service configuration in package.json
  • Maintain Event Mesh Service Key
  • Define Service logic to consume Event Message

Step 1 - Create Event Mesh Instance

Create the Event Mesh service instance and a service key in your SAP BTP subaccount.

In our case, we are creating an Event Mesh service instance with the details below -

  • Event Mesh Instance Name - poc7accEM
  • Service Key - key
  • emname - poc7acc-em
  • namespace - companyName/subaccountName/poc7acc-em

You can choose similar values based on your requirements.

I used the JSON parameters below when creating the Event Mesh service instance in the BTP cockpit.

poc7accEM Instance Parameters
{
    "emname": "poc7acc-em",
    "namespace": "companyName/subaccountName/poc7acc-em",
    "version": "1.1.0",
    "resources": {
        "units": "10"
    },
    "options": {
        "management": true,
        "messagingrest": true,
        "messaging": true
    },
    "rules": {
        "topicRules": {
            "publishFilter": [
                "${namespace}/*"
            ],
            "subscribeFilter": [
                "${namespace}/*"
            ]
        },
        "queueRules": {
            "publishFilter": [
                "${namespace}/*"
            ],
            "subscribeFilter": [
                "${namespace}/*"
            ]
        }
    },
    "xs-security": {
        "oauth2-configuration": {
            "credential-types": []
        }
    },
    "authorities" : [
        "$ACCEPT_GRANTED_AUTHORITIES"
    ]
}

Once the Event Mesh service instance has been created, create a queue in Event Mesh and subscribe it to the topic.

In our case, we created the queue below and subscribed it to the topic -

  • Queue - companyName/subaccountName/poc7acc-em/testing
  • Subscribed topic - companyName/subaccountName/poc7acc-em/testing/Worked

Step 2 - Define Service Binding in mta.yaml file

If the mta.yaml file does not exist yet, use the command below to create it.

command
cds add mta --for production

Reference the User-Provided Service under the requires section of the srv module in the mta.yaml file like below -

mta.yaml
- name: pocConsumerEM

In our case -

  • service binding name - pocConsumerEM

Define the same User-Provided Service under the resources section in the mta.yaml file like below -

mta.yaml
- name: pocConsumerEM
  type: org.cloudfoundry.user-provided-service
  parameters:
    service: User-Provided
    service-name: emis-consume-messaging-ups
    path: ./em-configuration.json

where -

  • service binding name - pocConsumerEM
  • service name - User-Provided
  • service instance name - emis-consume-messaging-ups (the same name is used in the vcap section of package.json)
  • path - a local JSON file created at the project root level (see Step 4).

Step 3 - Define messaging service configuration in package.json

Define the messaging configuration under the cds section in the package.json file like below -

Here, the vcap name refers to the User-Provided Service instance name, which we defined in the mta.yaml file.

package.json
"cds": {
    "requires": {
      "messaging": {
        "kind": "enterprise-messaging-shared",
        "publishPrefix": "$namespace/",
        "subscribePrefix": "$namespace/",
        "format": "cloudevents",
        "vcap": {
          "label": "user-provided",
          "name": "emis-consume-messaging-ups"
        },
        "queue": {
          "name": "$namespace/testing"
        }
      }
    }
}
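
With "format": "cloudevents", messages travel in a CloudEvents envelope. The sketch below shows what such an envelope looks like and how it separates into context attributes and payload. My assumption is that CAP exposes these to the handler as msg.headers and msg.data respectively; splitCloudEvent is a hypothetical helper and all attribute values are made up.

```javascript
// Split a CloudEvents 1.0 JSON envelope into context attributes and payload.
function splitCloudEvent(envelope) {
  const { data, ...headers } = envelope;
  return { headers, data };
}

const envelope = {
  specversion: '1.0',                 // required by the CloudEvents spec
  type: 'testing.Worked.v1',          // hypothetical event type
  source: '/default/example.app/1',   // hypothetical source
  id: 'a1b2c3',                       // hypothetical id
  datacontenttype: 'application/json',
  data: { status: 'Worked' },         // the business payload
};

const { headers, data } = splitCloudEvent(envelope);
console.log(headers.type, data.status);
```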

Step 4 - Maintain Event Mesh Service Key

Create an em-configuration.json file at the project root level, then copy the Event Mesh service key details (from Step 1) and paste them into this file.

Step 5 - Define Service logic to consume Event Message

Now, consume the messages in your custom service.js file as shown below -

service.js
import cds from '@sap/cds';

const CustomService = async (srv) => {
  
    const messaging = await cds.connect.to('messaging');
    // Queue "testing" is subscribed to topic "testing/Worked" (both created in Step 1)
    await messaging.on('testing/Worked', (msg) => {
        console.log('Event Received: ', msg.data);
        // your custom logic comes here
    });

};

export default CustomService;

!!! Done !!!




Method 3 - Consume Event using Polling

This approach is useful when messages are consumed manually through action calls.

However, it is not recommended, as it continuously invokes the API even when no data is available. Additionally, it does not provide real-time data, since it relies on a polling interval.

In this approach, we use the Event Mesh REST API through a destination to consume event messages.

We will follow the steps below -

  • Create Event Mesh Instance
  • Create Event Mesh Destination on BTP Subaccount
  • Define Destination Service Binding in mta.yaml file
  • Define Message Consumer and Acknowledgement functionality
  • Define Service logic to consume Event Message

Step 1 - Create Event Mesh Instance

Create the Event Mesh service instance and a service key in your SAP BTP subaccount.

In our case, we are creating an Event Mesh service instance with the details below -

  • Event Mesh Instance Name - poc7accEM
  • Service Key - key
  • emname - poc7acc-em
  • namespace - companyName/subaccountName/poc7acc-em

You can choose similar values based on your requirements.

I used the JSON parameters below when creating the Event Mesh service instance in the BTP cockpit.

poc7accEM Instance Parameters
{
    "emname": "poc7acc-em",
    "namespace": "companyName/subaccountName/poc7acc-em",
    "version": "1.1.0",
    "resources": {
        "units": "10"
    },
    "options": {
        "management": true,
        "messagingrest": true,
        "messaging": true
    },
    "rules": {
        "topicRules": {
            "publishFilter": [
                "${namespace}/*"
            ],
            "subscribeFilter": [
                "${namespace}/*"
            ]
        },
        "queueRules": {
            "publishFilter": [
                "${namespace}/*"
            ],
            "subscribeFilter": [
                "${namespace}/*"
            ]
        }
    },
    "xs-security": {
        "oauth2-configuration": {
            "credential-types": []
        }
    },
    "authorities" : [
        "$ACCEPT_GRANTED_AUTHORITIES"
    ]
}

Step 2 - Create Event Mesh Destination on BTP Subaccount

Use the service key details generated for the Event Mesh service instance (created in Step 1) to create the destination.

Parameter   Value
Name        Any name (in our case, poc7accEMDestination)
Type        HTTP
URL         uri of the messaging entry with protocol httprest (the entry that also holds the oa2 section) in the Event Mesh service key, with /messagingrest/v1/queues appended
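
The URL value for the destination can be derived from the service key as in the sketch below. The service key shown is a trimmed, made-up example, and the queue name testing is hypothetical. The relative paths for consumption and acknowledgement are my assumption about the Event Mesh REST messaging API and should be checked against the API reference before use.

```javascript
// Trimmed, hypothetical service key; real keys contain more fields and real values.
const serviceKey = {
  namespace: 'companyName/subaccountName/poc7acc-em',
  messaging: [
    { protocol: ['amqp10ws'], uri: 'wss://example.invalid/protocols/amqp10ws', oa2: {} },
    {
      protocol: ['httprest'],
      uri: 'https://example.invalid',
      oa2: {
        clientid: 'id',
        clientsecret: 'secret',
        tokenendpoint: 'https://auth.example.invalid/oauth/token',
      },
    },
  ],
};

// Pick the REST entry and append the queues path used for the destination URL.
function destinationUrl(key) {
  const rest = key.messaging.find((entry) => entry.protocol.includes('httprest'));
  if (!rest) throw new Error('No httprest entry found in service key');
  return `${rest.uri.replace(/\/+$/, '')}/messagingrest/v1/queues`;
}

// Assumed paths, relative to the destination URL. Queue names contain "/",
// so they are URL-encoded into a single path segment.
function consumptionPath(queueName) {
  return `/${encodeURIComponent(queueName)}/messages/consumption`;
}

function acknowledgementPath(queueName, messageId) {
  return `/${encodeURIComponent(queueName)}/messages/${encodeURIComponent(messageId)}/acknowledgement`;
}

const pollingQueue = `${serviceKey.namespace}/testing`; // hypothetical queue name
console.log(destinationUrl(serviceKey));
console.log(consumptionPath(pollingQueue));
console.log(acknowledgementPath(pollingQueue, '42'));
```

The client ID, client secret and token endpoint for the destination's OAuth settings come from the oa2 section of the same httprest entry.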