prefect-community
  • Michael Ludwig

    07/31/2020, 7:08 AM
    So both parent tasks in that screenshot raise a SKIP signal, and the task that should be mapped complains that it had no states to map. That's true, but it should be skipped as well, since its parents are skipped. Any ideas?
    prefect==0.12.2
  • Michael Ludwig

    07/31/2020, 7:21 AM
    This is how we apply map:
    bucket_prefix = create_bucket_prefix_rfy()
    prediction_single_files = list_files(s3_folder=prediction)
    return load_rfy.map(
        input_data_location=prediction_single_files,
        bucket_prefix=unmapped(bucket_prefix),
    )
  • psimakis

    07/31/2020, 11:28 AM
    Hello community, I just wanted to let you know that preview.prefect.io is not responding. If you are already aware of this issue, then ignore this message 😄 Best wishes.
  • Chris Martin

    07/31/2020, 4:10 PM
    Hi - if you have some common code that should be shared between flows (say, creation of some common resource, or a custom task type), what's the best way to lay this out? I'm currently putting it in a separate library and making sure it's available in the Docker base image, but that feels a bit suboptimal: the only point of this code is to be used by the flows, so ideally it would live alongside them. Is there something better I can do here?
  • Kyle Pierce

    07/31/2020, 5:09 PM
    Hey, I'm a little confused about how to incorporate mapping into uploading to S3. The first task gets all of the files, the second modifies each file into a new JSON in memory, and in the third task I need to upload those new files. Should I save the files locally before uploading, or is there a way to upload them in the same mapped step?
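For reference, the in-memory pattern usually looks roughly like the sketch below (assuming Prefect 0.12-style mapping and boto3; bucket and key names are hypothetical). Each mapped child hands its transformed payload straight to a mapped upload task, so nothing touches local disk:

import json

import boto3
from prefect import Flow, task, unmapped

@task
def list_keys(bucket):
    # task 1: list every object key in the source bucket
    s3 = boto3.client("s3")
    resp = s3.list_objects_v2(Bucket=bucket)
    return [obj["Key"] for obj in resp.get("Contents", [])]

@task
def transform(key, bucket):
    # task 2: build the new JSON entirely in memory
    s3 = boto3.client("s3")
    body = s3.get_object(Bucket=bucket, Key=key)["Body"].read()
    record = json.loads(body)
    record["processed"] = True  # stand-in for the real modification
    return key, json.dumps(record)

@task
def upload(item, bucket):
    # task 3: push the in-memory payload straight back to S3
    key, payload = item
    s3 = boto3.client("s3")
    s3.put_object(Bucket=bucket, Key="processed/" + key, Body=payload.encode())

with Flow("map-to-s3") as flow:
    keys = list_keys("my-bucket")
    transformed = transform.map(keys, bucket=unmapped("my-bucket"))
    upload.map(transformed, bucket=unmapped("my-bucket"))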
  • Jacob (he/him)

    07/31/2020, 5:09 PM
    I have a question about the task decorator
    @task
    def get_file_name(table):
        file_name = f'bpi/{table}/-000'
        return file_name
    
    with Flow("write to s3") as flow:
        exports = ['email_acquisitions', 'new_list_acquisitions', 'revenue', 'segmentation']
        for export in exports:
            file_name = get_file_name(export)
            print(file_name)
    I was trying to write a function that does some string manipulation to build an S3 path (this is simplified ^), and when I add the @task decorator, file_name is <Task: get_file_name> when I just want the string it produces. Am I doing something in a non-Prefect way?
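For context: inside a with Flow(...) block, calling a task only builds the graph, so file_name is a Task reference; the string exists only at run time. A minimal sketch of the usual pattern (map instead of a Python loop, results inspected after the run, assuming Prefect 0.12.x):

from prefect import Flow, task

@task
def get_file_name(table):
    return f"bpi/{table}/-000"

with Flow("write to s3") as flow:
    exports = ["email_acquisitions", "new_list_acquisitions", "revenue", "segmentation"]
    # one mapped child per export; still a Task reference at build time
    file_names = get_file_name.map(exports)

state = flow.run()
# the concrete strings only exist after the run
print(state.result[file_names].result)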
  • Mitchell Bregman

    07/31/2020, 6:37 PM
    Hi there, has anyone done a performance analysis comparing Prefect to Airflow? Is there some sort of white paper I can use to present to my team?
  • Ashish Arora

    07/31/2020, 7:15 PM
    When executing a flow locally, does using .map() with your tasks automatically parallelize across different workers (multiprocessing), or do we need to use a DaskExecutor for that?
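For reference, .map() alone doesn't parallelize: the default LocalExecutor runs mapped children serially, and parallelism comes from the executor you pass in. A minimal sketch, assuming Prefect 0.12.x:

from prefect import Flow, task
from prefect.engine.executors import LocalDaskExecutor

@task
def double(x):
    return 2 * x

with Flow("parallel-map") as flow:
    double.map(list(range(10)))

# the default LocalExecutor runs mapped children one at a time;
# pass an executor explicitly to get local parallelism
flow.run(executor=LocalDaskExecutor(scheduler="threads"))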
  • Justin Mooney

    07/31/2020, 8:10 PM
    I have a use case where I need to use multiple resource managers in a flow which cannot co-exist for reasons beyond my control. Is there a way to have a resource manager's initialization await an upstream task?
  • Adam Roderick

    07/31/2020, 8:33 PM
    3,000 stars. Keep on rocking :prefect:
  • Adam Roderick

    07/31/2020, 9:18 PM
    Why would this result in an error message in Prefect Cloud?
  • John Ramirez

    08/01/2020, 6:26 PM
    Why am I getting this error when I try to create a flow run using the client object?
    prefect.utilities.exceptions.ClientError: [{'path': ['create_flow_run'], 'message': 'Flow 8eb08aff-f439-422f-94ba-31d30af635cb not found', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
  • David Stern

    08/03/2020, 12:52 AM
    Apologies if this has already been answered, but the docs don't make it super clear what terminology I should use, so it's difficult to search for an answer. Basically, I want to know if we can have a flow that waits for one task to complete: because some data isn't yet available from a provider, it checks for it every 15 minutes, say, and continues the flow once it's available. And importantly, is this available in the open-source version? This is the crucial reason we'd use Prefect instead of exclusively using something like dbt, which I think we'll use anyway.
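For reference, this polling pattern is typically spelled with task retries in open-source Prefect Core; a sketch under that assumption (provider_has_data and fetch_data are hypothetical stand-ins for the real check and download):

from datetime import timedelta
from prefect import Flow, task
from prefect.engine.signals import RETRY

@task(max_retries=20, retry_delay=timedelta(minutes=15))
def wait_for_data():
    if not provider_has_data():  # hypothetical check against your provider
        raise RETRY("Data not available yet; retrying in 15 minutes.")
    return fetch_data()          # hypothetical download

@task
def continue_pipeline(data):
    ...

with Flow("poll-provider") as flow:
    continue_pipeline(wait_for_data())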
  • Rafal

    08/03/2020, 12:45 PM
    Hello, I am looking into the Kubernetes tasks, and even though there is a CreateNamespacedDeployment task, I am not sure whether it can start a server based on a Docker image. Any suggestions?
  • Matthew Maldonado

    08/03/2020, 1:01 PM
    What does the result folder in the .prefect folder hold?
  • Matthew Maldonado

    08/03/2020, 1:02 PM
    It's now taking up 516 GB of disk space.
  • Matthew Maldonado

    08/03/2020, 1:07 PM
    Can I delete some of the files there to free up disk space?
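For context: assuming these are checkpointed task results written by Prefect's default LocalResult, ~/.prefect/results grows with every run that writes results, and the files are only needed if you want to restart those runs later. A hedged sketch of turning checkpointing off, assuming Prefect 0.12.x:

from prefect import task

# checkpointing can also be disabled globally with
#   export PREFECT__FLOWS__CHECKPOINTING=false
@task(checkpoint=False)
def big_intermediate_step():
    ...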
  • Trever Mock

    08/03/2020, 5:48 PM
    Hi! I'm trying to deploy the Prefect core server into an existing kubernetes cluster. Is there a recommended way to do this? I found docs on doing this for agents, but didn't see anything on doing it for the server (perhaps I missed it). My apologies if it's obvious, I'm a bit on the newer side to kubernetes in addition to learning about Prefect. If anyone could point me in the right direction I'd really appreciate it!
  • Shawn Horton

    08/03/2020, 5:51 PM
    Hello... can you please share the firewall rules necessary to successfully run flow.register()? We opened things wide open to get past the problem and are looking to open only what is necessary. Thank you.
  • Rafal

    08/03/2020, 7:17 PM
    Hello! Can I test the Prefect Cloud free trial from behind a company network?
  • Fábio Alves

    08/03/2020, 11:10 PM
    Hi everyone, I'm trying to run multiple dask-workers and also a dask-scheduler through a Dockerfile; is that possible? The reason I'm doing this is to simulate a local run while using AWS EC2 instance resources.
  • Hannah Amundson

    08/03/2020, 11:45 PM
    Hi! I want to map a task, but in this specific instance I don't want the mapped runs to execute concurrently; I want them to run one after another. Is there a way to do this?
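For reference, with the default synchronous LocalExecutor mapped children already run one after another; concurrency only appears with a Dask-based executor. A minimal sketch, assuming Prefect 0.12.x:

from prefect import Flow, task
from prefect.engine.executors import LocalExecutor

@task
def process(item):
    ...

with Flow("sequential-map") as flow:
    process.map([1, 2, 3])

# the synchronous LocalExecutor walks mapped children one after another
flow.run(executor=LocalExecutor())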
  • Alfie

    08/04/2020, 4:10 AM
    Hi folks, I'd like to keep some internal status across the tasks in a flow run: every task adds some info to the status, and the last task processes the whole status. Any suggestions on how to achieve that? Thanks.
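One common way to get this, sketched below under Prefect 0.12.x assumptions: have each task return its piece of the status and let the last task reduce over all of them, since Prefect passes collections of task results through the graph:

from prefect import Flow, task

@task
def step_a():
    return {"a": 1}

@task
def step_b(upstream):
    # each task contributes its own piece of the status
    return {"b": 2}

@task
def finalize(pieces):
    merged = {}
    for piece in pieces:
        merged.update(piece)
    return merged  # the whole status, assembled at the end

with Flow("accumulate-status") as flow:
    a = step_a()
    b = step_b(a)
    finalize([a, b])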
  • Matthias

    08/04/2020, 7:02 AM
    Hi! Is there a way to kill a flow run after a defined time? E.g. I want to kill a flow if it runs for 30 minutes, as my expected run time is around 10 minutes. This usually means something outside of my control is not working and I would just wait for the next scheduled run to happen. Any ideas? Thanks!
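For reference, I don't believe core has a whole-flow kill switch; the closest primitive is a per-task timeout. A sketch assuming Prefect 0.12.x (timeout is in seconds):

from prefect import Flow, task

# Prefect fails the task if it runs longer than this
@task(timeout=30 * 60)
def long_running_step():
    ...

with Flow("bounded-run") as flow:
    long_running_step()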
  • Amaljith

    08/04/2020, 7:42 AM
    Hi! Hope you are all doing fine in these harsh times. Is there a way to implement Prefect with classes? Say, with Flow("ok") as f: x = ClassA(); y = ClassB(x). Every time I create a flow in this manner, I get a task-runner object in x, and ClassB obviously doesn't understand that. For now, I work around it by implementing the same logic in a method, decorating it as a task, and running that. Is there a better implementation of this? Any help is appreciated. Thank you. Stay healthy.
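For context, the class-based route in Prefect 0.x is to subclass Task and put the logic in run(); inside the flow block every call still returns a task reference, and the concrete value only shows up at run time. A minimal sketch:

from prefect import Flow, Task

class ClassA(Task):
    def run(self):
        return 41

class ClassB(Task):
    def run(self, x):
        return x + 1

a, b = ClassA(), ClassB()

with Flow("ok") as f:
    x = a()   # inside the flow this is a Task reference, not a value
    y = b(x)  # ClassB.run receives the real value only at run time

f.run()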
  • Amit

    08/04/2020, 1:17 PM
    Hi there, I added a parameter to a flow, but the dashboard still says "This flow has no parameters; Click 'Run' to launch your flow!" Here's the code:
    from prefect import Flow, Parameter, task
    from prefect.schedules import Schedule
    from prefect.schedules.clocks import CronClock

    @task
    def task_process_data(date_to_process):
        if date_to_process is None:
            # calculate
            date_to_process = "some_value"
    
    
    def flow_process_daily_data(*args, **kwargs):
        with Flow(*args, **kwargs) as flow:
            date_to_process = Parameter('date_to_process', default=None)
            task_process_data(date_to_process)
        return flow
    
    flow_process_daily_data(
                name="process-data",
                schedule=Schedule(clocks=[CronClock("0 14 */1 * *")]),
                storage=storage,
    )
    # Then I build the storage and register the flow here ..
    Also, I don't understand why all the Parameter examples call flow.run(); I don't call it myself, since the flow runs on a schedule.
  • Ethan Shenker

    08/04/2020, 2:17 PM
    Hi everyone! I'm working on an internship project this summer and have a question about working with Prefect states. Part of the goal of my workflow is to allow the user to manually trigger a task once a previous task has "failed". I plan to achieve this by raising signals.FAIL(), which would then trigger another task. However, within that downstream task, I would also like to let the user interact with the flow to set a flag that determines the outcome of that downstream task, and this interaction would likely occur while the task was paused. I'm struggling with how to achieve this, because the paused state pauses all elements of the flow, preventing the user interaction I require. Additionally, if there is any other way for a task to run both as a result of a failed upstream task and on manual demand at the same time, that insight would be greatly appreciated too. Thanks!
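For reference, the "run a task when an upstream fails" half of this is usually spelled with triggers; a minimal sketch assuming Prefect 0.12.x (the pause-and-interact half is a separate problem):

from prefect import Flow, task
from prefect.engine.signals import FAIL
from prefect.triggers import any_failed

@task
def fragile():
    raise FAIL("Something went wrong.")

@task(trigger=any_failed)
def handle_failure():
    # runs only when an upstream task has failed
    ...

with Flow("failure-handler") as flow:
    handle_failure(upstream_tasks=[fragile()])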
  • Fábio Alves

    08/04/2020, 4:07 PM
    Hi everyone, I'm trying to set up a Fargate Dask cluster following the Dask Cloud Provider example from the Prefect docs: https://docs.prefect.io/orchestration/execution/dask_cloud_provider_environment.html#process. The problem is that the scheduler uses ports 8786 and 8787 by default, and for security reasons those ports are blocked on the AWS account I'm using, which causes the scheduler to time out. Is there any way to change these ports? I couldn't find anything in the docs except for the Azure integration. Thanks!
  • Riley Hun

    08/04/2020, 5:57 PM
    Hi everyone, I'm new to Prefect and I am loving it so far. We will be using it for our production workflows to ingest external data. I have started to build my first dataflow using Prefect, but I have some questions, and I feel like I need to demo it to get feedback. Is there a Prefect support group I can have a video call with to verify a few things? The items I'm struggling with are: • integrating GCP Stackdriver logging • it takes forever to push some data to a GCS bucket • using GCP credentials

Jim Crist-Harif

08/04/2020, 6:14 PM
Hi Riley, Slack is the only support we provide for users, if you have a specific question about any of the above we'd be happy to help.

Riley Hun

08/04/2020, 6:20 PM
Okay, thanks Jim! I'll start with my inquiry about logging to GCP Stackdriver. I have a task defined like this:
import logging

import google.cloud.logging
from google.cloud.logging.handlers import CloudLoggingHandler
from prefect import Task

class CloudLogging(Task):

    def run(
        self,
        log_name: str = 'etl_logging',
        name: str = None,
        level=logging.INFO
    ):
        """
        :param: log_name (str) = name of logging instance
        :param: name (str) = name of log event
        :param: level (str) = logging level (INFO, DEBUG, ERROR, WARNING):

        :raises:
            - ValueError: if all required arguments haven't been provided

        :return: logging instance for sending logs to StackDriver

        """

        if name is None:
            name = self.__class__.__name__

        lg_client = google.cloud.logging.Client()
        lg_handler = CloudLoggingHandler(lg_client, log_name)
        cloud_logger = logging.getLogger(name)
        cloud_logger.setLevel(level)
        cloud_logger.addHandler(lg_handler)

        return cloud_logger
And then I pass in the logger instance to each of the subsequent tasks in the flow like so:
with Flow("Thinknum ETL") as flow:
    version = Parameter("version", default="20151130")
    bucket_name = Parameter("bucket_name", default="alternative_data")
    dataset_id = Parameter("dataset_id", default="job_listings")
    client_id = EnvVarSecret("client_id")
    client_secret = EnvVarSecret("client_secret")
    gcp_key = EnvVarSecret('GOOGLE_APPLICATION_CREDENTIALS')

    cloud_logging_task = CloudLogging()
    logger = cloud_logging_task(
        log_name='Thinknum_ETL_Logging'
    )
    token_task = ThinkNumGetTokenTask(
        logger=logger
    )
    token = token_task(
        version=version,
        client_id=client_id,
        client_secret=client_secret
    )
Am I doing this correctly? It's not logging anything to Stackdriver. I'm running my flow locally for testing, but it's using a static Dask cluster on GKE as the executor.

Jim Crist-Harif

08/04/2020, 6:24 PM
Is your goal for all the logs Prefect generates to go to Stackdriver, or just the logging calls you make yourself within your tasks?

Riley Hun

08/04/2020, 6:25 PM
Preferably all logs going to Stackdriver.
But if that's not possible, then just the logs I generate should be sufficient.

Jim Crist-Harif

08/04/2020, 6:31 PM
Gotcha. You shouldn't configure a logger as part of a task, since a task only runs on one node, and loggers don't serialize with their handlers attached, so your configuration isn't being used everywhere. Instead, you should configure the logging once on every node. I don't think Prefect currently has a nice way to do this as part of our config (though we could add it). For now, you might add a hook that adds your handler if it's not already configured, then call it at the start of your tasks. Something like (pseudocode):
def ensure_logging():
    logger = prefect.utilities.logging.get_logger()
    if your_handler_isnt_in(logger.handlers):
        setup_logging()

@task
def your_task():
    ensure_logging()
    ...
This definitely isn't ideal, but should at least get you going. Depending on how you're starting your dask cluster, you might make use of Dask's worker plugins to ensure your handlers are configured on worker startup. But I think we should add a config option for attaching extra handlers to the Prefect logger; I'll create an issue.

Riley Hun

08/04/2020, 6:35 PM
Okay - thanks Jim. Let me give it a go using your pseudocode as a template.

Jim Crist-Harif

08/04/2020, 6:42 PM
https://github.com/PrefectHQ/prefect/issues/3088

Riley Hun

08/04/2020, 7:39 PM
Hi Jim, I can confirm that logging to Stackdriver seems to be working just fine now. I'll now move on to my second inquiry, which is somewhat related. I'm authenticating with GCP using my "end user credentials", but I should probably be using a service account key file; I'm still a little confused about how authentication works in Prefect. As it stands, I am doing something like this:
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'key.json'
gcp_key = EnvVarSecret('GOOGLE_APPLICATION_CREDENTIALS')
But I'm not sure it's doing anything. It would be better to have the JSON service account key passed into a task, so I can call functions like:
storage.Client.from_service_account_json()
I'm also getting this warning:
/Users/rihun/anaconda3/envs/thinknum_etl/lib/python3.7/site-packages/google/auth/_default.py:69: UserWarning: Your application has authenticated using end user credentials from Google Cloud SDK without a quota project. You might receive a "quota exceeded" or "API not enabled" error. We recommend you rerun `gcloud auth application-default login` and make sure a quota project is added. Or you can use service accounts instead. For more information about service accounts, see https://cloud.google.com/docs/authentication/
warnings.warn(_CLOUD_SDK_CREDENTIALS_WARNING)
One thing I noticed is that the logs are being duplicated in Stackdriver. I implemented your interim solution like so:
def ensure_stackdriver_logging():
    logger = get_logger()
    lg_client = google.cloud.logging.Client()
    lg_handler = CloudLoggingHandler(lg_client, 'thinknum_etl_logging')
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    lg_handler.setFormatter(formatter)
    logger.setLevel(logging.INFO)
    logger.addHandler(lg_handler)
Sample Logs from Stackdriver:
I 2020-08-04T23:18:13.499848Z 2020-08-04 16:18:13,499 - prefect.TaskRunner - INFO - Task 'ListLoadedThinkNumDates': finished task run for task with final state: 'Success' 
I 2020-08-04T23:18:13.499848Z 2020-08-04 16:18:13,499 - prefect.TaskRunner - INFO - Task 'ListLoadedThinkNumDates': finished task run for task with final state: 'Success' 
I 2020-08-04T23:18:13.499848Z 2020-08-04 16:18:13,499 - prefect.TaskRunner - INFO - Task 'ListLoadedThinkNumDates': finished task run for task with final state: 'Success' 
I 2020-08-04T23:18:13.529295Z 2020-08-04 16:18:13,529 - prefect.TaskRunner - INFO - Task 'get_dates_to_load': Starting task run... 
I 2020-08-04T23:18:13.529295Z 2020-08-04 16:18:13,529 - prefect.TaskRunner - INFO - Task 'get_dates_to_load': Starting task run... 
I 2020-08-04T23:18:13.529295Z 2020-08-04 16:18:13,529 - prefect.TaskRunner - INFO - Task 'get_dates_to_load': Starting task run... 
I 2020-08-04T23:18:13.545171Z 2020-08-04 16:18:13,545 - prefect.TaskRunner - INFO - Task 'get_dates_to_load': finished task run for task with final state: 'Success' 
I 2020-08-04T23:18:13.545171Z 2020-08-04 16:18:13,545 - prefect.TaskRunner - INFO - Task 'get_dates_to_load': finished task run for task with final state: 'Success' 
I 2020-08-04T23:18:13.545171Z 2020-08-04 16:18:13,545 - prefect.TaskRunner - INFO - Task 'get_dates_to_load': finished task run for task with final state: 'Success' 
I 2020-08-04T23:18:13.546556Z 2020-08-04 16:18:13,546 - prefect.FlowRunner - INFO - Flow run SUCCESS: all reference tasks succeeded 
I 2020-08-04T23:18:13.546556Z 2020-08-04 16:18:13,546 - prefect.FlowRunner - INFO - Flow run SUCCESS: all reference tasks succeeded 
I 2020-08-04T23:18:13.546556Z 2020-08-04 16:18:13,546 - prefect.FlowRunner - INFO - Flow run SUCCESS: all reference tasks succeeded 
I 2020-08-04T23:18:13.547597Z 2020-08-04 16:18:13,547 - prefect.Thinknum ETL - INFO - Waiting for next scheduled run at 2020-08-04T23:19:11.705087+00:00

Jim Crist-Harif

08/05/2020, 2:17 PM
It looks like you're not checking if the handler already exists, so if the task is rerun the handler will be added multiple times.
"I'm still a little confused about how authentication works in Prefect."
Prefect generally relies on `Secret`s to authenticate, either pulling from environment variables, local configuration (in the .prefect/config.toml file), or Cloud (where they're stored in Vault). A secret can contain whatever values you want, so you're free to pass in whatever auth tokens you need.
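To make that concrete, a small sketch of reading a secret inside a task (GCP_KEY is a hypothetical secret name, resolved from [context.secrets] in ~/.prefect/config.toml, an env var like PREFECT__CONTEXT__SECRETS__GCP_KEY, or Cloud's vault; assumes google-cloud-storage is installed):

from prefect import task
from prefect.client import Secret
from google.cloud import storage

@task
def get_gcs_client():
    # resolve the path to the service account key from wherever secrets live
    key_path = Secret("GCP_KEY").get()
    return storage.Client.from_service_account_json(key_path)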

Riley Hun

08/05/2020, 5:20 PM
I'm not really sure how to do that handler check, but I did do something similar in idea, and it didn't seem to work:
def ensure_stackdriver_logging():
    logger = get_logger()

    if hasattr(logger, 'initialized'):
        return logger
    else:
        setattr(logger, 'initialized', True)

    lg_client = google.cloud.logging.Client()
    lg_handler = CloudLoggingHandler(lg_client, 'thinknum_etl_logging')
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    lg_handler.setFormatter(formatter)
    logger.setLevel(logging.INFO)
    logger.addHandler(lg_handler)
On the subject of secrets/passwords, is it acceptable to have them stored as environment variables in the Dockerfile of the Dask workers? I've already secured my Dask cluster on GCP.
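For reference, the handler check Jim described can be spelled as an isinstance scan over logger.handlers, which guards each worker process independently; a sketch reusing the handler and log name from above:

import logging

import google.cloud.logging
from google.cloud.logging.handlers import CloudLoggingHandler
from prefect.utilities.logging import get_logger

def ensure_stackdriver_logging():
    logger = get_logger()
    # skip if a CloudLoggingHandler is already attached in this process
    if any(isinstance(h, CloudLoggingHandler) for h in logger.handlers):
        return logger
    client = google.cloud.logging.Client()
    handler = CloudLoggingHandler(client, 'thinknum_etl_logging')
    handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
    logger.setLevel(logging.INFO)
    logger.addHandler(handler)
    return logger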