edu.isi.pegasus.planner.code.generator.condor
public class SUBDAXGenerator extends Object
Modifier and Type | Field and Description |
---|---|
private static String |
CACHE_FILE_SUFFIX
Suffix to be applied for cache file generation.
|
static String |
CONDOR_DAGMAN_LOGICAL_NAME
The logical name with which to query the transformation catalog for the
condor_dagman executable, that ends up running the mini dag as one
job.
|
static String |
CONDOR_DAGMAN_NAMESPACE
The namespace to use for condor dagman.
|
static String |
CPLANNER_LOGICAL_NAME
The logical name with which to query the transformation catalog for
cPlanner executable.
|
static String[][] |
DAGMAN_KNOBS
The dagman knobs controlled through property.
|
static String |
DEFAULT_SUBDAX_CATEGORY_KEY
The default category for the sub dax jobs.
|
static boolean |
GENERATE_SUBDAG_KEYWORD
Whether to generate the SUBDAG keyword or not.
|
private PegasusBag |
mBag
Bag of Pegasus objects
|
private PegasusProperties.CLEANUP_SCOPE |
mCleanupScope
The cleanup scope for the workflows.
|
private long |
mCondorVersion
The long value of condor version.
|
private String |
mCurrentDAGCacheFile
Cache file for the current DAG
|
private ADag |
mDAG |
private PrintWriter |
mDAGWriter
The print writer handle to DAG file being written out.
|
private Map<String,String> |
mDAXJobIDToSubmitDirectoryCacheFile
Maps a sub dax job id to it's submit directory.
|
private LogManager |
mLogger
Handle to the logging manager.
|
private NumberFormat |
mNumFormatter
The number formatter to format the run submit dir entries.
|
private PlannerOptions |
mPegasusPlanOptions
The object containing all the options passed to the Concrete Planner.
|
private PegasusProperties |
mProps
The handle to Pegasus Properties.
|
private SiteStore |
mSiteStore |
private TransformationCatalog |
mTCHandle
The handle to the transformation catalog
|
private String |
mUser
The username of the user running the program.
|
private Graph |
mWorkflow |
static String |
NAMESPACE
The namespace to which the job in the MEGA DAG being created refer to.
|
static String |
RETRY_LOGICAL_NAME
The planner utility that needs to be called as a prescript.
|
Constructor and Description |
---|
SUBDAXGenerator()
The default constructor.
|
Modifier and Type | Method and Description |
---|---|
protected Job |
constructDAGJob(Job subdaxJob,
File directory,
File subdaxDirectory,
String basenamePrefix)
Constructs a job that plans and submits the partitioned workflow,
referred to by a Partition.
|
String |
constructDAGManKnobs(Job job)
Constructs Any extra arguments that need to be passed to dagman, as determined
from the properties file.
|
Job |
constructPegasusPlanPrescript(Job job,
PlannerOptions options,
String rootUUID,
String properties,
String log)
Constructs the pegasus plan prescript for the subdax
|
protected File |
constructPlannerPrescriptWrapper(Job dagJob,
File directory,
String executable,
String arguments)
Construct a pegasus plan wrapper script that changes the directory in which
pegasus-plan is launched.
|
private TransformationCatalogEntry |
constructTCEntryFromEnvironment()
Returns a transformation catalog entry object constructed from the environment
An entry is constructed if either of the following environment variables
are defined
1) CONDOR_HOME
2) CONDOR_LOCATION
CONDOR_HOME takes precedence over CONDOR_LOCATION
|
private TransformationCatalogEntry |
constructTCEntryFromEnvProfiles(ENV env)
Returns a tranformation catalog entry object constructed from the environment
An entry is constructed if either of the following environment variables
are defined
1) CONDOR_HOME
2) CONDOR_LOCATION
CONDOR_HOME takes precedence over CONDOR_LOCATION
|
private TransformationCatalogEntry |
constructTCEntryFromPath()
Returns a tranformation catalog entry object constructed from the path
environment variable
|
private TransformationCatalogEntry |
constructTransformationCatalogEntryForDAGMan(String path)
Constructs TransformationCatalogEntry for DAGMan.
|
protected String |
createSubmitDirectory(ADag dag,
String dir,
String user,
String vogroup,
boolean timestampBased)
Creates the submit directory for the workflow.
|
protected String |
createSubmitDirectory(String label,
String dir,
String user,
String vogroup,
boolean timestampBased)
Creates the submit directory for the workflow.
|
protected boolean |
createSymbolicLink(String source,
String destination)
This method generates a symlink between two files
|
protected boolean |
createSymbolicLink(String source,
String destination,
boolean logErrorToDebug)
This method generates a symlink between two files
|
boolean |
createSymbolicLinktoCacheFile(PlannerOptions options,
String label,
String index)
Creates a symbolic link to the DAX file in a dax sub directory in the
submit directory
|
String |
createSymbolicLinktoDAX(String submitDirectory,
String dax)
Creates a symbolic link to the DAX file in a dax sub directory in the
submit directory
|
private TransformationCatalogEntry |
defaultTCEntry(String site)
Returns a default TC entry to be used in case entry is not found in the
transformation catalog.
|
Job |
generateCode(Job job)
Generates code for a job
|
protected String |
getBasename(String prefix,
String suffix)
Returns the basename of a dagman (usually) related file for a particular
partition.
|
protected String |
getCacheFile(PlannerOptions options,
String label,
String index)
Returns the path to the cache file in a workflow's submit directory
|
protected String |
getCacheFileName(PlannerOptions options,
String label,
String index)
Constructs the basename to the cache file that is to be used
to log the transient files.
|
Set<String> |
getParentsTransientRC(Job job)
Returns a set containing the paths to the parent dax jobs
transient replica catalogs.
|
protected String |
getWorkflowFileBasenamePrefix(PlannerOptions options,
String label,
String index) |
protected String |
getWorkflowFileName(PlannerOptions options,
String label,
String index,
String suffix)
Constructs the basename to a workflow file that.
|
void |
initialize(PegasusBag bag,
ADag dag,
Graph workflow,
PrintWriter dagWriter)
Initializes the class.
|
protected static int |
parseInt(String s)
Parses a string into an integer.
|
protected static void |
sanityCheck(File dir)
Checks the destination location for existence, if it can
be created, if it is writable etc.
|
public static final String DEFAULT_SUBDAX_CATEGORY_KEY
public static final boolean GENERATE_SUBDAG_KEYWORD
private static final String CACHE_FILE_SUFFIX
public static final String CPLANNER_LOGICAL_NAME
public static final String CONDOR_DAGMAN_NAMESPACE
public static final String CONDOR_DAGMAN_LOGICAL_NAME
public static final String NAMESPACE
public static final String RETRY_LOGICAL_NAME
public static final String[][] DAGMAN_KNOBS
private String mUser
private NumberFormat mNumFormatter
private PlannerOptions mPegasusPlanOptions
private PegasusProperties mProps
private LogManager mLogger
private PegasusBag mBag
private PrintWriter mDAGWriter
private TransformationCatalog mTCHandle
private PegasusProperties.CLEANUP_SCOPE mCleanupScope
private long mCondorVersion
private Map<String,String> mDAXJobIDToSubmitDirectoryCacheFile
private Graph mWorkflow
private ADag mDAG
private SiteStore mSiteStore
private String mCurrentDAGCacheFile
public void initialize(PegasusBag bag, ADag dag, Graph workflow, PrintWriter dagWriter)
bag
- the bag of objects required for initializationdag
- the dag for which code is being generatedworkflow
- the graph representation of the dagdaxReplicaStore
- the dax replica store.dagWriter
- handle to the dag writerpublic Job generateCode(Job job)
job
- the job for which code has to be generated.Job
if a submit file needs to be generated
for the job. Else return null.protected File constructPlannerPrescriptWrapper(Job dagJob, File directory, String executable, String arguments)
dagJob
- the DAG job corresponding to which the prescript is associated.directory
- the directory where the submit file for dagman job has
to be written out to.executable
- the path to the planner that needs to be called in the prescriptarguments
- the arguments with which the planner is called.protected Job constructDAGJob(Job subdaxJob, File directory, File subdaxDirectory, String basenamePrefix)
subdaxJob
- the original subdax job.directory
- the directory where the submit file for dagman job has
to be written out to.subdaxDirectory
- the submit directory where the submit files for the
subdag reside.basenamePrefix
- the basename to be assigned to the files associated
with DAGManpublic String constructDAGManKnobs(Job job)
job
- the jobprotected static int parseInt(String s)
s
- the String to be parsed as integerprotected String getBasename(String prefix, String suffix)
prefix
- the prefix.suffix
- the suffix for the file basename.protected String getCacheFile(PlannerOptions options, String label, String index)
options
- the options for the workflow.label
- the label for the workflow.index
- the index for the workflow.protected String getCacheFileName(PlannerOptions options, String label, String index)
options
- the options for the sub workflow.label
- the label for the workflow.index
- the index for the workflow.protected String getWorkflowFileName(PlannerOptions options, String label, String index, String suffix)
options
- the options for the sub workflow.label
- the label for the workflow.index
- the index for the workflow.suffix
- the suffix for the workfklow file.protected String getWorkflowFileBasenamePrefix(PlannerOptions options, String label, String index)
private TransformationCatalogEntry defaultTCEntry(String site)
site
- the site for which the default entry is required.private TransformationCatalogEntry constructTCEntryFromEnvironment()
private TransformationCatalogEntry constructTCEntryFromPath()
env
- the environment profiles.private TransformationCatalogEntry constructTCEntryFromEnvProfiles(ENV env)
env
- the environment profiles.private TransformationCatalogEntry constructTransformationCatalogEntryForDAGMan(String path)
path
- path to dagmanpublic Job constructPegasusPlanPrescript(Job job, PlannerOptions options, String rootUUID, String properties, String log)
job
- the subdax joboptions
- the planner options with which subdax has to be invokedrootUUID
- the root workflow uuidproperties
- the properties file.log
- the log for the prescript outputpublic boolean createSymbolicLinktoCacheFile(PlannerOptions options, String label, String index)
options
- the options for the sub workflow.label
- the label for the workflow.index
- the index for the workflow.public String createSymbolicLinktoDAX(String submitDirectory, String dax)
submitDirectory
- the submit directory for the sub workflow.dax
- the dax file to which the symbolic link has
to be created.protected String createSubmitDirectory(ADag dag, String dir, String user, String vogroup, boolean timestampBased) throws IOException
dag
- the workflow being worked upon.dir
- the base directory specified by the user.user
- the username of the user.vogroup
- the vogroup to which the user belongs to.timestampBased
- boolean indicating whether to have a timestamp based dir or notIOException
- in case of unable to create submit directory.protected String createSubmitDirectory(String label, String dir, String user, String vogroup, boolean timestampBased) throws IOException
label
- the label of the workflowdir
- the base directory specified by the user.user
- the username of the user.vogroup
- the vogroup to which the user belongs to.timestampBased
- boolean indicating whether to have a timestamp based dir or notIOException
- in case of unable to create submit directory.protected static void sanityCheck(File dir) throws IOException
dir
- is the new base directory to optionally create.IOException
- in case of error while writing out files.protected boolean createSymbolicLink(String source, String destination)
source
- the file that has to be symlinkeddestination
- the destination of the symlinkprotected boolean createSymbolicLink(String source, String destination, boolean logErrorToDebug)
source
- the file that has to be symlinkeddestination
- the destination of the symlinklogErrorToDebug
- whether to log messeage to debug or not