Please enable JavaScript to view this site.

Navigation: Replicator Engine > Use Case Scenarios

IMS to PostgreSQL Distributor

Scroll

Replicate IMS changed data (CDC) for the IVP HR and Facilities databases into a PostgreSQL RDBMS running on a cloud platform such as AWS RDS PostgreSQL. Two Engines will be used: a Replicator operating as a Parallel Processing Distributor using Kafka, and multiple instances of a PostgreSQL Apply Engine operating as a Kafka Consumer.

Example Part 1 - Replicator Engine in Distributor mode

The volume of changes is anticipated to be quite large, so Kafka will be used to split the source CDC data into separate streams, partitioned by database root key, to be processed in parallel in a cloud VM. The values for TOPIC and SUBJECT used here are arbitrary but were selected based on the source DBD names, in this example the IMS DBDs IVPHRDBD and IVPFCDBD. Operation of the Replicator will be optimized using one worker thread.

----------------------------------------------------------------------

-- Name: IMSTOKAF:  Z/OS IMS CDC MESSAGE To Apply Engine Consumer

-- Client/Project: client/project

----------------------------------------------------------------------

--       Change Log:

----------------------------------------------------------------------

-- 2022-09-01 INITIAL RELEASE using Replicator / Distributor Engine

--

----------------------------------------------------------------------

--       Replicate Source/Target

----------------------------------------------------------------------

-- Read IMS CDC from the publisher at the cdc:// URL and republish it as
-- CDC messages on the Kafka topic named in the kafka:// URL.
-- Every <...> token is a placeholder to be replaced with a site value.
-- One worker thread is requested, matching the scenario description.
REPLICATE
  IMS cdc://<host_name>:<sqdaemon_port>/<publisher_name>/<subscription_name>
  TO CDC MESSAGE 'kafka://<host_name>:<host_port>/<topic>'
  WITH 1 WORKER
;

-- Replicator output options.  <name_space> is a placeholder that must be
-- replaced with the namespace to use for the generated messages.
OPTIONS
  AVRO COMPATIBLE NAMES,
  STRIP TRAILING SPACES,
  NAMESPACE '<name_space>'
;

-- One SOURCE entry per IMS DBD.  Each entry names the subject and alias
-- for that DBD's changes and the length of its root key (ROOTKEYLEN);
-- the scenario partitions the CDC stream by database root key.
MAPPINGS
  -- HR database
  SOURCE 'IVPHRDBD'
          SUBJECT 'IVPHRCDC-value'
          ALIAS 'IMSCDC1'
          ROOTKEYLEN 8
  -- Facilities database
  SOURCE 'IVPFCDBD'
          SUBJECT 'IVPFCCDC-value'
          ALIAS 'IMSCDC2'
          ROOTKEYLEN 5
;

 

Example Part 2 - Apply Engine Kafka Consumer

Multiple instances of the same Apply Engine script will run in a cloud VM, each processing in parallel a portion of the original CDC data, partitioned by root segment key, to maximize RDBMS Insert/Update/Delete performance, which is the slowest part of the replication process.

----------------------------------------------------------------------

-- Name: IMSTOPGS:  Z/OS IMS To PostgreSQL via Kafka on Linux

-- Client/Project: client/project

----------------------------------------------------------------------

--  SUBSTITUTION PARMS USED IN THIS SCRIPT:

--   %(ENGINE) - ENGINE Name

--   %(SHOST) - Kafka Cluster

--   %(SPORT) - Kafka Cluster port

--   %(SCHEMA) - Target SCHEMA Name

----------------------------------------------------------------------

--       Change Log:

----------------------------------------------------------------------

-- 2022-09-01 INITIAL RELEASE using AVRO

----------------------------------------------------------------------

-- Job identification.
JOBNAME IMSTOPGS;

-- Commit interval; 500 is also the default, stated here explicitly.
COMMIT EVERY 500;

-- Set the change-operation constants.
OPTIONS
   CDCOP('I','U','D')
;

-- Target RDBMS connection over ODBC.
-- NOTE(review): presumably ORGDATA_FACILITIES is the ODBC data source,
-- FACILRPL the user id, and passcmd a command that supplies the
-- password; confirm against the Apply Engine reference.
RDBMS ODBC ORGDATA_FACILITIES FACILRPL passcmd '<./getpassword.sh>';

 

----------------------------------------------------------------------

--       Data Definition Section

----------------------------------------------------------------------

----------------------------------------------------------------------

--       Source Descriptions

----------------------------------------------------------------------

 

-- IMS DBD descriptions, one per source database.  The aliases declared
-- here (IVPHRDBD, IVPFCDBD) are the database names the segment
-- descriptions refer to with IN DATABASE.
BEGIN GROUP IMS_DBD;
    DESCRIPTION IMSDBD ./IMSDBD/IVPHRDBD.dbd AS IVPHRDBD;
    DESCRIPTION IMSDBD ./IMSDBD/IVPFCDBD.dbd AS IVPFCDBD;
END GROUP;

 

-- COBOL copybook descriptions of the replicated IMS segments.  Each
-- description is tied to its segment and owning database, and is given
-- a user-friendly alias that the mapping procedures use as a qualifier.
BEGIN GROUP IMS_SEG;

    -- EMPLOYEE segment of the HR database
    DESCRIPTION COBOL ./IMSSEG/EMPLOYEE.cob AS EMPLOYEE
        FOR SEGMENT EMPLOYEE
        IN DATABASE IVPHRDBD
    ;

    -- FACILITY segment of the Facilities database
    DESCRIPTION COBOL ./IMSSEG/FACILITY.cob AS FACILITY
        FOR SEGMENT FACILITY
        IN DATABASE IVPFCDBD
    ;

END GROUP;

 

----------------------------------------------------------------------

--       Target Descriptions

----------------------------------------------------------------------

-- SQL DDL descriptions of the target tables.  The T_ aliases are the
-- names used on the left-hand side of the mapping procedures and in
-- the APPLY calls of the main section.
BEGIN GROUP PGS_DDL;
    DESCRIPTION SQLDDL ./PGSDDL/IVPHRDBD/EMPLOYEE.ddl AS T_EMPLOYEE;
    DESCRIPTION SQLDDL ./PGSDDL/IVPFCDBD/FACILITY.ddl AS T_FACILITY;
END GROUP;

 

----------------------------------------------------------------------

--       Datastore Section

----------------------------------------------------------------------

----------------------------------------------------------------------

--       Source Datastore

----------------------------------------------------------------------

-- Source datastore: IMS CDC records consumed from the Kafka cluster
-- given by the %(SHOST) / %(SPORT) substitution parameters.
-- The alias is CDCIN because the procedures and the main section read
-- from that name (FROM CDCIN, IMSSEGNAME(CDCIN), CDCOP(CDCIN)).
-- NOTE(review): no topic or consumer group appears in the URL; confirm
-- whether one is required for this engine.
DATASTORE kafka://%(SHOST):%(SPORT)         -- specify Kafka cluster
        OF IMSCDC                           -- specify IMS CDC format
        AS CDCIN
        DESCRIBED BY GROUP IMS_SEG
;

 

----------------------------------------------------------------------

--       Target Datastore(s)

----------------------------------------------------------------------

-- Target datastore: the relational tables described by group PGS_DDL,
-- qualified with the schema passed in %(SCHEMA).  The alias TARGET is
-- the name used by PROCESS INTO and APPLY in the main section.
DATASTORE RDBMS
    OF RELATIONAL
    AS TARGET
    FORCE QUALIFIER %(SCHEMA)
    DESCRIBED BY GROUP PGS_DDL
    FOR CHANGE
;

 

----------------------------------------------------------------------

--       Field Specification Section

----------------------------------------------------------------------

 

----------------------------------------------------------------------

--       Procedure Section

----------------------------------------------------------------------

-- M_EMPLOYEE: maps one EMPLOYEE segment change record onto the
-- T_EMPLOYEE target row.  The V_ variables are expected to be set by
-- the main section before it calls this procedure.
CREATE PROC M_EMPLOYEE AS SELECT
{
-- CDC audit columns
   T_EMPLOYEE.CDC_SOURCE_APPLY_DTTM   = V_SOURCE_APPLY_DTTM
   T_EMPLOYEE.CDC_SOURCE_CAPTURE_DTTM = V_SOURCE_CAPTURE_DTTM
   T_EMPLOYEE.CDC_SOURCE_ACTION_CODE  = V_SOURCE_CDC_ACTION_CODE
-- Parent Keys
--   (none mapped for this segment)
-- Segment Keys
   T_EMPLOYEE.EMP_NO                  = EMPLOYEE.EMP-NO
-- Segment Data
-- NOTE(review): FULL_NAME is written with an underscore on the source
-- side while the other multi-word source fields use hyphens; confirm
-- the field name against EMPLOYEE.cob.
   T_EMPLOYEE.FULL_NAME               = EMPLOYEE.FULL_NAME
   T_EMPLOYEE.LAST_NAME               = EMPLOYEE.LAST-NAME
   T_EMPLOYEE.FIRST_NAME              = EMPLOYEE.FIRST-NAME
   T_EMPLOYEE.PHONE                   = EMPLOYEE.PHONE
-- Target column uses an underscore, like every other target column.
   T_EMPLOYEE.ZIP_CODE                = EMPLOYEE.ZIP-CODE
}
FROM CDCIN;

 

-- M_FACILITY: maps one FACILITY segment change record onto the
-- T_FACILITY target row.  The V_ variables are expected to be set by
-- the main section before it calls this procedure.
CREATE PROC M_FACILITY AS SELECT
{
-- CDC audit columns
   T_FACILITY.CDC_SOURCE_APPLY_DTTM   = V_SOURCE_APPLY_DTTM
   T_FACILITY.CDC_SOURCE_CAPTURE_DTTM = V_SOURCE_CAPTURE_DTTM
   T_FACILITY.CDC_SOURCE_ACTION_CODE  = V_SOURCE_CDC_ACTION_CODE
-- Parent Keys
--   (none mapped for this segment)
-- Segment Keys
   T_FACILITY.STATE_CODE              = FACILITY.STATE-CODE
   T_FACILITY.OFFICE_NO               = FACILITY.OFFICE-NUMBER
-- Segment Data
-- NOTE(review): the source fields below use underscores while the key
-- fields above use hyphens; confirm the names against FACILITY.cob.
   T_FACILITY.CITY_NAME               = FACILITY.CITY_NAME
   T_FACILITY.ADDRESS_1               = FACILITY.ADDRESS_1
   T_FACILITY.ADDRESS_2               = FACILITY.ADDRESS_2
   T_FACILITY.ADDRESS_3               = FACILITY.ADDRESS_3
   T_FACILITY.ZIP_CODE                = FACILITY.ZIP_CODE
   T_FACILITY.MANAGER_ID              = FACILITY.MANAGER_ID
}
FROM CDCIN;

 

----------------------------------------------------------------------

--       Main Section

----------------------------------------------------------------------

-- Main loop: for every CDC record read from CDCIN, set the shared
-- audit variables, then call the mapping procedure for the record's
-- IMS segment and apply the mapped row to the target table.
-- NOTE(review): the Field Specification Section of this script shows
-- no declarations for the V_ variables; confirm whether they must be
-- declared before use.
PROCESS INTO TARGET
SELECT
{
-- OUTMSG statement(s) for debugging ONLY and should be commented
-- out when not needed
--   OUTMSG(0,'Segment=',IMSSEGNAME(CDCIN)
--             ,' CDCOP=',CDCOP(CDCIN)
--           )

-- Audit values read by M_EMPLOYEE and M_FACILITY.  The names must
-- match the ones those procedures reference.
   V_SOURCE_APPLY_DTTM      = LOCAL2GMT(TIMESTAMP ())
   V_SOURCE_CAPTURE_DTTM    = CDCTSTMP(CDCIN)
   V_SOURCE_CDC_ACTION_CODE = CDCOP(CDCIN)

-- Route the record by IMS segment name.
   CASE IMSSEGNAME(CDCIN)
      WHEN 'EMPLOYEE' { CALLPROC(M_EMPLOYEE) APPLY(TARGET,T_EMPLOYEE) }
      WHEN 'FACILITY' { CALLPROC(M_FACILITY) APPLY(TARGET,T_FACILITY) }
}
FROM CDCIN;