prefect-community
  • a

    Anh Nguyen

    01/13/2022, 12:59 AM
    I defined a Flow that starts at multiple start times, and at each specified time I want to run a specific task. How can I do that? Thanks.
    k
    • 2
    • 5
  • a

    Anh Nguyen

    01/13/2022, 5:37 AM
    I have a problem with a task that reads files from an FTP server and then inserts the data into a database. That's the exception: Please suggest a solution.
    k
    • 2
    • 2
  • a

    Aqib Fayyaz

    01/13/2022, 7:53 AM
    Hi, I want to use Docker Compose for Prefect like we do for our other services. Is there a way to store everything, such as custom modules and flows, in a Docker image, so that when I run Docker Compose I can access and run the flow using Prefect Cloud?
    a
    • 2
    • 12
  • j

    Jamie McDonald

    01/13/2022, 11:40 AM
    Is there an elegant way to control the level of concurrency using the
    map()
    functionality? My scenario is a list of URLs to send requests to, but I want to perform the requests in batches of 'n' rather than overwhelming the server with them all at once.
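A minimal sketch of the batching idea in the question above, using only the Python standard library rather than any Prefect feature. The helper names `batched` and `run_in_batches` are hypothetical, and `func` stands in for whatever makes the request; each batch finishes before the next one starts, so at most `n` calls are in flight at a time.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterator, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def batched(items: List[T], n: int) -> Iterator[List[T]]:
    """Yield consecutive slices of `items`, each holding at most `n` elements."""
    if n < 1:
        raise ValueError("batch size must be at least 1")
    for start in range(0, len(items), n):
        yield items[start:start + n]


def run_in_batches(func: Callable[[T], R], items: List[T], n: int) -> List[R]:
    """Apply `func` to every item, running at most `n` calls concurrently.

    Results come back in the same order as `items`.
    """
    results: List[R] = []
    for batch in batched(items, n):
        # The pool is closed at the end of each batch, so the next batch
        # only starts once every call in the current one has returned.
        with ThreadPoolExecutor(max_workers=n) as pool:
            results.extend(pool.map(func, batch))
    return results
```

Inside a flow, the same chunking could be done in an upstream task so that each mapped child receives one batch of URLs; whether that fits depends on the executor in use.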
    a
    • 2
    • 2
  • m

    Martim Lobao

    01/13/2022, 11:49 AM
    we’ve got a large DAG with the same task repeated in several places using different inputs (do X for A, do X for B, etc), and it can be hard to quickly understand which task failed or even just understand the structure of the DAG itself. I tried using both the
    task_run_name
    and
    name
    args in the
    task
    decorator to provide more context, but neither works the way I’d like it to. as a MWE, here’s a sketch of what I’d like to happen:
    with Flow("example") as flow:
        task_a(entity="foo")  # shows up as "foo_task" in the DAG
        task_a(entity="bar")  # shows up as "bar_task" in the DAG
    the issue is that
    name
    only takes in static strings (so
    name="{entity}"
    doesn’t work) and
    task_run_name
    only sets the task run name, meaning it will never show up in the schematic outside of flow runs (even in flow runs, the name is only shown when clicking on each individual task card, making it hard to see an overall picture). is there any way to achieve what i’d like to do?
    a
    • 2
    • 4
  • d

    davzucky

    01/13/2022, 12:32 PM
    Can we write a task that returns the list of files/folders from the result storage that Prefect is using? The idea is that if the flow is set up with S3 storage, this would do an S3 list_objects call, and with local storage it would do a recursive scan. The task could take a prefix to support filtering. Pseudocode could look like this:
    @task
    def list_file_from_storate(prefix: str) -> List[str]:
        result = prefect.context["result"]
        return result.list_files(prefix)
    Looking at the context doc, I cannot see the result being available there: https://docs.prefect.io/api/latest/utilities/context.html What we want is to be able to interact with the result, as configured in the flow setup, from within a task.
    a
    • 2
    • 11
  • a

    Ahmed Rafik

    01/13/2022, 12:39 PM
    Hello guys, I'm seeing weird behaviour while using Prefect Orion deployments with the UI. I have this deployment file:
    import pandas as pd
    from prefect.deployments import DeploymentSpec
    from prefect.orion.schemas.schedules import IntervalSchedule
    
    from pred.predict import predict_flow
    
    DeploymentSpec(
        flow=predict_flow,
        name="Prediction"
    )
    I create the deployment without a problem and I can see it in the UI. I also can run it successfully from CLI using:
    prefect deployment execute 'Prediction flow/Prediction'
    But when I try to run it using the “Quick Run” in the UI, a “Scheduled” task is created and never runs. I can see it in the lateness graph. One task didn’t run for over an hour during my lunch break. The same happens if I add a schedule to the deployment. Any ideas why that happens or how to fix it?
    a
    • 2
    • 10
  • a

    Andrew Hah

    01/13/2022, 2:27 PM
    Hi all, If I have a flow with, say, 10 tasks that execute sequentially, is there an easy way to start that flow at an arbitrary task (assuming the tasks before it have been completed)?
    a
    k
    • 3
    • 4
  • e

    Emma Rizzi

    01/13/2022, 3:30 PM
    Hi! Do you have any information on the best practice to manage agents? For now, I've been launching them manually on my VMs with commands like
    prefect agent docker start ... &
    and killing them manually with Linux signals. Is there a recommended way to do this?
    a
    • 2
    • 5
  • b

    brian

    01/13/2022, 3:48 PM
    Hi prefecters! I’m having an issue where tasks are getting killed by the ZombieKiller after a short period of time, 30m on yesterday’s flow run.
    a
    • 2
    • 8
  • y

    Yusuf Khan

    01/13/2022, 4:12 PM
    Does anyone have any suggestions for something equivalent to supervisord for Windows servers/desktops?
    a
    • 2
    • 1
  • p

    Pablo Espeso

    01/13/2022, 5:42 PM
    Hi! I've been here for a few days and I'd like to say that this community is incredible. I wish other products had this answer rate, this level of detail and these "official" responses. I don't want to ask anything, not yet; I just want to thank everyone for your kind work.
    :thank-you: 4
    🙌 4
    c
    • 2
    • 1
  • h

    Hwi Moon

    01/13/2022, 6:10 PM
    Hi, I can’t use the UI to edit README files… is it just me?
    n
    • 2
    • 1
  • b

    Brett Naul

    01/13/2022, 6:15 PM
    re the new orion FlowRunner class, is a KubernetesFlowRunner coming soon or could that still be a ways off?
    k
    m
    • 3
    • 3
  • h

    Hwi Moon

    01/13/2022, 6:25 PM
    Another Q… is there a way I can assign a user with an ability of using the Quick Run only and nothing else?
    k
    • 2
    • 1
  • m

    Marwan Sarieddine

    01/13/2022, 7:44 PM
    Hi folks, question about the
    ResourceManager
    and using
    set_upstream
    . How can I ensure that a task runs after the resource manager cleanup? (Please see the thread for more details)
    k
    • 2
    • 6
  • j

    Jason Motley

    01/13/2022, 10:28 PM
    When using a
    case
    statement, can one rename it so the flow schematic is clearer? Example below, as opposed to the photo
    with case(condition, True, task_args={"name": "bla bla bla"}):
    k
    • 2
    • 3
  • j

    Justin Grosvenor

    01/13/2022, 10:39 PM
    Is it possible to use GitHub storage for a codebase that spans multiple files with the Kubernetes agent? I'm getting ModuleNotFoundError errors when I try to run flows from Prefect Cloud. I have tried a few variations of the file structure, but I was unable to shell into the pod to verify that all the files are being fetched from GitHub, as it terminates right away. Do I need to use Docker storage, or is this probably an issue with my settings?
    k
    • 2
    • 3
  • l

    Ling Chen

    01/13/2022, 11:26 PM
    Hello, is there a way in python to get flow id by its name and project?
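One way to approach the lookup asked about above is a GraphQL query against the Prefect backend. The sketch below only builds the query string, so it runs without a server. The filter fields (`name`, `project.name`, `archived`, `version`) are assumptions about the Prefect 1.x GraphQL schema, and `build_flow_id_query` is a hypothetical helper, not a Prefect function.

```python
import json


def build_flow_id_query(flow_name: str, project_name: str) -> str:
    """Build a GraphQL query asking for the id of the newest matching flow.

    json.dumps is used only to quote and escape the two names as string
    literals; the field names are assumed, not taken from the thread.
    """
    where = (
        "{name: {_eq: %s}, project: {name: {_eq: %s}}, archived: {_eq: false}}"
        % (json.dumps(flow_name), json.dumps(project_name))
    )
    return (
        "query { flow(where: %s, order_by: {version: desc}, limit: 1) { id } }"
        % where
    )


query = build_flow_id_query("test_flow", "test_project")
# With Prefect 1.x installed and authenticated, the string could be sent
# with prefect.Client().graphql(query); the response shape is not shown here.
```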
    k
    m
    • 3
    • 15
  • t

    Trevor Sweeney

    01/14/2022, 12:35 AM
    Hello, does anyone know why the PostgresFetch task returns a json string's key value pairs with single quotes instead of double quotes?
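A possible explanation, offered as an assumption since the message does not confirm it: if the database driver has already decoded the JSON column into a Python dict, then printing it or calling `str()` on it shows Python's own representation, which uses single quotes. The sketch below shows the difference with a made-up `row` value; `json.dumps` turns the dict back into valid JSON text.

```python
import json

# Hypothetical value, standing in for a JSON column already decoded to a dict.
row = {"status": "ok", "count": 3}

# str() gives Python's repr of the dict: single quotes, not valid JSON.
as_repr = str(row)

# json.dumps() serializes the same dict as JSON text with double quotes.
as_json = json.dumps(row)

print(as_repr)
print(as_json)
```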
    k
    • 2
    • 3
  • w

    William Grim

    01/14/2022, 2:31 AM
    Hey all! It seems with the new
    prefecthq/server:latest
    that came out yesterday, things like
    graphql
    just go into crash loops in docker/kubernetes. I had to pin the version to
    2021.11.30
    to resolve crashing issues. We also use
    hasura/graphql-engine:v1.3.0
    , and it seems like that's been updated quite a few versions. Would upgrading that resolve the issues, you think? I'm not sure what major things have changed in hasura though.
    m
    • 2
    • 1
  • l

    Ling Chen

    01/14/2022, 5:36 AM
    It seems that I can call
    create_flow_run
    in normal python console but not in a python Flask app? The following minimal example will raise
    AttributeError: 'Context' object has no attribute 'logger'
    error. Any ideas?
    from flask import Flask
    from prefect.tasks.prefect import create_flow_run
    
    app = Flask(__name__)
    
    @app.route("/")
    def hello_world():
        create_flow_run.run(flow_name="test_flow", 
                            project_name="test_project")
        return "<p>Create Prefect Flow Run From Flask!</p>"
    a
    • 2
    • 2
  • a

    andres aava

    01/14/2022, 6:40 AM
    Hi all! I'm having a bit of trouble with PrefectSecret when running a task.
    1. I added a JSON secret containing ['api_key'] in Prefect Cloud > Team > Secrets.
    2. I created my custom Task class called 'saveChartmogulActivities':
    class saveChartmogulActivities(Task):
        def __init__(self, apikey, **kwargs):
            self.apikey = apikey
            self.endpoint = ENDPOINT_CHARTMOGUL_ACTIVITIES_EXPORT
            self.tmp_file_path = TMP_FILE_PATH
            super().__init__(**kwargs)
    
        def run(self):
            return self.apikey
    apikey seems to be passed as a Task, not as the actual value. The flow looks like this. It seems to be something with my custom Task class, but I could not figure it out or quickly find an answer in the community here. Could anybody tell just by looking at it? :)
    with Flow('chartmogul-extract', run_config=RUN_CONFIG) as flow:
    
        # Reads Secrets from Prefect Cloud > Team > Secrets
        chartmogul_secrets = PrefectSecret("CHARTMOGUL")
        chartmogul_apikey = chartmogul_secrets['api_key']
    
        # Extract
        cm_activities_task = saveChartmogulActivities(chartmogul_apikey, log_stdout=True)
        cm_activities_run = cm_activities_task()
    However, when I pass apikey as a hardcoded string value, it works.
    e
    • 2
    • 4
  • a

    Arcondo Dasilva

    01/14/2022, 9:04 AM
    Hi, I'm trying to test Prefect Orion on Windows 10, but it doesn't seem to work because orion.db is not created in the .prefect folder. I heard that, for now, Orion is not planned for the Windows platform! My question is: does anyone have a working workaround to get this orion.db database? It's annoying that the Prefect guys have excluded the Windows platform from their Orion agenda.
    a
    • 2
    • 4
  • e

    Emil N.

    01/14/2022, 10:02 AM
    Hi! Any clue why the flow parameters won’t update on the scheduled runs after applying it on the main flow? I tried turning the Schedule off and on again, to create new runs and yet the parameters don’t change.
    a
    • 2
    • 15
  • t

    Tony Waddle

    01/14/2022, 11:17 AM
    Hi! Is there any guidance on best practice for structuring a Python project of Prefect flows? For example, one project per flow, with common methods in private repos? Or one mega project with separate folders/scripts for flows, tasks and methods? Edit: these are extreme examples; I just wonder if there is any reading anywhere on this topic.
    a
    • 2
    • 2
  • а

    Андрій Демиденко

    01/14/2022, 1:06 PM
    Hi everybody! I'm stuck on a problem where the flow state result differs between a local run and a run via Prefect Cloud. In my project I need access to the states of a flow's finished tasks, so I've implemented a state handler for the flow where I try to catch this result. When I run it locally (via flow.run()), it gives me the result I expected. But when I run it via Prefect Cloud, the result is empty (the following pictures are: my state_handler, output for flow.run(), output in Prefect Cloud). I am not sure if it's a bug, but I do want to be able to read the state result in the Cloud. Thank you in advance.
    a
    • 2
    • 7
  • b

    Blake Enyart

    01/14/2022, 1:58 PM
    Hey all, I'm using Prefect Cloud to orchestrate a flow of flows and today I started getting failures on the AirbyteConnectionTask with the following stack trace:
    Task 'AirbyteConnectionTask[4]': Exception encountered during task execution!
    Traceback (most recent call last):
      File "/home/ec2-user/.local/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
        logger=self.logger,
      File "/home/ec2-user/.local/lib/python3.7/site-packages/prefect/utilities/executors.py", line 454, in run_task_with_timeout
        return task.run(*args, **kwargs)  # type: ignore
      File "/home/ec2-user/.local/lib/python3.7/site-packages/prefect/utilities/tasks.py", line 456, in method
        return run_method(self, *args, **kwargs)
      File "/home/ec2-user/.local/lib/python3.7/site-packages/prefect/tasks/airbyte/airbyte.py", line 250, in run
        self._check_health_status(session, airbyte_base_url)
      File "/home/ec2-user/.local/lib/python3.7/site-packages/prefect/tasks/airbyte/airbyte.py", line 78, in _check_health_status
        health_status = response.json()["db"]
    KeyError: 'db'
    I'm using it within a mapped flow. Has anyone run into this issue yet?
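The traceback above ends in `response.json()["db"]`, which raises `KeyError` as soon as the health payload lacks that key. As a hedged illustration only, here is a defensive way to read such a payload. The helper `is_airbyte_healthy` is hypothetical, and the alternative key name `available` is an assumption that the thread does not confirm.

```python
from typing import Any, Dict, Tuple


def is_airbyte_healthy(
    payload: Dict[str, Any],
    keys: Tuple[str, ...] = ("available", "db"),
) -> bool:
    """Return the truthiness of the first known health key found in `payload`.

    "db" is the key the traceback shows being read; "available" is an
    assumed alternative. A payload with neither key counts as unhealthy
    instead of raising KeyError.
    """
    for key in keys:
        if key in payload:
            return bool(payload[key])
    return False
```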
    c
    a
    • 3
    • 17
  • p

    Patrick Alves

    01/14/2022, 2:01 PM
    Hi there, I am struggling to run a flow in a Docker container. I have set up a Prefect Server and a DockerAgent. Then I try to run this flow:
    import prefect
    from prefect import Flow, task
    from prefect.run_configs import DockerRun
    from prefect.storage import Docker
    
    logger = prefect.context.get("logger")
    
    @task
    def task01():
        logger.info("Task 01")
    
    @task
    def task02():
        logger.info("Task 02")
    
    @task
    def task03():
        logger.info("Task 03")
    
    
    with Flow("Check Computers",
            storage=Docker(dockerfile="Dockerfile", image_name="check_computers", image_tag="latest"),
            run_config=DockerRun(image="check_computers:latest")) as flow:
    
        task01()
        task02()
        task03()
    
    # Register the flow under the "CIS" project
    flow.register(project_name="CIS")
    Then, in the logs I've got:
    Failed to load and execute Flow's environment: FileNotFoundError(2, 'No such file or directory')
    Does anyone know what I am missing?
    a
    • 2
    • 9
  • y

    yslx

    01/14/2022, 2:14 PM
    When using gitlab storage am I right in thinking only the flow is cloned down to a remote agent running the flow? Or does the whole repo get cloned... I can't see it in the docs
    k
    a
    • 3
    • 7
y

yslx

01/14/2022, 2:14 PM
When using gitlab storage am I right in thinking only the flow is cloned down to a remote agent running the flow? Or does the whole repo get cloned... I can't see it in the docs
Specifically GitLab, I can see the base git storage does load the whole repo
k

Kevin Kho

01/14/2022, 2:29 PM
The whole repo is cloned, but this is meant to pull SQL or YAML files. It’s not meant to execute other Python modules because it doesn’t pip install the repo. If you have other dependencies, using Docker is the recommendation
✅ 1
:upvote: 1
a

Anna Geller

01/14/2022, 2:29 PM
Correct, you shouldn’t rely on the presence of other modules being cloned from the repository because Prefect executes only the flow and even if other files were cloned as well, Prefect wouldn’t know how to install those dependencies. That’s why it’s best to package your dependencies either into:
1. An installable Python package that gets installed in your execution environment
2. A Docker image
✅ 1
y

yslx

01/14/2022, 2:40 PM
Thanks for the advice. But for other files that are part of the project, for example a main.py and a config.py containing just env vars, cloning should be fine, right? main.py can import all Python files in the same directory as itself
k

Kevin Kho

01/14/2022, 2:42 PM
The working dir path is different when the Flow is running and you are cloning the repo. Some users have tried to hack it with a
sys.path.append
but I haven’t seen it work. It’s pretty tricky
y

yslx

01/14/2022, 2:43 PM
Hmm I see, i thought by default cwd is added to the python path, so no matter where it's cloned main.py would be able to import other .py files that share the same directory
I think I might be misinformed about the python path thing. Might be able to use relative imports but yeah you're right it's tricky and potentially messy
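On the Python path point above: when a file is run as `python main.py`, Python puts the script's own directory, not the current working directory, at the front of `sys.path`. If the flow file is loaded some other way, that directory may not be on the path at all. The sketch below is plain Python, not a Prefect feature, and shows the kind of `sys.path` adjustment mentioned in the thread. The helper names are hypothetical, and nothing here confirms that it works with GitLab storage.

```python
import importlib
import sys
from pathlib import Path
from types import ModuleType


def make_importable(directory: Path) -> None:
    """Put `directory` at the front of sys.path unless it is already listed."""
    resolved = str(directory.resolve())
    if resolved not in sys.path:
        sys.path.insert(0, resolved)


def import_sibling(module_name: str, flow_file: Path) -> ModuleType:
    """Import a module that lives in the same directory as `flow_file`."""
    make_importable(flow_file.parent)
    # Refresh the import system's directory caches in case the files
    # were written (for example, cloned) after the interpreter started.
    importlib.invalidate_caches()
    return importlib.import_module(module_name)
```

Inside a flow file, this could be called as `import_sibling("config", Path(__file__))`, assuming `__file__` is defined in the way the file gets loaded.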