prefect-community

    Riley Hun

    08/08/2020, 12:51 AM
    Hi everyone - how do I write a Task as a class that also inherits from another parent class? For example, if I have
    # parent class
    class ABC:
        def __init__(self, user, password):
            self.user = user
            self.password = password

        def query(self, sql):
            pass
    I want to do something like this
    # task
    class Task_A(Task, ABC):
        def run(self):
            pass
    Currently, I'm just doing this instead
    @task
    def task_a(
        user: str = None,
        password: str = None,
        date_col: str = None,
        dataset_id: str = None
    ):
        conn = ABC(user=user, password=password)
        query = f"SELECT DISTINCT {date_col} FROM EDW_DNA_DB.WI.{dataset_id}"
        query_result = conn.query(query)
        return query_result[date_col].tolist()
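    One possible pattern (an editorial sketch, not from the thread; it assumes Prefect's class-based Task API, where a subclass implements run() and forwards keyword arguments to Task.__init__):
    from prefect import Task

    class Task_A(Task, ABC):
        def __init__(self, user=None, password=None, **kwargs):
            # initialize the connection parent and the Prefect Task parent separately
            ABC.__init__(self, user=user, password=password)
            Task.__init__(self, **kwargs)

        def run(self, date_col: str = None, dataset_id: str = None):
            # mirrors the functional version above
            query = f"SELECT DISTINCT {date_col} FROM EDW_DNA_DB.WI.{dataset_id}"
            return self.query(query)[date_col].tolist()
    In a Flow context this would be instantiated once and called with the run-time arguments, e.g. Task_A(user=..., password=...)(date_col=..., dataset_id=...).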

    bral

    08/08/2020, 9:21 AM
    I have the same issue as @Riley Hun, but with the local agent. The Flow depends on my own classes, and after successfully registering and running it I got a "No module named" error. It was solved if I placed the directory with the dependency in my environment (C:\ProgramData\Anaconda3\...). For example, Airflow has a plugins directory for this case. Does Prefect have a similar option?

    Qwame

    08/09/2020, 9:06 AM
    Hello everyone, I have a task that several other tasks depend on in my Prefect flow. In Airflow I could do something like
    F1 >> [f2, f3, f4, f5, f6]
    What's the best way to set these dependencies in Prefect? I notice that set_downstream doesn't accept a list of tasks. Is there an efficient way to do this? Also, does the new Prefect UI mean I don't need Docker to run it? Thanks
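    A rough equivalent (an editorial sketch, assuming f1 through f6 are @task-decorated functions): every task call inside a Flow context accepts an upstream_tasks keyword, so the fan-out can be a simple loop or comprehension.
    from prefect import Flow, task

    @task
    def f1():
        pass

    @task
    def f2():
        pass

    @task
    def f3():
        pass

    with Flow("fan-out") as flow:
        root = f1()
        # set f1 as the upstream of each dependent task
        downstream = [t(upstream_tasks=[root]) for t in (f2, f3)]
    Flow.set_dependencies(task, downstream_tasks=[...]) is another way to wire the same edges explicitly.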

    Vikram Iyer

    08/10/2020, 6:05 AM
    If I restart the apollo, ui and agent containers, I lose the flows. Is there any way to persist them?

    Vikram Iyer

    08/10/2020, 6:05 AM
    Is there a volume on the host that I can map to docker container so that it picks up the flows automatically after server restart?

    emmanuel

    08/10/2020, 7:20 AM
    Hello there, coming from an Airflow background, I was wondering if there is already something built-in like an ExternalTaskSensor to declare dependencies between different flows like in Airflow?
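    For reference, an editorial sketch of the closest built-in at the time (stated as an assumption: the FlowRunTask in prefect.tasks.prefect, which kicks off another registered flow through the backend; the flow and project names are hypothetical):
    from prefect import Flow
    from prefect.tasks.prefect import FlowRunTask

    # triggers a run of an already-registered flow via Cloud/Server
    trigger_child = FlowRunTask(flow_name="child-flow", project_name="examples")

    with Flow("parent-flow") as flow:
        trigger_child()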

    Lewis Bails

    08/10/2020, 8:26 AM
    Hello prefectors! I'm trying to get the DaskKubernetesEnvironment to work and I'm running into what I think is a Docker storage problem. I can build the image outside of the Prefect process, so I don't think my issue is with the dockerfile.
    flow.environment = DaskKubernetesEnvironment(min_workers=1, max_workers=5)
    flow.storage = Docker(dockerfile='./dockerfile', env_vars={'known_hosts': #known_hosts, 'ssh_prv_key': #prv_key})
    flow.register(project_name='Automated training')
    [2020-08-10 08:07:11] INFO - prefect.Docker | Building the flow's Docker storage...
    Traceback (most recent call last):
      File "/home/leb/anaconda3/envs/ai-pipelines/lib/python3.7/site-packages/docker/api/client.py", line 261, in _raise_for_status
        response.raise_for_status()
      File "/home/leb/.local/lib/python3.7/site-packages/requests/models.py", line 941, in raise_for_status
        raise HTTPError(http_error_msg, response=self)
    requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: <http+docker://localhost/v1.40/build?t=auto-dask-kube-cloud%3A2020-08-10t08-07-11-222981-00-00&q=False&nocache=False&rm=False&forcerm=True&pull=False&dockerfile=.%2Ftmpez2venih%2FDockerfile>
    Can anyone help me out?

    Roy

    08/10/2020, 11:00 AM
    Hello! When I merge two task results together in a single one, the merge is supposed to return the first "real" result. What is considered a "real" result? It seems that only None is considered a non-real result. I would have expected that NoResult results would also be skipped, but this is not the case. Is this intended? Example in thread.
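    For context, an editorial sketch of the usual merge pattern (assuming prefect.tasks.control_flow; the branch tasks are made up for illustration):
    from prefect import Flow, task
    from prefect.tasks.control_flow import ifelse, merge

    @task
    def check():
        return True

    @task
    def branch_a():
        return "a"

    @task
    def branch_b():
        return "b"

    with Flow("merge-example") as flow:
        a, b = branch_a(), branch_b()
        ifelse(check(), a, b)   # the branch not taken ends up SKIPPED
        result = merge(a, b)    # returns the first upstream result that is not None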

    Amit

    08/10/2020, 11:24 AM
    Hi team, how can I make Prefect send messages from the default Python logger to Cloud? For example, I get the logger via:
    logger = logging.getLogger(__name__)
    I have this everywhere in my code and I don't want to replace it with
    from prefect.utilities.logging import get_logger
    everywhere. Is there a way to send the default logger's messages to Cloud?
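    One thing worth checking (an editorial note, stated as an assumption about the logging config of this era): Prefect has an extra_loggers logging setting that forwards records from named standard-library loggers through Prefect's own handlers. The module name below is hypothetical.
    import os

    # must be visible to the process that runs the flow (e.g. set in the agent's
    # environment, or in config.toml under [logging]) before prefect is imported
    os.environ["PREFECT__LOGGING__EXTRA_LOGGERS"] = "['my_package']"

    import prefect  # config is read at import time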

    Jim Klassen

    08/10/2020, 2:23 PM
    Hi everyone.

    Jim Klassen

    08/10/2020, 2:25 PM
    I am getting a heartbeat error as the last message from my flow, but I believe everything worked correctly. Does anyone know why?

    Marwan Sarieddine

    08/10/2020, 3:44 PM
    Hi folks - has anyone faced this error before?
    $ kubectl logs pod/prefect-job-d45c5eda-cxn9x -n prefect
    Usage: prefect execute [OPTIONS] COMMAND [ARGS]...
    Try 'prefect execute -h' for help.
    
    Error: No such command 'flow-run'.

    bral

    08/10/2020, 3:47 PM
    Good evening! I'm facing a problem: Prefect Server and a local agent are started on the same machine. If I go to http://serverip:8080 from a remote machine I don't see any flows, because the UI is trying to connect to http://localhost:4200/graphql. Which options should I change?

    Philip MacMenamin

    08/10/2020, 4:23 PM
    Is there a standard way to pass a list as a Parameter? (As opposed to a str - or do you just pass a string and split?)
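    A short editorial sketch (hedged): a Parameter default can be any JSON-serializable value, lists included, and a list can also be passed at run time. The task and file names are made up.
    from prefect import Flow, Parameter, task

    @task
    def count_files(files):
        return len(files)

    with Flow("list-parameter") as flow:
        files = Parameter("files", default=["a.txt", "b.txt"])
        count_files(files)

    flow.run(parameters={"files": ["x.txt", "y.txt", "z.txt"]})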

    Marwan Sarieddine

    08/10/2020, 5:27 PM
    I am facing a Cloud UI bug on Chrome: when the flow is too large and I visit the schematic tab, the app (cloud.prefect.io) crashes. Opening a new tab to go to cloud.prefect.io doesn't solve it; I have to open an incognito session to clear the browser session storage and resolve things.

    Marwan Sarieddine

    08/10/2020, 5:49 PM
    Is there a way, using the UI logs tab, to open the task that caused an error?

    Philip MacMenamin

    08/10/2020, 6:36 PM
    with Flow("STL Flow") as f:
        files = Parameter("files")
    
    f.run(files='fname.txt')

    Riley Hun

    08/10/2020, 8:57 PM
    Not sure if this is a silly question - sorry if it is - but if you store your secrets in a Dockerfile, what's the difference between using the EnvVarSecret task and using os.environ to pull in the secrets?
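    For reference, an editorial sketch of the EnvVarSecret side of the comparison (hedged; the variable and task names are made up). The practical difference is roughly that EnvVarSecret is itself a task, so the value is read at runtime in the execution environment, shows up as a node in the DAG, and is treated as a secret result, rather than being read at import time the way a module-level os.environ lookup would be.
    from prefect import Flow, task
    from prefect.tasks.secrets import EnvVarSecret

    @task
    def use_key(key):
        return key is not None

    with Flow("env-secret") as flow:
        api_key = EnvVarSecret("MY_API_KEY")  # resolved when the flow runs
        use_key(api_key)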

    Riley Hun

    08/11/2020, 4:49 AM
    Hello again, I was wondering if there's a way to quickly test a flow using Docker storage without actually registering it to the UI. Thanks so much in advance. Cheers, Riley

    Lewis Bails

    08/11/2020, 6:05 AM
    Hi there, I have a task that is wrongly being executed multiple times despite only showing up once in the flow. Has anyone had this issue before? Edit: I'm using Prefect 0.13.1 with Dask 2.22.0, and a LocalEnvironment with a DaskExecutor.

    emre

    08/11/2020, 7:52 AM
    Hi folks, in Prefect 0.13.1 I can't get the slack_notifier to work. This used to work in 0.12.1. Here is a minimal example of a flow:
    from prefect import Flow, task, context
    from prefect.utilities.notifications import slack_notifier
    
    @task
    def log_run_id():
        context.get("logger").warning(context.get("flow_run_id"))
    
    
    @task(state_handlers=[slack_notifier])
    def get_1():
        return 1
    
    
    with Flow("slack_example") as flow:
        log = log_run_id()
        result = get_1(upstream_tasks=[log])
    
    
    flow.run()
    Apparently slack_notifier attempts to communicate with a backend server, but I am not going to use one. More info in thread.

    bral

    08/11/2020, 9:35 AM
    After upgrading to version 0.13.1, tasks aren't scheduled with the local agent. I noticed that docker-compose.yml changed and scheduler.py is missing from the src dir, so the agent does not start tasks.

    emmanuel

    08/11/2020, 11:31 AM
    is there a way to run the OSS server and the UI in a secured way, i.e. not exposing the apollo endpoint publicly, or protecting it?

    emmanuel

    08/11/2020, 2:29 PM
    is PREFECT_SERVER__GRAPHQL_URL the right env variable for the UI docker image? it seems to be the one used in the docker compose file, but when run outside of docker (so not on localhost) it doesn't seem to be working 😕

    Richard Hughes

    08/11/2020, 2:55 PM
    Hi, is there a way to stop all running flows? If I have a bunch of running flows that are hung, how can I stop them quickly?

    Chris Martin

    08/11/2020, 2:59 PM
    Hi - I'm trying to set an upstream task on task.map, but I can't get it to work and wonder if this is possible. Here's some example code:
    from prefect import task, Flow, Task
    
    @task
    def task1():
        pass
    
    @task
    def task2():
        return ["a", "b", "c"]
    
    @task
    def task3(x):
        pass
    
    
    with Flow("test") as flow:
        t1 = task1()
        params = task2()
        task3.map(params, upstream_tasks=[t1])
    I can register this flow fine and the DAG looks correct (see attachment), but when I run, task3 fails with "No upstream states can be mapped over". Am I doing something wrong here?

    Akshay Verma

    08/11/2020, 3:12 PM
    Hi, maybe a repeated/naive question: with the latest release, is it possible to run Prefect for data pipelines completely within a VPN, without connecting to any external services? We have to refactor some custom scripts and we are deciding between Airflow and Prefect. With Prefect, we are not clear on whether we can use it independently of Prefect Cloud.

    emmanuel

    08/11/2020, 3:53 PM
    silly question but is apollo OSS as well?

    emmanuel

    08/11/2020, 4:05 PM
    @nicholas so when I see stuff like this https://github.com/PrefectHQ/server/blob/master/services/apollo/src/executors.js#L5 does that mean again that it’s injected at build time?

    Hannah Amundson

    08/11/2020, 4:54 PM
    I have a question about logging best practices. I understand that to get the Prefect logger we need to do prefect.context.get("logger"). How do we get that logger inside each of our classes/functions that are being called within tasks? Is the best practice to just pass it in as a parameter?

Jim Crist-Harif

08/11/2020, 5:20 PM
If you're calling from inside a class based task (e.g. a subclass of Task) you can use the logger attribute directly. Otherwise either getting from prefect.context or calling prefect.utilities.logging.get_logger directly should work the same.
Whether you want to pass around the logger as an argument, or re-retrieve it inside helper functions using one of the above is up to you.

alex

08/11/2020, 5:56 PM
You could also use the python logger and log to stdout and enable log_stdout for your tasks
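To tie the suggestions above together, a small editorial sketch (hedged; the task names are made up):
import prefect
from prefect import Task, task

class MyTask(Task):
    def run(self):
        # class-based tasks carry a logger attribute
        self.logger.info("hello from a Task subclass")

@task
def context_style():
    # inside a running task, the context carries the Prefect logger
    prefect.context.get("logger").info("hello from context")

@task(log_stdout=True)
def stdout_style():
    # plain prints are captured when log_stdout is enabled
    print("hello from stdout")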

Hannah Amundson

08/12/2020, 4:11 PM
thank you to both @Jim Crist-Harif and @alex