Nikolaus Landgraf
07/14/2022, 10:01 AM
Tom Klein
07/14/2022, 11:35 AM
i have a task X that spawns several "children" tasks Y1 ... Yn - let's say some of them are stuck on Running, which doesn't count as a finished state, but i wanna guarantee the next phase in the flow executes correctly and ignores the few "hung" tasks --
what's the (default) behavior if i set those tasks (e.g. Y3, Y17 and Y91) to Skipped? would the next task that depends on them still get executed (even if it has the default all_successful trigger)?
the reason i'm asking about Skipped is because i wanna avoid a None response flowing downstream from these tasks
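For what it's worth: in Prefect 1 a Skipped state is treated as a success, but by default a downstream task is itself skipped whenever any upstream is skipped, because skip_on_upstream_skip defaults to True. A minimal sketch of one way to keep the next phase running and drop the empty results - task and flow names below are placeholders, not from this thread:

from prefect import Flow, task
from prefect.triggers import all_finished

@task
def child(n):
    return n

# skip_on_upstream_skip=False stops this task from being auto-skipped when an
# upstream is Skipped; all_finished lets it run once every upstream reaches a
# finished state, whatever that state is.
@task(skip_on_upstream_skip=False, trigger=all_finished)
def next_phase(results):
    # hand-skipped or hung children may pass down None; drop those
    return [r for r in results if r is not None]

with Flow("parent") as flow:
    ys = child.map([1, 2, 3])
    next_phase(ys)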
Rajeshwar Agrawal
07/14/2022, 12:15 PM
is there a config.toml setting for disabling both heartbeat and lazarus for prefect flows on Prefect Server?
Abhishek Mitra
07/14/2022, 2:59 PM
has anyone had trouble connecting to MySQL from inside the @task decorator? I get pymysql.err.OperationalError: (2003, "Can't connect to MySQL server on 'xyz-mysql8-cluster.cluster-fa43asdfd.us-west-2.rds.amazonaws.com' (timed out)") every time. Any suggestions on how to approach solving this?
Vrinda Kailash
07/14/2022, 3:10 PM
FuETL
07/14/2022, 3:22 PM
Joshua Massover
07/14/2022, 4:22 PM
I set PREFECT__LOGGING__LEVEL=DEBUG as an environment variable, confirmed it's in the environment, and my agent logs are not showing anything but INFO. Is there anything else I need to set?
Thomas Opsomer
07/14/2022, 4:23 PM
I'm trying to use task_run_state_aggregate, but I don't know how to filter on a specific task 😕
If anyone can help 🙏
Joshua Massover
07/14/2022, 5:51 PM
[2022-07-14 17:49:44,859] DEBUG - agent | Querying for ready flow runs...
DEBUG:agent:Querying for ready flow runs...
[2022-07-14 17:49:44,959] DEBUG - agent | No ready flow runs found.
DEBUG:agent:No ready flow runs found.
DEBUG:agent:Sleeping flow run poller for 10.0 seconds...
[2022-07-14 17:49:44,960] DEBUG - agent | Sleeping flow run poller for 10.0 seconds...
[2022-07-14 17:49:54,960] DEBUG - agent | Querying for ready flow runs...
DEBUG:agent:Querying for ready flow runs...
[2022-07-14 17:49:55,046] DEBUG - agent | No ready flow runs found.
DEBUG:agent:No ready flow runs found.
[2022-07-14 17:49:55,046] DEBUG - agent | Sleeping flow run poller for 10.0 seconds...
DEBUG:agent:Sleeping flow run poller for 10.0 seconds...
Anyone have any suggestions on how to continue to debug?
Matan Drory
07/14/2022, 5:55 PM
Minh Mai
07/14/2022, 6:08 PM
does anyone know why a flow that works with flow.run() will fail once I register it to the UI? I'm currently using the local server:
prefect backend server
prefect server start
prefect agent local start
the error i'm getting is:
Failed to load and execute flow run: FlowStorageError('An error occurred while unpickling the flow:\n ModuleNotFoundError("No module named \'utils\'")\nThis may be due to a missing Python module in your current environment. Please ensure you have all required flow dependencies installed.')
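For context on the error above: with pickle-based storage the agent unpickles the flow, so every module the flow references (here utils) must already be importable in the agent's environment, e.g. by putting it on the agent's PYTHONPATH. A minimal sketch of the script-based alternative, where the agent re-executes the flow file instead of unpickling it - the path is a placeholder, and utils still has to be importable at runtime:

from prefect import Flow, task
from prefect.storage import Local

@task
def say_hi():
    print("hi")

# Sketch only: point Local storage at the flow file itself so the flow is
# stored, and later re-imported by the agent, as a script rather than a pickle.
with Flow(
    "my-flow",
    storage=Local(path="/absolute/path/to/flow_file.py", stored_as_script=True),
) as flow:
    say_hi()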
Sergey Goncharov
07/14/2022, 8:21 PM
My task CreateAndRunJob gets marked as succeeded, but Prefect does not wait for the end of the execution, and then the function get_our_pod_name fails because it cannot find the Job, which is not yet created.
I thought about using threading or queue from Python here, but I think that's not the right way with Prefect.
Could you please advise here? I believe I'm doing something wrong.
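One option to consider, if the goal is simply to block until the Kubernetes Job has finished before get_our_pod_name runs: Prefect 1's task library includes RunNamespacedJob, which creates the job and waits for it to complete. A rough sketch under that assumption - the manifest and names are placeholders:

from prefect import Flow
from prefect.tasks.kubernetes import RunNamespacedJob

# Placeholder manifest; fill in the real pod template under "spec".
job_body = {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "metadata": {"name": "my-job"},
    "spec": {},
}

run_job = RunNamespacedJob(body=job_body, namespace="default")

with Flow("create-and-run-job") as flow:
    job_done = run_job()
    # downstream tasks can then depend on job_done, e.g.
    # get_our_pod_name(upstream_tasks=[job_done])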
Cab Maddux
07/14/2022, 10:04 PM
I'm trying to use the prefect.packaging.docker.DockerPackager class and have it push to Google Container Registry. It looks like the current package() method is unable to push to GCR because prefect.docker.push_image here ignores any path provided in the registry URL.
So it looks to me like if I provide a registry_url like https://gcr.io/my-project-name (where my-project-name is a GCP project) for a flow named my-flow, rather than pushing an image to gcr.io/my-project-name/my-flow, the current implementation will try to push to gcr.io/my-flow (which fails because I don't have a project named my-flow).
Wondering if I'm missing something here, or maybe this should go into an issue?
07/14/2022, 10:24 PM
Joshua Massover
07/14/2022, 10:38 PM
# assumes ddtrace's global tracer (trace()/flush() match that API)
from ddtrace import tracer
from prefect.engine.cloud import CloudFlowRunner, CloudTaskRunner

def patch_task():
    method = getattr(CloudTaskRunner, "run")

    def patched(self, *args, **kwargs):
        # wrap each task run in a trace named after the task
        name = self.task.name
        with tracer.trace("task", resource=name):
            result = method(self, *args, **kwargs)
        tracer.flush()
        return result

    setattr(CloudTaskRunner, "run", patched)

def patch_flow():
    method = getattr(CloudFlowRunner, "run")

    def patched(self, *args, **kwargs):
        # wrap the whole flow run in a trace named after the flow
        name = self.flow.name
        with tracer.trace("flow", resource=name):
            result = method(self, *args, **kwargs)
        tracer.flush()
        return result

    setattr(CloudFlowRunner, "run", patched)
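A small usage sketch, assuming the two helpers above live in a module named tracing (a hypothetical name) that is importable wherever the flow actually executes; the patches need to be applied before the runners are invoked:

# "tracing" is a hypothetical module holding the patch_flow/patch_task above.
from tracing import patch_flow, patch_task

# Apply the monkeypatches once, e.g. at the top of the flow file baked into
# the execution image, so every CloudFlowRunner.run / CloudTaskRunner.run call
# gets wrapped in a trace.
patch_flow()
patch_task()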
Mohamed Hatem Diabi
07/15/2022, 12:13 AM
from prefect import Flow, task, unmapped
from prefect.tasks.prefect import create_flow_run

@task(log_stdout=True)
def create_subflows(list_of_elements):
    list_param = []
    for element in list_of_elements:
        list_param.append(
            {
                "pr": element,
            }
        )
    mapped_flows = create_flow_run.map(
        flow_name=unmapped("Subflow"),
        parameters=list_param,
    )

with Flow("Parent Flow") as flow:
    list_of_elements = [1, 2, 3]
    create_subflows(list_of_elements)

I am getting this error:
ValueError: Could not infer an active Flow context while creating edge to <Task: create_flow_run>. This often means you called a task outside a with Flow(...) block. If you're trying to run this task outside of a Flow context, you need to call create_flow_run.run(...)
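A minimal sketch of one way around that error, following its suggestion: keep the task to pure data-building and call create_flow_run.map inside the with Flow(...) block, so the edges can be created (imports assume the same Prefect 1 setup as the snippet above):

from prefect import Flow, task, unmapped
from prefect.tasks.prefect import create_flow_run

@task(log_stdout=True)
def build_params(list_of_elements):
    # only build data here; don't call other tasks from inside a task
    return [{"pr": element} for element in list_of_elements]

with Flow("Parent Flow") as flow:
    params = build_params([1, 2, 3])
    # called inside the Flow context, so the mapped edges can be created
    create_flow_run.map(flow_name=unmapped("Subflow"), parameters=params)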
Ievgenii Martynenko
07/15/2022, 6:19 AM
wonsun
07/15/2022, 6:56 AM
Can I set server.database.connection_url or server.database.host to the connection information for a MySQL database, not postgres?
Mickael ANDRIEU
07/15/2022, 8:11 AM
Eric So
07/15/2022, 8:38 AM
logs are not showing in the UI when using LocalDaskExecutor(scheduler="processes"), while with threads logs can be seen in the UI as normal, any ideas?
Mathijs Carlu
07/15/2022, 9:00 AM
When I run prefect deployment create file.py, the deployment gets created.
Now, when I modify the flow a little (flow name stays the same), change the deployment name and then re-execute the above command, a new deployment is created. This deployment points at the same flow object (flow_id is the same for both). However, both deployments execute different code, 'different versions of the same flow' if you will, although this 'version number' is not saved anywhere (I think).
This is all due to the fact that the location of the flow code (flow_data) is saved with the deployment, not with the flow, which seems a little counterintuitive to me. If I see 2 flow runs in the UI that executed the same flow, I would expect them to have executed the same code.
Stephen Lloyd
07/15/2022, 10:19 AM
haris khan
07/15/2022, 10:24 AM
Rajvir Jhawar
07/15/2022, 10:43 AM
yu zeng
07/15/2022, 12:23 PM
import datetime

from prefect import Flow, task
from prefect.storage import GitHub, GitLab, S3, Webhook
from prefect.backend.artifacts import create_link_artifact
import prefect.engine.cache_validators

@task(
    task_run_name="mviz_task_{md5}",
    max_retries=0,
    cache_for=datetime.timedelta(hours=1),
)
def test(md5):
    print('do test', md5)

with Flow("epl") as flow:
    test('123')
    test('123')

flow.run()

hi, i'm trying to use the cache within a single flow run, but i get the output below, which shows the cache isn't used. it seems the cache won't work during the same flow run - or are there some mistakes in my code?
[2022-07-15 12:18:00+0000] INFO - prefect.FlowRunner | Beginning Flow run for 'epl'
[2022-07-15 12:18:00+0000] INFO - prefect.TaskRunner | Task 'test': Starting task run...
[2022-07-15 12:18:00+0000] WARNING - prefect.TaskRunner | Task 'test': Can't use cache because it is now invalid
do test 123
[2022-07-15 12:18:00+0000] INFO - prefect.TaskRunner | Task 'test': Finished task run for task with final state: 'Cached'
[2022-07-15 12:18:00+0000] INFO - prefect.TaskRunner | Task 'test': Starting task run...
[2022-07-15 12:18:00+0000] WARNING - prefect.TaskRunner | Task 'test': Can't use cache because it is now invalid
do test 123
[2022-07-15 12:18:00+0000] INFO - prefect.TaskRunner | Task 'test': Finished task run for task with final state: 'Cached'
[2022-07-15 12:18:00+0000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
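For what it's worth, Prefect 1's cache_for caching is meant to reuse results across flow runs (the cached state is stored by the backend, or in memory within the same Python process when running locally), not to de-duplicate two calls to the same task inside one run. A hedged sketch of the usual in-run alternative - call the task once and pass its result downstream:

from datetime import timedelta

from prefect import Flow, task

@task(task_run_name="mviz_task_{md5}", cache_for=timedelta(hours=1))
def test(md5):
    print("do test", md5)
    return md5

@task
def use_result(value):
    print("reusing", value)

with Flow("epl") as flow:
    result = test("123")   # the task runs once
    use_result(result)     # downstream tasks reuse the result
    use_result(result)

flow.run()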
Jehan Abduljabbar
07/15/2022, 1:32 PM
jack
07/15/2022, 2:02 PM
Jason
07/15/2022, 3:04 PM
alex
07/15/2022, 4:15 PM
Josh
07/15/2022, 4:54 PM