prefect-community
  • b

    Brian Phillips

    10/31/2022, 4:57 PM
    The Prefect roadmap has mentioned support for languages other than Python for some time. Is there any estimate of when we might start to see support for that? 2023? Are compiled languages likely to be supported or only R and Julia?
    m
    • 2
    • 1
  • e

    eddy davies

    10/31/2022, 4:58 PM
    Hi, I am struggling with the Kubernetes Config block. I copied the contents of the `config` file in my `.kube` folder, but it doesn't seem to work. Any advice?
    ✅ 1
    n
    • 2
    • 8
  • m

    Meghan Franklin

    10/31/2022, 7:04 PM
    Is there a recommended way to produce a LOT of graphs in a prefect flow with matplotlib? If I use
    import matplotlib; matplotlib.use('agg')
    I get a segfault like this:
    ./run_local.sh: line 14: 34109 Segmentation fault: 11  python cli.py local -pfile developer/test_input.json
    make: *** [run_local] Error 139
    (p39_ngs) MB-MFRANKLIN:amplicon-analysis meghanfranklin$ /Users/meghanfranklin/opt/anaconda3/envs/p39_ngs/lib/python3.9/multiprocessing/resource_tracker.py:216: UserWarning: resource_tracker: There appear to be 2 leaked semaphore objects to clean up at shutdown
      warnings.warn('resource_tracker: There appear to be %d '
    but if I don’t use that, python crashes 🙃
    /Users/meghanfranklin/opt/anaconda3/envs/p39_ngs/lib/python3.9/site-packages/amp_analysis/plots.py:425: UserWarning: Starting a Matplotlib GUI outside of the main thread will likely fail.
      fig, ax = plt.subplots()
    [2022-10-31 15:02:54-0400] INFO - prefect.TaskRunner | Task 'create_pipeline_run': Finished task run for task with final state: 'Success'
    2022-10-31 15:02:54.121 python[34449:328037] *** Terminating app due to uncaught exception 'NSInternalInconsistencyException', reason: 'NSWindow drag regions should only be invalidated on the Main Thread!'
    *** First throw call stack:
    (
    	0   CoreFoundation                      0x00007ff80bf1e7c3 __exceptionPreprocess + 242
    	1   libobjc.A.dylib                     0x00007ff80bc7ebc3 objc_exception_throw + 48
    	2   CoreFoundation                      0x00007ff80bf47076 -[NSException raise] + 9
    [a lot more lines here] 
    )
    libc++abi: terminating with uncaught exception of type NSException
    ./run_local.sh: line 14: 34449 Abort trap: 6           python cli.py local -pfile developer/test_input.json
    make: *** [run_local] Error 134
    (p39_ngs) MB-MFRANKLIN:amplicon-analysis meghanfranklin$ /Users/meghanfranklin/opt/anaconda3/envs/p39_ngs/lib/python3.9/multiprocessing/resource_tracker.py:216: UserWarning: resource_tracker: There appear to be 2 leaked semaphore objects to clean up at shutdown
      warnings.warn('resource_tracker: There appear to be %d '
    
    -> cue Mac popup window about sending crash report to apple
    m
    j
    • 3
    • 11
  • d

    David Cupp

    10/31/2022, 7:54 PM
    The Prefect Cloud UI is showing times in our local timezone. Is there a setting to make that UTC? I can't find a setting anywhere. (I did google this and can't find any docs.)
    ✅ 1
    m
    • 2
    • 5
  • b

    Ben Muller

    10/31/2022, 10:33 PM
    Hi Community - unbelievably dumb question for 2.0. I am a 1.0 user and trying to play around with 2.0. Have set up a basic hello world flow and trying to run the flow locally with the 2.0 equivalent of
    prefect run -p test_flow.py
    Is this an option in 2.0 on the CLI, or do I have to initiate it through Python every time?
    ✅ 1
    r
    n
    • 3
    • 19
  • b

    Ben Muller

    10/31/2022, 11:31 PM
    Hi Community - me again. I am reading the tutorial here. Just trying to understand why there is a need and demonstration for the `SequentialTaskRunner` with `.submit`. Would this just be the same as not specifying a `task_runner` and just running the tasks without `.submit`, or am I missing something?
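For context on the question above: in Prefect 2, calling a task directly returns its result, while `.submit()` hands back a future, even when the runner executes tasks one at a time. The sketch below is only a standard-library analogy, assuming a single-worker `ThreadPoolExecutor` as a stand-in for a sequential runner. `double` is a made-up function and none of this is Prefect code.

```python
from concurrent.futures import Future, ThreadPoolExecutor


def double(x: int) -> int:
    """Hypothetical stand-in for a task body."""
    return 2 * x


# Direct call: blocks and returns the plain result.
direct = double(21)

# Submitted call: a single worker still runs jobs one at a time,
# but the caller receives a Future and asks for the value later.
with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(double, 21)
    submitted = future.result()

print(direct, isinstance(future, Future), submitted)
```

So the execution order can be the same either way. What changes is whether the caller holds a value or a handle to a value, which matters once results are passed between tasks.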
    ✅ 1
    m
    r
    +2
    • 5
    • 23
  • t

    Tony Yun

    11/01/2022, 2:46 AM
    Hey, can Prefect 2 stop a flow?? I only see “delete” flow run options.
    ✅ 1
    a
    t
    • 3
    • 5
  • t

    Tim Galvin

    11/01/2022, 3:01 AM
    Hi all -- Hopefully I am posting this in the correct channel. Feel free to slap me down if it is not. I originally posted this message in the public Discourse, and it was suggested I try raising a ticket. However, I am having trouble doing that due to my setup. I will try to be brief and clear. I am working on a demonstration of Prefect and how it might be used for some of our telescopes. My current toy problem is a long-ish pipeline that leverages:
    • A postgres running remotely (but nearby) in a singularity container
    • A SLURM cluster (`dask_jobqueue.SLURMCluster`) with a `DaskTaskExecutor` being used, with 10 separate compute nodes being pulled into the distributed dask scheduler
    • A single large Flow with ~7 tasks, with each task calling a separate python script's main
    I am just running the postgres database remotely. I have not set a `prefect orion` server running remotely, although I can try this. I found that regular running of the pipeline would often raise a `TimeoutError` in an unpredictable manner. Setting `PREFECT_ORION_DATABASE_CONNECTION_TIMEOUT=20` eliminated these. Now my problems seem to be `TimeoutErrors` when the flow is closing. I can confirm that all data products expected by the pipeline have been created and stored on disk, and all log messages have been issued. The flow has essentially finished executing and is wrapping up when the error is raised. The traceback is too long to post as a comment ( 😢 ), so it is below.
    File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1943, in _execute_context
        self._handle_dbapi_exception(
      File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 2128, in _handle_dbapi_exception
        util.raise_(exc_info[1], with_traceback=exc_info[2])
      File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 208, in raise_
        raise exception
      File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1900, in _execute_context
        self.dialect.do_execute(
      File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
        cursor.execute(statement, parameters)
      File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 479, in execute
        self._adapt_connection.await_(
      File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 68, in await_only
        return current.driver.switch(awaitable)
      File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 121, in greenlet_spawn
        value = await result
      File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 408, in _prepare_and_execute
        await adapt_connection._start_transaction()
      File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 716, in _start_transaction
        self._handle_exception(error)
      File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 684, in _handle_exception
        raise error
      File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 714, in _start_transaction
        await self._transaction.start()
      File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/asyncpg/transaction.py", line 138, in start
        await self._connection.execute(query)
      File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/asyncpg/connection.py", line 318, in execute
        return await self._protocol.query(query, timeout)
      File "asyncpg/protocol/protocol.pyx", line 338, in query
    asyncio.exceptions.TimeoutError
    • 1
    • 3
  • b

    Ben Muller

    11/01/2022, 6:09 AM
    Hi Prefect, I am trying to get a working demo running with an all-Python implementation. Our org uses AWS. I have deployed an agent to an ECS Service following some of the prefect recipes here. I am now attempting to run a silly example flow from your docs with an `ECSTask` that I have successfully deployed:
    import sys
    import prefect
    from prefect import flow, task, get_run_logger
    from utilities import AN_IMPORTED_MESSAGE
    
    from prefect_aws.ecs import ECSTask
    
    ecs_task_block = ECSTask.load("staging-test")
    
    
    @task
    def log_task(name):
        logger = get_run_logger()
        logger.info("Hello %s!", name)
        logger.info("Prefect Version = %s 🚀", prefect.__version__)
        logger.debug(AN_IMPORTED_MESSAGE)
    
    
    @flow()
    def log_flow(name: str):
        log_task(name)
    
    
    if __name__ == "__main__":
        name = sys.argv[1]
        log_flow(name)
    Our org uses 1.0 at present and we have never had to pass AWS credentials. We don't use credentials like this, as we usually use roles that you can assume. I believe this was the case in Prefect 1.0, if I remember correctly. All my agent has to do is assume a role (and it has access to whatever I need it to with its task role that I set up in deployment). So with these blocks all requiring credentials, I am wondering if these are optional and would be picked up by the default AWS environment variables if I leave them blank, or whether I will need to configure some type of user access for a Prefect machine user? Secondly: I am trying a deployment with storage, but it is coming up with an error:
    from flows.log_flow import log_flow
    from prefect.deployments import Deployment
    from prefect.filesystems import S3
    
    storage_block = S3(bucket_path="prefect-2-test")
    
    
    deployment = Deployment.build_from_flow(
        flow=log_flow,
        name="log-simple",
        parameters={"name": "Marvin"},
        infra_overrides={"env": {"PREFECT_LOGGING_LEVEL": "DEBUG"}},
        work_queue_name="staging",
        storage=S3.load("staging-test-block"),
    )
    
    if __name__ == "__main__":
        storage_block.save("staging-test-block", overwrite=True)
        deployment.apply()
    ERROR:
    File "/usr/local/Caskroom/miniconda/base/envs/prefect2/lib/python3.10/site-packages/aiobotocore/client.py", line 82, in create_client
        self._register_s3_control_events(
    TypeError: ClientCreator._register_s3_control_events() takes 2 positional arguments but 6 were given
    Any ideas to help me overcome this?
    a
    j
    m
    • 4
    • 65
  • e

    eddy davies

    11/01/2022, 10:39 AM
    Running a flow from Prefect Cloud on kubernetes-job, using the helm chart to launch the agent, but it seems to be lacking `aws-iam-authenticator`. Any ideas?
    ✅ 1
    :kubernetes: 1
    • 1
    • 1
  • m

    Michael Hadorn

    11/01/2022, 10:58 AM
    Hi there. Do you know a method to manually skip a task while the flow is already running? We are using Prefect 1.1.0. We have the problem that sometimes we have very long-running tasks (database performance issues). In that case we would like to skip the running task run and continue with the next task, so that everything else gets done. We already set `trigger=always_run, skip_on_upstream_skip=False` on the clean-up tasks at the end for this, but currently we have to abort the full flow run, so those tasks are never executed.
    a
    • 2
    • 5
  • o

    Oscar Björhn

    11/01/2022, 12:53 PM
    Good day! I'm trying to run a flow using azure container instance (ACI) and I'm getting a strange missing context error that I never bumped into during my docker and kubernetes days. Has anyone seen this before?
    ✅ 1
    a
    • 2
    • 3
  • p

    Patrick Alves

    11/01/2022, 1:45 PM
    Hi there, I have a prefect orion server running in kubernetes and I am trying to deploy some flows on it from my local machine. When I run:
    ❯ PREFECT_API_URL=https://prefect.xxx.xxx/api prefect deployment ls
    I am getting:
    File "/home/patrick/miniconda3/envs/prefect/lib/python3.8/ssl.py", line 944, in do_handshake
        self._sslobj.do_handshake()
    ssl.SSLCertVerificationError: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1131)
    An exception occurred.
    • I've tried setting env vars to disable certificate verification: PYTHONHTTPSVERIFY=false
    • I tried adding the server certificate (*.xxx.xxx.crt) on the server by copying the CRT file to `/usr/local/share/ca-certificates/` and updating the CA store: `sudo update-ca-certificates`
    Nothing works. Any tip to solve this?
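A small standard-library sketch that may help narrow down errors like the one above: "unable to get local issuer certificate" means the verifier could not chain the server certificate to a CA it trusts, so it helps to see which CA locations this Python actually consults. The helper name `context_with_ca` and its `ca_file` argument are made up for illustration. Whether Prefect's HTTP client uses these same defaults is an assumption worth checking, since some clients ship their own CA bundle instead of reading the system store.

```python
import ssl

# Where this Python/OpenSSL build looks for CA certificates by default,
# and which environment variables can override those locations.
paths = ssl.get_default_verify_paths()
print("CA file:", paths.cafile or paths.openssl_cafile)
print("CA dir: ", paths.capath or paths.openssl_capath)
print("Override env vars:", paths.openssl_cafile_env, paths.openssl_capath_env)


def context_with_ca(ca_file=None):
    """Build a verifying TLS context, optionally trusting one extra CA file.

    `ca_file` is a hypothetical path to a PEM bundle containing the private CA.
    """
    ctx = ssl.create_default_context()
    if ca_file is not None:
        ctx.load_verify_locations(cafile=ca_file)
    return ctx


ctx = context_with_ca()
# Verification stays on; only the set of trusted CAs changes.
print(ctx.verify_mode == ssl.CERT_REQUIRED, ctx.check_hostname)
```

If a context built with the private CA file can complete a handshake against the server while the default one cannot, the problem is the trust store the client reads, not the certificate itself.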
    k
    n
    q
    • 4
    • 8
  • r

    Roger Webb

    11/01/2022, 2:01 PM
    Hey all, can you run Prefect 1.0 agents on the same machine as Prefect 2.0 agents (while migrating from 1.0 to 2.0), or do they need their own machines?
    ✅ 1
    j
    • 2
    • 1
  • k

    Kalise Richmond

    11/01/2022, 3:03 PM
    Hi everyone! 👋 :slack: Today we are adding 11 new channels to our Slack Community to open up conversations and discussions relating not only to Prefect-specific topics but also to data engineering and data science. We want this Community to be a place where you can collaborate and get help with your data regardless of the stack or tool you are using. We can all share tips, tricks, and learn together.
    • We have new cloud-specific channels for #prefect-aws, #prefect-gcp, #prefect-azure
    • Channels dedicated to #prefect-kubernetes, #prefect-docker, #prefect-dbt, and #prefect-cloud
    • New channels for #prefect-integrations, #prefect-recipes and #prefect-getting-started that will have live stream events for Build-a-Block or Code with Marvin
    • Lastly, we are so grateful for all of you in our Community that we wanted to create a #gratitude channel where we can all share our thanks for one another
    So make sure to check out the new channels, join the conversations, and look for new events and announcements.
    tl;dr - 11 new channels in Slack and lots of new events, or watch the video 😉 📺 https://www.loom.com/share/5ad9da90d6ab4beeadcbd7de6ef7a527
    💯 6
    :cool-llama: 7
    :marvin: 8
    🔥 14
    ✨ 9
    🚀 12
    :prefect: 9
    🙌 7
  • a

    Austen Bouza

    11/01/2022, 4:00 PM
    Hello, is there a way to prevent deployments from overriding whether or not a flow is enabled in Prefect 1? In this case, I have deployed flows that have been turned off in the UI that will automatically be turned on again the next time I deploy changes even for something unrelated. This means having to remember to turn a flow off again after every deployment which can be annoying and create issues if a flow runs by accident when it shouldn't.
    ✅ 1
    t
    k
    • 3
    • 3
  • n

    Noam Banay

    11/01/2022, 4:26 PM
    Hey, I have a flow that is triggered by an API call from an external process. Sometimes the process fails and the API call is never sent, meaning the flow is not triggered. Is there a way to monitor those delays? I mean: if the flow isn't triggered by 8 am, is there a way to detect that and send a Slack notification or an email? Thanks. @Taylor Curran
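One way to picture the check being asked for is a small watchdog that runs on its own schedule and compares the time of the last flow run against a daily deadline. This is a minimal sketch in plain Python, not a Prefect feature: `is_overdue` is a hypothetical helper, the timestamps are assumed to be naive datetimes in one shared timezone, and how the last run time is fetched and how the alert is sent are left out.

```python
from datetime import datetime, time
from typing import Optional


def is_overdue(last_run: Optional[datetime], now: datetime,
               deadline: time = time(8, 0)) -> bool:
    """Return True if today's deadline has passed and no run started today."""
    if now.time() < deadline:
        return False  # too early in the day to judge
    return last_run is None or last_run.date() < now.date()


# Example: it is 08:30 on 1 Nov 2022.
now = datetime(2022, 11, 1, 8, 30)
print(is_overdue(datetime(2022, 10, 31, 7, 55), now))  # last run was yesterday
print(is_overdue(datetime(2022, 11, 1, 7, 55), now))   # already ran today
```

The watchdog itself has to be scheduled independently of the external process, otherwise the same failure that blocks the trigger also blocks the alert.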
    ✅ 1
    a
    • 2
    • 4
  • j

    Javier Ruere

    11/01/2022, 5:46 PM
    Hi!
  • j

    Javier Ruere

    11/01/2022, 5:47 PM
    I'm trying to use Prefect 2.6.5 to run a Flow locally. The Flow has >10 Tasks and quite a bit of concurrency (>1000). It's an async Flow and all Tasks are async, although several use `anyio.to_thread` internally. I'm using the ConcurrentTaskRunner. The flow is run like this:
    anyio.run(
            generate_report,
            datetime.date.fromisoformat(run_date),
            backend_options={"debug": True, "use_uvloop": True},
        )
    The problem I'm having is that some Tasks finish but remain in state Running forever. I can't see any output that would help me understand the problem. Help, please.
    k
    m
    • 3
    • 6
  • m

    merlin

    11/01/2022, 6:02 PM
    Hello, is there a way to 'pause' a task depending on the outcome of some runtime process? I don't want the task to retry except for this one specific runtime result (normally the task could just fail and retry according to args). Perhaps I could return a manual state `AwaitingRetry`?
    @task
    def my_task():
        # some code; found a blocker I'd like to wait for
        time_2_hours_from_now = ...  # some date function
        if function_is_blocked:
            return AwaitingRetry(scheduled_time=time_2_hours_from_now)
    j
    m
    • 3
    • 2
  • j

    Jon Young

    11/01/2022, 6:43 PM
    hey all, is there a way to dynamically name flows in v1?
    with prefect.Flow(
        name="SOME_DYNAMIC_NAME"  # instead of hardcoding this, I want to pass a value when registering / calling it
    ) as flow:
        ...
    ✅ 1
    👀 1
    b
    n
    i
    • 4
    • 13
  • c

    Carlo

    11/01/2022, 6:48 PM
    Hi, for prefect 2 self hosted where would I find logs around notifications being fired?
    m
    • 2
    • 2
  • v

    Vadym Dytyniak

    11/01/2022, 7:28 PM
    Hi. How to see RRuleSchedule in UI? I just see 'Every year' on Deployment tab and it is not clear when scheduled flow runs will appear and how many?
    k
    • 2
    • 4
  • j

    Jon Young

    11/01/2022, 8:31 PM
    can you pass a task as a flow parameter in prefect 1?
    ✅ 1
    m
    • 2
    • 17
  • m

    merlin

    11/01/2022, 9:04 PM
    How can I fail a task? I am try/catching exceptions, which works fine, but I still want the task to fail. The docs didn't quite show this example:
    @task
    def fail_task():
        return Failed()
    
    @flow
    def fail_flow_i_hope():
        fail_task()
    
    fail_flow_i_hope()
    
    11:42:32.292 | DEBUG   | Task run 'always_fail_task-d83fd752-0' - Beginning execution...
    11:42:32.302 | INFO    | Task run 'always_fail_task-d83fd752-0' - Finished in state Completed()
    ...
    11:42:32.313 | INFO    | Flow run 'ludicrous-lemur' - Finished in state Completed('All states completed.')
    I'd like a task to fail, and then do retries (or not) but end the steps in the flow, with the flow also failed.
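On the try/except part of the question: an exception that is caught and not re-raised never reaches the caller, so from the outside the function looks like it succeeded. A common pattern is to do the handling and then re-raise. The sketch below is plain Python with a made-up `load_rows` function. The assumption is that it would be decorated with `@task` in a real flow, where an exception escaping the task function is what marks the task run as failed.

```python
import logging

logger = logging.getLogger(__name__)


def load_rows(source: str) -> list:
    """Hypothetical task body: handle the error, then re-raise it."""
    try:
        if not source:
            raise ValueError("no source given")
        return [source]
    except ValueError:
        logger.exception("load_rows failed")  # the handling you wanted to do
        raise  # re-raise so the caller (the orchestrator) sees the failure


print(load_rows("table_a"))

try:
    load_rows("")
except ValueError as exc:
    print("propagated:", exc)
```

With the bare `raise` in place, the logging still happens, and any retry settings on the task get a chance to apply because the failure is visible.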
    ✅ 1
    m
    • 2
    • 4
  • j

    Jon Young

    11/01/2022, 9:53 PM
    Traceback (most recent call last):
      File "/Users/jonyoung/Dev/workflows/.venv/lib/python3.9/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/Users/jonyoung/Dev/workflows/.venv/lib/python3.9/site-packages/prefect/engine/flow_runner.py", line 569, in get_flow_run_state
        executors.prepare_upstream_states_for_mapping(
      File "/Users/jonyoung/Dev/workflows/.venv/lib/python3.9/site-packages/prefect/utilities/executors.py", line 682, in prepare_upstream_states_for_mapping
        value = upstream_state.result[i]
    KeyError: 0
    └── 17:51:25 | ERROR   | Unexpected error occured in FlowRunner: KeyError(0)
    any help what this error means?
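One plausible reading of the traceback, offered as a guess from the failing line only: `upstream_state.result[i]` is indexing the mapped upstream result by position, and `KeyError: 0` is what a dict raises when it has no key `0`. A list would have raised `IndexError` instead. The sketch below reproduces that difference in plain Python. `element_at` is a made-up helper that mimics the failing line, and the sample data is invented.

```python
def element_at(result, i):
    """Mimic the failing line in the traceback: upstream_state.result[i]."""
    return result[i]


# A list supports positional indexing, which is what mapping relies on.
print(element_at(["a", "b"], 0))

# A dict treats 0 as a key, not a position, so this raises KeyError(0).
try:
    element_at({"x": "a", "y": "b"}, 0)
except KeyError as exc:
    print("KeyError:", exc)

# Converting the values to a list makes positional indexing work again.
print(element_at(list({"x": "a", "y": "b"}.values()), 0))
```

If that is what is happening, the thing to check is whether the upstream task being mapped over returns a dict where a list or tuple is expected.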
    m
    • 2
    • 6
  • a

    Aaron

    11/01/2022, 10:01 PM
    I have multiple deployments in a work queue with unlimited concurrency, but they still run sequentially and one will not start until the other is finished. Am I misunderstanding how concurrency works and it only applies to the same flow? Do I need a separate work queue for each of my deployments? Because what has been happening is we occasionally have a flow that takes a really long time, and all other flows wait until the long-running one finishes, with any missed jobs showing up as late and then running once the long one finishes. Two issues with that: having time-sensitive jobs not run because they are waiting, and then having hundreds of late jobs in the queue and no way to get rid of them other than deleting one by one
    m
    m
    • 3
    • 4
  • a

    Ahmed Ezzat

    11/01/2022, 10:37 PM
    Hi, I have a problem when editing any block from the Prefect UI. If a field's value matches the default value, it is not set and is not sent in the PATCH request, which makes it impossible to change the field back to its default.
    ✅ 1
    👀 1
    k
    r
    • 3
    • 4
  • b

    Ben Muller

    11/02/2022, 2:20 AM
    Hi Community - me again. I have a strange one. I have set up an agent in ECS with a custom image that has `prefect-aws` deps installed. I have an infrastructure `ECSTask` block that pulls a custom image:
    FROM        prefecthq/prefect:2.6.5-python3.9
    
    RUN         pip install prefect-aws s3fs==2022.5.0
    This installs everything I need to run a demo flow that is:
    import prefect
    from prefect import flow, task, get_run_logger
    from utilities import AN_IMPORTED_MESSAGE
    
    from prefect_aws.ecs import ECSTask
    
    ecs_task_block = ECSTask.load("staging-test")
    
    
    @task
    def log_task():
        logger = get_run_logger()
        logger.info("Hello %s!", "BEN")
        logger.info("Prefect Version = %s 🚀", prefect.__version__)
        logger.debug(AN_IMPORTED_MESSAGE)
    
    
    @flow
    def my_test_flow():
        log_task()
    
    
    if __name__ == "__main__":
        my_test_flow()
    This flow is using S3 Block Storage, now when I trigger this flow from the cloud UI the task is generated in my cluster, it just fails with a
    ModuleNotFoundError: No module named 'prefect_aws.ecs'
    More details in 🧵
    ✅ 1
    • 1
    • 2
  • i

    iKeepo w

    11/02/2022, 6:36 AM
    hi, my agent log always shows an httpx error, but the flows and deployments still seem to work fine. How should I deal with this?
    t
    b
    • 3
    • 3
t

Tim-Oliver

11/02/2022, 8:02 AM
I observed this too. And my flows completed as well without error.
b

Bianca Hoch

11/02/2022, 9:08 PM
Hey team, thanks for raising this. Would you be able to share any of the following? • account id • workspace id • example flow run id that threw the error • full traceback of the error Feel free to DM the information privately if needed.
If possible, we'd greatly appreciate a summary of what you're seeing as a GitHub issue as well.