Matt Alhonte
11/16/2021, 1:14 AM
`StartFlowRun`, and one of the arguments to it comes from a parameter, and the Task Schematic doesn't even show that it exists, i.e.:
# Comma-separated ids; each StartSkippableRun checks its nb_id against this list.
signal = Parameter(
"signal", default="1a,1b,2a,2b,1c,1d,2d,1e,2e,1f,2f,1z,3a,3b"
)
# NOTE(review): every argument below goes to the StartSkippableRun *constructor*.
# The constructor only stores `signal` as an attribute (self.signal = signal), so the
# Parameter is not bound as an upstream input of this task (hence it is missing from
# the schematic), and run() later receives the Parameter object rather than its value.
# Pass it in the call instead: StartSkippableRun(...)(signal=signal).
nba1a_results = StartSkippableRun(
signal=signal,
nb_id="1a",
flow_name="TestNBA1a",
project_name=project_name,
wait=True,
parameters=task_parameters,
)
`signal` doesn't seem to register.

Kevin Kho
`StartSkippableRun`?

Matt Alhonte
# 11/16/2021, 1:34 AM
class StartSkippableRun(StartFlowRun):
    """
    Start a flow run for one notebook id, or SKIP when that id is not selected.

    At run time ``signal`` is split on commas into a set of ids. If ``nb_id`` is one
    of them, the flow run is started through ``StartFlowRun.run``; otherwise
    ``signals.SKIP`` is raised.

    Pass values that come from ``Parameter``s or other tasks in the *call*, not the
    constructor, e.g. ``StartSkippableRun(flow_name="X", wait=True)(signal=signal)``.
    Constructor arguments are only stored as attributes, so a Parameter given there
    is not registered as an upstream input, and ``run`` receives the Parameter
    object instead of its value.

    Args:
        - signal (str, optional): comma-separated ids that should run, e.g. "1a,1b,2a";
            this value may also be provided at run time
        - nb_id (str, optional): the id this task is responsible for; this value may
            also be provided at run time
        - flow_name (str, optional): the name of the flow to schedule; this value may also be
            provided at run time
        - project_name (str, optional): if running with Cloud as a backend, this is the project
            in which the flow is located; this value may also be provided at runtime. If
            running with Prefect Core's server as the backend, this should not be provided.
        - parameters (dict, optional): the parameters to pass to the flow run being scheduled;
            this value may also be provided at run time
        - new_flow_context (dict, optional): the optional run context for the new flow run
        - run_name (str, optional): name to be set for the flow run
        - wait (bool, optional): whether to wait the triggered flow run's state; if True, this
            task will wait until the flow run is complete, and then reflect the corresponding
            state as the state of this task. Defaults to `False`.
        - scheduled_start_time (datetime, optional): the time to schedule the execution
            for; if not provided, defaults to now
        - **kwargs (dict, optional): additional keyword arguments to pass to the Task constructor
    """

    def __init__(
        self,
        signal: str = None,
        nb_id: str = None,
        flow_name: str = None,
        project_name: str = None,
        parameters: dict = None,
        wait: bool = False,
        new_flow_context: dict = None,
        run_name: str = None,
        scheduled_start_time: datetime.datetime = None,
        **kwargs: Any,
    ):
        self.signal = signal
        self.nb_id = nb_id
        self.flow_name = flow_name
        self.project_name = project_name
        self.parameters = parameters
        self.new_flow_context = new_flow_context
        self.run_name = run_name
        self.wait = wait
        self.scheduled_start_time = scheduled_start_time
        if flow_name:
            kwargs.setdefault("name", f"Flow {flow_name}")
        # super(StartFlowRun, self) skips StartFlowRun.__init__ and calls the next
        # initializer in the MRO, so only the attributes assigned above are set.
        # NOTE(review): confirm the installed StartFlowRun.run does not rely on other
        # attributes its own __init__ would set — TODO verify per Prefect version.
        super(StartFlowRun, self).__init__(**kwargs)

    @defaults_from_attrs(
        "signal",
        "nb_id",
        "flow_name",
        "project_name",
        "parameters",
        "new_flow_context",
        "run_name",
        "scheduled_start_time",
    )
    def run(
        self,
        signal: str = None,
        nb_id: str = None,
        flow_name: str = None,
        project_name: str = None,
        parameters: dict = None,
        idempotency_key: str = None,
        new_flow_context: dict = None,
        run_name: str = None,
        scheduled_start_time: datetime.datetime = None,
        **kwargs: Any,
    ) -> str:
        """
        Start the flow run if ``nb_id`` is listed in ``signal``; otherwise SKIP.

        Args:
            - signal (str, optional): comma-separated ids that should run; if not
                provided, the value given at initialization is used
            - nb_id (str, optional): the id this task is responsible for; if not
                provided, the value given at initialization is used
            - flow_name (str, optional): the name of the flow to schedule; if not provided,
                this method will use the flow name provided at initialization
            - project_name (str, optional): the Cloud project in which the flow is located; if
                not provided, this method will use the project provided at initialization. If
                running with Prefect Core's server as the backend, this should not be provided.
            - parameters (dict, optional): the parameters to pass to the flow run being
                scheduled; if not provided, this method will use the parameters provided at
                initialization
            - idempotency_key (str, optional): an optional idempotency key for scheduling the
                flow run; forwarded to ``StartFlowRun.run``
            - new_flow_context (dict, optional): the optional run context for the new flow run
            - run_name (str, optional): name to be set for the flow run
            - scheduled_start_time (datetime, optional): the time to schedule the execution
                for; if not provided, defaults to now
            - **kwargs: forwarded unchanged to ``StartFlowRun.run``

        Returns:
            - str: whatever ``StartFlowRun.run`` returns for the scheduled flow run

        Raises:
            - signals.SKIP: if ``nb_id`` is not one of the ids in ``signal``
            - ValueError: if ``nb_id`` was not provided
            - TypeError: if ``signal`` is not a str (e.g. a Parameter was passed to the
                constructor instead of the call)

        Example:
            ```python
            nba1a_results = StartSkippableRun(flow_name="TestNBA1a", wait=True)(
                nb_id="1a", project_name=project_name, signal=signal
            )
            ```
        """
        logger = prefect.context.get("logger")
        logger.info(
            "flow_name=%s nb_id=%s signal=%r parameters=%s",
            flow_name,
            nb_id,
            signal,
            parameters,
        )
        if not nb_id:
            raise ValueError("nb_id must be provided at init or run time")
        # A Parameter (or other task) given to the constructor arrives here as the task
        # object itself, not its value; fail with guidance instead of an opaque error.
        if not isinstance(signal, str):
            raise TypeError(
                "signal must be a comma-separated str, got "
                f"{type(signal).__name__}; pass Parameters at call time, e.g. "
                "StartSkippableRun(...)(signal=signal)"
            )
        # Match whole ids rather than substrings so "1" or "a" cannot match "1a,1b".
        selected_ids = {part.strip() for part in signal.split(",")}
        if nb_id not in selected_ids:
            raise signals.SKIP(message=f"{nb_id} skipped")
        # `wait` is not passed explicitly; StartFlowRun.run's own defaults supply it.
        return super().run(
            flow_name=flow_name,
            project_name=project_name,
            parameters=parameters,
            idempotency_key=idempotency_key,
            new_flow_context=new_flow_context,
            run_name=run_name,
            scheduled_start_time=scheduled_start_time,
            **kwargs,
        )
# Kevin Kho
`StartSkippableRun`
`run()`? I think that’s just the init there?

Matt Alhonte
11/16/2021, 1:40 AM
`set_dependencies`, because they've got to run in a particular order. Example:
# Make nba1z_results run only after both nba1a_results and nba1b_results.
nba1z_results.set_dependencies(upstream_tasks=[nba1a_results, nba1b_results],)
Matt Alhonte
11/16/2021, 1:48 AM
Unexpected error: TypeError("argument of type 'Parameter' is not iterable"), I think because of the `if nb_id in signal:` line.

Kevin Kho
from prefect import Flow, Task, Parameter
import prefect
from prefect.utilities.tasks import defaults_from_attrs
from typing import Any
class MyTask(Task):
    """Minimal example task that logs its `signal` input and returns it.

    `signal` can be given to the constructor or in the call. With
    `@defaults_from_attrs("signal")`, `run` falls back to `self.signal` when no
    `signal` is passed at call time.
    """

    def __init__(self, signal: str = None, **kwargs: Any):
        self.signal = signal
        super().__init__(**kwargs)

    @defaults_from_attrs("signal")
    def run(self, signal=None) -> str:
        prefect.context.logger.info(signal)
        return signal
with Flow("test") as flow:
x = Parameter("x", default="test")()
# NOTE(review): `x` goes to the MyTask *constructor*, so it is stored as the `signal`
# attribute rather than bound as a run-time input of the call `()`. To bind it as an
# upstream input, pass it in the second parentheses instead: MyTask()(signal=x).
MyTask(x)()
flow.run()
Kevin Kho
`.run()` instead of the `init()` for the Parameter.

Matt Alhonte
11/16/2021, 1:54 AM

Matt Alhonte
11/16/2021, 1:54 AM
`.run()`
it tries to run it immediately when I run the cell.

Kevin Kho
`.run()`. I mean use the second parentheses, like `MyTask(x)()`. Is that what you mean also?

Matt Alhonte
11/16/2021, 1:56 AM
`.run()`

Kevin Kho
`MyTask(init_stuff_here)(run_stuff_here)`
so put the parameters in the second `()`, when they will have values.

Matt Alhonte
11/16/2021, 1:59 AMnba1a_results = StartSkippableRun(
nb_id="1a",
flow_name="TestNBA1a",
project_name=project_name,
wait=True,
parameters=task_parameters,
# NOTE(review): project_name and task_parameters are still constructor arguments; if
# they are Parameters or task results, they are stored as attributes, not bound inputs.
)(signal=signal)  # signal is now passed at call time, i.e. bound as an input
Kevin Kho
Matt Alhonte
11/16/2021, 2:16 AMnba1a_results = StartSkippableRun(flow_name="TestNBA1a", wait=True,)(
# Static settings go to the constructor above; values that may come from Parameters
# or other tasks are passed in this call so they are bound as inputs.
nb_id="1a",
project_name=project_name,
parameters=task_parameters,
signal=signal,
)
Matt Alhonte
11/16/2021, 2:16 AM

Kevin Kho

Andrea Haessly
02/03/2022, 3:32 PM

Kevin Kho
Kevin Kho
`super()` call. Those kwargs are not attrs, so they are not carried over.

Andrea Haessly
02/03/2022, 3:47 PM

Andrea Haessly
02/03/2022, 3:49 PM

Kevin Kho
`wait` and `project_name` might not be.

Andrea Haessly
02/03/2022, 5:16 PMfrom prefect import task, Flow
from prefect.tasks.gcp import GCSCopy, GCSBlobExists
from google.cloud import storage
# Locations for the test: copy source -> dest, check dest exists, delete source.
source_bucket = "some-bucket"
source_blob = "some-file"
dest_bucket = "some-bucket"
dest_blob = "file-copy"
# Created at import time; delete_cloud_file uses this module-level client.
storage_client = storage.Client()
copy_file_task = GCSCopy(
source_bucket=source_bucket,
source_blob=source_blob,
dest_bucket=dest_bucket,
dest_blob=dest_blob,
name="copyTask")
# NOTE(review): the run log later in this thread shows this task failing with
# "Missing bucket_name or blob" despite these constructor arguments. Passing
# bucket_name=/blob= when calling the task may be required — TODO confirm against
# the installed Prefect version.
file_exists_task = GCSBlobExists(
bucket_name=dest_bucket,
blob=dest_blob,
)
@task(name="delete_file")
def delete_cloud_file(bucket_name: str, blob_name: str) -> None:
    """Delete ``blob_name`` from ``bucket_name`` using the module-level ``storage_client``.

    Args:
        - bucket_name (str): the GCS bucket holding the object
        - blob_name (str): the object to delete

    Raises:
        - ValueError: if the blob does not exist in the bucket
    """
    bucket = storage_client.get_bucket(bucket_name)
    blob = bucket.get_blob(blob_name)
    # get_blob returns None for a missing object; fail with a clear message instead
    # of an AttributeError from None.delete().
    if blob is None:
        raise ValueError(f"blob {blob_name!r} not found in bucket {bucket_name!r}")
    print("deleting blob " + blob_name)
    blob.delete()
# Build the "gcpMoveTest" flow (copy -> check dest exists -> delete source) and run it.
# NOTE(review): copy_file_task and file_exists_task are never called inside the Flow
# context here; they are only linked with set_upstream. The reply below instead calls
# each task inside the flow and expresses ordering via upstream_tasks=[...].
def main():
with Flow("gcpMoveTest") as flow:
delete_task = delete_cloud_file(source_bucket, source_blob)
delete_task.set_upstream(file_exists_task)
file_exists_task.set_upstream(copy_file_task)
flow.run()
# Run the flow only when executed as a script.
if __name__ == "__main__":
main()
Kevin Kho
Kevin Kho
# Call each task inside the Flow context and express ordering with upstream_tasks.
# NOTE(review): delete_cloud_file has required bucket_name/blob_name parameters, so
# calling it with only upstream_tasks raises
# "TypeError: missing a required argument: 'bucket_name'" (traceback below). Pass them
# as well: delete_cloud_file(source_bucket, source_blob, upstream_tasks=[exists]).
with Flow("gcpMoveTest") as flow:
copy = copy_file_task()
exists = file_exists_task(upstream_tasks=[copy])
delete = delete_cloud_file(upstream_tasks=[exists])
flow.run()
Andrea Haessly
02/03/2022, 7:14 PM
[2022-02-03 12:13:57-0500] INFO - prefect.FlowRunner | Beginning Flow run for 'gcpMoveTest'
[2022-02-03 12:13:57-0500] INFO - prefect.TaskRunner | Task 'copyTask': Starting task run...
[2022-02-03 12:13:58-0500] INFO - prefect.TaskRunner | Task 'copyTask': Finished task run for task with final state: 'Success'
[2022-02-03 12:13:58-0500] INFO - prefect.TaskRunner | Task 'GCSBlobExists': Starting task run...
[2022-02-03 12:13:58-0500] ERROR - prefect.TaskRunner | Task 'GCSBlobExists': Exception encountered during task execution!
Traceback (most recent call last):
File "/Users/ahaessly/code/poc_prefect/local-dev/lib/python3.10/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
value = prefect.utilities.executors.run_task_with_timeout(
File "/Users/ahaessly/code/poc_prefect/local-dev/lib/python3.10/site-packages/prefect/utilities/executors.py", line 467, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "/Users/ahaessly/code/poc_prefect/local-dev/lib/python3.10/site-packages/prefect/utilities/tasks.py", line 456, in method
return run_method(self, *args, **kwargs)
File "/Users/ahaessly/code/poc_prefect/local-dev/lib/python3.10/site-packages/prefect/tasks/gcp/storage.py", line 525, in run
raise ValueError("Missing bucket_name or blob")
ValueError: Missing bucket_name or blob
[2022-02-03 12:13:58-0500] INFO - prefect.TaskRunner | Task 'GCSBlobExists': Finished task run for task with final state: 'Failed'
[2022-02-03 12:13:58-0500] INFO - prefect.TaskRunner | Task 'delete_file': Starting task run...
[2022-02-03 12:13:58-0500] INFO - prefect.TaskRunner | Task 'delete_file': Finished task run for task with final state: 'TriggerFailed'
[2022-02-03 12:13:58-0500] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
Andrea Haessly
02/03/2022, 7:15 PM
Traceback (most recent call last):
File "/Users/ahaessly/code/poc_prefect/workflows/GcpMoveFunctional.py", line 48, in <module>
main()
File "/Users/ahaessly/code/poc_prefect/workflows/GcpMoveFunctional.py", line 42, in main
delete = delete_cloud_file(upstream_tasks=[exists])
File "/Users/ahaessly/code/poc_prefect/local-dev/lib/python3.10/site-packages/prefect/core/task.py", line 633, in __call__
new.bind(
File "/Users/ahaessly/code/poc_prefect/local-dev/lib/python3.10/site-packages/prefect/core/task.py", line 674, in bind
callargs = dict(signature.bind(*args, **kwargs).arguments) # type: Dict
File "/usr/local/Cellar/python@3.10/3.10.0_2/Frameworks/Python.framework/Versions/3.10/lib/python3.10/inspect.py", line 3177, in bind
return self._bind(args, kwargs)
File "/usr/local/Cellar/python@3.10/3.10.0_2/Frameworks/Python.framework/Versions/3.10/lib/python3.10/inspect.py", line 3092, in _bind
raise TypeError(msg) from None
TypeError: missing a required argument: 'bucket_name'
Kevin Kho
Kevin Kho
# `[...]` is a placeholder for the upstream task(s), e.g. upstream_tasks=[exists].
delete_task = delete_cloud_file(source_bucket, source_blob, upstream_tasks=[...])
Andrea Haessly
02/03/2022, 7:40 PM

Andrea Haessly
02/18/2022, 6:41 PM