prefect-community
  • Madhup Sukoon
    03/17/2022, 8:06 AM
    Hi, I have added state_handlers to my flow, but for some reason they are not getting triggered. Currently it looks like this:
    from prefect.utilities.notifications import slack_notifier
    ...
    flow.state_handlers = [slack_notifier()]
    I have added the Prefect Slack app and also added the webhook URL to the SLACK_WEBHOOK_URL secret. Any pointers on why this is happening / how to debug?
    34 replies · 3 participants
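    One thing worth checking is that the handler is attached before the flow is registered; below is a minimal sketch of wiring it up at definition time, assuming Prefect 1.x (the flow name and task are illustrative):
    from prefect import Flow, task
    from prefect.engine.state import Failed, Success
    from prefect.utilities.notifications import slack_notifier

    # Restrict notifications to terminal states to cut down on noise.
    handler = slack_notifier(only_states=[Failed, Success])

    @task
    def say_hello():
        return "hello"

    # Attach the handler at definition time, before flow.register(),
    # so it is serialized along with the rest of the flow.
    with Flow("slack-handler-demo", state_handlers=[handler]) as flow:
        say_hello()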
  • Ievgenii Martynenko
    03/17/2022, 10:54 AM
    Hi again, I have a question about extra logging. We have our own Python module 'bear' where logging is defined as:
    logger = logging.getLogger(__name__)
    and I want log messages from that module to appear in Prefect. I registered an extra logger, and log messages defined in the Task (AWSPOC) appear, but not the log messages defined inside 'bear'. My code for "prefect_flow.py":
    import logging
    
    format_string = "%(asctime)s %(name)s [%(levelname)s] %(message)s"
    logger = logging.getLogger('magic_logger')
    logger.setLevel(logging.DEBUG)
    handler = logging.StreamHandler()
    handler.setLevel(logging.DEBUG)
    formatter = logging.Formatter(format_string)
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    
    class AWSPOC(Task):
    
        def __init__(self, name: str, config_file: str):
            self.config_file = config_file
            super().__init__(name=name)
    
        def run(self):
            configuration = load_file(self.config_file)
            loader = Loader(configuration=configuration)
        logger.info("This message appears in Prefect log output")
            loader.run()
    
    executor = LocalDaskExecutor()
    host_config = {...}
    storage = S3(...)
    env = {
        "PREFECT__LOGGING__EXTRA_LOGGERS": "['magic_logger']"
    }
    docker_run_config = DockerRun(image=..., host_config=host_config, env=env)
    
    with Flow(name="AWS POC", executor=executor, storage=storage, run_config=docker_run_config) as flow:
        task = AWSPOC(name='...', config_file='...')
        task()
    
    flow.register(project_name='AWS POC')
    Where am I wrong? The only idea I have is that 'bear' should have its own named logger.
    33 replies · 2 participants
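    A likely fix, sketched under the assumption that 'bear' uses standard module-level loggers: register the library's own logger name in PREFECT__LOGGING__EXTRA_LOGGERS rather than a logger defined in the flow file, since Prefect attaches its handlers to the names listed there. The module layout below is illustrative.
    # bear/loader.py -- the library module (illustrative layout)
    import logging

    # getLogger(__name__) names this logger "bear.loader", which
    # propagates up to the top-level "bear" logger.
    logger = logging.getLogger(__name__)

    def load():
        logger.info("This message comes from inside bear")

    # prefect_flow.py -- point the env var at the library's logger name:
    # env = {"PREFECT__LOGGING__EXTRA_LOGGERS": "['bear']"}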
  • Vadym Dytyniak
    03/17/2022, 11:37 AM
    Hi. 1. Is it possible to specify a timeout for the wait_for_flow_run task? 2. Is it possible to use prefect.Client to cancel Prefect flows by name?
    17 replies · 3 participants
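    For reference, two hedged sketches assuming Prefect 1.x: every task accepts a timeout (in seconds), which can be overridden at call time via task_args, and the Client exposes cancel_flow_run. Looking runs up by flow name goes through a GraphQL query; the exact query shape below is an assumption about the server schema.
    from prefect import Flow
    from prefect.client import Client
    from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

    with Flow("orchestrator") as flow:
        run_id = create_flow_run(flow_name="child-flow")
        # task_args overrides Task-level settings for this call only.
        wait_for_flow_run(run_id, task_args={"timeout": 600})

    # Cancelling runs by flow name: query the run ids, then cancel each.
    client = Client()
    result = client.graphql(
        """
        query {
          flow_run(where: {flow: {name: {_eq: "child-flow"}}, state: {_eq: "Running"}}) {
            id
          }
        }
        """
    )
    for run in result.data.flow_run:
        client.cancel_flow_run(run.id)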
  • Daniel Nilsen
    03/17/2022, 12:00 PM
    I would like to connect my Prefect service, which is deployed to GCP Kubernetes Engine using the Helm chart, with my Postgres database hosted in GCP Cloud SQL. Is there any documentation or tutorial for doing this?
    4 replies · 3 participants
  • Vishnu
    03/17/2022, 3:03 PM
    Hello, I am wondering if it is possible to set up custom result classes with custom serialisers in Prefect 2.0?
    3 replies · 2 participants
  • Marc Lipoff
    03/17/2022, 3:58 PM
    I have an interesting situation. I'm pulling data from an API. I can pull only 100 items at a time (there are about 200k total). The API response gives me a "next url" that I can use in the next API request. However, that "next url" stays the same as I iterate (I assume it's some sort of stream). Once I get the results, I want to write them to a database. My non-Prefect way is to write a task that pulls from the API until there is no next_url (while has_next_url: ...). Is there a better "Prefect" way to do this? One downside of my basic approach is that if there is an error along the way (say at record 199,999 of 200,000), I lose what I had.
    10 replies · 3 participants
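    One way to keep the simple loop without losing progress, sketched below: commit each page to the database as it arrives (with an idempotent upsert, so a retry that re-reads a few items is harmless) instead of accumulating everything in memory. fetch_page and upsert_batch are hypothetical helpers.
    from datetime import timedelta

    import prefect
    from prefect import task

    @task(max_retries=3, retry_delay=timedelta(seconds=30))
    def extract_and_load(start_url: str) -> int:
        """Pull pages one at a time, committing each batch before the next fetch."""
        logger = prefect.context.get("logger")
        total, next_url = 0, start_url
        while next_url:
            items, next_url = fetch_page(next_url)  # hypothetical API helper
            upsert_batch(items)                     # hypothetical idempotent DB write
            total += len(items)
            logger.info("Committed %d items so far", total)
        return total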
  • Kevin Mullins
    03/17/2022, 4:43 PM
    Sorry if this has been asked/answered before. One thing I really miss from other orchestrators is a simple way to specify how concurrency for a flow should behave. In Airflow I believe this is controlled using a LatestOnlyOperator or depends_on_past. In Oozie (I know, not my favorite orchestrator), this could be achieved by setting concurrency to 1 and execution to FIFO or ONLYLAST. Are there any plans in Orion to provide similar functionality so users don't have to implement custom solutions to this common pattern?
    1 reply · 2 participants
  • Andrew Lane
    03/17/2022, 5:01 PM
    I'm struggling to wrap my head around how state signals interact with the retry_delay attribute of Task objects. Based on the description of state signals here, I'd expect to be able to raise signals.RETRY() and have the task wait retry_delay before the task is run again. However, the retries appear to be carried out immediately. Can someone explain where I'm going wrong in the wait_until_success task in the trailing code snippet?
    7 replies · 2 participants
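    For reference, in Prefect 1.x retry_delay is consulted when the runner retries a failed task, while a manually raised RETRY is rescheduled for its own start_time (immediately, unless one is passed). A minimal sketch, with condition_met as a hypothetical check:
    import pendulum
    from prefect import task
    from prefect.engine import signals

    @task
    def wait_until_success():
        if not condition_met():  # hypothetical check
            # A bare RETRY() reschedules immediately; pass start_time
            # explicitly to get a delay.
            raise signals.RETRY(
                "Not ready yet",
                start_time=pendulum.now("UTC").add(seconds=30),
            )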
  • Nico Neumann
    03/17/2022, 5:03 PM
    Hi, I want to change the schedule of a flow in my code. As an example I set flow.schedule = Schedule(clocks=[CronClock("* * * * *")]), which works fine, and later I want to add another schedule or delete the current one. My idea is to use GraphQL with the set_flow_group_schedule and delete_flow_group_schedule mutations.
    10 replies · 2 participants
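    A sketch of calling those mutations through prefect.Client; the input fields (flow_group_id, cron_clocks) are assumptions about the Cloud/Server schema, so it's worth confirming them in the Interactive API first:
    from prefect.client import Client

    client = Client()

    # Replace the flow group's schedule (assumed input shape).
    client.graphql(
        """
        mutation($id: UUID!) {
          set_flow_group_schedule(
            input: {flow_group_id: $id, cron_clocks: [{cron: "0 9 * * *"}]}
          ) {
            success
          }
        }
        """,
        variables={"id": "your-flow-group-id"},
    )

    # Remove the schedule entirely.
    client.graphql(
        """
        mutation($id: UUID!) {
          delete_flow_group_schedule(input: {flow_group_id: $id}) {
            success
          }
        }
        """,
        variables={"id": "your-flow-group-id"},
    )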
  • Chris Reuter
    03/17/2022, 5:50 PM
    @Chris White and I are going on PrefectLive starting in 10 minutes! We'll be playing with Prefect 2.0 and giving away Prefect branded socks in honor of our SOC 2 announcement - hope to see you all there.
    :upvote: 1
    :marvin: 3
    :prefect: 2
    🚀 1
    2 replies · 2 participants
  • Marc Lipoff
    03/17/2022, 6:11 PM
    What's the best way to retry a task only in certain scenarios? For example, I'm pulling from an API, and I only want to retry (after 30s) if I get a 429 code (rate limited).
    3 replies · 3 participants
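    One Prefect 1.x approach, sketched below: leave max_retries unset so ordinary errors fail the task, and raise the RETRY signal manually only for the rate-limit case. A manually raised RETRY retries regardless of the retry settings, so consider capping attempts if the API can stay limited for a long time.
    import pendulum
    import requests
    from prefect import task
    from prefect.engine import signals

    @task
    def fetch(url: str) -> dict:
        resp = requests.get(url)
        if resp.status_code == 429:
            # Rate limited: come back in 30 seconds. Any other error falls
            # through to raise_for_status() and fails the task normally.
            raise signals.RETRY(
                "Rate limited, retrying in 30s",
                start_time=pendulum.now("UTC").add(seconds=30),
            )
        resp.raise_for_status()
        return resp.json()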
  • Marvin
    03/17/2022, 6:55 PM
    Flow run from TwitchStream completed 🎉
    🤯 7
    :cool-llama: 8
    :ahhhhhhhhh: 6
    :marvin: 7
    :party-parrot: 4
  • Marvin
    03/17/2022, 6:56 PM
    Flow run from TwitchStream completed 🎉
  • Stéphan Taljaard
    03/17/2022, 6:56 PM
    crontab * * * * *
    😆
    :marvin: 2
  • Stéphan Taljaard
    03/17/2022, 7:16 PM
    For Prefect 2.0 - do you have some general guidelines for deciding whether I should decorate a function with task or flow (i.e. subflows)?
    1 reply · 2 participants
  • alex
    03/17/2022, 7:35 PM
    Hello, I have a flow on Prefect Cloud that is stuck in a pending state, even though I have an agent ready that was previously working. It says x slots used in the late runs section, even though the flow does not have any tags associated with the concurrency limit (I have some other flows that do). Any ideas on how I can resolve this issue?
    16 replies · 3 participants
  • Stéphan Taljaard
    03/17/2022, 7:55 PM
    5000 followers 🥳
    :marvin: 2
    🚀 3
  • Constantino Schillebeeckx
    03/17/2022, 7:56 PM
    I'd like to execute a GraphQL query with this kind of predicate:
    {parameters: {_contains: {"foo": "bar"}},
    but the syntax isn't quite right; I get the error
    Syntax Error: Expected Name, found String "date".
    Can someone help?
    9 replies · 3 participants
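    That error usually means JSON-style quoted keys were used inside the GraphQL document itself, where object keys must be bare names. A sketch of both fixes, assuming the flow_run table is the target:
    from prefect.client import Client

    client = Client()

    # Bare keys work inline when they are valid GraphQL names...
    client.graphql(
        """
        query {
          flow_run(where: {parameters: {_contains: {foo: "bar"}}}) {
            id
          }
        }
        """
    )

    # ...otherwise pass the JSON through a variable, where quoting is fine.
    client.graphql(
        """
        query($params: jsonb) {
          flow_run(where: {parameters: {_contains: $params}}) {
            id
          }
        }
        """,
        variables={"params": {"foo": "bar"}},
    )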
  • Philip MacMenamin
    03/17/2022, 7:58 PM
    Hi, what's the typical way you would run a ShellTask with an argument that's used in the command and is the return value of a previous task? e.g.
    with Flow(name='t') as f:
       a = task_a()
       b = shell_task(a)
    4 replies · 2 participants
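    ShellTask's command is a runtime argument, so it can be the return value of an upstream task. A minimal sketch assuming Prefect 1.x:
    from prefect import Flow, task
    from prefect.tasks.shell import ShellTask

    shell_task = ShellTask(return_all=True)

    @task
    def build_command() -> str:
        # Build the shell command from whatever the previous task produced.
        return "echo hello-from-upstream"

    with Flow(name="t") as f:
        cmd = build_command()
        # Passing command here wires build_command in as an upstream dependency.
        out = shell_task(command=cmd)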
  • Serge Tarkovski
    03/17/2022, 8:42 PM
    hi, it seems I've found a bug: after upgrading to 2.0b2, I get this when running even the simplest flow:
    $ python flow1.py 
    Traceback (most recent call last):
      File "/home/tarkovskyi/miniconda3/envs/prefect_exp/lib/python3.10/site-packages/prefect/client.py", line 257, in api_healthcheck
        await self._client.get("/health")
      File "/home/tarkovskyi/miniconda3/envs/prefect_exp/lib/python3.10/site-packages/prefect/utilities/httpx.py", line 102, in get
        return await self.request(
      File "/home/tarkovskyi/miniconda3/envs/prefect_exp/lib/python3.10/site-packages/prefect/utilities/httpx.py", line 47, in request
        request = self.build_request(
    TypeError: BaseClient.build_request() got an unexpected keyword argument 'extensions'
    9 replies · 3 participants
  • Jared Robbins
    03/17/2022, 9:01 PM
    For Prefect 2.0, is there some sort of hot-reload feature that will automatically create a new deployment when I change the file in the deployment location?
    5 replies · 2 participants
  • Jared Robbins
    03/17/2022, 9:16 PM
    What's the meta way to run one task if another task fails in 2.0? Pass the first task as a parameter to the dependent task and check its state there? From what I can tell, the wait_for keyword will only check whether the upstream is Finished, not whether it completed or failed.
    5 replies · 2 participants
  • Jacqueline Riley Garrahan
    03/17/2022, 9:19 PM
    Hello- I'm wondering if there is any reason behind delaying the conda-forge releases of 1.x? https://github.com/conda-forge/prefect-feedstock
    7 replies · 3 participants
  • Serge Tarkovski
    03/17/2022, 10:02 PM
    in 2.0, is there a way to see inputs and outputs of a task?
    8 replies · 2 participants
  • Darshan
    03/17/2022, 10:40 PM
    Hello, in Orion, how do I handle logging from Python files which are not part of the flow code? I usually use loguru for my logging but haven't been able to make it work seamlessly with Prefect logging. I would like both Prefect logging and app logging to sink to the same target (file/console) with a consistent format. Are there any good examples of this?
    2 replies · 3 participants
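    One commonly used bridge (a recipe from loguru's own docs) is to propagate loguru records into the standard logging tree, where Prefect's logging configuration can pick them up and apply its own format and sinks:
    import logging

    from loguru import logger as loguru_logger

    class PropagateHandler(logging.Handler):
        """Forward loguru records into the stdlib logging tree."""
        def emit(self, record: logging.LogRecord) -> None:
            logging.getLogger(record.name).handle(record)

    # Route everything loguru emits through standard logging, where
    # Prefect's handlers can see it.
    loguru_logger.add(PropagateHandler(), format="{message}")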
  • Brad
    03/18/2022, 3:07 AM
    Hey team - I'm trying to have a play with some work queues on a remote host, and I think something might not be right with the API endpoints.
    10 replies · 3 participants
  • lialzm
    03/18/2022, 6:56 AM
    Hello, I signed up for the cloud beta, but now my tasks are always pending.
    5 replies · 2 participants
  • Aaron Ash
    03/18/2022, 7:56 AM
    When registering flows from Python modules with the CLI, is it possible to override the executor and `run_config`s of the flows somehow?
    2 replies · 2 participants
  • Aaron Ash
    03/18/2022, 7:58 AM
    I have a bunch of Kubernetes flows for prod/staging, and I'm working on a local docker-compose environment for dev purposes; currently the local flows fail when they try to spin up the KubernetesRun() environments.
    1 reply · 2 participants
  • Aaron Ash
    03/18/2022, 7:59 AM
    Would a better way to do this be to write a simple Python script using the client API and override the executor and run_configs there?
    3 replies · 2 participants
Anna Geller

03/18/2022, 11:33 AM
I know it's all related to your one single larger question, but actually, I'm glad you created it in separate threads since it allows me to answer each question separately 😄

When you say you would like to override the executor and run config for dev vs prod, does it mean that your flow is exactly the same for both and those two (executor and run_config) are the only things that differ? If so, you could do something like what I did in this repo: https://github.com/anna-geller/prefect-dbt-k8s-snowflake/blob/master/flow_utilities/prefect_configs.py

Then, before you register your flow to a new environment, you could overwrite this single variable:
with Flow(
    FLOW_NAME,
    executor=LocalDaskExecutor(),
    storage=set_storage(FLOW_NAME),
    run_config=set_run_config(local=True),
) as flow:
For the executor, this is more tricky, since as I said before, it's retrieved from storage, but you can try doing something like:
with Flow(
        FLOW_NAME,
        executor=LocalDaskExecutor(),
        storage=set_storage(FLOW_NAME),
        run_config=set_run_config(),
) as flow:
    datasets = ["raw_customers", "raw_orders", "raw_payments"]
    dataframes = extract_and_load.map(datasets)
    
if __name__ == '__main__':
    # register for prod
    flow.register("prod_project")
    # register for dev
    flow.executor = LocalExecutor()
    flow.run_config = set_run_config(local=True) 
    flow.register("dev_project")
But I believe in the above the executor won't be respected, since __main__ is not evaluated at flow runtime. So probably your best bet is to define your main flow in one Python file, say `aaron_flow.py`, which defines your flow structure without defining the run config or executor:
with Flow("FLOW_NAME", storage=S3(), # just example
) as flow:
    datasets = ["raw_customers", "raw_orders"]
    dataframes = extract_and_load.map(datasets)
Then, you can have a file called `aaron_flow_dev.py`:
from aaron_flow import flow

flow.executor = LocalExecutor()
flow.run_config = KubernetesRun(image="some_dev_image")
and `aaron_flow_prod.py`:
from aaron_flow import flow

flow.executor = LocalDaskExecutor()
flow.run_config = KubernetesRun(image="some_prod_image")
and then you can register using the CLI without worrying about the run config and executor.
Aaron Ash

03/21/2022, 2:20 AM
Thanks again @Anna Geller
This approach with the separate *_dev.py modules looks like it's perfect for me
🙌 1