prefect-community
  • k

    Klemen Strojan

    11/20/2020, 9:18 AM
    What is the best way to tell Kubernetes Agent to utilise only the nodes from a specific Node pool? We are running the agent on AKS in a separate namespace.
    j
    • 2
    • 2
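(A hedged sketch, not from the thread: AKS labels every node with agentpool=<pool-name>, so if your agent version supports a custom job template, a nodeSelector can pin flow-run jobs to a single pool. "flowpool" below is a hypothetical pool name.)
# job_template.yaml
apiVersion: batch/v1
kind: Job
spec:
  template:
    spec:
      nodeSelector:
        agentpool: flowpool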
  • a

    Alexander

    11/20/2020, 11:06 AM
Is it possible to re-run tasks in an already running flow? I have quite a big DAG of tasks. If one task fails, its downstream tasks are TriggerFailed, but the run still continues because many other parallel tasks are running. So imagine I've fixed the issue that made my task fail. How do I re-run it within the existing, still-running flow? Or should I wait for the flow run to complete and only then restart the flow run from the failed task?
  • a

    Alexander

    11/20/2020, 12:01 PM
So I was able to narrow it down to this case: imagine you have a flow with two branches, one long-running and one short-lived that may fail. If the short branch fails, its dependencies are TriggerFailed. If I re-run the failed task in a still-running flow, its state is set to Pending and nothing happens. The flow finishes when the long task finishes, and this task is still in Pending state. But if, before re-running the failed task, I set the downstream TriggerFailed task to Pending state, re-running the failed task executes as expected. Is this a bug?
    n
    • 2
    • 20
  • z

    Zach

    11/20/2020, 7:35 PM
    How do I unpickle a file that I used the
    PickleSerializer()
to store during a Prefect flow?
  • z

    Zach

    11/20/2020, 7:36 PM
    The task decorator on the function looked like this:
    @task(
        checkpoint=True,
        result=GCSResult(
            bucket="my-bucket",
            location=RESULT_LOCATION_FORMAT,
            serializer=PickleSerializer(),
        )
    )
    j
    • 2
    • 2
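(A minimal sketch of reading such a result back outside of Prefect, assuming the google-cloud-storage client and that PickleSerializer is cloudpickle-based; the object path below is hypothetical:)
import cloudpickle
from google.cloud import storage

# path produced by RESULT_LOCATION_FORMAT for the run in question (hypothetical)
blob = storage.Client().bucket("my-bucket").blob("results/run-xyz/task.pickle")
value = cloudpickle.loads(blob.download_as_bytes())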
  • h

    Hamed Sheykhlou

    11/20/2020, 9:10 PM
Hi. I want to test local debugging based on this document: https://docs.prefect.io/core/advanced_tutorials/local-debugging.html#locally-check-your-flow-s-docker-storage I have created a task like this:
    import prefect
    import time
    from prefect import task, Flow
    from prefect.environments.storage import Docker
    
    
    @task()
    def whoami():
        logger = prefect.context.get("logger")
        time.sleep(200)
    logger.info('Hello world')
        return 'hello World'
    
    
    storage = Docker()
    flow = Flow("reddit-flow", storage=storage, tasks=[whoami])
    built_storage = flow.storage.build(push=False)
    print(built_storage.flows)
and run it with
python code.py
and the image is successfully created, but the flow doesn't exist inside the image: neither inside
/root/.prefect/
nor
/opt/prefect/flows/
. But when I change the code to register the flow, the created image has the flow. (Also, I think the local debugging documentation is outdated.) Am I missing something? Thank you in advance
    👀 1
    n
    m
    • 3
    • 4
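(A likely explanation, sketched rather than confirmed: Docker storage only bakes in flows that were added to it before build(); registration does this implicitly. Something like:)
storage = Docker()
storage.add_flow(flow)  # without this, build() produces an image with no flows
built_storage = storage.build(push=False)
print(built_storage.flows)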
  • d

    Dolor Oculus

    11/20/2020, 9:50 PM
Is the only way to delete a project on non-cloud, self-hosted Prefect via GraphQL, à la https://docs.prefect.io/orchestration/concepts/projects.html#graphql-2 ? I was hoping to do it via the UI, but it looks like only the cloud version has that feature?
    n
    • 2
    • 3
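(For reference, the mutation on the linked docs page looks roughly like this; the project ID is a placeholder:)
mutation {
  delete_project(input: { project_id: "<project-uuid>" }) {
    success
  }
}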
  • d

    Dolor Oculus

    11/20/2020, 10:00 PM
    Would this be a reasonable way to say "Run this TUESDAY through SATURDAY"?
    not_filters=[
                filters.is_day_of_week(1),
                filters.is_day_of_week(7)
            ],
    n
    • 2
    • 10
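(Wired into a full schedule the idea would look like the sketch below. Under ISO numbering (Monday=1 ... Sunday=7), excluding 1 and 7 leaves exactly Tuesday through Saturday, but it's worth verifying which numbering your version's is_day_of_week uses. The clock is hypothetical.)
from prefect.schedules import Schedule, filters
from prefect.schedules.clocks import CronClock

schedule = Schedule(
    clocks=[CronClock("0 6 * * *")],  # hypothetical daily clock
    not_filters=[
        filters.is_day_of_week(1),  # Monday, assuming ISO numbering
        filters.is_day_of_week(7),  # Sunday, assuming ISO numbering
    ],
)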
  • y

    Yuri Caruso

    11/20/2020, 10:47 PM
Hello, I'm experiencing a problem that I don't know how to solve. I have the following program (agent.py) that is started by supervisor when my Linux machine boots. It loads all the folders and files of my project where the flows live.
    from prefect.agent.local.agent import LocalAgent
    from os.path import dirname, abspath, join
    from os import listdir, path as p,walk
    
    path = abspath(join(dirname(__file__)))
    
    import_paths=[]
    for file in listdir(path):
        if not file.startswith(".") and p.isdir(file) and file in ['db','service','flows','util'] :
            import_paths.append(join(path,file))
    
    for path_import in import_paths:
        paths = [p.join(root, name)
                    for root, dirs, files in walk(path_import)
                    for name in dirs
                    if not name.startswith(".") and not name.startswith("__")]
        import_paths.extend(paths)
    
    LocalAgent(import_paths=import_paths,show_flow_logs=True).start()
    My Supervisor File is as follows:
    [program:start_agent_prefect]
    command=python3 /home/dev/projetos/prefect-tasks/agent.py
    user=dev
    autostart=true
    autorestart=true
    stderr_logfile=/var/log/agent_prefect.err.log
    stdout_logfile=/var/log/agent_prefect.out.log
However, the Prefect agent is unable to execute the flows because it cannot find the prefect executable, giving the message below:
[Errno 2] No such file or directory: 'prefect'
    
[2020-11-20 22:21:28,139] INFO - agent | Found 1 flow run(s) to submit for execution.
[2020-11-20 22:21:28,176] INFO - agent | Deploying flow run 3ab508f4-731c-450b-8b30-bdaaaeba6117
[2020-11-20 22:21:28,180] ERROR - agent | Logging platform error for flow run 3ab508f4-731c-450b-8b30-bdaaaeba6117
[2020-11-20 22:21:28,232] ERROR - agent | Error while deploying flow: FileNotFoundError(2, 'No such file or directory')
Note: if I manually run the agent.py file in its folder with dev$ python3 agent.py, the agent works normally. Could someone shed some light on where I am failing?
    👀 1
    s
    • 2
    • 2
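(Two hedged guesses from the symptoms: supervisor starts programs with a minimal environment, so the prefect executable the agent shells out to isn't on PATH, and the working directory isn't the project folder, so the relative p.isdir(file) checks in agent.py quietly fail. A sketch with both addressed; the PATH value is hypothetical and should mirror the dev user's working shell:)
[program:start_agent_prefect]
command=python3 /home/dev/projetos/prefect-tasks/agent.py
directory=/home/dev/projetos/prefect-tasks
environment=PATH="/home/dev/.local/bin:/usr/local/bin:/usr/bin:/bin"
user=dev
autostart=true
autorestart=true
stderr_logfile=/var/log/agent_prefect.err.log
stdout_logfile=/var/log/agent_prefect.out.log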
  • d

    Dolor Oculus

    11/20/2020, 11:59 PM
Are there any good examples of using signal.SKIP() to short-circuit running the whole flow? Trying to wrap our brains around it. Example:
    • 1
    • 8
  • k

    Krzysztof Nawara

    11/21/2020, 11:28 AM
Hi! I'm interested in measuring the percentage of tasks in the pipeline that were read from the cache. The only way to obtain that information that I can think of would be to have a shared counter incremented at the beginning of each task. I believe a global variable won't work when running in distributed mode, so I was wondering about an alternative. Is it possible to modify prefect.context from within a task? And then access such an accumulator in a task that runs at the very end and prints the result/saves it somewhere?
    m
    • 2
    • 5
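(One hedged alternative to a shared counter: a state handler that records cache hits, leaving aggregation to a log query or an external store. A minimal, log-only sketch:)
import prefect
from prefect.engine.state import Cached

def cache_hit_handler(task, old_state, new_state):
    # A task served from cache finishes in a Cached state; count those
    # occurrences downstream (log aggregator, Redis, etc.).
    if isinstance(new_state, Cached):
        prefect.context.get("logger").info("cache hit: %s", task.name)
    return new_state

@prefect.task(state_handlers=[cache_hit_handler])
def my_task():
    ...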
  • e

    Eric

    11/21/2020, 6:55 PM
    Hi! does anyone know if there is a way to copy a Flow from one project to another programmatically?
    c
    • 2
    • 9
  • e

    Eric

    11/21/2020, 6:55 PM
    using prefect cloud
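(If the flow's source is at hand, the simplest programmatic route may be registering the same flow into the second project. A sketch; note this creates a fresh flow ID rather than copying run history, and the names are hypothetical:)
from my_flows import flow  # hypothetical module holding the Flow object

flow.register(project_name="project-a")  # existing project
flow.register(project_name="project-b")  # same flow, second project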
  • a

    Avi A

    11/22/2020, 3:52 PM
    Hey team, is there a recording of this talk? We are interested in using the helm chart in our deployment
    👀 1
  • r

    Ramses E.

    11/23/2020, 6:47 AM
Hi! I'm new here, and new to open source/contributing as well. I'd love to contribute to the project: I've gone through the contribution part of the docs, read some good first issues, and cloned the repo locally, but I'm still a little lost on how to go about solving any issue. Could someone please help me get started?
    m
    • 2
    • 9
  • s

    Scott Moreland

    11/23/2020, 12:58 PM
Hi all! I'm looking at using Prefect to manage ETL processes that are executed by PySpark on a CDSW/Hadoop environment. I'd like to subclass the Result class---much like what has already been done for Google Cloud and AWS---such that each task can read/write to Hive storage by toggling the Prefect cache variable, i.e. loading a cached task would simply read the Spark dataframe created by that task via
    sql_context.read.table
    . Are there any references for this?
    k
    • 2
    • 15
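(The Result interface is small, so a Hive-backed subclass could look roughly like the sketch below. Method names follow prefect.engine.result.Result as of the 0.13-era releases; the SQLContext wiring and table naming are assumptions:)
from prefect.engine.result import Result

class HiveResult(Result):
    """Hypothetical Result persisting Spark DataFrames as Hive tables."""

    def __init__(self, sql_context=None, **kwargs):
        self.sql_context = sql_context  # assumed provided by the runtime
        super().__init__(**kwargs)

    def write(self, value, **kwargs):
        new = self.format(**kwargs)  # render the templated location
        value.write.saveAsTable(new.location)
        new.value = value
        return new

    def read(self, location):
        new = self.copy()
        new.location = location
        new.value = self.sql_context.read.table(location)
        return new

    def exists(self, location, **kwargs):
        return location in self.sql_context.tableNames()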
  • j

    Joël Luijmes

    11/23/2020, 1:01 PM
    If I use a Kubernetes agent to run my flows, what would be the recommended way to execute non-python code? Spawn off new Kubernetes jobs in the cluster, or are there other ideas on this?
    s
    k
    • 3
    • 10
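(For simple cases a ShellTask inside the flow's own container may be enough; spawning dedicated Kubernetes Jobs mainly pays off when the non-python step needs its own image or resources. A sketch of the ShellTask route; the binary path is hypothetical and assumed to be baked into the flow's image:)
from prefect import Flow
from prefect.tasks.shell import ShellTask

run_binary = ShellTask(name="run-binary")

with Flow("non-python-step") as flow:
    out = run_binary(command="/usr/local/bin/my-binary --input data.csv")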
  • d

    Dolor Oculus

    11/23/2020, 2:41 PM
I'm registering a flow on a locally hosted server and would like the extra-loggers functionality to be in effect when the scheduled flow runs. It looks like setting the environment variable
PREFECT__LOGGING__EXTRA_LOGGERS
as per https://docs.prefect.io/core/concepts/logging.html#extra-loggers during flow registration isn't sufficient -- I'm not seeing my app's
logging
statements in the web UI. Is there a way to get this env variable setting to be in effect for when the app runs?
    e
    • 2
    • 3
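(Setting the variable while registering only affects the registering process; it has to be present in the environment where the flow run actually executes. With run configs, if your version has them, that could look like the sketch below; the logger name is hypothetical. Passing the same variable to the agent via its --env option should work too.)
from prefect.run_configs import LocalRun

flow.run_config = LocalRun(
    env={"PREFECT__LOGGING__EXTRA_LOGGERS": "['myapp']"}
)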
  • b

    Ben Davison

    11/23/2020, 4:26 PM
    Hiya, trying out the
    DaskKubernetesEnvironment
    and am running into a weird issue.
    k
    • 2
    • 5
  • t

    tkanas

    11/24/2020, 1:16 AM
    Hi, I was wondering what people do when coming across parts of objects that are not serializable by Cloudpickle when trying to save some result objects to GCS. The result objects are heavily nested so it is difficult to identify which parts of them are not pickleable. Is it considered poor practice to try to send the whole result object to GCS? Or is there some way around it?
    k
    s
    • 3
    • 4
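(One way to localize the offender instead of guessing: walk the object's attributes and try cloudpickle on each piece. A rough diagnostic sketch:)
import cloudpickle

def find_unpicklable(obj, path="obj", seen=None):
    """Recursively report attributes cloudpickle cannot serialize."""
    if seen is None:
        seen = set()
    if id(obj) in seen:
        return
    seen.add(id(obj))
    try:
        cloudpickle.dumps(obj)
    except Exception as exc:
        print(f"{path}: {exc!r}")
        children = vars(obj).items() if hasattr(obj, "__dict__") else []
        for name, attr in children:
            find_unpicklable(attr, f"{path}.{name}", seen)

find_unpicklable(my_result)  # hypothetical nested result object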
  • s

    Saulius Beinorius

    11/24/2020, 8:22 AM
    Hi, maybe I missed the relevant documentation, but what exactly does a python module need for the CLI to be able to register a flow? Is it a module-level variable called
    flow
    ?
    j
    • 2
    • 3
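(From what I can tell, the 0.13-era CLI imports the file and picks up Flow objects in its namespace, optionally matched by --name, so any module-level Flow should do; it doesn't have to be literally called flow. A hedged example:)
# flows/etl.py
from prefect import Flow, task

@task
def say_hello():
    print("hello")

with Flow("etl") as my_flow:  # deliberately not named `flow`
    say_hello()
registered with (project name hypothetical):
prefect register flow --file flows/etl.py --name etl --project "my-project"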
  • r

    Riley Hun

    11/24/2020, 8:25 AM
    Hi everyone, I'm using the newly announced k8s helm chart, which is awesome by the way! I'm trying to add basic authentication to the apollo server to protect it through an nginx controller on kubernetes. But now the "server cannot be reached". Could anyone please kindly offer up any insight on this? Would be most appreciated! The other alternative I could look into is making the apollo server an internal load balancer which I'll try next.
    j
    • 2
    • 6
  • m

    Michelle Wu

    11/24/2020, 9:07 AM
Hey guys, I have Prefect Server and the Dask scheduler running on machine A and a Dask worker on machine B (machine B has no Prefect or Docker). When I tried to send a job from machine A to machine B to execute, using the task tag described here: https://docs.prefect.io/api/latest/engine/executors.html#executor, an error occurred like this on machine B:
    distributed.protocol.pickle - INFO - Failed to deserialize b'\x80\x04\x950\x00\x00\x00\x00\x00\x00\x00\x8c\x1dprefect.engine.executors.dask\x94\x8c\n_maybe_run\x94\x93\x94.'
    Traceback (most recent call last):
      File "/anaconda3_a/envs/py37/lib/python3.7/site-packages/distributed/worker.py", line 3268, in loads_function
        result = cache_loads[bytes_object]
      File "/anaconda3_a/envs/py37/lib/python3.7/site-packages/distributed/utils.py", line 1573, in __getitem__
        value = super().__getitem__(key)
      File "/anaconda3_a/envs/py37/lib/python3.7/collections/__init__.py", line 1027, in __getitem__
        raise KeyError(key)
    KeyError: b'\x80\x04\x950\x00\x00\x00\x00\x00\x00\x00\x8c\x1dprefect.engine.executors.dask\x94\x8c\n_maybe_run\x94\x93\x94.'
    ...
    ModuleNotFoundError: No module named 'prefect'
I thought that I must install Prefect on machine B for the task to run remotely. So I installed Prefect and reran the task, and a new error occurred like this on machine B:
    [2020-11-24 17:02:05+0800] ERROR - prefect.CloudTaskRunner | Failed to set task state with error: ConnectionError(MaxRetryError("HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7ff6099fcf10>: Failed to establish a new connection: [Errno 111] Connection refused'))"))
    Traceback (most recent call last):
      File "/anaconda3_a/envs/py37/lib/python3.7/site-packages/urllib3/connection.py", line 160, in _new_conn
        (self._dns_host, self.port), self.timeout, **extra_kw
      File "/anaconda3_a/envs/py37/lib/python3.7/site-packages/urllib3/util/connection.py", line 84, in create_connection
        raise err
      File "/anaconda3_a/envs/py37/lib/python3.7/site-packages/urllib3/util/connection.py", line 74, in create_connection
        sock.connect(sa)
    ConnectionRefusedError: [Errno 111] Connection refused
    ...
[2020-11-24 17:02:05+0800] INFO - prefect.CloudTaskRunner | Task 'test_task': Finished task run for task with final state: 'ClientFailed'
    [2020-11-24 09:02:10+0000] CRITICAL - CloudHandler | Failed to write log with error: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7ff6097590d0>: Failed to establish a new connection: [Errno 111] Connection refused'))
    [2020-11-24 09:02:10+0000] CRITICAL - CloudHandler | Failed to write log with error: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7ff6097590d0>: Failed to establish a new connection: [Errno 111] Connection refused'))
    [2020-11-24 09:02:10+0000] CRITICAL - CloudHandler | Failed to write log with error: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7ff6097590d0>: Failed to establish a new connection: [Errno 111] Connection refused'))
    [2020-11-24 09:02:10+0000] CRITICAL - CloudHandler | Unable to write logs to Prefect Cloud
    [2020-11-24 09:02:10+0000] CRITICAL - CloudHandler | Unable to write logs to Prefect Cloud
    [2020-11-24 09:02:10+0000] CRITICAL - CloudHandler | Unable to write logs to Prefect Cloud
I am really confused by “ConnectionError” and “Unable to write logs to Prefect Cloud”. Do you have any idea why this might be happening? Must I also install Docker on machine B for the task to run?
    j
    • 2
    • 3
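(A hedged reading of those errors: the Dask worker on machine B falls back to localhost:4200 for the Apollo endpoint, so both state updates and log shipping are refused. Pointing the client at machine A in the worker's environment should help; the host is a placeholder. Docker shouldn't be needed on machine B just to run tasks.)
export PREFECT__CLOUD__API="http://<machine-A-host>:4200"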
  • s

    Saulius Beinorius

    11/24/2020, 9:38 AM
Hey, I'm (yet another) person who would like a more flexible logging configuration in Prefect. I was a bit surprised that Prefect does not allow specifying logging via a
logging.config.dictConfig()
-like format, especially since Dask supports it (https://docs.dask.org/en/latest/debugging.html#logs). If you could point me in the right direction where the logging setup happens (I think I found the agent's logging setup, but I'm not sure about task logging; I assume that's separate for each execution environment?), I would be willing to work on a PR to allow specifying logging via the dict format. For reference, I have `uvicorn`'s server, which has similar options to Prefect but also allows passing in the entire logging configuration.
    :upvote: 2
    j
    • 2
    • 2
  • f

    Florian L

    11/24/2020, 10:35 AM
Hi, I was wondering what the best practice is to cancel a flow run from one of our tasks? Currently I'm raising a
signal.ENDRUN(state.Cancelled())
which puts my task in the right state, but the following tasks are still executed, meaning the all_successful trigger is still true. Any idea what I am doing wrong?
    j
    • 2
    • 1
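(A hedged note: ENDRUN(Cancelled()) only sets that one task's state, so downstream triggers still evaluate. If the goal is stopping the whole run, one version-dependent option is asking the backend to cancel the flow run from inside the task:)
import prefect
from prefect.client import Client

@prefect.task
def abort_run():
    # assumption: your Prefect version exposes Client.cancel_flow_run
    Client().cancel_flow_run(flow_run_id=prefect.context.get("flow_run_id"))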
  • a

    Andrey Tatarinov

    11/24/2020, 11:17 AM
    Hi, I have a question about a family of built-in tasks
    ReadGsheetRow
    ,
    ReadAirtableRow
etc. What is their use case? All of these tasks' Read- and Write- counterparts receive
n
, the identifier of the row to be read or written. In our experience there is no scenario where you need just a single row from a table. What's more, there's no way to find out the size of the table. So is anybody actually using these tasks in this specific implementation?
    j
    • 2
    • 2
  • a

    Andrey Tatarinov

    11/24/2020, 1:51 PM
Question about run configuration for flows. What is the meaning behind specifying LocalRun/DockerRun/.. in order to specify the list of required labels for an agent? What if I don't care what kind of run it will be as long as it satisfies the required labels? How do I specify labels without specifying a type of run?
    j
    • 2
    • 3
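(If your release is new enough there may be an agent-agnostic option: a UniversalRun run config that only carries labels. A sketch, assuming prefect.run_configs.UniversalRun exists in your version; the labels are hypothetical:)
from prefect.run_configs import UniversalRun

flow.run_config = UniversalRun(labels=["etl", "gpu"])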
  • s

    Scott Moreland

    11/24/2020, 2:42 PM
Suppose I have a long sequence of tasks within a given flow, where each task reads some persisted data created by the previous task, operates on that data, and persists the resulting output to a database via checkpointing. How can I...
1. Run a single task within the flow and pull its input data from the persisted checkpoint of the previous task, i.e. debug a single task.
2. Have my flow rebuild the persisted output of any given task, i.e. ignore the fact that the checkpoint exists, recompute and overwrite it.
Thanks in advance! Love the package and selling my team hard on it!
    j
    • 2
    • 7
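(For 1., one hedged approach bypasses the flow entirely: read the upstream task's persisted result by hand and call the task's underlying function. For 2., with location-based results, deleting or renaming the stored object before the run forces a recompute. A sketch of 1. with GCSResult; the bucket, location, and my_task are hypothetical:)
from prefect.engine.results import GCSResult

result = GCSResult(bucket="my-bucket")
upstream_value = result.read("flows/etl/upstream.pickle").value

# a @task-decorated function keeps its original callable on .run()
output = my_task.run(upstream_value)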
  • r

    Riley Hun

    11/24/2020, 5:44 PM
    For the Prefect Server Helm Chart, is there a specific reason as to why it's not recommended to deploy the packaged postgresql database that comes w/ the helm chart into production? What are the risks involved?
    j
    m
    • 3
    • 6
  • j

    Jasono

    11/24/2020, 6:00 PM
    Hi does the Context {…} defined in the Prefect Cloud web UI (I’m referring to the Run screen where Parameters and Context are manually entered) override the same context variables in config.toml? It appears to me the context entered in the web UI is ignored.
    j
    • 2
    • 12
j

Jasono

11/24/2020, 6:00 PM
Hi does the Context {…} defined in the Prefect Cloud web UI (I’m referring to the Run screen where Parameters and Context are manually entered) override the same context variables in config.toml? It appears to me the context entered in the web UI is ignored.
j

josh

11/24/2020, 6:52 PM
Hi @Jasono certain values present in context at run time will be overwritten after the user provided context is loaded (i.e. flow id, flow run id, scheduled start time) because these values are used for the runner during execution of the flow. Relevant code where context is loaded: https://github.com/PrefectHQ/prefect/blob/master/src/prefect/engine/cloud/flow_runner.py#L372-L380
j

Jasono

11/24/2020, 6:55 PM
Sorry, I wasn't clear. These are user-defined variables that are overridden, not the ones you mentioned.
j

josh

11/24/2020, 6:57 PM
Not sure if I follow, if I set context variables in the web UI I see them being populated in the flow run. 🤔 What are you attempting?
j

Jasono

11/24/2020, 6:58 PM
I want to pass a “reporting period”, for instance,
and don't want to use a Parameter for that.
Do you also define those user-defined context variables in your config.toml?
I do, because otherwise the flow script fails when I try to register it.
j

josh

11/24/2020, 7:02 PM
How are you accessing the variable from context? Registration does not run the flow. If you’re doing something like this:
with Flow():
   my_task(context.get("reports"))
That will not work because context is evaluated at initialization of the flow object, before registration. Consequently the context provided by the UI will not be read there, because that code runs prior to the flow run. Instead, your context should be accessed inside of a task:
@task
def my_task():
    prefect.context.get("reports")
j

Jasono

11/24/2020, 7:06 PM
I’m trying to do it like your first example, and it is necessary as I dynamically create tasks within the Flow block.
with Flow("bdx_process_9xx") as flow:
    acctDate = str(prefect.context.acctDate)
    prodDate = str(prefect.context.prodDate)
    runDesc9xx = prefect.context.runDesc9xx
    print(f'ProdDate={prodDate} ..{runDesc9xx}')

    for queryKey, (queryCmd, dependQry) in cmdPairs(acctDate, prodDate, runDesc9xx).items():
        task = BDX_Task(name=queryKey)(queryKey=queryKey, queryCmd=queryCmd)
        flow.add_task(task)

        if len(dependQry) > 0:
            if type(dependQry) is list:
                for q in dependQry:
                    flow.get_tasks(name=queryKey)[0].set_upstream(flow.get_tasks(name=q)[0])
            else:
                flow.get_tasks(name=queryKey)[0].set_upstream(flow.get_tasks(name=dependQry)[0])
j

josh

11/24/2020, 7:15 PM
Yeah, in your snippet you are grabbing values from your local context and passing them into the tasks you create, so they are static and cannot be changed. If you want to access a dynamic context value at runtime instead of at build time, you will need to grab it from context in a similar way to the snippet I provided above.
j

Jasono

11/24/2020, 7:16 PM
okay. thank you!