Powered by Linen
prefect-community

    Jacob Blanco

    09/04/2020, 9:02 AM
    Hey folks, wondering what the right approach is here. We are feeding the output of a task into a FilterTask, and somehow, if the upstream task fails, the FilterTask receives a signals.FAIL instead of the expected dict. The upstream task just raises signals.FAIL, but the FilterTask still runs and complains that
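    from prefect import task, Flow
    from prefect.engine import signals
    from prefect.tasks.control_flow.filter import FilterTask
    
    @task
    def some_task_that_can_fail(some_input):  # a mapped task receives one input per child
        if something_bad:
            raise signals.FAIL()
        return dict()
    
    # The comparison needs `==`; a single `=` inside a lambda is a SyntaxError.
    filter_out_stuff = FilterTask(filter_func=lambda config: config.get('some_parameter') == 'today')
    
    with Flow("filter-flow") as flow:  # hypothetical name; Flow requires one
        ...
        configurations = some_task_that_can_fail.map(some_inputs)
        useable_configs = filter_out_stuff(configurations)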
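As described above, the failed mapped children reach the FilterTask as exception objects (signals.FAIL) rather than dicts. A custom filter_func replaces the default one, so it has to handle those items itself; calling .get() on an exception is what breaks. A minimal sketch, assuming failed results arrive as exception instances, is a filter function that rejects non-dict results before checking the parameter. The name keep_config is hypothetical; it would be passed as FilterTask(filter_func=keep_config).

```python
def keep_config(config) -> bool:
    """Hypothetical filter_func: drop failed results, keep configs for 'today'."""
    # Failed mapped children arrive as exception instances (e.g. signals.FAIL),
    # which have no .get(), so reject them before touching dict methods.
    if isinstance(config, BaseException):
        return False
    # Anything else that is not a dict (e.g. None) is rejected as well.
    if not isinstance(config, dict):
        return False
    return config.get("some_parameter") == "today"


# Example using a stand-in exception instead of prefect's signals.FAIL.
results = [
    {"some_parameter": "today"},
    RuntimeError("upstream failed"),
    {"some_parameter": "yesterday"},
]
usable = [r for r in results if keep_config(r)]
print(usable)  # [{'some_parameter': 'today'}]
```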

    ARun

    09/04/2020, 3:00 PM
    Hey guys, how do I keep a database connection active throughout an entire flow? I keep getting a pickling error when I try to share/use the database connection within tasks:
    TypeError: cannot pickle '_thread.lock' object
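The error comes from Python's pickle module, which Prefect and Dask may use to move task results between workers. Objects that hold locks or sockets, as live database connections typically do, cannot be pickled. A common workaround, sketched here with the standard library's sqlite3 as a stand-in for the real database, is to pass a picklable connection string between tasks and open the connection inside each task that needs it. run_query and DB_PATH are hypothetical names; the design trades connection reuse for portability.

```python
import pickle
import sqlite3
import threading

# Reproduce the error: lock objects (held internally by many DB connections
# and clients) cannot be pickled.
try:
    pickle.dumps(threading.Lock())
    lock_error = ""
except TypeError as exc:
    lock_error = str(exc)
print(lock_error)  # cannot pickle '_thread.lock' object


def run_query(db_path: str, sql: str):
    """Open a short-lived connection inside the task, run one query, close it."""
    conn = sqlite3.connect(db_path)
    try:
        return conn.execute(sql).fetchall()
    finally:
        conn.close()


DB_PATH = ":memory:"  # hypothetical; a real flow would pass a DSN or file path
pickle.dumps(DB_PATH)  # a plain string pickles without problems
print(run_query(DB_PATH, "SELECT 1"))  # [(1,)]
```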

    Karthikeyan Rasipalayam Durairaj

    09/04/2020, 4:04 PM
    Does Prefect have the ability to integrate with Databricks, i.e., to orchestrate Databricks jobs? Please confirm.

    Chris Martin

    09/04/2020, 5:38 PM
    Hi - I'm trying to understand how ResourceManagers work. In your example you have something like:
    from dask.distributed import Client
    from prefect import resource_manager
    
    @resource_manager
    class DaskCluster:
        def __init__(self, n_workers):
            self.n_workers = n_workers
    
        def setup(self):
            return Client(n_workers=self.n_workers)
    
        def cleanup(self, client):
            client.close()
    In this case, how does the Client get passed to tasks? Is it pickled and sent around? Moreover, if the client held some mutable state that could be updated by the tasks, what would happen?
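On the mutable-state question, a general Python point applies: whenever an object crosses a process boundary (for example, to a Dask worker in another process), it is serialized, and the receiver works on its own copy, so its mutations are not seen by the original. Threads within one process can share the same object instead. The sketch below is a generic illustration of that behavior plus the setup/cleanup shape, not Prefect's resource manager implementation; managed_resource is a hypothetical name, and a plain dict stands in for the client.

```python
import pickle
from contextlib import contextmanager


@contextmanager
def managed_resource():
    """Generic setup/cleanup lifecycle (a stand-in, not Prefect's implementation)."""
    resource = {"n": 0, "closed": False}  # setup(): create the resource once
    try:
        yield resource
    finally:
        resource["closed"] = True  # cleanup(): runs even if the block raises


with managed_resource() as shared:
    # Simulate sending the resource to another worker process: it is
    # serialized and deserialized, so the worker gets an independent copy.
    remote_copy = pickle.loads(pickle.dumps(shared))
    remote_copy["n"] += 1  # a "task" mutates its copy
    print(shared["n"], remote_copy["n"])  # 0 1

print(shared["closed"], remote_copy["closed"])  # True False
```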

    Shaun Cutts

    09/04/2020, 5:44 PM
    I’m trying to start a cluster with a given worker image on dask-gateway, via Prefect. How should I specify the image pull secret for the image?
    from prefect.engine.executors import DaskExecutor
    
    executor = DaskExecutor(
        address="...",
        cluster_class="dask_gateway.GatewayCluster",
        cluster_kwargs={
            "image": "my/image/...",
            "image_pull_secret": "...",  # ???
        },
        adapt_kwargs={"minimum": 2, "maximum": 10},
    )

    Berty

    09/04/2020, 8:05 PM
    Is there a way to specify a "base_url" for the UI when running prefect server start?

    Shawn Marhanka

    09/04/2020, 9:27 PM
    Hi - has anyone had issues with a FlowRunTask not being added to a flow? I initially thought it was just because I was on Prefect Core, but even when registering with Prefect Cloud, the task doesn't show up in the schematic. I am also getting this warning each time…
    UserWarning: Tasks were created but not added to the flow: {<Task: Flo Run Task Flow>}. This can occur when `Task` classes, including `Parameters`, are instantiated inside a `with flow:` block but not added to the flow either explicitly or as the input to another task.
    Thank you

    dherincx

    09/06/2020, 4:01 AM
    Hey y'all. I'm trying to use the new depth-first execution feature in Prefect. How do I go about setting that up?

    John Ramirez

    09/06/2020, 5:35 PM
    Hey everyone! How do I add a new member to my team? I sent an invite, but it takes them to a login screen, not a sign-up screen. Does the person need to create a login first, before I invite them, like in GitHub?

    Sachit Shivam

    09/06/2020, 6:29 PM
    Hey guys, is it possible to connect to more than one Dask cluster as the executor?

    Thomas La Piana

    09/07/2020, 3:26 AM
    Does anyone have a way to register a Prefect flow from within a Docker container? It's using Docker as storage, so do I need to use a Docker-in-Docker image and then add Prefect to that? So far I'm just doing it locally on my machine, and other people are running into dependency problems, of course.

    Nelson

    09/07/2020, 9:33 AM
    Hi all. I am using the functional API, and when using the same task multiple times, I cannot distinguish the calls in the schematic (example in the image, for upload_dataframe... for various tables). Opening the tasks also doesn't provide any info on their inputs to distinguish the different calls. Is there something I am missing that can help here?

    Sven Teresniak

    09/07/2020, 11:10 AM
    A task's retry timeout is blocking other tasks from running. Is this on purpose? While the retry timeout is blocking other runnable tasks, their task timeout (running time) keeps counting. This seems a bit odd. Or did I miss something? In other words: when a task is waiting for a retry, other tasks cannot use that worker (a dask-worker in my case), but the waiting task's clock is running towards its runtime timeout. Okay, maybe I was wrong and the waiting task is in state "Pending". I may have misinterpreted the message
    /usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py:822: UserWarning: This task is running in a daemonic subprocess; consequently Prefect can only enforce a soft timeout limit, i.e., if your Task reaches its timeout limit it will enter a TimedOut state but continue running in the background.
    😞

    Robin

    09/07/2020, 12:50 PM
    Hey all, I am getting Unexpected error: NoRegionError('You must specify a region.'), although I added AWS_DEFAULT_REGION (along with AWS_CREDENTIALS) to the Prefect secrets. Am I missing anything?

    as

    09/07/2020, 1:38 PM
    Hey, I have a question about using the target parameter of tasks for checkpointing. If I'm correct, it will not run the task if it can find a file with the name specified in the target parameter, correct? I'm wondering what happens if an upstream task has been triggered. Does it still look for the target file, or does the task run nonetheless because the input has changed? If not, is there a way to achieve this behavior? Thanks
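If the target check only looks at whether the file already exists, changed upstream inputs would not trigger a rerun on their own unless the target name changes along with them. Prefect's target is a template string, so one option may be to template input-dependent values into it; which values are available for templating depends on the Prefect version, so treat that as an assumption. The sketch below is a generic, standard-library illustration of that idea, not Prefect's implementation: the target filename includes a hash of the inputs, so identical inputs are skipped and new inputs run. target_for, run_with_target, and double are hypothetical names.

```python
import hashlib
import json
import os
import tempfile


def target_for(task_name: str, inputs: dict) -> str:
    """Build a target filename that changes whenever the inputs change."""
    digest = hashlib.sha256(json.dumps(inputs, sort_keys=True).encode()).hexdigest()[:12]
    return f"{task_name}-{digest}.json"


def run_with_target(task_fn, task_name: str, inputs: dict, directory: str):
    """Skip task_fn if its target file exists; otherwise run it and write the result."""
    path = os.path.join(directory, target_for(task_name, inputs))
    if os.path.exists(path):
        with open(path) as f:
            return json.load(f), False  # cached result, task skipped
    result = task_fn(**inputs)
    with open(path, "w") as f:
        json.dump(result, f)
    return result, True  # task actually ran


def double(x):
    return x * 2


with tempfile.TemporaryDirectory() as d:
    first = run_with_target(double, "double", {"x": 2}, d)   # runs
    second = run_with_target(double, "double", {"x": 2}, d)  # same inputs: skipped
    third = run_with_target(double, "double", {"x": 3}, d)   # new inputs: runs
print(first, second, third)  # (4, True) (4, False) (6, True)
```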

    Nuno Silva

    09/07/2020, 3:31 PM
    Hi. Quick question: starting the agent from the command line works fine when specifying the server URL
    prefect agent start --api http://<url>:4200
    but doing the same in code doesn't:
    from prefect.agent.local import LocalAgent
    agent = LocalAgent(agent_address="http://<url>:4200")
    agent.start()
    It fails with:
    /tornado/netutil.py", line 174, in bind_sockets
        sock.bind(sockaddr)
    OSError: [Errno 99] Cannot assign requested address
    Any ideas? Thanks

    itay livni

    09/07/2020, 9:31 PM
    Hi - is apply_map the only way to implement a mapped case statement? Thanks

    itay livni

    09/08/2020, 4:16 AM
    https://prefect-community.slack.com/archives/CL09KU1K7/p1599529831359600?thread_ts=1599514262.359500&amp;cid=CL09KU1K7

    Thomas Hoeck

    09/08/2020, 7:17 AM
    Hi. I keep getting flows failing with HTTPSConnectionPool(host='10.0.0.1', port=443): Read timed out. (read timeout=None). I'm running a Kubernetes agent. What is weird is that it does not happen every time, but at a regular frequency (the pictures show how the same flow fails only every second time).

    Sven Teresniak

    09/08/2020, 7:37 AM
    What's the most elegant/preferred way to synchronize flow runs? Using/setting tags as locks and polling Prefect for runs with or without those tags to decide whether to delay a flow run? I have more than one trigger for a certain flow, and I need every run to be "atomic": running several times is okay, but never in parallel.
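The "several times is fine, never in parallel" requirement is mutual exclusion. A minimal, host-local sketch (not a Prefect feature; it only works when all runs share one filesystem) uses an exclusively created lock file: a run that cannot create the file knows another run is active and can wait or exit. acquire_lock, release_lock, and the lock path are hypothetical names.

```python
import os
import tempfile


def acquire_lock(path: str) -> bool:
    """Atomically create the lock file; return False if another run holds it."""
    try:
        # O_EXCL makes creation fail if the file already exists.
        fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        return False
    with os.fdopen(fd, "w") as f:
        f.write(str(os.getpid()))  # record the holder, useful for debugging
    return True


def release_lock(path: str) -> None:
    """Remove the lock file so the next run can proceed."""
    os.remove(path)


lock_path = os.path.join(tempfile.mkdtemp(), "my_flow.lock")  # hypothetical path
first = acquire_lock(lock_path)   # this run owns the lock
second = acquire_lock(lock_path)  # a concurrent run is refused
release_lock(lock_path)
third = acquire_lock(lock_path)   # available again after release
release_lock(lock_path)
print(first, second, third)  # True False True
```

A run that crashes before calling release_lock leaves a stale lock behind, so a real setup would need a timeout or cleanup policy for that case.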

    Nate Joselson

    09/08/2020, 7:47 AM
    Hey all! I am trying to run a DbtShellTask with a dynamic set of dbt_kwargs that are defined at run time. The task works when I define the dbt_kwargs in the task initialization, but not when they are provided at runtime.
    from prefect import task, Flow, Parameter
    from prefect.tasks.dbt import DbtShellTask
    
    @task
    def generate_dbt_kwargs_dict(project_id):
        dbt_kwargs_dict = {
            'type': 'bigquery',
            'method': 'service-account',
            'keyfile': 'key.json',
            'project': project_id,
            'dataset': 'derived_data',
            'location': 'EU',
        }
        return dbt_kwargs_dict
    
    # profiles_path is defined elsewhere in the original script.
    dbt_run_task = DbtShellTask(
        name='dbt_run_task',
        command='dbt run',
        profile_name='default',
        environment='test',
        overwrite_profiles=True,
        profiles_dir=profiles_path,
    )
    
    with Flow(name="dbt_flow") as f:
        project_id = Parameter("project_id")
        dbt_kwargs_dict = generate_dbt_kwargs_dict(project_id)
        run_task = dbt_run_task(dbt_kwargs=dbt_kwargs_dict)
    
    out = f.run(project_id='bq-project')
    This returns a Prefect error:
    ERROR - prefect.TaskRunner | Unexpected error: TypeError("'NoneType' object is not a mapping",)
    I assume this means that the dbt_kwargs dictionary is None, but I can’t see why, since I am passing the dictionary from the previous task… Does anyone have an idea about this? Thanks!
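For context, "'NoneType' object is not a mapping" is the message Python gives when None is unpacked with ** into a dict, which fits the guess that a kwargs dict is None at the point where it gets merged. The log does not show which line inside the task raises, so this is not a diagnosis of DbtShellTask itself. The sketch reproduces the message and shows a defensive way to merge init-time and run-time kwargs; merge_kwargs is a hypothetical helper.

```python
def merge_kwargs(init_kwargs=None, runtime_kwargs=None) -> dict:
    """Hypothetical helper: runtime values override init-time values; None counts as empty."""
    return {**(init_kwargs or {}), **(runtime_kwargs or {})}


# Reproduce the message from the log: unpacking None with ** fails.
maybe_kwargs = None
error_message = ""
try:
    {**maybe_kwargs}
except TypeError as exc:
    error_message = str(exc)
print(error_message)  # 'NoneType' object is not a mapping

merged = merge_kwargs({"type": "bigquery"}, {"project": "bq-project"})
print(merged)  # {'type': 'bigquery', 'project': 'bq-project'}
print(merge_kwargs(None, None))  # {}
```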

    Georg Zangl

    09/08/2020, 5:17 PM
    Dear Team, my name is Georg. I'm a petroleum engineer, but I have worked for some time with data management, data analytics, and workflow automation. I am not a software developer, more of an advanced user. I am having trouble installing Prefect on a clean Ubuntu 18.04 VM, and I think I am not alone (btw: I installed 0.12 previously, almost effortlessly). Potential clients (like me) are struggling for days to install Prefect, which should be a matter of a few minutes, half an hour at most. I found out that some development packages (GCC and python3-dev) are necessary for a successful pip install, but the install then failed with a permission error in my virtual environment. I hit a wall - no way to resolve this. So, back to the drawing board. I tried pipenv; it didn't work. Then I tried without a virtual environment, with the result: prefect: command not found. Guys, if you want this software to be a success, you need a deployment manager who makes sure Prefect can be deployed on a clean machine without any trouble. You are losing clients before you even get them. Now, CAN ANYBODY PLEASE, PLEASE HELP ME INSTALL PREFECT ON A CLEAN UBUNTU 18.04?

    Richard Hughes

    09/08/2020, 7:47 PM
    Hi all, I am looking for how to run a flow using the Prefect version_group_id. Where is the documentation for this?

    bral

    09/08/2020, 7:50 PM
    Hello! Is it currently possible to edit flows as a group? The project has 10+ flows and I want to turn off their schedules via the UI. I am using version 0.13.1. Is there such a possibility in newer versions?

    matta

    09/09/2020, 1:56 AM
    Cool talk on APL, kinda the precursor to Pandas.

    https://www.youtube.com/watch?v=TT8Ub1Xs0dw


    ARun

    09/09/2020, 2:17 PM
    Hey guys, I'm still new to Prefect and trying to figure it out. I'm testing a simple task. I have a couple dozen SQL files, and each file contains multiple SQL statements. I want all the files to be executed against the database at once, in parallel. I tried multiple different ways, first the Dask executor example and now apply_map. No matter what I do, on Prefect Cloud everything runs one by one. What am I missing? My current attempt:
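    import os
    
    from prefect import task, Flow, apply_map
    
    # `path` (a directory of .sql files) and `query` (a task that executes SQL)
    # are defined elsewhere in the original script.
    files = [os.path.join(path, file) for file in os.listdir(path)]
    
    @task
    def run_file(file):
        with open(file) as f:
            sql = ""
            # Iterate over every line; an initial f.readline() here would
            # silently drop the first line of each file.
            for line in f:
                sql += line
                # Execute once a complete statement (ending in ";") has been read.
                if line.rstrip().endswith(";"):
                    query.run(query=sql)
                    sql = ""
    
    with Flow("parallelfiles") as flow:
        apply_map(run_file, files)
    
    flow.register(project_name="test")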
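Mapped tasks only run concurrently when the flow uses a parallel executor; Prefect's default LocalExecutor runs them one at a time. Independent of Prefect, the sketch below shows the intended behavior with the standard library: split each file's contents into statements on a trailing ";" and process several files concurrently with concurrent.futures. split_statements, run_files_in_parallel, and the execute callback are hypothetical; in a real setup execute would send the SQL to the database.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, List


def split_statements(text: str) -> List[str]:
    """Group lines into statements, closing one whenever a line ends with ';'."""
    statements, current = [], ""
    for line in text.splitlines(keepends=True):
        current += line
        if line.rstrip().endswith(";"):
            statements.append(current.strip())
            current = ""
    return statements


def run_files_in_parallel(
    contents: Iterable[str], execute: Callable[[str], None], max_workers: int = 8
) -> List[int]:
    """Run each file's statements in order, with files processed concurrently."""

    def run_one(text: str) -> int:
        stmts = split_statements(text)
        for stmt in stmts:
            execute(stmt)  # statements within one file stay sequential
        return len(stmts)

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map returns results in input order, even though work overlaps.
        return list(pool.map(run_one, contents))


file_contents = ["CREATE TABLE a (x INT);\nINSERT INTO a\nVALUES (1);\n", "SELECT 1;\n"]
executed = []
counts = run_files_in_parallel(file_contents, executed.append)
print(counts)  # [2, 1]
```

In Prefect 0.13, a parallel executor such as LocalDaskExecutor or DaskExecutor is what lets mapped children run side by side; how the executor is attached to a flow registered with Cloud differs between versions, so the docs for the installed version are the place to confirm it.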

    Matt Wong-Kemp

    09/09/2020, 2:57 PM
    Hey there - I've got a flow that the UI thinks has been killed by the Zombie Killer for not heartbeating, but the flow itself is still running in Kubernetes and updating tasks. Is there any way to get the UI into a happy state again? Happy to share any UI links if that's useful.

    Richard Hughes

    09/09/2020, 3:00 PM
    Hi all, when you publish the next version of a flow, should the scheduled flow runs from the previous version be disabled?

    Richard Hughes

    09/09/2020, 3:01 PM

    Jeremiah

    09/09/2020, 4:24 PM
    @james.lamb has the honor of being the first contest entry via GitHub: https://github.com/PrefectHQ/prefect/issues/2880#issuecomment-689670701