prefect-community
•

    Sarita Patel

    12/02/2021, 3:46 PM
I have scheduled a job to run daily at 7 AM UTC, and it takes current_date, from_date, and to_date as parameters, but it seems like they are not updating; it keeps running with the same dates from when I registered the flow?
17 replies · 2 participants
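A minimal sketch of the usual fix for this pattern, assuming Prefect 1.x: default Parameter values are baked in at registration time, so date values are typically computed at runtime inside a task instead, e.g. from the scheduled start time in prefect.context (the one-day window below is a hypothetical example):
import prefect
from prefect import Flow, task

@task
def get_dates():
    # scheduled_start_time is a pendulum datetime set freshly for each run
    scheduled = prefect.context.get("scheduled_start_time")
    current_date = scheduled.strftime("%Y-%m-%d")
    from_date = scheduled.subtract(days=1).strftime("%Y-%m-%d")  # hypothetical window
    return from_date, current_date

with Flow("daily-job") as flow:
    dates = get_dates()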
•

    Tom Shaffner

    12/02/2021, 4:46 PM
    Is there a way to see, in the UI, what executor a flow executed with? I changed the executor in one of my flows from Local to Dask but I can't tell if it's actually working.
3 replies · 2 participants
•

    Tom Klein

    12/02/2021, 8:44 PM
It's me again 🙃 Wanted to confirm I understood something about the architecture. I'm trying to figure out the difference between:
• a Run configuration (Local, Universal, Docker, Kubernetes)
• an Agent (e.g. Local, Kubernetes, Docker)
• and an Executor (Local, LocalDask, and DaskExecutor)
If I understand correctly:
• our Agent is the entity (an always-running program, basically) which communicates with the Prefect server (and its scheduler, etc.) and brings jobs to the execution phase. If it lives in Kubernetes, then by default it executes flows as Kubernetes jobs, but if we were to pick a "local" Run configuration, it would run them on the agent itself?
• the Run configuration determines where the Agent runs the flow (e.g., if we picked a Docker run_config for our Docker agent, would it run as a Docker container inside the agent's container, or alongside it as a "sibling" container?)
• the Executor determines how the flow should be executed, so for example if we had a LocalDask executor running in a Docker run_config with a Docker agent, it would spin up a local Dask cluster inside the container that's running the job? But if we picked a DaskExecutor, the flow would actually be executed outside the container that's running the flow? (assuming our Dask cluster runs alongside our Docker daemon, e.g. on EC2)
Did I get it right, or am I missing something? 😄
22 replies · 3 participants
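For orientation, a hedged sketch of how the three pieces are declared on a Prefect 1.x flow (the image name is illustrative); the agent is started separately and launches each flow run according to the run_config, while the executor controls how tasks execute inside that run:
from prefect import Flow, task
from prefect.executors import LocalDaskExecutor
from prefect.run_configs import DockerRun

@task
def hello():
    print("hello")

with Flow("architecture-example") as flow:
    hello()

flow.run_config = DockerRun(image="prefecthq/prefect:0.15.9")  # where the run happens
flow.executor = LocalDaskExecutor()  # how tasks execute inside that run
# paired with a separately started agent, e.g.: prefect agent docker start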
•

    Tilak Maddy

    12/02/2021, 10:19 PM
Hey everyone, I have a huge problem with Prefect schedules, because it's not possible to have the schedule as a parameter, as per my understanding of the docs. The current problem I have run into while creating an ETL flow is that I have an extract, transform, load template that takes source, dest, and transform code as parameters. But I also need to accept a cron interval string as a parameter and monitor each flow run individually. How do I accomplish that? EDIT 1: The whole scenario is this: users should be able to come to our website, enter source, dest, transform code, and a cron interval for their ETL, and on the backend we should do the rest. EDIT 2: If it is possible to accomplish the above, how do I monitor each flow run? I want to in the future possibly be able to pause and unpause a single flow run, not all the flow runs belonging to a particular flow ID as shown here https://docs.prefect.io/orchestration/flow-runs/scheduling.html#toggling-flow-schedules
19 replies · 3 participants
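A hedged sketch of one pattern that partially covers this in Prefect 1.x: the schedule itself can't be a Parameter, but each clock on a schedule can carry its own parameter defaults, so a single registered flow can serve several user configurations (the cron strings and defaults are illustrative):
from prefect import Flow, Parameter, task
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock

@task
def etl(source):
    print(f"running ETL for {source}")

with Flow("parameterized-etl") as flow:
    source = Parameter("source")
    etl(source)

# one clock per user-supplied cron string, each with its own parameter defaults
flow.schedule = Schedule(clocks=[
    CronClock("0 * * * *", parameter_defaults={"source": "s3://bucket-a"}),
    CronClock("30 2 * * *", parameter_defaults={"source": "s3://bucket-b"}),
])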
•

    Billy McMonagle

    12/02/2021, 10:35 PM
    I am having trouble setting task dependencies where there is not a direct input from one task to the next.
12 replies · 2 participants
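A minimal sketch of one way to express this in Prefect 1.x: the upstream_tasks keyword sets a pure state dependency, with no data passed between the tasks:
from prefect import Flow, task

@task
def extract():
    print("extract")

@task
def report():
    print("runs only after extract, no input needed")

with Flow("state-only-deps") as flow:
    e = extract()
    report(upstream_tasks=[e])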
•

    Tom Shaffner

    12/03/2021, 12:28 AM
    Is there a way to map sequential tasks? With a dependency? I.e. I want to create a series of tasks where task 1 returns a value and, depending on a case from that value, task 2 kicks off. And I want to do this for A, B, C, etc... I've got a setup that's almost doing this now, but when I try to call the other task in my first task it tells me the context is missing and suggests adding a run(), which doesn't help. Is there a restructure that would work here?
24 replies · 2 participants
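A hedged sketch of one common restructuring, assuming Prefect 1.x: rather than calling task 2 inside task 1's run method (which fails outside a flow context), both tasks are mapped in the Flow block, and task 2 raises SKIP for items whose value doesn't match the case (the condition is hypothetical):
from prefect import Flow, task
from prefect.engine.signals import SKIP

@task
def task1(x):
    return x * 2  # stand-in for the value task 1 returns

@task
def task2(value):
    if value <= 2:  # hypothetical case check on task 1's value
        raise SKIP("condition not met for this item")
    print(f"task 2 kicked off for {value}")

with Flow("sequential-map") as flow:
    results = task1.map([1, 2, 3])  # A, B, C, ...
    task2.map(results)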
•

    Sandip Viradiya

    12/03/2021, 5:47 AM
    Hello Everyone,
    ECSAgent(
    		cluster="NAME OF THE CLUSTER",
    		task_role_arn="ROLE ARN"
    	)
Above code is working perfectly for one Prefect account but giving the below error for another account. Can anyone please guide me on what I am missing?
ValueError: Failed to infer default networkConfiguration, please explicitly configure using --run-task-kwargs
I did not require any networkConfiguration for the first account. I am using Prefect 0.15.9.
2 replies · 2 participants
•

    Vadym Dytyniak

    12/03/2021, 10:13 AM
Hello Everyone. We want to run lightweight Prefect flows using LocalExecutor on ECS as a separate task. It successfully creates a new task and runs the flow, but the problem is that I have to install additional dependencies for the flow. I can't find a way to run pip install before running the flow. Can someone help: is there any chance to implement something like this?
58 replies · 3 participants
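A minimal sketch of the usual approach, assuming the flow runs on an official prefecthq/prefect image: its entrypoint pip-installs anything listed in the EXTRA_PIP_PACKAGES environment variable before the flow starts (the package names are illustrative):
from prefect import Flow, task
from prefect.run_configs import ECSRun

@task
def work():
    import pandas  # available because it was installed at container startup
    print(pandas.__version__)

with Flow("lightweight-flow") as flow:
    work()

flow.run_config = ECSRun(
    image="prefecthq/prefect:0.15.9",
    env={"EXTRA_PIP_PACKAGES": "pandas==1.3.4 s3fs"},  # hypothetical extra deps
)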
•

    Tilak Maddy

    12/03/2021, 10:26 AM
Hey y'all, could you explain what this means?
UserWarning: No result handler was specified on your Flow. Cloud features such as input caching and resuming task runs from failure may not work properly. registered_flow = client.register(
That's the whole warning message, and here is the code. It worked properly when I triggered the run from Prefect Cloud; there was no problem in execution. But I want to be able to resume tasks if they fail. What do I do?
    import os
    import time
    from prefect.storage import GitHub
    import prefect
    from prefect import task, Flow, Parameter
    from prefect.run_configs import LocalRun
    from prefect.executors import LocalDaskExecutor
    
    
    @task
    def say_hello(name):
        # Add a sleep to simulate some long-running task
        time.sleep(3)
        # Load the greeting to use from an environment variable
        greeting = os.environ.get("GREETING")
        logger = prefect.context.get("logger")
    logger.info(f"{greeting}, {name}!")
    
    
    with Flow("hello-flow") as flow:
        people = Parameter("people", default=["Arthur", "Ford", "Marvin"])
        say_hello.map(people)
    
    flow.storage = GitHub(
        repo="XXX/test-repo",
        path="learning_storage.py",
        access_token_secret="XXX"
    )
    
    flow.run_config = LocalRun(env={"GREETING": "Hello from User 2 "}, labels=["dev"])
    flow.executor = LocalDaskExecutor()
    flow.register(project_name="test_user_2")
I ran this on my local machine. Yes, I have a copy of the flow in the mentioned GitHub repo too.
6 replies · 2 participants
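A minimal sketch of the usual fix, assuming Prefect 0.15.x, where result handlers were superseded by Results: attaching a flow-level Result persists task outputs so failed runs can be resumed (the bucket name is hypothetical):
from prefect import Flow
from prefect.engine.results import S3Result

with Flow("hello-flow", result=S3Result(bucket="my-results-bucket")) as flow:
    ...  # tasks as before; individual tasks can also set result= themselves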
•

    Tilak Maddy

    12/03/2021, 11:42 AM
Is it okay to create a lot of projects with just 1 or 2 flows registered to each of them (I use Prefect Cloud)? I think I have set up quite a comfortable workflow, and we could end up with something like 300 Prefect projects. What should I be worrying about?
4 replies · 2 participants
•

    Bruno Murino

    12/03/2021, 1:04 PM
Hi everyone, I've got a security concern I'm not sure how to solve. When running a flow as an ECS task with a custom task definition, Prefect injects a bunch of environment variables, which then appear on the ECS task execution screen in the AWS console. This includes the Prefect API keys. Does anyone know how to solve that?
25 replies · 3 participants
•

    Jacob Blanco

    12/03/2021, 2:11 PM
    Hey folks, having some persistent issues with the Secrets Vault. In a mapped task with 20 tasks, for some reason 1 out of the 20 mapped runs might error out complaining that Secret A is not available even though the other 19 found Secret A before and after the failing task.
5 replies · 4 participants
•

    Casey Green

    12/03/2021, 3:24 PM
    Is there a slack channel for Orion questions or should they be posted here?
    👍 1
1 reply · 2 participants
•

    An Hoang

    12/03/2021, 4:02 PM
question: Can I change the working directory of the flow-run process midway through the flow? Maybe by doing a ShellTask("cd directory/to/switch/to"), so that all of the results outputted are now under directory/to/switch/to/result? My directory/to/switch/to/ is templated at runtime and is the result of a task. Right now I have to pass this path as a parameter to all subsequent tasks. Wondering if there's a more efficient/less error-prone way to do this.
3 replies · 2 participants
•

    Casey Green

    12/03/2021, 4:27 PM
I'm having trouble creating deployments in Orion. I've tried two methods: 1. the method described in the docs, and 2. programmatically using OrionClient.create_deployment(...). In both cases it supposedly succeeds, but the deployment is not showing up in the UI. I've also tried copying the simple example in the docs verbatim, but no dice.
    $ prefect version
    2.0a5
    $ prefect deployment create ./my_flow_deployment.py
    Loading deployments from python script at 'my_flow_deployment.py'...
    Created deployment 'my-first-deployment' for flow 'Addition Machine'
    note: I'm able to run flows and see them show up in the UI.
27 replies · 3 participants
•

    brian

    12/03/2021, 7:07 PM
    Hi all, I’m running a flow via prefect cloud using github storage and have come across a strange error
9 replies · 2 participants
•

    Erik Amundson

    12/03/2021, 10:26 PM
    This might not be supported, but we're trying to run a docker agent on windows with a named pipe file mounted as a volume. The registration works fine with docker storage, and we can run the container manually from an anaconda prompt and hit the pipe file from within the container with:
docker run -v "//./pipe/<named_pipe>://./pipe/<named_pipe>" --rm -it <image>
    It also doesn't throw any error when we start the prefect agent with:
prefect agent docker start --label <agent_label> --volume "//./pipe/<named_pipe>://./pipe/<named_pipe>"
    but once we try to actually run the workflow it fails with this message:
    Internal Server Error ("invalid volume specification: '\\.\pipe\<named_pipe>:\pipe\<named_pipe>:/pipe/<named_pipe>:/pipe/<named_pipe>:rw'")
I'm not really sure why it seems to be duplicating the volume mapping and removing some of the forward slashes. Is this supported at all? Worst case we can probably subclass the Docker agent and hardcode the run_flow() command, but we'd like to avoid having extra code on the agent side.
7 replies · 3 participants
•

    saml

    12/03/2021, 10:26 PM
I'm trying to get Prefect Server running on an EC2 following this tutorial: https://towardsdatascience.com/deploying-prefect-server-with-aws-ecs-fargate-and-docker-storage-36f633226c5f but I can't seem to get it working. I've got the server installed and the ECS agent running, but when I try to run a task I get this error:
    Failed to load and execute Flow's environment: ModuleNotFoundError("No module named '/home/ec2-user/'")
10 replies · 2 participants
•

    Leon Kozlowski

    12/03/2021, 11:36 PM
I made a one-line change to one of my flows and tried to deploy through my CI job. I'm seeing this message in the register step:
Registering <flow-name>... Skipped (metadata unchanged)
What does this check take into account? Must I use the --force flag?
1 reply · 2 participants
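A hedged sketch, assuming the Prefect 1.x CLI: registration is skipped when the flow's serialized metadata (storage, run_config, structure) hashes to the same idempotency key, which a one-line change inside a task's body often doesn't alter; the --force flag re-registers regardless (the project and path are illustrative):
prefect register --project my-project --path flow.py --force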
•

    alins

    12/04/2021, 8:07 AM
Hi, I want to know how I can run a cron flow in parallel. This is my task:
    import datetime
    import os
    import pendulum
    import time
    
    from prefect import Flow, task
    from prefect.schedules import CronSchedule
    from prefect.tasks.airtable import WriteAirtableRow
    from prefect.tasks.github import GetRepoInfo
    from prefect.triggers import any_failed
    
    
    @task
    def my_process():
        time.sleep(70)
    
    daily_schedule = CronSchedule("*/1 * * * *")
    
    
    with Flow("sleep more than cron", schedule=daily_schedule) as flow:
        my_process()
    
    flow.run()
While I run this with
python3 flow.py
it runs and waits 70 seconds until the end, and then the task starts again (so each cycle takes two minutes), but I want it to run every minute and not care about how long the process needs to finish.
4 replies · 2 participants
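A hedged sketch of the usual approach: flow.run() executes scheduled runs sequentially in a single process, so overlapping runs generally require registering the flow with a backend (Prefect Server or Cloud) and letting the scheduler create runs that an agent launches independently (the project name is hypothetical):
# instead of flow.run():
flow.register(project_name="my-project")  # hypothetical project
# then, in a separate shell, start an agent to pick up the scheduled runs:
#   prefect agent local start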
•

    Tilak Maddy

    12/04/2021, 10:49 AM
    Can VertexRun manage to pull an image from private repo on Docker Hub ? If not, how can I accomplish that ?
    flow.run_config = VertexRun(
        image="example/my-custom-private-image:latest",
        machine_type="e2-highmem-8",
    )
    https://docs.prefect.io/orchestration/flow_config/run_configs.html#vertexrun
1 reply · 2 participants
•

    alins

    12/04/2021, 12:59 PM
While typing
prefect server start
it says
ERROR: Couldn't connect to Docker daemon at http+docker://localhost - is it running?
but Docker is up and running. Could someone help me please? The full error:
    amir@server:~$ prefect server start
ERROR: Couldn't connect to Docker daemon at http+docker://localhost - is it running?
    
    If it's at a non-standard location, specify the URL with the DOCKER_HOST environment variable.
    Exception caught; killing services (press ctrl-C to force)
ERROR: Couldn't connect to Docker daemon at http+docker://localhost - is it running?
    
    If it's at a non-standard location, specify the URL with the DOCKER_HOST environment variable.
    Traceback (most recent call last):
      File "/home/amir/.local/lib/python3.8/site-packages/prefect/cli/server.py", line 623, in start
        subprocess.check_call(
      File "/usr/lib/python3.8/subprocess.py", line 364, in check_call
        raise CalledProcessError(retcode, cmd)
    subprocess.CalledProcessError: Command '['docker-compose', 'pull']' returned non-zero exit status 1.
    
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File "/home/amir/.local/bin/prefect", line 8, in <module>
        sys.exit(cli())
      File "/usr/lib/python3/dist-packages/click/core.py", line 764, in __call__
        return self.main(*args, **kwargs)
      File "/usr/lib/python3/dist-packages/click/core.py", line 717, in main
        rv = self.invoke(ctx)
      File "/usr/lib/python3/dist-packages/click/core.py", line 1137, in invoke
        return _process_result(sub_ctx.command.invoke(sub_ctx))
      File "/usr/lib/python3/dist-packages/click/core.py", line 1137, in invoke
        return _process_result(sub_ctx.command.invoke(sub_ctx))
      File "/usr/lib/python3/dist-packages/click/core.py", line 956, in invoke
        return ctx.invoke(self.callback, **ctx.params)
      File "/usr/lib/python3/dist-packages/click/core.py", line 555, in invoke
        return callback(*args, **kwargs)
      File "/home/amir/.local/lib/python3.8/site-packages/prefect/cli/server.py", line 664, in start
        subprocess.check_output(
      File "/usr/lib/python3.8/subprocess.py", line 415, in check_output
        return run(*popenargs, stdout=PIPE, timeout=timeout, check=True,
      File "/usr/lib/python3.8/subprocess.py", line 516, in run
        raise CalledProcessError(retcode, process.args,
    subprocess.CalledProcessError: Command '['docker-compose', 'down']' returned non-zero exit status 1.
    👀 1
3 replies · 3 participants
•

    Oliver Mannion

    12/05/2021, 11:56 AM
Hiya, are there any examples of using the Orion REST API directly? I'm thinking I'd like to use it from non-Python environments to record executions, e.g. stored procs/Snowflake tasks.
1 reply · 2 participants
•

    Frank Oplinger

    12/05/2021, 5:15 PM
Is it possible to access flow parameters from within a state handler? On failure, I'd like to send a notification to a specific user whose info is passed into the flow as a parameter.
3 replies · 3 participants
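A minimal sketch, assuming Prefect 1.x, where flow parameters are available in prefect.context during a run, including inside state handlers (the parameter name and notification are hypothetical):
import prefect
from prefect import Flow, Parameter

def notify_on_failure(flow, old_state, new_state):
    if new_state.is_failed():
        params = prefect.context.get("parameters", {})
        user = params.get("user")  # hypothetical parameter name
        print(f"Notifying {user}: flow {flow.name} failed")  # swap in a real notifier
    return new_state

with Flow("my-flow", state_handlers=[notify_on_failure]) as flow:
    user = Parameter("user", default="someone@example.com")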
•

    Tilak Maddy

    12/05/2021, 5:43 PM
Is run_config part of the metadata (that gets sent when registering to Prefect Cloud), or is it something that the agent pulls from storage, or is it something that must be defined in the executor's environment?
1 reply · 2 participants
•

    Hugo Shi

    12/05/2021, 9:26 PM
    I seem to be accumulating "concurrent" flows against the concurrency limit I set even though I don't have that many flows running? This seems to happen when an agent dies. Is there a way to clear this out?
2 replies · 2 participants
•

    Tim Micheletto

    12/06/2021, 5:01 AM
Hi, I'm trying to utilise the ability to fetch batches of results from a Snowflake query, as described here, in a Prefect flow. I notice that the Prefect SnowflakeQuery task only supports fetch_all, so I have used the Snowflake connector directly in my flow. I'm getting some strange issues trying to map over the results of the query and distribute them across child tasks with a Dask executor. If I try to use fetch_pandas_batches I get an error indicating the result of that function is not pickleable. Fair enough, that is a known limitation, but if I use get_result_batches and call to_pandas in a mapped task I get some weird behavior. The flow fails with the following error:
Unexpected error: KilledWorker('predict-3815-fc5979b578154850ae381b6b7286993a', <WorkerState 'tcp://10.144.173.27:38363', name: 4, status: closed, memory: 0, processing: 625>)
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 643, in get_flow_run_state
        final_states = executor.wait(
      File "/usr/local/lib/python3.8/site-packages/prefect/executors/dask.py", line 440, in wait
        return self.client.gather(futures)
      File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 1977, in gather
        return self.sync(
      File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 865, in sync
        return sync(
      File "/usr/local/lib/python3.8/site-packages/distributed/utils.py", line 327, in sync
        raise exc.with_traceback(tb)
      File "/usr/local/lib/python3.8/site-packages/distributed/utils.py", line 310, in f
        result[0] = yield future
      File "/usr/local/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
        value = future.result()
      File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 1842, in _gather
        raise exception.with_traceback(traceback)
distributed.scheduler.KilledWorker: ('predict-3815-fc5979b578154850ae381b6b7286993a', <WorkerState 'tcp://10.144.173.27:38363', name: 4, status: closed, memory: 0, processing: 625>)
    and after 10 mins each mapped child task fails with this error
    No heartbeat detected from the remote task; marking the run as failed.
    Here's what it looks like.
import sys
from typing import Any, Dict, List

import prefect
import snowflake.connector
from prefect import Flow, task
from prefect.executors import DaskExecutor
from prefect.run_configs import KubernetesRun
from prefect.storage import S3

# get_secret_string, make_executor, image and project_pip_url are the author's
# own helpers/values, not shown in the snippet.

conn_info = {
        "account": "some-company.us-east-1",
        "user": "some-user",
        "role": "some-role",
        "warehouse": "some-warehouse",
        "client_session_keep_alive": True
    }
    
    
    @task()
    def fetch_org_batches(conn_info: Dict) -> List[Any]:
    
        sa_password = get_secret_string(
            "some-key")
        conn_info['password'] = sa_password
    
        sql = "select * from invoices_with_statement_and_org_info"
    
        with snowflake.connector.connect(**conn_info) as conn:
            with conn.cursor() as cur:
                cur.execute(sql)
    
                batches = cur.get_result_batches()
    
                num_result_batches = len(batches)
    
                logger = prefect.context.get("logger")
    
            logger.info(f"{num_result_batches} batches retrieved")
    
                return batches
    
    
    @task()
    def predict(batch: Any) -> int:
        if batch is not None:
            df = batch.to_pandas()
    
            logger = prefect.context.get("logger")
    
        logger.info(df['id'])
    
            return len(df.index)
        return 0
    
    
    @task()
    def total(x: Any) -> None:
        logger = prefect.context.get("logger")
    
    logger.info(f"{x} passed to total")
    logger.info(f"{sum(x)} rows in total")
    
    def main(branch_name: str) -> None:
        
        with Flow(f"bills-invoices-inference-{branch_name}",
                  executor=DaskExecutor(cluster_class=lambda: make_executor(image, project_pip_url),
                                        adapt_kwargs={"minimum": 1, "maximum": 10})) as flow:
    
            batches = fetch_org_batches(conn_info)
    
            mapped = predict.map(batches)
            total(mapped)
    
        flow.storage = S3(bucket="some-bucket")
    
        flow.run_config = KubernetesRun(
            env={
                "PREFECT__LOGGING__LEVEL": "INFO",
                # Install the latest version of this code from the relevant branch on github.
                # Allows for faster iteration without having to rebuild the docker image.
                "EXTRA_PIP_PACKAGES": f"dask_kubernetes==2021.10.0 {project_pip_url}"
            },
            image=image,
            labels=[],  # TODO: variable per branch normal vs sandbox (also need label on agent)
            cpu_limit=2,
            cpu_request=2,
            memory_limit="5Gi"
        )
    
        flow.register(project_name="cups")
        # flow.run()
    
    if __name__ == "__main__":
        branch = sys.argv[1]
    
        main(branch)
    Any ideas what's going on here? Is it possible to support this use case currently?
2 replies · 2 participants
•

    Didier Marin

    12/06/2021, 9:50 AM
    Hi! Is there a simple way to skip a failed task for a specific run? My issue is that I've launched a flow run processing a lot of data (probably too much at once), and it failed for a dumb reason (there is no need to relaunch that particular task). The task is badly designed (not idempotent), such that if I retry it it would do a lot of unnecessary processing again, just so that the flow can move on... Obviously I will fix this for the next runs, but for now I need to find a way to unlock this one. Here are options I'm considering: • Tweaking in GraphQL to change that task's state from failed to success, such that when I retry the flow run it would move on to the next task (Is it possible ?) • Write an ad-hoc flow that would start from where the previous run left (Guaranteed to work, but it means writing code and pushing to prod just for this special case) Thank you in advance! 🙏
6 replies · 3 participants
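A hedged sketch of the first option, assuming Prefect 1.x: the GraphQL API (and the Python Client wrapping it) can set a task run's state directly, so marking the failed task run Successful should let a flow-run restart move past it (the task run ID is a placeholder):
from prefect import Client
from prefect.engine.state import Success

client = Client()
client.set_task_run_state(
    task_run_id="<task-run-id>",  # copy from the task run page in the UI
    version=1,                    # the task run's current state version
    state=Success(message="manually marked successful"),
)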
•

    Joël Luijmes

    12/06/2021, 10:00 AM
Hey, is there any documentation on the Prefect CLI? Specifically I was wondering how Prefect tries to discover flows 🙂 Should I have an export called flow or something?
    > prefect run -m src.flows.internal_syncs.firestore_export.flow
    Retrieving local flow... Error
    Found no flows at src.flows.internal_syncs.firestore_export.flow.
    ✅ 1
11 replies · 2 participants
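A hedged sketch of what prefect run -m expects, assuming Prefect 1.x flow discovery: the module is imported and scanned for Flow objects defined at module level, so the flow should be bound to a top-level variable:
# src/flows/internal_syncs/firestore_export/flow.py (module from the command above)
from prefect import Flow, task

@task
def export():
    print("exporting")

with Flow("firestore-export") as flow:  # module-level Flow object
    export()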
•

    Vadym Dytyniak

    12/06/2021, 10:41 AM
Hello. Can someone tell me if there is a feature to have a setup task for a flow that runs by default before the flow's other tasks start? I see that I can use add_task, but how do I make it the root task?
43 replies · 3 participants

Vadym Dytyniak

12/06/2021, 10:41 AM
Hello. Can someone tell me if there is a feature to have a setup task for a flow that runs by default before the flow's other tasks start? I see that I can use add_task, but how do I make it the root task?

Anna Geller

12/06/2021, 10:52 AM
@Vadym Dytyniak can you just add it as a normal task that runs first? you could then specify that it should run first via task dependencies

Vadym Dytyniak

12/06/2021, 10:52 AM
Sure, but I am wondering if there is a chance to avoid adding the same task in each flow

Anna Geller

12/06/2021, 10:52 AM
if you need a setup and cleanup, you could have a look at resource manager: https://docs.prefect.io/core/idioms/resource-manager.html
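A minimal sketch of the resource-manager idiom from those docs (Prefect 1.x): setup() runs before the tasks that use the resource and cleanup() runs after them, even on failure:
from prefect import Flow, resource_manager, task

@resource_manager
class MyResource:
    def __init__(self, config):
        self.config = config

    def setup(self):
        # acquire the resource; the return value is passed to tasks
        return {"config": self.config}

    def cleanup(self, resource):
        pass  # release the resource here

@task
def use_resource(resource):
    print(resource["config"])

with Flow("setup-and-cleanup") as flow:
    with MyResource("some-config") as resource:
        use_resource(resource)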

Kevin Kho

12/06/2021, 2:45 PM
If you don't need observability around it, you can use a State Handler for the Flow and then create some kind of DefaultFlow class that contains your defaults.

Vadym Dytyniak

12/06/2021, 6:38 PM
@Kevin Kho Thanks. Will have a look. Are they async?

Kevin Kho

12/06/2021, 6:39 PM
No it’s not. Async support is native in Orion though (Prefect 2.0)

Vadym Dytyniak

12/06/2021, 6:40 PM
So, can I catch the moment before the flow's tasks run and do some setup?

Kevin Kho

12/06/2021, 6:42 PM
Yeah you can. You can put whatever Python code in the state transition from Submitted to Running, which is before the tasks run. It’s just not async.
Well, you can try but I’m not confident the async will work
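A hedged sketch of that suggestion (Prefect 1.x), with the actual setup standing in as a placeholder:
from prefect import Flow, task
from prefect.engine.state import Submitted

def setup_handler(flow, old_state, new_state):
    # fires on the Submitted -> Running transition, before any task runs
    if isinstance(old_state, Submitted) and new_state.is_running():
        print("running setup")  # hypothetical setup code
    return new_state

@task
def work():
    print("task running after setup")

with Flow("flow-with-setup", state_handlers=[setup_handler]) as flow:
    work()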

Vadym Dytyniak

12/06/2021, 6:42 PM
That is what I need actually
thanks
Do you by any chance have Ubuntu 16.04-based Prefect images?

Kevin Kho

12/06/2021, 8:45 PM
We only have the one on DockerHub. I don’t know the base OS of that.
I guess only Debian.

Vadym Dytyniak

12/06/2021, 8:57 PM
Yes, that is what we see. We have libraries that work only on Ubuntu.
We are trying to build our own image, but we have a few issues:
we installed the dependencies, but on unpickling we still receive "no module found".

Kevin Kho

12/06/2021, 8:58 PM
What is your error? Is the module a normal Python module or custom one? Or the Prefect module?
v

Vadym Dytyniak

12/06/2021, 8:59 PM
A custom one.
But we are using the Prefect entrypoint to install EXTRA_PIP_PACKAGES, and running the image locally I am able to import this library. We are not sure what command Prefect is using to unpickle the flow.

Kevin Kho

12/06/2021, 9:01 PM
Do you have multiple Python installations in the image?

Vadym Dytyniak

12/06/2021, 9:04 PM
Yes, we set python3 to use what we need (3.9), but we are actually not sure what Prefect is using to start the flow.

Kevin Kho

12/06/2021, 9:08 PM
It uses the cli command
prefect execute flow

Vadym Dytyniak

12/07/2021, 9:35 AM
@Kevin Kho Having a problem running it. It downloads the flow file from S3 storage, and what should the command be to run it?

Anna Geller

12/07/2021, 9:56 AM
@Vadym Dytyniak not sure I understood the question- you don't need to set any command in your Dockerfile to run your flow, Prefect does it automatically when you schedule or start a flow run. Can you share your storage and run configuration, as well as Dockerfile?

Vadym Dytyniak

12/07/2021, 10:04 AM
Storage:
return S3(
    bucket=Secret('PREFECT_BUCKET').get(),
    key=f'{self.name}/{self.serialized_hash()}',
    secrets=['AWS_CREDENTIALS'],
)
Run config:
ECSRun(
    image='custom_image',
    env={
        'PIP_EXTRA_INDEX_URL': os.environ['PIP_EXTRA_INDEX_URL'],
        'EXTRA_PIP_PACKAGES': ' '.join(self.dependencies),
    },
)
Custom image:
FROM base image

# install tini
ENV TINI_VERSION v0.19.0
ADD https://github.com/krallin/tini/releases/download/${TINI_VERSION}/tini /tini
RUN chmod +x /tini
ENTRYPOINT ["/tini", "--"]

# install prefect
ENV PREFECT_VERSION 0.15.10
RUN python3.9 -m pip install prefect[aws]==${PREFECT_VERSION}

# update alternatives
RUN update-alternatives --install /usr/bin/python python /usr/bin/python3.9 10
RUN update-alternatives --install /usr/bin/python3 python3 /usr/bin/python3.9 10

# prepare entrypoint script
COPY prefect_entrypoint.sh /usr/local/bin/entrypoint.sh
RUN chmod +x /usr/local/bin/entrypoint.sh
CMD ["entrypoint.sh"]

entrypoint.sh is copied from the pure Prefect image.
@Anna Geller I don't want to run prefect execute in the Dockerfile; I am trying to debug locally why this image can't start the flow,
because all the deps are installed.

Anna Geller

12/07/2021, 10:11 AM
I would strongly recommend using an official Prefect image from Dockerhub. You shouldn’t have to add this:
# prepare entrypoint script
COPY prefect_entrypoint.sh /usr/local/bin/entrypoint.sh
RUN chmod +x /usr/local/bin/entrypoint.sh
CMD ["entrypoint.sh"]
Some additional notes: I usually see the ENV defined with = sign e.g.
ENV PREFECT_VERSION=0.15.10
And then I think this may need to be in quotation marks (not 100% sure):
pip install "prefect[aws]"
If you need some dependencies from this custom image, you can do a multi-stage build, and in the last part use:
FROM prefecthq/prefect:latest
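A hedged sketch of that multi-stage idea (stage names, packages, and paths are illustrative, and whether artifacts can simply be copied between the two base images depends on the libraries involved):
# first stage: build the Ubuntu-only dependencies
FROM ubuntu:20.04 AS builder
RUN apt-get update && apt-get install -y some-ubuntu-only-package

# last stage: official Prefect image, with the needed artifacts copied over
FROM prefecthq/prefect:latest
COPY --from=builder /usr/lib/some-ubuntu-lib /usr/lib/some-ubuntu-lib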

Vadym Dytyniak

12/07/2021, 10:13 AM
@Anna Geller The main problem is that we have to use Ubuntu instead of Debian

Anna Geller

12/07/2021, 10:17 AM
@Vadym Dytyniak I think this contains a good answer https://stackoverflow.com/a/62414670/9509388

Vadym Dytyniak

12/07/2021, 10:24 AM
Hmm, I currently don't understand how this concept works. Do you think it is possible to finally have Prefect installed on Ubuntu?

Anna Geller

12/07/2021, 11:48 AM
yes, this should be possible for sure

Vadym Dytyniak

12/07/2021, 12:16 PM
Do you have an idea how to get the output of the docker run command in ECS?

Anna Geller

12/07/2021, 12:18 PM
yes, you could go to CloudWatch logs. You can navigate from your ECS task directly to the logs for this task

Vadym Dytyniak

12/07/2021, 12:39 PM
The first log I see there is "Downloading flow from s3..."
How about the docker run command log?

Anna Geller

12/07/2021, 12:42 PM
It looks good then. The docker run output is simply whatever the container is doing. This should be the same as what you see in the Prefect Cloud UI.

Vadym Dytyniak

12/07/2021, 12:43 PM
But I am installing dependencies as part of docker run and I don't see these logs

Anna Geller

12/07/2021, 12:43 PM
not sure whether installing dependencies generates any specific logs.

Vadym Dytyniak

12/07/2021, 3:10 PM
We finally found the problem. We used CMD ["entrypoint.sh"], which was replaced with prefect execute flow-run during docker run.

Anna Geller

12/07/2021, 3:26 PM
thanks for sharing @Vadym Dytyniak. Is everything resolved now?

Vadym Dytyniak

12/07/2021, 3:27 PM
yes, @Anna Geller and @Kevin Kho thanks for the support!
🙌 1