Integrating with the Pipeline Module

2024-03-28

The Pipeline module provides a basic framework for performing analysis and loading data into LabKey Server. It maintains a queue of jobs to be run, delegates each job to a machine that performs the work (which may be a remote server, though more typically it is the machine running the LabKey Server web server), and ensures that jobs are restarted if the server is shut down while they are running. Other modules can register themselves as providers of pipeline functionality; the Pipeline module lets them indicate the types of analysis that can be performed on files and delegates the actual work to them.

Integration Points

org.labkey.api.pipeline.PipelineProvider
PipelineProviders let modules hook into the Pipeline module's user interface for browsing the file system to find files on which to operate. This is always done within the context of a pipeline root for the current folder. The Pipeline module calls updateFileProperties() on each registered PipelineProvider to determine which actions should be available. Each provider supplies its own URL, which can collect additional information from the user before kicking off any work that needs to be done.

For example, the org.labkey.api.exp.ExperimentPipelineProvider registered by the Experiment module provides actions associated with .xar and .xar.xml files. It also provides a URL that the Pipeline module associates with those actions. If the user clicks to load a XAR, the browser goes to the Experiment module's URL.
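As a sketch, a provider might look like the following. The class, module name, and file extension are hypothetical, and the updateFileProperties() signature and the createActionId()/addAction() helpers are assumptions modeled on existing providers such as the ExperimentPipelineProvider just mentioned; check them against the PipelineProvider source for your server version.

import java.io.File;

import org.labkey.api.module.Module;
import org.labkey.api.pipeline.PipeRoot;
import org.labkey.api.pipeline.PipelineDirectory;
import org.labkey.api.pipeline.PipelineProvider;
import org.labkey.api.view.ActionURL;
import org.labkey.api.view.ViewContext;

// Hypothetical provider offering an "Analyze FASTA" action for .fasta files.
public class FastaPipelineProvider extends PipelineProvider
{
    public FastaPipelineProvider(Module owningModule)
    {
        super("FastaAnalysis", owningModule);
    }

    @Override
    public void updateFileProperties(ViewContext context, PipeRoot pr, PipelineDirectory directory, boolean includeAll)
    {
        // Offer the action for each .fasta file in the directory being browsed.
        File[] fastaFiles = directory.listFiles(f -> f.getName().endsWith(".fasta"));

        // Clicking the action sends the browser to this module's URL, which can
        // collect additional input before queueing a job. (Assumed helpers.)
        ActionURL url = new ActionURL("mymodule", "analyzeFasta", context.getContainer());
        String actionId = createActionId(getClass(), "Analyze FASTA");
        addAction(actionId, url, "Analyze FASTA", directory, fastaFiles, true, false, includeAll);
    }
}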

PipelineProviders are registered by calling org.labkey.api.pipeline.PipelineService.registerPipelineProvider().
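For example, a module might register its provider at startup (a sketch; FastaPipelineProvider is the hypothetical provider above, and the exact startup hook depends on the module base class you extend):

import org.labkey.api.module.DefaultModule;
import org.labkey.api.module.ModuleContext;
import org.labkey.api.pipeline.PipelineService;

// Other required Module overrides omitted.
public class MyModule extends DefaultModule
{
    @Override
    protected void doStartup(ModuleContext moduleContext)
    {
        // Make this module's provider available to the Pipeline module's UI.
        PipelineService.get().registerPipelineProvider(new FastaPipelineProvider(this));
    }
}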

org.labkey.api.pipeline.PipelineJob
PipelineJobs allow modules to do work relating to a particular piece of analysis. PipelineJobs sit in a queue until the Pipeline module determines that it is their turn to run. The Pipeline module then calls the PipelineJob's run() method. The PipelineJob base class provides logging and status functionality so that implementations can inform the user of their progress.
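A minimal job implementation might look like the sketch below. The class is hypothetical; the constructor arguments, setLogFile(), the info()/error() logging helpers, and the TaskStatus values are assumptions based on the PipelineJob base class and may differ by release.

import java.io.File;

import org.labkey.api.pipeline.PipeRoot;
import org.labkey.api.pipeline.PipelineJob;
import org.labkey.api.util.URLHelper;
import org.labkey.api.view.ViewBackgroundInfo;

// Hypothetical job that analyzes a single FASTA file.
public class FastaAnalysisJob extends PipelineJob
{
    private final File _fastaFile;

    public FastaAnalysisJob(ViewBackgroundInfo info, PipeRoot root, File fastaFile)
    {
        super("FastaAnalysis", info, root);
        _fastaFile = fastaFile;
        // Progress and errors are written to a log file next to the input.
        setLogFile(new File(fastaFile.getParentFile(), fastaFile.getName() + ".log"));
    }

    @Override
    public void run()
    {
        setStatus(TaskStatus.running);
        info("Starting analysis of " + _fastaFile.getName());
        try
        {
            // ... perform the actual analysis here ...
            setStatus(TaskStatus.complete);
        }
        catch (Exception e)
        {
            error("Analysis failed", e);
        }
    }

    @Override
    public String getDescription()
    {
        return "FASTA analysis of " + _fastaFile.getName();
    }

    @Override
    public URLHelper getStatusHref()
    {
        return null;    // no details page for this sketch
    }
}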

The Pipeline module attempts to serialize the PipelineJob object when it is submitted to the queue. If the server is restarted while there are jobs in the queue, the Pipeline module will look for all the jobs that were not in the COMPLETE or ERROR state, deserialize the PipelineJob objects from disk, and resubmit them to the queue. A PipelineJob implementation is responsible for restarting correctly if it is interrupted in the middle of processing. This might involve resuming analysis at the point it was interrupted, or deleting a partially loaded file from the database before starting to load it again.
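For instance, a job that loads a file into the database might make run() restart-safe like this (a fragment of the hypothetical job above; the three helpers are hypothetical):

@Override
public void run()
{
    // If a previous attempt was interrupted mid-load, remove whatever it
    // inserted before loading again, so a resubmitted job cannot create
    // duplicate rows.
    if (partialLoadExists())    // hypothetical helper
    {
        info("Deleting partially loaded data from an interrupted run");
        deletePartialLoad();    // hypothetical helper
    }
    loadFile();                 // hypothetical helper that does the real work
    setStatus(TaskStatus.complete);
}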

For example, the org.labkey.api.exp.ExperimentPipelineJob provided by the Experiment module knows how to parse and load a XAR file. If the input file is not a valid XAR, it will put the job into an error state and write the reason to the log file.

PipelineJobs do not need to be explicitly registered with the Pipeline module. Other modules can add jobs to the queue using the org.labkey.api.pipeline.PipelineService.queueJob() method.
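A controller action that has finished collecting input from the user might queue the hypothetical job from above like this (a sketch; findPipelineRoot() and getViewBackgroundInfo() are assumed from the LabKey controller API, and queueJob() may throw a validation exception in recent releases):

// Inside a LabKey controller action, after gathering the user's input:
PipelineService service = PipelineService.get();
PipeRoot root = service.findPipelineRoot(getContainer());
ViewBackgroundInfo info = getViewBackgroundInfo();
service.queueJob(new FastaAnalysisJob(info, root, fastaFile));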

Pipeline Serialization using Jackson

Pipeline jobs are serialized to JSON using Jackson.

To ensure that a pipeline job serializes properly, its class needs one of the following:

  • a default (no-argument) constructor, if no member fields are final, or
  • a constructor annotated with @JsonCreator, with a parameter for each final field annotated with @JsonProperty("<field name>").

If a member field is itself a class the developer has created, that class may also need a constructor meeting one of the two requirements above.
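For example, a hypothetical settings class held as a member field of a job could satisfy the second option like this:

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

// Hypothetical class held as a member field of a pipeline job. Because
// _threshold is final, Jackson needs an annotated constructor to set it.
public class AnalysisSettings
{
    private final double _threshold;
    private String _outputPrefix;   // non-final, so no annotation is needed

    @JsonCreator
    public AnalysisSettings(@JsonProperty("_threshold") double threshold)
    {
        _threshold = threshold;
    }

    public double getThreshold() { return _threshold; }
    public String getOutputPrefix() { return _outputPrefix; }
    public void setOutputPrefix(String outputPrefix) { _outputPrefix = outputPrefix; }
}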

Developers should generally avoid member fields that are maps with non-String keys. If such a field is needed, it can be annotated as follows:

// Custom key (de)serializers let Jackson round-trip a map whose keys
// are not simple Strings.
@JsonSerialize(keyUsing = ObjectKeySerialization.Serializer.class)
@JsonDeserialize(keyUsing = ObjectKeySerialization.Deserializer.class)
private Map<PropertyDescriptor, Object> _propMap;

Developers should avoid non-static inner classes and circular references.
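For example (a continuation of the hypothetical job above, other members omitted):

public class FastaAnalysisJob extends PipelineJob
{
    // Safe: a static nested class has no hidden reference to the enclosing
    // job, so Jackson can round-trip it like a top-level class.
    public static class RunStats
    {
        public int sequencesProcessed;
    }

    // Risky: a non-static inner class would capture FastaAnalysisJob.this,
    // creating exactly the kind of circular reference to avoid.
}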

Note: Prior to release 18.3, pipeline job serialization was performed using XStream.