To enable control flow, you can embed Pig Latin statements and Pig commands in the Python, JavaScript and Groovy scripting languages using a JDBC-like compile, bind, run model. For Python, make sure the Jython jar is included in your class path. For JavaScript, make sure the Rhino jar is included in your classpath. For Groovy, make sure the groovy-all jar is included in your classpath.
Note that host languages and the languages of UDFs (included as part of the embedded Pig) are completely orthogonal. For example, a Pig Latin statement that registers a Python UDF may be embedded in Python, JavaScript, or Java. The exception to this rule is "combined" scripts – here the languages must match (see the Advanced Topics for Python, Advanced Topics for JavaScript and Advanced Topics for Groovy).
Embedded Pig is supported in batch mode only, not interactive mode. You can request that embedded Pig be used by adding the --embedded option to the Pig command line. If this option is passed as an argument, that argument will refer to the language Pig is embedded in, either Python, JavaScript or Groovy. If no argument is specified, it is taken to refer to the reference implementation for Python.
Python
$ pig myembedded.py
Pig will look for the #!/usr/bin/python line in the script.
#!/usr/bin/python # explicitly import Pig class from org.apache.pig.scripting import Pig # COMPILE: compile method returns a Pig object that represents the pipeline P = Pig.compile("a = load '$in'; store a into '$out';") input = 'original' output = 'output' # BIND and RUN result = P.bind().runSingle() if result.isSuccessful() : print 'Pig job succeeded' else : raise 'Pig job failed'
JavaScript
$ pig myembedded.js
Pig will look for the *.js extension in the script.
importPackage(Packages.org.apache.pig.scripting.js) Pig = org.apache.pig.scripting.js.JSPig function main() < input = "original" output = "output" P = Pig.compile("A = load '$in'; store A into '$out';") result = P.bind().runSingle() if (result.isSuccessful()) < print("Pig job succeeded") >else < print("Pig job failed") >>
Groovy
$ pig myembedded.groovy
Pig will look for the *.groovy extension in the script.
import org.apache.pig.scripting.Pig; public static void main(String[] args) < String input = "original" String output = "output" Pig P = Pig.compile("A = load '\$in'; store A into '\$out';") result = P.bind(['in':input, 'out':output]).runSingle() if (result.isSuccessful()) < print("Pig job succeeded") >else < print("Pig job failed") >>
Invocation Process
You invoke Pig in the host scripting language through an embedded Pig object.
Compile: Compile is a static function on the Pig class and in its simplest form takes a fragment of Pig Latin that defines the pipeline as its input:
# COMPILE: complie method returns a Pig object that represents the pipeline P = Pig.compile("""A = load '$in'; store A into '$out';""")
Compile returns an instance of Pig object. This object can have certain values undefined. For example, you may want to define a pipeline without yet specifying the location of the input to the pipeline. The parameter will be indicated by a dollar sign followed by a sequence of alpha-numeric or underscore characters. Values for these parameters must be provided later at the time bind() is called on the Pig object. To call run() on a Pig object without all parameters being bound is an error.
Bind: Resolve the parameters during the bind call.
input = "original” output #BoundScript-Object">BoundScript object that can be used to execute the pipeline. The simplest way to execute the pipeline is to call runSingle function. (However, as mentioned later, this works only if a single set of variables is bound to the parameters. Otherwise, if multiple set of variables are bound, an exception will be thrown if runSingle is called.)result = Q.runSingle()The function returns a PigStats object that tells you whether the run succeeded or failed. In case of success, additional run statistics are provided.
Embedded Python Example
A complete embedded example is shown below.
#!/usr/bin/python # explicitly import Pig class from org.apache.pig.scripting import Pig # COMPILE: compile method returns a Pig object that represents the pipeline P = Pig.compile("""A = load '$in'; store A into '$out';""") input = "original” output = "output” # BIND: bind method binds the variables with the parameters in the pipeline and returns a BoundScript object Q = P.bind() # In this case, only one set of variables is bound to the pipeline, runSingle method returns a PigStats object. # If multiple sets of variables are bound to the pipeline, run method instead must be called and it returns # a list of PigStats objects. result = Q.runSingle() # check the result if result.isSuccessful(): print "Pig job succeeded" else: raise "Pig job failed" OR, SIMPLY DO THIS: #!/usr/bin/python # explicitly import Pig class from org.apache.pig.scripting import Pig in = "original” out = "output” # implicitly bind the parameters to the local variables result= Pig.compile("""A = load '$in'; store A into '$out';""").bind().runSingle() if result.isSuccessful(): print "Pig job succeeded" else: raise "Pig job failed"Invocation Details
All three APIs (compile, bind, run) discussed in the previous section have several versions depending on what you are trying to do.
Compile
In its basic form, compile just takes a Pig Latin fragment that defines the pipeline as described in the previous section. Additionally, the pipeline can be given a name. This name is only used when the embedded script is invoked via the PigRunner Java API (as discussed later in this document).
P = Pig.compile("P1", """A = load '$in'; store A into '$out';""")In addition to providing Pig script via a string, you can store it in a file and pass the file to the compile call:
P = Pig.compileFromFile("myscript.pig")You can also name a pipeline stored in the script:
P = Pig.compileFromFile("P2", "myscript.pig")Bind
In its simplest form, bind takes no parameters. In this case, an implicit bind is performed; Pig internally constructs a map of parameters from the local variables specified by the user in the script.
Q = P.bind()Finally, you might want to run the same pipeline in parallel with a different set of parameters, for instance for multiple dates. In this case, bind function, needs to be passed a list of maps with each element of the list containing parameters for a single invocation. In the example below, the pipeline is run for the US, the UK, and Brazil.
P = Pig.compile("""A = load '$in'; B = filter A by user is not null; . store Z into '$out'; """) Q = P.bind([, , ]) results = Q.run() # it blocks until all pipelines are completed for i in [0, 1, 2]: result = results[i] . # check result for each pipelineRun
We have already seen that the simplest way to run a script is to call runSingle without any parameters. Additionally, a Java Properties object or a file containing a list of properties can be passed to this call. The properties are passed to Pig and a treated as any other properties passed from command line.
# In a jython script from java.util import Properties . . props = Properties() props.put(key1, val1) props.put(key2, val2) . . Pig.compile(. ).bind(. ).runSingle(props)A more general version of run allows to run one or more pipelines concurrently. In this case, a list of PigStats results is returned – one for each pipeline run. The example in the previous section shows how to make use of this call.
As the case with runSingle, a set of Java Properties or a property file can be passed to the call.
Passing Parameters to a Script
Inside your script, you can define parameters and then pass parameters from command line to your script. There are two ways to pass parameters to your script:
1. -param
Similar to regular Pig parameter substitution, you can define parameters using -param/–param_file on Pig's command line. This variable will be treated as one of the binding variables when binding the Pig Latin script. For example, you can invoke the below Python script using: pig –param loadfile=student.txt script.py.
#!/usr/bin/python from org.apache.pig.scripting import Pig P = Pig.compile("""A = load '$loadfile' as (name, age, gpa); store A into 'output';""") Q = P.bind() result = Q.runSingle()2. Command line arguments
Currently this feature is only available in Python and Groovy. You can pass command line arguments (the arguments after the script file name) to Python. These will become sys.argv in Python and will be passed as main's arguments in Groovy. For example: pig script.py student.txt. The corresponding script is:
#!/usr/bin/python import sys from org.apache.pig.scripting import Pig P = Pig.compile("A = load '" + sys.argv[1] + "' as (name, age, gpa);" + "store A into 'output';"); Q = P.bind() result = Q.runSingle()and in Groovy, pig script.groovy student.txt:
import org.apache.pig.scripting.Pig; public static void main(String[] args)PigRunner API
Starting with Pig 0.8, some applications such as Oozie workflow invoke Pig using the PigRunner Java class rather than through the command line. For these applications, the PigRunner interface has been expanded to accommodate embedded Pig. PigRunner accepts Python and JavaScript scripts as input. These scripts can potentially contain multiple Pig pipelines; therefore, we need a way to return results for all of them.
To do this and to preserve backward compatibility PigStats and related objects were expanded as shown below:
For more details, see Java Objects.
This example shows you how to pass an entire Pig script to the compile call.
#!/usr/bin/python from org.apache.pig.scripting import Pig P = Pig.compileFromFile("""myscript.pig""") input = "original" output = "output" result = p.bind().runSingle() if result.isSuccessful(): print "Pig job succeeded" else: raise "Pig job failed"
There is a class of problems that involve iterating over a data pipeline an indeterminate number of times until a certain value is reached. Examples arise in machine learning, graph traversal, and a host of numerical analysis problems which involve finding interpolations, extrapolations or regressions. The Python example below shows one way to achieve convergence using Pig scripts.
#!/usr/bin/python # explicitly import Pig class from org.apache.pig.scripting import Pig P = Pig.compile("""A = load '$input' as (user, age, gpa); B = group A all; C = foreach B generate AVG(A.gpa); store C into '$output'; """) # initial output input = "studenttab5" output = "output-5" final = "final-output" for i in range(1, 4): Q = P.bind() # attaches $input, $output in Pig Latin to input, output Python variable results = Q.runSingle() if results.isSuccessful() == "FAILED": raise "Pig job failed" iter = results.result("C").iterator() if iter.hasNext(): tuple = iter.next() value = tuple.get(0) if float(str(value)) < 3: print "value: " + str(value) input = "studenttab" + str(i+5) output = "output-" + str(i+5) print "output: " + output else: Pig.fs("mv " + output + " " + final) break
A number of user frameworks do automated generation of Pig Latin.
A sub-use case of automated generation is conditional code generation. Different processing might be required based on whether this is weekday or a weekend.
str = "A = load 'input';" if today.isWeekday(): str = str + "B = filter A by weekday_filter(*);" else: str = str + "B = filter A by weekend_filter(*);" str = str + "C = group B by user;" results = Pig.compile(str).bind().runSingle()
Another sub-use case of automated generation is parallel execution of identical pipelines. You may have a single pipeline that you would like to run multiple data sets through in parallel. In the example below, the pipeline is run for the US, the UK, and Brazil.
P = Pig.compile("""A = load '$in'; B = filter A by user is not null; . store Z into '$out'; """) Q = P.bind([, , ]) results = Q.run() # it blocks until all pipelines are completed for i in [0, 1, 2]: result = results[i] . # check result for each pipeline
public class Pig < /** * Run a filesystem command. Any output from this command is written to * stdout or stderr as appropriate. * @param cmd Filesystem command to run along with its arguments as one * string. * @throws IOException */ public static void fs(String cmd) throws IOException /** * Register a jar for use in Pig. Once this is done this jar will be * registered for ALL SUBSEQUENT Pig pipelines in this script. * If you wish to register it for only a single Pig pipeline, use * register within that definition. * @param jarfile Path of jar to include. * @throws IOException if the indicated jarfile cannot be found. */ public static void registerJar(String jarfile) throws IOException /** * Register script UDFs for use in Pig. Once this is done all UDFs * defined in the file will be available for ALL SUBSEQUENT * Pig pipelines in this script. If you wish to register UDFS for * only a single Pig pipeline, use register within that definition. * @param udffile Path of the script UDF file * @param namespace namespace of the UDFs * @throws IOException */ public static void registerUDF(String udffile, String namespace) throws IOException /** * Define an alias for a UDF or a streaming command. This definition * will then be present for ALL SUBSEQUENT Pig pipelines defined in this * script. If you wish to define it for only a single Pig pipeline, use * define within that definition. * @param alias name of the defined alias * @param definition string this alias is defined as */ public static void define(String alias, String definition) throws IOException /** * Set a variable for use in Pig Latin. This set * will then be present for ALL SUBSEQUENT Pig pipelines defined in this * script. If you wish to set it for only a single Pig pipeline, use * set within that definition. * @param var variable to set * @param value to set it to */ public static void set(String var, String value) throws IOException /** * Define a Pig pipeline. * @param pl Pig Latin definition of the pipeline. * @return Pig object representing this pipeline. * @throws IOException if the Pig Latin does not compile. */ public static Pig compile(String pl) throws IOException /** * Define a named portion of a Pig pipeline. This allows it * to be imported into another pipeline. * @param name Name that will be used to define this pipeline. * The namespace is global. * @param pl Pig Latin definition of the pipeline. * @return Pig object representing this pipeline. * @throws IOException if the Pig Latin does not compile. */ public static Pig compile(String name, String pl) throws IOException /** * Define a Pig pipeline based on Pig Latin in a separate file. * @param filename File to read Pig Latin from. This must be a purely * Pig Latin file. It cannot contain host language constructs in it. * @return Pig object representing this pipeline. * @throws IOException if the Pig Latin does not compile or the file * cannot be found. */ public static Pig compileFromFile(String filename) throws IOException /** * Define a named Pig pipeline based on Pig Latin in a separate file. * This allows it to be imported into another pipeline. * @param name Name that will be used to define this pipeline. * The namespace is global. * @param filename File to read Pig Latin from. This must be a purely * Pig Latin file. It cannot contain host language constructs in it. * @return Pig object representing this pipeline. * @throws IOException if the Pig Latin does not compile or the file * cannot be found. */ public static Pig compileFromFile(String name, String filename) throws IOException /** * Bind this to a set of variables. Values must be provided * for all Pig Latin parameters. * @param vars map of variables to bind. Keys should be parameters defined * in the Pig Latin. Values should be strings that provide values for those * parameters. They can be either constants or variables from the host * language. Host language variables must contain strings. * @return a object * @throws IOException if there is not a key for each * Pig Latin parameter or if they contain unsupported types. */ public BoundScript bind(Map vars) throws IOException /** * Bind this to multiple sets of variables. This will * cause the Pig Latin script to be executed in parallel over these sets of * variables. * @param vars list of maps of variables to bind. Keys should be parameters defined * in the Pig Latin. Values should be strings that provide values for those * variables. They can be either constants or variables from the host * language. Host language variables must be strings. * @return a object * @throws IOException if there is not a key for each * Pig Latin parameter or if they contain unsupported types. */ public BoundScript bind(List
public class BoundScript < /** * Run a pipeline on Hadoop. * If there are no stores in this pipeline then nothing will be run. * @return , null if there is no bound query to run. * @throws IOException */ public PigStats runSingle() throws IOException /** * Run a pipeline on Hadoop. * If there are no stores in this pipeline then nothing will be run. * @param prop Map of properties that Pig should set when running the script. * This is intended for use with scripting languages that do not support * the Properties object. * @return , null if there is no bound query to run. * @throws IOException */ public PigStats runSingle(Properties prop) throws IOException /** * Run a pipeline on Hadoop. * If there are no stores in this pipeline then nothing will be run. * @param propfile File with properties that Pig should set when running the script. * @return , null if there is no bound query to run. * @throws IOException */ public PigStats runSingle(String propfile) throws IOException /** * Run multiple instances of bound pipeline on Hadoop in parallel. * If there are no stores in this pipeline then nothing will be run. * Bind is called first with the list of maps of variables to bind. * @return a list of , one for each map of variables passed * to bind. * @throws IOException */ public List run() throws IOException /** * Run multiple instances of bound pipeline on Hadoop in parallel. * @param prop Map of properties that Pig should set when running the script. * This is intended for use with scripting languages that do not support * the Properties object. * @return a list of , one for each map of variables passed * to bind. * @throws IOException */ public List run(Properties prop) throws IOException /** * Run multiple instances of bound pipeline on Hadoop in parallel. * @param propfile File with properties that Pig should set when running the script. * @return a list of PigResults, one for each map of variables passed * to bind. * @throws IOException */ public List run(String propfile) throws IOException /** * Run illustrate for this pipeline. Results will be printed to stdout. * @throws IOException if illustrate fails. */ public void illustrate() throws IOException /** * Explain this pipeline. Results will be printed to stdout. * @throws IOException if explain fails. */ public void explain() throws IOException /** * Describe the schema of an alias in this pipeline. * Results will be printed to stdout. * @param alias to be described * @throws IOException if describe fails. */ public void describe(String alias) throws IOException >
public abstract class PigStats < public abstract boolean isEmbedded(); /** * An embedded script contains one or more pipelines. * For a named pipeline in the script, the key in the returning map is the name of the pipeline. * Otherwise, the key in the returning map is the script id of the pipeline. */ public abstract Map> getAllStats(); public abstract List getAllErrorMessages(); >
public interface PigProgressNotificationListener extends java.util.EventListener < /** * Invoked just before launching MR jobs spawned by the script. * @param scriptId id of the script * @param numJobsToLaunch the total number of MR jobs spawned by the script */ public void launchStartedNotification(String scriptId, int numJobsToLaunch); /** * Invoked just before submitting a batch of MR jobs. * @param scriptId id of the script * @param numJobsSubmitted the number of MR jobs in the batch */ public void jobsSubmittedNotification(String scriptId, int numJobsSubmitted); /** * Invoked after a MR job is started. * @param scriptId id of the script * @param assignedJobId the MR job id */ public void jobStartedNotification(String scriptId, String assignedJobId); /** * Invoked just after a MR job is completed successfully. * @param scriptId id of the script * @param jobStats the object associated with the MR job */ public void jobFinishedNotification(String scriptId, JobStats jobStats); /** * Invoked when a MR job fails. * @param scriptId id of the script * @param jobStats the object associated with the MR job */ public void jobFailedNotification(String scriptId, JobStats jobStats); /** * Invoked just after an output is successfully written. * @param scriptId id of the script * @param outputStats the object associated with the output */ public void outputCompletedNotification(String scriptId, OutputStats outputStats); /** * Invoked to update the execution progress. * @param scriptId id of the script * @param progress the percentage of the execution progress */ public void progressUpdatedNotification(String scriptId, int progress); /** * Invoked just after all MR jobs spawned by the script are completed. * @param scriptId id of the script * @param numJobsSucceeded the total number of MR jobs succeeded */ public void launchCompletedNotification(String scriptId, int numJobsSucceeded); >
To enable control flow, you can embed Pig Latin statements and Pig commands in the Java programming language.
Note that host languages and the languages of UDFs (included as part of the embedded Pig) are completely orthogonal. For example, a Pig Latin statement that registers a Java UDF may be embedded in Python, JavaScript, Groovy, or Java. The exception to this rule is "combined" scripts – here the languages must match (see the Advanced Topics for Python, Advanced Topics for JavaScript and Advanced Topics for Groovy).
Currently, PigServer is the main interface for embedding Pig in Java. PigServer can now be instantiated from multiple threads. (In the past, PigServer contained references to static data that prevented multiple instances of the object to be created from different threads within your application.) Please note that PigServer is NOT thread safe; the same object can't be shared across multiple threads.
Local Mode
From your current working directory, compile the program. (Note that idlocal.class is written to your current working directory. Include “.” in the class path when you run the program.)
$ javac -cp pig.jar idlocal.java
From your current working directory, run the program. To view the results, check the output file, id.out.
Unix: $ java -cp pig.jar:. idlocal Windows: $ java –cp .;pig.jar idlocal
idlocal.java - The sample code is based on Pig Latin statements that extract all user IDs from the /etc/passwd file. Copy the /etc/passwd file to your local working directory.
import java.io.IOException; import org.apache.pig.PigServer; public class idlocal < public static void main(String[] args) < try < PigServer pigServer = new PigServer("local"); runIdQuery(pigServer, "passwd"); >catch(Exception e) < >> public static void runIdQuery(PigServer pigServer, String inputFile) throws IOException < pigServer.registerQuery("A = load '" + inputFile + "' using PigStorage(':');"); pigServer.registerQuery("B = foreach A generate $0 as id;"); pigServer.store("B", "id.out"); >>
Mapreduce Mode
Point $HADOOPDIR to the directory that contains the hadoop-site.xml file. Example:
$ export HADOOPDIR=/yourHADOOPsite/conf
From your current working directory, compile the program. (Note that idmapreduce.class is written to your current working directory. Include “.” in the class path when you run the program.)
$ javac -cp pig.jar idmapreduce.java
From your current working directory, run the program. To view the results, check the idout directory on your Hadoop system.
Unix: $ java -cp pig.jar. $HADOOPDIR idmapreduce Cygwin: $ java –cp '.;pig.jar;$HADOOPDIR' idmapreduce
idmapreduce.java - The sample code is based on Pig Latin statements that extract all user IDs from the /etc/passwd file. Copy the /etc/passwd file to your home directory on the HDFS.
import java.io.IOException; import org.apache.pig.PigServer; public class idmapreduce < public static void main(String[] args) < try < PigServer pigServer = new PigServer("mapreduce"); runIdQuery(pigServer, "passwd"); >catch(Exception e) < >> public static void runIdQuery(PigServer pigServer, String inputFile) throws IOException < pigServer.registerQuery("A = load '" + inputFile + "' using PigStorage(':');") pigServer.registerQuery("B = foreach A generate $0 as id;"); pigServer.store("B", "idout"); >>
Pig Latin supports the definition, expansion, and import of macros.