Thread
#prefect-community
    Matt Alhonte

    10 months ago
    Having an issue with Parameters not making it to Flow runs. I think it's because I subclassed another task? I made one that subclassed `StartFlowRun`, and one of its arguments comes from a Parameter, but the Task Schematic doesn't even show that the Parameter exists. I.e.:
    signal = Parameter(
            "signal", default="1a,1b,2a,2b,1c,1d,2d,1e,2e,1f,2f,1z,3a,3b"
        )
    
    nba1a_results = StartSkippableRun(
            signal=signal,
            nb_id="1a",
            flow_name="TestNBA1a",
            project_name=project_name,
            wait=True,
            parameters=task_parameters,
        )
    `signal` doesn't seem to register.
    Kevin Kho

    10 months ago
    Hey @Matt Alhonte, could you show me the code for `StartSkippableRun`?
    Matt Alhonte

    10 months ago
    @Kevin Kho
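    import datetime
    from typing import Any

    import prefect
    from prefect.engine import signals
    from prefect.tasks.prefect.flow_run import StartFlowRun
    from prefect.utilities.tasks import defaults_from_attrs

    class StartSkippableRun(StartFlowRun):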
        """
        Task used to kick off a flow run using Prefect Core's server or Prefect Cloud.  If multiple
        versions of the flow are found, this task will kick off the most recent unarchived version.
        Args:
            - flow_name (str, optional): the name of the flow to schedule; this value may also be
                provided at run time
            - project_name (str, optional): if running with Cloud as a backend, this is the project
                in which the flow is located; this value may also be provided at runtime. If
                running with Prefect Core's server as the backend, this should not be provided.
            - parameters (dict, optional): the parameters to pass to the flow run being scheduled;
                this value may also be provided at run time
            - new_flow_context (dict, optional): the optional run context for the new flow run
            - run_name (str, optional): name to be set for the flow run
            - wait (bool, optional): whether to wait the triggered flow run's state; if True, this
                task will wait until the flow run is complete, and then reflect the corresponding
                state as the state of this task.  Defaults to `False`.
            - scheduled_start_time (datetime, optional): the time to schedule the execution
                for; if not provided, defaults to now
            - **kwargs (dict, optional): additional keyword arguments to pass to the Task constructor
        """
    
        def __init__(
            self,
            signal: str = None,
            nb_id: str = None,
            flow_name: str = None,
            project_name: str = None,
            parameters: dict = None,
            wait: bool = False,
            new_flow_context: dict = None,
            run_name: str = None,
            scheduled_start_time: datetime.datetime = None,
            **kwargs: Any,
        ):
            self.signal = signal
            self.nb_id = nb_id
            self.flow_name = flow_name
            self.project_name = project_name
            self.parameters = parameters
            self.new_flow_context = new_flow_context
            self.run_name = run_name
            self.wait = wait
            self.scheduled_start_time = scheduled_start_time
            if flow_name:
                kwargs.setdefault("name", f"Flow {flow_name}")
            super(StartFlowRun, self).__init__(**kwargs)
    
        @defaults_from_attrs(
            "signal",
            "nb_id",
            "flow_name",
            "project_name",
            "parameters",
            "new_flow_context",
            "run_name",
            "scheduled_start_time",
        )
        def run(
            self,
            signal: str = None,
            nb_id: str = None,
            flow_name: str = None,
            project_name: str = None,
            parameters: dict = None,
            idempotency_key: str = None,
            new_flow_context: dict = None,
            run_name: str = None,
            scheduled_start_time: datetime.datetime = None,
            **kwargs: Any,
        ) -> str:
            """
            Run method for the task; responsible for scheduling the specified flow run.
            Args:
                - flow_name (str, optional): the name of the flow to schedule; if not provided,
                    this method will use the flow name provided at initialization
                - project_name (str, optional): the Cloud project in which the flow is located; if
                    not provided, this method will use the project provided at initialization. If
                    running with Prefect Core's server as the backend, this should not be provided.
                - parameters (dict, optional): the parameters to pass to the flow run being
                    scheduled; if not provided, this method will use the parameters provided at
                    initialization
                - idempotency_key (str, optional): an optional idempotency key for scheduling the
                    flow run; if provided, ensures that only one run is created if this task is retried
                    or rerun with the same inputs.  If not provided, the current flow run ID will be used.
                - new_flow_context (dict, optional): the optional run context for the new flow run
                - run_name (str, optional): name to be set for the flow run
                - scheduled_start_time (datetime, optional): the time to schedule the execution
                    for; if not provided, defaults to now
            Returns:
                - str: the ID of the newly-scheduled flow run
            Raises:
                - ValueError: if flow was not provided, cannot be found, or if a project name was
                    not provided while using Cloud as a backend
            Example:
                ```python
                from prefect.tasks.prefect.flow_run import StartFlowRun
                kickoff_task = StartFlowRun(project_name="Hello, World!", flow_name="My Cloud Flow")
                ```
            """
            logger = prefect.context.get("logger")
            logger.info(flow_name)
            logger.info(nb_id)
            logger.info(signal)
            logger.info(parameters)
            # Run the child flow only if this notebook's ID appears in the signal string
            # (a substring check, so `signal` must already be a plain str here); else skip.
            if nb_id in signal:
                return super(StartSkippableRun, self).run(
                    flow_name=self.flow_name,
                    project_name=project_name,
                    parameters=parameters,
                    new_flow_context=new_flow_context,
                    run_name=run_name,
                    scheduled_start_time=scheduled_start_time,
                    **kwargs,
                )
            else:
                raise signals.SKIP(message=f"{nb_id} skipped")
    Kevin Kho

    10 months ago
    Did you call the `StartSkippableRun` task's `run()`? I think that's just the init there.
    Matt Alhonte

    10 months ago
    No, I define them and then call a bunch of `set_dependencies` because they've got to run in a particular order. Example:
    nba1z_results.set_dependencies(upstream_tasks=[nba1a_results, nba1b_results])
    @Kevin Kho Not sure if this helps, but the error I'm getting in particular is `Unexpected error: TypeError("argument of type 'Parameter' is not iterable")`, I think because of the `if nb_id in signal:` line.
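Why the error points at `if nb_id in signal:`: because `signal` was handed to the constructor, the task still holds the `Parameter` object at run time instead of its string value, and Python's `in` operator raises `TypeError` for an object it cannot search. The plain-Python sketch below reproduces that with a hypothetical stand-in class, `FakeParameter` (not Prefect's `Parameter`). It also shows that once `signal` is a real string, `in` is a substring test, so a hypothetical helper, `should_run`, that splits on commas gives an exact ID match instead.

```python
class FakeParameter:
    """Hypothetical stand-in for a Parameter object: it wraps a value but is not a str."""

    def __init__(self, name: str, default: str) -> None:
        self.name = name
        self.default = default


def should_run(nb_id: str, signal: str) -> bool:
    """Hypothetical helper: exact match against the comma-separated IDs in `signal`."""
    return nb_id in {part.strip() for part in signal.split(",")}


signal_value = "1a,1b,2a,2b,1c,1d,2d,1e,2e,1f,2f,1z,3a,3b"

# Passed to __init__: the task holds the object itself, and `in` cannot search it.
try:
    found = "1a" in FakeParameter("signal", default=signal_value)
except TypeError as err:
    print(f"TypeError: {err}")  # the message names the type 'FakeParameter'

# Passed in the call: the task receives the plain string.
print("1a" in signal_value)            # True
print("1" in signal_value)             # True: substring match, not a real ID
print(should_run("1a", signal_value))  # True
print(should_run("1", signal_value))   # False
```

Splitting into a set is a design choice for exact matching; the original substring check works for the IDs in this thread only because none of them is a substring of another.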
    Kevin Kho

    10 months ago
    I am trying a minimal example, and passing a parameter this way doesn't work for me either:
    from prefect import Flow, Task, Parameter
    import prefect
    from prefect.utilities.tasks import defaults_from_attrs
    from typing import Any
    
    class MyTask(Task):
    
        def __init__(self,signal: str = None,**kwargs: Any):
            self.signal = signal
            super().__init__(**kwargs)
    
        @defaults_from_attrs("signal")
        def run(self, signal=None) -> str:
            prefect.context.logger.info(signal)
            return signal
    
    with Flow("test") as flow:
        x = Parameter("x", default="test")()
        MyTask(x)()
    
    flow.run()
    I think you might need to pass the Parameter to `.run()` instead of `__init__()`.
    Matt Alhonte

    10 months ago
    I think it's an issue with the Storage? Because the issue shows up more when I register the flow from a Jupyter notebook and then try to run it from the Prefect UI.
    If I do `.run()`, it tries to run the flow immediately when I run the cell.
    Kevin Kho

    10 months ago
    When I say run, I don't mean actually calling `.run()`. I mean using the second set of parentheses, like `MyTask(x)()`. Is that what you mean too?
    Matt Alhonte

    10 months ago
    Oh no, I tried `.run()`.
    Kevin Kho

    10 months ago
    I mean `MyTask(init_stuff_here)(run_stuff_here)`, so put the parameters in the second `()`, where they will have values at run time.
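To make the "first parentheses vs. second parentheses" distinction concrete, here is a toy model in plain Python. It is not Prefect's implementation; `Param`, `ToyTask`, and `run_with` are hypothetical names. It only mimics the behavior described in this thread: a value given to the constructor is stored as-is, while a value given in the call is treated as an input that gets swapped for its runtime value.

```python
import copy
from typing import Any, Dict


class Param:
    """Hypothetical placeholder whose value is only known when the flow runs."""

    def __init__(self, name: str) -> None:
        self.name = name


class ToyTask:
    """Toy two-phase task: constructor settings vs. call-time inputs."""

    def __init__(self, signal: Any = None) -> None:
        self.signal = signal  # stored as-is; nothing resolves it later
        self.call_inputs: Dict[str, Any] = {}

    def __call__(self, **inputs: Any) -> "ToyTask":
        # Return a configured copy that remembers its call-time inputs.
        new = copy.copy(self)
        new.call_inputs = inputs
        return new

    def run_with(self, param_values: Dict[str, Any]) -> Any:
        # Only call-time inputs are replaced by their runtime values.
        resolved = {
            key: param_values[val.name] if isinstance(val, Param) else val
            for key, val in self.call_inputs.items()
        }
        # Fall back to the constructor value, like a defaults-from-attrs lookup.
        return resolved.get("signal", self.signal)


signal = Param("signal")
runtime_values = {"signal": "1a,1b,2a"}

print(type(ToyTask(signal)().run_with(runtime_values)).__name__)  # Param
print(ToyTask()(signal=signal).run_with(runtime_values))          # 1a,1b,2a
```

The first call reproduces the symptom in this thread (the task sees the placeholder object); the second matches the fix Matt lands on below, where `signal` moves into the second set of parentheses.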
    Matt Alhonte

    10 months ago
    Trying this:
    nba1a_results = StartSkippableRun(
            nb_id="1a",
            flow_name="TestNBA1a",
            project_name=project_name,
            wait=True,
            parameters=task_parameters,
        )(signal=signal)
    Kevin Kho

    10 months ago
    Exactly!
    Matt Alhonte

    10 months ago
    Success! Took a bit of futzing with exactly which args go where, but this worked:
    nba1a_results = StartSkippableRun(flow_name="TestNBA1a", wait=True,)(
            nb_id="1a",
            project_name=project_name,
            parameters=task_parameters,
            signal=signal,
        )
    Thanks!!!
    Kevin Kho

    10 months ago
    Nice!
    Andrea Haessly

    7 months ago
    I know this is an older thread, but I'm new to Prefect and this is confusing me. The run method is decorated with `defaults_from_attrs`, so shouldn't anything specified in the init also be passed to the run?
    Kevin Kho

    7 months ago
    It is, if nothing is passed at run time, yep! Is there a specific problem you have?
    I think he inherited from another task and then passed the kwargs to the `super()` call. Those kwargs are not attrs, so they are not carried over.
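On the `defaults_from_attrs` question: the decorator wraps `run` and fills in arguments from instance attributes. Below is a simplified sketch of that idea under a hypothetical name, `fill_from_attrs`; it is not Prefect's source. It shows the part that matters here: a missing or `None` keyword argument is replaced by the attribute of the same name, copied as-is. If that attribute holds an unresolved placeholder (like a `Parameter` object handed to `__init__`), the placeholder is what `run` receives.

```python
import functools
from typing import Any, Callable


def fill_from_attrs(*names: str) -> Callable:
    """Simplified sketch of a defaults_from_attrs-style decorator (hypothetical name)."""

    def decorator(run: Callable) -> Callable:
        @functools.wraps(run)
        def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
            # Assumes run() arguments are passed by keyword, as in these examples.
            for name in names:
                # A missing or None keyword argument falls back to the instance attribute.
                if kwargs.get(name) is None:
                    kwargs[name] = getattr(self, name)
            return run(self, *args, **kwargs)

        return wrapper

    return decorator


class Echo:
    def __init__(self, signal: Any = None) -> None:
        self.signal = signal

    @fill_from_attrs("signal")
    def run(self, signal: Any = None) -> Any:
        return signal


print(Echo("from-init").run())                    # from-init
print(Echo("from-init").run(signal="from-call"))  # from-call
placeholder = object()  # stands in for an unresolved Parameter object
print(Echo(placeholder).run() is placeholder)     # True: copied as-is, never resolved
```

So the attribute is "carried over", but only as whatever object was stored; resolving a Parameter to its value depends on passing it in the call.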
    Andrea Haessly

    7 months ago
    But in the initial example, it is being passed as a named parameter, `signal`, so shouldn't that be carried over?
    I don't have a specific problem yet; I'll post a new thread when I do. :thank-you:
    Kevin Kho

    7 months ago
    `signal` is. Those other inputs, like `wait` and `project_name`, might not be.
    Andrea Haessly

    7 months ago
    The original poster was having a problem with the `signal` argument, and I am having the same problem now with the `bucket_name` and `blob` arguments to `GCSBlobExists` with this code:
    from prefect import task, Flow
    from prefect.tasks.gcp import GCSCopy, GCSBlobExists
    from google.cloud import storage
    
    source_bucket = "some-bucket"
    source_blob = "some-file"
    dest_bucket = "some-bucket"
    dest_blob = "file-copy"
    
    storage_client = storage.Client()
    
    copy_file_task = GCSCopy(
                source_bucket=source_bucket,
                source_blob=source_blob,
                dest_bucket=dest_bucket,
                dest_blob=dest_blob,
                name="copyTask")
    
    file_exists_task = GCSBlobExists(
        bucket_name=dest_bucket,
        blob=dest_blob,
    )
    
    @task(name="delete_file")
    def delete_cloud_file(bucket_name: str, blob_name: str) -> None:
        bucket = storage_client.get_bucket(bucket_name)
        blob = bucket.get_blob(blob_name)
        print("deleting blob " + blob_name)
        blob.delete()


    def main():
        with Flow("gcpMoveTest") as flow:
            delete_task = delete_cloud_file(source_bucket, source_blob)
            delete_task.set_upstream(file_exists_task)
            file_exists_task.set_upstream(copy_file_task)
    
        flow.run()
    
    
    if __name__ == "__main__":
        main()
    Kevin Kho

    7 months ago
    These tasks seem to be written correctly. What is the error you are running into?
    Why did you write your Flow in this order? You can do:
    with Flow("gcpMoveTest") as flow:
        copy = copy_file_task()
        exists = file_exists_task(upstream_tasks=[copy])
        delete = delete_cloud_file(upstream_tasks=[exists])

    flow.run()
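Whether dependencies are declared with `set_dependencies`, `set_upstream`, or `upstream_tasks`, they only describe edges; the tasks then have to run in an order where every upstream task comes first. As a generic illustration (a plain Kahn's-algorithm topological sort with a hypothetical `run_order` helper, not Prefect's scheduler), the chain suggested above orders as copy, then exists, then delete.

```python
from collections import deque
from typing import Dict, List


def run_order(upstream: Dict[str, List[str]]) -> List[str]:
    """Kahn's algorithm: order tasks so every upstream task precedes its dependents."""
    tasks = set(upstream) | {dep for deps in upstream.values() for dep in deps}
    remaining = {t: len(upstream.get(t, [])) for t in tasks}  # unmet upstream count
    downstream: Dict[str, List[str]] = {t: [] for t in tasks}
    for task, deps in upstream.items():
        for dep in deps:
            downstream[dep].append(task)

    # Start from tasks with no upstream dependencies (sorted for a stable order).
    ready = deque(sorted(t for t, n in remaining.items() if n == 0))
    order: List[str] = []
    while ready:
        task = ready.popleft()
        order.append(task)
        for child in sorted(downstream[task]):
            remaining[child] -= 1
            if remaining[child] == 0:
                ready.append(child)
    if len(order) != len(tasks):
        raise ValueError("dependency cycle detected")
    return order


print(run_order({"exists": ["copy"], "delete": ["exists"]}))  # ['copy', 'exists', 'delete']
print(run_order({"nba1z": ["nba1a", "nba1b"]}))              # ['nba1a', 'nba1b', 'nba1z']
```

The second call mirrors Matt's `set_dependencies` example earlier in the thread: `nba1z` only becomes ready after both of its upstream tasks have run.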
    Andrea Haessly

    7 months ago
    This is the error I get:
    [2022-02-03 12:13:57-0500] INFO - prefect.FlowRunner | Beginning Flow run for 'gcpMoveTest'
    [2022-02-03 12:13:57-0500] INFO - prefect.TaskRunner | Task 'copyTask': Starting task run...
    [2022-02-03 12:13:58-0500] INFO - prefect.TaskRunner | Task 'copyTask': Finished task run for task with final state: 'Success'
    [2022-02-03 12:13:58-0500] INFO - prefect.TaskRunner | Task 'GCSBlobExists': Starting task run...
    [2022-02-03 12:13:58-0500] ERROR - prefect.TaskRunner | Task 'GCSBlobExists': Exception encountered during task execution!
    Traceback (most recent call last):
      File "/Users/ahaessly/code/poc_prefect/local-dev/lib/python3.10/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
        value = prefect.utilities.executors.run_task_with_timeout(
      File "/Users/ahaessly/code/poc_prefect/local-dev/lib/python3.10/site-packages/prefect/utilities/executors.py", line 467, in run_task_with_timeout
        return task.run(*args, **kwargs)  # type: ignore
      File "/Users/ahaessly/code/poc_prefect/local-dev/lib/python3.10/site-packages/prefect/utilities/tasks.py", line 456, in method
        return run_method(self, *args, **kwargs)
      File "/Users/ahaessly/code/poc_prefect/local-dev/lib/python3.10/site-packages/prefect/tasks/gcp/storage.py", line 525, in run
        raise ValueError("Missing bucket_name or blob")
    ValueError: Missing bucket_name or blob
    [2022-02-03 12:13:58-0500] INFO - prefect.TaskRunner | Task 'GCSBlobExists': Finished task run for task with final state: 'Failed'
    [2022-02-03 12:13:58-0500] INFO - prefect.TaskRunner | Task 'delete_file': Starting task run...
    [2022-02-03 12:13:58-0500] INFO - prefect.TaskRunner | Task 'delete_file': Finished task run for task with final state: 'TriggerFailed'
    [2022-02-03 12:13:58-0500] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
    Thanks for the suggestion to simplify the code. I still get a similar error:
    Traceback (most recent call last):
      File "/Users/ahaessly/code/poc_prefect/workflows/GcpMoveFunctional.py", line 48, in <module>
        main()
      File "/Users/ahaessly/code/poc_prefect/workflows/GcpMoveFunctional.py", line 42, in main
        delete = delete_cloud_file(upstream_tasks=[exists])
      File "/Users/ahaessly/code/poc_prefect/local-dev/lib/python3.10/site-packages/prefect/core/task.py", line 633, in __call__
        new.bind(
      File "/Users/ahaessly/code/poc_prefect/local-dev/lib/python3.10/site-packages/prefect/core/task.py", line 674, in bind
        callargs = dict(signature.bind(*args, **kwargs).arguments)  # type: Dict
      File "/usr/local/Cellar/python@3.10/3.10.0_2/Frameworks/Python.framework/Versions/3.10/lib/python3.10/inspect.py", line 3177, in bind
        return self._bind(args, kwargs)
      File "/usr/local/Cellar/python@3.10/3.10.0_2/Frameworks/Python.framework/Versions/3.10/lib/python3.10/inspect.py", line 3092, in _bind
        raise TypeError(msg) from None
    TypeError: missing a required argument: 'bucket_name'
    Kevin Kho

    7 months ago
    I'll take a look at that task.
    I think that new error comes from your custom task: you may not have passed the bucket name?
    delete_task = delete_cloud_file(source_bucket, source_blob, upstream_tasks=[...])
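The second traceback shows Prefect binding the call's arguments with `inspect.Signature.bind` (`signature.bind(*args, **kwargs)` in `task.py`). Calling `delete_cloud_file` with only `upstream_tasks` supplies no value for `bucket_name` or `blob_name`, so binding fails before anything runs. The same standard-library behavior can be reproduced without Prefect; the function below only copies the signature from the thread and has an empty body.

```python
import inspect


def delete_cloud_file(bucket_name: str, blob_name: str) -> None:
    """Same signature as the task function in the thread; the body is a stub here."""


sig = inspect.signature(delete_cloud_file)

# Binding with no data arguments fails, as in the traceback above.
try:
    sig.bind()
except TypeError as err:
    print(err)  # missing a required argument: 'bucket_name'

# Binding both required arguments succeeds.
bound = sig.bind("some-bucket", "some-file")
print(dict(bound.arguments))  # {'bucket_name': 'some-bucket', 'blob_name': 'some-file'}
```

This is why Kevin's corrected call passes `source_bucket` and `source_blob` positionally alongside `upstream_tasks`.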
    Andrea Haessly

    7 months ago
    Thanks, I think I understand now. I think it's related to the fact that, within a flow context, Prefect makes a copy of a task to use. I assumed it would copy the attributes I originally initialized the task with, but I'm guessing it doesn't.
    @John Kitonyo fyi