Thursday, 14 April 2016

Advanced structures - Dictionary of tuples in Python

Today, I want to show you how you can create a dictionary object with a dictionary as part of its key and a dictionary as its value.

So, it is effectively a dict of dicts. Each key is a tuple, and the final collection of entries is stored as a tuple as well.

Remember the rule of thumb that any key of a Python dictionary must be hashable, and a plain dict is not. So, we will use the third-party frozendict package.

A frozendict is hashable and can therefore be used as a key in a Python dictionary.
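To see why this matters, here is a minimal sketch, assuming the third-party frozendict package is installed (pip install frozendict): a plain dict raises a TypeError when used as a key, while a frozendict works.

from frozendict import frozendict

plain = {'switch': 1}
frozen = frozendict(plain)

lookup = {}
lookup[frozen] = 'found'                  # a frozendict is hashable
print lookup[frozendict({'switch': 1})]   # prints: found

try:
    lookup[plain] = 'found'               # a plain dict is not hashable
except TypeError as err:
    print err                             # unhashable type: 'dict'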
With that in place, you can use the program below to search for a key in a dictionary of dictionaries in Python.

Let me know if you need any details. Like us on Google+ and Facebook if you like our posts.

import frozendict as util

### Create the first entry using hashable frozendict
match = util.frozendict({'switch': 1, 'dstmac': '00:00:00:00:00:01', 'srcmac': '00:00:00:00:00:01'})
match_array = tuple([match, 60000, 5])
count_bucket1 = dict(CountBucket1 = 140271057099664, CountBucket2 = 140271056501008)

### Create the second entry using hashable frozendict
match_entry1 = util.frozendict(switch = 5, dstmac = '00:00:00:00:00:00', srcmac = '00:00:00:00:00:01')
match_array1 = tuple([match_entry1, 59999, 7])

count_bucket2 = dict(CountBucket1 = 140271056467472, CountBucket2 = 140271056411280)

# Initialize the tuple of dictionaries; each dictionary maps one tuple key to a count bucket
dict_of_tuples = ({match_array : count_bucket1}, {match_array1 : count_bucket2})

####### Your match entry; build it with frozendict as well so the equality check succeeds
match_entry = (util.frozendict({'switch': 1, 'dstmac': '00:00:00:00:00:01', 'srcmac': '00:00:00:00:00:01'}), 60000, 5)

# Treating the final structure as a tuple. Each element of the tuple is a dictionary.
k = 0
while k < len(dict_of_tuples):
    key = dict_of_tuples[k].iterkeys()
    val = dict_of_tuples[k].itervalues()
    if key.next() == match_entry:
        print 'Has key, corresponding value:', val.next()
    else:
        print "Key not present"
    k += 1
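A side note on the design: because every key here is hashable, the two entries can also be merged into a single dictionary keyed directly by the tuples, which turns the linear scan above into a constant-time lookup. A minimal sketch, reusing the names defined above:

# merge the two entries into one dictionary keyed by the tuples
merged = {}
merged[match_array] = count_bucket1
merged[match_array1] = count_bucket2

# a direct hash lookup replaces the while loop above
print merged.get(match_entry, 'Key not present')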

Wednesday, 6 April 2016

Binary Tree implementation using Python - Python recursion usage

Below is a complete implementation of a BST using Python. A BST (Binary Search Tree) has the following properties:

1. Each parent has at most 2 children
2. The left child is always smaller than its parent
3. The right child is always greater than its parent.



## Implementation of Binary tree in python
# this is an unbalanced binary tree, so inserting sorted data may lead to a skewed tree

class Node():
    def __init__(self,val,parent= None):
        self.val = val
        self.parent = parent
        self.left = None
        self.right = None
class Tree():
    def __init__(self):
        self.root = None
    def getRoot(self):
        return self.root
    def add(self,val):
        if self.root is None:
            self.root = Node(val)
        else:
            self._add(val,self.root)
    def _add(self,val,node):
        if val < node.val:
            if node.left is not None:
                self._add(val,node.left)
            else:
                node.left = Node(val,node)
        else:
            if node.right is not None:
                self._add(val,node.right)
            else:
                node.right = Node(val,node)
   
    def find(self, val):
        # Search complexity is O(log n) when the tree is balanced
        if self.root is not None:
            return self._find(val,self.root)
        else:
            return None
    def _find(self, val, node):
        if val == node.val:
            return node
        elif val < node.val and node.left is not None:
            return self._find(val, node.left)
        elif val > node.val and node.right is not None:
            return self._find(val, node.right)
        else:
            print val, "value not found"
            return None
    def deleteTree(self):
        self.root = None
       
    def delete_node(self, val):
        node = self.find(val)
        if node is None:
            return None
        else:
            self._delete_node(val, node)

    def _delete_node(self, val, node):
        # Case 1: the node has a left subtree - replace it with the
        # maximum of that subtree (the in-order predecessor)
        if node.left is not None:
            replacement = self.find(self.maximum(node.left.val))
            # recursively detach the replacement from its original position
            self.delete_node(replacement.val)
        # Case 2: only a right subtree exists - replace the node with the
        # minimum of that subtree (the in-order successor)
        elif node.right is not None:
            replacement = self.find(self.minimum(node.right.val))
            self.delete_node(replacement.val)
        # Case 3: it is a leaf node - there is nothing to splice in
        else:
            replacement = None
        # splice the replacement into the deleted node's position
        if replacement is not None:
            replacement.left = node.left
            replacement.right = node.right
            if node.left is not None:
                node.left.parent = replacement
            if node.right is not None:
                node.right.parent = replacement
            replacement.parent = node.parent
        if node.parent is None:
            self.root = replacement
        elif node.parent.left == node:
            node.parent.left = replacement
        else:
            node.parent.right = replacement
        # remove all links from the deleted node
        node.parent = None
        node.left = None
        node.right = None
   
   
   
    def printTree(self):
        if self.root is not None:
            self._printTree(self.root)

    # printTree performs an in-order tree walk, so values print in sorted order
    def _printTree(self, node):
        if node is not None:
            self._printTree(node.left)
            print str(node.val) + ' '
            self._printTree(node.right)
           
    def minimum(self, val):
        # minimum of the subtree rooted at the node holding `val`
        node = self.find(val)
        if node is not None:
            return self._minimum(val, node)
        else:
            return None

    def _minimum(self, val, node):
        # keep following left children; the leftmost node holds the minimum
        if node.left is not None:
            return self._minimum(val, node.left)
        else:
            return node.val
    def maximum(self, val):
        # maximum of the subtree rooted at the node holding `val`
        node = self.find(val)
        if node is None:
            return None
        elif node.right is not None:
            return self._maximum(val, node)
        else:
            return node.val

    def _maximum(self, val, node):
        # keep following right children; the rightmost node holds the maximum
        if node.right is not None:
            return self._maximum(val, node.right)
        else:
            return node.val
    def successor(self, val):
        node = self.find(val)
        if node is None:
            return None
        elif node.right is not None:
            # the successor is the minimum of the right subtree
            return self._minimum(val, node.right)
        else:
            return self._successor(val, node, node.parent)

    def _successor(self, val, node, parent):
        # climb until we arrive from a left child; that parent is the successor
        if parent is None:
            return None
        elif parent.left == node:
            return parent.val
        else:
            return self._successor(parent.val, parent, parent.parent)
    def predecessor(self, val):
        node = self.find(val)
        if node is None:
            return None
        elif node.left is not None:
            # the predecessor is the maximum of the left subtree
            return self._maximum(node.left.val, node.left)
        else:
            # climb until we arrive from a right child; that parent is the predecessor
            parent = node.parent
            while parent is not None and node == parent.left:
                node = parent
                parent = parent.parent
            if parent is not None:
                return parent.val
            else:
                return None

    def level_order_traversal(self, key=None):
        # breadth-first walk; prints each level of the tree on its own line
        if key is None:
            node = self.root
        else:
            node = self.find(key)
        p_node = [node]
        level = [0]
        temp = []
        i = 0
        for x in p_node:
            if x is not None:
                if x.parent is not None and x.parent in p_node:
                    # a child's level is one more than its parent's level
                    nxtlevel = level[p_node.index(x.parent)] + 1
                else:
                    nxtlevel = 1
                    print "at root", p_node[0].val
                if x.left is not None:
                    p_node.append(x.left)
                    level.append(nxtlevel)
                if x.right is not None:
                    p_node.append(x.right)
                    level.append(nxtlevel)
                # i keeps track of the element position in p_node;
                # flush the buffered values whenever the level changes
                try:
                    temp.append(x.val)
                    if level[i] != level[i+1]:
                        print [element for element in temp]
                        temp = []
                except IndexError:
                    print [element for element in temp]
                    temp = []
                    print "no more elements"
            i += 1
    def pre_order_traversal(self):
        if self.root is not None:
            self._pre_order_traversal(self.root)

    # a pre-order tree walk: visit the node, then its left and right subtrees
    def _pre_order_traversal(self, node):
        if node is not None:
            print str(node.val) + ' '
            self._pre_order_traversal(node.left)
            self._pre_order_traversal(node.right)

""""
the graph is as below

            3
           / \
          0   4
         / \   \
       -1  2    8
          / \
         1   2.5
       
"""

tree = Tree()
tree.add(3)
tree.add(4)
tree.add(0)
tree.add(-1)
tree.add(8)
tree.add(2)
tree.add(2.5)
tree.add(1)
tree.printTree()
print tree.root.val
#print (tree.find(-1)).val
print "find 2",(tree.find(2)).parent.val
#print tree.minimum(4)
#print tree.maximum(0)
print "predecessor",tree.predecessor(-1)
tree.delete_node(4)
#tree.printTree()
tree.level_order_traversal()
tree.find(100)
tree.pre_order_traversal()
#print "successor",tree.successor(8)
       



Tuesday, 5 April 2016

How do you run a Scala script from the Scala command line

You might be using the interactive mode of Scala to look at your data. But if you have written the steps in a dot(.)scala file, how do you execute it? You must know by now that the .scala extension is not mandatory, but it is good practice to follow the convention. We can use the :load option to execute this script from the Scala REPL.


This is what you do:

1. Write the Scala commands in a script, CountExample_shell.scala:

sc;
val pagecounts = sc.textFile("/home/training/pagecounts/");

// take the first 10 lines and print them
pagecounts.take(10).foreach(println);

pagecounts.count;

// filter only lines that have 'en' (english) for 2nd value of the array
val enPages = pagecounts.filter(_.split(" ")(1) == "en").cache;

enPages.count;

//Create key value pairs in scala
val enTuples = enPages.map(line => line.split(" "));

val enKeyValuePairs = enTuples.map(line => (line(0).substring(0,8) , line(3).toInt));

enKeyValuePairs.reduceByKey(_+_, 1).collect;

enPages.map(l => l.split(" ")).map(l => (l(2), l(3).toInt)).reduceByKey(_+_ , 40).filter( x => x._2 > 200000).map (x => (x._2 , x._1)).collect.foreach(println);

2. Now, to execute the script, use

scala> :load /home/training/CountExample_shell.scala

3. The script will execute and display the output below.

Loading /home/training/CountExample_shell.scala...
res24: String = /home/training/spark-1.6.0-bin-hadoop2.6/bin/spark-shell
res25: org.apache.spark.SparkContext = org.apache.spark.SparkContext@1fa98a22
pagecounts: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[12] at textFile at <console>:32
20090505-000000 aa Main_Page 2 9980
20090505-000000 ab %D0%90%D0%B8%D0%BD%D1%82%D0%B5%D1%80%D0%BD%D0%B5%D1%82 1 465
20090505-000000 ab %D0%98%D1%85%D0%B0%D0%B4%D0%BE%D1%83_%D0%B0%D0%B4%D0%B0%D2%9F%D1%8C%D0%B0 1 16086
20090505-000000 af.b Tuisblad 1 36236
20090505-000000 af.d Tuisblad 4 189738
20090505-000000 af.q Tuisblad 2 56143
20090505-000000 af Afrika 1 46833
20090505-000000 af Afrikaans 2 53577
20090505-000000 af Australi%C3%AB 1 132432
20090505-000000 af Barack_Obama 1 23368
res27: Long = 1398882                                                          
enPages: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at filter at <console>:34
res28: Long = 970545                                                          
enTuples: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[14] at map at <console>:36
enKeyValuePairs: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[15] at map at <console>:38
res29: Array[(String, Int)] = Array((20090507,6175726), (20090505,7076855))    
(468159,Special:Search)                                                        
(451126,Main_Page)
(1066734,404_error/)
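A side note: since this runs inside the Spark REPL, you should also be able to pre-load the script at startup, e.g. spark-shell -i /home/training/CountExample_shell.scala (the REPL's -i option loads a file before the prompt appears).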

Please post your queries on Scala and let us know your thoughts.

Saturday, 12 March 2016

TPT export script - How to use the TPT export operator and understand data streams

TPT (Teradata Parallel Transporter) is a standard utility that can be used to export data from a Teradata system to a file.

The below diagram explains how TPT operates:

[Diagram: TPT Export in Teradata]
Data is generated by the Export operator (the producer). Once posted onto the data stream, it is consumed by the consumer.

This is a widely used methodology. It is called Source-Sink in Spring XD, and the similar architecture in Kafka is called Publisher-Subscriber (though Kafka is a message broker service). The ideology remains the same.
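To make the pattern concrete, here is a minimal, hypothetical Python sketch of the same producer/consumer idea: a producer thread posts rows onto a shared stream (a queue), and an independent consumer drains it, much like the export operator and the file writer. The row values are made up for illustration.

import threading
from Queue import Queue   # Python 2 module name; it is `queue` in Python 3

stream = Queue()          # stands in for the TPT data stream
DONE = object()           # sentinel marking the end of the stream

def producer():
    # plays the role of the export operator
    for row in [('item1', 'US'), ('item2', 'IN')]:
        stream.put(row)
    stream.put(DONE)

def consumer():
    # plays the role of the file-writer consumer operator
    while True:
        row = stream.get()
        if row is DONE:
            break
        print "consumed", row

t = threading.Thread(target=producer)
t.start()
consumer()
t.join()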

Below, I have given a sample TPT export script for your reference. As you can see, once the operators are created, the producer's output is applied to the consumer.

/**************************************
* Script Name - export_data.tpt
* Usage - tbuild -f export_data.tpt
* Author - Tduser
* Description - exports data from
*               retail.item to
*               export_tpt_item.txt file
* Version - Release 1 - 12-Mar-2016
**************************************/

Define Job export_to_file
description 'export records from item table'
(
define schema item_schema
(
item_id char(30),
country_code char(30)
);

/* Consumer operator since in an export job the final target is the file */

define operator file_writer
type dataconnector consumer
schema item_schema
attributes
(
varchar filename = 'export_tpt_item.txt',
varchar format = 'text',
varchar openmode = 'write',
varchar indicatormode='N'
);


/* Producer operator reads data from a teradata table */

define operator export_item
type export
schema item_schema
attributes
(
varchar username = 'dbc',
varchar userpassword = 'dbc',
varchar tdpid = '127.0.0.1',
integer maxsessions = 8,
varchar selectstmt = '
select cast(l_orderkey as char(30)),
cast(l_partkey as char(30))
from retail.item;');


/* apply the producer output to the consumer */

apply to operator(file_writer[1])
select item_id,country_code
from operator(export_item[1]);

);

Execution Command: You can use a wrapper script, or call tbuild directly and tee the output to a log file:


$> tbuild -f export_data.tpt | tee export_data.tpt.log

Log entries will be as follows:

Teradata Parallel Transporter Version 15.00.00.02
Job log: /opt/teradata/client/15.00/tbuild/logs/root-38.out
Job id is root-38, running on TDExpress150001_Sles10
Teradata Parallel Transporter file_writer[1]: TPT19006 Version 15.00.00.02
Teradata Parallel Transporter SQL Selector Operator Version 15.00.00.02
export_item: private log not specified
file_writer[1]: TPT19010 Instance 1 directing private log report to 'dtacop-root-1447-1'.
file_writer[1]: TPT19007 DataConnector Consumer operator Instances: 1
file_writer[1]: TPT19003 ECI operator ID: 'file_writer-1447'
file_writer[1]: TPT19222 Operator instance 1 processing file 'export_tpt_item.txt'.
export_item: connecting sessions
export_item: sending SELECT request
export_item: retrieving data
export_item: Total Rows Exported:  60175
export_item: finished retrieving data
file_writer[1]: TPT19221 Total files processed: 1.
export_item: disconnecting sessions
export_item: Total processor time used = '0.07 Second(s)'
export_item: Start : Sat Mar 12 11:05:33 2016
export_item: End   : Sat Mar 12 11:05:38 2016
Job step MAIN_STEP completed successfully
Job root completed successfully
Job start: Sat Mar 12 11:05:27 2016
Job end:   Sat Mar 12 11:05:38 2016

Reach out to me if you have any doubts. 
Special offer: register for the online 7-video course for just USD 15.




Wednesday, 21 October 2015

Minimum or Min function in Hive - Use of min() over (partition by) in HiveQL

Say we have this products table in Hive:
+----------+-------+-------+
|   name   | price | notes |
+----------+-------+-------+
| product1 |   100 |       |
| product1 |   200 | note1 |
| product2 |    10 | note2 |
| product2 |     5 | note2 |
+----------+-------+-------+
and we expect to get this result (distinct products, each with its minimum price):
+----------+-------+-------+
|   name   | price | notes |
+----------+-------+-------+
| product1 |   100 |       |
| product2 |     5 | note2 |
+----------+-------+-------+
How do we go about it?

1. The subquery approach:

select a.name, a.price, b.notes
from (select name, min(price) as price from products group by name) as a
inner join (select name, price, notes from products) as b
on a.name = b.name and a.price = b.price;

2. The alternate and better approach is to use window partitioning:

select name, price, notes
from (select *, min(price) over (partition by name) as min_price from products) as a
where a.price = a.min_price;


The 2nd approach is better because it invokes a single map task to read the table, while the first approach invokes two map tasks (one for each scan of products), even on a single-node system.
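If it helps to see the window-function semantics outside Hive, below is a small Python sketch (with hypothetical data mirroring the table above) that computes the same result: one pass to find each partition's minimum, then a filter keeping only the rows at that minimum.

# rows of the products table: (name, price, notes)
rows = [
    ('product1', 100, ''),
    ('product1', 200, 'note1'),
    ('product2', 10, 'note2'),
    ('product2', 5, 'note2'),
]

# pass 1: min(price) over (partition by name)
min_price = {}
for name, price, notes in rows:
    min_price[name] = min(price, min_price.get(name, price))

# pass 2: keep only the rows whose price equals the partition minimum
result = [row for row in rows if row[1] == min_price[row[0]]]
print result   # [('product1', 100, ''), ('product2', 5, 'note2')]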