Noam polak
02/22/2022, 7:01 AM

Konstantin
02/22/2022, 9:48 AM

Hedgar
02/22/2022, 9:49 AM
I'm using awswrangler to save my dataframe to S3. My environment is already awscli compliant; do I need to go through Prefect Storage?
Secondly, if I'm doing all this on an AWS EC2 remote instance, how do I activate my flow? Can I run python myfirstprefectflow.py on the command line, assuming I have already defined a schedule in my flow code? Or do I have to schedule it again with crontab 🤔? I would be glad to be pointed in the right direction on using AWS EC2 to run Prefect.
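A minimal sketch of the schedule-in-code approach Hedgar describes, assuming Prefect 1.x (the task body, bucket path, and interval are hypothetical): with a schedule attached to the flow, running python myfirstprefectflow.py and calling flow.run() keeps the process alive and fires runs on that schedule, so no crontab is needed.

from datetime import timedelta

import awswrangler as wr
import pandas as pd
from prefect import Flow, task
from prefect.schedules import IntervalSchedule

@task
def save_to_s3():
    df = pd.DataFrame({"a": [1, 2, 3]})
    # awswrangler picks up the usual awscli credential chain
    wr.s3.to_parquet(df=df, path="s3://my-bucket/data/a.parquet")

schedule = IntervalSchedule(interval=timedelta(hours=1))

with Flow("myfirstprefectflow", schedule=schedule) as flow:
    save_to_s3()

if __name__ == "__main__":
    flow.run()  # blocks and executes runs on the schedule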
Steph Clacksman
02/22/2022, 10:31 AM
with Flow("my-flow") as flow:
    # do some things here

# EITHER flow.register("project_name")
# OR flow.run(executor=DaskExecutor())
I want to run my flow through Prefect Cloud, so I understand that I need to register my flow. But when I want to run it, it looks like I need to edit the code, which seems a bit weird when it's dockerised and running on a remote server.
Is it normal to register the flow like above before building the docker image, then switch it to the run command before rebuilding? Or is there something I'm missing?
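A sketch of one common Prefect 1.x pattern (an assumption on my part, not an answer from the thread): guard registration behind the entrypoint so the same file never needs editing. Registration is a one-off step run outside the container; once registered, an agent picks up and executes runs, so the image never calls flow.run() directly.

from prefect import Flow

with Flow("my-flow") as flow:
    pass  # do some things here

if __name__ == "__main__":
    # `python my_flow.py` (run locally or in CI) registers the flow;
    # scheduled and ad-hoc runs are then executed by an agent,
    # so the file never switches between register and run
    flow.register(project_name="project_name")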
Daniel Nilsen
02/22/2022, 10:44 AM
ModuleNotFoundError("No module named 'parameters'")
root
- src
  - flows
    - flow_1
      - flow_1.py
      - parameters.py
I am registering the flow with this:
bin/prefect register --project "myProject" --path ./src/flows/flow_1/flow_1.py
When I run the flow locally with flow.run() it works fine 🤔
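One common workaround, sketched here as an assumption rather than the thread's answer: registering with --path executes flow_1.py as a standalone script, so the sibling parameters.py is not on the import path even though running from inside the directory works. Putting the file's own directory on sys.path before the import usually restores it.

# at the top of src/flows/flow_1/flow_1.py
import os
import sys

sys.path.append(os.path.dirname(os.path.abspath(__file__)))

import parameters  # the sibling module from the tree above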
Frederick Thomas
02/22/2022, 1:02 PM

Frederick Thomas
02/22/2022, 4:13 PM
Donnchadh McAuliffe
02/22/2022, 4:37 PM
import time

from prefect import flow, task
from prefect.task_runners import DaskTaskRunner

@task(retries=2, retry_delay_seconds=10)
def task_1(time_to_sleep: int):
    raise Exception()  # fails on purpose to trigger retries
    time.sleep(time_to_sleep)

@task(retries=3, retry_delay_seconds=10)
def task_2(time_to_sleep: int):
    time.sleep(time_to_sleep)

@flow(name="steps_flow_dask", task_runner=DaskTaskRunner(address=dask_address))
def flow_to_run_tasks():
    task_1(3)
    task_2(2)
Currently, when task_1 throws the exception, task_2 immediately executes. The behaviour I want is that task_2 doesn't execute unless task_1 is successful. Also, if task_1 hits its maximum number of retries, then task_2 should not execute.
Any ideas? This is on Orion.
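A sketch of one way to get that ordering in Orion (an assumption, not an answer from the thread): pass task_1's return future into task_2 so the scheduler sees a data dependency. A downstream task whose upstream input failed is not executed, which also covers the retries-exhausted case. The extra upstream parameter is hypothetical; imports and task_1 are reused from the snippet above.

@task(retries=3, retry_delay_seconds=10)
def task_2(time_to_sleep: int, upstream=None):  # hypothetical extra parameter
    time.sleep(time_to_sleep)

@flow(name="steps_flow_dask")
def flow_to_run_tasks():
    first = task_1(3)
    task_2(2, upstream=first)  # task_2 now waits on task_1 and won't run if it fails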
David Elliott
02/22/2022, 4:53 PM
Deliveroo prefect tenant, happy to send over some specifics / URLs if helpful

Gonzalo Stillo
02/22/2022, 5:04 PM

brian
02/22/2022, 5:21 PM

Mehdi Nazari
02/22/2022, 5:28 PM

Matthew Seligson
02/22/2022, 5:56 PM

Scott Aefsky
02/22/2022, 6:57 PM

Jacqueline Riley Garrahan
02/22/2022, 8:05 PM
runs = FlowRunView._query_for_flow_run(where={"flow_id": {"_eq": id}})
Constantino Schillebeeckx
02/22/2022, 8:59 PM
RUNNING tasks like those shown below, which have been running for days? Are those truly still running? Am I being billed for idle compute here?

Chris Martinez
02/22/2022, 9:00 PM
After upgrading to 0.15.12 from 0.14.21 I get:
Failed to load and execute Flow's environment: NotGitRepository('No git repository was found ...
I am using the git storage interface and have no issues when using version 0.14.21.
Matthias
02/22/2022, 9:04 PM
Is there something like prefect run in Orion? Or do you have to run flows with python flow.py?
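For context, a sketch of the python flow.py style Matthias mentions, assuming the Orion beta API as of early 2022: flows are plain decorated functions, so calling the function runs the flow.

from prefect import flow

@flow
def my_flow():
    print("hello from Orion")

if __name__ == "__main__":
    my_flow()  # `python flow.py` executes a flow run directly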
Lee Cullen
02/22/2022, 9:11 PM
Failed to load and execute Flow's environment: ValueError('No flows found in file.')
The flow context manager is defined within the body of a function that returns a flow; I'm wondering if that is the reason why Cloud can't find the flow?
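A sketch of the factory pattern Lee describes, plus the likely fix (an assumption, not confirmed in the thread): Prefect 1.x file storage scans the module namespace for Flow objects, so a flow built inside a function is invisible unless the result is also bound at module level.

from prefect import Flow

def build_flow() -> Flow:
    with Flow("my-flow") as flow:
        ...  # tasks here
    return flow

# binding the result at module scope makes the flow discoverable,
# since extraction only looks for module-level Flow objects
flow = build_flow()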
Leon Kozlowski
02/22/2022, 9:28 PM
(dbt-ol run)

Wesley Jin
02/22/2022, 10:08 PM
Failed to load and execute Flow's environment: ModuleNotFoundError("No module named 'constants'")
I'm importing constants that are shared across many flows (the project name, ECS cluster name, GitHub token secret name) from a constants.py file in the same directory. Running the flow locally with flow.run() works just fine as expected, but when running it in ECS, constants is not found. How do I register other module dependencies with GitHub storage, if possible? Or do I have to add these constant values to each of my flow files? Thank you!
Example below:
. (repo root)
|____flows
| |____hello.py
| |____constants.py
| |______init__.py
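GitHub storage pulls only the single flow file at runtime, which is why sibling imports break in ECS. A sketch of one commonly suggested workaround, assuming a switch to Docker storage is an option (the paths here are illustrative):

from prefect.storage import Docker

flow.storage = Docker(
    # bake the shared module into the image...
    files={"/repo/flows/constants.py": "/modules/constants.py"},
    # ...and make it importable at run time
    env_vars={"PYTHONPATH": "$PYTHONPATH:/modules"},
)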
Jack Chang
02/22/2022, 10:17 PM

Tuoyi Zhao
02/22/2022, 10:48 PM

William Grim
02/22/2022, 10:52 PM
With DbtShellTask, I'm thinking it would be nice if a flow we register and store on S3 also came with its dbt project directory.
Currently, I don't think it's possible, so I will store the "packaged files" on S3, but I also don't want to start that work if there's already a solution that I haven't seen. 🙂

Brian Phillips
02/22/2022, 11:49 PM

Andreas
02/23/2022, 9:46 AM

Vipul
02/23/2022, 10:24 AM

Jean-Baptiste Six
02/23/2022, 10:58 AM
from prefect import Flow, Parameter, unmapped
from prefect.engine.results import GCSResult
from prefect.tasks.prefect import create_flow_run

with Flow("sec daily", state_handlers=[flow_handler], result=GCSResult(BUCKET_RESULTS)) as sec_daily_flow:
    root_dir = init_dirs("sec", archive=False)
    download_dir = download(root_dir, False)
    metadatas = metadata(download_dir)
    upload(metadatas)
    result_locations = split_store_batches(metadatas)
    mapped_flow_run_ids = create_flow_run.map(
        flow_name=unmapped("index_flow"),
        project_name=unmapped("Document Pipeline"),
        parameters=result_locations,
    )
With a "subflow":
with Flow("index_flow", result=GCSResult(BUCKET_RESULTS), state_handlers=[flow_handler]) as index_flow:
# Imports
metadatas_batch_location = Parameter("metadatas_batch_location", required=True)
metadatas_batch = get_result(metadatas_batch_location)
imports = build_import(metadatas_batch)
index_weaviate(imports)
But I get the following error:
prefect.exceptions.ClientError: [{'path': ['create_flow_run'], 'message': 'dictionary update sequence element #0 has length 1; 2 is required', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
I'm not sure I understand it 🤔 I need some help please 🙏
Same error for both 'batches':
• Task 'create_flow_run[0]': Exception encountered during task execution!
• Task 'create_flow_run[1]': Exception encountered during task execution!
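That error usually means something non-dict was fed to a dict(...) conversion, which points at parameters: create_flow_run expects a dict of parameters per run, so when mapping, each mapped element must itself be a dict. A sketch of one likely fix, assuming result_locations is a list of location strings (the to_params helper is hypothetical):

from prefect import task

@task
def to_params(location: str) -> dict:
    return {"metadatas_batch_location": location}

mapped_flow_run_ids = create_flow_run.map(
    flow_name=unmapped("index_flow"),
    project_name=unmapped("Document Pipeline"),
    parameters=to_params.map(result_locations),
)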
Italo Barros
02/23/2022, 12:19 PM
requests.exceptions.SSLError: HTTPSConnectionPool(host='api.prefect.io', port=443): Max retries exceeded with url: / (Caused by SSLError(SSLCertVerificationError(1, '[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1129)')))