Wednesday 22 November 2017

Python Code for NiFi ExecuteScript to Accept Attributes and Perform a Custom Action
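
The script below runs inside NiFi's ExecuteScript processor (Jython engine). It reads fixed-width column metadata from the flowfile content, looks up Hive types from a JSON mapping passed in as a flowfile attribute, and writes back two pipe-separated Hive DDL statements: a RegexSerDe staging table and a typed target table.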

"""
sample record;

Source System,Source table,Element (column) name,From position,To Position,Column Position
Genelco,XCISACT1,GROUP-NUMBER,Integer,1,20,1
Genelco,XCISACT1,ACCOUNT-NUMBER,Integer21,25,2
Genelco,XCISACT1,DUMMY 1,Varchar2,26,35,3
"""

import json
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback


class PyStreamCallback(StreamCallback):
    def __init__(self, tmp_dir, tmp_mapping):
        self.tmp_dir = tmp_dir
        self.tmp_mapping = tmp_mapping

    def process(self, inputStream, outputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        tmp_dir = self.tmp_dir
        tmp_mapping = self.tmp_mapping
        # Leading ';' lets the downstream consumer treat each DDL as a statement.
        create_ddl1 = ';create external table if not exists '
        create_ddl2 = ';create table if not exists '
        # Key each metadata row by its column position so columns come out in order.
        dict1 = {}
        for line in text.split('\n'):
            row = line.split(',')
            if len(row) < 7 or not row[6].strip().isdigit():
                continue  # skip blank lines and any header row
            tablename = row[1]
            key = int(row[6])
            value = row[0:6]
            dict1[key] = value
        create_ddl1 = create_ddl1 + tablename + '_stg('
        create_ddl2 = create_ddl2 + tablename + '('
        input_regex = ''
        regex = '(.{%s})'
        last_key = 0
        for key in sorted(dict1.keys()):
            if key > last_key:  # guard against duplicate column positions
                val = dict1[key]
                colname = val[2].lower().replace('-', '_').replace(' ', '_')
                colType = val[3].lower()
                newColType = tmp_mapping[colType]
                # Staging table holds everything as string; the target table is typed.
                create_ddl1 = create_ddl1 + colname + ' string,'
                create_ddl2 = create_ddl2 + colname + ' ' + newColType + ','
                # Fixed-column width = (to position - from position) + 1
                width = (int(val[5]) - int(val[4])) + 1
                input_regex = input_regex + regex % width
                last_key = key
        # Drop the trailing comma before closing the column list.
        create_ddl1 = create_ddl1[:-1]
        create_ddl2 = create_ddl2[:-1]
        # The final capture group (\*) picks up the record's trailing '*' marker
        # and lands it in the hive_check column.
        create_ddl1 = create_ddl1 + ",hive_check string ) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'"
        create_ddl1 = create_ddl1 + ' WITH SERDEPROPERTIES ("input.regex" = "' + input_regex + '(\\\*)") LOCATION'
        create_ddl1 = create_ddl1 + " '" + tmp_dir + "/" + tablename + "'"

        # Delimiters are escaped so the generated DDL carries the two-character
        # sequences \t and \n rather than raw control characters.
        create_ddl2 = create_ddl2 + ",hive_check string ) ROW FORMAT delimited FIELDS TERMINATED BY '\\t' LINES TERMINATED BY '\\n' STORED AS TEXTFILE"

        # Emit both statements, '|'-separated, as the new flowfile content.
        custom_map = create_ddl1 + '|' + create_ddl2
        outputStream.write(bytearray(custom_map.encode('utf-8')))

flowFile = session.get()
if flowFile is not None:
    # Read the working directory and the column-type lookup from flowfile attributes.
    tmp_dir = flowFile.getAttribute('tmp_dir')
    # Strip single quotes so the attribute value is valid JSON, e.g.
    # { "varchar2" : "string", "byte" : "string", "integer" : "int", "bigint" : "bigint", "date" : "date"}
    col_mapping = flowFile.getAttribute('column_lkp').replace("'", "")
    tmp_mapping = json.loads(col_mapping)
    flowFile = session.write(flowFile, PyStreamCallback(tmp_dir, tmp_mapping))
    session.transfer(flowFile, REL_SUCCESS)
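
To sanity-check the generated DDL outside NiFi, the same construction can be exercised with plain Python. This is a minimal sketch, assuming the metadata arrives exactly as in the sample above; build_ddls is a hypothetical helper that mirrors process() with the NiFi stream plumbing removed:

# Standalone sketch (no NiFi required); mirrors the process() logic above.
def build_ddls(text, tmp_dir, tmp_mapping):
    cols = {}
    tablename = None
    for line in text.strip().split('\n'):
        row = line.split(',')
        if len(row) < 7 or not row[6].strip().isdigit():
            continue  # skip blank lines and the header row
        tablename = row[1]
        cols[int(row[6])] = row[0:6]
    names, types, widths = [], [], []
    for key in sorted(cols):
        val = cols[key]
        names.append(val[2].lower().replace('-', '_').replace(' ', '_'))
        types.append(tmp_mapping[val[3].lower()])
        widths.append(int(val[5]) - int(val[4]) + 1)
    input_regex = ''.join('(.{%s})' % w for w in widths)
    stg = (';create external table if not exists %s_stg(%s,hive_check string ) '
           "ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe' "
           'WITH SERDEPROPERTIES ("input.regex" = "%s(\\\*)") '
           "LOCATION '%s/%s'"
           % (tablename, ','.join(n + ' string' for n in names),
              input_regex, tmp_dir, tablename))
    tgt = (';create table if not exists %s(%s,hive_check string ) '
           "ROW FORMAT delimited FIELDS TERMINATED BY '\\t' "
           "LINES TERMINATED BY '\\n' STORED AS TEXTFILE"
           % (tablename, ','.join('%s %s' % nt for nt in zip(names, types))))
    return stg + '|' + tgt

sample = """Genelco,XCISACT1,GROUP-NUMBER,Integer,1,20,1
Genelco,XCISACT1,ACCOUNT-NUMBER,Integer,21,25,2
Genelco,XCISACT1,DUMMY 1,Varchar2,26,35,3"""
mapping = {"varchar2": "string", "integer": "int"}
print(build_ddls(sample, '/tmp/landing', mapping))

Running it against the three sample rows prints the staging and target statements separated by '|', which is the same content the processor writes back to the flowfile.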

Saturday 18 November 2017

Understanding Hadoop RecordReader and InputSplit

Current challenge: loading fixed-width mainframe files into Hive.


Required reading: https://hadoopi.wordpress.com/2013/05/27/understand-recordreader-inputsplit/


1. I will share my version of the code and a sample file soon.
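
In the meantime, here is the core idea in a minimal pure-Python sketch: a RecordReader turns the byte range of an InputSplit into whole records, and for fixed-width files with no newline terminators it must do that purely by offset arithmetic. RECORD_LEN and read_split below are illustrative assumptions, not Hadoop API calls; a real RecordReader would do the same against an FSDataInputStream for its split.

# Illustrative sketch of fixed-width record reading within a byte-range "split".
# RECORD_LEN and read_split are assumptions for illustration only.
RECORD_LEN = 36  # e.g. 35 data bytes + trailing '*' marker

def read_split(path, split_start, split_len, record_len=RECORD_LEN):
    """Yield whole records whose first byte falls inside [split_start, split_start + split_len)."""
    # Align to the first record boundary at or after split_start.
    start = split_start + (-split_start % record_len)
    end = split_start + split_len
    with open(path, 'rb') as f:
        f.seek(start)
        pos = start
        while pos < end:  # a split owns every record whose first byte it contains
            rec = f.read(record_len)
            if len(rec) < record_len:
                break  # incomplete trailing record at end of file
            yield rec
            pos += record_len

A split whose boundary falls mid-record skips forward to the next boundary, and the previous split reads past its own end to finish the record it started, which is the same ownership contract Hadoop's LineRecordReader honours for text lines.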