Wednesday 22 November 2017

Python code for the NiFi ExecuteScript processor: read flowfile attributes and perform a custom action (generate Hive DDL)

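"""
Sample column-layout input (CSV, one row per column). The header line is shown
for reference only; the script expects data rows only.

Source System,Source table,Element (column) name,Data Type,From position,To Position,Column Position
Genelco,XCISACT1,GROUP-NUMBER,Integer,1,20,1
Genelco,XCISACT1,ACCOUNT-NUMBER,Integer,21,25,2
Genelco,XCISACT1,DUMMY 1,Varchar2,26,35,3
"""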

import json
import collections
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback


class PyStreamCallback(StreamCallback):
    """NiFi StreamCallback that turns a CSV column layout into two Hive DDL statements.

    Each non-blank input line describes one column as
    ``source_system,table,column_name,data_type,from_pos,to_pos,column_pos``.
    The output content is ``<staging DDL>|<final DDL>``, where each statement
    is prefixed with ``;``:

    * staging: ``create external table if not exists <table>_stg(...)``. Every
      column is typed ``string``. The table uses ``RegexSerDe`` with one
      ``(.{width})`` capture group per column, where ``width = to_pos - from_pos + 1``.
      Its LOCATION is ``<tmp_dir>/<table>``.
    * final: ``create table if not exists <table>(...)``. Column types come from
      ``tmp_mapping`` and the table is stored as delimited TEXTFILE.

    Both tables end with an extra ``hive_check string`` column. Columns are
    ordered by ``column_pos``; non-positive positions are skipped. The table
    name is taken from the last data row.
    """

    def __init__(self, tmp_dir, tmp_mapping):
        """
        :param tmp_dir: base directory; the staging table's LOCATION is
            ``<tmp_dir>/<table>``.
        :param tmp_mapping: dict mapping a lower-cased source data type
            (e.g. ``"varchar2"``) to the Hive type used in the final table.
        """
        self.tmp_dir = tmp_dir
        self.tmp_mapping = tmp_mapping

    @staticmethod
    def _hive_name(column_name):
        """Lower-case a source column name and replace '-' and ' ' with '_'."""
        return column_name.lower().replace('-', '_').replace(' ', '_')

    def _parse_layout(self, text):
        """Parse the layout CSV into (table_name, {column_pos: row[0:6]}).

        Blank lines (e.g. a trailing newline at end of content) are ignored.
        If two rows share a column_pos, the later row wins.

        :raises ValueError: if a row has fewer than 7 fields, a column_pos is
            not an integer, or there are no data rows at all.
        """
        layout = {}
        table_name = None
        for line in text.splitlines():
            if not line.strip():
                continue
            row = line.split(',')
            if len(row) < 7:
                raise ValueError('expected 7 comma-separated fields, got %d: %r'
                                 % (len(row), line))
            table_name = row[1]
            layout[int(row[6])] = row[0:6]
        if table_name is None:
            raise ValueError('column layout content contains no data rows')
        return table_name, layout

    def process(self, inputStream, outputStream):
        """Read the layout from inputStream and write "<staging DDL>|<final DDL>"."""
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        table_name, layout = self._parse_layout(text)

        stg_columns = []
        final_columns = []
        field_regexes = []
        for position in sorted(layout):
            # Only strictly positive column positions are emitted.
            if position <= 0:
                continue
            row = layout[position]
            name = self._hive_name(row[2])
            # KeyError here means the source type is missing from column_lkp.
            hive_type = self.tmp_mapping[row[3].lower()]
            width = int(row[5]) - int(row[4]) + 1
            stg_columns.append(name + ' string')
            final_columns.append(name + ' ' + hive_type)
            field_regexes.append('(.{%d})' % width)

        if not stg_columns:
            # Without columns the DDL would be syntactically broken.
            raise ValueError('no columns with a positive column position for table %r'
                             % (table_name,))

        stg_ddl = (';create external table if not exists ' + table_name + '_stg('
                   + ','.join(stg_columns)
                   + ",hive_check string ) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'"
                   # '(\\\\*)' is the same text the original '(\\\*)' produced,
                   # without relying on the invalid '\*' escape sequence.
                   + ' WITH SERDEPROPERTIES ("input.regex" = "' + ''.join(field_regexes) + '(\\\\*)") LOCATION'
                   + " '" + self.tmp_dir + "/" + table_name + "'")

        # NOTE(review): Python turns '\t' and '\n' below into literal tab and
        # newline characters inside the emitted DDL, not the two-character
        # escapes; confirm that is what the downstream consumer expects.
        final_ddl = (';create table if not exists ' + table_name + '('
                     + ','.join(final_columns)
                     + ",hive_check string ) ROW FORMAT delimited FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n' STORED AS TEXTFILE")

        custom_map = stg_ddl + '|' + final_ddl
        outputStream.write(bytearray(custom_map.encode('utf-8')))

# ExecuteScript entry point. `session`, `log`, `REL_SUCCESS` and `REL_FAILURE`
# are script bindings supplied by NiFi's ExecuteScript processor.
flowFile = session.get()
if flowFile is not None:
    try:
        # Base directory for the staging table's LOCATION.
        tmp_dir = flowFile.getAttribute('tmp_dir')
        # JSON object mapping lower-cased source data types to Hive types, e.g.
        # {"varchar2": "string", "integer": "int", "date": "date"}.
        col_mapping = flowFile.getAttribute('column_lkp')
        if tmp_dir is None or col_mapping is None:
            raise ValueError("flowfile is missing the 'tmp_dir' and/or 'column_lkp' attribute")
        # Single quotes are stripped from the attribute before JSON parsing.
        tmp_mapping = json.loads(col_mapping.replace("'", ""))
        flowFile = session.write(flowFile, PyStreamCallback(tmp_dir, tmp_mapping))
    except Exception as e:
        # Route bad input to failure instead of leaving it unrouted.
        # %r keeps the message ASCII-safe under Jython 2.7.
        log.error('Failed to generate Hive DDL: %r' % (e,))
        session.transfer(flowFile, REL_FAILURE)
    else:
        session.transfer(flowFile, REL_SUCCESS)

2 comments:

  1. Could you let me know what the purpose of this code is?

    ReplyDelete
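  2. You can use this code to perform custom actions with the Apache NiFi ExecuteScript processor. Specifically, it reads a CSV column layout from the flowfile content and writes out Hive CREATE TABLE statements for a RegexSerDe staging table and a tab-delimited final table.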

    ReplyDelete

Please share your thoughts and let us know which topics you would like covered.