Mehdi Nazari
07/27/2021, 5:18 PM
from prefect import Flow
from prefect.storage import Docker

with Flow("Example-Flow",
    storage=Docker(
        image_name="example_flow_import",
        image_tag="dev",
        dockerfile="<path/to/dockerfile>",
        python_dependencies=["numpy", "pandas"],
        files={},     # Trying to avoid this
        env_vars={},  # Trying to avoid this
    )) as flow:
    ...
Trevor Kramer
07/27/2021, 5:41 PM
Unexpected error: TypeError('Object of type SUCCESS is not JSON serializable')
Traceback (most recent call last):
File "/root/.local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/root/.local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 911, in get_task_run_state
result = self.result.write(value, **formatting_kwargs)
File "/root/.local/lib/python3.8/site-packages/prefect/engine/results/prefect_result.py", line 62, in write
new.location = self.serializer.serialize(new.value).decode("utf-8")
File "/root/.local/lib/python3.8/site-packages/prefect/engine/serializers.py", line 110, in serialize
return json.dumps(value).encode()
File "/usr/local/lib/python3.8/json/__init__.py", line 231, in dumps
return _default_encoder.encode(obj)
File "/usr/local/lib/python3.8/json/encoder.py", line 199, in encode
chunks = self.iterencode(o, _one_shot=True)
File "/usr/local/lib/python3.8/json/encoder.py", line 257, in iterencode
return _iterencode(o, 0)
File "/usr/local/lib/python3.8/json/encoder.py", line 179, in default
raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type SUCCESS is not JSON serializable
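For context: the traceback shows PrefectResult's JSONSerializer failing on whatever the task returned (an object whose class is named SUCCESS). A minimal sketch of the failure mode and one workaround, with hypothetical task names, assuming the return value is a Prefect state-like object:

from prefect import Flow, task
from prefect.engine.results import LocalResult, PrefectResult
from prefect.engine.state import Success

@task(result=PrefectResult())
def returns_a_state():
    # PrefectResult stores results via json.dumps, so returning a
    # non-JSON-serializable object (here, a State) raises the TypeError above
    return Success()

@task(result=LocalResult())  # LocalResult pickles by default, so arbitrary objects work
def returns_plain_data():
    return {"status": "SUCCESS"}  # or keep PrefectResult and return JSON-friendly data

with Flow("serializer-demo") as flow:
    returns_a_state()
    returns_plain_data()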
Jelle Vegter
07/27/2021, 7:14 PM
chicago-joe
07/27/2021, 8:05 PM
Ross Timm
07/27/2021, 8:11 PM
Michael S
07/27/2021, 8:35 PM
Brad I
07/27/2021, 8:37 PM
def main():
    parameters = {...}
    state = flow.run(parameters=parameters, executor=LocalExecutor())
Andre Muraro
07/27/2021, 8:39 PM
from prefect.engine.state import Skipped

def terminal_state_handler(flow, state, reference_task_states):
    for task_state in reference_task_states:
        if task_state.is_skipped():
            return Skipped(state.message, state.result, state.context, state.cached_inputs)
    return state
which kind of works; however, the UI is still counting the duration as if the flow were still running...
An Hoang
07/27/2021, 10:03 PM
21:15:07 | ERROR | Failed to retrieve task state with error: ClientError([{'path': ['get_or_create_task_run_info'], 'message': 'Expected type UUID!, found ""; Could not parse UUID: ', 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': 'Expected type UUID!, found ""; Could not parse UUID: ', 'locations': [{'line': 2, 'column': 101}], 'path': None}}}])
Traceback (most recent call last):
File "/lab/corradin_biobank/FOR_AN/OVP/corradin_ovp_utils/.venv/lib/python3.8/site-packages/prefect/engine/cloud/task_runner.py", line 154, in initialize_run
task_run_info = self.client.get_task_run_info(
File "/lab/corradin_biobank/FOR_AN/OVP/corradin_ovp_utils/.venv/lib/python3.8/site-packages/prefect/client/client.py", line 1721, in get_task_run_info
result = self.graphql(mutation) # type: Any
File "/lab/corradin_biobank/FOR_AN/OVP/corradin_ovp_utils/.venv/lib/python3.8/site-packages/prefect/client/client.py", line 564, in graphql
raise ClientError(result["errors"])
prefect.exceptions.ClientError: [{'path': ['get_or_create_task_run_info'], 'message': 'Expected type UUID!, found ""; Could not parse UUID: ', 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': 'Expected type UUID!, found ""; Could not parse UUID: ', 'locations': [{'line': 2, 'column': 101}], 'path': None}}}]
1. `Error: Task function was not provided required {parameter}`: some upstream task was skipped, so its result was not passed down.
2. Setting up a non-data dependency: the difference between
   a. task_A.map(param1=param1); task_A.set_upstream(task_B)
   b. task_A.map(param1=param1, upstream_tasks=[task_B])
   c. task_A_result = task_A.map(param1=param1); task_A_result.set_upstream(task_B)
   (The first is wrong: it creates another, different instance of task_B and an unmapped task_A. The second expects task_B to return an iterable and will error if it doesn't. The third is correct when task_B doesn't return an iterable.)
3. In general, one of the best ways to debug task dependencies in a flow (once you have unit-tested each task independently) is to run it locally with flow_result = flow.run() and then visualize it with flow.visualize(flow_state=flow_result), as sketched below.
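A minimal sketch of that local debugging loop, with a hypothetical two-task flow (flow.visualize requires graphviz):

from prefect import Flow, task

@task
def extract():
    return [1, 2, 3]

@task
def load(data):
    print(data)

with Flow("debug-deps") as flow:
    load(extract())

# Run locally, then color each node of the DAG by its final state
flow_result = flow.run()
flow.visualize(flow_state=flow_result)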
Jimmy Le
07/27/2021, 10:44 PM
Nick Coy
07/28/2021, 1:25 AM
with Flow('mapped_flow') as flow:
    aws_list = aws_list_files(AWS_CREDENTIALS)
    file = aws_download.map(
        key=aws_list,
        credentials=unmapped(AWS_CREDENTIALS),
        bucket=unmapped('some_bucket'),
        upstream_tasks=[unmapped(aws_list)])
    format_df = format_data.map(file, aws_list, upstream_tasks=[unmapped(file)])
    move_to_cloud = move_to_cloud_storage.map(format_df, aws_list, unmapped(GCP_CREDENTIALS), upstream_tasks=[unmapped(format_df)])
    load_job = load_files_gcs_bq(GCP_CREDENTIALS, upstream_tasks=[move_to_cloud])
The problem is that load_files_gcs_bq starts once all the mapped tasks enter a Mapped state, but before any have actually run. I have been reading the mapping docs but I feel like I am missing something. Any help would be greatly appreciated.
matta
07/28/2021, 3:11 AM
BigQueryLoadGoogleCloudStorage seems to not be inferring my GCP credentials? Not sure how to pass them, either (I can grab them as a PrefectSecret fine, and can pass them directly to GCP functions).
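One way to pass them explicitly, assuming the secret holds the service-account-info dict that the prefect.tasks.gcp tasks accept as their credentials argument (dataset, table, and URI names are hypothetical):

from prefect import Flow
from prefect.tasks.gcp.bigquery import BigQueryLoadGoogleCloudStorage
from prefect.tasks.secrets import PrefectSecret

bq_load = BigQueryLoadGoogleCloudStorage(
    dataset_id="my_dataset",        # hypothetical
    table="my_table",               # hypothetical
    uri="gs://my-bucket/data.csv",  # hypothetical
)

with Flow("bq-load") as flow:
    gcp_credentials = PrefectSecret("GCP_CREDENTIALS")
    bq_load(credentials=gcp_credentials)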
Zhilong Li
07/28/2021, 5:06 AM
I know about the CreateNamespacedDeployment task for standard k8s deployments, but I want to deploy a Seldon deployment, where the CRD starts with
apiVersion: machinelearning.seldon.io/v1
kind: SeldonDeployment
Does anyone have an idea how I can do this deployment as a Prefect task? Thanks so much!
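One possible approach, since the built-in task only covers standard Deployments: wrap the Kubernetes client's generic custom-objects API in your own task. A sketch, assuming in-cluster credentials and a SeldonDeployment manifest as a dict:

from kubernetes import client, config
from prefect import task

@task
def create_seldon_deployment(body: dict, namespace: str = "default"):
    config.load_incluster_config()  # or config.load_kube_config() outside the cluster
    api = client.CustomObjectsApi()
    return api.create_namespaced_custom_object(
        group="machinelearning.seldon.io",
        version="v1",
        namespace=namespace,
        plural="seldondeployments",
        body=body,  # the full SeldonDeployment manifest as a dict
    )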
jake lee
07/28/2021, 9:34 AM
montardon
07/28/2021, 12:15 PM
Samuel Hinton
07/28/2021, 1:33 PM
An Hoang
07/28/2021, 3:47 PM
I'm deciding between 1) raising a SKIP signal inside the task or 2) using a conditional case task. I have provided the two scenarios in this gist https://gist.github.com/hoangthienan95/e0d8c3d73cb25f90f0d427c689ea80d8/revisions?diff=unified, along with the flow visualizations.
In the gist, the initial version uses the SKIP signal, and the second version shows the modifications to make it use case. I have tested both flows; they work and both satisfy my requirements so far. Are there any caveats I haven't thought of, or best practices I should consider when choosing one over the other?
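For reference, the two patterns side by side; a minimal sketch with hypothetical tasks, not the gist's code:

from prefect import Flow, case, task
from prefect.engine.signals import SKIP

@task
def should_process() -> bool:
    return False

# Option 1: the task decides for itself and raises SKIP
@task
def process_if_needed(flag: bool):
    if not flag:
        raise SKIP("nothing to do")
    return "processed"

# Option 2: the flow branches with case, keeping the condition in the DAG
@task
def process():
    return "processed"

with Flow("skip-vs-case") as flow:
    flag = should_process()
    process_if_needed(flag)
    with case(flag, True):
        process()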
Kurt Rhee
07/28/2021, 4:32 PM
Hugo Kitano
07/28/2021, 6:15 PM
Tim Enders
07/28/2021, 7:04 PM
ImportError: Using `prefect.tasks.gcp` requires Prefect to be installed with the "gcp" extra.
gcp isn't listed and doesn't seem to fix the problem
Tim Enders
07/28/2021, 7:04 PM
google?
Harry Baker
07/28/2021, 8:25 PM
Harry Baker
07/28/2021, 10:01 PM
Traceback (most recent call last):
File "/home/ubuntu/anaconda3/envs/ca_covid_main/lib/python3.8/site-packages/prefect/agent/agent.py", line 384, in _deploy_flow_run
deployment_info = self.deploy_flow(flow_run)
File "/home/ubuntu/anaconda3/envs/ca_covid_main/lib/python3.8/site-packages/prefect/agent/local/agent.py", line 142, in deploy_flow
env = self.populate_env_vars(flow_run, run_config=run_config)
File "/home/ubuntu/anaconda3/envs/ca_covid_main/lib/python3.8/site-packages/prefect/agent/local/agent.py", line 199, in populate_env_vars
else os.getcwd()
FileNotFoundError: [Errno 2] No such file or directory
Harish
07/28/2021, 11:41 PM
from prefect import Flow, task
from prefect.tasks.shell import ShellTask
import prefect

def test_on_cancel(flow, old_state, new_state):
    logger = prefect.context.get("logger")
    logger.info(old_state)
    logger.info(new_state)

@task(on_failure=test_on_cancel, state_handlers=[test_on_cancel])
def plus_one(x):
    """A task that adds 1 to a number"""
    return x + 1

@task(on_failure=test_on_cancel, state_handlers=[test_on_cancel])
def build_command():
    return 'sleep 200'

run_in_bash = ShellTask(name='run a command in bash', on_failure=test_on_cancel, state_handlers=[test_on_cancel])

with Flow('Best Practices') as flow:
    two = plus_one(1)
    cmd = build_command()
    shell_result = run_in_bash(command=cmd)
    shell_result.set_upstream(two)
Danny Vilela
07/28/2021, 11:43 PM
I have a SendSlackNotificationTask at the end of my (linear and static) flow, but it fails to run if any of its parents failed. This means the notification won't run, so I won't know things failed, which is probably bad.
My question: is there a way to make a task run regardless of its parent tasks' states? I know there's the case option; is that a better fit here? Can case work with a task's state?
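One standard option (a sketch, not necessarily what the thread settled on) is to give the notification task a trigger that fires once all upstream tasks are finished, whatever their final states:

from prefect import task
from prefect.triggers import all_finished

@task(trigger=all_finished)  # runs whether parents succeeded, failed, or were skipped
def notify_slack():
    ...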
Danny Vilela
07/29/2021, 1:44 AM
Does timeout work in conjunction with max_retries? Is a "timeout" the same as a "failure"?
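For reference, the two settings combine like this in Prefect 1.x; a timed-out run ends in a TimedOut state, a subtype of Failed, so retries apply (values here are hypothetical):

from datetime import timedelta
from prefect import task

@task(timeout=60, max_retries=3, retry_delay=timedelta(seconds=10))
def slow_call():
    ...  # killed after 60s; the resulting TimedOut (Failed) state triggers up to 3 retries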
Chris L.
07/29/2021, 2:10 AM
Is there a way to check whether a flow is already registered (something like flow.is_registered())? Otherwise, is there a gist for using prefect.client and working with the GraphQL response? Thanks in advance
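As far as I know there is no flow.is_registered(), but the Client can query the backend directly; a sketch using the dict-based GraphQL helper, with a hypothetical flow name:

from prefect.client import Client

client = Client()
result = client.graphql(
    {
        "query": {
            'flow(where: {name: {_eq: "my-flow"}})': ["id", "name", "version"]
        }
    }
)
print(result.data.flow)  # list of registered versions; empty means not registered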
Arran
07/29/2021, 8:20 AM
Samuel Tober
07/29/2021, 12:07 PM
TypeError: Task is not iterable.
Samuel Tober
07/29/2021, 12:08 PM