Philip MacMenamin
09/26/2022, 3:19 PM
@task(name="Task A")
def task_a(val):
    if random.random() > 0.5:
        raise ValueError(f"Non-deterministic error has occurred.{val}")

@task
def send_log(val):
    logger.info(f"hello {val}")

l = [1, 2, 3, 4]
with Flow("Trigger example") as flow:
    ta_s = task_a.map(val=l)
    sl_s = send_log.map(val=l, upstream_tasks=[ta_s])
    # flow run should be a success if ANY of task_a's mapped runs succeeded.
    flow.set_reference_tasks([something])
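In Prefect 1.x, the usual way to get "flow succeeds if any mapped run succeeded" is a trigger such as `prefect.triggers.any_successful` on a downstream task, combined with `flow.set_reference_tasks`. A pure-Python sketch of those semantics (no Prefect import; the trigger name is only referenced in the comments):

```python
# A pure-Python sketch of "flow succeeds if ANY mapped task_a run succeeded".
# In Prefect 1.x the equivalent is @task(trigger=prefect.triggers.any_successful)
# on the downstream task plus flow.set_reference_tasks pointing at it.
import random

random.seed(0)  # deterministic for the example

def task_a(val):
    if random.random() > 0.5:
        raise ValueError(f"Non-deterministic error has occurred.{val}")
    return val

results = []
for v in [1, 2, 3, 4]:
    try:
        results.append(("success", task_a(v)))
    except ValueError as exc:
        results.append(("failed", exc))

# Flow-level state: success if ANY mapped run succeeded.
flow_state = "success" if any(s == "success" for s, _ in results) else "failed"
```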
Matt Delacour
09/26/2022, 3:38 PM
William Wolfe-McGuire
09/26/2022, 4:14 PM
Traceback (most recent call last):
  File "/home/wwolfe-mcguire/.conda/envs/dev_env/lib/python3.9/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/home/wwolfe-mcguire/.conda/envs/dev_env/lib/python3.9/site-packages/prefect/engine/flow_runner.py", line 569, in get_flow_run_state
    executors.prepare_upstream_states_for_mapping(
  File "/home/wwolfe-mcguire/.conda/envs/dev_env/lib/python3.9/site-packages/prefect/utilities/executors.py", line 681, in prepare_upstream_states_for_mapping
    value = upstream_state.result[i]
KeyError: 0
[2022-09-26 15:42:45+0000] ERROR - prefect.ml-graph-energy | Unexpected error occured in FlowRunner: KeyError(0)
I wasn't able to find other examples of a similar error online and was hoping someone could give me some hints about how to start debugging this. Thanks!
Chris Gunderson
09/26/2022, 4:27 PM
aws_credentials_block = AwsCredentials.load("****-user")
s3_block = S3.load("***-s3")
s3_bucket_block = S3Bucket(
    bucket_name=s3_block.bucket_path,
    aws_credentials=aws_credentials_block,
    basepath=f"Allocations/{date.today().year}"
)
output_file = f'''SR Allocations {datetime.now().strftime('%Y%m%d-%H%M%S')}.csv'''
bytes_to_write = df_alloc.to_csv(None).encode()
csv_file = s3_bucket_block.write_path(path=output_file, content=bytes_to_write)
logging.info('Filtering for **** Trades: Rows = %s, Accounts = %s' % (len(df_alloc), df_alloc.custodianAccnt.nunique()))
return csv_file
@flow(name="***** Allocations")
def ****AllocationsFlow():
    try:
        slack_webhook_block = SlackWebhook.load("****-webhook")
        state = allocations_process(return_state=True)
        excel_file = state.result()
        if 'Completed' == state.name:
            slack_webhook_block.notify("**** Allocations was successful")
        else:
            slack_webhook_block.notify("***** Allocations failed")

Error:
cannot pickle 'coroutine' object
11:23:31.736 | INFO | Task run 'Fidelity Allocations-e728df66-0' - Crash detected! Execution was interrupted by an unexpected exception.
11:23:32.443 | ERROR | Flow run 'visionary-cat' - Finished in state Failed('1/1 states failed.')
Traceback (most recent call last):
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/usr/local/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/local/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
    return future.result()
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/client/orion.py", line 82, in with_injected_client
    return await fn(*args, **kwargs)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/engine.py", line 239, in create_then_begin_flow_run
    return state.result()
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 159, in result
    state.result()
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 145, in result
    raise data
TypeError: cannot pickle 'coroutine' object
Michael Hadorn
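This error usually means an async method's coroutine was returned without being awaited, and the result then hit the pickler. A minimal standalone reproduction (`write_path_stub` is a hypothetical stand-in, not a Prefect API):

```python
# Minimal reproduction of "cannot pickle 'coroutine' object": returning an
# un-awaited coroutine hands the pickler something it cannot serialize.
import asyncio
import pickle

async def write_path_stub(path, content):
    # hypothetical stand-in for an async method such as a block's write_path
    return path

coro = write_path_stub("out.csv", b"data")
try:
    pickle.dumps(coro)
    error_message = ""
except TypeError as exc:
    error_message = str(exc)  # mentions the coroutine object
coro.close()  # avoid the "coroutine was never awaited" warning

# The fix: await the call (or drive it with asyncio.run from sync code)
# so the concrete result, not the coroutine, is what gets returned.
result = asyncio.run(write_path_stub("out.csv", b"data"))
```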
09/26/2022, 4:40 PM
Keith
09/26/2022, 4:56 PM
from prefect2_flow import extract_load_transform
from prefect.deployments import Deployment
from prefect.infrastructure import KubernetesJob
from prefect.orion.schemas.schedules import CronSchedule

schedule = CronSchedule(cron="15 * * * *", timezone="UTC")

deployment = Deployment.build_from_flow(
    flow=extract_load_transform,
    name="test_hourly_elt",
    parameters={
        'collection_duration': '1h',
        'uri': 'https://google.com',
    },
    skip_upload=True,
    schedule=schedule,
    tags=["test"],
    version=2,
    work_queue_name="test-kubernetes",
    infrastructure=KubernetesJob(
        finished_job_ttl=30,
        image="us-central1-docker.pkg.dev/.../prefect2-flows/elt:latest",
        image_pull_policy="Always",
        namespace="test-prefect2",
        pod_watch_timeout_seconds=180,
        job={
            "apiVersion": "batch/v1",
            "kind": "Job",
            "metadata": {"labels": {}},
            "spec": {
                "template": {
                    "spec": {
                        "parallelism": 1,
                        "completions": 1,
                        "restartPolicy": "Never",
                        "containers": [
                            {
                                "name": "prefect-job",
                                "env": [],
                                "resources": {
                                    "requests": {
                                        "memory": "5Gi",
                                        "cpu": "2",
                                        "ephemeral-storage": "1Gi",
                                    }
                                }
                            }
                        ],
                    }
                }
            },
        }
    )
)

if __name__ == "__main__":
    deployment.apply()
1. Is it okay to set skip_upload to true? Since we are using Docker, I'm not sure what the benefit of uploading our project to GCS is.
2. The Prefect API has a parameter for is_schedule_active, but it doesn't look like that parameter has made it over to the Python API: when I try to add it above, I get an error about including more parameters than needed. Is this something I can contribute to or add functionality for?
3. The k8s job configuration is pretty verbose for requesting resources above the default; is there a better way to set these that I am missing?
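On question 3: if I recall correctly, Prefect 2's KubernetesJob also accepts a `customizations` list of JSON 6902 patches, which is usually terser than spelling out a full job manifest. A sketch of a resource-requests patch (values copied from the job above; treat the exact field name as an assumption to verify against the docs):

```python
# Resource requests expressed as a JSON 6902 patch, the shape accepted by
# KubernetesJob(customizations=...) as a terser alternative to a full job dict.
customizations = [
    {
        "op": "add",
        "path": "/spec/template/spec/containers/0/resources",
        "value": {
            "requests": {
                "memory": "5Gi",
                "cpu": "2",
                "ephemeral-storage": "1Gi",
            }
        },
    }
]
```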
Thank you for the time and any help provided!
Nathaniel Russell
09/26/2022, 5:10 PM
FROM prefecthq/prefect:2.4.2-python3.9
COPY service.py .
COPY requirements.txt .
RUN pip install -r requirements.txt
CMD [ "service.handler" ]
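If the goal is running this image on AWS Lambda without the AWS base image, one common approach is to install the Lambda Runtime Interface Client (`awslambdaric`) and make it the entrypoint so `CMD [ "service.handler" ]` is actually dispatched. A sketch under that assumption, not a verified build:

```dockerfile
FROM prefecthq/prefect:2.4.2-python3.9
COPY service.py .
COPY requirements.txt .
RUN pip install -r requirements.txt awslambdaric
# The runtime interface client forwards Lambda events to service.handler
ENTRYPOINT [ "python", "-m", "awslambdaric" ]
CMD [ "service.handler" ]
```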
For example, this doesn't work: because I am not using AWS's base image, CMD [ "service.handler" ] isn't properly setting the handler.
Roger Webb
09/26/2022, 7:28 PM
Leon Kozlowski
09/26/2022, 7:40 PM
Sean Turner
09/26/2022, 7:42 PM
I'm trying to read a file from S3 using RemoteFileSystem.

rfs = RemoteFileSystem(basepath="s3://bucket-name/some/path/on/bucket")
cfg = rfs.read_path("base_config.yaml")

I end up with cfg being a coroutine. I imagine this returns bytes, but there aren't any helper methods and it's not clear how I'm supposed to proceed. I am interested in reading this file into memory.
Nathaniel Russell
09/26/2022, 8:12 PM
Bradley Hurley
09/26/2022, 8:14 PM
I have run into the "No heartbeat detected from the remote task" issue a few times, and it's well documented here: https://discourse.prefect.io/t/flow-is-failing-with-an-error-message-no-heartbeat-detected-from-the-remote-task/79.
badasstronaut
09/26/2022, 9:13 PM
The DaskTaskRunner class has a Dask client under self._client, but it does not look like that's intended to be used by the flow or tasks. Any tips are appreciated!
badasstronaut
09/27/2022, 1:07 AM
Zac Hooper
09/27/2022, 2:13 AM
Ben Ayers-Glassey
09/27/2022, 2:48 AM
secret = prefect.client.secrets.Secret(SFTP_PASSWORD_SECRET_NAME)
return secret.get()

...and it fails with a prefect.exceptions.ClientError from the GraphQL API.
Ben Ayers-Glassey
09/27/2022, 3:37 AM
Guillaume G
09/27/2022, 7:25 AM
KubernetesJob? I want to pull an image from a private registry.
reception_deployment: Deployment = Deployment.build_from_flow(
    name="private-flow",
    flow=main_flow,
    output="deployment-private-flow.yaml",
    description="private-flow",
    version="snapshot",
    work_queue_name="kubernetes",
    infrastructure=KubernetesJob(),
    infra_overrides=dict(
        image="myprivate-image:latest",
        env={},
        finished_job_ttl=300,
    ),
)
I cannot pull "myprivate-image:latest". Do I have to use service_account? 🤨
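A service account with attached pull credentials is one route; the other common one is patching imagePullSecrets into the job template. A sketch of that override shape as a JSON 6902 patch (the secret name my-registry-credentials is hypothetical, and the secret must already exist in the namespace):

```python
# imagePullSecrets expressed as a job-template patch (JSON 6902 form),
# pointing Kubernetes at registry credentials stored in a Secret.
image_pull_patch = [
    {
        "op": "add",
        "path": "/spec/template/spec/imagePullSecrets",
        "value": [{"name": "my-registry-credentials"}],
    }
]
```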
Thank you
Malavika S Menon
09/27/2022, 7:26 AM
@flow
def random_flow():
    id = Flow.flow_id
    return 1
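Flow.flow_id is not how Prefect 2 exposes this; inside a running flow the run metadata comes from a context lookup (prefect.context.get_run_context() in Prefect 2). A pure-Python sketch of that pattern, with illustrative names only (none of these are Prefect APIs):

```python
# Pure-Python sketch of context-based run metadata, the pattern behind
# Prefect 2's prefect.context.get_run_context(). All names are illustrative.
import contextvars
import uuid

_run_context = contextvars.ContextVar("run_context")

def get_run_context():
    # raises LookupError outside a run, mirroring "only while running"
    return _run_context.get()

def run_flow(fn):
    # set the context for the duration of the run, then restore it
    token = _run_context.set({"flow_run_id": str(uuid.uuid4())})
    try:
        return fn()
    finally:
        _run_context.reset(token)

def random_flow():
    # available only while the flow is being run
    return get_run_context()["flow_run_id"]

flow_run_id = run_flow(random_flow)
```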
How do I access a member, say flow_id, inside the flow while it is being run?
Hongbo Miao
09/27/2022, 8:04 AM
await. Posted the answer. Hi Prefect team, worth updating that demo. Thanks!
flapili
09/27/2022, 8:17 AM
flapili
09/27/2022, 8:18 AM
Deepanshu Aggarwal
09/27/2022, 9:29 AM
David
09/27/2022, 9:52 AM
Michal Luščon
09/27/2022, 11:48 AM
Ha Pham
09/27/2022, 12:18 PM
I expect start_datetime and end_datetime to go together, and start should come first, not after, like this
09/27/2022, 1:47 PM
Can I use the override parameter in prefect deployment build to change Kubernetes job parameters?
09/27/2022, 2:32 PM
if profile_name is None:
    return profile_map.get('default', {})
elif profile_name not in profile_map:
    ....

So if you do not want to use a profile, you have to overwrite profile_name manually with None in your code and then save it again. A fix could be, for example, a checkbox next to nullable fields that sets the field to null.
Stephen Herron
09/27/2022, 2:44 PM
Chris Gunderson
09/27/2022, 3:00 PM
Chris Gunderson
09/27/2022, 3:00 PM
Michael Adkins
09/27/2022, 3:07 PM
Chris Gunderson
09/27/2022, 3:09 PM