https://prefect.io logo
Docs
Join the conversationJoin Slack
Channels
announcements
ask-marvin
best-practices-coordination-plane
data-ecosystem
data-tricks-and-tips
events
find-a-prefect-job
geo-australia
geo-bay-area
geo-berlin
geo-boston
geo-chicago
geo-colorado
geo-dc
geo-israel
geo-japan
geo-london
geo-nyc
geo-seattle
geo-texas
gratitude
introductions
marvin-in-the-wild
prefect-ai
prefect-aws
prefect-azure
prefect-cloud
prefect-community
prefect-contributors
prefect-dbt
prefect-docker
prefect-gcp
prefect-getting-started
prefect-integrations
prefect-kubernetes
prefect-recipes
prefect-server
prefect-ui
random
show-us-what-you-got
Powered by Linen
prefect-community
  • t

    Tuoyi Zhao

    02/22/2022, 10:48 PM
    Hi everyone, is there a way I can use FileHandler or other means to write log to a log file when I use Prefect Cloud + Local Agent? The FileHandler works when I run locally with flow.run() but it writes nothing if I register the flow and run it on Prefect Cloud.
    k
    7 replies · 2 participants
  • w

    William Grim

    02/22/2022, 10:52 PM
    Hey all! Has anyone here needed to "package" regular files with their flows? We can, of course, upload "file packages" somewhere else and download them each time a flow runs, but specifically for the
    DbtShellTask
    , I'm thinking it would be nice if a flow we register and store on s3 also came with its dbt project directory. Currently, I don't think it's possible and will store the "packaged files" on s3, but I also don't want to start that work if there's already a solution that I haven't seen. 😄
    k
    m
    10 replies · 3 participants
  • b

    Brian Phillips

    02/22/2022, 11:49 PM
    I'm trying to implement flat-mapping in a flow, but the final task doesn't seem to run the correct number of times. Code in thread.
    k
    11 replies · 2 participants
  • a

    Andreas

    02/23/2022, 9:46 AM
    Hi all! Does anyone have experience with controlling BigQuery reservations (Flex slots) from Prefect? Any best practices to share? I'm especially wondering how to ensure slots are canceled after a run. My basic use case would be to lower our BQ costs on our dbt runs: 1. Purchase flex slots 2. Run dbt models 3. Cancel flex slots Thanks for your input!
    a
    2 replies · 2 participants
  • v

    Vipul

    02/23/2022, 10:24 AM
    Quick Question, I have recently seen an issue where the Agent does not pick up the triggered flows. The Prefect Server and Prefect Agent are both running & the labels on the Flow versus the Agent match, so that is not the issue. Is there anything obvious that I need to check? Appreciate the help. I am running on version 0.14.8
    a
    8 replies · 2 participants
  • j

    Jean-Baptiste Six

    02/23/2022, 10:58 AM
    Hi ! I have the following flow :
    with Flow("sec daily", state_handlers=[flow_handler], result=GCSResult(BUCKET_RESULTS)) as sec_daily_flow:
        root_dir = init_dirs("sec", archive=False)
        download_dir = download(root_dir, False)
        metadatas = metadata(download_dir)
        upload(metadatas)
        result_locations = split_store_batches(metadatas)
        mapped_flow_run_ids = create_flow_run.map(
            flow_name=unmapped("index_flow"),
            project_name=unmapped("Document Pipeline"),
            parameters=result_locations
        )
    With a "subflow":
    with Flow("index_flow", result=GCSResult(BUCKET_RESULTS), state_handlers=[flow_handler]) as index_flow:
        # Imports
        metadatas_batch_location = Parameter("metadatas_batch_location", required=True)
        metadatas_batch = get_result(metadatas_batch_location)
        imports = build_import(metadatas_batch)
        index_weaviate(imports)
    But I have the following error :
    prefect.exceptions.ClientError: [{'path': ['create_flow_run'], 'message': 'dictionary update sequence element #0 has length 1; 2 is required', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
    That I'm not sure to understand 😕 I need some help please 🙏 Same error for both 'batches': • Task 'create_flow_run[0]': Exception encountered during task execution! • Task 'create_flow_run[1]': Exception encountered during task execution!
    a
    6 replies · 2 participants
  • i

    Italo Barros

    02/23/2022, 12:19 PM
    Hello everyone, does someone knows how to disable the SSL Verification when using prefect? Is there's something like the "*--set ssl_verify True*"? I'm facing some problems with auth and running the agent due to a corporate network. The following error occurs when I use the "*prefect auth login --key*":
    requests.exceptions.SSLError: HTTPSConnectionPool(host='<http://api.prefect.io|api.prefect.io>', port=443): Max retries exceeded with url: / (Caused by SSLError(SSLCertVerificationError(1, '[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1129)')))
    d
    a
    22 replies · 3 participants
  • m

    Matthew Seligson

    02/23/2022, 2:21 PM
    What options do we have for decommissioning a flow we no longer use? Can we archive it without deleting it? Can we do any of this via the API?
    a
    8 replies · 2 participants
  • а

    Андрій Демиденко

    02/23/2022, 4:07 PM
    Hi everybody, Does anybody know how to make a one-time artifact for a flow run? Love this feature but it is a little pain to delete them)) I would really appreciate to hear your experience. Thank you in advance
    k
    5 replies · 2 participants
  • a

    Amar Eid

    02/23/2022, 4:09 PM
    Hello everyone! I am very new to Prefect and have already started to build my new Flow.🙌 When trying to work with Prefect Secrets locally, using the following type of Secrets syntax in my local flow should run as well :
    from prefect.tasks.secrets import PrefectSecret
    
    @task
    def my_task(secret_credentials):
        return secret_credentials
    
    with Flow('test') as flow:
        credentials = PrefectSecret('NAME_OF_SECRET')
        my_task(credentials)
    According to this documentation https://docs.prefect.io/orchestration/concepts/secrets.html#setting-local-secrets to be able to set these local secrets I have to 1. Create a custom config file by running:
    cd ~/.prefect/ && touch config.toml
    2. Add the local secrets into this file But the “.prefect” file doesnt exist on my machine. I have tried to uninstall and install prefect again and it doesnt generate. Any idea why this is happening and what I should do to create this file? Thanks in advance for the help! //fyi @Henrietta Salonen @Maximilian Lutz
    k
    2 replies · 2 participants
  • e

    E Li

    02/23/2022, 4:21 PM
    Hi, if I have 2 sub-flows in a main flow like below, can the flow_b get the output of flow_a as its input?
    with Flow(...) as flow:
        data = extract_some_data()
        flow_a = create_flow_run(…, parameters={'param-key':data})
        flow_b = create_flow_run(…,parameters={'param-key':output_from_flow_a})
    k
    8 replies · 2 participants
  • k

    Kevin Kho

    02/23/2022, 4:54 PM
    Join me in 5 minutes for a Meetup with the MLOps community where I will demo using Prefect with Pandera, Tune, and Evidently AI. Link
    :upvote: 1
    🙌 2
    👍 2
    🚀 1
  • m

    Martim Lobao

    02/23/2022, 4:55 PM
    is there a way i can avoid the superfluous
    List
    and
    Dict
    tasks in my DAG? this is taken from a barely modified example on prefect’s blog (code in thread)
    a
    5 replies · 2 participants
  • f

    Frederick Thomas

    02/23/2022, 5:36 PM
    Hello,
    👋 1
  • f

    Frederick Thomas

    02/23/2022, 5:36 PM
    Quick question is there a way to remove/delete a flow from a project?
    a
    2 replies · 2 participants
  • c

    chris evans

    02/23/2022, 6:22 PM
    guys need to evaluate airflow vs prefect vs dagster which one should we go for?
    m
    a
    15 replies · 3 participants
  • w

    William Grim

    02/23/2022, 6:41 PM
    Hey all! Our team is going to try and setup prefect to be more "proper". We are going to use
    DockerStorage
    , if we can, so that registering flows brings along dependencies. We also want to use
    KubernetesRun
    so that prefect can schedule jobs on k8s. For the latter, however, is
    KubernetesRun
    still the right thing to do? Someone outside this community mentioned to us about dask for scheduling instead of the "kubernetes scheduler" (his words; I'm still new to this and figuring it out). Basically, which way is the "right way"? Adding dask isn't something we're opposed to doing, but we have a lot of stuff going on and are aiming for the lowest effort path, haha. I promise we're not lazy, just overworked.
    a
    k
    25 replies · 3 participants
  • t

    Trevor Sweeney

    02/23/2022, 6:53 PM
    Hello, for SQLServerFetch I'm receiving a login failed error due to invalid connection string attribute (0). Compared code with a teammate who can successfully fetch so not entirely sure the issue.
    mssql_fetch = SQLServerFetch(db_name='db', user='user', host='host', fetch='all')
    k
    3 replies · 2 participants
  • k

    Kevin Kho

    02/23/2022, 7:33 PM
    Join us tomorrow on February 24, 2022 at 11AM PST (2PM EST) to learn how to generate end-to-end lineage graphs of your data pipelines using Monte Carlo and Prefect so you can reduce the time to detection and resolution for critical data incidents: https://www.montecarlodata.com/how-to-build-more-reliable-data-pipelines-with-monte-carlo-and-prefect/
    :upvote: 1
  • c

    chia berry

    02/23/2022, 7:46 PM
    Hey yall, I’m looking for some help with sending emails with child/parent flows. I have a setup like this: https://discourse.prefect.io/t/how-to-pass-data-from-a-child-flow-to-another-task-in-a-parent-flow/126, but with an
    EmailTask
    instead of the transform_and_show task. I want to send the data result in the body of the email. However, I am getting the message
    AttributeError: 'FunctionTask' object has no attribute 'encode'
    . If I make it a string, I get an empty email. I’d like to send the actual result and not the functiontask.
    👀 1
    k
    13 replies · 2 participants
  • c

    Chris Reuter

    02/23/2022, 7:55 PM
    Hey everyone - piling on with another livestream! Join @Dylan and myself on PrefectLive 📺 at 3p Eastern (5 minutes from now) as he writes some Prefect flows with a Twilio integration. We definitely won't be trolling his roommates! 🙂 See you there.
    👍 4
    1 reply · 1 participant
  • d

    Daniel Komisar

    02/23/2022, 8:22 PM
    Hello everyone, is there any code to help with monitoring the state of a flow from code running within the flow? I know how to do it if you have a flow run id already, but I’m not sure how to get this from within that flow. Thanks.
    k
    m
    5 replies · 3 participants
  • a

    Aqib Fayyaz

    02/23/2022, 8:51 PM
    i am trying to register flow with server deployed on gke and docker for flow storage, and getting belw mentioned error for Dockerfile
    k
    m
    +1
    46 replies · 4 participants
  • m

    Max Lei

    02/23/2022, 8:55 PM
    Hi All, if I want to build an image for dask cluster on fargate, would extending
    prefecthq/prefect
    and pip install my source code in the image be enough?
    k
    4 replies · 2 participants
  • d

    Dexter Antonio

    02/23/2022, 9:08 PM
    Can someone point me to an article on why prefect 2.0 is doing away with DAGs? I’m struggling to see how DAGs are restrictive.
    k
    3 replies · 2 participants
  • b

    Ben Muller

    02/23/2022, 10:21 PM
    Hi Community, How do I get the actual value of a parameter?
    data_update = Parameter("data_update", required=True)
    
        put_dateformatted_data_to_s3.map(
            df=data,
            bucket_name=unmapped(get_key_value("bucket")),
            key_name=unmapped("key"),
            suffix=unmapped(f"_{data_update}"),
        )
    This writes a file with a name of
    19_<Parameter: data_update>.parquet
    I want it to be the actual param provided in the flow
    k
    4 replies · 2 participants
  • i

    iñigo

    02/23/2022, 10:26 PM
    Hi there,
  • i

    iñigo

    02/23/2022, 10:27 PM
    Hi there, I'm trying to use mapping in a flow that has a merge function in the middle of the flow. How could I use map in this flow.
    k
    2 replies · 2 participants
  • j

    Jason Motley

    02/23/2022, 11:07 PM
    What's the best way to set up a flow to run for N minutes (hours) maximum?
    k
    2 replies · 2 participants
  • r

    Rio McMahon

    02/23/2022, 11:23 PM
    If I want to have an external python file (e.g. in a
    src/
    directory) what is the best way to import it? I tried following similar logic to this: https://docs.prefect.io/orchestration/flow_config/storage.html#loading-additional-files-with-git-storage but adding to the import path:
    import pathlib, sys
    file_path = pathlib.Path(__file__).resolve().parent
    sys.path.append(file_path)
    But keep getting this error:
    [23 February 2022 4:22pm]: Failed to load and execute Flow's environment: ModuleNotFoundError("No module named 'src'")
    Is there a best practice for importing external python code into a flow?
    k
    11 replies · 2 participants
Powered by Linen
Title
r

Rio McMahon

02/23/2022, 11:23 PM
If I want to have an external python file (e.g. in a
src/
directory) what is the best way to import it? I tried following similar logic to this: https://docs.prefect.io/orchestration/flow_config/storage.html#loading-additional-files-with-git-storage but adding to the import path:
import pathlib, sys
file_path = pathlib.Path(__file__).resolve().parent
sys.path.append(file_path)
But keep getting this error:
[23 February 2022 4:22pm]: Failed to load and execute Flow's environment: ModuleNotFoundError("No module named 'src'")
Is there a best practice for importing external python code into a flow?
k

Kevin Kho

02/23/2022, 11:23 PM
Are you using the Local agent?
You can specify the working dir like the last example here
r

Rio McMahon

02/23/2022, 11:26 PM
No I am using Git (preferably GitLab) for storage and ECSRun as my config. My flow looks like:
# general prefect imports
import prefect
from prefect import task, Flow
from prefect.storage import Git
from prefect.run_configs import ECSRun
from prefect.client import Secret

# specific imports to load files from src/
import pathlib, sys
file_path = pathlib.Path(__file__).resolve().parent
sys.path.append(file_path)

from src.seasonality_index_builder_dynamic_agg import run_seasonality_index_builder_dynamic_agg

# define a wrapper task to expose logging
@task(log_stdout=True, checkpoint=False)
def run_script():
    logger = prefect.context.get("logger")
    <http://logger.info|logger.info>("Running script...")
    run_seasonality_index_builder_dynamic_agg()

# instantiate the flow - we store the flow definition in gitlab
with Flow("seasonality_index_builder",
        storage=Git(
            [git info]
            ),
        run_config=ECSRun(
            [ECS stuff]
            )
         ) as flow:
    run_script()

# Register the flow under the "tutorial" project
flow.register(project_name="Testing",
        labels=['ds']
        )
k

Kevin Kho

02/23/2022, 11:28 PM
Ah yeah in this case it really needs to go into the container for ECSRun. Git storage is not intended to handle other Python files, just stuff like sql and yaml. The Path manipulation is pretty hard and might be impossible. Of course, if you find a solution please share so we can archive.
r

Rio McMahon

02/23/2022, 11:47 PM
Okay - is something like this pretty typical then:
COPY src /home/mambauser/src
in the dockerfile, then
import pathlib, sys, os
sys.path.append(pathlib.Path(os.environ["HOME"]).resolve())
in the flow. In this case
os.environ["HOME"]
should resolve to
/home/mambauser
k

Kevin Kho

02/23/2022, 11:48 PM
Not really because at this point you may as well install that
src
as a Python package so it’s accessible wherever the Flow runs. Are you familiar with how to do that?
r

Rio McMahon

02/23/2022, 11:50 PM
As in break out the contents of
src
into a module then install via pip within my docker container?
k

Kevin Kho

02/23/2022, 11:51 PM
Yes, but I don’t know what you mean by “break out”. I think you just need to provide a
setup.py
?
If ever it helps you, here is a blog for that
r

Rio McMahon

02/23/2022, 11:54 PM
Poor phrasing on my part - that blog post will be a good starting point. Thanks a ton for the quick response and feedback. Really enjoying prefect so far!
👍 1
k

Kevin Kho

02/23/2022, 11:58 PM
Of course! 🙂
View count: 3