Aleksandr Liadov
01/18/2023, 10:59 AM
In Prefect 1 I used `KubernetesRun(cpu_request, memory_request, cpu_limit, memory_limit)`. In Prefect 2 I use `KubernetesJob`. How can I specify the same parameters?

Kasper Djernæs
01/18/2023, 1:25 PM

Khyaati Jindal
01/18/2023, 1:31 PM

Stephen Herron
01/18/2023, 2:35 PM

Santiago Toso
01/18/2023, 2:52 PM
• I create a new environment: `conda create -n my-env python=3.10`
• I activate the environment and install only Prefect with `pip install -U prefect`
• Finally I try to set the Prefect server to the right IP using `prefect config set PREFECT_API_URL=http://XX.XX.XX.XXX:4200`
The following error pops up:
AttributeError: module 'lib' has no attribute 'X509_V_FLAG_CB_ISSUER_CHECK'
Has anyone seen this error before and can provide guidance?

Richard Alexander
01/18/2023, 3:26 PM
I don't think I have `.prefectignore` set up properly. Details on my problem are in this GitHub issue. In short, I have large files that are being uploaded on each deployment, and it seems like they are staying there regardless of `.prefectignore`. Do I need to delete the entire deployment and redeploy? Also, where does Prefect look for `.prefectignore` when deploying from a Python file?

Jeff Hale
01/18/2023, 3:58 PM

Mia
01/18/2023, 4:00 PM
`Failed to load and execute flow run: ModuleNotFoundError("No module named '<flow>'")`
I'm running on prefect-server with git as a module storage. How do I debug this?

Jon
01/18/2023, 4:54 PM
I'm using `create_flow_run.map()` and it's spinning up too many threads; I'm running out of threads/memory. I would rather execute sequentially.

Anders Segerberg
01/18/2023, 5:43 PM
I want a function `check` which takes the result of a task run and returns `True` if it failed with a `TimeoutError`, `False` otherwise.
Adding a state handler would let me do this, but all I'm interested in is the terminal state of the task, and it seems one has to handle each state transition in the state handler. This feels somewhat brittle to me. Moreover, I would have to communicate the state handler's behavior back to the parent Flow.
The other idea is to have a downstream task that sets the timing-out task as an upstream and changes the trigger to `any_failed`. However, this isn't helpful, because the behavior I want is:
• If the upstream task times out, this is considered successful behavior
• If it returns before timing out, this is considered failed behavior
So I can't really inspect both conditions in the same function.

Roger Dunbar
01/18/2023, 5:48 PM

Danilo Drobac
01/18/2023, 7:24 PM
I'm running `prefect cloud login -k PREFECT_API_KEY` and I'm getting the following error:
When not using an interactive terminal, you must supply a `--key` and `--workspace`
Do I have to hardcode a workspace for this to work?

Aram Panasenco
01/18/2023, 11:18 PM
I ran `terraform apply` to deploy the Prefect AWS Terraform module, but it's not showing up in my agents. The terraform command now returns "No changes. Your infrastructure matches the configuration." and doesn't throw any errors, but the agent is just not showing up. How can I troubleshoot this? New to Terraform, not new to Prefect.

merlin
01/19/2023, 12:43 AM

Slackbot
01/19/2023, 3:53 AM

Pinakpani Mukherjee
01/19/2023, 3:53 AM

Daniel Shinoda
01/19/2023, 3:57 AM

Carlos Cueto
01/19/2023, 5:48 AM
Rendering the template exceeded the CPU, memory, or time limit.
Template source:
Flow run {{ flow.name }}/{{ flow_run.name }} entered state {{ flow_run.state.name }} at {{ flow_run.state.timestamp }}.
Flow ID: {{ flow_run.flow_id }}
Flow run ID: {{ flow_run.id }}
Flow run URL: {{ flow_run|ui_url }}
State message: {{ flow_run.state.message }}
Seems like a bug, any ideas?

YD
01/19/2023, 6:37 AM

Mohit Singhal
01/19/2023, 6:56 AM
RuntimeError: File system created with scheme 's3' from base path 's3://gdm-de-prefect-dev/de_ingest_workfront' could not be created. You are likely missing a Python module required to use the given storage protocol.

from de_ingest_workfront import de_ingest_workfront
from prefect.blocks.core import Block
from prefect.deployments import Deployment
from prefect.filesystems import S3

storage = S3.load("test")
# In this "test" block we have set the path to the bucket name

deployment = Deployment.build_from_flow(
    flow=de_ingest_workfront,
    name="workfront",
    parameters={},
    infra_overrides={
        "image": "docker_image",
        "service_account_name": "test-sa",
        "image_pull_policy": "Always",
        "namespace": "dp",
    },
    # infrastructure={"type": "kubernetes-job"},
    work_queue_name="k8s",
    storage=storage,
    path="de_ingest_workfront",
    skip_upload=True,
)

if __name__ == "__main__":
    deployment.apply()
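A likely cause of the error above: Prefect's `S3` filesystem block resolves the `s3://` scheme through fsspec, and that scheme is only available when the optional `s3fs` package is installed, both where `Deployment.build_from_flow` runs and in the image that executes the flow. A minimal sketch for checking this (`s3_protocol_available` is a hypothetical helper, not a Prefect API):

```python
import importlib.util

def s3_protocol_available() -> bool:
    """Return True if s3fs, fsspec's backend for the 's3://' scheme,
    can be imported in the current environment."""
    return importlib.util.find_spec("s3fs") is not None

# If this prints False, install the backend:  pip install s3fs
print(s3_protocol_available())
```

Note that `skip_upload=True` only skips uploading the files; the S3 block is still instantiated at build time, and the agent-side job still needs `s3fs` to pull the code from the bucket at run time.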
Andreas Nigg
01/19/2023, 7:38 AM
Rendering the template exceeded the CPU, memory, or time limit.
Template source:
Flow run {{ flow.name }}/{{ flow_run.name }} entered state `{{ flow_run.state.name }}` at {{ flow_run.state.timestamp }}.
Flow ID: {{ flow_run.flow_id }}
Flow run ID: {{ flow_run.id }}
Flow run URL: {{ flow_run|ui_url }}
State message: {{ flow_run.state.message }}
Jari Rosti
01/19/2023, 7:56 AM

Jens
01/19/2023, 9:28 AM@task()
def test_task(some_param: str):
print(some_param)
@flow()
def test_flow():
test_Task()
If I run this code and check the FlowRun inside the UI (Orion) I only see the given parameters inside the Flow but not inside the Task
If I check the Radar I see test_flow -----> test_task. If I click on the test_flow under the tab parameters I see { "some_param": "blahblah"} but if I click on the test_task and go to the "Task Inputs" tab I see { "some_param": []}
Why I can't se the parameters which hand over from test_flow to test_task?Andreas Nord
01/19/2023, 10:58 AM

Shruti Hande
01/19/2023, 12:34 PM

xyzz
01/19/2023, 1:12 PM

Bartek Roszak
01/19/2023, 1:42 PM

Seif Harrathi
01/19/2023, 3:05 PM
Hello Prefect gurus! I have convinced my company to use Prefect for our workflow orchestration, so we will have multiple ETL flows, and different teams will work on the same project.
I was thinking about having one Prefect project that groups all the flows into submodules, with one submodule per team.
Something like this:
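For illustration only (all names here are hypothetical), one possible layout along those lines, with one submodule per team and shared code at the top level:

```
prefect-project/
├── requirements.txt
├── common/                  # shared blocks, helpers, logging config
│   └── storage.py
├── team_a/
│   ├── flows/
│   │   └── daily_sales_etl.py
│   └── deployments.py       # builds one deployment per flow
└── team_b/
    ├── flows/
    │   └── marketing_ingest.py
    └── deployments.py
```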
Seif Harrathi
01/19/2023, 3:06 PM
Any recommendations for best practices in cases like this?
Also, when it comes to starting those different agents: can I start an agent on a different machine than the Prefect Orion server? (PS: I'm thinking about deploying the code on S3 storage.)
What do y'all think?
Thanks in advance
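On the second question: yes. An agent only needs HTTP access to the Orion API, so it can run on any machine. A sketch of the configuration commands, where the server address and queue name are placeholders to replace with your own:

```shell
# On the agent machine (not on the Orion server itself):
# point the local Prefect client at the server's API...
prefect config set PREFECT_API_URL="http://<orion-server-ip>:4200/api"

# ...then start an agent polling a work queue; deployments submitted to
# the same queue will execute on this machine.
prefect agent start -q "etl"
```

With S3 storage, the agent machine also needs the bucket credentials and the `s3fs` package so it can download the flow code.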
Carlos Cueto
01/19/2023, 3:56 PMprefect-dask
and my flows that use prefect-shell
's shell_run_command
started failing with the following error:
Encountered exception during execution:
Traceback (most recent call last):
File "C:\scripts\prefect2\lib\site-packages\prefect\engine.py", line 1478, in orchestrate_task_run
result = await task.fn(*args, **kwargs)
File "C:\scripts\prefect2\lib\site-packages\prefect_shell\commands.py", line 86, in shell_run_command
async with await open_process(
File "C:\scripts\prefect2\lib\site-packages\anyio\_core\_subprocesses.py", line 127, in open_process
return await get_asynclib().open_process(
File "C:\scripts\prefect2\lib\site-packages\anyio\_backends\_asyncio.py", line 1105, in open_process
process = await asyncio.create_subprocess_exec(
File "C:\python310\lib\asyncio\subprocess.py", line 218, in create_subprocess_exec
transport, protocol = await loop.subprocess_exec(
File "C:\python310\lib\asyncio\base_events.py", line 1675, in subprocess_exec
transport = await self._make_subprocess_transport(
File "C:\python310\lib\asyncio\base_events.py", line 498, in _make_subprocess_transport
raise NotImplementedError
NotImplementedError
Whenever I do uninstall prefect-dask via pip uninstall prefect-dask
, everything goes back to working smoothly. This seems related to asyncio based on the error. Anyone has any clues as to why this might be happening?
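One educated guess, offered as a hypothesis rather than a confirmed diagnosis: on Windows, asyncio's selector event loop does not implement subprocess support, and some versions of distributed (pulled in by prefect-dask) switch the process to a selector-based event loop policy; `shell_run_command`'s `asyncio.create_subprocess_exec` then hits `raise NotImplementedError` in `_make_subprocess_transport`, exactly as in the traceback above. A sketch of a possible workaround, run before the flow starts:

```python
import asyncio
import sys

def use_proactor_loop_on_windows() -> bool:
    """Install Windows' Proactor event loop policy, which supports asyncio
    subprocesses (the Selector loop raises NotImplementedError for them).
    Returns True if the policy was changed, False on other platforms."""
    if sys.platform == "win32":
        asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
        return True
    return False

use_proactor_loop_on_windows()
```

Worth verifying the hypothesis first by printing `type(asyncio.get_event_loop_policy())` with and without prefect-dask installed.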