Matt Alhonte
11/16/2021, 1:14 AM — StartFlowRun, and one of the arguments to it comes from a Parameter, but the Task Schematic doesn't even show that it exists. For example:
# NOTE: `signal` is a Parameter (a Task). Passing it to the constructor (first
# parentheses) does not register it as an upstream dependency, which is why it does
# not appear in the schematic. run() then receives the Parameter object itself rather
# than its value, which causes the `TypeError: argument of type 'Parameter' is not
# iterable` reported in this thread. Pass it in the call (second parentheses) instead:
# StartSkippableRun(...)(signal=signal).
signal = Parameter(
        "signal", default="1a,1b,2a,2b,1c,1d,2d,1e,2e,1f,2f,1z,3a,3b"
    )
nba1a_results = StartSkippableRun(
        signal=signal,
        nb_id="1a",
        flow_name="TestNBA1a",
        project_name=project_name,
        wait=True,
        parameters=task_parameters,
    )
`signal` doesn't seem to register.
Kevin Kho
StartSkippableRun?
Matt Alhonte
11/16/2021, 1:34 AMclass StartSkippableRun(StartFlowRun):
    """
    Task used to kick off a flow run using Prefect Core's server or Prefect Cloud, but only
    when `nb_id` is found in `signal`; otherwise the task is skipped (raises `signals.SKIP`).
    If multiple versions of the flow are found, this task will kick off the most recent
    unarchived version.
    Args:
        - signal (str, optional): the selection of ids that should run, e.g. "1a,1b,2a";
            if it comes from a Parameter, pass it when calling the task (second
            parentheses) rather than to the constructor, so its value is available at run time
        - nb_id (str, optional): the id this task looks for in `signal`
        - flow_name (str, optional): the name of the flow to schedule; this value may also be
            provided at run time
        - project_name (str, optional): if running with Cloud as a backend, this is the project
            in which the flow is located; this value may also be provided at runtime. If
            running with Prefect Core's server as the backend, this should not be provided.
        - parameters (dict, optional): the parameters to pass to the flow run being scheduled;
            this value may also be provided at run time
        - new_flow_context (dict, optional): the optional run context for the new flow run
        - run_name (str, optional): name to be set for the flow run
        - wait (bool, optional): whether to wait for the triggered flow run's state; if True,
            this task will wait until the flow run is complete, and then reflect the
            corresponding state as the state of this task.  Defaults to `False`.
        - scheduled_start_time (datetime, optional): the time to schedule the execution
            for; if not provided, defaults to now
        - **kwargs (dict, optional): additional keyword arguments to pass to the Task constructor
    """
    def __init__(
        self,
        signal: str = None,
        nb_id: str = None,
        flow_name: str = None,
        project_name: str = None,
        parameters: dict = None,
        wait: bool = False,
        new_flow_context: dict = None,
        run_name: str = None,
        scheduled_start_time: datetime.datetime = None,
        **kwargs: Any,
    ):
        """
        Store the skip-selection settings and delegate everything else to StartFlowRun.

        Args:
            - signal (str, optional): the selection of ids that should run (presumably a
                comma-separated string such as "1a,1b,2a"). Values that come from a
                Parameter should be passed when calling the task, not here: a Task given
                to the constructor is not resolved to its value at run time.
            - nb_id (str, optional): the id that `run` looks for in `signal`
            - flow_name, project_name, parameters, wait, new_flow_context, run_name,
                scheduled_start_time, **kwargs: forwarded unchanged to
                `StartFlowRun.__init__`
        """
        self.signal = signal
        self.nb_id = nb_id
        # Keep the previous default task name ("Flow <flow_name>"); setdefault never
        # overrides an explicit `name` and is harmless if the parent sets the same default.
        if flow_name:
            kwargs.setdefault("name", f"Flow {flow_name}")
        # The previous `super(StartFlowRun, self).__init__(**kwargs)` skipped
        # StartFlowRun.__init__ and went straight to Task.__init__, so only the attributes
        # re-assigned by hand existed. Delegating lets the parent set up (and validate)
        # everything its run() relies on.
        super().__init__(
            flow_name=flow_name,
            project_name=project_name,
            parameters=parameters,
            wait=wait,
            new_flow_context=new_flow_context,
            run_name=run_name,
            scheduled_start_time=scheduled_start_time,
            **kwargs,
        )
    @defaults_from_attrs(
        "signal",
        "nb_id",
        "flow_name",
        "project_name",
        "parameters",
        "new_flow_context",
        "run_name",
        "scheduled_start_time",
    )
    def run(
        self,
        signal: str = None,
        nb_id: str = None,
        flow_name: str = None,
        project_name: str = None,
        parameters: dict = None,
        idempotency_key: str = None,
        new_flow_context: dict = None,
        run_name: str = None,
        scheduled_start_time: datetime.datetime = None,
        **kwargs: Any,
    ) -> str:
        """
        Run method for the task; responsible for scheduling the specified flow run.
        Args:
            - flow_name (str, optional): the name of the flow to schedule; if not provided,
                this method will use the flow name provided at initialization
            - project_name (str, optional): the Cloud project in which the flow is located; if
                not provided, this method will use the project provided at initialization. If
                running with Prefect Core's server as the backend, this should not be provided.
            - parameters (dict, optional): the parameters to pass to the flow run being
                scheduled; if not provided, this method will use the parameters provided at
                initialization
            - idempotency_key (str, optional): an optional idempotency key for scheduling the
                flow run; if provided, ensures that only one run is created if this task is retried
                or rerun with the same inputs.  If not provided, the current flow run ID will be used.
            - new_flow_context (dict, optional): the optional run context for the new flow run
            - run_name (str, optional): name to be set for the flow run
            - scheduled_start_time (datetime, optional): the time to schedule the execution
                for; if not provided, defaults to now
        Returns:
            - str: the ID of the newly-scheduled flow run
        Raises:
            - ValueError: if flow was not provided, cannot be found, or if a project name was
                not provided while using Cloud as a backend
        Example:
            ```python
            from prefect.tasks.prefect.flow_run import StartFlowRun
            kickoff_task = StartFlowRun(project_name="Hello, World!", flow_name="My Cloud Flow")
        """
        logger = prefect.context.get("logger")
        logger.info(flow_name)
        logger.info(nb_id)
        #logger.info(signal)
        signal_to_use = signal.serialize()
        logger.info(signal_to_use)
        logger.info(parameters)
        if nb_id in signal:
            return super(StartSkippableRun, self).run(
                flow_name=self.flow_name,
                project_name=project_name,
                parameters=parameters,
                new_flow_context=new_flow_context,
                run_name=run_name,
                scheduled_start_time=scheduled_start_time,
                **kwargs,
            )
        else:
            raise signals.SKIP(message=f"{nb_id} skipped")```Kevin Kho
StartSkippableRun run()? I think that’s just the init there?
Matt Alhonte
11/16/2021, 1:40 AM — set_dependencies, because they've got to run in a particular order. Example:
nba1z_results.set_dependencies(upstream_tasks=[nba1a_results, nba1b_results],)Matt Alhonte
11/16/2021, 1:48 AMUnexpected error: TypeError("argument of type 'Parameter' is not iterable") , I think because of the if nb_id in signal: line.Kevin Kho
from prefect import Flow, Task, Parameter
import prefect
from prefect.utilities.tasks import defaults_from_attrs
from typing import Any
class MyTask(Task):
    """Minimal example task that logs its `signal` argument and returns it.

    `signal` may be supplied to the constructor or when calling the task; when
    `run` receives `signal=None`, `defaults_from_attrs` substitutes `self.signal`.
    """

    def __init__(self, signal: str = None, **kwargs: Any):
        self.signal = signal
        super().__init__(**kwargs)

    @defaults_from_attrs("signal")
    def run(self, signal=None) -> str:
        logger = prefect.context.get("logger")
        logger.info(signal)
        return signal
with Flow("test") as flow:
    x = Parameter("x", default="test")()
    # NOTE(review): `x` is passed to MyTask's constructor here. The advice in this thread
    # is to put run-time values in the second parentheses, so `MyTask()(x)` may have been
    # intended (so that `x` reaches run() as its value) — confirm.
    MyTask(x)()
flow.run()Kevin Kho
.run() instead of the init() for the Parameter.
Matt Alhonte
11/16/2021, 1:54 AM
Matt Alhonte
11/16/2021, 1:54 AM — .run(), it tries to run it immediately when I run the cell.
Kevin Kho
.run(). I mean use the second parentheses, like MyTask(x)(). Is that what you meant too?
Matt Alhonte
11/16/2021, 1:56 AM — .run()
Kevin Kho
MyTask(init_stuff_here)(run_stuff_here) — so put the parameters in the second () when they will have values.
Matt Alhonte
11/16/2021, 1:59 AMnba1a_results = StartSkippableRun(
        # Static settings go to the constructor; `signal` (a Parameter) is passed in the
        # call below so that it has its value when the task runs.
        nb_id="1a",
        flow_name="TestNBA1a",
        project_name=project_name,
        wait=True,
        parameters=task_parameters,
    )(signal=signal)Kevin Kho
Matt Alhonte
11/16/2021, 2:16 AMnba1a_results = StartSkippableRun(flow_name="TestNBA1a", wait=True,)(
        # Only static settings are in the constructor; everything else, including the
        # `signal` Parameter, is supplied in the call so it has its value at run time.
        nb_id="1a",
        project_name=project_name,
        parameters=task_parameters,
        signal=signal,
    )Matt Alhonte
11/16/2021, 2:16 AMKevin Kho
Andrea Haessly
02/03/2022, 3:32 PMKevin Kho
Kevin Kho
super() call. Those kwargs are not stored as attributes, so they are not carried over.
Andrea Haessly
02/03/2022, 3:47 PMAndrea Haessly
02/03/2022, 3:49 PMKevin Kho
`wait` and `project_name` might not be.
Andrea Haessly
02/03/2022, 5:16 PMfrom prefect import task, Flow
from prefect.tasks.gcp import GCSCopy, GCSBlobExists
from google.cloud import storage
# The flow below "moves" a blob: copy source -> destination, check that the
# destination exists, then delete the source.
source_bucket, source_blob = "some-bucket", "some-file"
dest_bucket, dest_blob = "some-bucket", "file-copy"

# Client used by delete_cloud_file; created once, when this module is executed.
storage_client = storage.Client()

copy_file_task = GCSCopy(
    source_bucket=source_bucket,
    source_blob=source_blob,
    dest_bucket=dest_bucket,
    dest_blob=dest_blob,
    name="copyTask",
)

# NOTE(review): in this thread GCSBlobExists failed with "Missing bucket_name or blob"
# even though both are set here; passing them when calling the task may be required.
file_exists_task = GCSBlobExists(
    bucket_name=dest_bucket,
    blob=dest_blob,
)
@task(name="delete_file")
def delete_cloud_file(bucket_name: str, blob_name: str) -> None:
    """Delete the blob `blob_name` from the GCS bucket `bucket_name`.

    Uses the module-level `storage_client`.

    Raises:
        FileNotFoundError: if the bucket has no blob named `blob_name`.
    """
    bucket = storage_client.get_bucket(bucket_name)
    blob = bucket.get_blob(blob_name)
    if blob is None:
        # Bucket.get_blob returns None for a missing blob; without this check the task
        # failed with an opaque "'NoneType' object has no attribute 'delete'".
        raise FileNotFoundError(f"gs://{bucket_name}/{blob_name} does not exist")
    print("deleting blob " + blob_name)
    blob.delete()
def main():
    """Build the copy -> existence-check -> delete-source flow and run it with flow.run()."""
    with Flow("gcpMoveTest") as flow:
        # The source blob is deleted only after the destination existence check ...
        delete_cloud_file(source_bucket, source_blob, upstream_tasks=[file_exists_task])
        # ... which itself waits for the copy to finish.
        file_exists_task.set_upstream(copy_file_task)
    flow.run()
if __name__ == "__main__":
    main()Kevin Kho
Kevin Kho
with Flow("gcpMoveTest") as flow:
        copy = copy_file_task()
        exists = file_exists_task(upstream_tasks=[copy])
        # NOTE: delete_cloud_file requires `bucket_name` and `blob_name`; calling it with
        # only `upstream_tasks` raises "TypeError: missing a required argument: 'bucket_name'".
        # Pass them as well: delete_cloud_file(source_bucket, source_blob, upstream_tasks=[exists])
        delete = delete_cloud_file(upstream_tasks=[exists])
    flow.run()Andrea Haessly
02/03/2022, 7:14 PM[2022-02-03 12:13:57-0500] INFO - prefect.FlowRunner | Beginning Flow run for 'gcpMoveTest'
[2022-02-03 12:13:57-0500] INFO - prefect.TaskRunner | Task 'copyTask': Starting task run...
[2022-02-03 12:13:58-0500] INFO - prefect.TaskRunner | Task 'copyTask': Finished task run for task with final state: 'Success'
[2022-02-03 12:13:58-0500] INFO - prefect.TaskRunner | Task 'GCSBlobExists': Starting task run...
[2022-02-03 12:13:58-0500] ERROR - prefect.TaskRunner | Task 'GCSBlobExists': Exception encountered during task execution!
Traceback (most recent call last):
  File "/Users/ahaessly/code/poc_prefect/local-dev/lib/python3.10/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/Users/ahaessly/code/poc_prefect/local-dev/lib/python3.10/site-packages/prefect/utilities/executors.py", line 467, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/Users/ahaessly/code/poc_prefect/local-dev/lib/python3.10/site-packages/prefect/utilities/tasks.py", line 456, in method
    return run_method(self, *args, **kwargs)
  File "/Users/ahaessly/code/poc_prefect/local-dev/lib/python3.10/site-packages/prefect/tasks/gcp/storage.py", line 525, in run
    raise ValueError("Missing bucket_name or blob")
ValueError: Missing bucket_name or blob
[2022-02-03 12:13:58-0500] INFO - prefect.TaskRunner | Task 'GCSBlobExists': Finished task run for task with final state: 'Failed'
[2022-02-03 12:13:58-0500] INFO - prefect.TaskRunner | Task 'delete_file': Starting task run...
[2022-02-03 12:13:58-0500] INFO - prefect.TaskRunner | Task 'delete_file': Finished task run for task with final state: 'TriggerFailed'
[2022-02-03 12:13:58-0500] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.Andrea Haessly
02/03/2022, 7:15 PMTraceback (most recent call last):
  File "/Users/ahaessly/code/poc_prefect/workflows/GcpMoveFunctional.py", line 48, in <module>
    main()
  File "/Users/ahaessly/code/poc_prefect/workflows/GcpMoveFunctional.py", line 42, in main
    delete = delete_cloud_file(upstream_tasks=[exists])
  File "/Users/ahaessly/code/poc_prefect/local-dev/lib/python3.10/site-packages/prefect/core/task.py", line 633, in __call__
    new.bind(
  File "/Users/ahaessly/code/poc_prefect/local-dev/lib/python3.10/site-packages/prefect/core/task.py", line 674, in bind
    callargs = dict(signature.bind(*args, **kwargs).arguments)  # type: Dict
  File "/usr/local/Cellar/python@3.10/3.10.0_2/Frameworks/Python.framework/Versions/3.10/lib/python3.10/inspect.py", line 3177, in bind
    return self._bind(args, kwargs)
  File "/usr/local/Cellar/python@3.10/3.10.0_2/Frameworks/Python.framework/Versions/3.10/lib/python3.10/inspect.py", line 3092, in _bind
    raise TypeError(msg) from None
TypeError: missing a required argument: 'bucket_name'Kevin Kho
Kevin Kho
delete_task = delete_cloud_file(source_bucket, source_blob, upstream_tasks=[...])Andrea Haessly
02/03/2022, 7:40 PMAndrea Haessly
02/18/2022, 6:41 PM