https://prefect.io logo
#prefect-community
Title
# prefect-community
t

Thomas Mignon

03/31/2022, 11:38 AM
Hi guys, I'm currently facing an issue with Parameter, how can i get the value of a Parameter ?
a

Anna Geller

03/31/2022, 11:41 AM
could you move the code block below to this thread here?
or rather all blocks? 😄 this helps us keep the main channel cleaner and makes it easier to follow for others reading this
t

Thomas Mignon

03/31/2022, 11:42 AM
Here is my error :
Copy code
File "/home1/datawork/semexp/workspace/prefect_graph/semaphore_template_task.py", line 26, in run
    os.path.join(workspace, f'{command}-test')
    File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/posixpath.py", line 76, in join
    a = os.fspath(a)
  TypeError: expected str, bytes or os.PathLike object, not Parameter
Because
command
was a parameter passed downstream. And here are my 2 files: main_flow.py, which creates the graph of a flow of flows, and purify_flow.py, a flow which uses a template task to create tasks inside the flow. Here is main_flow.py:
Copy code
from dask_jobqueue import PBSCluster
from prefect import Flow, task, Parameter
from prefect.tasks.prefect import create_flow_run
from prefect.executors import DaskExecutor

from prefect_graph.semaphore_template_task import SemaphoreTemplate
from prefect_graph.purify_flow import flow as purifyFlow

@task
def partition(workspace, semaphore_audience_version, day, month, year):
    """Run the ``generate_partition`` command for one date.

    The command parameter is ``year + month + day`` followed by a ``-J``
    option that sets ``input.purify.directory_basepath``.  The result of
    ``SemaphoreTemplate(...).run()`` is not returned.
    """
    executable = "/home/datawork-semaphore-exp/conda/conda_envs/semaphore-audience-4.4.9/lib/python3.7/bin/generate_partition"
    arguments = year + month + day + " -J input.purify.directory_basepath=/home/datawork-semaphore-exp/cache/data/common/daily/purify/"
    template = SemaphoreTemplate(
        executable,
        arguments,
        workspace,
        semaphore_audience_version,
    )
    template.run()


@task
def event_session(workspace, semaphore_audience_version, day, month, year):
    """Run the ``generate_event_session`` command for one date.

    The command parameter is ``year + month + day`` followed by a ``-J``
    option that sets ``input.partition.directory_basepath``.  The result of
    ``SemaphoreTemplate(...).run()`` is not returned.
    """
    executable = "/home/datawork-semaphore-exp/conda/conda_envs/semaphore-audience-4.4.9/lib/python3.7/bin/generate_event_session"
    arguments = year + month + day + " -J input.partition.directory_basepath=/home/datawork-semaphore-exp/cache/data/common/daily/partition/"
    template = SemaphoreTemplate(
        executable,
        arguments,
        workspace,
        semaphore_audience_version,
    )
    template.run()

# Define "main_flow"; its executor is a DaskExecutor configured to create a
# dask_jobqueue PBSCluster with the keyword arguments below.
with Flow("main_flow",
              executor=DaskExecutor(
                  cluster_class=PBSCluster,
                  cluster_kwargs={
                      "cores": 1,
                      "memory": '120GB',
                      "n_workers": 1,
                      "queue": 'omp',
                      "resource_spec": "select=1:ncpus=56:mem=500gb",
                      "walltime": '01:00:00',

                  }
              )
          ) as flow:

    # Flow inputs.
    workspace = Parameter("workspace")
    semaphore_audience_version = Parameter("semaphore_audience_version")
    # NOTE(review): declared but not forwarded to purifyFlow.run below.
    semaphore_metadata_version = Parameter("semaphore_metadata_version")
    # NOTE(review): day, month and year all share the default "20200101";
    # looks like a copy/paste - confirm the intended defaults.
    day = Parameter("day", default="20200101")
    month = Parameter("month", default="20200101")
    year = Parameter("year", default="20200101")

    # NOTE(review): this call runs while the flow is being defined, so the
    # values handed over are the Parameter objects themselves, not strings.
    purifyFlow.run(name='purify_flow', parameters=dict(year=year, month=month, day=day, workspace=workspace, semaphore_audience_version=semaphore_audience_version))
and here is purify_flow.py:
Copy code
from dask_jobqueue import PBSCluster
from prefect.executors import DaskExecutor
from prefect import Flow, Parameter, Task
from prefect.utilities.tasks import defaults_from_attrs

from semaphore_template_task import SemaphoreTemplate


class PurifyTemplate(Task):
    """Task that runs the ``purify_raw_logs`` command via ``SemaphoreTemplate``.

    The constructor arguments are kept as instance attributes, and ``run`` is
    decorated with ``defaults_from_attrs`` naming those same attributes
    (presumably so that arguments left as ``None`` fall back to the
    attributes - confirm against the Prefect documentation).
    """

    def __init__(self, command_parameter, workspace, semaphore_audience_version, **kwargs):
        """Store the three settings, then pass ``kwargs`` on to ``Task.__init__``."""
        self.command_parameter = command_parameter
        self.workspace = workspace
        self.semaphore_audience_version = semaphore_audience_version

        super().__init__(**kwargs)

    @defaults_from_attrs("command_parameter", "workspace", "semaphore_audience_version")
    def run(self, command_parameter=None, workspace=None, semaphore_audience_version=None, **kwargs):
        """Build a ``SemaphoreTemplate`` for ``purify_raw_logs`` and call its ``run()``."""
        purify_command = "/home/datawork-semaphore-exp/conda/conda_envs/semaphore-audience-4.4.9/lib/python3.7/bin/purify_raw_logs"
        template = SemaphoreTemplate(
            command=purify_command,
            command_parameter=command_parameter,
            workspace=workspace,
            semaphore_audience_version=semaphore_audience_version,
        )
        template.run()


# Define "purify_flow"; its executor is a DaskExecutor configured to create a
# dask_jobqueue PBSCluster with the keyword arguments below.
with Flow(
    "purify_flow",
    executor=DaskExecutor(
        cluster_class=PBSCluster,
        cluster_kwargs={
            "cores": 1,
            "memory": '120GB',
            "n_workers": 1,
            "queue": 'omp',
            "resource_spec": "select=1:ncpus=56:mem=500gb",
            "walltime": '05:00:00',
        },
    ),
) as flow:
    workspace = Parameter("workspace")
    semaphore_audience_version = Parameter("semaphore_audience_version")
    # Registered as "day" (it was "date"): the parent flow and the CLI call
    # both supply this value under the key ``day``.
    day = Parameter("day")
    month = Parameter("month")
    year = Parameter("year")

    # NOTE(review): each ``.run()`` below executes while the flow is being
    # defined, when year/month/day are still Parameter objects.

    # eftp1 logs
    PurifyTemplate(
        command_parameter="/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp1/" + year + " eftp1 -f " + year + "-" + month + "-" + day + "-eftp1.gz",
        workspace=workspace,
        semaphore_audience_version=semaphore_audience_version
    ).run()

    # eftp2 logs
    # NOTE(review): the file suffix is "-eftp1.gz" although the directory and
    # source name are eftp2 - confirm whether "-eftp2.gz" was intended.
    PurifyTemplate(
        command_parameter="/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp2/" + year + " eftp2 -f " + year + "-" + month + "-" + day + "-eftp1.gz",
        workspace=workspace,
        semaphore_audience_version=semaphore_audience_version
    ).run()

    # ftp logs
    PurifyTemplate(
        command_parameter="/home/datawork-semaphore-exp/spool/input/ftp/ftp/" + year + " ftp -f access.log." + year + month + day + ".gz",
        workspace=workspace,
        semaphore_audience_version=semaphore_audience_version
    ).run()

    # ftp2 logs
    PurifyTemplate(
        command_parameter="/home/datawork-semaphore-exp/spool/input/ftp/ftp2/" + year + " ftp2 -f access.log." + year + month + day + ".gz",
        workspace=workspace,
        semaphore_audience_version=semaphore_audience_version
    ).run()

    # http vpublicnew logs
    PurifyTemplate(
        command_parameter="/home/datawork-semaphore-exp/spool/input/http/vpublicnew/" + year + " http_full_vpublicnew -f " + year + "-" + month + "-" + day + ".log",
        workspace=workspace,
        semaphore_audience_version=semaphore_audience_version
    ).run()

    # NOTE(review): identical to the previous call - confirm whether a
    # different http source was meant here.
    PurifyTemplate(
        command_parameter="/home/datawork-semaphore-exp/spool/input/http/vpublicnew/" + year + " http_full_vpublicnew -f " + year + "-" + month + "-" + day + ".log",
        workspace=workspace,
        semaphore_audience_version=semaphore_audience_version
    ).run()
👍 1
a

Anna Geller

03/31/2022, 11:44 AM
you can pass Parameter values to downstream tasks as data dependencies, just like any other data dependencies in a Prefect flow. Alternatively, you can grab the Parameter values in your custom tasks or state handlers directly from the context:
Copy code
prefect.context["parameters"].get("parameter_name")
I think you misunderstood my intent - I wanted you to move everything after "Hi guys, I'm currently facing an issue with Parameter, how can i get the value of a Parameter?" to the thread - could you do that?
t

Thomas Mignon

03/31/2022, 11:45 AM
Sorry yes i will do this
🙏 1
a

Anna Geller

03/31/2022, 11:46 AM
t

Thomas Mignon

03/31/2022, 11:49 AM
I will try the context option, because passing a Parameter object downstream to a task that needs a string isn't working. Is there any chance to do something like
myparameters.value
or something like that ?
👍 1
a

Anna Geller

03/31/2022, 11:49 AM
also, you shouldn't do this:
Copy code
purifyFlow.run(name='purify_flow', parameters=dict(year=year, month=month, day=day, workspace=workspace, semaphore_audience_version=semaphore_audience_version))
if you want to trigger some other flow from this parent flow, you can use the
create_flow_run
task instead. This blog post explains how you could do that and if you need some examples, check this Discourse tag
t

Thomas Mignon

03/31/2022, 11:51 AM
The problem with
create_flow_run
is that a request is made to api.prefect.io, and where I launch my script I get an HTTP 443 error; that's why I've done this 'hack'
a

Anna Geller

03/31/2022, 11:51 AM
The error you see is not because of Parameters but because you are calling a flow from another flow in a way that is not supported. The link above show how you can do that but here is a simple example:
Copy code
with Flow(PARENT_FLOW_NAME) as parent_flow:
    normal_non_subflow_task = hello_world_parent()
    first_child_flow_run_id = create_flow_run(
        flow_name=CHILD_FLOW_NAME,
        project_name=PREFECT_PROJECT_NAME,
        parameters=dict(user_input="First child flow run"),
        task_args=dict(name="First subflow"),
        upstream_tasks=[normal_non_subflow_task],
    )
from https://github.com/anna-geller/orchestrator-pattern
I can totally understand that, but this is not how you should trigger a child flow run. The proper way of doing this is by registering this child flow and triggering a child flow run from the parent flow via
create_flow_run
t

Thomas Mignon

03/31/2022, 11:54 AM
Ok thanks for all the info, i will rewrite my code with
create_flow_run
but i think i will face the HTTP error i was getting before 🙈
I will paste the error when i'm done 🙂
a

Anna Geller

03/31/2022, 11:55 AM
to explain it a bit more - when you do:
Copy code
purifyFlow.run(name='purify_flow', parameters=dict(year=year, month=month, day=day, workspace=workspace, semaphore_audience_version=semaphore_audience_version))
this gets evaluated at flow registration (i.e. at build time when the computational graph is constructed) and at build time
workspace
is of type Parameter task. Only once you actually run this flow,
workspace
object becomes an actual parameter value - does this explanation make sense?
t

Thomas Mignon

03/31/2022, 12:01 PM
I understood why i was getting a Parameter type, this make sense 🙂
👍 1
And for tasks in a flow, there is an equivalent like
create_task_run
instead of doing
Copy code
PurifyTemplate(
        year=year,
        month=month,
        day=day,
        command_parameter="/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp1/" + year + " eftp1 -f " + year + "-" + month + "-" + day + "-eftp1.gz",
        workspace=workspace,
        semaphore_audience_version=semaphore_audience_version
        ).run()
?
a

Anna Geller

03/31/2022, 12:17 PM
You shouldn't have to call tasks like that. The right approach with the imperative API would be to initialize it before the Flow constructor and then call it within the flow like so:
Copy code
purify_template = PurifyTemplate(
        year=year,
        month=month,
        day=day,
        command_parameter="/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp1/" + year + " eftp1 -f " + year + "-" + month + "-" + day + "-eftp1.gz",
        workspace=workspace,
        semaphore_audience_version=semaphore_audience_version
        )

with Flow("yourflowname") as flow:
    purify_template()
t

Thomas Mignon

03/31/2022, 12:26 PM
Ok I see, but this way, if I initialize PurifyTemplate outside of the flow I will not have access to year, month, day, workspace, semaphore_audience_version, because the flow is receiving these parameters
Maybe by read them from the prefect context ?
a

Anna Geller

03/31/2022, 12:28 PM
you will if you put those into the run method
it depends on how you built your task. This is one of the reasons why the imperative API is so confusing and why it's much easier to just use the functional API. With the imperative, you need to differentiate between the args passed to the init method and those passed to the run method. Here is how you could call this task:
Copy code
with Flow("yourflowname") as flow:
    PurifyTemplate(init_args)(run_method_args)
Does it make things clearer?
t

Thomas Mignon

03/31/2022, 12:37 PM
Not much clearer, haha, because my PurifyTemplate class has __init__ and run methods with arguments (is this the imperative API?), and in the code above
Copy code
with Flow("yourflowname") as flow:
    PurifyTemplate(init_args)(run_method_args)
PurifyTemplate is initialized inside the flow, but you were telling me to initialize it outside; I'm a bit lost 🙂
a

Anna Geller

03/31/2022, 12:39 PM
this:
Copy code
with Flow("yourflowname") as flow:
    PurifyTemplate(init_args)(run_method_args)
is the same as:
Copy code
purify_template = PurifyTemplate(init_args)

with Flow("yourflowname") as flow:
    purify_template(run_method_args)
and run_method_args can receive data dependencies from upstream tasks
the latter is cleaner IMO
t

Thomas Mignon

03/31/2022, 12:42 PM
Ok got it 🙂 But if i use the latter on, i have to pass from this
Copy code
class PurifyTemplate(Task):
    """Task that runs the ``purify_raw_logs`` command via ``SemaphoreTemplate``.

    The constructor arguments are kept as instance attributes, and ``run`` is
    decorated with ``defaults_from_attrs`` naming those same attributes
    (presumably so that arguments left as ``None`` fall back to the
    attributes - confirm against the Prefect documentation).
    """

    def __init__(self, command_parameter, workspace, semaphore_audience_version, **kwargs):
        """Store the three settings, then pass ``kwargs`` on to ``Task.__init__``."""
        self.command_parameter = command_parameter
        self.workspace = workspace
        self.semaphore_audience_version = semaphore_audience_version

        super().__init__(**kwargs)

    @defaults_from_attrs("command_parameter", "workspace", "semaphore_audience_version")
    def run(self, command_parameter=None, workspace=None, semaphore_audience_version=None, **kwargs):
        """Build a ``SemaphoreTemplate`` for ``purify_raw_logs`` and call its ``run()``."""
        purify_command = "/home/datawork-semaphore-exp/conda/conda_envs/semaphore-audience-4.4.9/lib/python3.7/bin/purify_raw_logs"
        template = SemaphoreTemplate(
            command=purify_command,
            command_parameter=command_parameter,
            workspace=workspace,
            semaphore_audience_version=semaphore_audience_version,
        )
        template.run()
to this
Copy code
class PurifyTemplate(Task):
    """Task that runs ``purify_raw_logs``; all settings are given to ``run``."""

    def __init__(self, **kwargs):
        """Forward every keyword argument to ``Task.__init__``."""
        super().__init__(**kwargs)

    def run(self, command_parameter=None, workspace=None, semaphore_audience_version=None, **kwargs):
        """Build a ``SemaphoreTemplate`` for ``purify_raw_logs`` and call its ``run()``."""
        purify_command = "/home/datawork-semaphore-exp/conda/conda_envs/semaphore-audience-4.4.9/lib/python3.7/bin/purify_raw_logs"
        template = SemaphoreTemplate(
            command=purify_command,
            command_parameter=command_parameter,
            workspace=workspace,
            semaphore_audience_version=semaphore_audience_version,
        )
        template.run()
?
i.e clean the init method because it won't have access to the parameters that the flow gonna get
a

Anna Geller

03/31/2022, 12:44 PM
don't call .run() in a run method 😄 so meta! This seems fine though:
Copy code
class PurifyTemplate(Task):
    """Task variant whose settings are all passed to ``run`` rather than ``__init__``."""

    def __init__(self,**kwargs):

        # Only forwards the keyword arguments to Task.__init__.
        super().__init__(**kwargs)
    
    def run(self, command_parameter=None, workspace=None, semaphore_audience_version=None, **kwargs):
        """Construct a ``SemaphoreTemplate`` for the ``purify_raw_logs`` command."""
        # NOTE(review): the SemaphoreTemplate instance is created but neither
        # run, returned nor stored here - confirm that this is intended.
        SemaphoreTemplate(
            command="/home/datawork-semaphore-exp/conda/conda_envs/semaphore-audience-4.4.9/lib/python3.7/bin/purify_raw_logs",
            command_parameter=command_parameter,
            workspace=workspace,
            semaphore_audience_version=semaphore_audience_version
        )
k

Kevin Kho

03/31/2022, 12:46 PM
The init won’t have access to Parameters but you should be able to get them in the run method. I don’t think you need to remove the
defaults_from_attrs
I think Thomas needs to call the run actually? Otherwise the run will return an instantiated task?
a

Anna Geller

03/31/2022, 12:47 PM
the best solution would be to convert this task to the functional API honestly 😄
Copy code
@task
def purify_template(command_parameter=None, workspace=None, semaphore_audience_version=None):
    """Functional-API sketch: build a ``SemaphoreTemplate`` and call a method on it.

    ``some_method_doing_sth`` is the placeholder name used in this example.
    """
    purify_command = "/home/datawork-semaphore-exp/conda/conda_envs/semaphore-audience-4.4.9/lib/python3.7/bin/purify_raw_logs"
    template = SemaphoreTemplate(
        command=purify_command,
        command_parameter=command_parameter,
        workspace=workspace,
        semaphore_audience_version=semaphore_audience_version,
    )
    template.some_method_doing_sth()
t

Thomas Mignon

03/31/2022, 12:49 PM
If this can help to resolve my problem, here is also the code of my SemaphoreTemplate, which I will have to transform too, I think:
Copy code
class SemaphoreTemplate(Task):
    """Task that hands a command to ``template_runner.TemplateRunner``.

    The constructor arguments are kept as instance attributes, and ``run`` is
    decorated with ``defaults_from_attrs`` naming those same attributes.
    """

    def __init__(self, command, command_parameter, workspace, semaphore_audience_version, **kwargs):
        """Store the four settings, then pass ``kwargs`` on to ``Task.__init__``."""
        self.command = command
        self.command_parameter = command_parameter
        self.workspace = workspace
        self.semaphore_audience_version = semaphore_audience_version

        super().__init__(**kwargs)

    @defaults_from_attrs("command", "command_parameter", "workspace", "semaphore_audience_version")
    def run(self, command=None, command_parameter=None, workspace=None, semaphore_audience_version=None, **kwargs):
        """Print the environment, derive the run directory and start the runner.

        The run directory is ``<workspace>/<command>-<PBS_JOBID>``; reading
        ``PBS_JOBID`` raises ``KeyError`` when the variable is not set.
        """
        # Debug output: dumps the complete process environment.
        print('os.environ')
        print(os.environ)

        run_directory = os.path.join(workspace, f'{command}-{os.environ["PBS_JOBID"]}')
        workspace_path: Path = Path(run_directory)

        runner = template_runner.TemplateRunner(
            command=command,
            version=semaphore_audience_version,
            command_parameter=command_parameter,
            workspace=workspace_path,
            skip_signal=True,
        )
        runner.run(exit=False)
I will try your version Anna 🙂
👍 1
a

Anna Geller

03/31/2022, 12:50 PM
the functional API with
@task
is the preferred way of defining tasks and the only one supported in Prefect 2.0
ideally, don't create any subclasses of Task like
SemaphoreTemplate(Task):
- if you convert it to functions, this will make it much easier to reason about and debug.
t

Thomas Mignon

03/31/2022, 12:54 PM
Ok 🙂 task come from
from prefect.utilities.tasks
?
or from
prefect.core.task
Here is the error i got with
create_flow_run
Copy code
[2022-03-31 13:39:19+0000] INFO - prefect.TaskRunner | Task 'create_flow_run': Starting task run...
[2022-03-31 13:40:21+0000] ERROR - prefect.TaskRunner | Task 'create_flow_run': Exception encountered during task execution!
Traceback (most recent call last):
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/urllib3/connection.py", line 174, in _new_conn
    conn = connection.create_connection(
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/urllib3/util/connection.py", line 95, in create_connection
    raise err
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/urllib3/util/connection.py", line 85, in create_connection
    sock.connect(sa)
OSError: [Errno 101] Network is unreachable

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/urllib3/connectionpool.py", line 703, in urlopen
    httplib_response = self._make_request(
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/urllib3/connectionpool.py", line 386, in _make_request
    self._validate_conn(conn)
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/urllib3/connectionpool.py", line 1040, in _validate_conn
    conn.connect()
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/urllib3/connection.py", line 358, in connect
    conn = self._new_conn()
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/urllib3/connection.py", line 186, in _new_conn
    raise NewConnectionError(
urllib3.exceptions.NewConnectionError: <urllib3.connection.HTTPSConnection object at 0x2aaaaf59de50>: Failed to establish a new connection: [Errno 101] Network is unreachable

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/requests/adapters.py", line 440, in send
    resp = conn.urlopen(
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/urllib3/connectionpool.py", line 813, in urlopen
    return self.urlopen(
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/urllib3/connectionpool.py", line 813, in urlopen
    return self.urlopen(
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/urllib3/connectionpool.py", line 813, in urlopen
    return self.urlopen(
  [Previous line repeated 3 more times]
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/urllib3/connectionpool.py", line 785, in urlopen
    retries = retries.increment(
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/urllib3/util/retry.py", line 592, in increment
    raise MaxRetryError(_pool, url, error or ResponseError(cause))
urllib3.exceptions.MaxRetryError: HTTPSConnectionPool(host='api.prefect.io', port=443): Max retries exceeded with url: / (Caused by NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x2aaaaf59de50>: Failed to establish a new connection: [Errno 101] Network is unreachable'))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/tasks/prefect/flow_run.py", line 131, in create_flow_run
    flow = FlowView.from_flow_name(flow_name, project_name=project_name)
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/backend/flow.py", line 205, in from_flow_name
    flows = cls._query_for_flows(
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/backend/flow.py", line 289, in _query_for_flows
    result = client.graphql(flow_query)
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/client/client.py", line 452, in graphql
    result = self.post(
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/client/client.py", line 407, in post
    response = self._request(
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/client/client.py", line 641, in _request
    response = self._send_request(
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/client/client.py", line 506, in _send_request
    response = session.post(
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/requests/sessions.py", line 577, in post
    return self.request('POST', url, data=data, json=json, **kwargs)
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/requests/sessions.py", line 529, in request
    resp = self.send(prep, **send_kwargs)
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/requests/sessions.py", line 645, in send
    r = adapter.send(request, **kwargs)
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/requests/adapters.py", line 519, in send
    raise ConnectionError(e, request=request)
requests.exceptions.ConnectionError: HTTPSConnectionPool(host='api.prefect.io', port=443): Max retries exceeded with url: / (Caused by NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x2aaaaf59de50>: Failed to establish a new connection: [Errno 101] Network is unreachable'))
[2022-03-31 13:40:21+0000] INFO - prefect.TaskRunner | Task 'create_flow_run': Finished task run for task with final state: 'Failed'
distributed.dask_worker - INFO - Exiting on signal 15
My machine doesn't have HTTP access; it communicates with the Dask worker over TCP
a

Anna Geller

03/31/2022, 1:42 PM
from prefect import task
t

Thomas Mignon

03/31/2022, 1:45 PM
So if there is a way to use
create_flow_run
without any HTTP request i would take a solution 🙂
a

Anna Geller

03/31/2022, 2:02 PM
This is possible in Prefect 2.0 using subflows, but not in 1.0
t

Thomas Mignon

03/31/2022, 2:03 PM
Ok i will try to pass on prefect 2.0 🙂
Hi, Is there any example of how to use subflows in prefect orion ? 🙂 @Anna Geller
a

Anna Geller

04/01/2022, 11:28 AM
You can call flows within other flows:
Copy code
from prefect import flow, task
import time

@flow
def subflow_1():
    """Announce the start, pause for three seconds, then return a greeting."""
    print("Subflow 1 started!")
    time.sleep(3)
    greeting = "Hello from subflow!"
    return greeting

@flow
def subflow_2():
    """Announce the start, pause for three seconds, then return a greeting."""
    print("Subflow 2 started!")
    time.sleep(3)
    greeting = "Hello from the second subflow!"
    return greeting

@task
def normal_task():
    """Print a fixed message."""
    message = "A normal task"
    print(message)

@flow
def main_flow():
    """Call both subflows, then ``normal_task`` with their results in ``wait_for``."""
    first_result = subflow_1()
    second_result = subflow_2()
    upstream = [first_result, second_result]
    normal_task(wait_for=upstream)

if __name__ == "__main__":
    main_flow_state = main_flow()
t

Thomas Mignon

04/01/2022, 11:31 AM
Oh ok simple as that 🙂 And for the equivalent of my executor, where to put the config i was setting as this before ?
Copy code
with Flow("main_flow",
              executor=DaskExecutor(
                  cluster_class=PBSCluster,
                  cluster_kwargs={
                      "cores": 1,
                      "memory": '120GB',
                      "n_workers": 1,
                      "queue": 'omp',
                      "resource_spec": "select=1:ncpus=56:mem=500gb",
                      "walltime": '01:00:00',

                  }
              )
          ) as flow:
Found it, it was taskRunner 🙂
👍 1
But now I'm trying to launch my Prefect flow with the CLI
before, I was doing this
Copy code
prefect run \
  --path /home1/datawork/semexp/workspace/prefect-graph/prefect_graph/main_flow.py \
  --param workspace="/home1/datawork/semexp/workspace/dagit/running/" \
  --param semaphore_audience_version="3.4.9" \
  --param day=01 \
  --param month=01  \
  --param year=2022 \
  --watch
but
prefect run
doesnt exist anymore
k

Kevin Kho

04/01/2022, 2:04 PM
I think it’s
prefect deployment run
. Let me confirm
Yes, there is a run command here, but I don’t think it is as versatile as Prefect 1.0's
prefect run
yet
t

Thomas Mignon

04/04/2022, 6:21 AM
@Kevin Kho I can't pass --path and --param to
prefect deployment run
as i was doing previously in v1 with
prefect run
Have you a trick to launch a flow with CLI ? 🙂
I think i have to use
prefect deployment create
before
prefect deployment run
Here is my file
deployment.yaml
I created in order to use `prefect deployment create ./deployment.yaml`
Copy code
- name: "purify-flow"
  flow_location: "./main_flow.py"

- name: "main-flow"
  flow_location: "./main_flow.py"
And i got this error :
Copy code
│   114 │   def load_flow(self):                                                                   │                                                                                                             
    │   115 │   │   if self.flow_location and not self.flow:                                           │                                                                                                             
    │ ❱ 116 │   │   │   self.flow = load_flow_from_script(self.flow_location, self.flow_name)          │                                                                                                             
    │   117 │   │   │   if not self.flow_name:                                                         │                                                                                                             
    │   118 │   │   │   │   self.flow_name = self.flow.name                                            │                                                                                                             
    │   119                                                                                            │                                                                                                             
    │                                                                                                  │                                                                                                             
    │ /home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/deploy │                                                                                                             
    │ ments.py:199 in load_flow_from_script                                                            │                                                                                                             
    │                                                                                                  │                                                                                                             
    │   196 │   │   )                                                                                  │                                                                                                             
    │   197 │                                                                                          │                                                                                                             
    │   198 │   elif not flow_name and len(flows) > 1:                                                 │                                                                                                             
    │ ❱ 199 │   │   raise UnspecifiedFlowError(                                                        │                                                                                                             
    │   200 │   │   │   f"Found {len(flows)} flows at {script_path!r}: {listrepr(flows.keys())}. "     │                                                                                                             
    │   201 │   │   │   "Specify a flow name to select a flow to deploy.",                             │                                                                                                             
    │   202 │   │   )                                                                                  │                                                                                                             
    ╰──────────────────────────────────────────────────────────────────────────────────────────────────╯                                                                                                             
    UnspecifiedFlowError: Found 2 flows at '/home1/datawork/semexp/workspace/prefect-graph/prefect_graph/main_flow.py': 'purify-flow' 'main-flow'. Specify a flow name to select a flow to deploy.
Because i have to specify a flow on
prefect deployment create ./deployment.yaml
but i don't see any options name-flow or something like that who would look like this :
prefect deployment create ./deployment.yaml --name purify-flow
Maybe i have to spec a
_flow_name:_
in
deployment.yaml
too. When I look in the code, this seems to be better, but my question remains: how do I pass params to my flow via `prefect deployment create` or afterwards via
prefect deployment run
?
a

Anna Geller

04/04/2022, 9:29 AM
if you want to trigger a flow run from the CLI and see this flow run in the UI, you don't have to run a flow run from deployment - you can do:
Copy code
python yourflow.py
At the moment, you can’t override parameter values from a run from deployment via the CLI, but you can override default params using the Python client using
create_flow_run_from_deployment
or the REST API. Doing it from the UI is on the roadmap: https://github.com/PrefectHQ/prefect/issues/5617 - the CLI will likely follow
t

Thomas Mignon

04/04/2022, 9:37 AM
The command should be
python /home1/datawork/semexp/workspace/prefect-graph/prefect_graph/main_flow.py   --param workspace="/home1/datawork/semexp/workspace/dagit/running/"   --param semaphore_audience_version="3.4.9"   --param day=01   --param month=01    --param year=2022   --watch
?
a

Anna Geller

04/04/2022, 9:38 AM
python command is not a prefect command 🙂 you would need some argparser. Let me give you an example
t

Thomas Mignon

04/04/2022, 9:39 AM
for
create_flow_run_from_deployment
I will not need it if I can launch the flow via the python command, or did I misunderstand?
Ok thank you 🙂
a

Anna Geller

04/04/2022, 9:43 AM
Copy code
from prefect import flow, task
import sys

@task
def do_something_with_path(path: str):
    """Write a fixed greeting into ``some_data.txt`` under ``path``.

    Args:
        path: Directory prefix. The file written is ``f"{path}/some_data.txt"``;
            nothing here creates the directory, so ``open`` fails if it is
            missing.
    """
    target = f"{path}/some_data.txt"
    greeting = "Hello world from Prefect 2.0!"
    # Mode "w" truncates an existing file before the greeting is written.
    with open(target, "w") as out:
        out.write(greeting)


@flow
def parametrized_flow_with_path(path: str):
    """Flow entry point: pass ``path`` on to the ``do_something_with_path`` task."""
    do_something_with_path(path=path)


if __name__ == "__main__":
    # The flow needs the target directory as the first CLI argument. Without
    # this check a missing argument surfaces as a bare IndexError on sys.argv.
    if len(sys.argv) < 2:
        sys.exit(f"usage: python {sys.argv[0]} <path>")
    # Any extra arguments are ignored, as before.
    parametrized_flow_with_path(sys.argv[1])
Copy code
python parametrized_path_flow.py "/Users/anna/repos/orion-flows/00_prefect_2_0"
t

Thomas Mignon

04/04/2022, 11:35 AM
Thank you for the example, i will try that 😊
👍 1
My flow is running now, thank you a lot Anna 🙂
I'm facing new errors but it's better, I'm in Dask already
I've this strange error :
Copy code
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect_graph/purify_flow.py", line 9, in purify_template
    semaphore_template(
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/tasks.py", line 356, in __call__
    return enter_task_run_engine(
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/engine.py", line 505, in enter_task_run_engine
    raise RuntimeError("Tasks cannot be called outside of a flow.")
and my task is called inside the flow 😅
Copy code
@task
def purify_template(command_parameter=None, workspace=None, semaphore_audience_version=None):
    """Invoke ``semaphore_template`` with the ``purify_raw_logs`` command path.

    The three arguments are forwarded unchanged; only ``command`` is fixed
    here.

    NOTE(review): this body runs inside a task and calls
    ``semaphore_template``. If that callable is itself a Prefect task, the
    call raises ``RuntimeError("Tasks cannot be called outside of a flow.")``.
    """
    forwarded = dict(
        command="/home/datawork-semaphore-exp/conda/conda_envs/semaphore-audience-4.4.9/lib/python3.7/bin/purify_raw_logs",
        command_parameter=command_parameter,
        workspace=workspace,
        semaphore_audience_version=semaphore_audience_version,
    )
    semaphore_template(**forwarded)


# The cluster settings below are passed verbatim to DaskTaskRunner as
# cluster_kwargs for the "dask_jobqueue.PBSCluster" cluster class.
@flow(task_runner=DaskTaskRunner(
    cluster_class="dask_jobqueue.PBSCluster",
    cluster_kwargs=dict(
        cores=1,
        memory='120GB',
        n_workers=1,
        queue='omp',
        resource_spec="select=1:ncpus=56:mem=500gb",
        walltime='01:00:00',
    ),
))
def purify_flow(workspace: str, semaphore_audience_version: str, day: str, month: str, year: str):
    """Build the purify command parameter for one day and run ``purify_template``.

    Args:
        workspace: Forwarded unchanged to ``purify_template``.
        semaphore_audience_version: Forwarded unchanged to ``purify_template``.
        day: Day component, concatenated as-is into the archive file name.
        month: Month component, concatenated as-is into the archive file name.
        year: Year component, used for both the input directory and the
            archive file name.
    """
    # Resulting shape: "<spool>/<year> eftp1 -f <year>-<month>-<day>-eftp1.gz"
    input_dir = "/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp1/" + year
    archive_name = year + "-" + month + "-" + day + "-eftp1.gz"
    command_parameter = input_dir + " eftp1 -f " + archive_name

    purify_template(
        command_parameter=command_parameter,
        workspace=workspace,
        semaphore_audience_version=semaphore_audience_version,
    )
Argh, my task called in the flow is itself calling a task. I understand now
a

Anna Geller

04/04/2022, 12:00 PM
is this an instantiated Task? looks like it. My recommendation from earlier was to just make it a function and decorate with
@task
t

Thomas Mignon

04/04/2022, 12:13 PM
Copy code
@task
def semaphore_template(command=None, command_parameter=None, workspace=None, semaphore_audience_version=None):
    """Run ``command`` through ``template_runner.TemplateRunner``.

    The workspace handed to the runner is ``workspace`` joined with
    ``"<command>-<PBS_JOBID>"``. ``PBS_JOBID`` is read from the environment,
    so a ``KeyError`` is raised when it is not set.
    """
    job_dir_name = f'{command}-{os.environ["PBS_JOBID"]}'
    # NOTE(review): os.path.join discards ``workspace`` when the second
    # component is absolute. If ``command`` is an absolute path, the resulting
    # workspace sits next to the command binary - confirm this is intended.
    workspace_path: Path = Path(os.path.join(workspace, job_dir_name))

    runner = template_runner.TemplateRunner(
        command=command,
        version=semaphore_audience_version,
        command_parameter=command_parameter,
        workspace=workspace_path,
        skip_signal=True,
    )
    runner.run(exit=False)
It is already a function decorated with
@task
Maybe i'm not supposed to call a task in a task in a flow ?
a

Anna Geller

04/04/2022, 12:31 PM
yup, that's true - you can call tasks in flows and you can call other flows in flows, but calling tasks in tasks is not supported - we've talked about it before
t

Thomas Mignon

04/04/2022, 12:35 PM
Ok i will create a flow with one task to prevent this 🙂
👍 1
13 Views