"""
sample record;
Source System,Source table,Element (column) name,Data type,From position,To Position,Column Position
Genelco,XCISACT1,GROUP-NUMBER,Integer,1,20,1
Genelco,XCISACT1,ACCOUNT-NUMBER,Integer,21,25,2
Genelco,XCISACT1,DUMMY 1,Varchar2,26,35,3
"""
import json
import collections
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
class PyStreamCallback(StreamCallback):
    """Stream callback that rewrites a fixed-width layout description as Hive DDL.

    The incoming content is read as UTF-8 text, one comma-separated row per
    line, with these fields (by index):

        0 source system, 1 source table, 2 element (column) name,
        3 data type, 4 from position, 5 to position, 6 column position

    Field 6 is parsed with int(), so a header row must not be present in the
    content. The outgoing content is two statements joined by '|':

      * an external staging table ``<table>_stg`` whose columns are all
        ``string`` and are cut out of each line by a RegexSerDe pattern built
        from the from/to positions;
      * a table ``<table>`` with the same columns typed through the supplied
        type lookup.

    Both tables get an extra trailing ``hive_check string`` column.
    """

    def __init__(self, tmp_dir, tmp_mapping):
        """Remember the staging directory and the data-type lookup.

        tmp_dir     -- base path used for the staging table's LOCATION clause.
        tmp_mapping -- dict mapping a lower-cased source data type to the
                       Hive column type used in the typed table.
        """
        self.tmp_dir = tmp_dir
        self.tmp_mapping = tmp_mapping

    def process(self, inputStream, outputStream):
        """Read the layout rows, build both DDL statements and write them out.

        Raises ValueError when the content has no layout rows, KeyError when a
        data type is missing from the lookup, and ValueError/IndexError when a
        row is malformed.
        """
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        custom_map = self._build_ddl(text)
        outputStream.write(bytearray(custom_map.encode('utf-8')))

    def _build_ddl(self, text):
        """Return '<staging ddl>|<typed ddl>' for the layout rows in *text*."""
        tmp_dir = self.tmp_dir
        tmp_mapping = self.tmp_mapping

        # Index the rows by column position; a later row with the same
        # position replaces an earlier one, and the last row names the table.
        columns = {}
        tablename = None
        for line in text.split('\n'):
            # A trailing newline or an empty line carries no column
            # definition; skip it instead of failing on row[1].
            if not line.strip():
                continue
            row = line.split(',')
            tablename = row[1]
            columns[int(row[6])] = row[0:6]

        if tablename is None:
            raise ValueError('no layout rows found in flow file content')

        stg_columns = []
        typed_columns = []
        regex_groups = []
        for key in sorted(columns):
            # Only positive column positions are emitted (the original code
            # compared each key against a marker that always stayed 0).
            if key <= 0:
                continue
            val = columns[key]
            col_name = val[2].lower().replace('-', '_').replace(' ', '_')
            new_col_type = tmp_mapping[val[3].lower()]
            # Positions are inclusive, so the field width is to - from + 1.
            width = (int(val[5]) - int(val[4])) + 1
            stg_columns.append(col_name + ' string')
            typed_columns.append(col_name + ' ' + new_col_type)
            regex_groups.append('(.{%s})' % width)

        # Joining the column list together with hive_check keeps the
        # statement well formed even when no column row was emitted.
        stg_columns.append('hive_check string')
        typed_columns.append('hive_check string')
        input_regex = ''.join(regex_groups)

        create_ddl1 = ';create external table if not exists ' + tablename + '_stg('
        create_ddl1 = create_ddl1 + ','.join(stg_columns)
        create_ddl1 = create_ddl1 + " ) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'"
        # '(\\\\*)' yields the characters (\\*) - two backslashes and a star -
        # spelled without relying on an unrecognized escape sequence.
        create_ddl1 = create_ddl1 + ' WITH SERDEPROPERTIES ("input.regex" = "' + input_regex + '(\\\\*)") LOCATION'
        create_ddl1 = create_ddl1 + " '" + tmp_dir + "/" + tablename + "'"

        create_ddl2 = ';create table if not exists ' + tablename + '('
        create_ddl2 = create_ddl2 + ','.join(typed_columns)
        # NOTE(review): \t and \n below are interpreted by Python, so the
        # emitted statement contains a real tab and a real newline character
        # between the quotes - confirm this is what the Hive client expects.
        create_ddl2 = create_ddl2 + " ) ROW FORMAT delimited FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n' STORED AS TEXTFILE"

        return create_ddl1 + '|' + create_ddl2
# Script entry point: take one flow file from the session, replace its content
# with the generated DDL and route it to success.
# `session` and `REL_SUCCESS` are not defined in this file; presumably they are
# bound by the hosting script processor - verify against the processor setup.
flowFile = session.get()
if flowFile is not None:
    # Base path used for the staging table's LOCATION clause.
    tmp_dir = flowFile.getAttribute('tmp_dir')
    # 'column_lkp' holds the data-type lookup as JSON text; single quotes are
    # removed before parsing. Example value:
    #   { "varchar2" : "string", "byte" : "string", "integer" : "int", "bigint" : "bigint", "date" : "date"}
    col_mapping = flowFile.getAttribute('column_lkp').replace("'", "")
    tmp_mapping = json.loads(col_mapping)
    flowFile = session.write(flowFile, PyStreamCallback(tmp_dir, tmp_mapping))
    session.transfer(flowFile, REL_SUCCESS)
"""
sample record;
Source System,Source table,Element (column) name,Data type,From position,To Position,Column Position
Genelco,XCISACT1,GROUP-NUMBER,Integer,1,20,1
Genelco,XCISACT1,ACCOUNT-NUMBER,Integer,21,25,2
Genelco,XCISACT1,DUMMY 1,Varchar2,26,35,3
"""
import json
import collections
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
class PyStreamCallback(StreamCallback):
    """Stream callback that rewrites a fixed-width layout description as Hive DDL.

    The incoming content is read as UTF-8 text, one comma-separated row per
    line, with these fields (by index):

        0 source system, 1 source table, 2 element (column) name,
        3 data type, 4 from position, 5 to position, 6 column position

    Field 6 is parsed with int(), so a header row must not be present in the
    content. The outgoing content is two statements joined by '|':

      * an external staging table ``<table>_stg`` whose columns are all
        ``string`` and are cut out of each line by a RegexSerDe pattern built
        from the from/to positions;
      * a table ``<table>`` with the same columns typed through the supplied
        type lookup.

    Both tables get an extra trailing ``hive_check string`` column.
    """

    def __init__(self, tmp_dir, tmp_mapping):
        """Remember the staging directory and the data-type lookup.

        tmp_dir     -- base path used for the staging table's LOCATION clause.
        tmp_mapping -- dict mapping a lower-cased source data type to the
                       Hive column type used in the typed table.
        """
        self.tmp_dir = tmp_dir
        self.tmp_mapping = tmp_mapping

    def process(self, inputStream, outputStream):
        """Read the layout rows, build both DDL statements and write them out.

        Raises ValueError when the content has no layout rows, KeyError when a
        data type is missing from the lookup, and ValueError/IndexError when a
        row is malformed.
        """
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        custom_map = self._build_ddl(text)
        outputStream.write(bytearray(custom_map.encode('utf-8')))

    def _build_ddl(self, text):
        """Return '<staging ddl>|<typed ddl>' for the layout rows in *text*."""
        tmp_dir = self.tmp_dir
        tmp_mapping = self.tmp_mapping

        # Index the rows by column position; a later row with the same
        # position replaces an earlier one, and the last row names the table.
        columns = {}
        tablename = None
        for line in text.split('\n'):
            # A trailing newline or an empty line carries no column
            # definition; skip it instead of failing on row[1].
            if not line.strip():
                continue
            row = line.split(',')
            tablename = row[1]
            columns[int(row[6])] = row[0:6]

        if tablename is None:
            raise ValueError('no layout rows found in flow file content')

        stg_columns = []
        typed_columns = []
        regex_groups = []
        for key in sorted(columns):
            # Only positive column positions are emitted (the original code
            # compared each key against a marker that always stayed 0).
            if key <= 0:
                continue
            val = columns[key]
            col_name = val[2].lower().replace('-', '_').replace(' ', '_')
            new_col_type = tmp_mapping[val[3].lower()]
            # Positions are inclusive, so the field width is to - from + 1.
            width = (int(val[5]) - int(val[4])) + 1
            stg_columns.append(col_name + ' string')
            typed_columns.append(col_name + ' ' + new_col_type)
            regex_groups.append('(.{%s})' % width)

        # Joining the column list together with hive_check keeps the
        # statement well formed even when no column row was emitted.
        stg_columns.append('hive_check string')
        typed_columns.append('hive_check string')
        input_regex = ''.join(regex_groups)

        create_ddl1 = ';create external table if not exists ' + tablename + '_stg('
        create_ddl1 = create_ddl1 + ','.join(stg_columns)
        create_ddl1 = create_ddl1 + " ) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'"
        # '(\\\\*)' yields the characters (\\*) - two backslashes and a star -
        # spelled without relying on an unrecognized escape sequence.
        create_ddl1 = create_ddl1 + ' WITH SERDEPROPERTIES ("input.regex" = "' + input_regex + '(\\\\*)") LOCATION'
        create_ddl1 = create_ddl1 + " '" + tmp_dir + "/" + tablename + "'"

        create_ddl2 = ';create table if not exists ' + tablename + '('
        create_ddl2 = create_ddl2 + ','.join(typed_columns)
        # NOTE(review): \t and \n below are interpreted by Python, so the
        # emitted statement contains a real tab and a real newline character
        # between the quotes - confirm this is what the Hive client expects.
        create_ddl2 = create_ddl2 + " ) ROW FORMAT delimited FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n' STORED AS TEXTFILE"

        return create_ddl1 + '|' + create_ddl2
# Script entry point: take one flow file from the session, replace its content
# with the generated DDL and route it to success.
# `session` and `REL_SUCCESS` are not defined in this file; presumably they are
# bound by the hosting script processor - verify against the processor setup.
flowFile = session.get()
if flowFile is not None:
    # Base path used for the staging table's LOCATION clause.
    tmp_dir = flowFile.getAttribute('tmp_dir')
    # 'column_lkp' holds the data-type lookup as JSON text; single quotes are
    # removed before parsing. Example value:
    #   { "varchar2" : "string", "byte" : "string", "integer" : "int", "bigint" : "bigint", "date" : "date"}
    col_mapping = flowFile.getAttribute('column_lkp').replace("'", "")
    tmp_mapping = json.loads(col_mapping)
    flowFile = session.write(flowFile, PyStreamCallback(tmp_dir, tmp_mapping))
    session.transfer(flowFile, REL_SUCCESS)
# Reader comment: Could you let me know what the purpose of this code is?
# Reply: You can use this code to perform custom actions with the Apache NiFi ExecuteScript processor.