Andreas Nord
09/05/2022, 3:13 PM
Rhys Mansal
09/05/2022, 3:29 PM
Tim Helfensdörfer
09/05/2022, 3:40 PM
Our deployments are configured with work_queue_name and tags. Both work queues are running at the moment, e.g. with prefect agent start --work-queue "Production", but only the Staging work queue picks up jobs; the Production work queue does not.
It is correctly assigned in the Cloud UI, i.e. I can see the late jobs in the work queue tab, but the queue does not get any data: (moved to thread)
F. Nikita Thomas
09/05/2022, 4:54 PM
prefect[viz] was installed, but with the current version, 2.3.1, I'm having difficulty getting some basic code to work. Could someone please assist? Thanks!
import requests
from pprint import pprint as pp
from prefect import flow, task, Flow
import pandas as pd
import json

## extract
@task(name="extract")
def get_data():
    r = requests.get("https://rickandmortyapi.com/api/character/")
    return r.json()

@task
def transform(r: dict):
    df = pd.json_normalize(r["results"])
    print(df)
    return df  # return the frame so the load task receives it

@task(name="load")
def write_frame(df: pd.DataFrame):
    pass

"""
## This was the way to visualize a flow before the current version
with Flow("ETL") as flow:
    e = get_data()
    t = transform(e)
    l = write_frame(t)
flow.visualize()
"""

@flow
def flow_test():
    e = get_data()
    t = transform(e)
    l = write_frame(t)

flow_test().visualize()  # This doesn't work...
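For what it's worth: in Prefect 2 a flow is executed by calling it directly, and as far as I know the 2.3.x releases have no equivalent of the 1.x Flow.visualize(); the task graph of a run is inspected in the Orion UI instead. A minimal sketch of the invocation that does work:

if __name__ == "__main__":
    # calling the flow runs it; the return value is the flow's result,
    # not a Flow object, so there is nothing to call .visualize() on
    flow_test()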
Arun Giridharan
09/05/2022, 4:58 PM
I'm hitting "Something went wrong. Please wait a few moments and try again". Any idea how I can debug this?
Tom Kaszemacher
09/05/2022, 5:20 PM
Esdras Lopes Nani
09/05/2022, 8:21 PM
Ian Andres Etnyre Mercader
09/06/2022, 12:33 AM
orion_agent-orion_server-1 | 2022-09-06 00:21:29,528 - distributed.worker - WARNING - Compute Failed
orion_agent-orion_server-1 | Key: 1843a132-66f7-45c1-9c76-9736ca31cd4a
orion_agent-orion_server-1 | Function: begin_task_run
orion_agent-orion_server-1 | args: ()
orion_agent-orion_server-1 | kwargs: {'task': <prefect.tasks.Task object at 0x7f28a2f18a60>, 'task_run': TaskRun(id=UUID('9ac621e9-cac9-4ff7-813e-da0ce5d2bda3'), created=DateTime(2022, 9, 6, 0, 15, 11, 280979, tzinfo=Timezone('+00:00')), updated=DateTime(2022, 9, 6, 0, 15, 12, 995201, tzinfo=Timezone('+00:00')), name='download_xml-39058d84-636', flow_run_id=UUID('258498db-6397-412e-a79a-39f252b8316a'), task_key='lib.dask_context.dask_context.<locals>._dask_task', dynamic_key='636', cache_key=None, cache_expiration=None, task_version=None, empirical_policy=TaskRunPolicy(max_retries=0, retry_delay_seconds=0.0, retries=15, retry_delay=2), tags=['map_process'], state_id=UUID('79733d18-24d6-43b5-9f5f-1b04f3022edd'), task_inputs={'args': [TaskRunResult(input_type='task_run', id=UUID('fb87e646-0e3b-4647-abe5-8dea70156bbd'))], 'kwargs': []}, state_type=StateType.PENDING, state_name='Pending', run_count=0, expected_start_time=DateTime(2022, 9, 6, 0, 15, 11, 280780, tzinfo=Timezone('+00:00')), next_scheduled_start_time=None, start_
orion_agent-orion_server-1 | Exception: "RuntimeError('The connection pool was closed while 1 HTTP requests/responses were still in-flight.')"
Are these the attempts of the task that failed and is going to retry, or are they something else?
Faheem Khan
09/06/2022, 1:09 AM
Emon Li
09/06/2022, 2:25 AM
Clovis
09/06/2022, 8:55 AM
######## ERROR 1
<...>
  File "/root/.local/lib/python3.9/site-packages/prefect/tasks/airbyte/airbyte.py", line 345, in run
    job_id, job_created_at = self._trigger_manual_sync_connection(
TypeError: cannot unpack non-iterable NoneType object

######## ERROR 2
<...>
  File "/root/.local/lib/python3.8/site-packages/prefect/tasks/airbyte/airbyte.py", line 333, in run
    job_status, job_created_at, job_updated_at = self._get_job_status(
TypeError: cannot unpack non-iterable NoneType object
The task fails even though the Airbyte sync is correctly launched.
That's odd, because I did not make any modifications since yesterday to either Prefect or Airbyte. I first encountered this issue with Prefect core v1.1.0, and I see the same behavior after upgrading my core version to v1.3.0.
Have other people experienced this behavior?
Barada Sahu
09/06/2022, 8:59 AM
@flow
def test_flow(nums: list = None):
    test_futures = [square.submit(num) for num in nums]
    return do_cleanup.submit(wait_for=test_futures)
In the above, does do_cleanup get invoked, or does the flow go directly into a failed state?
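For reference, a self-contained version of the snippet, with square and do_cleanup stubbed in as hypothetical tasks (they are not shown in the original):

from prefect import flow, task

@task
def square(num: int) -> int:
    return num * num

@task
def do_cleanup():
    print("cleaning up")

@flow
def test_flow(nums: list = None):
    # fan out the work, then gate cleanup on all of the futures
    test_futures = [square.submit(num) for num in nums]
    return do_cleanup.submit(wait_for=test_futures)

if __name__ == "__main__":
    test_flow([1, 2, 3])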
Hamza Naanani
09/06/2022, 11:53 AM
The deployment CLI only exposes a --name argument, and there isn't an argument to specify the flow name in the CLI (unless we edit the file by hand after it is created). I believe that's not good behaviour, since it complicates managing lots of flows.
For instance, let's assume we have this structure:
- Flows
--| - flow_1.py
--| - flow_2.py
If both files look like this:
from prefect import flow, task

@task
def test_task():
    return 'hello'

@flow
def main():
    message = test_task()
    print(message)

if __name__ == '__main__':
    main()
then the flow name in the yaml file will be the same (main), and that will cause confusion in the Prefect UI: we'll have the same flow under different deployment names, even though it's not the same flow code under the hood.
Is there a better way to handle this apart from changing the name after the creation of the yaml file?
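One approach that may help: the @flow decorator accepts an explicit name, so each entrypoint can register under a distinct flow name even if the function is called main in every file. A sketch:

from prefect import flow, task

@task
def test_task():
    return 'hello'

# an explicit name keeps every file from registering a flow called "main"
@flow(name="flow-1")
def main():
    print(test_task())

if __name__ == '__main__':
    main()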
Vlad Tudor
09/06/2022, 12:22 PM
Failed to load and execute flow run: ModuleNotFoundError("No module named '/root/'")
I am running the Prefect agent on a separate machine from the Prefect server, but the communication works (the flow starts on the agent).
Thanks!
Matt Delacour
09/06/2022, 1:07 PM
Rajvir Jhawar
09/06/2022, 1:20 PM
deployment build --override customizations = resource_object
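If this is about applying a KubernetesJob customizations list at build time: the only --override usage I know to be documented is dot-delimited key=value pairs applied to the infrastructure block, sketched below (flow path and names hypothetical); whether a whole JSON resource object can be passed the same way is exactly the open question here.

prefect deployment build ./my_flow.py:main -n my-deployment \
    -ib kubernetes-job/dev \
    --override env.EXTRA_PIP_PACKAGES=prefect-aws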
Timo
09/06/2022, 1:39 PM
Is it possible to use prefect-dbt's trigger_dbt_cli_command() inside another task? This does not work for async tasks.
Thanks
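As far as I can tell from the prefect-dbt docs, trigger_dbt_cli_command is itself a task, so the supported pattern is to call it from the flow body rather than from inside another task. A sketch:

from prefect import flow
from prefect_dbt.cli.commands import trigger_dbt_cli_command

@flow
def trigger_dbt_flow():
    # call the prefect-dbt task directly from the flow, not from a task
    return trigger_dbt_cli_command("dbt debug")

if __name__ == "__main__":
    trigger_dbt_flow()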
Kyle McChesney
09/06/2022, 3:11 PM
@task
def required():
    return [
        1,
        2,
        3,
    ]

@task
def optional_case():
    return True

@task
def optional(data):
    data.append(4)
    return data

@task
def report_case():
    return False

@task(skip_on_upstream_skip=False)
def report():
    print('all done!')

with Flow(
    'test',
    executor=LocalDaskExecutor(),
) as flow:
    data = required()
    with case(optional_case, True):
        opt_data = optional(data)
    with case(report_case, True):
        report(upstream_tasks=(data, opt_data,))
The idea is that there are two variables, summarized as:
• variable 1: should the optional data processing be run
• variable 2: should reporting be run at the end (note that reporting does not take data as an explicit input)
I am trying to achieve this in Prefect 1 using case statements. The issue is that I need to set skip_on_upstream_skip on report so that it runs even if the optional data processing is not run. I just want to ensure that report runs if its case is True, and otherwise does not run, but it must only run AFTER data and optional have run.
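One pattern that may fit (a sketch against the Prefect 1 control-flow API, not a tested fix): merge the case branches with prefect.tasks.control_flow.merge, so report never sees a Skipped upstream and skip_on_upstream_skip is no longer needed:

from prefect import Flow, case
from prefect.executors import LocalDaskExecutor
from prefect.tasks.control_flow import merge

with Flow('test', executor=LocalDaskExecutor()) as flow:
    data = required()
    with case(optional_case, True):
        opt_data = optional(data)
    # merge resolves to whichever branch actually ran: opt_data when the
    # optional branch executed, otherwise data
    merged = merge(opt_data, data)
    with case(report_case, True):
        report(upstream_tasks=[merged])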
Andreas Nord
09/06/2022, 3:12 PM
from prefect import task, Flow

def foo(x):
    return _foo(x, task_args=dict(name=x))

@task(task_run_name="{x}")
def _foo(x):
    print(x)

with Flow("flow") as flow:
    foo('a')  # should be named 'a'
    foo('b')  # should be named 'b'

flow.run()
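If I read the Prefect 1 API right, task_run_name is templated from the task's inputs at run time, so the wrapper may not be needed at all. A sketch:

from prefect import task, Flow

@task(task_run_name="{x}")  # the run name is rendered from the input x
def foo(x):
    print(x)

with Flow("flow") as flow:
    foo('a')  # task run named 'a'
    foo('b')  # task run named 'b'

flow.run()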
Igor Morgunov
09/06/2022, 4:57 PM
@task()
def task1():
    ...  # do stuff

@task()
def task2():
    ...  # do other stuff

ids = ['aaa', 'bbb', 'ccc']
for id in ids:
    x = task1()
    y = task2(upstream_tasks=[x])

flow.run(executor=LocalDaskExecutor)
My problem is that task2() fires when the first instance of task1() completes successfully. I need task2() to fire only once all of the task1() instances have completed. What am I doing wrong here?
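A sketch of the usual fan-in fix, assuming the loop lives inside a Flow context (the snippet above doesn't show one): collect every task1 call and pass the whole list as upstream_tasks, so task2 depends on all of them instead of just the latest:

from prefect import task, Flow
from prefect.executors import LocalDaskExecutor

@task()
def task1():
    ...  # do stuff

@task()
def task2():
    ...  # do other stuff

with Flow("fan-in") as flow:
    ids = ['aaa', 'bbb', 'ccc']
    xs = [task1() for id in ids]
    # task2 now waits on every task1 run, not only the most recent one
    y = task2(upstream_tasks=xs)

flow.run(executor=LocalDaskExecutor())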
Barada Sahu
09/06/2022, 5:16 PM
Following up on the flow above: if an upstream task fails, the downstream task gated with wait_for is never triggered; rather, it goes into an undocumented NotReady state.
To overcome this, I have to call .result(raise_on_failure=False) on the upstream task. See the screengrabs below for the states 👇 - left fails, right works.
This seems like a common DAG use case that Prefect should handle well: how do we wait on a set of upstream tasks to complete and then perform a cleanup / notification at the end (with an aggregate of results from all upstream tasks)?
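A sketch of the workaround being described, under Prefect 2.3-era semantics: resolve each future with raise_on_failure=False so a failed upstream task cannot leave the cleanup task stuck:

from prefect import flow, task

@task
def work(n: int) -> int:
    if n == 2:
        raise ValueError("boom")
    return n * n

@task
def do_cleanup(outcomes):
    print("cleaning up after:", outcomes)

@flow
def pipeline():
    futures = [work.submit(n) for n in range(4)]
    # resolve without raising: failed tasks yield their exception instead
    # of propagating, so cleanup always runs with an aggregate of outcomes
    outcomes = [f.result(raise_on_failure=False) for f in futures]
    do_cleanup.submit(outcomes)

if __name__ == "__main__":
    pipeline()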
Sam Garvis
09/06/2022, 6:59 PM
Krishnan Chandra
09/06/2022, 7:00 PM
I found the cancel_flow_run function, but I don't see a way to restart the run once it's cancelled.
Nathaniel Russell
09/06/2022, 8:11 PM
Error downloading Flow from S3: An error occurred (AccessDenied) when calling the GetObject operation: Access Denied
I keep getting this error when flows are supposed to download their definitions from storage. I have both a prefect user and a prefect role specified in the bucket's policy.
Bradley Hurley
09/06/2022, 9:07 PM
John Kang
09/06/2022, 9:09 PM
pip install "prefect==2.3.2"
ERROR: Could not find a version that satisfies the requirement prefect==2.3.2 (from versions: 0.5.0, 0.5.1, 0.5.2, 0.5.3, 0.5.4, 0.5.5, 0.6.0, 0.6.1, 0.6.2, 0.6.3, 0.6.4, 0.6.5, 0.6.6, 0.6.7, 0.7.0, 0.7.1, 0.7.2, 0.7.3, 0.8.0, 0.8.1, 0.9.0, 0.9.1, 0.9.2, 0.9.3, 0.9.4, 0.9.5, 0.9.6, 0.9.7, 0.9.8, 0.10.0, 0.10.1, 0.10.2, 0.10.3, 0.10.4, 0.10.5, 0.10.6, 0.10.7, 0.11.0, 0.11.1, 0.11.2, 0.11.3, 0.11.4, 0.11.5, 0.12.0, 0.12.1, 0.12.2, 0.12.3, 0.12.4, 0.12.5, 0.12.6, 0.13.0, 0.13.1, 0.13.2, 0.13.3, 0.13.4, 0.13.5, 0.13.6, 0.13.7, 0.13.8, 0.13.9, 0.13.10, 0.13.11, 0.13.12, 0.13.13, 0.13.14, 0.13.15, 0.13.16, 0.13.17, 0.13.18, 0.13.19, 0.14.0, 0.14.1, 0.14.2, 0.14.3, 0.14.4, 0.14.5, 0.14.6, 0.14.7, 0.14.8, 0.14.9, 0.14.10, 0.14.11, 0.14.12, 0.14.13, 0.14.14, 0.14.15, 0.14.16, 0.14.17, 0.14.18, 0.14.19, 0.14.20, 0.14.21, 0.14.22, 0.15.0, 0.15.1, 0.15.2, 0.15.3, 0.15.4, 0.15.5, 0.15.6, 0.15.7, 0.15.8, 0.15.9, 0.15.10, 0.15.11, 0.15.12, 0.15.13, 1.0rc1, 1.0.0, 1.1.0, 1.2.0, 1.2.1, 1.2.2, 1.2.3, 1.2.4, 1.3.0, 2.0a1, 2.0a2, 2.0a3, 2.0a4, 2.0a5, 2.0a6, 2.0a7, 2.0a8, 2.0a9, 2.0a10, 2.0a11, 2.0a12, 2.0a13, 2.0b1, 2.0b2, 2.0b3, 2.0b4, 2.0b5, 2.0b6, 2.0b7, 2.0b8, 2.0b9, 2.0b10, 2.0b11, 2.0b12, 2.0b13, 2.0b14, 2.0b15, 2.0b16, 2.0.0, 2.0.1, 2.0.2, 2.0.3, 2.0.4, 2.1.0, 2.1.1, 2.2.0, 2.3.0, 2.3.1)
ERROR: No matching distribution found for prefect==2.3.2
Darin Douglass
09/06/2022, 9:38 PM
fastapi has a way to override the default operationId, which helps give endpoints better function names. Any chance Orion could implement this? As currently implemented, the operation IDs (though descriptive) are pretty bad from a usability perspective 😕
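For reference, the FastAPI feature being suggested (a sketch, not Orion code): each route can set operation_id explicitly, and generated clients then pick up readable function names. The route path and names here are hypothetical:

from fastapi import FastAPI

app = FastAPI()

# operation_id overrides the auto-generated operationId in the OpenAPI
# schema, so generated clients get a clean name like read_flow_runs
@app.get("/flow_runs", operation_id="read_flow_runs")
def read_flow_runs():
    return []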
Chris Goddard
09/06/2022, 10:07 PM
Alexey Stoletny
09/06/2022, 10:09 PM
Alexey Stoletny
09/06/2022, 10:09 PM