Hi guys, I'm currently facing an issue with Parame...
# ask-community
t
Hi guys, I'm currently facing an issue with Parameter: how can I get the value of a Parameter?
a
could you move the code block below to this thread here?
or rather all blocks? 😄 this helps us keep the main channel cleaner and makes it easier to follow for others reading this
t
Here is my error :
Copy code
File "/home1/datawork/semexp/workspace/prefect_graph/semaphore_template_task.py", line 26, in run
    os.path.join(workspace, f'{command}-test')
    File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/posixpath.py", line 76, in join
    a = os.fspath(a)
  TypeError: expected str, bytes or os.PathLike object, not Parameter
Because
command
was a Parameter passed downstream. And here are my 2 files: main_flow.py, which creates the graph of a flow of flows, and a flow named purify_flow.py, which uses a template task to create tasks inside its flow. Here is main_flow.py:
Copy code
from dask_jobqueue import PBSCluster
from prefect import Flow, task, Parameter
from prefect.tasks.prefect import create_flow_run
from prefect.executors import DaskExecutor

from prefect_graph.semaphore_template_task import SemaphoreTemplate
from prefect_graph.purify_flow import flow as purifyFlow

@task
def partition(workspace, semaphore_audience_version, day, month, year):
    SemaphoreTemplate(
        "/home/datawork-semaphore-exp/conda/conda_envs/semaphore-audience-4.4.9/lib/python3.7/bin/generate_partition",
        year + month + day + " -J input.purify.directory_basepath=/home/datawork-semaphore-exp/cache/data/common/daily/purify/",
        workspace,
        semaphore_audience_version
    ).run()


@task
def event_session(workspace, semaphore_audience_version, day, month, year):
    SemaphoreTemplate(
        "/home/datawork-semaphore-exp/conda/conda_envs/semaphore-audience-4.4.9/lib/python3.7/bin/generate_event_session",
        year + month + day + " -J input.partition.directory_basepath=/home/datawork-semaphore-exp/cache/data/common/daily/partition/",
        workspace,
        semaphore_audience_version
    ).run()

with Flow("main_flow",
              executor=DaskExecutor(
                  cluster_class=PBSCluster,
                  cluster_kwargs={
                      "cores": 1,
                      "memory": '120GB',
                      "n_workers": 1,
                      "queue": 'omp',
                      "resource_spec": "select=1:ncpus=56:mem=500gb",
                      "walltime": '01:00:00',

                  }
              )
          ) as flow:

    workspace = Parameter("workspace")
    semaphore_audience_version = Parameter("semaphore_audience_version")
    semaphore_metadata_version = Parameter("semaphore_metadata_version")
    day = Parameter("day", default="20200101")
    month = Parameter("month", default="20200101")
    year = Parameter("year", default="20200101")

    purifyFlow.run(name='purify_flow', parameters=dict(year=year, month=month, day=day, workspace=workspace, semaphore_audience_version=semaphore_audience_version))
and here is purify_flow.py:
Copy code
from dask_jobqueue import PBSCluster
from prefect.executors import DaskExecutor
from prefect import Flow, Parameter, Task
from prefect.utilities.tasks import defaults_from_attrs

from semaphore_template_task import SemaphoreTemplate


class PurifyTemplate(Task):
    def __init__(self, command_parameter, workspace, semaphore_audience_version, **kwargs):
        self.command_parameter = command_parameter
        self.workspace = workspace
        self.semaphore_audience_version = semaphore_audience_version

        super().__init__(**kwargs)

    @defaults_from_attrs("command_parameter", "workspace", "semaphore_audience_version")
    def run(self, command_parameter=None, workspace=None, semaphore_audience_version=None, **kwargs):
        SemaphoreTemplate(
            command="/home/datawork-semaphore-exp/conda/conda_envs/semaphore-audience-4.4.9/lib/python3.7/bin/purify_raw_logs",
            command_parameter=command_parameter,
            workspace=workspace,
            semaphore_audience_version=semaphore_audience_version
        ).run()


with Flow(
    "purify_flow",
        executor=DaskExecutor(
            cluster_class=PBSCluster,
            cluster_kwargs={
                "cores": 1,
                "memory": '120GB',
                "n_workers": 1,
                "queue": 'omp',
                "resource_spec": "select=1:ncpus=56:mem=500gb",
                "walltime": '05:00:00',

            }
        )
) as flow:
    workspace = Parameter("workspace")
    semaphore_audience_version = Parameter("semaphore_audience_version")
    day = Parameter("date")
    month = Parameter("month")
    year = Parameter("year")

    PurifyTemplate(
        command_parameter="/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp1/" + year + " eftp1 -f " + year + "-" + month + "-" + day + "-eftp1.gz",
        workspace=workspace,
        semaphore_audience_version=semaphore_audience_version
        ).run()

    PurifyTemplate(
        command_parameter="/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp2/" + year + " eftp2 -f " + year + "-" + month + "-" + day + "-eftp1.gz",
        workspace=workspace,
        semaphore_audience_version=semaphore_audience_version
    ).run()

    PurifyTemplate(
        command_parameter="/home/datawork-semaphore-exp/spool/input/ftp/ftp/" + year + " ftp -f access.log." + year + month + day + ".gz",
        workspace=workspace,
        semaphore_audience_version=semaphore_audience_version
    ).run()

    PurifyTemplate(
        command_parameter="/home/datawork-semaphore-exp/spool/input/ftp/ftp2/" + year + " ftp2 -f access.log." + year + month + day + ".gz",
        workspace=workspace,
        semaphore_audience_version=semaphore_audience_version
    ).run()

    PurifyTemplate(
        command_parameter="/home/datawork-semaphore-exp/spool/input/http/vpublicnew/" + year + " http_full_vpublicnew -f " + year + "-" + month + "-" + day + ".log",
        workspace=workspace,
        semaphore_audience_version=semaphore_audience_version
    ).run()

    PurifyTemplate(
        command_parameter="/home/datawork-semaphore-exp/spool/input/http/vpublicnew/" + year + " http_full_vpublicnew -f " + year + "-" + month + "-" + day + ".log",
        workspace=workspace,
        semaphore_audience_version=semaphore_audience_version
    ).run()
πŸ‘ 1
a
You can pass Parameter values to downstream tasks as data dependencies, just like any other data dependency in a Prefect flow. Alternatively, you can grab the Parameter values in your custom tasks or state handlers directly from the context:
Copy code
prefect.context["parameters"].get("parameter_name")
I think you misunderstood my intent - I wanted you to move everything after "Hi guys, I'm currently facing an issue with Parameter, how can i get the value of a Parameter?" to the thread - could you do that?
t
Sorry yes i will do this
πŸ™ 1
a
t
I will try the context option, because passing a Parameter object downstream to a task that needs a string isn't working. Is there any chance to do something like
myparameters.value
or something like that ?
πŸ‘ 1
a
also, you shouldn't do this:
Copy code
purifyFlow.run(name='purify_flow', parameters=dict(year=year, month=month, day=day, workspace=workspace, semaphore_audience_version=semaphore_audience_version))
if you want to trigger some other flow from this parent flow, you can use the
create_flow_run
task instead. This blog post explains how you could do that and if you need some examples, check this Discourse tag
t
The problem with
create_flow_run
is that a request is made to api.prefect.io, and from where I launch my script I get an HTTP 443 error; that's why I did this 'hack'
a
The error you see is not because of Parameters but because you are calling a flow from another flow in a way that is not supported. The link above shows how you can do that, but here is a simple example:
Copy code
with Flow(PARENT_FLOW_NAME) as parent_flow:
    normal_non_subflow_task = hello_world_parent()
    first_child_flow_run_id = create_flow_run(
        flow_name=CHILD_FLOW_NAME,
        project_name=PREFECT_PROJECT_NAME,
        parameters=dict(user_input="First child flow run"),
        task_args=dict(name="First subflow"),
        upstream_tasks=[normal_non_subflow_task],
    )
from https://github.com/anna-geller/orchestrator-pattern
I can totally understand that, but this is not how you should trigger a child flow run. The proper way of doing this is by registering this child flow and triggering a child flow run from the parent flow via
create_flow_run
t
Ok thanks for all the info, i will rewrite my code with
create_flow_run
but I think I will face the HTTP error I was getting before 🙈
I will paste the error when I'm done 🙂
a
to explain it a bit more - when you do:
Copy code
purifyFlow.run(name='purify_flow', parameters=dict(year=year, month=month, day=day, workspace=workspace, semaphore_audience_version=semaphore_audience_version))
this gets evaluated at flow registration (i.e. at build time when the computational graph is constructed) and at build time
workspace
is of type Parameter task. Only once you actually run this flow,
workspace
object becomes an actual parameter value - does this explanation make sense?
t
I understood why I was getting a Parameter type, this makes sense 🙂
👍 1
And for tasks in a flow, there is an equivalent like
create_task_run
instead of doing
Copy code
PurifyTemplate(
        year=year,
        month=month,
        day=day,
        command_parameter="/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp1/" + year + " eftp1 -f " + year + "-" + month + "-" + day + "-eftp1.gz",
        workspace=workspace,
        semaphore_audience_version=semaphore_audience_version
        ).run()
?
a
You shouldn't have to call tasks like that. The right approach with the imperative API would be to initialize it before the Flow constructor and then call it within the flow like so:
Copy code
purify_template = PurifyTemplate(
        year=year,
        month=month,
        day=day,
        command_parameter="/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp1/" + year + " eftp1 -f " + year + "-" + month + "-" + day + "-eftp1.gz",
        workspace=workspace,
        semaphore_audience_version=semaphore_audience_version
        )

with Flow("yourflowname") as flow:
    purify_template()
t
Ok I see, but this way, if I initialize PurifyTemplate outside of the flow, I will not have access to year, month, day, workspace, semaphore_audience_version, because the flow is the one receiving these parameters
Maybe by read them from the prefect context ?
a
you will if you put those into the run method
it depends on how you built your task. This is one of the reasons why the imperative API is so confusing and why it's much easier to just use the functional API. With the imperative, you need to differentiate between the args passed to the init method and those passed to the run method. Here is how you could call this task:
Copy code
with Flow("yourflowname") as flow:
    PurifyTemplate(init_args)(run_method_args)
Does it make things clearer?
t
Not much clearer, haha, because my PurifyTemplate class has __init__ and run methods with arguments (is this the imperative API?), and in the code above
Copy code
with Flow("yourflowname") as flow:
    PurifyTemplate(init_args)(run_method_args)
PurifyTemplate is initialized inside the flow, but you were telling me to initialize it outside, I'm a bit lost 🙂
a
this:
Copy code
with Flow("yourflowname") as flow:
    PurifyTemplate(init_args)(run_method_args)
is the same as:
Copy code
purify_template = PurifyTemplate(init_args)

with Flow("yourflowname") as flow:
    purify_template(run_method_args)
and run_method_args can receive data dependencies from upstream tasks
the latter is cleaner IMO
t
Ok got it 🙂 But if I use the latter one, I have to go from this
Copy code
class PurifyTemplate(Task):
    def __init__(self, command_parameter, workspace, semaphore_audience_version, **kwargs):
        self.command_parameter = command_parameter
        self.workspace = workspace
        self.semaphore_audience_version = semaphore_audience_version

        super().__init__(**kwargs)

    @defaults_from_attrs("command_parameter", "workspace", "semaphore_audience_version")
    def run(self, command_parameter=None, workspace=None, semaphore_audience_version=None, **kwargs):
        SemaphoreTemplate(
            command="/home/datawork-semaphore-exp/conda/conda_envs/semaphore-audience-4.4.9/lib/python3.7/bin/purify_raw_logs",
            command_parameter=command_parameter,
            workspace=workspace,
            semaphore_audience_version=semaphore_audience_version
        ).run()
to this
Copy code
class PurifyTemplate(Task):
    def __init__(self,**kwargs):

        super().__init__(**kwargs)
    
    def run(self, command_parameter=None, workspace=None, semaphore_audience_version=None, **kwargs):
        SemaphoreTemplate(
            command="/home/datawork-semaphore-exp/conda/conda_envs/semaphore-audience-4.4.9/lib/python3.7/bin/purify_raw_logs",
            command_parameter=command_parameter,
            workspace=workspace,
            semaphore_audience_version=semaphore_audience_version
        ).run()
?
i.e. clean up the init method, because it won't have access to the parameters that the flow is going to get
a
don't call .run() in a run method 😄 so meta! This seems fine though:
Copy code
class PurifyTemplate(Task):
    def __init__(self,**kwargs):

        super().__init__(**kwargs)
    
    def run(self, command_parameter=None, workspace=None, semaphore_audience_version=None, **kwargs):
        SemaphoreTemplate(
            command="/home/datawork-semaphore-exp/conda/conda_envs/semaphore-audience-4.4.9/lib/python3.7/bin/purify_raw_logs",
            command_parameter=command_parameter,
            workspace=workspace,
            semaphore_audience_version=semaphore_audience_version
        )
k
The init won’t have access to Parameters but you should be able to get them in the run method. I don’t think you need to remove the
default_from_attrs
I think Thomas needs to call the run actually? Otherwise the run will return an instantiated task?
a
the best solution would be to convert this task to the functional API honestly 😄
Copy code
@task
def purify_template(command_parameter=None, workspace=None, semaphore_audience_version=None):
        your_template_class = SemaphoreTemplate(
            command="/home/datawork-semaphore-exp/conda/conda_envs/semaphore-audience-4.4.9/lib/python3.7/bin/purify_raw_logs",
            command_parameter=command_parameter,
            workspace=workspace,
            semaphore_audience_version=semaphore_audience_version
        )
        your_template_class.some_method_doing_sth()
t
If this can help to resolve my problem, here is also the code of my SemaphoreTemplate, which I will have to transform too, I think:
Copy code
class SemaphoreTemplate(Task):
    def __init__(self, command, command_parameter, workspace, semaphore_audience_version, **kwargs):
        self.command = command
        self.command_parameter = command_parameter
        self.workspace = workspace
        self.semaphore_audience_version = semaphore_audience_version

        super().__init__(**kwargs)

    @defaults_from_attrs("command", "command_parameter", "workspace", "semaphore_audience_version")
    def run(self, command=None, command_parameter=None, workspace=None, semaphore_audience_version=None, **kwargs):
        print('os.environ')
        print(os.environ)
        workspace_path: Path = Path(
            os.path.join(workspace, f'{command}-{os.environ["PBS_JOBID"]}')
        )
        template_runner.TemplateRunner(
            command=command,
            version=semaphore_audience_version,
            command_parameter=command_parameter,
            workspace=workspace_path,
            skip_signal=True
        ).run(exit=False)
I will try your version Anna 🙂
👍 1
a
the functional API with
@task
is the preferred way of defining tasks and the only one supported in Prefect 2.0
ideally, don't create any subclasses of Task like
SemaphoreTemplate(Task):
- if you convert it to functions, this will make it much easier to reason about and debug.
t
Ok 🙂 Does task come from
from prefect.utilities.tasks
?
or from
prefect.core.task
Here is the error I got with
create_flow_run
Copy code
[2022-03-31 13:39:19+0000] INFO - prefect.TaskRunner | Task 'create_flow_run': Starting task run...
[2022-03-31 13:40:21+0000] ERROR - prefect.TaskRunner | Task 'create_flow_run': Exception encountered during task execution!
Traceback (most recent call last):
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/urllib3/connection.py", line 174, in _new_conn
    conn = connection.create_connection(
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/urllib3/util/connection.py", line 95, in create_connection
    raise err
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/urllib3/util/connection.py", line 85, in create_connection
    sock.connect(sa)
OSError: [Errno 101] Network is unreachable

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/urllib3/connectionpool.py", line 703, in urlopen
    httplib_response = self._make_request(
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/urllib3/connectionpool.py", line 386, in _make_request
    self._validate_conn(conn)
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/urllib3/connectionpool.py", line 1040, in _validate_conn
    conn.connect()
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/urllib3/connection.py", line 358, in connect
    conn = self._new_conn()
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/urllib3/connection.py", line 186, in _new_conn
    raise NewConnectionError(
urllib3.exceptions.NewConnectionError: <urllib3.connection.HTTPSConnection object at 0x2aaaaf59de50>: Failed to establish a new connection: [Errno 101] Network is unreachable

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/requests/adapters.py", line 440, in send
    resp = conn.urlopen(
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/urllib3/connectionpool.py", line 813, in urlopen
    return self.urlopen(
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/urllib3/connectionpool.py", line 813, in urlopen
    return self.urlopen(
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/urllib3/connectionpool.py", line 813, in urlopen
    return self.urlopen(
  [Previous line repeated 3 more times]
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/urllib3/connectionpool.py", line 785, in urlopen
    retries = retries.increment(
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/urllib3/util/retry.py", line 592, in increment
    raise MaxRetryError(_pool, url, error or ResponseError(cause))
urllib3.exceptions.MaxRetryError: HTTPSConnectionPool(host='api.prefect.io', port=443): Max retries exceeded with url: / (Caused by NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x2aaaaf59de50>: Failed to establish a new connection: [Errno 101] Network is unreachable'))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/tasks/prefect/flow_run.py", line 131, in create_flow_run
    flow = FlowView.from_flow_name(flow_name, project_name=project_name)
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/backend/flow.py", line 205, in from_flow_name
    flows = cls._query_for_flows(
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/backend/flow.py", line 289, in _query_for_flows
    result = client.graphql(flow_query)
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/client/client.py", line 452, in graphql
    result = self.post(
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/client/client.py", line 407, in post
    response = self._request(
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/client/client.py", line 641, in _request
    response = self._send_request(
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/client/client.py", line 506, in _send_request
    response = session.post(
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/requests/sessions.py", line 577, in post
    return self.request('POST', url, data=data, json=json, **kwargs)
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/requests/sessions.py", line 529, in request
    resp = self.send(prep, **send_kwargs)
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/requests/sessions.py", line 645, in send
    r = adapter.send(request, **kwargs)
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/requests/adapters.py", line 519, in send
    raise ConnectionError(e, request=request)
requests.exceptions.ConnectionError: HTTPSConnectionPool(host='api.prefect.io', port=443): Max retries exceeded with url: / (Caused by NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x2aaaaf59de50>: Failed to establish a new connection: [Errno 101] Network is unreachable'))
[2022-03-31 13:40:21+0000] INFO - prefect.TaskRunner | Task 'create_flow_run': Finished task run for task with final state: 'Failed'
distributed.dask_worker - INFO - Exiting on signal 15
My machine doesn't have HTTP access; it communicates with the Dask worker via TCP
a
from prefect import task
t
So if there is a way to use
create_flow_run
without any HTTP request, I would gladly take that solution 🙂
a
This is possible in Prefect 2.0 using subflows, but not in 1.0
t
Ok, I will try to move to Prefect 2.0 🙂
Hi, is there any example of how to use subflows in Prefect Orion? 🙂 @Anna Geller
a
You can call flows within other flows:
Copy code
from prefect import flow, task
import time

@flow
def subflow_1():
    print("Subflow 1 started!")
    time.sleep(3)
    return "Hello from subflow!"

@flow
def subflow_2():
    print("Subflow 2 started!")
    time.sleep(3)
    return "Hello from the second subflow!"

@task
def normal_task():
    print("A normal task")

@flow
def main_flow():
    state_subflow_1 = subflow_1()
    state_subflow_2 = subflow_2()
    normal_task(wait_for=[state_subflow_1, state_subflow_2])

if __name__ == "__main__":
    main_flow_state = main_flow()
t
Oh ok, simple as that 🙂 And for the equivalent of my executor, where do I put the config I was setting like this before?
Copy code
with Flow("main_flow",
              executor=DaskExecutor(
                  cluster_class=PBSCluster,
                  cluster_kwargs={
                      "cores": 1,
                      "memory": '120GB',
                      "n_workers": 1,
                      "queue": 'omp',
                      "resource_spec": "select=1:ncpus=56:mem=500gb",
                      "walltime": '01:00:00',

                  }
              )
          ) as flow:
Found it, it was task_runner 🙂
👍 1
But now I'm trying to launch my Prefect flow with the CLI
before I was doing this
Copy code
prefect run \
  --path /home1/datawork/semexp/workspace/prefect-graph/prefect_graph/main_flow.py \
  --param workspace="/home1/datawork/semexp/workspace/dagit/running/" \
  --param semaphore_audience_version="3.4.9" \
  --param day=01 \
  --param month=01  \
  --param year=2022 \
  --watch
but
prefect run
doesnt exist anymore
k
I think it’s
prefect deployment run
. Let me confirm
Yes, there is a run here, but I don’t think it is as versatile as the Prefect 1.0
prefect run
yet
t
@Kevin Kho I can't pass --path and --param to
prefect deployment run
as i was doing previously in v1 with
prefect run
Do you have a trick to launch a flow with the CLI? 🙂
I think i have to use
prefect deployment create
before
prefect deployment run
Here is my file
deployment.yaml
I created in order to use `prefect deployment create ./deployment.yaml`
Copy code
- name: "purify-flow"
  flow_location: "./main_flow.py"

- name: "main-flow"
  flow_location: "./main_flow.py"
And i got this error :
Copy code
│   114 │   def load_flow(self):                                                                   │
│   115 │   │   if self.flow_location and not self.flow:                                           │
│ ❱ 116 │   │   │   self.flow = load_flow_from_script(self.flow_location, self.flow_name)          │
│   117 │   │   │   if not self.flow_name:                                                         │
│   118 │   │   │   │   self.flow_name = self.flow.name                                            │
│   119                                                                                            │
│                                                                                                  │
│ /home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/deploy │
│ ments.py:199 in load_flow_from_script                                                            │
│                                                                                                  │
│   196 │   │   )                                                                                  │
│   197 │                                                                                          │
│   198 │   elif not flow_name and len(flows) > 1:                                                 │
│ ❱ 199 │   │   raise UnspecifiedFlowError(                                                        │
│   200 │   │   │   f"Found {len(flows)} flows at {script_path!r}: {listrepr(flows.keys())}. "     │
│   201 │   │   │   "Specify a flow name to select a flow to deploy.",                             │
│   202 │   │   )                                                                                  │
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
UnspecifiedFlowError: Found 2 flows at '/home1/datawork/semexp/workspace/prefect-graph/prefect_graph/main_flow.py': 'purify-flow' 'main-flow'. Specify a flow name to select a flow to deploy.
Because i have to specify a flow on
prefect deployment create ./deployment.yaml
but I don't see any option such as name-flow or something like that, which would look like this:
prefect deployment create ./deployment.yaml --name purify-flow
Maybe I have to specify a
flow_name:
in
deployment.yaml
too, judging from the code; this seems to work better, but my question remains: how do I pass params to my flow via `prefect deployment create` or afterwards with
prefect deployment run
?
a
if you want to trigger a flow run from the CLI and see this flow run in the UI, you don't have to run a flow run from deployment - you can do:
Copy code
python yourflow.py
At the moment, you can’t override parameter values for a run from a deployment via the CLI, but you can override default params with the Python client using
create_flow_run_from_deployment
or the REST API. Doing it from the UI is on the roadmap: https://github.com/PrefectHQ/prefect/issues/5617 - the CLI will likely follow
t
The command should be
python /home1/datawork/semexp/workspace/prefect-graph/prefect_graph/main_flow.py   --param workspace="/home1/datawork/semexp/workspace/dagit/running/"   --param semaphore_audience_version="3.4.9"   --param day=01   --param month=01    --param year=2022   --watch
?
a
The python command is not a Prefect command 🙂 You would need some argument parser. Let me give you an example
t
for
create_flow_run_from_deployment
I will not need it if I can launch the flow via the python command, or did I misunderstand?
Ok thank you 🙂
a
Copy code
from prefect import flow, task
import sys

@task
def do_something_with_path(path: str):
    filename = f"{path}/some_data.txt"
    data = "Hello world from Prefect 2.0!"
    with open(filename, "w") as file:
        file.write(data)


@flow
def parametrized_flow_with_path(path: str):
    do_something_with_path(path)


if __name__ == "__main__":
    path_val = sys.argv[1]
    parametrized_flow_with_path(path_val)
Copy code
python parametrized_path_flow.py "/Users/anna/repos/orion-flows/00_prefect_2_0"
t
Thank you for the example, I will try that 😊
👍 1
My flow is running now, thanks a lot Anna 🙂
I'm facing new errors but it's better, I'm in Dask already
I have this strange error:
Copy code
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect_graph/purify_flow.py", line 9, in purify_template
    semaphore_template(
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/tasks.py", line 356, in __call__
    return enter_task_run_engine(
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/engine.py", line 505, in enter_task_run_engine
    raise RuntimeError("Tasks cannot be called outside of a flow.")
and my task is called inside the flow 😅
Copy code
@task
def purify_template(command_parameter=None, workspace=None, semaphore_audience_version=None):

    semaphore_template(
        command="/home/datawork-semaphore-exp/conda/conda_envs/semaphore-audience-4.4.9/lib/python3.7/bin/purify_raw_logs",
        command_parameter=command_parameter,
        workspace=workspace,
        semaphore_audience_version=semaphore_audience_version
    )


@flow(task_runner=DaskTaskRunner(
    cluster_class="dask_jobqueue.PBSCluster",
    cluster_kwargs={
        "cores": 1,
        "memory": '120GB',
        "n_workers": 1,
        "queue": 'omp',
        "resource_spec": "select=1:ncpus=56:mem=500gb",
        "walltime": '01:00:00',

    }
))
def purify_flow(workspace: str, semaphore_audience_version: str, day: str, month: str, year: str):

    purify_template(
            command_parameter="/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp1/" + year + " eftp1 -f " + year + "-" + month + "-" + day + "-eftp1.gz",
            workspace=workspace,
            semaphore_audience_version=semaphore_audience_version
            )
Argh, my task called in the flow is itself calling a task, I understand now
a
is this an instantiated Task? looks like it. My recommendation from earlier was to just make it a function and decorate with
@task
t
Copy code
@task
def semaphore_template(command=None, command_parameter=None, workspace=None, semaphore_audience_version=None):
    workspace_path: Path = Path(
        os.path.join(workspace, f'{command}-{os.environ["PBS_JOBID"]}')
    )
    template_runner.TemplateRunner(
        command=command,
        version=semaphore_audience_version,
        command_parameter=command_parameter,
        workspace=workspace_path,
        skip_signal=True
    ).run(exit=False)
It is already a function decorated with
@task
Maybe I'm not supposed to call a task in a task in a flow?
a
yup, that's true - you can call tasks in flows and you can call other flows in flows, but calling tasks in tasks is not supported - we've talked about it before
t
Ok, I will create a flow with one task to prevent this 🙂
👍 1