Tom Manterfield
05/13/2022, 11:31 AM
2.0b4, some great stuff in there. One thing that caught my eye was:
Futures from async tasks in sync flows are now marked as synchronous
The docs section for async tasks just says ‘Coming soon’. Is this a feature that’s already available and the docs are out of date, or am I misunderstanding what is meant here?
Guilherme Petris
05/13/2022, 11:35 AM
Anurag Bajpai
05/13/2022, 12:53 PM
Naga Sravika Bodapati
05/13/2022, 2:00 PM
Jessica Smith
05/13/2022, 2:33 PM
John Kang
05/13/2022, 2:44 PM
Jason
05/13/2022, 3:05 PM
RETRY, is it possible to queue an upstream task that had already passed? For example:
drop_glue_database = Parameter("drop_glue_database")
with case(drop_glue_database, True):
    drop_stuff()
save_to_parquet()
If save to parquet fails, I'd like the task to drop and rebuild the database, which is usually unnecessary and parameterized.
JK
05/13/2022, 3:09 PM
prefect.exceptions.ClientError: [{'path': ['secret_value'], 'message': 'No value found for the requested key ("xxx") in tenant xxx', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
Andrew Lawlor
05/13/2022, 3:10 PM
Taras Svirskyi
05/13/2022, 4:05 PM
GOOGLE_APPLICATION_CREDENTIALS env var to point to a default location (/Users/test/.config/gcloud/application_default_credentials.json) before running an agent didn’t help.
Muhammad Daniyal
05/13/2022, 5:29 PM
from prefect import Flow, task

def workflow1():
    @task
    def abc():
        ...  # some logic here

    with Flow('one') as f:
        abc()
    f.run()

@task
def xyz():
    workflow1()

with Flow('main flow') as f:
    xyz()
f.run()
Benny Warlick
05/13/2022, 6:21 PM
Jake
05/13/2022, 6:27 PM
John Kang
05/13/2022, 6:43 PM
Malthe Karbo
05/13/2022, 6:52 PM
prefect_test_harness was moved into a new module (prefect.testing), that is not available in 2.0b4 - even though it is available in the orion branch in the repo. I created an issue at GH also: https://github.com/PrefectHQ/prefect/issues/5787
Arnas
05/13/2022, 7:27 PM
Andrew Lawlor
05/13/2022, 8:46 PM
Error during execution of task: KeyError(<Thread(Dask-Default-Threads-12-578, started daemon 140412823688960)>) when retrying tasks run on Dask. Is there special configuration I need to do for a retry with Dask?
Frederick Thomas
05/13/2022, 9:32 PM
Exception raised while calling state handlers: SystemError('unknown opcode')
Traceback (most recent call last):
  File "/mnt/data/prefect/venv/lib/python3.8/site-packages/prefect/engine/cloud/flow_runner.py", line 119, in call_runner_target_handlers
    new_state = super().call_runner_target_handlers(
  File "/mnt/data/prefect/venv/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 116, in call_runner_target_handlers
    new_state = handler(self.flow, old_state, new_state) or new_state
  File "/mnt/data/prefect/venv3.10/lib/python3.10/site-packages/prefect/utilities/notifications/notifications.py", line 65, in state_handler
    def state_handler(
SystemError: unknown opcode
Steve s
05/14/2022, 2:48 PM
create_flow_run (and wait_for_flow_run) tasks. One of these steps is followed up with a get_task_run_result, which has always worked without issue until today. Now it's throwing this error: ValueError: The task result cannot be loaded if it is not finished. I'm not seeing how this could be, since I can see in the logs that the upstream task did in fact finish successfully. I tried explicitly setting the result of wait_for_flow_run as an upstream dependency of get_task_run_result (which I think shouldn't be needed), and I also tried setting the poll_time to 30, but still no luck. Does anyone have any ideas?
Ramzi
05/15/2022, 2:44 AM
You have not configured default storage on the server or set a storage to use for this deployment but this deployment is using a Kubernetes flow runner which requires remote storage.
I have already defined the S3 bucket as the storage in prior steps and made sure to reset it as the default beforehand. I have no problem creating the deployment locally; the issue only occurs when running it on GitHub Actions.
Mikkel Duif
05/15/2022, 11:07 AM
import asyncio
import pendulum
from datetime import timedelta
from prefect.orion.schemas.schedules import IntervalSchedule

winter_schedule = IntervalSchedule(
    interval=timedelta(hours=24),
    anchor_date=pendulum.datetime(2022, 1, 1, 0, 30, 0, tz="Europe/Copenhagen")
)
summer_schedule = IntervalSchedule(
    interval=timedelta(hours=24),
    anchor_date=pendulum.datetime(2022, 4, 1, 0, 30, 0, tz="Europe/Copenhagen")
)
print(asyncio.run(winter_schedule.get_dates(1))[0])
print(asyncio.run(summer_schedule.get_dates(1))[0])
>>> "2022-05-16T01:30:00+02:00"
>>> "2022-05-16T00:30:00+02:00"
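One plausible reading of the two outputs above, assuming the schedule steps forward from the anchor in exact 24-hour intervals of elapsed time rather than by calendar days: an anchor set in winter (UTC+1) stays pinned to the same UTC instant, so after the switch to summer time (UTC+2) it shows up one hour later on the local clock. The sketch below reproduces that arithmetic with the standard library only. The fixed offsets stand in for Europe/Copenhagen, `next_run` is a hypothetical helper, and `now` is assumed to be the time of this message; none of this is Prefect's actual implementation.

```python
from datetime import datetime, timedelta, timezone

CET = timezone(timedelta(hours=1))   # Copenhagen winter offset (assumed fixed)
CEST = timezone(timedelta(hours=2))  # Copenhagen summer offset (assumed fixed)


def next_run(anchor, interval, after):
    """Hypothetical helper: first anchor + k * interval strictly after `after`,
    counting exact elapsed time between the two instants."""
    k = (after - anchor) // interval + 1
    return anchor + k * interval


day = timedelta(hours=24)
now = datetime(2022, 5, 15, 11, 7, tzinfo=CEST)  # assumed "current" time

winter_anchor = datetime(2022, 1, 1, 0, 30, tzinfo=CET)
summer_anchor = datetime(2022, 4, 1, 0, 30, tzinfo=CEST)

# Express both results on the summer-time clock for comparison.
winter_next = next_run(winter_anchor, day, now).astimezone(CEST)
summer_next = next_run(summer_anchor, day, now).astimezone(CEST)

print(winter_next.isoformat())  # 2022-05-16T01:30:00+02:00
print(summer_next.isoformat())  # 2022-05-16T00:30:00+02:00
```

If that assumption holds, the one-hour difference comes from which side of the daylight-saving change the anchor falls on, not from the interval itself.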
Frank Embleton
05/15/2022, 11:38 AM
Return a future
If a flow returns one or more futures, the final state is determined based on the underlying states.
```
from prefect import task, flow

@task
def always_fails_task():
    raise ValueError("I am bad task")

@task
def always_succeeds_task():
    return "foo"

@flow
def always_succeeds_flow():
    x = always_fails_task()
    y = always_succeeds_task()
    return y
```
What does it mean by futures here? My understanding was that futures are to do with threading and async in Python, neither of which I see here. What am I missing? 🤔
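For context on the term: a future is a handle to a result that may not be available yet. As far as I understand the 2.0 betas, calling a task inside a flow hands back such a handle instead of the bare return value, even when the flow itself uses no async or explicit threading. The sketch below uses the standard library's `concurrent.futures` purely as an analogy for that idea; the function names mirror the docs example and it is not how Prefect implements its futures.

```python
from concurrent.futures import ThreadPoolExecutor


def always_fails():
    raise ValueError("I am bad task")


def always_succeeds():
    return "foo"


# submit() returns a Future right away; the work runs in the pool.
with ThreadPoolExecutor(max_workers=2) as pool:
    x = pool.submit(always_fails)
    y = pool.submit(always_succeeds)

# Leaving the with-block waits for both futures to finish.
# The failure is stored on the future instead of being raised at submit time.
failed = x.exception()
value = y.result()

print(type(failed).__name__, value)  # ValueError foo
```

In this analogy, returning `y` alone would give a successful outcome even though `x` failed, which is the situation the quoted docs passage describes.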
Raviraja Ganta
05/15/2022, 5:25 PM
Nash Taylor
05/15/2022, 11:44 PM
davzucky
05/15/2022, 11:58 PM
Ryan Sattler
05/16/2022, 4:09 AM
PREFECT__LOGGING__FORMAT in the flow config env doesn’t seem to work.
Horatiu Bota
05/16/2022, 10:35 AM
404 Client Error: Not Found for url. I've tried various combinations of repo/project/auth credentials for Bitbucket storage, but all with the same issue - has anyone successfully set up Bitbucket storage? (using Prefect 1.1.0)
Pedro Machado
05/16/2022, 12:05 PM
Mathijs Carlu
05/16/2022, 12:12 PM
Florian Guily
05/16/2022, 12:33 PM
Florian Guily
05/16/2022, 12:33 PM
Anna Geller
05/16/2022, 12:34 PM
Florian Guily
05/16/2022, 12:35 PM
Anna Geller
05/16/2022, 12:42 PM
Kevin Kho
05/16/2022, 1:28 PM
Florian Guily
05/16/2022, 1:29 PM