Yufei Li
07/06/2023, 2:53 PM
Lee Mendelowitz
07/06/2023, 5:27 PM
datamongus
07/06/2023, 8:53 PM
Daniel Lomartra
07/06/2023, 9:47 PM
Sandip Viradiya
07/07/2023, 12:51 AM
prefect-aws prefect-bitbucket
I am getting below error when I try to run any flow. Any ideas? Thanks in advance 🙂
Flow could not be retrieved from deployment.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 395, in retrieve_flow_then_begin_flow_run
    flow = await load_flow_from_flow_run(flow_run, client=client)
  File "/usr/local/lib/python3.10/site-packages/prefect/client/utilities.py", line 51, in with_injected_client
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/prefect/deployments/deployments.py", line 193, in load_flow_from_flow_run
    storage_block = Block._from_block_document(storage_document)
  File "/usr/local/lib/python3.10/site-packages/prefect/blocks/core.py", line 618, in _from_block_document
    else cls.get_block_class_from_schema(block_document.block_schema)
  File "/usr/local/lib/python3.10/site-packages/prefect/blocks/core.py", line 672, in get_block_class_from_schema
    return cls.get_block_class_from_key(block_schema_to_key(schema))
  File "/usr/local/lib/python3.10/site-packages/prefect/blocks/core.py", line 683, in get_block_class_from_key
    return lookup_type(cls, key)
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/dispatch.py", line 185, in lookup_type
    raise KeyError(
KeyError: "No class found for dispatch key 'bitbucket-repository' in registry for type 'Block'."
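A KeyError like this usually means the package that defines the block type is not importable in the environment that executes the flow run, so the block class never gets registered. As a quick check, and assuming the blocks come from the `prefect_aws` and `prefect_bitbucket` modules (the import names of the two packages mentioned above), a sketch like the following reports which modules cannot be found. The helper name `missing_modules` is hypothetical.

```python
import importlib.util


def missing_modules(module_names):
    """Return the top-level module names that cannot be found in this environment."""
    return [name for name in module_names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    # Assumed import names for the prefect-aws and prefect-bitbucket packages.
    missing = missing_modules(["prefect_aws", "prefect_bitbucket"])
    if missing:
        print("Not installed here:", ", ".join(missing))
    else:
        print("All block packages are importable.")
```

Run it inside the same image or virtual environment that the agent or worker uses. If `prefect_bitbucket` is listed, installing prefect-bitbucket there is the likely fix.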
gojaeheon
07/07/2023, 6:37 AM
Deceivious
07/07/2023, 8:33 AM
Nico Neumann
07/07/2023, 8:39 AM
prefect deployment run) to watch the logs in the terminal?
In Prefect 1 there was a watch flag, prefect run --watch, but I couldn’t find anything in Prefect 2.
Tim-Oliver
07/07/2023, 2:02 PM
ConcurrentTaskRunner decide how many workers are used? Is there a way to manipulate this?
Junjun Zhang
07/07/2023, 2:57 PM
prefect-databricks is it possible to launch jobs using an existing cluster? The documentation only shows how to create a new cluster.
Javier Ochoa
07/07/2023, 4:02 PM
Charles Leung
07/07/2023, 4:29 PM
ale
07/07/2023, 4:35 PM
set_upstream_tasks (in Prefect v2 I would use wait_for).
The result is that, once the orchestrator flow is deployed, the Schematic view provides a very clear and useful DAG of all dbt flows.
In Prefect v2 there’s no concept of a pre-defined DAG, hence it’s not possible to have this kind of view before running the flow.
And if you made a mistake in setting dependencies, you will only discover it at runtime, which is not nice 😅
However, for an orchestrator flow, I think it would make a lot of sense to have a Schematic view similar to the one available in Prefect v1.
Rio McMahon
07/07/2023, 4:36 PM
widget-flow that is mirrored between a dev workspace and a prod workspace.
2. To isolate processes: widget-flow has its own workspace with prod and dev versions of the flow within that workspace.
My intuition is that 1) is “better” in that it reduces proliferation of agents/work queues. This also raises an additional question: can a single agent monitor multiple work queues from multiple workspaces? Thanks for any insights.
Jacob Goldberg
07/07/2023, 4:49 PM
upstream_tasks argument to manually block one task on others, is there an equivalent in Prefect 2.x?
skrawczyk
07/07/2023, 5:52 PM
hello_world with a deployment called 1.0 that is scheduled to run every day.
I already have some code in place that can automatically create a new version of hello_world as a deployment named 1.1 that would also run every day.
I want to go back and turn off the schedule for hello_world:1.0 so there aren't 2 versions running at the same time.
I see that the Deployment class has a load() and an update() function, which would take care of turning off the schedule if I knew which deployment name I needed to turn off.
Does anyone know how I can list all deployments a flow has stored and select the most recent or the ones that have active schedules?
Ajeel Ahmed
07/07/2023, 9:12 PM
Ajeel Ahmed
07/07/2023, 9:45 PM
tasks no, the second task will also fail, but if you run these tasks via task_runners and, say, use the sequential one, it’ll be fine and the second one will run even if the first one failed
Ajeel Ahmed
07/07/2023, 9:45 PM
Bryan Rodas
07/07/2023, 10:19 PM
Tom Klein
07/09/2023, 12:40 PM
ConcurrentTaskRunner to the DaskTaskRunner.
Basically we encounter the issue mentioned here: https://discourse.prefect.io/t/picklingerror-error-when-using-a-dasktaskrunner/908
What’s the best way to work around this, other than changing the object being returned (assuming that’s not feasible)? Is there some way to override how serialization is done on it? Or why does it work with the regular task runner but not with Dask?
Kyle
07/10/2023, 4:51 AM
Kyle
07/10/2023, 5:02 AM
import redis
import requests
from ratelimiter import RateLimiter

# Connect to your Redis server
redis_conn = redis.Redis(host='localhost', port=6379, db=0)
rate_limiter1 = RateLimiter(redis_conn, max_calls=500, period=10, name='ratelimit1')
rate_limiter2 = RateLimiter(redis_conn, max_calls=1000, period=60, name='ratelimit2')

with rate_limiter1:
    try:
        response = requests.get(BASEURL)
    except Exception as e:
        msg = f"[{ticker}] API request failed: {e}"
        raise Exception(msg) from e
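For reference, the `with rate_limiter:` pattern used above can be sketched with the standard library alone. This is a minimal in-process sliding-window limiter, not the implementation of the `ratelimiter` package and not Redis-backed, so it does not coordinate across processes or machines. The class name `SlidingWindowLimiter` and its `clock` and `sleep` parameters are hypothetical and exist only for this illustration.

```python
import threading
import time
from collections import deque


class SlidingWindowLimiter:
    """Allow at most max_calls entries into the with-block per period seconds."""

    def __init__(self, max_calls, period, clock=time.monotonic, sleep=time.sleep):
        self.max_calls = max_calls
        self.period = period
        self._clock = clock
        self._sleep = sleep
        self._calls = deque()  # start times of calls still inside the window
        self._lock = threading.Lock()

    def __enter__(self):
        with self._lock:
            while True:
                now = self._clock()
                # Forget calls that have left the window.
                while self._calls and now - self._calls[0] >= self.period:
                    self._calls.popleft()
                if len(self._calls) < self.max_calls:
                    self._calls.append(now)
                    return self
                # Window is full: wait until the oldest call leaves it.
                self._sleep(self.period - (now - self._calls[0]))

    def __exit__(self, exc_type, exc, tb):
        return False  # never swallow exceptions raised inside the block


if __name__ == "__main__":
    per_10s = SlidingWindowLimiter(max_calls=500, period=10)
    per_60s = SlidingWindowLimiter(max_calls=1000, period=60)
    # Entering both limiters enforces both limits on the same request.
    with per_10s, per_60s:
        print("request would go here")
```

Stacking two limiters in one `with` statement is one way to apply a short-window and a long-window limit to the same call.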
Deceivious
07/10/2023, 8:30 AM
ChainedTask if that makes sense 😄
Farhood Etaati
07/10/2023, 8:49 AM
job_variables object to a deployment object in python? The docs only talk about yaml and cli configurations for work-pool and base job templating.
Ajeel Ahmed
07/10/2023, 1:09 PM
Loading flow for deployment 'backup_precog_database_flow'...
11:41:18 AM
Starting 'ConcurrentTaskRunner'; submitted tasks will be run concurrently...
11:41:18 AM
Executing flow 'backup-precog-database-flow' for flow run 'dynamic-rooster'...
11:41:19 AM
Beginning execution...
11:41:19 AM
Created task run 'backup_to_drive-9749801d-0' for task 'backup_to_drive'
11:41:19 AM
Executing 'backup_to_drive-9749801d-0' immediately...
Ajeel Ahmed
07/10/2023, 1:09 PM
Starting 'ConcurrentTaskRunner'; submitted tasks will be run concurrently... when I’m not running any task runner?
Ajeel Ahmed
07/10/2023, 1:10 PM
task_runners?
jpuris
07/10/2023, 1:17 PM
back off limit for the job pods?
It is currently set to 6, and we do not want the pods to restart at all if any failure occurs.
Luke Dolan
07/10/2023, 3:08 PM
import asyncio
from prefect.client.orchestration import PrefectClient
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterState, FlowRunFilterStateType
from prefect.client.schemas.objects import State, StateType

client = PrefectClient(api=f'http://xxx.xxx.com:4200/api')
state_filter = FlowRunFilterState(
    type=FlowRunFilterStateType(
        any_=[StateType.RUNNING]
    )
)
flows_resp = asyncio.get_event_loop().run_until_complete(client.read_flow_runs(flow_run_filter=FlowRunFilter(type=state_filter), limit=1))
logger.info(f'got {len(flows_resp)} flow runs')
for flow in flows_resp:
    logger.info(f'flow state details {flow.state}')
this gives the result
16:54:32.780 | Info | httpx - HTTP Request: POST http://xxx.xxx.com:4200/api/flow_runs/filter "HTTP/1.1 200 OK"
16:54:32.780032000 [Info ] [httpx] HTTP Request: POST http://xxx.xxx.com:4200/api/flow_runs/filter "HTTP/1.1 200 OK"
16:54:32.792552000 [Info ] got 1 flow runs
16:54:32.792795000 [Info ] flow state details Completed()
In the above I would expect that the flow state is RUNNING rather than COMPLETED