Construct the Kafka Apply Engine Script
Three simple Apply Engine scripts are provided below. The first two process Db2 changed data and the third processes IMS changed data. All three use Connect CDC SQData's ability to transform the source data DESCRIPTION or schema into the desired JSON or AVRO formatted Kafka message payload. See the Apply Engine Reference for more details regarding its highly extensible capabilities.
Note: Apply Engines utilizing AVRO and the Confluent Schema Registry may not use both APPLY and REPLICATE functions for the same Target Datastore.
Example 1 - Db2 to JSON formatted Kafka
Replicate Db2 changed data (CDC) for the IVP EMPLOYEE and DEPARTMENT tables into unique JSON formatted Kafka Topics with default partitioning. The example also includes a filter for the EMPLOYEE table: only updates to employees with a bonus over $5,000 cause the record to be written to Kafka. All changes to the DEPARTMENT table are replicated with no filter applied.
----------------------------------------------------------------------
-- Name: DB2TOKAF: Z/OS DB2 To Kafka JSON on Linux
-- Client/Project: client/project
----------------------------------------------------------------------
-- SUBSTITUTION PARMS USED IN THIS SCRIPT:
-- %(ENGINE) - ENGINE Name
-- %(SHOST) - Source HOST of the Capture/Publisher
-- %(SPORT) - Source HOST SQDaemon PORT
-- %(PUBNM) - Source Capture/Publisher Agent Name
----------------------------------------------------------------------
-- Change Log:
----------------------------------------------------------------------
-- 2019-02-01 INITIAL RELEASE using JSON
----------------------------------------------------------------------
JOBNAME DB2TOKAF;
OPTIONS
CDCOP('I','U','D') -- Set CHANGE OP Constants
,USE AVRO COMPATIBLE NAMES -- Recommended for JSON
;
----------------------------------------------------------------------
-- Data Definition Section
----------------------------------------------------------------------
----------------------------------------------------------------------
-- Source Descriptions
----------------------------------------------------------------------
BEGIN GROUP DB2_SOURCE;
DESCRIPTION DB2SQL ./DB2DDL/EMP.ddl AS EMPLOYEE
KEY IS EMP_NO;
DESCRIPTION DB2SQL ./DB2DDL/DEPT.ddl AS DEPARTMENT
KEY IS DEPT_NO;
END GROUP;
----------------------------------------------------------------------
-- Target Descriptions
----------------------------------------------------------------------
-- None required
----------------------------------------------------------------------
-- Datastore Section
----------------------------------------------------------------------
----------------------------------------------------------------------
-- Source Datastore
----------------------------------------------------------------------
DATASTORE cdc://%(SHOST):%(SPORT)/%(PUBNM)/%(ENGINE)
OF UTSCDC
AS CDCIN
DESCRIBED BY GROUP DB2_SOURCE
;
----------------------------------------------------------------------
-- Target Datastore(s)
----------------------------------------------------------------------
DATASTORE kafka:///*/key -- specify dynamic topic
OF JSON -- specify JSON format
AS TARGET
DESCRIBED BY GROUP DB2_SOURCE -- use source for REPLICATE
;
----------------------------------------------------------------------
-- Field Specification Section
----------------------------------------------------------------------
DATEFORMAT 'ISOIBM';
----------------------------------------------------------------------
-- Procedure Section
----------------------------------------------------------------------
CREATE PROC P_EMPLOYEE AS SELECT
{
IF EMPLOYEE.BONUS > '5000'
{
REPLICATE(TARGET, EMPLOYEE)
}
}
FROM CDCIN;
----------------------------------------------------------------------
-- Main Section
----------------------------------------------------------------------
PROCESS INTO TARGET
SELECT
{
CASE RECNAME(CDCIN)
WHEN 'EMP' CALLPROC(P_EMPLOYEE)
WHEN 'DEPT' REPLICATE(TARGET, DEPARTMENT)
}
FROM CDCIN;
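The routing performed by the Main and Procedure sections can be modelled outside the engine to sanity-check which changes are expected to reach Kafka. The Python sketch below is illustrative only: the `route` function, the record dictionaries, and the numeric comparison of BONUS are assumptions made for this example, and it does not reproduce how the Apply Engine evaluates the script.

```python
from decimal import Decimal

BONUS_THRESHOLD = Decimal("5000")  # threshold taken from the script's IF condition


def route(record_name, row):
    """Return the alias to replicate for a change record, or None to skip it.

    record_name mirrors the value tested by CASE RECNAME(CDCIN);
    row is a hypothetical dict of column values for the changed record.
    """
    if record_name == "EMP":
        # Mirrors P_EMPLOYEE: replicate only when BONUS exceeds the threshold
        if Decimal(str(row["BONUS"])) > BONUS_THRESHOLD:
            return "EMPLOYEE"
        return None
    if record_name == "DEPT":
        # DEPARTMENT changes are replicated with no filter
        return "DEPARTMENT"
    return None  # in this sketch, record names not listed in the CASE are skipped


# Hypothetical change records: (record name, column values)
changes = [
    ("EMP", {"EMP_NO": "000010", "BONUS": "6000.00"}),
    ("EMP", {"EMP_NO": "000020", "BONUS": "5000.00"}),
    ("DEPT", {"DEPT_NO": "A00"}),
]
routed = [route(name, row) for name, row in changes]
print(routed)
```

In this model a bonus of exactly 5000 is not replicated, because the script's condition is a strict greater-than comparison.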
Example 2 - Db2 to AVRO formatted Kafka
After confirming the desired results from the Apply Engine script in Example 1, the output format is switched to AVRO, using a Confluent Schema Registry. Only a few elements of the JSON script need to be added or altered: the CONFLUENT REPOSITORY option, the TOPIC and SUBJECT clauses on each source DESCRIPTION, and the AVRO format with FORMAT CONFLUENT on the Target Datastore. Only the affected sections are shown below; unchanged sections are elided (...):
----------------------------------------------------------------------
-- Name: DB2TOKAF: Z/OS DB2 To Kafka AVRO on Linux
...
-- Change Log:
----------------------------------------------------------------------
-- 2019-02-01 INITIAL RELEASE using JSON
-- 2019-02-02 Switch to Confluent AVRO Schema Registry
...
OPTIONS
CDCOP('I','U','D') -- Set CHANGE OP Constants
,USE AVRO COMPATIBLE NAMES -- Required for AVRO Targets
,CONFLUENT REPOSITORY 'http://schema_registry.precisely.com:8081'
;
----------------------------------------------------------------------
-- Source Descriptions
----------------------------------------------------------------------
BEGIN GROUP DB2_SOURCE;
DESCRIPTION DB2SQL ./DB2DDL/EMP.ddl AS EMPLOYEE
KEY IS EMP_NO
TOPIC IVP_HR_EMPLOYEE
SUBJECT IVP_HR_EMPLOYEE-value;
DESCRIPTION DB2SQL ./DB2DDL/DEPT.ddl AS DEPARTMENT
KEY IS DEPT_NO
TOPIC IVP_HR_DEPARTMENT
SUBJECT IVP_HR_DEPARTMENT-value;
END GROUP;
----------------------------------------------------------------------
-- Target Datastore(s)
----------------------------------------------------------------------
DATASTORE kafka:///*/key -- specify dynamic topic
OF AVRO -- specify AVRO format
FORMAT CONFLUENT -- use Confluent Schema Registry
AS TARGET
DESCRIBED BY GROUP DB2_SOURCE -- use source for REPLICATE
;
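The SUBJECT values above follow the `<topic>-value` pattern used by Confluent's default subject naming for message values, and messages registered against a Confluent Schema Registry are normally framed with a 5-byte header: a zero magic byte followed by a 4-byte big-endian schema ID. The Python sketch below shows how a consumer-side check might derive the subject name and split that header from the Avro body. It assumes that FORMAT CONFLUENT produces this standard framing; the function names and the sample payload are hypothetical.

```python
import struct

MAGIC_BYTE = 0  # first byte of the Confluent wire format


def value_subject(topic):
    """Subject name for message values under the <topic>-value convention."""
    return f"{topic}-value"


def split_confluent_message(payload):
    """Split a Confluent-framed message into (schema_id, avro_body)."""
    if len(payload) < 5:
        raise ValueError("payload shorter than the 5-byte Confluent header")
    # ">bI" = big-endian signed byte (magic) + unsigned 32-bit int (schema ID)
    magic, schema_id = struct.unpack(">bI", payload[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, payload[5:]


# Hypothetical message: schema ID 42 followed by placeholder body bytes
sample = struct.pack(">bI", MAGIC_BYTE, 42) + b"\x02\x06abc"
schema_id, body = split_confluent_message(sample)
print(value_subject("IVP_HR_EMPLOYEE"), schema_id, body)
```

The schema ID recovered this way is what a consumer would look up in the registry before decoding the remaining bytes as Avro.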
Example 3 - IMS to AVRO formatted Kafka
Replicate IMS changed data (CDC) for the IVP EMPLOYEE and ANNULREV segments in the HREMPLDB IMS database into unique AVRO formatted Kafka Topics with partitioning based on the Root Segment key. The example also includes a filter for the EMPLOYEE segment. Only updates to employees with a bonus over $5,000 will cause the record to be written to Kafka. All changes to the ANNULREV segment are Replicated with no filter applied.
Notes:
1. The user-friendly AS <alias> names specified in the source DESCRIPTION statements will be used in the AVRO schema header.
2. Replication of IMS requires that the Target message descriptions maintain the full parent key sequence. This is ensured by SQData when it generates the AVRO schema / JSON message from the Source Datastore Segment Descriptions.
----------------------------------------------------------------------
-- Name: IMSTOKAF: Z/OS IMS To Kafka AVRO on Linux
-- Client/Project: client/project
----------------------------------------------------------------------
-- SUBSTITUTION PARMS USED IN THIS SCRIPT:
-- %(ENGINE) - ENGINE Name
-- %(SHOST) - Source HOST of the Capture/Publisher
-- %(SPORT) - Source HOST SQDaemon PORT
-- %(PUBNM) - Source Capture/Publisher Agent Name
----------------------------------------------------------------------
-- Change Log:
----------------------------------------------------------------------
-- 2019-02-01 INITIAL RELEASE using AVRO
----------------------------------------------------------------------
JOBNAME IMSTOKAF;
OPTIONS
CDCOP('I','U','D') -- Set CHANGE OP Constants
,USE AVRO COMPATIBLE NAMES -- Required for AVRO Targets
,CONFLUENT REPOSITORY 'http://schema_registry.precisely.com:8081'
;
----------------------------------------------------------------------
-- Data Definition Section
----------------------------------------------------------------------
----------------------------------------------------------------------
-- Source Descriptions
----------------------------------------------------------------------
BEGIN GROUP IMS_DBD;
DESCRIPTION IMSDBD ./IMSDBD/HREMPLDB.dbd AS HREMPLDB;
END GROUP;
BEGIN GROUP IMS_SEG;
DESCRIPTION COBOL ./IMSSEG/EMPLOYEE.cob AS EMPLOYEE -- User friendly alias
FOR SEGMENT EMPLOYEE
IN DATABASE HREMPLDB
TOPIC IVP_HR_EMPLOYEE
SUBJECT IVP_HR_EMPLOYEE-value;
DESCRIPTION COBOL ./IMSSEG/ANNULREV.cob AS ANNULREV -- User friendly alias
FOR SEGMENT ANNULREV
IN DATABASE HREMPLDB
TOPIC IVP_HR_ANNUAL_REVIEW
SUBJECT IVP_HR_ANNUAL_REVIEW-value;
END GROUP;
----------------------------------------------------------------------
-- Target Descriptions
----------------------------------------------------------------------
-- None required
----------------------------------------------------------------------
-- Datastore Section
----------------------------------------------------------------------
----------------------------------------------------------------------
-- Source Datastore
----------------------------------------------------------------------
DATASTORE cdc://%(SHOST):%(SPORT)/%(PUBNM)/%(ENGINE)
OF IMSCDC
AS CDCIN
DESCRIBED BY GROUP IMS_SEG
;
----------------------------------------------------------------------
-- Target Datastore(s)
----------------------------------------------------------------------
DATASTORE kafka:///*/root_key -- specify dynamic topic
OF AVRO -- specify AVRO format
FORMAT CONFLUENT -- use Confluent Schema Registry
AS TARGET
DESCRIBED BY GROUP IMS_SEG -- use source for REPLICATE
;
----------------------------------------------------------------------
-- Field Specification Section
----------------------------------------------------------------------
DATEFORMAT 'ISOIBM';
----------------------------------------------------------------------
-- Procedure Section
----------------------------------------------------------------------
CREATE PROC P_EMPLOYEE AS SELECT
{
IF EMPLOYEE.BONUS > '5000'
{
REPLICATE(TARGET, EMPLOYEE)
}
}
FROM CDCIN;
----------------------------------------------------------------------
-- Main Section
----------------------------------------------------------------------
PROCESS INTO TARGET
SELECT
{
CASE RECNAME(CDCIN)
WHEN 'EMPLOYEE' CALLPROC(P_EMPLOYEE)
WHEN 'ANNULREV' REPLICATE(TARGET, ANNULREV)
}
FROM CDCIN;
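Partitioning on the Root Segment key means that, within a topic, every change belonging to the same root (for example, all ANNULREV segments under one EMPLOYEE) carries the same message key and is therefore assigned to the same partition, which preserves their relative order. The Python sketch below illustrates that property only. The `partition_for` function, the CRC32 hash, the partition count, and the sample records are assumptions for this example; the actual assignment is made by the Kafka producer's configured partitioner.

```python
import zlib


def partition_for(key, num_partitions):
    """Map a message key to a partition with a deterministic hash.

    CRC32 is a stand-in chosen for this sketch, not necessarily the
    hash used by the producer in a real deployment.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions


NUM_PARTITIONS = 6  # hypothetical partition count for IVP_HR_ANNUAL_REVIEW

# Hypothetical ANNULREV changes: (root EMPLOYEE key, child segment key)
annulrev_changes = [
    ("000010", "2018"),
    ("000010", "2019"),
    ("000020", "2019"),
]

# The message key is the root key, so the child key does not affect placement
assignments = [
    (root, child, partition_for(root, NUM_PARTITIONS))
    for root, child in annulrev_changes
]
for root, child, partition in assignments:
    print(root, child, partition)
```

Both reviews for employee 000010 receive the same partition number, while a different root key may or may not land elsewhere depending on the hash.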