prefect-community
  • Viv Ian
    04/20/2020, 6:20 PM
    @josh - Not sure if you were the josh in the PyCon Zoom meeting. If so, would it be possible to get the link to the docker examples again?
    🙂 1
    4 replies · 2 participants
  • Manuel Mourato
    04/20/2020, 9:09 PM
    Hello guys, I have a question regarding the use of storage of type Docker. I have defined a flow and set Docker as the storage type, specifying the image name among other parameters. I then build the image and register the flow, and indeed an image is generated with the respective flow inside. But now I have other flows, and I want to save them into the same image the first flow is in; that is, I want to add flows to a given image, so that I can then deploy this single image with all flows to my production environment. However, every time I register a new flow, it recreates the image, leaving only the most recent flow. Is there a way to get around this? Thank you.
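    A minimal sketch of one way to do this (assuming the Docker storage API of Prefect 0.10.x; flow names are hypothetical): add every flow to a single Docker storage object, build the image once, then register each flow with build=False so registration does not rebuild the image.
    from prefect.environments.storage import Docker

    # flow_a and flow_b are hypothetical, already-defined flows
    storage = Docker(registry_url="my-registry.io", image_name="all-flows")
    storage.add_flow(flow_a)
    storage.add_flow(flow_b)
    storage = storage.build()  # one image now contains both flows

    for f in (flow_a, flow_b):
        f.storage = storage
        f.register(build=False)  # register without rebuilding the image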
    3 replies · 2 participants
  • Maxime Lavoie
    04/20/2020, 9:43 PM
    Hey folks, question for you all. Is it possible to encapsulate a series of tasks within one task in order to facilitate the writing of flows? Let's say I have the following tasks: A -> B -> C. I want to create one task called D in such a way that if I create a flow E -> D, what I am really generating is E -> A -> B -> C. Cheers!
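    One hedged sketch (task names hypothetical): since Prefect 1.x flows are built by calling tasks inside a Flow context, "task D" can be a plain Python function that wires A -> B -> C onto whatever upstream result it is given.
    from prefect import task, Flow

    @task
    def task_a(x):
        return x + 1

    @task
    def task_b(x):
        return x * 2

    @task
    def task_c(x):
        return x - 3

    @task
    def task_e():
        return 10

    def task_d(upstream):
        # not a task itself; expands into A -> B -> C at flow-build time
        return task_c(task_b(task_a(upstream)))

    with Flow("composite") as flow:
        result = task_d(task_e())  # graph is E -> A -> B -> C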
    7 replies · 5 participants
  • David Ojeda
    04/21/2020, 12:20 PM
    Hi, I am digging a bit into a bug and while putting together a minimal example I found something strange. Am I using upstream_tasks on a map correctly? In the following example, "flow2" fails (this is how I am used to declaring upstream tasks on a map) but "flow1" works:
    from prefect import Flow, Task
    
    
    class Generate(Task):
        def run(self):
            return list(range(10))
    
    
    class GetConfig(Task):
        def run(self):
            return {'offset': 1000}
    
    
    class Process(Task):
        def run(self, *, data):
            result = data + 1000
            self.logger.info('Process of %d = %d', data, result)
            return result
    
    
    generate = Generate()
    config = GetConfig()
    process = Process()
    
    with Flow('flow1') as flow1:
        dataset = generate()
        config_result = config()
        clean = process.map(data=dataset)
        clean.set_upstream(config_result)
    
    
    print('Flow1 edges:', flow1.edges)
    flow1.run()
    
    
    with Flow('flow2') as flow2:
        dataset = generate()
        config_result = config()
        clean = process.map(data=dataset, upstream_tasks=[config_result])
    
    
    print('Flow2 edges:', flow2.edges)
    flow2.run()
    The traceback is:
    [2020-04-21 12:16:45,325] INFO - prefect.TaskRunner | Task 'Process': Starting task run...
    [2020-04-21 12:16:45,325] ERROR - prefect.TaskRunner | Task 'Process': unexpected error while running task: KeyError(0)
    Traceback (most recent call last):
      File "/Users/david/.virtualenvs/iguazu-venv/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 256, in run
        state = self.run_mapped_task(
      File "/Users/david/.virtualenvs/iguazu-venv/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 721, in run_mapped_task
        upstream_state.result[i],
    KeyError: 0
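    A likely fix, assuming the mapping semantics this traceback suggests (arguments to .map, including upstream_tasks, are iterated over unless wrapped): mark the non-mapped dependency with unmapped, mirroring what set_upstream achieves in flow1.
    from prefect import unmapped

    # reusing generate, config, and process from the snippet above
    with Flow('flow2-fixed') as flow2_fixed:
        dataset = generate()
        config_result = config()
        clean = process.map(data=dataset,
                            upstream_tasks=[unmapped(config_result)])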
    6 replies · 4 participants
  • An Hoang
    04/21/2020, 2:37 PM
    Hello, I'm trying a simple flow that does linear regression in batches. The flow works when run sequentially, but when I try it with the Dask backend it causes memory problems. What is confusing is that there is ample memory per worker. Can someone help me identify the problem? Am I doing something of a Dask anti-pattern somewhere?
    2 replies · 3 participants
  • Manuel Mourato
    04/21/2020, 3:41 PM
    Hello guys, I'm having issues getting my Local Agent to execute my flow runs. I first register two test flows like this:
    # CREATE STORAGE
    
    storage=storage.build()
    
    # ADD FLOWS TO STORAGE
    storage.add_flow(test_flow)
    storage.add_flow(test_flow2)
    
    storage=storage.build()
    
    test_flow.storage=storage
    test_flow2.storage=storage
    
    # REGISTER FLOWS IN UI/DB
    test_flow.register(build=False)
    test_flow2.register(build=False)
    And they indeed appear in the UI (image below). Then I start the agent in my CLI (image below). But when I try to run one of these flows, they stay permanently scheduled (image below). What am I doing wrong? Ty
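    (One common cause in this era of Prefect, offered as an assumption rather than this thread's confirmed resolution: runs sit in Scheduled forever when no agent carries labels matching the flow's labels, so it is worth comparing the flow's labels in the UI against the agent's, e.g. starting it with prefect agent start -l <label>.)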
    12 replies · 3 participants
  • Jacques
    04/21/2020, 4:21 PM
    Hi all, is it possible to get the state of a running flow from the Flow object or FlowRunner?
    9 replies · 2 participants
  • Jacques
    04/21/2020, 4:22 PM
    I'm running the flow in a thread, and would like to grab the result of the first task before the Flow.run() completes.
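    A hedged sketch of one option (handler and task names hypothetical): attach a state handler to the first task so its result is captured the moment the task finishes, without waiting for Flow.run() to return in the other thread.
    from prefect import task, Flow

    captured = {}

    def capture_result(task_obj, old_state, new_state):
        # called on every state change; stash the result on success
        if new_state.is_successful():
            captured["first"] = new_state.result
        return new_state

    @task(state_handlers=[capture_result])
    def first_task():
        return 42

    @task
    def second_task(x):
        return x + 1

    with Flow("observed") as flow:
        second_task(first_task())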
  • Bertrand GERVAIS
    04/21/2020, 4:41 PM
    Hi, I am trying to implement a geographical data processing workflow, working on datasets that do not fit into memory, and I can't find a properly documented way to perform parallel computation on each object in my datasets. What I would like to avoid is returning all objects as the result of a task just to map a function on them. I would like to read data inside a task and dispatch objects (either using a queue or some list containing a reasonable number of objects) for parallel processing, until I have read and processed all data (and dumped results into a DB or file, which will be handled by a later task), and I can't find a way to do so with Prefect. It seems I need to return all objects, which I can't, in order to parallelize computation on them with the map function. I have tried using the executor inside a task to call its map function, which works fine and could suit my needs, but I am not sure if this is the right way to do so, and whether it can be considered an official feature you will keep supporting. In your view, what is the best solution to handle this issue?
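    One hedged sketch of a common pattern (names and batching scheme hypothetical): have the first task return lightweight references (paths, offsets) rather than the objects themselves, then map over those references and do the heavy reads inside each mapped task.
    from prefect import task, Flow

    @task
    def list_batches():
        # references only, not data: (path, offset, size) tuples
        return [("dataset.bin", offset, 1000)
                for offset in range(0, 100_000, 1000)]

    @task
    def process_batch(batch):
        path, offset, size = batch
        # read just this slice here, process it, and dump the results
        # to a DB or file for a later task to pick up
        ...

    with Flow("geo-processing") as flow:
        process_batch.map(list_batches())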
    13 replies · 3 participants
  • Viv Ian
    04/21/2020, 5:15 PM
    I'm getting the following error within my app container when attempting to do a remote request. This is part of a kubernetes cluster that also has apollo, hasura, postgres, ui, graphql, and scheduler running. My most recent steps (about 30 minutes ago) were the following:
    • deleted all pods and services in kubernetes
    • docker system prune -a for a fresh start 🙂
    • created prefect services/deployments for k8s (using latest images)
    • docker built my app image
    • created services/deployments for my app for k8s
    Any ideas? THANKS!
    12 replies · 2 participants
  • David Ojeda
    04/21/2020, 5:26 PM
    I want to share the results of my long day of bug hunting. It all comes down to this screenshot: on the left, the visualization of my graph that does not work (note there are two merge nodes); on the right, the one that is fixed. I was composing three different graphs and using get_terminal_tasks() to determine how to connect them. Something has changed in the way switch/merge is managed, and this created several terminal tasks on one of my flows. The solution for me was to set the reference task of each flow and use get_reference_task instead of get_terminal_tasks. Anyway, needed to share the small win after a frustrating bug-hunting session (because the bug would present itself randomly, due to set not having an order).
    🎉 5
    10 replies · 4 participants
  • Christopher Harris
    04/21/2020, 10:15 PM
    Hi guys! Beginner to prefect here - very much excited to get up and running on it! I am currently trying to translate our current, very basic/limited pipeline over to prefect, after which we will iterate on it and take advantage of prefect's offerings. Here is our current pipeline, with some extraneous pieces removed:
    class BasePipeline:
        """
        A series of services that mutate a document object.
        The flow consists of a single 'source',
        followed by x number of 'processors', followed by a 'sink.'
        The document object is passed along these services in a linear fashion.
        A pipeline configuration object is used to define the type of services
        and the order of the processors.
        """
    
        def __init__(self, blueprints: "PipelineConfiguration"):
            """
            Initializes and configures services from PipelineConfiguration
            blueprints.
            :param blueprints: a pipeline configuration object
            """
            self.project = blueprints.project
            self.source: Source = start_source(blueprints.source)
            self.sinks: List[Sink] = [start_sink(sink_blueprint) for sink_blueprint in blueprints.sinks]
            self.processors: List[Processor] = [start_processor(processor_blueprint) for processor_blueprint in blueprints.processors]
    
        def __call__(self, metrics: bool = False, failfast: bool = False) -> None:
            """
            Passes a document through the pipeline.
            """
    
            for doc in self.source.pull():
                for processor in self.processors:
                    processor(doc)
                for sink in self.sinks:
                    sink.push(doc)
    And here is my translation attempt:
    @task
    def init_source(project: str, source_config: Blueprint):
        return start_source(project, source_config)
    
    
    @task
    def init_sink(project: str, sink_config: Blueprint):
        return start_sink(project, sink_config)
    
    
    @task
    def init_processor(project: str, processor_config: Blueprint):
        return start_processor(project, processor_config)
    
    
    @task
    def run_source(source: Source):
        return source.pull()
    
    
    @task
    def run_sink(sink: Sink, data: Document):
        return sink.push(data)
    
    
    @task
    def run_processor(processor: Processor, data: Document):
        return processor(data)
    
    
    def execute(pipeline_config: PipelineConfiguration):
        project = pipeline_config.project
        with Flow("test-flow") as flow:
            # Service Initialization
        source = init_source(project, pipeline_config.source)
            processors = [init_processor(project, processor_config) for processor_config in pipeline_config.processors]
            sinks = [init_sink(project, sink_config) for sink_config in pipeline_config.sinks]
    
            # Perform ETL
            datastream = run_source(source)
            for doc in datastream:
                for processor in processors:
                    doc = run_processor(processor, doc)
                for sink in sinks:
                    run_sink(sink, doc)
    
        flow.run()
    The issue I'm running into is that it says datastream is not iterable. While this may be a syntax issue on my end, the output of source.pull() is a generator. Are generators supported in prefect? If not, does anyone have an optimal workaround?
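    One possible workaround, offered as an assumption rather than the thread's confirmed answer: Prefect tracks concrete task results, so materialize the generator inside the task and let downstream tasks .map over the returned list (reusing names from the snippet above). The for-loops would then become mapped calls, since a task's result is only a placeholder at flow-build time and cannot be iterated there.
    @task
    def run_source(source: Source):
        # realize the generator so Prefect has a concrete, sized result
        return list(source.pull())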
    👋 1
    🎉 1
    3 replies · 2 participants
  • Matias Godoy
    04/22/2020, 9:16 AM
    Hey guys, maybe this has been discussed before, but: wouldn't it be good to have a forum for this? That way it would be easier for someone to find answers. There is a lot of valuable information in this chat!
    💯 1
    👍 4
    2 replies · 2 participants
  • Klemen Strojan
    04/22/2020, 10:10 AM
    Hello all! Is there a way to delete an existing Cloud Hook?
    6 replies · 2 participants
  • Adisun Wheelock
    04/22/2020, 4:33 PM
    message has been deleted
    17 replies · 4 participants
  • Luke Orland
    04/22/2020, 5:25 PM
    I have a flow that is passing pandas DataFrames between tasks. When it runs in Prefect Cloud, I get this logging error that I don't see locally:
    April 22nd 2020 at 12:02:34pm EDT | CloudHandler
    CRITICAL 
    Failed to write log with error: Object of type ndarray is not JSON serializable
    5 replies · 3 participants
  • Jacques
    04/22/2020, 7:59 PM
    Hi all, I'm doing multiple flow.run()s in different threads, but when I do this, output caching breaks. I'm trying to debug, and I think the issue is that the prefect context isn't shared between runs in multiple threads. The docs here: https://docs.prefect.io/api/latest/utilities/context.html#context-2 state that context is thread safe, so I tried creating a context using shared_context = prefect.utilities.context.Context() and then passing that to run with my_flow.run(context=shared_context), but this doesn't seem to solve my problem. Would appreciate any pointer in the right direction!
    9 replies · 3 participants
  • Crawford Collins
    04/22/2020, 8:55 PM
    I'm having a little trouble. I am trying to refactor a large Flow into a few smaller pieces. If I initialize this MetaModel class in the flow, it is successful. If I try to load it instead, the flow will crash at fit_model.
    # meta = init_meta_model(problem, tinydb, use_default_models)
    # ^^^ this does not work
    meta = MetaModel(problem, tinydb, use_default_models=True)
        model_path = get_models(meta)
    
        fit_models = fit_model.map(
            model_path=model_path,
            train_data=unmapped(train_data),
            target=unmapped(train_target),
            problem=unmapped(problem),
        )
    ###
    @task
    def init_meta_model(problem, db, use_default_models=True):
        meta = MetaModel(problem, db, use_default_models=True)
        return meta
    @task
    def fit_model(model_path, train_data, target, problem):
        model = joblib.load(model_path)
        model.fit(X=train_data, y=target)
        joblib.dump(model, model_path)
    
    MetaModel.models = ["PATH_TO_MODEL"]
    1 reply · 2 participants
  • Jacques
    04/22/2020, 9:05 PM
    Sorry, got one more question 🙂 Looking at the ETL examples, where you extract a list of values, map them to a transform (a map-reduce type operation), and then finally load the reduced transform result into e.g. a database: is there a way to have this fan out instead, in other words not have the reduce after the map, so the flow does not end with one task? Not sure if that makes sense, so: would something like 1 extract task producing 10 results, which kicks off 10 parallel transforms, each producing one output that is then passed to 10 parallel load tasks, be possible?
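    This fan-out is exactly what chained mapping gives: each downstream .map consumes its upstream's mapped children pairwise, with no reduce step unless a task accepts the whole list. A minimal sketch (task names hypothetical):
    from prefect import task, Flow

    @task
    def extract():
        return list(range(10))

    @task
    def transform(x):
        return x * 2

    @task
    def load(x):
        print(f"loading {x}")

    with Flow("fan-out") as flow:
        values = extract()               # 1 task, 10 results
        doubled = transform.map(values)  # 10 parallel transforms
        load.map(doubled)                # 10 parallel loads, no reduce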
    5 replies · 3 participants
  • Ben Fogelson
    04/22/2020, 9:58 PM
    Something I'm not quite sure how to do in prefect is use control flow to have an optional task inline in a sequence of tasks. Something like:
    with Flow('flow') as flow:
        do_optional_step = Parameter('do_optional_step')
        
        x = Parameter('x')
        y = x + 1
        y = ifelse(do_optional_step, 2*y, y)
    This doesn't work as intended, but is what I'd like to be able to do. If I were doing this as a pure python function, it would be:
    def run_flow_steps(x, do_optional_step):
        y = x + 1
        if do_optional_step:
            y = 2*y
        return y
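    For reference, a hedged sketch of how Prefect 1.x's control-flow tasks can express the optional step (task names hypothetical): give each branch its own task, let ifelse skip the branch the condition rules out, and let merge return whichever branch ran.
    from prefect import task, Flow, Parameter
    from prefect.tasks.control_flow import ifelse, merge

    @task
    def add_one(x):
        return x + 1

    @task
    def double(y):
        return 2 * y

    @task
    def same(y):
        return y

    with Flow('optional-step') as flow:
        do_optional_step = Parameter('do_optional_step')
        x = Parameter('x')
        y = add_one(x)
        doubled = double(y)
        unchanged = same(y)
        # ifelse skips one branch; merge picks the branch that ran
        ifelse(do_optional_step, doubled, unchanged)
        result = merge(doubled, unchanged)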
    👀 1
    5 replies · 2 participants
  • Ben McNeill
    04/23/2020, 2:32 PM
    Hi everyone! I am new here, wandered my way over after reading up on Dask ETL jobs that leverage Prefect. Anyone running their ETL jobs like that?
    👋 4
    8 replies · 3 participants
  • Adisun Wheelock
    04/23/2020, 3:22 PM
    I have a question about flow control. If I have a flow that looks like this and run the flow with DaskExecutor, it will run first_condition and second_condition asynchronously, correct? And if I use LocalExecutor, it will run first_condition and then second_condition synchronously?
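    For reference, a minimal sketch of how the executor is chosen at run time (import path as of Prefect 0.10.x; whether the two branches actually overlap still depends on the flow's graph):
    from prefect.engine.executors import DaskExecutor, LocalExecutor

    # flow is an already-built Flow
    flow.run(executor=DaskExecutor())   # independent tasks may run concurrently
    flow.run(executor=LocalExecutor())  # tasks run one at a time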
    5 replies · 3 participants
  • Vitor Avancini
    04/23/2020, 4:19 PM
    Question about flow versions: is a flow execution always tied to the flow version at which it was registered? If I run a flow and it fails due to some buggy code, then fix the code and rerun the failed flow, will it run the version with the bug again or the latest registered version? I have tried this process, but after hitting the retry button it got stuck in a Pending state. I did have the agent running this time.
    👀 1
    31 replies · 4 participants
  • Alexander Hirner
    04/23/2020, 6:24 PM
    Hello, one question regarding new-type `Result`s: what is planned to happen to store_safe_value? At first sight, it seems to be redundant with `Result`'s formatted location.
    👀 1
    6 replies · 2 participants
  • David Ojeda
    04/23/2020, 7:30 PM
    Quick question regarding the newly open-sourced UI server 🙌 Which ports and services should be facing the external world (if I were deploying this on some off-premises server or cluster)? The ui:8080 is obvious, but I could not make it work without adding graphql:4201 too, and I am not sure if that's safe…
    👀 1
    6 replies · 2 participants
  • Matthew Perry
    04/23/2020, 8:40 PM
    Hi folks. Setting up prefect server on a local network box, let's call it server1. When I access the page at http://server1:8080 it starts making network calls to the graphql backend at http://localhost:4200. Surely this is configurable, but I can't seem to find out where. Here's what I've tried so far:
    • editing service.ui.graphql_url in my ~/.prefect/config.toml on server1
    • creating a new env and setting environments.server1.services_host to server1
    • exporting PREFECT_SERVER__GRAPHQL_URL="http://server1:4200"
    I must be missing something obvious here. Any ideas?
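    (One guess at the relevant config section, based on the key named above rather than a confirmed fix:)
    # ~/.prefect/config.toml on server1
    [server.ui]
    graphql_url = "http://server1:4200/graphql"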
    ✔️ 1
    👀 1
    13 replies · 2 participants
  • Brad
    04/23/2020, 11:04 PM
    Hi team - is there any concept of a task timeout in prefect currently?
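    For what it's worth, a minimal sketch of the task-level timeout Prefect 1.x exposes (value in seconds; the task is failed if it runs longer):
    from prefect import task

    @task(timeout=60)
    def slow_job():
        ...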
    👀 1
    11 replies · 2 participants
  • Brad
    04/23/2020, 11:05 PM
    And just as a side note, I have been playing with the graphql API, and to say it is exceptional is putting it very lightly.
    😍 5
  • Brad
    04/23/2020, 11:12 PM
    I do have one request actually, related to caching - is there a plan for first-class support for exceptions?
    21 replies · 3 participants
  • Adisun Wheelock
    04/24/2020, 5:58 PM
    The docker-compose.yml (in prefect/cli) that is run upon prefect server start brings up a postgres instance. If I already have a postgres instance running, is it possible through the CLI to omit bringing up the postgres instance that is in the docker-compose.yml? Similar to docker-compose up.
    6 replies · 2 participants
Adisun Wheelock

04/24/2020, 5:58 PM
I ended up having to take out the postgres service in that docker-compose.yml, but I'm wondering if there was a better way, or even a flag to point to a custom docker-compose.yml.
josh

04/24/2020, 6:03 PM
Is the postgres instance you already have running the one that Prefect Core's server is going to use? If not, you can adjust the port for the postgres container so they don't interfere, with --postgres-port.
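(For example, something like prefect server start --postgres-port 5433 would keep Prefect's postgres container off the host's default 5432; the flag is as described above, the specific port just an illustration.)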
Adisun Wheelock

04/24/2020, 6:04 PM
Yeah, that was kind of another question I had. Is it best practice for Prefect to have its own dedicated database instance?
josh

04/24/2020, 6:04 PM
As that process currently stands, I would say yes (due to migrations against the db).
Adisun Wheelock

04/24/2020, 6:05 PM
I see. Okay, I'll do that. Thank you!