Thomas Mignon
03/31/2022, 11:38 AM
Anna Geller
Thomas Mignon
03/31/2022, 11:42 AM
File "/home1/datawork/semexp/workspace/prefect_graph/semaphore_template_task.py", line 26, in run
    os.path.join(workspace, f'{command}-test')
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/posixpath.py", line 76, in join
    a = os.fspath(a)
TypeError: expected str, bytes or os.PathLike object, not Parameter
Because command was a parameter passed downstream.
And here are my two files: main_flow.py, which builds the graph for a flow of flows, and a flow named purify_flow.py, which uses a template task to create the tasks inside its flow.
Here is main_flow.py :
from dask_jobqueue import PBSCluster
from prefect import Flow, task, Parameter
from prefect.tasks.prefect import create_flow_run
from prefect.executors import DaskExecutor
from prefect_graph.semaphore_template_task import SemaphoreTemplate
from prefect_graph.purify_flow import flow as purifyFlow


@task
def partition(workspace, semaphore_audience_version, day, month, year):
    SemaphoreTemplate(
        "/home/datawork-semaphore-exp/conda/conda_envs/semaphore-audience-4.4.9/lib/python3.7/bin/generate_partition",
        year + month + day + " -J input.purify.directory_basepath=/home/datawork-semaphore-exp/cache/data/common/daily/purify/",
        workspace,
        semaphore_audience_version
    ).run()


@task
def event_session(workspace, semaphore_audience_version, day, month, year):
    SemaphoreTemplate(
        "/home/datawork-semaphore-exp/conda/conda_envs/semaphore-audience-4.4.9/lib/python3.7/bin/generate_event_session",
        year + month + day + " -J input.partition.directory_basepath=/home/datawork-semaphore-exp/cache/data/common/daily/partition/",
        workspace,
        semaphore_audience_version
    ).run()


with Flow("main_flow",
          executor=DaskExecutor(
              cluster_class=PBSCluster,
              cluster_kwargs={
                  "cores": 1,
                  "memory": '120GB',
                  "n_workers": 1,
                  "queue": 'omp',
                  "resource_spec": "select=1:ncpus=56:mem=500gb",
                  "walltime": '01:00:00',
              }
          )
          ) as flow:
    workspace = Parameter("workspace")
    semaphore_audience_version = Parameter("semaphore_audience_version")
    semaphore_metadata_version = Parameter("semaphore_metadata_version")
    day = Parameter("day", default="20200101")
    month = Parameter("month", default="20200101")
    year = Parameter("year", default="20200101")
    purifyFlow.run(name='purify_flow', parameters=dict(year=year, month=month, day=day, workspace=workspace, semaphore_audience_version=semaphore_audience_version))
And here is purify_flow.py:
from dask_jobqueue import PBSCluster
from prefect.executors import DaskExecutor
from prefect import Flow, Parameter, Task
from prefect.utilities.tasks import defaults_from_attrs
from semaphore_template_task import SemaphoreTemplate


class PurifyTemplate(Task):
    def __init__(self, command_parameter, workspace, semaphore_audience_version, **kwargs):
        self.command_parameter = command_parameter
        self.workspace = workspace
        self.semaphore_audience_version = semaphore_audience_version
        super().__init__(**kwargs)

    @defaults_from_attrs("command_parameter", "workspace", "semaphore_audience_version")
    def run(self, command_parameter=None, workspace=None, semaphore_audience_version=None, **kwargs):
        SemaphoreTemplate(
            command="/home/datawork-semaphore-exp/conda/conda_envs/semaphore-audience-4.4.9/lib/python3.7/bin/purify_raw_logs",
            command_parameter=command_parameter,
            workspace=workspace,
            semaphore_audience_version=semaphore_audience_version
        ).run()


with Flow(
        "purify_flow",
        executor=DaskExecutor(
            cluster_class=PBSCluster,
            cluster_kwargs={
                "cores": 1,
                "memory": '120GB',
                "n_workers": 1,
                "queue": 'omp',
                "resource_spec": "select=1:ncpus=56:mem=500gb",
                "walltime": '05:00:00',
            }
        )
) as flow:
    workspace = Parameter("workspace")
    semaphore_audience_version = Parameter("semaphore_audience_version")
    day = Parameter("date")
    month = Parameter("month")
    year = Parameter("year")

    PurifyTemplate(
        command_parameter="/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp1/" + year + " eftp1 -f " + year + "-" + month + "-" + day + "-eftp1.gz",
        workspace=workspace,
        semaphore_audience_version=semaphore_audience_version
    ).run()
    PurifyTemplate(
        command_parameter="/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp2/" + year + " eftp2 -f " + year + "-" + month + "-" + day + "-eftp1.gz",
        workspace=workspace,
        semaphore_audience_version=semaphore_audience_version
    ).run()
    PurifyTemplate(
        command_parameter="/home/datawork-semaphore-exp/spool/input/ftp/ftp/" + year + " ftp -f access.log." + year + month + day + ".gz",
        workspace=workspace,
        semaphore_audience_version=semaphore_audience_version
    ).run()
    PurifyTemplate(
        command_parameter="/home/datawork-semaphore-exp/spool/input/ftp/ftp2/" + year + " ftp2 -f access.log." + year + month + day + ".gz",
        workspace=workspace,
        semaphore_audience_version=semaphore_audience_version
    ).run()
    PurifyTemplate(
        command_parameter="/home/datawork-semaphore-exp/spool/input/http/vpublicnew/" + year + " http_full_vpublicnew -f " + year + "-" + month + "-" + day + ".log",
        workspace=workspace,
        semaphore_audience_version=semaphore_audience_version
    ).run()
    PurifyTemplate(
        command_parameter="/home/datawork-semaphore-exp/spool/input/http/vpublicnew/" + year + " http_full_vpublicnew -f " + year + "-" + month + "-" + day + ".log",
        workspace=workspace,
        semaphore_audience_version=semaphore_audience_version
    ).run()
Anna Geller
prefect.context["parameters"].get("parameter_name")
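A minimal sketch of what this suggestion looks like inside a task's run method, assuming Prefect 1.x; the task and parameter names below are made up for illustration:
import prefect
from prefect import Flow, Parameter, Task


class ShowWorkspace(Task):
    def run(self):
        # At runtime, prefect.context["parameters"] holds the resolved values,
        # so `workspace` is a plain string here, not a Parameter object.
        workspace = prefect.context["parameters"].get("workspace")
        self.logger.info("workspace resolved to %s", workspace)
        return workspace


with Flow("context-parameter-demo") as demo_flow:
    workspace = Parameter("workspace", default="/tmp/workspace")
    # The parameter is wired in only as an upstream dependency so that it is
    # part of the flow; its value is read from context inside run().
    ShowWorkspace()(upstream_tasks=[workspace])

# demo_flow.run(parameters={"workspace": "/some/other/path"})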
Thomas Mignon
03/31/2022, 11:45 AM
Anna Geller
Thomas Mignon
03/31/2022, 11:49 AM
myparameters.value
or something like that?
Anna Geller
purifyFlow.run(name='purify_flow', parameters=dict(year=year, month=month, day=day, workspace=workspace, semaphore_audience_version=semaphore_audience_version))
if you want to trigger some other flow from this parent flow, you can use the create_flow_run task instead. This blog post explains how you could do that, and if you need some examples, check this Discourse tag.
Thomas Mignon
03/31/2022, 11:51 AM
With create_flow_run there is a request done to api.prefect.io, and where I launch my script I get an HTTP 443 error; that's why I've done this 'hack'.
Anna Geller
with Flow(PARENT_FLOW_NAME) as parent_flow:
    normal_non_subflow_task = hello_world_parent()
    first_child_flow_run_id = create_flow_run(
        flow_name=CHILD_FLOW_NAME,
        project_name=PREFECT_PROJECT_NAME,
        parameters=dict(user_input="First child flow run"),
        task_args=dict(name="First subflow"),
        upstream_tasks=[normal_non_subflow_task],
    )
The create_flow_run example above is from https://github.com/anna-geller/orchestrator-pattern
Thomas Mignon
03/31/2022, 11:54 AM
With create_flow_run I think I will face the HTTP error I was getting before 🙈
Anna Geller
purifyFlow.run(name='purify_flow', parameters=dict(year=year, month=month, day=day, workspace=workspace, semaphore_audience_version=semaphore_audience_version))
This gets evaluated at flow registration (i.e. at build time, when the computational graph is constructed), and at build time workspace is of type Parameter (a task). Only once you actually run the flow does the workspace object become an actual parameter value - does this explanation make sense?
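To make the build-time vs. run-time distinction concrete, here is a minimal sketch (not code from the thread; the task name is made up):
from prefect import Flow, Parameter, task


@task
def build_command(workspace: str, command: str) -> str:
    # This body executes at flow *run* time, so workspace is a real string here.
    return f"{workspace}/{command}-test"


with Flow("build-vs-run-time") as flow:
    workspace = Parameter("workspace")

    # Wrong: the next line would execute while the graph is being *built*,
    # so os.path.join would receive the Parameter object itself and raise
    # "expected str, bytes or os.PathLike object, not Parameter".
    # path = os.path.join(workspace, "generate_partition-test")

    # Right: pass the Parameter into a task; the engine substitutes the
    # concrete value when the flow actually runs.
    cmd = build_command(workspace, "generate_partition")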
Thomas Mignon
03/31/2022, 12:01 PM
So, create_task_run instead of doing
PurifyTemplate(
    year=year,
    month=month,
    day=day,
    command_parameter="/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp1/" + year + " eftp1 -f " + year + "-" + month + "-" + day + "-eftp1.gz",
    workspace=workspace,
    semaphore_audience_version=semaphore_audience_version
).run()
?
Anna Geller
purify_template = PurifyTemplate(
    year=year,
    month=month,
    day=day,
    command_parameter="/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp1/" + year + " eftp1 -f " + year + "-" + month + "-" + day + "-eftp1.gz",
    workspace=workspace,
    semaphore_audience_version=semaphore_audience_version
)

with Flow("yourflowname") as flow:
    purify_template()
Thomas Mignon
03/31/2022, 12:26 PM
Anna Geller
with Flow("yourflowname") as flow:
PurifyTemplate(init_args)(run_method_args)
Does it make things clearer?Thomas Mignon
03/31/2022, 12:37 PM
with Flow("yourflowname") as flow:
    PurifyTemplate(init_args)(run_method_args)
Here PurifyTemplate is initialized inside the flow, but you were telling me to initialize it outside - I'm a bit lost 🙂
Anna Geller
with Flow("yourflowname") as flow:
PurifyTemplate(init_args)(run_method_args)
is the same as:
purify_template = PurifyTemplate(init_args)
with Flow("yourflowname") as flow:
purify_template(run_method_args)
and run_method_args can receive data dependencies from upstream tasksThomas Mignon
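A minimal sketch of the two equivalent spellings, using a stripped-down Task so the init-args/run-args split is visible (the class and names are illustrative, not the real PurifyTemplate):
from prefect import Flow, Parameter, Task


class EchoTask(Task):
    def __init__(self, prefix, **kwargs):
        # Init args are fixed when the task object is created.
        self.prefix = prefix
        super().__init__(**kwargs)

    def run(self, value=None):
        # Run args are resolved by the engine at flow run time, so they can be
        # Parameters or the outputs of upstream tasks.
        return f"{self.prefix}: {value}"


# Spelling 1: initialize outside, call inside the flow.
echo = EchoTask(prefix="workspace")
with Flow("init-vs-run-args") as flow_a:
    value = Parameter("value")
    echo(value=value)

# Spelling 2: initialize and call in a single expression inside the flow.
with Flow("init-vs-run-args-inline") as flow_b:
    value = Parameter("value")
    EchoTask(prefix="workspace")(value=value)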
Thomas Mignon
03/31/2022, 12:42 PM
So I should change from this:
class PurifyTemplate(Task):
    def __init__(self, command_parameter, workspace, semaphore_audience_version, **kwargs):
        self.command_parameter = command_parameter
        self.workspace = workspace
        self.semaphore_audience_version = semaphore_audience_version
        super().__init__(**kwargs)

    @defaults_from_attrs("command_parameter", "workspace", "semaphore_audience_version")
    def run(self, command_parameter=None, workspace=None, semaphore_audience_version=None, **kwargs):
        SemaphoreTemplate(
            command="/home/datawork-semaphore-exp/conda/conda_envs/semaphore-audience-4.4.9/lib/python3.7/bin/purify_raw_logs",
            command_parameter=command_parameter,
            workspace=workspace,
            semaphore_audience_version=semaphore_audience_version
        ).run()
to this:
class PurifyTemplate(Task):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def run(self, command_parameter=None, workspace=None, semaphore_audience_version=None, **kwargs):
        SemaphoreTemplate(
            command="/home/datawork-semaphore-exp/conda/conda_envs/semaphore-audience-4.4.9/lib/python3.7/bin/purify_raw_logs",
            command_parameter=command_parameter,
            workspace=workspace,
            semaphore_audience_version=semaphore_audience_version
        ).run()
?
Anna Geller
class PurifyTemplate(Task):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def run(self, command_parameter=None, workspace=None, semaphore_audience_version=None, **kwargs):
        SemaphoreTemplate(
            command="/home/datawork-semaphore-exp/conda/conda_envs/semaphore-audience-4.4.9/lib/python3.7/bin/purify_raw_logs",
            command_parameter=command_parameter,
            workspace=workspace,
            semaphore_audience_version=semaphore_audience_version
        )
Kevin Kho
defaults_from_attrs
Anna Geller
@task
def purify_template(command_parameter=None, workspace=None, semaphore_audience_version=None):
    your_template_class = SemaphoreTemplate(
        command="/home/datawork-semaphore-exp/conda/conda_envs/semaphore-audience-4.4.9/lib/python3.7/bin/purify_raw_logs",
        command_parameter=command_parameter,
        workspace=workspace,
        semaphore_audience_version=semaphore_audience_version
    )
    your_template_class.some_method_doing_sth()
Thomas Mignon
03/31/2022, 12:49 PM
class SemaphoreTemplate(Task):
    def __init__(self, command, command_parameter, workspace, semaphore_audience_version, **kwargs):
        self.command = command
        self.command_parameter = command_parameter
        self.workspace = workspace
        self.semaphore_audience_version = semaphore_audience_version
        super().__init__(**kwargs)

    @defaults_from_attrs("command", "command_parameter", "workspace", "semaphore_audience_version")
    def run(self, command=None, command_parameter=None, workspace=None, semaphore_audience_version=None, **kwargs):
        print('os.environ')
        print(os.environ)
        workspace_path: Path = Path(
            os.path.join(workspace, f'{command}-{os.environ["PBS_JOBID"]}')
        )
        template_runner.TemplateRunner(
            command=command,
            version=semaphore_audience_version,
            command_parameter=command_parameter,
            workspace=workspace_path,
            skip_signal=True
        ).run(exit=False)
Anna Geller
@task is the preferred way of defining tasks and the only one supported in Prefect 2.0. If you convert SemaphoreTemplate(Task) to functions, this will make it much easier to reason about and debug.
Thomas Mignon
03/31/2022, 12:54 PM
From prefect.utilities.tasks? Or prefect.core.task?
And here is what I get with create_flow_run:
[2022-03-31 13:39:19+0000] INFO - prefect.TaskRunner | Task 'create_flow_run': Starting task run...
[2022-03-31 13:40:21+0000] ERROR - prefect.TaskRunner | Task 'create_flow_run': Exception encountered during task execution!
Traceback (most recent call last):
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/urllib3/connection.py", line 174, in _new_conn
conn = connection.create_connection(
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/urllib3/util/connection.py", line 95, in create_connection
raise err
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/urllib3/util/connection.py", line 85, in create_connection
sock.connect(sa)
OSError: [Errno 101] Network is unreachable
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/urllib3/connectionpool.py", line 703, in urlopen
httplib_response = self._make_request(
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/urllib3/connectionpool.py", line 386, in _make_request
self._validate_conn(conn)
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/urllib3/connectionpool.py", line 1040, in _validate_conn
conn.connect()
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/urllib3/connection.py", line 358, in connect
conn = self._new_conn()
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/urllib3/connection.py", line 186, in _new_conn
raise NewConnectionError(
urllib3.exceptions.NewConnectionError: <urllib3.connection.HTTPSConnection object at 0x2aaaaf59de50>: Failed to establish a new connection: [Errno 101] Network is unreachable
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/requests/adapters.py", line 440, in send
resp = conn.urlopen(
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/urllib3/connectionpool.py", line 813, in urlopen
return self.urlopen(
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/urllib3/connectionpool.py", line 813, in urlopen
return self.urlopen(
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/urllib3/connectionpool.py", line 813, in urlopen
return self.urlopen(
[Previous line repeated 3 more times]
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/urllib3/connectionpool.py", line 785, in urlopen
retries = retries.increment(
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/urllib3/util/retry.py", line 592, in increment
raise MaxRetryError(_pool, url, error or ResponseError(cause))
urllib3.exceptions.MaxRetryError: HTTPSConnectionPool(host='api.prefect.io', port=443): Max retries exceeded with url: / (Caused by NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x2aaaaf59de50>: Failed to establish a new connection: [Errno 101] Network is unreachable'))
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
value = prefect.utilities.executors.run_task_with_timeout(
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/tasks/prefect/flow_run.py", line 131, in create_flow_run
flow = FlowView.from_flow_name(flow_name, project_name=project_name)
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/backend/flow.py", line 205, in from_flow_name
flows = cls._query_for_flows(
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/backend/flow.py", line 289, in _query_for_flows
result = client.graphql(flow_query)
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/client/client.py", line 452, in graphql
result = self.post(
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/client/client.py", line 407, in post
response = self._request(
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/client/client.py", line 641, in _request
response = self._send_request(
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/client/client.py", line 506, in _send_request
response = session.post(
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/requests/sessions.py", line 577, in post
return self.request('POST', url, data=data, json=json, **kwargs)
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/requests/sessions.py", line 529, in request
resp = self.send(prep, **send_kwargs)
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/requests/sessions.py", line 645, in send
r = adapter.send(request, **kwargs)
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/requests/adapters.py", line 519, in send
raise ConnectionError(e, request=request)
requests.exceptions.ConnectionError: HTTPSConnectionPool(host='api.prefect.io', port=443): Max retries exceeded with url: / (Caused by NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x2aaaaf59de50>: Failed to establish a new connection: [Errno 101] Network is unreachable'))
[2022-03-31 13:40:21+0000] INFO - prefect.TaskRunner | Task 'create_flow_run': Finished task run for task with final state: 'Failed'
distributed.dask_worker - INFO - Exiting on signal 15
My machine doesn't have HTTP access; it only talks to the Dask worker over TCP.
Anna Geller
Thomas Mignon
03/31/2022, 1:45 PM
If I could use create_flow_run without any HTTP request, I would take that solution 🙂
Anna Geller
Thomas Mignon
03/31/2022, 2:03 PM
Anna Geller
from prefect import flow, task
import time


@flow
def subflow_1():
    print("Subflow 1 started!")
    time.sleep(3)
    return "Hello from subflow!"


@flow
def subflow_2():
    print("Subflow 2 started!")
    time.sleep(3)
    return "Hello from the second subflow!"


@task
def normal_task():
    print("A normal task")


@flow
def main_flow():
    state_subflow_1 = subflow_1()
    state_subflow_2 = subflow_2()
    normal_task(wait_for=[state_subflow_1, state_subflow_2])


if __name__ == "__main__":
    main_flow_state = main_flow()
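Since Thomas's subflows need parameters, a small hedged extension of the sketch above: in Prefect 2.0 a subflow call is an ordinary function call, so the parent's parameters are passed as plain arguments (all names below are illustrative):
from prefect import flow, task


@task
def report(msg: str):
    print(msg)


@flow
def purify(workspace: str, day: str, month: str, year: str):
    report(f"purifying {year}-{month}-{day} into {workspace}")


@flow
def main(workspace: str, day: str = "01", month: str = "01", year: str = "2022"):
    # Calling the subflow is an ordinary function call, so the parent's
    # parameters flow straight through as arguments.
    purify(workspace=workspace, day=day, month=month, year=year)


if __name__ == "__main__":
    main(workspace="/tmp/workspace")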
Thomas Mignon
04/01/2022, 11:31 AM
with Flow("main_flow",
          executor=DaskExecutor(
              cluster_class=PBSCluster,
              cluster_kwargs={
                  "cores": 1,
                  "memory": '120GB',
                  "n_workers": 1,
                  "queue": 'omp',
                  "resource_spec": "select=1:ncpus=56:mem=500gb",
                  "walltime": '01:00:00',
              }
          )
          ) as flow:
I was launching this with:
prefect run \
    --path /home1/datawork/semexp/workspace/prefect-graph/prefect_graph/main_flow.py \
    --param workspace="/home1/datawork/semexp/workspace/dagit/running/" \
    --param semaphore_audience_version="3.4.9" \
    --param day=01 \
    --param month=01 \
    --param year=2022 \
    --watch
but prefect run doesn't exist anymore.
Kevin Kho
I think it's prefect deployment run. Let me confirm - I don't think there is an equivalent of prefect run yet.
Thomas Mignon
04/04/2022, 6:21 AM
I don't see how to launch my flow with parameters via prefect deployment run, as I was doing previously in v1 with prefect run. Do you have a trick to launch a flow with the CLI? 🙂
It seems I need prefect deployment create before prefect deployment run.
Here is the deployment.yaml file I created in order to use `prefect deployment create ./deployment.yaml`:
- name: "purify-flow"
flow_location: "./main_flow.py"
- name: "main-flow"
flow_location: "./main_flow.py"
And I got this error:
│ 114 │ def load_flow(self): │
│ 115 │ │ if self.flow_location and not self.flow: │
│ ❱ 116 │ │ │ self.flow = load_flow_from_script(self.flow_location, self.flow_name) │
│ 117 │ │ │ if not self.flow_name: │
│ 118 │ │ │ │ self.flow_name = self.flow.name │
│ 119 │
│ │
│ /home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/deploy │
│ ments.py:199 in load_flow_from_script │
│ │
│ 196 │ │ ) │
│ 197 │ │
│ 198 │ elif not flow_name and len(flows) > 1: │
│ ❱ 199 │ │ raise UnspecifiedFlowError( │
│ 200 │ │ │ f"Found {len(flows)} flows at {script_path!r}: {listrepr(flows.keys())}. " │
│ 201 │ │ │ "Specify a flow name to select a flow to deploy.", │
│ 202 │ │ ) │
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
UnspecifiedFlowError: Found 2 flows at '/home1/datawork/semexp/workspace/prefect-graph/prefect_graph/main_flow.py': 'purify-flow' 'main-flow'. Specify a flow name to select a flow to deploy.
So I have to specify a flow for prefect deployment create ./deployment.yaml, but I don't see any option like --name that would look like: prefect deployment create ./deployment.yaml --name purify-flow.
When I look in the code, adding flow_name: in deployment.yaml seems to be the better option.
But my question remains: how do I pass params to my flow via prefect deployment create, or afterwards with prefect deployment run?
Anna Geller
You can run it with python yourflow.py, use create_flow_run_from_deployment, or the REST API. Being able to do it from the UI is on the roadmap https://github.com/PrefectHQ/prefect/issues/5617 - the CLI will likely follow.
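For the client route Anna mentions, a hedged sketch of how create_flow_run_from_deployment might be called in this early 2.0 version; the deployment name, the read_deployment_by_name lookup, and the exact keyword arguments are assumptions to verify against the installed release:
import asyncio

from prefect.client import get_client


async def trigger_main_flow(parameters: dict):
    async with get_client() as client:
        # Assumed lookup by "<flow name>/<deployment name>"; adjust to the
        # deployment you actually created.
        deployment = await client.read_deployment_by_name("main-flow/main-flow")
        # Assumed signature: deployment id plus an optional parameters dict.
        return await client.create_flow_run_from_deployment(
            deployment.id, parameters=parameters
        )


if __name__ == "__main__":
    asyncio.run(trigger_main_flow({"workspace": "/tmp/workspace", "year": "2022"}))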
Thomas Mignon
04/04/2022, 9:37 AM
So I could do something like:
python /home1/datawork/semexp/workspace/prefect-graph/prefect_graph/main_flow.py --param workspace="/home1/datawork/semexp/workspace/dagit/running/" --param semaphore_audience_version="3.4.9" --param day=01 --param month=01 --param year=2022 --watch
?
Anna Geller
Thomas Mignon
04/04/2022, 9:39 AM
create_flow_run_from_deployment - I will not need it if I can launch the flow via the python command, or did I misunderstand?
Anna Geller
from prefect import flow, task
import sys


@task
def do_something_with_path(path: str):
    filename = f"{path}/some_data.txt"
    data = "Hello world from Prefect 2.0!"
    with open(filename, "w") as file:
        file.write(data)


@flow
def parametrized_flow_with_path(path: str):
    do_something_with_path(path)


if __name__ == "__main__":
    path_val = sys.argv[1]
    parametrized_flow_with_path(path_val)
python parametrized_path_flow.py "/Users/anna/repos/orion-flows/00_prefect_2_0"
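Adapted to Thomas's flow with its five parameters, the same pattern might look roughly like this (argparse instead of raw sys.argv; the flow body is elided and all names are only illustrative):
import argparse

from prefect import flow


@flow
def main_flow(workspace: str, semaphore_audience_version: str,
              day: str, month: str, year: str):
    ...  # partition / event_session tasks would be called here


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--workspace", required=True)
    parser.add_argument("--semaphore-audience-version", required=True)
    parser.add_argument("--day", default="01")
    parser.add_argument("--month", default="01")
    parser.add_argument("--year", default="2022")
    args = parser.parse_args()

    main_flow(
        workspace=args.workspace,
        semaphore_audience_version=args.semaphore_audience_version,
        day=args.day,
        month=args.month,
        year=args.year,
    )
Invoked as, for example, python main_flow.py --workspace /tmp/ws --semaphore-audience-version 3.4.9 --day 01 --month 01 --year 2022; there is no --watch equivalent here, the script simply blocks until the flow run finishes.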
Thomas Mignon
04/04/2022, 11:35 AM
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect_graph/purify_flow.py", line 9, in purify_template
    semaphore_template(
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/tasks.py", line 356, in __call__
    return enter_task_run_engine(
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/engine.py", line 505, in enter_task_run_engine
    raise RuntimeError("Tasks cannot be called outside of a flow.")
and my task is called inside the flow 😅
@task
def purify_template(command_parameter=None, workspace=None, semaphore_audience_version=None):
    semaphore_template(
        command="/home/datawork-semaphore-exp/conda/conda_envs/semaphore-audience-4.4.9/lib/python3.7/bin/purify_raw_logs",
        command_parameter=command_parameter,
        workspace=workspace,
        semaphore_audience_version=semaphore_audience_version
    )


@flow(task_runner=DaskTaskRunner(
    cluster_class="dask_jobqueue.PBSCluster",
    cluster_kwargs={
        "cores": 1,
        "memory": '120GB',
        "n_workers": 1,
        "queue": 'omp',
        "resource_spec": "select=1:ncpus=56:mem=500gb",
        "walltime": '01:00:00',
    }
))
def purify_flow(workspace: str, semaphore_audience_version: str, day: str, month: str, year: str):
    purify_template(
        command_parameter="/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp1/" + year + " eftp1 -f " + year + "-" + month + "-" + day + "-eftp1.gz",
        workspace=workspace,
        semaphore_audience_version=semaphore_audience_version
    )
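The traceback above points at the likely cause: purify_template (a task) calls semaphore_template (another task), and in this Prefect 2.0 beta a task may only be called from inside a flow. A minimal restructuring sketch, assuming the template runner can live in a plain helper function (the print call stands in for the real TemplateRunner):
from prefect import flow, task


def run_semaphore_template(command, command_parameter, workspace, version):
    # Plain helper, not a task, so it may be called from inside a task.
    print(f"{command} {command_parameter} (workspace={workspace}, version={version})")


@task
def purify_template(command_parameter, workspace, semaphore_audience_version):
    run_semaphore_template(
        command="purify_raw_logs",
        command_parameter=command_parameter,
        workspace=workspace,
        version=semaphore_audience_version,
    )


@flow
def purify_flow(workspace: str, semaphore_audience_version: str,
                day: str, month: str, year: str):
    purify_template(
        command_parameter=f"{year}-{month}-{day}-eftp1.gz",
        workspace=workspace,
        semaphore_audience_version=semaphore_audience_version,
    )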
Anna Geller
@task
Thomas Mignon
04/04/2022, 12:13 PM
@task
def semaphore_template(command=None, command_parameter=None, workspace=None, semaphore_audience_version=None):
    workspace_path: Path = Path(
        os.path.join(workspace, f'{command}-{os.environ["PBS_JOBID"]}')
    )
    template_runner.TemplateRunner(
        command=command,
        version=semaphore_audience_version,
        command_parameter=command_parameter,
        workspace=workspace_path,
        skip_signal=True
    ).run(exit=False)
It is already a function decorated with @task
Anna Geller
Thomas Mignon
04/04/2022, 12:35 PM