Wednesday 22 November 2017

Python Code for NiFi ExecuteScript to Accept Attributes and Perform a Custom Action

"""
sample record;

Source System,Source table,Element (column) name,From position,To Position,Column Position
Genelco,XCISACT1,GROUP-NUMBER,Integer,1,20,1
Genelco,XCISACT1,ACCOUNT-NUMBER,Integer21,25,2
Genelco,XCISACT1,DUMMY 1,Varchar2,26,35,3
"""

import json
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback


class PyStreamCallback(StreamCallback):
    def __init__(self, tmp_dir, tmp_mapping):
        self.tmp_dir = tmp_dir            # HDFS staging directory, passed in as a flowfile attribute
        self.tmp_mapping = tmp_mapping    # source-type-to-Hive-type lookup, also passed in

    def process(self, inputStream, outputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        tmp_dir = self.tmp_dir
        tmp_mapping = self.tmp_mapping
        create_ddl1 = ';create external table if not exists '
        create_ddl2 = ';create table if not exists '
        dict1 = {}
        for line in text.split('\n'):
            if not line.strip():
                continue                                  # skip trailing blank lines
            row = line.split(',')
            tablename = row[1]
            key = int(row[6])                             # column position drives the ordering
            dict1[key] = row[0:6]
        create_ddl1 = create_ddl1 + tablename + '_stg('
        create_ddl2 = create_ddl2 + tablename + '('
        input_regex = ''
        regex = '(.{%s})'
        for key in sorted(dict1.keys()):
            val = dict1[key]
            input_regex = input_regex + regex
            colType = val[3].lower()
            newColType = tmp_mapping[colType]             # map the source data type to a Hive type
            colName = val[2].lower().replace('-', '_').replace(' ', '_')
            create_ddl1 = create_ddl1 + colName + ' string,'
            create_ddl2 = create_ddl2 + colName + ' ' + newColType + ','
            pos = (int(val[5]) - int(val[4])) + 1         # field width = (to position - from position) + 1
            input_regex = input_regex % pos               # only the newest %s placeholder is still unformatted
        create_ddl1 = create_ddl1[:-1]                    # drop the trailing comma
        create_ddl2 = create_ddl2[:-1]
        create_ddl1 = create_ddl1 + ",hive_check string ) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'"
        create_ddl1 = create_ddl1 + ' WITH SERDEPROPERTIES ("input.regex" = "' + input_regex + '(\\\*)") LOCATION'
        create_ddl1 = create_ddl1 + " '" + tmp_dir + "/" + tablename + "'"
        create_ddl2 = create_ddl2 + ",hive_check string ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t' LINES TERMINATED BY '\\n' STORED AS TEXTFILE"

        # emit both DDL statements in one flowfile, separated by a pipe
        custom_map = create_ddl1 + '|' + create_ddl2
        outputStream.write(bytearray(custom_map.encode('utf-8')))

flowFile = session.get()
if flowFile is not None:
    # flowfile attributes carry the staging directory and the column type lookup
    tmp_dir = flowFile.getAttribute('tmp_dir')
    # 'column_lkp' holds a JSON map of source types to Hive types, e.g.
    # { "varchar2" : "string", "byte" : "string", "integer" : "int", "bigint" : "bigint", "date" : "date"}
    col_mapping = flowFile.getAttribute('column_lkp').replace("'", "")
    tmp_mapping = json.loads(col_mapping)
    flowFile = session.write(flowFile, PyStreamCallback(tmp_dir, tmp_mapping))
    session.transfer(flowFile, REL_SUCCESS)
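
The key piece is the input.regex string that the script assembles for Hive's RegexSerDe. Here is a minimal standalone sketch (plain Python, run outside NiFi, with the From/To positions from the sample mapping above hard-coded) showing how the positions turn into one capture group per column:

# (column name, from position, to position) taken from the sample mapping records above
sample = [
    ('GROUP-NUMBER', 1, 20),
    ('ACCOUNT-NUMBER', 21, 25),
    ('DUMMY 1', 26, 35),
]

input_regex = ''
for name, from_pos, to_pos in sample:
    width = (to_pos - from_pos) + 1      # same width arithmetic as in the processor script
    input_regex += '(.{%s})' % width     # one fixed-width capture group per column

print(input_regex)                       # prints (.{20})(.{5})(.{10})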

Saturday 18 November 2017

Understanding Hadoop Record Reader and Input Split

Current challenge: loading fixed-width mainframe files into Hive.


Required reading: https://hadoopi.wordpress.com/2013/05/27/understand-recordreader-inputsplit/


I will share my version of the code and a sample file soon.
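
In the meantime, here is a tiny Python sketch (with a made-up field layout) of the core idea such a loader has to implement: each field is carved out of the record purely by byte positions, because fixed-width files carry no delimiters. This is the same idea the RegexSerDe regex in the 22 November post encodes.

# hypothetical layout: (field name, start offset, end offset), zero-based and end-exclusive
layout = [('group_number', 0, 20), ('account_number', 20, 25), ('dummy_1', 25, 35)]

# one 35-byte fixed-width record, padded out to the field widths
record = 'GROUP-000000000001'.ljust(20) + '12345' + 'FILLER'.ljust(10)

fields = {name: record[start:end].strip() for name, start, end in layout}
print(fields['group_number'], fields['account_number'], fields['dummy_1'])
# GROUP-000000000001 12345 FILLER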




Sunday 2 October 2016

All about Greenplum Partitioning - The system catalog tables to the rescue

I have been looking for better ways to identify the partitioning scheme of Greenplum tables.

The queries below will help you find all the partitioning details of a Greenplum table.

In the next post, we will see how to leverage this information and automate some of your data engineering activities using Python. Finally, we will create a pipeline to move the data to HDFS using Apache Sqoop.


DROP TABLE db_retail.sales;
CREATE TABLE db_retail.sales (id int, date date, amt decimal(10,2))
DISTRIBUTED BY (id)
PARTITION BY RANGE (date)
( START (date '2008-01-01') INCLUSIVE
  END (date '2009-01-01') EXCLUSIVE EVERY (INTERVAL '1 mon')
, START (date '2009-01-01') INCLUSIVE
  END (date '2010-01-01') EXCLUSIVE
  EVERY (INTERVAL '1 day') );

DROP TABLE db_retail.sales_amt;
CREATE TABLE db_retail.sales_amt (id int, date date, amt decimal(10,2))
DISTRIBUTED BY (id)
PARTITION BY RANGE (amt)
( START (1) INCLUSIVE
  END (10000) EXCLUSIVE EVERY (10)
);

--- check all the partitions for the table
select * from pg_partitions where schemaname = 'db_retail' and tablename = 'sales';

select * from pg_partitions where schemaname = 'db_retail' and tablename = 'sales_amt';


--- queries to list each inherited child partition table (e.g. for analyzing them individually)

select partitiontablename from pg_partitions where schemaname = 'db_retail' and tablename = 'sales';

select * from pg_inherits
join pg_class c on c.oid = inhrelid
where inhparent = 'db_retail.sales'::regclass;

--- fetch the overall min and max range boundaries for each partition interval (EVERY clause)

select min(partitionrangestart),max(partitionrangeend)
, current_date 
, partitioneveryclause
, partitiontype 
from pg_partitions where schemaname = 'db_retail' and tablename = 'sales'
group by 3,4,5;

select * from information_schema.tables limit 10;

--- fetch the column definitions needed to recreate the table with the desired partitioning
select column_name, data_type, character_maximum_length
from INFORMATION_SCHEMA.COLUMNS where table_name = 'sales' and table_schema = 'db_retail';

--- find the partition column list
select columnname, position_in_partition_key from pg_partition_columns where schemaname = 'db_retail' and tablename = 'sales';
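
As a first taste of the automation mentioned at the top of this post, here is a minimal Python sketch (assuming the psycopg2 driver, which works against Greenplum's PostgreSQL protocol, and hypothetical connection details) that pulls the partition list for db_retail.sales straight out of pg_partitions:

import psycopg2   # Greenplum accepts standard PostgreSQL client drivers

# hypothetical connection details - replace with your own master host and credentials
conn = psycopg2.connect(host="gpmaster", dbname="retail", user="gpadmin", password="changeme")
cur = conn.cursor()

cur.execute("""
    select partitiontablename, partitionrangestart, partitionrangeend
    from pg_partitions
    where schemaname = %s and tablename = %s
    order by partitionrank
""", ("db_retail", "sales"))

for child_table, range_start, range_end in cur.fetchall():
    # each row is one child partition; these names can feed ANALYZE statements,
    # DDL generators, or per-partition Sqoop extracts
    print(child_table, range_start, range_end)

cur.close()
conn.close()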


Sunday 15 May 2016

Hortonworks Sandbox Hive error - How to resolve the issue

If you have recently downloaded the HDP VM and tried to launch Hive, you have probably encountered the error below:

[root@sandbox ~]# hive
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/hdp/2.3.2.0-2950/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/hdp/2.3.2.0-2950/spark/lib/spark-assembly-1.4.1.2.3.2.0-2950-hadoop2.7.1.2.3.2.0-2950.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
WARNING: Use "yarn jar" to launch YARN applications.
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/hdp/2.3.2.0-2950/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/hdp/2.3.2.0-2950/spark/lib/spark-assembly-1.4.1.2.3.2.0-2950-hadoop2.7.1.2.3.2.0-2950.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

Logging initialized using configuration in file:/etc/hive/2.3.2.0-2950/0/hive-log4j.properties

Exception in thread "main" java.lang.RuntimeException: org.apache.hadoop.security.AccessControlException: Permission denied: user=root, access=WRITE, inode="/user/root":hdfs:hdfs:drwxr-xr-x
        at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:319)
        at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:292)
        at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:213)
        at org.apache.ranger.authorization.hadoop.RangerHdfsAuthorizer$RangerAccessControlEnforcer.checkPermission(RangerHdfsAuthorizer.java:300)
        at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:190)
        at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1771)

The reason is the permission on the HDFS directory /user, which is drwxr-xr-x and owned by hdfs:hdfs.


Logged in as root, you cannot create your home directory under it or change its permissions, as the attempts below show.



Directory permission in HDFS



[root@sandbox ~]# hadoop fs -mkdir /user/root
mkdir: Permission denied: user=root, access=WRITE, inode="/user/root":hdfs:hdfs:drwxr-xr-x
[root@sandbox ~]# who am i
root     pts/0        2016-05-15 07:50 (192.168.10.1)
[root@sandbox ~]# hadoop fs -chmod 777 /user
chmod: changing permissions of '/user': Permission denied. user=root is not the owner of inode=user

The solution is to change the user: switch to a user that already has a home directory in HDFS (for example, su hive), or create /user/root as the hdfs superuser (su - hdfs, then hadoop fs -mkdir /user/root and hadoop fs -chown root:root /user/root) before retrying as root.

Change User and login to Hive - Successful login

Now, you are connected to Hive.

hive> show databases;
OK
default
xademo
Time taken: 1.32 seconds, Fetched: 2 row(s)
hive>

Like us on Facebook and Google+ if you liked the post. We provide live support for your projects; just fill in the inquiry form on the home page.

Or click on the link and fill in your details: https://www.surveymonkey.com/r/zz962jr


Thursday 21 April 2016

Basic Operations on Arrays - Python vs. Java


Below, we have listed the differences between basic array operations in Python and Java.

Python - declaring an array of 2 integers:
>>> arr = [0] * 2
>>> arr[0]
0
>>> arr[0] = 10
>>> arr[1] = 2
>>> arr[2] = 9
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
IndexError: list assignment index out of range
>>> arr.append(2) # this feature is not available in Java primitive arrays
>>>
Java - declaring an array of 2 integers:

public class hello_world {

 public static void main(String[] args) {
  int[] arr = new int[2];
  arr[0] = 10;
  arr[1] = 9;
  arr[2] = 5;
  System.out.println(arr[0] + " " + arr[1]);
 
 }
}

Running this produces:

Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 2
 at com.dmac.analytics.spark.hello_world.main(hello_world.java:9)


We can append values to an existing array (list) in Python. In Java, a traditional array has to be reassigned to a new array with larger space allocated.

Java also has library classes, such as java.util.ArrayList, that give you a mutable, growable list.

Read up on the java.util.List interface to learn more about this Java data structure.
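
To make the reallocation idea concrete, here is a rough Python sketch of the "reassign to a new, larger array" pattern described above, which is conceptually what java.util.ArrayList does under the hood when it runs out of capacity:

arr = [0] * 2                     # fixed "capacity" of 2, like new int[2] in Java
arr[0], arr[1] = 10, 9

bigger = [0] * (len(arr) * 2)     # allocate a larger array (here, double the capacity)
bigger[:len(arr)] = arr           # copy the existing elements across
arr = bigger                      # point the variable at the new array
arr[2] = 5                        # the third slot now exists

print(arr)                        # [10, 9, 5, 0]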

Please let us know if you like our posts. Like us on Google+ and Facebook.