How-to create a new ROMI taskLink
We hereafter details how you can make your own algorithms available to the ROMI reconstruction and analysis pipeline by creating a task and registering it as an available module.
For the sake of clarity and to illustrate how-to create a ROMI task from scratch, in this guide we will assume you want to add something quite different from what is already there.
Important
ROMI task usually have a semantic meaning and for example, the task AnglesAndInternodes
may take several types of object in input (mesh, point-cloud & skeletons) but always output the JSON file with the obtained measures.
So, to decide if you have to create a new task or add your algorithm to an existing task, following this rule should help: at a given step of the pipeline, if the output change, this is a NEW task!
Add your algorithm to plant3dvision
Link
You first have to add a file (or append to an existing one), e.g. named algo.py
, under the plant-3d-vision/plant3dvision
directory.
Let's assume the previously added file has a main function called my_algo
like this:
def my_algo(data, *params, **kwargs):
# Do something to data with given parameters to return transformed data `out_data`
return out_data, error
It has:
- data input(s) (e.g. images, point clouds, meshes, ...) that will often be the output of a previous task in the pipeline
- parameter(s), specific to the algorithm you want to add
- output(s), the transformed dataset that will often be the input of a following task in the pipeline
Create a ROMI taskLink
Dependency to luigi
Link
We use luigi
to manage the pipeline execution and handle requirements & tasks dependencies.
To create a task you will thus have to create a new Python class MyTask
inheriting from the RomiTask
class and creates a few methods and at least a run
method used by luigi
.
Dependency to plantdb
Link
To manage the files, inputs and outputs, we use the plantdb
package implementing a local file system database written in pure python.
It provides classes and methods that simplifies and normalize the creation and use of the tasks outputs and inputs.
New RomiTask
templateLink
You will create a new python file my_task.py
in the tasks
submodule: plant-3d-vision/plant3dvision/tasks/my_task.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Briefly describe your module here.
"""
import luigi
from plantdb import RomiTask
from plantdb import io
from plant3dvision.log import logger # Use this as logging method
from plant3dvision.tasks.proc3d import SegmentedPointCloud
# Now import your main method:
from plant3dvision.algo import my_algo
def MyTask(RomiTask):
"""My algorithm is the best!
Attributes
----------
upstream_task : luigi.TaskParameter
Upstream task that will provides the data to your algorithm, here `SegmentedPointCloud`.
param1 : luigi.FloatParameter
An example float parameter parsed from the TOML config file.
Set to `2.0` by default.
param2 : luigi.IntParameter
An example float parameter parsed from the TOML config file.
Set to `5` by default.
log : luigi.BoolParameter
An example boolean parameter.
"""
# No need to write an `__init__` section, declare your class attributes as task parameters:
upstream_task = luigi.TaskParameter(default=SegmentedPointCloud)
param1 = luigi.FloatParameter(default=2.0)
param2 = luigi.IntParameter(default=5)
log = luigi.BoolParameter(default=False)
def requires(self):
"""Used by luigi to check you task dependencies."""
# By default a RomiTask requires a luigi.TaskParameter called `upstream_task`.
# So no need to declare this method if you don't requires more than one upstream task!
# Else you can override with something like (should be of type `luigi.TaskParameter`!):
#return [self.upstream_task1(), self.upstream_task1()]
pass
def run(self):
"""Called by luigi, it will run your algorithm.
Usually consist of 3 steps:
1. Get the input(s) data from the previous task, eg. images or point clouds
2. Run you algorithm on input data
3. Save the result(s) of your method, eg. as a JSON file
Notes
-----
The parameters for your algorithms have been declared at class instantiation!
"""
# -1- Get the input(s) data from the previous task
# To access the single file output of the upstream task use:
uptask_input_file = self.input_file()
# Read it with the proper reader, here a point-cloud reader (SegmentedPointCloud):
in_data = io.read_point_cloud(uptask_input_file)
# -2- Run you algorithm on input data
out_data, error = my_algo(in_data)
# Use example for boolean parameter & logger with 'info' level
if self.log:
logger.info("My task ran perfectly!")
# -3- Write a single output (eg. a JSON file)...
# Create the output `File` object
task_output_file = self.output_file()
# Write a JSON file with your method results
io.write_json(task_output_file, out_data)
# Add metadata to your file, eg. some error measure you don't want to include in the main output file:
task_output_file.set_metadata("my_error", error)
The corresponding TOML configuration file (my_pipeline.toml
) controlling your task behaviour would look like this:
[MyTask]
upstream_task='SegmentedPointCloud'
param1=6.0
param2=3
log=true
Note
You may need to add methods to read and write data, this should be done in the plantdb
library using the plantdb/plantdb/io.py
file!
Multiple I/O for a taskLink
Your method (or the upstream task) may produce a set of object you want to save as separates files. In such case, use Filset
objects.
For example to output multiple JSON files:
list_of_jsonifyable = [...]
task_output_fs = self.output().get()
for i, json_data in enumerate(list_of_jsonifyable):
f = task_output_fs.create_file(f"my_json_{i}") # no extension!
io.write_json(f, json_data)
# Add some metadata to this `File` object
f.set_metadata("foo", f"bar{i}")
Test your taskLink
You should now be able to test your newly created task MyTask
with romi_run_task
:
romi_run_task MyTask /path/to/dataset --config /path/to/my_pipeline.toml --module plant3dvision.tasks.my_algo
Using the --module
option you can test your task without registering it.
Register your taskLink
Once you are satisfied, you can add it to romitask/modules.py
by referring to the task class name & its python module location:
MODULES = {
# ...
"MyTask": "plant3dvision.tasks.my_algo",
# ...
}
Use your newly created taskLink
Finally, you should now be able to use your newly created task MyTask
with romi_run_task
:
romi_run_task MyTask /path/to/dataset --config /path/to/my_pipeline.toml
Warning
Use of absolute path is highly recommended as you may experience some difficulties from luigi
otherwise!