Jeffery Newburn
04/14/2021, 4:32 PM
Varun Joshi
04/14/2021, 5:22 PM
Justin Chavez
04/14/2021, 8:12 PM
run_command, and inside it the command is launched via a RunNamespacedJob to use Kubernetes. I have multiple commands that take a while to complete, so I would like multiple Namespaced Jobs to run at the same time. I tried using a mapping like:
with Flow(...) as flow:
    run_command.map([cmd1, cmd2, ...])
But Prefect is running each Namespaced Job in serial. Would switching to a Dask executor be the key? Or could I adjust the map function to achieve parallelization?
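A minimal sketch of the usual fix, assuming the run_command task from the question (names here are placeholders): mapped children only run in parallel when the flow has a parallel executor attached, so switching to LocalDaskExecutor or DaskExecutor is indeed the key.

import prefect
from prefect import Flow, task
from prefect.executors import LocalDaskExecutor

@task
def run_command(cmd):
    # placeholder body; in the question this launches a RunNamespacedJob
    prefect.context.get("logger").info(f"running {cmd}")

with Flow("namespaced-jobs") as flow:
    run_command.map(["cmd1", "cmd2"])

# the default LocalExecutor runs mapped children one at a time;
# a threaded LocalDaskExecutor (or a full DaskExecutor) runs them concurrently
flow.executor = LocalDaskExecutor(scheduler="threads")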
Riley Hun
04/14/2021, 10:51 PM
Ranu Goldan
04/15/2021, 3:13 AM
Jeremy Tee
04/15/2021, 6:58 AM
client.get_flow_run_info("xxxxx"). Is there another way for me to get the location where the task results are stored?
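One way to dig the locations out of the flow-run info, sketched under the assumption that the tasks were checkpointed and that each State carries its Result on the private _result attribute (the attribute names here are assumptions, not a documented contract):

from prefect import Client

client = Client()
flow_run_info = client.get_flow_run_info("xxxxx")

# each entry in task_runs carries that task run's State; for checkpointed
# tasks the attached Result usually records where the value was written
for task_run in flow_run_info.task_runs:
    result = getattr(task_run.state, "_result", None)
    print(task_run.task_slug, getattr(result, "location", None))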
Lukas N.
04/15/2021, 11:24 AM
No heartbeat detected from the remote task; retrying the run.
Hawkar Mahmod
04/15/2021, 12:15 PM
TypeError: can't pickle generator objects
on a task that returns a generator. This task is not persisted using a Result; there is no Result or checkpointing enabled on the task. When I run it locally, the flow works just fine. However, when I trigger it via the Prefect UI and use S3 Storage, I think it tries to persist all tasks. This is what this line in the documentation refers to, I believe (see image).
How can I get this task to not be persisted by default, if that is what is causing this?
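When running against Cloud/Server, checkpointing is turned on by default, so one way to keep Prefect from trying to pickle the return value is to disable checkpointing on just that task (materialising the generator into a list before returning is the other common workaround). A minimal sketch; the function name is illustrative:

from prefect import task

@task(checkpoint=False)
def produce_items():
    # the return value is never persisted, so the backend never tries to
    # pickle the generator
    return (i for i in range(10))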
Mickael Riani
04/15/2021, 1:51 PM
Jérémy Trudel
04/15/2021, 2:05 PM
@task
def extract_copy_history(cursor, schema_table):
    logger = prefect.context.get("logger")
    logger.info(f"Schema table name is {schema_table}.")
Now when I do a quick run on Prefect Cloud, no mention of it appears in my logs, despite the view being set to "Showing logs for all log levels". I see the log for the task itself (extract_copy_history) and all other tasks, just not my custom log.
Satheesh K
04/15/2021, 2:38 PM
intermediate_result = 0
with Flow("flow") as flow:
    param1 = Parameter("list1")
    mapped_list = create_map(param1)
    results = task1.map(mapped_list)
    intermediate_result = results[1]
    results2 = task2(results[1])
Greg Roche
04/15/2021, 4:07 PM
main.py file that imports code from these other files and defines the actual flow logic.
If main.py is updated, a simple re-registration of the flow seems to be enough to allow the agent to execute the updated flow, because the updated logic is stored on S3 and is then downloaded by the agent during the next execution. However, if one of the other files (which main.py imports) is changed, re-registration alone isn't enough to allow the agent to execute the updated flow, seemingly because only the content of main.py is stored on S3 at registration. Practically, this means that almost every time we make any change to any of our flows, we need to rebuild our Docker image with the updated logic, redeploy it, and replace the old agent with the new one before re-registering the flow.
Is there some way for us to register a flow so that all of the flow's code, not just the code in the file that defines the flow, is stored in S3, so we don't need to constantly rebuild and redeploy the agent's image for almost every change? Or is there a cleaner approach to solving this issue which has worked for anybody here? Thanks in advance.
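One pattern that avoids hand-rebuilding the agent image, sketched with placeholder paths and image names: let Docker storage copy the helper modules into the image at registration time and put them on PYTHONPATH, so every re-registration produces an image that contains the current code.

from prefect.storage import Docker

storage = Docker(
    registry_url="<registry-url>",
    image_name="<image-name>",
    files={
        # copy the modules that main.py imports into the image when the flow is registered
        "/repo/helpers.py": "/modules/helpers.py",
    },
    env_vars={
        # make the copied modules importable inside the container
        "PYTHONPATH": "$PYTHONPATH:/modules",
    },
)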
Robin
04/15/2021, 4:48 PM
ERROR - prefect.DbtShellTask | Command failed with exit code 1
Any thoughts on what could be wrong, or how to get further information and debug the flow, are appreciated! 🙂
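For getting more than the bare exit code out of the failure, a sketch of the ShellTask/DbtShellTask options that surface command output in the logs (availability of stream_output depends on the Prefect version):

from prefect.tasks.dbt import DbtShellTask

dbt = DbtShellTask(
    return_all=True,     # return every line of output instead of only the last one
    log_stderr=True,     # log stderr on failure instead of swallowing it
    stream_output=True,  # stream stdout/stderr to the Prefect logger as dbt runs
)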
Joseph Loss
04/15/2021, 7:17 PM
Carter Kwon
04/15/2021, 7:28 PM
<task functions... >
with Flow("ETL Flow", schedule=schedule, storage=Docker(registry_url=os.getenv("REGISTRY_URL"), image_name=os.getenv("IMAGE_NAME")), run_config=ECSRun(task_role_arn=os.getenv("TASK_ROLE_ARN"), execution_role_arn=os.getenv("EXECUTION_ROLE_ARN"))) as flow:
    DAYS_AGO = 5
    TARGET_DATE = (datetime.now() - timedelta(days=DAYS_AGO)).strftime('%Y-%m-%d')
    <use TARGET_DATE to make API calls inside tasks... >
We have a CI/CD process in place that registers our flows after they've been pushed to git. For this particular flow, TARGET_DATE should equal today's date - 5 days, because the API needs a few days for the analytics to be available. I've noticed that TARGET_DATE actually ends up being the date of flow registration - 5 days. Is there a way to have this code executed every time the flow is run, instead of once at registration, so that TARGET_DATE changes every day?
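Code in the Flow block runs once, at registration, so the usual fix is to move the date arithmetic into a task (for example, based on the scheduled_start_time in prefect.context) so it is evaluated on every run. A minimal sketch with illustrative names:

from datetime import timedelta

import prefect
from prefect import Flow, task

@task
def compute_target_date(days_ago: int = 5) -> str:
    # scheduled_start_time is populated in context for each flow run
    scheduled = prefect.context.get("scheduled_start_time")
    return (scheduled - timedelta(days=days_ago)).strftime("%Y-%m-%d")

with Flow("ETL Flow") as flow:
    target_date = compute_target_date(5)
    # pass target_date into the downstream API-calling tasks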
Julio Venegas
04/15/2021, 7:36 PM
nout to the task decorator/constructor, or provide a Tuple return-type annotation to your task.` when I instantiate the Task with nout=2. Class in the thread. Any suggestions?
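For a class-based task, nout is normally forwarded to Task.__init__ (and run returns a tuple); a rough sketch with an illustrative class, in case the constructor argument isn't reaching the base class:

from prefect import Task

class SplitTask(Task):
    def __init__(self, **kwargs):
        # nout must reach the base Task constructor for unpacking to work
        super().__init__(nout=2, **kwargs)

    def run(self, value: str):
        return value[:1], value[1:]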
Ryan Baker
04/15/2021, 9:45 PM
jack
04/15/2021, 10:25 PM
Vincent
04/16/2021, 1:44 AM
Jeremy Tee
04/16/2021, 4:57 AM
Matthew Alhonte
04/16/2021, 5:10 AM
James Gibbard
04/16/2021, 2:11 PM
Joseph Loss
04/16/2021, 3:39 PM
Julio Venegas
04/16/2021, 5:25 PM
import os
from prefect import task, Flow

@task
def get_env():
    return os.environ.get("CURRENT_ENV")

with Flow(name="get-env") as flow:
    env = get_env()

flow.register(project_name="get_env")
and I have "CURRENT_ENV" in my bash/zsh environment variables, and run the flow with a LocalAgent, then it's not necessary to pass any environment variables when I execute prefect agent local start, because the environment variable is already in the local system.
But if I wanted to run the flow in a non-local environment, say in a Dask cluster in Kubernetes, then I would need to pass environment variables to prefect agent kubernetes start?
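Agents accept a --env flag (for example, prefect agent kubernetes start --env CURRENT_ENV=production), and another option is to set the variable on the flow's run config so it travels with the flow regardless of which agent picks it up. A minimal sketch of the run-config route; the value is a placeholder:

from prefect.run_configs import KubernetesRun

flow.run_config = KubernetesRun(env={"CURRENT_ENV": "production"})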
Peter Peter
04/16/2021, 5:37 PM
action_list_operator was found in your project. Please verify this in your great_expectations.yml
Marc Lipoff
04/16/2021, 5:41 PM
import pandas as pd
from prefect import Flow
from prefect.storage.docker import Docker

# ... task definitions

with Flow('test_flow', storage=Docker(
        registry_url=ecr_registry_url,
        image_name=a_repo_name,
        python_dependencies=["python==1.2"])
) as flow:
    # ... all the steps
I then try to execute prefect build -p path/to/file.py and it throws an error that pandas is not installed (which it isn't):
ModuleNotFoundError: No module named 'pandas'
Is there a way to register a flow without having to install the flow's dependencies first?
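One workaround, sketched with an illustrative task: defer the heavyweight imports to run time, so the machine doing the registration only needs prefect installed, while the Docker image still provides pandas when the flow actually runs.

from prefect import Flow, task

@task
def transform():
    import pandas as pd  # resolved inside the Docker image at run time

    return pd.DataFrame({"a": [1, 2, 3]})

with Flow("test_flow") as flow:
    transform()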
Sean Talia
04/16/2021, 7:07 PM
Secret to set this sensitive value as one of the run config's environment variables, e.g. env = { 'SECRET_KEY' : Secret("SECRET_VALUE").get() }, but I'm wondering if this raises some kind of security issue when registering the flow to my Cloud instance. In this setup, would this Secret value be retrieved at the time of registration and then sent to Prefect Cloud as part of the flow's metadata? Or would Prefect know that this env variable should be brought into the run config container only at flow runtime, and that it's perfectly safe to do something like this?
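A safer pattern, sketched here, is to resolve the secret with a PrefectSecret task inside the flow, so the value is pulled from the secret store at flow run time instead of being evaluated on the registering machine:

from prefect import Flow
from prefect.tasks.secrets import PrefectSecret

with Flow("secure-flow") as flow:
    secret_value = PrefectSecret("SECRET_VALUE")
    # hand secret_value to downstream tasks rather than baking it into the
    # run config's environment at registration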
Cab Maddux
04/16/2021, 7:49 PM
Tihomir Dimov
04/16/2021, 8:11 PM
flow.environment = LocalEnvironment(executor=LocalDaskExecutor(scheduler="threads", num_workers=num_workers))
to achieve the following execution example: get['string1'], map['string1'], save['string1'] -> get['string2'], map['string2'], save['string2'] -> get['string3']..., but we experience some issues with the LocalDaskExecutor, so we want to use DaskExecutor instead, but we are struggling to configure it to achieve the same result. Currently we use flow.executor = DaskExecutor() and the tasks run like this: get['string1'], get['string2'], get['string3'] -> map['string1'], map['string2'], map['string3'] -> save['string1']..., which is not resource-effective. How can we configure the DaskExecutor to achieve the first execution example?
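There is no switch that guarantees depth-first ordering on the distributed scheduler, but capping parallelism on the temporary cluster pushes it in that direction; a rough sketch, not a guaranteed configuration (num_workers is the variable from the question):

from prefect.executors import DaskExecutor

flow.executor = DaskExecutor(
    # kwargs are forwarded to distributed.LocalCluster when no address is given
    cluster_kwargs={"n_workers": 1, "threads_per_worker": num_workers},
)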
Adam Lewis
04/16/2021, 10:38 PM
DaskExecutor with dask-kubernetes to spin up a Dask cluster when a flow starts, and running it to process 5,000 files via a mapped task with a few final aggregation tasks. I sometimes see (via the Prefect UI) that the Dask cluster appears to spin down near the end of the run, before it's completely done, leaving a few tasks stuck pending with no workers to process them. Has anyone seen this before? If so, how did you solve it?
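If the executor is using adaptive scaling, one thing people try is pinning a fixed pool of workers instead, so the ephemeral cluster cannot decide to scale itself down while mapped tasks are still pending. A rough sketch with placeholder sizing; the KubeCluster pod spec arguments are omitted:

from prefect.executors import DaskExecutor

flow.executor = DaskExecutor(
    cluster_class="dask_kubernetes.KubeCluster",
    cluster_kwargs={"n_workers": 10},  # fixed size instead of adapt_kwargs
)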
Kevin Kho
04/17/2021, 2:13 AM
Adam Lewis
04/17/2021, 2:28 PM
Kevin Kho
04/17/2021, 2:32 PM