prefect-community
  • b

    Benjamin

    07/02/2020, 7:30 PM
    Hello everyone. I'm running a POC with prefect in a very simple flow: read data from s3, standardize numbers and write the result to s3. For the moment we're trying the local backend server (and not the prefect cloud version) and we're getting a strange behavior: everything runs fine when we use
    flow.run(executor=executor)
    but not with
    flow.register()
    . We're using the FargateCluster to create a new cluster for the flow and it's set up properly with flow.register, but no processing is done. I asked this a couple of days ago here, but now I have a reproducible example. I'll provide the details in this thread. Thanks a lot
    👀 1
    c
    23 replies · 2 participants
  • b

    Bob Colner

    07/02/2020, 9:58 PM
    hey hey, I'm a bit stumped trying to implement a data ingestion pattern in prefect. Is it possible to access task parameter values when templating the result 'target' filename? I'm trying to have my results stored in a directory structure (aka 'hive' dir partitioning). Thanks for any insight
    👀 1
    d
    m
    22 replies · 3 participants
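    A hedged sketch of what this can look like: the result `target` string is rendered with prefect.context at runtime, and `parameters` is part of the context during a flow run, so a hive-style path can reference both dates and parameter values (LocalResult, the directory, and the `symbol` parameter here are illustrative):
    from prefect import task
    from prefect.engine.results import LocalResult
    
    @task(
        result=LocalResult(dir="/data/results"),
        # str.format-style templating against prefect.context
        target="symbol={parameters[symbol]}/date={date:%Y-%m-%d}/part.parquet",
    )
    def fetch(symbol):
        ...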
  • g

    Greg Desmarais

    07/02/2020, 10:55 PM
    Hi all, newcomer to prefect - happy so far. I'm stuck a bit in a Dask integrated prefect environment. I have a simple flow that looks something like this:
    @task(log_stdout=True)
    def say_hello(name):
        print(f'{datetime.now()}: workflow hello {name}', flush=True)
        worker = get_worker()
        return f'done on {worker.name}, scheduler at {worker.scheduler.address}'
    
    name = Parameter('name')
    with Flow("Simple parallel hello") as flow:
        # Since there is no return value dependency, we end up with possible parallel operations
        for i in range(10):
            say_hello(name)
    If I run the flow from my script, targeting a particular Dask cluster, I can hit the right Dask workers:
    executor = DaskExecutor(address=dask_scheduler_address)
    flow.run(executor=executor)
    My question is about registering this flow and running it, say, from the prefect ui. I can easily register the flow with:
    flow.register()
    But then trying to run it from the ui just hangs. I'm pretty sure it is because the executor isn't registered with the flow. Am I missing something? Thanks in advance...
    c
    l
    7 replies · 3 participants
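    A hedged note on this one: flow.register() stores the flow's environment and storage, not the arguments passed to flow.run(), so the executor choice is lost unless it lives on the environment. In the 0.12-era API, something like this before registering should route UI-triggered runs at the same cluster:
    from prefect.engine.executors import DaskExecutor
    from prefect.environments import LocalEnvironment
    
    # attach the executor to the environment so it survives registration
    flow.environment = LocalEnvironment(
        executor=DaskExecutor(address=dask_scheduler_address)
    )
    flow.register()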
  • a

    Adrien Boutreau

    07/03/2020, 1:03 PM
    Hello! I have an issue in production, graphql seems to be down: when I call xxx.eu-west-2.compute.amazonaws.com:4200/graphql I get 4200: Connection refused, and when I check docker container ls it's not in the list - do you know how I can restart it? Postgres is also down
    j
    8 replies · 2 participants
  • v

    Vitor Avancini

    07/03/2020, 2:53 PM
    Hello everyone, anyone have an idea on how to write a GQL query for marking upstream tasks as successful? What I'm trying to do here is to run a dag from a specific task forward. I had a bug in my code, fixed it, registered a new version, and now I have to rerun the dag. The thing is that a lot of already-successful tasks would run again, and it takes almost one hour to run.
    j
    2 replies · 2 participants
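    If raw GraphQL is a hassle, a hedged alternative is the Python client's state-setting helper (task_run_id and version are placeholders you would look up first, e.g. by querying the task runs for the flow run):
    from prefect.client import Client
    from prefect.engine.state import Success
    
    client = Client()
    # version is the state version counter on the task run record
    client.set_task_run_state(
        task_run_id="<task-run-uuid>", version=1, state=Success()
    )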
  • b

    Bob Colner

    07/03/2020, 4:13 PM
    question about mapping tasks - is it possible for a task to be 'mapped' over multiple iterators? (think a task with 2 parameters where I want to 'map' over both with lists of param values)
    j
    j
    12 replies · 3 participants
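    For reference, a minimal sketch of the built-in behavior: .map() zips its iterable arguments positionally, pairing them element-wise, so a two-parameter task can be mapped over two equal-length lists directly:
    from prefect import Flow, task
    
    @task
    def concat(a: str, b: str) -> str:
        return a + b
    
    with Flow("zip-map") as flow:
        # pairs elements: ("d", "c"), ("o", "a"), ("g", "t")
        result = concat.map(["d", "o", "g"], ["c", "a", "t"])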
  • b

    Bob Colner

    07/03/2020, 4:51 PM
    Hi all, I'm working on some custom prefect serializers for pandas df results. I have something that seems to work but I don't think it is very robust (requires creating a local file).
    class ParquetSerializer(Serializer):
    
        def serialize(self, value: pd.DataFrame) -> bytes:
            # transform a Python object into bytes
            tmp_filename = str(time_ns()) + '.parquet'
            value.to_parquet(
                path=tmp_filename,
                index=False
            )
            with open(tmp_filename, 'rb') as in_file:
                df_bytes = in_file.read()
            Path(tmp_filename).unlink()
            return df_bytes
    
        def deserialize(self, value:bytes) -> pd.DataFrame:
            # recover a Python object from bytes        
            df_bytes_io = BytesIO(value)
            df = pd.read_parquet(df_bytes_io)
            return df
    Does anyone have thoughts about the above approach? (saving as a local file then reading the bytes from the file?)
    j
    b
    14 replies · 3 participants
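    A hedged buffer-based variant that avoids the temp file, assuming a pandas/pyarrow version whose to_parquet accepts file-like objects:
    from io import BytesIO
    
    import pandas as pd
    from prefect.engine.serializers import Serializer
    
    class ParquetSerializer(Serializer):
    
        def serialize(self, value: pd.DataFrame) -> bytes:
            # write the parquet bytes straight into an in-memory buffer
            buf = BytesIO()
            value.to_parquet(buf, index=False)
            return buf.getvalue()
    
        def deserialize(self, value: bytes) -> pd.DataFrame:
            return pd.read_parquet(BytesIO(value))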
  • b

    Bob Colner

    07/03/2020, 5:11 PM
    Hi again, I wanted to mention that I've successfully replaced airflow with prefect in production for all our ETL/BI and ML workflows at Instasize.com! Big thanks for all the help/support along the way. We ended up building a google cloud centric solution around prefect core. We did look into using prefect cloud (I like the dash/features, cost not an issue) but had friction with the implementation at this time. If anyone on the team is interested, I'm happy to provide feedback about our experience. Thanks again!
    👍 1
    🚀 10
    💯 1
    j
    3 replies · 2 participants
  • l

    Luke Orland

    07/03/2020, 7:38 PM
    When I register a flow, I'd like to save arbitrary metadata with the flow version using the Prefect Python API. Can anyone suggest a way to do so? I would like to look at the flow version in Prefect Cloud and perhaps see my custom key/values in the details.
    c
    3 replies · 2 participants
  • n

    nuks

    07/04/2020, 5:26 AM
    Hey guys, I am looking for a solution to abstract computations from supercomputers, mostly for fmri data preprocessing (and, on a lower note, analysis of the underlying output) using already-developed binaries within singularity (docker-like) containers on a large static dataset. Each fmri session is to be processed in parallel. We need to assess the current state of the computing environment (is the inode / lustre file limit close to being exceeded, etc.) and adapt/restrict the job count to prevent crashes + perhaps remotely archive outputs and clean up space as it goes. This can technically be done beforehand using some heuristic or dynamically. Would you consider prefect (relying on something like dask_jobqueue.SLURMCluster) to be a good fit for developing such a system, or would you have something else to recommend?
  • n

    nuks

    07/04/2020, 5:37 AM
    (On top of that, we’ll need some versioning system + manual quality control of the data through a web interface.)
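    For what it's worth, a hedged sketch of the dask_jobqueue + Prefect pairing mentioned above (resource figures are placeholders; adapt() bounds the SLURM job count so workers are only requested while tasks are queued):
    from dask_jobqueue import SLURMCluster
    from prefect.engine.executors import DaskExecutor
    
    cluster = SLURMCluster(cores=8, memory="32GB", walltime="04:00:00")
    cluster.adapt(minimum=1, maximum=20)  # scale jobs with load, within limits
    
    executor = DaskExecutor(address=cluster.scheduler_address)
    flow.run(executor=executor)  # `flow` would map one task per fmri session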
  • b

    Brad

    07/04/2020, 11:43 PM
    Hey @josh, one more PR to add
    extra_docker_kwargs
    to
    CreateContainer
    https://github.com/PrefectHQ/prefect/pull/2915
    1 reply · 1 participant
  • n

    nuks

    07/05/2020, 12:05 AM
    I guess we can't run prefect with singularity instead of docker (cf. https://docs.prefect.io/core/getting_started/installation.html#running-the-local-server-and-ui). Would this be something easy to change/patch for an external contributor?
  • k

    Kostas Chalikias

    07/06/2020, 1:22 PM
    Hello everyone, I have a couple of questions regarding deployment. I am considering using Docker or GCS Storage for my flows and trying to understand what makes the most sense. 1. When I use either, do I still run the agent in a virtual environment that includes the imports used by my tasks? Perhaps one is different than the other in that aspect? 2. What is the usual way to register a flow? Do people just use an ad hoc script whenever they have a new flow or perhaps do some kind of diffing & registering what isn't there as part of a release process? When is re-registering a flow ever required? Perhaps when changing the schedule?
    👀 1
    n
    5 replies · 2 participants
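    On the second question, one common pattern is a small registration script run as part of a release; a hedged sketch (myproject.flows and the project name are hypothetical):
    from myproject.flows import all_flows  # hypothetical module collecting Flow objects
    
    for flow in all_flows:
        flow.register(project_name="production")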
  • k

    Kai Weber

    07/06/2020, 1:50 PM
    Hi, can anybody help me with the docker tasks? I want to design a flow to renew a docker image: 1. pull the latest MQTT image (docker pull eclipse-mosquitto), 2. remove the running MQTT container (docker rm MQTT-NAME), 3. start the new MQTT container (docker run -tid -p 1883:1883 -p 9001:9001 --name=msg-broker --restart=always --network %x_Network% eclipse-mosquitto). How do I phrase that in Prefect? I can't even pull the latest image?! Thanks a lot!
    n
    13 replies · 2 participants
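    A hedged sketch of the same three steps with the prefect.tasks.docker tasks (argument names can differ slightly by version, and port/restart options may need the extra_docker_kwargs from the PR linked earlier in this channel):
    from prefect import Flow
    from prefect.tasks.docker import (
        CreateContainer,
        PullImage,
        RemoveContainer,
        StartContainer,
    )
    
    with Flow("renew-mqtt") as flow:
        # chain the steps with upstream_tasks since no data flows between them
        image = PullImage(repository="eclipse-mosquitto", tag="latest")()
        removed = RemoveContainer(container_id="msg-broker")(upstream_tasks=[image])
        container = CreateContainer(image_name="eclipse-mosquitto:latest")(
            upstream_tasks=[removed]
        )
        StartContainer()(container_id=container)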
  • b

    Ben Fogelson

    07/06/2020, 4:59 PM
    I’m running into an error using
    flow.replace
    , which seems to have to do with the fact that
    flow.remove(task)
    doesn’t remove
    task
    from `flow.slugs`:
    from prefect import Parameter, Flow, Task
    
    p1 = Parameter('p')
    p2 = Parameter('p')
    t1 = Task()
    t2 = Task()
    
    flow = Flow('flow')
    flow.add_task(p1)
    flow.add_task(t1)
    
    print(flow.slugs)
    # {<Parameter: p>: 'p', <Task: Task>: 'Task-1'}
    
    flow.replace(t1, t2)
    print(flow.slugs)
    # {<Parameter: p>: 'p', <Task: Task>: 'Task-1', <Task: Task>: 'Task-1'}
    
    flow.replace(p1, p2)
    ---------------------------------------------------------------------------
    ValueError                                Traceback (most recent call last)
    <ipython-input-7-5ec138031eb6> in <module>
    ----> 1 flow.replace(p1, p2)
    
    /opt/conda/envs/drugdiscovery/lib/python3.6/site-packages/prefect/core/flow.py in replace(self, old, new, validate)
        291         # update tasks
        292         self.tasks.remove(old)
    --> 293         self.add_task(new)
        294 
        295         self._cache.clear()
    
    /opt/conda/envs/drugdiscovery/lib/python3.6/site-packages/prefect/core/flow.py in add_task(self, task)
        482                 raise ValueError(
        483                     'A task with the slug "{}" already exists in this '
    --> 484                     "flow.".format(task.slug)
        485                 )
        486             self.slugs[task] = task.slug or self._generate_task_slug(task)
    
    ValueError: A task with the slug "p" already exists in this flow.
    c
    m
    4 replies · 3 participants
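    Until that's fixed upstream, a hedged workaround that relies on Flow internals is to drop the stale slug entry yourself before replacing:
    # remove the old task's slug so add_task() doesn't see a collision
    flow.slugs.pop(p1, None)
    flow.replace(p1, p2)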
  • m

    Marwan Sarieddine

    07/06/2020, 5:17 PM
    Hi folks, what is the simplest way in prefect to specify that a task only executes based on the value of a boolean parameter, and otherwise gets skipped? Never mind - I just spotted signals
    from prefect.engine import signals
    
    @task
    def signal_task(run_task: bool):
        if not run_task:
            raise signals.SKIP()
    k
    3 replies · 2 participants
  • j

    james.lamb

    07/06/2020, 6:23 PM
    Hello from Chicago! I've tried searching this Slack and the Prefect documentation but haven't found a good answer for this.
    How should I handle prefect version differences between the environment where a flow is created and registered (
    flow.register()
    ) and the environment where the agent runs?
    So, concretely, let's say that I have an agent running using the image
    prefecthq/prefect:0.12.1-python3.7
    . I have some flows already being run by that agent, and those flows were created with
    prefect
    0.12.1. Imagine that
    prefect
    0.13.0 has just been released on PyPi, and data scientists on my team are going to
    pip install prefect
    , create flows, and register them with
    flow.register()
    What should I do? 1. Add a label to each agent with the prefect major + minor version (e.g. v0.12, v0.13). Make sure flows are registered with such a version label, so 0.12 flows run on the 0.12 agent and 0.13 flows run on the 0.13 agent.
    prefect
    uses semantic versioning, which means that there can be breaking changes between minor releases in the
    0.x
    series. 2. Do nothing; a flow created with 0.13.x should be expected to work with an agent running 0.12.x. 3. Upgrade all those 0.12 flows to 0.13. Prefect Cloud is only ever running a single version of
    prefect
    and if your flows + agent are not in sync with that version, bad things will happen. 4. Something else? Thanks for your time and consideration!
    k
    5 replies · 2 participants
  • b

    Bob Colner

    07/06/2020, 6:40 PM
    Hi all, I'm struggling to get the 'zip' mapping pattern to work, I'm probably missing something basic here. I'm trying to map a task with 2 parameters. I'm looking at the reply to this github issue (https://github.com/PrefectHQ/prefect/issues/1986) where it is suggested to use the
    itertools.product
    function to map over multiple tasks. However, I can't get this to work in a simple example:
    import itertools
    from prefect import Flow, task
    from prefect.engine.executors import LocalDaskExecutor
    
    
    @task(checkpoint=False)
    def cross_product(x, y) -> list:
        return list(itertools.product(x, y))
    
    
    @task(checkpoint=False)
    def concat(a:str, b:str) -> str:
        return a + b
    
    
    a = ['d', 'o', 'g']
    b = ['c', 'a', 't']
    
    with Flow(name='zip-map-test') as flow:
        
        result = concat.map(cross_product(a, b))
    
    
    executor = LocalDaskExecutor(scheduler='threads')
    
    flow_state = flow.run(executor=executor)
    k
    d
    9 replies · 3 participants
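    For reference, a hedged variant of this example that does run: since the cross product arrives as one sequence of (a, b) tuples, have the mapped task accept a single pair (and note flow.run takes the executor as a keyword argument):
    import itertools
    from prefect import Flow, task
    from prefect.engine.executors import LocalDaskExecutor
    
    @task(checkpoint=False)
    def cross_product(x, y) -> list:
        return list(itertools.product(x, y))
    
    @task(checkpoint=False)
    def concat(pair) -> str:
        a, b = pair  # each mapped child receives one (a, b) tuple
        return a + b
    
    with Flow(name='zip-map-test') as flow:
        result = concat.map(cross_product(['d', 'o', 'g'], ['c', 'a', 't']))
    
    flow_state = flow.run(executor=LocalDaskExecutor(scheduler='threads'))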
  • m

    Matt Allen

    07/06/2020, 7:19 PM
    Could someone help explain how EnvVarSecrets work with Docker storage? I'm registering my flow against a local prefect server and have a docker agent scheduling it successfully, but I can't seem to get environment variables to be in scope no matter what I do. I've tried setting the env variables in the shell that runs
    prefect server start
    , the one that runs
    prefect agent docker
    , and the one that runs the script that registers the flow, all with no luck
    c
    3 replies · 2 participants
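    One thing worth checking (hedged): EnvVarSecret resolves inside the container that runs the flow, so the variable has to exist there rather than in the shells that start the server/agent or register the flow. Docker storage can bake variables in (MY_SECRET and its value are illustrative):
    from prefect.environments.storage import Docker
    
    # env_vars are set in the image the flow runs in
    flow.storage = Docker(env_vars={"MY_SECRET": "value"})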
  • a

    alex

    07/06/2020, 11:24 PM
    Hello everyone, I'm looking to try out prefect for one of our pipelines. We have multiple "sources" that consist of 2 "highlevel" types: base and parent sources. Parent sources have a list of children sources. We may choose to add or remove sources based on a config. Each source has a
    run
    method, calling run on a parent source calls the
    run
    method for each of the children. Currently, all the top-level sources are called sequentially and then finally aggregated, does anyone have any ideas on the most "prefectic" way to structure this to maximize parallelization and conciseness? One way is to define a task for each highlevel source, but this seems to be a bit tedious. We also won't be able to parallelize the children. Another approach I tried is something like this.
    @task
    def run_for_child(child):
        child.run(max_items=50)
        return child.collection_name
    
    @task
    def run_source(source):
        cond = source.is_parent
        with case(cond, True):
            res1 = run_for_child.map(source.children)
        with case(cond, False):
            source.run(max_items=50)
            res2 = source.collection_name
        return merge(res1, res2)
    
    @task 
    def get_highlevel_sources(config):
        # return list[Source] based off config
    
    # In the flow, get the sources and do run_source.map. At the end, aggregate all collections
    This gave me a
    could not infer active flow context
    and I'm not sure if this is the best way to structure this anyways
    k
    n
    +1
    13 replies · 4 participants
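    A hedged restructuring that sidesteps this: case and merge build graph structure, so they only work inside a `with Flow(...)` block, not inside a task body (hence "could not infer active flow context"). Flattening parents into their children in a plain task keeps the control flow out of the graph and lets every unit map in parallel:
    from prefect import Flow, Parameter, task
    
    @task
    def flatten(sources):
        # expand parent sources into their children; leave base sources as-is
        units = []
        for s in sources:
            units.extend(s.children if s.is_parent else [s])
        return units
    
    @task
    def run_one(source):
        source.run(max_items=50)
        return source.collection_name
    
    with Flow("sources") as flow:
        config = Parameter("config")
        sources = get_highlevel_sources(config)
        names = run_one.map(flatten(sources))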
  • b

    Brett Naul

    07/07/2020, 12:25 AM
    $1,000,000 idea for y'all free of charge: what if the prefect favicon for a flow run page showed the color of your flow run status?? like GitHub and CircleCI do
    🆓 2
    :upvote: 1
    n
    j
    3 replies · 3 participants
  • a

    Avi A

    07/07/2020, 9:05 AM
    Hey community, I’m trying to figure out a way to control the flow with the caches. Suppose I have a flow with a dependency graph like this: A_heavy_mapped_task --> B_heavy_reduce_task --> C_some_other_task Now, suppose I have the results for tasks A & B cached in a
    Result
    and I just want to run task C. In Luigi, C would ask for B’s output, and only fetch that. However, in prefect, the flow runner will fetch ALL the results of the mapped tasks A, but since we already have B’s result, that’s totally redundant and wastes a lot of time and data exchange. Any idea on how to tackle this issue? i.e. fetch only task B results somehow so that C can run?
    👀 1
    z
    m
    25 replies · 3 participants
  • a

    Ankit

    07/07/2020, 12:04 PM
    Hey guys, I am new to the community and was trying to use prefect with docker. My use case: I want to use the prefect server along with dask and run it on my own ec2 instance using docker. I wanted to ask how to spin up the docker container for this, and when I have to register new flows to the server, I would want it to be as simple as adding the script to the src folder. I am a bit confused here and would appreciate the help. TIA
    z
    13 replies · 2 participants
  • f

    Francisco

    07/07/2020, 2:40 PM
    Hello everyone! I'm using Prefect and everything works well, but... I want to secure the prefect UI with a username and password. Can anyone help me with this? TIA 🙂
    j
    r
    +3
    5 replies · 6 participants
  • a

    alex

    07/07/2020, 4:27 PM
    Has anyone else run into an issue with duplicate tasks being created? I have a list of Task instances that I would like to run in parallel.
    with Flow("My flow", schedule=schedule) as flow:
            for source in all_sources:
                source.bind(
                    ## parameters for my run() function here
                )
            res = [source() for feed in all_sources]
    In the schematic and gantt chart I have a dag with all the
    sources
    leading to a list, which is expected, but I also see duplicated
    source
    tasks without any edges
    z
    j
    7 replies · 3 participants
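    A hedged guess at the cause: calling a Task instance inside a flow context copies the task, so bind()-then-call registers two copies, one with edges and one without (the comprehension also reuses `source` from the earlier loop rather than its own variable). Calling each task exactly once avoids the edge-less duplicates (run_kwargs is a hypothetical dict of the run() parameters):
    with Flow("My flow", schedule=schedule) as flow:
        # one call per source, with its arguments, instead of bind() + call
        res = [source(**run_kwargs) for source in all_sources]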
  • i

    itay livni

    07/07/2020, 5:40 PM
    Hi - Is there a way to check if a list is empty in the positive?
    with Flow("hello"):
        some_lst = make_lst()
        with case(some_lst.not_(), False):
            # do something
    z
    2 replies · 2 participants
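    A hedged sketch of one way to phrase it "in the positive": a small boolean task plus case (make_lst and do_something are the placeholders from the message above):
    from prefect import Flow, case, task
    
    @task
    def is_empty(lst) -> bool:
        return len(lst) == 0
    
    with Flow("hello") as flow:
        some_lst = make_lst()
        with case(is_empty(some_lst), False):
            # runs only when the list is non-empty
            do_something(some_lst)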
  • z

    Zach

    07/07/2020, 6:16 PM
    I am trying to figure out the best way to have a conditional branch in my flow, probably involving the
    ifelse
    fn in prefect. I have a task, we can call it Task A, that queries my database for a value and returns it. If Task A returns some value, then that value should get passed to Task B, whose output gets passed to Task C, whose output goes to Task D. But if Task A returns
    None
    , then I don't want Task B/C/D to run. Right now I have it set up like this:
    TASK_A = query_database_task()
    RUN_TASKS_B_C_d = dummy_task() # task does nothing
    ifelse(is_null(TASK_A), RUN_TASKS_B_C_d, None)
    
    TASK_B = some_task_b(TASK_A)
    TASK_B.set_upstream(RUN_TASKS_B_C_d)
    
    TASK_C = some_task_b(TASK_B)
    TASK_C.set_upstream(RUN_TASKS_B_C_d)
    
    TASK_D = some_task_b(TASK_C)
    TASK_D.set_upstream(RUN_TASKS_B_C_d)
    There has to be a better way to do this
    z
    j
    3 replies · 3 participants
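    A hedged alternative using case: everything inside the block is skipped when the condition doesn't match, so no dummy task or manual set_upstream is needed (the task names are the placeholders from the message above):
    from prefect import Flow, case, task
    
    @task
    def value_found(value) -> bool:
        return value is not None
    
    with Flow("conditional") as flow:
        a = query_database_task()
        with case(value_found(a), True):
            # B, C, D run only when Task A returned a value
            b = some_task_b(a)
            c = some_task_c(b)
            d = some_task_d(c)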
  • b

    Bob Colner

    07/07/2020, 9:18 PM
    question about result target caching: does prefect read the entire cached file or just check that it exists before deciding to run vs. skip?
    z
    1 reply · 2 participants
  • b

    Brett Naul

    07/07/2020, 9:19 PM
    quick q on mapped task inputs (@Jim Crist-Harif mentioned here that this might get resolved in the mapping refactor but we're still seeing it) is passing
    task.map(sequence, x=unmapped(some_large_result))
    a bad idea? we're seeing
    UserWarning: Large object of size 874.55 MB detected in task graph
    and very very slow creation of mapped tasks; I would have expected that these would be re-using the dask futures already present on the cluster, not rehydrating the outputs inside the
    FlowRunner
    loop, but I guess that's not how it works. is this expected behavior and if so is there any known workaround..?
    j
    7 replies · 2 participants
j

Jim Crist-Harif

07/07/2020, 9:20 PM
Yeah, as written this will be inefficient if
some_large_result
is not the result of a task (e.g. if it's a constant). If it is the result of a task, we should be able to make this efficient, but the flow runner might be doing something dumb.
b

Brett Naul

07/07/2020, 9:24 PM
thanks @Jim Crist-Harif! in our case it is another upstream task, not a constant. I'm not exactly sure where the problematic behavior is but it does seem like the flow runner is
.submit
ting the actual result bytes for every task
j

Jim Crist-Harif

07/07/2020, 9:26 PM
I wouldn't be surprised. There's lots of low-hanging fruit in the flow runner pipeline in terms of passing around completed results. Since prefect doesn't do any high-level graph heuristics (it just walks a topological sort), it doesn't distinguish between tasks whose results are still needed and tasks whose results can be dropped. I can create an issue for this specific behavior if you don't want to. I suspect we'll need to reconfigure how we walk the graph and submit tasks to avoid re-serializing results and passing them around.
b

Brett Naul

07/07/2020, 9:27 PM
if you don't mind that would be great, I expect your description would be quite a bit more helpful/accurate 🙂
👍 1
j

Jim Crist-Harif

07/07/2020, 9:52 PM
https://github.com/PrefectHQ/prefect/issues/2927
Thinking more, I think this wouldn't be that hard to do. Might hack on this in the next week or so.
Thanks for the issue report! Would be good to get this fixed.
🎉 1