Having an issue with Parameters not making it to F...
# ask-community
m
Having an issue with Parameters not making it to Flow runs - I think it's because I subclassed another task? I made one that subclassed
StartFlowRun
and one of the arguments to it comes from a parameter, and the Task Schematic doesn't even show it exists. ie,
signal = Parameter(
        "signal", default="1a,1b,2a,2b,1c,1d,2d,1e,2e,1f,2f,1z,3a,3b"
    )

nba1a_results = StartSkippableRun(
        signal=signal,
        nb_id="1a",
        flow_name="TestNBA1a",
        project_name=project_name,
        wait=True,
        parameters=task_parameters,
    )
signal
doesn't seem to register.
k
Hey @Matt Alhonte, could you show me the code for
StartSkippableRun
?
m
@Kevin Kho
class StartSkippableRun(StartFlowRun):
    """
    Task used to kick off a flow run using Prefect Core's server or Prefect Cloud.  If multiple
    versions of the flow are found, this task will kick off the most recent unarchived version.
    Args:
        - flow_name (str, optional): the name of the flow to schedule; this value may also be
            provided at run time
        - project_name (str, optional): if running with Cloud as a backend, this is the project
            in which the flow is located; this value may also be provided at runtime. If
            running with Prefect Core's server as the backend, this should not be provided.
        - parameters (dict, optional): the parameters to pass to the flow run being scheduled;
            this value may also be provided at run time
        - new_flow_context (dict, optional): the optional run context for the new flow run
        - run_name (str, optional): name to be set for the flow run
        - wait (bool, optional): whether to wait the triggered flow run's state; if True, this
            task will wait until the flow run is complete, and then reflect the corresponding
            state as the state of this task.  Defaults to `False`.
        - scheduled_start_time (datetime, optional): the time to schedule the execution
            for; if not provided, defaults to now
        - **kwargs (dict, optional): additional keyword arguments to pass to the Task constructor
    """

    def __init__(
        self,
        signal: str = None,
        nb_id: str = None,
        flow_name: str = None,
        project_name: str = None,
        parameters: dict = None,
        wait: bool = False,
        new_flow_context: dict = None,
        run_name: str = None,
        scheduled_start_time: datetime.datetime = None,
        **kwargs: Any,
    ):
        self.signal = signal
        self.nb_id = nb_id
        self.flow_name = flow_name
        self.project_name = project_name
        self.parameters = parameters
        self.new_flow_context = new_flow_context
        self.run_name = run_name
        self.wait = wait
        self.scheduled_start_time = scheduled_start_time
        if flow_name:
            kwargs.setdefault("name", f"Flow {flow_name}")
        super(StartFlowRun, self).__init__(**kwargs)

    @defaults_from_attrs(
        "signal",
        "nb_id",
        "flow_name",
        "project_name",
        "parameters",
        "new_flow_context",
        "run_name",
        "scheduled_start_time",
    )
    def run(
        self,
        signal: str = None,
        nb_id: str = None,
        flow_name: str = None,
        project_name: str = None,
        parameters: dict = None,
        idempotency_key: str = None,
        new_flow_context: dict = None,
        run_name: str = None,
        scheduled_start_time: datetime.datetime = None,
        **kwargs: Any,
    ) -> str:
        """
        Run method for the task; responsible for scheduling the specified flow run.
        Args:
            - flow_name (str, optional): the name of the flow to schedule; if not provided,
                this method will use the flow name provided at initialization
            - project_name (str, optional): the Cloud project in which the flow is located; if
                not provided, this method will use the project provided at initialization. If
                running with Prefect Core's server as the backend, this should not be provided.
            - parameters (dict, optional): the parameters to pass to the flow run being
                scheduled; if not provided, this method will use the parameters provided at
                initialization
            - idempotency_key (str, optional): an optional idempotency key for scheduling the
                flow run; if provided, ensures that only one run is created if this task is retried
                or rerun with the same inputs.  If not provided, the current flow run ID will be used.
            - new_flow_context (dict, optional): the optional run context for the new flow run
            - run_name (str, optional): name to be set for the flow run
            - scheduled_start_time (datetime, optional): the time to schedule the execution
                for; if not provided, defaults to now
        Returns:
            - str: the ID of the newly-scheduled flow run
        Raises:
            - ValueError: if flow was not provided, cannot be found, or if a project name was
                not provided while using Cloud as a backend
        Example:
            ```python
            from prefect.tasks.prefect.flow_run import StartFlowRun
            kickoff_task = StartFlowRun(project_name="Hello, World!", flow_name="My Cloud Flow")
""" logger = prefect.context.get("logger") logger.info(flow_name) logger.info(nb_id) #logger.info(signal) signal_to_use = signal.serialize() logger.info(signal_to_use) logger.info(parameters) if nb_id in signal: return super(StartSkippableRun, self).run( flow_name=self.flow_name, project_name=project_name, parameters=parameters, new_flow_context=new_flow_context, run_name=run_name, scheduled_start_time=scheduled_start_time, **kwargs, ) else: raise signals.SKIP(message=f"{nb_id} skipped")```
k
Did you call a
StartSkippableRun
run()? I think that’s just the init there?
m
No, I define them and then call a bunch of
set_dependencies
cuz they've got to run in a particular order. example:
nba1z_results.set_dependencies(upstream_tasks=[nba1a_results, nba1b_results],)
@Kevin Kho Not sure if this helps, but the error I have in particular is
Unexpected error: TypeError("argument of type 'Parameter' is not iterable")
, I think because of the
if nb_id in signal:
line.
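That error message fits this reading. Python's `in` operator raises `TypeError` when the right-hand side is an object that supports neither `__contains__` nor iteration, which is what an unresolved placeholder object looks like. A minimal sketch in plain Python, assuming a hypothetical `FakeParameter` stand-in (it is not Prefect's `Parameter` class):

```python
class FakeParameter:
    """Hypothetical stand-in for an unresolved parameter object (not Prefect's class)."""

    def __init__(self, name: str, default: str) -> None:
        self.name = name
        self.default = default


def should_run(nb_id: str, signal) -> bool:
    # Same membership check as in the run() method from the thread.
    return nb_id in signal


unresolved = FakeParameter("signal", default="1a,1b,2a")

try:
    should_run("1a", unresolved)
    error = None
except TypeError as exc:
    # The placeholder defines neither __contains__ nor __iter__, so `in` fails.
    error = exc

# Once the value is a plain string, the same check works.
resolved_ok = should_run("1a", "1a,1b,2a")
```

So the check itself is fine. The question is why `run()` receives the placeholder object instead of its string value.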
k
I am trying a minimal example and my passing of a parameter is not working:
from prefect import Flow, Task, Parameter
import prefect
from prefect.utilities.tasks import defaults_from_attrs
from typing import Any

class MyTask(Task):

    def __init__(self,signal: str = None,**kwargs: Any):
        self.signal = signal
        super().__init__(**kwargs)

    @defaults_from_attrs("signal")
    def run(self,signal=None) -> str:
        prefect.context.logger.info(signal)
        return signal

with Flow("test") as flow:
    x = Parameter("x", default="test")()
    MyTask(x)()

flow.run()
I think you might need to use the
.run()
instead of the
init()
for the Parameter
m
I think it's an issue with the Storage? Cuz the issue is more when I register the flow from a Jupyter notebook and then try to run it from the Prefect UI
If I do
.run()
it tries to run it immediately when I run the cell.
k
When I say run, I don’t mean actually call
.run()
. I mean use the second parenthesis like
MyTask(x)()
. Is that what you mean also?
m
oh no, I tried
.run()
k
I mean
MyTask(init_stuff_here)(run_stuff_here)
so put the parameters in the second
()
when they will have values.
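The difference between the two sets of parentheses can be sketched with a toy model in plain Python. Everything here (`Placeholder`, `ToyTask`, `execute`) is hypothetical and only imitates the behaviour described in this thread; it is not how Prefect is implemented. The idea: values given to `__init__` are stored as-is, while values given to the second call are recorded as inputs that the runner resolves before calling `run()`.

```python
from typing import Any, Dict


class Placeholder:
    """Hypothetical stand-in for a parameter whose value is only known at run time."""

    def __init__(self, name: str, default: Any) -> None:
        self.name = name
        self.default = default


class ToyTask:
    """Toy task: __init__ stores values untouched, __call__ records run-time inputs."""

    def __init__(self, **init_kwargs: Any) -> None:
        # Build time: whatever is passed here is kept as-is,
        # including Placeholder objects.
        self.init_kwargs = init_kwargs
        self.bound: Dict[str, Any] = {}

    def __call__(self, **run_kwargs: Any) -> "ToyTask":
        # Still build time, but these are recorded as inputs that the
        # runner resolves before it calls run().
        self.bound = run_kwargs
        return self

    def run(self, **kwargs: Any) -> Dict[str, Any]:
        return kwargs


def execute(task: ToyTask, overrides: Dict[str, Any]) -> Dict[str, Any]:
    """Toy runner: resolve Placeholders among bound inputs only, then call run()."""
    resolved = {}
    for key, value in task.bound.items():
        if isinstance(value, Placeholder):
            value = overrides.get(value.name, value.default)
        resolved[key] = value
    # Init-time values pass through unresolved; bound inputs take precedence.
    return task.run(**{**task.init_kwargs, **resolved})


signal = Placeholder("signal", default="1a,1b")

# Placeholder given to __init__: run() receives the Placeholder object itself.
at_init = execute(ToyTask(signal=signal)(), overrides={})

# Placeholder given to the second call: run() receives the resolved string.
at_call = execute(ToyTask(nb_id="1a")(signal=signal), overrides={"signal": "2a"})
```

In this toy model `at_init["signal"]` is still a `Placeholder`, while `at_call["signal"]` is the string `"2a"`. That matches the symptom in the thread, where `run()` received a `Parameter` object rather than its value.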
m
trying this
nba1a_results = StartSkippableRun(
        nb_id="1a",
        flow_name="TestNBA1a",
        project_name=project_name,
        wait=True,
        parameters=task_parameters,
    )(signal=signal)
k
Exactly!
m
Success! Took a bit of futzing with exactly which args go where, but this worked
nba1a_results = StartSkippableRun(flow_name="TestNBA1a", wait=True,)(
        nb_id="1a",
        project_name=project_name,
        parameters=task_parameters,
        signal=signal,
    )
Thanks!!!
k
Nice!
a
I know this is an older thread, but I'm new to Prefect and this is confusing me. The run method is defined with the defaults_from_attrs annotation so shouldn't anything specified in the init also be passed to the run?
k
Yep, it is, as long as nothing is passed to run for that argument! Is there a specific problem you have?
I think he inherited from another task and then passed the kwargs to the
super()
call. Those kwargs are not
attrs
so they are not carried over
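For readers with the same question, here is a rough sketch of what a decorator like `defaults_from_attrs` does. It is a simplified stand-in written for this thread (`defaults_from_attrs_sketch` and `Greeter` are made-up names), not Prefect's source: any listed argument that the caller does not pass to `run()` is filled in from the attribute of the same name on the instance.

```python
from functools import wraps
from typing import Any, Callable


def defaults_from_attrs_sketch(*attr_names: str) -> Callable:
    """Simplified stand-in: fill in missing run() kwargs from instance attributes."""

    def decorator(run_method: Callable) -> Callable:
        @wraps(run_method)
        def wrapped(self: Any, *args: Any, **kwargs: Any) -> Any:
            for name in attr_names:
                # Only fill in a value when the caller did not pass one.
                kwargs.setdefault(name, getattr(self, name))
            return run_method(self, *args, **kwargs)

        return wrapped

    return decorator


class Greeter:
    def __init__(self, greeting: str = None, name: str = None) -> None:
        self.greeting = greeting
        self.name = name

    @defaults_from_attrs_sketch("greeting", "name")
    def run(self, greeting: str = None, name: str = None) -> str:
        return f"{greeting}, {name}"


greeter = Greeter(greeting="Hello", name="init-name")
from_attrs = greeter.run()                 # both values come from __init__
overridden = greeter.run(name="run-name")  # the run-time value takes precedence
```

Note that the sketch hands over whatever object is stored on the attribute, unchanged. If that object is an unresolved parameter rather than a plain value, `run()` gets the unresolved object.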
a
but in the initial example, it is being passed as a named parameter "signal" so shouldn't that be carried over?
I don't have a specific problem yet, I'll post a new thread when I do. 🙏
k
signal is. Those other inputs like
wait
and
project_name
might not be
a
The original poster was having a problem with the signal argument. And I am having the same problem now with the bucket_name and blob arguments to GCSBlobExists with this code
from prefect import task, Flow
from prefect.tasks.gcp import GCSCopy, GCSBlobExists
from google.cloud import storage

source_bucket = "some-bucket"
source_blob = "some-file"
dest_bucket = "some-bucket"
dest_blob = "file-copy"

storage_client = storage.Client()

copy_file_task = GCSCopy(
            source_bucket=source_bucket,
            source_blob=source_blob,
            dest_bucket=dest_bucket,
            dest_blob=dest_blob,
            name="copyTask")

file_exists_task = GCSBlobExists(
    bucket_name=dest_bucket,
    blob=dest_blob,
)

@task(name="delete_file")
def delete_cloud_file(bucket_name: str, blob_name: str) -> None:
    bucket = storage_client.get_bucket(bucket_name)
    blob = bucket.get_blob(blob_name)
    print("deleting blob " + blob_name)
    blob.delete()



def main():
    with Flow("gcpMoveTest") as flow:
        delete_task = delete_cloud_file(source_bucket, source_blob)
        delete_task.set_upstream(file_exists_task)
        file_exists_task.set_upstream(copy_file_task)

    flow.run()


if __name__ == "__main__":
    main()
k
These tasks seem to be written correctly. What is the error you are running into?
Why did you write your Flow in this order? You can do:
with Flow("gcpMoveTest") as flow:
    copy = copy_file_task()
    exists = file_exists_task(upstream_tasks=[copy])
    delete = delete_cloud_file(upstream_tasks=[exists])

flow.run()
a
this is the error I get
[2022-02-03 12:13:57-0500] INFO - prefect.FlowRunner | Beginning Flow run for 'gcpMoveTest'
[2022-02-03 12:13:57-0500] INFO - prefect.TaskRunner | Task 'copyTask': Starting task run...
[2022-02-03 12:13:58-0500] INFO - prefect.TaskRunner | Task 'copyTask': Finished task run for task with final state: 'Success'
[2022-02-03 12:13:58-0500] INFO - prefect.TaskRunner | Task 'GCSBlobExists': Starting task run...
[2022-02-03 12:13:58-0500] ERROR - prefect.TaskRunner | Task 'GCSBlobExists': Exception encountered during task execution!
Traceback (most recent call last):
  File "/Users/ahaessly/code/poc_prefect/local-dev/lib/python3.10/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/Users/ahaessly/code/poc_prefect/local-dev/lib/python3.10/site-packages/prefect/utilities/executors.py", line 467, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/Users/ahaessly/code/poc_prefect/local-dev/lib/python3.10/site-packages/prefect/utilities/tasks.py", line 456, in method
    return run_method(self, *args, **kwargs)
  File "/Users/ahaessly/code/poc_prefect/local-dev/lib/python3.10/site-packages/prefect/tasks/gcp/storage.py", line 525, in run
    raise ValueError("Missing bucket_name or blob")
ValueError: Missing bucket_name or blob
[2022-02-03 12:13:58-0500] INFO - prefect.TaskRunner | Task 'GCSBlobExists': Finished task run for task with final state: 'Failed'
[2022-02-03 12:13:58-0500] INFO - prefect.TaskRunner | Task 'delete_file': Starting task run...
[2022-02-03 12:13:58-0500] INFO - prefect.TaskRunner | Task 'delete_file': Finished task run for task with final state: 'TriggerFailed'
[2022-02-03 12:13:58-0500] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
thanks for the suggestion to simplify the code. I still get a similar error
Traceback (most recent call last):
  File "/Users/ahaessly/code/poc_prefect/workflows/GcpMoveFunctional.py", line 48, in <module>
    main()
  File "/Users/ahaessly/code/poc_prefect/workflows/GcpMoveFunctional.py", line 42, in main
    delete = delete_cloud_file(upstream_tasks=[exists])
  File "/Users/ahaessly/code/poc_prefect/local-dev/lib/python3.10/site-packages/prefect/core/task.py", line 633, in __call__
    new.bind(
  File "/Users/ahaessly/code/poc_prefect/local-dev/lib/python3.10/site-packages/prefect/core/task.py", line 674, in bind
    callargs = dict(signature.bind(*args, **kwargs).arguments)  # type: Dict
  File "/usr/local/Cellar/python@3.10/3.10.0_2/Frameworks/Python.framework/Versions/3.10/lib/python3.10/inspect.py", line 3177, in bind
    return self._bind(args, kwargs)
  File "/usr/local/Cellar/python@3.10/3.10.0_2/Frameworks/Python.framework/Versions/3.10/lib/python3.10/inspect.py", line 3092, in _bind
    raise TypeError(msg) from None
TypeError: missing a required argument: 'bucket_name'
k
I’ll take a look at that task
I think that new error is your custom task and you may not have passed the bucket name?
delete_task = delete_cloud_file(source_bucket, source_blob, upstream_tasks=[...])
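The second traceback ends in `inspect.Signature.bind`, so the `TypeError` can be reproduced with the standard library alone. In the sketch below, `bind_call` is a hypothetical helper that only imitates the call pattern visible in the traceback (dependency keywords handled separately, data arguments checked against the function signature); it is not Prefect's code.

```python
import inspect
from typing import Any, Callable, List, Optional


def delete_cloud_file(bucket_name: str, blob_name: str) -> None:
    """Same signature as the task function in the thread; body omitted."""


def bind_call(
    fn: Callable, *args: Any, upstream_tasks: Optional[List[Any]] = None, **kwargs: Any
) -> dict:
    """Hypothetical helper: check the data arguments against fn's signature.

    upstream_tasks is accepted separately and never matched against fn's parameters.
    """
    return dict(inspect.signature(fn).bind(*args, **kwargs).arguments)


try:
    # Only the dependency is given, so the required data arguments are missing.
    bind_call(delete_cloud_file, upstream_tasks=["exists"])
    message = None
except TypeError as exc:
    message = str(exc)

# Passing the data arguments as well as the dependency binds cleanly.
bound = bind_call(
    delete_cloud_file, "some-bucket", "some-file", upstream_tasks=["exists"]
)
```

Here `upstream_tasks` only expresses ordering. It does not stand in for `bucket_name` or `blob_name`, which still have to be passed.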
a
thanks. I think I understand now. I think it's related to the fact that within a flow context it makes a copy of a task to use. I assumed it would copy the attributes that I originally initialized the task with but I'm guessing that it doesn't.
@John Kitonyo fyi