prefect-community
  • h

    Hui Zheng

    12/04/2020, 7:50 PM
    Hello, I am implementing timeouts for one of my flows. I have a few questions. 1. How do I get the flow to enter the TimedOut state when a timeout happens to a task? https://docs.prefect.io/api/latest/engine/state.html#timedout It seems there is only signals.FAIL and no signals.TIMEOUT. 2. The flow DAG is A > B > C > D > E, where E always runs with trigger=triggers.always_run. When a timeout happens to task A, B, C, or D, the task raises a TimeoutError exception, which is not a FAIL signal, and that causes the flow to terminate without running E properly. To overcome that, I currently have to implement the exception handler in every task, which is very tedious. Is there a better way to do it?
        try:
            ...  # task body
        except TimeoutError:
            raise signals.FAIL("Task Timeout.")
    👀 1
    n
    7 replies · 2 participants
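    One way to avoid repeating that handler in every task, sketched under the assumption that these are plain function tasks on Prefect 0.x, is a small decorator that converts TimeoutError into a FAIL signal once instead of in every task body:
        import functools

        from prefect import task
        from prefect.engine import signals

        def fail_on_timeout(fn):
            # hypothetical helper, not part of Prefect: re-raise
            # TimeoutError from the task body as a FAIL signal
            @functools.wraps(fn)
            def wrapper(*args, **kwargs):
                try:
                    return fn(*args, **kwargs)
                except TimeoutError:
                    raise signals.FAIL("Task Timeout.")
            return wrapper

        @task
        @fail_on_timeout
        def my_task():
            ...  # task body that may raise TimeoutError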
  • m

    Mark McDonald

    12/04/2020, 8:01 PM
    Happy Friday Prefect community! I just wanted to share this delicious flow run we recently had. I hope you're all loving Prefect as much as I am!
    😂 2
    🍗 7
    😄 7
    j
    1 reply · 2 participants
  • h

    Hui Zheng

    12/04/2020, 8:44 PM
    Hello, I ran into another issue with the timeout setting. I would like to set a default timeout for all tasks. I should be able to do it through the environment variable
    PREFECT__TASKS__DEFAULTS__TIMEOUT
    according to this doc: https://docs.prefect.io/core/concepts/configuration.html#environment-variables
    However, when I do, I get an error:
    TypeError: Only integer timeouts (representing seconds) are supported.
    even when I assign an integer value. (See thread for details)
    m
    3 replies · 2 participants
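    Environment variables always arrive as strings, so if the Prefect version in use does not cast this particular setting to int (an assumption about the cause here), a workaround is to pass the timeout directly to the task decorator, which takes an integer number of seconds:
        from prefect import task

        @task(timeout=60)  # seconds; applies only to this task
        def slow_task():
            ...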
  • d

    David Kuda

    12/04/2020, 9:58 PM
    Hello again! When I set flow storage to GitHub, I don’t understand what the desired behaviour is supposed to be. 1. Does that mean that I do not need to keep my files locally? In the sense that Prefect automatically creates a temporary directory, pulls the scripts from the given repository with the credentials, executes the scripts, and then deletes them again? 2. Or do I still need to keep the files locally? Does Prefect then pull the tasks automatically? As far as I understand, I do not need to re-register the flow, but I can change the tasks and then push them. Plus, I seem to have encountered a bug in the tutorial here: https://docs.prefect.io/core/idioms/file-based.html -> At the very end of the very first example, I ran into an error. I follow everything until I need to register the flow with this command:
    prefect register flow -f flows/my_flow.py -p MyProject
    This results in this error:
    ValueError: Flow could not be deserialized successfully. Error was: ValidationError({'storage': {'ref': ['Field may not be null.']}})
    Best wishes from Berlin!
    👀 1
    n
    2 replies · 2 participants
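    For reference, a minimal sketch of GitHub storage as of Prefect 0.14.x (the module path and the repo/path values are assumptions; in earlier releases the class lived under prefect.environments.storage). With script-based storage the agent pulls the named file from the repository at run time, so the file does not need to stay on the local disk; re-registration is only needed when the flow's structure changes:
        from prefect import Flow
        from prefect.storage import GitHub

        flow = Flow("my-flow")
        flow.storage = GitHub(
            repo="my-org/my-repo",    # hypothetical repository
            path="flows/my_flow.py",  # file pulled at run time
        )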
  • r

    Riley Hun

    12/05/2020, 9:35 PM
    Hi, I have deployed Prefect Server on K8s on GCP using the Prefect Server helm chart. Unfortunately, I'm experiencing some trouble with the built-in agent that comes with the deployment. However, when I run
    prefect agent kubernetes start
    from outside the cluster, within my own local machine's CLI, the error goes away and the flow runs successfully. Some insight on this would be much appreciated. I can confirm that the Prefect version of the deployed agent is correct, too. Here's the error I'm getting from the built-in agent that comes with the helm chart deployment:
    [2020-12-05 10:30:03+0000] ERROR - prefect.CloudTaskRunner | Failed to set task state with error: ConnectionError(MaxRetryError("HTTPConnectionPool(host='prefect-server-gke-apollo.default', port=4200): Max retries exceeded with url: /graphql//graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f5a560d3690>: Failed to establish a new connection: [Errno -2] Name or service not known'))"))
    Traceback (most recent call last):
      File "/opt/conda/lib/python3.7/site-packages/urllib3/connection.py", line 157, in _new_conn
        (self._dns_host, self.port), self.timeout, **extra_kw
      File "/opt/conda/lib/python3.7/site-packages/urllib3/util/connection.py", line 61, in create_connection
        for res in socket.getaddrinfo(host, port, family, socket.SOCK_STREAM):
      File "/opt/conda/lib/python3.7/socket.py", line 752, in getaddrinfo
        for res in _socket.getaddrinfo(host, port, family, type, proto, flags):
    socket.gaierror: [Errno -2] Name or service not known
    m
    4 replies · 2 participants
  • v

    Von Christian Pizarro

    12/07/2020, 4:05 AM
    Hi, we have noticed that some of our flows are running multiple times even though they are scheduled to run only once a day. The odd thing is that the duplicate runs happen at exactly the same time. Does anyone have similar issues? Also, I saw this https://github.com/PrefectHQ/prefect/issues/3758 and just wanted to know if anyone knows whether it has been resolved. Thank you!
    d
    j
    5 replies · 3 participants
  • k

    Kostas Chalikias

    12/07/2020, 10:26 AM
    Hello team, is it still not possible to kill a running task/flow from the cloud UI?
    j
    5 replies · 2 participants
  • a

    Adam

    12/07/2020, 2:28 PM
    Hello friends, happy Monday. I’m getting a strange error in Prefect:
    Failed to set task state with error: HTTPError('413 Client Error: Request Entity Too Large for url: https://api.prefect.io/graphql')
    What's the best way to debug this?
    c
    19 replies · 2 participants
  • c

    Charles Lariviere

    12/07/2020, 3:52 PM
    Hey 👋 I’m trying to figure out how to properly deploy Prefect on AWS ECS/Fargate such that we always have an agent running, ready to pick up any scheduled flow. I was able to execute
    prefect agent fargate start -t $PREFECT_TOKEN
    locally, which appeared to work (it successfully started and was waiting for flows), but: 1. It never received any flows triggered from Prefect Cloud. 2. It seems to be tied to my local terminal session (i.e. if I close my terminal, the agent shuts down). 3. I can’t see where this is actually deployed; I was expecting a new cluster to be created in AWS ECS, but that’s not the case. Am I misunderstanding how this is supposed to work? If so, how can we go about deploying an always-running agent that’s ready to launch new Fargate tasks whenever a flow is scheduled? Or is this not how Prefect is supposed to work either?
    j
    m
    +1
    8 replies · 4 participants
  • b

    Brian Mesick

    12/07/2020, 4:27 PM
    Hi folks, we’ve got some older flows that I’d like to get current with how Prefect works now, but I’m not entirely sure I understand how to fit our old paradigm into the new world. I’ll thread in because this might be long…
    k
    6 replies · 2 participants
  • s

    Sean Talia

    12/07/2020, 5:42 PM
    After having combed through a good amount of documentation and a handful of threads in this channel, I'm having a hard time understanding what an appropriate use case for a Docker Task is. It seems like if you want to string together a flow that pulls an arbitrary set of docker images and runs them as containers in some sequence, the canonical way to do this is to make each instance of pulling + running a container its own Flow (that leverages Docker storage), and then to orchestrate a flow of flows. Given this, in what circumstance would you actually want to use a Docker Task? On the face of it, if you wanted to use one, it seems like you'd have to use some form of storage other than Docker for the task's parent flow to avoid a Docker-in-Docker situation; and I think not being able to use Docker storage also implies that you wouldn't be able to use a Docker agent for executing that flow + task? Does that sound right, or am I confusing the intended relationship between agent <-> flow <-> task?
    k
    1 reply · 2 participants
  • r

    Richard Hughes

    12/07/2020, 6:01 PM
    Hi Community, I have a question about the prefect agents. If I have a prefect agent running on a machine can it run concurrent flows or does prefect need to have 2 agents running simultaneously to have concurrent flows running?
    j
    5 replies · 2 participants
  • l

    liren zhang

    12/07/2020, 7:16 PM
    Hi Community, we have used GraphQL to extract Prefect logs. The initial approach was just getting a list of flows, flow runs, and task runs and iterating through the list. Obviously, that is the least performant way of extracting data from a database. We also started reviewing the batching and caching techniques that Facebook uses with their DataLoader. In the meantime, I am hoping to check in with the community experts and see if there is an easier way to extract the logs without even going through the GraphQL interface. Any info would be much appreciated.
    m
    12 replies · 2 participants
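    If GraphQL stays in the picture, one round trip per flow run is usually enough, since logs hang off the flow run in the schema. A sketch with the Prefect 0.x Python client (field names are assumptions to check against the interactive API schema, and the id is a placeholder):
        from prefect import Client

        client = Client()
        result = client.graphql(
            """
            query {
              flow_run(where: {id: {_eq: "<flow-run-id>"}}) {
                logs { timestamp level message }
              }
            }
            """
        )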
  • g

    Gary Levin

    12/07/2020, 9:44 PM
    Just arrived!
    👋 2
  • g

    Gary Levin

    12/07/2020, 9:54 PM
    I'm looking to see whether there is a definition of the functional API that can be used in the Flow context in a with statement. I was wondering why
    with Flow('Arithmetic') as flow:
        x, y = Parameter('x'), Parameter('y')
        operations = {
            '+': x + y,
            '-': x - y,
            '*': x * y,
            '/': x / y
        }
    resulted in operations containing {'+': <Task: Add>, '-': <Task: Sub>, '*': <Task: Mul>, '/': <Task: Div>}. That is not what Python would normally produce for the dict: "x+y" is coerced to <Task: Add>, so "+" must be overridden in the Flow context. Where will I find the description of what magic is going on here? For instance, which operators are overridden?
    👀 1
    m
    3 replies · 2 participants
  • g

    Gary Levin

    12/07/2020, 10:01 PM
    https://docs.prefect.io/core/concepts/tasks.html#operators indicates that this happens, but not which operators are available. I am guessing that + is defined as something like this in the Flow context:
    lambda x, y: Task("Add", lambda: x.value + y.value)
    k
    4 replies · 2 participants
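    For what it's worth, the mechanism sketched from the Prefect 0.x source (paraphrased, so treat the details as assumptions): Task overrides Python's arithmetic and comparison dunders, and each one builds the matching operator task from prefect.tasks.core.operators and binds the operands as its upstream dependencies, roughly:
        # paraphrase of Task.__add__ in prefect/core/task.py (0.x)
        class Task:
            def __add__(self, other):
                from prefect.tasks.core.operators import Add
                # Add is itself a Task; bind() registers self and other
                # as its inputs in the active Flow context
                return Add().bind(self, other)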
  • p

    Pedro Machado

    12/08/2020, 3:58 AM
    Hi. Is it possible to run some flows with a concurrency limit and others without the limit using the same agent? It seems that if I start the agent with the label that has the concurrency limit, it won't pick up flows that don't have that label.
    👀 1
    c
    2 replies · 2 participants
  • p

    Pedro Machado

    12/08/2020, 5:46 AM
    Hi. I am noticing an inconsistency between the run time shown on the Flow page and that shown in the Flow run page. Screenshots and flow run ID in the thread.
    j
    m
    7 replies · 3 participants
  • s

    simone

    12/08/2020, 12:41 PM
    Hi! I am running Prefect on an on-prem HTC cluster using HTCondor as the workload manager and a DaskExecutor connected to a cluster started with dask-jobqueue. I am mapping a function over a large number of images (20000). I started by running a subset of the data (5000) using a predefined number of workers, and the function is mapped and run correctly using all the available workers. If the cluster is started in adaptive mode using dask-jobqueue, Prefect is able to increase the number of running processes, as expected in adaptive mode and as monitored in the workload manager; however, the size of the dask cluster doesn't change and the function isn't mapped, not even to the minimum number of workers predefined for the cluster. Interestingly, HTCondor allocates new running processes, but they seem to be independent of the dask cluster. It seems that the cluster initialised by dask-jobqueue cannot communicate with the processes started by Prefect. After a few minutes, the processes started by Prefect die, printing this error:
    distributed.scheduler - ERROR - Workers don't have promised key: ['tcp://192.168.0.2:40093']
    # the tcp address changes
    Any help will be greatly appreciated! Thanks!
    j
    7 replies · 2 participants
  • m

    Mark McDonald

    12/08/2020, 3:16 PM
    Hi - we're seeing some duplicate scheduled flow runs in Prefect cloud. For example, this flow is supposed to run once a day, but we're getting duplicates.
    j
    c
    16 replies · 3 participants
  • l

    liren zhang

    12/08/2020, 3:35 PM
    Hi, is there a Prefect documentation page that lists all the supported Windows, Linux, and macOS platform versions? It is kind of hard to go through trial and error to determine whether Prefect will work smoothly with a specific version of an OS.
    j
    3 replies · 2 participants
  • z

    Zach

    12/08/2020, 6:29 PM
    Is there a way to run a task on a specific docker image, and not have every task in a run operate within the same docker container?
  • z

    Zach

    12/08/2020, 6:39 PM
    I am reading in the Prefect docs here that you can pull an image, start a container with a command, wait for the container to finish running, then stop the container. That is 4 tasks, but it would be really nice if I could just add a docker image name as an argument to the task decorator.
    j
    10 replies · 2 participants
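    The four-step pattern described above, sketched with the docker tasks that ship in Prefect 0.x (parameter names are assumptions; check the task library reference):
        from prefect import Flow
        from prefect.tasks.docker import (
            CreateContainer, PullImage, StartContainer, WaitOnContainer
        )

        with Flow("docker-steps") as flow:
            # each step is its own task; downstream tasks receive the
            # container id produced by CreateContainer
            image = PullImage(repository="python", tag="3.8-slim")()
            container = CreateContainer(
                image_name="python:3.8-slim",
                command="python -c 'print(42)'",
            )()
            container.set_upstream(image)
            started = StartContainer(container_id=container)()
            done = WaitOnContainer(container_id=container)()
            done.set_upstream(started)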
  • s

    Sergey Gerasimov

    12/08/2020, 6:58 PM
    Hi guys, I have the following task to be automated. There are a lot of CREATE TABLE SQL scripts with dependencies (in FROM / JOIN) between the tables to be created and other long-living tables. These scripts are partially re-used from project to project, so in one project we use some part of these scripts, and in another project another part. In each project we have some existing table which is used as input for these scripts (in the FROM / JOIN part), and this codebase grows. It looks like this codebase can be automatically (via a SQL parser) represented as a DAG, and in each project we can deal with a subDAG parametrized by the initial table. Is it a good idea to use Prefect as a backend for modeling and executing such DAGs? We already implemented a prototype with Makefiles and actively use the ability of GNU make to track dependencies between a script's modification time and a table's creation time. It makes it possible to do only the necessary work if some SQL scripts are modified. As I understand it, such a time-based execution model is not implemented in Prefect. Am I right? Is it a good idea to implement it manually? Does Prefect have some support for Oracle databases for results? Or should I look at other solutions?
    j
    2 replies · 2 participants
  • e

    Emil B

    12/08/2020, 7:39 PM
    Hi, looking at https://www.prefect.io/get-prefect/#pricing: does "2 concurrent flow runs" (the Team plan) mean that only 2 flows can run at the same time? I'm aware that the question is self-explanatory; however, I'm a bit baffled that such a constraint is in place. Am I missing something?
    c
    k
    3 replies · 3 participants
  • d

    DJ Erraballi

    12/08/2020, 10:07 PM
    Hi there, I am running into some memory utilization issues with some of my Prefect flows, but am struggling to figure out why memory isn't being freed up or released between tasks in a specific flow. In the first graph here I have memory utilization for a specific flow (from CloudWatch metrics). As you can see, memory steadily increases from 12:22 to 12:46 and then stays level until the flow completes. In the second attachment I have the flow and task durations during the flow. We can see that memory utilization steadily increases during the link_patient_events run, but after the task is completed the memory is never freed up. The flow in question is defined as follows, so the link_patient_events task does not return anything other than a boolean: (Using Prefect v0.11.5)
    with SRMFlow(
            'TriggeredPatientLoaderFlow',
            feature_flag='patient_upsert_events_enabled'
    ) as trigger_patient_loader_flow:
        client_abbrev: str = Parameter("client_abbrev", default="")
        event_source_id: int = Parameter("event_source_id", default=None)
    
        lock_acquired: bool = acquire_event_source_lock(client_abbrev, event_source_id)
        linked: bool = link_patient_events(client_abbrev, event_source_id, lock_acquired)
        aggregated_patients: List[PersonInputProperties] = aggregate_patients(client_abbrev, linked, event_source_id)
        bulk_activity_metadata: dict = get_bulk_activity_metadata(client_abbrev, aggregated_patients)
        loaded: bool = load_aggregated_patients(client_abbrev, aggregated_patients, bulk_activity_metadata)
        marked_complete: bool = mark_event_source_as_completed(client_abbrev, event_source_id, loaded, lock_acquired)
        lock_released: bool = release_event_source_lock(client_abbrev, event_source_id, marked_complete, lock_acquired)
    c
    j
    4 replies · 3 participants
  • j

    jack

    12/09/2020, 12:00 AM
    Is there a way to specify S3 storage option via config file (and avoid having to specify the bucket explicitly in each flow)?
    c
    1 reply · 2 participants
  • e

    Eric

    12/09/2020, 1:28 AM
    Hi Prefect, after using graphql to update the flow name:
    mutation {
      update_flow (_set: {name: "After Modified"}, where: {_and: {project_id: {_eq:"ea8106f2-1642-4e54-955a-3af1d7c7465e"}, name: {_eq: "Ori Flow Name"}} }){
        affected_rows
      }
    }
    it returned success and affected_rows=5 (5 different versions), but when I click 'start now' to create a new flow-run, it shows that:
    Last State Message
    [9 Dec 2020 9:26am]: Failed to load and execute Flow's environment: KeyError('After Modified')
    Is there any step I missed? I just want to modify the flow name after I registered the flow. Thank you🙏
    c
    3 replies · 2 participants
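    One possible explanation (an assumption, not a confirmed diagnosis): the storage metadata written at registration still indexes the flow under its original name, so the renamed record can no longer be matched to its stored flow. Renaming in code and re-registering avoids the mismatch:
        flow.name = "After Modified"
        flow.register(project_name="my-project")  # hypothetical project name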
  • h

    Hamed Sheykhlou

    12/09/2020, 8:48 AM
    Hi, I want to use SecretResult in my project. For learning purposes, I wrote an example project.
    import requests
    from prefect import Flow, task
    
    from prefect.engine.results import SecretResult
    
    
    @task(name='extract_task')
    def extract():
        res = requests.get('https://jsonplaceholder.typicode.com/todos/1')
        res_txt = res.text
        print(res_txt)
        return res_txt
    
    
    @task()
    def load():
        """Print the data to indicate it was received"""
        print(SECRET_RESULT.location)
        res_txt = SECRET_RESULT.read('secret1')
        print("Here's your data: {}".format(res_txt))
    
    
    with Flow("Callback-Example") as flow:
    
        SECRET_RESULT = SecretResult(extract, location='somewhere.json')
    
        e = extract()
        l = load()
        l.set_upstream(e)
    
    flow.run()
    but when I run it, it gave an error in the Prefect source:
    File "/home/hamed/PycharmProjects/etlprefectcloud/venv/lib/python3.8/site-packages/prefect/engine/results/secret_result.py", line 38, in read
        new.value = self.secret_task.run(name=location)
    TypeError: extract() got an unexpected keyword argument 'name'
    It's in prefect/engine/results/secret_result.py, line 38: new.value = self.secret_task.run(name=location). If I delete the name argument, it works. Am I using SecretResult the wrong way?
    s
    3 replies · 2 participants
  • s

    Sébastien

    12/09/2020, 11:46 AM
    Hey all — I tried using
    task2.set_upstream(task1)
    to manually create a dependency, but that gives me:
    /usr/local/lib/python3.7/dist-packages/prefect/core/task.py:596: UserWarning: You are making a copy of a task that has dependencies on or to other tasks in the active flow context. The copy will not retain those dependencies.
    What's the right way to say (in functional API) that task2 can only run after task1 was completed, without messing up the flow? The entire flow:
    schedule = CronSchedule("0 * * * *", start_date=datetime.now())
    
    with Flow("Q", schedule=schedule) as flow:
        task2.set_upstream(task1)
    
        task1()
        some_result = task2()
        task3(some_result)
    j
    9 replies · 2 participants
j

josh

12/09/2020, 1:56 PM
Hi @Sébastien you should define the tasks first before you try to set dependencies. Something like this:
with Flow("Q", schedule=schedule) as flow:
    task_1 = task1()
    some_result = task2()    
    some_result.set_upstream(task_1)
    task3(some_result)
s

Sébastien

12/09/2020, 1:57 PM
@josh Shouldn't a task have set_upstream, rather than the result of a task?
j

josh

12/09/2020, 1:59 PM
Yeah that is the task, I was just copying your example. Could be better rewritten as:
with Flow("Q", schedule=schedule) as flow:
    task_1 = task1()
    task_2 = task2()    
    task_2.set_upstream(task_1)
    task3(task_2)
Calling
task3(task_2)
is a shorthand way of passing the result of task2 into task3 (and thus making it an upstream task). Calling
task_2.set_upstream(task_1)
is setting the upstream directly without passing any data.
s

Sébastien

12/09/2020, 2:01 PM
You call it a task, but use the return value of calling the task. I'm assuming calling the decorated function returns a Task-like thing rather than an actual return value (since runtime is separate), and that's why you call both task_2 and task2 tasks? For me, a Task is @task, which is task2 but not task_2.
j

josh

12/09/2020, 2:03 PM
Yes you are correct, constructing a flow uses deferred computation so while it is technically the result of the task run at runtime, during flow creation it is seen as the task object
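A small illustration of that deferred computation, under Prefect 0.x semantics:
    from prefect import Flow, task

    @task
    def task1():
        return 1

    with Flow("Q") as flow:
        t1 = task1()   # binds a copy of the task into the flow and
                       # returns that Task object, not the integer 1
        # t1 is a prefect Task (a FunctionTask here), so set_upstream /
        # set_downstream can be called on it before the flow ever runs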
s

Sébastien

12/09/2020, 2:04 PM
Sweet, that clarifies things a bit. Thanks for clarifying my assumption. Now it makes sense why it considers my initial approach detached, since it actually branched by creating a copy of the task instead of attaching to one global flow.
At least that's my current best guess — I don't know much (yet) about the internals
@josh That fixed it, thanks!
👍 1