Michael Ludwig
07/31/2020, 7:08 AM
prefect==0.12.2
Michael Ludwig
07/31/2020, 7:21 AM
bucket_prefix = create_bucket_prefix_rfy()
prediction_single_files = list_files(s3_folder=prediction)
return load_rfy.map(
    input_data_location=prediction_single_files,
    bucket_prefix=unmapped(bucket_prefix),
)
psimakis
07/31/2020, 11:28 AM
Chris Martin
07/31/2020, 4:10 PM
Kyle Pierce
07/31/2020, 5:09 PM
Jacob (he/him)
07/31/2020, 5:09 PM
@task
def get_file_name(table):
    file_name = f'bpi/{table}/-000'
    return file_name

with Flow("write to s3") as flow:
    exports = ['email_acquisitions', 'new_list_acquisitions', 'revenue', 'segmentation']
    for export in exports:
        file_name = get_file_name(export)
        print(file_name)
I was trying to write a function that does some string manipulation to get an S3 path (this is simplified ^), and when I add the @task decorator I get file_name = <Task: get_file_name> when I just want it to return the string produced. Am I doing something in a non-Prefect way?
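A minimal sketch of what's going on here: inside a Flow context a task call only builds the graph and returns a Task node, so the string exists only at run time (the show task and the flow.run() call below are illustrative additions):

from prefect import Flow, task

@task
def get_file_name(table):
    return f'bpi/{table}/-000'

@task
def show(path):
    # at run time, path is the real string
    print(path)

with Flow("write to s3") as flow:
    # this returns <Task: get_file_name>, not a str; pass it to
    # another task to consume the eventual value
    file_name = get_file_name('revenue')
    show(file_name)

flow.run()  # only now does get_file_name execute and print 'bpi/revenue/-000'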
Mitchell Bregman
07/31/2020, 6:37 PM
Ashish Arora
07/31/2020, 7:15 PM
Justin Mooney
07/31/2020, 8:10 PM
Adam Roderick
07/31/2020, 8:33 PM
Adam Roderick
07/31/2020, 9:18 PM
John Ramirez
08/01/2020, 6:26 PM
Using the client object I get:
prefect.utilities.exceptions.ClientError: [{'path': ['create_flow_run'], 'message': 'Flow 8eb08aff-f439-422f-94ba-31d30af635cb not found', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
David Stern
08/03/2020, 12:52 AM
dbt, which I think we'll use anyway.
Rafal
08/03/2020, 12:45 PM
With the CreateNamespacedDeployment task, I am not sure if it could start a server based on a Docker image. Any suggestions?
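For reference, a hedged sketch of driving that task from a flow (the manifest, names, and image below are illustrative assumptions; a Deployment keeps its container running, which is the closest thing to "starting a server"):

from prefect import Flow
from prefect.tasks.kubernetes import CreateNamespacedDeployment

# illustrative Deployment manifest; the image would be your server image
deployment_manifest = {
    "apiVersion": "apps/v1",
    "kind": "Deployment",
    "metadata": {"name": "my-server"},
    "spec": {
        "replicas": 1,
        "selector": {"matchLabels": {"app": "my-server"}},
        "template": {
            "metadata": {"labels": {"app": "my-server"}},
            "spec": {"containers": [{"name": "server", "image": "nginx:latest"}]},
        },
    },
}

create_deployment = CreateNamespacedDeployment(
    body=deployment_manifest,
    namespace="default",
)

with Flow("start-server") as flow:
    create_deployment()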
Matthew Maldonado
08/03/2020, 1:01 PM
Matthew Maldonado
08/03/2020, 1:01 PM
Matthew Maldonado
08/03/2020, 1:02 PM
Matthew Maldonado
08/03/2020, 1:07 PM
Trever Mock
08/03/2020, 5:48 PM
Shawn Horton
08/03/2020, 5:51 PM
Rafal
08/03/2020, 7:17 PM
Fábio Alves
08/03/2020, 11:10 PM
Hannah Amundson
08/03/2020, 11:45 PM
Alfie
08/04/2020, 4:10 AM
Matthias
08/04/2020, 7:02 AM
Amaljith
08/04/2020, 7:42 AM
Amit
08/04/2020, 1:17 PM
@task
def task_process_data(date_to_process):
    if date_to_process is None:
        # calculate
        date_to_process = "some_value"

def flow_process_daily_data(*args, **kwargs):
    with Flow(*args, **kwargs) as flow:
        date_to_process = Parameter('date_to_process', default=None)
        task_process_data(date_to_process)
    return flow

flow = flow_process_daily_data(
    name="process-data",
    schedule=Schedule(clocks=[CronClock("0 14 */1 * *")]),
    storage=storage,
)
# Then I build the storage and register the flow here ..
The UI shows: This flow has no parameters ; Click "Run" to launch your flow!
Also, I don't understand why all the examples of Parameters have flow.run() called, as I don't call it, since it runs on a schedule.
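A minimal sketch of how a Parameter default behaves (Prefect 0.12-era API; flow.run() here is only for local testing, a scheduled/registered run uses the default unless the run supplies an override):

from prefect import Flow, Parameter, task

@task
def task_process_data(date_to_process):
    if date_to_process is None:
        date_to_process = "some_value"  # computed fallback
    print(date_to_process)

with Flow("process-data") as flow:
    date_to_process = Parameter("date_to_process", default=None, required=False)
    task_process_data(date_to_process)

flow.run()                                              # prints "some_value"
flow.run(parameters={"date_to_process": "2020-08-04"})  # prints the override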
Ethan Shenker
08/04/2020, 2:17 PM
I have a task that raises a signals.FAIL() state, which would then trigger another task. However, within that downstream task I would also like to allow the user to interact with the flow and set a flag that determines the outcome of that downstream task, and this interaction would likely occur while the downstream task was in a paused state. I'm struggling to see how to achieve this, as the paused state pauses all elements of the flow, preventing the user interaction I require.
Additionally, if there are any other ways that a task can be set to run both as the result of a failed previous task and manually at the same time, that insight would be greatly appreciated too. Thanks!
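On the second question, a sketch of wiring a task to run after an upstream failure via Prefect's triggers (this doesn't cover the pause/resume interaction, just the failure-driven wiring; names are illustrative):

from prefect import Flow, task
from prefect.engine import signals
from prefect.triggers import any_failed

@task
def may_fail():
    raise signals.FAIL("upstream problem")

@task(trigger=any_failed)
def recover():
    # runs only because an upstream task failed
    print("running recovery")

with Flow("failure-handling") as flow:
    upstream = may_fail()
    recover(upstream_tasks=[upstream])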
Fábio Alves
08/04/2020, 4:07 PM
Riley Hun
08/04/2020, 5:57 PM
Riley Hun
08/04/2020, 5:57 PM
Jim Crist-Harif
08/04/2020, 6:14 PM
Riley Hun
08/04/2020, 6:20 PM
class CloudLogging(Task):
    def run(
        self,
        log_name: str = 'etl_logging',
        name: str = None,
        level=logging.INFO
    ):
        """
        :param: log_name (str) = name of logging instance
        :param: name (str) = name of log event
        :param: level (str) = logging level (INFO, DEBUG, ERROR, WARNING)
        :raises:
            - ValueError: if all required arguments haven't been provided
        :return: logging instance for sending logs to StackDriver
        """
        if name is None:
            name = self.__class__.__name__
        lg_client = google.cloud.logging.Client()
        lg_handler = CloudLoggingHandler(lg_client, log_name)
        cloud_logger = logging.getLogger(name)
        cloud_logger.setLevel(level)
        cloud_logger.addHandler(lg_handler)
        return cloud_logger
And then I pass in the logger instance to each of the subsequent tasks in the flow like so:
with Flow("Thinknum ETL") as flow:
version = Parameter("version", default="20151130")
bucket_name = Parameter("bucket_name", default="alternative_data")
dataset_id = Parameter("dataset_id", default="job_listings")
client_id = EnvVarSecret("client_id")
client_secret = EnvVarSecret("client_secret")
gcp_key = EnvVarSecret('GOOGLE_APPLICATION_CREDENTIALS')
cloud_logging_task = CloudLogging()
logger = cloud_logging_task(
log_name='Thinknum_ETL_Logging'
)
token_task = ThinkNumGetTokenTask(
logger=logger
)
token = token_task(
version=version,
client_id=client_id,
client_secret=client_secret
)
Am I doing this correctly? It's not logging anything to Stackdriver. I'm running my flow locally for testing, but it's using a static Dask cluster on GKE as the executor.
Jim Crist-Harif
08/04/2020, 6:24 PM
Riley Hun
08/04/2020, 6:25 PM
Jim Crist-Harif
08/04/2020, 6:31 PM
def ensure_logging():
    logger = prefect.utilities.logging.get_logger()
    if your_logger_isnt_in(logger.handlers):
        setup_logging()

@task
def your_task():
    ensure_logging()
    ...
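A runnable version of that sketch, with a stand-in handler where the StackDriver one from earlier in the thread would go (the handler class and message are assumptions):

import logging

import prefect
from prefect import task

class StackdriverStandIn(logging.StreamHandler):
    """Stand-in for CloudLoggingHandler; swap in the real handler."""

def ensure_logging():
    # runs inside the task, i.e. on the Dask worker process itself,
    # so the handler gets attached where the logging actually happens
    logger = prefect.utilities.logging.get_logger()
    if not any(isinstance(h, StackdriverStandIn) for h in logger.handlers):
        handler = StackdriverStandIn()
        handler.setFormatter(logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
        logger.addHandler(handler)

@task
def my_task():
    ensure_logging()
    prefect.context.get("logger").info("hello from the worker")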
Riley Hun
08/04/2020, 6:35 PM
Jim Crist-Harif
08/04/2020, 6:42 PM
Riley Hun
08/04/2020, 7:39 PM
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'key.json'
gcp_key = EnvVarSecret('GOOGLE_APPLICATION_CREDENTIALS')
But I'm not sure it's doing anything. It would be better to have the JSON service account key passed into a task, so I can call these functions:
storage.Client.from_service_account_json()
I'm also getting these warnings:
/Users/rihun/anaconda3/envs/thinknum_etl/lib/python3.7/site-packages/google/auth/_default.py:69: UserWarning: Your application has authenticated using end user credentials from Google Cloud SDK without a quota project. You might receive a "quota exceeded" or "API not enabled" error. We recommend you rerun gcloud auth application-default login and make sure a quota project is added. Or you can use service accounts instead. For more information about service accounts, see https://cloud.google.com/docs/authentication/
warnings.warn(_CLOUD_SDK_CREDENTIALS_WARNING)
def ensure_stackdriver_logging():
    logger = get_logger()
    lg_client = google.cloud.logging.Client()
    lg_handler = CloudLoggingHandler(lg_client, 'thinknum_etl_logging')
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    lg_handler.setFormatter(formatter)
    logger.setLevel(logging.INFO)
    logger.addHandler(lg_handler)
Sample Logs from Stackdriver:
I 2020-08-04T23:18:13.499848Z 2020-08-04 16:18:13,499 - prefect.TaskRunner - INFO - Task 'ListLoadedThinkNumDates': finished task run for task with final state: 'Success'
I 2020-08-04T23:18:13.499848Z 2020-08-04 16:18:13,499 - prefect.TaskRunner - INFO - Task 'ListLoadedThinkNumDates': finished task run for task with final state: 'Success'
I 2020-08-04T23:18:13.499848Z 2020-08-04 16:18:13,499 - prefect.TaskRunner - INFO - Task 'ListLoadedThinkNumDates': finished task run for task with final state: 'Success'
I 2020-08-04T23:18:13.529295Z 2020-08-04 16:18:13,529 - prefect.TaskRunner - INFO - Task 'get_dates_to_load': Starting task run...
I 2020-08-04T23:18:13.529295Z 2020-08-04 16:18:13,529 - prefect.TaskRunner - INFO - Task 'get_dates_to_load': Starting task run...
I 2020-08-04T23:18:13.529295Z 2020-08-04 16:18:13,529 - prefect.TaskRunner - INFO - Task 'get_dates_to_load': Starting task run...
I 2020-08-04T23:18:13.545171Z 2020-08-04 16:18:13,545 - prefect.TaskRunner - INFO - Task 'get_dates_to_load': finished task run for task with final state: 'Success'
I 2020-08-04T23:18:13.545171Z 2020-08-04 16:18:13,545 - prefect.TaskRunner - INFO - Task 'get_dates_to_load': finished task run for task with final state: 'Success'
I 2020-08-04T23:18:13.545171Z 2020-08-04 16:18:13,545 - prefect.TaskRunner - INFO - Task 'get_dates_to_load': finished task run for task with final state: 'Success'
I 2020-08-04T23:18:13.546556Z 2020-08-04 16:18:13,546 - prefect.FlowRunner - INFO - Flow run SUCCESS: all reference tasks succeeded
I 2020-08-04T23:18:13.546556Z 2020-08-04 16:18:13,546 - prefect.FlowRunner - INFO - Flow run SUCCESS: all reference tasks succeeded
I 2020-08-04T23:18:13.546556Z 2020-08-04 16:18:13,546 - prefect.FlowRunner - INFO - Flow run SUCCESS: all reference tasks succeeded
I 2020-08-04T23:18:13.547597Z 2020-08-04 16:18:13,547 - prefect.Thinknum ETL - INFO - Waiting for next scheduled run at 2020-08-04T23:19:11.705087+00:00
Jim Crist-Harif
08/05/2020, 2:17 PM
> Ideally, I'm still a little confused about how authentication works in Prefect?
Prefect generally relies on `Secret`s to authenticate, either pulling from environment variables, local configuration (in the .prefect/config.toml file) or cloud (where they're stored in vault). A secret can contain whatever values you want, so you're free to pass in whatever auth tokens you need.
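A minimal sketch of that pattern for the GCP case above, assuming the env var holds a path to a service-account JSON key (task and flow names are illustrative):

from prefect import Flow, task
from prefect.tasks.secrets import EnvVarSecret
from google.cloud import storage

@task
def list_buckets(key_path):
    # build the client directly from the key the secret resolved to
    client = storage.Client.from_service_account_json(key_path)
    return [b.name for b in client.list_buckets()]

with Flow("gcs-auth-example") as flow:
    # resolved at runtime, on the worker, from the environment
    key_path = EnvVarSecret("GOOGLE_APPLICATION_CREDENTIALS")
    list_buckets(key_path)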
Riley Hun
08/05/2020, 5:20 PM
def ensure_stackdriver_logging():
    logger = get_logger()
    if hasattr(logger, 'initialized'):
        return logger
    else:
        setattr(logger, 'initialized', True)
        lg_client = google.cloud.logging.Client()
        lg_handler = CloudLoggingHandler(lg_client, 'thinknum_etl_logging')
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        lg_handler.setFormatter(formatter)
        logger.setLevel(logging.INFO)
        logger.addHandler(lg_handler)
        return logger
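Presumably this helper is then called at the top of each task that logs from a worker, along these lines (illustrative):

@task
def extract_data():
    logger = ensure_stackdriver_logging()
    logger.info("starting extract")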