Alex F
05/17/2023, 11:29 PM
Hello all, has anyone seen this error when starting Prefect Server v1? ERROR: for hasura Cannot start service hasura: exec: "docker-init": executable file not found in $PATH
prefect server start
Pulling postgres ... done
Pulling hasura ... done
Pulling graphql ... done
Pulling apollo ... done
Pulling towel ... done
Pulling ui ... done
Creating network "prefect-server" with the default driver
Creating tmp_postgres_1 ... done
Creating tmp_hasura_1 ... error
ERROR: for tmp_hasura_1 Cannot start service hasura: exec: "docker-init": executable file not found in $PATH
ERROR: for hasura Cannot start service hasura: exec: "docker-init": executable file not found in $PATH
ERROR: Encountered errors while bringing up the project.
Shivam Tiwari
05/18/2023, 2:18 AM
Hi all,
I'm facing an issue with Prefect's mapping of inputs to a task:
save_report = save_to_db.map(
    combined_reports_list, upstream_tasks=[add_report_signal]
)
Here the length of combined_reports_list is 5, so the task save_to_db is expected to run 5 times, but it runs only once. Any possible reason for this? Also, when I remove the upstream_tasks it works as expected.
I am using prefect version 0.15.13.
Any help would be appreciated on this. Thanks
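A possible cause, offered only as a sketch and not a confirmed fix: in Prefect 1.x, arguments passed to .map(), including entries in upstream_tasks, are treated as mapped arguments unless wrapped in unmapped, so a single upstream signal can collapse the map to one run. The task bodies below are placeholders, not the original code.

from prefect import Flow, task, unmapped

@task
def add_report_signal():
    return True  # placeholder for the real signal task

@task
def save_to_db(report):
    print(f"saving {report}")  # placeholder for the real save logic

with Flow("save-reports") as flow:
    signal = add_report_signal()
    combined_reports_list = ["r1", "r2", "r3", "r4", "r5"]
    # unmapped() keeps the signal as a single upstream dependency rather than
    # a mapped argument, so save_to_db should still run once per report.
    save_report = save_to_db.map(
        combined_reports_list,
        upstream_tasks=[unmapped(signal)],
    )

if __name__ == "__main__":
    flow.run()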
Karthik
05/18/2023, 9:00 AM
import asyncio

from prefect import flow

@flow
async def subflow_1():
    print("Subflow 1 started!")
    some_other_flow()
    # post result ops

@flow
async def subflow_2():
    print("Subflow 2 started!")
    some_other_flow_2()
    # post result ops

@flow
async def subflow_3():
    print("Subflow 3 started!")
    some_other_flow_3()
    # post result ops

@flow
async def subflow_4():
    print("Subflow 4 started!")
    some_other_flow_4()
    # post result ops

@flow
async def main_flow():
    parallel_subflows = [subflow_1(), subflow_2(), subflow_3(), subflow_4()]
    await asyncio.gather(*parallel_subflows)

if __name__ == "__main__":
    main_flow_state = asyncio.run(main_flow())
Now when I deploy this (or run it locally), flow runs are created for the 4 subflows, but that's about it; thereafter everything is sequential, nothing gets processed in parallel. For example, the subflow_2 run proceeds only after subflow_3 (and in turn some_other_flow_3) has finished.
So in the end it is no different from calling each of these subflows sequentially instead of using asyncio.
Am I missing something else here? Kindly advise.
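A hedged observation on the snippet above, not a confirmed diagnosis: if some_other_flow() and its siblings are ordinary synchronous calls, each subflow blocks the event loop until it finishes, so asyncio.gather has no await point at which to switch between them. A minimal sketch of a variant that can interleave, assuming the inner work can be expressed as an awaitable task (some_other_work and subflow are illustrative placeholders):

import asyncio

from prefect import flow, task

@task
async def some_other_work(name: str) -> None:
    # Placeholder for the real work; the await here yields control back to
    # the event loop so the other subflows can make progress.
    await asyncio.sleep(1)
    print(f"{name} finished its work")

@flow
async def subflow(name: str) -> None:
    print(f"{name} started!")
    await some_other_work(name)

@flow
async def main_flow() -> None:
    # Each coroutine awaits inside, so gather can actually interleave them.
    await asyncio.gather(*(subflow(f"subflow_{i}") for i in range(1, 5)))

if __name__ == "__main__":
    asyncio.run(main_flow())

If the inner flows are CPU-bound or must stay synchronous, something like asyncio.to_thread or a process pool would be needed instead; await alone does not parallelise blocking code.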
05/18/2023, 12:25 PMif __name__ == "__main__":
block on code run. Does it run on the Agent? I am working on setting up a streaming flow with GCP Cloudrun jobs for the worker pool and am curious sync this part of the flow is continuous?Chris Gunderson
05/18/2023, 2:14 PM
Slackbot
05/18/2023, 3:00 PM
Ofir
05/18/2023, 5:07 PM
prefect CLI has support for doing both in the same command?
David
05/18/2023, 5:46 PM
kasteph
05/18/2023, 5:50 PM
Backing off due to consecutive errors, using increased interval of 60.0s.
File "/usr/local/lib/python3.9/site-packages/httpx/_transports/default.py", line 77, in map_httpcore_exceptions
    raise mapped_exc(message) from exc
httpx.LocalProtocolError: Invalid input ConnectionInputs.SEND_HEADERS in state ConnectionState.CLOSED
J
05/18/2023, 7:13 PM
Choenden Kyirong
05/18/2023, 8:41 PM
David
05/18/2023, 9:34 PM
Ajeel Ahmed
05/18/2023, 10:08 PM
Passing a Docker registry (a DockerRegistry block) through the image_registry parameter of DockerContainer doesn't work; I can tell because the Docker registry field in the GUI is still empty. This means I have to manually go to the GUI and point it to the registry. I don't want to set it myself each time, so how do I accomplish this through the library?
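A sketch of one way this is often wired up, not a confirmed fix for the behaviour described above; the registry URL, credentials, image name, and block names are illustrative placeholders. It may also be that the nested DockerRegistry block has to be saved itself before the DockerContainer block is saved for the field to show up in the UI.

from prefect.infrastructure import DockerContainer, DockerRegistry

# Illustrative values only.
registry = DockerRegistry(
    username="my-user",
    password="my-token",
    registry_url="registry.example.com",
)
registry.save("my-registry", overwrite=True)

container = DockerContainer(
    image="registry.example.com/my-image:latest",
    image_registry=registry,
)
container.save("my-container", overwrite=True)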
Pedro Machado
05/18/2023, 11:32 PM
RuntimeError: Service exceeded error threshold.
It's crashing while running a dbt job that takes about 30 minutes. The worker is running on my desktop computer (Ubuntu on WSL 2). This is what I am running:
Version: 2.10.10
API version: 0.8.4
Python version: 3.10.8
Git commit: 8159450b
Built: Thu, May 18, 2023 3:43 PM
OS/Arch: linux/x86_64
Profile: cloud
Server type: cloud
Any ideas about what may be happening?
Deceivious
05/19/2023, 12:37 PM
Ajeel Ahmed
05/19/2023, 2:19 PM
Deceivious
05/19/2023, 2:35 PM
Ryan Brennan
05/19/2023, 3:20 PM
Robert Banick
05/19/2023, 3:50 PM
main.
Now, setting up Prefect, I'm having trouble designing a similar system. I'm using Prefect 2 on AWS Elastic Container Service (Fargate) tasks to implement ETL runs. We install our ETL repo as a library onto a Docker image that the ECSTask block's task definition uses.
I've tried replicating our previous system by running pip install --upgrade git@<repo>@<branch> to upgrade the package in question at the very beginning of my flow. This works, and I'm even able to see that the function I'm modifying is indeed updated in /usr/local/lib/python3.10/dist-packages/<package>.
Nevertheless, when my flow run reaches the crucial step I'm testing, it very clearly uses (and fails on) the "old" function currently on the ETL repo's main. Forcing a reload of the package in question via importlib.reload(<package>) does not appear to resolve the problem.
My questions therefore are:
1. Is it possible to change a library mid-flow like this or is it a hard limitation of Python / Prefect?
2. Is Python code used by flows somehow installed somewhere different from /usr/local/lib/python3.10/dist-packages/ on the container, such that pip would install in the wrong place?
3. If it's not possible to change a library mid-flow, is it possible to have the Prefect Agent/Worker run the pip installs prior to spinning up the flow? Could the Agent even read the desired branch names from the flow parameters?
4. Any other ideas?
The nuclear option here is manually changing the branches on docker images but that’s very clunky and will make iterative testing extremely time consuming. So we’d really like to avoid that path.
All help and suggestions most appreciated,
Robert
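On question 1 above, a plain-Python sketch of why an in-flow pip upgrade plus importlib.reload can still leave the old code in use; mypackage and mypackage.reports.build are hypothetical stand-ins for the ETL library, not real names from the question.

import importlib
import sys

import mypackage                      # hypothetical ETL package
from mypackage.reports import build   # name bound at import time

# ... pip install --upgrade runs here and replaces the files on disk ...

importlib.reload(mypackage)

# Two reasons the flow can still hit the old code after the reload:
# 1. reload() only re-executes mypackage/__init__.py; already-imported
#    submodules such as mypackage.reports keep their old module objects
#    unless each one is reloaded explicitly.
# 2. Names imported with `from ... import ...` (like `build` above) still
#    point at the old function objects even after a reload.
importlib.reload(sys.modules["mypackage.reports"])
from mypackage.reports import build   # re-bind to pick up the new code

Because the flow's code is imported before the flow body runs, upgrading the package before the process starts (for example in the image entrypoint, or via a per-branch image tag) tends to be the more reliable route; this is a suggestion, not a documented Prefect feature.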
kiran
05/19/2023, 5:26 PM
16:50:25.956 | ERROR | prefect._internal.concurrency.services - Service 'EventsWorker' failed.
Any idea what's going on?
Jenia Varavva
05/19/2023, 6:03 PM
I'm running an agent with prefect agent start --pool data-qa and some env vars (as set by the prefect-agent Helm chart). The agent picks up work from the queue and launches it fine, but cancellations don't work. Looking at the agent code, there's a chain of calls: check_for_cancelled_flow_runs -> Agent.get_work_queues() -> update_matched_agent_work_queues(), with the last one gating its whole method body on:
if self.work_queue_prefix:
and not doing anything otherwise. Since I don't set work_queue_prefix on the CLI, the agent ends up not matching any queues, so flows in the Cancelling state never get processed. Is this a bug or am I doing something wrong?
Dominic Pham
05/19/2023, 7:00 PM
Dominic Pham
05/19/2023, 7:01 PM
sjammula
05/19/2023, 7:35 PM
How is skip_if_running handled in Prefect 2? I'm looking for a workaround for skipping the deployment run. I did notice we can set a concurrency limit at the work pool/queue level, but I need it at the deployment level.
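One workaround pattern that is sometimes suggested, sketched here with client-side filtering for brevity (server-side FlowRunFilter objects would be more efficient); it is not a built-in skip_if_running equivalent, and my_deployment_flow is a placeholder for the deployed flow.

from prefect import flow, get_client
from prefect.context import get_run_context

@flow
async def my_deployment_flow() -> None:
    this_run = get_run_context().flow_run

    async with get_client() as client:
        # Fetch recent flow runs and look for another RUNNING run of the
        # same deployment (filtered in Python to keep the sketch short).
        runs = await client.read_flow_runs()
        clashing = [
            run
            for run in runs
            if run.deployment_id == this_run.deployment_id
            and run.id != this_run.id
            and run.state is not None
            and run.state.is_running()
        ]

    if clashing:
        print("Another run of this deployment is already running; exiting early.")
        return

    ...  # the real work goes here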
Anders Segerberg
05/19/2023, 8:19 PM
Does .map() run over its arguments in order? Is there a guarantee of that, or is there no guarantee on the iteration order?
Faheem Khan
05/21/2023, 12:41 AM
Oliver E
05/21/2023, 3:04 AM
Alessandro De Rose
05/21/2023, 10:22 AM
mypy is complaining when attempting to unpack a tuple returned from a task. In the example below, when attempting to unpack a and b from the call to get_numbers, mypy raises the error "None" object is not iterable. However, the type annotation of the get_numbers function shows that None should never be returned from the task.
Any idea what's wrong here? (Prefect v2.10.10, mypy v1.3.0)
from prefect import flow, task

@task
def get_numbers(a: int, b: int) -> tuple[int, int]:
    return a, b

@flow
def get_numbers_flow() -> None:
    a, b = get_numbers(1, 2)  # <- "None" object is not iterable.
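One way to quiet mypy while keeping the task call, offered as a sketch rather than the recommended fix: cast the call's result to the tuple type it has at runtime (cast is a no-op at runtime and only guides the type checker toward the expected overload of the decorated call).

from typing import cast

from prefect import flow, task

@task
def get_numbers(a: int, b: int) -> tuple[int, int]:
    return a, b

@flow
def get_numbers_flow() -> None:
    # The cast tells mypy which result type to expect from the task call.
    a, b = cast(tuple[int, int], get_numbers(1, 2))
    print(a, b)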
Nitin Bansal
05/22/2023, 2:56 AM
Nitin Bansal
05/22/2023, 6:51 AM