prefect-community

    Crawford Collins

    04/24/2020, 5:59 PM
    I'm having a little trouble using the SQLiteQuery task. https://docs.prefect.io/api/latest/tasks/sqlite.html#prefect-tasks-database-sqlite-sqlitequerytask I'm trying to pass the db name as a parameter, but it is not working. The query task does not accept a Parameter, only a string. I've attached a concise version of my code and error message below.
    def test_meta_model_regression():
        meta_model_run = meta_model_flow.run(
            db = "test.db",
        )
        assert meta_model_run.message == "All reference tasks succeeded."
    
    with Flow("meta_model_flow") as meta_model_flow:
        db = Parameter("db")
        models = SQLiteQuery(db,"SELECT identifier FROM models")
    
    >> TypeError: expected str, bytes or os.PathLike object, not Parameter
    Is there some way to pass the parameter "test.db" to the query?
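    A possible workaround, sketched below: task-library tasks generally let their init arguments be overridden at call time inside the flow (SQLiteQuery included, to the best of my knowledge), so the Parameter can be passed as runtime data instead of a constructor argument:
    from prefect import Flow, Parameter
    from prefect.tasks.database.sqlite import SQLiteQuery
    
    # bind the static query up front; leave db to be supplied at runtime
    query = SQLiteQuery(query="SELECT identifier FROM models")
    
    with Flow("meta_model_flow") as meta_model_flow:
        db = Parameter("db")
        models = query(db=db)  # db resolves to "test.db" at run time
    
    meta_model_flow.run(parameters={"db": "test.db"})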

    Troy Sankey

    04/24/2020, 6:44 PM
    We're just starting to build out a re-usable library of prefect tasks, but ran into the fact that we can't actually serialize any of our flows because they refer to tasks defined in separate (common) python files. For my testing, this has led me to copy+paste common code into each flow file (very not-DRY), so I'm wondering whether there's something obvious I'm missing? We're using the Docker storage environment.
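    A sketch of one pattern that should help here (the paths are placeholders): Docker storage can copy extra files into the image and put them on the PYTHONPATH, so every flow in the image can import the shared module:
    from prefect.environments.storage import Docker
    
    # `flow` is the Flow object being registered
    flow.storage = Docker(
        files={
            # local path at build time -> path inside the image
            "/repo/common/tasks.py": "/modules/common_tasks.py",
        },
        env_vars={"PYTHONPATH": "/modules"},
    )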

    Christopher Harris

    04/24/2020, 8:40 PM
    Question #2: here is an example of a flow I’m using. The idea is that we pull in a list of “documents” from a single source, and we want to push each document to every sink.
    def execute(pipeline_config: PipelineConfiguration):
        project = pipeline_config.project
        with Flow("test-flow") as flow:
            # Service Initialization
            source = init_source(project, pipeline_config.source)
            sinks = init_sink.map(unmapped(project), pipeline_config.sinks)
    
            # Perform ETL
            documents = pull(source)
            push.map(documents, sinks)
    
        flow.run(executor=LocalExecutor())
    The problem with this approach is that it does a one-to-one mapping, like the first image. I want a many-to-one mapping, like the second image. Effectively I am trying to recreate the following logic:
    for each document:
        for each sink:
            sink.push(doc)
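    A sketch of one way to get that behavior (the cross helper below is hypothetical): since map() in Prefect 0.x zips its iterables element-wise, you can build the document × sink cartesian product in an intermediate task and map over the pairs:
    from itertools import product
    
    from prefect import task
    
    @task
    def cross(documents, sinks):
        # one (document, sink) pair per desired push
        return list(product(documents, sinks))
    
    @task
    def push_pair(pair):
        document, sink = pair
        sink.push(document)
    
    # inside the flow, replacing push.map(documents, sinks):
    #     pairs = cross(documents, sinks)
    #     push_pair.map(pairs)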

    Brad

    04/25/2020, 2:08 AM
    Hey team, I’m trying to run the docker agent inside of a docker container but having some trouble - has anyone attempted this?

    Tom B

    04/25/2020, 3:34 AM
    I have certain tasks that need to run on a monthly basis and other tasks that need to run on a daily basis. The outputs from the monthly tasks are inputs to the daily tasks. Can these tasks run in a single flow, or do I need to create two flows, each tied to a different interval schedule (monthly and daily, respectively)? I can't seem to get two tasks with different interval requirements to run in a single flow. Can anyone help?
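    One pattern that may work, sketched with placeholder tasks: schedule a single flow daily and give the monthly task a cache, so its output is reused until the cache expires (with Core alone the cache lives in the scheduling process; Cloud/Server persist it):
    import datetime
    
    from prefect import task, Flow
    from prefect.schedules import IntervalSchedule
    
    @task(cache_for=datetime.timedelta(days=30))
    def monthly_extract():
        return "monthly output"
    
    @task
    def daily_load(monthly_data):
        print("daily run consuming", monthly_data)
    
    schedule = IntervalSchedule(interval=datetime.timedelta(days=1))
    
    with Flow("mixed-cadence", schedule=schedule) as flow:
        daily_load(monthly_extract())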

    Joe Schmid

    04/25/2020, 3:50 AM
    Is there a preferred logging approach from within callbacks? Use case is an
    on_execute()
    (or the existing
    on_start()
    ) called from an Environment. I can pass the environment's
    self.logger
    -- that seems clunky, but the following doesn't seem to work:
    def on_execute(parameters: Dict[str, Any], provider_kwargs: Dict[str, Any]) -> None:
        logger = prefect.context.get("logger")
        logger.info("Checking Flow run parameters: {}".format(parameters))
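    For what it's worth, a sketch of an alternative: prefect.context only carries a logger during a task or flow run, so in environment callbacks you can grab a module-level logger directly via prefect.utilities.logging.get_logger:
    from typing import Any, Dict
    
    from prefect.utilities.logging import get_logger
    
    def on_execute(parameters: Dict[str, Any], provider_kwargs: Dict[str, Any]) -> None:
        logger = get_logger("on_execute")  # hooks into Prefect's logging setup
        logger.info("Checking Flow run parameters: %s", parameters)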

    Arsenii

    04/25/2020, 5:18 AM
    Has anyone run into a bug where, with a couple of tasks mapping over a list of 10-20 elements (or more) in the Cloud, the execution just randomly stops?
    Task 'Some important work[19]': Calling task.run() method...
    and it just "runs" this task forever, but there's no
    changed state from ... to Running
    afterwards, when using a LocalExecutor. I thought this might have something to do with my using
    raise SKIP
    to "filter" some elements, but rewriting with a
    FilterTask
    didn't fix the problem.

    Ziyao Wei

    04/25/2020, 2:44 PM
    Is there an easy way to pass persistent data between flow runs?

    Ziyao Wei

    04/25/2020, 4:34 PM
    Does Prefect support Airflow’s
    max_active_runs
    or similar options?

    Daniel

    04/26/2020, 9:47 AM
    Hi, just joined this community. I have started using Prefect in the past month. I have worked with different ETL tools in the past, and really think this is the best so far! My question for now: I am building an open data set for the Netherlands (also pertaining to mapping the spread of COVID-19) and have about 5 smaller flows that pull data from public sources (bureau of statistics, database of addresses, reported cases etc.). What is the most idiomatic way of running these independent flows in parallel (prior to the actual modeling that needs to be done)? Just execute the data collection flows all at once on a DaskExecutor? Or is there a way to combine e.g. four flows into a fifth that is dependent on the four independent ones?

    Ziyao Wei

    04/27/2020, 2:05 AM
    Q: is there a way to cache output persistently?
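    A sketch of the 0.x checkpointing approach (the directory is a placeholder, and checkpointing must be enabled, e.g. PREFECT__FLOWS__CHECKPOINTING=true): attach a result handler so the task's output is written to disk and can be reused across runs:
    from prefect import task
    from prefect.engine.result_handlers import LocalResultHandler
    
    @task(checkpoint=True, result_handler=LocalResultHandler(dir="/tmp/prefect-results"))
    def expensive_step():
        return 42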

    Kostas Chalikias

    04/27/2020, 9:04 AM
    Hello there, last time we checked it was not possible to kill a running flow/task from the UI. Has this been fixed now or is there a workaround I can use?

    Kamil Okáč

    04/27/2020, 1:11 PM
    Hello again, guys. Do I understand correctly that it's not possible to restart the server and persist its state (information about past flow/job runs etc.)?

    David Ojeda

    04/27/2020, 2:18 PM
    Hi, I have a question that is a follow-up to one I asked on April 19th. I managed to deploy a prefect server and to run flows with docker storage and a DaskKubernetes environment. The only hurdle I have left (I hope!) is that an important part of our tasks depends on some common configuration set on
    prefect.context
    . This includes some secrets and some URLs and object ids on some internal rest services. Before we used the prefect server, we had a small cli that would populate the
    prefect.context
    and use it when running the flow:
    with prefect.context(**our_custom_vars):
        flow_state = flow.run(parameters=flow_parameters, ...)
    My ultimate objective is to have a flow with a default context that I can run from the UI, or schedule with that context when deploying it. In my question of April 19th, @Jeremiah pointed out that there is no way to set these contexts at the moment and that you would discuss it internally… is there any update on this front? Otherwise, I am looking for alternatives or workarounds. One workaround would be to understand where the context gets set on an agent, worker, or runner (I am not sure which one). Jeremiah also pointed out that the FlowRunner requests the context from the server, but I can’t find where, or even whether, a flow’s context is saved anywhere (there does not seem to be a field named context on the
    flow_by_pk
    query). Another workaround would be to populate environment variables or change the default
    config.toml
    of the agent, worker, or runner (again, I am not sure which one) so that
    prefect.context
    is populated with these values; I am not sure if this would work. A third workaround may be to override the
    __setstate__
    and
    __getstate__
    methods, so that the flow can retrieve the context when unpickled. I am not sure if this would work either. Any ideas on which of these workarounds may be the best bet here?
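    One candidate for the environment-variable workaround, sketched here (my understanding is that values in the [context] section of the runner's config.toml, or PREFECT__CONTEXT__* environment variables, are merged into prefect.context at runtime; the key names below are placeholders):
    # ~/.prefect/config.toml on the agent/runner
    [context]
    internal_service_url = "https://internal.example.com"
    
    # or equivalently, as an environment variable:
    # export PREFECT__CONTEXT__INTERNAL_SERVICE_URL="https://internal.example.com"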

    Jacques

    04/27/2020, 2:24 PM
    Hi everyone, quick question - it seems from the docs (https://docs.prefect.io/core/concepts/mapping.html) that setting a trigger on a Task should still work if I use map on that Task. I'm trying to do that, but it doesn't seem to work. Do I need to do something special?

    Jacques

    04/27/2020, 2:25 PM
    I want to do
    trigger=all_successful
    for the mapped function, but it only works for unmapped tasks
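    For reference, a sketch of how I read the behavior: a trigger set on a mapped task is evaluated per mapped child, so to gate on the whole map, the trigger goes on the downstream (reduce) task instead:
    from prefect import task
    from prefect.triggers import all_successful
    
    @task(trigger=all_successful)
    def after_whole_map(results):
        # only runs if every mapped child upstream succeeded
        return results
    
    # inside the flow:
    #     mapped = work.map(items)
    #     after_whole_map(mapped)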

    alvin goh

    04/27/2020, 2:42 PM
    Hi, I understand that depth-first execution was being worked on in the past. Has it been released yet? If not, is there a workaround I can use to get depth-first execution?

    An Hoang

    04/27/2020, 2:48 PM
    Hello, my work’s IT team is apparently not keen on docker, so I can’t run prefect server on a work machine. Here is what they said, quoted directly since I don’t know much about this stuff. Is there a way to deploy prefect without docker?
    X does want to push back because docker is involved, and has many unsolved aspects since the daemons run as root. It didn’t look like the prefect ui could run in singularity, which is the user-space container management thing that they were talking about.
    If it is possible to install the prefect ui without docker, either as a library or on a server, I think that has many fewer concerns from the systems team.

    Matias Godoy

    04/27/2020, 3:42 PM
    Hi guys! I'd like to start a flow execution from another flow. Is it possible? I have flow A that is scheduled and runs every 5 minutes. When it finds something pending in our DB it should start flow B with some parameters. Can I do that?
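    A sketch of one option, assuming a Prefect backend (Cloud or Server) and a task library recent enough to ship FlowRunTask; the names and parameters are placeholders:
    from prefect.tasks.prefect import FlowRunTask
    
    start_flow_b = FlowRunTask(flow_name="flow-B", project_name="my-project")
    
    # inside flow A, after detecting pending work:
    #     start_flow_b(parameters={"pending_id": pending_id})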

    Pedro Machado

    04/28/2020, 2:03 AM
    Hi everyone. What is the recommended pattern to rate limit access to an API? Suppose you have a task that returns items and you want to map that output to a task that queries the API for each item. What is the recommended way to rate limit those calls to the API? Thanks!
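    A crude client-side sketch (fetch is a hypothetical API call; the numbers are placeholders): sleep inside the mapped task, bound total parallelism via the executor's worker count, and retry on rate-limit errors:
    import datetime
    import time
    
    from prefect import task
    
    def fetch(item):
        ...  # hypothetical API call
    
    @task(max_retries=3, retry_delay=datetime.timedelta(seconds=30))
    def call_api(item):
        time.sleep(1)  # ~1 request/second per worker
        return fetch(item)
    
    # run with e.g. LocalDaskExecutor(scheduler="threads", num_workers=4)
    # so at most 4 mapped calls are in flight at once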

    alvin goh

    04/28/2020, 7:31 AM
    Is it possible to have a List operation complete even if some upstream tasks fail? I want to collect the results of upstream tasks regardless of whether they fail.
    from prefect import task, Flow
    from prefect.engine import signals
    import random
    from prefect.triggers import all_finished
    
    @task()
    def randomly_fail():
        x = random.random()
        if x > 0.7:
            raise ValueError("x is too large")
    
            
    @task(trigger=all_finished)
    def print_result(l):
        print(l)
        return l
    with Flow("random-mapping") as f:
        result = [randomly_fail() for i in range(10)]
        print_result(result)
        
    f.run()
    The output shows that the List operation failed, so I can't print the final results.
    [2020-04-28 07:33:16,639] INFO - prefect.TaskRunner | Task 'randomly_fail': Starting task run...
    [2020-04-28 07:33:16,640] ERROR - prefect.TaskRunner | Unexpected error: ValueError('x is too large',)
    Traceback (most recent call last):
      File "c:\tools\anaconda3\envs\prefect\lib\site-packages\prefect\engine\runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "c:\tools\anaconda3\envs\prefect\lib\site-packages\prefect\engine\task_runner.py", line 884, in get_task_run_state
        self.task.run, timeout=self.task.timeout, **raw_inputs
      File "c:\tools\anaconda3\envs\prefect\lib\site-packages\prefect\utilities\executors.py", line 185, in timeout_handler
        return fn(*args, **kwargs)
      File "<ipython-input-6-020915229ace>", line 10, in randomly_fail
        raise ValueError("x is too large")
    ValueError: x is too large
    [2020-04-28 07:33:16,647] INFO - prefect.TaskRunner | Task 'randomly_fail': finished task run for task with final state: 'Failed'
    [2020-04-28 07:33:16,662] INFO - prefect.TaskRunner | Task 'List': Starting task run...
    [2020-04-28 07:33:16,669] INFO - prefect.TaskRunner | Task 'List': finished task run for task with final state: 'TriggerFailed'
    [2020-04-28 07:33:16,685] INFO - prefect.TaskRunner | Task 'print_result': Starting task run...
    Trigger was "all_successful" but some of the upstream tasks failed.
    [2020-04-28 07:33:16,693] INFO - prefect.TaskRunner | Task 'print_result': finished task run for task with final state: 'Success'
    [2020-04-28 07:33:16,694] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
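    A possible workaround, sketched below: the failing task is the auto-generated List collector, which keeps the default all_successful trigger; after building the flow you can look it up and relax its trigger (failed children then contribute their exception as their "result"):
    from prefect.triggers import all_finished
    
    # relax the trigger on the auto-generated List task
    for list_task in f.get_tasks(name="List"):
        list_task.trigger = all_finished
    
    f.run()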

    David Hogarty

    04/28/2020, 1:54 PM
    Got a quick question

    David Hogarty

    04/28/2020, 1:55 PM
    Using the functional API, sometimes I know of implicit race conditions between two parameterless tasks

    David Hogarty

    04/28/2020, 1:55 PM
    or, two tasks that take the same parameter

    David Hogarty

    04/28/2020, 1:55 PM
    is there any way for me to sequence them explicitly?

    David Hogarty

    04/28/2020, 1:55 PM
    i.e. make task a an implicit dependency of task b, even though no data flows from a to b
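    A sketch of the usual answer: in the functional API, task calls accept an upstream_tasks keyword that adds a state-only edge, so b waits on a even though no data moves between them:
    from prefect import task, Flow
    
    @task
    def task_a():
        print("a runs first")
    
    @task
    def task_b():
        print("b runs second")
    
    with Flow("explicit-ordering") as flow:
        a = task_a()
        b = task_b(upstream_tasks=[a])  # state dependency only, no data flows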

    Thomas Wiecki

    04/28/2020, 3:18 PM
    so I can’t use map() with a multiprocessing scheduler?

    Thomas Wiecki

    04/28/2020, 3:18 PM
    unfortunately my code isn’t thread-safe (theano)

    Thomas Wiecki

    04/28/2020, 3:29 PM
    is there a work-around?
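    A sketch of one workaround (import path as of Prefect 0.x): run the map with a process-based local Dask scheduler instead of threads, which should sidestep thread-unsafe code like theano:
    from prefect import task, Flow
    from prefect.engine.executors import LocalDaskExecutor
    
    @task
    def heavy(x):
        return x * 2  # thread-unsafe work would go here
    
    with Flow("process-map") as flow:
        heavy.map([1, 2, 3])
    
    # scheduler="processes" runs mapped tasks in a multiprocessing pool
    flow.run(executor=LocalDaskExecutor(scheduler="processes"))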

    Will Milner

    04/28/2020, 3:37 PM
    Hello all, I'm having some trouble deploying prefect to a remote server in aws. flow.register() succeeds, but the flow never shows up in the UI on the remote server. Are there any extra steps I need to take to register on a remote host?

Zachary Hughes

04/28/2020, 3:57 PM
Hi @Will Milner, taking a look now!

Will Milner

04/28/2020, 4:07 PM
it seems there is an issue connecting to the core server; all requests come back as connection refused. I checked the logs for the core server and it's running fine; both are on the same host

Zachary Hughes

04/28/2020, 4:07 PM
It sounds like either your registration step or your UI may be pointed at the incorrect endpoint. Can you confirm that you ran
prefect backend server
?

Will Milner

04/28/2020, 4:09 PM
I did run prefect backend server. Did some more digging, and the issue I'm having is what's described in this issue - https://github.com/PrefectHQ/prefect/issues/2237

Zachary Hughes

04/28/2020, 4:15 PM
Okay, solid-- thank you! And if I'm understanding correctly, all of your services (UI, Hasura, Postgres, etc) are running on the same instance? If that's the case, I'd anticipate
localhost
endpoints doing the trick-- just trying to get the lay of the land on your setup.

Will Milner

04/28/2020, 4:19 PM
they are all running on the same instance, it's an aws linux instance if that makes a difference

Zachary Hughes

04/28/2020, 4:25 PM
Okay, gotcha. CCing @nicholas in case this is an obvious UI issue I'm missing.

nicholas

04/28/2020, 4:30 PM
Hi @Will Milner - you'll need to set the graphql endpoint for the UI before you spin up the services. You can set that in your
~/.prefect/config.toml
file like this:
# ~/.prefect/config.toml
[server]
    [server.ui]
    graphql_url = "http://your_ec2_endpoint.com:4200/graphql"
or by setting the environment variable on the machine:
export PREFECT__SERVER__UI__GRAPHQL_URL="http://your_ec2_endpoint.com:4200/graphql"
After setting that variable, you can run
prefect server start
again!

Zachary Hughes

04/28/2020, 4:34 PM
Thanks for the answer, Nicholas! It's also worth noting that you'll need to run Prefect
0.10.3
or higher to take advantage of that.

Will Milner

04/28/2020, 4:42 PM
👍 that worked, thanks! Is there any documentation that goes over each config setting we can edit? Digging around the git repo I found this https://github.com/PrefectHQ/prefect/blob/master/server/src/prefect_server/config.toml but that referred to the config variable under the category
services
instead of
server

nicholas

04/28/2020, 4:46 PM
Great! We're still working on expanding the configuration documentation, but for now you can reference this config (different from the one you posted) for the defaults that exist. Sorry for the confusion there; the config you referenced is for the initial build of the images, rather than the runtime configuration.