prefect-community

    Kenneth Miller

    05/08/2021, 5:49 PM
    Hi everyone - I'm currently evaluating using Prefect/Dask for our ETL engine, running through our various use-cases - reading through the open-source docs I'm wondering if there is a simple way to trigger execution of a task from the outside, say, from an AWS lambda function written in Javascript...any ideas?
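    A minimal sketch of one way to do this, assuming the flow is already registered with the backend and you have an API token: any external caller (a JavaScript Lambda included) can POST the create_flow_run mutation to the GraphQL endpoint. Shown in Python here; the flow ID and token are placeholders.
    import requests

    API_URL = "https://api.prefect.io"   # or your Prefect Server's GraphQL endpoint
    API_TOKEN = "<YOUR_API_TOKEN>"       # placeholder
    FLOW_ID = "<YOUR_FLOW_ID>"           # placeholder

    # ask the backend to create (i.e. trigger) a new run of the registered flow
    mutation = """
    mutation {
      create_flow_run(input: { flow_id: "%s" }) { id }
    }
    """ % FLOW_ID

    resp = requests.post(
        API_URL,
        json={"query": mutation},
        headers={"Authorization": "Bearer %s" % API_TOKEN},
    )
    print(resp.json())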

    Nelson Griffiths

    05/08/2021, 6:56 PM
    I am running into some issues deploying Docker Containers that I am not quite sure how to debug. I am using bitbucket as my flow storage and Docker as my run environment. I try to run the flow through my Docker Agent and it successfully pulls down the docker image and then does nothing else for a long time until it seems to fail silently and a Lazarus process restarts it. Any ideas on where to start looking to figure out the issue? The logs aren't any help and I can't tell why it is failing.

    sark

    05/10/2021, 6:53 AM
    hello everyone, how do i flatten/join a task of tasks? let's say i already have tasks a,b,c defined but i would like to group them as a task, perhaps because i want a conditional flow: for one branch i want to execute a,b,c in order, otherwise i want to execute d,e,f. how do i go about this without redefining a task which does what a,b,c / d,e,f already do?
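    A minimal sketch of one way to branch without rewriting anything, using prefect's case context manager and explicit upstream_tasks to keep the order (assuming a, b, c, d, e, f are your existing tasks; the check_branch task is illustrative):
    from prefect import Flow, case, task

    @task
    def check_branch():
        return True  # illustrative condition

    with Flow("branching-flow") as flow:
        cond = check_branch()
        with case(cond, True):
            ra = a()
            rb = b(upstream_tasks=[ra])
            rc = c(upstream_tasks=[rb])
        with case(cond, False):
            rd = d()
            re_ = e(upstream_tasks=[rd])
            rf = f(upstream_tasks=[re_])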

    Trevor Kramer

    05/10/2021, 2:12 PM
    I frequently can't load cloud.prefect.io. I get blinking dots forever, and doing a hard reload doesn't help. Has anyone else run into this?

    Saksham Dixit

    05/10/2021, 4:51 PM
    Hello everyone, I am trying to write a state handler for my flow

    Saksham Dixit

    05/10/2021, 4:52 PM
    when it is called the new_state's 'result' attribute is an empty dictionary

    Saksham Dixit

    05/10/2021, 4:52 PM
    It should ideally have all the tasks and whether they failed or succeeded

    Saksham Dixit

    05/10/2021, 4:53 PM
    Can you guys suggest why the result attribute would be an empty dictionary?
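    For reference, a minimal sketch of a flow-level state handler; on a local flow.run() the terminal state's result maps tasks to their final states, while when running against Cloud/Server that dict is often empty and task-run states have to be queried from the API instead (to the best of my knowledge):
    def flow_state_handler(flow, old_state, new_state):
        if new_state.is_finished():
            # populated on a local flow.run(); may be empty when running via an agent
            for task_obj, task_state in (new_state.result or {}).items():
                print(task_obj.name, "failed" if task_state.is_failed() else "succeeded")
        return new_state

    # flow = Flow("my-flow", state_handlers=[flow_state_handler])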

    Matthew Millendorf

    05/10/2021, 5:17 PM
    Hello, I am wondering if anyone has an example of getting a flow's ID using the flow name and project name to execute a pre-registered flow from within a web loop? I'm not entirely sure of the best approach and how to do this: Option 1: if there is a route that I can make a GET request to, how would I find that? (been searching the repo but no luck yet) Option 2: I would write this as a GraphQL query. How would I know what to write? Thanks - any help is much appreciated!
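    A minimal sketch of the GraphQL route (Option 2), assuming the Python Client is available where your service runs; the Hasura-style filters below are to the best of my knowledge and the flow/project names are placeholders:
    from prefect.client import Client

    client = Client()

    result = client.graphql(
        """
        query {
          flow(where: {name: {_eq: "my-flow"},
                       project: {name: {_eq: "my-project"}},
                       archived: {_eq: false}}) {
            id
          }
        }
        """
    )
    flow_id = result["data"]["flow"][0]["id"]
    flow_run_id = client.create_flow_run(flow_id=flow_id)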

    Belal Aboabdo

    05/10/2021, 6:10 PM
    Hi all, my CI process is failing health checks when building my flow with
    prefect build -p
    with the following error. Has anyone experienced this issue?
    Beginning health checks...
    System Version check: OK
    Traceback (most recent call last):
      File "/opt/prefect/healthcheck.py", line 150, in <module>
        flows = import_flow_from_script_check(flow_file_paths)
      File "/opt/prefect/healthcheck.py", line 63, in import_flow_from_script_check
        flows.append(extract_flow_from_file(file_path=flow_file_path))
      File "/usr/local/lib/python3.8/site-packages/prefect/utilities/storage.py", line 86, in extract_flow_from_file
        exec(contents, exec_vals)
      File "<string>", line 129, in <module>
    NameError: name '__file__' is not defined
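    The healthcheck execs the flow script's contents, so __file__ is not defined in that context; a minimal sketch of a guard (the fallback path is illustrative):
    import os

    try:
        HERE = os.path.dirname(os.path.abspath(__file__))
    except NameError:
        # __file__ is undefined when the script is exec'd (e.g. the Docker healthcheck)
        HERE = os.getcwd()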

    Enda Peng

    05/10/2021, 6:44 PM
    Are there any tips for using a local agent and local runs? I find it really error prone, e.g. I have a file
    foo.py
    where
    flowA
    is encoded. I can easily run into the following cases: • register with
    -p ./foo.py
    , it doesn’t work while
    -p foo.py
    works. • Under the same folder
    /pipelines/
    there are two files
    flow.py, common_task.py
    , I can also run into an import error; sometimes
    from .common_task import xxx
    works and sometimes not, depending on how PYTHONPATH is set. So far my experience is to make sure the local agent runs at the same level where my
    register
    command is called, no leading dot in the register command, etc. Curious whether people have better ideas on how to manage the entire process.
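    One pattern that may help, sketched under the assumption of script-based Local storage: pin absolute paths and PYTHONPATH in the run config so the agent's working directory stops mattering (paths are illustrative):
    from prefect.run_configs import LocalRun
    from prefect.storage import Local

    flow.storage = Local(path="/abs/path/pipelines/foo.py", stored_as_script=True)
    flow.run_config = LocalRun(
        working_dir="/abs/path/pipelines",
        env={"PYTHONPATH": "/abs/path/pipelines"},
    )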

    Fabrice Toussaint

    05/11/2021, 8:00 AM
    Hi everyone, can someone explain to me how to integrate Sentry with Prefect? Do I need to initialize it in each flow, or? Unfortunately, just adding it to my .py files does not work when the flow is being registered. Sentry: https://docs.sentry.io/platforms/python/
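    A minimal sketch of one approach: keep the Sentry setup out of module scope (which also runs at registration) and put it in a state handler so it only fires at flow-run time; the SENTRY_DSN env var name is an assumption.
    import os
    import sentry_sdk

    def sentry_state_handler(flow, old_state, new_state):
        if new_state.is_failed():
            sentry_sdk.init(dsn=os.environ["SENTRY_DSN"])
            sentry_sdk.capture_message(
                "Flow %s failed: %s" % (flow.name, new_state.message)
            )
        return new_state

    # with Flow("my-flow", state_handlers=[sentry_state_handler]) as flow: ...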

    Salohy

    05/11/2021, 9:00 AM
    Hi everyone, I have this error when using SqlServerExecute: `ImportError: Using
    prefect.tasks.sql_server
    requires Prefect to be installed with the "sql_server" extra`. I am using Docker for storage with a Dockerfile as explained here. My code looks like this
    from prefect.tasks.sql_server import SqlServerExecute
    
    STORAGE = Docker(registry_url="xxx.azurecr.io",
             image_name="prefect/test",
             dockerfile="Dockerfile",
             python_dependencies=["pyodbc"])
    
    SQL = SqlServerExecute(db_name="myDb",
                user="user",
                host="tcp:xxx.database.windows.net",
                port=1433,
                driver="ODBC Driver 17 for SQL Server")
    
    QUERY = """
      CREATE TABLE [dbo].[myTable] (
        [id] VARCHAR(22) COLLATE Latin1_General_100_BIN2 NOT NULL,
        [FirstName] VARCHAR(222) NULL,
        [LastName] VARCHAR(222) NULL
      )
    """
    
    @task
    def create_table():
      logger = prefect.context.get('logger')
      logger.info("Create table in DW for loading the results")
      SQL.run(query=QUERY, password='myPass')
    
    with Flow("candidates-flow", storage=STORAGE, run_config=RUN_CONFIG, executor=EXECUTOR) as flow:
      create_table()
    Can someone help? I do not understand the error. Many thanks already 🙏
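    The error usually just means the image is missing the extra; a minimal sketch, assuming that installing prefect[sql_server] (alongside pyodbc and the ODBC system libraries from your Dockerfile) into the Docker storage image is enough:
    STORAGE = Docker(
        registry_url="xxx.azurecr.io",
        image_name="prefect/test",
        dockerfile="Dockerfile",
        python_dependencies=["pyodbc", "prefect[sql_server]"],
    )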

    David

    05/11/2021, 10:25 AM
    Hi all 🙂 I am encountering an issue regarding the new ECS agents and would be glad for some advice; we execute many flows nightly. For the last half year we used the Fargate agent, which worked pretty well. We tried to move to the new ECS agent, but now after deploying a new flow and running the batches we receive -
    An error occurred (ClientException) when calling the RegisterTaskDefinition operation: Too many concurrent attempts to create a new revision of the specified family.
    The ECS agent logs (verbose) aren't very helpful and I can't debug the error as it happens when the system is at its peak. I have a couple of questions - 1. Why does the flow register itself on AWS only when it's running and not when it was registered? 2. Can I ask Prefect to just dockerize my flow instead of registering a new task definition (because nothing changed in the task definition)? I saw the parameter
    idempotency_key
    but I don't understand if this is helpful in my situation. 3. Should I call
    register
    if my Docker image is the only thing that changed? Or should I call
    flow.storage.build
    instead? I can provide code if needed. Thank you in advance
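    A minimal sketch of one way around the RegisterTaskDefinition throttling, assuming you register a task definition once yourself and point every flow at it via task_definition_arn (the ARN is a placeholder), so the agent should not need to create a new revision per run:
    from prefect.run_configs import ECSRun

    flow.run_config = ECSRun(
        task_definition_arn="arn:aws:ecs:eu-west-1:123456789012:task-definition/prefect-flow:1",
        labels=["ecs"],
    )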

    Saulius Beinorius

    05/11/2021, 10:39 AM
    Hi, I'm trying to set up the ECS agent without using dynamic flow task definitions (I already have a static task definition specified in the flow's
    RunConfig/ECSRun
    ). When I am trying to start the agent, it tries to build the default parameters for a task definition and fails (since we don't have a default VPC). Is there a way to start the ECS agent without providing a task template and to only rely on the flow configuration instead?

    Ian Harvey

    05/11/2021, 3:37 PM
    Hi all, hopefully a quick question. Can
    PREFECT__LOGGING__EXTRA_LOGGERS
    accept wildcards? So instead of something like
    flow.run_config = LocalRun(
        env={
            "PREFECT__LOGGING__EXTRA_LOGGERS": json.dumps([
                'foo.bar.aaa',
                'foo.bar.bbb',
            ])
        }
    )
    I could use
    flow.run_config = LocalRun(
        env={
            "PREFECT__LOGGING__EXTRA_LOGGERS": json.dumps([
                'foo.bar.*'
            ])
        }
    )
    With larger projects it's quite time consuming to add each class individually.
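    As far as I know EXTRA_LOGGERS takes explicit names only (no wildcards), but since child loggers propagate to their parent by default, listing just the top-level package is often enough; otherwise the list can be built programmatically. A sketch, with foo.bar standing in for your package:
    import json
    import pkgutil

    import foo.bar  # your package

    from prefect.run_configs import LocalRun

    extra = ["foo.bar"] + [
        "foo.bar.%s" % m.name for m in pkgutil.iter_modules(foo.bar.__path__)
    ]

    flow.run_config = LocalRun(
        env={"PREFECT__LOGGING__EXTRA_LOGGERS": json.dumps(extra)}
    )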

    Matej

    05/11/2021, 3:56 PM
    Hi all, I am trying to deploy the prefect-agent on a local Kubernetes cluster and get
    prefect.utilities.exceptions.AuthorizationError: [{'path': ['auth_info'], 'message': 'AuthenticationError: Forbidden', 'extensions': {'code': 'UNAUTHENTICATED'}}]
    I am unsure how to set up the YAML variables:
    - name: PREFECT__CLOUD__AGENT__AUTH_TOKEN
      value: ''
    - name: PREFECT__CLOUD__API
      value: https://api.prefect.io
    - name: NAMESPACE
      value: default
    - name: IMAGE_PULL_SECRETS
      value: ''
    - name: PREFECT__CLOUD__AGENT__LABELS
      value: '[]'
    - name: JOB_MEM_REQUEST
      value: ''
    - name: JOB_MEM_LIMIT
      value: ''
    - name: JOB_CPU_REQUEST
      value: ''
    - name: JOB_CPU_LIMIT
      value: ''
    - name: IMAGE_PULL_POLICY
      value: ''
    - name: SERVICE_ACCOUNT_NAME
      value: ''
    - name: PREFECT__BACKEND
      value: cloud
    - name: PREFECT__CLOUD__AGENT__AGENT_ADDRESS
      value: http://:8080
    image: prefecthq/prefect:0.14
    specifically the PREFECT__CLOUD__AGENT__AGENT_ADDRESS seems to be incorrect. Q: Is there better documentation on how to set up the prefect-agent?

    Christian

    05/11/2021, 3:58 PM
    Hi 👋 I'm currently experimenting with the recent YouTube demo of some data science Prefect flows… We have an on-premises S3 server that has a custom endpoint_url. Can I use S3Result() to store my plots? How would I provide the endpoint_url and other custom settings?
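    A sketch of what I would try, assuming your Prefect version's S3Result accepts a boto3_kwargs dict that is forwarded to the boto3 client (worth verifying against the S3Result signature); the endpoint URL is a placeholder:
    from prefect.engine.results import S3Result

    plot_result = S3Result(
        bucket="my-plots",
        boto3_kwargs={"endpoint_url": "https://s3.on-prem.example.com"},
    )

    # with Flow("plots", result=plot_result) as flow: ...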

    Joseph Loss

    05/11/2021, 5:15 PM
    I'm having some issue importing prefect.context.today. Was this changed from 14.16 to 14.17? I'm on .16, can't seem to use "import prefect.context" https://docs.prefect.io/api/latest/utilities/context.html

    Matej

    05/11/2021, 5:17 PM
    Hi all, I am trying to run a prefect flow on a dask executor. I am getting the following error
    [2021-05-11 19:11:14+0200] ERROR - prefect.FlowRunner | Unexpected error: ModuleNotFoundError("No module named 'prefect'")
    it seems that the workers do not have prefect installed. Dask is running in a Kubernetes cluster, so is there a smart way to install the prefect module on all the Dask workers?
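    A minimal sketch of one option, assuming dask-kubernetes: the worker image needs prefect installed, either by baking a custom image or, with the daskdev/dask images, via the EXTRA_PIP_PACKAGES environment variable:
    from dask_kubernetes import KubeCluster, make_pod_spec
    from prefect.executors import DaskExecutor

    pod_spec = make_pod_spec(
        image="daskdev/dask:latest",
        env={"EXTRA_PIP_PACKAGES": "prefect"},  # workers pip-install prefect at startup
    )

    flow.executor = DaskExecutor(
        cluster_class=KubeCluster,
        cluster_kwargs={"pod_template": pod_spec},
    )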

    sapatil

    05/11/2021, 5:41 PM
    I'm trying to set up the email task, and this page gives information about how to set up email if you are using Gmail. What I'm looking for is a similar level of detail if I'm using Microsoft Outlook (my office email) - has anyone here worked on such a use case before? Any documentation regarding setup would be a great help. Thank you.
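    A minimal sketch with the usual Office 365 SMTP settings (assuming SMTP AUTH is enabled for the account; EmailTask reads the EMAIL_USERNAME and EMAIL_PASSWORD secrets for credentials):
    from prefect.tasks.notifications import EmailTask

    send_report = EmailTask(
        subject="Nightly ETL report",
        msg="All tasks finished.",
        email_to="team@example.com",
        email_from="me@mycompany.com",
        smtp_server="smtp.office365.com",
        smtp_port=587,
        smtp_type="STARTTLS",
    )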

    Salohy

    05/11/2021, 5:54 PM
    Hello all 🙂 I am getting this error
    TypeError: execute() takes no keyword arguments
    when running a SQL query using SqlServerExecute. Here is my task code, following the doc explained here https://docs.prefect.io/api/latest/tasks/sql_server.html#sqlserverexecute
    from prefect.tasks.sql_server import SqlServerExecute
    
    SQL = SqlServerExecute(db_name="myDb",
                user="user",
                host="tcp:xxxs.database.windows.net",
                port=1433,
                driver="ODBC Driver 17 for SQL Server")
    
    QUERY = "CREATE TABLE [dbo].[CandidateDetails] ([CandidateId] VARCHAR(22) COLLATE Latin1_General_100_BIN2 NOT NULL,[FirstName] VARCHAR(222) NULL,[LastName] VARCHAR(222) NULL)"
    
    @task
    def create_table():
      logger = prefect.context.get('logger')
      logger.info("Create table in DW for loading the results")
      SQL.run(query=QUERY, password='myPass')
    Can you help me on this? many thanks already 🙂

    Matthew Blau

    05/11/2021, 6:19 PM
    Hello all, is there an easy way in the GraphQL API to cancel all submitted, but not currently running, flow runs? If so, where in the docs can I read more about this?
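    A sketch of how this could look via the Client, assuming Scheduled/Submitted runs can be filtered with a Hasura-style flow_run query and cancelled with the cancel_flow_run mutation (field and mutation names are best double-checked in the Interactive API tab):
    from prefect.client import Client

    client = Client()

    runs = client.graphql(
        """
        query {
          flow_run(where: {state: {_in: ["Scheduled", "Submitted"]}}) { id }
        }
        """
    )

    for run in runs["data"]["flow_run"]:
        client.graphql(
            """
            mutation($id: UUID!) {
              cancel_flow_run(input: {flow_run_id: $id}) { state }
            }
            """,
            variables={"id": run["id"]},
        )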

    Zach Hodowanec

    05/11/2021, 10:59 PM
    Hi Team, I'm working to define a custom job spec to be referenced via a KubernetesRun config in my flow, but running into a few issues when the custom job spec gets kicked off via a Kubernetes Agent. 1. The job is not being tracked via Prefect Cloud. It gets scheduled and I can see the job execute to completion in Kubernetes but Prefect Cloud never reports beyond the scheduled status. 2. The Flow itself never gets executed and the only logs are the CLI printout of the main
    prefect --help
    results.

    Anze Kravanja

    05/12/2021, 3:30 AM
    Hello! I was wondering if it is possible to map keys in a dictionary returned by one task to the inputs of another task. Here's the example:
    from prefect import Task, Flow, task
    
    @task()
    def task_success():
        print("Success!")
        return {'p': 1, 'k': 2, 'j': 3}
    
    
    @task()
    def task_to_fail(p, k):
        print(f"p: {p}; k: {k}")
        raise NotImplementedError()
    
    
    with Flow("catch-all-failed-flow", state_handlers=[STATE_HANDLER_CATCH_ALL_FAILED]) as flow:
        ts = task_success()
        tf = task_to_fail(ts) # how can I map p and k params to the task_to_fail task instead of passing the whole dict in p but nothing as k
    
    flow.run()
    I know I can do ts['p'] and ts['k'], but I'm looking for something like **ts. Thank you!
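    As far as I know there is no **-style unpacking of a task result at build time; besides indexing, a tiny intermediate task with nout can split the dict so the downstream task gets real positional inputs. A sketch reusing the task names above:
    from prefect import Flow, task

    @task(nout=2)
    def split_params(d):
        return d["p"], d["k"]

    with Flow("catch-all-failed-flow") as flow:
        ts = task_success()
        p, k = split_params(ts)
        tf = task_to_fail(p, k)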

    Kamil Okáč

    05/12/2021, 8:02 AM
    Hi all. I was wondering, is this a correct way to set task defaults for an entire flow?
    from prefect.configuration import config
    
    config.tasks.defaults.max_retries = 2
    config.tasks.defaults.retry_delay = datetime.timedelta(seconds=10)

    Sven Teresniak

    05/12/2021, 11:52 AM
    [~] -> docker run -it prefecthq/server:core-0.14.18 pip show prefect
    Name: prefect
    Version: 0.14.16+95.g377f2fe7a
    holy crap! what is the purpose of the version number in the tag if not the prefect version?

    Sven Teresniak

    05/12/2021, 12:25 PM
    hmm in https://github.com/PrefectHQ/prefect/blob/0.14.18/requirements.txt the dep
    click >= 7.0.0
    will install
    click v8.0.0
    . This is not compatible with
    click v7.1.2
    which is part of the prefect-server docker image (with tag
    core-0.14.18
    )

    Joseph Ellis

    05/12/2021, 12:27 PM
    Hi all, hopefully an easy one... I need to pass the flow run ID of any flow execution into my tasks. This should be possible through the 'Context' object, but when I try to register my flow to Prefect Cloud, I get the following error:
    AttributeError: 'Context' object has no attribute 'flow_run_id'
    Any ideas?

    Joseph Ellis

    05/12/2021, 12:28 PM
    prefect.context.flow_run_id
Solved:
prefect.context.get("flow_run_id")
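For reference, a minimal sketch of reading it inside a task at runtime; the attribute only exists in context while a flow run is executing, which is why touching prefect.context.flow_run_id at registration time raises AttributeError:
import prefect
from prefect import task

@task
def report_run_id():
    run_id = prefect.context.get("flow_run_id")  # None outside a flow run
    prefect.context.get("logger").info("flow_run_id: %s" % run_id)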

Kevin Kho

05/12/2021, 1:38 PM
Glad you figured it out 🙂