Saksham Dixit
09/02/2022, 5:02 PMEla
09/02/2022, 9:38 PMname
parameter in the task definition_"). I tried to see what's happening in the prefect.tasks.py file (which sends the warning) and I made it print out all the names in the PrefectObjectRegistry.get(). I noticed how it printed out the names of the tasks I'm using multiple times
All of this disappeared when I imported tasks/functions from a different directory than where the flow is; the flow runs only once and the warnings are gone - I'm using Prefect 2.3.1
(also when I used version 1 these double runs happened until I changed the location of the func/tasks I was importing)
I hope this makes sense, if you have any advice on why this happens, please let me know! thanks^^datamongus
09/02/2022, 9:47 PMAlix Cook
09/02/2022, 10:23 PMMatt Fysh
09/02/2022, 11:42 PMAraik Grigoryan
09/03/2022, 1:15 AMGhislain Picard
09/03/2022, 5:24 PMYaron Levi
09/03/2022, 6:17 PMYaron Levi
09/03/2022, 6:17 PMYaron Levi
09/03/2022, 6:17 PMYaron Levi
09/03/2022, 6:18 PMYaron Levi
09/03/2022, 6:18 PMYaron Levi
09/03/2022, 6:18 PMYaron Levi
09/03/2022, 6:19 PMvk
09/03/2022, 6:49 PM@flow
def print_hi():
print(f'Hi, 123') # Press ⌘F8 to toggle the breakpoint.
deployment = Deployment.build_from_flow(
name="print_hi",
flow=print_hi,
skip_upload=True,
infrastructure=KubernetesJob(
image='my-image',
)
)
deployment.apply()
This created a deployment, but why does it say This deployment is deprecated
With the General Availability release of Prefect 2.0, we modified the approach to creating deployments.
? This is the code from latest test cases, prefect 2.3.1, how can it be deprecated? And why is "Run" button inactive?
my-image
is based on prefect-onion:latest
and has my flow code in /opt/prefect/flows
. By the way, how does it know which flow to run if there is more that one flow in that folder?Yaron Levi
09/03/2022, 9:24 PMJari Rosti
09/04/2022, 9:24 AMMichael Levenson
09/04/2022, 5:25 PMGeorgi Yanev
09/04/2022, 9:11 PMYousef Hosny
09/05/2022, 2:25 AMprefect-airbyte
to monitor & schedule my airbyte pipelines but unfortunately I am only getting these logsSaurabh Indoria
09/05/2022, 3:55 AMTan Li
09/05/2022, 5:15 AMprefect deployment build --cron "00 1 * * *"
But how can I specify timezone in cli like how it can be done through the python object interface (I am having a weird bug, i.e. some folder under /tmp/* could not be found if a python deployment get execute in the second time)? I searched a bit on Github but only found some python code, and I also tried to looked at the code implementation (on 2.3.0), but it’s not very clear to me where that TimeZone flag got specified (already got lost in the *args maze) Any help would be appreciated! 🙏🙏🙏Jari Rosti
09/05/2022, 8:01 AMcurl -X 'POST' \
'<http://localhost:4200/api/flow_runs/>' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d '{
"name": "my-flow-run",
"flow_id": "adc10e6b-62b1-4a5a-ba75-a9d03107c75c",
"deployment_id": "0a26b313-0134-43ac-b4ab-67bcbeeaebf9",
"parameters": { "flow_run_id": "d3f49714-3b14-4bbf-a42c-7081149b9fb3" }
}'
and get a response:
{
"id": "1277b255-4250-45f7-b0bf-c06630dcac86",
"created": "2022-09-05T07:51:42.448486+00:00",
"updated": "2022-09-05T07:51:42.450190+00:00",
"name": "my-flow-run",
"flow_id": "adc10e6b-62b1-4a5a-ba75-a9d03107c75c",
"state_id": "aae8e698-9df6-4efa-8692-2d9e97e01112",
"deployment_id": "0a26b313-0134-43ac-b4ab-67bcbeeaebf9",
"work_queue_name": null,
...
Mohamed Alaa
09/05/2022, 8:18 AMMatt Fysh
09/05/2022, 8:41 AMNuno Silva
09/05/2022, 9:50 AMBen Muller
09/05/2022, 10:48 AMMalavika S Menon
09/05/2022, 12:02 PMYoussef Ben Farhat
09/05/2022, 12:45 PMNiels Prins
09/05/2022, 1:58 PMNiels Prins
09/05/2022, 1:58 PMAnna Geller
09/05/2022, 9:05 PMNiels Prins
09/06/2022, 8:53 AMimport time
from typing import List
from uuid import uuid4
import prefect
from prefect import Flow, Parameter, Task, task
from prefect.engine.signals import FAIL, LOOP, signal_from_state
from prefect.executors import LocalDaskExecutor
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
@task
def get_deltas() -> List:
return [1, 2, 3, 4]
class A(Task):
def run(self, v):
<http://self.logger.info|self.logger.info>(f"{v} A")
if v == 3:
raise ValueError("This is not the value you where looking for")
for i in range(4):
time.sleep(1)
<http://self.logger.info|self.logger.info>(f"{v} - {i}")
return "A"
class B(Task):
def run(self, v):
<http://self.logger.info|self.logger.info>(f"{v} B")
time.sleep(5)
return "B"
class C(Task):
def run(
self,
):
<http://self.logger.info|self.logger.info>("C")
time.sleep(2)
return "C"
class Sequenced(Task):
def run(self, ds: List):
LOG = prefect.context.get("logger")
loop_payload = prefect.context.get("task_loop_result", {"ds": ds})
ds = loop_payload["ds"]
if not ds:
return
d = ds[0]
<http://LOG.info|LOG.info>(d)
# setting this is required, if not the flow is detected as already run, and will not be started
idempotency_key = str(uuid4())
id = create_flow_run.run(
idempotency_key=idempotency_key,
flow_name="sub",
project_name="test",
parameters={"delta_id": d},
)
LOG.warning(id)
# raising final state will result in the final state of this task being raised
# this will lead to not calling the loop value
flow_run = wait_for_flow_run.run(id, stream_logs=True, raise_final_state=False)
state_signal = signal_from_state(flow_run.state)(
message=f"{id} finished in state {flow_run.state}",
result=flow_run,
)
if isinstance(state_signal, FAIL):
raise state_signal
raise LOOP(message="", result={"ds": ds[1:]})
with Flow(name="sub", executor=LocalDaskExecutor()) as sub:
delta_id = Parameter(name="delta_id", required=True)
a = A()(delta_id)
b = B()(delta_id)
C()(upstream_tasks=[a, b])
with Flow(name="main") as flow:
deltas = get_deltas()
Sequenced()(deltas)
if __name__ == "__main__":
sub.register("test")
flow.register("test")
flow.run()
Anna Geller
09/06/2022, 10:41 AMNiels Prins
09/06/2022, 1:16 PMAnna Geller
09/12/2022, 2:40 PM