Niels Prins
03/28/2022, 1:21 PM
Failed to load and execute Flow's environment: ValueError('Local Secret "GITEA" was not found.')
When trying the same thing with a Docker agent, everything works fine. What am I missing in my configuration?
We are still on Prefect 0.15.3
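A minimal sketch of how local secrets resolve in Prefect 0.15.x, assuming the secret is meant to come from the machine the local agent runs on rather than from the backend:

    # Local secrets are read from prefect.context.secrets, which is populated
    # from [context.secrets] in ~/.prefect/config.toml or from an environment
    # variable set where the agent runs, e.g.:
    #   export PREFECT__CONTEXT__SECRETS__GITEA="<token>"
    from prefect.client import Secret

    gitea = Secret("GITEA").get()  # raises ValueError if not set locally

A Docker agent may appear to work simply because the secret happens to be present in that container's environment, so it is worth checking which machine actually holds the value.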

Nikita Samoylov
03/28/2022, 1:22 PM
Our flow runs get default names like groovy-woodpecker, poetic-lorikeet, etc. It's really hard to navigate these runs in the UI, because to check the input params you have to open every link in the flow's RUNS tab.
I would like to include the input params in the flow run names. Is there a way to define these names myself instead of using the default ones?
PS: We use Prefect v1.0.0
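A hedged sketch for Prefect 1.x, using the built-in RenameFlowRun task to rename the current run from its parameter values; the flow and parameter names here are hypothetical:

    from prefect import Flow, Parameter, task
    from prefect.tasks.prefect import RenameFlowRun

    rename_run = RenameFlowRun()

    @task
    def build_name(lang, region):
        return f"my-flow-{lang}-{region}"

    with Flow("my-flow") as flow:
        lang = Parameter("lang", default="en")
        region = Parameter("region", default="us")
        # rename the current flow run once the parameters are known
        rename_run(flow_run_name=build_name(lang, region))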

Chu Lục Ninh
03/29/2022, 1:09 PM
My flow mainly uses RunKubernetesJob. But every time I run the flow using a KubernetesAgent, the agent always spins up a new job, which in my case is useless and a waste of resources. Since the flow is mainly about spinning up a new k8s job, I want the KubernetesAgent to run it directly.
Please advise me on how to do that.
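A hedged sketch of one workaround, assuming Prefect 1.x: run a lightweight local agent inside the cluster and give this flow a LocalRun config, so the flow itself executes in the agent's process and only the task creates a Kubernetes job. The label is hypothetical:

    from prefect.run_configs import LocalRun

    # picked up by a local agent started in-cluster with the same label,
    # instead of the KubernetesAgent creating a job per flow run
    flow.run_config = LocalRun(labels=["in-cluster"])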

Aniruddha Sengupta
03/30/2022, 10:57 AM
I have a flow called news_url_extraction which is run in the UI with the arguments in the run configuration: lang, region and root_dir. I want to be able to run this flow in another flow called news_pipeline, and I have created the following code:
import os

from prefect import Client, Flow
from prefect.run_configs import LocalRun
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

# general, settings and prefect_utils are our own helper modules

# Set the working directory
working_directory = general.home_dir() + settings.WORKING_DIR

# Functions
def execute_pipeline() -> Flow:
    with Flow(
        "news_pipeline_flow",
        state_handlers=[prefect_utils.set_run_name]
    ) as flow:
        # Perform the news url extraction flow
        news_url_extraction_flow = create_flow_run(
            flow_name="news_url_extraction_flow",
            project_name="scraper_pipeline",
            parameters={
                "root_dir": os.environ.get("root_dir"),
                "lang": os.environ.get("lang"),
                "region": os.environ.get("region")
            }
        )
        # wait_for_flow_news_url_extraction_flow = wait_for_flow_run(
        #     news_url_extraction_flow,
        #     raise_final_state=True
        # )
        # # Perform the news spider flow
        # news_spider_flow = create_flow_run(
        #     flow_name="news_spider_flow",
        #     project_name="scraper_pipeline"
        # )
        # wait_for_flow_news_spider_flow = wait_for_flow_run(
        #     news_spider_flow,
        #     raise_final_state=True
        # )
        news_url_extraction_flow
    return flow

if __name__ == "__main__":
    """
    The main process to get the Simple News pipeline registered
    in the Prefect UI.
    """
    print("Creating flow")
    flow = execute_pipeline()
    # Register news pipeline parent flow
    print("Creating labels")
    labels = [general.retrieve_hostname()]
    print("Setting the configuration")
    env = {
        "root_dir": "the path where the results of the scraped urls should go",
        "lang": "the language param",
        "region": "the region param",
    }
    flow.run_config = LocalRun(working_dir=working_directory, labels=labels, env=env)
    project_name = "scraper_pipeline"
    print(f"Checking project {project_name} exists")
    project_exists = prefect_utils.check_project_exists(project_name)
    if not project_exists:
        print(f"Project {project_name} doesn't exist, creating now")
        client = Client()
        client.create_project(project_name=project_name)
    print("Registering the flow")
    flow.register(project_name=project_name)
    print(f"Update the agent labels for agent: {settings.PREFECT_AGENT_NAME}")
    prefect_utils.update_agent_labels(agent_name=settings.PREFECT_AGENT_NAME, new_labels=labels)
When I run the child flow news_url_extraction, the process completes within a few seconds. But when I run the parent flow news_pipeline, the flow remains in a continuous running state. Is there anything I am missing here?
I should also say that the reason I am doing this in the first place is that I have two flows I want to run one after the other: news_url_extraction and news_spider. At the moment I have to manually register both flows and then run them separately, but I want to run them sequentially in one flow instead. The interesting thing to note here is that news_spider is not able to run on multiple threads whereas news_url_extraction is, so the two flows have slightly different configurations: news_url_extraction uses a LocalDaskExecutor but news_spider does not.
Thanks and hope this all makes sense.
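For comparison, a hedged sketch of the standard Prefect 1.x flow-of-flows pattern, assuming both child flows are registered under the scraper_pipeline project; each create_flow_run is paired with a wait_for_flow_run so the parent only finishes when the children do:

    from prefect import Flow
    from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

    with Flow("news_pipeline_flow") as flow:
        url_run = create_flow_run(
            flow_name="news_url_extraction_flow", project_name="scraper_pipeline"
        )
        url_done = wait_for_flow_run(url_run, raise_final_state=True)

        spider_run = create_flow_run(
            flow_name="news_spider_flow", project_name="scraper_pipeline"
        )
        # order the spider run after the extraction run has finished
        spider_run.set_upstream(url_done)
        wait_for_flow_run(spider_run, raise_final_state=True)

One thing worth checking with this setup is whether the child runs are picked up by an agent at all: if their labels match no running agent, anything waiting on them will sit in a Running state indefinitely.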

Michał Augoff
03/30/2022, 1:43 PM
I build my flow inside a function:
def get_flow(...):
    with Flow(...) as f:
        tasks
    return f
then I use it in another file for deployment and set some env-specific configuration:
from module_x import get_flow
f = get_flow()
f.result = S3Result(bucket=dev or prod bucket here)
# no results with this storage configuration
f.storage = S3(bucket & key, stored_as_script=True, local_script_path=<this file>)
# results work with this storage
f.storage = Docker(...)
f.register()
With S3 storage I don’t get any results saved in S3; everything works fine when using Docker storage. I wonder if this has anything to do with Docker using serialization and S3 using the “as script” approach.
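A hedged sketch of one explanation, assuming stored_as_script=True re-executes the script at run time: attributes attached to the flow object only at registration time (like f.result) would then be lost, while Docker storage pickles the already-configured flow. Setting the result inside the factory keeps it in both modes; the bucket name below is hypothetical:

    from prefect import Flow
    from prefect.engine.results import S3Result

    def get_flow(results_bucket: str = "my-results-bucket") -> Flow:
        with Flow("my-flow", result=S3Result(bucket=results_bucket)) as f:
            ...  # tasks
        return f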

Jullian Bellino
04/04/2022, 8:38 AM
When an S3 partition is not yet available, we retry the task with:
raise RETRY("Partition not available yet, retrying in 1 hour.", start_time=pendulum.now().add(hours=1))
However, it looks like this could block the other running tasks because the runner is not released. Now imagine we rely on a single local agent with a maximum of 3 tasks in parallel: if some S3 partitions are late for any reason, this could quickly block all of the other tasks. Do you have any better way to achieve this?
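A hedged sketch of one alternative, assuming Prefect 1.x: instead of holding a runner slot with RETRY, end the run and schedule a fresh run of the same flow an hour later. The flow and project names are hypothetical:

    import pendulum
    from prefect import task
    from prefect.engine.signals import SKIP
    from prefect.tasks.prefect import create_flow_run

    @task
    def check_partition(partition_ready: bool):
        if not partition_ready:
            create_flow_run.run(
                flow_name="partition-consumer",
                project_name="etl",
                scheduled_start_time=pendulum.now().add(hours=1),
            )
            # finish this run instead of blocking a task slot for an hour
            raise SKIP("Partition not ready; scheduled a new run in 1 hour.")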

Chris Duke
04/04/2022, 4:27 PM
Traceback (most recent call last):
File "/helix_orchestration/flows/reporting/fhir_rpt_reporting_pipeline.py", line 191, in <module>
flow.register(project_name="helix", idempotency_key=flow.serialized_hash())
File "/usr/local/lib/python3.7/site-packages/prefect/core/flow.py", line 1734, in register
idempotency_key=idempotency_key,
File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 1211, in register
variables=dict(input=inputs),
File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 554, in graphql
retry_on_api_error=retry_on_api_error,
File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 458, in post
retry_on_api_error=retry_on_api_error,
File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 738, in _request
session=session, method=method, url=url, params=params, headers=headers
File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 660, in _send_request
raise ClientError(f"{exc}\n{graphql_msg}") from exc
prefect.exceptions.ClientError: 400 Client Error: Bad Request for url: http://apollo:4200/
The following error messages were provided by the GraphQL server: GRAPHQL_VALIDATION_FAILED: Unknown type "register_tasks_input". Did you mean
"register_agent_input", "delete_agent_input", "delete_flow_input",
"delete_tenant_input", or "log_insert_input"?
GRAPHQL_VALIDATION_FAILED: Cannot query field "register_tasks" on type
"Mutation". Did you mean "register_agent", "delete_task", "insert_task", or "update_task"?
I'm not sure what this error is indicating, as I have other flows that register fine using this same register call and a similar setup. I'm running this locally using a Docker image on version 0.15.10.

Egil Bugge
04/05/2022, 6:16 AM
I'm trying out Prefect 2.0 and did the following:
• Created a flow and a DeploymentSpec:
from prefect import flow
from prefect.deployments import DeploymentSpec

@flow
def hello_world(name="world"):
    print(f"Hello {name}!")

DeploymentSpec(
    flow=hello_world,
    name="hello-world",
)
• Made a work queue and connected the work queue to this deployment
• Spun up an agent locally and connected it to this work queue
• Clicked into the Deployments tab in the UI and selected "Quick run" which scheduled a run of my flow
Then nothing happened; my local agent didn't pick up the flow. I then made a new work queue which I did not connect to the deployment, and connected my agent to this new work queue instead. Now it immediately picked up the flow that I had started earlier and ran it.
Is there something more I need to do to get an agent to pick up flows connected to a deployment?
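A hedged sketch for debugging this, assuming the Prefect 2.0 beta CLI of that era: a work queue only serves runs that match its filter criteria, and a queue created with no filters should match every deployment. The queue name is made up:

    prefect work-queue create "catch-all"   # prints the new queue's ID
    prefect agent start <queue-id>

Since the unfiltered queue picked the run up immediately, it may be worth re-checking that the original queue's deployment filter still points at the current deployment.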

Ray Tang
04/05/2022, 8:08 AM
Right now anyone can call flow.register to deploy to production Prefect servers, which is not ideal. So I would like to ask if there are any online guides on how to restrict access to flow.register, and any recommended deployment pipelines?
Thanks
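A hedged sketch of the usual approach, assuming open-source Prefect Server (which has no built-in authentication, so access control is typically enforced at the network level): restrict who can reach the apollo endpoint, and make a CI job the only place registration happens, e.g. with the 1.x register CLI:

    # run only from CI against the production backend
    prefect backend server
    prefect register --project production --path flows/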

Liam England
04/05/2022, 2:02 PM
Would it be better to add a retry_delay to the task and have it wait an hour, or would it be better practice to add some sort of handler that reschedules the flow for an hour later? Is retry_delay basically a sleep in the backend, or does it handle scheduling the task?
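A hedged sketch of the declarative form, assuming Prefect 1.x; my understanding (worth verifying for your agent setup) is that a failed task is put into a Retrying state with a scheduled start time rather than the runner simply sleeping in-process:

    from datetime import timedelta
    from prefect import task

    @task(max_retries=3, retry_delay=timedelta(hours=1))
    def load_partition():
        ...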