edu.isi.pegasus.planner.refiner
public class TransferEngine extends Engine
Modifier and Type | Field and Description |
---|---|
static int | DELETED_JOBS_LEVEL: The MAX level is assigned as the level for deleted jobs. |
private boolean | mBypassStagingForInputs: A boolean indicating whether to bypass first level staging for inputs. |
private ADag | mDag: The DAG object to which the transfer nodes are to be added. |
private List | mDeletedJobs: Holds all the jobs deleted by the reduction algorithm. |
private org.griphyn.vdl.euryale.FileFactory | mFactory: The handle to the file factory that is used to create the top level directories for each of the partitions. |
private OutputMapper | mOutputMapper: Handle to an OutputMapper that tells what |
private String | mOutputSite: The output site where files need to be staged. |
private PlannerCache | mPlannerCache: A SimpleFile Replica Catalog that tracks all the files that are being materialized as part of workflow execution. |
private PlannerOptions | mPlannerOptions: The planner options passed to the planner. |
private ReplicaCatalogBridge | mRCBridge: The bridge to the Replica Catalog. |
private ReplicaSelector | mReplicaSelector: The handle to the replica selector that is used to select the various replicas. |
private boolean | mSetupForCondorIO: A boolean to track whether Condor file IO is used for the workflow or not. |
private Map<String,NameValue> | mSRMServiceURLToMountPointMap: A map that associates the site name with the SRM server URL and mount point. |
private Refiner | mTXRefiner: The handle to the transfer refiner that adds the transfer nodes into the workflow. |
protected boolean | mUseSymLinks: If set, causes the destination URL for the symlink jobs to have a symlink:// URL if the pool attribute associated with the PFN is the same as a particular job's execution pool. |
protected String | mWorkDir: The working directory relative to the mount point of the execution pool. |
private boolean | mWorkerNodeExecution: A boolean indicating whether we are doing worker node execution or not. |
private ReplicaCatalog | mWorkflowCache: A Replica Catalog that tracks all the GET URLs for the files on the staging sites. |
static String | REFINER_NAME: The name of the refiner for purposes of error logging. |
static String | SRM_MOUNT_POINT_PROPERTIES_SUFFIX: The suffix to retrieve the mount point for the SRM server. |
static String | SRM_PROPERTIES_PREFIX: The property prefix for retrieving SRM properties. |
static String | SRM_SERVICE_URL_PROPERTIES_SUFFIX: The suffix to retrieve the service URL for the SRM server. |
static String | WORKFLOW_CACHE_FILE_IMPLEMENTOR: The name of the Replica Catalog implementer that is used to write out the workflow cache file in the submit directory. |
static String | WORKFLOW_CACHE_REPLICA_CATALOG_KEY: The name of the source key for the Replica Catalog implementer that serves as cache. |
Fields inherited from class Engine: mBag, mLogger, mLogMsg, mOutputPool, mPoolFile, mPOptions, mProps, mRLIUrl, mSiteStore, mTCFile, mTCHandle, mTCMode, REGISTRATION_UNIVERSE, TRANSFER_UNIVERSE
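The SRM constants and mSRMServiceURLToMountPointMap above, together with the ligo-cit property example given for constructSiteToSRMServerMap, describe a per-site mapping from an SRM service URL to a local mount point. The sketch below shows one way such a map could be built and then used to turn an SRM URL into a file URL, as replaceSourceProtocolFromURL describes. It is not the Pegasus implementation: the constant values are assumptions inferred from the documented property names, and the record ServiceMount and the method srmToFileURL are hypothetical stand-ins for NameValue and the real rewriting logic.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/** Hypothetical sketch of the SRM site-to-mount-point mapping; not the Pegasus API. */
final class SrmMountPointSketch {
    // Assumed constant values, inferred from the documented ligo-cit property example.
    static final String SRM_PROPERTIES_PREFIX = "pegasus.transfer.srm.";
    static final String SRM_SERVICE_URL_PROPERTIES_SUFFIX = ".service.url";
    static final String SRM_MOUNT_POINT_PROPERTIES_SUFFIX = ".service.mountpoint";

    /** Hypothetical stand-in for NameValue: the service URL paired with its mount point. */
    record ServiceMount(String serviceURL, String mountPoint) {}

    /** Builds site -> (service URL, mount point) from pegasus.transfer.srm.<site>.* keys. */
    static Map<String, ServiceMount> constructSiteToSRMServerMap(Properties props) {
        Map<String, ServiceMount> result = new HashMap<>();
        int minLength = SRM_PROPERTIES_PREFIX.length() + SRM_SERVICE_URL_PROPERTIES_SUFFIX.length();
        for (String key : props.stringPropertyNames()) {
            if (key.length() > minLength
                    && key.startsWith(SRM_PROPERTIES_PREFIX)
                    && key.endsWith(SRM_SERVICE_URL_PROPERTIES_SUFFIX)) {
                // The site handle sits between the prefix and the suffix.
                String site = key.substring(SRM_PROPERTIES_PREFIX.length(),
                        key.length() - SRM_SERVICE_URL_PROPERTIES_SUFFIX.length());
                String mountPoint = props.getProperty(
                        SRM_PROPERTIES_PREFIX + site + SRM_MOUNT_POINT_PROPERTIES_SUFFIX);
                if (mountPoint != null) { // keep only sites that define both properties
                    result.put(site, new ServiceMount(props.getProperty(key), mountPoint));
                }
            }
        }
        return result;
    }

    /** Rewrites an SRM URL to a file URL when it starts with the site's service URL. */
    static String srmToFileURL(String url, String site, Map<String, ServiceMount> map) {
        ServiceMount sm = map.get(site);
        if (sm == null || !url.startsWith(sm.serviceURL())) {
            return url; // no replacement happens
        }
        // The remainder after the service URL is a path relative to the mount point.
        return "file://" + sm.mountPoint() + url.substring(sm.serviceURL().length());
    }
}
```

With the two ligo-cit properties from the example, srmToFileURL maps srm://osg-se.ligo.caltech.edu:10443/srm/v2/server?SFN=/mnt/hadoop/data/f.txt to file:///mnt/hadoop/data/f.txt and leaves URLs from other services unchanged.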
Constructor and Description |
---|
TransferEngine(ADag reducedDag, PegasusBag bag, List<Job> deletedJobs, List<Job> deletedLeafJobs): Overloaded constructor. |
Modifier and Type | Method and Description |
---|---|
void | addTransferNodes(ReplicaCatalogBridge rcb, PlannerCache plannerCache): Adds the transfer nodes to the workflow. |
private boolean | bypassStagingForInputFile(ReplicaCatalogEntry entry, PegasusFile file, String computeSite): Returns a boolean indicating whether to bypass first level staging for a file or not. |
private void | complainForScratchFileServer(Job job, FileServerType.OPERATION operation, String site): Complains about a missing head node file server on a site for a job. |
private void | complainForScratchFileServer(String jobname, FileServerType.OPERATION operation, String site): Complains about a missing head node file server on a site for a job. |
private FileTransfer | constructFileTX(PegasusFile pf, Job job, String destSiteHandle, String path, boolean localTransfer): Constructs the FileTransfer object on the basis of the transiency information. |
private String | constructRegistrationURL(String site, String lfn): Constructs a registration URL for an LFN. |
private Map<String,NameValue> | constructSiteToSRMServerMap(PegasusProperties props): Constructs a map of site handles to SRM server URLs and mount points by parsing the relevant SRM Pegasus properties. |
private String | getCacheFileName(ADag adag): Constructs the basename of the cache file that is to be used to log the transient files. |
private Vector | getDeletedFileTX(String pool, Job job): Gets the file transfer objects corresponding to the locations of files found in the replica mechanism, and transfers them to the output pool requested by the user. |
private void | getFilesFromRC(DAGJob job, Collection searchFiles): Special handling for a DAGJob for retrieving files from the Replica Catalog. |
private void | getFilesFromRC(DAXJob job, Collection searchFiles): Special handling for a DAXJob for retrieving files from the Replica Catalog. |
private void | getFilesFromRC(Job job, Collection searchFiles): Looks up the RCEngine Hashtable to find the locations of the files and adds nodes to transfer them. |
private Vector | getFileTX(String destPool, Job job, boolean localTransfer): Gets the Vector of FileTransfer objects for the files that have to be transferred to one destination pool. |
private Collection<FileTransfer>[] | getInterpoolFileTX(Job job, List<GraphNode> parents): Gets the FileTransfer objects for all the files that have to be transferred to the destination pool in case of interpool transfers. |
private Set<PegasusFile> | getOutputFiles(List<GraphNode> nodes): Gets the output files for all the nodes passed. |
String | getStagingSite(Job job): Returns the staging site to be used for a job. |
private String | getURLOnSharedScratch(SiteCatalogEntry entry, Job job, FileServerType.OPERATION operation, String lfn): Returns a URL on the shared scratch of the staging site. |
private ReplicaCatalog | initializeWorkflowCacheFile(ADag dag): Initializes a Replica Catalog instance that is used to store the GET URLs for all files on the staging site (inputs staged and outputs created). |
private void | logRemoval(Job job, PegasusFile file, String prefix, boolean removed): Helper method for logging a removal message. |
private String | poolNotFoundMsg(String poolName, String universe): Generates an error message for a pool not found in the pool config file. |
private void | processParents(Job job, List<GraphNode> parents): Processes a node's parents and determines whether nodes are to be added or not. |
protected String | replaceProtocolFromURL(String pfn): Replaces the gsiftp URL scheme in the URL with the symlink URL scheme and returns the result as a new object. |
protected ReplicaCatalogEntry | replaceSourceProtocolFromURL(ReplicaCatalogEntry rce): Replaces the SRM URL scheme in the URL with the file URL scheme and returns a new object if a replacement happens. |
boolean | runTransferOnLocalSite(String site, String destinationURL, int type): Returns whether to run a transfer job on the local site or not. |
private void | trackInCaches(Job job): Tracks the files created by a job in both the planner and workflow caches. The planner cache stores the PUT URLs and the GET URLs are stored in the workflow cache. |
private void | trackInPlannerCache(String lfn, String pfn, String site): Inserts an entry into the planner cache as a PUT URL. |
private void | trackInPlannerCache(String lfn, String pfn, String site, FileServerType.OPERATION type): Inserts an entry into the planner cache as a PUT URL. |
private void | trackInWorkflowCache(String lfn, String pfn, String site): Inserts an entry into the workflow cache that is to be written out to the submit directory. |
Methods inherited from class Engine: addVector, appendArrayList, complainForHeadNodeURLPrefix, complainForHeadNodeURLPrefix, loadProperties, printVector, stringInList, stringInPegVector, stringInVector, vectorToString
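trackInCaches is summarized above as splitting what it records between two caches: the planner cache keeps the PUT URL used to write each file on the staging site, and the workflow cache keeps the GET URL used to read it back. The sketch below illustrates only that split. It assumes plain maps keyed by LFN in place of the real PlannerCache and ReplicaCatalog types, and the class name, the Entry record and the URL-prefix parameters are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of PUT vs GET URL tracking; plain maps stand in for the real caches. */
final class CacheTrackingSketch {
    /** One cache entry: a URL together with the site it refers to. */
    record Entry(String pfn, String site) {}

    final Map<String, List<Entry>> plannerCache = new HashMap<>();  // lfn -> PUT URLs
    final Map<String, List<Entry>> workflowCache = new HashMap<>(); // lfn -> GET URLs

    void trackInPlannerCache(String lfn, String putURL, String site) {
        plannerCache.computeIfAbsent(lfn, k -> new ArrayList<>()).add(new Entry(putURL, site));
    }

    void trackInWorkflowCache(String lfn, String getURL, String site) {
        workflowCache.computeIfAbsent(lfn, k -> new ArrayList<>()).add(new Entry(getURL, site));
    }

    /** Records each LFN created on the staging site under both its PUT and its GET URL. */
    void trackInCaches(List<String> createdLFNs, String stagingSite,
                       String putURLPrefix, String getURLPrefix) {
        for (String lfn : createdLFNs) {
            trackInPlannerCache(lfn, putURLPrefix + "/" + lfn, stagingSite);
            trackInWorkflowCache(lfn, getURLPrefix + "/" + lfn, stagingSite);
        }
    }
}
```

Keeping the two URL kinds in separate structures mirrors the documented design: the workflow cache is the one written out to the submit directory, so it only needs the read-side (GET) locations.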
public static final int DELETED_JOBS_LEVEL
public static final String WORKFLOW_CACHE_FILE_IMPLEMENTOR
public static final String WORKFLOW_CACHE_REPLICA_CATALOG_KEY
public static final String SRM_PROPERTIES_PREFIX
public static final String SRM_SERVICE_URL_PROPERTIES_SUFFIX
public static final String SRM_MOUNT_POINT_PROPERTIES_SUFFIX
public static final String REFINER_NAME
private Map<String,NameValue> mSRMServiceURLToMountPointMap
private ADag mDag
private ReplicaCatalogBridge mRCBridge
private ReplicaSelector mReplicaSelector
private Refiner mTXRefiner
private List mDeletedJobs
private PlannerCache mPlannerCache
private ReplicaCatalog mWorkflowCache
private org.griphyn.vdl.euryale.FileFactory mFactory
private OutputMapper mOutputMapper
protected String mWorkDir
protected boolean mUseSymLinks
private boolean mWorkerNodeExecution
private PlannerOptions mPlannerOptions
private boolean mBypassStagingForInputs
private final boolean mSetupForCondorIO
private final String mOutputSite
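The mUseSymLinks field and replaceProtocolFromURL together describe a scheme swap: when symlinking is enabled and the replica's site matches the job's execution site, a gsiftp:// URL is rewritten to a symlink:// URL. The sketch below follows only that documented behaviour and is not the Pegasus implementation; in particular, whether the real method keeps or drops the host part of the URL is not stated here, and the class and method names are hypothetical.

```java
/** Hypothetical sketch of the documented gsiftp -> symlink scheme swap. */
final class SymlinkSketch {
    private static final String GSIFTP_SCHEME = "gsiftp://";
    private static final String SYMLINK_SCHEME = "symlink://";

    /** Swaps a leading gsiftp:// scheme for symlink://; other URLs are returned unchanged. */
    static String replaceProtocolFromURL(String pfn) {
        return pfn.startsWith(GSIFTP_SCHEME)
                ? SYMLINK_SCHEME + pfn.substring(GSIFTP_SCHEME.length())
                : pfn;
    }

    /** Applies the swap only when symlinks are enabled and the replica is on the execution site. */
    static String symlinkAwareURL(boolean useSymLinks, String pfn,
                                  String replicaSite, String executionSite) {
        return (useSymLinks && replicaSite.equals(executionSite))
                ? replaceProtocolFromURL(pfn)
                : pfn;
    }
}
```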
public TransferEngine(ADag reducedDag, PegasusBag bag, List<Job> deletedJobs, List<Job> deletedLeafJobs)
reducedDag
- the reduced workflow.
bag
- bag of initialization objects.
deletedJobs
- list of all jobs deleted by the reduction algorithm.
deletedLeafJobs
- list of leaf jobs deleted by the reduction algorithm.
public boolean runTransferOnLocalSite(String site, String destinationURL, int type)
site
- the site handle associated with the destination URL.
destinationURL
- the destination URL.
type
- the type of transfer job for which the URL is being constructed.
public void addTransferNodes(ReplicaCatalogBridge rcb, PlannerCache plannerCache)
rcb
- the bridge to the ReplicaCatalog.
plannerCache
- an instance of the replica catalog that will
store the locations of the files on the remote
sites.
public String getStagingSite(Job job)
job
- the job for which to determine the staging site.
private Vector getDeletedFileTX(String pool, Job job)
pool
- the output pool that the user specifies at runtime.
job
- the Job object corresponding to the leaf job that was
deleted by the reduction algorithm.
Returns: a Vector of FileTransfer objects.
private void processParents(Job job, List<GraphNode> parents)
job
- the Job object containing all the details of the job.
parents
- list of GraphNode objects corresponding to the parent jobs
of the job.
private Vector getFileTX(String destPool, Job job, boolean localTransfer)
destPool
- the pool to which the files are to be transferred.
job
- the Job object of the job whose output files
are needed at the destination pool.
localTransfer
- boolean indicating that the associated transfer job will run
on the local site.
Returns: a Vector of FileTransfer objects.
private FileTransfer constructFileTX(PegasusFile pf, Job job, String destSiteHandle, String path, boolean localTransfer)
pf
- the PegasusFile for which the transfer has to be done.
job
- the associated job.
destSiteHandle
- the output pool to which the file should be transferred.
path
- the path that a user specifies in the profile for key
remote_initialdir that results in the workdir being
changed for a job on an execution pool.
localTransfer
- boolean indicating that the associated transfer job will run
on the local site.
private String constructRegistrationURL(String site, String lfn)
site
- the site handle.
lfn
- the LFN for which the URL needs to be constructed.
private String poolNotFoundMsg(String poolName, String universe)
poolName
- the name of the pool that is not found.
universe
- the Condor universe.
private Collection<FileTransfer>[] getInterpoolFileTX(Job job, List<GraphNode> parents)
job
- the job with reference to which interpool file transfers
need to be determined.
parents
- list of GraphNode objects corresponding to the
parent jobs of the job.
Returns: FileTransfer objects.
private void getFilesFromRC(DAGJob job, Collection searchFiles)
job
- the DAGJob.
searchFiles
- files that need to be looked up in the Replica Catalog.
private void getFilesFromRC(DAXJob job, Collection searchFiles)
job
- the DAXJob.
searchFiles
- files that need to be looked up in the Replica Catalog.
private void getFilesFromRC(Job job, Collection searchFiles)
job
- the Job object whose input files have
to be searched for in the Replica Mechanism.
searchFiles
- Vector containing the PegasusFile objects corresponding
to the files that need to have their mappings looked
up from the Replica Mechanism.
protected ReplicaCatalogEntry replaceSourceProtocolFromURL(ReplicaCatalogEntry rce)
rce
- the ReplicaCatalogEntry object whose URL needs to be
replaced.
protected String replaceProtocolFromURL(String pfn)
pfn
- the PFN that needs to be replaced.
private Map<String,NameValue> constructSiteToSRMServerMap(PegasusProperties props)
For example, if the following properties are specified
pegasus.transfer.srm.ligo-cit.service.url srm://osg-se.ligo.caltech.edu:10443/srm/v2/server?SFN=/mnt/hadoop
pegasus.transfer.srm.ligo-cit.service.mountpoint /mnt/hadoop
then a Map is created that associates ligo-cit with a NameValue object containing the service URL and mount point.
props
- the PegasusProperties object.
private Set<PegasusFile> getOutputFiles(List<GraphNode> nodes)
nodes
- list of GraphNode objects.
private void trackInCaches(Job job)
job
- the job whose input files need to be tracked.
private void trackInPlannerCache(String lfn, String pfn, String site)
lfn
- the logical name of the file.
pfn
- the PFN.
site
- the site handle.
private void trackInPlannerCache(String lfn, String pfn, String site, FileServerType.OPERATION type)
lfn
- the logical name of the file.
pfn
- the PFN.
site
- the site handle.
type
- the type of URL.
private void trackInWorkflowCache(String lfn, String pfn, String site)
lfn
- the logical name of the file.
pfn
- the PFN.
site
- the site handle.
private String getURLOnSharedScratch(SiteCatalogEntry entry, Job job, FileServerType.OPERATION operation, String lfn)
entry
- the SiteCatalogEntry for the associated staging site.
job
- the job.
operation
- the FileServer operation for which we need the URL.
lfn
- the LFN; can be null to get the path to the directory.
private void complainForScratchFileServer(Job job, FileServerType.OPERATION operation, String site)
job
- the job.
operation
- the operation.
site
- the site.
private void complainForScratchFileServer(String jobname, FileServerType.OPERATION operation, String site)
jobname
- the name of the job.
operation
- the file server operation.
site
- the site.
private ReplicaCatalog initializeWorkflowCacheFile(ADag dag)
dag
- the workflow being planned.
private String getCacheFileName(ADag adag)
adag
- the ADag object containing the workflow that is being
concretized.
private boolean bypassStagingForInputFile(ReplicaCatalogEntry entry, PegasusFile file, String computeSite)
entry
- a ReplicaCatalogEntry matching the selected replica location.
file
- the corresponding PegasusFile object.
computeSite
- the compute site where the associated job will run.
private void logRemoval(Job job, PegasusFile file, String prefix, boolean removed)
job
- the job.
file
- the file to be removed.
prefix
- prefix for the log message.
removed
- whether removal was successful or not.
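processParents, getInterpoolFileTX and getOutputFiles describe how transfers between staging sites are found: look at a job's parents, collect what they produce, and move the files the job needs when a parent was staged elsewhere. The sketch below illustrates that idea under simplifying assumptions. SimpleJob and Transfer are hypothetical stand-ins for Job, GraphNode and FileTransfer, files are plain LFN strings, and the split of results into an array of collections that the real getInterpoolFileTX returns is not modelled.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch of interpool transfer detection; not the Pegasus implementation. */
final class InterpoolSketch {
    /** Stand-in for a job node: its staging site plus input and output LFNs. */
    record SimpleJob(String name, String stagingSite, Set<String> inputs, Set<String> outputs) {}

    /** Stand-in for a FileTransfer between two staging sites. */
    record Transfer(String lfn, String fromSite, String toSite) {}

    /** Union of the output LFNs of all given nodes (cf. getOutputFiles). */
    static Set<String> getOutputFiles(List<SimpleJob> nodes) {
        Set<String> result = new LinkedHashSet<>();
        for (SimpleJob node : nodes) {
            result.addAll(node.outputs());
        }
        return result;
    }

    /** For each parent on a different staging site, moves the parent outputs the job consumes. */
    static List<Transfer> interpoolTransfers(SimpleJob job, List<SimpleJob> parents) {
        List<Transfer> transfers = new ArrayList<>();
        for (SimpleJob parent : parents) {
            if (parent.stagingSite().equals(job.stagingSite())) {
                continue; // same staging site: the file is already where the job needs it
            }
            for (String lfn : parent.outputs()) {
                if (job.inputs().contains(lfn)) {
                    transfers.add(new Transfer(lfn, parent.stagingSite(), job.stagingSite()));
                }
            }
        }
        return transfers;
    }
}
```

For a job staged on siteB whose parents are one job on siteA and one on siteB, only the inputs produced by the siteA parent generate transfers.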