Sunday 2 October 2016

All about Greenplum Partitioning - The system catalog tables at your help

I have been looking for better ways to identify how Greenplum tables are partitioned.

The queries below use the system catalog to find all the partitioning details of a Greenplum table.

In the next post, we will see how to use this information to automate some of your data engineering activities with Python. Finally, we will create a pipeline to move the data to HDFS using Apache Sqoop.


DROP TABLE IF EXISTS db_retail.sales;
CREATE TABLE db_retail.sales (id int, date date, amt decimal(10,2))
DISTRIBUTED BY (id)
PARTITION BY RANGE (date)
( START (date '2008-01-01') INCLUSIVE
   END (date '2009-01-01') EXCLUSIVE
   EVERY (INTERVAL '1 month'),
  START (date '2009-01-01') INCLUSIVE
   END (date '2010-01-01') EXCLUSIVE
   EVERY (INTERVAL '1 day') );

DROP TABLE IF EXISTS db_retail.sales_amt;
CREATE TABLE db_retail.sales_amt (id int, date date, amt decimal(10,2))
DISTRIBUTED BY (id)
PARTITION BY RANGE (amt)
( START (1) INCLUSIVE
   END (10000) EXCLUSIVE
   EVERY (10) );

--- check all the partitions for the table
select * from pg_partitions where schemaname = 'db_retail' and tablename = 'sales';

select * from pg_partitions where schemaname = 'db_retail' and tablename = 'sales_amt';


--- list the child partition tables, then inspect them through pg_inherits

select partitiontablename from pg_partitions where schemaname = 'db_retail' and tablename = 'sales';

select * from pg_inherits
join pg_class c on c.oid = inhrelid
where inhparent = 'db_retail.sales'::regclass;

--- fetch the overall min and max range bounds for each partition interval
--- (the range columns are text, so min/max compare them as strings)

select min(partitionrangestart),max(partitionrangeend)
, current_date
, partitioneveryclause
, partitiontype
from pg_partitions where schemaname = 'db_retail' and tablename = 'sales'
group by 3,4,5;

select * from information_schema.tables limit 10;

--- column metadata needed to recreate the table with the desired partitions and columns
select column_name, data_type, character_maximum_length
from INFORMATION_SCHEMA.COLUMNS where table_name = 'sales' and table_schema = 'db_retail';

--- find the partition column list
select columnname, position_in_partition_key from pg_partition_columns where schemaname = 'db_retail' and tablename = 'sales';
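
Once the rows from pg_partitions are fetched into a client program, the range bounds arrive as text. The sketch below shows one way to turn them into real dates and compute the overall span of a table. It assumes the bounds look like '2008-01-01'::date; the partition names and the helper names parse_bound and partition_span are made up for illustration, and no database connection is involved.

```python
import re
from datetime import date

# Hypothetical rows shaped like
# (partitiontablename, partitionrangestart, partitionrangeend).
# The names and values are illustrative, not real catalog output.
ROWS = [
    ("sales_1_prt_1", "'2008-01-01'::date", "'2008-02-01'::date"),
    ("sales_1_prt_2", "'2008-02-01'::date", "'2008-03-01'::date"),
    ("sales_1_prt_13", "'2009-01-01'::date", "'2009-01-02'::date"),
]

# Assumed text format of a date bound: 'YYYY-MM-DD'::date
DATE_RE = re.compile(r"'(\d{4})-(\d{2})-(\d{2})'::date")


def parse_bound(text):
    """Convert a text bound such as '2008-01-01'::date into a date object."""
    match = DATE_RE.fullmatch(text.strip())
    if match is None:
        raise ValueError("unexpected bound format: %r" % text)
    year, month, day = map(int, match.groups())
    return date(year, month, day)


def partition_span(rows):
    """Return (earliest start, latest end) across all partitions."""
    starts = [parse_bound(row[1]) for row in rows]
    ends = [parse_bound(row[2]) for row in rows]
    return min(starts), max(ends)


span_start, span_end = partition_span(ROWS)
print(span_start, span_end)
```

Comparing parsed dates rather than strings avoids surprises if a bound is ever formatted differently from an ISO date.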


Sunday 15 May 2016

HortonWorks Sandbox Hive error - How to resolve the issue

If you have recently downloaded the HDP VM and tried to launch Hive, you have probably encountered the error below:

[root@sandbox ~]# hive
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/hdp/2.3.2.0-2950/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/hdp/2.3.2.0-2950/spark/lib/spark-assembly-1.4.1.2.3.2.0-2950-hadoop2.7.1.2.3.2.0-2950.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
WARNING: Use "yarn jar" to launch YARN applications.
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/hdp/2.3.2.0-2950/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/hdp/2.3.2.0-2950/spark/lib/spark-assembly-1.4.1.2.3.2.0-2950-hadoop2.7.1.2.3.2.0-2950.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

Logging initialized using configuration in file:/etc/hive/2.3.2.0-2950/0/hive-log4j.properties

Exception in thread "main" java.lang.RuntimeException: org.apache.hadoop.security.AccessControlException: Permission denied: user=root, access=WRITE, inode="/user/root":hdfs:hdfs:drwxr-xr-x
        at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:319)
        at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:292)
        at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:213)
        at org.apache.ranger.authorization.hadoop.RangerHdfsAuthorizer$RangerAccessControlEnforcer.checkPermission(RangerHdfsAuthorizer.java:300)
        at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:190)
        at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1771)

The reason is the permission on the HDFS folder /user. It is owned by hdfs:hdfs with mode drwxr-xr-x, so only the hdfs user can write to it.


As root, you cannot change it.



Directory permission in HDFS



[root@sandbox ~]# hadoop fs -mkdir /user/root
mkdir: Permission denied: user=root, access=WRITE, inode="/user/root":hdfs:hdfs:drwxr-xr-x
[root@sandbox ~]# who am i
root     pts/0        2016-05-15 07:50 (192.168.10.1)
[root@sandbox ~]# hadoop fs -chmod 777 /user
chmod: changing permissions of '/user': Permission denied. user=root is not the owner of inode=user
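
To see why root is refused, it helps to read the mode string the way the permission check does. The sketch below is a simplified model of that check, not the actual HDFS code: it ignores ACLs and Ranger policies, and the function name can_write and the default superuser name are assumptions made for illustration.

```python
def can_write(user, user_groups, owner, group, mode, superuser="hdfs"):
    """Simplified POSIX-style write check for a mode string like 'drwxr-xr-x'.

    Assumption: the superuser bypasses the check, the owner is judged by the
    owner bits, group members by the group bits, everyone else by the rest.
    """
    if user == superuser:
        return True
    perms = mode[1:]  # drop the leading file-type character
    if user == owner:
        return perms[1] == "w"
    if group in user_groups:
        return perms[4] == "w"
    return perms[7] == "w"


# root is neither the owner nor in the hdfs group, so the "other" bits apply
print(can_write("root", ["root"], "hdfs", "hdfs", "drwxr-xr-x"))  # False
print(can_write("hdfs", ["hdfs"], "hdfs", "hdfs", "drwxr-xr-x"))  # True
```

On a local Linux file system root would bypass this check, but in HDFS the local root account is just an ordinary user.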

The solution is to switch to a user that is allowed to write there, such as the hdfs user that owns /user:

Change User and login to Hive - Successful login

Now, you are connected to Hive.

hive> show databases;
OK
default
xademo
Time taken: 1.32 seconds, Fetched: 2 row(s)
hive>



Thursday 21 April 2016

Basic Operations on Arrays - Python vs. Java


Below, we compare basic array operations in Python and Java.

Python array (list) declaration of 2 integers:

>>> arr = [0] * 2
>>> arr[0]
0
>>> arr[0] = 10
>>> arr[1] = 2
>>> arr[2] = 9
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
IndexError: list assignment index out of range
>>> arr.append(2) # this feature is not available in Java primitive arrays
>>>

Java array declaration of 2 integers:

public class hello_world {

 public static void main(String[] args) {
  int[] arr = new int[2];
  arr[0] = 10;
  arr[1] = 9;
  arr[2] = 5; // index 2 is out of bounds for an array of length 2, so this line throws
  System.out.println(arr[0] + " " + arr[1]);
 
 }
}

>> Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 2
 at com.dmac.analytics.spark.hello_world.main(hello_world.java:9)


We can append values to an existing array (list) in Python.
In Java, a traditional array has a fixed length. To grow it, you allocate a new, larger array and copy the elements over.
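
To make the "allocate and copy" step concrete, here is a small Python sketch that mimics what you would do by hand with a fixed-size Java array. The helper name grow is made up for illustration; in everyday Python you would simply call append.

```python
def grow(arr, new_size):
    """Return a new list of length new_size that starts with arr's elements.

    This imitates growing a fixed-size array: allocate a bigger block,
    then copy the old elements into it. Unused slots are filled with 0.
    """
    if new_size < len(arr):
        raise ValueError("new_size must not be smaller than the current size")
    bigger = [0] * new_size
    for i, value in enumerate(arr):
        bigger[i] = value
    return bigger


arr = [0] * 2
arr[0] = 10
arr[1] = 9
arr = grow(arr, 3)  # the old 2-slot list is replaced by a 3-slot one
arr[2] = 5          # now index 2 is valid
print(arr)          # [10, 9, 5]
```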

Java also has library classes that provide a resizable list.

Read up on java.util.List and its java.util.ArrayList implementation to learn more about this Java data structure.


Thursday 14 April 2016

Advanced structures - Dictionary of tuples in Python

Today, I want to show you how to build a dictionary whose keys contain a dictionary and whose values are dictionaries.

Each key is a tuple that holds a dictionary plus two integers, and each value is a dictionary. The key-value pairs are stored together in a tuple.

Remember the rule of thumb that any key of a Python dictionary must be hashable. A plain dict is not hashable, so we will use frozendict, which is available as a third-party package.

A frozendict is hashable and can be used as part of a key in a Python dictionary.
You can use the program below to search for a key in such a structure.


import frozendict as util  # third-party package: pip install frozendict

### Create the first entry using hashable frozendict
match = util.frozendict({'switch': 1, 'dstmac': '00:00:00:00:00:01', 'srcmac': '00:00:00:00:00:01'})
match_array = tuple([match, 60000, 5])
count_bucket2 = dict(CountBucket1 = 140271057099664, CountBucket2 = 140271056501008)

### Create the second entry using hashable frozendict
match_entry = util.frozendict(switch= 5, dstmac= '00:00:00:00:00:00', srcmac= '00:00:00:00:00:01')
match_array1 = tuple([match_entry, 59999, 7])

count_bucket1 = dict(CountBucket1 = 140271056467472, CountBucket2 = 140271056411280)

# Initialize the dictionary of tuples
dict_of_tuples = ({match_array : count_bucket2},{match_array1 : count_bucket1})

####### Your match entry
match_entry = [{'switch': 1, 'dstmac': '00:00:00:00:00:01', 'srcmac': '00:00:00:00:00:01'},60000,5]

# Treat the final structure as a tuple. Each element of the tuple is a dictionary
# with a single key, so compare that key against the entry we are looking for.
for entry in dict_of_tuples:
    for key, val in entry.items():
        if key == tuple(match_entry):
            print('Has key', 'corresponding value', val)
        else:
            print('Key not present')
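
If you would rather avoid the extra dependency, the same lookup can be sketched with only the standard library by freezing the dictionary's items into a frozenset. This is an alternative to the program above, not a drop-in copy of it: it keeps everything in one dictionary instead of a tuple of dictionaries, and the names make_key, lookup, n1 and n2 are made up for illustration.

```python
def make_key(match, n1, n2):
    """Build a hashable key from a plain dict plus two integers.

    frozenset(match.items()) is hashable as long as every value in
    the dict is itself hashable (strings and ints are).
    """
    return (frozenset(match.items()), n1, n2)


table = {
    make_key({'switch': 1, 'dstmac': '00:00:00:00:00:01', 'srcmac': '00:00:00:00:00:01'}, 60000, 5):
        {'CountBucket1': 140271057099664, 'CountBucket2': 140271056501008},
    make_key({'switch': 5, 'dstmac': '00:00:00:00:00:00', 'srcmac': '00:00:00:00:00:01'}, 59999, 7):
        {'CountBucket1': 140271056467472, 'CountBucket2': 140271056411280},
}


def lookup(table, match, n1, n2):
    """Return the value stored for this entry, or None when it is absent."""
    return table.get(make_key(match, n1, n2))


found = lookup(table, {'switch': 1, 'dstmac': '00:00:00:00:00:01', 'srcmac': '00:00:00:00:00:01'}, 60000, 5)
missing = lookup(table, {'switch': 9, 'dstmac': '00:00:00:00:00:01', 'srcmac': '00:00:00:00:00:01'}, 60000, 5)
print(found)
print(missing)
```

Because the lookup goes through the dictionary's hash table, it does not need to loop over every entry.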

Wednesday 6 April 2016

Binary Tree implementation using Python - Python recursion usage

Below is a complete implementation of a BST (Binary Search Tree) using Python. A BST has the following properties:

1. Each parent can have at most 2 children.
2. Every value in the left subtree is smaller than the parent.
3. Every value in the right subtree is greater than the parent (this implementation also places equal values on the right).
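
The three properties above can be checked mechanically. The sketch below uses its own minimal Node class (separate from the implementation that follows) and a hypothetical helper is_bst that passes the allowed value range down the tree.

```python
class Node:
    """Minimal tree node, used only for this check."""
    def __init__(self, val, left=None, right=None):
        self.val = val
        self.left = left
        self.right = right


def is_bst(node, low=None, high=None):
    """Return True if every value under node lies in the range [low, high).

    The left subtree must stay strictly below the parent; the right
    subtree may hold values greater than or equal to the parent.
    """
    if node is None:
        return True
    if low is not None and node.val < low:
        return False
    if high is not None and node.val >= high:
        return False
    return is_bst(node.left, low, node.val) and is_bst(node.right, node.val, high)


good = Node(3, Node(0, Node(-1), Node(2, Node(1), Node(2.5))), Node(4, None, Node(8)))
bad = Node(3, Node(0, None, Node(4)))  # 4 sits in the left subtree of 3

print(is_bst(good))  # True
print(is_bst(bad))   # False
```

Checking only each child against its direct parent is not enough, which is why the second tree fails even though 4 is a valid right child of 0.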



## Implementation of Binary tree in python
# this is an unbalanced binary tree, so this may lead to skewness in data

class Node():
    def __init__(self,val,parent= None):
        self.val = val
        self.parent = parent
        self.left = None
        self.right = None
class Tree():
    def __init__(self):
        self.root = None
    def getRoot(self):
        return self.root
    def add(self,val):
        if self.root == None:
            self.root = Node(val)
        else:
            self._add(val,self.root)
    def _add(self,val,node):
        if val < node.val:
            if node.left is not None:
                self._add(val,node.left)
            else:
                node.left = Node(val,node)
        else:
            if node.right is not None:
                self._add(val,node.right)
            else:
                node.right = Node(val,node)
   
    def find(self, val):
        # Search complexity when balanced tree is O(logn)
        if self.root is not None:
            return self._find(val,self.root)
        else:
            return None
    def _find(self,val,node):
        #print val,node.val
        if val == node.val:
            #print "matched"
            return node
        elif val < node.val and node.left is not None:
            return self._find(val,node.left)
        elif val > node.val and node.right is not None:
            return self._find(val,node.right)
        else:
            print("%s value not found" % val)
            return None
    def deleteTree(self):
        self.root = None
       
    def delete_node(self,val):
        node = self.find(val)
        if node is None:
            return None
        else:
            self._delete_node(val,node)
   
    def _delete_node(self,val,node):
        # Case 1: the node has two children. Copy the value of its in-order
        # successor (the minimum of the right subtree) into the node, then
        # remove the successor instead. The successor has no left child.
        if node.left is not None and node.right is not None:
            succ = node.right
            while succ.left is not None:
                succ = succ.left
            node.val = succ.val
            node = succ
        # Case 2: the node has at most one child. Splice that child
        # (or None for a leaf) into the parent in place of the node.
        child = node.left if node.left is not None else node.right
        if child is not None:
            child.parent = node.parent
        if node.parent is None:
            self.root = child
        elif node.parent.left is node:
            node.parent.left = child
        else:
            node.parent.right = child
        # remove all links from the deleted node
        node.parent = None
        node.left = None
        node.right = None
               
   
   
   
    def printTree(self):
        if self.root is not None:
            self._printTree(self.root)
   
    # The printTree is a inorder tree walk
    def _printTree(self, node):
        #print "current node",node.val
        if node is not None:
            #print "called by",node.val
            self._printTree(node.left)
            print(str(node.val) + ' ')
           # print "right child of",node.val,node.right.val
            self._printTree(node.right)
           
    def minimum(self,val):
        node = self.find(val)
        #print node.val
        #minval = 0
        if node is not None:
            #print "recursive call"
            return self._minimum(val,node)
            #print "came back from child process",minval
        else:
            #"in final else"
            return node
    def _minimum(self,val,node):
        minval = 0
        if node.left is not None:
            #print  "parent node value",node.val, node.left.val
            return self._minimum(val,node.left)
        else:
            #print "In else", "parent node value",node.val
            minval = node.val
            #print "minval",minval
            return minval
        #print "outside now",minval
    def maximum(self,val):
        #print "find max"
        node = self.find(val)
        if node is None:
            # the value is not in the tree, so there is no subtree to search
            return None
        return self._maximum(val,node)
    def _maximum(self,val,node):
        if node.right is not None:
            return self._maximum(val,node.right)
        else:
            return node.val
    def successor(self,val):
        node = self.find(val)
        if node is None:
            return None
        elif node is not None and node.right is not None:
            return self._minimum(val,node.right)
        elif node.right is None:
            return self._successor(val,node,node.parent)
    def _successor(self,val,node,parent):
       
        if parent is None:
            return None
        elif parent.left is None :
            return self._successor(parent.val,parent,parent.parent)
        elif parent.left == node :
            #print "matched with left node"
            return parent.val
        elif parent.left != node:
            return self._successor(parent.val,parent,parent.parent)
    def predecessor(self,val):
        node = self.find(val)
        if node is None:
            return None
        elif node.left is not None:
            return self._maximum(node.left.val,node.left)
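        else:
            # no left subtree: climb until we arrive from a right child
            parent = node.parent
            while parent is not None and node is parent.left:
                node = parent
                parent = parent.parent
            return parent.val if parent is not None else None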
           
    #def transplant(T,u,v):
   
    def level_order_traversal(self,key=None):
        # Breadth-first walk that prints the values of one level per line.
        # With a key, the walk starts at the node holding that key.
        if key is None:
            node = self.root
        else:
            node = self.find(key)
        if node is None:
            return
        current = [node]
        while current:
            # print every value on this level
            print([x.val for x in current])
            # collect the children, left to right, for the next level
            nxt = []
            for x in current:
                if x.left is not None:
                    nxt.append(x.left)
                if x.right is not None:
                    nxt.append(x.right)
            current = nxt
    def pre_order_traversal(self):
        if self.root is not None:
            self._pre_order_traversal(self.root)
   
    # The printTree is a inorder tree walk
    def _pre_order_traversal(self, node):
        #print "current node",node.val
        if node is not None:
            #print "called by",node.val
            print(str(node.val) + ' ')
            self._pre_order_traversal(node.left)
           
           # print "right child of",node.val,node.right.val
            self._pre_order_traversal(node.right)

""""
the graph is as below

            3
           / \
          0   4
         / \   \
       -1  2    8
          / \
         1   2.5
       
"""

tree = Tree()
tree.add(3)
tree.add(4)
tree.add(0)
tree.add(-1)
tree.add(8)
tree.add(2)
tree.add(2.5)
tree.add(1)
tree.printTree()
print(tree.root.val)
#print (tree.find(-1)).val
print("find 2 %s" % tree.find(2).parent.val)
#print tree.minimum(4)
#print tree.maximum(0)
print("predecessor %s" % tree.predecessor(-1))
tree.delete_node(4)
#tree.printTree()
tree.level_order_traversal()
tree.find(100)
tree.pre_order_traversal()
#print "successor",tree.successor(8)
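
The comment at the top of the implementation warns that the tree is unbalanced and can become skewed. The sketch below makes that visible by measuring the height after inserting the same values in two different orders. It is a standalone illustration with its own small Node class and made-up helper names (insert, height, build, median_first), using the same "smaller goes left, otherwise right" rule as the add method above.

```python
class Node:
    """Minimal node for this experiment."""
    def __init__(self, val):
        self.val = val
        self.left = None
        self.right = None


def insert(root, val):
    """Iterative insert: smaller values go left, all others go right."""
    if root is None:
        return Node(val)
    node = root
    while True:
        if val < node.val:
            if node.left is None:
                node.left = Node(val)
                return root
            node = node.left
        else:
            if node.right is None:
                node.right = Node(val)
                return root
            node = node.right


def height(root):
    """Number of nodes on the longest path from the root down to a leaf."""
    best = 0
    stack = [(root, 1)] if root is not None else []
    while stack:
        node, depth = stack.pop()
        best = max(best, depth)
        for child in (node.left, node.right):
            if child is not None:
                stack.append((child, depth + 1))
    return best


def build(values):
    """Insert the values one by one and return the root."""
    root = None
    for v in values:
        root = insert(root, v)
    return root


def median_first(values):
    """Reorder sorted values so each middle element is inserted before its halves."""
    if not values:
        return []
    mid = len(values) // 2
    return [values[mid]] + median_first(values[:mid]) + median_first(values[mid + 1:])


sorted_vals = list(range(1, 128))
skewed_height = height(build(sorted_vals))                  # every node hangs to the right
balanced_height = height(build(median_first(sorted_vals)))  # a perfectly balanced tree

print(skewed_height, balanced_height)  # 127 7
```

With sorted input the tree degenerates into a linked list, so a search has to walk all 127 nodes instead of at most 7.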
       



Tuesday 5 April 2016

How do you run a scala script in scala command line

You might be using Scala's interactive mode to look at your data. But if you have written the steps in a .scala file, how do you execute it? The .scala extension is not mandatory, but it is good practice to follow the convention. We can use the :load command to execute the script from a Scala REPL (spark-shell in this example, since the script uses the SparkContext sc).


This is what you do:

1. Write the scala commands in a script CountExample_shell.scala

sc;
val pagecounts = sc.textFile("/home/training/pagecounts/");

// take the first 10 lines and print them
pagecounts.take(10).foreach(println);

pagecounts.count;

// filter only lines that have 'en' (english) for 2nd value of the array
val enPages = pagecounts.filter(_.split(" ")(1) == "en").cache;

enPages.count;

//Create key value pairs in scala
val enTuples = enPages.map(line => line.split(" "));

val enKeyValuePairs = enTuples.map(line => (line(0).substring(0,8) , line(3).toInt));

enKeyValuePairs.reduceByKey(_+_, 1).collect;

enPages.map(l => l.split(" ")).map(l => (l(2), l(3).toInt)).reduceByKey(_+_ , 40).filter( x => x._2 > 200000).map (x => (x._2 , x._1)).collect.foreach(println);

2. Now, to execute the script, use

scala> :load /home/training/CountExample_shell.scala

3. The script will execute and display the below lines.

Loading /home/training/CountExample_shell.scala...
res24: String = /home/training/spark-1.6.0-bin-hadoop2.6/bin/spark-shell
res25: org.apache.spark.SparkContext = org.apache.spark.SparkContext@1fa98a22
pagecounts: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[12] at textFile at <console>:32
20090505-000000 aa Main_Page 2 9980
20090505-000000 ab %D0%90%D0%B8%D0%BD%D1%82%D0%B5%D1%80%D0%BD%D0%B5%D1%82 1 465
20090505-000000 ab %D0%98%D1%85%D0%B0%D0%B4%D0%BE%D1%83_%D0%B0%D0%B4%D0%B0%D2%9F%D1%8C%D0%B0 1 16086
20090505-000000 af.b Tuisblad 1 36236
20090505-000000 af.d Tuisblad 4 189738
20090505-000000 af.q Tuisblad 2 56143
20090505-000000 af Afrika 1 46833
20090505-000000 af Afrikaans 2 53577
20090505-000000 af Australi%C3%AB 1 132432
20090505-000000 af Barack_Obama 1 23368
res27: Long = 1398882                                                          
enPages: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at filter at <console>:34
res28: Long = 970545                                                          
enTuples: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[14] at map at <console>:36
enKeyValuePairs: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[15] at map at <console>:38
res29: Array[(String, Int)] = Array((20090507,6175726), (20090505,7076855))    
(468159,Special:Search)                                                        
(451126,Main_Page)
(1066734,404_error/)
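
If the map and reduceByKey steps in the script feel abstract, the plain-Python sketch below reproduces the same idea on four of the sample lines printed above. It does not use Spark; the helper names reduce_by_key and views_per_day are made up, and it filters on the "af" project only because the sample lines shown contain no "en" rows.

```python
from collections import defaultdict

# Four lines taken from the sample output above:
# timestamp, project code, page title, view count, bytes
LINES = [
    "20090505-000000 af Afrika 1 46833",
    "20090505-000000 af Afrikaans 2 53577",
    "20090505-000000 af.b Tuisblad 1 36236",
    "20090505-000000 aa Main_Page 2 9980",
]


def reduce_by_key(pairs):
    """Sum the values of all pairs that share a key, like reduceByKey(_+_)."""
    totals = defaultdict(int)
    for key, value in pairs:
        totals[key] += value
    return dict(totals)


def views_per_day(lines, project):
    """Keep one project's lines, then total the view counts per day."""
    fields = (line.split(" ") for line in lines)
    kept = (f for f in fields if f[1] == project)
    # key = first 8 characters of the timestamp (the date), value = view count
    pairs = ((f[0][:8], int(f[3])) for f in kept)
    return reduce_by_key(pairs)


print(views_per_day(LINES, "af"))  # {'20090505': 3}
```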

Please post your queries on Scala and let us know your thoughts.

Saturday 12 March 2016

TPT export script - How to use the TPT export operator and understanding Data Streams

TPT (Teradata Parallel Transporter) is a standard utility that can be used to export data from a Teradata system to a file.

The below diagram explains how TPT operates:

TPT Export in Teradata
Data is produced by the Export operator (the producer). Once posted onto the data stream, it is consumed by the consumer operator, which writes it to the file.

This is a widely used pattern. It is called Source-Sink in Spring XD.
A similar architecture is called Publisher-Subscriber in Kafka, although Kafka is a message broker service.
The idea remains the same.
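
The producer, data stream and consumer roles can be sketched in a few lines of Python. This only illustrates the pattern and says nothing about how TPT is implemented internally: the queue stands in for the data stream, the rows are made up, and the consumer appends to a list instead of writing a file.

```python
import queue
import threading

SENTINEL = None  # marker the producer sends when it has no more rows


def producer(rows, stream):
    """Post every row onto the stream, then signal the end."""
    for row in rows:
        stream.put(row)
    stream.put(SENTINEL)


def consumer(stream, sink):
    """Take rows off the stream until the end marker and write them out."""
    while True:
        row = stream.get()
        if row is SENTINEL:
            break
        sink.append("|".join(row))


rows = [("1", "US"), ("2", "IN"), ("3", "DE")]  # made-up sample rows
stream = queue.Queue(maxsize=2)  # a small buffer, so the producer waits for the consumer
written = []

t_prod = threading.Thread(target=producer, args=(rows, stream))
t_cons = threading.Thread(target=consumer, args=(stream, written))
t_prod.start()
t_cons.start()
t_prod.join()
t_cons.join()

print(written)  # ['1|US', '2|IN', '3|DE']
```

The producer and the consumer never call each other directly. They only share the stream, which is what lets each side be swapped out independently.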

Below is a sample TPT Export script for your reference. As you can see, once the operators are defined, the producer's output is applied to the consumer.

/*************************************+
* Script Name - export_data.tpt
* Usage - tbuild -f export_data.tpt
* Author - Tduser 
* Description - exports data from 
*               retail.item to 
*               export_tpt_item.txt file
* Version - Release 1 - 12-Mar-2016
**************************************/

Define Job export_to_file
description 'export records from item table'
(
define schema item_schema
(
item_id char(30),
country_code char(30)
);

/* Consumer operator since in an export job the final target is the file */

define operator file_writer
type dataconnector consumer
schema item_schema
attributes
(
varchar filename = 'export_tpt_item.txt',
varchar format = 'text',
varchar openmode = 'write',
varchar indicatormode='N'
);


/* Producer operator reads data from a teradata table */

define operator export_item
type export
schema item_schema
attributes
(
varchar username = 'dbc',
varchar userpassword = 'dbc',
varchar tdpid = '127.0.0.1',
integer maxsessions = 8,
varchar selectstmt = '
select cast(l_orderkey as char(30)),
cast(l_partkey as char(30))
from retail.item;');


/* apply the producer output to the consumer */

apply to operator(file_writer[1])
select item_id,country_code
from operator(export_item[1]);

);

Execution Command: Use a wrapper to call the script above:


$> tbuild -f export_data.tpt | tee export_data.tpt.log

Log entries will be as follows:

Teradata Parallel Transporter Version 15.00.00.02
Job log: /opt/teradata/client/15.00/tbuild/logs/root-38.out
Job id is root-38, running on TDExpress150001_Sles10
Teradata Parallel Transporter file_writer[1]: TPT19006 Version 15.00.00.02
Teradata Parallel Transporter SQL Selector Operator Version 15.00.00.02
export_item: private log not specified
file_writer[1]: TPT19010 Instance 1 directing private log report to 'dtacop-root-1447-1'.
file_writer[1]: TPT19007 DataConnector Consumer operator Instances: 1
file_writer[1]: TPT19003 ECI operator ID: 'file_writer-1447'
file_writer[1]: TPT19222 Operator instance 1 processing file 'export_tpt_item.txt'.
export_item: connecting sessions
export_item: sending SELECT request
export_item: retrieving data
export_item: Total Rows Exported:  60175
export_item: finished retrieving data
file_writer[1]: TPT19221 Total files processed: 1.
export_item: disconnecting sessions
export_item: Total processor time used = '0.07 Second(s)'
export_item: Start : Sat Mar 12 11:05:33 2016
export_item: End   : Sat Mar 12 11:05:38 2016
Job step MAIN_STEP completed successfully
Job root completed successfully
Job start: Sat Mar 12 11:05:27 2016
Job end:   Sat Mar 12 11:05:38 2016

Reach out to me if you have any doubts. 