Tuoyi Zhao
02/22/2022, 10:48 PM
William Grim
02/22/2022, 10:52 PM
DbtShellTask, I'm thinking it would be nice if a flow we register and store on S3 also came with its dbt project directory.
Currently, I don't think it's possible, so I will store the "packaged files" on S3 myself, but I also don't want to start that work if there's already a solution that I haven't seen. 😄
Brian Phillips
02/22/2022, 11:49 PM
Andreas
02/23/2022, 9:46 AM
Vipul
02/23/2022, 10:24 AM
Jean-Baptiste Six
02/23/2022, 10:58 AM
with Flow("sec daily", state_handlers=[flow_handler], result=GCSResult(BUCKET_RESULTS)) as sec_daily_flow:
    root_dir = init_dirs("sec", archive=False)
    download_dir = download(root_dir, False)
    metadatas = metadata(download_dir)
    upload(metadatas)
    result_locations = split_store_batches(metadatas)
    mapped_flow_run_ids = create_flow_run.map(
        flow_name=unmapped("index_flow"),
        project_name=unmapped("Document Pipeline"),
        parameters=result_locations
    )
With a "subflow":
with Flow("index_flow", result=GCSResult(BUCKET_RESULTS), state_handlers=[flow_handler]) as index_flow:
    # Imports
    metadatas_batch_location = Parameter("metadatas_batch_location", required=True)
    metadatas_batch = get_result(metadatas_batch_location)
    imports = build_import(metadatas_batch)
    index_weaviate(imports)
But I get the following error:
prefect.exceptions.ClientError: [{'path': ['create_flow_run'], 'message': 'dictionary update sequence element #0 has length 1; 2 is required', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
I'm not sure I understand it 😕 I need some help please 🙏
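A note on the error text: Python raises this exact message when `dict()` is called on a sequence whose items are not key/value pairs, for example a plain string. One possible reading, assuming `split_store_batches` returns a list of location strings (the thread does not show it), is that each mapped `parameters` value is a string where a dictionary is expected. The sketch below uses only the standard library to reproduce the message and to show one way of wrapping each location in a dictionary keyed by the subflow's parameter name. The `locations` values and the `dict_error` helper are made up for illustration.

```python
# Hypothetical batch locations; in the flow these would come from split_store_batches.
locations = ["batch_0.json", "batch_1.json"]


def dict_error(value):
    """Return the error message raised by dict(value), or None if it succeeds."""
    try:
        dict(value)
    except (TypeError, ValueError) as exc:
        return str(exc)
    return None


# A bare string where a dict is expected reproduces the message from the traceback.
message = dict_error(locations[0])

# One parameters dict per child flow run, keyed by the subflow's Parameter name.
parameter_dicts = [{"metadatas_batch_location": loc} for loc in locations]

print(message)
print(parameter_dicts)
```

If that reading is right, having the upstream task return a list shaped like `parameter_dicts` and mapping over it would give each `create_flow_run` call a proper `parameters` dictionary.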
Same error for both 'batches':
• Task 'create_flow_run[0]': Exception encountered during task execution!
• Task 'create_flow_run[1]': Exception encountered during task execution!
Italo Barros
02/23/2022, 12:19 PM
requests.exceptions.SSLError: HTTPSConnectionPool(host='api.prefect.io', port=443): Max retries exceeded with url: / (Caused by SSLError(SSLCertVerificationError(1, '[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1129)')))
Matthew Seligson
02/23/2022, 2:21 PM
Андрій Демиденко
02/23/2022, 4:07 PM
Amar Eid
02/23/2022, 4:09 PM
from prefect import Flow, task
from prefect.tasks.secrets import PrefectSecret

@task
def my_task(secret_credentials):
    return secret_credentials

with Flow('test') as flow:
    credentials = PrefectSecret('NAME_OF_SECRET')
    my_task(credentials)
According to this documentation, https://docs.prefect.io/orchestration/concepts/secrets.html#setting-local-secrets, to be able to set these local secrets I have to:
1. Create a custom config file by running:
cd ~/.prefect/ && touch config.toml
2. Add the local secrets into this file
But the “.prefect” directory doesn't exist on my machine. I have tried uninstalling and reinstalling Prefect, and it still isn't generated. Any idea why this is happening and what I should do to create it?
Thanks in advance for the help! //fyi @Henrietta Salonen @Maximilian Lutz
E Li
02/23/2022, 4:21 PM
with Flow(...) as flow:
    data = extract_some_data()
    flow_a = create_flow_run(…, parameters={'param-key': data})
    flow_b = create_flow_run(…, parameters={'param-key': output_from_flow_a})
Kevin Kho
02/23/2022, 4:54 PM
Martim Lobao
02/23/2022, 4:55 PM
List and Dict tasks in my DAG? This is taken from a barely modified example on Prefect’s blog (code in thread)
Frederick Thomas
02/23/2022, 5:36 PM
Frederick Thomas
02/23/2022, 5:36 PM
chris evans
02/23/2022, 6:22 PM
William Grim
02/23/2022, 6:41 PM
DockerStorage, if we can, so that registering flows brings along dependencies. We also want to use KubernetesRun so that Prefect can schedule jobs on k8s. For the latter, however, is KubernetesRun still the right thing to do? Someone outside this community mentioned using Dask for scheduling instead of the "kubernetes scheduler" (his words; I'm still new to this and figuring it out).
Basically, which way is the "right way"? Adding Dask isn't something we're opposed to doing, but we have a lot of stuff going on and are aiming for the lowest-effort path, haha. I promise we're not lazy, just overworked.
Trevor Sweeney
02/23/2022, 6:53 PM
mssql_fetch = SQLServerFetch(db_name='db', user='user', host='host', fetch='all')
Kevin Kho
02/23/2022, 7:33 PM
chia berry
02/23/2022, 7:46 PM
EmailTask instead of the transform_and_show task. I want to send the data result in the body of the email. However, I am getting the message AttributeError: 'FunctionTask' object has no attribute 'encode'. If I make it a string, I get an empty email. I’d like to send the actual result and not the FunctionTask.
Chris Reuter
02/23/2022, 7:55 PM
Daniel Komisar
02/23/2022, 8:22 PM
Aqib Fayyaz
02/23/2022, 8:51 PM
Max Lei
02/23/2022, 8:55 PM
prefecthq/prefect and pip install my source code in the image be enough?
Dexter Antonio
02/23/2022, 9:08 PM
Ben Muller
02/23/2022, 10:21 PM
data_update = Parameter("data_update", required=True)
put_dateformatted_data_to_s3.map(
    df=data,
    bucket_name=unmapped(get_key_value("bucket")),
    key_name=unmapped("key"),
    suffix=unmapped(f"_{data_update}"),
)
This writes a file with a name of 19_<Parameter: data_update>.parquet
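The `<Parameter: data_update>` in the file name suggests the f-string was formatted while the flow was being built, when `data_update` is still a placeholder object rather than a value. The sketch below imitates that with a stand-in class (`FakeParameter` and `make_suffix` are hypothetical and are not part of Prefect); it only illustrates build-time versus run-time formatting. In Prefect 1.x the usual way to get the run-time behaviour is to do the formatting inside a task that receives the parameter as an argument.

```python
class FakeParameter:
    """Stand-in for a deferred flow parameter; NOT Prefect's Parameter class."""

    def __init__(self, name):
        self.name = name

    def __repr__(self):
        return f"<Parameter: {self.name}>"

    def resolve(self, runtime_values):
        # Look up the concrete value supplied for this run.
        return runtime_values[self.name]


def make_suffix(value):
    # In a real flow this body would live inside a task, so it runs
    # only once the parameter has a concrete value.
    return f"_{value}"


data_update = FakeParameter("data_update")

# Formatting while the flow is being defined captures the placeholder's repr.
suffix_at_build_time = f"_{data_update}"

# Formatting after the value is known yields the intended suffix.
suffix_at_run_time = make_suffix(data_update.resolve({"data_update": "2022-02-23"}))

print(suffix_at_build_time)
print(suffix_at_run_time)
```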
I want it to be the actual parameter value provided to the flow.
iñigo
02/23/2022, 10:26 PM
iñigo
02/23/2022, 10:27 PM
Jason Motley
02/23/2022, 11:07 PM
Rio McMahon
02/23/2022, 11:23 PM
src/ directory) what is the best way to import it? I tried following similar logic to this: https://docs.prefect.io/orchestration/flow_config/storage.html#loading-additional-files-with-git-storage but adding to the import path:
import pathlib, sys
file_path = pathlib.Path(__file__).resolve().parent
sys.path.append(file_path)
But keep getting this error:
[23 February 2022 4:22pm]: Failed to load and execute Flow's environment: ModuleNotFoundError("No module named 'src'")
Is there a best practice for importing external python code into a flow?
Kevin Kho
02/23/2022, 11:23 PM
Rio McMahon
02/23/2022, 11:26 PM
# general prefect imports
import prefect
from prefect import task, Flow
from prefect.storage import Git
from prefect.run_configs import ECSRun
from prefect.client import Secret

# specific imports to load files from src/
import pathlib, sys
file_path = pathlib.Path(__file__).resolve().parent
sys.path.append(file_path)
from src.seasonality_index_builder_dynamic_agg import run_seasonality_index_builder_dynamic_agg

# define a wrapper task to expose logging
@task(log_stdout=True, checkpoint=False)
def run_script():
    logger = prefect.context.get("logger")
    logger.info("Running script...")
    run_seasonality_index_builder_dynamic_agg()

# instantiate the flow - we store the flow definition in gitlab
with Flow("seasonality_index_builder",
          storage=Git(
              [git info]
          ),
          run_config=ECSRun(
              [ECS stuff]
          )
          ) as flow:
    run_script()

# Register the flow under the "tutorial" project
flow.register(project_name="Testing",
              labels=['ds']
              )
Kevin Kho
02/23/2022, 11:28 PM
Rio McMahon
02/23/2022, 11:47 PM
COPY src /home/mambauser/src
in the dockerfile, then
import pathlib, sys, os
sys.path.append(pathlib.Path(os.environ["HOME"]).resolve())
in the flow. In this case os.environ["HOME"] should resolve to /home/mambauser
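One detail worth checking in the `sys.path` snippets: the Python documentation says that only strings should be added to `sys.path` and that other types are ignored during import, so appending a `pathlib.Path` object directly may have no effect. A minimal sketch of the string form, using a throwaway package in a temporary directory (`demo_src_pkg` and `import_from_dir` are made-up names for illustration, not anything from the thread):

```python
import importlib
import pathlib
import sys
import tempfile


def import_from_dir(root: pathlib.Path, module_name: str):
    """Import module_name after putting root on sys.path as a string."""
    root_str = str(root.resolve())  # convert the Path to str before appending
    if root_str not in sys.path:
        sys.path.append(root_str)
    importlib.invalidate_caches()  # make sure freshly written files are seen
    return importlib.import_module(module_name)


# Hypothetical layout: <tmp_root>/demo_src_pkg/job.py
tmp_root = pathlib.Path(tempfile.mkdtemp())
pkg_dir = tmp_root / "demo_src_pkg"
pkg_dir.mkdir()
(pkg_dir / "__init__.py").write_text("")
(pkg_dir / "job.py").write_text("def run():\n    return 'ran'\n")

job = import_from_dir(tmp_root, "demo_src_pkg.job")
result = job.run()
print(result)
```

This only addresses the path entry type. Whether the `src` directory is actually present where the flow runs is a separate question.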
Kevin Kho
02/23/2022, 11:48 PM
src as a Python package so it’s accessible wherever the Flow runs. Are you familiar with how to do that?
Rio McMahon
02/23/2022, 11:50 PM
src into a module then install via pip within my docker container?
Kevin Kho
02/23/2022, 11:51 PM
setup.py?
Rio McMahon
02/23/2022, 11:54 PM
Kevin Kho
02/23/2022, 11:58 PM